Coverage for src/debputy/plugin/api/test_api/test_impl.py: 81%
298 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-09-07 09:27 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-09-07 09:27 +0000
1import contextlib
2import dataclasses
3import inspect
4import os.path
5from importlib.resources.abc import Traversable
6from io import BytesIO
7from pathlib import Path
8from typing import (
9 Mapping,
10 Dict,
11 Optional,
12 Tuple,
13 List,
14 cast,
15 FrozenSet,
16 Sequence,
17 Union,
18 Type,
19 Iterator,
20 Set,
21 KeysView,
22 Callable,
23)
25from debian.deb822 import Deb822
26from debian.substvars import Substvars
28from debputy import DEBPUTY_PLUGIN_ROOT_DIR
29from debputy.architecture_support import faked_arch_table
30from debputy.filesystem_scan import FSROOverlay, FSRootDir
31from debputy.packages import BinaryPackage
32from debputy.plugin.api import (
33 PluginInitializationEntryPoint,
34 VirtualPath,
35 PackageProcessingContext,
36 DpkgTriggerType,
37 Maintscript,
38)
39from debputy.plugin.api.example_processing import process_discard_rule_example
40from debputy.plugin.api.impl import (
41 plugin_metadata_for_debputys_own_plugin,
42 DebputyPluginInitializerProvider,
43 parse_json_plugin_desc,
44 MaintscriptAccessorProviderBase,
45 BinaryCtrlAccessorProviderBase,
46 PLUGIN_TEST_SUFFIX,
47 find_json_plugin,
48 ServiceDefinitionImpl,
49)
50from debputy.plugin.api.impl_types import (
51 PackagerProvidedFileClassSpec,
52 DebputyPluginMetadata,
53 PluginProvidedTrigger,
54 ServiceManagerDetails,
55)
56from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
57from debputy.plugin.api.spec import (
58 MaintscriptAccessor,
59 FlushableSubstvars,
60 ServiceRegistry,
61 DSD,
62 ServiceUpgradeRule,
63)
64from debputy.plugin.api.test_api.test_spec import (
65 InitializedPluginUnderTest,
66 RegisteredPackagerProvidedFile,
67 RegisteredTrigger,
68 RegisteredMaintscript,
69 DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS,
70 ADRExampleIssue,
71 DetectedService,
72 RegisteredMetadata,
73)
74from debputy.plugins.debputy.debputy_plugin import initialize_debputy_features
75from debputy.substitution import SubstitutionImpl, VariableContext, Substitution
76from debputy.util import package_cross_check_precheck
# Register the concrete spec class as a virtual subclass of the public test API
# type, so `isinstance(spec, RegisteredPackagerProvidedFile)` checks succeed for
# the specs stored in the feature set (see `packager_provided_files_by_stem`).
RegisteredPackagerProvidedFile.register(PackagerProvidedFileClassSpec)
@dataclasses.dataclass(frozen=True, slots=True)
class PackageProcessingContextTestProvider(PackageProcessingContext):
    """Immutable `PackageProcessingContext` used by the test API

    Instances are created by `package_metadata_context`; the fields simply hold
    the values computed there.
    """

    # The (emulated) binary package being processed.
    binary_package: BinaryPackage
    # Version string of `binary_package`.
    binary_package_version: str
    # The udeb related to `binary_package`, or None when no udeb is emulated.
    related_udeb_package: Optional[BinaryPackage]
    # Version of the related udeb; may be None.
    related_udeb_package_version: Optional[str]
    # Zero-argument callable returning the (package, file system root) pairs
    # that are accessible from `binary_package`.
    accessible_package_roots: Callable[[], Sequence[Tuple[BinaryPackage, VirtualPath]]]
def _initialize_plugin_under_test(
    plugin_metadata: DebputyPluginMetadata,
    load_debputy_plugin: bool = True,
) -> "InitializedPluginUnderTest":
    """Load the plugin described by `plugin_metadata` into a fresh feature set

    :param plugin_metadata: Metadata of the plugin that should be loaded for the test.
    :param load_debputy_plugin: When true, debputy's own plugin is loaded into the same
      feature set before the plugin under test.
    :return: The test handle for the loaded plugin
    """
    feature_set = PluginProvidedFeatureSet()
    variable_context = VariableContext(
        FSROOverlay.create_root_dir("debian", "debian"),
    )
    substitution = SubstitutionImpl(
        unresolvable_substitutions=frozenset(["SOURCE_DATE_EPOCH", "PACKAGE"]),
        variable_context=variable_context,
        plugin_feature_set=feature_set,
    )

    load_order: List[DebputyPluginMetadata] = []
    if load_debputy_plugin:
        # debputy's own plugin goes first, so conflicts with it are detected early
        load_order.append(
            plugin_metadata_for_debputys_own_plugin(initialize_debputy_features)
        )
    load_order.append(plugin_metadata)

    for metadata in load_order:
        provider = DebputyPluginInitializerProvider(
            metadata,
            feature_set,
            substitution,
        )
        provider.load_plugin()

    return InitializedPluginUnderTestImpl(
        plugin_metadata.plugin_name,
        feature_set,
        substitution,
    )
