Coverage for src/debputy/plugin/api/test_api/test_impl.py: 82%

298 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-01-27 13:59 +0000

1import contextlib 

2import dataclasses 

3import inspect 

4import os.path 

5from importlib.resources.abc import Traversable 

6from io import BytesIO 

7from pathlib import Path 

8from typing import ( 

9 Mapping, 

10 Dict, 

11 Optional, 

12 Tuple, 

13 List, 

14 cast, 

15 FrozenSet, 

16 Sequence, 

17 Union, 

18 Type, 

19 Iterator, 

20 Set, 

21 KeysView, 

22 Callable, 

23) 

24 

25from debian.deb822 import Deb822 

26from debian.substvars import Substvars 

27 

28from debputy import DEBPUTY_PLUGIN_ROOT_DIR 

29from debputy.architecture_support import faked_arch_table 

30from debputy.filesystem_scan import FSROOverlay, FSRootDir 

31from debputy.packages import BinaryPackage 

32from debputy.plugin.api import ( 

33 PluginInitializationEntryPoint, 

34 VirtualPath, 

35 PackageProcessingContext, 

36 DpkgTriggerType, 

37 Maintscript, 

38) 

39from debputy.plugin.api.example_processing import process_discard_rule_example 

40from debputy.plugin.api.impl import ( 

41 plugin_metadata_for_debputys_own_plugin, 

42 DebputyPluginInitializerProvider, 

43 parse_json_plugin_desc, 

44 MaintscriptAccessorProviderBase, 

45 BinaryCtrlAccessorProviderBase, 

46 PLUGIN_TEST_SUFFIX, 

47 find_json_plugin, 

48 ServiceDefinitionImpl, 

49) 

50from debputy.plugin.api.impl_types import ( 

51 PackagerProvidedFileClassSpec, 

52 DebputyPluginMetadata, 

53 PluginProvidedTrigger, 

54 ServiceManagerDetails, 

55) 

56from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

57from debputy.plugin.api.spec import ( 

58 MaintscriptAccessor, 

59 FlushableSubstvars, 

60 ServiceRegistry, 

61 DSD, 

62 ServiceUpgradeRule, 

63) 

64from debputy.plugin.api.test_api.test_spec import ( 

65 InitializedPluginUnderTest, 

66 RegisteredPackagerProvidedFile, 

67 RegisteredTrigger, 

68 RegisteredMaintscript, 

69 DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS, 

70 ADRExampleIssue, 

71 DetectedService, 

72 RegisteredMetadata, 

73) 

74from debputy.plugin.debputy.debputy_plugin import initialize_debputy_features 

75from debputy.substitution import SubstitutionImpl, VariableContext, Substitution 

76from debputy.util import package_cross_check_precheck 

77 

78RegisteredPackagerProvidedFile.register(PackagerProvidedFileClassSpec) 

79 

80 

81@dataclasses.dataclass(frozen=True, slots=True) 

82class PackageProcessingContextTestProvider(PackageProcessingContext): 

83 binary_package: BinaryPackage 

84 binary_package_version: str 

85 related_udeb_package: Optional[BinaryPackage] 

86 related_udeb_package_version: Optional[str] 

87 accessible_package_roots: Callable[[], Sequence[Tuple[BinaryPackage, VirtualPath]]] 

88 

89 

90def _initialize_plugin_under_test( 

91 plugin_metadata: DebputyPluginMetadata, 

92 load_debputy_plugin: bool = True, 

93) -> "InitializedPluginUnderTest": 

94 feature_set = PluginProvidedFeatureSet() 

95 substitution = SubstitutionImpl( 

96 unresolvable_substitutions=frozenset(["SOURCE_DATE_EPOCH", "PACKAGE"]), 

97 variable_context=VariableContext( 

98 FSROOverlay.create_root_dir("debian", "debian"), 

99 ), 

100 plugin_feature_set=feature_set, 

101 ) 

102 

103 if load_debputy_plugin: 

104 debputy_plugin_metadata = plugin_metadata_for_debputys_own_plugin( 

105 initialize_debputy_features 

106 ) 

107 # Load debputy's own plugin first, so conflicts with debputy's plugin are detected early 

