Coverage for src/debputy/plugin/api/impl.py: 60%
908 statements
« prev ^ index » next coverage.py v7.8.2, created at 2026-06-06 10:55 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2026-06-06 10:55 +0000
1import contextlib
2import dataclasses
3import functools
4import importlib
5import importlib.resources
6import importlib.util
7import inspect
8import itertools
9import json
10import os
11import re
12import subprocess
13import sys
14from abc import ABC
15from collections.abc import Callable, Iterable, Sequence, Iterator, Mapping, Container
16from importlib.resources.abc import Traversable
17from io import IOBase
18from json import JSONDecodeError
19from pathlib import Path
20from types import NoneType
21from typing import (
22 IO,
23 AbstractSet,
24 cast,
25 Any,
26 Literal,
27 TYPE_CHECKING,
28 is_typeddict,
29 AnyStr,
30 overload,
31)
33import debputy
34from debputy.exceptions import (
35 DebputySubstitutionError,
36 PluginConflictError,
37 PluginMetadataError,
38 PluginBaseError,
39 PluginInitializationError,
40 PluginAPIViolationError,
41 PluginNotFoundError,
42 PluginIncorrectRegistrationError,
43)
44from debputy.maintscript_snippet import (
45 STD_CONTROL_SCRIPTS,
46 MaintscriptSnippetContainer,
47 MaintscriptSnippet,
48 SnippetResolver,
49)
50from debputy.manifest_parser.exceptions import ManifestParseException
51from debputy.manifest_parser.parser_data import ParserContextData
52from debputy.manifest_parser.tagging_types import TypeMapping
53from debputy.manifest_parser.util import AttributePath
54from debputy.plugin.api.doc_parsing import (
55 DEBPUTY_DOC_REFERENCE_DATA_PARSER,
56 parser_type_name,
57 DebputyParsedDoc,
58)
59from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
60from debputy.plugin.api.impl_types import (
61 DebputyPluginMetadata,
62 PackagerProvidedFileClassSpec,
63 MetadataOrMaintscriptDetector,
64 PluginProvidedTrigger,
65 TTP,
66 DIPHandler,
67 PF,
68 SF,
69 DIPKWHandler,
70 PluginProvidedManifestVariable,
71 PluginProvidedPackageProcessor,
72 PluginProvidedDiscardRule,
73 AutomaticDiscardRuleExample,
74 PPFFormatParam,
75 ServiceManagerDetails,
76 KnownPackagingFileInfo,
77 PluginProvidedKnownPackagingFile,
78 DHCompatibilityBasedRule,
79 PluginProvidedTypeMapping,
80 PluginProvidedBuildSystemAutoDetection,
81 BSR,
82 TP,
83)
84from debputy.plugin.api.plugin_parser import (
85 PLUGIN_METADATA_PARSER,
86 PluginJsonMetadata,
87 PLUGIN_PPF_PARSER,
88 PackagerProvidedFileJsonDescription,
89 PLUGIN_MANIFEST_VARS_PARSER,
90 PLUGIN_KNOWN_PACKAGING_FILES_PARSER,
91)
92from debputy.plugin.api.spec import (
93 MaintscriptAccessor,
94 Maintscript,
95 DpkgTriggerType,
96 BinaryCtrlAccessor,
97 PackageProcessingContext,
98 MetadataAutoDetector,
99 PluginInitializationEntryPoint,
100 DebputyPluginInitializer,
101 FlushableSubstvars,
102 ParserDocumentation,
103 PackageProcessor,
104 VirtualPath,
105 ServiceIntegrator,
106 ServiceDetector,
107 ServiceRegistry,
108 ServiceDefinition,
109 DSD,
110 ServiceUpgradeRule,
111 PackagerProvidedFileReferenceDocumentation,
112 packager_provided_file_reference_documentation,
113 TypeMappingDocumentation,
114 DebputyIntegrationMode,
115 _DEBPUTY_DISPATCH_METADATA_ATTR_NAME,
116 BuildSystemManifestRuleMetadata,
117 INTEGRATION_MODE_FULL,
118 only_integrations,
119 DebputyPluginDefinition,
120)
121from debputy.plugin.api.std_docs import _STD_ATTR_DOCS
122from debputy.plugin.plugin_state import (
123 run_in_context_of_plugin,
124 run_in_context_of_plugin_wrap_errors,
125 wrap_plugin_code,
126 register_manifest_type_value_in_context,
127)
128from debputy.plugins.debputy.to_be_api_types import (
129 BuildRuleParsedFormat,
130 BSPF,
131 debputy_build_system,
132)
133from debputy.substitution import (
134 Substitution,
135 VariableNameState,
136 SUBST_VAR_RE,
137 VariableContext,
138)
139from debputy.util import (
140 _normalize_path,
141 POSTINST_DEFAULT_CONDITION,
142 _error,
143 print_command,
144 _warn,
145 _debug_log,
146 PackageTypeSelector,
147)
148from debputy.version import debputy_doc_root_dir
149from debputy.yaml import MANIFEST_YAML
151if TYPE_CHECKING:
152 from debputy.highlevel_manifest import HighLevelManifest
# Matches the tail of a plugin test module file name: "_t.py", "_test.py" or
# "_check.py", optionally with an extra "_<tag>" part (lowercase letters,
# digits and underscores; captured as group 1) right before the ".py".
PLUGIN_TEST_SUFFIX = re.compile(r"_(?:t|test|check)(?:_([a-z0-9_]+))?[.]py$")
# Resource root (as returned by importlib.resources.files) of the
# `debputy.plugins` package.
PLUGIN_PYTHON_RES_PATH = importlib.resources.files(debputy.plugins.__name__)
def _validate_known_packaging_file_dh_compat_rules(
    dh_compat_rules: list[DHCompatibilityBasedRule] | None,
) -> None:
    """Sanity-check the debhelper compat rules declared for a packaging file.

    The rules are checked in list order. Every rule must carry at least one
    key besides its version/compat selectors, every rule except the last must
    have a selector, a compat level must not be negative, compat levels must
    be strictly decreasing through the list, and an `install_pattern` must
    already be in normalized form.

    :param dh_compat_rules: The rules to validate. `None` and the empty list
      are accepted without further checks.
    :raises ValueError: If a rule violates any of the constraints above.
    """
    if not dh_compat_rules:
        return

    # Keys that only decide *when* a rule applies. A rule made up solely of
    # these keys changes nothing. The selector actually read below is
    # "starting_with_debhelper_version"; "after_debhelper_version" is kept so
    # rules using that spelling are still treated as selector-only.
    selector_keys = {
        "after_debhelper_version",
        "starting_with_debhelper_version",
        "starting_with_compat_level",
    }
    max_compat = None
    last_idx = len(dh_compat_rules) - 1
    dh_compat_rule: DHCompatibilityBasedRule
    for idx, dh_compat_rule in enumerate(dh_compat_rules):
        dh_version = dh_compat_rule.get("starting_with_debhelper_version")
        compat = dh_compat_rule.get("starting_with_compat_level")

        remaining = dh_compat_rule.keys() - selector_keys
        if not remaining:
            raise ValueError(
                f"The dh compat-rule at index {idx} does not affect anything / not have any rules!? So why have it?"
            )
        # Only the final rule may act as the selector-less fallback.
        if dh_version is None and compat is None and idx < last_idx:
            raise ValueError(
                f"The dh compat-rule at index {idx} is not the last and is missing either"
                " before-debhelper-version or before-compat-level"
            )
        if compat is not None and compat < 0:
            raise ValueError(
                f"There is no compat below 1 but dh compat-rule at {idx} wants to declare some rule"
                f" for something that appeared when migrating from {compat} to {compat + 1}."
            )

        # Track the most recently seen compat level; later rules must declare
        # a strictly lower one.
        if max_compat is None:
            max_compat = compat
        elif compat is not None:
            if compat >= max_compat:
                raise ValueError(
                    f"The dh compat-rule at {idx} should be moved earlier than the entry for compat {max_compat}."
                )
            max_compat = compat

        install_pattern = dh_compat_rule.get("install_pattern")
        if install_pattern is not None:
            normalized = _normalize_path(install_pattern, with_prefix=False)
            if normalized != install_pattern:
                raise ValueError(
                    f"The install-pattern in dh compat-rule at {idx} must be normalized as"
                    f' "{normalized}".'
                )