def _auto_load_plugin_from_filename(
    py_test_filename: str,
) -> "InitializedPluginUnderTest":
    """Locate and load the plugin that belongs to a given test file

    The plugin name is derived from the basename of the test file by removing the
    `PLUGIN_TEST_SUFFIX` match and replacing `_` with `-`. The environment variable
    `DEBPUTY_TEST_PLUGIN_LOCATION` (default: "uninstalled") decides where the plugin
    is loaded from:

     * "uninstalled": `<plugin-name>.json` (or, failing that, `<plugin-name>.json.in`)
       next to the test file.
     * "installed": looked up via `find_json_plugin` in `DEBPUTY_PLUGIN_ROOT_DIR`.

    :param py_test_filename: Path to the Python test file that requested the plugin.
    :return: The loaded plugin for testing
    :raises FileNotFoundError: If neither descriptor file exists in "uninstalled" mode.
    :raises ValueError: If `DEBPUTY_TEST_PLUGIN_LOCATION` has an unsupported value.
    """
    dirname, basename = os.path.split(py_test_filename)
    plugin_name = PLUGIN_TEST_SUFFIX.sub("", basename).replace("_", "-")

    test_location = os.environ.get("DEBPUTY_TEST_PLUGIN_LOCATION", "uninstalled")
    if test_location == "uninstalled":
        json_basename = f"{plugin_name}.json"
        json_desc_file = os.path.join(dirname, json_basename)
        if "/" not in json_desc_file:
            json_desc_file = f"./{json_desc_file}"

        if os.path.isfile(json_desc_file):
            return _initialize_plugin_from_desc(json_desc_file)

        json_desc_file_in = f"{json_desc_file}.in"
        if os.path.isfile(json_desc_file_in):
            # Load the `.json.in` file that was just found. Passing the plain
            # `.json` path here would point the loader at a file we know is absent.
            return _initialize_plugin_from_desc(json_desc_file_in)
        raise FileNotFoundError(
            f"Cannot determine the plugin JSON metadata descriptor: Expected it to be"
            f" {json_desc_file} or {json_desc_file_in}"
        )

    if test_location == "installed":
        plugin_metadata = find_json_plugin([str(DEBPUTY_PLUGIN_ROOT_DIR)], plugin_name)
        return _initialize_plugin_under_test(plugin_metadata, load_debputy_plugin=True)

    raise ValueError(
        'Invalid or unsupported "DEBPUTY_TEST_PLUGIN_LOCATION" environment variable. It must be either'
        ' unset OR one of "installed", "uninstalled".'
    )
def initialize_plugin_under_test(
    *,
    plugin_desc_file: Optional[str] = None,
) -> "InitializedPluginUnderTest":
    """Load and initialize a plugin for testing it

    The plugin is loaded via its plugin description file, which mirrors what `debputy`
    does at run-time (unlike `initialize_plugin_under_test_preloaded`, which skips that
    part of the flow).

    :param plugin_desc_file: The plugin description file (`.json`) that describes how to
      load the plugin. If omitted, `debputy` will attempt to locate the plugin description
      file based on the test itself. This works for "single-file" plugins, where the
      description file and the test are right next to each other.
    :return: The loaded plugin for testing
    :raises RuntimeError: If `plugin_desc_file` is given while the tests run against
      installed plugins.
    """
    if plugin_desc_file is not None:
        if DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS:
            raise RuntimeError(
                "Running the test against an installed plugin does not work when"
                " plugin_desc_file is provided. Please skip this test. You can "
                " import DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS and use that as"
                " conditional for this purpose."
            )
        return _initialize_plugin_from_desc(plugin_desc_file)

    # Frame 0 is this function; frame 1 is the test module that called us.
    caller_frame_info = inspect.stack()[1]
    return _auto_load_plugin_from_filename(caller_frame_info.filename)
