Coverage for src/debputy/plugin/api/impl.py: 60%

910 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2026-01-14 16:42 +0000

1import contextlib 

2import dataclasses 

3import functools 

4import importlib 

5import importlib.resources 

6import importlib.util 

7import inspect 

8import itertools 

9import json 

10import os 

11import re 

12import subprocess 

13import sys 

14from abc import ABC 

15from collections.abc import Callable, Iterable, Sequence, Iterator, Mapping, Container 

16from importlib.resources.abc import Traversable 

17from io import IOBase 

18from json import JSONDecodeError 

19from pathlib import Path 

20from types import NoneType 

21from typing import ( 

22 IO, 

23 AbstractSet, 

24 cast, 

25 Any, 

26 Literal, 

27 TYPE_CHECKING, 

28 is_typeddict, 

29 AnyStr, 

30 overload, 

31) 

32 

33import debputy 

34from debputy.exceptions import ( 

35 DebputySubstitutionError, 

36 PluginConflictError, 

37 PluginMetadataError, 

38 PluginBaseError, 

39 PluginInitializationError, 

40 PluginAPIViolationError, 

41 PluginNotFoundError, 

42 PluginIncorrectRegistrationError, 

43) 

44from debputy.maintscript_snippet import ( 

45 STD_CONTROL_SCRIPTS, 

46 MaintscriptSnippetContainer, 

47 MaintscriptSnippet, 

48) 

49from debputy.manifest_parser.exceptions import ManifestParseException 

50from debputy.manifest_parser.parser_data import ParserContextData 

51from debputy.manifest_parser.tagging_types import TypeMapping 

52from debputy.manifest_parser.util import AttributePath 

53from debputy.manifest_parser.util import resolve_package_type_selectors 

54from debputy.plugin.api.doc_parsing import ( 

55 DEBPUTY_DOC_REFERENCE_DATA_PARSER, 

56 parser_type_name, 

57 DebputyParsedDoc, 

58) 

59from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

60from debputy.plugin.api.impl_types import ( 

61 DebputyPluginMetadata, 

62 PackagerProvidedFileClassSpec, 

63 MetadataOrMaintscriptDetector, 

64 PluginProvidedTrigger, 

65 TTP, 

66 DIPHandler, 

67 PF, 

68 SF, 

69 DIPKWHandler, 

70 PluginProvidedManifestVariable, 

71 PluginProvidedPackageProcessor, 

72 PluginProvidedDiscardRule, 

73 AutomaticDiscardRuleExample, 

74 PPFFormatParam, 

75 ServiceManagerDetails, 

76 KnownPackagingFileInfo, 

77 PluginProvidedKnownPackagingFile, 

78 DHCompatibilityBasedRule, 

79 PluginProvidedTypeMapping, 

80 PluginProvidedBuildSystemAutoDetection, 

81 BSR, 

82 TP, 

83) 

84from debputy.plugin.api.plugin_parser import ( 

85 PLUGIN_METADATA_PARSER, 

86 PluginJsonMetadata, 

87 PLUGIN_PPF_PARSER, 

88 PackagerProvidedFileJsonDescription, 

89 PLUGIN_MANIFEST_VARS_PARSER, 

90 PLUGIN_KNOWN_PACKAGING_FILES_PARSER, 

91) 

92from debputy.plugin.api.spec import ( 

93 MaintscriptAccessor, 

94 Maintscript, 

95 DpkgTriggerType, 

96 BinaryCtrlAccessor, 

97 PackageProcessingContext, 

98 MetadataAutoDetector, 

99 PluginInitializationEntryPoint, 

100 DebputyPluginInitializer, 

101 PackageTypeSelector, 

102 FlushableSubstvars, 

103 ParserDocumentation, 

104 PackageProcessor, 

105 VirtualPath, 

106 ServiceIntegrator, 

107 ServiceDetector, 

108 ServiceRegistry, 

109 ServiceDefinition, 

110 DSD, 

111 ServiceUpgradeRule, 

112 PackagerProvidedFileReferenceDocumentation, 

113 packager_provided_file_reference_documentation, 

114 TypeMappingDocumentation, 

115 DebputyIntegrationMode, 

116 _DEBPUTY_DISPATCH_METADATA_ATTR_NAME, 

117 BuildSystemManifestRuleMetadata, 

118 INTEGRATION_MODE_FULL, 

119 only_integrations, 

120 DebputyPluginDefinition, 

121) 

122from debputy.plugin.api.std_docs import _STD_ATTR_DOCS 

123from debputy.plugin.plugin_state import ( 

124 run_in_context_of_plugin, 

125 run_in_context_of_plugin_wrap_errors, 

126 wrap_plugin_code, 

127 register_manifest_type_value_in_context, 

128) 

129from debputy.plugins.debputy.to_be_api_types import ( 

130 BuildRuleParsedFormat, 

131 BSPF, 

132 debputy_build_system, 

133) 

134from debputy.substitution import ( 

135 Substitution, 

136 VariableNameState, 

137 SUBST_VAR_RE, 

138 VariableContext, 

139) 

140from debputy.util import ( 

141 _normalize_path, 

142 POSTINST_DEFAULT_CONDITION, 

143 _error, 

144 print_command, 

145 _warn, 

146 _debug_log, 

147) 

148from debputy.version import debputy_doc_root_dir 

149from debputy.yaml import MANIFEST_YAML 

150 

151if TYPE_CHECKING: 

152 from debputy.highlevel_manifest import HighLevelManifest 

153 

# Matches a file name ending in `_t.py`, `_test.py` or `_check.py`, optionally
# with an extra `_<suffix>` (lowercase letters, digits, underscores) before the
# `.py`; the optional suffix is captured as group 1.
# NOTE(review): presumably used to recognise plugin test modules - confirm at the call sites.
PLUGIN_TEST_SUFFIX = re.compile(r"_(?:t|test|check)(?:_([a-z0-9_]+))?[.]py$")

# Traversable handle for the resources of the `debputy.plugins` package.
PLUGIN_PYTHON_RES_PATH = importlib.resources.files(debputy.plugins.__name__)

156 

157 

158def _validate_known_packaging_file_dh_compat_rules( 

159 dh_compat_rules: list[DHCompatibilityBasedRule] | None, 

160) -> None: 

161 max_compat = None 

162 if not dh_compat_rules: 162 ↛ 165line 162 didn't jump to line 165 because the condition on line 162 was always true

163 return 

164 dh_compat_rule: DHCompatibilityBasedRule 

165 for idx, dh_compat_rule in enumerate(dh_compat_rules): 

166 dh_version = dh_compat_rule.get("starting_with_debhelper_version") 

167 compat = dh_compat_rule.get("starting_with_compat_level") 

168 

169 remaining = dh_compat_rule.keys() - { 

170 "after_debhelper_version", 

171 "starting_with_compat_level", 

172 } 

173 if not remaining: 

174 raise ValueError( 

175 f"The dh compat-rule at index {idx} does not affect anything / not have any rules!? So why have it?" 

176 ) 

177 if dh_version is None and compat is None and idx < len(dh_compat_rules) - 1: 

178 raise ValueError( 

179 f"The dh compat-rule at index {idx} is not the last and is missing either" 

180 " before-debhelper-version or before-compat-level" 

181 ) 

182 if compat is not None and compat < 0: 

183 raise ValueError( 

184 f"There is no compat below 1 but dh compat-rule at {idx} wants to declare some rule" 

185 f" for something that appeared when migrating from {compat} to {compat + 1}." 

186 ) 

187 

188 if max_compat is None: 

189 max_compat = compat 

190 elif compat is not None: 

191 if compat >= max_compat: 

192 raise ValueError( 

193 f"The dh compat-rule at {idx} should be moved earlier than the entry for compat {max_compat}." 

194 ) 

195 max_compat = compat 

196 

197 install_pattern = dh_compat_rule.get("install_pattern") 

198 if ( 

199 install_pattern is not None 

200 and _normalize_path(install_pattern, with_prefix=False) != install_pattern 

201 ): 

202 raise ValueError( 

203 f"The install-pattern in dh compat-rule at {idx} must be normalized as" 

204 f' "{_normalize_path(install_pattern, with_prefix=False)}".' 

205 ) 

206 

207 

208class DebputyPluginInitializerProvider(DebputyPluginInitializer): 

209 __slots__ = ( 

210 "_plugin_metadata", 

211 "_feature_set", 

212 "_plugin_detector_ids", 

213 "_substitution", 

214 "_unloaders", 

215 "_is_doc_cache_resolved", 

216 "_doc_cache", 

217 "_registered_manifest_types", 

218 "_load_started", 

219 ) 

220 

221 def __init__( 

222 self, 

223 plugin_metadata: DebputyPluginMetadata, 

224 feature_set: PluginProvidedFeatureSet, 

225 substitution: Substitution, 

226 ) -> None: 

227 self._plugin_metadata: DebputyPluginMetadata = plugin_metadata 

228 self._feature_set = feature_set 

229 self._plugin_detector_ids: set[str] = set() 

230 self._substitution = substitution 

231 self._unloaders: list[Callable[[], None]] = [] 

232 self._is_doc_cache_resolved: bool = False 

233 self._doc_cache: DebputyParsedDoc | None = None 

234 self._registered_manifest_types: dict[type[Any], DebputyPluginMetadata] = {} 

235 self._load_started = False 

236 

@property
def plugin_metadata(self) -> DebputyPluginMetadata:
    """The metadata of the plugin this initializer was created for."""
    metadata = self._plugin_metadata
    return metadata

240 

def unload_plugin(self) -> None:
    """Undo the plugin's registrations and forget that it was loaded.

    Does nothing when loading never started.  Otherwise every recorded
    unloader is called in registration order, and afterwards the plugin is
    removed from the feature set's `plugin_data`.
    """
    if not self._load_started:
        return
    for undo_registration in self._unloaders:
        undo_registration()
    loaded_plugins = self._feature_set.plugin_data
    del loaded_plugins[self._plugin_name]

246 

def load_plugin(self) -> None:
    """Record the plugin as loaded and run its initialization entry point.

    :raises PluginConflictError: If a plugin with the same name is already
      present in the feature set.
    :raises PluginMetadataError: If initialization failed with a `TypeError`
      and the declared entry point is not callable.
    :raises PluginInitializationError: If initialization failed with any other
      exception that is not already a `PluginBaseError` (those propagate as-is).
    """
    metadata = self._plugin_metadata
    loaded_plugins = self._feature_set.plugin_data
    if metadata.plugin_name in loaded_plugins:
        raise PluginConflictError(
            f'The plugin "{metadata.plugin_name}" has already been loaded!?',
            metadata,
            metadata,
        )
    assert (
        metadata.api_compat_version == 1
    ), f"Unsupported plugin API compat version {metadata.api_compat_version}"

    # Register first, so unload_plugin can clean up after a failed initialization.
    loaded_plugins[metadata.plugin_name] = metadata
    self._load_started = True
    assert not metadata.is_initialized
    try:
        metadata.initialize_plugin(self)
    except Exception as e:
        initializer = metadata.plugin_initializer
        entry_point_is_not_callable = (
            isinstance(e, TypeError)
            and initializer is not None
            and not callable(initializer)
        )
        if entry_point_is_not_callable:
            raise PluginMetadataError(
                f"The specified entry point for plugin {metadata.plugin_name} does not appear to be a"
                f" callable (callable returns False). The specified entry point identifies"
                f' itself as "{initializer.__qualname__}".'
            ) from e
        if isinstance(e, PluginBaseError):
            # Already a plugin-specific error; let it through untouched.
            raise
        raise PluginInitializationError(
            f"Exception while attempting to load plugin {metadata.plugin_name}"
        ) from e

280 

281 def _resolve_docs(self) -> DebputyParsedDoc | None: 

282 doc_cache = self._doc_cache 

283 if doc_cache is not None: 

284 return doc_cache 

285 

286 plugin_doc_path = self._plugin_metadata.plugin_doc_path 

287 if plugin_doc_path is None or self._is_doc_cache_resolved: 

288 self._is_doc_cache_resolved = True 

289 return None 

290 try: 

291 with plugin_doc_path.open("r", encoding="utf-8") as fd: 

292 raw = MANIFEST_YAML.load(fd) 

293 except FileNotFoundError: 

294 _debug_log( 

295 f"No documentation file found for {self._plugin_name}. Expected it at {plugin_doc_path}" 

296 ) 

297 self._is_doc_cache_resolved = True 

298 return None 

299 attr_path = AttributePath.root_path(plugin_doc_path) 

300 try: 

301 ref = DEBPUTY_DOC_REFERENCE_DATA_PARSER.parse_input(raw, attr_path) 

302 except ManifestParseException as e: 

