Coverage for src/debputy/installations.py: 68%

515 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-01-27 13:59 +0000

1import collections 

2import dataclasses 

3import os.path 

4import re 

5from enum import IntEnum 

6from typing import ( 

7 List, 

8 Dict, 

9 FrozenSet, 

10 Callable, 

11 Union, 

12 Iterator, 

13 Tuple, 

14 Set, 

15 Sequence, 

16 Optional, 

17 Iterable, 

18 TYPE_CHECKING, 

19 cast, 

20 Any, 

21 Mapping, 

22) 

23 

24from debputy.exceptions import DebputyRuntimeError 

25from debputy.filesystem_scan import FSPath 

26from debputy.manifest_conditions import ( 

27 ConditionContext, 

28 ManifestCondition, 

29 _BUILD_DOCS_BDO, 

30) 

31from debputy.manifest_parser.base_types import ( 

32 FileSystemMatchRule, 

33 FileSystemExactMatchRule, 

34) 

35from debputy.manifest_parser.tagging_types import DebputyDispatchableType 

36from debputy.packages import BinaryPackage 

37from debputy.path_matcher import MatchRule, ExactFileSystemPath, MATCH_ANYTHING 

38from debputy.plugin.plugin_state import run_in_context_of_plugin 

39from debputy.substitution import Substitution 

40from debputy.util import _error, _warn 

41 

42if TYPE_CHECKING: 

43 from debputy.packager_provided_files import PackagerProvidedFile 

44 from debputy.plugin.api import VirtualPath 

45 from debputy.plugin.api.impl_types import PluginProvidedDiscardRule 

46 

47 

# Line starting with ".TH": skips the first field and captures the second one,
# which must start with a digit (surrounding double quotes are optional).
_MAN_TH_LINE = re.compile(r'^[.]TH\s+\S+\s+"?(\d+[^"\s]*)"?')
# Line starting with ".Dt": skips the first field and captures the second one,
# which must start with a digit.
_MAN_DT_LINE = re.compile(r"^[.]Dt\s+\S+\s+(\d+\S*)")
# Captures the section from a file name extension (first character 1-9),
# tolerating a trailing ".gz".
_MAN_SECTION_BASENAME = re.compile(r"[.]([1-9]\w*)(?:[.]gz)?$")
# Captures the leading digits of a section string.
_MAN_REAL_SECTION = re.compile(r"^(\d+)")
# Matches the last ".<extension>" of a name (used with `.sub("", ...)` to strip it).
_MAN_INST_BASENAME = re.compile(r"[.][^.]+$")
# Matches ".../man/<lang>[.<suffix>]/man<1-9>/" in a path and captures "<lang>"
# ("xx" or "xx_YY"). The language directory is optional, in which case the
# group is None.
MAN_GUESS_LANG_FROM_PATH = re.compile(
    r"(?:^|/)man/(?:([a-z][a-z](?:_[A-Z][A-Z])?)(?:\.[^/]+)?)?/man[1-9]/"
)
# Captures "<lang>" ("xx" or "xx_YY") from ".<lang>." when it is followed by
# a digit 1-9 or by "man".
MAN_GUESS_FROM_BASENAME = re.compile(r"[.]([a-z][a-z](?:_[A-Z][A-Z])?)[.](?:[1-9]|man)")

57 

58 

class InstallRuleError(DebputyRuntimeError):
    """Common base class of the install rule errors defined in this module."""

    pass

61 

62 

class PathAlreadyInstalledOrDiscardedError(InstallRuleError):
    """An exact match hit a path that was already installed or discarded.

    The exception is raised with the arguments
    ``(message, match, into, definition_source)``. That is, ``args[0]`` is the
    human-readable message and the structured details start at ``args[1]``,
    which is the same layout the sibling install rule errors use.
    """

    @property
    def path(self) -> str:
        """File system path of the path that triggered the error."""
        subject = self.args[1]
        if isinstance(subject, str):
            return subject
        # The raise sites pass the `PathMatch` itself; unwrap it to the
        # matched path before asking for its file system path.
        if isinstance(subject, PathMatch):
            subject = subject.path
        return cast("str", subject.fs_path)

    @property
    def into(self) -> FrozenSet[BinaryPackage]:
        """Packages the path had already been installed into (possibly empty)."""
        return cast("FrozenSet[BinaryPackage]", self.args[2])

    @property
    def definition_source(self) -> str:
        """The definition that triggered the error."""
        return cast("str", self.args[3])

75 

76 

class ExactPathMatchTwiceError(InstallRuleError):
    """The same path was requested via an exact match twice for one package.

    Raised as ``(message, path, package, definition_source)``.
    """

    @property
    def path(self) -> str:
        """The second exception argument (the path that was matched twice)."""
        matched_path = self.args[1]
        return cast("str", matched_path)

    @property
    def into(self) -> BinaryPackage:
        """The third exception argument (the package the path was installed into)."""
        package = self.args[2]
        return cast("BinaryPackage", package)

    @property
    def definition_source(self) -> str:
        """The fourth exception argument (the definition that triggered the error)."""
        origin = self.args[3]
        return cast("str", origin)

89 

90 

class NoMatchForInstallPatternError(InstallRuleError):
    """A pattern of an install rule did not match anything.

    Raised as ``(message, pattern, search_dirs, definition_source)``.
    """

    @property
    def pattern(self) -> str:
        """The second exception argument (the pattern without a match)."""
        unmatched = self.args[1]
        return cast("str", unmatched)

    @property
    def search_dirs(self) -> Sequence["SearchDir"]:
        """The third exception argument (the search directories that were tried)."""
        tried = self.args[2]
        return cast("Sequence[SearchDir]", tried)

    @property
    def definition_source(self) -> str:
        """The fourth exception argument (the definition that triggered the error)."""
        origin = self.args[3]
        return cast("str", origin)

103 

104 

@dataclasses.dataclass(slots=True, frozen=True)
class SearchDir:
    """A directory searched for install sources plus the packages it applies to."""

    # Root of the tree that match rules are run against.
    search_dir: "VirtualPath"
    # Packages for which this search directory is consulted.
    applies_to: FrozenSet[BinaryPackage]

109 

110 

@dataclasses.dataclass(slots=True, frozen=True)
class BinaryPackageInstallRuleContext:
    """Per-package data used while performing installations."""

    # The package this context belongs to.
    binary_package: BinaryPackage
    # Root of the file system that installed paths are created in.
    fs_root: FSPath
    # Package whose name is substituted for `{doc_main_package_name}` in
    # destination path templates.
    doc_main_package: BinaryPackage

    def replace(self, **changes: Any) -> "BinaryPackageInstallRuleContext":
        """Return a copy of this context with the given fields replaced."""
        return dataclasses.replace(self, **changes)

119 

120 

@dataclasses.dataclass(slots=True, frozen=True)
class InstallSearchDirContext:
    """Bundle of search directory related data.

    NOTE(review): the consumers of this class are outside the visible part of
    the file; the field comments below are derived from names and types only.
    """

    search_dirs: Sequence[SearchDir]
    # Presumably the directories scanned for paths no rule has claimed - confirm.
    check_for_uninstalled_dirs: Sequence["VirtualPath"]
    # TODO: Support search dirs per-package
    # Mapping from a string key (presumably the package name - confirm) to a
    # directory; defaults to an empty dict.
    debian_pkg_dirs: Mapping[str, "VirtualPath"] = dataclasses.field(
        default_factory=dict
    )