108 debputy_provider = DebputyPluginInitializerProvider( 

109 debputy_plugin_metadata, 

110 feature_set, 

111 substitution, 

112 ) 

113 debputy_provider.load_plugin() 

114 

115 plugin_under_test_provider = DebputyPluginInitializerProvider( 

116 plugin_metadata, 

117 feature_set, 

118 substitution, 

119 ) 

120 plugin_under_test_provider.load_plugin() 

121 

122 return InitializedPluginUnderTestImpl( 

123 plugin_metadata.plugin_name, 

124 feature_set, 

125 substitution, 

126 ) 

127 

128 

129def _auto_load_plugin_from_filename( 

130 py_test_filename: str, 

131) -> "InitializedPluginUnderTest": 

132 dirname, basename = os.path.split(py_test_filename) 

133 plugin_name = PLUGIN_TEST_SUFFIX.sub("", basename).replace("_", "-") 

134 

135 test_location = os.environ.get("DEBPUTY_TEST_PLUGIN_LOCATION", "uninstalled") 

136 if test_location == "uninstalled": 

137 json_basename = f"{plugin_name}.json" 

138 json_desc_file = os.path.join(dirname, json_basename) 

139 if "/" not in json_desc_file: 139 ↛ 140line 139 didn't jump to line 140 because the condition on line 139 was never true

140 json_desc_file = f"./{json_desc_file}" 

141 

142 if os.path.isfile(json_desc_file): 142 ↛ 145line 142 didn't jump to line 145 because the condition on line 142 was always true

143 return _initialize_plugin_from_desc(json_desc_file) 

144 

145 json_desc_file_in = f"{json_desc_file}.in" 

146 if os.path.isfile(json_desc_file_in): 

147 return _initialize_plugin_from_desc(json_desc_file) 

148 raise FileNotFoundError( 

149 f"Cannot determine the plugin JSON metadata descriptor: Expected it to be" 

150 f" {json_desc_file} or {json_desc_file_in}" 

151 ) 

152 

153 if test_location == "installed": 153 ↛ 157line 153 didn't jump to line 157 because the condition on line 153 was always true

154 plugin_metadata = find_json_plugin([str(DEBPUTY_PLUGIN_ROOT_DIR)], plugin_name) 

155 return _initialize_plugin_under_test(plugin_metadata, load_debputy_plugin=True) 

156 

157 raise ValueError( 

158 'Invalid or unsupported "DEBPUTY_TEST_PLUGIN_LOCATION" environment variable. It must be either' 

159 ' unset OR one of "installed", "uninstalled".' 

160 ) 

161 

162 

163def initialize_plugin_under_test( 

164 *, 

165 plugin_desc_file: Optional[str] = None, 

166) -> "InitializedPluginUnderTest": 

167 """Load and initialize a plugin for testing it 

168 

169 This method will load the plugin via plugin description, which is the method that `debputy` does at 

170 run-time (in contrast to `initialize_plugin_under_test_preloaded`, which bypasses this concrete part 

171 of the flow). 

172 

173 :param plugin_desc_file: The plugin description file (`.json`) that describes how to load the plugin. 

174 If omitted, `debputy` will attempt to attempt the plugin description file based on the test itself. 

175 This works for "single-file" plugins, where the description file and the test are right next to 

176 each other. 

177 

178 Note that the description file is *not* required to a valid version at this stage (e.g., "N/A" or 

179 "@PLACEHOLDER@") is fine. So you still use this method if you substitute in the version during 

180 build after running the tests. To support this flow, the file name can also end with `.json.in` 

181 (instead of `.json`). 

182 :return: The loaded plugin for testing 

183 """ 

184 if plugin_desc_file is None: 

185 caller_file = inspect.stack()[1].filename 

186 return _auto_load_plugin_from_filename(caller_file) 

187 if DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS: 187 ↛ 188line 187 didn't jump to line 188 because the condition on line 187 was never true

188 raise RuntimeError( 

189 "Running the test against an installed plugin does not work when" 

190 " plugin_desc_file is provided. Please skip this test. You can " 

191 " import DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS and use that as" 

192 " conditional for this purpose." 

193 ) 