208class DebputyPluginInitializerProvider(DebputyPluginInitializer):
209 __slots__ = (
210 "_plugin_metadata",
211 "_feature_set",
212 "_plugin_detector_ids",
213 "_substitution",
214 "_unloaders",
215 "_is_doc_cache_resolved",
216 "_doc_cache",
217 "_registered_manifest_types",
218 "_load_started",
219 )
221 def __init__(
222 self,
223 plugin_metadata: DebputyPluginMetadata,
224 feature_set: PluginProvidedFeatureSet,
225 substitution: Substitution,
226 ) -> None:
227 self._plugin_metadata: DebputyPluginMetadata = plugin_metadata
228 self._feature_set = feature_set
229 self._plugin_detector_ids: set[str] = set()
230 self._substitution = substitution
231 self._unloaders: list[Callable[[], None]] = []
232 self._is_doc_cache_resolved: bool = False
233 self._doc_cache: DebputyParsedDoc | None = None
234 self._registered_manifest_types: dict[type[Any], DebputyPluginMetadata] = {}
235 self._load_started = False
237 @property
238 def plugin_metadata(self) -> DebputyPluginMetadata:
239 return self._plugin_metadata
241 def unload_plugin(self) -> None:
242 if self._load_started:
243 for unloader in self._unloaders:
244 unloader()
245 del self._feature_set.plugin_data[self._plugin_name]
247 def load_plugin(self) -> None:
248 metadata = self._plugin_metadata
249 if metadata.plugin_name in self._feature_set.plugin_data: 249 ↛ 250line 249 didn't jump to line 250 because the condition on line 249 was never true
250 raise PluginConflictError(
251 f'The plugin "{metadata.plugin_name}" has already been loaded!?',
252 metadata,
253 metadata,
254 )
255 assert (
256 metadata.api_compat_version == 1
257 ), f"Unsupported plugin API compat version {metadata.api_compat_version}"
258 self._feature_set.plugin_data[metadata.plugin_name] = metadata
259 self._load_started = True
260 assert not metadata.is_initialized
261 try:
262 metadata.initialize_plugin(self)
263 except Exception as e:
264 initializer = metadata.plugin_initializer
265 if ( 265 ↛ 270line 265 didn't jump to line 270 because the condition on line 265 was never true
266 isinstance(e, TypeError)
267 and initializer is not None
268 and not callable(initializer)
269 ):
270 raise PluginMetadataError(
271 f"The specified entry point for plugin {metadata.plugin_name} does not appear to be a"
272 f" callable (callable returns False). The specified entry point identifies"
273 f' itself as "{initializer.__qualname__}".'
274 ) from e
275 if isinstance(e, PluginBaseError): 275 ↛ 277line 275 didn't jump to line 277 because the condition on line 275 was always true
276 raise
277 raise PluginInitializationError(
278 f"Exception while attempting to load plugin {metadata.plugin_name}"
279 ) from e
281 def _resolve_docs(self) -> DebputyParsedDoc | None:
282 doc_cache = self._doc_cache
283 if doc_cache is not None:
284 return doc_cache
286 plugin_doc_path = self._plugin_metadata.plugin_doc_path
287 if plugin_doc_path is None or self._is_doc_cache_resolved:
288 self._is_doc_cache_resolved = True
289 return None
290 try:
291 with plugin_doc_path.open("r", encoding="utf-8") as fd:
292 raw = MANIFEST_YAML.load(fd)
293 except FileNotFoundError:
294 _debug_log(
295 f"No documentation file found for {self._plugin_name}. Expected it at {plugin_doc_path}"
296 )
297 self._is_doc_cache_resolved = True
298 return None
299 attr_path = AttributePath.root_path(plugin_doc_path)
300 try:
301 ref = DEBPUTY_DOC_REFERENCE_DATA_PARSER.parse_input(raw, attr_path)
302 except ManifestParseException as e:
303 raise ValueError(
304 f"Could not parse documentation in {plugin_doc_path}: {e.message}"
305 ) from e
306 try:
307 res = DebputyParsedDoc.from_ref_data(ref)
308 except ValueError as e:
309 raise ValueError(
310 f"Could not parse documentation in {plugin_doc_path}: {e.args[0]}"
311 ) from e
313 self._doc_cache = res
314 self._is_doc_cache_resolved = True
315 return res
317 def _pluggable_manifest_docs_for(
318 self,
319 rule_type: TTP | str,
320 rule_name: str | list[str],
321 *,
322 inline_reference_documentation: ParserDocumentation | None = None,
323 ) -> ParserDocumentation | None:
324 ref_data = self._resolve_docs()
325 if ref_data is not None:
326 primary_rule_name = (
327 rule_name if isinstance(rule_name, str) else rule_name[0]
328 )
329 rule_ref = f"{parser_type_name(rule_type)}::{primary_rule_name}"
330 resolved_docs = ref_data.pluggable_manifest_rules.get(rule_ref)
331 if resolved_docs is not None:
332 if inline_reference_documentation is not None: 332 ↛ 333line 332 didn't jump to line 333 because the condition on line 332 was never true
333 raise ValueError(
334 f"Conflicting docs for {rule_ref}: Was provided one in the API call and one via"
335 f" {self._plugin_metadata.plugin_doc_path}. Please remove one of the two, so"
336 f" there is only one doc reference"
337 )
338 return resolved_docs
339 return inline_reference_documentation
341 def packager_provided_file(
342 self,
343 stem: str,
344 installed_path: str,
345 *,
346 default_mode: int = 0o0644,
347 default_priority: int | None = None,
348 allow_name_segment: bool = True,
349 allow_architecture_segment: bool = False,
350 post_formatting_rewrite: Callable[[str], str] | None = None,
351 packageless_is_fallback_for_all_packages: bool = False,
352 package_types: PackageTypeSelector = PackageTypeSelector.ALL,
353 reservation_only: bool = False,
354 format_callback: None | (
355 Callable[[str, PPFFormatParam, VirtualPath], str]
356 ) = None,
357 reference_documentation: None | (
358 PackagerProvidedFileReferenceDocumentation
359 ) = None,
360 ) -> None:
361 packager_provided_files = self._feature_set.packager_provided_files
362 existing = packager_provided_files.get(stem)
364 if format_callback is not None and self._plugin_name != "debputy": 364 ↛ 365line 364 didn't jump to line 365 because the condition on line 364 was never true
365 raise ValueError(
366 "Sorry; Using format_callback is a debputy-internal"
367 f" API. Triggered by plugin {self._plugin_name}"
368 )
370 if installed_path.endswith("/"): 370 ↛ 371line 370 didn't jump to line 371 because the condition on line 370 was never true
371 raise ValueError(
372 f'The installed_path ends with "/" indicating it is a directory, but it must be a file.'
373 f" Triggered by plugin {self._plugin_name}."
374 )
376 installed_path = _normalize_path(installed_path)
378 has_name_var = "{name}" in installed_path
380 if installed_path.startswith("./DEBIAN") or reservation_only:
381 # Special-case, used for control files.
382 if self._plugin_name != "debputy": 382 ↛ 383line 382 didn't jump to line 383 because the condition on line 382 was never true
383 raise ValueError(
384 "Sorry; Using DEBIAN as install path or/and reservation_only is a debputy-internal"
385 f" API. Triggered by plugin {self._plugin_name}"
386 )
387 elif not has_name_var and "{owning_package}" not in installed_path: 387 ↛ 388line 387 didn't jump to line 388 because the condition on line 387 was never true
388 raise ValueError(
389 'The installed_path must contain a "{name}" (preferred) or a "{owning_package}"'
390 " substitution (or have installed_path end with a slash). Otherwise, the installed"
391 f" path would caused file-conflicts. Triggered by plugin {self._plugin_name}"
392 )
394 if allow_name_segment and not has_name_var: 394 ↛ 395line 394 didn't jump to line 395 because the condition on line 394 was never true
395 raise ValueError(
396 'When allow_name_segment is True, the installed_path must have a "{name}" substitution'
397 " variable. Otherwise, the name segment will not work properly. Triggered by"
398 f" plugin {self._plugin_name}"
399 )
401 if ( 401 ↛ 406line 401 didn't jump to line 406 because the condition on line 401 was never true
402 default_priority is not None
403 and "{priority}" not in installed_path
404 and "{priority:02}" not in installed_path
405 ):
406 raise ValueError(
407 'When default_priority is not None, the installed_path should have a "{priority}"'
408 ' or a "{priority:02}" substitution variable. Otherwise, the priority would be lost.'
409 f" Triggered by plugin {self._plugin_name}"
410 )
412 if existing is not None:
413 if existing.debputy_plugin_metadata.plugin_name != self._plugin_name: 413 ↛ 420line 413 didn't jump to line 420 because the condition on line 413 was always true
414 message = (
415 f'The stem "{stem}" is registered twice for packager provided files.'
416 f" Once by {existing.debputy_plugin_metadata.plugin_name} and once"
417 f" by {self._plugin_name}"
418 )
419 else:
420 message = (
421 f"Bug in the plugin {self._plugin_name}: It tried to register the"
422 f' stem "{stem}" twice for packager provided files.'
423 )
424 raise PluginConflictError(
425 message, existing.debputy_plugin_metadata, self._plugin_metadata
426 )
427 packager_provided_files[stem] = PackagerProvidedFileClassSpec(
428 self._plugin_metadata,
429 stem,
430 installed_path,
431 default_mode=default_mode,
432 default_priority=default_priority,
433 allow_name_segment=allow_name_segment,
434 allow_architecture_segment=allow_architecture_segment,
435 post_formatting_rewrite=post_formatting_rewrite,
436 packageless_is_fallback_for_all_packages=packageless_is_fallback_for_all_packages,
437 package_types=package_types,
438 reservation_only=reservation_only,
439 formatting_callback=format_callback,
440 reference_documentation=reference_documentation,
441 )
443 def _unload() -> None:
444 del packager_provided_files[stem]
446 self._unloaders.append(_unload)
448 def metadata_or_maintscript_detector(
449 self,
450 auto_detector_id: str,
451 auto_detector: MetadataAutoDetector,
452 *,
453 package_types: PackageTypeSelector = PackageTypeSelector.DEB,
454 ) -> None:
455 if auto_detector_id in self._plugin_detector_ids: 455 ↛ 456line 455 didn't jump to line 456 because the condition on line 455 was never true
456 raise ValueError(
457 f"The plugin {self._plugin_name} tried to register"
458 f' "{auto_detector_id}" twice'
459 )
460 self._plugin_detector_ids.add(auto_detector_id)
461 all_detectors = self._feature_set.metadata_maintscript_detectors
462 if self._plugin_name not in all_detectors:
463 all_detectors[self._plugin_name] = []
464 all_detectors[self._plugin_name].append(
465 MetadataOrMaintscriptDetector(
466 detector_id=auto_detector_id,
467 detector=wrap_plugin_code(self._plugin_name, auto_detector),
468 plugin_metadata=self._plugin_metadata,
469 applies_to_package_types=package_types,
470 enabled=True,
471 )
472 )
474 def _unload() -> None:
475 if self._plugin_name in all_detectors:
476 del all_detectors[self._plugin_name]
478 self._unloaders.append(_unload)
480 def document_builtin_variable(
481 self,
482 variable_name: str,
483 variable_reference_documentation: str,
484 *,
485 is_context_specific: bool = False,
486 is_for_special_case: bool = False,
487 ) -> None:
488 manifest_variables = self._feature_set.manifest_variables
489 self._restricted_api()
490 state = self._substitution.variable_state(variable_name)
491 if state == VariableNameState.UNDEFINED: 491 ↛ 492line 491 didn't jump to line 492 because the condition on line 491 was never true
492 raise ValueError(
493 f"The plugin {self._plugin_name} attempted to document built-in {variable_name},"
494 f" but it is not known to be a variable"
495 )
497 assert variable_name not in manifest_variables
499 manifest_variables[variable_name] = PluginProvidedManifestVariable(
500 self._plugin_metadata,
501 variable_name,
502 None,
503 is_context_specific_variable=is_context_specific,
504 variable_reference_documentation=variable_reference_documentation,
505 is_documentation_placeholder=True,
506 is_for_special_case=is_for_special_case,
507 )
509 def _unload() -> None:
510 del manifest_variables[variable_name]
512 self._unloaders.append(_unload)
514 def manifest_variable_provider(
515 self,
516 provider: Callable[[VariableContext], Mapping[str, str]],
517 variables: Sequence[str] | Mapping[str, str | None],
518 ) -> None:
519 self._restricted_api()
520 cached_provider = functools.lru_cache(None)(provider)
521 permitted_variables = frozenset(variables)
522 variables_iter: Iterable[tuple[str, str | None]]
523 if not isinstance(variables, Mapping): 523 ↛ 524line 523 didn't jump to line 524 because the condition on line 523 was never true
524 variables_iter = zip(variables, itertools.repeat(None))
525 else:
526 variables_iter = variables.items()
528 checked_vars = False
529 manifest_variables = self._feature_set.manifest_variables
530 plugin_name = self._plugin_name
532 def _value_resolver_generator(
533 variable_name: str,
534 ) -> Callable[[VariableContext], str]:
535 def _value_resolver(variable_context: VariableContext) -> str:
536 res = cached_provider(variable_context)
537 nonlocal checked_vars
538 if not checked_vars: 538 ↛ 549line 538 didn't jump to line 549 because the condition on line 538 was always true
539 if permitted_variables != res.keys(): 539 ↛ 540line 539 didn't jump to line 540 because the condition on line 539 was never true
540 expected = ", ".join(sorted(permitted_variables))
541 actual = ", ".join(sorted(res))
542 raise PluginAPIViolationError(
543 f"The plugin {plugin_name} claimed to provide"
544 f" the following variables {expected},"
545 f" but when resolving the variables, the plugin provided"
546 f" {actual}. These two lists should have been the same."
547 )
548 checked_vars = False
549 return res[variable_name]
551 return _value_resolver
553 for varname, vardoc in variables_iter:
554 self._check_variable_name(varname)
555 manifest_variables[varname] = PluginProvidedManifestVariable(
556 self._plugin_metadata,
557 varname,
558 _value_resolver_generator(varname),
559 is_context_specific_variable=False,
560 variable_reference_documentation=vardoc,
561 )
563 def _unload() -> None:
564 raise PluginInitializationError(
565 "Cannot unload manifest_variable_provider (not implemented)"
566 )
568 self._unloaders.append(_unload)
570 def _check_variable_name(self, variable_name: str) -> None:
571 manifest_variables = self._feature_set.manifest_variables
572 existing = manifest_variables.get(variable_name)
574 if existing is not None:
575 if existing.plugin_metadata.plugin_name == self._plugin_name: 575 ↛ 581line 575 didn't jump to line 581 because the condition on line 575 was always true
576 message = (
577 f"Bug in the plugin {self._plugin_name}: It tried to register the"
578 f' manifest variable "{variable_name}" twice.'
579 )
580 else:
581 message = (
582 f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}"
583 f" both tried to provide the manifest variable {variable_name}"
584 )
585 raise PluginConflictError(
586 message, existing.plugin_metadata, self._plugin_metadata
587 )
588 if not SUBST_VAR_RE.match("{{" + variable_name + "}}"):
589 raise ValueError(
590 f"The plugin {self._plugin_name} attempted to declare {variable_name},"
591 f" which is not a valid variable name"
592 )
594 namespace = ""
595 variable_basename = variable_name
596 if ":" in variable_name:
597 namespace, variable_basename = variable_name.rsplit(":", 1)
598 assert namespace != ""
599 assert variable_name != ""
601 if namespace != "" and namespace not in ("token", "path"):
602 raise ValueError(
603 f"The plugin {self._plugin_name} attempted to declare {variable_name},"
604 f" which is in the reserved namespace {namespace}"
605 )
607 variable_name_upper = variable_name.upper()
608 if (
609 variable_name_upper.startswith(("DEB_", "DPKG_", "DEBPUTY"))
610 or variable_basename.startswith("_")
611 or variable_basename.upper().startswith("DEBPUTY")
612 ) and self._plugin_name != "debputy":
613 raise ValueError(
614 f"The plugin {self._plugin_name} attempted to declare {variable_name},"
615 f" which is a variable name reserved by debputy"
616 )
618 state = self._substitution.variable_state(variable_name)
619 if state != VariableNameState.UNDEFINED and self._plugin_name != "debputy":
620 raise ValueError(
621 f"The plugin {self._plugin_name} attempted to declare {variable_name},"
622 f" which would shadow a built-in variable"
623 )
625 def package_processor(
626 self,
627 processor_id: str,
628 processor: PackageProcessor,
629 *,
630 depends_on_processor: Iterable[str] = (),
631 package_types: PackageTypeSelector = PackageTypeSelector.DEB,
632 ) -> None:
633 self._restricted_api(allowed_plugins={"lua", "debputy-self-hosting"})
634 package_processors = self._feature_set.all_package_processors
635 dependencies = set()
636 processor_key = (self._plugin_name, processor_id)
638 if processor_key in package_processors: 638 ↛ 639line 638 didn't jump to line 639 because the condition on line 638 was never true
639 raise PluginConflictError(
640 f"The plugin {self._plugin_name} already registered a processor with id {processor_id}",
641 self._plugin_metadata,
642 self._plugin_metadata,
643 )
645 for depends_ref in depends_on_processor:
646 if isinstance(depends_ref, str): 646 ↛ 660line 646 didn't jump to line 660 because the condition on line 646 was always true
647 if (self._plugin_name, depends_ref) in package_processors: 647 ↛ 649line 647 didn't jump to line 649 because the condition on line 647 was always true
648 depends_key = (self._plugin_name, depends_ref)
649 elif ("debputy", depends_ref) in package_processors:
650 depends_key = ("debputy", depends_ref)
651 else:
652 raise ValueError(
653 f'Could not resolve dependency "{depends_ref}" for'
654 f' "{processor_id}". It was not provided by the plugin itself'
655 f" ({self._plugin_name}) nor debputy."
656 )
657 else:
658 # TODO: Add proper dependencies first, at which point we should probably resolve "name"
659 # via the direct dependencies.
660 assert False
662 existing_processor = package_processors.get(depends_key)
663 if existing_processor is None: 663 ↛ 666line 663 didn't jump to line 666 because the condition on line 663 was never true
664 # We currently require the processor to be declared already. If this ever changes,
665 # PluginProvidedFeatureSet.package_processors_in_order will need an update
666 dplugin_name, dprocessor_name = depends_key
667 available_processors = ", ".join(
668 n for p, n in package_processors.keys() if p == dplugin_name
669 )
670 raise ValueError(
671 f"The plugin {dplugin_name} does not provide a processor called"
672 f" {dprocessor_name}. Available processors for that plugin are:"
673 f" {available_processors}"
674 )
675 dependencies.add(depends_key)
677 package_processors[processor_key] = PluginProvidedPackageProcessor(
678 processor_id,
679 package_types,
680 wrap_plugin_code(self._plugin_name, processor),
681 frozenset(dependencies),
682 self._plugin_metadata,
683 )
685 def _unload() -> None:
686 del package_processors[processor_key]
688 self._unloaders.append(_unload)
690 def automatic_discard_rule(
691 self,
692 name: str,
693 should_discard: Callable[[VirtualPath], bool],
694 *,
695 rule_reference_documentation: str | None = None,
696 examples: (
697 AutomaticDiscardRuleExample | Sequence[AutomaticDiscardRuleExample]
698 ) = (),
699 ) -> None:
700 """Register an automatic discard rule
702 An automatic discard rule is basically applied to *every* path about to be installed in to any package.
703 If any discard rule concludes that a path should not be installed, then the path is not installed.
704 In the case where the discard path is a:
706 * directory: Then the entire directory is excluded along with anything beneath it.
707 * symlink: Then the symlink itself (but not its target) is excluded.
708 * hardlink: Then the current hardlink will not be installed, but other instances of it will be.
710 Note: Discarded files are *never* deleted by `debputy`. They just make `debputy` skip the file.
712 Automatic discard rules should be written with the assumption that directories will be tested
713 before their content *when it is relevant* for the discard rule to examine whether the directory
714 can be excluded.
716 The packager can via the manifest overrule automatic discard rules by explicitly listing the path
717 without any globs. As example:
719 installations:
720 - install:
721 sources:
722 - usr/lib/libfoo.la # <-- This path is always installed
723 # (Discard rules are never asked in this case)
724 #
725 - usr/lib/*.so* # <-- Discard rules applies to any path beneath usr/lib and can exclude matches
726 # Though, they will not examine `libfoo.la` as it has already been installed
727 #
728 # Note: usr/lib itself is never tested in this case (it is assumed to be
729 # explicitly requested). But any subdir of usr/lib will be examined.
731 When an automatic discard rule is evaluated, it can see the source path currently being considered
732 for installation. While it can look at "surrounding" context (like parent directory), it will not
733 know whether those paths are to be installed or will be installed.
735 :param name: A user visible name discard rule. It can be used on the command line, so avoid shell
736 metacharacters and spaces.
737 :param should_discard: A callable that is the implementation of the automatic discard rule. It will receive
738 a VirtualPath representing the *source* path about to be installed. If callable returns `True`, then the
739 path is discarded. If it returns `False`, the path is not discarded (by this rule at least).
740 A source path will either be from the root of the source tree or the root of a search directory such as
741 `debian/tmp`. Where the path will be installed is not available at the time the discard rule is
742 evaluated.
743 :param rule_reference_documentation: Optionally, the reference documentation to be shown when a user
744 looks up this automatic discard rule.
745 :param examples: Provide examples for the rule. Use the automatic_discard_rule_example function to
746 generate the examples.
748 """
749 self._restricted_api()
750 auto_discard_rules = self._feature_set.auto_discard_rules
751 existing = auto_discard_rules.get(name)
752 if existing is not None: 752 ↛ 753line 752 didn't jump to line 753 because the condition on line 752 was never true
753 if existing.plugin_metadata.plugin_name == self._plugin_name:
754 message = (
755 f"Bug in the plugin {self._plugin_name}: It tried to register the"
756 f' automatic discard rule "{name}" twice.'
757 )
758 else:
759 message = (
760 f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}"
761 f" both tried to provide the automatic discard rule {name}"
762 )
763 raise PluginConflictError(
764 message, existing.plugin_metadata, self._plugin_metadata
765 )
766 examples = (
767 (examples,)
768 if isinstance(examples, AutomaticDiscardRuleExample)
769 else tuple(examples)
770 )
771 auto_discard_rules[name] = PluginProvidedDiscardRule(
772 name,
773 self._plugin_metadata,
774 should_discard,
775 rule_reference_documentation,
776 examples,
777 )
779 def _unload() -> None:
780 del auto_discard_rules[name]
782 self._unloaders.append(_unload)
784 def service_provider(
785 self,
786 service_manager: str,
787 detector: ServiceDetector,
788 integrator: ServiceIntegrator,
789 ) -> None:
790 self._restricted_api()
791 service_managers = self._feature_set.service_managers
792 existing = service_managers.get(service_manager)
793 if existing is not None: 793 ↛ 794line 793 didn't jump to line 794 because the condition on line 793 was never true
794 if existing.plugin_metadata.plugin_name == self._plugin_name:
795 message = (
796 f"Bug in the plugin {self._plugin_name}: It tried to register the"
797 f' service manager "{service_manager}" twice.'
798 )
799 else:
800 message = (
801 f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}"
802 f' both tried to provide the service manager "{service_manager}"'
803 )
804 raise PluginConflictError(
805 message, existing.plugin_metadata, self._plugin_metadata
806 )
807 service_managers[service_manager] = ServiceManagerDetails(
808 service_manager,
809 wrap_plugin_code(self._plugin_name, detector),
810 wrap_plugin_code(self._plugin_name, integrator),
811 self._plugin_metadata,
812 )
814 def _unload() -> None:
815 del service_managers[service_manager]
817 self._unloaders.append(_unload)
819 def manifest_variable(
820 self,
821 variable_name: str,
822 value: str,
823 *,
824 variable_reference_documentation: str | None = None,
825 ) -> None:
826 self._check_variable_name(variable_name)
827 manifest_variables = self._feature_set.manifest_variables
828 try:
829 resolved_value = self._substitution.substitute(
830 value, "Plugin initialization"
831 )
832 depends_on_variable = resolved_value != value
833 except DebputySubstitutionError:
834 depends_on_variable = True
835 if depends_on_variable:
836 raise ValueError(
837 f"The plugin {self._plugin_name} attempted to declare {variable_name} with value {value!r}."
838 f" This value depends on another variable, which is not supported. This restriction may be"
839 f" lifted in the future."
840 )
842 manifest_variables[variable_name] = PluginProvidedManifestVariable(
843 self._plugin_metadata,
844 variable_name,
845 value,
846 is_context_specific_variable=False,
847 variable_reference_documentation=variable_reference_documentation,
848 )
850 def _unload() -> None:
851 # We need to check it was never resolved
852 raise PluginInitializationError(
853 "Cannot unload manifest_variable (not implemented)"
854 )
856 self._unloaders.append(_unload)
858 @property
859 def _plugin_name(self) -> str:
860 return self._plugin_metadata.plugin_name
def provide_manifest_keyword(
    self,
    rule_type: TTP,
    rule_name: str | list[str],
    handler: DIPKWHandler,
    *,
    inline_reference_documentation: ParserDocumentation | None = None,
) -> None:
    """Register a keyword handler on one of the dispatchable table parsers.

    This is a restricted API (see ``_restricted_api``).

    :param rule_type: The dispatchable type whose table parser receives the
      keyword. It must be a key of ``dispatchable_table_parsers``.
    :param rule_name: The keyword (or keywords) to register.
    :param handler: The plugin handler; it is wrapped with
      ``wrap_plugin_code`` before registration.
    :param inline_reference_documentation: Optional documentation, which is
      passed through ``_pluggable_manifest_docs_for`` first.
    :raises ValueError: If ``rule_type`` is not a supported type.
    """
    self._restricted_api()
    table_parsers = (
        self._feature_set.manifest_parser_generator.dispatchable_table_parsers
    )
    if rule_type not in table_parsers:
        supported = ", ".join(sorted(t.__name__ for t in table_parsers))
        raise ValueError(
            f"The rule_type was not a supported type. It must be one of {supported}"
        )

    resolved_docs = self._pluggable_manifest_docs_for(
        rule_type,
        rule_name,
        inline_reference_documentation=inline_reference_documentation,
    )

    target_parser = table_parsers[rule_type]
    target_parser.register_keyword(
        rule_name,
        wrap_plugin_code(self._plugin_name, handler),
        self._plugin_metadata,
        inline_reference_documentation=resolved_docs,
    )

    def _unload() -> None:
        raise PluginInitializationError(
            "Cannot unload provide_manifest_keyword (not implemented)"
        )

    self._unloaders.append(_unload)
