Coverage for src/debputy/installations.py: 68%
512 statements
« prev ^ index » next coverage.py v7.8.2, created at 2026-01-16 17:20 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2026-01-16 17:20 +0000
1import collections
2import dataclasses
3import os.path
4import re
5from enum import IntEnum
6from typing import TYPE_CHECKING, cast, Any
7from collections.abc import Callable, Iterator, Sequence, Iterable, Mapping
9from debputy.exceptions import DebputyRuntimeError
10from debputy.filesystem_scan import FSPath
11from debputy.manifest_conditions import (
12 ConditionContext,
13 ManifestCondition,
14 _BUILD_DOCS_BDO,
15)
16from debputy.manifest_parser.base_types import (
17 FileSystemMatchRule,
18 FileSystemExactMatchRule,
19)
20from debputy.manifest_parser.tagging_types import DebputyDispatchableType
21from debputy.packages import BinaryPackage
22from debputy.path_matcher import MatchRule, ExactFileSystemPath, MATCH_ANYTHING
23from debputy.plugin.plugin_state import run_in_context_of_plugin
24from debputy.substitution import Substitution
25from debputy.util import _error, _warn
27if TYPE_CHECKING:
28 from debputy.packager_provided_files import PackagerProvidedFile
29 from debputy.plugin.api import VirtualPath
30 from debputy.plugin.api.impl_types import PluginProvidedDiscardRule
# Matches a ".TH" line and captures its third field (digits plus any trailing
# non-space, non-quote characters), optionally wrapped in double quotes.
_MAN_TH_LINE = re.compile(r'^[.]TH\s+\S+\s+"?(\d+[^"\s]*)"?')
# Matches a ".Dt" line and captures its third field (digits plus any trailing
# non-space characters).
_MAN_DT_LINE = re.compile(r"^[.]Dt\s+\S+\s+(\d+\S*)")
# Captures a trailing ".<1-9><word chars>" extension of a basename, tolerating
# an additional ".gz" suffix (e.g. "foo.1" or "foo.3pm.gz").
_MAN_SECTION_BASENAME = re.compile(r"[.]([1-9]\w*)(?:[.]gz)?$")
# Captures the leading run of digits of a section string (e.g. "3" in "3pm").
_MAN_REAL_SECTION = re.compile(r"^(\d+)")
# Matches the last ".<extension>" of a basename; used with `.sub("", ...)` to
# strip that extension.
_MAN_INST_BASENAME = re.compile(r"[.][^.]+$")
# Searches a path for "man/<lang>[.<suffix>]/man<1-9>/" and captures <lang>
# ("xx" or "xx_YY"). The language part is optional in the pattern, in which
# case group 1 is None.
MAN_GUESS_LANG_FROM_PATH = re.compile(
    r"(?:^|/)man/(?:([a-z][a-z](?:_[A-Z][A-Z])?)(?:\.[^/]+)?)?/man[1-9]/"
)
# Searches a basename for ".<lang>." followed by a section digit or "man" and
# captures <lang> ("xx" or "xx_YY"), e.g. "foo.de.1".
MAN_GUESS_FROM_BASENAME = re.compile(r"[.]([a-z][a-z](?:_[A-Z][A-Z])?)[.](?:[1-9]|man)")
class InstallRuleError(DebputyRuntimeError):
    """Common base class for the installation rule errors defined in this module."""
class PathAlreadyInstalledOrDiscardedError(InstallRuleError):
    """Raised when an exact match hits a path that was already installed or discarded.

    Instances are created as ``(message, path, into, definition_source)``. The
    human readable message occupies ``args[0]``, so the structured fields start
    at ``args[1]`` - the same layout `ExactPathMatchTwiceError` uses.
    """

    @property
    def path(self) -> str:
        """The offending path, i.e. the second constructor argument."""
        # NOTE(review): the raise sites in `SourcePathMatcher` pass the whole
        # `PathMatch` object in this position rather than a plain string.
        return cast("str", self.args[1])

    @property
    def into(self) -> frozenset[BinaryPackage]:
        """The packages the path had already been installed into (may be empty)."""
        return cast(frozenset[BinaryPackage], self.args[2])

    @property
    def definition_source(self) -> str:
        """The definition that triggered the error."""
        return cast("str", self.args[3])
class ExactPathMatchTwiceError(InstallRuleError):
    """Raised when an exact path match is requested twice for the same package.

    The constructor arguments are ``(message, path, into, definition_source)``.
    """

    @property
    def path(self) -> str:
        """The second constructor argument: the path that was matched twice."""
        raw_path = self.args[1]
        return cast("str", raw_path)

    @property
    def into(self) -> BinaryPackage:
        """The third constructor argument: the package the path was installed into."""
        package = self.args[2]
        return cast("BinaryPackage", package)

    @property
    def definition_source(self) -> str:
        """The fourth constructor argument: the definition that triggered the error."""
        source = self.args[3]
        return cast("str", source)
class NoMatchForInstallPatternError(InstallRuleError):
    """Raised when a pattern matched nothing although a match was required.

    The constructor arguments are ``(message, pattern, search_dirs, definition_source)``.
    """

    @property
    def pattern(self) -> str:
        """The second constructor argument: the pattern that had no match."""
        # NOTE(review): `InstallRule._match_pattern` passes the
        # `FileSystemMatchRule` object here, not a plain string.
        raw_pattern = self.args[1]
        return cast("str", raw_pattern)

    @property
    def search_dirs(self) -> Sequence["SearchDir"]:
        """The third constructor argument: the search directories that were consulted."""
        dirs = self.args[2]
        return cast("Sequence[SearchDir]", dirs)

    @property
    def definition_source(self) -> str:
        """The fourth constructor argument: the definition that triggered the error."""
        source = self.args[3]
        return cast("str", source)
@dataclasses.dataclass(frozen=True, slots=True)
class SearchDir:
    """A directory to search for paths, paired with the packages it applies to."""

    # Root of the directory that match rules are evaluated against.
    search_dir: "VirtualPath"
    # Packages for which matches found in `search_dir` are relevant.
    applies_to: frozenset[BinaryPackage]
@dataclasses.dataclass(frozen=True, slots=True)
class BinaryPackageInstallRuleContext:
    """Immutable per-package data consulted while resolving install destinations."""

    binary_package: BinaryPackage
    # File system root that content for `binary_package` is installed into.
    fs_root: FSPath
    # Package whose name is substituted for `{doc_main_package_name}`.
    doc_main_package: BinaryPackage

    def replace(self, **changes: Any) -> "BinaryPackageInstallRuleContext":
        """Return a copy of this context with the given fields replaced."""
        updated = dataclasses.replace(self, **changes)
        return updated