194 return _initialize_plugin_from_desc(plugin_desc_file) 

195 

196 

197def _initialize_plugin_from_desc( 

198 desc_file: str, 

199) -> "InitializedPluginUnderTest": 

200 if not desc_file.endswith((".json", ".json.in")): 200 ↛ 201line 200 didn't jump to line 201 because the condition on line 200 was never true

201 raise ValueError("The plugin file must end with .json or .json.in") 

202 

203 plugin_metadata = parse_json_plugin_desc(desc_file) 

204 

205 return _initialize_plugin_under_test(plugin_metadata, load_debputy_plugin=True) 

206 

207 

208def initialize_plugin_under_test_from_inline_json( 

209 plugin_name: str, 

210 json_content: str, 

211) -> "InitializedPluginUnderTest": 

212 with BytesIO(json_content.encode("utf-8")) as fd: 

213 plugin_metadata = parse_json_plugin_desc(plugin_name, fd=fd) 

214 

215 return _initialize_plugin_under_test(plugin_metadata, load_debputy_plugin=True) 

216 

217 

218def initialize_plugin_under_test_preloaded( 218 ↛ exitline 218 didn't jump to the function exit

219 api_compat_version: int, 

220 plugin_initializer: PluginInitializationEntryPoint, 

221 /, 

222 plugin_name: str = "plugin-under-test", 

223 load_debputy_plugin: bool = True, 

224 plugin_doc_path_resolver: Callable[ 

225 [], Optional[Union[str, Traversable, Path]] 

226 ] = lambda: None, 

227) -> "InitializedPluginUnderTest": 

228 """Internal API: Initialize a plugin for testing without loading it from a file 

229 

230 This method by-passes the standard loading mechanism, meaning you will not test that your plugin 

231 description file is correct. Notably, any feature provided via the JSON description file will 

232 **NOT** be visible for the test. 

233 

234 This API is mostly useful for testing parts of debputy itself. 

235 

236 :param api_compat_version: The API version the plugin was written for. Use the same version as the 

237 version from the entry point (The `v1` part of `debputy.plugins.v1.initialize` translate into `1`). 

238 :param plugin_initializer: The entry point of the plugin 

239 :param plugin_name: Normally, debputy would derive this from the entry point. In the test, it will 

240 use a test name and version. However, you can explicitly set if you want the real name/version. 

241 :param load_debputy_plugin: Whether to load debputy's own plugin first. Doing so provides a more 

242 realistic test and enables the test to detect conflicts with debputy's own plugins (de facto making 

243 the plugin unloadable in practice if such a conflict is present). This option is mostly provided 

244 to enable debputy to use this method for self testing. 

245 :param plugin_doc_path_resolver: How to resolve the documentation (if relevant for the test). The 

246 default is to not load the documentation. 

247 :return: The loaded plugin for testing 

248 """ 

249 

250 if DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS: 250 ↛ 251line 250 didn't jump to line 251 because the condition on line 250 was never true

251 raise RuntimeError( 

252 "Running the test against an installed plugin does not work when" 

253 " the plugin is preload. Please skip this test. You can " 

254 " import DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS and use that as" 

255 " conditional for this purpose." 

256 ) 

257 

258 plugin_metadata = DebputyPluginMetadata( 

259 plugin_name=plugin_name, 

260 api_compat_version=api_compat_version, 

261 plugin_initializer=plugin_initializer, 

262 plugin_loader=None, 

263 plugin_path="<loaded-via-test>", 

264 ) 

265 

266 return _initialize_plugin_under_test( 

267 plugin_metadata, 

268 load_debputy_plugin=load_debputy_plugin, 

269 ) 

270 

271 

272class _MockArchTable: 

273 @staticmethod 

274 def matches_architecture(_a: str, _b: str) -> bool: 

275 return True 

276 

277 

278FAKE_DPKG_QUERY_TABLE = cast("DpkgArchTable", _MockArchTable()) 

279del _MockArchTable 

280 

281 

