Coverage for src/debputy/plugin/api/test_api/test_impl.py: 81%

307 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2026-04-06 19:25 +0000

1import contextlib 

2import dataclasses 

3import inspect 

4import os.path 

5from collections.abc import Mapping, Sequence, Iterator, KeysView, Callable 

6from importlib.resources.abc import Traversable 

7from io import BytesIO 

8from pathlib import Path 

9from typing import ( 

10 cast, 

11 TYPE_CHECKING, 

12) 

13 

14from debian.deb822 import Deb822 

15from debian.debian_support import DpkgArchTable 

16from debian.substvars import Substvars 

17 

18from debputy.architecture_support import DpkgArchitectureBuildProcessValuesTable 

19from debputy.filesystem_scan import OSFSROOverlay, InMemoryVirtualRootDir 

20from debputy.packages import BinaryPackage, SourcePackage 

21from debputy.plugin.api import ( 

22 PluginInitializationEntryPoint, 

23 VirtualPath, 

24 PackageProcessingContext, 

25 DpkgTriggerType, 

26 Maintscript, 

27) 

28from debputy.plugin.api.example_processing import process_discard_rule_example 

29from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

30from debputy.plugin.api.impl import ( 

31 plugin_metadata_for_debputys_own_plugin, 

32 DebputyPluginInitializerProvider, 

33 parse_json_plugin_desc, 

34 MaintscriptAccessorProviderBase, 

35 BinaryCtrlAccessorProviderBase, 

36 PLUGIN_TEST_SUFFIX, 

37 find_json_plugin, 

38 ServiceDefinitionImpl, 

39) 

40from debputy.plugin.api.impl_types import ( 

41 PackagerProvidedFileClassSpec, 

42 DebputyPluginMetadata, 

43 PluginProvidedTrigger, 

44 ServiceManagerDetails, 

45) 

46from debputy.plugin.api.spec import ( 

47 MaintscriptAccessor, 

48 FlushableSubstvars, 

49 ServiceRegistry, 

50 DSD, 

51 ServiceUpgradeRule, 

52) 

53from debputy.plugin.api.test_api.test_spec import ( 

54 InitializedPluginUnderTest, 

55 RegisteredPackagerProvidedFile, 

56 RegisteredTrigger, 

57 RegisteredMaintscript, 

58 DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS, 

59 ADRExampleIssue, 

60 DetectedService, 

61 RegisteredMetadata, 

62) 

63from debputy.plugins.debputy.debputy_plugin import initialize_debputy_features 

64from debputy.substitution import SubstitutionImpl, VariableContext, Substitution 

65from debputy.util import package_cross_check_precheck 

66from debputy.version import DEBPUTY_PLUGIN_ROOT_DIR 

67 

68RegisteredPackagerProvidedFile.register(PackagerProvidedFileClassSpec) 

69 

70 

71type ManifestConfigurationImplementation[T] = Callable[ 

72 [SourcePackage | BinaryPackage, type[T]], T 

73] 

74 

75 

76@dataclasses.dataclass(frozen=True, slots=True) 

77class PackageProcessingContextTestProvider(PackageProcessingContext): 

78 source_package: SourcePackage 

79 binary_package: BinaryPackage 

80 binary_package_version: str 

81 related_udeb_package: BinaryPackage | None 

82 related_udeb_package_version: str | None 

83 accessible_package_roots: Callable[[], Sequence[tuple[BinaryPackage, VirtualPath]]] 

84 manifest_configuration: ManifestConfigurationImplementation 

85 

86 # TODO: implement (when needed) 

87 # dpkg_arch_query_table 

88 # deb_options_and_profiles (pull from binary ?) 

89 # source_condition_context 

90 # condition_context 

91 

92 

93def _initialize_plugin_under_test( 

94 plugin_metadata: DebputyPluginMetadata, 

95 load_debputy_plugin: bool = True, 

96) -> "InitializedPluginUnderTest": 

97 feature_set = PluginProvidedFeatureSet() 

98 substitution = SubstitutionImpl( 

99 unresolvable_substitutions=frozenset(["SOURCE_DATE_EPOCH", "PACKAGE"]), 

100 variable_context=VariableContext( 

101 OSFSROOverlay.create_root_dir("debian", "debian"), 

102 ), 

103 plugin_feature_set=feature_set, 

104 ) 