@dataclasses.dataclass(frozen=True, slots=True)
class InstallSearchDirContext:
    """Immutable bundle of search directory information.

    NOTE(review): the consumers of these fields are not visible in this part of the file.
    """

    search_dirs: Sequence[SearchDir]
    check_for_uninstalled_dirs: Sequence["VirtualPath"]
    # TODO: Support search dirs per-package
    debian_pkg_dirs: Mapping[str, "VirtualPath"] = dataclasses.field(
        default_factory=dict,
    )
@dataclasses.dataclass(slots=True)
class InstallRuleContext:
    """Search directories plus the per-package install contexts, indexable by package name."""

    # TODO: Search dirs should be per-package
    search_dirs: Sequence[SearchDir]
    binary_package_contexts: dict[str, BinaryPackageInstallRuleContext] = (
        dataclasses.field(default_factory=dict)
    )

    def __getitem__(self, item: str) -> BinaryPackageInstallRuleContext:
        """Look up the context registered under ``item``; raises `KeyError` if absent."""
        contexts = self.binary_package_contexts
        return contexts[item]

    def __setitem__(self, key: str, value: BinaryPackageInstallRuleContext) -> None:
        """Register (or overwrite) the context stored under ``key``."""
        contexts = self.binary_package_contexts
        contexts[key] = value

    def replace(self, **changes: Any) -> "InstallRuleContext":
        """Return a copy of this context with the given fields replaced."""
        updated = dataclasses.replace(self, **changes)
        return updated
@dataclasses.dataclass(frozen=True, slots=True)
class PathMatch:
    """A single path found by a match rule."""

    # The path that matched.
    path: "VirtualPath"
    # The search directory the match was found in.
    search_dir: "VirtualPath"
    # Flag forwarded to `SourcePathMatcher.reserve` as `is_exact_match`.
    is_exact_match: bool
    # The packages the match applies to.
    into: frozenset[BinaryPackage]
class DiscardState(IntEnum):
    """Cached verdict on whether a path has been discarded, and by what."""

    # No verdict has been computed for the path yet.
    UNCHECKED = 0
    # The path was checked and is not discarded.
    NOT_DISCARDED = 1
    # A plugin provided (automatic) discard rule matched the path or a parent.
    DISCARDED_BY_PLUGIN_PROVIDED_RULE = 2
    # The path was excluded via `SourcePathMatcher.exclude`.
    DISCARDED_BY_MANIFEST_RULE = 3
def _determine_manpage_section(
    match_rule: PathMatch,
    provided_section: int | None,
    definition_source: str,
) -> str | None:
    """Determine the (possibly suffixed) section of a man page.

    :param match_rule: The match for the man page; its file is read when no
      section was provided.
    :param provided_section: Explicitly requested section. When not None, it is
      returned as a string without inspecting the file.
    :param definition_source: Only used in the warning message.
    :return: The section (e.g. "1" or "3pm") or None if it could not be
      detected from either the file content or the file name.
    """
    if provided_section is not None:
        return str(provided_section)

    detected_section = None
    # Man pages are not guaranteed to be valid in the current locale's encoding
    # (legacy pages are frequently latin-1). Only the ".TH"/".Dt" macro lines
    # are inspected, so undecodable bytes are replaced instead of letting a
    # UnicodeDecodeError abort the detection.
    with open(match_rule.path.fs_path, encoding="utf-8", errors="replace") as fd:
        for line in fd:
            if not line.startswith((".TH", ".Dt")):
                continue

            m = _MAN_DT_LINE.match(line)
            if not m:
                m = _MAN_TH_LINE.match(line)
                if not m:
                    continue
            detected_section = m.group(1)
            if "." in detected_section:
                _warn(
                    f"Ignoring detected section {detected_section} in {match_rule.path.fs_path}"
                    f" (detected via {definition_source}): It looks too much like a version"
                )
                detected_section = None
            # Only the first parsable title line is considered.
            break
    if detected_section is None:
        # Fall back to the extension of the file name.
        m = _MAN_SECTION_BASENAME.search(os.path.basename(match_rule.path.path))
        if m:
            detected_section = m.group(1)
    return detected_section
def _determine_manpage_real_section(
    match_rule: PathMatch,
    section: str | None,
    definition_source: str,
) -> int:
    """Derive the numeric section (the ``man<N>`` directory) from a section string.

    :param match_rule: The match for the man page; only used in messages.
    :param section: Section as returned by `_determine_manpage_section`
      (e.g. "3pm"), or None when it could not be detected.
    :param definition_source: Only used in the error message.
    :return: The leading number of ``section``, which is between 1 and 9 (incl.).

    When the number is missing or out of range, the problem is reported via
    `_error` (presumably non-returning; its definition is outside this file).
    """
    real_section = None
    if section is not None:
        m = _MAN_REAL_SECTION.match(section)
        if m:
            real_section = int(m.group(1))
    # The lower bound is 1: the pattern only matches digits, so a negative
    # value is impossible, while 0 must be rejected for the check to agree
    # with the message below.
    if real_section is None or not 1 <= real_section <= 9:
        if real_section is not None:
            _warn(
                f"Computed section for {match_rule.path.fs_path} was {real_section} (section: {section}),"
                f" which is not a valid section (must be between 1 and 9 incl.)"
            )
        _error(
            f"Could not determine the section for {match_rule.path.fs_path} automatically. The man page"
            f" was detected via {definition_source}. Consider using `section: <number>` to"
            " explicitly declare the section. Keep in mind that it applies to all man pages for that"
            " rule and you may have to split the rule into two for this reason."
        )
    return real_section
def _determine_manpage_language(
    match_rule: PathMatch,
    provided_language: str | None,
) -> str | None:
    """Resolve the language directory component for a man page.

    :param match_rule: The match for the man page.
    :param provided_language: An explicit language, one of the special values
      "derive-from-basename" / "derive-from-path", or None (which behaves like
      "derive-from-path").
    :return: The language, or None when there is none ("C" or nothing detected).
    """
    if provided_language == "derive-from-basename":
        found = MAN_GUESS_FROM_BASENAME.search(match_rule.path.name)
    elif provided_language is None or provided_language == "derive-from-path":
        found = MAN_GUESS_LANG_FROM_PATH.search(match_rule.path.path)
    else:
        # Explicit language; "C" means "no language directory".
        return None if provided_language == "C" else provided_language
    return found.group(1) if found is not None else None