def pluggable_object_parser(
    self,
    rule_type: str,
    rule_name: str,
    *,
    object_parser_key: str | None = None,
    on_end_parse_step: None | (
        Callable[
            [str, Mapping[str, Any] | None, AttributePath, ParserContextData],
            None,
        ]
    ) = None,
    nested_in_package_context: bool = False,
) -> None:
    """Attach one dispatchable object parser as a child of another.

    This is a restricted API (see ``_restricted_api``).

    :param rule_type: Key of the parent parser in
      ``dispatchable_object_parsers``.
    :param rule_name: Name under which the child parser is registered.
    :param object_parser_key: Key of the child parser in
      ``dispatchable_object_parsers``. Defaults to ``rule_name``.
    :param on_end_parse_step: Optional callback; when given, it is wrapped
      with ``wrap_plugin_code`` before being passed on.
    :param nested_in_package_context: Passed through to
      ``register_child_parser`` unchanged.
    :raises ValueError: If either parser key is unknown.
    """
    self._restricted_api()
    child_key = rule_name if object_parser_key is None else object_parser_key

    object_parsers = (
        self._feature_set.manifest_parser_generator.dispatchable_object_parsers
    )
    if rule_type not in object_parsers:
        supported = ", ".join(sorted(object_parsers))
        raise ValueError(
            f"The rule_type was not a supported type. It must be one of {supported}"
        )
    if child_key not in object_parsers:
        supported = ", ".join(sorted(object_parsers))
        raise ValueError(
            f"The object_parser_key was not a supported type. It must be one of {supported}"
        )
    parent = object_parsers[rule_type]
    child = object_parsers[child_key]

    end_hook = on_end_parse_step
    if end_hook is not None:
        end_hook = wrap_plugin_code(self._plugin_name, end_hook)

    parent.register_child_parser(
        rule_name,
        child,
        self._plugin_metadata,
        on_end_parse_step=end_hook,
        nested_in_package_context=nested_in_package_context,
    )

    def _unload() -> None:
        raise PluginInitializationError(
            "Cannot unload pluggable_object_parser (not implemented)"
        )

    self._unloaders.append(_unload)
