Coverage for src/debputy/plugin/api/impl.py: 57%
852 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-03-24 16:38 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-03-24 16:38 +0000
1import contextlib
2import dataclasses
3import functools
4import importlib
5import importlib.resources
6import importlib.util
7import itertools
8import json
9import os
10import re
11import subprocess
12import sys
13from abc import ABC
14from importlib.resources.abc import Traversable
15from json import JSONDecodeError
16from pathlib import Path
17from typing import (
18 Optional,
19 Callable,
20 Dict,
21 Tuple,
22 Iterable,
23 Sequence,
24 Type,
25 List,
26 Union,
27 Set,
28 Iterator,
29 IO,
30 Mapping,
31 AbstractSet,
32 cast,
33 FrozenSet,
34 Any,
35 Literal,
36 Container,
37 TYPE_CHECKING,
38 is_typeddict,
39)
41from debputy import DEBPUTY_DOC_ROOT_DIR
42from debputy.exceptions import (
43 DebputySubstitutionError,
44 PluginConflictError,
45 PluginMetadataError,
46 PluginBaseError,
47 PluginInitializationError,
48 PluginAPIViolationError,
49 PluginNotFoundError,
50 PluginIncorrectRegistrationError,
51)
52from debputy.maintscript_snippet import (
53 STD_CONTROL_SCRIPTS,
54 MaintscriptSnippetContainer,
55 MaintscriptSnippet,
56)
57from debputy.manifest_parser.exceptions import ManifestParseException
58from debputy.manifest_parser.parser_data import ParserContextData
59from debputy.manifest_parser.tagging_types import TypeMapping
60from debputy.manifest_parser.util import AttributePath
61from debputy.manifest_parser.util import resolve_package_type_selectors
62from debputy.plugin.api.doc_parsing import (
63 DEBPUTY_DOC_REFERENCE_DATA_PARSER,
64 parser_type_name,
65 DebputyParsedDoc,
66)
67from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
68from debputy.plugin.api.impl_types import (
69 DebputyPluginMetadata,
70 PackagerProvidedFileClassSpec,
71 MetadataOrMaintscriptDetector,
72 PluginProvidedTrigger,
73 TTP,
74 DIPHandler,
75 PF,
76 SF,
77 DIPKWHandler,
78 PluginProvidedManifestVariable,
79 PluginProvidedPackageProcessor,
80 PluginProvidedDiscardRule,
81 AutomaticDiscardRuleExample,
82 PPFFormatParam,
83 ServiceManagerDetails,
84 KnownPackagingFileInfo,
85 PluginProvidedKnownPackagingFile,
86 InstallPatternDHCompatRule,
87 PluginProvidedTypeMapping,
88 PluginProvidedBuildSystemAutoDetection,
89 BSR,
90)
91from debputy.plugin.api.plugin_parser import (
92 PLUGIN_METADATA_PARSER,
93 PluginJsonMetadata,
94 PLUGIN_PPF_PARSER,
95 PackagerProvidedFileJsonDescription,
96 PLUGIN_MANIFEST_VARS_PARSER,
97 PLUGIN_KNOWN_PACKAGING_FILES_PARSER,
98)
99from debputy.plugin.api.spec import (
100 MaintscriptAccessor,
101 Maintscript,
102 DpkgTriggerType,
103 BinaryCtrlAccessor,
104 PackageProcessingContext,
105 MetadataAutoDetector,
106 PluginInitializationEntryPoint,
107 DebputyPluginInitializer,
108 PackageTypeSelector,
109 FlushableSubstvars,
110 ParserDocumentation,
111 PackageProcessor,
112 VirtualPath,
113 ServiceIntegrator,
114 ServiceDetector,
115 ServiceRegistry,
116 ServiceDefinition,
117 DSD,
118 ServiceUpgradeRule,
119 PackagerProvidedFileReferenceDocumentation,
120 packager_provided_file_reference_documentation,
121 TypeMappingDocumentation,
122 DebputyIntegrationMode,
123 _DEBPUTY_DISPATCH_METADATA_ATTR_NAME,
124 BuildSystemManifestRuleMetadata,
125 INTEGRATION_MODE_FULL,
126 only_integrations,
127)
128from debputy.plugin.api.std_docs import _STD_ATTR_DOCS
129from debputy.plugin.debputy.to_be_api_types import (
130 BuildRuleParsedFormat,
131 BSPF,
132 debputy_build_system,
133)
134from debputy.plugin.plugin_state import (
135 run_in_context_of_plugin,
136 run_in_context_of_plugin_wrap_errors,
137 wrap_plugin_code,
138)
139from debputy.substitution import (
140 Substitution,
141 VariableNameState,
142 SUBST_VAR_RE,
143 VariableContext,
144)
145from debputy.util import (
146 _normalize_path,
147 POSTINST_DEFAULT_CONDITION,
148 _error,
149 print_command,
150 _warn,
151 _debug_log,
152)
153from debputy.yaml import MANIFEST_YAML
155if TYPE_CHECKING:
156 from debputy.highlevel_manifest import HighLevelManifest
# Matches a file-name tail of the form "_t.py", "_test.py" or "_check.py",
# optionally with a lower-case "_<suffix>" segment between the marker and the
# ".py" extension (the suffix is captured as group 1).
# NOTE(review): presumably used to recognise plugin test modules - the users of
# this pattern are not visible in this part of the file.
PLUGIN_TEST_SUFFIX = re.compile(r"_(?:t|test|check)(?:_([a-z0-9_]+))?[.]py$")
161def _validate_known_packaging_file_dh_compat_rules(
162 dh_compat_rules: Optional[List[InstallPatternDHCompatRule]],
163) -> None:
164 max_compat = None
165 if not dh_compat_rules:
166 return
167 dh_compat_rule: InstallPatternDHCompatRule
168 for idx, dh_compat_rule in enumerate(dh_compat_rules):
169 dh_version = dh_compat_rule.get("starting_with_debhelper_version")
170 compat = dh_compat_rule.get("starting_with_compat_level")
172 remaining = dh_compat_rule.keys() - {
173 "after_debhelper_version",
174 "starting_with_compat_level",
175 }
176 if not remaining:
177 raise ValueError(
178 f"The dh compat-rule at index {idx} does not affect anything not have any rules!? So why have it?"
179 )
180 if dh_version is None and compat is None and idx < len(dh_compat_rules) - 1:
181 raise ValueError(
182 f"The dh compat-rule at index {idx} is not the last and is missing either"
183 " before-debhelper-version or before-compat-level"
184 )
185 if compat is not None and compat < 0:
186 raise ValueError(
187 f"There is no compat below 1 but dh compat-rule at {idx} wants to declare some rule"
188 f" for something that appeared when migrating from {compat} to {compat + 1}."
189 )
191 if max_compat is None:
192 max_compat = compat
193 elif compat is not None:
194 if compat >= max_compat:
195 raise ValueError(
196 f"The dh compat-rule at {idx} should be moved earlier than the entry for compat {max_compat}."
197 )
198 max_compat = compat
200 install_pattern = dh_compat_rule.get("install_pattern")
201 if (
202 install_pattern is not None
203 and _normalize_path(install_pattern, with_prefix=False) != install_pattern
204 ):
205 raise ValueError(
206 f"The install-pattern in dh compat-rule at {idx} must be normalized as"
207 f' "{_normalize_path(install_pattern, with_prefix=False)}".'
208 )
211class DebputyPluginInitializerProvider(DebputyPluginInitializer):
212 __slots__ = (
213 "_plugin_metadata",
214 "_feature_set",
215 "_plugin_detector_ids",
216 "_substitution",
217 "_unloaders",
218 "_is_doc_cache_resolved",
219 "_doc_cache",
220 "_load_started",
221 )
223 def __init__(
224 self,
225 plugin_metadata: DebputyPluginMetadata,
226 feature_set: PluginProvidedFeatureSet,
227 substitution: Substitution,
228 ) -> None:
229 self._plugin_metadata: DebputyPluginMetadata = plugin_metadata
230 self._feature_set = feature_set
231 self._plugin_detector_ids: Set[str] = set()
232 self._substitution = substitution
233 self._unloaders: List[Callable[[], None]] = []
234 self._is_doc_cache_resolved: bool = False
235 self._doc_cache: Optional[DebputyParsedDoc] = None
236 self._load_started = False
238 def unload_plugin(self) -> None:
239 if self._load_started:
240 for unloader in self._unloaders:
241 unloader()
242 del self._feature_set.plugin_data[self._plugin_name]
244 def load_plugin(self) -> None:
245 metadata = self._plugin_metadata
246 if metadata.plugin_name in self._feature_set.plugin_data: 246 ↛ 247line 246 didn't jump to line 247 because the condition on line 246 was never true
247 raise PluginConflictError(
248 f'The plugin "{metadata.plugin_name}" has already been loaded!?'
249 )
250 assert (
251 metadata.api_compat_version == 1
252 ), f"Unsupported plugin API compat version {metadata.api_compat_version}"
253 self._feature_set.plugin_data[metadata.plugin_name] = metadata
254 self._load_started = True
255 assert not metadata.is_initialized
256 try:
257 metadata.initialize_plugin(self)
258 except Exception as e:
259 initializer = metadata.plugin_initializer
260 if ( 260 ↛ 265line 260 didn't jump to line 265
261 isinstance(e, TypeError)
262 and initializer is not None
263 and not callable(initializer)
264 ):
265 raise PluginMetadataError(
266 f"The specified entry point for plugin {metadata.plugin_name} does not appear to be a"
267 f" callable (callable returns False). The specified entry point identifies"
268 f' itself as "{initializer.__qualname__}".'
269 ) from e
270 elif isinstance(e, PluginBaseError): 270 ↛ 272line 270 didn't jump to line 272 because the condition on line 270 was always true
271 raise
272 raise PluginInitializationError(
273 f"Exception while attempting to load plugin {metadata.plugin_name}"
274 ) from e
276 def _resolve_docs(self) -> Optional[DebputyParsedDoc]:
277 doc_cache = self._doc_cache
278 if doc_cache is not None:
279 return doc_cache
281 plugin_doc_path = self._plugin_metadata.plugin_doc_path
282 if plugin_doc_path is None or self._is_doc_cache_resolved:
283 self._is_doc_cache_resolved = True
284 return None
285 try:
286 with plugin_doc_path.open("r", encoding="utf-8") as fd:
287 raw = MANIFEST_YAML.load(fd)
288 except FileNotFoundError:
289 _debug_log(
290 f"No documentation file found for {self._plugin_name}. Expected it at {plugin_doc_path}"
291 )
292 self._is_doc_cache_resolved = True
293 return None
294 attr_path = AttributePath.root_path(plugin_doc_path)
295 try:
296 ref = DEBPUTY_DOC_REFERENCE_DATA_PARSER.parse_input(raw, attr_path)
297 except ManifestParseException as e:
298 raise ValueError(
299 f"Could not parse documentation in {plugin_doc_path}: {e.message}"
300 ) from e
301 try:
302 res = DebputyParsedDoc.from_ref_data(ref)
303 except ValueError as e:
304 raise ValueError(
305 f"Could not parse documentation in {plugin_doc_path}: {e.args[0]}"
306 ) from e
308 self._doc_cache = res
309 self._is_doc_cache_resolved = True
310 return res
312 def _pluggable_manifest_docs_for(
313 self,
314 rule_type: Union[TTP, str],
315 rule_name: Union[str, List[str]],
316 *,
317 inline_reference_documentation: Optional[ParserDocumentation] = None,
318 ) -> Optional[ParserDocumentation]:
319 ref_data = self._resolve_docs()
320 if ref_data is not None:
321 primary_rule_name = (
322 rule_name if isinstance(rule_name, str) else rule_name[0]
323 )
324 rule_ref = f"{parser_type_name(rule_type)}::{primary_rule_name}"
325 resolved_docs = ref_data.pluggable_manifest_rules.get(rule_ref)
326 if resolved_docs is not None:
327 if inline_reference_documentation is not None: 327 ↛ 328line 327 didn't jump to line 328 because the condition on line 327 was never true
328 raise ValueError(
329 f"Conflicting docs for {rule_ref}: Was provided one in the API call and one via"
330 f" {self._plugin_metadata.plugin_doc_path}. Please remove one of the two, so"
331 f" there is only one doc reference"
332 )
333 return resolved_docs
334 return inline_reference_documentation
336 def packager_provided_file(
337 self,
338 stem: str,
339 installed_path: str,
340 *,
341 default_mode: int = 0o0644,
342 default_priority: Optional[int] = None,
343 allow_name_segment: bool = True,
344 allow_architecture_segment: bool = False,
345 post_formatting_rewrite: Optional[Callable[[str], str]] = None,
346 packageless_is_fallback_for_all_packages: bool = False,
347 reservation_only: bool = False,
348 format_callback: Optional[
349 Callable[[str, PPFFormatParam, VirtualPath], str]
350 ] = None,
351 reference_documentation: Optional[
352 PackagerProvidedFileReferenceDocumentation
353 ] = None,
354 ) -> None:
355 packager_provided_files = self._feature_set.packager_provided_files
356 existing = packager_provided_files.get(stem)
358 if format_callback is not None and self._plugin_name != "debputy": 358 ↛ 359line 358 didn't jump to line 359 because the condition on line 358 was never true
359 raise ValueError(
360 "Sorry; Using format_callback is a debputy-internal"
361 f" API. Triggered by plugin {self._plugin_name}"
362 )
364 if installed_path.endswith("/"): 364 ↛ 365line 364 didn't jump to line 365 because the condition on line 364 was never true
365 raise ValueError(
366 f'The installed_path ends with "/" indicating it is a directory, but it must be a file.'
367 f" Triggered by plugin {self._plugin_name}."
368 )
370 installed_path = _normalize_path(installed_path)
372 has_name_var = "{name}" in installed_path
374 if installed_path.startswith("./DEBIAN") or reservation_only:
375 # Special-case, used for control files.
376 if self._plugin_name != "debputy": 376 ↛ 377line 376 didn't jump to line 377 because the condition on line 376 was never true
377 raise ValueError(
378 "Sorry; Using DEBIAN as install path or/and reservation_only is a debputy-internal"
379 f" API. Triggered by plugin {self._plugin_name}"
380 )
381 elif not has_name_var and "{owning_package}" not in installed_path: 381 ↛ 382line 381 didn't jump to line 382 because the condition on line 381 was never true
382 raise ValueError(
383 'The installed_path must contain a "{name}" (preferred) or a "{owning_package}"'
384 " substitution (or have installed_path end with a slash). Otherwise, the installed"
385 f" path would caused file-conflicts. Triggered by plugin {self._plugin_name}"
386 )
388 if allow_name_segment and not has_name_var: 388 ↛ 389line 388 didn't jump to line 389 because the condition on line 388 was never true
389 raise ValueError(
390 'When allow_name_segment is True, the installed_path must have a "{name}" substitution'
391 " variable. Otherwise, the name segment will not work properly. Triggered by"
392 f" plugin {self._plugin_name}"
393 )
395 if ( 395 ↛ 400line 395 didn't jump to line 400
396 default_priority is not None
397 and "{priority}" not in installed_path
398 and "{priority:02}" not in installed_path
399 ):
400 raise ValueError(
401 'When default_priority is not None, the installed_path should have a "{priority}"'
402 ' or a "{priority:02}" substitution variable. Otherwise, the priority would be lost.'
403 f" Triggered by plugin {self._plugin_name}"
404 )
406 if existing is not None:
407 if existing.debputy_plugin_metadata.plugin_name != self._plugin_name: 407 ↛ 414line 407 didn't jump to line 414
408 message = (
409 f'The stem "{stem}" is registered twice for packager provided files.'
410 f" Once by {existing.debputy_plugin_metadata.plugin_name} and once"
411 f" by {self._plugin_name}"
412 )
413 else:
414 message = (
415 f"Bug in the plugin {self._plugin_name}: It tried to register the"
416 f' stem "{stem}" twice for packager provided files.'
417 )
418 raise PluginConflictError(
419 message, existing.debputy_plugin_metadata, self._plugin_metadata
420 )
421 packager_provided_files[stem] = PackagerProvidedFileClassSpec(
422 self._plugin_metadata,
423 stem,
424 installed_path,
425 default_mode=default_mode,
426 default_priority=default_priority,
427 allow_name_segment=allow_name_segment,
428 allow_architecture_segment=allow_architecture_segment,
429 post_formatting_rewrite=post_formatting_rewrite,
430 packageless_is_fallback_for_all_packages=packageless_is_fallback_for_all_packages,
431 reservation_only=reservation_only,
432 formatting_callback=format_callback,
433 reference_documentation=reference_documentation,
434 )
436 def _unload() -> None:
437 del packager_provided_files[stem]
439 self._unloaders.append(_unload)
441 def metadata_or_maintscript_detector(
442 self,
443 auto_detector_id: str,
444 auto_detector: MetadataAutoDetector,
445 *,
446 package_type: PackageTypeSelector = "deb",
447 ) -> None:
448 if auto_detector_id in self._plugin_detector_ids: 448 ↛ 449line 448 didn't jump to line 449 because the condition on line 448 was never true
449 raise ValueError(
450 f"The plugin {self._plugin_name} tried to register"
451 f' "{auto_detector_id}" twice'
452 )
453 self._plugin_detector_ids.add(auto_detector_id)
454 all_detectors = self._feature_set.metadata_maintscript_detectors
455 if self._plugin_name not in all_detectors:
456 all_detectors[self._plugin_name] = []
457 package_types = resolve_package_type_selectors(package_type)
458 all_detectors[self._plugin_name].append(
459 MetadataOrMaintscriptDetector(
460 detector_id=auto_detector_id,
461 detector=wrap_plugin_code(self._plugin_name, auto_detector),
462 plugin_metadata=self._plugin_metadata,
463 applies_to_package_types=package_types,
464 enabled=True,
465 )
466 )
468 def _unload() -> None:
469 if self._plugin_name in all_detectors:
470 del all_detectors[self._plugin_name]
472 self._unloaders.append(_unload)
474 def document_builtin_variable(
475 self,
476 variable_name: str,
477 variable_reference_documentation: str,
478 *,
479 is_context_specific: bool = False,
480 is_for_special_case: bool = False,
481 ) -> None:
482 manifest_variables = self._feature_set.manifest_variables
483 self._restricted_api()
484 state = self._substitution.variable_state(variable_name)
485 if state == VariableNameState.UNDEFINED: 485 ↛ 486line 485 didn't jump to line 486 because the condition on line 485 was never true
486 raise ValueError(
487 f"The plugin {self._plugin_name} attempted to document built-in {variable_name},"
488 f" but it is not known to be a variable"
489 )
491 assert variable_name not in manifest_variables
493 manifest_variables[variable_name] = PluginProvidedManifestVariable(
494 self._plugin_metadata,
495 variable_name,
496 None,
497 is_context_specific_variable=is_context_specific,
498 variable_reference_documentation=variable_reference_documentation,
499 is_documentation_placeholder=True,
500 is_for_special_case=is_for_special_case,
501 )
503 def _unload() -> None:
504 del manifest_variables[variable_name]
506 self._unloaders.append(_unload)
508 def manifest_variable_provider(
509 self,
510 provider: Callable[[VariableContext], Mapping[str, str]],
511 variables: Union[Sequence[str], Mapping[str, Optional[str]]],
512 ) -> None:
513 self._restricted_api()
514 cached_provider = functools.lru_cache(None)(provider)
515 permitted_variables = frozenset(variables)
516 variables_iter: Iterable[Tuple[str, Optional[str]]]
517 if not isinstance(variables, Mapping): 517 ↛ 518line 517 didn't jump to line 518 because the condition on line 517 was never true
518 variables_iter = zip(variables, itertools.repeat(None))
519 else:
520 variables_iter = variables.items()
522 checked_vars = False
523 manifest_variables = self._feature_set.manifest_variables
524 plugin_name = self._plugin_name
526 def _value_resolver_generator(
527 variable_name: str,
528 ) -> Callable[[VariableContext], str]:
529 def _value_resolver(variable_context: VariableContext) -> str:
530 res = cached_provider(variable_context)
531 nonlocal checked_vars
532 if not checked_vars: 532 ↛ 543line 532 didn't jump to line 543 because the condition on line 532 was always true
533 if permitted_variables != res.keys(): 533 ↛ 534line 533 didn't jump to line 534 because the condition on line 533 was never true
534 expected = ", ".join(sorted(permitted_variables))
535 actual = ", ".join(sorted(res))
536 raise PluginAPIViolationError(
537 f"The plugin {plugin_name} claimed to provide"
538 f" the following variables {expected},"
539 f" but when resolving the variables, the plugin provided"
540 f" {actual}. These two lists should have been the same."
541 )
542 checked_vars = False
543 return res[variable_name]
545 return _value_resolver
547 for varname, vardoc in variables_iter:
548 self._check_variable_name(varname)
549 manifest_variables[varname] = PluginProvidedManifestVariable(
550 self._plugin_metadata,
551 varname,
552 _value_resolver_generator(varname),
553 is_context_specific_variable=False,
554 variable_reference_documentation=vardoc,
555 )
557 def _unload() -> None:
558 raise PluginInitializationError(
559 "Cannot unload manifest_variable_provider (not implemented)"
560 )
562 self._unloaders.append(_unload)
564 def _check_variable_name(self, variable_name: str) -> None:
565 manifest_variables = self._feature_set.manifest_variables
566 existing = manifest_variables.get(variable_name)
568 if existing is not None:
569 if existing.plugin_metadata.plugin_name == self._plugin_name: 569 ↛ 575line 569 didn't jump to line 575
570 message = (
571 f"Bug in the plugin {self._plugin_name}: It tried to register the"
572 f' manifest variable "{variable_name}" twice.'
573 )
574 else:
575 message = (
576 f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}"
577 f" both tried to provide the manifest variable {variable_name}"
578 )
579 raise PluginConflictError(
580 message, existing.plugin_metadata, self._plugin_metadata
581 )
582 if not SUBST_VAR_RE.match("{{" + variable_name + "}}"):
583 raise ValueError(
584 f"The plugin {self._plugin_name} attempted to declare {variable_name},"
585 f" which is not a valid variable name"
586 )
588 namespace = ""
589 variable_basename = variable_name
590 if ":" in variable_name:
591 namespace, variable_basename = variable_name.rsplit(":", 1)
592 assert namespace != ""
593 assert variable_name != ""
595 if namespace != "" and namespace not in ("token", "path"):
596 raise ValueError(
597 f"The plugin {self._plugin_name} attempted to declare {variable_name},"
598 f" which is in the reserved namespace {namespace}"
599 )
601 variable_name_upper = variable_name.upper()
602 if (
603 variable_name_upper.startswith(("DEB_", "DPKG_", "DEBPUTY"))
604 or variable_basename.startswith("_")
605 or variable_basename.upper().startswith("DEBPUTY")
606 ) and self._plugin_name != "debputy":
607 raise ValueError(
608 f"The plugin {self._plugin_name} attempted to declare {variable_name},"
609 f" which is a variable name reserved by debputy"
610 )
612 state = self._substitution.variable_state(variable_name)
613 if state != VariableNameState.UNDEFINED and self._plugin_name != "debputy":
614 raise ValueError(
615 f"The plugin {self._plugin_name} attempted to declare {variable_name},"
616 f" which would shadow a built-in variable"
617 )
619 def package_processor(
620 self,
621 processor_id: str,
622 processor: PackageProcessor,
623 *,
624 depends_on_processor: Iterable[str] = tuple(),
625 package_type: PackageTypeSelector = "deb",
626 ) -> None:
627 self._restricted_api(allowed_plugins={"lua", "debputy-self-hosting"})
628 package_processors = self._feature_set.all_package_processors
629 dependencies = set()
630 processor_key = (self._plugin_name, processor_id)
632 if processor_key in package_processors: 632 ↛ 633line 632 didn't jump to line 633 because the condition on line 632 was never true
633 raise PluginConflictError(
634 f"The plugin {self._plugin_name} already registered a processor with id {processor_id}",
635 self._plugin_metadata,
636 self._plugin_metadata,
637 )
639 for depends_ref in depends_on_processor:
640 if isinstance(depends_ref, str): 640 ↛ 654line 640 didn't jump to line 654 because the condition on line 640 was always true
641 if (self._plugin_name, depends_ref) in package_processors: 641 ↛ 643line 641 didn't jump to line 643 because the condition on line 641 was always true
642 depends_key = (self._plugin_name, depends_ref)
643 elif ("debputy", depends_ref) in package_processors:
644 depends_key = ("debputy", depends_ref)
645 else:
646 raise ValueError(
647 f'Could not resolve dependency "{depends_ref}" for'
648 f' "{processor_id}". It was not provided by the plugin itself'
649 f" ({self._plugin_name}) nor debputy."
650 )
651 else:
652 # TODO: Add proper dependencies first, at which point we should probably resolve "name"
653 # via the direct dependencies.
654 assert False
656 existing_processor = package_processors.get(depends_key)
657 if existing_processor is None: 657 ↛ 660line 657 didn't jump to line 660 because the condition on line 657 was never true
658 # We currently require the processor to be declared already. If this ever changes,
659 # PluginProvidedFeatureSet.package_processors_in_order will need an update
660 dplugin_name, dprocessor_name = depends_key
661 available_processors = ", ".join(
662 n for p, n in package_processors.keys() if p == dplugin_name
663 )
664 raise ValueError(
665 f"The plugin {dplugin_name} does not provide a processor called"
666 f" {dprocessor_name}. Available processors for that plugin are:"
667 f" {available_processors}"
668 )
669 dependencies.add(depends_key)
671 package_processors[processor_key] = PluginProvidedPackageProcessor(
672 processor_id,
673 resolve_package_type_selectors(package_type),
674 wrap_plugin_code(self._plugin_name, processor),
675 frozenset(dependencies),
676 self._plugin_metadata,
677 )
679 def _unload() -> None:
680 del package_processors[processor_key]
682 self._unloaders.append(_unload)
684 def automatic_discard_rule(
685 self,
686 name: str,
687 should_discard: Callable[[VirtualPath], bool],
688 *,
689 rule_reference_documentation: Optional[str] = None,
690 examples: Union[
691 AutomaticDiscardRuleExample, Sequence[AutomaticDiscardRuleExample]
692 ] = tuple(),
693 ) -> None:
694 """Register an automatic discard rule
696 An automatic discard rule is basically applied to *every* path about to be installed in to any package.
697 If any discard rule concludes that a path should not be installed, then the path is not installed.
698 In the case where the discard path is a:
700 * directory: Then the entire directory is excluded along with anything beneath it.
701 * symlink: Then the symlink itself (but not its target) is excluded.
702 * hardlink: Then the current hardlink will not be installed, but other instances of it will be.
704 Note: Discarded files are *never* deleted by `debputy`. They just make `debputy` skip the file.
706 Automatic discard rules should be written with the assumption that directories will be tested
707 before their content *when it is relevant* for the discard rule to examine whether the directory
708 can be excluded.
710 The packager can via the manifest overrule automatic discard rules by explicitly listing the path
711 without any globs. As example:
713 installations:
714 - install:
715 sources:
716 - usr/lib/libfoo.la # <-- This path is always installed
717 # (Discard rules are never asked in this case)
718 #
719 - usr/lib/*.so* # <-- Discard rules applies to any path beneath usr/lib and can exclude matches
720 # Though, they will not examine `libfoo.la` as it has already been installed
721 #
722 # Note: usr/lib itself is never tested in this case (it is assumed to be
723 # explicitly requested). But any subdir of usr/lib will be examined.
725 When an automatic discard rule is evaluated, it can see the source path currently being considered
726 for installation. While it can look at "surrounding" context (like parent directory), it will not
727 know whether those paths are to be installed or will be installed.
729 :param name: A user visible name discard rule. It can be used on the command line, so avoid shell
730 metacharacters and spaces.
731 :param should_discard: A callable that is the implementation of the automatic discard rule. It will receive
732 a VirtualPath representing the *source* path about to be installed. If callable returns `True`, then the
733 path is discarded. If it returns `False`, the path is not discarded (by this rule at least).
734 A source path will either be from the root of the source tree or the root of a search directory such as
735 `debian/tmp`. Where the path will be installed is not available at the time the discard rule is
736 evaluated.
737 :param rule_reference_documentation: Optionally, the reference documentation to be shown when a user
738 looks up this automatic discard rule.
739 :param examples: Provide examples for the rule. Use the automatic_discard_rule_example function to
740 generate the examples.
742 """
743 self._restricted_api()
744 auto_discard_rules = self._feature_set.auto_discard_rules
745 existing = auto_discard_rules.get(name)
746 if existing is not None: 746 ↛ 747line 746 didn't jump to line 747 because the condition on line 746 was never true
747 if existing.plugin_metadata.plugin_name == self._plugin_name:
748 message = (
749 f"Bug in the plugin {self._plugin_name}: It tried to register the"
750 f' automatic discard rule "{name}" twice.'
751 )
752 else:
753 message = (
754 f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}"
755 f" both tried to provide the automatic discard rule {name}"
756 )
757 raise PluginConflictError(
758 message, existing.plugin_metadata, self._plugin_metadata
759 )
760 examples = (
761 (examples,)
762 if isinstance(examples, AutomaticDiscardRuleExample)
763 else tuple(examples)
764 )
765 auto_discard_rules[name] = PluginProvidedDiscardRule(
766 name,
767 self._plugin_metadata,
768 should_discard,
769 rule_reference_documentation,
770 examples,
771 )
773 def _unload() -> None:
774 del auto_discard_rules[name]
776 self._unloaders.append(_unload)
778 def service_provider(
779 self,
780 service_manager: str,
781 detector: ServiceDetector,
782 integrator: ServiceIntegrator,
783 ) -> None:
784 self._restricted_api()
785 service_managers = self._feature_set.service_managers
786 existing = service_managers.get(service_manager)
787 if existing is not None: 787 ↛ 788line 787 didn't jump to line 788 because the condition on line 787 was never true
788 if existing.plugin_metadata.plugin_name == self._plugin_name:
789 message = (
790 f"Bug in the plugin {self._plugin_name}: It tried to register the"
791 f' service manager "{service_manager}" twice.'
792 )
793 else:
794 message = (
795 f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}"
796 f' both tried to provide the service manager "{service_manager}"'
797 )
798 raise PluginConflictError(
799 message, existing.plugin_metadata, self._plugin_metadata
800 )
801 service_managers[service_manager] = ServiceManagerDetails(
802 service_manager,
803 wrap_plugin_code(self._plugin_name, detector),
804 wrap_plugin_code(self._plugin_name, integrator),
805 self._plugin_metadata,
806 )
808 def _unload() -> None:
809 del service_managers[service_manager]
811 self._unloaders.append(_unload)
813 def manifest_variable(
814 self,
815 variable_name: str,
816 value: str,
817 variable_reference_documentation: Optional[str] = None,
818 ) -> None:
819 self._check_variable_name(variable_name)
820 manifest_variables = self._feature_set.manifest_variables
821 try:
822 resolved_value = self._substitution.substitute(
823 value, "Plugin initialization"
824 )
825 depends_on_variable = resolved_value != value
826 except DebputySubstitutionError:
827 depends_on_variable = True
828 if depends_on_variable:
829 raise ValueError(
830 f"The plugin {self._plugin_name} attempted to declare {variable_name} with value {value!r}."
831 f" This value depends on another variable, which is not supported. This restriction may be"
832 f" lifted in the future."
833 )
835 manifest_variables[variable_name] = PluginProvidedManifestVariable(
836 self._plugin_metadata,
837 variable_name,
838 value,
839 is_context_specific_variable=False,
840 variable_reference_documentation=variable_reference_documentation,
841 )
843 def _unload() -> None:
844 # We need to check it was never resolved
845 raise PluginInitializationError(
846 "Cannot unload manifest_variable (not implemented)"
847 )
849 self._unloaders.append(_unload)
851 @property
852 def _plugin_name(self) -> str:
853 return self._plugin_metadata.plugin_name
def provide_manifest_keyword(
    self,
    rule_type: TTP,
    rule_name: Union[str, List[str]],
    handler: DIPKWHandler,
    *,
    inline_reference_documentation: Optional[ParserDocumentation] = None,
) -> None:
    """Register a keyword with the dispatching table parser for `rule_type`.

    :param rule_type: Key into the parser generator's dispatchable table parsers.
    :param rule_name: The keyword (or keywords) to register.
    :param handler: Plugin callback. It is wrapped with `wrap_plugin_code` before
      being registered.
    :param inline_reference_documentation: Optional documentation. It is passed
      through `_pluggable_manifest_docs_for` before being handed to the parser.
    :raises ValueError: If `rule_type` is not a known dispatchable table parser.
    """
    self._restricted_api()
    table_parsers = (
        self._feature_set.manifest_parser_generator.dispatchable_table_parsers
    )
    if rule_type not in table_parsers:
        supported = ", ".join(sorted(t.__name__ for t in table_parsers))
        raise ValueError(
            f"The rule_type was not a supported type. It must be one of {supported}"
        )

    resolved_docs = self._pluggable_manifest_docs_for(
        rule_type,
        rule_name,
        inline_reference_documentation=inline_reference_documentation,
    )

    target_parser = table_parsers[rule_type]
    wrapped_handler = wrap_plugin_code(self._plugin_name, handler)
    target_parser.register_keyword(
        rule_name,
        wrapped_handler,
        self._plugin_metadata,
        inline_reference_documentation=resolved_docs,
    )

    def _unload() -> None:
        # Unloading is not supported for this feature; fail loudly instead.
        raise PluginInitializationError(
            "Cannot unload provide_manifest_keyword (not implemented)"
        )

    self._unloaders.append(_unload)