129 

130 

@dataclasses.dataclass(slots=True)
class InstallRuleContext:
    """Search directories plus the per-package install contexts.

    The instance can be indexed by a string key to get or set the matching
    `BinaryPackageInstallRuleContext` (the code in this module indexes it by
    package name).
    """

    # TODO: Search dirs should be per-package
    search_dirs: Sequence[SearchDir]
    # Backing store for `__getitem__` / `__setitem__`.
    binary_package_contexts: Dict[str, BinaryPackageInstallRuleContext] = (
        dataclasses.field(default_factory=dict)
    )

    def __getitem__(self, item: str) -> BinaryPackageInstallRuleContext:
        """Return the context registered for `item` (raises KeyError if absent)."""
        return self.binary_package_contexts[item]

    def __setitem__(self, key: str, value: BinaryPackageInstallRuleContext) -> None:
        """Register `value` as the context for `key`."""
        self.binary_package_contexts[key] = value

    def replace(self, **changes: Any) -> "InstallRuleContext":
        """Return a copy of this context with the given fields replaced."""
        return dataclasses.replace(self, **changes)

147 

148 

@dataclasses.dataclass(slots=True, frozen=True)
class PathMatch:
    """A single path found by a match rule in a search directory."""

    # The path that matched.
    path: "VirtualPath"
    # The search directory the path was found in.
    search_dir: "VirtualPath"
    # Forwarded to `SourcePathMatcher.reserve` when the match is reserved.
    is_exact_match: bool
    # The packages this match applies to.
    into: FrozenSet[BinaryPackage]

155 

156 

class DiscardState(IntEnum):
    """Cached answer to "has this path been discarded?"."""

    # No verdict has been computed for the path yet.
    UNCHECKED = 0
    # The path was checked and is not discarded.
    NOT_DISCARDED = 1
    # A plugin provided (automatic) discard rule matched the path or one of
    # its parent directories.
    DISCARDED_BY_PLUGIN_PROVIDED_RULE = 2
    # The path was excluded explicitly (see `SourcePathMatcher.exclude`).
    DISCARDED_BY_MANIFEST_RULE = 3

162 

163 

def _determine_manpage_section(
    match_rule: PathMatch,
    provided_section: Optional[int],
    definition_source: str,
) -> Optional[str]:
    """Determine the section string of a man page.

    :param match_rule: The match for the man page.
    :param provided_section: Explicitly requested section. When not None, it
      is returned (as a string) and the file is not inspected.
    :param definition_source: Where the rule was defined (used in warnings).
    :return: The section (digits, possibly followed by a suffix) or None if it
      could not be determined.
    """
    if provided_section is not None:
        return str(provided_section)

    detected_section = None
    # Decode leniently. Only the ASCII part of the `.TH` / `.Dt` line matters,
    # and the file can be in a legacy encoding or even compressed (the
    # basename pattern below accepts ".gz"). Strict decoding would raise
    # UnicodeDecodeError before the basename fallback gets a chance to run.
    with open(
        match_rule.path.fs_path, "r", encoding="utf-8", errors="replace"
    ) as fd:
        for line in fd:
            if not line.startswith((".TH", ".Dt")):
                continue

            m = _MAN_DT_LINE.match(line) or _MAN_TH_LINE.match(line)
            if not m:
                continue
            detected_section = m.group(1)
            if "." in detected_section:
                _warn(
                    f"Ignoring detected section {detected_section} in {match_rule.path.fs_path}"
                    f" (detected via {definition_source}): It looks too much like a version"
                )
                detected_section = None
            # Only the first usable header line is considered.
            break

    if detected_section is None:
        # Fall back to the extension of the file name.
        m = _MAN_SECTION_BASENAME.search(os.path.basename(match_rule.path.path))
        if m:
            detected_section = m.group(1)

    return detected_section

197 

198 

def _determine_manpage_real_section(
    match_rule: PathMatch,
    section: Optional[str],
    definition_source: str,
) -> int:
    """Derive the numeric section (1-9) from a section string.

    :param match_rule: The match for the man page (used in diagnostics).
    :param section: The section string, such as the one returned by
      `_determine_manpage_section`. May be None.
    :param definition_source: Where the rule was defined (used in diagnostics).
    :return: The leading number of `section`. If there is none, or it is
      outside 1-9, the problem is reported via `_error`.
    """
    real_section = None
    if section is not None:
        m = _MAN_REAL_SECTION.match(section)
        if m:
            real_section = int(m.group(1))
    # The pattern only yields non-negative numbers, so the lower bound has to
    # be 1 for the check to agree with the warning below (and with the other
    # patterns in this module, which only accept sections 1-9).
    if real_section is None or not 1 <= real_section <= 9:
        if real_section is not None:
            _warn(
                f"Computed section for {match_rule.path.fs_path} was {real_section} (section: {section}),"
                f" which is not a valid section (must be between 1 and 9 incl.)"
            )
        _error(
            f"Could not determine the section for {match_rule.path.fs_path} automatically. The man page"
            f" was detected via {definition_source}. Consider using `section: <number>` to"
            " explicitly declare the section. Keep in mind that it applies to all man pages for that"
            " rule and you may have to split the rule into two for this reason."
        )
    return real_section

222 

223 

def _determine_manpage_language(
    match_rule: PathMatch,
    provided_language: Optional[str],
) -> Optional[str]:
    """Determine the language of a man page.

    :param match_rule: The match for the man page.
    :param provided_language: Either a concrete language (with "C" meaning
      "no language"), one of the keywords "derive-from-basename" /
      "derive-from-path", or None (handled like "derive-from-path").
    :return: The language or None when there is none.
    """
    if provided_language == "derive-from-basename":
        pattern, subject = MAN_GUESS_FROM_BASENAME, match_rule.path.name
    elif provided_language is None or provided_language == "derive-from-path":
        pattern, subject = MAN_GUESS_LANG_FROM_PATH, match_rule.path.path
    else:
        # An explicit language was requested.
        return None if provided_language == "C" else provided_language

    found = pattern.search(subject)
    return found.group(1) if found is not None else None

241 

242 

def _dest_path_for_manpage(
    provided_section: Optional[int],
    provided_language: Optional[str],
    definition_source: str,
) -> Callable[["PathMatch"], str]:
    """Create a function computing the destination path of a man page match.

    :param provided_section: Explicit section, or None to detect it per page.
    :param provided_language: Explicit language or derive keyword (see
      `_determine_manpage_language`).
    :param definition_source: Where the rule was defined (used in diagnostics).
    :return: A callable mapping a `PathMatch` to its path below `usr/share/man`.
    """

    def _manpage_dest_path(match_rule: PathMatch) -> str:
        # Drop the last extension of the name; the section is appended again
        # when the final path is assembled.
        inst_basename = _MAN_INST_BASENAME.sub("", match_rule.path.name)
        section = _determine_manpage_section(
            match_rule, provided_section, definition_source
        )
        real_section = _determine_manpage_real_section(
            match_rule, section, definition_source
        )
        assert section is not None
        language = _determine_manpage_language(match_rule, provided_language)

        maybe_language = ""
        if language is not None:
            maybe_language = f"{language}/"
            # The language becomes a directory, so it is removed from the name.
            inst_basename = inst_basename.removesuffix(f".{language}")

        return (
            f"usr/share/man/{maybe_language}man{real_section}/{inst_basename}.{section}"
        )

    return _manpage_dest_path

