Coverage for src/debputy/plugin/api/impl.py: 60%
910 statements
« prev ^ index » next coverage.py v7.8.2, created at 2026-01-04 10:15 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2026-01-04 10:15 +0000
1import contextlib
2import dataclasses
3import functools
4import importlib
5import importlib.resources
6import importlib.util
7import inspect
8import itertools
9import json
10import os
11import re
12import subprocess
13import sys
14from abc import ABC
15from collections.abc import Callable, Iterable, Sequence, Iterator, Mapping, Container
16from importlib.resources.abc import Traversable
17from io import IOBase
18from json import JSONDecodeError
19from pathlib import Path
20from types import NoneType
21from typing import (
22 IO,
23 AbstractSet,
24 cast,
25 Any,
26 Literal,
27 TYPE_CHECKING,
28 is_typeddict,
29 AnyStr,
30 overload,
31)
33import debputy
34from debputy import DEBPUTY_DOC_ROOT_DIR
35from debputy.exceptions import (
36 DebputySubstitutionError,
37 PluginConflictError,
38 PluginMetadataError,
39 PluginBaseError,
40 PluginInitializationError,
41 PluginAPIViolationError,
42 PluginNotFoundError,
43 PluginIncorrectRegistrationError,
44)
45from debputy.maintscript_snippet import (
46 STD_CONTROL_SCRIPTS,
47 MaintscriptSnippetContainer,
48 MaintscriptSnippet,
49)
50from debputy.manifest_parser.exceptions import ManifestParseException
51from debputy.manifest_parser.parser_data import ParserContextData
52from debputy.manifest_parser.tagging_types import TypeMapping
53from debputy.manifest_parser.util import AttributePath
54from debputy.manifest_parser.util import resolve_package_type_selectors
55from debputy.plugin.api.doc_parsing import (
56 DEBPUTY_DOC_REFERENCE_DATA_PARSER,
57 parser_type_name,
58 DebputyParsedDoc,
59)
60from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
61from debputy.plugin.api.impl_types import (
62 DebputyPluginMetadata,
63 PackagerProvidedFileClassSpec,
64 MetadataOrMaintscriptDetector,
65 PluginProvidedTrigger,
66 TTP,
67 DIPHandler,
68 PF,
69 SF,
70 DIPKWHandler,
71 PluginProvidedManifestVariable,
72 PluginProvidedPackageProcessor,
73 PluginProvidedDiscardRule,
74 AutomaticDiscardRuleExample,
75 PPFFormatParam,
76 ServiceManagerDetails,
77 KnownPackagingFileInfo,
78 PluginProvidedKnownPackagingFile,
79 DHCompatibilityBasedRule,
80 PluginProvidedTypeMapping,
81 PluginProvidedBuildSystemAutoDetection,
82 BSR,
83 TP,
84)
85from debputy.plugin.api.plugin_parser import (
86 PLUGIN_METADATA_PARSER,
87 PluginJsonMetadata,
88 PLUGIN_PPF_PARSER,
89 PackagerProvidedFileJsonDescription,
90 PLUGIN_MANIFEST_VARS_PARSER,
91 PLUGIN_KNOWN_PACKAGING_FILES_PARSER,
92)
93from debputy.plugin.api.spec import (
94 MaintscriptAccessor,
95 Maintscript,
96 DpkgTriggerType,
97 BinaryCtrlAccessor,
98 PackageProcessingContext,
99 MetadataAutoDetector,
100 PluginInitializationEntryPoint,
101 DebputyPluginInitializer,
102 PackageTypeSelector,
103 FlushableSubstvars,
104 ParserDocumentation,
105 PackageProcessor,
106 VirtualPath,
107 ServiceIntegrator,
108 ServiceDetector,
109 ServiceRegistry,
110 ServiceDefinition,
111 DSD,
112 ServiceUpgradeRule,
113 PackagerProvidedFileReferenceDocumentation,
114 packager_provided_file_reference_documentation,
115 TypeMappingDocumentation,
116 DebputyIntegrationMode,
117 _DEBPUTY_DISPATCH_METADATA_ATTR_NAME,
118 BuildSystemManifestRuleMetadata,
119 INTEGRATION_MODE_FULL,
120 only_integrations,
121 DebputyPluginDefinition,
122)
123from debputy.plugin.api.std_docs import _STD_ATTR_DOCS
124from debputy.plugin.plugin_state import (
125 run_in_context_of_plugin,
126 run_in_context_of_plugin_wrap_errors,
127 wrap_plugin_code,
128 register_manifest_type_value_in_context,
129)
130from debputy.plugins.debputy.to_be_api_types import (
131 BuildRuleParsedFormat,
132 BSPF,
133 debputy_build_system,
134)
135from debputy.substitution import (
136 Substitution,
137 VariableNameState,
138 SUBST_VAR_RE,
139 VariableContext,
140)
141from debputy.util import (
142 _normalize_path,
143 POSTINST_DEFAULT_CONDITION,
144 _error,
145 print_command,
146 _warn,
147 _debug_log,
148)
149from debputy.yaml import MANIFEST_YAML
151if TYPE_CHECKING:
152 from debputy.highlevel_manifest import HighLevelManifest
# Matches a file name that ends in a "_t", "_test" or "_check" marker directly
# before the ".py" extension. The marker may be followed by "_<suffix>", where
# the suffix (lowercase letters, digits and underscores) is captured as group 1.
# NOTE(review): presumably used to recognise plugin test modules - confirm at the call sites.
PLUGIN_TEST_SUFFIX = re.compile(r"_(?:t|test|check)(?:_([a-z0-9_]+))?[.]py$")
# Traversable root of the `debputy.plugins` package as resolved by importlib.resources.
PLUGIN_PYTHON_RES_PATH = importlib.resources.files(debputy.plugins.__name__)
def _validate_known_packaging_file_dh_compat_rules(
    dh_compat_rules: "list[DHCompatibilityBasedRule] | None",
) -> None:
    """Sanity-check the dh compat rules declared for a known packaging file.

    The rules are checked in list order. Each rule must:

     * contain at least one key besides the selector keys (otherwise it has no effect),
     * have a selector unless it is the last rule in the list,
     * use a compat level of at least 1,
     * use a strictly lower compat level than any rule with a compat level before it,
     * use a normalized install pattern (if it has one).

    :param dh_compat_rules: The rules to validate. `None` and the empty list are accepted
      without further checks.
    :raises ValueError: If any rule violates one of the requirements above.
    """
    if not dh_compat_rules:
        return

    # Keys that only select *when* a rule applies. The key actually read below must
    # be part of this set; "after_debhelper_version" is retained so rules relying on
    # the previous behaviour of this check are treated the same way as before.
    selector_keys = {
        "starting_with_debhelper_version",
        "after_debhelper_version",
        "starting_with_compat_level",
    }
    last_index = len(dh_compat_rules) - 1
    max_compat = None
    dh_compat_rule: DHCompatibilityBasedRule
    for idx, dh_compat_rule in enumerate(dh_compat_rules):
        dh_version = dh_compat_rule.get("starting_with_debhelper_version")
        compat = dh_compat_rule.get("starting_with_compat_level")

        if not (dh_compat_rule.keys() - selector_keys):
            raise ValueError(
                f"The dh compat-rule at index {idx} does not affect anything / not have any rules!? So why have it?"
            )
        if dh_version is None and compat is None and idx < last_index:
            raise ValueError(
                f"The dh compat-rule at index {idx} is not the last and is missing either"
                " starting-with-debhelper-version or starting-with-compat-level"
            )
        # Compat levels start at 1, so 0 is just as invalid as a negative value.
        if compat is not None and compat < 1:
            raise ValueError(
                f"There is no compat below 1 but dh compat-rule at {idx} wants to declare some rule"
                f" for something that appeared when migrating from {compat} to {compat + 1}."
            )

        # Rules with a compat level must appear in strictly descending compat order.
        if max_compat is None:
            max_compat = compat
        elif compat is not None:
            if compat >= max_compat:
                raise ValueError(
                    f"The dh compat-rule at {idx} should be moved earlier than the entry for compat {max_compat}."
                )
            max_compat = compat

        install_pattern = dh_compat_rule.get("install_pattern")
        if install_pattern is not None:
            normalized_pattern = _normalize_path(install_pattern, with_prefix=False)
            if normalized_pattern != install_pattern:
                raise ValueError(
                    f"The install-pattern in dh compat-rule at {idx} must be normalized as"
                    f' "{normalized_pattern}".'
                )
208class DebputyPluginInitializerProvider(DebputyPluginInitializer):
209 __slots__ = (
210 "_plugin_metadata",
211 "_feature_set",
212 "_plugin_detector_ids",
213 "_substitution",
214 "_unloaders",
215 "_is_doc_cache_resolved",
216 "_doc_cache",
217 "_registered_manifest_types",
218 "_load_started",
219 )
def __init__(
    self,
    plugin_metadata: DebputyPluginMetadata,
    feature_set: PluginProvidedFeatureSet,
    substitution: Substitution,
) -> None:
    """Create the initializer for a single plugin.

    :param plugin_metadata: Metadata of the plugin this initializer acts on behalf of.
    :param feature_set: The feature set that registrations are stored in.
    :param substitution: Substitution instance used for variable related checks.
    """
    # Collaborators provided by the caller
    self._plugin_metadata: DebputyPluginMetadata = plugin_metadata
    self._feature_set = feature_set
    self._substitution = substitution

    # Registration bookkeeping; `_unloaders` holds the undo callbacks of the registrations
    self._load_started = False
    self._unloaders: list[Callable[[], None]] = []
    self._plugin_detector_ids: set[str] = set()
    self._registered_manifest_types: dict[type[Any], DebputyPluginMetadata] = {}

    # Lazily resolved plugin documentation (see `_resolve_docs`)
    self._doc_cache: DebputyParsedDoc | None = None
    self._is_doc_cache_resolved: bool = False
@property
def plugin_metadata(self) -> DebputyPluginMetadata:
    """The metadata of the plugin that this initializer was created for."""
    metadata = self._plugin_metadata
    return metadata
def unload_plugin(self) -> None:
    """Undo the registrations of the plugin and remove it from the feature set.

    This is a no-op when loading never started or the plugin has already been
    unloaded. The unloaders run in registration order; if one of them raises,
    the exception propagates and the recorded state is left untouched.
    """
    if not self._load_started:
        return
    for unloader in self._unloaders:
        unloader()
    # Forget the unloaders and reset the flag so an additional call cannot
    # re-run the unloaders or trip over the already removed plugin entry.
    self._unloaders.clear()
    del self._feature_set.plugin_data[self._plugin_name]
    self._load_started = False
def load_plugin(self) -> None:
    """Register the plugin in the feature set and run its initializer.

    :raises PluginConflictError: If a plugin with the same name is already registered
      in the feature set.
    :raises PluginMetadataError: If initialization failed with a `TypeError` and the
      declared entry point is not callable.
    :raises PluginInitializationError: If initialization failed with any other
      exception that is not a `PluginBaseError` (those are re-raised as-is).
    """
    metadata = self._plugin_metadata
    if metadata.plugin_name in self._feature_set.plugin_data:
        raise PluginConflictError(
            f'The plugin "{metadata.plugin_name}" has already been loaded!?'
        )
    assert (
        metadata.api_compat_version == 1
    ), f"Unsupported plugin API compat version {metadata.api_compat_version}"
    self._feature_set.plugin_data[metadata.plugin_name] = metadata
    # From this point on, `unload_plugin` has something to undo.
    self._load_started = True
    assert not metadata.is_initialized
    try:
        metadata.initialize_plugin(self)
    except Exception as e:
        initializer = metadata.plugin_initializer
        if (
            isinstance(e, TypeError)
            and initializer is not None
            and not callable(initializer)
        ):
            # Non-callable objects usually have no `__qualname__`; fall back to their
            # repr so this diagnostic does not die with an AttributeError itself.
            initializer_name = getattr(initializer, "__qualname__", None)
            if initializer_name is None:
                initializer_name = repr(initializer)
            raise PluginMetadataError(
                f"The specified entry point for plugin {metadata.plugin_name} does not appear to be a"
                f" callable (callable returns False). The specified entry point identifies"
                f' itself as "{initializer_name}".'
            ) from e
        elif isinstance(e, PluginBaseError):
            raise
        raise PluginInitializationError(
            f"Exception while attempting to load plugin {metadata.plugin_name}"
        ) from e
