Coverage for src/debputy/highlevel_manifest.py: 65%

900 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2026-02-28 21:56 +0000

1import dataclasses 

2import functools 

3import os 

4import textwrap 

5import typing 

6from contextlib import suppress 

7from dataclasses import dataclass, field 

8from typing import ( 

9 Any, 

10 TypeVar, 

11 Generic, 

12 cast, 

13) 

14from collections.abc import Iterable, Mapping, Sequence, Callable 

15 

16from debian.debian_support import DpkgArchTable 

17 

18from debputy.dh.debhelper_emulation import ( 

19 dhe_dbgsym_root_dir, 

20 assert_no_dbgsym_migration, 

21 read_dbgsym_file, 

22) 

23from ._deb_options_profiles import DebBuildOptionsAndProfiles 

24from ._manifest_constants import * 

25from .architecture_support import DpkgArchitectureBuildProcessValuesTable 

26from .builtin_manifest_rules import builtin_mode_normalization_rules 

27from .exceptions import ( 

28 DebputySubstitutionError, 

29 DebputyRuntimeErrorWithPreamble, 

30) 

31from .filesystem_scan import ( 

32 InMemoryVirtualRootDir, 

33 OSFSROOverlay, 

34 FSControlRootDir, 

35 InMemoryVirtualPathBase, 

36) 

37from .installations import ( 

38 InstallRule, 

39 SourcePathMatcher, 

40 PathAlreadyInstalledOrDiscardedError, 

41 NoMatchForInstallPatternError, 

42 InstallRuleContext, 

43 BinaryPackageInstallRuleContext, 

44 InstallSearchDirContext, 

45 SearchDir, 

46) 

47from .intermediate_manifest import TarMember, PathType, IntermediateManifest 

48from .maintscript_snippet import ( 

49 DpkgMaintscriptHelperCommand, 

50 MaintscriptSnippetContainer, 

51) 

52from .manifest_conditions import ConditionContext 

53from .manifest_parser.base_types import ( 

54 FileSystemMatchRule, 

55 FileSystemExactMatchRule, 

56 BuildEnvironments, 

57) 

58from .manifest_parser.util import AttributePath 

59from .packager_provided_files import PackagerProvidedFile 

60from .packages import BinaryPackage, SourcePackage 

61from .plugin.api.feature_set import PluginProvidedFeatureSet 

62from .plugin.api.impl import BinaryCtrlAccessorProviderCreator 

63from .plugin.api.impl_types import ( 

64 PackageProcessingContextProvider, 

65 PackageDataTable, 

66) 

67from .plugin.api.spec import ( 

68 FlushableSubstvars, 

69 VirtualPath, 

70 DebputyIntegrationMode, 

71 INTEGRATION_MODE_DH_DEBPUTY_RRR, 

72 INTEGRATION_MODE_FULL, 

73) 

74from debputy.plugins.debputy.binary_package_rules import ServiceRule 

75from debputy.plugins.debputy.build_system_rules import BuildRule 

76from .plugin.plugin_state import run_in_context_of_plugin 

77from .substitution import Substitution 

78from .transformation_rules import ( 

79 TransformationRule, 

80 ModeNormalizationTransformationRule, 

81 NormalizeShebangLineTransformation, 

82) 

83from .util import ( 

84 _error, 

85 _warn, 

86 debian_policy_normalize_symlink_target, 

87 generated_content_dir, 

88 _info, 

89) 

90from .yaml import MANIFEST_YAML 

91from .yaml.compat import CommentedMap, CommentedSeq 

92 

93 

def tar_path(p: VirtualPath) -> str:
    """Return the name to use for ``p`` as a tar member.

    Directories get a trailing ``/``; any other path is returned unchanged.
    """
    name = p.path
    return f"{name}/" if p.is_dir else name

99 

100 

def tar_owner_info(path: InMemoryVirtualPathBase) -> tuple[str, int, str, int]:
    """Return ``(owner name, owner id, group name, group id)`` for ``path``.

    The values are read from the private ``_owner`` and ``_group`` attributes
    of the path object.
    """
    user, grp = path._owner, path._group  # noqa
    return user.entity_name, user.entity_id, grp.entity_name, grp.entity_id

110 

111 

class PathNotCoveredByInstallRulesError(DebputyRuntimeErrorWithPreamble):
    """Error for paths that no install rule installed or discarded.

    The accessors read the exception's positional arguments: index 1 holds
    the unmatched paths and index 2 the search directory.
    """

    @property
    def unmatched_paths(self) -> Sequence[VirtualPath]:
        """The paths that were left over (second positional argument)."""
        return self.args[1]

    @property
    def search_dir(self) -> VirtualPath:
        """The directory the paths were found in (third positional argument)."""
        return self.args[2]

    def render_preamble(self) -> None:
        """Emit a warning block listing every unmatched path."""
        origin = self.search_dir.fs_path
        _warn(
            f"The following paths were present in {origin}, but not installed (nor explicitly discarded)."
        )
        _warn("")
        # ``map`` is lazy, so each path is described right before it is reported.
        for description in map(_describe_missing_path, self.unmatched_paths):
            _warn(f" * {description}")
        _warn("")

131 

132 

133@dataclass(slots=True) 

134class DbgsymInfo: 

135 binary_package: BinaryPackage 

136 dbgsym_fs_root: InMemoryVirtualPathBase 

137 _dbgsym_root_fs: str | None 

138 dbgsym_ids: list[str] 

139 run_dwz: bool 

140 

141 @property 

142 def dbgsym_root_dir(self) -> str: 

143 root_dir = self._dbgsym_root_fs 

144 if root_dir is None: 

145 root_dir = generated_content_dir( 

146 package=self.binary_package, 

147 subdir_key="dbgsym-fs-root", 

148 ) 

149 self._dbgsym_root_fs = root_dir 

150 return root_dir 

151 

152 @property 

153 def dbgsym_ctrl_dir(self) -> FSControlRootDir: 

154 return FSControlRootDir.create_root_dir( 

155 os.path.join(self.dbgsym_root_dir, "DEBIAN") 

156 ) 

157 

158 

159@dataclass(slots=True, frozen=True) 

160class BinaryPackageData: 

161 source_package: SourcePackage 

162 binary_package: BinaryPackage 

163 binary_staging_root_dir: str 

164 fs_root: InMemoryVirtualPathBase 

165 substvars: FlushableSubstvars 

166 package_metadata_context: PackageProcessingContextProvider 

167 ctrl_creator: BinaryCtrlAccessorProviderCreator 

168 dbgsym_info: DbgsymInfo 

169 

170 @property 

171 def control_output_dir(self) -> FSControlRootDir: 

172 return FSControlRootDir.create_root_dir( 

173 generated_content_dir( 

174 package=self.binary_package, 

175 subdir_key="DEBIAN", 

176 ) 

177 ) 

178 

179 

180@dataclass(slots=True) 

181class PackageTransformationDefinition: 

182 binary_package: BinaryPackage 

183 substitution: Substitution 

184 is_auto_generated_package: bool 

185 binary_version: str | None = None 

186 search_dirs: list[FileSystemExactMatchRule] | None = None 

187 dpkg_maintscript_helper_snippets: list[DpkgMaintscriptHelperCommand] = field( 

188 default_factory=list 

189 ) 

190 maintscript_snippets: dict[str, MaintscriptSnippetContainer] = field( 

191 default_factory=dict 

192 ) 

193 transformations: list[TransformationRule] = field(default_factory=list) 

194 reserved_packager_provided_files: dict[str, list[PackagerProvidedFile]] = field( 

195 default_factory=dict 

196 ) 

197 install_rules: list[InstallRule] = field(default_factory=list) 

198 requested_service_rules: list[ServiceRule] = field(default_factory=list) 

199 

200 

def _path_to_tar_member(
    path: InMemoryVirtualPathBase,
    clamp_mtime_to: int,
) -> TarMember:
    """Convert ``path`` into a ``TarMember``.

    The mtime starts at ``clamp_mtime_to`` and is lowered to the path's own
    mtime when the path is backed by a file system path.  Symlinks always
    become virtual members with a policy-normalized target; directories and
    files become virtual members unless they have a file system path.

    :raises AssertionError: if the path is not a directory, file or symlink.
    """
    mtime = float(clamp_mtime_to)
    owner, uid, group, gid = tar_owner_info(path)
    ownership = {"owner": owner, "uid": uid, "group": group, "gid": gid}
    mode = path.mode

    if path.has_fs_path:
        mtime = min(mtime, path.mtime)

    if path.is_dir:
        path_type = PathType.DIRECTORY
    elif path.is_file:
        # TODO: someday we will need to deal with hardlinks and it might appear here.
        path_type = PathType.FILE
    else:
        if not path.is_symlink:
            raise AssertionError(
                f"Unsupported file type: {path.path} - not a file, dir nor a symlink!"
            )
        # Symlinks are resolved right away since the target must be normalized anyway.
        link_target = debian_policy_normalize_symlink_target(
            path.path,
            path.readlink(),
        )
        return TarMember.virtual_path(
            tar_path(path),
            PathType.SYMLINK,
            mtime,
            link_target=link_target,
            # Force mode to be 0777 as that is the mode we see in the data.tar. In theory, tar lets you set
            # it to whatever. However, for reproducibility, we have to be well-behaved - and that is 0777.
            mode=0o0777,
            **ownership,
        )

    if path.has_fs_path:
        return TarMember.from_file(
            tar_path(path),
            path.fs_path,
            mode=mode,
            path_type=path_type,
            path_mtime=mtime,
            clamp_mtime_to=clamp_mtime_to,
            may_steal_fs_path=getattr(path, "_can_replace_inline", False),
            **ownership,
        )

    # Without a file system path, only a directory can reach this point.
    assert not path.is_file
    return TarMember.virtual_path(
        tar_path(path),
        path_type,
        mtime,
        mode=mode,
        **ownership,
    )