def _dest_path_for_manpage(
    provided_section: int | None,
    provided_language: str | None,
    definition_source: str,
) -> Callable[["PathMatch"], str]:
    """Create a function computing the install destination of a matched man page.

    The returned callable maps a `PathMatch` to
    ``usr/share/man/[<language>/]man<N>/<name>.<section>``.
    """

    def _manpage_dest_path(match_rule: PathMatch) -> str:
        # Drop the last extension of the source name; the section is re-added below.
        stem = _MAN_INST_BASENAME.sub("", match_rule.path.name)
        section = _determine_manpage_section(
            match_rule, provided_section, definition_source
        )
        real_section = _determine_manpage_real_section(
            match_rule, section, definition_source
        )
        assert section is not None
        language = _determine_manpage_language(match_rule, provided_language)
        language_dir = ""
        if language is not None:
            language_dir = f"{language}/"
            # The language is expressed via the directory; remove it from the name.
            stem = stem.removesuffix(f".{language}")

        return f"usr/share/man/{language_dir}man{real_section}/{stem}.{section}"

    return _manpage_dest_path
258class SourcePathMatcher:
259 def __init__(self, auto_discard_rules: list["PluginProvidedDiscardRule"]) -> None:
260 self._already_matched: dict[
261 str,
262 tuple[frozenset[BinaryPackage], str],
263 ] = {}
264 self._exact_match_request: set[tuple[str, str]] = set()
265 self._discarded: dict[str, DiscardState] = {}
266 self._auto_discard_rules = auto_discard_rules
267 self.used_auto_discard_rules: dict[str, set[str]] = collections.defaultdict(set)
269 def is_reserved(self, path: "VirtualPath") -> bool:
270 fs_path = path.fs_path
271 if fs_path in self._already_matched:
272 return True
273 result = self._discarded.get(fs_path, DiscardState.UNCHECKED)
274 if result == DiscardState.UNCHECKED:
275 result = self._check_plugin_provided_exclude_state_for(path)
276 if result == DiscardState.NOT_DISCARDED:
277 return False
279 return True
281 def exclude(self, path: str) -> None:
282 self._discarded[path] = DiscardState.DISCARDED_BY_MANIFEST_RULE
284 def _run_plugin_provided_discard_rules_on(self, path: "VirtualPath") -> bool:
285 for dr in self._auto_discard_rules:
286 verdict = dr.should_discard(path)
287 if verdict:
288 self.used_auto_discard_rules[dr.name].add(path.fs_path)
289 return True
290 return False
292 def _check_plugin_provided_exclude_state_for(
293 self,
294 path: "VirtualPath",
295 ) -> DiscardState:
296 cache_misses = []
297 current_path = path
298 while True:
299 fs_path = current_path.fs_path
300 exclude_state = self._discarded.get(fs_path, DiscardState.UNCHECKED)
301 if exclude_state != DiscardState.UNCHECKED:
302 verdict = exclude_state
303 break
304 cache_misses.append(fs_path)
305 if self._run_plugin_provided_discard_rules_on(current_path):
306 verdict = DiscardState.DISCARDED_BY_PLUGIN_PROVIDED_RULE
307 break
308 # We cannot trust a "NOT_DISCARDED" until we check its parent (the directory could
309 # be excluded without the files in it triggering the rule).
310 parent_dir = current_path.parent_dir
311 if not parent_dir:
312 verdict = DiscardState.NOT_DISCARDED
313 break
314 current_path = parent_dir
315 if cache_misses: 315 ↛ 318line 315 didn't jump to line 318 because the condition on line 315 was always true
316 for p in cache_misses:
317 self._discarded[p] = verdict
318 return verdict
320 def may_match(
321 self,
322 match: PathMatch,
323 *,
324 is_exact_match: bool = False,
325 ) -> tuple[frozenset[BinaryPackage], bool]:
326 m = self._already_matched.get(match.path.fs_path)
327 if m: 327 ↛ 328line 327 didn't jump to line 328 because the condition on line 327 was never true
328 return m[0], False
329 current_path = match.path.fs_path
330 discard_state = self._discarded.get(current_path, DiscardState.UNCHECKED)
332 if discard_state == DiscardState.UNCHECKED:
333 discard_state = self._check_plugin_provided_exclude_state_for(match.path)
335 assert discard_state is not None and discard_state != DiscardState.UNCHECKED
337 is_discarded = discard_state != DiscardState.NOT_DISCARDED
338 if (
339 is_exact_match
340 and discard_state == DiscardState.DISCARDED_BY_PLUGIN_PROVIDED_RULE
341 ):
342 is_discarded = False
343 return frozenset(), is_discarded
345 def reserve(
346 self,
347 path: "VirtualPath",
348 reserved_by: frozenset[BinaryPackage],
349 definition_source: str,
350 *,
351 is_exact_match: bool = False,
352 ) -> None:
353 fs_path = path.fs_path
354 self._already_matched[fs_path] = reserved_by, definition_source
355 if not is_exact_match: 355 ↛ 357line 355 didn't jump to line 357 because the condition on line 355 was always true
356 return
357 for pkg in reserved_by:
358 m_key = (pkg.name, fs_path)
359 self._exact_match_request.add(m_key)
360 try:
361 del self._discarded[fs_path]
362 except KeyError:
363 pass
364 for discarded_paths in self.used_auto_discard_rules.values():
365 discarded_paths.discard(fs_path)
367 def detect_missing(self, search_dir: "VirtualPath") -> Iterator["VirtualPath"]:
368 stack = list(search_dir.iterdir)
369 while stack:
370 m = stack.pop()
371 if m.is_dir:
372 s_len = len(stack)
373 stack.extend(m.iterdir)
375 if s_len == len(stack) and not self.is_reserved(m): 375 ↛ 377line 375 didn't jump to line 377 because the condition on line 375 was never true
376 # "Explicitly" empty dir
377 yield m
378 elif not self.is_reserved(m):
379 yield m
    def find_and_reserve_all_matches(
        self,
        match_rule: MatchRule,
        search_dirs: Sequence[SearchDir],
        dir_only_match: bool,
        match_filter: Callable[["VirtualPath"], bool] | None,
        reserved_by: frozenset[BinaryPackage],
        definition_source: str,
    ) -> tuple[list[PathMatch], tuple[int, ...]]:
        """Resolve ``match_rule`` and reserve (or exclude) every available match.

        :param match_rule: The rule to resolve. Anything but an
          `ExactFileSystemPath` is treated as a glob: unavailable paths are
          counted and skipped. For an exact path they raise instead.
        :param search_dirs: Directories to resolve the rule in.
        :param dir_only_match: Forwarded to `_resolve_path`.
        :param match_filter: Forwarded to `_resolve_path`.
        :param reserved_by: The packages reserving the matches. When empty, the
          matches are excluded (discarded) rather than reserved.
        :param definition_source: Recorded with the reservation and used in
          error messages.
        :return: ``(matches, (already_installed_count, already_excluded_count))``.
        :raises PathAlreadyInstalledOrDiscardedError: An exact path was already
          installed or excluded.
        :raises ExactPathMatchTwiceError: An exact path was requested twice for
          the same package.
        """
        matched = []
        already_installed_paths = 0
        already_excluded_paths = 0
        glob_expand = not isinstance(match_rule, ExactFileSystemPath)

        for match in _resolve_path(
            match_rule,
            search_dirs,
            dir_only_match,
            match_filter,
            reserved_by,
        ):
            installed_into, excluded = self.may_match(
                match, is_exact_match=not glob_expand
            )
            if installed_into:
                if glob_expand:
                    # Globs silently skip paths someone else already took.
                    already_installed_paths += 1
                    continue
                packages = ", ".join(p.name for p in installed_into)
                raise PathAlreadyInstalledOrDiscardedError(
                    f'The "{match.path.fs_path}" has been reserved by and installed into {packages}.'
                    f" The definition that triggered this issue is {definition_source}.",
                    match,
                    installed_into,
                    definition_source,
                )
            if excluded:
                if glob_expand:
                    already_excluded_paths += 1
                    continue
                # `installed_into` is empty at this point (see the check above).
                raise PathAlreadyInstalledOrDiscardedError(
                    f'The "{match.path.fs_path}" has been excluded. If you want this path installed, move it'
                    f" above the exclusion rule that excluded it. The definition that triggered this"
                    f" issue is {definition_source}.",
                    match,
                    installed_into,
                    definition_source,
                )
            if not glob_expand:
                # Track exact requests per package so a repeated request is detected.
                for pkg in match.into:
                    m_key = (pkg.name, match.path.fs_path)
                    if m_key in self._exact_match_request:
                        raise ExactPathMatchTwiceError(
                            f'The path "{match.path.fs_path}" (via exact match) has already been installed'
                            f" into {pkg.name}. The second installation triggered by {definition_source}",
                            match.path,
                            pkg,
                            definition_source,
                        )
                    self._exact_match_request.add(m_key)

            if reserved_by:
                self._already_matched[match.path.fs_path] = (
                    match.into,
                    definition_source,
                )
            else:
                # No target packages: the match is recorded as an exclusion.
                self.exclude(match.path.fs_path)
            matched.append(match)
        exclude_counts = already_installed_paths, already_excluded_paths
        return matched, exclude_counts
