Coverage for src/debputy/plugin/api/test_api/test_impl.py: 82%
298 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
1import contextlib
2import dataclasses
3import inspect
4import os.path
5from importlib.resources.abc import Traversable
6from io import BytesIO
7from pathlib import Path
8from typing import (
9 Mapping,
10 Dict,
11 Optional,
12 Tuple,
13 List,
14 cast,
15 FrozenSet,
16 Sequence,
17 Union,
18 Type,
19 Iterator,
20 Set,
21 KeysView,
22 Callable,
23)
25from debian.deb822 import Deb822
26from debian.substvars import Substvars
28from debputy import DEBPUTY_PLUGIN_ROOT_DIR
29from debputy.architecture_support import faked_arch_table
30from debputy.filesystem_scan import FSROOverlay, FSRootDir
31from debputy.packages import BinaryPackage
32from debputy.plugin.api import (
33 PluginInitializationEntryPoint,
34 VirtualPath,
35 PackageProcessingContext,
36 DpkgTriggerType,
37 Maintscript,
38)
39from debputy.plugin.api.example_processing import process_discard_rule_example
40from debputy.plugin.api.impl import (
41 plugin_metadata_for_debputys_own_plugin,
42 DebputyPluginInitializerProvider,
43 parse_json_plugin_desc,
44 MaintscriptAccessorProviderBase,
45 BinaryCtrlAccessorProviderBase,
46 PLUGIN_TEST_SUFFIX,
47 find_json_plugin,
48 ServiceDefinitionImpl,
49)
50from debputy.plugin.api.impl_types import (
51 PackagerProvidedFileClassSpec,
52 DebputyPluginMetadata,
53 PluginProvidedTrigger,
54 ServiceManagerDetails,
55)
56from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
57from debputy.plugin.api.spec import (
58 MaintscriptAccessor,
59 FlushableSubstvars,
60 ServiceRegistry,
61 DSD,
62 ServiceUpgradeRule,
63)
64from debputy.plugin.api.test_api.test_spec import (
65 InitializedPluginUnderTest,
66 RegisteredPackagerProvidedFile,
67 RegisteredTrigger,
68 RegisteredMaintscript,
69 DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS,
70 ADRExampleIssue,
71 DetectedService,
72 RegisteredMetadata,
73)
74from debputy.plugin.debputy.debputy_plugin import initialize_debputy_features
75from debputy.substitution import SubstitutionImpl, VariableContext, Substitution
76from debputy.util import package_cross_check_precheck
78RegisteredPackagerProvidedFile.register(PackagerProvidedFileClassSpec)
@dataclasses.dataclass(frozen=True, slots=True)
class PackageProcessingContextTestProvider(PackageProcessingContext):
    """Immutable `PackageProcessingContext` implementation backing the test API

    The dataclass is frozen and slotted, so instances cannot be modified after
    construction. The field order is part of the generated constructor signature.
    """

    # The binary package the context describes.
    binary_package: BinaryPackage
    # Version string reported for `binary_package`.
    binary_package_version: str
    # The udeb related to `binary_package`, or None when there is none.
    related_udeb_package: Optional[BinaryPackage]
    # Version string for the related udeb, or None when not set.
    related_udeb_package_version: Optional[str]
    # Zero-argument callable returning (package, file system root) pairs.
    accessible_package_roots: Callable[[], Sequence[Tuple[BinaryPackage, VirtualPath]]]
def _initialize_plugin_under_test(
    plugin_metadata: DebputyPluginMetadata,
    load_debputy_plugin: bool = True,
) -> "InitializedPluginUnderTest":
    """Load the plugin described by the metadata into a fresh feature set

    :param plugin_metadata: Metadata for the plugin that is being tested.
    :param load_debputy_plugin: When true, debputy's own plugin is loaded into the
      same feature set before the plugin under test.
    :return: The test wrapper around the loaded plugin.
    """
    feature_set = PluginProvidedFeatureSet()
    variable_context = VariableContext(
        FSROOverlay.create_root_dir("debian", "debian"),
    )
    substitution = SubstitutionImpl(
        unresolvable_substitutions=frozenset(["SOURCE_DATE_EPOCH", "PACKAGE"]),
        variable_context=variable_context,
        plugin_feature_set=feature_set,
    )

    load_order = []
    if load_debputy_plugin:
        # Load debputy's own plugin first, so conflicts with debputy's plugin are
        # detected early
        load_order.append(
            plugin_metadata_for_debputys_own_plugin(initialize_debputy_features)
        )
    load_order.append(plugin_metadata)

    # Each provider is created and loaded before the next one is created.
    for metadata in load_order:
        provider = DebputyPluginInitializerProvider(
            metadata,
            feature_set,
            substitution,
        )
        provider.load_plugin()

    return InitializedPluginUnderTestImpl(
        plugin_metadata.plugin_name,
        feature_set,
        substitution,
    )
