Coverage for src/debputy/plugin/api/test_api/test_impl.py: 81%

298 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-09-07 09:27 +0000

1import contextlib 

2import dataclasses 

3import inspect 

4import os.path 

5from importlib.resources.abc import Traversable 

6from io import BytesIO 

7from pathlib import Path 

8from typing import ( 

9 Mapping, 

10 Dict, 

11 Optional, 

12 Tuple, 

13 List, 

14 cast, 

15 FrozenSet, 

16 Sequence, 

17 Union, 

18 Type, 

19 Iterator, 

20 Set, 

21 KeysView, 

22 Callable, 

23) 

24 

25from debian.deb822 import Deb822 

26from debian.substvars import Substvars 

27 

28from debputy import DEBPUTY_PLUGIN_ROOT_DIR 

29from debputy.architecture_support import faked_arch_table 

30from debputy.filesystem_scan import FSROOverlay, FSRootDir 

31from debputy.packages import BinaryPackage 

32from debputy.plugin.api import ( 

33 PluginInitializationEntryPoint, 

34 VirtualPath, 

35 PackageProcessingContext, 

36 DpkgTriggerType, 

37 Maintscript, 

38) 

39from debputy.plugin.api.example_processing import process_discard_rule_example 

40from debputy.plugin.api.impl import ( 

41 plugin_metadata_for_debputys_own_plugin, 

42 DebputyPluginInitializerProvider, 

43 parse_json_plugin_desc, 

44 MaintscriptAccessorProviderBase, 

45 BinaryCtrlAccessorProviderBase, 

46 PLUGIN_TEST_SUFFIX, 

47 find_json_plugin, 

48 ServiceDefinitionImpl, 

49) 

50from debputy.plugin.api.impl_types import ( 

51 PackagerProvidedFileClassSpec, 

52 DebputyPluginMetadata, 

53 PluginProvidedTrigger, 

54 ServiceManagerDetails, 

55) 

56from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

57from debputy.plugin.api.spec import ( 

58 MaintscriptAccessor, 

59 FlushableSubstvars, 

60 ServiceRegistry, 

61 DSD, 

62 ServiceUpgradeRule, 

63) 

64from debputy.plugin.api.test_api.test_spec import ( 

65 InitializedPluginUnderTest, 

66 RegisteredPackagerProvidedFile, 

67 RegisteredTrigger, 

68 RegisteredMaintscript, 

69 DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS, 

70 ADRExampleIssue, 

71 DetectedService, 

72 RegisteredMetadata, 

73) 

74from debputy.plugins.debputy.debputy_plugin import initialize_debputy_features 

75from debputy.substitution import SubstitutionImpl, VariableContext, Substitution 

76from debputy.util import package_cross_check_precheck 

77 

# Register the implementation class as a virtual subclass of the test-facing
# `RegisteredPackagerProvidedFile` type, so that the `isinstance` check in
# `InitializedPluginUnderTestImpl.packager_provided_files_by_stem` holds.
RegisteredPackagerProvidedFile.register(PackagerProvidedFileClassSpec)

79 

80 

@dataclasses.dataclass(frozen=True, slots=True)
class PackageProcessingContextTestProvider(PackageProcessingContext):
    """Immutable `PackageProcessingContext` used by the plugin test API.

    Instances are created by `package_metadata_context`, which fills in
    every field from its keyword arguments.
    """

    # The emulated binary package the test is acting on.
    binary_package: BinaryPackage
    # Version string reported for `binary_package`.
    binary_package_version: str
    # The emulated udeb related to `binary_package`, or None when there is none.
    related_udeb_package: Optional[BinaryPackage]
    # Version string reported for the related udeb (may be None).
    related_udeb_package_version: Optional[str]
    # Zero-argument callable returning (package, file system root) pairs.
    accessible_package_roots: Callable[[], Sequence[Tuple[BinaryPackage, VirtualPath]]]

88 

89 

