Coverage for src/debputy/plugin/api/test_api/test_impl.py: 81%

300 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-10-12 15:06 +0000

1import contextlib 

2import dataclasses 

3import inspect 

4import os.path 

5from importlib.resources.abc import Traversable 

6from io import BytesIO 

7from pathlib import Path 

8from typing import ( 

9 Dict, 

10 Optional, 

11 Tuple, 

12 List, 

13 cast, 

14 FrozenSet, 

15 Union, 

16 Type, 

17 Set, 

18) 

19from collections.abc import Mapping, Sequence, Iterator, KeysView, Callable 

20 

21from debian.deb822 import Deb822 

22from debian.debian_support import DpkgArchTable 

23from debian.substvars import Substvars 

24 

25from debputy import DEBPUTY_PLUGIN_ROOT_DIR 

26from debputy.architecture_support import faked_arch_table 

27from debputy.filesystem_scan import FSROOverlay, FSRootDir 

28from debputy.packages import BinaryPackage 

29from debputy.plugin.api import ( 

30 PluginInitializationEntryPoint, 

31 VirtualPath, 

32 PackageProcessingContext, 

33 DpkgTriggerType, 

34 Maintscript, 

35) 

36from debputy.plugin.api.example_processing import process_discard_rule_example 

37from debputy.plugin.api.impl import ( 

38 plugin_metadata_for_debputys_own_plugin, 

39 DebputyPluginInitializerProvider, 

40 parse_json_plugin_desc, 

41 MaintscriptAccessorProviderBase, 

42 BinaryCtrlAccessorProviderBase, 

43 PLUGIN_TEST_SUFFIX, 

44 find_json_plugin, 

45 ServiceDefinitionImpl, 

46) 

47from debputy.plugin.api.impl_types import ( 

48 PackagerProvidedFileClassSpec, 

49 DebputyPluginMetadata, 

50 PluginProvidedTrigger, 

51 ServiceManagerDetails, 

52) 

53from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

54from debputy.plugin.api.spec import ( 

55 MaintscriptAccessor, 

56 FlushableSubstvars, 

57 ServiceRegistry, 

58 DSD, 

59 ServiceUpgradeRule, 

60) 

61from debputy.plugin.api.test_api.test_spec import ( 

62 InitializedPluginUnderTest, 

63 RegisteredPackagerProvidedFile, 

64 RegisteredTrigger, 

65 RegisteredMaintscript, 

66 DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS, 

67 ADRExampleIssue, 

68 DetectedService, 

69 RegisteredMetadata, 

70) 

71from debputy.plugins.debputy.debputy_plugin import initialize_debputy_features 

72from debputy.substitution import SubstitutionImpl, VariableContext, Substitution 

73from debputy.util import package_cross_check_precheck 

74 

75RegisteredPackagerProvidedFile.register(PackagerProvidedFileClassSpec) 

76 

77 

@dataclasses.dataclass(frozen=True, slots=True)
class PackageProcessingContextTestProvider(PackageProcessingContext):
    """Frozen `PackageProcessingContext` that simply stores pre-computed values.

    The test helpers build instances of this class so a plugin under test can be
    handed a processing context without running a real packaging session.
    """

    # The binary package the plugin is asked to process.
    binary_package: BinaryPackage
    # Version string reported for `binary_package`.
    binary_package_version: str
    # The udeb related to `binary_package`, or None when there is none.
    related_udeb_package: BinaryPackage | None
    # Version string reported for the related udeb (None when not set).
    related_udeb_package_version: str | None
    # Zero-argument callable returning the (package, file system root) pairs
    # that are accessible while processing `binary_package`.
    accessible_package_roots: Callable[[], Sequence[tuple[BinaryPackage, VirtualPath]]]

85 

86 