def _initialize_plugin_from_desc(
    desc_file: str,
) -> "InitializedPluginUnderTest":
    """Parse a plugin description file and load the plugin it describes

    :param desc_file: Path to the description file; must end with `.json` or `.json.in`.
    :return: The loaded plugin for testing (debputy's own plugin is loaded as well)
    :raises ValueError: If `desc_file` does not have one of the accepted suffixes.
    """
    accepted_suffixes = (".json", ".json.in")
    has_accepted_suffix = desc_file.endswith(accepted_suffixes)
    if not has_accepted_suffix:
        raise ValueError("The plugin file must end with .json or .json.in")

    return _initialize_plugin_under_test(
        parse_json_plugin_desc(desc_file),
        load_debputy_plugin=True,
    )
def initialize_plugin_under_test_from_inline_json(
    plugin_name: str,
    json_content: str,
) -> "InitializedPluginUnderTest":
    """Load a plugin whose JSON description is given as a string

    :param plugin_name: Passed to `parse_json_plugin_desc` in place of a file path.
    :param json_content: The JSON plugin description; it is encoded as UTF-8 and parsed
      from an in-memory stream.
    :return: The loaded plugin for testing (debputy's own plugin is loaded as well)
    """
    encoded_description = json_content.encode("utf-8")
    with BytesIO(encoded_description) as json_stream:
        metadata = parse_json_plugin_desc(plugin_name, fd=json_stream)

    return _initialize_plugin_under_test(metadata, load_debputy_plugin=True)
def initialize_plugin_under_test_preloaded(
    api_compat_version: int,
    plugin_initializer: PluginInitializationEntryPoint,
    /,
    plugin_name: str = "plugin-under-test",
    load_debputy_plugin: bool = True,
    plugin_doc_path_resolver: Callable[
        [], Optional[Union[str, Traversable, Path]]
    ] = lambda: None,
) -> "InitializedPluginUnderTest":
    """Internal API: Initialize a plugin for testing without loading it from a file

    The standard loading mechanism is skipped entirely, so this does not verify that the
    plugin description file is correct. In particular, features provided via the JSON
    description file will **NOT** be visible for the test.

    This API is mostly useful for testing parts of debputy itself.

    :param api_compat_version: The API version the plugin was written for. Use the same
      version as the version from the entry point (The `v1` part of
      `debputy.plugins.v1.initialize` translate into `1`).
    :param plugin_initializer: The entry point of the plugin
    :param plugin_name: Normally, debputy would derive this from the entry point. In the
      test, it will use a test name and version. However, you can explicitly set if you
      want the real name/version.
    :param load_debputy_plugin: Whether to load debputy's own plugin first. Doing so
      provides a more realistic test and enables the test to detect conflicts with
      debputy's own plugins (de facto making the plugin unloadable in practice if such a
      conflict is present). This option is mostly provided to enable debputy to use this
      method for self testing.
    :param plugin_doc_path_resolver: How to resolve the documentation (if relevant for
      the test). The default is to not load the documentation.
    :return: The loaded plugin for testing
    :raises RuntimeError: If the tests are being run against installed plugins.
    """
    if DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS:
        raise RuntimeError(
            "Running the test against an installed plugin does not work when"
            " the plugin is preload. Please skip this test. You can "
            " import DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS and use that as"
            " conditional for this purpose."
        )

    # There is no file behind this plugin, hence no loader and a placeholder path.
    preloaded_metadata = DebputyPluginMetadata(
        plugin_name=plugin_name,
        api_compat_version=api_compat_version,
        plugin_initializer=plugin_initializer,
        plugin_loader=None,
        plugin_path="<loaded-via-test>",
        plugin_doc_path_resolver=plugin_doc_path_resolver,
    )
    return _initialize_plugin_under_test(
        preloaded_metadata,
        load_debputy_plugin=load_debputy_plugin,
    )
class _MockArchTable:
    """Stand-in architecture table for which every architecture query matches"""

    @staticmethod
    def matches_architecture(_a: str, _b: str) -> bool:
        """Report a match regardless of the two arguments"""
        return True