271 

272 

class SourcePathMatcher:
    """Tracks which source paths have been installed (reserved) or discarded."""

    def __init__(self, auto_discard_rules: List["PluginProvidedDiscardRule"]) -> None:
        # fs_path -> (packages the path was reserved for, definition source)
        self._already_matched: Dict[
            str,
            Tuple[FrozenSet[BinaryPackage], str],
        ] = {}
        # (package name, fs_path) pairs that were requested via an exact match
        self._exact_match_request: Set[Tuple[str, str]] = set()
        # fs_path -> cached discard verdict
        self._discarded: Dict[str, DiscardState] = {}
        self._auto_discard_rules = auto_discard_rules
        # name of auto discard rule -> fs_paths it discarded
        self.used_auto_discard_rules: Dict[str, Set[str]] = collections.defaultdict(set)

    def is_reserved(self, path: "VirtualPath") -> bool:
        """Return True if `path` was already matched or is discarded."""
        fs_path = path.fs_path
        if fs_path in self._already_matched:
            return True
        state = self._discarded.get(fs_path, DiscardState.UNCHECKED)
        if state == DiscardState.UNCHECKED:
            state = self._check_plugin_provided_exclude_state_for(path)
        return state != DiscardState.NOT_DISCARDED

    def exclude(self, path: str) -> None:
        """Mark the file system path `path` as discarded by a manifest rule."""
        self._discarded[path] = DiscardState.DISCARDED_BY_MANIFEST_RULE

    def _run_plugin_provided_discard_rules_on(self, path: "VirtualPath") -> bool:
        """Return True (and record the rule) if an auto discard rule matches `path`."""
        for rule in self._auto_discard_rules:
            if rule.should_discard(path):
                self.used_auto_discard_rules[rule.name].add(path.fs_path)
                return True
        return False

    def _check_plugin_provided_exclude_state_for(
        self,
        path: "VirtualPath",
    ) -> DiscardState:
        """Compute (and cache) the discard verdict for `path`.

        The path and then its parent directories are checked until a cached
        verdict is found, an auto discard rule matches, or the root is reached.
        Every path visited without a cached verdict receives the final verdict.
        """
        uncached = []
        candidate = path
        while True:
            fs_path = candidate.fs_path
            known_state = self._discarded.get(fs_path, DiscardState.UNCHECKED)
            if known_state != DiscardState.UNCHECKED:
                verdict = known_state
                break
            uncached.append(fs_path)
            if self._run_plugin_provided_discard_rules_on(candidate):
                verdict = DiscardState.DISCARDED_BY_PLUGIN_PROVIDED_RULE
                break
            # We cannot trust a "NOT_DISCARDED" until we check its parent (the directory could
            # be excluded without the files in it triggering the rule).
            parent = candidate.parent_dir
            if not parent:
                verdict = DiscardState.NOT_DISCARDED
                break
            candidate = parent
        self._discarded.update(dict.fromkeys(uncached, verdict))
        return verdict

    def may_match(
        self,
        match: PathMatch,
        *,
        is_exact_match: bool = False,
    ) -> Tuple[FrozenSet[BinaryPackage], bool]:
        """Check whether `match` is still available.

        :return: A tuple of (packages the path was already reserved for, whether
          the path is discarded). For exact matches, a discard caused by a
          plugin provided rule is not reported as discarded.
        """
        fs_path = match.path.fs_path
        previous = self._already_matched.get(fs_path)
        if previous:
            return previous[0], False

        state = self._discarded.get(fs_path, DiscardState.UNCHECKED)
        if state == DiscardState.UNCHECKED:
            state = self._check_plugin_provided_exclude_state_for(match.path)

        assert state is not None and state != DiscardState.UNCHECKED

        if is_exact_match and state == DiscardState.DISCARDED_BY_PLUGIN_PROVIDED_RULE:
            return frozenset(), False
        return frozenset(), state != DiscardState.NOT_DISCARDED

    def reserve(
        self,
        path: "VirtualPath",
        reserved_by: FrozenSet[BinaryPackage],
        definition_source: str,
        *,
        is_exact_match: bool = False,
    ) -> None:
        """Record `path` as matched for `reserved_by`.

        For exact matches, the request is additionally recorded per package and
        any cached discard verdict (including the auto discard rule bookkeeping)
        for the path is dropped.
        """
        fs_path = path.fs_path
        self._already_matched[fs_path] = (reserved_by, definition_source)
        if not is_exact_match:
            return
        for pkg in reserved_by:
            self._exact_match_request.add((pkg.name, fs_path))
        self._discarded.pop(fs_path, None)
        for discarded_by_rule in self.used_auto_discard_rules.values():
            discarded_by_rule.discard(fs_path)

    def detect_missing(self, search_dir: "VirtualPath") -> Iterator["VirtualPath"]:
        """Yield the paths below `search_dir` that are not reserved.

        Directories are only yielded when they have no entries at all.
        """
        pending = list(search_dir.iterdir)
        while pending:
            entry = pending.pop()
            if not entry.is_dir:
                if not self.is_reserved(entry):
                    yield entry
                continue

            size_before = len(pending)
            pending.extend(entry.iterdir)
            if size_before == len(pending) and not self.is_reserved(entry):
                # "Explicitly" empty dir
                yield entry

    def find_and_reserve_all_matches(
        self,
        match_rule: MatchRule,
        search_dirs: Sequence[SearchDir],
        dir_only_match: bool,
        match_filter: Optional[Callable[["VirtualPath"], bool]],
        reserved_by: FrozenSet[BinaryPackage],
        definition_source: str,
    ) -> Tuple[List[PathMatch], Tuple[int, ...]]:
        """Resolve `match_rule` and claim every path that is still available.

        With a non-empty `reserved_by` the paths are recorded as matched; with
        an empty one they are excluded instead. Paths that are unavailable are
        counted when the rule is a glob and are an error when the rule is an
        `ExactFileSystemPath`.

        :return: A tuple of (claimed matches, (number of skipped paths that were
          already installed, number of skipped paths that were excluded)).
        :raises PathAlreadyInstalledOrDiscardedError: An exact match hit a path
          that is installed or excluded already.
        :raises ExactPathMatchTwiceError: An exact match was requested twice for
          the same package.
        """
        claimed = []
        skipped_installed = 0
        skipped_excluded = 0
        exact_rule = isinstance(match_rule, ExactFileSystemPath)

        candidates = _resolve_path(
            match_rule,
            search_dirs,
            dir_only_match,
            match_filter,
            reserved_by,
        )
        for match in candidates:
            installed_into, excluded = self.may_match(match, is_exact_match=exact_rule)
            if installed_into:
                if not exact_rule:
                    skipped_installed += 1
                    continue
                packages = ", ".join(p.name for p in installed_into)
                raise PathAlreadyInstalledOrDiscardedError(
                    f'The "{match.path.fs_path}" has been reserved by and installed into {packages}.'
                    f" The definition that triggered this issue is {definition_source}.",
                    match,
                    installed_into,
                    definition_source,
                )
            if excluded:
                if not exact_rule:
                    skipped_excluded += 1
                    continue
                raise PathAlreadyInstalledOrDiscardedError(
                    f'The "{match.path.fs_path}" has been excluded. If you want this path installed, move it'
                    f" above the exclusion rule that excluded it. The definition that triggered this"
                    f" issue is {definition_source}.",
                    match,
                    installed_into,
                    definition_source,
                )
            if exact_rule:
                for pkg in match.into:
                    request_key = (pkg.name, match.path.fs_path)
                    if request_key in self._exact_match_request:
                        raise ExactPathMatchTwiceError(
                            f'The path "{match.path.fs_path}" (via exact match) has already been installed'
                            f" into {pkg.name}. The second installation triggered by {definition_source}",
                            match.path,
                            pkg,
                            definition_source,
                        )
                    self._exact_match_request.add(request_key)

            if reserved_by:
                self._already_matched[match.path.fs_path] = (
                    match.into,
                    definition_source,
                )
            else:
                self.exclude(match.path.fs_path)
            claimed.append(match)

        return claimed, (skipped_installed, skipped_excluded)

