Coverage for src/debputy/installations.py: 67%
516 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
1import collections
2import dataclasses
3import os.path
4import re
5from enum import IntEnum
6from typing import (
7 List,
8 Dict,
9 FrozenSet,
10 Union,
11 Tuple,
12 Set,
13 Optional,
14 TYPE_CHECKING,
15 cast,
16 Any,
17)
18from collections.abc import Callable, Iterator, Sequence, Iterable, Mapping
20from debputy.exceptions import DebputyRuntimeError
21from debputy.filesystem_scan import FSPath
22from debputy.manifest_conditions import (
23 ConditionContext,
24 ManifestCondition,
25 _BUILD_DOCS_BDO,
26)
27from debputy.manifest_parser.base_types import (
28 FileSystemMatchRule,
29 FileSystemExactMatchRule,
30)
31from debputy.manifest_parser.tagging_types import DebputyDispatchableType
32from debputy.packages import BinaryPackage
33from debputy.path_matcher import MatchRule, ExactFileSystemPath, MATCH_ANYTHING
34from debputy.plugin.plugin_state import run_in_context_of_plugin
35from debputy.substitution import Substitution
36from debputy.util import _error, _warn
38if TYPE_CHECKING:
39 from debputy.packager_provided_files import PackagerProvidedFile
40 from debputy.plugin.api import VirtualPath
41 from debputy.plugin.api.impl_types import PluginProvidedDiscardRule
# Section token on a `.TH <title> <section>` line; the section may be quoted.
_MAN_TH_LINE = re.compile(r'^[.]TH\s+\S+\s+"?(\d+[^"\s]*)"?')
# Section token on a `.Dt <title> <section>` line.
_MAN_DT_LINE = re.compile(r"^[.]Dt\s+\S+\s+(\d+\S*)")
# Section taken from a file name such as `foo.1` or `foo.3pm.gz`.
_MAN_SECTION_BASENAME = re.compile(r"[.]([1-9]\w*)(?:[.]gz)?$")
# Leading digits of a section token (e.g. the `3` of `3pm`).
_MAN_REAL_SECTION = re.compile(r"^(\d+)")
# The last dot-separated component of a file name (stripped to get the page name).
_MAN_INST_BASENAME = re.compile(r"[.][^.]+$")
# Language directory (`de`, `pt_BR`, optionally followed by `.<charset>`) in a
# `man/<lang>/man<N>/` path.
MAN_GUESS_LANG_FROM_PATH = re.compile(
    r"(?:^|/)man/(?:([a-z][a-z](?:_[A-Z][A-Z])?)(?:\.[^/]+)?)?/man[1-9]/"
)
# Language component embedded in a basename such as `foo.de.1` or `foo.pt_BR.man`.
MAN_GUESS_FROM_BASENAME = re.compile(
    r"[.]([a-z][a-z](?:_[A-Z][A-Z])?)[.](?:[1-9]|man)"
)
class InstallRuleError(DebputyRuntimeError):
    """Base class for the errors raised while applying installation rules."""
class PathAlreadyInstalledOrDiscardedError(InstallRuleError):
    """Raised when an exact match hits a path that was already installed or discarded.

    The exception is constructed as ``(message, match, into, definition_source)``,
    so the payload starts at ``args[1]``; ``args[0]`` is the human readable
    message.
    """

    @property
    def path(self) -> str:
        """The file system path of the offending match."""
        subject = self.args[1]
        if isinstance(subject, str):
            return subject
        # The raise sites in this module pass the whole `PathMatch`; unwrap it
        # so the property honours its declared `str` type.
        return cast("str", subject.path.fs_path)

    @property
    def into(self) -> frozenset[BinaryPackage]:
        """The packages the path had already been installed into (may be empty)."""
        return cast("FrozenSet[BinaryPackage]", self.args[2])

    @property
    def definition_source(self) -> str:
        """The definition that triggered the error."""
        return cast("str", self.args[3])
class ExactPathMatchTwiceError(InstallRuleError):
    """Raised when the same path is requested via an exact match twice for a package.

    The payload follows the message in ``args``: path, package, definition source.
    """

    @property
    def path(self) -> str:
        """The path that was requested twice (second exception argument)."""
        return cast("str", self.args[1])

    @property
    def into(self) -> BinaryPackage:
        """The package the path had already been installed into."""
        return cast("BinaryPackage", self.args[2])

    @property
    def definition_source(self) -> str:
        """The definition that triggered the second installation."""
        return cast("str", self.args[3])
class NoMatchForInstallPatternError(InstallRuleError):
    """Raised when a pattern matched nothing although a match was required.

    The payload follows the message in ``args``: pattern, search dirs,
    definition source.
    """

    @property
    def pattern(self) -> str:
        """The pattern that had no matches (second exception argument)."""
        return cast("str", self.args[1])

    @property
    def search_dirs(self) -> Sequence["SearchDir"]:
        """The search directories that were examined."""
        return cast("Sequence[SearchDir]", self.args[2])

    @property
    def definition_source(self) -> str:
        """The definition that contained the pattern."""
        return cast("str", self.args[3])
@dataclasses.dataclass(slots=True, frozen=True)
class SearchDir:
    """A directory to search for install sources plus the packages it applies to."""

    # Root directory that match rules are evaluated against.
    search_dir: "VirtualPath"
    # Packages for which matches from this directory may be used.
    applies_to: frozenset[BinaryPackage]
@dataclasses.dataclass(slots=True, frozen=True)
class BinaryPackageInstallRuleContext:
    """Per-package data needed while applying install rules."""

    binary_package: BinaryPackage
    # Root of the file system the package's content is installed into.
    fs_root: FSPath
    # Package whose name is substituted for `{doc_main_package_name}`.
    doc_main_package: BinaryPackage

    def replace(self, **changes: Any) -> "BinaryPackageInstallRuleContext":
        """Return a copy of this (frozen) context with the given fields replaced."""
        return dataclasses.replace(self, **changes)
@dataclasses.dataclass(slots=True, frozen=True)
class InstallSearchDirContext:
    """Bundle of the search directories relevant for an installation run."""

    search_dirs: Sequence[SearchDir]
    check_for_uninstalled_dirs: Sequence["VirtualPath"]
    # TODO: Support search dirs per-package
    # Keyed by `str`; defaults to an empty mapping.
    debian_pkg_dirs: Mapping[str, "VirtualPath"] = dataclasses.field(
        default_factory=dict
    )
