Coverage for src/debputy/plugin/api/impl.py: 60%

908 statements  

coverage.py v7.8.2, created at 2026-02-14 10:41 +0000

1import contextlib 

2import dataclasses 

3import functools 

4import importlib 

5import importlib.resources 

6import importlib.util 

7import inspect 

8import itertools 

9import json 

10import os 

11import re 

12import subprocess 

13import sys 

14from abc import ABC 

15from collections.abc import Callable, Iterable, Sequence, Iterator, Mapping, Container 

16from importlib.resources.abc import Traversable 

17from io import IOBase 

18from json import JSONDecodeError 

19from pathlib import Path 

20from types import NoneType 

21from typing import ( 

22 IO, 

23 AbstractSet, 

24 cast, 

25 Any, 

26 Literal, 

27 TYPE_CHECKING, 

28 is_typeddict, 

29 AnyStr, 

30 overload, 

31) 

32 

33import debputy 

34from debputy.exceptions import ( 

35 DebputySubstitutionError, 

36 PluginConflictError, 

37 PluginMetadataError, 

38 PluginBaseError, 

39 PluginInitializationError, 

40 PluginAPIViolationError, 

41 PluginNotFoundError, 

42 PluginIncorrectRegistrationError, 

43) 

44from debputy.maintscript_snippet import ( 

45 STD_CONTROL_SCRIPTS, 

46 MaintscriptSnippetContainer, 

47 MaintscriptSnippet, 

48) 

49from debputy.manifest_parser.exceptions import ManifestParseException 

50from debputy.manifest_parser.parser_data import ParserContextData 

51from debputy.manifest_parser.tagging_types import TypeMapping 

52from debputy.manifest_parser.util import AttributePath 

53from debputy.plugin.api.doc_parsing import ( 

54 DEBPUTY_DOC_REFERENCE_DATA_PARSER, 

55 parser_type_name, 

56 DebputyParsedDoc, 

57) 

58from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

59from debputy.plugin.api.impl_types import ( 

60 DebputyPluginMetadata, 

61 PackagerProvidedFileClassSpec, 

62 MetadataOrMaintscriptDetector, 

63 PluginProvidedTrigger, 

64 TTP, 

65 DIPHandler, 

66 PF, 

67 SF, 

68 DIPKWHandler, 

69 PluginProvidedManifestVariable, 

70 PluginProvidedPackageProcessor, 

71 PluginProvidedDiscardRule, 

72 AutomaticDiscardRuleExample, 

73 PPFFormatParam, 

74 ServiceManagerDetails, 

75 KnownPackagingFileInfo, 

76 PluginProvidedKnownPackagingFile, 

77 DHCompatibilityBasedRule, 

78 PluginProvidedTypeMapping, 

79 PluginProvidedBuildSystemAutoDetection, 

80 BSR, 

81 TP, 

82) 

83from debputy.plugin.api.plugin_parser import ( 

84 PLUGIN_METADATA_PARSER, 

85 PluginJsonMetadata, 

86 PLUGIN_PPF_PARSER, 

87 PackagerProvidedFileJsonDescription, 

88 PLUGIN_MANIFEST_VARS_PARSER, 

89 PLUGIN_KNOWN_PACKAGING_FILES_PARSER, 

90) 

91from debputy.plugin.api.spec import ( 

92 MaintscriptAccessor, 

93 Maintscript, 

94 DpkgTriggerType, 

95 BinaryCtrlAccessor, 

96 PackageProcessingContext, 

97 MetadataAutoDetector, 

98 PluginInitializationEntryPoint, 

99 DebputyPluginInitializer, 

100 FlushableSubstvars, 

101 ParserDocumentation, 

102 PackageProcessor, 

103 VirtualPath, 

104 ServiceIntegrator, 

105 ServiceDetector, 

106 ServiceRegistry, 

107 ServiceDefinition, 

108 DSD, 

109 ServiceUpgradeRule, 

110 PackagerProvidedFileReferenceDocumentation, 

111 packager_provided_file_reference_documentation, 

112 TypeMappingDocumentation, 

113 DebputyIntegrationMode, 

114 _DEBPUTY_DISPATCH_METADATA_ATTR_NAME, 

115 BuildSystemManifestRuleMetadata, 

116 INTEGRATION_MODE_FULL, 

117 only_integrations, 

118 DebputyPluginDefinition, 

119) 

120from debputy.plugin.api.std_docs import _STD_ATTR_DOCS 

121from debputy.plugin.plugin_state import ( 

122 run_in_context_of_plugin, 

123 run_in_context_of_plugin_wrap_errors, 

124 wrap_plugin_code, 

125 register_manifest_type_value_in_context, 

126) 

127from debputy.plugins.debputy.to_be_api_types import ( 

128 BuildRuleParsedFormat, 

129 BSPF, 

130 debputy_build_system, 

131) 

132from debputy.substitution import ( 

133 Substitution, 

134 VariableNameState, 

135 SUBST_VAR_RE, 

136 VariableContext, 

137) 

138from debputy.util import ( 

139 _normalize_path, 

140 POSTINST_DEFAULT_CONDITION, 

141 _error, 

142 print_command, 

143 _warn, 

144 _debug_log, 

145 PackageTypeSelector, 

146) 

147from debputy.version import debputy_doc_root_dir 

148from debputy.yaml import MANIFEST_YAML 

149 

150if TYPE_CHECKING: 

151 from debputy.highlevel_manifest import HighLevelManifest 

152 

153PLUGIN_TEST_SUFFIX = re.compile(r"_(?:t|test|check)(?:_([a-z0-9_]+))?[.]py$") 

154PLUGIN_PYTHON_RES_PATH = importlib.resources.files(debputy.plugins.__name__) 

155 

156 

157def _validate_known_packaging_file_dh_compat_rules( 

158 dh_compat_rules: list[DHCompatibilityBasedRule] | None, 

159) -> None: 

160 max_compat = None 

161 if not dh_compat_rules: 161 ↛ 164line 161 didn't jump to line 164 because the condition on line 161 was always true

162 return 

163 dh_compat_rule: DHCompatibilityBasedRule 

164 for idx, dh_compat_rule in enumerate(dh_compat_rules): 

165 dh_version = dh_compat_rule.get("starting_with_debhelper_version") 

166 compat = dh_compat_rule.get("starting_with_compat_level") 

167 

168 remaining = dh_compat_rule.keys() - { 

169 "after_debhelper_version", 

170 "starting_with_compat_level", 

171 } 

172 if not remaining: 

173 raise ValueError( 

174 f"The dh compat-rule at index {idx} does not affect anything / not have any rules!? So why have it?" 

175 ) 

176 if dh_version is None and compat is None and idx < len(dh_compat_rules) - 1: 

177 raise ValueError( 

178 f"The dh compat-rule at index {idx} is not the last and is missing either" 

179 " before-debhelper-version or before-compat-level" 

180 ) 

181 if compat is not None and compat < 0: 

182 raise ValueError( 

183 f"There is no compat below 1 but dh compat-rule at {idx} wants to declare some rule" 

184 f" for something that appeared when migrating from {compat} to {compat + 1}." 

185 ) 

186 

187 if max_compat is None: 

188 max_compat = compat 

189 elif compat is not None: 

190 if compat >= max_compat: 

191 raise ValueError( 

192 f"The dh compat-rule at {idx} should be moved earlier than the entry for compat {max_compat}." 

193 ) 

194 max_compat = compat 

195 

196 install_pattern = dh_compat_rule.get("install_pattern") 

197 if ( 

198 install_pattern is not None 

199 and _normalize_path(install_pattern, with_prefix=False) != install_pattern 

200 ): 

201 raise ValueError( 

202 f"The install-pattern in dh compat-rule at {idx} must be normalized as" 

203 f' "{_normalize_path(install_pattern, with_prefix=False)}".' 

204 ) 

205 

206 
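# Illustrative sketch (not part of the original module): a dh compat-rule list
# that the validator above accepts — compat levels in descending order, every
# non-last entry bounded by a compat level or debhelper version, and normalized
# install patterns. All values are made up for the example.
#
#     dh_compat_rules = [
#         {"starting_with_compat_level": 14, "install_pattern": "usr/lib/foo"},
#         {"starting_with_compat_level": 12, "install_pattern": "lib/foo"},
#         {"install_pattern": "usr/share/foo"},
#     ]
#     _validate_known_packaging_file_dh_compat_rules(dh_compat_rules)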

207class DebputyPluginInitializerProvider(DebputyPluginInitializer): 

208 __slots__ = ( 

209 "_plugin_metadata", 

210 "_feature_set", 

211 "_plugin_detector_ids", 

212 "_substitution", 

213 "_unloaders", 

214 "_is_doc_cache_resolved", 

215 "_doc_cache", 

216 "_registered_manifest_types", 

217 "_load_started", 

218 ) 

219 

220 def __init__( 

221 self, 

222 plugin_metadata: DebputyPluginMetadata, 

223 feature_set: PluginProvidedFeatureSet, 

224 substitution: Substitution, 

225 ) -> None: 

226 self._plugin_metadata: DebputyPluginMetadata = plugin_metadata 

227 self._feature_set = feature_set 

228 self._plugin_detector_ids: set[str] = set() 

229 self._substitution = substitution 

230 self._unloaders: list[Callable[[], None]] = [] 

231 self._is_doc_cache_resolved: bool = False 

232 self._doc_cache: DebputyParsedDoc | None = None 

233 self._registered_manifest_types: dict[type[Any], DebputyPluginMetadata] = {} 

234 self._load_started = False 

235 

236 @property 

237 def plugin_metadata(self) -> DebputyPluginMetadata: 

238 return self._plugin_metadata 

239 

240 def unload_plugin(self) -> None: 

241 if self._load_started: 

242 for unloader in self._unloaders: 

243 unloader() 

244 del self._feature_set.plugin_data[self._plugin_name] 

245 

246 def load_plugin(self) -> None: 

247 metadata = self._plugin_metadata 

248 if metadata.plugin_name in self._feature_set.plugin_data: 248 ↛ 249line 248 didn't jump to line 249 because the condition on line 248 was never true

249 raise PluginConflictError( 

250 f'The plugin "{metadata.plugin_name}" has already been loaded!?', 

251 metadata, 

252 metadata, 

253 ) 

254 assert ( 

255 metadata.api_compat_version == 1 

256 ), f"Unsupported plugin API compat version {metadata.api_compat_version}" 

257 self._feature_set.plugin_data[metadata.plugin_name] = metadata 

258 self._load_started = True 

259 assert not metadata.is_initialized 

260 try: 

261 metadata.initialize_plugin(self) 

262 except Exception as e: 

263 initializer = metadata.plugin_initializer 

264 if ( 264 ↛ 269line 264 didn't jump to line 269 because the condition on line 264 was never true

265 isinstance(e, TypeError) 

266 and initializer is not None 

267 and not callable(initializer) 

268 ): 

269 raise PluginMetadataError( 

270 f"The specified entry point for plugin {metadata.plugin_name} does not appear to be a" 

271 f" callable (callable returns False). The specified entry point identifies" 

272 f' itself as "{initializer.__qualname__}".' 

273 ) from e 

274 if isinstance(e, PluginBaseError): 274 ↛ 276line 274 didn't jump to line 276 because the condition on line 274 was always true

275 raise 

276 raise PluginInitializationError( 

277 f"Exception while attempting to load plugin {metadata.plugin_name}" 

278 ) from e 

279 

280 def _resolve_docs(self) -> DebputyParsedDoc | None: 

281 doc_cache = self._doc_cache 

282 if doc_cache is not None: 

283 return doc_cache 

284 

285 plugin_doc_path = self._plugin_metadata.plugin_doc_path 

286 if plugin_doc_path is None or self._is_doc_cache_resolved: 

287 self._is_doc_cache_resolved = True 

288 return None 

289 try: 

290 with plugin_doc_path.open("r", encoding="utf-8") as fd: 

291 raw = MANIFEST_YAML.load(fd) 

292 except FileNotFoundError: 

293 _debug_log( 

294 f"No documentation file found for {self._plugin_name}. Expected it at {plugin_doc_path}" 

295 ) 

296 self._is_doc_cache_resolved = True 

297 return None 

298 attr_path = AttributePath.root_path(plugin_doc_path) 

299 try: 

300 ref = DEBPUTY_DOC_REFERENCE_DATA_PARSER.parse_input(raw, attr_path) 

301 except ManifestParseException as e: 

302 raise ValueError( 

303 f"Could not parse documentation in {plugin_doc_path}: {e.message}" 

304 ) from e 

305 try: 

306 res = DebputyParsedDoc.from_ref_data(ref) 

307 except ValueError as e: 

308 raise ValueError( 

309 f"Could not parse documentation in {plugin_doc_path}: {e.args[0]}" 

310 ) from e 

311 

312 self._doc_cache = res 

313 self._is_doc_cache_resolved = True 

314 return res 

315 

316 def _pluggable_manifest_docs_for( 

317 self, 

318 rule_type: TTP | str, 

319 rule_name: str | list[str], 

320 *, 

321 inline_reference_documentation: ParserDocumentation | None = None, 

322 ) -> ParserDocumentation | None: 

323 ref_data = self._resolve_docs() 

324 if ref_data is not None: 

325 primary_rule_name = ( 

326 rule_name if isinstance(rule_name, str) else rule_name[0] 

327 ) 

328 rule_ref = f"{parser_type_name(rule_type)}::{primary_rule_name}" 

329 resolved_docs = ref_data.pluggable_manifest_rules.get(rule_ref) 

330 if resolved_docs is not None: 

331 if inline_reference_documentation is not None: 331 ↛ 332line 331 didn't jump to line 332 because the condition on line 331 was never true

332 raise ValueError( 

333 f"Conflicting docs for {rule_ref}: Was provided one in the API call and one via" 

334 f" {self._plugin_metadata.plugin_doc_path}. Please remove one of the two, so" 

335 f" there is only one doc reference" 

336 ) 

337 return resolved_docs 

338 return inline_reference_documentation 

339 

340 def packager_provided_file( 

341 self, 

342 stem: str, 

343 installed_path: str, 

344 *, 

345 default_mode: int = 0o0644, 

346 default_priority: int | None = None, 

347 allow_name_segment: bool = True, 

348 allow_architecture_segment: bool = False, 

349 post_formatting_rewrite: Callable[[str], str] | None = None, 

350 packageless_is_fallback_for_all_packages: bool = False, 

351 reservation_only: bool = False, 

352 format_callback: None | ( 

353 Callable[[str, PPFFormatParam, VirtualPath], str] 

354 ) = None, 

355 reference_documentation: None | ( 

356 PackagerProvidedFileReferenceDocumentation 

357 ) = None, 

358 ) -> None: 

359 packager_provided_files = self._feature_set.packager_provided_files 

360 existing = packager_provided_files.get(stem) 

361 

362 if format_callback is not None and self._plugin_name != "debputy": 362 ↛ 363line 362 didn't jump to line 363 because the condition on line 362 was never true

363 raise ValueError( 

364 "Sorry; Using format_callback is a debputy-internal" 

365 f" API. Triggered by plugin {self._plugin_name}" 

366 ) 

367 

368 if installed_path.endswith("/"): 368 ↛ 369line 368 didn't jump to line 369 because the condition on line 368 was never true

369 raise ValueError( 

370 f'The installed_path ends with "/" indicating it is a directory, but it must be a file.' 

371 f" Triggered by plugin {self._plugin_name}." 

372 ) 

373 

374 installed_path = _normalize_path(installed_path) 

375 

376 has_name_var = "{name}" in installed_path 

377 

378 if installed_path.startswith("./DEBIAN") or reservation_only: 

379 # Special-case, used for control files. 

380 if self._plugin_name != "debputy": 380 ↛ 381line 380 didn't jump to line 381 because the condition on line 380 was never true

381 raise ValueError( 

382 "Sorry; Using DEBIAN as install path or/and reservation_only is a debputy-internal" 

383 f" API. Triggered by plugin {self._plugin_name}" 

384 ) 

385 elif not has_name_var and "{owning_package}" not in installed_path: 385 ↛ 386line 385 didn't jump to line 386 because the condition on line 385 was never true

386 raise ValueError( 

387 'The installed_path must contain a "{name}" (preferred) or a "{owning_package}"' 

388 " substitution (or have installed_path end with a slash). Otherwise, the installed" 

389 f" path would caused file-conflicts. Triggered by plugin {self._plugin_name}" 

390 ) 

391 

392 if allow_name_segment and not has_name_var: 392 ↛ 393line 392 didn't jump to line 393 because the condition on line 392 was never true

393 raise ValueError( 

394 'When allow_name_segment is True, the installed_path must have a "{name}" substitution' 

395 " variable. Otherwise, the name segment will not work properly. Triggered by" 

396 f" plugin {self._plugin_name}" 

397 ) 

398 

399 if ( 399 ↛ 404line 399 didn't jump to line 404 because the condition on line 399 was never true

400 default_priority is not None 

401 and "{priority}" not in installed_path 

402 and "{priority:02}" not in installed_path 

403 ): 

404 raise ValueError( 

405 'When default_priority is not None, the installed_path should have a "{priority}"' 

406 ' or a "{priority:02}" substitution variable. Otherwise, the priority would be lost.' 

407 f" Triggered by plugin {self._plugin_name}" 

408 ) 

409 

410 if existing is not None: 

411 if existing.debputy_plugin_metadata.plugin_name != self._plugin_name: 411 ↛ 418line 411 didn't jump to line 418 because the condition on line 411 was always true

412 message = ( 

413 f'The stem "{stem}" is registered twice for packager provided files.' 

414 f" Once by {existing.debputy_plugin_metadata.plugin_name} and once" 

415 f" by {self._plugin_name}" 

416 ) 

417 else: 

418 message = ( 

419 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

420 f' stem "{stem}" twice for packager provided files.' 

421 ) 

422 raise PluginConflictError( 

423 message, existing.debputy_plugin_metadata, self._plugin_metadata 

424 ) 

425 packager_provided_files[stem] = PackagerProvidedFileClassSpec( 

426 self._plugin_metadata, 

427 stem, 

428 installed_path, 

429 default_mode=default_mode, 

430 default_priority=default_priority, 

431 allow_name_segment=allow_name_segment, 

432 allow_architecture_segment=allow_architecture_segment, 

433 post_formatting_rewrite=post_formatting_rewrite, 

434 packageless_is_fallback_for_all_packages=packageless_is_fallback_for_all_packages, 

435 reservation_only=reservation_only, 

436 formatting_callback=format_callback, 

437 reference_documentation=reference_documentation, 

438 ) 

439 

440 def _unload() -> None: 

441 del packager_provided_files[stem] 

442 

443 self._unloaders.append(_unload) 

444 
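# Illustrative sketch (not part of the original module): how a plugin's
# initializer might call packager_provided_file() above. The stem, installed
# path and entry point name are hypothetical; the installed path contains the
# required "{name}" substitution and the keywords match the signature above.
#
#     def initialize_foo_plugin(api: DebputyPluginInitializer) -> None:
#         api.packager_provided_file(
#             "foo.conf",
#             "usr/share/foo/{name}.conf",
#             default_mode=0o0644,
#         )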

445 def metadata_or_maintscript_detector( 

446 self, 

447 auto_detector_id: str, 

448 auto_detector: MetadataAutoDetector, 

449 *, 

450 package_types: PackageTypeSelector = PackageTypeSelector.DEB, 

451 ) -> None: 

452 if auto_detector_id in self._plugin_detector_ids: 452 ↛ 453line 452 didn't jump to line 453 because the condition on line 452 was never true

453 raise ValueError( 

454 f"The plugin {self._plugin_name} tried to register" 

455 f' "{auto_detector_id}" twice' 

456 ) 

457 self._plugin_detector_ids.add(auto_detector_id) 

458 all_detectors = self._feature_set.metadata_maintscript_detectors 

459 if self._plugin_name not in all_detectors: 

460 all_detectors[self._plugin_name] = [] 

461 all_detectors[self._plugin_name].append( 

462 MetadataOrMaintscriptDetector( 

463 detector_id=auto_detector_id, 

464 detector=wrap_plugin_code(self._plugin_name, auto_detector), 

465 plugin_metadata=self._plugin_metadata, 

466 applies_to_package_types=package_types, 

467 enabled=True, 

468 ) 

469 ) 

470 

471 def _unload() -> None: 

472 if self._plugin_name in all_detectors: 

473 del all_detectors[self._plugin_name] 

474 

475 self._unloaders.append(_unload) 

476 
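# Illustrative sketch (not part of the original module): registering a metadata
# detector via metadata_or_maintscript_detector() above. The detector id and
# snippet are made up, and both the (fs_root, ctrl, context) parameter order and
# the fs_root.lookup(...) call are assumptions about the debputy plugin API.
#
#     def detect_foo_cache(fs_root, ctrl, context) -> None:
#         if fs_root.lookup("usr/share/foo/") is not None:
#             ctrl.maintscript.on_configure("update-foo-cache || true\n")
#
#     def initialize_foo_plugin(api: DebputyPluginInitializer) -> None:
#         api.metadata_or_maintscript_detector("foo-cache", detect_foo_cache)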

477 def document_builtin_variable( 

478 self, 

479 variable_name: str, 

480 variable_reference_documentation: str, 

481 *, 

482 is_context_specific: bool = False, 

483 is_for_special_case: bool = False, 

484 ) -> None: 

485 manifest_variables = self._feature_set.manifest_variables 

486 self._restricted_api() 

487 state = self._substitution.variable_state(variable_name) 

488 if state == VariableNameState.UNDEFINED: 488 ↛ 489line 488 didn't jump to line 489 because the condition on line 488 was never true

489 raise ValueError( 

490 f"The plugin {self._plugin_name} attempted to document built-in {variable_name}," 

491 f" but it is not known to be a variable" 

492 ) 

493 

494 assert variable_name not in manifest_variables 

495 

496 manifest_variables[variable_name] = PluginProvidedManifestVariable( 

497 self._plugin_metadata, 

498 variable_name, 

499 None, 

500 is_context_specific_variable=is_context_specific, 

501 variable_reference_documentation=variable_reference_documentation, 

502 is_documentation_placeholder=True, 

503 is_for_special_case=is_for_special_case, 

504 ) 

505 

506 def _unload() -> None: 

507 del manifest_variables[variable_name] 

508 

509 self._unloaders.append(_unload) 

510 

511 def manifest_variable_provider( 

512 self, 

513 provider: Callable[[VariableContext], Mapping[str, str]], 

514 variables: Sequence[str] | Mapping[str, str | None], 

515 ) -> None: 

516 self._restricted_api() 

517 cached_provider = functools.lru_cache(None)(provider) 

518 permitted_variables = frozenset(variables) 

519 variables_iter: Iterable[tuple[str, str | None]] 

520 if not isinstance(variables, Mapping): 520 ↛ 521line 520 didn't jump to line 521 because the condition on line 520 was never true

521 variables_iter = zip(variables, itertools.repeat(None)) 

522 else: 

523 variables_iter = variables.items() 

524 

525 checked_vars = False 

526 manifest_variables = self._feature_set.manifest_variables 

527 plugin_name = self._plugin_name 

528 

529 def _value_resolver_generator( 

530 variable_name: str, 

531 ) -> Callable[[VariableContext], str]: 

532 def _value_resolver(variable_context: VariableContext) -> str: 

533 res = cached_provider(variable_context) 

534 nonlocal checked_vars 

535 if not checked_vars: 535 ↛ 546line 535 didn't jump to line 546 because the condition on line 535 was always true

536 if permitted_variables != res.keys(): 536 ↛ 537line 536 didn't jump to line 537 because the condition on line 536 was never true

537 expected = ", ".join(sorted(permitted_variables)) 

538 actual = ", ".join(sorted(res)) 

539 raise PluginAPIViolationError( 

540 f"The plugin {plugin_name} claimed to provide" 

541 f" the following variables {expected}," 

542 f" but when resolving the variables, the plugin provided" 

543 f" {actual}. These two lists should have been the same." 

544 ) 

545 checked_vars = True 

546 return res[variable_name] 

547 

548 return _value_resolver 

549 

550 for varname, vardoc in variables_iter: 

551 self._check_variable_name(varname) 

552 manifest_variables[varname] = PluginProvidedManifestVariable( 

553 self._plugin_metadata, 

554 varname, 

555 _value_resolver_generator(varname), 

556 is_context_specific_variable=False, 

557 variable_reference_documentation=vardoc, 

558 ) 

559 

560 def _unload() -> None: 

561 raise PluginInitializationError( 

562 "Cannot unload manifest_variable_provider (not implemented)" 

563 ) 

564 

565 self._unloaders.append(_unload) 

566 

567 def _check_variable_name(self, variable_name: str) -> None: 

568 manifest_variables = self._feature_set.manifest_variables 

569 existing = manifest_variables.get(variable_name) 

570 

571 if existing is not None: 

572 if existing.plugin_metadata.plugin_name == self._plugin_name: 572 ↛ 578line 572 didn't jump to line 578 because the condition on line 572 was always true

573 message = ( 

574 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

575 f' manifest variable "{variable_name}" twice.' 

576 ) 

577 else: 

578 message = ( 

579 f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}" 

580 f" both tried to provide the manifest variable {variable_name}" 

581 ) 

582 raise PluginConflictError( 

583 message, existing.plugin_metadata, self._plugin_metadata 

584 ) 

585 if not SUBST_VAR_RE.match("{{" + variable_name + "}}"): 

586 raise ValueError( 

587 f"The plugin {self._plugin_name} attempted to declare {variable_name}," 

588 f" which is not a valid variable name" 

589 ) 

590 

591 namespace = "" 

592 variable_basename = variable_name 

593 if ":" in variable_name: 

594 namespace, variable_basename = variable_name.rsplit(":", 1) 

595 assert namespace != "" 

596 assert variable_name != "" 

597 

598 if namespace != "" and namespace not in ("token", "path"): 

599 raise ValueError( 

600 f"The plugin {self._plugin_name} attempted to declare {variable_name}," 

601 f" which is in the reserved namespace {namespace}" 

602 ) 

603 

604 variable_name_upper = variable_name.upper() 

605 if ( 

606 variable_name_upper.startswith(("DEB_", "DPKG_", "DEBPUTY")) 

607 or variable_basename.startswith("_") 

608 or variable_basename.upper().startswith("DEBPUTY") 

609 ) and self._plugin_name != "debputy": 

610 raise ValueError( 

611 f"The plugin {self._plugin_name} attempted to declare {variable_name}," 

612 f" which is a variable name reserved by debputy" 

613 ) 

614 

615 state = self._substitution.variable_state(variable_name) 

616 if state != VariableNameState.UNDEFINED and self._plugin_name != "debputy": 

617 raise ValueError( 

618 f"The plugin {self._plugin_name} attempted to declare {variable_name}," 

619 f" which would shadow a built-in variable" 

620 ) 

621 

622 def package_processor( 

623 self, 

624 processor_id: str, 

625 processor: PackageProcessor, 

626 *, 

627 depends_on_processor: Iterable[str] = tuple(), 

628 package_types: PackageTypeSelector = PackageTypeSelector.DEB, 

629 ) -> None: 

630 self._restricted_api(allowed_plugins={"lua", "debputy-self-hosting"}) 

631 package_processors = self._feature_set.all_package_processors 

632 dependencies = set() 

633 processor_key = (self._plugin_name, processor_id) 

634 

635 if processor_key in package_processors: 635 ↛ 636line 635 didn't jump to line 636 because the condition on line 635 was never true

636 raise PluginConflictError( 

637 f"The plugin {self._plugin_name} already registered a processor with id {processor_id}", 

638 self._plugin_metadata, 

639 self._plugin_metadata, 

640 ) 

641 

642 for depends_ref in depends_on_processor: 

643 if isinstance(depends_ref, str): 643 ↛ 657line 643 didn't jump to line 657 because the condition on line 643 was always true

644 if (self._plugin_name, depends_ref) in package_processors: 644 ↛ 646line 644 didn't jump to line 646 because the condition on line 644 was always true

645 depends_key = (self._plugin_name, depends_ref) 

646 elif ("debputy", depends_ref) in package_processors: 

647 depends_key = ("debputy", depends_ref) 

648 else: 

649 raise ValueError( 

650 f'Could not resolve dependency "{depends_ref}" for' 

651 f' "{processor_id}". It was not provided by the plugin itself' 

652 f" ({self._plugin_name}) nor debputy." 

653 ) 

654 else: 

655 # TODO: Add proper dependencies first, at which point we should probably resolve "name" 

656 # via the direct dependencies. 

657 assert False 

658 

659 existing_processor = package_processors.get(depends_key) 

660 if existing_processor is None: 660 ↛ 663line 660 didn't jump to line 663 because the condition on line 660 was never true

661 # We currently require the processor to be declared already. If this ever changes, 

662 # PluginProvidedFeatureSet.package_processors_in_order will need an update 

663 dplugin_name, dprocessor_name = depends_key 

664 available_processors = ", ".join( 

665 n for p, n in package_processors.keys() if p == dplugin_name 

666 ) 

667 raise ValueError( 

668 f"The plugin {dplugin_name} does not provide a processor called" 

669 f" {dprocessor_name}. Available processors for that plugin are:" 

670 f" {available_processors}" 

671 ) 

672 dependencies.add(depends_key) 

673 

674 package_processors[processor_key] = PluginProvidedPackageProcessor( 

675 processor_id, 

676 package_types, 

677 wrap_plugin_code(self._plugin_name, processor), 

678 frozenset(dependencies), 

679 self._plugin_metadata, 

680 ) 

681 

682 def _unload() -> None: 

683 del package_processors[processor_key] 

684 

685 self._unloaders.append(_unload) 

686 
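# Illustrative sketch (not part of the original module): the call shape for
# package_processor() above, including a dependency on a processor from the
# same plugin. Processor ids and callables are made up; note that
# _restricted_api() limits this API to debputy and a few allow-listed plugins.
#
#     api.package_processor("compress-foo-data", compress_foo_data)
#     api.package_processor(
#         "index-foo-data",
#         index_foo_data,
#         depends_on_processor=["compress-foo-data"],
#     )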

687 def automatic_discard_rule( 

688 self, 

689 name: str, 

690 should_discard: Callable[[VirtualPath], bool], 

691 *, 

692 rule_reference_documentation: str | None = None, 

693 examples: ( 

694 AutomaticDiscardRuleExample | Sequence[AutomaticDiscardRuleExample] 

695 ) = tuple(), 

696 ) -> None: 

697 """Register an automatic discard rule 

698 

699 An automatic discard rule is applied to *every* path about to be installed into any package. 

700 If any discard rule concludes that a path should not be installed, then the path is not installed. 

701 In the case where the discarded path is a: 

702 

703 * directory: Then the entire directory is excluded along with anything beneath it. 

704 * symlink: Then the symlink itself (but not its target) is excluded. 

705 * hardlink: Then the current hardlink will not be installed, but other instances of it will be. 

706 

707 Note: Discarded files are *never* deleted by `debputy`; discard rules just make `debputy` skip installing them. 

708 

709 Automatic discard rules should be written with the assumption that a directory will be tested 

710 before its contents *whenever it is relevant* for the discard rule to examine whether the directory 

711 as a whole can be excluded. 

712 

713 The packager can overrule automatic discard rules via the manifest by explicitly listing the path 

714 without any globs. As an example: 

715 

716 installations: 

717 - install: 

718 sources: 

719 - usr/lib/libfoo.la # <-- This path is always installed 

720 # (Discard rules are never asked in this case) 

721 # 

722 - usr/lib/*.so* # <-- Discard rules apply to any path beneath usr/lib and can exclude matches 

723 # Though, they will not examine `libfoo.la` as it has already been installed 

724 # 

725 # Note: usr/lib itself is never tested in this case (it is assumed to be 

726 # explicitly requested). But any subdir of usr/lib will be examined. 

727 

728 When an automatic discard rule is evaluated, it can see the source path currently being considered 

729 for installation. While it can look at "surrounding" context (like parent directory), it will not 

730 know whether those paths are to be installed or will be installed. 

731 

732 :param name: A user-visible name for the discard rule. It can be used on the command line, so avoid shell 

733 metacharacters and spaces. 

734 :param should_discard: A callable that is the implementation of the automatic discard rule. It will receive 

735 a VirtualPath representing the *source* path about to be installed. If the callable returns `True`, then the 

736 path is discarded. If it returns `False`, the path is not discarded (by this rule at least). 

737 A source path will either be from the root of the source tree or the root of a search directory such as 

738 `debian/tmp`. Where the path will be installed is not available at the time the discard rule is 

739 evaluated. 

740 :param rule_reference_documentation: Optionally, the reference documentation to be shown when a user 

741 looks up this automatic discard rule. 

742 :param examples: Provide examples for the rule. Use the automatic_discard_rule_example function to 

743 generate the examples. 

744 

745 """ 

746 self._restricted_api() 

747 auto_discard_rules = self._feature_set.auto_discard_rules 

748 existing = auto_discard_rules.get(name) 

749 if existing is not None: 749 ↛ 750line 749 didn't jump to line 750 because the condition on line 749 was never true

750 if existing.plugin_metadata.plugin_name == self._plugin_name: 

751 message = ( 

752 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

753 f' automatic discard rule "{name}" twice.' 

754 ) 

755 else: 

756 message = ( 

757 f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}" 

758 f" both tried to provide the automatic discard rule {name}" 

759 ) 

760 raise PluginConflictError( 

761 message, existing.plugin_metadata, self._plugin_metadata 

762 ) 

763 examples = ( 

764 (examples,) 

765 if isinstance(examples, AutomaticDiscardRuleExample) 

766 else tuple(examples) 

767 ) 

768 auto_discard_rules[name] = PluginProvidedDiscardRule( 

769 name, 

770 self._plugin_metadata, 

771 should_discard, 

772 rule_reference_documentation, 

773 examples, 

774 ) 

775 

776 def _unload() -> None: 

777 del auto_discard_rules[name] 

778 

779 self._unloaders.append(_unload) 

780 
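# Illustrative sketch (not part of the original module): the call shape for
# automatic_discard_rule() above. The rule name, predicate and documentation
# are made up (the predicate assumes VirtualPath exposes a `name` attribute),
# and _restricted_api() currently limits this API to debputy itself.
#
#     api.automatic_discard_rule(
#         "la-files",
#         lambda path: path.name.endswith(".la"),
#         rule_reference_documentation="Discards libtool .la files.",
#     )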

781 def service_provider( 

782 self, 

783 service_manager: str, 

784 detector: ServiceDetector, 

785 integrator: ServiceIntegrator, 

786 ) -> None: 

787 self._restricted_api() 

788 service_managers = self._feature_set.service_managers 

789 existing = service_managers.get(service_manager) 

790 if existing is not None: 790 ↛ 791line 790 didn't jump to line 791 because the condition on line 790 was never true

791 if existing.plugin_metadata.plugin_name == self._plugin_name: 

792 message = ( 

793 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

794 f' service manager "{service_manager}" twice.' 

795 ) 

796 else: 

797 message = ( 

798 f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}" 

799 f' both tried to provide the service manager "{service_manager}"' 

800 ) 

801 raise PluginConflictError( 

802 message, existing.plugin_metadata, self._plugin_metadata 

803 ) 

804 service_managers[service_manager] = ServiceManagerDetails( 

805 service_manager, 

806 wrap_plugin_code(self._plugin_name, detector), 

807 wrap_plugin_code(self._plugin_name, integrator), 

808 self._plugin_metadata, 

809 ) 

810 

811 def _unload() -> None: 

812 del service_managers[service_manager] 

813 

814 self._unloaders.append(_unload) 

815 

816 def manifest_variable( 

817 self, 

818 variable_name: str, 

819 value: str, 

820 *, 

821 variable_reference_documentation: str | None = None, 

822 ) -> None: 

823 self._check_variable_name(variable_name) 

824 manifest_variables = self._feature_set.manifest_variables 

825 try: 

826 resolved_value = self._substitution.substitute( 

827 value, "Plugin initialization" 

828 ) 

829 depends_on_variable = resolved_value != value 

830 except DebputySubstitutionError: 

831 depends_on_variable = True 

832 if depends_on_variable: 

833 raise ValueError( 

834 f"The plugin {self._plugin_name} attempted to declare {variable_name} with value {value!r}." 

835 f" This value depends on another variable, which is not supported. This restriction may be" 

836 f" lifted in the future." 

837 ) 

838 

839 manifest_variables[variable_name] = PluginProvidedManifestVariable( 

840 self._plugin_metadata, 

841 variable_name, 

842 value, 

843 is_context_specific_variable=False, 

844 variable_reference_documentation=variable_reference_documentation, 

845 ) 

846 

847 def _unload() -> None: 

848 # We need to check it was never resolved 

849 raise PluginInitializationError( 

850 "Cannot unload manifest_variable (not implemented)" 

851 ) 

852 

853 self._unloaders.append(_unload) 

854 
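# Illustrative sketch (not part of the original module): declaring a static
# manifest variable through manifest_variable() above. The name and value are
# made up; _check_variable_name() would reject names shadowing built-ins or
# using reserved prefixes such as DEB_/DPKG_/DEBPUTY.
#
#     api.manifest_variable(
#         "FOO_EXAMPLES_DIR",
#         "usr/share/doc/foo/examples",
#         variable_reference_documentation="Directory holding foo's examples.",
#     )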

855 @property 

856 def _plugin_name(self) -> str: 

857 return self._plugin_metadata.plugin_name 

858 

859 def provide_manifest_keyword( 

860 self, 

861 rule_type: TTP, 

862 rule_name: str | list[str], 

863 handler: DIPKWHandler, 

864 *, 

865 inline_reference_documentation: ParserDocumentation | None = None, 

866 ) -> None: 

867 self._restricted_api() 

868 parser_generator = self._feature_set.manifest_parser_generator 

869 if rule_type not in parser_generator.dispatchable_table_parsers: 869 ↛ 870line 869 didn't jump to line 870 because the condition on line 869 was never true

870 types = ", ".join( 

871 sorted(x.__name__ for x in parser_generator.dispatchable_table_parsers) 

872 ) 

873 raise ValueError( 

874 f"The rule_type was not a supported type. It must be one of {types}" 

875 ) 

876 

877 inline_reference_documentation = self._pluggable_manifest_docs_for( 

878 rule_type, 

879 rule_name, 

880 inline_reference_documentation=inline_reference_documentation, 

881 ) 

882 

883 dispatching_parser = parser_generator.dispatchable_table_parsers[rule_type] 

884 dispatching_parser.register_keyword( 

885 rule_name, 

886 wrap_plugin_code(self._plugin_name, handler), 

887 self._plugin_metadata, 

888 inline_reference_documentation=inline_reference_documentation, 

889 ) 

890 

891 def _unload() -> None: 

892 raise PluginInitializationError( 

893 "Cannot unload provide_manifest_keyword (not implemented)" 

894 ) 

895 

896 self._unloaders.append(_unload) 

897 

898 def pluggable_object_parser( 

899 self, 

900 rule_type: str, 

901 rule_name: str, 

902 *, 

903 object_parser_key: str | None = None, 

904 on_end_parse_step: None | ( 

905 Callable[ 

906 [str, Mapping[str, Any] | None, AttributePath, ParserContextData], 

907 None, 

908 ] 

909 ) = None, 

910 nested_in_package_context: bool = False, 

911 ) -> None: 

912 self._restricted_api() 

913 if object_parser_key is None: 913 ↛ 914line 913 didn't jump to line 914 because the condition on line 913 was never true

914 object_parser_key = rule_name 

915 

916 parser_generator = self._feature_set.manifest_parser_generator 

917 dispatchable_object_parsers = parser_generator.dispatchable_object_parsers 

918 if rule_type not in dispatchable_object_parsers: 918 ↛ 919line 918 didn't jump to line 919 because the condition on line 918 was never true

919 types = ", ".join(sorted(dispatchable_object_parsers)) 

920 raise ValueError( 

921 f"The rule_type was not a supported type. It must be one of {types}" 

922 ) 

923 if object_parser_key not in dispatchable_object_parsers: 923 ↛ 924line 923 didn't jump to line 924 because the condition on line 923 was never true

924 types = ", ".join(sorted(dispatchable_object_parsers)) 

925 raise ValueError( 

926 f"The object_parser_key was not a supported type. It must be one of {types}" 

927 ) 

928 parent_dispatcher = dispatchable_object_parsers[rule_type] 

929 child_dispatcher = dispatchable_object_parsers[object_parser_key] 

930 

931 if on_end_parse_step is not None: 931 ↛ 934line 931 didn't jump to line 934 because the condition on line 931 was always true

932 on_end_parse_step = wrap_plugin_code(self._plugin_name, on_end_parse_step) 

933 

934 parent_dispatcher.register_child_parser( 

935 rule_name, 

936 child_dispatcher, 

937 self._plugin_metadata, 

938 on_end_parse_step=on_end_parse_step, 

939 nested_in_package_context=nested_in_package_context, 

940 ) 

941 

942 def _unload() -> None: 

943 raise PluginInitializationError( 

944 "Cannot unload pluggable_object_parser (not implemented)" 

945 ) 

946 

947 self._unloaders.append(_unload) 

948 

949 def pluggable_manifest_rule( 

950 self, 

951 rule_type: TTP | str, 

952 rule_name: str | Sequence[str], 

953 parsed_format: type[PF], 

954 handler: DIPHandler, 

955 *, 

956 source_format: SF | None = None, 

957 inline_reference_documentation: ParserDocumentation | None = None, 

958 expected_debputy_integration_mode: None | ( 

959 Container[DebputyIntegrationMode] 

960 ) = None, 

961 apply_standard_attribute_documentation: bool = False, 

962 register_value: bool = True, 

963 ) -> None: 

964 # When changing this, consider which types will be unrestricted 

965 self._restricted_api() 

966 if apply_standard_attribute_documentation and sys.version_info < (3, 12): 966 ↛ 967line 966 didn't jump to line 967 because the condition on line 966 was never true

967 _error( 

968 f"The plugin {self._plugin_metadata.plugin_name} requires python 3.12 due to" 

969 f" its use of apply_standard_attribute_documentation" 

970 ) 

971 feature_set = self._feature_set 

972 parser_generator = feature_set.manifest_parser_generator 

973 if isinstance(rule_type, str): 

974 if rule_type not in parser_generator.dispatchable_object_parsers: 974 ↛ 975line 974 didn't jump to line 975 because the condition on line 974 was never true

975 types = ", ".join(sorted(parser_generator.dispatchable_object_parsers)) 

976 raise ValueError( 

977 f"The rule_type was not a supported type. It must be one of {types}" 

978 ) 

979 dispatching_parser = parser_generator.dispatchable_object_parsers[rule_type] 

980 signature = inspect.signature(handler) 

981 if ( 981 ↛ 985line 981 didn't jump to line 985 because the condition on line 981 was never true

982 signature.return_annotation is signature.empty 

983 or signature.return_annotation == NoneType 

984 ): 

985 raise ValueError( 

986 "The handler must have a return type (that is not None)" 

987 ) 

988 register_as_type = signature.return_annotation 

989 else: 

990 # Dispatchable types cannot be resolved 

991 register_as_type = None 

992 if rule_type not in parser_generator.dispatchable_table_parsers: 992 ↛ 993line 992 didn't jump to line 993 because the condition on line 992 was never true

993 types = ", ".join( 

994 sorted( 

995 x.__name__ for x in parser_generator.dispatchable_table_parsers 

996 ) 

997 ) 

998 raise ValueError( 

999 f"The rule_type was not a supported type. It must be one of {types}" 

1000 ) 

1001 dispatching_parser = parser_generator.dispatchable_table_parsers[rule_type] 

1002 

1003 if register_as_type is not None and not register_value: 

1004 register_as_type = None 

1005 

1006 if register_as_type is not None: 

1007 existing_registration = self._registered_manifest_types.get( 

1008 register_as_type 

1009 ) 

1010 if existing_registration is not None: 1010 ↛ 1011line 1010 didn't jump to line 1011 because the condition on line 1010 was never true

1011 raise ValueError( 

1012 f"Cannot register rule {rule_name!r} for plugin {self._plugin_name}. The plugin {existing_registration.plugin_name} already registered a manifest rule with type {register_as_type!r}" 

1013 ) 

1014 self._registered_manifest_types[register_as_type] = self._plugin_metadata 

1015 

1016 inline_reference_documentation = self._pluggable_manifest_docs_for( 

1017 rule_type, 

1018 rule_name, 

1019 inline_reference_documentation=inline_reference_documentation, 

1020 ) 

1021 

1022 if apply_standard_attribute_documentation: 1022 ↛ 1023line 1022 didn't jump to line 1023 because the condition on line 1022 was never true

1023 docs = _STD_ATTR_DOCS 

1024 else: 

1025 docs = None 

1026 

1027 parser = feature_set.manifest_parser_generator.generate_parser( 

1028 parsed_format, 

1029 source_content=source_format, 

1030 inline_reference_documentation=inline_reference_documentation, 

1031 expected_debputy_integration_mode=expected_debputy_integration_mode, 

1032 automatic_docs=docs, 

1033 ) 

1034 

1035 def _registering_handler( 

1036 name: str, 

1037 parsed_data: PF, 

1038 attribute_path: AttributePath, 

1039 parser_context: ParserContextData, 

1040 ) -> TP: 

1041 value = handler(name, parsed_data, attribute_path, parser_context) 

1042 if register_as_type is not None: 

1043 register_manifest_type_value_in_context(register_as_type, value) 

1044 return value 

1045 

1046 dispatching_parser.register_parser( 

1047 rule_name, 

1048 parser, 

1049 wrap_plugin_code(self._plugin_name, _registering_handler), 

1050 self._plugin_metadata, 

1051 ) 

1052 

1053 def _unload() -> None: 

1054 raise PluginInitializationError( 

1055 "Cannot unload pluggable_manifest_rule (not implemented)" 

1056 ) 

1057 

1058 self._unloaders.append(_unload) 

1059 

1060 def register_build_system( 

1061 self, 

1062 build_system_definition: type[BSPF], 

1063 ) -> None: 

1064 self._restricted_api() 

1065 if not is_typeddict(build_system_definition): 1065 ↛ 1066line 1065 didn't jump to line 1066 because the condition on line 1065 was never true

1066 raise PluginInitializationError( 

1067 f"Expected build_system_definition to be a subclass of {BuildRuleParsedFormat.__name__}," 

1068 f" but got {build_system_definition.__name__} instead" 

1069 ) 

1070 metadata = getattr( 

1071 build_system_definition, 

1072 _DEBPUTY_DISPATCH_METADATA_ATTR_NAME, 

1073 None, 

1074 ) 

1075 if not isinstance(metadata, BuildSystemManifestRuleMetadata): 1075 ↛ 1076line 1075 didn't jump to line 1076 because the condition on line 1075 was never true

1076 raise PluginIncorrectRegistrationError( 

1077 f"The {build_system_definition.__qualname__} type should have been annotated with" 

1078 f" @{debputy_build_system.__name__}." 

1079 ) 

1080 assert len(metadata.manifest_keywords) == 1 

1081 build_system_impl = metadata.build_system_impl 

1082 assert build_system_impl is not None 

1083 manifest_keyword = next(iter(metadata.manifest_keywords)) 

1084 self.pluggable_manifest_rule( 

1085 metadata.dispatched_type, 

1086 metadata.manifest_keywords, 

1087 build_system_definition, 

1088 # pluggable_manifest_rule does the wrapping 

1089 metadata.unwrapped_constructor, 

1090 source_format=metadata.source_format, 

1091 inline_reference_documentation=metadata.online_reference_documentation, 

1092 expected_debputy_integration_mode=only_integrations(INTEGRATION_MODE_FULL), 

1093 ) 

1094 self._auto_detectable_build_system( 

1095 manifest_keyword, 

1096 build_system_impl, 

1097 constructor=wrap_plugin_code( 

1098 self._plugin_name, 

1099 build_system_impl, 

1100 ), 

1101 shadowing_build_systems_when_active=metadata.auto_detection_shadow_build_systems, 

1102 ) 

1103 

1104 def _auto_detectable_build_system( 

1105 self, 

1106 manifest_keyword: str, 

1107 rule_type: type[BSR], 

1108 *, 

1109 shadowing_build_systems_when_active: frozenset[str] = frozenset(), 

1110 constructor: None | ( 

1111 Callable[[BuildRuleParsedFormat, AttributePath, "HighLevelManifest"], BSR] 

1112 ) = None, 

1113 ) -> None: 

1114 self._restricted_api() 

1115 feature_set = self._feature_set 

1116 existing = feature_set.auto_detectable_build_systems.get(rule_type) 

1117 if existing is not None: 1117 ↛ 1118line 1117 didn't jump to line 1118 because the condition on line 1117 was never true

1118 bs_name = rule_type.__class__.__name__ 

1119 if existing.plugin_metadata.plugin_name == self._plugin_name: 

1120 message = ( 

1121 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

1122 f' auto-detection of the build system "{bs_name}" twice.' 

1123 ) 

1124 else: 

1125 message = ( 

1126 f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}" 

1127 f' both tried to provide auto-detection of the build system "{bs_name}"' 

1128 ) 

1129 raise PluginConflictError( 

1130 message, existing.plugin_metadata, self._plugin_metadata 

1131 ) 

1132 

1133 if constructor is None: 1133 ↛ 1135line 1133 didn't jump to line 1135 because the condition on line 1133 was never true

1134 

1135 def impl( 

1136 attributes: BuildRuleParsedFormat, 

1137 attribute_path: AttributePath, 

1138 manifest: "HighLevelManifest", 

1139 ) -> BSR: 

1140 return rule_type(attributes, attribute_path, manifest) 

1141 

1142 else: 

1143 impl = constructor 

1144 

1145 feature_set.auto_detectable_build_systems[rule_type] = ( 

1146 PluginProvidedBuildSystemAutoDetection( 

1147 manifest_keyword, 

1148 rule_type, 

1149 wrap_plugin_code(self._plugin_name, rule_type.auto_detect_build_system), 

1150 impl, 

1151 shadowing_build_systems_when_active, 

1152 self._plugin_metadata, 

1153 ) 

1154 ) 

1155 

1156 def _unload() -> None: 

1157 try: 

1158 del feature_set.auto_detectable_build_systems[rule_type] 

1159 except KeyError: 

1160 pass 

1161 

1162 self._unloaders.append(_unload) 

1163 

1164 def known_packaging_files( 

1165 self, 

1166 packaging_file_details: KnownPackagingFileInfo, 

1167 ) -> None: 

1168 known_packaging_files = self._feature_set.known_packaging_files 

1169 detection_method = packaging_file_details.get( 

1170 "detection_method", cast("Literal['path']", "path") 

1171 ) 

1172 path = packaging_file_details.get("path") 

1173 dhpkgfile = packaging_file_details.get("pkgfile") 

1174 

1175 packaging_file_details = packaging_file_details.copy() 

1176 

1177 if detection_method == "path": 1177 ↛ 1193line 1177 didn't jump to line 1193 because the condition on line 1177 was always true

1178 if dhpkgfile is not None: 1178 ↛ 1179line 1178 didn't jump to line 1179 because the condition on line 1178 was never true

1179 raise ValueError( 

1180 'The "pkgfile" attribute cannot be used when detection-method is "path" (or omitted)' 

1181 ) 

1182 if path is None: 1182 ↛ 1183line 1182 didn't jump to line 1183 because the condition on line 1182 was never true

1183 raise ValueError( 

1184 'The "path" attribute must be present when detection-method is "path" (or omitted)' 

1185 ) 

1186 if path != _normalize_path(path, with_prefix=False): 1186 ↛ 1187line 1186 didn't jump to line 1187 because the condition on line 1186 was never true

1187 raise ValueError( 

1188 f"The path for known packaging files must be normalized. Please replace" 

1189 f' "{path}" with "{_normalize_path(path, with_prefix=False)}"' 

1190 ) 

1191 detection_value = path 

1192 else: 

1193 assert detection_method == "dh.pkgfile" 

1194 if path is not None: 

1195 raise ValueError( 

1196 'The "path" attribute cannot be used when detection-method is "dh.pkgfile"' 

1197 ) 

1198 if dhpkgfile is None: 

1199 raise ValueError( 

1200 'The "pkgfile" attribute must be present when detection-method is "dh.pkgfile"' 

1201 ) 

1202 if "/" in dhpkgfile: 

1203 raise ValueError( 

1204 'The "pkgfile" attribute ḿust be a name stem such as "install" (no "/" are allowed)' 

1205 ) 

1206 detection_value = dhpkgfile 

1207 key = f"{detection_method}::{detection_value}" 

1208 existing = known_packaging_files.get(key) 

1209 if existing is not None: 1209 ↛ 1210line 1209 didn't jump to line 1210 because the condition on line 1209 was never true

1210 if existing.plugin_metadata.plugin_name != self._plugin_name: 

1211 message = ( 

1212 f'The key "{key}" is registered twice for known packaging files.' 

1213 f" Once by {existing.plugin_metadata.plugin_name} and once by {self._plugin_name}" 

1214 ) 

1215 else: 

1216 message = ( 

1217 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

1218 f' key "{key}" twice for known packaging files.' 

1219 ) 

1220 raise PluginConflictError( 

1221 message, existing.plugin_metadata, self._plugin_metadata 

1222 ) 

1223 _validate_known_packaging_file_dh_compat_rules( 

1224 packaging_file_details.get("dh_compat_rules") 

1225 ) 

1226 known_packaging_files[key] = PluginProvidedKnownPackagingFile( 

1227 packaging_file_details, 

1228 detection_method, 

1229 detection_value, 

1230 self._plugin_metadata, 

1231 ) 

1232 

1233 def _unload() -> None: 

1234 del known_packaging_files[key] 

1235 

1236 self._unloaders.append(_unload) 

1237 

1238 def register_mapped_type( 

1239 self, 

1240 type_mapping: TypeMapping, 

1241 *, 

1242 reference_documentation: TypeMappingDocumentation | None = None, 

1243 ) -> None: 

1244 self._restricted_api() 

1245 target_type = type_mapping.target_type 

1246 mapped_types = self._feature_set.mapped_types 

1247 existing = mapped_types.get(target_type) 

1248 if existing is not None: 1248 ↛ 1249line 1248 didn't jump to line 1249 because the condition on line 1248 was never true

1249 if existing.plugin_metadata.plugin_name != self._plugin_name: 

1250 message = ( 

1251 f'The key "{target_type.__name__}" is registered twice for known packaging files.' 

1252 f" Once by {existing.plugin_metadata.plugin_name} and once by {self._plugin_name}" 

1253 ) 

1254 else: 

1255 message = ( 

1256 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

1257 f' key "{target_type.__name__}" twice for known packaging files.' 

1258 ) 

1259 raise PluginConflictError( 

1260 message, existing.plugin_metadata, self._plugin_metadata 

1261 ) 

1262 parser_generator = self._feature_set.manifest_parser_generator 

1263 # TODO: Wrap the mapper in the plugin context 

1264 mapped_types[target_type] = PluginProvidedTypeMapping( 

1265 type_mapping, reference_documentation, self._plugin_metadata 

1266 ) 

1267 parser_generator.register_mapped_type(type_mapping) 

1268 

1269 def _restricted_api( 

1270 self, 

1271 *, 

1272 allowed_plugins: set[str] | frozenset[str] = frozenset(), 

1273 ) -> None: 

1274 if self._plugin_name != "debputy" and self._plugin_name not in allowed_plugins: 1274 ↛ 1275line 1274 didn't jump to line 1275 because the condition on line 1274 was never true

1275 raise PluginAPIViolationError( 

1276 f"Plugin {self._plugin_name} attempted to access a debputy-only API." 

1277 " If you are the maintainer of this plugin and want access to this" 

1278 " API, please file a feature request to make this public." 

1279 " (The API is currently private as it is unstable.)" 

1280 ) 

1281 

1282 

1283class MaintscriptAccessorProviderBase(MaintscriptAccessor, ABC): 

1284 __slots__ = () 

1285 

1286 def _append_script( 

1287 self, 

1288 caller_name: str, 

1289 maintscript: Maintscript, 

1290 full_script: str, 

1291 /, 

1292 perform_substitution: bool = True, 

1293 ) -> None: 

1294 raise NotImplementedError 

1295 

1296 @classmethod 

1297 def _apply_condition_to_script( 

1298 cls, 

1299 condition: str, 

1300 run_snippet: str, 

1301 /, 

1302 indent: bool | None = None, 

1303 ) -> str: 

1304 if indent is None: 

1305 # We auto-determine this based on heredocs currently 

1306 indent = "<<" not in run_snippet 

1307 

1308 if indent: 

1309 run_snippet = "".join(" " + x for x in run_snippet.splitlines(True)) 

1310 if not run_snippet.endswith("\n"): 

1311 run_snippet += "\n" 

1312 condition_line = f"if {condition}; then\n" 

1313 end_line = "fi\n" 

1314 return "".join((condition_line, run_snippet, end_line)) 

1315 

1316 def on_configure( 

1317 self, 

1318 run_snippet: str, 

1319 /, 

1320 indent: bool | None = None, 

1321 perform_substitution: bool = True, 

1322 skip_on_rollback: bool = False, 

1323 ) -> None: 

1324 condition = POSTINST_DEFAULT_CONDITION 

1325 if skip_on_rollback: 1325 ↛ 1326line 1325 didn't jump to line 1326 because the condition on line 1325 was never true

1326 condition = '[ "$1" = "configure" ]' 

1327 return self._append_script( 

1328 "on_configure", 

1329 "postinst", 

1330 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

1331 perform_substitution=perform_substitution, 

1332 ) 

1333 

1334 def on_initial_install( 

1335 self, 

1336 run_snippet: str, 

1337 /, 

1338 indent: bool | None = None, 

1339 perform_substitution: bool = True, 

1340 ) -> None: 

1341 condition = '[ "$1" = "configure" -a -z "$2" ]' 

1342 return self._append_script( 

1343 "on_initial_install", 

1344 "postinst", 

1345 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

1346 perform_substitution=perform_substitution, 

1347 ) 

1348 

1349 def on_upgrade( 

1350 self, 

1351 run_snippet: str, 

1352 /, 

1353 indent: bool | None = None, 

1354 perform_substitution: bool = True, 

1355 ) -> None: 

1356 condition = '[ "$1" = "configure" -a -n "$2" ]' 

1357 return self._append_script( 

1358 "on_upgrade", 

1359 "postinst", 

1360 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

1361 perform_substitution=perform_substitution, 

1362 ) 

1363 

1364 def on_upgrade_from( 

1365 self, 

1366 version: str, 

1367 run_snippet: str, 

1368 /, 

1369 indent: bool | None = None, 

1370 perform_substitution: bool = True, 

1371 ) -> None: 

1372 condition = '[ "$1" = "configure" ] && dpkg --compare-versions le-nl "$2"' 

1373 return self._append_script( 

1374 "on_upgrade_from", 

1375 "postinst", 

1376 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

1377 perform_substitution=perform_substitution, 

1378 ) 

1379 

1380 def on_before_removal( 

1381 self, 

1382 run_snippet: str, 

1383 /, 

1384 indent: bool | None = None, 

1385 perform_substitution: bool = True, 

1386 ) -> None: 

1387 condition = '[ "$1" = "remove" ]' 

1388 return self._append_script( 

1389 "on_before_removal", 

1390 "prerm", 

1391 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

1392 perform_substitution=perform_substitution, 

1393 ) 

1394 

1395 def on_removed( 

1396 self, 

1397 run_snippet: str, 

1398 /, 

1399 indent: bool | None = None, 

1400 perform_substitution: bool = True, 

1401 ) -> None: 

1402 condition = '[ "$1" = "remove" ]' 

1403 return self._append_script( 

1404 "on_removed", 

1405 "postrm", 

1406 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

1407 perform_substitution=perform_substitution, 

1408 ) 

1409 

1410 def on_purge( 

1411 self, 

1412 run_snippet: str, 

1413 /, 

1414 indent: bool | None = None, 

1415 perform_substitution: bool = True, 

1416 ) -> None: 

1417 condition = '[ "$1" = "purge" ]' 

1418 return self._append_script( 

1419 "on_purge", 

1420 "postrm", 

1421 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

1422 perform_substitution=perform_substitution, 

1423 ) 

1424 

1425 def unconditionally_in_script( 

1426 self, 

1427 maintscript: Maintscript, 

1428 run_snippet: str, 

1429 /, 

1430 perform_substitution: bool = True, 

1431 ) -> None: 

1432 if maintscript not in STD_CONTROL_SCRIPTS: 1432 ↛ 1433line 1432 didn't jump to line 1433 because the condition on line 1432 was never true

1433 raise ValueError( 

1434 f'Unknown script "{maintscript}". Should have been one of:' 

1435 f' {", ".join(sorted(STD_CONTROL_SCRIPTS))}' 

1436 ) 

1437 return self._append_script( 

1438 "unconditionally_in_script", 

1439 maintscript, 

1440 run_snippet, 

1441 perform_substitution=perform_substitution, 

1442 ) 

1443 

1444 
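# Worked example (illustrative, not part of the original module): the snippet
# produced by MaintscriptAccessorProviderBase._apply_condition_to_script above
# for a simple, non-heredoc snippet. The snippet text is made up.
def _example_condition_wrapping() -> str:
    guarded = MaintscriptAccessorProviderBase._apply_condition_to_script(
        '[ "$1" = "configure" ]',
        "update-foo-cache\n",
    )
    # guarded is now:
    #   if [ "$1" = "configure" ]; then
    #    update-foo-cache
    #   fi
    return guarded
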

1445class MaintscriptAccessorProvider(MaintscriptAccessorProviderBase): 

1446 __slots__ = ( 

1447 "_plugin_metadata", 

1448 "_maintscript_snippets", 

1449 "_plugin_source_id", 

1450 "_package_substitution", 

1451 "_default_snippet_order", 

1452 ) 

1453 

1454 def __init__( 

1455 self, 

1456 plugin_metadata: DebputyPluginMetadata, 

1457 plugin_source_id: str, 

1458 maintscript_snippets: dict[str, MaintscriptSnippetContainer], 

1459 package_substitution: Substitution, 

1460 *, 

1461 default_snippet_order: Literal["service"] | None = None, 

1462 ): 

1463 self._plugin_metadata = plugin_metadata 

1464 self._plugin_source_id = plugin_source_id 

1465 self._maintscript_snippets = maintscript_snippets 

1466 self._package_substitution = package_substitution 

1467 self._default_snippet_order = default_snippet_order 

1468 

1469 def _append_script( 

1470 self, 

1471 caller_name: str, 

1472 maintscript: Maintscript, 

1473 full_script: str, 

1474 /, 

1475 perform_substitution: bool = True, 

1476 ) -> None: 

1477 def_source = f"{self._plugin_metadata.plugin_name} ({self._plugin_source_id})" 

1478 if perform_substitution: 

1479 full_script = self._package_substitution.substitute(full_script, def_source) 

1480 

1481 snippet = MaintscriptSnippet( 

1482 snippet=full_script, 

1483 definition_source=def_source, 

1484 snippet_order=self._default_snippet_order, 

1485 ) 

1486 self._maintscript_snippets[maintscript].append(snippet) 

1487 

1488 

1489class BinaryCtrlAccessorProviderBase(BinaryCtrlAccessor): 

1490 __slots__ = ( 

1491 "_plugin_metadata", 

1492 "_plugin_source_id", 

1493 "_package_metadata_context", 

1494 "_triggers", 

1495 "_substvars", 

1496 "_maintscript", 

1497 "_shlibs_details", 

1498 ) 

1499 

1500 def __init__( 

1501 self, 

1502 plugin_metadata: DebputyPluginMetadata, 

1503 plugin_source_id: str, 

1504 package_metadata_context: PackageProcessingContext, 

1505 triggers: dict[tuple[DpkgTriggerType, str], PluginProvidedTrigger], 

1506 substvars: FlushableSubstvars, 

1507 shlibs_details: tuple[str | None, list[str] | None], 

1508 ) -> None: 

1509 self._plugin_metadata = plugin_metadata 

1510 self._plugin_source_id = plugin_source_id 

1511 self._package_metadata_context = package_metadata_context 

1512 self._triggers = triggers 

1513 self._substvars = substvars 

1514 self._maintscript: MaintscriptAccessor | None = None 

1515 self._shlibs_details = shlibs_details 

1516 

1517 def _create_maintscript_accessor(self) -> MaintscriptAccessor: 

1518 raise NotImplementedError 

1519 

1520 def dpkg_trigger(self, trigger_type: DpkgTriggerType, trigger_target: str) -> None: 

1521 """Register a declarative dpkg level trigger 

1522 

1523 The provided trigger will be added to the package's metadata (the triggers file of the control.tar). 

1524 

1525 If the trigger has already been added previously, a second call with the same trigger data will be ignored. 

1526 """ 

1527 key = (trigger_type, trigger_target) 

1528 if key in self._triggers:  [1528 ↛ 1529: condition on line 1528 was never true]

1529 return 

1530 self._triggers[key] = PluginProvidedTrigger( 

1531 dpkg_trigger_type=trigger_type, 

1532 dpkg_trigger_target=trigger_target, 

1533 provider=self._plugin_metadata, 

1534 provider_source_id=self._plugin_source_id, 

1535 ) 

1536 

1537 @property 

1538 def maintscript(self) -> MaintscriptAccessor: 

1539 maintscript = self._maintscript 

1540 if maintscript is None: 

1541 maintscript = self._create_maintscript_accessor() 

1542 self._maintscript = maintscript 

1543 return maintscript 

1544 

1545 @property 

1546 def substvars(self) -> FlushableSubstvars: 

1547 return self._substvars 

1548 

1549 def dpkg_shlibdeps(self, paths: Sequence[VirtualPath]) -> None: 

1550 binary_package = self._package_metadata_context.binary_package 

1551 with self.substvars.flush() as substvars_file: 

1552 dpkg_cmd = ["dpkg-shlibdeps", f"-T{substvars_file}"] 

1553 if binary_package.is_udeb: 

1554 dpkg_cmd.append("-tudeb") 

1555 if binary_package.is_essential:  [1555 ↛ 1556: condition on line 1555 was never true]

1556 dpkg_cmd.append("-dPre-Depends") 

1557 shlibs_local, shlib_dirs = self._shlibs_details 

1558 if shlibs_local is not None:  [1558 ↛ 1559: condition on line 1558 was never true]

1559 dpkg_cmd.append(f"-L{shlibs_local}") 

1560 if shlib_dirs:  [1560 ↛ 1561: condition on line 1560 was never true]

1561 dpkg_cmd.extend(f"-l{sd}" for sd in shlib_dirs) 

1562 dpkg_cmd.extend(p.fs_path for p in paths) 

1563 print_command(*dpkg_cmd) 

1564 try: 

1565 subprocess.check_call(dpkg_cmd) 

1566 except subprocess.CalledProcessError: 

1567 _error( 

1568 f"Attempting to auto-detect dependencies via dpkg-shlibdeps for {binary_package.name} failed. Please" 

1569 " review the output from dpkg-shlibdeps above to understand what went wrong." 

1570 ) 

1571 

1572 
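For orientation, a sketch of how a metadata detector might exercise the BinaryCtrlAccessor surface defined above; the trigger and the way the ELF paths are collected are illustrative assumptions:

    def detect_shared_libraries(fs_root, ctrl, context) -> None:
        # Declarative trigger recorded in the control.tar triggers file.
        ctrl.dpkg_trigger("activate-noawait", "ldconfig")
        # Paths would normally be collected by walking fs_root; kept empty here.
        elf_paths = []
        if elf_paths:
            # Runs dpkg-shlibdeps, which fills ${shlibs:Depends} and friends
            # via the flushed substvars file.
            ctrl.dpkg_shlibdeps(elf_paths)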

1573class BinaryCtrlAccessorProvider(BinaryCtrlAccessorProviderBase): 

1574 __slots__ = ( 

1575 "_maintscript", 

1576 "_maintscript_snippets", 

1577 "_package_substitution", 

1578 ) 

1579 

1580 def __init__( 

1581 self, 

1582 plugin_metadata: DebputyPluginMetadata, 

1583 plugin_source_id: str, 

1584 package_metadata_context: PackageProcessingContext, 

1585 triggers: dict[tuple[DpkgTriggerType, str], PluginProvidedTrigger], 

1586 substvars: FlushableSubstvars, 

1587 maintscript_snippets: dict[str, MaintscriptSnippetContainer], 

1588 package_substitution: Substitution, 

1589 shlibs_details: tuple[str | None, list[str] | None], 

1590 *, 

1591 default_snippet_order: Literal["service"] | None = None, 

1592 ) -> None: 

1593 super().__init__( 

1594 plugin_metadata, 

1595 plugin_source_id, 

1596 package_metadata_context, 

1597 triggers, 

1598 substvars, 

1599 shlibs_details, 

1600 ) 

1601 self._maintscript_snippets = maintscript_snippets 

1602 self._package_substitution = package_substitution 

1603 self._maintscript = MaintscriptAccessorProvider( 

1604 plugin_metadata, 

1605 plugin_source_id, 

1606 maintscript_snippets, 

1607 package_substitution, 

1608 default_snippet_order=default_snippet_order, 

1609 ) 

1610 

1611 def _create_maintscript_accessor(self) -> MaintscriptAccessor: 

1612 return MaintscriptAccessorProvider( 

1613 self._plugin_metadata, 

1614 self._plugin_source_id, 

1615 self._maintscript_snippets, 

1616 self._package_substitution, 

1617 ) 

1618 

1619 

1620class BinaryCtrlAccessorProviderCreator: 

1621 def __init__( 

1622 self, 

1623 package_metadata_context: PackageProcessingContext, 

1624 substvars: FlushableSubstvars, 

1625 maintscript_snippets: dict[str, MaintscriptSnippetContainer], 

1626 substitution: Substitution, 

1627 ) -> None: 

1628 self._package_metadata_context = package_metadata_context 

1629 self._substvars = substvars 

1630 self._maintscript_snippets = maintscript_snippets 

1631 self._substitution = substitution 

1632 self._triggers: dict[tuple[DpkgTriggerType, str], PluginProvidedTrigger] = {} 

1633 self.shlibs_details: tuple[str | None, list[str] | None] = None, None 

1634 

1635 def for_plugin( 

1636 self, 

1637 plugin_metadata: DebputyPluginMetadata, 

1638 plugin_source_id: str, 

1639 *, 

1640 default_snippet_order: Literal["service"] | None = None, 

1641 ) -> BinaryCtrlAccessor: 

1642 return BinaryCtrlAccessorProvider( 

1643 plugin_metadata, 

1644 plugin_source_id, 

1645 self._package_metadata_context, 

1646 self._triggers, 

1647 self._substvars, 

1648 self._maintscript_snippets, 

1649 self._substitution, 

1650 self.shlibs_details, 

1651 default_snippet_order=default_snippet_order, 

1652 ) 

1653 

1654 def generated_triggers(self) -> Iterable[PluginProvidedTrigger]: 

1655 return self._triggers.values() 

1656 

1657 
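A wiring sketch for the creator above (not part of this module); the pkg_ctx, substvars, snippet container and substitution objects are assumed to come from the surrounding packaging pipeline:

    creator = BinaryCtrlAccessorProviderCreator(
        pkg_ctx,               # PackageProcessingContext for one binary package
        substvars,             # FlushableSubstvars
        maintscript_snippets,  # dict[str, MaintscriptSnippetContainer]
        substitution,          # package-level Substitution
    )
    # One accessor per plugin "source" (e.g. a specific metadata detector).
    ctrl = creator.for_plugin(plugin_metadata, "metadata-detector: example")
    # ... plugin code then uses ctrl.maintscript / ctrl.substvars / ctrl.dpkg_trigger ...
    registered_triggers = list(creator.generated_triggers())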

1658def _resolve_bundled_plugin_docs_path( 

1659 plugin_name: str, 

1660 loader: PluginInitializationEntryPoint | None, 

1661) -> Traversable | Path | None: 

1662 plugin_module = getattr(loader, "__module__") 

1663 assert plugin_module is not None 

1664 plugin_package_name = sys.modules[plugin_module].__package__ 

1665 return importlib.resources.files(plugin_package_name).joinpath( 

1666 f"{plugin_name}_docs.yaml" 

1667 ) 

1668 

1669 

1670def plugin_metadata_for_debputys_own_plugin( 

1671 loader: PluginInitializationEntryPoint | None = None, 

1672) -> DebputyPluginMetadata: 

1673 if loader is None: 

1674 from debputy.plugins.debputy.debputy_plugin import ( 

1675 initialize_debputy_features, 

1676 ) 

1677 

1678 loader = initialize_debputy_features 

1679 plugin_name = "debputy" 

1680 return DebputyPluginMetadata( 

1681 plugin_name="debputy", 

1682 api_compat_version=1, 

1683 plugin_initializer=loader, 

1684 plugin_loader=None, 

1685 plugin_doc_path_resolver=lambda: _resolve_bundled_plugin_docs_path( 

1686 plugin_name, 

1687 loader, 

1688 ), 

1689 plugin_path="<bundled>", 

1690 ) 

1691 

1692 

1693def load_plugin_features( 

1694 plugin_search_dirs: Sequence[str], 

1695 substitution: Substitution, 

1696 requested_plugins_only: Sequence[str] | None = None, 

1697 required_plugins: set[str] | None = None, 

1698 plugin_feature_set: PluginProvidedFeatureSet | None = None, 

1699 debug_mode: bool = False, 

1700) -> PluginProvidedFeatureSet: 

1701 if plugin_feature_set is None: 

1702 plugin_feature_set = PluginProvidedFeatureSet() 

1703 plugins = [plugin_metadata_for_debputys_own_plugin()] 

1704 unloadable_plugins = set() 

1705 if required_plugins: 

1706 plugins.extend( 

1707 find_json_plugins( 

1708 plugin_search_dirs, 

1709 required_plugins, 

1710 ) 

1711 ) 

1712 if requested_plugins_only is not None: 

1713 plugins.extend( 

1714 find_json_plugins( 

1715 plugin_search_dirs, 

1716 requested_plugins_only, 

1717 ) 

1718 ) 

1719 else: 

1720 auto_loaded = _find_all_json_plugins( 

1721 plugin_search_dirs, 

1722 required_plugins if required_plugins is not None else frozenset(), 

1723 debug_mode=debug_mode, 

1724 ) 

1725 for plugin_metadata in auto_loaded: 

1726 plugins.append(plugin_metadata) 

1727 unloadable_plugins.add(plugin_metadata.plugin_name) 

1728 

1729 for plugin_metadata in plugins: 

1730 api = DebputyPluginInitializerProvider( 

1731 plugin_metadata, plugin_feature_set, substitution 

1732 ) 

1733 try: 

1734 api.load_plugin() 

1735 except PluginBaseError as e: 

1736 if plugin_metadata.plugin_name not in unloadable_plugins: 

1737 raise 

1738 if debug_mode: 

1739 _warn( 

1740 f"The optional plugin {plugin_metadata.plugin_name} failed during load. Re-raising due" 

1741 f" to --debug/-d or DEBPUTY_DEBUG=1" 

1742 ) 

1743 raise 

1744 try: 

1745 api.unload_plugin() 

1746 except Exception: 

1747 _warn( 

1748 f"Failed to load optional {plugin_metadata.plugin_name} and an error was raised when trying to" 

1749 " clean up after the half-initialized plugin. Re-raising load error as the partially loaded" 

1750 " module might have tainted the feature set." 

1751 ) 

1752 raise e from None 

1753 _warn( 

1754 f"The optional plugin {plugin_metadata.plugin_name} failed during load. The plugin was" 

1755 f" deactivated. Use debug mode (--debug/DEBPUTY_DEBUG=1) to show the stacktrace" 

1756 f" (the warning will become an error)" 

1757 ) 

1758 

1759 return plugin_feature_set 

1760 

1761 
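A call sketch; the search directories and plugin name are example values, and `substitution` is assumed to be the resolved Substitution instance used elsewhere in debputy:

    feature_set = load_plugin_features(
        ["debian", "/usr/share"],             # each probed as <dir>/debputy/plugins/
        substitution,
        required_plugins={"example-plugin"},  # load failures here are fatal
        debug_mode=False,                     # auto-discovered plugin failures only warn
    )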

1762def find_json_plugin( 

1763 search_dirs: Sequence[str], 

1764 requested_plugin: str, 

1765) -> DebputyPluginMetadata: 

1766 r = list(find_json_plugins(search_dirs, [requested_plugin])) 

1767 assert len(r) == 1 

1768 return r[0] 

1769 

1770 

1771def find_related_implementation_files_for_plugin( 

1772 plugin_metadata: DebputyPluginMetadata, 

1773) -> list[str]: 

1774 if plugin_metadata.is_bundled: 

1775 plugin_name = plugin_metadata.plugin_name 

1776 _error( 

1777 f"Cannot run find related files for {plugin_name}: The plugin seems to be bundled" 

1778 " or loaded via a mechanism that does not support detecting its tests." 

1779 ) 

1780 

1781 if plugin_metadata.is_from_python_path: 

1782 plugin_name = plugin_metadata.plugin_name 

1783 # Maybe they could be, but that is for another day. 

1784 _error( 

1785 f"Cannot run find related files for {plugin_name}: The plugin is installed into python path" 

1786 " and these are not supported." 

1787 ) 

1788 files = [] 

1789 module_name, module_file = _find_plugin_implementation_file( 

1790 plugin_metadata.plugin_name, 

1791 plugin_metadata.plugin_path, 

1792 ) 

1793 if os.path.isfile(module_file): 

1794 files.append(module_file) 

1795 else: 

1796 if not plugin_metadata.is_loaded: 

1797 plugin_metadata.load_plugin() 

1798 if module_name in sys.modules: 

1799 _error( 

1800 f'The plugin {plugin_metadata.plugin_name} uses the "module" key in its' 

1801 f" JSON metadata file ({plugin_metadata.plugin_path}) and cannot be " 

1802 f" installed via this method. The related Python would not be installed" 

1803 f" (which would result in a plugin that would fail to load)" 

1804 ) 

1805 

1806 return files 

1807 

1808 

1809def find_tests_for_plugin( 

1810 plugin_metadata: DebputyPluginMetadata, 

1811) -> list[str]: 

1812 plugin_name = plugin_metadata.plugin_name 

1813 plugin_path = plugin_metadata.plugin_path 

1814 

1815 if plugin_metadata.is_bundled: 

1816 _error( 

1817 f"Cannot run tests for {plugin_name}: The plugin seems to be bundled or loaded via a" 

1818 " mechanism that does not support detecting its tests." 

1819 ) 

1820 

1821 if plugin_metadata.is_from_python_path: 

1822 plugin_name = plugin_metadata.plugin_name 

1823 # Maybe they could be, but that is for another day. 

1824 _error( 

1825 f"Cannot run find related files for {plugin_name}: The plugin is installed into python path" 

1826 " and these are not supported." 

1827 ) 

1828 

1829 plugin_dir = os.path.dirname(plugin_path) 

1830 test_basename_prefix = plugin_metadata.plugin_name.replace("-", "_") 

1831 tests = [] 

1832 with os.scandir(plugin_dir) as dir_iter: 

1833 for p in dir_iter: 

1834 if ( 

1835 p.is_file() 

1836 and p.name.startswith(test_basename_prefix) 

1837 and PLUGIN_TEST_SUFFIX.search(p.name) 

1838 ): 

1839 tests.append(p.path) 

1840 return tests 

1841 

1842 

1843def find_json_plugins( 

1844 search_dirs: Sequence[str], 

1845 requested_plugins: Iterable[str], 

1846) -> Iterable[DebputyPluginMetadata]: 

1847 for plugin_name_or_path in requested_plugins:  [1847 ↛ exit: didn't return from 'find_json_plugins' because the loop on line 1847 didn't complete]

1848 if "/" in plugin_name_or_path: 1848 ↛ 1849line 1848 didn't jump to line 1849 because the condition on line 1848 was never true

1849 if not os.path.isfile(plugin_name_or_path): 

1850 raise PluginNotFoundError( 

1851 f"Unable to load the plugin {plugin_name_or_path}: The path is not a file." 

1852 ' (Because the plugin name contains "/", it is assumed to be a path and search path' 

1853 " is not used." 

1854 ) 

1855 yield parse_json_plugin_desc(plugin_name_or_path) 

1856 return 

1857 for search_dir in search_dirs:  [1857 ↛ 1866: loop on line 1857 didn't complete]

1858 path = os.path.join( 

1859 search_dir, "debputy", "plugins", f"{plugin_name_or_path}.json" 

1860 ) 

1861 if not os.path.isfile(path):  [1861 ↛ 1862: condition on line 1861 was never true]

1862 continue 

1863 yield parse_json_plugin_desc(path) 

1864 return 

1865 

1866 path_root = PLUGIN_PYTHON_RES_PATH 

1867 pp_path = path_root.joinpath(f"{plugin_name_or_path}.json") 

1868 if pp_path.is_file(): 

1869 with pp_path.open() as fd: 

1870 yield parse_json_plugin_desc( 

1871 f"PYTHONPATH:debputy/plugins/{pp_path.name}", 

1872 fd=fd, 

1873 is_from_python_path=True, 

1874 ) 

1875 return 

1876 

1877 search_dir_str = ":".join(search_dirs) 

1878 raise PluginNotFoundError( 

1879 f"Unable to load the plugin {plugin_name_or_path}: Could not find {plugin_name_or_path}.json in the" 

1880 f" debputy/plugins subdir of any of the search dirs ({search_dir_str})" 

1881 ) 

1882 

1883 
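The lookup above resolves a requested name in three steps: a literal path, the search directories, then the debputy/plugins resources on the Python path. A sketch of the two common call shapes (plugin names and directories are illustrative):

    # 1) A name containing "/" is treated as a literal path to the descriptor.
    meta = find_json_plugin([], "debian/example-plugin/example-plugin.json")
    # 2) Otherwise each search dir is probed as <dir>/debputy/plugins/<name>.json,
    #    falling back to the resources shipped on the Python path.
    meta = find_json_plugin(["/usr/share"], "example-plugin")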

1884def _find_all_json_plugins( 

1885 search_dirs: Sequence[str], 

1886 required_plugins: AbstractSet[str], 

1887 debug_mode: bool = False, 

1888) -> Iterable[DebputyPluginMetadata]: 

1889 seen = set(required_plugins) 

1890 error_seen = False 

1891 for search_dir in search_dirs: 

1892 try: 

1893 dir_fd = os.scandir(os.path.join(search_dir, "debputy", "plugins")) 

1894 except FileNotFoundError: 

1895 continue 

1896 with dir_fd: 

1897 for entry in dir_fd: 

1898 if ( 

1899 not entry.is_file(follow_symlinks=True) 

1900 or not entry.name.endswith(".json") 

1901 or entry.name in seen 

1902 ): 

1903 continue 

1904 seen.add(entry.name) 

1905 try: 

1906 plugin_metadata = parse_json_plugin_desc(entry.path) 

1907 except PluginBaseError as e: 

1908 if debug_mode: 

1909 raise 

1910 if not error_seen: 

1911 error_seen = True 

1912 _warn( 

1913 f"Failed to load the plugin in {entry.path} due to the following error: {e.message}" 

1914 ) 

1915 else: 

1916 _warn( 

1917 f"Failed to load plugin in {entry.path} due to errors (not shown)." 

1918 ) 

1919 else: 

1920 yield plugin_metadata 

1921 

1922 for pp_entry in PLUGIN_PYTHON_RES_PATH.iterdir(): 

1923 if ( 

1924 not pp_entry.name.endswith(".json") 

1925 or not pp_entry.is_file() 

1926 or pp_entry.name in seen 

1927 ): 

1928 continue 

1929 seen.add(pp_entry.name) 

1930 with pp_entry.open() as fd: 

1931 yield parse_json_plugin_desc( 

1932 f"PYTHONPATH:debputy/plugins/{pp_entry.name}", 

1933 fd=fd, 

1934 is_from_python_path=True, 

1935 ) 

1936 

1937 

1938def _find_plugin_implementation_file( 

1939 plugin_name: str, 

1940 json_file_path: str, 

1941) -> tuple[str, str]: 

1942 guessed_module_basename = plugin_name.replace("-", "_") 

1943 module_name = f"debputy.plugins.{guessed_module_basename}" 

1944 module_fs_path = os.path.join( 

1945 os.path.dirname(json_file_path), f"{guessed_module_basename}.py" 

1946 ) 

1947 return module_name, module_fs_path 

1948 

1949 
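Concretely, the mapping above works like this (paths are example values):

    module_name, module_fs_path = _find_plugin_implementation_file(
        "foo-bar", "/usr/share/debputy/plugins/foo-bar.json"
    )
    assert module_name == "debputy.plugins.foo_bar"
    assert module_fs_path == "/usr/share/debputy/plugins/foo_bar.py"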

1950def _resolve_module_initializer( 

1951 plugin_name: str, 

1952 plugin_initializer_name: str, 

1953 module_name: str | None, 

1954 json_file_path: str, 

1955) -> PluginInitializationEntryPoint: 

1956 module = None 

1957 module_fs_path = None 

1958 if module_name is None:  [1958 ↛ 1986: condition on line 1958 was always true]

1959 module_name, module_fs_path = _find_plugin_implementation_file( 

1960 plugin_name, json_file_path 

1961 ) 

1962 if os.path.isfile(module_fs_path):  [1962 ↛ 1986: condition on line 1962 was always true]

1963 spec = importlib.util.spec_from_file_location(module_name, module_fs_path) 

1964 if spec is None:  [1964 ↛ 1965: condition on line 1964 was never true]

1965 raise PluginInitializationError( 

1966 f"Failed to load {plugin_name} (path: {module_fs_path})." 

1967 " The spec_from_file_location function returned None." 

1968 ) 

1969 mod = importlib.util.module_from_spec(spec) 

1970 loader = spec.loader 

1971 if loader is None:  [1971 ↛ 1972: condition on line 1971 was never true]

1972 raise PluginInitializationError( 

1973 f"Failed to load {plugin_name} (path: {module_fs_path})." 

1974 " Python could not find a suitable loader (spec.loader was None)" 

1975 ) 

1976 sys.modules[module_name] = mod 

1977 try: 

1978 run_in_context_of_plugin(plugin_name, loader.exec_module, mod) 

1979 except (Exception, GeneratorExit) as e: 

1980 raise PluginInitializationError( 

1981 f"Failed to load {plugin_name} (path: {module_fs_path})." 

1982 " The module threw an exception while being loaded." 

1983 ) from e 

1984 module = mod 

1985 

1986 if module is None:  [1986 ↛ 1987: condition on line 1986 was never true]

1987 try: 

1988 module = run_in_context_of_plugin( 

1989 plugin_name, importlib.import_module, module_name 

1990 ) 

1991 except ModuleNotFoundError as e: 

1992 if module_fs_path is None: 

1993 raise PluginMetadataError( 

1994 f'The plugin defined in "{json_file_path}" wanted to load the module "{module_name}", but' 

1995 " this module is not available in the python search path" 

1996 ) from e 

1997 raise PluginInitializationError( 

1998 f"Failed to load {plugin_name}. Tried loading it from" 

1999 f' "{module_fs_path}" (which did not exist) and PYTHONPATH as' 

2000 f" {module_name} (where it was not found either). Please ensure" 

2001 " the module code is installed in the correct spot or provide an" 

2002 f' explicit "module" definition in {json_file_path}.' 

2003 ) from e 

2004 

2005 plugin_initializer = run_in_context_of_plugin_wrap_errors( 

2006 plugin_name, 

2007 getattr, 

2008 module, 

2009 plugin_initializer_name, 

2010 None, 

2011 ) 

2012 

2013 if plugin_initializer is None:  [2013 ↛ 2014: condition on line 2013 was never true]

2014 raise PluginMetadataError( 

2015 f'The plugin defined in {json_file_path} claimed that module "{module_name}" would have an' 

2016 f' attribute called "{plugin_initializer_name}" to initialize the plugin. However, that attribute' 

2017 " does not exist or cannot be resolved. Please correct the plugin metadata or initializer name" 

2018 " in the Python module." 

2019 ) 

2020 if isinstance(plugin_initializer, DebputyPluginDefinition): 

2021 return plugin_initializer.initialize 

2022 if not callable(plugin_initializer):  [2022 ↛ 2023: condition on line 2022 was never true]

2023 raise PluginMetadataError( 

2024 f'The plugin defined in {json_file_path} claimed that module "{module_name}" would have an' 

2025 f' attribute called "{plugin_initializer_name}" for initializing the plugin. While that' 

2026 " attribute exists, it is neither a `DebputyPluginDefinition`" 

2027 " (`plugin_definition = define_debputy_plugin()`) nor is it `callable`" 

2028 " (`def initialize(api: DebputyPluginInitializer) -> None:`)." 

2029 ) 

2030 return cast("PluginInitializationEntryPoint", plugin_initializer) 

2031 

2032 
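The resolver above accepts either of two entry-point shapes in the plugin module, mirroring its error text; a sketch of both, where the attribute name must match the descriptor's plugin_initializer value and the function body is illustrative:

    # Shape 1: a plain callable.
    def initialize_example_plugin(api: DebputyPluginInitializer) -> None:
        api.manifest_variable("EXAMPLE_PLUGIN_DIR", "/usr/share/example-plugin")

    # Shape 2: a DebputyPluginDefinition; its .initialize method is used instead.
    # plugin_definition = define_debputy_plugin(...)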

2033def _json_plugin_loader( 

2034 plugin_name: str, 

2035 plugin_json_metadata: PluginJsonMetadata, 

2036 json_file_path: str, 

2037 attribute_path: AttributePath, 

2038) -> Callable[["DebputyPluginInitializer"], None]: 

2039 api_compat = plugin_json_metadata["api_compat_version"] 

2040 module_name = plugin_json_metadata.get("module") 

2041 plugin_initializer_name = plugin_json_metadata.get("plugin_initializer") 

2042 packager_provided_files_raw = plugin_json_metadata.get( 

2043 "packager_provided_files", [] 

2044 ) 

2045 manifest_variables_raw = plugin_json_metadata.get("manifest_variables") 

2046 known_packaging_files_raw = plugin_json_metadata.get("known_packaging_files") 

2047 if api_compat != 1:  [2047 ↛ 2048: condition on line 2047 was never true]

2048 raise PluginMetadataError( 

2049 f'The plugin defined in "{json_file_path}" requires API compat level {api_compat}, but this' 

2050 f" version of debputy only supports API compat version of 1" 

2051 ) 

2052 if plugin_initializer_name is not None and "." in plugin_initializer_name:  [2052 ↛ 2053: condition on line 2052 was never true]

2053 p = attribute_path["plugin_initializer"] 

2054 raise PluginMetadataError( 

2055 f'The "{p}" must not contain ".". Problematic file is "{json_file_path}".' 

2056 ) 

2057 

2058 plugin_initializers = [] 

2059 

2060 if plugin_initializer_name is not None: 

2061 plugin_initializer = _resolve_module_initializer( 

2062 plugin_name, 

2063 plugin_initializer_name, 

2064 module_name, 

2065 json_file_path, 

2066 ) 

2067 plugin_initializers.append(plugin_initializer) 

2068 

2069 if known_packaging_files_raw: 

2070 kpf_root_path = attribute_path["known_packaging_files"] 

2071 known_packaging_files = [] 

2072 for k, v in enumerate(known_packaging_files_raw): 

2073 kpf_path = kpf_root_path[k] 

2074 p = v.get("path") 

2075 if isinstance(p, str):  [2075 ↛ 2077: condition on line 2075 was always true]

2076 kpf_path.path_hint = p 

2077 if plugin_name.startswith("debputy-") and isinstance(v, dict): 2077 ↛ 2089line 2077 didn't jump to line 2089 because the condition on line 2077 was always true

2078 docs = v.get("documentation-uris") 

2079 if docs is not None and isinstance(docs, list): 

2080 docs = [ 

2081 ( 

2082 d.replace("@DEBPUTY_DOC_ROOT_DIR@", debputy_doc_root_dir()) 

2083 if isinstance(d, str) 

2084 else d 

2085 ) 

2086 for d in docs 

2087 ] 

2088 v["documentation-uris"] = docs 

2089 known_packaging_file: KnownPackagingFileInfo = ( 

2090 PLUGIN_KNOWN_PACKAGING_FILES_PARSER.parse_input( 

2091 v, 

2092 kpf_path, 

2093 ) 

2094 ) 

2095 known_packaging_files.append((kpf_path, known_packaging_file)) 

2096 

2097 def _initialize_json_provided_known_packaging_files( 

2098 api: DebputyPluginInitializerProvider, 

2099 ) -> None: 

2100 for p, details in known_packaging_files: 

2101 try: 

2102 api.known_packaging_files(details) 

2103 except ValueError as ex: 

2104 raise PluginMetadataError( 

2105 f"Error while processing {p.path} defined in {json_file_path}: {ex.args[0]}" 

2106 ) 

2107 

2108 plugin_initializers.append(_initialize_json_provided_known_packaging_files) 

2109 

2110 if manifest_variables_raw: 

2111 manifest_var_path = attribute_path["manifest_variables"] 

2112 manifest_variables = [ 

2113 PLUGIN_MANIFEST_VARS_PARSER.parse_input(p, manifest_var_path[i]) 

2114 for i, p in enumerate(manifest_variables_raw) 

2115 ] 

2116 

2117 def _initialize_json_provided_manifest_vars( 

2118 api: DebputyPluginInitializer, 

2119 ) -> None: 

2120 for idx, manifest_variable in enumerate(manifest_variables): 

2121 name = manifest_variable["name"] 

2122 value = manifest_variable["value"] 

2123 doc = manifest_variable.get("reference_documentation") 

2124 try: 

2125 api.manifest_variable( 

2126 name, value, variable_reference_documentation=doc 

2127 ) 

2128 except ValueError as ex: 

2129 var_path = manifest_var_path[idx] 

2130 raise PluginMetadataError( 

2131 f"Error while processing {var_path.path} defined in {json_file_path}: {ex.args[0]}" 

2132 ) 

2133 

2134 plugin_initializers.append(_initialize_json_provided_manifest_vars) 

2135 

2136 if packager_provided_files_raw: 

2137 ppf_path = attribute_path["packager_provided_files"] 

2138 ppfs = [ 

2139 PLUGIN_PPF_PARSER.parse_input(p, ppf_path[i]) 

2140 for i, p in enumerate(packager_provided_files_raw) 

2141 ] 

2142 

2143 def _initialize_json_provided_ppfs(api: DebputyPluginInitializer) -> None: 

2144 ppf: PackagerProvidedFileJsonDescription 

2145 for idx, ppf in enumerate(ppfs): 

2146 c = dict(ppf) 

2147 stem = ppf["stem"] 

2148 installed_path = ppf["installed_path"] 

2149 default_mode = ppf.get("default_mode") 

2150 ref_doc_dict = ppf.get("reference_documentation") 

2151 if default_mode is not None:  [2151 ↛ 2154: condition on line 2151 was always true]

2152 c["default_mode"] = default_mode.octal_mode 

2153 

2154 if ref_doc_dict is not None:  [2154 ↛ 2159: condition on line 2154 was always true]

2155 ref_doc = packager_provided_file_reference_documentation( 

2156 **ref_doc_dict 

2157 ) 

2158 else: 

2159 ref_doc = None 

2160 

2161 for k in [ 

2162 "stem", 

2163 "installed_path", 

2164 "reference_documentation", 

2165 ]: 

2166 try: 

2167 del c[k] 

2168 except KeyError: 

2169 pass 

2170 

2171 try: 

2172 api.packager_provided_file(stem, installed_path, reference_documentation=ref_doc, **c) # type: ignore 

2173 except ValueError as ex: 

2174 p_path = ppf_path[idx] 

2175 raise PluginMetadataError( 

2176 f"Error while processing {p_path.path} defined in {json_file_path}: {ex.args[0]}" 

2177 ) 

2178 

2179 plugin_initializers.append(_initialize_json_provided_ppfs) 

2180 

2181 if not plugin_initializers:  [2181 ↛ 2182: condition on line 2181 was never true]

2182 raise PluginMetadataError( 

2183 f"The plugin defined in {json_file_path} does not seem to provide features," 

2184 f" such as module + plugin-initializer or packager-provided-files." 

2185 ) 

2186 

2187 if len(plugin_initializers) == 1: 

2188 return plugin_initializers[0] 

2189 

2190 def _chain_loader(api: DebputyPluginInitializer) -> None: 

2191 for initializer in plugin_initializers: 

2192 initializer(api) 

2193 

2194 return _chain_loader 

2195 

2196 
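For reference, a sketch of the parsed metadata structure this loader consumes (the on-disk JSON is validated by PLUGIN_METADATA_PARSER before reaching this function; the values and which optional sections are present are illustrative):

    plugin_json_metadata = {
        "api_compat_version": 1,                        # must be 1
        "module": "debputy.plugins.example_plugin",     # optional; guessed from the name if omitted
        "plugin_initializer": "initialize_example_plugin",  # optional; must not contain "."
        "manifest_variables": [
            {"name": "EXAMPLE_PLUGIN_DIR", "value": "/usr/share/example-plugin"},
        ],
        # "packager_provided_files" and "known_packaging_files" follow their own
        # parsers (PLUGIN_PPF_PARSER / PLUGIN_KNOWN_PACKAGING_FILES_PARSER).
    }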

2197@overload 

2198@contextlib.contextmanager 

2199 def _open(  [2199 ↛ exit: didn't return from '_open']

2200 path: str, 

2201 fd: IO[AnyStr] | IOBase = ..., 

2202) -> Iterator[IO[AnyStr] | IOBase]: ... 

2203 

2204 

2205@overload 

2206@contextlib.contextmanager 

2207 def _open(path: str, fd: None = None) -> Iterator[IO[bytes]]: ...  [2207 ↛ exit: didn't return from '_open']

2208 

2209 

2210@contextlib.contextmanager 

2211def _open( 

2212 path: str, fd: IO[AnyStr] | IOBase | None = None 

2213) -> Iterator[IO[AnyStr] | IOBase]: 

2214 if fd is not None: 

2215 yield fd 

2216 else: 

2217 with open(path, "rb") as fd: 

2218 yield fd 

2219 

2220 

2221def _resolve_json_plugin_docs_path( 

2222 plugin_name: str, 

2223 plugin_path: str, 

2224) -> Traversable | Path | None: 

2225 plugin_dir = os.path.dirname(plugin_path) 

2226 return Path(os.path.join(plugin_dir, plugin_name + "_docs.yaml")) 

2227 

2228 

2229def parse_json_plugin_desc( 

2230 path: str, 

2231 *, 

2232 fd: IO[AnyStr] | IOBase | None = None, 

2233 is_from_python_path: bool = False, 

2234) -> DebputyPluginMetadata: 

2235 with _open(path, fd=fd) as rfd: 

2236 try: 

2237 raw = json.load(rfd) 

2238 except JSONDecodeError as e: 

2239 raise PluginMetadataError( 

2240 f'The plugin defined in "{path}" could not be parsed as valid JSON: {e.args[0]}' 

2241 ) from e 

2242 plugin_name = os.path.basename(path) 

2243 if plugin_name.endswith(".json"): 

2244 plugin_name = plugin_name[:-5] 

2245 elif plugin_name.endswith(".json.in"): 

2246 plugin_name = plugin_name[:-8] 

2247 

2248 if plugin_name == "debputy": 2248 ↛ 2250line 2248 didn't jump to line 2250 because the condition on line 2248 was never true

2249 # Provide a better error message than "The plugin has already loaded!?" 

2250 raise PluginMetadataError( 

2251 f'The plugin named {plugin_name} must be bundled with `debputy`. Please rename "{path}" so it does not' 

2252 f" clash with the bundled plugin of same name." 

2253 ) 

2254 

2255 attribute_path = AttributePath.root_path(raw) 

2256 

2257 try: 

2258 plugin_json_metadata = PLUGIN_METADATA_PARSER.parse_input( 

2259 raw, 

2260 attribute_path, 

2261 ) 

2262 except ManifestParseException as e: 

2263 raise PluginMetadataError( 

2264 f'The plugin defined in "{path}" was valid JSON but could not be parsed: {e.message}' 

2265 ) from e 

2266 api_compat = plugin_json_metadata["api_compat_version"] 

2267 

2268 return DebputyPluginMetadata( 

2269 plugin_name=plugin_name, 

2270 plugin_loader=lambda: _json_plugin_loader( 

2271 plugin_name, 

2272 plugin_json_metadata, 

2273 path, 

2274 attribute_path, 

2275 ), 

2276 api_compat_version=api_compat, 

2277 plugin_doc_path_resolver=lambda: _resolve_json_plugin_docs_path( 

2278 plugin_name, path 

2279 ), 

2280 plugin_initializer=None, 

2281 plugin_path=path, 

2282 is_from_python_path=is_from_python_path, 

2283 ) 

2284 

2285 

2286@dataclasses.dataclass(slots=True, frozen=True) 

2287class ServiceDefinitionImpl(ServiceDefinition[DSD]): 

2288 name: str 

2289 names: Sequence[str] 

2290 path: VirtualPath 

2291 type_of_service: str 

2292 service_scope: str 

2293 auto_enable_on_install: bool 

2294 auto_start_on_install: bool 

2295 on_upgrade: ServiceUpgradeRule 

2296 definition_source: str 

2297 is_plugin_provided_definition: bool 

2298 service_context: DSD | None 

2299 

2300 def replace(self, **changes: Any) -> "ServiceDefinitionImpl[DSD]": 

2301 return dataclasses.replace(self, **changes) 

2302 

2303 

2304class ServiceRegistryImpl(ServiceRegistry[DSD]): 

2305 __slots__ = ("_service_manager_details", "_service_definitions", "_seen_services") 

2306 

2307 def __init__(self, service_manager_details: ServiceManagerDetails) -> None: 

2308 self._service_manager_details = service_manager_details 

2309 self._service_definitions: list[ServiceDefinition[DSD]] = [] 

2310 self._seen_services: set[tuple[str, str, str]] = set() 

2311 

2312 @property 

2313 def detected_services(self) -> Sequence[ServiceDefinition[DSD]]: 

2314 return self._service_definitions 

2315 

2316 def register_service( 

2317 self, 

2318 path: VirtualPath, 

2319 name: str | list[str], 

2320 *, 

2321 type_of_service: str = "service", # "timer", etc. 

2322 service_scope: str = "system", 

2323 enable_by_default: bool = True, 

2324 start_by_default: bool = True, 

2325 default_upgrade_rule: ServiceUpgradeRule = "restart", 

2326 service_context: DSD | None = None, 

2327 ) -> None: 

2328 names = name if isinstance(name, list) else [name] 

2329 if len(names) < 1: 

2330 raise ValueError( 

2331 f"The service must have at least one name - {path.absolute} did not have any" 

2332 ) 

2333 for n in names: 

2334 key = (n, type_of_service, service_scope) 

2335 if key in self._seen_services: 

2336 raise PluginAPIViolationError( 

2337 f"The service manager (from {self._service_manager_details.plugin_metadata.plugin_name}) used" 

2338 f" the service name {n} (type: {type_of_service}, scope: {service_scope}) twice. This is not" 

2339 " allowed by the debputy plugin API." 

2340 ) 

2341 # TODO: We cannot create a service definition immediately once the manifest is involved 

2342 self._service_definitions.append( 

2343 ServiceDefinitionImpl( 

2344 names[0], 

2345 names, 

2346 path, 

2347 type_of_service, 

2348 service_scope, 

2349 enable_by_default, 

2350 start_by_default, 

2351 default_upgrade_rule, 

2352 f"Auto-detected by plugin {self._service_manager_details.plugin_metadata.plugin_name}", 

2353 True, 

2354 service_context, 

2355 ) 

2356 )