def pluggable_object_parser(
    self,
    rule_type: str,
    rule_name: str,
    *,
    object_parser_key: Optional[str] = None,
    on_end_parse_step: Optional[
        Callable[
            [str, Optional[Mapping[str, Any]], AttributePath, ParserContextData],
            None,
        ]
    ] = None,
    nested_in_package_context: bool = False,
) -> None:
    """Attach one dispatchable object parser as a child of another.

    :param rule_type: Key of the parent parser among the dispatchable object parsers.
    :param rule_name: Name under which the child parser is registered in the parent.
    :param object_parser_key: Key of the child parser; defaults to `rule_name`.
    :param on_end_parse_step: Optional plugin callback. When given, it is wrapped
      with `wrap_plugin_code` before being passed to the parent parser.
    :param nested_in_package_context: Passed through to `register_child_parser`.
    :raises ValueError: If either key is not a known dispatchable object parser.
    """
    self._restricted_api()
    child_key = rule_name if object_parser_key is None else object_parser_key

    object_parsers = (
        self._feature_set.manifest_parser_generator.dispatchable_object_parsers
    )
    if rule_type not in object_parsers:
        supported = ", ".join(sorted(object_parsers))
        raise ValueError(
            f"The rule_type was not a supported type. It must be one of {supported}"
        )
    if child_key not in object_parsers:
        supported = ", ".join(sorted(object_parsers))
        raise ValueError(
            f"The object_parser_key was not a supported type. It must be one of {supported}"
        )
    parent = object_parsers[rule_type]
    child = object_parsers[child_key]

    end_step = on_end_parse_step
    if end_step is not None:
        end_step = wrap_plugin_code(self._plugin_name, end_step)

    parent.register_child_parser(
        rule_name,
        child,
        self._plugin_metadata,
        on_end_parse_step=end_step,
        nested_in_package_context=nested_in_package_context,
    )

    def _unload() -> None:
        # Unloading is not supported for this feature; fail loudly instead.
        raise PluginInitializationError(
            "Cannot unload pluggable_object_parser (not implemented)"
        )

    self._unloaders.append(_unload)