def _resolve_docs(self) -> DebputyParsedDoc | None:
    """Load, parse and cache the documentation file of the plugin.

    :return: The parsed documentation, or `None` when the plugin declares no
      documentation file or the file does not exist.
    :raises ValueError: If the documentation file exists but cannot be parsed.
    """
    cached = self._doc_cache
    if cached is not None:
        return cached

    doc_path = self._plugin_metadata.plugin_doc_path
    if doc_path is None or self._is_doc_cache_resolved:
        # Nothing to load, or a previous attempt already concluded there are no docs
        self._is_doc_cache_resolved = True
        return None

    try:
        with doc_path.open("r", encoding="utf-8") as doc_fd:
            raw_content = MANIFEST_YAML.load(doc_fd)
    except FileNotFoundError:
        _debug_log(
            f"No documentation file found for {self._plugin_name}. Expected it at {doc_path}"
        )
        self._is_doc_cache_resolved = True
        return None

    root_path = AttributePath.root_path(doc_path)
    try:
        ref_data = DEBPUTY_DOC_REFERENCE_DATA_PARSER.parse_input(
            raw_content,
            root_path,
        )
    except ManifestParseException as e:
        raise ValueError(
            f"Could not parse documentation in {doc_path}: {e.message}"
        ) from e

    try:
        parsed_doc = DebputyParsedDoc.from_ref_data(ref_data)
    except ValueError as e:
        raise ValueError(
            f"Could not parse documentation in {doc_path}: {e.args[0]}"
        ) from e

    self._doc_cache = parsed_doc
    self._is_doc_cache_resolved = True
    return parsed_doc
def _pluggable_manifest_docs_for(
    self,
    rule_type: TTP | str,
    rule_name: str | list[str],
    *,
    inline_reference_documentation: ParserDocumentation | None = None,
) -> ParserDocumentation | None:
    """Pick the reference documentation for a pluggable manifest rule.

    Documentation found in the documentation file of the plugin is returned when
    present; otherwise the inline documentation (possibly `None`) is returned.

    :param rule_type: The rule type; it is translated via `parser_type_name`.
    :param rule_name: The rule name. For a list, the first entry is used for the lookup.
    :param inline_reference_documentation: Documentation passed directly in the API call.
    :raises ValueError: If both the documentation file and the API call provide
      documentation for the rule.
    """
    ref_data = self._resolve_docs()
    if ref_data is None:
        return inline_reference_documentation

    if isinstance(rule_name, str):
        primary_rule_name = rule_name
    else:
        primary_rule_name = rule_name[0]
    rule_ref = f"{parser_type_name(rule_type)}::{primary_rule_name}"

    docs_from_file = ref_data.pluggable_manifest_rules.get(rule_ref)
    if docs_from_file is None:
        return inline_reference_documentation

    if inline_reference_documentation is not None:
        raise ValueError(
            f"Conflicting docs for {rule_ref}: Was provided one in the API call and one via"
            f" {self._plugin_metadata.plugin_doc_path}. Please remove one of the two, so"
            f" there is only one doc reference"
        )
    return docs_from_file
def packager_provided_file(
    self,
    stem: str,
    installed_path: str,
    *,
    default_mode: int = 0o0644,
    default_priority: int | None = None,
    allow_name_segment: bool = True,
    allow_architecture_segment: bool = False,
    post_formatting_rewrite: Callable[[str], str] | None = None,
    packageless_is_fallback_for_all_packages: bool = False,
    reservation_only: bool = False,
    format_callback: None | (
        Callable[[str, PPFFormatParam, VirtualPath], str]
    ) = None,
    reference_documentation: None | (
        PackagerProvidedFileReferenceDocumentation
    ) = None,
) -> None:
    """Register a packager provided file (identified by its stem).

    The installed path is validated and normalized before the registration is
    stored in the feature set. An unloader removing the registration is recorded.

    :param stem: The stem identifying the packager provided file.
    :param installed_path: Where the file is installed. It must not end with "/" and
      must contain the substitution variables required by the other parameters.
    :param format_callback: debputy-internal; rejected for other plugins.
    :param reservation_only: debputy-internal; rejected for other plugins.
    :raises ValueError: If a parameter is invalid or uses a debputy-internal feature.
    :raises PluginConflictError: If the stem has already been registered.

    The remaining parameters are passed through to `PackagerProvidedFileClassSpec`
    unchanged.
    """
    packager_provided_files = self._feature_set.packager_provided_files
    existing = packager_provided_files.get(stem)

    if format_callback is not None and self._plugin_name != "debputy":
        raise ValueError(
            "Sorry; Using format_callback is a debputy-internal"
            f" API. Triggered by plugin {self._plugin_name}"
        )

    if installed_path.endswith("/"):
        raise ValueError(
            f'The installed_path ends with "/" indicating it is a directory, but it must be a file.'
            f" Triggered by plugin {self._plugin_name}."
        )

    installed_path = _normalize_path(installed_path)

    has_name_var = "{name}" in installed_path

    if installed_path.startswith("./DEBIAN") or reservation_only:
        # Special-case, used for control files.
        if self._plugin_name != "debputy":
            raise ValueError(
                "Sorry; Using DEBIAN as install path or/and reservation_only is a debputy-internal"
                f" API. Triggered by plugin {self._plugin_name}"
            )
    elif not has_name_var and "{owning_package}" not in installed_path:
        # A trailing slash is rejected above, so the message must not suggest it
        # as an alternative to the substitution variables.
        raise ValueError(
            'The installed_path must contain a "{name}" (preferred) or a "{owning_package}"'
            " substitution. Otherwise, the installed"
            f" path would cause file-conflicts. Triggered by plugin {self._plugin_name}"
        )

    if allow_name_segment and not has_name_var:
        raise ValueError(
            'When allow_name_segment is True, the installed_path must have a "{name}" substitution'
            " variable. Otherwise, the name segment will not work properly. Triggered by"
            f" plugin {self._plugin_name}"
        )

    if (
        default_priority is not None
        and "{priority}" not in installed_path
        and "{priority:02}" not in installed_path
    ):
        raise ValueError(
            'When default_priority is not None, the installed_path should have a "{priority}"'
            ' or a "{priority:02}" substitution variable. Otherwise, the priority would be lost.'
            f" Triggered by plugin {self._plugin_name}"
        )

    if existing is not None:
        if existing.debputy_plugin_metadata.plugin_name != self._plugin_name:
            message = (
                f'The stem "{stem}" is registered twice for packager provided files.'
                f" Once by {existing.debputy_plugin_metadata.plugin_name} and once"
                f" by {self._plugin_name}"
            )
        else:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' stem "{stem}" twice for packager provided files.'
            )
        raise PluginConflictError(
            message, existing.debputy_plugin_metadata, self._plugin_metadata
        )
    packager_provided_files[stem] = PackagerProvidedFileClassSpec(
        self._plugin_metadata,
        stem,
        installed_path,
        default_mode=default_mode,
        default_priority=default_priority,
        allow_name_segment=allow_name_segment,
        allow_architecture_segment=allow_architecture_segment,
        post_formatting_rewrite=post_formatting_rewrite,
        packageless_is_fallback_for_all_packages=packageless_is_fallback_for_all_packages,
        reservation_only=reservation_only,
        formatting_callback=format_callback,
        reference_documentation=reference_documentation,
    )

    def _unload() -> None:
        del packager_provided_files[stem]

    self._unloaders.append(_unload)
def metadata_or_maintscript_detector(
    self,
    auto_detector_id: str,
    auto_detector: MetadataAutoDetector,
    *,
    package_type: PackageTypeSelector = "deb",
) -> None:
    """Register a metadata or maintscript detector for the plugin.

    :param auto_detector_id: Identifier of the detector; it can only be registered
      once per plugin.
    :param auto_detector: The detector implementation. It is wrapped via
      `wrap_plugin_code` before being stored.
    :param package_type: Selector for the package types the detector applies to.
    :raises ValueError: If the plugin already registered a detector with this id.
    """
    known_ids = self._plugin_detector_ids
    if auto_detector_id in known_ids:
        raise ValueError(
            f"The plugin {self._plugin_name} tried to register"
            f' "{auto_detector_id}" twice'
        )
    known_ids.add(auto_detector_id)

    detectors_by_plugin = self._feature_set.metadata_maintscript_detectors
    if self._plugin_name not in detectors_by_plugin:
        detectors_by_plugin[self._plugin_name] = []

    applicable_package_types = resolve_package_type_selectors(package_type)
    detector_entry = MetadataOrMaintscriptDetector(
        detector_id=auto_detector_id,
        detector=wrap_plugin_code(self._plugin_name, auto_detector),
        plugin_metadata=self._plugin_metadata,
        applies_to_package_types=applicable_package_types,
        enabled=True,
    )
    detectors_by_plugin[self._plugin_name].append(detector_entry)

    def _unload() -> None:
        # Removes every detector of the plugin at once. The guard is needed as each
        # registration records its own unloader.
        if self._plugin_name in detectors_by_plugin:
            del detectors_by_plugin[self._plugin_name]

    self._unloaders.append(_unload)
def document_builtin_variable(
    self,
    variable_name: str,
    variable_reference_documentation: str,
    *,
    is_context_specific: bool = False,
    is_for_special_case: bool = False,
) -> None:
    """Attach documentation to a variable that the substitution already knows.

    The variable is recorded as a documentation placeholder without a value.

    :param variable_name: Name of the built-in variable to document.
    :param variable_reference_documentation: The documentation for the variable.
    :param is_context_specific: Stored as-is on the registered variable.
    :param is_for_special_case: Stored as-is on the registered variable.
    :raises ValueError: If the substitution reports the variable as undefined.
    """
    self._restricted_api()
    variable_registry = self._feature_set.manifest_variables

    if (
        self._substitution.variable_state(variable_name)
        == VariableNameState.UNDEFINED
    ):
        raise ValueError(
            f"The plugin {self._plugin_name} attempted to document built-in {variable_name},"
            f" but it is not known to be a variable"
        )

    assert variable_name not in variable_registry

    placeholder = PluginProvidedManifestVariable(
        self._plugin_metadata,
        variable_name,
        None,
        is_context_specific_variable=is_context_specific,
        variable_reference_documentation=variable_reference_documentation,
        is_documentation_placeholder=True,
        is_for_special_case=is_for_special_case,
    )
    variable_registry[variable_name] = placeholder

    def _unload() -> None:
        del variable_registry[variable_name]

    self._unloaders.append(_unload)
def manifest_variable_provider(
    self,
    provider: Callable[[VariableContext], Mapping[str, str]],
    variables: Sequence[str] | Mapping[str, str | None],
) -> None:
    """Register a callable that provides the values of several manifest variables.

    The provider is wrapped in an unbounded `functools.lru_cache`, so it is invoked
    at most once per distinct variable context.

    :param provider: Callable returning a mapping from variable name to value. The
      keys must be exactly the names declared in `variables`.
    :param variables: The provided variable names; either a sequence of names or a
      mapping from name to (optional) reference documentation.
    :raises PluginAPIViolationError: Raised when a variable is resolved and the
      provider returned a different set of names than declared.
    """
    self._restricted_api()
    cached_provider = functools.lru_cache(None)(provider)
    permitted_variables = frozenset(variables)
    variables_iter: Iterable[tuple[str, str | None]]
    if not isinstance(variables, Mapping):
        variables_iter = zip(variables, itertools.repeat(None))
    else:
        variables_iter = variables.items()

    # Shared between all resolvers created below; set once the provider's result
    # has been verified against the declared variable names.
    checked_vars = False
    manifest_variables = self._feature_set.manifest_variables
    plugin_name = self._plugin_name

    def _value_resolver_generator(
        variable_name: str,
    ) -> Callable[[VariableContext], str]:
        def _value_resolver(variable_context: VariableContext) -> str:
            res = cached_provider(variable_context)
            nonlocal checked_vars
            if not checked_vars:
                if permitted_variables != res.keys():
                    expected = ", ".join(sorted(permitted_variables))
                    actual = ", ".join(sorted(res))
                    raise PluginAPIViolationError(
                        f"The plugin {plugin_name} claimed to provide"
                        f" the following variables {expected},"
                        f" but when resolving the variables, the plugin provided"
                        f" {actual}. These two lists should have been the same."
                    )
                # Record the successful verification so it is not repeated for
                # every subsequent variable lookup.
                checked_vars = True
            return res[variable_name]

        return _value_resolver

    for varname, vardoc in variables_iter:
        self._check_variable_name(varname)
        manifest_variables[varname] = PluginProvidedManifestVariable(
            self._plugin_metadata,
            varname,
            _value_resolver_generator(varname),
            is_context_specific_variable=False,
            variable_reference_documentation=vardoc,
        )

    def _unload() -> None:
        raise PluginInitializationError(
            "Cannot unload manifest_variable_provider (not implemented)"
        )

    self._unloaders.append(_unload)
