Coverage for src/debputy/plugin/api/test_api/test_impl.py: 81%
307 statements
« prev ^ index » next coverage.py v7.8.2, created at 2026-01-04 10:15 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2026-01-04 10:15 +0000
1import contextlib
2import dataclasses
3import inspect
4import os.path
5from collections.abc import Mapping, Sequence, Iterator, KeysView, Callable
6from importlib.resources.abc import Traversable
7from io import BytesIO
8from pathlib import Path
9from typing import (
10 cast,
11 TYPE_CHECKING,
12)
14from debian.deb822 import Deb822
15from debian.debian_support import DpkgArchTable
16from debian.substvars import Substvars
18from debputy import DEBPUTY_PLUGIN_ROOT_DIR
19from debputy.architecture_support import faked_arch_table
20from debputy.filesystem_scan import OSFSROOverlay, FSRootDir
21from debputy.packages import BinaryPackage, SourcePackage
22from debputy.plugin.api import (
23 PluginInitializationEntryPoint,
24 VirtualPath,
25 PackageProcessingContext,
26 DpkgTriggerType,
27 Maintscript,
28)
29from debputy.plugin.api.example_processing import process_discard_rule_example
30from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
31from debputy.plugin.api.impl import (
32 plugin_metadata_for_debputys_own_plugin,
33 DebputyPluginInitializerProvider,
34 parse_json_plugin_desc,
35 MaintscriptAccessorProviderBase,
36 BinaryCtrlAccessorProviderBase,
37 PLUGIN_TEST_SUFFIX,
38 find_json_plugin,
39 ServiceDefinitionImpl,
40)
41from debputy.plugin.api.impl_types import (
42 PackagerProvidedFileClassSpec,
43 DebputyPluginMetadata,
44 PluginProvidedTrigger,
45 ServiceManagerDetails,
46)
47from debputy.plugin.api.spec import (
48 MaintscriptAccessor,
49 FlushableSubstvars,
50 ServiceRegistry,
51 DSD,
52 ServiceUpgradeRule,
53)
54from debputy.plugin.api.test_api.test_spec import (
55 InitializedPluginUnderTest,
56 RegisteredPackagerProvidedFile,
57 RegisteredTrigger,
58 RegisteredMaintscript,
59 DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS,
60 ADRExampleIssue,
61 DetectedService,
62 RegisteredMetadata,
63)
64from debputy.plugins.debputy.debputy_plugin import initialize_debputy_features
65from debputy.substitution import SubstitutionImpl, VariableContext, Substitution
66from debputy.util import package_cross_check_precheck
68if TYPE_CHECKING:
69 from debputy.highlevel_manifest import HighLevelManifest
72RegisteredPackagerProvidedFile.register(PackagerProvidedFileClassSpec)
75type ManifestConfigurationImplementation[T] = Callable[
76 [SourcePackage | BinaryPackage, type[T]], T
77]
@dataclasses.dataclass(frozen=True, slots=True)
class PackageProcessingContextTestProvider(PackageProcessingContext):
    """Immutable `PackageProcessingContext` used by the plugin test API.

    Instances are plain value holders; `package_metadata_context` in this
    module is the function that assembles them.
    """

    # The (emulated) source package and the binary package being processed.
    source_package: SourcePackage
    binary_package: BinaryPackage
    binary_package_version: str
    # The related udeb (if any) and its version.
    related_udeb_package: BinaryPackage | None
    related_udeb_package_version: str | None
    # Zero-argument callable returning (package, fs root) pairs.
    accessible_package_roots: Callable[[], Sequence[tuple[BinaryPackage, VirtualPath]]]
    manifest_configuration: ManifestConfigurationImplementation

    # TODO: implement (when needed)
    #  dpkg_arch_query_table
    #  deb_options_and_profiles (pull from binary ?)
    #  source_condition_context
    #  condition_context
