Coverage for src/debputy/installations.py: 69%

514 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2026-04-14 21:38 +0000

1import collections 

2import dataclasses 

3import os.path 

4import re 

5from enum import IntEnum 

6from typing import TYPE_CHECKING, cast, Any 

7from collections.abc import Callable, Iterator, Sequence, Iterable, Mapping 

8 

9from debputy.exceptions import DebputyRuntimeError 

10from debputy.filesystem_scan import InMemoryVirtualPathBase 

11from debputy.manifest_conditions import ( 

12 ConditionContext, 

13 ManifestCondition, 

14 _BUILD_DOCS_BDO, 

15) 

16from debputy.manifest_parser.base_types import ( 

17 FileSystemMatchRule, 

18 FileSystemExactMatchRule, 

19) 

20from debputy.manifest_parser.tagging_types import DebputyDispatchableType 

21from debputy.packages import BinaryPackage 

22from debputy.path_matcher import MatchRule, ExactFileSystemPath, MATCH_ANYTHING 

23from debputy.plugin.plugin_state import run_in_context_of_plugin 

24from debputy.substitution import Substitution 

25from debputy.util import _error, _warn 

26 

27if TYPE_CHECKING: 

28 from debputy.packager_provided_files import PackagerProvidedFile 

29 from debputy.plugin.api import VirtualPath 

30 from debputy.plugin.api.impl_types import PluginProvidedDiscardRule 

31 

32 

# Section token of a roff ".TH <title> <section>" header line (optionally quoted).
_MAN_TH_LINE = re.compile(r'^[.]TH\s+\S+\s+"?(\d+[^"\s]*)"?')
# Section token of an mdoc ".Dt <title> <section>" header line.
_MAN_DT_LINE = re.compile(r"^[.]Dt\s+\S+\s+(\d+\S*)")
# Section taken from the end of a basename such as "foo.1" or "foo.3pm.gz".
_MAN_SECTION_BASENAME = re.compile(r"[.]([1-9]\w*)(?:[.]gz)?$")
# Leading digits of a section string ("3pm" -> "3").
_MAN_REAL_SECTION = re.compile(r"^(\d+)")
# Final dot-separated component of a basename; substituted away to get the
# basename used at the installation destination.
_MAN_INST_BASENAME = re.compile(r"[.][^.]+$")
# Language directory in paths such as "man/de/man1/" or "man/pt_BR.UTF-8/man1/".
MAN_GUESS_LANG_FROM_PATH = re.compile(
    r"(?:^|/)man/(?:([a-z][a-z](?:_[A-Z][A-Z])?)(?:\.[^/]+)?)?/man[1-9]/"
)
# Language infix in basenames such as "foo.de.1" or "foo.pt_BR.man".
MAN_GUESS_FROM_BASENAME = re.compile(
    r"[.]([a-z][a-z](?:_[A-Z][A-Z])?)[.](?:[1-9]|man)"
)

42 

43 

class InstallRuleError(DebputyRuntimeError):
    """Common base class of the errors raised while applying installation rules."""

46 

47 

class PathAlreadyInstalledOrDiscardedError(InstallRuleError):
    """Raised when an exact-path rule hits a path that was already installed or excluded.

    The raise sites in this module construct the error as
    ``(message, match, into, definition_source)``.  Position 0 is therefore
    the human-readable message and the accessors read positions 1-3, the same
    layout the other ``InstallRuleError`` subclasses in this module use.
    """

    @property
    def path(self) -> str:
        """File system path of the conflicting source path."""
        subject = self.args[1]
        if isinstance(subject, PathMatch):
            # The raise sites pass the whole match object; unwrap it to the
            # path that is also quoted in the error message.
            return subject.path.fs_path
        return cast("str", subject)

    @property
    def into(self) -> frozenset[BinaryPackage]:
        """Packages that already reserved the path (empty when the path was excluded)."""
        return cast(frozenset[BinaryPackage], self.args[2])

    @property
    def definition_source(self) -> str:
        """The definition that triggered the conflict."""
        return cast("str", self.args[3])

60 

61 

class ExactPathMatchTwiceError(InstallRuleError):
    """Raised when an exact match requests a path that was already installed into a package.

    The accessors expose the positional arguments that follow the message
    (``args[0]``).
    """

    @property
    def path(self) -> str:
        """The path that was requested a second time (``args[1]``)."""
        requested_path = self.args[1]
        return cast("str", requested_path)

    @property
    def into(self) -> BinaryPackage:
        """The package the path was already installed into (``args[2]``)."""
        package = self.args[2]
        return cast("BinaryPackage", package)

    @property
    def definition_source(self) -> str:
        """The definition that triggered the second installation (``args[3]``)."""
        source = self.args[3]
        return cast("str", source)

74 

75 

class NoMatchForInstallPatternError(InstallRuleError):
    """Raised when a pattern that is required to match did not match any path.

    The accessors expose the positional arguments that follow the message
    (``args[0]``).
    """

    @property
    def pattern(self) -> str:
        """The pattern that had no matches (``args[1]``)."""
        unmatched = self.args[1]
        return cast("str", unmatched)

    @property
    def search_dirs(self) -> Sequence["SearchDir"]:
        """The search directories that were consulted (``args[2]``)."""
        consulted = self.args[2]
        return cast("Sequence[SearchDir]", consulted)

    @property
    def definition_source(self) -> str:
        """The definition containing the pattern (``args[3]``)."""
        source = self.args[3]
        return cast("str", source)

88 

89 

@dataclasses.dataclass(slots=True, frozen=True)
class SearchDir:
    """A directory that match rules are resolved against, plus the packages it serves."""

    # Root of the tree handed to the match rule when looking for paths.
    search_dir: "VirtualPath"
    # Packages for which matches found in this directory may be used.
    applies_to: frozenset[BinaryPackage]

94 

95 

@dataclasses.dataclass(slots=True, frozen=True)
class BinaryPackageInstallRuleContext:
    """Per-package state consulted when installing paths into a binary package."""

    # The package this context belongs to.
    binary_package: BinaryPackage
    # Root of the in-memory file system that receives the installed paths.
    fs_root: InMemoryVirtualPathBase
    # Package whose name is substituted for `{doc_main_package_name}` in
    # destination templates.
    doc_main_package: BinaryPackage

    def replace(self, **changes: Any) -> "BinaryPackageInstallRuleContext":
        """Return a copy of this context with the given fields replaced."""
        return dataclasses.replace(self, **changes)

104 

105 

@dataclasses.dataclass(slots=True, frozen=True)
class InstallSearchDirContext:
    """Bundle of the search directories relevant for a run of installation rules."""

    # Directories that match rules are resolved against.
    search_dirs: Sequence[SearchDir]
    # NOTE(review): presumably the directories scanned for paths no rule
    # handled - confirm against the caller.
    check_for_uninstalled_dirs: Sequence["VirtualPath"]
    # TODO: Support search dirs per-package
    debian_pkg_dirs: Mapping[str, "VirtualPath"] = dataclasses.field(
        default_factory=dict
    )

