Coverage for src/debputy/plugin/api/impl.py: 60%

910 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2026-01-04 10:15 +0000

1import contextlib 

2import dataclasses 

3import functools 

4import importlib 

5import importlib.resources 

6import importlib.util 

7import inspect 

8import itertools 

9import json 

10import os 

11import re 

12import subprocess 

13import sys 

14from abc import ABC 

15from collections.abc import Callable, Iterable, Sequence, Iterator, Mapping, Container 

16from importlib.resources.abc import Traversable 

17from io import IOBase 

18from json import JSONDecodeError 

19from pathlib import Path 

20from types import NoneType 

21from typing import ( 

22 IO, 

23 AbstractSet, 

24 cast, 

25 Any, 

26 Literal, 

27 TYPE_CHECKING, 

28 is_typeddict, 

29 AnyStr, 

30 overload, 

31) 

32 

33import debputy 

34from debputy import DEBPUTY_DOC_ROOT_DIR 

35from debputy.exceptions import ( 

36 DebputySubstitutionError, 

37 PluginConflictError, 

38 PluginMetadataError, 

39 PluginBaseError, 

40 PluginInitializationError, 

41 PluginAPIViolationError, 

42 PluginNotFoundError, 

43 PluginIncorrectRegistrationError, 

44) 

45from debputy.maintscript_snippet import ( 

46 STD_CONTROL_SCRIPTS, 

47 MaintscriptSnippetContainer, 

48 MaintscriptSnippet, 

49) 

50from debputy.manifest_parser.exceptions import ManifestParseException 

51from debputy.manifest_parser.parser_data import ParserContextData 

52from debputy.manifest_parser.tagging_types import TypeMapping 

53from debputy.manifest_parser.util import AttributePath 

54from debputy.manifest_parser.util import resolve_package_type_selectors 

55from debputy.plugin.api.doc_parsing import ( 

56 DEBPUTY_DOC_REFERENCE_DATA_PARSER, 

57 parser_type_name, 

58 DebputyParsedDoc, 

59) 

60from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

61from debputy.plugin.api.impl_types import ( 

62 DebputyPluginMetadata, 

63 PackagerProvidedFileClassSpec, 

64 MetadataOrMaintscriptDetector, 

65 PluginProvidedTrigger, 

66 TTP, 

67 DIPHandler, 

68 PF, 

69 SF, 

70 DIPKWHandler, 

71 PluginProvidedManifestVariable, 

72 PluginProvidedPackageProcessor, 

73 PluginProvidedDiscardRule, 

74 AutomaticDiscardRuleExample, 

75 PPFFormatParam, 

76 ServiceManagerDetails, 

77 KnownPackagingFileInfo, 

78 PluginProvidedKnownPackagingFile, 

79 DHCompatibilityBasedRule, 

80 PluginProvidedTypeMapping, 

81 PluginProvidedBuildSystemAutoDetection, 

82 BSR, 

83 TP, 

84) 

85from debputy.plugin.api.plugin_parser import ( 

86 PLUGIN_METADATA_PARSER, 

87 PluginJsonMetadata, 

88 PLUGIN_PPF_PARSER, 

89 PackagerProvidedFileJsonDescription, 

90 PLUGIN_MANIFEST_VARS_PARSER, 

91 PLUGIN_KNOWN_PACKAGING_FILES_PARSER, 

92) 

93from debputy.plugin.api.spec import ( 

94 MaintscriptAccessor, 

95 Maintscript, 

96 DpkgTriggerType, 

97 BinaryCtrlAccessor, 

98 PackageProcessingContext, 

99 MetadataAutoDetector, 

100 PluginInitializationEntryPoint, 

101 DebputyPluginInitializer, 

102 PackageTypeSelector, 

103 FlushableSubstvars, 

104 ParserDocumentation, 

105 PackageProcessor, 

106 VirtualPath, 

107 ServiceIntegrator, 

108 ServiceDetector, 

109 ServiceRegistry, 

110 ServiceDefinition, 

111 DSD, 

112 ServiceUpgradeRule, 

113 PackagerProvidedFileReferenceDocumentation, 

114 packager_provided_file_reference_documentation, 

115 TypeMappingDocumentation, 

116 DebputyIntegrationMode, 

117 _DEBPUTY_DISPATCH_METADATA_ATTR_NAME, 

118 BuildSystemManifestRuleMetadata, 

119 INTEGRATION_MODE_FULL, 

120 only_integrations, 

121 DebputyPluginDefinition, 

122) 

123from debputy.plugin.api.std_docs import _STD_ATTR_DOCS 

124from debputy.plugin.plugin_state import ( 

125 run_in_context_of_plugin, 

126 run_in_context_of_plugin_wrap_errors, 

127 wrap_plugin_code, 

128 register_manifest_type_value_in_context, 

129) 

130from debputy.plugins.debputy.to_be_api_types import ( 

131 BuildRuleParsedFormat, 

132 BSPF, 

133 debputy_build_system, 

134) 

135from debputy.substitution import ( 

136 Substitution, 

137 VariableNameState, 

138 SUBST_VAR_RE, 

139 VariableContext, 

140) 

141from debputy.util import ( 

142 _normalize_path, 

143 POSTINST_DEFAULT_CONDITION, 

144 _error, 

145 print_command, 

146 _warn, 

147 _debug_log, 

148) 

149from debputy.yaml import MANIFEST_YAML 

150 

151if TYPE_CHECKING: 

152 from debputy.highlevel_manifest import HighLevelManifest 

153 

154PLUGIN_TEST_SUFFIX = re.compile(r"_(?:t|test|check)(?:_([a-z0-9_]+))?[.]py$") 

155PLUGIN_PYTHON_RES_PATH = importlib.resources.files(debputy.plugins.__name__) 

156 

157 

158def _validate_known_packaging_file_dh_compat_rules( 

159 dh_compat_rules: list[DHCompatibilityBasedRule] | None, 

160) -> None: 

161 max_compat = None 

162 if not dh_compat_rules: 162 ↛ 165line 162 didn't jump to line 165 because the condition on line 162 was always true

163 return 

164 dh_compat_rule: DHCompatibilityBasedRule 

165 for idx, dh_compat_rule in enumerate(dh_compat_rules): 

166 dh_version = dh_compat_rule.get("starting_with_debhelper_version") 

167 compat = dh_compat_rule.get("starting_with_compat_level") 

168 

169 remaining = dh_compat_rule.keys() - { 

170 "after_debhelper_version", 

171 "starting_with_compat_level", 

172 } 

173 if not remaining: 

174 raise ValueError( 

175 f"The dh compat-rule at index {idx} does not affect anything / not have any rules!? So why have it?" 

176 ) 

177 if dh_version is None and compat is None and idx < len(dh_compat_rules) - 1: 

178 raise ValueError( 

179 f"The dh compat-rule at index {idx} is not the last and is missing either" 

180 " before-debhelper-version or before-compat-level" 

181 ) 

182 if compat is not None and compat < 0: 

183 raise ValueError( 

184 f"There is no compat below 1 but dh compat-rule at {idx} wants to declare some rule" 

185 f" for something that appeared when migrating from {compat} to {compat + 1}." 

186 ) 

187 

188 if max_compat is None: 

189 max_compat = compat 

190 elif compat is not None: 

191 if compat >= max_compat: 

192 raise ValueError( 

193 f"The dh compat-rule at {idx} should be moved earlier than the entry for compat {max_compat}." 

194 ) 

195 max_compat = compat 

196 

197 install_pattern = dh_compat_rule.get("install_pattern") 

198 if ( 

199 install_pattern is not None 

200 and _normalize_path(install_pattern, with_prefix=False) != install_pattern 

201 ): 

202 raise ValueError( 

203 f"The install-pattern in dh compat-rule at {idx} must be normalized as" 

204 f' "{_normalize_path(install_pattern, with_prefix=False)}".' 

205 ) 

206 

207 

208class DebputyPluginInitializerProvider(DebputyPluginInitializer): 

209 __slots__ = ( 

210 "_plugin_metadata", 

211 "_feature_set", 

212 "_plugin_detector_ids", 

213 "_substitution", 

214 "_unloaders", 

215 "_is_doc_cache_resolved", 

216 "_doc_cache", 

217 "_registered_manifest_types", 

218 "_load_started", 

219 ) 

220 

221 def __init__( 

222 self, 

223 plugin_metadata: DebputyPluginMetadata, 

224 feature_set: PluginProvidedFeatureSet, 

225 substitution: Substitution, 

226 ) -> None: 

227 self._plugin_metadata: DebputyPluginMetadata = plugin_metadata 

228 self._feature_set = feature_set 

229 self._plugin_detector_ids: set[str] = set() 

230 self._substitution = substitution 

231 self._unloaders: list[Callable[[], None]] = [] 

232 self._is_doc_cache_resolved: bool = False 

233 self._doc_cache: DebputyParsedDoc | None = None 

234 self._registered_manifest_types: dict[type[Any], DebputyPluginMetadata] = {} 

235 self._load_started = False 

236 

237 @property 

238 def plugin_metadata(self) -> DebputyPluginMetadata: 

239 return self._plugin_metadata 

240 

241 def unload_plugin(self) -> None: 

242 if self._load_started: 

243 for unloader in self._unloaders: 

244 unloader() 

245 del self._feature_set.plugin_data[self._plugin_name] 

246 

247 def load_plugin(self) -> None: 

248 metadata = self._plugin_metadata 

249 if metadata.plugin_name in self._feature_set.plugin_data: 249 ↛ 250line 249 didn't jump to line 250 because the condition on line 249 was never true

250 raise PluginConflictError( 

251 f'The plugin "{metadata.plugin_name}" has already been loaded!?' 

252 ) 

253 assert ( 

254 metadata.api_compat_version == 1 

255 ), f"Unsupported plugin API compat version {metadata.api_compat_version}" 

256 self._feature_set.plugin_data[metadata.plugin_name] = metadata 

257 self._load_started = True 

258 assert not metadata.is_initialized 

259 try: 

260 metadata.initialize_plugin(self) 

261 except Exception as e: 

262 initializer = metadata.plugin_initializer 

263 if ( 263 ↛ 268line 263 didn't jump to line 268 because the condition on line 263 was never true

264 isinstance(e, TypeError) 

265 and initializer is not None 

266 and not callable(initializer) 

267 ): 

268 raise PluginMetadataError( 

269 f"The specified entry point for plugin {metadata.plugin_name} does not appear to be a" 

270 f" callable (callable returns False). The specified entry point identifies" 

271 f' itself as "{initializer.__qualname__}".' 

272 ) from e 

273 elif isinstance(e, PluginBaseError): 273 ↛ 275line 273 didn't jump to line 275 because the condition on line 273 was always true

274 raise 

275 raise PluginInitializationError( 

276 f"Exception while attempting to load plugin {metadata.plugin_name}" 

277 ) from e 

278 

279 def _resolve_docs(self) -> DebputyParsedDoc | None: 

280 doc_cache = self._doc_cache 

281 if doc_cache is not None: 

282 return doc_cache 

283 

284 plugin_doc_path = self._plugin_metadata.plugin_doc_path 

285 if plugin_doc_path is None or self._is_doc_cache_resolved: 

286 self._is_doc_cache_resolved = True 

287 return None 

288 try: 

289 with plugin_doc_path.open("r", encoding="utf-8") as fd: 

290 raw = MANIFEST_YAML.load(fd) 

291 except FileNotFoundError: 

292 _debug_log( 

293 f"No documentation file found for {self._plugin_name}. Expected it at {plugin_doc_path}" 

294 ) 

295 self._is_doc_cache_resolved = True 

296 return None 

297 attr_path = AttributePath.root_path(plugin_doc_path) 

298 try: 

299 ref = DEBPUTY_DOC_REFERENCE_DATA_PARSER.parse_input(raw, attr_path) 

300 except ManifestParseException as e: 

301 raise ValueError( 

302 f"Could not parse documentation in {plugin_doc_path}: {e.message}" 

303 ) from e 

304 try: 

305 res = DebputyParsedDoc.from_ref_data(ref) 

306 except ValueError as e: 

307 raise ValueError( 

308 f"Could not parse documentation in {plugin_doc_path}: {e.args[0]}" 

309 ) from e 

310 

311 self._doc_cache = res 

312 self._is_doc_cache_resolved = True 

313 return res 

314 

