Coverage for src/debputy/plugin/api/test_api/test_impl.py: 81%

307 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2026-01-04 10:15 +0000

1import contextlib 

2import dataclasses 

3import inspect 

4import os.path 

5from collections.abc import Mapping, Sequence, Iterator, KeysView, Callable 

6from importlib.resources.abc import Traversable 

7from io import BytesIO 

8from pathlib import Path 

9from typing import ( 

10 cast, 

11 TYPE_CHECKING, 

12) 

13 

14from debian.deb822 import Deb822 

15from debian.debian_support import DpkgArchTable 

16from debian.substvars import Substvars 

17 

18from debputy import DEBPUTY_PLUGIN_ROOT_DIR 

19from debputy.architecture_support import faked_arch_table 

20from debputy.filesystem_scan import OSFSROOverlay, FSRootDir 

21from debputy.packages import BinaryPackage, SourcePackage 

22from debputy.plugin.api import ( 

23 PluginInitializationEntryPoint, 

24 VirtualPath, 

25 PackageProcessingContext, 

26 DpkgTriggerType, 

27 Maintscript, 

28) 

29from debputy.plugin.api.example_processing import process_discard_rule_example 

30from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

31from debputy.plugin.api.impl import ( 

32 plugin_metadata_for_debputys_own_plugin, 

33 DebputyPluginInitializerProvider, 

34 parse_json_plugin_desc, 

35 MaintscriptAccessorProviderBase, 

36 BinaryCtrlAccessorProviderBase, 

37 PLUGIN_TEST_SUFFIX, 

38 find_json_plugin, 

39 ServiceDefinitionImpl, 

40) 

41from debputy.plugin.api.impl_types import ( 

42 PackagerProvidedFileClassSpec, 

43 DebputyPluginMetadata, 

44 PluginProvidedTrigger, 

45 ServiceManagerDetails, 

46) 

47from debputy.plugin.api.spec import ( 

48 MaintscriptAccessor, 

49 FlushableSubstvars, 

50 ServiceRegistry, 

51 DSD, 

52 ServiceUpgradeRule, 

53) 

54from debputy.plugin.api.test_api.test_spec import ( 

55 InitializedPluginUnderTest, 

56 RegisteredPackagerProvidedFile, 

57 RegisteredTrigger, 

58 RegisteredMaintscript, 

59 DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS, 

60 ADRExampleIssue, 

61 DetectedService, 

62 RegisteredMetadata, 

63) 

64from debputy.plugins.debputy.debputy_plugin import initialize_debputy_features 

65from debputy.substitution import SubstitutionImpl, VariableContext, Substitution 

66from debputy.util import package_cross_check_precheck 

67 

68if TYPE_CHECKING: 

69 from debputy.highlevel_manifest import HighLevelManifest 

70 

71 

72RegisteredPackagerProvidedFile.register(PackagerProvidedFileClassSpec) 

73 

74 

75type ManifestConfigurationImplementation[T] = Callable[ 

76 [SourcePackage | BinaryPackage, type[T]], T 

77] 

78 

79 

@dataclasses.dataclass(frozen=True, slots=True)
class PackageProcessingContextTestProvider(PackageProcessingContext):
    """Immutable, test-only `PackageProcessingContext` built from plain values.

    Instances are produced by `package_metadata_context()`.
    """

    # The source package that the binary package is built from.
    source_package: SourcePackage
    # The binary package being processed, together with its version.
    binary_package: BinaryPackage
    binary_package_version: str
    # The udeb related to the binary package (if any), together with its version.
    related_udeb_package: BinaryPackage | None
    related_udeb_package_version: str | None
    # Callable returning (package, fs root) pairs for the packages that are accessible.
    accessible_package_roots: Callable[[], Sequence[tuple[BinaryPackage, VirtualPath]]]
    # Callable backing manifest configuration lookups.
    manifest_configuration: ManifestConfigurationImplementation

    # TODO: implement (when needed)
    #   dpkg_arch_query_table
    #   deb_options_and_profiles (pull from binary ?)
    #   source_condition_context
    #   condition_context

95 

96 