def pluggable_manifest_rule(
    self,
    rule_type: Union[TTP, str],
    rule_name: Union[str, Sequence[str]],
    parsed_format: Type[PF],
    handler: DIPHandler,
    *,
    source_format: Optional[SF] = None,
    inline_reference_documentation: Optional[ParserDocumentation] = None,
    expected_debputy_integration_mode: Optional[
        Container[DebputyIntegrationMode]
    ] = None,
    apply_standard_attribute_documentation: bool = False,
) -> None:
    """Generate a parser for `parsed_format` and register it under `rule_name`.

    A string `rule_type` selects one of the dispatchable object parsers; any other
    value selects one of the dispatchable table parsers.

    :param rule_type: Key of the dispatching parser that receives the rule.
    :param rule_name: Name (or names) the rule is registered under.
    :param parsed_format: Format handed to `generate_parser`.
    :param handler: Plugin callback. It is wrapped with `wrap_plugin_code` before
      being registered.
    :param source_format: Passed to `generate_parser` as `source_content`.
    :param inline_reference_documentation: Optional documentation. It is passed
      through `_pluggable_manifest_docs_for` before use.
    :param expected_debputy_integration_mode: Passed through to `generate_parser`.
    :param apply_standard_attribute_documentation: When true, `_STD_ATTR_DOCS` is
      supplied as the automatic documentation. This is reported as an error (via
      `_error`) on Python versions older than 3.12.
    :raises ValueError: If `rule_type` is not a known dispatching parser.
    """
    # When changing this, consider which types will be unrestricted
    self._restricted_api()
    if apply_standard_attribute_documentation and sys.version_info < (3, 12):
        _error(
            f"The plugin {self._plugin_metadata.plugin_name} requires python 3.12 due to"
            f" its use of apply_standard_attribute_documentation"
        )
    parser_generator = self._feature_set.manifest_parser_generator
    if isinstance(rule_type, str):
        candidates = parser_generator.dispatchable_object_parsers
        if rule_type not in candidates:
            supported = ", ".join(sorted(candidates))
            raise ValueError(
                f"The rule_type was not a supported type. It must be one of {supported}"
            )
    else:
        candidates = parser_generator.dispatchable_table_parsers
        if rule_type not in candidates:
            supported = ", ".join(sorted(t.__name__ for t in candidates))
            raise ValueError(
                f"The rule_type was not a supported type. It must be one of {supported}"
            )
    dispatching_parser = candidates[rule_type]

    resolved_docs = self._pluggable_manifest_docs_for(
        rule_type,
        rule_name,
        inline_reference_documentation=inline_reference_documentation,
    )

    automatic_docs = _STD_ATTR_DOCS if apply_standard_attribute_documentation else None

    generated_parser = parser_generator.generate_parser(
        parsed_format,
        source_content=source_format,
        inline_reference_documentation=resolved_docs,
        expected_debputy_integration_mode=expected_debputy_integration_mode,
        automatic_docs=automatic_docs,
    )
    dispatching_parser.register_parser(
        rule_name,
        generated_parser,
        wrap_plugin_code(self._plugin_name, handler),
        self._plugin_metadata,
    )

    def _unload() -> None:
        # Unloading is not supported for this feature; fail loudly instead.
        raise PluginInitializationError(
            "Cannot unload pluggable_manifest_rule (not implemented)"
        )

    self._unloaders.append(_unload)
