Coverage for src/debputy/installations.py: 67%

516 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-10-12 15:06 +0000

1import collections 

2import dataclasses 

3import os.path 

4import re 

5from enum import IntEnum 

6from typing import ( 

7 List, 

8 Dict, 

9 FrozenSet, 

10 Union, 

11 Tuple, 

12 Set, 

13 Optional, 

14 TYPE_CHECKING, 

15 cast, 

16 Any, 

17) 

18from collections.abc import Callable, Iterator, Sequence, Iterable, Mapping 

19 

20from debputy.exceptions import DebputyRuntimeError 

21from debputy.filesystem_scan import FSPath 

22from debputy.manifest_conditions import ( 

23 ConditionContext, 

24 ManifestCondition, 

25 _BUILD_DOCS_BDO, 

26) 

27from debputy.manifest_parser.base_types import ( 

28 FileSystemMatchRule, 

29 FileSystemExactMatchRule, 

30) 

31from debputy.manifest_parser.tagging_types import DebputyDispatchableType 

32from debputy.packages import BinaryPackage 

33from debputy.path_matcher import MatchRule, ExactFileSystemPath, MATCH_ANYTHING 

34from debputy.plugin.plugin_state import run_in_context_of_plugin 

35from debputy.substitution import Substitution 

36from debputy.util import _error, _warn 

37 

38if TYPE_CHECKING: 

39 from debputy.packager_provided_files import PackagerProvidedFile 

40 from debputy.plugin.api import VirtualPath 

41 from debputy.plugin.api.impl_types import PluginProvidedDiscardRule 

42 

43 

44_MAN_TH_LINE = re.compile(r'^[.]TH\s+\S+\s+"?(\d+[^"\s]*)"?') 

45_MAN_DT_LINE = re.compile(r"^[.]Dt\s+\S+\s+(\d+\S*)") 

46_MAN_SECTION_BASENAME = re.compile(r"[.]([1-9]\w*)(?:[.]gz)?$") 

47_MAN_REAL_SECTION = re.compile(r"^(\d+)") 

48_MAN_INST_BASENAME = re.compile(r"[.][^.]+$") 

49MAN_GUESS_LANG_FROM_PATH = re.compile( 

50 r"(?:^|/)man/(?:([a-z][a-z](?:_[A-Z][A-Z])?)(?:\.[^/]+)?)?/man[1-9]/" 

51) 

52MAN_GUESS_FROM_BASENAME = re.compile(r"[.]([a-z][a-z](?:_[A-Z][A-Z])?)[.](?:[1-9]|man)") 

53 

54 

55class InstallRuleError(DebputyRuntimeError): 

56 pass 

57 

58 

59class PathAlreadyInstalledOrDiscardedError(InstallRuleError): 

60 @property 

61 def path(self) -> str: 

62 return cast("str", self.args[0]) 

63 

64 @property 

65 def into(self) -> frozenset[BinaryPackage]: 

66 return cast("FrozenSet[BinaryPackage]", self.args[1]) 

67 

68 @property 

69 def definition_source(self) -> str: 

70 return cast("str", self.args[2]) 

71 

72 

73class ExactPathMatchTwiceError(InstallRuleError): 

74 @property 

75 def path(self) -> str: 

76 return cast("str", self.args[1]) 

77 

78 @property 

79 def into(self) -> BinaryPackage: 

80 return cast("BinaryPackage", self.args[2]) 

81 

82 @property 

83 def definition_source(self) -> str: 

84 return cast("str", self.args[3]) 

85 

86 

87class NoMatchForInstallPatternError(InstallRuleError): 

88 @property 

89 def pattern(self) -> str: 

90 return cast("str", self.args[1]) 

91 

92 @property 

93 def search_dirs(self) -> Sequence["SearchDir"]: 

94 return cast("Sequence[SearchDir]", self.args[2]) 

95 

96 @property 

97 def definition_source(self) -> str: 

98 return cast("str", self.args[3]) 

99 

100 

101@dataclasses.dataclass(slots=True, frozen=True) 

102class SearchDir: 

103 search_dir: "VirtualPath" 

104 applies_to: frozenset[BinaryPackage] 

105 

106 

107@dataclasses.dataclass(slots=True, frozen=True) 

108class BinaryPackageInstallRuleContext: 

109 binary_package: BinaryPackage 

110 fs_root: FSPath 

111 doc_main_package: BinaryPackage 

112 

113 def replace(self, **changes: Any) -> "BinaryPackageInstallRuleContext": 

114 return dataclasses.replace(self, **changes) 

115 

116 

117@dataclasses.dataclass(slots=True, frozen=True) 

118class InstallSearchDirContext: 

119 search_dirs: Sequence[SearchDir] 

120 check_for_uninstalled_dirs: Sequence["VirtualPath"] 

121 # TODO: Support search dirs per-package 

122 debian_pkg_dirs: Mapping[str, "VirtualPath"] = dataclasses.field( 

123 default_factory=dict 

124 ) 

125 

126 

127@dataclasses.dataclass(slots=True) 

128class InstallRuleContext: 

129 # TODO: Search dirs should be per-package 

130 search_dirs: Sequence[SearchDir] 

131 binary_package_contexts: dict[str, BinaryPackageInstallRuleContext] = ( 

132 dataclasses.field(default_factory=dict) 

133 ) 

134 

135 def __getitem__(self, item: str) -> BinaryPackageInstallRuleContext: 

136 return self.binary_package_contexts[item] 

137 

138 def __setitem__(self, key: str, value: BinaryPackageInstallRuleContext) -> None: 

139 self.binary_package_contexts[key] = value 

140 

141 def replace(self, **changes: Any) -> "InstallRuleContext": 

142 return dataclasses.replace(self, **changes) 

143 

144 

145@dataclasses.dataclass(slots=True, frozen=True) 

146class PathMatch: 

147 path: "VirtualPath" 