def _initialize_plugin_under_test(
    plugin_metadata: DebputyPluginMetadata,
    load_debputy_plugin: bool = True,
) -> "InitializedPluginUnderTest":
    """Load the plugin described by `plugin_metadata` into a fresh feature set.

    :param plugin_metadata: Metadata of the plugin that is being tested.
    :param load_debputy_plugin: When true, debputy's own plugin is loaded into the same
      feature set before the plugin under test.
    :return: A test wrapper around the loaded plugin.
    """
    shared_features = PluginProvidedFeatureSet()
    shared_substitution = SubstitutionImpl(
        unresolvable_substitutions=frozenset(["SOURCE_DATE_EPOCH", "PACKAGE"]),
        variable_context=VariableContext(
            OSFSROOverlay.create_root_dir("debian", "debian"),
        ),
        plugin_feature_set=shared_features,
    )

    load_order: list[DebputyPluginMetadata] = []
    if load_debputy_plugin:
        # Load debputy's own plugin first, so conflicts with debputy's plugin are detected early
        load_order.append(
            plugin_metadata_for_debputys_own_plugin(initialize_debputy_features)
        )
    load_order.append(plugin_metadata)

    for metadata in load_order:
        DebputyPluginInitializerProvider(
            metadata,
            shared_features,
            shared_substitution,
        ).load_plugin()

    return InitializedPluginUnderTestImpl(
        plugin_metadata.plugin_name,
        shared_features,
        shared_substitution,
    )

134 

135 

def _auto_load_plugin_from_filename(
    py_test_filename: str,
) -> "InitializedPluginUnderTest":
    """Locate and load the plugin that belongs to the test file `py_test_filename`.

    The plugin name is the basename of the test file with `PLUGIN_TEST_SUFFIX` removed and
    every `_` replaced by `-`. Where the plugin is loaded from is controlled by the
    `DEBPUTY_TEST_PLUGIN_LOCATION` environment variable:

     * unset or `uninstalled`: `<plugin-name>.json` next to the test file is used, falling
       back to `<plugin-name>.json.in` if the former does not exist.
     * `installed`: the plugin is looked up by name under `DEBPUTY_PLUGIN_ROOT_DIR`.

    :param py_test_filename: Path of the test file requesting the plugin.
    :return: The loaded plugin for testing.
    :raises FileNotFoundError: In `uninstalled` mode when neither descriptor file exists.
    :raises ValueError: If `DEBPUTY_TEST_PLUGIN_LOCATION` has any other value.
    """
    dirname, basename = os.path.split(py_test_filename)
    plugin_name = PLUGIN_TEST_SUFFIX.sub("", basename).replace("_", "-")

    test_location = os.environ.get("DEBPUTY_TEST_PLUGIN_LOCATION", "uninstalled")
    if test_location == "uninstalled":
        json_desc_file = os.path.join(dirname, f"{plugin_name}.json")
        if "/" not in json_desc_file:
            # NOTE(review): presumably ensures the descriptor is handled as a path rather
            # than a bare plugin name - confirm against `parse_json_plugin_desc`.
            json_desc_file = f"./{json_desc_file}"
        json_desc_file_in = f"{json_desc_file}.in"

        # Load the descriptor that was actually found. The previous code passed the
        # (non-existent) `.json` path even when only the `.json.in` file was present.
        for candidate in (json_desc_file, json_desc_file_in):
            if os.path.isfile(candidate):
                return _initialize_plugin_from_desc(candidate)

        raise FileNotFoundError(
            f"Cannot determine the plugin JSON metadata descriptor: Expected it to be"
            f" {json_desc_file} or {json_desc_file_in}"
        )

    if test_location == "installed":
        plugin_metadata = find_json_plugin([str(DEBPUTY_PLUGIN_ROOT_DIR)], plugin_name)
        return _initialize_plugin_under_test(plugin_metadata, load_debputy_plugin=True)

    raise ValueError(
        'Invalid or unsupported "DEBPUTY_TEST_PLUGIN_LOCATION" environment variable. It must be either'
        ' unset OR one of "installed", "uninstalled".'
    )

168 

169 

def initialize_plugin_under_test(
    *,
    plugin_desc_file: str | None = None,
) -> "InitializedPluginUnderTest":
    """Load and initialize a plugin for testing it

    The plugin is loaded via its plugin description file, which is how `debputy` loads
    plugins at run-time (in contrast to `initialize_plugin_under_test_preloaded`, which
    bypasses this concrete part of the flow).

    :param plugin_desc_file: The plugin description file (`.json`) that describes how to load the plugin.
      If omitted, `debputy` will attempt to locate the plugin description file based on the test itself.
      This works for "single-file" plugins, where the description file and the test are right next to
      each other.
    :return: The loaded plugin for testing
    """
    if plugin_desc_file is not None:
        if DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS:
            raise RuntimeError(
                "Running the test against an installed plugin does not work when"
                " plugin_desc_file is provided. Please skip this test. You can "
                " import DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS and use that as"
                " conditional for this purpose."
            )
        return _initialize_plugin_from_desc(plugin_desc_file)

    # Frame 0 is this function; frame 1 is whoever called it (i.e., the test).
    calling_frame = inspect.stack()[1]
    return _auto_load_plugin_from_filename(calling_frame.filename)

197 

198 

