Coverage for src/debputy/highlevel_manifest.py: 65%

900 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2026-04-06 19:25 +0000

1import dataclasses 

2import functools 

3import os 

4import textwrap 

5import typing 

6from contextlib import suppress 

7from dataclasses import dataclass, field 

8from typing import ( 

9 Any, 

10 TypeVar, 

11 Generic, 

12 cast, 

13) 

14from collections.abc import Iterable, Mapping, Sequence, Callable 

15 

16from debian.debian_support import DpkgArchTable 

17 

18from debputy.dh.debhelper_emulation import ( 

19 dhe_dbgsym_root_dir, 

20 assert_no_dbgsym_migration, 

21 read_dbgsym_file, 

22) 

23from ._deb_options_profiles import DebBuildOptionsAndProfiles 

24from ._manifest_constants import * 

25from .architecture_support import DpkgArchitectureBuildProcessValuesTable 

26from .builtin_manifest_rules import builtin_mode_normalization_rules 

27from .exceptions import ( 

28 DebputySubstitutionError, 

29 DebputyRuntimeErrorWithPreamble, 

30) 

31from .filesystem_scan import ( 

32 InMemoryVirtualRootDir, 

33 OSFSROOverlay, 

34 FSControlRootDir, 

35 InMemoryVirtualPathBase, 

36) 

37from .installations import ( 

38 InstallRule, 

39 SourcePathMatcher, 

40 PathAlreadyInstalledOrDiscardedError, 

41 NoMatchForInstallPatternError, 

42 InstallRuleContext, 

43 BinaryPackageInstallRuleContext, 

44 InstallSearchDirContext, 

45 SearchDir, 

46) 

47from .intermediate_manifest import TarMember, PathType, IntermediateManifest 

48from .maintscript_snippet import ( 

49 DpkgMaintscriptHelperCommand, 

50 MaintscriptSnippetContainer, 

51) 

52from .manifest_conditions import ConditionContext 

53from .manifest_parser.base_types import ( 

54 FileSystemMatchRule, 

55 FileSystemExactMatchRule, 

56 BuildEnvironments, 

57) 

58from .manifest_parser.util import AttributePath 

59from .packager_provided_files import PackagerProvidedFile 

60from .packages import BinaryPackage, SourcePackage 

61from .plugin.api.feature_set import PluginProvidedFeatureSet 

62from .plugin.api.impl import BinaryCtrlAccessorProviderCreator 

63from .plugin.api.impl_types import ( 

64 PackageProcessingContextProvider, 

65 PackageDataTable, 

66) 

67from .plugin.api.spec import ( 

68 FlushableSubstvars, 

69 VirtualPath, 

70 DebputyIntegrationMode, 

71 INTEGRATION_MODE_DH_DEBPUTY_RRR, 

72 INTEGRATION_MODE_FULL, 

73) 

74from debputy.plugins.debputy.binary_package_rules import ServiceRule 

75from debputy.plugins.debputy.build_system_rules import BuildRule 

76from .plugin.plugin_state import run_in_context_of_plugin 

77from .substitution import Substitution 

78from .transformation_rules import ( 

79 TransformationRule, 

80 ModeNormalizationTransformationRule, 

81 NormalizeShebangLineTransformation, 

82) 

83from .util import ( 

84 _error, 

85 _warn, 

86 debian_policy_normalize_symlink_target, 

87 generated_content_dir, 

88 _info, 

89) 

90from .yaml import MANIFEST_YAML 

91from .yaml.compat import CommentedMap, CommentedSeq 

92 

93 

94def tar_path(p: VirtualPath) -> str: 

95 path = p.path 

96 if p.is_dir: 

97 return path + "/" 

98 return path 

99 

100 

101def tar_owner_info(path: InMemoryVirtualPathBase) -> tuple[str, int, str, int]: 

102 owner = path._owner # noqa 

103 group = path._group # noqa 

104 return ( 

105 owner.entity_name, 

106 owner.entity_id, 

107 group.entity_name, 

108 group.entity_id, 

109 ) 

110 

111 

112class PathNotCoveredByInstallRulesError(DebputyRuntimeErrorWithPreamble): 

113 

114 @property 

115 def unmatched_paths(self) -> Sequence[VirtualPath]: 

116 return self.args[1] 

117 

118 @property 

119 def search_dir(self) -> VirtualPath: 

120 return self.args[2] 

121 

122 def render_preamble(self) -> None: 

123 _warn( 

124 f"The following paths were present in {self.search_dir.fs_path}, but not installed (nor explicitly discarded)." 

125 ) 

126 _warn("") 

127 for entry in self.unmatched_paths: 

128 desc = _describe_missing_path(entry) 

129 _warn(f" * {desc}") 

130 _warn("") 

131 

132 

133@dataclass(slots=True) 

134class DbgsymInfo: 

135 binary_package: BinaryPackage 

136 dbgsym_fs_root: InMemoryVirtualPathBase 

137 _dbgsym_root_fs: str | None 

138 dbgsym_ids: list[str] 

139 run_dwz: bool 

140 

141 @property 

142 def dbgsym_root_dir(self) -> str: 

143 root_dir = self._dbgsym_root_fs 

144 if root_dir is None: 

145 root_dir = generated_content_dir( 

146 package=self.binary_package, 

147 subdir_key="dbgsym-fs-root", 

148 ) 

149 self._dbgsym_root_fs = root_dir 

150 return root_dir 

151 

152 @property 

153 def dbgsym_ctrl_dir(self) -> FSControlRootDir: 

154 return FSControlRootDir.create_root_dir( 

155 os.path.join(self.dbgsym_root_dir, "DEBIAN") 

156 ) 

157 

158 

159@dataclass(slots=True, frozen=True) 

160class BinaryPackageData: 

161 source_package: SourcePackage 

162 binary_package: BinaryPackage 

163 binary_staging_root_dir: str 

164 fs_root: InMemoryVirtualPathBase 

165 substvars: FlushableSubstvars 

166 package_metadata_context: PackageProcessingContextProvider 

167 ctrl_creator: BinaryCtrlAccessorProviderCreator 

168 dbgsym_info: DbgsymInfo 

169 

170 @property 

171 def control_output_dir(self) -> FSControlRootDir: 

172 return FSControlRootDir.create_root_dir( 

173 generated_content_dir( 

174 package=self.binary_package, 

175 subdir_key="DEBIAN", 

176 ) 

177 ) 

178 

179 

180@dataclass(slots=True) 

181class PackageTransformationDefinition: 

182 binary_package: BinaryPackage 

183 substitution: Substitution 

184 is_auto_generated_package: bool 

185 binary_version: str | None = None 

186 search_dirs: list[FileSystemExactMatchRule] | None = None 

187 dpkg_maintscript_helper_snippets: list[DpkgMaintscriptHelperCommand] = field( 

188 default_factory=list 

189 ) 

190 maintscript_snippets: dict[str, MaintscriptSnippetContainer] = field( 

191 default_factory=dict 

192 ) 

193 transformations: list[TransformationRule] = field(default_factory=list) 

194 reserved_packager_provided_files: dict[str, list[PackagerProvidedFile]] = field( 

195 default_factory=dict 

196 ) 

197 install_rules: list[InstallRule] = field(default_factory=list) 

198 requested_service_rules: list[ServiceRule] = field(default_factory=list) 

199 

200 

201def _path_to_tar_member( 

202 path: InMemoryVirtualPathBase, 

203 clamp_mtime_to: int, 

204) -> TarMember: 

205 mtime = float(clamp_mtime_to) 

206 owner, uid, group, gid = tar_owner_info(path) 

207 mode = path.mode 

208 

209 if path.has_fs_path: 

210 mtime = min(mtime, path.mtime) 

211 

212 if path.is_dir: 

213 path_type = PathType.DIRECTORY 

214 elif path.is_file: 

215 # TODO: someday we will need to deal with hardlinks and it might appear here. 

216 path_type = PathType.FILE 

217 elif path.is_symlink: 217 ↛ 237line 217 didn't jump to line 237 because the condition on line 217 was always true

218 # Special-case that we resolve immediately (since we need to normalize the target anyway) 

219 link_target = debian_policy_normalize_symlink_target( 

220 path.path, 

221 path.readlink(), 

222 ) 

223 return TarMember.virtual_path( 

224 tar_path(path), 

225 PathType.SYMLINK, 

226 mtime, 

227 link_target=link_target, 

228 # Force mode to be 0777 as that is the mode we see in the data.tar. In theory, tar lets you set 

229 # it to whatever. However, for reproducibility, we have to be well-behaved - and that is 0777. 

230 mode=0o0777, 

231 owner=owner, 

232 uid=uid, 

233 group=group, 

234 gid=gid, 

235 ) 

236 else: 

237 assert not path.is_symlink 

238 raise AssertionError( 

239 f"Unsupported file type: {path.path} - not a file, dir nor a symlink!" 

240 ) 

241 

242 if not path.has_fs_path: 

243 assert not path.is_file 