105 

106 if load_debputy_plugin: 

107 debputy_plugin_metadata = plugin_metadata_for_debputys_own_plugin( 

108 initialize_debputy_features 

109 ) 

110 # Load debputy's own plugin first, so conflicts with debputy's plugin are detected early 

111 debputy_provider = DebputyPluginInitializerProvider( 

112 debputy_plugin_metadata, 

113 feature_set, 

114 substitution, 

115 ) 

116 debputy_provider.load_plugin() 

117 

118 plugin_under_test_provider = DebputyPluginInitializerProvider( 

119 plugin_metadata, 

120 feature_set, 

121 substitution, 

122 ) 

123 plugin_under_test_provider.load_plugin() 

124 

125 return InitializedPluginUnderTestImpl( 

126 plugin_metadata.plugin_name, 

127 feature_set, 

128 substitution, 

129 ) 

130 

131 

132def _auto_load_plugin_from_filename( 

133 py_test_filename: str, 

134) -> "InitializedPluginUnderTest": 

135 dirname, basename = os.path.split(py_test_filename) 

136 plugin_name = PLUGIN_TEST_SUFFIX.sub("", basename).replace("_", "-") 

137 

138 test_location = os.environ.get("DEBPUTY_TEST_PLUGIN_LOCATION", "uninstalled") 

139 if test_location == "uninstalled": 

140 json_basename = f"{plugin_name}.json" 

141 json_desc_file = os.path.join(dirname, json_basename) 

142 if "/" not in json_desc_file: 142 ↛ 143line 142 didn't jump to line 143 because the condition on line 142 was never true

143 json_desc_file = f"./{json_desc_file}" 

144 

145 if os.path.isfile(json_desc_file): 145 ↛ 148line 145 didn't jump to line 148 because the condition on line 145 was always true

146 return _initialize_plugin_from_desc(json_desc_file) 

147 

148 json_desc_file_in = f"{json_desc_file}.in" 

149 if os.path.isfile(json_desc_file_in): 

150 return _initialize_plugin_from_desc(json_desc_file) 

151 raise FileNotFoundError( 

152 f"Cannot determine the plugin JSON metadata descriptor: Expected it to be" 

153 f" {json_desc_file} or {json_desc_file_in}" 

154 ) 

155 

156 if test_location == "installed": 156 ↛ 160line 156 didn't jump to line 160 because the condition on line 156 was always true

157 plugin_metadata = find_json_plugin([str(DEBPUTY_PLUGIN_ROOT_DIR)], plugin_name) 

158 return _initialize_plugin_under_test(plugin_metadata, load_debputy_plugin=True) 

159 

160 raise ValueError( 

161 'Invalid or unsupported "DEBPUTY_TEST_PLUGIN_LOCATION" environment variable. It must be either' 

162 ' unset OR one of "installed", "uninstalled".' 

163 ) 

164 

165 

166def initialize_plugin_under_test( 

167 *, 

168 plugin_desc_file: str | None = None, 

169) -> "InitializedPluginUnderTest": 

170 """Load and initialize a plugin for testing it 

171 

172 This method will load the plugin via plugin description, which is the method that `debputy` does at 

173 run-time (in contrast to `initialize_plugin_under_test_preloaded`, which bypasses this concrete part 

174 of the flow). 

175 

176 :param plugin_desc_file: The plugin description file (`.json`) that describes how to load the plugin. 

177 If omitted, `debputy` will attempt to attempt the plugin description file based on the test itself. 

178 This works for "single-file" plugins, where the description file and the test are right next to 

179 each other. 

180 :return: The loaded plugin for testing 

181 """ 

182 if plugin_desc_file is None: 

183 caller_file = inspect.stack()[1].filename 

184 return _auto_load_plugin_from_filename(caller_file) 

185 if DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS: 185 ↛ 186line 185 didn't jump to line 186 because the condition on line 185 was never true

186 raise RuntimeError( 

187 "Running the test against an installed plugin does not work when" 

188 " plugin_desc_file is provided. Please skip this test. You can " 

189 " import DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS and use that as" 

190 " conditional for this purpose." 

191 ) 

