Coverage for src/debputy/plugin/api/impl.py: 60%

890 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-10-12 15:06 +0000

1import contextlib 

2import dataclasses 

3import functools 

4import importlib 

5import importlib.resources 

6import importlib.util 

7import itertools 

8import json 

9import os 

10import re 

11import subprocess 

12import sys 

13from abc import ABC 

14from importlib.resources.abc import Traversable 

15from io import IOBase, BytesIO 

16from json import JSONDecodeError 

17from pathlib import Path 

18from typing import ( 

19 Optional, 

20 Dict, 

21 Tuple, 

22 Type, 

23 List, 

24 Union, 

25 Set, 

26 IO, 

27 AbstractSet, 

28 cast, 

29 FrozenSet, 

30 Any, 

31 Literal, 

32 TYPE_CHECKING, 

33 is_typeddict, 

34 AnyStr, 

35 overload, 

36) 

37from collections.abc import Callable, Iterable, Sequence, Iterator, Mapping, Container 

38 

39import debputy 

40from debputy import DEBPUTY_DOC_ROOT_DIR 

41from debputy.exceptions import ( 

42 DebputySubstitutionError, 

43 PluginConflictError, 

44 PluginMetadataError, 

45 PluginBaseError, 

46 PluginInitializationError, 

47 PluginAPIViolationError, 

48 PluginNotFoundError, 

49 PluginIncorrectRegistrationError, 

50) 

51from debputy.maintscript_snippet import ( 

52 STD_CONTROL_SCRIPTS, 

53 MaintscriptSnippetContainer, 

54 MaintscriptSnippet, 

55) 

56from debputy.manifest_parser.exceptions import ManifestParseException 

57from debputy.manifest_parser.parser_data import ParserContextData 

58from debputy.manifest_parser.tagging_types import TypeMapping 

59from debputy.manifest_parser.util import AttributePath 

60from debputy.manifest_parser.util import resolve_package_type_selectors 

61from debputy.plugin.api.doc_parsing import ( 

62 DEBPUTY_DOC_REFERENCE_DATA_PARSER, 

63 parser_type_name, 

64 DebputyParsedDoc, 

65) 

66from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

67from debputy.plugin.api.impl_types import ( 

68 DebputyPluginMetadata, 

69 PackagerProvidedFileClassSpec, 

70 MetadataOrMaintscriptDetector, 

71 PluginProvidedTrigger, 

72 TTP, 

73 DIPHandler, 

74 PF, 

75 SF, 

76 DIPKWHandler, 

77 PluginProvidedManifestVariable, 

78 PluginProvidedPackageProcessor, 

79 PluginProvidedDiscardRule, 

80 AutomaticDiscardRuleExample, 

81 PPFFormatParam, 

82 ServiceManagerDetails, 

83 KnownPackagingFileInfo, 

84 PluginProvidedKnownPackagingFile, 

85 DHCompatibilityBasedRule, 

86 PluginProvidedTypeMapping, 

87 PluginProvidedBuildSystemAutoDetection, 

88 BSR, 

89) 

90from debputy.plugin.api.plugin_parser import ( 

91 PLUGIN_METADATA_PARSER, 

92 PluginJsonMetadata, 

93 PLUGIN_PPF_PARSER, 

94 PackagerProvidedFileJsonDescription, 

95 PLUGIN_MANIFEST_VARS_PARSER, 

96 PLUGIN_KNOWN_PACKAGING_FILES_PARSER, 

97) 

98from debputy.plugin.api.spec import ( 

99 MaintscriptAccessor, 

100 Maintscript, 

101 DpkgTriggerType, 

102 BinaryCtrlAccessor, 

103 PackageProcessingContext, 

104 MetadataAutoDetector, 

105 PluginInitializationEntryPoint, 

106 DebputyPluginInitializer, 

107 PackageTypeSelector, 

108 FlushableSubstvars, 

109 ParserDocumentation, 

110 PackageProcessor, 

111 VirtualPath, 

112 ServiceIntegrator, 

113 ServiceDetector, 

114 ServiceRegistry, 

115 ServiceDefinition, 

116 DSD, 

117 ServiceUpgradeRule, 

118 PackagerProvidedFileReferenceDocumentation, 

119 packager_provided_file_reference_documentation, 

120 TypeMappingDocumentation, 

121 DebputyIntegrationMode, 

122 _DEBPUTY_DISPATCH_METADATA_ATTR_NAME, 

123 BuildSystemManifestRuleMetadata, 

124 INTEGRATION_MODE_FULL, 

125 only_integrations, 

126 DebputyPluginDefinition, 

127) 

128from debputy.plugin.api.std_docs import _STD_ATTR_DOCS 

129from debputy.plugins.debputy.to_be_api_types import ( 

130 BuildRuleParsedFormat, 

131 BSPF, 

132 debputy_build_system, 

133) 

134from debputy.plugin.plugin_state import ( 

135 run_in_context_of_plugin, 

136 run_in_context_of_plugin_wrap_errors, 

137 wrap_plugin_code, 

138) 

139from debputy.substitution import ( 

140 Substitution, 

141 VariableNameState, 

142 SUBST_VAR_RE, 

143 VariableContext, 

144) 

145from debputy.util import ( 

146 _normalize_path, 

147 POSTINST_DEFAULT_CONDITION, 

148 _error, 

149 print_command, 

150 _warn, 

151 _debug_log, 

152) 

153from debputy.yaml import MANIFEST_YAML 

154 

155if TYPE_CHECKING: 

156 from debputy.highlevel_manifest import HighLevelManifest 

157 

# File name suffix pattern: "_t", "_test" or "_check", optionally followed by
# "_<lowercase/digits/underscores>" (captured as group 1), and ending in ".py".
PLUGIN_TEST_SUFFIX = re.compile(
    r"_(?:t|test|check)(?:_([a-z0-9_]+))?[.]py$",
)

# Traversable handle for the resources of the `debputy.plugins` package.
PLUGIN_PYTHON_RES_PATH = importlib.resources.files(
    debputy.plugins.__name__,
)

160 

161 

def _validate_known_packaging_file_dh_compat_rules(
    dh_compat_rules: list[DHCompatibilityBasedRule] | None,
) -> None:
    """Sanity-check the debhelper compat rules of a known packaging file.

    Nothing is checked when `dh_compat_rules` is `None` or empty.  Otherwise,
    the rules are examined in order and the first problem found is reported.
    A rule is rejected when it:

     * consists solely of selector keys (it would not affect anything),
     * has neither selector and is not the last rule,
     * declares a negative compat level,
     * declares a compat level that is not strictly lower than the compat
       level of the closest earlier rule that declared one, or
     * has an `install_pattern` that is not in its normalized form.

    :param dh_compat_rules: The rules to validate.
    :raises ValueError: If one of the rules is invalid.
    """
    if not dh_compat_rules:
        return

    # Keys that only decide *when* a rule applies.  These must match the keys
    # read below; a rule with nothing but selectors has no effect.
    selector_keys = {
        "starting_with_debhelper_version",
        "starting_with_compat_level",
    }
    last_idx = len(dh_compat_rules) - 1
    max_compat = None
    dh_compat_rule: DHCompatibilityBasedRule
    for idx, dh_compat_rule in enumerate(dh_compat_rules):
        dh_version = dh_compat_rule.get("starting_with_debhelper_version")
        compat = dh_compat_rule.get("starting_with_compat_level")

        if not (dh_compat_rule.keys() - selector_keys):
            raise ValueError(
                f"The dh compat-rule at index {idx} does not affect anything / not have any rules!? So why have it?"
            )
        if dh_version is None and compat is None and idx < last_idx:
            raise ValueError(
                f"The dh compat-rule at index {idx} is not the last and is missing either"
                " starting-with-debhelper-version or starting-with-compat-level"
            )
        # NOTE(review): the message speaks of "no compat below 1" while the
        # check only rejects negative values - confirm whether 0 is intended
        # to be accepted.
        if compat is not None and compat < 0:
            raise ValueError(
                f"There is no compat below 1 but dh compat-rule at {idx} wants to declare some rule"
                f" for something that appeared when migrating from {compat} to {compat + 1}."
            )

        # Compat levels must be strictly decreasing across the rules that
        # declare one; rules without a compat level are ignored here.
        if compat is not None:
            if max_compat is not None and compat >= max_compat:
                raise ValueError(
                    f"The dh compat-rule at {idx} should be moved earlier than the entry for compat {max_compat}."
                )
            max_compat = compat

        install_pattern = dh_compat_rule.get("install_pattern")
        if install_pattern is None:
            continue
        normalized_pattern = _normalize_path(install_pattern, with_prefix=False)
        if normalized_pattern != install_pattern:
            raise ValueError(
                f"The install-pattern in dh compat-rule at {idx} must be normalized as"
                f' "{normalized_pattern}".'
            )

210 

211 

212class DebputyPluginInitializerProvider(DebputyPluginInitializer): 

213 __slots__ = ( 

214 "_plugin_metadata", 

215 "_feature_set", 

216 "_plugin_detector_ids", 

217 "_substitution", 

218 "_unloaders", 

219 "_is_doc_cache_resolved", 

220 "_doc_cache", 

221 "_load_started", 

222 ) 

223 

def __init__(
    self,
    plugin_metadata: DebputyPluginMetadata,
    feature_set: PluginProvidedFeatureSet,
    substitution: Substitution,
) -> None:
    """Set up the initializer for a single plugin.

    :param plugin_metadata: Metadata of the plugin being initialized.
    :param feature_set: The feature set that registrations are recorded in.
    :param substitution: Substitution instance consulted for variable checks.
    """
    # Collaborators handed in by the caller.
    self._plugin_metadata: DebputyPluginMetadata = plugin_metadata
    self._feature_set = feature_set
    self._substitution = substitution

    # Book-keeping for what this plugin has registered so far.
    self._plugin_detector_ids: set[str] = set()
    self._unloaders: list[Callable[[], None]] = []
    self._load_started = False

    # Lazily populated documentation cache (see `_resolve_docs`).
    self._doc_cache: DebputyParsedDoc | None = None
    self._is_doc_cache_resolved: bool = False

238 

@property
def plugin_metadata(self) -> DebputyPluginMetadata:
    """The metadata of the plugin this initializer was created for."""
    metadata = self._plugin_metadata
    return metadata

242 

def unload_plugin(self) -> None:
    """Undo the registrations of this plugin, if loading was ever started.

    Every recorded unloader is invoked in registration order, after which the
    plugin is removed from the feature set's `plugin_data`.  An exception
    raised by an unloader propagates and stops the unloading.
    """
    if not self._load_started:
        # `load_plugin` never got as far as registering the plugin.
        return
    for undo_registration in self._unloaders:
        undo_registration()
    del self._feature_set.plugin_data[self._plugin_name]

248 

def load_plugin(self) -> None:
    """Register the plugin in the feature set and run its initializer.

    :raises PluginConflictError: If a plugin with the same name is already
      present in the feature set.
    :raises PluginMetadataError: If initialization failed with a `TypeError`
      and the declared entry point is not callable.
    :raises PluginInitializationError: If initialization failed with any other
      exception that is not a `PluginBaseError` (those are re-raised as-is).
    """
    metadata = self._plugin_metadata
    loaded_plugins = self._feature_set.plugin_data
    if metadata.plugin_name in loaded_plugins:
        raise PluginConflictError(
            f'The plugin "{metadata.plugin_name}" has already been loaded!?'
        )
    assert (
        metadata.api_compat_version == 1
    ), f"Unsupported plugin API compat version {metadata.api_compat_version}"

    loaded_plugins[metadata.plugin_name] = metadata
    # From here on, `unload_plugin` has something to undo.
    self._load_started = True
    assert not metadata.is_initialized

    try:
        metadata.initialize_plugin(self)
    except Exception as e:
        entry_point = metadata.plugin_initializer
        entry_point_is_not_callable = (
            isinstance(e, TypeError)
            and entry_point is not None
            and not callable(entry_point)
        )
        if entry_point_is_not_callable:
            raise PluginMetadataError(
                f"The specified entry point for plugin {metadata.plugin_name} does not appear to be a"
                f" callable (callable returns False). The specified entry point identifies"
                f' itself as "{entry_point.__qualname__}".'
            ) from e
        if isinstance(e, PluginBaseError):
            # Already a plugin-level error; pass it through unchanged.
            raise
        raise PluginInitializationError(
            f"Exception while attempting to load plugin {metadata.plugin_name}"
        ) from e

280 