244 return TarMember.virtual_path( 

245 tar_path(path), 

246 path_type, 

247 mtime, 

248 mode=mode, 

249 owner=owner, 

250 uid=uid, 

251 group=group, 

252 gid=gid, 

253 ) 

254 may_steal_fs_path = getattr(path, "_can_replace_inline", False) 

255 return TarMember.from_file( 

256 tar_path(path), 

257 path.fs_path, 

258 mode=mode, 

259 uid=uid, 

260 owner=owner, 

261 gid=gid, 

262 group=group, 

263 path_type=path_type, 

264 path_mtime=mtime, 

265 clamp_mtime_to=clamp_mtime_to, 

266 may_steal_fs_path=may_steal_fs_path, 

267 ) 

268 

269 

270def _generate_intermediate_manifest( 

271 fs_root: InMemoryVirtualPathBase, 

272 clamp_mtime_to: int, 

273) -> Iterable[TarMember]: 

274 symlinks = [] 

275 for path in fs_root.all_paths(): 

276 tar_member = _path_to_tar_member(path, clamp_mtime_to) 

277 if tar_member.path_type == PathType.SYMLINK: 

278 symlinks.append(tar_member) 

279 continue 

280 yield tar_member 

281 yield from symlinks 

282 

283 

284ST = TypeVar("ST") 

285T = TypeVar("T") 

286 

287 

288class AbstractYAMLSubStore(Generic[ST]): 

289 def __init__( 

290 self, 

291 parent_store: Any, 

292 parent_key: int | str | None, 

293 store: ST | None = None, 

294 ) -> None: 

295 if parent_store is not None and parent_key is not None: 

296 try: 

297 from_parent_store = parent_store[parent_key] 

298 except (KeyError, IndexError): 

299 from_parent_store = None 

300 if ( 300 ↛ 305line 300 didn't jump to line 305 because the condition on line 300 was never true

301 store is not None 

302 and from_parent_store is not None 

303 and store is not parent_store 

304 ): 

305 raise ValueError( 

306 "Store is provided but is not the one already in the parent store" 

307 ) 

308 if store is None: 308 ↛ 310line 308 didn't jump to line 310 because the condition on line 308 was always true

309 store = from_parent_store 

310 self._parent_store = parent_store 

311 self._parent_key = parent_key 

312 self._is_detached = ( 

313 parent_key is None or parent_store is None or parent_key not in parent_store 

314 ) 

315 assert self._is_detached or store is not None 

316 if store is None: 

317 store = self._create_new_instance() 

318 self._store: ST = store 

319 

320 def _create_new_instance(self) -> ST: 

321 raise NotImplementedError 

322 

323 def create_definition_if_missing(self) -> None: 

324 if self._is_detached: 

325 self.create_definition() 

326 

327 def create_definition(self) -> None: 

328 if not self._is_detached: 328 ↛ 329line 328 didn't jump to line 329 because the condition on line 328 was never true

329 raise RuntimeError("Definition is already present") 

330 parent_store = self._parent_store 

331 if parent_store is None: 331 ↛ 332line 331 didn't jump to line 332 because the condition on line 331 was never true

332 raise RuntimeError( 

333 f"Definition is not attached to any parent!? ({self.__class__.__name__})" 

334 ) 

335 if isinstance(parent_store, list): 

336 assert self._parent_key is None 

337 self._parent_key = len(parent_store) 

338 self._parent_store.append(self._store) 

339 else: 

340 parent_store[self._parent_key] = self._store 

341 self._is_detached = False 

342 

343 def remove_definition(self) -> None: 

344 self._ensure_attached() 

345 del self._parent_store[self._parent_key] 

346 if isinstance(self._parent_store, list): 

347 self._parent_key = None 

348 self._is_detached = True 

349 

350 def _ensure_attached(self) -> None: 

351 if self._is_detached: 

352 raise RuntimeError("The definition has been removed!") 

353 

354 

355class AbstractYAMLListSubStore(Generic[T], AbstractYAMLSubStore[list[T]]): 

356 def _create_new_instance(self) -> list[T]: 

357 return CommentedSeq() 

358 

359 

360class AbstractYAMLDictSubStore(Generic[T], AbstractYAMLSubStore[dict[str, T]]): 

361 def _create_new_instance(self) -> dict[str, T]: 

362 return CommentedMap() 

363 

364 

365class MutableCondition: 

366 @classmethod 

367 def arch_matches(cls, arch_filter: str) -> CommentedMap: 

368 return CommentedMap({MK_CONDITION_ARCH_MATCHES: arch_filter}) 

369 

370 @classmethod 

371 def build_profiles_matches(cls, build_profiles_matches: str) -> CommentedMap: 

372 return CommentedMap( 

373 {MK_CONDITION_BUILD_PROFILES_MATCHES: build_profiles_matches} 

374 ) 

375 

376 

377class MutableYAMLSymlink(AbstractYAMLDictSubStore[Any]): 

378 @classmethod 

379 def new_symlink( 

380 cls, link_path: str, link_target: str, condition: Any | None 

381 ) -> "MutableYAMLSymlink": 

382 inner = { 

383 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH: link_path, 

384 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET: link_target, 

385 } 

386 content = {MK_TRANSFORMATIONS_CREATE_SYMLINK: inner} 

387 if condition is not None: 387 ↛ 388line 387 didn't jump to line 388 because the condition on line 387 was never true

388 inner["when"] = condition 

389 return cls(None, None, store=CommentedMap(content)) 

390 

391 @property 

392 def symlink_path(self) -> str: 

393 return self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][ 

394 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH 

395 ] 

396 

397 @symlink_path.setter 

398 def symlink_path(self, path: str) -> None: 

399 self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][ 

400 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH 

401 ] = path 

402 

403 @property 

404 def symlink_target(self) -> str | None: 

405 return self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][ 

406 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET 

407 ] 

408 

409 @symlink_target.setter 

410 def symlink_target(self, target: str) -> None: 

411 self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][ 

412 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET 

413 ] = target 

414 

415 

416class MutableYAMLConffileManagementItem(AbstractYAMLDictSubStore[Any]): 

417 @classmethod 

418 def rm_conffile( 

419 cls, 

420 conffile: str, 

421 prior_to_version: str | None, 

422 owning_package: str | None, 

423 ) -> "MutableYAMLConffileManagementItem": 

424 r = cls( 

425 None, 

426 None, 

427 store=CommentedMap( 

428 { 

429 MK_CONFFILE_MANAGEMENT_REMOVE: CommentedMap( 

430 {MK_CONFFILE_MANAGEMENT_REMOVE_PATH: conffile} 

431 ) 

432 } 

433 ), 

434 ) 

435 r.prior_to_version = prior_to_version 

436 r.owning_package = owning_package 

437 return r 

438 

439 @classmethod 

440 def mv_conffile( 

441 cls, 

442 old_conffile: str, 

443 new_conffile: str, 

444 prior_to_version: str | None, 

445 owning_package: str | None, 

446 ) -> "MutableYAMLConffileManagementItem": 

447 r = cls( 

448 None, 

449 None, 

450 store=CommentedMap( 

451 { 

452 MK_CONFFILE_MANAGEMENT_RENAME: CommentedMap( 

453 { 

454 MK_CONFFILE_MANAGEMENT_RENAME_SOURCE: old_conffile, 

455 MK_CONFFILE_MANAGEMENT_RENAME_TARGET: new_conffile, 

456 } 

457 ) 

458 } 

459 ), 

460 ) 

461 r.prior_to_version = prior_to_version 

462 r.owning_package = owning_package 

463 return r 

464 

465 @property 

466 def _container(self) -> dict[str, Any]: 

467 assert len(self._store) == 1 

468 return next(iter(self._store.values())) 

469 

470 @property 

471 def command(self) -> str: 

472 assert len(self._store) == 1 

473 return next(iter(self._store)) 

474 

475 @property 

476 def obsolete_conffile(self) -> str: 

477 if self.command == MK_CONFFILE_MANAGEMENT_REMOVE: 

478 return self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH] 

479 assert self.command == MK_CONFFILE_MANAGEMENT_RENAME 

480 return self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE] 

481 

482 @obsolete_conffile.setter 

483 def obsolete_conffile(self, value: str) -> None: 

484 if self.command == MK_CONFFILE_MANAGEMENT_REMOVE: 

485 self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH] = value 

486 else: 

487 assert self.command == MK_CONFFILE_MANAGEMENT_RENAME 

488 self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE] = value 

489 

490 @property 

491 def new_conffile(self) -> str: 

492 if self.command != MK_CONFFILE_MANAGEMENT_RENAME: 

493 raise TypeError( 

494 f"The new_conffile attribute is only applicable to command {MK_CONFFILE_MANAGEMENT_RENAME}." 

495 f" This is a {self.command}" 

496 ) 

497 return self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET] 

498 

499 @new_conffile.setter 

500 def new_conffile(self, value: str) -> None: 

501 if self.command != MK_CONFFILE_MANAGEMENT_RENAME: 

502 raise TypeError( 

503 f"The new_conffile attribute is only applicable to command {MK_CONFFILE_MANAGEMENT_RENAME}." 

504 f" This is a {self.command}" 

505 ) 

