Coverage for src/debputy/highlevel_manifest.py: 64%

878 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-09-07 09:27 +0000

1import dataclasses 

2import functools 

3import os 

4import textwrap 

5from contextlib import suppress 

6from dataclasses import dataclass, field 

7from typing import ( 

8 List, 

9 Dict, 

10 Iterable, 

11 Mapping, 

12 Any, 

13 Union, 

14 Optional, 

15 TypeVar, 

16 Generic, 

17 cast, 

18 Set, 

19 Tuple, 

20 Sequence, 

21 FrozenSet, 

22 Callable, 

23) 

24 

25from debian.debian_support import DpkgArchTable 

26 

27from debputy.dh.debhelper_emulation import ( 

28 dhe_dbgsym_root_dir, 

29 assert_no_dbgsym_migration, 

30 read_dbgsym_file, 

31) 

32from ._deb_options_profiles import DebBuildOptionsAndProfiles 

33from ._manifest_constants import * 

34from .architecture_support import DpkgArchitectureBuildProcessValuesTable 

35from .builtin_manifest_rules import builtin_mode_normalization_rules 

36from .exceptions import ( 

37 DebputySubstitutionError, 

38 DebputyRuntimeErrorWithPreamble, 

39) 

40from .filesystem_scan import FSPath, FSRootDir, FSROOverlay, FSControlRootDir 

41from .installations import ( 

42 InstallRule, 

43 SourcePathMatcher, 

44 PathAlreadyInstalledOrDiscardedError, 

45 NoMatchForInstallPatternError, 

46 InstallRuleContext, 

47 BinaryPackageInstallRuleContext, 

48 InstallSearchDirContext, 

49 SearchDir, 

50) 

51from .intermediate_manifest import TarMember, PathType, IntermediateManifest 

52from .maintscript_snippet import ( 

53 DpkgMaintscriptHelperCommand, 

54 MaintscriptSnippetContainer, 

55) 

56from .manifest_conditions import ConditionContext 

57from .manifest_parser.base_types import ( 

58 FileSystemMatchRule, 

59 FileSystemExactMatchRule, 

60 BuildEnvironments, 

61) 

62from .manifest_parser.util import AttributePath 

63from .packager_provided_files import PackagerProvidedFile 

64from .packages import BinaryPackage, SourcePackage 

65from .plugin.api.feature_set import PluginProvidedFeatureSet 

66from .plugin.api.impl import BinaryCtrlAccessorProviderCreator 

67from .plugin.api.impl_types import ( 

68 PackageProcessingContextProvider, 

69 PackageDataTable, 

70) 

71from .plugin.api.spec import ( 

72 FlushableSubstvars, 

73 VirtualPath, 

74 DebputyIntegrationMode, 

75 INTEGRATION_MODE_DH_DEBPUTY_RRR, 

76 INTEGRATION_MODE_FULL, 

77) 

78from debputy.plugins.debputy.binary_package_rules import ServiceRule 

79from debputy.plugins.debputy.build_system_rules import BuildRule 

80from .plugin.plugin_state import run_in_context_of_plugin 

81from .substitution import Substitution 

82from .transformation_rules import ( 

83 TransformationRule, 

84 ModeNormalizationTransformationRule, 

85 NormalizeShebangLineTransformation, 

86) 

87from .util import ( 

88 _error, 

89 _warn, 

90 debian_policy_normalize_symlink_target, 

91 generated_content_dir, 

92 _info, 

93) 

94from .yaml import MANIFEST_YAML 

95from .yaml.compat import CommentedMap, CommentedSeq 

96 

97 

98class PathNotCoveredByInstallRulesError(DebputyRuntimeErrorWithPreamble): 

99 

100 @property 

101 def unmatched_paths(self) -> Sequence[VirtualPath]: 

102 return self.args[1] 

103 

104 @property 

105 def search_dir(self) -> VirtualPath: 

106 return self.args[2] 

107 

108 def render_preamble(self) -> None: 

109 _warn( 

110 f"The following paths were present in {self.search_dir.fs_path}, but not installed (nor explicitly discarded)." 

111 ) 

112 _warn("") 

113 for entry in self.unmatched_paths: 

114 desc = _describe_missing_path(entry) 

115 _warn(f" * {desc}") 

116 _warn("") 

117 

118 

119@dataclass(slots=True) 

120class DbgsymInfo: 

121 binary_package: BinaryPackage 

122 dbgsym_fs_root: FSPath 

123 _dbgsym_root_fs: Optional[str] 

124 dbgsym_ids: List[str] 

125 run_dwz: bool 

126 

127 @property 

128 def dbgsym_root_dir(self) -> str: 

129 root_dir = self._dbgsym_root_fs 

130 if root_dir is None: 

131 root_dir = generated_content_dir( 

132 package=self.binary_package, 

133 subdir_key="dbgsym-fs-root", 

134 ) 

135 self._dbgsym_root_fs = root_dir 

136 return root_dir 

137 

138 @property 

139 def dbgsym_ctrl_dir(self) -> FSControlRootDir: 

140 return FSControlRootDir.create_root_dir( 

141 os.path.join(self.dbgsym_root_dir, "DEBIAN") 

142 ) 

143 

144 

145@dataclass(slots=True, frozen=True) 

146class BinaryPackageData: 

147 source_package: SourcePackage 

148 binary_package: BinaryPackage 

149 binary_staging_root_dir: str 

150 fs_root: FSPath 

151 substvars: FlushableSubstvars 

152 package_metadata_context: PackageProcessingContextProvider 

153 ctrl_creator: BinaryCtrlAccessorProviderCreator 

154 dbgsym_info: DbgsymInfo 

155 

156 @property 

157 def control_output_dir(self) -> FSControlRootDir: 

158 return FSControlRootDir.create_root_dir( 

159 generated_content_dir( 

160 package=self.binary_package, 

161 subdir_key="DEBIAN", 

162 ) 

163 ) 

164 

165 

166@dataclass(slots=True) 

167class PackageTransformationDefinition: 

168 binary_package: BinaryPackage 

169 substitution: Substitution 

170 is_auto_generated_package: bool 

171 binary_version: Optional[str] = None 

172 search_dirs: Optional[List[FileSystemExactMatchRule]] = None 

173 dpkg_maintscript_helper_snippets: List[DpkgMaintscriptHelperCommand] = field( 

174 default_factory=list 

175 ) 

176 maintscript_snippets: Dict[str, MaintscriptSnippetContainer] = field( 

177 default_factory=dict 

178 ) 

179 transformations: List[TransformationRule] = field(default_factory=list) 

180 reserved_packager_provided_files: Dict[str, List[PackagerProvidedFile]] = field( 

181 default_factory=dict 

182 ) 

183 install_rules: List[InstallRule] = field(default_factory=list) 

184 requested_service_rules: List[ServiceRule] = field(default_factory=list) 

185 

186 

187def _path_to_tar_member( 

188 path: FSPath, 

189 clamp_mtime_to: int, 

190) -> TarMember: 

191 mtime = float(clamp_mtime_to) 

192 owner, uid, group, gid = path.tar_owner_info 

193 mode = path.mode 

194 

195 if path.has_fs_path: 

196 mtime = min(mtime, path.mtime) 

197 

198 if path.is_dir: 

199 path_type = PathType.DIRECTORY 

200 elif path.is_file: 

201 # TODO: someday we will need to deal with hardlinks and it might appear here. 

202 path_type = PathType.FILE 

203 elif path.is_symlink: 203 ↛ 223line 203 didn't jump to line 223 because the condition on line 203 was always true

204 # Special-case that we resolve immediately (since we need to normalize the target anyway) 

205 link_target = debian_policy_normalize_symlink_target( 

206 path.path, 

207 path.readlink(), 

208 ) 

209 return TarMember.virtual_path( 

210 path.tar_path, 

211 PathType.SYMLINK, 

212 mtime, 

213 link_target=link_target, 

214 # Force mode to be 0777 as that is the mode we see in the data.tar. In theory, tar lets you set 

215 # it to whatever. However, for reproducibility, we have to be well-behaved - and that is 0777. 

216 mode=0o0777, 

217 owner=owner, 

218 uid=uid, 

219 group=group, 

220 gid=gid, 

221 ) 

222 else: 

223 assert not path.is_symlink 

224 raise AssertionError( 

225 f"Unsupported file type: {path.path} - not a file, dir nor a symlink!" 

226 ) 

227 

228 if not path.has_fs_path: 

229 assert not path.is_file 

230 return TarMember.virtual_path( 

231 path.tar_path, 

232 path_type, 

233 mtime, 

234 mode=mode, 

235 owner=owner, 

236 uid=uid, 

237 group=group, 

238 gid=gid, 

239 ) 

240 may_steal_fs_path = path._can_replace_inline 