def _resolve_docs(self) -> DebputyParsedDoc | None:
    """Load (once) and return the parsed documentation file of the plugin.

    The result is cached.  `None` is returned when the plugin has no
    documentation path, when the file does not exist, or when an earlier
    attempt already resolved to nothing.

    :raises ValueError: If the documentation file exists but cannot be parsed.
    """
    if self._doc_cache is not None:
        return self._doc_cache

    doc_path = self._plugin_metadata.plugin_doc_path
    if doc_path is None or self._is_doc_cache_resolved:
        self._is_doc_cache_resolved = True
        return None

    try:
        with doc_path.open("r", encoding="utf-8") as fd:
            raw_doc = MANIFEST_YAML.load(fd)
    except FileNotFoundError:
        _debug_log(
            f"No documentation file found for {self._plugin_name}. Expected it at {doc_path}"
        )
        self._is_doc_cache_resolved = True
        return None

    root_path = AttributePath.root_path(doc_path)
    try:
        ref_data = DEBPUTY_DOC_REFERENCE_DATA_PARSER.parse_input(raw_doc, root_path)
    except ManifestParseException as e:
        raise ValueError(
            f"Could not parse documentation in {doc_path}: {e.message}"
        ) from e

    try:
        parsed_doc = DebputyParsedDoc.from_ref_data(ref_data)
    except ValueError as e:
        raise ValueError(
            f"Could not parse documentation in {doc_path}: {e.args[0]}"
        ) from e

    self._doc_cache = parsed_doc
    self._is_doc_cache_resolved = True
    return parsed_doc

316 

def _pluggable_manifest_docs_for(
    self,
    rule_type: TTP | str,
    rule_name: str | list[str],
    *,
    inline_reference_documentation: ParserDocumentation | None = None,
) -> ParserDocumentation | None:
    """Pick the documentation to use for a pluggable manifest rule.

    Documentation from the plugin's documentation file is used when it has an
    entry for the rule; otherwise the inline documentation (possibly `None`)
    is returned.

    :param rule_type: The rule type, passed to `parser_type_name`.
    :param rule_name: The rule name; for a list, the first entry is used.
    :param inline_reference_documentation: Documentation given in the API call.
    :raises ValueError: If both the documentation file and the API call
      provide documentation for the rule.
    """
    doc_file_data = self._resolve_docs()
    if doc_file_data is None:
        return inline_reference_documentation

    if isinstance(rule_name, str):
        primary_rule_name = rule_name
    else:
        primary_rule_name = rule_name[0]
    rule_ref = f"{parser_type_name(rule_type)}::{primary_rule_name}"

    docs_from_file = doc_file_data.pluggable_manifest_rules.get(rule_ref)
    if docs_from_file is None:
        return inline_reference_documentation

    if inline_reference_documentation is not None:
        raise ValueError(
            f"Conflicting docs for {rule_ref}: Was provided one in the API call and one via"
            f" {self._plugin_metadata.plugin_doc_path}. Please remove one of the two, so"
            f" there is only one doc reference"
        )
    return docs_from_file

340 

def packager_provided_file(
    self,
    stem: str,
    installed_path: str,
    *,
    default_mode: int = 0o0644,
    default_priority: int | None = None,
    allow_name_segment: bool = True,
    allow_architecture_segment: bool = False,
    post_formatting_rewrite: Callable[[str], str] | None = None,
    packageless_is_fallback_for_all_packages: bool = False,
    reservation_only: bool = False,
    format_callback: None | (
        Callable[[str, PPFFormatParam, VirtualPath], str]
    ) = None,
    reference_documentation: None | (
        PackagerProvidedFileReferenceDocumentation
    ) = None,
) -> None:
    """Register a packager provided file under the given stem.

    The arguments are validated, the (normalized) `installed_path` is checked
    for the substitutions it needs, and a `PackagerProvidedFileClassSpec` is
    stored in the feature set.  An unloader removing the stem again is
    recorded.

    :param stem: The stem to register; it must not be registered already.
    :param installed_path: Where the file is installed.  It must not end with
      "/" and must contain the substitutions required by the other options.
    :param default_priority: When not `None`, `installed_path` must contain
      "{priority}" or "{priority:02}".
    :param allow_name_segment: When true, `installed_path` must contain
      "{name}".
    :param reservation_only: Only available to the "debputy" plugin.
    :param format_callback: Only available to the "debputy" plugin.
    :raises ValueError: If the arguments are invalid for the calling plugin.
    :raises PluginConflictError: If the stem has already been registered.

    The remaining keyword arguments are passed on unchanged to the
    `PackagerProvidedFileClassSpec`.
    """
    packager_provided_files = self._feature_set.packager_provided_files
    existing = packager_provided_files.get(stem)
    plugin_name = self._plugin_name
    is_debputy_itself = plugin_name == "debputy"

    if format_callback is not None and not is_debputy_itself:
        raise ValueError(
            "Sorry; Using format_callback is a debputy-internal"
            f" API. Triggered by plugin {plugin_name}"
        )

    if installed_path.endswith("/"):
        raise ValueError(
            'The installed_path ends with "/" indicating it is a directory, but it must be a file.'
            f" Triggered by plugin {plugin_name}."
        )

    installed_path = _normalize_path(installed_path)
    has_name_var = "{name}" in installed_path

    if installed_path.startswith("./DEBIAN") or reservation_only:
        # Special-case, used for control files.
        if not is_debputy_itself:
            raise ValueError(
                "Sorry; Using DEBIAN as install path or/and reservation_only is a debputy-internal"
                f" API. Triggered by plugin {plugin_name}"
            )
    elif not has_name_var and "{owning_package}" not in installed_path:
        # The message used to suggest ending installed_path with a slash, but
        # that is rejected above, so the advice was removed.
        raise ValueError(
            'The installed_path must contain a "{name}" (preferred) or a "{owning_package}"'
            " substitution. Otherwise, the installed"
            f" path would cause file-conflicts. Triggered by plugin {plugin_name}"
        )

    if allow_name_segment and not has_name_var:
        raise ValueError(
            'When allow_name_segment is True, the installed_path must have a "{name}" substitution'
            " variable. Otherwise, the name segment will not work properly. Triggered by"
            f" plugin {plugin_name}"
        )

    if (
        default_priority is not None
        and "{priority}" not in installed_path
        and "{priority:02}" not in installed_path
    ):
        raise ValueError(
            'When default_priority is not None, the installed_path should have a "{priority}"'
            ' or a "{priority:02}" substitution variable. Otherwise, the priority would be lost.'
            f" Triggered by plugin {plugin_name}"
        )

    if existing is not None:
        existing_owner = existing.debputy_plugin_metadata
        if existing_owner.plugin_name != plugin_name:
            message = (
                f'The stem "{stem}" is registered twice for packager provided files.'
                f" Once by {existing_owner.plugin_name} and once"
                f" by {plugin_name}"
            )
        else:
            message = (
                f"Bug in the plugin {plugin_name}: It tried to register the"
                f' stem "{stem}" twice for packager provided files.'
            )
        raise PluginConflictError(message, existing_owner, self._plugin_metadata)

    packager_provided_files[stem] = PackagerProvidedFileClassSpec(
        self._plugin_metadata,
        stem,
        installed_path,
        default_mode=default_mode,
        default_priority=default_priority,
        allow_name_segment=allow_name_segment,
        allow_architecture_segment=allow_architecture_segment,
        post_formatting_rewrite=post_formatting_rewrite,
        packageless_is_fallback_for_all_packages=packageless_is_fallback_for_all_packages,
        reservation_only=reservation_only,
        formatting_callback=format_callback,
        reference_documentation=reference_documentation,
    )

    def _unload() -> None:
        del packager_provided_files[stem]

    self._unloaders.append(_unload)

445 

def metadata_or_maintscript_detector(
    self,
    auto_detector_id: str,
    auto_detector: MetadataAutoDetector,
    *,
    package_type: PackageTypeSelector = "deb",
) -> None:
    """Register a metadata/maintscript detector for this plugin.

    The detector is passed through `wrap_plugin_code` and appended to this
    plugin's list of detectors in the feature set.  The recorded unloader
    drops *all* detectors of the plugin.

    :param auto_detector_id: Plugin-unique ID of the detector.
    :param auto_detector: The detector implementation.
    :param package_type: Selector resolved via `resolve_package_type_selectors`.
    :raises ValueError: If the plugin already registered the ID.
    """
    known_ids = self._plugin_detector_ids
    if auto_detector_id in known_ids:
        raise ValueError(
            f"The plugin {self._plugin_name} tried to register"
            f' "{auto_detector_id}" twice'
        )
    known_ids.add(auto_detector_id)

    all_detectors = self._feature_set.metadata_maintscript_detectors
    if self._plugin_name not in all_detectors:
        all_detectors[self._plugin_name] = []
    applies_to = resolve_package_type_selectors(package_type)

    detector_entry = MetadataOrMaintscriptDetector(
        detector_id=auto_detector_id,
        detector=wrap_plugin_code(self._plugin_name, auto_detector),
        plugin_metadata=self._plugin_metadata,
        applies_to_package_types=applies_to,
        enabled=True,
    )
    all_detectors[self._plugin_name].append(detector_entry)

    def _unload() -> None:
        # May run once per registered detector; only the first call finds
        # something to remove.
        if self._plugin_name in all_detectors:
            del all_detectors[self._plugin_name]

    self._unloaders.append(_unload)

478 

def document_builtin_variable(
    self,
    variable_name: str,
    variable_reference_documentation: str,
    *,
    is_context_specific: bool = False,
    is_for_special_case: bool = False,
) -> None:
    """Attach reference documentation to a variable known to the substitution.

    A documentation-only placeholder (no value) is stored in the feature set
    for the variable.  Access is gated by `_restricted_api()`.

    :param variable_name: Name of the variable being documented.
    :param variable_reference_documentation: The documentation text.
    :param is_context_specific: Stored as `is_context_specific_variable`.
    :param is_for_special_case: Stored on the placeholder as-is.
    :raises ValueError: If the substitution reports the variable as undefined.
    """
    manifest_variables = self._feature_set.manifest_variables
    self._restricted_api()

    if self._substitution.variable_state(variable_name) == VariableNameState.UNDEFINED:
        raise ValueError(
            f"The plugin {self._plugin_name} attempted to document built-in {variable_name},"
            f" but it is not known to be a variable"
        )

    assert variable_name not in manifest_variables

    placeholder = PluginProvidedManifestVariable(
        self._plugin_metadata,
        variable_name,
        None,
        is_context_specific_variable=is_context_specific,
        variable_reference_documentation=variable_reference_documentation,
        is_documentation_placeholder=True,
        is_for_special_case=is_for_special_case,
    )
    manifest_variables[variable_name] = placeholder

    def _unload() -> None:
        del manifest_variables[variable_name]

    self._unloaders.append(_unload)

512 

def manifest_variable_provider(
    self,
    provider: Callable[[VariableContext], Mapping[str, str]],
    variables: Sequence[str] | Mapping[str, str | None],
) -> None:
    """Register manifest variables whose values come from a provider callable.

    The provider is wrapped in a cache keyed on the `VariableContext` it is
    called with.  The first time any of the variables is resolved, the keys
    returned by the provider are compared with the declared variables; the
    comparison is done once and skipped on later resolutions.

    Access is gated by `_restricted_api()`.  Unloading is not implemented; the
    recorded unloader always raises `PluginInitializationError`.

    :param provider: Callable returning a mapping with a value for every
      declared variable.
    :param variables: Either the variable names, or a mapping of variable name
      to its reference documentation (or `None`).
    :raises PluginConflictError: Via `_check_variable_name`, if a variable is
      already registered.
    :raises ValueError: Via `_check_variable_name`, if a name is not acceptable.
    """
    self._restricted_api()
    cached_provider = functools.cache(provider)
    permitted_variables = frozenset(variables)

    variables_iter: Iterable[tuple[str, str | None]]
    if isinstance(variables, Mapping):
        variables_iter = variables.items()
    else:
        # Plain names: no reference documentation available.
        variables_iter = zip(variables, itertools.repeat(None))

    # Shared by all resolvers created below; flipped after the first
    # successful consistency check.
    checked_vars = False
    manifest_variables = self._feature_set.manifest_variables
    plugin_name = self._plugin_name

    def _value_resolver_generator(
        variable_name: str,
    ) -> Callable[[VariableContext], str]:
        def _value_resolver(variable_context: VariableContext) -> str:
            nonlocal checked_vars
            res = cached_provider(variable_context)
            if not checked_vars:
                if permitted_variables != res.keys():
                    expected = ", ".join(sorted(permitted_variables))
                    actual = ", ".join(sorted(res))
                    raise PluginAPIViolationError(
                        f"The plugin {plugin_name} claimed to provide"
                        f" the following variables {expected},"
                        f" but when resolving the variables, the plugin provided"
                        f" {actual}. These two lists should have been the same."
                    )
                # Previously reset to False, which made the check run on
                # every single resolution.
                checked_vars = True
            return res[variable_name]

        return _value_resolver

    for varname, vardoc in variables_iter:
        self._check_variable_name(varname)
        manifest_variables[varname] = PluginProvidedManifestVariable(
            self._plugin_metadata,
            varname,
            _value_resolver_generator(varname),
            is_context_specific_variable=False,
            variable_reference_documentation=vardoc,
        )

    def _unload() -> None:
        raise PluginInitializationError(
            "Cannot unload manifest_variable_provider (not implemented)"
        )

    self._unloaders.append(_unload)