192 return _initialize_plugin_from_desc(plugin_desc_file) 

193 

194 

195def _initialize_plugin_from_desc( 

196 desc_file: str, 

197) -> "InitializedPluginUnderTest": 

198 if not desc_file.endswith((".json", ".json.in")): 198 ↛ 199line 198 didn't jump to line 199 because the condition on line 198 was never true

199 raise ValueError("The plugin file must end with .json or .json.in") 

200 

201 plugin_metadata = parse_json_plugin_desc(desc_file) 

202 

203 return _initialize_plugin_under_test(plugin_metadata, load_debputy_plugin=True) 

204 

205 

206def initialize_plugin_under_test_from_inline_json( 

207 plugin_name: str, 

208 json_content: str, 

209) -> "InitializedPluginUnderTest": 

210 with BytesIO(json_content.encode("utf-8")) as fd: 

211 plugin_metadata = parse_json_plugin_desc(plugin_name, fd=fd) 

212 

213 return _initialize_plugin_under_test(plugin_metadata, load_debputy_plugin=True) 

214 

215 

216def initialize_plugin_under_test_preloaded( 

217 api_compat_version: int, 

218 plugin_initializer: PluginInitializationEntryPoint, 

219 /, 

220 plugin_name: str = "plugin-under-test", 

221 load_debputy_plugin: bool = True, 

222 plugin_doc_path_resolver: Callable[[], Traversable | Path | None] = lambda: None, 

223) -> "InitializedPluginUnderTest": 

224 """Internal API: Initialize a plugin for testing without loading it from a file 

225 

226 This method by-passes the standard loading mechanism, meaning you will not test that your plugin 

227 description file is correct. Notably, any feature provided via the JSON description file will 

228 **NOT** be visible for the test. 

229 

230 This API is mostly useful for testing parts of debputy itself. 

231 

232 :param api_compat_version: The API version the plugin was written for. Use the same version as the 

233 version from the entry point (The `v1` part of `debputy.plugins.v1.initialize` translate into `1`). 

234 :param plugin_initializer: The entry point of the plugin 

235 :param plugin_name: Normally, debputy would derive this from the entry point. In the test, it will 

236 use a test name and version. However, you can explicitly set if you want the real name/version. 

237 :param load_debputy_plugin: Whether to load debputy's own plugin first. Doing so provides a more 

238 realistic test and enables the test to detect conflicts with debputy's own plugins (de facto making 

239 the plugin unloadable in practice if such a conflict is present). This option is mostly provided 

240 to enable debputy to use this method for self testing. 

241 :param plugin_doc_path_resolver: How to resolve the documentation (if relevant for the test). The 

242 default is to not load the documentation. 

243 :return: The loaded plugin for testing 

244 """ 

245 

246 if DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS: 246 ↛ 247line 246 didn't jump to line 247 because the condition on line 246 was never true

247 raise RuntimeError( 

248 "Running the test against an installed plugin does not work when" 

249 " the plugin is preload. Please skip this test. You can " 

250 " import DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS and use that as" 

251 " conditional for this purpose." 

252 ) 

253 

254 plugin_metadata = DebputyPluginMetadata( 

255 plugin_name=plugin_name, 

256 api_compat_version=api_compat_version, 

257 plugin_initializer=plugin_initializer, 

258 plugin_loader=None, 

259 plugin_path="<loaded-via-test>", 

260 plugin_doc_path_resolver=plugin_doc_path_resolver, 

261 ) 

262 

263 return _initialize_plugin_under_test( 

264 plugin_metadata, 

265 load_debputy_plugin=load_debputy_plugin, 

266 ) 

267 

268 

269class _MockArchTable: 

270 @staticmethod 

271 def matches_architecture(_a: str, _b: str) -> bool: 

272 return True 

273 

274 

275FAKE_DPKG_QUERY_TABLE = cast(DpkgArchTable, _MockArchTable()) 

276del _MockArchTable 

277 

278 