def _initialize_plugin_from_desc(
    desc_file: str,
) -> "InitializedPluginUnderTest":
    """Parse the plugin description file `desc_file` and load the plugin it describes.

    :param desc_file: Path to the description file; it must end with `.json` or `.json.in`.
    :return: The loaded plugin for testing (debputy's own plugin is loaded as well).
    :raises ValueError: If `desc_file` does not have one of the accepted extensions.
    """
    accepted_extensions = (".json", ".json.in")
    if not desc_file.endswith(accepted_extensions):
        raise ValueError("The plugin file must end with .json or .json.in")

    return _initialize_plugin_under_test(
        parse_json_plugin_desc(desc_file),
        load_debputy_plugin=True,
    )

208 

209 

def initialize_plugin_under_test_from_inline_json(
    plugin_name: str,
    json_content: str,
) -> "InitializedPluginUnderTest":
    """Load a plugin whose JSON description is given as a string rather than a file.

    :param plugin_name: Passed to `parse_json_plugin_desc` in place of a file name.
    :param json_content: The JSON plugin description; it is parsed from its UTF-8 encoding.
    :return: The loaded plugin for testing (debputy's own plugin is loaded as well).
    """
    encoded_description = json_content.encode("utf-8")
    with BytesIO(encoded_description) as json_stream:
        metadata = parse_json_plugin_desc(plugin_name, fd=json_stream)

    return _initialize_plugin_under_test(metadata, load_debputy_plugin=True)

218 

219 

def initialize_plugin_under_test_preloaded(
    api_compat_version: int,
    plugin_initializer: PluginInitializationEntryPoint,
    /,
    plugin_name: str = "plugin-under-test",
    load_debputy_plugin: bool = True,
    plugin_doc_path_resolver: Callable[[], Traversable | Path | None] = lambda: None,
) -> "InitializedPluginUnderTest":
    """Internal API: Initialize a plugin for testing without loading it from a file

    The standard loading mechanism is skipped, so this does not test that your plugin
    description file is correct. Notably, any feature provided via the JSON description
    file will **NOT** be visible for the test.

    This API is mostly useful for testing parts of debputy itself.

    :param api_compat_version: The API version the plugin was written for. Use the same version as the
      version from the entry point (The `v1` part of `debputy.plugins.v1.initialize` translate into `1`).
    :param plugin_initializer: The entry point of the plugin
    :param plugin_name: Normally, debputy would derive this from the entry point. In the test, it will
      use a test name and version. However, you can explicitly set if you want the real name/version.
    :param load_debputy_plugin: Whether to load debputy's own plugin first. Doing so provides a more
      realistic test and enables the test to detect conflicts with debputy's own plugins (de facto making
      the plugin unloadable in practice if such a conflict is present). This option is mostly provided
      to enable debputy to use this method for self testing.
    :param plugin_doc_path_resolver: How to resolve the documentation (if relevant for the test). The
      default is to not load the documentation.
    :return: The loaded plugin for testing
    :raises RuntimeError: If the tests are being run against installed plugins.
    """
    if DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS:
        raise RuntimeError(
            "Running the test against an installed plugin does not work when"
            " the plugin is preload. Please skip this test. You can "
            " import DEBPUTY_TEST_AGAINST_INSTALLED_PLUGINS and use that as"
            " conditional for this purpose."
        )

    # There is no loader and no real path, since the entry point was handed over directly.
    preloaded_metadata = DebputyPluginMetadata(
        plugin_name=plugin_name,
        api_compat_version=api_compat_version,
        plugin_initializer=plugin_initializer,
        plugin_loader=None,
        plugin_path="<loaded-via-test>",
        plugin_doc_path_resolver=plugin_doc_path_resolver,
    )
    return _initialize_plugin_under_test(
        preloaded_metadata,
        load_debputy_plugin=load_debputy_plugin,
    )

271 

272 

class _MockArchTable:
    """Minimal stand-in for `DpkgArchTable` that treats every architecture pair as matching."""

    @staticmethod
    def matches_architecture(_a: str, _b: str) -> bool:
        return True


# Shared instance used wherever the test API needs to hand out a dpkg architecture table.
# The cast only affects static typing; the object remains a `_MockArchTable` at run-time.
FAKE_DPKG_QUERY_TABLE = cast("DpkgArchTable", _MockArchTable())
# Only the instance above is meant to be used; remove the helper class from the module namespace.
del _MockArchTable

281 

282 