def _initialize_plugin_under_test(
    plugin_metadata: DebputyPluginMetadata,
    load_debputy_plugin: bool = True,
) -> "InitializedPluginUnderTest":
    """Load the plugin described by `plugin_metadata` into a fresh feature set.

    :param plugin_metadata: Metadata of the plugin under test.
    :param load_debputy_plugin: When true, debputy's own plugin is loaded into
      the same feature set before the plugin under test.
    :return: A test handle bound to the plugin name, the populated feature set
      and the substitution instance.
    """
    feature_set = PluginProvidedFeatureSet()
    debian_dir = OSFSROOverlay.create_root_dir("debian", "debian")
    substitution = SubstitutionImpl(
        unresolvable_substitutions=frozenset(["SOURCE_DATE_EPOCH", "PACKAGE"]),
        variable_context=VariableContext(debian_dir),
        plugin_feature_set=feature_set,
    )

    metadata_in_load_order: list[DebputyPluginMetadata] = []
    if load_debputy_plugin:
        # Load debputy's own plugin first, so conflicts with debputy's plugin are detected early
        metadata_in_load_order.append(
            plugin_metadata_for_debputys_own_plugin(initialize_debputy_features)
        )
    metadata_in_load_order.append(plugin_metadata)

    for metadata in metadata_in_load_order:
        provider = DebputyPluginInitializerProvider(
            metadata,
            feature_set,
            substitution,
        )
        provider.load_plugin()

    return InitializedPluginUnderTestImpl(
        plugin_metadata.plugin_name,
        feature_set,
        substitution,
    )
def _auto_load_plugin_from_filename(
    py_test_filename: str,
) -> "InitializedPluginUnderTest":
    """Locate and load the plugin that belongs to a given test file.

    The plugin name is derived from the basename of `py_test_filename` by
    removing whatever `PLUGIN_TEST_SUFFIX` matches and replacing `_` with `-`.
    The `DEBPUTY_TEST_PLUGIN_LOCATION` environment variable (default
    `uninstalled`) selects where the plugin is looked up:

    * `uninstalled`: `<plugin-name>.json`, or failing that
      `<plugin-name>.json.in`, next to the test file.
    * `installed`: looked up via `find_json_plugin` in `DEBPUTY_PLUGIN_ROOT_DIR`.

    :param py_test_filename: Path of the test file that requested the plugin.
    :return: The loaded plugin for testing.
    :raises FileNotFoundError: In `uninstalled` mode, when neither descriptor
      file exists.
    :raises ValueError: When the environment variable has an unsupported value.
    """
    dirname, basename = os.path.split(py_test_filename)
    plugin_name = PLUGIN_TEST_SUFFIX.sub("", basename).replace("_", "-")

    test_location = os.environ.get("DEBPUTY_TEST_PLUGIN_LOCATION", "uninstalled")
    if test_location == "uninstalled":
        json_basename = f"{plugin_name}.json"
        json_desc_file = os.path.join(dirname, json_basename)
        if "/" not in json_desc_file:
            # Make the path explicitly relative to the current directory.
            json_desc_file = f"./{json_desc_file}"

        if os.path.isfile(json_desc_file):
            return _initialize_plugin_from_desc(json_desc_file)

        json_desc_file_in = f"{json_desc_file}.in"
        if os.path.isfile(json_desc_file_in):
            # Load the `.json.in` file that was just found to exist; the plain
            # `.json` path is known to be missing at this point.
            return _initialize_plugin_from_desc(json_desc_file_in)
        raise FileNotFoundError(
            f"Cannot determine the plugin JSON metadata descriptor: Expected it to be"
            f" {json_desc_file} or {json_desc_file_in}"
        )

    if test_location == "installed":
        plugin_metadata = find_json_plugin([str(DEBPUTY_PLUGIN_ROOT_DIR)], plugin_name)
        return _initialize_plugin_under_test(plugin_metadata, load_debputy_plugin=True)

    raise ValueError(
        'Invalid or unsupported "DEBPUTY_TEST_PLUGIN_LOCATION" environment variable. It must be either'
        ' unset OR one of "installed", "uninstalled".'
    )
def initialize_plugin_under_test(
    *,
    plugin_desc_file: str | None = None,
) -> "InitializedPluginUnderTest":
    """Load and initialize a plugin so it can be tested

    The plugin is loaded via its plugin description, which is how `debputy` loads plugins at
    run-time (unlike `initialize_plugin_under_test_preloaded`, which bypasses that part of the
    flow).

    :param plugin_desc_file: The plugin description file (`.json`) that describes how to load the plugin.
      If omitted, `debputy` will attempt to locate the plugin description file based on the file name
      of the calling test. This works for "single-file" plugins, where the description file and the
      test are right next to each other.
    :return: The loaded plugin for testing
    """
    if plugin_desc_file is not None:
        if DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS:
            raise RuntimeError(
                "Running the test against an installed plugin does not work when"
                " plugin_desc_file is provided. Please skip this test. You can "
                " import DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS and use that as"
                " conditional for this purpose."
            )
        return _initialize_plugin_from_desc(plugin_desc_file)

    # Frame 1 is the direct caller of this function; its file is used for the lookup.
    calling_frame = inspect.stack()[1]
    return _auto_load_plugin_from_filename(calling_frame.filename)