315 def _pluggable_manifest_docs_for( 

316 self, 

317 rule_type: TTP | str, 

318 rule_name: str | list[str], 

319 *, 

320 inline_reference_documentation: ParserDocumentation | None = None, 

321 ) -> ParserDocumentation | None: 

322 ref_data = self._resolve_docs() 

323 if ref_data is not None: 

324 primary_rule_name = ( 

325 rule_name if isinstance(rule_name, str) else rule_name[0] 

326 ) 

327 rule_ref = f"{parser_type_name(rule_type)}::{primary_rule_name}" 

328 resolved_docs = ref_data.pluggable_manifest_rules.get(rule_ref) 

329 if resolved_docs is not None: 

330 if inline_reference_documentation is not None: 330 ↛ 331line 330 didn't jump to line 331 because the condition on line 330 was never true

331 raise ValueError( 

332 f"Conflicting docs for {rule_ref}: Was provided one in the API call and one via" 

333 f" {self._plugin_metadata.plugin_doc_path}. Please remove one of the two, so" 

334 f" there is only one doc reference" 

335 ) 

336 return resolved_docs 

337 return inline_reference_documentation 

338 

339 def packager_provided_file( 

340 self, 

341 stem: str, 

342 installed_path: str, 

343 *, 

344 default_mode: int = 0o0644, 

345 default_priority: int | None = None, 

346 allow_name_segment: bool = True, 

347 allow_architecture_segment: bool = False, 

348 post_formatting_rewrite: Callable[[str], str] | None = None, 

349 packageless_is_fallback_for_all_packages: bool = False, 

350 reservation_only: bool = False, 

351 format_callback: None | ( 

352 Callable[[str, PPFFormatParam, VirtualPath], str] 

353 ) = None, 

354 reference_documentation: None | ( 

355 PackagerProvidedFileReferenceDocumentation 

356 ) = None, 

357 ) -> None: 

358 packager_provided_files = self._feature_set.packager_provided_files 

359 existing = packager_provided_files.get(stem) 

360 

361 if format_callback is not None and self._plugin_name != "debputy": 361 ↛ 362line 361 didn't jump to line 362 because the condition on line 361 was never true

362 raise ValueError( 

363 "Sorry; Using format_callback is a debputy-internal" 

364 f" API. Triggered by plugin {self._plugin_name}" 

365 ) 

366 

367 if installed_path.endswith("/"): 367 ↛ 368line 367 didn't jump to line 368 because the condition on line 367 was never true

368 raise ValueError( 

369 f'The installed_path ends with "/" indicating it is a directory, but it must be a file.' 

370 f" Triggered by plugin {self._plugin_name}." 

371 ) 

372 

373 installed_path = _normalize_path(installed_path) 

374 

375 has_name_var = "{name}" in installed_path 

376 

377 if installed_path.startswith("./DEBIAN") or reservation_only: 

378 # Special-case, used for control files. 

379 if self._plugin_name != "debputy": 379 ↛ 380line 379 didn't jump to line 380 because the condition on line 379 was never true

380 raise ValueError( 

381 "Sorry; Using DEBIAN as install path or/and reservation_only is a debputy-internal" 

382 f" API. Triggered by plugin {self._plugin_name}" 

383 ) 

384 elif not has_name_var and "{owning_package}" not in installed_path: 384 ↛ 385line 384 didn't jump to line 385 because the condition on line 384 was never true

385 raise ValueError( 

386 'The installed_path must contain a "{name}" (preferred) or a "{owning_package}"' 

387 " substitution (or have installed_path end with a slash). Otherwise, the installed" 

388 f" path would caused file-conflicts. Triggered by plugin {self._plugin_name}" 

389 ) 

390 

391 if allow_name_segment and not has_name_var: 391 ↛ 392line 391 didn't jump to line 392 because the condition on line 391 was never true

392 raise ValueError( 

393 'When allow_name_segment is True, the installed_path must have a "{name}" substitution' 

394 " variable. Otherwise, the name segment will not work properly. Triggered by" 

395 f" plugin {self._plugin_name}" 

396 ) 

397 

398 if ( 398 ↛ 403line 398 didn't jump to line 403 because the condition on line 398 was never true

399 default_priority is not None 

400 and "{priority}" not in installed_path 

401 and "{priority:02}" not in installed_path 

402 ): 

403 raise ValueError( 

404 'When default_priority is not None, the installed_path should have a "{priority}"' 

405 ' or a "{priority:02}" substitution variable. Otherwise, the priority would be lost.' 

406 f" Triggered by plugin {self._plugin_name}" 

407 ) 

408 

409 if existing is not None: 

410 if existing.debputy_plugin_metadata.plugin_name != self._plugin_name: 410 ↛ 417line 410 didn't jump to line 417 because the condition on line 410 was always true

411 message = ( 

412 f'The stem "{stem}" is registered twice for packager provided files.' 

413 f" Once by {existing.debputy_plugin_metadata.plugin_name} and once" 

414 f" by {self._plugin_name}" 

415 ) 

416 else: 

417 message = ( 

418 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

419 f' stem "{stem}" twice for packager provided files.' 

420 ) 

421 raise PluginConflictError( 

422 message, existing.debputy_plugin_metadata, self._plugin_metadata 

423 ) 

424 packager_provided_files[stem] = PackagerProvidedFileClassSpec( 

425 self._plugin_metadata, 

426 stem, 

427 installed_path, 

428 default_mode=default_mode, 

429 default_priority=default_priority, 

430 allow_name_segment=allow_name_segment, 

431 allow_architecture_segment=allow_architecture_segment, 

432 post_formatting_rewrite=post_formatting_rewrite, 

433 packageless_is_fallback_for_all_packages=packageless_is_fallback_for_all_packages, 

434 reservation_only=reservation_only, 

435 formatting_callback=format_callback, 

436 reference_documentation=reference_documentation, 

437 ) 

438 

439 def _unload() -> None: 

440 del packager_provided_files[stem] 

441 

442 self._unloaders.append(_unload) 

443 

444 def metadata_or_maintscript_detector( 

445 self, 

446 auto_detector_id: str, 

447 auto_detector: MetadataAutoDetector, 

448 *, 

449 package_type: PackageTypeSelector = "deb", 

450 ) -> None: 

451 if auto_detector_id in self._plugin_detector_ids: 451 ↛ 452line 451 didn't jump to line 452 because the condition on line 451 was never true

452 raise ValueError( 

453 f"The plugin {self._plugin_name} tried to register" 

454 f' "{auto_detector_id}" twice' 

455 ) 

456 self._plugin_detector_ids.add(auto_detector_id) 

457 all_detectors = self._feature_set.metadata_maintscript_detectors 

458 if self._plugin_name not in all_detectors: 

459 all_detectors[self._plugin_name] = [] 

460 package_types = resolve_package_type_selectors(package_type) 

461 all_detectors[self._plugin_name].append( 

462 MetadataOrMaintscriptDetector( 

463 detector_id=auto_detector_id, 

464 detector=wrap_plugin_code(self._plugin_name, auto_detector), 

465 plugin_metadata=self._plugin_metadata, 

466 applies_to_package_types=package_types, 

467 enabled=True, 

468 ) 

469 ) 

470 

471 def _unload() -> None: 

472 if self._plugin_name in all_detectors: 

473 del all_detectors[self._plugin_name] 

474 

475 self._unloaders.append(_unload) 

476 

477 def document_builtin_variable( 

478 self, 

479 variable_name: str, 

480 variable_reference_documentation: str, 

481 *, 

482 is_context_specific: bool = False, 

483 is_for_special_case: bool = False, 

484 ) -> None: 

485 manifest_variables = self._feature_set.manifest_variables 

486 self._restricted_api() 

487 state = self._substitution.variable_state(variable_name) 

488 if state == VariableNameState.UNDEFINED: 488 ↛ 489line 488 didn't jump to line 489 because the condition on line 488 was never true

489 raise ValueError( 

490 f"The plugin {self._plugin_name} attempted to document built-in {variable_name}," 

491 f" but it is not known to be a variable" 

492 ) 

493 

494 assert variable_name not in manifest_variables 

495 

496 manifest_variables[variable_name] = PluginProvidedManifestVariable( 

497 self._plugin_metadata, 

498 variable_name, 

499 None, 

500 is_context_specific_variable=is_context_specific, 

501 variable_reference_documentation=variable_reference_documentation, 

502 is_documentation_placeholder=True, 

503 is_for_special_case=is_for_special_case, 

504 ) 

505 

506 def _unload() -> None: 

507 del manifest_variables[variable_name] 

508 

509 self._unloaders.append(_unload) 

510 

511 def manifest_variable_provider( 

512 self, 

513 provider: Callable[[VariableContext], Mapping[str, str]], 

514 variables: Sequence[str] | Mapping[str, str | None], 

515 ) -> None: 

516 self._restricted_api() 

517 cached_provider = functools.lru_cache(None)(provider) 

518 permitted_variables = frozenset(variables) 

519 variables_iter: Iterable[tuple[str, str | None]] 

520 if not isinstance(variables, Mapping): 520 ↛ 521line 520 didn't jump to line 521 because the condition on line 520 was never true

521 variables_iter = zip(variables, itertools.repeat(None)) 

522 else: 

523 variables_iter = variables.items() 

524 

525 checked_vars = False 

526 manifest_variables = self._feature_set.manifest_variables 

527 plugin_name = self._plugin_name 

528 

529 def _value_resolver_generator( 

530 variable_name: str, 

531 ) -> Callable[[VariableContext], str]: 

532 def _value_resolver(variable_context: VariableContext) -> str: 

533 res = cached_provider(variable_context) 

534 nonlocal checked_vars 

535 if not checked_vars: 535 ↛ 546line 535 didn't jump to line 546 because the condition on line 535 was always true

536 if permitted_variables != res.keys(): 536 ↛ 537line 536 didn't jump to line 537 because the condition on line 536 was never true

537 expected = ", ".join(sorted(permitted_variables)) 

538 actual = ", ".join(sorted(res)) 

539 raise PluginAPIViolationError( 

540 f"The plugin {plugin_name} claimed to provide" 

541 f" the following variables {expected}," 

542 f" but when resolving the variables, the plugin provided" 

543 f" {actual}. These two lists should have been the same." 

544 ) 

545 checked_vars = False 

546 return res[variable_name] 

547 

548 return _value_resolver 

549 

550 for varname, vardoc in variables_iter: 

551 self._check_variable_name(varname) 

552 manifest_variables[varname] = PluginProvidedManifestVariable( 

553 self._plugin_metadata, 

554 varname, 

555 _value_resolver_generator(varname), 

556 is_context_specific_variable=False, 

557 variable_reference_documentation=vardoc, 

558 ) 

559 

560 def _unload() -> None: 

561 raise PluginInitializationError( 

562 "Cannot unload manifest_variable_provider (not implemented)" 

563 ) 

564 

565 self._unloaders.append(_unload) 

566 

567 def _check_variable_name(self, variable_name: str) -> None: 

568 manifest_variables = self._feature_set.manifest_variables 

569 existing = manifest_variables.get(variable_name) 

570 

571 if existing is not None: 

572 if existing.plugin_metadata.plugin_name == self._plugin_name: 572 ↛ 578line 572 didn't jump to line 578 because the condition on line 572 was always true

573 message = ( 

574 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

575 f' manifest variable "{variable_name}" twice.' 

576 ) 

577 else: 

578 message = ( 

579 f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}" 

580 f" both tried to provide the manifest variable {variable_name}" 

581 ) 

582 raise PluginConflictError( 

583 message, existing.plugin_metadata, self._plugin_metadata 

584 ) 

585 if not SUBST_VAR_RE.match("{{" + variable_name + "}}"): 

586 raise ValueError( 

587 f"The plugin {self._plugin_name} attempted to declare {variable_name}," 

588 f" which is not a valid variable name" 

589 ) 

590 

591 namespace = "" 

592 variable_basename = variable_name 

593 if ":" in variable_name: 

594 namespace, variable_basename = variable_name.rsplit(":", 1) 

595 assert namespace != "" 

596 assert variable_name != "" 

597 

598 if namespace != "" and namespace not in ("token", "path"): 

599 raise ValueError( 

600 f"The plugin {self._plugin_name} attempted to declare {variable_name}," 

601 f" which is in the reserved namespace {namespace}" 

602 ) 

603 