def package_metadata_context(
    *,
    host_arch: str = "amd64",
    package_fields: dict[str, str] | None = None,
    related_udeb_package_fields: dict[str, str] | None = None,
    binary_package_version: str = "1.0-1",
    related_udeb_package_version: str | None = None,
    should_be_acted_on: bool = True,
    related_udeb_fs_root: VirtualPath | None = None,
    accessible_package_roots: Sequence[tuple[Mapping[str, str], VirtualPath]] = tuple(),
    source_package_fields: dict[str, str] | None = None,
    manifest_configuration: ManifestConfigurationImplementation = lambda x, y: None,
) -> PackageProcessingContext:
    """Build a `PackageProcessingContext` describing a fake package for use in tests.

    :param host_arch: Architecture passed to `faked_arch_table` for all created packages.
    :param package_fields: Extra/overriding control fields for the binary package. The
      defaults are `Package: foo` and `Architecture: any`.
    :param related_udeb_package_fields: If given, a related udeb package is created from
      these fields. `Package` defaults to `<binary package>-udeb`, `Architecture` to that
      of the binary package and `Package-Type` to `udeb`.
    :param binary_package_version: Version of the binary package.
    :param related_udeb_package_version: Version of the related udeb. When a udeb is
      created and this is omitted, `binary_package_version` is used.
    :param should_be_acted_on: Passed on to the binary package.
    :param related_udeb_fs_root: File system root of the related udeb. When given, the
      udeb is added to the accessible package roots (after the explicitly listed ones).
    :param accessible_package_roots: Pairs of control fields and file system root for
      other packages that should be accessible. Each set of fields must contain both
      `Package` and `Architecture`.
    :param source_package_fields: Extra/overriding fields for the source package. The
      `Source` field defaults to the name of the binary package.
    :param manifest_configuration: Callable stored as the context's
      `manifest_configuration`; the default returns `None` for every request.
    :return: The context for the fake package.
    :raises ValueError: If an accessible package lacks a mandatory field, if a package
      fails `package_cross_check_precheck` against the binary package, or if
      `related_udeb_fs_root` is given without `related_udeb_package_fields`.
    """
    process_table = faked_arch_table(host_arch)
    f = {
        "Package": "foo",
        "Architecture": "any",
    }
    if package_fields is not None:
        f.update(package_fields)

    bin_package = BinaryPackage(
        Deb822(f),
        process_table,
        FAKE_DPKG_QUERY_TABLE,
        is_main_package=True,
        should_be_acted_on=should_be_acted_on,
    )

    s = {
        "Source": bin_package.name,
    }
    if source_package_fields is not None:
        s.update(source_package_fields)
    source_package = SourcePackage(Deb822(s))

    udeb_package = None
    if related_udeb_package_fields is not None:
        # Copy so the caller's dict is not modified by the defaults below.
        uf = dict(related_udeb_package_fields)
        uf.setdefault("Package", f'{f["Package"]}-udeb')
        uf.setdefault("Architecture", f["Architecture"])
        uf.setdefault("Package-Type", "udeb")
        udeb_package = BinaryPackage(
            Deb822(uf),
            process_table,
            FAKE_DPKG_QUERY_TABLE,
            is_main_package=False,
            should_be_acted_on=True,
        )
        if related_udeb_package_version is None:
            related_udeb_package_version = binary_package_version

    apr = []
    for fields, apr_fs_root in accessible_package_roots:
        apr_fields = Deb822(dict(fields))
        if "Package" not in apr_fields:
            raise ValueError(
                "Missing mandatory Package field in member of accessible_package_roots"
            )
        if "Architecture" not in apr_fields:
            raise ValueError(
                "Missing mandatory Architecture field in member of accessible_package_roots"
            )
        apr_package = BinaryPackage(
            apr_fields,
            process_table,
            FAKE_DPKG_QUERY_TABLE,
            is_main_package=False,
            should_be_acted_on=True,
        )
        r = package_cross_check_precheck(bin_package, apr_package)
        if not r[0]:
            raise ValueError(
                f"{apr_package.name} would not be accessible for {bin_package.name}"
            )
        apr.append((apr_package, apr_fs_root))

    # Handled independently of `accessible_package_roots`: previously the udeb root (and
    # its validation) was silently skipped unless at least one other root was provided.
    if related_udeb_fs_root is not None:
        if udeb_package is None:
            raise ValueError(
                "related_udeb_package_fields must be given when related_udeb_fs_root is given"
            )
        r = package_cross_check_precheck(bin_package, udeb_package)
        if not r[0]:
            raise ValueError(
                f"{udeb_package.name} would not be accessible for {bin_package.name}, so providing"
                " related_udeb_fs_root is irrelevant"
            )
        apr.append((udeb_package, related_udeb_fs_root))

    # Freeze the result; the context hands out the same tuple on every call.
    final_apr = tuple(apr)

    return PackageProcessingContextTestProvider(
        source_package=source_package,
        binary_package=bin_package,
        related_udeb_package=udeb_package,
        binary_package_version=binary_package_version,
        related_udeb_package_version=related_udeb_package_version,
        accessible_package_roots=lambda: final_apr,
        manifest_configuration=manifest_configuration,
    )