def _initialize_plugin_from_desc(
    desc_file: str,
) -> "InitializedPluginUnderTest":
    """Parse a plugin description file and load the plugin it describes.

    :param desc_file: Path to the description file; it must end with `.json`
      or `.json.in`.
    :return: The loaded plugin for testing (debputy's own plugin is loaded too).
    :raises ValueError: When `desc_file` has any other suffix.
    """
    accepted_suffixes = (".json", ".json.in")
    if desc_file.endswith(accepted_suffixes):
        metadata = parse_json_plugin_desc(desc_file)
        return _initialize_plugin_under_test(metadata, load_debputy_plugin=True)

    raise ValueError("The plugin file must end with .json or .json.in")
def initialize_plugin_under_test_from_inline_json(
    plugin_name: str,
    json_content: str,
) -> "InitializedPluginUnderTest":
    """Load a plugin from a JSON description held in a string.

    :param plugin_name: Passed to `parse_json_plugin_desc` as its first argument.
    :param json_content: The JSON description; it is encoded as UTF-8 and handed
      to the parser as an in-memory binary stream.
    :return: The loaded plugin for testing (debputy's own plugin is loaded too).
    """
    encoded_description = json_content.encode("utf-8")
    with BytesIO(encoded_description) as stream:
        metadata = parse_json_plugin_desc(plugin_name, fd=stream)

    return _initialize_plugin_under_test(metadata, load_debputy_plugin=True)
def initialize_plugin_under_test_preloaded(
    api_compat_version: int,
    plugin_initializer: PluginInitializationEntryPoint,
    /,
    plugin_name: str = "plugin-under-test",
    load_debputy_plugin: bool = True,
    plugin_doc_path_resolver: Callable[[], Traversable | Path | None] = lambda: None,
) -> "InitializedPluginUnderTest":
    """Internal API: Initialize a plugin for testing without loading it from a file

    The standard loading mechanism is by-passed, so this does not test that the plugin
    description file is correct. Notably, any feature provided via the JSON description
    file will **NOT** be visible for the test.

    This API is mostly useful for testing parts of debputy itself.

    :param api_compat_version: The API version the plugin was written for. Use the same version as the
      version from the entry point (The `v1` part of `debputy.plugins.v1.initialize` translate into `1`).
    :param plugin_initializer: The entry point of the plugin
    :param plugin_name: Normally, debputy would derive this from the entry point. In the test, a
      test name is used by default; pass the real name explicitly if the test needs it.
    :param load_debputy_plugin: Whether to load debputy's own plugin first. Doing so provides a more
      realistic test and enables the test to detect conflicts with debputy's own plugins (de facto making
      the plugin unloadable in practice if such a conflict is present). This option is mostly provided
      to enable debputy to use this method for self testing.
    :param plugin_doc_path_resolver: How to resolve the documentation (if relevant for the test). The
      default is to not load the documentation.
    :return: The loaded plugin for testing
    :raises RuntimeError: When `DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS` is true.
    """
    if DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS:
        raise RuntimeError(
            "Running the test against an installed plugin does not work when"
            " the plugin is preload. Please skip this test. You can "
            " import DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS and use that as"
            " conditional for this purpose."
        )

    return _initialize_plugin_under_test(
        DebputyPluginMetadata(
            plugin_name=plugin_name,
            api_compat_version=api_compat_version,
            plugin_initializer=plugin_initializer,
            plugin_loader=None,
            plugin_path="<loaded-via-test>",
            plugin_doc_path_resolver=plugin_doc_path_resolver,
        ),
        load_debputy_plugin=load_debputy_plugin,
    )
class _MockArchTable:
    """Stand-in architecture table whose architecture match always succeeds."""

    @staticmethod
    def matches_architecture(_a: str, _b: str) -> bool:
        # Both arguments are ignored; every pair is reported as a match.
        return True


