Coverage for src/debputy/plugin/api/impl_types.py: 77%
562 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-09-07 09:27 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-09-07 09:27 +0000
1import dataclasses
2import os.path
3from importlib.resources.abc import Traversable
4from pathlib import Path
5from typing import (
6 Optional,
7 Callable,
8 FrozenSet,
9 Dict,
10 List,
11 Tuple,
12 Generic,
13 TYPE_CHECKING,
14 TypeVar,
15 cast,
16 Any,
17 Sequence,
18 Union,
19 Type,
20 TypedDict,
21 Iterable,
22 Mapping,
23 NotRequired,
24 Literal,
25 Set,
26 Iterator,
27 Container,
28 Protocol,
29)
30from weakref import ref
32from debputy.exceptions import (
33 DebputyFSIsROError,
34 PluginAPIViolationError,
35 PluginConflictError,
36 UnhandledOrUnexpectedErrorFromPluginError,
37 PluginBaseError,
38 PluginInitializationError,
39)
40from debputy.filesystem_scan import as_path_def
41from debputy.manifest_parser.exceptions import ManifestParseException
42from debputy.manifest_parser.tagging_types import DebputyParsedContent, TypeMapping
43from debputy.manifest_parser.util import AttributePath, check_integration_mode
44from debputy.packages import BinaryPackage
45from debputy.plugin.api import (
46 VirtualPath,
47 BinaryCtrlAccessor,
48 PackageProcessingContext,
49)
50from debputy.plugin.api.spec import (
51 DebputyPluginInitializer,
52 MetadataAutoDetector,
53 DpkgTriggerType,
54 ParserDocumentation,
55 PackageProcessor,
56 PathDef,
57 ParserAttributeDocumentation,
58 undocumented_attr,
59 documented_attr,
60 reference_documentation,
61 PackagerProvidedFileReferenceDocumentation,
62 TypeMappingDocumentation,
63 DebputyIntegrationMode,
64)
65from debputy.plugin.plugin_state import (
66 run_in_context_of_plugin,
67)
68from debputy.substitution import VariableContext
69from debputy.util import _normalize_path, package_cross_check_precheck
71if TYPE_CHECKING:
72 from debputy.lsp.diagnostics import LintSeverity
73 from debputy.plugin.api.spec import (
74 ServiceDetector,
75 ServiceIntegrator,
76 )
77 from debputy.manifest_parser.parser_data import ParserContextData
78 from debputy.highlevel_manifest import (
79 HighLevelManifest,
80 PackageTransformationDefinition,
81 BinaryPackageData,
82 )
83 from debputy.plugins.debputy.to_be_api_types import (
84 BuildRuleParsedFormat,
85 )
# Result type of a declarative input parser (`DeclarativeInputParser.parse_input`):
# a single parsed content item or a list of them.
TD = TypeVar("TD", bound="Union[DebputyParsedContent, List[DebputyParsedContent]]")
# "Parsed format": the value a parser produces and hands to the plugin handler.
PF = TypeVar("PF")
SF = TypeVar("SF")
# Type returned by a plugin handler (and thereby by dispatching parsers).
TP = TypeVar("TP")
TTP = Type[TP]
BSR = TypeVar("BSR", bound="BuildSystemRule")

# Handler signature for value-less keywords: (name, attribute path, parser context).
DIPKWHandler = Callable[[str, AttributePath, "ParserContextData"], TP]
# Handler signature for keywords with a value:
# (name, parsed value, attribute path, parser context).
DIPHandler = Callable[[str, PF, AttributePath, "ParserContextData"], TP]
99@dataclasses.dataclass(slots=True)
100class DebputyPluginMetadata:
101 plugin_name: str
102 api_compat_version: int
103 plugin_loader: Optional[Callable[[], Callable[["DebputyPluginInitializer"], None]]]
104 plugin_initializer: Optional[Callable[["DebputyPluginInitializer"], None]]
105 plugin_path: str
106 plugin_doc_path_resolver: Callable[[], Optional[Union[Traversable, Path]]] = (
107 lambda: None
108 )
109 is_from_python_path: bool = False
110 _is_initialized: bool = False
111 _is_doc_path_resolved: bool = False
112 _plugin_doc_path: Optional[Union[Traversable, Path]] = None
114 @property
115 def is_bundled(self) -> bool:
116 return self.plugin_path == "<bundled>"
118 @property
119 def is_loaded(self) -> bool:
120 return self.plugin_initializer is not None
122 @property
123 def is_initialized(self) -> bool:
124 return self._is_initialized
126 @property
127 def plugin_doc_path(self) -> Optional[Union[Traversable, Path]]:
128 if not self._is_doc_path_resolved:
129 self._plugin_doc_path = self.plugin_doc_path_resolver()
130 self._is_doc_path_resolved = True
131 return self._plugin_doc_path
133 def initialize_plugin(self, api: "DebputyPluginInitializer") -> None:
134 if self.is_initialized: 134 ↛ 135line 134 didn't jump to line 135 because the condition on line 134 was never true
135 raise RuntimeError("Cannot load plugins twice")
136 if not self.is_loaded:
137 self.load_plugin()
138 plugin_initializer = self.plugin_initializer
139 assert plugin_initializer is not None
140 plugin_initializer(api)
141 self._is_initialized = True
143 def load_plugin(self) -> None:
144 plugin_loader = self.plugin_loader
145 assert plugin_loader is not None
146 try:
147 self.plugin_initializer = run_in_context_of_plugin(
148 self.plugin_name,
149 plugin_loader,
150 )
151 except PluginBaseError:
152 raise
153 except Exception as e:
154 raise PluginInitializationError(
155 f"Initialization of {self.plugin_name} failed due to its initializer raising an exception"
156 ) from e
157 assert self.plugin_initializer is not None
@dataclasses.dataclass(slots=True, frozen=True)
class PluginProvidedParser(Generic[PF, TP]):
    """Couples a declarative parser with the plugin handler that consumes its output."""

    parser: "DeclarativeInputParser[PF]"
    handler: Callable[[str, PF, "AttributePath", "ParserContextData"], TP]
    plugin_metadata: DebputyPluginMetadata

    def parse(
        self,
        name: str,
        value: object,
        attribute_path: "AttributePath",
        *,
        parser_context: "ParserContextData",
    ) -> TP:
        """Parse ``value`` with the parser and return the handler's result.

        :param name: Name passed through to the handler unchanged.
        :param value: The raw value to parse.
        :param attribute_path: Location of ``value``; given to both the parser
          and the handler.
        :param parser_context: Context given to both the parser and the handler.
        """
        handler_input = self.parser.parse_input(
            value,
            attribute_path,
            parser_context=parser_context,
        )
        return self.handler(name, handler_input, attribute_path, parser_context)