279def package_metadata_context( 

280 *, 

281 host_arch: str = "amd64", 

282 package_fields: dict[str, str] | None = None, 

283 related_udeb_package_fields: dict[str, str] | None = None, 

284 binary_package_version: str = "1.0-1", 

285 related_udeb_package_version: str | None = None, 

286 should_be_acted_on: bool = True, 

287 related_udeb_fs_root: VirtualPath | None = None, 

288 accessible_package_roots: Sequence[tuple[Mapping[str, str], VirtualPath]] = tuple(), 

289 source_package_fields: dict[str, str] | None = None, 

290 manifest_configuration: ManifestConfigurationImplementation = lambda x, y: None, 

291) -> PackageProcessingContext: 

292 process_table = DpkgArchitectureBuildProcessValuesTable(fake_host=host_arch) 

293 f = { 

294 "Package": "foo", 

295 "Architecture": "any", 

296 } 

297 if package_fields is not None: 

298 f.update(package_fields) 

299 

300 bin_package = BinaryPackage( 

301 Deb822(f), 

302 process_table, 

303 FAKE_DPKG_QUERY_TABLE, 

304 is_main_package=True, 

305 should_be_acted_on=should_be_acted_on, 

306 ) 

307 udeb_package = None 

308 s = { 

309 "Source": bin_package.name, 

310 } 

311 if source_package_fields is not None: 311 ↛ 312line 311 didn't jump to line 312 because the condition on line 311 was never true

312 s.update(source_package_fields) 

313 source_package = SourcePackage(Deb822(s)) 

314 if related_udeb_package_fields is not None: 314 ↛ 315line 314 didn't jump to line 315 because the condition on line 314 was never true

315 uf = dict(related_udeb_package_fields) 

316 uf.setdefault("Package", f'{f["Package"]}-udeb') 

317 uf.setdefault("Architecture", f["Architecture"]) 

318 uf.setdefault("Package-Type", "udeb") 

319 udeb_package = BinaryPackage( 

320 Deb822(uf), 

321 process_table, 

322 FAKE_DPKG_QUERY_TABLE, 

323 is_main_package=False, 

324 should_be_acted_on=True, 

325 ) 

326 if related_udeb_package_version is None: 

327 related_udeb_package_version = binary_package_version 

328 if accessible_package_roots: 

329 apr = [] 

330 for fields, apr_fs_root in accessible_package_roots: 

331 apr_fields = Deb822(dict(fields)) 

332 if "Package" not in apr_fields: 332 ↛ 333line 332 didn't jump to line 333 because the condition on line 332 was never true

333 raise ValueError( 

334 "Missing mandatory Package field in member of accessible_package_roots" 

335 ) 

336 if "Architecture" not in apr_fields: 336 ↛ 337line 336 didn't jump to line 337 because the condition on line 336 was never true

337 raise ValueError( 

338 "Missing mandatory Architecture field in member of accessible_package_roots" 

339 ) 

340 apr_package = BinaryPackage( 

341 apr_fields, 

342 process_table, 

343 FAKE_DPKG_QUERY_TABLE, 

344 is_main_package=False, 

345 should_be_acted_on=True, 

346 ) 

347 r = package_cross_check_precheck(bin_package, apr_package) 

348 if not r[0]: 348 ↛ 349line 348 didn't jump to line 349 because the condition on line 348 was never true

349 raise ValueError( 

350 f"{apr_package.name} would not be accessible for {bin_package.name}" 

351 ) 

352 apr.append((apr_package, apr_fs_root)) 

353 

354 if related_udeb_fs_root is not None: 354 ↛ 355line 354 didn't jump to line 355 because the condition on line 354 was never true

355 if udeb_package is None: 

356 raise ValueError( 

357 "related_udeb_package_fields must be given when related_udeb_fs_root is given" 

358 ) 

359 r = package_cross_check_precheck(bin_package, udeb_package) 

360 if not r[0]: 

361 raise ValueError( 

362 f"{udeb_package.name} would not be accessible for {bin_package.name}, so providing" 

363 " related_udeb_fs_root is irrelevant" 

364 ) 

365 apr.append((udeb_package, related_udeb_fs_root)) 

366 final_apr = tuple(apr) 

367 else: 

368 final_apr = tuple() 

369 

370 return PackageProcessingContextTestProvider( 

371 source_package=source_package, 

372 binary_package=bin_package, 

373 related_udeb_package=udeb_package, 

374 binary_package_version=binary_package_version, 

375 related_udeb_package_version=related_udeb_package_version, 

376 accessible_package_roots=lambda: final_apr, 

377 manifest_configuration=manifest_configuration, 

378 ) 