568 

def _check_variable_name(self, variable_name: str) -> None:
    """Verify that this plugin may declare the given manifest variable.

    The checks are, in order: the variable is not registered yet, the name
    matches `SUBST_VAR_RE` (when wrapped in "{{" and "}}"), any namespace is
    one of "token" or "path", the name is not reserved for debputy, and the
    name does not shadow a variable already known to the substitution.  The
    last two checks are skipped for the "debputy" plugin itself.

    :param variable_name: The variable name to validate.
    :raises PluginConflictError: If the variable is already registered.
    :raises ValueError: If the name is invalid, reserved or shadows a
      built-in variable.
    """
    manifest_variables = self._feature_set.manifest_variables
    existing = manifest_variables.get(variable_name)

    if existing is not None:
        if existing.plugin_metadata.plugin_name == self._plugin_name:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' manifest variable "{variable_name}" twice.'
            )
        else:
            message = (
                f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}"
                f" both tried to provide the manifest variable {variable_name}"
            )
        raise PluginConflictError(
            message, existing.plugin_metadata, self._plugin_metadata
        )
    if not SUBST_VAR_RE.match("{{" + variable_name + "}}"):
        raise ValueError(
            f"The plugin {self._plugin_name} attempted to declare {variable_name},"
            f" which is not a valid variable name"
        )

    namespace = ""
    variable_basename = variable_name
    if ":" in variable_name:
        namespace, variable_basename = variable_name.rsplit(":", 1)
        assert namespace != ""
        # Both halves of the split must be non-empty.  This used to test
        # `variable_name`, which cannot be empty at this point.
        assert variable_basename != ""

    if namespace != "" and namespace not in ("token", "path"):
        raise ValueError(
            f"The plugin {self._plugin_name} attempted to declare {variable_name},"
            f" which is in the reserved namespace {namespace}"
        )

    is_debputy_itself = self._plugin_name == "debputy"

    variable_name_upper = variable_name.upper()
    looks_reserved = (
        variable_name_upper.startswith(("DEB_", "DPKG_", "DEBPUTY"))
        or variable_basename.startswith("_")
        or variable_basename.upper().startswith("DEBPUTY")
    )
    if looks_reserved and not is_debputy_itself:
        raise ValueError(
            f"The plugin {self._plugin_name} attempted to declare {variable_name},"
            f" which is a variable name reserved by debputy"
        )

    state = self._substitution.variable_state(variable_name)
    if state != VariableNameState.UNDEFINED and not is_debputy_itself:
        raise ValueError(
            f"The plugin {self._plugin_name} attempted to declare {variable_name},"
            f" which would shadow a built-in variable"
        )

623 

def package_processor(
    self,
    processor_id: str,
    processor: PackageProcessor,
    *,
    depends_on_processor: Iterable[str] = tuple(),
    package_type: PackageTypeSelector = "deb",
) -> None:
    """Register a package processor for this plugin.

    Access is gated by `_restricted_api(allowed_plugins=...)`.  Each entry of
    `depends_on_processor` is looked up first among this plugin's processors
    and then among those of "debputy"; the dependency must already be
    registered.

    :param processor_id: Plugin-unique ID of the processor.
    :param processor: The implementation; passed through `wrap_plugin_code`.
    :param depends_on_processor: IDs of processors this one depends on.
    :param package_type: Selector resolved via `resolve_package_type_selectors`.
    :raises PluginConflictError: If the plugin already registered the ID.
    :raises ValueError: If a dependency cannot be resolved.
    """
    self._restricted_api(allowed_plugins={"lua", "debputy-self-hosting"})
    package_processors = self._feature_set.all_package_processors
    dependencies = set()
    own_plugin = self._plugin_name
    processor_key = (own_plugin, processor_id)

    if processor_key in package_processors:
        raise PluginConflictError(
            f"The plugin {self._plugin_name} already registered a processor with id {processor_id}",
            self._plugin_metadata,
            self._plugin_metadata,
        )

    for depends_ref in depends_on_processor:
        if isinstance(depends_ref, str):
            own_candidate = (own_plugin, depends_ref)
            debputy_candidate = ("debputy", depends_ref)
            if own_candidate in package_processors:
                depends_key = own_candidate
            elif debputy_candidate in package_processors:
                depends_key = debputy_candidate
            else:
                raise ValueError(
                    f'Could not resolve dependency "{depends_ref}" for'
                    f' "{processor_id}". It was not provided by the plugin itself'
                    f" ({self._plugin_name}) nor debputy."
                )
        else:
            # TODO: Add proper dependencies first, at which point we should probably resolve "name"
            #  via the direct dependencies.
            assert False

        if package_processors.get(depends_key) is None:
            # We currently require the processor to be declared already. If this ever changes,
            # PluginProvidedFeatureSet.package_processors_in_order will need an update
            dplugin_name, dprocessor_name = depends_key
            names_for_plugin = (
                name
                for owner, name in package_processors.keys()
                if owner == dplugin_name
            )
            available_processors = ", ".join(names_for_plugin)
            raise ValueError(
                f"The plugin {dplugin_name} does not provide a processor called"
                f" {dprocessor_name}. Available processors for that plugin are:"
                f" {available_processors}"
            )
        dependencies.add(depends_key)

    package_processors[processor_key] = PluginProvidedPackageProcessor(
        processor_id,
        resolve_package_type_selectors(package_type),
        wrap_plugin_code(self._plugin_name, processor),
        frozenset(dependencies),
        self._plugin_metadata,
    )

    def _unload() -> None:
        del package_processors[processor_key]

    self._unloaders.append(_unload)

688 

def automatic_discard_rule(
    self,
    name: str,
    should_discard: Callable[[VirtualPath], bool],
    *,
    rule_reference_documentation: str | None = None,
    examples: (
        AutomaticDiscardRuleExample | Sequence[AutomaticDiscardRuleExample]
    ) = tuple(),
) -> None:
    """Register an automatic discard rule

    Automatic discard rules are consulted for *every* path that is about to be
    installed into any package.  As soon as one rule decides that a path should
    not be installed, the path is skipped.  What is skipped depends on the kind
    of path:

     * directory: The directory and everything beneath it is excluded.
     * symlink: Only the symlink (not its target) is excluded.
     * hardlink: The current hardlink is not installed; other instances of it
       still are.

    Note: `debputy` *never* deletes discarded files.  It merely skips them.

    Write discard rules under the assumption that a directory is tested before
    its content *when it is relevant* for the rule to examine whether the
    directory can be excluded.

    The packager can overrule automatic discard rules in the manifest by
    listing the path explicitly without any globs.  For example:

        installations:
          - install:
              sources:
                - usr/lib/libfoo.la  # <-- This path is always installed
                                     #     (Discard rules are never asked in this case)
                                     #
                - usr/lib/*.so*      # <-- Discard rules applies to any path beneath usr/lib and can exclude matches
                                     #     Though, they will not examine `libfoo.la` as it has already been installed
                                     #
                                     #     Note: usr/lib itself is never tested in this case (it is assumed to be
                                     #     explicitly requested). But any subdir of usr/lib will be examined.

    A discard rule sees the source path currently considered for installation.
    It may look at the "surrounding" context (such as the parent directory),
    but it cannot know whether those paths are or will be installed.

    :param name: User visible name of the discard rule.  As it can be used on
      the command line, avoid shell metacharacters and spaces.
    :param should_discard: The implementation of the rule.  It receives a
      VirtualPath for the *source* path about to be installed and returns
      `True` to discard the path or `False` to keep it (as far as this rule is
      concerned).  The source path is either from the root of the source tree
      or from the root of a search directory such as `debian/tmp`.  The
      install destination is not available when the rule is evaluated.
    :param rule_reference_documentation: Optional reference documentation shown
      when a user looks up this automatic discard rule.
    :param examples: Examples for the rule.  Use the
      automatic_discard_rule_example function to generate them.

    """
    self._restricted_api()
    auto_discard_rules = self._feature_set.auto_discard_rules
    existing = auto_discard_rules.get(name)
    if existing is not None:
        previous_owner = existing.plugin_metadata
        if previous_owner.plugin_name == self._plugin_name:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' automatic discard rule "{name}" twice.'
            )
        else:
            message = (
                f"The plugins {previous_owner.plugin_name} and {self._plugin_name}"
                f" both tried to provide the automatic discard rule {name}"
            )
        raise PluginConflictError(message, previous_owner, self._plugin_metadata)

    # Normalize to a tuple; a lone example is wrapped.
    if isinstance(examples, AutomaticDiscardRuleExample):
        examples = (examples,)
    else:
        examples = tuple(examples)

    auto_discard_rules[name] = PluginProvidedDiscardRule(
        name,
        self._plugin_metadata,
        should_discard,
        rule_reference_documentation,
        examples,
    )

    def _unload() -> None:
        del auto_discard_rules[name]

    self._unloaders.append(_unload)

782 

def service_provider(
    self,
    service_manager: str,
    detector: ServiceDetector,
    integrator: ServiceIntegrator,
) -> None:
    """Register the detector and integrator for a service manager.

    Access is gated by `_restricted_api()`.  Both callables are passed through
    `wrap_plugin_code` before being stored.

    :param service_manager: Name of the service manager.
    :param detector: The service detector.
    :param integrator: The service integrator.
    :raises PluginConflictError: If the service manager is already registered.
    """
    self._restricted_api()
    service_managers = self._feature_set.service_managers
    existing = service_managers.get(service_manager)
    if existing is not None:
        previous_owner = existing.plugin_metadata
        if previous_owner.plugin_name == self._plugin_name:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' service manager "{service_manager}" twice.'
            )
        else:
            message = (
                f"The plugins {previous_owner.plugin_name} and {self._plugin_name}"
                f' both tried to provide the service manager "{service_manager}"'
            )
        raise PluginConflictError(message, previous_owner, self._plugin_metadata)

    wrapped_detector = wrap_plugin_code(self._plugin_name, detector)
    wrapped_integrator = wrap_plugin_code(self._plugin_name, integrator)
    service_managers[service_manager] = ServiceManagerDetails(
        service_manager,
        wrapped_detector,
        wrapped_integrator,
        self._plugin_metadata,
    )

    def _unload() -> None:
        del service_managers[service_manager]

    self._unloaders.append(_unload)

817 

def manifest_variable(
    self,
    variable_name: str,
    value: str,
    *,
    variable_reference_documentation: str | None = None,
) -> None:
    """Register a manifest variable with a fixed value.

    The value must be a plain string: if substituting it changes it, or the
    substitution fails, the value is considered to depend on another variable
    and is rejected.  Unloading is not implemented; the recorded unloader
    always raises `PluginInitializationError`.

    :param variable_name: Name of the variable; checked by
      `_check_variable_name`.
    :param value: The value of the variable.
    :param variable_reference_documentation: Optional reference documentation.
    :raises ValueError: If the name is not acceptable or the value depends on
      another variable.
    :raises PluginConflictError: If the variable is already registered.
    """
    self._check_variable_name(variable_name)
    manifest_variables = self._feature_set.manifest_variables

    try:
        substituted = self._substitution.substitute(value, "Plugin initialization")
    except DebputySubstitutionError:
        uses_other_variable = True
    else:
        uses_other_variable = substituted != value

    if uses_other_variable:
        raise ValueError(
            f"The plugin {self._plugin_name} attempted to declare {variable_name} with value {value!r}."
            f" This value depends on another variable, which is not supported. This restriction may be"
            f" lifted in the future."
        )

    manifest_variables[variable_name] = PluginProvidedManifestVariable(
        self._plugin_metadata,
        variable_name,
        value,
        is_context_specific_variable=False,
        variable_reference_documentation=variable_reference_documentation,
    )

    def _unload() -> None:
        # We need to check it was never resolved
        raise PluginInitializationError(
            "Cannot unload manifest_variable (not implemented)"
        )

    self._unloaders.append(_unload)

856 

@property
def _plugin_name(self) -> str:
    """The `plugin_name` recorded in this initializer's plugin metadata."""
    metadata = self._plugin_metadata
    return metadata.plugin_name

860 