148 search_dir: "VirtualPath" 

149 is_exact_match: bool 

150 into: frozenset[BinaryPackage] 

151 

152 

153class DiscardState(IntEnum): 

154 UNCHECKED = 0 

155 NOT_DISCARDED = 1 

156 DISCARDED_BY_PLUGIN_PROVIDED_RULE = 2 

157 DISCARDED_BY_MANIFEST_RULE = 3 

158 

159 

160def _determine_manpage_section( 

161 match_rule: PathMatch, 

162 provided_section: int | None, 

163 definition_source: str, 

164) -> str | None: 

165 section = str(provided_section) if provided_section is not None else None 

166 if section is None: 

167 detected_section = None 

168 with open(match_rule.path.fs_path) as fd: 

169 for line in fd: 

170 if not line.startswith((".TH", ".Dt")): 

171 continue 

172 

173 m = _MAN_DT_LINE.match(line) 

174 if not m: 

175 m = _MAN_TH_LINE.match(line) 

176 if not m: 

177 continue 

178 detected_section = m.group(1) 

179 if "." in detected_section: 

180 _warn( 

181 f"Ignoring detected section {detected_section} in {match_rule.path.fs_path}" 

182 f" (detected via {definition_source}): It looks too much like a version" 

183 ) 

184 detected_section = None 

185 break 

186 if detected_section is None: 

187 m = _MAN_SECTION_BASENAME.search(os.path.basename(match_rule.path.path)) 

188 if m: 

189 detected_section = m.group(1) 

190 section = detected_section 

191 

192 return section 

193 

194 

195def _determine_manpage_real_section( 

196 match_rule: PathMatch, 

197 section: str | None, 

198 definition_source: str, 

199) -> int: 

200 real_section = None 

201 if section is not None: 

202 m = _MAN_REAL_SECTION.match(section) 

203 if m: 

204 real_section = int(m.group(1)) 

205 if real_section is None or real_section < 0 or real_section > 9: 

206 if real_section is not None: 

207 _warn( 

208 f"Computed section for {match_rule.path.fs_path} was {real_section} (section: {section})," 

209 f" which is not a valid section (must be between 1 and 9 incl.)" 

210 ) 

211 _error( 

212 f"Could not determine the section for {match_rule.path.fs_path} automatically. The man page" 

213 f" was detected via {definition_source}. Consider using `section: <number>` to" 

214 " explicitly declare the section. Keep in mind that it applies to all man pages for that" 

215 " rule and you may have to split the rule into two for this reason." 

216 ) 

217 return real_section 

218 

219 

220def _determine_manpage_language( 

221 match_rule: PathMatch, 

222 provided_language: str | None, 

223) -> str | None: 

224 if provided_language is not None: 

225 if provided_language not in ("derive-from-basename", "derive-from-path"): 

226 return provided_language if provided_language != "C" else None 

227 if provided_language == "derive-from-basename": 

228 m = MAN_GUESS_FROM_BASENAME.search(match_rule.path.name) 

229 if m is None: 

230 return None 

231 return m.group(1) 

232 # Fall-through for derive-from-path case 

233 m = MAN_GUESS_LANG_FROM_PATH.search(match_rule.path.path) 

234 if m is None: 

235 return None 

236 return m.group(1) 

237 

238 

239def _dest_path_for_manpage( 

240 provided_section: int | None, 

241 provided_language: str | None, 

242 definition_source: str, 

243) -> Callable[["PathMatch"], str]: 

244 def _manpage_dest_path(match_rule: PathMatch) -> str: 

245 inst_basename = _MAN_INST_BASENAME.sub("", match_rule.path.name) 

246 section = _determine_manpage_section( 

247 match_rule, provided_section, definition_source 

248 ) 

249 real_section = _determine_manpage_real_section( 

250 match_rule, section, definition_source 

251 ) 

252 assert section is not None 

253 language = _determine_manpage_language(match_rule, provided_language) 

254 if language is None: 

255 maybe_language = "" 

256 else: 

257 maybe_language = f"{language}/" 

258 lang_suffix = f".{language}" 

259 if inst_basename.endswith(lang_suffix): 

260 inst_basename = inst_basename[: -len(lang_suffix)] 

261 

262 return ( 

263 f"usr/share/man/{maybe_language}man{real_section}/{inst_basename}.{section}" 

264 ) 

265 

266 return _manpage_dest_path 

267 

268 

269class SourcePathMatcher: 

270 def __init__(self, auto_discard_rules: list["PluginProvidedDiscardRule"]) -> None: 

271 self._already_matched: dict[ 

272 str, 

273 tuple[frozenset[BinaryPackage], str], 

274 ] = {} 

275 self._exact_match_request: set[tuple[str, str]] = set() 

276 self._discarded: dict[str, DiscardState] = {} 

277 self._auto_discard_rules = auto_discard_rules 

278 self.used_auto_discard_rules: dict[str, set[str]] = collections.defaultdict(set) 

279 

280 def is_reserved(self, path: "VirtualPath") -> bool: 

281 fs_path = path.fs_path 

282 if fs_path in self._already_matched: 

283 return True 

284 result = self._discarded.get(fs_path, DiscardState.UNCHECKED) 

285 if result == DiscardState.UNCHECKED: 

286 result = self._check_plugin_provided_exclude_state_for(path) 

287 if result == DiscardState.NOT_DISCARDED: 

288 return False 

289 

290 return True 

291 

292 def exclude(self, path: str) -> None: 

293 self._discarded[path] = DiscardState.DISCARDED_BY_MANIFEST_RULE 

294 

295 def _run_plugin_provided_discard_rules_on(self, path: "VirtualPath") -> bool: 

296 for dr in self._auto_discard_rules: 

297 verdict = dr.should_discard(path) 

298 if verdict: 

299 self.used_auto_discard_rules[dr.name].add(path.fs_path) 

300 return True 

301 return False 

302 