241 return TarMember.from_file( 

242 path.tar_path, 

243 path.fs_path, 

244 mode=mode, 

245 uid=uid, 

246 owner=owner, 

247 gid=gid, 

248 group=group, 

249 path_type=path_type, 

250 path_mtime=mtime, 

251 clamp_mtime_to=clamp_mtime_to, 

252 may_steal_fs_path=may_steal_fs_path, 

253 ) 

254 

255 

256def _generate_intermediate_manifest( 

257 fs_root: FSPath, 

258 clamp_mtime_to: int, 

259) -> Iterable[TarMember]: 

260 symlinks = [] 

261 for path in fs_root.all_paths(): 

262 tar_member = _path_to_tar_member(path, clamp_mtime_to) 

263 if tar_member.path_type == PathType.SYMLINK: 

264 symlinks.append(tar_member) 

265 continue 

266 yield tar_member 

267 yield from symlinks 

268 

269 

270ST = TypeVar("ST") 

271T = TypeVar("T") 

272 

273 

274class AbstractYAMLSubStore(Generic[ST]): 

275 def __init__( 

276 self, 

277 parent_store: Any, 

278 parent_key: Optional[Union[int, str]], 

279 store: Optional[ST] = None, 

280 ) -> None: 

281 if parent_store is not None and parent_key is not None: 

282 try: 

283 from_parent_store = parent_store[parent_key] 

284 except (KeyError, IndexError): 

285 from_parent_store = None 

286 if ( 286 ↛ 291line 286 didn't jump to line 291 because the condition on line 286 was never true

287 store is not None 

288 and from_parent_store is not None 

289 and store is not parent_store 

290 ): 

291 raise ValueError( 

292 "Store is provided but is not the one already in the parent store" 

293 ) 

294 if store is None: 294 ↛ 296line 294 didn't jump to line 296 because the condition on line 294 was always true

295 store = from_parent_store 

296 self._parent_store = parent_store 

297 self._parent_key = parent_key 

298 self._is_detached = ( 

299 parent_key is None or parent_store is None or parent_key not in parent_store 

300 ) 

301 assert self._is_detached or store is not None 

302 if store is None: 

303 store = self._create_new_instance() 

304 self._store: ST = store 

305 

306 def _create_new_instance(self) -> ST: 

307 raise NotImplementedError 

308 

309 def create_definition_if_missing(self) -> None: 

310 if self._is_detached: 

311 self.create_definition() 

312 

313 def create_definition(self) -> None: 

314 if not self._is_detached: 314 ↛ 315line 314 didn't jump to line 315 because the condition on line 314 was never true

315 raise RuntimeError("Definition is already present") 

316 parent_store = self._parent_store 

317 if parent_store is None: 317 ↛ 318line 317 didn't jump to line 318 because the condition on line 317 was never true

318 raise RuntimeError( 

319 f"Definition is not attached to any parent!? ({self.__class__.__name__})" 

320 ) 

321 if isinstance(parent_store, list): 

322 assert self._parent_key is None 

323 self._parent_key = len(parent_store) 

324 self._parent_store.append(self._store) 

325 else: 

326 parent_store[self._parent_key] = self._store 

327 self._is_detached = False 

328 

329 def remove_definition(self) -> None: 

330 self._ensure_attached() 

331 del self._parent_store[self._parent_key] 

332 if isinstance(self._parent_store, list): 

333 self._parent_key = None 

334 self._is_detached = True 

335 

336 def _ensure_attached(self) -> None: 

337 if self._is_detached: 

338 raise RuntimeError("The definition has been removed!") 

339 

340 

341class AbstractYAMLListSubStore(Generic[T], AbstractYAMLSubStore[List[T]]): 

342 def _create_new_instance(self) -> List[T]: 

343 return CommentedSeq() 

344 

345 

346class AbstractYAMLDictSubStore(Generic[T], AbstractYAMLSubStore[Dict[str, T]]): 

347 def _create_new_instance(self) -> Dict[str, T]: 

348 return CommentedMap() 

349 

350 

351class MutableCondition: 

352 @classmethod 

353 def arch_matches(cls, arch_filter: str) -> CommentedMap: 

354 return CommentedMap({MK_CONDITION_ARCH_MATCHES: arch_filter}) 

355 

356 @classmethod 

357 def build_profiles_matches(cls, build_profiles_matches: str) -> CommentedMap: 

358 return CommentedMap( 

359 {MK_CONDITION_BUILD_PROFILES_MATCHES: build_profiles_matches} 

360 ) 

361 

362 

363class MutableYAMLSymlink(AbstractYAMLDictSubStore[Any]): 

364 @classmethod 

365 def new_symlink( 

366 cls, link_path: str, link_target: str, condition: Optional[Any] 

367 ) -> "MutableYAMLSymlink": 

368 inner = { 

369 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH: link_path, 

370 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET: link_target, 

371 } 

372 content = {MK_TRANSFORMATIONS_CREATE_SYMLINK: inner} 

373 if condition is not None: 373 ↛ 374line 373 didn't jump to line 374 because the condition on line 373 was never true

374 inner["when"] = condition 

375 return cls(None, None, store=CommentedMap(content)) 

376 

377 @property 

378 def symlink_path(self) -> str: 

379 return self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][ 

380 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH 

381 ] 

382 

383 @symlink_path.setter 

384 def symlink_path(self, path: str) -> None: 

385 self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][ 

386 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH 

387 ] = path 

388 

389 @property 

390 def symlink_target(self) -> Optional[str]: 

391 return self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][ 

392 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET 

393 ] 

394 

395 @symlink_target.setter 

396 def symlink_target(self, target: str) -> None: 

397 self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][ 

398 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET 

399 ] = target 

400 

401 

402class MutableYAMLConffileManagementItem(AbstractYAMLDictSubStore[Any]): 

403 @classmethod 

404 def rm_conffile( 

405 cls, 

406 conffile: str, 

407 prior_to_version: Optional[str], 

408 owning_package: Optional[str], 

409 ) -> "MutableYAMLConffileManagementItem": 

410 r = cls( 

411 None, 

412 None, 

413 store=CommentedMap( 

414 { 

415 MK_CONFFILE_MANAGEMENT_REMOVE: CommentedMap( 

416 {MK_CONFFILE_MANAGEMENT_REMOVE_PATH: conffile} 

417 ) 

418 } 

419 ), 

420 ) 

421 r.prior_to_version = prior_to_version 

422 r.owning_package = owning_package 

423 return r 

424 

425 @classmethod 

426 def mv_conffile( 

427 cls, 

428 old_conffile: str, 

429 new_conffile: str, 

430 prior_to_version: Optional[str], 

431 owning_package: Optional[str], 

432 ) -> "MutableYAMLConffileManagementItem": 

433 r = cls( 

434 None, 

435 None, 

436 store=CommentedMap( 

437 { 

438 MK_CONFFILE_MANAGEMENT_RENAME: CommentedMap( 

439 { 

440 MK_CONFFILE_MANAGEMENT_RENAME_SOURCE: old_conffile, 

441 MK_CONFFILE_MANAGEMENT_RENAME_TARGET: new_conffile, 

442 } 

443 ) 

444 } 

445 ), 

446 ) 

447 r.prior_to_version = prior_to_version 

448 r.owning_package = owning_package 

449 return r 

450 

451 @property 

452 def _container(self) -> Dict[str, Any]: 

453 assert len(self._store) == 1 

454 return next(iter(self._store.values())) 

455 

456 @property 

457 def command(self) -> str: 

458 assert len(self._store) == 1 

459 return next(iter(self._store)) 

460 

461 @property 

462 def obsolete_conffile(self) -> str: 

463 if self.command == MK_CONFFILE_MANAGEMENT_REMOVE: 

464 return self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH] 

465 assert self.command == MK_CONFFILE_MANAGEMENT_RENAME 

466 return self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE] 

467 

468 @obsolete_conffile.setter 

469 def obsolete_conffile(self, value: str) -> None: 

470 if self.command == MK_CONFFILE_MANAGEMENT_REMOVE: 

471 self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH] = value 

472 else: 

473 assert self.command == MK_CONFFILE_MANAGEMENT_RENAME 

474 self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE] = value 

475 

476 @property 

477 def new_conffile(self) -> str: 

478 if self.command != MK_CONFFILE_MANAGEMENT_RENAME: 

479 raise TypeError( 

480 f"The new_conffile attribute is only applicable to command {MK_CONFFILE_MANAGEMENT_RENAME}." 

481 f" This is a {self.command}" 

482 ) 

483 return self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET] 

484 

485 @new_conffile.setter 

486 def new_conffile(self, value: str) -> None: 

487 if self.command != MK_CONFFILE_MANAGEMENT_RENAME: 

488 raise TypeError( 

489 f"The new_conffile attribute is only applicable to command {MK_CONFFILE_MANAGEMENT_RENAME}." 

490 f" This is a {self.command}" 

491 ) 

492 self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET] = value 

493 

494 @property 

495 def prior_to_version(self) -> Optional[str]: 

496 return self._container.get(MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION) 

497 

498 @prior_to_version.setter 