def _auto_load_plugin_from_filename(
    py_test_filename: str,
) -> "InitializedPluginUnderTest":
    """Locate and load the plugin that belongs to a given test file

    The plugin name is derived from the basename of the test file by removing
    `PLUGIN_TEST_SUFFIX` and replacing `_` with `-`. Where the plugin is loaded
    from is controlled by the `DEBPUTY_TEST_PLUGIN_LOCATION` environment variable
    (default: `uninstalled`).

    :param py_test_filename: Path to the Python test file.
    :return: The loaded plugin for testing
    :raises FileNotFoundError: In `uninstalled` mode, when neither `<plugin>.json`
      nor `<plugin>.json.in` exists next to the test file.
    :raises ValueError: When `DEBPUTY_TEST_PLUGIN_LOCATION` has an unsupported value.
    """
    dirname, basename = os.path.split(py_test_filename)
    plugin_name = PLUGIN_TEST_SUFFIX.sub("", basename).replace("_", "-")

    test_location = os.environ.get("DEBPUTY_TEST_PLUGIN_LOCATION", "uninstalled")
    if test_location == "uninstalled":
        json_desc_file = os.path.join(dirname, f"{plugin_name}.json")
        if "/" not in json_desc_file:
            json_desc_file = f"./{json_desc_file}"
        json_desc_file_in = f"{json_desc_file}.in"

        # Prefer the `.json` file; fall back to the `.json.in` template. The file
        # that was actually found is the one that must be loaded (previously, the
        # `.json.in` case attempted to load the non-existent `.json` file).
        for candidate in (json_desc_file, json_desc_file_in):
            if os.path.isfile(candidate):
                return _initialize_plugin_from_desc(candidate)
        raise FileNotFoundError(
            f"Cannot determine the plugin JSON metadata descriptor: Expected it to be"
            f" {json_desc_file} or {json_desc_file_in}"
        )

    if test_location == "installed":
        plugin_metadata = find_json_plugin([str(DEBPUTY_PLUGIN_ROOT_DIR)], plugin_name)
        return _initialize_plugin_under_test(plugin_metadata, load_debputy_plugin=True)

    raise ValueError(
        'Invalid or unsupported "DEBPUTY_TEST_PLUGIN_LOCATION" environment variable. It must be either'
        ' unset OR one of "installed", "uninstalled".'
    )
def initialize_plugin_under_test(
    *,
    plugin_desc_file: Optional[str] = None,
) -> "InitializedPluginUnderTest":
    """Load and initialize a plugin for testing it

    This method will load the plugin via its plugin description, which is what `debputy` does at
    run-time (in contrast to `initialize_plugin_under_test_preloaded`, which bypasses this concrete part
    of the flow).

    :param plugin_desc_file: The plugin description file (`.json`) that describes how to load the plugin.
      If omitted, `debputy` will attempt to locate the plugin description file based on the file name of
      the calling test. This works for "single-file" plugins, where the description file and the test
      are right next to each other.

      Note that the description file is *not* required to have a valid version at this stage (e.g.,
      "N/A" or "@PLACEHOLDER@" is fine). So you can still use this method if you substitute in the
      version during build after running the tests. To support this flow, the file name can also end
      with `.json.in` (instead of `.json`).
    :return: The loaded plugin for testing
    :raises RuntimeError: If `plugin_desc_file` is given while
      `DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS` is true.
    """
    if plugin_desc_file is not None:
        if DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS:
            raise RuntimeError(
                "Running the test against an installed plugin does not work when"
                " plugin_desc_file is provided. Please skip this test. You can "
                " import DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS and use that as"
                " conditional for this purpose."
            )
        return _initialize_plugin_from_desc(plugin_desc_file)

    # Index 1 is the frame of whoever called this function; the stack must be
    # inspected directly in this function for that index to be correct.
    caller_frame = inspect.stack()[1]
    return _auto_load_plugin_from_filename(caller_frame.filename)