def _resolve_path(
    match_rule: MatchRule,
    search_dirs: Iterable["SearchDir"],
    dir_only_match: bool,
    match_filter: Callable[["VirtualPath"], bool] | None,
    into: frozenset[BinaryPackage],
) -> Iterator[PathMatch]:
    """Yield a `PathMatch` for each path ``match_rule`` finds in ``search_dirs``.

    :param match_rule: The rule to resolve in each search directory.
    :param search_dirs: Directories to search, in priority order.
    :param dir_only_match: When True, matches that are not directories are skipped.
    :param match_filter: Passed to `match_rule.finditer` as ``ignore_paths``.
    :param into: Packages the matches are for. When non-empty, each package is
      only served by the first search directory with a match. When empty, every
      search directory is visited.
    """
    # Packages from `into` that have not had a match yet.
    missing_matches = set(into)
    for sdir in search_dirs:
        matched = False
        if into and missing_matches.isdisjoint(sdir.applies_to):
            # All the packages, where this search dir applies, already got a match
            continue
        applicable = sdir.applies_to & missing_matches
        for matched_path in match_rule.finditer(
            sdir.search_dir,
            ignore_paths=match_filter,
        ):
            if dir_only_match and not matched_path.is_dir:
                continue
            if matched_path.parent_dir is None:
                # A path without a parent is the root of the search directory.
                if match_rule is MATCH_ANYTHING:
                    continue
                _error(
                    f"The pattern {match_rule.describe_match_short()} matched the root dir."
                )
            yield PathMatch(matched_path, sdir.search_dir, False, applicable)
            matched = True
            # continue; we want to match everything we can from this search directory.

        if matched:
            missing_matches -= applicable
            if into and not missing_matches:
                # For install rules, we can stop as soon as all packages had a match
                # For discard rules, all search directories must be visited. Otherwise,
                # you would have to repeat the discard rule once per search dir to be
                # sure something is fully discarded
                break
def _resolve_dest_paths(
    match: PathMatch,
    dest_paths: Sequence[tuple[str, bool]],
    install_context: "InstallRuleContext",
) -> Sequence[tuple[str, "FSPath"]]:
    """Expand ``dest_paths`` into ``(destination, fs_root)`` pairs for ``match``.

    :param match: The match being installed; one pair is produced per package in
      ``match.into`` and destination.
    :param dest_paths: ``(path, is_format)`` tuples. Format paths are expanded
      with ``basename``, ``dirname``, ``package_name`` and
      ``doc_main_package_name``; other paths are used verbatim.
    :param install_context: Provides the per-package file system roots.
    :raises ValueError: A destination ends with "/".
    """
    resolved = []
    for template, is_format in dest_paths:
        if not is_format:
            if template.endswith("/"):
                raise ValueError(
                    f'Provided destination for "{match.path.path}" ended with "/" ("{template}"),'
                    " which it must not!"
                )
            for pkg in match.into:
                resolved.append((template, install_context[pkg.name].fs_root))
            continue

        for pkg in match.into:
            source_parent = match.path.parent_dir
            pkg_context = install_context[pkg.name]
            target_root = pkg_context.fs_root
            dirname = source_parent.path if source_parent is not None else ""
            expanded = template.format(
                basename=match.path.name,
                dirname=dirname,
                package_name=pkg.name,
                doc_main_package_name=pkg_context.doc_main_package.name,
            )
            if expanded.endswith("/"):
                raise ValueError(
                    f'Provided destination (when resolved for {pkg.name}) for "{match.path.path}" ended'
                    f' with "/" ("{template}"), which it must not!'
                )
            resolved.append((expanded, target_root))
    return resolved
def _resolve_matches(
    matches: list[PathMatch],
    dest_paths: Sequence[tuple[str, bool]] | Callable[[PathMatch], str],
    install_context: "InstallRuleContext",
) -> Iterator[tuple[PathMatch, Sequence[tuple[str, "FSPath"]]]]:
    """Pair every match with its ``(destination, fs_root)`` targets.

    ``dest_paths`` is either a list of ``(path, is_format)`` tuples (resolved
    via `_resolve_dest_paths`) or a callable that computes the destination for
    a match.

    :raises ValueError: A computed destination ends with "/".
    """
    if not callable(dest_paths):
        for match in matches:
            yield match, _resolve_dest_paths(match, dest_paths, install_context)
        return

    for match in matches:
        dpath = dest_paths(match)
        if dpath.endswith("/"):
            raise ValueError(
                f'Provided destination for "{match.path.path}" ended with "/" ("{dpath}"), which it must not!'
            )
        targets = [(dpath, install_context[pkg.name].fs_root) for pkg in match.into]
        yield match, targets
