Coverage for src/debputy/highlevel_manifest.py: 64%

886 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-10-12 15:06 +0000

1import dataclasses 

2import functools 

3import os 

4import textwrap 

5from contextlib import suppress 

6from dataclasses import dataclass, field 

7from typing import ( 

8 List, 

9 Dict, 

10 Any, 

11 Union, 

12 Optional, 

13 TypeVar, 

14 Generic, 

15 cast, 

16 Set, 

17 Tuple, 

18 FrozenSet, 

19) 

20from collections.abc import Iterable, Mapping, Sequence, Callable 

21 

22from debian.debian_support import DpkgArchTable 

23 

24from debputy.dh.debhelper_emulation import ( 

25 dhe_dbgsym_root_dir, 

26 assert_no_dbgsym_migration, 

27 read_dbgsym_file, 

28) 

29from ._deb_options_profiles import DebBuildOptionsAndProfiles 

30from ._manifest_constants import * 

31from .architecture_support import DpkgArchitectureBuildProcessValuesTable 

32from .builtin_manifest_rules import builtin_mode_normalization_rules 

33from .exceptions import ( 

34 DebputySubstitutionError, 

35 DebputyRuntimeErrorWithPreamble, 

36) 

37from .filesystem_scan import FSPath, FSRootDir, FSROOverlay, FSControlRootDir 

38from .installations import ( 

39 InstallRule, 

40 SourcePathMatcher, 

41 PathAlreadyInstalledOrDiscardedError, 

42 NoMatchForInstallPatternError, 

43 InstallRuleContext, 

44 BinaryPackageInstallRuleContext, 

45 InstallSearchDirContext, 

46 SearchDir, 

47) 

48from .intermediate_manifest import TarMember, PathType, IntermediateManifest 

49from .maintscript_snippet import ( 

50 DpkgMaintscriptHelperCommand, 

51 MaintscriptSnippetContainer, 

52) 

53from .manifest_conditions import ConditionContext 

54from .manifest_parser.base_types import ( 

55 FileSystemMatchRule, 

56 FileSystemExactMatchRule, 

57 BuildEnvironments, 

58) 

59from .manifest_parser.util import AttributePath 

60from .packager_provided_files import PackagerProvidedFile 

61from .packages import BinaryPackage, SourcePackage 

62from .plugin.api.feature_set import PluginProvidedFeatureSet 

63from .plugin.api.impl import BinaryCtrlAccessorProviderCreator 

64from .plugin.api.impl_types import ( 

65 PackageProcessingContextProvider, 

66 PackageDataTable, 

67) 

68from .plugin.api.spec import ( 

69 FlushableSubstvars, 

70 VirtualPath, 

71 DebputyIntegrationMode, 

72 INTEGRATION_MODE_DH_DEBPUTY_RRR, 

73 INTEGRATION_MODE_FULL, 

74) 

75from debputy.plugins.debputy.binary_package_rules import ServiceRule 

76from debputy.plugins.debputy.build_system_rules import BuildRule 

77from .plugin.plugin_state import run_in_context_of_plugin 

78from .substitution import Substitution 

79from .transformation_rules import ( 

80 TransformationRule, 

81 ModeNormalizationTransformationRule, 

82 NormalizeShebangLineTransformation, 

83) 

84from .util import ( 

85 _error, 

86 _warn, 

87 debian_policy_normalize_symlink_target, 

88 generated_content_dir, 

89 _info, 

90) 

91from .yaml import MANIFEST_YAML 

92from .yaml.compat import CommentedMap, CommentedSeq 

93 

94 

class PathNotCoveredByInstallRulesError(DebputyRuntimeErrorWithPreamble):
    """Error carrying paths that were found but neither installed nor discarded.

    The extra data travels as positional exception arguments: ``args[1]`` is
    the sequence of unmatched paths and ``args[2]`` is the search directory.
    """

    @property
    def unmatched_paths(self) -> Sequence[VirtualPath]:
        """The unmatched paths (second positional exception argument)."""
        paths = self.args[1]
        return paths

    @property
    def search_dir(self) -> VirtualPath:
        """The search directory (third positional exception argument)."""
        directory = self.args[2]
        return directory

    def render_preamble(self) -> None:
        """Emit, via ``_warn``, a bullet list of every unmatched path."""
        location = self.search_dir.fs_path
        _warn(
            f"The following paths were present in {location}, but not installed (nor explicitly discarded)."
        )
        _warn("")
        for missing in self.unmatched_paths:
            _warn(f" * {_describe_missing_path(missing)}")
        _warn("")

114 

115 

@dataclass(slots=True)
class DbgsymInfo:
    """Debug-symbol related state for one binary package."""

    binary_package: BinaryPackage
    dbgsym_fs_root: FSPath
    # Lazily filled by `dbgsym_root_dir`; None until the first lookup.
    _dbgsym_root_fs: str | None
    dbgsym_ids: list[str]
    run_dwz: bool

    @property
    def dbgsym_root_dir(self) -> str:
        """Return the dbgsym root directory, computing and caching it on first use."""
        cached = self._dbgsym_root_fs
        if cached is not None:
            return cached
        computed = generated_content_dir(
            package=self.binary_package,
            subdir_key="dbgsym-fs-root",
        )
        self._dbgsym_root_fs = computed
        return computed

    @property
    def dbgsym_ctrl_dir(self) -> FSControlRootDir:
        """Return a control root dir for the ``DEBIAN`` directory under the dbgsym root."""
        ctrl_path = os.path.join(self.dbgsym_root_dir, "DEBIAN")
        return FSControlRootDir.create_root_dir(ctrl_path)

140 

141 

@dataclass(slots=True, frozen=True)
class BinaryPackageData:
    """Immutable bundle of the objects associated with one binary package."""

    source_package: SourcePackage
    binary_package: BinaryPackage
    binary_staging_root_dir: str
    fs_root: FSPath
    substvars: FlushableSubstvars
    package_metadata_context: PackageProcessingContextProvider
    ctrl_creator: BinaryCtrlAccessorProviderCreator
    dbgsym_info: DbgsymInfo

    @property
    def control_output_dir(self) -> FSControlRootDir:
        """Return a control root dir for this package's generated ``DEBIAN`` directory."""
        debian_dir = generated_content_dir(
            package=self.binary_package,
            subdir_key="DEBIAN",
        )
        return FSControlRootDir.create_root_dir(debian_dir)

161 

162 

@dataclass(slots=True)
class PackageTransformationDefinition:
    """Mutable per-package record of the rules and snippets listed below.

    Only the first three fields are required; every collection field starts
    out as its own fresh empty container.
    """

    # Required fields.
    binary_package: BinaryPackage
    substitution: Substitution
    is_auto_generated_package: bool

    # Optional scalar-like fields.
    binary_version: str | None = None
    search_dirs: list[FileSystemExactMatchRule] | None = None

    # Collections; each instance gets its own container via default_factory.
    dpkg_maintscript_helper_snippets: list[DpkgMaintscriptHelperCommand] = field(
        default_factory=list,
    )
    maintscript_snippets: dict[str, MaintscriptSnippetContainer] = field(
        default_factory=dict,
    )
    transformations: list[TransformationRule] = field(
        default_factory=list,
    )
    reserved_packager_provided_files: dict[str, list[PackagerProvidedFile]] = field(
        default_factory=dict,
    )
    install_rules: list[InstallRule] = field(
        default_factory=list,
    )
    requested_service_rules: list[ServiceRule] = field(
        default_factory=list,
    )

182 

183 

def _path_to_tar_member(
    path: FSPath,
    clamp_mtime_to: int,
) -> TarMember:
    """Build the ``TarMember`` that describes *path*.

    The member's mtime is ``clamp_mtime_to``, lowered to ``path.mtime`` when
    the path has a backing file system path and that value is smaller.

    :param path: The path to describe; must be a directory, file or symlink.
    :param clamp_mtime_to: Upper bound for the mtime recorded in the member.
    :return: A virtual member for symlinks and for paths without a backing
      file system path; otherwise a member created via ``TarMember.from_file``.
    :raises AssertionError: If the path is not a directory, file or symlink.
    """
    mtime = float(clamp_mtime_to)
    owner, uid, group, gid = path.tar_owner_info
    ownership = {"owner": owner, "uid": uid, "group": group, "gid": gid}
    mode = path.mode

    if path.has_fs_path:
        mtime = min(mtime, path.mtime)

    if path.is_dir:
        path_type = PathType.DIRECTORY
    elif path.is_file:
        # TODO: someday we will need to deal with hardlinks and it might appear here.
        path_type = PathType.FILE
    elif not path.is_symlink:
        raise AssertionError(
            f"Unsupported file type: {path.path} - not a file, dir nor a symlink!"
        )
    else:
        # Symlinks are resolved right away since the target must be normalized anyway.
        normalized_target = debian_policy_normalize_symlink_target(
            path.path,
            path.readlink(),
        )
        return TarMember.virtual_path(
            path.tar_path,
            PathType.SYMLINK,
            mtime,
            link_target=normalized_target,
            # Force mode to be 0777 as that is the mode we see in the data.tar. In theory, tar lets you set
            # it to whatever. However, for reproducibility, we have to be well-behaved - and that is 0777.
            mode=0o0777,
            **ownership,
        )

    if path.has_fs_path:
        return TarMember.from_file(
            path.tar_path,
            path.fs_path,
            mode=mode,
            path_type=path_type,
            path_mtime=mtime,
            clamp_mtime_to=clamp_mtime_to,
            may_steal_fs_path=path._can_replace_inline,
            **ownership,
        )

    # Without a backing file system path, only non-file members are expected here.
    assert not path.is_file
    return TarMember.virtual_path(
        path.tar_path,
        path_type,
        mtime,
        mode=mode,
        **ownership,
    )