499 def prior_to_version(self, value: Optional[str]) -> None: 

500 if value is None: 

501 try: 

502 del self._container[MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION] 

503 except KeyError: 

504 pass 

505 else: 

506 self._container[MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION] = value 

507 

508 @property 

509 def owning_package(self) -> Optional[str]: 

510 return self._container[MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION] 

511 

512 @owning_package.setter 

513 def owning_package(self, value: Optional[str]) -> None: 

514 if value is None: 

515 try: 

516 del self._container[MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE] 

517 except KeyError: 

518 pass 

519 else: 

520 self._container[MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE] = value 

521 

522 

523class MutableYAMLPackageDefinition(AbstractYAMLDictSubStore): 

524 def _list_store( 

525 self, key, *, create_if_absent: bool = False 

526 ) -> Optional[List[Dict[str, Any]]]: 

527 if self._is_detached or key not in self._store: 

528 if create_if_absent: 528 ↛ 529line 528 didn't jump to line 529 because the condition on line 528 was never true

529 return None 

530 self.create_definition_if_missing() 

531 self._store[key] = [] 

532 return self._store[key] 

533 

534 def _insert_item(self, key: str, item: AbstractYAMLDictSubStore) -> None: 

535 parent_store = self._list_store(key, create_if_absent=True) 

536 assert parent_store is not None 

537 if not item._is_detached or ( 537 ↛ 540line 537 didn't jump to line 540 because the condition on line 537 was never true

538 item._parent_store is not None and item._parent_store is not parent_store 

539 ): 

540 raise RuntimeError( 

541 "Item is already attached or associated with a different container" 

542 ) 

543 item._parent_store = parent_store 

544 item.create_definition() 

545 

546 def add_symlink(self, symlink: MutableYAMLSymlink) -> None: 

547 self._insert_item(MK_TRANSFORMATIONS, symlink) 

548 

549 def symlinks(self) -> Iterable[MutableYAMLSymlink]: 

550 store = self._list_store(MK_TRANSFORMATIONS) 

551 if store is None: 551 ↛ 552line 551 didn't jump to line 552 because the condition on line 551 was never true

552 return 

553 for i in range(len(store)): 553 ↛ 554line 553 didn't jump to line 554 because the loop on line 553 never started

554 d = store[i] 

555 if d and isinstance(d, dict) and len(d) == 1 and "symlink" in d: 

556 yield MutableYAMLSymlink(store, i) 

557 

558 def conffile_management_items(self) -> Iterable[MutableYAMLConffileManagementItem]: 

559 store = self._list_store(MK_CONFFILE_MANAGEMENT) 

560 if store is None: 560 ↛ 561line 560 didn't jump to line 561 because the condition on line 560 was never true

561 return 

562 yield from ( 

563 MutableYAMLConffileManagementItem(store, i) for i in range(len(store)) 

564 ) 

565 

566 def add_conffile_management( 

567 self, conffile_management_item: MutableYAMLConffileManagementItem 

568 ) -> None: 

569 self._insert_item(MK_CONFFILE_MANAGEMENT, conffile_management_item) 

570 

571 

572class AbstractMutableYAMLInstallRule(AbstractYAMLDictSubStore): 

573 @property 

574 def _container(self) -> Dict[str, Any]: 

575 assert len(self._store) == 1 

576 return next(iter(self._store.values())) 

577 

578 @property 

579 def into(self) -> Optional[List[str]]: 

580 v = self._container[MK_INSTALLATIONS_INSTALL_INTO] 

581 if v is None: 

582 return None 

583 if isinstance(v, str): 

584 return [v] 

585 return v 

586 

587 @into.setter 

588 def into(self, new_value: Optional[Union[str, List[str]]]) -> None: 

589 if new_value is None: 589 ↛ 593line 589 didn't jump to line 593 because the condition on line 589 was always true

590 with suppress(KeyError): 

591 del self._container[MK_INSTALLATIONS_INSTALL_INTO] 

592 return 

593 if isinstance(new_value, str): 

594 self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_value 

595 return 

596 new_list = CommentedSeq(new_value) 

597 self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_list 

598 

599 @property 

600 def when(self) -> Optional[Union[str, Mapping[str, Any]]]: 

601 return self._container[MK_CONDITION_WHEN] 

602 

603 @when.setter 

604 def when(self, new_value: Optional[Union[str, Mapping[str, Any]]]) -> None: 

605 if new_value is None: 605 ↛ 606line 605 didn't jump to line 606 because the condition on line 605 was never true

606 with suppress(KeyError): 

607 del self._container[MK_CONDITION_WHEN] 

608 return 

609 if isinstance(new_value, str): 609 ↛ 610line 609 didn't jump to line 610 because the condition on line 609 was never true

610 self._container[MK_CONDITION_WHEN] = new_value 

611 return 

612 new_map = CommentedMap(new_value) 

613 self._container[MK_CONDITION_WHEN] = new_map 

614 

615 @classmethod 

616 def install_dest( 

617 cls, 

618 sources: Union[str, List[str]], 

619 into: Optional[Union[str, List[str]]], 

620 *, 

621 dest_dir: Optional[str] = None, 

622 when: Optional[Union[str, Mapping[str, Any]]] = None, 

623 ) -> "MutableYAMLInstallRuleInstall": 

624 k = MK_INSTALLATIONS_INSTALL_SOURCES 

625 if isinstance(sources, str): 

626 k = MK_INSTALLATIONS_INSTALL_SOURCE 

627 r = MutableYAMLInstallRuleInstall( 

628 None, 

629 None, 

630 store=CommentedMap( 

631 { 

632 MK_INSTALLATIONS_INSTALL: CommentedMap( 

633 { 

634 k: sources, 

635 } 

636 ) 

637 } 

638 ), 

639 ) 

640 r.dest_dir = dest_dir 

641 r.into = into 

642 if when is not None: 

643 r.when = when 

644 return r 

645 

646 @classmethod 

647 def multi_dest_install( 

648 cls, 

649 sources: Union[str, List[str]], 

650 dest_dirs: Sequence[str], 

651 into: Optional[Union[str, List[str]]], 

652 *, 

653 when: Optional[Union[str, Mapping[str, Any]]] = None, 

654 ) -> "MutableYAMLInstallRuleInstall": 

655 k = MK_INSTALLATIONS_INSTALL_SOURCES 

656 if isinstance(sources, str): 656 ↛ 658line 656 didn't jump to line 658 because the condition on line 656 was always true

657 k = MK_INSTALLATIONS_INSTALL_SOURCE 

658 r = MutableYAMLInstallRuleInstall( 

659 None, 

660 None, 

661 store=CommentedMap( 

662 { 

663 MK_INSTALLATIONS_MULTI_DEST_INSTALL: CommentedMap( 

664 { 

665 k: sources, 

666 "dest-dirs": dest_dirs, 

667 } 

668 ) 

669 } 

670 ), 

671 ) 

672 r.into = into 

673 if when is not None: 673 ↛ 674line 673 didn't jump to line 674 because the condition on line 673 was never true

674 r.when = when 

675 return r 

676 

677 @classmethod 

678 def install_as( 

679 cls, 

680 source: str, 

681 install_as: str, 

682 into: Optional[Union[str, List[str]]], 

683 when: Optional[Union[str, Mapping[str, Any]]] = None, 

684 ) -> "MutableYAMLInstallRuleInstall": 

685 r = MutableYAMLInstallRuleInstall( 

686 None, 

687 None, 

688 store=CommentedMap( 

689 { 

690 MK_INSTALLATIONS_INSTALL: CommentedMap( 

691 { 

692 MK_INSTALLATIONS_INSTALL_SOURCE: source, 

693 MK_INSTALLATIONS_INSTALL_AS: install_as, 

694 } 

695 ) 

696 } 

697 ), 

698 ) 

699 r.into = into 

700 if when is not None: 700 ↛ 701line 700 didn't jump to line 701 because the condition on line 700 was never true

701 r.when = when 

702 return r 

703 

704 @classmethod 

705 def install_doc_as( 

706 cls, 

707 source: str, 

708 install_as: str, 

709 into: Optional[Union[str, List[str]]], 

710 when: Optional[Union[str, Mapping[str, Any]]] = None, 

711 ) -> "MutableYAMLInstallRuleInstall": 

712 r = MutableYAMLInstallRuleInstall( 

713 None, 

714 None, 

715 store=CommentedMap( 

716 { 

717 MK_INSTALLATIONS_INSTALL_DOCS: CommentedMap( 

718 { 

719 MK_INSTALLATIONS_INSTALL_SOURCE: source, 

720 MK_INSTALLATIONS_INSTALL_AS: install_as, 

721 } 

722 ) 

723 } 

724 ), 

725 ) 

726 r.into = into 

727 if when is not None: 

728 r.when = when 

729 return r 

730 

731 @classmethod 