class PPFFormatParam(TypedDict):
    # Parameters supplied when formatting the install path of a packager
    # provided file (built in PackagerProvidedFileClassSpec.compute_dest).

    # The assigned priority (or the default priority); None when no priority applies.
    priority: Optional[int]
    # The name assigned to the file.
    name: str
    # Name of the owning package (falls back to the assigned name in compute_dest).
    owning_package: str
@dataclasses.dataclass(slots=True, frozen=True)
class PackagerProvidedFileClassSpec:
    """Specification of one class ("stem") of packager provided files.

    Describes how files with a given stem are installed: the destination
    format, default mode and priority, which name segments are permitted and
    optional callbacks for computing / rewriting the destination.
    """

    debputy_plugin_metadata: DebputyPluginMetadata
    stem: str
    installed_as_format: str
    default_mode: int
    default_priority: Optional[int]
    allow_name_segment: bool
    allow_architecture_segment: bool
    post_formatting_rewrite: Optional[Callable[[str], str]]
    packageless_is_fallback_for_all_packages: bool
    reservation_only: bool
    formatting_callback: Optional[Callable[[str, PPFFormatParam, VirtualPath], str]] = (
        None
    )
    reference_documentation: Optional[PackagerProvidedFileReferenceDocumentation] = None
    bug_950723: bool = False
    has_active_command: bool = True

    @property
    def supports_priority(self) -> bool:
        """True when a default priority is defined for this stem."""
        return self.default_priority is not None

    def compute_dest(
        self,
        assigned_name: str,
        # Note this method is currently used 1:1 inside plugin tests.
        *,
        owning_package: Optional[str] = None,
        assigned_priority: Optional[int] = None,
        path: Optional[VirtualPath] = None,
    ) -> Tuple[str, str]:
        """Compute the destination of a file as a ``(dirname, basename)`` pair.

        :param assigned_name: The name assigned to the file.
        :param owning_package: The owning package; defaults to ``assigned_name``.
        :param assigned_priority: Explicit priority; defaults to
          ``default_priority`` when the stem supports priorities.
        :param path: The path being installed. Required when a
          ``formatting_callback`` is defined.
        :raises ValueError: If a priority is given for a stem without priority
          support, or if ``path`` is missing while a formatting callback is set.
        """
        uses_priority = self.supports_priority
        if assigned_priority is not None and not uses_priority:
            raise ValueError(
                f"Cannot assign priority to packager provided files with stem"
                f' "{self.stem}" (e.g., "debian/foo.{self.stem}"). They'
                " do not use priority at all."
            )

        if uses_priority and assigned_priority is None:
            assigned_priority = self.default_priority

        format_params: PPFFormatParam = {
            "priority": assigned_priority,
            "name": assigned_name,
            "owning_package": (
                assigned_name if owning_package is None else owning_package
            ),
        }

        template = self.installed_as_format
        callback = self.formatting_callback
        if callback is None:
            dest_path = template.format(**format_params)
        else:
            if path is None:
                raise ValueError(
                    "The path parameter is required for PPFs with formatting_callback"
                )
            dest_path = callback(template, format_params, path)

        dirname, basename = os.path.split(dest_path)
        dirname = _normalize_path(dirname)

        rewrite = self.post_formatting_rewrite
        if rewrite:
            basename = rewrite(basename)
        return dirname, basename
@dataclasses.dataclass(slots=True)
class MetadataOrMaintscriptDetector:
    """A plugin-provided metadata detector together with its registration data."""

    plugin_metadata: DebputyPluginMetadata
    detector_id: str
    detector: MetadataAutoDetector
    applies_to_package_types: FrozenSet[str]
    enabled: bool = True

    def applies_to(self, binary_package: BinaryPackage) -> bool:
        """Return True if the detector covers the package type of ``binary_package``."""
        return binary_package.package_type in self.applies_to_package_types

    def run_detector(
        self,
        fs_root: "VirtualPath",
        ctrl: "BinaryCtrlAccessor",
        context: "PackageProcessingContext",
    ) -> None:
        """Invoke the detector and translate / annotate errors it raises.

        :raises PluginAPIViolationError: If the detector attempted to mutate the
          (read-only) file system.
        :raises UnhandledOrUnexpectedErrorFromPluginError: Re-raised with a note
          identifying the detector.
        """
        try:
            self.detector(fs_root, ctrl, context)
        except DebputyFSIsROError as e:
            nv = self.plugin_metadata.plugin_name
            raise PluginAPIViolationError(
                f'The plugin {nv} violated the API contract for "metadata detectors"'
                " by attempting to mutate the provided file system in its metadata detector"
                f" with id {self.detector_id}. File system mutation is *not* supported at"
                " this stage (file system layout is committed and the attempted changes"
                " would be lost)."
            ) from e
        except UnhandledOrUnexpectedErrorFromPluginError as e:
            e.add_note(
                f"The exception was raised by the detector with the ID: {self.detector_id}"
            )
            # The note only adds context; the error itself must still propagate.
            # Without the re-raise the plugin failure would be silently swallowed.
            raise
class DeclarativeInputParser(Generic[TD]):
    """Base class for declarative parsers; subclasses override ``parse_input``."""

    @property
    def inline_reference_documentation(self) -> Optional[ParserDocumentation]:
        """Inline documentation for the parser; the base class has none."""
        return None

    @property
    def expected_debputy_integration_mode(
        self,
    ) -> Optional[Container[DebputyIntegrationMode]]:
        """Integration modes the parser is restricted to; the base class has none."""
        return None

    @property
    def reference_documentation_url(self) -> Optional[str]:
        """The URL from the inline documentation, or None without documentation."""
        documentation = self.inline_reference_documentation
        if documentation is None:
            return None
        return documentation.documentation_reference_url

    def parse_input(
        self,
        value: object,
        path: "AttributePath",
        *,
        parser_context: Optional["ParserContextData"] = None,
    ) -> TD:
        """Parse ``value`` found at ``path``. Must be implemented by subclasses."""
        raise NotImplementedError
class DelegatingDeclarativeInputParser(DeclarativeInputParser[TD]):
    """Parser wrapping a delegate parser, optionally overriding its documentation."""

    __slots__ = (
        "delegate",
        "_reference_documentation",
        "_expected_debputy_integration_mode",
    )

    def __init__(
        self,
        delegate: DeclarativeInputParser[TD],
        *,
        inline_reference_documentation: Optional[ParserDocumentation] = None,
        expected_debputy_integration_mode: Optional[
            Container[DebputyIntegrationMode]
        ] = None,
    ) -> None:
        self.delegate = delegate
        self._reference_documentation = inline_reference_documentation
        self._expected_debputy_integration_mode = expected_debputy_integration_mode

    @property
    def expected_debputy_integration_mode(
        self,
    ) -> Optional[Container[DebputyIntegrationMode]]:
        """The integration modes given at construction time."""
        return self._expected_debputy_integration_mode

    @property
    def inline_reference_documentation(self) -> Optional[ParserDocumentation]:
        """Own documentation if provided; otherwise the delegate's documentation."""
        own_doc = self._reference_documentation
        if own_doc is not None:
            return own_doc
        return self.delegate.inline_reference_documentation
