Coverage for src/debputy/highlevel_manifest.py: 66%

854 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-01-27 13:59 +0000

1import dataclasses 

2import functools 

3import os 

4import textwrap 

5from contextlib import suppress 

6from dataclasses import dataclass, field 

7from typing import ( 

8 List, 

9 Dict, 

10 Iterable, 

11 Mapping, 

12 Any, 

13 Union, 

14 Optional, 

15 TypeVar, 

16 Generic, 

17 cast, 

18 Set, 

19 Tuple, 

20 Sequence, 

21 FrozenSet, 

22) 

23 

24from debian.debian_support import DpkgArchTable 

25 

26from debputy.dh.debhelper_emulation import ( 

27 dhe_dbgsym_root_dir, 

28 assert_no_dbgsym_migration, 

29 read_dbgsym_file, 

30) 

31from ._deb_options_profiles import DebBuildOptionsAndProfiles 

32from ._manifest_constants import * 

33from .architecture_support import DpkgArchitectureBuildProcessValuesTable 

34from .builtin_manifest_rules import builtin_mode_normalization_rules 

35from .exceptions import ( 

36 DebputySubstitutionError, 

37 DebputyRuntimeError, 

38 DebputyRuntimeErrorWithPreamble, 

39) 

40from .filesystem_scan import FSPath, FSRootDir, FSROOverlay, FSControlRootDir 

41from .installations import ( 

42 InstallRule, 

43 SourcePathMatcher, 

44 PathAlreadyInstalledOrDiscardedError, 

45 NoMatchForInstallPatternError, 

46 InstallRuleContext, 

47 BinaryPackageInstallRuleContext, 

48 InstallSearchDirContext, 

49 SearchDir, 

50) 

51from .intermediate_manifest import TarMember, PathType, IntermediateManifest 

52from .maintscript_snippet import ( 

53 DpkgMaintscriptHelperCommand, 

54 MaintscriptSnippetContainer, 

55) 

56from .manifest_conditions import ConditionContext 

57from .manifest_parser.base_types import ( 

58 FileSystemMatchRule, 

59 FileSystemExactMatchRule, 

60 BuildEnvironments, 

61) 

62from .manifest_parser.util import AttributePath 

63from .packager_provided_files import PackagerProvidedFile 

64from .packages import BinaryPackage, SourcePackage 

65from .plugin.api.feature_set import PluginProvidedFeatureSet 

66from .plugin.api.impl import BinaryCtrlAccessorProviderCreator 

67from .plugin.api.impl_types import ( 

68 PackageProcessingContextProvider, 

69 PackageDataTable, 

70) 

71from .plugin.api.spec import ( 

72 FlushableSubstvars, 

73 VirtualPath, 

74 DebputyIntegrationMode, 

75 INTEGRATION_MODE_DH_DEBPUTY_RRR, 

76 INTEGRATION_MODE_FULL, 

77) 

78from .plugin.debputy.binary_package_rules import ServiceRule 

79from .plugin.debputy.to_be_api_types import BuildRule 

80from .plugin.plugin_state import run_in_context_of_plugin 

81from .substitution import Substitution 

82from .transformation_rules import ( 

83 TransformationRule, 

84 ModeNormalizationTransformationRule, 

85 NormalizeShebangLineTransformation, 

86) 

87from .util import ( 

88 _error, 

89 _warn, 

90 debian_policy_normalize_symlink_target, 

91 generated_content_dir, 

92 _info, 

93) 

94from .yaml import MANIFEST_YAML 

95from .yaml.compat import CommentedMap, CommentedSeq 

96 

97 

98class PathNotCoveredByInstallRulesError(DebputyRuntimeErrorWithPreamble): 

99 

100 @property 

101 def unmatched_paths(self) -> Sequence[VirtualPath]: 

102 return self.args[1] 

103 

104 @property 

105 def search_dir(self) -> VirtualPath: 

106 return self.args[2] 

107 

108 def render_preamble(self) -> None: 

109 _warn( 

110 f"The following paths were present in {self.search_dir.fs_path}, but not installed (nor explicitly discarded)." 

111 ) 

112 _warn("") 

113 for entry in self.unmatched_paths: 

114 desc = _describe_missing_path(entry) 

115 _warn(f" * {desc}") 

116 _warn("") 

117 

118 

119@dataclass(slots=True) 

120class DbgsymInfo: 

121 binary_package: BinaryPackage 

122 dbgsym_fs_root: FSPath 

123 _dbgsym_root_fs: Optional[str] 

124 dbgsym_ids: List[str] 

125 

126 @property 

127 def dbgsym_root_dir(self) -> str: 

128 root_dir = self._dbgsym_root_fs 

129 if root_dir is None: 

130 root_dir = generated_content_dir( 

131 package=self.binary_package, 

132 subdir_key="dbgsym-fs-root", 

133 ) 

134 self._dbgsym_root_fs = root_dir 

135 return root_dir 

136 

137 @property 

138 def dbgsym_ctrl_dir(self) -> FSControlRootDir: 

139 return FSControlRootDir.create_root_dir( 

140 os.path.join(self.dbgsym_root_dir, "DEBIAN") 

141 ) 

142 

143 

144@dataclass(slots=True, frozen=True) 

145class BinaryPackageData: 

146 source_package: SourcePackage 

147 binary_package: BinaryPackage 

148 binary_staging_root_dir: str 

149 fs_root: FSPath 

150 substvars: FlushableSubstvars 

151 package_metadata_context: PackageProcessingContextProvider 

152 ctrl_creator: BinaryCtrlAccessorProviderCreator 

153 dbgsym_info: DbgsymInfo 

154 

155 @property 

156 def control_output_dir(self) -> FSControlRootDir: 

157 return FSControlRootDir.create_root_dir( 

158 generated_content_dir( 

159 package=self.binary_package, 

160 subdir_key="DEBIAN", 

161 ) 

162 ) 

163 

164 

165@dataclass(slots=True) 

166class PackageTransformationDefinition: 

167 binary_package: BinaryPackage 

168 substitution: Substitution 

169 is_auto_generated_package: bool 

170 binary_version: Optional[str] = None 

171 search_dirs: Optional[List[FileSystemExactMatchRule]] = None 

172 dpkg_maintscript_helper_snippets: List[DpkgMaintscriptHelperCommand] = field( 

173 default_factory=list 

174 ) 

175 maintscript_snippets: Dict[str, MaintscriptSnippetContainer] = field( 

176 default_factory=dict 

177 ) 

178 transformations: List[TransformationRule] = field(default_factory=list) 

179 reserved_packager_provided_files: Dict[str, List[PackagerProvidedFile]] = field( 

180 default_factory=dict 

181 ) 

182 install_rules: List[InstallRule] = field(default_factory=list) 

183 requested_service_rules: List[ServiceRule] = field(default_factory=list) 

184 

185 

186def _path_to_tar_member( 

187 path: FSPath, 

188 clamp_mtime_to: int, 

189) -> TarMember: 

190 mtime = float(clamp_mtime_to) 

191 owner, uid, group, gid = path.tar_owner_info 

192 mode = path.mode 

193 

194 if path.has_fs_path: 

195 mtime = min(mtime, path.mtime) 

196 

197 if path.is_dir: 

198 path_type = PathType.DIRECTORY 

199 elif path.is_file: 

200 # TODO: someday we will need to deal with hardlinks and it might appear here. 

201 path_type = PathType.FILE 

202 elif path.is_symlink: 202 ↛ 222line 202 didn't jump to line 222 because the condition on line 202 was always true

203 # Special-case that we resolve immediately (since we need to normalize the target anyway) 

204 link_target = debian_policy_normalize_symlink_target( 

205 path.path, 

206 path.readlink(), 

207 ) 

208 return TarMember.virtual_path( 

209 path.tar_path, 

210 PathType.SYMLINK, 

211 mtime, 

212 link_target=link_target, 

213 # Force mode to be 0777 as that is the mode we see in the data.tar. In theory, tar lets you set 

214 # it to whatever. However, for reproducibility, we have to be well-behaved - and that is 0777. 

215 mode=0o0777, 

216 owner=owner, 

217 uid=uid, 

218 group=group, 

219 gid=gid, 

220 ) 

221 else: 

222 assert not path.is_symlink 

223 raise AssertionError( 

224 f"Unsupported file type: {path.path} - not a file, dir nor a symlink!" 

225 ) 

226 

227 if not path.has_fs_path: 

228 assert not path.is_file 

229 return TarMember.virtual_path( 

230 path.tar_path, 

231 path_type, 

232 mtime, 

233 mode=mode, 

234 owner=owner, 

235 uid=uid, 

236 group=group, 

237 gid=gid, 

238 ) 