@dataclasses.dataclass(slots=True)
class InstallRuleContext:
    """Mutable context shared by install rules; indexable by package name."""

    # TODO: Search dirs should be per-package
    search_dirs: Sequence[SearchDir]
    # Package name -> per-package context; accessed via `ctx[name]`.
    binary_package_contexts: dict[str, BinaryPackageInstallRuleContext] = (
        dataclasses.field(default_factory=dict)
    )

    def __getitem__(self, item: str) -> BinaryPackageInstallRuleContext:
        """Look up the per-package context by package name (KeyError if absent)."""
        return self.binary_package_contexts[item]

    def __setitem__(self, key: str, value: BinaryPackageInstallRuleContext) -> None:
        """Register (or overwrite) the per-package context for a package name."""
        self.binary_package_contexts[key] = value

    def replace(self, **changes: Any) -> "InstallRuleContext":
        """Return a copy of this context with the given fields replaced."""
        return dataclasses.replace(self, **changes)
@dataclasses.dataclass(slots=True, frozen=True)
class PathMatch:
    """A single path found by a match rule inside a search directory."""

    # The matched path itself.
    path: "VirtualPath"
    # The search directory in which the path was found.
    search_dir: "VirtualPath"
    # Whether the match was requested as an exact match.
    is_exact_match: bool
    # Packages the match applies to.
    into: frozenset[BinaryPackage]
class DiscardState(IntEnum):
    """Cached verdict on whether a source path has been discarded."""

    # No verdict has been recorded for the path yet.
    UNCHECKED = 0
    # The path was checked and is not discarded.
    NOT_DISCARDED = 1
    # A plugin provided (automatic) discard rule matched the path or a parent.
    DISCARDED_BY_PLUGIN_PROVIDED_RULE = 2
    # The path was excluded explicitly.
    DISCARDED_BY_MANIFEST_RULE = 3
def _determine_manpage_section(
    match_rule: PathMatch,
    provided_section: int | None,
    definition_source: str,
) -> str | None:
    """Determine the (possibly suffixed) section of a man page.

    :param match_rule: The match for the man page.
    :param provided_section: Explicitly declared section. When not None it is
      returned (as a string) without looking at the file.
    :param definition_source: Where the rule was defined (used in the warning).
    :return: The section from the first usable `.Dt`/`.TH` line, otherwise the
      section derived from the basename, otherwise None.
    """
    if provided_section is not None:
        return str(provided_section)

    detected_section = None
    # Only the ASCII `.TH`/`.Dt` macro lines are of interest. Replace bytes
    # that cannot be decoded (legacy encodings, compressed pages) instead of
    # aborting with a UnicodeDecodeError, so the basename fallback can still run.
    with open(match_rule.path.fs_path, encoding="utf-8", errors="replace") as fd:
        for line in fd:
            if not line.startswith((".TH", ".Dt")):
                continue

            m = _MAN_DT_LINE.match(line) or _MAN_TH_LINE.match(line)
            if m is None:
                continue
            detected_section = m.group(1)
            if "." in detected_section:
                _warn(
                    f"Ignoring detected section {detected_section} in {match_rule.path.fs_path}"
                    f" (detected via {definition_source}): It looks too much like a version"
                )
                detected_section = None
            # Only the first header line that parses is considered.
            break

    if detected_section is None:
        m = _MAN_SECTION_BASENAME.search(os.path.basename(match_rule.path.path))
        if m:
            detected_section = m.group(1)
    return detected_section
def _determine_manpage_real_section(
    match_rule: PathMatch,
    section: str | None,
    definition_source: str,
) -> int:
    """Reduce a section token (e.g. `3pm`) to its leading number.

    :param match_rule: The match for the man page (used in diagnostics).
    :param section: The section token, or None when it could not be detected.
    :param definition_source: Where the rule was defined (used in diagnostics).
    :return: The numeric section. `_error` is called when it cannot be
      determined or lies outside 0..9.
    """
    leading_digits = _MAN_REAL_SECTION.match(section) if section is not None else None
    real_section = int(leading_digits.group(1)) if leading_digits else None

    # NOTE(review): 0 passes this check although the warning below says 1..9 - confirm.
    if real_section is not None and 0 <= real_section <= 9:
        return real_section

    if real_section is not None:
        _warn(
            f"Computed section for {match_rule.path.fs_path} was {real_section} (section: {section}),"
            f" which is not a valid section (must be between 1 and 9 incl.)"
        )
    _error(
        f"Could not determine the section for {match_rule.path.fs_path} automatically. The man page"
        f" was detected via {definition_source}. Consider using `section: <number>` to"
        " explicitly declare the section. Keep in mind that it applies to all man pages for that"
        " rule and you may have to split the rule into two for this reason."
    )
    return real_section
def _determine_manpage_language(
    match_rule: PathMatch,
    provided_language: str | None,
) -> str | None:
    """Determine the language directory for a man page.

    :param match_rule: The match for the man page.
    :param provided_language: An explicit language, one of the special values
      `derive-from-basename` / `derive-from-path`, or None (treated like
      `derive-from-path`). The explicit language `C` means "no language".
    :return: The language, or None when there is none or it cannot be guessed.
    """
    if provided_language is None or provided_language == "derive-from-path":
        guess = MAN_GUESS_LANG_FROM_PATH.search(match_rule.path.path)
    elif provided_language == "derive-from-basename":
        guess = MAN_GUESS_FROM_BASENAME.search(match_rule.path.name)
    else:
        # An explicit language is used verbatim.
        return None if provided_language == "C" else provided_language

    return guess.group(1) if guess is not None else None
def _dest_path_for_manpage(
    provided_section: int | None,
    provided_language: str | None,
    definition_source: str,
) -> Callable[["PathMatch"], str]:
    """Create a function that computes the install path of a matched man page.

    :param provided_section: Explicit section (or None to detect it per page).
    :param provided_language: Explicit language or derivation mode (or None).
    :param definition_source: Where the rule was defined (used in diagnostics).
    :return: A callable mapping a match to
      `usr/share/man/[<lang>/]man<N>/<name>.<section>`.
    """

    def _manpage_dest_path(match_rule: PathMatch) -> str:
        # Page name: the basename without its last dot-separated component.
        page_name = _MAN_INST_BASENAME.sub("", match_rule.path.name)
        section = _determine_manpage_section(
            match_rule, provided_section, definition_source
        )
        real_section = _determine_manpage_real_section(
            match_rule, section, definition_source
        )
        assert section is not None

        language = _determine_manpage_language(match_rule, provided_language)
        lang_dir = ""
        if language is not None:
            lang_dir = f"{language}/"
            # The language goes into the directory; drop it from the file name.
            page_name = page_name.removesuffix(f".{language}")

        return f"usr/share/man/{lang_dir}man{real_section}/{page_name}.{section}"

    return _manpage_dest_path