732 def install_docs( 

733 cls, 

734 sources: Union[str, List[str]], 

735 into: Optional[Union[str, List[str]]], 

736 *, 

737 dest_dir: Optional[str] = None, 

738 when: Optional[Union[str, Mapping[str, Any]]] = None, 

739 ) -> "MutableYAMLInstallRuleInstall": 

740 k = MK_INSTALLATIONS_INSTALL_SOURCES 

741 if isinstance(sources, str): 

742 k = MK_INSTALLATIONS_INSTALL_SOURCE 

743 r = MutableYAMLInstallRuleInstall( 

744 None, 

745 None, 

746 store=CommentedMap( 

747 { 

748 MK_INSTALLATIONS_INSTALL_DOCS: CommentedMap( 

749 { 

750 k: sources, 

751 } 

752 ) 

753 } 

754 ), 

755 ) 

756 r.into = into 

757 r.dest_dir = dest_dir 

758 if when is not None: 

759 r.when = when 

760 return r 

761 

762 @classmethod 

763 def install_examples( 

764 cls, 

765 sources: Union[str, List[str]], 

766 into: Optional[Union[str, List[str]]], 

767 when: Optional[Union[str, Mapping[str, Any]]] = None, 

768 ) -> "MutableYAMLInstallRuleInstallExamples": 

769 k = MK_INSTALLATIONS_INSTALL_SOURCES 

770 if isinstance(sources, str): 

771 k = MK_INSTALLATIONS_INSTALL_SOURCE 

772 r = MutableYAMLInstallRuleInstallExamples( 

773 None, 

774 None, 

775 store=CommentedMap( 

776 { 

777 MK_INSTALLATIONS_INSTALL_EXAMPLES: CommentedMap( 

778 { 

779 k: sources, 

780 } 

781 ) 

782 } 

783 ), 

784 ) 

785 r.into = into 

786 if when is not None: 786 ↛ 787line 786 didn't jump to line 787 because the condition on line 786 was never true

787 r.when = when 

788 return r 

789 

790 @classmethod 

791 def install_man( 

792 cls, 

793 sources: Union[str, List[str]], 

794 into: Optional[Union[str, List[str]]], 

795 language: Optional[str], 

796 when: Optional[Union[str, Mapping[str, Any]]] = None, 

797 ) -> "MutableYAMLInstallRuleMan": 

798 k = MK_INSTALLATIONS_INSTALL_SOURCES 

799 if isinstance(sources, str): 799 ↛ 800line 799 didn't jump to line 800 because the condition on line 799 was never true

800 k = MK_INSTALLATIONS_INSTALL_SOURCE 

801 r = MutableYAMLInstallRuleMan( 

802 None, 

803 None, 

804 store=CommentedMap( 

805 { 

806 MK_INSTALLATIONS_INSTALL_MAN: CommentedMap( 

807 { 

808 k: sources, 

809 } 

810 ) 

811 } 

812 ), 

813 ) 

814 r.language = language 

815 r.into = into 

816 if when is not None: 816 ↛ 817line 816 didn't jump to line 817 because the condition on line 816 was never true

817 r.when = when 

818 return r 

819 

820 @classmethod 

821 def discard( 

822 cls, 

823 sources: Union[str, List[str]], 

824 ) -> "MutableYAMLInstallRuleDiscard": 

825 return MutableYAMLInstallRuleDiscard( 

826 None, 

827 None, 

828 store=CommentedMap({MK_INSTALLATIONS_DISCARD: sources}), 

829 ) 

830 

831 

832class MutableYAMLInstallRuleInstallExamples(AbstractMutableYAMLInstallRule): 

833 pass 

834 

835 

836class MutableYAMLInstallRuleMan(AbstractMutableYAMLInstallRule): 

837 @property 

838 def language(self) -> Optional[str]: 

839 return self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE] 

840 

841 @language.setter 

842 def language(self, new_value: Optional[str]) -> None: 

843 if new_value is not None: 

844 self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE] = new_value 

845 return 

846 with suppress(KeyError): 

847 del self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE] 

848 

849 

850class MutableYAMLInstallRuleDiscard(AbstractMutableYAMLInstallRule): 

851 pass 

852 

853 

854class MutableYAMLInstallRuleInstall(AbstractMutableYAMLInstallRule): 

855 @property 

856 def sources(self) -> List[str]: 

857 v = self._container[MK_INSTALLATIONS_INSTALL_SOURCES] 

858 if isinstance(v, str): 

859 return [v] 

860 return v 

861 

862 @sources.setter 

863 def sources(self, new_value: Union[str, List[str]]) -> None: 

864 if isinstance(new_value, str): 

865 self._container[MK_INSTALLATIONS_INSTALL_SOURCES] = new_value 

866 return 

867 new_list = CommentedSeq(new_value) 

868 self._container[MK_INSTALLATIONS_INSTALL_SOURCES] = new_list 

869 

870 @property 

871 def dest_dir(self) -> Optional[str]: 

872 return self._container.get(MK_INSTALLATIONS_INSTALL_DEST_DIR) 

873 

874 @dest_dir.setter 

875 def dest_dir(self, new_value: Optional[str]) -> None: 

876 if new_value is not None and self.dest_as is not None: 876 ↛ 877line 876 didn't jump to line 877 because the condition on line 876 was never true

877 raise ValueError( 

878 f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and' 

879 f' "{MK_INSTALLATIONS_INSTALL_AS}"' 

880 ) 

881 if new_value is not None: 

882 self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR] = new_value 

883 else: 

884 with suppress(KeyError): 

885 del self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR] 

886 

887 @property 

888 def dest_as(self) -> Optional[str]: 

889 return self._container.get(MK_INSTALLATIONS_INSTALL_AS) 

890 

891 @dest_as.setter 

892 def dest_as(self, new_value: Optional[str]) -> None: 

893 if new_value is not None: 

894 if self.dest_dir is not None: 

895 raise ValueError( 

896 f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and' 

897 f' "{MK_INSTALLATIONS_INSTALL_AS}"' 

898 ) 

899 

900 sources = self._container[MK_INSTALLATIONS_INSTALL_SOURCES] 

901 if isinstance(sources, list): 

902 if len(sources) != 1: 

903 raise ValueError( 

904 f'Cannot have "{MK_INSTALLATIONS_INSTALL_AS}" when' 

905 f' "{MK_INSTALLATIONS_INSTALL_SOURCES}" is not exactly one item' 

906 ) 

907 self.sources = sources[0] 

908 self._container[MK_INSTALLATIONS_INSTALL_AS] = new_value 

909 else: 

910 with suppress(KeyError): 

911 del self._container[MK_INSTALLATIONS_INSTALL_AS] 

912 

913 

914class MutableYAMLInstallationsDefinition(AbstractYAMLListSubStore[Any]): 

915 def append(self, install_rule: AbstractMutableYAMLInstallRule) -> None: 

916 parent_store = self._store 

917 if not install_rule._is_detached or ( 917 ↛ 921line 917 didn't jump to line 921 because the condition on line 917 was never true

918 install_rule._parent_store is not None 

919 and install_rule._parent_store is not parent_store 

920 ): 

921 raise RuntimeError( 

922 "Item is already attached or associated with a different container" 

923 ) 

924 self.create_definition_if_missing() 

925 install_rule._parent_store = parent_store 

926 install_rule.create_definition() 

927 

928 def extend(self, install_rules: Iterable[AbstractMutableYAMLInstallRule]) -> None: 

929 parent_store = self._store 

930 for install_rule in install_rules: 

931 if not install_rule._is_detached or ( 931 ↛ 935line 931 didn't jump to line 935 because the condition on line 931 was never true

932 install_rule._parent_store is not None 

933 and install_rule._parent_store is not parent_store 

934 ): 

935 raise RuntimeError( 

936 "Item is already attached or associated with a different container" 

937 ) 

938 self.create_definition_if_missing() 

939 install_rule._parent_store = parent_store 

940 install_rule.create_definition() 

941 

942 

943class MutableYAMLManifestVariables(AbstractYAMLDictSubStore): 

944 @property 

945 def variables(self) -> Dict[str, Any]: 

946 return self._store 

947 

948 def __setitem__(self, key: str, value: Any) -> None: 

949 self._store[key] = value 

950 self.create_definition_if_missing() 

951 

952 

953class MutableYAMLManifestDefinitions(AbstractYAMLDictSubStore): 

954 def manifest_variables( 

955 self, *, create_if_absent: bool = True 

956 ) -> MutableYAMLManifestVariables: 

957 d = MutableYAMLManifestVariables(self._store, MK_MANIFEST_VARIABLES) 

958 if create_if_absent: 958 ↛ 959line 958 didn't jump to line 959 because the condition on line 958 was never true

959 d.create_definition_if_missing() 

960 return d 

961 

962 

963class MutableYAMLRemoveDuringCleanDefinitions(AbstractYAMLListSubStore[str]): 

964 def append(self, rule: str) -> None: 

965 self.create_definition_if_missing() 

966 self._store.append(rule) 

967 

968 def __len__(self) -> int: 