239 may_steal_fs_path = path._can_replace_inline 

240 return TarMember.from_file( 

241 path.tar_path, 

242 path.fs_path, 

243 mode=mode, 

244 uid=uid, 

245 owner=owner, 

246 gid=gid, 

247 group=group, 

248 path_type=path_type, 

249 path_mtime=mtime, 

250 clamp_mtime_to=clamp_mtime_to, 

251 may_steal_fs_path=may_steal_fs_path, 

252 ) 

253 

254 

255def _generate_intermediate_manifest( 

256 fs_root: FSPath, 

257 clamp_mtime_to: int, 

258) -> Iterable[TarMember]: 

259 symlinks = [] 

260 for path in fs_root.all_paths(): 

261 tar_member = _path_to_tar_member(path, clamp_mtime_to) 

262 if tar_member.path_type == PathType.SYMLINK: 

263 symlinks.append(tar_member) 

264 continue 

265 yield tar_member 

266 yield from symlinks 

267 

268 

269ST = TypeVar("ST") 

270T = TypeVar("T") 

271 

272 

273class AbstractYAMLSubStore(Generic[ST]): 

274 def __init__( 

275 self, 

276 parent_store: Any, 

277 parent_key: Optional[Union[int, str]], 

278 store: Optional[ST] = None, 

279 ) -> None: 

280 if parent_store is not None and parent_key is not None: 

281 try: 

282 from_parent_store = parent_store[parent_key] 

283 except (KeyError, IndexError): 

284 from_parent_store = None 

285 if ( 285 ↛ 290line 285 didn't jump to line 290

286 store is not None 

287 and from_parent_store is not None 

288 and store is not parent_store 

289 ): 

290 raise ValueError( 

291 "Store is provided but is not the one already in the parent store" 

292 ) 

293 if store is None: 293 ↛ 295line 293 didn't jump to line 295 because the condition on line 293 was always true

294 store = from_parent_store 

295 self._parent_store = parent_store 

296 self._parent_key = parent_key 

297 self._is_detached = ( 

298 parent_key is None or parent_store is None or parent_key not in parent_store 

299 ) 

300 assert self._is_detached or store is not None 

301 if store is None: 

302 store = self._create_new_instance() 

303 self._store: ST = store 

304 

305 def _create_new_instance(self) -> ST: 

306 raise NotImplementedError 

307 

308 def create_definition_if_missing(self) -> None: 

309 if self._is_detached: 

310 self.create_definition() 

311 

312 def create_definition(self) -> None: 

313 if not self._is_detached: 313 ↛ 314line 313 didn't jump to line 314 because the condition on line 313 was never true

314 raise RuntimeError("Definition is already present") 

315 parent_store = self._parent_store 

316 if parent_store is None: 316 ↛ 317line 316 didn't jump to line 317 because the condition on line 316 was never true

317 raise RuntimeError( 

318 f"Definition is not attached to any parent!? ({self.__class__.__name__})" 

319 ) 

320 if isinstance(parent_store, list): 

321 assert self._parent_key is None 

322 self._parent_key = len(parent_store) 

323 self._parent_store.append(self._store) 

324 else: 

325 parent_store[self._parent_key] = self._store 

326 self._is_detached = False 

327 

328 def remove_definition(self) -> None: 

329 self._ensure_attached() 

330 del self._parent_store[self._parent_key] 

331 if isinstance(self._parent_store, list): 

332 self._parent_key = None 

333 self._is_detached = True 

334 

335 def _ensure_attached(self) -> None: 

336 if self._is_detached: 

337 raise RuntimeError("The definition has been removed!") 

338 

339 

340class AbstractYAMLListSubStore(Generic[T], AbstractYAMLSubStore[List[T]]): 

341 def _create_new_instance(self) -> List[T]: 

342 return CommentedSeq() 

343 

344 

345class AbstractYAMLDictSubStore(Generic[T], AbstractYAMLSubStore[Dict[str, T]]): 

346 def _create_new_instance(self) -> Dict[str, T]: 

347 return CommentedMap() 

348 

349 

350class MutableCondition: 

351 @classmethod 

352 def arch_matches(cls, arch_filter: str) -> CommentedMap: 

353 return CommentedMap({MK_CONDITION_ARCH_MATCHES: arch_filter}) 

354 

355 @classmethod 

356 def build_profiles_matches(cls, build_profiles_matches: str) -> CommentedMap: 

357 return CommentedMap( 

358 {MK_CONDITION_BUILD_PROFILES_MATCHES: build_profiles_matches} 

359 ) 

360 

361 

362class MutableYAMLSymlink(AbstractYAMLDictSubStore[Any]): 

363 @classmethod 

364 def new_symlink( 

365 cls, link_path: str, link_target: str, condition: Optional[Any] 

366 ) -> "MutableYAMLSymlink": 

367 inner = { 

368 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH: link_path, 

369 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET: link_target, 

370 } 

371 content = {MK_TRANSFORMATIONS_CREATE_SYMLINK: inner} 

372 if condition is not None: 372 ↛ 373line 372 didn't jump to line 373 because the condition on line 372 was never true

373 inner["when"] = condition 

374 return cls(None, None, store=CommentedMap(content)) 

375 

376 @property 

377 def symlink_path(self) -> str: 

378 return self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][ 

379 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH 

380 ] 

381 

382 @symlink_path.setter 

383 def symlink_path(self, path: str) -> None: 

384 self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][ 

385 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH 

386 ] = path 

387 

388 @property 

389 def symlink_target(self) -> Optional[str]: 

390 return self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][ 

391 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET 

392 ] 

393 

394 @symlink_target.setter 

395 def symlink_target(self, target: str) -> None: 

396 self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][ 

397 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET 

398 ] = target 

399 

400 

401class MutableYAMLConffileManagementItem(AbstractYAMLDictSubStore[Any]): 

402 @classmethod 

403 def rm_conffile( 

404 cls, 

405 conffile: str, 

406 prior_to_version: Optional[str], 

407 owning_package: Optional[str], 

408 ) -> "MutableYAMLConffileManagementItem": 

409 r = cls( 

410 None, 

411 None, 

412 store=CommentedMap( 

413 { 

414 MK_CONFFILE_MANAGEMENT_REMOVE: CommentedMap( 

415 {MK_CONFFILE_MANAGEMENT_REMOVE_PATH: conffile} 

416 ) 

417 } 

418 ), 

419 ) 

420 r.prior_to_version = prior_to_version 

421 r.owning_package = owning_package 

422 return r 

423 

424 @classmethod 

425 def mv_conffile( 

426 cls, 

427 old_conffile: str, 

428 new_conffile: str, 

429 prior_to_version: Optional[str], 

430 owning_package: Optional[str], 

431 ) -> "MutableYAMLConffileManagementItem": 

432 r = cls( 

433 None, 

434 None, 

435 store=CommentedMap( 

436 { 

437 MK_CONFFILE_MANAGEMENT_RENAME: CommentedMap( 

438 { 

439 MK_CONFFILE_MANAGEMENT_RENAME_SOURCE: old_conffile, 

440 MK_CONFFILE_MANAGEMENT_RENAME_TARGET: new_conffile, 

441 } 

442 ) 

443 } 

444 ), 

445 ) 

446 r.prior_to_version = prior_to_version 

447 r.owning_package = owning_package 

448 return r 

449 

450 @property 

451 def _container(self) -> Dict[str, Any]: 

452 assert len(self._store) == 1 

453 return next(iter(self._store.values())) 

454 

455 @property 

456 def command(self) -> str: 

457 assert len(self._store) == 1 

458 return next(iter(self._store)) 

459 

460 @property 

461 def obsolete_conffile(self) -> str: 

462 if self.command == MK_CONFFILE_MANAGEMENT_REMOVE: 

463 return self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH] 

464 assert self.command == MK_CONFFILE_MANAGEMENT_RENAME 

465 return self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE] 

466 

467 @obsolete_conffile.setter 

468 def obsolete_conffile(self, value: str) -> None: 

469 if self.command == MK_CONFFILE_MANAGEMENT_REMOVE: 

470 self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH] = value 

471 else: 

472 assert self.command == MK_CONFFILE_MANAGEMENT_RENAME 

473 self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE] = value 

474 

475 @property 

476 def new_conffile(self) -> str: 

477 if self.command != MK_CONFFILE_MANAGEMENT_RENAME: 

478 raise TypeError( 

479 f"The new_conffile attribute is only applicable to command {MK_CONFFILE_MANAGEMENT_RENAME}." 

480 f" This is a {self.command}" 

481 ) 

482 return self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET] 

483 