def _initialize_plugin_under_test(
    plugin_metadata: DebputyPluginMetadata,
    load_debputy_plugin: bool = True,
) -> "InitializedPluginUnderTest":
    """Load the plugin described by `plugin_metadata` into a fresh feature set.

    :param plugin_metadata: Metadata of the plugin that should be tested.
    :param load_debputy_plugin: When true, debputy's own plugin is loaded into
      the same feature set before the plugin under test.
    :return: The test wrapper around the loaded plugin.
    """
    feature_set = PluginProvidedFeatureSet()
    variable_context = VariableContext(
        FSROOverlay.create_root_dir("debian", "debian"),
    )
    substitution = SubstitutionImpl(
        unresolvable_substitutions=frozenset(["SOURCE_DATE_EPOCH", "PACKAGE"]),
        variable_context=variable_context,
        plugin_feature_set=feature_set,
    )

    load_order = []
    if load_debputy_plugin:
        # Load debputy's own plugin first, so conflicts with debputy's plugin
        # are detected early
        load_order.append(
            plugin_metadata_for_debputys_own_plugin(initialize_debputy_features)
        )
    load_order.append(plugin_metadata)

    for metadata in load_order:
        provider = DebputyPluginInitializerProvider(
            metadata,
            feature_set,
            substitution,
        )
        provider.load_plugin()

    return InitializedPluginUnderTestImpl(
        plugin_metadata.plugin_name,
        feature_set,
        substitution,
    )

127 

128 

def _auto_load_plugin_from_filename(
    py_test_filename: str,
) -> "InitializedPluginUnderTest":
    """Locate and load the plugin that belongs to a given test file.

    The plugin name is derived from the basename of the test file by removing
    `PLUGIN_TEST_SUFFIX` and replacing `_` with `-`. Where the plugin is loaded
    from is controlled by the `DEBPUTY_TEST_PLUGIN_LOCATION` environment
    variable (default: `uninstalled`).

    :param py_test_filename: Path of the Python test file.
    :return: The loaded plugin for testing.
    :raises FileNotFoundError: In `uninstalled` mode, if neither
      `<plugin>.json` nor `<plugin>.json.in` exists next to the test file.
    :raises ValueError: If `DEBPUTY_TEST_PLUGIN_LOCATION` has an unsupported
      value.
    """
    dirname, basename = os.path.split(py_test_filename)
    plugin_name = PLUGIN_TEST_SUFFIX.sub("", basename).replace("_", "-")

    test_location = os.environ.get("DEBPUTY_TEST_PLUGIN_LOCATION", "uninstalled")
    if test_location == "uninstalled":
        json_basename = f"{plugin_name}.json"
        json_desc_file = os.path.join(dirname, json_basename)
        if "/" not in json_desc_file:
            # Make the path explicitly relative when it has no directory part.
            json_desc_file = f"./{json_desc_file}"

        if os.path.isfile(json_desc_file):
            return _initialize_plugin_from_desc(json_desc_file)

        json_desc_file_in = f"{json_desc_file}.in"
        if os.path.isfile(json_desc_file_in):
            # Load the `.json.in` file that was actually found. Passing the
            # plain `.json` path here would point at a file that was just
            # established not to exist.
            return _initialize_plugin_from_desc(json_desc_file_in)
        raise FileNotFoundError(
            f"Cannot determine the plugin JSON metadata descriptor: Expected it to be"
            f" {json_desc_file} or {json_desc_file_in}"
        )

    if test_location == "installed":
        plugin_metadata = find_json_plugin([str(DEBPUTY_PLUGIN_ROOT_DIR)], plugin_name)
        return _initialize_plugin_under_test(plugin_metadata, load_debputy_plugin=True)

    raise ValueError(
        'Invalid or unsupported "DEBPUTY_TEST_PLUGIN_LOCATION" environment variable. It must be either'
        ' unset OR one of "installed", "uninstalled".'
    )

161 

162 

def initialize_plugin_under_test(
    *,
    plugin_desc_file: Optional[str] = None,
) -> "InitializedPluginUnderTest":
    """Load and initialize a plugin so it can be tested

    The plugin is loaded via its plugin description, which is how `debputy`
    loads plugins at run-time (unlike `initialize_plugin_under_test_preloaded`,
    which skips that part of the flow).

    :param plugin_desc_file: The plugin description file (`.json`) that describes how to load the plugin.
      If omitted, `debputy` will attempt to locate the plugin description file based on the file name of
      the calling test. This works for "single-file" plugins, where the description file and the test
      are right next to each other.
    :return: The loaded plugin for testing
    """
    if plugin_desc_file is not None:
        if DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS:
            raise RuntimeError(
                "Running the test against an installed plugin does not work when"
                " plugin_desc_file is provided. Please skip this test. You can "
                " import DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS and use that as"
                " conditional for this purpose."
            )
        return _initialize_plugin_from_desc(plugin_desc_file)

    # Frame 1 is the direct caller of this function; this lookup must stay
    # in this function body for the index to remain correct.
    return _auto_load_plugin_from_filename(inspect.stack()[1].filename)