604 variable_name_upper = variable_name.upper() 

605 if ( 

606 variable_name_upper.startswith(("DEB_", "DPKG_", "DEBPUTY")) 

607 or variable_basename.startswith("_") 

608 or variable_basename.upper().startswith("DEBPUTY") 

609 ) and self._plugin_name != "debputy": 

610 raise ValueError( 

611 f"The plugin {self._plugin_name} attempted to declare {variable_name}," 

612 f" which is a variable name reserved by debputy" 

613 ) 

614 

615 state = self._substitution.variable_state(variable_name) 

616 if state != VariableNameState.UNDEFINED and self._plugin_name != "debputy": 

617 raise ValueError( 

618 f"The plugin {self._plugin_name} attempted to declare {variable_name}," 

619 f" which would shadow a built-in variable" 

620 ) 

621 

622 def package_processor( 

623 self, 

624 processor_id: str, 

625 processor: PackageProcessor, 

626 *, 

627 depends_on_processor: Iterable[str] = tuple(), 

628 package_type: PackageTypeSelector = "deb", 

629 ) -> None: 

630 self._restricted_api(allowed_plugins={"lua", "debputy-self-hosting"}) 

631 package_processors = self._feature_set.all_package_processors 

632 dependencies = set() 

633 processor_key = (self._plugin_name, processor_id) 

634 

635 if processor_key in package_processors: 635 ↛ 636line 635 didn't jump to line 636 because the condition on line 635 was never true

636 raise PluginConflictError( 

637 f"The plugin {self._plugin_name} already registered a processor with id {processor_id}", 

638 self._plugin_metadata, 

639 self._plugin_metadata, 

640 ) 

641 

642 for depends_ref in depends_on_processor: 

643 if isinstance(depends_ref, str): 643 ↛ 657line 643 didn't jump to line 657 because the condition on line 643 was always true

644 if (self._plugin_name, depends_ref) in package_processors: 644 ↛ 646line 644 didn't jump to line 646 because the condition on line 644 was always true

645 depends_key = (self._plugin_name, depends_ref) 

646 elif ("debputy", depends_ref) in package_processors: 

647 depends_key = ("debputy", depends_ref) 

648 else: 

649 raise ValueError( 

650 f'Could not resolve dependency "{depends_ref}" for' 

651 f' "{processor_id}". It was not provided by the plugin itself' 

652 f" ({self._plugin_name}) nor debputy." 

653 ) 

654 else: 

655 # TODO: Add proper dependencies first, at which point we should probably resolve "name" 

656 # via the direct dependencies. 

657 assert False 

658 

659 existing_processor = package_processors.get(depends_key) 

660 if existing_processor is None: 660 ↛ 663line 660 didn't jump to line 663 because the condition on line 660 was never true

661 # We currently require the processor to be declared already. If this ever changes, 

662 # PluginProvidedFeatureSet.package_processors_in_order will need an update 

663 dplugin_name, dprocessor_name = depends_key 

664 available_processors = ", ".join( 

665 n for p, n in package_processors.keys() if p == dplugin_name 

666 ) 

667 raise ValueError( 

668 f"The plugin {dplugin_name} does not provide a processor called" 

669 f" {dprocessor_name}. Available processors for that plugin are:" 

670 f" {available_processors}" 

671 ) 

672 dependencies.add(depends_key) 

673 

674 package_processors[processor_key] = PluginProvidedPackageProcessor( 

675 processor_id, 

676 resolve_package_type_selectors(package_type), 

677 wrap_plugin_code(self._plugin_name, processor), 

678 frozenset(dependencies), 

679 self._plugin_metadata, 

680 ) 

681 

682 def _unload() -> None: 

683 del package_processors[processor_key] 

684 

685 self._unloaders.append(_unload) 

686 

687 def automatic_discard_rule( 

688 self, 

689 name: str, 

690 should_discard: Callable[[VirtualPath], bool], 

691 *, 

692 rule_reference_documentation: str | None = None, 

693 examples: ( 

694 AutomaticDiscardRuleExample | Sequence[AutomaticDiscardRuleExample] 

695 ) = tuple(), 

696 ) -> None: 

697 """Register an automatic discard rule 

698 

699 An automatic discard rule is basically applied to *every* path about to be installed in to any package. 

700 If any discard rule concludes that a path should not be installed, then the path is not installed. 

701 In the case where the discard path is a: 

702 

703 * directory: Then the entire directory is excluded along with anything beneath it. 

704 * symlink: Then the symlink itself (but not its target) is excluded. 

705 * hardlink: Then the current hardlink will not be installed, but other instances of it will be. 

706 

707 Note: Discarded files are *never* deleted by `debputy`. They just make `debputy` skip the file. 

708 

709 Automatic discard rules should be written with the assumption that directories will be tested 

710 before their content *when it is relevant* for the discard rule to examine whether the directory 

711 can be excluded. 

712 

713 The packager can via the manifest overrule automatic discard rules by explicitly listing the path 

714 without any globs. As example: 

715 

716 installations: 

717 - install: 

718 sources: 

719 - usr/lib/libfoo.la # <-- This path is always installed 

720 # (Discard rules are never asked in this case) 

721 # 

722 - usr/lib/*.so* # <-- Discard rules applies to any path beneath usr/lib and can exclude matches 

723 # Though, they will not examine `libfoo.la` as it has already been installed 

724 # 

725 # Note: usr/lib itself is never tested in this case (it is assumed to be 

726 # explicitly requested). But any subdir of usr/lib will be examined. 

727 

728 When an automatic discard rule is evaluated, it can see the source path currently being considered 

729 for installation. While it can look at "surrounding" context (like parent directory), it will not 

730 know whether those paths are to be installed or will be installed. 

731 

732 :param name: A user visible name discard rule. It can be used on the command line, so avoid shell 

733 metacharacters and spaces. 

734 :param should_discard: A callable that is the implementation of the automatic discard rule. It will receive 

735 a VirtualPath representing the *source* path about to be installed. If callable returns `True`, then the 

736 path is discarded. If it returns `False`, the path is not discarded (by this rule at least). 

737 A source path will either be from the root of the source tree or the root of a search directory such as 

738 `debian/tmp`. Where the path will be installed is not available at the time the discard rule is 

739 evaluated. 

740 :param rule_reference_documentation: Optionally, the reference documentation to be shown when a user 

741 looks up this automatic discard rule. 

742 :param examples: Provide examples for the rule. Use the automatic_discard_rule_example function to 

743 generate the examples. 

744 

745 """ 

746 self._restricted_api() 

747 auto_discard_rules = self._feature_set.auto_discard_rules 

748 existing = auto_discard_rules.get(name) 

749 if existing is not None: 749 ↛ 750line 749 didn't jump to line 750 because the condition on line 749 was never true

750 if existing.plugin_metadata.plugin_name == self._plugin_name: 

751 message = ( 

752 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

753 f' automatic discard rule "{name}" twice.' 

754 ) 

755 else: 

756 message = ( 

757 f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}" 

758 f" both tried to provide the automatic discard rule {name}" 

759 ) 

760 raise PluginConflictError( 

761 message, existing.plugin_metadata, self._plugin_metadata 

762 ) 

763 examples = ( 

764 (examples,) 

765 if isinstance(examples, AutomaticDiscardRuleExample) 

766 else tuple(examples) 

767 ) 

768 auto_discard_rules[name] = PluginProvidedDiscardRule( 

769 name, 

770 self._plugin_metadata, 

771 should_discard, 

772 rule_reference_documentation, 

773 examples, 

774 ) 

775 

776 def _unload() -> None: 

777 del auto_discard_rules[name] 

778 

779 self._unloaders.append(_unload) 

780 

781 def service_provider( 

782 self, 

783 service_manager: str, 

784 detector: ServiceDetector, 

785 integrator: ServiceIntegrator, 

786 ) -> None: 

787 self._restricted_api() 

788 service_managers = self._feature_set.service_managers 

789 existing = service_managers.get(service_manager) 

790 if existing is not None: 790 ↛ 791line 790 didn't jump to line 791 because the condition on line 790 was never true

791 if existing.plugin_metadata.plugin_name == self._plugin_name: 

792 message = ( 

793 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

794 f' service manager "{service_manager}" twice.' 

795 ) 

796 else: 

797 message = ( 

798 f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}" 

799 f' both tried to provide the service manager "{service_manager}"' 

800 ) 

801 raise PluginConflictError( 

802 message, existing.plugin_metadata, self._plugin_metadata 

803 ) 

804 service_managers[service_manager] = ServiceManagerDetails( 

805 service_manager, 

806 wrap_plugin_code(self._plugin_name, detector), 

807 wrap_plugin_code(self._plugin_name, integrator), 

808 self._plugin_metadata, 

809 ) 

810 

811 def _unload() -> None: 

812 del service_managers[service_manager] 

813 

814 self._unloaders.append(_unload) 

815 

816 def manifest_variable( 

817 self, 

818 variable_name: str, 

819 value: str, 

820 *, 

821 variable_reference_documentation: str | None = None, 

822 ) -> None: 

823 self._check_variable_name(variable_name) 

824 manifest_variables = self._feature_set.manifest_variables 

825 try: 

826 resolved_value = self._substitution.substitute( 

827 value, "Plugin initialization" 

828 ) 

829 depends_on_variable = resolved_value != value 

830 except DebputySubstitutionError: 

831 depends_on_variable = True 

832 if depends_on_variable: 

833 raise ValueError( 

834 f"The plugin {self._plugin_name} attempted to declare {variable_name} with value {value!r}." 

835 f" This value depends on another variable, which is not supported. This restriction may be" 

836 f" lifted in the future." 

837 ) 

838 

839 manifest_variables[variable_name] = PluginProvidedManifestVariable( 

840 self._plugin_metadata, 

841 variable_name, 

842 value, 

843 is_context_specific_variable=False, 

844 variable_reference_documentation=variable_reference_documentation, 

845 ) 

846 

847 def _unload() -> None: 

848 # We need to check it was never resolved 

849 raise PluginInitializationError( 

850 "Cannot unload manifest_variable (not implemented)" 

851 ) 

852 

853 self._unloaders.append(_unload) 

854 

855 @property 

856 def _plugin_name(self) -> str: 

857 return self._plugin_metadata.plugin_name 

858 

859 def provide_manifest_keyword( 

860 self, 

861 rule_type: TTP, 

862 rule_name: str | list[str], 

863 handler: DIPKWHandler, 

864 *, 

865 inline_reference_documentation: ParserDocumentation | None = None, 

866 ) -> None: 

867 self._restricted_api() 

868 parser_generator = self._feature_set.manifest_parser_generator 

869 if rule_type not in parser_generator.dispatchable_table_parsers: 869 ↛ 870line 869 didn't jump to line 870 because the condition on line 869 was never true

870 types = ", ".join( 

871 sorted(x.__name__ for x in parser_generator.dispatchable_table_parsers) 

872 ) 

873 raise ValueError( 

874 f"The rule_type was not a supported type. It must be one of {types}" 

875 ) 

876 

877 inline_reference_documentation = self._pluggable_manifest_docs_for( 

878 rule_type, 

879 rule_name, 

880 inline_reference_documentation=inline_reference_documentation, 

881 ) 

882 

883 dispatching_parser = parser_generator.dispatchable_table_parsers[rule_type] 

884 dispatching_parser.register_keyword( 

885 rule_name, 

886 wrap_plugin_code(self._plugin_name, handler), 

887 self._plugin_metadata, 

888 inline_reference_documentation=inline_reference_documentation, 

889 ) 

890 

891 def _unload() -> None: 

892 raise PluginInitializationError( 

893 "Cannot unload provide_manifest_keyword (not implemented)" 

894 ) 

895 

896 self._unloaders.append(_unload) 

897 

898 def pluggable_object_parser( 

899 self, 

900 rule_type: str, 

901 rule_name: str, 

902 *, 

903 object_parser_key: str | None = None, 

904 on_end_parse_step: None | ( 

905 Callable[ 

906 [str, Mapping[str, Any] | None, AttributePath, ParserContextData], 

907 None, 

908 ] 

909 ) = None, 

910 nested_in_package_context: bool = False, 

911 ) -> None: 

912 self._restricted_api() 

913 if object_parser_key is None: 913 ↛ 914line 913 didn't jump to line 914 because the condition on line 913 was never true

914 object_parser_key = rule_name 