282def package_metadata_context( 

283 *, 

284 host_arch: str = "amd64", 

285 package_fields: Optional[Dict[str, str]] = None, 

286 related_udeb_package_fields: Optional[Dict[str, str]] = None, 

287 binary_package_version: str = "1.0-1", 

288 related_udeb_package_version: Optional[str] = None, 

289 should_be_acted_on: bool = True, 

290 related_udeb_fs_root: Optional[VirtualPath] = None, 

291 accessible_package_roots: Sequence[Tuple[Mapping[str, str], VirtualPath]] = tuple(), 

292) -> PackageProcessingContext: 

293 process_table = faked_arch_table(host_arch) 

294 f = { 

295 "Package": "foo", 

296 "Architecture": "any", 

297 } 

298 if package_fields is not None: 

299 f.update(package_fields) 

300 

301 bin_package = BinaryPackage( 

302 Deb822(f), 

303 process_table, 

304 FAKE_DPKG_QUERY_TABLE, 

305 is_main_package=True, 

306 should_be_acted_on=should_be_acted_on, 

307 ) 

308 udeb_package = None 

309 if related_udeb_package_fields is not None: 309 ↛ 310line 309 didn't jump to line 310 because the condition on line 309 was never true

310 uf = dict(related_udeb_package_fields) 

311 uf.setdefault("Package", f'{f["Package"]}-udeb') 

312 uf.setdefault("Architecture", f["Architecture"]) 

313 uf.setdefault("Package-Type", "udeb") 

314 udeb_package = BinaryPackage( 

315 Deb822(uf), 

316 process_table, 

317 FAKE_DPKG_QUERY_TABLE, 

318 is_main_package=False, 

319 should_be_acted_on=True, 

320 ) 

321 if related_udeb_package_version is None: 

322 related_udeb_package_version = binary_package_version 

323 if accessible_package_roots: 

324 apr = [] 

325 for fields, apr_fs_root in accessible_package_roots: 

326 apr_fields = Deb822(dict(fields)) 

327 if "Package" not in apr_fields: 327 ↛ 328line 327 didn't jump to line 328 because the condition on line 327 was never true

328 raise ValueError( 

329 "Missing mandatory Package field in member of accessible_package_roots" 

330 ) 

331 if "Architecture" not in apr_fields: 331 ↛ 332line 331 didn't jump to line 332 because the condition on line 331 was never true

332 raise ValueError( 

333 "Missing mandatory Architecture field in member of accessible_package_roots" 

334 ) 

335 apr_package = BinaryPackage( 

336 apr_fields, 

337 process_table, 

338 FAKE_DPKG_QUERY_TABLE, 

339 is_main_package=False, 

340 should_be_acted_on=True, 

341 ) 

342 r = package_cross_check_precheck(bin_package, apr_package) 

343 if not r[0]: 343 ↛ 344line 343 didn't jump to line 344 because the condition on line 343 was never true

344 raise ValueError( 

345 f"{apr_package.name} would not be accessible for {bin_package.name}" 

346 ) 

347 apr.append((apr_package, apr_fs_root)) 

348 

349 if related_udeb_fs_root is not None: 349 ↛ 350line 349 didn't jump to line 350 because the condition on line 349 was never true

350 if udeb_package is None: 

351 raise ValueError( 

352 "related_udeb_package_fields must be given when related_udeb_fs_root is given" 

353 ) 

354 r = package_cross_check_precheck(bin_package, udeb_package) 

355 if not r[0]: 

356 raise ValueError( 

357 f"{udeb_package.name} would not be accessible for {bin_package.name}, so providing" 

358 " related_udeb_fs_root is irrelevant" 

359 ) 

360 apr.append(udeb_package) 

361 apr = tuple(apr) 

362 else: 

363 apr = tuple() 

364 

365 return PackageProcessingContextTestProvider( 

366 binary_package=bin_package, 

367 related_udeb_package=udeb_package, 

368 binary_package_version=binary_package_version, 

369 related_udeb_package_version=related_udeb_package_version, 

370 accessible_package_roots=lambda: apr, 

371 ) 

372 

373 

374def manifest_variable_resolution_context( 

375 *, 

376 debian_dir: Optional[VirtualPath] = None, 

377) -> VariableContext: 

378 if debian_dir is None: 

379 debian_dir = FSRootDir() 

380 