467 

468 

def _resolve_path(
    match_rule: MatchRule,
    search_dirs: Iterable["SearchDir"],
    dir_only_match: bool,
    match_filter: Optional[Callable[["VirtualPath"], bool]],
    into: FrozenSet[BinaryPackage],
) -> Iterator[PathMatch]:
    """Yield the matches of `match_rule` across `search_dirs`.

    :param match_rule: The rule to resolve.
    :param search_dirs: The search directories, visited in order.
    :param dir_only_match: If True, matches that are not directories are skipped.
    :param match_filter: Passed to the rule as `ignore_paths`.
    :param into: The packages to find matches for. When empty, every search
      directory is visited.
    """
    pending_pkgs = set(into)
    for sdir in search_dirs:
        if into and pending_pkgs.isdisjoint(sdir.applies_to):
            # All the packages, where this search dir applies, already got a match
            continue
        applicable = sdir.applies_to & pending_pkgs
        found_any = False
        for candidate in match_rule.finditer(
            sdir.search_dir,
            ignore_paths=match_filter,
        ):
            if dir_only_match and not candidate.is_dir:
                continue
            if candidate.parent_dir is None:
                if match_rule is MATCH_ANYTHING:
                    continue
                _error(
                    f"The pattern {match_rule.describe_match_short()} matched the root dir."
                )
            yield PathMatch(candidate, sdir.search_dir, False, applicable)
            # No early exit; we want to match everything we can from this search directory.
            found_any = True

        if found_any:
            pending_pkgs -= applicable
        if into and not pending_pkgs:
            # For install rules, we can stop as soon as all packages had a match
            # For discard rules, all search directories must be visited. Otherwise,
            # you would have to repeat the discard rule once per search dir to be
            # sure something is fully discarded
            break

507 

508 

def _resolve_dest_paths(
    match: PathMatch,
    dest_paths: Sequence[Tuple[str, bool]],
    install_context: "InstallRuleContext",
) -> Sequence[Tuple[str, "FSPath"]]:
    """Compute the (destination path, file system root) pairs for `match`.

    :param match: The match to install.
    :param dest_paths: Pairs of (destination, is-format). Format destinations
      are expanded per package with the keys `basename`, `dirname`,
      `package_name` and `doc_main_package_name`.
    :param install_context: Provides the file system root of each package.
    :raises ValueError: If a destination ends with "/".
    """
    resolved = []
    for dest_path, dest_path_is_format in dest_paths:
        if not dest_path_is_format:
            if dest_path.endswith("/"):
                raise ValueError(
                    f'Provided destination for "{match.path.path}" ended with "/" ("{dest_path}"),'
                    " which it must not!"
                )
            for pkg in match.into:
                resolved.append((dest_path, install_context[pkg.name].fs_root))
            continue

        for pkg in match.into:
            parent_dir = match.path.parent_dir
            pkg_context = install_context[pkg.name]
            fs_root = pkg_context.fs_root
            dirname = "" if parent_dir is None else parent_dir.path
            expanded = dest_path.format(
                basename=match.path.name,
                dirname=dirname,
                package_name=pkg.name,
                doc_main_package_name=pkg_context.doc_main_package.name,
            )
            if expanded.endswith("/"):
                raise ValueError(
                    f'Provided destination (when resolved for {pkg.name}) for "{match.path.path}" ended'
                    f' with "/" ("{dest_path}"), which it must not!'
                )
            resolved.append((expanded, fs_root))
    return resolved

543 

544 

def _resolve_matches(
    matches: List[PathMatch],
    dest_paths: Union[Sequence[Tuple[str, bool]], Callable[[PathMatch], str]],
    install_context: "InstallRuleContext",
) -> Iterator[Tuple[PathMatch, Sequence[Tuple[str, "FSPath"]]]]:
    """Pair every match with its (destination path, file system root) tuples.

    :param matches: The matches to resolve.
    :param dest_paths: Either the destination definitions accepted by
      `_resolve_dest_paths` or a callable computing the destination of a match.
    :param install_context: Provides the file system root of each package.
    :raises ValueError: If a computed destination ends with "/".
    """
    dest_and_roots: Sequence[Tuple[str, "FSPath"]]
    if not callable(dest_paths):
        for match in matches:
            dest_and_roots = _resolve_dest_paths(
                match,
                dest_paths,
                install_context,
            )
            yield match, dest_and_roots
        return

    for match in matches:
        dpath = dest_paths(match)
        if dpath.endswith("/"):
            raise ValueError(
                f'Provided destination for "{match.path.path}" ended with "/" ("{dpath}"), which it must not!'
            )
        dest_and_roots = [
            (dpath, install_context[pkg.name].fs_root) for pkg in match.into
        ]
        yield match, dest_and_roots

571 

572 

573class InstallRule(DebputyDispatchableType): 

    # NOTE(review): `__init__` only assigns `_condition`, `_definition_source`
    # and `_match_filter`. The slots `_already_matched` and
    # `_exact_match_request` are not assigned in the visible part of this
    # class (attributes with these names are maintained by
    # `SourcePathMatcher`) - confirm whether they are still needed here.
    __slots__ = (
        "_already_matched",
        "_exact_match_request",
        "_condition",
        "_match_filter",
        "_definition_source",
    )

581 

582 def __init__( 

583 self, 

584 condition: Optional[ManifestCondition], 

585 definition_source: str, 

586 *, 

587 match_filter: Optional[Callable[["VirtualPath"], bool]] = None, 

588 ) -> None: 

589 super().__init__() 

590 self._condition = condition 