# Shared instance handed to `BinaryPackage` wherever a "DpkgArchTable" is expected.
# The cast only affects type checkers; the object remains a `_MockArchTable`.
FAKE_DPKG_QUERY_TABLE = cast("DpkgArchTable", _MockArchTable())
# Only the instance above is meant to be used; drop the class name from the module.
del _MockArchTable
def package_metadata_context(
    *,
    host_arch: str = "amd64",
    package_fields: Optional[Dict[str, str]] = None,
    related_udeb_package_fields: Optional[Dict[str, str]] = None,
    binary_package_version: str = "1.0-1",
    related_udeb_package_version: Optional[str] = None,
    should_be_acted_on: bool = True,
    related_udeb_fs_root: Optional[VirtualPath] = None,
    accessible_package_roots: Sequence[Tuple[Mapping[str, str], VirtualPath]] = tuple(),
) -> PackageProcessingContext:
    """Create a `PackageProcessingContext` emulating a binary package for tests

    :param host_arch: Host architecture used for the faked architecture table.
    :param package_fields: Fields for the emulated binary package. They are applied on
      top of the defaults `Package: foo` and `Architecture: any`.
    :param related_udeb_package_fields: If given, a related udeb is emulated as well.
      `Package` defaults to `<main package>-udeb`, `Architecture` to that of the main
      package and `Package-Type` to `udeb`.
    :param binary_package_version: Version of the emulated binary package.
    :param related_udeb_package_version: Version of the related udeb. Defaults to
      `binary_package_version` when a related udeb is emulated.
    :param should_be_acted_on: Passed on to the emulated binary package.
    :param related_udeb_fs_root: File system root of the related udeb. It is only
      considered when `accessible_package_roots` is non-empty.
    :param accessible_package_roots: Pairs of package fields and file system root for
      other packages that should be accessible from the emulated binary package. Each
      member must have the `Package` and `Architecture` fields.
    :return: The context for the emulated package
    :raises ValueError: If a member of `accessible_package_roots` lacks a mandatory field
      or is rejected by `package_cross_check_precheck`, or if `related_udeb_fs_root` is
      given without a usable related udeb.
    """
    process_table = faked_arch_table(host_arch)
    f = {
        "Package": "foo",
        "Architecture": "any",
    }
    if package_fields is not None:
        f.update(package_fields)

    bin_package = BinaryPackage(
        Deb822(f),
        process_table,
        FAKE_DPKG_QUERY_TABLE,
        is_main_package=True,
        should_be_acted_on=should_be_acted_on,
    )
    udeb_package = None
    if related_udeb_package_fields is not None:
        uf = dict(related_udeb_package_fields)
        uf.setdefault("Package", f'{f["Package"]}-udeb')
        uf.setdefault("Architecture", f["Architecture"])
        uf.setdefault("Package-Type", "udeb")
        udeb_package = BinaryPackage(
            Deb822(uf),
            process_table,
            FAKE_DPKG_QUERY_TABLE,
            is_main_package=False,
            should_be_acted_on=True,
        )
        if related_udeb_package_version is None:
            related_udeb_package_version = binary_package_version

    roots: Tuple[Tuple[BinaryPackage, VirtualPath], ...]
    if accessible_package_roots:
        collected_roots: List[Tuple[BinaryPackage, VirtualPath]] = []
        for fields, apr_fs_root in accessible_package_roots:
            apr_fields = Deb822(dict(fields))
            if "Package" not in apr_fields:
                raise ValueError(
                    "Missing mandatory Package field in member of accessible_package_roots"
                )
            if "Architecture" not in apr_fields:
                raise ValueError(
                    "Missing mandatory Architecture field in member of accessible_package_roots"
                )
            apr_package = BinaryPackage(
                apr_fields,
                process_table,
                FAKE_DPKG_QUERY_TABLE,
                is_main_package=False,
                should_be_acted_on=True,
            )
            r = package_cross_check_precheck(bin_package, apr_package)
            if not r[0]:
                raise ValueError(
                    f"{apr_package.name} would not be accessible for {bin_package.name}"
                )
            collected_roots.append((apr_package, apr_fs_root))

        # NOTE(review): the udeb root is only handled when accessible_package_roots is
        # non-empty; this mirrors the existing behaviour - confirm whether it is intended.
        if related_udeb_fs_root is not None:
            if udeb_package is None:
                raise ValueError(
                    "related_udeb_package_fields must be given when related_udeb_fs_root is given"
                )
            r = package_cross_check_precheck(bin_package, udeb_package)
            if not r[0]:
                raise ValueError(
                    f"{udeb_package.name} would not be accessible for {bin_package.name}, so providing"
                    " related_udeb_fs_root is irrelevant"
                )
            # Every entry must be a (package, fs_root) pair like the ones added in the
            # loop above; appending the bare package would break consumers that unpack.
            collected_roots.append((udeb_package, related_udeb_fs_root))
        roots = tuple(collected_roots)
    else:
        roots = tuple()

    return PackageProcessingContextTestProvider(
        binary_package=bin_package,
        related_udeb_package=udeb_package,
        binary_package_version=binary_package_version,
        related_udeb_package_version=related_udeb_package_version,
        accessible_package_roots=lambda: roots,
    )
def manifest_variable_resolution_context(
    *,
    debian_dir: Optional[VirtualPath] = None,
) -> VariableContext:
    """Create a `VariableContext` for resolving manifest variables in tests

    :param debian_dir: The directory to use as the `debian` directory. When omitted, a
      new `FSRootDir` is used instead.
    :return: The variable context wrapping the directory
    """
    effective_debian_dir = FSRootDir() if debian_dir is None else debian_dir
    return VariableContext(effective_debian_dir)