# Shared fake table, typed as `DpkgArchTable` for the benefit of type checkers.
FAKE_DPKG_QUERY_TABLE = cast(
    DpkgArchTable,
    _MockArchTable(),
)
# The class is only needed to build the singleton above.
del _MockArchTable
def package_metadata_context(
    *,
    host_arch: str = "amd64",
    package_fields: dict[str, str] | None = None,
    related_udeb_package_fields: dict[str, str] | None = None,
    binary_package_version: str = "1.0-1",
    related_udeb_package_version: str | None = None,
    should_be_acted_on: bool = True,
    related_udeb_fs_root: VirtualPath | None = None,
    accessible_package_roots: Sequence[tuple[Mapping[str, str], VirtualPath]] = tuple(),
    source_package_fields: dict[str, str] | None = None,
    manifest_configuration: ManifestConfigurationImplementation = lambda x, y: None,
) -> PackageProcessingContext:
    """Build a `PackageProcessingContext` emulating a binary package for tests.

    :param host_arch: Host architecture passed to `faked_arch_table`.
    :param package_fields: Extra/overriding fields for the binary package. The
      defaults are `Package: foo` and `Architecture: any`.
    :param related_udeb_package_fields: When given, a related udeb package is
      created. `Package` defaults to `<Package>-udeb`, `Architecture` to that
      of the main package and `Package-Type` to `udeb`.
    :param binary_package_version: Version reported for the binary package.
    :param related_udeb_package_version: Version reported for the udeb. When a
      udeb is created and this is omitted, `binary_package_version` is used.
    :param should_be_acted_on: Forwarded to the `BinaryPackage` being emulated.
    :param related_udeb_fs_root: File system root of the related udeb. When
      given, the udeb is added to the accessible package roots; this requires
      `related_udeb_package_fields`.
    :param accessible_package_roots: Pairs of (control fields, fs root) for
      other packages that should be accessible. Each member must have
      `Package` and `Architecture` fields.
    :param source_package_fields: Extra/overriding fields for the source
      package. `Source` defaults to the name of the binary package.
    :param manifest_configuration: Callable stored on the context as
      `manifest_configuration`.
    :return: The assembled context.
    :raises ValueError: When a member of `accessible_package_roots` lacks a
      mandatory field, when a package fails `package_cross_check_precheck`
      against the main package, or when `related_udeb_fs_root` is given
      without `related_udeb_package_fields`.
    """
    process_table = faked_arch_table(host_arch)
    f = {
        "Package": "foo",
        "Architecture": "any",
    }
    if package_fields is not None:
        f.update(package_fields)

    bin_package = BinaryPackage(
        Deb822(f),
        process_table,
        FAKE_DPKG_QUERY_TABLE,
        is_main_package=True,
        should_be_acted_on=should_be_acted_on,
    )

    s = {
        "Source": bin_package.name,
    }
    if source_package_fields is not None:
        s.update(source_package_fields)
    source_package = SourcePackage(Deb822(s))

    udeb_package = None
    if related_udeb_package_fields is not None:
        uf = dict(related_udeb_package_fields)
        uf.setdefault("Package", f'{f["Package"]}-udeb')
        uf.setdefault("Architecture", f["Architecture"])
        uf.setdefault("Package-Type", "udeb")
        udeb_package = BinaryPackage(
            Deb822(uf),
            process_table,
            FAKE_DPKG_QUERY_TABLE,
            is_main_package=False,
            should_be_acted_on=True,
        )
        if related_udeb_package_version is None:
            related_udeb_package_version = binary_package_version

    apr: list[tuple[BinaryPackage, VirtualPath]] = []
    for fields, apr_fs_root in accessible_package_roots:
        apr_fields = Deb822(dict(fields))
        if "Package" not in apr_fields:
            raise ValueError(
                "Missing mandatory Package field in member of accessible_package_roots"
            )
        if "Architecture" not in apr_fields:
            raise ValueError(
                "Missing mandatory Architecture field in member of accessible_package_roots"
            )
        apr_package = BinaryPackage(
            apr_fields,
            process_table,
            FAKE_DPKG_QUERY_TABLE,
            is_main_package=False,
            should_be_acted_on=True,
        )
        r = package_cross_check_precheck(bin_package, apr_package)
        if not r[0]:
            raise ValueError(
                f"{apr_package.name} would not be accessible for {bin_package.name}"
            )
        apr.append((apr_package, apr_fs_root))

    # Handled independently of `accessible_package_roots`, so a udeb root is
    # validated and registered even when no other package roots were given.
    if related_udeb_fs_root is not None:
        if udeb_package is None:
            raise ValueError(
                "related_udeb_package_fields must be given when related_udeb_fs_root is given"
            )
        r = package_cross_check_precheck(bin_package, udeb_package)
        if not r[0]:
            raise ValueError(
                f"{udeb_package.name} would not be accessible for {bin_package.name}, so providing"
                " related_udeb_fs_root is irrelevant"
            )
        apr.append((udeb_package, related_udeb_fs_root))
    final_apr = tuple(apr)

    return PackageProcessingContextTestProvider(
        source_package=source_package,
        binary_package=bin_package,
        related_udeb_package=udeb_package,
        binary_package_version=binary_package_version,
        related_udeb_package_version=related_udeb_package_version,
        accessible_package_roots=lambda: final_apr,
        manifest_configuration=manifest_configuration,
    )