190 

191 

def _initialize_plugin_from_desc(
    desc_file: str,
) -> "InitializedPluginUnderTest":
    """Parse a plugin description file and load the plugin it describes.

    :param desc_file: Path to the description; must end with `.json` or
      `.json.in`.
    :return: The loaded plugin for testing (with debputy's own plugin loaded
      first).
    :raises ValueError: If the file name has an unsupported suffix.
    """
    accepted_suffixes = (".json", ".json.in")
    if desc_file.endswith(accepted_suffixes):
        metadata = parse_json_plugin_desc(desc_file)
        return _initialize_plugin_under_test(metadata, load_debputy_plugin=True)

    raise ValueError("The plugin file must end with .json or .json.in")

201 

202 

def initialize_plugin_under_test_from_inline_json(
    plugin_name: str,
    json_content: str,
) -> "InitializedPluginUnderTest":
    """Load a plugin whose JSON description is given as an in-memory string.

    :param plugin_name: Passed to `parse_json_plugin_desc` as its first
      argument.
    :param json_content: The JSON description; it is encoded as UTF-8 and
      handed to the parser as a file-like object.
    :return: The loaded plugin for testing (with debputy's own plugin loaded
      first).
    """
    encoded = json_content.encode("utf-8")
    with BytesIO(encoded) as json_fd:
        metadata = parse_json_plugin_desc(plugin_name, fd=json_fd)

    return _initialize_plugin_under_test(metadata, load_debputy_plugin=True)

211 

212 

def initialize_plugin_under_test_preloaded(
    api_compat_version: int,
    plugin_initializer: PluginInitializationEntryPoint,
    /,
    plugin_name: str = "plugin-under-test",
    load_debputy_plugin: bool = True,
    plugin_doc_path_resolver: Callable[
        [], Optional[Union[str, Traversable, Path]]
    ] = lambda: None,
) -> "InitializedPluginUnderTest":
    """Internal API: Initialize a plugin for testing without loading it from a file

    The standard loading mechanism is by-passed, so this does not test that
    the plugin description file is correct. Notably, any feature provided via
    the JSON description file will **NOT** be visible for the test.

    This API is mostly useful for testing parts of debputy itself.

    :param api_compat_version: The API version the plugin was written for. Use the same version as the
      version from the entry point (The `v1` part of `debputy.plugins.v1.initialize` translates into `1`).
    :param plugin_initializer: The entry point of the plugin
    :param plugin_name: Normally, debputy would derive this from the entry point. In the test, it will
      use a test name and version. However, you can explicitly set it if you want the real name/version.
    :param load_debputy_plugin: Whether to load debputy's own plugin first. Doing so provides a more
      realistic test and enables the test to detect conflicts with debputy's own plugins (de facto making
      the plugin unloadable in practice if such a conflict is present). This option is mostly provided
      to enable debputy to use this method for self testing.
    :param plugin_doc_path_resolver: How to resolve the documentation (if relevant for the test). The
      default is to not load the documentation.
    :return: The loaded plugin for testing
    :raises RuntimeError: If the tests are run against installed plugins.
    """
    if DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS:
        raise RuntimeError(
            "Running the test against an installed plugin does not work when"
            " the plugin is preload. Please skip this test. You can "
            " import DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS and use that as"
            " conditional for this purpose."
        )

    return _initialize_plugin_under_test(
        DebputyPluginMetadata(
            plugin_name=plugin_name,
            api_compat_version=api_compat_version,
            plugin_initializer=plugin_initializer,
            plugin_loader=None,
            plugin_path="<loaded-via-test>",
            plugin_doc_path_resolver=plugin_doc_path_resolver,
        ),
        load_debputy_plugin=load_debputy_plugin,
    )

266 

267 

class _MockArchTable:
    """Stand-in architecture table whose matcher accepts every combination."""

    @staticmethod
    def matches_architecture(_a: str, _b: str) -> bool:
        """Report a match regardless of the two arguments."""
        return True


# Shared instance used wherever the test code needs a dpkg architecture query
# table. The cast only informs the type checker; at run-time this is the mock.
FAKE_DPKG_QUERY_TABLE = cast("DpkgArchTable", _MockArchTable())
# The class is only needed to create the singleton above.
del _MockArchTable

276 

277 