class MaintscriptAccessorTestProvider(MaintscriptAccessorProviderBase):
    """Maintscript accessor that records registered snippets rather than emitting them

    Snippets are stored in the container passed to the constructor, grouped by the
    plugin source ID of this accessor.
    """

    __slots__ = ("_plugin_metadata", "_plugin_source_id", "_maintscript_container")

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        maintscript_container: Dict[str, List[RegisteredMaintscript]],
    ):
        """Remember the plugin identity and the (shared) container to record into"""
        self._maintscript_container = maintscript_container
        self._plugin_source_id = plugin_source_id
        self._plugin_metadata = plugin_metadata

    @classmethod
    def _apply_condition_to_script(
        cls, condition: str, run_snippet: str, /, indent: Optional[bool] = None
    ) -> str:
        """Return `run_snippet` unchanged; the condition and indent are not applied"""
        return run_snippet

    def _append_script(
        self,
        caller_name: str,
        maintscript: Maintscript,
        full_script: str,
        /,
        perform_substitution: bool = True,
    ) -> None:
        """Record a snippet under this accessor's plugin source ID"""
        recorded = self._maintscript_container.setdefault(self._plugin_source_id, [])
        entry = RegisteredMaintscript(
            maintscript,
            caller_name,
            full_script,
            perform_substitution,
        )
        recorded.append(entry)
class RegisteredMetadataImpl(RegisteredMetadata):
    """Read-only view of the substvars, triggers and maintscripts registered in a test run"""

    __slots__ = (
        "_substvars",
        "_triggers",
        "_maintscripts",
    )

    def __init__(
        self,
        substvars: Substvars,
        triggers: List[RegisteredTrigger],
        maintscripts: List[RegisteredMaintscript],
    ) -> None:
        """Store the given collections as-is (no copies are made)"""
        self._maintscripts = maintscripts
        self._triggers = triggers
        self._substvars = substvars

    @property
    def substvars(self) -> Substvars:
        """The substvars given at construction"""
        return self._substvars

    @property
    def triggers(self) -> List[RegisteredTrigger]:
        """The triggers given at construction"""
        return self._triggers

    def maintscripts(
        self,
        *,
        maintscript: Optional[Maintscript] = None,
    ) -> List[RegisteredMaintscript]:
        """Return the registered maintscript snippets

        :param maintscript: If given, only snippets for this maintscript are returned
          (as a new list). Otherwise, the stored list itself is returned.
        :return: The matching snippets
        """
        if maintscript is not None:
            return [
                snippet
                for snippet in self._maintscripts
                if snippet.maintscript == maintscript
            ]
        return self._maintscripts
class BinaryCtrlAccessorTestProvider(BinaryCtrlAccessorProviderBase):
    """Control accessor for tests that can report what a plugin registered through it"""

    __slots__ = ("_maintscript_container",)

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        context: PackageProcessingContext,
    ) -> None:
        """Set up the base accessor with empty state

        :param plugin_metadata: Metadata of the plugin under test.
        :param plugin_source_id: ID of the feature (e.g. detector) being exercised.
        :param context: The package processing context of the test.
        """
        # NOTE(review): the last three arguments are presumably the initial trigger
        # table, the substvars and an (unset) pair used by the base class - confirm
        # against BinaryCtrlAccessorProviderBase.
        super().__init__(
            plugin_metadata,
            plugin_source_id,
            context,
            {},
            FlushableSubstvars(),
            (None, None),
        )
        self._maintscript_container: Dict[str, List[RegisteredMaintscript]] = {}

    def _create_maintscript_accessor(self) -> MaintscriptAccessor:
        """Create an accessor that records snippets into this provider's container"""
        accessor = MaintscriptAccessorTestProvider(
            self._plugin_metadata,
            self._plugin_source_id,
            self._maintscript_container,
        )
        return accessor

    def registered_metadata(self) -> RegisteredMetadata:
        """Summarise what was registered under this provider's plugin source ID

        Triggers from other providers are filtered out; if no maintscript snippet was
        recorded for this source ID, an empty list is reported.
        """
        source_id = self._plugin_source_id
        own_triggers = [
            RegisteredTrigger.from_plugin_provided_trigger(trigger)
            for trigger in self._triggers.values()
            if trigger.provider_source_id == source_id
        ]
        own_maintscripts = self._maintscript_container.get(source_id, [])
        return RegisteredMetadataImpl(
            self._substvars,
            own_triggers,
            own_maintscripts,
        )