591 self._definition_source = definition_source 

592 self._match_filter = match_filter 

593 

594 def _check_single_match( 

595 self, source: FileSystemMatchRule, matches: List[PathMatch] 

596 ) -> None: 

597 seen_pkgs = set() 

598 problem_pkgs = frozenset() 

599 for m in matches: 

600 problem_pkgs = seen_pkgs & m.into 

601 if problem_pkgs: 601 ↛ 602line 601 didn't jump to line 602 because the condition on line 601 was never true

602 break 

603 seen_pkgs.update(problem_pkgs) 

604 if problem_pkgs: 604 ↛ 605line 604 didn't jump to line 605 because the condition on line 604 was never true

605 pkg_names = ", ".join(sorted(p.name for p in problem_pkgs)) 

606 _error( 

607 f'The pattern "{source.raw_match_rule}" matched multiple entries for the packages: {pkg_names}.' 

608 "However, it should matched exactly one item. Please tighten the pattern defined" 

609 f" in {self._definition_source}" 

610 ) 

611 

    def _match_pattern(
        self,
        path_matcher: SourcePathMatcher,
        fs_match_rule: FileSystemMatchRule,
        condition_context: ConditionContext,
        search_dirs: Sequence[SearchDir],
        into: FrozenSet[BinaryPackage],
    ) -> List[PathMatch]:
        """Resolve one pattern and claim its matches in `path_matcher`.

        :param path_matcher: Tracks the reserved and discarded paths.
        :param fs_match_rule: The pattern to resolve. A raw pattern ending with
          "/" only matches directories.
        :param condition_context: Context for evaluating the rule's condition.
        :param search_dirs: The directories to search.
        :param into: The packages to install into. The `else` branch below
          treats an empty set as a discard rule.
        :return: The matches that were claimed (possibly empty when an empty
          match is acceptable).
        :raises NoMatchForInstallPatternError: If nothing matched and an empty
          match is not acceptable.
        """
        (matched, exclude_counts) = path_matcher.find_and_reserve_all_matches(
            fs_match_rule.match_rule,
            search_dirs,
            fs_match_rule.raw_match_rule.endswith("/"),
            self._match_filter,
            into,
            self._definition_source,
        )

        already_installed_paths, already_excluded_paths = exclude_counts

        if into:
            # No match is needed when none of the packages will be acted on.
            allow_empty_match = all(not p.should_be_acted_on for p in into)
        else:
            # discard rules must match provided at least one search dir exist. If none of them
            # exist, then we assume the discard rule is for a package that will not be built
            # NOTE(review): as written, an empty match is allowed when a search
            # dir DOES exist, which reads like the opposite of the comment
            # above - confirm whether `not any(...)` was intended.
            allow_empty_match = any(s.search_dir.is_dir for s in search_dirs)
        if self._condition is not None and not self._condition.evaluate(
            condition_context
        ):
            # A rule whose condition is false is allowed to match nothing.
            allow_empty_match = True

        if not matched and not allow_empty_match:
            search_dir_text = ", ".join(x.search_dir.fs_path for x in search_dirs)
            # Pick the message that best explains why nothing was left to match.
            if already_excluded_paths and already_installed_paths:
                total_paths = already_excluded_paths + already_installed_paths
                msg = (
                    f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} after ignoring"
                    f" {total_paths} path(s) already been matched previously either by install or"
                    f" exclude rules. If you wanted to install some of these paths into multiple"
                    f" packages, please tweak the definition that installed them to install them"
                    f' into multiple packages (usually change "into: foo" to "into: [foo, bar]".'
                    f" If you wanted to install these paths and exclude rules are getting in your"
                    f" way, then please move this install rule before the exclusion rule that causes"
                    f" issue or, in case of built-in excludes, list the paths explicitly (without"
                    f" using patterns). Source for this issue is {self._definition_source}. Match rule:"
                    f" {fs_match_rule.match_rule.describe_match_exact()}"
                )
            elif already_excluded_paths:
                msg = (
                    f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} after ignoring"
                    f" {already_excluded_paths} path(s) that have been excluded."
                    " If you wanted to install some of these paths, please move the install rule"
                    " before the exclusion rule or, in case of built-in excludes, list the paths explicitly"
                    f" (without using patterns). Source for this issue is {self._definition_source}. Match rule:"
                    f" {fs_match_rule.match_rule.describe_match_exact()}"
                )
            elif already_installed_paths:
                msg = (
                    f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} after ignoring"
                    f" {already_installed_paths} path(s) already been matched previously."
                    " If you wanted to install some of these paths into multiple packages,"
                    f" please tweak the definition that installed them to install them into"
                    f' multiple packages (usually change "into: foo" to "into: [foo, bar]".'
                    f" Source for this issue is {self._definition_source}. Match rule:"
                    f" {fs_match_rule.match_rule.describe_match_exact()}"
                )
            else:
                # TODO: Try harder to find the match and point out possible typos
                msg = (
                    f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} (definition:"
                    f" {self._definition_source}). Match rule: {fs_match_rule.match_rule.describe_match_exact()}"
                )
            raise NoMatchForInstallPatternError(
                msg,
                fs_match_rule,
                search_dirs,
                self._definition_source,
            )
        return matched

690 

691 def _install_matches( 

692 self, 

693 path_matcher: SourcePathMatcher, 

694 matches: List[PathMatch], 

695 dest_paths: Union[Sequence[Tuple[str, bool]], Callable[[PathMatch], str]], 

696 install_context: "InstallRuleContext", 

697 into: FrozenSet[BinaryPackage], 

698 condition_context: ConditionContext, 

699 ) -> None: 

700 if ( 

701 self._condition is not None 

702 and not self._condition.evaluate(condition_context) 

703 ) or not any(p.should_be_acted_on for p in into): 

704 # Rule is disabled; skip all its actions - also allow empty matches 

705 # for this particular case. Though, we do go through the match and 

706 # ensure all paths are reserved correctly such that the behavior 

707 # between `binary-arch` vs `binary-indep` vs `binary` is consistent 

708 for match in matches: 

709 self._reserve_recursively( 

710 path_matcher, 

711 match, 

712 into, 

713 ) 

714 

715 return 

716 

717 if not matches: 717 ↛ 718line 717 didn't jump to line 718 because the condition on line 717 was never true

718 raise ValueError("matches must not be empty") 

719 

720 for match, dest_paths_and_roots in _resolve_matches( 

721 matches, 

722 dest_paths, 

723 install_context, 

724 ): 

725 install_recursively_into_dirs = [] 

726 for dest, fs_root in dest_paths_and_roots: 

727 dir_part, basename = os.path.split(dest) 

728 # We do not associate these with the FS path. First off, 

729 # it is complicated to do in most cases (indeed, debhelper 

730 # does not preserve these directories either) and secondly, 

731 # it is "only" mtime and mode - mostly irrelevant as the 

732 # directory is 99.9% likely to be 0755 (we are talking 

733 # directories like "/usr", "/usr/share"). 

734 dir_path = fs_root.mkdirs(dir_part) 

735 existing_path = dir_path.get(basename) 

736 

737 if match.path.is_dir: 