197def _initialize_plugin_from_desc(
198 desc_file: str,
199) -> "InitializedPluginUnderTest":
200 if not desc_file.endswith((".json", ".json.in")): 200 ↛ 201line 200 didn't jump to line 201 because the condition on line 200 was never true
201 raise ValueError("The plugin file must end with .json or .json.in")
203 plugin_metadata = parse_json_plugin_desc(desc_file)
205 return _initialize_plugin_under_test(plugin_metadata, load_debputy_plugin=True)
def initialize_plugin_under_test_from_inline_json(
    plugin_name: str,
    json_content: str,
) -> "InitializedPluginUnderTest":
    """Load a plugin from a JSON description provided as a string

    :param plugin_name: Passed as the first argument to `parse_json_plugin_desc`.
    :param json_content: The JSON plugin description; it is encoded as UTF-8 and
      handed to the parser as an in-memory binary stream.
    :return: The loaded plugin for testing (debputy's own plugin is loaded as well).
    """
    encoded_json = json_content.encode("utf-8")
    with BytesIO(encoded_json) as fd:
        plugin_metadata = parse_json_plugin_desc(plugin_name, fd=fd)

    return _initialize_plugin_under_test(plugin_metadata, load_debputy_plugin=True)
def initialize_plugin_under_test_preloaded(
    api_compat_version: int,
    plugin_initializer: PluginInitializationEntryPoint,
    /,
    plugin_name: str = "plugin-under-test",
    load_debputy_plugin: bool = True,
    plugin_doc_path_resolver: Callable[
        [], Optional[Union[str, Traversable, Path]]
    ] = lambda: None,
) -> "InitializedPluginUnderTest":
    """Internal API: Initialize a plugin for testing without loading it from a file

    This method bypasses the standard loading mechanism, meaning you will not test that your plugin
    description file is correct. Notably, any feature provided via the JSON description file will
    **NOT** be visible for the test.

    This API is mostly useful for testing parts of debputy itself.

    :param api_compat_version: The API version the plugin was written for. Use the same version as the
      version from the entry point (the `v1` part of `debputy.plugins.v1.initialize` translates into `1`).
    :param plugin_initializer: The entry point of the plugin
    :param plugin_name: Normally, debputy would derive this from the entry point. In the test, it will
      use a test name and version. However, you can explicitly set it if you want the real name/version.
    :param load_debputy_plugin: Whether to load debputy's own plugin first. Doing so provides a more
      realistic test and enables the test to detect conflicts with debputy's own plugins (de facto making
      the plugin unloadable in practice if such a conflict is present). This option is mostly provided
      to enable debputy to use this method for self testing.
    :param plugin_doc_path_resolver: How to resolve the documentation (if relevant for the test). The
      default is to not load the documentation.
    :return: The loaded plugin for testing
    :raises RuntimeError: If `DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS` is true.
    """
    if DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS:
        raise RuntimeError(
            "Running the test against an installed plugin does not work when"
            " the plugin is preload. Please skip this test. You can "
            " import DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS and use that as"
            " conditional for this purpose."
        )

    # NOTE(review): `plugin_doc_path_resolver` is accepted but not used anywhere in
    # this function - confirm whether it should be forwarded to the metadata.
    return _initialize_plugin_under_test(
        DebputyPluginMetadata(
            plugin_name=plugin_name,
            api_compat_version=api_compat_version,
            plugin_initializer=plugin_initializer,
            plugin_loader=None,
            plugin_path="<loaded-via-test>",
        ),
        load_debputy_plugin=load_debputy_plugin,
    )
class _MockArchTable:
    """Stand-in for an architecture table where every query matches"""

    @staticmethod
    def matches_architecture(_a: str, _b: str) -> bool:
        # Both arguments are ignored; the answer is unconditionally True.
        return True