506 self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET] = value 

507 

508 @property 

509 def prior_to_version(self) -> str | None: 

510 return self._container.get(MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION) 

511 

512 @prior_to_version.setter 

513 def prior_to_version(self, value: str | None) -> None: 

514 if value is None: 

515 try: 

516 del self._container[MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION] 

517 except KeyError: 

518 pass 

519 else: 

520 self._container[MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION] = value 

521 

522 @property 

523 def owning_package(self) -> str | None: 

524 return self._container[MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION] 

525 

526 @owning_package.setter 

527 def owning_package(self, value: str | None) -> None: 

528 if value is None: 

529 try: 

530 del self._container[MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE] 

531 except KeyError: 

532 pass 

533 else: 

534 self._container[MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE] = value 

535 

536 

537class MutableYAMLPackageDefinition(AbstractYAMLDictSubStore): 

538 def _list_store( 

539 self, key, *, create_if_absent: bool = False 

540 ) -> list[dict[str, Any]] | None: 

541 if self._is_detached or key not in self._store: 

542 if create_if_absent: 542 ↛ 543line 542 didn't jump to line 543 because the condition on line 542 was never true

543 return None 

544 self.create_definition_if_missing() 

545 self._store[key] = [] 

546 return self._store[key] 

547 

548 def _insert_item(self, key: str, item: AbstractYAMLDictSubStore) -> None: 

549 parent_store = self._list_store(key, create_if_absent=True) 

550 assert parent_store is not None 

551 if not item._is_detached or ( 551 ↛ 554line 551 didn't jump to line 554 because the condition on line 551 was never true

552 item._parent_store is not None and item._parent_store is not parent_store 

553 ): 

554 raise RuntimeError( 

555 "Item is already attached or associated with a different container" 

556 ) 

557 item._parent_store = parent_store 

558 item.create_definition() 

559 

560 def add_symlink(self, symlink: MutableYAMLSymlink) -> None: 

561 self._insert_item(MK_TRANSFORMATIONS, symlink) 

562 

563 def symlinks(self) -> Iterable[MutableYAMLSymlink]: 

564 store = self._list_store(MK_TRANSFORMATIONS) 

565 if store is None: 565 ↛ 566line 565 didn't jump to line 566 because the condition on line 565 was never true

566 return 

567 for i in range(len(store)): 567 ↛ 568line 567 didn't jump to line 568 because the loop on line 567 never started

568 d = store[i] 

569 if d and isinstance(d, dict) and len(d) == 1 and "symlink" in d: 

570 yield MutableYAMLSymlink(store, i) 

571 

572 def conffile_management_items(self) -> Iterable[MutableYAMLConffileManagementItem]: 

573 store = self._list_store(MK_CONFFILE_MANAGEMENT) 

574 if store is None: 574 ↛ 575line 574 didn't jump to line 575 because the condition on line 574 was never true

575 return 

576 yield from ( 

577 MutableYAMLConffileManagementItem(store, i) for i in range(len(store)) 

578 ) 

579 

580 def add_conffile_management( 

581 self, conffile_management_item: MutableYAMLConffileManagementItem 

582 ) -> None: 

583 self._insert_item(MK_CONFFILE_MANAGEMENT, conffile_management_item) 

584 

585 

586class AbstractMutableYAMLInstallRule(AbstractYAMLDictSubStore): 

587 @property 

588 def _container(self) -> dict[str, Any]: 

589 assert len(self._store) == 1 

590 return next(iter(self._store.values())) 

591 

592 @property 

593 def into(self) -> list[str] | None: 

594 v = self._container[MK_INSTALLATIONS_INSTALL_INTO] 

595 if v is None: 

596 return None 

597 if isinstance(v, str): 

598 return [v] 

599 return v 

600 

601 @into.setter 

602 def into(self, new_value: str | list[str] | None) -> None: 

603 if new_value is None: 603 ↛ 607line 603 didn't jump to line 607 because the condition on line 603 was always true

604 with suppress(KeyError): 

605 del self._container[MK_INSTALLATIONS_INSTALL_INTO] 

606 return 

607 if isinstance(new_value, str): 

608 self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_value 

609 return 

610 new_list = CommentedSeq(new_value) 

611 self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_list 

612 

613 @property 

614 def when(self) -> str | Mapping[str, Any] | None: 

615 return self._container[MK_CONDITION_WHEN] 

616 

617 @when.setter 

618 def when(self, new_value: str | Mapping[str, Any] | None) -> None: 

619 if new_value is None: 619 ↛ 620line 619 didn't jump to line 620 because the condition on line 619 was never true

620 with suppress(KeyError): 

621 del self._container[MK_CONDITION_WHEN] 

622 return 

623 if isinstance(new_value, str): 623 ↛ 624line 623 didn't jump to line 624 because the condition on line 623 was never true

624 self._container[MK_CONDITION_WHEN] = new_value 

625 return 

626 new_map = CommentedMap(new_value) 

627 self._container[MK_CONDITION_WHEN] = new_map 

628 

629 @classmethod 

630 def install_dest( 

631 cls, 

632 sources: str | list[str], 

633 into: str | list[str] | None, 

634 *, 

635 dest_dir: str | None = None, 

636 when: str | Mapping[str, Any] | None = None, 

637 ) -> "MutableYAMLInstallRuleInstall": 

638 k = MK_INSTALLATIONS_INSTALL_SOURCES 

639 if isinstance(sources, str): 

640 k = MK_INSTALLATIONS_INSTALL_SOURCE 

641 r = MutableYAMLInstallRuleInstall( 

642 None, 

643 None, 

644 store=CommentedMap( 

645 { 

646 MK_INSTALLATIONS_INSTALL: CommentedMap( 

647 { 

648 k: sources, 

649 } 

650 ) 

651 } 

652 ), 

653 ) 

654 r.dest_dir = dest_dir 

655 r.into = into 

656 if when is not None: 

657 r.when = when 

658 return r 

659 

660 @classmethod 

661 def multi_dest_install( 

662 cls, 

663 sources: str | list[str], 

664 dest_dirs: Sequence[str], 

665 into: str | list[str] | None, 

666 *, 

667 when: str | Mapping[str, Any] | None = None, 

668 ) -> "MutableYAMLInstallRuleInstall": 

669 k = MK_INSTALLATIONS_INSTALL_SOURCES 

670 if isinstance(sources, str): 670 ↛ 672line 670 didn't jump to line 672 because the condition on line 670 was always true

671 k = MK_INSTALLATIONS_INSTALL_SOURCE 

672 r = MutableYAMLInstallRuleInstall( 

673 None, 

674 None, 

675 store=CommentedMap( 

676 { 

677 MK_INSTALLATIONS_MULTI_DEST_INSTALL: CommentedMap( 

678 { 

679 k: sources, 

680 "dest-dirs": dest_dirs, 

681 } 

682 ) 

683 } 

684 ), 

685 ) 

686 r.into = into 

687 if when is not None: 687 ↛ 688line 687 didn't jump to line 688 because the condition on line 687 was never true

688 r.when = when 

689 return r 

690 

691 @classmethod 

692 def install_as( 

693 cls, 

694 source: str, 

695 install_as: str, 

696 into: str | list[str] | None, 

697 when: str | Mapping[str, Any] | None = None, 

698 ) -> "MutableYAMLInstallRuleInstall": 

699 r = MutableYAMLInstallRuleInstall( 

700 None, 

701 None, 

702 store=CommentedMap( 

703 { 

704 MK_INSTALLATIONS_INSTALL: CommentedMap( 

705 { 

706 MK_INSTALLATIONS_INSTALL_SOURCE: source, 

707 MK_INSTALLATIONS_INSTALL_AS: install_as, 

708 } 

709 ) 

710 } 

711 ), 

712 ) 

713 r.into = into 

714 if when is not None: 714 ↛ 715line 714 didn't jump to line 715 because the condition on line 714 was never true

715 r.when = when 

716 return r 

717 

718 @classmethod 

719 def install_doc_as( 

720 cls, 

721 source: str, 

722 install_as: str, 

723 into: str | list[str] | None, 

724 when: str | Mapping[str, Any] | None = None, 

725 ) -> "MutableYAMLInstallRuleInstall": 

726 r = MutableYAMLInstallRuleInstall( 

727 None, 

728 None, 

729 store=CommentedMap( 

730 { 

731 MK_INSTALLATIONS_INSTALL_DOCS: CommentedMap( 

732 { 

733 MK_INSTALLATIONS_INSTALL_SOURCE: source, 

734 MK_INSTALLATIONS_INSTALL_AS: install_as, 

735 } 

736 ) 

737 } 

738 ), 

739 ) 

740 r.into = into 

741 if when is not None: 

742 r.when = when 

743 return r 

744 

745 @classmethod 

746 def install_docs( 

747 cls, 

748 sources: str | list[str], 

749 into: str | list[str] | None, 

750 *, 

751 dest_dir: str | None = None, 

752 when: str | Mapping[str, Any] | None = None, 

753 ) -> "MutableYAMLInstallRuleInstall": 