114 

115 

@dataclasses.dataclass(slots=True)
class InstallRuleContext:
    """Shared state for installation rules; indexable by binary package name."""

    # TODO: Search dirs should be per-package
    search_dirs: Sequence[SearchDir]
    # Per-package contexts; the installation helpers look these up by
    # package name.
    binary_package_contexts: dict[str, BinaryPackageInstallRuleContext] = (
        dataclasses.field(default_factory=dict)
    )

    def __getitem__(self, item: str) -> BinaryPackageInstallRuleContext:
        """Return the context stored under ``item`` (raises ``KeyError`` if absent)."""
        return self.binary_package_contexts[item]

    def __setitem__(self, key: str, value: BinaryPackageInstallRuleContext) -> None:
        """Store ``value`` as the context for ``key``."""
        self.binary_package_contexts[key] = value

    def replace(self, **changes: Any) -> "InstallRuleContext":
        """Return a copy of this context with the given fields replaced."""
        return dataclasses.replace(self, **changes)

132 

133 

@dataclasses.dataclass(slots=True, frozen=True)
class PathMatch:
    """A single path produced by resolving a match rule against a search directory."""

    # The path that matched.
    path: "VirtualPath"
    # The search directory in which the path was found.
    search_dir: "VirtualPath"
    # Forwarded to `SourcePathMatcher.reserve` when the match is reserved.
    is_exact_match: bool
    # The packages this match applies to.
    into: frozenset[BinaryPackage]

140 

141 

class DiscardState(IntEnum):
    """Cached verdict on whether a source path has been discarded."""

    # No verdict has been computed for the path yet.
    UNCHECKED = 0
    # Neither the path nor any of its parent directories was discarded.
    NOT_DISCARDED = 1
    # A plugin provided (automatic) discard rule matched the path or a parent.
    DISCARDED_BY_PLUGIN_PROVIDED_RULE = 2
    # The path was excluded explicitly via `SourcePathMatcher.exclude`.
    DISCARDED_BY_MANIFEST_RULE = 3

147 

148 

def _determine_manpage_section(
    match_rule: PathMatch,
    provided_section: int | None,
    definition_source: str,
) -> str | None:
    """Work out the (full) section string of a manual page.

    An explicitly provided section always wins.  Otherwise the first usable
    ``.TH`` / ``.Dt`` header line of the file is consulted and, failing that,
    the suffix of the path's basename.

    :param match_rule: The match describing the manual page.
    :param provided_section: Section declared by the rule, if any.
    :param definition_source: Origin of the rule; only used in the warning.
    :return: The section (e.g. ``"3pm"``) or ``None`` if none could be found.
    """
    if provided_section is not None:
        return str(provided_section)

    from_header = None
    with open(match_rule.path.fs_path) as fd:
        for line in fd:
            if not line.startswith((".TH", ".Dt")):
                continue
            header = _MAN_DT_LINE.match(line) or _MAN_TH_LINE.match(line)
            if header is None:
                continue
            candidate = header.group(1)
            if "." in candidate:
                _warn(
                    f"Ignoring detected section {candidate} in {match_rule.path.fs_path}"
                    f" (detected via {definition_source}): It looks too much like a version"
                )
            else:
                from_header = candidate
            # Only the first parsable header line is considered.
            break

    if from_header is not None:
        return from_header

    by_name = _MAN_SECTION_BASENAME.search(os.path.basename(match_rule.path.path))
    return by_name.group(1) if by_name else None

182 

183 

def _determine_manpage_real_section(
    match_rule: PathMatch,
    section: str | None,
    definition_source: str,
) -> int:
    """Derive the numeric section (the ``manN`` directory) from a section string.

    :param match_rule: The match describing the manual page (used in messages).
    :param section: Full section string such as ``"3pm"``, or ``None`` when no
      section could be determined.
    :param definition_source: Origin of the rule; used in the error message.
    :return: The leading number of ``section``; always in the range 1-9.
    """
    real_section = None
    if section is not None:
        m = _MAN_REAL_SECTION.match(section)
        if m:
            real_section = int(m.group(1))
    # Valid sections are 1 through 9 (inclusive), as the warning below states
    # and as the other section patterns in this module assume.  Section 0 must
    # therefore be rejected as well.
    if real_section is None or not 1 <= real_section <= 9:
        if real_section is not None:
            _warn(
                f"Computed section for {match_rule.path.fs_path} was {real_section} (section: {section}),"
                f" which is not a valid section (must be between 1 and 9 incl.)"
            )
        # NOTE(review): `_error` is presumably fatal (does not return) - confirm.
        _error(
            f"Could not determine the section for {match_rule.path.fs_path} automatically.  The man page"
            f" was detected via {definition_source}. Consider using `section: <number>` to"
            " explicitly declare the section. Keep in mind that it applies to all man pages for that"
            " rule and you may have to split the rule into two for this reason."
        )
    return real_section

207 

208 

def _determine_manpage_language(
    match_rule: PathMatch,
    provided_language: str | None,
) -> str | None:
    """Determine the language directory for a manual page.

    :param match_rule: The match describing the manual page.
    :param provided_language: An explicit language, one of the special values
      ``derive-from-basename`` / ``derive-from-path``, or ``None`` (which
      behaves like ``derive-from-path``).
    :return: The language, or ``None`` when the page has no language specific
      directory (explicit ``C`` or nothing could be derived).
    """
    if provided_language == "derive-from-basename":
        found = MAN_GUESS_FROM_BASENAME.search(match_rule.path.name)
    elif provided_language is None or provided_language == "derive-from-path":
        found = MAN_GUESS_LANG_FROM_PATH.search(match_rule.path.path)
    elif provided_language == "C":
        return None
    else:
        return provided_language
    return found.group(1) if found is not None else None

226 

227 

def _dest_path_for_manpage(
    provided_section: int | None,
    provided_language: str | None,
    definition_source: str,
) -> Callable[["PathMatch"], str]:
    """Create a function computing the install destination of a manual page.

    :param provided_section: Section declared by the rule, if any.
    :param provided_language: Language (or derivation mode) declared by the rule.
    :param definition_source: Origin of the rule; used in diagnostics.
    :return: A callable mapping a match to its path below ``usr/share/man``.
    """

    def _manpage_dest_path(match_rule: PathMatch) -> str:
        stem = _MAN_INST_BASENAME.sub("", match_rule.path.name)
        section = _determine_manpage_section(
            match_rule, provided_section, definition_source
        )
        real_section = _determine_manpage_real_section(
            match_rule, section, definition_source
        )
        assert section is not None
        language = _determine_manpage_language(match_rule, provided_language)
        lang_dir = ""
        if language is not None:
            lang_dir = f"{language}/"
            # The language is encoded in the directory; drop it from the name.
            stem = stem.removesuffix(f".{language}")

        return f"usr/share/man/{lang_dir}man{real_section}/{stem}.{section}"

    return _manpage_dest_path