484 @new_conffile.setter 

485 def new_conffile(self, value: str) -> None: 

486 if self.command != MK_CONFFILE_MANAGEMENT_RENAME: 

487 raise TypeError( 

488 f"The new_conffile attribute is only applicable to command {MK_CONFFILE_MANAGEMENT_RENAME}." 

489 f" This is a {self.command}" 

490 ) 

491 self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET] = value 

492 

493 @property 

494 def prior_to_version(self) -> Optional[str]: 

495 return self._container.get(MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION) 

496 

497 @prior_to_version.setter 

498 def prior_to_version(self, value: Optional[str]) -> None: 

499 if value is None: 

500 try: 

501 del self._container[MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION] 

502 except KeyError: 

503 pass 

504 else: 

505 self._container[MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION] = value 

506 

507 @property 

508 def owning_package(self) -> Optional[str]: 

509 return self._container[MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION] 

510 

511 @owning_package.setter 

512 def owning_package(self, value: Optional[str]) -> None: 

513 if value is None: 

514 try: 

515 del self._container[MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE] 

516 except KeyError: 

517 pass 

518 else: 

519 self._container[MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE] = value 

520 

521 

522class MutableYAMLPackageDefinition(AbstractYAMLDictSubStore): 

523 def _list_store( 

524 self, key, *, create_if_absent: bool = False 

525 ) -> Optional[List[Dict[str, Any]]]: 

526 if self._is_detached or key not in self._store: 

527 if create_if_absent: 527 ↛ 528line 527 didn't jump to line 528 because the condition on line 527 was never true

528 return None 

529 self.create_definition_if_missing() 

530 self._store[key] = [] 

531 return self._store[key] 

532 

533 def _insert_item(self, key: str, item: AbstractYAMLDictSubStore) -> None: 

534 parent_store = self._list_store(key, create_if_absent=True) 

535 assert parent_store is not None 

536 if not item._is_detached or ( 536 ↛ 539line 536 didn't jump to line 539 because the condition on line 536 was never true

537 item._parent_store is not None and item._parent_store is not parent_store 

538 ): 

539 raise RuntimeError( 

540 "Item is already attached or associated with a different container" 

541 ) 

542 item._parent_store = parent_store 

543 item.create_definition() 

544 

545 def add_symlink(self, symlink: MutableYAMLSymlink) -> None: 

546 self._insert_item(MK_TRANSFORMATIONS, symlink) 

547 

548 def symlinks(self) -> Iterable[MutableYAMLSymlink]: 

549 store = self._list_store(MK_TRANSFORMATIONS) 

550 if store is None: 550 ↛ 551line 550 didn't jump to line 551 because the condition on line 550 was never true

551 return 

552 for i in range(len(store)): 552 ↛ 553line 552 didn't jump to line 553 because the loop on line 552 never started

553 d = store[i] 

554 if d and isinstance(d, dict) and len(d) == 1 and "symlink" in d: 

555 yield MutableYAMLSymlink(store, i) 

556 

557 def conffile_management_items(self) -> Iterable[MutableYAMLConffileManagementItem]: 

558 store = self._list_store(MK_CONFFILE_MANAGEMENT) 

559 if store is None: 559 ↛ 560line 559 didn't jump to line 560 because the condition on line 559 was never true

560 return 

561 yield from ( 

562 MutableYAMLConffileManagementItem(store, i) for i in range(len(store)) 

563 ) 

564 

565 def add_conffile_management( 

566 self, conffile_management_item: MutableYAMLConffileManagementItem 

567 ) -> None: 

568 self._insert_item(MK_CONFFILE_MANAGEMENT, conffile_management_item) 

569 

570 

571class AbstractMutableYAMLInstallRule(AbstractYAMLDictSubStore): 

572 @property 

573 def _container(self) -> Dict[str, Any]: 

574 assert len(self._store) == 1 

575 return next(iter(self._store.values())) 

576 

577 @property 

578 def into(self) -> Optional[List[str]]: 

579 v = self._container[MK_INSTALLATIONS_INSTALL_INTO] 

580 if v is None: 

581 return None 

582 if isinstance(v, str): 

583 return [v] 

584 return v 

585 

586 @into.setter 

587 def into(self, new_value: Optional[Union[str, List[str]]]) -> None: 

588 if new_value is None: 588 ↛ 592line 588 didn't jump to line 592 because the condition on line 588 was always true

589 with suppress(KeyError): 

590 del self._container[MK_INSTALLATIONS_INSTALL_INTO] 

591 return 

592 if isinstance(new_value, str): 

593 self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_value 

594 return 

595 new_list = CommentedSeq(new_value) 

596 self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_list 

597 

598 @property 

599 def when(self) -> Optional[Union[str, Mapping[str, Any]]]: 

600 return self._container[MK_CONDITION_WHEN] 

601 

602 @when.setter 

603 def when(self, new_value: Optional[Union[str, Mapping[str, Any]]]) -> None: 

604 if new_value is None: 604 ↛ 605line 604 didn't jump to line 605 because the condition on line 604 was never true

605 with suppress(KeyError): 

606 del self._container[MK_CONDITION_WHEN] 

607 return 

608 if isinstance(new_value, str): 608 ↛ 609line 608 didn't jump to line 609 because the condition on line 608 was never true

609 self._container[MK_CONDITION_WHEN] = new_value 

610 return 

611 new_map = CommentedMap(new_value) 

612 self._container[MK_CONDITION_WHEN] = new_map 

613 

614 @classmethod 

615 def install_dest( 

616 cls, 

617 sources: Union[str, List[str]], 

618 into: Optional[Union[str, List[str]]], 

619 *, 

620 dest_dir: Optional[str] = None, 

621 when: Optional[Union[str, Mapping[str, Any]]] = None, 

622 ) -> "MutableYAMLInstallRuleInstall": 

623 k = MK_INSTALLATIONS_INSTALL_SOURCES 

624 if isinstance(sources, str): 

625 k = MK_INSTALLATIONS_INSTALL_SOURCE 

626 r = MutableYAMLInstallRuleInstall( 

627 None, 

628 None, 

629 store=CommentedMap( 

630 { 

631 MK_INSTALLATIONS_INSTALL: CommentedMap( 

632 { 

633 k: sources, 

634 } 

635 ) 

636 } 

637 ), 

638 ) 

639 r.dest_dir = dest_dir 

640 r.into = into 

641 if when is not None: 

642 r.when = when 

643 return r 

644 

645 @classmethod 

646 def multi_dest_install( 

647 cls, 

648 sources: Union[str, List[str]], 

649 dest_dirs: Sequence[str], 

650 into: Optional[Union[str, List[str]]], 

651 *, 

652 when: Optional[Union[str, Mapping[str, Any]]] = None, 

653 ) -> "MutableYAMLInstallRuleInstall": 

654 k = MK_INSTALLATIONS_INSTALL_SOURCES 

655 if isinstance(sources, str): 655 ↛ 657line 655 didn't jump to line 657 because the condition on line 655 was always true

656 k = MK_INSTALLATIONS_INSTALL_SOURCE 

657 r = MutableYAMLInstallRuleInstall( 

658 None, 

659 None, 

660 store=CommentedMap( 

661 { 

662 MK_INSTALLATIONS_MULTI_DEST_INSTALL: CommentedMap( 

663 { 

664 k: sources, 

665 "dest-dirs": dest_dirs, 

666 } 

667 ) 

668 } 

669 ), 

670 ) 

671 r.into = into 

672 if when is not None: 672 ↛ 673line 672 didn't jump to line 673 because the condition on line 672 was never true

673 r.when = when 

674 return r 

675 

676 @classmethod 

677 def install_as( 

678 cls, 

679 source: str, 

680 install_as: str, 

681 into: Optional[Union[str, List[str]]], 

682 when: Optional[Union[str, Mapping[str, Any]]] = None, 

683 ) -> "MutableYAMLInstallRuleInstall": 

684 r = MutableYAMLInstallRuleInstall( 

685 None, 

686 None, 

687 store=CommentedMap( 

688 { 

689 MK_INSTALLATIONS_INSTALL: CommentedMap( 

690 { 

691 MK_INSTALLATIONS_INSTALL_SOURCE: source, 

692 MK_INSTALLATIONS_INSTALL_AS: install_as, 

693 } 

694 ) 

695 } 

696 ), 

697 ) 

698 r.into = into 

699 if when is not None: 699 ↛ 700line 699 didn't jump to line 700 because the condition on line 699 was never true

700 r.when = when 

701 return r 

702 

703 @classmethod 