383 

384 

def manifest_variable_resolution_context(
    *,
    debian_dir: VirtualPath | None = None,
) -> VariableContext:
    """Create the `VariableContext` used when resolving manifest variables in tests.

    :param debian_dir: The path to wrap in the context. When omitted, a new `FSRootDir()`
      is used instead.
    :return: A `VariableContext` wrapping the chosen path.
    """
    effective_debian_dir = FSRootDir() if debian_dir is None else debian_dir
    return VariableContext(effective_debian_dir)

393 

394 

class MaintscriptAccessorTestProvider(MaintscriptAccessorProviderBase):
    """Maintscript accessor that records snippets in a dict instead of generating scripts."""

    __slots__ = ("_plugin_metadata", "_plugin_source_id", "_maintscript_container")

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        maintscript_container: dict[str, list[RegisteredMaintscript]],
    ):
        """
        :param plugin_metadata: Metadata of the plugin registering the snippets.
        :param plugin_source_id: Key under which recorded snippets are stored.
        :param maintscript_container: Dict that receives the recorded snippets. It is
          kept by reference, so the owner of the dict sees every registration.
        """
        self._plugin_metadata = plugin_metadata
        self._plugin_source_id = plugin_source_id
        self._maintscript_container = maintscript_container

    @classmethod
    def _apply_condition_to_script(
        cls, condition: str, run_snippet: str, /, indent: bool | None = None
    ) -> str:
        # The condition is deliberately ignored; the snippet is returned as-is.
        return run_snippet

    def _append_script(
        self,
        caller_name: str,
        maintscript: Maintscript,
        full_script: str,
        /,
        perform_substitution: bool = True,
    ) -> None:
        """Record the snippet under this accessor's plugin source ID."""
        recorded_snippets = self._maintscript_container.setdefault(
            self._plugin_source_id, []
        )
        recorded_snippets.append(
            RegisteredMaintscript(
                maintscript,
                caller_name,
                full_script,
                perform_substitution,
            )
        )

432 

433 

class RegisteredMetadataImpl(RegisteredMetadata):
    """Read-only view of the substvars, triggers and maintscripts collected during a test run."""

    __slots__ = (
        "_substvars",
        "_triggers",
        "_maintscripts",
    )

    def __init__(
        self,
        substvars: Substvars,
        triggers: list[RegisteredTrigger],
        maintscripts: list[RegisteredMaintscript],
    ) -> None:
        self._substvars = substvars
        self._triggers = triggers
        self._maintscripts = maintscripts

    @property
    def substvars(self) -> Substvars:
        """The substvars object given at construction."""
        return self._substvars

    @property
    def triggers(self) -> list[RegisteredTrigger]:
        """The list of triggers given at construction."""
        return self._triggers

    def maintscripts(
        self,
        *,
        maintscript: Maintscript | None = None,
    ) -> list[RegisteredMaintscript]:
        """Return the recorded maintscript snippets.

        :param maintscript: When given, only snippets for this maintscript are returned
          (as a new list). When omitted, the internal list with all snippets is returned.
        """
        if maintscript is not None:
            return [
                snippet
                for snippet in self._maintscripts
                if snippet.maintscript == maintscript
            ]
        return self._maintscripts

467 

468 

class BinaryCtrlAccessorTestProvider(BinaryCtrlAccessorProviderBase):
    """Binary control accessor for tests that keeps everything a plugin registers in memory."""

    __slots__ = ("_maintscript_container",)

    def __init__(
        self,
        plugin_metadata: DebputyPluginMetadata,
        plugin_source_id: str,
        context: PackageProcessingContext,
    ) -> None:
        # The base class is handed a fresh dict, fresh substvars and a `(None, None)` pair.
        # NOTE(review): the meaning of these three arguments is defined by
        # `BinaryCtrlAccessorProviderBase`, which is not visible here.
        super().__init__(
            plugin_metadata,
            plugin_source_id,
            context,
            {},
            FlushableSubstvars(),
            (None, None),
        )
        self._maintscript_container: dict[str, list[RegisteredMaintscript]] = {}

    def _create_maintscript_accessor(self) -> MaintscriptAccessor:
        """Create an accessor that records snippets into this provider's container."""
        return MaintscriptAccessorTestProvider(
            self._plugin_metadata,
            self._plugin_source_id,
            self._maintscript_container,
        )

    def registered_metadata(self) -> RegisteredMetadata:
        """Summarize what was registered under this provider's plugin source ID."""
        own_source_id = self._plugin_source_id
        own_triggers = [
            RegisteredTrigger.from_plugin_provided_trigger(trigger)
            for trigger in self._triggers.values()
            if trigger.provider_source_id == own_source_id
        ]
        own_maintscripts = self._maintscript_container.get(own_source_id, [])
        return RegisteredMetadataImpl(
            self._substvars,
            own_triggers,
            own_maintscripts,
        )

