Coverage for src/debputy/highlevel_manifest.py: 64%

891 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-12-28 16:12 +0000

1import dataclasses 

2import functools 

3import os 

4import textwrap 

5import typing 

6from contextlib import suppress 

7from dataclasses import dataclass, field 

8from typing import ( 

9 Any, 

10 TypeVar, 

11 Generic, 

12 cast, 

13) 

14from collections.abc import Iterable, Mapping, Sequence, Callable 

15 

16from debian.debian_support import DpkgArchTable 

17 

18from debputy.dh.debhelper_emulation import ( 

19 dhe_dbgsym_root_dir, 

20 assert_no_dbgsym_migration, 

21 read_dbgsym_file, 

22) 

23from ._deb_options_profiles import DebBuildOptionsAndProfiles 

24from ._manifest_constants import * 

25from .architecture_support import DpkgArchitectureBuildProcessValuesTable 

26from .builtin_manifest_rules import builtin_mode_normalization_rules 

27from .exceptions import ( 

28 DebputySubstitutionError, 

29 DebputyRuntimeErrorWithPreamble, 

30) 

31from .filesystem_scan import FSPath, FSRootDir, OSFSROOverlay, FSControlRootDir 

32from .installations import ( 

33 InstallRule, 

34 SourcePathMatcher, 

35 PathAlreadyInstalledOrDiscardedError, 

36 NoMatchForInstallPatternError, 

37 InstallRuleContext, 

38 BinaryPackageInstallRuleContext, 

39 InstallSearchDirContext, 

40 SearchDir, 

41) 

42from .intermediate_manifest import TarMember, PathType, IntermediateManifest 

43from .maintscript_snippet import ( 

44 DpkgMaintscriptHelperCommand, 

45 MaintscriptSnippetContainer, 

46) 

47from .manifest_conditions import ConditionContext 

48from .manifest_parser.base_types import ( 

49 FileSystemMatchRule, 

50 FileSystemExactMatchRule, 

51 BuildEnvironments, 

52) 

53from .manifest_parser.util import AttributePath 

54from .packager_provided_files import PackagerProvidedFile 

55from .packages import BinaryPackage, SourcePackage 

56from .plugin.api.feature_set import PluginProvidedFeatureSet 

57from .plugin.api.impl import BinaryCtrlAccessorProviderCreator 

58from .plugin.api.impl_types import ( 

59 PackageProcessingContextProvider, 

60 PackageDataTable, 

61) 

62from .plugin.api.spec import ( 

63 FlushableSubstvars, 

64 VirtualPath, 

65 DebputyIntegrationMode, 

66 INTEGRATION_MODE_DH_DEBPUTY_RRR, 

67 INTEGRATION_MODE_FULL, 

68) 

69from debputy.plugins.debputy.binary_package_rules import ServiceRule 

70from debputy.plugins.debputy.build_system_rules import BuildRule 

71from .plugin.plugin_state import run_in_context_of_plugin 

72from .substitution import Substitution 

73from .transformation_rules import ( 

74 TransformationRule, 

75 ModeNormalizationTransformationRule, 

76 NormalizeShebangLineTransformation, 

77) 

78from .util import ( 

79 _error, 

80 _warn, 

81 debian_policy_normalize_symlink_target, 

82 generated_content_dir, 

83 _info, 

84) 

85from .yaml import MANIFEST_YAML 

86from .yaml.compat import CommentedMap, CommentedSeq 

87 

88 

class PathNotCoveredByInstallRulesError(DebputyRuntimeErrorWithPreamble):
    """Error raised for paths found in a search dir that nothing installed or discarded.

    The unmatched paths and the search directory are read from the
    exception's positional arguments (``args[1]`` and ``args[2]``).
    """

    @property
    def unmatched_paths(self) -> Sequence[VirtualPath]:
        """Return the unmatched paths (second positional exception argument)."""
        return self.args[1]

    @property
    def search_dir(self) -> VirtualPath:
        """Return the search directory (third positional exception argument)."""
        return self.args[2]

    def render_preamble(self) -> None:
        """Emit warnings listing every unmatched path, framed by blank lines."""
        location = self.search_dir.fs_path
        _warn(
            f"The following paths were present in {location}, but not installed (nor explicitly discarded)."
        )
        _warn("")
        for missing in self.unmatched_paths:
            _warn(f" * {_describe_missing_path(missing)}")
        _warn("")

108 

109 

@dataclass(slots=True)
class DbgsymInfo:
    """Mutable, slotted record of dbgsym-related state for one binary package."""

    binary_package: BinaryPackage
    dbgsym_fs_root: FSPath
    # Cache for `dbgsym_root_dir`; None until the directory has been resolved.
    _dbgsym_root_fs: str | None
    dbgsym_ids: list[str]
    run_dwz: bool

    @property
    def dbgsym_root_dir(self) -> str:
        """Return the dbgsym root directory, resolving and caching it on first use."""
        cached = self._dbgsym_root_fs
        if cached is not None:
            return cached
        resolved = generated_content_dir(
            package=self.binary_package,
            subdir_key="dbgsym-fs-root",
        )
        self._dbgsym_root_fs = resolved
        return resolved

    @property
    def dbgsym_ctrl_dir(self) -> FSControlRootDir:
        """Return a control root dir for the ``DEBIAN`` directory below the dbgsym root."""
        ctrl_path = os.path.join(self.dbgsym_root_dir, "DEBIAN")
        return FSControlRootDir.create_root_dir(ctrl_path)

134 

135 

@dataclass(slots=True, frozen=True)
class BinaryPackageData:
    """Frozen, slotted bundle of the objects associated with one binary package."""

    source_package: SourcePackage
    binary_package: BinaryPackage
    binary_staging_root_dir: str
    fs_root: FSPath
    substvars: FlushableSubstvars
    package_metadata_context: PackageProcessingContextProvider
    ctrl_creator: BinaryCtrlAccessorProviderCreator
    dbgsym_info: DbgsymInfo

    @property
    def control_output_dir(self) -> FSControlRootDir:
        """Return a control root dir for this package's generated ``DEBIAN`` content dir."""
        debian_dir = generated_content_dir(
            package=self.binary_package,
            subdir_key="DEBIAN",
        )
        return FSControlRootDir.create_root_dir(debian_dir)

155 

156 

@dataclass(slots=True)
class PackageTransformationDefinition:
    """Mutable, slotted per-package record of rules, snippets and related settings."""

    binary_package: BinaryPackage
    substitution: Substitution
    is_auto_generated_package: bool
    binary_version: str | None = None
    search_dirs: list[FileSystemExactMatchRule] | None = None
    # Every container below uses a default_factory so each instance gets its
    # own fresh, empty collection.
    dpkg_maintscript_helper_snippets: list[DpkgMaintscriptHelperCommand] = field(
        default_factory=list,
    )
    maintscript_snippets: dict[str, MaintscriptSnippetContainer] = field(
        default_factory=dict,
    )
    transformations: list[TransformationRule] = field(
        default_factory=list,
    )
    reserved_packager_provided_files: dict[str, list[PackagerProvidedFile]] = field(
        default_factory=dict,
    )
    install_rules: list[InstallRule] = field(
        default_factory=list,
    )
    requested_service_rules: list[ServiceRule] = field(
        default_factory=list,
    )

176 

177 

def _path_to_tar_member(
    path: FSPath,
    clamp_mtime_to: int,
) -> TarMember:
    """Convert a single path into its `TarMember` description.

    The member's mtime starts at ``clamp_mtime_to`` and is lowered to the
    path's own mtime when the path has a file-system path and is older.

    :param path: The path to describe.
    :param clamp_mtime_to: Upper bound for the mtime of the resulting member.
    :return: A virtual member for symlinks and for paths without a
      file-system path; otherwise a member created from the backing file.
    :raises AssertionError: If the path is neither a directory, a file nor
      a symlink.
    """
    mtime = float(clamp_mtime_to)
    owner, uid, group, gid = path.tar_owner_info
    mode = path.mode
    ownership = {"owner": owner, "uid": uid, "group": group, "gid": gid}

    if path.has_fs_path:
        mtime = min(mtime, path.mtime)

    if path.is_dir:
        path_type = PathType.DIRECTORY
    elif path.is_file:
        # TODO: hardlinks are not handled yet; when they are, this branch is
        # a likely place for them to show up.
        path_type = PathType.FILE
    else:
        if not path.is_symlink:
            raise AssertionError(
                f"Unsupported file type: {path.path} - not a file, dir nor a symlink!"
            )
        # Symlinks are resolved right away since the target has to be
        # normalized in any case.
        normalized_target = debian_policy_normalize_symlink_target(
            path.path,
            path.readlink(),
        )
        return TarMember.virtual_path(
            path.tar_path,
            PathType.SYMLINK,
            mtime,
            link_target=normalized_target,
            # The mode is pinned to 0777 because that is what shows up in the
            # data.tar. tar would accept other values, but reproducibility
            # requires the well-behaved one.
            mode=0o0777,
            **ownership,
        )

    if not path.has_fs_path:
        assert not path.is_file
        return TarMember.virtual_path(
            path.tar_path,
            path_type,
            mtime,
            mode=mode,
            **ownership,
        )

    return TarMember.from_file(
        path.tar_path,
        path.fs_path,
        mode=mode,
        path_type=path_type,
        path_mtime=mtime,
        clamp_mtime_to=clamp_mtime_to,
        may_steal_fs_path=path._can_replace_inline,
        **ownership,
    )