969 return len(self._store) 

970 

971 def extend(self, rules: Iterable[str]) -> None: 

972 it = iter(rules) 

973 try: 

974 first_rule = next(it) 

975 except StopIteration: 

976 return 

977 self.create_definition_if_missing() 

978 self._store.append(first_rule) 

979 self._store.extend(it) 

980 

981 

982class MutableYAMLManifest: 

983 def __init__(self, store: Any) -> None: 

984 self._store = store 

985 

986 @classmethod 

987 def empty_manifest(cls) -> "MutableYAMLManifest": 

988 return cls(CommentedMap({MK_MANIFEST_VERSION: DEFAULT_MANIFEST_VERSION})) 

989 

990 @property 

991 def manifest_version(self) -> str: 

992 return self._store[MK_MANIFEST_VERSION] 

993 

994 @manifest_version.setter 

995 def manifest_version(self, version: str) -> None: 

996 if version not in SUPPORTED_MANIFEST_VERSIONS: 

997 raise ValueError("Unsupported version") 

998 self._store[MK_MANIFEST_VERSION] = version 

999 

1000 def remove_during_clean( 

1001 self, 

1002 *, 

1003 create_if_absent: bool = True, 

1004 ) -> MutableYAMLRemoveDuringCleanDefinitions: 

1005 d = MutableYAMLRemoveDuringCleanDefinitions( 

1006 self._store, MK_MANIFEST_REMOVE_DURING_CLEAN 

1007 ) 

1008 if create_if_absent: 1008 ↛ 1009line 1008 didn't jump to line 1009 because the condition on line 1008 was never true

1009 d.create_definition_if_missing() 

1010 return d 

1011 

1012 def installations( 

1013 self, 

1014 *, 

1015 create_if_absent: bool = True, 

1016 ) -> MutableYAMLInstallationsDefinition: 

1017 d = MutableYAMLInstallationsDefinition(self._store, MK_INSTALLATIONS) 

1018 if create_if_absent: 1018 ↛ 1019line 1018 didn't jump to line 1019 because the condition on line 1018 was never true

1019 d.create_definition_if_missing() 

1020 return d 

1021 

1022 def manifest_definitions( 

1023 self, 

1024 *, 

1025 create_if_absent: bool = True, 

1026 ) -> MutableYAMLManifestDefinitions: 

1027 d = MutableYAMLManifestDefinitions(self._store, MK_MANIFEST_DEFINITIONS) 

1028 if create_if_absent: 1028 ↛ 1029line 1028 didn't jump to line 1029 because the condition on line 1028 was never true

1029 d.create_definition_if_missing() 

1030 return d 

1031 

1032 def package( 

1033 self, name: str, *, create_if_absent: bool = True 

1034 ) -> MutableYAMLPackageDefinition: 

1035 if MK_PACKAGES not in self._store: 1035 ↛ 1037line 1035 didn't jump to line 1037 because the condition on line 1035 was always true

1036 self._store[MK_PACKAGES] = CommentedMap() 

1037 packages_store = self._store[MK_PACKAGES] 

1038 package = packages_store.get(name) 

1039 if package is None: 1039 ↛ 1046line 1039 didn't jump to line 1046 because the condition on line 1039 was always true

1040 if not create_if_absent: 1040 ↛ 1041line 1040 didn't jump to line 1041 because the condition on line 1040 was never true

1041 raise KeyError(name) 

1042 assert packages_store is not None 

1043 d = MutableYAMLPackageDefinition(packages_store, name) 

1044 d.create_definition() 

1045 else: 

1046 d = MutableYAMLPackageDefinition(packages_store, name) 

1047 return d 

1048 

1049 def write_to(self, fd) -> None: 

1050 MANIFEST_YAML.dump(self._store, fd) 

1051 

1052 

1053def _describe_missing_path(entry: VirtualPath) -> str: 

1054 if entry.is_dir: 

1055 return f"{entry.fs_path}/ (empty directory; possible integration point)" 

1056 if entry.is_symlink: 

1057 target = os.readlink(entry.fs_path) 

1058 return f"{entry.fs_path} (symlink; links to {target})" 

1059 if entry.is_file: 

1060 return f"{entry.fs_path} (file)" 

1061 return f"{entry.fs_path} (other!? Probably not supported by debputy and may need a `remove`)" 

1062 

1063 

1064def _detect_missing_installations( 

1065 path_matcher: SourcePathMatcher, 

1066 search_dir: VirtualPath, 

1067) -> None: 

1068 if not search_dir.is_dir: 1068 ↛ 1069line 1068 didn't jump to line 1069 because the condition on line 1068 was never true

1069 return 

1070 missing = list(path_matcher.detect_missing(search_dir)) 

1071 if not missing: 

1072 return 

1073 

1074 excl = textwrap.dedent( 

1075 """\ 

1076 - discard: "*" 

1077 """ 

1078 ) 

1079 

1080 raise PathNotCoveredByInstallRulesError( 

1081 "Please review the list and add either install rules or exclusions to `installations` in" 

1082 " debian/debputy.manifest. If you do not need any of these paths, add the following to the" 

1083 f" end of your 'installations`:\n\n{excl}\n", 

1084 missing, 

1085 search_dir, 

1086 ) 

1087 

1088 

1089def _list_automatic_discard_rules(path_matcher: SourcePathMatcher) -> None: 

1090 used_discard_rules = path_matcher.used_auto_discard_rules 

1091 # Discard rules can match and then be overridden. In that case, they appear 

1092 # but have 0 matches. 

1093 if not sum((len(v) for v in used_discard_rules.values()), 0): 

1094 return 

1095 _info("The following automatic discard rules were triggered:") 

1096 example_path: Optional[str] = None 

1097 for rule in sorted(used_discard_rules): 

1098 for fs_path in sorted(used_discard_rules[rule]): 

1099 if example_path is None: 1099 ↛ 1101line 1099 didn't jump to line 1101 because the condition on line 1099 was always true

1100 example_path = fs_path 

1101 _info(f" * {rule} -> {fs_path}") 

1102 assert example_path is not None 

1103 _info("") 

1104 _info( 

1105 "Note that some of these may have been overruled. The overrule detection logic is not" 

1106 ) 

1107 _info("100% reliable.") 

1108 _info("") 

1109 _info( 

1110 "You can overrule an automatic discard rule by explicitly listing the path. As an example:" 

1111 ) 

1112 _info(" installations:") 

1113 _info(" - install:") 

1114 _info(f" source: {example_path}") 

1115 

1116 

1117def _install_everything_from_source_dir_if_present( 

1118 dctrl_bin: BinaryPackage, 

1119 substitution: Substitution, 

1120 path_matcher: SourcePathMatcher, 

1121 install_rule_context: InstallRuleContext, 

1122 source_condition_context: ConditionContext, 

1123 source_dir: VirtualPath, 

1124 *, 

1125 into_dir: Optional[VirtualPath] = None, 

1126) -> None: 

1127 attribute_path = AttributePath.builtin_path()[f"installing {source_dir.fs_path}"] 

1128 pkg_set = frozenset([dctrl_bin]) 

1129 install_rule = run_in_context_of_plugin( 

1130 "debputy", 

1131 InstallRule.install_dest, 

1132 [FileSystemMatchRule.from_path_match("*", attribute_path, substitution)], 

1133 None, 

1134 pkg_set, 

1135 f"Built-in; install everything from {source_dir.fs_path} into {dctrl_bin.name}", 

1136 None, 

1137 ) 

1138 pkg_search_dir: Tuple[SearchDir] = ( 

1139 SearchDir( 

1140 source_dir, 

1141 pkg_set, 

1142 ), 

1143 ) 

1144 replacements = { 

1145 "search_dirs": pkg_search_dir, 

1146 } 

1147 if into_dir is not None: 1147 ↛ 1148line 1147 didn't jump to line 1148 because the condition on line 1147 was never true

1148 binary_package_contexts = dict(install_rule_context.binary_package_contexts) 

1149 updated = binary_package_contexts[dctrl_bin.name].replace(fs_root=into_dir) 

1150 binary_package_contexts[dctrl_bin.name] = updated 

1151 replacements["binary_package_contexts"] = binary_package_contexts 

1152 

1153 fake_install_rule_context = install_rule_context.replace(**replacements) 

1154 try: 

1155 install_rule.perform_install( 

1156 path_matcher, 

1157 fake_install_rule_context, 

1158 source_condition_context, 

1159 ) 

1160 except ( 

1161 NoMatchForInstallPatternError, 

1162 PathAlreadyInstalledOrDiscardedError, 

1163 ): 

1164 # Empty directory or everything excluded by default; ignore the error 

1165 pass 

1166 

1167 

1168def _add_build_install_dirs_to_per_package_search_dirs( 

1169 build_system_install_dirs: Sequence[Tuple[str, FrozenSet[BinaryPackage]]], 

1170 per_package_search_dirs: Dict[BinaryPackage, List[VirtualPath]], 

1171 as_path: Callable[[str], VirtualPath], 

1172) -> None: 