256 

257 

class SourcePathMatcher:
    """Book-keeping of which source paths have been installed (reserved) or discarded."""

    def __init__(self, auto_discard_rules: list["PluginProvidedDiscardRule"]) -> None:
        """Create a matcher that applies ``auto_discard_rules`` to unchecked paths."""
        self._auto_discard_rules = auto_discard_rules
        # fs_path -> (packages that reserved the path, definition that did so)
        self._already_matched: dict[str, tuple[frozenset[BinaryPackage], str]] = {}
        # (package name, fs_path) pairs that were claimed via an exact match
        self._exact_match_request: set[tuple[str, str]] = set()
        # fs_path -> cached discard verdict
        self._discarded: dict[str, DiscardState] = {}
        # name of plugin provided rule -> fs_paths the rule discarded
        self.used_auto_discard_rules: dict[str, set[str]] = collections.defaultdict(set)

    def is_reserved(self, path: "VirtualPath") -> bool:
        """Return True if ``path`` has been installed or discarded already."""
        key = path.fs_path
        if key in self._already_matched:
            return True
        state = self._discarded.get(key, DiscardState.UNCHECKED)
        if state == DiscardState.UNCHECKED:
            state = self._check_plugin_provided_exclude_state_for(path)
        return state != DiscardState.NOT_DISCARDED

    def exclude(self, path: str) -> None:
        """Record ``path`` (a file system path) as discarded by a manifest rule."""
        self._discarded[path] = DiscardState.DISCARDED_BY_MANIFEST_RULE

    def _run_plugin_provided_discard_rules_on(self, path: "VirtualPath") -> bool:
        """Return True if a plugin provided rule discards ``path``, noting which rule did."""
        for rule in self._auto_discard_rules:
            if rule.should_discard(path):
                self.used_auto_discard_rules[rule.name].add(path.fs_path)
                return True
        return False

    def _check_plugin_provided_exclude_state_for(
        self,
        path: "VirtualPath",
    ) -> DiscardState:
        """Compute (and cache) the discard verdict for ``path`` by walking up its parents."""
        uncached: list[str] = []
        node = path
        while True:
            key = node.fs_path
            known = self._discarded.get(key, DiscardState.UNCHECKED)
            if known != DiscardState.UNCHECKED:
                verdict = known
                break
            uncached.append(key)
            if self._run_plugin_provided_discard_rules_on(node):
                verdict = DiscardState.DISCARDED_BY_PLUGIN_PROVIDED_RULE
                break
            # We cannot trust a "NOT_DISCARDED" until we check its parent (the directory could
            # be excluded without the files in it triggering the rule).
            if node.is_root_dir():
                verdict = DiscardState.NOT_DISCARDED
                break
            parent = node.parent_dir
            assert parent is not None  # type hint
            node = parent
        # Everything visited on the way up shares the verdict.
        for key in uncached:
            self._discarded[key] = verdict
        return verdict

    def may_match(
        self,
        match: PathMatch,
        *,
        is_exact_match: bool = False,
    ) -> tuple[frozenset[BinaryPackage], bool]:
        """Check whether ``match`` is still available.

        :return: A pair of (packages the path was already installed into,
          whether the path is discarded).  An exact match is allowed to
          override a discard made by a plugin provided rule.
        """
        key = match.path.fs_path
        previous = self._already_matched.get(key)
        if previous:
            return previous[0], False

        state = self._discarded.get(key, DiscardState.UNCHECKED)
        if state == DiscardState.UNCHECKED:
            state = self._check_plugin_provided_exclude_state_for(match.path)

        assert state is not None and state != DiscardState.UNCHECKED

        overridden = (
            is_exact_match and state == DiscardState.DISCARDED_BY_PLUGIN_PROVIDED_RULE
        )
        discarded = state != DiscardState.NOT_DISCARDED and not overridden
        return frozenset(), discarded

    def reserve(
        self,
        path: "VirtualPath",
        reserved_by: frozenset[BinaryPackage],
        definition_source: str,
        *,
        is_exact_match: bool = False,
    ) -> None:
        """Mark ``path`` as installed into ``reserved_by`` on behalf of ``definition_source``.

        For exact matches, the exact-match request is recorded per package and
        any earlier discard verdict for the path is forgotten.
        """
        key = path.fs_path
        self._already_matched[key] = (reserved_by, definition_source)
        if not is_exact_match:
            return
        self._exact_match_request.update((pkg.name, key) for pkg in reserved_by)
        self._discarded.pop(key, None)
        for paths_of_rule in self.used_auto_discard_rules.values():
            paths_of_rule.discard(key)

    def detect_missing(self, search_dir: "VirtualPath") -> Iterator["VirtualPath"]:
        """Yield the paths below ``search_dir`` that are neither installed nor discarded.

        Directories are only reported when they are empty; otherwise their
        content is examined instead.
        """
        pending = list(search_dir.iterdir())
        while pending:
            entry = pending.pop()
            if entry.is_dir:
                before = len(pending)
                pending.extend(entry.iterdir())
                if len(pending) != before:
                    # Non-empty directory: only its content matters.
                    continue
                # Otherwise: "Explicitly" empty dir
            if not self.is_reserved(entry):
                yield entry

    def find_and_reserve_all_matches(
        self,
        match_rule: MatchRule,
        search_dirs: Sequence[SearchDir],
        dir_only_match: bool,
        match_filter: Callable[["VirtualPath"], bool] | None,
        reserved_by: frozenset[BinaryPackage],
        definition_source: str,
    ) -> tuple[list[PathMatch], tuple[int, ...]]:
        """Resolve ``match_rule`` and reserve (or exclude) every available match.

        Matches are reserved when ``reserved_by`` is non-empty and excluded
        otherwise.  For glob rules, unavailable paths are skipped and counted;
        for exact path rules they raise an error.

        :return: The usable matches plus the counts
          ``(already installed, already excluded)`` of skipped paths.
        :raises PathAlreadyInstalledOrDiscardedError: An exact path was
          already installed or excluded.
        :raises ExactPathMatchTwiceError: An exact path was requested twice
          for the same package.
        """
        is_exact = isinstance(match_rule, ExactFileSystemPath)
        usable = []
        skipped_installed = 0
        skipped_excluded = 0

        for match in _resolve_path(
            match_rule,
            search_dirs,
            dir_only_match,
            match_filter,
            reserved_by,
        ):
            fs_path = match.path.fs_path
            installed_into, excluded = self.may_match(match, is_exact_match=is_exact)
            if installed_into:
                if not is_exact:
                    skipped_installed += 1
                    continue
                packages = ", ".join(p.name for p in installed_into)
                raise PathAlreadyInstalledOrDiscardedError(
                    f'The "{match.path.fs_path}" has been reserved by and installed into {packages}.'
                    f" The definition that triggered this issue is {definition_source}.",
                    match,
                    installed_into,
                    definition_source,
                )
            if excluded:
                if not is_exact:
                    skipped_excluded += 1
                    continue
                raise PathAlreadyInstalledOrDiscardedError(
                    f'The "{match.path.fs_path}" has been excluded. If you want this path installed, move it'
                    f" above the exclusion rule that excluded it. The definition that triggered this"
                    f" issue is {definition_source}.",
                    match,
                    installed_into,
                    definition_source,
                )
            if is_exact:
                for pkg in match.into:
                    request = (pkg.name, match.path.fs_path)
                    if request in self._exact_match_request:
                        raise ExactPathMatchTwiceError(
                            f'The path "{match.path.fs_path}" (via exact match) has already been installed'
                            f" into {pkg.name}. The second installation triggered by {definition_source}",
                            match.path,
                            pkg,
                            definition_source,
                        )
                    self._exact_match_request.add(request)

            if reserved_by:
                self._already_matched[match.path.fs_path] = (
                    match.into,
                    definition_source,
                )
            else:
                # No target packages: the caller is discarding the path.
                self.exclude(match.path.fs_path)
            usable.append(match)

        return usable, (skipped_installed, skipped_excluded)