303 raise ValueError( 

304 f"Could not parse documentation in {plugin_doc_path}: {e.message}" 

305 ) from e 

306 try: 

307 res = DebputyParsedDoc.from_ref_data(ref) 

308 except ValueError as e: 

309 raise ValueError( 

310 f"Could not parse documentation in {plugin_doc_path}: {e.args[0]}" 

311 ) from e 

312 

313 self._doc_cache = res 

314 self._is_doc_cache_resolved = True 

315 return res 

316 

317 def _pluggable_manifest_docs_for( 

318 self, 

319 rule_type: TTP | str, 

320 rule_name: str | list[str], 

321 *, 

322 inline_reference_documentation: ParserDocumentation | None = None, 

323 ) -> ParserDocumentation | None: 

324 ref_data = self._resolve_docs() 

325 if ref_data is not None: 

326 primary_rule_name = ( 

327 rule_name if isinstance(rule_name, str) else rule_name[0] 

328 ) 

329 rule_ref = f"{parser_type_name(rule_type)}::{primary_rule_name}" 

330 resolved_docs = ref_data.pluggable_manifest_rules.get(rule_ref) 

331 if resolved_docs is not None: 

332 if inline_reference_documentation is not None: 332 ↛ 333line 332 didn't jump to line 333 because the condition on line 332 was never true

333 raise ValueError( 

334 f"Conflicting docs for {rule_ref}: Was provided one in the API call and one via" 

335 f" {self._plugin_metadata.plugin_doc_path}. Please remove one of the two, so" 

336 f" there is only one doc reference" 

337 ) 

338 return resolved_docs 

339 return inline_reference_documentation 

340 

def packager_provided_file(
    self,
    stem: str,
    installed_path: str,
    *,
    default_mode: int = 0o0644,
    default_priority: int | None = None,
    allow_name_segment: bool = True,
    allow_architecture_segment: bool = False,
    post_formatting_rewrite: Callable[[str], str] | None = None,
    packageless_is_fallback_for_all_packages: bool = False,
    reservation_only: bool = False,
    format_callback: None | (
        Callable[[str, PPFFormatParam, VirtualPath], str]
    ) = None,
    reference_documentation: None | (
        PackagerProvidedFileReferenceDocumentation
    ) = None,
) -> None:
    """Register a packager provided file class under the given stem.

    The arguments are validated, `installed_path` is normalized, and a
    `PackagerProvidedFileClassSpec` is stored in the feature set.  An unloader
    removing the entry again is recorded.

    :param stem: The stem identifying the file class; each stem can only be
      registered once across all plugins.
    :param installed_path: Template for the installed path.  It must not end
      with "/" and, outside the debputy-internal special cases, must contain
      a `{name}` or `{owning_package}` substitution.
    :param default_priority: When not `None`, `installed_path` must contain
      `{priority}` or `{priority:02}`.
    :param allow_name_segment: When true, `installed_path` must contain `{name}`.
    :param reservation_only: Only the plugin named "debputy" may set this.
    :param format_callback: Only the plugin named "debputy" may set this.
    :raises ValueError: If any of the argument checks fail.
    :raises PluginConflictError: If the stem has already been registered.

    The remaining keyword arguments are passed through to
    `PackagerProvidedFileClassSpec` unchanged.
    """
    plugin_name = self._plugin_name
    called_by_debputy = plugin_name == "debputy"
    registry = self._feature_set.packager_provided_files
    existing = registry.get(stem)

    if not called_by_debputy and format_callback is not None:
        raise ValueError(
            "Sorry; Using format_callback is a debputy-internal"
            f" API. Triggered by plugin {plugin_name}"
        )

    if installed_path.endswith("/"):
        raise ValueError(
            f'The installed_path ends with "/" indicating it is a directory, but it must be a file.'
            f" Triggered by plugin {plugin_name}."
        )

    installed_path = _normalize_path(installed_path)
    has_name_var = "{name}" in installed_path

    if installed_path.startswith("./DEBIAN") or reservation_only:
        # Special-case, used for control files.
        if not called_by_debputy:
            raise ValueError(
                "Sorry; Using DEBIAN as install path or/and reservation_only is a debputy-internal"
                f" API. Triggered by plugin {plugin_name}"
            )
    elif not (has_name_var or "{owning_package}" in installed_path):
        raise ValueError(
            'The installed_path must contain a "{name}" (preferred) or a "{owning_package}"'
            " substitution (or have installed_path end with a slash). Otherwise, the installed"
            f" path would caused file-conflicts. Triggered by plugin {plugin_name}"
        )

    if allow_name_segment and not has_name_var:
        raise ValueError(
            'When allow_name_segment is True, the installed_path must have a "{name}" substitution'
            " variable. Otherwise, the name segment will not work properly. Triggered by"
            f" plugin {plugin_name}"
        )

    if default_priority is not None and not any(
        priority_var in installed_path
        for priority_var in ("{priority}", "{priority:02}")
    ):
        raise ValueError(
            'When default_priority is not None, the installed_path should have a "{priority}"'
            ' or a "{priority:02}" substitution variable. Otherwise, the priority would be lost.'
            f" Triggered by plugin {plugin_name}"
        )

    if existing is not None:
        previous_owner = existing.debputy_plugin_metadata
        if previous_owner.plugin_name == plugin_name:
            message = (
                f"Bug in the plugin {plugin_name}: It tried to register the"
                f' stem "{stem}" twice for packager provided files.'
            )
        else:
            message = (
                f'The stem "{stem}" is registered twice for packager provided files.'
                f" Once by {previous_owner.plugin_name} and once"
                f" by {plugin_name}"
            )
        raise PluginConflictError(message, previous_owner, self._plugin_metadata)

    registry[stem] = PackagerProvidedFileClassSpec(
        self._plugin_metadata,
        stem,
        installed_path,
        default_mode=default_mode,
        default_priority=default_priority,
        allow_name_segment=allow_name_segment,
        allow_architecture_segment=allow_architecture_segment,
        post_formatting_rewrite=post_formatting_rewrite,
        packageless_is_fallback_for_all_packages=packageless_is_fallback_for_all_packages,
        reservation_only=reservation_only,
        formatting_callback=format_callback,
        reference_documentation=reference_documentation,
    )

    def _unload() -> None:
        del registry[stem]

    self._unloaders.append(_unload)

445 

def metadata_or_maintscript_detector(
    self,
    auto_detector_id: str,
    auto_detector: MetadataAutoDetector,
    *,
    package_type: PackageTypeSelector = "deb",
) -> None:
    """Register a metadata / maintscript detector for this plugin.

    :param auto_detector_id: Identifier of the detector; it may only be used
      once per plugin.
    :param auto_detector: The detector callable.  It is wrapped with
      `wrap_plugin_code` before being stored.
    :param package_type: Selector for the package types the detector applies
      to; resolved via `resolve_package_type_selectors`.
    :raises ValueError: If this plugin already registered the same id.
    """
    plugin_name = self._plugin_name
    if auto_detector_id in self._plugin_detector_ids:
        raise ValueError(
            f"The plugin {plugin_name} tried to register"
            f' "{auto_detector_id}" twice'
        )
    self._plugin_detector_ids.add(auto_detector_id)

    all_detectors = self._feature_set.metadata_maintscript_detectors
    if plugin_name not in all_detectors:
        all_detectors[plugin_name] = []
    package_types = resolve_package_type_selectors(package_type)
    registration = MetadataOrMaintscriptDetector(
        detector_id=auto_detector_id,
        detector=wrap_plugin_code(plugin_name, auto_detector),
        plugin_metadata=self._plugin_metadata,
        applies_to_package_types=package_types,
        enabled=True,
    )
    all_detectors[plugin_name].append(registration)

    def _unload() -> None:
        # Drops every detector of the plugin; tolerates it being gone already.
        if self._plugin_name in all_detectors:
            del all_detectors[self._plugin_name]

    self._unloaders.append(_unload)

478 

def document_builtin_variable(
    self,
    variable_name: str,
    variable_reference_documentation: str,
    *,
    is_context_specific: bool = False,
    is_for_special_case: bool = False,
) -> None:
    """Attach reference documentation to an already known variable.

    The variable is registered as a documentation placeholder (it gets no
    value from this call).

    :param variable_name: Name of the variable; the substitution must not
      report it as undefined.
    :param variable_reference_documentation: The documentation text.
    :param is_context_specific: Stored on the registration as
      `is_context_specific_variable`.
    :param is_for_special_case: Stored on the registration unchanged.
    :raises ValueError: If the substitution does not know the variable.
    """
    self._restricted_api()
    known_variables = self._feature_set.manifest_variables
    if (
        self._substitution.variable_state(variable_name)
        == VariableNameState.UNDEFINED
    ):
        raise ValueError(
            f"The plugin {self._plugin_name} attempted to document built-in {variable_name},"
            f" but it is not known to be a variable"
        )

    assert variable_name not in known_variables

    placeholder = PluginProvidedManifestVariable(
        self._plugin_metadata,
        variable_name,
        None,
        is_context_specific_variable=is_context_specific,
        variable_reference_documentation=variable_reference_documentation,
        is_documentation_placeholder=True,
        is_for_special_case=is_for_special_case,
    )
    known_variables[variable_name] = placeholder

    def _unload() -> None:
        del known_variables[variable_name]

    self._unloaders.append(_unload)

512 

def manifest_variable_provider(
    self,
    provider: Callable[[VariableContext], Mapping[str, str]],
    variables: Sequence[str] | Mapping[str, str | None],
) -> None:
    """Register manifest variables whose values come from a provider callable.

    The provider is wrapped in an unbounded `functools.lru_cache`, so it is
    called at most once per distinct variable context.  One manifest variable
    is registered per declared name; each resolves its value by looking up its
    own name in the provider's result.

    The first time any of the variables is resolved, the keys returned by the
    provider are compared with the declared names.  After one successful
    comparison the check is not repeated.

    :param provider: Callable producing a mapping of variable name to value
      for a given variable context.
    :param variables: The names the provider will supply.  Either a sequence
      of names, or a mapping of name to reference documentation (or `None`).
    :raises PluginAPIViolationError: Raised lazily, at resolution time, when
      the provider's keys differ from the declared names.
    """
    self._restricted_api()
    cached_provider = functools.lru_cache(None)(provider)
    permitted_variables = frozenset(variables)
    variables_iter: Iterable[tuple[str, str | None]]
    if isinstance(variables, Mapping):
        variables_iter = variables.items()
    else:
        # A plain sequence of names carries no documentation.
        variables_iter = zip(variables, itertools.repeat(None))

    # Shared by all resolvers created below: has the provider's key set been verified?
    checked_vars = False
    manifest_variables = self._feature_set.manifest_variables
    plugin_name = self._plugin_name

    def _value_resolver_generator(
        variable_name: str,
    ) -> Callable[[VariableContext], str]:
        def _value_resolver(variable_context: VariableContext) -> str:
            nonlocal checked_vars
            res = cached_provider(variable_context)
            if not checked_vars:
                if permitted_variables != res.keys():
                    expected = ", ".join(sorted(permitted_variables))
                    actual = ", ".join(sorted(res))
                    raise PluginAPIViolationError(
                        f"The plugin {plugin_name} claimed to provide"
                        f" the following variables {expected},"
                        f" but when resolving the variables, the plugin provided"
                        f" {actual}. These two lists should have been the same."
                    )
                # Record the successful verification.  This used to assign
                # False, which left the guard permanently open.
                checked_vars = True
            return res[variable_name]

        return _value_resolver

    for varname, vardoc in variables_iter:
        self._check_variable_name(varname)
        manifest_variables[varname] = PluginProvidedManifestVariable(
            self._plugin_metadata,
            varname,
            _value_resolver_generator(varname),
            is_context_specific_variable=False,
            variable_reference_documentation=vardoc,
        )

    def _unload() -> None:
        raise PluginInitializationError(
            "Cannot unload manifest_variable_provider (not implemented)"
        )

    self._unloaders.append(_unload)

568 