1173 seen_pp_search_dirs: Set[Tuple[BinaryPackage, str]] = set() 

1174 for dest_dir, for_packages in build_system_install_dirs: 

1175 dest_path = as_path(dest_dir) 

1176 for pkg in for_packages: 

1177 seen_key = (pkg, dest_dir) 

1178 if seen_key in seen_pp_search_dirs: 

1179 continue 

1180 seen_pp_search_dirs.add(seen_key) 

1181 if pkg not in per_package_search_dirs: 

1182 per_package_search_dirs[pkg] = [dest_path] 

1183 else: 

1184 per_package_search_dirs[pkg].append(dest_path) 

1185 

1186 

1187class HighLevelManifest: 

1188 def __init__( 

1189 self, 

1190 manifest_path: str, 

1191 mutable_manifest: Optional[MutableYAMLManifest], 

1192 remove_during_clean_rules: List[FileSystemMatchRule], 

1193 install_rules: Optional[List[InstallRule]], 

1194 source_package: SourcePackage, 

1195 binary_packages: Mapping[str, BinaryPackage], 

1196 substitution: Substitution, 

1197 package_transformations: Mapping[str, PackageTransformationDefinition], 

1198 dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable, 

1199 dpkg_arch_query_table: DpkgArchTable, 

1200 build_env: DebBuildOptionsAndProfiles, 

1201 build_environments: BuildEnvironments, 

1202 build_rules: Optional[List[BuildRule]], 

1203 plugin_provided_feature_set: PluginProvidedFeatureSet, 

1204 debian_dir: VirtualPath, 

1205 ) -> None: 

1206 self.manifest_path = manifest_path 

1207 self.mutable_manifest = mutable_manifest 

1208 self._remove_during_clean_rules: List[FileSystemMatchRule] = ( 

1209 remove_during_clean_rules 

1210 ) 

1211 self._install_rules = install_rules 

1212 self._source_package = source_package 

1213 self._binary_packages = binary_packages 

1214 self.substitution = substitution 

1215 self.package_transformations = package_transformations 

1216 self._dpkg_architecture_variables = dpkg_architecture_variables 

1217 self._dpkg_arch_query_table = dpkg_arch_query_table 

1218 self._build_env = build_env 

1219 self._used_for: Set[str] = set() 

1220 self.build_environments = build_environments 

1221 self.build_rules = build_rules 

1222 self._plugin_provided_feature_set = plugin_provided_feature_set 

1223 self._debian_dir = debian_dir 

1224 self._source_condition_context = ConditionContext( 

1225 binary_package=None, 

1226 substitution=self.substitution, 

1227 deb_options_and_profiles=self._build_env, 

1228 dpkg_architecture_variables=self._dpkg_architecture_variables, 

1229 dpkg_arch_query_table=self._dpkg_arch_query_table, 

1230 ) 

1231 

1232 def source_version(self, include_binnmu_version: bool = True) -> str: 

1233 # TODO: There should an easier way to determine the source version; really. 

1234 version_var = "{{DEB_VERSION}}" 

1235 if not include_binnmu_version: 

1236 version_var = "{{_DEBPUTY_INTERNAL_NON_BINNMU_SOURCE}}" 

1237 try: 

1238 return self.substitution.substitute( 

1239 version_var, "internal (resolve version)" 

1240 ) 

1241 except DebputySubstitutionError as e: 

1242 raise AssertionError(f"Could not resolve {version_var}") from e 

1243 

1244 @property 

1245 def source_condition_context(self) -> ConditionContext: 

1246 return self._source_condition_context 

1247 

1248 @property 

1249 def debian_dir(self) -> VirtualPath: 

1250 return self._debian_dir 

1251 

1252 @property 

1253 def dpkg_architecture_variables(self) -> DpkgArchitectureBuildProcessValuesTable: 

1254 return self._dpkg_architecture_variables 

1255 

1256 @property 

1257 def deb_options_and_profiles(self) -> DebBuildOptionsAndProfiles: 

1258 return self._build_env 

1259 

1260 @property 

1261 def plugin_provided_feature_set(self) -> PluginProvidedFeatureSet: 

1262 return self._plugin_provided_feature_set 

1263 

1264 @property 

1265 def remove_during_clean_rules(self) -> List[FileSystemMatchRule]: 

1266 return self._remove_during_clean_rules 

1267 

1268 @property 

1269 def active_packages(self) -> Iterable[BinaryPackage]: 

1270 yield from (p for p in self._binary_packages.values() if p.should_be_acted_on) 

1271 

1272 @property 

1273 def all_packages(self) -> Iterable[BinaryPackage]: 

1274 yield from self._binary_packages.values() 

1275 

1276 def package_state_for(self, package: str) -> PackageTransformationDefinition: 

1277 return self.package_transformations[package] 

1278 

1279 def _detect_doc_main_package_for(self, package: BinaryPackage) -> BinaryPackage: 

1280 name = package.name 

1281 # If it is not a -doc package, then docs should be installed 

1282 # under its own package name. 

1283 if not name.endswith("-doc"): 1283 ↛ 1285line 1283 didn't jump to line 1285 because the condition on line 1283 was always true

1284 return package 

1285 name = name[:-4] 

1286 main_package = self._binary_packages.get(name) 

1287 if main_package: 

1288 return main_package 

1289 if name.startswith("lib"): 

1290 dev_pkg = self._binary_packages.get(f"{name}-dev") 

1291 if dev_pkg: 

1292 return dev_pkg 

1293 

1294 # If we found no better match; default to the doc package itself. 

1295 return package 

1296 

1297 def perform_installations( 

1298 self, 

1299 integration_mode: DebputyIntegrationMode, 

1300 build_system_install_dirs: Sequence[Tuple[str, FrozenSet[BinaryPackage]]], 

1301 *, 

1302 install_request_context: Optional[InstallSearchDirContext] = None, 

1303 ) -> PackageDataTable: 

1304 package_data_dict = {} 

1305 package_data_table = PackageDataTable(package_data_dict) 

1306 enable_manifest_installation_feature = ( 

1307 integration_mode != INTEGRATION_MODE_DH_DEBPUTY_RRR 

1308 ) 

1309 

1310 if build_system_install_dirs: 1310 ↛ 1311line 1310 didn't jump to line 1311 because the condition on line 1310 was never true

1311 if integration_mode != INTEGRATION_MODE_FULL: 

1312 raise ValueError( 

1313 "The build_system_install_dirs parameter can only be used in full integration mode" 

1314 ) 

1315 if install_request_context: 

1316 raise ValueError( 

1317 "The build_system_install_dirs parameter cannot be used with install_request_context" 

1318 " (not implemented)" 

1319 ) 

1320 

1321 if install_request_context is None: 1321 ↛ 1323line 1321 didn't jump to line 1323 because the condition on line 1321 was never true

1322 

1323 @functools.lru_cache(None) 

1324 def _as_path(fs_path: str) -> VirtualPath: 

1325 return FSROOverlay.create_root_dir(".", fs_path) 

1326 

1327 dtmp_dir = _as_path("debian/tmp") 

1328 source_root_dir = _as_path(".") 

1329 into = frozenset(self._binary_packages.values()) 

1330 per_package_search_dirs = { 

1331 t.binary_package: [_as_path(f.match_rule.path) for f in t.search_dirs] 

1332 for t in self.package_transformations.values() 

1333 if t.search_dirs is not None 

1334 } 

1335 

1336 if integration_mode == INTEGRATION_MODE_FULL: 

1337 # In this mode, we have no default search dir. Everything ends up being 

1338 # per-package instead (since that is easier logic-wise). 

1339 # 

1340 # Even dtmp_dir is omitted here (since it is not universally applicable). 

1341 # Note we still initialize dtmp_dir, since it affects a later guard. 

1342 default_search_dirs = [] 

1343 _add_build_install_dirs_to_per_package_search_dirs( 

1344 build_system_install_dirs, 

1345 per_package_search_dirs, 

1346 _as_path, 

1347 ) 

1348 

1349 # We can end here with per_package_search_dirs having no search dirs for any package 

1350 # (single binary, where everything is installed into d/<pkg> is the most common case). 

1351 # 

1352 # This is not a problem in itself as the installation rules can still apply to the 

1353 # source root and there should be no reason to install something from d/<pkg> into 

1354 # d/<another-pkg> 

1355 else: 

1356 default_search_dirs = [dtmp_dir] 

1357 

1358 search_dirs = _determine_search_dir_order( 

1359 per_package_search_dirs, 

1360 into, 

1361 default_search_dirs, 

1362 source_root_dir, 

1363 ) 

1364 check_for_uninstalled_dirs = tuple( 

1365 s.search_dir 

1366 for s in search_dirs 

1367 if s.search_dir.fs_path != source_root_dir.fs_path 

1368 ) 

1369 if enable_manifest_installation_feature: 