def package_metadata_context(
    *,
    host_arch: str = "amd64",
    package_fields: Optional[Dict[str, str]] = None,
    related_udeb_package_fields: Optional[Dict[str, str]] = None,
    binary_package_version: str = "1.0-1",
    related_udeb_package_version: Optional[str] = None,
    should_be_acted_on: bool = True,
    related_udeb_fs_root: Optional[VirtualPath] = None,
    accessible_package_roots: Sequence[Tuple[Mapping[str, str], VirtualPath]] = tuple(),
) -> PackageProcessingContext:
    """Create an emulated package processing context for use in tests.

    :param host_arch: Architecture used to create the faked architecture table.
    :param package_fields: Extra or overriding control fields for the main
      package. Defaults are `Package: foo` and `Architecture: any`.
    :param related_udeb_package_fields: When given, a related udeb is emulated
      with these fields. `Package`, `Architecture` and `Package-Type` get
      defaults derived from the main package when absent.
    :param binary_package_version: Version of the main package.
    :param related_udeb_package_version: Version of the related udeb. When a
      udeb is emulated and this is omitted, the main package version is used.
    :param should_be_acted_on: Passed on to the main `BinaryPackage`.
    :param related_udeb_fs_root: File system root of the related udeb; requires
      `related_udeb_package_fields`.
    :param accessible_package_roots: Pairs of (control fields, file system
      root) for other packages that should be accessible. Each set of fields
      must include `Package` and `Architecture`.
    :return: The emulated context.
    :raises ValueError: If a mandatory field is missing, if a package would not
      be accessible from the main package, or if `related_udeb_fs_root` is
      given without `related_udeb_package_fields`.
    """
    process_table = faked_arch_table(host_arch)
    main_fields = {
        "Package": "foo",
        "Architecture": "any",
    }
    if package_fields is not None:
        main_fields.update(package_fields)

    bin_package = BinaryPackage(
        Deb822(main_fields),
        process_table,
        FAKE_DPKG_QUERY_TABLE,
        is_main_package=True,
        should_be_acted_on=should_be_acted_on,
    )
    udeb_package = None
    if related_udeb_package_fields is not None:
        # Copy first, so the caller's mapping is not modified by the defaults.
        udeb_fields = dict(related_udeb_package_fields)
        udeb_fields.setdefault("Package", f'{main_fields["Package"]}-udeb')
        udeb_fields.setdefault("Architecture", main_fields["Architecture"])
        udeb_fields.setdefault("Package-Type", "udeb")
        udeb_package = BinaryPackage(
            Deb822(udeb_fields),
            process_table,
            FAKE_DPKG_QUERY_TABLE,
            is_main_package=False,
            should_be_acted_on=True,
        )
        if related_udeb_package_version is None:
            related_udeb_package_version = binary_package_version

    roots: List[Tuple[BinaryPackage, VirtualPath]] = []
    if accessible_package_roots:
        for fields, apr_fs_root in accessible_package_roots:
            apr_fields = Deb822(dict(fields))
            if "Package" not in apr_fields:
                raise ValueError(
                    "Missing mandatory Package field in member of accessible_package_roots"
                )
            if "Architecture" not in apr_fields:
                raise ValueError(
                    "Missing mandatory Architecture field in member of accessible_package_roots"
                )
            apr_package = BinaryPackage(
                apr_fields,
                process_table,
                FAKE_DPKG_QUERY_TABLE,
                is_main_package=False,
                should_be_acted_on=True,
            )
            precheck = package_cross_check_precheck(bin_package, apr_package)
            if not precheck[0]:
                raise ValueError(
                    f"{apr_package.name} would not be accessible for {bin_package.name}"
                )
            roots.append((apr_package, apr_fs_root))

        # NOTE(review): as in the original code, `related_udeb_fs_root` is only
        # considered when `accessible_package_roots` is non-empty. Confirm
        # whether it should also be honoured on its own.
        if related_udeb_fs_root is not None:
            if udeb_package is None:
                raise ValueError(
                    "related_udeb_package_fields must be given when related_udeb_fs_root is given"
                )
            precheck = package_cross_check_precheck(bin_package, udeb_package)
            if not precheck[0]:
                raise ValueError(
                    f"{udeb_package.name} would not be accessible for {bin_package.name}, so providing"
                    " related_udeb_fs_root is irrelevant"
                )
            # Every entry must be a (package, fs_root) pair. Appending only the
            # package would break the declared shape of the sequence and lose
            # the file system root.
            roots.append((udeb_package, related_udeb_fs_root))

    apr = tuple(roots)

    return PackageProcessingContextTestProvider(
        binary_package=bin_package,
        related_udeb_package=udeb_package,
        binary_package_version=binary_package_version,
        related_udeb_package_version=related_udeb_package_version,
        accessible_package_roots=lambda: apr,
    )