def provide_manifest_keyword(
    self,
    rule_type: TTP,
    rule_name: str | list[str],
    handler: DIPKWHandler,
    *,
    inline_reference_documentation: ParserDocumentation | None = None,
) -> None:
    """Register a keyword handler with the table parser for `rule_type`.

    :param rule_type: Must be a key of the parser generator's
      `dispatchable_table_parsers`.
    :param rule_name: The keyword (or keywords) to register.
    :param handler: Plugin callable; registered wrapped via `wrap_plugin_code`.
    :param inline_reference_documentation: Optional documentation; passed
      through `_pluggable_manifest_docs_for` before being registered.
    :raises ValueError: If `rule_type` is not a known table parser type.
    """
    self._restricted_api()
    parser_generator = self._feature_set.manifest_parser_generator
    table_parsers = parser_generator.dispatchable_table_parsers
    if rule_type not in table_parsers:
        known_names = sorted(x.__name__ for x in table_parsers)
        types = ", ".join(known_names)
        raise ValueError(
            f"The rule_type was not a supported type. It must be one of {types}"
        )

    docs = self._pluggable_manifest_docs_for(
        rule_type,
        rule_name,
        inline_reference_documentation=inline_reference_documentation,
    )

    parser_generator.dispatchable_table_parsers[rule_type].register_keyword(
        rule_name,
        wrap_plugin_code(self._plugin_name, handler),
        self._plugin_metadata,
        inline_reference_documentation=docs,
    )

    def _unload() -> None:
        raise PluginInitializationError(
            "Cannot unload provide_manifest_keyword (not implemented)"
        )

    self._unloaders.append(_unload)

899 

def pluggable_object_parser(
    self,
    rule_type: str,
    rule_name: str,
    *,
    object_parser_key: str | None = None,
    on_end_parse_step: None | (
        Callable[
            [str, Mapping[str, Any] | None, AttributePath, ParserContextData],
            None,
        ]
    ) = None,
    nested_in_package_context: bool = False,
) -> None:
    """Register one object parser as a child of another under `rule_name`.

    :param rule_type: Key of the parent dispatcher in
      `dispatchable_object_parsers`.
    :param rule_name: Name under which the child parser is registered.
    :param object_parser_key: Key of the child dispatcher; defaults to
      `rule_name` when omitted.
    :param on_end_parse_step: Optional plugin callable; wrapped via
      `wrap_plugin_code` before being passed on.
    :param nested_in_package_context: Passed through to
      `register_child_parser` unchanged.
    :raises ValueError: If `rule_type` or the object parser key is unknown.
    """
    self._restricted_api()
    child_key = rule_name if object_parser_key is None else object_parser_key

    object_parsers = (
        self._feature_set.manifest_parser_generator.dispatchable_object_parsers
    )
    if rule_type not in object_parsers:
        types = ", ".join(sorted(object_parsers))
        raise ValueError(
            f"The rule_type was not a supported type. It must be one of {types}"
        )
    if child_key not in object_parsers:
        types = ", ".join(sorted(object_parsers))
        raise ValueError(
            f"The object_parser_key was not a supported type. It must be one of {types}"
        )
    parent_dispatcher = object_parsers[rule_type]
    child_dispatcher = object_parsers[child_key]

    end_step = on_end_parse_step
    if end_step is not None:
        end_step = wrap_plugin_code(self._plugin_name, end_step)

    parent_dispatcher.register_child_parser(
        rule_name,
        child_dispatcher,
        self._plugin_metadata,
        on_end_parse_step=end_step,
        nested_in_package_context=nested_in_package_context,
    )

    def _unload() -> None:
        raise PluginInitializationError(
            "Cannot unload pluggable_object_parser (not implemented)"
        )

    self._unloaders.append(_unload)

950 

def pluggable_manifest_rule(
    self,
    rule_type: TTP | str,
    rule_name: str | Sequence[str],
    parsed_format: type[PF],
    handler: DIPHandler,
    *,
    source_format: SF | None = None,
    inline_reference_documentation: ParserDocumentation | None = None,
    expected_debputy_integration_mode: None | (
        Container[DebputyIntegrationMode]
    ) = None,
    apply_standard_attribute_documentation: bool = False,
) -> None:
    """Generate a parser for `parsed_format` and register it with a dispatcher.

    A `str` rule type selects from `dispatchable_object_parsers`; any other
    rule type selects from `dispatchable_table_parsers`.

    :param rule_type: The dispatcher to register the rule with.
    :param rule_name: The name (or names) of the rule.
    :param parsed_format: Passed to `generate_parser` as the parsed format.
    :param handler: Plugin callable; registered wrapped via `wrap_plugin_code`.
    :param source_format: Passed to `generate_parser` as `source_content`.
    :param inline_reference_documentation: Optional documentation; passed
      through `_pluggable_manifest_docs_for` first.
    :param expected_debputy_integration_mode: Passed to `generate_parser`.
    :param apply_standard_attribute_documentation: When true, `_STD_ATTR_DOCS`
      is passed as `automatic_docs` (requires Python 3.12 or later).
    :raises ValueError: If `rule_type` is not known to the selected table.
    """
    # When changing this, consider which types will be unrestricted
    self._restricted_api()
    if apply_standard_attribute_documentation and sys.version_info < (3, 12):
        _error(
            f"The plugin {self._plugin_metadata.plugin_name} requires python 3.12 due to"
            f" its use of apply_standard_attribute_documentation"
        )
    parser_generator = self._feature_set.manifest_parser_generator

    if isinstance(rule_type, str):
        dispatchers = parser_generator.dispatchable_object_parsers
        if rule_type not in dispatchers:
            types = ", ".join(sorted(dispatchers))
            raise ValueError(
                f"The rule_type was not a supported type. It must be one of {types}"
            )
    else:
        dispatchers = parser_generator.dispatchable_table_parsers
        if rule_type not in dispatchers:
            types = ", ".join(sorted(x.__name__ for x in dispatchers))
            raise ValueError(
                f"The rule_type was not a supported type. It must be one of {types}"
            )
    dispatching_parser = dispatchers[rule_type]

    resolved_docs = self._pluggable_manifest_docs_for(
        rule_type,
        rule_name,
        inline_reference_documentation=inline_reference_documentation,
    )

    automatic_docs = _STD_ATTR_DOCS if apply_standard_attribute_documentation else None

    parser = parser_generator.generate_parser(
        parsed_format,
        source_content=source_format,
        inline_reference_documentation=resolved_docs,
        expected_debputy_integration_mode=expected_debputy_integration_mode,
        automatic_docs=automatic_docs,
    )
    dispatching_parser.register_parser(
        rule_name,
        parser,
        wrap_plugin_code(self._plugin_name, handler),
        self._plugin_metadata,
    )

    def _unload() -> None:
        raise PluginInitializationError(
            "Cannot unload pluggable_manifest_rule (not implemented)"
        )

    self._unloaders.append(_unload)

1024 

def register_build_system(
    self,
    build_system_definition: type[BSPF],
) -> None:
    """Register a build system from its annotated definition type.

    The dispatch metadata attached to the definition is used to register
    both a manifest rule and the auto-detection entry for the build system.

    :param build_system_definition: A TypedDict type carrying
      `BuildSystemManifestRuleMetadata` as its dispatch metadata.
    :raises PluginInitializationError: If the definition is not a TypedDict.
    :raises PluginIncorrectRegistrationError: If the dispatch metadata is
      missing or of the wrong type.
    """
    self._restricted_api()
    if not is_typeddict(build_system_definition):
        raise PluginInitializationError(
            f"Expected build_system_definition to be a subclass of {BuildRuleParsedFormat.__name__},"
            f" but got {build_system_definition.__name__} instead"
        )

    metadata = getattr(
        build_system_definition, _DEBPUTY_DISPATCH_METADATA_ATTR_NAME, None
    )
    if not isinstance(metadata, BuildSystemManifestRuleMetadata):
        raise PluginIncorrectRegistrationError(
            f"The {build_system_definition.__qualname__} type should have been annotated with"
            f" @{debputy_build_system.__name__}."
        )

    keywords = metadata.manifest_keywords
    assert len(keywords) == 1
    build_system_impl = metadata.build_system_impl
    assert build_system_impl is not None
    manifest_keyword = next(iter(keywords))

    self.pluggable_manifest_rule(
        metadata.dispatched_type,
        metadata.manifest_keywords,
        build_system_definition,
        # pluggable_manifest_rule does the wrapping
        metadata.unwrapped_constructor,
        source_format=metadata.source_format,
        inline_reference_documentation=metadata.online_reference_documentation,
        expected_debputy_integration_mode=only_integrations(INTEGRATION_MODE_FULL),
    )

    wrapped_constructor = wrap_plugin_code(self._plugin_name, build_system_impl)
    self._auto_detectable_build_system(
        manifest_keyword,
        build_system_impl,
        constructor=wrapped_constructor,
        shadowing_build_systems_when_active=metadata.auto_detection_shadow_build_systems,
    )

1068 

def _auto_detectable_build_system(
    self,
    manifest_keyword: str,
    rule_type: type[BSR],
    *,
    shadowing_build_systems_when_active: frozenset[str] = frozenset(),
    constructor: None | (
        Callable[[BuildRuleParsedFormat, AttributePath, "HighLevelManifest"], BSR]
    ) = None,
) -> None:
    """Register auto-detection for the build system class `rule_type`.

    :param manifest_keyword: The manifest keyword stored with the entry.
    :param rule_type: The build system class; also the registry key. Its
      `auto_detect_build_system` attribute is stored wrapped via
      `wrap_plugin_code`.
    :param shadowing_build_systems_when_active: Stored with the entry as-is.
    :param constructor: Callable used to create the build system. When
      omitted, `rule_type` itself is called with the same three arguments.
    :raises PluginConflictError: If auto-detection for `rule_type` has
      already been registered, by this plugin or by another one.
    """
    self._restricted_api()
    feature_set = self._feature_set
    existing = feature_set.auto_detectable_build_systems.get(rule_type)
    if existing is not None:
        # `rule_type` is itself a class, so its own `__name__` is the build
        # system's name; `rule_type.__class__.__name__` would name the
        # metaclass (e.g. "type") instead.
        bs_name = rule_type.__name__
        if existing.plugin_metadata.plugin_name == self._plugin_name:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' auto-detection of the build system "{bs_name}" twice.'
            )
        else:
            message = (
                f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}"
                f' both tried to provide auto-detection of the build system "{bs_name}"'
            )
        raise PluginConflictError(
            message, existing.plugin_metadata, self._plugin_metadata
        )

    if constructor is None:

        def impl(
            attributes: BuildRuleParsedFormat,
            attribute_path: AttributePath,
            manifest: "HighLevelManifest",
        ) -> BSR:
            return rule_type(attributes, attribute_path, manifest)

    else:
        impl = constructor

    feature_set.auto_detectable_build_systems[rule_type] = (
        PluginProvidedBuildSystemAutoDetection(
            manifest_keyword,
            rule_type,
            wrap_plugin_code(self._plugin_name, rule_type.auto_detect_build_system),
            impl,
            shadowing_build_systems_when_active,
            self._plugin_metadata,
        )
    )

    def _unload() -> None:
        # The entry may already be gone; a missing key is not an error here.
        with contextlib.suppress(KeyError):
            del feature_set.auto_detectable_build_systems[rule_type]

    self._unloaders.append(_unload)

1128 