251 

252 

def _generate_intermediate_manifest(
    fs_root: FSPath,
    clamp_mtime_to: int,
) -> Iterable[TarMember]:
    """Yield a ``TarMember`` for every path under *fs_root*, symlinks last.

    Non-symlink members are yielded in the order ``fs_root.all_paths()``
    produces them; symlink members are collected and yielded afterwards in
    that same relative order.
    """
    deferred_symlinks = []
    for fs_path in fs_root.all_paths():
        member = _path_to_tar_member(fs_path, clamp_mtime_to)
        if member.path_type == PathType.SYMLINK:
            deferred_symlinks.append(member)
        else:
            yield member
    yield from deferred_symlinks

265 

266 

267ST = TypeVar("ST") 

268T = TypeVar("T") 

269 

270 

271class AbstractYAMLSubStore(Generic[ST]): 

272 def __init__( 

273 self, 

274 parent_store: Any, 

275 parent_key: int | str | None, 

276 store: ST | None = None, 

277 ) -> None: 

278 if parent_store is not None and parent_key is not None: 

279 try: 

280 from_parent_store = parent_store[parent_key] 

281 except (KeyError, IndexError): 

282 from_parent_store = None 

283 if ( 283 ↛ 288line 283 didn't jump to line 288 because the condition on line 283 was never true

284 store is not None 

285 and from_parent_store is not None 

286 and store is not parent_store 

287 ): 

288 raise ValueError( 

289 "Store is provided but is not the one already in the parent store" 

290 ) 

291 if store is None: 291 ↛ 293line 291 didn't jump to line 293 because the condition on line 291 was always true

292 store = from_parent_store 

293 self._parent_store = parent_store 

294 self._parent_key = parent_key 

295 self._is_detached = ( 

296 parent_key is None or parent_store is None or parent_key not in parent_store 

297 ) 

298 assert self._is_detached or store is not None 

299 if store is None: 

300 store = self._create_new_instance() 

301 self._store: ST = store 

302 

303 def _create_new_instance(self) -> ST: 

304 raise NotImplementedError 

305 

306 def create_definition_if_missing(self) -> None: 

307 if self._is_detached: 

308 self.create_definition() 

309 

310 def create_definition(self) -> None: 

311 if not self._is_detached: 311 ↛ 312line 311 didn't jump to line 312 because the condition on line 311 was never true

312 raise RuntimeError("Definition is already present") 

313 parent_store = self._parent_store 

314 if parent_store is None: 314 ↛ 315line 314 didn't jump to line 315 because the condition on line 314 was never true

315 raise RuntimeError( 

316 f"Definition is not attached to any parent!? ({self.__class__.__name__})" 

317 ) 

318 if isinstance(parent_store, list): 

319 assert self._parent_key is None 

320 self._parent_key = len(parent_store) 

321 self._parent_store.append(self._store) 

322 else: 

323 parent_store[self._parent_key] = self._store 

324 self._is_detached = False 

325 

326 def remove_definition(self) -> None: 

327 self._ensure_attached() 

328 del self._parent_store[self._parent_key] 

329 if isinstance(self._parent_store, list): 

330 self._parent_key = None 

331 self._is_detached = True 

332 

333 def _ensure_attached(self) -> None: 

334 if self._is_detached: 

335 raise RuntimeError("The definition has been removed!") 

336 

337 

class AbstractYAMLListSubStore(Generic[T], AbstractYAMLSubStore[list[T]]):
    """Sub-store whose backing store is a YAML sequence."""

    def _create_new_instance(self) -> list[T]:
        """Return a new, empty ``CommentedSeq``."""
        empty_seq: list[T] = CommentedSeq()
        return empty_seq

341 

342 

class AbstractYAMLDictSubStore(Generic[T], AbstractYAMLSubStore[dict[str, T]]):
    """Sub-store whose backing store is a YAML mapping."""

    def _create_new_instance(self) -> dict[str, T]:
        """Return a new, empty ``CommentedMap``."""
        empty_map: dict[str, T] = CommentedMap()
        return empty_map

346 

347 

class MutableCondition:
    """Factories for single-entry condition mappings."""

    @classmethod
    def arch_matches(cls, arch_filter: str) -> CommentedMap:
        """Return ``{MK_CONDITION_ARCH_MATCHES: arch_filter}`` as a ``CommentedMap``."""
        condition = {MK_CONDITION_ARCH_MATCHES: arch_filter}
        return CommentedMap(condition)

    @classmethod
    def build_profiles_matches(cls, build_profiles_matches: str) -> CommentedMap:
        """Return ``{MK_CONDITION_BUILD_PROFILES_MATCHES: ...}`` as a ``CommentedMap``."""
        condition = {MK_CONDITION_BUILD_PROFILES_MATCHES: build_profiles_matches}
        return CommentedMap(condition)

358 

359 

class MutableYAMLSymlink(AbstractYAMLDictSubStore[Any]):
    """Mutable view of a create-symlink transformation entry.

    The backing store has the shape
    ``{MK_TRANSFORMATIONS_CREATE_SYMLINK: {<link path key>: ..., <link target key>: ...}}``.
    """

    @classmethod
    def new_symlink(
        cls, link_path: str, link_target: str, condition: Any | None
    ) -> "MutableYAMLSymlink":
        """Create a detached symlink entry, optionally guarded by *condition*."""
        definition = {
            MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH: link_path,
            MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET: link_target,
        }
        if condition is not None:
            definition["when"] = condition
        wrapped = CommentedMap({MK_TRANSFORMATIONS_CREATE_SYMLINK: definition})
        return cls(None, None, store=wrapped)

    @property
    def _definition(self) -> Any:
        """The inner mapping that holds the link path and link target."""
        return self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK]

    @property
    def symlink_path(self) -> str:
        """The path at which the symlink is created."""
        return self._definition[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH]

    @symlink_path.setter
    def symlink_path(self, path: str) -> None:
        self._definition[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH] = path

    @property
    def symlink_target(self) -> str | None:
        """The target the symlink points to."""
        return self._definition[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET]

    @symlink_target.setter
    def symlink_target(self, target: str) -> None:
        self._definition[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET] = target

397 

398 