def _initialize_plugin_under_test(
    plugin_metadata: DebputyPluginMetadata,
    load_debputy_plugin: bool = True,
) -> "InitializedPluginUnderTest":
    """Load a plugin into a fresh feature set and wrap it in a test handle.

    :param plugin_metadata: Metadata describing the plugin that should be tested.
    :param load_debputy_plugin: When true, debputy's own plugin is loaded into the
      same feature set before the plugin under test.
    :return: The test handle for the loaded plugin.
    """
    shared_features = PluginProvidedFeatureSet()
    shared_substitution = SubstitutionImpl(
        unresolvable_substitutions=frozenset(["SOURCE_DATE_EPOCH", "PACKAGE"]),
        variable_context=VariableContext(
            FSROOverlay.create_root_dir("debian", "debian"),
        ),
        plugin_feature_set=shared_features,
    )

    load_order = []
    if load_debputy_plugin:
        # Load debputy's own plugin first, so conflicts with debputy's plugin are
        # detected early
        load_order.append(
            plugin_metadata_for_debputys_own_plugin(initialize_debputy_features)
        )
    load_order.append(plugin_metadata)

    # Every plugin registers into the same feature set and substitution.
    for metadata in load_order:
        provider = DebputyPluginInitializerProvider(
            metadata,
            shared_features,
            shared_substitution,
        )
        provider.load_plugin()

    return InitializedPluginUnderTestImpl(
        plugin_metadata.plugin_name,
        shared_features,
        shared_substitution,
    )

124 

125 

def _auto_load_plugin_from_filename(
    py_test_filename: str,
) -> "InitializedPluginUnderTest":
    """Locate and load the plugin that belongs to a given test file.

    The plugin name is derived from the basename of the test file by removing
    `PLUGIN_TEST_SUFFIX` and replacing underscores with dashes. Where the plugin
    is loaded from depends on the `DEBPUTY_TEST_PLUGIN_LOCATION` environment
    variable:

     * unset or `uninstalled`: The descriptor `<plugin-name>.json` (or, failing that,
       `<plugin-name>.json.in`) next to the test file is used.
     * `installed`: The plugin is looked up by name in `DEBPUTY_PLUGIN_ROOT_DIR`.

    :param py_test_filename: Path of the Python test file that requested the plugin.
    :return: The loaded plugin for testing
    :raises FileNotFoundError: In `uninstalled` mode when neither descriptor exists.
    :raises ValueError: If `DEBPUTY_TEST_PLUGIN_LOCATION` has an unsupported value.
    """
    dirname, basename = os.path.split(py_test_filename)
    plugin_name = PLUGIN_TEST_SUFFIX.sub("", basename).replace("_", "-")

    test_location = os.environ.get("DEBPUTY_TEST_PLUGIN_LOCATION", "uninstalled")
    if test_location == "uninstalled":
        json_desc_file = os.path.join(dirname, f"{plugin_name}.json")
        if "/" not in json_desc_file:
            # Make the path explicitly relative when the test file had no directory part
            json_desc_file = f"./{json_desc_file}"
        json_desc_file_in = f"{json_desc_file}.in"

        # Load whichever descriptor actually exists. Previously the `.json.in`
        # fallback passed the (missing) `.json` path on to the loader.
        for candidate in (json_desc_file, json_desc_file_in):
            if os.path.isfile(candidate):
                return _initialize_plugin_from_desc(candidate)
        raise FileNotFoundError(
            f"Cannot determine the plugin JSON metadata descriptor: Expected it to be"
            f" {json_desc_file} or {json_desc_file_in}"
        )

    if test_location == "installed":
        plugin_metadata = find_json_plugin([str(DEBPUTY_PLUGIN_ROOT_DIR)], plugin_name)
        return _initialize_plugin_under_test(plugin_metadata, load_debputy_plugin=True)

    raise ValueError(
        'Invalid or unsupported "DEBPUTY_TEST_PLUGIN_LOCATION" environment variable. It must be either'
        ' unset OR one of "installed", "uninstalled".'
    )

158 

159 

def initialize_plugin_under_test(
    *,
    plugin_desc_file: str | None = None,
) -> "InitializedPluginUnderTest":
    """Load and initialize a plugin for testing it

    The plugin is loaded via its plugin description, which is the same route `debputy`
    takes at run-time (in contrast to `initialize_plugin_under_test_preloaded`, which
    bypasses that part of the flow).

    :param plugin_desc_file: The plugin description file (`.json`) that describes how to
      load the plugin. If omitted, `debputy` will attempt to locate the plugin description
      file based on the file name of the calling test. This works for "single-file"
      plugins, where the description file and the test are right next to each other.
    :return: The loaded plugin for testing
    :raises RuntimeError: If `plugin_desc_file` is given while
      `DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS` is true.
    """
    if plugin_desc_file is not None:
        if DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS:
            raise RuntimeError(
                "Running the test against an installed plugin does not work when"
                " plugin_desc_file is provided. Please skip this test. You can "
                " import DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS and use that as"
                " conditional for this purpose."
            )
        return _initialize_plugin_from_desc(plugin_desc_file)

    # Index 0 is this function's own frame; index 1 is the code that called us.
    caller_frame = inspect.stack()[1]
    return _auto_load_plugin_from_filename(caller_frame.filename)