381 return VariableContext(debian_dir) 

382 

383 

384class MaintscriptAccessorTestProvider(MaintscriptAccessorProviderBase): 

385 __slots__ = ("_plugin_metadata", "_plugin_source_id", "_maintscript_container") 

386 

387 def __init__( 

388 self, 

389 plugin_metadata: DebputyPluginMetadata, 

390 plugin_source_id: str, 

391 maintscript_container: Dict[str, List[RegisteredMaintscript]], 

392 ): 

393 self._plugin_metadata = plugin_metadata 

394 self._plugin_source_id = plugin_source_id 

395 self._maintscript_container = maintscript_container 

396 

397 @classmethod 

398 def _apply_condition_to_script( 

399 cls, condition: str, run_snippet: str, /, indent: Optional[bool] = None 

400 ) -> str: 

401 return run_snippet 

402 

403 def _append_script( 

404 self, 

405 caller_name: str, 

406 maintscript: Maintscript, 

407 full_script: str, 

408 /, 

409 perform_substitution: bool = True, 

410 ) -> None: 

411 if self._plugin_source_id not in self._maintscript_container: 

412 self._maintscript_container[self._plugin_source_id] = [] 

413 self._maintscript_container[self._plugin_source_id].append( 

414 RegisteredMaintscript( 

415 maintscript, 

416 caller_name, 

417 full_script, 

418 perform_substitution, 

419 ) 

420 ) 

421 

422 

423class RegisteredMetadataImpl(RegisteredMetadata): 

424 __slots__ = ( 

425 "_substvars", 

426 "_triggers", 

427 "_maintscripts", 

428 ) 

429 

430 def __init__( 

431 self, 

432 substvars: Substvars, 

433 triggers: List[RegisteredTrigger], 

434 maintscripts: List[RegisteredMaintscript], 

435 ) -> None: 

436 self._substvars = substvars 

437 self._triggers = triggers 

438 self._maintscripts = maintscripts 

439 

440 @property 

441 def substvars(self) -> Substvars: 

442 return self._substvars 

443 

444 @property 

445 def triggers(self) -> List[RegisteredTrigger]: 

446 return self._triggers 

447 

448 def maintscripts( 

449 self, 

450 *, 

451 maintscript: Optional[Maintscript] = None, 

452 ) -> List[RegisteredMaintscript]: 

453 if maintscript is None: 

454 return self._maintscripts 

455 return [m for m in self._maintscripts if m.maintscript == maintscript] 

456 

457 

458class BinaryCtrlAccessorTestProvider(BinaryCtrlAccessorProviderBase): 

459 __slots__ = ("_maintscript_container",) 

460 

461 def __init__( 

462 self, 

463 plugin_metadata: DebputyPluginMetadata, 

464 plugin_source_id: str, 

465 context: PackageProcessingContext, 

466 ) -> None: 

467 super().__init__( 

468 plugin_metadata, 

469 plugin_source_id, 

470 context, 

471 {}, 

472 FlushableSubstvars(), 

473 (None, None), 

474 ) 

475 self._maintscript_container: Dict[str, List[RegisteredMaintscript]] = {} 

476 

477 def _create_maintscript_accessor(self) -> MaintscriptAccessor: 

478 return MaintscriptAccessorTestProvider( 

479 self._plugin_metadata, 

480 self._plugin_source_id, 

481 self._maintscript_container, 

482 ) 

483 

484 def registered_metadata(self) -> RegisteredMetadata: 

485 return RegisteredMetadataImpl( 

486 self._substvars, 

487 [ 

488 RegisteredTrigger.from_plugin_provided_trigger(t) 

489 for t in self._triggers.values() 

490 if t.provider_source_id == self._plugin_source_id 

491 ], 

492 self._maintscript_container.get(self._plugin_source_id, []), 

493 ) 

494 

495 

496class ServiceRegistryTestImpl(ServiceRegistry[DSD]): 

497 __slots__ = ("_service_manager_details", "_service_definitions") 

498 

499 def __init__( 

500 self, 

501 service_manager_details: ServiceManagerDetails, 

502 detected_services: List[DetectedService[DSD]], 

503 ) -> None: 