def known_packaging_files(
    self,
    packaging_file_details: KnownPackagingFileInfo,
) -> None:
    """Register a known packaging file.

    The entry is keyed by `"<detection_method>::<detection_value>"`, where
    the detection value is the `path` attribute (method `path`, the default)
    or the `pkgfile` attribute (method `dh.pkgfile`).

    :param packaging_file_details: Description of the packaging file. A copy
      of it is stored with the registration.
    :raises ValueError: If the attributes do not match the detection method,
      the path is not normalized, the pkgfile contains a "/", or the
      detection method is unknown.
    :raises PluginConflictError: If the key has already been registered, by
      this plugin or by another one.
    """
    known_packaging_files = self._feature_set.known_packaging_files
    detection_method = packaging_file_details.get(
        "detection_method", cast("Literal['path']", "path")
    )
    path = packaging_file_details.get("path")
    dhpkgfile = packaging_file_details.get("pkgfile")

    # The registered entry holds this copy rather than the caller's mapping.
    packaging_file_details = packaging_file_details.copy()

    if detection_method == "path":
        if dhpkgfile is not None:
            raise ValueError(
                'The "pkgfile" attribute cannot be used when detection-method is "path" (or omitted)'
            )
        if path is None:
            raise ValueError(
                'The "path" attribute must be present when detection-method is "path" (or omitted)'
            )
        normalized_path = _normalize_path(path, with_prefix=False)
        if path != normalized_path:
            raise ValueError(
                f"The path for known packaging files must be normalized. Please replace"
                f' "{path}" with "{normalized_path}"'
            )
        detection_value = path
    elif detection_method == "dh.pkgfile":
        if path is not None:
            raise ValueError(
                'The "path" attribute cannot be used when detection-method is "dh.pkgfile"'
            )
        if dhpkgfile is None:
            raise ValueError(
                'The "pkgfile" attribute must be present when detection-method is "dh.pkgfile"'
            )
        if "/" in dhpkgfile:
            raise ValueError(
                'The "pkgfile" attribute must be a name stem such as "install" (no "/" are allowed)'
            )
        detection_value = dhpkgfile
    else:
        # The value comes from plugin-provided data, so it is validated with a
        # real exception; an `assert` would be skipped when running with -O.
        raise ValueError(
            f'Unsupported detection-method "{detection_method}".'
            ' It must be "path" or "dh.pkgfile"'
        )

    key = f"{detection_method}::{detection_value}"
    existing = known_packaging_files.get(key)
    if existing is not None:
        if existing.plugin_metadata.plugin_name != self._plugin_name:
            message = (
                f'The key "{key}" is registered twice for known packaging files.'
                f" Once by {existing.plugin_metadata.plugin_name} and once by {self._plugin_name}"
            )
        else:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' key "{key}" twice for known packaging files.'
            )
        raise PluginConflictError(
            message, existing.plugin_metadata, self._plugin_metadata
        )
    _validate_known_packaging_file_dh_compat_rules(
        packaging_file_details.get("dh_compat_rules")
    )
    known_packaging_files[key] = PluginProvidedKnownPackagingFile(
        packaging_file_details,
        detection_method,
        detection_value,
        self._plugin_metadata,
    )

    def _unload() -> None:
        # Undo the registration performed above.
        del known_packaging_files[key]

    self._unloaders.append(_unload)

1202 

def register_mapped_type(
    self,
    type_mapping: TypeMapping,
    *,
    reference_documentation: TypeMappingDocumentation | None = None,
) -> None:
    """Register a type mapping, keyed by its target type.

    The mapping is recorded in the feature set's `mapped_types` and also
    registered with the manifest parser generator.

    :param type_mapping: The mapping to register; `type_mapping.target_type`
      is the registry key.
    :param reference_documentation: Optional documentation stored with the
      mapping.
    :raises PluginConflictError: If the target type has already been
      registered, by this plugin or by another one.
    """
    self._restricted_api()
    target_type = type_mapping.target_type
    mapped_types = self._feature_set.mapped_types
    existing = mapped_types.get(target_type)
    if existing is not None:
        # These messages describe mapped types; the earlier wording referred
        # to "known packaging files", which is a different registry.
        if existing.plugin_metadata.plugin_name != self._plugin_name:
            message = (
                f'The type "{target_type.__name__}" is registered twice as a mapped type.'
                f" Once by {existing.plugin_metadata.plugin_name} and once by {self._plugin_name}"
            )
        else:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' mapped type "{target_type.__name__}" twice.'
            )
        raise PluginConflictError(
            message, existing.plugin_metadata, self._plugin_metadata
        )
    parser_generator = self._feature_set.manifest_parser_generator
    # TODO: Wrap the mapper in the plugin context
    mapped_types[target_type] = PluginProvidedTypeMapping(
        type_mapping, reference_documentation, self._plugin_metadata
    )
    parser_generator.register_mapped_type(type_mapping)

1233 

def _restricted_api(
    self,
    *,
    allowed_plugins: set[str] | frozenset[str] = frozenset(),
) -> None:
    """Reject the call unless it is made on behalf of an allowed plugin.

    :param allowed_plugins: Plugin names allowed in addition to "debputy".
    :raises PluginAPIViolationError: If the current plugin is neither
      "debputy" nor listed in `allowed_plugins`.
    """
    caller = self._plugin_name
    if caller == "debputy" or caller in allowed_plugins:
        return
    raise PluginAPIViolationError(
        f"Plugin {caller} attempted to access a debputy-only API."
        " If you are the maintainer of this plugin and want access to this"
        " API, please file a feature request to make this public."
        " (The API is currently private as it is unstable.)"
    )

1246 

1247 