379 

380 

381def manifest_variable_resolution_context( 

382 *, 

383 debian_dir: VirtualPath | None = None, 

384) -> VariableContext: 

385 if debian_dir is None: 

386 debian_dir = InMemoryVirtualRootDir() 

387 

388 return VariableContext(debian_dir) 

389 

390 

391class MaintscriptAccessorTestProvider(MaintscriptAccessorProviderBase): 

392 __slots__ = ("_plugin_metadata", "_plugin_source_id", "_maintscript_container") 

393 

394 def __init__( 

395 self, 

396 plugin_metadata: DebputyPluginMetadata, 

397 plugin_source_id: str, 

398 maintscript_container: dict[str, list[RegisteredMaintscript]], 

399 ): 

400 self._plugin_metadata = plugin_metadata 

401 self._plugin_source_id = plugin_source_id 

402 self._maintscript_container = maintscript_container 

403 

404 @classmethod 

405 def _apply_condition_to_script( 

406 cls, condition: str, run_snippet: str, /, indent: bool | None = None 

407 ) -> str: 

408 return run_snippet 

409 

410 def _append_script( 

411 self, 

412 caller_name: str, 

413 maintscript: Maintscript, 

414 full_script: str, 

415 /, 

416 perform_substitution: bool = True, 

417 ) -> None: 

418 if self._plugin_source_id not in self._maintscript_container: 

419 self._maintscript_container[self._plugin_source_id] = [] 

420 self._maintscript_container[self._plugin_source_id].append( 

421 RegisteredMaintscript( 

422 maintscript, 

423 caller_name, 

424 full_script, 

425 perform_substitution, 

426 ) 

427 ) 

428 

429 

430class RegisteredMetadataImpl(RegisteredMetadata): 

431 __slots__ = ( 

432 "_substvars", 

433 "_triggers", 

434 "_maintscripts", 

435 ) 

436 

437 def __init__( 

438 self, 

439 substvars: Substvars, 

440 triggers: list[RegisteredTrigger], 

441 maintscripts: list[RegisteredMaintscript], 

442 ) -> None: 

443 self._substvars = substvars 

444 self._triggers = triggers 

445 self._maintscripts = maintscripts 

446 

447 @property 

448 def substvars(self) -> Substvars: 

449 return self._substvars 

450 

451 @property 

452 def triggers(self) -> list[RegisteredTrigger]: 

453 return self._triggers 

454 

455 def maintscripts( 

456 self, 

457 *, 

458 maintscript: Maintscript | None = None, 

459 ) -> list[RegisteredMaintscript]: 

460 if maintscript is None: 

461 return self._maintscripts 

462 return [m for m in self._maintscripts if m.maintscript == maintscript] 

463 

464 

465class BinaryCtrlAccessorTestProvider(BinaryCtrlAccessorProviderBase): 

466 __slots__ = ("_maintscript_container",) 

467 

468 def __init__( 

469 self, 

470 plugin_metadata: DebputyPluginMetadata, 

471 plugin_source_id: str, 

472 context: PackageProcessingContext, 

473 ) -> None: 

474 super().__init__( 

475 plugin_metadata, 

476 plugin_source_id, 

477 context, 

478 {}, 

479 FlushableSubstvars(), 

480 (None, None), 

481 ) 

482 self._maintscript_container: dict[str, list[RegisteredMaintscript]] = {} 

483 

484 def _create_maintscript_accessor(self) -> MaintscriptAccessor: 

485 return MaintscriptAccessorTestProvider( 

486 self._plugin_metadata, 

487 self._plugin_source_id, 

488 self._maintscript_container, 

489 ) 

490 

491 def registered_metadata(self) -> RegisteredMetadata: 

492 return RegisteredMetadataImpl( 

493 self._substvars, 

494 [ 

495 RegisteredTrigger.from_plugin_provided_trigger(t) 

496 for t in self._triggers.values() 

497 if t.provider_source_id == self._plugin_source_id 

498 ], 

499 self._maintscript_container.get(self._plugin_source_id, []), 

500 ) 

501 

502 

503class ServiceRegistryTestImpl(ServiceRegistry[DSD]): 