class ListWrappedDeclarativeInputParser(DelegatingDeclarativeInputParser[TD]):
    """Parser for a list whose elements are each parsed by the delegate."""

    __slots__ = ()

    def _doc_url_error_suffix(self, *, see_url_version: bool = False) -> str:
        """Return the documentation suffix for error messages ("" without a URL)."""
        doc_url = self.reference_documentation_url
        if doc_url is None:
            return ""
        if see_url_version:
            return f" Please see {doc_url} for the documentation."
        return f" (Documentation: {doc_url})"

    def parse_input(
        self,
        value: object,
        path: "AttributePath",
        *,
        parser_context: Optional["ParserContextData"] = None,
    ) -> TD:
        """Parse every element of the list ``value`` with the delegate.

        :raises ManifestParseException: If ``value`` is not a list.
        """
        check_integration_mode(
            path, parser_context, self._expected_debputy_integration_mode
        )
        if not isinstance(value, list):
            suffix = self._doc_url_error_suffix(see_url_version=True)
            raise ManifestParseException(
                f"The attribute {path.path} must be a list.{suffix}"
            )
        element_parser = self.delegate
        # Each element is parsed with the attribute path of its list index.
        return [
            element_parser.parse_input(
                item,
                path[position],
                parser_context=parser_context,
            )
            for position, item in enumerate(value)
        ]
class DispatchingParserBase(Generic[TP]):
    """Base for parsers that dispatch on a keyword to plugin provided parsers."""

    def __init__(self, manifest_attribute_path_template: str) -> None:
        self.manifest_attribute_path_template = manifest_attribute_path_template
        self._parsers: Dict[str, PluginProvidedParser[Any, TP]] = {}

    @property
    def unknown_keys_diagnostic_severity(self) -> Optional["LintSeverity"]:
        return "error"

    def is_known_keyword(self, keyword: str) -> bool:
        """Return True if a parser has been registered for ``keyword``."""
        return keyword in self._parsers

    def registered_keywords(self) -> Iterable[str]:
        """Yield all registered keywords."""
        yield from self._parsers

    def parser_for(self, keyword: str) -> PluginProvidedParser[Any, TP]:
        """Return the parser registered for ``keyword`` (KeyError if unknown)."""
        return self._parsers[keyword]

    def register_keyword(
        self,
        keyword: Union[str, Sequence[str]],
        handler: DIPKWHandler,
        plugin_metadata: DebputyPluginMetadata,
        *,
        inline_reference_documentation: Optional[ParserDocumentation] = None,
    ) -> None:
        """Register one or more value-less keywords.

        :param keyword: A keyword or a sequence of keywords (aliases).
        :param handler: Called with (name, attribute path, parser context).
        :param plugin_metadata: The plugin registering the keyword.
        :param inline_reference_documentation: Optional documentation. It must
          not contain per-attribute or alternative-format documentation.
        :raises ValueError: If the documentation contains unsupported parts.
        """
        documentation = inline_reference_documentation
        doc_url = None
        if documentation:
            if documentation.attribute_doc:
                raise ValueError(
                    "Cannot provide per-attribute documentation for a value-less keyword!"
                )
            if documentation.alt_parser_description:
                raise ValueError(
                    "Cannot provide non-mapping-format documentation for a value-less keyword!"
                )
            doc_url = documentation.documentation_reference_url
        valueless_parser = DeclarativeValuelessKeywordInputParser(
            documentation,
            documentation_reference=doc_url,
        )

        def _combined_handler(
            name: str,
            _ignored: Any,
            attr_path: AttributePath,
            context: "ParserContextData",
        ) -> TP:
            # Adapt the value-less handler to the 4-argument handler signature.
            return handler(name, attr_path, context)

        self._add_parser(
            keyword,
            PluginProvidedParser(
                valueless_parser,
                _combined_handler,
                plugin_metadata,
            ),
        )

    def register_parser(
        self,
        keyword: Union[str, List[str]],
        parser: "DeclarativeInputParser[PF]",
        handler: Callable[[str, PF, "AttributePath", "ParserContextData"], TP],
        plugin_metadata: DebputyPluginMetadata,
    ) -> None:
        """Register ``parser`` and ``handler`` for one or more keywords."""
        self._add_parser(
            keyword,
            PluginProvidedParser(
                parser,
                handler,
                plugin_metadata,
            ),
        )

    def _add_parser(
        self,
        keyword: Union[str, Iterable[str]],
        ppp: "PluginProvidedParser[PF, TP]",
    ) -> None:
        """Register ``ppp`` under every given keyword, rejecting duplicates.

        :raises PluginConflictError: If a keyword is already registered.
        """
        keywords = (keyword,) if isinstance(keyword, str) else keyword
        for kw in keywords:
            previous = self._parsers.get(kw)
            if previous is None:
                self._new_parser(kw, ppp)
                continue
            message = (
                f'The rule name "{kw}" is already taken by the plugin'
                f" {previous.plugin_metadata.plugin_name}. This conflict was triggered"
                f" when plugin {ppp.plugin_metadata.plugin_name} attempted to register its parser."
            )
            raise PluginConflictError(
                message,
                previous.plugin_metadata,
                ppp.plugin_metadata,
            )

    def _new_parser(self, keyword: str, ppp: "PluginProvidedParser[PF, TP]") -> None:
        """Store ``ppp`` under ``keyword``; hook point for subclasses."""
        self._parsers[keyword] = ppp

    def parse_input(
        self,
        orig_value: object,
        attribute_path: "AttributePath",
        *,
        parser_context: "ParserContextData",
    ) -> TP:
        """Parse ``orig_value``. Must be implemented by subclasses."""
        raise NotImplementedError
