Coverage for src/debputy/highlevel_manifest.py: 64%

853 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-07-15 05:36 +0000

1import dataclasses 

2import functools 

3import os 

4import textwrap 

5from contextlib import suppress 

6from dataclasses import dataclass, field 

7from typing import ( 

8 List, 

9 Dict, 

10 Iterable, 

11 Mapping, 

12 Any, 

13 Union, 

14 Optional, 

15 TypeVar, 

16 Generic, 

17 cast, 

18 Set, 

19 Tuple, 

20 Sequence, 

21 FrozenSet, 

22 Callable, 

23) 

24 

25from debian.debian_support import DpkgArchTable 

26 

27from debputy.dh.debhelper_emulation import ( 

28 dhe_dbgsym_root_dir, 

29 assert_no_dbgsym_migration, 

30 read_dbgsym_file, 

31) 

32from ._deb_options_profiles import DebBuildOptionsAndProfiles 

33from ._manifest_constants import * 

34from .architecture_support import DpkgArchitectureBuildProcessValuesTable 

35from .builtin_manifest_rules import builtin_mode_normalization_rules 

36from .exceptions import ( 

37 DebputySubstitutionError, 

38 DebputyRuntimeErrorWithPreamble, 

39) 

40from .filesystem_scan import FSPath, FSRootDir, FSROOverlay, FSControlRootDir 

41from .installations import ( 

42 InstallRule, 

43 SourcePathMatcher, 

44 PathAlreadyInstalledOrDiscardedError, 

45 NoMatchForInstallPatternError, 

46 InstallRuleContext, 

47 BinaryPackageInstallRuleContext, 

48 InstallSearchDirContext, 

49 SearchDir, 

50) 

51from .intermediate_manifest import TarMember, PathType, IntermediateManifest 

52from .maintscript_snippet import ( 

53 DpkgMaintscriptHelperCommand, 

54 MaintscriptSnippetContainer, 

55) 

56from .manifest_conditions import ConditionContext 

57from .manifest_parser.base_types import ( 

58 FileSystemMatchRule, 

59 FileSystemExactMatchRule, 

60 BuildEnvironments, 

61) 

62from .manifest_parser.util import AttributePath 

63from .packager_provided_files import PackagerProvidedFile 

64from .packages import BinaryPackage, SourcePackage 

65from .plugin.api.feature_set import PluginProvidedFeatureSet 

66from .plugin.api.impl import BinaryCtrlAccessorProviderCreator 

67from .plugin.api.impl_types import ( 

68 PackageProcessingContextProvider, 

69 PackageDataTable, 

70) 

71from .plugin.api.spec import ( 

72 FlushableSubstvars, 

73 VirtualPath, 

74 DebputyIntegrationMode, 

75 INTEGRATION_MODE_DH_DEBPUTY_RRR, 

76 INTEGRATION_MODE_FULL, 

77) 

78from debputy.plugins.debputy.binary_package_rules import ServiceRule 

79from debputy.plugins.debputy.build_system_rules import BuildRule 

80from .plugin.plugin_state import run_in_context_of_plugin 

81from .substitution import Substitution 

82from .transformation_rules import ( 

83 TransformationRule, 

84 ModeNormalizationTransformationRule, 

85 NormalizeShebangLineTransformation, 

86) 

87from .util import ( 

88 _error, 

89 _warn, 

90 debian_policy_normalize_symlink_target, 

91 generated_content_dir, 

92 _info, 

93) 

94from .yaml import MANIFEST_YAML 

95from .yaml.compat import CommentedMap, CommentedSeq 

96 

97 

class PathNotCoveredByInstallRulesError(DebputyRuntimeErrorWithPreamble):
    """Raised for paths in a search dir that were neither installed nor discarded.

    The unmatched paths and the search directory are read from positions 1
    and 2 of the exception's ``args``.
    """

    @property
    def unmatched_paths(self) -> Sequence[VirtualPath]:
        """The paths reported as unmatched (``args[1]``)."""
        return self.args[1]

    @property
    def search_dir(self) -> VirtualPath:
        """The search directory the unmatched paths were found in (``args[2]``)."""
        return self.args[2]

    def render_preamble(self) -> None:
        """Emit a warning block listing every unmatched path, one bullet each."""
        search_root = self.search_dir.fs_path
        _warn(
            f"The following paths were present in {search_root}, but not installed (nor explicitly discarded)."
        )
        _warn("")
        for unmatched in self.unmatched_paths:
            _warn(f" * {_describe_missing_path(unmatched)}")
        _warn("")

117 

118 

@dataclass(slots=True)
class DbgsymInfo:
    """Groups the dbgsym-related state that is tracked for one binary package."""

    binary_package: BinaryPackage
    dbgsym_fs_root: FSPath
    # Cached result of `dbgsym_root_dir`; None until first computed (or pre-seeded).
    _dbgsym_root_fs: Optional[str]
    dbgsym_ids: List[str]

    @property
    def dbgsym_root_dir(self) -> str:
        """Return the dbgsym root directory, computing and caching it on first use."""
        cached = self._dbgsym_root_fs
        if cached is not None:
            return cached
        cached = generated_content_dir(
            package=self.binary_package,
            subdir_key="dbgsym-fs-root",
        )
        self._dbgsym_root_fs = cached
        return cached

    @property
    def dbgsym_ctrl_dir(self) -> FSControlRootDir:
        """Return a control root dir object for ``<dbgsym_root_dir>/DEBIAN``."""
        ctrl_path = os.path.join(self.dbgsym_root_dir, "DEBIAN")
        return FSControlRootDir.create_root_dir(ctrl_path)

142 

143 

@dataclass(slots=True, frozen=True)
class BinaryPackageData:
    """Immutable bundle of the per-binary-package objects used during processing."""

    source_package: SourcePackage
    binary_package: BinaryPackage
    binary_staging_root_dir: str
    fs_root: FSPath
    substvars: FlushableSubstvars
    package_metadata_context: PackageProcessingContextProvider
    ctrl_creator: BinaryCtrlAccessorProviderCreator
    dbgsym_info: DbgsymInfo

    @property
    def control_output_dir(self) -> FSControlRootDir:
        """Return a control root dir for this package's generated ``DEBIAN`` content."""
        debian_dir = generated_content_dir(
            package=self.binary_package,
            subdir_key="DEBIAN",
        )
        return FSControlRootDir.create_root_dir(debian_dir)

163 

164 

@dataclass(slots=True)
class PackageTransformationDefinition:
    """Per-binary-package collection of manifest-derived rules and snippets.

    All list/dict fields default to a fresh empty container per instance
    (via ``default_factory``), so instances never share mutable state.
    """

    binary_package: BinaryPackage
    substitution: Substitution
    is_auto_generated_package: bool
    binary_version: Optional[str] = None
    search_dirs: Optional[List[FileSystemExactMatchRule]] = None
    dpkg_maintscript_helper_snippets: List[DpkgMaintscriptHelperCommand] = field(
        default_factory=list
    )
    maintscript_snippets: Dict[str, MaintscriptSnippetContainer] = field(
        default_factory=dict
    )
    transformations: List[TransformationRule] = field(default_factory=list)
    reserved_packager_provided_files: Dict[str, List[PackagerProvidedFile]] = field(
        default_factory=dict
    )
    install_rules: List[InstallRule] = field(default_factory=list)
    requested_service_rules: List[ServiceRule] = field(default_factory=list)

184 

185 