754 k = MK_INSTALLATIONS_INSTALL_SOURCES 

755 if isinstance(sources, str): 

756 k = MK_INSTALLATIONS_INSTALL_SOURCE 

757 r = MutableYAMLInstallRuleInstall( 

758 None, 

759 None, 

760 store=CommentedMap( 

761 { 

762 MK_INSTALLATIONS_INSTALL_DOCS: CommentedMap( 

763 { 

764 k: sources, 

765 } 

766 ) 

767 } 

768 ), 

769 ) 

770 r.into = into 

771 r.dest_dir = dest_dir 

772 if when is not None: 

773 r.when = when 

774 return r 

775 

776 @classmethod 

777 def install_examples( 

778 cls, 

779 sources: str | list[str], 

780 into: str | list[str] | None, 

781 when: str | Mapping[str, Any] | None = None, 

782 ) -> "MutableYAMLInstallRuleInstallExamples": 

783 k = MK_INSTALLATIONS_INSTALL_SOURCES 

784 if isinstance(sources, str): 

785 k = MK_INSTALLATIONS_INSTALL_SOURCE 

786 r = MutableYAMLInstallRuleInstallExamples( 

787 None, 

788 None, 

789 store=CommentedMap( 

790 { 

791 MK_INSTALLATIONS_INSTALL_EXAMPLES: CommentedMap( 

792 { 

793 k: sources, 

794 } 

795 ) 

796 } 

797 ), 

798 ) 

799 r.into = into 

800 if when is not None: 800 ↛ 801line 800 didn't jump to line 801 because the condition on line 800 was never true

801 r.when = when 

802 return r 

803 

804 @classmethod 

805 def install_man( 

806 cls, 

807 sources: str | list[str], 

808 into: str | list[str] | None, 

809 language: str | None, 

810 when: str | Mapping[str, Any] | None = None, 

811 ) -> "MutableYAMLInstallRuleMan": 

812 k = MK_INSTALLATIONS_INSTALL_SOURCES 

813 if isinstance(sources, str): 813 ↛ 814line 813 didn't jump to line 814 because the condition on line 813 was never true

814 k = MK_INSTALLATIONS_INSTALL_SOURCE 

815 r = MutableYAMLInstallRuleMan( 

816 None, 

817 None, 

818 store=CommentedMap( 

819 { 

820 MK_INSTALLATIONS_INSTALL_MAN: CommentedMap( 

821 { 

822 k: sources, 

823 } 

824 ) 

825 } 

826 ), 

827 ) 

828 r.language = language 

829 r.into = into 

830 if when is not None: 830 ↛ 831line 830 didn't jump to line 831 because the condition on line 830 was never true

831 r.when = when 

832 return r 

833 

834 @classmethod 

835 def discard( 

836 cls, 

837 sources: str | list[str], 

838 ) -> "MutableYAMLInstallRuleDiscard": 

839 return MutableYAMLInstallRuleDiscard( 

840 None, 

841 None, 

842 store=CommentedMap({MK_INSTALLATIONS_DISCARD: sources}), 

843 ) 

844 

845 

846class MutableYAMLInstallRuleInstallExamples(AbstractMutableYAMLInstallRule): 

847 pass 

848 

849 

850class MutableYAMLInstallRuleMan(AbstractMutableYAMLInstallRule): 

851 @property 

852 def language(self) -> str | None: 

853 return self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE] 

854 

855 @language.setter 

856 def language(self, new_value: str | None) -> None: 

857 if new_value is not None: 

858 self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE] = new_value 

859 return 

860 with suppress(KeyError): 

861 del self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE] 

862 

863 

864class MutableYAMLInstallRuleDiscard(AbstractMutableYAMLInstallRule): 

865 pass 

866 

867 

868class MutableYAMLInstallRuleInstall(AbstractMutableYAMLInstallRule): 

869 @property 

870 def sources(self) -> list[str]: 

871 v = self._container[MK_INSTALLATIONS_INSTALL_SOURCES] 

872 if isinstance(v, str): 

873 return [v] 

874 return v 

875 

876 @sources.setter 

877 def sources(self, new_value: str | list[str]) -> None: 

878 if isinstance(new_value, str): 

879 self._container[MK_INSTALLATIONS_INSTALL_SOURCES] = new_value 

880 return 

881 new_list = CommentedSeq(new_value) 

882 self._container[MK_INSTALLATIONS_INSTALL_SOURCES] = new_list 

883 

884 @property 

885 def dest_dir(self) -> str | None: 

886 return self._container.get(MK_INSTALLATIONS_INSTALL_DEST_DIR) 

887 

888 @dest_dir.setter 

889 def dest_dir(self, new_value: str | None) -> None: 

890 if new_value is not None and self.dest_as is not None: 890 ↛ 891line 890 didn't jump to line 891 because the condition on line 890 was never true

891 raise ValueError( 

892 f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and' 

893 f' "{MK_INSTALLATIONS_INSTALL_AS}"' 

894 ) 

895 if new_value is not None: 

896 self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR] = new_value 

897 else: 

898 with suppress(KeyError): 

899 del self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR] 

900 

901 @property 

902 def dest_as(self) -> str | None: 

903 return self._container.get(MK_INSTALLATIONS_INSTALL_AS) 

904 

905 @dest_as.setter 

906 def dest_as(self, new_value: str | None) -> None: 

907 if new_value is not None: 

908 if self.dest_dir is not None: 

909 raise ValueError( 

910 f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and' 

911 f' "{MK_INSTALLATIONS_INSTALL_AS}"' 

912 ) 

913 

914 sources = self._container[MK_INSTALLATIONS_INSTALL_SOURCES] 

915 if isinstance(sources, list): 

916 if len(sources) != 1: 

917 raise ValueError( 

918 f'Cannot have "{MK_INSTALLATIONS_INSTALL_AS}" when' 

919 f' "{MK_INSTALLATIONS_INSTALL_SOURCES}" is not exactly one item' 

920 ) 

921 self.sources = sources[0] 

922 self._container[MK_INSTALLATIONS_INSTALL_AS] = new_value 

923 else: 

924 with suppress(KeyError): 

925 del self._container[MK_INSTALLATIONS_INSTALL_AS] 

926 

927 

928class MutableYAMLInstallationsDefinition(AbstractYAMLListSubStore[Any]): 

929 def append(self, install_rule: AbstractMutableYAMLInstallRule) -> None: 

930 parent_store = self._store 

931 if not install_rule._is_detached or ( 931 ↛ 935line 931 didn't jump to line 935 because the condition on line 931 was never true

932 install_rule._parent_store is not None 

933 and install_rule._parent_store is not parent_store 

934 ): 

935 raise RuntimeError( 

936 "Item is already attached or associated with a different container" 

937 ) 

938 self.create_definition_if_missing() 

939 install_rule._parent_store = parent_store 

940 install_rule.create_definition() 

941 

942 def extend(self, install_rules: Iterable[AbstractMutableYAMLInstallRule]) -> None: 

943 parent_store = self._store 

944 for install_rule in install_rules: 

945 if not install_rule._is_detached or ( 945 ↛ 949line 945 didn't jump to line 949 because the condition on line 945 was never true

946 install_rule._parent_store is not None 

947 and install_rule._parent_store is not parent_store 

948 ): 

949 raise RuntimeError( 

950 "Item is already attached or associated with a different container" 

951 ) 

952 self.create_definition_if_missing() 

953 install_rule._parent_store = parent_store 

954 install_rule.create_definition() 

955 

956 

957class MutableYAMLManifestVariables(AbstractYAMLDictSubStore): 

958 @property 

959 def variables(self) -> dict[str, Any]: 

960 return self._store 

961 

962 def __setitem__(self, key: str, value: Any) -> None: 

963 self._store[key] = value 

964 self.create_definition_if_missing() 

965 

966 

967class MutableYAMLManifestDefinitions(AbstractYAMLDictSubStore): 

968 def manifest_variables( 

969 self, *, create_if_absent: bool = True 

970 ) -> MutableYAMLManifestVariables: 

971 d = MutableYAMLManifestVariables(self._store, MK_MANIFEST_VARIABLES) 

972 if create_if_absent: 972 ↛ 973line 972 didn't jump to line 973 because the condition on line 972 was never true

973 d.create_definition_if_missing() 

974 return d 

975 

976 

977class MutableYAMLRemoveDuringCleanDefinitions(AbstractYAMLListSubStore[str]): 

978 def append(self, rule: str) -> None: 

979 self.create_definition_if_missing() 

980 self._store.append(rule) 

981 

982 def __len__(self) -> int: 

983 return len(self._store) 

984 

985 def extend(self, rules: Iterable[str]) -> None: 

986 it = iter(rules) 

987 try: 

988 first_rule = next(it) 

989 except StopIteration: 

990 return 

991 self.create_definition_if_missing() 

992 self._store.append(first_rule) 

993 self._store.extend(it) 

994 

995 

996class MutableYAMLManifest: 

997 def __init__(self, store: Any) -> None: 

998 self._store = store 