453 

454 

def _resolve_path(
    match_rule: MatchRule,
    search_dirs: Iterable["SearchDir"],
    dir_only_match: bool,
    match_filter: Callable[["VirtualPath"], bool] | None,
    into: frozenset[BinaryPackage],
) -> Iterator[PathMatch]:
    """Yield the matches of ``match_rule`` across ``search_dirs``.

    :param match_rule: The rule to resolve.
    :param search_dirs: Search directories, visited in order.
    :param dir_only_match: If true, only directories are reported.
    :param match_filter: Passed to the rule as ``ignore_paths``.
    :param into: Target packages; empty for discard rules.
    :return: An iterator of matches; the root directory is never reported.
    """
    unmatched_pkgs = set(into)
    for sdir in search_dirs:
        if into and unmatched_pkgs.isdisjoint(sdir.applies_to):
            # All the packages, where this search dir applies, already got a match
            continue
        applicable = sdir.applies_to & unmatched_pkgs
        found_any = False
        for candidate in match_rule.finditer(
            sdir.search_dir,
            ignore_paths=match_filter,
        ):
            if dir_only_match and not candidate.is_dir:
                continue
            if candidate.is_root_dir():
                if match_rule is MATCH_ANYTHING:
                    continue
                _error(
                    f"The pattern {match_rule.describe_match_short()} matched the root dir."
                )
            yield PathMatch(candidate, sdir.search_dir, False, applicable)
            found_any = True
            # No early exit; we want to match everything we can from this search directory.

        if not found_any:
            continue
        unmatched_pkgs -= applicable
        if into and not unmatched_pkgs:
            # For install rules, we can stop as soon as all packages had a match
            # For discard rules, all search directories must be visited.  Otherwise,
            # you would have to repeat the discard rule once per search dir to be
            # sure something is fully discarded
            break

493 

494 

def _resolve_dest_paths(
    match: PathMatch,
    dest_paths: Sequence[tuple[str, bool]],
    install_context: "InstallRuleContext",
) -> Sequence[tuple[str, "InMemoryVirtualPathBase"]]:
    """Expand the destinations of ``match`` for every package it is installed into.

    :param match: The match being installed.
    :param dest_paths: Pairs of (destination, is-template).  Templates may use
      ``{basename}``, ``{dirname}``, ``{package_name}`` and
      ``{doc_main_package_name}``.
    :param install_context: Provides the file system root per package name.
    :return: Pairs of (destination path, file system root to install into).
    :raises ValueError: If a destination ends with ``/``.
    """
    resolved = []
    for template, is_template in dest_paths:
        if not is_template:
            if template.endswith("/"):
                raise ValueError(
                    f'Provided destination for "{match.path.path}" ended with "/" ("{template}"),'
                    " which it must not!"
                )
            for pkg in match.into:
                resolved.append((template, install_context[pkg.name].fs_root))
            continue

        for pkg in match.into:
            parent_dir = match.path.parent_dir
            pkg_context = install_context[pkg.name]
            expanded = template.format(
                basename=match.path.name,
                dirname="" if parent_dir is None else parent_dir.path,
                package_name=pkg.name,
                doc_main_package_name=pkg_context.doc_main_package.name,
            )
            if expanded.endswith("/"):
                raise ValueError(
                    f'Provided destination (when resolved for {pkg.name}) for "{match.path.path}" ended'
                    f' with "/" ("{template}"), which it must not!'
                )
            resolved.append((expanded, pkg_context.fs_root))
    return resolved

529 

530 

def _resolve_matches(
    matches: list[PathMatch],
    dest_paths: Sequence[tuple[str, bool]] | Callable[[PathMatch], str],
    install_context: "InstallRuleContext",
) -> Iterator[tuple[PathMatch, Sequence[tuple[str, "InMemoryVirtualPathBase"]]]]:
    """Pair every match with its destinations and the file system roots to use.

    :param matches: The matches to resolve.
    :param dest_paths: Either destination/template pairs (see
      ``_resolve_dest_paths``) or a callable computing the destination of a
      match.
    :param install_context: Provides the file system root per package name.
    :return: An iterator of (match, [(destination, file system root), ...]).
    :raises ValueError: If a computed destination ends with ``/``.
    """
    if not callable(dest_paths):
        for match in matches:
            yield match, _resolve_dest_paths(match, dest_paths, install_context)
        return

    for match in matches:
        computed = dest_paths(match)
        if computed.endswith("/"):
            raise ValueError(
                f'Provided destination for "{match.path.path}" ended with "/" ("{computed}"), which it must not!'
            )
        yield match, [
            (computed, install_context[pkg.name].fs_root) for pkg in match.into
        ]

557 

558 

559class InstallRule(DebputyDispatchableType): 

560 __slots__ = ( 

561 "_already_matched", 

562 "_exact_match_request", 

563 "_condition", 

564 "_match_filter", 

565 "_definition_source", 

566 ) 

567 

568 def __init__( 

569 self, 

570 condition: ManifestCondition | None, 

571 definition_source: str, 

572 *, 

573 match_filter: Callable[["VirtualPath"], bool] | None = None, 

574 ) -> None: 

575 super().__init__() 

576 self._condition = condition 

577 self._definition_source = definition_source 

578 self._match_filter = match_filter 

579 

580 def _check_single_match( 

581 self, source: FileSystemMatchRule, matches: list[PathMatch] 

582 ) -> None: 

583 seen_pkgs = set[BinaryPackage]() 

584 for m in matches: 

585 problem_pkgs = seen_pkgs & m.into 

586 if problem_pkgs: 586 ↛ 587line 586 didn't jump to line 587 because the condition on line 586 was never true

587 pkg_names = ", ".join(sorted(p.name for p in problem_pkgs)) 