class MutableYAMLConffileManagementItem(AbstractYAMLDictSubStore[Any]):
    """Mutable view of one conffile-management entry.

    The backing store is a single-entry mapping ``{<command>: <options>}``
    where the command is ``MK_CONFFILE_MANAGEMENT_REMOVE`` or
    ``MK_CONFFILE_MANAGEMENT_RENAME``.
    """

    @classmethod
    def rm_conffile(
        cls,
        conffile: str,
        prior_to_version: str | None,
        owning_package: str | None,
    ) -> "MutableYAMLConffileManagementItem":
        """Create a detached "remove" item for *conffile*.

        ``prior_to_version`` and ``owning_package`` are only stored when they
        are not None.
        """
        r = cls(
            None,
            None,
            store=CommentedMap(
                {
                    MK_CONFFILE_MANAGEMENT_REMOVE: CommentedMap(
                        {MK_CONFFILE_MANAGEMENT_REMOVE_PATH: conffile}
                    )
                }
            ),
        )
        r.prior_to_version = prior_to_version
        r.owning_package = owning_package
        return r

    @classmethod
    def mv_conffile(
        cls,
        old_conffile: str,
        new_conffile: str,
        prior_to_version: str | None,
        owning_package: str | None,
    ) -> "MutableYAMLConffileManagementItem":
        """Create a detached "rename" item from *old_conffile* to *new_conffile*.

        ``prior_to_version`` and ``owning_package`` are only stored when they
        are not None.
        """
        r = cls(
            None,
            None,
            store=CommentedMap(
                {
                    MK_CONFFILE_MANAGEMENT_RENAME: CommentedMap(
                        {
                            MK_CONFFILE_MANAGEMENT_RENAME_SOURCE: old_conffile,
                            MK_CONFFILE_MANAGEMENT_RENAME_TARGET: new_conffile,
                        }
                    )
                }
            ),
        )
        r.prior_to_version = prior_to_version
        r.owning_package = owning_package
        return r

    @property
    def _container(self) -> dict[str, Any]:
        """The options mapping stored under the single command key."""
        assert len(self._store) == 1
        return next(iter(self._store.values()))

    @property
    def command(self) -> str:
        """The single command key of the backing store."""
        assert len(self._store) == 1
        return next(iter(self._store))

    @property
    def obsolete_conffile(self) -> str:
        """The path being removed, or the source path of a rename."""
        if self.command == MK_CONFFILE_MANAGEMENT_REMOVE:
            return self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH]
        assert self.command == MK_CONFFILE_MANAGEMENT_RENAME
        return self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE]

    @obsolete_conffile.setter
    def obsolete_conffile(self, value: str) -> None:
        if self.command == MK_CONFFILE_MANAGEMENT_REMOVE:
            self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH] = value
        else:
            assert self.command == MK_CONFFILE_MANAGEMENT_RENAME
            self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE] = value

    def _require_rename(self) -> None:
        """Raise ``TypeError`` unless this item is a rename command."""
        if self.command != MK_CONFFILE_MANAGEMENT_RENAME:
            raise TypeError(
                f"The new_conffile attribute is only applicable to command {MK_CONFFILE_MANAGEMENT_RENAME}."
                f" This is a {self.command}"
            )

    @property
    def new_conffile(self) -> str:
        """The rename target.

        :raises TypeError: If the item is not a rename command.
        """
        self._require_rename()
        return self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET]

    @new_conffile.setter
    def new_conffile(self, value: str) -> None:
        self._require_rename()
        self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET] = value

    @property
    def prior_to_version(self) -> str | None:
        """The "prior to version" option, or None when it is not set."""
        return self._container.get(MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION)

    @prior_to_version.setter
    def prior_to_version(self, value: str | None) -> None:
        if value is None:
            # None means "unset": drop the key if it is there.
            with suppress(KeyError):
                del self._container[MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION]
        else:
            self._container[MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION] = value

    @property
    def owning_package(self) -> str | None:
        """The "owning package" option, or None when it is not set."""
        # Read the owning-package key (not the prior-to-version key), and
        # tolerate its absence since the setter removes it for None.
        return self._container.get(MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE)

    @owning_package.setter
    def owning_package(self, value: str | None) -> None:
        if value is None:
            # None means "unset": drop the key if it is there.
            with suppress(KeyError):
                del self._container[MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE]
        else:
            self._container[MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE] = value

518 

519 

class MutableYAMLPackageDefinition(AbstractYAMLDictSubStore):
    """Mutable view of one package's definition mapping."""

    def _list_store(
        self, key, *, create_if_absent: bool = False
    ) -> list[dict[str, Any]] | None:
        """Return the list stored under *key*.

        :param key: The key of the list within the package definition.
        :param create_if_absent: When true, a missing list (and, if needed,
          the package definition itself) is created; when false, a missing
          list yields None and nothing is modified.
        :return: The list, or None if it is absent and must not be created.
        """
        if self._is_detached or key not in self._store:
            if not create_if_absent:
                return None
            self.create_definition_if_missing()
            self._store[key] = []
        return self._store[key]

    def _insert_item(self, key: str, item: AbstractYAMLDictSubStore) -> None:
        """Append the detached *item* to the list stored under *key*.

        :raises RuntimeError: If the item is already attached or belongs to
          a different container.
        """
        parent_store = self._list_store(key, create_if_absent=True)
        assert parent_store is not None
        if not item._is_detached or (
            item._parent_store is not None and item._parent_store is not parent_store
        ):
            raise RuntimeError(
                "Item is already attached or associated with a different container"
            )
        item._parent_store = parent_store
        item.create_definition()

    def add_symlink(self, symlink: MutableYAMLSymlink) -> None:
        """Add *symlink* to the list stored under ``MK_TRANSFORMATIONS``."""
        self._insert_item(MK_TRANSFORMATIONS, symlink)

    def symlinks(self) -> Iterable[MutableYAMLSymlink]:
        """Yield a view for every create-symlink entry among the transformations."""
        store = self._list_store(MK_TRANSFORMATIONS)
        if store is None:
            return
        for index, entry in enumerate(store):
            # MutableYAMLSymlink reads its data from the
            # MK_TRANSFORMATIONS_CREATE_SYMLINK key, so match on that key.
            if (
                isinstance(entry, dict)
                and len(entry) == 1
                and MK_TRANSFORMATIONS_CREATE_SYMLINK in entry
            ):
                yield MutableYAMLSymlink(store, index)

    def conffile_management_items(self) -> Iterable[MutableYAMLConffileManagementItem]:
        """Yield a view for every entry in the conffile-management list."""
        store = self._list_store(MK_CONFFILE_MANAGEMENT)
        if store is None:
            return
        for index in range(len(store)):
            yield MutableYAMLConffileManagementItem(store, index)

    def add_conffile_management(
        self, conffile_management_item: MutableYAMLConffileManagementItem
    ) -> None:
        """Add the item to the list stored under ``MK_CONFFILE_MANAGEMENT``."""
        self._insert_item(MK_CONFFILE_MANAGEMENT, conffile_management_item)

567 

568 