class ServiceRegistryTestImpl(ServiceRegistry[DSD]):
    """Service registry that collects the registered services into a caller-owned list"""

    __slots__ = ("_service_manager_details", "_service_definitions")

    def __init__(
        self,
        service_manager_details: ServiceManagerDetails,
        detected_services: List[DetectedService[DSD]],
    ) -> None:
        """Keep a reference to the list that `register_service` appends to"""
        self._service_definitions = detected_services
        self._service_manager_details = service_manager_details

    def register_service(
        self,
        path: VirtualPath,
        name: Union[str, List[str]],
        *,
        type_of_service: str = "service",  # "timer", etc.
        service_scope: str = "system",
        enable_by_default: bool = True,
        start_by_default: bool = True,
        default_upgrade_rule: ServiceUpgradeRule = "restart",
        service_context: Optional[DSD] = None,
    ) -> None:
        """Record a detected service

        :param path: The path that defines the service.
        :param name: The name of the service, or a list of names for it.
        :raises ValueError: If `name` is an empty list.

        The remaining keyword parameters are stored on the `DetectedService` unchanged.
        """
        if isinstance(name, list):
            names = name
        else:
            names = [name]
        if not names:
            raise ValueError(
                f"The service must have at least one name - {path.absolute} did not have any"
            )
        detected = DetectedService(
            path,
            names,
            type_of_service,
            service_scope,
            enable_by_default,
            start_by_default,
            default_upgrade_rule,
            service_context,
        )
        self._service_definitions.append(detected)
@contextlib.contextmanager
def _read_only_fs_root(fs_root: VirtualPath) -> Iterator[VirtualPath]:
    """Temporarily make `fs_root` read-only for the duration of the `with` block

    A root that is already read-only is yielded unchanged. A read-write root has its
    `is_read_write` flag cleared while the block runs and set again afterwards, also
    when the block raises.

    :param fs_root: The file system root to protect against modification.
    :return: Context manager yielding the same `fs_root` object
    """
    if not fs_root.is_read_write:
        yield fs_root
        return

    assert isinstance(fs_root, FSRootDir)
    fs_root.is_read_write = False
    try:
        yield fs_root
    finally:
        # Restore write access even if the body failed; otherwise a failing detector
        # would leave the caller's file system root permanently read-only.
        fs_root.is_read_write = True