588 _error( 

589 f'The pattern "{source.raw_match_rule}" matched multiple entries for the packages: {pkg_names}.' 

590 " However, it should match exactly one item. Please tighten the pattern defined" 

591 f" in {self._definition_source}." 

592 ) 

593 seen_pkgs.update(m.into) 

594 

595 def _match_pattern( 

596 self, 

597 path_matcher: SourcePathMatcher, 

598 fs_match_rule: FileSystemMatchRule, 

599 condition_context: ConditionContext, 

600 search_dirs: Sequence[SearchDir], 

601 into: frozenset[BinaryPackage], 

602 ) -> list[PathMatch]: 

603 matched, exclude_counts = path_matcher.find_and_reserve_all_matches( 

604 fs_match_rule.match_rule, 

605 search_dirs, 

606 fs_match_rule.raw_match_rule.endswith("/"), 

607 self._match_filter, 

608 into, 

609 self._definition_source, 

610 ) 

611 

612 already_installed_paths, already_excluded_paths = exclude_counts 

613 

614 if into: 614 ↛ 619line 614 didn't jump to line 619 because the condition on line 614 was always true

615 allow_empty_match = all(not p.should_be_acted_on for p in into) 

616 else: 

617 # discard rules must match provided at least one search dir exist. If none of them 

618 # exist, then we assume the discard rule is for a package that will not be built 

619 allow_empty_match = any(s.search_dir.is_dir for s in search_dirs) 

620 if self._condition is not None and not self._condition.evaluate( 620 ↛ 623line 620 didn't jump to line 623 because the condition on line 620 was never true

621 condition_context 

622 ): 

623 allow_empty_match = True 

624 

625 if not matched and not allow_empty_match: 

626 search_dir_text = ", ".join(x.search_dir.fs_path for x in search_dirs) 

627 if already_excluded_paths and already_installed_paths: 627 ↛ 628line 627 didn't jump to line 628 because the condition on line 627 was never true

628 total_paths = already_excluded_paths + already_installed_paths 

629 msg = ( 

630 f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} after ignoring" 

631 f" {total_paths} path(s) already been matched previously either by install or" 

632 f" exclude rules. If you wanted to install some of these paths into multiple" 

633 f" packages, please tweak the definition that installed them to install them" 

634 f' into multiple packages (usually change "into: foo" to "into: [foo, bar]".' 

635 f" If you wanted to install these paths and exclude rules are getting in your" 

636 f" way, then please move this install rule before the exclusion rule that causes" 

637 f" issue or, in case of built-in excludes, list the paths explicitly (without" 

638 f" using patterns). Source for this issue is {self._definition_source}. Match rule:" 

639 f" {fs_match_rule.match_rule.describe_match_exact()}" 

640 ) 

641 elif already_excluded_paths: 641 ↛ 642line 641 didn't jump to line 642 because the condition on line 641 was never true

642 msg = ( 

643 f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} after ignoring" 

644 f" {already_excluded_paths} path(s) that have been excluded." 

645 " If you wanted to install some of these paths, please move the install rule" 

646 " before the exclusion rule or, in case of built-in excludes, list the paths explicitly" 

647 f" (without using patterns). Source for this issue is {self._definition_source}. Match rule:" 

648 f" {fs_match_rule.match_rule.describe_match_exact()}" 

649 ) 

650 elif already_installed_paths: 650 ↛ 651line 650 didn't jump to line 651 because the condition on line 650 was never true

651 msg = ( 

652 f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} after ignoring" 

653 f" {already_installed_paths} path(s) already been matched previously." 

654 " If you wanted to install some of these paths into multiple packages," 

655 f" please tweak the definition that installed them to install them into" 

656 f' multiple packages (usually change "into: foo" to "into: [foo, bar]".' 

657 f" Source for this issue is {self._definition_source}. Match rule:" 

658 f" {fs_match_rule.match_rule.describe_match_exact()}" 

659 ) 

660 else: 

661 # TODO: Try harder to find the match and point out possible typos 

662 msg = ( 

663 f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} (definition:" 

664 f" {self._definition_source}). Match rule: {fs_match_rule.match_rule.describe_match_exact()}" 

665 ) 

666 raise NoMatchForInstallPatternError( 

667 msg, 

668 fs_match_rule, 

669 search_dirs, 

670 self._definition_source, 

671 ) 

672 return matched 

673 

674 def _install_matches( 

675 self, 

676 path_matcher: SourcePathMatcher, 

677 matches: list[PathMatch], 

678 dest_paths: Sequence[tuple[str, bool]] | Callable[[PathMatch], str], 

679 install_context: "InstallRuleContext", 

680 into: frozenset[BinaryPackage], 

681 condition_context: ConditionContext, 

682 ) -> None: 

683 if ( 

684 self._condition is not None 

685 and not self._condition.evaluate(condition_context) 

686 ) or not any(p.should_be_acted_on for p in into): 

687 # Rule is disabled; skip all its actions - also allow empty matches 

688 # for this particular case. Though, we do go through the match and 

689 # ensure all paths are reserved correctly such that the behavior 

690 # between `binary-arch` vs `binary-indep` vs `binary` is consistent 

691 for match in matches: 

692 self._reserve_recursively( 

693 path_matcher, 

694 match, 

695 into, 

696 ) 

697 

698 return 

699 

700 if not matches: 700 ↛ 701line 700 didn't jump to line 701 because the condition on line 700 was never true

701 raise ValueError("matches must not be empty") 

702 

703 for match, dest_paths_and_roots in _resolve_matches( 

704 matches, 

705 dest_paths, 

706 install_context, 

707 ): 

708 install_recursively_into_dirs = [] 

709 for dest, fs_root in dest_paths_and_roots: 

710 dir_part, basename = os.path.split(dest) 

711 # We do not associate these with the FS path. First off, 

712 # it is complicated to do in most cases (indeed, debhelper 

713 # does not preserve these directories either) and secondly, 

714 # it is "only" mtime and mode - mostly irrelevant as the 

715 # directory is 99.9% likely to be 0755 (we are talking 

716 # directories like "/usr", "/usr/share"). 

717 dir_path = fs_root.mkdirs(dir_part) 

718 existing_path = dir_path.get(basename) 

719 

720 if match.path.is_dir: 

721 if existing_path is not None and not existing_path.is_dir: 721 ↛ 722line 721 didn't jump to line 722 because the condition on line 721 was never true

722 existing_path.unlink() 

723 existing_path = None 

724 current_dir = existing_path 

725 

726 if current_dir is None: 726 ↛ 730line 726 didn't jump to line 730 because the condition on line 726 was always true

727 current_dir = dir_path.mkdir( 

728 basename, reference_path=match.path 

729 ) 

730 install_recursively_into_dirs.append(current_dir) 

731 else: 

732 if existing_path is not None and existing_path.is_dir: 732 ↛ 733line 732 didn't jump to line 733 because the condition on line 732 was never true

733 _error( 

734 f"Cannot install {match.path} ({match.path.fs_path}) as {dest}. That path already exist" 

735 f" and is a directory. This error was triggered via {self._definition_source}." 

736 ) 