def _path_to_tar_member(
    path: FSPath,
    clamp_mtime_to: int,
) -> TarMember:
    """Convert a single file system path into its `TarMember` description.

    :param path: The path to describe. It must be a directory, a regular file
      or a symlink.
    :param clamp_mtime_to: Upper bound for the mtime recorded on the member.
    :return: A virtual member for symlinks and for paths without a backing
      file system path; otherwise a member created from the file on disk.
    :raises AssertionError: If the path is neither a dir, a file nor a symlink.
    """
    owner, uid, group, gid = path.tar_owner_info
    ownership = {"owner": owner, "uid": uid, "group": group, "gid": gid}
    mode = path.mode

    # Paths backed by a real file never get an mtime newer than the clamp value;
    # paths without one simply use the clamp value.
    mtime = float(clamp_mtime_to)
    if path.has_fs_path:
        mtime = min(mtime, path.mtime)

    if path.is_dir:
        path_type = PathType.DIRECTORY
    elif path.is_file:
        # TODO: someday we will need to deal with hardlinks and it might appear here.
        path_type = PathType.FILE
    elif not path.is_symlink:
        raise AssertionError(
            f"Unsupported file type: {path.path} - not a file, dir nor a symlink!"
        )
    else:
        # Symlinks are resolved right away since the target must be normalized anyway.
        normalized_target = debian_policy_normalize_symlink_target(
            path.path,
            path.readlink(),
        )
        return TarMember.virtual_path(
            path.tar_path,
            PathType.SYMLINK,
            mtime,
            link_target=normalized_target,
            # Force mode to be 0777 as that is the mode we see in the data.tar. In theory, tar lets you set
            # it to whatever. However, for reproducibility, we have to be well-behaved - and that is 0777.
            mode=0o0777,
            **ownership,
        )

    if path.has_fs_path:
        return TarMember.from_file(
            path.tar_path,
            path.fs_path,
            mode=mode,
            path_type=path_type,
            path_mtime=mtime,
            clamp_mtime_to=clamp_mtime_to,
            may_steal_fs_path=path._can_replace_inline,
            **ownership,
        )

    # Only non-file paths (i.e. directories at this point) may lack a backing path.
    assert not path.is_file
    return TarMember.virtual_path(
        path.tar_path,
        path_type,
        mtime,
        mode=mode,
        **ownership,
    )

253 

254 

def _generate_intermediate_manifest(
    fs_root: FSPath,
    clamp_mtime_to: int,
) -> Iterable[TarMember]:
    """Yield a `TarMember` for every path under ``fs_root``, symlinks last.

    Non-symlink members are yielded in the order ``fs_root.all_paths()``
    produces them; symlink members are held back and yielded afterwards in
    their original relative order.
    """
    deferred_symlinks = []
    for fs_path in fs_root.all_paths():
        member = _path_to_tar_member(fs_path, clamp_mtime_to)
        if member.path_type != PathType.SYMLINK:
            yield member
        else:
            deferred_symlinks.append(member)
    yield from deferred_symlinks

267 

268 

# Type of the underlying store object wrapped by AbstractYAMLSubStore.
ST = TypeVar("ST")
# Item/value type of the list-backed and dict-backed sub stores.
T = TypeVar("T")

271 

272 

class AbstractYAMLSubStore(Generic[ST]):
    """Wrapper around one entry (``parent_store[parent_key]``) of a YAML container.

    A wrapper is *attached* when its store is present in the parent under
    ``parent_key`` and *detached* otherwise (no parent, no key, or the key /
    index is not present in the parent).
    """

    def __init__(
        self,
        parent_store: Any,
        parent_key: Optional[Union[int, str]],
        store: Optional[ST] = None,
    ) -> None:
        """Wrap ``parent_store[parent_key]`` or a free-standing ``store``.

        :param parent_store: Container (mapping or list) holding the entry, or
          None for a free-standing wrapper.
        :param parent_key: Key or index of the entry inside ``parent_store``.
        :param store: Explicit store object. When the parent already has an
          entry for ``parent_key``, it must be that very object.
        :raises ValueError: If ``store`` is given but differs from the object
          already present in the parent.
        """
        present_in_parent = False
        if parent_store is not None and parent_key is not None:
            try:
                from_parent_store = parent_store[parent_key]
            except (KeyError, IndexError):
                from_parent_store = None
            else:
                present_in_parent = True
            # Compare against the entry found in the parent (not the parent
            # container itself), which is what the error message describes.
            if (
                store is not None
                and from_parent_store is not None
                and store is not from_parent_store
            ):
                raise ValueError(
                    "Store is provided but is not the one already in the parent store"
                )
            if store is None:
                store = from_parent_store
        self._parent_store = parent_store
        self._parent_key = parent_key
        # Attachment is decided by whether the lookup succeeded. A plain
        # `parent_key not in parent_store` would test *element* membership for
        # list parents and therefore treat every list entry as detached.
        self._is_detached = not present_in_parent
        assert self._is_detached or store is not None
        if store is None:
            store = self._create_new_instance()
        self._store: ST = store

    def _create_new_instance(self) -> ST:
        """Create an empty store object; subclasses must override this."""
        raise NotImplementedError

    def create_definition_if_missing(self) -> None:
        """Attach the store to the parent unless it is already attached."""
        if self._is_detached:
            self.create_definition()

    def create_definition(self) -> None:
        """Attach the store to its parent container.

        For list parents the store is appended and the resulting index becomes
        the parent key; for other parents it is stored under the parent key.

        :raises RuntimeError: If already attached or if there is no parent.
        """
        if not self._is_detached:
            raise RuntimeError("Definition is already present")
        parent_store = self._parent_store
        if parent_store is None:
            raise RuntimeError(
                f"Definition is not attached to any parent!? ({self.__class__.__name__})"
            )
        if isinstance(parent_store, list):
            assert self._parent_key is None
            self._parent_key = len(parent_store)
            parent_store.append(self._store)
        else:
            parent_store[self._parent_key] = self._store
        self._is_detached = False

    def remove_definition(self) -> None:
        """Remove the store from its parent and mark this wrapper as detached.

        :raises RuntimeError: If the wrapper is not attached.
        """
        self._ensure_attached()
        del self._parent_store[self._parent_key]
        if isinstance(self._parent_store, list):
            # The old index is meaningless once the entry has been removed.
            self._parent_key = None
        self._is_detached = True

    def _ensure_attached(self) -> None:
        """Raise RuntimeError when the wrapper is detached."""
        if self._is_detached:
            raise RuntimeError("The definition has been removed!")

338 

339 

class AbstractYAMLListSubStore(Generic[T], AbstractYAMLSubStore[List[T]]):
    """Sub store whose underlying store is a list; new stores are `CommentedSeq`."""

    def _create_new_instance(self) -> List[T]:
        return CommentedSeq()

343 

344 

class AbstractYAMLDictSubStore(Generic[T], AbstractYAMLSubStore[Dict[str, T]]):
    """Sub store whose underlying store is a mapping; new stores are `CommentedMap`."""

    def _create_new_instance(self) -> Dict[str, T]:
        return CommentedMap()

348 

349 

class MutableCondition:
    """Factory for single-key condition mappings."""

    @classmethod
    def arch_matches(cls, arch_filter: str) -> CommentedMap:
        """Return a mapping with ``arch_filter`` under `MK_CONDITION_ARCH_MATCHES`."""
        condition = {MK_CONDITION_ARCH_MATCHES: arch_filter}
        return CommentedMap(condition)

    @classmethod
    def build_profiles_matches(cls, build_profiles_matches: str) -> CommentedMap:
        """Return a mapping with the value under `MK_CONDITION_BUILD_PROFILES_MATCHES`."""
        condition = {MK_CONDITION_BUILD_PROFILES_MATCHES: build_profiles_matches}
        return CommentedMap(condition)

360 

361 

class MutableYAMLSymlink(AbstractYAMLDictSubStore[Any]):
    """Mutable view of one create-symlink transformation entry."""

    @classmethod
    def new_symlink(
        cls, link_path: str, link_target: str, condition: Optional[Any]
    ) -> "MutableYAMLSymlink":
        """Build a detached symlink entry, optionally guarded by ``condition``."""
        details = {
            MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH: link_path,
            MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET: link_target,
        }
        if condition is not None:
            details["when"] = condition
        entry = CommentedMap({MK_TRANSFORMATIONS_CREATE_SYMLINK: details})
        return cls(None, None, store=entry)

    @property
    def symlink_path(self) -> str:
        """The link path of the entry."""
        details = self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK]
        return details[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH]

    @symlink_path.setter
    def symlink_path(self, path: str) -> None:
        details = self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK]
        details[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH] = path

    @property
    def symlink_target(self) -> Optional[str]:
        """The link target of the entry."""
        details = self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK]
        return details[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET]

    @symlink_target.setter
    def symlink_target(self, target: str) -> None:
        details = self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK]
        details[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET] = target

399 

400 