class DispatchingObjectParser(
    DispatchingParserBase[Mapping[str, Any]],
    DeclarativeInputParser[Mapping[str, Any]],
):
    """Parser for a mapping where each key is handled by a registered parser.

    Keys are processed in registration order. The attribute documentation is
    accumulated as parsers are registered.
    """

    def __init__(
        self,
        manifest_attribute_path_template: str,
        *,
        parser_documentation: Optional[ParserDocumentation] = None,
        expected_debputy_integration_mode: Optional[
            Container[DebputyIntegrationMode]
        ] = None,
        unknown_keys_diagnostic_severity: Optional["LintSeverity"] = "error",
        allow_unknown_keys: bool = False,
    ) -> None:
        super().__init__(manifest_attribute_path_template)
        self._attribute_documentation: List[ParserAttributeDocumentation] = []
        if parser_documentation is None:
            parser_documentation = reference_documentation()
        self._parser_documentation = parser_documentation
        self._expected_debputy_integration_mode = expected_debputy_integration_mode
        self._unknown_keys_diagnostic_severity = unknown_keys_diagnostic_severity
        self._allow_unknown_keys = allow_unknown_keys

    @property
    def unknown_keys_diagnostic_severity(self) -> Optional["LintSeverity"]:
        return self._unknown_keys_diagnostic_severity

    @property
    def expected_debputy_integration_mode(
        self,
    ) -> Optional[Container[DebputyIntegrationMode]]:
        return self._expected_debputy_integration_mode

    @property
    def reference_documentation_url(self) -> Optional[str]:
        return self._parser_documentation.documentation_reference_url

    @property
    def inline_reference_documentation(self) -> Optional[ParserDocumentation]:
        """Documentation combining the parser documentation with all registered attributes."""
        ref_doc = self._parser_documentation
        return reference_documentation(
            title=ref_doc.title,
            description=ref_doc.description,
            attributes=self._attribute_documentation,
            reference_documentation_url=self.reference_documentation_url,
        )

    def _new_parser(self, keyword: str, ppp: "PluginProvidedParser[PF, TP]") -> None:
        """Store the parser and record attribute documentation for ``keyword``."""
        super()._new_parser(keyword, ppp)
        doc = ppp.parser.inline_reference_documentation
        if doc is None or doc.description is None:
            self._attribute_documentation.append(undocumented_attr(keyword))
        else:
            self._attribute_documentation.append(
                documented_attr(keyword, doc.description)
            )

    def register_child_parser(
        self,
        keyword: str,
        parser: "DispatchingObjectParser",
        plugin_metadata: DebputyPluginMetadata,
        *,
        on_end_parse_step: Optional[
            Callable[
                [str, Optional[Mapping[str, Any]], AttributePath, "ParserContextData"],
                None,
            ]
        ] = None,
        nested_in_package_context: bool = False,
    ) -> None:
        """Register a nested object parser under ``keyword``.

        :param keyword: The key in this mapping that the child parser handles.
        :param parser: The child parser.
        :param plugin_metadata: The plugin registering the child parser.
        :param on_end_parse_step: Optional callback invoked with
          (name, parsed value, attribute path, parser context) after the child
          has been parsed. When omitted, no callback is invoked.
        :param nested_in_package_context: When True, the child parser is wrapped
          in an ``InPackageContextParser``.
        """

        def _handler(
            name: str,
            value: Mapping[str, Any],
            path: AttributePath,
            parser_context: "ParserContextData",
        ) -> Mapping[str, Any]:
            # The callback is optional (defaults to None); calling it
            # unconditionally would raise a TypeError during parsing.
            if on_end_parse_step is not None:
                on_end_parse_step(name, value, path, parser_context)
            return value

        if nested_in_package_context:
            parser = InPackageContextParser(
                keyword,
                parser,
            )

        p = PluginProvidedParser(
            parser,
            _handler,
            plugin_metadata,
        )
        self._add_parser(keyword, p)

    def parse_input(
        self,
        orig_value: object,
        attribute_path: "AttributePath",
        *,
        parser_context: "ParserContextData",
    ) -> Mapping[str, Any]:
        """Parse the mapping ``orig_value`` by dispatching each key to its parser.

        :returns: A mapping of key to parsed value for every registered key that
          had a non-None value in ``orig_value``.
        :raises ManifestParseException: If ``orig_value`` is not a non-empty
          mapping, or if it contains unknown keys and unknown keys are not
          allowed.
        """
        check_integration_mode(
            attribute_path,
            parser_context,
            self._expected_debputy_integration_mode,
        )
        doc_ref = ""
        if self.reference_documentation_url is not None:
            doc_ref = (
                f" Please see {self.reference_documentation_url} for the documentation."
            )
        if not isinstance(orig_value, dict) or not orig_value:
            raise ManifestParseException(
                f"The attribute {attribute_path.path_container_lc} must be a non-empty mapping.{doc_ref}"
            )
        result = {}
        if not self._allow_unknown_keys:
            # Report the first unknown key in document order, so the error
            # message is deterministic (set iteration order is not).
            first_key = next(
                (k for k in orig_value if k not in self._parsers),
                None,
            )
            if first_key is not None:
                remaining_valid_attributes = self._parsers.keys() - orig_value.keys()
                if not remaining_valid_attributes:
                    raise ManifestParseException(
                        f'The attribute "{first_key}" is not applicable at {attribute_path.path} (with the'
                        f" current set of plugins).{doc_ref}"
                    )
                remaining_valid_attribute_names = ", ".join(
                    sorted(remaining_valid_attributes)
                )
                raise ManifestParseException(
                    f'The attribute "{first_key}" is not applicable at {attribute_path.path} (with the current set'
                    " of plugins). Possible attributes available (and not already used) are:"
                    f" {remaining_valid_attribute_names}.{doc_ref}"
                )
        # Parse order is important for the root level (currently we use rule registration order)
        for key, provided_parser in self._parsers.items():
            value = orig_value.get(key)
            if value is None:
                # Absent (or null) nested object parsers still get their handler
                # invoked - with an empty mapping.
                if isinstance(provided_parser.parser, DispatchingObjectParser):
                    provided_parser.handler(
                        key,
                        {},
                        attribute_path[key],
                        parser_context,
                    )
                continue
            value_path = attribute_path[key]
            parsed_value = provided_parser.parse(
                key, value, value_path, parser_context=parser_context
            )
            result[key] = parsed_value
        return result
@dataclasses.dataclass(slots=True, frozen=True)
class PackageContextData(Generic[TP]):
    # A parsed value paired with the package name it was defined for
    # (produced by InPackageContextParser.parse_input).

    # The package name after substitution of any "{{...}}" variables.
    resolved_package_name: str
    # The value parsed for that package.
    value: TP