268 

269 

def _generate_intermediate_manifest(
    fs_root: InMemoryVirtualPathBase,
    clamp_mtime_to: int,
) -> Iterable[TarMember]:
    """Yield a ``TarMember`` for every path below ``fs_root``.

    Non-symlink members are yielded in ``fs_root.all_paths()`` order.  Symlink
    members are held back and yielded afterwards, in the order encountered.
    """
    deferred_symlinks = []
    for entry in fs_root.all_paths():
        member = _path_to_tar_member(entry, clamp_mtime_to)
        if member.path_type != PathType.SYMLINK:
            yield member
        else:
            deferred_symlinks.append(member)
    yield from deferred_symlinks

282 

283 

284ST = TypeVar("ST") 

285T = TypeVar("T") 

286 

287 

288class AbstractYAMLSubStore(Generic[ST]): 

289 def __init__( 

290 self, 

291 parent_store: Any, 

292 parent_key: int | str | None, 

293 store: ST | None = None, 

294 ) -> None: 

295 if parent_store is not None and parent_key is not None: 

296 try: 

297 from_parent_store = parent_store[parent_key] 

298 except (KeyError, IndexError): 

299 from_parent_store = None 

300 if ( 300 ↛ 305line 300 didn't jump to line 305 because the condition on line 300 was never true

301 store is not None 

302 and from_parent_store is not None 

303 and store is not parent_store 

304 ): 

305 raise ValueError( 

306 "Store is provided but is not the one already in the parent store" 

307 ) 

308 if store is None: 308 ↛ 310line 308 didn't jump to line 310 because the condition on line 308 was always true

309 store = from_parent_store 

310 self._parent_store = parent_store 

311 self._parent_key = parent_key 

312 self._is_detached = ( 

313 parent_key is None or parent_store is None or parent_key not in parent_store 

314 ) 

315 assert self._is_detached or store is not None 

316 if store is None: 

317 store = self._create_new_instance() 

318 self._store: ST = store 

319 

320 def _create_new_instance(self) -> ST: 

321 raise NotImplementedError 

322 

323 def create_definition_if_missing(self) -> None: 

324 if self._is_detached: 

325 self.create_definition() 

326 

327 def create_definition(self) -> None: 

328 if not self._is_detached: 328 ↛ 329line 328 didn't jump to line 329 because the condition on line 328 was never true

329 raise RuntimeError("Definition is already present") 

330 parent_store = self._parent_store 

331 if parent_store is None: 331 ↛ 332line 331 didn't jump to line 332 because the condition on line 331 was never true

332 raise RuntimeError( 

333 f"Definition is not attached to any parent!? ({self.__class__.__name__})" 

334 ) 

335 if isinstance(parent_store, list): 

336 assert self._parent_key is None 

337 self._parent_key = len(parent_store) 

338 self._parent_store.append(self._store) 

339 else: 

340 parent_store[self._parent_key] = self._store 

341 self._is_detached = False 

342 

343 def remove_definition(self) -> None: 

344 self._ensure_attached() 

345 del self._parent_store[self._parent_key] 

346 if isinstance(self._parent_store, list): 

347 self._parent_key = None 

348 self._is_detached = True 

349 

350 def _ensure_attached(self) -> None: 

351 if self._is_detached: 

352 raise RuntimeError("The definition has been removed!") 

353 

354 

class AbstractYAMLListSubStore(Generic[T], AbstractYAMLSubStore[list[T]]):
    """Sub-store whose backing container is a sequence."""

    def _create_new_instance(self) -> list[T]:
        """Return a new, empty ``CommentedSeq``."""
        empty_seq = CommentedSeq()
        return empty_seq

358 

359 

class AbstractYAMLDictSubStore(Generic[T], AbstractYAMLSubStore[dict[str, T]]):
    """Sub-store whose backing container is a mapping."""

    def _create_new_instance(self) -> dict[str, T]:
        """Return a new, empty ``CommentedMap``."""
        empty_map = CommentedMap()
        return empty_map

363 

364 

class MutableCondition:
    """Factories that build single-key condition mappings."""

    @classmethod
    def arch_matches(cls, arch_filter: str) -> CommentedMap:
        """Return a mapping with ``arch_filter`` under the arch-matches key."""
        payload = {MK_CONDITION_ARCH_MATCHES: arch_filter}
        return CommentedMap(payload)

    @classmethod
    def build_profiles_matches(cls, build_profiles_matches: str) -> CommentedMap:
        """Return a mapping with the given value under the build-profiles-matches key."""
        payload = {MK_CONDITION_BUILD_PROFILES_MATCHES: build_profiles_matches}
        return CommentedMap(payload)

375 

376 

class MutableYAMLSymlink(AbstractYAMLDictSubStore[Any]):
    """Mutable view of one create-symlink transformation entry."""

    @classmethod
    def new_symlink(
        cls, link_path: str, link_target: str, condition: Any | None
    ) -> "MutableYAMLSymlink":
        """Create a detached symlink entry.

        :param link_path: Value stored under the link-path key.
        :param link_target: Value stored under the link-target key.
        :param condition: Optional condition stored under ``"when"``; the key
          is omitted when this is ``None``.
        """
        details = {
            MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH: link_path,
            MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET: link_target,
        }
        if condition is not None:
            details["when"] = condition
        wrapper = {MK_TRANSFORMATIONS_CREATE_SYMLINK: details}
        return cls(None, None, store=CommentedMap(wrapper))

    @property
    def _symlink_details(self) -> Any:
        """The inner mapping holding the link path and link target."""
        return self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK]

    @property
    def symlink_path(self) -> str:
        """The link path of the symlink."""
        return self._symlink_details[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH]

    @symlink_path.setter
    def symlink_path(self, path: str) -> None:
        self._symlink_details[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH] = path

    @property
    def symlink_target(self) -> str | None:
        """The link target of the symlink."""
        return self._symlink_details[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET]

    @symlink_target.setter
    def symlink_target(self, target: str) -> None:
        self._symlink_details[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET] = target

414 

415 