def _check_variable_name(self, variable_name: str) -> None:
    """Verify that this plugin may declare a manifest variable with this name.

    :param variable_name: The name the plugin wants to declare.
    :raises PluginConflictError: If the name is already registered, by this
      plugin or by another one.
    :raises ValueError: If the name is not syntactically valid, uses a
      namespace other than "token" or "path", is reserved for debputy, or
      would shadow a variable the substitution already knows.  The last two
      restrictions do not apply to the plugin named "debputy".
    """
    plugin_name = self._plugin_name
    already_registered = self._feature_set.manifest_variables.get(variable_name)

    if already_registered is not None:
        previous_owner = already_registered.plugin_metadata
        if previous_owner.plugin_name != plugin_name:
            message = (
                f"The plugins {previous_owner.plugin_name} and {plugin_name}"
                f" both tried to provide the manifest variable {variable_name}"
            )
        else:
            message = (
                f"Bug in the plugin {plugin_name}: It tried to register the"
                f' manifest variable "{variable_name}" twice.'
            )
        raise PluginConflictError(message, previous_owner, self._plugin_metadata)

    if not SUBST_VAR_RE.match("{{" + variable_name + "}}"):
        raise ValueError(
            f"The plugin {plugin_name} attempted to declare {variable_name},"
            f" which is not a valid variable name"
        )

    # Everything before the last ":" is the namespace, the rest is the basename.
    namespace = ""
    variable_basename = variable_name
    if ":" in variable_name:
        namespace, _, variable_basename = variable_name.rpartition(":")
        assert namespace != ""
        assert variable_name != ""

    if namespace not in ("", "token", "path"):
        raise ValueError(
            f"The plugin {plugin_name} attempted to declare {variable_name},"
            f" which is in the reserved namespace {namespace}"
        )

    declared_by_debputy = plugin_name == "debputy"
    uses_reserved_name = (
        variable_name.upper().startswith(("DEB_", "DPKG_", "DEBPUTY"))
        or variable_basename.startswith("_")
        or variable_basename.upper().startswith("DEBPUTY")
    )
    if uses_reserved_name and not declared_by_debputy:
        raise ValueError(
            f"The plugin {plugin_name} attempted to declare {variable_name},"
            f" which is a variable name reserved by debputy"
        )

    state = self._substitution.variable_state(variable_name)
    if state != VariableNameState.UNDEFINED and not declared_by_debputy:
        raise ValueError(
            f"The plugin {plugin_name} attempted to declare {variable_name},"
            f" which would shadow a built-in variable"
        )

623 

def package_processor(
    self,
    processor_id: str,
    processor: PackageProcessor,
    *,
    depends_on_processor: Iterable[str] = tuple(),
    package_type: PackageTypeSelector = "deb",
) -> None:
    """Register a package processor for this plugin.

    :param processor_id: Identifier of the processor; unique per plugin.
    :param processor: The processor callable.  It is wrapped with
      `wrap_plugin_code` before being stored.
    :param depends_on_processor: Ids of processors this one depends on.  Each
      id is looked up among this plugin's processors first and among those of
      the plugin named "debputy" second; it must already be registered.
    :param package_type: Selector for the package types the processor applies
      to; resolved via `resolve_package_type_selectors`.
    :raises PluginConflictError: If this plugin already registered the id.
    :raises ValueError: If a dependency cannot be resolved.
    """
    self._restricted_api(allowed_plugins={"lua", "debputy-self-hosting"})
    package_processors = self._feature_set.all_package_processors
    plugin_name = self._plugin_name
    processor_key = (plugin_name, processor_id)

    if processor_key in package_processors:
        raise PluginConflictError(
            f"The plugin {plugin_name} already registered a processor with id {processor_id}",
            self._plugin_metadata,
            self._plugin_metadata,
        )

    dependencies = set()
    for depends_ref in depends_on_processor:
        if isinstance(depends_ref, str):
            # The plugin's own processors take precedence over debputy's.
            candidate_keys = (
                (plugin_name, depends_ref),
                ("debputy", depends_ref),
            )
            depends_key = next(
                (key for key in candidate_keys if key in package_processors),
                None,
            )
            if depends_key is None:
                raise ValueError(
                    f'Could not resolve dependency "{depends_ref}" for'
                    f' "{processor_id}". It was not provided by the plugin itself'
                    f" ({plugin_name}) nor debputy."
                )
        else:
            # TODO: Add proper dependencies first, at which point we should probably resolve "name"
            #  via the direct dependencies.
            assert False

        if package_processors.get(depends_key) is None:
            # We currently require the processor to be declared already. If this ever changes,
            # PluginProvidedFeatureSet.package_processors_in_order will need an update
            dplugin_name, dprocessor_name = depends_key
            available_processors = ", ".join(
                n for p, n in package_processors.keys() if p == dplugin_name
            )
            raise ValueError(
                f"The plugin {dplugin_name} does not provide a processor called"
                f" {dprocessor_name}. Available processors for that plugin are:"
                f" {available_processors}"
            )
        dependencies.add(depends_key)

    package_processors[processor_key] = PluginProvidedPackageProcessor(
        processor_id,
        resolve_package_type_selectors(package_type),
        wrap_plugin_code(plugin_name, processor),
        frozenset(dependencies),
        self._plugin_metadata,
    )

    def _unload() -> None:
        del package_processors[processor_key]

    self._unloaders.append(_unload)

688 

def automatic_discard_rule(
    self,
    name: str,
    should_discard: Callable[[VirtualPath], bool],
    *,
    rule_reference_documentation: str | None = None,
    examples: (
        AutomaticDiscardRuleExample | Sequence[AutomaticDiscardRuleExample]
    ) = tuple(),
) -> None:
    """Register an automatic discard rule

    Automatic discard rules are consulted for *every* path that is about to be
    installed into any package.  As soon as one rule decides that a path should
    not be installed, the path is skipped.  What is skipped depends on the kind
    of path:

     * directory: The directory and everything beneath it is excluded.
     * symlink: Only the symlink is excluded, not its target.
     * hardlink: The current hardlink is not installed; other instances of it
       still are.

    Note: `debputy` *never* deletes discarded files.  It merely skips them.

    Write discard rules under the assumption that a directory is tested before
    its content *when it is relevant* for the rule to decide whether the
    directory can be excluded.

    The packager can overrule automatic discard rules in the manifest by
    listing the path explicitly, without any globs.  As example:

        installations:
        - install:
            sources:
             - usr/lib/libfoo.la  # <-- This path is always installed
                                  #     (Discard rules are never asked in this case)
                                  #
             - usr/lib/*.so*      # <-- Discard rules applies to any path beneath usr/lib and can exclude matches
                                  #     Though, they will not examine `libfoo.la` as it has already been installed
                                  #
                                  #     Note: usr/lib itself is never tested in this case (it is assumed to be
                                  #     explicitly requested). But any subdir of usr/lib will be examined.

    While a rule is evaluated, it sees the source path currently considered for
    installation.  It may inspect the "surrounding" context (such as the parent
    directory), but it cannot know whether those paths are or will be installed.

    :param name: The user visible name of the discard rule.  It can be used on
      the command line, so avoid shell metacharacters and spaces.
    :param should_discard: The implementation of the rule.  It is called with a
      VirtualPath representing the *source* path about to be installed.
      Returning `True` discards the path; returning `False` keeps it (as far as
      this rule is concerned).  A source path is rooted either in the source
      tree or in a search directory such as `debian/tmp`.  The location the
      path will be installed at is not available when the rule is evaluated.
    :param rule_reference_documentation: Optional reference documentation shown
      when a user looks up this automatic discard rule.
    :param examples: Examples for the rule.  Use the
      automatic_discard_rule_example function to generate them.

    """
    self._restricted_api()
    auto_discard_rules = self._feature_set.auto_discard_rules
    previous = auto_discard_rules.get(name)
    if previous is not None:
        previous_owner = previous.plugin_metadata
        if previous_owner.plugin_name != self._plugin_name:
            message = (
                f"The plugins {previous_owner.plugin_name} and {self._plugin_name}"
                f" both tried to provide the automatic discard rule {name}"
            )
        else:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' automatic discard rule "{name}" twice.'
            )
        raise PluginConflictError(message, previous_owner, self._plugin_metadata)

    # Accept a single example as well as a sequence of them; store a tuple either way.
    if isinstance(examples, AutomaticDiscardRuleExample):
        examples = (examples,)
    else:
        examples = tuple(examples)

    auto_discard_rules[name] = PluginProvidedDiscardRule(
        name,
        self._plugin_metadata,
        should_discard,
        rule_reference_documentation,
        examples,
    )

    def _unload() -> None:
        del auto_discard_rules[name]

    self._unloaders.append(_unload)

782 

def service_provider(
    self,
    service_manager: str,
    detector: ServiceDetector,
    integrator: ServiceIntegrator,
) -> None:
    """Register the detector and integrator for a service manager.

    :param service_manager: Name of the service manager; each name can only
      be registered once across all plugins.
    :param detector: The service detector callable.
    :param integrator: The service integrator callable.
    :raises PluginConflictError: If the service manager is already registered.

    Both callables are wrapped with `wrap_plugin_code` before being stored.
    """
    self._restricted_api()
    service_managers = self._feature_set.service_managers
    previous = service_managers.get(service_manager)
    if previous is not None:
        previous_owner = previous.plugin_metadata
        if previous_owner.plugin_name != self._plugin_name:
            message = (
                f"The plugins {previous_owner.plugin_name} and {self._plugin_name}"
                f' both tried to provide the service manager "{service_manager}"'
            )
        else:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' service manager "{service_manager}" twice.'
            )
        raise PluginConflictError(message, previous_owner, self._plugin_metadata)

    wrapped_detector = wrap_plugin_code(self._plugin_name, detector)
    wrapped_integrator = wrap_plugin_code(self._plugin_name, integrator)
    service_managers[service_manager] = ServiceManagerDetails(
        service_manager,
        wrapped_detector,
        wrapped_integrator,
        self._plugin_metadata,
    )

    def _unload() -> None:
        del service_managers[service_manager]

    self._unloaders.append(_unload)

817 

def manifest_variable(
    self,
    variable_name: str,
    value: str,
    *,
    variable_reference_documentation: str | None = None,
) -> None:
    """Declare a manifest variable with a fixed value.

    The value is run through the substitution engine once; if that changes
    the value or fails with a substitution error, the value is considered to
    depend on another variable and is rejected.

    :param variable_name: Name of the variable (validated by
      `_check_variable_name`).
    :param value: The literal value of the variable.
    :param variable_reference_documentation: Optional documentation text.
    :raises ValueError: If the value depends on another variable.
    """
    self._check_variable_name(variable_name)
    registry = self._feature_set.manifest_variables
    try:
        substituted = self._substitution.substitute(value, "Plugin initialization")
    except DebputySubstitutionError:
        references_other_variable = True
    else:
        references_other_variable = substituted != value

    if references_other_variable:
        raise ValueError(
            f"The plugin {self._plugin_name} attempted to declare {variable_name} with value {value!r}."
            f" This value depends on another variable, which is not supported. This restriction may be"
            f" lifted in the future."
        )

    declared_variable = PluginProvidedManifestVariable(
        self._plugin_metadata,
        variable_name,
        value,
        is_context_specific_variable=False,
        variable_reference_documentation=variable_reference_documentation,
    )
    registry[variable_name] = declared_variable

    def _unload() -> None:
        # We need to check it was never resolved
        raise PluginInitializationError(
            "Cannot unload manifest_variable (not implemented)"
        )

    self._unloaders.append(_unload)

856 

@property
def _plugin_name(self) -> str:
    """Shorthand for the `plugin_name` of this plugin's metadata."""
    metadata = self._plugin_metadata
    return metadata.plugin_name

860 

def provide_manifest_keyword(
    self,
    rule_type: TTP,
    rule_name: str | list[str],
    handler: DIPKWHandler,
    *,
    inline_reference_documentation: ParserDocumentation | None = None,
) -> None:
    """Register a keyword handler with one of the dispatchable table parsers.

    :param rule_type: Key into the parser generator's table parsers.
    :param rule_name: Keyword name (or names) to register.
    :param handler: Plugin callable; wrapped with `wrap_plugin_code`.
    :param inline_reference_documentation: Optional documentation, passed
      through `_pluggable_manifest_docs_for` before registration.
    :raises ValueError: If `rule_type` is not a known table parser type.
    """
    self._restricted_api()
    parser_generator = self._feature_set.manifest_parser_generator
    if rule_type not in parser_generator.dispatchable_table_parsers:
        known_type_names = sorted(
            x.__name__ for x in parser_generator.dispatchable_table_parsers
        )
        types = ", ".join(known_type_names)
        raise ValueError(
            f"The rule_type was not a supported type. It must be one of {types}"
        )

    resolved_docs = self._pluggable_manifest_docs_for(
        rule_type,
        rule_name,
        inline_reference_documentation=inline_reference_documentation,
    )

    wrapped_handler = wrap_plugin_code(self._plugin_name, handler)
    parser_generator.dispatchable_table_parsers[rule_type].register_keyword(
        rule_name,
        wrapped_handler,
        self._plugin_metadata,
        inline_reference_documentation=resolved_docs,
    )

    def _unload() -> None:
        raise PluginInitializationError(
            "Cannot unload provide_manifest_keyword (not implemented)"
        )

    self._unloaders.append(_unload)

899 