def manifest_variable_resolution_context(
    *,
    debian_dir: VirtualPath | None = None,
) -> VariableContext:
    """Create a `VariableContext` for resolving manifest variables in tests.

    :param debian_dir: Directory handed to the `VariableContext`. When omitted,
      a fresh `FSRootDir()` is used.
    :return: The variable context.
    """
    effective_debian_dir = FSRootDir() if debian_dir is None else debian_dir
    return VariableContext(effective_debian_dir)
class MaintscriptAccessorTestProvider(MaintscriptAccessorProviderBase):
    """Maintscript accessor that records snippets instead of emitting them.

    Recorded snippets are stored in a shared container keyed by the plugin
    source id, as `RegisteredMaintscript` entries.
    """

    __slots__ = ("_plugin_metadata", "_plugin_source_id", "_maintscript_container")

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        maintscript_container: dict[str, list[RegisteredMaintscript]],
    ):
        self._plugin_metadata = plugin_metadata
        self._plugin_source_id = plugin_source_id
        self._maintscript_container = maintscript_container

    @classmethod
    def _apply_condition_to_script(
        cls, condition: str, run_snippet: str, /, indent: bool | None = None
    ) -> str:
        # The condition is not applied in tests; the snippet is returned as-is.
        return run_snippet

    def _append_script(
        self,
        caller_name: str,
        maintscript: Maintscript,
        full_script: str,
        /,
        perform_substitution: bool = True,
    ) -> None:
        """Record a snippet under this accessor's plugin source id."""
        entry = RegisteredMaintscript(
            maintscript,
            caller_name,
            full_script,
            perform_substitution,
        )
        recorded = self._maintscript_container.setdefault(self._plugin_source_id, [])
        recorded.append(entry)
class RegisteredMetadataImpl(RegisteredMetadata):
    """Holder for the substvars, triggers and maintscripts that were registered."""

    __slots__ = (
        "_substvars",
        "_triggers",
        "_maintscripts",
    )

    def __init__(
        self,
        substvars: Substvars,
        triggers: list[RegisteredTrigger],
        maintscripts: list[RegisteredMaintscript],
    ) -> None:
        self._substvars = substvars
        self._triggers = triggers
        self._maintscripts = maintscripts

    @property
    def substvars(self) -> Substvars:
        return self._substvars

    @property
    def triggers(self) -> list[RegisteredTrigger]:
        return self._triggers

    def maintscripts(
        self,
        *,
        maintscript: Maintscript | None = None,
    ) -> list[RegisteredMaintscript]:
        """Return the registered maintscript snippets.

        :param maintscript: When given, only snippets for that maintscript are
          returned (as a new list). When omitted, the internal list itself is
          returned.
        """
        if maintscript is not None:
            return [
                snippet
                for snippet in self._maintscripts
                if snippet.maintscript == maintscript
            ]
        return self._maintscripts