def register_build_system(
    self,
    build_system_definition: type[BSPF],
) -> None:
    """Register a build system from its annotated manifest definition type.

    The dispatch metadata stored on the definition type supplies everything needed
    to register both the manifest rule and the auto-detection of the build system.

    :param build_system_definition: A TypedDict carrying dispatch metadata of type
      `BuildSystemManifestRuleMetadata`.
    :raises PluginInitializationError: If the definition is not a TypedDict.
    :raises PluginIncorrectRegistrationError: If the definition lacks the expected
      dispatch metadata.
    """
    self._restricted_api()
    if not is_typeddict(build_system_definition):
        raise PluginInitializationError(
            f"Expected build_system_definition to be a subclass of {BuildRuleParsedFormat.__name__},"
            f" but got {build_system_definition.__name__} instead"
        )
    metadata = getattr(
        build_system_definition,
        _DEBPUTY_DISPATCH_METADATA_ATTR_NAME,
        None,
    )
    if not isinstance(metadata, BuildSystemManifestRuleMetadata):
        raise PluginIncorrectRegistrationError(
            f"The {build_system_definition.__qualname__} type should have been annotated with"
            f" @{debputy_build_system.__name__}."
        )
    keywords = metadata.manifest_keywords
    assert len(keywords) == 1
    build_system_impl = metadata.build_system_impl
    assert build_system_impl is not None
    manifest_keyword = next(iter(keywords))

    self.pluggable_manifest_rule(
        metadata.dispatched_type,
        keywords,
        build_system_definition,
        # pluggable_manifest_rule does the wrapping
        metadata.unwrapped_constructor,
        source_format=metadata.source_format,
        inline_reference_documentation=metadata.online_reference_documentation,
        expected_debputy_integration_mode=only_integrations(INTEGRATION_MODE_FULL),
    )

    wrapped_constructor = wrap_plugin_code(self._plugin_name, build_system_impl)
    self._auto_detectable_build_system(
        manifest_keyword,
        build_system_impl,
        constructor=wrapped_constructor,
        shadowing_build_systems_when_active=metadata.auto_detection_shadow_build_systems,
    )
def _auto_detectable_build_system(
    self,
    manifest_keyword: str,
    rule_type: type[BSR],
    *,
    shadowing_build_systems_when_active: FrozenSet[str] = frozenset(),
    constructor: Optional[
        Callable[[BuildRuleParsedFormat, AttributePath, "HighLevelManifest"], BSR]
    ] = None,
) -> None:
    """Register auto-detection for the build system implemented by `rule_type`.

    :param manifest_keyword: The manifest keyword associated with the build system.
    :param rule_type: The build system class. Its `auto_detect_build_system`
      attribute is wrapped with `wrap_plugin_code` and used as the detector.
    :param shadowing_build_systems_when_active: Stored with the registration.
    :param constructor: Callable used to create the build system. When omitted,
      `rule_type` itself is called with the same three arguments.
    :raises PluginConflictError: If auto-detection for `rule_type` is already
      registered (by this plugin or by another one).
    """
    self._restricted_api()
    feature_set = self._feature_set
    existing = feature_set.auto_detectable_build_systems.get(rule_type)
    if existing is not None:
        # `rule_type` is itself a class, so its own `__name__` identifies the build
        # system; `rule_type.__class__.__name__` would only name its metaclass.
        bs_name = rule_type.__name__
        if existing.plugin_metadata.plugin_name == self._plugin_name:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' auto-detection of the build system "{bs_name}" twice.'
            )
        else:
            message = (
                f"The plugins {existing.plugin_metadata.plugin_name} and {self._plugin_name}"
                f' both tried to provide auto-detection of the build system "{bs_name}"'
            )
        raise PluginConflictError(
            message, existing.plugin_metadata, self._plugin_metadata
        )

    if constructor is None:

        def impl(
            attributes: BuildRuleParsedFormat,
            attribute_path: AttributePath,
            manifest: "HighLevelManifest",
        ) -> BSR:
            return rule_type(attributes, attribute_path, manifest)

    else:
        impl = constructor

    feature_set.auto_detectable_build_systems[rule_type] = (
        PluginProvidedBuildSystemAutoDetection(
            manifest_keyword,
            rule_type,
            wrap_plugin_code(self._plugin_name, rule_type.auto_detect_build_system),
            impl,
            shadowing_build_systems_when_active,
            self._plugin_metadata,
        )
    )

    def _unload() -> None:
        # Removing an already-absent registration is not an error.
        feature_set.auto_detectable_build_systems.pop(rule_type, None)

    self._unloaders.append(_unload)
def known_packaging_files(
    self,
    packaging_file_details: KnownPackagingFileInfo,
) -> None:
    """Register a known packaging file, keyed by its detection method and value.

    :param packaging_file_details: Description of the packaging file. With the
      detection method "path" (the default), "path" must be set to a normalized
      path and "pkgfile" must be absent. With "dh.pkgfile", "pkgfile" must be set
      to a name stem without "/" and "path" must be absent.
    :raises ValueError: If the attributes are missing, contradict the detection
      method, are malformed, or the detection method is not supported.
    :raises PluginConflictError: If the same key has already been registered.
    """
    known_packaging_files = self._feature_set.known_packaging_files
    detection_method = packaging_file_details.get(
        "detection_method", cast("Literal['path']", "path")
    )
    path = packaging_file_details.get("path")
    dhpkgfile = packaging_file_details.get("pkgfile")

    # The registration stores a copy rather than the caller's own mapping.
    packaging_file_details = packaging_file_details.copy()

    if detection_method == "path":
        if dhpkgfile is not None:
            raise ValueError(
                'The "pkgfile" attribute cannot be used when detection-method is "path" (or omitted)'
            )
        if path is None:
            raise ValueError(
                'The "path" attribute is required when detection-method is "path" (or omitted)'
            )
        normalized_path = _normalize_path(path, with_prefix=False)
        if path != normalized_path:
            raise ValueError(
                f"The path for known packaging files must be normalized. Please replace"
                f' "{path}" with "{normalized_path}"'
            )
        detection_value = path
    elif detection_method == "dh.pkgfile":
        if path is not None:
            raise ValueError(
                'The "path" attribute cannot be used when detection-method is "dh.pkgfile"'
            )
        if dhpkgfile is None:
            raise ValueError(
                'The "pkgfile" attribute is required when detection-method is "dh.pkgfile"'
            )
        if "/" in dhpkgfile:
            raise ValueError(
                'The "pkgfile" attribute must be a name stem such as "install" (no "/" are allowed)'
            )
        detection_value = dhpkgfile
    else:
        # An explicit error rather than an assert: asserts vanish under `python -O`.
        raise ValueError(
            f'Unsupported detection-method "{detection_method}".'
            ' It must be either "path" or "dh.pkgfile"'
        )
    key = f"{detection_method}::{detection_value}"
    existing = known_packaging_files.get(key)
    if existing is not None:
        if existing.plugin_metadata.plugin_name != self._plugin_name:
            message = (
                f'The key "{key}" is registered twice for known packaging files.'
                f" Once by {existing.plugin_metadata.plugin_name} and once by {self._plugin_name}"
            )
        else:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' key "{key}" twice for known packaging files.'
            )
        raise PluginConflictError(
            message, existing.plugin_metadata, self._plugin_metadata
        )
    _validate_known_packaging_file_dh_compat_rules(
        packaging_file_details.get("dh_compat_rules")
    )
    known_packaging_files[key] = PluginProvidedKnownPackagingFile(
        packaging_file_details,
        detection_method,
        detection_value,
        self._plugin_metadata,
    )

    def _unload() -> None:
        del known_packaging_files[key]

    self._unloaders.append(_unload)