class InitializedPluginUnderTestImpl(InitializedPluginUnderTest):
    """Test handle for a loaded plugin

    All lookups go through the feature set the plugin was loaded into and are limited
    to the features registered by the plugin under test.
    """

    def __init__(
        self,
        plugin_name: str,
        feature_set: PluginProvidedFeatureSet,
        substitution: SubstitutionImpl,
    ) -> None:
        """Bind the handle to a plugin that has already been loaded into `feature_set`"""
        self._plugin_name = plugin_name
        self._feature_set = feature_set
        self._substitution = substitution
        # Computed on first use by `packager_provided_files_by_stem`
        self._packager_provided_files: Optional[
            Dict[str, RegisteredPackagerProvidedFile]
        ] = None
        self._triggers: Dict[Tuple[DpkgTriggerType, str], PluginProvidedTrigger] = {}
        self._maintscript_container: Dict[str, List[RegisteredMaintscript]] = {}
        assert plugin_name in self._feature_set.plugin_data

    @property
    def _plugin_metadata(self) -> DebputyPluginMetadata:
        """Metadata of the plugin under test as recorded in the feature set"""
        return self._feature_set.plugin_data[self._plugin_name]

    def packager_provided_files_by_stem(
        self,
    ) -> Mapping[str, RegisteredPackagerProvidedFile]:
        """Return the packager provided files of this plugin, keyed by stem

        The mapping is computed on the first call and reused afterwards.
        """
        cached = self._packager_provided_files
        if cached is not None:
            return cached

        by_stem: Dict[str, RegisteredPackagerProvidedFile] = {}
        for spec in self._feature_set.packager_provided_files.values():
            if spec.debputy_plugin_metadata.plugin_name == self._plugin_name:
                # Registered as a virtual subclass, so this should always be True
                assert isinstance(spec, RegisteredPackagerProvidedFile)
                by_stem[spec.stem] = spec
        self._packager_provided_files = by_stem
        return by_stem

    def run_metadata_detector(
        self,
        metadata_detector_id: str,
        fs_root: VirtualPath,
        context: Optional[PackageProcessingContext] = None,
    ) -> RegisteredMetadata:
        """Run one of the plugin's metadata detectors and report what it registered

        :param metadata_detector_id: ID of the detector to run.
        :param fs_root: Root of the file system to run the detector on. It is made
          read-only while the detector runs.
        :param context: The package context; defaults to `package_metadata_context()`.
        :return: The metadata registered by the detector
        :raises ValueError: If `fs_root` is not a root, the plugin has no detector with
          the given ID, or the detector does not apply to the package of `context`.
        """
        if fs_root.parent_dir is not None:
            raise ValueError("Provided path must be the file system root.")
        plugin_detectors = self._feature_set.metadata_maintscript_detectors[
            self._plugin_name
        ]
        candidates = [
            candidate
            for candidate in plugin_detectors
            if candidate.detector_id == metadata_detector_id
        ]
        if len(candidates) != 1:
            # More than one detector with the same ID is not expected
            assert not candidates
            raise ValueError(
                f"The plugin {self._plugin_name} did not provide a metadata detector with ID"
                f' "{metadata_detector_id}"'
            )
        if context is None:
            context = package_metadata_context()
        detector = candidates[0]
        if not detector.applies_to(context.binary_package):
            raise ValueError(
                f'The detector "{metadata_detector_id}" from {self._plugin_name} does not apply to the'
                " given package. Consider using `package_metadata_context()` to emulate a binary package"
                " with the correct specification. As an example: "
                '`package_metadata_context(package_fields={"Package-Type": "udeb"})` would emulate a udeb'
                " package."
            )

        ctrl = BinaryCtrlAccessorTestProvider(
            self._plugin_metadata,
            metadata_detector_id,
            context,
        )
        with _read_only_fs_root(fs_root) as ro_root:
            detector.run_detector(ro_root, ctrl, context)
        return ctrl.registered_metadata()

    def run_package_processor(
        self,
        package_processor_id: str,
        fs_root: VirtualPath,
        context: Optional[PackageProcessingContext] = None,
    ) -> None:
        """Run one of the plugin's package processors on `fs_root`

        :param package_processor_id: ID of the package processor to run.
        :param fs_root: Root of the file system to process; it must be read-write.
        :param context: The package context; defaults to `package_metadata_context()`.
        :raises ValueError: If `fs_root` is not a root or is read-only, the plugin has
          no package processor with the given ID, or the processor does not apply to the
          package of `context`.
        """
        if fs_root.parent_dir is not None:
            raise ValueError("Provided path must be the file system root.")
        package_processor = self._feature_set.all_package_processors.get(
            (self._plugin_name, package_processor_id)
        )
        if package_processor is None:
            raise ValueError(
                f"The plugin {self._plugin_name} did not provide a package processor with ID"
                f' "{package_processor_id}"'
            )
        if context is None:
            context = package_metadata_context()
        if not fs_root.is_read_write:
            raise ValueError(
                "The provided fs_root is read-only and it must be read-write for package processor"
            )
        if not package_processor.applies_to(context.binary_package):
            raise ValueError(
                f'The package processor "{package_processor_id}" from {self._plugin_name} does not apply'
                " to the given package. Consider using `package_metadata_context()` to emulate a binary"
                " package with the correct specification. As an example: "
                '`package_metadata_context(package_fields={"Package-Type": "udeb"})` would emulate a udeb'
                " package."
            )
        package_processor.run_package_processor(fs_root, None, context)

    @property
    def declared_manifest_variables(self) -> FrozenSet[str]:
        """Names of the manifest variables registered by the plugin under test"""
        own_plugin = self._plugin_name
        return frozenset(
            name
            for name, variable in self._feature_set.manifest_variables.items()
            if variable.plugin_metadata.plugin_name == own_plugin
        )

    def automatic_discard_rules_examples_with_issues(self) -> Sequence[ADRExampleIssue]:
        """Check the examples of the plugin's automatic discard rules

        :return: One issue per example for which `process_discard_rule_example` reported
          inconsistent paths. Directories are listed with a trailing slash.
        """
        issues: List[ADRExampleIssue] = []
        own_rules = (
            rule
            for rule in self._feature_set.auto_discard_rules.values()
            if rule.plugin_metadata.plugin_name == self._plugin_name
        )
        for rule in own_rules:
            for example_index, example in enumerate(rule.examples):
                outcome = process_discard_rule_example(rule, example)
                if not outcome.inconsistent_paths:
                    continue
                problem_paths = [
                    p.absolute + ("/" if p.is_dir else "")
                    for p in outcome.inconsistent_paths
                ]
                issues.append(
                    ADRExampleIssue(rule.name, example_index, problem_paths)
                )
        return issues

    def run_service_detection_and_integrations(
        self,
        service_manager: str,
        fs_root: VirtualPath,
        context: Optional[PackageProcessingContext] = None,
        *,
        service_context_type_hint: Optional[Type[DSD]] = None,
    ) -> Tuple[List[DetectedService[DSD]], RegisteredMetadata]:
        """Run the service detector and, if anything was found, the service integrator

        :param service_manager: Name of a service manager provided by this plugin.
        :param fs_root: Root of the file system to run the detection on.
        :param context: The package context; defaults to `package_metadata_context()`.
        :param service_context_type_hint: Not used at run-time by this implementation.
        :return: The detected services and the metadata registered by the integrator.
          The integrator is not called when no service was detected.
        :raises ValueError: If `fs_root` is not a root or the plugin does not provide
          the requested service manager.
        """
        if fs_root.parent_dir is not None:
            raise ValueError("Provided path must be the file system root.")

        try:
            service_manager_details = self._feature_set.service_managers[
                service_manager
            ]
        except KeyError:
            service_manager_details = None
        # A service manager provided by another plugin counts as "not provided"
        if (
            service_manager_details is None
            or service_manager_details.plugin_metadata.plugin_name
            != self._plugin_name
        ):
            raise ValueError(
                f"The plugin {self._plugin_name} does not provide a"
                f" service manager called {service_manager}"
            )

        if context is None:
            context = package_metadata_context()
        detected_services: List[DetectedService[DSD]] = []
        registry = ServiceRegistryTestImpl(service_manager_details, detected_services)
        service_manager_details.service_detector(fs_root, registry, context)
        ctrl = BinaryCtrlAccessorTestProvider(
            self._plugin_metadata,
            service_manager_details.service_manager,
            context,
        )
        if not detected_services:
            return detected_services, ctrl.registered_metadata()

        service_definitions = []
        for ds in detected_services:
            definition = ServiceDefinitionImpl(
                ds.names[0],
                ds.names,
                ds.path,
                ds.type_of_service,
                ds.service_scope,
                ds.enable_by_default,
                ds.start_by_default,
                ds.default_upgrade_rule,
                self._plugin_name,
                True,
                ds.service_context,
            )
            service_definitions.append(definition)
        service_manager_details.service_integrator(service_definitions, ctrl, context)
        return detected_services, ctrl.registered_metadata()

    def manifest_variables(
        self,
        *,
        resolution_context: Optional[VariableContext] = None,
        mocked_variables: Optional[Mapping[str, str]] = None,
    ) -> Mapping[str, str]:
        """Return a mapping that resolves the plugin's manifest variables on access

        :param resolution_context: Context to resolve the variables in; defaults to
          `manifest_variable_resolution_context()`.
        :param mocked_variables: Extra substitutions made available during resolution.
        :return: A mapping over the manifest variables registered by this plugin
        """
        valid_manifest_variables = self.declared_manifest_variables
        if resolution_context is None:
            resolution_context = manifest_variable_resolution_context()
        test_substitution = self._substitution.copy_for_subst_test(
            self._feature_set,
            resolution_context,
            extra_substitutions=mocked_variables,
        )
        return SubstitutionTable(valid_manifest_variables, test_substitution)