505 

506 

class ServiceRegistryTestImpl(ServiceRegistry[DSD]):
    """Service registry for tests that appends every registration to a caller-provided list."""

    __slots__ = ("_service_manager_details", "_service_definitions")

    def __init__(
        self,
        service_manager_details: ServiceManagerDetails,
        detected_services: list[DetectedService[DSD]],
    ) -> None:
        """
        :param service_manager_details: Details of the service manager this registry is for.
        :param detected_services: List that receives the registrations. It is kept by
          reference, so the caller sees every service registered later.
        """
        self._service_manager_details = service_manager_details
        self._service_definitions = detected_services

    def register_service(
        self,
        path: VirtualPath,
        name: str | list[str],
        *,
        type_of_service: str = "service",  # "timer", etc.
        service_scope: str = "system",
        enable_by_default: bool = True,
        start_by_default: bool = True,
        default_upgrade_rule: ServiceUpgradeRule = "restart",
        service_context: DSD | None = None,
    ) -> None:
        """Record a detected service.

        :param path: Path the service was detected from.
        :param name: A single name or a list of names for the service.
        :raises ValueError: If `name` is an empty list.
        """
        if isinstance(name, list):
            all_names = name
        else:
            all_names = [name]
        if not all_names:
            raise ValueError(
                f"The service must have at least one name - {path.absolute} did not have any"
            )

        detected = DetectedService(
            path,
            all_names,
            type_of_service,
            service_scope,
            enable_by_default,
            start_by_default,
            default_upgrade_rule,
            service_context,
        )
        self._service_definitions.append(detected)

547 

548 

@contextlib.contextmanager
def _read_only_fs_root(fs_root: VirtualPath) -> Iterator[VirtualPath]:
    """Provide `fs_root` in read-only mode for the duration of the `with` block.

    A root that is already read-only is yielded unchanged. A read-write root (which must
    be an `FSRootDir`) is switched to read-only and switched back to read-write when the
    block ends.

    :param fs_root: The file system root to protect.
    :return: Context manager yielding the same `fs_root` object.
    """
    if not fs_root.is_read_write:
        yield fs_root
        return

    assert isinstance(fs_root, FSRootDir)
    fs_root.is_read_write = False
    try:
        yield fs_root
    finally:
        # Restore write access even when the block raises (e.g., a failing detector);
        # otherwise the caller's root would be left read-only for the rest of the test.
        fs_root.is_read_write = True

558 

559 