def register_mapped_type(
    self,
    type_mapping: TypeMapping,
    *,
    reference_documentation: Optional[TypeMappingDocumentation] = None,
) -> None:
    """Register a type mapping, keyed by its target type.

    The mapping is recorded in the feature set's mapped types and also registered
    with the manifest parser generator.

    :param type_mapping: The mapping to register.
    :param reference_documentation: Optional documentation stored with the mapping.
    :raises PluginConflictError: If a mapping for the same target type has already
      been registered (by this plugin or by another one).
    """
    self._restricted_api()
    target_type = type_mapping.target_type
    mapped_types = self._feature_set.mapped_types
    existing = mapped_types.get(target_type)
    if existing is not None:
        if existing.plugin_metadata.plugin_name != self._plugin_name:
            message = (
                f'The key "{target_type.__name__}" is registered twice for mapped types.'
                f" Once by {existing.plugin_metadata.plugin_name} and once by {self._plugin_name}"
            )
        else:
            message = (
                f"Bug in the plugin {self._plugin_name}: It tried to register the"
                f' key "{target_type.__name__}" twice for mapped types.'
            )
        raise PluginConflictError(
            message, existing.plugin_metadata, self._plugin_metadata
        )
    parser_generator = self._feature_set.manifest_parser_generator
    # TODO: Wrap the mapper in the plugin context
    mapped_types[target_type] = PluginProvidedTypeMapping(
        type_mapping, reference_documentation, self._plugin_metadata
    )
    parser_generator.register_mapped_type(type_mapping)