# Shared instance used wherever the test API needs to pass an architecture query
# table; the cast only affects static typing.
FAKE_DPKG_QUERY_TABLE = cast("DpkgArchTable", _MockArchTable())
# Only the instance above is kept; the class name is removed from the module.
del _MockArchTable
def package_metadata_context(
    *,
    host_arch: str = "amd64",
    package_fields: Optional[Dict[str, str]] = None,
    related_udeb_package_fields: Optional[Dict[str, str]] = None,
    binary_package_version: str = "1.0-1",
    related_udeb_package_version: Optional[str] = None,
    should_be_acted_on: bool = True,
    related_udeb_fs_root: Optional[VirtualPath] = None,
    accessible_package_roots: Sequence[Tuple[Mapping[str, str], VirtualPath]] = tuple(),
) -> PackageProcessingContext:
    """Create a `PackageProcessingContext` describing an emulated binary package

    :param host_arch: Architecture passed to `faked_arch_table`.
    :param package_fields: Extra/overriding control fields for the main package. The
      defaults are `Package: foo` and `Architecture: any`.
    :param related_udeb_package_fields: When given, a related udeb package is created
      from these fields. `Package` defaults to `<main package>-udeb`, `Architecture`
      to that of the main package and `Package-Type` to `udeb`.
    :param binary_package_version: Version of the main package.
    :param related_udeb_package_version: Version of the related udeb. When a udeb is
      created and this is omitted, it defaults to `binary_package_version`.
    :param should_be_acted_on: Passed on to the main `BinaryPackage`.
    :param related_udeb_fs_root: File system root of the related udeb. Requires
      `related_udeb_package_fields`.
    :param accessible_package_roots: Pairs of (control fields, file system root) for
      other packages that should be accessible. Each field mapping must have the
      `Package` and `Architecture` fields.
    :return: The package processing context
    :raises ValueError: If a member of `accessible_package_roots` lacks a mandatory
      field, if a package fails `package_cross_check_precheck` against the main
      package, or if `related_udeb_fs_root` is used without a udeb package.
    """
    process_table = faked_arch_table(host_arch)
    f = {
        "Package": "foo",
        "Architecture": "any",
    }
    if package_fields is not None:
        f.update(package_fields)

    bin_package = BinaryPackage(
        Deb822(f),
        process_table,
        FAKE_DPKG_QUERY_TABLE,
        is_main_package=True,
        should_be_acted_on=should_be_acted_on,
    )
    udeb_package = None
    if related_udeb_package_fields is not None:
        uf = dict(related_udeb_package_fields)
        uf.setdefault("Package", f'{f["Package"]}-udeb')
        uf.setdefault("Architecture", f["Architecture"])
        uf.setdefault("Package-Type", "udeb")
        udeb_package = BinaryPackage(
            Deb822(uf),
            process_table,
            FAKE_DPKG_QUERY_TABLE,
            is_main_package=False,
            should_be_acted_on=True,
        )
        if related_udeb_package_version is None:
            related_udeb_package_version = binary_package_version
    if accessible_package_roots:
        apr = []
        for fields, apr_fs_root in accessible_package_roots:
            apr_fields = Deb822(dict(fields))
            if "Package" not in apr_fields:
                raise ValueError(
                    "Missing mandatory Package field in member of accessible_package_roots"
                )
            if "Architecture" not in apr_fields:
                raise ValueError(
                    "Missing mandatory Architecture field in member of accessible_package_roots"
                )
            apr_package = BinaryPackage(
                apr_fields,
                process_table,
                FAKE_DPKG_QUERY_TABLE,
                is_main_package=False,
                should_be_acted_on=True,
            )
            r = package_cross_check_precheck(bin_package, apr_package)
            if not r[0]:
                raise ValueError(
                    f"{apr_package.name} would not be accessible for {bin_package.name}"
                )
            apr.append((apr_package, apr_fs_root))

        # NOTE(review): `related_udeb_fs_root` is only considered when
        # `accessible_package_roots` is non-empty - confirm that is intended.
        if related_udeb_fs_root is not None:
            if udeb_package is None:
                raise ValueError(
                    "related_udeb_package_fields must be given when related_udeb_fs_root is given"
                )
            r = package_cross_check_precheck(bin_package, udeb_package)
            if not r[0]:
                raise ValueError(
                    f"{udeb_package.name} would not be accessible for {bin_package.name}, so providing"
                    " related_udeb_fs_root is irrelevant"
                )
            # Every entry must be a (package, fs root) pair like the entries added
            # above; previously the bare package was appended here.
            apr.append((udeb_package, related_udeb_fs_root))
        apr = tuple(apr)
    else:
        apr = tuple()

    return PackageProcessingContextTestProvider(
        binary_package=bin_package,
        related_udeb_package=udeb_package,
        binary_package_version=binary_package_version,
        related_udeb_package_version=related_udeb_package_version,
        accessible_package_roots=lambda: apr,
    )