class InitializedPluginUnderTestImpl(InitializedPluginUnderTest):
    """Test wrapper around a loaded plugin, backed by the feature set it was loaded into."""

    def __init__(
        self,
        plugin_name: str,
        feature_set: PluginProvidedFeatureSet,
        substitution: SubstitutionImpl,
    ) -> None:
        """
        :param plugin_name: Name of the plugin under test; it must be present in
          `feature_set.plugin_data`.
        :param feature_set: The feature set the plugin was loaded into.
        :param substitution: Substitution used as basis for manifest variable tests.
        """
        self._feature_set = feature_set
        self._plugin_name = plugin_name
        # Computed on first use by `packager_provided_files_by_stem`.
        self._packager_provided_files: (
            dict[str, RegisteredPackagerProvidedFile] | None
        ) = None
        self._triggers: dict[tuple[DpkgTriggerType, str], PluginProvidedTrigger] = {}
        self._maintscript_container: dict[str, list[RegisteredMaintscript]] = {}
        self._substitution = substitution
        assert plugin_name in self._feature_set.plugin_data

    @property
    def _plugin_metadata(self) -> DebputyPluginMetadata:
        """Metadata of the plugin under test, looked up in the feature set."""
        return self._feature_set.plugin_data[self._plugin_name]

    def packager_provided_files_by_stem(
        self,
    ) -> Mapping[str, RegisteredPackagerProvidedFile]:
        """Return the packager provided files registered by this plugin, keyed by stem.

        The mapping is computed on the first call and reused afterwards.
        """
        cached = self._packager_provided_files
        if cached is not None:
            return cached

        by_stem: dict[str, RegisteredPackagerProvidedFile] = {}
        for spec in self._feature_set.packager_provided_files.values():
            if spec.debputy_plugin_metadata.plugin_name == self._plugin_name:
                # Registered as a virtual subclass, so this should always be True
                assert isinstance(spec, RegisteredPackagerProvidedFile)
                by_stem[spec.stem] = spec
        self._packager_provided_files = by_stem
        return by_stem

    def run_metadata_detector(
        self,
        metadata_detector_id: str,
        fs_root: VirtualPath,
        context: PackageProcessingContext | None = None,
    ) -> RegisteredMetadata:
        """Run one of the plugin's metadata detectors and return what it registered.

        :param metadata_detector_id: ID of the detector to run.
        :param fs_root: File system root to run against; it is made read-only while the
          detector runs.
        :param context: Package context; defaults to `package_metadata_context()`.
        :raises ValueError: If `fs_root` is not a root, the detector is unknown, or the
          detector does not apply to the package of `context`.
        """
        if fs_root.parent_dir is not None:
            raise ValueError("Provided path must be the file system root.")

        plugin_detectors = self._feature_set.metadata_maintscript_detectors[
            self._plugin_name
        ]
        candidates = [
            candidate
            for candidate in plugin_detectors
            if candidate.detector_id == metadata_detector_id
        ]
        if len(candidates) != 1:
            # More than one match is not expected; only "no match" is reported to the caller.
            assert not candidates
            raise ValueError(
                f"The plugin {self._plugin_name} did not provide a metadata detector with ID"
                f' "{metadata_detector_id}"'
            )

        if context is None:
            context = package_metadata_context()
        (detector,) = candidates
        if not detector.applies_to(context.binary_package):
            raise ValueError(
                f'The detector "{metadata_detector_id}" from {self._plugin_name} does not apply to the'
                " given package. Consider using `package_metadata_context()` to emulate a binary package"
                " with the correct specification. As an example: "
                '`package_metadata_context(package_fields={"Package-Type": "udeb"})` would emulate a udeb'
                " package."
            )

        ctrl = BinaryCtrlAccessorTestProvider(
            self._plugin_metadata,
            metadata_detector_id,
            context,
        )
        with _read_only_fs_root(fs_root) as protected_root:
            detector.run_detector(
                protected_root,
                ctrl,
                context,
            )
        return ctrl.registered_metadata()

    def run_package_processor(
        self,
        package_processor_id: str,
        fs_root: VirtualPath,
        context: PackageProcessingContext | None = None,
    ) -> None:
        """Run one of the plugin's package processors on `fs_root`.

        :param package_processor_id: ID of the package processor to run.
        :param fs_root: File system root to process; it must be read-write.
        :param context: Package context; defaults to `package_metadata_context()`.
        :raises ValueError: If `fs_root` is not a root or is read-only, the processor is
          unknown, or the processor does not apply to the package of `context`.
        """
        if fs_root.parent_dir is not None:
            raise ValueError("Provided path must be the file system root.")

        processor = self._feature_set.all_package_processors.get(
            (self._plugin_name, package_processor_id)
        )
        if processor is None:
            raise ValueError(
                f"The plugin {self._plugin_name} did not provide a package processor with ID"
                f' "{package_processor_id}"'
            )

        if context is None:
            context = package_metadata_context()
        if not fs_root.is_read_write:
            raise ValueError(
                "The provided fs_root is read-only and it must be read-write for package processor"
            )
        if not processor.applies_to(context.binary_package):
            raise ValueError(
                f'The package processor "{package_processor_id}" from {self._plugin_name} does not apply'
                " to the given package. Consider using `package_metadata_context()` to emulate a binary"
                " package with the correct specification. As an example: "
                '`package_metadata_context(package_fields={"Package-Type": "udeb"})` would emulate a udeb'
                " package."
            )

        processor.run_package_processor(
            fs_root,
            None,
            context,
        )

    @property
    def declared_manifest_variables(self) -> frozenset[str]:
        """Names of the manifest variables registered by this plugin."""
        return frozenset(
            name
            for name, variable in self._feature_set.manifest_variables.items()
            if variable.plugin_metadata.plugin_name == self._plugin_name
        )

    def automatic_discard_rules_examples_with_issues(self) -> Sequence[ADRExampleIssue]:
        """Process the examples of this plugin's automatic discard rules and report the
        ones with inconsistent paths.

        :return: One issue per example with inconsistent paths. Each issue holds the rule
          name, the index of the example and the affected paths (directories are listed
          with a trailing `/`).
        """
        found_issues = []
        for rule in self._feature_set.auto_discard_rules.values():
            if rule.plugin_metadata.plugin_name != self._plugin_name:
                continue
            for example_index, example in enumerate(rule.examples):
                outcome = process_discard_rule_example(
                    rule,
                    example,
                )
                if not outcome.inconsistent_paths:
                    continue
                affected_paths = [
                    p.absolute + ("/" if p.is_dir else "")
                    for p in outcome.inconsistent_paths
                ]
                found_issues.append(
                    ADRExampleIssue(
                        rule.name,
                        example_index,
                        affected_paths,
                    )
                )
        return found_issues

    def run_service_detection_and_integrations(
        self,
        service_manager: str,
        fs_root: VirtualPath,
        context: PackageProcessingContext | None = None,
        *,
        service_context_type_hint: type[DSD] | None = None,
    ) -> tuple[list[DetectedService[DSD]], RegisteredMetadata]:
        """Run the detector of a service manager and, if it found services, its integrator.

        :param service_manager: Name of a service manager provided by this plugin.
        :param fs_root: File system root to run the detection against.
        :param context: Package context; defaults to `package_metadata_context()`.
        :param service_context_type_hint: Not used at run-time by this implementation.
        :return: The detected services plus the metadata registered by the integrator.
        :raises ValueError: If `fs_root` is not a root or this plugin does not provide
          `service_manager`.
        """
        if fs_root.parent_dir is not None:
            raise ValueError("Provided path must be the file system root.")

        try:
            manager_details = self._feature_set.service_managers[service_manager]
            if manager_details.plugin_metadata.plugin_name != self._plugin_name:
                # A manager owned by another plugin is reported like an unknown manager.
                raise KeyError(service_manager)
        except KeyError:
            raise ValueError(
                f"The plugin {self._plugin_name} does not provide a"
                f" service manager called {service_manager}"
            ) from None

        if context is None:
            context = package_metadata_context()

        # The registry appends to this list while the detector runs.
        detected_services: list[DetectedService[DSD]] = []
        registry = ServiceRegistryTestImpl(manager_details, detected_services)
        manager_details.service_detector(
            fs_root,
            registry,
            context,
        )

        ctrl = BinaryCtrlAccessorTestProvider(
            self._plugin_metadata,
            manager_details.service_manager,
            context,
        )
        if detected_services:
            # The first registered name is also passed on its own as the first argument.
            definitions = [
                ServiceDefinitionImpl(
                    detected.names[0],
                    detected.names,
                    detected.path,
                    detected.type_of_service,
                    detected.service_scope,
                    detected.enable_by_default,
                    detected.start_by_default,
                    detected.default_upgrade_rule,
                    self._plugin_name,
                    True,
                    detected.service_context,
                )
                for detected in detected_services
            ]
            manager_details.service_integrator(
                definitions,
                ctrl,
                context,
            )
        return detected_services, ctrl.registered_metadata()

    def manifest_variables(
        self,
        *,
        resolution_context: VariableContext | None = None,
        mocked_variables: Mapping[str, str] | None = None,
    ) -> Mapping[str, str]:
        """Return a mapping that resolves this plugin's manifest variables on lookup.

        :param resolution_context: Context used for the resolution; defaults to
          `manifest_variable_resolution_context()`.
        :param mocked_variables: Extra substitutions made available during resolution.
        :return: A mapping whose keys are the manifest variables registered by this plugin.
        """
        own_variable_names = frozenset(
            name
            for name, variable in self._feature_set.manifest_variables.items()
            if variable.plugin_metadata.plugin_name == self._plugin_name
        )
        if resolution_context is None:
            resolution_context = manifest_variable_resolution_context()

        test_substitution = self._substitution.copy_for_subst_test(
            self._feature_set,
            resolution_context,
            extra_substitutions=mocked_variables,
        )
        return SubstitutionTable(
            own_variable_names,
            test_substitution,
        )