187 

188 

def _initialize_plugin_from_desc(
    desc_file: str,
) -> "InitializedPluginUnderTest":
    """Load the plugin described by a JSON plugin description file.

    :param desc_file: Path to the description file. Its name must end with `.json`
      or `.json.in`.
    :return: The loaded plugin for testing (debputy's own plugin is loaded as well).
    :raises ValueError: If the file name does not have one of the accepted endings.
    """
    accepted_suffixes = (".json", ".json.in")
    if desc_file.endswith(accepted_suffixes):
        return _initialize_plugin_under_test(
            parse_json_plugin_desc(desc_file),
            load_debputy_plugin=True,
        )
    raise ValueError("The plugin file must end with .json or .json.in")

198 

199 

def initialize_plugin_under_test_from_inline_json(
    plugin_name: str,
    json_content: str,
) -> "InitializedPluginUnderTest":
    """Load a plugin whose JSON description is given as a string rather than a file.

    :param plugin_name: Passed to the description parser in place of a file path.
    :param json_content: The JSON plugin description; it is parsed as UTF-8 bytes.
    :return: The loaded plugin for testing (debputy's own plugin is loaded as well).
    """
    raw_json = json_content.encode("utf-8")
    with BytesIO(raw_json) as json_fd:
        metadata = parse_json_plugin_desc(plugin_name, fd=json_fd)
    return _initialize_plugin_under_test(metadata, load_debputy_plugin=True)

208 

209 

def initialize_plugin_under_test_preloaded(
    api_compat_version: int,
    plugin_initializer: PluginInitializationEntryPoint,
    /,
    plugin_name: str = "plugin-under-test",
    load_debputy_plugin: bool = True,
    plugin_doc_path_resolver: Callable[[], Traversable | Path | None] = lambda: None,
) -> "InitializedPluginUnderTest":
    """Internal API: Initialize a plugin for testing without loading it from a file

    The standard loading mechanism is skipped, so this does not verify that the plugin
    description file is correct. Notably, any feature provided via the JSON description
    file will **NOT** be visible for the test.

    This API is mostly useful for testing parts of debputy itself.

    :param api_compat_version: The API version the plugin was written for. Use the same
      version as the version from the entry point (The `v1` part of
      `debputy.plugins.v1.initialize` translate into `1`).
    :param plugin_initializer: The entry point of the plugin
    :param plugin_name: Normally, debputy would derive this from the entry point. In the
      test, a test name is used unless the real name is passed explicitly.
    :param load_debputy_plugin: Whether to load debputy's own plugin first. Doing so
      provides a more realistic test and enables the test to detect conflicts with
      debputy's own plugins (de facto making the plugin unloadable in practice if such a
      conflict is present). This option is mostly provided to enable debputy to use this
      method for self testing.
    :param plugin_doc_path_resolver: How to resolve the documentation (if relevant for
      the test). The default is to not load the documentation.
    :return: The loaded plugin for testing
    :raises RuntimeError: If `DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS` is true.
    """
    if DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS:
        unsupported_msg = (
            "Running the test against an installed plugin does not work when"
            " the plugin is preload. Please skip this test. You can "
            " import DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS and use that as"
            " conditional for this purpose."
        )
        raise RuntimeError(unsupported_msg)

    return _initialize_plugin_under_test(
        DebputyPluginMetadata(
            plugin_name=plugin_name,
            api_compat_version=api_compat_version,
            plugin_initializer=plugin_initializer,
            # There is no loader; the initializer was handed to us directly.
            plugin_loader=None,
            plugin_path="<loaded-via-test>",
            plugin_doc_path_resolver=plugin_doc_path_resolver,
        ),
        load_debputy_plugin=load_debputy_plugin,
    )

261 

262 

class _MockArchTable:
    """Stand-in architecture table for which every architecture query matches."""

    @staticmethod
    def matches_architecture(_a: str, _b: str) -> bool:
        # Both arguments are ignored; the answer is unconditionally True.
        return True


# The single shared instance is presented to callers as a `DpkgArchTable`.
FAKE_DPKG_QUERY_TABLE = cast(DpkgArchTable, _MockArchTable())
# Only the instance above is needed, so the helper class name is removed again.
del _MockArchTable