class SubstitutionTable(Mapping[str, str]):
    """Mapping view of a fixed set of manifest variables, resolved on lookup

    The keys are the variable names given at construction; a value is computed by
    substituting `{{NAME}}` through the given substitution each time it is looked up.
    """

    def __init__(
        self, valid_manifest_variables: FrozenSet[str], substitution: Substitution
    ) -> None:
        """Store the allowed variable names and the substitution used to resolve them"""
        self._substitution = substitution
        self._valid_manifest_variables = valid_manifest_variables
        # Names that have been looked up successfully so far
        self._resolved: Set[str] = set()

    def __contains__(self, item: object) -> bool:
        """Check membership against the allowed names only (nothing is resolved)"""
        return item in self._valid_manifest_variables

    def __getitem__(self, key: str) -> str:
        """Resolve the variable `key`

        :raises KeyError: If `key` is not one of the allowed variable names.
        """
        if key not in self._valid_manifest_variables:
            raise KeyError(key)
        template = "{{" + key + "}}"
        resolved_value = self._substitution.substitute(
            template, f"test of manifest variable `{key}`"
        )
        self._resolved.add(key)
        return resolved_value

    def __len__(self) -> int:
        """Number of allowed variable names"""
        return len(self._valid_manifest_variables)

    def __iter__(self) -> Iterator[str]:
        """Iterate over the allowed variable names"""
        return iter(self._valid_manifest_variables)

    def keys(self) -> KeysView[str]:
        """Return the allowed names; the underlying frozenset is returned as-is"""
        names = self._valid_manifest_variables
        return cast("KeysView[str]", names)