915 

916 parser_generator = self._feature_set.manifest_parser_generator 

917 dispatchable_object_parsers = parser_generator.dispatchable_object_parsers 

918 if rule_type not in dispatchable_object_parsers: 918 ↛ 919line 918 didn't jump to line 919 because the condition on line 918 was never true

919 types = ", ".join(sorted(dispatchable_object_parsers)) 

920 raise ValueError( 

921 f"The rule_type was not a supported type. It must be one of {types}" 

922 ) 

923 if object_parser_key not in dispatchable_object_parsers: 923 ↛ 924line 923 didn't jump to line 924 because the condition on line 923 was never true

924 types = ", ".join(sorted(dispatchable_object_parsers)) 

925 raise ValueError( 

926 f"The object_parser_key was not a supported type. It must be one of {types}" 

927 ) 

928 parent_dispatcher = dispatchable_object_parsers[rule_type] 

929 child_dispatcher = dispatchable_object_parsers[object_parser_key] 

930 

931 if on_end_parse_step is not None: 931 ↛ 934line 931 didn't jump to line 934 because the condition on line 931 was always true

932 on_end_parse_step = wrap_plugin_code(self._plugin_name, on_end_parse_step) 

933 

934 parent_dispatcher.register_child_parser( 

935 rule_name, 

936 child_dispatcher, 

937 self._plugin_metadata, 

938 on_end_parse_step=on_end_parse_step, 

939 nested_in_package_context=nested_in_package_context, 

940 ) 

941 

942 def _unload() -> None: 

943 raise PluginInitializationError( 

944 "Cannot unload pluggable_object_parser (not implemented)" 

945 ) 

946 

947 self._unloaders.append(_unload) 

948 

949 def pluggable_manifest_rule( 

950 self, 

951 rule_type: TTP | str, 

952 rule_name: str | Sequence[str], 

953 parsed_format: type[PF], 

954 handler: DIPHandler, 

955 *, 

956 source_format: SF | None = None, 

957 inline_reference_documentation: ParserDocumentation | None = None, 

958 expected_debputy_integration_mode: None | ( 

959 Container[DebputyIntegrationMode] 

960 ) = None, 

961 apply_standard_attribute_documentation: bool = False, 

962 register_value: bool = True, 

963 ) -> None: 

964 # When changing this, consider which types will be unrestricted 

965 self._restricted_api() 

966 if apply_standard_attribute_documentation and sys.version_info < (3, 12): 966 ↛ 967line 966 didn't jump to line 967 because the condition on line 966 was never true

967 _error( 

968 f"The plugin {self._plugin_metadata.plugin_name} requires python 3.12 due to" 

969 f" its use of apply_standard_attribute_documentation" 

970 ) 

971 feature_set = self._feature_set 

972 parser_generator = feature_set.manifest_parser_generator 

973 if isinstance(rule_type, str): 

974 if rule_type not in parser_generator.dispatchable_object_parsers: 974 ↛ 975line 974 didn't jump to line 975 because the condition on line 974 was never true

975 types = ", ".join(sorted(parser_generator.dispatchable_object_parsers)) 

976 raise ValueError( 

977 f"The rule_type was not a supported type. It must be one of {types}" 

978 ) 

979 dispatching_parser = parser_generator.dispatchable_object_parsers[rule_type] 

980 signature = inspect.signature(handler) 

981 if ( 981 ↛ 985line 981 didn't jump to line 985 because the condition on line 981 was never true

982 signature.return_annotation is signature.empty 

983 or signature.return_annotation == NoneType 

984 ): 

985 raise ValueError( 

986 "The handler must have a return type (that is not None)" 

987 ) 

988 register_as_type = signature.return_annotation 

989 else: 

990 # Dispatchable types cannot be resolved 

991 register_as_type = None 

992 if rule_type not in parser_generator.dispatchable_table_parsers: 992 ↛ 993line 992 didn't jump to line 993 because the condition on line 992 was never true

993 types = ", ".join( 

994 sorted( 

995 x.__name__ for x in parser_generator.dispatchable_table_parsers 

996 ) 

997 ) 

998 raise ValueError( 

999 f"The rule_type was not a supported type. It must be one of {types}" 

1000 ) 

1001 dispatching_parser = parser_generator.dispatchable_table_parsers[rule_type] 

1002 

1003 if register_as_type is not None and not register_value: 

1004 register_as_type = None 

1005 

1006 if register_as_type is not None: 

1007 existing_registration = self._registered_manifest_types.get( 

1008 register_as_type 

1009 ) 

1010 if existing_registration is not None: 1010 ↛ 1011line 1010 didn't jump to line 1011 because the condition on line 1010 was never true

1011 raise ValueError( 

1012 f"Cannot register rule {rule_name!r} for plugin {self._plugin_name}. The plugin {existing_registration.plugin_name} already registered a manifest rule with type {register_as_type!r}" 

1013 ) 

1014 self._registered_manifest_types[register_as_type] = self._plugin_metadata 

1015 

1016 inline_reference_documentation = self._pluggable_manifest_docs_for( 

1017 rule_type, 

1018 rule_name, 

1019 inline_reference_documentation=inline_reference_documentation, 

1020 ) 

1021 

1022 if apply_standard_attribute_documentation: 1022 ↛ 1023line 1022 didn't jump to line 1023 because the condition on line 1022 was never true

1023 docs = _STD_ATTR_DOCS 

1024 else: 

1025 docs = None 

1026 

1027 parser = feature_set.manifest_parser_generator.generate_parser( 

1028 parsed_format, 

1029 source_content=source_format, 

1030 inline_reference_documentation=inline_reference_documentation, 

1031 expected_debputy_integration_mode=expected_debputy_integration_mode, 

1032 automatic_docs=docs, 

1033 ) 

1034 

1035 def _registering_handler( 

1036 name: str, 

1037 parsed_data: PF, 

1038 attribute_path: AttributePath, 

1039 parser_context: ParserContextData, 

1040 ) -> TP: 

1041 value = handler(name, parsed_data, attribute_path, parser_context) 

1042 if register_as_type is not None: 

1043 register_manifest_type_value_in_context(register_as_type, value) 

1044 return value 

1045 

1046 dispatching_parser.register_parser( 

1047 rule_name, 

1048 parser, 

1049 wrap_plugin_code(self._plugin_name, _registering_handler), 

1050 self._plugin_metadata, 

1051 ) 

1052 

1053 def _unload() -> None: 

1054 raise PluginInitializationError( 

1055 "Cannot unload pluggable_manifest_rule (not implemented)" 

1056 ) 

1057 

1058 self._unloaders.append(_unload) 

1059 

1060 def register_build_system( 

1061 self, 

1062 build_system_definition: type[BSPF], 

1063 ) -> None: 

1064 self._restricted_api() 

1065 if not is_typeddict(build_system_definition): 1065 ↛ 1066line 1065 didn't jump to line 1066 because the condition on line 1065 was never true

1066 raise PluginInitializationError( 

1067 f"Expected build_system_definition to be a subclass of {BuildRuleParsedFormat.__name__}," 

1068 f" but got {build_system_definition.__name__} instead" 

1069 ) 

1070 metadata = getattr( 

1071 build_system_definition, 

1072 _DEBPUTY_DISPATCH_METADATA_ATTR_NAME, 

1073 None, 

1074 ) 

1075 if not isinstance(metadata, BuildSystemManifestRuleMetadata): 1075 ↛ 1076line 1075 didn't jump to line 1076 because the condition on line 1075 was never true

1076 raise PluginIncorrectRegistrationError( 

1077 f"The {build_system_definition.__qualname__} type should have been annotated with" 

1078 f" @{debputy_build_system.__name__}." 

1079 ) 

1080 assert len(metadata.manifest_keywords) == 1 

1081 build_system_impl = metadata.build_system_impl 

1082 assert build_system_impl is not None 

1083 manifest_keyword = next(iter(metadata.manifest_keywords)) 

1084 self.pluggable_manifest_rule( 

1085 metadata.dispatched_type, 

1086 metadata.manifest_keywords, 

1087 build_system_definition, 

1088 # pluggable_manifest_rule does the wrapping 

1089 metadata.unwrapped_constructor, 

1090 source_format=metadata.source_format, 

1091 inline_reference_documentation=metadata.online_reference_documentation, 

1092 expected_debputy_integration_mode=only_integrations(INTEGRATION_MODE_FULL), 

1093 ) 

1094 self._auto_detectable_build_system( 

1095 manifest_keyword, 

1096 build_system_impl, 

1097 constructor=wrap_plugin_code( 

1098 self._plugin_name, 

1099 build_system_impl, 

1100 ), 

1101 shadowing_build_systems_when_active=metadata.auto_detection_shadow_build_systems, 

1102 ) 

1103 

1104 def _auto_detectable_build_system( 

1105 self, 

1106 manifest_keyword: str, 

1107 rule_type: type[BSR], 

1108 *, 

1109 shadowing_build_systems_when_active: frozenset[str] = frozenset(), 

1110 constructor: None | ( 

1111 Callable[[BuildRuleParsedFormat, AttributePath, "HighLevelManifest"], BSR] 

1112 ) = None, 

1113 ) -> None: 

1114 self._restricted_api() 

1115 feature_set = self._feature_set 

1116 existing = feature_set.auto_detectable_build_systems.get(rule_type) 

1117 if existing is not None: 1117 ↛ 1118line 1117 didn't jump to line 1118 because the condition on line 1117 was never true

1118 bs_name = rule_type.__class__.__name__ 

1119 if existing.plugin_metadata.plugin_name == self._plugin_name: 

1120 message = ( 

1121 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

1122 f' auto-detection of the build system "{bs_name}" twice.' 

1123 ) 

1124 else: 

1125 message = ( 

1126 f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}" 

1127 f' both tried to provide auto-detection of the build system "{bs_name}"' 

1128 ) 

1129 raise PluginConflictError( 

1130 message, existing.plugin_metadata, self._plugin_metadata 

1131 ) 

1132 

1133 if constructor is None: 1133 ↛ 1135line 1133 didn't jump to line 1135 because the condition on line 1133 was never true

1134 

1135 def impl( 

1136 attributes: BuildRuleParsedFormat, 

1137 attribute_path: AttributePath, 

1138 manifest: "HighLevelManifest", 

1139 ) -> BSR: 

1140 return rule_type(attributes, attribute_path, manifest) 

1141 

1142 else: 

1143 impl = constructor 

1144 

1145 feature_set.auto_detectable_build_systems[rule_type] = ( 

1146 PluginProvidedBuildSystemAutoDetection( 

1147 manifest_keyword, 

1148 rule_type, 

1149 wrap_plugin_code(self._plugin_name, rule_type.auto_detect_build_system), 

1150 impl, 

1151 shadowing_build_systems_when_active, 

1152 self._plugin_metadata, 

1153 ) 

1154 ) 

1155 

1156 def _unload() -> None: 

1157 try: 

1158 del feature_set.auto_detectable_build_systems[rule_type] 

1159 except KeyError: 

1160 pass 

1161 

1162 self._unloaders.append(_unload) 

1163 

1164 def known_packaging_files( 

1165 self, 

1166 packaging_file_details: KnownPackagingFileInfo, 

1167 ) -> None: 

1168 known_packaging_files = self._feature_set.known_packaging_files 

1169 detection_method = packaging_file_details.get( 

1170 "detection_method", cast("Literal['path']", "path") 

1171 ) 

1172 path = packaging_file_details.get("path") 

1173 dhpkgfile = packaging_file_details.get("pkgfile") 

1174 

1175 packaging_file_details = packaging_file_details.copy() 

1176 

1177 if detection_method == "path": 1177 ↛ 1193line 1177 didn't jump to line 1193 because the condition on line 1177 was always true

1178 if dhpkgfile is not None: 1178 ↛ 1179line 1178 didn't jump to line 1179 because the condition on line 1178 was never true

1179 raise ValueError( 

1180 'The "pkgfile" attribute cannot be used when detection-method is "path" (or omitted)' 

1181 ) 

1182 if path is None: 1182 ↛ 1183line 1182 didn't jump to line 1183 because the condition on line 1182 was never true

1183 raise ValueError( 

1184 'The "path" attribute must be present when detection-method is "path" (or omitted)' 

1185 ) 

1186 if path != _normalize_path(path, with_prefix=False): 1186 ↛ 1187line 1186 didn't jump to line 1187 because the condition on line 1186 was never true