def manifest_variable_resolution_context(
    *,
    debian_dir: Optional[VirtualPath] = None,
) -> VariableContext:
    """Create a `VariableContext` for resolving manifest variables in tests

    :param debian_dir: The directory handed to `VariableContext`. When omitted, a new
      `FSRootDir()` is used.
    :return: The variable context
    """
    return VariableContext(FSRootDir() if debian_dir is None else debian_dir)
class MaintscriptAccessorTestProvider(MaintscriptAccessorProviderBase):
    """Maintscript accessor that records registered snippets instead of emitting them

    Snippets are stored in the container given at construction, grouped by the
    plugin source ID of this accessor.
    """

    __slots__ = ("_plugin_metadata", "_plugin_source_id", "_maintscript_container")

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        maintscript_container: Dict[str, List[RegisteredMaintscript]],
    ):
        self._plugin_metadata = plugin_metadata
        self._plugin_source_id = plugin_source_id
        # Shared with the creator; it is mutated in place by `_append_script`.
        self._maintscript_container = maintscript_container

    @classmethod
    def _apply_condition_to_script(
        cls, condition: str, run_snippet: str, /, indent: Optional[bool] = None
    ) -> str:
        # The condition and indent are ignored; the snippet is returned unchanged.
        return run_snippet

    def _append_script(
        self,
        caller_name: str,
        maintscript: Maintscript,
        full_script: str,
        /,
        perform_substitution: bool = True,
    ) -> None:
        """Record the snippet under this accessor's plugin source ID"""
        recorded = self._maintscript_container.setdefault(self._plugin_source_id, [])
        recorded.append(
            RegisteredMaintscript(
                maintscript,
                caller_name,
                full_script,
                perform_substitution,
            )
        )
class RegisteredMetadataImpl(RegisteredMetadata):
    """Container exposing the substvars, triggers and maintscripts given at construction"""

    __slots__ = (
        "_substvars",
        "_triggers",
        "_maintscripts",
    )

    def __init__(
        self,
        substvars: Substvars,
        triggers: List[RegisteredTrigger],
        maintscripts: List[RegisteredMaintscript],
    ) -> None:
        self._substvars = substvars
        self._triggers = triggers
        self._maintscripts = maintscripts

    @property
    def substvars(self) -> Substvars:
        return self._substvars

    @property
    def triggers(self) -> List[RegisteredTrigger]:
        return self._triggers

    def maintscripts(
        self,
        *,
        maintscript: Optional[Maintscript] = None,
    ) -> List[RegisteredMaintscript]:
        """Return the registered maintscript snippets

        :param maintscript: When given, only snippets for this maintscript are returned
          (as a new list). When omitted, the internal list is returned as-is.
        """
        selected = self._maintscripts
        if maintscript is not None:
            selected = [m for m in selected if m.maintscript == maintscript]
        return selected
class BinaryCtrlAccessorTestProvider(BinaryCtrlAccessorProviderBase):
    """Binary control accessor for tests that can report what has been registered"""

    __slots__ = ("_maintscript_container",)

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        context: PackageProcessingContext,
    ) -> None:
        # The base class is given an empty dict, fresh substvars and a pair of
        # `None` values for the arguments that the test API does not supply.
        super().__init__(
            plugin_metadata,
            plugin_source_id,
            context,
            {},
            FlushableSubstvars(),
            (None, None),
        )
        self._maintscript_container: Dict[str, List[RegisteredMaintscript]] = {}

    def _create_maintscript_accessor(self) -> MaintscriptAccessor:
        # The accessor writes into the container owned by this instance.
        return MaintscriptAccessorTestProvider(
            self._plugin_metadata,
            self._plugin_source_id,
            self._maintscript_container,
        )

    def registered_metadata(self) -> RegisteredMetadata:
        """Return the substvars plus the triggers and maintscripts of this source ID"""
        source_id = self._plugin_source_id
        substvars = self._substvars
        own_triggers = [
            RegisteredTrigger.from_plugin_provided_trigger(trigger)
            for trigger in self._triggers.values()
            if trigger.provider_source_id == source_id
        ]
        own_maintscripts = self._maintscript_container.get(source_id, [])
        return RegisteredMetadataImpl(substvars, own_triggers, own_maintscripts)