704 def install_doc_as( 

705 cls, 

706 source: str, 

707 install_as: str, 

708 into: Optional[Union[str, List[str]]], 

709 when: Optional[Union[str, Mapping[str, Any]]] = None, 

710 ) -> "MutableYAMLInstallRuleInstall": 

711 r = MutableYAMLInstallRuleInstall( 

712 None, 

713 None, 

714 store=CommentedMap( 

715 { 

716 MK_INSTALLATIONS_INSTALL_DOCS: CommentedMap( 

717 { 

718 MK_INSTALLATIONS_INSTALL_SOURCE: source, 

719 MK_INSTALLATIONS_INSTALL_AS: install_as, 

720 } 

721 ) 

722 } 

723 ), 

724 ) 

725 r.into = into 

726 if when is not None: 

727 r.when = when 

728 return r 

729 

730 @classmethod 

731 def install_docs( 

732 cls, 

733 sources: Union[str, List[str]], 

734 into: Optional[Union[str, List[str]]], 

735 *, 

736 dest_dir: Optional[str] = None, 

737 when: Optional[Union[str, Mapping[str, Any]]] = None, 

738 ) -> "MutableYAMLInstallRuleInstall": 

739 k = MK_INSTALLATIONS_INSTALL_SOURCES 

740 if isinstance(sources, str): 

741 k = MK_INSTALLATIONS_INSTALL_SOURCE 

742 r = MutableYAMLInstallRuleInstall( 

743 None, 

744 None, 

745 store=CommentedMap( 

746 { 

747 MK_INSTALLATIONS_INSTALL_DOCS: CommentedMap( 

748 { 

749 k: sources, 

750 } 

751 ) 

752 } 

753 ), 

754 ) 

755 r.into = into 

756 r.dest_dir = dest_dir 

757 if when is not None: 

758 r.when = when 

759 return r 

760 

761 @classmethod 

762 def install_examples( 

763 cls, 

764 sources: Union[str, List[str]], 

765 into: Optional[Union[str, List[str]]], 

766 when: Optional[Union[str, Mapping[str, Any]]] = None, 

767 ) -> "MutableYAMLInstallRuleInstallExamples": 

768 k = MK_INSTALLATIONS_INSTALL_SOURCES 

769 if isinstance(sources, str): 

770 k = MK_INSTALLATIONS_INSTALL_SOURCE 

771 r = MutableYAMLInstallRuleInstallExamples( 

772 None, 

773 None, 

774 store=CommentedMap( 

775 { 

776 MK_INSTALLATIONS_INSTALL_EXAMPLES: CommentedMap( 

777 { 

778 k: sources, 

779 } 

780 ) 

781 } 

782 ), 

783 ) 

784 r.into = into 

785 if when is not None: 785 ↛ 786line 785 didn't jump to line 786 because the condition on line 785 was never true

786 r.when = when 

787 return r 

788 

789 @classmethod 

790 def install_man( 

791 cls, 

792 sources: Union[str, List[str]], 

793 into: Optional[Union[str, List[str]]], 

794 language: Optional[str], 

795 when: Optional[Union[str, Mapping[str, Any]]] = None, 

796 ) -> "MutableYAMLInstallRuleMan": 

797 k = MK_INSTALLATIONS_INSTALL_SOURCES 

798 if isinstance(sources, str): 798 ↛ 799line 798 didn't jump to line 799 because the condition on line 798 was never true

799 k = MK_INSTALLATIONS_INSTALL_SOURCE 

800 r = MutableYAMLInstallRuleMan( 

801 None, 

802 None, 

803 store=CommentedMap( 

804 { 

805 MK_INSTALLATIONS_INSTALL_MAN: CommentedMap( 

806 { 

807 k: sources, 

808 } 

809 ) 

810 } 

811 ), 

812 ) 

813 r.language = language 

814 r.into = into 

815 if when is not None: 815 ↛ 816line 815 didn't jump to line 816 because the condition on line 815 was never true

816 r.when = when 

817 return r 

818 

819 @classmethod 

820 def discard( 

821 cls, 

822 sources: Union[str, List[str]], 

823 ) -> "MutableYAMLInstallRuleDiscard": 

824 return MutableYAMLInstallRuleDiscard( 

825 None, 

826 None, 

827 store=CommentedMap({MK_INSTALLATIONS_DISCARD: sources}), 

828 ) 

829 

830 

831class MutableYAMLInstallRuleInstallExamples(AbstractMutableYAMLInstallRule): 

832 pass 

833 

834 

835class MutableYAMLInstallRuleMan(AbstractMutableYAMLInstallRule): 

836 @property 

837 def language(self) -> Optional[str]: 

838 return self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE] 

839 

840 @language.setter 

841 def language(self, new_value: Optional[str]) -> None: 

842 if new_value is not None: 

843 self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE] = new_value 

844 return 

845 with suppress(KeyError): 

846 del self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE] 

847 

848 

849class MutableYAMLInstallRuleDiscard(AbstractMutableYAMLInstallRule): 

850 pass 

851 

852 

853class MutableYAMLInstallRuleInstall(AbstractMutableYAMLInstallRule): 

854 @property 

855 def sources(self) -> List[str]: 

856 v = self._container[MK_INSTALLATIONS_INSTALL_SOURCES] 

857 if isinstance(v, str): 

858 return [v] 

859 return v 

860 

861 @sources.setter 

862 def sources(self, new_value: Union[str, List[str]]) -> None: 

863 if isinstance(new_value, str): 

864 self._container[MK_INSTALLATIONS_INSTALL_SOURCES] = new_value 

865 return 

866 new_list = CommentedSeq(new_value) 

867 self._container[MK_INSTALLATIONS_INSTALL_SOURCES] = new_list 

868 

869 @property 

870 def dest_dir(self) -> Optional[str]: 

871 return self._container.get(MK_INSTALLATIONS_INSTALL_DEST_DIR) 

872 

873 @dest_dir.setter 

874 def dest_dir(self, new_value: Optional[str]) -> None: 

875 if new_value is not None and self.dest_as is not None: 875 ↛ 876line 875 didn't jump to line 876 because the condition on line 875 was never true

876 raise ValueError( 

877 f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and' 

878 f' "{MK_INSTALLATIONS_INSTALL_AS}"' 

879 ) 

880 if new_value is not None: 

881 self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR] = new_value 

882 else: 

883 with suppress(KeyError): 

884 del self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR] 

885 

886 @property 

887 def dest_as(self) -> Optional[str]: 

888 return self._container.get(MK_INSTALLATIONS_INSTALL_AS) 

889 

890 @dest_as.setter 

891 def dest_as(self, new_value: Optional[str]) -> None: 

892 if new_value is not None: 

893 if self.dest_dir is not None: 

894 raise ValueError( 

895 f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and' 

896 f' "{MK_INSTALLATIONS_INSTALL_AS}"' 

897 ) 

898 

899 sources = self._container[MK_INSTALLATIONS_INSTALL_SOURCES] 

900 if isinstance(sources, list): 

901 if len(sources) != 1: 

902 raise ValueError( 

903 f'Cannot have "{MK_INSTALLATIONS_INSTALL_AS}" when' 

904 f' "{MK_INSTALLATIONS_INSTALL_SOURCES}" is not exactly one item' 

905 ) 

906 self.sources = sources[0] 

907 self._container[MK_INSTALLATIONS_INSTALL_AS] = new_value 

908 else: 

909 with suppress(KeyError): 

910 del self._container[MK_INSTALLATIONS_INSTALL_AS] 

911 

912 

913class MutableYAMLInstallationsDefinition(AbstractYAMLListSubStore[Any]): 

914 def append(self, install_rule: AbstractMutableYAMLInstallRule) -> None: 

915 parent_store = self._store 

916 if not install_rule._is_detached or ( 916 ↛ 920line 916 didn't jump to line 920 because the condition on line 916 was never true

917 install_rule._parent_store is not None 

918 and install_rule._parent_store is not parent_store 

919 ): 

920 raise RuntimeError( 

921 "Item is already attached or associated with a different container" 

922 ) 

923 self.create_definition_if_missing() 

924 install_rule._parent_store = parent_store 

925 install_rule.create_definition() 

926 

927 def extend(self, install_rules: Iterable[AbstractMutableYAMLInstallRule]) -> None: 

928 parent_store = self._store 

929 for install_rule in install_rules: 

930 if not install_rule._is_detached or ( 930 ↛ 934line 930 didn't jump to line 934 because the condition on line 930 was never true

931 install_rule._parent_store is not None 

932 and install_rule._parent_store is not parent_store 

933 ): 

934 raise RuntimeError( 

935 "Item is already attached or associated with a different container" 

936 ) 

937 self.create_definition_if_missing() 

938 install_rule._parent_store = parent_store 