504 __slots__ = ("_service_manager_details", "_service_definitions") 

505 

506 def __init__( 

507 self, 

508 service_manager_details: ServiceManagerDetails, 

509 detected_services: list[DetectedService[DSD]], 

510 ) -> None: 

511 self._service_manager_details = service_manager_details 

512 self._service_definitions = detected_services 

513 

514 def register_service( 

515 self, 

516 path: VirtualPath, 

517 name: str | list[str], 

518 *, 

519 type_of_service: str = "service", # "timer", etc. 

520 service_scope: str = "system", 

521 enable_by_default: bool = True, 

522 start_by_default: bool = True, 

523 default_upgrade_rule: ServiceUpgradeRule = "restart", 

524 service_context: DSD | None = None, 

525 ) -> None: 

526 names = name if isinstance(name, list) else [name] 

527 if len(names) < 1: 527 ↛ 528line 527 didn't jump to line 528 because the condition on line 527 was never true

528 raise ValueError( 

529 f"The service must have at least one name - {path.absolute} did not have any" 

530 ) 

531 self._service_definitions.append( 

532 DetectedService( 

533 path, 

534 names, 

535 type_of_service, 

536 service_scope, 

537 enable_by_default, 

538 start_by_default, 

539 default_upgrade_rule, 

540 service_context, 

541 ) 

542 ) 

543 

544 

545@contextlib.contextmanager 

546def _read_only_fs_root(fs_root: VirtualPath) -> Iterator[VirtualPath]: 

547 if fs_root.is_read_write: 547 ↛ 553line 547 didn't jump to line 553 because the condition on line 547 was always true

548 assert isinstance(fs_root, InMemoryVirtualRootDir) 

549 fs_root.is_read_write = False 

550 yield fs_root 

551 fs_root.is_read_write = True 

552 else: 

553 yield fs_root 

554 

555 

556class InitializedPluginUnderTestImpl(InitializedPluginUnderTest): 

557 def __init__( 

558 self, 

559 plugin_name: str, 

560 feature_set: PluginProvidedFeatureSet, 

561 substitution: SubstitutionImpl, 

562 ) -> None: 

563 self._feature_set = feature_set 

564 self._plugin_name = plugin_name 

565 self._packager_provided_files: None | ( 

566 dict[str, RegisteredPackagerProvidedFile] 

567 ) = None 

568 self._triggers: dict[tuple[DpkgTriggerType, str], PluginProvidedTrigger] = {} 

569 self._maintscript_container: dict[str, list[RegisteredMaintscript]] = {} 

570 self._substitution = substitution 

571 assert plugin_name in self._feature_set.plugin_data 

572 

573 @property 

574 def _plugin_metadata(self) -> DebputyPluginMetadata: 

575 return self._feature_set.plugin_data[self._plugin_name] 

576 

577 def packager_provided_files_by_stem( 

578 self, 

579 ) -> Mapping[str, RegisteredPackagerProvidedFile]: 

580 ppf = self._packager_provided_files 

581 if ppf is None: 

582 result: dict[str, RegisteredPackagerProvidedFile] = {} 

583 for spec in self._feature_set.packager_provided_files.values(): 

584 if spec.debputy_plugin_metadata.plugin_name != self._plugin_name: 

585 continue 

586 # Registered as a virtual subclass, so this should always be True 

587 assert isinstance(spec, RegisteredPackagerProvidedFile) 

588 result[spec.stem] = spec 

589 self._packager_provided_files = result 

590 ppf = result 

591 return ppf 

592 

593 def run_metadata_detector( 

594 self, 

595 metadata_detector_id: str, 

596 fs_root: VirtualPath, 

597 context: PackageProcessingContext | None = None, 

598 ) -> RegisteredMetadata: 

599 if not fs_root.is_root_dir(): 599 ↛ 600line 599 didn't jump to line 600 because the condition on line 599 was never true

600 raise ValueError("Provided path must be the file system root.") 

601 detectors = self._feature_set.metadata_maintscript_detectors[self._plugin_name] 

602 matching_detectors = [ 

603 d for d in detectors if d.detector_id == metadata_detector_id 

604 ] 

605 if len(matching_detectors) != 1: 605 ↛ 606line 605 didn't jump to line 606 because the condition on line 605 was never true