368 

369 

def manifest_variable_resolution_context(
    *,
    debian_dir: Optional[VirtualPath] = None,
) -> VariableContext:
    """Create a `VariableContext` for resolving manifest variables in tests.

    :param debian_dir: Directory handed to the `VariableContext`. A new
      `FSRootDir()` is used when omitted.
    :return: The variable context wrapping that directory.
    """
    root = FSRootDir() if debian_dir is None else debian_dir
    return VariableContext(root)

378 

379 

class MaintscriptAccessorTestProvider(MaintscriptAccessorProviderBase):
    """Maintscript accessor that records registered snippets instead of writing them.

    Snippets are stored in the shared container passed to the constructor,
    keyed by the plugin source ID.
    """

    __slots__ = ("_plugin_metadata", "_plugin_source_id", "_maintscript_container")

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        maintscript_container: Dict[str, List[RegisteredMaintscript]],
    ):
        self._maintscript_container = maintscript_container
        self._plugin_source_id = plugin_source_id
        self._plugin_metadata = plugin_metadata

    @classmethod
    def _apply_condition_to_script(
        cls, condition: str, run_snippet: str, /, indent: Optional[bool] = None
    ) -> str:
        """Return the snippet as-is; the condition is not applied in tests."""
        return run_snippet

    def _append_script(
        self,
        caller_name: str,
        maintscript: Maintscript,
        full_script: str,
        /,
        perform_substitution: bool = True,
    ) -> None:
        """Record a snippet under this accessor's plugin source ID."""
        # The bucket is created (if missing) before the record is built,
        # matching the order of the explicit membership check it replaces.
        self._maintscript_container.setdefault(self._plugin_source_id, []).append(
            RegisteredMaintscript(
                maintscript,
                caller_name,
                full_script,
                perform_substitution,
            )
        )

417 

418 

class RegisteredMetadataImpl(RegisteredMetadata):
    """Plain holder for the substvars, triggers and maintscripts a test produced."""

    __slots__ = (
        "_substvars",
        "_triggers",
        "_maintscripts",
    )

    def __init__(
        self,
        substvars: Substvars,
        triggers: List[RegisteredTrigger],
        maintscripts: List[RegisteredMaintscript],
    ) -> None:
        self._maintscripts = maintscripts
        self._triggers = triggers
        self._substvars = substvars

    @property
    def substvars(self) -> Substvars:
        """The substvars object given at construction."""
        return self._substvars

    @property
    def triggers(self) -> List[RegisteredTrigger]:
        """The trigger list given at construction."""
        return self._triggers

    def maintscripts(
        self,
        *,
        maintscript: Optional[Maintscript] = None,
    ) -> List[RegisteredMaintscript]:
        """Return the registered snippets, optionally limited to one maintscript.

        Without a filter the stored list itself is returned; with a filter a
        new list holding only the matching entries is returned.
        """
        registered = self._maintscripts
        if maintscript is not None:
            registered = [
                entry for entry in registered if entry.maintscript == maintscript
            ]
        return registered

452 

453 

class BinaryCtrlAccessorTestProvider(BinaryCtrlAccessorProviderBase):
    """Binary control accessor that collects what a plugin registers during a test."""

    __slots__ = ("_maintscript_container",)

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        context: PackageProcessingContext,
    ) -> None:
        # Start from empty state: no triggers, fresh substvars and a
        # (None, None) pair for the last base-class argument.
        fresh_triggers = {}
        fresh_substvars = FlushableSubstvars()
        super().__init__(
            plugin_metadata,
            plugin_source_id,
            context,
            fresh_triggers,
            fresh_substvars,
            (None, None),
        )
        self._maintscript_container: Dict[str, List[RegisteredMaintscript]] = {}

    def _create_maintscript_accessor(self) -> MaintscriptAccessor:
        """Create the recording accessor, sharing this object's snippet container."""
        return MaintscriptAccessorTestProvider(
            self._plugin_metadata,
            self._plugin_source_id,
            self._maintscript_container,
        )

    def registered_metadata(self) -> RegisteredMetadata:
        """Summarize what was registered under this accessor's plugin source ID."""
        source_id = self._plugin_source_id
        own_triggers = [
            RegisteredTrigger.from_plugin_provided_trigger(trigger)
            for trigger in self._triggers.values()
            if trigger.provider_source_id == source_id
        ]
        own_maintscripts = self._maintscript_container.get(source_id, [])
        return RegisteredMetadataImpl(
            self._substvars,
            own_triggers,
            own_maintscripts,
        )