303 def _check_plugin_provided_exclude_state_for( 

304 self, 

305 path: "VirtualPath", 

306 ) -> DiscardState: 

307 cache_misses = [] 

308 current_path = path 

309 while True: 

310 fs_path = current_path.fs_path 

311 exclude_state = self._discarded.get(fs_path, DiscardState.UNCHECKED) 

312 if exclude_state != DiscardState.UNCHECKED: 

313 verdict = exclude_state 

314 break 

315 cache_misses.append(fs_path) 

316 if self._run_plugin_provided_discard_rules_on(current_path): 

317 verdict = DiscardState.DISCARDED_BY_PLUGIN_PROVIDED_RULE 

318 break 

319 # We cannot trust a "NOT_DISCARDED" until we check its parent (the directory could 

320 # be excluded without the files in it triggering the rule). 

321 parent_dir = current_path.parent_dir 

322 if not parent_dir: 

323 verdict = DiscardState.NOT_DISCARDED 

324 break 

325 current_path = parent_dir 

326 if cache_misses: 326 ↛ 329line 326 didn't jump to line 329 because the condition on line 326 was always true

327 for p in cache_misses: 

328 self._discarded[p] = verdict 

329 return verdict 

330 

331 def may_match( 

332 self, 

333 match: PathMatch, 

334 *, 

335 is_exact_match: bool = False, 

336 ) -> tuple[frozenset[BinaryPackage], bool]: 

337 m = self._already_matched.get(match.path.fs_path) 

338 if m: 338 ↛ 339line 338 didn't jump to line 339 because the condition on line 338 was never true

339 return m[0], False 

340 current_path = match.path.fs_path 

341 discard_state = self._discarded.get(current_path, DiscardState.UNCHECKED) 

342 

343 if discard_state == DiscardState.UNCHECKED: 

344 discard_state = self._check_plugin_provided_exclude_state_for(match.path) 

345 

346 assert discard_state is not None and discard_state != DiscardState.UNCHECKED 

347 

348 is_discarded = discard_state != DiscardState.NOT_DISCARDED 

349 if ( 

350 is_exact_match 

351 and discard_state == DiscardState.DISCARDED_BY_PLUGIN_PROVIDED_RULE 

352 ): 

353 is_discarded = False 

354 return frozenset(), is_discarded 

355 

356 def reserve( 

357 self, 

358 path: "VirtualPath", 

359 reserved_by: frozenset[BinaryPackage], 

360 definition_source: str, 

361 *, 

362 is_exact_match: bool = False, 

363 ) -> None: 

364 fs_path = path.fs_path 

365 self._already_matched[fs_path] = reserved_by, definition_source 

366 if not is_exact_match: 366 ↛ 368line 366 didn't jump to line 368 because the condition on line 366 was always true

367 return 

368 for pkg in reserved_by: 

369 m_key = (pkg.name, fs_path) 

370 self._exact_match_request.add(m_key) 

371 try: 

372 del self._discarded[fs_path] 

373 except KeyError: 

374 pass 

375 for discarded_paths in self.used_auto_discard_rules.values(): 

376 discarded_paths.discard(fs_path) 

377 

378 def detect_missing(self, search_dir: "VirtualPath") -> Iterator["VirtualPath"]: 

379 stack = list(search_dir.iterdir) 

380 while stack: 

381 m = stack.pop() 

382 if m.is_dir: 

383 s_len = len(stack) 

384 stack.extend(m.iterdir) 

385 

386 if s_len == len(stack) and not self.is_reserved(m): 386 ↛ 388line 386 didn't jump to line 388 because the condition on line 386 was never true

387 # "Explicitly" empty dir 

388 yield m 

389 elif not self.is_reserved(m): 

390 yield m 

391 

392 def find_and_reserve_all_matches( 

393 self, 

394 match_rule: MatchRule, 

395 search_dirs: Sequence[SearchDir], 

396 dir_only_match: bool, 

397 match_filter: Callable[["VirtualPath"], bool] | None, 

398 reserved_by: frozenset[BinaryPackage], 

399 definition_source: str, 

400 ) -> tuple[list[PathMatch], tuple[int, ...]]: 

401 matched = [] 

402 already_installed_paths = 0 

403 already_excluded_paths = 0 

404 glob_expand = False if isinstance(match_rule, ExactFileSystemPath) else True 

405 

406 for match in _resolve_path( 

407 match_rule, 

408 search_dirs, 

409 dir_only_match, 

410 match_filter, 

411 reserved_by, 

412 ): 

413 installed_into, excluded = self.may_match( 

414 match, is_exact_match=not glob_expand 

415 ) 

416 if installed_into: 416 ↛ 417line 416 didn't jump to line 417 because the condition on line 416 was never true

417 if glob_expand: 

418 already_installed_paths += 1 

419 continue 

420 packages = ", ".join(p.name for p in installed_into) 

421 raise PathAlreadyInstalledOrDiscardedError( 

422 f'The "{match.path.fs_path}" has been reserved by and installed into {packages}.' 

423 f" The definition that triggered this issue is {definition_source}.", 

424 match, 

425 installed_into, 

426 definition_source, 

427 ) 

428 if excluded: 

429 if glob_expand: 429 ↛ 432line 429 didn't jump to line 432 because the condition on line 429 was always true

430 already_excluded_paths += 1 

431 continue 

432 raise PathAlreadyInstalledOrDiscardedError( 

433 f'The "{match.path.fs_path}" has been excluded. If you want this path installed, move it' 

434 f" above the exclusion rule that excluded it. The definition that triggered this" 

435 f" issue is {definition_source}.", 

436 match, 

437 installed_into, 

438 definition_source, 

439 ) 

440 if not glob_expand: 

441 for pkg in match.into: 

442 m_key = (pkg.name, match.path.fs_path) 

443 if m_key in self._exact_match_request: 443 ↛ 444line 443 didn't jump to line 444 because the condition on line 443 was never true