class AbstractMutableYAMLInstallRule(AbstractYAMLDictSubStore):
    """Base class for mutable install rules.

    The backing store is a single-entry mapping ``{<rule keyword>: <body>}``.
    Besides the shared ``into``/``when`` accessors, the class hosts factory
    methods that create new, detached rules of the concrete subclasses.
    """

    @staticmethod
    def _sources_key(sources: str | list[str]) -> str:
        """Return the singular source key for a lone string, else the plural key."""
        if isinstance(sources, str):
            return MK_INSTALLATIONS_INSTALL_SOURCE
        return MK_INSTALLATIONS_INSTALL_SOURCES

    @property
    def _container(self) -> dict[str, Any]:
        """The value stored under the single rule keyword."""
        assert len(self._store) == 1
        return next(iter(self._store.values()))

    @property
    def into(self) -> list[str] | None:
        """The "into" value as a list, or None when it is not set."""
        # The setter removes the key for None, so absence must map to None.
        v = self._container.get(MK_INSTALLATIONS_INSTALL_INTO)
        if v is None:
            return None
        if isinstance(v, str):
            return [v]
        return v

    @into.setter
    def into(self, new_value: str | list[str] | None) -> None:
        if new_value is None:
            with suppress(KeyError):
                del self._container[MK_INSTALLATIONS_INSTALL_INTO]
            return
        if isinstance(new_value, str):
            self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_value
            return
        new_list = CommentedSeq(new_value)
        self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_list

    @property
    def when(self) -> str | Mapping[str, Any] | None:
        """The rule's condition, or None when it is not set."""
        # The setter removes the key for None, so absence must map to None.
        return self._container.get(MK_CONDITION_WHEN)

    @when.setter
    def when(self, new_value: str | Mapping[str, Any] | None) -> None:
        if new_value is None:
            with suppress(KeyError):
                del self._container[MK_CONDITION_WHEN]
            return
        if isinstance(new_value, str):
            self._container[MK_CONDITION_WHEN] = new_value
            return
        new_map = CommentedMap(new_value)
        self._container[MK_CONDITION_WHEN] = new_map

    @classmethod
    def install_dest(
        cls,
        sources: str | list[str],
        into: str | list[str] | None,
        *,
        dest_dir: str | None = None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Create a detached install rule, optionally with a destination dir."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=CommentedMap(
                {
                    MK_INSTALLATIONS_INSTALL: CommentedMap(
                        {
                            cls._sources_key(sources): sources,
                        }
                    )
                }
            ),
        )
        r.dest_dir = dest_dir
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def multi_dest_install(
        cls,
        sources: str | list[str],
        dest_dirs: Sequence[str],
        into: str | list[str] | None,
        *,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Create a detached multi-destination install rule."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=CommentedMap(
                {
                    MK_INSTALLATIONS_MULTI_DEST_INSTALL: CommentedMap(
                        {
                            cls._sources_key(sources): sources,
                            "dest-dirs": dest_dirs,
                        }
                    )
                }
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_as(
        cls,
        source: str,
        install_as: str,
        into: str | list[str] | None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Create a detached install rule that installs *source* as *install_as*."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=CommentedMap(
                {
                    MK_INSTALLATIONS_INSTALL: CommentedMap(
                        {
                            MK_INSTALLATIONS_INSTALL_SOURCE: source,
                            MK_INSTALLATIONS_INSTALL_AS: install_as,
                        }
                    )
                }
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_doc_as(
        cls,
        source: str,
        install_as: str,
        into: str | list[str] | None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Create a detached install-docs rule that installs *source* as *install_as*."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=CommentedMap(
                {
                    MK_INSTALLATIONS_INSTALL_DOCS: CommentedMap(
                        {
                            MK_INSTALLATIONS_INSTALL_SOURCE: source,
                            MK_INSTALLATIONS_INSTALL_AS: install_as,
                        }
                    )
                }
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_docs(
        cls,
        sources: str | list[str],
        into: str | list[str] | None,
        *,
        dest_dir: str | None = None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Create a detached install-docs rule, optionally with a destination dir."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=CommentedMap(
                {
                    MK_INSTALLATIONS_INSTALL_DOCS: CommentedMap(
                        {
                            cls._sources_key(sources): sources,
                        }
                    )
                }
            ),
        )
        r.into = into
        r.dest_dir = dest_dir
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_examples(
        cls,
        sources: str | list[str],
        into: str | list[str] | None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstallExamples":
        """Create a detached install-examples rule."""
        r = MutableYAMLInstallRuleInstallExamples(
            None,
            None,
            store=CommentedMap(
                {
                    MK_INSTALLATIONS_INSTALL_EXAMPLES: CommentedMap(
                        {
                            cls._sources_key(sources): sources,
                        }
                    )
                }
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_man(
        cls,
        sources: str | list[str],
        into: str | list[str] | None,
        language: str | None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleMan":
        """Create a detached install-man rule, optionally with a language."""
        r = MutableYAMLInstallRuleMan(
            None,
            None,
            store=CommentedMap(
                {
                    MK_INSTALLATIONS_INSTALL_MAN: CommentedMap(
                        {
                            cls._sources_key(sources): sources,
                        }
                    )
                }
            ),
        )
        r.language = language
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def discard(
        cls,
        sources: str | list[str],
    ) -> "MutableYAMLInstallRuleDiscard":
        """Create a detached discard rule.

        The sources are stored directly under the rule keyword rather than
        inside a nested mapping.
        """
        return MutableYAMLInstallRuleDiscard(
            None,
            None,
            store=CommentedMap({MK_INSTALLATIONS_DISCARD: sources}),
        )

827 

828 

class MutableYAMLInstallRuleInstallExamples(AbstractMutableYAMLInstallRule):
    """Install rule stored under ``MK_INSTALLATIONS_INSTALL_EXAMPLES``; adds nothing to the base."""

831 

832 

class MutableYAMLInstallRuleMan(AbstractMutableYAMLInstallRule):
    """Install rule for manpages with an optional language setting."""

    @property
    def language(self) -> str | None:
        """The language option, or None when it is not set."""
        # The setter removes the key for None, so absence must map to None
        # instead of raising KeyError.
        return self._container.get(MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE)

    @language.setter
    def language(self, new_value: str | None) -> None:
        if new_value is not None:
            self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE] = new_value
            return
        with suppress(KeyError):
            del self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE]

845 

846 

class MutableYAMLInstallRuleDiscard(AbstractMutableYAMLInstallRule):
    """Install rule stored under ``MK_INSTALLATIONS_DISCARD``; adds nothing to the base."""

849 

850 

class MutableYAMLInstallRuleInstall(AbstractMutableYAMLInstallRule):
    """Install rule with sources plus either a destination dir or an "as" name.

    A lone string source lives under ``MK_INSTALLATIONS_INSTALL_SOURCE`` and a
    list of sources under ``MK_INSTALLATIONS_INSTALL_SOURCES``, matching what
    the factory methods of the base class store.
    """

    @property
    def sources(self) -> list[str]:
        """The sources as a list, whichever of the two source keys holds them.

        :raises KeyError: If the rule has neither source key.
        """
        container = self._container
        if MK_INSTALLATIONS_INSTALL_SOURCES in container:
            v = container[MK_INSTALLATIONS_INSTALL_SOURCES]
        else:
            # Rules created from a single string use the singular key.
            v = container[MK_INSTALLATIONS_INSTALL_SOURCE]
        if isinstance(v, str):
            return [v]
        return v

    @sources.setter
    def sources(self, new_value: str | list[str]) -> None:
        container = self._container
        if isinstance(new_value, str):
            # Keep exactly one of the two source keys in the rule.
            with suppress(KeyError):
                del container[MK_INSTALLATIONS_INSTALL_SOURCES]
            container[MK_INSTALLATIONS_INSTALL_SOURCE] = new_value
            return
        with suppress(KeyError):
            del container[MK_INSTALLATIONS_INSTALL_SOURCE]
        container[MK_INSTALLATIONS_INSTALL_SOURCES] = CommentedSeq(new_value)

    @property
    def dest_dir(self) -> str | None:
        """The destination directory, or None when it is not set."""
        return self._container.get(MK_INSTALLATIONS_INSTALL_DEST_DIR)

    @dest_dir.setter
    def dest_dir(self, new_value: str | None) -> None:
        """Set or (for None) remove the destination directory.

        :raises ValueError: If an "as" name is already set.
        """
        if new_value is not None and self.dest_as is not None:
            raise ValueError(
                f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and'
                f' "{MK_INSTALLATIONS_INSTALL_AS}"'
            )
        if new_value is not None:
            self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR] = new_value
        else:
            with suppress(KeyError):
                del self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR]

    @property
    def dest_as(self) -> str | None:
        """The "as" name, or None when it is not set."""
        return self._container.get(MK_INSTALLATIONS_INSTALL_AS)

    @dest_as.setter
    def dest_as(self, new_value: str | None) -> None:
        """Set or (for None) remove the "as" name.

        Setting a name requires exactly one source; that source is then
        stored as a plain string.

        :raises ValueError: If a destination dir is set, or if the rule does
          not have exactly one source.
        """
        if new_value is None:
            with suppress(KeyError):
                del self._container[MK_INSTALLATIONS_INSTALL_AS]
            return
        if self.dest_dir is not None:
            raise ValueError(
                f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and'
                f' "{MK_INSTALLATIONS_INSTALL_AS}"'
            )

        # Go through the property so both source keys are honoured.
        sources = self.sources
        if len(sources) != 1:
            raise ValueError(
                f'Cannot have "{MK_INSTALLATIONS_INSTALL_AS}" when'
                f' "{MK_INSTALLATIONS_INSTALL_SOURCES}" is not exactly one item'
            )
        self.sources = sources[0]
        self._container[MK_INSTALLATIONS_INSTALL_AS] = new_value

909 

910 

class MutableYAMLInstallationsDefinition(AbstractYAMLListSubStore[Any]):
    """Mutable view of the list of install rules."""

    def append(self, install_rule: AbstractMutableYAMLInstallRule) -> None:
        """Attach the detached *install_rule* at the end of the list.

        :raises RuntimeError: If the rule is already attached or belongs to
          a different container.
        """
        own_store = self._store
        foreign_parent = (
            install_rule._parent_store is not None
            and install_rule._parent_store is not own_store
        )
        if foreign_parent or not install_rule._is_detached:
            raise RuntimeError(
                "Item is already attached or associated with a different container"
            )
        self.create_definition_if_missing()
        install_rule._parent_store = own_store
        install_rule.create_definition()

    def extend(self, install_rules: Iterable[AbstractMutableYAMLInstallRule]) -> None:
        """Attach every rule in *install_rules*, in order, as ``append`` would."""
        for rule in install_rules:
            self.append(rule)

938 

939 

class MutableYAMLManifestVariables(AbstractYAMLDictSubStore):
    """Mutable view of the manifest variables mapping."""

    @property
    def variables(self) -> dict[str, Any]:
        """The backing mapping itself (not a copy)."""
        backing = self._store
        return backing

    def __setitem__(self, key: str, value: Any) -> None:
        """Store *value* under *key*, then attach the mapping if it was detached."""
        self._store[key] = value
        self.create_definition_if_missing()

948 

949 

class MutableYAMLManifestDefinitions(AbstractYAMLDictSubStore):
    """Mutable view of the manifest definitions mapping."""

    def manifest_variables(
        self, *, create_if_absent: bool = True
    ) -> MutableYAMLManifestVariables:
        """Return the view stored under ``MK_MANIFEST_VARIABLES``.

        With ``create_if_absent`` true, a missing entry is created first.
        """
        variables = MutableYAMLManifestVariables(self._store, MK_MANIFEST_VARIABLES)
        if not create_if_absent:
            return variables
        variables.create_definition_if_missing()
        return variables

958 

959 

class MutableYAMLRemoveDuringCleanDefinitions(AbstractYAMLListSubStore[str]):
    """Mutable view of the remove-during-clean list."""

    def append(self, rule: str) -> None:
        """Attach the list if needed, then add *rule* at the end."""
        self.create_definition_if_missing()
        self._store.append(rule)

    def __len__(self) -> int:
        """Return the number of rules in the backing list."""
        return len(self._store)

    def extend(self, rules: Iterable[str]) -> None:
        """Add all *rules*; an empty iterable leaves everything untouched."""
        remaining = iter(rules)
        nothing = object()
        head = next(remaining, nothing)
        if head is nothing:
            # No rules at all: do not attach the list either.
            return
        self.create_definition_if_missing()
        self._store.append(head)
        self._store.extend(remaining)

977 

978 

979class MutableYAMLManifest: 

980 def __init__(self, store: Any) -> None: 

981 self._store = store 

982 

@classmethod
def empty_manifest(cls) -> "MutableYAMLManifest":
    """Create a manifest that only contains the default manifest version."""
    initial_content = {MK_MANIFEST_VERSION: DEFAULT_MANIFEST_VERSION}
    return cls(CommentedMap(initial_content))

986 

987 @property 

988 def manifest_version(self) -> str: 

989 return self._store[MK_MANIFEST_VERSION] 

990 

991 @manifest_version.setter 

992 def manifest_version(self, version: str) -> None: 

993 if version not in SUPPORTED_MANIFEST_VERSIONS: 

994 raise ValueError("Unsupported version") 

995 self._store[MK_MANIFEST_VERSION] = version 

996 

997 def remove_during_clean( 

998 self, 

999 *, 

1000 create_if_absent: bool = True, 

1001 ) -> MutableYAMLRemoveDuringCleanDefinitions: 

1002 d = MutableYAMLRemoveDuringCleanDefinitions( 

1003 self._store, MK_MANIFEST_REMOVE_DURING_CLEAN 

1004 ) 

1005 if create_if_absent: 1005 ↛ 1006line 1005 didn't jump to line 1006 because the condition on line 1005 was never true

1006 d.create_definition_if_missing() 

1007 return d 

1008 

1009 def installations( 

1010 self, 

1011 *, 

1012 create_if_absent: bool = True, 

1013 ) -> MutableYAMLInstallationsDefinition: 

1014 d = MutableYAMLInstallationsDefinition(self._store, MK_INSTALLATIONS) 

1015 if create_if_absent: 1015 ↛ 1016line 1015 didn't jump to line 1016 because the condition on line 1015 was never true

1016 d.create_definition_if_missing() 

1017 return d 

1018 

1019 def manifest_definitions( 

1020 self, 

1021 *, 

1022 create_if_absent: bool = True, 

1023 ) -> MutableYAMLManifestDefinitions: 

1024 d = MutableYAMLManifestDefinitions(self._store, MK_MANIFEST_DEFINITIONS) 

1025 if create_if_absent: 1025 ↛ 1026line 1025 didn't jump to line 1026 because the condition on line 1025 was never true

1026 d.create_definition_if_missing() 

1027 return d 

1028 

1029 def package( 

1030 self, name: str, *, create_if_absent: bool = True 

1031 ) -> MutableYAMLPackageDefinition: 

1032 if MK_PACKAGES not in self._store: 1032 ↛ 1034line 1032 didn't jump to line 1034 because the condition on line 1032 was always true

1033 self._store[MK_PACKAGES] = CommentedMap() 

1034 packages_store = self._store[MK_PACKAGES] 

1035 package = packages_store.get(name) 

1036 if package is None: 1036 ↛ 1043line 1036 didn't jump to line 1043 because the condition on line 1036 was always true

1037 if not create_if_absent: 1037 ↛ 1038line 1037 didn't jump to line 1038 because the condition on line 1037 was never true

1038 raise KeyError(name) 

1039 assert packages_store is not None 

1040 d = MutableYAMLPackageDefinition(packages_store, name) 

1041 d.create_definition() 

1042 else: 

1043 d = MutableYAMLPackageDefinition(packages_store, name) 

1044 return d 

1045 

1046 def write_to(self, fd) -> None: 

1047 MANIFEST_YAML.dump(self._store, fd) 

1048 

1049 

1050def _describe_missing_path(entry: VirtualPath) -> str: 

1051 if entry.is_dir: 

1052 return f"{entry.fs_path}/ (empty directory; possible integration point)" 

1053 if entry.is_symlink: 

1054 target = os.readlink(entry.fs_path) 

1055 return f"{entry.fs_path} (symlink; links to {target})" 

1056 if entry.is_file: 

1057 return f"{entry.fs_path} (file)" 

1058 return f"{entry.fs_path} (other!? Probably not supported by debputy and may need a `remove`)" 

1059 

1060 

1061def _detect_missing_installations( 

1062 path_matcher: SourcePathMatcher, 

1063 search_dir: VirtualPath, 

1064) -> None: 

1065 if not search_dir.is_dir: 1065 ↛ 1066line 1065 didn't jump to line 1066 because the condition on line 1065 was never true

1066 return 

1067 missing = list(path_matcher.detect_missing(search_dir)) 

1068 if not missing: 

1069 return 

1070 

1071 excl = textwrap.dedent( 

1072 """\ 

1073 - discard: "*" 

1074 """ 

1075 ) 

1076 

1077 raise PathNotCoveredByInstallRulesError( 

1078 "Please review the list and add either install rules or exclusions to `installations` in" 

1079 " debian/debputy.manifest. If you do not need any of these paths, add the following to the" 

1080 f" end of your 'installations`:\n\n{excl}\n", 

1081 missing, 

1082 search_dir, 

1083 ) 

1084 

1085 

1086def _list_automatic_discard_rules(path_matcher: SourcePathMatcher) -> None: 

1087 used_discard_rules = path_matcher.used_auto_discard_rules 

1088 # Discard rules can match and then be overridden. In that case, they appear 

1089 # but have 0 matches. 

1090 if not sum((len(v) for v in used_discard_rules.values()), 0): 

1091 return 

1092 _info("The following automatic discard rules were triggered:") 

1093 example_path: str | None = None 

1094 for rule in sorted(used_discard_rules): 

1095 for fs_path in sorted(used_discard_rules[rule]): 

1096 if example_path is None: 1096 ↛ 1098line 1096 didn't jump to line 1098 because the condition on line 1096 was always true

1097 example_path = fs_path 

1098 _info(f" * {rule} -> {fs_path}") 

1099 assert example_path is not None 

1100 _info("") 

1101 _info( 

1102 "Note that some of these may have been overruled. The overrule detection logic is not" 

1103 ) 

1104 _info("100% reliable.") 

1105 _info("") 

1106 _info( 

1107 "You can overrule an automatic discard rule by explicitly listing the path. As an example:" 

1108 ) 

1109 _info(" installations:") 

1110 _info(" - install:") 

1111 _info(f" source: {example_path}") 

1112 

1113 

def _install_everything_from_source_dir_if_present(
    dctrl_bin: BinaryPackage,
    substitution: Substitution,
    path_matcher: SourcePathMatcher,
    install_rule_context: InstallRuleContext,
    source_condition_context: ConditionContext,
    source_dir: VirtualPath,
    *,
    into_dir: VirtualPath | None = None,
) -> None:
    """Run a built-in "install ``*``" rule from ``source_dir`` for ``dctrl_bin``.

    The rule is executed against a copy of ``install_rule_context`` whose
    search dirs consist solely of ``source_dir`` (applicable only to
    ``dctrl_bin``).

    :param into_dir: When given, the copied context additionally has the
      ``fs_root`` of the package's binary package context replaced by it.

    ``NoMatchForInstallPatternError`` and
    ``PathAlreadyInstalledOrDiscardedError`` from the installation are
    deliberately ignored.
    """
    target_packages = frozenset([dctrl_bin])
    match_everything = FileSystemMatchRule.from_path_match(
        "*",
        AttributePath.builtin_path()[f"installing {source_dir.fs_path}"],
        substitution,
    )
    install_rule = run_in_context_of_plugin(
        "debputy",
        InstallRule.install_dest,
        [match_everything],
        None,
        target_packages,
        f"Built-in; install everything from {source_dir.fs_path} into {dctrl_bin.name}",
        None,
    )

    context_overrides = {
        "search_dirs": (SearchDir(source_dir, target_packages),),
    }
    if into_dir is not None:
        per_package_contexts = dict(install_rule_context.binary_package_contexts)
        per_package_contexts[dctrl_bin.name] = per_package_contexts[
            dctrl_bin.name
        ].replace(fs_root=into_dir)
        context_overrides["binary_package_contexts"] = per_package_contexts

    scoped_context = install_rule_context.replace(**context_overrides)
    # Empty directory or everything excluded by default; ignore the error
    with suppress(NoMatchForInstallPatternError, PathAlreadyInstalledOrDiscardedError):
        install_rule.perform_install(
            path_matcher,
            scoped_context,
            source_condition_context,
        )

1163 

1164 

1165def _add_build_install_dirs_to_per_package_search_dirs( 

1166 build_system_install_dirs: Sequence[tuple[str, frozenset[BinaryPackage]]], 

1167 per_package_search_dirs: dict[BinaryPackage, list[VirtualPath]], 

1168 as_path: Callable[[str], VirtualPath], 

1169) -> None: 

1170 seen_pp_search_dirs: set[tuple[BinaryPackage, str]] = set() 

1171 for dest_dir, for_packages in build_system_install_dirs: 

1172 dest_path = as_path(dest_dir) 

1173 for pkg in for_packages: 

1174 seen_key = (pkg, dest_dir) 

1175 if seen_key in seen_pp_search_dirs: 

1176 continue 

1177 seen_pp_search_dirs.add(seen_key) 

1178 if pkg not in per_package_search_dirs: 

1179 per_package_search_dirs[pkg] = [dest_path] 

1180 else: 

1181 per_package_search_dirs[pkg].append(dest_path) 

1182 

1183 

1184class HighLevelManifest: 

1185 def __init__( 

1186 self, 

1187 manifest_path: str, 

1188 mutable_manifest: MutableYAMLManifest | None, 

1189 remove_during_clean_rules: list[FileSystemMatchRule], 

1190 install_rules: list[InstallRule] | None, 

1191 source_package: SourcePackage, 

1192 binary_packages: Mapping[str, BinaryPackage], 

1193 substitution: Substitution, 

1194 package_transformations: Mapping[str, PackageTransformationDefinition], 

1195 dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable, 

1196 dpkg_arch_query_table: DpkgArchTable, 

1197 build_env: DebBuildOptionsAndProfiles, 

1198 build_environments: BuildEnvironments, 

1199 build_rules: list[BuildRule] | None, 

1200 plugin_provided_feature_set: PluginProvidedFeatureSet, 

1201 debian_dir: VirtualPath, 

1202 ) -> None: 

1203 self.manifest_path = manifest_path 

1204 self.mutable_manifest = mutable_manifest 

1205 self._remove_during_clean_rules: list[FileSystemMatchRule] = ( 

1206 remove_during_clean_rules 

1207 ) 

1208 self._install_rules = install_rules 

1209 self._source_package = source_package 

1210 self._binary_packages = binary_packages 

1211 self.substitution = substitution 

1212 self.package_transformations = package_transformations 

1213 self._dpkg_architecture_variables = dpkg_architecture_variables 

1214 self._dpkg_arch_query_table = dpkg_arch_query_table 

1215 self._build_env = build_env 

1216 self._used_for: set[str] = set() 

1217 self.build_environments = build_environments 

1218 self.build_rules = build_rules 

1219 self._plugin_provided_feature_set = plugin_provided_feature_set 

1220 self._debian_dir = debian_dir 

1221 self._source_condition_context = ConditionContext( 

1222 binary_package=None, 

1223 substitution=self.substitution, 

1224 deb_options_and_profiles=self._build_env, 

1225 dpkg_architecture_variables=self._dpkg_architecture_variables, 

1226 dpkg_arch_query_table=self._dpkg_arch_query_table, 

1227 ) 

1228 

1229 def source_version(self, include_binnmu_version: bool = True) -> str: 

1230 # TODO: There should an easier way to determine the source version; really. 

1231 version_var = "{{DEB_VERSION}}" 

1232 if not include_binnmu_version: 

1233 version_var = "{{_DEBPUTY_INTERNAL_NON_BINNMU_SOURCE}}" 

1234 try: 

1235 return self.substitution.substitute( 

1236 version_var, "internal (resolve version)" 

1237 ) 

1238 except DebputySubstitutionError as e: 

1239 raise AssertionError(f"Could not resolve {version_var}") from e 

1240 

1241 @property 

1242 def source_condition_context(self) -> ConditionContext: 

1243 return self._source_condition_context 

1244 

1245 @property 

1246 def debian_dir(self) -> VirtualPath: 

1247 return self._debian_dir 

1248 

1249 @property 

1250 def dpkg_architecture_variables(self) -> DpkgArchitectureBuildProcessValuesTable: 

1251 return self._dpkg_architecture_variables 

1252 

1253 @property 

1254 def deb_options_and_profiles(self) -> DebBuildOptionsAndProfiles: 

1255 return self._build_env 

1256 

1257 @property 

1258 def plugin_provided_feature_set(self) -> PluginProvidedFeatureSet: 

1259 return self._plugin_provided_feature_set 

1260 

1261 @property 

1262 def remove_during_clean_rules(self) -> list[FileSystemMatchRule]: 

1263 return self._remove_during_clean_rules 

1264 

1265 @property 

1266 def active_packages(self) -> Iterable[BinaryPackage]: 

1267 yield from (p for p in self._binary_packages.values() if p.should_be_acted_on) 

1268 

1269 @property 

1270 def all_packages(self) -> Iterable[BinaryPackage]: 

1271 yield from self._binary_packages.values() 

1272 

1273 def package_state_for(self, package: str) -> PackageTransformationDefinition: 

1274 return self.package_transformations[package] 

1275 

1276 def _detect_doc_main_package_for(self, package: BinaryPackage) -> BinaryPackage: 

1277 name = package.name 

1278 for doc_main_field in ("Doc-Main-Package", "X-Doc-Main-Package"): 

1279 doc_main_package_name = package.fields.get(doc_main_field) 

1280 if doc_main_package_name: 1280 ↛ 1281line 1280 didn't jump to line 1281 because the condition on line 1280 was never true

1281 main_package = self._binary_packages.get(doc_main_package_name) 

1282 if main_package is None: 

1283 _error( 

1284 f"Invalid Doc-Main-Package for {name}: The package {doc_main_package_name!r} is not listed in d/control" 

1285 ) 

1286 return main_package 

1287 # If it is not a -doc package, then docs should be installed 

1288 # under its own package name. 

1289 if not name.endswith("-doc"): 1289 ↛ 1291line 1289 didn't jump to line 1291 because the condition on line 1289 was always true

1290 return package 

1291 name = name[:-4] 

1292 main_package = self._binary_packages.get(name) 

1293 if main_package: 

1294 return main_package 

1295 if name.startswith("lib"): 

1296 dev_pkg = self._binary_packages.get(f"{name}-dev") 

1297 if dev_pkg: 

1298 return dev_pkg 

1299 

1300 # If we found no better match; default to the doc package itself. 

1301 return package 

1302 

1303 def perform_installations( 

1304 self, 

1305 integration_mode: DebputyIntegrationMode, 

1306 build_system_install_dirs: Sequence[tuple[str, frozenset[BinaryPackage]]], 

1307 *, 

1308 install_request_context: InstallSearchDirContext | None = None, 

1309 ) -> PackageDataTable: 

1310 package_data_dict = {} 

1311 package_data_table = PackageDataTable(package_data_dict) 

1312 enable_manifest_installation_feature = ( 

1313 integration_mode != INTEGRATION_MODE_DH_DEBPUTY_RRR 

1314 ) 

1315 

1316 if build_system_install_dirs: 1316 ↛ 1317line 1316 didn't jump to line 1317 because the condition on line 1316 was never true

1317 if integration_mode != INTEGRATION_MODE_FULL: 

1318 raise ValueError( 

1319 "The build_system_install_dirs parameter can only be used in full integration mode" 

1320 ) 

1321 if install_request_context: 

1322 raise ValueError( 

1323 "The build_system_install_dirs parameter cannot be used with install_request_context" 

1324 " (not implemented)" 

1325 ) 

1326 

1327 if install_request_context is None: 1327 ↛ 1329line 1327 didn't jump to line 1329 because the condition on line 1327 was never true

1328 

1329 @functools.lru_cache(None) 

1330 def _as_path(fs_path: str) -> VirtualPath: 

1331 return FSROOverlay.create_root_dir(".", fs_path) 

1332 

1333 dtmp_dir = _as_path("debian/tmp") 

1334 source_root_dir = _as_path(".") 

1335 into = frozenset(self._binary_packages.values()) 

1336 per_package_search_dirs = { 

1337 t.binary_package: [_as_path(f.match_rule.path) for f in t.search_dirs] 

1338 for t in self.package_transformations.values() 

1339 if t.search_dirs is not None 

1340 } 

1341 

1342 if integration_mode == INTEGRATION_MODE_FULL: 

1343 # In this mode, we have no default search dir. Everything ends up being 

1344 # per-package instead (since that is easier logic-wise). 

1345 # 

1346 # Even dtmp_dir is omitted here (since it is not universally applicable). 

1347 # Note we still initialize dtmp_dir, since it affects a later guard. 

1348 default_search_dirs = [] 

1349 _add_build_install_dirs_to_per_package_search_dirs( 

1350 build_system_install_dirs, 

1351 per_package_search_dirs, 

1352 _as_path, 

1353 ) 

1354 

1355 # We can end here with per_package_search_dirs having no search dirs for any package 

1356 # (single binary, where everything is installed into d/<pkg> is the most common case). 

1357 # 

1358 # This is not a problem in itself as the installation rules can still apply to the 

1359 # source root and there should be no reason to install something from d/<pkg> into 

1360 # d/<another-pkg> 

1361 else: 

1362 default_search_dirs = [dtmp_dir] 

1363 

1364 search_dirs = _determine_search_dir_order( 

1365 per_package_search_dirs, 

1366 into, 

1367 default_search_dirs, 

1368 source_root_dir, 

1369 ) 

1370 check_for_uninstalled_dirs = tuple( 

1371 s.search_dir 

1372 for s in search_dirs 

1373 if s.search_dir.fs_path != source_root_dir.fs_path 

1374 ) 

1375 if enable_manifest_installation_feature: 

1376 _present_installation_dirs( 

1377 search_dirs, check_for_uninstalled_dirs, into 

1378 ) 

1379 else: 

1380 dtmp_dir = None 

1381 search_dirs = install_request_context.search_dirs 

1382 into = frozenset(self._binary_packages.values()) 

1383 seen: set[BinaryPackage] = set() 

1384 for search_dir in search_dirs: 

1385 seen.update(search_dir.applies_to) 

1386 

1387 missing = into - seen 

1388 if missing: 1388 ↛ 1389line 1388 didn't jump to line 1389 because the condition on line 1388 was never true

1389 names = ", ".join(p.name for p in missing) 

1390 raise ValueError( 

1391 f"The following package(s) had no search dirs: {names}." 

1392 " (Generally, the source root would be applicable to all packages)" 

1393 ) 

1394 extra_names = seen - into 

1395 if extra_names: 1395 ↛ 1396line 1395 didn't jump to line 1396 because the condition on line 1395 was never true

1396 names = ", ".join(p.name for p in extra_names) 

1397 raise ValueError( 

1398 f"The install_request_context referenced the following unknown package(s): {names}" 

1399 ) 

1400 

1401 check_for_uninstalled_dirs = ( 

1402 install_request_context.check_for_uninstalled_dirs 

1403 ) 

1404 

1405 install_rule_context = InstallRuleContext(search_dirs) 

1406 

1407 if ( 1407 ↛ 1414line 1407 didn't jump to line 1414 because the condition on line 1407 was never true

1408 enable_manifest_installation_feature 

1409 and self._install_rules is None 

1410 # TODO: Should we also do this for full mode when build systems provided search dirs? 

1411 and dtmp_dir is not None 

1412 and os.path.isdir(dtmp_dir.fs_path) 

1413 ): 

1414 msg = ( 

1415 "The build system appears to have provided the output of upstream build system's" 

1416 " install in debian/tmp. However, these are no provisions for debputy to install" 

1417 " any of that into any of the debian packages listed in debian/control." 

1418 " To avoid accidentally creating empty packages, debputy will insist that you " 

1419 " explicitly define an empty installation definition if you did not want to " 

1420 " install any of those files even though they have been provided." 

1421 ' Example: "installations: []"' 

1422 ) 

1423 _error(msg) 

1424 elif ( 1424 ↛ 1427line 1424 didn't jump to line 1427 because the condition on line 1424 was never true

1425 not enable_manifest_installation_feature and self._install_rules is not None 

1426 ): 

1427 _error( 

1428 f"The `installations` feature cannot be used in {self.manifest_path} with this integration mode." 

1429 f" Please remove or comment out the `installations` keyword." 

1430 ) 

1431 

1432 for dctrl_bin in self.all_packages: 

1433 package = dctrl_bin.name 

1434 doc_main_package = self._detect_doc_main_package_for(dctrl_bin) 

1435 

1436 install_rule_context[package] = BinaryPackageInstallRuleContext( 

1437 dctrl_bin, 

1438 FSRootDir(), 

1439 doc_main_package, 

1440 ) 

1441 

1442 if enable_manifest_installation_feature: 1442 ↛ 1447line 1442 didn't jump to line 1447 because the condition on line 1442 was always true

1443 discard_rules = list( 

1444 self.plugin_provided_feature_set.auto_discard_rules.values() 

1445 ) 

1446 else: 

1447 discard_rules = [ 

1448 self.plugin_provided_feature_set.auto_discard_rules["debian-dir"] 

1449 ] 

1450 path_matcher = SourcePathMatcher(discard_rules) 

1451 

1452 source_condition_context = self._source_condition_context 

1453 

1454 for dctrl_bin in self.active_packages: 

1455 package = dctrl_bin.name 

1456 if install_request_context: 1456 ↛ 1461line 1456 didn't jump to line 1461 because the condition on line 1456 was always true

1457 build_system_staging_dir = install_request_context.debian_pkg_dirs.get( 

1458 package 

1459 ) 

1460 else: 

1461 build_system_staging_dir_fs_path = os.path.join("debian", package) 

1462 if os.path.isdir(build_system_staging_dir_fs_path): 

1463 build_system_staging_dir = FSROOverlay.create_root_dir( 

1464 ".", 

1465 build_system_staging_dir_fs_path, 

1466 ) 

1467 else: 

1468 build_system_staging_dir = None 

1469 

1470 if build_system_staging_dir is not None: 

1471 _install_everything_from_source_dir_if_present( 

1472 dctrl_bin, 

1473 self.substitution, 

1474 path_matcher, 

1475 install_rule_context, 

1476 source_condition_context, 

1477 build_system_staging_dir, 

1478 ) 

1479 

1480 if self._install_rules: 

1481 # FIXME: Check that every install rule remains used after transformations have run. 

1482 # What we want to check is transformations do not exclude everything from an install 

1483 # rule. The hard part here is that renaming (etc.) is fine, so we cannot 1:1 string 

1484 # match. 

1485 for install_rule in self._install_rules: 

1486 install_rule.perform_install( 

1487 path_matcher, 

1488 install_rule_context, 

1489 source_condition_context, 

1490 ) 

1491 

1492 if enable_manifest_installation_feature: 1492 ↛ 1496line 1492 didn't jump to line 1496 because the condition on line 1492 was always true

1493 for search_dir in check_for_uninstalled_dirs: 

1494 _detect_missing_installations(path_matcher, search_dir) 

1495 

1496 for dctrl_bin in self.all_packages: 

1497 package = dctrl_bin.name 

1498 binary_install_rule_context = install_rule_context[package] 

1499 build_system_pkg_staging_dir = os.path.join("debian", package) 

1500 fs_root = binary_install_rule_context.fs_root 

1501 

1502 context = self.package_transformations[package] 

1503 if dctrl_bin.should_be_acted_on and enable_manifest_installation_feature: 

1504 for special_install_rule in context.install_rules: 1504 ↛ 1505line 1504 didn't jump to line 1505 because the loop on line 1504 never started

1505 special_install_rule.perform_install( 

1506 path_matcher, 

1507 install_rule_context, 

1508 source_condition_context, 

1509 ) 

1510 

1511 if dctrl_bin.should_be_acted_on: 

1512 self.apply_fs_transformations(package, fs_root) 

1513 substvars_file = f"debian/{package}.substvars" 

1514 substvars = FlushableSubstvars.load_from_path( 

1515 substvars_file, missing_ok=True 

1516 ) 

1517 # We do not want to touch the substvars file (non-clean rebuild contamination) 

1518 substvars.substvars_path = None 

1519 else: 

1520 substvars = FlushableSubstvars() 

1521 

1522 udeb_package = self._binary_packages.get(f"{package}-udeb") 

1523 if udeb_package and not udeb_package.is_udeb: 1523 ↛ 1524line 1523 didn't jump to line 1524 because the condition on line 1523 was never true

1524 udeb_package = None 

1525 

1526 package_metadata_context = PackageProcessingContextProvider( 

1527 self, 

1528 dctrl_bin, 

1529 udeb_package, 

1530 package_data_table, 

1531 # FIXME: source_package 

1532 ) 

1533 

1534 ctrl_creator = BinaryCtrlAccessorProviderCreator( 

1535 package_metadata_context, 

1536 substvars, 

1537 context.maintscript_snippets, 

1538 context.substitution, 

1539 ) 

1540 

1541 if not enable_manifest_installation_feature: 1541 ↛ 1542line 1541 didn't jump to line 1542 because the condition on line 1541 was never true

1542 assert_no_dbgsym_migration(dctrl_bin) 

1543 dh_dbgsym_root_fs = dhe_dbgsym_root_dir(dctrl_bin) 

1544 dh_dbgsym_root_path = FSROOverlay.create_root_dir( 

1545 "", 

1546 dh_dbgsym_root_fs, 

1547 ) 

1548 dbgsym_root_fs = FSRootDir() 

1549 _install_everything_from_source_dir_if_present( 

1550 dctrl_bin, 

1551 self.substitution, 

1552 path_matcher, 

1553 install_rule_context, 

1554 source_condition_context, 

1555 dh_dbgsym_root_path, 

1556 into_dir=dbgsym_root_fs, 

1557 ) 

1558 dbgsym_build_ids = read_dbgsym_file(dctrl_bin) 

1559 dbgsym_info = DbgsymInfo( 

1560 dctrl_bin, 

1561 dbgsym_root_fs, 

1562 os.path.join(dh_dbgsym_root_fs, "DEBIAN"), 

1563 dbgsym_build_ids, 

1564 # TODO: Provide manifest feature to support this. 

1565 False, 

1566 ) 

1567 else: 

1568 dbgsym_info = DbgsymInfo( 

1569 dctrl_bin, 

1570 FSRootDir(), 

1571 None, 

1572 [], 

1573 False, 

1574 ) 

1575 

1576 package_data_dict[package] = BinaryPackageData( 

1577 self._source_package, 

1578 dctrl_bin, 

1579 build_system_pkg_staging_dir, 

1580 fs_root, 

1581 substvars, 

1582 package_metadata_context, 

1583 ctrl_creator, 

1584 dbgsym_info, 

1585 ) 

1586 

1587 if enable_manifest_installation_feature: 1587 ↛ 1590line 1587 didn't jump to line 1590 because the condition on line 1587 was always true

1588 _list_automatic_discard_rules(path_matcher) 

1589 

1590 return package_data_table 

1591 

1592 def condition_context( 

1593 self, binary_package: BinaryPackage | str | None 

1594 ) -> ConditionContext: 

1595 if binary_package is None: 1595 ↛ 1596line 1595 didn't jump to line 1596 because the condition on line 1595 was never true

1596 return self._source_condition_context 

1597 if not isinstance(binary_package, str): 1597 ↛ 1598line 1597 didn't jump to line 1598 because the condition on line 1597 was never true

1598 binary_package = binary_package.name 

1599 

1600 package_transformation = self.package_transformations[binary_package] 

1601 return self._source_condition_context.replace( 

1602 binary_package=package_transformation.binary_package, 

1603 substitution=package_transformation.substitution, 

1604 ) 

1605 

1606 def apply_fs_transformations( 

1607 self, 

1608 package: str, 

1609 fs_root: FSPath, 

1610 ) -> None: 

1611 if package in self._used_for: 1611 ↛ 1612line 1611 didn't jump to line 1612 because the condition on line 1611 was never true

1612 raise ValueError( 

1613 f"data.tar contents for {package} has already been finalized!?" 

1614 ) 

1615 if package not in self.package_transformations: 1615 ↛ 1616line 1615 didn't jump to line 1616 because the condition on line 1615 was never true

1616 raise ValueError( 

1617 f'The package "{package}" was not relevant for the manifest!?' 

1618 ) 

1619 package_transformation = self.package_transformations[package] 

1620 condition_context = ConditionContext( 

1621 binary_package=package_transformation.binary_package, 

1622 substitution=package_transformation.substitution, 

1623 deb_options_and_profiles=self._build_env, 

1624 dpkg_architecture_variables=self._dpkg_architecture_variables, 

1625 dpkg_arch_query_table=self._dpkg_arch_query_table, 

1626 ) 

1627 norm_rules = list( 

1628 builtin_mode_normalization_rules( 

1629 self._dpkg_architecture_variables, 

1630 package_transformation.binary_package, 

1631 package_transformation.substitution, 

1632 ) 

1633 ) 

1634 norm_mode_transformation_rule = ModeNormalizationTransformationRule(norm_rules) 

1635 norm_mode_transformation_rule.transform_file_system(fs_root, condition_context) 

1636 for transformation in package_transformation.transformations: 

1637 transformation.run_transform_file_system(fs_root, condition_context) 

1638 interpreter_normalization = NormalizeShebangLineTransformation() 

1639 interpreter_normalization.transform_file_system(fs_root, condition_context) 

1640 

1641 def finalize_data_tar_contents( 

1642 self, 

1643 package: str, 

1644 fs_root: FSPath, 

1645 clamp_mtime_to: int, 

1646 ) -> IntermediateManifest: 

1647 if package in self._used_for: 1647 ↛ 1648line 1647 didn't jump to line 1648 because the condition on line 1647 was never true

1648 raise ValueError( 

1649 f"data.tar contents for {package} has already been finalized!?" 

1650 ) 

1651 if package not in self.package_transformations: 1651 ↛ 1652line 1651 didn't jump to line 1652 because the condition on line 1651 was never true

1652 raise ValueError( 

1653 f'The package "{package}" was not relevant for the manifest!?' 

1654 ) 

1655 self._used_for.add(package) 

1656 

1657 # At this point, there so be no further mutations to the file system (because the will not 

1658 # be present in the intermediate manifest) 

1659 cast("FSRootDir", fs_root).is_read_write = False 

1660 

1661 intermediate_manifest = list( 

1662 _generate_intermediate_manifest( 

1663 fs_root, 

1664 clamp_mtime_to, 

1665 ) 

1666 ) 

1667 return intermediate_manifest 

1668 

1669 def apply_to_binary_staging_directory( 

1670 self, 

1671 package: str, 

1672 fs_root: FSPath, 

1673 clamp_mtime_to: int, 

1674 ) -> IntermediateManifest: 

1675 self.apply_fs_transformations(package, fs_root) 

1676 return self.finalize_data_tar_contents(package, fs_root, clamp_mtime_to) 

1677 

1678 

1679@dataclasses.dataclass(slots=True) 

1680class SearchDirOrderState: 

1681 search_dir: VirtualPath 

1682 applies_to: set[BinaryPackage] | frozenset[BinaryPackage] = dataclasses.field( 

1683 default_factory=set 

1684 ) 

1685 after: set[str] = dataclasses.field(default_factory=set) 

1686 

1687 

def _present_installation_dirs(
    search_dirs: Sequence[SearchDir],
    checked_missing_dirs: Sequence[VirtualPath],
    all_pkgs: frozenset[BinaryPackage],
) -> None:
    """Print (via ``_info``) the search dirs and the dirs checked for leftovers.

    :param search_dirs: The search dirs in the order they are considered.
    :param checked_missing_dirs: The directories that are checked for
      "not-installed" paths. The section is omitted when this is empty.
    :param all_pkgs: All packages. Search dirs that only apply to a proper
      subset of them get a remark listing those packages.
    """

    def _absence_remark(fs_path: str) -> str:
        # Directories that do not exist on disk are flagged as skipped.
        return "" if os.path.isdir(fs_path) else " (skipped; absent)"

    _info("The following directories are considered search dirs (in order):")
    # Width used to align the remarks after the paths.
    max_len = max((len(s.search_dir.fs_path) for s in search_dirs), default=1)
    for entry in search_dirs:
        entry_path = entry.search_dir.fs_path
        if entry.applies_to < all_pkgs:
            names = ", ".join(p.name for p in entry.applies_to)
            restriction = f" [only applicable to: {names}]"
        else:
            restriction = ""
        remark = _absence_remark(entry_path)
        _info(f" * {entry_path:{max_len}}{restriction}{remark}")

    if not checked_missing_dirs:
        return

    _info('The following directories are considered for "not-installed" paths;')
    for checked_dir in checked_missing_dirs:
        remark = _absence_remark(checked_dir.fs_path)
        _info(f" * {checked_dir.fs_path:{max_len}}{remark}")

1712 

1713 

def _determine_search_dir_order(
    requested: Mapping[BinaryPackage, list[VirtualPath]],
    all_pkgs: frozenset[BinaryPackage],
    default_search_dirs: list[VirtualPath],
    source_root: VirtualPath,
) -> Sequence[SearchDir]:
    """Merge the per-package search dir lists into one globally ordered sequence.

    Each package contributes its requested search dirs (or
    ``default_search_dirs`` when it requested none). Within a package's list,
    every directory must come after the directory preceding it; the result is
    an order that satisfies these constraints for all packages at once.

    :param requested: The search dirs requested per package. All keys must
      be members of ``all_pkgs``.
    :param all_pkgs: All packages.
    :param default_search_dirs: Search dirs for packages not in ``requested``.
    :param source_root: Always appended last, applicable to all packages.
    :return: The ordered search dirs.

    Conflicting orderings (a cycle) are reported via ``_error``.
    """
    search_dir_table: dict[str, SearchDirOrderState] = {}
    assert requested.keys() <= all_pkgs
    for pkg in all_pkgs:
        paths = requested.get(pkg, default_search_dirs)
        previous_search_dir: SearchDirOrderState | None = None
        for path in paths:
            try:
                search_dir_state = search_dir_table[path.fs_path]
            except KeyError:
                search_dir_state = SearchDirOrderState(path)
                search_dir_table[path.fs_path] = search_dir_state
            search_dir_state.applies_to.add(pkg)
            if previous_search_dir is not None:
                search_dir_state.after.add(previous_search_dir.search_dir.fs_path)
            previous_search_dir = search_dir_state

    search_dirs_in_order: list[SearchDirOrderState] = []
    released: set[str] = set()
    remaining: set[str] = set()

    # First pass: release everything whose prerequisites are already released.
    for fs_path, search_dir_state in search_dir_table.items():
        if search_dir_state.after <= released:
            search_dirs_in_order.append(search_dir_state)
            released.add(fs_path)
        else:
            remaining.add(fs_path)

    # Further passes: keep releasing until nothing is left. A pass without
    # progress means the constraints contradict each other.
    while remaining:
        released_before = len(released)
        # Iterate over a snapshot; the set itself is only updated after the pass.
        for fs_path in list(remaining):
            search_dir_state = search_dir_table[fs_path]
            if search_dir_state.after <= released:
                search_dirs_in_order.append(search_dir_state)
                released.add(fs_path)

        if released_before == len(released):
            # Sorted to make the message stable between runs.
            names = ", ".join(sorted(remaining))
            _error(
                f"There is a circular dependency (somewhere) between the search dirs: {names}."
                " Note that the search directories across all packages have to be ordered (and the"
                " source root should generally be last)"
            )
        remaining -= released

    search_dirs_in_order.append(
        SearchDirOrderState(
            source_root,
            all_pkgs,
        )
    )

    return tuple(
        # Avoid duplicating all_pkgs
        SearchDir(
            s.search_dir,
            frozenset(s.applies_to) if s.applies_to != all_pkgs else all_pkgs,
        )
        for s in search_dirs_in_order
    )