class SourcePathMatcher:
    """Tracks which source paths have been reserved (installed) or discarded."""

    def __init__(self, auto_discard_rules: list["PluginProvidedDiscardRule"]) -> None:
        # fs_path -> (packages the path was reserved for, definition that did it)
        self._already_matched: dict[
            str,
            tuple[frozenset[BinaryPackage], str],
        ] = {}
        # (package name, fs_path) pairs that were requested via an exact match
        self._exact_match_request: set[tuple[str, str]] = set()
        # fs_path -> cached discard verdict
        self._discarded: dict[str, DiscardState] = {}
        self._auto_discard_rules = auto_discard_rules
        # discard rule name -> fs_paths the rule discarded
        self.used_auto_discard_rules: dict[str, set[str]] = collections.defaultdict(set)

    def is_reserved(self, path: "VirtualPath") -> bool:
        """Return True if the path was already matched or is discarded."""
        fs_path = path.fs_path
        if fs_path in self._already_matched:
            return True
        state = self._discarded.get(fs_path, DiscardState.UNCHECKED)
        if state == DiscardState.UNCHECKED:
            state = self._check_plugin_provided_exclude_state_for(path)
        return state != DiscardState.NOT_DISCARDED

    def exclude(self, path: str) -> None:
        """Record the (file system) path as discarded by a manifest rule."""
        self._discarded[path] = DiscardState.DISCARDED_BY_MANIFEST_RULE

    def _run_plugin_provided_discard_rules_on(self, path: "VirtualPath") -> bool:
        """Return True if some auto discard rule discards the path.

        The first rule that fires is recorded in `used_auto_discard_rules`.
        """
        for rule in self._auto_discard_rules:
            if rule.should_discard(path):
                self.used_auto_discard_rules[rule.name].add(path.fs_path)
                return True
        return False

    def _check_plugin_provided_exclude_state_for(
        self,
        path: "VirtualPath",
    ) -> DiscardState:
        """Compute (and cache) the discard verdict for a path.

        Walks from the path towards the root until a cached verdict is found, a
        plugin provided rule fires or the root is reached. The resulting verdict
        is cached for every path visited on the way.
        """
        visited_uncached = []
        node = path
        while True:
            node_fs_path = node.fs_path
            cached = self._discarded.get(node_fs_path, DiscardState.UNCHECKED)
            if cached != DiscardState.UNCHECKED:
                verdict = cached
                break
            visited_uncached.append(node_fs_path)
            if self._run_plugin_provided_discard_rules_on(node):
                verdict = DiscardState.DISCARDED_BY_PLUGIN_PROVIDED_RULE
                break
            # We cannot trust a "NOT_DISCARDED" until we check its parent (the directory could
            # be excluded without the files in it triggering the rule).
            parent = node.parent_dir
            if not parent:
                verdict = DiscardState.NOT_DISCARDED
                break
            node = parent
        self._discarded.update(dict.fromkeys(visited_uncached, verdict))
        return verdict

    def may_match(
        self,
        match: PathMatch,
        *,
        is_exact_match: bool = False,
    ) -> tuple[frozenset[BinaryPackage], bool]:
        """Check whether a match is still available.

        :return: `(installed_into, is_discarded)`. The first item is the set of
          packages the path was already reserved for (empty if none). The second
          item tells whether the path is discarded; for exact matches a discard
          by a plugin provided rule does not count.
        """
        previous = self._already_matched.get(match.path.fs_path)
        if previous:
            return previous[0], False

        fs_path = match.path.fs_path
        state = self._discarded.get(fs_path, DiscardState.UNCHECKED)
        if state == DiscardState.UNCHECKED:
            state = self._check_plugin_provided_exclude_state_for(match.path)

        assert state is not None and state != DiscardState.UNCHECKED

        if is_exact_match and state == DiscardState.DISCARDED_BY_PLUGIN_PROVIDED_RULE:
            is_discarded = False
        else:
            is_discarded = state != DiscardState.NOT_DISCARDED
        return frozenset(), is_discarded

    def reserve(
        self,
        path: "VirtualPath",
        reserved_by: frozenset[BinaryPackage],
        definition_source: str,
        *,
        is_exact_match: bool = False,
    ) -> None:
        """Mark the path as reserved for the given packages.

        For exact matches, the request is also recorded per package and any
        discard verdict / auto discard rule usage for the path is forgotten.
        """
        fs_path = path.fs_path
        self._already_matched[fs_path] = reserved_by, definition_source
        if not is_exact_match:
            return

        self._exact_match_request.update((pkg.name, fs_path) for pkg in reserved_by)
        self._discarded.pop(fs_path, None)
        for discarded_paths in self.used_auto_discard_rules.values():
            discarded_paths.discard(fs_path)

    def detect_missing(self, search_dir: "VirtualPath") -> Iterator["VirtualPath"]:
        """Yield the paths below search_dir that are neither reserved nor discarded.

        Directories are only reported when they are empty.
        """
        pending = list(search_dir.iterdir)
        while pending:
            entry = pending.pop()
            if not entry.is_dir:
                if not self.is_reserved(entry):
                    yield entry
                continue

            size_before = len(pending)
            pending.extend(entry.iterdir)
            if size_before == len(pending) and not self.is_reserved(entry):
                # "Explicitly" empty dir
                yield entry

    def find_and_reserve_all_matches(
        self,
        match_rule: MatchRule,
        search_dirs: Sequence[SearchDir],
        dir_only_match: bool,
        match_filter: Callable[["VirtualPath"], bool] | None,
        reserved_by: frozenset[BinaryPackage],
        definition_source: str,
    ) -> tuple[list[PathMatch], tuple[int, ...]]:
        """Resolve a match rule and reserve (or exclude) everything it matched.

        Glob-like rules silently skip paths that are already installed or
        excluded; exact rules raise instead. With an empty `reserved_by` the
        matches are excluded rather than reserved.

        :return: `(matches, (skipped_already_installed, skipped_already_excluded))`
        :raises PathAlreadyInstalledOrDiscardedError: An exact rule hit a path
          that was already installed or excluded.
        :raises ExactPathMatchTwiceError: An exact rule requested a path that
          was already requested via an exact match for the same package.
        """
        exact_only = isinstance(match_rule, ExactFileSystemPath)
        accepted = []
        skipped_installed = 0
        skipped_excluded = 0

        for candidate in _resolve_path(
            match_rule,
            search_dirs,
            dir_only_match,
            match_filter,
            reserved_by,
        ):
            fs_path = candidate.path.fs_path
            installed_into, excluded = self.may_match(
                candidate, is_exact_match=exact_only
            )
            if installed_into:
                if not exact_only:
                    skipped_installed += 1
                    continue
                packages = ", ".join(p.name for p in installed_into)
                raise PathAlreadyInstalledOrDiscardedError(
                    f'The "{fs_path}" has been reserved by and installed into {packages}.'
                    f" The definition that triggered this issue is {definition_source}.",
                    candidate,
                    installed_into,
                    definition_source,
                )
            if excluded:
                if not exact_only:
                    skipped_excluded += 1
                    continue
                raise PathAlreadyInstalledOrDiscardedError(
                    f'The "{fs_path}" has been excluded. If you want this path installed, move it'
                    f" above the exclusion rule that excluded it. The definition that triggered this"
                    f" issue is {definition_source}.",
                    candidate,
                    installed_into,
                    definition_source,
                )
            if exact_only:
                for pkg in candidate.into:
                    request = (pkg.name, fs_path)
                    if request in self._exact_match_request:
                        raise ExactPathMatchTwiceError(
                            f'The path "{fs_path}" (via exact match) has already been installed'
                            f" into {pkg.name}. The second installation triggered by {definition_source}",
                            candidate.path,
                            pkg,
                            definition_source,
                        )
                    self._exact_match_request.add(request)

            if reserved_by:
                self._already_matched[fs_path] = (
                    candidate.into,
                    definition_source,
                )
            else:
                self.exclude(fs_path)
            accepted.append(candidate)

        return accepted, (skipped_installed, skipped_excluded)