738 if existing_path is not None and not existing_path.is_dir: 738 ↛ 739line 738 didn't jump to line 739 because the condition on line 738 was never true

739 existing_path.unlink() 

740 existing_path = None 

741 current_dir = existing_path 

742 

743 if current_dir is None: 743 ↛ 747line 743 didn't jump to line 747 because the condition on line 743 was always true

744 current_dir = dir_path.mkdir( 

745 basename, reference_path=match.path 

746 ) 

747 install_recursively_into_dirs.append(current_dir) 

748 else: 

749 if existing_path is not None and existing_path.is_dir: 749 ↛ 750line 749 didn't jump to line 750 because the condition on line 749 was never true

750 _error( 

751 f"Cannot install {match.path} ({match.path.fs_path}) as {dest}. That path already exist" 

752 f" and is a directory. This error was triggered via {self._definition_source}." 

753 ) 

754 

755 if match.path.is_symlink: 

756 dir_path.add_symlink( 

757 basename, match.path.readlink(), reference_path=match.path 

758 ) 

759 else: 

760 dir_path.insert_file_from_fs_path( 

761 basename, 

762 match.path.fs_path, 

763 follow_symlinks=False, 

764 use_fs_path_mode=True, 

765 reference_path=match.path, 

766 ) 

767 if install_recursively_into_dirs: 

768 self._install_dir_recursively( 

769 path_matcher, 

770 install_recursively_into_dirs, 

771 match, 

772 into, 

773 ) 

774 

775 def _reserve_recursively( 

776 self, 

777 path_matcher: SourcePathMatcher, 

778 match: PathMatch, 

779 into: FrozenSet[BinaryPackage], 

780 ) -> None: 

781 direct_matched_path = match.path 

782 path_matcher.reserve( 

783 direct_matched_path, 

784 into, 

785 self._definition_source, 

786 is_exact_match=match.is_exact_match, 

787 ) 

788 if not direct_matched_path.is_dir: 788 ↛ 789line 788 didn't jump to line 789 because the condition on line 788 was never true

789 return 

790 stack = list(direct_matched_path.iterdir) 

791 while stack: 

792 current_path = stack.pop() 

793 path_matcher.reserve( 

794 current_path, 

795 into, 

796 self._definition_source, 

797 is_exact_match=False, 

798 ) 

799 if current_path.is_dir: 

800 stack.extend(current_path.iterdir) 

801 

802 def _install_dir_recursively( 

803 self, 

804 path_matcher: SourcePathMatcher, 

805 parent_dirs: Sequence[FSPath], 

806 match: PathMatch, 

807 into: FrozenSet[BinaryPackage], 

808 ) -> None: 

809 stack = [ 

810 (parent_dirs, e) 

811 for e in match.path.iterdir 

812 if not path_matcher.is_reserved(e) 

813 ] 

814 

815 while stack: 

816 current_dirs, dir_entry = stack.pop() 

817 path_matcher.reserve( 

818 dir_entry, 

819 into, 

820 self._definition_source, 

821 is_exact_match=False, 

822 ) 

823 if dir_entry.is_dir: 823 ↛ 824line 823 didn't jump to line 824 because the condition on line 823 was never true

824 new_dirs = [ 

825 d.mkdir(dir_entry.name, reference_path=dir_entry) 

826 for d in current_dirs 

827 ] 

828 stack.extend( 

829 (new_dirs, de) 

830 for de in dir_entry.iterdir 

831 if not path_matcher.is_reserved(de) 

832 ) 

833 elif dir_entry.is_symlink: 

834 for current_dir in current_dirs: 

835 current_dir.add_symlink( 

836 dir_entry.name, 

837 dir_entry.readlink(), 

838 reference_path=dir_entry, 

839 ) 

840 elif dir_entry.is_file: 840 ↛ 850line 840 didn't jump to line 850 because the condition on line 840 was always true

841 for current_dir in current_dirs: 

842 current_dir.insert_file_from_fs_path( 

843 dir_entry.name, 

844 dir_entry.fs_path, 

845 use_fs_path_mode=True, 

846 follow_symlinks=False, 

847 reference_path=dir_entry, 

848 ) 

849 else: 

850 _error( 

851 f"Unsupported file type: {dir_entry.fs_path} - neither a file, directory or symlink" 

852 ) 

853 

    def perform_install(
        self,
        path_matcher: SourcePathMatcher,
        install_context: InstallRuleContext,
        condition_context: ConditionContext,
    ) -> None:
        """Perform the installation described by this rule.

        This base implementation always raises NotImplementedError.

        :param path_matcher: Tracks the reserved and discarded paths.
        :param install_context: Search directories and per-package contexts.
        :param condition_context: Context for evaluating the rule's condition.
        :raises NotImplementedError: Always.
        """
        raise NotImplementedError

861 

@classmethod
def install_as(
    cls,
    source: FileSystemMatchRule,
    dest_path: str,
    into: FrozenSet[BinaryPackage],
    definition_source: str,
    condition: Optional[ManifestCondition],
) -> "InstallRule":
    """Create a rule installing a single source at `dest_path`.

    The destination is passed as a literal path (its "is format" flag is
    False) and the rule is created with `require_single_match=True`.

    :param source: The match rule for the path to install.
    :param dest_path: The destination path.
    :param into: The packages to install into.
    :param definition_source: Where the rule was defined.
    :param condition: Optional condition attached to the rule.
    :return: The resulting install rule.
    """
    sources = [source]
    literal_dest = (dest_path, False)
    return GenericInstallationRule(
        sources,
        [literal_dest],
        into,
        condition,
        definition_source,
        require_single_match=True,
    )

879 

@classmethod
def install_dest(
    cls,
    sources: Sequence[FileSystemMatchRule],
    dest_dir: Optional[str],
    into: FrozenSet[BinaryPackage],
    definition_source: str,
    condition: Optional[ManifestCondition],
) -> "InstallRule":
    """Create a rule installing the sources into a directory.

    The destination is a format string: `{dirname}/{basename}` when no
    `dest_dir` is given, otherwise `dest_dir` joined with `{basename}`.

    :param sources: The match rules for the paths to install.
    :param dest_dir: The destination directory, or None to use the
      `{dirname}` placeholder instead.
    :param into: The packages to install into.
    :param definition_source: Where the rule was defined.
    :param condition: Optional condition attached to the rule.
    :return: The resulting install rule.
    """
    dest_format = (
        "{dirname}/{basename}"
        if dest_dir is None
        else os.path.join(dest_dir, "{basename}")
    )
    return GenericInstallationRule(
        sources,
        [(dest_format, True)],
        into,
        condition,
        definition_source,
    )

900 