def _check_variable_name(self, variable_name: str) -> None:
    """Verify that the plugin may declare a manifest variable with the given name.

    :param variable_name: The name of the variable about to be declared.
    :raises PluginConflictError: If the variable has already been registered.
    :raises ValueError: If the name is not a valid variable name, is in a namespace
      other than "token" or "path", is reserved by debputy or would shadow a
      built-in variable.
    """
    manifest_variables = self._feature_set.manifest_variables
    existing = manifest_variables.get(variable_name)

    if existing is not None:
        if existing.plugin_metadata.plugin_name == self._plugin_name:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' manifest variable "{variable_name}" twice.'
            )
        else:
            message = (
                f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}"
                f" both tried to provide the manifest variable {variable_name}"
            )
        raise PluginConflictError(
            message, existing.plugin_metadata, self._plugin_metadata
        )
    if not SUBST_VAR_RE.match("{{" + variable_name + "}}"):
        raise ValueError(
            f"The plugin {self._plugin_name} attempted to declare {variable_name},"
            f" which is not a valid variable name"
        )

    namespace = ""
    variable_basename = variable_name
    if ":" in variable_name:
        namespace, variable_basename = variable_name.rsplit(":", 1)
        # Both halves of the split must be non-empty. The full name is trivially
        # non-empty here (it contains ":"), so the basename is what gets checked.
        assert namespace != ""
        assert variable_basename != ""

    if namespace != "" and namespace not in ("token", "path"):
        raise ValueError(
            f"The plugin {self._plugin_name} attempted to declare {variable_name},"
            f" which is in the reserved namespace {namespace}"
        )

    variable_name_upper = variable_name.upper()
    if (
        variable_name_upper.startswith(("DEB_", "DPKG_", "DEBPUTY"))
        or variable_basename.startswith("_")
        or variable_basename.upper().startswith("DEBPUTY")
    ) and self._plugin_name != "debputy":
        raise ValueError(
            f"The plugin {self._plugin_name} attempted to declare {variable_name},"
            f" which is a variable name reserved by debputy"
        )

    state = self._substitution.variable_state(variable_name)
    if state != VariableNameState.UNDEFINED and self._plugin_name != "debputy":
        raise ValueError(
            f"The plugin {self._plugin_name} attempted to declare {variable_name},"
            f" which would shadow a built-in variable"
        )
def package_processor(
    self,
    processor_id: str,
    processor: PackageProcessor,
    *,
    depends_on_processor: Iterable[str] = tuple(),
    package_type: PackageTypeSelector = "deb",
) -> None:
    """Register a package processor for the plugin.

    :param processor_id: Identifier of the processor; unique per plugin.
    :param processor: The processor implementation. It is wrapped via
      `wrap_plugin_code` before being stored.
    :param depends_on_processor: Ids of already registered processors that this
      processor depends on. Each id is looked up for this plugin first and for
      the "debputy" plugin second.
    :param package_type: Selector for the package types the processor applies to.
    :raises PluginConflictError: If the plugin already registered the processor id.
    :raises ValueError: If a dependency cannot be resolved.
    """
    self._restricted_api(allowed_plugins={"lua", "debputy-self-hosting"})
    registry = self._feature_set.all_package_processors
    resolved_dependencies = set()
    own_key = (self._plugin_name, processor_id)

    if own_key in registry:
        raise PluginConflictError(
            f"The plugin {self._plugin_name} already registered a processor with id {processor_id}",
            self._plugin_metadata,
            self._plugin_metadata,
        )

    for depends_ref in depends_on_processor:
        if isinstance(depends_ref, str):
            # Prefer a processor of the plugin itself over one provided by debputy
            for owner in (self._plugin_name, "debputy"):
                candidate_key = (owner, depends_ref)
                if candidate_key in registry:
                    depends_key = candidate_key
                    break
            else:
                raise ValueError(
                    f'Could not resolve dependency "{depends_ref}" for'
                    f' "{processor_id}". It was not provided by the plugin itself'
                    f" ({self._plugin_name}) nor debputy."
                )
        else:
            # TODO: Add proper dependencies first, at which point we should probably resolve "name"
            #  via the direct dependencies.
            assert False

        if registry.get(depends_key) is None:
            # We currently require the processor to be declared already. If this ever changes,
            # PluginProvidedFeatureSet.package_processors_in_order will need an update
            dplugin_name, dprocessor_name = depends_key
            available_processors = ", ".join(
                name for owner, name in registry.keys() if owner == dplugin_name
            )
            raise ValueError(
                f"The plugin {dplugin_name} does not provide a processor called"
                f" {dprocessor_name}. Available processors for that plugin are:"
                f" {available_processors}"
            )
        resolved_dependencies.add(depends_key)

    registry[own_key] = PluginProvidedPackageProcessor(
        processor_id,
        resolve_package_type_selectors(package_type),
        wrap_plugin_code(self._plugin_name, processor),
        frozenset(resolved_dependencies),
        self._plugin_metadata,
    )

    def _unload() -> None:
        del registry[own_key]

    self._unloaders.append(_unload)
def automatic_discard_rule(
    self,
    name: str,
    should_discard: Callable[[VirtualPath], bool],
    *,
    rule_reference_documentation: str | None = None,
    examples: (
        AutomaticDiscardRuleExample | Sequence[AutomaticDiscardRuleExample]
    ) = tuple(),
) -> None:
    """Register an automatic discard rule

    Automatic discard rules are consulted for *every* path that is about to be
    installed into any package. As soon as one rule decides that a path should not
    be installed, the path is skipped. What is skipped depends on the path type:

     * directory: The directory and everything beneath it is excluded.
     * symlink: The symlink itself (but not its target) is excluded.
     * hardlink: The current hardlink is not installed, but other instances of it are.

    Note: `debputy` *never* deletes discarded files. It merely skips them.

    Write the rule under the assumption that a directory is tested before its
    content *when it is relevant* for the rule to examine whether the directory
    can be excluded.

    The packager can overrule automatic discard rules in the manifest by listing
    the path explicitly without any globs. As example:

        installations:
         - install:
             sources:
               - usr/lib/libfoo.la  # <-- This path is always installed
                                    #     (Discard rules are never asked in this case)
                                    #
               - usr/lib/*.so*      # <-- Discard rules applies to any path beneath usr/lib and can exclude matches
                                    #     Though, they will not examine `libfoo.la` as it has already been installed
                                    #
                                    #     Note: usr/lib itself is never tested in this case (it is assumed to be
                                    #     explicitly requested). But any subdir of usr/lib will be examined.

    While being evaluated, a rule can see the source path currently considered for
    installation. It may look at the "surrounding" context (such as the parent
    directory), but it will not know whether those paths are to be installed or
    will be installed.

    :param name: A user visible name for the discard rule. It can be used on the command line, so avoid shell
      metacharacters and spaces.
    :param should_discard: The implementation of the rule. It is called with a VirtualPath representing the
      *source* path about to be installed. Returning `True` discards the path; returning `False` keeps it
      (as far as this rule is concerned). A source path is either relative to the root of the source tree or
      to the root of a search directory such as `debian/tmp`. The place where the path will be installed is
      not available when the rule is evaluated.
    :param rule_reference_documentation: Optionally, the reference documentation to be shown when a user
      looks up this automatic discard rule.
    :param examples: Examples for the rule. Use the automatic_discard_rule_example function to
      generate the examples.
    """
    self._restricted_api()
    discard_rules = self._feature_set.auto_discard_rules
    previous = discard_rules.get(name)
    if previous is not None:
        previous_owner = previous.plugin_metadata.plugin_name
        if previous_owner == self._plugin_name:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' automatic discard rule "{name}" twice.'
            )
        else:
            message = (
                f"The plugins {previous_owner} and {self._plugin_name}"
                f" both tried to provide the automatic discard rule {name}"
            )
        raise PluginConflictError(
            message, previous.plugin_metadata, self._plugin_metadata
        )

    # Accept a single example as a shorthand for a one-element sequence
    if isinstance(examples, AutomaticDiscardRuleExample):
        examples = (examples,)
    else:
        examples = tuple(examples)

    discard_rules[name] = PluginProvidedDiscardRule(
        name,
        self._plugin_metadata,
        should_discard,
        rule_reference_documentation,
        examples,
    )

    def _unload() -> None:
        del discard_rules[name]

    self._unloaders.append(_unload)
def service_provider(
    self,
    service_manager: str,
    detector: ServiceDetector,
    integrator: ServiceIntegrator,
) -> None:
    """Register the detector and integrator for a service manager.

    :param service_manager: Name of the service manager; it can only be provided once.
    :param detector: The service detector. Wrapped via `wrap_plugin_code`.
    :param integrator: The service integrator. Wrapped via `wrap_plugin_code`.
    :raises PluginConflictError: If the service manager has already been registered.
    """
    self._restricted_api()
    managers = self._feature_set.service_managers
    previous = managers.get(service_manager)
    if previous is not None:
        previous_owner = previous.plugin_metadata.plugin_name
        if previous_owner == self._plugin_name:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' service manager "{service_manager}" twice.'
            )
        else:
            message = (
                f"The plugins {previous_owner} and {self._plugin_name}"
                f' both tried to provide the service manager "{service_manager}"'
            )
        raise PluginConflictError(
            message, previous.plugin_metadata, self._plugin_metadata
        )

    wrapped_detector = wrap_plugin_code(self._plugin_name, detector)
    wrapped_integrator = wrap_plugin_code(self._plugin_name, integrator)
    managers[service_manager] = ServiceManagerDetails(
        service_manager,
        wrapped_detector,
        wrapped_integrator,
        self._plugin_metadata,
    )

    def _unload() -> None:
        del managers[service_manager]

    self._unloaders.append(_unload)
def manifest_variable(
    self,
    variable_name: str,
    value: str,
    *,
    variable_reference_documentation: str | None = None,
) -> None:
    """Declare a manifest variable with a fixed value.

    :param variable_name: Name of the variable; validated via `_check_variable_name`.
    :param value: The value of the variable. It must not depend on other variables.
    :param variable_reference_documentation: Optional documentation for the variable.
    :raises ValueError: If substituting the value changes it or fails with a
      `DebputySubstitutionError`.
    """
    self._check_variable_name(variable_name)
    variable_registry = self._feature_set.manifest_variables

    try:
        substituted = self._substitution.substitute(value, "Plugin initialization")
    except DebputySubstitutionError:
        depends_on_variable = True
    else:
        # A value that changes under substitution references another variable
        depends_on_variable = substituted != value

    if depends_on_variable:
        raise ValueError(
            f"The plugin {self._plugin_name} attempted to declare {variable_name} with value {value!r}."
            f" This value depends on another variable, which is not supported. This restriction may be"
            f" lifted in the future."
        )

    variable_registry[variable_name] = PluginProvidedManifestVariable(
        self._plugin_metadata,
        variable_name,
        value,
        is_context_specific_variable=False,
        variable_reference_documentation=variable_reference_documentation,
    )

    def _unload() -> None:
        # We need to check it was never resolved
        raise PluginInitializationError(
            "Cannot unload manifest_variable (not implemented)"
        )

    self._unloaders.append(_unload)
@property
def _plugin_name(self) -> str:
    """Name of the plugin, taken from the plugin metadata."""
    metadata = self._plugin_metadata
    return metadata.plugin_name
def provide_manifest_keyword(
    self,
    rule_type: TTP,
    rule_name: str | list[str],
    handler: DIPKWHandler,
    *,
    inline_reference_documentation: ParserDocumentation | None = None,
) -> None:
    """Register a keyword with the dispatching table parser of a rule type.

    :param rule_type: The rule type; it must be a key of the dispatchable table parsers.
    :param rule_name: The keyword (or keywords) to register.
    :param handler: The handler for the keyword. Wrapped via `wrap_plugin_code`.
    :param inline_reference_documentation: Optional documentation; resolved against
      the documentation file of the plugin via `_pluggable_manifest_docs_for`.
    :raises ValueError: If the rule type is not supported.
    """
    self._restricted_api()
    table_parsers = (
        self._feature_set.manifest_parser_generator.dispatchable_table_parsers
    )
    if rule_type not in table_parsers:
        types = ", ".join(sorted(x.__name__ for x in table_parsers))
        raise ValueError(
            f"The rule_type was not a supported type. It must be one of {types}"
        )

    resolved_documentation = self._pluggable_manifest_docs_for(
        rule_type,
        rule_name,
        inline_reference_documentation=inline_reference_documentation,
    )

    table_parsers[rule_type].register_keyword(
        rule_name,
        wrap_plugin_code(self._plugin_name, handler),
        self._plugin_metadata,
        inline_reference_documentation=resolved_documentation,
    )

    def _unload() -> None:
        raise PluginInitializationError(
            "Cannot unload provide_manifest_keyword (not implemented)"
        )

    self._unloaders.append(_unload)