1187 raise ValueError( 

1188 f"The path for known packaging files must be normalized. Please replace" 

1189 f' "{path}" with "{_normalize_path(path, with_prefix=False)}"' 

1190 ) 

1191 detection_value = path 

1192 else: 

1193 assert detection_method == "dh.pkgfile" 

1194 if path is not None: 

1195 raise ValueError( 

1196 'The "path" attribute cannot be used when detection-method is "dh.pkgfile"' 

1197 ) 

1198 if dhpkgfile is None: 

1199 raise ValueError( 

1200 'The "pkgfile" attribute must be present when detection-method is "dh.pkgfile"' 

1201 ) 

1202 if "/" in dhpkgfile: 

1203 raise ValueError( 

1204 'The "pkgfile" attribute ḿust be a name stem such as "install" (no "/" are allowed)' 

1205 ) 

1206 detection_value = dhpkgfile 

1207 key = f"{detection_method}::{detection_value}" 

1208 existing = known_packaging_files.get(key) 

1209 if existing is not None: 1209 ↛ 1210line 1209 didn't jump to line 1210 because the condition on line 1209 was never true

1210 if existing.plugin_metadata.plugin_name != self._plugin_name: 

1211 message = ( 

1212 f'The key "{key}" is registered twice for known packaging files.' 

1213 f" Once by {existing.plugin_metadata.plugin_name} and once by {self._plugin_name}" 

1214 ) 

1215 else: 

1216 message = ( 

1217 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

1218 f' key "{key}" twice for known packaging files.' 

1219 ) 

1220 raise PluginConflictError( 

1221 message, existing.plugin_metadata, self._plugin_metadata 

1222 ) 

1223 _validate_known_packaging_file_dh_compat_rules( 

1224 packaging_file_details.get("dh_compat_rules") 

1225 ) 

1226 known_packaging_files[key] = PluginProvidedKnownPackagingFile( 

1227 packaging_file_details, 

1228 detection_method, 

1229 detection_value, 

1230 self._plugin_metadata, 

1231 ) 

1232 

1233 def _unload() -> None: 

1234 del known_packaging_files[key] 

1235 

1236 self._unloaders.append(_unload) 

1237 

1238 def register_mapped_type( 

1239 self, 

1240 type_mapping: TypeMapping, 

1241 *, 

1242 reference_documentation: TypeMappingDocumentation | None = None, 

1243 ) -> None: 

1244 self._restricted_api() 

1245 target_type = type_mapping.target_type 

1246 mapped_types = self._feature_set.mapped_types 

1247 existing = mapped_types.get(target_type) 

1248 if existing is not None: 1248 ↛ 1249line 1248 didn't jump to line 1249 because the condition on line 1248 was never true

1249 if existing.plugin_metadata.plugin_name != self._plugin_name: 

1250 message = ( 

1251 f'The key "{target_type.__name__}" is registered twice for known packaging files.' 

1252 f" Once by {existing.plugin_metadata.plugin_name} and once by {self._plugin_name}" 

1253 ) 

1254 else: 

1255 message = ( 

1256 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

1257 f' key "{target_type.__name__}" twice for known packaging files.' 

1258 ) 

1259 raise PluginConflictError( 

1260 message, existing.plugin_metadata, self._plugin_metadata 

1261 ) 

1262 parser_generator = self._feature_set.manifest_parser_generator 

1263 # TODO: Wrap the mapper in the plugin context 

1264 mapped_types[target_type] = PluginProvidedTypeMapping( 

1265 type_mapping, reference_documentation, self._plugin_metadata 

1266 ) 

1267 parser_generator.register_mapped_type(type_mapping) 

1268 

1269 def _restricted_api( 

1270 self, 

1271 *, 

1272 allowed_plugins: set[str] | frozenset[str] = frozenset(), 

1273 ) -> None: 

1274 if self._plugin_name != "debputy" and self._plugin_name not in allowed_plugins: 1274 ↛ 1275line 1274 didn't jump to line 1275 because the condition on line 1274 was never true

1275 raise PluginAPIViolationError( 

1276 f"Plugin {self._plugin_name} attempted to access a debputy-only API." 

1277 " If you are the maintainer of this plugin and want access to this" 

1278 " API, please file a feature request to make this public." 

1279 " (The API is currently private as it is unstable.)" 

1280 ) 

1281 

1282 

1283class MaintscriptAccessorProviderBase(MaintscriptAccessor, ABC): 

1284 __slots__ = () 

1285 

1286 def _append_script( 

1287 self, 

1288 caller_name: str, 

1289 maintscript: Maintscript, 

1290 full_script: str, 

1291 /, 

1292 perform_substitution: bool = True, 

1293 ) -> None: 

1294 raise NotImplementedError 

1295 

1296 @classmethod 

1297 def _apply_condition_to_script( 

1298 cls, 

1299 condition: str, 

1300 run_snippet: str, 

1301 /, 

1302 indent: bool | None = None, 

1303 ) -> str: 

1304 if indent is None: 

1305 # We auto-determine this based on heredocs currently 

1306 indent = "<<" not in run_snippet 

1307 

1308 if indent: 

1309 run_snippet = "".join(" " + x for x in run_snippet.splitlines(True)) 

1310 if not run_snippet.endswith("\n"): 

1311 run_snippet += "\n" 

1312 condition_line = f"if {condition}; then\n" 

1313 end_line = "fi\n" 

1314 return "".join((condition_line, run_snippet, end_line)) 

1315 

1316 def on_configure( 

1317 self, 

1318 run_snippet: str, 

1319 /, 

1320 indent: bool | None = None, 

1321 perform_substitution: bool = True, 

1322 skip_on_rollback: bool = False, 

1323 ) -> None: 

1324 condition = POSTINST_DEFAULT_CONDITION 

1325 if skip_on_rollback: 1325 ↛ 1326line 1325 didn't jump to line 1326 because the condition on line 1325 was never true

1326 condition = '[ "$1" = "configure" ]' 

1327 return self._append_script( 

1328 "on_configure", 

1329 "postinst", 

1330 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

1331 perform_substitution=perform_substitution, 

1332 ) 

1333 

1334 def on_initial_install( 

1335 self, 

1336 run_snippet: str, 

1337 /, 

1338 indent: bool | None = None, 

1339 perform_substitution: bool = True, 

1340 ) -> None: 

1341 condition = '[ "$1" = "configure" -a -z "$2" ]' 

1342 return self._append_script( 

1343 "on_initial_install", 

1344 "postinst", 

1345 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

1346 perform_substitution=perform_substitution, 

1347 ) 

1348 

1349 def on_upgrade( 

1350 self, 

1351 run_snippet: str, 

1352 /, 

1353 indent: bool | None = None, 

1354 perform_substitution: bool = True, 

1355 ) -> None: 

1356 condition = '[ "$1" = "configure" -a -n "$2" ]' 

1357 return self._append_script( 

1358 "on_upgrade", 

1359 "postinst", 

1360 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

1361 perform_substitution=perform_substitution, 

1362 ) 

1363 

1364 def on_upgrade_from( 

1365 self, 

1366 version: str, 

1367 run_snippet: str, 

1368 /, 

1369 indent: bool | None = None, 

1370 perform_substitution: bool = True, 

1371 ) -> None: 

1372 condition = '[ "$1" = "configure" ] && dpkg --compare-versions le-nl "$2"' 

1373 return self._append_script( 

1374 "on_upgrade_from", 

1375 "postinst", 

1376 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

1377 perform_substitution=perform_substitution, 

1378 ) 

1379 

1380 def on_before_removal( 

1381 self, 

1382 run_snippet: str, 

1383 /, 

1384 indent: bool | None = None, 

1385 perform_substitution: bool = True, 

1386 ) -> None: 

1387 condition = '[ "$1" = "remove" ]' 

1388 return self._append_script( 

1389 "on_before_removal", 

1390 "prerm", 

1391 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

1392 perform_substitution=perform_substitution, 

1393 ) 

1394 

1395 def on_removed( 

1396 self, 

1397 run_snippet: str, 

1398 /, 

1399 indent: bool | None = None, 

1400 perform_substitution: bool = True, 

1401 ) -> None: 

1402 condition = '[ "$1" = "remove" ]' 

1403 return self._append_script( 

1404 "on_removed", 

1405 "postrm", 

1406 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

1407 perform_substitution=perform_substitution, 

1408 ) 

1409 

1410 def on_purge( 

1411 self, 

1412 run_snippet: str, 

1413 /, 

1414 indent: bool | None = None, 

1415 perform_substitution: bool = True, 

1416 ) -> None: 

1417 condition = '[ "$1" = "purge" ]' 

1418 return self._append_script( 

1419 "on_purge", 

1420 "postrm", 

1421 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

1422 perform_substitution=perform_substitution, 

1423 ) 

1424 

1425 def unconditionally_in_script( 

1426 self, 

1427 maintscript: Maintscript, 

1428 run_snippet: str, 

1429 /, 

1430 perform_substitution: bool = True, 

1431 ) -> None: 

1432 if maintscript not in STD_CONTROL_SCRIPTS: 1432 ↛ 1433line 1432 didn't jump to line 1433 because the condition on line 1432 was never true

1433 raise ValueError( 

1434 f'Unknown script "{maintscript}". Should have been one of:' 

1435 f' {", ".join(sorted(STD_CONTROL_SCRIPTS))}' 

1436 ) 

1437 return self._append_script( 

1438 "unconditionally_in_script", 

1439 maintscript, 

1440 run_snippet, 

1441 perform_substitution=perform_substitution, 

1442 ) 

1443 

1444 

1445class MaintscriptAccessorProvider(MaintscriptAccessorProviderBase): 

1446 __slots__ = ( 

1447 "_plugin_metadata", 

1448 "_maintscript_snippets", 

1449 "_plugin_source_id", 

1450 "_package_substitution", 

1451 "_default_snippet_order", 

1452 ) 

1453 

1454 def __init__( 

1455 self, 

1456 plugin_metadata: DebputyPluginMetadata, 

1457 plugin_source_id: str, 

1458 maintscript_snippets: dict[str, MaintscriptSnippetContainer], 

1459 package_substitution: Substitution, 

1460 *, 

1461 default_snippet_order: Literal["service"] | None = None, 

1462 ): 

1463 self._plugin_metadata = plugin_metadata 

1464 self._plugin_source_id = plugin_source_id 

1465 self._maintscript_snippets = maintscript_snippets 

1466 self._package_substitution = package_substitution 

1467 self._default_snippet_order = default_snippet_order 

1468 

1469 def _append_script( 

1470 self, 

1471 caller_name: str, 

1472 maintscript: Maintscript, 

1473 full_script: str, 

1474 /, 

1475 perform_substitution: bool = True, 

1476 ) -> None: 

1477 def_source = f"{self._plugin_metadata.plugin_name} ({self._plugin_source_id})" 

1478 if perform_substitution: 

1479 full_script = self._package_substitution.substitute(full_script, def_source) 

1480 

1481 snippet = MaintscriptSnippet( 

1482 snippet=full_script, 

1483 definition_source=def_source, 

1484 snippet_order=self._default_snippet_order, 

1485 ) 

1486 self._maintscript_snippets[maintscript].append(snippet) 

1487 

1488 

1489class BinaryCtrlAccessorProviderBase(BinaryCtrlAccessor): 

1490 __slots__ = ( 

1491 "_plugin_metadata", 

1492 "_plugin_source_id", 

1493 "_package_metadata_context", 

1494 "_triggers", 

1495 "_substvars", 

1496 "_maintscript", 

1497 "_shlibs_details", 

1498 ) 

1499 

1500 def __init__( 

1501 self, 

1502 plugin_metadata: DebputyPluginMetadata, 

1503 plugin_source_id: str, 

1504 package_metadata_context: PackageProcessingContext, 

1505 triggers: dict[tuple[DpkgTriggerType, str], PluginProvidedTrigger], 

1506 substvars: FlushableSubstvars, 

1507 shlibs_details: tuple[str | None, list[str] | None], 

1508 ) -> None: 

1509 self._plugin_metadata = plugin_metadata 

1510 self._plugin_source_id = plugin_source_id 

1511 self._package_metadata_context = package_metadata_context 

1512 self._triggers = triggers 

1513 self._substvars = substvars 

1514 self._maintscript: MaintscriptAccessor | None = None 