class MutableYAMLConffileManagementItem(AbstractYAMLDictSubStore[Any]):
    """Mutable view of one conffile-management entry (a remove or a rename).

    The store is a single-key mapping: the key is the command and the value
    is the mapping ("container") holding that command's attributes.
    """

    @classmethod
    def _new_item(
        cls,
        command: str,
        attributes: dict[str, Any],
        prior_to_version: str | None,
        owning_package: str | None,
    ) -> "MutableYAMLConffileManagementItem":
        """Build a detached item for ``command`` and apply the optional attributes."""
        item = cls(
            None,
            None,
            store=CommentedMap({command: CommentedMap(attributes)}),
        )
        item.prior_to_version = prior_to_version
        item.owning_package = owning_package
        return item

    @classmethod
    def rm_conffile(
        cls,
        conffile: str,
        prior_to_version: str | None,
        owning_package: str | None,
    ) -> "MutableYAMLConffileManagementItem":
        """Create a detached "remove" entry for ``conffile``.

        ``prior_to_version`` and ``owning_package`` are only stored when they
        are not ``None``.
        """
        return cls._new_item(
            MK_CONFFILE_MANAGEMENT_REMOVE,
            {MK_CONFFILE_MANAGEMENT_REMOVE_PATH: conffile},
            prior_to_version,
            owning_package,
        )

    @classmethod
    def mv_conffile(
        cls,
        old_conffile: str,
        new_conffile: str,
        prior_to_version: str | None,
        owning_package: str | None,
    ) -> "MutableYAMLConffileManagementItem":
        """Create a detached "rename" entry from ``old_conffile`` to ``new_conffile``.

        ``prior_to_version`` and ``owning_package`` are only stored when they
        are not ``None``.
        """
        return cls._new_item(
            MK_CONFFILE_MANAGEMENT_RENAME,
            {
                MK_CONFFILE_MANAGEMENT_RENAME_SOURCE: old_conffile,
                MK_CONFFILE_MANAGEMENT_RENAME_TARGET: new_conffile,
            },
            prior_to_version,
            owning_package,
        )

    @property
    def _container(self) -> dict[str, Any]:
        """The attribute mapping stored under the single command key."""
        assert len(self._store) == 1
        return next(iter(self._store.values()))

    @property
    def command(self) -> str:
        """The single key of the store, i.e. the command of this entry."""
        assert len(self._store) == 1
        return next(iter(self._store))

    def _set_or_clear(self, key: str, value: str | None) -> None:
        """Store ``value`` under ``key``; ``None`` removes the key if present."""
        if value is None:
            with suppress(KeyError):
                del self._container[key]
        else:
            self._container[key] = value

    @property
    def obsolete_conffile(self) -> str:
        """The removed path (remove command) or the rename source (rename command)."""
        if self.command == MK_CONFFILE_MANAGEMENT_REMOVE:
            return self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH]
        assert self.command == MK_CONFFILE_MANAGEMENT_RENAME
        return self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE]

    @obsolete_conffile.setter
    def obsolete_conffile(self, value: str) -> None:
        if self.command == MK_CONFFILE_MANAGEMENT_REMOVE:
            self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH] = value
        else:
            assert self.command == MK_CONFFILE_MANAGEMENT_RENAME
            self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE] = value

    def _require_rename(self) -> None:
        """Raise ``TypeError`` unless this entry is a rename command."""
        if self.command != MK_CONFFILE_MANAGEMENT_RENAME:
            raise TypeError(
                f"The new_conffile attribute is only applicable to command {MK_CONFFILE_MANAGEMENT_RENAME}."
                f" This is a {self.command}"
            )

    @property
    def new_conffile(self) -> str:
        """The rename target.

        :raises TypeError: if this entry is not a rename command.
        """
        self._require_rename()
        return self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET]

    @new_conffile.setter
    def new_conffile(self, value: str) -> None:
        self._require_rename()
        self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET] = value

    @property
    def prior_to_version(self) -> str | None:
        """The prior-to-version attribute, or ``None`` when it is not set."""
        return self._container.get(MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION)

    @prior_to_version.setter
    def prior_to_version(self, value: str | None) -> None:
        self._set_or_clear(MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION, value)

    @property
    def owning_package(self) -> str | None:
        """The owning-package attribute, or ``None`` when it is not set."""
        # Read the same key the setter writes; the attribute is optional, so
        # use ``get`` instead of indexing.
        return self._container.get(MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE)

    @owning_package.setter
    def owning_package(self, value: str | None) -> None:
        self._set_or_clear(MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE, value)

535 

536 

class MutableYAMLPackageDefinition(AbstractYAMLDictSubStore):
    """Mutable view of one package definition mapping."""

    def _list_store(
        self, key, *, create_if_absent: bool = False
    ) -> list[dict[str, Any]] | None:
        """Return the list stored under ``key``.

        :param key: Key of the list inside this definition.
        :param create_if_absent: When true, a missing list (and, if needed,
          this definition itself) is created.  When false, a missing list
          yields ``None`` and nothing is modified.
        """
        if self._is_detached or key not in self._store:
            # Only create containers when the caller asked for it; read-only
            # callers must not alter the document.
            if not create_if_absent:
                return None
            self.create_definition_if_missing()
            self._store[key] = []
        return self._store[key]

    def _insert_item(self, key: str, item: AbstractYAMLDictSubStore) -> None:
        """Attach the detached ``item`` to the list under ``key``, creating the list if needed.

        :raises RuntimeError: if ``item`` is attached already or belongs to
          a different container.
        """
        parent_store = self._list_store(key, create_if_absent=True)
        assert parent_store is not None
        if not item._is_detached or (
            item._parent_store is not None and item._parent_store is not parent_store
        ):
            raise RuntimeError(
                "Item is already attached or associated with a different container"
            )
        item._parent_store = parent_store
        item.create_definition()

    def add_symlink(self, symlink: MutableYAMLSymlink) -> None:
        """Append ``symlink`` to the transformations of this package."""
        self._insert_item(MK_TRANSFORMATIONS, symlink)

    def symlinks(self) -> Iterable[MutableYAMLSymlink]:
        """Yield a view for every create-symlink entry in the transformations list."""
        store = self._list_store(MK_TRANSFORMATIONS)
        if store is None:
            return
        for index, entry in enumerate(store):
            # Match on the same key ``MutableYAMLSymlink`` reads and writes.
            if (
                isinstance(entry, dict)
                and len(entry) == 1
                and MK_TRANSFORMATIONS_CREATE_SYMLINK in entry
            ):
                yield MutableYAMLSymlink(store, index)

    def conffile_management_items(self) -> Iterable[MutableYAMLConffileManagementItem]:
        """Yield a view for every entry in the conffile-management list."""
        store = self._list_store(MK_CONFFILE_MANAGEMENT)
        if store is None:
            return
        for index in range(len(store)):
            yield MutableYAMLConffileManagementItem(store, index)

    def add_conffile_management(
        self, conffile_management_item: MutableYAMLConffileManagementItem
    ) -> None:
        """Append ``conffile_management_item`` to the conffile-management list."""
        self._insert_item(MK_CONFFILE_MANAGEMENT, conffile_management_item)

584 

585 