class ServiceRegistryTestImpl(ServiceRegistry[DSD]):
    """Service registry that appends every registered service to a caller-provided list"""

    __slots__ = ("_service_manager_details", "_service_definitions")

    def __init__(
        self,
        service_manager_details: ServiceManagerDetails,
        detected_services: List[DetectedService[DSD]],
    ) -> None:
        self._service_manager_details = service_manager_details
        # The caller's list is kept by reference and mutated by `register_service`.
        self._service_definitions = detected_services

    def register_service(
        self,
        path: VirtualPath,
        name: Union[str, List[str]],
        *,
        type_of_service: str = "service",  # "timer", etc.
        service_scope: str = "system",
        enable_by_default: bool = True,
        start_by_default: bool = True,
        default_upgrade_rule: ServiceUpgradeRule = "restart",
        service_context: Optional[DSD] = None,
    ) -> None:
        """Record a detected service

        :param path: The path of the service.
        :param name: A single name or a list of names for the service.
        :raises ValueError: If `name` is an empty list.
        """
        if isinstance(name, list):
            names = name
        else:
            names = [name]
        if not names:
            raise ValueError(
                f"The service must have at least one name - {path.absolute} did not have any"
            )
        detected = DetectedService(
            path,
            names,
            type_of_service,
            service_scope,
            enable_by_default,
            start_by_default,
            default_upgrade_rule,
            service_context,
        )
        self._service_definitions.append(detected)
@contextlib.contextmanager
def _read_only_fs_root(fs_root: VirtualPath) -> Iterator[VirtualPath]:
    """Temporarily make the file system root read-only

    If `fs_root` is read-write, it is switched to read-only for the duration of the
    `with` block and switched back to read-write afterwards - also when the block
    raises. A root that is already read-only is yielded untouched.

    :param fs_root: The file system root; must be a `FSRootDir` when it is read-write.
    """
    if not fs_root.is_read_write:
        yield fs_root
        return

    assert isinstance(fs_root, FSRootDir)
    fs_root.is_read_write = False
    try:
        yield fs_root
    finally:
        # Without the `finally`, an exception in the `with` body would leave the
        # caller's root permanently read-only.
        fs_root.is_read_write = True