1515 self._shlibs_details = shlibs_details 

1516 

1517 def _create_maintscript_accessor(self) -> MaintscriptAccessor: 

1518 raise NotImplementedError 

1519 

1520 def dpkg_trigger(self, trigger_type: DpkgTriggerType, trigger_target: str) -> None: 

1521 """Register a declarative dpkg level trigger 

1522 

1523 The provided trigger will be added to the package's metadata (the triggers file of the control.tar). 

1524 

1525 If the trigger has already been added previously, a second call with the same trigger data will be ignored. 

1526 """ 

1527 key = (trigger_type, trigger_target) 

1528 if key in self._triggers: 1528 ↛ 1529line 1528 didn't jump to line 1529 because the condition on line 1528 was never true

1529 return 

1530 self._triggers[key] = PluginProvidedTrigger( 

1531 dpkg_trigger_type=trigger_type, 

1532 dpkg_trigger_target=trigger_target, 

1533 provider=self._plugin_metadata, 

1534 provider_source_id=self._plugin_source_id, 

1535 ) 

1536 

1537 @property 

1538 def maintscript(self) -> MaintscriptAccessor: 

1539 maintscript = self._maintscript 

1540 if maintscript is None: 

1541 maintscript = self._create_maintscript_accessor() 

1542 self._maintscript = maintscript 

1543 return maintscript 

1544 

1545 @property 

1546 def substvars(self) -> FlushableSubstvars: 

1547 return self._substvars 

1548 

1549 def dpkg_shlibdeps(self, paths: Sequence[VirtualPath]) -> None: 

1550 binary_package = self._package_metadata_context.binary_package 

1551 with self.substvars.flush() as substvars_file: 

1552 dpkg_cmd = ["dpkg-shlibdeps", f"-T{substvars_file}"] 

1553 if binary_package.is_udeb: 

1554 dpkg_cmd.append("-tudeb") 

1555 if binary_package.is_essential: 1555 ↛ 1556line 1555 didn't jump to line 1556 because the condition on line 1555 was never true

1556 dpkg_cmd.append("-dPre-Depends") 

1557 shlibs_local, shlib_dirs = self._shlibs_details 

1558 if shlibs_local is not None: 1558 ↛ 1559line 1558 didn't jump to line 1559 because the condition on line 1558 was never true

1559 dpkg_cmd.append(f"-L{shlibs_local}") 

1560 if shlib_dirs: 1560 ↛ 1561line 1560 didn't jump to line 1561 because the condition on line 1560 was never true

1561 dpkg_cmd.extend(f"-l{sd}" for sd in shlib_dirs) 

1562 dpkg_cmd.extend(p.fs_path for p in paths) 

1563 print_command(*dpkg_cmd) 

1564 try: 

1565 subprocess.check_call(dpkg_cmd) 

1566 except subprocess.CalledProcessError: 

1567 _error( 

1568 f"Attempting to auto-detect dependencies via dpkg-shlibdeps for {binary_package.name} failed. Please" 

1569 " review the output from dpkg-shlibdeps above to understand what went wrong." 

1570 ) 

1571 

1572 

1573class BinaryCtrlAccessorProvider(BinaryCtrlAccessorProviderBase): 

1574 __slots__ = ( 

1575 "_maintscript", 

1576 "_maintscript_snippets", 

1577 "_package_substitution", 

1578 ) 

1579 

1580 def __init__( 

1581 self, 

1582 plugin_metadata: DebputyPluginMetadata, 

1583 plugin_source_id: str, 

1584 package_metadata_context: PackageProcessingContext, 

1585 triggers: dict[tuple[DpkgTriggerType, str], PluginProvidedTrigger], 

1586 substvars: FlushableSubstvars, 

1587 maintscript_snippets: dict[str, MaintscriptSnippetContainer], 

1588 package_substitution: Substitution, 

1589 shlibs_details: tuple[str | None, list[str] | None], 

1590 *, 

1591 default_snippet_order: Literal["service"] | None = None, 

1592 ) -> None: 

1593 super().__init__( 

1594 plugin_metadata, 

1595 plugin_source_id, 

1596 package_metadata_context, 

1597 triggers, 

1598 substvars, 

1599 shlibs_details, 

1600 ) 

1601 self._maintscript_snippets = maintscript_snippets 

1602 self._package_substitution = package_substitution 

1603 self._maintscript = MaintscriptAccessorProvider( 

1604 plugin_metadata, 

1605 plugin_source_id, 

1606 maintscript_snippets, 

1607 package_substitution, 

1608 default_snippet_order=default_snippet_order, 

1609 ) 

1610 

1611 def _create_maintscript_accessor(self) -> MaintscriptAccessor: 

1612 return MaintscriptAccessorProvider( 

1613 self._plugin_metadata, 

1614 self._plugin_source_id, 

1615 self._maintscript_snippets, 

1616 self._package_substitution, 

1617 ) 

1618 

1619 

1620class BinaryCtrlAccessorProviderCreator: 

1621 def __init__( 

1622 self, 

1623 package_metadata_context: PackageProcessingContext, 

1624 substvars: FlushableSubstvars, 

1625 maintscript_snippets: dict[str, MaintscriptSnippetContainer], 

1626 substitution: Substitution, 

1627 ) -> None: 

1628 self._package_metadata_context = package_metadata_context 

1629 self._substvars = substvars 

1630 self._maintscript_snippets = maintscript_snippets 

1631 self._substitution = substitution 

1632 self._triggers: dict[tuple[DpkgTriggerType, str], PluginProvidedTrigger] = {} 

1633 self.shlibs_details: tuple[str | None, list[str] | None] = None, None 

1634 

1635 def for_plugin( 

1636 self, 

1637 plugin_metadata: DebputyPluginMetadata, 

1638 plugin_source_id: str, 

1639 *, 

1640 default_snippet_order: Literal["service"] | None = None, 

1641 ) -> BinaryCtrlAccessor: 

1642 return BinaryCtrlAccessorProvider( 

1643 plugin_metadata, 

1644 plugin_source_id, 

1645 self._package_metadata_context, 

1646 self._triggers, 

1647 self._substvars, 

1648 self._maintscript_snippets, 

1649 self._substitution, 

1650 self.shlibs_details, 

1651 default_snippet_order=default_snippet_order, 

1652 ) 

1653 

1654 def generated_triggers(self) -> Iterable[PluginProvidedTrigger]: 

1655 return self._triggers.values() 

1656 

1657 

1658def _resolve_bundled_plugin_docs_path( 

1659 plugin_name: str, 

1660 loader: PluginInitializationEntryPoint | None, 

1661) -> Traversable | Path | None: 

1662 plugin_module = getattr(loader, "__module__") 

1663 assert plugin_module is not None 

1664 plugin_package_name = sys.modules[plugin_module].__package__ 

1665 return importlib.resources.files(plugin_package_name).joinpath( 

1666 f"{plugin_name}_docs.yaml" 

1667 ) 

1668 

1669 

1670def plugin_metadata_for_debputys_own_plugin( 

1671 loader: PluginInitializationEntryPoint | None = None, 

1672) -> DebputyPluginMetadata: 

1673 if loader is None: 

1674 from debputy.plugins.debputy.debputy_plugin import ( 

1675 initialize_debputy_features, 

1676 ) 

1677 

1678 loader = initialize_debputy_features 

1679 plugin_name = "debputy" 

1680 return DebputyPluginMetadata( 

1681 plugin_name="debputy", 

1682 api_compat_version=1, 

1683 plugin_initializer=loader, 

1684 plugin_loader=None, 

1685 plugin_doc_path_resolver=lambda: _resolve_bundled_plugin_docs_path( 

1686 plugin_name, 

1687 loader, 

1688 ), 

1689 plugin_path="<bundled>", 

1690 ) 

1691 

1692 

1693def load_plugin_features( 

1694 plugin_search_dirs: Sequence[str], 

1695 substitution: Substitution, 

1696 requested_plugins_only: Sequence[str] | None = None, 

1697 required_plugins: set[str] | None = None, 

1698 plugin_feature_set: PluginProvidedFeatureSet | None = None, 

1699 debug_mode: bool = False, 

1700) -> PluginProvidedFeatureSet: 

1701 if plugin_feature_set is None: 

1702 plugin_feature_set = PluginProvidedFeatureSet() 

1703 plugins = [plugin_metadata_for_debputys_own_plugin()] 

1704 unloadable_plugins = set() 

1705 if required_plugins: 

1706 plugins.extend( 

1707 find_json_plugins( 

1708 plugin_search_dirs, 

1709 required_plugins, 

1710 ) 

1711 ) 

1712 if requested_plugins_only is not None: 

1713 plugins.extend( 

1714 find_json_plugins( 

1715 plugin_search_dirs, 

1716 requested_plugins_only, 

1717 ) 

1718 ) 

1719 else: 

1720 auto_loaded = _find_all_json_plugins( 

1721 plugin_search_dirs, 

1722 required_plugins if required_plugins is not None else frozenset(), 

1723 debug_mode=debug_mode, 

1724 ) 

1725 for plugin_metadata in auto_loaded: 

1726 plugins.append(plugin_metadata) 

1727 unloadable_plugins.add(plugin_metadata.plugin_name) 

1728 

1729 for plugin_metadata in plugins: 

1730 api = DebputyPluginInitializerProvider( 

1731 plugin_metadata, plugin_feature_set, substitution 

1732 ) 

1733 try: 

1734 api.load_plugin() 

1735 except PluginBaseError as e: 

1736 if plugin_metadata.plugin_name not in unloadable_plugins: 

1737 raise 

1738 if debug_mode: 

1739 _warn( 

1740 f"The optional plugin {plugin_metadata.plugin_name} failed during load. Re-raising due" 

1741 f" to --debug/-d or DEBPUTY_DEBUG=1" 

1742 ) 

1743 raise 

1744 try: 

1745 api.unload_plugin() 

1746 except Exception: 

1747 _warn( 

1748 f"Failed to load optional {plugin_metadata.plugin_name} and an error was raised when trying to" 

1749 " clean up after the half-initialized plugin. Re-raising load error as the partially loaded" 

1750 " module might have tainted the feature set." 

1751 ) 

1752 raise e from None 

1753 else: 

1754 _warn( 

1755 f"The optional plugin {plugin_metadata.plugin_name} failed during load. The plugin was" 

1756 f" deactivated. Use debug mode (--debug/DEBPUTY_DEBUG=1) to show the stacktrace" 

1757 f" (the warning will become an error)" 

1758 ) 

1759 

1760 return plugin_feature_set 

1761 

1762 

1763def find_json_plugin( 

1764 search_dirs: Sequence[str], 

1765 requested_plugin: str, 

1766) -> DebputyPluginMetadata: 

1767 r = list(find_json_plugins(search_dirs, [requested_plugin])) 

1768 assert len(r) == 1 

1769 return r[0] 

1770 

1771 

1772def find_related_implementation_files_for_plugin( 

1773 plugin_metadata: DebputyPluginMetadata, 

1774) -> list[str]: 

1775 if plugin_metadata.is_bundled: 

1776 plugin_name = plugin_metadata.plugin_name 

1777 _error( 

1778 f"Cannot run find related files for {plugin_name}: The plugin seems to be bundled" 

1779 " or loaded via a mechanism that does not support detecting its tests." 

1780 ) 

1781 

1782 if plugin_metadata.is_from_python_path: 

1783 plugin_name = plugin_metadata.plugin_name 

1784 # Maybe they could be, but that is for another day. 

1785 _error( 

1786 f"Cannot run find related files for {plugin_name}: The plugin is installed into python path" 

1787 " and these are not supported." 

1788 ) 

1789 files = [] 

1790 module_name, module_file = _find_plugin_implementation_file( 

1791 plugin_metadata.plugin_name, 

1792 plugin_metadata.plugin_path, 

1793 ) 

1794 if os.path.isfile(module_file): 

1795 files.append(module_file) 

1796 else: 

1797 if not plugin_metadata.is_loaded: 

1798 plugin_metadata.load_plugin() 

1799 if module_name in sys.modules: 

1800 _error( 

1801 f'The plugin {plugin_metadata.plugin_name} uses the "module"" key in its' 

1802 f" JSON metadata file ({plugin_metadata.plugin_path}) and cannot be " 

1803 f" installed via this method. The related Python would not be installed" 

1804 f" (which would result in a plugin that would fail to load)" 

1805 ) 