737 

738 if match.path.is_symlink: 

739 dir_path.add_symlink( 

740 basename, match.path.readlink(), reference_path=match.path 

741 ) 

742 else: 

743 dir_path.insert_file_from_fs_path( 

744 basename, 

745 match.path.fs_path, 

746 follow_symlinks=False, 

747 use_fs_path_mode=True, 

748 reference_path=match.path, 

749 ) 

750 if install_recursively_into_dirs: 

751 self._install_dir_recursively( 

752 path_matcher, 

753 install_recursively_into_dirs, 

754 match, 

755 into, 

756 ) 

757 

758 def _reserve_recursively( 

759 self, 

760 path_matcher: SourcePathMatcher, 

761 match: PathMatch, 

762 into: frozenset[BinaryPackage], 

763 ) -> None: 

764 direct_matched_path = match.path 

765 path_matcher.reserve( 

766 direct_matched_path, 

767 into, 

768 self._definition_source, 

769 is_exact_match=match.is_exact_match, 

770 ) 

771 if not direct_matched_path.is_dir: 771 ↛ 772line 771 didn't jump to line 772 because the condition on line 771 was never true

772 return 

773 stack = list(direct_matched_path.iterdir()) 

774 while stack: 

775 current_path = stack.pop() 

776 path_matcher.reserve( 

777 current_path, 

778 into, 

779 self._definition_source, 

780 is_exact_match=False, 

781 ) 

782 if current_path.is_dir: 

783 stack.extend(current_path.iterdir()) 

784 

785 def _install_dir_recursively( 

786 self, 

787 path_matcher: SourcePathMatcher, 

788 parent_dirs: Sequence[InMemoryVirtualPathBase], 

789 match: PathMatch, 

790 into: frozenset[BinaryPackage], 

791 ) -> None: 

792 stack = [ 

793 (parent_dirs, e) 

794 for e in match.path.iterdir() 

795 if not path_matcher.is_reserved(e) 

796 ] 

797 

798 while stack: 

799 current_dirs, dir_entry = stack.pop() 

800 path_matcher.reserve( 

801 dir_entry, 

802 into, 

803 self._definition_source, 

804 is_exact_match=False, 

805 ) 

806 if dir_entry.is_dir: 806 ↛ 807line 806 didn't jump to line 807 because the condition on line 806 was never true

807 new_dirs = [ 

808 d.mkdir(dir_entry.name, reference_path=dir_entry) 

809 for d in current_dirs 

810 ] 

811 stack.extend( 

812 (new_dirs, de) 

813 for de in dir_entry.iterdir() 

814 if not path_matcher.is_reserved(de) 

815 ) 

816 elif dir_entry.is_symlink: 

817 for current_dir in current_dirs: 

818 current_dir.add_symlink( 

819 dir_entry.name, 

820 dir_entry.readlink(), 

821 reference_path=dir_entry, 

822 ) 

823 elif dir_entry.is_file: 823 ↛ 833line 823 didn't jump to line 833 because the condition on line 823 was always true

824 for current_dir in current_dirs: 

825 current_dir.insert_file_from_fs_path( 

826 dir_entry.name, 

827 dir_entry.fs_path, 

828 use_fs_path_mode=True, 

829 follow_symlinks=False, 

830 reference_path=dir_entry, 

831 ) 

832 else: 

833 _error( 

834 f"Unsupported file type: {dir_entry.fs_path} - neither a file, directory or symlink" 

835 ) 

836 

    def perform_install(
        self,
        path_matcher: SourcePathMatcher,
        install_context: InstallRuleContext,
        condition_context: ConditionContext,
    ) -> None:
        """Apply this rule.

        The base implementation always raises ``NotImplementedError``.

        :param path_matcher: Tracks the installed and discarded paths.
        :param install_context: Provides search directories and per-package state.
        :param condition_context: Context for evaluating manifest conditions.
        """
        raise NotImplementedError

844 