class InPackageContextParser(
    DelegatingDeclarativeInputParser[Mapping[str, PackageContextData[TP]]]
):
    """Parser for a mapping of package name to a value parsed by the delegate.

    Each value is parsed inside the binary package context of its package.
    """

    __slots__ = ()

    def __init__(
        self,
        manifest_attribute_path_template: str,
        delegate: DeclarativeInputParser[TP],
        *,
        parser_documentation: Optional[ParserDocumentation] = None,
    ) -> None:
        self.manifest_attribute_path_template = manifest_attribute_path_template
        self._attribute_documentation: List[ParserAttributeDocumentation] = []
        super().__init__(delegate, inline_reference_documentation=parser_documentation)

    def parse_input(
        self,
        orig_value: object,
        attribute_path: "AttributePath",
        *,
        parser_context: Optional["ParserContextData"] = None,
    ) -> TP:
        """Parse a non-empty mapping keyed by (possibly templated) package names.

        :returns: A mapping from the package name as written to a
          ``PackageContextData`` holding the resolved name and parsed value.
        :raises ManifestParseException: If ``orig_value`` is not a non-empty
          mapping or if a referenced package is auto-generated.
        """
        assert parser_context is not None
        check_integration_mode(
            attribute_path,
            parser_context,
            self._expected_debputy_integration_mode,
        )
        doc_url = self.reference_documentation_url
        doc_ref = (
            f" Please see {doc_url} for the documentation."
            if doc_url is not None
            else ""
        )
        if not (isinstance(orig_value, dict) and orig_value):
            raise ManifestParseException(
                f"The attribute {attribute_path.path_container_lc} must be a non-empty mapping.{doc_ref}"
            )
        value_parser = self.delegate
        parsed_by_package = {}
        for raw_name, raw_value in orig_value.items():
            definition_source = attribute_path[raw_name]
            # Only names containing a substitution marker need resolving.
            if "{{" in raw_name:
                resolved_name = parser_context.substitution.substitute(
                    raw_name,
                    definition_source.path,
                )
            else:
                resolved_name = raw_name
            package_state: PackageTransformationDefinition
            with parser_context.binary_package_context(resolved_name) as package_state:
                if package_state.is_auto_generated_package:
                    # Maybe lift (part) of this restriction.
                    raise ManifestParseException(
                        f'Cannot define rules for package "{resolved_name}" (at {definition_source.path}). It is an'
                        " auto-generated package."
                    )
                parsed_value = value_parser.parse_input(
                    raw_value, definition_source, parser_context=parser_context
                )
            parsed_by_package[raw_name] = PackageContextData(
                resolved_name, parsed_value
            )
        return parsed_by_package
class DispatchingTableParser(
    DispatchingParserBase[TP],
    DeclarativeInputParser[TP],
):
    """Parser for a single rule given as a keyword or a one-key mapping.

    The input is either a string (a value-less keyword) or a mapping with
    exactly one key, whose value is passed to the parser registered for it.
    """

    def __init__(self, base_type: TTP, manifest_attribute_path_template: str) -> None:
        super().__init__(manifest_attribute_path_template)
        self.base_type = base_type

    def parse_input(
        self,
        orig_value: object,
        attribute_path: "AttributePath",
        *,
        parser_context: "ParserContextData",
    ) -> TP:
        """Dispatch ``orig_value`` to the parser registered for its keyword.

        :raises ManifestParseException: If ``orig_value`` is neither a string
          nor a mapping with exactly one key, or if the keyword is unknown.
        """
        if isinstance(orig_value, str):
            key = orig_value
            value = None
            value_path = attribute_path
        elif isinstance(orig_value, dict):
            key_count = len(orig_value)
            if key_count != 1:
                valid_keys = ", ".join(sorted(self._parsers.keys()))
                if key_count == 0:
                    # An empty mapping has no "second key" to point at.
                    raise ManifestParseException(
                        f'The mapping "{attribute_path.path}" was empty, but it should have exactly one top level'
                        f" key. The possible keys are: {valid_keys}"
                    )
                # Report the actual number of keys rather than assuming two.
                raise ManifestParseException(
                    f'The mapping "{attribute_path.path}" had {key_count} keys, but it should only have one top'
                    " level key. Maybe you are missing a list marker behind the second key or some indentation."
                    f" The possible keys are: {valid_keys}"
                )
            key, value = next(iter(orig_value.items()))
            value_path = attribute_path[key]
        else:
            raise ManifestParseException(
                f"The attribute {attribute_path.path} must be a string or a mapping."
            )
        provided_parser = self._parsers.get(key)
        if provided_parser is None:
            valid_keys = ", ".join(sorted(self._parsers.keys()))
            raise ManifestParseException(
                f'Unknown or unsupported action "{key}" at {value_path.path}.'
                " Valid actions at this location are:"
                f" {valid_keys}"
            )
        return provided_parser.parse(
            key, value, value_path, parser_context=parser_context
        )
@dataclasses.dataclass(slots=True)
class DeclarativeValuelessKeywordInputParser(DeclarativeInputParser[None]):
    """Parser for keywords that take no value: only ``None`` is accepted."""

    inline_reference_documentation: Optional[ParserDocumentation] = None
    documentation_reference: Optional[str] = None

    def parse_input(
        self,
        value: object,
        path: "AttributePath",
        *,
        parser_context: Optional["ParserContextData"] = None,
    ) -> TD:
        """Return ``value`` when it is None; otherwise reject it.

        :raises ManifestParseException: If ``value`` is not None.
        """
        if value is not None:
            reference = self.documentation_reference
            doc_ref = "" if reference is None else f" (Documentation: {reference})"
            raise ManifestParseException(
                f"Expected attribute {path.path} to be a string.{doc_ref}"
            )
        return cast("TD", value)
@dataclasses.dataclass(slots=True)
class PluginProvidedManifestVariable:
    """A manifest variable registered by a plugin.

    The value is either a fixed string or a callable computing the value from
    a ``VariableContext``.
    """

    plugin_metadata: DebputyPluginMetadata
    variable_name: str
    variable_value: Optional[Union[str, Callable[[VariableContext], str]]]
    is_context_specific_variable: bool
    variable_reference_documentation: Optional[str] = None
    is_documentation_placeholder: bool = False
    is_for_special_case: bool = False

    @property
    def is_internal(self) -> bool:
        """True when the name starts with "_" or contains ":_"."""
        name = self.variable_name
        if name.startswith("_"):
            return True
        return ":_" in name

    @property
    def is_token(self) -> bool:
        """True when the name starts with "token:"."""
        return self.variable_name.startswith("token:")

    def resolve(self, variable_context: VariableContext) -> str:
        """Return the value: the fixed string, or the callable's result."""
        source = self.variable_value
        return source if isinstance(source, str) else source(variable_context)
@dataclasses.dataclass(slots=True, frozen=True)
class AutomaticDiscardRuleExample:
    # Example data for an automatic discard rule (built by
    # automatic_discard_rule_example).

    # Pairs of (path definition, verdict); the verdict is True when the path
    # should be discarded and False when it should be kept.
    content: Sequence[Tuple[PathDef, bool]]
    # Optional description displayed together with the example.
    description: Optional[str] = None