def pluggable_object_parser(
    self,
    rule_type: str,
    rule_name: str,
    *,
    object_parser_key: str | None = None,
    on_end_parse_step: None | (
        Callable[
            [str, Mapping[str, Any] | None, AttributePath, ParserContextData],
            None,
        ]
    ) = None,
    nested_in_package_context: bool = False,
) -> None:
    """Attach one dispatchable object parser as a child of another.

    :param rule_type: Key of the parent dispatcher.
    :param rule_name: Name under which the child parser is registered.
    :param object_parser_key: Key of the child dispatcher; defaults to
      `rule_name` when omitted.
    :param on_end_parse_step: Optional plugin callback; wrapped with
      `wrap_plugin_code` when provided.
    :param nested_in_package_context: Forwarded to `register_child_parser`.
    :raises ValueError: If either key is not a known object parser.
    """
    self._restricted_api()
    child_key = rule_name if object_parser_key is None else object_parser_key

    object_parsers = (
        self._feature_set.manifest_parser_generator.dispatchable_object_parsers
    )
    if rule_type not in object_parsers:
        types = ", ".join(sorted(object_parsers))
        raise ValueError(
            f"The rule_type was not a supported type. It must be one of {types}"
        )
    if child_key not in object_parsers:
        types = ", ".join(sorted(object_parsers))
        raise ValueError(
            f"The object_parser_key was not a supported type. It must be one of {types}"
        )

    end_step_callback = on_end_parse_step
    if end_step_callback is not None:
        end_step_callback = wrap_plugin_code(self._plugin_name, end_step_callback)

    object_parsers[rule_type].register_child_parser(
        rule_name,
        object_parsers[child_key],
        self._plugin_metadata,
        on_end_parse_step=end_step_callback,
        nested_in_package_context=nested_in_package_context,
    )

    def _unload() -> None:
        raise PluginInitializationError(
            "Cannot unload pluggable_object_parser (not implemented)"
        )

    self._unloaders.append(_unload)

950 

def pluggable_manifest_rule(
    self,
    rule_type: TTP | str,
    rule_name: str | Sequence[str],
    parsed_format: type[PF],
    handler: DIPHandler,
    *,
    source_format: SF | None = None,
    inline_reference_documentation: ParserDocumentation | None = None,
    expected_debputy_integration_mode: None | (
        Container[DebputyIntegrationMode]
    ) = None,
    apply_standard_attribute_documentation: bool = False,
    register_value: bool = True,
) -> None:
    """Register a manifest rule with a generated parser and a plugin handler.

    A string `rule_type` selects a dispatchable object parser; anything else
    selects a dispatchable table parser. For object parsers, the handler's
    return annotation is used as the type under which produced values are
    registered (unless `register_value` is false).

    :param rule_type: Object parser key (str) or table parser type.
    :param rule_name: Name (or names) of the rule.
    :param parsed_format: Format handed to `generate_parser`.
    :param handler: Plugin callable invoked with the parsed data.
    :param source_format: Optional source format for `generate_parser`.
    :param inline_reference_documentation: Optional rule documentation.
    :param expected_debputy_integration_mode: Forwarded to `generate_parser`.
    :param apply_standard_attribute_documentation: When true, the standard
      attribute docs are passed as automatic docs (needs Python 3.12+).
    :param register_value: When false, produced values are not registered.
    :raises ValueError: On an unsupported `rule_type`, a handler without a
      usable return annotation, or a return type that is already registered.
    """
    # When changing this, consider which types will be unrestricted
    self._restricted_api()
    if apply_standard_attribute_documentation and sys.version_info < (3, 12):
        _error(
            f"The plugin {self._plugin_metadata.plugin_name} requires python 3.12 due to"
            f" its use of apply_standard_attribute_documentation"
        )
    feature_set = self._feature_set
    parser_generator = feature_set.manifest_parser_generator

    # Dispatchable (table) types cannot be resolved, so only object parsers
    # may end up with a type to register values under.
    value_type = None
    if isinstance(rule_type, str):
        if rule_type not in parser_generator.dispatchable_object_parsers:
            types = ", ".join(sorted(parser_generator.dispatchable_object_parsers))
            raise ValueError(
                f"The rule_type was not a supported type. It must be one of {types}"
            )
        dispatching_parser = parser_generator.dispatchable_object_parsers[rule_type]
        handler_signature = inspect.signature(handler)
        return_annotation = handler_signature.return_annotation
        # NOTE(review): a literal `-> None` annotation is the object `None`,
        # not `NoneType`, so it appears to pass this check - confirm intent.
        if (
            return_annotation is handler_signature.empty
            or return_annotation == NoneType
        ):
            raise ValueError(
                "The handler must have a return type (that is not None)"
            )
        if register_value:
            value_type = return_annotation
    else:
        if rule_type not in parser_generator.dispatchable_table_parsers:
            types = ", ".join(
                sorted(
                    x.__name__ for x in parser_generator.dispatchable_table_parsers
                )
            )
            raise ValueError(
                f"The rule_type was not a supported type. It must be one of {types}"
            )
        dispatching_parser = parser_generator.dispatchable_table_parsers[rule_type]

    if value_type is not None:
        existing_registration = self._registered_manifest_types.get(value_type)
        if existing_registration is not None:
            raise ValueError(
                f"Cannot register rule {rule_name!r} for plugin {self._plugin_name}. The plugin {existing_registration.plugin_name} already registered a manifest rule with type {value_type!r}"
            )
        self._registered_manifest_types[value_type] = self._plugin_metadata

    inline_reference_documentation = self._pluggable_manifest_docs_for(
        rule_type,
        rule_name,
        inline_reference_documentation=inline_reference_documentation,
    )

    docs = _STD_ATTR_DOCS if apply_standard_attribute_documentation else None

    parser = feature_set.manifest_parser_generator.generate_parser(
        parsed_format,
        source_content=source_format,
        inline_reference_documentation=inline_reference_documentation,
        expected_debputy_integration_mode=expected_debputy_integration_mode,
        automatic_docs=docs,
    )

    def _registering_handler(
        name: str,
        parsed_data: PF,
        attribute_path: AttributePath,
        parser_context: ParserContextData,
    ) -> TP:
        # Run the plugin handler, then record its result under the resolved
        # type (when there is one) before handing it back to the parser.
        produced = handler(name, parsed_data, attribute_path, parser_context)
        if value_type is not None:
            register_manifest_type_value_in_context(value_type, produced)
        return produced

    dispatching_parser.register_parser(
        rule_name,
        parser,
        wrap_plugin_code(self._plugin_name, _registering_handler),
        self._plugin_metadata,
    )

    def _unload() -> None:
        raise PluginInitializationError(
            "Cannot unload pluggable_manifest_rule (not implemented)"
        )

    self._unloaders.append(_unload)

1061 

def register_build_system(
    self,
    build_system_definition: type[BSPF],
) -> None:
    """Register a build system from its annotated TypedDict definition.

    The dispatch metadata attached to the definition drives two
    registrations: a manifest rule (via `pluggable_manifest_rule`) and the
    auto-detection entry (via `_auto_detectable_build_system`).

    :param build_system_definition: TypedDict class carrying the dispatch
      metadata attribute.
    :raises PluginInitializationError: If the definition is not a TypedDict.
    :raises PluginIncorrectRegistrationError: If the dispatch metadata is
      missing or of the wrong kind.
    """
    self._restricted_api()
    if not is_typeddict(build_system_definition):
        raise PluginInitializationError(
            f"Expected build_system_definition to be a subclass of {BuildRuleParsedFormat.__name__},"
            f" but got {build_system_definition.__name__} instead"
        )

    metadata = getattr(
        build_system_definition, _DEBPUTY_DISPATCH_METADATA_ATTR_NAME, None
    )
    if not isinstance(metadata, BuildSystemManifestRuleMetadata):
        raise PluginIncorrectRegistrationError(
            f"The {build_system_definition.__qualname__} type should have been annotated with"
            f" @{debputy_build_system.__name__}."
        )

    keywords = metadata.manifest_keywords
    assert len(keywords) == 1
    build_system_impl = metadata.build_system_impl
    assert build_system_impl is not None
    manifest_keyword = next(iter(metadata.manifest_keywords))

    self.pluggable_manifest_rule(
        metadata.dispatched_type,
        metadata.manifest_keywords,
        build_system_definition,
        # pluggable_manifest_rule does the wrapping
        metadata.unwrapped_constructor,
        source_format=metadata.source_format,
        inline_reference_documentation=metadata.online_reference_documentation,
        expected_debputy_integration_mode=only_integrations(INTEGRATION_MODE_FULL),
    )

    wrapped_constructor = wrap_plugin_code(self._plugin_name, build_system_impl)
    self._auto_detectable_build_system(
        manifest_keyword,
        build_system_impl,
        constructor=wrapped_constructor,
        shadowing_build_systems_when_active=metadata.auto_detection_shadow_build_systems,
    )

1105 

def _auto_detectable_build_system(
    self,
    manifest_keyword: str,
    rule_type: type[BSR],
    *,
    shadowing_build_systems_when_active: frozenset[str] = frozenset(),
    constructor: None | (
        Callable[[BuildRuleParsedFormat, AttributePath, "HighLevelManifest"], BSR]
    ) = None,
) -> None:
    """Register auto-detection for a build system class.

    :param manifest_keyword: Manifest keyword associated with the build system.
    :param rule_type: The build system class; also the registration key.
    :param shadowing_build_systems_when_active: Stored with the registration.
    :param constructor: Optional factory; when omitted, `rule_type` itself
      is called with (attributes, attribute_path, manifest).
    :raises PluginConflictError: If `rule_type` is already registered.
    """
    self._restricted_api()
    feature_set = self._feature_set
    existing = feature_set.auto_detectable_build_systems.get(rule_type)
    if existing is not None:
        # `rule_type` is itself a class, so its own `__name__` is the build
        # system name; `rule_type.__class__` would be the metaclass.
        bs_name = rule_type.__name__
        if existing.plugin_metadata.plugin_name == self._plugin_name:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' auto-detection of the build system "{bs_name}" twice.'
            )
        else:
            message = (
                f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}"
                f' both tried to provide auto-detection of the build system "{bs_name}"'
            )
        raise PluginConflictError(
            message, existing.plugin_metadata, self._plugin_metadata
        )

    if constructor is None:

        def impl(
            attributes: BuildRuleParsedFormat,
            attribute_path: AttributePath,
            manifest: "HighLevelManifest",
        ) -> BSR:
            return rule_type(attributes, attribute_path, manifest)

    else:
        impl = constructor

    feature_set.auto_detectable_build_systems[rule_type] = (
        PluginProvidedBuildSystemAutoDetection(
            manifest_keyword,
            rule_type,
            wrap_plugin_code(self._plugin_name, rule_type.auto_detect_build_system),
            impl,
            shadowing_build_systems_when_active,
            self._plugin_metadata,
        )
    )

    def _unload() -> None:
        # Tolerate the entry already being gone.
        try:
            del feature_set.auto_detectable_build_systems[rule_type]
        except KeyError:
            pass

    self._unloaders.append(_unload)

1165 

def known_packaging_files(
    self,
    packaging_file_details: KnownPackagingFileInfo,
) -> None:
    """Register a known packaging file.

    The entry is keyed by "<detection_method>::<value>", where the value is
    the "path" attribute for the "path" method (the default) and the
    "pkgfile" attribute for the "dh.pkgfile" method.

    :param packaging_file_details: The description of the packaging file. A
      copy is stored; the caller's mapping is not kept.
    :raises ValueError: If "path"/"pkgfile" are missing, used with the wrong
      detection method, or malformed.
    :raises PluginConflictError: If the key has already been registered.
    """
    known_packaging_files = self._feature_set.known_packaging_files
    detection_method = packaging_file_details.get(
        "detection_method", cast("Literal['path']", "path")
    )
    path = packaging_file_details.get("path")
    dhpkgfile = packaging_file_details.get("pkgfile")

    packaging_file_details = packaging_file_details.copy()

    if detection_method == "path":
        if dhpkgfile is not None:
            raise ValueError(
                'The "pkgfile" attribute cannot be used when detection-method is "path" (or omitted)'
            )
        if path is None:
            raise ValueError(
                'The "path" attribute must be present when detection-method is "path" (or omitted)'
            )
        if path != _normalize_path(path, with_prefix=False):
            raise ValueError(
                f"The path for known packaging files must be normalized. Please replace"
                f' "{path}" with "{_normalize_path(path, with_prefix=False)}"'
            )
        detection_value = path
    else:
        assert detection_method == "dh.pkgfile"
        if path is not None:
            raise ValueError(
                'The "path" attribute cannot be used when detection-method is "dh.pkgfile"'
            )
        if dhpkgfile is None:
            raise ValueError(
                'The "pkgfile" attribute must be present when detection-method is "dh.pkgfile"'
            )
        if "/" in dhpkgfile:
            raise ValueError(
                'The "pkgfile" attribute must be a name stem such as "install" (no "/" are allowed)'
            )
        detection_value = dhpkgfile
    key = f"{detection_method}::{detection_value}"
    existing = known_packaging_files.get(key)
    if existing is not None:
        if existing.plugin_metadata.plugin_name != self._plugin_name:
            message = (
                f'The key "{key}" is registered twice for known packaging files.'
                f" Once by {existing.plugin_metadata.plugin_name} and once by {self._plugin_name}"
            )
        else:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' key "{key}" twice for known packaging files.'
            )
        raise PluginConflictError(
            message, existing.plugin_metadata, self._plugin_metadata
        )
    _validate_known_packaging_file_dh_compat_rules(
        packaging_file_details.get("dh_compat_rules")
    )
    known_packaging_files[key] = PluginProvidedKnownPackagingFile(
        packaging_file_details,
        detection_method,
        detection_value,
        self._plugin_metadata,
    )

    def _unload() -> None:
        del known_packaging_files[key]

    self._unloaders.append(_unload)