490 

491 

class ServiceRegistryTestImpl(ServiceRegistry[DSD]):
    """Service registry that appends every registration to a caller-provided list."""

    __slots__ = ("_service_manager_details", "_service_definitions")

    def __init__(
        self,
        service_manager_details: ServiceManagerDetails,
        detected_services: List[DetectedService[DSD]],
    ) -> None:
        self._service_definitions = detected_services
        self._service_manager_details = service_manager_details

    def register_service(
        self,
        path: VirtualPath,
        name: Union[str, List[str]],
        *,
        type_of_service: str = "service",  # "timer", etc.
        service_scope: str = "system",
        enable_by_default: bool = True,
        start_by_default: bool = True,
        default_upgrade_rule: ServiceUpgradeRule = "restart",
        service_context: Optional[DSD] = None,
    ) -> None:
        """Record a detected service.

        :param path: Path the service was detected from.
        :param name: A single name or a list of names for the service.
        :raises ValueError: If `name` is an empty list.
        """
        if isinstance(name, list):
            names = name
        else:
            names = [name]
        if not names:
            raise ValueError(
                f"The service must have at least one name - {path.absolute} did not have any"
            )
        detected = DetectedService(
            path,
            names,
            type_of_service,
            service_scope,
            enable_by_default,
            start_by_default,
            default_upgrade_rule,
            service_context,
        )
        self._service_definitions.append(detected)

532 

533 

@contextlib.contextmanager
def _read_only_fs_root(fs_root: VirtualPath) -> Iterator[VirtualPath]:
    """Temporarily make `fs_root` read-only for the duration of the `with` body.

    A root that is already read-only is yielded unchanged. A read-write root
    has its `is_read_write` flag cleared while the body runs and set again
    afterwards, including when the body raises.

    :param fs_root: The file system root to protect.
    :return: Context manager yielding the same `fs_root` object.
    """
    if not fs_root.is_read_write:
        yield fs_root
        return

    assert isinstance(fs_root, FSRootDir)
    fs_root.is_read_write = False
    try:
        yield fs_root
    finally:
        # Restore on every exit path. Without this, an exception raised in
        # the `with` body would leave the root permanently read-only.
        fs_root.is_read_write = True

543 

544 