def automatic_discard_rule_example(
    *content: Union[str, PathDef, Tuple[Union[str, PathDef], bool]],
    example_description: Optional[str] = None,
) -> AutomaticDiscardRuleExample:
    """Provide an example for an automatic discard rule

    The return value of this method should be passed to the `examples` parameter of
    `automatic_discard_rule` method - either directly for a single example or as a
    part of a sequence of examples.

    >>> # Possible example for an exclude rule for ".la" files
    >>> # Example shows two files; The ".la" file that will be removed and another file that
    >>> # will be kept.
    >>> automatic_discard_rule_example(  # doctest: +ELLIPSIS
    ...     "usr/lib/libfoo.la",
    ...     ("usr/lib/libfoo.so.1.0.0", False),
    ... )
    AutomaticDiscardRuleExample(...)

    Keep in mind that you have to explicitly include directories that are relevant for the test
    if you want them shown. Also, if a directory is excluded, all path beneath it will be
    automatically excluded in the example as well. Your example data must account for that.

    >>> # Possible example for python cache file discard rule
    >>> # In this example, we explicitly list the __pycache__ directory itself because we
    >>> # want it shown in the output (otherwise, we could have omitted it)
    >>> automatic_discard_rule_example(  # doctest: +ELLIPSIS
    ...     (".../foo.py", False),
    ...     ".../__pycache__/",
    ...     ".../__pycache__/...",
    ...     ".../foo.pyc",
    ...     ".../foo.pyo",
    ... )
    AutomaticDiscardRuleExample(...)

    Note: Even if `__pycache__` had been implicit, the result would have been the same. However,
    the rendered example would not have shown the directory on its own. The use of `...` as
    path names is useful for denoting "anywhere" or "anything". Though, there is nothing "magic"
    about this name - it happens to be allowed as a path name (unlike `.` or `..`).

    These examples can be seen via `debputy plugin show automatic-discard-rules <name-here>`.

    :param content: The content of the example. Each element can be either a path definition or
      a tuple of a path definition followed by a verdict (boolean). Each provided path definition
      describes the paths to be presented in the example. Implicit paths such as parent
      directories will be created but not shown in the example. Therefore, if a directory is
      relevant to the example, be sure to explicitly list it.

      The verdict associated with a path determines whether the path should be discarded (when
      True) or kept (when False). When a path is not explicitly associated with a verdict, the
      verdict is assumed to be discarded (True).
    :param example_description: An optional description displayed together with the example.
    :return: An opaque data structure containing the example.
    """
    entries = []
    for item in content:
        if isinstance(item, tuple):
            raw_path, should_discard = item
        else:
            # A bare path definition implies the "discard" verdict.
            raw_path, should_discard = item, True
        entries.append((as_path_def(raw_path), should_discard))

    if not entries:
        raise ValueError("At least one path must be given for an example")

    return AutomaticDiscardRuleExample(
        tuple(entries),
        description=example_description,
    )
@dataclasses.dataclass(slots=True, frozen=True)
class PluginProvidedPackageProcessor:
    """A package processor registered by a plugin, with its dependency data."""

    processor_id: str
    applies_to_package_types: FrozenSet[str]
    package_processor: PackageProcessor
    dependencies: FrozenSet[Tuple[str, str]]
    plugin_metadata: DebputyPluginMetadata

    def applies_to(self, binary_package: BinaryPackage) -> bool:
        """Return True if the processor covers the package type of ``binary_package``."""
        package_type = binary_package.package_type
        return package_type in self.applies_to_package_types

    @property
    def dependency_id(self) -> Tuple[str, str]:
        """Identifier of this processor: ``(plugin name, processor id)``."""
        return (self.plugin_metadata.plugin_name, self.processor_id)

    def run_package_processor(
        self,
        fs_root: "VirtualPath",
        unused: None,
        context: "PackageProcessingContext",
    ) -> None:
        """Invoke the processor, passing all arguments through unchanged."""
        processor = self.package_processor
        processor(fs_root, unused, context)
@dataclasses.dataclass(slots=True, frozen=True)
class PluginProvidedDiscardRule:
    """A discard rule registered by a plugin, with optional docs and examples."""

    name: str
    plugin_metadata: DebputyPluginMetadata
    discard_check: Callable[[VirtualPath], bool]
    reference_documentation: Optional[str]
    examples: Sequence[AutomaticDiscardRuleExample] = ()

    def should_discard(self, path: VirtualPath) -> bool:
        """Return the verdict of ``discard_check`` for ``path``."""
        check = self.discard_check
        return check(path)
@dataclasses.dataclass(slots=True, frozen=True)
class ServiceManagerDetails:
    # Registration record for a service manager: its name, the detector and
    # integrator callables, and the plugin that provided them.
    service_manager: str
    service_detector: "ServiceDetector"
    service_integrator: "ServiceIntegrator"
    plugin_metadata: DebputyPluginMetadata
class ReferenceValue(TypedDict):
    """Reference-data entry; currently it only carries a description."""

    description: str
def _reference_data_value(
    *,
    description: str,
) -> ReferenceValue:
    """Build a `ReferenceValue` from its (keyword-only) description."""
    return ReferenceValue(description=description)