1239 

def register_mapped_type(
    self,
    type_mapping: TypeMapping,
    *,
    reference_documentation: TypeMappingDocumentation | None = None,
) -> None:
    """Register a type mapping, keyed by its target type.

    The mapping is stored in the feature set and also registered with the
    manifest parser generator.

    :param type_mapping: The mapping to register.
    :param reference_documentation: Optional documentation for the mapping.
    :raises PluginConflictError: If the target type is already registered.
    """
    self._restricted_api()
    target_type = type_mapping.target_type
    mapped_types = self._feature_set.mapped_types
    existing = mapped_types.get(target_type)
    if existing is not None:
        if existing.plugin_metadata.plugin_name != self._plugin_name:
            message = (
                f'The key "{target_type.__name__}" is registered twice for mapped types.'
                f" Once by {existing.plugin_metadata.plugin_name} and once by {self._plugin_name}"
            )
        else:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' key "{target_type.__name__}" twice for mapped types.'
            )
        raise PluginConflictError(
            message, existing.plugin_metadata, self._plugin_metadata
        )
    parser_generator = self._feature_set.manifest_parser_generator
    # TODO: Wrap the mapper in the plugin context
    mapped_types[target_type] = PluginProvidedTypeMapping(
        type_mapping, reference_documentation, self._plugin_metadata
    )
    parser_generator.register_mapped_type(type_mapping)

1270 

1271 def _restricted_api( 

1272 self, 

1273 *, 

1274 allowed_plugins: set[str] | frozenset[str] = frozenset(), 

1275 ) -> None: 

1276 if self._plugin_name != "debputy" and self._plugin_name not in allowed_plugins: 1276 ↛ 1277line 1276 didn't jump to line 1277 because the condition on line 1276 was never true

1277 raise PluginAPIViolationError( 

1278 f"Plugin {self._plugin_name} attempted to access a debputy-only API." 

1279 " If you are the maintainer of this plugin and want access to this" 

1280 " API, please file a feature request to make this public." 

1281 " (The API is currently private as it is unstable.)" 

1282 ) 

1283 

1284 