245 

246 

def _generate_intermediate_manifest(
    fs_root: FSPath,
    clamp_mtime_to: int,
) -> Iterable[TarMember]:
    """Yield a `TarMember` for every path below ``fs_root``, symlinks last.

    Non-symlink members are yielded in the order ``fs_root.all_paths()``
    produces them; symlink members are collected and yielded afterwards in
    their encounter order.

    :param fs_root: Root whose paths are converted.
    :param clamp_mtime_to: Forwarded to `_path_to_tar_member` for each path.
    """
    deferred_symlinks: list[TarMember] = []
    for fs_path in fs_root.all_paths():
        member = _path_to_tar_member(fs_path, clamp_mtime_to)
        if member.path_type != PathType.SYMLINK:
            yield member
        else:
            deferred_symlinks.append(member)
    yield from deferred_symlinks

259 

260 

261ST = TypeVar("ST") 

262T = TypeVar("T") 

263 

264 

265class AbstractYAMLSubStore(Generic[ST]): 

266 def __init__( 

267 self, 

268 parent_store: Any, 

269 parent_key: int | str | None, 

270 store: ST | None = None, 

271 ) -> None: 

272 if parent_store is not None and parent_key is not None: 

273 try: 

274 from_parent_store = parent_store[parent_key] 

275 except (KeyError, IndexError): 

276 from_parent_store = None 

277 if ( 277 ↛ 282line 277 didn't jump to line 282 because the condition on line 277 was never true

278 store is not None 

279 and from_parent_store is not None 

280 and store is not parent_store 

281 ): 

282 raise ValueError( 

283 "Store is provided but is not the one already in the parent store" 

284 ) 

285 if store is None: 285 ↛ 287line 285 didn't jump to line 287 because the condition on line 285 was always true

286 store = from_parent_store 

287 self._parent_store = parent_store 

288 self._parent_key = parent_key 

289 self._is_detached = ( 

290 parent_key is None or parent_store is None or parent_key not in parent_store 

291 ) 

292 assert self._is_detached or store is not None 

293 if store is None: 

294 store = self._create_new_instance() 

295 self._store: ST = store 

296 

297 def _create_new_instance(self) -> ST: 

298 raise NotImplementedError 

299 

300 def create_definition_if_missing(self) -> None: 

301 if self._is_detached: 

302 self.create_definition() 

303 

304 def create_definition(self) -> None: 

305 if not self._is_detached: 305 ↛ 306line 305 didn't jump to line 306 because the condition on line 305 was never true

306 raise RuntimeError("Definition is already present") 

307 parent_store = self._parent_store 

308 if parent_store is None: 308 ↛ 309line 308 didn't jump to line 309 because the condition on line 308 was never true

309 raise RuntimeError( 

310 f"Definition is not attached to any parent!? ({self.__class__.__name__})" 

311 ) 

312 if isinstance(parent_store, list): 

313 assert self._parent_key is None 

314 self._parent_key = len(parent_store) 

315 self._parent_store.append(self._store) 

316 else: 

317 parent_store[self._parent_key] = self._store 

318 self._is_detached = False 

319 

320 def remove_definition(self) -> None: 

321 self._ensure_attached() 

322 del self._parent_store[self._parent_key] 

323 if isinstance(self._parent_store, list): 

324 self._parent_key = None 

325 self._is_detached = True 

326 

327 def _ensure_attached(self) -> None: 

328 if self._is_detached: 

329 raise RuntimeError("The definition has been removed!") 

330 

331 

class AbstractYAMLListSubStore(Generic[T], AbstractYAMLSubStore[list[T]]):
    """Sub-store whose value is a list; new values are `CommentedSeq` instances."""

    def _create_new_instance(self) -> list[T]:
        """Return a new, empty `CommentedSeq`."""
        empty_seq = CommentedSeq()
        return empty_seq

335 

336 

class AbstractYAMLDictSubStore(Generic[T], AbstractYAMLSubStore[dict[str, T]]):
    """Sub-store whose value is a mapping; new values are `CommentedMap` instances."""

    def _create_new_instance(self) -> dict[str, T]:
        """Return a new, empty `CommentedMap`."""
        empty_map = CommentedMap()
        return empty_map

340 

341 

class MutableCondition:
    """Factory for single-entry condition mappings."""

    @classmethod
    def arch_matches(cls, arch_filter: str) -> CommentedMap:
        """Return a mapping with ``arch_filter`` under the arch-matches key."""
        payload = {MK_CONDITION_ARCH_MATCHES: arch_filter}
        return CommentedMap(payload)

    @classmethod
    def build_profiles_matches(cls, build_profiles_matches: str) -> CommentedMap:
        """Return a mapping with the given value under the build-profiles-matches key."""
        payload = {MK_CONDITION_BUILD_PROFILES_MATCHES: build_profiles_matches}
        return CommentedMap(payload)

352 

353 

class MutableYAMLSymlink(AbstractYAMLDictSubStore[Any]):
    """Mutable view of a create-symlink entry (link path and link target)."""

    @classmethod
    def new_symlink(
        cls, link_path: str, link_target: str, condition: Any | None
    ) -> "MutableYAMLSymlink":
        """Build a detached symlink entry.

        :param link_path: Value stored as the link path.
        :param link_target: Value stored as the link target.
        :param condition: Stored under ``"when"`` unless it is None.
        """
        definition = {
            MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH: link_path,
            MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET: link_target,
        }
        if condition is not None:
            definition["when"] = condition
        wrapped = CommentedMap({MK_TRANSFORMATIONS_CREATE_SYMLINK: definition})
        return cls(None, None, store=wrapped)

    @property
    def symlink_path(self) -> str:
        """The link path recorded in the entry."""
        definition = self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK]
        return definition[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH]

    @symlink_path.setter
    def symlink_path(self, path: str) -> None:
        definition = self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK]
        definition[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH] = path

    @property
    def symlink_target(self) -> str | None:
        """The link target recorded in the entry."""
        definition = self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK]
        return definition[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET]

    @symlink_target.setter
    def symlink_target(self, target: str) -> None:
        definition = self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK]
        definition[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET] = target

391 

392 

class MutableYAMLConffileManagementItem(AbstractYAMLDictSubStore[Any]):
    """Mutable view of one conffile-management entry (a remove or a rename).

    The underlying store is a single-entry mapping: its only key is the
    command and its only value is the mapping holding the command's options.
    """

    @classmethod
    def rm_conffile(
        cls,
        conffile: str,
        prior_to_version: str | None,
        owning_package: str | None,
    ) -> "MutableYAMLConffileManagementItem":
        """Build a detached "remove" entry for ``conffile``.

        ``prior_to_version`` and ``owning_package`` are only recorded when
        they are not None.
        """
        r = cls(
            None,
            None,
            store=CommentedMap(
                {
                    MK_CONFFILE_MANAGEMENT_REMOVE: CommentedMap(
                        {MK_CONFFILE_MANAGEMENT_REMOVE_PATH: conffile}
                    )
                }
            ),
        )
        r.prior_to_version = prior_to_version
        r.owning_package = owning_package
        return r

    @classmethod
    def mv_conffile(
        cls,
        old_conffile: str,
        new_conffile: str,
        prior_to_version: str | None,
        owning_package: str | None,
    ) -> "MutableYAMLConffileManagementItem":
        """Build a detached "rename" entry from ``old_conffile`` to ``new_conffile``.

        ``prior_to_version`` and ``owning_package`` are only recorded when
        they are not None.
        """
        r = cls(
            None,
            None,
            store=CommentedMap(
                {
                    MK_CONFFILE_MANAGEMENT_RENAME: CommentedMap(
                        {
                            MK_CONFFILE_MANAGEMENT_RENAME_SOURCE: old_conffile,
                            MK_CONFFILE_MANAGEMENT_RENAME_TARGET: new_conffile,
                        }
                    )
                }
            ),
        )
        r.prior_to_version = prior_to_version
        r.owning_package = owning_package
        return r

    @property
    def _container(self) -> dict[str, Any]:
        """The options mapping, i.e. the single value of the store."""
        assert len(self._store) == 1
        return next(iter(self._store.values()))

    def _set_or_drop(self, key: str, value: str | None) -> None:
        """Store ``value`` under ``key``; a None value removes the key if present."""
        if value is None:
            with suppress(KeyError):
                del self._container[key]
        else:
            self._container[key] = value

    @property
    def command(self) -> str:
        """The command name, i.e. the single key of the store."""
        assert len(self._store) == 1
        return next(iter(self._store))

    @property
    def obsolete_conffile(self) -> str:
        """The removed path (remove command) or the rename source (rename command)."""
        if self.command == MK_CONFFILE_MANAGEMENT_REMOVE:
            return self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH]
        assert self.command == MK_CONFFILE_MANAGEMENT_RENAME
        return self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE]

    @obsolete_conffile.setter
    def obsolete_conffile(self, value: str) -> None:
        if self.command == MK_CONFFILE_MANAGEMENT_REMOVE:
            self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH] = value
        else:
            assert self.command == MK_CONFFILE_MANAGEMENT_RENAME
            self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE] = value

    @property
    def new_conffile(self) -> str:
        """The rename target.

        :raises TypeError: If this entry is not a rename command.
        """
        if self.command != MK_CONFFILE_MANAGEMENT_RENAME:
            raise TypeError(
                f"The new_conffile attribute is only applicable to command {MK_CONFFILE_MANAGEMENT_RENAME}."
                f" This is a {self.command}"
            )
        return self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET]

    @new_conffile.setter
    def new_conffile(self, value: str) -> None:
        if self.command != MK_CONFFILE_MANAGEMENT_RENAME:
            raise TypeError(
                f"The new_conffile attribute is only applicable to command {MK_CONFFILE_MANAGEMENT_RENAME}."
                f" This is a {self.command}"
            )
        self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET] = value

    @property
    def prior_to_version(self) -> str | None:
        """The recorded prior-to-version value, or None when unset."""
        return self._container.get(MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION)

    @prior_to_version.setter
    def prior_to_version(self, value: str | None) -> None:
        self._set_or_drop(MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION, value)

    @property
    def owning_package(self) -> str | None:
        """The recorded owning package, or None when unset."""
        # Reads the owning-package key, i.e. the same key the setter writes.
        return self._container.get(MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE)

    @owning_package.setter
    def owning_package(self, value: str | None) -> None:
        self._set_or_drop(MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE, value)