def _resolve_path(
    match_rule: MatchRule,
    search_dirs: Iterable["SearchDir"],
    dir_only_match: bool,
    match_filter: Callable[["VirtualPath"], bool] | None,
    into: frozenset[BinaryPackage],
) -> Iterator[PathMatch]:
    """Yield the matches of a rule across the search directories.

    :param match_rule: The rule to evaluate.
    :param search_dirs: Directories to search, in order.
    :param dir_only_match: When True, only directories are accepted.
    :param match_filter: Passed to the rule as `ignore_paths`.
    :param into: Packages the rule installs into (empty for discard rules).
    """
    still_unmatched = set(into)
    for sdir in search_dirs:
        if into and still_unmatched.isdisjoint(sdir.applies_to):
            # All the packages, where this search dir applies, already got a match
            continue

        applicable = sdir.applies_to & still_unmatched
        found_any = False
        for candidate in match_rule.finditer(
            sdir.search_dir,
            ignore_paths=match_filter,
        ):
            if dir_only_match and not candidate.is_dir:
                continue
            if candidate.parent_dir is None:
                if match_rule is MATCH_ANYTHING:
                    continue
                _error(
                    f"The pattern {match_rule.describe_match_short()} matched the root dir."
                )
            yield PathMatch(candidate, sdir.search_dir, False, applicable)
            found_any = True
            # continue; we want to match everything we can from this search directory.

        if not found_any:
            continue
        still_unmatched -= applicable
        if into and not still_unmatched:
            # For install rules, we can stop as soon as all packages had a match
            # For discard rules, all search directories must be visited. Otherwise,
            # you would have to repeat the discard rule once per search dir to be
            # sure something is fully discarded
            break
def _resolve_dest_paths(
    match: PathMatch,
    dest_paths: Sequence[tuple[str, bool]],
    install_context: "InstallRuleContext",
) -> Sequence[tuple[str, "FSPath"]]:
    """Expand destination templates into `(destination, fs_root)` pairs.

    :param match: The match being installed.
    :param dest_paths: `(destination, is_format)` pairs. Format destinations may
      use `{basename}`, `{dirname}`, `{package_name}` and
      `{doc_main_package_name}`.
    :param install_context: Used to look up the per-package file system root.
    :return: One pair per destination and package of the match.
    :raises ValueError: If a (resolved) destination ends with "/".
    """
    resolved = []
    for dest_path, is_format in dest_paths:
        if not is_format:
            if dest_path.endswith("/"):
                raise ValueError(
                    f'Provided destination for "{match.path.path}" ended with "/" ("{dest_path}"),'
                    " which it must not!"
                )
            resolved.extend(
                (dest_path, install_context[pkg.name].fs_root) for pkg in match.into
            )
            continue

        for pkg in match.into:
            parent_dir = match.path.parent_dir
            pkg_context = install_context[pkg.name]
            fs_root = pkg_context.fs_root
            expanded = dest_path.format(
                basename=match.path.name,
                dirname=parent_dir.path if parent_dir is not None else "",
                package_name=pkg.name,
                doc_main_package_name=pkg_context.doc_main_package.name,
            )
            if expanded.endswith("/"):
                raise ValueError(
                    f'Provided destination (when resolved for {pkg.name}) for "{match.path.path}" ended'
                    f' with "/" ("{dest_path}"), which it must not!'
                )
            resolved.append((expanded, fs_root))
    return resolved
def _resolve_matches(
    matches: list[PathMatch],
    dest_paths: Sequence[tuple[str, bool]] | Callable[[PathMatch], str],
    install_context: "InstallRuleContext",
) -> Iterator[tuple[PathMatch, Sequence[tuple[str, "FSPath"]]]]:
    """Pair every match with its `(destination, fs_root)` targets.

    :param matches: The matches to resolve.
    :param dest_paths: Either destination templates (see `_resolve_dest_paths`)
      or a callable computing the destination for a match.
    :param install_context: Used to look up the per-package file system root.
    :raises ValueError: If a computed destination ends with "/".
    """
    targets: Sequence[tuple[str, "FSPath"]]
    if not callable(dest_paths):
        for match in matches:
            targets = _resolve_dest_paths(
                match,
                dest_paths,
                install_context,
            )
            yield match, targets
        return

    for match in matches:
        computed = dest_paths(match)
        if computed.endswith("/"):
            raise ValueError(
                f'Provided destination for "{match.path.path}" ended with "/" ("{computed}"), which it must not!'
            )
        targets = [
            (computed, install_context[pkg.name].fs_root) for pkg in match.into
        ]
        yield match, targets
569class InstallRule(DebputyDispatchableType):
570 __slots__ = (
571 "_already_matched",
572 "_exact_match_request",
573 "_condition",
574 "_match_filter",
575 "_definition_source",
576 )
def __init__(
    self,
    condition: ManifestCondition | None,
    definition_source: str,
    *,
    match_filter: Callable[["VirtualPath"], bool] | None = None,
) -> None:
    """Set up the state shared by all install rules.

    :param condition: Optional condition guarding the rule.
    :param definition_source: Where the rule was defined (used in diagnostics).
    :param match_filter: Optional filter handed to the match rules.
    """
    super().__init__()
    self._definition_source = definition_source
    self._condition = condition
    self._match_filter = match_filter
