Coverage for src/debputy/installations.py: 68%
515 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
1import collections
2import dataclasses
3import os.path
4import re
5from enum import IntEnum
6from typing import (
7 List,
8 Dict,
9 FrozenSet,
10 Callable,
11 Union,
12 Iterator,
13 Tuple,
14 Set,
15 Sequence,
16 Optional,
17 Iterable,
18 TYPE_CHECKING,
19 cast,
20 Any,
21 Mapping,
22)
24from debputy.exceptions import DebputyRuntimeError
25from debputy.filesystem_scan import FSPath
26from debputy.manifest_conditions import (
27 ConditionContext,
28 ManifestCondition,
29 _BUILD_DOCS_BDO,
30)
31from debputy.manifest_parser.base_types import (
32 FileSystemMatchRule,
33 FileSystemExactMatchRule,
34)
35from debputy.manifest_parser.tagging_types import DebputyDispatchableType
36from debputy.packages import BinaryPackage
37from debputy.path_matcher import MatchRule, ExactFileSystemPath, MATCH_ANYTHING
38from debputy.plugin.plugin_state import run_in_context_of_plugin
39from debputy.substitution import Substitution
40from debputy.util import _error, _warn
42if TYPE_CHECKING:
43 from debputy.packager_provided_files import PackagerProvidedFile
44 from debputy.plugin.api import VirtualPath
45 from debputy.plugin.api.impl_types import PluginProvidedDiscardRule
48_MAN_TH_LINE = re.compile(r'^[.]TH\s+\S+\s+"?(\d+[^"\s]*)"?')
49_MAN_DT_LINE = re.compile(r"^[.]Dt\s+\S+\s+(\d+\S*)")
50_MAN_SECTION_BASENAME = re.compile(r"[.]([1-9]\w*)(?:[.]gz)?$")
51_MAN_REAL_SECTION = re.compile(r"^(\d+)")
52_MAN_INST_BASENAME = re.compile(r"[.][^.]+$")
53MAN_GUESS_LANG_FROM_PATH = re.compile(
54 r"(?:^|/)man/(?:([a-z][a-z](?:_[A-Z][A-Z])?)(?:\.[^/]+)?)?/man[1-9]/"
55)
56MAN_GUESS_FROM_BASENAME = re.compile(r"[.]([a-z][a-z](?:_[A-Z][A-Z])?)[.](?:[1-9]|man)")
59class InstallRuleError(DebputyRuntimeError):
60 pass
63class PathAlreadyInstalledOrDiscardedError(InstallRuleError):
64 @property
65 def path(self) -> str:
66 return cast("str", self.args[0])
68 @property
69 def into(self) -> FrozenSet[BinaryPackage]:
70 return cast("FrozenSet[BinaryPackage]", self.args[1])
72 @property
73 def definition_source(self) -> str:
74 return cast("str", self.args[2])
77class ExactPathMatchTwiceError(InstallRuleError):
78 @property
79 def path(self) -> str:
80 return cast("str", self.args[1])
82 @property
83 def into(self) -> BinaryPackage:
84 return cast("BinaryPackage", self.args[2])
86 @property
87 def definition_source(self) -> str:
88 return cast("str", self.args[3])
91class NoMatchForInstallPatternError(InstallRuleError):
92 @property
93 def pattern(self) -> str:
94 return cast("str", self.args[1])
96 @property
97 def search_dirs(self) -> Sequence["SearchDir"]:
98 return cast("Sequence[SearchDir]", self.args[2])
100 @property
101 def definition_source(self) -> str:
102 return cast("str", self.args[3])
105@dataclasses.dataclass(slots=True, frozen=True)
106class SearchDir:
107 search_dir: "VirtualPath"
108 applies_to: FrozenSet[BinaryPackage]
111@dataclasses.dataclass(slots=True, frozen=True)
112class BinaryPackageInstallRuleContext:
113 binary_package: BinaryPackage
114 fs_root: FSPath
115 doc_main_package: BinaryPackage
117 def replace(self, **changes: Any) -> "BinaryPackageInstallRuleContext":
118 return dataclasses.replace(self, **changes)
121@dataclasses.dataclass(slots=True, frozen=True)
122class InstallSearchDirContext:
123 search_dirs: Sequence[SearchDir]
124 check_for_uninstalled_dirs: Sequence["VirtualPath"]
125 # TODO: Support search dirs per-package
126 debian_pkg_dirs: Mapping[str, "VirtualPath"] = dataclasses.field(
127 default_factory=dict
128 )
131@dataclasses.dataclass(slots=True)
132class InstallRuleContext:
133 # TODO: Search dirs should be per-package
134 search_dirs: Sequence[SearchDir]
135 binary_package_contexts: Dict[str, BinaryPackageInstallRuleContext] = (
136 dataclasses.field(default_factory=dict)
137 )
139 def __getitem__(self, item: str) -> BinaryPackageInstallRuleContext:
140 return self.binary_package_contexts[item]
142 def __setitem__(self, key: str, value: BinaryPackageInstallRuleContext) -> None:
143 self.binary_package_contexts[key] = value
145 def replace(self, **changes: Any) -> "InstallRuleContext":
146 return dataclasses.replace(self, **changes)
149@dataclasses.dataclass(slots=True, frozen=True)
150class PathMatch:
151 path: "VirtualPath"
152 search_dir: "VirtualPath"
153 is_exact_match: bool
154 into: FrozenSet[BinaryPackage]
157class DiscardState(IntEnum):
158 UNCHECKED = 0
159 NOT_DISCARDED = 1
160 DISCARDED_BY_PLUGIN_PROVIDED_RULE = 2
161 DISCARDED_BY_MANIFEST_RULE = 3
164def _determine_manpage_section(
165 match_rule: PathMatch,
166 provided_section: Optional[int],
167 definition_source: str,
168) -> Optional[str]:
169 section = str(provided_section) if provided_section is not None else None
170 if section is None:
171 detected_section = None
172 with open(match_rule.path.fs_path, "r") as fd:
173 for line in fd:
174 if not line.startswith((".TH", ".Dt")):
175 continue
177 m = _MAN_DT_LINE.match(line)
178 if not m:
179 m = _MAN_TH_LINE.match(line)
180 if not m:
181 continue
182 detected_section = m.group(1)
183 if "." in detected_section:
184 _warn(
185 f"Ignoring detected section {detected_section} in {match_rule.path.fs_path}"
186 f" (detected via {definition_source}): It looks too much like a version"
187 )
188 detected_section = None
189 break
190 if detected_section is None:
191 m = _MAN_SECTION_BASENAME.search(os.path.basename(match_rule.path.path))
192 if m:
193 detected_section = m.group(1)
194 section = detected_section
196 return section
199def _determine_manpage_real_section(
200 match_rule: PathMatch,
201 section: Optional[str],
202 definition_source: str,
203) -> int:
204 real_section = None
205 if section is not None:
206 m = _MAN_REAL_SECTION.match(section)
207 if m:
208 real_section = int(m.group(1))
209 if real_section is None or real_section < 0 or real_section > 9:
210 if real_section is not None:
211 _warn(
212 f"Computed section for {match_rule.path.fs_path} was {real_section} (section: {section}),"
213 f" which is not a valid section (must be between 1 and 9 incl.)"
214 )
215 _error(
216 f"Could not determine the section for {match_rule.path.fs_path} automatically. The man page"
217 f" was detected via {definition_source}. Consider using `section: <number>` to"
218 " explicitly declare the section. Keep in mind that it applies to all man pages for that"
219 " rule and you may have to split the rule into two for this reason."
220 )
221 return real_section
224def _determine_manpage_language(
225 match_rule: PathMatch,
226 provided_language: Optional[str],
227) -> Optional[str]:
228 if provided_language is not None:
229 if provided_language not in ("derive-from-basename", "derive-from-path"):
230 return provided_language if provided_language != "C" else None
231 if provided_language == "derive-from-basename":
232 m = MAN_GUESS_FROM_BASENAME.search(match_rule.path.name)
233 if m is None:
234 return None
235 return m.group(1)
236 # Fall-through for derive-from-path case
237 m = MAN_GUESS_LANG_FROM_PATH.search(match_rule.path.path)
238 if m is None:
239 return None
240 return m.group(1)
243def _dest_path_for_manpage(
244 provided_section: Optional[int],
245 provided_language: Optional[str],
246 definition_source: str,
247) -> Callable[["PathMatch"], str]:
248 def _manpage_dest_path(match_rule: PathMatch) -> str:
249 inst_basename = _MAN_INST_BASENAME.sub("", match_rule.path.name)
250 section = _determine_manpage_section(
251 match_rule, provided_section, definition_source
252 )
253 real_section = _determine_manpage_real_section(
254 match_rule, section, definition_source
255 )
256 assert section is not None
257 language = _determine_manpage_language(match_rule, provided_language)
258 if language is None:
259 maybe_language = ""
260 else:
261 maybe_language = f"{language}/"
262 lang_suffix = f".{language}"
263 if inst_basename.endswith(lang_suffix):
264 inst_basename = inst_basename[: -len(lang_suffix)]
266 return (
267 f"usr/share/man/{maybe_language}man{real_section}/{inst_basename}.{section}"
268 )
270 return _manpage_dest_path
273class SourcePathMatcher:
274 def __init__(self, auto_discard_rules: List["PluginProvidedDiscardRule"]) -> None:
275 self._already_matched: Dict[
276 str,
277 Tuple[FrozenSet[BinaryPackage], str],
278 ] = {}
279 self._exact_match_request: Set[Tuple[str, str]] = set()
280 self._discarded: Dict[str, DiscardState] = {}
281 self._auto_discard_rules = auto_discard_rules
282 self.used_auto_discard_rules: Dict[str, Set[str]] = collections.defaultdict(set)
284 def is_reserved(self, path: "VirtualPath") -> bool:
285 fs_path = path.fs_path
286 if fs_path in self._already_matched:
287 return True
288 result = self._discarded.get(fs_path, DiscardState.UNCHECKED)
289 if result == DiscardState.UNCHECKED:
290 result = self._check_plugin_provided_exclude_state_for(path)
291 if result == DiscardState.NOT_DISCARDED:
292 return False
294 return True
296 def exclude(self, path: str) -> None:
297 self._discarded[path] = DiscardState.DISCARDED_BY_MANIFEST_RULE
299 def _run_plugin_provided_discard_rules_on(self, path: "VirtualPath") -> bool:
300 for dr in self._auto_discard_rules:
301 verdict = dr.should_discard(path)
302 if verdict:
303 self.used_auto_discard_rules[dr.name].add(path.fs_path)
304 return True
305 return False
307 def _check_plugin_provided_exclude_state_for(
308 self,
309 path: "VirtualPath",
310 ) -> DiscardState:
311 cache_misses = []
312 current_path = path
313 while True:
314 fs_path = current_path.fs_path
315 exclude_state = self._discarded.get(fs_path, DiscardState.UNCHECKED)
316 if exclude_state != DiscardState.UNCHECKED:
317 verdict = exclude_state
318 break
319 cache_misses.append(fs_path)
320 if self._run_plugin_provided_discard_rules_on(current_path):
321 verdict = DiscardState.DISCARDED_BY_PLUGIN_PROVIDED_RULE
322 break
323 # We cannot trust a "NOT_DISCARDED" until we check its parent (the directory could
324 # be excluded without the files in it triggering the rule).
325 parent_dir = current_path.parent_dir
326 if not parent_dir:
327 verdict = DiscardState.NOT_DISCARDED
328 break
329 current_path = parent_dir
330 if cache_misses: 330 ↛ 333line 330 didn't jump to line 333 because the condition on line 330 was always true
331 for p in cache_misses:
332 self._discarded[p] = verdict
333 return verdict
335 def may_match(
336 self,
337 match: PathMatch,
338 *,
339 is_exact_match: bool = False,
340 ) -> Tuple[FrozenSet[BinaryPackage], bool]:
341 m = self._already_matched.get(match.path.fs_path)
342 if m: 342 ↛ 343line 342 didn't jump to line 343 because the condition on line 342 was never true
343 return m[0], False
344 current_path = match.path.fs_path
345 discard_state = self._discarded.get(current_path, DiscardState.UNCHECKED)
347 if discard_state == DiscardState.UNCHECKED:
348 discard_state = self._check_plugin_provided_exclude_state_for(match.path)
350 assert discard_state is not None and discard_state != DiscardState.UNCHECKED
352 is_discarded = discard_state != DiscardState.NOT_DISCARDED
353 if (
354 is_exact_match
355 and discard_state == DiscardState.DISCARDED_BY_PLUGIN_PROVIDED_RULE
356 ):
357 is_discarded = False
358 return frozenset(), is_discarded
360 def reserve(
361 self,
362 path: "VirtualPath",
363 reserved_by: FrozenSet[BinaryPackage],
364 definition_source: str,
365 *,
366 is_exact_match: bool = False,
367 ) -> None:
368 fs_path = path.fs_path
369 self._already_matched[fs_path] = reserved_by, definition_source
370 if not is_exact_match: 370 ↛ 372line 370 didn't jump to line 372 because the condition on line 370 was always true
371 return
372 for pkg in reserved_by:
373 m_key = (pkg.name, fs_path)
374 self._exact_match_request.add(m_key)
375 try:
376 del self._discarded[fs_path]
377 except KeyError:
378 pass
379 for discarded_paths in self.used_auto_discard_rules.values():
380 discarded_paths.discard(fs_path)
382 def detect_missing(self, search_dir: "VirtualPath") -> Iterator["VirtualPath"]:
383 stack = list(search_dir.iterdir)
384 while stack:
385 m = stack.pop()
386 if m.is_dir:
387 s_len = len(stack)
388 stack.extend(m.iterdir)
390 if s_len == len(stack) and not self.is_reserved(m): 390 ↛ 392line 390 didn't jump to line 392 because the condition on line 390 was never true
391 # "Explicitly" empty dir
392 yield m
393 elif not self.is_reserved(m):
394 yield m
396 def find_and_reserve_all_matches(
397 self,
398 match_rule: MatchRule,
399 search_dirs: Sequence[SearchDir],
400 dir_only_match: bool,
401 match_filter: Optional[Callable[["VirtualPath"], bool]],
402 reserved_by: FrozenSet[BinaryPackage],
403 definition_source: str,
404 ) -> Tuple[List[PathMatch], Tuple[int, ...]]:
405 matched = []
406 already_installed_paths = 0
407 already_excluded_paths = 0
408 glob_expand = False if isinstance(match_rule, ExactFileSystemPath) else True
410 for match in _resolve_path(
411 match_rule,
412 search_dirs,
413 dir_only_match,
414 match_filter,
415 reserved_by,
416 ):
417 installed_into, excluded = self.may_match(
418 match, is_exact_match=not glob_expand
419 )
420 if installed_into: 420 ↛ 421line 420 didn't jump to line 421 because the condition on line 420 was never true
421 if glob_expand:
422 already_installed_paths += 1
423 continue
424 packages = ", ".join(p.name for p in installed_into)
425 raise PathAlreadyInstalledOrDiscardedError(
426 f'The "{match.path.fs_path}" has been reserved by and installed into {packages}.'
427 f" The definition that triggered this issue is {definition_source}.",
428 match,
429 installed_into,
430 definition_source,
431 )
432 if excluded:
433 if glob_expand: 433 ↛ 436line 433 didn't jump to line 436 because the condition on line 433 was always true
434 already_excluded_paths += 1
435 continue
436 raise PathAlreadyInstalledOrDiscardedError(
437 f'The "{match.path.fs_path}" has been excluded. If you want this path installed, move it'
438 f" above the exclusion rule that excluded it. The definition that triggered this"
439 f" issue is {definition_source}.",
440 match,
441 installed_into,
442 definition_source,
443 )
444 if not glob_expand:
445 for pkg in match.into:
446 m_key = (pkg.name, match.path.fs_path)
447 if m_key in self._exact_match_request: 447 ↛ 448line 447 didn't jump to line 448 because the condition on line 447 was never true
448 raise ExactPathMatchTwiceError(
449 f'The path "{match.path.fs_path}" (via exact match) has already been installed'
450 f" into {pkg.name}. The second installation triggered by {definition_source}",
451 match.path,
452 pkg,
453 definition_source,
454 )
455 self._exact_match_request.add(m_key)
457 if reserved_by: 457 ↛ 463line 457 didn't jump to line 463 because the condition on line 457 was always true
458 self._already_matched[match.path.fs_path] = (
459 match.into,
460 definition_source,
461 )
462 else:
463 self.exclude(match.path.fs_path)
464 matched.append(match)
465 exclude_counts = already_installed_paths, already_excluded_paths
466 return matched, exclude_counts
469def _resolve_path(
470 match_rule: MatchRule,
471 search_dirs: Iterable["SearchDir"],
472 dir_only_match: bool,
473 match_filter: Optional[Callable[["VirtualPath"], bool]],
474 into: FrozenSet[BinaryPackage],
475) -> Iterator[PathMatch]:
476 missing_matches = set(into)
477 for sdir in search_dirs:
478 matched = False
479 if into and missing_matches.isdisjoint(sdir.applies_to): 479 ↛ 481line 479 didn't jump to line 481 because the condition on line 479 was never true
480 # All the packages, where this search dir applies, already got a match
481 continue
482 applicable = sdir.applies_to & missing_matches
483 for matched_path in match_rule.finditer(
484 sdir.search_dir,
485 ignore_paths=match_filter,
486 ):
487 if dir_only_match and not matched_path.is_dir: 487 ↛ 488line 487 didn't jump to line 488 because the condition on line 487 was never true
488 continue
489 if matched_path.parent_dir is None:
490 if match_rule is MATCH_ANYTHING: 490 ↛ 492line 490 didn't jump to line 492 because the condition on line 490 was always true
491 continue
492 _error(
493 f"The pattern {match_rule.describe_match_short()} matched the root dir."
494 )
495 yield PathMatch(matched_path, sdir.search_dir, False, applicable)
496 matched = True
497 # continue; we want to match everything we can from this search directory.
499 if matched:
500 missing_matches -= applicable
501 if into and not missing_matches:
502 # For install rules, we can stop as soon as all packages had a match
503 # For discard rules, all search directories must be visited. Otherwise,
504 # you would have to repeat the discard rule once per search dir to be
505 # sure something is fully discarded
506 break
509def _resolve_dest_paths(
510 match: PathMatch,
511 dest_paths: Sequence[Tuple[str, bool]],
512 install_context: "InstallRuleContext",
513) -> Sequence[Tuple[str, "FSPath"]]:
514 dest_and_roots = []
515 for dest_path, dest_path_is_format in dest_paths:
516 if dest_path_is_format:
517 for pkg in match.into:
518 parent_dir = match.path.parent_dir
519 pkg_install_context = install_context[pkg.name]
520 fs_root = pkg_install_context.fs_root
521 dpath = dest_path.format(
522 basename=match.path.name,
523 dirname=parent_dir.path if parent_dir is not None else "",
524 package_name=pkg.name,
525 doc_main_package_name=pkg_install_context.doc_main_package.name,
526 )
527 if dpath.endswith("/"): 527 ↛ 528line 527 didn't jump to line 528 because the condition on line 527 was never true
528 raise ValueError(
529 f'Provided destination (when resolved for {pkg.name}) for "{match.path.path}" ended'
530 f' with "/" ("{dest_path}"), which it must not!'
531 )
532 dest_and_roots.append((dpath, fs_root))
533 else:
534 if dest_path.endswith("/"): 534 ↛ 535line 534 didn't jump to line 535 because the condition on line 534 was never true
535 raise ValueError(
536 f'Provided destination for "{match.path.path}" ended with "/" ("{dest_path}"),'
537 " which it must not!"
538 )
539 dest_and_roots.extend(
540 (dest_path, install_context[pkg.name].fs_root) for pkg in match.into
541 )
542 return dest_and_roots
545def _resolve_matches(
546 matches: List[PathMatch],
547 dest_paths: Union[Sequence[Tuple[str, bool]], Callable[[PathMatch], str]],
548 install_context: "InstallRuleContext",
549) -> Iterator[Tuple[PathMatch, Sequence[Tuple[str, "FSPath"]]]]:
550 dest_and_roots: Sequence[Tuple[str, "FSPath"]]
551 if callable(dest_paths): 551 ↛ 552line 551 didn't jump to line 552 because the condition on line 551 was never true
552 compute_dest_path = dest_paths
553 for match in matches:
554 dpath = compute_dest_path(match)
555 if dpath.endswith("/"):
556 raise ValueError(
557 f'Provided destination for "{match.path.path}" ended with "/" ("{dpath}"), which it must not!'
558 )
559 dest_and_roots = [
560 (dpath, install_context[pkg.name].fs_root) for pkg in match.into
561 ]
562 yield match, dest_and_roots
563 else:
564 for match in matches:
565 dest_and_roots = _resolve_dest_paths(
566 match,
567 dest_paths,
568 install_context,
569 )
570 yield match, dest_and_roots
573class InstallRule(DebputyDispatchableType):
574 __slots__ = (
575 "_already_matched",
576 "_exact_match_request",
577 "_condition",
578 "_match_filter",
579 "_definition_source",
580 )
582 def __init__(
583 self,
584 condition: Optional[ManifestCondition],
585 definition_source: str,
586 *,
587 match_filter: Optional[Callable[["VirtualPath"], bool]] = None,
588 ) -> None:
589 super().__init__()
590 self._condition = condition
591 self._definition_source = definition_source
592 self._match_filter = match_filter
594 def _check_single_match(
595 self, source: FileSystemMatchRule, matches: List[PathMatch]
596 ) -> None:
597 seen_pkgs = set()
598 problem_pkgs = frozenset()
599 for m in matches:
600 problem_pkgs = seen_pkgs & m.into
601 if problem_pkgs: 601 ↛ 602line 601 didn't jump to line 602 because the condition on line 601 was never true
602 break
603 seen_pkgs.update(problem_pkgs)
604 if problem_pkgs: 604 ↛ 605line 604 didn't jump to line 605 because the condition on line 604 was never true
605 pkg_names = ", ".join(sorted(p.name for p in problem_pkgs))
606 _error(
607 f'The pattern "{source.raw_match_rule}" matched multiple entries for the packages: {pkg_names}.'
608 "However, it should matched exactly one item. Please tighten the pattern defined"
609 f" in {self._definition_source}"
610 )
612 def _match_pattern(
613 self,
614 path_matcher: SourcePathMatcher,
615 fs_match_rule: FileSystemMatchRule,
616 condition_context: ConditionContext,
617 search_dirs: Sequence[SearchDir],
618 into: FrozenSet[BinaryPackage],
619 ) -> List[PathMatch]:
620 (matched, exclude_counts) = path_matcher.find_and_reserve_all_matches(
621 fs_match_rule.match_rule,
622 search_dirs,
623 fs_match_rule.raw_match_rule.endswith("/"),
624 self._match_filter,
625 into,
626 self._definition_source,
627 )
629 already_installed_paths, already_excluded_paths = exclude_counts
631 if into: 631 ↛ 636line 631 didn't jump to line 636 because the condition on line 631 was always true
632 allow_empty_match = all(not p.should_be_acted_on for p in into)
633 else:
634 # discard rules must match provided at least one search dir exist. If none of them
635 # exist, then we assume the discard rule is for a package that will not be built
636 allow_empty_match = any(s.search_dir.is_dir for s in search_dirs)
637 if self._condition is not None and not self._condition.evaluate( 637 ↛ 640line 637 didn't jump to line 640 because the condition on line 637 was never true
638 condition_context
639 ):
640 allow_empty_match = True
642 if not matched and not allow_empty_match:
643 search_dir_text = ", ".join(x.search_dir.fs_path for x in search_dirs)
644 if already_excluded_paths and already_installed_paths: 644 ↛ 645line 644 didn't jump to line 645 because the condition on line 644 was never true
645 total_paths = already_excluded_paths + already_installed_paths
646 msg = (
647 f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} after ignoring"
648 f" {total_paths} path(s) already been matched previously either by install or"
649 f" exclude rules. If you wanted to install some of these paths into multiple"
650 f" packages, please tweak the definition that installed them to install them"
651 f' into multiple packages (usually change "into: foo" to "into: [foo, bar]".'
652 f" If you wanted to install these paths and exclude rules are getting in your"
653 f" way, then please move this install rule before the exclusion rule that causes"
654 f" issue or, in case of built-in excludes, list the paths explicitly (without"
655 f" using patterns). Source for this issue is {self._definition_source}. Match rule:"
656 f" {fs_match_rule.match_rule.describe_match_exact()}"
657 )
658 elif already_excluded_paths: 658 ↛ 659line 658 didn't jump to line 659
659 msg = (
660 f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} after ignoring"
661 f" {already_excluded_paths} path(s) that have been excluded."
662 " If you wanted to install some of these paths, please move the install rule"
663 " before the exclusion rule or, in case of built-in excludes, list the paths explicitly"
664 f" (without using patterns). Source for this issue is {self._definition_source}. Match rule:"
665 f" {fs_match_rule.match_rule.describe_match_exact()}"
666 )
667 elif already_installed_paths: 667 ↛ 668line 667 didn't jump to line 668
668 msg = (
669 f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} after ignoring"
670 f" {already_installed_paths} path(s) already been matched previously."
671 " If you wanted to install some of these paths into multiple packages,"
672 f" please tweak the definition that installed them to install them into"
673 f' multiple packages (usually change "into: foo" to "into: [foo, bar]".'
674 f" Source for this issue is {self._definition_source}. Match rule:"
675 f" {fs_match_rule.match_rule.describe_match_exact()}"
676 )
677 else:
678 # TODO: Try harder to find the match and point out possible typos
679 msg = (
680 f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} (definition:"
681 f" {self._definition_source}). Match rule: {fs_match_rule.match_rule.describe_match_exact()}"
682 )
683 raise NoMatchForInstallPatternError(
684 msg,
685 fs_match_rule,
686 search_dirs,
687 self._definition_source,
688 )
689 return matched
691 def _install_matches(
692 self,
693 path_matcher: SourcePathMatcher,
694 matches: List[PathMatch],
695 dest_paths: Union[Sequence[Tuple[str, bool]], Callable[[PathMatch], str]],
696 install_context: "InstallRuleContext",
697 into: FrozenSet[BinaryPackage],
698 condition_context: ConditionContext,
699 ) -> None:
700 if (
701 self._condition is not None
702 and not self._condition.evaluate(condition_context)
703 ) or not any(p.should_be_acted_on for p in into):
704 # Rule is disabled; skip all its actions - also allow empty matches
705 # for this particular case. Though, we do go through the match and
706 # ensure all paths are reserved correctly such that the behavior
707 # between `binary-arch` vs `binary-indep` vs `binary` is consistent
708 for match in matches:
709 self._reserve_recursively(
710 path_matcher,
711 match,
712 into,
713 )
715 return
717 if not matches: 717 ↛ 718line 717 didn't jump to line 718 because the condition on line 717 was never true
718 raise ValueError("matches must not be empty")
720 for match, dest_paths_and_roots in _resolve_matches(
721 matches,
722 dest_paths,
723 install_context,
724 ):
725 install_recursively_into_dirs = []
726 for dest, fs_root in dest_paths_and_roots:
727 dir_part, basename = os.path.split(dest)
728 # We do not associate these with the FS path. First off,
729 # it is complicated to do in most cases (indeed, debhelper
730 # does not preserve these directories either) and secondly,
731 # it is "only" mtime and mode - mostly irrelevant as the
732 # directory is 99.9% likely to be 0755 (we are talking
733 # directories like "/usr", "/usr/share").
734 dir_path = fs_root.mkdirs(dir_part)
735 existing_path = dir_path.get(basename)
737 if match.path.is_dir:
738 if existing_path is not None and not existing_path.is_dir: 738 ↛ 739line 738 didn't jump to line 739 because the condition on line 738 was never true
739 existing_path.unlink()
740 existing_path = None
741 current_dir = existing_path
743 if current_dir is None: 743 ↛ 747line 743 didn't jump to line 747 because the condition on line 743 was always true
744 current_dir = dir_path.mkdir(
745 basename, reference_path=match.path
746 )
747 install_recursively_into_dirs.append(current_dir)
748 else:
749 if existing_path is not None and existing_path.is_dir: 749 ↛ 750line 749 didn't jump to line 750 because the condition on line 749 was never true
750 _error(
751 f"Cannot install {match.path} ({match.path.fs_path}) as {dest}. That path already exist"
752 f" and is a directory. This error was triggered via {self._definition_source}."
753 )
755 if match.path.is_symlink:
756 dir_path.add_symlink(
757 basename, match.path.readlink(), reference_path=match.path
758 )
759 else:
760 dir_path.insert_file_from_fs_path(
761 basename,
762 match.path.fs_path,
763 follow_symlinks=False,
764 use_fs_path_mode=True,
765 reference_path=match.path,
766 )
767 if install_recursively_into_dirs:
768 self._install_dir_recursively(
769 path_matcher,
770 install_recursively_into_dirs,
771 match,
772 into,
773 )
775 def _reserve_recursively(
776 self,
777 path_matcher: SourcePathMatcher,
778 match: PathMatch,
779 into: FrozenSet[BinaryPackage],
780 ) -> None:
781 direct_matched_path = match.path
782 path_matcher.reserve(
783 direct_matched_path,
784 into,
785 self._definition_source,
786 is_exact_match=match.is_exact_match,
787 )
788 if not direct_matched_path.is_dir: 788 ↛ 789line 788 didn't jump to line 789 because the condition on line 788 was never true
789 return
790 stack = list(direct_matched_path.iterdir)
791 while stack:
792 current_path = stack.pop()
793 path_matcher.reserve(
794 current_path,
795 into,
796 self._definition_source,
797 is_exact_match=False,
798 )
799 if current_path.is_dir:
800 stack.extend(current_path.iterdir)
802 def _install_dir_recursively(
803 self,
804 path_matcher: SourcePathMatcher,
805 parent_dirs: Sequence[FSPath],
806 match: PathMatch,
807 into: FrozenSet[BinaryPackage],
808 ) -> None:
809 stack = [
810 (parent_dirs, e)
811 for e in match.path.iterdir
812 if not path_matcher.is_reserved(e)
813 ]
815 while stack:
816 current_dirs, dir_entry = stack.pop()
817 path_matcher.reserve(
818 dir_entry,
819 into,
820 self._definition_source,
821 is_exact_match=False,
822 )
823 if dir_entry.is_dir: 823 ↛ 824line 823 didn't jump to line 824 because the condition on line 823 was never true
824 new_dirs = [
825 d.mkdir(dir_entry.name, reference_path=dir_entry)
826 for d in current_dirs
827 ]
828 stack.extend(
829 (new_dirs, de)
830 for de in dir_entry.iterdir
831 if not path_matcher.is_reserved(de)
832 )
833 elif dir_entry.is_symlink:
834 for current_dir in current_dirs:
835 current_dir.add_symlink(
836 dir_entry.name,
837 dir_entry.readlink(),
838 reference_path=dir_entry,
839 )
840 elif dir_entry.is_file: 840 ↛ 850line 840 didn't jump to line 850 because the condition on line 840 was always true
841 for current_dir in current_dirs:
842 current_dir.insert_file_from_fs_path(
843 dir_entry.name,
844 dir_entry.fs_path,
845 use_fs_path_mode=True,
846 follow_symlinks=False,
847 reference_path=dir_entry,
848 )
849 else:
850 _error(
851 f"Unsupported file type: {dir_entry.fs_path} - neither a file, directory or symlink"
852 )
854 def perform_install(
855 self,
856 path_matcher: SourcePathMatcher,
857 install_context: InstallRuleContext,
858 condition_context: ConditionContext,
859 ) -> None:
860 raise NotImplementedError
862 @classmethod
863 def install_as(
864 cls,
865 source: FileSystemMatchRule,
866 dest_path: str,
867 into: FrozenSet[BinaryPackage],
868 definition_source: str,
869 condition: Optional[ManifestCondition],
870 ) -> "InstallRule":
871 return GenericInstallationRule(
872 [source],
873 [(dest_path, False)],
874 into,
875 condition,
876 definition_source,
877 require_single_match=True,
878 )
880 @classmethod
881 def install_dest(
882 cls,
883 sources: Sequence[FileSystemMatchRule],
884 dest_dir: Optional[str],
885 into: FrozenSet[BinaryPackage],
886 definition_source: str,
887 condition: Optional[ManifestCondition],
888 ) -> "InstallRule":
889 if dest_dir is None:
890 dest_dir = "{dirname}/{basename}"
891 else:
892 dest_dir = os.path.join(dest_dir, "{basename}")
893 return GenericInstallationRule(
894 sources,
895 [(dest_dir, True)],
896 into,
897 condition,
898 definition_source,
899 )
901 @classmethod
902 def install_multi_as(
903 cls,
904 source: FileSystemMatchRule,
905 dest_paths: Sequence[str],
906 into: FrozenSet[BinaryPackage],
907 definition_source: str,
908 condition: Optional[ManifestCondition],
909 ) -> "InstallRule":
910 if len(dest_paths) < 2: 910 ↛ 911line 910 didn't jump to line 911 because the condition on line 910 was never true
911 raise ValueError(
912 "Please use `install_as` when there is less than 2 dest path"
913 )
914 dps = tuple((dp, False) for dp in dest_paths)
915 return GenericInstallationRule(
916 [source],
917 dps,
918 into,
919 condition,
920 definition_source,
921 require_single_match=True,
922 )
924 @classmethod
925 def install_multi_dest(
926 cls,
927 sources: Sequence[FileSystemMatchRule],
928 dest_dirs: Sequence[str],
929 into: FrozenSet[BinaryPackage],
930 definition_source: str,
931 condition: Optional[ManifestCondition],
932 ) -> "InstallRule":
933 if len(dest_dirs) < 2: 933 ↛ 934line 933 didn't jump to line 934 because the condition on line 933 was never true
934 raise ValueError(
935 "Please use `install_dest` when there is less than 2 dest dir"
936 )
937 dest_paths = tuple((os.path.join(dp, "{basename}"), True) for dp in dest_dirs)
938 return GenericInstallationRule(
939 sources,
940 dest_paths,
941 into,
942 condition,
943 definition_source,
944 )
946 @classmethod
947 def install_doc(
948 cls,
949 sources: Sequence[FileSystemMatchRule],
950 dest_dir: Optional[str],
951 into: FrozenSet[BinaryPackage],
952 definition_source: str,
953 condition: Optional[ManifestCondition],
954 ) -> "InstallRule":
955 cond: ManifestCondition = _BUILD_DOCS_BDO
956 if condition is not None:
957 cond = ManifestCondition.all_of([cond, condition])
958 dest_path_is_format = False
959 if dest_dir is None:
960 dest_dir = "usr/share/doc/{doc_main_package_name}/{basename}"
961 dest_path_is_format = True
963 return GenericInstallationRule(
964 sources,
965 [(dest_dir, dest_path_is_format)],
966 into,
967 cond,
968 definition_source,
969 )
971 @classmethod
972 def install_doc_as(
973 cls,
974 source: FileSystemMatchRule,
975 dest_path: str,
976 into: FrozenSet[BinaryPackage],
977 definition_source: str,
978 condition: Optional[ManifestCondition],
979 ) -> "InstallRule":
980 cond: ManifestCondition = _BUILD_DOCS_BDO
981 if condition is not None:
982 cond = ManifestCondition.all_of([cond, condition])
984 return GenericInstallationRule(
985 [source],
986 [(dest_path, False)],
987 into,
988 cond,
989 definition_source,
990 require_single_match=True,
991 )
993 @classmethod
994 def install_examples(
995 cls,
996 sources: Sequence[FileSystemMatchRule],
997 into: FrozenSet[BinaryPackage],
998 definition_source: str,
999 condition: Optional[ManifestCondition],
1000 ) -> "InstallRule":
1001 cond: ManifestCondition = _BUILD_DOCS_BDO
1002 if condition is not None: 1002 ↛ 1003line 1002 didn't jump to line 1003 because the condition on line 1002 was never true
1003 cond = ManifestCondition.all_of([cond, condition])
1004 return GenericInstallationRule(
1005 sources,
1006 [("usr/share/doc/{doc_main_package_name}/examples/{basename}", True)],
1007 into,
1008 cond,
1009 definition_source,
1010 )
1012 @classmethod
1013 def install_man(
1014 cls,
1015 sources: Sequence[FileSystemMatchRule],
1016 into: FrozenSet[BinaryPackage],
1017 section: Optional[int],
1018 language: Optional[str],
1019 definition_source: str,
1020 condition: Optional[ManifestCondition],
1021 ) -> "InstallRule":
1022 cond: ManifestCondition = _BUILD_DOCS_BDO
1023 if condition is not None: 1023 ↛ 1024line 1023 didn't jump to line 1024 because the condition on line 1023 was never true
1024 cond = ManifestCondition.all_of([cond, condition])
1026 dest_path_computer = _dest_path_for_manpage(
1027 section, language, definition_source
1028 )
1030 return GenericInstallationRule( 1030 ↛ exitline 1030 didn't jump to the function exit
1031 sources,
1032 dest_path_computer,
1033 into,
1034 cond,
1035 definition_source,
1036 match_filter=lambda m: not m.is_file,
1037 )
1039 @classmethod
1040 def discard_paths(
1041 cls,
1042 paths: Sequence[FileSystemMatchRule],
1043 definition_source: str,
1044 condition: Optional[ManifestCondition],
1045 *,
1046 limit_to: Optional[Sequence[FileSystemExactMatchRule]] = None,
1047 ) -> "InstallRule":
1048 return DiscardRule(
1049 paths,
1050 condition,
1051 tuple(limit_to) if limit_to is not None else tuple(),
1052 definition_source,
1053 )
1056class PPFInstallRule(InstallRule):
1057 __slots__ = (
1058 "_ppfs",
1059 "_substitution",
1060 "_into",
1061 )
1063 # noinspection PyMissingConstructor
1064 def __init__(
1065 self,
1066 into: BinaryPackage,
1067 substitution: Substitution,
1068 ppfs: Sequence["PackagerProvidedFile"],
1069 ) -> None:
1070 run_in_context_of_plugin(
1071 "debputy",
1072 super().__init__,
1073 None,
1074 "<built-in; PPF install rule>",
1075 )
1076 self._substitution = substitution
1077 self._ppfs = ppfs
1078 self._into = into
1080 def perform_install(
1081 self,
1082 path_matcher: SourcePathMatcher,
1083 install_context: InstallRuleContext,
1084 condition_context: ConditionContext,
1085 ) -> None:
1086 binary_install_context = install_context[self._into.name]
1087 fs_root = binary_install_context.fs_root
1088 for ppf in self._ppfs:
1089 source_path = ppf.path.fs_path
1090 dest_dir, name = ppf.compute_dest()
1091 dir_path = fs_root.mkdirs(dest_dir)
1093 dir_path.insert_file_from_fs_path(
1094 name,
1095 source_path,
1096 follow_symlinks=True,
1097 use_fs_path_mode=False,
1098 mode=ppf.definition.default_mode,
1099 )
1102class GenericInstallationRule(InstallRule):
1103 __slots__ = (
1104 "_sources",
1105 "_into",
1106 "_dest_paths",
1107 "_require_single_match",
1108 )
1110 def __init__(
1111 self,
1112 sources: Sequence[FileSystemMatchRule],
1113 dest_paths: Union[Sequence[Tuple[str, bool]], Callable[[PathMatch], str]],
1114 into: FrozenSet[BinaryPackage],
1115 condition: Optional[ManifestCondition],
1116 definition_source: str,
1117 *,
1118 require_single_match: bool = False,
1119 match_filter: Optional[Callable[["VirtualPath"], bool]] = None,
1120 ) -> None:
1121 super().__init__(
1122 condition,
1123 definition_source,
1124 match_filter=match_filter,
1125 )
1126 self._sources = sources
1127 self._into = into
1128 self._dest_paths = dest_paths
1129 self._require_single_match = require_single_match
1130 if self._require_single_match and len(sources) != 1: 1130 ↛ 1131line 1130 didn't jump to line 1131 because the condition on line 1130 was never true
1131 raise ValueError("require_single_match implies sources must have len 1")
1133 def perform_install(
1134 self,
1135 path_matcher: SourcePathMatcher,
1136 install_context: InstallRuleContext,
1137 condition_context: ConditionContext,
1138 ) -> None:
1139 for source in self._sources:
1140 matches = self._match_pattern(
1141 path_matcher,
1142 source,
1143 condition_context,
1144 install_context.search_dirs,
1145 self._into,
1146 )
1147 if self._require_single_match and len(matches) > 1:
1148 self._check_single_match(source, matches)
1149 self._install_matches(
1150 path_matcher,
1151 matches,
1152 self._dest_paths,
1153 install_context,
1154 self._into,
1155 condition_context,
1156 )
1159class DiscardRule(InstallRule):
1160 __slots__ = ("_fs_match_rules", "_limit_to")
1162 def __init__(
1163 self,
1164 fs_match_rules: Sequence[FileSystemMatchRule],
1165 condition: Optional[ManifestCondition],
1166 limit_to: Sequence[FileSystemExactMatchRule],
1167 definition_source: str,
1168 ) -> None:
1169 super().__init__(condition, definition_source)
1170 self._fs_match_rules = fs_match_rules
1171 self._limit_to = limit_to
1173 def perform_install(
1174 self,
1175 path_matcher: SourcePathMatcher,
1176 install_context: InstallRuleContext,
1177 condition_context: ConditionContext,
1178 ) -> None:
1179 into = frozenset()
1180 limit_to = self._limit_to
1181 if limit_to:
1182 matches = {x.match_rule.path for x in limit_to}
1183 search_dirs = tuple(
1184 s
1185 for s in install_context.search_dirs
1186 if s.search_dir.fs_path in matches
1187 )
1188 if len(limit_to) != len(search_dirs):
1189 m = matches.difference(s.search_dir.fs_path for s in search_dirs)
1190 paths = ":".join(m)
1191 _error(
1192 f"The discard rule defined at {self._definition_source} mentions the following"
1193 f" search directories that were not known to debputy: {paths}."
1194 " Either the search dir is missing somewhere else or it should be removed from"
1195 " the discard rule."
1196 )
1197 else:
1198 search_dirs = install_context.search_dirs
1200 for fs_match_rule in self._fs_match_rules:
1201 self._match_pattern(
1202 path_matcher,
1203 fs_match_rule,
1204 condition_context,
1205 search_dirs,
1206 into,
1207 )