@classmethod
def install_as(
    cls,
    source: FileSystemMatchRule,
    dest_path: str,
    into: frozenset[BinaryPackage],
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Build a rule installing a single source match at one explicit path.

    The rule is created with ``require_single_match=True`` and a single
    destination entry ``(dest_path, False)``.

    :param source: The match rule selecting the source.
    :param dest_path: The destination path for the match.
    :param into: The packages to install into.
    :param definition_source: Where the rule was defined (kept for messages).
    :param condition: Optional condition attached to the rule.
    :return: The new installation rule.
    """
    return GenericInstallationRule(
        sources=[source],
        dest_paths=[(dest_path, False)],
        into=into,
        condition=condition,
        definition_source=definition_source,
        require_single_match=True,
    )

862 

@classmethod
def install_dest(
    cls,
    sources: Sequence[FileSystemMatchRule],
    dest_dir: str | None,
    into: frozenset[BinaryPackage],
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Build a rule installing the matched sources below a destination dir.

    Without a ``dest_dir`` the destination template is
    ``{dirname}/{basename}``; otherwise it is ``dest_dir`` joined with
    ``{basename}``.  The template is passed on as ``(template, True)``.

    :param sources: The match rules selecting the sources.
    :param dest_dir: Destination directory, or ``None`` for the default
      template.
    :param into: The packages to install into.
    :param definition_source: Where the rule was defined (kept for messages).
    :param condition: Optional condition attached to the rule.
    :return: The new installation rule.
    """
    dest_template = (
        "{dirname}/{basename}"
        if dest_dir is None
        else os.path.join(dest_dir, "{basename}")
    )
    return GenericInstallationRule(
        sources=sources,
        dest_paths=[(dest_template, True)],
        into=into,
        condition=condition,
        definition_source=definition_source,
    )

883 

@classmethod
def install_multi_as(
    cls,
    source: FileSystemMatchRule,
    dest_paths: Sequence[str],
    into: frozenset[BinaryPackage],
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Build a rule installing a single source match at several explicit paths.

    Every entry of ``dest_paths`` becomes a ``(path, False)`` destination
    and the rule is created with ``require_single_match=True``.

    :param source: The match rule selecting the source.
    :param dest_paths: At least two destination paths.
    :param into: The packages to install into.
    :param definition_source: Where the rule was defined (kept for messages).
    :param condition: Optional condition attached to the rule.
    :return: The new installation rule.
    :raises ValueError: If fewer than two destination paths are given.
    """
    if len(dest_paths) < 2:
        raise ValueError(
            "Please use `install_as` when there is less than 2 dest path"
        )

    destinations = tuple((path, False) for path in dest_paths)
    return GenericInstallationRule(
        sources=[source],
        dest_paths=destinations,
        into=into,
        condition=condition,
        definition_source=definition_source,
        require_single_match=True,
    )

906 

@classmethod
def install_multi_dest(
    cls,
    sources: Sequence[FileSystemMatchRule],
    dest_dirs: Sequence[str],
    into: frozenset[BinaryPackage],
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Build a rule installing the matched sources below several directories.

    Each directory in ``dest_dirs`` is joined with ``{basename}`` and passed
    on as a ``(template, True)`` destination.

    :param sources: The match rules selecting the sources.
    :param dest_dirs: At least two destination directories.
    :param into: The packages to install into.
    :param definition_source: Where the rule was defined (kept for messages).
    :param condition: Optional condition attached to the rule.
    :return: The new installation rule.
    :raises ValueError: If fewer than two destination directories are given.
    """
    if len(dest_dirs) < 2:
        raise ValueError(
            "Please use `install_dest` when there is less than 2 dest dir"
        )

    destinations = tuple(
        (os.path.join(directory, "{basename}"), True) for directory in dest_dirs
    )
    return GenericInstallationRule(
        sources=sources,
        dest_paths=destinations,
        into=into,
        condition=condition,
        definition_source=definition_source,
    )

928 

@classmethod
def install_doc(
    cls,
    sources: Sequence[FileSystemMatchRule],
    dest_dir: str | None,
    into: frozenset[BinaryPackage],
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Build a rule installing documentation.

    The rule is always conditioned on ``_BUILD_DOCS_BDO``; a caller supplied
    ``condition`` is combined with it through ``ManifestCondition.all_of``.

    Without a ``dest_dir`` the destination is the template
    ``usr/share/doc/{doc_main_package_name}/{basename}``.  With a
    ``dest_dir`` the destination is computed per match by joining
    ``dest_dir`` with the name of the matched path.

    :param sources: The match rules selecting the sources.
    :param dest_dir: Destination directory, or ``None`` for the default
      documentation location.
    :param into: The packages to install into.
    :param definition_source: Where the rule was defined (kept for messages).
    :param condition: Optional extra condition for the rule.
    :return: The new installation rule.
    """
    if condition is None:
        cond: ManifestCondition = _BUILD_DOCS_BDO
    else:
        cond = ManifestCondition.all_of([_BUILD_DOCS_BDO, condition])

    if dest_dir is not None:

        def _dest_path_resolver(match: PathMatch) -> str:
            return os.path.join(dest_dir, match.path.name)

        return GenericInstallationRule(
            sources=sources,
            dest_paths=_dest_path_resolver,
            into=into,
            condition=cond,
            definition_source=definition_source,
        )

    default_template = "usr/share/doc/{doc_main_package_name}/{basename}"
    return GenericInstallationRule(
        sources=sources,
        dest_paths=[(default_template, True)],
        into=into,
        condition=cond,
        definition_source=definition_source,
    )

961 

@classmethod
def install_doc_as(
    cls,
    source: FileSystemMatchRule,
    dest_path: str,
    into: frozenset[BinaryPackage],
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Build a rule installing one documentation match at an explicit path.

    The rule is always conditioned on ``_BUILD_DOCS_BDO``; a caller supplied
    ``condition`` is combined with it through ``ManifestCondition.all_of``.
    The rule is created with ``require_single_match=True`` and a single
    destination entry ``(dest_path, False)``.

    :param source: The match rule selecting the source.
    :param dest_path: The destination path for the match.
    :param into: The packages to install into.
    :param definition_source: Where the rule was defined (kept for messages).
    :param condition: Optional extra condition for the rule.
    :return: The new installation rule.
    """
    if condition is None:
        cond: ManifestCondition = _BUILD_DOCS_BDO
    else:
        cond = ManifestCondition.all_of([_BUILD_DOCS_BDO, condition])

    return GenericInstallationRule(
        sources=[source],
        dest_paths=[(dest_path, False)],
        into=into,
        condition=cond,
        definition_source=definition_source,
        require_single_match=True,
    )

983 

@classmethod
def install_examples(
    cls,
    sources: Sequence[FileSystemMatchRule],
    into: frozenset[BinaryPackage],
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Build a rule installing examples below the package's doc directory.

    The rule is always conditioned on ``_BUILD_DOCS_BDO``; a caller supplied
    ``condition`` is combined with it through ``ManifestCondition.all_of``.
    The destination is the template
    ``usr/share/doc/{doc_main_package_name}/examples/{basename}``.

    :param sources: The match rules selecting the sources.
    :param into: The packages to install into.
    :param definition_source: Where the rule was defined (kept for messages).
    :param condition: Optional extra condition for the rule.
    :return: The new installation rule.
    """
    if condition is None:
        cond: ManifestCondition = _BUILD_DOCS_BDO
    else:
        cond = ManifestCondition.all_of([_BUILD_DOCS_BDO, condition])

    examples_template = "usr/share/doc/{doc_main_package_name}/examples/{basename}"
    return GenericInstallationRule(
        sources=sources,
        dest_paths=[(examples_template, True)],
        into=into,
        condition=cond,
        definition_source=definition_source,
    )

1002 

@classmethod
def install_man(
    cls,
    sources: Sequence[FileSystemMatchRule],
    into: frozenset[BinaryPackage],
    section: int | None,
    language: str | None,
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Build a rule installing manpages.

    The rule is always conditioned on ``_BUILD_DOCS_BDO``; a caller supplied
    ``condition`` is combined with it through ``ManifestCondition.all_of``.
    Destinations are computed per match by the callable obtained from
    ``_dest_path_for_manpage(section, language, definition_source)``.

    :param sources: The match rules selecting the sources.
    :param into: The packages to install into.
    :param section: Manpage section passed to ``_dest_path_for_manpage``
      (may be ``None``).
    :param language: Language passed to ``_dest_path_for_manpage``
      (may be ``None``).
    :param definition_source: Where the rule was defined (kept for messages).
    :param condition: Optional extra condition for the rule.
    :return: The new installation rule.
    """
    if condition is None:
        cond: ManifestCondition = _BUILD_DOCS_BDO
    else:
        cond = ManifestCondition.all_of([_BUILD_DOCS_BDO, condition])

    manpage_dest_resolver = _dest_path_for_manpage(
        section,
        language,
        definition_source,
    )

    return GenericInstallationRule(
        sources=sources,
        dest_paths=manpage_dest_resolver,
        into=into,
        condition=cond,
        definition_source=definition_source,
        # The filter is true for paths that are not regular files.
        # NOTE(review): presumably such paths are left out of the matches;
        # confirm against how InstallRule applies ``match_filter``.
        match_filter=lambda path: not path.is_file,
    )

1029 

@classmethod
def discard_paths(
    cls,
    paths: Sequence[FileSystemMatchRule],
    definition_source: str,
    condition: ManifestCondition | None,
    *,
    limit_to: Sequence[FileSystemExactMatchRule] | None = None,
) -> "InstallRule":
    """Build a rule that discards the given paths.

    :param paths: The match rules selecting the paths to discard.
    :param definition_source: Where the rule was defined (kept for messages).
    :param condition: Optional condition attached to the rule.
    :param limit_to: Optional search directories the rule is limited to; it
      is handed to the rule as a tuple (empty when ``None``).
    :return: The new discard rule.
    """
    limited_dirs = () if limit_to is None else tuple(limit_to)
    return DiscardRule(
        paths,
        condition,
        limited_dirs,
        definition_source,
    )

1045 

1046 

class PPFInstallRule(InstallRule):
    """Install rule that places packager provided files (PPFs) into one package."""

    __slots__ = (
        "_ppfs",
        "_substitution",
        "_into",
    )

    # noinspection PyMissingConstructor
    def __init__(
        self,
        into: BinaryPackage,
        substitution: Substitution,
        ppfs: Sequence["PackagerProvidedFile"],
    ) -> None:
        """Create the rule.

        :param into: The package receiving the files.
        :param substitution: Stored on the rule; not used by the code in this
          class.
        :param ppfs: The packager provided files to install.
        """
        # The base initializer is invoked via run_in_context_of_plugin with
        # "debputy" as the plugin, no condition, and a fixed definition source.
        run_in_context_of_plugin(
            "debputy",
            super().__init__,
            None,
            "<built-in; PPF install rule>",
        )
        self._into = into
        self._ppfs = ppfs
        self._substitution = substitution

    def perform_install(
        self,
        path_matcher: SourcePathMatcher,
        install_context: InstallRuleContext,
        condition_context: ConditionContext,
    ) -> None:
        """Insert every packager provided file into the package's file system.

        For each file the destination directory is created via ``mkdirs`` and
        the file is inserted with the default mode of its definition,
        following symlinks.  ``path_matcher`` and ``condition_context`` are
        not used by this rule.
        """
        package_root = install_context[self._into.name].fs_root
        for ppf in self._ppfs:
            origin = ppf.path.fs_path
            target_dir, target_name = ppf.compute_dest()
            package_root.mkdirs(target_dir).insert_file_from_fs_path(
                target_name,
                origin,
                follow_symlinks=True,
                use_fs_path_mode=False,
                mode=ppf.definition.default_mode,
            )

1091 

1092 

class GenericInstallationRule(InstallRule):
    """Install rule that matches source patterns and installs the matches.

    Destinations are either a sequence of ``(path, bool)`` tuples or a
    callable computing the destination from a match.
    """

    __slots__ = (
        "_sources",
        "_into",
        "_dest_paths",
        "_require_single_match",
    )

    def __init__(
        self,
        sources: Sequence[FileSystemMatchRule],
        dest_paths: Sequence[tuple[str, bool]] | Callable[[PathMatch], str],
        into: frozenset[BinaryPackage],
        condition: ManifestCondition | None,
        definition_source: str,
        *,
        require_single_match: bool = False,
        match_filter: Callable[["VirtualPath"], bool] | None = None,
    ) -> None:
        """Create the rule.

        :param sources: The match rules selecting the sources.
        :param dest_paths: Destination entries or a destination resolver.
        :param into: The packages to install into.
        :param condition: Optional condition, forwarded to the base class.
        :param definition_source: Where the rule was defined, forwarded to the
          base class.
        :param require_single_match: When true, exactly one source rule must
          be given, and ``perform_install`` runs ``_check_single_match`` if
          that rule yields more than one match.
        :param match_filter: Optional filter, forwarded to the base class.
        :raises ValueError: If ``require_single_match`` is set and ``sources``
          does not contain exactly one entry.
        """
        super().__init__(
            condition,
            definition_source,
            match_filter=match_filter,
        )
        self._sources = sources
        self._into = into
        self._dest_paths = dest_paths
        self._require_single_match = require_single_match
        if require_single_match and len(sources) != 1:
            raise ValueError("require_single_match implies sources must have len 1")

    def perform_install(
        self,
        path_matcher: SourcePathMatcher,
        install_context: InstallRuleContext,
        condition_context: ConditionContext,
    ) -> None:
        """Match each source rule in turn and install whatever it matched."""
        target_packages = self._into
        single_match_only = self._require_single_match
        for source_rule in self._sources:
            found = self._match_pattern(
                path_matcher,
                source_rule,
                condition_context,
                install_context.search_dirs,
                target_packages,
            )
            if single_match_only and len(found) > 1:
                self._check_single_match(source_rule, found)
            self._install_matches(
                path_matcher,
                found,
                self._dest_paths,
                install_context,
                target_packages,
                condition_context,
            )

1148 

1149 

class DiscardRule(InstallRule):
    """Install rule that matches paths without installing them into any package.

    The rule can optionally be limited to a subset of the known search
    directories.
    """

    __slots__ = ("_fs_match_rules", "_limit_to")

    def __init__(
        self,
        fs_match_rules: Sequence[FileSystemMatchRule],
        condition: ManifestCondition | None,
        limit_to: Sequence[FileSystemExactMatchRule],
        definition_source: str,
    ) -> None:
        """Create the rule.

        :param fs_match_rules: The match rules selecting the paths to discard.
        :param condition: Optional condition, forwarded to the base class.
        :param limit_to: Search directories the rule is limited to; an empty
          sequence means all known search directories are used.
        :param definition_source: Where the rule was defined, forwarded to the
          base class.
        """
        super().__init__(condition, definition_source)
        self._fs_match_rules = fs_match_rules
        self._limit_to = limit_to

    def perform_install(
        self,
        path_matcher: SourcePathMatcher,
        install_context: InstallRuleContext,
        condition_context: ConditionContext,
    ) -> None:
        """Match every discard pattern against the applicable search dirs.

        When the rule is limited to specific search directories, only the
        known search directories whose ``fs_path`` is listed are used, and any
        listed directory that is not known is reported via ``_error``.

        The patterns are matched with an empty package set and the returned
        matches are ignored.
        NOTE(review): presumably ``_match_pattern`` records the matched paths
        as handled in ``path_matcher``; confirm against the base class.
        """
        limit_to = self._limit_to
        if limit_to:
            wanted = {rule.match_rule.path for rule in limit_to}
            search_dirs: Sequence[SearchDir] = tuple(
                s for s in install_context.search_dirs if s.search_dir.fs_path in wanted
            )
            # Decide on the actual set difference instead of comparing lengths:
            # duplicate entries in ``limit_to`` (or search dirs sharing an
            # fs_path) must neither trigger a bogus error with an empty path
            # list nor hide a directory that is really missing.
            unknown = wanted.difference(s.search_dir.fs_path for s in search_dirs)
            if unknown:
                # Sorted so the message does not depend on set iteration order.
                paths = ":".join(sorted(unknown))
                _error(
                    f"The discard rule defined at {self._definition_source} mentions the following"
                    f" search directories that were not known to debputy: {paths}."
                    " Either the search dir is missing somewhere else or it should be removed from"
                    " the discard rule."
                )
        else:
            search_dirs = install_context.search_dirs

        for fs_match_rule in self._fs_match_rules:
            self._match_pattern(
                path_matcher,
                fs_match_rule,
                condition_context,
                search_dirs,
                into=frozenset(),
            )