def pluggable_manifest_rule(
    self,
    rule_type: TTP | str,
    rule_name: str | Sequence[str],
    parsed_format: type[PF],
    handler: DIPHandler,
    *,
    source_format: SF | None = None,
    inline_reference_documentation: ParserDocumentation | None = None,
    expected_debputy_integration_mode: None | (
        Container[DebputyIntegrationMode]
    ) = None,
    apply_standard_attribute_documentation: bool = False,
    register_value: bool = True,
) -> None:
    """Register a manifest rule with a generated parser and a plugin handler.

    This is a restricted API (see ``_restricted_api``).

    When ``rule_type`` is a string, the rule is registered on the matching
    dispatchable *object* parser and the handler's return annotation is
    used as the type under which the handler's value is registered (unless
    ``register_value`` is false). Otherwise the rule is registered on the
    matching dispatchable *table* parser and no value is registered.

    :param rule_type: Either the key of a dispatchable object parser (str)
      or a dispatchable table parser type.
    :param rule_name: The rule name(s) to register.
    :param parsed_format: The format the generated parser produces.
    :param handler: The plugin provided handler for the parsed data.
    :param source_format: Optional source format for the generated parser.
    :param inline_reference_documentation: Optional documentation, which is
      passed through ``_pluggable_manifest_docs_for`` first.
    :param expected_debputy_integration_mode: Passed to the parser
      generator unchanged.
    :param apply_standard_attribute_documentation: If true, the standard
      attribute documentation is passed to the parser generator. Requires
      Python 3.12 or later.
    :param register_value: If false, the handler's value is never
      registered in the context.
    :raises ValueError: On an unknown ``rule_type``, a handler without a
      usable return annotation (string rule types only) or when the return
      type has already been registered by a manifest rule.
    """
    # When changing this, consider which types will be unrestricted
    self._restricted_api()
    if apply_standard_attribute_documentation and sys.version_info < (3, 12):
        _error(
            f"The plugin {self._plugin_metadata.plugin_name} requires python 3.12 due to"
            f" its use of apply_standard_attribute_documentation"
        )
    feature_set = self._feature_set
    parser_generator = feature_set.manifest_parser_generator

    if isinstance(rule_type, str):
        object_parsers = parser_generator.dispatchable_object_parsers
        if rule_type not in object_parsers:
            supported = ", ".join(sorted(object_parsers))
            raise ValueError(
                f"The rule_type was not a supported type. It must be one of {supported}"
            )
        dispatching_parser = object_parsers[rule_type]
        return_annotation = inspect.signature(handler).return_annotation
        # NOTE(review): a handler annotated with a literal `-> None` appears to
        # yield `None` (not `NoneType`) here and would pass this check; confirm
        # whether that case should be rejected as well.
        if (
            return_annotation is inspect.Signature.empty
            or return_annotation == NoneType
        ):
            raise ValueError(
                "The handler must have a return type (that is not None)"
            )
        register_as_type = return_annotation
    else:
        # Dispatchable types cannot be resolved
        register_as_type = None
        table_parsers = parser_generator.dispatchable_table_parsers
        if rule_type not in table_parsers:
            supported = ", ".join(sorted(t.__name__ for t in table_parsers))
            raise ValueError(
                f"The rule_type was not a supported type. It must be one of {supported}"
            )
        dispatching_parser = table_parsers[rule_type]

    if not register_value:
        register_as_type = None

    if register_as_type is not None:
        previous_owner = self._registered_manifest_types.get(register_as_type)
        if previous_owner is not None:
            raise ValueError(
                f"Cannot register rule {rule_name!r} for plugin {self._plugin_name}. The plugin {previous_owner.plugin_name} already registered a manifest rule with type {register_as_type!r}"
            )
        self._registered_manifest_types[register_as_type] = self._plugin_metadata

    resolved_docs = self._pluggable_manifest_docs_for(
        rule_type,
        rule_name,
        inline_reference_documentation=inline_reference_documentation,
    )

    automatic_docs = (
        _STD_ATTR_DOCS if apply_standard_attribute_documentation else None
    )

    parser = feature_set.manifest_parser_generator.generate_parser(
        parsed_format,
        source_content=source_format,
        inline_reference_documentation=resolved_docs,
        expected_debputy_integration_mode=expected_debputy_integration_mode,
        automatic_docs=automatic_docs,
    )

    def _registering_handler(
        name: str,
        parsed_data: PF,
        attribute_path: AttributePath,
        parser_context: ParserContextData,
    ) -> TP:
        # Delegate to the plugin handler and record its value under the
        # resolved type (when there is one).
        result = handler(name, parsed_data, attribute_path, parser_context)
        if register_as_type is not None:
            register_manifest_type_value_in_context(register_as_type, result)
        return result

    dispatching_parser.register_parser(
        rule_name,
        parser,
        wrap_plugin_code(self._plugin_name, _registering_handler),
        self._plugin_metadata,
    )

    def _unload() -> None:
        raise PluginInitializationError(
            "Cannot unload pluggable_manifest_rule (not implemented)"
        )

    self._unloaders.append(_unload)
def register_build_system(
    self,
    build_system_definition: type[BSPF],
) -> None:
    """Register a build system from its annotated manifest definition.

    This is a restricted API (see ``_restricted_api``). The definition must
    be a ``TypedDict`` carrying ``BuildSystemManifestRuleMetadata`` under
    the dispatch metadata attribute. The build system is registered both as
    a manifest rule and for auto-detection.

    :param build_system_definition: The manifest definition of the build
      system.
    :raises PluginInitializationError: If the definition is not a TypedDict.
    :raises PluginIncorrectRegistrationError: If the dispatch metadata is
      missing or of the wrong type.
    """
    self._restricted_api()
    if not is_typeddict(build_system_definition):
        raise PluginInitializationError(
            f"Expected build_system_definition to be a subclass of {BuildRuleParsedFormat.__name__},"
            f" but got {build_system_definition.__name__} instead"
        )
    dispatch_metadata = getattr(
        build_system_definition,
        _DEBPUTY_DISPATCH_METADATA_ATTR_NAME,
        None,
    )
    if not isinstance(dispatch_metadata, BuildSystemManifestRuleMetadata):
        raise PluginIncorrectRegistrationError(
            f"The {build_system_definition.__qualname__} type should have been annotated with"
            f" @{debputy_build_system.__name__}."
        )
    assert len(dispatch_metadata.manifest_keywords) == 1
    implementation = dispatch_metadata.build_system_impl
    assert implementation is not None
    keyword = next(iter(dispatch_metadata.manifest_keywords))

    self.pluggable_manifest_rule(
        dispatch_metadata.dispatched_type,
        dispatch_metadata.manifest_keywords,
        build_system_definition,
        # pluggable_manifest_rule does the wrapping
        dispatch_metadata.unwrapped_constructor,
        source_format=dispatch_metadata.source_format,
        inline_reference_documentation=dispatch_metadata.online_reference_documentation,
        expected_debputy_integration_mode=only_integrations(INTEGRATION_MODE_FULL),
    )
    wrapped_constructor = wrap_plugin_code(self._plugin_name, implementation)
    self._auto_detectable_build_system(
        keyword,
        implementation,
        constructor=wrapped_constructor,
        shadowing_build_systems_when_active=dispatch_metadata.auto_detection_shadow_build_systems,
    )
def _auto_detectable_build_system(
    self,
    manifest_keyword: str,
    rule_type: type[BSR],
    *,
    shadowing_build_systems_when_active: frozenset[str] = frozenset(),
    constructor: None | (
        Callable[[BuildRuleParsedFormat, AttributePath, "HighLevelManifest"], BSR]
    ) = None,
) -> None:
    """Register a build system class for auto-detection.

    This is a restricted API (see ``_restricted_api``).

    :param manifest_keyword: The manifest keyword of the build system.
    :param rule_type: The build system class. Its
      ``auto_detect_build_system`` attribute is used as the detector.
    :param shadowing_build_systems_when_active: Stored with the
      registration unchanged.
    :param constructor: Callable used to instantiate the build system. When
      omitted, ``rule_type`` itself is called with the same three arguments.
    :raises PluginConflictError: If auto-detection has already been
      registered for ``rule_type``.
    """
    self._restricted_api()
    feature_set = self._feature_set
    existing = feature_set.auto_detectable_build_systems.get(rule_type)
    if existing is not None:
        # `rule_type` is itself a class, so its own name is the build system
        # name (`rule_type.__class__` would be the metaclass).
        bs_name = rule_type.__name__
        if existing.plugin_metadata.plugin_name == self._plugin_name:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' auto-detection of the build system "{bs_name}" twice.'
            )
        else:
            message = (
                f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}"
                f' both tried to provide auto-detection of the build system "{bs_name}"'
            )
        raise PluginConflictError(
            message, existing.plugin_metadata, self._plugin_metadata
        )

    if constructor is None:

        def impl(
            attributes: BuildRuleParsedFormat,
            attribute_path: AttributePath,
            manifest: "HighLevelManifest",
        ) -> BSR:
            return rule_type(attributes, attribute_path, manifest)

    else:
        impl = constructor

    feature_set.auto_detectable_build_systems[rule_type] = (
        PluginProvidedBuildSystemAutoDetection(
            manifest_keyword,
            rule_type,
            wrap_plugin_code(self._plugin_name, rule_type.auto_detect_build_system),
            impl,
            shadowing_build_systems_when_active,
            self._plugin_metadata,
        )
    )

    def _unload() -> None:
        # Tolerate the registration being gone already.
        try:
            del feature_set.auto_detectable_build_systems[rule_type]
        except KeyError:
            pass

    self._unloaders.append(_unload)
def known_packaging_files(
    self,
    packaging_file_details: KnownPackagingFileInfo,
) -> None:
    """Register details about a known packaging file.

    The file is identified either by its path (``detection_method`` of
    ``"path"``, the default) or by a dh pkgfile name stem
    (``detection_method`` of ``"dh.pkgfile"``). The registration is keyed
    by ``"<detection_method>::<path or pkgfile>"``.

    :param packaging_file_details: The details to register. A copy is
      stored, so the caller's mapping is not retained.
    :raises ValueError: If the attributes are inconsistent with the
      detection method, the path is not normalized, the pkgfile contains a
      ``/`` or the detection method is unknown.
    :raises PluginConflictError: If the key has already been registered.
    """
    known_packaging_files = self._feature_set.known_packaging_files
    detection_method = packaging_file_details.get(
        "detection_method", cast("Literal['path']", "path")
    )
    path = packaging_file_details.get("path")
    dhpkgfile = packaging_file_details.get("pkgfile")

    packaging_file_details = packaging_file_details.copy()

    if detection_method == "path":
        if dhpkgfile is not None:
            raise ValueError(
                'The "pkgfile" attribute cannot be used when detection-method is "path" (or omitted)'
            )
        if path is None:
            raise ValueError(
                'The "path" attribute must be present when detection-method is "path" (or omitted)'
            )
        normalized_path = _normalize_path(path, with_prefix=False)
        if path != normalized_path:
            raise ValueError(
                f"The path for known packaging files must be normalized. Please replace"
                f' "{path}" with "{normalized_path}"'
            )
        detection_value = path
    elif detection_method == "dh.pkgfile":
        if path is not None:
            raise ValueError(
                'The "path" attribute cannot be used when detection-method is "dh.pkgfile"'
            )
        if dhpkgfile is None:
            raise ValueError(
                'The "pkgfile" attribute must be present when detection-method is "dh.pkgfile"'
            )
        if "/" in dhpkgfile:
            raise ValueError(
                'The "pkgfile" attribute must be a name stem such as "install" (no "/" are allowed)'
            )
        detection_value = dhpkgfile
    else:
        # The value comes from the plugin, so validate it explicitly rather
        # than via an assert (which is skipped under `python -O`).
        raise ValueError(
            f'Unsupported detection-method "{detection_method}".'
            ' It must be "path" or "dh.pkgfile"'
        )

    key = f"{detection_method}::{detection_value}"
    existing = known_packaging_files.get(key)
    if existing is not None:
        if existing.plugin_metadata.plugin_name != self._plugin_name:
            message = (
                f'The key "{key}" is registered twice for known packaging files.'
                f" Once by {existing.plugin_metadata.plugin_name} and once by {self._plugin_name}"
            )
        else:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' key "{key}" twice for known packaging files.'
            )
        raise PluginConflictError(
            message, existing.plugin_metadata, self._plugin_metadata
        )
    _validate_known_packaging_file_dh_compat_rules(
        packaging_file_details.get("dh_compat_rules")
    )
    known_packaging_files[key] = PluginProvidedKnownPackagingFile(
        packaging_file_details,
        detection_method,
        detection_value,
        self._plugin_metadata,
    )

    def _unload() -> None:
        del known_packaging_files[key]

    self._unloaders.append(_unload)