271 

272 

def package_metadata_context(
    *,
    host_arch: str = "amd64",
    package_fields: dict[str, str] | None = None,
    related_udeb_package_fields: dict[str, str] | None = None,
    binary_package_version: str = "1.0-1",
    related_udeb_package_version: str | None = None,
    should_be_acted_on: bool = True,
    related_udeb_fs_root: VirtualPath | None = None,
    accessible_package_roots: Sequence[tuple[Mapping[str, str], VirtualPath]] = tuple(),
) -> PackageProcessingContext:
    """Create a package processing context for use in plugin tests.

    :param host_arch: Host architecture passed to `faked_arch_table`.
    :param package_fields: Fields of the emulated binary package. They are applied on
      top of the defaults `Package: foo` and `Architecture: any`.
    :param related_udeb_package_fields: When given, a related udeb package is emulated
      from these fields. Missing `Package`, `Architecture` and `Package-Type` fields
      default to `<Package>-udeb`, the architecture of the main package, and `udeb`.
    :param binary_package_version: Version reported for the emulated binary package.
    :param related_udeb_package_version: Version reported for the related udeb. When
      omitted and a related udeb is emulated, `binary_package_version` is used.
    :param should_be_acted_on: Passed on to the emulated binary package.
    :param related_udeb_fs_root: File system root of the related udeb. When given, the
      udeb is added to the accessible package roots; this requires
      `related_udeb_package_fields`.
    :param accessible_package_roots: Pairs of (fields, file system root) for additional
      packages the context should expose. Each set of fields must contain `Package`
      and `Architecture`.
    :return: The context describing the emulated package.
    :raises ValueError: If a mandatory field is missing, if a package is rejected by
      `package_cross_check_precheck`, or if `related_udeb_fs_root` is given without
      `related_udeb_package_fields`.
    """
    process_table = faked_arch_table(host_arch)
    fields = {
        "Package": "foo",
        "Architecture": "any",
    }
    if package_fields is not None:
        fields.update(package_fields)

    bin_package = BinaryPackage(
        Deb822(fields),
        process_table,
        FAKE_DPKG_QUERY_TABLE,
        is_main_package=True,
        should_be_acted_on=should_be_acted_on,
    )

    udeb_package = None
    if related_udeb_package_fields is not None:
        udeb_fields = dict(related_udeb_package_fields)
        udeb_fields.setdefault("Package", f'{fields["Package"]}-udeb')
        udeb_fields.setdefault("Architecture", fields["Architecture"])
        udeb_fields.setdefault("Package-Type", "udeb")
        udeb_package = BinaryPackage(
            Deb822(udeb_fields),
            process_table,
            FAKE_DPKG_QUERY_TABLE,
            is_main_package=False,
            should_be_acted_on=True,
        )
        if related_udeb_package_version is None:
            related_udeb_package_version = binary_package_version

    apr = []
    for apr_input_fields, apr_fs_root in accessible_package_roots:
        apr_fields = Deb822(dict(apr_input_fields))
        for mandatory_field in ("Package", "Architecture"):
            if mandatory_field not in apr_fields:
                raise ValueError(
                    f"Missing mandatory {mandatory_field} field in member of accessible_package_roots"
                )
        apr_package = BinaryPackage(
            apr_fields,
            process_table,
            FAKE_DPKG_QUERY_TABLE,
            is_main_package=False,
            should_be_acted_on=True,
        )
        # The first element of the precheck result says whether access is allowed.
        if not package_cross_check_precheck(bin_package, apr_package)[0]:
            raise ValueError(
                f"{apr_package.name} would not be accessible for {bin_package.name}"
            )
        apr.append((apr_package, apr_fs_root))

    # The udeb root is handled independently of `accessible_package_roots`.
    # Previously it was silently dropped whenever no other roots were given.
    if related_udeb_fs_root is not None:
        if udeb_package is None:
            raise ValueError(
                "related_udeb_package_fields must be given when related_udeb_fs_root is given"
            )
        if not package_cross_check_precheck(bin_package, udeb_package)[0]:
            raise ValueError(
                f"{udeb_package.name} would not be accessible for {bin_package.name}, so providing"
                " related_udeb_fs_root is irrelevant"
            )
        apr.append((udeb_package, related_udeb_fs_root))

    final_apr = tuple(apr)

    return PackageProcessingContextTestProvider(
        binary_package=bin_package,
        related_udeb_package=udeb_package,
        binary_package_version=binary_package_version,
        related_udeb_package_version=related_udeb_package_version,
        accessible_package_roots=lambda: final_apr,
    )