512 

513 

class MutableYAMLPackageDefinition(AbstractYAMLDictSubStore):
    """Mutable view of one package's definition mapping."""

    def _list_store(
        self, key: str, *, create_if_absent: bool = False
    ) -> list[dict[str, Any]] | None:
        """Return the list stored under ``key``.

        :param key: Key inside this package definition.
        :param create_if_absent: When the definition is detached or lacks
          ``key``: if true, attach the definition and create an empty list
          under ``key``; if false, return None and leave everything as is.
        """
        if self._is_detached or key not in self._store:
            if not create_if_absent:
                return None
            self.create_definition_if_missing()
            self._store[key] = []
        return self._store[key]

    def _insert_item(self, key: str, item: AbstractYAMLDictSubStore) -> None:
        """Attach the detached ``item`` to the list under ``key``, creating the list if needed.

        :raises RuntimeError: If ``item`` is already attached or bound to
          another container.
        """
        parent_store = self._list_store(key, create_if_absent=True)
        assert parent_store is not None
        if not item._is_detached or (
            item._parent_store is not None and item._parent_store is not parent_store
        ):
            raise RuntimeError(
                "Item is already attached or associated with a different container"
            )
        item._parent_store = parent_store
        item.create_definition()

    def add_symlink(self, symlink: MutableYAMLSymlink) -> None:
        """Add ``symlink`` to this package's transformations list."""
        self._insert_item(MK_TRANSFORMATIONS, symlink)

    def symlinks(self) -> Iterable[MutableYAMLSymlink]:
        """Yield a `MutableYAMLSymlink` for every create-symlink transformation.

        Only single-entry mappings whose key is the create-symlink key are
        considered; a missing transformations list yields nothing.
        """
        store = self._list_store(MK_TRANSFORMATIONS)
        if store is None:
            return
        for index, entry in enumerate(store):
            # Match on the same key that MutableYAMLSymlink reads from.
            if (
                entry
                and isinstance(entry, dict)
                and len(entry) == 1
                and MK_TRANSFORMATIONS_CREATE_SYMLINK in entry
            ):
                yield MutableYAMLSymlink(store, index)

    def conffile_management_items(self) -> Iterable[MutableYAMLConffileManagementItem]:
        """Yield one item per entry of the conffile-management list (if present)."""
        store = self._list_store(MK_CONFFILE_MANAGEMENT)
        if store is None:
            return
        yield from (
            MutableYAMLConffileManagementItem(store, i) for i in range(len(store))
        )

    def add_conffile_management(
        self, conffile_management_item: MutableYAMLConffileManagementItem
    ) -> None:
        """Add ``conffile_management_item`` to this package's conffile-management list."""
        self._insert_item(MK_CONFFILE_MANAGEMENT, conffile_management_item)

561 

562 