class MaintscriptAccessorProviderBase(MaintscriptAccessor, ABC):
    """Shared implementation of the conditional maintscript helpers.

    Each public helper wraps the snippet in a shell `if` on the maintainer
    script arguments and delegates to `_append_script`, which subclasses
    must implement.
    """

    __slots__ = ()

    def _append_script(
        self,
        caller_name: str,
        maintscript: Maintscript,
        full_script: str,
        /,
        perform_substitution: bool = True,
    ) -> None:
        """Store `full_script` for `maintscript`; implemented by subclasses."""
        raise NotImplementedError

    @classmethod
    def _apply_condition_to_script(
        cls,
        condition: str,
        run_snippet: str,
        /,
        indent: bool | None = None,
    ) -> str:
        """Wrap `run_snippet` in `if <condition>; then ... fi`.

        :param condition: Shell condition placed after `if`.
        :param run_snippet: Shell code for the body; a trailing newline is
          added when missing.
        :param indent: Whether to indent the body. When None, the body is
          indented unless it contains "<<" (heredoc heuristic).
        :return: The complete shell fragment, newline terminated.
        """
        if indent is None:
            # We auto-determine this based on heredocs currently
            indent = "<<" not in run_snippet

        if indent:
            # NOTE(review): indent width reproduced as seen; confirm upstream.
            run_snippet = "".join(" " + x for x in run_snippet.splitlines(True))
        if not run_snippet.endswith("\n"):
            run_snippet += "\n"
        condition_line = f"if {condition}; then\n"
        end_line = "fi\n"
        return "".join((condition_line, run_snippet, end_line))

    def on_configure(
        self,
        run_snippet: str,
        /,
        indent: bool | None = None,
        perform_substitution: bool = True,
        skip_on_rollback: bool = False,
    ) -> None:
        """Add a postinst snippet guarded by the default postinst condition.

        With `skip_on_rollback`, the guard is narrowed to `"$1" = "configure"`.
        """
        condition = POSTINST_DEFAULT_CONDITION
        if skip_on_rollback:
            condition = '[ "$1" = "configure" ]'
        return self._append_script(
            "on_configure",
            "postinst",
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def on_initial_install(
        self,
        run_snippet: str,
        /,
        indent: bool | None = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a postinst snippet run on configure when "$2" is empty."""
        condition = '[ "$1" = "configure" -a -z "$2" ]'
        return self._append_script(
            "on_initial_install",
            "postinst",
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def on_upgrade(
        self,
        run_snippet: str,
        /,
        indent: bool | None = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a postinst snippet run on configure when "$2" is non-empty."""
        condition = '[ "$1" = "configure" -a -n "$2" ]'
        return self._append_script(
            "on_upgrade",
            "postinst",
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def on_upgrade_from(
        self,
        version: str,
        run_snippet: str,
        /,
        indent: bool | None = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a postinst snippet run on configure when "$2" <= `version`.

        The comparison uses dpkg's `le-nl` operator, under which an empty
        "$2" sorts later than any version and therefore does not match.

        :param version: The version to compare "$2" against.
        """
        # Local import: the module is only needed for quoting here.
        import shlex

        # dpkg requires `<ver1> <op> <ver2>`; the version is shell-quoted so
        # it is always passed as a single word.
        condition = (
            '[ "$1" = "configure" ] && dpkg --compare-versions -- "$2" le-nl '
            + shlex.quote(version)
        )
        return self._append_script(
            "on_upgrade_from",
            "postinst",
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def on_before_removal(
        self,
        run_snippet: str,
        /,
        indent: bool | None = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a prerm snippet guarded by `"$1" = "remove"`."""
        condition = '[ "$1" = "remove" ]'
        return self._append_script(
            "on_before_removal",
            "prerm",
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def on_removed(
        self,
        run_snippet: str,
        /,
        indent: bool | None = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a postrm snippet guarded by `"$1" = "remove"`."""
        condition = '[ "$1" = "remove" ]'
        return self._append_script(
            "on_removed",
            "postrm",
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def on_purge(
        self,
        run_snippet: str,
        /,
        indent: bool | None = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a postrm snippet guarded by `"$1" = "purge"`."""
        condition = '[ "$1" = "purge" ]'
        return self._append_script(
            "on_purge",
            "postrm",
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def unconditionally_in_script(
        self,
        maintscript: Maintscript,
        run_snippet: str,
        /,
        perform_substitution: bool = True,
    ) -> None:
        """Add a snippet to `maintscript` without any guard.

        :raises ValueError: If `maintscript` is not in STD_CONTROL_SCRIPTS.
        """
        if maintscript not in STD_CONTROL_SCRIPTS:
            raise ValueError(
                f'Unknown script "{maintscript}". Should have been one of:'
                f' {", ".join(sorted(STD_CONTROL_SCRIPTS))}'
            )
        return self._append_script(
            "unconditionally_in_script",
            maintscript,
            run_snippet,
            perform_substitution=perform_substitution,
        )

1445 

1446 

class MaintscriptAccessorProvider(MaintscriptAccessorProviderBase):
    """Maintscript accessor that appends snippets to per-script containers."""

    __slots__ = (
        "_plugin_metadata",
        "_maintscript_snippets",
        "_plugin_source_id",
        "_package_substitution",
        "_default_snippet_order",
    )

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        maintscript_snippets: dict[str, MaintscriptSnippetContainer],
        package_substitution: Substitution,
        *,
        default_snippet_order: Literal["service"] | None = None,
    ):
        """Store the plugin identity, snippet containers and substitution."""
        self._default_snippet_order = default_snippet_order
        self._package_substitution = package_substitution
        self._maintscript_snippets = maintscript_snippets
        self._plugin_source_id = plugin_source_id
        self._plugin_metadata = plugin_metadata

    def _append_script(
        self,
        caller_name: str,
        maintscript: Maintscript,
        full_script: str,
        /,
        perform_substitution: bool = True,
    ) -> None:
        """Append `full_script` to the container for `maintscript`.

        The definition source is "<plugin name> (<plugin source id>)"; when
        `perform_substitution` is true the script is substituted first.
        """
        origin = f"{self._plugin_metadata.plugin_name} ({self._plugin_source_id})"
        script_text = (
            self._package_substitution.substitute(full_script, origin)
            if perform_substitution
            else full_script
        )
        new_snippet = MaintscriptSnippet(
            snippet=script_text,
            definition_source=origin,
            snippet_order=self._default_snippet_order,
        )
        self._maintscript_snippets[maintscript].append(new_snippet)

1489 

1490 

class BinaryCtrlAccessorProviderBase(BinaryCtrlAccessor):
    """Common state and behaviour for binary control accessors.

    Subclasses supply the maintscript accessor via
    `_create_maintscript_accessor`.
    """

    __slots__ = (
        "_plugin_metadata",
        "_plugin_source_id",
        "_package_metadata_context",
        "_triggers",
        "_substvars",
        "_maintscript",
        "_shlibs_details",
    )

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        package_metadata_context: PackageProcessingContext,
        triggers: dict[tuple[DpkgTriggerType, str], PluginProvidedTrigger],
        substvars: FlushableSubstvars,
        shlibs_details: tuple[str | None, list[str] | None],
    ) -> None:
        """Store the given collaborators; the maintscript accessor is lazy."""
        self._plugin_metadata = plugin_metadata
        self._plugin_source_id = plugin_source_id
        self._package_metadata_context = package_metadata_context
        self._triggers = triggers
        self._substvars = substvars
        self._shlibs_details = shlibs_details
        self._maintscript: MaintscriptAccessor | None = None

    def _create_maintscript_accessor(self) -> MaintscriptAccessor:
        """Build the maintscript accessor; implemented by subclasses."""
        raise NotImplementedError

    def dpkg_trigger(self, trigger_type: DpkgTriggerType, trigger_target: str) -> None:
        """Register a declarative dpkg level trigger

        The trigger is recorded for the package's metadata (the triggers file of the control.tar).

        Registering the same (type, target) pair again is a no-op.
        """
        trigger_key = (trigger_type, trigger_target)
        if trigger_key not in self._triggers:
            self._triggers[trigger_key] = PluginProvidedTrigger(
                dpkg_trigger_type=trigger_type,
                dpkg_trigger_target=trigger_target,
                provider=self._plugin_metadata,
                provider_source_id=self._plugin_source_id,
            )

    @property
    def maintscript(self) -> MaintscriptAccessor:
        """The maintscript accessor, created on first access if unset."""
        if self._maintscript is None:
            self._maintscript = self._create_maintscript_accessor()
        return self._maintscript

    @property
    def substvars(self) -> FlushableSubstvars:
        """The substvars object given at construction."""
        return self._substvars

    def dpkg_shlibdeps(self, paths: Sequence[VirtualPath]) -> None:
        """Run dpkg-shlibdeps on `paths`, writing to the flushed substvars file.

        A non-zero exit from dpkg-shlibdeps is reported through `_error`.
        """
        binary_package = self._package_metadata_context.binary_package
        with self.substvars.flush() as substvars_file:
            options = [f"-T{substvars_file}"]
            if binary_package.is_udeb:
                options.append("-tudeb")
            if binary_package.is_essential:
                options.append("-dPre-Depends")
            shlibs_local, shlib_dirs = self._shlibs_details
            if shlibs_local is not None:
                options.append(f"-L{shlibs_local}")
            if shlib_dirs:
                options += [f"-l{sd}" for sd in shlib_dirs]
            dpkg_cmd = ["dpkg-shlibdeps", *options]
            dpkg_cmd.extend(p.fs_path for p in paths)
            print_command(*dpkg_cmd)
            try:
                subprocess.check_call(dpkg_cmd)
            except subprocess.CalledProcessError:
                _error(
                    f"Attempting to auto-detect dependencies via dpkg-shlibdeps for {binary_package.name} failed. Please"
                    " review the output from dpkg-shlibdeps above to understand what went wrong."
                )

1573 

1574 

class BinaryCtrlAccessorProvider(BinaryCtrlAccessorProviderBase):
    """Binary control accessor backed by maintscript snippet containers.

    The maintscript accessor is created eagerly in `__init__`.
    """

    # "_maintscript" is deliberately not repeated here: the base class
    # already declares that slot and re-declaring it would shadow it.
    __slots__ = (
        "_maintscript_snippets",
        "_package_substitution",
    )

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        package_metadata_context: PackageProcessingContext,
        triggers: dict[tuple[DpkgTriggerType, str], PluginProvidedTrigger],
        substvars: FlushableSubstvars,
        maintscript_snippets: dict[str, MaintscriptSnippetContainer],
        package_substitution: Substitution,
        shlibs_details: tuple[str | None, list[str] | None],
        *,
        default_snippet_order: Literal["service"] | None = None,
    ) -> None:
        """Initialise the base state and create the maintscript accessor.

        :param default_snippet_order: Passed on to the maintscript accessor
          created here.
        """
        super().__init__(
            plugin_metadata,
            plugin_source_id,
            package_metadata_context,
            triggers,
            substvars,
            shlibs_details,
        )
        self._maintscript_snippets = maintscript_snippets
        self._package_substitution = package_substitution
        self._maintscript = MaintscriptAccessorProvider(
            plugin_metadata,
            plugin_source_id,
            maintscript_snippets,
            package_substitution,
            default_snippet_order=default_snippet_order,
        )

    def _create_maintscript_accessor(self) -> MaintscriptAccessor:
        """Create a maintscript accessor with the default snippet order."""
        return MaintscriptAccessorProvider(
            self._plugin_metadata,
            self._plugin_source_id,
            self._maintscript_snippets,
            self._package_substitution,
        )

1620 

1621 

class BinaryCtrlAccessorProviderCreator:
    """Factory handing out per-plugin `BinaryCtrlAccessorProvider` objects.

    All accessors created by one instance share the same trigger table.
    """

    def __init__(
        self,
        package_metadata_context: PackageProcessingContext,
        substvars: FlushableSubstvars,
        maintscript_snippets: dict[str, MaintscriptSnippetContainer],
        substitution: Substitution,
    ) -> None:
        """Store the shared collaborators and start with no triggers."""
        self._triggers: dict[tuple[DpkgTriggerType, str], PluginProvidedTrigger] = {}
        self.shlibs_details: tuple[str | None, list[str] | None] = (None, None)
        self._package_metadata_context = package_metadata_context
        self._substvars = substvars
        self._maintscript_snippets = maintscript_snippets
        self._substitution = substitution

    def for_plugin(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        *,
        default_snippet_order: Literal["service"] | None = None,
    ) -> BinaryCtrlAccessor:
        """Create an accessor for the given plugin and source id.

        The current value of `shlibs_details` is passed to the accessor.
        """
        accessor = BinaryCtrlAccessorProvider(
            plugin_metadata,
            plugin_source_id,
            self._package_metadata_context,
            self._triggers,
            self._substvars,
            self._maintscript_snippets,
            self._substitution,
            self.shlibs_details,
            default_snippet_order=default_snippet_order,
        )
        return accessor

    def generated_triggers(self) -> Iterable[PluginProvidedTrigger]:
        """Return the triggers registered through accessors created here."""
        return self._triggers.values()

1658 

1659 

def _resolve_bundled_plugin_docs_path(
    plugin_name: str,
    loader: PluginInitializationEntryPoint | None,
) -> Traversable | Path | None:
    """Locate "<plugin_name>_docs.yaml" next to the loader's module.

    The package is taken from the `__package__` of the module named by the
    loader's `__module__`, looked up in `sys.modules`.
    """
    module_name = loader.__module__
    assert module_name is not None
    package_name = sys.modules[module_name].__package__
    package_root = importlib.resources.files(package_name)
    return package_root.joinpath(f"{plugin_name}_docs.yaml")

1670 

1671 

def plugin_metadata_for_debputys_own_plugin(
    loader: PluginInitializationEntryPoint | None = None,
) -> DebputyPluginMetadata:
    """Create the plugin metadata describing the bundled ``debputy`` plugin.

    :param loader: Entry point used to initialize the plugin.  When omitted,
      ``initialize_debputy_features`` from the bundled plugin module is used.
    :return: Metadata for the plugin named ``debputy`` with API compat
      version 1 and the plugin path ``<bundled>``.
    """
    plugin_name = "debputy"
    if loader is None:
        # Only imported when the caller did not supply an entry point.
        from debputy.plugins.debputy.debputy_plugin import (
            initialize_debputy_features,
        )

        loader = initialize_debputy_features

    def _docs_path() -> Traversable | Path | None:
        # Resolved on demand, not when the metadata is created.
        return _resolve_bundled_plugin_docs_path(plugin_name, loader)

    return DebputyPluginMetadata(
        plugin_name=plugin_name,
        api_compat_version=1,
        plugin_initializer=loader,
        plugin_loader=None,
        plugin_doc_path_resolver=_docs_path,
        plugin_path="<bundled>",
    )

1693 

1694 

def load_plugin_features(
    plugin_search_dirs: Sequence[str],
    substitution: Substitution,
    requested_plugins_only: Sequence[str] | None = None,
    required_plugins: set[str] | None = None,
    plugin_feature_set: PluginProvidedFeatureSet | None = None,
    debug_mode: bool = False,
) -> PluginProvidedFeatureSet:
    """Load debputy's own plugin plus the requested or discovered plugins.

    Plugins are loaded in this order: the bundled ``debputy`` plugin, the
    ``required_plugins`` and finally either ``requested_plugins_only`` (when
    given) or every plugin discovered via ``_find_all_json_plugins``.

    Discovered plugins are optional: when one fails to load it is unloaded
    again and a warning is emitted.  The load error is re-raised instead when
    ``debug_mode`` is set or when unloading itself fails.  Load errors of any
    other plugin always propagate.

    :param plugin_search_dirs: Directories searched for plugin JSON files.
    :param substitution: Passed to each plugin initializer provider.
    :param requested_plugins_only: When not ``None``, load exactly these
      instead of discovering plugins.
    :param required_plugins: Plugins that must be loaded.
    :param plugin_feature_set: Feature set to populate; a new one is created
      when omitted.
    :param debug_mode: Turn load failures of optional plugins into errors.
    :return: The populated feature set.
    """
    feature_set = (
        PluginProvidedFeatureSet() if plugin_feature_set is None else plugin_feature_set
    )
    plugins = [plugin_metadata_for_debputys_own_plugin()]
    optional_plugin_names = set()

    if required_plugins:
        plugins += find_json_plugins(plugin_search_dirs, required_plugins)

    if requested_plugins_only is None:
        excluded = frozenset() if required_plugins is None else required_plugins
        for discovered in _find_all_json_plugins(
            plugin_search_dirs,
            excluded,
            debug_mode=debug_mode,
        ):
            plugins.append(discovered)
            optional_plugin_names.add(discovered.plugin_name)
    else:
        plugins += find_json_plugins(plugin_search_dirs, requested_plugins_only)

    for plugin_metadata in plugins:
        api = DebputyPluginInitializerProvider(
            plugin_metadata, feature_set, substitution
        )
        try:
            api.load_plugin()
        except PluginBaseError as load_error:
            name = plugin_metadata.plugin_name
            if name not in optional_plugin_names:
                raise
            if debug_mode:
                _warn(
                    f"The optional plugin {name} failed during load. Re-raising due"
                    f" to --debug/-d or DEBPUTY_DEBUG=1"
                )
                raise
            try:
                api.unload_plugin()
            except Exception:
                _warn(
                    f"Failed to load optional {name} and an error was raised when trying to"
                    " clean up after the half-initialized plugin. Re-raising load error as the partially loaded"
                    " module might have tainted the feature set."
                )
                raise load_error from None
            # Only reached when the clean-up succeeded.
            _warn(
                f"The optional plugin {name} failed during load. The plugin was"
                f" deactivated. Use debug mode (--debug/DEBPUTY_DEBUG=1) to show the stacktrace"
                f" (the warning will become an error)"
            )

    return feature_set

1763 

1764 

def find_json_plugin(
    search_dirs: Sequence[str],
    requested_plugin: str,
) -> DebputyPluginMetadata:
    """Resolve a single plugin by name or path.

    Thin wrapper around ``find_json_plugins`` for exactly one plugin.

    :param search_dirs: Directories searched for the plugin JSON file.
    :param requested_plugin: Plugin name, or a path when it contains ``/``.
    :return: The metadata of the plugin that was found.
    """
    matches = [*find_json_plugins(search_dirs, [requested_plugin])]
    assert len(matches) == 1
    return matches[0]

1772 

1773 

def find_related_implementation_files_for_plugin(
    plugin_metadata: DebputyPluginMetadata,
) -> list[str]:
    """List the implementation files that belong to a JSON-described plugin.

    The only candidate is the Python file guessed by
    ``_find_plugin_implementation_file`` (next to the JSON file).  When that
    file does not exist, the plugin is loaded (if it is not already) and an
    error is reported if the guessed module name is then present in
    ``sys.modules``.

    Errors are reported via ``_error``; the code below relies on ``_error``
    not returning.

    :param plugin_metadata: The plugin to inspect.  Bundled plugins and
      plugins from the Python path are rejected with an error.
    :return: The implementation files found (possibly empty).
    """
    plugin_name = plugin_metadata.plugin_name
    if plugin_metadata.is_bundled:
        _error(
            f"Cannot run find related files for {plugin_name}: The plugin seems to be bundled"
            " or loaded via a mechanism that does not support detecting its tests."
        )

    if plugin_metadata.is_from_python_path:
        # Maybe they could be, but that is for another day.
        _error(
            f"Cannot run find related files for {plugin_name}: The plugin is installed into python path"
            " and these are not supported."
        )

    files = []
    module_name, module_file = _find_plugin_implementation_file(
        plugin_name,
        plugin_metadata.plugin_path,
    )
    if os.path.isfile(module_file):
        files.append(module_file)
    else:
        if not plugin_metadata.is_loaded:
            plugin_metadata.load_plugin()
        if module_name in sys.modules:
            # The message previously contained a stray quote (`"module""`)
            # and a doubled space; both are fixed here.
            _error(
                f'The plugin {plugin_name} uses the "module" key in its'
                f" JSON metadata file ({plugin_metadata.plugin_path}) and cannot be"
                f" installed via this method. The related Python would not be installed"
                f" (which would result in a plugin that would fail to load)"
            )

    return files

1810 

1811 

def find_tests_for_plugin(
    plugin_metadata: DebputyPluginMetadata,
) -> list[str]:
    """Find the test files located next to a plugin's JSON description.

    A file counts as a test when it is a regular file in the directory of the
    plugin JSON file, its name starts with the plugin name (with ``-``
    replaced by ``_``) and ``PLUGIN_TEST_SUFFIX`` matches its name.

    Errors are reported via ``_error``; the code below relies on ``_error``
    not returning.

    :param plugin_metadata: The plugin to find tests for.  Bundled plugins and
      plugins from the Python path are rejected with an error.
    :return: Paths of the detected test files, in directory scan order.
    """
    plugin_name = plugin_metadata.plugin_name
    plugin_path = plugin_metadata.plugin_path

    if plugin_metadata.is_bundled:
        _error(
            f"Cannot run tests for {plugin_name}: The plugin seems to be bundled or loaded via a"
            " mechanism that does not support detecting its tests."
        )

    if plugin_metadata.is_from_python_path:
        # Maybe they could be, but that is for another day.
        # This message used to say "Cannot run find related files", copied
        # from the sibling function; it now names the right operation.
        _error(
            f"Cannot run tests for {plugin_name}: The plugin is installed into python path"
            " and these are not supported."
        )

    plugin_dir = os.path.dirname(plugin_path)
    test_basename_prefix = plugin_name.replace("-", "_")
    with os.scandir(plugin_dir) as dir_iter:
        return [
            entry.path
            for entry in dir_iter
            if entry.is_file()
            and entry.name.startswith(test_basename_prefix)
            and PLUGIN_TEST_SUFFIX.search(entry.name)
        ]

1844 

1845 

def find_json_plugins(
    search_dirs: Sequence[str],
    requested_plugins: Iterable[str],
) -> Iterable[DebputyPluginMetadata]:
    """Resolve every requested plugin to its parsed JSON description.

    This is a generator; lookups and errors happen while it is iterated.

    For each requested plugin:

    * If it contains ``/`` it is treated as a path to the JSON file and the
      search directories are not consulted.
    * Otherwise ``<search_dir>/debputy/plugins/<name>.json`` is tried for each
      search directory in order; the first existing file wins.
    * Failing that, ``<name>.json`` is looked up in ``PLUGIN_PYTHON_RES_PATH``.

    :param search_dirs: Directories to search, in priority order.
    :param requested_plugins: Plugin names or paths to resolve.
    :return: One metadata object per requested plugin, in request order.
    :raises PluginNotFoundError: If a requested plugin cannot be found.
    """
    for plugin_name_or_path in requested_plugins:
        if "/" in plugin_name_or_path:
            if not os.path.isfile(plugin_name_or_path):
                raise PluginNotFoundError(
                    f"Unable to load the plugin {plugin_name_or_path}: The path is not a file."
                    ' (Because the plugin name contains "/", it is assumed to be a path and search path'
                    " is not used.)"
                )
            yield parse_json_plugin_desc(plugin_name_or_path)
            # Move on to the next requested plugin.  (A `return` here would
            # silently drop every plugin after the first one.)
            continue

        json_basename = f"{plugin_name_or_path}.json"
        for search_dir in search_dirs:
            path = os.path.join(search_dir, "debputy", "plugins", json_basename)
            if os.path.isfile(path):
                yield parse_json_plugin_desc(path)
                break
        else:
            # Not found in any search dir; fall back to the Python resources.
            pp_path = PLUGIN_PYTHON_RES_PATH.joinpath(json_basename)
            # Only the file check matters; the path object itself is always
            # truthy, so testing it would mask a missing file.
            if not pp_path.is_file():
                search_dir_str = ":".join(search_dirs)
                raise PluginNotFoundError(
                    f"Unable to load the plugin {plugin_name_or_path}: Could not find {plugin_name_or_path}.json in the"
                    f" debputy/plugins subdir of any of the search dirs ({search_dir_str})"
                )
            # Parse before yielding so the file is not held open while the
            # generator is suspended.
            with pp_path.open() as fd:
                plugin_metadata = parse_json_plugin_desc(
                    f"PYTHONPATH:debputy/plugins/{pp_path.name}",
                    fd=fd,
                    is_from_python_path=True,
                )
            yield plugin_metadata

1885 

1886 

def _find_all_json_plugins(
    search_dirs: Sequence[str],
    required_plugins: AbstractSet[str],
    debug_mode: bool = False,
) -> Iterable[DebputyPluginMetadata]:
    """Discover all plugin JSON files not already covered by required plugins.

    This is a generator.  It scans ``<search_dir>/debputy/plugins`` of every
    search directory (missing directories are skipped) and then
    ``PLUGIN_PYTHON_RES_PATH``.  Each ``*.json`` file name is yielded at most
    once; the first occurrence wins.

    Plugins from the search directories that fail to parse are skipped with a
    warning (only the first warning includes the error message) unless
    ``debug_mode`` is set, in which case the error propagates.  Parse errors
    for plugins from ``PLUGIN_PYTHON_RES_PATH`` always propagate.

    :param search_dirs: Directories to scan, in priority order.
    :param required_plugins: Names (or paths) of plugins that are loaded
      separately and therefore must not be discovered again.
    :param debug_mode: Re-raise parse errors instead of warning about them.
    :return: Metadata for each discovered plugin.
    """
    # Entries are compared by JSON file name, so required plugin names have to
    # be mapped to "<name>.json" for the exclusion to take effect.
    seen = {
        basename if basename.endswith(".json") else f"{basename}.json"
        for basename in map(os.path.basename, required_plugins)
    }
    error_seen = False
    for search_dir in search_dirs:
        try:
            dir_fd = os.scandir(os.path.join(search_dir, "debputy", "plugins"))
        except FileNotFoundError:
            continue
        with dir_fd:
            for entry in dir_fd:
                if (
                    not entry.is_file(follow_symlinks=True)
                    or not entry.name.endswith(".json")
                    or entry.name in seen
                ):
                    continue
                seen.add(entry.name)
                try:
                    plugin_metadata = parse_json_plugin_desc(entry.path)
                except PluginBaseError as e:
                    if debug_mode:
                        raise
                    if not error_seen:
                        error_seen = True
                        _warn(
                            f"Failed to load the plugin in {entry.path} due to the following error: {e.message}"
                        )
                    else:
                        _warn(
                            f"Failed to load plugin in {entry.path} due to errors (not shown)."
                        )
                else:
                    yield plugin_metadata

    for pp_entry in PLUGIN_PYTHON_RES_PATH.iterdir():
        if (
            not pp_entry.name.endswith(".json")
            or not pp_entry.is_file()
            or pp_entry.name in seen
        ):
            continue
        seen.add(pp_entry.name)
        # Parse before yielding so the file is not held open while the
        # generator is suspended.
        with pp_entry.open() as fd:
            plugin_metadata = parse_json_plugin_desc(
                f"PYTHONPATH:debputy/plugins/{pp_entry.name}",
                fd=fd,
                is_from_python_path=True,
            )
        yield plugin_metadata

1939 

1940 

def _find_plugin_implementation_file(
    plugin_name: str,
    json_file_path: str,
) -> tuple[str, str]:
    """Guess the module name and Python file implementing a plugin.

    The module base name is the plugin name with ``-`` replaced by ``_``.

    :param plugin_name: Name of the plugin.
    :param json_file_path: Path of the plugin's JSON description.
    :return: ``(module_name, module_path)`` where the module name is
      ``debputy.plugins.<basename>`` and the path is ``<basename>.py`` in the
      directory of the JSON file.  The file is not checked for existence.
    """
    basename = plugin_name.replace("-", "_")
    plugin_dir = os.path.dirname(json_file_path)
    return (
        f"debputy.plugins.{basename}",
        os.path.join(plugin_dir, f"{basename}.py"),
    )

1951 

1952 

def _resolve_module_initializer(
    plugin_name: str,
    plugin_initializer_name: str,
    module_name: str | None,
    json_file_path: str,
) -> PluginInitializationEntryPoint:
    """Resolve the initializer entry point named by a plugin's JSON metadata.

    When ``module_name`` is ``None``, the module name and file are guessed via
    ``_find_plugin_implementation_file``.  If the guessed file exists, it is
    executed directly from that path.  In every other case (an explicit
    module name, or the guessed file is missing) the module is imported by
    name.

    :param plugin_name: Name of the plugin being loaded.
    :param plugin_initializer_name: Attribute name of the initializer in the
      module.
    :param module_name: Explicit module name, or ``None`` to guess it.
    :param json_file_path: Path of the plugin's JSON description (used for
      guessing the module file and in error messages).
    :return: The callable used to initialize the plugin.  For a
      ``DebputyPluginDefinition`` attribute this is its ``initialize``.
    :raises PluginInitializationError: If the module file cannot be loaded,
      raises during execution, or cannot be found at the guessed locations.
    :raises PluginMetadataError: If an explicitly named module is not
      importable, or the initializer attribute is missing or of an
      unsupported kind.
    """
    module = None
    module_fs_path = None
    if module_name is None:
        module_name, module_fs_path = _find_plugin_implementation_file(
            plugin_name, json_file_path
        )
        if os.path.isfile(module_fs_path):
            spec = importlib.util.spec_from_file_location(module_name, module_fs_path)
            if spec is None:
                raise PluginInitializationError(
                    f"Failed to load {plugin_name} (path: {module_fs_path})."
                    " The spec_from_file_location function returned None."
                )
            mod = importlib.util.module_from_spec(spec)
            loader = spec.loader
            if loader is None:
                raise PluginInitializationError(
                    f"Failed to load {plugin_name} (path: {module_fs_path})."
                    " Python could not find a suitable loader (spec.loader was None)"
                )
            # The module is registered before it is executed.  Note that it is
            # not removed from `sys.modules` again if execution fails below.
            sys.modules[module_name] = mod
            try:
                run_in_context_of_plugin(plugin_name, loader.exec_module, mod)
            except (Exception, GeneratorExit) as e:
                raise PluginInitializationError(
                    f"Failed to load {plugin_name} (path: {module_fs_path})."
                    " The module threw an exception while being loaded."
                ) from e
            module = mod

    # Fallback: import by name (explicit "module" or guessed file missing).
    if module is None:
        try:
            module = run_in_context_of_plugin(
                plugin_name, importlib.import_module, module_name
            )
        except ModuleNotFoundError as e:
            # `module_fs_path` is only set when the module name was guessed,
            # so `None` here means the JSON file named the module explicitly.
            if module_fs_path is None:
                raise PluginMetadataError(
                    f'The plugin defined in "{json_file_path}" wanted to load the module "{module_name}", but'
                    " this module is not available in the python search path"
                ) from e
            raise PluginInitializationError(
                f"Failed to load {plugin_name}. Tried loading it from"
                f' "{module_fs_path}" (which did not exist) and PYTHONPATH as'
                f" {module_name} (where it was not found either). Please ensure"
                " the module code is installed in the correct spot or provide an"
                f' explicit "module" definition in {json_file_path}.'
            ) from e

    # `getattr` with a `None` default: a missing attribute yields `None`
    # rather than an AttributeError.
    plugin_initializer = run_in_context_of_plugin_wrap_errors(
        plugin_name,
        getattr,
        module,
        plugin_initializer_name,
        None,
    )

    if plugin_initializer is None:
        raise PluginMetadataError(
            f'The plugin defined in {json_file_path} claimed that module "{module_name}" would have an'
            f' attribute called "{plugin_initializer_name}" to initialize the plugin. However, that attribute'
            " does not exist or cannot be resolved. Please correct the plugin metadata or initializer name"
            " in the Python module."
        )
    # Two supported shapes: a `DebputyPluginDefinition` or a plain callable.
    if isinstance(plugin_initializer, DebputyPluginDefinition):
        return plugin_initializer.initialize
    if not callable(plugin_initializer):
        raise PluginMetadataError(
            f'The plugin defined in {json_file_path} claimed that module "{module_name}" would have an'
            f' attribute called "{plugin_initializer_name}" for initializing the plugin. While that'
            " attribute exists, it is neither a `DebputyPluginDefinition`"
            " (`plugin_definition = define_debputy_plugin()`) nor is it `callable`"
            " (`def initialize(api: DebputyPluginInitializer) -> None:`)."
        )
    return cast("PluginInitializationEntryPoint", plugin_initializer)

2034 

2035 

def _json_plugin_loader(
    plugin_name: str,
    plugin_json_metadata: PluginJsonMetadata,
    json_file_path: str,
    attribute_path: AttributePath,
) -> Callable[["DebputyPluginInitializer"], None]:
    """Build the initializer callable for a plugin described by JSON metadata.

    Up to four initializers are collected, in this order: the Python
    initializer (``plugin_initializer``), the known packaging files, the
    manifest variables and the packager provided files.  A single initializer
    is returned as-is; several are wrapped in a callable that runs them in
    order.

    :param plugin_name: Name of the plugin.
    :param plugin_json_metadata: The parsed plugin metadata.
    :param json_file_path: Path of the JSON file (used in error messages and
      for resolving the Python module).
    :param attribute_path: Attribute path of the metadata root, used to
      build the paths of the individual entries.
    :return: A callable that registers the plugin's features with the API.
    :raises PluginMetadataError: If the API compat version is not 1, the
      initializer name contains ``.``, or the metadata provides no features.
    """
    api_compat = plugin_json_metadata["api_compat_version"]
    module_name = plugin_json_metadata.get("module")
    plugin_initializer_name = plugin_json_metadata.get("plugin_initializer")
    packager_provided_files_raw = plugin_json_metadata.get(
        "packager_provided_files", []
    )
    manifest_variables_raw = plugin_json_metadata.get("manifest_variables")
    known_packaging_files_raw = plugin_json_metadata.get("known_packaging_files")
    if api_compat != 1:
        raise PluginMetadataError(
            f'The plugin defined in "{json_file_path}" requires API compat level {api_compat}, but this'
            f" version of debputy only supports API compat version of 1"
        )
    if plugin_initializer_name is not None and "." in plugin_initializer_name:
        p = attribute_path["plugin_initializer"]
        raise PluginMetadataError(
            f'The "{p}" must not contain ".". Problematic file is "{json_file_path}".'
        )

    plugin_initializers = []

    if plugin_initializer_name is not None:
        plugin_initializer = _resolve_module_initializer(
            plugin_name,
            plugin_initializer_name,
            module_name,
            json_file_path,
        )
        plugin_initializers.append(plugin_initializer)

    if known_packaging_files_raw:
        kpf_root_path = attribute_path["known_packaging_files"]
        known_packaging_files = []
        for k, v in enumerate(known_packaging_files_raw):
            kpf_path = kpf_root_path[k]
            # NOTE(review): `v.get` is called before the `isinstance(v, dict)`
            # check below, so `v` is presumably already known to be a mapping
            # here - confirm against the metadata parser.
            p = v.get("path")
            if isinstance(p, str):
                kpf_path.path_hint = p
            # Only plugins whose name starts with "debputy-" get the
            # @DEBPUTY_DOC_ROOT_DIR@ placeholder expanded in their
            # documentation URIs.  The raw entry is updated in place.
            if plugin_name.startswith("debputy-") and isinstance(v, dict):
                docs = v.get("documentation-uris")
                if docs is not None and isinstance(docs, list):
                    docs = [
                        (
                            d.replace("@DEBPUTY_DOC_ROOT_DIR@", debputy_doc_root_dir())
                            if isinstance(d, str)
                            else d
                        )
                        for d in docs
                    ]
                    v["documentation-uris"] = docs
            known_packaging_file: KnownPackagingFileInfo = (
                PLUGIN_KNOWN_PACKAGING_FILES_PARSER.parse_input(
                    v,
                    kpf_path,
                )
            )
            known_packaging_files.append((kpf_path, known_packaging_file))

        def _initialize_json_provided_known_packaging_files(
            api: DebputyPluginInitializerProvider,
        ) -> None:
            # Registers the entries parsed above; a ValueError from the API is
            # reported as a metadata error naming the offending entry.
            for p, details in known_packaging_files:
                try:
                    api.known_packaging_files(details)
                except ValueError as ex:
                    raise PluginMetadataError(
                        f"Error while processing {p.path} defined in {json_file_path}: {ex.args[0]}"
                    )

        plugin_initializers.append(_initialize_json_provided_known_packaging_files)

    if manifest_variables_raw:
        manifest_var_path = attribute_path["manifest_variables"]
        manifest_variables = [
            PLUGIN_MANIFEST_VARS_PARSER.parse_input(p, manifest_var_path[i])
            for i, p in enumerate(manifest_variables_raw)
        ]

        def _initialize_json_provided_manifest_vars(
            api: DebputyPluginInitializer,
        ) -> None:
            # Registers each parsed manifest variable with the API.
            for idx, manifest_variable in enumerate(manifest_variables):
                name = manifest_variable["name"]
                value = manifest_variable["value"]
                doc = manifest_variable.get("reference_documentation")
                try:
                    api.manifest_variable(
                        name, value, variable_reference_documentation=doc
                    )
                except ValueError as ex:
                    var_path = manifest_var_path[idx]
                    raise PluginMetadataError(
                        f"Error while processing {var_path.path} defined in {json_file_path}: {ex.args[0]}"
                    )

        plugin_initializers.append(_initialize_json_provided_manifest_vars)

    if packager_provided_files_raw:
        ppf_path = attribute_path["packager_provided_files"]
        ppfs = [
            PLUGIN_PPF_PARSER.parse_input(p, ppf_path[i])
            for i, p in enumerate(packager_provided_files_raw)
        ]

        def _initialize_json_provided_ppfs(api: DebputyPluginInitializer) -> None:
            ppf: PackagerProvidedFileJsonDescription
            for idx, ppf in enumerate(ppfs):
                # `c` is a copy of the entry that ends up as the extra keyword
                # arguments of `packager_provided_file`.
                c = dict(ppf)
                stem = ppf["stem"]
                installed_path = ppf["installed_path"]
                default_mode = ppf.get("default_mode")
                ref_doc_dict = ppf.get("reference_documentation")
                if default_mode is not None:
                    c["default_mode"] = default_mode.octal_mode

                if ref_doc_dict is not None:
                    ref_doc = packager_provided_file_reference_documentation(
                        **ref_doc_dict
                    )
                else:
                    ref_doc = None

                # These are passed explicitly below and must therefore not
                # appear again among the keyword arguments.
                for k in [
                    "stem",
                    "installed_path",
                    "reference_documentation",
                ]:
                    try:
                        del c[k]
                    except KeyError:
                        pass

                try:
                    api.packager_provided_file(stem, installed_path, reference_documentation=ref_doc, **c)  # type: ignore
                except ValueError as ex:
                    p_path = ppf_path[idx]
                    raise PluginMetadataError(
                        f"Error while processing {p_path.path} defined in {json_file_path}: {ex.args[0]}"
                    )

        plugin_initializers.append(_initialize_json_provided_ppfs)

    if not plugin_initializers:
        raise PluginMetadataError(
            f"The plugin defined in {json_file_path} does not seem to provide features,"
            f" such as module + plugin-initializer or packager-provided-files."
        )

    # No wrapper needed when there is only one initializer.
    if len(plugin_initializers) == 1:
        return plugin_initializers[0]

    def _chain_loader(api: DebputyPluginInitializer) -> None:
        # Runs the collected initializers in the order they were added.
        for initializer in plugin_initializers:
            initializer(api)

    return _chain_loader

2198 

2199 

2200@overload 

2201@contextlib.contextmanager 

2202def _open( 2202 ↛ exitline 2202 didn't return from function '_open' because

2203 path: str, 

2204 fd: IO[AnyStr] | IOBase = ..., 

2205) -> Iterator[IO[AnyStr] | IOBase]: ... 

2206 

2207 

2208@overload 

2209@contextlib.contextmanager 

2210def _open(path: str, fd: None = None) -> Iterator[IO[bytes]]: ... 2210 ↛ exitline 2210 didn't return from function '_open' because

2211 

2212 

2213@contextlib.contextmanager 

2214def _open( 

2215 path: str, fd: IO[AnyStr] | IOBase | None = None 

2216) -> Iterator[IO[AnyStr] | IOBase]: 

2217 if fd is not None: 

2218 yield fd 

2219 else: 

2220 with open(path, "rb") as fd: 

2221 yield fd 

2222 

2223 

2224def _resolve_json_plugin_docs_path( 

2225 plugin_name: str, 

2226 plugin_path: str, 

2227) -> Traversable | Path | None: 

2228 plugin_dir = os.path.dirname(plugin_path) 

2229 return Path(os.path.join(plugin_dir, plugin_name + "_docs.yaml")) 

2230 

2231 

def parse_json_plugin_desc(
    path: str,
    *,
    fd: IO[AnyStr] | IOBase | None = None,
    is_from_python_path: bool = False,
) -> DebputyPluginMetadata:
    """Parse a plugin JSON description into plugin metadata.

    The plugin name is the base name of ``path`` without a trailing ``.json``
    or ``.json.in``.  The plugin code itself is not loaded here; the returned
    metadata carries a loader that does so on demand.

    :param path: Path of the JSON file (also used to derive the plugin name).
    :param fd: Optional open file to read the JSON from instead of ``path``.
    :param is_from_python_path: Passed through to the metadata.
    :return: The metadata of the described plugin.
    :raises PluginMetadataError: If the content is not valid JSON, does not
      match the metadata format, or the plugin is named ``debputy``.
    """
    with _open(path, fd=fd) as rfd:
        try:
            raw = json.load(rfd)
        except JSONDecodeError as e:
            raise PluginMetadataError(
                f'The plugin defined in "{path}" could not be parsed as valid JSON: {e.args[0]}'
            ) from e

    plugin_name = os.path.basename(path)
    for suffix in (".json", ".json.in"):
        if plugin_name.endswith(suffix):
            plugin_name = plugin_name[: -len(suffix)]
            break

    if plugin_name == "debputy":
        # Provide a better error message than "The plugin has already loaded!?"
        raise PluginMetadataError(
            f'The plugin named {plugin_name} must be bundled with `debputy`. Please rename "{path}" so it does not'
            f" clash with the bundled plugin of same name."
        )

    attribute_path = AttributePath.root_path(raw)
    try:
        plugin_json_metadata = PLUGIN_METADATA_PARSER.parse_input(
            raw,
            attribute_path,
        )
    except ManifestParseException as e:
        raise PluginMetadataError(
            f'The plugin defined in "{path}" was valid JSON but could not be parsed: {e.message}'
        ) from e

    def _load_plugin():
        # Deferred until the metadata's loader is invoked.
        return _json_plugin_loader(
            plugin_name,
            plugin_json_metadata,
            path,
            attribute_path,
        )

    def _docs_path():
        return _resolve_json_plugin_docs_path(plugin_name, path)

    return DebputyPluginMetadata(
        plugin_name=plugin_name,
        plugin_loader=_load_plugin,
        api_compat_version=plugin_json_metadata["api_compat_version"],
        plugin_doc_path_resolver=_docs_path,
        plugin_initializer=None,
        plugin_path=path,
        is_from_python_path=is_from_python_path,
    )

2287 

2288 

@dataclasses.dataclass(slots=True, frozen=True)
class ServiceDefinitionImpl(ServiceDefinition[DSD]):
    """Immutable record describing a single service definition."""

    # Primary name of the service.
    name: str
    # All names of the service.
    names: Sequence[str]
    # Path the service definition was registered with.
    path: VirtualPath
    type_of_service: str
    service_scope: str
    auto_enable_on_install: bool
    auto_start_on_install: bool
    on_upgrade: ServiceUpgradeRule
    # Human readable description of where the definition came from.
    definition_source: str
    is_plugin_provided_definition: bool
    # Optional context object supplied by whoever registered the service.
    service_context: DSD | None

    def replace(self, **changes: Any) -> "ServiceDefinitionImpl[DSD]":
        """Return a copy of this definition with the given fields replaced.

        :param changes: Field names mapped to their new values.
        """
        updated = dataclasses.replace(self, **changes)
        return updated

2305 

2306 

class ServiceRegistryImpl(ServiceRegistry[DSD]):
    """Collects the service definitions registered by one service manager.

    Each ``(name, type_of_service, service_scope)`` combination may only be
    registered once per registry.
    """

    __slots__ = ("_service_manager_details", "_service_definitions", "_seen_services")

    def __init__(self, service_manager_details: ServiceManagerDetails) -> None:
        """Create an empty registry.

        :param service_manager_details: Details of the service manager using
          this registry (its plugin name is used in messages).
        """
        self._service_manager_details = service_manager_details
        self._service_definitions: list[ServiceDefinition[DSD]] = []
        self._seen_services: set[tuple[str, str, str]] = set()

    @property
    def detected_services(self) -> Sequence[ServiceDefinition[DSD]]:
        """The service definitions registered so far, in registration order."""
        return self._service_definitions

    def register_service(
        self,
        path: VirtualPath,
        name: str | list[str],
        *,
        type_of_service: str = "service",  # "timer", etc.
        service_scope: str = "system",
        enable_by_default: bool = True,
        start_by_default: bool = True,
        default_upgrade_rule: ServiceUpgradeRule = "restart",
        service_context: DSD | None = None,
    ) -> None:
        """Register a service detected at ``path``.

        :param path: Path of the service definition.
        :param name: Name of the service, or a list of names of which the
          first becomes the primary name.
        :param type_of_service: Type of the service (e.g. "service", "timer").
        :param service_scope: Scope of the service.
        :param enable_by_default: Whether to enable the service on install.
        :param start_by_default: Whether to start the service on install.
        :param default_upgrade_rule: What to do with the service on upgrade.
        :param service_context: Optional context object stored with the
          definition.
        :raises ValueError: If no name was provided.
        :raises PluginAPIViolationError: If one of the names was already
          registered with the same type and scope.
        """
        names = name if isinstance(name, list) else [name]
        if not names:
            raise ValueError(
                f"The service must have at least one name - {path.absolute} did not have any"
            )
        plugin_name = self._service_manager_details.plugin_metadata.plugin_name
        keys = [(n, type_of_service, service_scope) for n in names]
        for n, key in zip(names, keys):
            if key in self._seen_services:
                raise PluginAPIViolationError(
                    f"The service manager (from {plugin_name}) used"
                    f" the service name {n} (type: {type_of_service}, scope: {service_scope}) twice. This is not"
                    " allowed by the debputy plugin API."
                )
        # Record the names so that a later registration of the same service is
        # detected.  Done after validating all names so that a rejected call
        # leaves the registry unchanged.
        self._seen_services.update(keys)
        # TODO: We cannot create a service definition immediate once the manifest is involved
        self._service_definitions.append(
            ServiceDefinitionImpl(
                names[0],
                names,
                path,
                type_of_service,
                service_scope,
                enable_by_default,
                start_by_default,
                default_upgrade_rule,
                f"Auto-detected by plugin {plugin_name}",
                True,
                service_context,
            )
        )

2359 )