class InitializedPluginUnderTestImpl(InitializedPluginUnderTest):
    """Test-facing wrapper around a plugin that has been loaded into a feature set."""

    def __init__(
        self,
        plugin_name: str,
        feature_set: PluginProvidedFeatureSet,
        substitution: SubstitutionImpl,
    ) -> None:
        self._plugin_name = plugin_name
        self._feature_set = feature_set
        self._substitution = substitution
        # Cache for `packager_provided_files_by_stem`; populated on first use.
        self._packager_provided_files: Optional[
            Dict[str, RegisteredPackagerProvidedFile]
        ] = None
        self._triggers: Dict[Tuple[DpkgTriggerType, str], PluginProvidedTrigger] = {}
        self._maintscript_container: Dict[str, List[RegisteredMaintscript]] = {}
        assert plugin_name in self._feature_set.plugin_data

    @property
    def _plugin_metadata(self) -> DebputyPluginMetadata:
        """Metadata of the plugin under test, looked up in the feature set."""
        return self._feature_set.plugin_data[self._plugin_name]

    def packager_provided_files_by_stem(
        self,
    ) -> Mapping[str, RegisteredPackagerProvidedFile]:
        """Return the packager provided files of this plugin, keyed by stem.

        The mapping is computed on the first call and cached afterwards.
        """
        cached = self._packager_provided_files
        if cached is not None:
            return cached

        by_stem: Dict[str, RegisteredPackagerProvidedFile] = {}
        for spec in self._feature_set.packager_provided_files.values():
            if spec.debputy_plugin_metadata.plugin_name == self._plugin_name:
                # Registered as a virtual subclass, so this should always be True
                assert isinstance(spec, RegisteredPackagerProvidedFile)
                by_stem[spec.stem] = spec
        self._packager_provided_files = by_stem
        return by_stem

    def run_metadata_detector(
        self,
        metadata_detector_id: str,
        fs_root: VirtualPath,
        context: Optional[PackageProcessingContext] = None,
    ) -> RegisteredMetadata:
        """Run one of the plugin's metadata detectors and return what it registered.

        :param metadata_detector_id: ID of the detector to run.
        :param fs_root: File system root to run the detector on; it is made
          read-only while the detector runs.
        :param context: Context to run in; a default from
          `package_metadata_context()` is used when omitted.
        :raises ValueError: If `fs_root` is not a root, the detector is
          unknown, or the detector does not apply to the context's package.
        """
        if fs_root.parent_dir is not None:
            raise ValueError("Provided path must be the file system root.")
        plugin_detectors = self._feature_set.metadata_maintscript_detectors[
            self._plugin_name
        ]
        candidates = [
            d for d in plugin_detectors if d.detector_id == metadata_detector_id
        ]
        if len(candidates) != 1:
            assert not candidates
            raise ValueError(
                f"The plugin {self._plugin_name} did not provide a metadata detector with ID"
                f' "{metadata_detector_id}"'
            )
        if context is None:
            context = package_metadata_context()
        (detector,) = candidates
        if not detector.applies_to(context.binary_package):
            raise ValueError(
                f'The detector "{metadata_detector_id}" from {self._plugin_name} does not apply to the'
                " given package. Consider using `package_metadata_context()` to emulate a binary package"
                " with the correct specification. As an example: "
                '`package_metadata_context(package_fields={"Package-Type": "udeb"})` would emulate a udeb'
                " package."
            )

        ctrl = BinaryCtrlAccessorTestProvider(
            self._plugin_metadata,
            metadata_detector_id,
            context,
        )
        with _read_only_fs_root(fs_root) as ro_root:
            detector.run_detector(ro_root, ctrl, context)
        return ctrl.registered_metadata()

    def run_package_processor(
        self,
        package_processor_id: str,
        fs_root: VirtualPath,
        context: Optional[PackageProcessingContext] = None,
    ) -> None:
        """Run one of the plugin's package processors on `fs_root`.

        :param package_processor_id: ID of the package processor to run.
        :param fs_root: Read-write file system root to process.
        :param context: Context to run in; a default from
          `package_metadata_context()` is used when omitted.
        :raises ValueError: If `fs_root` is not a root or is read-only, the
          processor is unknown, or it does not apply to the context's package.
        """
        if fs_root.parent_dir is not None:
            raise ValueError("Provided path must be the file system root.")
        package_processor = self._feature_set.all_package_processors.get(
            (self._plugin_name, package_processor_id)
        )
        if package_processor is None:
            raise ValueError(
                f"The plugin {self._plugin_name} did not provide a package processor with ID"
                f' "{package_processor_id}"'
            )
        if context is None:
            context = package_metadata_context()
        if not fs_root.is_read_write:
            raise ValueError(
                "The provided fs_root is read-only and it must be read-write for package processor"
            )
        if not package_processor.applies_to(context.binary_package):
            raise ValueError(
                f'The package processor "{package_processor_id}" from {self._plugin_name} does not apply'
                " to the given package. Consider using `package_metadata_context()` to emulate a binary"
                " package with the correct specification. As an example: "
                '`package_metadata_context(package_fields={"Package-Type": "udeb"})` would emulate a udeb'
                " package."
            )
        package_processor.run_package_processor(fs_root, None, context)

    @property
    def declared_manifest_variables(self) -> FrozenSet[str]:
        """Names of the manifest variables registered by this plugin."""
        return frozenset(
            name
            for name, variable in self._feature_set.manifest_variables.items()
            if variable.plugin_metadata.plugin_name == self._plugin_name
        )

    def automatic_discard_rules_examples_with_issues(self) -> Sequence[ADRExampleIssue]:
        """Process the examples of this plugin's discard rules and report inconsistencies.

        :return: One issue per example for which processing reported
          inconsistent paths (directories are listed with a trailing `/`).
        """
        issues = []
        for adr in self._feature_set.auto_discard_rules.values():
            if adr.plugin_metadata.plugin_name == self._plugin_name:
                for idx, example in enumerate(adr.examples):
                    outcome = process_discard_rule_example(adr, example)
                    if not outcome.inconsistent_paths:
                        continue
                    rendered_paths = [
                        p.absolute + ("/" if p.is_dir else "")
                        for p in outcome.inconsistent_paths
                    ]
                    issues.append(ADRExampleIssue(adr.name, idx, rendered_paths))
        return issues

    def run_service_detection_and_integrations(
        self,
        service_manager: str,
        fs_root: VirtualPath,
        context: Optional[PackageProcessingContext] = None,
        *,
        service_context_type_hint: Optional[Type[DSD]] = None,
    ) -> Tuple[List[DetectedService[DSD]], RegisteredMetadata]:
        """Run a service manager's detector and, if anything was found, its integrator.

        :param service_manager: Name of a service manager provided by this plugin.
        :param fs_root: File system root to run the detection on.
        :param context: Context to run in; a default from
          `package_metadata_context()` is used when omitted.
        :param service_context_type_hint: Not used at run-time by this
          implementation.
        :return: The detected services and the metadata registered by the
          integrator.
        :raises ValueError: If `fs_root` is not a root or the plugin does not
          provide the named service manager.
        """
        if fs_root.parent_dir is not None:
            raise ValueError("Provided path must be the file system root.")
        try:
            details = self._feature_set.service_managers[service_manager]
            if details.plugin_metadata.plugin_name != self._plugin_name:
                # A manager owned by another plugin is treated as unknown.
                raise KeyError(service_manager)
        except KeyError:
            raise ValueError(
                f"The plugin {self._plugin_name} does not provide a"
                f" service manager called {service_manager}"
            ) from None

        if context is None:
            context = package_metadata_context()
        detected_services: List[DetectedService[DSD]] = []
        registry = ServiceRegistryTestImpl(details, detected_services)
        details.service_detector(fs_root, registry, context)
        ctrl = BinaryCtrlAccessorTestProvider(
            self._plugin_metadata,
            details.service_manager,
            context,
        )
        if detected_services:
            definitions = []
            for service in detected_services:
                definitions.append(
                    ServiceDefinitionImpl(
                        service.names[0],
                        service.names,
                        service.path,
                        service.type_of_service,
                        service.service_scope,
                        service.enable_by_default,
                        service.start_by_default,
                        service.default_upgrade_rule,
                        self._plugin_name,
                        True,
                        service.service_context,
                    )
                )
            details.service_integrator(definitions, ctrl, context)
        return detected_services, ctrl.registered_metadata()

    def manifest_variables(
        self,
        *,
        resolution_context: Optional[VariableContext] = None,
        mocked_variables: Optional[Mapping[str, str]] = None,
    ) -> Mapping[str, str]:
        """Return a mapping view of the manifest variables provided by this plugin.

        :param resolution_context: Context used for resolving the variables; a
          default from `manifest_variable_resolution_context()` is used when
          omitted.
        :param mocked_variables: Extra substitutions handed to the substitution
          copy used for the lookups.
        :return: A `SubstitutionTable` limited to this plugin's variables.
        """
        own_variables = frozenset(
            name
            for name, variable in self._feature_set.manifest_variables.items()
            if variable.plugin_metadata.plugin_name == self._plugin_name
        )
        if resolution_context is None:
            resolution_context = manifest_variable_resolution_context()
        test_substitution = self._substitution.copy_for_subst_test(
            self._feature_set,
            resolution_context,
            extra_substitutions=mocked_variables,
        )
        return SubstitutionTable(own_variables, test_substitution)