939 install_rule.create_definition() 

940 

941 

942class MutableYAMLManifestVariables(AbstractYAMLDictSubStore): 

943 @property 

944 def variables(self) -> Dict[str, Any]: 

945 return self._store 

946 

947 def __setitem__(self, key: str, value: Any) -> None: 

948 self._store[key] = value 

949 self.create_definition_if_missing() 

950 

951 

952class MutableYAMLManifestDefinitions(AbstractYAMLDictSubStore): 

953 def manifest_variables( 

954 self, *, create_if_absent: bool = True 

955 ) -> MutableYAMLManifestVariables: 

956 d = MutableYAMLManifestVariables(self._store, MK_MANIFEST_VARIABLES) 

957 if create_if_absent: 957 ↛ 958line 957 didn't jump to line 958 because the condition on line 957 was never true

958 d.create_definition_if_missing() 

959 return d 

960 

961 

962class MutableYAMLManifest: 

963 def __init__(self, store: Any) -> None: 

964 self._store = store 

965 

966 @classmethod 

967 def empty_manifest(cls) -> "MutableYAMLManifest": 

968 return cls(CommentedMap({MK_MANIFEST_VERSION: DEFAULT_MANIFEST_VERSION})) 

969 

970 @property 

971 def manifest_version(self) -> str: 

972 return self._store[MK_MANIFEST_VERSION] 

973 

974 @manifest_version.setter 

975 def manifest_version(self, version: str) -> None: 

976 if version not in SUPPORTED_MANIFEST_VERSIONS: 

977 raise ValueError("Unsupported version") 

978 self._store[MK_MANIFEST_VERSION] = version 

979 

980 def installations( 

981 self, 

982 *, 

983 create_if_absent: bool = True, 

984 ) -> MutableYAMLInstallationsDefinition: 

985 d = MutableYAMLInstallationsDefinition(self._store, MK_INSTALLATIONS) 

986 if create_if_absent: 986 ↛ 987line 986 didn't jump to line 987 because the condition on line 986 was never true

987 d.create_definition_if_missing() 

988 return d 

989 

990 def manifest_definitions( 

991 self, 

992 *, 

993 create_if_absent: bool = True, 

994 ) -> MutableYAMLManifestDefinitions: 

995 d = MutableYAMLManifestDefinitions(self._store, MK_MANIFEST_DEFINITIONS) 

996 if create_if_absent: 996 ↛ 997line 996 didn't jump to line 997 because the condition on line 996 was never true

997 d.create_definition_if_missing() 

998 return d 

999 

1000 def package( 

1001 self, name: str, *, create_if_absent: bool = True 

1002 ) -> MutableYAMLPackageDefinition: 

1003 if MK_PACKAGES not in self._store: 1003 ↛ 1005line 1003 didn't jump to line 1005 because the condition on line 1003 was always true

1004 self._store[MK_PACKAGES] = CommentedMap() 

1005 packages_store = self._store[MK_PACKAGES] 

1006 package = packages_store.get(name) 

1007 if package is None: 1007 ↛ 1014line 1007 didn't jump to line 1014 because the condition on line 1007 was always true

1008 if not create_if_absent: 1008 ↛ 1009line 1008 didn't jump to line 1009 because the condition on line 1008 was never true

1009 raise KeyError(name) 

1010 assert packages_store is not None 

1011 d = MutableYAMLPackageDefinition(packages_store, name) 

1012 d.create_definition() 

1013 else: 

1014 d = MutableYAMLPackageDefinition(packages_store, name) 

1015 return d 

1016 

1017 def write_to(self, fd) -> None: 

1018 MANIFEST_YAML.dump(self._store, fd) 

1019 

1020 

1021def _describe_missing_path(entry: VirtualPath) -> str: 

1022 if entry.is_dir: 

1023 return f"{entry.fs_path}/ (empty directory; possible integration point)" 

1024 if entry.is_symlink: 

1025 target = os.readlink(entry.fs_path) 

1026 return f"{entry.fs_path} (symlink; links to {target})" 

1027 if entry.is_file: 

1028 return f"{entry.fs_path} (file)" 

1029 return f"{entry.fs_path} (other!? Probably not supported by debputy and may need a `remove`)" 

1030 

1031 

1032def _detect_missing_installations( 

1033 path_matcher: SourcePathMatcher, 

1034 search_dir: VirtualPath, 

1035) -> None: 

1036 if not search_dir.is_dir: 1036 ↛ 1037line 1036 didn't jump to line 1037 because the condition on line 1036 was never true

1037 return 

1038 missing = list(path_matcher.detect_missing(search_dir)) 

1039 if not missing: 

1040 return 

1041 

1042 excl = textwrap.dedent( 

1043 """\ 

1044 - discard: "*" 

1045 """ 

1046 ) 

1047 

1048 raise PathNotCoveredByInstallRulesError( 

1049 "Please review the list and add either install rules or exclusions to `installations` in" 

1050 " debian/debputy.manifest. If you do not need any of these paths, add the following to the" 

1051 f" end of your 'installations`:\n\n{excl}\n", 

1052 missing, 

1053 search_dir, 

1054 ) 

1055 

1056 

1057def _list_automatic_discard_rules(path_matcher: SourcePathMatcher) -> None: 

1058 used_discard_rules = path_matcher.used_auto_discard_rules 

1059 # Discard rules can match and then be overridden. In that case, they appear 

1060 # but have 0 matches. 

1061 if not sum((len(v) for v in used_discard_rules.values()), 0): 

1062 return 

1063 _info("The following automatic discard rules were triggered:") 

1064 example_path: Optional[str] = None 

1065 for rule in sorted(used_discard_rules): 

1066 for fs_path in sorted(used_discard_rules[rule]): 

1067 if example_path is None: 1067 ↛ 1069line 1067 didn't jump to line 1069 because the condition on line 1067 was always true

1068 example_path = fs_path 

1069 _info(f" * {rule} -> {fs_path}") 

1070 assert example_path is not None 

1071 _info("") 

1072 _info( 

1073 "Note that some of these may have been overruled. The overrule detection logic is not" 

1074 ) 

1075 _info("100% reliable.") 

1076 _info("") 

1077 _info( 

1078 "You can overrule an automatic discard rule by explicitly listing the path. As an example:" 

1079 ) 

1080 _info(" installations:") 

1081 _info(" - install:") 

1082 _info(f" source: {example_path}") 

1083 

1084 

1085def _install_everything_from_source_dir_if_present( 

1086 dctrl_bin: BinaryPackage, 

1087 substitution: Substitution, 

1088 path_matcher: SourcePathMatcher, 

1089 install_rule_context: InstallRuleContext, 

1090 source_condition_context: ConditionContext, 

1091 source_dir: VirtualPath, 

1092 *, 

1093 into_dir: Optional[VirtualPath] = None, 

1094) -> None: 

1095 attribute_path = AttributePath.builtin_path()[f"installing {source_dir.fs_path}"] 

1096 pkg_set = frozenset([dctrl_bin]) 

1097 install_rule = run_in_context_of_plugin( 

1098 "debputy", 

1099 InstallRule.install_dest, 

1100 [FileSystemMatchRule.from_path_match("*", attribute_path, substitution)], 

1101 None, 

1102 pkg_set, 

1103 f"Built-in; install everything from {source_dir.fs_path} into {dctrl_bin.name}", 

1104 None, 

1105 ) 

1106 pkg_search_dir: Tuple[SearchDir] = ( 

1107 SearchDir( 

1108 source_dir, 

1109 pkg_set, 

1110 ), 

1111 ) 

1112 replacements = { 

1113 "search_dirs": pkg_search_dir, 

1114 } 

1115 if into_dir is not None: 1115 ↛ 1116line 1115 didn't jump to line 1116 because the condition on line 1115 was never true

1116 binary_package_contexts = dict(install_rule_context.binary_package_contexts) 

1117 updated = binary_package_contexts[dctrl_bin.name].replace(fs_root=into_dir) 

1118 binary_package_contexts[dctrl_bin.name] = updated 

1119 replacements["binary_package_contexts"] = binary_package_contexts 

1120 

1121 fake_install_rule_context = install_rule_context.replace(**replacements) 

1122 try: 

1123 install_rule.perform_install( 

1124 path_matcher, 

1125 fake_install_rule_context, 

1126 source_condition_context, 

1127 ) 

1128 except ( 

1129 NoMatchForInstallPatternError, 

1130 PathAlreadyInstalledOrDiscardedError, 

1131 ): 

1132 # Empty directory or everything excluded by default; ignore the error 

1133 pass 

1134 

1135 

1136class HighLevelManifest: 