363 

364 

def manifest_variable_resolution_context(
    *,
    debian_dir: VirtualPath | None = None,
) -> VariableContext:
    """Create the context used for resolving manifest variables in tests.

    :param debian_dir: Directory handed to the `VariableContext`. When omitted, a
      new `FSRootDir()` is used.
    :return: The variable context wrapping the directory.
    """
    context_dir = FSRootDir() if debian_dir is None else debian_dir
    return VariableContext(context_dir)

373 

374 

class MaintscriptAccessorTestProvider(MaintscriptAccessorProviderBase):
    """Maintscript accessor that records registered snippets in a shared container.

    Snippets are stored as `RegisteredMaintscript` entries, grouped by the plugin
    source ID this accessor was created for.
    """

    __slots__ = ("_plugin_metadata", "_plugin_source_id", "_maintscript_container")

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        maintscript_container: dict[str, list[RegisteredMaintscript]],
    ):
        self._plugin_metadata = plugin_metadata
        self._plugin_source_id = plugin_source_id
        # The container is shared with (and owned by) whoever created this accessor.
        self._maintscript_container = maintscript_container

    @classmethod
    def _apply_condition_to_script(
        cls, condition: str, run_snippet: str, /, indent: bool | None = None
    ) -> str:
        # The test provider keeps the snippet unchanged; `condition` and `indent`
        # are not applied.
        return run_snippet

    def _append_script(
        self,
        caller_name: str,
        maintscript: Maintscript,
        full_script: str,
        /,
        perform_substitution: bool = True,
    ) -> None:
        """Record one snippet under this accessor's plugin source ID."""
        recorded = self._maintscript_container.setdefault(self._plugin_source_id, [])
        recorded.append(
            RegisteredMaintscript(
                maintscript,
                caller_name,
                full_script,
                perform_substitution,
            )
        )

412 

413 

class RegisteredMetadataImpl(RegisteredMetadata):
    """Read-only view of the substvars, triggers and maintscripts that were registered."""

    __slots__ = (
        "_substvars",
        "_triggers",
        "_maintscripts",
    )

    def __init__(
        self,
        substvars: Substvars,
        triggers: list[RegisteredTrigger],
        maintscripts: list[RegisteredMaintscript],
    ) -> None:
        self._substvars = substvars
        self._triggers = triggers
        self._maintscripts = maintscripts

    @property
    def substvars(self) -> Substvars:
        """The substvars given at construction time."""
        return self._substvars

    @property
    def triggers(self) -> list[RegisteredTrigger]:
        """The triggers given at construction time."""
        return self._triggers

    def maintscripts(
        self,
        *,
        maintscript: Maintscript | None = None,
    ) -> list[RegisteredMaintscript]:
        """Return the recorded maintscript snippets.

        :param maintscript: When given, only snippets for that maintscript are
          returned (as a new list). Otherwise the stored list itself is returned.
        """
        recorded = self._maintscripts
        if maintscript is not None:
            return [entry for entry in recorded if entry.maintscript == maintscript]
        return recorded

447 

448 

class BinaryCtrlAccessorTestProvider(BinaryCtrlAccessorProviderBase):
    """Control accessor for tests that remembers what was registered through it."""

    __slots__ = ("_maintscript_container",)

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        context: PackageProcessingContext,
    ) -> None:
        # The base class is given fresh, empty state: an empty dict, new
        # substvars and a `(None, None)` pair.
        super().__init__(
            plugin_metadata,
            plugin_source_id,
            context,
            {},
            FlushableSubstvars(),
            (None, None),
        )
        self._maintscript_container: dict[str, list[RegisteredMaintscript]] = {}

    def _create_maintscript_accessor(self) -> MaintscriptAccessor:
        # The accessor writes into this provider's container, which is what
        # `registered_metadata` reads from later.
        return MaintscriptAccessorTestProvider(
            self._plugin_metadata,
            self._plugin_source_id,
            self._maintscript_container,
        )

    def registered_metadata(self) -> RegisteredMetadata:
        """Collect what was registered under this provider's plugin source ID."""
        source_id = self._plugin_source_id
        substvars = self._substvars
        own_triggers = [
            RegisteredTrigger.from_plugin_provided_trigger(trigger)
            for trigger in self._triggers.values()
            if trigger.provider_source_id == source_id
        ]
        own_maintscripts = self._maintscript_container.get(source_id, [])
        return RegisteredMetadataImpl(substvars, own_triggers, own_maintscripts)