444 raise ExactPathMatchTwiceError( 

445 f'The path "{match.path.fs_path}" (via exact match) has already been installed' 

446 f" into {pkg.name}. The second installation triggered by {definition_source}", 

447 match.path, 

448 pkg, 

449 definition_source, 

450 ) 

451 self._exact_match_request.add(m_key) 

452 

453 if reserved_by: 453 ↛ 459line 453 didn't jump to line 459 because the condition on line 453 was always true

454 self._already_matched[match.path.fs_path] = ( 

455 match.into, 

456 definition_source, 

457 ) 

458 else: 

459 self.exclude(match.path.fs_path) 

460 matched.append(match) 

461 exclude_counts = already_installed_paths, already_excluded_paths 

462 return matched, exclude_counts 

463 

464 

465def _resolve_path( 

466 match_rule: MatchRule, 

467 search_dirs: Iterable["SearchDir"], 

468 dir_only_match: bool, 

469 match_filter: Callable[["VirtualPath"], bool] | None, 

470 into: frozenset[BinaryPackage], 

471) -> Iterator[PathMatch]: 

472 missing_matches = set(into) 

473 for sdir in search_dirs: 

474 matched = False 

475 if into and missing_matches.isdisjoint(sdir.applies_to): 475 ↛ 477line 475 didn't jump to line 477 because the condition on line 475 was never true

476 # All the packages, where this search dir applies, already got a match 

477 continue 

478 applicable = sdir.applies_to & missing_matches 

479 for matched_path in match_rule.finditer( 

480 sdir.search_dir, 

481 ignore_paths=match_filter, 

482 ): 

483 if dir_only_match and not matched_path.is_dir: 483 ↛ 484line 483 didn't jump to line 484 because the condition on line 483 was never true

484 continue 

485 if matched_path.parent_dir is None: 

486 if match_rule is MATCH_ANYTHING: 486 ↛ 488line 486 didn't jump to line 488 because the condition on line 486 was always true

487 continue 

488 _error( 

489 f"The pattern {match_rule.describe_match_short()} matched the root dir." 

490 ) 

491 yield PathMatch(matched_path, sdir.search_dir, False, applicable) 

492 matched = True 

493 # continue; we want to match everything we can from this search directory. 

494 

495 if matched: 

496 missing_matches -= applicable 

497 if into and not missing_matches: 

498 # For install rules, we can stop as soon as all packages had a match 

499 # For discard rules, all search directories must be visited. Otherwise, 

500 # you would have to repeat the discard rule once per search dir to be 

501 # sure something is fully discarded 

502 break 

503 

504 

505def _resolve_dest_paths( 

506 match: PathMatch, 

507 dest_paths: Sequence[tuple[str, bool]], 

508 install_context: "InstallRuleContext", 

509) -> Sequence[tuple[str, "FSPath"]]: 

510 dest_and_roots = [] 

511 for dest_path, dest_path_is_format in dest_paths: 

512 if dest_path_is_format: 

513 for pkg in match.into: 

514 parent_dir = match.path.parent_dir 

515 pkg_install_context = install_context[pkg.name] 

516 fs_root = pkg_install_context.fs_root 

517 dpath = dest_path.format( 

518 basename=match.path.name, 

519 dirname=parent_dir.path if parent_dir is not None else "", 

520 package_name=pkg.name, 

521 doc_main_package_name=pkg_install_context.doc_main_package.name, 

522 ) 

523 if dpath.endswith("/"): 523 ↛ 524line 523 didn't jump to line 524 because the condition on line 523 was never true

524 raise ValueError( 

525 f'Provided destination (when resolved for {pkg.name}) for "{match.path.path}" ended' 

526 f' with "/" ("{dest_path}"), which it must not!' 

527 ) 

528 dest_and_roots.append((dpath, fs_root)) 

529 else: 

530 if dest_path.endswith("/"): 530 ↛ 531line 530 didn't jump to line 531 because the condition on line 530 was never true

531 raise ValueError( 

532 f'Provided destination for "{match.path.path}" ended with "/" ("{dest_path}"),' 

533 " which it must not!" 

534 ) 

535 dest_and_roots.extend( 

536 (dest_path, install_context[pkg.name].fs_root) for pkg in match.into 

537 ) 

538 return dest_and_roots 

539 

540 

541def _resolve_matches( 

542 matches: list[PathMatch], 

543 dest_paths: Sequence[tuple[str, bool]] | Callable[[PathMatch], str], 

544 install_context: "InstallRuleContext", 

545) -> Iterator[tuple[PathMatch, Sequence[tuple[str, "FSPath"]]]]: 

546 dest_and_roots: Sequence[tuple[str, "FSPath"]] 

547 if callable(dest_paths): 547 ↛ 548line 547 didn't jump to line 548 because the condition on line 547 was never true

548 compute_dest_path = dest_paths 

549 for match in matches: 

550 dpath = compute_dest_path(match) 

551 if dpath.endswith("/"): 

552 raise ValueError( 

553 f'Provided destination for "{match.path.path}" ended with "/" ("{dpath}"), which it must not!' 

554 ) 

555 dest_and_roots = [ 

556 (dpath, install_context[pkg.name].fs_root) for pkg in match.into 

557 ] 

558 yield match, dest_and_roots 

559 else: 

560 for match in matches: 

561 dest_and_roots = _resolve_dest_paths( 

562 match, 

563 dest_paths, 

564 install_context, 

565 ) 

566 yield match, dest_and_roots 

567 

568 

569class InstallRule(DebputyDispatchableType): 

570 __slots__ = ( 

571 "_already_matched", 

572 "_exact_match_request", 

573 "_condition", 

574 "_match_filter", 

575 "_definition_source", 

576 ) 

577 

578 def __init__( 

579 self, 

580 condition: ManifestCondition | None, 

581 definition_source: str, 

582 *, 

583 match_filter: Callable[["VirtualPath"], bool] | None = None, 

584 ) -> None: 