1137 def __init__( 

1138 self, 

1139 manifest_path: str, 

1140 mutable_manifest: Optional[MutableYAMLManifest], 

1141 install_rules: Optional[List[InstallRule]], 

1142 source_package: SourcePackage, 

1143 binary_packages: Mapping[str, BinaryPackage], 

1144 substitution: Substitution, 

1145 package_transformations: Mapping[str, PackageTransformationDefinition], 

1146 dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable, 

1147 dpkg_arch_query_table: DpkgArchTable, 

1148 build_env: DebBuildOptionsAndProfiles, 

1149 build_environments: BuildEnvironments, 

1150 build_rules: Optional[List[BuildRule]], 

1151 plugin_provided_feature_set: PluginProvidedFeatureSet, 

1152 debian_dir: VirtualPath, 

1153 ) -> None: 

1154 self.manifest_path = manifest_path 

1155 self.mutable_manifest = mutable_manifest 

1156 self._install_rules = install_rules 

1157 self._source_package = source_package 

1158 self._binary_packages = binary_packages 

1159 self.substitution = substitution 

1160 self.package_transformations = package_transformations 

1161 self._dpkg_architecture_variables = dpkg_architecture_variables 

1162 self._dpkg_arch_query_table = dpkg_arch_query_table 

1163 self._build_env = build_env 

1164 self._used_for: Set[str] = set() 

1165 self.build_environments = build_environments 

1166 self.build_rules = build_rules 

1167 self._plugin_provided_feature_set = plugin_provided_feature_set 

1168 self._debian_dir = debian_dir 

1169 self._source_condition_context = ConditionContext( 

1170 binary_package=None, 

1171 substitution=self.substitution, 

1172 deb_options_and_profiles=self._build_env, 

1173 dpkg_architecture_variables=self._dpkg_architecture_variables, 

1174 dpkg_arch_query_table=self._dpkg_arch_query_table, 

1175 ) 

1176 

1177 def source_version(self, include_binnmu_version: bool = True) -> str: 

1178 # TODO: There should an easier way to determine the source version; really. 

1179 version_var = "{{DEB_VERSION}}" 

1180 if not include_binnmu_version: 

1181 version_var = "{{_DEBPUTY_INTERNAL_NON_BINNMU_SOURCE}}" 

1182 try: 

1183 return self.substitution.substitute( 

1184 version_var, "internal (resolve version)" 

1185 ) 

1186 except DebputySubstitutionError as e: 

1187 raise AssertionError(f"Could not resolve {version_var}") from e 

1188 

1189 @property 

1190 def source_condition_context(self) -> ConditionContext: 

1191 return self._source_condition_context 

1192 

1193 @property 

1194 def debian_dir(self) -> VirtualPath: 

1195 return self._debian_dir 

1196 

1197 @property 

1198 def dpkg_architecture_variables(self) -> DpkgArchitectureBuildProcessValuesTable: 

1199 return self._dpkg_architecture_variables 

1200 

1201 @property 

1202 def deb_options_and_profiles(self) -> DebBuildOptionsAndProfiles: 

1203 return self._build_env 

1204 

1205 @property 

1206 def plugin_provided_feature_set(self) -> PluginProvidedFeatureSet: 

1207 return self._plugin_provided_feature_set 

1208 

1209 @property 

1210 def active_packages(self) -> Iterable[BinaryPackage]: 

1211 yield from (p for p in self._binary_packages.values() if p.should_be_acted_on) 

1212 

1213 @property 

1214 def all_packages(self) -> Iterable[BinaryPackage]: 

1215 yield from self._binary_packages.values() 

1216 

1217 def package_state_for(self, package: str) -> PackageTransformationDefinition: 

1218 return self.package_transformations[package] 

1219 

1220 def _detect_doc_main_package_for(self, package: BinaryPackage) -> BinaryPackage: 

1221 name = package.name 

1222 # If it is not a -doc package, then docs should be installed 

1223 # under its own package name. 

1224 if not name.endswith("-doc"): 1224 ↛ 1226line 1224 didn't jump to line 1226 because the condition on line 1224 was always true

1225 return package 

1226 name = name[:-4] 

1227 main_package = self._binary_packages.get(name) 

1228 if main_package: 

1229 return main_package 

1230 if name.startswith("lib"): 

1231 dev_pkg = self._binary_packages.get(f"{name}-dev") 

1232 if dev_pkg: 

1233 return dev_pkg 

1234 

1235 # If we found no better match; default to the doc package itself. 

1236 return package 

1237 

1238 def perform_installations( 

1239 self, 

1240 integration_mode: DebputyIntegrationMode, 

1241 *, 

1242 install_request_context: Optional[InstallSearchDirContext] = None, 

1243 ) -> PackageDataTable: 

1244 package_data_dict = {} 

1245 package_data_table = PackageDataTable(package_data_dict) 

1246 enable_manifest_installation_feature = ( 

1247 integration_mode != INTEGRATION_MODE_DH_DEBPUTY_RRR 

1248 ) 

1249 if install_request_context is None: 1249 ↛ 1251line 1249 didn't jump to line 1251 because the condition on line 1249 was never true

1250 

1251 @functools.lru_cache(None) 

1252 def _as_path(fs_path: str) -> VirtualPath: 

1253 return FSROOverlay.create_root_dir(".", fs_path) 

1254 

1255 dtmp_dir = _as_path("debian/tmp") 

1256 source_root_dir = _as_path(".") 

1257 into = frozenset(self._binary_packages.values()) 

1258 per_package_search_dirs = { 

1259 t.binary_package: [_as_path(f.match_rule.path) for f in t.search_dirs] 

1260 for t in self.package_transformations.values() 

1261 if t.search_dirs is not None 

1262 } 

1263 

1264 if integration_mode == INTEGRATION_MODE_FULL: 

1265 # In this mode, we have no default search dir. Everything ends up being 

1266 # per-package instead (since that is easier logic-wise). 

1267 # 

1268 # Even dtmp_dir is omitted here (since it is not universally applicable). 

1269 # Note we still initialize dtmp_dir, since it affects a later guard. 

1270 default_search_dirs = [] 

1271 seen_pp_search_dirs: Set[Tuple[BinaryPackage, str]] = set() 

1272 # TODO: We should **not** re-detect the default build system here. Instead, 

1273 # we should have accumulated these per-package install dirs when we 

1274 # invoked the build systems (notably, we are also not using the right 

1275 # environment, which happens to work for now but that might break in 

1276 # the future). 

1277 build_rules = self.build_rules 

1278 if build_rules is None: 

1279 from .build_support.buildsystem_detection import ( 

1280 auto_detect_buildsystem, 

1281 ) 

1282 

1283 bs = auto_detect_buildsystem(self) 

1284 build_rules = [bs] if bs is not None else [] 

1285 

1286 for build_rule in build_rules: 

1287 dest_dir = build_rule.install_dest_dir() 

1288 if dest_dir is None: 

1289 continue 

1290 dest_path = _as_path(dest_dir) 

1291 for pkg in build_rule.for_packages: 

1292 seen_key = (pkg, dest_dir) 

1293 if seen_key in seen_pp_search_dirs: 

1294 continue 

1295 seen_pp_search_dirs.add(seen_key) 

1296 if pkg not in per_package_search_dirs: 

1297 per_package_search_dirs[pkg] = [dest_path] 

1298 else: 

1299 per_package_search_dirs[pkg].append(dest_path) 

1300 

1301 # We can end here with per_package_search_dirs having no search dirs for any package 

1302 # (single binary, where everything is installed into d/<pkg> is the most common case). 

1303 # 

1304 # This is not a problem in itself as the installation rules can still apply to the 

1305 # source root and there should be no reason to install something from d/<pkg> into 

1306 # d/<another-pkg> 

1307 else: 

1308 default_search_dirs = [dtmp_dir] 

1309 

1310 search_dirs = _determine_search_dir_order( 

1311 per_package_search_dirs, 

1312 into, 

1313 default_search_dirs, 

1314 source_root_dir, 

1315 ) 

1316 check_for_uninstalled_dirs = tuple( 

1317 s.search_dir 

1318 for s in search_dirs 

1319 if s.search_dir.fs_path != source_root_dir.fs_path 

1320 ) 

1321 if enable_manifest_installation_feature: 

1322 _present_installation_dirs( 

1323 search_dirs, check_for_uninstalled_dirs, into 

1324 ) 

1325 else: 

1326 dtmp_dir = None 

1327 search_dirs = install_request_context.search_dirs 

1328 into = frozenset(self._binary_packages.values()) 

1329 seen: Set[BinaryPackage] = set() 

1330 for search_dir in search_dirs: 

1331 seen.update(search_dir.applies_to) 

