Coverage for src/debputy/plugin/api/impl_types.py: 79%
561 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
1import dataclasses
2import os.path
3from importlib.resources.abc import Traversable
4from pathlib import Path
5from typing import (
6 Optional,
7 Callable,
8 FrozenSet,
9 Dict,
10 List,
11 Tuple,
12 Generic,
13 TYPE_CHECKING,
14 TypeVar,
15 cast,
16 Any,
17 Sequence,
18 Union,
19 Type,
20 TypedDict,
21 Iterable,
22 Mapping,
23 NotRequired,
24 Literal,
25 Set,
26 Iterator,
27 Container,
28 Protocol,
29)
30from weakref import ref
32from debputy.exceptions import (
33 DebputyFSIsROError,
34 PluginAPIViolationError,
35 PluginConflictError,
36 UnhandledOrUnexpectedErrorFromPluginError,
37 PluginBaseError,
38 PluginInitializationError,
39)
40from debputy.filesystem_scan import as_path_def
41from debputy.lsp.diagnostics import LintSeverity
42from debputy.manifest_parser.exceptions import ManifestParseException
43from debputy.manifest_parser.tagging_types import DebputyParsedContent, TypeMapping
44from debputy.manifest_parser.util import AttributePath, check_integration_mode
45from debputy.packages import BinaryPackage
46from debputy.plugin.api import (
47 VirtualPath,
48 BinaryCtrlAccessor,
49 PackageProcessingContext,
50)
51from debputy.plugin.api.spec import (
52 DebputyPluginInitializer,
53 MetadataAutoDetector,
54 DpkgTriggerType,
55 ParserDocumentation,
56 PackageProcessor,
57 PathDef,
58 ParserAttributeDocumentation,
59 undocumented_attr,
60 documented_attr,
61 reference_documentation,
62 PackagerProvidedFileReferenceDocumentation,
63 TypeMappingDocumentation,
64 DebputyIntegrationMode,
65)
66from debputy.plugin.plugin_state import (
67 run_in_context_of_plugin,
68)
69from debputy.substitution import VariableContext
70from debputy.util import _normalize_path, package_cross_check_precheck
72if TYPE_CHECKING:
73 from debputy.plugin.api.spec import (
74 ServiceDetector,
75 ServiceIntegrator,
76 )
77 from debputy.manifest_parser.parser_data import ParserContextData
78 from debputy.highlevel_manifest import (
79 HighLevelManifest,
80 PackageTransformationDefinition,
81 BinaryPackageData,
82 )
83 from debputy.plugin.debputy.to_be_api_types import (
84 BuildRuleParsedFormat,
85 )
88TD = TypeVar("TD", bound="Union[DebputyParsedContent, List[DebputyParsedContent]]")
89PF = TypeVar("PF")
90SF = TypeVar("SF")
91TP = TypeVar("TP")
92TTP = Type[TP]
93BSR = TypeVar("BSR", bound="BuildSystemRule")
95DIPKWHandler = Callable[[str, AttributePath, "ParserContextData"], TP]
96DIPHandler = Callable[[str, PF, AttributePath, "ParserContextData"], TP]
99@dataclasses.dataclass(slots=True)
100class DebputyPluginMetadata:
101 plugin_name: str
102 api_compat_version: int
103 plugin_loader: Optional[Callable[[], Callable[["DebputyPluginInitializer"], None]]]
104 plugin_initializer: Optional[Callable[["DebputyPluginInitializer"], None]]
105 plugin_path: str
106 plugin_doc_path_resolver: Callable[[], Optional[Union[Traversable, Path]]] = (
107 lambda: None
108 )
109 _is_initialized: bool = False
110 _is_doc_path_resolved: bool = False
111 _plugin_doc_path: Optional[Union[Traversable, Path]] = None
113 @property
114 def is_bundled(self) -> bool:
115 return self.plugin_path == "<bundled>"
117 @property
118 def is_loaded(self) -> bool:
119 return self.plugin_initializer is not None
121 @property
122 def is_initialized(self) -> bool:
123 return self._is_initialized
125 @property
126 def plugin_doc_path(self) -> Optional[Union[Traversable, Path]]:
127 if not self._is_doc_path_resolved:
128 self._plugin_doc_path = self.plugin_doc_path_resolver()
129 self._is_doc_path_resolved = True
130 return self._plugin_doc_path
132 def initialize_plugin(self, api: "DebputyPluginInitializer") -> None:
133 if self.is_initialized: 133 ↛ 134line 133 didn't jump to line 134 because the condition on line 133 was never true
134 raise RuntimeError("Cannot load plugins twice")
135 if not self.is_loaded:
136 self.load_plugin()
137 plugin_initializer = self.plugin_initializer
138 assert plugin_initializer is not None
139 plugin_initializer(api)
140 self._is_initialized = True
142 def load_plugin(self) -> None:
143 plugin_loader = self.plugin_loader
144 assert plugin_loader is not None
145 try:
146 self.plugin_initializer = run_in_context_of_plugin(
147 self.plugin_name,
148 plugin_loader,
149 )
150 except PluginBaseError:
151 raise
152 except Exception as e:
153 raise PluginInitializationError(
154 f"Initialization of {self.plugin_name} failed due to its initializer raising an exception"
155 ) from e
156 assert self.plugin_initializer is not None
159@dataclasses.dataclass(slots=True, frozen=True)
160class PluginProvidedParser(Generic[PF, TP]):
161 parser: "DeclarativeInputParser[PF]"
162 handler: Callable[[str, PF, "AttributePath", "ParserContextData"], TP]
163 plugin_metadata: DebputyPluginMetadata
165 def parse(
166 self,
167 name: str,
168 value: object,
169 attribute_path: "AttributePath",
170 *,
171 parser_context: "ParserContextData",
172 ) -> TP:
173 parsed_value = self.parser.parse_input(
174 value,
175 attribute_path,
176 parser_context=parser_context,
177 )
178 return self.handler(name, parsed_value, attribute_path, parser_context)
181class PPFFormatParam(TypedDict):
182 priority: Optional[int]
183 name: str
184 owning_package: str
187@dataclasses.dataclass(slots=True, frozen=True)
188class PackagerProvidedFileClassSpec:
189 debputy_plugin_metadata: DebputyPluginMetadata
190 stem: str
191 installed_as_format: str
192 default_mode: int
193 default_priority: Optional[int]
194 allow_name_segment: bool
195 allow_architecture_segment: bool
196 post_formatting_rewrite: Optional[Callable[[str], str]]
197 packageless_is_fallback_for_all_packages: bool
198 reservation_only: bool
199 formatting_callback: Optional[Callable[[str, PPFFormatParam, VirtualPath], str]] = (
200 None
201 )
202 reference_documentation: Optional[PackagerProvidedFileReferenceDocumentation] = None
203 bug_950723: bool = False
204 has_active_command: bool = True
206 @property
207 def supports_priority(self) -> bool:
208 return self.default_priority is not None
210 def compute_dest(
211 self,
212 assigned_name: str,
213 # Note this method is currently used 1:1 inside plugin tests.
214 *,
215 owning_package: Optional[str] = None,
216 assigned_priority: Optional[int] = None,
217 path: Optional[VirtualPath] = None,
218 ) -> Tuple[str, str]:
219 if assigned_priority is not None and not self.supports_priority: 219 ↛ 220line 219 didn't jump to line 220 because the condition on line 219 was never true
220 raise ValueError(
221 f"Cannot assign priority to packager provided files with stem"
222 f' "{self.stem}" (e.g., "debian/foo.{self.stem}"). They'
223 " do not use priority at all."
224 )
226 path_format = self.installed_as_format
227 if self.supports_priority and assigned_priority is None:
228 assigned_priority = self.default_priority
230 if owning_package is None:
231 owning_package = assigned_name
233 params: PPFFormatParam = {
234 "priority": assigned_priority,
235 "name": assigned_name,
236 "owning_package": owning_package,
237 }
239 if self.formatting_callback is not None:
240 if path is None: 240 ↛ 241line 240 didn't jump to line 241 because the condition on line 240 was never true
241 raise ValueError(
242 "The path parameter is required for PPFs with formatting_callback"
243 )
244 dest_path = self.formatting_callback(path_format, params, path)
245 else:
246 dest_path = path_format.format(**params)
248 dirname, basename = os.path.split(dest_path)
249 dirname = _normalize_path(dirname)
251 if self.post_formatting_rewrite:
252 basename = self.post_formatting_rewrite(basename)
253 return dirname, basename
256@dataclasses.dataclass(slots=True)
257class MetadataOrMaintscriptDetector:
258 plugin_metadata: DebputyPluginMetadata
259 detector_id: str
260 detector: MetadataAutoDetector
261 applies_to_package_types: FrozenSet[str]
262 enabled: bool = True
264 def applies_to(self, binary_package: BinaryPackage) -> bool:
265 return binary_package.package_type in self.applies_to_package_types
267 def run_detector(
268 self,
269 fs_root: "VirtualPath",
270 ctrl: "BinaryCtrlAccessor",
271 context: "PackageProcessingContext",
272 ) -> None:
273 try:
274 self.detector(fs_root, ctrl, context)
275 except DebputyFSIsROError as e: 275 ↛ 284line 275 didn't jump to line 284
276 nv = self.plugin_metadata.plugin_name
277 raise PluginAPIViolationError(
278 f'The plugin {nv} violated the API contract for "metadata detectors"'
279 " by attempting to mutate the provided file system in its metadata detector"
280 f" with id {self.detector_id}. File system mutation is *not* supported at"
281 " this stage (file system layout is committed and the attempted changes"
282 " would be lost)."
283 ) from e
284 except UnhandledOrUnexpectedErrorFromPluginError as e:
285 e.add_note(
286 f"The exception was raised by the detector with the ID: {self.detector_id}"
287 )
290class DeclarativeInputParser(Generic[TD]):
291 @property
292 def inline_reference_documentation(self) -> Optional[ParserDocumentation]:
293 return None
295 @property
296 def expected_debputy_integration_mode(
297 self,
298 ) -> Optional[Container[DebputyIntegrationMode]]:
299 return None
301 @property
302 def reference_documentation_url(self) -> Optional[str]:
303 doc = self.inline_reference_documentation
304 return doc.documentation_reference_url if doc is not None else None
306 def parse_input(
307 self,
308 value: object,
309 path: "AttributePath",
310 *,
311 parser_context: Optional["ParserContextData"] = None,
312 ) -> TD:
313 raise NotImplementedError
316class DelegatingDeclarativeInputParser(DeclarativeInputParser[TD]):
317 __slots__ = (
318 "delegate",
319 "_reference_documentation",
320 "_expected_debputy_integration_mode",
321 )
323 def __init__(
324 self,
325 delegate: DeclarativeInputParser[TD],
326 *,
327 inline_reference_documentation: Optional[ParserDocumentation] = None,
328 expected_debputy_integration_mode: Optional[
329 Container[DebputyIntegrationMode]
330 ] = None,
331 ) -> None:
332 self.delegate = delegate
333 self._reference_documentation = inline_reference_documentation
334 self._expected_debputy_integration_mode = expected_debputy_integration_mode
336 @property
337 def expected_debputy_integration_mode(
338 self,
339 ) -> Optional[Container[DebputyIntegrationMode]]:
340 return self._expected_debputy_integration_mode
342 @property
343 def inline_reference_documentation(self) -> Optional[ParserDocumentation]:
344 doc = self._reference_documentation
345 if doc is None:
346 return self.delegate.inline_reference_documentation
347 return doc
350class ListWrappedDeclarativeInputParser(DelegatingDeclarativeInputParser[TD]):
351 __slots__ = ()
353 def _doc_url_error_suffix(self, *, see_url_version: bool = False) -> str:
354 doc_url = self.reference_documentation_url
355 if doc_url is not None: 355 ↛ 359line 355 didn't jump to line 359 because the condition on line 355 was always true
356 if see_url_version: 356 ↛ 358line 356 didn't jump to line 358 because the condition on line 356 was always true
357 return f" Please see {doc_url} for the documentation."
358 return f" (Documentation: {doc_url})"
359 return ""
361 def parse_input(
362 self,
363 value: object,
364 path: "AttributePath",
365 *,
366 parser_context: Optional["ParserContextData"] = None,
367 ) -> TD:
368 check_integration_mode(
369 path, parser_context, self._expected_debputy_integration_mode
370 )
371 if not isinstance(value, list):
372 doc_ref = self._doc_url_error_suffix(see_url_version=True)
373 raise ManifestParseException(
374 f"The attribute {path.path} must be a list.{doc_ref}"
375 )
376 result = []
377 delegate = self.delegate
378 for idx, element in enumerate(value):
379 element_path = path[idx]
380 result.append(
381 delegate.parse_input(
382 element,
383 element_path,
384 parser_context=parser_context,
385 )
386 )
387 return result
390class DispatchingParserBase(Generic[TP]):
391 def __init__(self, manifest_attribute_path_template: str) -> None:
392 self.manifest_attribute_path_template = manifest_attribute_path_template
393 self._parsers: Dict[str, PluginProvidedParser[Any, TP]] = {}
395 @property
396 def unknown_keys_diagnostic_severity(self) -> Optional[LintSeverity]:
397 return "error"
399 def is_known_keyword(self, keyword: str) -> bool:
400 return keyword in self._parsers
402 def registered_keywords(self) -> Iterable[str]:
403 yield from self._parsers
405 def parser_for(self, keyword: str) -> PluginProvidedParser[Any, TP]:
406 return self._parsers[keyword]
408 def register_keyword(
409 self,
410 keyword: Union[str, Sequence[str]],
411 handler: DIPKWHandler,
412 plugin_metadata: DebputyPluginMetadata,
413 *,
414 inline_reference_documentation: Optional[ParserDocumentation] = None,
415 ) -> None:
416 reference_documentation_url = None
417 if inline_reference_documentation:
418 if inline_reference_documentation.attribute_doc: 418 ↛ 419line 418 didn't jump to line 419 because the condition on line 418 was never true
419 raise ValueError(
420 "Cannot provide per-attribute documentation for a value-less keyword!"
421 )
422 if inline_reference_documentation.alt_parser_description: 422 ↛ 423line 422 didn't jump to line 423 because the condition on line 422 was never true
423 raise ValueError(
424 "Cannot provide non-mapping-format documentation for a value-less keyword!"
425 )
426 reference_documentation_url = (
427 inline_reference_documentation.documentation_reference_url
428 )
429 parser = DeclarativeValuelessKeywordInputParser(
430 inline_reference_documentation,
431 documentation_reference=reference_documentation_url,
432 )
434 def _combined_handler(
435 name: str,
436 _ignored: Any,
437 attr_path: AttributePath,
438 context: "ParserContextData",
439 ) -> TP:
440 return handler(name, attr_path, context)
442 p = PluginProvidedParser(
443 parser,
444 _combined_handler,
445 plugin_metadata,
446 )
448 self._add_parser(keyword, p)
450 def register_parser(
451 self,
452 keyword: Union[str, List[str]],
453 parser: "DeclarativeInputParser[PF]",
454 handler: Callable[[str, PF, "AttributePath", "ParserContextData"], TP],
455 plugin_metadata: DebputyPluginMetadata,
456 ) -> None:
457 p = PluginProvidedParser(
458 parser,
459 handler,
460 plugin_metadata,
461 )
462 self._add_parser(keyword, p)
464 def _add_parser(
465 self,
466 keyword: Union[str, Iterable[str]],
467 ppp: "PluginProvidedParser[PF, TP]",
468 ) -> None:
469 ks = [keyword] if isinstance(keyword, str) else keyword
470 for k in ks:
471 existing_parser = self._parsers.get(k)
472 if existing_parser is not None: 472 ↛ 473line 472 didn't jump to line 473
473 message = (
474 f'The rule name "{k}" is already taken by the plugin'
475 f" {existing_parser.plugin_metadata.plugin_name}. This conflict was triggered"
476 f" when plugin {ppp.plugin_metadata.plugin_name} attempted to register its parser."
477 )
478 raise PluginConflictError(
479 message,
480 existing_parser.plugin_metadata,
481 ppp.plugin_metadata,
482 )
483 self._new_parser(k, ppp)
485 def _new_parser(self, keyword: str, ppp: "PluginProvidedParser[PF, TP]") -> None:
486 self._parsers[keyword] = ppp
488 def parse_input(
489 self,
490 orig_value: object,
491 attribute_path: "AttributePath",
492 *,
493 parser_context: "ParserContextData",
494 ) -> TP:
495 raise NotImplementedError
498class DispatchingObjectParser(
499 DispatchingParserBase[Mapping[str, Any]],
500 DeclarativeInputParser[Mapping[str, Any]],
501):
502 def __init__(
503 self,
504 manifest_attribute_path_template: str,
505 *,
506 parser_documentation: Optional[ParserDocumentation] = None,
507 expected_debputy_integration_mode: Optional[
508 Container[DebputyIntegrationMode]
509 ] = None,
510 unknown_keys_diagnostic_severity: LintSeverity = "error",
511 ) -> None:
512 super().__init__(manifest_attribute_path_template)
513 self._attribute_documentation: List[ParserAttributeDocumentation] = []
514 if parser_documentation is None:
515 parser_documentation = reference_documentation()
516 self._parser_documentation = parser_documentation
517 self._expected_debputy_integration_mode = expected_debputy_integration_mode
518 self._unknown_keys_diagnostic_severity = unknown_keys_diagnostic_severity
520 @property
521 def unknown_keys_diagnostic_severity(self) -> Optional[LintSeverity]:
522 return self._unknown_keys_diagnostic_severity
524 @property
525 def expected_debputy_integration_mode(
526 self,
527 ) -> Optional[Container[DebputyIntegrationMode]]:
528 return self._expected_debputy_integration_mode
530 @property
531 def reference_documentation_url(self) -> Optional[str]:
532 return self._parser_documentation.documentation_reference_url
534 @property
535 def inline_reference_documentation(self) -> Optional[ParserDocumentation]:
536 ref_doc = self._parser_documentation
537 return reference_documentation(
538 title=ref_doc.title,
539 description=ref_doc.description,
540 attributes=self._attribute_documentation,
541 reference_documentation_url=self.reference_documentation_url,
542 )
544 def _new_parser(self, keyword: str, ppp: "PluginProvidedParser[PF, TP]") -> None:
545 super()._new_parser(keyword, ppp)
546 doc = ppp.parser.inline_reference_documentation
547 if doc is None or doc.description is None:
548 self._attribute_documentation.append(undocumented_attr(keyword))
549 else:
550 self._attribute_documentation.append(
551 documented_attr(keyword, doc.description)
552 )
554 def register_child_parser(
555 self,
556 keyword: str,
557 parser: "DispatchingObjectParser",
558 plugin_metadata: DebputyPluginMetadata,
559 *,
560 on_end_parse_step: Optional[
561 Callable[
562 [str, Optional[Mapping[str, Any]], AttributePath, "ParserContextData"],
563 None,
564 ]
565 ] = None,
566 nested_in_package_context: bool = False,
567 ) -> None:
568 def _handler(
569 name: str,
570 value: Mapping[str, Any],
571 path: AttributePath,
572 parser_context: "ParserContextData",
573 ) -> Mapping[str, Any]:
574 on_end_parse_step(name, value, path, parser_context)
575 return value
577 if nested_in_package_context:
578 parser = InPackageContextParser(
579 keyword,
580 parser,
581 )
583 p = PluginProvidedParser(
584 parser,
585 _handler,
586 plugin_metadata,
587 )
588 self._add_parser(keyword, p)
590 def parse_input(
591 self,
592 orig_value: object,
593 attribute_path: "AttributePath",
594 *,
595 parser_context: "ParserContextData",
596 ) -> TP:
597 check_integration_mode(
598 attribute_path,
599 parser_context,
600 self._expected_debputy_integration_mode,
601 )
602 doc_ref = ""
603 if self.reference_documentation_url is not None: 603 ↛ 607line 603 didn't jump to line 607 because the condition on line 603 was always true
604 doc_ref = (
605 f" Please see {self.reference_documentation_url} for the documentation."
606 )
607 if not isinstance(orig_value, dict):
608 raise ManifestParseException(
609 f"The attribute {attribute_path.path_container_lc} must be a non-empty mapping.{doc_ref}"
610 )
611 if not orig_value: 611 ↛ 612line 611 didn't jump to line 612 because the condition on line 611 was never true
612 raise ManifestParseException(
613 f"The attribute {attribute_path.path_container_lc} must be a non-empty mapping.{doc_ref}"
614 )
615 result = {}
616 unknown_keys = orig_value.keys() - self._parsers.keys()
617 if unknown_keys: 617 ↛ 618line 617 didn't jump to line 618 because the condition on line 617 was never true
618 first_key = next(iter(unknown_keys))
619 remaining_valid_attributes = self._parsers.keys() - orig_value.keys()
620 if not remaining_valid_attributes:
621 raise ManifestParseException(
622 f'The attribute "{first_key}" is not applicable at {attribute_path.path} (with the'
623 f" current set of plugins).{doc_ref}"
624 )
625 remaining_valid_attribute_names = ", ".join(remaining_valid_attributes)
626 raise ManifestParseException(
627 f'The attribute "{first_key}" is not applicable at {attribute_path.path} (with the current set'
628 " of plugins). Possible attributes available (and not already used) are:"
629 f" {remaining_valid_attribute_names}.{doc_ref}"
630 )
631 # Parse order is important for the root level (currently we use rule registration order)
632 for key, provided_parser in self._parsers.items():
633 value = orig_value.get(key)
634 if value is None:
635 if isinstance(provided_parser.parser, DispatchingObjectParser):
636 provided_parser.handler(
637 key,
638 {},
639 attribute_path[key],
640 parser_context,
641 )
642 continue
643 value_path = attribute_path[key]
644 if provided_parser is None: 644 ↛ 645line 644 didn't jump to line 645 because the condition on line 644 was never true
645 valid_keys = ", ".join(sorted(self._parsers.keys()))
646 raise ManifestParseException(
647 f'Unknown or unsupported option "{key}" at {value_path.path}.'
648 " Valid options at this location are:"
649 f" {valid_keys}\n{doc_ref}"
650 )
651 parsed_value = provided_parser.parse(
652 key, value, value_path, parser_context=parser_context
653 )
654 result[key] = parsed_value
655 return result
658@dataclasses.dataclass(slots=True, frozen=True)
659class PackageContextData(Generic[TP]):
660 resolved_package_name: str
661 value: TP
664class InPackageContextParser(
665 DelegatingDeclarativeInputParser[Mapping[str, PackageContextData[TP]]]
666):
667 __slots__ = ()
669 def __init__(
670 self,
671 manifest_attribute_path_template: str,
672 delegate: DeclarativeInputParser[TP],
673 *,
674 parser_documentation: Optional[ParserDocumentation] = None,
675 ) -> None:
676 self.manifest_attribute_path_template = manifest_attribute_path_template
677 self._attribute_documentation: List[ParserAttributeDocumentation] = []
678 super().__init__(delegate, inline_reference_documentation=parser_documentation)
680 def parse_input(
681 self,
682 orig_value: object,
683 attribute_path: "AttributePath",
684 *,
685 parser_context: Optional["ParserContextData"] = None,
686 ) -> TP:
687 assert parser_context is not None
688 check_integration_mode(
689 attribute_path,
690 parser_context,
691 self._expected_debputy_integration_mode,
692 )
693 doc_ref = ""
694 if self.reference_documentation_url is not None: 694 ↛ 698line 694 didn't jump to line 698 because the condition on line 694 was always true
695 doc_ref = (
696 f" Please see {self.reference_documentation_url} for the documentation."
697 )
698 if not isinstance(orig_value, dict) or not orig_value: 698 ↛ 699line 698 didn't jump to line 699 because the condition on line 698 was never true
699 raise ManifestParseException(
700 f"The attribute {attribute_path.path_container_lc} must be a non-empty mapping.{doc_ref}"
701 )
702 delegate = self.delegate
703 result = {}
704 for package_name_raw, value in orig_value.items():
706 definition_source = attribute_path[package_name_raw]
707 package_name = package_name_raw
708 if "{{" in package_name:
709 package_name = parser_context.substitution.substitute(
710 package_name_raw,
711 definition_source.path,
712 )
713 package_state: PackageTransformationDefinition
714 with parser_context.binary_package_context(package_name) as package_state:
715 if package_state.is_auto_generated_package: 715 ↛ 717line 715 didn't jump to line 717 because the condition on line 715 was never true
716 # Maybe lift (part) of this restriction.
717 raise ManifestParseException(
718 f'Cannot define rules for package "{package_name}" (at {definition_source.path}). It is an'
719 " auto-generated package."
720 )
721 parsed_value = delegate.parse_input(
722 value, definition_source, parser_context=parser_context
723 )
724 result[package_name_raw] = PackageContextData(
725 package_name, parsed_value
726 )
727 return result
730class DispatchingTableParser(
731 DispatchingParserBase[TP],
732 DeclarativeInputParser[TP],
733):
734 def __init__(self, base_type: TTP, manifest_attribute_path_template: str) -> None:
735 super().__init__(manifest_attribute_path_template)
736 self.base_type = base_type
738 def parse_input(
739 self,
740 orig_value: object,
741 attribute_path: "AttributePath",
742 *,
743 parser_context: "ParserContextData",
744 ) -> TP:
745 if isinstance(orig_value, str): 745 ↛ 746line 745 didn't jump to line 746 because the condition on line 745 was never true
746 key = orig_value
747 value = None
748 value_path = attribute_path
749 elif isinstance(orig_value, dict): 749 ↛ 760line 749 didn't jump to line 760 because the condition on line 749 was always true
750 if len(orig_value) != 1: 750 ↛ 751line 750 didn't jump to line 751 because the condition on line 750 was never true
751 valid_keys = ", ".join(sorted(self._parsers.keys()))
752 raise ManifestParseException(
753 f'The mapping "{attribute_path.path}" had two keys, but it should only have one top level key.'
754 " Maybe you are missing a list marker behind the second key or some indentation. The"
755 f" possible keys are: {valid_keys}"
756 )
757 key, value = next(iter(orig_value.items()))
758 value_path = attribute_path[key]
759 else:
760 raise ManifestParseException(
761 f"The attribute {attribute_path.path} must be a string or a mapping."
762 )
763 provided_parser = self._parsers.get(key)
764 if provided_parser is None: 764 ↛ 765line 764 didn't jump to line 765 because the condition on line 764 was never true
765 valid_keys = ", ".join(sorted(self._parsers.keys()))
766 raise ManifestParseException(
767 f'Unknown or unsupported action "{key}" at {value_path.path}.'
768 " Valid actions at this location are:"
769 f" {valid_keys}"
770 )
771 return provided_parser.parse(
772 key, value, value_path, parser_context=parser_context
773 )
776@dataclasses.dataclass(slots=True)
777class DeclarativeValuelessKeywordInputParser(DeclarativeInputParser[None]):
778 inline_reference_documentation: Optional[ParserDocumentation] = None
779 documentation_reference: Optional[str] = None
781 def parse_input(
782 self,
783 value: object,
784 path: "AttributePath",
785 *,
786 parser_context: Optional["ParserContextData"] = None,
787 ) -> TD:
788 if value is None:
789 return cast("TD", value)
790 if self.documentation_reference is not None:
791 doc_ref = f" (Documentation: {self.documentation_reference})"
792 else:
793 doc_ref = ""
794 raise ManifestParseException(
795 f"Expected attribute {path.path} to be a string.{doc_ref}"
796 )
799@dataclasses.dataclass(slots=True)
800class PluginProvidedManifestVariable:
801 plugin_metadata: DebputyPluginMetadata
802 variable_name: str
803 variable_value: Optional[Union[str, Callable[[VariableContext], str]]]
804 is_context_specific_variable: bool
805 variable_reference_documentation: Optional[str] = None
806 is_documentation_placeholder: bool = False
807 is_for_special_case: bool = False
809 @property
810 def is_internal(self) -> bool:
811 return self.variable_name.startswith("_") or ":_" in self.variable_name
813 @property
814 def is_token(self) -> bool:
815 return self.variable_name.startswith("token:")
817 def resolve(self, variable_context: VariableContext) -> str:
818 value_resolver = self.variable_value
819 if isinstance(value_resolver, str):
820 res = value_resolver
821 else:
822 res = value_resolver(variable_context)
823 return res
826@dataclasses.dataclass(slots=True, frozen=True)
827class AutomaticDiscardRuleExample:
828 content: Sequence[Tuple[PathDef, bool]]
829 description: Optional[str] = None
832def automatic_discard_rule_example(
833 *content: Union[str, PathDef, Tuple[Union[str, PathDef], bool]],
834 example_description: Optional[str] = None,
835) -> AutomaticDiscardRuleExample:
836 """Provide an example for an automatic discard rule
838 The return value of this method should be passed to the `examples` parameter of
839 `automatic_discard_rule` method - either directly for a single example or as a
840 part of a sequence of examples.
842 >>> # Possible example for an exclude rule for ".la" files
843 >>> # Example shows two files; The ".la" file that will be removed and another file that
844 >>> # will be kept.
845 >>> automatic_discard_rule_example( # doctest: +ELLIPSIS
846 ... "usr/lib/libfoo.la",
847 ... ("usr/lib/libfoo.so.1.0.0", False),
848 ... )
849 AutomaticDiscardRuleExample(...)
851 Keep in mind that you have to explicitly include directories that are relevant for the test
852 if you want them shown. Also, if a directory is excluded, all path beneath it will be
853 automatically excluded in the example as well. Your example data must account for that.
855 >>> # Possible example for python cache file discard rule
856 >>> # In this example, we explicitly list the __pycache__ directory itself because we
857 >>> # want it shown in the output (otherwise, we could have omitted it)
858 >>> automatic_discard_rule_example( # doctest: +ELLIPSIS
859 ... (".../foo.py", False),
860 ... ".../__pycache__/",
861 ... ".../__pycache__/...",
862 ... ".../foo.pyc",
863 ... ".../foo.pyo",
864 ... )
865 AutomaticDiscardRuleExample(...)
867 Note: Even if `__pycache__` had been implicit, the result would have been the same. However,
868 the rendered example would not have shown the directory on its own. The use of `...` as
869 path names is useful for denoting "anywhere" or "anything". Though, there is nothing "magic"
870 about this name - it happens to be allowed as a path name (unlike `.` or `..`).
872 These examples can be seen via `debputy plugin show automatic-discard-rules <name-here>`.
874 :param content: The content of the example. Each element can be either a path definition or
875 a tuple of a path definition followed by a verdict (boolean). Each provided path definition
876 describes the paths to be presented in the example. Implicit paths such as parent
877 directories will be created but not shown in the example. Therefore, if a directory is
878 relevant to the example, be sure to explicitly list it.
880 The verdict associated with a path determines whether the path should be discarded (when
881 True) or kept (when False). When a path is not explicitly associated with a verdict, the
882 verdict is assumed to be discarded (True).
883 :param example_description: An optional description displayed together with the example.
884 :return: An opaque data structure containing the example.
885 """
886 example = []
887 for d in content:
888 if not isinstance(d, tuple):
889 pd = d
890 verdict = True
891 else:
892 pd, verdict = d
894 path_def = as_path_def(pd)
895 example.append((path_def, verdict))
897 if not example: 897 ↛ 898line 897 didn't jump to line 898 because the condition on line 897 was never true
898 raise ValueError("At least one path must be given for an example")
900 return AutomaticDiscardRuleExample(
901 tuple(example),
902 description=example_description,
903 )
906@dataclasses.dataclass(slots=True, frozen=True)
907class PluginProvidedPackageProcessor:
908 processor_id: str
909 applies_to_package_types: FrozenSet[str]
910 package_processor: PackageProcessor
911 dependencies: FrozenSet[Tuple[str, str]]
912 plugin_metadata: DebputyPluginMetadata
914 def applies_to(self, binary_package: BinaryPackage) -> bool:
915 return binary_package.package_type in self.applies_to_package_types
917 @property
918 def dependency_id(self) -> Tuple[str, str]:
919 return self.plugin_metadata.plugin_name, self.processor_id
921 def run_package_processor(
922 self,
923 fs_root: "VirtualPath",
924 unused: None,
925 context: "PackageProcessingContext",
926 ) -> None:
927 self.package_processor(fs_root, unused, context)
930@dataclasses.dataclass(slots=True, frozen=True)
931class PluginProvidedDiscardRule:
932 name: str
933 plugin_metadata: DebputyPluginMetadata
934 discard_check: Callable[[VirtualPath], bool]
935 reference_documentation: Optional[str]
936 examples: Sequence[AutomaticDiscardRuleExample] = tuple()
938 def should_discard(self, path: VirtualPath) -> bool:
939 return self.discard_check(path)
942@dataclasses.dataclass(slots=True, frozen=True)
943class ServiceManagerDetails:
944 service_manager: str
945 service_detector: "ServiceDetector"
946 service_integrator: "ServiceIntegrator"
947 plugin_metadata: DebputyPluginMetadata
950ReferenceValue = TypedDict(
951 "ReferenceValue",
952 {
953 "description": str,
954 },
955)
958def _reference_data_value(
959 *,
960 description: str,
961) -> ReferenceValue:
962 return {
963 "description": description,
964 }
967KnownPackagingFileCategories = Literal[
968 "generated",
969 "generic-template",
970 "ppf-file",
971 "ppf-control-file",
972 "maint-config",
973 "pkg-metadata",
974 "pkg-helper-config",
975 "testing",
976 "lint-config",
977]
978KNOWN_PACKAGING_FILE_CATEGORY_DESCRIPTIONS: Mapping[
979 KnownPackagingFileCategories, ReferenceValue
980] = {
981 "generated": _reference_data_value(
982 description="The file is (likely) generated from another file"
983 ),
984 "generic-template": _reference_data_value(
985 description="The file is (likely) a generic template that generates a known packaging file. While the"
986 " file is annotated as if it was the target file, the file might uses a custom template"
987 " language inside it."
988 ),
989 "ppf-file": _reference_data_value(
990 description="Packager provided file to be installed on the file system - usually as-is."
991 " When `install-pattern` or `install-path` are provided, this is where the file is installed."
992 ),
993 "ppf-control-file": _reference_data_value(
994 description="Packager provided file that becomes a control file - possible after processing. "
995 " If `install-pattern` or `install-path` are provided, they denote where the is placed"
996 " (generally, this will be of the form `DEBIAN/<name>`)"
997 ),
998 "maint-config": _reference_data_value(
999 description="Maintenance configuration for a specific tool that the maintainer uses (tool / style preferences)"
1000 ),
1001 "pkg-metadata": _reference_data_value(
1002 description="The file is related to standard package metadata (usually documented in Debian Policy)"
1003 ),
1004 "pkg-helper-config": _reference_data_value(
1005 description="The file is packaging helper configuration or instruction file"
1006 ),
1007 "testing": _reference_data_value(
1008 description="The file is related to automated testing (autopkgtests, salsa/gitlab CI)."
1009 ),
1010 "lint-config": _reference_data_value(
1011 description="The file is related to a linter (such as overrides for false-positives or style preferences)"
1012 ),
1013}
1015KnownPackagingConfigFeature = Literal[
1016 "dh-filearray",
1017 "dh-filedoublearray",
1018 "dh-hash-subst",
1019 "dh-dollar-subst",
1020 "dh-glob",
1021 "dh-partial-glob",
1022 "dh-late-glob",
1023 "dh-glob-after-execute",
1024 "dh-executable-config",
1025 "dh-custom-format",
1026 "dh-file-list",
1027 "dh-install-list",
1028 "dh-install-list-dest-dir-like-dh_install",
1029 "dh-install-list-fixed-dest-dir",
1030 "dh-fixed-dest-dir",
1031 "dh-exec-rename",
1032 "dh-docs-only",
1033]
1035KNOWN_PACKAGING_FILE_CONFIG_FEATURE_DESCRIPTION: Mapping[
1036 KnownPackagingConfigFeature, ReferenceValue
1037] = {
1038 "dh-filearray": _reference_data_value(
1039 description="The file will be read as a list of space/newline separated tokens",
1040 ),
1041 "dh-filedoublearray": _reference_data_value(
1042 description="Each line in the file will be read as a list of space-separated tokens",
1043 ),
1044 "dh-hash-subst": _reference_data_value(
1045 description="Supports debhelper #PACKAGE# style substitutions (udebs often excluded)",
1046 ),
1047 "dh-dollar-subst": _reference_data_value(
1048 description="Supports debhelper ${PACKAGE} style substitutions (usually requires compat 13+)",
1049 ),
1050 "dh-glob": _reference_data_value(
1051 description="Supports standard debhelper globing",
1052 ),
1053 "dh-partial-glob": _reference_data_value(
1054 description="Supports standard debhelper globing but only to a subset of the values (implies dh-late-glob)",
1055 ),
1056 "dh-late-glob": _reference_data_value(
1057 description="Globbing is done separately instead of using the built-in function",
1058 ),
1059 "dh-glob-after-execute": _reference_data_value(
1060 description="When the dh config file is executable, the generated output will be subject to globbing",
1061 ),
1062 "dh-executable-config": _reference_data_value(
1063 description="If marked executable, debhelper will execute the file and read its output",
1064 ),
1065 "dh-custom-format": _reference_data_value(
1066 description="The dh tool will or may have a custom parser for this file",
1067 ),
1068 "dh-file-list": _reference_data_value(
1069 description="The dh file contains a list of paths to be processed",
1070 ),
1071 "dh-install-list": _reference_data_value(
1072 description="The dh file contains a list of paths/globs to be installed but the tool specific knowledge"
1073 " required to understand the file cannot be conveyed via this interface.",
1074 ),
1075 "dh-install-list-dest-dir-like-dh_install": _reference_data_value(
1076 description="The dh file is processed similar to dh_install (notably dest-dir handling derived"
1077 " from the path or the last token on the line)",
1078 ),
1079 "dh-install-list-fixed-dest-dir": _reference_data_value(
1080 description="The dh file is an install list and the dest-dir is always the same for all patterns"
1081 " (when `install-pattern` or `install-path` are provided, they identify the directory - not the file location)",
1082 ),
1083 "dh-exec-rename": _reference_data_value(
1084 description="When `dh-exec` is the interpreter of this dh config file, its renaming (=>) feature can be"
1085 " requested/used",
1086 ),
1087 "dh-docs-only": _reference_data_value(
1088 description="The dh config file is used for documentation only. Implicit <!nodocs> Build-Profiles support",
1089 ),
1090}
1092CONFIG_FEATURE_ALIASES: Dict[
1093 KnownPackagingConfigFeature, List[Tuple[KnownPackagingConfigFeature, int]]
1094] = {
1095 "dh-filearray": [
1096 ("dh-filearray", 0),
1097 ("dh-executable-config", 9),
1098 ("dh-dollar-subst", 13),
1099 ],
1100 "dh-filedoublearray": [
1101 ("dh-filedoublearray", 0),
1102 ("dh-executable-config", 9),
1103 ("dh-dollar-subst", 13),
1104 ],
1105}
1108def _implies(
1109 features: List[KnownPackagingConfigFeature],
1110 seen: Set[KnownPackagingConfigFeature],
1111 implying: Sequence[KnownPackagingConfigFeature],
1112 implied: KnownPackagingConfigFeature,
1113) -> None:
1114 if implied in seen:
1115 return
1116 if all(f in seen for f in implying):
1117 seen.add(implied)
1118 features.append(implied)
1121def expand_known_packaging_config_features(
1122 compat_level: int,
1123 features: List[KnownPackagingConfigFeature],
1124) -> List[KnownPackagingConfigFeature]:
1125 final_features: List[KnownPackagingConfigFeature] = []
1126 seen = set()
1127 for feature in features:
1128 expanded = CONFIG_FEATURE_ALIASES.get(feature)
1129 if not expanded:
1130 expanded = [(feature, 0)]
1131 for v, c in expanded:
1132 if compat_level < c or v in seen:
1133 continue
1134 seen.add(v)
1135 final_features.append(v)
1136 if "dh-glob" in seen and "dh-late-glob" in seen:
1137 final_features.remove("dh-glob")
1139 _implies(final_features, seen, ["dh-partial-glob"], "dh-late-glob")
1140 _implies(
1141 final_features,
1142 seen,
1143 ["dh-late-glob", "dh-executable-config"],
1144 "dh-glob-after-execute",
1145 )
1146 return sorted(final_features)
1149class InstallPatternDHCompatRule(DebputyParsedContent):
1150 install_pattern: NotRequired[str]
1151 add_config_features: NotRequired[List[KnownPackagingConfigFeature]]
1152 starting_with_compat_level: NotRequired[int]
1155class KnownPackagingFileInfo(DebputyParsedContent):
1156 # Exposed directly in the JSON plugin parsing; be careful with changes
1157 path: NotRequired[str]
1158 pkgfile: NotRequired[str]
1159 detection_method: NotRequired[Literal["path", "dh.pkgfile"]]
1160 file_categories: NotRequired[List[KnownPackagingFileCategories]]
1161 documentation_uris: NotRequired[List[str]]
1162 debputy_cmd_templates: NotRequired[List[List[str]]]
1163 debhelper_commands: NotRequired[List[str]]
1164 config_features: NotRequired[List[KnownPackagingConfigFeature]]
1165 install_pattern: NotRequired[str]
1166 dh_compat_rules: NotRequired[List[InstallPatternDHCompatRule]]
1167 default_priority: NotRequired[int]
1168 post_formatting_rewrite: NotRequired[Literal["period-to-underscore"]]
1169 packageless_is_fallback_for_all_packages: NotRequired[bool]
1170 has_active_command: NotRequired[bool]
1173@dataclasses.dataclass(slots=True)
1174class PluginProvidedKnownPackagingFile:
1175 info: KnownPackagingFileInfo
1176 detection_method: Literal["path", "dh.pkgfile"]
1177 detection_value: str
1178 plugin_metadata: DebputyPluginMetadata
1181class BuildSystemAutoDetector(Protocol):
1183 def __call__(self, source_root: VirtualPath, *args: Any, **kwargs: Any) -> bool: ... 1183 ↛ exitline 1183 didn't jump to line 1183 because
1186@dataclasses.dataclass(slots=True, frozen=True)
1187class PluginProvidedTypeMapping:
1188 mapped_type: TypeMapping[Any, Any]
1189 reference_documentation: Optional[TypeMappingDocumentation]
1190 plugin_metadata: DebputyPluginMetadata
1193@dataclasses.dataclass(slots=True, frozen=True)
1194class PluginProvidedBuildSystemAutoDetection(Generic[BSR]):
1195 manifest_keyword: str
1196 build_system_rule_type: Type[BSR]
1197 detector: BuildSystemAutoDetector
1198 constructor: Callable[
1199 ["BuildRuleParsedFormat", AttributePath, "HighLevelManifest"],
1200 BSR,
1201 ]
1202 auto_detection_shadow_build_systems: FrozenSet[str]
1203 plugin_metadata: DebputyPluginMetadata
1206class PackageDataTable:
1207 def __init__(self, package_data_table: Mapping[str, "BinaryPackageData"]) -> None:
1208 self._package_data_table = package_data_table
1209 # This is enabled for metadata-detectors. But it is deliberate not enabled for package processors,
1210 # because it is not clear how it should interact with dependencies. For metadata-detectors, things
1211 # read-only and there are no dependencies, so we cannot "get them wrong".
1212 self.enable_cross_package_checks = False
1214 def __iter__(self) -> Iterator["BinaryPackageData"]:
1215 return iter(self._package_data_table.values())
1217 def __getitem__(self, item: str) -> "BinaryPackageData":
1218 return self._package_data_table[item]
1220 def __contains__(self, item: str) -> bool:
1221 return item in self._package_data_table
1224class PackageProcessingContextProvider(PackageProcessingContext):
1225 __slots__ = (
1226 "_manifest",
1227 "_binary_package",
1228 "_related_udeb_package",
1229 "_package_data_table",
1230 "_cross_check_cache",
1231 )
1233 def __init__(
1234 self,
1235 manifest: "HighLevelManifest",
1236 binary_package: BinaryPackage,
1237 related_udeb_package: Optional[BinaryPackage],
1238 package_data_table: PackageDataTable,
1239 ) -> None:
1240 self._manifest = manifest
1241 self._binary_package = binary_package
1242 self._related_udeb_package = related_udeb_package
1243 self._package_data_table = ref(package_data_table)
1244 self._cross_check_cache: Optional[
1245 Sequence[Tuple[BinaryPackage, "VirtualPath"]]
1246 ] = None
1248 def _package_state_for(
1249 self,
1250 package: BinaryPackage,
1251 ) -> "PackageTransformationDefinition":
1252 return self._manifest.package_state_for(package.name)
1254 def _package_version_for(
1255 self,
1256 package: BinaryPackage,
1257 ) -> str:
1258 package_state = self._package_state_for(package)
1259 version = package_state.binary_version
1260 if version is not None:
1261 return version
1262 return self._manifest.source_version(
1263 include_binnmu_version=not package.is_arch_all
1264 )
1266 @property
1267 def binary_package(self) -> BinaryPackage:
1268 return self._binary_package
1270 @property
1271 def related_udeb_package(self) -> Optional[BinaryPackage]:
1272 return self._related_udeb_package
1274 @property
1275 def binary_package_version(self) -> str:
1276 return self._package_version_for(self._binary_package)
1278 @property
1279 def related_udeb_package_version(self) -> Optional[str]:
1280 udeb = self._related_udeb_package
1281 if udeb is None:
1282 return None
1283 return self._package_version_for(udeb)
1285 def accessible_package_roots(self) -> Iterable[Tuple[BinaryPackage, "VirtualPath"]]:
1286 package_table = self._package_data_table()
1287 if package_table is None:
1288 raise ReferenceError(
1289 "Internal error: package_table was garbage collected too early"
1290 )
1291 if not package_table.enable_cross_package_checks:
1292 raise PluginAPIViolationError(
1293 "Cross package content checks are not available at this time."
1294 )
1295 cache = self._cross_check_cache
1296 if cache is None:
1297 matches = []
1298 pkg = self.binary_package
1299 for pkg_data in package_table:
1300 if pkg_data.binary_package.name == pkg.name:
1301 continue
1302 res = package_cross_check_precheck(pkg, pkg_data.binary_package)
1303 if not res[0]:
1304 continue
1305 matches.append((pkg_data.binary_package, pkg_data.fs_root))
1306 cache = tuple(matches) if matches else tuple()
1307 self._cross_check_cache = cache
1308 return cache
1311@dataclasses.dataclass(slots=True, frozen=True)
1312class PluginProvidedTrigger:
1313 dpkg_trigger_type: DpkgTriggerType
1314 dpkg_trigger_target: str
1315 provider: DebputyPluginMetadata
1316 provider_source_id: str
1318 def serialized_format(self) -> str:
1319 return f"{self.dpkg_trigger_type} {self.dpkg_trigger_target}"