def pluggable_object_parser(
    self,
    rule_type: str,
    rule_name: str,
    *,
    object_parser_key: str | None = None,
    on_end_parse_step: None | (
        Callable[
            [str, Mapping[str, Any] | None, AttributePath, ParserContextData],
            None,
        ]
    ) = None,
    nested_in_package_context: bool = False,
) -> None:
    """Register one dispatchable object parser as a child of another.

    This is a restricted API (see `_restricted_api`).

    :param rule_type: Key of the parent dispatcher in the parser generator's
      `dispatchable_object_parsers`.
    :param rule_name: Name under which the child parser is registered.
    :param object_parser_key: Key of the child dispatcher; defaults to
      `rule_name` when omitted.
    :param on_end_parse_step: Optional callback; when given it is passed
      through `wrap_plugin_code` before being registered.
    :param nested_in_package_context: Forwarded unchanged to
      `register_child_parser`.
    :raises ValueError: If either key is not a known dispatchable object
      parser.
    """
    self._restricted_api()
    child_key = rule_name if object_parser_key is None else object_parser_key

    parser_generator = self._feature_set.manifest_parser_generator
    dispatchable_object_parsers = parser_generator.dispatchable_object_parsers
    if rule_type not in dispatchable_object_parsers:
        types = ", ".join(sorted(dispatchable_object_parsers))
        raise ValueError(
            f"The rule_type was not a supported type. It must be one of {types}"
        )
    if child_key not in dispatchable_object_parsers:
        types = ", ".join(sorted(dispatchable_object_parsers))
        raise ValueError(
            f"The object_parser_key was not a supported type. It must be one of {types}"
        )
    parent_dispatcher = dispatchable_object_parsers[rule_type]
    child_dispatcher = dispatchable_object_parsers[child_key]

    end_step = on_end_parse_step
    if end_step is not None:
        end_step = wrap_plugin_code(self._plugin_name, end_step)

    parent_dispatcher.register_child_parser(
        rule_name,
        child_dispatcher,
        self._plugin_metadata,
        on_end_parse_step=end_step,
        nested_in_package_context=nested_in_package_context,
    )

    def _unload() -> None:
        raise PluginInitializationError(
            "Cannot unload pluggable_object_parser (not implemented)"
        )

    self._unloaders.append(_unload)
def pluggable_manifest_rule(
    self,
    rule_type: TTP | str,
    rule_name: str | Sequence[str],
    parsed_format: type[PF],
    handler: DIPHandler,
    *,
    source_format: SF | None = None,
    inline_reference_documentation: ParserDocumentation | None = None,
    expected_debputy_integration_mode: None | (
        Container[DebputyIntegrationMode]
    ) = None,
    apply_standard_attribute_documentation: bool = False,
    register_value: bool = True,
) -> None:
    """Register a manifest rule, its generated parser and its handler.

    This is a restricted API (see `_restricted_api`).

    When `rule_type` is a string, the rule is registered with the matching
    dispatchable *object* parser and the handler's return annotation is used
    as the type under which produced values are recorded (unless
    `register_value` is false). Otherwise the rule is registered with the
    matching dispatchable *table* parser and no value type is recorded.

    :param rule_type: Key of the dispatching parser to register with.
    :param rule_name: Name (or names) of the rule.
    :param parsed_format: Format handed to `generate_parser`.
    :param handler: Called with `(name, parsed_data, attribute_path,
      parser_context)`; its result is returned to the dispatcher.
    :param source_format: Forwarded to `generate_parser` as `source_content`.
    :param inline_reference_documentation: Optional documentation, resolved
      through `_pluggable_manifest_docs_for`.
    :param expected_debputy_integration_mode: Forwarded to `generate_parser`.
    :param apply_standard_attribute_documentation: When true, `_STD_ATTR_DOCS`
      is handed to `generate_parser` as `automatic_docs`; this is reported
      via `_error` on Python older than 3.12.
    :param register_value: When false, produced values are never recorded.
    :raises ValueError: If `rule_type` is unknown, if a string `rule_type` is
      used with a handler lacking a non-None return annotation, or if another
      rule already registered the same value type.
    """
    # When changing this, consider which types will be unrestricted
    self._restricted_api()
    if apply_standard_attribute_documentation and sys.version_info < (3, 12):
        _error(
            f"The plugin {self._plugin_metadata.plugin_name} requires python 3.12 due to"
            f" its use of apply_standard_attribute_documentation"
        )
    feature_set = self._feature_set
    parser_generator = feature_set.manifest_parser_generator
    if isinstance(rule_type, str):
        if rule_type not in parser_generator.dispatchable_object_parsers:
            types = ", ".join(sorted(parser_generator.dispatchable_object_parsers))
            raise ValueError(
                f"The rule_type was not a supported type. It must be one of {types}"
            )
        dispatching_parser = parser_generator.dispatchable_object_parsers[rule_type]
        signature = inspect.signature(handler)
        declared_return = signature.return_annotation
        if declared_return is signature.empty or declared_return == NoneType:
            raise ValueError(
                "The handler must have a return type (that is not None)"
            )
        register_as_type = declared_return
    else:
        # Dispatchable types cannot be resolved
        register_as_type = None
        if rule_type not in parser_generator.dispatchable_table_parsers:
            table_type_names = sorted(
                x.__name__ for x in parser_generator.dispatchable_table_parsers
            )
            types = ", ".join(table_type_names)
            raise ValueError(
                f"The rule_type was not a supported type. It must be one of {types}"
            )
        dispatching_parser = parser_generator.dispatchable_table_parsers[rule_type]

    if not register_value:
        # The caller opted out of recording produced values.
        register_as_type = None

    if register_as_type is not None:
        existing_registration = self._registered_manifest_types.get(
            register_as_type
        )
        if existing_registration is not None:
            raise ValueError(
                f"Cannot register rule {rule_name!r} for plugin {self._plugin_name}. The plugin {existing_registration.plugin_name} already registered a manifest rule with type {register_as_type!r}"
            )
        self._registered_manifest_types[register_as_type] = self._plugin_metadata

    resolved_docs = self._pluggable_manifest_docs_for(
        rule_type,
        rule_name,
        inline_reference_documentation=inline_reference_documentation,
    )

    docs = _STD_ATTR_DOCS if apply_standard_attribute_documentation else None

    parser = feature_set.manifest_parser_generator.generate_parser(
        parsed_format,
        source_content=source_format,
        inline_reference_documentation=resolved_docs,
        expected_debputy_integration_mode=expected_debputy_integration_mode,
        automatic_docs=docs,
    )

    def _registering_handler(
        name: str,
        parsed_data: PF,
        attribute_path: AttributePath,
        parser_context: ParserContextData,
    ) -> TP:
        # Run the plugin's handler and, when a value type was resolved above,
        # record the produced value under that type.
        value = handler(name, parsed_data, attribute_path, parser_context)
        if register_as_type is not None:
            register_manifest_type_value_in_context(register_as_type, value)
        return value

    dispatching_parser.register_parser(
        rule_name,
        parser,
        wrap_plugin_code(self._plugin_name, _registering_handler),
        self._plugin_metadata,
    )

    def _unload() -> None:
        raise PluginInitializationError(
            "Cannot unload pluggable_manifest_rule (not implemented)"
        )

    self._unloaders.append(_unload)
def register_build_system(
    self,
    build_system_definition: type[BSPF],
) -> None:
    """Register a build system from its annotated TypedDict definition.

    This is a restricted API (see `_restricted_api`). The dispatch metadata
    attached to the definition drives two registrations: a manifest rule via
    `pluggable_manifest_rule` and auto-detection via
    `_auto_detectable_build_system`.

    :param build_system_definition: A TypedDict carrying
      `BuildSystemManifestRuleMetadata` under the dispatch metadata attribute.
    :raises PluginInitializationError: If the definition is not a TypedDict.
    :raises PluginIncorrectRegistrationError: If the dispatch metadata is
      missing or of the wrong type.
    """
    self._restricted_api()
    if not is_typeddict(build_system_definition):
        raise PluginInitializationError(
            f"Expected build_system_definition to be a subclass of {BuildRuleParsedFormat.__name__},"
            f" but got {build_system_definition.__name__} instead"
        )

    metadata = getattr(
        build_system_definition, _DEBPUTY_DISPATCH_METADATA_ATTR_NAME, None
    )
    if not isinstance(metadata, BuildSystemManifestRuleMetadata):
        raise PluginIncorrectRegistrationError(
            f"The {build_system_definition.__qualname__} type should have been annotated with"
            f" @{debputy_build_system.__name__}."
        )

    keywords = metadata.manifest_keywords
    assert len(keywords) == 1
    build_system_impl = metadata.build_system_impl
    assert build_system_impl is not None
    manifest_keyword = next(iter(metadata.manifest_keywords))

    self.pluggable_manifest_rule(
        metadata.dispatched_type,
        metadata.manifest_keywords,
        build_system_definition,
        # pluggable_manifest_rule does the wrapping
        metadata.unwrapped_constructor,
        source_format=metadata.source_format,
        inline_reference_documentation=metadata.online_reference_documentation,
        expected_debputy_integration_mode=only_integrations(INTEGRATION_MODE_FULL),
    )

    # Unlike above, the constructor handed to auto-detection is wrapped here.
    wrapped_impl = wrap_plugin_code(self._plugin_name, build_system_impl)
    self._auto_detectable_build_system(
        manifest_keyword,
        build_system_impl,
        constructor=wrapped_impl,
        shadowing_build_systems_when_active=metadata.auto_detection_shadow_build_systems,
    )
def _auto_detectable_build_system(
    self,
    manifest_keyword: str,
    rule_type: type[BSR],
    *,
    shadowing_build_systems_when_active: frozenset[str] = frozenset(),
    constructor: None | (
        Callable[[BuildRuleParsedFormat, AttributePath, "HighLevelManifest"], BSR]
    ) = None,
) -> None:
    """Register auto-detection for the build system class `rule_type`.

    This is a restricted API (see `_restricted_api`).

    :param manifest_keyword: Manifest keyword stored with the registration.
    :param rule_type: The build system class. Its `auto_detect_build_system`
      attribute is wrapped with `wrap_plugin_code` and stored as the detector.
    :param shadowing_build_systems_when_active: Stored unchanged with the
      registration.
    :param constructor: Callable used to create the build system; when
      omitted, `rule_type` itself is called with the same three arguments.
    :raises PluginConflictError: If auto-detection was already registered for
      `rule_type`, whether by this plugin or by another one.
    """
    self._restricted_api()
    feature_set = self._feature_set
    existing = feature_set.auto_detectable_build_systems.get(rule_type)
    if existing is not None:
        # `rule_type` is itself a class, so its own `__name__` names the build
        # system; `rule_type.__class__` would be its metaclass (e.g. "type").
        bs_name = rule_type.__name__
        if existing.plugin_metadata.plugin_name == self._plugin_name:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' auto-detection of the build system "{bs_name}" twice.'
            )
        else:
            message = (
                f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}"
                f' both tried to provide auto-detection of the build system "{bs_name}"'
            )
        raise PluginConflictError(
            message, existing.plugin_metadata, self._plugin_metadata
        )

    if constructor is None:

        def impl(
            attributes: BuildRuleParsedFormat,
            attribute_path: AttributePath,
            manifest: "HighLevelManifest",
        ) -> BSR:
            return rule_type(attributes, attribute_path, manifest)

    else:
        impl = constructor

    feature_set.auto_detectable_build_systems[rule_type] = (
        PluginProvidedBuildSystemAutoDetection(
            manifest_keyword,
            rule_type,
            wrap_plugin_code(self._plugin_name, rule_type.auto_detect_build_system),
            impl,
            shadowing_build_systems_when_active,
            self._plugin_metadata,
        )
    )

    def _unload() -> None:
        # Tolerate the entry already being gone.
        try:
            del feature_set.auto_detectable_build_systems[rule_type]
        except KeyError:
            pass

    self._unloaders.append(_unload)