558class InstallRule(DebputyDispatchableType):
559 __slots__ = (
560 "_already_matched",
561 "_exact_match_request",
562 "_condition",
563 "_match_filter",
564 "_definition_source",
565 )
567 def __init__(
568 self,
569 condition: ManifestCondition | None,
570 definition_source: str,
571 *,
572 match_filter: Callable[["VirtualPath"], bool] | None = None,
573 ) -> None:
574 super().__init__()
575 self._condition = condition
576 self._definition_source = definition_source
577 self._match_filter = match_filter
579 def _check_single_match(
580 self, source: FileSystemMatchRule, matches: list[PathMatch]
581 ) -> None:
582 seen_pkgs = set[BinaryPackage]()
583 for m in matches:
584 problem_pkgs = seen_pkgs & m.into
585 if problem_pkgs: 585 ↛ 586line 585 didn't jump to line 586 because the condition on line 585 was never true
586 pkg_names = ", ".join(sorted(p.name for p in problem_pkgs))
587 _error(
588 f'The pattern "{source.raw_match_rule}" matched multiple entries for the packages: {pkg_names}.'
589 " However, it should match exactly one item. Please tighten the pattern defined"
590 f" in {self._definition_source}."
591 )
592 seen_pkgs.update(m.into)
594 def _match_pattern(
595 self,
596 path_matcher: SourcePathMatcher,
597 fs_match_rule: FileSystemMatchRule,
598 condition_context: ConditionContext,
599 search_dirs: Sequence[SearchDir],
600 into: frozenset[BinaryPackage],
601 ) -> list[PathMatch]:
602 (matched, exclude_counts) = path_matcher.find_and_reserve_all_matches(
603 fs_match_rule.match_rule,
604 search_dirs,
605 fs_match_rule.raw_match_rule.endswith("/"),
606 self._match_filter,
607 into,
608 self._definition_source,
609 )
611 already_installed_paths, already_excluded_paths = exclude_counts
613 if into: 613 ↛ 618line 613 didn't jump to line 618 because the condition on line 613 was always true
614 allow_empty_match = all(not p.should_be_acted_on for p in into)
615 else:
616 # discard rules must match provided at least one search dir exist. If none of them
617 # exist, then we assume the discard rule is for a package that will not be built
618 allow_empty_match = any(s.search_dir.is_dir for s in search_dirs)
619 if self._condition is not None and not self._condition.evaluate( 619 ↛ 622line 619 didn't jump to line 622 because the condition on line 619 was never true
620 condition_context
621 ):
622 allow_empty_match = True
624 if not matched and not allow_empty_match:
625 search_dir_text = ", ".join(x.search_dir.fs_path for x in search_dirs)
626 if already_excluded_paths and already_installed_paths: 626 ↛ 627line 626 didn't jump to line 627 because the condition on line 626 was never true
627 total_paths = already_excluded_paths + already_installed_paths
628 msg = (
629 f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} after ignoring"
630 f" {total_paths} path(s) already been matched previously either by install or"
631 f" exclude rules. If you wanted to install some of these paths into multiple"
632 f" packages, please tweak the definition that installed them to install them"
633 f' into multiple packages (usually change "into: foo" to "into: [foo, bar]".'
634 f" If you wanted to install these paths and exclude rules are getting in your"
635 f" way, then please move this install rule before the exclusion rule that causes"
636 f" issue or, in case of built-in excludes, list the paths explicitly (without"
637 f" using patterns). Source for this issue is {self._definition_source}. Match rule:"
638 f" {fs_match_rule.match_rule.describe_match_exact()}"
639 )
640 elif already_excluded_paths: 640 ↛ 641line 640 didn't jump to line 641 because the condition on line 640 was never true
641 msg = (
642 f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} after ignoring"
643 f" {already_excluded_paths} path(s) that have been excluded."
644 " If you wanted to install some of these paths, please move the install rule"
645 " before the exclusion rule or, in case of built-in excludes, list the paths explicitly"
646 f" (without using patterns). Source for this issue is {self._definition_source}. Match rule:"
647 f" {fs_match_rule.match_rule.describe_match_exact()}"
648 )
649 elif already_installed_paths: 649 ↛ 650line 649 didn't jump to line 650 because the condition on line 649 was never true
650 msg = (
651 f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} after ignoring"
652 f" {already_installed_paths} path(s) already been matched previously."
653 " If you wanted to install some of these paths into multiple packages,"
654 f" please tweak the definition that installed them to install them into"
655 f' multiple packages (usually change "into: foo" to "into: [foo, bar]".'
656 f" Source for this issue is {self._definition_source}. Match rule:"
657 f" {fs_match_rule.match_rule.describe_match_exact()}"
658 )
659 else:
660 # TODO: Try harder to find the match and point out possible typos
661 msg = (
662 f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} (definition:"
663 f" {self._definition_source}). Match rule: {fs_match_rule.match_rule.describe_match_exact()}"
664 )
665 raise NoMatchForInstallPatternError(
666 msg,
667 fs_match_rule,
668 search_dirs,
669 self._definition_source,
670 )
671 return matched
    def _install_matches(
        self,
        path_matcher: SourcePathMatcher,
        matches: list[PathMatch],
        dest_paths: Sequence[tuple[str, bool]] | Callable[[PathMatch], str],
        install_context: "InstallRuleContext",
        into: frozenset[BinaryPackage],
        condition_context: ConditionContext,
    ) -> None:
        """Install ``matches`` into the file system roots of the target packages.

        :param path_matcher: Used for reserving the content of matched directories.
        :param matches: The matches to install.
        :param dest_paths: Destination templates or a callable computing the
          destination (see `_resolve_matches`).
        :param install_context: Provides the per-package file system roots.
        :param into: The target packages.
        :param condition_context: Context used for evaluating the rule's condition.
        :raises ValueError: The rule is enabled but ``matches`` is empty.
        """
        if (
            self._condition is not None
            and not self._condition.evaluate(condition_context)
        ) or not any(p.should_be_acted_on for p in into):
            # Rule is disabled; skip all its actions - also allow empty matches
            # for this particular case. Though, we do go through the match and
            # ensure all paths are reserved correctly such that the behavior
            # between `binary-arch` vs `binary-indep` vs `binary` is consistent
            for match in matches:
                self._reserve_recursively(
                    path_matcher,
                    match,
                    into,
                )

            return

        if not matches:
            raise ValueError("matches must not be empty")

        for match, dest_paths_and_roots in _resolve_matches(
            matches,
            dest_paths,
            install_context,
        ):
            # Destination directories that must receive the content of a matched directory.
            install_recursively_into_dirs = []
            for dest, fs_root in dest_paths_and_roots:
                dir_part, basename = os.path.split(dest)
                # We do not associate these with the FS path. First off,
                # it is complicated to do in most cases (indeed, debhelper
                # does not preserve these directories either) and secondly,
                # it is "only" mtime and mode - mostly irrelevant as the
                # directory is 99.9% likely to be 0755 (we are talking
                # directories like "/usr", "/usr/share").
                dir_path = fs_root.mkdirs(dir_part)
                existing_path = dir_path.get(basename)

                if match.path.is_dir:
                    if existing_path is not None and not existing_path.is_dir:
                        # A non-directory is in the way; the directory replaces it.
                        existing_path.unlink()
                        existing_path = None
                    current_dir = existing_path

                    if current_dir is None:
                        current_dir = dir_path.mkdir(
                            basename, reference_path=match.path
                        )
                    install_recursively_into_dirs.append(current_dir)
                else:
                    if existing_path is not None and existing_path.is_dir:
                        _error(
                            f"Cannot install {match.path} ({match.path.fs_path}) as {dest}. That path already exist"
                            f" and is a directory. This error was triggered via {self._definition_source}."
                        )

                    if match.path.is_symlink:
                        dir_path.add_symlink(
                            basename, match.path.readlink(), reference_path=match.path
                        )
                    else:
                        dir_path.insert_file_from_fs_path(
                            basename,
                            match.path.fs_path,
                            follow_symlinks=False,
                            use_fs_path_mode=True,
                            reference_path=match.path,
                        )
            if install_recursively_into_dirs:
                self._install_dir_recursively(
                    path_matcher,
                    install_recursively_into_dirs,
                    match,
                    into,
                )