class InitializedPluginUnderTestImpl(InitializedPluginUnderTest):
    """Test wrapper around a plugin that has been loaded into a feature set

    All look-ups are restricted to features registered under the plugin's own name.
    """

    def __init__(
        self,
        plugin_name: str,
        feature_set: PluginProvidedFeatureSet,
        substitution: SubstitutionImpl,
    ) -> None:
        self._feature_set = feature_set
        self._plugin_name = plugin_name
        # Lazily computed by `packager_provided_files_by_stem`.
        self._packager_provided_files: Optional[
            Dict[str, RegisteredPackagerProvidedFile]
        ] = None
        self._triggers: Dict[Tuple[DpkgTriggerType, str], PluginProvidedTrigger] = {}
        self._maintscript_container: Dict[str, List[RegisteredMaintscript]] = {}
        self._substitution = substitution
        assert plugin_name in self._feature_set.plugin_data

    @property
    def _plugin_metadata(self) -> DebputyPluginMetadata:
        return self._feature_set.plugin_data[self._plugin_name]

    def packager_provided_files_by_stem(
        self,
    ) -> Mapping[str, RegisteredPackagerProvidedFile]:
        """Return the packager provided files of this plugin, keyed by stem

        The mapping is computed on the first call and cached afterwards.
        """
        cached = self._packager_provided_files
        if cached is not None:
            return cached

        by_stem: Dict[str, RegisteredPackagerProvidedFile] = {}
        for spec in self._feature_set.packager_provided_files.values():
            if spec.debputy_plugin_metadata.plugin_name == self._plugin_name:
                # Registered as a virtual subclass, so this should always be True
                assert isinstance(spec, RegisteredPackagerProvidedFile)
                by_stem[spec.stem] = spec
        self._packager_provided_files = by_stem
        return by_stem

    def run_metadata_detector(
        self,
        metadata_detector_id: str,
        fs_root: VirtualPath,
        context: Optional[PackageProcessingContext] = None,
    ) -> RegisteredMetadata:
        """Run one of the plugin's metadata detectors and return what it registered

        :param metadata_detector_id: ID of the detector to run.
        :param fs_root: The file system root; it is read-only while the detector runs.
        :param context: The package context; defaults to `package_metadata_context()`.
        :raises ValueError: If `fs_root` is not a root, the detector is unknown or it
          does not apply to the package of the context.
        """
        if fs_root.parent_dir is not None:
            raise ValueError("Provided path must be the file system root.")
        plugin_detectors = self._feature_set.metadata_maintscript_detectors[
            self._plugin_name
        ]
        candidates = [
            candidate
            for candidate in plugin_detectors
            if candidate.detector_id == metadata_detector_id
        ]
        if len(candidates) != 1:
            assert not candidates
            raise ValueError(
                f"The plugin {self._plugin_name} did not provide a metadata detector with ID"
                f' "{metadata_detector_id}"'
            )
        if context is None:
            context = package_metadata_context()
        detector = candidates[0]
        if not detector.applies_to(context.binary_package):
            raise ValueError(
                f'The detector "{metadata_detector_id}" from {self._plugin_name} does not apply to the'
                " given package. Consider using `package_metadata_context()` to emulate a binary package"
                " with the correct specification. As an example: "
                '`package_metadata_context(package_fields={"Package-Type": "udeb"})` would emulate a udeb'
                " package."
            )

        ctrl = BinaryCtrlAccessorTestProvider(
            self._plugin_metadata,
            metadata_detector_id,
            context,
        )
        with _read_only_fs_root(fs_root) as ro_root:
            detector.run_detector(
                ro_root,
                ctrl,
                context,
            )
        return ctrl.registered_metadata()

    def run_package_processor(
        self,
        package_processor_id: str,
        fs_root: VirtualPath,
        context: Optional[PackageProcessingContext] = None,
    ) -> None:
        """Run one of the plugin's package processors on the given file system

        :param package_processor_id: ID of the package processor to run.
        :param fs_root: The file system root; it must be read-write.
        :param context: The package context; defaults to `package_metadata_context()`.
        :raises ValueError: If `fs_root` is not a root or is read-only, the processor
          is unknown or it does not apply to the package of the context.
        """
        if fs_root.parent_dir is not None:
            raise ValueError("Provided path must be the file system root.")
        package_processor = self._feature_set.all_package_processors.get(
            (self._plugin_name, package_processor_id)
        )
        if package_processor is None:
            raise ValueError(
                f"The plugin {self._plugin_name} did not provide a package processor with ID"
                f' "{package_processor_id}"'
            )
        if context is None:
            context = package_metadata_context()
        if not fs_root.is_read_write:
            raise ValueError(
                "The provided fs_root is read-only and it must be read-write for package processor"
            )
        if not package_processor.applies_to(context.binary_package):
            raise ValueError(
                f'The package processor "{package_processor_id}" from {self._plugin_name} does not apply'
                " to the given package. Consider using `package_metadata_context()` to emulate a binary"
                " package with the correct specification. As an example: "
                '`package_metadata_context(package_fields={"Package-Type": "udeb"})` would emulate a udeb'
                " package."
            )
        package_processor.run_package_processor(
            fs_root,
            None,
            context,
        )

    @property
    def declared_manifest_variables(self) -> FrozenSet[str]:
        """Names of the manifest variables registered by this plugin"""
        return frozenset(
            name
            for name, variable in self._feature_set.manifest_variables.items()
            if variable.plugin_metadata.plugin_name == self._plugin_name
        )

    def automatic_discard_rules_examples_with_issues(self) -> Sequence[ADRExampleIssue]:
        """Process the examples of the plugin's auto discard rules and report inconsistencies

        :return: One issue per example for which processing reported inconsistent
          paths. Directory paths are listed with a trailing slash.
        """
        issues = []
        for adr in self._feature_set.auto_discard_rules.values():
            if adr.plugin_metadata.plugin_name == self._plugin_name:
                for idx, example in enumerate(adr.examples):
                    result = process_discard_rule_example(
                        adr,
                        example,
                    )
                    if not result.inconsistent_paths:
                        continue
                    inconsistent = [
                        x.absolute + ("/" if x.is_dir else "")
                        for x in result.inconsistent_paths
                    ]
                    issues.append(ADRExampleIssue(adr.name, idx, inconsistent))
        return issues

    def run_service_detection_and_integrations(
        self,
        service_manager: str,
        fs_root: VirtualPath,
        context: Optional[PackageProcessingContext] = None,
        *,
        service_context_type_hint: Optional[Type[DSD]] = None,
    ) -> Tuple[List[DetectedService[DSD]], RegisteredMetadata]:
        """Run the service detector and, if services were found, the integrator

        :param service_manager: Name of a service manager provided by this plugin.
        :param fs_root: The file system root to scan.
        :param context: The package context; defaults to `package_metadata_context()`.
        :param service_context_type_hint: Not used at run-time.
        :return: The detected services and the metadata registered by the integrator.
        :raises ValueError: If `fs_root` is not a root or the plugin does not provide
          the requested service manager.
        """
        if fs_root.parent_dir is not None:
            raise ValueError("Provided path must be the file system root.")
        try:
            service_manager_details = self._feature_set.service_managers[
                service_manager
            ]
            # A service manager owned by another plugin is treated as unknown.
            if service_manager_details.plugin_metadata.plugin_name != self._plugin_name:
                raise KeyError(service_manager)
        except KeyError:
            raise ValueError(
                f"The plugin {self._plugin_name} does not provide a"
                f" service manager called {service_manager}"
            ) from None

        if context is None:
            context = package_metadata_context()
        detected_services: List[DetectedService[DSD]] = []
        registry = ServiceRegistryTestImpl(service_manager_details, detected_services)
        service_manager_details.service_detector(
            fs_root,
            registry,
            context,
        )
        ctrl = BinaryCtrlAccessorTestProvider(
            self._plugin_metadata,
            service_manager_details.service_manager,
            context,
        )
        if not detected_services:
            # Nothing detected; the integrator is not invoked.
            return detected_services, ctrl.registered_metadata()

        service_definitions = [
            ServiceDefinitionImpl(
                ds.names[0],
                ds.names,
                ds.path,
                ds.type_of_service,
                ds.service_scope,
                ds.enable_by_default,
                ds.start_by_default,
                ds.default_upgrade_rule,
                self._plugin_name,
                True,
                ds.service_context,
            )
            for ds in detected_services
        ]
        service_manager_details.service_integrator(
            service_definitions,
            ctrl,
            context,
        )
        return detected_services, ctrl.registered_metadata()

    def manifest_variables(
        self,
        *,
        resolution_context: Optional[VariableContext] = None,
        mocked_variables: Optional[Mapping[str, str]] = None,
    ) -> Mapping[str, str]:
        """Return a lazily resolving mapping of the plugin's manifest variables

        :param resolution_context: Context used for resolving the variables; defaults
          to `manifest_variable_resolution_context()`.
        :param mocked_variables: Passed as `extra_substitutions` to the substitution.
        """
        valid_manifest_variables = self.declared_manifest_variables
        if resolution_context is None:
            resolution_context = manifest_variable_resolution_context()
        substitution = self._substitution.copy_for_subst_test(
            self._feature_set,
            resolution_context,
            extra_substitutions=mocked_variables,
        )
        return SubstitutionTable(valid_manifest_variables, substitution)