606 assert not matching_detectors 

607 raise ValueError( 

608 f"The plugin {self._plugin_name} did not provide a metadata detector with ID" 

609 f' "{metadata_detector_id}"' 

610 ) 

611 if context is None: 

612 context = package_metadata_context() 

613 detector = matching_detectors[0] 

614 if not detector.applies_to(context.binary_package): 

615 raise ValueError( 

616 f'The detector "{metadata_detector_id}" from {self._plugin_name} does not apply to the' 

617 " given package. Consider using `package_metadata_context()` to emulate a binary package" 

618 " with the correct specification. As an example: " 

619 '`package_metadata_context(package_fields={"Package-Type": "udeb"})` would emulate a udeb' 

620 " package." 

621 ) 

622 

623 ctrl = BinaryCtrlAccessorTestProvider( 

624 self._plugin_metadata, 

625 metadata_detector_id, 

626 context, 

627 ) 

628 with _read_only_fs_root(fs_root) as ro_root: 

629 detector.run_detector( 

630 ro_root, 

631 ctrl, 

632 context, 

633 ) 

634 return ctrl.registered_metadata() 

635 

636 def run_package_processor( 

637 self, 

638 package_processor_id: str, 

639 fs_root: VirtualPath, 

640 context: PackageProcessingContext | None = None, 

641 ) -> None: 

642 if not fs_root.is_root_dir(): 642 ↛ 643line 642 didn't jump to line 643 because the condition on line 642 was never true

643 raise ValueError("Provided path must be the file system root.") 

644 pp_key = (self._plugin_name, package_processor_id) 

645 package_processor = self._feature_set.all_package_processors.get(pp_key) 

646 if package_processor is None: 646 ↛ 647line 646 didn't jump to line 647 because the condition on line 646 was never true

647 raise ValueError( 

648 f"The plugin {self._plugin_name} did not provide a package processor with ID" 

649 f' "{package_processor_id}"' 

650 ) 

651 if context is None: 651 ↛ 653line 651 didn't jump to line 653 because the condition on line 651 was always true

652 context = package_metadata_context() 

653 if not fs_root.is_read_write: 653 ↛ 654line 653 didn't jump to line 654 because the condition on line 653 was never true

654 raise ValueError( 

655 "The provided fs_root is read-only and it must be read-write for package processor" 

656 ) 

657 if not package_processor.applies_to(context.binary_package): 657 ↛ 658line 657 didn't jump to line 658 because the condition on line 657 was never true

658 raise ValueError( 

659 f'The package processor "{package_processor_id}" from {self._plugin_name} does not apply' 

660 " to the given package. Consider using `package_metadata_context()` to emulate a binary" 

661 " package with the correct specification. As an example: " 

662 '`package_metadata_context(package_fields={"Package-Type": "udeb"})` would emulate a udeb' 

663 " package." 

664 ) 

665 package_processor.run_package_processor( 

666 fs_root, 

667 None, 

668 context, 

669 ) 

670 

671 @property 

672 def declared_manifest_variables(self) -> frozenset[str]: 

673 return frozenset( 

674 { 

675 k 

676 for k, v in self._feature_set.manifest_variables.items() 

677 if v.plugin_metadata.plugin_name == self._plugin_name 

678 } 

679 ) 

680 

681 def automatic_discard_rules_examples_with_issues(self) -> Sequence[ADRExampleIssue]: 

682 issues = [] 

683 for adr in self._feature_set.auto_discard_rules.values(): 

684 if adr.plugin_metadata.plugin_name != self._plugin_name: 684 ↛ 685line 684 didn't jump to line 685 because the condition on line 684 was never true

685 continue 

686 for idx, example in enumerate(adr.examples): 

687 result = process_discard_rule_example( 

688 adr, 

689 example, 

690 ) 

691 if result.inconsistent_paths: 

692 issues.append( 

693 ADRExampleIssue( 

694 adr.name, 

695 idx, 

696 [ 

697 x.absolute + ("/" if x.is_dir else "") 

698 for x in result.inconsistent_paths 

699 ], 

700 ) 

701 ) 

702 return issues 

703 