757 def _reserve_recursively(
758 self,
759 path_matcher: SourcePathMatcher,
760 match: PathMatch,
761 into: frozenset[BinaryPackage],
762 ) -> None:
763 direct_matched_path = match.path
764 path_matcher.reserve(
765 direct_matched_path,
766 into,
767 self._definition_source,
768 is_exact_match=match.is_exact_match,
769 )
770 if not direct_matched_path.is_dir: 770 ↛ 771line 770 didn't jump to line 771 because the condition on line 770 was never true
771 return
772 stack = list(direct_matched_path.iterdir)
773 while stack:
774 current_path = stack.pop()
775 path_matcher.reserve(
776 current_path,
777 into,
778 self._definition_source,
779 is_exact_match=False,
780 )
781 if current_path.is_dir:
782 stack.extend(current_path.iterdir)
    def _install_dir_recursively(
        self,
        path_matcher: SourcePathMatcher,
        parent_dirs: Sequence[FSPath],
        match: PathMatch,
        into: frozenset[BinaryPackage],
    ) -> None:
        """Install the content of a matched directory into every dir in ``parent_dirs``.

        Entries that are already reserved (installed or discarded) are skipped.
        Every installed entry is reserved for ``into``.

        :param path_matcher: Consulted for reserved paths and updated with new ones.
        :param parent_dirs: Destination directories corresponding to ``match.path``.
        :param match: The match for the source directory.
        :param into: The packages the content is installed into.
        """
        # Work list of (destination directories, source entry to install into them).
        stack = [
            (parent_dirs, e)
            for e in match.path.iterdir
            if not path_matcher.is_reserved(e)
        ]

        while stack:
            current_dirs, dir_entry = stack.pop()
            path_matcher.reserve(
                dir_entry,
                into,
                self._definition_source,
                is_exact_match=False,
            )
            if dir_entry.is_dir:
                # Mirror the directory in every destination, then queue its content.
                new_dirs = [
                    d.mkdir(dir_entry.name, reference_path=dir_entry)
                    for d in current_dirs
                ]
                stack.extend(
                    (new_dirs, de)
                    for de in dir_entry.iterdir
                    if not path_matcher.is_reserved(de)
                )
            elif dir_entry.is_symlink:
                for current_dir in current_dirs:
                    current_dir.add_symlink(
                        dir_entry.name,
                        dir_entry.readlink(),
                        reference_path=dir_entry,
                    )
            elif dir_entry.is_file:
                for current_dir in current_dirs:
                    current_dir.insert_file_from_fs_path(
                        dir_entry.name,
                        dir_entry.fs_path,
                        use_fs_path_mode=True,
                        follow_symlinks=False,
                        reference_path=dir_entry,
                    )
            else:
                _error(
                    f"Unsupported file type: {dir_entry.fs_path} - neither a file, directory or symlink"
                )