def known_packaging_files(
    self,
    packaging_file_details: KnownPackagingFileInfo,
) -> None:
    """Register a known packaging file.

    The registration is keyed by `"<detection_method>::<value>"`, where the
    value is the `path` attribute for the `path` detection method (the
    default) and the `pkgfile` attribute for `dh.pkgfile`.

    :param packaging_file_details: The details of the packaging file. A copy
      is stored, so the caller's mapping is not retained.
    :raises ValueError: If the attributes do not match the detection method,
      if the path is not normalized, or if `pkgfile` contains a "/".
    :raises PluginConflictError: If the same key was already registered.
    """
    known_packaging_files = self._feature_set.known_packaging_files
    detection_method = packaging_file_details.get(
        "detection_method", cast("Literal['path']", "path")
    )
    path = packaging_file_details.get("path")
    dhpkgfile = packaging_file_details.get("pkgfile")

    packaging_file_details = packaging_file_details.copy()

    if detection_method == "path":
        if dhpkgfile is not None:
            raise ValueError(
                'The "pkgfile" attribute cannot be used when detection-method is "path" (or omitted)'
            )
        if path is None:
            raise ValueError(
                'The "path" attribute must be present when detection-method is "path" (or omitted)'
            )
        if path != _normalize_path(path, with_prefix=False):
            raise ValueError(
                f"The path for known packaging files must be normalized. Please replace"
                f' "{path}" with "{_normalize_path(path, with_prefix=False)}"'
            )
        detection_value = path
    else:
        assert detection_method == "dh.pkgfile"
        if path is not None:
            raise ValueError(
                'The "path" attribute cannot be used when detection-method is "dh.pkgfile"'
            )
        if dhpkgfile is None:
            raise ValueError(
                'The "pkgfile" attribute must be present when detection-method is "dh.pkgfile"'
            )
        if "/" in dhpkgfile:
            raise ValueError(
                'The "pkgfile" attribute must be a name stem such as "install" (no "/" are allowed)'
            )
        detection_value = dhpkgfile
    key = f"{detection_method}::{detection_value}"
    existing = known_packaging_files.get(key)
    if existing is not None:
        if existing.plugin_metadata.plugin_name != self._plugin_name:
            message = (
                f'The key "{key}" is registered twice for known packaging files.'
                f" Once by {existing.plugin_metadata.plugin_name} and once by {self._plugin_name}"
            )
        else:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' key "{key}" twice for known packaging files.'
            )
        raise PluginConflictError(
            message, existing.plugin_metadata, self._plugin_metadata
        )
    _validate_known_packaging_file_dh_compat_rules(
        packaging_file_details.get("dh_compat_rules")
    )
    known_packaging_files[key] = PluginProvidedKnownPackagingFile(
        packaging_file_details,
        detection_method,
        detection_value,
        self._plugin_metadata,
    )

    def _unload() -> None:
        del known_packaging_files[key]

    self._unloaders.append(_unload)
def register_mapped_type(
    self,
    type_mapping: TypeMapping,
    *,
    reference_documentation: TypeMappingDocumentation | None = None,
) -> None:
    """Register a type mapping with the feature set and the parser generator.

    This is a restricted API (see `_restricted_api`). No unloader is
    registered for this operation.

    :param type_mapping: The mapping to register; it is keyed by its
      `target_type`.
    :param reference_documentation: Optional documentation stored together
      with the registration.
    :raises PluginConflictError: If the target type was already registered,
      whether by this plugin or by another one.
    """
    self._restricted_api()
    target_type = type_mapping.target_type
    mapped_types = self._feature_set.mapped_types
    existing = mapped_types.get(target_type)
    if existing is not None:
        if existing.plugin_metadata.plugin_name != self._plugin_name:
            message = (
                f'The key "{target_type.__name__}" is registered twice for mapped types.'
                f" Once by {existing.plugin_metadata.plugin_name} and once by {self._plugin_name}"
            )
        else:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' key "{target_type.__name__}" twice for mapped types.'
            )
        raise PluginConflictError(
            message, existing.plugin_metadata, self._plugin_metadata
        )
    parser_generator = self._feature_set.manifest_parser_generator
    # TODO: Wrap the mapper in the plugin context
    mapped_types[target_type] = PluginProvidedTypeMapping(
        type_mapping, reference_documentation, self._plugin_metadata
    )
    parser_generator.register_mapped_type(type_mapping)