585 super().__init__() 

586 self._condition = condition 

587 self._definition_source = definition_source 

588 self._match_filter = match_filter 

589 

590 def _check_single_match( 

591 self, source: FileSystemMatchRule, matches: list[PathMatch] 

592 ) -> None: 

593 seen_pkgs = set() 

594 problem_pkgs = frozenset() 

595 for m in matches: 

596 problem_pkgs = seen_pkgs & m.into 

597 if problem_pkgs: 597 ↛ 598line 597 didn't jump to line 598 because the condition on line 597 was never true

598 break 

599 seen_pkgs.update(problem_pkgs) 

600 if problem_pkgs: 600 ↛ 601line 600 didn't jump to line 601 because the condition on line 600 was never true

601 pkg_names = ", ".join(sorted(p.name for p in problem_pkgs)) 

602 _error( 

603 f'The pattern "{source.raw_match_rule}" matched multiple entries for the packages: {pkg_names}.' 

604 "However, it should matched exactly one item. Please tighten the pattern defined" 

605 f" in {self._definition_source}" 

606 ) 

607 

608 def _match_pattern( 

609 self, 

610 path_matcher: SourcePathMatcher, 

611 fs_match_rule: FileSystemMatchRule, 

612 condition_context: ConditionContext, 

613 search_dirs: Sequence[SearchDir], 

614 into: frozenset[BinaryPackage], 

615 ) -> list[PathMatch]: 

616 (matched, exclude_counts) = path_matcher.find_and_reserve_all_matches( 

617 fs_match_rule.match_rule, 

618 search_dirs, 

619 fs_match_rule.raw_match_rule.endswith("/"), 

620 self._match_filter, 

621 into, 

622 self._definition_source, 

623 ) 

624 

625 already_installed_paths, already_excluded_paths = exclude_counts 

626 

627 if into: 627 ↛ 632line 627 didn't jump to line 632 because the condition on line 627 was always true

628 allow_empty_match = all(not p.should_be_acted_on for p in into) 

629 else: 

630 # discard rules must match provided at least one search dir exist. If none of them 

631 # exist, then we assume the discard rule is for a package that will not be built 

632 allow_empty_match = any(s.search_dir.is_dir for s in search_dirs) 

633 if self._condition is not None and not self._condition.evaluate( 633 ↛ 636line 633 didn't jump to line 636 because the condition on line 633 was never true

634 condition_context 

635 ): 

636 allow_empty_match = True 

637 

638 if not matched and not allow_empty_match: 

639 search_dir_text = ", ".join(x.search_dir.fs_path for x in search_dirs) 

640 if already_excluded_paths and already_installed_paths: 640 ↛ 641line 640 didn't jump to line 641 because the condition on line 640 was never true

641 total_paths = already_excluded_paths + already_installed_paths 

642 msg = ( 

643 f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} after ignoring" 

644 f" {total_paths} path(s) already been matched previously either by install or" 

645 f" exclude rules. If you wanted to install some of these paths into multiple" 

646 f" packages, please tweak the definition that installed them to install them" 

647 f' into multiple packages (usually change "into: foo" to "into: [foo, bar]".' 

648 f" If you wanted to install these paths and exclude rules are getting in your" 

649 f" way, then please move this install rule before the exclusion rule that causes" 

650 f" issue or, in case of built-in excludes, list the paths explicitly (without" 

651 f" using patterns). Source for this issue is {self._definition_source}. Match rule:" 

652 f" {fs_match_rule.match_rule.describe_match_exact()}" 

653 ) 

654 elif already_excluded_paths: 654 ↛ 655line 654 didn't jump to line 655 because the condition on line 654 was never true

655 msg = ( 

656 f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} after ignoring" 

657 f" {already_excluded_paths} path(s) that have been excluded." 

658 " If you wanted to install some of these paths, please move the install rule" 

659 " before the exclusion rule or, in case of built-in excludes, list the paths explicitly" 

660 f" (without using patterns). Source for this issue is {self._definition_source}. Match rule:" 

661 f" {fs_match_rule.match_rule.describe_match_exact()}" 

662 ) 

663 elif already_installed_paths: 663 ↛ 664line 663 didn't jump to line 664 because the condition on line 663 was never true

664 msg = ( 

665 f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} after ignoring" 

666 f" {already_installed_paths} path(s) already been matched previously." 

667 " If you wanted to install some of these paths into multiple packages," 

668 f" please tweak the definition that installed them to install them into" 

669 f' multiple packages (usually change "into: foo" to "into: [foo, bar]".' 

670 f" Source for this issue is {self._definition_source}. Match rule:" 

671 f" {fs_match_rule.match_rule.describe_match_exact()}" 

672 ) 

673 else: 

674 # TODO: Try harder to find the match and point out possible typos 

675 msg = ( 

676 f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} (definition:" 

677 f" {self._definition_source}). Match rule: {fs_match_rule.match_rule.describe_match_exact()}" 

678 ) 

679 raise NoMatchForInstallPatternError( 

680 msg, 

681 fs_match_rule, 

682 search_dirs, 

683 self._definition_source, 

684 ) 

685 return matched 

686 

687 def _install_matches( 

688 self, 

689 path_matcher: SourcePathMatcher, 

690 matches: list[PathMatch], 

691 dest_paths: Sequence[tuple[str, bool]] | Callable[[PathMatch], str], 

692 install_context: "InstallRuleContext", 

693 into: frozenset[BinaryPackage], 

694 condition_context: ConditionContext, 

695 ) -> None: 

696 if ( 

697 self._condition is not None 

698 and not self._condition.evaluate(condition_context) 

699 ) or not any(p.should_be_acted_on for p in into): 

700 # Rule is disabled; skip all its actions - also allow empty matches 

701 # for this particular case. Though, we do go through the match and 

702 # ensure all paths are reserved correctly such that the behavior 