704 def run_service_detection_and_integrations( 

705 self, 

706 service_manager: str, 

707 fs_root: VirtualPath, 

708 context: PackageProcessingContext | None = None, 

709 *, 

710 service_context_type_hint: type[DSD] | None = None, 

711 ) -> tuple[list[DetectedService[DSD]], RegisteredMetadata]: 

712 if not fs_root.is_root_dir(): 712 ↛ 713line 712 didn't jump to line 713 because the condition on line 712 was never true

713 raise ValueError("Provided path must be the file system root.") 

714 try: 

715 service_manager_details = self._feature_set.service_managers[ 

716 service_manager 

717 ] 

718 if service_manager_details.plugin_metadata.plugin_name != self._plugin_name: 718 ↛ 719line 718 didn't jump to line 719 because the condition on line 718 was never true

719 raise KeyError(service_manager) 

720 except KeyError: 

721 raise ValueError( 

722 f"The plugin {self._plugin_name} does not provide a" 

723 f" service manager called {service_manager}" 

724 ) from None 

725 

726 if context is None: 726 ↛ 728line 726 didn't jump to line 728 because the condition on line 726 was always true

727 context = package_metadata_context() 

728 detected_services: list[DetectedService[DSD]] = [] 

729 registry = ServiceRegistryTestImpl(service_manager_details, detected_services) 

730 service_manager_details.service_detector( 

731 fs_root, 

732 registry, 

733 context, 

734 ) 

735 ctrl = BinaryCtrlAccessorTestProvider( 

736 self._plugin_metadata, 

737 service_manager_details.service_manager, 

738 context, 

739 ) 

740 if detected_services: 

741 service_definitions = [ 

742 ServiceDefinitionImpl( 

743 ds.names[0], 

744 ds.names, 

745 ds.path, 

746 ds.type_of_service, 

747 ds.service_scope, 

748 ds.enable_by_default, 

749 ds.start_by_default, 

750 ds.default_upgrade_rule, 

751 self._plugin_name, 

752 True, 

753 ds.service_context, 

754 ) 

755 for ds in detected_services 

756 ] 

757 service_manager_details.service_integrator( 

758 service_definitions, 

759 ctrl, 

760 context, 

761 ) 

762 return detected_services, ctrl.registered_metadata() 

763 

764 def manifest_variables( 

765 self, 

766 *, 

767 resolution_context: VariableContext | None = None, 

768 mocked_variables: Mapping[str, str] | None = None, 

769 ) -> Mapping[str, str]: 

770 valid_manifest_variables = frozenset( 

771 { 

772 n 

773 for n, v in self._feature_set.manifest_variables.items() 

774 if v.plugin_metadata.plugin_name == self._plugin_name 

775 } 

776 ) 

777 if resolution_context is None: 

778 resolution_context = manifest_variable_resolution_context() 

779 substitution = self._substitution.copy_for_subst_test( 

780 self._feature_set, 

781 resolution_context, 

782 extra_substitutions=mocked_variables, 

783 ) 

784 return SubstitutionTable( 

785 valid_manifest_variables, 

786 substitution, 

787 ) 

788 

789 

790class SubstitutionTable(Mapping[str, str]): 

791 def __init__( 

792 self, valid_manifest_variables: frozenset[str], substitution: Substitution 

793 ) -> None: 

794 self._valid_manifest_variables = valid_manifest_variables 

795 self._resolved: set[str] = set() 

796 self._substitution = substitution 

797 

798 def __contains__(self, item: object) -> bool: 

799 return item in self._valid_manifest_variables 

800 

801 def __getitem__(self, key: str) -> str: 

802 if key not in self._valid_manifest_variables: 802 ↛ 803line 802 didn't jump to line 803 because the condition on line 802 was never true

803 raise KeyError(key) 

804 v = self._substitution.substitute( 

805 "{{" + key + "}}", f"test of manifest variable `{key}`" 

806 ) 

807 self._resolved.add(key) 

808 return v 

809 

810 def __len__(self) -> int: 

811 return len(self._valid_manifest_variables) 

812 

813 def __iter__(self) -> Iterator[str]: 

814 return iter(self._valid_manifest_variables) 

815 

816 def keys(self) -> KeysView[str]: 

817 return cast("KeysView[str]", self._valid_manifest_variables)