1332 

1333 missing = into - seen 

1334 if missing: 1334 ↛ 1335line 1334 didn't jump to line 1335 because the condition on line 1334 was never true

1335 names = ", ".join(p.name for p in missing) 

1336 raise ValueError( 

1337 f"The following package(s) had no search dirs: {names}." 

1338 " (Generally, the source root would be applicable to all packages)" 

1339 ) 

1340 extra_names = seen - into 

1341 if extra_names: 1341 ↛ 1342line 1341 didn't jump to line 1342 because the condition on line 1341 was never true

1342 names = ", ".join(p.name for p in extra_names) 

1343 raise ValueError( 

1344 f"The install_request_context referenced the following unknown package(s): {names}" 

1345 ) 

1346 

1347 check_for_uninstalled_dirs = ( 

1348 install_request_context.check_for_uninstalled_dirs 

1349 ) 

1350 

1351 install_rule_context = InstallRuleContext(search_dirs) 

1352 

1353 if ( 1353 ↛ 1360line 1353 didn't jump to line 1360

1354 enable_manifest_installation_feature 

1355 and self._install_rules is None 

1356 # TODO: Should we also do this for full mode when build systems provided search dirs? 

1357 and dtmp_dir is not None 

1358 and os.path.isdir(dtmp_dir.fs_path) 

1359 ): 

1360 msg = ( 

1361 "The build system appears to have provided the output of upstream build system's" 

1362 " install in debian/tmp. However, these are no provisions for debputy to install" 

1363 " any of that into any of the debian packages listed in debian/control." 

1364 " To avoid accidentally creating empty packages, debputy will insist that you " 

1365 " explicitly define an empty installation definition if you did not want to " 

1366 " install any of those files even though they have been provided." 

1367 ' Example: "installations: []"' 

1368 ) 

1369 _error(msg) 

1370 elif ( 1370 ↛ 1373line 1370 didn't jump to line 1373

1371 not enable_manifest_installation_feature and self._install_rules is not None 

1372 ): 

1373 _error( 

1374 f"The `installations` feature cannot be used in {self.manifest_path} with this integration mode." 

1375 f" Please remove or comment out the `installations` keyword." 

1376 ) 

1377 

1378 for dctrl_bin in self.all_packages: 

1379 package = dctrl_bin.name 

1380 doc_main_package = self._detect_doc_main_package_for(dctrl_bin) 

1381 

1382 install_rule_context[package] = BinaryPackageInstallRuleContext( 

1383 dctrl_bin, 

1384 FSRootDir(), 

1385 doc_main_package, 

1386 ) 

1387 

1388 if enable_manifest_installation_feature: 1388 ↛ 1393line 1388 didn't jump to line 1393

1389 discard_rules = list( 

1390 self.plugin_provided_feature_set.auto_discard_rules.values() 

1391 ) 

1392 else: 

1393 discard_rules = [ 

1394 self.plugin_provided_feature_set.auto_discard_rules["debian-dir"] 

1395 ] 

1396 path_matcher = SourcePathMatcher(discard_rules) 

1397 

1398 source_condition_context = self._source_condition_context 

1399 

1400 for dctrl_bin in self.active_packages: 

1401 package = dctrl_bin.name 

1402 if install_request_context: 1402 ↛ 1407line 1402 didn't jump to line 1407 because the condition on line 1402 was always true

1403 build_system_staging_dir = install_request_context.debian_pkg_dirs.get( 

1404 package 

1405 ) 

1406 else: 

1407 build_system_staging_dir_fs_path = os.path.join("debian", package) 

1408 if os.path.isdir(build_system_staging_dir_fs_path): 

1409 build_system_staging_dir = FSROOverlay.create_root_dir( 

1410 ".", 

1411 build_system_staging_dir_fs_path, 

1412 ) 

1413 else: 

1414 build_system_staging_dir = None 

1415 

1416 if build_system_staging_dir is not None: 

1417 _install_everything_from_source_dir_if_present( 

1418 dctrl_bin, 

1419 self.substitution, 

1420 path_matcher, 

1421 install_rule_context, 

1422 source_condition_context, 

1423 build_system_staging_dir, 

1424 ) 

1425 

1426 if self._install_rules: 

1427 # FIXME: Check that every install rule remains used after transformations have run. 

1428 # What we want to check is transformations do not exclude everything from an install 

1429 # rule. The hard part here is that renaming (etc.) is fine, so we cannot 1:1 string 

1430 # match. 

1431 for install_rule in self._install_rules: 

1432 install_rule.perform_install( 

1433 path_matcher, 

1434 install_rule_context, 

1435 source_condition_context, 

1436 ) 

1437 

1438 if enable_manifest_installation_feature: 1438 ↛ 1442line 1438 didn't jump to line 1442 because the condition on line 1438 was always true

1439 for search_dir in check_for_uninstalled_dirs: 

1440 _detect_missing_installations(path_matcher, search_dir) 

1441 

1442 for dctrl_bin in self.all_packages: 

1443 package = dctrl_bin.name 

1444 binary_install_rule_context = install_rule_context[package] 

1445 build_system_pkg_staging_dir = os.path.join("debian", package) 

1446 fs_root = binary_install_rule_context.fs_root 

1447 

1448 context = self.package_transformations[package] 

1449 if dctrl_bin.should_be_acted_on and enable_manifest_installation_feature: 

1450 for special_install_rule in context.install_rules: 1450 ↛ 1451line 1450 didn't jump to line 1451 because the loop on line 1450 never started

1451 special_install_rule.perform_install( 

1452 path_matcher, 

1453 install_rule_context, 

1454 source_condition_context, 

1455 ) 

1456 

1457 if dctrl_bin.should_be_acted_on: 

1458 self.apply_fs_transformations(package, fs_root) 

1459 substvars_file = f"debian/{package}.substvars" 

1460 substvars = FlushableSubstvars.load_from_path( 

1461 substvars_file, missing_ok=True 

1462 ) 

1463 # We do not want to touch the substvars file (non-clean rebuild contamination) 

1464 substvars.substvars_path = None 

1465 else: 

1466 substvars = FlushableSubstvars() 

1467 

1468 udeb_package = self._binary_packages.get(f"{package}-udeb") 

1469 if udeb_package and not udeb_package.is_udeb: 1469 ↛ 1470line 1469 didn't jump to line 1470 because the condition on line 1469 was never true

1470 udeb_package = None 

1471 

1472 package_metadata_context = PackageProcessingContextProvider( 

1473 self, 

1474 dctrl_bin, 

1475 udeb_package, 

1476 package_data_table, 

1477 # FIXME: source_package 

1478 ) 

1479 

1480 ctrl_creator = BinaryCtrlAccessorProviderCreator( 

1481 package_metadata_context, 

1482 substvars, 

1483 context.maintscript_snippets, 

1484 context.substitution, 

1485 ) 

1486 

1487 if not enable_manifest_installation_feature: 1487 ↛ 1488line 1487 didn't jump to line 1488 because the condition on line 1487 was never true

1488 assert_no_dbgsym_migration(dctrl_bin) 

1489 dh_dbgsym_root_fs = dhe_dbgsym_root_dir(dctrl_bin) 

1490 dh_dbgsym_root_path = FSROOverlay.create_root_dir( 

1491 "", 

1492 dh_dbgsym_root_fs, 

1493 ) 

1494 dbgsym_root_fs = FSRootDir() 

1495 _install_everything_from_source_dir_if_present( 

1496 dctrl_bin, 

1497 self.substitution, 

1498 path_matcher, 

1499 install_rule_context, 

1500 source_condition_context, 

1501 dh_dbgsym_root_path, 

1502 into_dir=dbgsym_root_fs, 

1503 ) 

1504 dbgsym_build_ids = read_dbgsym_file(dctrl_bin) 

1505 dbgsym_info = DbgsymInfo( 

1506 dctrl_bin, 

1507 dbgsym_root_fs, 

1508 os.path.join(dh_dbgsym_root_fs, "DEBIAN"), 

1509 dbgsym_build_ids, 

1510 ) 

1511 else: 

1512 dbgsym_info = DbgsymInfo( 

1513 dctrl_bin, 

1514 FSRootDir(), 

1515 None, 

1516 [], 

1517 ) 

1518 

1519 package_data_dict[package] = BinaryPackageData( 

1520 self._source_package, 

1521 dctrl_bin, 

1522 build_system_pkg_staging_dir, 

1523 fs_root, 

1524 substvars, 

1525 package_metadata_context, 

1526 ctrl_creator, 

1527 dbgsym_info, 

1528 ) 

1529 