703 # between `binary-arch` vs `binary-indep` vs `binary` is consistent 

704 for match in matches: 

705 self._reserve_recursively( 

706 path_matcher, 

707 match, 

708 into, 

709 ) 

710 

711 return 

712 

713 if not matches: 713 ↛ 714line 713 didn't jump to line 714 because the condition on line 713 was never true

714 raise ValueError("matches must not be empty") 

715 

716 for match, dest_paths_and_roots in _resolve_matches( 

717 matches, 

718 dest_paths, 

719 install_context, 

720 ): 

721 install_recursively_into_dirs = [] 

722 for dest, fs_root in dest_paths_and_roots: 

723 dir_part, basename = os.path.split(dest) 

724 # We do not associate these with the FS path. First off, 

725 # it is complicated to do in most cases (indeed, debhelper 

726 # does not preserve these directories either) and secondly, 

727 # it is "only" mtime and mode - mostly irrelevant as the 

728 # directory is 99.9% likely to be 0755 (we are talking 

729 # directories like "/usr", "/usr/share"). 

730 dir_path = fs_root.mkdirs(dir_part) 

731 existing_path = dir_path.get(basename) 

732 

733 if match.path.is_dir: 

734 if existing_path is not None and not existing_path.is_dir: 734 ↛ 735line 734 didn't jump to line 735 because the condition on line 734 was never true

735 existing_path.unlink() 

736 existing_path = None 

737 current_dir = existing_path 

738 

739 if current_dir is None: 739 ↛ 743line 739 didn't jump to line 743 because the condition on line 739 was always true

740 current_dir = dir_path.mkdir( 

741 basename, reference_path=match.path 

742 ) 

743 install_recursively_into_dirs.append(current_dir) 

744 else: 

745 if existing_path is not None and existing_path.is_dir: 745 ↛ 746line 745 didn't jump to line 746 because the condition on line 745 was never true

746 _error( 

747 f"Cannot install {match.path} ({match.path.fs_path}) as {dest}. That path already exist" 

748 f" and is a directory. This error was triggered via {self._definition_source}." 

749 ) 

750 

751 if match.path.is_symlink: 

752 dir_path.add_symlink( 

753 basename, match.path.readlink(), reference_path=match.path 

754 ) 

755 else: 

756 dir_path.insert_file_from_fs_path( 

757 basename, 

758 match.path.fs_path, 

759 follow_symlinks=False, 

760 use_fs_path_mode=True, 

761 reference_path=match.path, 

762 ) 

763 if install_recursively_into_dirs: 

764 self._install_dir_recursively( 

765 path_matcher, 

766 install_recursively_into_dirs, 

767 match, 

768 into, 

769 ) 

770 

771 def _reserve_recursively( 

772 self, 

773 path_matcher: SourcePathMatcher, 

774 match: PathMatch, 

775 into: frozenset[BinaryPackage], 

776 ) -> None: 

777 direct_matched_path = match.path 

778 path_matcher.reserve( 

779 direct_matched_path, 

780 into, 

781 self._definition_source, 

782 is_exact_match=match.is_exact_match, 

783 ) 

784 if not direct_matched_path.is_dir: 784 ↛ 785line 784 didn't jump to line 785 because the condition on line 784 was never true

785 return 

786 stack = list(direct_matched_path.iterdir) 

787 while stack: 

788 current_path = stack.pop() 

789 path_matcher.reserve( 

790 current_path, 

791 into, 

792 self._definition_source, 

793 is_exact_match=False, 

794 ) 

795 if current_path.is_dir: 

796 stack.extend(current_path.iterdir) 

797 

798 def _install_dir_recursively( 

799 self, 

800 path_matcher: SourcePathMatcher, 

801 parent_dirs: Sequence[FSPath], 

802 match: PathMatch, 

803 into: frozenset[BinaryPackage], 

804 ) -> None: 

805 stack = [ 

806 (parent_dirs, e) 

807 for e in match.path.iterdir 

808 if not path_matcher.is_reserved(e) 

809 ] 

810 

811 while stack: 

812 current_dirs, dir_entry = stack.pop() 

813 path_matcher.reserve( 

814 dir_entry, 

815 into, 

816 self._definition_source, 

817 is_exact_match=False, 

818 ) 

819 if dir_entry.is_dir: 819 ↛ 820line 819 didn't jump to line 820 because the condition on line 819 was never true

820 new_dirs = [ 

821 d.mkdir(dir_entry.name, reference_path=dir_entry) 

822 for d in current_dirs 

823 ] 

824 stack.extend( 

825 (new_dirs, de) 

826 for de in dir_entry.iterdir 

827 if not path_matcher.is_reserved(de) 

828 ) 

829 elif dir_entry.is_symlink: 

830 for current_dir in current_dirs: 

831 current_dir.add_symlink( 

832 dir_entry.name, 

833 dir_entry.readlink(), 

834 reference_path=dir_entry, 

835 ) 

836 elif dir_entry.is_file: 836 ↛ 846line 836 didn't jump to line 846 because the condition on line 836 was always true

837 for current_dir in current_dirs: 

838 current_dir.insert_file_from_fs_path( 

839 dir_entry.name, 

840 dir_entry.fs_path, 

841 use_fs_path_mode=True, 

842 follow_symlinks=False, 

843 reference_path=dir_entry, 

844 ) 

845 else: 

846 _error( 

847 f"Unsupported file type: {dir_entry.fs_path} - neither a file, directory or symlink" 

848 ) 

849 

850 def perform_install( 

851 self, 

852 path_matcher: SourcePathMatcher, 

853 install_context: InstallRuleContext, 

854 condition_context: ConditionContext, 

855 ) -> None: 

856 raise NotImplementedError 

857 

858 @classmethod 

859 def install_as( 

860 cls, 

861 source: FileSystemMatchRule, 

862 dest_path: str, 

863 into: frozenset[BinaryPackage], 

864 definition_source: str, 

865 condition: ManifestCondition | None, 

866 ) -> "InstallRule": 