1806 

1807 return files 

1808 

1809 

1810def find_tests_for_plugin( 

1811 plugin_metadata: DebputyPluginMetadata, 

1812) -> list[str]: 

1813 plugin_name = plugin_metadata.plugin_name 

1814 plugin_path = plugin_metadata.plugin_path 

1815 

1816 if plugin_metadata.is_bundled: 

1817 _error( 

1818 f"Cannot run tests for {plugin_name}: The plugin seems to be bundled or loaded via a" 

1819 " mechanism that does not support detecting its tests." 

1820 ) 

1821 

1822 if plugin_metadata.is_from_python_path: 

1823 plugin_name = plugin_metadata.plugin_name 

1824 # Maybe they could be, but that is for another day. 

1825 _error( 

1826 f"Cannot run find related files for {plugin_name}: The plugin is installed into python path" 

1827 " and these are not supported." 

1828 ) 

1829 

1830 plugin_dir = os.path.dirname(plugin_path) 

1831 test_basename_prefix = plugin_metadata.plugin_name.replace("-", "_") 

1832 tests = [] 

1833 with os.scandir(plugin_dir) as dir_iter: 

1834 for p in dir_iter: 

1835 if ( 

1836 p.is_file() 

1837 and p.name.startswith(test_basename_prefix) 

1838 and PLUGIN_TEST_SUFFIX.search(p.name) 

1839 ): 

1840 tests.append(p.path) 

1841 return tests 

1842 

1843 

1844def find_json_plugins( 

1845 search_dirs: Sequence[str], 

1846 requested_plugins: Iterable[str], 

1847) -> Iterable[DebputyPluginMetadata]: 

1848 for plugin_name_or_path in requested_plugins: 1848 ↛ exitline 1848 didn't return from function 'find_json_plugins' because the loop on line 1848 didn't complete

1849 if "/" in plugin_name_or_path: 1849 ↛ 1850line 1849 didn't jump to line 1850 because the condition on line 1849 was never true

1850 if not os.path.isfile(plugin_name_or_path): 

1851 raise PluginNotFoundError( 

1852 f"Unable to load the plugin {plugin_name_or_path}: The path is not a file." 

1853 ' (Because the plugin name contains "/", it is assumed to be a path and search path' 

1854 " is not used." 

1855 ) 

1856 yield parse_json_plugin_desc(plugin_name_or_path) 

1857 return 

1858 for search_dir in search_dirs: 1858 ↛ 1867line 1858 didn't jump to line 1867 because the loop on line 1858 didn't complete

1859 path = os.path.join( 

1860 search_dir, "debputy", "plugins", f"{plugin_name_or_path}.json" 

1861 ) 

1862 if not os.path.isfile(path): 1862 ↛ 1863line 1862 didn't jump to line 1863 because the condition on line 1862 was never true

1863 continue 

1864 yield parse_json_plugin_desc(path) 

1865 return 

1866 

1867 path_root = PLUGIN_PYTHON_RES_PATH 

1868 pp_path = path_root.joinpath(f"{plugin_name_or_path}.json") 

1869 if pp_path or pp_path.is_file(): 

1870 with pp_path.open() as fd: 

1871 yield parse_json_plugin_desc( 

1872 f"PYTHONPATH:debputy/plugins/{pp_path.name}", 

1873 fd=fd, 

1874 is_from_python_path=True, 

1875 ) 

1876 return 

1877 

1878 search_dir_str = ":".join(search_dirs) 

1879 raise PluginNotFoundError( 

1880 f"Unable to load the plugin {plugin_name_or_path}: Could not find {plugin_name_or_path}.json in the" 

1881 f" debputy/plugins subdir of any of the search dirs ({search_dir_str})" 

1882 ) 

1883 

1884 

1885def _find_all_json_plugins( 

1886 search_dirs: Sequence[str], 

1887 required_plugins: AbstractSet[str], 

1888 debug_mode: bool = False, 

1889) -> Iterable[DebputyPluginMetadata]: 

1890 seen = set(required_plugins) 

1891 error_seen = False 

1892 for search_dir in search_dirs: 

1893 try: 

1894 dir_fd = os.scandir(os.path.join(search_dir, "debputy", "plugins")) 

1895 except FileNotFoundError: 

1896 continue 

1897 with dir_fd: 

1898 for entry in dir_fd: 

1899 if ( 

1900 not entry.is_file(follow_symlinks=True) 

1901 or not entry.name.endswith(".json") 

1902 or entry.name in seen 

1903 ): 

1904 continue 

1905 seen.add(entry.name) 

1906 try: 

1907 plugin_metadata = parse_json_plugin_desc(entry.path) 

1908 except PluginBaseError as e: 

1909 if debug_mode: 

1910 raise 

1911 if not error_seen: 

1912 error_seen = True 

1913 _warn( 

1914 f"Failed to load the plugin in {entry.path} due to the following error: {e.message}" 

1915 ) 

1916 else: 

1917 _warn( 

1918 f"Failed to load plugin in {entry.path} due to errors (not shown)." 

1919 ) 

1920 else: 

1921 yield plugin_metadata 

1922 

1923 for pp_entry in PLUGIN_PYTHON_RES_PATH.iterdir(): 

1924 if ( 

1925 not pp_entry.name.endswith(".json") 

1926 or not pp_entry.is_file() 

1927 or pp_entry.name in seen 

1928 ): 

1929 continue 

1930 seen.add(pp_entry.name) 

1931 with pp_entry.open() as fd: 

1932 yield parse_json_plugin_desc( 

1933 f"PYTHONPATH:debputy/plugins/{pp_entry.name}", 

1934 fd=fd, 

1935 is_from_python_path=True, 

1936 ) 

1937 

1938 

1939def _find_plugin_implementation_file( 

1940 plugin_name: str, 

1941 json_file_path: str, 

1942) -> tuple[str, str]: 

1943 guessed_module_basename = plugin_name.replace("-", "_") 

1944 module_name = f"debputy.plugins.{guessed_module_basename}" 

1945 module_fs_path = os.path.join( 

1946 os.path.dirname(json_file_path), f"{guessed_module_basename}.py" 

1947 ) 

1948 return module_name, module_fs_path 

1949 

1950 

1951def _resolve_module_initializer( 

1952 plugin_name: str, 

1953 plugin_initializer_name: str, 

1954 module_name: str | None, 

1955 json_file_path: str, 

1956) -> PluginInitializationEntryPoint: 

1957 module = None 

1958 module_fs_path = None 

1959 if module_name is None: 1959 ↛ 1987line 1959 didn't jump to line 1987 because the condition on line 1959 was always true

1960 module_name, module_fs_path = _find_plugin_implementation_file( 

1961 plugin_name, json_file_path 

1962 ) 

1963 if os.path.isfile(module_fs_path): 1963 ↛ 1987line 1963 didn't jump to line 1987 because the condition on line 1963 was always true

1964 spec = importlib.util.spec_from_file_location(module_name, module_fs_path) 

1965 if spec is None: 1965 ↛ 1966line 1965 didn't jump to line 1966 because the condition on line 1965 was never true

1966 raise PluginInitializationError( 

1967 f"Failed to load {plugin_name} (path: {module_fs_path})." 

1968 " The spec_from_file_location function returned None." 

1969 ) 

1970 mod = importlib.util.module_from_spec(spec) 

1971 loader = spec.loader 

1972 if loader is None: 1972 ↛ 1973line 1972 didn't jump to line 1973 because the condition on line 1972 was never true

1973 raise PluginInitializationError( 

1974 f"Failed to load {plugin_name} (path: {module_fs_path})." 

1975 " Python could not find a suitable loader (spec.loader was None)" 

1976 ) 

1977 sys.modules[module_name] = mod 

1978 try: 

1979 run_in_context_of_plugin(plugin_name, loader.exec_module, mod) 

1980 except (Exception, GeneratorExit) as e: 

1981 raise PluginInitializationError( 

1982 f"Failed to load {plugin_name} (path: {module_fs_path})." 

1983 " The module threw an exception while being loaded." 

1984 ) from e 

1985 module = mod 

1986 

1987 if module is None: 1987 ↛ 1988line 1987 didn't jump to line 1988 because the condition on line 1987 was never true

1988 try: 

1989 module = run_in_context_of_plugin( 

1990 plugin_name, importlib.import_module, module_name 

1991 ) 

1992 except ModuleNotFoundError as e: 

1993 if module_fs_path is None: 

1994 raise PluginMetadataError( 

1995 f'The plugin defined in "{json_file_path}" wanted to load the module "{module_name}", but' 

1996 " this module is not available in the python search path" 

1997 ) from e 

1998 raise PluginInitializationError( 

1999 f"Failed to load {plugin_name}. Tried loading it from" 

2000 f' "{module_fs_path}" (which did not exist) and PYTHONPATH as' 

2001 f" {module_name} (where it was not found either). Please ensure" 

2002 " the module code is installed in the correct spot or provide an" 

2003 f' explicit "module" definition in {json_file_path}.' 

2004 ) from e 

2005 

2006 plugin_initializer = run_in_context_of_plugin_wrap_errors( 

2007 plugin_name, 

2008 getattr, 

2009 module, 

2010 plugin_initializer_name, 

2011 None, 

2012 ) 

2013 

2014 if plugin_initializer is None: 2014 ↛ 2015line 2014 didn't jump to line 2015 because the condition on line 2014 was never true

2015 raise PluginMetadataError( 

2016 f'The plugin defined in {json_file_path} claimed that module "{module_name}" would have an' 

2017 f' attribute called "{plugin_initializer_name}" to initialize the plugin. However, that attribute' 

2018 " does not exist or cannot be resolved. Please correct the plugin metadata or initializer name" 

2019 " in the Python module." 

2020 ) 

2021 if isinstance(plugin_initializer, DebputyPluginDefinition): 

2022 return plugin_initializer.initialize 

2023 if not callable(plugin_initializer): 2023 ↛ 2024line 2023 didn't jump to line 2024 because the condition on line 2023 was never true

2024 raise PluginMetadataError( 

2025 f'The plugin defined in {json_file_path} claimed that module "{module_name}" would have an' 

2026 f' attribute called "{plugin_initializer_name}" for initializing the plugin. While that' 

2027 " attribute exists, it is neither a `DebputyPluginDefinition`" 

2028 " (`plugin_definition = define_debputy_plugin()`) nor is it `callable`" 

2029 " (`def initialize(api: DebputyPluginInitializer) -> None:`)." 

2030 ) 

2031 return cast("PluginInitializationEntryPoint", plugin_initializer) 

2032 

2033 

2034def _json_plugin_loader( 

2035 plugin_name: str, 

2036 plugin_json_metadata: PluginJsonMetadata, 

2037 json_file_path: str, 

2038 attribute_path: AttributePath, 

2039) -> Callable[["DebputyPluginInitializer"], None]: 

2040 api_compat = plugin_json_metadata["api_compat_version"] 

2041 module_name = plugin_json_metadata.get("module") 

2042 plugin_initializer_name = plugin_json_metadata.get("plugin_initializer") 

2043 packager_provided_files_raw = plugin_json_metadata.get( 

2044 "packager_provided_files", [] 

2045 ) 

2046 manifest_variables_raw = plugin_json_metadata.get("manifest_variables") 

2047 known_packaging_files_raw = plugin_json_metadata.get("known_packaging_files") 

2048 if api_compat != 1: 2048 ↛ 2049line 2048 didn't jump to line 2049 because the condition on line 2048 was never true

2049 raise PluginMetadataError( 

2050 f'The plugin defined in "{json_file_path}" requires API compat level {api_compat}, but this' 

2051 f" version of debputy only supports API compat version of 1" 

2052 ) 

2053 if plugin_initializer_name is not None and "." in plugin_initializer_name: 2053 ↛ 2054line 2053 didn't jump to line 2054 because the condition on line 2053 was never true

2054 p = attribute_path["plugin_initializer"] 

2055 raise PluginMetadataError( 

2056 f'The "{p}" must not contain ".". Problematic file is "{json_file_path}".' 

2057 ) 

2058 

2059 plugin_initializers = [] 

2060 

2061 if plugin_initializer_name is not None: 

2062 plugin_initializer = _resolve_module_initializer( 

2063 plugin_name, 

2064 plugin_initializer_name, 

2065 module_name, 

2066 json_file_path, 

2067 ) 