class MutableYAMLConffileManagementItem(AbstractYAMLDictSubStore[Any]):
    """Mutable view of one conffile-management entry (a remove or a rename).

    The store is a single-key mapping: the key is the command and the value
    is the mapping holding that command's attributes.
    """

    @classmethod
    def rm_conffile(
        cls,
        conffile: str,
        prior_to_version: Optional[str],
        owning_package: Optional[str],
    ) -> "MutableYAMLConffileManagementItem":
        """Build a detached "remove" entry for ``conffile``."""
        r = cls(
            None,
            None,
            store=CommentedMap(
                {
                    MK_CONFFILE_MANAGEMENT_REMOVE: CommentedMap(
                        {MK_CONFFILE_MANAGEMENT_REMOVE_PATH: conffile}
                    )
                }
            ),
        )
        r.prior_to_version = prior_to_version
        r.owning_package = owning_package
        return r

    @classmethod
    def mv_conffile(
        cls,
        old_conffile: str,
        new_conffile: str,
        prior_to_version: Optional[str],
        owning_package: Optional[str],
    ) -> "MutableYAMLConffileManagementItem":
        """Build a detached "rename" entry from ``old_conffile`` to ``new_conffile``."""
        r = cls(
            None,
            None,
            store=CommentedMap(
                {
                    MK_CONFFILE_MANAGEMENT_RENAME: CommentedMap(
                        {
                            MK_CONFFILE_MANAGEMENT_RENAME_SOURCE: old_conffile,
                            MK_CONFFILE_MANAGEMENT_RENAME_TARGET: new_conffile,
                        }
                    )
                }
            ),
        )
        r.prior_to_version = prior_to_version
        r.owning_package = owning_package
        return r

    @property
    def _container(self) -> Dict[str, Any]:
        """The attribute mapping stored under the single command key."""
        assert len(self._store) == 1
        return next(iter(self._store.values()))

    @property
    def command(self) -> str:
        """The single key of the store, i.e. which command this entry is."""
        assert len(self._store) == 1
        return next(iter(self._store))

    def _require_rename(self) -> None:
        """Raise TypeError unless this entry is a rename command."""
        if self.command != MK_CONFFILE_MANAGEMENT_RENAME:
            raise TypeError(
                f"The new_conffile attribute is only applicable to command {MK_CONFFILE_MANAGEMENT_RENAME}."
                f" This is a {self.command}"
            )

    def _set_or_clear(self, key: str, value: Optional[str]) -> None:
        """Store ``value`` under ``key``; None removes the key if present."""
        if value is None:
            with suppress(KeyError):
                del self._container[key]
        else:
            self._container[key] = value

    @property
    def obsolete_conffile(self) -> str:
        """The conffile being removed, or the source of the rename."""
        if self.command == MK_CONFFILE_MANAGEMENT_REMOVE:
            return self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH]
        assert self.command == MK_CONFFILE_MANAGEMENT_RENAME
        return self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE]

    @obsolete_conffile.setter
    def obsolete_conffile(self, value: str) -> None:
        if self.command == MK_CONFFILE_MANAGEMENT_REMOVE:
            self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH] = value
        else:
            assert self.command == MK_CONFFILE_MANAGEMENT_RENAME
            self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE] = value

    @property
    def new_conffile(self) -> str:
        """The rename target.

        :raises TypeError: If this entry is not a rename command.
        """
        self._require_rename()
        return self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET]

    @new_conffile.setter
    def new_conffile(self, value: str) -> None:
        self._require_rename()
        self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET] = value

    @property
    def prior_to_version(self) -> Optional[str]:
        """The prior-to-version attribute, or None when it is not set."""
        return self._container.get(MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION)

    @prior_to_version.setter
    def prior_to_version(self, value: Optional[str]) -> None:
        self._set_or_clear(MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION, value)

    @property
    def owning_package(self) -> Optional[str]:
        """The owning-package attribute, or None when it is not set."""
        # Read the same key the setter writes; the previous code looked up the
        # prior-to-version key and raised KeyError when that was absent.
        return self._container.get(MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE)

    @owning_package.setter
    def owning_package(self, value: Optional[str]) -> None:
        self._set_or_clear(MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE, value)

520 

521 

class MutableYAMLPackageDefinition(AbstractYAMLDictSubStore):
    """Mutable view of one package's definition mapping in the manifest."""

    def _list_store(
        self, key, *, create_if_absent: bool = False
    ) -> Optional[List[Dict[str, Any]]]:
        """Return the list stored under ``key``.

        :param key: Key of the list inside this package definition.
        :param create_if_absent: When the definition is detached or has no
          such key: create what is missing (True) or return None (False).
        :return: The list, or None if absent and ``create_if_absent`` is False.
        """
        if self._is_detached or key not in self._store:
            # Pure lookups must not modify the manifest (the flag test used to
            # be inverted, so lookups created the list and inserts got None).
            if not create_if_absent:
                return None
            self.create_definition_if_missing()
            # Never replace a list that already exists on a detached definition.
            if key not in self._store:
                self._store[key] = []
        return self._store[key]

    def _insert_item(self, key: str, item: AbstractYAMLDictSubStore) -> None:
        """Attach the detached ``item`` to the list under ``key`` (created on demand).

        :raises RuntimeError: If the item is attached already or is associated
          with another container.
        """
        parent_store = self._list_store(key, create_if_absent=True)
        assert parent_store is not None
        if not item._is_detached or (
            item._parent_store is not None and item._parent_store is not parent_store
        ):
            raise RuntimeError(
                "Item is already attached or associated with a different container"
            )
        item._parent_store = parent_store
        item.create_definition()

    def add_symlink(self, symlink: MutableYAMLSymlink) -> None:
        """Add ``symlink`` to this package's transformations."""
        self._insert_item(MK_TRANSFORMATIONS, symlink)

    def symlinks(self) -> Iterable[MutableYAMLSymlink]:
        """Yield a wrapper for every transformation that is a lone symlink entry."""
        store = self._list_store(MK_TRANSFORMATIONS)
        if store is None:
            return
        for i, d in enumerate(store):
            # Match on the key MutableYAMLSymlink itself reads and writes, so
            # every yielded wrapper can actually access its entry.
            if (
                d
                and isinstance(d, dict)
                and len(d) == 1
                and MK_TRANSFORMATIONS_CREATE_SYMLINK in d
            ):
                yield MutableYAMLSymlink(store, i)

    def conffile_management_items(self) -> Iterable[MutableYAMLConffileManagementItem]:
        """Yield a wrapper for every conffile-management entry of this package."""
        store = self._list_store(MK_CONFFILE_MANAGEMENT)
        if store is None:
            return
        yield from (
            MutableYAMLConffileManagementItem(store, i) for i in range(len(store))
        )

    def add_conffile_management(
        self, conffile_management_item: MutableYAMLConffileManagementItem
    ) -> None:
        """Add ``conffile_management_item`` to this package's conffile management."""
        self._insert_item(MK_CONFFILE_MANAGEMENT, conffile_management_item)

569 

570 

