Coverage for src/debputy/plugin/api/test_api/test_impl.py: 81%
300 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
1import contextlib
2import dataclasses
3import inspect
4import os.path
5from importlib.resources.abc import Traversable
6from io import BytesIO
7from pathlib import Path
8from typing import (
9 Dict,
10 Optional,
11 Tuple,
12 List,
13 cast,
14 FrozenSet,
15 Union,
16 Type,
17 Set,
18)
19from collections.abc import Mapping, Sequence, Iterator, KeysView, Callable
21from debian.deb822 import Deb822
22from debian.debian_support import DpkgArchTable
23from debian.substvars import Substvars
25from debputy import DEBPUTY_PLUGIN_ROOT_DIR
26from debputy.architecture_support import faked_arch_table
27from debputy.filesystem_scan import FSROOverlay, FSRootDir
28from debputy.packages import BinaryPackage
29from debputy.plugin.api import (
30 PluginInitializationEntryPoint,
31 VirtualPath,
32 PackageProcessingContext,
33 DpkgTriggerType,
34 Maintscript,
35)
36from debputy.plugin.api.example_processing import process_discard_rule_example
37from debputy.plugin.api.impl import (
38 plugin_metadata_for_debputys_own_plugin,
39 DebputyPluginInitializerProvider,
40 parse_json_plugin_desc,
41 MaintscriptAccessorProviderBase,
42 BinaryCtrlAccessorProviderBase,
43 PLUGIN_TEST_SUFFIX,
44 find_json_plugin,
45 ServiceDefinitionImpl,
46)
47from debputy.plugin.api.impl_types import (
48 PackagerProvidedFileClassSpec,
49 DebputyPluginMetadata,
50 PluginProvidedTrigger,
51 ServiceManagerDetails,
52)
53from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
54from debputy.plugin.api.spec import (
55 MaintscriptAccessor,
56 FlushableSubstvars,
57 ServiceRegistry,
58 DSD,
59 ServiceUpgradeRule,
60)
61from debputy.plugin.api.test_api.test_spec import (
62 InitializedPluginUnderTest,
63 RegisteredPackagerProvidedFile,
64 RegisteredTrigger,
65 RegisteredMaintscript,
66 DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS,
67 ADRExampleIssue,
68 DetectedService,
69 RegisteredMetadata,
70)
71from debputy.plugins.debputy.debputy_plugin import initialize_debputy_features
72from debputy.substitution import SubstitutionImpl, VariableContext, Substitution
73from debputy.util import package_cross_check_precheck
75RegisteredPackagerProvidedFile.register(PackagerProvidedFileClassSpec)
78@dataclasses.dataclass(frozen=True, slots=True)
79class PackageProcessingContextTestProvider(PackageProcessingContext):
80 binary_package: BinaryPackage
81 binary_package_version: str
82 related_udeb_package: BinaryPackage | None
83 related_udeb_package_version: str | None
84 accessible_package_roots: Callable[[], Sequence[tuple[BinaryPackage, VirtualPath]]]
87def _initialize_plugin_under_test(
88 plugin_metadata: DebputyPluginMetadata,
89 load_debputy_plugin: bool = True,
90) -> "InitializedPluginUnderTest":
91 feature_set = PluginProvidedFeatureSet()
92 substitution = SubstitutionImpl(
93 unresolvable_substitutions=frozenset(["SOURCE_DATE_EPOCH", "PACKAGE"]),
94 variable_context=VariableContext(
95 FSROOverlay.create_root_dir("debian", "debian"),
96 ),
97 plugin_feature_set=feature_set,
98 )
100 if load_debputy_plugin:
101 debputy_plugin_metadata = plugin_metadata_for_debputys_own_plugin(
102 initialize_debputy_features
103 )
104 # Load debputy's own plugin first, so conflicts with debputy's plugin are detected early
105 debputy_provider = DebputyPluginInitializerProvider(
106 debputy_plugin_metadata,
107 feature_set,
108 substitution,
109 )
110 debputy_provider.load_plugin()
112 plugin_under_test_provider = DebputyPluginInitializerProvider(
113 plugin_metadata,
114 feature_set,
115 substitution,
116 )
117 plugin_under_test_provider.load_plugin()
119 return InitializedPluginUnderTestImpl(
120 plugin_metadata.plugin_name,
121 feature_set,
122 substitution,
123 )
126def _auto_load_plugin_from_filename(
127 py_test_filename: str,
128) -> "InitializedPluginUnderTest":
129 dirname, basename = os.path.split(py_test_filename)
130 plugin_name = PLUGIN_TEST_SUFFIX.sub("", basename).replace("_", "-")
132 test_location = os.environ.get("DEBPUTY_TEST_PLUGIN_LOCATION", "uninstalled")
133 if test_location == "uninstalled":
134 json_basename = f"{plugin_name}.json"
135 json_desc_file = os.path.join(dirname, json_basename)
136 if "/" not in json_desc_file: 136 ↛ 137line 136 didn't jump to line 137 because the condition on line 136 was never true
137 json_desc_file = f"./{json_desc_file}"
139 if os.path.isfile(json_desc_file): 139 ↛ 142line 139 didn't jump to line 142 because the condition on line 139 was always true
140 return _initialize_plugin_from_desc(json_desc_file)
142 json_desc_file_in = f"{json_desc_file}.in"
143 if os.path.isfile(json_desc_file_in):
144 return _initialize_plugin_from_desc(json_desc_file)
145 raise FileNotFoundError(
146 f"Cannot determine the plugin JSON metadata descriptor: Expected it to be"
147 f" {json_desc_file} or {json_desc_file_in}"
148 )
150 if test_location == "installed": 150 ↛ 154line 150 didn't jump to line 154 because the condition on line 150 was always true
151 plugin_metadata = find_json_plugin([str(DEBPUTY_PLUGIN_ROOT_DIR)], plugin_name)
152 return _initialize_plugin_under_test(plugin_metadata, load_debputy_plugin=True)
154 raise ValueError(
155 'Invalid or unsupported "DEBPUTY_TEST_PLUGIN_LOCATION" environment variable. It must be either'
156 ' unset OR one of "installed", "uninstalled".'
157 )
160def initialize_plugin_under_test(
161 *,
162 plugin_desc_file: str | None = None,
163) -> "InitializedPluginUnderTest":
164 """Load and initialize a plugin for testing it
166 This method will load the plugin via plugin description, which is the method that `debputy` does at
167 run-time (in contrast to `initialize_plugin_under_test_preloaded`, which bypasses this concrete part
168 of the flow).
170 :param plugin_desc_file: The plugin description file (`.json`) that describes how to load the plugin.
171 If omitted, `debputy` will attempt to attempt the plugin description file based on the test itself.
172 This works for "single-file" plugins, where the description file and the test are right next to
173 each other.
174 :return: The loaded plugin for testing
175 """
176 if plugin_desc_file is None:
177 caller_file = inspect.stack()[1].filename
178 return _auto_load_plugin_from_filename(caller_file)
179 if DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS: 179 ↛ 180line 179 didn't jump to line 180 because the condition on line 179 was never true
180 raise RuntimeError(
181 "Running the test against an installed plugin does not work when"
182 " plugin_desc_file is provided. Please skip this test. You can "
183 " import DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS and use that as"
184 " conditional for this purpose."
185 )
186 return _initialize_plugin_from_desc(plugin_desc_file)
189def _initialize_plugin_from_desc(
190 desc_file: str,
191) -> "InitializedPluginUnderTest":
192 if not desc_file.endswith((".json", ".json.in")): 192 ↛ 193line 192 didn't jump to line 193 because the condition on line 192 was never true
193 raise ValueError("The plugin file must end with .json or .json.in")
195 plugin_metadata = parse_json_plugin_desc(desc_file)
197 return _initialize_plugin_under_test(plugin_metadata, load_debputy_plugin=True)
200def initialize_plugin_under_test_from_inline_json(
201 plugin_name: str,
202 json_content: str,
203) -> "InitializedPluginUnderTest":
204 with BytesIO(json_content.encode("utf-8")) as fd:
205 plugin_metadata = parse_json_plugin_desc(plugin_name, fd=fd)
207 return _initialize_plugin_under_test(plugin_metadata, load_debputy_plugin=True)
210def initialize_plugin_under_test_preloaded(
211 api_compat_version: int,
212 plugin_initializer: PluginInitializationEntryPoint,
213 /,
214 plugin_name: str = "plugin-under-test",
215 load_debputy_plugin: bool = True,
216 plugin_doc_path_resolver: Callable[[], Traversable | Path | None] = lambda: None,
217) -> "InitializedPluginUnderTest":
218 """Internal API: Initialize a plugin for testing without loading it from a file
220 This method by-passes the standard loading mechanism, meaning you will not test that your plugin
221 description file is correct. Notably, any feature provided via the JSON description file will
222 **NOT** be visible for the test.
224 This API is mostly useful for testing parts of debputy itself.
226 :param api_compat_version: The API version the plugin was written for. Use the same version as the
227 version from the entry point (The `v1` part of `debputy.plugins.v1.initialize` translate into `1`).
228 :param plugin_initializer: The entry point of the plugin
229 :param plugin_name: Normally, debputy would derive this from the entry point. In the test, it will
230 use a test name and version. However, you can explicitly set if you want the real name/version.
231 :param load_debputy_plugin: Whether to load debputy's own plugin first. Doing so provides a more
232 realistic test and enables the test to detect conflicts with debputy's own plugins (de facto making
233 the plugin unloadable in practice if such a conflict is present). This option is mostly provided
234 to enable debputy to use this method for self testing.
235 :param plugin_doc_path_resolver: How to resolve the documentation (if relevant for the test). The
236 default is to not load the documentation.
237 :return: The loaded plugin for testing
238 """
240 if DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS: 240 ↛ 241line 240 didn't jump to line 241 because the condition on line 240 was never true
241 raise RuntimeError(
242 "Running the test against an installed plugin does not work when"
243 " the plugin is preload. Please skip this test. You can "
244 " import DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS and use that as"
245 " conditional for this purpose."
246 )
248 plugin_metadata = DebputyPluginMetadata(
249 plugin_name=plugin_name,
250 api_compat_version=api_compat_version,
251 plugin_initializer=plugin_initializer,
252 plugin_loader=None,
253 plugin_path="<loaded-via-test>",
254 plugin_doc_path_resolver=plugin_doc_path_resolver,
255 )
257 return _initialize_plugin_under_test(
258 plugin_metadata,
259 load_debputy_plugin=load_debputy_plugin,
260 )
263class _MockArchTable:
264 @staticmethod
265 def matches_architecture(_a: str, _b: str) -> bool:
266 return True
269FAKE_DPKG_QUERY_TABLE = cast(DpkgArchTable, _MockArchTable())
270del _MockArchTable
273def package_metadata_context(
274 *,
275 host_arch: str = "amd64",
276 package_fields: dict[str, str] | None = None,
277 related_udeb_package_fields: dict[str, str] | None = None,
278 binary_package_version: str = "1.0-1",
279 related_udeb_package_version: str | None = None,
280 should_be_acted_on: bool = True,
281 related_udeb_fs_root: VirtualPath | None = None,
282 accessible_package_roots: Sequence[tuple[Mapping[str, str], VirtualPath]] = tuple(),
283) -> PackageProcessingContext:
284 process_table = faked_arch_table(host_arch)
285 f = {
286 "Package": "foo",
287 "Architecture": "any",
288 }
289 if package_fields is not None:
290 f.update(package_fields)
292 bin_package = BinaryPackage(
293 Deb822(f),
294 process_table,
295 FAKE_DPKG_QUERY_TABLE,
296 is_main_package=True,
297 should_be_acted_on=should_be_acted_on,
298 )
299 udeb_package = None
300 if related_udeb_package_fields is not None: 300 ↛ 301line 300 didn't jump to line 301 because the condition on line 300 was never true
301 uf = dict(related_udeb_package_fields)
302 uf.setdefault("Package", f'{f["Package"]}-udeb')
303 uf.setdefault("Architecture", f["Architecture"])
304 uf.setdefault("Package-Type", "udeb")
305 udeb_package = BinaryPackage(
306 Deb822(uf),
307 process_table,
308 FAKE_DPKG_QUERY_TABLE,
309 is_main_package=False,
310 should_be_acted_on=True,
311 )
312 if related_udeb_package_version is None:
313 related_udeb_package_version = binary_package_version
314 if accessible_package_roots:
315 apr = []
316 for fields, apr_fs_root in accessible_package_roots:
317 apr_fields = Deb822(dict(fields))
318 if "Package" not in apr_fields: 318 ↛ 319line 318 didn't jump to line 319 because the condition on line 318 was never true
319 raise ValueError(
320 "Missing mandatory Package field in member of accessible_package_roots"
321 )
322 if "Architecture" not in apr_fields: 322 ↛ 323line 322 didn't jump to line 323 because the condition on line 322 was never true
323 raise ValueError(
324 "Missing mandatory Architecture field in member of accessible_package_roots"
325 )
326 apr_package = BinaryPackage(
327 apr_fields,
328 process_table,
329 FAKE_DPKG_QUERY_TABLE,
330 is_main_package=False,
331 should_be_acted_on=True,
332 )
333 r = package_cross_check_precheck(bin_package, apr_package)
334 if not r[0]: 334 ↛ 335line 334 didn't jump to line 335 because the condition on line 334 was never true
335 raise ValueError(
336 f"{apr_package.name} would not be accessible for {bin_package.name}"
337 )
338 apr.append((apr_package, apr_fs_root))
340 if related_udeb_fs_root is not None: 340 ↛ 341line 340 didn't jump to line 341 because the condition on line 340 was never true
341 if udeb_package is None:
342 raise ValueError(
343 "related_udeb_package_fields must be given when related_udeb_fs_root is given"
344 )
345 r = package_cross_check_precheck(bin_package, udeb_package)
346 if not r[0]:
347 raise ValueError(
348 f"{udeb_package.name} would not be accessible for {bin_package.name}, so providing"
349 " related_udeb_fs_root is irrelevant"
350 )
351 apr.append((udeb_package, related_udeb_fs_root))
352 final_apr = tuple(apr)
353 else:
354 final_apr = tuple()
356 return PackageProcessingContextTestProvider(
357 binary_package=bin_package,
358 related_udeb_package=udeb_package,
359 binary_package_version=binary_package_version,
360 related_udeb_package_version=related_udeb_package_version,
361 accessible_package_roots=lambda: final_apr,
362 )
365def manifest_variable_resolution_context(
366 *,
367 debian_dir: VirtualPath | None = None,
368) -> VariableContext:
369 if debian_dir is None:
370 debian_dir = FSRootDir()
372 return VariableContext(debian_dir)
375class MaintscriptAccessorTestProvider(MaintscriptAccessorProviderBase):
376 __slots__ = ("_plugin_metadata", "_plugin_source_id", "_maintscript_container")
378 def __init__(
379 self,
380 plugin_metadata: DebputyPluginMetadata,
381 plugin_source_id: str,
382 maintscript_container: dict[str, list[RegisteredMaintscript]],
383 ):
384 self._plugin_metadata = plugin_metadata
385 self._plugin_source_id = plugin_source_id
386 self._maintscript_container = maintscript_container
388 @classmethod
389 def _apply_condition_to_script(
390 cls, condition: str, run_snippet: str, /, indent: bool | None = None
391 ) -> str:
392 return run_snippet
394 def _append_script(
395 self,
396 caller_name: str,
397 maintscript: Maintscript,
398 full_script: str,
399 /,
400 perform_substitution: bool = True,
401 ) -> None:
402 if self._plugin_source_id not in self._maintscript_container:
403 self._maintscript_container[self._plugin_source_id] = []
404 self._maintscript_container[self._plugin_source_id].append(
405 RegisteredMaintscript(
406 maintscript,
407 caller_name,
408 full_script,
409 perform_substitution,
410 )
411 )
414class RegisteredMetadataImpl(RegisteredMetadata):
415 __slots__ = (
416 "_substvars",
417 "_triggers",
418 "_maintscripts",
419 )
421 def __init__(
422 self,
423 substvars: Substvars,
424 triggers: list[RegisteredTrigger],
425 maintscripts: list[RegisteredMaintscript],
426 ) -> None:
427 self._substvars = substvars
428 self._triggers = triggers
429 self._maintscripts = maintscripts
431 @property
432 def substvars(self) -> Substvars:
433 return self._substvars
435 @property
436 def triggers(self) -> list[RegisteredTrigger]:
437 return self._triggers
439 def maintscripts(
440 self,
441 *,
442 maintscript: Maintscript | None = None,
443 ) -> list[RegisteredMaintscript]:
444 if maintscript is None:
445 return self._maintscripts
446 return [m for m in self._maintscripts if m.maintscript == maintscript]
449class BinaryCtrlAccessorTestProvider(BinaryCtrlAccessorProviderBase):
450 __slots__ = ("_maintscript_container",)
452 def __init__(
453 self,
454 plugin_metadata: DebputyPluginMetadata,
455 plugin_source_id: str,
456 context: PackageProcessingContext,
457 ) -> None:
458 super().__init__(
459 plugin_metadata,
460 plugin_source_id,
461 context,
462 {},
463 FlushableSubstvars(),
464 (None, None),
465 )
466 self._maintscript_container: dict[str, list[RegisteredMaintscript]] = {}
468 def _create_maintscript_accessor(self) -> MaintscriptAccessor:
469 return MaintscriptAccessorTestProvider(
470 self._plugin_metadata,
471 self._plugin_source_id,
472 self._maintscript_container,
473 )
475 def registered_metadata(self) -> RegisteredMetadata:
476 return RegisteredMetadataImpl(
477 self._substvars,
478 [
479 RegisteredTrigger.from_plugin_provided_trigger(t)
480 for t in self._triggers.values()
481 if t.provider_source_id == self._plugin_source_id
482 ],
483 self._maintscript_container.get(self._plugin_source_id, []),
484 )
487class ServiceRegistryTestImpl(ServiceRegistry[DSD]):
488 __slots__ = ("_service_manager_details", "_service_definitions")
490 def __init__(
491 self,
492 service_manager_details: ServiceManagerDetails,
493 detected_services: list[DetectedService[DSD]],
494 ) -> None:
495 self._service_manager_details = service_manager_details
496 self._service_definitions = detected_services
498 def register_service(
499 self,
500 path: VirtualPath,
501 name: str | list[str],
502 *,
503 type_of_service: str = "service", # "timer", etc.
504 service_scope: str = "system",
505 enable_by_default: bool = True,
506 start_by_default: bool = True,
507 default_upgrade_rule: ServiceUpgradeRule = "restart",
508 service_context: DSD | None = None,
509 ) -> None:
510 names = name if isinstance(name, list) else [name]
511 if len(names) < 1: 511 ↛ 512line 511 didn't jump to line 512 because the condition on line 511 was never true
512 raise ValueError(
513 f"The service must have at least one name - {path.absolute} did not have any"
514 )
515 self._service_definitions.append(
516 DetectedService(
517 path,
518 names,
519 type_of_service,
520 service_scope,
521 enable_by_default,
522 start_by_default,
523 default_upgrade_rule,
524 service_context,
525 )
526 )
529@contextlib.contextmanager
530def _read_only_fs_root(fs_root: VirtualPath) -> Iterator[VirtualPath]:
531 if fs_root.is_read_write: 531 ↛ 537line 531 didn't jump to line 537 because the condition on line 531 was always true
532 assert isinstance(fs_root, FSRootDir)
533 fs_root.is_read_write = False
534 yield fs_root
535 fs_root.is_read_write = True
536 else:
537 yield fs_root
540class InitializedPluginUnderTestImpl(InitializedPluginUnderTest):
541 def __init__(
542 self,
543 plugin_name: str,
544 feature_set: PluginProvidedFeatureSet,
545 substitution: SubstitutionImpl,
546 ) -> None:
547 self._feature_set = feature_set
548 self._plugin_name = plugin_name
549 self._packager_provided_files: None | (
550 dict[str, RegisteredPackagerProvidedFile]
551 ) = None
552 self._triggers: dict[tuple[DpkgTriggerType, str], PluginProvidedTrigger] = {}
553 self._maintscript_container: dict[str, list[RegisteredMaintscript]] = {}
554 self._substitution = substitution
555 assert plugin_name in self._feature_set.plugin_data
557 @property
558 def _plugin_metadata(self) -> DebputyPluginMetadata:
559 return self._feature_set.plugin_data[self._plugin_name]
561 def packager_provided_files_by_stem(
562 self,
563 ) -> Mapping[str, RegisteredPackagerProvidedFile]:
564 ppf = self._packager_provided_files
565 if ppf is None:
566 result: dict[str, RegisteredPackagerProvidedFile] = {}
567 for spec in self._feature_set.packager_provided_files.values():
568 if spec.debputy_plugin_metadata.plugin_name != self._plugin_name:
569 continue
570 # Registered as a virtual subclass, so this should always be True
571 assert isinstance(spec, RegisteredPackagerProvidedFile)
572 result[spec.stem] = spec
573 self._packager_provided_files = result
574 ppf = result
575 return ppf
577 def run_metadata_detector(
578 self,
579 metadata_detector_id: str,
580 fs_root: VirtualPath,
581 context: PackageProcessingContext | None = None,
582 ) -> RegisteredMetadata:
583 if fs_root.parent_dir is not None: 583 ↛ 584line 583 didn't jump to line 584 because the condition on line 583 was never true
584 raise ValueError("Provided path must be the file system root.")
585 detectors = self._feature_set.metadata_maintscript_detectors[self._plugin_name]
586 matching_detectors = [
587 d for d in detectors if d.detector_id == metadata_detector_id
588 ]
589 if len(matching_detectors) != 1: 589 ↛ 590line 589 didn't jump to line 590 because the condition on line 589 was never true
590 assert not matching_detectors
591 raise ValueError(
592 f"The plugin {self._plugin_name} did not provide a metadata detector with ID"
593 f' "{metadata_detector_id}"'
594 )
595 if context is None:
596 context = package_metadata_context()
597 detector = matching_detectors[0]
598 if not detector.applies_to(context.binary_package):
599 raise ValueError(
600 f'The detector "{metadata_detector_id}" from {self._plugin_name} does not apply to the'
601 " given package. Consider using `package_metadata_context()` to emulate a binary package"
602 " with the correct specification. As an example: "
603 '`package_metadata_context(package_fields={"Package-Type": "udeb"})` would emulate a udeb'
604 " package."
605 )
607 ctrl = BinaryCtrlAccessorTestProvider(
608 self._plugin_metadata,
609 metadata_detector_id,
610 context,
611 )
612 with _read_only_fs_root(fs_root) as ro_root:
613 detector.run_detector(
614 ro_root,
615 ctrl,
616 context,
617 )
618 return ctrl.registered_metadata()
620 def run_package_processor(
621 self,
622 package_processor_id: str,
623 fs_root: VirtualPath,
624 context: PackageProcessingContext | None = None,
625 ) -> None:
626 if fs_root.parent_dir is not None: 626 ↛ 627line 626 didn't jump to line 627 because the condition on line 626 was never true
627 raise ValueError("Provided path must be the file system root.")
628 pp_key = (self._plugin_name, package_processor_id)
629 package_processor = self._feature_set.all_package_processors.get(pp_key)
630 if package_processor is None: 630 ↛ 631line 630 didn't jump to line 631 because the condition on line 630 was never true
631 raise ValueError(
632 f"The plugin {self._plugin_name} did not provide a package processor with ID"
633 f' "{package_processor_id}"'
634 )
635 if context is None: 635 ↛ 637line 635 didn't jump to line 637 because the condition on line 635 was always true
636 context = package_metadata_context()
637 if not fs_root.is_read_write: 637 ↛ 638line 637 didn't jump to line 638 because the condition on line 637 was never true
638 raise ValueError(
639 "The provided fs_root is read-only and it must be read-write for package processor"
640 )
641 if not package_processor.applies_to(context.binary_package): 641 ↛ 642line 641 didn't jump to line 642 because the condition on line 641 was never true
642 raise ValueError(
643 f'The package processor "{package_processor_id}" from {self._plugin_name} does not apply'
644 " to the given package. Consider using `package_metadata_context()` to emulate a binary"
645 " package with the correct specification. As an example: "
646 '`package_metadata_context(package_fields={"Package-Type": "udeb"})` would emulate a udeb'
647 " package."
648 )
649 package_processor.run_package_processor(
650 fs_root,
651 None,
652 context,
653 )
655 @property
656 def declared_manifest_variables(self) -> frozenset[str]:
657 return frozenset(
658 {
659 k
660 for k, v in self._feature_set.manifest_variables.items()
661 if v.plugin_metadata.plugin_name == self._plugin_name
662 }
663 )
665 def automatic_discard_rules_examples_with_issues(self) -> Sequence[ADRExampleIssue]:
666 issues = []
667 for adr in self._feature_set.auto_discard_rules.values():
668 if adr.plugin_metadata.plugin_name != self._plugin_name: 668 ↛ 669line 668 didn't jump to line 669 because the condition on line 668 was never true
669 continue
670 for idx, example in enumerate(adr.examples):
671 result = process_discard_rule_example(
672 adr,
673 example,
674 )
675 if result.inconsistent_paths:
676 issues.append(
677 ADRExampleIssue(
678 adr.name,
679 idx,
680 [
681 x.absolute + ("/" if x.is_dir else "")
682 for x in result.inconsistent_paths
683 ],
684 )
685 )
686 return issues
688 def run_service_detection_and_integrations(
689 self,
690 service_manager: str,
691 fs_root: VirtualPath,
692 context: PackageProcessingContext | None = None,
693 *,
694 service_context_type_hint: type[DSD] | None = None,
695 ) -> tuple[list[DetectedService[DSD]], RegisteredMetadata]:
696 if fs_root.parent_dir is not None: 696 ↛ 697line 696 didn't jump to line 697 because the condition on line 696 was never true
697 raise ValueError("Provided path must be the file system root.")
698 try:
699 service_manager_details = self._feature_set.service_managers[
700 service_manager
701 ]
702 if service_manager_details.plugin_metadata.plugin_name != self._plugin_name: 702 ↛ 703line 702 didn't jump to line 703 because the condition on line 702 was never true
703 raise KeyError(service_manager)
704 except KeyError:
705 raise ValueError(
706 f"The plugin {self._plugin_name} does not provide a"
707 f" service manager called {service_manager}"
708 ) from None
710 if context is None: 710 ↛ 712line 710 didn't jump to line 712 because the condition on line 710 was always true
711 context = package_metadata_context()
712 detected_services: list[DetectedService[DSD]] = []
713 registry = ServiceRegistryTestImpl(service_manager_details, detected_services)
714 service_manager_details.service_detector(
715 fs_root,
716 registry,
717 context,
718 )
719 ctrl = BinaryCtrlAccessorTestProvider(
720 self._plugin_metadata,
721 service_manager_details.service_manager,
722 context,
723 )
724 if detected_services:
725 service_definitions = [
726 ServiceDefinitionImpl(
727 ds.names[0],
728 ds.names,
729 ds.path,
730 ds.type_of_service,
731 ds.service_scope,
732 ds.enable_by_default,
733 ds.start_by_default,
734 ds.default_upgrade_rule,
735 self._plugin_name,
736 True,
737 ds.service_context,
738 )
739 for ds in detected_services
740 ]
741 service_manager_details.service_integrator(
742 service_definitions,
743 ctrl,
744 context,
745 )
746 return detected_services, ctrl.registered_metadata()
748 def manifest_variables(
749 self,
750 *,
751 resolution_context: VariableContext | None = None,
752 mocked_variables: Mapping[str, str] | None = None,
753 ) -> Mapping[str, str]:
754 valid_manifest_variables = frozenset(
755 {
756 n
757 for n, v in self._feature_set.manifest_variables.items()
758 if v.plugin_metadata.plugin_name == self._plugin_name
759 }
760 )
761 if resolution_context is None:
762 resolution_context = manifest_variable_resolution_context()
763 substitution = self._substitution.copy_for_subst_test(
764 self._feature_set,
765 resolution_context,
766 extra_substitutions=mocked_variables,
767 )
768 return SubstitutionTable(
769 valid_manifest_variables,
770 substitution,
771 )
774class SubstitutionTable(Mapping[str, str]):
775 def __init__(
776 self, valid_manifest_variables: frozenset[str], substitution: Substitution
777 ) -> None:
778 self._valid_manifest_variables = valid_manifest_variables
779 self._resolved: set[str] = set()
780 self._substitution = substitution
782 def __contains__(self, item: object) -> bool:
783 return item in self._valid_manifest_variables
785 def __getitem__(self, key: str) -> str:
786 if key not in self._valid_manifest_variables: 786 ↛ 787line 786 didn't jump to line 787 because the condition on line 786 was never true
787 raise KeyError(key)
788 v = self._substitution.substitute(
789 "{{" + key + "}}", f"test of manifest variable `{key}`"
790 )
791 self._resolved.add(key)
792 return v
794 def __len__(self) -> int:
795 return len(self._valid_manifest_variables)
797 def __iter__(self) -> Iterator[str]:
798 return iter(self._valid_manifest_variables)
800 def keys(self) -> KeysView[str]:
801 return cast("KeysView[str]", self._valid_manifest_variables)