1370 _present_installation_dirs( 

1371 search_dirs, check_for_uninstalled_dirs, into 

1372 ) 

1373 else: 

1374 dtmp_dir = None 

1375 search_dirs = install_request_context.search_dirs 

1376 into = frozenset(self._binary_packages.values()) 

1377 seen: Set[BinaryPackage] = set() 

1378 for search_dir in search_dirs: 

1379 seen.update(search_dir.applies_to) 

1380 

1381 missing = into - seen 

1382 if missing: 1382 ↛ 1383line 1382 didn't jump to line 1383 because the condition on line 1382 was never true

1383 names = ", ".join(p.name for p in missing) 

1384 raise ValueError( 

1385 f"The following package(s) had no search dirs: {names}." 

1386 " (Generally, the source root would be applicable to all packages)" 

1387 ) 

1388 extra_names = seen - into 

1389 if extra_names: 1389 ↛ 1390line 1389 didn't jump to line 1390 because the condition on line 1389 was never true

1390 names = ", ".join(p.name for p in extra_names) 

1391 raise ValueError( 

1392 f"The install_request_context referenced the following unknown package(s): {names}" 

1393 ) 

1394 

1395 check_for_uninstalled_dirs = ( 

1396 install_request_context.check_for_uninstalled_dirs 

1397 ) 

1398 

1399 install_rule_context = InstallRuleContext(search_dirs) 

1400 

1401 if ( 1401 ↛ 1408line 1401 didn't jump to line 1408 because the condition on line 1401 was never true

1402 enable_manifest_installation_feature 

1403 and self._install_rules is None 

1404 # TODO: Should we also do this for full mode when build systems provided search dirs? 

1405 and dtmp_dir is not None 

1406 and os.path.isdir(dtmp_dir.fs_path) 

1407 ): 

1408 msg = ( 

1409 "The build system appears to have provided the output of upstream build system's" 

1410 " install in debian/tmp. However, these are no provisions for debputy to install" 

1411 " any of that into any of the debian packages listed in debian/control." 

1412 " To avoid accidentally creating empty packages, debputy will insist that you " 

1413 " explicitly define an empty installation definition if you did not want to " 

1414 " install any of those files even though they have been provided." 

1415 ' Example: "installations: []"' 

1416 ) 

1417 _error(msg) 

1418 elif ( 1418 ↛ 1421line 1418 didn't jump to line 1421 because the condition on line 1418 was never true

1419 not enable_manifest_installation_feature and self._install_rules is not None 

1420 ): 

1421 _error( 

1422 f"The `installations` feature cannot be used in {self.manifest_path} with this integration mode." 

1423 f" Please remove or comment out the `installations` keyword." 

1424 ) 

1425 

1426 for dctrl_bin in self.all_packages: 

1427 package = dctrl_bin.name 

1428 doc_main_package = self._detect_doc_main_package_for(dctrl_bin) 

1429 

1430 install_rule_context[package] = BinaryPackageInstallRuleContext( 

1431 dctrl_bin, 

1432 FSRootDir(), 

1433 doc_main_package, 

1434 ) 

1435 

1436 if enable_manifest_installation_feature: 1436 ↛ 1441line 1436 didn't jump to line 1441 because the condition on line 1436 was always true

1437 discard_rules = list( 

1438 self.plugin_provided_feature_set.auto_discard_rules.values() 

1439 ) 

1440 else: 

1441 discard_rules = [ 

1442 self.plugin_provided_feature_set.auto_discard_rules["debian-dir"] 

1443 ] 

1444 path_matcher = SourcePathMatcher(discard_rules) 

1445 

1446 source_condition_context = self._source_condition_context 

1447 

1448 for dctrl_bin in self.active_packages: 

1449 package = dctrl_bin.name 

1450 if install_request_context: 1450 ↛ 1455line 1450 didn't jump to line 1455 because the condition on line 1450 was always true

1451 build_system_staging_dir = install_request_context.debian_pkg_dirs.get( 

1452 package 

1453 ) 

1454 else: 

1455 build_system_staging_dir_fs_path = os.path.join("debian", package) 

1456 if os.path.isdir(build_system_staging_dir_fs_path): 

1457 build_system_staging_dir = FSROOverlay.create_root_dir( 

1458 ".", 

1459 build_system_staging_dir_fs_path, 

1460 ) 

1461 else: 

1462 build_system_staging_dir = None 

1463 

1464 if build_system_staging_dir is not None: 

1465 _install_everything_from_source_dir_if_present( 

1466 dctrl_bin, 

1467 self.substitution, 

1468 path_matcher, 

1469 install_rule_context, 

1470 source_condition_context, 

1471 build_system_staging_dir, 

1472 ) 

1473 

1474 if self._install_rules: 

1475 # FIXME: Check that every install rule remains used after transformations have run. 

1476 # What we want to check is transformations do not exclude everything from an install 

1477 # rule. The hard part here is that renaming (etc.) is fine, so we cannot 1:1 string 

1478 # match. 

1479 for install_rule in self._install_rules: 

1480 install_rule.perform_install( 

1481 path_matcher, 

1482 install_rule_context, 

1483 source_condition_context, 

1484 ) 

1485 

1486 if enable_manifest_installation_feature: 1486 ↛ 1490line 1486 didn't jump to line 1490 because the condition on line 1486 was always true

1487 for search_dir in check_for_uninstalled_dirs: 

1488 _detect_missing_installations(path_matcher, search_dir) 

1489 

1490 for dctrl_bin in self.all_packages: 

1491 package = dctrl_bin.name 

1492 binary_install_rule_context = install_rule_context[package] 

1493 build_system_pkg_staging_dir = os.path.join("debian", package) 

1494 fs_root = binary_install_rule_context.fs_root 

1495 

1496 context = self.package_transformations[package] 

1497 if dctrl_bin.should_be_acted_on and enable_manifest_installation_feature: 

1498 for special_install_rule in context.install_rules: 1498 ↛ 1499line 1498 didn't jump to line 1499 because the loop on line 1498 never started

1499 special_install_rule.perform_install( 

1500 path_matcher, 

1501 install_rule_context, 

1502 source_condition_context, 

1503 ) 

1504 

1505 if dctrl_bin.should_be_acted_on: 

1506 self.apply_fs_transformations(package, fs_root) 

1507 substvars_file = f"debian/{package}.substvars" 

1508 substvars = FlushableSubstvars.load_from_path( 

1509 substvars_file, missing_ok=True 

1510 ) 

1511 # We do not want to touch the substvars file (non-clean rebuild contamination) 

1512 substvars.substvars_path = None 

1513 else: 

1514 substvars = FlushableSubstvars() 

1515 

1516 udeb_package = self._binary_packages.get(f"{package}-udeb") 

1517 if udeb_package and not udeb_package.is_udeb: 1517 ↛ 1518line 1517 didn't jump to line 1518 because the condition on line 1517 was never true

1518 udeb_package = None 

1519 

1520 package_metadata_context = PackageProcessingContextProvider( 

1521 self, 

1522 dctrl_bin, 

1523 udeb_package, 

1524 package_data_table, 

1525 # FIXME: source_package 

1526 ) 

1527 

1528 ctrl_creator = BinaryCtrlAccessorProviderCreator( 

1529 package_metadata_context, 

1530 substvars, 

1531 context.maintscript_snippets, 

1532 context.substitution, 

1533 ) 

1534 

1535 if not enable_manifest_installation_feature: 1535 ↛ 1536line 1535 didn't jump to line 1536 because the condition on line 1535 was never true

1536 assert_no_dbgsym_migration(dctrl_bin) 

1537 dh_dbgsym_root_fs = dhe_dbgsym_root_dir(dctrl_bin) 

1538 dh_dbgsym_root_path = FSROOverlay.create_root_dir( 

1539 "", 

1540 dh_dbgsym_root_fs, 

1541 ) 

1542 dbgsym_root_fs = FSRootDir() 

1543 _install_everything_from_source_dir_if_present( 

1544 dctrl_bin, 

1545 self.substitution, 

1546 path_matcher, 

1547 install_rule_context, 

1548 source_condition_context, 

1549 dh_dbgsym_root_path, 

1550 into_dir=dbgsym_root_fs, 

1551 ) 

1552 dbgsym_build_ids = read_dbgsym_file(dctrl_bin) 

1553 dbgsym_info = DbgsymInfo( 

1554 dctrl_bin, 

1555 dbgsym_root_fs, 

1556 os.path.join(dh_dbgsym_root_fs, "DEBIAN"), 

1557 dbgsym_build_ids, 

1558 # TODO: Provide manifest feature to support this. 

1559 False, 

1560 ) 

1561 else: 

1562 dbgsym_info = DbgsymInfo( 

1563 dctrl_bin, 

1564 FSRootDir(), 

1565 None, 

1566 [], 

1567 False, 

1568 ) 

1569 