class AbstractMutableYAMLInstallRule(AbstractYAMLDictSubStore):
    """Base for mutable install-rule entries, plus factories for detached rules.

    The store is a single-key mapping; the key names the rule and its value
    holds the rule's attributes.
    """

    @property
    def _container(self) -> Dict[str, Any]:
        """The value stored under the single rule key."""
        assert len(self._store) == 1
        return next(iter(self._store.values()))

    @staticmethod
    def _source_key(sources: Union[str, List[str]]) -> str:
        """Pick the singular key for a lone string, the plural key otherwise."""
        if isinstance(sources, str):
            return MK_INSTALLATIONS_INSTALL_SOURCE
        return MK_INSTALLATIONS_INSTALL_SOURCES

    @property
    def into(self) -> Optional[List[str]]:
        """The target packages as a list, or None when not set."""
        # `.get`: the setter removes the key for None, so absence is a valid state.
        v = self._container.get(MK_INSTALLATIONS_INSTALL_INTO)
        if v is None:
            return None
        if isinstance(v, str):
            return [v]
        return v

    @into.setter
    def into(self, new_value: Optional[Union[str, List[str]]]) -> None:
        if new_value is None:
            with suppress(KeyError):
                del self._container[MK_INSTALLATIONS_INSTALL_INTO]
            return
        if isinstance(new_value, str):
            self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_value
            return
        new_list = CommentedSeq(new_value)
        self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_list

    @property
    def when(self) -> Optional[Union[str, Mapping[str, Any]]]:
        """The condition attached to the rule, or None when not set."""
        # `.get`: the setter removes the key for None, so absence is a valid state.
        return self._container.get(MK_CONDITION_WHEN)

    @when.setter
    def when(self, new_value: Optional[Union[str, Mapping[str, Any]]]) -> None:
        if new_value is None:
            with suppress(KeyError):
                del self._container[MK_CONDITION_WHEN]
            return
        if isinstance(new_value, str):
            self._container[MK_CONDITION_WHEN] = new_value
            return
        new_map = CommentedMap(new_value)
        self._container[MK_CONDITION_WHEN] = new_map

    @classmethod
    def install_dest(
        cls,
        sources: Union[str, List[str]],
        into: Optional[Union[str, List[str]]],
        *,
        dest_dir: Optional[str] = None,
        when: Optional[Union[str, Mapping[str, Any]]] = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Build a detached install rule, optionally with a destination dir."""
        k = cls._source_key(sources)
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=CommentedMap({MK_INSTALLATIONS_INSTALL: CommentedMap({k: sources})}),
        )
        r.dest_dir = dest_dir
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def multi_dest_install(
        cls,
        sources: Union[str, List[str]],
        dest_dirs: Sequence[str],
        into: Optional[Union[str, List[str]]],
        *,
        when: Optional[Union[str, Mapping[str, Any]]] = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Build a detached multi-dest install rule for ``dest_dirs``."""
        k = cls._source_key(sources)
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=CommentedMap(
                {
                    MK_INSTALLATIONS_MULTI_DEST_INSTALL: CommentedMap(
                        {
                            k: sources,
                            "dest-dirs": dest_dirs,
                        }
                    )
                }
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_as(
        cls,
        source: str,
        install_as: str,
        into: Optional[Union[str, List[str]]],
        when: Optional[Union[str, Mapping[str, Any]]] = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Build a detached install rule that installs ``source`` as ``install_as``."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=CommentedMap(
                {
                    MK_INSTALLATIONS_INSTALL: CommentedMap(
                        {
                            MK_INSTALLATIONS_INSTALL_SOURCE: source,
                            MK_INSTALLATIONS_INSTALL_AS: install_as,
                        }
                    )
                }
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_doc_as(
        cls,
        source: str,
        install_as: str,
        into: Optional[Union[str, List[str]]],
        when: Optional[Union[str, Mapping[str, Any]]] = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Build a detached install-docs rule that installs ``source`` as ``install_as``."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=CommentedMap(
                {
                    MK_INSTALLATIONS_INSTALL_DOCS: CommentedMap(
                        {
                            MK_INSTALLATIONS_INSTALL_SOURCE: source,
                            MK_INSTALLATIONS_INSTALL_AS: install_as,
                        }
                    )
                }
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_docs(
        cls,
        sources: Union[str, List[str]],
        into: Optional[Union[str, List[str]]],
        *,
        dest_dir: Optional[str] = None,
        when: Optional[Union[str, Mapping[str, Any]]] = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Build a detached install-docs rule, optionally with a destination dir."""
        k = cls._source_key(sources)
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=CommentedMap(
                {MK_INSTALLATIONS_INSTALL_DOCS: CommentedMap({k: sources})}
            ),
        )
        r.into = into
        r.dest_dir = dest_dir
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_examples(
        cls,
        sources: Union[str, List[str]],
        into: Optional[Union[str, List[str]]],
        when: Optional[Union[str, Mapping[str, Any]]] = None,
    ) -> "MutableYAMLInstallRuleInstallExamples":
        """Build a detached install-examples rule."""
        k = cls._source_key(sources)
        r = MutableYAMLInstallRuleInstallExamples(
            None,
            None,
            store=CommentedMap(
                {MK_INSTALLATIONS_INSTALL_EXAMPLES: CommentedMap({k: sources})}
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_man(
        cls,
        sources: Union[str, List[str]],
        into: Optional[Union[str, List[str]]],
        language: Optional[str],
        when: Optional[Union[str, Mapping[str, Any]]] = None,
    ) -> "MutableYAMLInstallRuleMan":
        """Build a detached install-man rule, optionally with a language."""
        k = cls._source_key(sources)
        r = MutableYAMLInstallRuleMan(
            None,
            None,
            store=CommentedMap(
                {MK_INSTALLATIONS_INSTALL_MAN: CommentedMap({k: sources})}
            ),
        )
        r.language = language
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def discard(
        cls,
        sources: Union[str, List[str]],
    ) -> "MutableYAMLInstallRuleDiscard":
        """Build a detached discard rule; ``sources`` is stored directly under the rule key."""
        return MutableYAMLInstallRuleDiscard(
            None,
            None,
            store=CommentedMap({MK_INSTALLATIONS_DISCARD: sources}),
        )

829 

830 

class MutableYAMLInstallRuleInstallExamples(AbstractMutableYAMLInstallRule):
    # Type returned by AbstractMutableYAMLInstallRule.install_examples; it adds
    # nothing beyond the inherited accessors.
    pass

833 

834 

class MutableYAMLInstallRuleMan(AbstractMutableYAMLInstallRule):
    """Install-man rule; adds the optional language attribute."""

    @property
    def language(self) -> Optional[str]:
        """The language attribute of the rule, or None when not set."""
        # `.get`: the setter removes the key for None, so absence is a valid state.
        return self._container.get(MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE)

    @language.setter
    def language(self, new_value: Optional[str]) -> None:
        if new_value is not None:
            self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE] = new_value
            return
        with suppress(KeyError):
            del self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE]

847 

848 

class MutableYAMLInstallRuleDiscard(AbstractMutableYAMLInstallRule):
    # Type returned by AbstractMutableYAMLInstallRule.discard; it adds nothing
    # beyond the inherited accessors.
    pass

851 

852 

class MutableYAMLInstallRuleInstall(AbstractMutableYAMLInstallRule):
    """Install rule with sources and either a destination dir or an "as" name."""

    def _raw_sources(self) -> Union[str, List[str]]:
        """Return the stored sources value, whichever of the two keys holds it.

        The factory methods store a lone string under the singular key and a
        list under the plural key, so both have to be consulted.

        :raises KeyError: If neither key is present.
        """
        container = self._container
        raw = container.get(MK_INSTALLATIONS_INSTALL_SOURCES)
        if raw is None:
            raw = container[MK_INSTALLATIONS_INSTALL_SOURCE]
        return raw

    @property
    def sources(self) -> List[str]:
        """The sources of the rule as a list (a lone string is wrapped)."""
        v = self._raw_sources()
        if isinstance(v, str):
            return [v]
        return v

    @sources.setter
    def sources(self, new_value: Union[str, List[str]]) -> None:
        container = self._container
        # Drop a value kept under the singular key so that the rule never ends
        # up carrying both the singular and the plural key.
        with suppress(KeyError):
            del container[MK_INSTALLATIONS_INSTALL_SOURCE]
        if isinstance(new_value, str):
            container[MK_INSTALLATIONS_INSTALL_SOURCES] = new_value
            return
        container[MK_INSTALLATIONS_INSTALL_SOURCES] = CommentedSeq(new_value)

    @property
    def dest_dir(self) -> Optional[str]:
        """The destination directory, or None when not set."""
        return self._container.get(MK_INSTALLATIONS_INSTALL_DEST_DIR)

    @dest_dir.setter
    def dest_dir(self, new_value: Optional[str]) -> None:
        if new_value is not None and self.dest_as is not None:
            raise ValueError(
                f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and'
                f' "{MK_INSTALLATIONS_INSTALL_AS}"'
            )
        if new_value is not None:
            self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR] = new_value
        else:
            with suppress(KeyError):
                del self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR]

    @property
    def dest_as(self) -> Optional[str]:
        """The "install as" name, or None when not set."""
        return self._container.get(MK_INSTALLATIONS_INSTALL_AS)

    @dest_as.setter
    def dest_as(self, new_value: Optional[str]) -> None:
        if new_value is None:
            with suppress(KeyError):
                del self._container[MK_INSTALLATIONS_INSTALL_AS]
            return
        if self.dest_dir is not None:
            raise ValueError(
                f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and'
                f' "{MK_INSTALLATIONS_INSTALL_AS}"'
            )

        sources = self._raw_sources()
        if isinstance(sources, list):
            if len(sources) != 1:
                raise ValueError(
                    f'Cannot have "{MK_INSTALLATIONS_INSTALL_AS}" when'
                    f' "{MK_INSTALLATIONS_INSTALL_SOURCES}" is not exactly one item'
                )
            # Collapse the one-element list to its single string.
            self.sources = sources[0]
        self._container[MK_INSTALLATIONS_INSTALL_AS] = new_value

911 

912 

class MutableYAMLInstallationsDefinition(AbstractYAMLListSubStore[Any]):
    """Mutable view of the list of install rules."""

    def append(self, install_rule: AbstractMutableYAMLInstallRule) -> None:
        """Attach a detached ``install_rule`` at the end of the list.

        :raises RuntimeError: If the rule is attached already or is associated
          with another container.
        """
        rules = self._store
        foreign_parent = (
            install_rule._parent_store is not None
            and install_rule._parent_store is not rules
        )
        if foreign_parent or not install_rule._is_detached:
            raise RuntimeError(
                "Item is already attached or associated with a different container"
            )
        self.create_definition_if_missing()
        install_rule._parent_store = rules
        install_rule.create_definition()

    def extend(self, install_rules: Iterable[AbstractMutableYAMLInstallRule]) -> None:
        """Attach every rule from ``install_rules`` in order; see `append`."""
        for rule in install_rules:
            self.append(rule)

940 

941 

class MutableYAMLManifestVariables(AbstractYAMLDictSubStore):
    """Mutable view of the manifest variables mapping."""

    @property
    def variables(self) -> Dict[str, Any]:
        """The underlying mapping of variables."""
        return self._store

    def __setitem__(self, key: str, value: Any) -> None:
        """Set a variable, attaching the mapping to its parent on first use."""
        variables = self._store
        variables[key] = value
        self.create_definition_if_missing()

950 

951 

class MutableYAMLManifestDefinitions(AbstractYAMLDictSubStore):
    """Mutable view of the manifest definitions mapping."""

    def manifest_variables(
        self, *, create_if_absent: bool = True
    ) -> MutableYAMLManifestVariables:
        """Return the variables view, attaching it first when requested and missing."""
        variables = MutableYAMLManifestVariables(self._store, MK_MANIFEST_VARIABLES)
        if not create_if_absent:
            return variables
        variables.create_definition_if_missing()
        return variables

960 

961 

class MutableYAMLManifest:
    """Mutable view of a whole manifest document."""

    def __init__(self, store: Any) -> None:
        """Wrap ``store``, the top-level mapping of the manifest."""
        self._store = store

    @classmethod
    def empty_manifest(cls) -> "MutableYAMLManifest":
        """Return a manifest holding only the default manifest version."""
        return cls(CommentedMap({MK_MANIFEST_VERSION: DEFAULT_MANIFEST_VERSION}))

    @property
    def manifest_version(self) -> str:
        """The manifest version stored in the document."""
        return self._store[MK_MANIFEST_VERSION]

    @manifest_version.setter
    def manifest_version(self, version: str) -> None:
        if version not in SUPPORTED_MANIFEST_VERSIONS:
            raise ValueError("Unsupported version")
        self._store[MK_MANIFEST_VERSION] = version

    def installations(
        self,
        *,
        create_if_absent: bool = True,
    ) -> MutableYAMLInstallationsDefinition:
        """Return the installations view, attaching it first when requested and missing."""
        d = MutableYAMLInstallationsDefinition(self._store, MK_INSTALLATIONS)
        if create_if_absent:
            d.create_definition_if_missing()
        return d

    def manifest_definitions(
        self,
        *,
        create_if_absent: bool = True,
    ) -> MutableYAMLManifestDefinitions:
        """Return the definitions view, attaching it first when requested and missing."""
        d = MutableYAMLManifestDefinitions(self._store, MK_MANIFEST_DEFINITIONS)
        if create_if_absent:
            d.create_definition_if_missing()
        return d

    def package(
        self, name: str, *, create_if_absent: bool = True
    ) -> MutableYAMLPackageDefinition:
        """Return the definition view for package ``name``.

        :param name: Name of the package.
        :param create_if_absent: Create the packages mapping and the package
          entry when missing instead of raising.
        :raises KeyError: If the package has no entry and ``create_if_absent``
          is False. The manifest is left unmodified in that case.
        """
        packages_store = self._store.get(MK_PACKAGES)
        if packages_store is None:
            # A failed lookup must not leave an empty packages mapping behind.
            if not create_if_absent:
                raise KeyError(name)
            packages_store = CommentedMap()
            self._store[MK_PACKAGES] = packages_store
        if packages_store.get(name) is not None:
            return MutableYAMLPackageDefinition(packages_store, name)
        if not create_if_absent:
            raise KeyError(name)
        d = MutableYAMLPackageDefinition(packages_store, name)
        d.create_definition()
        return d

    def write_to(self, fd) -> None:
        """Dump the manifest as YAML to the file object ``fd``."""
        MANIFEST_YAML.dump(self._store, fd)

1019 

1020 

1021def _describe_missing_path(entry: VirtualPath) -> str: 

1022 if entry.is_dir: 

1023 return f"{entry.fs_path}/ (empty directory; possible integration point)" 

1024 if entry.is_symlink: 

1025 target = os.readlink(entry.fs_path) 

1026 return f"{entry.fs_path} (symlink; links to {target})" 

1027 if entry.is_file: 

1028 return f"{entry.fs_path} (file)" 

1029 return f"{entry.fs_path} (other!? Probably not supported by debputy and may need a `remove`)" 

1030 

1031 

1032def _detect_missing_installations( 

1033 path_matcher: SourcePathMatcher, 

1034 search_dir: VirtualPath, 

1035) -> None: 

1036 if not search_dir.is_dir: 1036 ↛ 1037line 1036 didn't jump to line 1037 because the condition on line 1036 was never true

1037 return 

1038 missing = list(path_matcher.detect_missing(search_dir)) 

1039 if not missing: 

1040 return 

1041 

1042 excl = textwrap.dedent( 

1043 """\ 

1044 - discard: "*" 

1045 """ 

1046 ) 

1047 

1048 raise PathNotCoveredByInstallRulesError( 

1049 "Please review the list and add either install rules or exclusions to `installations` in" 

1050 " debian/debputy.manifest. If you do not need any of these paths, add the following to the" 

1051 f" end of your 'installations`:\n\n{excl}\n", 

1052 missing, 

1053 search_dir, 

1054 ) 

1055 

1056 

1057def _list_automatic_discard_rules(path_matcher: SourcePathMatcher) -> None: 

1058 used_discard_rules = path_matcher.used_auto_discard_rules 

1059 # Discard rules can match and then be overridden. In that case, they appear 

1060 # but have 0 matches. 

1061 if not sum((len(v) for v in used_discard_rules.values()), 0): 

1062 return 

1063 _info("The following automatic discard rules were triggered:") 

1064 example_path: Optional[str] = None 

1065 for rule in sorted(used_discard_rules): 

1066 for fs_path in sorted(used_discard_rules[rule]): 

1067 if example_path is None: 1067 ↛ 1069line 1067 didn't jump to line 1069 because the condition on line 1067 was always true

1068 example_path = fs_path 

1069 _info(f" * {rule} -> {fs_path}") 

1070 assert example_path is not None 

1071 _info("") 

1072 _info( 

1073 "Note that some of these may have been overruled. The overrule detection logic is not" 

1074 ) 

1075 _info("100% reliable.") 

1076 _info("") 

1077 _info( 

1078 "You can overrule an automatic discard rule by explicitly listing the path. As an example:" 

1079 ) 

1080 _info(" installations:") 

1081 _info(" - install:") 

1082 _info(f" source: {example_path}") 

1083 

1084 

def _install_everything_from_source_dir_if_present(
    dctrl_bin: BinaryPackage,
    substitution: Substitution,
    path_matcher: SourcePathMatcher,
    install_rule_context: InstallRuleContext,
    source_condition_context: ConditionContext,
    source_dir: VirtualPath,
    *,
    into_dir: Optional[VirtualPath] = None,
) -> None:
    """Run a built-in "install everything" rule for *source_dir* into *dctrl_bin*.

    A temporary copy of *install_rule_context* is used in which *source_dir*
    is the only search dir. When *into_dir* is given, the copy also replaces
    the ``fs_root`` of the package's install context with *into_dir*.

    "No match" and "already installed or discarded" errors from the install
    are deliberately ignored.
    """
    built_in_path = AttributePath.builtin_path()[f"installing {source_dir.fs_path}"]
    only_this_package = frozenset([dctrl_bin])
    catch_all_rule = run_in_context_of_plugin(
        "debputy",
        InstallRule.install_dest,
        [FileSystemMatchRule.from_path_match("*", built_in_path, substitution)],
        None,
        only_this_package,
        f"Built-in; install everything from {source_dir.fs_path} into {dctrl_bin.name}",
        None,
    )
    single_search_dir: Tuple[SearchDir] = (SearchDir(source_dir, only_this_package),)
    overrides = {"search_dirs": single_search_dir}
    if into_dir is not None:
        # Copy the mapping so the caller's context is left untouched.
        per_package_contexts = dict(install_rule_context.binary_package_contexts)
        per_package_contexts[dctrl_bin.name] = per_package_contexts[
            dctrl_bin.name
        ].replace(fs_root=into_dir)
        overrides["binary_package_contexts"] = per_package_contexts

    scoped_context = install_rule_context.replace(**overrides)
    # Empty directory or everything excluded by default; ignore the error
    with suppress(NoMatchForInstallPatternError, PathAlreadyInstalledOrDiscardedError):
        catch_all_rule.perform_install(
            path_matcher,
            scoped_context,
            source_condition_context,
        )

1134 

1135 

1136def _add_build_install_dirs_to_per_package_search_dirs( 

1137 build_system_install_dirs: Sequence[Tuple[str, FrozenSet[BinaryPackage]]], 

1138 per_package_search_dirs: Dict[BinaryPackage, List[VirtualPath]], 

1139 as_path: Callable[[str], VirtualPath], 

1140) -> None: 

1141 seen_pp_search_dirs: Set[Tuple[BinaryPackage, str]] = set() 

1142 for dest_dir, for_packages in build_system_install_dirs: 

1143 dest_path = as_path(dest_dir) 

1144 for pkg in for_packages: 

1145 seen_key = (pkg, dest_dir) 

1146 if seen_key in seen_pp_search_dirs: 

1147 continue 

1148 seen_pp_search_dirs.add(seen_key) 

1149 if pkg not in per_package_search_dirs: 

1150 per_package_search_dirs[pkg] = [dest_path] 

1151 else: 

1152 per_package_search_dirs[pkg].append(dest_path) 

1153 

1154 

class HighLevelManifest:
    """Parsed manifest together with the package and build state it applies to.

    Drives installation of files into per-package file system roots and the
    subsequent transformations/finalization of those roots.
    """

    def __init__(
        self,
        manifest_path: str,
        mutable_manifest: Optional[MutableYAMLManifest],
        install_rules: Optional[List[InstallRule]],
        source_package: SourcePackage,
        binary_packages: Mapping[str, BinaryPackage],
        substitution: Substitution,
        package_transformations: Mapping[str, PackageTransformationDefinition],
        dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable,
        dpkg_arch_query_table: DpkgArchTable,
        build_env: DebBuildOptionsAndProfiles,
        build_environments: BuildEnvironments,
        build_rules: Optional[List[BuildRule]],
        plugin_provided_feature_set: PluginProvidedFeatureSet,
        debian_dir: VirtualPath,
    ) -> None:
        """Store the provided state and derive the source-level condition context."""
        self.manifest_path = manifest_path
        self.mutable_manifest = mutable_manifest
        self._install_rules = install_rules
        self._source_package = source_package
        self._binary_packages = binary_packages
        self.substitution = substitution
        self.package_transformations = package_transformations
        self._dpkg_architecture_variables = dpkg_architecture_variables
        self._dpkg_arch_query_table = dpkg_arch_query_table
        self._build_env = build_env
        # Names of packages whose data.tar contents have been finalized.
        self._used_for: Set[str] = set()
        self.build_environments = build_environments
        self.build_rules = build_rules
        self._plugin_provided_feature_set = plugin_provided_feature_set
        self._debian_dir = debian_dir
        self._source_condition_context = ConditionContext(
            binary_package=None,
            substitution=self.substitution,
            deb_options_and_profiles=self._build_env,
            dpkg_architecture_variables=self._dpkg_architecture_variables,
            dpkg_arch_query_table=self._dpkg_arch_query_table,
        )

    def source_version(self, include_binnmu_version: bool = True) -> str:
        """Resolve the source version via the manifest substitution.

        :param include_binnmu_version: When false, the internal non-binNMU
          variable is resolved instead of ``DEB_VERSION``.
        :raises AssertionError: If the variable cannot be substituted.
        """
        # TODO: There should be an easier way to determine the source version; really.
        version_var = "{{DEB_VERSION}}"
        if not include_binnmu_version:
            version_var = "{{_DEBPUTY_INTERNAL_NON_BINNMU_SOURCE}}"
        try:
            return self.substitution.substitute(
                version_var, "internal (resolve version)"
            )
        except DebputySubstitutionError as e:
            raise AssertionError(f"Could not resolve {version_var}") from e

    @property
    def source_condition_context(self) -> ConditionContext:
        """Condition context that is not bound to any binary package."""
        return self._source_condition_context

    @property
    def debian_dir(self) -> VirtualPath:
        """The ``debian_dir`` path given at construction."""
        return self._debian_dir

    @property
    def dpkg_architecture_variables(self) -> DpkgArchitectureBuildProcessValuesTable:
        """The dpkg architecture variable table given at construction."""
        return self._dpkg_architecture_variables

    @property
    def deb_options_and_profiles(self) -> DebBuildOptionsAndProfiles:
        """The build options and profiles given at construction."""
        return self._build_env

    @property
    def plugin_provided_feature_set(self) -> PluginProvidedFeatureSet:
        """The plugin provided feature set given at construction."""
        return self._plugin_provided_feature_set

    @property
    def active_packages(self) -> Iterable[BinaryPackage]:
        """Iterate over the binary packages with ``should_be_acted_on`` set."""
        yield from (p for p in self._binary_packages.values() if p.should_be_acted_on)

    @property
    def all_packages(self) -> Iterable[BinaryPackage]:
        """Iterate over every known binary package."""
        yield from self._binary_packages.values()

    def package_state_for(self, package: str) -> PackageTransformationDefinition:
        """Return the transformation definition for *package*.

        :raises KeyError: If *package* has no transformation definition.
        """
        return self.package_transformations[package]

    def _detect_doc_main_package_for(self, package: BinaryPackage) -> BinaryPackage:
        """Determine the package under whose name docs for *package* belong.

        For ``<name>-doc`` this is ``<name>`` if it exists, else ``<name>-dev``
        when ``<name>`` starts with ``lib`` and that package exists. In all
        other cases it is *package* itself.
        """
        name = package.name
        # If it is not a -doc package, then docs should be installed
        # under its own package name.
        if not name.endswith("-doc"):
            return package
        name = name[:-4]
        main_package = self._binary_packages.get(name)
        if main_package:
            return main_package
        if name.startswith("lib"):
            dev_pkg = self._binary_packages.get(f"{name}-dev")
            if dev_pkg:
                return dev_pkg

        # If we found no better match; default to the doc package itself.
        return package

    def perform_installations(
        self,
        integration_mode: DebputyIntegrationMode,
        build_system_install_dirs: Sequence[Tuple[str, FrozenSet[BinaryPackage]]],
        *,
        install_request_context: Optional[InstallSearchDirContext] = None,
    ) -> PackageDataTable:
        """Install files into a fresh file system root per binary package.

        :param integration_mode: Integration mode; the manifest installation
          feature is disabled for INTEGRATION_MODE_DH_DEBPUTY_RRR.
        :param build_system_install_dirs: Pairs of (directory, packages) that
          become per-package search dirs. Only accepted in full integration
          mode and not together with *install_request_context*.
        :param install_request_context: Optional pre-computed search dirs and
          staging dirs; when omitted they are derived from the manifest and
          the ``debian/`` directory layout.
        :returns: Table with one entry per known binary package.
        :raises ValueError: On an unsupported parameter combination, or when
          *install_request_context* does not cover exactly the known packages.
        """
        package_data_dict = {}
        package_data_table = PackageDataTable(package_data_dict)
        enable_manifest_installation_feature = (
            integration_mode != INTEGRATION_MODE_DH_DEBPUTY_RRR
        )

        if build_system_install_dirs:
            if integration_mode != INTEGRATION_MODE_FULL:
                raise ValueError(
                    "The build_system_install_dirs parameter can only be used in full integration mode"
                )
            if install_request_context:
                raise ValueError(
                    "The build_system_install_dirs parameter cannot be used with install_request_context"
                    " (not implemented)"
                )

        if install_request_context is None:

            # Cached so that the same fs path always maps to the same overlay.
            @functools.lru_cache(None)
            def _as_path(fs_path: str) -> VirtualPath:
                return FSROOverlay.create_root_dir(".", fs_path)

            dtmp_dir = _as_path("debian/tmp")
            source_root_dir = _as_path(".")
            into = frozenset(self._binary_packages.values())
            per_package_search_dirs = {
                t.binary_package: [_as_path(f.match_rule.path) for f in t.search_dirs]
                for t in self.package_transformations.values()
                if t.search_dirs is not None
            }

            if integration_mode == INTEGRATION_MODE_FULL:
                # In this mode, we have no default search dir. Everything ends up being
                # per-package instead (since that is easier logic-wise).
                #
                # Even dtmp_dir is omitted here (since it is not universally applicable).
                # Note we still initialize dtmp_dir, since it affects a later guard.
                default_search_dirs = []
                _add_build_install_dirs_to_per_package_search_dirs(
                    build_system_install_dirs,
                    per_package_search_dirs,
                    _as_path,
                )

                # We can end here with per_package_search_dirs having no search dirs for any package
                # (single binary, where everything is installed into d/<pkg> is the most common case).
                #
                # This is not a problem in itself as the installation rules can still apply to the
                # source root and there should be no reason to install something from d/<pkg> into
                # d/<another-pkg>
            else:
                default_search_dirs = [dtmp_dir]

            search_dirs = _determine_search_dir_order(
                per_package_search_dirs,
                into,
                default_search_dirs,
                source_root_dir,
            )
            # The source root is never checked for "not installed" paths.
            check_for_uninstalled_dirs = tuple(
                s.search_dir
                for s in search_dirs
                if s.search_dir.fs_path != source_root_dir.fs_path
            )
            if enable_manifest_installation_feature:
                _present_installation_dirs(
                    search_dirs, check_for_uninstalled_dirs, into
                )
        else:
            dtmp_dir = None
            search_dirs = install_request_context.search_dirs
            into = frozenset(self._binary_packages.values())
            seen: Set[BinaryPackage] = set()
            for search_dir in search_dirs:
                seen.update(search_dir.applies_to)

            missing = into - seen
            if missing:
                names = ", ".join(p.name for p in missing)
                raise ValueError(
                    f"The following package(s) had no search dirs: {names}."
                    " (Generally, the source root would be applicable to all packages)"
                )
            extra_names = seen - into
            if extra_names:
                names = ", ".join(p.name for p in extra_names)
                raise ValueError(
                    f"The install_request_context referenced the following unknown package(s): {names}"
                )

            check_for_uninstalled_dirs = (
                install_request_context.check_for_uninstalled_dirs
            )

        install_rule_context = InstallRuleContext(search_dirs)

        if (
            enable_manifest_installation_feature
            and self._install_rules is None
            # TODO: Should we also do this for full mode when build systems provided search dirs?
            and dtmp_dir is not None
            and os.path.isdir(dtmp_dir.fs_path)
        ):
            msg = (
                "The build system appears to have provided the output of upstream build system's"
                " install in debian/tmp. However, there are no provisions for debputy to install"
                " any of that into any of the debian packages listed in debian/control."
                " To avoid accidentally creating empty packages, debputy will insist that you"
                " explicitly define an empty installation definition if you did not want to"
                " install any of those files even though they have been provided."
                ' Example: "installations: []"'
            )
            _error(msg)
        elif (
            not enable_manifest_installation_feature and self._install_rules is not None
        ):
            _error(
                f"The `installations` feature cannot be used in {self.manifest_path} with this integration mode."
                " Please remove or comment out the `installations` keyword."
            )

        # Every package (active or not) gets an install context with a fresh root.
        for dctrl_bin in self.all_packages:
            package = dctrl_bin.name
            doc_main_package = self._detect_doc_main_package_for(dctrl_bin)

            install_rule_context[package] = BinaryPackageInstallRuleContext(
                dctrl_bin,
                FSRootDir(),
                doc_main_package,
            )

        if enable_manifest_installation_feature:
            discard_rules = list(
                self.plugin_provided_feature_set.auto_discard_rules.values()
            )
        else:
            discard_rules = [
                self.plugin_provided_feature_set.auto_discard_rules["debian-dir"]
            ]
        path_matcher = SourcePathMatcher(discard_rules)

        source_condition_context = self._source_condition_context

        # Pick up whatever already is in the per-package staging directory.
        for dctrl_bin in self.active_packages:
            package = dctrl_bin.name
            if install_request_context:
                build_system_staging_dir = install_request_context.debian_pkg_dirs.get(
                    package
                )
            else:
                build_system_staging_dir_fs_path = os.path.join("debian", package)
                if os.path.isdir(build_system_staging_dir_fs_path):
                    build_system_staging_dir = FSROOverlay.create_root_dir(
                        ".",
                        build_system_staging_dir_fs_path,
                    )
                else:
                    build_system_staging_dir = None

            if build_system_staging_dir is not None:
                _install_everything_from_source_dir_if_present(
                    dctrl_bin,
                    self.substitution,
                    path_matcher,
                    install_rule_context,
                    source_condition_context,
                    build_system_staging_dir,
                )

        if self._install_rules:
            # FIXME: Check that every install rule remains used after transformations have run.
            # What we want to check is transformations do not exclude everything from an install
            # rule. The hard part here is that renaming (etc.) is fine, so we cannot 1:1 string
            # match.
            for install_rule in self._install_rules:
                install_rule.perform_install(
                    path_matcher,
                    install_rule_context,
                    source_condition_context,
                )

        if enable_manifest_installation_feature:
            for search_dir in check_for_uninstalled_dirs:
                _detect_missing_installations(path_matcher, search_dir)

        for dctrl_bin in self.all_packages:
            package = dctrl_bin.name
            binary_install_rule_context = install_rule_context[package]
            build_system_pkg_staging_dir = os.path.join("debian", package)
            fs_root = binary_install_rule_context.fs_root

            context = self.package_transformations[package]
            if dctrl_bin.should_be_acted_on and enable_manifest_installation_feature:
                for special_install_rule in context.install_rules:
                    special_install_rule.perform_install(
                        path_matcher,
                        install_rule_context,
                        source_condition_context,
                    )

            if dctrl_bin.should_be_acted_on:
                self.apply_fs_transformations(package, fs_root)
                substvars_file = f"debian/{package}.substvars"
                substvars = FlushableSubstvars.load_from_path(
                    substvars_file, missing_ok=True
                )
                # We do not want to touch the substvars file (non-clean rebuild contamination)
                substvars.substvars_path = None
            else:
                substvars = FlushableSubstvars()

            udeb_package = self._binary_packages.get(f"{package}-udeb")
            if udeb_package and not udeb_package.is_udeb:
                udeb_package = None

            package_metadata_context = PackageProcessingContextProvider(
                self,
                dctrl_bin,
                udeb_package,
                package_data_table,
                # FIXME: source_package
            )

            ctrl_creator = BinaryCtrlAccessorProviderCreator(
                package_metadata_context,
                substvars,
                context.maintscript_snippets,
                context.substitution,
            )

            if not enable_manifest_installation_feature:
                assert_no_dbgsym_migration(dctrl_bin)
                dh_dbgsym_root_fs = dhe_dbgsym_root_dir(dctrl_bin)
                dh_dbgsym_root_path = FSROOverlay.create_root_dir(
                    "",
                    dh_dbgsym_root_fs,
                )
                dbgsym_root_fs = FSRootDir()
                _install_everything_from_source_dir_if_present(
                    dctrl_bin,
                    self.substitution,
                    path_matcher,
                    install_rule_context,
                    source_condition_context,
                    dh_dbgsym_root_path,
                    into_dir=dbgsym_root_fs,
                )
                dbgsym_build_ids = read_dbgsym_file(dctrl_bin)
                dbgsym_info = DbgsymInfo(
                    dctrl_bin,
                    dbgsym_root_fs,
                    os.path.join(dh_dbgsym_root_fs, "DEBIAN"),
                    dbgsym_build_ids,
                )
            else:
                dbgsym_info = DbgsymInfo(
                    dctrl_bin,
                    FSRootDir(),
                    None,
                    [],
                )

            package_data_dict[package] = BinaryPackageData(
                self._source_package,
                dctrl_bin,
                build_system_pkg_staging_dir,
                fs_root,
                substvars,
                package_metadata_context,
                ctrl_creator,
                dbgsym_info,
            )

        if enable_manifest_installation_feature:
            _list_automatic_discard_rules(path_matcher)

        return package_data_table

    def condition_context(
        self, binary_package: Optional[Union[BinaryPackage, str]]
    ) -> ConditionContext:
        """Return the condition context for *binary_package*.

        :param binary_package: A package, a package name, or ``None`` for the
          source-level context.
        :raises KeyError: If the package has no transformation definition.
        """
        if binary_package is None:
            return self._source_condition_context
        if not isinstance(binary_package, str):
            binary_package = binary_package.name

        package_transformation = self.package_transformations[binary_package]
        return self._source_condition_context.replace(
            binary_package=package_transformation.binary_package,
            substitution=package_transformation.substitution,
        )

    def _ensure_package_is_processable(self, package: str) -> None:
        """Raise ValueError if *package* is already finalized or is unknown."""
        if package in self._used_for:
            raise ValueError(
                f"data.tar contents for {package} has already been finalized!?"
            )
        if package not in self.package_transformations:
            raise ValueError(
                f'The package "{package}" was not relevant for the manifest!?'
            )

    def apply_fs_transformations(
        self,
        package: str,
        fs_root: FSPath,
    ) -> None:
        """Apply the transformations for *package* to *fs_root*.

        Runs the built-in mode normalization, then the package's own
        transformations, then the shebang line normalization.

        :raises ValueError: If the package is already finalized or unknown.
        """
        self._ensure_package_is_processable(package)
        package_transformation = self.package_transformations[package]
        condition_context = ConditionContext(
            binary_package=package_transformation.binary_package,
            substitution=package_transformation.substitution,
            deb_options_and_profiles=self._build_env,
            dpkg_architecture_variables=self._dpkg_architecture_variables,
            dpkg_arch_query_table=self._dpkg_arch_query_table,
        )
        norm_rules = list(
            builtin_mode_normalization_rules(
                self._dpkg_architecture_variables,
                package_transformation.binary_package,
                package_transformation.substitution,
            )
        )
        norm_mode_transformation_rule = ModeNormalizationTransformationRule(norm_rules)
        norm_mode_transformation_rule.transform_file_system(fs_root, condition_context)
        for transformation in package_transformation.transformations:
            transformation.run_transform_file_system(fs_root, condition_context)
        interpreter_normalization = NormalizeShebangLineTransformation()
        interpreter_normalization.transform_file_system(fs_root, condition_context)

    def finalize_data_tar_contents(
        self,
        package: str,
        fs_root: FSPath,
        clamp_mtime_to: int,
    ) -> IntermediateManifest:
        """Mark *package* as finalized and build its intermediate manifest.

        *fs_root* is flagged as no longer read-write as part of this.

        :raises ValueError: If the package is already finalized or unknown.
        """
        self._ensure_package_is_processable(package)
        self._used_for.add(package)

        # At this point, there should be no further mutations to the file system (because they
        # will not be present in the intermediate manifest)
        cast("FSRootDir", fs_root).is_read_write = False

        intermediate_manifest = list(
            _generate_intermediate_manifest(
                fs_root,
                clamp_mtime_to,
            )
        )
        return intermediate_manifest

    def apply_to_binary_staging_directory(
        self,
        package: str,
        fs_root: FSPath,
        clamp_mtime_to: int,
    ) -> IntermediateManifest:
        """Apply the transformations for *package* and then finalize *fs_root*."""
        self.apply_fs_transformations(package, fs_root)
        return self.finalize_data_tar_contents(package, fs_root, clamp_mtime_to)

1628 

1629 

1630@dataclasses.dataclass(slots=True) 

1631class SearchDirOrderState: 

1632 search_dir: VirtualPath 

1633 applies_to: Union[Set[BinaryPackage], FrozenSet[BinaryPackage]] = dataclasses.field( 

1634 default_factory=set 

1635 ) 

1636 after: Set[str] = dataclasses.field(default_factory=set) 

1637 

1638 

def _present_installation_dirs(
    search_dirs: Sequence[SearchDir],
    checked_missing_dirs: Sequence[VirtualPath],
    all_pkgs: FrozenSet[BinaryPackage],
) -> None:
    """Report (via ``_info``) the search dirs and the dirs checked for leftovers.

    :param search_dirs: Search dirs, listed in the given order.
    :param checked_missing_dirs: Directories considered for "not-installed"
      paths; the section is omitted when this is empty.
    :param all_pkgs: All packages; a search dir that applies to a strict
      subset of these is annotated with the packages it applies to.
    """
    _info("The following directories are considered search dirs (in order):")
    # Column width is derived from the search dirs only and reused for both lists.
    column_width = max((len(s.search_dir.fs_path) for s in search_dirs), default=1)
    for entry in search_dirs:
        if entry.applies_to < all_pkgs:
            pkg_names = ", ".join(p.name for p in entry.applies_to)
            scope = f" [only applicable to: {pkg_names}]"
        else:
            scope = ""
        remark = "" if os.path.isdir(entry.search_dir.fs_path) else " (skipped; absent)"
        _info(f" * {entry.search_dir.fs_path:{column_width}}{scope}{remark}")

    if not checked_missing_dirs:
        return
    _info('The following directories are considered for "not-installed" paths;')
    for directory in checked_missing_dirs:
        remark = "" if os.path.isdir(directory.fs_path) else " (skipped; absent)"
        _info(f" * {directory.fs_path:{column_width}}{remark}")

1663 

1664 

def _determine_search_dir_order(
    requested: Mapping[BinaryPackage, List[VirtualPath]],
    all_pkgs: FrozenSet[BinaryPackage],
    default_search_dirs: List[VirtualPath],
    source_root: VirtualPath,
) -> Sequence[SearchDir]:
    """Merge the per-package search dir lists into one globally ordered sequence.

    :param requested: Explicit search dirs per package, in priority order.
    :param all_pkgs: All packages; those without an entry in *requested* use
      *default_search_dirs*.
    :param default_search_dirs: Search dirs for packages without explicit ones.
    :param source_root: Always appended last and applicable to all packages.
    :returns: Search dirs ordered such that, for every package, its dirs
      appear in the order that package listed them.

    A conflict between the per-package orders is reported via ``_error``.
    """
    search_dir_table: Dict[str, SearchDirOrderState] = {}
    assert requested.keys() <= all_pkgs
    for pkg in all_pkgs:
        previous_search_dir: Optional[SearchDirOrderState] = None
        for path in requested.get(pkg, default_search_dirs):
            search_dir_state = search_dir_table.get(path.fs_path)
            if search_dir_state is None:
                search_dir_state = SearchDirOrderState(path)
                search_dir_table[path.fs_path] = search_dir_state
            search_dir_state.applies_to.add(pkg)
            if previous_search_dir is not None:
                # Within a package, each dir must come after its predecessor.
                search_dir_state.after.add(previous_search_dir.search_dir.fs_path)
            previous_search_dir = search_dir_state

    search_dirs_in_order: List[SearchDirOrderState] = []
    released: Set[str] = set()
    # Kept as a list in first-seen order (rather than a set of paths) so that
    # the result does not depend on string hash ordering and so that nothing
    # is mutated while it is being iterated.
    remaining: List[SearchDirOrderState] = list(search_dir_table.values())
    while remaining:
        still_blocked: List[SearchDirOrderState] = []
        for search_dir_state in remaining:
            if search_dir_state.after <= released:
                search_dirs_in_order.append(search_dir_state)
                released.add(search_dir_state.search_dir.fs_path)
            else:
                still_blocked.append(search_dir_state)

        if len(still_blocked) == len(remaining):
            # A full pass without progress means the constraints are cyclic.
            # NOTE(review): _error is expected not to return - confirm.
            names = ", ".join(s.search_dir.fs_path for s in still_blocked)
            _error(
                f"There is a circular dependency (somewhere) between the search dirs: {names}."
                " Note that the search directories across all packages have to be ordered (and the"
                " source root should generally be last)"
            )
        remaining = still_blocked

    search_dirs_in_order.append(
        SearchDirOrderState(
            source_root,
            all_pkgs,
        )
    )

    return tuple(
        # Avoid duplicating all_pkgs
        SearchDir(
            s.search_dir,
            frozenset(s.applies_to) if s.applies_to != all_pkgs else all_pkgs,
        )
        for s in search_dirs_in_order
    )

1730 )