504 self._service_manager_details = service_manager_details 

505 self._service_definitions = detected_services 

506 

507 def register_service( 

508 self, 

509 path: VirtualPath, 

510 name: Union[str, List[str]], 

511 *, 

512 type_of_service: str = "service", # "timer", etc. 

513 service_scope: str = "system", 

514 enable_by_default: bool = True, 

515 start_by_default: bool = True, 

516 default_upgrade_rule: ServiceUpgradeRule = "restart", 

517 service_context: Optional[DSD] = None, 

518 ) -> None: 

519 names = name if isinstance(name, list) else [name] 

520 if len(names) < 1: 520 ↛ 521line 520 didn't jump to line 521 because the condition on line 520 was never true

521 raise ValueError( 

522 f"The service must have at least one name - {path.absolute} did not have any" 

523 ) 

524 self._service_definitions.append( 

525 DetectedService( 

526 path, 

527 names, 

528 type_of_service, 

529 service_scope, 

530 enable_by_default, 

531 start_by_default, 

532 default_upgrade_rule, 

533 service_context, 

534 ) 

535 ) 

536 

537 

538@contextlib.contextmanager 

539def _read_only_fs_root(fs_root: VirtualPath) -> Iterator[VirtualPath]: 

540 if fs_root.is_read_write: 540 ↛ 546line 540 didn't jump to line 546 because the condition on line 540 was always true

541 assert isinstance(fs_root, FSRootDir) 

542 fs_root.is_read_write = False 

543 yield fs_root 

544 fs_root.is_read_write = True 

545 else: 

546 yield fs_root 

547 

548 

549class InitializedPluginUnderTestImpl(InitializedPluginUnderTest): 

550 def __init__( 

551 self, 

552 plugin_name: str, 

553 feature_set: PluginProvidedFeatureSet, 

554 substitution: SubstitutionImpl, 

555 ) -> None: 

556 self._feature_set = feature_set 

557 self._plugin_name = plugin_name 

558 self._packager_provided_files: Optional[ 

559 Dict[str, RegisteredPackagerProvidedFile] 

560 ] = None 

561 self._triggers: Dict[Tuple[DpkgTriggerType, str], PluginProvidedTrigger] = {} 

562 self._maintscript_container: Dict[str, List[RegisteredMaintscript]] = {} 

563 self._substitution = substitution 

564 assert plugin_name in self._feature_set.plugin_data 

565 

566 @property 

567 def _plugin_metadata(self) -> DebputyPluginMetadata: 

568 return self._feature_set.plugin_data[self._plugin_name] 

569 

570 def packager_provided_files_by_stem( 

571 self, 

572 ) -> Mapping[str, RegisteredPackagerProvidedFile]: 

573 ppf = self._packager_provided_files 

574 if ppf is None: 

575 result: Dict[str, RegisteredPackagerProvidedFile] = {} 

576 for spec in self._feature_set.packager_provided_files.values(): 

577 if spec.debputy_plugin_metadata.plugin_name != self._plugin_name: 

578 continue 

579 # Registered as a virtual subclass, so this should always be True 

580 assert isinstance(spec, RegisteredPackagerProvidedFile) 

581 result[spec.stem] = spec 

582 self._packager_provided_files = result 

583 ppf = result 

584 return ppf 

585 

586 def run_metadata_detector( 

587 self, 

588 metadata_detector_id: str, 

589 fs_root: VirtualPath, 

590 context: Optional[PackageProcessingContext] = None, 

591 ) -> RegisteredMetadata: 

592 if fs_root.parent_dir is not None: 592 ↛ 593line 592 didn't jump to line 593 because the condition on line 592 was never true

593 raise ValueError("Provided path must be the file system root.") 

594 detectors = self._feature_set.metadata_maintscript_detectors[self._plugin_name] 

595 matching_detectors = [ 

596 d for d in detectors if d.detector_id == metadata_detector_id 

597 ] 

598 if len(matching_detectors) != 1: 598 ↛ 599line 598 didn't jump to line 599 because the condition on line 598 was never true

599 assert not matching_detectors 

600 raise ValueError( 

601 f"The plugin {self._plugin_name} did not provide a metadata detector with ID" 

602 f' "{metadata_detector_id}"' 

603 ) 