867 return GenericInstallationRule( 

868 [source], 

869 [(dest_path, False)], 

870 into, 

871 condition, 

872 definition_source, 

873 require_single_match=True, 

874 ) 

875 

876 @classmethod 

877 def install_dest( 

878 cls, 

879 sources: Sequence[FileSystemMatchRule], 

880 dest_dir: str | None, 

881 into: frozenset[BinaryPackage], 

882 definition_source: str, 

883 condition: ManifestCondition | None, 

884 ) -> "InstallRule": 

885 if dest_dir is None: 

886 dest_dir = "{dirname}/{basename}" 

887 else: 

888 dest_dir = os.path.join(dest_dir, "{basename}") 

889 return GenericInstallationRule( 

890 sources, 

891 [(dest_dir, True)], 

892 into, 

893 condition, 

894 definition_source, 

895 ) 

896 

897 @classmethod 

898 def install_multi_as( 

899 cls, 

900 source: FileSystemMatchRule, 

901 dest_paths: Sequence[str], 

902 into: frozenset[BinaryPackage], 

903 definition_source: str, 

904 condition: ManifestCondition | None, 

905 ) -> "InstallRule": 

906 if len(dest_paths) < 2: 906 ↛ 907line 906 didn't jump to line 907 because the condition on line 906 was never true

907 raise ValueError( 

908 "Please use `install_as` when there is less than 2 dest path" 

909 ) 

910 dps = tuple((dp, False) for dp in dest_paths) 

911 return GenericInstallationRule( 

912 [source], 

913 dps, 

914 into, 

915 condition, 

916 definition_source, 

917 require_single_match=True, 

918 ) 

919 

920 @classmethod 

921 def install_multi_dest( 

922 cls, 

923 sources: Sequence[FileSystemMatchRule], 

924 dest_dirs: Sequence[str], 

925 into: frozenset[BinaryPackage], 

926 definition_source: str, 

927 condition: ManifestCondition | None, 

928 ) -> "InstallRule": 

929 if len(dest_dirs) < 2: 929 ↛ 930line 929 didn't jump to line 930 because the condition on line 929 was never true

930 raise ValueError( 

931 "Please use `install_dest` when there is less than 2 dest dir" 

932 ) 

933 dest_paths = tuple((os.path.join(dp, "{basename}"), True) for dp in dest_dirs) 

934 return GenericInstallationRule( 

935 sources, 

936 dest_paths, 

937 into, 

938 condition, 

939 definition_source, 

940 ) 

941 

942 @classmethod 

943 def install_doc( 

944 cls, 

945 sources: Sequence[FileSystemMatchRule], 

946 dest_dir: str | None, 

947 into: frozenset[BinaryPackage], 

948 definition_source: str, 

949 condition: ManifestCondition | None, 

950 ) -> "InstallRule": 

951 cond: ManifestCondition = _BUILD_DOCS_BDO 

952 if condition is not None: 

953 cond = ManifestCondition.all_of([cond, condition]) 

954 dest_path_is_format = False 

955 if dest_dir is None: 

956 dest_dir = "usr/share/doc/{doc_main_package_name}/{basename}" 

957 dest_path_is_format = True 

958 

959 return GenericInstallationRule( 

960 sources, 

961 [(dest_dir, dest_path_is_format)], 

962 into, 

963 cond, 

964 definition_source, 

965 ) 

966 

967 @classmethod 

968 def install_doc_as( 

969 cls, 

970 source: FileSystemMatchRule, 

971 dest_path: str, 

972 into: frozenset[BinaryPackage], 

973 definition_source: str, 

974 condition: ManifestCondition | None, 

975 ) -> "InstallRule": 

976 cond: ManifestCondition = _BUILD_DOCS_BDO 

977 if condition is not None: 

978 cond = ManifestCondition.all_of([cond, condition]) 

979 

980 return GenericInstallationRule( 

981 [source], 

982 [(dest_path, False)], 

983 into, 

984 cond, 

985 definition_source, 

986 require_single_match=True, 

987 ) 

988 

989 @classmethod 

990 def install_examples( 

991 cls, 

992 sources: Sequence[FileSystemMatchRule], 

993 into: frozenset[BinaryPackage], 

994 definition_source: str, 

995 condition: ManifestCondition | None, 

996 ) -> "InstallRule": 

997 cond: ManifestCondition = _BUILD_DOCS_BDO 

998 if condition is not None: 998 ↛ 999line 998 didn't jump to line 999 because the condition on line 998 was never true

999 cond = ManifestCondition.all_of([cond, condition]) 

1000 return GenericInstallationRule( 

1001 sources, 

1002 [("usr/share/doc/{doc_main_package_name}/examples/{basename}", True)], 

1003 into, 

1004 cond, 

1005 definition_source, 

1006 ) 

1007 

1008 @classmethod 

1009 def install_man( 

1010 cls, 

1011 sources: Sequence[FileSystemMatchRule], 

1012 into: frozenset[BinaryPackage], 

1013 section: int | None, 

1014 language: str | None, 

1015 definition_source: str, 

1016 condition: ManifestCondition | None, 

1017 ) -> "InstallRule": 

1018 cond: ManifestCondition = _BUILD_DOCS_BDO 

1019 if condition is not None: 1019 ↛ 1020line 1019 didn't jump to line 1020 because the condition on line 1019 was never true

1020 cond = ManifestCondition.all_of([cond, condition]) 

1021 

1022 dest_path_computer = _dest_path_for_manpage( 

1023 section, language, definition_source 

1024 ) 

1025 

1026 return GenericInstallationRule( 

1027 sources, 

1028 dest_path_computer, 

1029 into, 

1030 cond, 

1031 definition_source, 

1032 match_filter=lambda m: not m.is_file, 

1033 ) 

1034 

1035 @classmethod 