485 

486 

class ServiceRegistryTestImpl(ServiceRegistry[DSD]):
    """Service registry that appends every registered service to a caller-owned list."""

    __slots__ = ("_service_manager_details", "_service_definitions")

    def __init__(
        self,
        service_manager_details: ServiceManagerDetails,
        detected_services: list[DetectedService[DSD]],
    ) -> None:
        self._service_manager_details = service_manager_details
        # Not copied: registrations are visible through the caller's list.
        self._service_definitions = detected_services

    def register_service(
        self,
        path: VirtualPath,
        name: str | list[str],
        *,
        type_of_service: str = "service",  # "timer", etc.
        service_scope: str = "system",
        enable_by_default: bool = True,
        start_by_default: bool = True,
        default_upgrade_rule: ServiceUpgradeRule = "restart",
        service_context: DSD | None = None,
    ) -> None:
        """Record a detected service.

        :param name: A single name or a list of names; a single name is wrapped
          in a list.
        :raises ValueError: If an empty list of names is given.
        """
        if isinstance(name, list):
            names = name
        else:
            names = [name]
        if not names:
            raise ValueError(
                f"The service must have at least one name - {path.absolute} did not have any"
            )
        detected = DetectedService(
            path,
            names,
            type_of_service,
            service_scope,
            enable_by_default,
            start_by_default,
            default_upgrade_rule,
            service_context,
        )
        self._service_definitions.append(detected)

527 

528 

@contextlib.contextmanager
def _read_only_fs_root(fs_root: VirtualPath) -> Iterator[VirtualPath]:
    """Yield `fs_root` with write access switched off for the duration of the block.

    A root that is already read-only is yielded unchanged. A read-write root has
    `is_read_write` set to False while the block runs and set back to True afterwards.
    The flag is restored even when the block raises, so a failing detector cannot
    leave the caller's file system root stuck in read-only mode.

    :param fs_root: The file system root to protect. A read-write root must be an
      `FSRootDir`.
    """
    if not fs_root.is_read_write:
        yield fs_root
        return

    assert isinstance(fs_root, FSRootDir)
    fs_root.is_read_write = False
    try:
        yield fs_root
    finally:
        fs_root.is_read_write = True

538 

539 