604 if context is None: 

605 context = package_metadata_context() 

606 detector = matching_detectors[0] 

607 if not detector.applies_to(context.binary_package): 

608 raise ValueError( 

609 f'The detector "{metadata_detector_id}" from {self._plugin_name} does not apply to the' 

610 " given package. Consider using `package_metadata_context()` to emulate a binary package" 

611 " with the correct specification. As an example: " 

612 '`package_metadata_context(package_fields={"Package-Type": "udeb"})` would emulate a udeb' 

613 " package." 

614 ) 

615 

616 ctrl = BinaryCtrlAccessorTestProvider( 

617 self._plugin_metadata, 

618 metadata_detector_id, 

619 context, 

620 ) 

621 with _read_only_fs_root(fs_root) as ro_root: 

622 detector.run_detector( 

623 ro_root, 

624 ctrl, 

625 context, 

626 ) 

627 return ctrl.registered_metadata() 

628 

629 def run_package_processor( 

630 self, 

631 package_processor_id: str, 

632 fs_root: VirtualPath, 

633 context: Optional[PackageProcessingContext] = None, 

634 ) -> None: 

635 if fs_root.parent_dir is not None: 635 ↛ 636line 635 didn't jump to line 636 because the condition on line 635 was never true

636 raise ValueError("Provided path must be the file system root.") 

637 pp_key = (self._plugin_name, package_processor_id) 

638 package_processor = self._feature_set.all_package_processors.get(pp_key) 

639 if package_processor is None: 639 ↛ 640line 639 didn't jump to line 640 because the condition on line 639 was never true

640 raise ValueError( 

641 f"The plugin {self._plugin_name} did not provide a package processor with ID" 

642 f' "{package_processor_id}"' 

643 ) 

644 if context is None: 644 ↛ 646line 644 didn't jump to line 646 because the condition on line 644 was always true

645 context = package_metadata_context() 

646 if not fs_root.is_read_write: 646 ↛ 647line 646 didn't jump to line 647 because the condition on line 646 was never true

647 raise ValueError( 

648 "The provided fs_root is read-only and it must be read-write for package processor" 

649 ) 

650 if not package_processor.applies_to(context.binary_package): 650 ↛ 651line 650 didn't jump to line 651 because the condition on line 650 was never true

651 raise ValueError( 

652 f'The package processor "{package_processor_id}" from {self._plugin_name} does not apply' 

653 " to the given package. Consider using `package_metadata_context()` to emulate a binary" 

654 " package with the correct specification. As an example: " 

655 '`package_metadata_context(package_fields={"Package-Type": "udeb"})` would emulate a udeb' 

656 " package." 

657 ) 

658 package_processor.run_package_processor( 

659 fs_root, 

660 None, 

661 context, 

662 ) 

663 

664 @property 

665 def declared_manifest_variables(self) -> FrozenSet[str]: 

666 return frozenset( 

667 { 

668 k 

669 for k, v in self._feature_set.manifest_variables.items() 

670 if v.plugin_metadata.plugin_name == self._plugin_name 

671 } 

672 ) 

673 

674 def automatic_discard_rules_examples_with_issues(self) -> Sequence[ADRExampleIssue]: 

675 issues = [] 

676 for adr in self._feature_set.auto_discard_rules.values(): 

677 if adr.plugin_metadata.plugin_name != self._plugin_name: 677 ↛ 678line 677 didn't jump to line 678 because the condition on line 677 was never true

678 continue 

679 for idx, example in enumerate(adr.examples): 

680 result = process_discard_rule_example( 

681 adr, 

682 example, 

683 ) 

684 if result.inconsistent_paths: 

685 issues.append( 

686 ADRExampleIssue( 

687 adr.name, 

688 idx, 

689 [ 

690 x.absolute + ("/" if x.is_dir else "") 

691 for x in result.inconsistent_paths 

692 ], 

693 ) 

694 ) 

695 return issues 

696 