792 

793 

class SubstitutionTable(Mapping[str, str]):
    """Read-only mapping over a fixed set of manifest variables, resolved on each lookup."""

    def __init__(
        self, valid_manifest_variables: frozenset[str], substitution: "Substitution"
    ) -> None:
        """
        :param valid_manifest_variables: The variable names this table exposes as keys.
        :param substitution: Object whose `substitute` method resolves the variables.
        """
        self._valid_manifest_variables = valid_manifest_variables
        # Names that have been looked up successfully so far.
        self._resolved: set[str] = set()
        self._substitution = substitution

    def __contains__(self, item: object) -> bool:
        return item in self._valid_manifest_variables

    def __getitem__(self, key: str) -> str:
        """Resolve `key` by substituting `{{key}}`; unknown names raise `KeyError`."""
        if key in self._valid_manifest_variables:
            resolved_value = self._substitution.substitute(
                "{{" + key + "}}", f"test of manifest variable `{key}`"
            )
            self._resolved.add(key)
            return resolved_value
        raise KeyError(key)

    def __len__(self) -> int:
        return len(self._valid_manifest_variables)

    def __iter__(self) -> Iterator[str]:
        return iter(self._valid_manifest_variables)

    def keys(self) -> KeysView[str]:
        # The frozenset of names is handed out directly; the cast only affects static typing.
        return cast("KeysView[str]", self._valid_manifest_variables)