2068 plugin_initializers.append(plugin_initializer) 

2069 

2070 if known_packaging_files_raw: 

2071 kpf_root_path = attribute_path["known_packaging_files"] 

2072 known_packaging_files = [] 

2073 for k, v in enumerate(known_packaging_files_raw): 

2074 kpf_path = kpf_root_path[k] 

2075 p = v.get("path") 

2076 if isinstance(p, str): 2076 ↛ 2078line 2076 didn't jump to line 2078 because the condition on line 2076 was always true

2077 kpf_path.path_hint = p 

2078 if plugin_name.startswith("debputy-") and isinstance(v, dict): 2078 ↛ 2090line 2078 didn't jump to line 2090 because the condition on line 2078 was always true

2079 docs = v.get("documentation-uris") 

2080 if docs is not None and isinstance(docs, list): 

2081 docs = [ 

2082 ( 

2083 d.replace("@DEBPUTY_DOC_ROOT_DIR@", DEBPUTY_DOC_ROOT_DIR) 

2084 if isinstance(d, str) 

2085 else d 

2086 ) 

2087 for d in docs 

2088 ] 

2089 v["documentation-uris"] = docs 

2090 known_packaging_file: KnownPackagingFileInfo = ( 

2091 PLUGIN_KNOWN_PACKAGING_FILES_PARSER.parse_input( 

2092 v, 

2093 kpf_path, 

2094 ) 

2095 ) 

2096 known_packaging_files.append((kpf_path, known_packaging_file)) 

2097 

2098 def _initialize_json_provided_known_packaging_files( 

2099 api: DebputyPluginInitializerProvider, 

2100 ) -> None: 

2101 for p, details in known_packaging_files: 

2102 try: 

2103 api.known_packaging_files(details) 

2104 except ValueError as ex: 

2105 raise PluginMetadataError( 

2106 f"Error while processing {p.path} defined in {json_file_path}: {ex.args[0]}" 

2107 ) 

2108 

2109 plugin_initializers.append(_initialize_json_provided_known_packaging_files) 

2110 

2111 if manifest_variables_raw: 

2112 manifest_var_path = attribute_path["manifest_variables"] 

2113 manifest_variables = [ 

2114 PLUGIN_MANIFEST_VARS_PARSER.parse_input(p, manifest_var_path[i]) 

2115 for i, p in enumerate(manifest_variables_raw) 

2116 ] 

2117 

2118 def _initialize_json_provided_manifest_vars( 

2119 api: DebputyPluginInitializer, 

2120 ) -> None: 

2121 for idx, manifest_variable in enumerate(manifest_variables): 

2122 name = manifest_variable["name"] 

2123 value = manifest_variable["value"] 

2124 doc = manifest_variable.get("reference_documentation") 

2125 try: 

2126 api.manifest_variable( 

2127 name, value, variable_reference_documentation=doc 

2128 ) 

2129 except ValueError as ex: 

2130 var_path = manifest_var_path[idx] 

2131 raise PluginMetadataError( 

2132 f"Error while processing {var_path.path} defined in {json_file_path}: {ex.args[0]}" 

2133 ) 

2134 

2135 plugin_initializers.append(_initialize_json_provided_manifest_vars) 

2136 

2137 if packager_provided_files_raw: 

2138 ppf_path = attribute_path["packager_provided_files"] 

2139 ppfs = [ 

2140 PLUGIN_PPF_PARSER.parse_input(p, ppf_path[i]) 

2141 for i, p in enumerate(packager_provided_files_raw) 

2142 ] 

2143 

2144 def _initialize_json_provided_ppfs(api: DebputyPluginInitializer) -> None: 

2145 ppf: PackagerProvidedFileJsonDescription 

2146 for idx, ppf in enumerate(ppfs): 

2147 c = dict(ppf) 

2148 stem = ppf["stem"] 

2149 installed_path = ppf["installed_path"] 

2150 default_mode = ppf.get("default_mode") 

2151 ref_doc_dict = ppf.get("reference_documentation") 

2152 if default_mode is not None: 2152 ↛ 2155line 2152 didn't jump to line 2155 because the condition on line 2152 was always true

2153 c["default_mode"] = default_mode.octal_mode 

2154 

2155 if ref_doc_dict is not None: 2155 ↛ 2160line 2155 didn't jump to line 2160 because the condition on line 2155 was always true

2156 ref_doc = packager_provided_file_reference_documentation( 

2157 **ref_doc_dict 

2158 ) 

2159 else: 

2160 ref_doc = None 

2161 

2162 for k in [ 

2163 "stem", 

2164 "installed_path", 

2165 "reference_documentation", 

2166 ]: 

2167 try: 

2168 del c[k] 

2169 except KeyError: 

2170 pass 

2171 

2172 try: 

2173 api.packager_provided_file(stem, installed_path, reference_documentation=ref_doc, **c) # type: ignore 

2174 except ValueError as ex: 

2175 p_path = ppf_path[idx] 

2176 raise PluginMetadataError( 

2177 f"Error while processing {p_path.path} defined in {json_file_path}: {ex.args[0]}" 

2178 ) 

2179 

2180 plugin_initializers.append(_initialize_json_provided_ppfs) 

2181 

2182 if not plugin_initializers: 2182 ↛ 2183line 2182 didn't jump to line 2183 because the condition on line 2182 was never true

2183 raise PluginMetadataError( 

2184 f"The plugin defined in {json_file_path} does not seem to provide features," 

2185 f" such as module + plugin-initializer or packager-provided-files." 

2186 ) 

2187 

2188 if len(plugin_initializers) == 1: 

2189 return plugin_initializers[0] 

2190 

2191 def _chain_loader(api: DebputyPluginInitializer) -> None: 

2192 for initializer in plugin_initializers: 

2193 initializer(api) 

2194 

2195 return _chain_loader 

2196 

2197 

2198@overload 

2199@contextlib.contextmanager 

2200def _open( 2200 ↛ exitline 2200 didn't return from function '_open' because

2201 path: str, 

2202 fd: IO[AnyStr] | IOBase = ..., 

2203) -> Iterator[IO[AnyStr] | IOBase]: ... 

2204 

2205 

2206@overload 

2207@contextlib.contextmanager 

2208def _open(path: str, fd: None = None) -> Iterator[IO[bytes]]: ... 2208 ↛ exitline 2208 didn't return from function '_open' because

2209 

2210 

2211@contextlib.contextmanager 

2212def _open( 

2213 path: str, fd: IO[AnyStr] | IOBase | None = None 

2214) -> Iterator[IO[AnyStr] | IOBase]: 

2215 if fd is not None: 

2216 yield fd 

2217 else: 

2218 with open(path, "rb") as fd: 

2219 yield fd 

2220 

2221 

2222def _resolve_json_plugin_docs_path( 

2223 plugin_name: str, 

2224 plugin_path: str, 

2225) -> Traversable | Path | None: 

2226 plugin_dir = os.path.dirname(plugin_path) 

2227 return Path(os.path.join(plugin_dir, plugin_name + "_docs.yaml")) 

2228 

2229 

2230def parse_json_plugin_desc( 

2231 path: str, 

2232 *, 

2233 fd: IO[AnyStr] | IOBase | None = None, 

2234 is_from_python_path: bool = False, 

2235) -> DebputyPluginMetadata: 

2236 with _open(path, fd=fd) as rfd: 

2237 try: 

2238 raw = json.load(rfd) 

2239 except JSONDecodeError as e: 

2240 raise PluginMetadataError( 

2241 f'The plugin defined in "{path}" could not be parsed as valid JSON: {e.args[0]}' 

2242 ) from e 

2243 plugin_name = os.path.basename(path) 

2244 if plugin_name.endswith(".json"): 

2245 plugin_name = plugin_name[:-5] 

2246 elif plugin_name.endswith(".json.in"): 

2247 plugin_name = plugin_name[:-8] 

2248 

2249 if plugin_name == "debputy": 2249 ↛ 2251line 2249 didn't jump to line 2251 because the condition on line 2249 was never true

2250 # Provide a better error message than "The plugin has already loaded!?" 

2251 raise PluginMetadataError( 

2252 f'The plugin named {plugin_name} must be bundled with `debputy`. Please rename "{path}" so it does not' 

2253 f" clash with the bundled plugin of same name." 

2254 ) 

2255 

2256 attribute_path = AttributePath.root_path(raw) 

2257 

2258 try: 

2259 plugin_json_metadata = PLUGIN_METADATA_PARSER.parse_input( 

2260 raw, 

2261 attribute_path, 

2262 ) 

2263 except ManifestParseException as e: 

2264 raise PluginMetadataError( 

2265 f'The plugin defined in "{path}" was valid JSON but could not be parsed: {e.message}' 

2266 ) from e 

2267 api_compat = plugin_json_metadata["api_compat_version"] 

2268 

2269 return DebputyPluginMetadata( 

2270 plugin_name=plugin_name, 

2271 plugin_loader=lambda: _json_plugin_loader( 

2272 plugin_name, 

2273 plugin_json_metadata, 

2274 path, 

2275 attribute_path, 

2276 ), 

2277 api_compat_version=api_compat, 

2278 plugin_doc_path_resolver=lambda: _resolve_json_plugin_docs_path( 

2279 plugin_name, path 

2280 ), 

2281 plugin_initializer=None, 

2282 plugin_path=path, 

2283 is_from_python_path=is_from_python_path, 

2284 ) 

2285 

2286 

2287@dataclasses.dataclass(slots=True, frozen=True) 

2288class ServiceDefinitionImpl(ServiceDefinition[DSD]): 

2289 name: str 

2290 names: Sequence[str] 

2291 path: VirtualPath 

2292 type_of_service: str 

2293 service_scope: str 

2294 auto_enable_on_install: bool 

2295 auto_start_on_install: bool 

2296 on_upgrade: ServiceUpgradeRule 

2297 definition_source: str 

2298 is_plugin_provided_definition: bool 

2299 service_context: DSD | None 

2300 

2301 def replace(self, **changes: Any) -> "ServiceDefinitionImpl[DSD]": 

2302 return dataclasses.replace(self, **changes) 

2303 

2304 

2305class ServiceRegistryImpl(ServiceRegistry[DSD]): 

2306 __slots__ = ("_service_manager_details", "_service_definitions", "_seen_services") 

2307 

2308 def __init__(self, service_manager_details: ServiceManagerDetails) -> None: 

2309 self._service_manager_details = service_manager_details 

2310 self._service_definitions: list[ServiceDefinition[DSD]] = [] 

2311 self._seen_services: set[tuple[str, str, str]] = set() 

2312 

2313 @property 

2314 def detected_services(self) -> Sequence[ServiceDefinition[DSD]]: 

2315 return self._service_definitions 

2316 

2317 def register_service( 

2318 self, 

2319 path: VirtualPath, 

2320 name: str | list[str], 

2321 *, 

2322 type_of_service: str = "service", # "timer", etc. 

2323 service_scope: str = "system", 

2324 enable_by_default: bool = True, 

2325 start_by_default: bool = True, 

2326 default_upgrade_rule: ServiceUpgradeRule = "restart", 

2327 service_context: DSD | None = None, 

2328 ) -> None: 

2329 names = name if isinstance(name, list) else [name] 

2330 if len(names) < 1: 

2331 raise ValueError( 

2332 f"The service must have at least one name - {path.absolute} did not have any" 

2333 ) 

2334 for n in names: 

2335 key = (n, type_of_service, service_scope) 

2336 if key in self._seen_services: 

2337 raise PluginAPIViolationError( 

2338 f"The service manager (from {self._service_manager_details.plugin_metadata.plugin_name}) used" 

2339 f" the service name {n} (type: {type_of_service}, scope: {service_scope}) twice. This is not" 

2340 " allowed by the debputy plugin API." 

2341 ) 

2342 # TODO: We cannot create a service definition immediate once the manifest is involved 

2343 self._service_definitions.append( 

2344 ServiceDefinitionImpl( 

2345 names[0], 

2346 names, 

2347 path, 

2348 type_of_service, 

2349 service_scope, 

2350 enable_by_default, 

2351 start_by_default, 

2352 default_upgrade_rule, 

2353 f"Auto-detected by plugin {self._service_manager_details.plugin_metadata.plugin_name}", 

2354 True, 

2355 service_context, 

2356 ) 

2357 )