@classmethod
def install_multi_as(
    cls,
    source: FileSystemMatchRule,
    dest_paths: Sequence[str],
    into: FrozenSet[BinaryPackage],
    definition_source: str,
    condition: Optional[ManifestCondition],
) -> "InstallRule":
    """Create a rule installing a single source at several destinations.

    Each destination is passed as a literal path (its "is format" flag is
    False) and the rule is created with `require_single_match=True`.

    :param source: The match rule for the path to install.
    :param dest_paths: The destination paths; at least two are required.
    :param into: The packages to install into.
    :param definition_source: Where the rule was defined.
    :param condition: Optional condition attached to the rule.
    :return: The resulting install rule.
    :raises ValueError: If fewer than two destination paths are given.
    """
    enough_destinations = len(dest_paths) >= 2
    if not enough_destinations:
        raise ValueError(
            "Please use `install_as` when there is less than 2 dest path"
        )

    literal_dests = tuple([(path, False) for path in dest_paths])
    return GenericInstallationRule(
        [source],
        literal_dests,
        into,
        condition,
        definition_source,
        require_single_match=True,
    )

923 

@classmethod
def install_multi_dest(
    cls,
    sources: Sequence[FileSystemMatchRule],
    dest_dirs: Sequence[str],
    into: FrozenSet[BinaryPackage],
    definition_source: str,
    condition: Optional[ManifestCondition],
) -> "InstallRule":
    """Create a rule installing the sources into several directories.

    Every directory is turned into a format string by joining it with the
    `{basename}` placeholder.

    :param sources: The match rules for the paths to install.
    :param dest_dirs: The destination directories; at least two are required.
    :param into: The packages to install into.
    :param definition_source: Where the rule was defined.
    :param condition: Optional condition attached to the rule.
    :return: The resulting install rule.
    :raises ValueError: If fewer than two destination directories are given.
    """
    enough_destinations = len(dest_dirs) >= 2
    if not enough_destinations:
        raise ValueError(
            "Please use `install_dest` when there is less than 2 dest dir"
        )

    dest_formats = tuple(
        [(os.path.join(directory, "{basename}"), True) for directory in dest_dirs]
    )
    return GenericInstallationRule(
        sources,
        dest_formats,
        into,
        condition,
        definition_source,
    )

945 

@classmethod
def install_doc(
    cls,
    sources: Sequence[FileSystemMatchRule],
    dest_dir: Optional[str],
    into: FrozenSet[BinaryPackage],
    definition_source: str,
    condition: Optional[ManifestCondition],
) -> "InstallRule":
    """Create a rule installing documentation.

    The rule is always guarded by `_BUILD_DOCS_BDO`; a caller supplied
    condition is combined with it via `ManifestCondition.all_of`.

    :param sources: The match rules for the paths to install.
    :param dest_dir: The destination, or None to use the default
      `usr/share/doc/{doc_main_package_name}/{basename}` format.
    :param into: The packages to install into.
    :param definition_source: Where the rule was defined.
    :param condition: Optional extra condition for the rule.
    :return: The resulting install rule.
    """
    if condition is None:
        cond: ManifestCondition = _BUILD_DOCS_BDO
    else:
        cond = ManifestCondition.all_of([_BUILD_DOCS_BDO, condition])

    if dest_dir is None:
        destination = ("usr/share/doc/{doc_main_package_name}/{basename}", True)
    else:
        # NOTE(review): unlike `install_dest`, an explicit `dest_dir` is passed
        # through verbatim - no "{basename}" is appended and it is not flagged
        # as a format string.  Confirm that this is intended.
        destination = (dest_dir, False)

    return GenericInstallationRule(
        sources,
        [destination],
        into,
        cond,
        definition_source,
    )

970 

@classmethod
def install_doc_as(
    cls,
    source: FileSystemMatchRule,
    dest_path: str,
    into: FrozenSet[BinaryPackage],
    definition_source: str,
    condition: Optional[ManifestCondition],
) -> "InstallRule":
    """Create a rule installing a single documentation path at `dest_path`.

    The rule is always guarded by `_BUILD_DOCS_BDO`; a caller supplied
    condition is combined with it via `ManifestCondition.all_of`.  The
    destination is passed as a literal path and the rule is created with
    `require_single_match=True`.

    :param source: The match rule for the path to install.
    :param dest_path: The destination path.
    :param into: The packages to install into.
    :param definition_source: Where the rule was defined.
    :param condition: Optional extra condition for the rule.
    :return: The resulting install rule.
    """
    if condition is None:
        cond: ManifestCondition = _BUILD_DOCS_BDO
    else:
        cond = ManifestCondition.all_of([_BUILD_DOCS_BDO, condition])

    literal_dest = (dest_path, False)
    return GenericInstallationRule(
        [source],
        [literal_dest],
        into,
        cond,
        definition_source,
        require_single_match=True,
    )

992 

@classmethod
def install_examples(
    cls,
    sources: Sequence[FileSystemMatchRule],
    into: FrozenSet[BinaryPackage],
    definition_source: str,
    condition: Optional[ManifestCondition],
) -> "InstallRule":
    """Create a rule installing examples.

    The destination is the fixed format
    `usr/share/doc/{doc_main_package_name}/examples/{basename}`.  The rule is
    always guarded by `_BUILD_DOCS_BDO`; a caller supplied condition is
    combined with it via `ManifestCondition.all_of`.

    :param sources: The match rules for the paths to install.
    :param into: The packages to install into.
    :param definition_source: Where the rule was defined.
    :param condition: Optional extra condition for the rule.
    :return: The resulting install rule.
    """
    if condition is None:
        cond: ManifestCondition = _BUILD_DOCS_BDO
    else:
        cond = ManifestCondition.all_of([_BUILD_DOCS_BDO, condition])

    examples_dest = (
        "usr/share/doc/{doc_main_package_name}/examples/{basename}",
        True,
    )
    return GenericInstallationRule(
        sources,
        [examples_dest],
        into,
        cond,
        definition_source,
    )

1011 

@classmethod
def install_man(
    cls,
    sources: Sequence[FileSystemMatchRule],
    into: FrozenSet[BinaryPackage],
    section: Optional[int],
    language: Optional[str],
    definition_source: str,
    condition: Optional[ManifestCondition],
) -> "InstallRule":
    """Create a rule installing manpages.

    The destination of each match is computed by the callable returned from
    `_dest_path_for_manpage(section, language, definition_source)`.  The rule
    is always guarded by `_BUILD_DOCS_BDO`; a caller supplied condition is
    combined with it via `ManifestCondition.all_of`.

    :param sources: The match rules for the manpages to install.
    :param into: The packages to install into.
    :param section: Optional manpage section, forwarded to
      `_dest_path_for_manpage`.
    :param language: Optional language, forwarded to
      `_dest_path_for_manpage`.
    :param definition_source: Where the rule was defined.
    :param condition: Optional extra condition for the rule.
    :return: The resulting install rule.
    """

    # True for anything that is not a regular file.
    # NOTE(review): presumably `match_filter` selects the paths to *skip*, so
    # that only files are installed - confirm against `InstallRule`.
    def _not_a_file(candidate: "VirtualPath") -> bool:
        return not candidate.is_file

    if condition is None:
        cond: ManifestCondition = _BUILD_DOCS_BDO
    else:
        cond = ManifestCondition.all_of([_BUILD_DOCS_BDO, condition])

    return GenericInstallationRule(
        sources,
        _dest_path_for_manpage(section, language, definition_source),
        into,
        cond,
        definition_source,
        match_filter=_not_a_file,
    )

1038 