def register_mapped_type(
    self,
    type_mapping: TypeMapping,
    *,
    reference_documentation: TypeMappingDocumentation | None = None,
) -> None:
    """Register a type mapping for use by the manifest parser generator.

    This is a restricted API (see ``_restricted_api``). The mapping is
    recorded in the feature set (keyed by its target type) and registered
    with the manifest parser generator.

    :param type_mapping: The mapping to register.
    :param reference_documentation: Optional documentation that is stored
      together with the mapping.
    :raises PluginConflictError: If the target type already has a mapping.
    """
    self._restricted_api()
    target_type = type_mapping.target_type
    mapped_types = self._feature_set.mapped_types
    existing = mapped_types.get(target_type)
    if existing is not None:
        if existing.plugin_metadata.plugin_name != self._plugin_name:
            message = (
                f'The key "{target_type.__name__}" is registered twice for mapped types.'
                f" Once by {existing.plugin_metadata.plugin_name} and once by {self._plugin_name}"
            )
        else:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' key "{target_type.__name__}" twice for mapped types.'
            )
        raise PluginConflictError(
            message, existing.plugin_metadata, self._plugin_metadata
        )
    parser_generator = self._feature_set.manifest_parser_generator
    # TODO: Wrap the mapper in the plugin context
    mapped_types[target_type] = PluginProvidedTypeMapping(
        type_mapping, reference_documentation, self._plugin_metadata
    )
    parser_generator.register_mapped_type(type_mapping)