def _check_single_match(
    self, source: FileSystemMatchRule, matches: list[PathMatch]
) -> None:
    """Ensure that no package received more than one match for the source.

    :param source: The rule that produced the matches (used in the error).
    :param matches: The matches to check.

    Calls `_error` when at least two matches apply to the same package.
    """
    seen_pkgs = set()
    problem_pkgs = frozenset()
    for m in matches:
        problem_pkgs = seen_pkgs & m.into
        if problem_pkgs:
            break
        # Remember the packages of this match; a later match that applies to
        # any of them is the duplicate we are looking for.
        seen_pkgs.update(m.into)
    if problem_pkgs:
        pkg_names = ", ".join(sorted(p.name for p in problem_pkgs))
        _error(
            f'The pattern "{source.raw_match_rule}" matched multiple entries for the packages: {pkg_names}.'
            " However, it should have matched exactly one item. Please tighten the pattern defined"
            f" in {self._definition_source}"
        )
def _match_pattern(
    self,
    path_matcher: SourcePathMatcher,
    fs_match_rule: FileSystemMatchRule,
    condition_context: ConditionContext,
    search_dirs: Sequence[SearchDir],
    into: frozenset[BinaryPackage],
) -> list[PathMatch]:
    """Find and reserve all matches for a rule, failing on unexpected empty results.

    :param path_matcher: Tracks reserved/discarded paths.
    :param fs_match_rule: The rule to resolve. A raw rule ending with "/"
      restricts the match to directories.
    :param condition_context: Context for evaluating the rule's condition.
    :param search_dirs: Directories to search.
    :param into: Packages to install into (empty for discard rules).
    :return: The matches (possibly empty when an empty match is tolerated).
    :raises NoMatchForInstallPatternError: Nothing matched and an empty match
      is not tolerated.
    """
    matched, (already_installed_paths, already_excluded_paths) = (
        path_matcher.find_and_reserve_all_matches(
            fs_match_rule.match_rule,
            search_dirs,
            fs_match_rule.raw_match_rule.endswith("/"),
            self._match_filter,
            into,
            self._definition_source,
        )
    )

    if into:
        allow_empty_match = all(not p.should_be_acted_on for p in into)
    else:
        # discard rules must match provided at least one search dir exist. If none of them
        # exist, then we assume the discard rule is for a package that will not be built
        # NOTE(review): the expression below tolerates an empty match when a search
        # dir *does* exist, which reads as the opposite of the comment - confirm.
        allow_empty_match = any(s.search_dir.is_dir for s in search_dirs)
    if self._condition is not None and not self._condition.evaluate(
        condition_context
    ):
        allow_empty_match = True

    if matched or allow_empty_match:
        return matched

    search_dir_text = ", ".join(x.search_dir.fs_path for x in search_dirs)
    if already_excluded_paths and already_installed_paths:
        total_paths = already_excluded_paths + already_installed_paths
        reason = (
            f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} after ignoring"
            f" {total_paths} path(s) already been matched previously either by install or"
            f" exclude rules. If you wanted to install some of these paths into multiple"
            f" packages, please tweak the definition that installed them to install them"
            f' into multiple packages (usually change "into: foo" to "into: [foo, bar]".'
            f" If you wanted to install these paths and exclude rules are getting in your"
            f" way, then please move this install rule before the exclusion rule that causes"
            f" issue or, in case of built-in excludes, list the paths explicitly (without"
            f" using patterns). Source for this issue is {self._definition_source}. Match rule:"
            f" {fs_match_rule.match_rule.describe_match_exact()}"
        )
    elif already_excluded_paths:
        reason = (
            f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} after ignoring"
            f" {already_excluded_paths} path(s) that have been excluded."
            " If you wanted to install some of these paths, please move the install rule"
            " before the exclusion rule or, in case of built-in excludes, list the paths explicitly"
            f" (without using patterns). Source for this issue is {self._definition_source}. Match rule:"
            f" {fs_match_rule.match_rule.describe_match_exact()}"
        )
    elif already_installed_paths:
        reason = (
            f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} after ignoring"
            f" {already_installed_paths} path(s) already been matched previously."
            " If you wanted to install some of these paths into multiple packages,"
            f" please tweak the definition that installed them to install them into"
            f' multiple packages (usually change "into: foo" to "into: [foo, bar]".'
            f" Source for this issue is {self._definition_source}. Match rule:"
            f" {fs_match_rule.match_rule.describe_match_exact()}"
        )
    else:
        # TODO: Try harder to find the match and point out possible typos
        reason = (
            f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} (definition:"
            f" {self._definition_source}). Match rule: {fs_match_rule.match_rule.describe_match_exact()}"
        )
    raise NoMatchForInstallPatternError(
        reason,
        fs_match_rule,
        search_dirs,
        self._definition_source,
    )
