Coverage for src/debputy/plugin/api/impl.py: 60%

878 statements  

coverage.py v7.8.2, created at 2025-09-07 09:27 +0000

1import contextlib 

2import dataclasses 

3import functools 

4import importlib 

5import importlib.resources 

6import importlib.util 

7import itertools 

8import json 

9import os 

10import re 

11import subprocess 

12import sys 

13from abc import ABC 

14from importlib.resources.abc import Traversable 

15from json import JSONDecodeError 

16from pathlib import Path 

17from typing import ( 

18 Optional, 

19 Callable, 

20 Dict, 

21 Tuple, 

22 Iterable, 

23 Sequence, 

24 Type, 

25 List, 

26 Union, 

27 Set, 

28 Iterator, 

29 IO, 

30 Mapping, 

31 AbstractSet, 

32 cast, 

33 FrozenSet, 

34 Any, 

35 Literal, 

36 Container, 

37 TYPE_CHECKING, 

38 is_typeddict, 

39) 

40 

41import debputy 

42from debputy import DEBPUTY_DOC_ROOT_DIR 

43from debputy.exceptions import ( 

44 DebputySubstitutionError, 

45 PluginConflictError, 

46 PluginMetadataError, 

47 PluginBaseError, 

48 PluginInitializationError, 

49 PluginAPIViolationError, 

50 PluginNotFoundError, 

51 PluginIncorrectRegistrationError, 

52) 

53from debputy.maintscript_snippet import ( 

54 STD_CONTROL_SCRIPTS, 

55 MaintscriptSnippetContainer, 

56 MaintscriptSnippet, 

57) 

58from debputy.manifest_parser.exceptions import ManifestParseException 

59from debputy.manifest_parser.parser_data import ParserContextData 

60from debputy.manifest_parser.tagging_types import TypeMapping 

61from debputy.manifest_parser.util import AttributePath 

62from debputy.manifest_parser.util import resolve_package_type_selectors 

63from debputy.plugin.api.doc_parsing import ( 

64 DEBPUTY_DOC_REFERENCE_DATA_PARSER, 

65 parser_type_name, 

66 DebputyParsedDoc, 

67) 

68from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

69from debputy.plugin.api.impl_types import ( 

70 DebputyPluginMetadata, 

71 PackagerProvidedFileClassSpec, 

72 MetadataOrMaintscriptDetector, 

73 PluginProvidedTrigger, 

74 TTP, 

75 DIPHandler, 

76 PF, 

77 SF, 

78 DIPKWHandler, 

79 PluginProvidedManifestVariable, 

80 PluginProvidedPackageProcessor, 

81 PluginProvidedDiscardRule, 

82 AutomaticDiscardRuleExample, 

83 PPFFormatParam, 

84 ServiceManagerDetails, 

85 KnownPackagingFileInfo, 

86 PluginProvidedKnownPackagingFile, 

87 DHCompatibilityBasedRule, 

88 PluginProvidedTypeMapping, 

89 PluginProvidedBuildSystemAutoDetection, 

90 BSR, 

91) 

92from debputy.plugin.api.plugin_parser import ( 

93 PLUGIN_METADATA_PARSER, 

94 PluginJsonMetadata, 

95 PLUGIN_PPF_PARSER, 

96 PackagerProvidedFileJsonDescription, 

97 PLUGIN_MANIFEST_VARS_PARSER, 

98 PLUGIN_KNOWN_PACKAGING_FILES_PARSER, 

99) 

100from debputy.plugin.api.spec import ( 

101 MaintscriptAccessor, 

102 Maintscript, 

103 DpkgTriggerType, 

104 BinaryCtrlAccessor, 

105 PackageProcessingContext, 

106 MetadataAutoDetector, 

107 PluginInitializationEntryPoint, 

108 DebputyPluginInitializer, 

109 PackageTypeSelector, 

110 FlushableSubstvars, 

111 ParserDocumentation, 

112 PackageProcessor, 

113 VirtualPath, 

114 ServiceIntegrator, 

115 ServiceDetector, 

116 ServiceRegistry, 

117 ServiceDefinition, 

118 DSD, 

119 ServiceUpgradeRule, 

120 PackagerProvidedFileReferenceDocumentation, 

121 packager_provided_file_reference_documentation, 

122 TypeMappingDocumentation, 

123 DebputyIntegrationMode, 

124 _DEBPUTY_DISPATCH_METADATA_ATTR_NAME, 

125 BuildSystemManifestRuleMetadata, 

126 INTEGRATION_MODE_FULL, 

127 only_integrations, 

128 DebputyPluginDefinition, 

129) 

130from debputy.plugin.api.std_docs import _STD_ATTR_DOCS 

131from debputy.plugins.debputy.to_be_api_types import ( 

132 BuildRuleParsedFormat, 

133 BSPF, 

134 debputy_build_system, 

135) 

136from debputy.plugin.plugin_state import ( 

137 run_in_context_of_plugin, 

138 run_in_context_of_plugin_wrap_errors, 

139 wrap_plugin_code, 

140) 

141from debputy.substitution import ( 

142 Substitution, 

143 VariableNameState, 

144 SUBST_VAR_RE, 

145 VariableContext, 

146) 

147from debputy.util import ( 

148 _normalize_path, 

149 POSTINST_DEFAULT_CONDITION, 

150 _error, 

151 print_command, 

152 _warn, 

153 _debug_log, 

154) 

155from debputy.yaml import MANIFEST_YAML 

156 

157if TYPE_CHECKING: 

158 from debputy.highlevel_manifest import HighLevelManifest 

159 

160PLUGIN_TEST_SUFFIX = re.compile(r"_(?:t|test|check)(?:_([a-z0-9_]+))?[.]py$") 

161PLUGIN_PYTHON_RES_PATH = importlib.resources.files(debputy.plugins.__name__) 

162 

163 

164def _validate_known_packaging_file_dh_compat_rules( 

165 dh_compat_rules: Optional[List[DHCompatibilityBasedRule]], 

166) -> None: 

167 max_compat = None 

168 if not dh_compat_rules:  # 168 ↛ 171: the condition on line 168 was always true

169 return 

170 dh_compat_rule: DHCompatibilityBasedRule 

171 for idx, dh_compat_rule in enumerate(dh_compat_rules): 

172 dh_version = dh_compat_rule.get("starting_with_debhelper_version") 

173 compat = dh_compat_rule.get("starting_with_compat_level") 

174 

175 remaining = dh_compat_rule.keys() - { 

176 "after_debhelper_version", 

177 "starting_with_compat_level", 

178 } 

179 if not remaining: 

180 raise ValueError( 

181 f"The dh compat-rule at index {idx} does not affect anything / not have any rules!? So why have it?" 

182 ) 

183 if dh_version is None and compat is None and idx < len(dh_compat_rules) - 1: 

184 raise ValueError( 

185 f"The dh compat-rule at index {idx} is not the last and is missing either" 

186 " before-debhelper-version or before-compat-level" 

187 ) 

188 if compat is not None and compat < 0: 

189 raise ValueError( 

190 f"There is no compat below 1 but dh compat-rule at {idx} wants to declare some rule" 

191 f" for something that appeared when migrating from {compat} to {compat + 1}." 

192 ) 

193 

194 if max_compat is None: 

195 max_compat = compat 

196 elif compat is not None: 

197 if compat >= max_compat: 

198 raise ValueError( 

199 f"The dh compat-rule at {idx} should be moved earlier than the entry for compat {max_compat}." 

200 ) 

201 max_compat = compat 

202 

203 install_pattern = dh_compat_rule.get("install_pattern") 

204 if ( 

205 install_pattern is not None 

206 and _normalize_path(install_pattern, with_prefix=False) != install_pattern 

207 ): 

208 raise ValueError( 

209 f"The install-pattern in dh compat-rule at {idx} must be normalized as" 

210 f' "{_normalize_path(install_pattern, with_prefix=False)}".' 

211 ) 

212 

213 

214class DebputyPluginInitializerProvider(DebputyPluginInitializer): 

215 __slots__ = ( 

216 "_plugin_metadata", 

217 "_feature_set", 

218 "_plugin_detector_ids", 

219 "_substitution", 

220 "_unloaders", 

221 "_is_doc_cache_resolved", 

222 "_doc_cache", 

223 "_load_started", 

224 ) 

225 

226 def __init__( 

227 self, 

228 plugin_metadata: DebputyPluginMetadata, 

229 feature_set: PluginProvidedFeatureSet, 

230 substitution: Substitution, 

231 ) -> None: 

232 self._plugin_metadata: DebputyPluginMetadata = plugin_metadata 

233 self._feature_set = feature_set 

234 self._plugin_detector_ids: Set[str] = set() 

235 self._substitution = substitution 

236 self._unloaders: List[Callable[[], None]] = [] 

237 self._is_doc_cache_resolved: bool = False 

238 self._doc_cache: Optional[DebputyParsedDoc] = None 

239 self._load_started = False 

240 

241 @property 

242 def plugin_metadata(self) -> DebputyPluginMetadata: 

243 return self._plugin_metadata 

244 

245 def unload_plugin(self) -> None: 

246 if self._load_started: 

247 for unloader in self._unloaders: 

248 unloader() 

249 del self._feature_set.plugin_data[self._plugin_name] 

250 

251 def load_plugin(self) -> None: 

252 metadata = self._plugin_metadata 

253 if metadata.plugin_name in self._feature_set.plugin_data:  # 253 ↛ 254: the condition on line 253 was never true

254 raise PluginConflictError( 

255 f'The plugin "{metadata.plugin_name}" has already been loaded!?' 

256 ) 

257 assert ( 

258 metadata.api_compat_version == 1 

259 ), f"Unsupported plugin API compat version {metadata.api_compat_version}" 

260 self._feature_set.plugin_data[metadata.plugin_name] = metadata 

261 self._load_started = True 

262 assert not metadata.is_initialized 

263 try: 

264 metadata.initialize_plugin(self) 

265 except Exception as e: 

266 initializer = metadata.plugin_initializer 

267 if (  # 267 ↛ 272: the condition on line 267 was never true

268 isinstance(e, TypeError) 

269 and initializer is not None 

270 and not callable(initializer) 

271 ): 

272 raise PluginMetadataError( 

273 f"The specified entry point for plugin {metadata.plugin_name} does not appear to be a" 

274 f" callable (callable returns False). The specified entry point identifies" 

275 f' itself as "{initializer.__qualname__}".' 

276 ) from e 

277 elif isinstance(e, PluginBaseError):  # 277 ↛ 279: the condition on line 277 was always true

278 raise 

279 raise PluginInitializationError( 

280 f"Exception while attempting to load plugin {metadata.plugin_name}" 

281 ) from e 

282 

283 def _resolve_docs(self) -> Optional[DebputyParsedDoc]: 

284 doc_cache = self._doc_cache 

285 if doc_cache is not None: 

286 return doc_cache 

287 

288 plugin_doc_path = self._plugin_metadata.plugin_doc_path 

289 if plugin_doc_path is None or self._is_doc_cache_resolved: 

290 self._is_doc_cache_resolved = True 

291 return None 

292 try: 

293 with plugin_doc_path.open("r", encoding="utf-8") as fd: 

294 raw = MANIFEST_YAML.load(fd) 

295 except FileNotFoundError: 

296 _debug_log( 

297 f"No documentation file found for {self._plugin_name}. Expected it at {plugin_doc_path}" 

298 ) 

299 self._is_doc_cache_resolved = True 

300 return None 

301 attr_path = AttributePath.root_path(plugin_doc_path) 

302 try: 

303 ref = DEBPUTY_DOC_REFERENCE_DATA_PARSER.parse_input(raw, attr_path) 

304 except ManifestParseException as e: 

305 raise ValueError( 

306 f"Could not parse documentation in {plugin_doc_path}: {e.message}" 

307 ) from e 

308 try: 

309 res = DebputyParsedDoc.from_ref_data(ref) 

310 except ValueError as e: 

311 raise ValueError( 

312 f"Could not parse documentation in {plugin_doc_path}: {e.args[0]}" 

313 ) from e 

314 

315 self._doc_cache = res 

316 self._is_doc_cache_resolved = True 

317 return res 

318 

319 def _pluggable_manifest_docs_for( 

320 self, 

321 rule_type: Union[TTP, str], 

322 rule_name: Union[str, List[str]], 

323 *, 

324 inline_reference_documentation: Optional[ParserDocumentation] = None, 

325 ) -> Optional[ParserDocumentation]: 

326 ref_data = self._resolve_docs() 

327 if ref_data is not None: 

328 primary_rule_name = ( 

329 rule_name if isinstance(rule_name, str) else rule_name[0] 

330 ) 

331 rule_ref = f"{parser_type_name(rule_type)}::{primary_rule_name}" 

332 resolved_docs = ref_data.pluggable_manifest_rules.get(rule_ref) 

333 if resolved_docs is not None: 

334 if inline_reference_documentation is not None:  # 334 ↛ 335: the condition on line 334 was never true

335 raise ValueError( 

336 f"Conflicting docs for {rule_ref}: Was provided one in the API call and one via" 

337 f" {self._plugin_metadata.plugin_doc_path}. Please remove one of the two, so" 

338 f" there is only one doc reference" 

339 ) 

340 return resolved_docs 

341 return inline_reference_documentation 

342 

343 def packager_provided_file( 

344 self, 

345 stem: str, 

346 installed_path: str, 

347 *, 

348 default_mode: int = 0o0644, 

349 default_priority: Optional[int] = None, 

350 allow_name_segment: bool = True, 

351 allow_architecture_segment: bool = False, 

352 post_formatting_rewrite: Optional[Callable[[str], str]] = None, 

353 packageless_is_fallback_for_all_packages: bool = False, 

354 reservation_only: bool = False, 

355 format_callback: Optional[ 

356 Callable[[str, PPFFormatParam, VirtualPath], str] 

357 ] = None, 

358 reference_documentation: Optional[ 

359 PackagerProvidedFileReferenceDocumentation 

360 ] = None, 

361 ) -> None: 

362 packager_provided_files = self._feature_set.packager_provided_files 

363 existing = packager_provided_files.get(stem) 

364 

365 if format_callback is not None and self._plugin_name != "debputy":  # 365 ↛ 366: the condition on line 365 was never true

366 raise ValueError( 

367 "Sorry; Using format_callback is a debputy-internal" 

368 f" API. Triggered by plugin {self._plugin_name}" 

369 ) 

370 

371 if installed_path.endswith("/"):  # 371 ↛ 372: the condition on line 371 was never true

372 raise ValueError( 

373 f'The installed_path ends with "/" indicating it is a directory, but it must be a file.' 

374 f" Triggered by plugin {self._plugin_name}." 

375 ) 

376 

377 installed_path = _normalize_path(installed_path) 

378 

379 has_name_var = "{name}" in installed_path 

380 

381 if installed_path.startswith("./DEBIAN") or reservation_only: 

382 # Special-case, used for control files. 

383 if self._plugin_name != "debputy":  # 383 ↛ 384: the condition on line 383 was never true

384 raise ValueError( 

385 "Sorry; Using DEBIAN as install path or/and reservation_only is a debputy-internal" 

386 f" API. Triggered by plugin {self._plugin_name}" 

387 ) 

388 elif not has_name_var and "{owning_package}" not in installed_path:  # 388 ↛ 389: the condition on line 388 was never true

389 raise ValueError( 

390 'The installed_path must contain a "{name}" (preferred) or a "{owning_package}"' 

391 " substitution (or have installed_path end with a slash). Otherwise, the installed" 

392 f" path would caused file-conflicts. Triggered by plugin {self._plugin_name}" 

393 ) 

394 

395 if allow_name_segment and not has_name_var:  # 395 ↛ 396: the condition on line 395 was never true

396 raise ValueError( 

397 'When allow_name_segment is True, the installed_path must have a "{name}" substitution' 

398 " variable. Otherwise, the name segment will not work properly. Triggered by" 

399 f" plugin {self._plugin_name}" 

400 ) 

401 

402 if (  # 402 ↛ 407: the condition on line 402 was never true

403 default_priority is not None 

404 and "{priority}" not in installed_path 

405 and "{priority:02}" not in installed_path 

406 ): 

407 raise ValueError( 

408 'When default_priority is not None, the installed_path should have a "{priority}"' 

409 ' or a "{priority:02}" substitution variable. Otherwise, the priority would be lost.' 

410 f" Triggered by plugin {self._plugin_name}" 

411 ) 

412 

413 if existing is not None: 

414 if existing.debputy_plugin_metadata.plugin_name != self._plugin_name:  # 414 ↛ 421: the condition on line 414 was always true

415 message = ( 

416 f'The stem "{stem}" is registered twice for packager provided files.' 

417 f" Once by {existing.debputy_plugin_metadata.plugin_name} and once" 

418 f" by {self._plugin_name}" 

419 ) 

420 else: 

421 message = ( 

422 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

423 f' stem "{stem}" twice for packager provided files.' 

424 ) 

425 raise PluginConflictError( 

426 message, existing.debputy_plugin_metadata, self._plugin_metadata 

427 ) 

428 packager_provided_files[stem] = PackagerProvidedFileClassSpec( 

429 self._plugin_metadata, 

430 stem, 

431 installed_path, 

432 default_mode=default_mode, 

433 default_priority=default_priority, 

434 allow_name_segment=allow_name_segment, 

435 allow_architecture_segment=allow_architecture_segment, 

436 post_formatting_rewrite=post_formatting_rewrite, 

437 packageless_is_fallback_for_all_packages=packageless_is_fallback_for_all_packages, 

438 reservation_only=reservation_only, 

439 formatting_callback=format_callback, 

440 reference_documentation=reference_documentation, 

441 ) 

442 

443 def _unload() -> None: 

444 del packager_provided_files[stem] 

445 

446 self._unloaders.append(_unload) 

447 

448 def metadata_or_maintscript_detector( 

449 self, 

450 auto_detector_id: str, 

451 auto_detector: MetadataAutoDetector, 

452 *, 

453 package_type: PackageTypeSelector = "deb", 

454 ) -> None: 

455 if auto_detector_id in self._plugin_detector_ids:  # 455 ↛ 456: the condition on line 455 was never true

456 raise ValueError( 

457 f"The plugin {self._plugin_name} tried to register" 

458 f' "{auto_detector_id}" twice' 

459 ) 

460 self._plugin_detector_ids.add(auto_detector_id) 

461 all_detectors = self._feature_set.metadata_maintscript_detectors 

462 if self._plugin_name not in all_detectors: 

463 all_detectors[self._plugin_name] = [] 

464 package_types = resolve_package_type_selectors(package_type) 

465 all_detectors[self._plugin_name].append( 

466 MetadataOrMaintscriptDetector( 

467 detector_id=auto_detector_id, 

468 detector=wrap_plugin_code(self._plugin_name, auto_detector), 

469 plugin_metadata=self._plugin_metadata, 

470 applies_to_package_types=package_types, 

471 enabled=True, 

472 ) 

473 ) 

474 

475 def _unload() -> None: 

476 if self._plugin_name in all_detectors: 

477 del all_detectors[self._plugin_name] 

478 

479 self._unloaders.append(_unload) 

480 
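# Illustrative sketch only (not part of this module): how a third-party plugin's
# initializer might use metadata_or_maintscript_detector(). The detector id, the
# looked-up path and the substvar value are invented, and fs_root.lookup() plus
# ctrl.substvars item assignment are assumed to behave as in the public plugin API.
#
#     def _detect_foo_data(fs_root, ctrl, context) -> None:
#         # Only add the recommendation when the package actually ships the data dir.
#         if fs_root.lookup("usr/share/foo") is not None:
#             ctrl.substvars["misc:Recommends"] = "foo-data"
#
#     def initialize(api: DebputyPluginInitializer) -> None:
#         api.metadata_or_maintscript_detector("foo-data-recommends", _detect_foo_data)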

481 def document_builtin_variable( 

482 self, 

483 variable_name: str, 

484 variable_reference_documentation: str, 

485 *, 

486 is_context_specific: bool = False, 

487 is_for_special_case: bool = False, 

488 ) -> None: 

489 manifest_variables = self._feature_set.manifest_variables 

490 self._restricted_api() 

491 state = self._substitution.variable_state(variable_name) 

492 if state == VariableNameState.UNDEFINED:  # 492 ↛ 493: the condition on line 492 was never true

493 raise ValueError( 

494 f"The plugin {self._plugin_name} attempted to document built-in {variable_name}," 

495 f" but it is not known to be a variable" 

496 ) 

497 

498 assert variable_name not in manifest_variables 

499 

500 manifest_variables[variable_name] = PluginProvidedManifestVariable( 

501 self._plugin_metadata, 

502 variable_name, 

503 None, 

504 is_context_specific_variable=is_context_specific, 

505 variable_reference_documentation=variable_reference_documentation, 

506 is_documentation_placeholder=True, 

507 is_for_special_case=is_for_special_case, 

508 ) 

509 

510 def _unload() -> None: 

511 del manifest_variables[variable_name] 

512 

513 self._unloaders.append(_unload) 

514 

515 def manifest_variable_provider( 

516 self, 

517 provider: Callable[[VariableContext], Mapping[str, str]], 

518 variables: Union[Sequence[str], Mapping[str, Optional[str]]], 

519 ) -> None: 

520 self._restricted_api() 

521 cached_provider = functools.lru_cache(None)(provider) 

522 permitted_variables = frozenset(variables) 

523 variables_iter: Iterable[Tuple[str, Optional[str]]] 

524 if not isinstance(variables, Mapping):  # 524 ↛ 525: the condition on line 524 was never true

525 variables_iter = zip(variables, itertools.repeat(None)) 

526 else: 

527 variables_iter = variables.items() 

528 

529 checked_vars = False 

530 manifest_variables = self._feature_set.manifest_variables 

531 plugin_name = self._plugin_name 

532 

533 def _value_resolver_generator( 

534 variable_name: str, 

535 ) -> Callable[[VariableContext], str]: 

536 def _value_resolver(variable_context: VariableContext) -> str: 

537 res = cached_provider(variable_context) 

538 nonlocal checked_vars 

539 if not checked_vars:  # 539 ↛ 550: the condition on line 539 was always true

540 if permitted_variables != res.keys():  # 540 ↛ 541: the condition on line 540 was never true

541 expected = ", ".join(sorted(permitted_variables)) 

542 actual = ", ".join(sorted(res)) 

543 raise PluginAPIViolationError( 

544 f"The plugin {plugin_name} claimed to provide" 

545 f" the following variables {expected}," 

546 f" but when resolving the variables, the plugin provided" 

547 f" {actual}. These two lists should have been the same." 

548 ) 

549 checked_vars = True  # mark the one-time validation as done so it is not repeated

550 return res[variable_name] 

551 

552 return _value_resolver 

553 

554 for varname, vardoc in variables_iter: 

555 self._check_variable_name(varname) 

556 manifest_variables[varname] = PluginProvidedManifestVariable( 

557 self._plugin_metadata, 

558 varname, 

559 _value_resolver_generator(varname), 

560 is_context_specific_variable=False, 

561 variable_reference_documentation=vardoc, 

562 ) 

563 

564 def _unload() -> None: 

565 raise PluginInitializationError( 

566 "Cannot unload manifest_variable_provider (not implemented)" 

567 ) 

568 

569 self._unloaders.append(_unload) 

570 
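# Hedged sketch (names and values invented): a provider callable that resolves a
# set of manifest variables lazily. The provider must return exactly the declared
# names, as enforced by the check above. Note that this method currently calls
# _restricted_api(), so only debputy's own plugins may use it.
#
#     def _provide(context: VariableContext) -> Mapping[str, str]:
#         return {"FOO_DATA_DIR": "/usr/share/foo"}
#
#     api.manifest_variable_provider(
#         _provide,
#         {"FOO_DATA_DIR": "Directory that foo's data files are installed into"},
#     )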

571 def _check_variable_name(self, variable_name: str) -> None: 

572 manifest_variables = self._feature_set.manifest_variables 

573 existing = manifest_variables.get(variable_name) 

574 

575 if existing is not None: 

576 if existing.plugin_metadata.plugin_name == self._plugin_name:  # 576 ↛ 582: the condition on line 576 was always true

577 message = ( 

578 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

579 f' manifest variable "{variable_name}" twice.' 

580 ) 

581 else: 

582 message = ( 

583 f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}" 

584 f" both tried to provide the manifest variable {variable_name}" 

585 ) 

586 raise PluginConflictError( 

587 message, existing.plugin_metadata, self._plugin_metadata 

588 ) 

589 if not SUBST_VAR_RE.match("{{" + variable_name + "}}"): 

590 raise ValueError( 

591 f"The plugin {self._plugin_name} attempted to declare {variable_name}," 

592 f" which is not a valid variable name" 

593 ) 

594 

595 namespace = "" 

596 variable_basename = variable_name 

597 if ":" in variable_name: 

598 namespace, variable_basename = variable_name.rsplit(":", 1) 

599 assert namespace != "" 

600 assert variable_name != "" 

601 

602 if namespace != "" and namespace not in ("token", "path"): 

603 raise ValueError( 

604 f"The plugin {self._plugin_name} attempted to declare {variable_name}," 

605 f" which is in the reserved namespace {namespace}" 

606 ) 

607 

608 variable_name_upper = variable_name.upper() 

609 if ( 

610 variable_name_upper.startswith(("DEB_", "DPKG_", "DEBPUTY")) 

611 or variable_basename.startswith("_") 

612 or variable_basename.upper().startswith("DEBPUTY") 

613 ) and self._plugin_name != "debputy": 

614 raise ValueError( 

615 f"The plugin {self._plugin_name} attempted to declare {variable_name}," 

616 f" which is a variable name reserved by debputy" 

617 ) 

618 

619 state = self._substitution.variable_state(variable_name) 

620 if state != VariableNameState.UNDEFINED and self._plugin_name != "debputy": 

621 raise ValueError( 

622 f"The plugin {self._plugin_name} attempted to declare {variable_name}," 

623 f" which would shadow a built-in variable" 

624 ) 

625 

626 def package_processor( 

627 self, 

628 processor_id: str, 

629 processor: PackageProcessor, 

630 *, 

631 depends_on_processor: Iterable[str] = tuple(), 

632 package_type: PackageTypeSelector = "deb", 

633 ) -> None: 

634 self._restricted_api(allowed_plugins={"lua", "debputy-self-hosting"}) 

635 package_processors = self._feature_set.all_package_processors 

636 dependencies = set() 

637 processor_key = (self._plugin_name, processor_id) 

638 

639 if processor_key in package_processors:  # 639 ↛ 640: the condition on line 639 was never true

640 raise PluginConflictError( 

641 f"The plugin {self._plugin_name} already registered a processor with id {processor_id}", 

642 self._plugin_metadata, 

643 self._plugin_metadata, 

644 ) 

645 

646 for depends_ref in depends_on_processor: 

647 if isinstance(depends_ref, str):  # 647 ↛ 661: the condition on line 647 was always true

648 if (self._plugin_name, depends_ref) in package_processors:  # 648 ↛ 650: the condition on line 648 was always true

649 depends_key = (self._plugin_name, depends_ref) 

650 elif ("debputy", depends_ref) in package_processors: 

651 depends_key = ("debputy", depends_ref) 

652 else: 

653 raise ValueError( 

654 f'Could not resolve dependency "{depends_ref}" for' 

655 f' "{processor_id}". It was not provided by the plugin itself' 

656 f" ({self._plugin_name}) nor debputy." 

657 ) 

658 else: 

659 # TODO: Add proper dependencies first, at which point we should probably resolve "name" 

660 # via the direct dependencies. 

661 assert False 

662 

663 existing_processor = package_processors.get(depends_key) 

664 if existing_processor is None:  # 664 ↛ 667: the condition on line 664 was never true

665 # We currently require the processor to be declared already. If this ever changes, 

666 # PluginProvidedFeatureSet.package_processors_in_order will need an update 

667 dplugin_name, dprocessor_name = depends_key 

668 available_processors = ", ".join( 

669 n for p, n in package_processors.keys() if p == dplugin_name 

670 ) 

671 raise ValueError( 

672 f"The plugin {dplugin_name} does not provide a processor called" 

673 f" {dprocessor_name}. Available processors for that plugin are:" 

674 f" {available_processors}" 

675 ) 

676 dependencies.add(depends_key) 

677 

678 package_processors[processor_key] = PluginProvidedPackageProcessor( 

679 processor_id, 

680 resolve_package_type_selectors(package_type), 

681 wrap_plugin_code(self._plugin_name, processor), 

682 frozenset(dependencies), 

683 self._plugin_metadata, 

684 ) 

685 

686 def _unload() -> None: 

687 del package_processors[processor_key] 

688 

689 self._unloaders.append(_unload) 

690 

691 def automatic_discard_rule( 

692 self, 

693 name: str, 

694 should_discard: Callable[[VirtualPath], bool], 

695 *, 

696 rule_reference_documentation: Optional[str] = None, 

697 examples: Union[ 

698 AutomaticDiscardRuleExample, Sequence[AutomaticDiscardRuleExample] 

699 ] = tuple(), 

700 ) -> None: 

701 """Register an automatic discard rule 

702 

703 An automatic discard rule is applied to *every* path about to be installed into any package.

704 If any discard rule concludes that a path should not be installed, then the path is not installed. 

705 In the case where the discarded path is a:

706 

707 * directory: Then the entire directory is excluded along with anything beneath it. 

708 * symlink: Then the symlink itself (but not its target) is excluded. 

709 * hardlink: Then the current hardlink will not be installed, but other instances of it will be. 

710 

711 Note: Discarded files are *never* deleted by `debputy`. A discard rule merely makes `debputy` skip the file.

712 

713 Automatic discard rules should be written with the assumption that directories will be tested 

714 before their content *when it is relevant* for the discard rule to examine whether the directory 

715 can be excluded. 

716 

717 The packager can override automatic discard rules via the manifest by explicitly listing the path

718 without any globs. As an example:

719 

720 installations: 

721 - install: 

722 sources: 

723 - usr/lib/libfoo.la # <-- This path is always installed 

724 # (Discard rules are never asked in this case) 

725 # 

726 - usr/lib/*.so* # <-- Discard rules applies to any path beneath usr/lib and can exclude matches 

727 # Though, they will not examine `libfoo.la` as it has already been installed 

728 # 

729 # Note: usr/lib itself is never tested in this case (it is assumed to be 

730 # explicitly requested). But any subdir of usr/lib will be examined. 

731 

732 When an automatic discard rule is evaluated, it can see the source path currently being considered 

733 for installation. While it can look at "surrounding" context (like parent directory), it will not 

734 know whether those paths are to be installed or will be installed. 

735 

736 :param name: A user visible name for the discard rule. It can be used on the command line, so avoid shell

737 metacharacters and spaces. 

738 :param should_discard: A callable that is the implementation of the automatic discard rule. It will receive 

739 a VirtualPath representing the *source* path about to be installed. If the callable returns `True`, then the

740 path is discarded. If it returns `False`, the path is not discarded (by this rule at least). 

741 A source path will either be from the root of the source tree or the root of a search directory such as 

742 `debian/tmp`. Where the path will be installed is not available at the time the discard rule is 

743 evaluated. 

744 :param rule_reference_documentation: Optionally, the reference documentation to be shown when a user 

745 looks up this automatic discard rule. 

746 :param examples: Provide examples for the rule. Use the automatic_discard_rule_example function to 

747 generate the examples. 

748 

749 """ 

750 self._restricted_api() 

751 auto_discard_rules = self._feature_set.auto_discard_rules 

752 existing = auto_discard_rules.get(name) 

753 if existing is not None:  # 753 ↛ 754: the condition on line 753 was never true

754 if existing.plugin_metadata.plugin_name == self._plugin_name: 

755 message = ( 

756 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

757 f' automatic discard rule "{name}" twice.' 

758 ) 

759 else: 

760 message = ( 

761 f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}" 

762 f" both tried to provide the automatic discard rule {name}" 

763 ) 

764 raise PluginConflictError( 

765 message, existing.plugin_metadata, self._plugin_metadata 

766 ) 

767 examples = ( 

768 (examples,) 

769 if isinstance(examples, AutomaticDiscardRuleExample) 

770 else tuple(examples) 

771 ) 

772 auto_discard_rules[name] = PluginProvidedDiscardRule( 

773 name, 

774 self._plugin_metadata, 

775 should_discard, 

776 rule_reference_documentation, 

777 examples, 

778 ) 

779 

780 def _unload() -> None: 

781 del auto_discard_rules[name] 

782 

783 self._unloaders.append(_unload) 

784 
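# Hedged usage sketch for the docstring above (rule name, helper and suffix are
# invented). Note that automatic_discard_rule() is guarded by _restricted_api(),
# so at present only debputy's own plugins can register discard rules.
#
#     def _discard_la_files(path: VirtualPath) -> bool:
#         # Discard libtool archives; anything else is left alone by this rule.
#         return path.name.endswith(".la")
#
#     api.automatic_discard_rule(
#         "la-files",
#         _discard_la_files,
#         rule_reference_documentation="Discards libtool .la archives",
#     )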

785 def service_provider( 

786 self, 

787 service_manager: str, 

788 detector: ServiceDetector, 

789 integrator: ServiceIntegrator, 

790 ) -> None: 

791 self._restricted_api() 

792 service_managers = self._feature_set.service_managers 

793 existing = service_managers.get(service_manager) 

794 if existing is not None:  # 794 ↛ 795: the condition on line 794 was never true

795 if existing.plugin_metadata.plugin_name == self._plugin_name: 

796 message = ( 

797 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

798 f' service manager "{service_manager}" twice.' 

799 ) 

800 else: 

801 message = ( 

802 f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}" 

803 f' both tried to provide the service manager "{service_manager}"' 

804 ) 

805 raise PluginConflictError( 

806 message, existing.plugin_metadata, self._plugin_metadata 

807 ) 

808 service_managers[service_manager] = ServiceManagerDetails( 

809 service_manager, 

810 wrap_plugin_code(self._plugin_name, detector), 

811 wrap_plugin_code(self._plugin_name, integrator), 

812 self._plugin_metadata, 

813 ) 

814 

815 def _unload() -> None: 

816 del service_managers[service_manager] 

817 

818 self._unloaders.append(_unload) 

819 

820 def manifest_variable( 

821 self, 

822 variable_name: str, 

823 value: str, 

824 *, 

825 variable_reference_documentation: Optional[str] = None, 

826 ) -> None: 

827 self._check_variable_name(variable_name) 

828 manifest_variables = self._feature_set.manifest_variables 

829 try: 

830 resolved_value = self._substitution.substitute( 

831 value, "Plugin initialization" 

832 ) 

833 depends_on_variable = resolved_value != value 

834 except DebputySubstitutionError: 

835 depends_on_variable = True 

836 if depends_on_variable: 

837 raise ValueError( 

838 f"The plugin {self._plugin_name} attempted to declare {variable_name} with value {value!r}." 

839 f" This value depends on another variable, which is not supported. This restriction may be" 

840 f" lifted in the future." 

841 ) 

842 

843 manifest_variables[variable_name] = PluginProvidedManifestVariable( 

844 self._plugin_metadata, 

845 variable_name, 

846 value, 

847 is_context_specific_variable=False, 

848 variable_reference_documentation=variable_reference_documentation, 

849 ) 

850 

851 def _unload() -> None: 

852 # We need to check it was never resolved 

853 raise PluginInitializationError( 

854 "Cannot unload manifest_variable (not implemented)" 

855 ) 

856 

857 self._unloaders.append(_unload) 

858 
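# Minimal sketch, assuming a hypothetical plugin and value: declaring a static
# manifest variable. As enforced above, the value must not itself reference
# other substitution variables.
#
#     api.manifest_variable(
#         "FOO_CONF_DIR",
#         "/etc/foo",
#         variable_reference_documentation="Directory holding foo's configuration files",
#     )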

859 @property 

860 def _plugin_name(self) -> str: 

861 return self._plugin_metadata.plugin_name 

862 

863 def provide_manifest_keyword( 

864 self, 

865 rule_type: TTP, 

866 rule_name: Union[str, List[str]], 

867 handler: DIPKWHandler, 

868 *, 

869 inline_reference_documentation: Optional[ParserDocumentation] = None, 

870 ) -> None: 

871 self._restricted_api() 

872 parser_generator = self._feature_set.manifest_parser_generator 

873 if rule_type not in parser_generator.dispatchable_table_parsers:  # 873 ↛ 874: the condition on line 873 was never true

874 types = ", ".join( 

875 sorted(x.__name__ for x in parser_generator.dispatchable_table_parsers) 

876 ) 

877 raise ValueError( 

878 f"The rule_type was not a supported type. It must be one of {types}" 

879 ) 

880 

881 inline_reference_documentation = self._pluggable_manifest_docs_for( 

882 rule_type, 

883 rule_name, 

884 inline_reference_documentation=inline_reference_documentation, 

885 ) 

886 

887 dispatching_parser = parser_generator.dispatchable_table_parsers[rule_type] 

888 dispatching_parser.register_keyword( 

889 rule_name, 

890 wrap_plugin_code(self._plugin_name, handler), 

891 self._plugin_metadata, 

892 inline_reference_documentation=inline_reference_documentation, 

893 ) 

894 

895 def _unload() -> None: 

896 raise PluginInitializationError( 

897 "Cannot unload provide_manifest_keyword (not implemented)" 

898 ) 

899 

900 self._unloaders.append(_unload) 

901 

902 def pluggable_object_parser( 

903 self, 

904 rule_type: str, 

905 rule_name: str, 

906 *, 

907 object_parser_key: Optional[str] = None, 

908 on_end_parse_step: Optional[ 

909 Callable[ 

910 [str, Optional[Mapping[str, Any]], AttributePath, ParserContextData], 

911 None, 

912 ] 

913 ] = None, 

914 nested_in_package_context: bool = False, 

915 ) -> None: 

916 self._restricted_api() 

917 if object_parser_key is None:  # 917 ↛ 918: the condition on line 917 was never true

918 object_parser_key = rule_name 

919 

920 parser_generator = self._feature_set.manifest_parser_generator 

921 dispatchable_object_parsers = parser_generator.dispatchable_object_parsers 

922 if rule_type not in dispatchable_object_parsers:  # 922 ↛ 923: the condition on line 922 was never true

923 types = ", ".join(sorted(dispatchable_object_parsers)) 

924 raise ValueError( 

925 f"The rule_type was not a supported type. It must be one of {types}" 

926 ) 

927 if object_parser_key not in dispatchable_object_parsers:  # 927 ↛ 928: the condition on line 927 was never true

928 types = ", ".join(sorted(dispatchable_object_parsers)) 

929 raise ValueError( 

930 f"The object_parser_key was not a supported type. It must be one of {types}" 

931 ) 

932 parent_dispatcher = dispatchable_object_parsers[rule_type] 

933 child_dispatcher = dispatchable_object_parsers[object_parser_key] 

934 

935 if on_end_parse_step is not None:  # 935 ↛ 938: the condition on line 935 was always true

936 on_end_parse_step = wrap_plugin_code(self._plugin_name, on_end_parse_step) 

937 

938 parent_dispatcher.register_child_parser( 

939 rule_name, 

940 child_dispatcher, 

941 self._plugin_metadata, 

942 on_end_parse_step=on_end_parse_step, 

943 nested_in_package_context=nested_in_package_context, 

944 ) 

945 

946 def _unload() -> None: 

947 raise PluginInitializationError( 

948 "Cannot unload pluggable_object_parser (not implemented)" 

949 ) 

950 

951 self._unloaders.append(_unload) 

952 

953 def pluggable_manifest_rule( 

954 self, 

955 rule_type: Union[TTP, str], 

956 rule_name: Union[str, Sequence[str]], 

957 parsed_format: Type[PF], 

958 handler: DIPHandler, 

959 *, 

960 source_format: Optional[SF] = None, 

961 inline_reference_documentation: Optional[ParserDocumentation] = None, 

962 expected_debputy_integration_mode: Optional[ 

963 Container[DebputyIntegrationMode] 

964 ] = None, 

965 apply_standard_attribute_documentation: bool = False, 

966 ) -> None: 

967 # When changing this, consider which types will be unrestricted 

968 self._restricted_api() 

969 if apply_standard_attribute_documentation and sys.version_info < (3, 12):  # 969 ↛ 970: the condition on line 969 was never true

970 _error( 

971 f"The plugin {self._plugin_metadata.plugin_name} requires python 3.12 due to" 

972 f" its use of apply_standard_attribute_documentation" 

973 ) 

974 feature_set = self._feature_set 

975 parser_generator = feature_set.manifest_parser_generator 

976 if isinstance(rule_type, str): 

977 if rule_type not in parser_generator.dispatchable_object_parsers:  # 977 ↛ 978: the condition on line 977 was never true

978 types = ", ".join(sorted(parser_generator.dispatchable_object_parsers)) 

979 raise ValueError( 

980 f"The rule_type was not a supported type. It must be one of {types}" 

981 ) 

982 dispatching_parser = parser_generator.dispatchable_object_parsers[rule_type] 

983 else: 

984 if rule_type not in parser_generator.dispatchable_table_parsers:  # 984 ↛ 985: the condition on line 984 was never true

985 types = ", ".join( 

986 sorted( 

987 x.__name__ for x in parser_generator.dispatchable_table_parsers 

988 ) 

989 ) 

990 raise ValueError( 

991 f"The rule_type was not a supported type. It must be one of {types}" 

992 ) 

993 dispatching_parser = parser_generator.dispatchable_table_parsers[rule_type] 

994 

995 inline_reference_documentation = self._pluggable_manifest_docs_for( 

996 rule_type, 

997 rule_name, 

998 inline_reference_documentation=inline_reference_documentation, 

999 ) 

1000 

1001 if apply_standard_attribute_documentation:  # 1001 ↛ 1002: the condition on line 1001 was never true

1002 docs = _STD_ATTR_DOCS 

1003 else: 

1004 docs = None 

1005 

1006 parser = feature_set.manifest_parser_generator.generate_parser( 

1007 parsed_format, 

1008 source_content=source_format, 

1009 inline_reference_documentation=inline_reference_documentation, 

1010 expected_debputy_integration_mode=expected_debputy_integration_mode, 

1011 automatic_docs=docs, 

1012 ) 

1013 dispatching_parser.register_parser( 

1014 rule_name, 

1015 parser, 

1016 wrap_plugin_code(self._plugin_name, handler), 

1017 self._plugin_metadata, 

1018 ) 

1019 

1020 def _unload() -> None: 

1021 raise PluginInitializationError( 

1022 "Cannot unload pluggable_manifest_rule (not implemented)" 

1023 ) 

1024 

1025 self._unloaders.append(_unload) 

1026 

1027 def register_build_system( 

1028 self, 

1029 build_system_definition: type[BSPF], 

1030 ) -> None: 

1031 self._restricted_api() 

1032 if not is_typeddict(build_system_definition):  # 1032 ↛ 1033: the condition on line 1032 was never true

1033 raise PluginInitializationError( 

1034 f"Expected build_system_definition to be a subclass of {BuildRuleParsedFormat.__name__}," 

1035 f" but got {build_system_definition.__name__} instead" 

1036 ) 

1037 metadata = getattr( 

1038 build_system_definition, 

1039 _DEBPUTY_DISPATCH_METADATA_ATTR_NAME, 

1040 None, 

1041 ) 

1042 if not isinstance(metadata, BuildSystemManifestRuleMetadata):  # 1042 ↛ 1043: the condition on line 1042 was never true

1043 raise PluginIncorrectRegistrationError( 

1044 f"The {build_system_definition.__qualname__} type should have been annotated with" 

1045 f" @{debputy_build_system.__name__}." 

1046 ) 

1047 assert len(metadata.manifest_keywords) == 1 

1048 build_system_impl = metadata.build_system_impl 

1049 assert build_system_impl is not None 

1050 manifest_keyword = next(iter(metadata.manifest_keywords)) 

1051 self.pluggable_manifest_rule( 

1052 metadata.dispatched_type, 

1053 metadata.manifest_keywords, 

1054 build_system_definition, 

1055 # pluggable_manifest_rule does the wrapping 

1056 metadata.unwrapped_constructor, 

1057 source_format=metadata.source_format, 

1058 inline_reference_documentation=metadata.online_reference_documentation, 

1059 expected_debputy_integration_mode=only_integrations(INTEGRATION_MODE_FULL), 

1060 ) 

1061 self._auto_detectable_build_system( 

1062 manifest_keyword, 

1063 build_system_impl, 

1064 constructor=wrap_plugin_code( 

1065 self._plugin_name, 

1066 build_system_impl, 

1067 ), 

1068 shadowing_build_systems_when_active=metadata.auto_detection_shadow_build_systems, 

1069 ) 

1070 

1071 def _auto_detectable_build_system( 

1072 self, 

1073 manifest_keyword: str, 

1074 rule_type: type[BSR], 

1075 *, 

1076 shadowing_build_systems_when_active: FrozenSet[str] = frozenset(), 

1077 constructor: Optional[ 

1078 Callable[[BuildRuleParsedFormat, AttributePath, "HighLevelManifest"], BSR] 

1079 ] = None, 

1080 ) -> None: 

1081 self._restricted_api() 

1082 feature_set = self._feature_set 

1083 existing = feature_set.auto_detectable_build_systems.get(rule_type) 

1084 if existing is not None:  # 1084 ↛ 1085: the condition on line 1084 was never true

1085 bs_name = rule_type.__class__.__name__ 

1086 if existing.plugin_metadata.plugin_name == self._plugin_name: 

1087 message = ( 

1088 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

1089 f' auto-detection of the build system "{bs_name}" twice.' 

1090 ) 

1091 else: 

1092 message = ( 

1093 f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}" 

1094 f' both tried to provide auto-detection of the build system "{bs_name}"' 

1095 ) 

1096 raise PluginConflictError( 

1097 message, existing.plugin_metadata, self._plugin_metadata 

1098 ) 

1099 

1100 if constructor is None:  # 1100 ↛ 1102: the condition on line 1100 was never true

1101 

1102 def impl( 

1103 attributes: BuildRuleParsedFormat, 

1104 attribute_path: AttributePath, 

1105 manifest: "HighLevelManifest", 

1106 ) -> BSR: 

1107 return rule_type(attributes, attribute_path, manifest) 

1108 

1109 else: 

1110 impl = constructor 

1111 

1112 feature_set.auto_detectable_build_systems[rule_type] = ( 

1113 PluginProvidedBuildSystemAutoDetection( 

1114 manifest_keyword, 

1115 rule_type, 

1116 wrap_plugin_code(self._plugin_name, rule_type.auto_detect_build_system), 

1117 impl, 

1118 shadowing_build_systems_when_active, 

1119 self._plugin_metadata, 

1120 ) 

1121 ) 

1122 

1123 def _unload() -> None: 

1124 try: 

1125 del feature_set.auto_detectable_build_systems[rule_type] 

1126 except KeyError: 

1127 pass 

1128 

1129 self._unloaders.append(_unload) 

1130 

1131 def known_packaging_files( 

1132 self, 

1133 packaging_file_details: KnownPackagingFileInfo, 

1134 ) -> None: 

1135 known_packaging_files = self._feature_set.known_packaging_files 

1136 detection_method = packaging_file_details.get( 

1137 "detection_method", cast("Literal['path']", "path") 

1138 ) 

1139 path = packaging_file_details.get("path") 

1140 dhpkgfile = packaging_file_details.get("pkgfile") 

1141 

1142 packaging_file_details: KnownPackagingFileInfo = packaging_file_details.copy() 

1143 

1144 if detection_method == "path":  # 1144 ↛ 1156: the condition on line 1144 was always true

1145 if dhpkgfile is not None:  # 1145 ↛ 1146: the condition on line 1145 was never true

1146 raise ValueError( 

1147 'The "pkgfile" attribute cannot be used when detection-method is "path" (or omitted)' 

1148 ) 

1149 if path != _normalize_path(path, with_prefix=False):  # 1149 ↛ 1150: the condition on line 1149 was never true

1150 raise ValueError( 

1151 f"The path for known packaging files must be normalized. Please replace" 

1152 f' "{path}" with "{_normalize_path(path, with_prefix=False)}"' 

1153 ) 

1154 detection_value = path 

1155 else: 

1156 assert detection_method == "dh.pkgfile" 

1157 if path is not None: 

1158 raise ValueError( 

1159 'The "path" attribute cannot be used when detection-method is "dh.pkgfile"' 

1160 ) 

1161 if "/" in dhpkgfile: 

1162 raise ValueError( 

1163 'The "pkgfile" attribute ḿust be a name stem such as "install" (no "/" are allowed)' 

1164 ) 

1165 detection_value = dhpkgfile 

1166 key = f"{detection_method}::{detection_value}" 

1167 existing = known_packaging_files.get(key) 

1168 if existing is not None:  # 1168 ↛ 1169: the condition on line 1168 was never true

1169 if existing.plugin_metadata.plugin_name != self._plugin_name: 

1170 message = ( 

1171 f'The key "{key}" is registered twice for known packaging files.' 

1172 f" Once by {existing.plugin_metadata.plugin_name} and once by {self._plugin_name}" 

1173 ) 

1174 else: 

1175 message = ( 

1176 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

1177 f' key "{key}" twice for known packaging files.' 

1178 ) 

1179 raise PluginConflictError( 

1180 message, existing.plugin_metadata, self._plugin_metadata 

1181 ) 

1182 _validate_known_packaging_file_dh_compat_rules( 

1183 packaging_file_details.get("dh_compat_rules") 

1184 ) 

1185 known_packaging_files[key] = PluginProvidedKnownPackagingFile( 

1186 packaging_file_details, 

1187 detection_method, 

1188 detection_value, 

1189 self._plugin_metadata, 

1190 ) 

1191 

1192 def _unload() -> None: 

1193 del known_packaging_files[key] 

1194 

1195 self._unloaders.append(_unload) 

1196 

1197 def register_mapped_type( 

1198 self, 

1199 type_mapping: TypeMapping, 

1200 *, 

1201 reference_documentation: Optional[TypeMappingDocumentation] = None, 

1202 ) -> None: 

1203 self._restricted_api() 

1204 target_type = type_mapping.target_type 

1205 mapped_types = self._feature_set.mapped_types 

1206 existing = mapped_types.get(target_type) 

1207 if existing is not None:  # 1207 ↛ 1208: the condition on line 1207 was never true

1208 if existing.plugin_metadata.plugin_name != self._plugin_name: 

1209 message = ( 

1210 f'The key "{target_type.__name__}" is registered twice for mapped types.'

1211 f" Once by {existing.plugin_metadata.plugin_name} and once by {self._plugin_name}" 

1212 ) 

1213 else: 

1214 message = ( 

1215 f"Bug in the plugin {self._plugin_name}: It tried to register the" 

1216 f' key "{target_type.__name__}" twice for mapped types.'

1217 ) 

1218 raise PluginConflictError( 

1219 message, existing.plugin_metadata, self._plugin_metadata 

1220 ) 

1221 parser_generator = self._feature_set.manifest_parser_generator 

1222 # TODO: Wrap the mapper in the plugin context 

1223 mapped_types[target_type] = PluginProvidedTypeMapping( 

1224 type_mapping, reference_documentation, self._plugin_metadata 

1225 ) 

1226 parser_generator.register_mapped_type(type_mapping) 

1227 

1228 def _restricted_api( 

1229 self, 

1230 *, 

1231 allowed_plugins: Union[Set[str], FrozenSet[str]] = frozenset(), 

1232 ) -> None: 

1233 if self._plugin_name != "debputy" and self._plugin_name not in allowed_plugins:  # 1233 ↛ 1234: the condition on line 1233 was never true

1234 raise PluginAPIViolationError( 

1235 f"Plugin {self._plugin_name} attempted to access a debputy-only API." 

1236 " If you are the maintainer of this plugin and want access to this" 

1237 " API, please file a feature request to make this public." 

1238 " (The API is currently private as it is unstable.)" 

1239 ) 

1240 

1241 

1242class MaintscriptAccessorProviderBase(MaintscriptAccessor, ABC): 

1243 __slots__ = () 

1244 

1245 def _append_script( 

1246 self, 

1247 caller_name: str, 

1248 maintscript: Maintscript, 

1249 full_script: str, 

1250 /, 

1251 perform_substitution: bool = True, 

1252 ) -> None: 

1253 raise NotImplementedError 

1254 

1255 @classmethod 

1256 def _apply_condition_to_script( 

1257 cls, 

1258 condition: str, 

1259 run_snippet: str, 

1260 /, 

1261 indent: Optional[bool] = None, 

1262 ) -> str: 

1263 if indent is None: 

1264 # We auto-determine this based on heredocs currently 

1265 indent = "<<" not in run_snippet 

1266 

1267 if indent: 

1268 run_snippet = "".join(" " + x for x in run_snippet.splitlines(True)) 

1269 if not run_snippet.endswith("\n"): 

1270 run_snippet += "\n" 

1271 condition_line = f"if {condition}; then\n" 

1272 end_line = "fi\n" 

1273 return "".join((condition_line, run_snippet, end_line)) 

1274 
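# For orientation, the wrapping above produces a shell fragment of roughly this
# shape for a condition C and snippet S (S is indented unless it contains a
# heredoc, in which case indentation is skipped to keep the heredoc intact):
#
#     if C; then
#         S
#     fi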

1275 def on_configure( 

1276 self, 

1277 run_snippet: str, 

1278 /, 

1279 indent: Optional[bool] = None, 

1280 perform_substitution: bool = True, 

1281 skip_on_rollback: bool = False, 

1282 ) -> None: 

1283 condition = POSTINST_DEFAULT_CONDITION 

1284 if skip_on_rollback:  # 1284 ↛ 1285: the condition on line 1284 was never true

1285 condition = '[ "$1" = "configure" ]' 

1286 return self._append_script( 

1287 "on_configure", 

1288 "postinst", 

1289 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

1290 perform_substitution=perform_substitution, 

1291 ) 

1292 

1293 def on_initial_install( 

1294 self, 

1295 run_snippet: str, 

1296 /, 

1297 indent: Optional[bool] = None, 

1298 perform_substitution: bool = True, 

1299 ) -> None: 

1300 condition = '[ "$1" = "configure" -a -z "$2" ]' 

1301 return self._append_script( 

1302 "on_initial_install", 

1303 "postinst", 

1304 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

1305 perform_substitution=perform_substitution, 

1306 ) 

1307 

1308 def on_upgrade( 

1309 self, 

1310 run_snippet: str, 

1311 /, 

1312 indent: Optional[bool] = None, 

1313 perform_substitution: bool = True, 

1314 ) -> None: 

1315 condition = '[ "$1" = "configure" -a -n "$2" ]' 

1316 return self._append_script( 

1317 "on_upgrade", 

1318 "postinst", 

1319 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

1320 perform_substitution=perform_substitution, 

1321 ) 

1322 

1323 def on_upgrade_from( 

1324 self, 

1325 version: str, 

1326 run_snippet: str, 

1327 /, 

1328 indent: Optional[bool] = None, 

1329 perform_substitution: bool = True, 

1330 ) -> None: 

1331 condition = f'[ "$1" = "configure" ] && dpkg --compare-versions "$2" le-nl "{version}"'

1332 return self._append_script( 

1333 "on_upgrade_from", 

1334 "postinst", 

1335 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

1336 perform_substitution=perform_substitution, 

1337 ) 

1338 

1339 def on_before_removal( 

1340 self, 

1341 run_snippet: str, 

1342 /, 

1343 indent: Optional[bool] = None, 

1344 perform_substitution: bool = True, 

1345 ) -> None: 

1346 condition = '[ "$1" = "remove" ]' 

1347 return self._append_script( 

1348 "on_before_removal", 

1349 "prerm", 

1350 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

1351 perform_substitution=perform_substitution, 

1352 ) 

1353 

1354 def on_removed( 

1355 self, 

1356 run_snippet: str, 

1357 /, 

1358 indent: Optional[bool] = None, 

1359 perform_substitution: bool = True, 

1360 ) -> None: 

1361 condition = '[ "$1" = "remove" ]' 

1362 return self._append_script( 

1363 "on_removed", 

1364 "postrm", 

1365 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

1366 perform_substitution=perform_substitution, 

1367 ) 

1368 

1369 def on_purge( 

1370 self, 

1371 run_snippet: str, 

1372 /, 

1373 indent: Optional[bool] = None, 

1374 perform_substitution: bool = True, 

1375 ) -> None: 

1376 condition = '[ "$1" = "purge" ]' 

1377 return self._append_script( 

1378 "on_purge", 

1379 "postrm", 

1380 self._apply_condition_to_script(condition, run_snippet, indent=indent), 

1381 perform_substitution=perform_substitution, 

1382 ) 

1383 

1384 def unconditionally_in_script( 

1385 self, 

1386 maintscript: Maintscript, 

1387 run_snippet: str, 

1388 /, 

1389 perform_substitution: bool = True, 

1390 ) -> None: 

1391 if maintscript not in STD_CONTROL_SCRIPTS:  # 1391 ↛ 1392: the condition on line 1391 was never true

1392 raise ValueError( 

1393 f'Unknown script "{maintscript}". Should have been one of:' 

1394 f' {", ".join(sorted(STD_CONTROL_SCRIPTS))}' 

1395 ) 

1396 return self._append_script( 

1397 "unconditionally_in_script", 

1398 maintscript, 

1399 run_snippet, 

1400 perform_substitution=perform_substitution, 

1401 ) 

1402 

1403 

1404class MaintscriptAccessorProvider(MaintscriptAccessorProviderBase): 

1405 __slots__ = ( 

1406 "_plugin_metadata", 

1407 "_maintscript_snippets", 

1408 "_plugin_source_id", 

1409 "_package_substitution", 

1410 "_default_snippet_order", 

1411 ) 

1412 

1413 def __init__( 

1414 self, 

1415 plugin_metadata: DebputyPluginMetadata, 

1416 plugin_source_id: str, 

1417 maintscript_snippets: Dict[str, MaintscriptSnippetContainer], 

1418 package_substitution: Substitution, 

1419 *, 

1420 default_snippet_order: Optional[Literal["service"]] = None, 

1421 ): 

1422 self._plugin_metadata = plugin_metadata 

1423 self._plugin_source_id = plugin_source_id 

1424 self._maintscript_snippets = maintscript_snippets 

1425 self._package_substitution = package_substitution 

1426 self._default_snippet_order = default_snippet_order 

1427 

1428 def _append_script( 

1429 self, 

1430 caller_name: str, 

1431 maintscript: Maintscript, 

1432 full_script: str, 

1433 /, 

1434 perform_substitution: bool = True, 

1435 ) -> None: 

1436 def_source = f"{self._plugin_metadata.plugin_name} ({self._plugin_source_id})" 

1437 if perform_substitution: 

1438 full_script = self._package_substitution.substitute(full_script, def_source) 

1439 

1440 snippet = MaintscriptSnippet( 

1441 snippet=full_script, 

1442 definition_source=def_source, 

1443 snippet_order=self._default_snippet_order, 

1444 ) 

1445 self._maintscript_snippets[maintscript].append(snippet) 

1446 

1447 

1448class BinaryCtrlAccessorProviderBase(BinaryCtrlAccessor): 

1449 __slots__ = ( 

1450 "_plugin_metadata", 

1451 "_plugin_source_id", 

1452 "_package_metadata_context", 

1453 "_triggers", 

1454 "_substvars", 

1455 "_maintscript", 

1456 "_shlibs_details", 

1457 ) 

1458 

1459 def __init__( 

1460 self, 

1461 plugin_metadata: DebputyPluginMetadata, 

1462 plugin_source_id: str, 

1463 package_metadata_context: PackageProcessingContext, 

1464 triggers: Dict[Tuple[DpkgTriggerType, str], PluginProvidedTrigger], 

1465 substvars: FlushableSubstvars, 

1466 shlibs_details: Tuple[Optional[str], Optional[List[str]]], 

1467 ) -> None: 

1468 self._plugin_metadata = plugin_metadata 

1469 self._plugin_source_id = plugin_source_id 

1470 self._package_metadata_context = package_metadata_context 

1471 self._triggers = triggers 

1472 self._substvars = substvars 

1473 self._maintscript: Optional[MaintscriptAccessor] = None 

1474 self._shlibs_details = shlibs_details 

1475 

1476 def _create_maintscript_accessor(self) -> MaintscriptAccessor: 

1477 raise NotImplementedError 

1478 

1479 def dpkg_trigger(self, trigger_type: DpkgTriggerType, trigger_target: str) -> None: 

1480 """Register a declarative dpkg level trigger 

1481 

1482 The provided trigger will be added to the package's metadata (the triggers file of the control.tar). 

1483 

1484 If the trigger has already been added previously, a second call with the same trigger data will be ignored. 

1485 """ 

1486 key = (trigger_type, trigger_target) 

1487 if key in self._triggers: 1487 ↛ 1488

1488 return 

1489 self._triggers[key] = PluginProvidedTrigger( 

1490 dpkg_trigger_type=trigger_type, 

1491 dpkg_trigger_target=trigger_target, 

1492 provider=self._plugin_metadata, 

1493 provider_source_id=self._plugin_source_id, 

1494 ) 

1495 
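A small usage sketch of the de-duplication described in the docstring above, given a BinaryCtrlAccessor `ctrl` as passed to a metadata detector (the trigger target is a made-up path and "interest-noawait" is assumed to be an accepted DpkgTriggerType value):

    ctrl.dpkg_trigger("interest-noawait", "/usr/share/example/cache")
    # A second registration with the same (type, target) key is silently ignored:
    ctrl.dpkg_trigger("interest-noawait", "/usr/share/example/cache")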

1496 @property 

1497 def maintscript(self) -> MaintscriptAccessor: 

1498 maintscript = self._maintscript 

1499 if maintscript is None: 

1500 maintscript = self._create_maintscript_accessor() 

1501 self._maintscript = maintscript 

1502 return maintscript 

1503 

1504 @property 

1505 def substvars(self) -> FlushableSubstvars: 

1506 return self._substvars 

1507 

1508 def dpkg_shlibdeps(self, paths: Sequence[VirtualPath]) -> None: 

1509 binary_package = self._package_metadata_context.binary_package 

1510 with self.substvars.flush() as substvars_file: 

1511 dpkg_cmd = ["dpkg-shlibdeps", f"-T{substvars_file}"] 

1512 if binary_package.is_udeb: 

1513 dpkg_cmd.append("-tudeb") 

1514 if binary_package.is_essential: 1514 ↛ 1515

1515 dpkg_cmd.append("-dPre-Depends") 

1516 shlibs_local, shlib_dirs = self._shlibs_details 

1517 if shlibs_local is not None: 1517 ↛ 1518

1518 dpkg_cmd.append(f"-L{shlibs_local}") 

1519 if shlib_dirs: 1519 ↛ 1520

1520 dpkg_cmd.extend(f"-l{sd}" for sd in shlib_dirs) 

1521 dpkg_cmd.extend(p.fs_path for p in paths) 

1522 print_command(*dpkg_cmd) 

1523 try: 

1524 subprocess.check_call(dpkg_cmd) 

1525 except subprocess.CalledProcessError: 

1526 _error( 

1527 f"Attempting to auto-detect dependencies via dpkg-shlibdeps for {binary_package.name} failed. Please" 

1528 " review the output from dpkg-shlibdeps above to understand what went wrong." 

1529 ) 

1530 

1531 
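For reference, a rough sketch of the command list assembled above for an Essential udeb with a local shlibs file and one extra library directory (all paths are made-up examples):

    # dpkg_cmd would end up roughly as:
    dpkg_cmd = [
        "dpkg-shlibdeps", "-Tdebian/example.substvars",
        "-tudeb", "-dPre-Depends",
        "-Ldebian/shlibs.local", "-ldebian/example/usr/lib",
        "debian/example/usr/bin/example",
    ]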

1532class BinaryCtrlAccessorProvider(BinaryCtrlAccessorProviderBase): 

1533 __slots__ = ( 

1534 "_maintscript", 

1535 "_maintscript_snippets", 

1536 "_package_substitution", 

1537 ) 

1538 

1539 def __init__( 

1540 self, 

1541 plugin_metadata: DebputyPluginMetadata, 

1542 plugin_source_id: str, 

1543 package_metadata_context: PackageProcessingContext, 

1544 triggers: Dict[Tuple[DpkgTriggerType, str], PluginProvidedTrigger], 

1545 substvars: FlushableSubstvars, 

1546 maintscript_snippets: Dict[str, MaintscriptSnippetContainer], 

1547 package_substitution: Substitution, 

1548 shlibs_details: Tuple[Optional[str], Optional[List[str]]], 

1549 *, 

1550 default_snippet_order: Optional[Literal["service"]] = None, 

1551 ) -> None: 

1552 super().__init__( 

1553 plugin_metadata, 

1554 plugin_source_id, 

1555 package_metadata_context, 

1556 triggers, 

1557 substvars, 

1558 shlibs_details, 

1559 ) 

1560 self._maintscript_snippets = maintscript_snippets 

1561 self._package_substitution = package_substitution 

1562 self._maintscript = MaintscriptAccessorProvider( 

1563 plugin_metadata, 

1564 plugin_source_id, 

1565 maintscript_snippets, 

1566 package_substitution, 

1567 default_snippet_order=default_snippet_order, 

1568 ) 

1569 

1570 def _create_maintscript_accessor(self) -> MaintscriptAccessor: 

1571 return MaintscriptAccessorProvider( 

1572 self._plugin_metadata, 

1573 self._plugin_source_id, 

1574 self._maintscript_snippets, 

1575 self._package_substitution, 

1576 ) 

1577 

1578 

1579class BinaryCtrlAccessorProviderCreator: 

1580 def __init__( 

1581 self, 

1582 package_metadata_context: PackageProcessingContext, 

1583 substvars: FlushableSubstvars, 

1584 maintscript_snippets: Dict[str, MaintscriptSnippetContainer], 

1585 substitution: Substitution, 

1586 ) -> None: 

1587 self._package_metadata_context = package_metadata_context 

1588 self._substvars = substvars 

1589 self._maintscript_snippets = maintscript_snippets 

1590 self._substitution = substitution 

1591 self._triggers: Dict[Tuple[DpkgTriggerType, str], PluginProvidedTrigger] = {} 

1592 self.shlibs_details: Tuple[Optional[str], Optional[List[str]]] = None, None 

1593 

1594 def for_plugin( 

1595 self, 

1596 plugin_metadata: DebputyPluginMetadata, 

1597 plugin_source_id: str, 

1598 *, 

1599 default_snippet_order: Optional[Literal["service"]] = None, 

1600 ) -> BinaryCtrlAccessor: 

1601 return BinaryCtrlAccessorProvider( 

1602 plugin_metadata, 

1603 plugin_source_id, 

1604 self._package_metadata_context, 

1605 self._triggers, 

1606 self._substvars, 

1607 self._maintscript_snippets, 

1608 self._substitution, 

1609 self.shlibs_details, 

1610 default_snippet_order=default_snippet_order, 

1611 ) 

1612 

1613 def generated_triggers(self) -> Iterable[PluginProvidedTrigger]: 

1614 return self._triggers.values() 

1615 

1616 
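A hedged sketch of how the creator is meant to be used: one creator per binary package hands out per-plugin accessors that all share the same trigger, substvars and maintscript state (the surrounding variables are assumed to exist already; the plugin source id is made up):

    creator = BinaryCtrlAccessorProviderCreator(
        package_metadata_context,   # PackageProcessingContext for the binary package
        substvars,                  # FlushableSubstvars
        maintscript_snippets,       # Dict[str, MaintscriptSnippetContainer]
        substitution,               # Substitution for the package
    )
    ctrl = creator.for_plugin(plugin_metadata, "my-detector")
    ctrl.dpkg_trigger("activate-noawait", "ldconfig")
    # Everything registered via any per-plugin accessor is visible on the creator:
    all_triggers = list(creator.generated_triggers())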

1617def _resolve_bundled_plugin_docs_path( 

1618 plugin_name: str, 

1619 loader: Optional[PluginInitializationEntryPoint], 

1620) -> Optional[Union[Traversable, Path]]: 

1621 plugin_module = getattr(loader, "__module__") 

1622 assert plugin_module is not None 

1623 plugin_package_name = sys.modules[plugin_module].__package__ 

1624 return importlib.resources.files(plugin_package_name).joinpath( 

1625 f"{plugin_name}_docs.yaml" 

1626 ) 

1627 

1628 

1629def plugin_metadata_for_debputys_own_plugin( 

1630 loader: Optional[PluginInitializationEntryPoint] = None, 

1631) -> DebputyPluginMetadata: 

1632 if loader is None: 

1633 from debputy.plugins.debputy.debputy_plugin import ( 

1634 initialize_debputy_features, 

1635 ) 

1636 

1637 loader = initialize_debputy_features 

1638 plugin_name = "debputy" 

1639 return DebputyPluginMetadata( 

1640 plugin_name="debputy", 

1641 api_compat_version=1, 

1642 plugin_initializer=loader, 

1643 plugin_loader=None, 

1644 plugin_doc_path_resolver=lambda: _resolve_bundled_plugin_docs_path( 

1645 plugin_name, 

1646 loader, 

1647 ), 

1648 plugin_path="<bundled>", 

1649 ) 

1650 

1651 

1652def load_plugin_features( 

1653 plugin_search_dirs: Sequence[str], 

1654 substitution: Substitution, 

1655 requested_plugins_only: Optional[Sequence[str]] = None, 

1656 required_plugins: Optional[Set[str]] = None, 

1657 plugin_feature_set: Optional[PluginProvidedFeatureSet] = None, 

1658 debug_mode: bool = False, 

1659) -> PluginProvidedFeatureSet: 

1660 if plugin_feature_set is None: 

1661 plugin_feature_set = PluginProvidedFeatureSet() 

1662 plugins = [plugin_metadata_for_debputys_own_plugin()] 

1663 unloadable_plugins = set() 

1664 if required_plugins: 

1665 plugins.extend( 

1666 find_json_plugins( 

1667 plugin_search_dirs, 

1668 required_plugins, 

1669 ) 

1670 ) 

1671 if requested_plugins_only is not None: 

1672 plugins.extend( 

1673 find_json_plugins( 

1674 plugin_search_dirs, 

1675 requested_plugins_only, 

1676 ) 

1677 ) 

1678 else: 

1679 auto_loaded = _find_all_json_plugins( 

1680 plugin_search_dirs, 

1681 required_plugins if required_plugins is not None else frozenset(), 

1682 debug_mode=debug_mode, 

1683 ) 

1684 for plugin_metadata in auto_loaded: 

1685 plugins.append(plugin_metadata) 

1686 unloadable_plugins.add(plugin_metadata.plugin_name) 

1687 

1688 for plugin_metadata in plugins: 

1689 api = DebputyPluginInitializerProvider( 

1690 plugin_metadata, plugin_feature_set, substitution 

1691 ) 

1692 try: 

1693 api.load_plugin() 

1694 except PluginBaseError as e: 

1695 if plugin_metadata.plugin_name not in unloadable_plugins: 

1696 raise 

1697 if debug_mode: 

1698 _warn( 

1699 f"The optional plugin {plugin_metadata.plugin_name} failed during load. Re-raising due" 

1700 f" to --debug/-d or DEBPUTY_DEBUG=1" 

1701 ) 

1702 raise 

1703 try: 

1704 api.unload_plugin() 

1705 except Exception: 

1706 _warn( 

1707 f"Failed to load optional {plugin_metadata.plugin_name} and an error was raised when trying to" 

1708 " clean up after the half-initialized plugin. Re-raising load error as the partially loaded" 

1709 " module might have tainted the feature set." 

1710 ) 

1711 raise e from None 

1712 else: 

1713 _warn( 

1714 f"The optional plugin {plugin_metadata.plugin_name} failed during load. The plugin was" 

1715 f" deactivated. Use debug mode (--debug/DEBPUTY_DEBUG=1) to show the stacktrace" 

1716 f" (the warning will become an error)" 

1717 ) 

1718 

1719 return plugin_feature_set 

1720 

1721 
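A minimal sketch of a typical call (directory and plugin names are illustrative; `substitution` is assumed to be an existing Substitution instance):

    feature_set = load_plugin_features(
        ["/usr/share/debputy", "debian"],      # plugin search dirs (illustrative)
        substitution,
        requested_plugins_only=None,           # auto-discover all JSON plugins
        required_plugins={"example-plugin"},   # failures for these are fatal
        debug_mode=False,
    )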

1722def find_json_plugin( 

1723 search_dirs: Sequence[str], 

1724 requested_plugin: str, 

1725) -> DebputyPluginMetadata: 

1726 r = list(find_json_plugins(search_dirs, [requested_plugin])) 

1727 assert len(r) == 1 

1728 return r[0] 

1729 

1730 

1731def find_related_implementation_files_for_plugin( 

1732 plugin_metadata: DebputyPluginMetadata, 

1733) -> List[str]: 

1734 if plugin_metadata.is_bundled: 

1735 plugin_name = plugin_metadata.plugin_name 

1736 _error( 

1737 f"Cannot run find related files for {plugin_name}: The plugin seems to be bundled" 

1738 " or loaded via a mechanism that does not support detecting its tests." 

1739 ) 

1740 

1741 if plugin_metadata.is_from_python_path: 

1742 plugin_name = plugin_metadata.plugin_name 

1743 # Maybe they could be, but that is for another day. 

1744 _error( 

1745 f"Cannot run find related files for {plugin_name}: The plugin is installed into python path" 

1746 " and these are not supported." 

1747 ) 

1748 files = [] 

1749 module_name, module_file = _find_plugin_implementation_file( 

1750 plugin_metadata.plugin_name, 

1751 plugin_metadata.plugin_path, 

1752 ) 

1753 if os.path.isfile(module_file): 

1754 files.append(module_file) 

1755 else: 

1756 if not plugin_metadata.is_loaded: 

1757 plugin_metadata.load_plugin() 

1758 if module_name in sys.modules: 

1759 _error( 

1760 f'The plugin {plugin_metadata.plugin_name} uses the "module" key in its' 

1761 f" JSON metadata file ({plugin_metadata.plugin_path}) and cannot be " 

1762 f" installed via this method. The related Python would not be installed" 

1763 f" (which would result in a plugin that would fail to load)" 

1764 ) 

1765 

1766 return files 

1767 

1768 

1769def find_tests_for_plugin( 

1770 plugin_metadata: DebputyPluginMetadata, 

1771) -> List[str]: 

1772 plugin_name = plugin_metadata.plugin_name 

1773 plugin_path = plugin_metadata.plugin_path 

1774 

1775 if plugin_metadata.is_bundled: 

1776 _error( 

1777 f"Cannot run tests for {plugin_name}: The plugin seems to be bundled or loaded via a" 

1778 " mechanism that does not support detecting its tests." 

1779 ) 

1780 

1781 if plugin_metadata.is_from_python_path: 

1782 plugin_name = plugin_metadata.plugin_name 

1783 # Maybe they could be, but that is for another day. 

1784 _error( 

1785 f"Cannot run find related files for {plugin_name}: The plugin is installed into python path" 

1786 " and these are not supported." 

1787 ) 

1788 

1789 plugin_dir = os.path.dirname(plugin_path) 

1790 test_basename_prefix = plugin_metadata.plugin_name.replace("-", "_") 

1791 tests = [] 

1792 with os.scandir(plugin_dir) as dir_iter: 

1793 for p in dir_iter: 

1794 if ( 

1795 p.is_file() 

1796 and p.name.startswith(test_basename_prefix) 

1797 and PLUGIN_TEST_SUFFIX.search(p.name) 

1798 ): 

1799 tests.append(p.path) 

1800 return tests 

1801 

1802 

1803def find_json_plugins( 

1804 search_dirs: Sequence[str], 

1805 requested_plugins: Iterable[str], 

1806) -> Iterable[DebputyPluginMetadata]: 

1807 for plugin_name_or_path in requested_plugins: 1807 ↛ exit

1808 if "/" in plugin_name_or_path: 1808 ↛ 1809line 1808 didn't jump to line 1809 because the condition on line 1808 was never true

1809 if not os.path.isfile(plugin_name_or_path): 

1810 raise PluginNotFoundError( 

1811 f"Unable to load the plugin {plugin_name_or_path}: The path is not a file." 

1812 ' (Because the plugin name contains "/", it is assumed to be a path and search path' 

1813 " is not used." 

1814 ) 

1815 yield parse_json_plugin_desc(plugin_name_or_path) 

1816 return 

1817 for search_dir in search_dirs: 1817 ↛ 1826

1818 path = os.path.join( 

1819 search_dir, "debputy", "plugins", f"{plugin_name_or_path}.json" 

1820 ) 

1821 if not os.path.isfile(path): 1821 ↛ 1822

1822 continue 

1823 yield parse_json_plugin_desc(path) 

1824 return 

1825 

1826 path_root = PLUGIN_PYTHON_RES_PATH 

1827 pp_path = path_root.joinpath(f"{plugin_name_or_path}.json") 

1828 if pp_path.is_file(): 

1829 with pp_path.open() as fd: 

1830 yield parse_json_plugin_desc( 

1831 f"PYTHONPATH:debputy/plugins/{pp_path.name}", 

1832 fd=fd, 

1833 is_from_python_path=True, 

1834 ) 

1835 return 

1836 

1837 search_dir_str = ":".join(search_dirs) 

1838 raise PluginNotFoundError( 

1839 f"Unable to load the plugin {plugin_name_or_path}: Could not find {plugin_name_or_path}.json in the" 

1840 f" debputy/plugins subdir of any of the search dirs ({search_dir_str})" 

1841 ) 

1842 

1843 
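To make the lookup order above concrete, a request for a plugin name without a "/" resolves roughly as follows (paths are illustrative):

    # requested_plugins = ["example"], search_dirs = ["/usr/share/debputy", "debian"]
    #   1. /usr/share/debputy/debputy/plugins/example.json
    #   2. debian/debputy/plugins/example.json
    #   3. PYTHONPATH resources: debputy/plugins/example.json
    # If none of these exist, PluginNotFoundError is raised listing the search dirs.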

1844def _find_all_json_plugins( 

1845 search_dirs: Sequence[str], 

1846 required_plugins: AbstractSet[str], 

1847 debug_mode: bool = False, 

1848) -> Iterable[DebputyPluginMetadata]: 

1849 seen = set(required_plugins) 

1850 error_seen = False 

1851 for search_dir in search_dirs: 

1852 try: 

1853 dir_fd = os.scandir(os.path.join(search_dir, "debputy", "plugins")) 

1854 except FileNotFoundError: 

1855 continue 

1856 with dir_fd: 

1857 for entry in dir_fd: 

1858 if ( 

1859 not entry.is_file(follow_symlinks=True) 

1860 or not entry.name.endswith(".json") 

1861 or entry.name in seen 

1862 ): 

1863 continue 

1864 seen.add(entry.name) 

1865 try: 

1866 plugin_metadata = parse_json_plugin_desc(entry.path) 

1867 except PluginBaseError as e: 

1868 if debug_mode: 

1869 raise 

1870 if not error_seen: 

1871 error_seen = True 

1872 _warn( 

1873 f"Failed to load the plugin in {entry.path} due to the following error: {e.message}" 

1874 ) 

1875 else: 

1876 _warn( 

1877 f"Failed to load plugin in {entry.path} due to errors (not shown)." 

1878 ) 

1879 else: 

1880 yield plugin_metadata 

1881 

1882 for pp_entry in PLUGIN_PYTHON_RES_PATH.iterdir(): 

1883 if ( 

1884 not pp_entry.name.endswith(".json") 

1885 or not pp_entry.is_file() 

1886 or pp_entry.name in seen 

1887 ): 

1888 continue 

1889 seen.add(pp_entry.name) 

1890 with pp_entry.open() as fd: 

1891 yield parse_json_plugin_desc( 

1892 f"PYTHONPATH:debputy/plugins/{pp_entry.name}", 

1893 fd=fd, 

1894 is_from_python_path=True, 

1895 ) 

1896 

1897 

1898def _find_plugin_implementation_file( 

1899 plugin_name: str, 

1900 json_file_path: str, 

1901) -> Tuple[str, str]: 

1902 guessed_module_basename = plugin_name.replace("-", "_") 

1903 module_name = f"debputy.plugins.{guessed_module_basename}" 

1904 module_fs_path = os.path.join( 

1905 os.path.dirname(json_file_path), f"{guessed_module_basename}.py" 

1906 ) 

1907 return module_name, module_fs_path 

1908 

1909 
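For example (illustrative path), a plugin named "example-plugin" described by debian/debputy/plugins/example-plugin.json would be mapped to:

    # module_name    -> "debputy.plugins.example_plugin"
    # module_fs_path -> "debian/debputy/plugins/example_plugin.py"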

1910def _resolve_module_initializer( 

1911 plugin_name: str, 

1912 plugin_initializer_name: str, 

1913 module_name: Optional[str], 

1914 json_file_path: str, 

1915) -> PluginInitializationEntryPoint: 

1916 module = None 

1917 module_fs_path = None 

1918 if module_name is None: 1918 ↛ 1946

1919 module_name, module_fs_path = _find_plugin_implementation_file( 

1920 plugin_name, json_file_path 

1921 ) 

1922 if os.path.isfile(module_fs_path): 1922 ↛ 1946

1923 spec = importlib.util.spec_from_file_location(module_name, module_fs_path) 

1924 if spec is None: 1924 ↛ 1925

1925 raise PluginInitializationError( 

1926 f"Failed to load {plugin_name} (path: {module_fs_path})." 

1927 " The spec_from_file_location function returned None." 

1928 ) 

1929 mod = importlib.util.module_from_spec(spec) 

1930 loader = spec.loader 

1931 if loader is None: 1931 ↛ 1932

1932 raise PluginInitializationError( 

1933 f"Failed to load {plugin_name} (path: {module_fs_path})." 

1934 " Python could not find a suitable loader (spec.loader was None)" 

1935 ) 

1936 sys.modules[module_name] = mod 

1937 try: 

1938 run_in_context_of_plugin(plugin_name, loader.exec_module, mod) 

1939 except (Exception, GeneratorExit) as e: 

1940 raise PluginInitializationError( 

1941 f"Failed to load {plugin_name} (path: {module_fs_path})." 

1942 " The module threw an exception while being loaded." 

1943 ) from e 

1944 module = mod 

1945 

1946 if module is None: 1946 ↛ 1947

1947 try: 

1948 module = run_in_context_of_plugin( 

1949 plugin_name, importlib.import_module, module_name 

1950 ) 

1951 except ModuleNotFoundError as e: 

1952 if module_fs_path is None: 

1953 raise PluginMetadataError( 

1954 f'The plugin defined in "{json_file_path}" wanted to load the module "{module_name}", but' 

1955 " this module is not available in the python search path" 

1956 ) from e 

1957 raise PluginInitializationError( 

1958 f"Failed to load {plugin_name}. Tried loading it from" 

1959 f' "{module_fs_path}" (which did not exist) and PYTHONPATH as' 

1960 f" {module_name} (where it was not found either). Please ensure" 

1961 " the module code is installed in the correct spot or provide an" 

1962 f' explicit "module" definition in {json_file_path}.' 

1963 ) from e 

1964 

1965 plugin_initializer = run_in_context_of_plugin_wrap_errors( 

1966 plugin_name, 

1967 getattr, 

1968 module, 

1969 plugin_initializer_name, 

1970 None, 

1971 ) 

1972 

1973 if plugin_initializer is None: 1973 ↛ 1974

1974 raise PluginMetadataError( 

1975 f'The plugin defined in {json_file_path} claimed that module "{module_name}" would have an' 

1976 f' attribute called "{plugin_initializer_name}" to initialize the plugin. However, that attribute' 

1977 " does not exist or cannot be resolved. Please correct the plugin metadata or initializer name" 

1978 " in the Python module." 

1979 ) 

1980 if isinstance(plugin_initializer, DebputyPluginDefinition): 

1981 return plugin_initializer.initialize 

1982 if not callable(plugin_initializer): 1982 ↛ 1983

1983 raise PluginMetadataError( 

1984 f'The plugin defined in {json_file_path} claimed that module "{module_name}" would have an' 

1985 f' attribute called "{plugin_initializer_name}" for initializing the plugin. While that' 

1986 " attribute exists, it is neither a `DebputyPluginDefinition`" 

1987 " (`plugin_definition = define_debputy_plugin()`) nor is it `callable`" 

1988 " (`def initialize(api: DebputyPluginInitializer) -> None:`)." 

1989 ) 

1990 return cast("PluginInitializationEntryPoint", plugin_initializer) 

1991 

1992 

1993def _json_plugin_loader( 

1994 plugin_name: str, 

1995 plugin_json_metadata: PluginJsonMetadata, 

1996 json_file_path: str, 

1997 attribute_path: AttributePath, 

1998) -> Callable[["DebputyPluginInitializer"], None]: 

1999 api_compat = plugin_json_metadata["api_compat_version"] 

2000 module_name = plugin_json_metadata.get("module") 

2001 plugin_initializer_name = plugin_json_metadata.get("plugin_initializer") 

2002 packager_provided_files_raw = plugin_json_metadata.get( 

2003 "packager_provided_files", [] 

2004 ) 

2005 manifest_variables_raw = plugin_json_metadata.get("manifest_variables") 

2006 known_packaging_files_raw = plugin_json_metadata.get("known_packaging_files") 

2007 if api_compat != 1: 2007 ↛ 2008

2008 raise PluginMetadataError( 

2009 f'The plugin defined in "{json_file_path}" requires API compat level {api_compat}, but this' 

2010 f" version of debputy only supports API compat version of 1" 

2011 ) 

2012 if plugin_initializer_name is not None and "." in plugin_initializer_name: 2012 ↛ 2013

2013 p = attribute_path["plugin_initializer"] 

2014 raise PluginMetadataError( 

2015 f'The "{p}" must not contain ".". Problematic file is "{json_file_path}".' 

2016 ) 

2017 

2018 plugin_initializers = [] 

2019 

2020 if plugin_initializer_name is not None: 

2021 plugin_initializer = _resolve_module_initializer( 

2022 plugin_name, 

2023 plugin_initializer_name, 

2024 module_name, 

2025 json_file_path, 

2026 ) 

2027 plugin_initializers.append(plugin_initializer) 

2028 

2029 if known_packaging_files_raw: 

2030 kpf_root_path = attribute_path["known_packaging_files"] 

2031 known_packaging_files = [] 

2032 for k, v in enumerate(known_packaging_files_raw): 

2033 kpf_path = kpf_root_path[k] 

2034 p = v.get("path") 

2035 if isinstance(p, str): 2035 ↛ 2037

2036 kpf_path.path_hint = p 

2037 if plugin_name.startswith("debputy-") and isinstance(v, dict): 2037 ↛ 2049

2038 docs = v.get("documentation-uris") 

2039 if docs is not None and isinstance(docs, list): 

2040 docs = [ 

2041 ( 

2042 d.replace("@DEBPUTY_DOC_ROOT_DIR@", DEBPUTY_DOC_ROOT_DIR) 

2043 if isinstance(d, str) 

2044 else d 

2045 ) 

2046 for d in docs 

2047 ] 

2048 v["documentation-uris"] = docs 

2049 known_packaging_file: KnownPackagingFileInfo = ( 

2050 PLUGIN_KNOWN_PACKAGING_FILES_PARSER.parse_input( 

2051 v, 

2052 kpf_path, 

2053 ) 

2054 ) 

2055 known_packaging_files.append((kpf_path, known_packaging_file)) 

2056 

2057 def _initialize_json_provided_known_packaging_files( 

2058 api: DebputyPluginInitializerProvider, 

2059 ) -> None: 

2060 for p, details in known_packaging_files: 

2061 try: 

2062 api.known_packaging_files(details) 

2063 except ValueError as ex: 

2064 raise PluginMetadataError( 

2065 f"Error while processing {p.path} defined in {json_file_path}: {ex.args[0]}" 

2066 ) 

2067 

2068 plugin_initializers.append(_initialize_json_provided_known_packaging_files) 

2069 

2070 if manifest_variables_raw: 

2071 manifest_var_path = attribute_path["manifest_variables"] 

2072 manifest_variables = [ 

2073 PLUGIN_MANIFEST_VARS_PARSER.parse_input(p, manifest_var_path[i]) 

2074 for i, p in enumerate(manifest_variables_raw) 

2075 ] 

2076 

2077 def _initialize_json_provided_manifest_vars( 

2078 api: DebputyPluginInitializer, 

2079 ) -> None: 

2080 for idx, manifest_variable in enumerate(manifest_variables): 

2081 name = manifest_variable["name"] 

2082 value = manifest_variable["value"] 

2083 doc = manifest_variable.get("reference_documentation") 

2084 try: 

2085 api.manifest_variable( 

2086 name, value, variable_reference_documentation=doc 

2087 ) 

2088 except ValueError as ex: 

2089 var_path = manifest_var_path[idx] 

2090 raise PluginMetadataError( 

2091 f"Error while processing {var_path.path} defined in {json_file_path}: {ex.args[0]}" 

2092 ) 

2093 

2094 plugin_initializers.append(_initialize_json_provided_manifest_vars) 

2095 

2096 if packager_provided_files_raw: 

2097 ppf_path = attribute_path["packager_provided_files"] 

2098 ppfs = [ 

2099 PLUGIN_PPF_PARSER.parse_input(p, ppf_path[i]) 

2100 for i, p in enumerate(packager_provided_files_raw) 

2101 ] 

2102 

2103 def _initialize_json_provided_ppfs(api: DebputyPluginInitializer) -> None: 

2104 ppf: PackagerProvidedFileJsonDescription 

2105 for idx, ppf in enumerate(ppfs): 

2106 c = dict(ppf) 

2107 stem = ppf["stem"] 

2108 installed_path = ppf["installed_path"] 

2109 default_mode = ppf.get("default_mode") 

2110 ref_doc_dict = ppf.get("reference_documentation") 

2111 if default_mode is not None: 2111 ↛ 2114

2112 c["default_mode"] = default_mode.octal_mode 

2113 

2114 if ref_doc_dict is not None: 2114 ↛ 2119

2115 ref_doc = packager_provided_file_reference_documentation( 

2116 **ref_doc_dict 

2117 ) 

2118 else: 

2119 ref_doc = None 

2120 

2121 for k in [ 

2122 "stem", 

2123 "installed_path", 

2124 "reference_documentation", 

2125 ]: 

2126 try: 

2127 del c[k] 

2128 except KeyError: 

2129 pass 

2130 

2131 try: 

2132 api.packager_provided_file(stem, installed_path, reference_documentation=ref_doc, **c) # type: ignore 

2133 except ValueError as ex: 

2134 p_path = ppf_path[idx] 

2135 raise PluginMetadataError( 

2136 f"Error while processing {p_path.path} defined in {json_file_path}: {ex.args[0]}" 

2137 ) 

2138 

2139 plugin_initializers.append(_initialize_json_provided_ppfs) 

2140 

2141 if not plugin_initializers: 2141 ↛ 2142

2142 raise PluginMetadataError( 

2143 f"The plugin defined in {json_file_path} does not seem to provide features," 

2144 f" such as module + plugin-initializer or packager-provided-files." 

2145 ) 

2146 

2147 if len(plugin_initializers) == 1: 

2148 return plugin_initializers[0] 

2149 

2150 def _chain_loader(api: DebputyPluginInitializer) -> None: 

2151 for initializer in plugin_initializers: 

2152 initializer(api) 

2153 

2154 return _chain_loader 

2155 

2156 
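As a rough sketch of the parsed metadata the loader above consumes (all values are placeholders, not a real plugin; the keys are shown in the attribute form used above, while the on-disk JSON uses whatever spelling PLUGIN_METADATA_PARSER accepts):

    plugin_json_metadata = {
        "api_compat_version": 1,                     # anything other than 1 is rejected above
        "module": "debputy_example_plugin",          # optional; guessed from the JSON path if omitted
        "plugin_initializer": "initialize_example",  # must not contain "."
        "packager_provided_files": [],               # parsed via PLUGIN_PPF_PARSER when non-empty
        "manifest_variables": [],                    # parsed via PLUGIN_MANIFEST_VARS_PARSER when non-empty
        "known_packaging_files": [],                 # parsed via PLUGIN_KNOWN_PACKAGING_FILES_PARSER when non-empty
    }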

2157@contextlib.contextmanager 

2158def _open(path: str, fd: Optional[IO[bytes]] = None) -> Iterator[IO[bytes]]: 

2159 if fd is not None: 

2160 yield fd 

2161 else: 

2162 with open(path, "rb") as fd: 

2163 yield fd 

2164 

2165 

2166def _resolve_json_plugin_docs_path( 

2167 plugin_name: str, 

2168 plugin_path: str, 

2169) -> Optional[Union[Traversable, Path]]: 

2170 plugin_dir = os.path.dirname(plugin_path) 

2171 return Path(os.path.join(plugin_dir, plugin_name + "_docs.yaml")) 

2172 

2173 

2174def parse_json_plugin_desc( 

2175 path: str, 

2176 *, 

2177 fd: Optional[IO[Union[bytes, str]]] = None, 

2178 is_from_python_path: bool = False, 

2179) -> DebputyPluginMetadata: 

2180 with _open(path, fd=fd) as rfd: 

2181 try: 

2182 raw = json.load(rfd) 

2183 except JSONDecodeError as e: 

2184 raise PluginMetadataError( 

2185 f'The plugin defined in "{path}" could not be parsed as valid JSON: {e.args[0]}' 

2186 ) from e 

2187 plugin_name = os.path.basename(path) 

2188 if plugin_name.endswith(".json"): 

2189 plugin_name = plugin_name[:-5] 

2190 elif plugin_name.endswith(".json.in"): 

2191 plugin_name = plugin_name[:-8] 

2192 

2193 if plugin_name == "debputy": 2193 ↛ 2195line 2193 didn't jump to line 2195 because the condition on line 2193 was never true

2194 # Provide a better error message than "The plugin has already loaded!?" 

2195 raise PluginMetadataError( 

2196 f'The plugin named {plugin_name} must be bundled with `debputy`. Please rename "{path}" so it does not' 

2197 f" clash with the bundled plugin of same name." 

2198 ) 

2199 

2200 attribute_path = AttributePath.root_path(raw) 

2201 

2202 try: 

2203 plugin_json_metadata = PLUGIN_METADATA_PARSER.parse_input( 

2204 raw, 

2205 attribute_path, 

2206 ) 

2207 except ManifestParseException as e: 

2208 raise PluginMetadataError( 

2209 f'The plugin defined in "{path}" was valid JSON but could not be parsed: {e.message}' 

2210 ) from e 

2211 api_compat = plugin_json_metadata["api_compat_version"] 

2212 

2213 return DebputyPluginMetadata( 

2214 plugin_name=plugin_name, 

2215 plugin_loader=lambda: _json_plugin_loader( 

2216 plugin_name, 

2217 plugin_json_metadata, 

2218 path, 

2219 attribute_path, 

2220 ), 

2221 api_compat_version=api_compat, 

2222 plugin_doc_path_resolver=lambda: _resolve_json_plugin_docs_path( 

2223 plugin_name, path 

2224 ), 

2225 plugin_initializer=None, 

2226 plugin_path=path, 

2227 is_from_python_path=is_from_python_path, 

2228 ) 

2229 

2230 

2231@dataclasses.dataclass(slots=True, frozen=True) 

2232class ServiceDefinitionImpl(ServiceDefinition[DSD]): 

2233 name: str 

2234 names: Sequence[str] 

2235 path: VirtualPath 

2236 type_of_service: str 

2237 service_scope: str 

2238 auto_enable_on_install: bool 

2239 auto_start_on_install: bool 

2240 on_upgrade: ServiceUpgradeRule 

2241 definition_source: str 

2242 is_plugin_provided_definition: bool 

2243 service_context: Optional[DSD] 

2244 

2245 def replace(self, **changes: Any) -> "ServiceDefinitionImpl[DSD]": 

2246 return dataclasses.replace(self, **changes) 

2247 

2248 

2249class ServiceRegistryImpl(ServiceRegistry[DSD]): 

2250 __slots__ = ("_service_manager_details", "_service_definitions", "_seen_services") 

2251 

2252 def __init__(self, service_manager_details: ServiceManagerDetails) -> None: 

2253 self._service_manager_details = service_manager_details 

2254 self._service_definitions: List[ServiceDefinition[DSD]] = [] 

2255 self._seen_services = set() 

2256 

2257 @property 

2258 def detected_services(self) -> Sequence[ServiceDefinition[DSD]]: 

2259 return self._service_definitions 

2260 

2261 def register_service( 

2262 self, 

2263 path: VirtualPath, 

2264 name: Union[str, List[str]], 

2265 *, 

2266 type_of_service: str = "service", # "timer", etc. 

2267 service_scope: str = "system", 

2268 enable_by_default: bool = True, 

2269 start_by_default: bool = True, 

2270 default_upgrade_rule: ServiceUpgradeRule = "restart", 

2271 service_context: Optional[DSD] = None, 

2272 ) -> None: 

2273 names = name if isinstance(name, list) else [name] 

2274 if len(names) < 1: 

2275 raise ValueError( 

2276 f"The service must have at least one name - {path.absolute} did not have any" 

2277 ) 

2278 for n in names: 

2279 key = (n, type_of_service, service_scope) 

2280 if key in self._seen_services: 

2281 raise PluginAPIViolationError( 

2282 f"The service manager (from {self._service_manager_details.plugin_metadata.plugin_name}) used" 

2283 f" the service name {n} (type: {type_of_service}, scope: {service_scope}) twice. This is not" 

2284 " allowed by the debputy plugin API." 

2285 ) 

2286 # TODO: We cannot create a service definition immediately once the manifest is involved 

2287 self._service_definitions.append( 

2288 ServiceDefinitionImpl( 

2289 names[0], 

2290 names, 

2291 path, 

2292 type_of_service, 

2293 service_scope, 

2294 enable_by_default, 

2295 start_by_default, 

2296 default_upgrade_rule, 

2297 f"Auto-detected by plugin {self._service_manager_details.plugin_metadata.plugin_name}", 

2298 True, 

2299 service_context, 

2300 ) 

2301 )