999 

1000 @classmethod 

1001 def empty_manifest(cls) -> "MutableYAMLManifest": 

1002 return cls(CommentedMap({MK_MANIFEST_VERSION: DEFAULT_MANIFEST_VERSION})) 

1003 

1004 @property 

1005 def manifest_version(self) -> str: 

1006 return self._store[MK_MANIFEST_VERSION] 

1007 

1008 @manifest_version.setter 

1009 def manifest_version(self, version: str) -> None: 

1010 if version not in SUPPORTED_MANIFEST_VERSIONS: 

1011 raise ValueError("Unsupported version") 

1012 self._store[MK_MANIFEST_VERSION] = version 

1013 

1014 def remove_during_clean( 

1015 self, 

1016 *, 

1017 create_if_absent: bool = True, 

1018 ) -> MutableYAMLRemoveDuringCleanDefinitions: 

1019 d = MutableYAMLRemoveDuringCleanDefinitions( 

1020 self._store, MK_MANIFEST_REMOVE_DURING_CLEAN 

1021 ) 

1022 if create_if_absent: 1022 ↛ 1023line 1022 didn't jump to line 1023 because the condition on line 1022 was never true

1023 d.create_definition_if_missing() 

1024 return d 

1025 

1026 def installations( 

1027 self, 

1028 *, 

1029 create_if_absent: bool = True, 

1030 ) -> MutableYAMLInstallationsDefinition: 

1031 d = MutableYAMLInstallationsDefinition(self._store, MK_INSTALLATIONS) 

1032 if create_if_absent: 1032 ↛ 1033line 1032 didn't jump to line 1033 because the condition on line 1032 was never true

1033 d.create_definition_if_missing() 

1034 return d 

1035 

1036 def manifest_definitions( 

1037 self, 

1038 *, 

1039 create_if_absent: bool = True, 

1040 ) -> MutableYAMLManifestDefinitions: 

1041 d = MutableYAMLManifestDefinitions(self._store, MK_MANIFEST_DEFINITIONS) 

1042 if create_if_absent: 1042 ↛ 1043line 1042 didn't jump to line 1043 because the condition on line 1042 was never true

1043 d.create_definition_if_missing() 

1044 return d 

1045 

1046 def package( 

1047 self, name: str, *, create_if_absent: bool = True 

1048 ) -> MutableYAMLPackageDefinition: 

1049 if MK_PACKAGES not in self._store: 1049 ↛ 1051line 1049 didn't jump to line 1051 because the condition on line 1049 was always true

1050 self._store[MK_PACKAGES] = CommentedMap() 

1051 packages_store = self._store[MK_PACKAGES] 

1052 package = packages_store.get(name) 

1053 if package is None: 1053 ↛ 1060line 1053 didn't jump to line 1060 because the condition on line 1053 was always true

1054 if not create_if_absent: 1054 ↛ 1055line 1054 didn't jump to line 1055 because the condition on line 1054 was never true

1055 raise KeyError(name) 

1056 assert packages_store is not None 

1057 d = MutableYAMLPackageDefinition(packages_store, name) 

1058 d.create_definition() 

1059 else: 

1060 d = MutableYAMLPackageDefinition(packages_store, name) 

1061 return d 

1062 

1063 def write_to(self, fd) -> None: 

1064 MANIFEST_YAML.dump(self._store, fd) 

1065 

1066 

1067def _describe_missing_path(entry: VirtualPath) -> str: 

1068 if entry.is_dir: 

1069 return f"{entry.fs_path}/ (empty directory; possible integration point)" 

1070 if entry.is_symlink: 

1071 target = os.readlink(entry.fs_path) 

1072 return f"{entry.fs_path} (symlink; links to {target})" 

1073 if entry.is_file: 

1074 return f"{entry.fs_path} (file)" 

1075 return f"{entry.fs_path} (other!? Probably not supported by debputy and may need a `remove`)" 

1076 

1077 

1078def _detect_missing_installations( 

1079 path_matcher: SourcePathMatcher, 

1080 search_dir: VirtualPath, 

1081) -> None: 

1082 if not search_dir.is_dir: 1082 ↛ 1083line 1082 didn't jump to line 1083 because the condition on line 1082 was never true

1083 return 

1084 missing = list(path_matcher.detect_missing(search_dir)) 

1085 if not missing: 

1086 return 

1087 

1088 excl = textwrap.dedent("""\ 

1089 - discard: "*" 

1090 """) 

1091 

1092 raise PathNotCoveredByInstallRulesError( 

1093 "Please review the list and add either install rules or exclusions to `installations` in" 

1094 " debian/debputy.manifest. If you do not need any of these paths, add the following to the" 

1095 f" end of your 'installations`:\n\n{excl}\n", 

1096 missing, 

1097 search_dir, 

1098 ) 

1099 

1100 

1101def _list_automatic_discard_rules(path_matcher: SourcePathMatcher) -> None: 

1102 used_discard_rules = path_matcher.used_auto_discard_rules 

1103 # Discard rules can match and then be overridden. In that case, they appear 

1104 # but have 0 matches. 

1105 if not sum((len(v) for v in used_discard_rules.values()), 0): 

1106 return 

1107 _info("The following automatic discard rules were triggered:") 

1108 example_path: str | None = None 

1109 for rule in sorted(used_discard_rules): 

1110 for fs_path in sorted(used_discard_rules[rule]): 

1111 if example_path is None: 1111 ↛ 1113line 1111 didn't jump to line 1113 because the condition on line 1111 was always true

1112 example_path = fs_path 

1113 _info(f" * {rule} -> {fs_path}") 

1114 assert example_path is not None 

1115 _info("") 

1116 _info( 

1117 "Note that some of these may have been overruled. The overrule detection logic is not" 

1118 ) 

1119 _info("100% reliable.") 

1120 _info("") 

1121 _info( 

1122 "You can overrule an automatic discard rule by explicitly listing the path. As an example:" 

1123 ) 

1124 _info(" installations:") 

1125 _info(" - install:") 

1126 _info(f" source: {example_path}") 

1127 

1128 

1129def _install_everything_from_source_dir_if_present( 

1130 dctrl_bin: BinaryPackage, 

1131 substitution: Substitution, 

1132 path_matcher: SourcePathMatcher, 

1133 install_rule_context: InstallRuleContext, 

1134 source_condition_context: ConditionContext, 

1135 source_dir: VirtualPath, 

1136 *, 

1137 into_dir: VirtualPath | None = None, 

1138) -> None: 

1139 attribute_path = AttributePath.builtin_path()[f"installing {source_dir.fs_path}"] 

1140 pkg_set = frozenset([dctrl_bin]) 

1141 install_rule = run_in_context_of_plugin( 

1142 "debputy", 

1143 InstallRule.install_dest, 

1144 [FileSystemMatchRule.from_path_match("*", attribute_path, substitution)], 

1145 None, 

1146 pkg_set, 

1147 f"Built-in; install everything from {source_dir.fs_path} into {dctrl_bin.name}", 

1148 None, 

1149 ) 

1150 pkg_search_dir: tuple[SearchDir] = ( 

1151 SearchDir( 

1152 source_dir, 

1153 pkg_set, 

1154 ), 

1155 ) 

1156 replacements = { 

1157 "search_dirs": pkg_search_dir, 

1158 } 

1159 if into_dir is not None: 1159 ↛ 1160line 1159 didn't jump to line 1160 because the condition on line 1159 was never true

1160 binary_package_contexts = dict(install_rule_context.binary_package_contexts) 

1161 updated = binary_package_contexts[dctrl_bin.name].replace(fs_root=into_dir) 

1162 binary_package_contexts[dctrl_bin.name] = updated 

1163 replacements["binary_package_contexts"] = binary_package_contexts 

1164 

1165 fake_install_rule_context = install_rule_context.replace(**replacements) 

1166 try: 

1167 install_rule.perform_install( 

1168 path_matcher, 

1169 fake_install_rule_context, 

1170 source_condition_context, 

1171 ) 

1172 except ( 

1173 NoMatchForInstallPatternError, 

1174 PathAlreadyInstalledOrDiscardedError, 

1175 ): 

1176 # Empty directory or everything excluded by default; ignore the error 

1177 pass 

1178 

1179 

1180def _add_build_install_dirs_to_per_package_search_dirs( 

1181 build_system_install_dirs: Sequence[tuple[str, frozenset[BinaryPackage]]], 

1182 per_package_search_dirs: dict[BinaryPackage, list[VirtualPath]], 

1183 as_path: Callable[[str], VirtualPath], 

1184) -> None: 

1185 seen_pp_search_dirs: set[tuple[BinaryPackage, str]] = set() 

1186 for dest_dir, for_packages in build_system_install_dirs: 

1187 dest_path = as_path(dest_dir) 

1188 for pkg in for_packages: 

1189 seen_key = (pkg, dest_dir) 

1190 if seen_key in seen_pp_search_dirs: 

1191 continue 

1192 seen_pp_search_dirs.add(seen_key) 

1193 if pkg not in per_package_search_dirs: 