1272 def _restricted_api(
1273 self,
1274 *,
1275 allowed_plugins: set[str] | frozenset[str] = frozenset(),
1276 ) -> None:
1277 if self._plugin_name != "debputy" and self._plugin_name not in allowed_plugins: 1277 ↛ 1278line 1277 didn't jump to line 1278 because the condition on line 1277 was never true
1278 raise PluginAPIViolationError(
1279 f"Plugin {self._plugin_name} attempted to access a debputy-only API."
1280 " If you are the maintainer of this plugin and want access to this"
1281 " API, please file a feature request to make this public."
1282 " (The API is currently private as it is unstable.)"
1283 )
class MaintscriptAccessorProviderBase(MaintscriptAccessor, ABC):
    """Shared implementation of the conditional maintscript helpers.

    Every public method wraps the provided snippet in a shell condition
    (where applicable) and hands the result to ``_append_script``, which
    subclasses must implement.
    """

    __slots__ = ()

    def _append_script(
        self,
        caller_name: str,
        maintscript: Maintscript,
        full_script: str,
        /,
        perform_substitution: bool = True,
    ) -> None:
        """Store ``full_script`` for the given maintscript (subclass hook)."""
        raise NotImplementedError

    @classmethod
    def _apply_condition_to_script(
        cls,
        condition: str,
        run_snippet: str,
        /,
        indent: bool | None = None,
    ) -> str:
        """Wrap ``run_snippet`` in ``if <condition>; then ... fi``.

        :param condition: The shell condition to test.
        :param run_snippet: The shell snippet to guard.
        :param indent: Whether to indent every line of the snippet. When
          ``None``, the snippet is indented unless it contains ``<<``.
        :return: The guarded snippet, always terminated by a newline.
        """
        if indent is None:
            # We auto-determine this based on heredocs currently
            indent = "<<" not in run_snippet

        if indent:
            run_snippet = "".join(" " + x for x in run_snippet.splitlines(True))
        if not run_snippet.endswith("\n"):
            run_snippet += "\n"
        condition_line = f"if {condition}; then\n"
        end_line = "fi\n"
        return "".join((condition_line, run_snippet, end_line))

    def _append_conditional_script(
        self,
        caller_name: str,
        maintscript: Maintscript,
        condition: str,
        run_snippet: str,
        indent: bool | None,
        perform_substitution: bool,
    ) -> None:
        """Guard ``run_snippet`` with ``condition`` and append it."""
        self._append_script(
            caller_name,
            maintscript,
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def on_configure(
        self,
        run_snippet: str,
        /,
        indent: bool | None = None,
        perform_substitution: bool = True,
        skip_on_rollback: bool = False,
    ) -> None:
        """Add a snippet to `postinst` guarded by the default postinst condition.

        With ``skip_on_rollback``, the guard is ``"$1" = "configure"`` only.
        """
        condition = POSTINST_DEFAULT_CONDITION
        if skip_on_rollback:
            condition = '[ "$1" = "configure" ]'
        self._append_conditional_script(
            "on_configure",
            "postinst",
            condition,
            run_snippet,
            indent,
            perform_substitution,
        )

    def on_initial_install(
        self,
        run_snippet: str,
        /,
        indent: bool | None = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a snippet to `postinst` guarded by `configure` with an empty `$2`."""
        self._append_conditional_script(
            "on_initial_install",
            "postinst",
            '[ "$1" = "configure" -a -z "$2" ]',
            run_snippet,
            indent,
            perform_substitution,
        )

    def on_upgrade(
        self,
        run_snippet: str,
        /,
        indent: bool | None = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a snippet to `postinst` guarded by `configure` with a non-empty `$2`."""
        self._append_conditional_script(
            "on_upgrade",
            "postinst",
            '[ "$1" = "configure" -a -n "$2" ]',
            run_snippet,
            indent,
            perform_substitution,
        )

    def on_upgrade_from(
        self,
        version: str,
        run_snippet: str,
        /,
        indent: bool | None = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a snippet to `postinst` for upgrades from ``version`` or earlier.

        The guard compares the previously configured version (``$2``)
        against ``version`` using dpkg's ``le-nl`` relation.

        :param version: The version to compare ``$2`` against. It is shell
          quoted before being embedded in the condition.
        """
        import shlex

        # dpkg --compare-versions takes "<ver1> <relation> <ver2>"; both
        # versions must be present for the comparison to be well-formed.
        condition = (
            '[ "$1" = "configure" ] && dpkg --compare-versions -- "$2" le-nl '
            f"{shlex.quote(version)}"
        )
        self._append_conditional_script(
            "on_upgrade_from",
            "postinst",
            condition,
            run_snippet,
            indent,
            perform_substitution,
        )

    def on_before_removal(
        self,
        run_snippet: str,
        /,
        indent: bool | None = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a snippet to `prerm` guarded by ``"$1" = "remove"``."""
        self._append_conditional_script(
            "on_before_removal",
            "prerm",
            '[ "$1" = "remove" ]',
            run_snippet,
            indent,
            perform_substitution,
        )

    def on_removed(
        self,
        run_snippet: str,
        /,
        indent: bool | None = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a snippet to `postrm` guarded by ``"$1" = "remove"``."""
        self._append_conditional_script(
            "on_removed",
            "postrm",
            '[ "$1" = "remove" ]',
            run_snippet,
            indent,
            perform_substitution,
        )

    def on_purge(
        self,
        run_snippet: str,
        /,
        indent: bool | None = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a snippet to `postrm` guarded by ``"$1" = "purge"``."""
        self._append_conditional_script(
            "on_purge",
            "postrm",
            '[ "$1" = "purge" ]',
            run_snippet,
            indent,
            perform_substitution,
        )

    def unconditionally_in_script(
        self,
        maintscript: Maintscript,
        run_snippet: str,
        /,
        perform_substitution: bool = True,
    ) -> None:
        """Add a snippet to ``maintscript`` without any guarding condition.

        :raises ValueError: If ``maintscript`` is not one of the standard
          control scripts.
        """
        if maintscript not in STD_CONTROL_SCRIPTS:
            raise ValueError(
                f'Unknown script "{maintscript}". Should have been one of:'
                f' {", ".join(sorted(STD_CONTROL_SCRIPTS))}'
            )
        self._append_script(
            "unconditionally_in_script",
            maintscript,
            run_snippet,
            perform_substitution=perform_substitution,
        )
class MaintscriptAccessorProvider(MaintscriptAccessorProviderBase):
    """Maintscript accessor that stores snippets in snippet containers."""

    __slots__ = (
        "_plugin_metadata",
        "_maintscript_snippets",
        "_plugin_source_id",
        "_package_substitution",
        "_default_snippet_order",
    )

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        maintscript_snippets: dict[str, MaintscriptSnippetContainer],
        package_substitution: Substitution,
        *,
        default_snippet_order: Literal["service"] | None = None,
    ):
        """Remember where snippets come from and where they should be stored.

        :param plugin_metadata: Metadata of the plugin providing the snippets.
        :param plugin_source_id: Identifier used (with the plugin name) as the
          definition source of each snippet.
        :param maintscript_snippets: Snippet containers keyed by maintscript.
        :param package_substitution: Substitution applied to snippets when
          substitution is requested.
        :param default_snippet_order: Snippet order assigned to every snippet.
        """
        self._plugin_metadata = plugin_metadata
        self._plugin_source_id = plugin_source_id
        self._maintscript_snippets = maintscript_snippets
        self._package_substitution = package_substitution
        self._default_snippet_order = default_snippet_order

    def _append_script(
        self,
        caller_name: str,
        maintscript: Maintscript,
        full_script: str,
        /,
        perform_substitution: bool = True,
    ) -> None:
        """Append ``full_script`` to the container of ``maintscript``.

        The script is passed through the package substitution first when
        ``perform_substitution`` is true.
        """
        origin = f"{self._plugin_metadata.plugin_name} ({self._plugin_source_id})"
        script_text = (
            self._package_substitution.substitute(full_script, origin)
            if perform_substitution
            else full_script
        )

        new_snippet = MaintscriptSnippet(
            snippet=SnippetResolver.snippet_template(script_text),
            definition_source=origin,
            snippet_order=self._default_snippet_order,
        )
        container = self._maintscript_snippets[maintscript]
        container.append(new_snippet)
class BinaryCtrlAccessorProviderBase(BinaryCtrlAccessor):
    """Common parts of the binary control accessors handed to plugins.

    Subclasses provide the maintscript accessor via
    ``_create_maintscript_accessor``.
    """

    __slots__ = (
        "_plugin_metadata",
        "_plugin_source_id",
        "_package_metadata_context",
        "_triggers",
        "_substvars",
        "_maintscript",
        "_shlibs_details",
    )

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        package_metadata_context: PackageProcessingContext,
        triggers: dict[tuple[DpkgTriggerType, str], PluginProvidedTrigger],
        substvars: FlushableSubstvars,
        shlibs_details: tuple[str | None, list[str] | None],
    ) -> None:
        """Store the collaborators; the maintscript accessor starts unset."""
        self._plugin_metadata = plugin_metadata
        self._plugin_source_id = plugin_source_id
        self._package_metadata_context = package_metadata_context
        self._triggers = triggers
        self._substvars = substvars
        self._shlibs_details = shlibs_details
        # Created on first access of the `maintscript` property.
        self._maintscript: MaintscriptAccessor | None = None

    def _create_maintscript_accessor(self) -> MaintscriptAccessor:
        """Create the maintscript accessor (subclass hook)."""
        raise NotImplementedError

    def dpkg_trigger(self, trigger_type: DpkgTriggerType, trigger_target: str) -> None:
        """Register a declarative dpkg level trigger

        The provided trigger will be added to the package's metadata (the triggers file of the control.tar).

        If the trigger has already been added previously, a second call with the same trigger data will be ignored.
        """
        trigger_key = (trigger_type, trigger_target)
        if trigger_key not in self._triggers:
            self._triggers[trigger_key] = PluginProvidedTrigger(
                dpkg_trigger_type=trigger_type,
                dpkg_trigger_target=trigger_target,
                provider=self._plugin_metadata,
                provider_source_id=self._plugin_source_id,
            )

    @property
    def maintscript(self) -> MaintscriptAccessor:
        """The maintscript accessor, created on first use."""
        if self._maintscript is None:
            self._maintscript = self._create_maintscript_accessor()
        return self._maintscript

    @property
    def substvars(self) -> FlushableSubstvars:
        """The substvars of the package."""
        return self._substvars

    def dpkg_shlibdeps(self, paths: Sequence[VirtualPath]) -> None:
        """Run `dpkg-shlibdeps` on ``paths`` against the flushed substvars.

        The command is printed before it is run. If it exits with an
        error, this is reported via ``_error``.
        """
        package = self._package_metadata_context.binary_package
        with self.substvars.flush() as substvars_path:
            command = ["dpkg-shlibdeps", f"-T{substvars_path}"]
            if package.is_udeb:
                command.append("-tudeb")
            if package.is_essential:
                command.append("-dPre-Depends")
            local_shlibs, extra_lib_dirs = self._shlibs_details
            if local_shlibs is not None:
                command.append(f"-L{local_shlibs}")
            if extra_lib_dirs:
                command += [f"-l{lib_dir}" for lib_dir in extra_lib_dirs]
            command += [path.fs_path for path in paths]
            print_command(*command)
            try:
                subprocess.check_call(command)
            except subprocess.CalledProcessError:
                _error(
                    f"Attempting to auto-detect dependencies via dpkg-shlibdeps for {package.name} failed. Please"
                    " review the output from dpkg-shlibdeps above to understand what went wrong."
                )
class BinaryCtrlAccessorProvider(BinaryCtrlAccessorProviderBase):
    """Binary control accessor backed by `MaintscriptAccessorProvider`."""

    # `_maintscript` is already a slot of the base class; repeating it here
    # would only shadow the base slot.
    __slots__ = (
        "_maintscript_snippets",
        "_package_substitution",
        "_default_snippet_order",
    )

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        package_metadata_context: PackageProcessingContext,
        triggers: dict[tuple[DpkgTriggerType, str], PluginProvidedTrigger],
        substvars: FlushableSubstvars,
        maintscript_snippets: dict[str, MaintscriptSnippetContainer],
        package_substitution: Substitution,
        shlibs_details: tuple[str | None, list[str] | None],
        *,
        default_snippet_order: Literal["service"] | None = None,
    ) -> None:
        """Set up the accessor and eagerly create its maintscript accessor.

        :param maintscript_snippets: Snippet containers keyed by maintscript.
        :param package_substitution: Substitution handed to the maintscript
          accessor.
        :param default_snippet_order: Snippet order handed to the maintscript
          accessor.

        The remaining parameters are passed to the base class unchanged.
        """
        super().__init__(
            plugin_metadata,
            plugin_source_id,
            package_metadata_context,
            triggers,
            substvars,
            shlibs_details,
        )
        self._maintscript_snippets = maintscript_snippets
        self._package_substitution = package_substitution
        self._default_snippet_order = default_snippet_order
        self._maintscript = self._create_maintscript_accessor()

    def _create_maintscript_accessor(self) -> MaintscriptAccessor:
        """Create a maintscript accessor honouring the default snippet order."""
        return MaintscriptAccessorProvider(
            self._plugin_metadata,
            self._plugin_source_id,
            self._maintscript_snippets,
            self._package_substitution,
            default_snippet_order=self._default_snippet_order,
        )
class BinaryCtrlAccessorProviderCreator:
    """Factory for per-plugin binary control accessors.

    All accessors created by one instance share the same trigger table,
    substvars, maintscript snippet containers and substitution.
    """

    def __init__(
        self,
        package_metadata_context: PackageProcessingContext,
        substvars: FlushableSubstvars,
        maintscript_snippets: dict[str, MaintscriptSnippetContainer],
        substitution: Substitution,
    ) -> None:
        """Store the shared state; no triggers are registered initially."""
        self._package_metadata_context = package_metadata_context
        self._substvars = substvars
        self._maintscript_snippets = maintscript_snippets
        self._substitution = substitution
        # Triggers registered through any accessor created by this instance.
        self._triggers: dict[tuple[DpkgTriggerType, str], PluginProvidedTrigger] = {}
        # Handed to each accessor as it is at the time `for_plugin` is called.
        self.shlibs_details: tuple[str | None, list[str] | None] = (None, None)

    def for_plugin(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        *,
        default_snippet_order: Literal["service"] | None = None,
    ) -> BinaryCtrlAccessor:
        """Create an accessor acting on behalf of the given plugin.

        :param plugin_metadata: Metadata of the plugin.
        :param plugin_source_id: Source identifier within the plugin.
        :param default_snippet_order: Snippet order for the accessor's
          maintscript snippets.
        """
        accessor = BinaryCtrlAccessorProvider(
            plugin_metadata,
            plugin_source_id,
            self._package_metadata_context,
            self._triggers,
            self._substvars,
            self._maintscript_snippets,
            self._substitution,
            self.shlibs_details,
            default_snippet_order=default_snippet_order,
        )
        return accessor

    def generated_triggers(self) -> Iterable[PluginProvidedTrigger]:
        """Return the triggers registered so far."""
        registered = self._triggers
        return registered.values()
def _resolve_bundled_plugin_docs_path(
    plugin_name: str,
    loader: PluginInitializationEntryPoint | None,
) -> Traversable | Path | None:
    """Locate the documentation file shipped next to a bundled plugin.

    The file is looked up as ``<plugin_name>_docs.yaml`` in the package of
    the module that defines ``loader``.

    :param plugin_name: Name of the plugin.
    :param loader: The plugin's initializer; its ``__module__`` determines
      the package to search.
    :return: The (possibly non-existent) resource path, or ``None`` when no
      loader is given.
    """
    if loader is None:
        # Without a loader there is no module to resolve the package from.
        return None
    plugin_module = getattr(loader, "__module__")
    assert plugin_module is not None
    plugin_package_name = sys.modules[plugin_module].__package__
    return importlib.resources.files(plugin_package_name).joinpath(
        f"{plugin_name}_docs.yaml"
    )
def plugin_metadata_for_debputys_own_plugin(
    loader: PluginInitializationEntryPoint | None = None,
) -> DebputyPluginMetadata:
    """Create the plugin metadata for the bundled "debputy" plugin.

    :param loader: The plugin initializer. Defaults to
      ``initialize_debputy_features``, which is imported on demand.
    """
    if loader is None:
        from debputy.plugins.debputy.debputy_plugin import (
            initialize_debputy_features,
        )

        loader = initialize_debputy_features
    plugin_name = "debputy"

    def _docs_path():
        # Resolved lazily, when the metadata asks for the doc path.
        return _resolve_bundled_plugin_docs_path(
            plugin_name,
            loader,
        )

    return DebputyPluginMetadata(
        plugin_name="debputy",
        api_compat_version=1,
        plugin_initializer=loader,
        plugin_loader=None,
        plugin_doc_path_resolver=_docs_path,
        plugin_path="<bundled>",
    )
def load_plugin_features(
    plugin_search_dirs: Sequence[str],
    substitution: Substitution,
    requested_plugins_only: Sequence[str] | None = None,
    required_plugins: set[str] | None = None,
    plugin_feature_set: PluginProvidedFeatureSet | None = None,
    debug_mode: bool = False,
) -> PluginProvidedFeatureSet:
    """Load debputy's own plugin plus the required/requested/discovered plugins.

    Plugins found via auto-discovery (only done when
    ``requested_plugins_only`` is ``None``) are optional: if one fails to
    load, it is unloaded again and a warning is emitted, unless
    ``debug_mode`` is set or the unload fails. All other load failures are
    re-raised.

    :param plugin_search_dirs: Directories to search for plugins.
    :param substitution: Substitution handed to every plugin initializer.
    :param requested_plugins_only: If given, exactly these plugins are
      loaded (in addition to the required ones) and auto-discovery is
      skipped.
    :param required_plugins: Plugins that must be loaded.
    :param plugin_feature_set: Feature set to load into; a new one is
      created when omitted.
    :param debug_mode: Re-raise failures of optional plugins.
    :return: The feature set the plugins were loaded into.
    """
    feature_set = (
        PluginProvidedFeatureSet() if plugin_feature_set is None else plugin_feature_set
    )
    to_load = [plugin_metadata_for_debputys_own_plugin()]
    optional_plugin_names = set()
    if required_plugins:
        to_load.extend(find_json_plugins(plugin_search_dirs, required_plugins))

    if requested_plugins_only is None:
        discovered_plugins = _find_all_json_plugins(
            plugin_search_dirs,
            frozenset() if required_plugins is None else required_plugins,
            debug_mode=debug_mode,
        )
        for discovered in discovered_plugins:
            to_load.append(discovered)
            optional_plugin_names.add(discovered.plugin_name)
    else:
        to_load.extend(find_json_plugins(plugin_search_dirs, requested_plugins_only))

    for metadata in to_load:
        initializer = DebputyPluginInitializerProvider(
            metadata, feature_set, substitution
        )
        try:
            initializer.load_plugin()
        except PluginBaseError as load_error:
            # Only auto-discovered plugins may fail without aborting.
            if metadata.plugin_name not in optional_plugin_names:
                raise
            if debug_mode:
                _warn(
                    f"The optional plugin {metadata.plugin_name} failed during load. Re-raising due"
                    f" to --debug/-d or DEBPUTY_DEBUG=1"
                )
                raise
            try:
                initializer.unload_plugin()
            except Exception:
                _warn(
                    f"Failed to load optional {metadata.plugin_name} and an error was raised when trying to"
                    " clean up after the half-initialized plugin. Re-raising load error as the partially loaded"
                    " module might have tainted the feature set."
                )
                raise load_error from None
            _warn(
                f"The optional plugin {metadata.plugin_name} failed during load. The plugin was"
                f" deactivated. Use debug mode (--debug/DEBPUTY_DEBUG=1) to show the stacktrace"
                f" (the warning will become an error)"
            )

    return feature_set
def find_json_plugin(
    search_dirs: Sequence[str],
    requested_plugin: str,
) -> DebputyPluginMetadata:
    """Find the metadata of a single plugin by name.

    :param search_dirs: Directories to search for the plugin.
    :param requested_plugin: Name of the plugin to find.
    :return: The metadata of the requested plugin.
    """
    matches = [*find_json_plugins(search_dirs, [requested_plugin])]
    # One name was requested, so exactly one result is expected.
    assert len(matches) == 1
    return matches[0]
def find_related_implementation_files_for_plugin(
    plugin_metadata: DebputyPluginMetadata,
) -> list[str]:
    """Find the implementation file(s) that belong to a plugin.

    Bundled plugins and plugins loaded from the Python path are not
    supported and are reported via ``_error``.

    :param plugin_metadata: Metadata of the plugin to inspect.
    :return: The plugin's implementation file if it exists as a regular
      file; otherwise an empty list.
    """
    if plugin_metadata.is_bundled:
        plugin_name = plugin_metadata.plugin_name
        _error(
            f"Cannot run find related files for {plugin_name}: The plugin seems to be bundled"
            " or loaded via a mechanism that does not support detecting its tests."
        )

    if plugin_metadata.is_from_python_path:
        plugin_name = plugin_metadata.plugin_name
        # Maybe they could be, but that is for another day.
        _error(
            f"Cannot run find related files for {plugin_name}: The plugin is installed into python path"
            " and these are not supported."
        )
    files = []
    module_name, module_file = _find_plugin_implementation_file(
        plugin_metadata.plugin_name,
        plugin_metadata.plugin_path,
    )
    if os.path.isfile(module_file):
        files.append(module_file)
    else:
        # No implementation file next to the metadata: load the plugin and
        # check whether the module came from somewhere else.
        if not plugin_metadata.is_loaded:
            plugin_metadata.load_plugin()
        if module_name in sys.modules:
            _error(
                f'The plugin {plugin_metadata.plugin_name} uses the "module" key in its'
                f" JSON metadata file ({plugin_metadata.plugin_path}) and cannot be"
                f" installed via this method. The related Python would not be installed"
                f" (which would result in a plugin that would fail to load)"
            )

    return files
def find_tests_for_plugin(
    plugin_metadata: DebputyPluginMetadata,
) -> list[str]:
    """Find the test files stored next to a plugin's JSON file.

    A file counts as a test when it lives in the same directory as the plugin
    JSON file, its name starts with the plugin name (with "-" replaced by "_")
    and `PLUGIN_TEST_SUFFIX` matches the name.

    Bundled plugins and plugins loaded from the python path are reported via
    `_error`.

    :param plugin_metadata: Metadata of the plugin whose tests are wanted.
    :return: Paths of the matching test files (possibly empty).
    """
    plugin_name = plugin_metadata.plugin_name
    plugin_path = plugin_metadata.plugin_path

    if plugin_metadata.is_bundled:
        _error(
            f"Cannot run tests for {plugin_name}: The plugin seems to be bundled or loaded via a"
            " mechanism that does not support detecting its tests."
        )

    if plugin_metadata.is_from_python_path:
        # Maybe they could be, but that is for another day.
        _error(
            f"Cannot run tests for {plugin_name}: The plugin is installed into python path"
            " and these are not supported."
        )

    plugin_dir = os.path.dirname(plugin_path)
    test_basename_prefix = plugin_name.replace("-", "_")
    with os.scandir(plugin_dir) as dir_iter:
        return [
            p.path
            for p in dir_iter
            if p.is_file()
            and p.name.startswith(test_basename_prefix)
            and PLUGIN_TEST_SUFFIX.search(p.name)
        ]
def find_json_plugins(
    search_dirs: Sequence[str],
    requested_plugins: Iterable[str],
) -> Iterable[DebputyPluginMetadata]:
    """Resolve each requested plugin to its metadata.

    A request containing "/" is treated as a path to the JSON file and the
    search path is not consulted. Any other request is a plugin name, looked up
    as `debputy/plugins/<name>.json` in each search dir (first match wins) and
    finally in `PLUGIN_PYTHON_RES_PATH`.

    :param search_dirs: Directories to search for named plugins, in priority order.
    :param requested_plugins: Plugin names or paths to resolve.
    :return: One metadata object per requested plugin, in request order.
    :raises PluginNotFoundError: If a requested plugin cannot be found.
    """
    for plugin_name_or_path in requested_plugins:
        if "/" in plugin_name_or_path:
            if not os.path.isfile(plugin_name_or_path):
                raise PluginNotFoundError(
                    f"Unable to load the plugin {plugin_name_or_path}: The path is not a file."
                    ' (Because the plugin name contains "/", it is assumed to be a path and search path'
                    " is not used.)"
                )
            yield parse_json_plugin_desc(plugin_name_or_path)
            # Move on to the next request; returning here would silently drop
            # every remaining requested plugin.
            continue

        json_basename = f"{plugin_name_or_path}.json"
        for search_dir in search_dirs:
            path = os.path.join(search_dir, "debputy", "plugins", json_basename)
            if os.path.isfile(path):
                yield parse_json_plugin_desc(path)
                break
        else:
            # Only reached when no search dir had the plugin (no `break`).
            pp_path = PLUGIN_PYTHON_RES_PATH.joinpath(json_basename)
            # The resource object itself is always truthy, so existence must be
            # checked with `is_file()` alone.
            if not pp_path.is_file():
                search_dir_str = ":".join(search_dirs)
                raise PluginNotFoundError(
                    f"Unable to load the plugin {plugin_name_or_path}: Could not find {plugin_name_or_path}.json in the"
                    f" debputy/plugins subdir of any of the search dirs ({search_dir_str})"
                )
            # Parse before yielding so the file is closed while the generator
            # is suspended.
            with pp_path.open() as fd:
                plugin_metadata = parse_json_plugin_desc(
                    f"PYTHONPATH:debputy/plugins/{pp_path.name}",
                    fd=fd,
                    is_from_python_path=True,
                )
            yield plugin_metadata
def _find_all_json_plugins(
    search_dirs: Sequence[str],
    required_plugins: AbstractSet[str],
    debug_mode: bool = False,
) -> Iterable[DebputyPluginMetadata]:
    """Discover every JSON plugin that is not already required.

    Scans `debputy/plugins` in each search dir and then
    `PLUGIN_PYTHON_RES_PATH` for `*.json` files. A plugin name is yielded at
    most once (first location wins), and names in `required_plugins` are
    skipped.

    :param search_dirs: Directories to scan; missing directories are ignored.
    :param required_plugins: Names of plugins that must not be yielded.
    :param debug_mode: When true, a plugin that fails to parse in a search dir
      raises instead of being reported via `_warn` and skipped.
    :return: Metadata of each discovered plugin.
    """
    # `seen` holds plugin *names* (file name without ".json"), the same form
    # `required_plugins` uses. Comparing raw file names against it would never
    # match a required plugin.
    seen = set(required_plugins)
    error_seen = False
    for search_dir in search_dirs:
        try:
            dir_fd = os.scandir(os.path.join(search_dir, "debputy", "plugins"))
        except FileNotFoundError:
            continue
        with dir_fd:
            for entry in dir_fd:
                if not entry.is_file(follow_symlinks=True):
                    continue
                if not entry.name.endswith(".json"):
                    continue
                plugin_name = entry.name.removesuffix(".json")
                if plugin_name in seen:
                    continue
                seen.add(plugin_name)
                try:
                    plugin_metadata = parse_json_plugin_desc(entry.path)
                except PluginBaseError as e:
                    if debug_mode:
                        raise
                    # Only the first failure is shown in detail.
                    if not error_seen:
                        error_seen = True
                        _warn(
                            f"Failed to load the plugin in {entry.path} due to the following error: {e.message}"
                        )
                    else:
                        _warn(
                            f"Failed to load plugin in {entry.path} due to errors (not shown)."
                        )
                else:
                    yield plugin_metadata

    for pp_entry in PLUGIN_PYTHON_RES_PATH.iterdir():
        if not pp_entry.name.endswith(".json") or not pp_entry.is_file():
            continue
        plugin_name = pp_entry.name.removesuffix(".json")
        if plugin_name in seen:
            continue
        seen.add(plugin_name)
        # Parse before yielding so the file is closed while the generator is
        # suspended.
        with pp_entry.open() as fd:
            plugin_metadata = parse_json_plugin_desc(
                f"PYTHONPATH:debputy/plugins/{pp_entry.name}",
                fd=fd,
                is_from_python_path=True,
            )
        yield plugin_metadata
1941def _find_plugin_implementation_file(
1942 plugin_name: str,
1943 json_file_path: str,
1944) -> tuple[str, str]:
1945 guessed_module_basename = plugin_name.replace("-", "_")
1946 module_name = f"debputy.plugins.{guessed_module_basename}"
1947 module_fs_path = os.path.join(
1948 os.path.dirname(json_file_path), f"{guessed_module_basename}.py"
1949 )
1950 return module_name, module_fs_path
def _resolve_module_initializer(
    plugin_name: str,
    plugin_initializer_name: str,
    module_name: str | None,
    json_file_path: str,
) -> PluginInitializationEntryPoint:
    """Load a plugin's Python module and return its initializer entry point.

    When `module_name` is None, the module name and file are guessed via
    `_find_plugin_implementation_file`; if that file exists it is loaded
    directly from disk. Otherwise (explicit module name, or no such file) the
    module is imported by name.

    :param plugin_name: Name of the plugin (used for error messages and for
      running code in the plugin's context).
    :param plugin_initializer_name: Attribute in the module that initializes
      the plugin.
    :param module_name: Explicit module name, or None to guess it.
    :param json_file_path: Path of the plugin's JSON file (used for guessing
      the module file and in error messages).
    :return: The callable that initializes the plugin.
    :raises PluginInitializationError: If the module could not be loaded.
    :raises PluginMetadataError: If an explicitly named module is not
      importable, or the initializer attribute is missing or of an unsupported
      kind.
    """
    module = None
    module_fs_path = None
    if module_name is None:
        module_name, module_fs_path = _find_plugin_implementation_file(
            plugin_name, json_file_path
        )
        if os.path.isfile(module_fs_path):
            spec = importlib.util.spec_from_file_location(module_name, module_fs_path)
            if spec is None:
                raise PluginInitializationError(
                    f"Failed to load {plugin_name} (path: {module_fs_path})."
                    " The spec_from_file_location function returned None."
                )
            mod = importlib.util.module_from_spec(spec)
            loader = spec.loader
            if loader is None:
                raise PluginInitializationError(
                    f"Failed to load {plugin_name} (path: {module_fs_path})."
                    " Python could not find a suitable loader (spec.loader was None)"
                )
            # The module is registered before its code runs.
            # NOTE(review): the entry is not removed again here if executing
            # the module fails - confirm that cleanup happens elsewhere.
            sys.modules[module_name] = mod
            try:
                run_in_context_of_plugin(plugin_name, loader.exec_module, mod)
            except (Exception, GeneratorExit) as e:
                raise PluginInitializationError(
                    f"Failed to load {plugin_name} (path: {module_fs_path})."
                    " The module threw an exception while being loaded."
                ) from e
            module = mod

    # Not loaded from a file: import the module by name instead.
    if module is None:
        try:
            module = run_in_context_of_plugin(
                plugin_name, importlib.import_module, module_name
            )
        except ModuleNotFoundError as e:
            # `module_fs_path` is only set when the module name was guessed, so
            # None here means the JSON file named the module explicitly.
            if module_fs_path is None:
                raise PluginMetadataError(
                    f'The plugin defined in "{json_file_path}" wanted to load the module "{module_name}", but'
                    " this module is not available in the python search path"
                ) from e
            raise PluginInitializationError(
                f"Failed to load {plugin_name}. Tried loading it from"
                f' "{module_fs_path}" (which did not exist) and PYTHONPATH as'
                f" {module_name} (where it was not found either). Please ensure"
                " the module code is installed in the correct spot or provide an"
                f' explicit "module" definition in {json_file_path}.'
            ) from e

    # Equivalent to `getattr(module, plugin_initializer_name, None)`, routed
    # through `run_in_context_of_plugin_wrap_errors`.
    plugin_initializer = run_in_context_of_plugin_wrap_errors(
        plugin_name,
        getattr,
        module,
        plugin_initializer_name,
        None,
    )

    if plugin_initializer is None:
        raise PluginMetadataError(
            f'The plugin defined in {json_file_path} claimed that module "{module_name}" would have an'
            f' attribute called "{plugin_initializer_name}" to initialize the plugin. However, that attribute'
            " does not exist or cannot be resolved. Please correct the plugin metadata or initializer name"
            " in the Python module."
        )
    # Two kinds of initializer are accepted: a `DebputyPluginDefinition`
    # (its `initialize` method is used) or a plain callable.
    if isinstance(plugin_initializer, DebputyPluginDefinition):
        return plugin_initializer.initialize
    if not callable(plugin_initializer):
        raise PluginMetadataError(
            f'The plugin defined in {json_file_path} claimed that module "{module_name}" would have an'
            f' attribute called "{plugin_initializer_name}" for initializing the plugin. While that'
            " attribute exists, it is neither a `DebputyPluginDefinition`"
            " (`plugin_definition = define_debputy_plugin()`) nor is it `callable`"
            " (`def initialize(api: DebputyPluginInitializer) -> None:`)."
        )
    return cast("PluginInitializationEntryPoint", plugin_initializer)
def _json_plugin_loader(
    plugin_name: str,
    plugin_json_metadata: PluginJsonMetadata,
    json_file_path: str,
    attribute_path: AttributePath,
) -> Callable[["DebputyPluginInitializer"], None]:
    """Build the initializer callable for a plugin described by a JSON file.

    Up to four feature sources are collected, in this order: the Python
    initializer (`plugin_initializer`), `known_packaging_files`,
    `manifest_variables` and `packager_provided_files`. Each contributes one
    initializer; the returned callable runs them in that order.

    :param plugin_name: Name of the plugin.
    :param plugin_json_metadata: Parsed content of the plugin's JSON file.
    :param json_file_path: Path of the JSON file (used in error messages and
      for locating the Python module).
    :param attribute_path: Attribute path of the JSON root, used to point at
      the offending key in error messages.
    :return: A callable that registers the plugin's features with the given API.
    :raises PluginMetadataError: If the API compat level is not 1, the
      initializer name contains ".", or the plugin provides no features. The
      returned callable raises it as well when a registration is rejected with
      a `ValueError`.
    """
    api_compat = plugin_json_metadata["api_compat_version"]
    module_name = plugin_json_metadata.get("module")
    plugin_initializer_name = plugin_json_metadata.get("plugin_initializer")
    packager_provided_files_raw = plugin_json_metadata.get(
        "packager_provided_files", []
    )
    manifest_variables_raw = plugin_json_metadata.get("manifest_variables")
    known_packaging_files_raw = plugin_json_metadata.get("known_packaging_files")
    if api_compat != 1:
        raise PluginMetadataError(
            f'The plugin defined in "{json_file_path}" requires API compat level {api_compat}, but this'
            f" version of debputy only supports API compat version of 1"
        )
    if plugin_initializer_name is not None and "." in plugin_initializer_name:
        p = attribute_path["plugin_initializer"]
        raise PluginMetadataError(
            f'The "{p}" must not contain ".". Problematic file is "{json_file_path}".'
        )

    plugin_initializers = []

    if plugin_initializer_name is not None:
        plugin_initializer = _resolve_module_initializer(
            plugin_name,
            plugin_initializer_name,
            module_name,
            json_file_path,
        )
        plugin_initializers.append(plugin_initializer)

    if known_packaging_files_raw:
        kpf_root_path = attribute_path["known_packaging_files"]
        known_packaging_files = []
        for k, v in enumerate(known_packaging_files_raw):
            kpf_path = kpf_root_path[k]
            # NOTE(review): `v.get` is called before the `isinstance(v, dict)`
            # check below - presumably `v` is always a dict here; confirm.
            p = v.get("path")
            if isinstance(p, str):
                kpf_path.path_hint = p
            # Only plugins named "debputy-*" get the doc root placeholder
            # expanded. Note that this rewrites the entry in the raw metadata
            # in place.
            if plugin_name.startswith("debputy-") and isinstance(v, dict):
                docs = v.get("documentation-uris")
                if docs is not None and isinstance(docs, list):
                    docs = [
                        (
                            d.replace("@DEBPUTY_DOC_ROOT_DIR@", debputy_doc_root_dir())
                            if isinstance(d, str)
                            else d
                        )
                        for d in docs
                    ]
                    v["documentation-uris"] = docs
            known_packaging_file: KnownPackagingFileInfo = (
                PLUGIN_KNOWN_PACKAGING_FILES_PARSER.parse_input(
                    v,
                    kpf_path,
                )
            )
            known_packaging_files.append((kpf_path, known_packaging_file))

        def _initialize_json_provided_known_packaging_files(
            api: DebputyPluginInitializerProvider,
        ) -> None:
            # Registers every parsed known packaging file; a rejected entry is
            # reported with its attribute path.
            for p, details in known_packaging_files:
                try:
                    api.known_packaging_files(details)
                except ValueError as ex:
                    raise PluginMetadataError(
                        f"Error while processing {p.path} defined in {json_file_path}: {ex.args[0]}"
                    )

        plugin_initializers.append(_initialize_json_provided_known_packaging_files)

    if manifest_variables_raw:
        manifest_var_path = attribute_path["manifest_variables"]
        manifest_variables = [
            PLUGIN_MANIFEST_VARS_PARSER.parse_input(p, manifest_var_path[i])
            for i, p in enumerate(manifest_variables_raw)
        ]

        def _initialize_json_provided_manifest_vars(
            api: DebputyPluginInitializer,
        ) -> None:
            # Registers every parsed manifest variable with the plugin API.
            for idx, manifest_variable in enumerate(manifest_variables):
                name = manifest_variable["name"]
                value = manifest_variable["value"]
                doc = manifest_variable.get("reference_documentation")
                try:
                    api.manifest_variable(
                        name, value, variable_reference_documentation=doc
                    )
                except ValueError as ex:
                    var_path = manifest_var_path[idx]
                    raise PluginMetadataError(
                        f"Error while processing {var_path.path} defined in {json_file_path}: {ex.args[0]}"
                    )

        plugin_initializers.append(_initialize_json_provided_manifest_vars)

    if packager_provided_files_raw:
        ppf_path = attribute_path["packager_provided_files"]
        ppfs = [
            PLUGIN_PPF_PARSER.parse_input(p, ppf_path[i])
            for i, p in enumerate(packager_provided_files_raw)
        ]

        def _initialize_json_provided_ppfs(api: DebputyPluginInitializer) -> None:
            ppf: PackagerProvidedFileJsonDescription
            for idx, ppf in enumerate(ppfs):
                # `c` becomes the extra keyword arguments: a copy of the
                # description minus the keys that are passed explicitly.
                c = dict(ppf)
                stem = ppf["stem"]
                installed_path = ppf["installed_path"]
                default_mode = ppf.get("default_mode")
                ref_doc_dict = ppf.get("reference_documentation")
                if default_mode is not None:
                    c["default_mode"] = default_mode.octal_mode

                if ref_doc_dict is not None:
                    ref_doc = packager_provided_file_reference_documentation(
                        **ref_doc_dict
                    )
                else:
                    ref_doc = None

                for k in [
                    "stem",
                    "installed_path",
                    "reference_documentation",
                ]:
                    try:
                        del c[k]
                    except KeyError:
                        pass

                try:
                    api.packager_provided_file(stem, installed_path, reference_documentation=ref_doc, **c)  # type: ignore
                except ValueError as ex:
                    p_path = ppf_path[idx]
                    raise PluginMetadataError(
                        f"Error while processing {p_path.path} defined in {json_file_path}: {ex.args[0]}"
                    )

        plugin_initializers.append(_initialize_json_provided_ppfs)

    if not plugin_initializers:
        raise PluginMetadataError(
            f"The plugin defined in {json_file_path} does not seem to provide features,"
            f" such as module + plugin-initializer or packager-provided-files."
        )

    # A single initializer is returned as-is; several are chained in
    # registration order.
    if len(plugin_initializers) == 1:
        return plugin_initializers[0]

    def _chain_loader(api: DebputyPluginInitializer) -> None:
        for initializer in plugin_initializers:
            initializer(api)

    return _chain_loader
2200@overload
2201@contextlib.contextmanager
2202def _open( 2202 ↛ exitline 2202 didn't return from function '_open' because
2203 path: str,
2204 fd: IO[AnyStr] | IOBase = ...,
2205) -> Iterator[IO[AnyStr] | IOBase]: ...
2208@overload
2209@contextlib.contextmanager
2210def _open(path: str, fd: None = None) -> Iterator[IO[bytes]]: ... 2210 ↛ exitline 2210 didn't return from function '_open' because
2213@contextlib.contextmanager
2214def _open(
2215 path: str, fd: IO[AnyStr] | IOBase | None = None
2216) -> Iterator[IO[AnyStr] | IOBase]:
2217 if fd is not None:
2218 yield fd
2219 else:
2220 with open(path, "rb") as fd:
2221 yield fd
2224def _resolve_json_plugin_docs_path(
2225 plugin_name: str,
2226 plugin_path: str,
2227) -> Traversable | Path | None:
2228 plugin_dir = os.path.dirname(plugin_path)
2229 return Path(os.path.join(plugin_dir, plugin_name + "_docs.yaml"))
def parse_json_plugin_desc(
    path: str,
    *,
    fd: IO[AnyStr] | IOBase | None = None,
    is_from_python_path: bool = False,
) -> DebputyPluginMetadata:
    """Parse a plugin's JSON descriptor into plugin metadata.

    The plugin name is the basename of `path` without its ".json" (or
    ".json.in") suffix. The plugin itself is not loaded; the metadata only
    carries a loader for later use.

    :param path: Path of the JSON file. Also used to derive the plugin name
      and in error messages.
    :param fd: Optional open stream to read the JSON from instead of opening
      `path`.
    :param is_from_python_path: Passed through to the metadata.
    :return: The metadata describing the plugin.
    :raises PluginMetadataError: If the content is not valid JSON, does not
      match the plugin metadata format, or the plugin is named "debputy".
    """
    with _open(path, fd=fd) as json_fd:
        try:
            raw = json.load(json_fd)
        except JSONDecodeError as e:
            raise PluginMetadataError(
                f'The plugin defined in "{path}" could not be parsed as valid JSON: {e.args[0]}'
            ) from e

    plugin_name = os.path.basename(path)
    for suffix in (".json", ".json.in"):
        if plugin_name.endswith(suffix):
            plugin_name = plugin_name[: -len(suffix)]
            break

    if plugin_name == "debputy":
        # Provide a better error message than "The plugin has already loaded!?"
        raise PluginMetadataError(
            f'The plugin named {plugin_name} must be bundled with `debputy`. Please rename "{path}" so it does not'
            f" clash with the bundled plugin of same name."
        )

    attribute_path = AttributePath.root_path(raw)

    try:
        plugin_json_metadata = PLUGIN_METADATA_PARSER.parse_input(
            raw,
            attribute_path,
        )
    except ManifestParseException as e:
        raise PluginMetadataError(
            f'The plugin defined in "{path}" was valid JSON but could not be parsed: {e.message}'
        ) from e

    def _load():
        # Deferred: the loader is only built when the metadata asks for it.
        return _json_plugin_loader(
            plugin_name,
            plugin_json_metadata,
            path,
            attribute_path,
        )

    def _docs_path():
        return _resolve_json_plugin_docs_path(plugin_name, path)

    return DebputyPluginMetadata(
        plugin_name=plugin_name,
        plugin_loader=_load,
        api_compat_version=plugin_json_metadata["api_compat_version"],
        plugin_doc_path_resolver=_docs_path,
        plugin_initializer=None,
        plugin_path=path,
        is_from_python_path=is_from_python_path,
    )
@dataclasses.dataclass(slots=True, frozen=True)
class ServiceDefinitionImpl(ServiceDefinition[DSD]):
    """Immutable record describing one service.

    Instances are created positionally by
    `ServiceRegistryImpl.register_service`, so the field order below is part
    of the interface.
    """

    # Primary name of the service (`register_service` uses the first name).
    name: str
    # All names of the service, including the primary one.
    names: Sequence[str]
    # The path the service was registered with.
    path: VirtualPath
    # Kind of service, e.g. "service" or "timer".
    type_of_service: str
    # Scope of the service, e.g. "system".
    service_scope: str
    # Set from `enable_by_default` by `register_service`.
    auto_enable_on_install: bool
    # Set from `start_by_default` by `register_service`.
    auto_start_on_install: bool
    # Set from `default_upgrade_rule` by `register_service`.
    on_upgrade: ServiceUpgradeRule
    # Human-readable description of where the definition came from.
    definition_source: str
    # `register_service` sets this to True for plugin-detected services.
    is_plugin_provided_definition: bool
    # Optional context object supplied together with the registration.
    service_context: DSD | None

    def replace(self, **changes: Any) -> "ServiceDefinitionImpl[DSD]":
        """Return a copy of this definition with the given fields replaced."""
        return dataclasses.replace(self, **changes)
class ServiceRegistryImpl(ServiceRegistry[DSD]):
    """Collects the services registered by one service manager.

    Each (name, type, scope) combination may be registered only once per
    registry.
    """

    __slots__ = ("_service_manager_details", "_service_definitions", "_seen_services")

    def __init__(self, service_manager_details: ServiceManagerDetails) -> None:
        """
        :param service_manager_details: Details of the service manager that
          registers services here (its plugin name is used in messages).
        """
        self._service_manager_details = service_manager_details
        self._service_definitions: list[ServiceDefinition[DSD]] = []
        # (name, type_of_service, service_scope) of every registered service.
        self._seen_services: set[tuple[str, str, str]] = set()

    @property
    def detected_services(self) -> Sequence[ServiceDefinition[DSD]]:
        """The service definitions registered so far, in registration order."""
        return self._service_definitions

    def register_service(
        self,
        path: VirtualPath,
        name: str | list[str],
        *,
        type_of_service: str = "service",  # "timer", etc.
        service_scope: str = "system",
        enable_by_default: bool = True,
        start_by_default: bool = True,
        default_upgrade_rule: ServiceUpgradeRule = "restart",
        service_context: DSD | None = None,
    ) -> None:
        """Register a detected service.

        :param path: Path of the service.
        :param name: Name of the service, or a list of names of which the
          first is the primary name.
        :param type_of_service: Kind of service, e.g. "service" or "timer".
        :param service_scope: Scope of the service.
        :param enable_by_default: Whether the service is enabled on install by
          default.
        :param start_by_default: Whether the service is started on install by
          default.
        :param default_upgrade_rule: Default rule for handling the service on
          upgrade.
        :param service_context: Optional context stored with the definition.
        :raises ValueError: If `name` is an empty list.
        :raises PluginAPIViolationError: If one of the names was already
          registered with the same type and scope.
        """
        names = name if isinstance(name, list) else [name]
        if not names:
            raise ValueError(
                f"The service must have at least one name - {path.absolute} did not have any"
            )
        for n in names:
            key = (n, type_of_service, service_scope)
            if key in self._seen_services:
                raise PluginAPIViolationError(
                    f"The service manager (from {self._service_manager_details.plugin_metadata.plugin_name}) used"
                    f" the service name {n} (type: {type_of_service}, scope: {service_scope}) twice. This is not"
                    " allowed by the debputy plugin API."
                )
        # Record the names only once all of them passed the check, so a
        # rejected registration leaves no trace. Without this the duplicate
        # check above could never trigger.
        self._seen_services.update(
            (n, type_of_service, service_scope) for n in names
        )
        # TODO: We cannot create a service definition immediate once the manifest is involved
        self._service_definitions.append(
            ServiceDefinitionImpl(
                names[0],
                names,
                path,
                type_of_service,
                service_scope,
                enable_by_default,
                start_by_default,
                default_upgrade_rule,
                f"Auto-detected by plugin {self._service_manager_details.plugin_metadata.plugin_name}",
                True,
                service_context,
            )
        )