class BinaryCtrlAccessorTestProvider(BinaryCtrlAccessorProviderBase):
    """Binary control accessor for tests; exposes what a plugin registered."""

    __slots__ = ("_maintscript_container",)

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        context: PackageProcessingContext,
    ) -> None:
        # The base class is given an empty dict, fresh substvars and a
        # (None, None) pair for its remaining positional arguments.
        super().__init__(
            plugin_metadata,
            plugin_source_id,
            context,
            {},
            FlushableSubstvars(),
            (None, None),
        )
        self._maintscript_container: dict[str, list[RegisteredMaintscript]] = {}

    def _create_maintscript_accessor(self) -> MaintscriptAccessor:
        return MaintscriptAccessorTestProvider(
            self._plugin_metadata,
            self._plugin_source_id,
            self._maintscript_container,
        )

    def registered_metadata(self) -> RegisteredMetadata:
        """Collect substvars plus the triggers and maintscripts of this source id."""
        substvars = self._substvars
        own_triggers = [
            RegisteredTrigger.from_plugin_provided_trigger(trigger)
            for trigger in self._triggers.values()
            if trigger.provider_source_id == self._plugin_source_id
        ]
        own_maintscripts = self._maintscript_container.get(self._plugin_source_id, [])
        return RegisteredMetadataImpl(substvars, own_triggers, own_maintscripts)
class ServiceRegistryTestImpl(ServiceRegistry[DSD]):
    """Service registry that appends every registration to a caller-owned list."""

    __slots__ = ("_service_manager_details", "_service_definitions")

    def __init__(
        self,
        service_manager_details: ServiceManagerDetails,
        detected_services: list[DetectedService[DSD]],
    ) -> None:
        self._service_manager_details = service_manager_details
        self._service_definitions = detected_services

    def register_service(
        self,
        path: VirtualPath,
        name: str | list[str],
        *,
        type_of_service: str = "service",  # "timer", etc.
        service_scope: str = "system",
        enable_by_default: bool = True,
        start_by_default: bool = True,
        default_upgrade_rule: ServiceUpgradeRule = "restart",
        service_context: DSD | None = None,
    ) -> None:
        """Record a detected service.

        :param name: A single name or a list of names; a single name is wrapped
          in a one-element list.
        :raises ValueError: When `name` is an empty list.
        """
        if isinstance(name, list):
            names = name
        else:
            names = [name]
        if not names:
            raise ValueError(
                f"The service must have at least one name - {path.absolute} did not have any"
            )
        detected = DetectedService(
            path,
            names,
            type_of_service,
            service_scope,
            enable_by_default,
            start_by_default,
            default_upgrade_rule,
            service_context,
        )
        self._service_definitions.append(detected)
@contextlib.contextmanager
def _read_only_fs_root(fs_root: VirtualPath) -> Iterator[VirtualPath]:
    """Temporarily present `fs_root` as read-only for the duration of the block.

    If `fs_root` is read-write, its `is_read_write` flag is set to false while
    the `with` body runs and set back to true afterwards, even when the body
    raises. A root that is already read-only is yielded unchanged.

    :param fs_root: The file system root to protect.
    :return: Context manager yielding the same `fs_root` object.
    """
    if not fs_root.is_read_write:
        yield fs_root
        return

    assert isinstance(fs_root, FSRootDir)
    fs_root.is_read_write = False
    try:
        yield fs_root
    finally:
        # Always restore write access so an exception in the body does not
        # leave the caller's root stuck in read-only mode.
        fs_root.is_read_write = True