1194 per_package_search_dirs[pkg] = [dest_path] 

1195 else: 

1196 per_package_search_dirs[pkg].append(dest_path) 

1197 

1198 

1199class HighLevelManifest: 

1200 def __init__( 

1201 self, 

1202 manifest_path: str, 

1203 mutable_manifest: MutableYAMLManifest | None, 

1204 remove_during_clean_rules: list[FileSystemMatchRule], 

1205 install_rules: list[InstallRule] | None, 

1206 source_package: SourcePackage, 

1207 binary_packages: Mapping[str, BinaryPackage], 

1208 substitution: Substitution, 

1209 package_transformations: Mapping[str, PackageTransformationDefinition], 

1210 dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable, 

1211 dpkg_arch_query_table: DpkgArchTable, 

1212 build_env: DebBuildOptionsAndProfiles, 

1213 build_environments: BuildEnvironments, 

1214 build_rules: list[BuildRule] | None, 

1215 value_table: Mapping[ 

1216 tuple[SourcePackage | BinaryPackage, type[Any]], 

1217 Any, 

1218 ], 

1219 plugin_provided_feature_set: PluginProvidedFeatureSet, 

1220 debian_dir: VirtualPath, 

1221 ) -> None: 

1222 self.manifest_path = manifest_path 

1223 self.mutable_manifest = mutable_manifest 

1224 self._remove_during_clean_rules: list[FileSystemMatchRule] = ( 

1225 remove_during_clean_rules 

1226 ) 

1227 self._install_rules = install_rules 

1228 self.source_package = source_package 

1229 self._binary_packages = binary_packages 

1230 self.substitution = substitution 

1231 self.package_transformations = package_transformations 

1232 self._dpkg_architecture_variables = dpkg_architecture_variables 

1233 self.dpkg_arch_query_table = dpkg_arch_query_table 

1234 self._build_env = build_env 

1235 self._used_for: set[str] = set() 

1236 self.build_environments = build_environments 

1237 self.build_rules = build_rules 

1238 self._value_table = value_table 

1239 self._plugin_provided_feature_set = plugin_provided_feature_set 

1240 self._debian_dir = debian_dir 

1241 self._source_condition_context = ConditionContext( 

1242 binary_package=None, 

1243 substitution=self.substitution, 

1244 deb_options_and_profiles=self._build_env, 

1245 dpkg_architecture_variables=self._dpkg_architecture_variables, 

1246 dpkg_arch_query_table=self.dpkg_arch_query_table, 

1247 ) 

1248 

1249 def source_version(self, include_binnmu_version: bool = True) -> str: 

1250 # TODO: There should an easier way to determine the source version; really. 

1251 version_var = "{{DEB_VERSION}}" 

1252 if not include_binnmu_version: 

1253 version_var = "{{_DEBPUTY_INTERNAL_NON_BINNMU_SOURCE}}" 

1254 try: 

1255 return self.substitution.substitute( 

1256 version_var, "internal (resolve version)" 

1257 ) 

1258 except DebputySubstitutionError as e: 

1259 raise AssertionError(f"Could not resolve {version_var}") from e 

1260 

1261 @property 

1262 def source_condition_context(self) -> ConditionContext: 

1263 return self._source_condition_context 

1264 

1265 @property 

1266 def debian_dir(self) -> VirtualPath: 

1267 return self._debian_dir 

1268 

1269 @property 

1270 def dpkg_architecture_variables(self) -> DpkgArchitectureBuildProcessValuesTable: 

1271 return self._dpkg_architecture_variables 

1272 

1273 @property 

1274 def deb_options_and_profiles(self) -> DebBuildOptionsAndProfiles: 

1275 return self._build_env 

1276 

1277 @property 

1278 def plugin_provided_feature_set(self) -> PluginProvidedFeatureSet: 

1279 return self._plugin_provided_feature_set 

1280 

1281 @property 

1282 def remove_during_clean_rules(self) -> list[FileSystemMatchRule]: 

1283 return self._remove_during_clean_rules 

1284 

1285 @property 

1286 def active_packages(self) -> Iterable[BinaryPackage]: 

1287 yield from (p for p in self._binary_packages.values() if p.should_be_acted_on) 

1288 

1289 @property 

1290 def all_packages(self) -> Iterable[BinaryPackage]: 

1291 yield from self._binary_packages.values() 

1292 

1293 def manifest_configuration[T]( 

1294 self, 

1295 context_package: SourcePackage | BinaryPackage, 

1296 value_type: type[T], 

1297 ) -> T | None: 

1298 res = self._value_table.get((context_package, value_type)) 

1299 return typing.cast("T | None", res) 

1300 

1301 def package_state_for(self, package: str) -> PackageTransformationDefinition: 

1302 return self.package_transformations[package] 

1303 

1304 def _detect_doc_main_package_for(self, package: BinaryPackage) -> BinaryPackage: 

1305 name = package.name 

1306 for doc_main_field in ("Doc-Main-Package", "X-Doc-Main-Package"): 

1307 doc_main_package_name = package.fields.get(doc_main_field) 

1308 if doc_main_package_name: 1308 ↛ 1309line 1308 didn't jump to line 1309 because the condition on line 1308 was never true

1309 main_package = self._binary_packages.get(doc_main_package_name) 

1310 if main_package is None: 

1311 _error( 

1312 f"Invalid Doc-Main-Package for {name}: The package {doc_main_package_name!r} is not listed in d/control" 

1313 ) 

1314 return main_package 

1315 # If it is not a -doc package, then docs should be installed 

1316 # under its own package name. 

1317 if not name.endswith("-doc"): 1317 ↛ 1319line 1317 didn't jump to line 1319 because the condition on line 1317 was always true

1318 return package 

1319 name = name[:-4] 

1320 main_package = self._binary_packages.get(name) 

1321 if main_package: 

1322 return main_package 

1323 if name.startswith("lib"): 

1324 dev_pkg = self._binary_packages.get(f"{name}-dev") 

1325 if dev_pkg: 

1326 return dev_pkg 

1327 

1328 # If we found no better match; default to the doc package itself. 

1329 return package 

1330 

1331 def perform_installations( 

1332 self, 

1333 integration_mode: DebputyIntegrationMode, 

1334 build_system_install_dirs: Sequence[tuple[str, frozenset[BinaryPackage]]], 

1335 *, 

1336 install_request_context: InstallSearchDirContext | None = None, 

1337 ) -> PackageDataTable: 

1338 package_data_dict = {} 

1339 package_data_table = PackageDataTable(package_data_dict) 

1340 enable_manifest_installation_feature = ( 

1341 integration_mode != INTEGRATION_MODE_DH_DEBPUTY_RRR 

1342 ) 

1343 

1344 if build_system_install_dirs: 1344 ↛ 1345line 1344 didn't jump to line 1345 because the condition on line 1344 was never true

1345 if integration_mode != INTEGRATION_MODE_FULL: 

1346 raise ValueError( 

1347 "The build_system_install_dirs parameter can only be used in full integration mode" 

1348 ) 

1349 if install_request_context: 

1350 raise ValueError( 

1351 "The build_system_install_dirs parameter cannot be used with install_request_context" 

1352 " (not implemented)" 

1353 ) 

1354 

1355 if install_request_context is None: 1355 ↛ 1357line 1355 didn't jump to line 1357 because the condition on line 1355 was never true

1356 

1357 @functools.lru_cache(None) 

1358 def _as_path(fs_path: str) -> VirtualPath: 

1359 return OSFSROOverlay.create_root_dir(".", fs_path) 

1360 

1361 dtmp_dir = _as_path("debian/tmp") 

1362 source_root_dir = _as_path(".") 

1363 into = frozenset(self._binary_packages.values()) 

1364 per_package_search_dirs = { 

1365 t.binary_package: [_as_path(f.match_rule.path) for f in t.search_dirs] 

1366 for t in self.package_transformations.values() 

1367 if t.search_dirs is not None 

1368 } 

1369 

1370 if integration_mode == INTEGRATION_MODE_FULL: 

1371 # In this mode, we have no default search dir. Everything ends up being 

1372 # per-package instead (since that is easier logic-wise). 

1373 # 

1374 # Even dtmp_dir is omitted here (since it is not universally applicable). 

1375 # Note we still initialize dtmp_dir, since it affects a later guard. 

1376 default_search_dirs = [] 

1377 _add_build_install_dirs_to_per_package_search_dirs( 

1378 build_system_install_dirs, 

1379 per_package_search_dirs, 

1380 _as_path, 

1381 ) 

1382 

1383 # We can end here with per_package_search_dirs having no search dirs for any package 

1384 # (single binary, where everything is installed into d/<pkg> is the most common case). 

1385 # 

1386 # This is not a problem in itself as the installation rules can still apply to the 

1387 # source root and there should be no reason to install something from d/<pkg> into 

1388 # d/<another-pkg> 

1389 else: 

1390 default_search_dirs = [dtmp_dir] 

1391 

1392 search_dirs = _determine_search_dir_order( 

1393 per_package_search_dirs, 

1394 into, 

1395 default_search_dirs, 

1396 source_root_dir, 

1397 ) 