class InitializedPluginUnderTestImpl(InitializedPluginUnderTest):
    """Test handle for a plugin that was loaded into a `PluginProvidedFeatureSet`.

    The methods look up features in the feature set by the name of the plugin under
    test, or filter the registered features by that name.
    """

    def __init__(
        self,
        plugin_name: str,
        feature_set: PluginProvidedFeatureSet,
        substitution: SubstitutionImpl,
    ) -> None:
        self._feature_set = feature_set
        self._plugin_name = plugin_name
        # Computed on first use by `packager_provided_files_by_stem`.
        self._packager_provided_files: None | (
            dict[str, RegisteredPackagerProvidedFile]
        ) = None
        self._triggers: dict[tuple[DpkgTriggerType, str], PluginProvidedTrigger] = {}
        self._maintscript_container: dict[str, list[RegisteredMaintscript]] = {}
        self._substitution = substitution
        assert plugin_name in self._feature_set.plugin_data

    @property
    def _plugin_metadata(self) -> DebputyPluginMetadata:
        return self._feature_set.plugin_data[self._plugin_name]

    def packager_provided_files_by_stem(
        self,
    ) -> Mapping[str, RegisteredPackagerProvidedFile]:
        """Return the packager provided files of this plugin, keyed by stem.

        The mapping is computed on the first call and reused afterwards.
        """
        cached = self._packager_provided_files
        if cached is not None:
            return cached

        by_stem: dict[str, RegisteredPackagerProvidedFile] = {}
        for spec in self._feature_set.packager_provided_files.values():
            if spec.debputy_plugin_metadata.plugin_name == self._plugin_name:
                # Registered as a virtual subclass, so this should always be True
                assert isinstance(spec, RegisteredPackagerProvidedFile)
                by_stem[spec.stem] = spec
        self._packager_provided_files = by_stem
        return by_stem

    def run_metadata_detector(
        self,
        metadata_detector_id: str,
        fs_root: VirtualPath,
        context: PackageProcessingContext | None = None,
    ) -> RegisteredMetadata:
        """Run one metadata detector of this plugin and return what it registered.

        :param metadata_detector_id: ID of the detector to run.
        :param fs_root: The file system root to run the detector on. The detector
          sees it in read-only mode.
        :param context: The processing context; defaults to
          `package_metadata_context()`.
        :raises ValueError: If `fs_root` is not a root, the detector is unknown, or
          the detector does not apply to the package of the context.
        """
        if fs_root.parent_dir is not None:
            raise ValueError("Provided path must be the file system root.")
        own_detectors = self._feature_set.metadata_maintscript_detectors[
            self._plugin_name
        ]
        candidates = [
            candidate
            for candidate in own_detectors
            if candidate.detector_id == metadata_detector_id
        ]
        if len(candidates) != 1:
            # More than one match would mean duplicate detector IDs.
            assert not candidates
            raise ValueError(
                f"The plugin {self._plugin_name} did not provide a metadata detector with ID"
                f' "{metadata_detector_id}"'
            )
        if context is None:
            context = package_metadata_context()
        detector = candidates[0]
        if not detector.applies_to(context.binary_package):
            raise ValueError(
                f'The detector "{metadata_detector_id}" from {self._plugin_name} does not apply to the'
                " given package. Consider using `package_metadata_context()` to emulate a binary package"
                " with the correct specification. As an example: "
                '`package_metadata_context(package_fields={"Package-Type": "udeb"})` would emulate a udeb'
                " package."
            )

        ctrl = BinaryCtrlAccessorTestProvider(
            self._plugin_metadata,
            metadata_detector_id,
            context,
        )
        with _read_only_fs_root(fs_root) as ro_root:
            detector.run_detector(ro_root, ctrl, context)
        return ctrl.registered_metadata()

    def run_package_processor(
        self,
        package_processor_id: str,
        fs_root: VirtualPath,
        context: PackageProcessingContext | None = None,
    ) -> None:
        """Run one package processor of this plugin on `fs_root`.

        :param package_processor_id: ID of the package processor to run.
        :param fs_root: The file system root to process; it must be read-write.
        :param context: The processing context; defaults to
          `package_metadata_context()`.
        :raises ValueError: If `fs_root` is not a root or is read-only, the processor
          is unknown, or the processor does not apply to the package of the context.
        """
        if fs_root.parent_dir is not None:
            raise ValueError("Provided path must be the file system root.")
        package_processor = self._feature_set.all_package_processors.get(
            (self._plugin_name, package_processor_id)
        )
        if package_processor is None:
            raise ValueError(
                f"The plugin {self._plugin_name} did not provide a package processor with ID"
                f' "{package_processor_id}"'
            )
        if context is None:
            context = package_metadata_context()
        if not fs_root.is_read_write:
            raise ValueError(
                "The provided fs_root is read-only and it must be read-write for package processor"
            )
        if not package_processor.applies_to(context.binary_package):
            raise ValueError(
                f'The package processor "{package_processor_id}" from {self._plugin_name} does not apply'
                " to the given package. Consider using `package_metadata_context()` to emulate a binary"
                " package with the correct specification. As an example: "
                '`package_metadata_context(package_fields={"Package-Type": "udeb"})` would emulate a udeb'
                " package."
            )
        package_processor.run_package_processor(fs_root, None, context)

    @property
    def declared_manifest_variables(self) -> frozenset[str]:
        """Names of the manifest variables registered by this plugin."""
        return frozenset(
            name
            for name, variable in self._feature_set.manifest_variables.items()
            if variable.plugin_metadata.plugin_name == self._plugin_name
        )

    def automatic_discard_rules_examples_with_issues(self) -> Sequence[ADRExampleIssue]:
        """Check the examples of this plugin's automatic discard rules.

        :return: One issue per example for which `process_discard_rule_example`
          reported inconsistent paths. Directory paths get a trailing slash.
        """
        issues = []
        own_rules = (
            rule
            for rule in self._feature_set.auto_discard_rules.values()
            if rule.plugin_metadata.plugin_name == self._plugin_name
        )
        for rule in own_rules:
            for example_index, example in enumerate(rule.examples):
                outcome = process_discard_rule_example(rule, example)
                if not outcome.inconsistent_paths:
                    continue
                bad_paths = [
                    p.absolute + ("/" if p.is_dir else "")
                    for p in outcome.inconsistent_paths
                ]
                issues.append(ADRExampleIssue(rule.name, example_index, bad_paths))
        return issues

    def run_service_detection_and_integrations(
        self,
        service_manager: str,
        fs_root: VirtualPath,
        context: PackageProcessingContext | None = None,
        *,
        service_context_type_hint: type[DSD] | None = None,
    ) -> tuple[list[DetectedService[DSD]], RegisteredMetadata]:
        """Run the detector of a service manager and, if needed, its integrator.

        :param service_manager: Name of a service manager provided by this plugin.
        :param fs_root: The file system root to search for services.
        :param context: The processing context; defaults to
          `package_metadata_context()`.
        :param service_context_type_hint: Not used at run-time by this method.
        :return: The detected services and the metadata registered by the integrator.
          The integrator only runs when at least one service was detected.
        :raises ValueError: If `fs_root` is not a root or this plugin does not provide
          the requested service manager.
        """
        if fs_root.parent_dir is not None:
            raise ValueError("Provided path must be the file system root.")
        try:
            manager = self._feature_set.service_managers[service_manager]
            if manager.plugin_metadata.plugin_name != self._plugin_name:
                # A manager owned by another plugin is reported like a missing one.
                raise KeyError(service_manager)
        except KeyError:
            raise ValueError(
                f"The plugin {self._plugin_name} does not provide a"
                f" service manager called {service_manager}"
            ) from None

        if context is None:
            context = package_metadata_context()
        detected_services: list[DetectedService[DSD]] = []
        # The registry appends to `detected_services` as the detector registers.
        manager.service_detector(
            fs_root,
            ServiceRegistryTestImpl(manager, detected_services),
            context,
        )
        ctrl = BinaryCtrlAccessorTestProvider(
            self._plugin_metadata,
            manager.service_manager,
            context,
        )
        if not detected_services:
            return detected_services, ctrl.registered_metadata()

        service_definitions = [
            ServiceDefinitionImpl(
                service.names[0],
                service.names,
                service.path,
                service.type_of_service,
                service.service_scope,
                service.enable_by_default,
                service.start_by_default,
                service.default_upgrade_rule,
                self._plugin_name,
                True,
                service.service_context,
            )
            for service in detected_services
        ]
        manager.service_integrator(service_definitions, ctrl, context)
        return detected_services, ctrl.registered_metadata()

    def manifest_variables(
        self,
        *,
        resolution_context: VariableContext | None = None,
        mocked_variables: Mapping[str, str] | None = None,
    ) -> Mapping[str, str]:
        """Return a mapping that resolves this plugin's manifest variables on lookup.

        :param resolution_context: Context used for the resolution; defaults to
          `manifest_variable_resolution_context()`.
        :param mocked_variables: Passed as extra substitutions to the substitution
          used for the test.
        """
        own_variables = self.declared_manifest_variables
        if resolution_context is None:
            resolution_context = manifest_variable_resolution_context()
        test_substitution = self._substitution.copy_for_subst_test(
            self._feature_set,
            resolution_context,
            extra_substitutions=mocked_variables,
        )
        return SubstitutionTable(own_variables, test_substitution)