# Closed set of category identifiers for known packaging files. Each value is
# described in KNOWN_PACKAGING_FILE_CATEGORY_DESCRIPTIONS.
KnownPackagingFileCategories = Literal[
    "generated",
    "generic-template",
    "ppf-file",
    "ppf-control-file",
    "maint-config",
    "pkg-metadata",
    "pkg-helper-config",
    "testing",
    "lint-config",
]
# Human-readable reference description for every known packaging file category.
KNOWN_PACKAGING_FILE_CATEGORY_DESCRIPTIONS: Mapping[
    KnownPackagingFileCategories, ReferenceValue
] = {
    "generated": _reference_data_value(
        description="The file is (likely) generated from another file"
    ),
    "generic-template": _reference_data_value(
        description="The file is (likely) a generic template that generates a known packaging file. While the"
        " file is annotated as if it was the target file, the file might use a custom template"
        " language inside it."
    ),
    "ppf-file": _reference_data_value(
        description="Packager provided file to be installed on the file system - usually as-is."
        " When `install-pattern` or `install-path` are provided, this is where the file is installed."
    ),
    "ppf-control-file": _reference_data_value(
        description="Packager provided file that becomes a control file - possibly after processing."
        " If `install-pattern` or `install-path` are provided, they denote where the file is placed"
        " (generally, this will be of the form `DEBIAN/<name>`)"
    ),
    "maint-config": _reference_data_value(
        description="Maintenance configuration for a specific tool that the maintainer uses (tool / style preferences)"
    ),
    "pkg-metadata": _reference_data_value(
        description="The file is related to standard package metadata (usually documented in Debian Policy)"
    ),
    "pkg-helper-config": _reference_data_value(
        description="The file is packaging helper configuration or instruction file"
    ),
    "testing": _reference_data_value(
        description="The file is related to automated testing (autopkgtests, salsa/gitlab CI)."
    ),
    "lint-config": _reference_data_value(
        description="The file is related to a linter (such as overrides for false-positives or style preferences)"
    ),
}
# Closed set of feature identifiers for packaging config files. Each value is
# described in KNOWN_PACKAGING_FILE_CONFIG_FEATURE_DESCRIPTION.
KnownPackagingConfigFeature = Literal[
    "dh-filearray",
    "dh-filedoublearray",
    "dh-hash-subst",
    "dh-dollar-subst",
    "dh-glob",
    "dh-partial-glob",
    "dh-late-glob",
    "dh-glob-after-execute",
    "dh-executable-config",
    "dh-custom-format",
    "dh-file-list",
    "dh-install-list",
    "dh-install-list-dest-dir-like-dh_install",
    "dh-install-list-fixed-dest-dir",
    "dh-fixed-dest-dir",
    "dh-exec-rename",
    "dh-docs-only",
]
# Human-readable reference description for known packaging config features.
KNOWN_PACKAGING_FILE_CONFIG_FEATURE_DESCRIPTION: Mapping[
    KnownPackagingConfigFeature, ReferenceValue
] = {
    "dh-filearray": _reference_data_value(
        description="The file will be read as a list of space/newline separated tokens",
    ),
    "dh-filedoublearray": _reference_data_value(
        description="Each line in the file will be read as a list of space-separated tokens",
    ),
    "dh-hash-subst": _reference_data_value(
        description="Supports debhelper #PACKAGE# style substitutions (udebs often excluded)",
    ),
    "dh-dollar-subst": _reference_data_value(
        description="Supports debhelper ${PACKAGE} style substitutions (usually requires compat 13+)",
    ),
    "dh-glob": _reference_data_value(
        description="Supports standard debhelper globbing",
    ),
    "dh-partial-glob": _reference_data_value(
        description="Supports standard debhelper globbing but only to a subset of the values (implies dh-late-glob)",
    ),
    "dh-late-glob": _reference_data_value(
        description="Globbing is done separately instead of using the built-in function",
    ),
    "dh-glob-after-execute": _reference_data_value(
        description="When the dh config file is executable, the generated output will be subject to globbing",
    ),
    "dh-executable-config": _reference_data_value(
        description="If marked executable, debhelper will execute the file and read its output",
    ),
    "dh-custom-format": _reference_data_value(
        description="The dh tool will or may have a custom parser for this file",
    ),
    "dh-file-list": _reference_data_value(
        description="The dh file contains a list of paths to be processed",
    ),
    "dh-install-list": _reference_data_value(
        description="The dh file contains a list of paths/globs to be installed but the tool specific knowledge"
        " required to understand the file cannot be conveyed via this interface.",
    ),
    "dh-install-list-dest-dir-like-dh_install": _reference_data_value(
        description="The dh file is processed similar to dh_install (notably dest-dir handling derived"
        " from the path or the last token on the line)",
    ),
    "dh-install-list-fixed-dest-dir": _reference_data_value(
        description="The dh file is an install list and the dest-dir is always the same for all patterns"
        " (when `install-pattern` or `install-path` are provided, they identify the directory - not the file location)",
    ),
    "dh-exec-rename": _reference_data_value(
        description="When `dh-exec` is the interpreter of this dh config file, its renaming (=>) feature can be"
        " requested/used",
    ),
    "dh-docs-only": _reference_data_value(
        description="The dh config file is used for documentation only. Implicit <!nodocs> Build-Profiles support",
    ),
}
# Maps a feature to the `(feature, minimum compat level)` pairs it expands to.
# `expand_known_packaging_config_features` skips any pair whose level is above
# the requested compat level; features without an entry here expand to
# themselves at level 0.
CONFIG_FEATURE_ALIASES: Dict[
    KnownPackagingConfigFeature, List[Tuple[KnownPackagingConfigFeature, int]]
] = {
    "dh-filearray": [
        ("dh-filearray", 0),
        ("dh-executable-config", 9),
        ("dh-dollar-subst", 13),
    ],
    "dh-filedoublearray": [
        ("dh-filedoublearray", 0),
        ("dh-executable-config", 9),
        ("dh-dollar-subst", 13),
    ],
}
def _implies(
    features: List[KnownPackagingConfigFeature],
    seen: Set[KnownPackagingConfigFeature],
    implying: Sequence[KnownPackagingConfigFeature],
    implied: KnownPackagingConfigFeature,
) -> None:
    """Add `implied` when every feature in `implying` has been seen.

    Both `features` and `seen` are updated in place. Nothing happens when
    `implied` is already in `seen` or when any feature in `implying` is
    missing from `seen`.
    """
    if implied in seen:
        return
    if any(prerequisite not in seen for prerequisite in implying):
        return
    seen.add(implied)
    features.append(implied)
def expand_known_packaging_config_features(
    compat_level: int,
    features: List[KnownPackagingConfigFeature],
) -> List[KnownPackagingConfigFeature]:
    """Expand aliases and implied features for the given compat level.

    :param compat_level: Compat level used to filter alias expansions; an
      expansion is skipped when its minimum level is above this value.
    :param features: The requested features. The list is not modified.
    :return: A new, sorted list of the resulting features without duplicates.
    """
    seen: Set[KnownPackagingConfigFeature] = set()
    final_features: List[KnownPackagingConfigFeature] = []
    for requested in features:
        # Features without an alias entry expand to themselves at level 0.
        candidates = CONFIG_FEATURE_ALIASES.get(requested) or [(requested, 0)]
        for name, min_compat in candidates:
            if compat_level >= min_compat and name not in seen:
                seen.add(name)
                final_features.append(name)

    # An explicitly requested "dh-late-glob" replaces "dh-glob" in the result.
    # NOTE(review): this runs before "dh-late-glob" can be implied by
    # "dh-partial-glob" below, so an implied "dh-late-glob" does not drop
    # "dh-glob" -- confirm whether that is intended.
    if {"dh-glob", "dh-late-glob"} <= seen:
        final_features.remove("dh-glob")

    _implies(final_features, seen, ["dh-partial-glob"], "dh-late-glob")
    _implies(
        final_features,
        seen,
        ["dh-late-glob", "dh-executable-config"],
        "dh-glob-after-execute",
    )
    final_features.sort()
    return final_features
class DHCompatibilityBasedRule(DebputyParsedContent):
    """Parsed content of one entry in `KnownPackagingFileInfo.dh_compat_rules`.

    All keys are optional.
    """

    install_pattern: NotRequired[str]
    add_config_features: NotRequired[List[KnownPackagingConfigFeature]]
    # Presumably the first debhelper compat level at which this rule takes
    # effect -- TODO confirm against the code consuming these rules.
    starting_with_compat_level: NotRequired[int]
class KnownPackagingFileInfo(DebputyParsedContent):
    """Parsed description of a known packaging file. All keys are optional."""

    # Exposed directly in the JSON plugin parsing; be careful with changes
    path: NotRequired[str]
    pkgfile: NotRequired[str]
    detection_method: NotRequired[Literal["path", "dh.pkgfile"]]
    file_categories: NotRequired[List[KnownPackagingFileCategories]]
    documentation_uris: NotRequired[List[str]]
    debputy_cmd_templates: NotRequired[List[List[str]]]
    debhelper_commands: NotRequired[List[str]]
    config_features: NotRequired[List[KnownPackagingConfigFeature]]
    install_pattern: NotRequired[str]
    dh_compat_rules: NotRequired[List[DHCompatibilityBasedRule]]
    default_priority: NotRequired[int]
    post_formatting_rewrite: NotRequired[Literal["period-to-underscore"]]
    packageless_is_fallback_for_all_packages: NotRequired[bool]
    has_active_command: NotRequired[bool]