777 

778 

class SubstitutionTable(Mapping[str, str]):
    """Read-only mapping that resolves manifest variables on lookup.

    Only the names given as `valid_manifest_variables` are exposed as keys.
    Values are produced by the substitution object each time a key is looked
    up.
    """

    def __init__(
        self, valid_manifest_variables: FrozenSet[str], substitution: Substitution
    ) -> None:
        self._substitution = substitution
        self._valid_manifest_variables = valid_manifest_variables
        # Names that have been looked up successfully so far.
        self._resolved: Set[str] = set()

    def __contains__(self, item: object) -> bool:
        return item in self._valid_manifest_variables

    def __getitem__(self, key: str) -> str:
        """Resolve `key` via the substitution; raise `KeyError` for unknown names."""
        if key in self._valid_manifest_variables:
            template = "{{" + key + "}}"
            value = self._substitution.substitute(
                template, f"test of manifest variable `{key}`"
            )
            self._resolved.add(key)
            return value
        raise KeyError(key)

    def __len__(self) -> int:
        return len(self._valid_manifest_variables)

    def __iter__(self) -> Iterator[str]:
        return iter(self._valid_manifest_variables)

    def keys(self) -> KeysView[str]:
        # The frozenset of names is handed out directly; the cast only
        # satisfies the declared return type.
        return cast("KeysView[str]", self._valid_manifest_variables)