1036 def discard_paths( 

1037 cls, 

1038 paths: Sequence[FileSystemMatchRule], 

1039 definition_source: str, 

1040 condition: ManifestCondition | None, 

1041 *, 

1042 limit_to: Sequence[FileSystemExactMatchRule] | None = None, 

1043 ) -> "InstallRule": 

1044 return DiscardRule( 

1045 paths, 

1046 condition, 

1047 tuple(limit_to) if limit_to is not None else tuple(), 

1048 definition_source, 

1049 ) 

1050 

1051 

1052class PPFInstallRule(InstallRule): 

1053 __slots__ = ( 

1054 "_ppfs", 

1055 "_substitution", 

1056 "_into", 

1057 ) 

1058 

1059 # noinspection PyMissingConstructor 

1060 def __init__( 

1061 self, 

1062 into: BinaryPackage, 

1063 substitution: Substitution, 

1064 ppfs: Sequence["PackagerProvidedFile"], 

1065 ) -> None: 

1066 run_in_context_of_plugin( 

1067 "debputy", 

1068 super().__init__, 

1069 None, 

1070 "<built-in; PPF install rule>", 

1071 ) 

1072 self._substitution = substitution 

1073 self._ppfs = ppfs 

1074 self._into = into 

1075 

1076 def perform_install( 

1077 self, 

1078 path_matcher: SourcePathMatcher, 

1079 install_context: InstallRuleContext, 

1080 condition_context: ConditionContext, 

1081 ) -> None: 

1082 binary_install_context = install_context[self._into.name] 

1083 fs_root = binary_install_context.fs_root 

1084 for ppf in self._ppfs: 

1085 source_path = ppf.path.fs_path 

1086 dest_dir, name = ppf.compute_dest() 

1087 dir_path = fs_root.mkdirs(dest_dir) 

1088 

1089 dir_path.insert_file_from_fs_path( 

1090 name, 

1091 source_path, 

1092 follow_symlinks=True, 

1093 use_fs_path_mode=False, 

1094 mode=ppf.definition.default_mode, 

1095 ) 

1096 

1097 

1098class GenericInstallationRule(InstallRule): 

1099 __slots__ = ( 

1100 "_sources", 

1101 "_into", 

1102 "_dest_paths", 

1103 "_require_single_match", 

1104 ) 

1105 

1106 def __init__( 

1107 self, 

1108 sources: Sequence[FileSystemMatchRule], 

1109 dest_paths: Sequence[tuple[str, bool]] | Callable[[PathMatch], str], 

1110 into: frozenset[BinaryPackage], 

1111 condition: ManifestCondition | None, 

1112 definition_source: str, 

1113 *, 

1114 require_single_match: bool = False, 

1115 match_filter: Callable[["VirtualPath"], bool] | None = None, 

1116 ) -> None: 

1117 super().__init__( 

1118 condition, 

1119 definition_source, 

1120 match_filter=match_filter, 

1121 ) 

1122 self._sources = sources 

1123 self._into = into 

1124 self._dest_paths = dest_paths 

1125 self._require_single_match = require_single_match 

1126 if self._require_single_match and len(sources) != 1: 1126 ↛ 1127line 1126 didn't jump to line 1127 because the condition on line 1126 was never true

1127 raise ValueError("require_single_match implies sources must have len 1") 

1128 

1129 def perform_install( 

1130 self, 

1131 path_matcher: SourcePathMatcher, 

1132 install_context: InstallRuleContext, 

1133 condition_context: ConditionContext, 

1134 ) -> None: 

1135 for source in self._sources: 

1136 matches = self._match_pattern( 

1137 path_matcher, 

1138 source, 

1139 condition_context, 

1140 install_context.search_dirs, 

1141 self._into, 

1142 ) 

1143 if self._require_single_match and len(matches) > 1: 

1144 self._check_single_match(source, matches) 

1145 self._install_matches( 

1146 path_matcher, 

1147 matches, 

1148 self._dest_paths, 

1149 install_context, 

1150 self._into, 

1151 condition_context, 

1152 ) 

1153 

1154 

1155class DiscardRule(InstallRule): 

1156 __slots__ = ("_fs_match_rules", "_limit_to") 

1157 

1158 def __init__( 

1159 self, 

1160 fs_match_rules: Sequence[FileSystemMatchRule], 

1161 condition: ManifestCondition | None, 

1162 limit_to: Sequence[FileSystemExactMatchRule], 

1163 definition_source: str, 

1164 ) -> None: 

1165 super().__init__(condition, definition_source) 

1166 self._fs_match_rules = fs_match_rules 

1167 self._limit_to = limit_to 

1168 

1169 def perform_install( 

1170 self, 

1171 path_matcher: SourcePathMatcher, 

1172 install_context: InstallRuleContext, 

1173 condition_context: ConditionContext, 

1174 ) -> None: 

1175 into = frozenset() 

1176 limit_to = self._limit_to 

1177 if limit_to: 

1178 matches = {x.match_rule.path for x in limit_to} 

1179 search_dirs = tuple( 

1180 s 

1181 for s in install_context.search_dirs 

1182 if s.search_dir.fs_path in matches 

1183 ) 

1184 if len(limit_to) != len(search_dirs): 

1185 m = matches.difference(s.search_dir.fs_path for s in search_dirs) 

1186 paths = ":".join(m) 

1187 _error( 

1188 f"The discard rule defined at {self._definition_source} mentions the following" 

1189 f" search directories that were not known to debputy: {paths}." 

1190 " Either the search dir is missing somewhere else or it should be removed from" 

1191 " the discard rule." 

1192 ) 

1193 else: 

1194 search_dirs = install_context.search_dirs 

1195 

1196 for fs_match_rule in self._fs_match_rules: 

1197 self._match_pattern( 

1198 path_matcher, 

1199 fs_match_rule, 

1200 condition_context, 

1201 search_dirs, 

1202 into, 

1203 )