class AbstractMutableYAMLInstallRule(AbstractYAMLDictSubStore):
    """Base class for mutable views of one installation rule.

    The store is a single-key mapping: the key names the rule and the value
    (the "container") holds the rule's attributes.  The class methods are
    factories returning detached rules.
    """

    @property
    def _container(self) -> dict[str, Any]:
        """The value stored under the single rule key."""
        assert len(self._store) == 1
        return next(iter(self._store.values()))

    @staticmethod
    def _sources_key(sources: str | list[str]) -> str:
        """Return the singular source key for a string and the plural key otherwise."""
        if isinstance(sources, str):
            return MK_INSTALLATIONS_INSTALL_SOURCE
        return MK_INSTALLATIONS_INSTALL_SOURCES

    @property
    def into(self) -> list[str] | None:
        """The "into" attribute as a list, or ``None`` when it is not set."""
        # The setter removes the key for ``None``, so the key may be absent.
        v = self._container.get(MK_INSTALLATIONS_INSTALL_INTO)
        if v is None:
            return None
        if isinstance(v, str):
            return [v]
        return v

    @into.setter
    def into(self, new_value: str | list[str] | None) -> None:
        if new_value is None:
            with suppress(KeyError):
                del self._container[MK_INSTALLATIONS_INSTALL_INTO]
            return
        if isinstance(new_value, str):
            self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_value
            return
        self._container[MK_INSTALLATIONS_INSTALL_INTO] = CommentedSeq(new_value)

    @property
    def when(self) -> str | Mapping[str, Any] | None:
        """The condition of the rule, or ``None`` when it is not set."""
        # The setter removes the key for ``None``, so the key may be absent.
        return self._container.get(MK_CONDITION_WHEN)

    @when.setter
    def when(self, new_value: str | Mapping[str, Any] | None) -> None:
        if new_value is None:
            with suppress(KeyError):
                del self._container[MK_CONDITION_WHEN]
            return
        if isinstance(new_value, str):
            self._container[MK_CONDITION_WHEN] = new_value
            return
        self._container[MK_CONDITION_WHEN] = CommentedMap(new_value)

    @classmethod
    def install_dest(
        cls,
        sources: str | list[str],
        into: str | list[str] | None,
        *,
        dest_dir: str | None = None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Create an install rule for ``sources`` with an optional ``dest_dir``."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=CommentedMap(
                {
                    MK_INSTALLATIONS_INSTALL: CommentedMap(
                        {cls._sources_key(sources): sources}
                    )
                }
            ),
        )
        r.dest_dir = dest_dir
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def multi_dest_install(
        cls,
        sources: str | list[str],
        dest_dirs: Sequence[str],
        into: str | list[str] | None,
        *,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Create a multi-dest install rule placing ``sources`` into each of ``dest_dirs``."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=CommentedMap(
                {
                    MK_INSTALLATIONS_MULTI_DEST_INSTALL: CommentedMap(
                        {
                            cls._sources_key(sources): sources,
                            "dest-dirs": dest_dirs,
                        }
                    )
                }
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_as(
        cls,
        source: str,
        install_as: str,
        into: str | list[str] | None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Create an install rule that installs ``source`` as ``install_as``."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=CommentedMap(
                {
                    MK_INSTALLATIONS_INSTALL: CommentedMap(
                        {
                            MK_INSTALLATIONS_INSTALL_SOURCE: source,
                            MK_INSTALLATIONS_INSTALL_AS: install_as,
                        }
                    )
                }
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_doc_as(
        cls,
        source: str,
        install_as: str,
        into: str | list[str] | None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Create an install-docs rule that installs ``source`` as ``install_as``."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=CommentedMap(
                {
                    MK_INSTALLATIONS_INSTALL_DOCS: CommentedMap(
                        {
                            MK_INSTALLATIONS_INSTALL_SOURCE: source,
                            MK_INSTALLATIONS_INSTALL_AS: install_as,
                        }
                    )
                }
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_docs(
        cls,
        sources: str | list[str],
        into: str | list[str] | None,
        *,
        dest_dir: str | None = None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Create an install-docs rule for ``sources`` with an optional ``dest_dir``."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=CommentedMap(
                {
                    MK_INSTALLATIONS_INSTALL_DOCS: CommentedMap(
                        {cls._sources_key(sources): sources}
                    )
                }
            ),
        )
        r.into = into
        r.dest_dir = dest_dir
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_examples(
        cls,
        sources: str | list[str],
        into: str | list[str] | None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstallExamples":
        """Create an install-examples rule for ``sources``."""
        r = MutableYAMLInstallRuleInstallExamples(
            None,
            None,
            store=CommentedMap(
                {
                    MK_INSTALLATIONS_INSTALL_EXAMPLES: CommentedMap(
                        {cls._sources_key(sources): sources}
                    )
                }
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_man(
        cls,
        sources: str | list[str],
        into: str | list[str] | None,
        language: str | None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleMan":
        """Create an install-man rule for ``sources`` with an optional ``language``."""
        r = MutableYAMLInstallRuleMan(
            None,
            None,
            store=CommentedMap(
                {
                    MK_INSTALLATIONS_INSTALL_MAN: CommentedMap(
                        {cls._sources_key(sources): sources}
                    )
                }
            ),
        )
        r.language = language
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def discard(
        cls,
        sources: str | list[str],
    ) -> "MutableYAMLInstallRuleDiscard":
        """Create a discard rule; ``sources`` is stored directly under the discard key."""
        return MutableYAMLInstallRuleDiscard(
            None,
            None,
            store=CommentedMap({MK_INSTALLATIONS_DISCARD: sources}),
        )

844 

845 

class MutableYAMLInstallRuleInstallExamples(AbstractMutableYAMLInstallRule):
    """Install-examples rule; adds nothing beyond the base install rule."""

848 

849 

class MutableYAMLInstallRuleMan(AbstractMutableYAMLInstallRule):
    """Install-man rule with an optional language attribute."""

    @property
    def language(self) -> str | None:
        """The language attribute, or ``None`` when it is not set."""
        # The setter removes the key for ``None``, so the key may be absent.
        return self._container.get(MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE)

    @language.setter
    def language(self, new_value: str | None) -> None:
        if new_value is not None:
            self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE] = new_value
            return
        with suppress(KeyError):
            del self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE]

862 

863 

class MutableYAMLInstallRuleDiscard(AbstractMutableYAMLInstallRule):
    """Discard rule; adds nothing beyond the base install rule."""

866 

867 

class MutableYAMLInstallRuleInstall(AbstractMutableYAMLInstallRule):
    """Install rule with sources and either a destination dir or an "as" name.

    The factories on the base class store a single string under the singular
    source key and a list under the plural key; the accessors here follow
    the same convention.
    """

    def _raw_sources(self) -> str | list[str]:
        """Return the stored sources from the plural key, else the singular key.

        :raises KeyError: if neither key is present.
        """
        container = self._container
        try:
            return container[MK_INSTALLATIONS_INSTALL_SOURCES]
        except KeyError:
            return container[MK_INSTALLATIONS_INSTALL_SOURCE]

    @property
    def sources(self) -> list[str]:
        """The sources of the rule, always as a list."""
        v = self._raw_sources()
        if isinstance(v, str):
            return [v]
        return v

    @sources.setter
    def sources(self, new_value: str | list[str]) -> None:
        container = self._container
        if isinstance(new_value, str):
            keep, drop = MK_INSTALLATIONS_INSTALL_SOURCE, MK_INSTALLATIONS_INSTALL_SOURCES
            stored: Any = new_value
        else:
            keep, drop = MK_INSTALLATIONS_INSTALL_SOURCES, MK_INSTALLATIONS_INSTALL_SOURCE
            stored = CommentedSeq(new_value)
        # Never leave both spellings behind; they would contradict each other.
        with suppress(KeyError):
            del container[drop]
        container[keep] = stored

    @property
    def dest_dir(self) -> str | None:
        """The destination directory, or ``None`` when it is not set."""
        return self._container.get(MK_INSTALLATIONS_INSTALL_DEST_DIR)

    @dest_dir.setter
    def dest_dir(self, new_value: str | None) -> None:
        """Set or (with ``None``) remove the destination directory.

        :raises ValueError: if a value is given while ``dest_as`` is set.
        """
        if new_value is not None and self.dest_as is not None:
            raise ValueError(
                f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and'
                f' "{MK_INSTALLATIONS_INSTALL_AS}"'
            )
        if new_value is not None:
            self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR] = new_value
        else:
            with suppress(KeyError):
                del self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR]

    @property
    def dest_as(self) -> str | None:
        """The "as" name, or ``None`` when it is not set."""
        return self._container.get(MK_INSTALLATIONS_INSTALL_AS)

    @dest_as.setter
    def dest_as(self, new_value: str | None) -> None:
        """Set or (with ``None``) remove the "as" name.

        A one-item source list is collapsed into a single source first.

        :raises ValueError: if ``dest_dir`` is set, or the sources are a list
          that does not contain exactly one item.
        """
        if new_value is not None:
            if self.dest_dir is not None:
                raise ValueError(
                    f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and'
                    f' "{MK_INSTALLATIONS_INSTALL_AS}"'
                )

            sources = self._raw_sources()
            if isinstance(sources, list):
                if len(sources) != 1:
                    raise ValueError(
                        f'Cannot have "{MK_INSTALLATIONS_INSTALL_AS}" when'
                        f' "{MK_INSTALLATIONS_INSTALL_SOURCES}" is not exactly one item'
                    )
                self.sources = sources[0]
            self._container[MK_INSTALLATIONS_INSTALL_AS] = new_value
        else:
            with suppress(KeyError):
                del self._container[MK_INSTALLATIONS_INSTALL_AS]

926 

927 

class MutableYAMLInstallationsDefinition(AbstractYAMLListSubStore[Any]):
    """Mutable view of the list of installation rules."""

    def append(self, install_rule: AbstractMutableYAMLInstallRule) -> None:
        """Attach the detached ``install_rule`` at the end of the list.

        The list itself is attached to its parent first if needed.

        :raises RuntimeError: if the rule is attached already or belongs to
          a different container.
        """
        target = self._store
        foreign_parent = (
            install_rule._parent_store is not None
            and install_rule._parent_store is not target
        )
        if foreign_parent or not install_rule._is_detached:
            raise RuntimeError(
                "Item is already attached or associated with a different container"
            )
        self.create_definition_if_missing()
        install_rule._parent_store = target
        install_rule.create_definition()

    def extend(self, install_rules: Iterable[AbstractMutableYAMLInstallRule]) -> None:
        """Attach every rule of ``install_rules`` in order; see ``append``."""
        for rule in install_rules:
            self.append(rule)

955 

956 

class MutableYAMLManifestVariables(AbstractYAMLDictSubStore):
    """Mutable view of the manifest variables mapping."""

    @property
    def variables(self) -> dict[str, Any]:
        """The backing mapping itself (not a copy)."""
        return self._store

    def __setitem__(self, key: str, value: Any) -> None:
        """Store ``value`` under ``key``, then attach the mapping to its parent if needed."""
        mapping = self._store
        mapping[key] = value
        self.create_definition_if_missing()

965 

966 

class MutableYAMLManifestDefinitions(AbstractYAMLDictSubStore):
    """Mutable view of the manifest definitions mapping."""

    def manifest_variables(
        self, *, create_if_absent: bool = True
    ) -> MutableYAMLManifestVariables:
        """Return a view of the variables mapping.

        :param create_if_absent: When true, the variables mapping is attached
          to this store if it is not there already.
        """
        variables = MutableYAMLManifestVariables(self._store, MK_MANIFEST_VARIABLES)
        if not create_if_absent:
            return variables
        variables.create_definition_if_missing()
        return variables

975 

976 

class MutableYAMLRemoveDuringCleanDefinitions(AbstractYAMLListSubStore[str]):
    """Mutable view of the remove-during-clean list."""

    def append(self, rule: str) -> None:
        """Attach the list to its parent if needed, then add ``rule`` at the end."""
        self.create_definition_if_missing()
        self._store.append(rule)

    def __len__(self) -> int:
        """Number of rules in the list."""
        return len(self._store)

    def extend(self, rules: Iterable[str]) -> None:
        """Add every rule from ``rules``; an empty iterable changes nothing.

        The list is only attached to its parent once a first rule is known.
        """
        remaining = iter(rules)
        nothing = object()
        head = next(remaining, nothing)
        if head is nothing:
            return
        self.create_definition_if_missing()
        self._store.append(head)
        self._store.extend(remaining)

994 

995 

class MutableYAMLManifest:
    """Mutable wrapper around the YAML document store of a manifest.

    The accessor methods return sub-store wrappers bound to this manifest's
    store; with ``create_if_absent`` (the default) the corresponding
    definition is created when it is missing.
    """

    def __init__(self, store: Any) -> None:
        """Wrap *store*, which holds the manifest's top-level mapping."""
        self._store = store

    @classmethod
    def empty_manifest(cls) -> "MutableYAMLManifest":
        """Create a manifest that only declares the default manifest version."""
        return cls(CommentedMap({MK_MANIFEST_VERSION: DEFAULT_MANIFEST_VERSION}))

    @property
    def manifest_version(self) -> str:
        """The manifest version recorded in the store."""
        return self._store[MK_MANIFEST_VERSION]

    @manifest_version.setter
    def manifest_version(self, version: str) -> None:
        """Set the manifest version.

        :raises ValueError: If *version* is not in ``SUPPORTED_MANIFEST_VERSIONS``.
        """
        if version not in SUPPORTED_MANIFEST_VERSIONS:
            raise ValueError("Unsupported version")
        self._store[MK_MANIFEST_VERSION] = version

    def remove_during_clean(
        self,
        *,
        create_if_absent: bool = True,
    ) -> MutableYAMLRemoveDuringCleanDefinitions:
        """Return the sub-store for the remove-during-clean rules."""
        d = MutableYAMLRemoveDuringCleanDefinitions(
            self._store, MK_MANIFEST_REMOVE_DURING_CLEAN
        )
        if create_if_absent:
            d.create_definition_if_missing()
        return d

    def installations(
        self,
        *,
        create_if_absent: bool = True,
    ) -> MutableYAMLInstallationsDefinition:
        """Return the sub-store for the installations definition."""
        d = MutableYAMLInstallationsDefinition(self._store, MK_INSTALLATIONS)
        if create_if_absent:
            d.create_definition_if_missing()
        return d

    def manifest_definitions(
        self,
        *,
        create_if_absent: bool = True,
    ) -> MutableYAMLManifestDefinitions:
        """Return the sub-store for the manifest definitions."""
        d = MutableYAMLManifestDefinitions(self._store, MK_MANIFEST_DEFINITIONS)
        if create_if_absent:
            d.create_definition_if_missing()
        return d

    def package(
        self, name: str, *, create_if_absent: bool = True
    ) -> MutableYAMLPackageDefinition:
        """Return the definition wrapper for the package called *name*.

        :param name: Name of the package to look up.
        :param create_if_absent: When true, a missing package definition (and
          a missing packages mapping) is created. When false, the store is
          left untouched and ``KeyError`` is raised instead.
        :raises KeyError: If the package is absent and *create_if_absent* is false.
        """
        if MK_PACKAGES not in self._store:
            if not create_if_absent:
                # A pure lookup must not leave an empty packages mapping behind.
                raise KeyError(name)
            self._store[MK_PACKAGES] = CommentedMap()
        packages_store = self._store[MK_PACKAGES]
        if packages_store.get(name) is not None:
            return MutableYAMLPackageDefinition(packages_store, name)
        if not create_if_absent:
            raise KeyError(name)
        d = MutableYAMLPackageDefinition(packages_store, name)
        d.create_definition()
        return d

    def write_to(self, fd) -> None:
        """Dump the manifest store to *fd* via ``MANIFEST_YAML``."""
        MANIFEST_YAML.dump(self._store, fd)

1065 

1066 

1067def _describe_missing_path(entry: VirtualPath) -> str: 

1068 if entry.is_dir: 

1069 return f"{entry.fs_path}/ (empty directory; possible integration point)" 

1070 if entry.is_symlink: 

1071 target = os.readlink(entry.fs_path) 

1072 return f"{entry.fs_path} (symlink; links to {target})" 

1073 if entry.is_file: 

1074 return f"{entry.fs_path} (file)" 

1075 return f"{entry.fs_path} (other!? Probably not supported by debputy and may need a `remove`)" 

1076 

1077 

1078def _detect_missing_installations( 

1079 path_matcher: SourcePathMatcher, 

1080 search_dir: VirtualPath, 

1081) -> None: 

1082 if not search_dir.is_dir: 1082 ↛ 1083line 1082 didn't jump to line 1083 because the condition on line 1082 was never true

1083 return 

1084 missing = list(path_matcher.detect_missing(search_dir)) 

1085 if not missing: 

1086 return 

1087 

1088 excl = textwrap.dedent( 

1089 """\ 

1090 - discard: "*" 

1091 """ 

1092 ) 

1093 

1094 raise PathNotCoveredByInstallRulesError( 

1095 "Please review the list and add either install rules or exclusions to `installations` in" 

1096 " debian/debputy.manifest. If you do not need any of these paths, add the following to the" 

1097 f" end of your 'installations`:\n\n{excl}\n", 

1098 missing, 

1099 search_dir, 

1100 ) 

1101 

1102 

1103def _list_automatic_discard_rules(path_matcher: SourcePathMatcher) -> None: 

1104 used_discard_rules = path_matcher.used_auto_discard_rules 

1105 # Discard rules can match and then be overridden. In that case, they appear 

1106 # but have 0 matches. 

1107 if not sum((len(v) for v in used_discard_rules.values()), 0): 

1108 return 

1109 _info("The following automatic discard rules were triggered:") 

1110 example_path: str | None = None 

1111 for rule in sorted(used_discard_rules): 

1112 for fs_path in sorted(used_discard_rules[rule]): 

1113 if example_path is None: 1113 ↛ 1115line 1113 didn't jump to line 1115 because the condition on line 1113 was always true

1114 example_path = fs_path 

1115 _info(f" * {rule} -> {fs_path}") 

1116 assert example_path is not None 

1117 _info("") 

1118 _info( 

1119 "Note that some of these may have been overruled. The overrule detection logic is not" 

1120 ) 

1121 _info("100% reliable.") 

1122 _info("") 

1123 _info( 

1124 "You can overrule an automatic discard rule by explicitly listing the path. As an example:" 

1125 ) 

1126 _info(" installations:") 

1127 _info(" - install:") 

1128 _info(f" source: {example_path}") 

1129 

1130 

def _install_everything_from_source_dir_if_present(
    dctrl_bin: BinaryPackage,
    substitution: Substitution,
    path_matcher: SourcePathMatcher,
    install_rule_context: InstallRuleContext,
    source_condition_context: ConditionContext,
    source_dir: VirtualPath,
    *,
    into_dir: VirtualPath | None = None,
) -> None:
    """Install everything matching ``*`` in *source_dir* into *dctrl_bin*.

    A built-in install rule is created and run against a copy of
    *install_rule_context* whose search dirs are replaced by *source_dir*
    alone. When *into_dir* is given, the copy also uses *into_dir* as the
    file system root for *dctrl_bin*. "No match" and "already installed or
    discarded" errors from the rule are ignored.
    """
    source_fs_path = source_dir.fs_path
    attribute_path = AttributePath.builtin_path()[f"installing {source_fs_path}"]
    only_this_package = frozenset([dctrl_bin])
    match_everything = FileSystemMatchRule.from_path_match(
        "*", attribute_path, substitution
    )
    install_rule = run_in_context_of_plugin(
        "debputy",
        InstallRule.install_dest,
        [match_everything],
        None,
        only_this_package,
        f"Built-in; install everything from {source_fs_path} into {dctrl_bin.name}",
        None,
    )
    context_overrides = {
        "search_dirs": (SearchDir(source_dir, only_this_package),),
    }
    if into_dir is not None:
        # Redirect the output for this package without touching the caller's context.
        per_package_contexts = dict(install_rule_context.binary_package_contexts)
        per_package_contexts[dctrl_bin.name] = per_package_contexts[
            dctrl_bin.name
        ].replace(fs_root=into_dir)
        context_overrides["binary_package_contexts"] = per_package_contexts

    scoped_install_rule_context = install_rule_context.replace(**context_overrides)
    # Empty directory or everything excluded by default; ignore the error
    with suppress(
        NoMatchForInstallPatternError,
        PathAlreadyInstalledOrDiscardedError,
    ):
        install_rule.perform_install(
            path_matcher,
            scoped_install_rule_context,
            source_condition_context,
        )

1180 

1181 

1182def _add_build_install_dirs_to_per_package_search_dirs( 

1183 build_system_install_dirs: Sequence[tuple[str, frozenset[BinaryPackage]]], 

1184 per_package_search_dirs: dict[BinaryPackage, list[VirtualPath]], 

1185 as_path: Callable[[str], VirtualPath], 

1186) -> None: 

1187 seen_pp_search_dirs: set[tuple[BinaryPackage, str]] = set() 

1188 for dest_dir, for_packages in build_system_install_dirs: 

1189 dest_path = as_path(dest_dir) 

1190 for pkg in for_packages: 

1191 seen_key = (pkg, dest_dir) 

1192 if seen_key in seen_pp_search_dirs: 

1193 continue 

1194 seen_pp_search_dirs.add(seen_key) 

1195 if pkg not in per_package_search_dirs: 

1196 per_package_search_dirs[pkg] = [dest_path] 

1197 else: 

1198 per_package_search_dirs[pkg].append(dest_path) 

1199 

1200 

1201class HighLevelManifest: 

1202 def __init__( 

1203 self, 

1204 manifest_path: str, 

1205 mutable_manifest: MutableYAMLManifest | None, 

1206 remove_during_clean_rules: list[FileSystemMatchRule], 

1207 install_rules: list[InstallRule] | None, 

1208 source_package: SourcePackage, 

1209 binary_packages: Mapping[str, BinaryPackage], 

1210 substitution: Substitution, 

1211 package_transformations: Mapping[str, PackageTransformationDefinition], 

1212 dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable, 

1213 dpkg_arch_query_table: DpkgArchTable, 

1214 build_env: DebBuildOptionsAndProfiles, 

1215 build_environments: BuildEnvironments, 

1216 build_rules: list[BuildRule] | None, 

1217 value_table: Mapping[ 

1218 tuple[SourcePackage | BinaryPackage, type[Any]], 

1219 Any, 

1220 ], 

1221 plugin_provided_feature_set: PluginProvidedFeatureSet, 

1222 debian_dir: VirtualPath, 

1223 ) -> None: 

1224 self.manifest_path = manifest_path 

1225 self.mutable_manifest = mutable_manifest 

1226 self._remove_during_clean_rules: list[FileSystemMatchRule] = ( 

1227 remove_during_clean_rules 

1228 ) 

1229 self._install_rules = install_rules 

1230 self.source_package = source_package 

1231 self._binary_packages = binary_packages 

1232 self.substitution = substitution 

1233 self.package_transformations = package_transformations 

1234 self._dpkg_architecture_variables = dpkg_architecture_variables 

1235 self.dpkg_arch_query_table = dpkg_arch_query_table 

1236 self._build_env = build_env 

1237 self._used_for: set[str] = set() 

1238 self.build_environments = build_environments 

1239 self.build_rules = build_rules 

1240 self._value_table = value_table 

1241 self._plugin_provided_feature_set = plugin_provided_feature_set 

1242 self._debian_dir = debian_dir 

1243 self._source_condition_context = ConditionContext( 

1244 binary_package=None, 

1245 substitution=self.substitution, 

1246 deb_options_and_profiles=self._build_env, 

1247 dpkg_architecture_variables=self._dpkg_architecture_variables, 

1248 dpkg_arch_query_table=self.dpkg_arch_query_table, 

1249 ) 

1250 

1251 def source_version(self, include_binnmu_version: bool = True) -> str: 

1252 # TODO: There should an easier way to determine the source version; really. 

1253 version_var = "{{DEB_VERSION}}" 

1254 if not include_binnmu_version: 

1255 version_var = "{{_DEBPUTY_INTERNAL_NON_BINNMU_SOURCE}}" 

1256 try: 

1257 return self.substitution.substitute( 

1258 version_var, "internal (resolve version)" 

1259 ) 

1260 except DebputySubstitutionError as e: 

1261 raise AssertionError(f"Could not resolve {version_var}") from e 

1262 

1263 @property 

1264 def source_condition_context(self) -> ConditionContext: 

1265 return self._source_condition_context 

1266 

1267 @property 

1268 def debian_dir(self) -> VirtualPath: 

1269 return self._debian_dir 

1270 

1271 @property 

1272 def dpkg_architecture_variables(self) -> DpkgArchitectureBuildProcessValuesTable: 

1273 return self._dpkg_architecture_variables 

1274 

1275 @property 

1276 def deb_options_and_profiles(self) -> DebBuildOptionsAndProfiles: 

1277 return self._build_env 

1278 

1279 @property 

1280 def plugin_provided_feature_set(self) -> PluginProvidedFeatureSet: 

1281 return self._plugin_provided_feature_set 

1282 

1283 @property 

1284 def remove_during_clean_rules(self) -> list[FileSystemMatchRule]: 

1285 return self._remove_during_clean_rules 

1286 

1287 @property 

1288 def active_packages(self) -> Iterable[BinaryPackage]: 

1289 yield from (p for p in self._binary_packages.values() if p.should_be_acted_on) 

1290 

1291 @property 

1292 def all_packages(self) -> Iterable[BinaryPackage]: 

1293 yield from self._binary_packages.values() 

1294 

1295 def manifest_configuration[T]( 

1296 self, 

1297 context_package: SourcePackage | BinaryPackage, 

1298 value_type: type[T], 

1299 ) -> T | None: 

1300 res = self._value_table.get((context_package, value_type)) 

1301 return typing.cast("T | None", res) 

1302 

1303 def package_state_for(self, package: str) -> PackageTransformationDefinition: 

1304 return self.package_transformations[package] 

1305 

1306 def _detect_doc_main_package_for(self, package: BinaryPackage) -> BinaryPackage: 

1307 name = package.name 

1308 for doc_main_field in ("Doc-Main-Package", "X-Doc-Main-Package"): 

1309 doc_main_package_name = package.fields.get(doc_main_field) 

1310 if doc_main_package_name: 1310 ↛ 1311line 1310 didn't jump to line 1311 because the condition on line 1310 was never true

1311 main_package = self._binary_packages.get(doc_main_package_name) 

1312 if main_package is None: 

1313 _error( 

1314 f"Invalid Doc-Main-Package for {name}: The package {doc_main_package_name!r} is not listed in d/control" 

1315 ) 

1316 return main_package 

1317 # If it is not a -doc package, then docs should be installed 

1318 # under its own package name. 

1319 if not name.endswith("-doc"): 1319 ↛ 1321line 1319 didn't jump to line 1321 because the condition on line 1319 was always true

1320 return package 

1321 name = name[:-4] 

1322 main_package = self._binary_packages.get(name) 

1323 if main_package: 

1324 return main_package 

1325 if name.startswith("lib"): 

1326 dev_pkg = self._binary_packages.get(f"{name}-dev") 

1327 if dev_pkg: 

1328 return dev_pkg 

1329 

1330 # If we found no better match; default to the doc package itself. 

1331 return package 

1332 

1333 def perform_installations( 

1334 self, 

1335 integration_mode: DebputyIntegrationMode, 

1336 build_system_install_dirs: Sequence[tuple[str, frozenset[BinaryPackage]]], 

1337 *, 

1338 install_request_context: InstallSearchDirContext | None = None, 

1339 ) -> PackageDataTable: 

1340 package_data_dict = {} 

1341 package_data_table = PackageDataTable(package_data_dict) 

1342 enable_manifest_installation_feature = ( 

1343 integration_mode != INTEGRATION_MODE_DH_DEBPUTY_RRR 

1344 ) 

1345 

1346 if build_system_install_dirs: 1346 ↛ 1347line 1346 didn't jump to line 1347 because the condition on line 1346 was never true

1347 if integration_mode != INTEGRATION_MODE_FULL: 

1348 raise ValueError( 

1349 "The build_system_install_dirs parameter can only be used in full integration mode" 

1350 ) 

1351 if install_request_context: 

1352 raise ValueError( 

1353 "The build_system_install_dirs parameter cannot be used with install_request_context" 

1354 " (not implemented)" 

1355 ) 

1356 

1357 if install_request_context is None: 1357 ↛ 1359line 1357 didn't jump to line 1359 because the condition on line 1357 was never true

1358 

1359 @functools.lru_cache(None) 

1360 def _as_path(fs_path: str) -> VirtualPath: 

1361 return OSFSROOverlay.create_root_dir(".", fs_path) 

1362 

1363 dtmp_dir = _as_path("debian/tmp") 

1364 source_root_dir = _as_path(".") 

1365 into = frozenset(self._binary_packages.values()) 

1366 per_package_search_dirs = { 

1367 t.binary_package: [_as_path(f.match_rule.path) for f in t.search_dirs] 

1368 for t in self.package_transformations.values() 

1369 if t.search_dirs is not None 

1370 } 

1371 

1372 if integration_mode == INTEGRATION_MODE_FULL: 

1373 # In this mode, we have no default search dir. Everything ends up being 

1374 # per-package instead (since that is easier logic-wise). 

1375 # 

1376 # Even dtmp_dir is omitted here (since it is not universally applicable). 

1377 # Note we still initialize dtmp_dir, since it affects a later guard. 

1378 default_search_dirs = [] 

1379 _add_build_install_dirs_to_per_package_search_dirs( 

1380 build_system_install_dirs, 

1381 per_package_search_dirs, 

1382 _as_path, 

1383 ) 

1384 

1385 # We can end here with per_package_search_dirs having no search dirs for any package 

1386 # (single binary, where everything is installed into d/<pkg> is the most common case). 

1387 # 

1388 # This is not a problem in itself as the installation rules can still apply to the 

1389 # source root and there should be no reason to install something from d/<pkg> into 

1390 # d/<another-pkg> 

1391 else: 

1392 default_search_dirs = [dtmp_dir] 

1393 

1394 search_dirs = _determine_search_dir_order( 

1395 per_package_search_dirs, 

1396 into, 

1397 default_search_dirs, 

1398 source_root_dir, 

1399 ) 

1400 check_for_uninstalled_dirs = tuple( 

1401 s.search_dir 

1402 for s in search_dirs 

1403 if s.search_dir.fs_path != source_root_dir.fs_path 

1404 ) 

1405 if enable_manifest_installation_feature: 

1406 _present_installation_dirs( 

1407 search_dirs, check_for_uninstalled_dirs, into 

1408 ) 

1409 else: 

1410 dtmp_dir = None 

1411 search_dirs = install_request_context.search_dirs 

1412 into = frozenset(self._binary_packages.values()) 

1413 seen: set[BinaryPackage] = set() 

1414 for search_dir in search_dirs: 

1415 seen.update(search_dir.applies_to) 

1416 

1417 missing = into - seen 

1418 if missing: 1418 ↛ 1419line 1418 didn't jump to line 1419 because the condition on line 1418 was never true

1419 names = ", ".join(p.name for p in missing) 

1420 raise ValueError( 

1421 f"The following package(s) had no search dirs: {names}." 

1422 " (Generally, the source root would be applicable to all packages)" 

1423 ) 

1424 extra_names = seen - into 

1425 if extra_names: 1425 ↛ 1426line 1425 didn't jump to line 1426 because the condition on line 1425 was never true

1426 names = ", ".join(p.name for p in extra_names) 

1427 raise ValueError( 

1428 f"The install_request_context referenced the following unknown package(s): {names}" 

1429 ) 

1430 

1431 check_for_uninstalled_dirs = ( 

1432 install_request_context.check_for_uninstalled_dirs 

1433 ) 

1434 

1435 install_rule_context = InstallRuleContext(search_dirs) 

1436 

1437 if ( 1437 ↛ 1444line 1437 didn't jump to line 1444 because the condition on line 1437 was never true

1438 enable_manifest_installation_feature 

1439 and self._install_rules is None 

1440 # TODO: Should we also do this for full mode when build systems provided search dirs? 

1441 and dtmp_dir is not None 

1442 and os.path.isdir(dtmp_dir.fs_path) 

1443 ): 

1444 msg = ( 

1445 "The build system appears to have provided the output of upstream build system's" 

1446 " install in debian/tmp. However, these are no provisions for debputy to install" 

1447 " any of that into any of the debian packages listed in debian/control." 

1448 " To avoid accidentally creating empty packages, debputy will insist that you " 

1449 " explicitly define an empty installation definition if you did not want to " 

1450 " install any of those files even though they have been provided." 

1451 ' Example: "installations: []"' 

1452 ) 

1453 _error(msg) 

1454 elif ( 1454 ↛ 1457line 1454 didn't jump to line 1457 because the condition on line 1454 was never true

1455 not enable_manifest_installation_feature and self._install_rules is not None 

1456 ): 

1457 _error( 

1458 f"The `installations` feature cannot be used in {self.manifest_path} with this integration mode." 

1459 f" Please remove or comment out the `installations` keyword." 

1460 ) 

1461 

1462 for dctrl_bin in self.all_packages: 

1463 package = dctrl_bin.name 

1464 doc_main_package = self._detect_doc_main_package_for(dctrl_bin) 

1465 

1466 install_rule_context[package] = BinaryPackageInstallRuleContext( 

1467 dctrl_bin, 

1468 InMemoryVirtualRootDir(), 

1469 doc_main_package, 

1470 ) 

1471 

1472 if enable_manifest_installation_feature: 1472 ↛ 1477line 1472 didn't jump to line 1477 because the condition on line 1472 was always true

1473 discard_rules = list( 

1474 self.plugin_provided_feature_set.auto_discard_rules.values() 

1475 ) 

1476 else: 

1477 discard_rules = [ 

1478 self.plugin_provided_feature_set.auto_discard_rules["debian-dir"] 

1479 ] 

1480 path_matcher = SourcePathMatcher(discard_rules) 

1481 

1482 source_condition_context = self._source_condition_context 

1483 

1484 for dctrl_bin in self.active_packages: 

1485 package = dctrl_bin.name 

1486 if install_request_context: 1486 ↛ 1491line 1486 didn't jump to line 1491 because the condition on line 1486 was always true

1487 build_system_staging_dir = install_request_context.debian_pkg_dirs.get( 

1488 package 

1489 ) 

1490 else: 

1491 build_system_staging_dir_fs_path = os.path.join("debian", package) 

1492 if os.path.isdir(build_system_staging_dir_fs_path): 

1493 build_system_staging_dir = OSFSROOverlay.create_root_dir( 

1494 ".", 

1495 build_system_staging_dir_fs_path, 

1496 ) 

1497 else: 

1498 build_system_staging_dir = None 

1499 

1500 if build_system_staging_dir is not None: 

1501 _install_everything_from_source_dir_if_present( 

1502 dctrl_bin, 

1503 self.substitution, 

1504 path_matcher, 

1505 install_rule_context, 

1506 source_condition_context, 

1507 build_system_staging_dir, 

1508 ) 

1509 

1510 if self._install_rules: 

1511 # FIXME: Check that every install rule remains used after transformations have run. 

1512 # What we want to check is transformations do not exclude everything from an install 

1513 # rule. The hard part here is that renaming (etc.) is fine, so we cannot 1:1 string 

1514 # match. 

1515 for install_rule in self._install_rules: 

1516 install_rule.perform_install( 

1517 path_matcher, 

1518 install_rule_context, 

1519 source_condition_context, 

1520 ) 

1521 

1522 if enable_manifest_installation_feature: 1522 ↛ 1526line 1522 didn't jump to line 1526 because the condition on line 1522 was always true

1523 for search_dir in check_for_uninstalled_dirs: 

1524 _detect_missing_installations(path_matcher, search_dir) 

1525 

1526 for dctrl_bin in self.all_packages: 

1527 package = dctrl_bin.name 

1528 binary_install_rule_context = install_rule_context[package] 

1529 build_system_pkg_staging_dir = os.path.join("debian", package) 

1530 fs_root = binary_install_rule_context.fs_root 

1531 

1532 context = self.package_transformations[package] 

1533 if dctrl_bin.should_be_acted_on and enable_manifest_installation_feature: 

1534 for special_install_rule in context.install_rules: 1534 ↛ 1535line 1534 didn't jump to line 1535 because the loop on line 1534 never started

1535 special_install_rule.perform_install( 

1536 path_matcher, 

1537 install_rule_context, 

1538 source_condition_context, 

1539 ) 

1540 

1541 if dctrl_bin.should_be_acted_on: 

1542 self.apply_fs_transformations(package, fs_root) 

1543 substvars_file = f"debian/{package}.substvars" 

1544 substvars = FlushableSubstvars.load_from_path( 

1545 substvars_file, missing_ok=True 

1546 ) 

1547 # We do not want to touch the substvars file (non-clean rebuild contamination) 

1548 substvars.substvars_path = None 

1549 else: 

1550 substvars = FlushableSubstvars() 

1551 

1552 udeb_package = self._binary_packages.get(f"{package}-udeb") 

1553 if udeb_package and not udeb_package.is_udeb: 1553 ↛ 1554line 1553 didn't jump to line 1554 because the condition on line 1553 was never true

1554 udeb_package = None 

1555 

1556 package_metadata_context = PackageProcessingContextProvider( 

1557 self, 

1558 dctrl_bin, 

1559 udeb_package, 

1560 package_data_table, 

1561 ) 

1562 

1563 ctrl_creator = BinaryCtrlAccessorProviderCreator( 

1564 package_metadata_context, 

1565 substvars, 

1566 context.maintscript_snippets, 

1567 context.substitution, 

1568 ) 

1569 

1570 if not enable_manifest_installation_feature: 1570 ↛ 1571line 1570 didn't jump to line 1571 because the condition on line 1570 was never true

1571 assert_no_dbgsym_migration(dctrl_bin) 

1572 dh_dbgsym_root_fs = dhe_dbgsym_root_dir(dctrl_bin) 

1573 dh_dbgsym_root_path = OSFSROOverlay.create_root_dir( 

1574 "", 

1575 dh_dbgsym_root_fs, 

1576 ) 

1577 dbgsym_root_fs = InMemoryVirtualRootDir() 

1578 _install_everything_from_source_dir_if_present( 

1579 dctrl_bin, 

1580 self.substitution, 

1581 path_matcher, 

1582 install_rule_context, 

1583 source_condition_context, 

1584 dh_dbgsym_root_path, 

1585 into_dir=dbgsym_root_fs, 

1586 ) 

1587 dbgsym_build_ids = read_dbgsym_file(dctrl_bin) 

1588 dbgsym_info = DbgsymInfo( 

1589 dctrl_bin, 

1590 dbgsym_root_fs, 

1591 os.path.join(dh_dbgsym_root_fs, "DEBIAN"), 

1592 dbgsym_build_ids, 

1593 # TODO: Provide manifest feature to support this. 

1594 False, 

1595 ) 

1596 else: 

1597 dbgsym_info = DbgsymInfo( 

1598 dctrl_bin, 

1599 InMemoryVirtualRootDir(), 

1600 None, 

1601 [], 

1602 False, 

1603 ) 

1604 

1605 package_data_dict[package] = BinaryPackageData( 

1606 self.source_package, 

1607 dctrl_bin, 

1608 build_system_pkg_staging_dir, 

1609 fs_root, 

1610 substvars, 

1611 package_metadata_context, 

1612 ctrl_creator, 

1613 dbgsym_info, 

1614 ) 

1615 

1616 if enable_manifest_installation_feature: 1616 ↛ 1619line 1616 didn't jump to line 1619 because the condition on line 1616 was always true

1617 _list_automatic_discard_rules(path_matcher) 

1618 

1619 return package_data_table 

1620 

1621 def condition_context( 

1622 self, binary_package: BinaryPackage | str | None 

1623 ) -> ConditionContext: 

1624 if binary_package is None: 1624 ↛ 1625line 1624 didn't jump to line 1625 because the condition on line 1624 was never true

1625 return self._source_condition_context 

1626 if not isinstance(binary_package, str): 

1627 binary_package = binary_package.name 

1628 

1629 package_transformation = self.package_transformations[binary_package] 

1630 return self._source_condition_context.replace( 

1631 binary_package=package_transformation.binary_package, 

1632 substitution=package_transformation.substitution, 

1633 ) 

1634 

1635 def apply_fs_transformations( 

1636 self, 

1637 package: str, 

1638 fs_root: InMemoryVirtualPathBase, 

1639 ) -> None: 

1640 if package in self._used_for: 1640 ↛ 1641line 1640 didn't jump to line 1641 because the condition on line 1640 was never true

1641 raise ValueError( 

1642 f"data.tar contents for {package} has already been finalized!?" 

1643 ) 

1644 if package not in self.package_transformations: 1644 ↛ 1645line 1644 didn't jump to line 1645 because the condition on line 1644 was never true

1645 raise ValueError( 

1646 f'The package "{package}" was not relevant for the manifest!?' 

1647 ) 

1648 package_transformation = self.package_transformations[package] 

1649 condition_context = ConditionContext( 

1650 binary_package=package_transformation.binary_package, 

1651 substitution=package_transformation.substitution, 

1652 deb_options_and_profiles=self._build_env, 

1653 dpkg_architecture_variables=self._dpkg_architecture_variables, 

1654 dpkg_arch_query_table=self.dpkg_arch_query_table, 

1655 ) 

1656 norm_rules = list( 

1657 builtin_mode_normalization_rules( 

1658 self._dpkg_architecture_variables, 

1659 package_transformation.binary_package, 

1660 package_transformation.substitution, 

1661 ) 

1662 ) 

1663 norm_mode_transformation_rule = ModeNormalizationTransformationRule(norm_rules) 

1664 norm_mode_transformation_rule.transform_file_system(fs_root, condition_context) 

1665 for transformation in package_transformation.transformations: 

1666 transformation.run_transform_file_system(fs_root, condition_context) 

1667 interpreter_normalization = NormalizeShebangLineTransformation() 

1668 interpreter_normalization.transform_file_system(fs_root, condition_context) 

1669 

1670 def finalize_data_tar_contents( 

1671 self, 

1672 package: str, 

1673 fs_root: InMemoryVirtualPathBase, 

1674 clamp_mtime_to: int, 

1675 ) -> IntermediateManifest: 

1676 if package in self._used_for: 1676 ↛ 1677line 1676 didn't jump to line 1677 because the condition on line 1676 was never true

1677 raise ValueError( 

1678 f"data.tar contents for {package} has already been finalized!?" 

1679 ) 

1680 if package not in self.package_transformations: 1680 ↛ 1681line 1680 didn't jump to line 1681 because the condition on line 1680 was never true

1681 raise ValueError( 

1682 f'The package "{package}" was not relevant for the manifest!?' 

1683 ) 

1684 self._used_for.add(package) 

1685 

1686 # At this point, there so be no further mutations to the file system (because the will not 

1687 # be present in the intermediate manifest) 

1688 cast("FSRootDir", fs_root).is_read_write = False 

1689 

1690 intermediate_manifest = list( 

1691 _generate_intermediate_manifest( 

1692 fs_root, 

1693 clamp_mtime_to, 

1694 ) 

1695 ) 

1696 return intermediate_manifest 

1697 

1698 def apply_to_binary_staging_directory( 

1699 self, 

1700 package: str, 

1701 fs_root: InMemoryVirtualPathBase, 

1702 clamp_mtime_to: int, 

1703 ) -> IntermediateManifest: 

1704 self.apply_fs_transformations(package, fs_root) 

1705 return self.finalize_data_tar_contents(package, fs_root, clamp_mtime_to) 

1706 

1707 

1708@dataclasses.dataclass(slots=True) 

1709class SearchDirOrderState: 

1710 search_dir: VirtualPath 

1711 applies_to: set[BinaryPackage] = dataclasses.field(default_factory=set) 

1712 after: set[str] = dataclasses.field(default_factory=set) 

1713 

1714 

def _present_installation_dirs(
    search_dirs: Sequence[SearchDir],
    checked_missing_dirs: Sequence[VirtualPath],
    all_pkgs: frozenset[BinaryPackage],
) -> None:
    """Log the search dirs (in order) and the dirs checked for uninstalled paths.

    :param search_dirs: Search dirs to list, in search order.
    :param checked_missing_dirs: Directories that are checked for
      "not-installed" paths; listed only when non-empty.
    :param all_pkgs: All known packages. A search dir applying to a proper
      subset of these is annotated with the names of its packages.
    """
    _info("The following directories are considered search dirs (in order):")
    # Shared column width so that the remarks line up.
    max_len = max((len(s.search_dir.fs_path) for s in search_dirs), default=1)
    for search_dir in search_dirs:
        applies_to = ""
        if search_dir.applies_to < all_pkgs:
            # Sorted, since set iteration order would make the output unstable.
            names = ", ".join(sorted(p.name for p in search_dir.applies_to))
            applies_to = f" [only applicable to: {names}]"
        remark = ""
        if not os.path.isdir(search_dir.search_dir.fs_path):
            remark = " (skipped; absent)"
        _info(f" * {search_dir.search_dir.fs_path:{max_len}}{applies_to}{remark}")

    if checked_missing_dirs:
        _info('The following directories are considered for "not-installed" paths;')
        for d in checked_missing_dirs:
            remark = ""
            if not os.path.isdir(d.fs_path):
                remark = " (skipped; absent)"
            _info(f" * {d.fs_path:{max_len}}{remark}")

1739 

1740 

def _determine_search_dir_order(
    requested: Mapping[BinaryPackage, list[VirtualPath]],
    all_pkgs: frozenset[BinaryPackage],
    default_search_dirs: list[VirtualPath],
    source_root: VirtualPath,
) -> Sequence[SearchDir]:
    """Merge the per-package search dir lists into one global order.

    Within each package's list, a directory must come after the directory
    listed before it. The merged order respects these constraints for all
    packages at once. *source_root* is always appended last and applies to
    all packages.

    :param requested: Explicit search dirs per package; its keys must all be
      in *all_pkgs*.
    :param all_pkgs: All known packages.
    :param default_search_dirs: Search dirs used for packages that have no
      entry in *requested*.
    :param source_root: The source root directory.
    :return: The search dirs in order, each with the packages it applies to.
      When the constraints are circular, the problem is reported via ``_error``.
    """
    search_dir_table = dict[str, SearchDirOrderState]()
    assert requested.keys() <= all_pkgs
    for pkg in all_pkgs:
        paths = requested.get(pkg, default_search_dirs)
        previous_search_dir: SearchDirOrderState | None = None
        for path in paths:
            try:
                search_dir_state = search_dir_table[path.fs_path]
            except KeyError:
                search_dir_state = SearchDirOrderState(path)
                search_dir_table[path.fs_path] = search_dir_state
            search_dir_state.applies_to.add(pkg)
            if previous_search_dir is not None:
                search_dir_state.after.add(previous_search_dir.search_dir.fs_path)
            previous_search_dir = search_dir_state

    search_dirs_in_order = []
    released = set[str]()
    # Kept as a list (in table order) so that the resolved order does not
    # depend on set iteration order.
    remaining: list[str] = list(search_dir_table)

    while remaining:
        still_blocked: list[str] = []
        for fs_path in remaining:
            search_dir_state = search_dir_table[fs_path]
            if search_dir_state.after <= released:
                search_dirs_in_order.append(search_dir_state)
                released.add(fs_path)
            else:
                still_blocked.append(fs_path)

        if len(still_blocked) == len(remaining):
            # A full pass released nothing, so the rest can never be released.
            names = ", ".join(remaining)
            _error(
                f"There is a circular dependency (somewhere) between the search dirs: {names}."
                " Note that the search directories across all packages have to be ordered (and the"
                " source root should generally be last)"
            )
        remaining = still_blocked

    search_dirs_in_order.append(
        SearchDirOrderState(
            source_root,
            set(all_pkgs),
        )
    )

    return tuple(
        # Avoid duplicating all_pkgs
        SearchDir(
            s.search_dir,
            frozenset(s.applies_to) if s.applies_to != all_pkgs else all_pkgs,
        )
        for s in search_dirs_in_order
    )