class MaintscriptAccessorProviderBase(MaintscriptAccessor, ABC):
    """Shared implementation of the conditional maintscript snippet helpers.

    Each `on_*` method wraps the snippet in a shell `if ...; then ... fi`
    block and hands the result to `_append_script`, which subclasses must
    implement.
    """

    __slots__ = ()

    def _append_script(
        self,
        caller_name: str,
        maintscript: Maintscript,
        full_script: str,
        /,
        perform_substitution: bool = True,
    ) -> None:
        """Append `full_script` to `maintscript`; implemented by subclasses."""
        raise NotImplementedError

    @classmethod
    def _apply_condition_to_script(
        cls,
        condition: str,
        run_snippet: str,
        /,
        indent: bool | None = None,
    ) -> str:
        """Wrap `run_snippet` in `if <condition>; then ... fi`.

        :param condition: Shell condition placed after `if`.
        :param run_snippet: The shell snippet to guard. A trailing newline is
          added when missing.
        :param indent: Whether to prefix every snippet line with a space.
          When `None`, lines are indented only if the snippet does not
          contain `<<`.
        :return: The combined script text, ending in `fi` and a newline.
        """
        if indent is None:
            # We auto-determine this based on heredocs currently
            indent = "<<" not in run_snippet

        if indent:
            run_snippet = "".join(" " + x for x in run_snippet.splitlines(True))
        if not run_snippet.endswith("\n"):
            run_snippet += "\n"
        return "".join((f"if {condition}; then\n", run_snippet, "fi\n"))

    def _append_conditional(
        self,
        caller_name: str,
        maintscript: Maintscript,
        condition: str,
        run_snippet: str,
        indent: bool | None,
        perform_substitution: bool,
    ) -> None:
        """Guard `run_snippet` with `condition` and append it to `maintscript`."""
        return self._append_script(
            caller_name,
            maintscript,
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def on_configure(
        self,
        run_snippet: str,
        /,
        indent: bool | None = None,
        perform_substitution: bool = True,
        skip_on_rollback: bool = False,
    ) -> None:
        """Add a postinst snippet guarded by the configure condition.

        With `skip_on_rollback`, the condition is exactly
        `[ "$1" = "configure" ]`; otherwise `POSTINST_DEFAULT_CONDITION` is
        used.
        """
        condition = (
            '[ "$1" = "configure" ]' if skip_on_rollback else POSTINST_DEFAULT_CONDITION
        )
        return self._append_conditional(
            "on_configure",
            "postinst",
            condition,
            run_snippet,
            indent,
            perform_substitution,
        )

    def on_initial_install(
        self,
        run_snippet: str,
        /,
        indent: bool | None = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a postinst snippet run on `configure` when `$2` is empty."""
        return self._append_conditional(
            "on_initial_install",
            "postinst",
            '[ "$1" = "configure" -a -z "$2" ]',
            run_snippet,
            indent,
            perform_substitution,
        )

    def on_upgrade(
        self,
        run_snippet: str,
        /,
        indent: bool | None = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a postinst snippet run on `configure` when `$2` is non-empty."""
        return self._append_conditional(
            "on_upgrade",
            "postinst",
            '[ "$1" = "configure" -a -n "$2" ]',
            run_snippet,
            indent,
            perform_substitution,
        )

    def on_upgrade_from(
        self,
        version: str,
        run_snippet: str,
        /,
        indent: bool | None = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a postinst snippet run on `configure` when `$2` is at most `version`.

        :param version: The version that `$2` is compared against with dpkg's
          `le-nl` operator.
        """
        # `dpkg --compare-versions` takes `<ver1> <op> <ver2>`. The previous
        # condition left out `version` entirely, so the parameter was ignored
        # and dpkg was invoked with too few arguments.
        condition = (
            '[ "$1" = "configure" ]'
            f' && dpkg --compare-versions -- "$2" le-nl "{version}"'
        )
        return self._append_conditional(
            "on_upgrade_from",
            "postinst",
            condition,
            run_snippet,
            indent,
            perform_substitution,
        )

    def on_before_removal(
        self,
        run_snippet: str,
        /,
        indent: bool | None = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a prerm snippet run when `$1` is `remove`."""
        return self._append_conditional(
            "on_before_removal",
            "prerm",
            '[ "$1" = "remove" ]',
            run_snippet,
            indent,
            perform_substitution,
        )

    def on_removed(
        self,
        run_snippet: str,
        /,
        indent: bool | None = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a postrm snippet run when `$1` is `remove`."""
        return self._append_conditional(
            "on_removed",
            "postrm",
            '[ "$1" = "remove" ]',
            run_snippet,
            indent,
            perform_substitution,
        )

    def on_purge(
        self,
        run_snippet: str,
        /,
        indent: bool | None = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a postrm snippet run when `$1` is `purge`."""
        return self._append_conditional(
            "on_purge",
            "postrm",
            '[ "$1" = "purge" ]',
            run_snippet,
            indent,
            perform_substitution,
        )

    def unconditionally_in_script(
        self,
        maintscript: Maintscript,
        run_snippet: str,
        /,
        perform_substitution: bool = True,
    ) -> None:
        """Add `run_snippet` to `maintscript` without any guarding condition.

        :raises ValueError: If `maintscript` is not in `STD_CONTROL_SCRIPTS`.
        """
        if maintscript not in STD_CONTROL_SCRIPTS:
            raise ValueError(
                f'Unknown script "{maintscript}". Should have been one of:'
                f' {", ".join(sorted(STD_CONTROL_SCRIPTS))}'
            )
        return self._append_script(
            "unconditionally_in_script",
            maintscript,
            run_snippet,
            perform_substitution=perform_substitution,
        )

1408 

1409 

class MaintscriptAccessorProvider(MaintscriptAccessorProviderBase):
    """Maintscript accessor that stores snippets in per-script containers."""

    __slots__ = (
        "_plugin_metadata",
        "_maintscript_snippets",
        "_plugin_source_id",
        "_package_substitution",
        "_default_snippet_order",
    )

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        maintscript_snippets: dict[str, MaintscriptSnippetContainer],
        package_substitution: Substitution,
        *,
        default_snippet_order: Literal["service"] | None = None,
    ):
        """Store the plugin identity, snippet containers and substitution.

        :param default_snippet_order: Used as `snippet_order` for every
          snippet created by this accessor.
        """
        self._plugin_metadata = plugin_metadata
        self._plugin_source_id = plugin_source_id
        self._maintscript_snippets = maintscript_snippets
        self._package_substitution = package_substitution
        self._default_snippet_order = default_snippet_order

    def _append_script(
        self,
        caller_name: str,
        maintscript: Maintscript,
        full_script: str,
        /,
        perform_substitution: bool = True,
    ) -> None:
        """Append `full_script` as a snippet to the container for `maintscript`.

        The definition source is "<plugin name> (<plugin source id>)". When
        `perform_substitution` is true, the script is first passed through
        the package substitution.
        """
        origin = f"{self._plugin_metadata.plugin_name} ({self._plugin_source_id})"
        script_text = (
            self._package_substitution.substitute(full_script, origin)
            if perform_substitution
            else full_script
        )
        self._maintscript_snippets[maintscript].append(
            MaintscriptSnippet(
                snippet=script_text,
                definition_source=origin,
                snippet_order=self._default_snippet_order,
            )
        )

1452 

1453 

class BinaryCtrlAccessorProviderBase(BinaryCtrlAccessor):
    """Base implementation of the binary control accessor.

    Subclasses must implement `_create_maintscript_accessor`.
    """

    __slots__ = (
        "_plugin_metadata",
        "_plugin_source_id",
        "_package_metadata_context",
        "_triggers",
        "_substvars",
        "_maintscript",
        "_shlibs_details",
    )

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        package_metadata_context: PackageProcessingContext,
        triggers: dict[tuple[DpkgTriggerType, str], PluginProvidedTrigger],
        substvars: FlushableSubstvars,
        shlibs_details: tuple[str | None, list[str] | None],
    ) -> None:
        """Store the given state; the maintscript accessor starts out unset."""
        self._plugin_metadata = plugin_metadata
        self._plugin_source_id = plugin_source_id
        self._package_metadata_context = package_metadata_context
        self._triggers = triggers
        self._substvars = substvars
        self._maintscript: MaintscriptAccessor | None = None
        self._shlibs_details = shlibs_details

    def _create_maintscript_accessor(self) -> MaintscriptAccessor:
        """Create the accessor returned by `maintscript`; subclass hook."""
        raise NotImplementedError

    def dpkg_trigger(self, trigger_type: DpkgTriggerType, trigger_target: str) -> None:
        """Record a declarative dpkg trigger for the package

        The trigger is stored under the pair (trigger type, trigger target).

        Registering the same pair again is a no-op; the first registration is kept.
        """
        key = (trigger_type, trigger_target)
        if key not in self._triggers:
            self._triggers[key] = PluginProvidedTrigger(
                dpkg_trigger_type=trigger_type,
                dpkg_trigger_target=trigger_target,
                provider=self._plugin_metadata,
                provider_source_id=self._plugin_source_id,
            )

    @property
    def maintscript(self) -> MaintscriptAccessor:
        """The maintscript accessor, created on first access when unset."""
        if self._maintscript is None:
            self._maintscript = self._create_maintscript_accessor()
        return self._maintscript

    @property
    def substvars(self) -> FlushableSubstvars:
        """The substvars object given at construction."""
        return self._substvars

    def dpkg_shlibdeps(self, paths: Sequence[VirtualPath]) -> None:
        """Run `dpkg-shlibdeps` on `paths`, writing to the flushed substvars file.

        :param paths: The paths to analyze; their `fs_path` is passed to the
          command.
        """
        binary_package = self._package_metadata_context.binary_package
        with self.substvars.flush() as substvars_file:
            command = ["dpkg-shlibdeps", f"-T{substvars_file}"]
            if binary_package.is_udeb:
                command.append("-tudeb")
            if binary_package.is_essential:
                command.append("-dPre-Depends")
            local_shlibs, extra_lib_dirs = self._shlibs_details
            if local_shlibs is not None:
                command.append(f"-L{local_shlibs}")
            if extra_lib_dirs:
                command += [f"-l{sd}" for sd in extra_lib_dirs]
            command += [p.fs_path for p in paths]
            print_command(*command)
            try:
                subprocess.check_call(command)
            except subprocess.CalledProcessError:
                _error(
                    f"Attempting to auto-detect dependencies via dpkg-shlibdeps for {binary_package.name} failed. Please"
                    " review the output from dpkg-shlibdeps above to understand what went wrong."
                )

1536 

1537 

class BinaryCtrlAccessorProvider(BinaryCtrlAccessorProviderBase):
    """Binary control accessor backed by `MaintscriptAccessorProvider`."""

    __slots__ = (
        "_maintscript",
        "_maintscript_snippets",
        "_package_substitution",
    )

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        package_metadata_context: PackageProcessingContext,
        triggers: dict[tuple[DpkgTriggerType, str], PluginProvidedTrigger],
        substvars: FlushableSubstvars,
        maintscript_snippets: dict[str, MaintscriptSnippetContainer],
        package_substitution: Substitution,
        shlibs_details: tuple[str | None, list[str] | None],
        *,
        default_snippet_order: Literal["service"] | None = None,
    ) -> None:
        """Initialize the base state and create the maintscript accessor up front.

        :param default_snippet_order: Passed to the `MaintscriptAccessorProvider`
          created here.
        """
        super().__init__(
            plugin_metadata,
            plugin_source_id,
            package_metadata_context,
            triggers,
            substvars,
            shlibs_details,
        )
        self._maintscript_snippets = maintscript_snippets
        self._package_substitution = package_substitution
        accessor = MaintscriptAccessorProvider(
            plugin_metadata,
            plugin_source_id,
            maintscript_snippets,
            package_substitution,
            default_snippet_order=default_snippet_order,
        )
        self._maintscript = accessor

    def _create_maintscript_accessor(self) -> MaintscriptAccessor:
        """Create a `MaintscriptAccessorProvider` from the stored state.

        No `default_snippet_order` is passed here, so the new accessor uses
        that parameter's default.
        """
        accessor = MaintscriptAccessorProvider(
            self._plugin_metadata,
            self._plugin_source_id,
            self._maintscript_snippets,
            self._package_substitution,
        )
        return accessor

1583 

1584 

class BinaryCtrlAccessorProviderCreator:
    """Factory for per-plugin `BinaryCtrlAccessorProvider` instances.

    All accessors created by one instance share its trigger table, substvars,
    maintscript snippet containers and substitution.
    """

    def __init__(
        self,
        package_metadata_context: PackageProcessingContext,
        substvars: FlushableSubstvars,
        maintscript_snippets: dict[str, MaintscriptSnippetContainer],
        substitution: Substitution,
    ) -> None:
        """Store the shared state and start with an empty trigger table."""
        self._package_metadata_context = package_metadata_context
        self._substvars = substvars
        self._maintscript_snippets = maintscript_snippets
        self._substitution = substitution
        self._triggers: dict[tuple[DpkgTriggerType, str], PluginProvidedTrigger] = {}
        # Public attribute; its value at the time `for_plugin` is called is
        # what the created accessor receives.
        self.shlibs_details: tuple[str | None, list[str] | None] = (None, None)

    def for_plugin(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        *,
        default_snippet_order: Literal["service"] | None = None,
    ) -> BinaryCtrlAccessor:
        """Create an accessor acting on behalf of the given plugin.

        :param plugin_metadata: Metadata of the plugin the accessor is for.
        :param plugin_source_id: Source id passed on to the accessor.
        :param default_snippet_order: Passed on to the accessor.
        """
        accessor = BinaryCtrlAccessorProvider(
            plugin_metadata,
            plugin_source_id,
            self._package_metadata_context,
            self._triggers,
            self._substvars,
            self._maintscript_snippets,
            self._substitution,
            self.shlibs_details,
            default_snippet_order=default_snippet_order,
        )
        return accessor

    def generated_triggers(self) -> Iterable[PluginProvidedTrigger]:
        """Return the triggers recorded in the shared trigger table."""
        return self._triggers.values()

1621 

1622 

def _resolve_bundled_plugin_docs_path(
    plugin_name: str,
    loader: PluginInitializationEntryPoint | None,
) -> Traversable | Path | None:
    """Locate `<plugin_name>_docs.yaml` in the package of the loader's module.

    :param plugin_name: Used to form the file name of the docs file.
    :param loader: The plugin entry point; its `__module__` identifies the
      module whose package is searched.
    :return: The docs file location inside that package's resources.
    """
    module_name = getattr(loader, "__module__")
    assert module_name is not None
    package_name = sys.modules[module_name].__package__
    package_root = importlib.resources.files(package_name)
    return package_root.joinpath(f"{plugin_name}_docs.yaml")

1633 

1634 

def plugin_metadata_for_debputys_own_plugin(
    loader: PluginInitializationEntryPoint | None = None,
) -> DebputyPluginMetadata:
    """Build the plugin metadata for the bundled "debputy" plugin.

    :param loader: The plugin initializer to use. When omitted,
      `initialize_debputy_features` is imported and used.
    :return: Metadata with plugin name "debputy", API compat version 1 and
      plugin path "<bundled>".
    """
    if loader is None:
        # Imported here rather than at module level, as in the original code.
        from debputy.plugins.debputy.debputy_plugin import (
            initialize_debputy_features as loader,
        )

    plugin_name = "debputy"

    def _doc_path() -> Traversable | Path | None:
        return _resolve_bundled_plugin_docs_path(plugin_name, loader)

    return DebputyPluginMetadata(
        plugin_name=plugin_name,
        api_compat_version=1,
        plugin_initializer=loader,
        plugin_loader=None,
        plugin_doc_path_resolver=_doc_path,
        plugin_path="<bundled>",
    )

1656 

1657 

def load_plugin_features(
    plugin_search_dirs: Sequence[str],
    substitution: Substitution,
    requested_plugins_only: Sequence[str] | None = None,
    required_plugins: set[str] | None = None,
    plugin_feature_set: PluginProvidedFeatureSet | None = None,
    debug_mode: bool = False,
) -> PluginProvidedFeatureSet:
    """Load debputy's own plugin plus the selected plugins into a feature set.

    :param plugin_search_dirs: Directories searched for plugins.
    :param substitution: Passed to each plugin initializer.
    :param requested_plugins_only: When given, exactly these plugins are
      looked up instead of auto-discovering all plugins.
    :param required_plugins: Plugins that are always looked up when non-empty.
    :param plugin_feature_set: Feature set to load into; a new one is created
      when omitted.
    :param debug_mode: When true, a load failure of an auto-discovered plugin
      is re-raised instead of unloading the plugin.
    :return: The feature set the plugins were loaded into.
    """
    if plugin_feature_set is None:
        plugin_feature_set = PluginProvidedFeatureSet()
    plugins = [plugin_metadata_for_debputys_own_plugin()]
    # Names of auto-discovered plugins; only these may fail without aborting.
    optional_names = set()
    if required_plugins:
        plugins += find_json_plugins(plugin_search_dirs, required_plugins)
    if requested_plugins_only is None:
        discovered = list(
            _find_all_json_plugins(
                plugin_search_dirs,
                required_plugins if required_plugins is not None else frozenset(),
                debug_mode=debug_mode,
            )
        )
        plugins += discovered
        optional_names.update(p.plugin_name for p in discovered)
    else:
        plugins += find_json_plugins(plugin_search_dirs, requested_plugins_only)

    for plugin_metadata in plugins:
        name = plugin_metadata.plugin_name
        api = DebputyPluginInitializerProvider(
            plugin_metadata, plugin_feature_set, substitution
        )
        try:
            api.load_plugin()
        except PluginBaseError as load_error:
            is_optional = name in optional_names
            if is_optional and debug_mode:
                _warn(
                    f"The optional plugin {name} failed during load. Re-raising due"
                    f" to --debug/-d or DEBPUTY_DEBUG=1"
                )
            if debug_mode or not is_optional:
                raise
            try:
                api.unload_plugin()
            except Exception:
                _warn(
                    f"Failed to load optional {name} and an error was raised when trying to"
                    " clean up after the half-initialized plugin. Re-raising load error as the partially loaded"
                    " module might have tainted the feature set."
                )
                raise load_error from None
            _warn(
                f"The optional plugin {name} failed during load. The plugin was"
                f" deactivated. Use debug mode (--debug/DEBPUTY_DEBUG=1) to show the stacktrace"
                f" (the warning will become an error)"
            )

    return plugin_feature_set

1726 

1727 

def find_json_plugin(
    search_dirs: Sequence[str],
    requested_plugin: str,
) -> DebputyPluginMetadata:
    """Look up a single JSON plugin by name or path.

    Thin wrapper around ``find_json_plugins`` for the one-plugin case.

    :param search_dirs: Directories to search for the plugin.
    :param requested_plugin: Plugin name, or a path when it contains ``/``.
    :return: The metadata of the plugin that was found.
    """
    matches = [*find_json_plugins(search_dirs, [requested_plugin])]
    assert len(matches) == 1
    return matches[0]

1735 

1736 

def find_related_implementation_files_for_plugin(
    plugin_metadata: DebputyPluginMetadata,
) -> list[str]:
    """List the Python implementation files that belong to a JSON plugin.

    The implementation file is guessed from the plugin name and the location
    of its JSON file (see ``_find_plugin_implementation_file``).

    :param plugin_metadata: The plugin to inspect. Bundled plugins and plugins
      found via the Python path are rejected via ``_error``.
    :return: A list with the guessed module file when it exists on disk,
      otherwise an empty list.
    """
    plugin_name = plugin_metadata.plugin_name

    # NOTE(review): ``_error`` presumably aborts; the flow below does not rely
    # on that and matches the original statement order.
    if plugin_metadata.is_bundled:
        _error(
            f"Cannot run find related files for {plugin_name}: The plugin seems to be bundled"
            " or loaded via a mechanism that does not support detecting its tests."
        )

    if plugin_metadata.is_from_python_path:
        # Maybe they could be, but that is for another day.
        _error(
            f"Cannot run find related files for {plugin_name}: The plugin is installed into python path"
            " and these are not supported."
        )

    files = []
    module_name, module_file = _find_plugin_implementation_file(
        plugin_name,
        plugin_metadata.plugin_path,
    )
    if os.path.isfile(module_file):
        files.append(module_file)
    else:
        # No module file next to the JSON file: load the plugin and check
        # whether the guessed module name got imported anyway.
        if not plugin_metadata.is_loaded:
            plugin_metadata.load_plugin()
        if module_name in sys.modules:
            _error(
                f'The plugin {plugin_metadata.plugin_name} uses the "module" key in its'
                f" JSON metadata file ({plugin_metadata.plugin_path}) and cannot be"
                f" installed via this method. The related Python would not be installed"
                f" (which would result in a plugin that would fail to load)"
            )

    return files

1773 

1774 

def find_tests_for_plugin(
    plugin_metadata: DebputyPluginMetadata,
) -> list[str]:
    """Find the test files that sit next to a JSON plugin.

    A file counts as a test when it is a regular file in the directory of the
    plugin's JSON file, its name starts with the plugin name (with ``-``
    replaced by ``_``) and it matches ``PLUGIN_TEST_SUFFIX``.

    :param plugin_metadata: The plugin to find tests for. Bundled plugins and
      plugins found via the Python path are rejected via ``_error``.
    :return: Paths of the matching test files, in directory scan order.
    """
    plugin_name = plugin_metadata.plugin_name
    plugin_path = plugin_metadata.plugin_path

    if plugin_metadata.is_bundled:
        _error(
            f"Cannot run tests for {plugin_name}: The plugin seems to be bundled or loaded via a"
            " mechanism that does not support detecting its tests."
        )

    if plugin_metadata.is_from_python_path:
        # Maybe they could be, but that is for another day.
        _error(
            f"Cannot run tests for {plugin_name}: The plugin is installed into python path"
            " and these are not supported."
        )

    plugin_dir = os.path.dirname(plugin_path)
    test_basename_prefix = plugin_name.replace("-", "_")
    with os.scandir(plugin_dir) as dir_iter:
        return [
            p.path
            for p in dir_iter
            if p.is_file()
            and p.name.startswith(test_basename_prefix)
            and PLUGIN_TEST_SUFFIX.search(p.name)
        ]

1807 

1808 

def find_json_plugins(
    search_dirs: Sequence[str],
    requested_plugins: Iterable[str],
) -> Iterable[DebputyPluginMetadata]:
    """Lazily resolve every requested plugin to its parsed metadata.

    :param search_dirs: Directories whose ``debputy/plugins`` subdirectory is
      searched for ``<name>.json``.
    :param requested_plugins: Plugin names, or paths (anything containing
      ``/``) to a plugin JSON file.
    :return: One metadata object per requested plugin, in request order.
    :raises PluginNotFoundError: When a requested plugin cannot be found.
    """
    # Every requested plugin must be resolved; stopping after the first match
    # would silently drop the remaining requests.
    for plugin_name_or_path in requested_plugins:
        yield _find_one_json_plugin(search_dirs, plugin_name_or_path)


def _find_one_json_plugin(
    search_dirs: Sequence[str],
    plugin_name_or_path: str,
) -> DebputyPluginMetadata:
    """Resolve one plugin name or path to its metadata or raise PluginNotFoundError."""
    if "/" in plugin_name_or_path:
        if not os.path.isfile(plugin_name_or_path):
            raise PluginNotFoundError(
                f"Unable to load the plugin {plugin_name_or_path}: The path is not a file."
                ' (Because the plugin name contains "/", it is assumed to be a path and search path'
                " is not used.)"
            )
        return parse_json_plugin_desc(plugin_name_or_path)

    # The first search dir that provides the plugin wins.
    for search_dir in search_dirs:
        path = os.path.join(
            search_dir, "debputy", "plugins", f"{plugin_name_or_path}.json"
        )
        if os.path.isfile(path):
            return parse_json_plugin_desc(path)

    # Fall back to plugins shipped as resources on the Python path. Only open
    # the resource when it really is a file, so that a missing plugin ends up
    # in the PluginNotFoundError below.
    pp_path = PLUGIN_PYTHON_RES_PATH.joinpath(f"{plugin_name_or_path}.json")
    if pp_path.is_file():
        with pp_path.open() as fd:
            return parse_json_plugin_desc(
                f"PYTHONPATH:debputy/plugins/{pp_path.name}",
                fd=fd,
                is_from_python_path=True,
            )

    search_dir_str = ":".join(search_dirs)
    raise PluginNotFoundError(
        f"Unable to load the plugin {plugin_name_or_path}: Could not find {plugin_name_or_path}.json in the"
        f" debputy/plugins subdir of any of the search dirs ({search_dir_str})"
    )

1848 

1849 

def _find_all_json_plugins(
    search_dirs: Sequence[str],
    required_plugins: AbstractSet[str],
    debug_mode: bool = False,
) -> Iterable[DebputyPluginMetadata]:
    """Discover all JSON plugins that were not explicitly required.

    The ``debputy/plugins`` subdirectory of each search dir is scanned first,
    followed by the plugins available under ``PLUGIN_PYTHON_RES_PATH``. A
    plugin name is only yielded once; the first location wins.

    :param search_dirs: Directories to scan. Missing directories are skipped.
    :param required_plugins: Plugins (names or paths) loaded elsewhere; these
      are skipped during discovery.
    :param debug_mode: When true, parse errors of plugins in the search dirs
      are re-raised instead of being reported as warnings.
    :return: Metadata of every discovered plugin.
    """

    def _plugin_name(name_or_path: str) -> str:
        # Same name derivation as parse_json_plugin_desc: basename without
        # the ".json" / ".json.in" suffix.
        base = os.path.basename(name_or_path)
        for suffix in (".json", ".json.in"):
            if base.endswith(suffix):
                return base[: -len(suffix)]
        return base

    # Track plugin *names*. The required plugins are given by name (or path),
    # so comparing them against file names such as "foo.json" never matches.
    seen = {_plugin_name(p) for p in required_plugins}
    error_seen = False
    for search_dir in search_dirs:
        try:
            dir_fd = os.scandir(os.path.join(search_dir, "debputy", "plugins"))
        except FileNotFoundError:
            continue
        with dir_fd:
            for entry in dir_fd:
                if not entry.is_file(follow_symlinks=True) or not entry.name.endswith(
                    ".json"
                ):
                    continue
                plugin_name = _plugin_name(entry.name)
                if plugin_name in seen:
                    continue
                seen.add(plugin_name)
                try:
                    plugin_metadata = parse_json_plugin_desc(entry.path)
                except PluginBaseError as e:
                    if debug_mode:
                        raise
                    # Only the first failure is reported in detail.
                    if not error_seen:
                        error_seen = True
                        _warn(
                            f"Failed to load the plugin in {entry.path} due to the following error: {e.message}"
                        )
                    else:
                        _warn(
                            f"Failed to load plugin in {entry.path} due to errors (not shown)."
                        )
                else:
                    yield plugin_metadata

    for pp_entry in PLUGIN_PYTHON_RES_PATH.iterdir():
        if not pp_entry.name.endswith(".json") or not pp_entry.is_file():
            continue
        plugin_name = _plugin_name(pp_entry.name)
        if plugin_name in seen:
            continue
        seen.add(plugin_name)
        with pp_entry.open() as fd:
            yield parse_json_plugin_desc(
                f"PYTHONPATH:debputy/plugins/{pp_entry.name}",
                fd=fd,
                is_from_python_path=True,
            )

1902 

1903 

def _find_plugin_implementation_file(
    plugin_name: str,
    json_file_path: str,
) -> tuple[str, str]:
    """Guess the module name and file path of a plugin's implementation.

    :param plugin_name: The plugin name; ``-`` is mapped to ``_`` to form a
      valid module basename.
    :param json_file_path: Path of the plugin's JSON file; the module file is
      assumed to live in the same directory.
    :return: ``(module_name, module_fs_path)`` where the module name lives in
      the ``debputy.plugins`` namespace. Neither is checked for existence.
    """
    basename = "_".join(plugin_name.split("-"))
    plugin_dir = os.path.dirname(json_file_path)
    return (
        "debputy.plugins." + basename,
        os.path.join(plugin_dir, basename + ".py"),
    )

1914 

1915 

def _resolve_module_initializer(
    plugin_name: str,
    plugin_initializer_name: str,
    module_name: str | None,
    json_file_path: str,
) -> PluginInitializationEntryPoint:
    """Import a plugin's module and return its initializer entry point.

    When *module_name* is ``None``, the module is guessed from the plugin name
    and loaded from the file next to the JSON file if that file exists.
    Otherwise (or when that file is absent) the module is imported by name
    from the Python search path.

    :param plugin_name: Name of the plugin (used for context and messages).
    :param plugin_initializer_name: Attribute of the module that initializes
      the plugin; either a ``DebputyPluginDefinition`` or a callable.
    :param module_name: Explicit module name from the JSON metadata, if any.
    :param json_file_path: Path of the JSON metadata file.
    :return: The callable that initializes the plugin.
    :raises PluginInitializationError: When the module cannot be loaded.
    :raises PluginMetadataError: When the module or the initializer attribute
      named by the metadata cannot be resolved or has the wrong type.
    """
    module = None
    module_fs_path = None
    if module_name is None:
        module_name, module_fs_path = _find_plugin_implementation_file(
            plugin_name, json_file_path
        )
        if os.path.isfile(module_fs_path):
            spec = importlib.util.spec_from_file_location(module_name, module_fs_path)
            if spec is None:
                raise PluginInitializationError(
                    f"Failed to load {plugin_name} (path: {module_fs_path})."
                    " The spec_from_file_location function returned None."
                )
            mod = importlib.util.module_from_spec(spec)
            loader = spec.loader
            if loader is None:
                raise PluginInitializationError(
                    f"Failed to load {plugin_name} (path: {module_fs_path})."
                    " Python could not find a suitable loader (spec.loader was None)"
                )
            # Register before executing so the module can be found by name
            # while its own code runs.
            sys.modules[module_name] = mod
            try:
                run_in_context_of_plugin(plugin_name, loader.exec_module, mod)
            except (Exception, GeneratorExit) as e:
                # Mirror the import system: a module that failed to execute
                # must not stay behind half-initialized in sys.modules.
                if sys.modules.get(module_name) is mod:
                    del sys.modules[module_name]
                raise PluginInitializationError(
                    f"Failed to load {plugin_name} (path: {module_fs_path})."
                    " The module threw an exception while being loaded."
                ) from e
            module = mod

    if module is None:
        try:
            module = run_in_context_of_plugin(
                plugin_name, importlib.import_module, module_name
            )
        except ModuleNotFoundError as e:
            if module_fs_path is None:
                # The module name came from the JSON metadata.
                raise PluginMetadataError(
                    f'The plugin defined in "{json_file_path}" wanted to load the module "{module_name}", but'
                    " this module is not available in the python search path"
                ) from e
            raise PluginInitializationError(
                f"Failed to load {plugin_name}. Tried loading it from"
                f' "{module_fs_path}" (which did not exist) and PYTHONPATH as'
                f" {module_name} (where it was not found either). Please ensure"
                " the module code is installed in the correct spot or provide an"
                f' explicit "module" definition in {json_file_path}.'
            ) from e

    plugin_initializer = run_in_context_of_plugin_wrap_errors(
        plugin_name,
        getattr,
        module,
        plugin_initializer_name,
        None,
    )

    if plugin_initializer is None:
        raise PluginMetadataError(
            f'The plugin defined in {json_file_path} claimed that module "{module_name}" would have an'
            f' attribute called "{plugin_initializer_name}" to initialize the plugin. However, that attribute'
            " does not exist or cannot be resolved. Please correct the plugin metadata or initializer name"
            " in the Python module."
        )
    if isinstance(plugin_initializer, DebputyPluginDefinition):
        return plugin_initializer.initialize
    if not callable(plugin_initializer):
        raise PluginMetadataError(
            f'The plugin defined in {json_file_path} claimed that module "{module_name}" would have an'
            f' attribute called "{plugin_initializer_name}" for initializing the plugin. While that'
            " attribute exists, it is neither a `DebputyPluginDefinition`"
            " (`plugin_definition = define_debputy_plugin()`) nor is it `callable`"
            " (`def initialize(api: DebputyPluginInitializer) -> None:`)."
        )
    return cast("PluginInitializationEntryPoint", plugin_initializer)

1997 

1998 

def _json_plugin_loader(
    plugin_name: str,
    plugin_json_metadata: PluginJsonMetadata,
    json_file_path: str,
    attribute_path: AttributePath,
) -> Callable[["DebputyPluginInitializer"], None]:
    """Turn parsed JSON plugin metadata into a plugin initializer callable.

    Up to four initializers are collected, in this order: the Python module
    initializer, the known packaging files, the manifest variables and the
    packager provided files. They are chained into one callable.

    :param plugin_name: Name of the plugin.
    :param plugin_json_metadata: The parsed content of the JSON file.
    :param json_file_path: Path of the JSON file (used in error messages).
    :param attribute_path: Attribute path of the JSON root, used to point at
      the offending entry in error messages.
    :return: A callable that registers all features of the plugin.
    :raises PluginMetadataError: When the metadata is unsupported, invalid or
      does not provide any feature.
    """
    api_compat = plugin_json_metadata["api_compat_version"]
    module_name = plugin_json_metadata.get("module")
    plugin_initializer_name = plugin_json_metadata.get("plugin_initializer")
    packager_provided_files_raw = plugin_json_metadata.get(
        "packager_provided_files", []
    )
    manifest_variables_raw = plugin_json_metadata.get("manifest_variables")
    known_packaging_files_raw = plugin_json_metadata.get("known_packaging_files")
    if api_compat != 1:
        raise PluginMetadataError(
            f'The plugin defined in "{json_file_path}" requires API compat level {api_compat}, but this'
            f" version of debputy only supports API compat version of 1"
        )
    if plugin_initializer_name is not None and "." in plugin_initializer_name:
        p = attribute_path["plugin_initializer"]
        raise PluginMetadataError(
            f'The "{p}" must not contain ".". Problematic file is "{json_file_path}".'
        )

    plugin_initializers = []

    if plugin_initializer_name is not None:
        plugin_initializer = _resolve_module_initializer(
            plugin_name,
            plugin_initializer_name,
            module_name,
            json_file_path,
        )
        plugin_initializers.append(plugin_initializer)

    if known_packaging_files_raw:
        kpf_root_path = attribute_path["known_packaging_files"]
        known_packaging_files = []
        for k, v in enumerate(known_packaging_files_raw):
            kpf_path = kpf_root_path[k]
            # Only peek into the raw entry when it is a mapping; anything else
            # is left for the parser below to reject with a proper message.
            if isinstance(v, dict):
                p = v.get("path")
                if isinstance(p, str):
                    kpf_path.path_hint = p
                if plugin_name.startswith("debputy-"):
                    docs = v.get("documentation-uris")
                    if isinstance(docs, list):
                        # Expand the doc-root placeholder in string entries.
                        v["documentation-uris"] = [
                            (
                                d.replace("@DEBPUTY_DOC_ROOT_DIR@", DEBPUTY_DOC_ROOT_DIR)
                                if isinstance(d, str)
                                else d
                            )
                            for d in docs
                        ]
            known_packaging_file: KnownPackagingFileInfo = (
                PLUGIN_KNOWN_PACKAGING_FILES_PARSER.parse_input(
                    v,
                    kpf_path,
                )
            )
            known_packaging_files.append((kpf_path, known_packaging_file))

        def _initialize_json_provided_known_packaging_files(
            api: DebputyPluginInitializerProvider,
        ) -> None:
            for p, details in known_packaging_files:
                try:
                    api.known_packaging_files(details)
                except ValueError as ex:
                    raise PluginMetadataError(
                        f"Error while processing {p.path} defined in {json_file_path}: {ex.args[0]}"
                    ) from ex

        plugin_initializers.append(_initialize_json_provided_known_packaging_files)

    if manifest_variables_raw:
        manifest_var_path = attribute_path["manifest_variables"]
        manifest_variables = [
            PLUGIN_MANIFEST_VARS_PARSER.parse_input(p, manifest_var_path[i])
            for i, p in enumerate(manifest_variables_raw)
        ]

        def _initialize_json_provided_manifest_vars(
            api: DebputyPluginInitializer,
        ) -> None:
            for idx, manifest_variable in enumerate(manifest_variables):
                name = manifest_variable["name"]
                value = manifest_variable["value"]
                doc = manifest_variable.get("reference_documentation")
                try:
                    api.manifest_variable(
                        name, value, variable_reference_documentation=doc
                    )
                except ValueError as ex:
                    var_path = manifest_var_path[idx]
                    raise PluginMetadataError(
                        f"Error while processing {var_path.path} defined in {json_file_path}: {ex.args[0]}"
                    ) from ex

        plugin_initializers.append(_initialize_json_provided_manifest_vars)

    if packager_provided_files_raw:
        ppf_path = attribute_path["packager_provided_files"]
        ppfs = [
            PLUGIN_PPF_PARSER.parse_input(p, ppf_path[i])
            for i, p in enumerate(packager_provided_files_raw)
        ]

        def _initialize_json_provided_ppfs(api: DebputyPluginInitializer) -> None:
            ppf: PackagerProvidedFileJsonDescription
            for idx, ppf in enumerate(ppfs):
                # ``c`` ends up holding the remaining keyword arguments.
                c = dict(ppf)
                stem = ppf["stem"]
                installed_path = ppf["installed_path"]
                default_mode = ppf.get("default_mode")
                ref_doc_dict = ppf.get("reference_documentation")
                if default_mode is not None:
                    c["default_mode"] = default_mode.octal_mode

                if ref_doc_dict is not None:
                    ref_doc = packager_provided_file_reference_documentation(
                        **ref_doc_dict
                    )
                else:
                    ref_doc = None

                # These are passed explicitly below.
                for k in (
                    "stem",
                    "installed_path",
                    "reference_documentation",
                ):
                    c.pop(k, None)

                try:
                    api.packager_provided_file(stem, installed_path, reference_documentation=ref_doc, **c)  # type: ignore
                except ValueError as ex:
                    p_path = ppf_path[idx]
                    raise PluginMetadataError(
                        f"Error while processing {p_path.path} defined in {json_file_path}: {ex.args[0]}"
                    ) from ex

        plugin_initializers.append(_initialize_json_provided_ppfs)

    if not plugin_initializers:
        raise PluginMetadataError(
            f"The plugin defined in {json_file_path} does not seem to provide features,"
            f" such as module + plugin-initializer or packager-provided-files."
        )

    if len(plugin_initializers) == 1:
        return plugin_initializers[0]

    def _chain_loader(api: DebputyPluginInitializer) -> None:
        for initializer in plugin_initializers:
            initializer(api)

    return _chain_loader

2161 

2162 

2163@overload 

2164@contextlib.contextmanager 

2165def _open( 2165 ↛ exitline 2165 didn't return from function '_open' because

2166 path: str, 

2167 fd: IO[AnyStr] | IOBase = ..., 

2168) -> Iterator[IO[AnyStr] | IOBase]: ... 

2169 

2170 

2171@overload 

2172@contextlib.contextmanager 

2173def _open(path: str, fd: None = None) -> Iterator[IO[bytes]]: ... 2173 ↛ exitline 2173 didn't return from function '_open' because

2174 

2175 

2176@contextlib.contextmanager 

2177def _open( 

2178 path: str, fd: IO[AnyStr] | IOBase | None = None 

2179) -> Iterator[IO[AnyStr] | IOBase]: 

2180 if fd is not None: 

2181 yield fd 

2182 else: 

2183 with open(path, "rb") as fd: 

2184 yield fd 

2185 

2186 

2187def _resolve_json_plugin_docs_path( 

2188 plugin_name: str, 

2189 plugin_path: str, 

2190) -> Traversable | Path | None: 

2191 plugin_dir = os.path.dirname(plugin_path) 

2192 return Path(os.path.join(plugin_dir, plugin_name + "_docs.yaml")) 

2193 

2194 

def parse_json_plugin_desc(
    path: str,
    *,
    fd: IO[AnyStr] | IOBase | None = None,
    is_from_python_path: bool = False,
) -> DebputyPluginMetadata:
    """Parse a plugin JSON file into plugin metadata.

    The plugin name is the basename of *path* without its ``.json`` (or
    ``.json.in``) suffix. The plugin's code is not loaded here; the returned
    metadata carries a loader that does so on demand.

    :param path: Path of the JSON file (also used to derive the plugin name).
    :param fd: Optional open file object to read the JSON from instead of
      opening *path*.
    :param is_from_python_path: Recorded in the returned metadata.
    :return: The metadata for the plugin.
    :raises PluginMetadataError: When the file is not valid JSON, does not
      match the metadata format, or is named ``debputy``.
    """
    with _open(path, fd=fd) as rfd:
        try:
            raw = json.load(rfd)
        except JSONDecodeError as e:
            raise PluginMetadataError(
                f'The plugin defined in "{path}" could not be parsed as valid JSON: {e.args[0]}'
            ) from e

    plugin_name = os.path.basename(path)
    # Strip the first matching suffix only.
    for known_suffix in (".json", ".json.in"):
        if plugin_name.endswith(known_suffix):
            plugin_name = plugin_name[: -len(known_suffix)]
            break

    if plugin_name == "debputy":
        # Provide a better error message than "The plugin has already loaded!?"
        raise PluginMetadataError(
            f'The plugin named {plugin_name} must be bundled with `debputy`. Please rename "{path}" so it does not'
            f" clash with the bundled plugin of same name."
        )

    attribute_path = AttributePath.root_path(raw)

    try:
        plugin_json_metadata = PLUGIN_METADATA_PARSER.parse_input(
            raw,
            attribute_path,
        )
    except ManifestParseException as e:
        raise PluginMetadataError(
            f'The plugin defined in "{path}" was valid JSON but could not be parsed: {e.message}'
        ) from e

    def _load():
        return _json_plugin_loader(
            plugin_name,
            plugin_json_metadata,
            path,
            attribute_path,
        )

    def _docs_path():
        return _resolve_json_plugin_docs_path(plugin_name, path)

    return DebputyPluginMetadata(
        plugin_name=plugin_name,
        plugin_loader=_load,
        api_compat_version=plugin_json_metadata["api_compat_version"],
        plugin_doc_path_resolver=_docs_path,
        plugin_initializer=None,
        plugin_path=path,
        is_from_python_path=is_from_python_path,
    )

2250 

2251 

@dataclasses.dataclass(slots=True, frozen=True)
class ServiceDefinitionImpl(ServiceDefinition[DSD]):
    """Immutable record describing one service.

    Instances are frozen; use ``replace`` to derive a modified copy.
    """

    # Primary name of the service.
    name: str
    # All names of the service.
    names: Sequence[str]
    # Path the service definition belongs to.
    path: VirtualPath
    # Kind of service.
    type_of_service: str
    service_scope: str
    auto_enable_on_install: bool
    auto_start_on_install: bool
    on_upgrade: ServiceUpgradeRule
    # Human readable description of where the definition came from.
    definition_source: str
    is_plugin_provided_definition: bool
    # Optional context data attached to the definition.
    service_context: DSD | None

    def replace(self, **changes: Any) -> "ServiceDefinitionImpl[DSD]":
        """Return a copy of this definition with the given fields replaced."""
        updated = dataclasses.replace(self, **changes)
        return updated

2268 

2269 

class ServiceRegistryImpl(ServiceRegistry[DSD]):
    """Collects the service definitions detected by one service manager."""

    __slots__ = ("_service_manager_details", "_service_definitions", "_seen_services")

    def __init__(self, service_manager_details: ServiceManagerDetails) -> None:
        """Create an empty registry for the given service manager."""
        self._service_manager_details = service_manager_details
        self._service_definitions: list[ServiceDefinition[DSD]] = []
        # (name, type_of_service, service_scope) of every registered name.
        self._seen_services: set[tuple[str, str, str]] = set()

    @property
    def detected_services(self) -> Sequence[ServiceDefinition[DSD]]:
        """The service definitions registered so far (the internal list)."""
        return self._service_definitions

    def register_service(
        self,
        path: VirtualPath,
        name: str | list[str],
        *,
        type_of_service: str = "service",  # "timer", etc.
        service_scope: str = "system",
        enable_by_default: bool = True,
        start_by_default: bool = True,
        default_upgrade_rule: ServiceUpgradeRule = "restart",
        service_context: DSD | None = None,
    ) -> None:
        """Register a detected service.

        :param path: The path the service was detected from.
        :param name: The name of the service, or a list of names where the
          first one is the primary name.
        :param type_of_service: The kind of service.
        :param service_scope: The scope of the service.
        :param enable_by_default: Whether to enable the service on install.
        :param start_by_default: Whether to start the service on install.
        :param default_upgrade_rule: What to do with the service on upgrade.
        :param service_context: Optional context stored with the definition.
        :raises ValueError: When *name* is an empty list.
        :raises PluginAPIViolationError: When one of the names was already
          registered with the same type and scope.
        """
        names = name if isinstance(name, list) else [name]
        if len(names) < 1:
            raise ValueError(
                f"The service must have at least one name - {path.absolute} did not have any"
            )
        keys = [(n, type_of_service, service_scope) for n in names]
        for n, key in zip(names, keys):
            if key in self._seen_services:
                raise PluginAPIViolationError(
                    f"The service manager (from {self._service_manager_details.plugin_metadata.plugin_name}) used"
                    f" the service name {n} (type: {type_of_service}, scope: {service_scope}) twice. This is not"
                    " allowed by the debputy plugin API."
                )
        # Remember the names only after all of them passed the check, so the
        # duplicate detection above can fire on a later registration and a
        # rejected call leaves no trace.
        self._seen_services.update(keys)
        # TODO: We cannot create a service definition immediate once the manifest is involved
        self._service_definitions.append(
            ServiceDefinitionImpl(
                names[0],
                names,
                path,
                type_of_service,
                service_scope,
                enable_by_default,
                start_by_default,
                default_upgrade_rule,
                f"Auto-detected by plugin {self._service_manager_details.plugin_metadata.plugin_name}",
                True,
                service_context,
            )
        )