1398 check_for_uninstalled_dirs = tuple( 

1399 s.search_dir 

1400 for s in search_dirs 

1401 if s.search_dir.fs_path != source_root_dir.fs_path 

1402 ) 

1403 if enable_manifest_installation_feature: 

1404 _present_installation_dirs( 

1405 search_dirs, check_for_uninstalled_dirs, into 

1406 ) 

1407 else: 

1408 dtmp_dir = None 

1409 search_dirs = install_request_context.search_dirs 

1410 into = frozenset(self._binary_packages.values()) 

1411 seen: set[BinaryPackage] = set() 

1412 for search_dir in search_dirs: 

1413 seen.update(search_dir.applies_to) 

1414 

1415 missing = into - seen 

1416 if missing: 1416 ↛ 1417line 1416 didn't jump to line 1417 because the condition on line 1416 was never true

1417 names = ", ".join(p.name for p in missing) 

1418 raise ValueError( 

1419 f"The following package(s) had no search dirs: {names}." 

1420 " (Generally, the source root would be applicable to all packages)" 

1421 ) 

1422 extra_names = seen - into 

1423 if extra_names: 1423 ↛ 1424line 1423 didn't jump to line 1424 because the condition on line 1423 was never true

1424 names = ", ".join(p.name for p in extra_names) 

1425 raise ValueError( 

1426 f"The install_request_context referenced the following unknown package(s): {names}" 

1427 ) 

1428 

1429 check_for_uninstalled_dirs = ( 

1430 install_request_context.check_for_uninstalled_dirs 

1431 ) 

1432 

1433 install_rule_context = InstallRuleContext(search_dirs) 

1434 

1435 if ( 1435 ↛ 1442line 1435 didn't jump to line 1442 because the condition on line 1435 was never true

1436 enable_manifest_installation_feature 

1437 and self._install_rules is None 

1438 # TODO: Should we also do this for full mode when build systems provided search dirs? 

1439 and dtmp_dir is not None 

1440 and os.path.isdir(dtmp_dir.fs_path) 

1441 ): 

1442 msg = ( 

1443 "The build system appears to have provided the output of upstream build system's" 

1444 " install in debian/tmp. However, these are no provisions for debputy to install" 

1445 " any of that into any of the debian packages listed in debian/control." 

1446 " To avoid accidentally creating empty packages, debputy will insist that you " 

1447 " explicitly define an empty installation definition if you did not want to " 

1448 " install any of those files even though they have been provided." 

1449 ' Example: "installations: []"' 

1450 ) 

1451 _error(msg) 

1452 elif ( 1452 ↛ 1455line 1452 didn't jump to line 1455 because the condition on line 1452 was never true

1453 not enable_manifest_installation_feature and self._install_rules is not None 

1454 ): 

1455 _error( 

1456 f"The `installations` feature cannot be used in {self.manifest_path} with this integration mode." 

1457 f" Please remove or comment out the `installations` keyword." 

1458 ) 

1459 

1460 for dctrl_bin in self.all_packages: 

1461 package = dctrl_bin.name 

1462 doc_main_package = self._detect_doc_main_package_for(dctrl_bin) 

1463 

1464 install_rule_context[package] = BinaryPackageInstallRuleContext( 

1465 dctrl_bin, 

1466 InMemoryVirtualRootDir(), 

1467 doc_main_package, 

1468 ) 

1469 

1470 if enable_manifest_installation_feature: 1470 ↛ 1475line 1470 didn't jump to line 1475 because the condition on line 1470 was always true

1471 discard_rules = list( 

1472 self.plugin_provided_feature_set.auto_discard_rules.values() 

1473 ) 

1474 else: 

1475 discard_rules = [ 

1476 self.plugin_provided_feature_set.auto_discard_rules["debian-dir"] 

1477 ] 

1478 path_matcher = SourcePathMatcher(discard_rules) 

1479 

1480 source_condition_context = self._source_condition_context 

1481 

1482 for dctrl_bin in self.active_packages: 

1483 package = dctrl_bin.name 

1484 if install_request_context: 1484 ↛ 1489line 1484 didn't jump to line 1489 because the condition on line 1484 was always true

1485 build_system_staging_dir = install_request_context.debian_pkg_dirs.get( 

1486 package 

1487 ) 

1488 else: 

1489 build_system_staging_dir_fs_path = os.path.join("debian", package) 

1490 if os.path.isdir(build_system_staging_dir_fs_path): 

1491 build_system_staging_dir = OSFSROOverlay.create_root_dir( 

1492 ".", 

1493 build_system_staging_dir_fs_path, 

1494 ) 

1495 else: 

1496 build_system_staging_dir = None 

1497 

1498 if build_system_staging_dir is not None: 

1499 _install_everything_from_source_dir_if_present( 

1500 dctrl_bin, 

1501 self.substitution, 

1502 path_matcher, 

1503 install_rule_context, 

1504 source_condition_context, 

1505 build_system_staging_dir, 

1506 ) 

1507 

1508 if self._install_rules: 

1509 # FIXME: Check that every install rule remains used after transformations have run. 

1510 # What we want to check is transformations do not exclude everything from an install 

1511 # rule. The hard part here is that renaming (etc.) is fine, so we cannot 1:1 string 

1512 # match. 

1513 for install_rule in self._install_rules: 

1514 install_rule.perform_install( 

1515 path_matcher, 

1516 install_rule_context, 

1517 source_condition_context, 

1518 ) 

1519 

1520 if enable_manifest_installation_feature: 1520 ↛ 1524line 1520 didn't jump to line 1524 because the condition on line 1520 was always true

1521 for search_dir in check_for_uninstalled_dirs: 

1522 _detect_missing_installations(path_matcher, search_dir) 

1523 

1524 for dctrl_bin in self.all_packages: 

1525 package = dctrl_bin.name 

1526 binary_install_rule_context = install_rule_context[package] 

1527 build_system_pkg_staging_dir = os.path.join("debian", package) 

1528 fs_root = binary_install_rule_context.fs_root 

1529 

1530 context = self.package_transformations[package] 

1531 if dctrl_bin.should_be_acted_on and enable_manifest_installation_feature: 

1532 for special_install_rule in context.install_rules: 1532 ↛ 1533line 1532 didn't jump to line 1533 because the loop on line 1532 never started

1533 special_install_rule.perform_install( 

1534 path_matcher, 

1535 install_rule_context, 

1536 source_condition_context, 

1537 ) 

1538 

1539 if dctrl_bin.should_be_acted_on: 

1540 self.apply_fs_transformations(package, fs_root) 

1541 substvars_file = f"debian/{package}.substvars" 

1542 substvars = FlushableSubstvars.load_from_path( 

1543 substvars_file, missing_ok=True 

1544 ) 

1545 # We do not want to touch the substvars file (non-clean rebuild contamination) 

1546 substvars.substvars_path = None 

1547 else: 

1548 substvars = FlushableSubstvars() 

1549 

1550 udeb_package = self._binary_packages.get(f"{package}-udeb") 

1551 if udeb_package and not udeb_package.is_udeb: 1551 ↛ 1552line 1551 didn't jump to line 1552 because the condition on line 1551 was never true

1552 udeb_package = None 

1553 

1554 package_metadata_context = PackageProcessingContextProvider( 

1555 self, 

1556 dctrl_bin, 

1557 udeb_package, 

1558 package_data_table, 

1559 ) 

1560 

1561 ctrl_creator = BinaryCtrlAccessorProviderCreator( 

1562 package_metadata_context, 

1563 substvars, 

1564 context.maintscript_snippets, 

1565 context.substitution, 

1566 ) 

1567 

1568 if not enable_manifest_installation_feature: 1568 ↛ 1569line 1568 didn't jump to line 1569 because the condition on line 1568 was never true

1569 assert_no_dbgsym_migration(dctrl_bin) 

1570 dh_dbgsym_root_fs = dhe_dbgsym_root_dir(dctrl_bin) 

1571 dh_dbgsym_root_path = OSFSROOverlay.create_root_dir( 

1572 "", 

1573 dh_dbgsym_root_fs, 

1574 ) 

1575 dbgsym_root_fs = InMemoryVirtualRootDir() 

1576 _install_everything_from_source_dir_if_present( 

1577 dctrl_bin, 

1578 self.substitution, 

1579 path_matcher, 

1580 install_rule_context, 

1581 source_condition_context, 

1582 dh_dbgsym_root_path, 

1583 into_dir=dbgsym_root_fs, 

1584 ) 

1585 dbgsym_build_ids = read_dbgsym_file(dctrl_bin) 

1586 dbgsym_info = DbgsymInfo( 

1587 dctrl_bin, 

1588 dbgsym_root_fs, 

1589 os.path.join(dh_dbgsym_root_fs, "DEBIAN"), 

1590 dbgsym_build_ids, 

1591 # TODO: Provide manifest feature to support this. 

1592 False, 

1593 ) 

1594 else: 

1595 dbgsym_info = DbgsymInfo( 

1596 dctrl_bin, 

1597 InMemoryVirtualRootDir(), 

1598 None, 

1599 [], 

1600 False, 

1601 ) 