1269 def _restricted_api(
1270 self,
1271 *,
1272 allowed_plugins: set[str] | frozenset[str] = frozenset(),
1273 ) -> None:
1274 if self._plugin_name != "debputy" and self._plugin_name not in allowed_plugins: 1274 ↛ 1275line 1274 didn't jump to line 1275 because the condition on line 1274 was never true
1275 raise PluginAPIViolationError(
1276 f"Plugin {self._plugin_name} attempted to access a debputy-only API."
1277 " If you are the maintainer of this plugin and want access to this"
1278 " API, please file a feature request to make this public."
1279 " (The API is currently private as it is unstable.)"
1280 )
class MaintscriptAccessorProviderBase(MaintscriptAccessor, ABC):
    """Shared implementation of the maintscript snippet helpers.

    Each `on_*` method wraps the given snippet in a shell `if` block with a
    fixed condition and hands the result to `_append_script`, which subclasses
    must implement.
    """

    __slots__ = ()

    def _append_script(
        self,
        caller_name: str,
        maintscript: Maintscript,
        full_script: str,
        /,
        perform_substitution: bool = True,
    ) -> None:
        """Record `full_script` for `maintscript`; must be overridden.

        :param caller_name: Name of the public method that produced the script.
        :param maintscript: The maintainer script the snippet belongs to.
        :param full_script: The complete snippet, including any condition.
        :param perform_substitution: Whether substitution should be applied.
        """
        raise NotImplementedError

    @classmethod
    def _apply_condition_to_script(
        cls,
        condition: str,
        run_snippet: str,
        /,
        indent: bool | None = None,
    ) -> str:
        """Wrap `run_snippet` in `if <condition>; then ... fi`.

        :param condition: Shell condition placed after `if`.
        :param run_snippet: The shell code to guard. A trailing newline is
          added when it is missing.
        :param indent: Whether to prefix every snippet line with the indent
          string. When `None`, lines are indented unless the snippet
          contains `<<`.
        :return: The guarded script, ending with `fi` and a newline.
        """
        if indent is None:
            # We auto-determine this based on heredocs currently
            indent = "<<" not in run_snippet

        if indent:
            run_snippet = "".join(" " + x for x in run_snippet.splitlines(True))
        if not run_snippet.endswith("\n"):
            run_snippet += "\n"
        condition_line = f"if {condition}; then\n"
        end_line = "fi\n"
        return "".join((condition_line, run_snippet, end_line))

    def on_configure(
        self,
        run_snippet: str,
        /,
        indent: bool | None = None,
        perform_substitution: bool = True,
        skip_on_rollback: bool = False,
    ) -> None:
        """Add a `postinst` snippet guarded by the configure condition.

        The condition is `POSTINST_DEFAULT_CONDITION`, or only
        `[ "$1" = "configure" ]` when `skip_on_rollback` is true.
        """
        condition = POSTINST_DEFAULT_CONDITION
        if skip_on_rollback:
            condition = '[ "$1" = "configure" ]'
        return self._append_script(
            "on_configure",
            "postinst",
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def on_initial_install(
        self,
        run_snippet: str,
        /,
        indent: bool | None = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a `postinst` snippet run on `configure` with an empty `$2`."""
        condition = '[ "$1" = "configure" -a -z "$2" ]'
        return self._append_script(
            "on_initial_install",
            "postinst",
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def on_upgrade(
        self,
        run_snippet: str,
        /,
        indent: bool | None = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a `postinst` snippet run on `configure` with a non-empty `$2`."""
        condition = '[ "$1" = "configure" -a -n "$2" ]'
        return self._append_script(
            "on_upgrade",
            "postinst",
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def on_upgrade_from(
        self,
        version: str,
        run_snippet: str,
        /,
        indent: bool | None = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a `postinst` snippet run on `configure` when `$2` is `le-nl` `version`.

        :param version: The version that `$2` is compared against.
        :param run_snippet: The shell code to run when the condition holds.
        """
        import shlex

        # dpkg expects `--compare-versions <ver1> <op> <ver2>`: `$2` is the
        # left operand and the requested version the right one. The version is
        # shell-quoted so it is always passed as a single word.
        condition = (
            '[ "$1" = "configure" ] && dpkg --compare-versions "$2" le-nl '
            + shlex.quote(version)
        )
        return self._append_script(
            "on_upgrade_from",
            "postinst",
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def on_before_removal(
        self,
        run_snippet: str,
        /,
        indent: bool | None = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a `prerm` snippet guarded by `[ "$1" = "remove" ]`."""
        condition = '[ "$1" = "remove" ]'
        return self._append_script(
            "on_before_removal",
            "prerm",
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def on_removed(
        self,
        run_snippet: str,
        /,
        indent: bool | None = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a `postrm` snippet guarded by `[ "$1" = "remove" ]`."""
        condition = '[ "$1" = "remove" ]'
        return self._append_script(
            "on_removed",
            "postrm",
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def on_purge(
        self,
        run_snippet: str,
        /,
        indent: bool | None = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a `postrm` snippet guarded by `[ "$1" = "purge" ]`."""
        condition = '[ "$1" = "purge" ]'
        return self._append_script(
            "on_purge",
            "postrm",
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def unconditionally_in_script(
        self,
        maintscript: Maintscript,
        run_snippet: str,
        /,
        perform_substitution: bool = True,
    ) -> None:
        """Add `run_snippet` to `maintscript` without wrapping it in a condition.

        :raises ValueError: If `maintscript` is not in `STD_CONTROL_SCRIPTS`.
        """
        if maintscript not in STD_CONTROL_SCRIPTS:
            raise ValueError(
                f'Unknown script "{maintscript}". Should have been one of:'
                f' {", ".join(sorted(STD_CONTROL_SCRIPTS))}'
            )
        return self._append_script(
            "unconditionally_in_script",
            maintscript,
            run_snippet,
            perform_substitution=perform_substitution,
        )
class MaintscriptAccessorProvider(MaintscriptAccessorProviderBase):
    """Maintscript accessor that stores snippets in per-script containers."""

    __slots__ = (
        "_plugin_metadata",
        "_maintscript_snippets",
        "_plugin_source_id",
        "_package_substitution",
        "_default_snippet_order",
    )

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        maintscript_snippets: dict[str, MaintscriptSnippetContainer],
        package_substitution: Substitution,
        *,
        default_snippet_order: Literal["service"] | None = None,
    ):
        """Store the plugin identity, snippet containers and substitution.

        :param plugin_metadata: Metadata of the plugin providing snippets.
        :param plugin_source_id: Identifier included in the definition source.
        :param maintscript_snippets: Containers keyed by maintscript name.
        :param package_substitution: Substitution applied to snippets on
          request.
        :param default_snippet_order: Order assigned to every created snippet.
        """
        self._plugin_metadata = plugin_metadata
        self._maintscript_snippets = maintscript_snippets
        self._plugin_source_id = plugin_source_id
        self._package_substitution = package_substitution
        self._default_snippet_order = default_snippet_order

    def _append_script(
        self,
        caller_name: str,
        maintscript: Maintscript,
        full_script: str,
        /,
        perform_substitution: bool = True,
    ) -> None:
        """Append `full_script` as a snippet to the container for `maintscript`.

        The definition source is `"<plugin name> (<plugin source id>)"`; it is
        also what the substitution is given when `perform_substitution` is
        true.
        """
        def_source = f"{self._plugin_metadata.plugin_name} ({self._plugin_source_id})"
        script_text = (
            self._package_substitution.substitute(full_script, def_source)
            if perform_substitution
            else full_script
        )
        container = self._maintscript_snippets[maintscript]
        container.append(
            MaintscriptSnippet(
                snippet=script_text,
                definition_source=def_source,
                snippet_order=self._default_snippet_order,
            )
        )
class BinaryCtrlAccessorProviderBase(BinaryCtrlAccessor):
    """Common state and helpers for binary control accessors.

    Subclasses supply the maintscript accessor through
    `_create_maintscript_accessor`.
    """

    __slots__ = (
        "_plugin_metadata",
        "_plugin_source_id",
        "_package_metadata_context",
        "_triggers",
        "_substvars",
        "_maintscript",
        "_shlibs_details",
    )

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        package_metadata_context: PackageProcessingContext,
        triggers: dict[tuple[DpkgTriggerType, str], PluginProvidedTrigger],
        substvars: FlushableSubstvars,
        shlibs_details: tuple[str | None, list[str] | None],
    ) -> None:
        """Store the given collaborators; no maintscript accessor exists yet."""
        # Created on first access of the `maintscript` property.
        self._maintscript: MaintscriptAccessor | None = None
        self._plugin_metadata = plugin_metadata
        self._plugin_source_id = plugin_source_id
        self._package_metadata_context = package_metadata_context
        self._triggers = triggers
        self._substvars = substvars
        self._shlibs_details = shlibs_details

    def _create_maintscript_accessor(self) -> MaintscriptAccessor:
        """Create the maintscript accessor; must be overridden."""
        raise NotImplementedError

    def dpkg_trigger(self, trigger_type: DpkgTriggerType, trigger_target: str) -> None:
        """Register a declarative dpkg level trigger

        The provided trigger will be added to the package's metadata (the triggers file of the control.tar).

        If the trigger has already been added previously, a second call with the same trigger data will be ignored.
        """
        key = (trigger_type, trigger_target)
        if key not in self._triggers:
            self._triggers[key] = PluginProvidedTrigger(
                dpkg_trigger_type=trigger_type,
                dpkg_trigger_target=trigger_target,
                provider=self._plugin_metadata,
                provider_source_id=self._plugin_source_id,
            )

    @property
    def maintscript(self) -> MaintscriptAccessor:
        """The maintscript accessor, created and cached on first use."""
        if self._maintscript is None:
            self._maintscript = self._create_maintscript_accessor()
        return self._maintscript

    @property
    def substvars(self) -> FlushableSubstvars:
        """The substvars object given at construction."""
        return self._substvars

    def dpkg_shlibdeps(self, paths: Sequence[VirtualPath]) -> None:
        """Run `dpkg-shlibdeps` on `paths`, writing to the flushed substvars file.

        :param paths: Paths whose `fs_path` is passed to `dpkg-shlibdeps`.
        """
        binary_package = self._package_metadata_context.binary_package
        with self.substvars.flush() as substvars_file:
            options = [f"-T{substvars_file}"]
            if binary_package.is_udeb:
                options.append("-tudeb")
            if binary_package.is_essential:
                options.append("-dPre-Depends")
            shlibs_local, shlib_dirs = self._shlibs_details
            if shlibs_local is not None:
                options.append(f"-L{shlibs_local}")
            if shlib_dirs:
                options += [f"-l{sd}" for sd in shlib_dirs]
            dpkg_cmd = ["dpkg-shlibdeps", *options, *(p.fs_path for p in paths)]
            print_command(*dpkg_cmd)
            try:
                subprocess.check_call(dpkg_cmd)
            except subprocess.CalledProcessError:
                _error(
                    f"Attempting to auto-detect dependencies via dpkg-shlibdeps for {binary_package.name} failed. Please"
                    " review the output from dpkg-shlibdeps above to understand what went wrong."
                )
class BinaryCtrlAccessorProvider(BinaryCtrlAccessorProviderBase):
    """Binary control accessor backed by `MaintscriptAccessorProvider`."""

    # `_maintscript` is already a slot of the base class. Declaring it again
    # here would shadow the inherited slot and leave it unused, so only the
    # attributes introduced by this class are listed.
    __slots__ = (
        "_maintscript_snippets",
        "_package_substitution",
    )

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        package_metadata_context: PackageProcessingContext,
        triggers: dict[tuple[DpkgTriggerType, str], PluginProvidedTrigger],
        substvars: FlushableSubstvars,
        maintscript_snippets: dict[str, MaintscriptSnippetContainer],
        package_substitution: Substitution,
        shlibs_details: tuple[str | None, list[str] | None],
        *,
        default_snippet_order: Literal["service"] | None = None,
    ) -> None:
        """Initialise the base state and eagerly create the maintscript accessor.

        :param maintscript_snippets: Snippet containers keyed by maintscript
          name; shared with the maintscript accessor.
        :param package_substitution: Substitution handed to the maintscript
          accessor.
        :param default_snippet_order: Snippet order for the eagerly created
          maintscript accessor.

        The remaining parameters are forwarded unchanged to the base class.
        """
        super().__init__(
            plugin_metadata,
            plugin_source_id,
            package_metadata_context,
            triggers,
            substvars,
            shlibs_details,
        )
        self._maintscript_snippets = maintscript_snippets
        self._package_substitution = package_substitution
        self._maintscript = MaintscriptAccessorProvider(
            plugin_metadata,
            plugin_source_id,
            maintscript_snippets,
            package_substitution,
            default_snippet_order=default_snippet_order,
        )

    def _create_maintscript_accessor(self) -> MaintscriptAccessor:
        """Create a maintscript accessor without a default snippet order."""
        return MaintscriptAccessorProvider(
            self._plugin_metadata,
            self._plugin_source_id,
            self._maintscript_snippets,
            self._package_substitution,
        )
class BinaryCtrlAccessorProviderCreator:
    """Factory for per-plugin `BinaryCtrlAccessorProvider` instances.

    All accessors created by one instance share the same trigger table,
    substvars, snippet containers and substitution.
    """

    def __init__(
        self,
        package_metadata_context: PackageProcessingContext,
        substvars: FlushableSubstvars,
        maintscript_snippets: dict[str, MaintscriptSnippetContainer],
        substitution: Substitution,
    ) -> None:
        """Store the shared state and start with an empty trigger table."""
        # Public attribute; its current value is handed to each accessor
        # created by `for_plugin`.
        self.shlibs_details: tuple[str | None, list[str] | None] = (None, None)
        self._triggers: dict[tuple[DpkgTriggerType, str], PluginProvidedTrigger] = {}
        self._package_metadata_context = package_metadata_context
        self._substvars = substvars
        self._maintscript_snippets = maintscript_snippets
        self._substitution = substitution

    def for_plugin(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        *,
        default_snippet_order: Literal["service"] | None = None,
    ) -> BinaryCtrlAccessor:
        """Create an accessor for the given plugin and source id.

        :param plugin_metadata: Metadata of the plugin the accessor is for.
        :param plugin_source_id: Source identifier passed to the accessor.
        :param default_snippet_order: Forwarded to the accessor.
        :return: A new `BinaryCtrlAccessorProvider`.
        """
        accessor = BinaryCtrlAccessorProvider(
            plugin_metadata,
            plugin_source_id,
            self._package_metadata_context,
            self._triggers,
            self._substvars,
            self._maintscript_snippets,
            self._substitution,
            self.shlibs_details,
            default_snippet_order=default_snippet_order,
        )
        return accessor

    def generated_triggers(self) -> Iterable[PluginProvidedTrigger]:
        """Return a view of the triggers registered so far."""
        registered = self._triggers
        return registered.values()
def _resolve_bundled_plugin_docs_path(
    plugin_name: str,
    loader: PluginInitializationEntryPoint | None,
) -> Traversable | Path | None:
    """Locate `<plugin_name>_docs.yaml` inside the package of `loader`'s module.

    :param plugin_name: Used as the prefix of the documentation file name.
    :param loader: Object whose `__module__` names an already imported module.
    :return: The resource path of the documentation file in that package.
    """
    module_name = loader.__module__
    assert module_name is not None
    package_name = sys.modules[module_name].__package__
    package_root = importlib.resources.files(package_name)
    return package_root.joinpath(f"{plugin_name}_docs.yaml")
def plugin_metadata_for_debputys_own_plugin(
    loader: PluginInitializationEntryPoint | None = None,
) -> DebputyPluginMetadata:
    """Build the metadata for the bundled "debputy" plugin.

    :param loader: The plugin initializer. When omitted,
      `initialize_debputy_features` is imported and used.
    :return: Metadata with plugin path `"<bundled>"`, API compat version 1 and
      no plugin loader.
    """
    if loader is None:
        from debputy.plugins.debputy.debputy_plugin import (
            initialize_debputy_features,
        )

        loader = initialize_debputy_features
    plugin_name = "debputy"

    def _doc_path() -> Traversable | Path | None:
        # Resolved only when called, not while the metadata is built.
        return _resolve_bundled_plugin_docs_path(plugin_name, loader)

    return DebputyPluginMetadata(
        plugin_name=plugin_name,
        api_compat_version=1,
        plugin_initializer=loader,
        plugin_loader=None,
        plugin_doc_path_resolver=_doc_path,
        plugin_path="<bundled>",
    )
def load_plugin_features(
    plugin_search_dirs: Sequence[str],
    substitution: Substitution,
    requested_plugins_only: Sequence[str] | None = None,
    required_plugins: set[str] | None = None,
    plugin_feature_set: PluginProvidedFeatureSet | None = None,
    debug_mode: bool = False,
) -> PluginProvidedFeatureSet:
    """Load debputy's own plugin plus the selected JSON plugins.

    Plugins found by automatic discovery are optional: when one fails to load
    it is unloaded and a warning is issued. Every other plugin failure is
    re-raised.

    :param plugin_search_dirs: Directories searched for JSON plugins.
    :param substitution: Substitution handed to every plugin initializer.
    :param requested_plugins_only: When given, only these plugins (plus the
      required ones) are loaded and automatic discovery is skipped.
    :param required_plugins: Plugins that must be loaded.
    :param plugin_feature_set: Feature set to populate; a new one is created
      when omitted.
    :param debug_mode: When true, failures of optional plugins are re-raised.
    :return: The populated feature set.
    """
    if plugin_feature_set is None:
        plugin_feature_set = PluginProvidedFeatureSet()
    plugins = [plugin_metadata_for_debputys_own_plugin()]
    unloadable_plugins = set()
    if required_plugins:
        plugins.extend(find_json_plugins(plugin_search_dirs, required_plugins))

    if requested_plugins_only is None:
        excluded = required_plugins if required_plugins is not None else frozenset()
        discovered = _find_all_json_plugins(
            plugin_search_dirs,
            excluded,
            debug_mode=debug_mode,
        )
        for found in discovered:
            plugins.append(found)
            # Only auto-discovered plugins may be deactivated on failure.
            unloadable_plugins.add(found.plugin_name)
    else:
        plugins.extend(find_json_plugins(plugin_search_dirs, requested_plugins_only))

    for plugin_metadata in plugins:
        api = DebputyPluginInitializerProvider(
            plugin_metadata, plugin_feature_set, substitution
        )
        try:
            api.load_plugin()
        except PluginBaseError as e:
            is_optional = plugin_metadata.plugin_name in unloadable_plugins
            if not is_optional:
                raise
            if debug_mode:
                _warn(
                    f"The optional plugin {plugin_metadata.plugin_name} failed during load. Re-raising due"
                    f" to --debug/-d or DEBPUTY_DEBUG=1"
                )
                raise
            try:
                api.unload_plugin()
            except Exception:
                _warn(
                    f"Failed to load optional {plugin_metadata.plugin_name} and an error was raised when trying to"
                    " clean up after the half-initialized plugin. Re-raising load error as the partially loaded"
                    " module might have tainted the feature set."
                )
                raise e from None
            # Reached only when the unload succeeded.
            _warn(
                f"The optional plugin {plugin_metadata.plugin_name} failed during load. The plugin was"
                f" deactivated. Use debug mode (--debug/DEBPUTY_DEBUG=1) to show the stacktrace"
                f" (the warning will become an error)"
            )

    return plugin_feature_set
def find_json_plugin(
    search_dirs: Sequence[str],
    requested_plugin: str,
) -> DebputyPluginMetadata:
    """Return the metadata of the single plugin named `requested_plugin`.

    :param search_dirs: Directories handed to `find_json_plugins`.
    :param requested_plugin: Name of the plugin to look up.
    :return: The only result produced by `find_json_plugins`.
    """
    matches = [*find_json_plugins(search_dirs, [requested_plugin])]
    assert len(matches) == 1
    return matches[0]
def find_related_implementation_files_for_plugin(
    plugin_metadata: DebputyPluginMetadata,
) -> list[str]:
    """List the implementation file that belongs to a JSON plugin.

    Bundled plugins and plugins installed into the Python path are reported
    via `_error` (presumably that does not return — confirm against `_error`).

    :param plugin_metadata: Metadata of the plugin to inspect.
    :return: A list holding the plugin's implementation file when that file
      exists, otherwise an empty list.
    """
    if plugin_metadata.is_bundled:
        plugin_name = plugin_metadata.plugin_name
        _error(
            f"Cannot run find related files for {plugin_name}: The plugin seems to be bundled"
            " or loaded via a mechanism that does not support detecting its tests."
        )

    if plugin_metadata.is_from_python_path:
        plugin_name = plugin_metadata.plugin_name
        # Maybe they could be, but that is for another day.
        _error(
            f"Cannot run find related files for {plugin_name}: The plugin is installed into python path"
            " and these are not supported."
        )
    files = []
    module_name, module_file = _find_plugin_implementation_file(
        plugin_metadata.plugin_name,
        plugin_metadata.plugin_path,
    )
    if os.path.isfile(module_file):
        files.append(module_file)
    else:
        # No implementation file next to the metadata: load the plugin and
        # check whether the module was imported anyway.
        if not plugin_metadata.is_loaded:
            plugin_metadata.load_plugin()
        if module_name in sys.modules:
            _error(
                f'The plugin {plugin_metadata.plugin_name} uses the "module" key in its'
                f" JSON metadata file ({plugin_metadata.plugin_path}) and cannot be"
                f" installed via this method. The related Python would not be installed"
                f" (which would result in a plugin that would fail to load)"
            )

    return files
def find_tests_for_plugin(
    plugin_metadata: DebputyPluginMetadata,
) -> list[str]:
    """Find the test files that sit next to a plugin's JSON descriptor.

    A file counts as a test for the plugin when it is a regular file in the
    same directory as the descriptor, its name starts with the plugin name
    (with "-" replaced by "_") and its name matches `PLUGIN_TEST_SUFFIX`.

    Bundled plugins and plugins found via the Python path are rejected with an
    error up front.

    :param plugin_metadata: Metadata of the plugin whose tests are wanted.
    :return: Paths of the matching test files, in directory scan order.
    """
    plugin_name = plugin_metadata.plugin_name
    plugin_path = plugin_metadata.plugin_path

    # NOTE(review): `_error` is presumably fatal (does not return) - confirm.
    if plugin_metadata.is_bundled:
        _error(
            f"Cannot run tests for {plugin_name}: The plugin seems to be bundled or loaded via a"
            " mechanism that does not support detecting its tests."
        )

    if plugin_metadata.is_from_python_path:
        # Maybe they could be, but that is for another day.
        _error(
            f"Cannot run tests for {plugin_name}: The plugin is installed into python path"
            " and these are not supported."
        )

    plugin_dir = os.path.dirname(plugin_path)
    test_basename_prefix = plugin_name.replace("-", "_")
    with os.scandir(plugin_dir) as dir_iter:
        return [
            p.path
            for p in dir_iter
            if p.is_file()
            and p.name.startswith(test_basename_prefix)
            and PLUGIN_TEST_SUFFIX.search(p.name)
        ]
def find_json_plugins(
    search_dirs: Sequence[str],
    requested_plugins: Iterable[str],
) -> Iterable[DebputyPluginMetadata]:
    """Locate and parse the JSON descriptor of every requested plugin.

    The descriptors are produced lazily, one per entry of `requested_plugins`
    and in the same order.

    :param search_dirs: Directories whose `debputy/plugins` subdirectory is
      searched (in order) for `<name>.json`.
    :param requested_plugins: Plugin names, or paths to plugin JSON files. Any
      entry containing "/" is treated as a path and is not searched for.
    :return: The parsed metadata of each requested plugin.
    :raises PluginNotFoundError: If a requested plugin cannot be found.
    """
    for plugin_name_or_path in requested_plugins:
        yield _find_requested_json_plugin(search_dirs, plugin_name_or_path)


def _find_requested_json_plugin(
    search_dirs: Sequence[str],
    plugin_name_or_path: str,
) -> DebputyPluginMetadata:
    """Resolve one plugin name/path to its parsed descriptor (see `find_json_plugins`)."""
    if "/" in plugin_name_or_path:
        if not os.path.isfile(plugin_name_or_path):
            raise PluginNotFoundError(
                f"Unable to load the plugin {plugin_name_or_path}: The path is not a file."
                ' (Because the plugin name contains "/", it is assumed to be a path and search path'
                " is not used.)"
            )
        return parse_json_plugin_desc(plugin_name_or_path)

    # First match in the search directories wins.
    for search_dir in search_dirs:
        path = os.path.join(
            search_dir, "debputy", "plugins", f"{plugin_name_or_path}.json"
        )
        if os.path.isfile(path):
            return parse_json_plugin_desc(path)

    # Fall back to the descriptors shipped as Python package resources.
    pp_path = PLUGIN_PYTHON_RES_PATH.joinpath(f"{plugin_name_or_path}.json")
    if pp_path.is_file():
        with pp_path.open() as fd:
            return parse_json_plugin_desc(
                f"PYTHONPATH:debputy/plugins/{pp_path.name}",
                fd=fd,
                is_from_python_path=True,
            )

    search_dir_str = ":".join(search_dirs)
    raise PluginNotFoundError(
        f"Unable to load the plugin {plugin_name_or_path}: Could not find {plugin_name_or_path}.json in the"
        f" debputy/plugins subdir of any of the search dirs ({search_dir_str})"
    )
def _find_all_json_plugins(
    search_dirs: Sequence[str],
    required_plugins: AbstractSet[str],
    debug_mode: bool = False,
) -> Iterable[DebputyPluginMetadata]:
    """Discover and parse every plugin descriptor that can be found.

    Scans `<search_dir>/debputy/plugins` of each search directory and then the
    `PLUGIN_PYTHON_RES_PATH` resources for `*.json` files. Each file name is
    only considered once; the first occurrence wins.

    Descriptors from the search directories that fail with a `PluginBaseError`
    are skipped with a warning (details only for the first failure), unless
    `debug_mode` is set, in which case the error propagates.

    :param search_dirs: Directories to scan; missing ones are ignored.
    :param required_plugins: Entries used to pre-seed the "already seen" set.
    :param debug_mode: Re-raise descriptor parse errors rather than warn.
    :return: The metadata of each descriptor that parsed successfully.
    """
    # NOTE(review): `seen` is compared against file names ("foo.json"); confirm
    # that the entries of `required_plugins` use the same form.
    seen = set(required_plugins)
    error_seen = False

    for search_dir in search_dirs:
        plugin_dir = os.path.join(search_dir, "debputy", "plugins")
        try:
            dir_fd = os.scandir(plugin_dir)
        except FileNotFoundError:
            continue
        with dir_fd:
            for entry in dir_fd:
                is_new_descriptor = (
                    entry.is_file(follow_symlinks=True)
                    and entry.name.endswith(".json")
                    and entry.name not in seen
                )
                if not is_new_descriptor:
                    continue
                seen.add(entry.name)
                try:
                    plugin_metadata = parse_json_plugin_desc(entry.path)
                except PluginBaseError as e:
                    if debug_mode:
                        raise
                    if error_seen:
                        _warn(
                            f"Failed to load plugin in {entry.path} due to errors (not shown)."
                        )
                    else:
                        # Only the first failure gets its details printed.
                        error_seen = True
                        _warn(
                            f"Failed to load the plugin in {entry.path} due to the following error: {e.message}"
                        )
                    continue
                yield plugin_metadata

    for pp_entry in PLUGIN_PYTHON_RES_PATH.iterdir():
        is_new_descriptor = (
            pp_entry.name.endswith(".json")
            and pp_entry.is_file()
            and pp_entry.name not in seen
        )
        if is_new_descriptor:
            seen.add(pp_entry.name)
            with pp_entry.open() as fd:
                yield parse_json_plugin_desc(
                    f"PYTHONPATH:debputy/plugins/{pp_entry.name}",
                    fd=fd,
                    is_from_python_path=True,
                )
def _find_plugin_implementation_file(
    plugin_name: str,
    json_file_path: str,
) -> tuple[str, str]:
    """Guess the module name and source file of a plugin's implementation.

    :param plugin_name: Name of the plugin; "-" is mapped to "_" to form the
      module basename.
    :param json_file_path: Path of the plugin's JSON descriptor; the source
      file is assumed to live in the same directory.
    :return: A `(module_name, module_fs_path)` pair. No check is made that the
      file exists.
    """
    guessed_module_basename = plugin_name.replace("-", "_")
    plugin_dir = os.path.dirname(json_file_path)
    return (
        f"debputy.plugins.{guessed_module_basename}",
        os.path.join(plugin_dir, f"{guessed_module_basename}.py"),
    )
def _resolve_module_initializer(
    plugin_name: str,
    plugin_initializer_name: str,
    module_name: str | None,
    json_file_path: str,
) -> PluginInitializationEntryPoint:
    """Import a plugin's Python module and return its initializer callable.

    Without an explicit `module_name`, the module is guessed with
    `_find_plugin_implementation_file` and, if that file exists, executed
    straight from the file (and registered in `sys.modules`). In every other
    case the module is imported by name via `importlib.import_module`.

    :param plugin_name: Name of the plugin (used for context and messages).
    :param plugin_initializer_name: Attribute of the module to look up.
    :param module_name: Explicit module name, or `None` to guess it.
    :param json_file_path: Path of the plugin's JSON descriptor.
    :return: The `initialize` method when the attribute is a
      `DebputyPluginDefinition`, otherwise the (callable) attribute itself.
    :raises PluginInitializationError: If the module cannot be loaded.
    :raises PluginMetadataError: If the explicitly named module is missing or
      the initializer attribute is absent or unusable.
    """
    resolved_module = None
    module_fs_path = None

    if module_name is None:
        module_name, module_fs_path = _find_plugin_implementation_file(
            plugin_name, json_file_path
        )
        if os.path.isfile(module_fs_path):
            module_spec = importlib.util.spec_from_file_location(
                module_name, module_fs_path
            )
            if module_spec is None:
                raise PluginInitializationError(
                    f"Failed to load {plugin_name} (path: {module_fs_path})."
                    " The spec_from_file_location function returned None."
                )
            fresh_module = importlib.util.module_from_spec(module_spec)
            module_loader = module_spec.loader
            if module_loader is None:
                raise PluginInitializationError(
                    f"Failed to load {plugin_name} (path: {module_fs_path})."
                    " Python could not find a suitable loader (spec.loader was None)"
                )
            # The module is registered before its code is executed.
            sys.modules[module_name] = fresh_module
            try:
                run_in_context_of_plugin(
                    plugin_name, module_loader.exec_module, fresh_module
                )
            except (Exception, GeneratorExit) as e:
                raise PluginInitializationError(
                    f"Failed to load {plugin_name} (path: {module_fs_path})."
                    " The module threw an exception while being loaded."
                ) from e
            resolved_module = fresh_module

    if resolved_module is None:
        # Either an explicit module name was given or the guessed file is absent.
        try:
            resolved_module = run_in_context_of_plugin(
                plugin_name, importlib.import_module, module_name
            )
        except ModuleNotFoundError as e:
            if module_fs_path is not None:
                raise PluginInitializationError(
                    f"Failed to load {plugin_name}. Tried loading it from"
                    f' "{module_fs_path}" (which did not exist) and PYTHONPATH as'
                    f" {module_name} (where it was not found either). Please ensure"
                    " the module code is installed in the correct spot or provide an"
                    f' explicit "module" definition in {json_file_path}.'
                ) from e
            raise PluginMetadataError(
                f'The plugin defined in "{json_file_path}" wanted to load the module "{module_name}", but'
                " this module is not available in the python search path"
            ) from e

    plugin_initializer = run_in_context_of_plugin_wrap_errors(
        plugin_name,
        getattr,
        resolved_module,
        plugin_initializer_name,
        None,
    )

    if plugin_initializer is None:
        raise PluginMetadataError(
            f'The plugin defined in {json_file_path} claimed that module "{module_name}" would have an'
            f' attribute called "{plugin_initializer_name}" to initialize the plugin. However, that attribute'
            " does not exist or cannot be resolved. Please correct the plugin metadata or initializer name"
            " in the Python module."
        )
    if isinstance(plugin_initializer, DebputyPluginDefinition):
        return plugin_initializer.initialize
    if callable(plugin_initializer):
        return cast("PluginInitializationEntryPoint", plugin_initializer)
    raise PluginMetadataError(
        f'The plugin defined in {json_file_path} claimed that module "{module_name}" would have an'
        f' attribute called "{plugin_initializer_name}" for initializing the plugin. While that'
        " attribute exists, it is neither a `DebputyPluginDefinition`"
        " (`plugin_definition = define_debputy_plugin()`) nor is it `callable`"
        " (`def initialize(api: DebputyPluginInitializer) -> None:`)."
    )
def _json_plugin_loader(
    plugin_name: str,
    plugin_json_metadata: PluginJsonMetadata,
    json_file_path: str,
    attribute_path: AttributePath,
) -> Callable[["DebputyPluginInitializer"], None]:
    """Build the initializer callable for a plugin described by JSON metadata.

    Up to four initializers are assembled, in this order: the Python entry
    point (when "plugin_initializer" is set), the known packaging files, the
    manifest variables and the packager provided files declared in the JSON.
    The declarative sections are parsed right away; registering them with the
    plugin API is deferred until the returned callable is invoked.

    :param plugin_name: Name of the plugin.
    :param plugin_json_metadata: The parsed JSON metadata of the plugin.
    :param json_file_path: Path of the JSON descriptor (used in messages and
      to locate the implementation module).
    :param attribute_path: Attribute path for the root of the JSON document;
      used to point at the offending entry in error messages.
    :return: The sole initializer when only one exists, otherwise a callable
      that runs all of them in order.
    :raises PluginMetadataError: On an unsupported API compat level, a dotted
      initializer name, or when the metadata provides no features at all.
    """
    api_compat = plugin_json_metadata["api_compat_version"]
    module_name = plugin_json_metadata.get("module")
    plugin_initializer_name = plugin_json_metadata.get("plugin_initializer")
    packager_provided_files_raw = plugin_json_metadata.get(
        "packager_provided_files", []
    )
    manifest_variables_raw = plugin_json_metadata.get("manifest_variables")
    known_packaging_files_raw = plugin_json_metadata.get("known_packaging_files")
    if api_compat != 1:
        raise PluginMetadataError(
            f'The plugin defined in "{json_file_path}" requires API compat level {api_compat}, but this'
            f" version of debputy only supports API compat version of 1"
        )
    if plugin_initializer_name is not None and "." in plugin_initializer_name:
        p = attribute_path["plugin_initializer"]
        raise PluginMetadataError(
            f'The "{p}" must not contain ".". Problematic file is "{json_file_path}".'
        )

    plugin_initializers = []

    if plugin_initializer_name is not None:
        plugin_initializer = _resolve_module_initializer(
            plugin_name,
            plugin_initializer_name,
            module_name,
            json_file_path,
        )
        plugin_initializers.append(plugin_initializer)

    if known_packaging_files_raw:
        kpf_root_path = attribute_path["known_packaging_files"]
        known_packaging_files = []
        for k, v in enumerate(known_packaging_files_raw):
            kpf_path = kpf_root_path[k]
            # Only peek into the entry when it has the expected shape; anything
            # else is left for the parser below to reject.
            p = v.get("path") if isinstance(v, Mapping) else None
            if isinstance(p, str):
                kpf_path.path_hint = p
            if plugin_name.startswith("debputy-") and isinstance(v, dict):
                # For "debputy-*" plugins, expand the doc root placeholder in
                # the documentation URIs before parsing.
                docs = v.get("documentation-uris")
                if isinstance(docs, list):
                    v["documentation-uris"] = [
                        (
                            d.replace("@DEBPUTY_DOC_ROOT_DIR@", DEBPUTY_DOC_ROOT_DIR)
                            if isinstance(d, str)
                            else d
                        )
                        for d in docs
                    ]
            known_packaging_file: KnownPackagingFileInfo = (
                PLUGIN_KNOWN_PACKAGING_FILES_PARSER.parse_input(
                    v,
                    kpf_path,
                )
            )
            known_packaging_files.append((kpf_path, known_packaging_file))

        def _initialize_json_provided_known_packaging_files(
            api: DebputyPluginInitializerProvider,
        ) -> None:
            for p, details in known_packaging_files:
                try:
                    api.known_packaging_files(details)
                except ValueError as ex:
                    raise PluginMetadataError(
                        f"Error while processing {p.path} defined in {json_file_path}: {ex.args[0]}"
                    ) from ex

        plugin_initializers.append(_initialize_json_provided_known_packaging_files)

    if manifest_variables_raw:
        manifest_var_path = attribute_path["manifest_variables"]
        manifest_variables = [
            PLUGIN_MANIFEST_VARS_PARSER.parse_input(p, manifest_var_path[i])
            for i, p in enumerate(manifest_variables_raw)
        ]

        def _initialize_json_provided_manifest_vars(
            api: DebputyPluginInitializer,
        ) -> None:
            for idx, manifest_variable in enumerate(manifest_variables):
                name = manifest_variable["name"]
                value = manifest_variable["value"]
                doc = manifest_variable.get("reference_documentation")
                try:
                    api.manifest_variable(
                        name, value, variable_reference_documentation=doc
                    )
                except ValueError as ex:
                    var_path = manifest_var_path[idx]
                    raise PluginMetadataError(
                        f"Error while processing {var_path.path} defined in {json_file_path}: {ex.args[0]}"
                    ) from ex

        plugin_initializers.append(_initialize_json_provided_manifest_vars)

    if packager_provided_files_raw:
        ppf_path = attribute_path["packager_provided_files"]
        ppfs = [
            PLUGIN_PPF_PARSER.parse_input(p, ppf_path[i])
            for i, p in enumerate(packager_provided_files_raw)
        ]

        def _initialize_json_provided_ppfs(api: DebputyPluginInitializer) -> None:
            ppf: PackagerProvidedFileJsonDescription
            for idx, ppf in enumerate(ppfs):
                # `c` holds the remaining attributes, passed on as keyword arguments.
                c = dict(ppf)
                stem = ppf["stem"]
                installed_path = ppf["installed_path"]
                default_mode = ppf.get("default_mode")
                ref_doc_dict = ppf.get("reference_documentation")
                if default_mode is not None:
                    c["default_mode"] = default_mode.octal_mode

                if ref_doc_dict is not None:
                    ref_doc = packager_provided_file_reference_documentation(
                        **ref_doc_dict
                    )
                else:
                    ref_doc = None

                # These are passed explicitly below and must not be repeated in `c`.
                for k in ("stem", "installed_path", "reference_documentation"):
                    c.pop(k, None)

                try:
                    api.packager_provided_file(stem, installed_path, reference_documentation=ref_doc, **c)  # type: ignore
                except ValueError as ex:
                    p_path = ppf_path[idx]
                    raise PluginMetadataError(
                        f"Error while processing {p_path.path} defined in {json_file_path}: {ex.args[0]}"
                    ) from ex

        plugin_initializers.append(_initialize_json_provided_ppfs)

    if not plugin_initializers:
        raise PluginMetadataError(
            f"The plugin defined in {json_file_path} does not seem to provide features,"
            f" such as module + plugin-initializer or packager-provided-files."
        )

    if len(plugin_initializers) == 1:
        return plugin_initializers[0]

    def _chain_loader(api: DebputyPluginInitializer) -> None:
        for initializer in plugin_initializers:
            initializer(api)

    return _chain_loader
2198@overload
2199@contextlib.contextmanager
2200def _open( 2200 ↛ exitline 2200 didn't return from function '_open' because
2201 path: str,
2202 fd: IO[AnyStr] | IOBase = ...,
2203) -> Iterator[IO[AnyStr] | IOBase]: ...
2206@overload
2207@contextlib.contextmanager
2208def _open(path: str, fd: None = None) -> Iterator[IO[bytes]]: ... 2208 ↛ exitline 2208 didn't return from function '_open' because
2211@contextlib.contextmanager
2212def _open(
2213 path: str, fd: IO[AnyStr] | IOBase | None = None
2214) -> Iterator[IO[AnyStr] | IOBase]:
2215 if fd is not None:
2216 yield fd
2217 else:
2218 with open(path, "rb") as fd:
2219 yield fd
2222def _resolve_json_plugin_docs_path(
2223 plugin_name: str,
2224 plugin_path: str,
2225) -> Traversable | Path | None:
2226 plugin_dir = os.path.dirname(plugin_path)
2227 return Path(os.path.join(plugin_dir, plugin_name + "_docs.yaml"))
def parse_json_plugin_desc(
    path: str,
    *,
    fd: IO[AnyStr] | IOBase | None = None,
    is_from_python_path: bool = False,
) -> DebputyPluginMetadata:
    """Parse a plugin's JSON descriptor into a `DebputyPluginMetadata`.

    The plugin name is the basename of `path` minus a trailing ".json" or
    ".json.in". The plugin itself is not loaded here; the metadata only
    carries callables that do so on demand.

    :param path: Path of the descriptor. Also used for naming and messages
      when the content is read from `fd`.
    :param fd: Optional open handle to read the JSON from instead of `path`.
    :param is_from_python_path: Passed through to the metadata.
    :return: The metadata describing the plugin.
    :raises PluginMetadataError: If the content is not valid JSON, does not
      match the metadata format, or the plugin would be named "debputy".
    """
    with _open(path, fd=fd) as rfd:
        try:
            raw = json.load(rfd)
        except JSONDecodeError as e:
            raise PluginMetadataError(
                f'The plugin defined in "{path}" could not be parsed as valid JSON: {e.args[0]}'
            ) from e

    plugin_name = os.path.basename(path)
    for known_suffix in (".json", ".json.in"):
        if plugin_name.endswith(known_suffix):
            plugin_name = plugin_name[: -len(known_suffix)]
            break

    if plugin_name == "debputy":
        # Provide a better error message than "The plugin has already loaded!?"
        raise PluginMetadataError(
            f'The plugin named {plugin_name} must be bundled with `debputy`. Please rename "{path}" so it does not'
            f" clash with the bundled plugin of same name."
        )

    attribute_path = AttributePath.root_path(raw)

    try:
        plugin_json_metadata = PLUGIN_METADATA_PARSER.parse_input(
            raw,
            attribute_path,
        )
    except ManifestParseException as e:
        raise PluginMetadataError(
            f'The plugin defined in "{path}" was valid JSON but could not be parsed: {e.message}'
        ) from e

    def _deferred_loader():
        # Resolving the initializer is postponed until the plugin is loaded.
        return _json_plugin_loader(
            plugin_name,
            plugin_json_metadata,
            path,
            attribute_path,
        )

    def _deferred_docs_path():
        return _resolve_json_plugin_docs_path(plugin_name, path)

    return DebputyPluginMetadata(
        plugin_name=plugin_name,
        plugin_loader=_deferred_loader,
        api_compat_version=plugin_json_metadata["api_compat_version"],
        plugin_doc_path_resolver=_deferred_docs_path,
        plugin_initializer=None,
        plugin_path=path,
        is_from_python_path=is_from_python_path,
    )
@dataclasses.dataclass(slots=True, frozen=True)
class ServiceDefinitionImpl(ServiceDefinition[DSD]):
    """Frozen, slotted dataclass implementation of `ServiceDefinition`.

    Instances are immutable; use `replace` to derive a modified copy.
    """

    # NOTE: The field order is significant for positional construction.
    name: str
    names: Sequence[str]
    path: VirtualPath
    type_of_service: str
    service_scope: str
    auto_enable_on_install: bool
    auto_start_on_install: bool
    on_upgrade: ServiceUpgradeRule
    definition_source: str
    is_plugin_provided_definition: bool
    service_context: DSD | None

    def replace(self, **changes: Any) -> "ServiceDefinitionImpl[DSD]":
        """Return a copy of this definition with the given fields replaced.

        :param changes: Field names mapped to their new values (forwarded to
          `dataclasses.replace`).
        :return: A new instance; this instance is left untouched.
        """
        return dataclasses.replace(self, **changes)
class ServiceRegistryImpl(ServiceRegistry[DSD]):
    """Collects the service definitions registered by one service manager.

    Every (name, type, scope) combination may only be registered once per
    registry instance.
    """

    __slots__ = ("_service_manager_details", "_service_definitions", "_seen_services")

    def __init__(self, service_manager_details: ServiceManagerDetails) -> None:
        """Create an empty registry for the given service manager.

        :param service_manager_details: Details of the service manager doing
          the registrations; its plugin name is used in messages.
        """
        self._service_manager_details = service_manager_details
        self._service_definitions: list[ServiceDefinition[DSD]] = []
        # Keys are (name, type_of_service, service_scope).
        self._seen_services: set[tuple[str, str, str]] = set()

    @property
    def detected_services(self) -> Sequence[ServiceDefinition[DSD]]:
        """The service definitions registered so far, in registration order."""
        return self._service_definitions

    def register_service(
        self,
        path: VirtualPath,
        name: str | list[str],
        *,
        type_of_service: str = "service",  # "timer", etc.
        service_scope: str = "system",
        enable_by_default: bool = True,
        start_by_default: bool = True,
        default_upgrade_rule: ServiceUpgradeRule = "restart",
        service_context: DSD | None = None,
    ) -> None:
        """Register a service provided by `path`.

        :param path: The path that provides the service.
        :param name: The name of the service, or a non-empty list of names. The
          first name becomes the primary name of the definition.
        :param type_of_service: Type of the service ("service", "timer", etc.).
        :param service_scope: Scope of the service.
        :param enable_by_default: Stored as `auto_enable_on_install`.
        :param start_by_default: Stored as `auto_start_on_install`.
        :param default_upgrade_rule: Stored as `on_upgrade`.
        :param service_context: Optional context stored with the definition.
        :raises ValueError: If `name` is an empty list.
        :raises PluginAPIViolationError: If a name was already registered with
          the same type and scope.
        """
        names = name if isinstance(name, list) else [name]
        if not names:
            raise ValueError(
                f"The service must have at least one name - {path.absolute} did not have any"
            )
        for n in names:
            key = (n, type_of_service, service_scope)
            if key in self._seen_services:
                raise PluginAPIViolationError(
                    f"The service manager (from {self._service_manager_details.plugin_metadata.plugin_name}) used"
                    f" the service name {n} (type: {type_of_service}, scope: {service_scope}) twice. This is not"
                    " allowed by the debputy plugin API."
                )
            # Remember the key; without this the duplicate check above can never trigger.
            self._seen_services.add(key)
        # TODO: We cannot create a service definition immediate once the manifest is involved
        self._service_definitions.append(
            ServiceDefinitionImpl(
                names[0],
                names,
                path,
                type_of_service,
                service_scope,
                enable_by_default,
                start_by_default,
                default_upgrade_rule,
                f"Auto-detected by plugin {self._service_manager_details.plugin_metadata.plugin_name}",
                True,
                service_context,
            )
        )