class AbstractMutableYAMLInstallRule(AbstractYAMLDictSubStore):
    """Base class for mutable install-rule entries plus factories for them.

    The underlying store is a single-entry mapping; its only value is the
    container holding the rule's options.
    """

    @property
    def _container(self) -> dict[str, Any]:
        """The rule's options, i.e. the single value of the store."""
        assert len(self._store) == 1
        return next(iter(self._store.values()))

    @staticmethod
    def _sources_key(sources: str | list[str]) -> str:
        """Return the singular source key for a str, otherwise the plural key."""
        if isinstance(sources, str):
            return MK_INSTALLATIONS_INSTALL_SOURCE
        return MK_INSTALLATIONS_INSTALL_SOURCES

    @property
    def into(self) -> list[str] | None:
        """The "into" value as a list, or None when it is unset."""
        # `.get` so that a rule whose key was never set (or was removed by
        # the setter) reports None instead of raising KeyError.
        v = self._container.get(MK_INSTALLATIONS_INSTALL_INTO)
        if v is None:
            return None
        if isinstance(v, str):
            return [v]
        return v

    @into.setter
    def into(self, new_value: str | list[str] | None) -> None:
        if new_value is None:
            with suppress(KeyError):
                del self._container[MK_INSTALLATIONS_INSTALL_INTO]
            return
        if isinstance(new_value, str):
            self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_value
            return
        new_list = CommentedSeq(new_value)
        self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_list

    @property
    def when(self) -> str | Mapping[str, Any] | None:
        """The rule's condition, or None when it is unset."""
        return self._container.get(MK_CONDITION_WHEN)

    @when.setter
    def when(self, new_value: str | Mapping[str, Any] | None) -> None:
        if new_value is None:
            with suppress(KeyError):
                del self._container[MK_CONDITION_WHEN]
            return
        if isinstance(new_value, str):
            self._container[MK_CONDITION_WHEN] = new_value
            return
        new_map = CommentedMap(new_value)
        self._container[MK_CONDITION_WHEN] = new_map

    @classmethod
    def install_dest(
        cls,
        sources: str | list[str],
        into: str | list[str] | None,
        *,
        dest_dir: str | None = None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Build a detached install rule, optionally with a destination dir.

        ``dest_dir``, ``into`` and ``when`` are only recorded when not None.
        """
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=CommentedMap(
                {
                    MK_INSTALLATIONS_INSTALL: CommentedMap(
                        {
                            cls._sources_key(sources): sources,
                        }
                    )
                }
            ),
        )
        r.dest_dir = dest_dir
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def multi_dest_install(
        cls,
        sources: str | list[str],
        dest_dirs: Sequence[str],
        into: str | list[str] | None,
        *,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Build a detached multi-dest install rule with ``dest_dirs`` under "dest-dirs".

        ``into`` and ``when`` are only recorded when not None.
        """
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=CommentedMap(
                {
                    MK_INSTALLATIONS_MULTI_DEST_INSTALL: CommentedMap(
                        {
                            cls._sources_key(sources): sources,
                            "dest-dirs": dest_dirs,
                        }
                    )
                }
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_as(
        cls,
        source: str,
        install_as: str,
        into: str | list[str] | None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Build a detached install rule for one ``source`` with an explicit "as" name.

        ``into`` and ``when`` are only recorded when not None.
        """
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=CommentedMap(
                {
                    MK_INSTALLATIONS_INSTALL: CommentedMap(
                        {
                            MK_INSTALLATIONS_INSTALL_SOURCE: source,
                            MK_INSTALLATIONS_INSTALL_AS: install_as,
                        }
                    )
                }
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_doc_as(
        cls,
        source: str,
        install_as: str,
        into: str | list[str] | None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Build a detached install-docs rule for one ``source`` with an explicit "as" name.

        ``into`` and ``when`` are only recorded when not None.
        """
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=CommentedMap(
                {
                    MK_INSTALLATIONS_INSTALL_DOCS: CommentedMap(
                        {
                            MK_INSTALLATIONS_INSTALL_SOURCE: source,
                            MK_INSTALLATIONS_INSTALL_AS: install_as,
                        }
                    )
                }
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_docs(
        cls,
        sources: str | list[str],
        into: str | list[str] | None,
        *,
        dest_dir: str | None = None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Build a detached install-docs rule, optionally with a destination dir.

        ``into``, ``dest_dir`` and ``when`` are only recorded when not None.
        """
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=CommentedMap(
                {
                    MK_INSTALLATIONS_INSTALL_DOCS: CommentedMap(
                        {
                            cls._sources_key(sources): sources,
                        }
                    )
                }
            ),
        )
        r.into = into
        r.dest_dir = dest_dir
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_examples(
        cls,
        sources: str | list[str],
        into: str | list[str] | None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstallExamples":
        """Build a detached install-examples rule.

        ``into`` and ``when`` are only recorded when not None.
        """
        r = MutableYAMLInstallRuleInstallExamples(
            None,
            None,
            store=CommentedMap(
                {
                    MK_INSTALLATIONS_INSTALL_EXAMPLES: CommentedMap(
                        {
                            cls._sources_key(sources): sources,
                        }
                    )
                }
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_man(
        cls,
        sources: str | list[str],
        into: str | list[str] | None,
        language: str | None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleMan":
        """Build a detached install-man rule.

        ``language``, ``into`` and ``when`` are only recorded when not None.
        """
        r = MutableYAMLInstallRuleMan(
            None,
            None,
            store=CommentedMap(
                {
                    MK_INSTALLATIONS_INSTALL_MAN: CommentedMap(
                        {
                            cls._sources_key(sources): sources,
                        }
                    )
                }
            ),
        )
        r.language = language
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def discard(
        cls,
        sources: str | list[str],
    ) -> "MutableYAMLInstallRuleDiscard":
        """Build a detached discard rule whose value is ``sources`` itself."""
        return MutableYAMLInstallRuleDiscard(
            None,
            None,
            store=CommentedMap({MK_INSTALLATIONS_DISCARD: sources}),
        )

821 

822 

class MutableYAMLInstallRuleInstallExamples(AbstractMutableYAMLInstallRule):
    """Install-examples rule; adds nothing beyond the shared install-rule base."""

825 

826 

class MutableYAMLInstallRuleMan(AbstractMutableYAMLInstallRule):
    """Install-man rule with an optional language setting."""

    @property
    def language(self) -> str | None:
        """The recorded language, or None when it is unset."""
        # `.get` because the setter removes the key when given None.
        return self._container.get(MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE)

    @language.setter
    def language(self, new_value: str | None) -> None:
        if new_value is not None:
            self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE] = new_value
            return
        with suppress(KeyError):
            del self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE]

839 

840 

class MutableYAMLInstallRuleDiscard(AbstractMutableYAMLInstallRule):
    """Discard rule; adds nothing beyond the shared install-rule base."""

843 

844 

class MutableYAMLInstallRuleInstall(AbstractMutableYAMLInstallRule):
    """Install rule with sources and either a destination dir or an "as" name."""

    @property
    def sources(self) -> list[str]:
        """The rule's sources as a list.

        The plural sources key is consulted first; when it is absent the
        singular source key is used instead.

        :raises KeyError: If neither key is present.
        """
        container = self._container
        v = container.get(MK_INSTALLATIONS_INSTALL_SOURCES)
        if v is None:
            v = container[MK_INSTALLATIONS_INSTALL_SOURCE]
        if isinstance(v, str):
            return [v]
        return v

    @sources.setter
    def sources(self, new_value: str | list[str]) -> None:
        # NOTE(review): a str is stored under the plural key here, and an
        # existing singular key is left in place - confirm that is intended.
        if isinstance(new_value, str):
            self._container[MK_INSTALLATIONS_INSTALL_SOURCES] = new_value
            return
        new_list = CommentedSeq(new_value)
        self._container[MK_INSTALLATIONS_INSTALL_SOURCES] = new_list

    @property
    def dest_dir(self) -> str | None:
        """The destination directory, or None when it is unset."""
        return self._container.get(MK_INSTALLATIONS_INSTALL_DEST_DIR)

    @dest_dir.setter
    def dest_dir(self, new_value: str | None) -> None:
        """Set or (with None) remove the destination directory.

        :raises ValueError: If a non-None value is given while "as" is set.
        """
        if new_value is not None and self.dest_as is not None:
            raise ValueError(
                f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and'
                f' "{MK_INSTALLATIONS_INSTALL_AS}"'
            )
        if new_value is not None:
            self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR] = new_value
        else:
            with suppress(KeyError):
                del self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR]

    @property
    def dest_as(self) -> str | None:
        """The "as" name, or None when it is unset."""
        return self._container.get(MK_INSTALLATIONS_INSTALL_AS)

    @dest_as.setter
    def dest_as(self, new_value: str | None) -> None:
        """Set or (with None) remove the "as" name.

        :raises ValueError: If a destination dir is set, or if the plural
          sources key holds a list that is not exactly one item long.
        """
        if new_value is None:
            with suppress(KeyError):
                del self._container[MK_INSTALLATIONS_INSTALL_AS]
            return
        if self.dest_dir is not None:
            raise ValueError(
                f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and'
                f' "{MK_INSTALLATIONS_INSTALL_AS}"'
            )

        # `.get`: rules built with the singular source key have no plural
        # key, and then there is nothing to collapse.
        sources = self._container.get(MK_INSTALLATIONS_INSTALL_SOURCES)
        if isinstance(sources, list):
            if len(sources) != 1:
                raise ValueError(
                    f'Cannot have "{MK_INSTALLATIONS_INSTALL_AS}" when'
                    f' "{MK_INSTALLATIONS_INSTALL_SOURCES}" is not exactly one item'
                )
            self.sources = sources[0]
        self._container[MK_INSTALLATIONS_INSTALL_AS] = new_value

903 

904 

class MutableYAMLInstallationsDefinition(AbstractYAMLListSubStore[Any]):
    """Mutable view of the list of install rules."""

    def append(self, install_rule: AbstractMutableYAMLInstallRule) -> None:
        """Attach a detached ``install_rule`` to the end of this list.

        The list itself is attached to its parent first when necessary.

        :raises RuntimeError: If the rule is already attached or bound to
          another container.
        """
        rules = self._store
        bound_elsewhere = (
            install_rule._parent_store is not None
            and install_rule._parent_store is not rules
        )
        if bound_elsewhere or not install_rule._is_detached:
            raise RuntimeError(
                "Item is already attached or associated with a different container"
            )
        self.create_definition_if_missing()
        install_rule._parent_store = rules
        install_rule.create_definition()

    def extend(self, install_rules: Iterable[AbstractMutableYAMLInstallRule]) -> None:
        """Attach each rule in ``install_rules`` in order, as `append` would.

        :raises RuntimeError: On the first rule that is already attached or
          bound to another container; earlier rules stay attached.
        """
        for rule in install_rules:
            self.append(rule)

932 

933 

class MutableYAMLManifestVariables(AbstractYAMLDictSubStore):
    """Mutable view of the manifest variables mapping."""

    @property
    def variables(self) -> dict[str, Any]:
        """The underlying mapping of variables (not a copy)."""
        return self._store

    def __setitem__(self, key: str, value: Any) -> None:
        """Store ``value`` under ``key``, then attach the mapping to its parent if needed."""
        backing = self._store
        backing[key] = value
        self.create_definition_if_missing()

942 

943 

class MutableYAMLManifestDefinitions(AbstractYAMLDictSubStore):
    """Mutable view of the manifest definitions mapping."""

    def manifest_variables(
        self, *, create_if_absent: bool = True
    ) -> MutableYAMLManifestVariables:
        """Return the variables sub-store.

        :param create_if_absent: When true, the sub-store is attached to
          this mapping if it is not present yet.
        """
        variables = MutableYAMLManifestVariables(self._store, MK_MANIFEST_VARIABLES)
        if not create_if_absent:
            return variables
        variables.create_definition_if_missing()
        return variables

952 

953 

class MutableYAMLRemoveDuringCleanDefinitions(AbstractYAMLListSubStore[str]):
    """Mutable view of the remove-during-clean list of rules."""

    def append(self, rule: str) -> None:
        """Attach the list to its parent if needed, then append ``rule``."""
        self.create_definition_if_missing()
        self._store.append(rule)

    def __len__(self) -> int:
        """Return the number of rules in the list."""
        return len(self._store)

    def extend(self, rules: Iterable[str]) -> None:
        """Append all ``rules``; an empty iterable leaves the definition untouched."""
        remaining = iter(rules)
        nothing = object()
        head = next(remaining, nothing)
        if head is nothing:
            # No rules at all: do not attach the list to the parent.
            return
        self.create_definition_if_missing()
        self._store.append(head)
        self._store.extend(remaining)

971 

972 

class MutableYAMLManifest:
    """Mutable wrapper around the parsed YAML document of a manifest.

    The accessors hand out sub-store views bound to the same underlying
    document, so changes made through them end up in `write_to` output.
    """

    def __init__(self, store: Any) -> None:
        """Wrap `store`, the top-level mapping of the manifest document."""
        self._store = store

    @classmethod
    def empty_manifest(cls) -> "MutableYAMLManifest":
        """Create a manifest containing only the default manifest version."""
        return cls(CommentedMap({MK_MANIFEST_VERSION: DEFAULT_MANIFEST_VERSION}))

    @property
    def manifest_version(self) -> str:
        """The manifest version recorded in the document."""
        return self._store[MK_MANIFEST_VERSION]

    @manifest_version.setter
    def manifest_version(self, version: str) -> None:
        """Set the manifest version.

        :raises ValueError: If `version` is not in `SUPPORTED_MANIFEST_VERSIONS`.
        """
        if version not in SUPPORTED_MANIFEST_VERSIONS:
            raise ValueError("Unsupported version")
        self._store[MK_MANIFEST_VERSION] = version

    def remove_during_clean(
        self,
        *,
        create_if_absent: bool = True,
    ) -> MutableYAMLRemoveDuringCleanDefinitions:
        """Return the view of the "remove during clean" rules.

        :param create_if_absent: When true, create the definition if missing.
        """
        d = MutableYAMLRemoveDuringCleanDefinitions(
            self._store, MK_MANIFEST_REMOVE_DURING_CLEAN
        )
        if create_if_absent:
            d.create_definition_if_missing()
        return d

    def installations(
        self,
        *,
        create_if_absent: bool = True,
    ) -> MutableYAMLInstallationsDefinition:
        """Return the view of the installations section.

        :param create_if_absent: When true, create the definition if missing.
        """
        d = MutableYAMLInstallationsDefinition(self._store, MK_INSTALLATIONS)
        if create_if_absent:
            d.create_definition_if_missing()
        return d

    def manifest_definitions(
        self,
        *,
        create_if_absent: bool = True,
    ) -> MutableYAMLManifestDefinitions:
        """Return the view of the definitions section.

        :param create_if_absent: When true, create the definition if missing.
        """
        d = MutableYAMLManifestDefinitions(self._store, MK_MANIFEST_DEFINITIONS)
        if create_if_absent:
            d.create_definition_if_missing()
        return d

    def package(
        self, name: str, *, create_if_absent: bool = True
    ) -> MutableYAMLPackageDefinition:
        """Return the view of the definition for the package `name`.

        :param name: Name of the package to look up.
        :param create_if_absent: When true, a missing package definition (and
          a missing packages section) is created. When false, nothing in the
          document is modified.
        :raises KeyError: If the package has no definition and
          `create_if_absent` is false.
        """
        packages_store = (
            self._store[MK_PACKAGES] if MK_PACKAGES in self._store else None
        )
        if packages_store is not None and packages_store.get(name) is not None:
            return MutableYAMLPackageDefinition(packages_store, name)

        if not create_if_absent:
            # A failed lookup must not leave an empty packages section behind.
            raise KeyError(name)

        if packages_store is None:
            packages_store = CommentedMap()
            self._store[MK_PACKAGES] = packages_store
        d = MutableYAMLPackageDefinition(packages_store, name)
        d.create_definition()
        return d

    def write_to(self, fd) -> None:
        """Dump the document to `fd` using `MANIFEST_YAML`."""
        MANIFEST_YAML.dump(self._store, fd)

1042 

1043 

def _describe_missing_path(entry: VirtualPath) -> str:
    """Return a one-line description of `entry` based on its path type.

    For symlinks, the link target is read from the file system via
    `os.readlink` on `entry.fs_path`.
    """
    if entry.is_dir:
        description = (
            f"{entry.fs_path}/ (empty directory; possible integration point)"
        )
    elif entry.is_symlink:
        link_target = os.readlink(entry.fs_path)
        description = f"{entry.fs_path} (symlink; links to {link_target})"
    elif entry.is_file:
        description = f"{entry.fs_path} (file)"
    else:
        description = f"{entry.fs_path} (other!? Probably not supported by debputy and may need a `remove`)"
    return description

1053 

1054 

def _detect_missing_installations(
    path_matcher: SourcePathMatcher,
    search_dir: VirtualPath,
) -> None:
    """Fail if `path_matcher` reports missing paths for `search_dir`.

    Nothing happens when `search_dir` is not a directory or when
    `path_matcher.detect_missing` yields no paths.

    :raises PathNotCoveredByInstallRulesError: If at least one path is
      reported as missing. The exception carries the guidance message, the
      missing paths and the search dir.
    """
    if not search_dir.is_dir:
        return
    missing = list(path_matcher.detect_missing(search_dir))
    if not missing:
        return

    excl = textwrap.dedent(
        """\
        - discard: "*"
        """
    )

    raise PathNotCoveredByInstallRulesError(
        "Please review the list and add either install rules or exclusions to `installations` in"
        " debian/debputy.manifest. If you do not need any of these paths, add the following to the"
        f" end of your `installations`:\n\n{excl}\n",
        missing,
        search_dir,
    )

1078 

1079 

def _list_automatic_discard_rules(path_matcher: SourcePathMatcher) -> None:
    """Report which automatic discard rules matched, and how to overrule them.

    Each rule/path pair from `path_matcher.used_auto_discard_rules` is listed
    in sorted order, followed by an example manifest snippet using the first
    listed path. Nothing is printed when no rule has any matched path.
    """
    used_discard_rules = path_matcher.used_auto_discard_rules
    # Discard rules can match and then be overridden. In that case, they appear
    # but have 0 matches.
    if not any(len(paths) for paths in used_discard_rules.values()):
        return
    _info("The following automatic discard rules were triggered:")
    example_path: str | None = None
    for rule in sorted(used_discard_rules):
        for fs_path in sorted(used_discard_rules[rule]):
            if example_path is None:
                example_path = fs_path
            _info(f" * {rule} -> {fs_path}")
    # Guaranteed by the guard above: at least one rule had a matched path.
    assert example_path is not None
    _info("")
    _info(
        "Note that some of these may have been overruled. The overrule detection logic is not"
    )
    _info("100% reliable.")
    _info("")
    _info(
        "You can overrule an automatic discard rule by explicitly listing the path. As an example:"
    )
    # The example is indented so that `source` is nested under `install`,
    # which keeps the snippet valid when copied into a manifest.
    _info("    installations:")
    _info("     - install:")
    _info(f"         source: {example_path}")

1106 

1107 

def _install_everything_from_source_dir_if_present(
    dctrl_bin: BinaryPackage,
    substitution: Substitution,
    path_matcher: SourcePathMatcher,
    install_rule_context: InstallRuleContext,
    source_condition_context: ConditionContext,
    source_dir: VirtualPath,
    *,
    into_dir: VirtualPath | None = None,
) -> None:
    """Install everything matching "*" in `source_dir` into `dctrl_bin`.

    A built-in install rule is created and run against a copy of
    `install_rule_context` whose only search dir is `source_dir`. When
    `into_dir` is given, the copy also uses it as the `fs_root` of the
    package's context. "No match" and "already installed or discarded"
    errors from the install are ignored.
    """
    only_this_package = frozenset([dctrl_bin])
    builtin_attribute_path = AttributePath.builtin_path()[
        f"installing {source_dir.fs_path}"
    ]
    match_everything = FileSystemMatchRule.from_path_match(
        "*", builtin_attribute_path, substitution
    )
    install_rule = run_in_context_of_plugin(
        "debputy",
        InstallRule.install_dest,
        [match_everything],
        None,
        only_this_package,
        f"Built-in; install everything from {source_dir.fs_path} into {dctrl_bin.name}",
        None,
    )

    pkg_search_dir: tuple[SearchDir] = (SearchDir(source_dir, only_this_package),)
    overrides = {"search_dirs": pkg_search_dir}
    if into_dir is not None:
        # Work on a copy so the caller's package contexts stay untouched.
        per_package_contexts = dict(install_rule_context.binary_package_contexts)
        per_package_contexts[dctrl_bin.name] = per_package_contexts[
            dctrl_bin.name
        ].replace(fs_root=into_dir)
        overrides["binary_package_contexts"] = per_package_contexts

    scoped_install_rule_context = install_rule_context.replace(**overrides)
    # Empty directory or everything excluded by default; ignore the error
    with suppress(
        NoMatchForInstallPatternError,
        PathAlreadyInstalledOrDiscardedError,
    ):
        install_rule.perform_install(
            path_matcher,
            scoped_install_rule_context,
            source_condition_context,
        )

1157 

1158 

def _add_build_install_dirs_to_per_package_search_dirs(
    build_system_install_dirs: Sequence[tuple[str, frozenset[BinaryPackage]]],
    per_package_search_dirs: dict[BinaryPackage, list[VirtualPath]],
    as_path: Callable[[str], VirtualPath],
) -> None:
    """Append the build system install dirs to each package's search dirs.

    `per_package_search_dirs` is updated in place. A (package, directory)
    pair that occurs more than once in `build_system_install_dirs` is only
    added the first time it is seen.
    """
    already_added: set[tuple[BinaryPackage, str]] = set()
    for dest_dir, for_packages in build_system_install_dirs:
        dest_path = as_path(dest_dir)
        for pkg in for_packages:
            pair = (pkg, dest_dir)
            if pair in already_added:
                continue
            already_added.add(pair)
            per_package_search_dirs.setdefault(pkg, []).append(dest_path)

1176 

1177 

1178class HighLevelManifest: 

1179 def __init__( 

1180 self, 

1181 manifest_path: str, 

1182 mutable_manifest: MutableYAMLManifest | None, 

1183 remove_during_clean_rules: list[FileSystemMatchRule], 

1184 install_rules: list[InstallRule] | None, 

1185 source_package: SourcePackage, 

1186 binary_packages: Mapping[str, BinaryPackage], 

1187 substitution: Substitution, 

1188 package_transformations: Mapping[str, PackageTransformationDefinition], 

1189 dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable, 

1190 dpkg_arch_query_table: DpkgArchTable, 

1191 build_env: DebBuildOptionsAndProfiles, 

1192 build_environments: BuildEnvironments, 

1193 build_rules: list[BuildRule] | None, 

1194 value_table: Mapping[ 

1195 tuple[SourcePackage | BinaryPackage, type[Any]], 

1196 Any, 

1197 ], 

1198 plugin_provided_feature_set: PluginProvidedFeatureSet, 

1199 debian_dir: VirtualPath, 

1200 ) -> None: 

1201 self.manifest_path = manifest_path 

1202 self.mutable_manifest = mutable_manifest 

1203 self._remove_during_clean_rules: list[FileSystemMatchRule] = ( 

1204 remove_during_clean_rules 

1205 ) 

1206 self._install_rules = install_rules 

1207 self.source_package = source_package 

1208 self._binary_packages = binary_packages 

1209 self.substitution = substitution 

1210 self.package_transformations = package_transformations 

1211 self._dpkg_architecture_variables = dpkg_architecture_variables 

1212 self.dpkg_arch_query_table = dpkg_arch_query_table 

1213 self._build_env = build_env 

1214 self._used_for: set[str] = set() 

1215 self.build_environments = build_environments 

1216 self.build_rules = build_rules 

1217 self._value_table = value_table 

1218 self._plugin_provided_feature_set = plugin_provided_feature_set 

1219 self._debian_dir = debian_dir 

1220 self._source_condition_context = ConditionContext( 

1221 binary_package=None, 

1222 substitution=self.substitution, 

1223 deb_options_and_profiles=self._build_env, 

1224 dpkg_architecture_variables=self._dpkg_architecture_variables, 

1225 dpkg_arch_query_table=self.dpkg_arch_query_table, 

1226 ) 

1227 

1228 def source_version(self, include_binnmu_version: bool = True) -> str: 

1229 # TODO: There should an easier way to determine the source version; really. 

1230 version_var = "{{DEB_VERSION}}" 

1231 if not include_binnmu_version: 

1232 version_var = "{{_DEBPUTY_INTERNAL_NON_BINNMU_SOURCE}}" 

1233 try: 

1234 return self.substitution.substitute( 

1235 version_var, "internal (resolve version)" 

1236 ) 

1237 except DebputySubstitutionError as e: 

1238 raise AssertionError(f"Could not resolve {version_var}") from e 

1239 

1240 @property 

1241 def source_condition_context(self) -> ConditionContext: 

1242 return self._source_condition_context 

1243 

1244 @property 

1245 def debian_dir(self) -> VirtualPath: 

1246 return self._debian_dir 

1247 

1248 @property 

1249 def dpkg_architecture_variables(self) -> DpkgArchitectureBuildProcessValuesTable: 

1250 return self._dpkg_architecture_variables 

1251 

1252 @property 

1253 def deb_options_and_profiles(self) -> DebBuildOptionsAndProfiles: 

1254 return self._build_env 

1255 

1256 @property 

1257 def plugin_provided_feature_set(self) -> PluginProvidedFeatureSet: 

1258 return self._plugin_provided_feature_set 

1259 

1260 @property 

1261 def remove_during_clean_rules(self) -> list[FileSystemMatchRule]: 

1262 return self._remove_during_clean_rules 

1263 

1264 @property 

1265 def active_packages(self) -> Iterable[BinaryPackage]: 

1266 yield from (p for p in self._binary_packages.values() if p.should_be_acted_on) 

1267 

1268 @property 

1269 def all_packages(self) -> Iterable[BinaryPackage]: 

1270 yield from self._binary_packages.values() 

1271 

1272 def manifest_configuration[T]( 

1273 self, 

1274 context_package: SourcePackage | BinaryPackage, 

1275 value_type: type[T], 

1276 ) -> T | None: 

1277 res = self._value_table.get((context_package, value_type)) 

1278 return typing.cast("T | None", res) 

1279 

1280 def package_state_for(self, package: str) -> PackageTransformationDefinition: 

1281 return self.package_transformations[package] 

1282 

1283 def _detect_doc_main_package_for(self, package: BinaryPackage) -> BinaryPackage: 

1284 name = package.name 

1285 for doc_main_field in ("Doc-Main-Package", "X-Doc-Main-Package"): 

1286 doc_main_package_name = package.fields.get(doc_main_field) 

1287 if doc_main_package_name: 1287 ↛ 1288line 1287 didn't jump to line 1288 because the condition on line 1287 was never true

1288 main_package = self._binary_packages.get(doc_main_package_name) 

1289 if main_package is None: 

1290 _error( 

1291 f"Invalid Doc-Main-Package for {name}: The package {doc_main_package_name!r} is not listed in d/control" 

1292 ) 

1293 return main_package 

1294 # If it is not a -doc package, then docs should be installed 

1295 # under its own package name. 

1296 if not name.endswith("-doc"): 1296 ↛ 1298line 1296 didn't jump to line 1298 because the condition on line 1296 was always true

1297 return package 

1298 name = name[:-4] 

1299 main_package = self._binary_packages.get(name) 

1300 if main_package: 

1301 return main_package 

1302 if name.startswith("lib"): 

1303 dev_pkg = self._binary_packages.get(f"{name}-dev") 

1304 if dev_pkg: 

1305 return dev_pkg 

1306 

1307 # If we found no better match; default to the doc package itself. 

1308 return package 

1309 

1310 def perform_installations( 

1311 self, 

1312 integration_mode: DebputyIntegrationMode, 

1313 build_system_install_dirs: Sequence[tuple[str, frozenset[BinaryPackage]]], 

1314 *, 

1315 install_request_context: InstallSearchDirContext | None = None, 

1316 ) -> PackageDataTable: 

1317 package_data_dict = {} 

1318 package_data_table = PackageDataTable(package_data_dict) 

1319 enable_manifest_installation_feature = ( 

1320 integration_mode != INTEGRATION_MODE_DH_DEBPUTY_RRR 

1321 ) 

1322 

1323 if build_system_install_dirs: 1323 ↛ 1324line 1323 didn't jump to line 1324 because the condition on line 1323 was never true

1324 if integration_mode != INTEGRATION_MODE_FULL: 

1325 raise ValueError( 

1326 "The build_system_install_dirs parameter can only be used in full integration mode" 

1327 ) 

1328 if install_request_context: 

1329 raise ValueError( 

1330 "The build_system_install_dirs parameter cannot be used with install_request_context" 

1331 " (not implemented)" 

1332 ) 

1333 

1334 if install_request_context is None: 1334 ↛ 1336line 1334 didn't jump to line 1336 because the condition on line 1334 was never true

1335 

1336 @functools.lru_cache(None) 

1337 def _as_path(fs_path: str) -> VirtualPath: 

1338 return OSFSROOverlay.create_root_dir(".", fs_path) 

1339 

1340 dtmp_dir = _as_path("debian/tmp") 

1341 source_root_dir = _as_path(".") 

1342 into = frozenset(self._binary_packages.values()) 

1343 per_package_search_dirs = { 

1344 t.binary_package: [_as_path(f.match_rule.path) for f in t.search_dirs] 

1345 for t in self.package_transformations.values() 

1346 if t.search_dirs is not None 

1347 } 

1348 

1349 if integration_mode == INTEGRATION_MODE_FULL: 

1350 # In this mode, we have no default search dir. Everything ends up being 

1351 # per-package instead (since that is easier logic-wise). 

1352 # 

1353 # Even dtmp_dir is omitted here (since it is not universally applicable). 

1354 # Note we still initialize dtmp_dir, since it affects a later guard. 

1355 default_search_dirs = [] 

1356 _add_build_install_dirs_to_per_package_search_dirs( 

1357 build_system_install_dirs, 

1358 per_package_search_dirs, 

1359 _as_path, 

1360 ) 

1361 

1362 # We can end here with per_package_search_dirs having no search dirs for any package 

1363 # (single binary, where everything is installed into d/<pkg> is the most common case). 

1364 # 

1365 # This is not a problem in itself as the installation rules can still apply to the 

1366 # source root and there should be no reason to install something from d/<pkg> into 

1367 # d/<another-pkg> 

1368 else: 

1369 default_search_dirs = [dtmp_dir] 

1370 

1371 search_dirs = _determine_search_dir_order( 

1372 per_package_search_dirs, 

1373 into, 

1374 default_search_dirs, 

1375 source_root_dir, 

1376 ) 

1377 check_for_uninstalled_dirs = tuple( 

1378 s.search_dir 

1379 for s in search_dirs 

1380 if s.search_dir.fs_path != source_root_dir.fs_path 

1381 ) 

1382 if enable_manifest_installation_feature: 

1383 _present_installation_dirs( 

1384 search_dirs, check_for_uninstalled_dirs, into 

1385 ) 

1386 else: 

1387 dtmp_dir = None 

1388 search_dirs = install_request_context.search_dirs 

1389 into = frozenset(self._binary_packages.values()) 

1390 seen: set[BinaryPackage] = set() 

1391 for search_dir in search_dirs: 

1392 seen.update(search_dir.applies_to) 

1393 

1394 missing = into - seen 

1395 if missing: 1395 ↛ 1396line 1395 didn't jump to line 1396 because the condition on line 1395 was never true

1396 names = ", ".join(p.name for p in missing) 

1397 raise ValueError( 

1398 f"The following package(s) had no search dirs: {names}." 

1399 " (Generally, the source root would be applicable to all packages)" 

1400 ) 

1401 extra_names = seen - into 

1402 if extra_names: 1402 ↛ 1403line 1402 didn't jump to line 1403 because the condition on line 1402 was never true

1403 names = ", ".join(p.name for p in extra_names) 

1404 raise ValueError( 

1405 f"The install_request_context referenced the following unknown package(s): {names}" 

1406 ) 

1407 

1408 check_for_uninstalled_dirs = ( 

1409 install_request_context.check_for_uninstalled_dirs 

1410 ) 

1411 

1412 install_rule_context = InstallRuleContext(search_dirs) 

1413 

1414 if ( 1414 ↛ 1421line 1414 didn't jump to line 1421 because the condition on line 1414 was never true

1415 enable_manifest_installation_feature 

1416 and self._install_rules is None 

1417 # TODO: Should we also do this for full mode when build systems provided search dirs? 

1418 and dtmp_dir is not None 

1419 and os.path.isdir(dtmp_dir.fs_path) 

1420 ): 

1421 msg = ( 

1422 "The build system appears to have provided the output of upstream build system's" 

1423 " install in debian/tmp. However, these are no provisions for debputy to install" 

1424 " any of that into any of the debian packages listed in debian/control." 

1425 " To avoid accidentally creating empty packages, debputy will insist that you " 

1426 " explicitly define an empty installation definition if you did not want to " 

1427 " install any of those files even though they have been provided." 

1428 ' Example: "installations: []"' 

1429 ) 

1430 _error(msg) 

1431 elif ( 1431 ↛ 1434line 1431 didn't jump to line 1434 because the condition on line 1431 was never true

1432 not enable_manifest_installation_feature and self._install_rules is not None 

1433 ): 

1434 _error( 

1435 f"The `installations` feature cannot be used in {self.manifest_path} with this integration mode." 

1436 f" Please remove or comment out the `installations` keyword." 

1437 ) 

1438 

1439 for dctrl_bin in self.all_packages: 

1440 package = dctrl_bin.name 

1441 doc_main_package = self._detect_doc_main_package_for(dctrl_bin) 

1442 

1443 install_rule_context[package] = BinaryPackageInstallRuleContext( 

1444 dctrl_bin, 

1445 FSRootDir(), 

1446 doc_main_package, 

1447 ) 

1448 

1449 if enable_manifest_installation_feature: 1449 ↛ 1454line 1449 didn't jump to line 1454 because the condition on line 1449 was always true

1450 discard_rules = list( 

1451 self.plugin_provided_feature_set.auto_discard_rules.values() 

1452 ) 

1453 else: 

1454 discard_rules = [ 

1455 self.plugin_provided_feature_set.auto_discard_rules["debian-dir"] 

1456 ] 

1457 path_matcher = SourcePathMatcher(discard_rules) 

1458 

1459 source_condition_context = self._source_condition_context 

1460 

1461 for dctrl_bin in self.active_packages: 

1462 package = dctrl_bin.name 

1463 if install_request_context: 1463 ↛ 1468line 1463 didn't jump to line 1468 because the condition on line 1463 was always true

1464 build_system_staging_dir = install_request_context.debian_pkg_dirs.get( 

1465 package 

1466 ) 

1467 else: 

1468 build_system_staging_dir_fs_path = os.path.join("debian", package) 

1469 if os.path.isdir(build_system_staging_dir_fs_path): 

1470 build_system_staging_dir = OSFSROOverlay.create_root_dir( 

1471 ".", 

1472 build_system_staging_dir_fs_path, 

1473 ) 

1474 else: 

1475 build_system_staging_dir = None 

1476 

1477 if build_system_staging_dir is not None: 

1478 _install_everything_from_source_dir_if_present( 

1479 dctrl_bin, 

1480 self.substitution, 

1481 path_matcher, 

1482 install_rule_context, 

1483 source_condition_context, 

1484 build_system_staging_dir, 

1485 ) 

1486 

1487 if self._install_rules: 

1488 # FIXME: Check that every install rule remains used after transformations have run. 

1489 # What we want to check is transformations do not exclude everything from an install 

1490 # rule. The hard part here is that renaming (etc.) is fine, so we cannot 1:1 string 

1491 # match. 

1492 for install_rule in self._install_rules: 

1493 install_rule.perform_install( 

1494 path_matcher, 

1495 install_rule_context, 

1496 source_condition_context, 

1497 ) 

1498 

1499 if enable_manifest_installation_feature: 1499 ↛ 1503line 1499 didn't jump to line 1503 because the condition on line 1499 was always true

1500 for search_dir in check_for_uninstalled_dirs: 

1501 _detect_missing_installations(path_matcher, search_dir) 

1502 

1503 for dctrl_bin in self.all_packages: 

1504 package = dctrl_bin.name 

1505 binary_install_rule_context = install_rule_context[package] 

1506 build_system_pkg_staging_dir = os.path.join("debian", package) 

1507 fs_root = binary_install_rule_context.fs_root 

1508 

1509 context = self.package_transformations[package] 

1510 if dctrl_bin.should_be_acted_on and enable_manifest_installation_feature: 

1511 for special_install_rule in context.install_rules: 1511 ↛ 1512line 1511 didn't jump to line 1512 because the loop on line 1511 never started

1512 special_install_rule.perform_install( 

1513 path_matcher, 

1514 install_rule_context, 

1515 source_condition_context, 

1516 ) 

1517 

1518 if dctrl_bin.should_be_acted_on: 

1519 self.apply_fs_transformations(package, fs_root) 

1520 substvars_file = f"debian/{package}.substvars" 

1521 substvars = FlushableSubstvars.load_from_path( 

1522 substvars_file, missing_ok=True 

1523 ) 

1524 # We do not want to touch the substvars file (non-clean rebuild contamination) 

1525 substvars.substvars_path = None 

1526 else: 

1527 substvars = FlushableSubstvars() 

1528 

1529 udeb_package = self._binary_packages.get(f"{package}-udeb") 

1530 if udeb_package and not udeb_package.is_udeb: 1530 ↛ 1531line 1530 didn't jump to line 1531 because the condition on line 1530 was never true

1531 udeb_package = None 

1532 

1533 package_metadata_context = PackageProcessingContextProvider( 

1534 self, 

1535 dctrl_bin, 

1536 udeb_package, 

1537 package_data_table, 

1538 ) 

1539 

1540 ctrl_creator = BinaryCtrlAccessorProviderCreator( 

1541 package_metadata_context, 

1542 substvars, 

1543 context.maintscript_snippets, 

1544 context.substitution, 

1545 ) 

1546 

1547 if not enable_manifest_installation_feature: 1547 ↛ 1548line 1547 didn't jump to line 1548 because the condition on line 1547 was never true

1548 assert_no_dbgsym_migration(dctrl_bin) 

1549 dh_dbgsym_root_fs = dhe_dbgsym_root_dir(dctrl_bin) 

1550 dh_dbgsym_root_path = OSFSROOverlay.create_root_dir( 

1551 "", 

1552 dh_dbgsym_root_fs, 

1553 ) 

1554 dbgsym_root_fs = FSRootDir() 

1555 _install_everything_from_source_dir_if_present( 

1556 dctrl_bin, 

1557 self.substitution, 

1558 path_matcher, 

1559 install_rule_context, 

1560 source_condition_context, 

1561 dh_dbgsym_root_path, 

1562 into_dir=dbgsym_root_fs, 

1563 ) 

1564 dbgsym_build_ids = read_dbgsym_file(dctrl_bin) 

1565 dbgsym_info = DbgsymInfo( 

1566 dctrl_bin, 

1567 dbgsym_root_fs, 

1568 os.path.join(dh_dbgsym_root_fs, "DEBIAN"), 

1569 dbgsym_build_ids, 

1570 # TODO: Provide manifest feature to support this. 

1571 False, 

1572 ) 

1573 else: 

1574 dbgsym_info = DbgsymInfo( 

1575 dctrl_bin, 

1576 FSRootDir(), 

1577 None, 

1578 [], 

1579 False, 

1580 ) 

1581 

1582 package_data_dict[package] = BinaryPackageData( 

1583 self.source_package, 

1584 dctrl_bin, 

1585 build_system_pkg_staging_dir, 

1586 fs_root, 

1587 substvars, 

1588 package_metadata_context, 

1589 ctrl_creator, 

1590 dbgsym_info, 

1591 ) 

1592 

1593 if enable_manifest_installation_feature: 1593 ↛ 1596line 1593 didn't jump to line 1596 because the condition on line 1593 was always true

1594 _list_automatic_discard_rules(path_matcher) 

1595 

1596 return package_data_table 

1597 

1598 def condition_context( 

1599 self, binary_package: BinaryPackage | str | None 

1600 ) -> ConditionContext: 

1601 if binary_package is None: 1601 ↛ 1602line 1601 didn't jump to line 1602 because the condition on line 1601 was never true

1602 return self._source_condition_context 

1603 if not isinstance(binary_package, str): 

1604 binary_package = binary_package.name 

1605 

1606 package_transformation = self.package_transformations[binary_package] 

1607 return self._source_condition_context.replace( 

1608 binary_package=package_transformation.binary_package, 

1609 substitution=package_transformation.substitution, 

1610 ) 

1611 

1612 def apply_fs_transformations( 

1613 self, 

1614 package: str, 

1615 fs_root: FSPath, 

1616 ) -> None: 

1617 if package in self._used_for: 1617 ↛ 1618line 1617 didn't jump to line 1618 because the condition on line 1617 was never true

1618 raise ValueError( 

1619 f"data.tar contents for {package} has already been finalized!?" 

1620 ) 

1621 if package not in self.package_transformations: 1621 ↛ 1622line 1621 didn't jump to line 1622 because the condition on line 1621 was never true

1622 raise ValueError( 

1623 f'The package "{package}" was not relevant for the manifest!?' 

1624 ) 

1625 package_transformation = self.package_transformations[package] 

1626 condition_context = ConditionContext( 

1627 binary_package=package_transformation.binary_package, 

1628 substitution=package_transformation.substitution, 

1629 deb_options_and_profiles=self._build_env, 

1630 dpkg_architecture_variables=self._dpkg_architecture_variables, 

1631 dpkg_arch_query_table=self.dpkg_arch_query_table, 

1632 ) 

1633 norm_rules = list( 

1634 builtin_mode_normalization_rules( 

1635 self._dpkg_architecture_variables, 

1636 package_transformation.binary_package, 

1637 package_transformation.substitution, 

1638 ) 

1639 ) 

1640 norm_mode_transformation_rule = ModeNormalizationTransformationRule(norm_rules) 

1641 norm_mode_transformation_rule.transform_file_system(fs_root, condition_context) 

1642 for transformation in package_transformation.transformations: 

1643 transformation.run_transform_file_system(fs_root, condition_context) 

1644 interpreter_normalization = NormalizeShebangLineTransformation() 

1645 interpreter_normalization.transform_file_system(fs_root, condition_context) 

1646 

1647 def finalize_data_tar_contents( 

1648 self, 

1649 package: str, 

1650 fs_root: FSPath, 

1651 clamp_mtime_to: int, 

1652 ) -> IntermediateManifest: 

1653 if package in self._used_for: 1653 ↛ 1654line 1653 didn't jump to line 1654 because the condition on line 1653 was never true

1654 raise ValueError( 

1655 f"data.tar contents for {package} has already been finalized!?" 

1656 ) 

1657 if package not in self.package_transformations: 1657 ↛ 1658line 1657 didn't jump to line 1658 because the condition on line 1657 was never true

1658 raise ValueError( 

1659 f'The package "{package}" was not relevant for the manifest!?' 

1660 ) 

1661 self._used_for.add(package) 

1662 

1663 # At this point, there so be no further mutations to the file system (because the will not 

1664 # be present in the intermediate manifest) 

1665 cast("FSRootDir", fs_root).is_read_write = False 

1666 

1667 intermediate_manifest = list( 

1668 _generate_intermediate_manifest( 

1669 fs_root, 

1670 clamp_mtime_to, 

1671 ) 

1672 ) 

1673 return intermediate_manifest 

1674 

1675 def apply_to_binary_staging_directory( 

1676 self, 

1677 package: str, 

1678 fs_root: FSPath, 

1679 clamp_mtime_to: int, 

1680 ) -> IntermediateManifest: 

1681 self.apply_fs_transformations(package, fs_root) 

1682 return self.finalize_data_tar_contents(package, fs_root, clamp_mtime_to) 

1683 

1684 

@dataclass(slots=True)
class SearchDirOrderState:
    """Bookkeeping for one search dir while the search dir order is computed."""

    # The directory this state describes.
    search_dir: VirtualPath
    # Packages the directory applies to.
    applies_to: set[BinaryPackage] | frozenset[BinaryPackage] = field(
        default_factory=set,
    )
    # `fs_path` of the search dirs that must be ordered before this one.
    after: set[str] = field(default_factory=set)

1692 

1693 

def _present_installation_dirs(
    search_dirs: Sequence[SearchDir],
    checked_missing_dirs: Sequence[VirtualPath],
    all_pkgs: frozenset[BinaryPackage],
) -> None:
    """Log the search dirs and the dirs checked for "not-installed" paths.

    A search dir that applies to a strict subset of `all_pkgs` is annotated
    with the package names. Directories missing on the file system are marked
    as skipped.
    """
    absent_remark = " (skipped; absent)"
    # Width used to align the annotations after the paths.
    max_len = max((len(s.search_dir.fs_path) for s in search_dirs), default=1)

    _info("The following directories are considered search dirs (in order):")
    for search_dir in search_dirs:
        fs_path = search_dir.search_dir.fs_path
        if search_dir.applies_to < all_pkgs:
            names = ", ".join(p.name for p in search_dir.applies_to)
            applies_to = f" [only applicable to: {names}]"
        else:
            applies_to = ""
        remark = "" if os.path.isdir(fs_path) else absent_remark
        _info(f" * {fs_path:{max_len}}{applies_to}{remark}")

    if not checked_missing_dirs:
        return

    _info('The following directories are considered for "not-installed" paths;')
    for d in checked_missing_dirs:
        remark = "" if os.path.isdir(d.fs_path) else absent_remark
        _info(f" * {d.fs_path:{max_len}}{remark}")

1718 

1719 

def _determine_search_dir_order(
    requested: Mapping[BinaryPackage, list[VirtualPath]],
    all_pkgs: frozenset[BinaryPackage],
    default_search_dirs: list[VirtualPath],
    source_root: VirtualPath,
) -> Sequence[SearchDir]:
    """Merge the per-package search dir lists into one global order.

    Each package contributes its list from `requested`, or
    `default_search_dirs` when it has none. Within a list, every directory
    must come after the one preceding it. The source root is always appended
    last and applies to all packages.

    :param requested: Ordered search dirs per package. Keys must be a subset
      of `all_pkgs`.
    :param all_pkgs: Every known binary package.
    :param default_search_dirs: Search dirs for packages absent from
      `requested`.
    :param source_root: The source root directory.
    :return: The search dirs in the resolved order.
    """
    search_dir_table: dict[str, SearchDirOrderState] = {}
    assert requested.keys() <= all_pkgs
    for pkg in all_pkgs:
        previous_search_dir: SearchDirOrderState | None = None
        for path in requested.get(pkg, default_search_dirs):
            search_dir_state = search_dir_table.get(path.fs_path)
            if search_dir_state is None:
                search_dir_state = SearchDirOrderState(path)
                search_dir_table[path.fs_path] = search_dir_state
            search_dir_state.applies_to.add(pkg)
            if previous_search_dir is not None:
                search_dir_state.after.add(previous_search_dir.search_dir.fs_path)
            previous_search_dir = search_dir_state

    search_dirs_in_order: list[SearchDirOrderState] = []
    released: set[str] = set()
    # Kept as a list in table order (rather than a set of paths) so that the
    # resulting order does not depend on string hash order.
    pending = list(search_dir_table.values())
    while pending:
        still_blocked: list[SearchDirOrderState] = []
        for search_dir_state in pending:
            if search_dir_state.after <= released:
                search_dirs_in_order.append(search_dir_state)
                released.add(search_dir_state.search_dir.fs_path)
            else:
                still_blocked.append(search_dir_state)

        if len(still_blocked) == len(pending):
            # A full pass released nothing, so the rest can never be released.
            names = ", ".join(sorted(s.search_dir.fs_path for s in still_blocked))
            _error(
                f"There is a circular dependency (somewhere) between the search dirs: {names}."
                " Note that the search directories across all packages have to be ordered (and the"
                " source root should generally be last)"
            )
        pending = still_blocked

    search_dirs_in_order.append(
        SearchDirOrderState(
            source_root,
            all_pkgs,
        )
    )

    return tuple(
        # Avoid duplicating all_pkgs
        SearchDir(
            s.search_dir,
            frozenset(s.applies_to) if s.applies_to != all_pkgs else all_pkgs,
        )
        for s in search_dirs_in_order
    )