def _install_matches(
    self,
    path_matcher: SourcePathMatcher,
    matches: list[PathMatch],
    dest_paths: Sequence[tuple[str, bool]] | Callable[[PathMatch], str],
    install_context: "InstallRuleContext",
    into: frozenset[BinaryPackage],
    condition_context: ConditionContext,
) -> None:
    """Install the matches into the package file systems.

    :param path_matcher: Tracks reserved/discarded paths.
    :param matches: The matches to install.
    :param dest_paths: Destination templates or a callable computing the
      destination per match.
    :param install_context: Provides the per-package file system roots.
    :param into: The packages the rule installs into.
    :param condition_context: Context for evaluating the rule's condition.
    :raises ValueError: If the rule is active and `matches` is empty.
    """
    condition_failed = self._condition is not None and not self._condition.evaluate(
        condition_context
    )
    if condition_failed or not any(p.should_be_acted_on for p in into):
        # Rule is disabled; skip all its actions - also allow empty matches
        # for this particular case. Though, we do go through the match and
        # ensure all paths are reserved correctly such that the behavior
        # between `binary-arch` vs `binary-indep` vs `binary` is consistent
        for match in matches:
            self._reserve_recursively(
                path_matcher,
                match,
                into,
            )
        return

    if not matches:
        raise ValueError("matches must not be empty")

    for match, dest_paths_and_roots in _resolve_matches(
        matches,
        dest_paths,
        install_context,
    ):
        source = match.path
        created_dirs = []
        for dest, fs_root in dest_paths_and_roots:
            dir_part, basename = os.path.split(dest)
            # We do not associate these with the FS path. First off,
            # it is complicated to do in most cases (indeed, debhelper
            # does not preserve these directories either) and secondly,
            # it is "only" mtime and mode - mostly irrelevant as the
            # directory is 99.9% likely to be 0755 (we are talking
            # directories like "/usr", "/usr/share").
            dir_path = fs_root.mkdirs(dir_part)
            existing_path = dir_path.get(basename)

            if source.is_dir:
                # A non-directory in the way is replaced by the directory.
                if existing_path is not None and not existing_path.is_dir:
                    existing_path.unlink()
                    existing_path = None
                if existing_path is None:
                    existing_path = dir_path.mkdir(basename, reference_path=source)
                created_dirs.append(existing_path)
                continue

            if existing_path is not None and existing_path.is_dir:
                _error(
                    f"Cannot install {source} ({source.fs_path}) as {dest}. That path already exist"
                    f" and is a directory. This error was triggered via {self._definition_source}."
                )

            if source.is_symlink:
                dir_path.add_symlink(
                    basename, source.readlink(), reference_path=source
                )
            else:
                dir_path.insert_file_from_fs_path(
                    basename,
                    source.fs_path,
                    follow_symlinks=False,
                    use_fs_path_mode=True,
                    reference_path=source,
                )

        if created_dirs:
            self._install_dir_recursively(
                path_matcher,
                created_dirs,
                match,
                into,
            )
def _reserve_recursively(
    self,
    path_matcher: SourcePathMatcher,
    match: PathMatch,
    into: frozenset[BinaryPackage],
) -> None:
    """Reserve the matched path and, for directories, everything below it.

    Only the directly matched path keeps the match's exact-match flag; the
    paths below it are reserved as non-exact matches.
    """
    top = match.path
    path_matcher.reserve(
        top,
        into,
        self._definition_source,
        is_exact_match=match.is_exact_match,
    )
    if not top.is_dir:
        return

    pending = list(top.iterdir)
    while pending:
        entry = pending.pop()
        path_matcher.reserve(
            entry,
            into,
            self._definition_source,
            is_exact_match=False,
        )
        if entry.is_dir:
            pending.extend(entry.iterdir)
def _install_dir_recursively(
    self,
    path_matcher: SourcePathMatcher,
    parent_dirs: Sequence[FSPath],
    match: PathMatch,
    into: frozenset[BinaryPackage],
) -> None:
    """Install the content of a matched directory into every destination dir.

    Entries that are already reserved are skipped; every installed entry is
    reserved (as a non-exact match) for `into`.

    :param path_matcher: Tracks reserved/discarded paths.
    :param parent_dirs: The destination directories for the matched directory.
    :param match: The match for the directory being installed.
    :param into: The packages the rule installs into.
    """
    # Each item pairs an entry with the destination dirs it must be created in.
    pending = [
        (parent_dirs, child)
        for child in match.path.iterdir
        if not path_matcher.is_reserved(child)
    ]

    while pending:
        target_dirs, entry = pending.pop()
        path_matcher.reserve(
            entry,
            into,
            self._definition_source,
            is_exact_match=False,
        )
        if entry.is_dir:
            created = [
                d.mkdir(entry.name, reference_path=entry) for d in target_dirs
            ]
            pending.extend(
                (created, sub)
                for sub in entry.iterdir
                if not path_matcher.is_reserved(sub)
            )
            continue
        if entry.is_symlink:
            for target_dir in target_dirs:
                target_dir.add_symlink(
                    entry.name,
                    entry.readlink(),
                    reference_path=entry,
                )
            continue
        if entry.is_file:
            for target_dir in target_dirs:
                target_dir.insert_file_from_fs_path(
                    entry.name,
                    entry.fs_path,
                    use_fs_path_mode=True,
                    follow_symlinks=False,
                    reference_path=entry,
                )
            continue
        _error(
            f"Unsupported file type: {entry.fs_path} - neither a file, directory or symlink"
        )
def perform_install(
    self,
    path_matcher: SourcePathMatcher,
    install_context: InstallRuleContext,
    condition_context: ConditionContext,
) -> None:
    """Apply the rule; this base implementation always raises.

    :raises NotImplementedError: Always.
    """
    raise NotImplementedError