836 def perform_install(
837 self,
838 path_matcher: SourcePathMatcher,
839 install_context: InstallRuleContext,
840 condition_context: ConditionContext,
841 ) -> None:
842 raise NotImplementedError
844 @classmethod
845 def install_as(
846 cls,
847 source: FileSystemMatchRule,
848 dest_path: str,
849 into: frozenset[BinaryPackage],
850 definition_source: str,
851 condition: ManifestCondition | None,
852 ) -> "InstallRule":
853 return GenericInstallationRule(
854 [source],
855 [(dest_path, False)],
856 into,
857 condition,
858 definition_source,
859 require_single_match=True,
860 )
862 @classmethod
863 def install_dest(
864 cls,
865 sources: Sequence[FileSystemMatchRule],
866 dest_dir: str | None,
867 into: frozenset[BinaryPackage],
868 definition_source: str,
869 condition: ManifestCondition | None,
870 ) -> "InstallRule":
871 if dest_dir is None:
872 dest_dir = "{dirname}/{basename}"
873 else:
874 dest_dir = os.path.join(dest_dir, "{basename}")
875 return GenericInstallationRule(
876 sources,
877 [(dest_dir, True)],
878 into,
879 condition,
880 definition_source,
881 )
883 @classmethod
884 def install_multi_as(
885 cls,
886 source: FileSystemMatchRule,
887 dest_paths: Sequence[str],
888 into: frozenset[BinaryPackage],
889 definition_source: str,
890 condition: ManifestCondition | None,
891 ) -> "InstallRule":
892 if len(dest_paths) < 2: 892 ↛ 893line 892 didn't jump to line 893 because the condition on line 892 was never true
893 raise ValueError(
894 "Please use `install_as` when there is less than 2 dest path"
895 )
896 dps = tuple((dp, False) for dp in dest_paths)
897 return GenericInstallationRule(
898 [source],
899 dps,
900 into,
901 condition,
902 definition_source,
903 require_single_match=True,
904 )
906 @classmethod
907 def install_multi_dest(
908 cls,
909 sources: Sequence[FileSystemMatchRule],
910 dest_dirs: Sequence[str],
911 into: frozenset[BinaryPackage],
912 definition_source: str,
913 condition: ManifestCondition | None,
914 ) -> "InstallRule":
915 if len(dest_dirs) < 2: 915 ↛ 916line 915 didn't jump to line 916 because the condition on line 915 was never true
916 raise ValueError(
917 "Please use `install_dest` when there is less than 2 dest dir"
918 )
919 dest_paths = tuple((os.path.join(dp, "{basename}"), True) for dp in dest_dirs)
920 return GenericInstallationRule(
921 sources,
922 dest_paths,
923 into,
924 condition,
925 definition_source,
926 )
@classmethod
def install_doc(
    cls,
    sources: Sequence[FileSystemMatchRule],
    dest_dir: str | None,
    into: frozenset[BinaryPackage],
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Build a rule that installs documentation files.

    The resulting rule is always guarded by `_BUILD_DOCS_BDO`; a caller
    supplied condition is combined with it via `ManifestCondition.all_of`.

    :param sources: Match rules selecting the documentation to install.
    :param dest_dir: Destination handed to the rule verbatim (flagged as not
      being a format string). When `None`, the template
      `usr/share/doc/{doc_main_package_name}/{basename}` is used instead.
    :param into: Binary packages handed to the resulting rule unchanged.
    :param definition_source: Description of where the rule was defined.
    :param condition: Optional extra condition.
    :return: A `GenericInstallationRule` for the documentation.
    """
    effective_condition: ManifestCondition = (
        _BUILD_DOCS_BDO
        if condition is None
        else ManifestCondition.all_of([_BUILD_DOCS_BDO, condition])
    )

    if dest_dir is None:
        destination = ("usr/share/doc/{doc_main_package_name}/{basename}", True)
    else:
        # NOTE(review): unlike `install_dest`, no `{basename}` is appended to an
        # explicit `dest_dir` here - confirm this is what callers expect.
        destination = (dest_dir, False)

    return GenericInstallationRule(
        sources=sources,
        dest_paths=[destination],
        into=into,
        condition=effective_condition,
        definition_source=definition_source,
    )
@classmethod
def install_doc_as(
    cls,
    source: FileSystemMatchRule,
    dest_path: str,
    into: frozenset[BinaryPackage],
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Build a rule that installs one documentation source at an exact path.

    The resulting rule is always guarded by `_BUILD_DOCS_BDO`; a caller
    supplied condition is combined with it via `ManifestCondition.all_of`.

    :param source: Match rule for the single source; the resulting rule is
      created with `require_single_match=True`.
    :param dest_path: Destination path, used verbatim (not a format string).
    :param into: Binary packages handed to the resulting rule unchanged.
    :param definition_source: Description of where the rule was defined.
    :param condition: Optional extra condition.
    :return: A `GenericInstallationRule` for the documentation file.
    """
    effective_condition: ManifestCondition = (
        _BUILD_DOCS_BDO
        if condition is None
        else ManifestCondition.all_of([_BUILD_DOCS_BDO, condition])
    )
    return GenericInstallationRule(
        sources=[source],
        dest_paths=[(dest_path, False)],
        into=into,
        condition=effective_condition,
        definition_source=definition_source,
        require_single_match=True,
    )
@classmethod
def install_examples(
    cls,
    sources: Sequence[FileSystemMatchRule],
    into: frozenset[BinaryPackage],
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Build a rule that installs example files.

    Matches go to `usr/share/doc/{doc_main_package_name}/examples/{basename}`.
    The resulting rule is always guarded by `_BUILD_DOCS_BDO`; a caller
    supplied condition is combined with it via `ManifestCondition.all_of`.

    :param sources: Match rules selecting the examples to install.
    :param into: Binary packages handed to the resulting rule unchanged.
    :param definition_source: Description of where the rule was defined.
    :param condition: Optional extra condition.
    :return: A `GenericInstallationRule` for the examples.
    """
    effective_condition: ManifestCondition = (
        _BUILD_DOCS_BDO
        if condition is None
        else ManifestCondition.all_of([_BUILD_DOCS_BDO, condition])
    )
    examples_template = "usr/share/doc/{doc_main_package_name}/examples/{basename}"
    return GenericInstallationRule(
        sources=sources,
        dest_paths=[(examples_template, True)],
        into=into,
        condition=effective_condition,
        definition_source=definition_source,
    )
@classmethod
def install_man(
    cls,
    sources: Sequence[FileSystemMatchRule],
    into: frozenset[BinaryPackage],
    section: int | None,
    language: str | None,
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Build a rule that installs manual pages.

    The destination of each match is computed by the callable returned from
    `_dest_path_for_manpage(section, language, definition_source)`.
    The resulting rule is always guarded by `_BUILD_DOCS_BDO`; a caller
    supplied condition is combined with it via `ManifestCondition.all_of`.

    :param sources: Match rules selecting the manual pages.
    :param into: Binary packages handed to the resulting rule unchanged.
    :param section: Section forwarded to `_dest_path_for_manpage`; may be `None`.
    :param language: Language forwarded to `_dest_path_for_manpage`; may be `None`.
    :param definition_source: Description of where the rule was defined.
    :param condition: Optional extra condition.
    :return: A `GenericInstallationRule` for the manual pages.
    """
    effective_condition: ManifestCondition = (
        _BUILD_DOCS_BDO
        if condition is None
        else ManifestCondition.all_of([_BUILD_DOCS_BDO, condition])
    )
    manpage_dest = _dest_path_for_manpage(section, language, definition_source)

    # NOTE(review): the filter is true for anything that is *not* a file;
    # presumably `match_filter` names the paths to leave out - confirm against
    # the base class.
    return GenericInstallationRule(
        sources=sources,
        dest_paths=manpage_dest,
        into=into,
        condition=effective_condition,
        definition_source=definition_source,
        match_filter=lambda candidate: not candidate.is_file,
    )
@classmethod
def discard_paths(
    cls,
    paths: Sequence[FileSystemMatchRule],
    definition_source: str,
    condition: ManifestCondition | None,
    *,
    limit_to: Sequence[FileSystemExactMatchRule] | None = None,
) -> "InstallRule":
    """Build a `DiscardRule` for the given match rules.

    :param paths: Match rules handed to the discard rule.
    :param definition_source: Description of where the rule was defined.
    :param condition: Optional condition handed to the discard rule.
    :param limit_to: Optional exact-match rules restricting the rule; they
      are copied into a tuple, and `None` becomes an empty tuple.
    :return: The new `DiscardRule`.
    """
    restricted_to = () if limit_to is None else tuple(limit_to)
    return DiscardRule(
        fs_match_rules=paths,
        condition=condition,
        limit_to=restricted_to,
        definition_source=definition_source,
    )
class PPFInstallRule(InstallRule):
    """Built-in install rule that copies packager provided files (PPFs)
    into the file system root of a single binary package.
    """

    __slots__ = ("_ppfs", "_substitution", "_into")

    # noinspection PyMissingConstructor
    def __init__(
        self,
        into: BinaryPackage,
        substitution: Substitution,
        ppfs: Sequence["PackagerProvidedFile"],
    ) -> None:
        """Create the rule.

        :param into: The binary package receiving the files.
        :param substitution: Stored on the instance; not used by the code
          in this class.
        :param ppfs: The packager provided files to install.
        """
        # The parent constructor is invoked via `run_in_context_of_plugin` with
        # "debputy" as the plugin name, `None`, and a fixed description string
        # as its arguments.
        parent_init = super().__init__
        run_in_context_of_plugin(
            "debputy",
            parent_init,
            None,
            "<built-in; PPF install rule>",
        )
        self._substitution = substitution
        self._ppfs = ppfs
        self._into = into

    def perform_install(
        self,
        path_matcher: SourcePathMatcher,
        install_context: InstallRuleContext,
        condition_context: ConditionContext,
    ) -> None:
        """Insert every PPF into the package's file system root.

        :param path_matcher: Unused by this rule.
        :param install_context: Looked up by the package name to obtain the
          file system root of the target package.
        :param condition_context: Unused by this rule.
        """
        package_root = install_context[self._into.name].fs_root
        for provided_file in self._ppfs:
            origin = provided_file.path.fs_path
            target_dir, target_name = provided_file.compute_dest()
            # Ensure the directory exists, then insert the file using the mode
            # from the PPF definition rather than the mode found on disk.
            package_root.mkdirs(target_dir).insert_file_from_fs_path(
                target_name,
                origin,
                follow_symlinks=True,
                use_fs_path_mode=False,
                mode=provided_file.definition.default_mode,
            )
class GenericInstallationRule(InstallRule):
    """Install rule that matches a list of source patterns and installs the
    matches at the configured destination paths.
    """

    __slots__ = ("_sources", "_into", "_dest_paths", "_require_single_match")

    def __init__(
        self,
        sources: Sequence[FileSystemMatchRule],
        dest_paths: Sequence[tuple[str, bool]] | Callable[[PathMatch], str],
        into: frozenset[BinaryPackage],
        condition: ManifestCondition | None,
        definition_source: str,
        *,
        require_single_match: bool = False,
        match_filter: Callable[["VirtualPath"], bool] | None = None,
    ) -> None:
        """Create the rule.

        :param sources: Match rules selecting the paths to install.
        :param dest_paths: Either a sequence of `(path, flag)` pairs or a
          callable computing the destination from a `PathMatch`. The flag
          presumably marks the path as a format string (callers in this file
          name it `dest_path_is_format`); verify in `_install_matches`.
        :param into: Binary packages the rule is matched and installed for.
        :param condition: Optional condition forwarded to the base class.
        :param definition_source: Description of where the rule was defined;
          forwarded to the base class.
        :param require_single_match: When true, `sources` must contain
          exactly one rule and multiple matches for it are checked via
          `_check_single_match`.
        :param match_filter: Optional filter forwarded to the base class.
        :raises ValueError: If `require_single_match` is set and `sources`
          does not contain exactly one entry.
        """
        super().__init__(condition, definition_source, match_filter=match_filter)
        self._sources = sources
        self._into = into
        self._dest_paths = dest_paths
        self._require_single_match = require_single_match
        if require_single_match and len(sources) != 1:
            raise ValueError("require_single_match implies sources must have len 1")

    def perform_install(
        self,
        path_matcher: SourcePathMatcher,
        install_context: InstallRuleContext,
        condition_context: ConditionContext,
    ) -> None:
        """Match each source pattern and install whatever it matched.

        :param path_matcher: Matcher used to resolve the source patterns.
        :param install_context: Provides the search directories and is
          forwarded to the installation step.
        :param condition_context: Context forwarded to matching and installing.
        """
        for source_rule in self._sources:
            found = self._match_pattern(
                path_matcher,
                source_rule,
                condition_context,
                install_context.search_dirs,
                self._into,
            )
            if self._require_single_match:
                # Only consult the single-match check when it can matter.
                if len(found) > 1:
                    self._check_single_match(source_rule, found)
            self._install_matches(
                path_matcher,
                found,
                self._dest_paths,
                install_context,
                self._into,
                condition_context,
            )
class DiscardRule(InstallRule):
    """Install rule that matches paths without installing them anywhere.

    Each pattern is matched with an empty `into` set and the result is
    ignored. Presumably the matching itself is what records the paths as
    discarded - confirm in `_match_pattern`.
    """

    __slots__ = ("_fs_match_rules", "_limit_to")

    def __init__(
        self,
        fs_match_rules: Sequence[FileSystemMatchRule],
        condition: ManifestCondition | None,
        limit_to: Sequence[FileSystemExactMatchRule],
        definition_source: str,
    ) -> None:
        """Create the rule.

        :param fs_match_rules: Match rules selecting the paths to discard.
        :param condition: Optional condition forwarded to the base class.
        :param limit_to: Exact-match rules naming the search directories the
          rule is restricted to. An empty sequence means all of them.
        :param definition_source: Description of where the rule was defined;
          forwarded to the base class.
        """
        super().__init__(condition, definition_source)
        self._fs_match_rules = fs_match_rules
        self._limit_to = limit_to

    def perform_install(
        self,
        path_matcher: SourcePathMatcher,
        install_context: InstallRuleContext,
        condition_context: ConditionContext,
    ) -> None:
        """Apply every match rule to the (possibly restricted) search directories.

        When `limit_to` is non-empty, only search directories whose
        `search_dir.fs_path` is named by it are used. Naming a directory that
        is not among the known search directories is reported via `_error`.

        :param path_matcher: Matcher used to resolve the patterns.
        :param install_context: Provides the known search directories.
        :param condition_context: Context forwarded to the matching.
        """
        limit_to = self._limit_to
        if limit_to:
            requested = {x.match_rule.path for x in limit_to}
            search_dirs = tuple(
                s
                for s in install_context.search_dirs
                if s.search_dir.fs_path in requested
            )
            # Compare the path sets themselves rather than element counts:
            # duplicate `limit_to` entries or search dirs sharing a path would
            # make a count comparison either raise a spurious error (with an
            # empty path list) or miss a directory that really is unknown.
            unknown = requested.difference(s.search_dir.fs_path for s in search_dirs)
            if unknown:
                # Sorted so the message is stable across runs.
                paths = ":".join(sorted(unknown))
                _error(
                    f"The discard rule defined at {self._definition_source} mentions the following"
                    f" search directories that were not known to debputy: {paths}."
                    " Either the search dir is missing somewhere else or it should be removed from"
                    " the discard rule."
                )
        else:
            search_dirs = install_context.search_dirs

        for fs_match_rule in self._fs_match_rules:
            self._match_pattern(
                path_matcher,
                fs_match_rule,
                condition_context,
                search_dirs,
                into=frozenset(),
            )