772 

773 

class SubstitutionTable(Mapping[str, str]):
    """Mapping over a fixed set of manifest variable names that resolves on lookup.

    The keys are the names given at construction time. A value is computed on each
    lookup by substituting `{{NAME}}` via the provided substitution.
    """

    def __init__(
        self, valid_manifest_variables: frozenset[str], substitution: Substitution
    ) -> None:
        self._valid_manifest_variables = valid_manifest_variables
        # Names that have been looked up successfully so far.
        self._resolved: set[str] = set()
        self._substitution = substitution

    def __contains__(self, item: object) -> bool:
        known_names = self._valid_manifest_variables
        return item in known_names

    def __getitem__(self, key: str) -> str:
        if key in self._valid_manifest_variables:
            template = "{{" + key + "}}"
            value = self._substitution.substitute(
                template, f"test of manifest variable `{key}`"
            )
            self._resolved.add(key)
            return value
        raise KeyError(key)

    def __len__(self) -> int:
        known_names = self._valid_manifest_variables
        return len(known_names)

    def __iter__(self) -> Iterator[str]:
        known_names = self._valid_manifest_variables
        return iter(known_names)

    def keys(self) -> KeysView[str]:
        # The frozenset of names is handed out directly instead of a real KeysView.
        known_names = self._valid_manifest_variables
        return cast("KeysView[str]", known_names)