1530 if enable_manifest_installation_feature: 1530 ↛ 1533line 1530 didn't jump to line 1533 because the condition on line 1530 was always true

1531 _list_automatic_discard_rules(path_matcher) 

1532 

1533 return package_data_table 

1534 

1535 def condition_context( 

1536 self, binary_package: Optional[Union[BinaryPackage, str]] 

1537 ) -> ConditionContext: 

1538 if binary_package is None: 1538 ↛ 1539line 1538 didn't jump to line 1539 because the condition on line 1538 was never true

1539 return self._source_condition_context 

1540 if not isinstance(binary_package, str): 1540 ↛ 1541line 1540 didn't jump to line 1541 because the condition on line 1540 was never true

1541 binary_package = binary_package.name 

1542 

1543 package_transformation = self.package_transformations[binary_package] 

1544 return self._source_condition_context.replace( 

1545 binary_package=package_transformation.binary_package, 

1546 substitution=package_transformation.substitution, 

1547 ) 

1548 

1549 def apply_fs_transformations( 

1550 self, 

1551 package: str, 

1552 fs_root: FSPath, 

1553 ) -> None: 

1554 if package in self._used_for: 1554 ↛ 1555line 1554 didn't jump to line 1555 because the condition on line 1554 was never true

1555 raise ValueError( 

1556 f"data.tar contents for {package} has already been finalized!?" 

1557 ) 

1558 if package not in self.package_transformations: 1558 ↛ 1559line 1558 didn't jump to line 1559 because the condition on line 1558 was never true

1559 raise ValueError( 

1560 f'The package "{package}" was not relevant for the manifest!?' 

1561 ) 

1562 package_transformation = self.package_transformations[package] 

1563 condition_context = ConditionContext( 

1564 binary_package=package_transformation.binary_package, 

1565 substitution=package_transformation.substitution, 

1566 deb_options_and_profiles=self._build_env, 

1567 dpkg_architecture_variables=self._dpkg_architecture_variables, 

1568 dpkg_arch_query_table=self._dpkg_arch_query_table, 

1569 ) 

1570 norm_rules = list( 

1571 builtin_mode_normalization_rules( 

1572 self._dpkg_architecture_variables, 

1573 package_transformation.binary_package, 

1574 package_transformation.substitution, 

1575 ) 

1576 ) 

1577 norm_mode_transformation_rule = ModeNormalizationTransformationRule(norm_rules) 

1578 norm_mode_transformation_rule.transform_file_system(fs_root, condition_context) 

1579 for transformation in package_transformation.transformations: 

1580 transformation.run_transform_file_system(fs_root, condition_context) 

1581 interpreter_normalization = NormalizeShebangLineTransformation() 

1582 interpreter_normalization.transform_file_system(fs_root, condition_context) 

1583 

1584 def finalize_data_tar_contents( 

1585 self, 

1586 package: str, 

1587 fs_root: FSPath, 

1588 clamp_mtime_to: int, 

1589 ) -> IntermediateManifest: 

1590 if package in self._used_for: 1590 ↛ 1591line 1590 didn't jump to line 1591 because the condition on line 1590 was never true

1591 raise ValueError( 

1592 f"data.tar contents for {package} has already been finalized!?" 

1593 ) 

1594 if package not in self.package_transformations: 1594 ↛ 1595line 1594 didn't jump to line 1595 because the condition on line 1594 was never true

1595 raise ValueError( 

1596 f'The package "{package}" was not relevant for the manifest!?' 

1597 ) 

1598 self._used_for.add(package) 

1599 

1600 # At this point, there so be no further mutations to the file system (because the will not 

1601 # be present in the intermediate manifest) 

1602 cast("FSRootDir", fs_root).is_read_write = False 

1603 

1604 intermediate_manifest = list( 

1605 _generate_intermediate_manifest( 

1606 fs_root, 

1607 clamp_mtime_to, 

1608 ) 

1609 ) 

1610 return intermediate_manifest 

1611 

1612 def apply_to_binary_staging_directory( 

1613 self, 

1614 package: str, 

1615 fs_root: FSPath, 

1616 clamp_mtime_to: int, 

1617 ) -> IntermediateManifest: 

1618 self.apply_fs_transformations(package, fs_root) 

1619 return self.finalize_data_tar_contents(package, fs_root, clamp_mtime_to) 

1620 

1621 

1622@dataclasses.dataclass(slots=True) 

1623class SearchDirOrderState: 

1624 search_dir: VirtualPath 

1625 applies_to: Union[Set[BinaryPackage], FrozenSet[BinaryPackage]] = dataclasses.field( 

1626 default_factory=set 

1627 ) 

1628 after: Set[str] = dataclasses.field(default_factory=set) 

1629 

1630 

1631def _present_installation_dirs( 

1632 search_dirs: Sequence[SearchDir], 

1633 checked_missing_dirs: Sequence[VirtualPath], 

1634 all_pkgs: FrozenSet[BinaryPackage], 

1635) -> None: 

1636 _info("The following directories are considered search dirs (in order):") 

1637 max_len = max((len(s.search_dir.fs_path) for s in search_dirs), default=1) 

1638 for search_dir in search_dirs: 

1639 applies_to = "" 

1640 if search_dir.applies_to < all_pkgs: 

1641 names = ", ".join(p.name for p in search_dir.applies_to) 

1642 applies_to = f" [only applicable to: {names}]" 

1643 remark = "" 

1644 if not os.path.isdir(search_dir.search_dir.fs_path): 

1645 remark = " (skipped; absent)" 

1646 _info(f" * {search_dir.search_dir.fs_path:{max_len}}{applies_to}{remark}") 

1647 

1648 if checked_missing_dirs: 

1649 _info('The following directories are considered for "not-installed" paths;') 

1650 for d in checked_missing_dirs: 

1651 remark = "" 

1652 if not os.path.isdir(d.fs_path): 

1653 remark = " (skipped; absent)" 

1654 _info(f" * {d.fs_path:{max_len}}{remark}") 

1655 

1656 

1657def _determine_search_dir_order( 

1658 requested: Mapping[BinaryPackage, List[VirtualPath]], 

1659 all_pkgs: FrozenSet[BinaryPackage], 

1660 default_search_dirs: List[VirtualPath], 

1661 source_root: VirtualPath, 

1662) -> Sequence[SearchDir]: 

1663 search_dir_table = {} 

1664 assert requested.keys() <= all_pkgs 

1665 for pkg in all_pkgs: 

1666 paths = requested.get(pkg, default_search_dirs) 

1667 previous_search_dir: Optional[SearchDirOrderState] = None 

1668 for path in paths: 

1669 try: 

1670 search_dir_state = search_dir_table[path.fs_path] 

1671 except KeyError: 

1672 search_dir_state = SearchDirOrderState(path) 

1673 search_dir_table[path.fs_path] = search_dir_state 

1674 search_dir_state.applies_to.add(pkg) 

1675 if previous_search_dir is not None: 

1676 search_dir_state.after.add(previous_search_dir.search_dir.fs_path) 

1677 previous_search_dir = search_dir_state 

1678 

1679 search_dirs_in_order = [] 

1680 released = set() 

1681 remaining = set() 

1682 for search_dir_state in search_dir_table.values(): 

1683 if not (search_dir_state.after <= released): 

1684 remaining.add(search_dir_state.search_dir.fs_path) 

1685 continue 

1686 search_dirs_in_order.append(search_dir_state) 

1687 released.add(search_dir_state.search_dir.fs_path) 

1688 

1689 while remaining: 

1690 current_released = len(released) 

1691 for fs_path in remaining: 

1692 search_dir_state = search_dir_table[fs_path] 

1693 if not search_dir_state.after.issubset(released): 

1694 remaining.add(search_dir_state.search_dir.fs_path) 

1695 continue 

1696 search_dirs_in_order.append(search_dir_state) 

1697 released.add(search_dir_state.search_dir.fs_path) 

1698 

1699 if current_released == len(released): 

1700 names = ", ".join(remaining) 

1701 _error( 

1702 f"There is a circular dependency (somewhere) between the search dirs: {names}." 

1703 " Note that the search directories across all packages have to be ordered (and the" 

1704 " source root should generally be last)" 

1705 ) 

1706 remaining -= released 

1707 

1708 search_dirs_in_order.append( 

1709 SearchDirOrderState( 

1710 source_root, 

1711 all_pkgs, 

1712 ) 

1713 ) 

1714 

1715 return tuple( 

1716 # Avoid duplicating all_pkgs 

1717 SearchDir( 

1718 s.search_dir, 

1719 frozenset(s.applies_to) if s.applies_to != all_pkgs else all_pkgs, 

1720 ) 

1721 for s in search_dirs_in_order 

1722 )