@dataclasses.dataclass(slots=True)
class PluginProvidedKnownPackagingFile:
    """A known packaging file description together with how it is detected.

    Unlike most sibling records in this module, this dataclass is not frozen.
    """

    info: KnownPackagingFileInfo
    detection_method: Literal["path", "dh.pkgfile"]
    # Presumably the path or pkgfile name matched by `detection_method`
    # -- TODO confirm against the code that builds these records.
    detection_value: str
    plugin_metadata: DebputyPluginMetadata
class BuildSystemAutoDetector(Protocol):
    """Structural type for callables that inspect a source root and return a bool."""

    def __call__(
        self,
        source_root: VirtualPath,
        *args: Any,
        **kwargs: Any,
    ) -> bool:
        """Return a boolean verdict for `source_root`; extra arguments are accepted."""
        ...
@dataclasses.dataclass(slots=True, frozen=True)
class PluginProvidedTypeMapping:
    """Immutable record pairing a type mapping with its optional documentation."""

    mapped_type: TypeMapping[Any, Any]
    # None when no reference documentation is available for the mapping.
    reference_documentation: Optional[TypeMappingDocumentation]
    plugin_metadata: DebputyPluginMetadata
@dataclasses.dataclass(slots=True, frozen=True)
class PluginProvidedBuildSystemAutoDetection(Generic[BSR]):
    """Immutable record describing auto-detection for one build system rule type.

    It groups the manifest keyword, the rule type, the detector callable and
    the constructor that produces a rule of that type.
    """

    manifest_keyword: str
    build_system_rule_type: Type[BSR]
    detector: BuildSystemAutoDetector
    # Builds a rule instance from parsed format, attribute path and manifest.
    constructor: Callable[
        ["BuildRuleParsedFormat", AttributePath, "HighLevelManifest"],
        BSR,
    ]
    # Presumably names of build systems that this one takes precedence over
    # during auto-detection -- TODO confirm against the detection code.
    auto_detection_shadow_build_systems: FrozenSet[str]
    plugin_metadata: DebputyPluginMetadata
class PackageDataTable:
    """Thin wrapper around a string-keyed mapping of per-package data.

    Iteration yields the mapping's values; indexing and `in` use its keys.
    """

    def __init__(self, package_data_table: Mapping[str, "BinaryPackageData"]) -> None:
        # Cross-package checks are enabled for metadata-detectors, but are
        # deliberately not enabled for package processors because it is not
        # clear how they should interact with dependencies. For
        # metadata-detectors, things are read-only and there are no
        # dependencies, so we cannot "get them wrong".
        self.enable_cross_package_checks = False
        self._package_data_table = package_data_table

    def __iter__(self) -> Iterator["BinaryPackageData"]:
        entries = self._package_data_table.values()
        return iter(entries)

    def __getitem__(self, item: str) -> "BinaryPackageData":
        table = self._package_data_table
        return table[item]

    def __contains__(self, item: str) -> bool:
        table = self._package_data_table
        return item in table
class PackageProcessingContextProvider(PackageProcessingContext):
    """`PackageProcessingContext` implementation backed by a manifest and a package data table."""

    __slots__ = (
        "_manifest",
        "_binary_package",
        "_related_udeb_package",
        "_package_data_table",
        "_cross_check_cache",
    )

    def __init__(
        self,
        manifest: "HighLevelManifest",
        binary_package: BinaryPackage,
        related_udeb_package: Optional[BinaryPackage],
        package_data_table: PackageDataTable,
    ) -> None:
        self._manifest = manifest
        self._binary_package = binary_package
        self._related_udeb_package = related_udeb_package
        # Only a weak reference to the table is kept.
        self._package_data_table = ref(package_data_table)
        # Computed on first use by `accessible_package_roots`.
        self._cross_check_cache: Optional[
            Sequence[Tuple[BinaryPackage, "VirtualPath"]]
        ] = None

    def _package_state_for(
        self,
        package: BinaryPackage,
    ) -> "PackageTransformationDefinition":
        """Look up the manifest's package state by the package's name."""
        manifest = self._manifest
        return manifest.package_state_for(package.name)

    def _package_version_for(
        self,
        package: BinaryPackage,
    ) -> str:
        """Return the package's binary version, falling back to the source version.

        The binNMU version is included in the fallback unless the package is
        arch:all.
        """
        explicit_version = self._package_state_for(package).binary_version
        if explicit_version is None:
            return self._manifest.source_version(
                include_binnmu_version=not package.is_arch_all
            )
        return explicit_version

    @property
    def binary_package(self) -> BinaryPackage:
        return self._binary_package

    @property
    def related_udeb_package(self) -> Optional[BinaryPackage]:
        return self._related_udeb_package

    @property
    def binary_package_version(self) -> str:
        return self._package_version_for(self._binary_package)

    @property
    def related_udeb_package_version(self) -> Optional[str]:
        udeb = self._related_udeb_package
        return None if udeb is None else self._package_version_for(udeb)

    def accessible_package_roots(self) -> Iterable[Tuple[BinaryPackage, "VirtualPath"]]:
        """Return `(package, fs root)` pairs for the other packages passing the cross-check precheck.

        The result is computed once and cached on the instance.

        :raises ReferenceError: If the package data table has been garbage collected.
        :raises PluginAPIViolationError: If cross package checks are not enabled
          on the package data table.
        """
        table = self._package_data_table()
        if table is None:
            raise ReferenceError(
                "Internal error: package_table was garbage collected too early"
            )
        if not table.enable_cross_package_checks:
            raise PluginAPIViolationError(
                "Cross package content checks are not available at this time."
            )
        cached = self._cross_check_cache
        if cached is None:
            own_package = self.binary_package
            # Skip the package itself and any package failing the precheck.
            cached = tuple(
                (other.binary_package, other.fs_root)
                for other in table
                if other.binary_package.name != own_package.name
                and package_cross_check_precheck(own_package, other.binary_package)[0]
            )
            self._cross_check_cache = cached
        return cached
@dataclasses.dataclass(frozen=True, slots=True)
class PluginProvidedTrigger:
    """Immutable record of a dpkg trigger and where it came from."""

    dpkg_trigger_type: DpkgTriggerType
    dpkg_trigger_target: str
    provider: DebputyPluginMetadata
    provider_source_id: str

    def serialized_format(self) -> str:
        """Return the trigger as `<type> <target>`, separated by one space."""
        trigger_type = self.dpkg_trigger_type
        target = self.dpkg_trigger_target
        return f"{trigger_type} {target}"