1570 package_data_dict[package] = BinaryPackageData( 

1571 self._source_package, 

1572 dctrl_bin, 

1573 build_system_pkg_staging_dir, 

1574 fs_root, 

1575 substvars, 

1576 package_metadata_context, 

1577 ctrl_creator, 

1578 dbgsym_info, 

1579 ) 

1580 

1581 if enable_manifest_installation_feature: 1581 ↛ 1584line 1581 didn't jump to line 1584 because the condition on line 1581 was always true

1582 _list_automatic_discard_rules(path_matcher) 

1583 

1584 return package_data_table 

1585 

1586 def condition_context( 

1587 self, binary_package: Optional[Union[BinaryPackage, str]] 

1588 ) -> ConditionContext: 

1589 if binary_package is None: 1589 ↛ 1590line 1589 didn't jump to line 1590 because the condition on line 1589 was never true

1590 return self._source_condition_context 

1591 if not isinstance(binary_package, str): 1591 ↛ 1592line 1591 didn't jump to line 1592 because the condition on line 1591 was never true

1592 binary_package = binary_package.name 

1593 

1594 package_transformation = self.package_transformations[binary_package] 

1595 return self._source_condition_context.replace( 

1596 binary_package=package_transformation.binary_package, 

1597 substitution=package_transformation.substitution, 

1598 ) 

1599 

1600 def apply_fs_transformations( 

1601 self, 

1602 package: str, 

1603 fs_root: FSPath, 

1604 ) -> None: 

1605 if package in self._used_for: 1605 ↛ 1606line 1605 didn't jump to line 1606 because the condition on line 1605 was never true

1606 raise ValueError( 

1607 f"data.tar contents for {package} has already been finalized!?" 

1608 ) 

1609 if package not in self.package_transformations: 1609 ↛ 1610line 1609 didn't jump to line 1610 because the condition on line 1609 was never true

1610 raise ValueError( 

1611 f'The package "{package}" was not relevant for the manifest!?' 

1612 ) 

1613 package_transformation = self.package_transformations[package] 

1614 condition_context = ConditionContext( 

1615 binary_package=package_transformation.binary_package, 

1616 substitution=package_transformation.substitution, 

1617 deb_options_and_profiles=self._build_env, 

1618 dpkg_architecture_variables=self._dpkg_architecture_variables, 

1619 dpkg_arch_query_table=self._dpkg_arch_query_table, 

1620 ) 

1621 norm_rules = list( 

1622 builtin_mode_normalization_rules( 

1623 self._dpkg_architecture_variables, 

1624 package_transformation.binary_package, 

1625 package_transformation.substitution, 

1626 ) 

1627 ) 

1628 norm_mode_transformation_rule = ModeNormalizationTransformationRule(norm_rules) 

1629 norm_mode_transformation_rule.transform_file_system(fs_root, condition_context) 

1630 for transformation in package_transformation.transformations: 

1631 transformation.run_transform_file_system(fs_root, condition_context) 

1632 interpreter_normalization = NormalizeShebangLineTransformation() 

1633 interpreter_normalization.transform_file_system(fs_root, condition_context) 

1634 

1635 def finalize_data_tar_contents( 

1636 self, 

1637 package: str, 

1638 fs_root: FSPath, 

1639 clamp_mtime_to: int, 

1640 ) -> IntermediateManifest: 

1641 if package in self._used_for: 1641 ↛ 1642line 1641 didn't jump to line 1642 because the condition on line 1641 was never true

1642 raise ValueError( 

1643 f"data.tar contents for {package} has already been finalized!?" 

1644 ) 

1645 if package not in self.package_transformations: 1645 ↛ 1646line 1645 didn't jump to line 1646 because the condition on line 1645 was never true

1646 raise ValueError( 

1647 f'The package "{package}" was not relevant for the manifest!?' 

1648 ) 

1649 self._used_for.add(package) 

1650 

1651 # At this point, there so be no further mutations to the file system (because the will not 

1652 # be present in the intermediate manifest) 

1653 cast("FSRootDir", fs_root).is_read_write = False 

1654 

1655 intermediate_manifest = list( 

1656 _generate_intermediate_manifest( 

1657 fs_root, 

1658 clamp_mtime_to, 

1659 ) 

1660 ) 

1661 return intermediate_manifest 

1662 

1663 def apply_to_binary_staging_directory( 

1664 self, 

1665 package: str, 

1666 fs_root: FSPath, 

1667 clamp_mtime_to: int, 

1668 ) -> IntermediateManifest: 

1669 self.apply_fs_transformations(package, fs_root) 

1670 return self.finalize_data_tar_contents(package, fs_root, clamp_mtime_to) 

1671 

1672 

1673@dataclasses.dataclass(slots=True) 

1674class SearchDirOrderState: 

1675 search_dir: VirtualPath 

1676 applies_to: Union[Set[BinaryPackage], FrozenSet[BinaryPackage]] = dataclasses.field( 

1677 default_factory=set 

1678 ) 

1679 after: Set[str] = dataclasses.field(default_factory=set) 

1680 

1681 

1682def _present_installation_dirs( 

1683 search_dirs: Sequence[SearchDir], 

1684 checked_missing_dirs: Sequence[VirtualPath], 

1685 all_pkgs: FrozenSet[BinaryPackage], 

1686) -> None: 

1687 _info("The following directories are considered search dirs (in order):") 

1688 max_len = max((len(s.search_dir.fs_path) for s in search_dirs), default=1) 

1689 for search_dir in search_dirs: 

1690 applies_to = "" 

1691 if search_dir.applies_to < all_pkgs: 

1692 names = ", ".join(p.name for p in search_dir.applies_to) 

1693 applies_to = f" [only applicable to: {names}]" 

1694 remark = "" 

1695 if not os.path.isdir(search_dir.search_dir.fs_path): 

1696 remark = " (skipped; absent)" 

1697 _info(f" * {search_dir.search_dir.fs_path:{max_len}}{applies_to}{remark}") 

1698 

1699 if checked_missing_dirs: 

1700 _info('The following directories are considered for "not-installed" paths;') 

1701 for d in checked_missing_dirs: 

1702 remark = "" 

1703 if not os.path.isdir(d.fs_path): 

1704 remark = " (skipped; absent)" 

1705 _info(f" * {d.fs_path:{max_len}}{remark}") 

1706 

1707 

1708def _determine_search_dir_order( 

1709 requested: Mapping[BinaryPackage, List[VirtualPath]], 

1710 all_pkgs: FrozenSet[BinaryPackage], 

1711 default_search_dirs: List[VirtualPath], 

1712 source_root: VirtualPath, 

1713) -> Sequence[SearchDir]: 

1714 search_dir_table = {} 

1715 assert requested.keys() <= all_pkgs 

1716 for pkg in all_pkgs: 

1717 paths = requested.get(pkg, default_search_dirs) 

1718 previous_search_dir: Optional[SearchDirOrderState] = None 

1719 for path in paths: 

1720 try: 

1721 search_dir_state = search_dir_table[path.fs_path] 

1722 except KeyError: 

1723 search_dir_state = SearchDirOrderState(path) 

1724 search_dir_table[path.fs_path] = search_dir_state 

1725 search_dir_state.applies_to.add(pkg) 

1726 if previous_search_dir is not None: 

1727 search_dir_state.after.add(previous_search_dir.search_dir.fs_path) 

1728 previous_search_dir = search_dir_state 

1729 

1730 search_dirs_in_order = [] 

1731 released = set() 

1732 remaining = set() 

1733 for search_dir_state in search_dir_table.values(): 

1734 if not (search_dir_state.after <= released): 

1735 remaining.add(search_dir_state.search_dir.fs_path) 

1736 continue 

1737 search_dirs_in_order.append(search_dir_state) 

1738 released.add(search_dir_state.search_dir.fs_path) 

1739 

1740 while remaining: 

1741 current_released = len(released) 

1742 for fs_path in remaining: 

1743 search_dir_state = search_dir_table[fs_path] 

1744 if not search_dir_state.after.issubset(released): 

1745 remaining.add(search_dir_state.search_dir.fs_path) 

1746 continue 

1747 search_dirs_in_order.append(search_dir_state) 

1748 released.add(search_dir_state.search_dir.fs_path) 

1749 

1750 if current_released == len(released): 

1751 names = ", ".join(remaining) 

1752 _error( 

1753 f"There is a circular dependency (somewhere) between the search dirs: {names}." 

1754 " Note that the search directories across all packages have to be ordered (and the" 

1755 " source root should generally be last)" 

1756 ) 

1757 remaining -= released 

1758 

1759 search_dirs_in_order.append( 

1760 SearchDirOrderState( 

1761 source_root, 

1762 all_pkgs, 

1763 ) 

1764 ) 

1765 

1766 return tuple( 

1767 # Avoid duplicating all_pkgs 

1768 SearchDir( 

1769 s.search_dir, 

1770 frozenset(s.applies_to) if s.applies_to != all_pkgs else all_pkgs, 

1771 ) 

1772 for s in search_dirs_in_order 

1773 )