@classmethod
def install_as(
    cls,
    source: FileSystemMatchRule,
    dest_path: str,
    into: frozenset[BinaryPackage],
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Create a rule installing a single source under the literal path `dest_path`.

    The rule is created with `require_single_match=True`.
    """
    literal_dest = (dest_path, False)
    return GenericInstallationRule(
        [source],
        [literal_dest],
        into,
        condition,
        definition_source,
        require_single_match=True,
    )
@classmethod
def install_dest(
    cls,
    sources: Sequence[FileSystemMatchRule],
    dest_dir: str | None,
    into: frozenset[BinaryPackage],
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Create a rule installing the sources into a directory.

    Without `dest_dir`, the template `{dirname}/{basename}` is used; otherwise
    each source is placed at `<dest_dir>/{basename}`.
    """
    dest_template = (
        "{dirname}/{basename}"
        if dest_dir is None
        else os.path.join(dest_dir, "{basename}")
    )
    return GenericInstallationRule(
        sources,
        [(dest_template, True)],
        into,
        condition,
        definition_source,
    )
@classmethod
def install_multi_as(
    cls,
    source: FileSystemMatchRule,
    dest_paths: Sequence[str],
    into: frozenset[BinaryPackage],
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Create a rule installing one source under several literal paths.

    The rule is created with `require_single_match=True`.

    :raises ValueError: If fewer than two destination paths are given.
    """
    if len(dest_paths) < 2:
        raise ValueError(
            "Please use `install_as` when there is less than 2 dest path"
        )
    literal_dests = tuple((path, False) for path in dest_paths)
    return GenericInstallationRule(
        [source],
        literal_dests,
        into,
        condition,
        definition_source,
        require_single_match=True,
    )
@classmethod
def install_multi_dest(
    cls,
    sources: Sequence[FileSystemMatchRule],
    dest_dirs: Sequence[str],
    into: frozenset[BinaryPackage],
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Create a rule installing the sources into several directories.

    Each source is placed at `<dest_dir>/{basename}` for every given directory.

    :raises ValueError: If fewer than two destination directories are given.
    """
    if len(dest_dirs) < 2:
        raise ValueError(
            "Please use `install_dest` when there is less than 2 dest dir"
        )
    templates = tuple(
        (os.path.join(directory, "{basename}"), True) for directory in dest_dirs
    )
    return GenericInstallationRule(
        sources,
        templates,
        into,
        condition,
        definition_source,
    )
@classmethod
def install_doc(
    cls,
    sources: Sequence[FileSystemMatchRule],
    dest_dir: str | None,
    into: frozenset[BinaryPackage],
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Create a rule installing documentation files.

    The rule is always guarded by `_BUILD_DOCS_BDO`; a caller provided
    `condition` is combined with it via `ManifestCondition.all_of`.

    :param sources: Match rules selecting the paths to install.
    :param dest_dir: Destination to use. When `None`, the template
      `usr/share/doc/{doc_main_package_name}/{basename}` is used and
      flagged as a format string; an explicit value is passed through
      unchanged and is not flagged as a format string.
    :param into: The binary packages the rule installs into.
    :param definition_source: Where the rule was defined; passed through
      to the created rule.
    :param condition: Optional extra condition.
    :return: The resulting `GenericInstallationRule`.
    """
    effective_condition: ManifestCondition = (
        _BUILD_DOCS_BDO
        if condition is None
        else ManifestCondition.all_of([_BUILD_DOCS_BDO, condition])
    )
    if dest_dir is None:
        dest_spec = ("usr/share/doc/{doc_main_package_name}/{basename}", True)
    else:
        dest_spec = (dest_dir, False)

    return GenericInstallationRule(
        sources=sources,
        dest_paths=[dest_spec],
        into=into,
        condition=effective_condition,
        definition_source=definition_source,
    )
@classmethod
def install_doc_as(
    cls,
    source: FileSystemMatchRule,
    dest_path: str,
    into: frozenset[BinaryPackage],
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Create a rule installing one documentation path under a given name.

    The rule is always guarded by `_BUILD_DOCS_BDO`; a caller provided
    `condition` is combined with it via `ManifestCondition.all_of`.

    :param source: Match rule for the path to install; the created rule
      is flagged with `require_single_match=True`.
    :param dest_path: Destination path, used as-is (not a format string).
    :param into: The binary packages the rule installs into.
    :param definition_source: Where the rule was defined; passed through
      to the created rule.
    :param condition: Optional extra condition.
    :return: The resulting `GenericInstallationRule`.
    """
    effective_condition: ManifestCondition = (
        _BUILD_DOCS_BDO
        if condition is None
        else ManifestCondition.all_of([_BUILD_DOCS_BDO, condition])
    )

    return GenericInstallationRule(
        sources=[source],
        dest_paths=[(dest_path, False)],
        into=into,
        condition=effective_condition,
        definition_source=definition_source,
        require_single_match=True,
    )
@classmethod
def install_examples(
    cls,
    sources: Sequence[FileSystemMatchRule],
    into: frozenset[BinaryPackage],
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Create a rule installing example files.

    Matches are installed using the destination template
    `usr/share/doc/{doc_main_package_name}/examples/{basename}`. The rule
    is always guarded by `_BUILD_DOCS_BDO`; a caller provided `condition`
    is combined with it via `ManifestCondition.all_of`.

    :param sources: Match rules selecting the paths to install.
    :param into: The binary packages the rule installs into.
    :param definition_source: Where the rule was defined; passed through
      to the created rule.
    :param condition: Optional extra condition.
    :return: The resulting `GenericInstallationRule`.
    """
    effective_condition: ManifestCondition = (
        _BUILD_DOCS_BDO
        if condition is None
        else ManifestCondition.all_of([_BUILD_DOCS_BDO, condition])
    )
    examples_dest = (
        "usr/share/doc/{doc_main_package_name}/examples/{basename}",
        True,
    )
    return GenericInstallationRule(
        sources=sources,
        dest_paths=[examples_dest],
        into=into,
        condition=effective_condition,
        definition_source=definition_source,
    )
@classmethod
def install_man(
    cls,
    sources: Sequence[FileSystemMatchRule],
    into: frozenset[BinaryPackage],
    section: int | None,
    language: str | None,
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Create a rule installing manpages.

    The rule is always guarded by `_BUILD_DOCS_BDO`; a caller provided
    `condition` is combined with it via `ManifestCondition.all_of`.
    The destination of each match is computed by the callable returned
    from `_dest_path_for_manpage(section, language, definition_source)`.

    :param sources: Match rules selecting the manpages to install.
    :param into: The binary packages the rule installs into.
    :param section: Manpage section, or `None`; forwarded to
      `_dest_path_for_manpage`.
    :param language: Manpage language, or `None`; forwarded to
      `_dest_path_for_manpage`.
    :param definition_source: Where the rule was defined; passed through
      to the created rule.
    :param condition: Optional extra condition.
    :return: The resulting `GenericInstallationRule`.
    """
    cond: ManifestCondition = _BUILD_DOCS_BDO
    if condition is not None:
        cond = ManifestCondition.all_of([cond, condition])

    dest_path_computer = _dest_path_for_manpage(
        section, language, definition_source
    )

    return GenericInstallationRule(
        sources,
        dest_path_computer,
        into,
        cond,
        definition_source,
        # Only directories are filtered out: a directory is never a manpage
        # itself, while regular files (and symlinks to them) must pass.
        # The previous `not m.is_file` filter was inverted and rejected
        # every actual manpage.
        match_filter=lambda m: not m.is_dir,
    )
@classmethod
def discard_paths(
    cls,
    paths: Sequence[FileSystemMatchRule],
    definition_source: str,
    condition: ManifestCondition | None,
    *,
    limit_to: Sequence[FileSystemExactMatchRule] | None = None,
) -> "InstallRule":
    """Create a rule that discards the paths matched by `paths`.

    :param paths: Match rules selecting the paths to discard.
    :param definition_source: Where the rule was defined; passed through
      to the created rule.
    :param condition: Optional condition attached to the rule.
    :param limit_to: Optional exact-match rules; handed to `DiscardRule`
      as a tuple (an empty tuple when `None`).
    :return: The resulting `DiscardRule`.
    """
    if limit_to is None:
        limit_rules: tuple[FileSystemExactMatchRule, ...] = ()
    else:
        limit_rules = tuple(limit_to)
    return DiscardRule(
        fs_match_rules=paths,
        condition=condition,
        limit_to=limit_rules,
        definition_source=definition_source,
    )
class PPFInstallRule(InstallRule):
    """Install rule that inserts packager provided files into one package."""

    __slots__ = (
        "_ppfs",
        "_substitution",
        "_into",
    )

    # noinspection PyMissingConstructor
    def __init__(
        self,
        into: BinaryPackage,
        substitution: Substitution,
        ppfs: Sequence["PackagerProvidedFile"],
    ) -> None:
        """Set up the rule.

        :param into: The binary package receiving the files.
        :param substitution: Stored on the instance; not used by the
          methods visible in this class.
        :param ppfs: The packager provided files to install.
        """
        # The base constructor is not called directly; it is run through
        # `run_in_context_of_plugin` for the "debputy" plugin, with no
        # condition and a fixed built-in definition source.
        run_in_context_of_plugin(
            "debputy",
            super().__init__,
            None,
            "<built-in; PPF install rule>",
        )
        self._into = into
        self._ppfs = ppfs
        self._substitution = substitution

    def perform_install(
        self,
        path_matcher: SourcePathMatcher,
        install_context: InstallRuleContext,
        condition_context: ConditionContext,
    ) -> None:
        """Insert every packager provided file into the package's file system.

        :param path_matcher: Unused by this rule.
        :param install_context: Indexed by the package name to find the
          file system root to install into.
        :param condition_context: Unused by this rule.
        """
        target_root = install_context[self._into.name].fs_root
        for provided_file in self._ppfs:
            fs_source = provided_file.path.fs_path
            target_dir, target_name = provided_file.compute_dest()
            # The mode comes from the PPF definition's default, with
            # `use_fs_path_mode=False` (presumably so the on-disk mode of
            # the source is ignored - confirm in
            # `insert_file_from_fs_path`).
            target_root.mkdirs(target_dir).insert_file_from_fs_path(
                target_name,
                fs_source,
                follow_symlinks=True,
                use_fs_path_mode=False,
                mode=provided_file.definition.default_mode,
            )
class GenericInstallationRule(InstallRule):
    """Install rule mapping matched source paths onto destination paths."""

    __slots__ = (
        "_sources",
        "_into",
        "_dest_paths",
        "_require_single_match",
    )

    def __init__(
        self,
        sources: Sequence[FileSystemMatchRule],
        dest_paths: Sequence[tuple[str, bool]] | Callable[[PathMatch], str],
        into: frozenset[BinaryPackage],
        condition: ManifestCondition | None,
        definition_source: str,
        *,
        require_single_match: bool = False,
        match_filter: Callable[["VirtualPath"], bool] | None = None,
    ) -> None:
        """Set up the rule.

        :param sources: Match rules selecting the paths to install.
        :param dest_paths: Either a sequence of `(path, is_format)` pairs
          or a callable computing the destination from a match.
        :param into: The binary packages the rule installs into.
        :param condition: Optional condition; forwarded to the base class.
        :param definition_source: Where the rule was defined; forwarded to
          the base class.
        :param require_single_match: When true, `sources` must contain
          exactly one rule and `perform_install` calls
          `_check_single_match` if that rule yields more than one match.
        :param match_filter: Optional predicate; forwarded to the base class.
        :raises ValueError: If `require_single_match` is set and `sources`
          does not have exactly one element.
        """
        super().__init__(
            condition,
            definition_source,
            match_filter=match_filter,
        )
        self._sources = sources
        self._into = into
        self._dest_paths = dest_paths
        self._require_single_match = require_single_match
        if require_single_match and len(sources) != 1:
            raise ValueError("require_single_match implies sources must have len 1")

    def perform_install(
        self,
        path_matcher: SourcePathMatcher,
        install_context: InstallRuleContext,
        condition_context: ConditionContext,
    ) -> None:
        """Match each source rule and install whatever it matched.

        :param path_matcher: Matcher used to resolve the source rules.
        :param install_context: Supplies the search directories and is
          passed on to `_install_matches`.
        :param condition_context: Passed on to the matching and
          installation helpers.
        """
        target_packages = self._into
        single_match_only = self._require_single_match
        for source_rule in self._sources:
            found = self._match_pattern(
                path_matcher,
                source_rule,
                condition_context,
                install_context.search_dirs,
                target_packages,
            )
            if single_match_only and len(found) > 1:
                self._check_single_match(source_rule, found)
            self._install_matches(
                path_matcher,
                found,
                self._dest_paths,
                install_context,
                target_packages,
                condition_context,
            )
class DiscardRule(InstallRule):
    """Install rule that matches paths without installing them anywhere."""

    __slots__ = ("_fs_match_rules", "_limit_to")

    def __init__(
        self,
        fs_match_rules: Sequence[FileSystemMatchRule],
        condition: ManifestCondition | None,
        limit_to: Sequence[FileSystemExactMatchRule],
        definition_source: str,
    ) -> None:
        """Set up the rule.

        :param fs_match_rules: Match rules selecting the paths to discard.
        :param condition: Optional condition; forwarded to the base class.
        :param limit_to: Search directories the rule is restricted to; an
          empty sequence means all search directories are used.
        :param definition_source: Where the rule was defined; forwarded to
          the base class.
        """
        super().__init__(condition, definition_source)
        self._limit_to = limit_to
        self._fs_match_rules = fs_match_rules

    def perform_install(
        self,
        path_matcher: SourcePathMatcher,
        install_context: InstallRuleContext,
        condition_context: ConditionContext,
    ) -> None:
        """Run the discard patterns against the relevant search directories.

        :param path_matcher: Matcher used to resolve the discard patterns.
        :param install_context: Supplies the known search directories.
        :param condition_context: Passed on to the matching helper.
        """
        restricted_to = self._limit_to
        if not restricted_to:
            search_dirs = install_context.search_dirs
        else:
            wanted = {rule.match_rule.path for rule in restricted_to}
            search_dirs = tuple(
                filter(
                    lambda candidate: candidate.search_dir.fs_path in wanted,
                    install_context.search_dirs,
                )
            )
            # A length mismatch means at least one requested directory is
            # not among the known search directories; report those.
            if len(search_dirs) != len(restricted_to):
                unknown = wanted.difference(
                    known.search_dir.fs_path for known in search_dirs
                )
                paths = ":".join(unknown)
                message = (
                    f"The discard rule defined at {self._definition_source} mentions the following"
                    f" search directories that were not known to debputy: {paths}."
                    " Either the search dir is missing somewhere else or it should be removed from"
                    " the discard rule."
                )
                _error(message)

        # Discarding targets no package, hence the empty package set. The
        # matches are not used here (presumably `_match_pattern` records
        # the paths as handled - confirm in the base class).
        no_packages = frozenset()
        for discard_pattern in self._fs_match_rules:
            self._match_pattern(
                path_matcher,
                discard_pattern,
                condition_context,
                search_dirs,
                no_packages,
            )