@classmethod
def discard_paths(
    cls,
    paths: Sequence[FileSystemMatchRule],
    definition_source: str,
    condition: Optional[ManifestCondition],
    *,
    limit_to: Optional[Sequence[FileSystemExactMatchRule]] = None,
) -> "InstallRule":
    """Create a `DiscardRule` for the given paths.

    :param paths: The match rules for the paths to discard.
    :param definition_source: Where the rule was defined.
    :param condition: Optional condition attached to the rule.
    :param limit_to: Optional exact-match rules forwarded to the
      `DiscardRule` as a tuple; None is forwarded as an empty tuple.
    :return: The resulting discard rule.
    """
    limited_dirs = () if limit_to is None else tuple(limit_to)
    return DiscardRule(
        paths,
        condition,
        limited_dirs,
        definition_source,
    )

1054 

1055 

class PPFInstallRule(InstallRule):
    """Install rule for the packager provided files of one binary package."""

    __slots__ = (
        "_ppfs",
        "_substitution",
        "_into",
    )

    # noinspection PyMissingConstructor
    def __init__(
        self,
        into: BinaryPackage,
        substitution: Substitution,
        ppfs: Sequence["PackagerProvidedFile"],
    ) -> None:
        """Set up the rule.

        The base class initializer is invoked through
        `run_in_context_of_plugin` on behalf of "debputy", without a
        condition and with a built-in definition source.

        :param into: The package the files are installed into.
        :param substitution: Stored on the rule.
        :param ppfs: The packager provided files to install.
        """
        base_init = super().__init__
        run_in_context_of_plugin(
            "debputy",
            base_init,
            None,
            "<built-in; PPF install rule>",
        )
        self._into = into
        self._ppfs = ppfs
        self._substitution = substitution

    def perform_install(
        self,
        path_matcher: SourcePathMatcher,
        install_context: InstallRuleContext,
        condition_context: ConditionContext,
    ) -> None:
        """Insert every packager provided file below the package's fs root.

        For each file, the destination directory and name come from
        `compute_dest()`; the directory is created with `mkdirs` and the file
        is inserted with the definition's `default_mode`.
        """
        package_context = install_context[self._into.name]
        root = package_context.fs_root
        for provided_file in self._ppfs:
            origin = provided_file.path.fs_path
            target_dir, target_name = provided_file.compute_dest()
            parent = root.mkdirs(target_dir)

            parent.insert_file_from_fs_path(
                target_name,
                origin,
                follow_symlinks=True,
                use_fs_path_mode=False,
                mode=provided_file.definition.default_mode,
            )

1100 

1101 

class GenericInstallationRule(InstallRule):
    """Install rule that matches source patterns and installs the matches."""

    __slots__ = (
        "_sources",
        "_into",
        "_dest_paths",
        "_require_single_match",
    )

    def __init__(
        self,
        sources: Sequence[FileSystemMatchRule],
        dest_paths: Union[Sequence[Tuple[str, bool]], Callable[[PathMatch], str]],
        into: FrozenSet[BinaryPackage],
        condition: Optional[ManifestCondition],
        definition_source: str,
        *,
        require_single_match: bool = False,
        match_filter: Optional[Callable[["VirtualPath"], bool]] = None,
    ) -> None:
        """Set up the rule.

        :param sources: The match rules for the paths to install.
        :param dest_paths: Either a sequence of (path, is-format) pairs or a
          callable computing the destination from a match.
        :param into: The packages to install into.
        :param condition: Optional condition, forwarded to the base class.
        :param definition_source: Where the rule was defined.
        :param require_single_match: Requires `sources` to hold exactly one
          entry; at install time, more than one match for it triggers
          `_check_single_match`.
        :param match_filter: Optional filter, forwarded to the base class.
        :raises ValueError: If `require_single_match` is set and `sources`
          does not have exactly one entry.
        """
        super().__init__(
            condition,
            definition_source,
            match_filter=match_filter,
        )
        self._sources = sources
        self._into = into
        self._dest_paths = dest_paths
        self._require_single_match = require_single_match
        if require_single_match:
            if len(sources) != 1:
                raise ValueError(
                    "require_single_match implies sources must have len 1"
                )

    def perform_install(
        self,
        path_matcher: SourcePathMatcher,
        install_context: InstallRuleContext,
        condition_context: ConditionContext,
    ) -> None:
        """Match each source in turn and install whatever it matched."""
        packages = self._into
        single_match_only = self._require_single_match
        for pattern in self._sources:
            found = self._match_pattern(
                path_matcher,
                pattern,
                condition_context,
                install_context.search_dirs,
                packages,
            )
            # Only consult the single-match check when there is an ambiguity.
            if single_match_only and len(found) > 1:
                self._check_single_match(pattern, found)
            self._install_matches(
                path_matcher,
                found,
                self._dest_paths,
                install_context,
                packages,
                condition_context,
            )

1157 

1158 

class DiscardRule(InstallRule):
    """Rule that matches paths on behalf of no package at all."""

    __slots__ = ("_fs_match_rules", "_limit_to")

    def __init__(
        self,
        fs_match_rules: Sequence[FileSystemMatchRule],
        condition: Optional[ManifestCondition],
        limit_to: Sequence[FileSystemExactMatchRule],
        definition_source: str,
    ) -> None:
        """Set up the rule.

        :param fs_match_rules: The match rules for the paths to discard.
        :param condition: Optional condition, forwarded to the base class.
        :param limit_to: Search directories the rule is restricted to; an
          empty sequence means all search directories are used.
        :param definition_source: Where the rule was defined.
        """
        super().__init__(condition, definition_source)
        self._fs_match_rules = fs_match_rules
        self._limit_to = limit_to

    def perform_install(
        self,
        path_matcher: SourcePathMatcher,
        install_context: InstallRuleContext,
        condition_context: ConditionContext,
    ) -> None:
        """Run every match rule against the selected search directories.

        The matching is done with an empty package set.  When `limit_to` is
        non-empty, only the search directories it names are used, and any
        named directory that is not among the known search directories is
        reported via `_error`.
        """
        into = frozenset()
        limit_to = self._limit_to
        if limit_to:
            wanted = {x.match_rule.path for x in limit_to}
            search_dirs = tuple(
                s
                for s in install_context.search_dirs
                if s.search_dir.fs_path in wanted
            )
            # Decide based on the actual set difference rather than on
            # lengths: duplicate `limit_to` entries must not trigger a bogus
            # error with an empty path list, and an unknown directory must not
            # be masked by two search dirs sharing the same path.
            unknown = wanted.difference(s.search_dir.fs_path for s in search_dirs)
            if unknown:
                # Sorted, so the message does not depend on set ordering.
                paths = ":".join(sorted(unknown))
                _error(
                    f"The discard rule defined at {self._definition_source} mentions the following"
                    f" search directories that were not known to debputy: {paths}."
                    " Either the search dir is missing somewhere else or it should be removed from"
                    " the discard rule."
                )
        else:
            search_dirs = install_context.search_dirs

        for fs_match_rule in self._fs_match_rules:
            self._match_pattern(
                path_matcher,
                fs_match_rule,
                condition_context,
                search_dirs,
                into,
            )