Coverage for src/debputy/plugin/api/test_api/test_impl.py: 81%
307 statements
« prev ^ index » next coverage.py v7.8.2, created at 2026-04-06 19:25 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2026-04-06 19:25 +0000
1import contextlib
2import dataclasses
3import inspect
4import os.path
5from collections.abc import Mapping, Sequence, Iterator, KeysView, Callable
6from importlib.resources.abc import Traversable
7from io import BytesIO
8from pathlib import Path
9from typing import (
10 cast,
11 TYPE_CHECKING,
12)
14from debian.deb822 import Deb822
15from debian.debian_support import DpkgArchTable
16from debian.substvars import Substvars
18from debputy.architecture_support import DpkgArchitectureBuildProcessValuesTable
19from debputy.filesystem_scan import OSFSROOverlay, InMemoryVirtualRootDir
20from debputy.packages import BinaryPackage, SourcePackage
21from debputy.plugin.api import (
22 PluginInitializationEntryPoint,
23 VirtualPath,
24 PackageProcessingContext,
25 DpkgTriggerType,
26 Maintscript,
27)
28from debputy.plugin.api.example_processing import process_discard_rule_example
29from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
30from debputy.plugin.api.impl import (
31 plugin_metadata_for_debputys_own_plugin,
32 DebputyPluginInitializerProvider,
33 parse_json_plugin_desc,
34 MaintscriptAccessorProviderBase,
35 BinaryCtrlAccessorProviderBase,
36 PLUGIN_TEST_SUFFIX,
37 find_json_plugin,
38 ServiceDefinitionImpl,
39)
40from debputy.plugin.api.impl_types import (
41 PackagerProvidedFileClassSpec,
42 DebputyPluginMetadata,
43 PluginProvidedTrigger,
44 ServiceManagerDetails,
45)
46from debputy.plugin.api.spec import (
47 MaintscriptAccessor,
48 FlushableSubstvars,
49 ServiceRegistry,
50 DSD,
51 ServiceUpgradeRule,
52)
53from debputy.plugin.api.test_api.test_spec import (
54 InitializedPluginUnderTest,
55 RegisteredPackagerProvidedFile,
56 RegisteredTrigger,
57 RegisteredMaintscript,
58 DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS,
59 ADRExampleIssue,
60 DetectedService,
61 RegisteredMetadata,
62)
63from debputy.plugins.debputy.debputy_plugin import initialize_debputy_features
64from debputy.substitution import SubstitutionImpl, VariableContext, Substitution
65from debputy.util import package_cross_check_precheck
66from debputy.version import DEBPUTY_PLUGIN_ROOT_DIR
68RegisteredPackagerProvidedFile.register(PackagerProvidedFileClassSpec)
71type ManifestConfigurationImplementation[T] = Callable[
72 [SourcePackage | BinaryPackage, type[T]], T
73]
76@dataclasses.dataclass(frozen=True, slots=True)
77class PackageProcessingContextTestProvider(PackageProcessingContext):
78 source_package: SourcePackage
79 binary_package: BinaryPackage
80 binary_package_version: str
81 related_udeb_package: BinaryPackage | None
82 related_udeb_package_version: str | None
83 accessible_package_roots: Callable[[], Sequence[tuple[BinaryPackage, VirtualPath]]]
84 manifest_configuration: ManifestConfigurationImplementation
86 # TODO: implement (when needed)
87 # dpkg_arch_query_table
88 # deb_options_and_profiles (pull from binary ?)
89 # source_condition_context
90 # condition_context
93def _initialize_plugin_under_test(
94 plugin_metadata: DebputyPluginMetadata,
95 load_debputy_plugin: bool = True,
96) -> "InitializedPluginUnderTest":
97 feature_set = PluginProvidedFeatureSet()
98 substitution = SubstitutionImpl(
99 unresolvable_substitutions=frozenset(["SOURCE_DATE_EPOCH", "PACKAGE"]),
100 variable_context=VariableContext(
101 OSFSROOverlay.create_root_dir("debian", "debian"),
102 ),
103 plugin_feature_set=feature_set,
104 )
106 if load_debputy_plugin:
107 debputy_plugin_metadata = plugin_metadata_for_debputys_own_plugin(
108 initialize_debputy_features
109 )
110 # Load debputy's own plugin first, so conflicts with debputy's plugin are detected early
111 debputy_provider = DebputyPluginInitializerProvider(
112 debputy_plugin_metadata,
113 feature_set,
114 substitution,
115 )
116 debputy_provider.load_plugin()
118 plugin_under_test_provider = DebputyPluginInitializerProvider(
119 plugin_metadata,
120 feature_set,
121 substitution,
122 )
123 plugin_under_test_provider.load_plugin()
125 return InitializedPluginUnderTestImpl(
126 plugin_metadata.plugin_name,
127 feature_set,
128 substitution,
129 )
132def _auto_load_plugin_from_filename(
133 py_test_filename: str,
134) -> "InitializedPluginUnderTest":
135 dirname, basename = os.path.split(py_test_filename)
136 plugin_name = PLUGIN_TEST_SUFFIX.sub("", basename).replace("_", "-")
138 test_location = os.environ.get("DEBPUTY_TEST_PLUGIN_LOCATION", "uninstalled")
139 if test_location == "uninstalled":
140 json_basename = f"{plugin_name}.json"
141 json_desc_file = os.path.join(dirname, json_basename)
142 if "/" not in json_desc_file: 142 ↛ 143line 142 didn't jump to line 143 because the condition on line 142 was never true
143 json_desc_file = f"./{json_desc_file}"
145 if os.path.isfile(json_desc_file): 145 ↛ 148line 145 didn't jump to line 148 because the condition on line 145 was always true
146 return _initialize_plugin_from_desc(json_desc_file)
148 json_desc_file_in = f"{json_desc_file}.in"
149 if os.path.isfile(json_desc_file_in):
150 return _initialize_plugin_from_desc(json_desc_file)
151 raise FileNotFoundError(
152 f"Cannot determine the plugin JSON metadata descriptor: Expected it to be"
153 f" {json_desc_file} or {json_desc_file_in}"
154 )
156 if test_location == "installed": 156 ↛ 160line 156 didn't jump to line 160 because the condition on line 156 was always true
157 plugin_metadata = find_json_plugin([str(DEBPUTY_PLUGIN_ROOT_DIR)], plugin_name)
158 return _initialize_plugin_under_test(plugin_metadata, load_debputy_plugin=True)
160 raise ValueError(
161 'Invalid or unsupported "DEBPUTY_TEST_PLUGIN_LOCATION" environment variable. It must be either'
162 ' unset OR one of "installed", "uninstalled".'
163 )
166def initialize_plugin_under_test(
167 *,
168 plugin_desc_file: str | None = None,
169) -> "InitializedPluginUnderTest":
170 """Load and initialize a plugin for testing it
172 This method will load the plugin via plugin description, which is the method that `debputy` does at
173 run-time (in contrast to `initialize_plugin_under_test_preloaded`, which bypasses this concrete part
174 of the flow).
176 :param plugin_desc_file: The plugin description file (`.json`) that describes how to load the plugin.
177 If omitted, `debputy` will attempt to attempt the plugin description file based on the test itself.
178 This works for "single-file" plugins, where the description file and the test are right next to
179 each other.
180 :return: The loaded plugin for testing
181 """
182 if plugin_desc_file is None:
183 caller_file = inspect.stack()[1].filename
184 return _auto_load_plugin_from_filename(caller_file)
185 if DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS: 185 ↛ 186line 185 didn't jump to line 186 because the condition on line 185 was never true
186 raise RuntimeError(
187 "Running the test against an installed plugin does not work when"
188 " plugin_desc_file is provided. Please skip this test. You can "
189 " import DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS and use that as"
190 " conditional for this purpose."
191 )
192 return _initialize_plugin_from_desc(plugin_desc_file)
195def _initialize_plugin_from_desc(
196 desc_file: str,
197) -> "InitializedPluginUnderTest":
198 if not desc_file.endswith((".json", ".json.in")): 198 ↛ 199line 198 didn't jump to line 199 because the condition on line 198 was never true
199 raise ValueError("The plugin file must end with .json or .json.in")
201 plugin_metadata = parse_json_plugin_desc(desc_file)
203 return _initialize_plugin_under_test(plugin_metadata, load_debputy_plugin=True)
206def initialize_plugin_under_test_from_inline_json(
207 plugin_name: str,
208 json_content: str,
209) -> "InitializedPluginUnderTest":
210 with BytesIO(json_content.encode("utf-8")) as fd:
211 plugin_metadata = parse_json_plugin_desc(plugin_name, fd=fd)
213 return _initialize_plugin_under_test(plugin_metadata, load_debputy_plugin=True)
216def initialize_plugin_under_test_preloaded(
217 api_compat_version: int,
218 plugin_initializer: PluginInitializationEntryPoint,
219 /,
220 plugin_name: str = "plugin-under-test",
221 load_debputy_plugin: bool = True,
222 plugin_doc_path_resolver: Callable[[], Traversable | Path | None] = lambda: None,
223) -> "InitializedPluginUnderTest":
224 """Internal API: Initialize a plugin for testing without loading it from a file
226 This method by-passes the standard loading mechanism, meaning you will not test that your plugin
227 description file is correct. Notably, any feature provided via the JSON description file will
228 **NOT** be visible for the test.
230 This API is mostly useful for testing parts of debputy itself.
232 :param api_compat_version: The API version the plugin was written for. Use the same version as the
233 version from the entry point (The `v1` part of `debputy.plugins.v1.initialize` translate into `1`).
234 :param plugin_initializer: The entry point of the plugin
235 :param plugin_name: Normally, debputy would derive this from the entry point. In the test, it will
236 use a test name and version. However, you can explicitly set if you want the real name/version.
237 :param load_debputy_plugin: Whether to load debputy's own plugin first. Doing so provides a more
238 realistic test and enables the test to detect conflicts with debputy's own plugins (de facto making
239 the plugin unloadable in practice if such a conflict is present). This option is mostly provided
240 to enable debputy to use this method for self testing.
241 :param plugin_doc_path_resolver: How to resolve the documentation (if relevant for the test). The
242 default is to not load the documentation.
243 :return: The loaded plugin for testing
244 """
246 if DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS: 246 ↛ 247line 246 didn't jump to line 247 because the condition on line 246 was never true
247 raise RuntimeError(
248 "Running the test against an installed plugin does not work when"
249 " the plugin is preload. Please skip this test. You can "
250 " import DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS and use that as"
251 " conditional for this purpose."
252 )
254 plugin_metadata = DebputyPluginMetadata(
255 plugin_name=plugin_name,
256 api_compat_version=api_compat_version,
257 plugin_initializer=plugin_initializer,
258 plugin_loader=None,
259 plugin_path="<loaded-via-test>",
260 plugin_doc_path_resolver=plugin_doc_path_resolver,
261 )
263 return _initialize_plugin_under_test(
264 plugin_metadata,
265 load_debputy_plugin=load_debputy_plugin,
266 )
269class _MockArchTable:
270 @staticmethod
271 def matches_architecture(_a: str, _b: str) -> bool:
272 return True
275FAKE_DPKG_QUERY_TABLE = cast(DpkgArchTable, _MockArchTable())
276del _MockArchTable
279def package_metadata_context(
280 *,
281 host_arch: str = "amd64",
282 package_fields: dict[str, str] | None = None,
283 related_udeb_package_fields: dict[str, str] | None = None,
284 binary_package_version: str = "1.0-1",
285 related_udeb_package_version: str | None = None,
286 should_be_acted_on: bool = True,
287 related_udeb_fs_root: VirtualPath | None = None,
288 accessible_package_roots: Sequence[tuple[Mapping[str, str], VirtualPath]] = tuple(),
289 source_package_fields: dict[str, str] | None = None,
290 manifest_configuration: ManifestConfigurationImplementation = lambda x, y: None,
291) -> PackageProcessingContext:
292 process_table = DpkgArchitectureBuildProcessValuesTable(fake_host=host_arch)
293 f = {
294 "Package": "foo",
295 "Architecture": "any",
296 }
297 if package_fields is not None:
298 f.update(package_fields)
300 bin_package = BinaryPackage(
301 Deb822(f),
302 process_table,
303 FAKE_DPKG_QUERY_TABLE,
304 is_main_package=True,
305 should_be_acted_on=should_be_acted_on,
306 )
307 udeb_package = None
308 s = {
309 "Source": bin_package.name,
310 }
311 if source_package_fields is not None: 311 ↛ 312line 311 didn't jump to line 312 because the condition on line 311 was never true
312 s.update(source_package_fields)
313 source_package = SourcePackage(Deb822(s))
314 if related_udeb_package_fields is not None: 314 ↛ 315line 314 didn't jump to line 315 because the condition on line 314 was never true
315 uf = dict(related_udeb_package_fields)
316 uf.setdefault("Package", f'{f["Package"]}-udeb')
317 uf.setdefault("Architecture", f["Architecture"])
318 uf.setdefault("Package-Type", "udeb")
319 udeb_package = BinaryPackage(
320 Deb822(uf),
321 process_table,
322 FAKE_DPKG_QUERY_TABLE,
323 is_main_package=False,
324 should_be_acted_on=True,
325 )
326 if related_udeb_package_version is None:
327 related_udeb_package_version = binary_package_version
328 if accessible_package_roots:
329 apr = []
330 for fields, apr_fs_root in accessible_package_roots:
331 apr_fields = Deb822(dict(fields))
332 if "Package" not in apr_fields: 332 ↛ 333line 332 didn't jump to line 333 because the condition on line 332 was never true
333 raise ValueError(
334 "Missing mandatory Package field in member of accessible_package_roots"
335 )
336 if "Architecture" not in apr_fields: 336 ↛ 337line 336 didn't jump to line 337 because the condition on line 336 was never true
337 raise ValueError(
338 "Missing mandatory Architecture field in member of accessible_package_roots"
339 )
340 apr_package = BinaryPackage(
341 apr_fields,
342 process_table,
343 FAKE_DPKG_QUERY_TABLE,
344 is_main_package=False,
345 should_be_acted_on=True,
346 )
347 r = package_cross_check_precheck(bin_package, apr_package)
348 if not r[0]: 348 ↛ 349line 348 didn't jump to line 349 because the condition on line 348 was never true
349 raise ValueError(
350 f"{apr_package.name} would not be accessible for {bin_package.name}"
351 )
352 apr.append((apr_package, apr_fs_root))
354 if related_udeb_fs_root is not None: 354 ↛ 355line 354 didn't jump to line 355 because the condition on line 354 was never true
355 if udeb_package is None:
356 raise ValueError(
357 "related_udeb_package_fields must be given when related_udeb_fs_root is given"
358 )
359 r = package_cross_check_precheck(bin_package, udeb_package)
360 if not r[0]:
361 raise ValueError(
362 f"{udeb_package.name} would not be accessible for {bin_package.name}, so providing"
363 " related_udeb_fs_root is irrelevant"
364 )
365 apr.append((udeb_package, related_udeb_fs_root))
366 final_apr = tuple(apr)
367 else:
368 final_apr = tuple()
370 return PackageProcessingContextTestProvider(
371 source_package=source_package,
372 binary_package=bin_package,
373 related_udeb_package=udeb_package,
374 binary_package_version=binary_package_version,
375 related_udeb_package_version=related_udeb_package_version,
376 accessible_package_roots=lambda: final_apr,
377 manifest_configuration=manifest_configuration,
378 )
381def manifest_variable_resolution_context(
382 *,
383 debian_dir: VirtualPath | None = None,
384) -> VariableContext:
385 if debian_dir is None:
386 debian_dir = InMemoryVirtualRootDir()
388 return VariableContext(debian_dir)
391class MaintscriptAccessorTestProvider(MaintscriptAccessorProviderBase):
392 __slots__ = ("_plugin_metadata", "_plugin_source_id", "_maintscript_container")
394 def __init__(
395 self,
396 plugin_metadata: DebputyPluginMetadata,
397 plugin_source_id: str,
398 maintscript_container: dict[str, list[RegisteredMaintscript]],
399 ):
400 self._plugin_metadata = plugin_metadata
401 self._plugin_source_id = plugin_source_id
402 self._maintscript_container = maintscript_container
404 @classmethod
405 def _apply_condition_to_script(
406 cls, condition: str, run_snippet: str, /, indent: bool | None = None
407 ) -> str:
408 return run_snippet
410 def _append_script(
411 self,
412 caller_name: str,
413 maintscript: Maintscript,
414 full_script: str,
415 /,
416 perform_substitution: bool = True,
417 ) -> None:
418 if self._plugin_source_id not in self._maintscript_container:
419 self._maintscript_container[self._plugin_source_id] = []
420 self._maintscript_container[self._plugin_source_id].append(
421 RegisteredMaintscript(
422 maintscript,
423 caller_name,
424 full_script,
425 perform_substitution,
426 )
427 )
430class RegisteredMetadataImpl(RegisteredMetadata):
431 __slots__ = (
432 "_substvars",
433 "_triggers",
434 "_maintscripts",
435 )
437 def __init__(
438 self,
439 substvars: Substvars,
440 triggers: list[RegisteredTrigger],
441 maintscripts: list[RegisteredMaintscript],
442 ) -> None:
443 self._substvars = substvars
444 self._triggers = triggers
445 self._maintscripts = maintscripts
447 @property
448 def substvars(self) -> Substvars:
449 return self._substvars
451 @property
452 def triggers(self) -> list[RegisteredTrigger]:
453 return self._triggers
455 def maintscripts(
456 self,
457 *,
458 maintscript: Maintscript | None = None,
459 ) -> list[RegisteredMaintscript]:
460 if maintscript is None:
461 return self._maintscripts
462 return [m for m in self._maintscripts if m.maintscript == maintscript]
465class BinaryCtrlAccessorTestProvider(BinaryCtrlAccessorProviderBase):
466 __slots__ = ("_maintscript_container",)
468 def __init__(
469 self,
470 plugin_metadata: DebputyPluginMetadata,
471 plugin_source_id: str,
472 context: PackageProcessingContext,
473 ) -> None:
474 super().__init__(
475 plugin_metadata,
476 plugin_source_id,
477 context,
478 {},
479 FlushableSubstvars(),
480 (None, None),
481 )
482 self._maintscript_container: dict[str, list[RegisteredMaintscript]] = {}
484 def _create_maintscript_accessor(self) -> MaintscriptAccessor:
485 return MaintscriptAccessorTestProvider(
486 self._plugin_metadata,
487 self._plugin_source_id,
488 self._maintscript_container,
489 )
491 def registered_metadata(self) -> RegisteredMetadata:
492 return RegisteredMetadataImpl(
493 self._substvars,
494 [
495 RegisteredTrigger.from_plugin_provided_trigger(t)
496 for t in self._triggers.values()
497 if t.provider_source_id == self._plugin_source_id
498 ],
499 self._maintscript_container.get(self._plugin_source_id, []),
500 )
503class ServiceRegistryTestImpl(ServiceRegistry[DSD]):
504 __slots__ = ("_service_manager_details", "_service_definitions")
506 def __init__(
507 self,
508 service_manager_details: ServiceManagerDetails,
509 detected_services: list[DetectedService[DSD]],
510 ) -> None:
511 self._service_manager_details = service_manager_details
512 self._service_definitions = detected_services
514 def register_service(
515 self,
516 path: VirtualPath,
517 name: str | list[str],
518 *,
519 type_of_service: str = "service", # "timer", etc.
520 service_scope: str = "system",
521 enable_by_default: bool = True,
522 start_by_default: bool = True,
523 default_upgrade_rule: ServiceUpgradeRule = "restart",
524 service_context: DSD | None = None,
525 ) -> None:
526 names = name if isinstance(name, list) else [name]
527 if len(names) < 1: 527 ↛ 528line 527 didn't jump to line 528 because the condition on line 527 was never true
528 raise ValueError(
529 f"The service must have at least one name - {path.absolute} did not have any"
530 )
531 self._service_definitions.append(
532 DetectedService(
533 path,
534 names,
535 type_of_service,
536 service_scope,
537 enable_by_default,
538 start_by_default,
539 default_upgrade_rule,
540 service_context,
541 )
542 )
545@contextlib.contextmanager
546def _read_only_fs_root(fs_root: VirtualPath) -> Iterator[VirtualPath]:
547 if fs_root.is_read_write: 547 ↛ 553line 547 didn't jump to line 553 because the condition on line 547 was always true
548 assert isinstance(fs_root, InMemoryVirtualRootDir)
549 fs_root.is_read_write = False
550 yield fs_root
551 fs_root.is_read_write = True
552 else:
553 yield fs_root
556class InitializedPluginUnderTestImpl(InitializedPluginUnderTest):
557 def __init__(
558 self,
559 plugin_name: str,
560 feature_set: PluginProvidedFeatureSet,
561 substitution: SubstitutionImpl,
562 ) -> None:
563 self._feature_set = feature_set
564 self._plugin_name = plugin_name
565 self._packager_provided_files: None | (
566 dict[str, RegisteredPackagerProvidedFile]
567 ) = None
568 self._triggers: dict[tuple[DpkgTriggerType, str], PluginProvidedTrigger] = {}
569 self._maintscript_container: dict[str, list[RegisteredMaintscript]] = {}
570 self._substitution = substitution
571 assert plugin_name in self._feature_set.plugin_data
573 @property
574 def _plugin_metadata(self) -> DebputyPluginMetadata:
575 return self._feature_set.plugin_data[self._plugin_name]
577 def packager_provided_files_by_stem(
578 self,
579 ) -> Mapping[str, RegisteredPackagerProvidedFile]:
580 ppf = self._packager_provided_files
581 if ppf is None:
582 result: dict[str, RegisteredPackagerProvidedFile] = {}
583 for spec in self._feature_set.packager_provided_files.values():
584 if spec.debputy_plugin_metadata.plugin_name != self._plugin_name:
585 continue
586 # Registered as a virtual subclass, so this should always be True
587 assert isinstance(spec, RegisteredPackagerProvidedFile)
588 result[spec.stem] = spec
589 self._packager_provided_files = result
590 ppf = result
591 return ppf
593 def run_metadata_detector(
594 self,
595 metadata_detector_id: str,
596 fs_root: VirtualPath,
597 context: PackageProcessingContext | None = None,
598 ) -> RegisteredMetadata:
599 if not fs_root.is_root_dir(): 599 ↛ 600line 599 didn't jump to line 600 because the condition on line 599 was never true
600 raise ValueError("Provided path must be the file system root.")
601 detectors = self._feature_set.metadata_maintscript_detectors[self._plugin_name]
602 matching_detectors = [
603 d for d in detectors if d.detector_id == metadata_detector_id
604 ]
605 if len(matching_detectors) != 1: 605 ↛ 606line 605 didn't jump to line 606 because the condition on line 605 was never true
606 assert not matching_detectors
607 raise ValueError(
608 f"The plugin {self._plugin_name} did not provide a metadata detector with ID"
609 f' "{metadata_detector_id}"'
610 )
611 if context is None:
612 context = package_metadata_context()
613 detector = matching_detectors[0]
614 if not detector.applies_to(context.binary_package):
615 raise ValueError(
616 f'The detector "{metadata_detector_id}" from {self._plugin_name} does not apply to the'
617 " given package. Consider using `package_metadata_context()` to emulate a binary package"
618 " with the correct specification. As an example: "
619 '`package_metadata_context(package_fields={"Package-Type": "udeb"})` would emulate a udeb'
620 " package."
621 )
623 ctrl = BinaryCtrlAccessorTestProvider(
624 self._plugin_metadata,
625 metadata_detector_id,
626 context,
627 )
628 with _read_only_fs_root(fs_root) as ro_root:
629 detector.run_detector(
630 ro_root,
631 ctrl,
632 context,
633 )
634 return ctrl.registered_metadata()
636 def run_package_processor(
637 self,
638 package_processor_id: str,
639 fs_root: VirtualPath,
640 context: PackageProcessingContext | None = None,
641 ) -> None:
642 if not fs_root.is_root_dir(): 642 ↛ 643line 642 didn't jump to line 643 because the condition on line 642 was never true
643 raise ValueError("Provided path must be the file system root.")
644 pp_key = (self._plugin_name, package_processor_id)
645 package_processor = self._feature_set.all_package_processors.get(pp_key)
646 if package_processor is None: 646 ↛ 647line 646 didn't jump to line 647 because the condition on line 646 was never true
647 raise ValueError(
648 f"The plugin {self._plugin_name} did not provide a package processor with ID"
649 f' "{package_processor_id}"'
650 )
651 if context is None: 651 ↛ 653line 651 didn't jump to line 653 because the condition on line 651 was always true
652 context = package_metadata_context()
653 if not fs_root.is_read_write: 653 ↛ 654line 653 didn't jump to line 654 because the condition on line 653 was never true
654 raise ValueError(
655 "The provided fs_root is read-only and it must be read-write for package processor"
656 )
657 if not package_processor.applies_to(context.binary_package): 657 ↛ 658line 657 didn't jump to line 658 because the condition on line 657 was never true
658 raise ValueError(
659 f'The package processor "{package_processor_id}" from {self._plugin_name} does not apply'
660 " to the given package. Consider using `package_metadata_context()` to emulate a binary"
661 " package with the correct specification. As an example: "
662 '`package_metadata_context(package_fields={"Package-Type": "udeb"})` would emulate a udeb'
663 " package."
664 )
665 package_processor.run_package_processor(
666 fs_root,
667 None,
668 context,
669 )
671 @property
672 def declared_manifest_variables(self) -> frozenset[str]:
673 return frozenset(
674 {
675 k
676 for k, v in self._feature_set.manifest_variables.items()
677 if v.plugin_metadata.plugin_name == self._plugin_name
678 }
679 )
681 def automatic_discard_rules_examples_with_issues(self) -> Sequence[ADRExampleIssue]:
682 issues = []
683 for adr in self._feature_set.auto_discard_rules.values():
684 if adr.plugin_metadata.plugin_name != self._plugin_name: 684 ↛ 685line 684 didn't jump to line 685 because the condition on line 684 was never true
685 continue
686 for idx, example in enumerate(adr.examples):
687 result = process_discard_rule_example(
688 adr,
689 example,
690 )
691 if result.inconsistent_paths:
692 issues.append(
693 ADRExampleIssue(
694 adr.name,
695 idx,
696 [
697 x.absolute + ("/" if x.is_dir else "")
698 for x in result.inconsistent_paths
699 ],
700 )
701 )
702 return issues
704 def run_service_detection_and_integrations(
705 self,
706 service_manager: str,
707 fs_root: VirtualPath,
708 context: PackageProcessingContext | None = None,
709 *,
710 service_context_type_hint: type[DSD] | None = None,
711 ) -> tuple[list[DetectedService[DSD]], RegisteredMetadata]:
712 if not fs_root.is_root_dir(): 712 ↛ 713line 712 didn't jump to line 713 because the condition on line 712 was never true
713 raise ValueError("Provided path must be the file system root.")
714 try:
715 service_manager_details = self._feature_set.service_managers[
716 service_manager
717 ]
718 if service_manager_details.plugin_metadata.plugin_name != self._plugin_name: 718 ↛ 719line 718 didn't jump to line 719 because the condition on line 718 was never true
719 raise KeyError(service_manager)
720 except KeyError:
721 raise ValueError(
722 f"The plugin {self._plugin_name} does not provide a"
723 f" service manager called {service_manager}"
724 ) from None
726 if context is None: 726 ↛ 728line 726 didn't jump to line 728 because the condition on line 726 was always true
727 context = package_metadata_context()
728 detected_services: list[DetectedService[DSD]] = []
729 registry = ServiceRegistryTestImpl(service_manager_details, detected_services)
730 service_manager_details.service_detector(
731 fs_root,
732 registry,
733 context,
734 )
735 ctrl = BinaryCtrlAccessorTestProvider(
736 self._plugin_metadata,
737 service_manager_details.service_manager,
738 context,
739 )
740 if detected_services:
741 service_definitions = [
742 ServiceDefinitionImpl(
743 ds.names[0],
744 ds.names,
745 ds.path,
746 ds.type_of_service,
747 ds.service_scope,
748 ds.enable_by_default,
749 ds.start_by_default,
750 ds.default_upgrade_rule,
751 self._plugin_name,
752 True,
753 ds.service_context,
754 )
755 for ds in detected_services
756 ]
757 service_manager_details.service_integrator(
758 service_definitions,
759 ctrl,
760 context,
761 )
762 return detected_services, ctrl.registered_metadata()
764 def manifest_variables(
765 self,
766 *,
767 resolution_context: VariableContext | None = None,
768 mocked_variables: Mapping[str, str] | None = None,
769 ) -> Mapping[str, str]:
770 valid_manifest_variables = frozenset(
771 {
772 n
773 for n, v in self._feature_set.manifest_variables.items()
774 if v.plugin_metadata.plugin_name == self._plugin_name
775 }
776 )
777 if resolution_context is None:
778 resolution_context = manifest_variable_resolution_context()
779 substitution = self._substitution.copy_for_subst_test(
780 self._feature_set,
781 resolution_context,
782 extra_substitutions=mocked_variables,
783 )
784 return SubstitutionTable(
785 valid_manifest_variables,
786 substitution,
787 )
790class SubstitutionTable(Mapping[str, str]):
791 def __init__(
792 self, valid_manifest_variables: frozenset[str], substitution: Substitution
793 ) -> None:
794 self._valid_manifest_variables = valid_manifest_variables
795 self._resolved: set[str] = set()
796 self._substitution = substitution
798 def __contains__(self, item: object) -> bool:
799 return item in self._valid_manifest_variables
801 def __getitem__(self, key: str) -> str:
802 if key not in self._valid_manifest_variables: 802 ↛ 803line 802 didn't jump to line 803 because the condition on line 802 was never true
803 raise KeyError(key)
804 v = self._substitution.substitute(
805 "{{" + key + "}}", f"test of manifest variable `{key}`"
806 )
807 self._resolved.add(key)
808 return v
810 def __len__(self) -> int:
811 return len(self._valid_manifest_variables)
813 def __iter__(self) -> Iterator[str]:
814 return iter(self._valid_manifest_variables)
816 def keys(self) -> KeysView[str]:
817 return cast("KeysView[str]", self._valid_manifest_variables)