697 def run_service_detection_and_integrations( 

698 self, 

699 service_manager: str, 

700 fs_root: VirtualPath, 

701 context: Optional[PackageProcessingContext] = None, 

702 *, 

703 service_context_type_hint: Optional[Type[DSD]] = None, 

704 ) -> Tuple[List[DetectedService[DSD]], RegisteredMetadata]: 

705 if fs_root.parent_dir is not None: 705 ↛ 706line 705 didn't jump to line 706 because the condition on line 705 was never true

706 raise ValueError("Provided path must be the file system root.") 

707 try: 

708 service_manager_details = self._feature_set.service_managers[ 

709 service_manager 

710 ] 

711 if service_manager_details.plugin_metadata.plugin_name != self._plugin_name: 711 ↛ 712line 711 didn't jump to line 712 because the condition on line 711 was never true

712 raise KeyError(service_manager) 

713 except KeyError: 

714 raise ValueError( 

715 f"The plugin {self._plugin_name} does not provide a" 

716 f" service manager called {service_manager}" 

717 ) from None 

718 

719 if context is None: 719 ↛ 721line 719 didn't jump to line 721 because the condition on line 719 was always true

720 context = package_metadata_context() 

721 detected_services: List[DetectedService[DSD]] = [] 

722 registry = ServiceRegistryTestImpl(service_manager_details, detected_services) 

723 service_manager_details.service_detector( 

724 fs_root, 

725 registry, 

726 context, 

727 ) 

728 ctrl = BinaryCtrlAccessorTestProvider( 

729 self._plugin_metadata, 

730 service_manager_details.service_manager, 

731 context, 

732 ) 

733 if detected_services: 

734 service_definitions = [ 

735 ServiceDefinitionImpl( 

736 ds.names[0], 

737 ds.names, 

738 ds.path, 

739 ds.type_of_service, 

740 ds.service_scope, 

741 ds.enable_by_default, 

742 ds.start_by_default, 

743 ds.default_upgrade_rule, 

744 self._plugin_name, 

745 True, 

746 ds.service_context, 

747 ) 

748 for ds in detected_services 

749 ] 

750 service_manager_details.service_integrator( 

751 service_definitions, 

752 ctrl, 

753 context, 

754 ) 

755 return detected_services, ctrl.registered_metadata() 

756 

757 def manifest_variables( 

758 self, 

759 *, 

760 resolution_context: Optional[VariableContext] = None, 

761 mocked_variables: Optional[Mapping[str, str]] = None, 

762 ) -> Mapping[str, str]: 

763 valid_manifest_variables = frozenset( 

764 { 

765 n 

766 for n, v in self._feature_set.manifest_variables.items() 

767 if v.plugin_metadata.plugin_name == self._plugin_name 

768 } 

769 ) 

770 if resolution_context is None: 

771 resolution_context = manifest_variable_resolution_context() 

772 substitution = self._substitution.copy_for_subst_test( 

773 self._feature_set, 

774 resolution_context, 

775 extra_substitutions=mocked_variables, 

776 ) 

777 return SubstitutionTable( 

778 valid_manifest_variables, 

779 substitution, 

780 ) 

781 

782 

783class SubstitutionTable(Mapping[str, str]): 

784 def __init__( 

785 self, valid_manifest_variables: FrozenSet[str], substitution: Substitution 

786 ) -> None: 

787 self._valid_manifest_variables = valid_manifest_variables 

788 self._resolved: Set[str] = set() 

789 self._substitution = substitution 

790 

791 def __contains__(self, item: object) -> bool: 

792 return item in self._valid_manifest_variables 

793 

794 def __getitem__(self, key: str) -> str: 

795 if key not in self._valid_manifest_variables: 795 ↛ 796line 795 didn't jump to line 796 because the condition on line 795 was never true

796 raise KeyError(key) 

797 v = self._substitution.substitute( 

798 "{{" + key + "}}", f"test of manifest variable `{key}`" 

799 ) 

800 self._resolved.add(key) 

801 return v 

802 

803 def __len__(self) -> int: 

804 return len(self._valid_manifest_variables) 

805 

806 def __iter__(self) -> Iterator[str]: 

807 return iter(self._valid_manifest_variables) 

808 

809 def keys(self) -> KeysView[str]: 

810 return cast("KeysView[str]", self._valid_manifest_variables)