class SubstitutionTable(Mapping[str, str]):
    """Read-only mapping that resolves manifest variables on look-up

    The keys are a fixed set of variable names; each value is computed via the
    substitution whenever it is requested.
    """

    def __init__(
        self, valid_manifest_variables: FrozenSet[str], substitution: Substitution
    ) -> None:
        self._valid_manifest_variables = valid_manifest_variables
        # Names that have been successfully resolved via `__getitem__`.
        self._resolved: Set[str] = set()
        self._substitution = substitution

    def __contains__(self, item: object) -> bool:
        return item in self._valid_manifest_variables

    def __getitem__(self, key: str) -> str:
        if key not in self._valid_manifest_variables:
            raise KeyError(key)
        template = "{{" + key + "}}"
        resolved_value = self._substitution.substitute(
            template, f"test of manifest variable `{key}`"
        )
        # Only recorded once the substitution has succeeded.
        self._resolved.add(key)
        return resolved_value

    def __len__(self) -> int:
        return len(self._valid_manifest_variables)

    def __iter__(self) -> Iterator[str]:
        return iter(self._valid_manifest_variables)

    def keys(self) -> KeysView[str]:
        # The frozenset of names is returned directly; the cast only affects
        # static typing.
        return cast("KeysView[str]", self._valid_manifest_variables)