class InitializedPluginUnderTestImpl(InitializedPluginUnderTest):
    """Test handle for a loaded plugin.

    All queries are answered from the feature set the plugin was loaded into
    and are restricted to features registered under this plugin's name.
    """

    def __init__(
        self,
        plugin_name: str,
        feature_set: PluginProvidedFeatureSet,
        substitution: SubstitutionImpl,
    ) -> None:
        self._feature_set = feature_set
        self._plugin_name = plugin_name
        # Lazily computed by `packager_provided_files_by_stem`.
        self._packager_provided_files: dict[str, RegisteredPackagerProvidedFile] | None = None
        self._triggers: dict[tuple[DpkgTriggerType, str], PluginProvidedTrigger] = {}
        self._maintscript_container: dict[str, list[RegisteredMaintscript]] = {}
        self._substitution = substitution
        assert plugin_name in self._feature_set.plugin_data

    @property
    def _plugin_metadata(self) -> DebputyPluginMetadata:
        return self._feature_set.plugin_data[self._plugin_name]

    def packager_provided_files_by_stem(
        self,
    ) -> Mapping[str, RegisteredPackagerProvidedFile]:
        """Return this plugin's packager provided files keyed by stem (cached)."""
        cached = self._packager_provided_files
        if cached is not None:
            return cached

        by_stem: dict[str, RegisteredPackagerProvidedFile] = {}
        for spec in self._feature_set.packager_provided_files.values():
            if spec.debputy_plugin_metadata.plugin_name == self._plugin_name:
                # Registered as a virtual subclass, so this should always be True
                assert isinstance(spec, RegisteredPackagerProvidedFile)
                by_stem[spec.stem] = spec
        self._packager_provided_files = by_stem
        return by_stem

    def run_metadata_detector(
        self,
        metadata_detector_id: str,
        fs_root: VirtualPath,
        context: PackageProcessingContext | None = None,
    ) -> RegisteredMetadata:
        """Run one of this plugin's metadata detectors and return what it registered.

        The detector sees `fs_root` in read-only mode.

        :raises ValueError: When `fs_root` is not a file system root, when the
          plugin has no detector with the given ID, or when the detector does
          not apply to the package of `context`.
        """
        if fs_root.parent_dir is not None:
            raise ValueError("Provided path must be the file system root.")

        candidates = [
            candidate
            for candidate in self._feature_set.metadata_maintscript_detectors[
                self._plugin_name
            ]
            if candidate.detector_id == metadata_detector_id
        ]
        if len(candidates) != 1:
            # More than one match would mean duplicate detector IDs.
            assert not candidates
            raise ValueError(
                f"The plugin {self._plugin_name} did not provide a metadata detector with ID"
                f' "{metadata_detector_id}"'
            )
        if context is None:
            context = package_metadata_context()
        detector = candidates[0]
        if not detector.applies_to(context.binary_package):
            raise ValueError(
                f'The detector "{metadata_detector_id}" from {self._plugin_name} does not apply to the'
                " given package. Consider using `package_metadata_context()` to emulate a binary package"
                " with the correct specification. As an example: "
                '`package_metadata_context(package_fields={"Package-Type": "udeb"})` would emulate a udeb'
                " package."
            )

        ctrl = BinaryCtrlAccessorTestProvider(
            self._plugin_metadata,
            metadata_detector_id,
            context,
        )
        with _read_only_fs_root(fs_root) as ro_root:
            detector.run_detector(ro_root, ctrl, context)
        return ctrl.registered_metadata()

    def run_package_processor(
        self,
        package_processor_id: str,
        fs_root: VirtualPath,
        context: PackageProcessingContext | None = None,
    ) -> None:
        """Run one of this plugin's package processors against `fs_root`.

        :raises ValueError: When `fs_root` is not a file system root or is
          read-only, when the plugin has no processor with the given ID, or
          when the processor does not apply to the package of `context`.
        """
        if fs_root.parent_dir is not None:
            raise ValueError("Provided path must be the file system root.")

        package_processor = self._feature_set.all_package_processors.get(
            (self._plugin_name, package_processor_id)
        )
        if package_processor is None:
            raise ValueError(
                f"The plugin {self._plugin_name} did not provide a package processor with ID"
                f' "{package_processor_id}"'
            )
        if context is None:
            context = package_metadata_context()
        if not fs_root.is_read_write:
            raise ValueError(
                "The provided fs_root is read-only and it must be read-write for package processor"
            )
        if not package_processor.applies_to(context.binary_package):
            raise ValueError(
                f'The package processor "{package_processor_id}" from {self._plugin_name} does not apply'
                " to the given package. Consider using `package_metadata_context()` to emulate a binary"
                " package with the correct specification. As an example: "
                '`package_metadata_context(package_fields={"Package-Type": "udeb"})` would emulate a udeb'
                " package."
            )
        package_processor.run_package_processor(fs_root, None, context)

    @property
    def declared_manifest_variables(self) -> frozenset[str]:
        """Names of the manifest variables registered by this plugin."""
        return frozenset(
            name
            for name, variable in self._feature_set.manifest_variables.items()
            if variable.plugin_metadata.plugin_name == self._plugin_name
        )

    def automatic_discard_rules_examples_with_issues(self) -> Sequence[ADRExampleIssue]:
        """Check the examples of this plugin's automatic discard rules.

        :return: One issue per example for which `process_discard_rule_example`
          reported inconsistent paths. Directory paths get a trailing `/`.
        """
        issues: list[ADRExampleIssue] = []
        own_rules = (
            rule
            for rule in self._feature_set.auto_discard_rules.values()
            if rule.plugin_metadata.plugin_name == self._plugin_name
        )
        for rule in own_rules:
            for example_index, example in enumerate(rule.examples):
                outcome = process_discard_rule_example(rule, example)
                if not outcome.inconsistent_paths:
                    continue
                bad_paths = [
                    path.absolute + ("/" if path.is_dir else "")
                    for path in outcome.inconsistent_paths
                ]
                issues.append(ADRExampleIssue(rule.name, example_index, bad_paths))
        return issues

    def run_service_detection_and_integrations(
        self,
        service_manager: str,
        fs_root: VirtualPath,
        context: PackageProcessingContext | None = None,
        *,
        service_context_type_hint: type[DSD] | None = None,
    ) -> tuple[list[DetectedService[DSD]], RegisteredMetadata]:
        """Run a service manager's detector and, if anything was found, its integrator.

        :param service_manager: Name of a service manager provided by this plugin.
        :param service_context_type_hint: Not used by the implementation.
        :return: The detected services and the metadata registered by the integrator.
        :raises ValueError: When `fs_root` is not a file system root or the
          plugin does not provide the named service manager.
        """
        if fs_root.parent_dir is not None:
            raise ValueError("Provided path must be the file system root.")
        try:
            details = self._feature_set.service_managers[service_manager]
            if details.plugin_metadata.plugin_name != self._plugin_name:
                # A manager from another plugin is treated like a missing one.
                raise KeyError(service_manager)
        except KeyError:
            raise ValueError(
                f"The plugin {self._plugin_name} does not provide a"
                f" service manager called {service_manager}"
            ) from None

        if context is None:
            context = package_metadata_context()
        detected_services: list[DetectedService[DSD]] = []
        registry = ServiceRegistryTestImpl(details, detected_services)
        details.service_detector(fs_root, registry, context)

        ctrl = BinaryCtrlAccessorTestProvider(
            self._plugin_metadata,
            details.service_manager,
            context,
        )
        if not detected_services:
            return detected_services, ctrl.registered_metadata()

        service_definitions = [
            ServiceDefinitionImpl(
                service.names[0],
                service.names,
                service.path,
                service.type_of_service,
                service.service_scope,
                service.enable_by_default,
                service.start_by_default,
                service.default_upgrade_rule,
                self._plugin_name,
                True,
                service.service_context,
            )
            for service in detected_services
        ]
        details.service_integrator(service_definitions, ctrl, context)
        return detected_services, ctrl.registered_metadata()

    def manifest_variables(
        self,
        *,
        resolution_context: VariableContext | None = None,
        mocked_variables: Mapping[str, str] | None = None,
    ) -> Mapping[str, str]:
        """Return a mapping that resolves this plugin's manifest variables on lookup.

        :param resolution_context: Context for the resolution; defaults to
          `manifest_variable_resolution_context()`.
        :param mocked_variables: Passed on as `extra_substitutions`.
        """
        plugin_variables = self.declared_manifest_variables
        if resolution_context is None:
            resolution_context = manifest_variable_resolution_context()
        test_substitution = self._substitution.copy_for_subst_test(
            self._feature_set,
            resolution_context,
            extra_substitutions=mocked_variables,
        )
        return SubstitutionTable(plugin_variables, test_substitution)