1220 def _restricted_api(
1221 self,
1222 *,
1223 allowed_plugins: Union[Set[str], FrozenSet[str]] = frozenset(),
1224 ) -> None:
1225 if self._plugin_name != "debputy" and self._plugin_name not in allowed_plugins: 1225 ↛ 1226line 1225 didn't jump to line 1226 because the condition on line 1225 was never true
1226 raise PluginAPIViolationError(
1227 f"Plugin {self._plugin_name} attempted to access a debputy-only API."
1228 " If you are the maintainer of this plugin and want access to this"
1229 " API, please file a feature request to make this public."
1230 " (The API is currently private as it is unstable.)"
1231 )
class MaintscriptAccessorProviderBase(MaintscriptAccessor, ABC):
    """Shared implementation of the maintscript snippet helpers.

    Each public helper wraps the snippet in a shell condition where relevant and
    forwards the result to `_append_script`, which subclasses must implement.
    """

    __slots__ = ()

    def _append_script(
        self,
        caller_name: str,
        maintscript: Maintscript,
        full_script: str,
        /,
        perform_substitution: bool = True,
    ) -> None:
        """Record `full_script` for the given maintscript; implemented by subclasses.

        :param caller_name: Name of the public helper that produced the script.
        :param maintscript: The maintscript the script belongs to.
        :param full_script: The complete shell text to record.
        :param perform_substitution: Whether substitution should be applied.
        """
        raise NotImplementedError

    @classmethod
    def _apply_condition_to_script(
        cls,
        condition: str,
        run_snippet: str,
        /,
        indent: Optional[bool] = None,
    ) -> str:
        """Wrap `run_snippet` in an `if <condition>; then ... fi` shell block.

        :param condition: Shell condition placed after `if`.
        :param run_snippet: Shell text to run when the condition holds. A trailing
          newline is added when missing.
        :param indent: Whether to indent the snippet lines. When `None`, the
          snippet is indented unless it contains "<<".
        :return: The wrapped script, ending with a newline.
        """
        if indent is None:
            # We auto-determine this based on heredocs currently
            indent = "<<" not in run_snippet

        if indent:
            # NOTE(review): the indent prefix is reproduced as it appears in the
            # reviewed text; confirm its width against the original file.
            run_snippet = "".join(" " + x for x in run_snippet.splitlines(True))
        if not run_snippet.endswith("\n"):
            run_snippet += "\n"
        condition_line = f"if {condition}; then\n"
        end_line = "fi\n"
        return "".join((condition_line, run_snippet, end_line))

    def on_configure(
        self,
        run_snippet: str,
        /,
        indent: Optional[bool] = None,
        perform_substitution: bool = True,
        skip_on_rollback: bool = False,
    ) -> None:
        """Add a `postinst` snippet guarded by the configure condition.

        The condition is `POSTINST_DEFAULT_CONDITION`, or only
        `[ "$1" = "configure" ]` when `skip_on_rollback` is true.
        """
        condition = POSTINST_DEFAULT_CONDITION
        if skip_on_rollback:
            condition = '[ "$1" = "configure" ]'
        return self._append_script(
            "on_configure",
            "postinst",
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def on_initial_install(
        self,
        run_snippet: str,
        /,
        indent: Optional[bool] = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a `postinst` snippet run on "configure" when "$2" is empty."""
        condition = '[ "$1" = "configure" -a -z "$2" ]'
        return self._append_script(
            "on_initial_install",
            "postinst",
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def on_upgrade(
        self,
        run_snippet: str,
        /,
        indent: Optional[bool] = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a `postinst` snippet run on "configure" when "$2" is non-empty."""
        condition = '[ "$1" = "configure" -a -n "$2" ]'
        return self._append_script(
            "on_upgrade",
            "postinst",
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def on_upgrade_from(
        self,
        version: str,
        run_snippet: str,
        /,
        indent: Optional[bool] = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a `postinst` snippet run when configuring over `version` or earlier.

        The generated condition compares "$2" against `version` using the dpkg
        relation `le-nl`.

        :param version: The version to compare "$2" against. It is shell-quoted
          before being embedded in the condition.
        """
        import shlex

        # dpkg --compare-versions expects "<ver1> <op> <ver2>"; both operands are
        # required for the comparison to be meaningful.
        condition = (
            '[ "$1" = "configure" ] && dpkg --compare-versions "$2" le-nl '
            + shlex.quote(version)
        )
        return self._append_script(
            "on_upgrade_from",
            "postinst",
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def on_before_removal(
        self,
        run_snippet: str,
        /,
        indent: Optional[bool] = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a `prerm` snippet run when "$1" is "remove"."""
        condition = '[ "$1" = "remove" ]'
        return self._append_script(
            "on_before_removal",
            "prerm",
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def on_removed(
        self,
        run_snippet: str,
        /,
        indent: Optional[bool] = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a `postrm` snippet run when "$1" is "remove"."""
        condition = '[ "$1" = "remove" ]'
        return self._append_script(
            "on_removed",
            "postrm",
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def on_purge(
        self,
        run_snippet: str,
        /,
        indent: Optional[bool] = None,
        perform_substitution: bool = True,
    ) -> None:
        """Add a `postrm` snippet run when "$1" is "purge"."""
        condition = '[ "$1" = "purge" ]'
        return self._append_script(
            "on_purge",
            "postrm",
            self._apply_condition_to_script(condition, run_snippet, indent=indent),
            perform_substitution=perform_substitution,
        )

    def unconditionally_in_script(
        self,
        maintscript: Maintscript,
        run_snippet: str,
        /,
        perform_substitution: bool = True,
    ) -> None:
        """Add `run_snippet` to `maintscript` without wrapping it in a condition.

        :raises ValueError: If `maintscript` is not one of `STD_CONTROL_SCRIPTS`.
        """
        if maintscript not in STD_CONTROL_SCRIPTS:
            raise ValueError(
                f'Unknown script "{maintscript}". Should have been one of:'
                f' {", ".join(sorted(STD_CONTROL_SCRIPTS))}'
            )
        return self._append_script(
            "unconditionally_in_script",
            maintscript,
            run_snippet,
            perform_substitution=perform_substitution,
        )
class MaintscriptAccessorProvider(MaintscriptAccessorProviderBase):
    """Maintscript accessor that stores snippets in per-script snippet containers."""

    __slots__ = (
        "_plugin_metadata",
        "_maintscript_snippets",
        "_plugin_source_id",
        "_package_substitution",
        "_default_snippet_order",
    )

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        maintscript_snippets: Dict[str, MaintscriptSnippetContainer],
        package_substitution: Substitution,
        *,
        default_snippet_order: Optional[Literal["service"]] = None,
    ):
        """Store the collaborators used when snippets are appended.

        :param plugin_metadata: Metadata of the plugin providing the snippets.
        :param plugin_source_id: Identifier included in each snippet's source label.
        :param maintscript_snippets: Snippet containers keyed by maintscript name.
        :param package_substitution: Substitution applied to snippets on request.
        :param default_snippet_order: Snippet order given to every created snippet.
        """
        self._default_snippet_order = default_snippet_order
        self._package_substitution = package_substitution
        self._maintscript_snippets = maintscript_snippets
        self._plugin_source_id = plugin_source_id
        self._plugin_metadata = plugin_metadata

    def _append_script(
        self,
        caller_name: str,
        maintscript: Maintscript,
        full_script: str,
        /,
        perform_substitution: bool = True,
    ) -> None:
        """Append `full_script` as a snippet to the container for `maintscript`.

        The snippet's definition source is "<plugin name> (<plugin source id>)".
        When `perform_substitution` is true, the script is run through the package
        substitution first.
        """
        origin = f"{self._plugin_metadata.plugin_name} ({self._plugin_source_id})"
        script_text = full_script
        if perform_substitution:
            script_text = self._package_substitution.substitute(script_text, origin)

        new_snippet = MaintscriptSnippet(
            snippet=script_text,
            definition_source=origin,
            snippet_order=self._default_snippet_order,
        )
        self._maintscript_snippets[maintscript].append(new_snippet)
class BinaryCtrlAccessorProviderBase(BinaryCtrlAccessor):
    """Base implementation of the binary control accessor.

    Subclasses supply the maintscript accessor via `_create_maintscript_accessor`.
    """

    __slots__ = (
        "_plugin_metadata",
        "_plugin_source_id",
        "_package_metadata_context",
        "_triggers",
        "_substvars",
        "_maintscript",
        "_shlibs_details",
    )

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        package_metadata_context: PackageProcessingContext,
        triggers: Dict[Tuple[DpkgTriggerType, str], PluginProvidedTrigger],
        substvars: FlushableSubstvars,
        shlibs_details: Tuple[Optional[str], Optional[List[str]]],
    ) -> None:
        """Store the collaborators; the maintscript accessor starts out unset."""
        self._plugin_metadata = plugin_metadata
        self._plugin_source_id = plugin_source_id
        self._package_metadata_context = package_metadata_context
        self._triggers = triggers
        self._substvars = substvars
        self._shlibs_details = shlibs_details
        self._maintscript: Optional[MaintscriptAccessor] = None

    def _create_maintscript_accessor(self) -> MaintscriptAccessor:
        """Create the maintscript accessor; implemented by subclasses."""
        raise NotImplementedError

    def dpkg_trigger(self, trigger_type: DpkgTriggerType, trigger_target: str) -> None:
        """Register a declarative dpkg level trigger

        The provided trigger will be added to the package's metadata (the triggers file of the control.tar).

        If the trigger has already been added previously, a second call with the same trigger data will be ignored.
        """
        trigger_key = (trigger_type, trigger_target)
        if trigger_key not in self._triggers:
            self._triggers[trigger_key] = PluginProvidedTrigger(
                dpkg_trigger_type=trigger_type,
                dpkg_trigger_target=trigger_target,
                provider=self._plugin_metadata,
                provider_source_id=self._plugin_source_id,
            )

    @property
    def maintscript(self) -> MaintscriptAccessor:
        """The maintscript accessor, created on first use if not already set."""
        if self._maintscript is None:
            self._maintscript = self._create_maintscript_accessor()
        return self._maintscript

    @property
    def substvars(self) -> FlushableSubstvars:
        """The substvars object this accessor was created with."""
        return self._substvars

    def dpkg_shlibdeps(self, paths: Sequence[VirtualPath]) -> None:
        """Run `dpkg-shlibdeps` on `paths`, writing into the flushed substvars file.

        Extra options are added for udebs, essential packages and the stored shlibs
        details. A failing `dpkg-shlibdeps` run is reported via `_error`.
        """
        package = self._package_metadata_context.binary_package
        with self.substvars.flush() as substvars_file:
            command = ["dpkg-shlibdeps", f"-T{substvars_file}"]
            if package.is_udeb:
                command.append("-tudeb")
            if package.is_essential:
                command.append("-dPre-Depends")
            local_shlibs, shlib_dirs = self._shlibs_details
            if local_shlibs is not None:
                command.append(f"-L{local_shlibs}")
            if shlib_dirs:
                command += [f"-l{shlib_dir}" for shlib_dir in shlib_dirs]
            command += [path.fs_path for path in paths]
            print_command(*command)
            try:
                subprocess.check_call(command)
            except subprocess.CalledProcessError:
                _error(
                    f"Attempting to auto-detect dependencies via dpkg-shlibdeps for {package.name} failed. Please"
                    " review the output from dpkg-shlibdeps above to understand what went wrong."
                )
class BinaryCtrlAccessorProvider(BinaryCtrlAccessorProviderBase):
    """Binary control accessor backed by `MaintscriptAccessorProvider`."""

    # "_maintscript" is already declared as a slot by the base class and must not
    # be declared again here.
    __slots__ = (
        "_maintscript_snippets",
        "_package_substitution",
        "_default_snippet_order",
    )

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        package_metadata_context: PackageProcessingContext,
        triggers: Dict[Tuple[DpkgTriggerType, str], PluginProvidedTrigger],
        substvars: FlushableSubstvars,
        maintscript_snippets: Dict[str, MaintscriptSnippetContainer],
        package_substitution: Substitution,
        shlibs_details: Tuple[Optional[str], Optional[List[str]]],
        *,
        default_snippet_order: Optional[Literal["service"]] = None,
    ) -> None:
        """Initialize the accessor and eagerly create its maintscript accessor.

        :param maintscript_snippets: Snippet containers keyed by maintscript name.
        :param package_substitution: Substitution handed to the maintscript accessor.
        :param default_snippet_order: Snippet order handed to the maintscript
          accessor.

        The remaining parameters are passed on to the base class unchanged.
        """
        super().__init__(
            plugin_metadata,
            plugin_source_id,
            package_metadata_context,
            triggers,
            substvars,
            shlibs_details,
        )
        self._maintscript_snippets = maintscript_snippets
        self._package_substitution = package_substitution
        self._default_snippet_order = default_snippet_order
        self._maintscript = MaintscriptAccessorProvider(
            plugin_metadata,
            plugin_source_id,
            maintscript_snippets,
            package_substitution,
            default_snippet_order=default_snippet_order,
        )

    def _create_maintscript_accessor(self) -> MaintscriptAccessor:
        """Create a maintscript accessor equivalent to the one built in `__init__`."""
        return MaintscriptAccessorProvider(
            self._plugin_metadata,
            self._plugin_source_id,
            self._maintscript_snippets,
            self._package_substitution,
            default_snippet_order=self._default_snippet_order,
        )
class BinaryCtrlAccessorProviderCreator:
    """Factory for per-plugin `BinaryCtrlAccessorProvider` instances.

    All accessors created by one instance share the same triggers mapping, which
    can be inspected via `generated_triggers`.
    """

    def __init__(
        self,
        package_metadata_context: PackageProcessingContext,
        substvars: FlushableSubstvars,
        maintscript_snippets: Dict[str, MaintscriptSnippetContainer],
        substitution: Substitution,
    ) -> None:
        """Store the shared state handed to every accessor created later."""
        self._package_metadata_context = package_metadata_context
        self._substvars = substvars
        self._maintscript_snippets = maintscript_snippets
        self._substitution = substitution
        # Starts out empty; the same mapping is passed to every created accessor.
        self._triggers: Dict[Tuple[DpkgTriggerType, str], PluginProvidedTrigger] = {}
        self.shlibs_details: Tuple[Optional[str], Optional[List[str]]] = (None, None)

    def for_plugin(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        *,
        default_snippet_order: Optional[Literal["service"]] = None,
    ) -> BinaryCtrlAccessor:
        """Create an accessor for the given plugin using the shared state.

        :param plugin_metadata: Metadata of the plugin the accessor is for.
        :param plugin_source_id: Source identifier passed to the accessor.
        :param default_snippet_order: Passed through to the accessor.
        """
        accessor = BinaryCtrlAccessorProvider(
            plugin_metadata,
            plugin_source_id,
            self._package_metadata_context,
            self._triggers,
            self._substvars,
            self._maintscript_snippets,
            self._substitution,
            self.shlibs_details,
            default_snippet_order=default_snippet_order,
        )
        return accessor

    def generated_triggers(self) -> Iterable[PluginProvidedTrigger]:
        """Return the values of the shared triggers mapping."""
        triggers = self._triggers
        return triggers.values()
1609def _resolve_bundled_plugin_docs_path(
1610 plugin_name: str,
1611 loader: Optional[PluginInitializationEntryPoint],
1612) -> Optional[Union[Traversable, Path]]:
1613 plugin_module = getattr(loader, "__module__")
1614 assert plugin_module is not None
1615 plugin_package_name = sys.modules[plugin_module].__package__
1616 return importlib.resources.files(plugin_package_name).joinpath(
1617 f"{plugin_name}_docs.yaml"
1618 )
def plugin_metadata_for_debputys_own_plugin(
    loader: Optional[PluginInitializationEntryPoint] = None,
) -> DebputyPluginMetadata:
    """Build the plugin metadata for the bundled "debputy" plugin.

    :param loader: Plugin initializer to use. Defaults to
      `initialize_debputy_features`, which is imported on demand.
    :return: Metadata with the plugin path "<bundled>", no plugin loader, and a
      documentation resolver based on the module of the initializer.
    """
    if loader is None:
        from debputy.plugin.debputy.debputy_plugin import initialize_debputy_features

        loader = initialize_debputy_features
    plugin_name = "debputy"

    def _docs_path_resolver() -> Optional[Union[Traversable, Path]]:
        return _resolve_bundled_plugin_docs_path(plugin_name, loader)

    return DebputyPluginMetadata(
        plugin_name="debputy",
        api_compat_version=1,
        plugin_initializer=loader,
        plugin_loader=None,
        plugin_doc_path_resolver=_docs_path_resolver,
        plugin_path="<bundled>",
    )
def load_plugin_features(
    plugin_search_dirs: Sequence[str],
    substitution: Substitution,
    requested_plugins_only: Optional[Sequence[str]] = None,
    required_plugins: Optional[Set[str]] = None,
    plugin_feature_set: Optional[PluginProvidedFeatureSet] = None,
    debug_mode: bool = False,
) -> PluginProvidedFeatureSet:
    """Load debputy's own plugin plus the required/requested/discovered plugins.

    :param plugin_search_dirs: Directories searched for plugin JSON files.
    :param substitution: Passed to every plugin initializer.
    :param requested_plugins_only: When given, exactly these plugins are loaded (in
      addition to the required ones) and no automatic discovery takes place.
    :param required_plugins: Plugins that must load successfully.
    :param plugin_feature_set: Feature set to populate; a new one is created when
      omitted.
    :param debug_mode: When true, a failing auto-discovered plugin raises instead
      of being deactivated with a warning.
    :return: The populated feature set.
    """
    feature_set = (
        PluginProvidedFeatureSet() if plugin_feature_set is None else plugin_feature_set
    )
    to_load = [plugin_metadata_for_debputys_own_plugin()]
    # Names of auto-discovered plugins; only those may be unloaded after a failure.
    optional_plugin_names = set()
    if required_plugins:
        to_load.extend(find_json_plugins(plugin_search_dirs, required_plugins))
    if requested_plugins_only is None:
        discovered = _find_all_json_plugins(
            plugin_search_dirs,
            frozenset() if required_plugins is None else required_plugins,
            debug_mode=debug_mode,
        )
        for candidate in discovered:
            to_load.append(candidate)
            optional_plugin_names.add(candidate.plugin_name)
    else:
        to_load.extend(find_json_plugins(plugin_search_dirs, requested_plugins_only))

    for plugin_metadata in to_load:
        api = DebputyPluginInitializerProvider(
            plugin_metadata, feature_set, substitution
        )
        try:
            api.load_plugin()
        except PluginBaseError as load_error:
            if plugin_metadata.plugin_name not in optional_plugin_names:
                raise
            if debug_mode:
                _warn(
                    f"The optional plugin {plugin_metadata.plugin_name} failed during load. Re-raising due"
                    f" to --debug/-d or DEBPUTY_DEBUG=1"
                )
                raise
            try:
                api.unload_plugin()
            except Exception:
                _warn(
                    f"Failed to load optional {plugin_metadata.plugin_name} and an error was raised when trying to"
                    " clean up after the half-initialized plugin. Re-raising load error as the partially loaded"
                    " module might have tainted the feature set."
                )
                raise load_error from None
            else:
                _warn(
                    f"The optional plugin {plugin_metadata.plugin_name} failed during load. The plugin was"
                    f" deactivated. Use debug mode (--debug/DEBPUTY_DEBUG=1) to show the stacktrace"
                    f" (the warning will become an error)"
                )

    return feature_set
def find_json_plugin(
    search_dirs: Sequence[str],
    requested_plugin: str,
) -> DebputyPluginMetadata:
    """Look up a single plugin via `find_json_plugins`.

    :param search_dirs: Directories searched for the plugin.
    :param requested_plugin: Plugin name or path, as accepted by `find_json_plugins`.
    :return: The metadata of the one plugin found (exactly one result is asserted).
    """
    matches = list(find_json_plugins(search_dirs, (requested_plugin,)))
    assert len(matches) == 1
    return matches[0]
def find_related_implementation_files_for_plugin(
    plugin_metadata: DebputyPluginMetadata,
) -> List[str]:
    """Find the implementation file that sits next to a plugin's JSON descriptor.

    :param plugin_metadata: Metadata of the plugin to inspect.
    :return: A list holding the guessed module file when it exists on disk,
      otherwise an empty list.

    Bundled plugins are reported as an error via `_error`. When the guessed file is
    missing, the plugin is loaded (if needed) and an error is reported if the
    guessed module name is then found in `sys.modules`.
    """
    if plugin_metadata.is_bundled:
        plugin_name = plugin_metadata.plugin_name
        _error(
            f"Cannot run find related files for {plugin_name}: The plugin seems to be bundled"
            " or loaded via a mechanism that does not support detecting its tests."
        )
    files = []
    module_name, module_file = _find_plugin_implementation_file(
        plugin_metadata.plugin_name,
        plugin_metadata.plugin_path,
    )
    if os.path.isfile(module_file):
        files.append(module_file)
    else:
        if not plugin_metadata.is_loaded:
            plugin_metadata.load_plugin()
        if module_name in sys.modules:
            _error(
                f'The plugin {plugin_metadata.plugin_name} uses the "module" key in its'
                f" JSON metadata file ({plugin_metadata.plugin_path}) and cannot be"
                f" installed via this method. The related Python would not be installed"
                f" (which would result in a plugin that would fail to load)"
            )

    return files
def find_tests_for_plugin(
    plugin_metadata: DebputyPluginMetadata,
) -> List[str]:
    """List the test files located next to a plugin's JSON descriptor.

    A directory entry counts as a test when it is a file, its name starts with the
    plugin name (with "-" replaced by "_") and it matches `PLUGIN_TEST_SUFFIX`.

    :param plugin_metadata: Metadata of the plugin to find tests for.
    :return: Paths of the matching files, in directory scan order.
    """
    plugin_name = plugin_metadata.plugin_name
    plugin_path = plugin_metadata.plugin_path

    if plugin_metadata.is_bundled:
        _error(
            f"Cannot run tests for {plugin_name}: The plugin seems to be bundled or loaded via a"
            " mechanism that does not support detecting its tests."
        )

    name_prefix = plugin_name.replace("-", "_")
    with os.scandir(os.path.dirname(plugin_path)) as entries:
        return [
            entry.path
            for entry in entries
            if entry.is_file()
            and entry.name.startswith(name_prefix)
            and PLUGIN_TEST_SUFFIX.search(entry.name)
        ]
def find_json_plugins(
    search_dirs: Sequence[str],
    requested_plugins: Iterable[str],
) -> Iterable[DebputyPluginMetadata]:
    """Yield the parsed metadata for each requested plugin.

    A request containing "/" is treated as a path to the JSON descriptor and the
    search directories are not consulted for it. Any other request is looked up as
    "debputy/plugins/<name>.json" below each search directory; every match is
    yielded.

    :param search_dirs: Directories searched for name-based requests.
    :param requested_plugins: Plugin names or descriptor paths.
    :raises PluginNotFoundError: If a path request is not a file, or a name request
      is not found in any search directory.
    """
    for plugin_name_or_path in requested_plugins:
        if "/" in plugin_name_or_path:
            if not os.path.isfile(plugin_name_or_path):
                raise PluginNotFoundError(
                    f"Unable to load the plugin {plugin_name_or_path}: The path is not a file."
                    ' (Because the plugin name contains "/", it is assumed to be a path and search path'
                    " is not used.)"
                )
            yield parse_json_plugin_desc(plugin_name_or_path)
            # Carry on with the remaining requests; ending the generator here would
            # silently drop every plugin listed after a path-based one.
            continue
        found = False
        for search_dir in search_dirs:
            path = os.path.join(
                search_dir, "debputy", "plugins", f"{plugin_name_or_path}.json"
            )
            if not os.path.isfile(path):
                continue
            found = True
            yield parse_json_plugin_desc(path)
        if not found:
            search_dir_str = ":".join(search_dirs)
            raise PluginNotFoundError(
                f"Unable to load the plugin {plugin_name_or_path}: Could not find {plugin_name_or_path}.json in the"
                f" debputy/plugins subdir of any of the search dirs ({search_dir_str})"
            )
1808def _find_all_json_plugins(
1809 search_dirs: Sequence[str],
1810 required_plugins: AbstractSet[str],
1811 debug_mode: bool = False,
1812) -> Iterable[DebputyPluginMetadata]:
1813 seen = set(required_plugins)
1814 error_seen = False
1815 for search_dir in search_dirs:
1816 try:
1817 dir_fd = os.scandir(os.path.join(search_dir, "debputy", "plugins"))
1818 except FileNotFoundError:
1819 continue
1820 with dir_fd:
1821 for entry in dir_fd:
1822 if (
1823 not entry.is_file(follow_symlinks=True)
1824 or not entry.name.endswith(".json")
1825 or entry.name in seen
1826 ):
1827 continue
1828 try:
1829 plugin_metadata = parse_json_plugin_desc(entry.path)
1830 except PluginBaseError as e:
1831 if debug_mode:
1832 raise
1833 if not error_seen:
1834 error_seen = True
1835 _warn(
1836 f"Failed to load the plugin in {entry.path} due to the following error: {e.message}"
1837 )
1838 else:
1839 _warn(
1840 f"Failed to load plugin in {entry.path} due to errors (not shown)."
1841 )
1842 else:
1843 yield plugin_metadata
1846def _find_plugin_implementation_file(
1847 plugin_name: str,
1848 json_file_path: str,
1849) -> Tuple[str, str]:
1850 guessed_module_basename = plugin_name.replace("-", "_")
1851 module_name = f"debputy.plugin.{guessed_module_basename}"
1852 module_fs_path = os.path.join(
1853 os.path.dirname(json_file_path), f"{guessed_module_basename}.py"
1854 )
1855 return module_name, module_fs_path
def _resolve_module_initializer(
    plugin_name: str,
    plugin_initializer_name: str,
    module_name: Optional[str],
    json_file_path: str,
) -> PluginInitializationEntryPoint:
    """Load the plugin's Python module and look up its initializer.

    When `module_name` is None, the module name and source file are guessed
    from the plugin name. If the guessed file exists next to the JSON
    descriptor, it is loaded directly from that file. In all other cases the
    module is imported by name via `importlib.import_module`.

    :param plugin_name: Name of the plugin being loaded.
    :param plugin_initializer_name: Name of the attribute in the module that
      serves as the plugin initializer.
    :param module_name: Explicit module name from the plugin metadata, or None
      to guess it from the plugin name.
    :param json_file_path: Path of the plugin's JSON descriptor (used for
      locating the module file and in error messages).
    :return: The attribute of the module named `plugin_initializer_name`.
    :raises PluginInitializationError: If the module file could not be loaded
      or the guessed module could not be found anywhere.
    :raises PluginMetadataError: If an explicitly named module is not
      importable or the initializer attribute resolved to None.
    """
    module = None
    module_fs_path = None
    if module_name is None:
        module_name, module_fs_path = _find_plugin_implementation_file(
            plugin_name, json_file_path
        )
        if os.path.isfile(module_fs_path):
            spec = importlib.util.spec_from_file_location(module_name, module_fs_path)
            if spec is None:
                raise PluginInitializationError(
                    f"Failed to load {plugin_name} (path: {module_fs_path})."
                    " The spec_from_file_location function returned None."
                )
            mod = importlib.util.module_from_spec(spec)
            loader = spec.loader
            if loader is None:
                raise PluginInitializationError(
                    f"Failed to load {plugin_name} (path: {module_fs_path})."
                    " Python could not find a suitable loader (spec.loader was None)"
                )
            # The module must be registered before it is executed (same order
            # as the importlib "importing a source file directly" recipe).
            sys.modules[module_name] = mod
            try:
                run_in_context_of_plugin(plugin_name, loader.exec_module, mod)
            except (Exception, GeneratorExit) as e:
                # Do not leave a partially initialized module behind; a later
                # import of the same name would otherwise pick it up.
                if sys.modules.get(module_name) is mod:
                    del sys.modules[module_name]
                raise PluginInitializationError(
                    f"Failed to load {plugin_name} (path: {module_fs_path})."
                    " The module threw an exception while being loaded."
                ) from e
            module = mod

    if module is None:
        try:
            module = run_in_context_of_plugin(
                plugin_name, importlib.import_module, module_name
            )
        except ModuleNotFoundError as e:
            if module_fs_path is None:
                # The module name was given explicitly in the metadata.
                raise PluginMetadataError(
                    f'The plugin defined in "{json_file_path}" wanted to load the module "{module_name}", but'
                    " this module is not available in the python search path"
                ) from e
            raise PluginInitializationError(
                f"Failed to load {plugin_name}. Tried loading it from"
                f' "{module_fs_path}" (which did not exist) and PYTHONPATH as'
                f" {module_name} (where it was not found either). Please ensure"
                " the module code is installed in the correct spot or provide an"
                f' explicit "module" definition in {json_file_path}.'
            ) from e

    plugin_initializer = run_in_context_of_plugin_wrap_errors(
        plugin_name,
        getattr,
        module,
        plugin_initializer_name,
    )

    if plugin_initializer is None:
        # Report the requested attribute name; the resolved value is None here
        # and would render as "None" in the message.
        raise PluginMetadataError(
            f'The plugin defined in {json_file_path} claimed that module "{module_name}" would have an'
            f" attribute called {plugin_initializer_name}. However, it does not. Please correct the plugin"
            f" metadata or initializer name in the Python module."
        )
    return cast("PluginInitializationEntryPoint", plugin_initializer)
def _json_plugin_loader(
    plugin_name: str,
    plugin_json_metadata: PluginJsonMetadata,
    json_file_path: str,
    attribute_path: AttributePath,
) -> Callable[["DebputyPluginInitializer"], None]:
    """Build the initializer callable for a plugin described by JSON metadata.

    The returned callable combines up to four sources of features, in this
    order: the Python initializer (when "plugin_initializer" is set), the
    "known_packaging_files", the "manifest_variables" and the
    "packager_provided_files" declared in the JSON metadata.

    :param plugin_name: Name of the plugin.
    :param plugin_json_metadata: The parsed content of the JSON descriptor.
    :param json_file_path: Path of the JSON descriptor (used for locating the
      Python module and in error messages).
    :param attribute_path: Attribute path of the root of the JSON document;
      used to point at the offending entry in error messages.
    :return: A callable that registers all the plugin's features with the
      provided plugin initializer API.
    :raises PluginMetadataError: If the API compat level is not 1, the
      initializer name contains ".", or the plugin provides no features.
    """
    api_compat = plugin_json_metadata["api_compat_version"]
    module_name = plugin_json_metadata.get("module")
    plugin_initializer_name = plugin_json_metadata.get("plugin_initializer")
    packager_provided_files_raw = plugin_json_metadata.get(
        "packager_provided_files", []
    )
    manifest_variables_raw = plugin_json_metadata.get("manifest_variables")
    known_packaging_files_raw = plugin_json_metadata.get("known_packaging_files")
    if api_compat != 1:
        raise PluginMetadataError(
            f'The plugin defined in "{json_file_path}" requires API compat level {api_compat}, but this'
            f" version of debputy only supports API compat version of 1"
        )
    if plugin_initializer_name is not None and "." in plugin_initializer_name:
        p = attribute_path["plugin_initializer"]
        raise PluginMetadataError(
            f'The "{p}" must not contain ".". Problematic file is "{json_file_path}".'
        )

    plugin_initializers = []

    if plugin_initializer_name is not None:
        plugin_initializer = _resolve_module_initializer(
            plugin_name,
            plugin_initializer_name,
            module_name,
            json_file_path,
        )
        plugin_initializers.append(plugin_initializer)

    if known_packaging_files_raw:
        kpf_root_path = attribute_path["known_packaging_files"]
        known_packaging_files = []
        for k, v in enumerate(known_packaging_files_raw):
            kpf_path = kpf_root_path[k]
            # Only peek into the entry when it is a mapping. Anything else is
            # handed to the parser unmodified so that it can report the problem
            # rather than this code failing with an AttributeError.
            if isinstance(v, dict):
                path_hint = v.get("path")
                if isinstance(path_hint, str):
                    kpf_path.path_hint = path_hint
                if plugin_name.startswith("debputy-"):
                    docs = v.get("documentation-uris")
                    if isinstance(docs, list):
                        # Expand the doc root placeholder in string entries;
                        # other entries are passed on untouched.
                        v["documentation-uris"] = [
                            (
                                d.replace("@DEBPUTY_DOC_ROOT_DIR@", DEBPUTY_DOC_ROOT_DIR)
                                if isinstance(d, str)
                                else d
                            )
                            for d in docs
                        ]
            known_packaging_file: KnownPackagingFileInfo = (
                PLUGIN_KNOWN_PACKAGING_FILES_PARSER.parse_input(
                    v,
                    kpf_path,
                )
            )
            known_packaging_files.append((kpf_path, known_packaging_file))

        def _initialize_json_provided_known_packaging_files(
            api: DebputyPluginInitializerProvider,
        ) -> None:
            for entry_path, details in known_packaging_files:
                try:
                    api.known_packaging_files(details)
                except ValueError as ex:
                    raise PluginMetadataError(
                        f"Error while processing {entry_path.path} defined in {json_file_path}: {ex.args[0]}"
                    ) from ex

        plugin_initializers.append(_initialize_json_provided_known_packaging_files)

    if manifest_variables_raw:
        manifest_var_path = attribute_path["manifest_variables"]
        manifest_variables = [
            PLUGIN_MANIFEST_VARS_PARSER.parse_input(p, manifest_var_path[i])
            for i, p in enumerate(manifest_variables_raw)
        ]

        def _initialize_json_provided_manifest_vars(
            api: DebputyPluginInitializer,
        ) -> None:
            for idx, manifest_variable in enumerate(manifest_variables):
                name = manifest_variable["name"]
                value = manifest_variable["value"]
                doc = manifest_variable.get("reference_documentation")
                try:
                    api.manifest_variable(
                        name, value, variable_reference_documentation=doc
                    )
                except ValueError as ex:
                    var_path = manifest_var_path[idx]
                    raise PluginMetadataError(
                        f"Error while processing {var_path.path} defined in {json_file_path}: {ex.args[0]}"
                    ) from ex

        plugin_initializers.append(_initialize_json_provided_manifest_vars)

    if packager_provided_files_raw:
        ppf_path = attribute_path["packager_provided_files"]
        ppfs = [
            PLUGIN_PPF_PARSER.parse_input(p, ppf_path[i])
            for i, p in enumerate(packager_provided_files_raw)
        ]

        def _initialize_json_provided_ppfs(api: DebputyPluginInitializer) -> None:
            ppf: PackagerProvidedFileJsonDescription
            for idx, ppf in enumerate(ppfs):
                # Remaining keys are forwarded as keyword arguments.
                c = dict(ppf)
                stem = ppf["stem"]
                installed_path = ppf["installed_path"]
                default_mode = ppf.get("default_mode")
                ref_doc_dict = ppf.get("reference_documentation")
                if default_mode is not None:
                    c["default_mode"] = default_mode.octal_mode

                if ref_doc_dict is not None:
                    ref_doc = packager_provided_file_reference_documentation(
                        **ref_doc_dict
                    )
                else:
                    ref_doc = None

                # These are passed explicitly below and must not be repeated
                # via **c.
                for k in (
                    "stem",
                    "installed_path",
                    "reference_documentation",
                ):
                    c.pop(k, None)

                try:
                    api.packager_provided_file(stem, installed_path, reference_documentation=ref_doc, **c)  # type: ignore
                except ValueError as ex:
                    p_path = ppf_path[idx]
                    raise PluginMetadataError(
                        f"Error while processing {p_path.path} defined in {json_file_path}: {ex.args[0]}"
                    ) from ex

        plugin_initializers.append(_initialize_json_provided_ppfs)

    if not plugin_initializers:
        raise PluginMetadataError(
            f"The plugin defined in {json_file_path} does not seem to provide features, "
            f" such as module + plugin-initializer or packager-provided-files."
        )

    if len(plugin_initializers) == 1:
        return plugin_initializers[0]

    def _chain_loader(api: DebputyPluginInitializer) -> None:
        for initializer in plugin_initializers:
            initializer(api)

    return _chain_loader
2093@contextlib.contextmanager
2094def _open(path: str, fd: Optional[IO[bytes]] = None) -> Iterator[IO[bytes]]:
2095 if fd is not None:
2096 yield fd
2097 else:
2098 with open(path, "rb") as fd:
2099 yield fd
2102def _resolve_json_plugin_docs_path(
2103 plugin_name: str,
2104 plugin_path: str,
2105) -> Optional[Union[Traversable, Path]]:
2106 plugin_dir = os.path.dirname(plugin_path)
2107 return Path(os.path.join(plugin_dir, plugin_name + "_docs.yaml"))
def parse_json_plugin_desc(
    path: str,
    *,
    fd: Optional[IO[bytes]] = None,
) -> DebputyPluginMetadata:
    """Parse a plugin JSON descriptor into plugin metadata.

    The plugin name is the base name of `path` with a trailing ".json" or
    ".json.in" removed. The Python module (if any) is not loaded here; that is
    deferred to the `plugin_loader` of the returned metadata.

    :param path: Path of the JSON descriptor. Used for deriving the plugin
      name and in error messages, and opened for reading when `fd` is None.
    :param fd: Optional already open binary handle to read the JSON from
      instead of opening `path`.
    :return: The metadata for the plugin.
    :raises PluginMetadataError: If the content is not valid JSON, does not
      match the plugin metadata format, or the plugin is named "debputy".
    """
    with _open(path, fd=fd) as rfd:
        try:
            raw = json.load(rfd)
        except (JSONDecodeError, UnicodeDecodeError) as e:
            # The stream is binary, so json.load also decodes it and bad bytes
            # surface as UnicodeDecodeError, which is not a JSONDecodeError.
            # For a JSONDecodeError, str(e) is identical to e.args[0].
            raise PluginMetadataError(
                f'The plugin defined in "{path}" could not be parsed as valid JSON: {e}'
            ) from e
    plugin_name = os.path.basename(path)
    if plugin_name.endswith(".json"):
        plugin_name = plugin_name[:-5]
    elif plugin_name.endswith(".json.in"):
        plugin_name = plugin_name[:-8]

    if plugin_name == "debputy":
        # Provide a better error message than "The plugin has already loaded!?"
        raise PluginMetadataError(
            f'The plugin named {plugin_name} must be bundled with `debputy`. Please rename "{path}" so it does not'
            f" clash with the bundled plugin of same name."
        )

    attribute_path = AttributePath.root_path(raw)

    try:
        plugin_json_metadata = PLUGIN_METADATA_PARSER.parse_input(
            raw,
            attribute_path,
        )
    except ManifestParseException as e:
        raise PluginMetadataError(
            f'The plugin defined in "{path}" was valid JSON but could not be parsed: {e.message}'
        ) from e
    api_compat = plugin_json_metadata["api_compat_version"]

    return DebputyPluginMetadata(
        plugin_name=plugin_name,
        plugin_loader=lambda: _json_plugin_loader(
            plugin_name,
            plugin_json_metadata,
            path,
            attribute_path,
        ),
        api_compat_version=api_compat,
        plugin_doc_path_resolver=lambda: _resolve_json_plugin_docs_path(
            plugin_name, path
        ),
        plugin_initializer=None,
        plugin_path=path,
    )
@dataclasses.dataclass(slots=True, frozen=True)
class ServiceDefinitionImpl(ServiceDefinition[DSD]):
    """Immutable record describing a single service definition.

    Instances are frozen; use `replace` to derive a modified copy.
    """

    # Primary name of the service.
    name: str
    # All names of the service.
    names: Sequence[str]
    # The path the service definition is associated with.
    path: VirtualPath
    # Kind of service, such as "service" or "timer".
    type_of_service: str
    # Scope of the service, such as "system".
    service_scope: str
    auto_enable_on_install: bool
    auto_start_on_install: bool
    on_upgrade: ServiceUpgradeRule
    # Human readable description of where the definition came from.
    definition_source: str
    is_plugin_provided_definition: bool
    # Optional context data attached to the definition.
    service_context: Optional[DSD]

    def replace(self, **changes: Any) -> "ServiceDefinitionImpl[DSD]":
        """Return a copy of this definition with the given fields replaced.

        :param changes: Field names mapped to their new values.
        :return: A new instance; this instance is left untouched.
        """
        return dataclasses.replace(self, **changes)
class ServiceRegistryImpl(ServiceRegistry[DSD]):
    """Collects the service definitions registered by a service manager."""

    __slots__ = ("_service_manager_details", "_service_definitions", "_seen_services")

    def __init__(self, service_manager_details: ServiceManagerDetails) -> None:
        """Create an empty registry for the given service manager.

        :param service_manager_details: Details of the service manager using
          this registry; its plugin name is used in messages and in the
          definition source of registered services.
        """
        self._service_manager_details = service_manager_details
        self._service_definitions: List[ServiceDefinition[DSD]] = []
        # (name, type_of_service, service_scope) of every registered name.
        self._seen_services: Set[Tuple[str, str, str]] = set()

    @property
    def detected_services(self) -> Sequence[ServiceDefinition[DSD]]:
        """The service definitions registered so far, in registration order."""
        return self._service_definitions

    def register_service(
        self,
        path: VirtualPath,
        name: Union[str, List[str]],
        *,
        type_of_service: str = "service",  # "timer", etc.
        service_scope: str = "system",
        enable_by_default: bool = True,
        start_by_default: bool = True,
        default_upgrade_rule: ServiceUpgradeRule = "restart",
        service_context: Optional[DSD] = None,
    ) -> None:
        """Register a service definition.

        :param path: The path the service is associated with.
        :param name: The name of the service, or a list of names of which the
          first is used as the primary name.
        :param type_of_service: Kind of service (such as "service" or "timer").
        :param service_scope: Scope of the service (such as "system").
        :param enable_by_default: Stored as `auto_enable_on_install`.
        :param start_by_default: Stored as `auto_start_on_install`.
        :param default_upgrade_rule: Stored as `on_upgrade`.
        :param service_context: Optional context data stored with the
          definition.
        :raises ValueError: If `name` is an empty list.
        :raises PluginAPIViolationError: If one of the names was already
          registered with the same type and scope.
        """
        names = name if isinstance(name, list) else [name]
        if len(names) < 1:
            raise ValueError(
                f"The service must have at least one name - {path.absolute} did not have any"
            )
        keys = [(n, type_of_service, service_scope) for n in names]
        for key in keys:
            if key in self._seen_services:
                n = key[0]
                raise PluginAPIViolationError(
                    f"The service manager (from {self._service_manager_details.plugin_metadata.plugin_name}) used"
                    f" the service name {n} (type: {type_of_service}, scope: {service_scope}) twice. This is not"
                    " allowed by the debputy plugin API."
                )
        # Record the names only once all of them passed the check, so a
        # rejected registration leaves no trace. Without this, the duplicate
        # check above could never trigger.
        self._seen_services.update(keys)
        # TODO: We cannot create a service definition immediate once the manifest is involved
        self._service_definitions.append(
            ServiceDefinitionImpl(
                names[0],
                names,
                path,
                type_of_service,
                service_scope,
                enable_by_default,
                start_by_default,
                default_upgrade_rule,
                f"Auto-detected by plugin {self._service_manager_details.plugin_metadata.plugin_name}",
                True,
                service_context,
            )
        )