1602 

1603 package_data_dict[package] = BinaryPackageData( 

1604 self.source_package, 

1605 dctrl_bin, 

1606 build_system_pkg_staging_dir, 

1607 fs_root, 

1608 substvars, 

1609 package_metadata_context, 

1610 ctrl_creator, 

1611 dbgsym_info, 

1612 ) 

1613 

1614 if enable_manifest_installation_feature: 1614 ↛ 1617line 1614 didn't jump to line 1617 because the condition on line 1614 was always true

1615 _list_automatic_discard_rules(path_matcher) 

1616 

1617 return package_data_table 

1618 

1619 def condition_context( 

1620 self, binary_package: BinaryPackage | str | None 

1621 ) -> ConditionContext: 

1622 if binary_package is None: 1622 ↛ 1623line 1622 didn't jump to line 1623 because the condition on line 1622 was never true

1623 return self._source_condition_context 

1624 if not isinstance(binary_package, str): 

1625 binary_package = binary_package.name 

1626 

1627 package_transformation = self.package_transformations[binary_package] 

1628 return self._source_condition_context.replace( 

1629 binary_package=package_transformation.binary_package, 

1630 substitution=package_transformation.substitution, 

1631 ) 

1632 

1633 def apply_fs_transformations( 

1634 self, 

1635 package: str, 

1636 fs_root: InMemoryVirtualPathBase, 

1637 ) -> None: 

1638 if package in self._used_for: 1638 ↛ 1639line 1638 didn't jump to line 1639 because the condition on line 1638 was never true

1639 raise ValueError( 

1640 f"data.tar contents for {package} has already been finalized!?" 

1641 ) 

1642 if package not in self.package_transformations: 1642 ↛ 1643line 1642 didn't jump to line 1643 because the condition on line 1642 was never true

1643 raise ValueError( 

1644 f'The package "{package}" was not relevant for the manifest!?' 

1645 ) 

1646 package_transformation = self.package_transformations[package] 

1647 condition_context = ConditionContext( 

1648 binary_package=package_transformation.binary_package, 

1649 substitution=package_transformation.substitution, 

1650 deb_options_and_profiles=self._build_env, 

1651 dpkg_architecture_variables=self._dpkg_architecture_variables, 

1652 dpkg_arch_query_table=self.dpkg_arch_query_table, 

1653 ) 

1654 norm_rules = list( 

1655 builtin_mode_normalization_rules( 

1656 self._dpkg_architecture_variables, 

1657 package_transformation.binary_package, 

1658 package_transformation.substitution, 

1659 ) 

1660 ) 

1661 norm_mode_transformation_rule = ModeNormalizationTransformationRule(norm_rules) 

1662 norm_mode_transformation_rule.transform_file_system(fs_root, condition_context) 

1663 for transformation in package_transformation.transformations: 

1664 transformation.run_transform_file_system(fs_root, condition_context) 

1665 interpreter_normalization = NormalizeShebangLineTransformation() 

1666 interpreter_normalization.transform_file_system(fs_root, condition_context) 

1667 

1668 def finalize_data_tar_contents( 

1669 self, 

1670 package: str, 

1671 fs_root: InMemoryVirtualPathBase, 

1672 clamp_mtime_to: int, 

1673 ) -> IntermediateManifest: 

1674 if package in self._used_for: 1674 ↛ 1675line 1674 didn't jump to line 1675 because the condition on line 1674 was never true

1675 raise ValueError( 

1676 f"data.tar contents for {package} has already been finalized!?" 

1677 ) 

1678 if package not in self.package_transformations: 1678 ↛ 1679line 1678 didn't jump to line 1679 because the condition on line 1678 was never true

1679 raise ValueError( 

1680 f'The package "{package}" was not relevant for the manifest!?' 

1681 ) 

1682 self._used_for.add(package) 

1683 

1684 # At this point, there so be no further mutations to the file system (because the will not 

1685 # be present in the intermediate manifest) 

1686 cast("FSRootDir", fs_root).is_read_write = False 

1687 

1688 intermediate_manifest = list( 

1689 _generate_intermediate_manifest( 

1690 fs_root, 

1691 clamp_mtime_to, 

1692 ) 

1693 ) 

1694 return intermediate_manifest 

1695 

1696 def apply_to_binary_staging_directory( 

1697 self, 

1698 package: str, 

1699 fs_root: InMemoryVirtualPathBase, 

1700 clamp_mtime_to: int, 

1701 ) -> IntermediateManifest: 

1702 self.apply_fs_transformations(package, fs_root) 

1703 return self.finalize_data_tar_contents(package, fs_root, clamp_mtime_to) 

1704 

1705 

1706@dataclasses.dataclass(slots=True) 

1707class SearchDirOrderState: 

1708 search_dir: VirtualPath 

1709 applies_to: set[BinaryPackage] = dataclasses.field(default_factory=set) 

1710 after: set[str] = dataclasses.field(default_factory=set) 

1711 

1712 

1713def _present_installation_dirs( 

1714 search_dirs: Sequence[SearchDir], 

1715 checked_missing_dirs: Sequence[VirtualPath], 

1716 all_pkgs: frozenset[BinaryPackage], 

1717) -> None: 

1718 _info("The following directories are considered search dirs (in order):") 

1719 max_len = max((len(s.search_dir.fs_path) for s in search_dirs), default=1) 

1720 for search_dir in search_dirs: 

1721 applies_to = "" 

1722 if search_dir.applies_to < all_pkgs: 

1723 names = ", ".join(p.name for p in search_dir.applies_to) 

1724 applies_to = f" [only applicable to: {names}]" 

1725 remark = "" 

1726 if not os.path.isdir(search_dir.search_dir.fs_path): 

1727 remark = " (skipped; absent)" 

1728 _info(f" * {search_dir.search_dir.fs_path:{max_len}}{applies_to}{remark}") 

1729 

1730 if checked_missing_dirs: 

1731 _info('The following directories are considered for "not-installed" paths;') 

1732 for d in checked_missing_dirs: 

1733 remark = "" 

1734 if not os.path.isdir(d.fs_path): 

1735 remark = " (skipped; absent)" 

1736 _info(f" * {d.fs_path:{max_len}}{remark}") 

1737 

1738 

1739def _determine_search_dir_order( 

1740 requested: Mapping[BinaryPackage, list[VirtualPath]], 

1741 all_pkgs: frozenset[BinaryPackage], 

1742 default_search_dirs: list[VirtualPath], 

1743 source_root: VirtualPath, 

1744) -> Sequence[SearchDir]: 

1745 search_dir_table = dict[str, SearchDirOrderState]() 

1746 assert requested.keys() <= all_pkgs 

1747 for pkg in all_pkgs: 

1748 paths = requested.get(pkg, default_search_dirs) 

1749 previous_search_dir: SearchDirOrderState | None = None 

1750 for path in paths: 

1751 try: 

1752 search_dir_state = search_dir_table[path.fs_path] 

1753 except KeyError: 

1754 search_dir_state = SearchDirOrderState(path) 

1755 search_dir_table[path.fs_path] = search_dir_state 

1756 search_dir_state.applies_to.add(pkg) 

1757 if previous_search_dir is not None: 

1758 search_dir_state.after.add(previous_search_dir.search_dir.fs_path) 

1759 previous_search_dir = search_dir_state 

1760 

1761 search_dirs_in_order = [] 

1762 released = set[str]() 

1763 remaining = set() 

1764 for search_dir_state in search_dir_table.values(): 

1765 if not (search_dir_state.after <= released): 

1766 remaining.add(search_dir_state.search_dir.fs_path) 

1767 continue 

1768 search_dirs_in_order.append(search_dir_state) 

1769 released.add(search_dir_state.search_dir.fs_path) 

1770 

1771 while remaining: 

1772 current_released = len(released) 

1773 for fs_path in remaining: 

1774 search_dir_state = search_dir_table[fs_path] 

1775 if not search_dir_state.after.issubset(released): 

1776 remaining.add(search_dir_state.search_dir.fs_path) 

1777 continue 

1778 search_dirs_in_order.append(search_dir_state) 

1779 released.add(search_dir_state.search_dir.fs_path) 

1780 

1781 if current_released == len(released): 

1782 names = ", ".join(remaining) 

1783 _error( 

1784 f"There is a circular dependency (somewhere) between the search dirs: {names}." 

1785 " Note that the search directories across all packages have to be ordered (and the" 

1786 " source root should generally be last)" 

1787 ) 

1788 remaining -= released 

1789 

1790 search_dirs_in_order.append( 

1791 SearchDirOrderState( 

1792 source_root, 

1793 set(all_pkgs), 

1794 ) 

1795 ) 

1796 

1797 return tuple( 

1798 # Avoid duplicating all_pkgs 

1799 SearchDir( 

1800 s.search_dir, 

1801 frozenset(s.applies_to) if s.applies_to != all_pkgs else all_pkgs, 

1802 ) 

1803 for s in search_dirs_in_order 

1804 )