class SubstitutionTable(Mapping[str, str]):
    """Read-only mapping of manifest variable names to their resolved values.

    Membership, length and iteration reflect the set of valid variable names.
    A value is resolved through the substitution object each time it is looked
    up; names that have been resolved are remembered in `_resolved`.
    """

    def __init__(
        self, valid_manifest_variables: frozenset[str], substitution: Substitution
    ) -> None:
        self._substitution = substitution
        self._resolved: set[str] = set()
        self._valid_manifest_variables = valid_manifest_variables

    def __contains__(self, item: object) -> bool:
        return item in self._valid_manifest_variables

    def __getitem__(self, key: str) -> str:
        if key in self._valid_manifest_variables:
            resolved_value = self._substitution.substitute(
                "{{" + key + "}}", f"test of manifest variable `{key}`"
            )
            # Only recorded once the substitution has succeeded.
            self._resolved.add(key)
            return resolved_value
        raise KeyError(key)

    def __len__(self) -> int:
        return len(self._valid_manifest_variables)

    def __iter__(self) -> Iterator[str]:
        return iter(self._valid_manifest_variables)

    def keys(self) -> KeysView[str]:
        # The frozenset of names is returned directly; the cast is for type checkers only.
        names = self._valid_manifest_variables
        return cast("KeysView[str]", names)