Coverage for src/debputy/installations.py: 68%
513 statements
« prev ^ index » next coverage.py v7.8.2, created at 2026-02-14 10:41 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2026-02-14 10:41 +0000
1import collections
2import dataclasses
3import os.path
4import re
5from enum import IntEnum
6from typing import TYPE_CHECKING, cast, Any
7from collections.abc import Callable, Iterator, Sequence, Iterable, Mapping
9from debputy.exceptions import DebputyRuntimeError
10from debputy.filesystem_scan import FSPath
11from debputy.manifest_conditions import (
12 ConditionContext,
13 ManifestCondition,
14 _BUILD_DOCS_BDO,
15)
16from debputy.manifest_parser.base_types import (
17 FileSystemMatchRule,
18 FileSystemExactMatchRule,
19)
20from debputy.manifest_parser.tagging_types import DebputyDispatchableType
21from debputy.packages import BinaryPackage
22from debputy.path_matcher import MatchRule, ExactFileSystemPath, MATCH_ANYTHING
23from debputy.plugin.plugin_state import run_in_context_of_plugin
24from debputy.substitution import Substitution
25from debputy.util import _error, _warn
27if TYPE_CHECKING:
28 from debputy.packager_provided_files import PackagerProvidedFile
29 from debputy.plugin.api import VirtualPath
30 from debputy.plugin.api.impl_types import PluginProvidedDiscardRule
# Patterns used to work out where a manual page should be installed.

# Captures the section argument of a `.TH <title> <section>` header line; the
# section may be wrapped in double quotes.
_MAN_TH_LINE = re.compile(r'^[.]TH\s+\S+\s+"?(\d+[^"\s]*)"?')
# Captures the section argument of a `.Dt <title> <section>` header line.
_MAN_DT_LINE = re.compile(r"^[.]Dt\s+\S+\s+(\d+\S*)")
# Captures the section from a file name extension such as `foo.1` or
# `foo.3pm.gz` (a trailing `.gz` is tolerated).
_MAN_SECTION_BASENAME = re.compile(r"[.]([1-9]\w*)(?:[.]gz)?$")
# Captures the leading digits of a section string (`3` out of `3pm`).
_MAN_REAL_SECTION = re.compile(r"^(\d+)")
# Matches the last dot-separated extension of a basename; it is removed to
# obtain the name the page is installed under.
_MAN_INST_BASENAME = re.compile(r"[.][^.]+$")
# Captures an optional language directory (`xx` or `xx_YY`, optionally followed
# by `.<something>`) located between `man/` and `man<1-9>/` in a path.
MAN_GUESS_LANG_FROM_PATH = re.compile(
    r"(?:^|/)man/(?:([a-z][a-z](?:_[A-Z][A-Z])?)(?:\.[^/]+)?)?/man[1-9]/"
)
# Captures a language code (`xx` or `xx_YY`) that sits directly in front of the
# section digit or a `man` extension in a basename (e.g. `foo.de.1`).
MAN_GUESS_FROM_BASENAME = re.compile(r"[.]([a-z][a-z](?:_[A-Z][A-Z])?)[.](?:[1-9]|man)")
class InstallRuleError(DebputyRuntimeError):
    """Common base class for the errors raised while applying install rules."""
class PathAlreadyInstalledOrDiscardedError(InstallRuleError):
    """Raised when an exact match hits a path that was already installed or discarded.

    The raise sites in this module build the exception as
    ``(message, match, into, definition_source)``.  ``args[0]`` is therefore the
    human readable message and the payload starts at ``args[1]``, the same
    layout the sibling ``ExactPathMatchTwiceError`` and
    ``NoMatchForInstallPatternError`` read.  The accessors previously read
    ``args[0..2]`` and consequently returned the message, the match and the
    package set respectively.
    """

    @property
    def path(self) -> str:
        """File system path of the offending match."""
        matched = self.args[1]
        if isinstance(matched, str):
            return matched
        # The raise sites hand over the whole match object rather than a
        # string; unwrap it so the property honours its `str` annotation.
        return cast("str", matched.path.fs_path)

    @property
    def into(self) -> frozenset[BinaryPackage]:
        """Packages the path had already been installed into (may be empty)."""
        return cast(frozenset[BinaryPackage], self.args[2])

    @property
    def definition_source(self) -> str:
        """Definition that triggered the error."""
        return cast("str", self.args[3])
class ExactPathMatchTwiceError(InstallRuleError):
    """Raised when the same path is requested twice via an exact match.

    Positional exception arguments: ``(message, path, into, definition_source)``.
    """

    @property
    def path(self) -> str:
        """The second exception argument: the path that was matched twice."""
        value = self.args[1]
        return cast("str", value)

    @property
    def into(self) -> BinaryPackage:
        """The third exception argument: the package the path was installed into."""
        value = self.args[2]
        return cast("BinaryPackage", value)

    @property
    def definition_source(self) -> str:
        """The fourth exception argument: the definition that triggered the error."""
        value = self.args[3]
        return cast("str", value)
class NoMatchForInstallPatternError(InstallRuleError):
    """Raised when a pattern that has to match something matched nothing.

    Positional exception arguments:
    ``(message, pattern, search_dirs, definition_source)``.
    """

    @property
    def pattern(self) -> str:
        """The second exception argument: the pattern without any match."""
        value = self.args[1]
        return cast("str", value)

    @property
    def search_dirs(self) -> Sequence["SearchDir"]:
        """The third exception argument: the search directories that were tried."""
        value = self.args[2]
        return cast("Sequence[SearchDir]", value)

    @property
    def definition_source(self) -> str:
        """The fourth exception argument: the definition that triggered the error."""
        value = self.args[3]
        return cast("str", value)
@dataclasses.dataclass(slots=True, frozen=True)
class SearchDir:
    """A directory searched for install sources plus the packages it applies to."""

    # Root directory that match rules are evaluated against.
    search_dir: "VirtualPath"
    # Packages for which matches from this directory may be used.
    applies_to: frozenset[BinaryPackage]
@dataclasses.dataclass(slots=True, frozen=True)
class BinaryPackageInstallRuleContext:
    """Immutable, per binary package data that install rules work with."""

    # The package this context belongs to.
    binary_package: BinaryPackage
    # Root of the file system tree that installed paths are created in.
    fs_root: FSPath
    # Package whose name is substituted for `{doc_main_package_name}`.
    doc_main_package: BinaryPackage

    def replace(self, **changes: Any) -> "BinaryPackageInstallRuleContext":
        """Return a copy of this context with the given fields replaced."""
        updated = dataclasses.replace(self, **changes)
        return updated
@dataclasses.dataclass(slots=True, frozen=True)
class InstallSearchDirContext:
    """Immutable description of the search directories used for an installation."""

    # Directories that install rules look for sources in.
    search_dirs: Sequence[SearchDir]
    # NOTE(review): judging by the name, directories that are scanned for
    # paths no rule picked up - confirm against the consumer.
    check_for_uninstalled_dirs: Sequence["VirtualPath"]
    # TODO: Support search dirs per-package
    # Keyed by string; presumably the package name - verify against the caller.
    debian_pkg_dirs: Mapping[str, "VirtualPath"] = dataclasses.field(
        default_factory=dict
    )
@dataclasses.dataclass(slots=True)
class InstallRuleContext:
    """Mutable context handed to install rules.

    Supports item access (``ctx[key]`` / ``ctx[key] = value``) as a shortcut
    for reading and writing ``binary_package_contexts``.
    """

    # TODO: Search dirs should be per-package
    search_dirs: Sequence[SearchDir]
    # Looked up with `pkg.name` by the helpers in this module.
    binary_package_contexts: dict[str, BinaryPackageInstallRuleContext] = (
        dataclasses.field(default_factory=dict)
    )

    def __getitem__(self, item: str) -> BinaryPackageInstallRuleContext:
        """Return the context stored under ``item`` (``KeyError`` if absent)."""
        contexts = self.binary_package_contexts
        return contexts[item]

    def __setitem__(self, key: str, value: BinaryPackageInstallRuleContext) -> None:
        """Store ``value`` as the context for ``key``."""
        contexts = self.binary_package_contexts
        contexts[key] = value

    def replace(self, **changes: Any) -> "InstallRuleContext":
        """Return a copy of this context with the given fields replaced."""
        updated = dataclasses.replace(self, **changes)
        return updated
@dataclasses.dataclass(slots=True, frozen=True)
class PathMatch:
    """A single path produced by evaluating a match rule in a search directory."""

    # The path that matched.
    path: "VirtualPath"
    # The search directory the match was found in.
    search_dir: "VirtualPath"
    # Forwarded to `SourcePathMatcher.reserve` when the match is reserved.
    # `_resolve_path` in this module always sets it to False.
    is_exact_match: bool
    # Packages the match is destined for.
    into: frozenset[BinaryPackage]
class DiscardState(IntEnum):
    """Cached verdict on whether a source path has been discarded."""

    # No verdict has been computed for the path yet.
    UNCHECKED = 0
    # The path was checked and nothing discards it.
    NOT_DISCARDED = 1
    # A plugin provided (automatic) discard rule applies to the path.
    DISCARDED_BY_PLUGIN_PROVIDED_RULE = 2
    # The path was explicitly excluded.
    DISCARDED_BY_MANIFEST_RULE = 3
def _determine_manpage_section(
    match_rule: PathMatch,
    provided_section: int | None,
    definition_source: str,
) -> str | None:
    """Determine the (full) section of a manual page, e.g. ``"3"`` or ``"3pm"``.

    An explicitly provided section always wins.  Otherwise the first usable
    ``.TH``/``.Dt`` header line of the file is consulted and, failing that, the
    extension of the file name.

    :param match_rule: The match whose path is the manual page.
    :param provided_section: Section requested by the rule, if any.
    :param definition_source: Where the rule was defined (used in warnings).
    :return: The section as a string, or ``None`` if none could be detected.
    """
    if provided_section is not None:
        return str(provided_section)

    detected_section = None
    # Only ASCII macros are inspected, so undecodable bytes are replaced rather
    # than treated as fatal.  With strict decoding, a page in a legacy encoding
    # or a compressed page (the basename fallback below explicitly supports
    # `.gz`) raised UnicodeDecodeError before the fallback could be tried.
    with open(match_rule.path.fs_path, encoding="utf-8", errors="replace") as fd:
        for line in fd:
            if not line.startswith((".TH", ".Dt")):
                continue

            m = _MAN_DT_LINE.match(line) or _MAN_TH_LINE.match(line)
            if not m:
                continue
            detected_section = m.group(1)
            if "." in detected_section:
                _warn(
                    f"Ignoring detected section {detected_section} in {match_rule.path.fs_path}"
                    f" (detected via {definition_source}): It looks too much like a version"
                )
                detected_section = None
            # Only the first parsable header line is considered.
            break

    if detected_section is None:
        m = _MAN_SECTION_BASENAME.search(os.path.basename(match_rule.path.path))
        if m:
            detected_section = m.group(1)

    return detected_section
def _determine_manpage_real_section(
    match_rule: PathMatch,
    section: str | None,
    definition_source: str,
) -> int:
    """Reduce a section string such as ``"3pm"`` to its numeric section.

    :param match_rule: The match whose path is the manual page (for messages).
    :param section: Full section string, or ``None`` if none was detected.
    :param definition_source: Where the rule was defined (for messages).
    :return: The numeric section, which is between 1 and 9 inclusive.

    Reports the problem via ``_error`` when the section is missing or out of
    range.  NOTE(review): this relies on ``_error`` not returning - confirm.
    """
    real_section = None
    if section is not None:
        m = _MAN_REAL_SECTION.match(section)
        if m:
            real_section = int(m.group(1))
    # The lower bound used to be `< 0`, which can never be true for a value
    # parsed from `\d+`; section 0 therefore passed despite the documented
    # 1..9 range (which the basename and path patterns also assume).
    if real_section is None or real_section < 1 or real_section > 9:
        if real_section is not None:
            _warn(
                f"Computed section for {match_rule.path.fs_path} was {real_section} (section: {section}),"
                f" which is not a valid section (must be between 1 and 9 incl.)"
            )
        _error(
            f"Could not determine the section for {match_rule.path.fs_path} automatically.  The man page"
            f" was detected via {definition_source}. Consider using `section: <number>` to"
            " explicitly declare the section. Keep in mind that it applies to all man pages for that"
            " rule and you may have to split the rule into two for this reason."
        )
    return real_section
def _determine_manpage_language(
    match_rule: PathMatch,
    provided_language: str | None,
) -> str | None:
    """Determine the language directory a manual page belongs in.

    :param match_rule: The match whose path is the manual page.
    :param provided_language: An explicit language, one of the keywords
      ``derive-from-basename`` / ``derive-from-path``, or ``None`` (which
      behaves like ``derive-from-path``).
    :return: The language, or ``None`` when there is none (an explicit ``C``
      or nothing derivable from the name/path).
    """
    if provided_language == "derive-from-basename":
        found = MAN_GUESS_FROM_BASENAME.search(match_rule.path.name)
        return None if found is None else found.group(1)

    if provided_language is None or provided_language == "derive-from-path":
        found = MAN_GUESS_LANG_FROM_PATH.search(match_rule.path.path)
        return None if found is None else found.group(1)

    # An explicit language; "C" is mapped to "no language".
    if provided_language == "C":
        return None
    return provided_language
def _dest_path_for_manpage(
    provided_section: int | None,
    provided_language: str | None,
    definition_source: str,
) -> Callable[["PathMatch"], str]:
    """Create a function that computes the install path of a manual page.

    :param provided_section: Section requested by the rule, if any.
    :param provided_language: Language (or derive keyword) requested by the rule.
    :param definition_source: Where the rule was defined (used in diagnostics).
    :return: A callable mapping a match to
      ``usr/share/man/[<language>/]man<N>/<name>.<section>``.
    """

    def _manpage_dest_path(match_rule: PathMatch) -> str:
        # Drop the last extension of the source name.
        name_stem = _MAN_INST_BASENAME.sub("", match_rule.path.name)
        section = _determine_manpage_section(
            match_rule, provided_section, definition_source
        )
        real_section = _determine_manpage_real_section(
            match_rule, section, definition_source
        )
        assert section is not None
        language = _determine_manpage_language(match_rule, provided_language)

        language_dir = ""
        if language is not None:
            language_dir = f"{language}/"
            # The language moves into the directory name, so a `.<language>`
            # suffix on the file name is dropped.
            name_stem = name_stem.removesuffix(f".{language}")

        return f"usr/share/man/{language_dir}man{real_section}/{name_stem}.{section}"

    return _manpage_dest_path
class SourcePathMatcher:
    """Book-keeping of which source paths have been installed or discarded.

    Paths are tracked by their ``fs_path``.  A path is *reserved* when a rule
    installed it or when it has been discarded, either explicitly via
    ``exclude`` or by one of the plugin provided discard rules.
    """

    def __init__(self, auto_discard_rules: list["PluginProvidedDiscardRule"]) -> None:
        self._auto_discard_rules = auto_discard_rules
        # fs_path -> (packages it was reserved for, definition that reserved it)
        self._already_matched: dict[
            str,
            tuple[frozenset[BinaryPackage], str],
        ] = {}
        # (package name, fs_path) pairs requested via an exact match
        self._exact_match_request: set[tuple[str, str]] = set()
        # fs_path -> cached discard verdict
        self._discarded: dict[str, DiscardState] = {}
        # discard rule name -> fs_paths that rule discarded
        self.used_auto_discard_rules: dict[str, set[str]] = collections.defaultdict(set)

    def is_reserved(self, path: "VirtualPath") -> bool:
        """Return True if ``path`` is already installed or discarded."""
        fs_path = path.fs_path
        if fs_path in self._already_matched:
            return True
        state = self._discarded.get(fs_path, DiscardState.UNCHECKED)
        if state == DiscardState.UNCHECKED:
            state = self._check_plugin_provided_exclude_state_for(path)
        return state != DiscardState.NOT_DISCARDED

    def exclude(self, path: str) -> None:
        """Record ``path`` (an fs_path) as discarded by a manifest rule."""
        self._discarded[path] = DiscardState.DISCARDED_BY_MANIFEST_RULE

    def _run_plugin_provided_discard_rules_on(self, path: "VirtualPath") -> bool:
        """Return True if a plugin provided rule discards ``path`` itself.

        The first rule that fires is credited in ``used_auto_discard_rules``.
        """
        for rule in self._auto_discard_rules:
            if rule.should_discard(path):
                self.used_auto_discard_rules[rule.name].add(path.fs_path)
                return True
        return False

    def _check_plugin_provided_exclude_state_for(
        self,
        path: "VirtualPath",
    ) -> DiscardState:
        """Compute and cache the discard verdict for ``path``.

        Walks from ``path`` towards the root directory until a cached verdict
        is found, a plugin provided rule fires, or the root is reached.  The
        verdict is cached for every path visited on the way.
        """
        uncached: list[str] = []
        node = path
        while True:
            node_fs_path = node.fs_path
            known = self._discarded.get(node_fs_path, DiscardState.UNCHECKED)
            if known != DiscardState.UNCHECKED:
                verdict = known
                break
            uncached.append(node_fs_path)
            if self._run_plugin_provided_discard_rules_on(node):
                verdict = DiscardState.DISCARDED_BY_PLUGIN_PROVIDED_RULE
                break
            # A "not discarded" result is only trustworthy once the parents
            # have been checked too: a directory can be excluded without the
            # files in it triggering the rule.
            if node.is_root_dir():
                verdict = DiscardState.NOT_DISCARDED
                break
            parent = node.parent_dir
            assert parent is not None  # type hint
            node = parent
        for visited_fs_path in uncached:
            self._discarded[visited_fs_path] = verdict
        return verdict

    def may_match(
        self,
        match: PathMatch,
        *,
        is_exact_match: bool = False,
    ) -> tuple[frozenset[BinaryPackage], bool]:
        """Check whether ``match`` is still available.

        :return: ``(installed_into, is_discarded)``.  ``installed_into`` is
          the set of packages that already reserved the path (empty if none).
          For exact matches, a discard caused by a plugin provided rule is
          not reported as discarded.
        """
        prior = self._already_matched.get(match.path.fs_path)
        if prior:
            return prior[0], False
        fs_path = match.path.fs_path
        state = self._discarded.get(fs_path, DiscardState.UNCHECKED)

        if state == DiscardState.UNCHECKED:
            state = self._check_plugin_provided_exclude_state_for(match.path)

        assert state is not None and state != DiscardState.UNCHECKED

        if is_exact_match and state == DiscardState.DISCARDED_BY_PLUGIN_PROVIDED_RULE:
            return frozenset(), False
        return frozenset(), state != DiscardState.NOT_DISCARDED

    def reserve(
        self,
        path: "VirtualPath",
        reserved_by: frozenset[BinaryPackage],
        definition_source: str,
        *,
        is_exact_match: bool = False,
    ) -> None:
        """Mark ``path`` as installed into ``reserved_by``.

        For exact matches, the request is additionally recorded per package
        and any discard verdict / discard rule usage for the path is dropped.
        """
        fs_path = path.fs_path
        self._already_matched[fs_path] = (reserved_by, definition_source)
        if not is_exact_match:
            return
        for pkg in reserved_by:
            self._exact_match_request.add((pkg.name, fs_path))
        self._discarded.pop(fs_path, None)
        for discarded_paths in self.used_auto_discard_rules.values():
            discarded_paths.discard(fs_path)

    def detect_missing(self, search_dir: "VirtualPath") -> Iterator["VirtualPath"]:
        """Yield the paths below ``search_dir`` that nothing has reserved.

        Directories are only reported when they are empty; otherwise their
        content is examined instead.
        """
        pending = list(search_dir.iterdir())
        while pending:
            entry = pending.pop()
            if entry.is_dir:
                children = list(entry.iterdir())
                pending.extend(children)
                if not children and not self.is_reserved(entry):
                    # "Explicitly" empty dir
                    yield entry
            elif not self.is_reserved(entry):
                yield entry

    def find_and_reserve_all_matches(
        self,
        match_rule: MatchRule,
        search_dirs: Sequence[SearchDir],
        dir_only_match: bool,
        match_filter: Callable[["VirtualPath"], bool] | None,
        reserved_by: frozenset[BinaryPackage],
        definition_source: str,
    ) -> tuple[list[PathMatch], tuple[int, ...]]:
        """Resolve ``match_rule`` and reserve (or exclude) everything it matches.

        Glob-like rules silently skip paths that are already installed or
        discarded; rules that are an ``ExactFileSystemPath`` raise instead.
        With an empty ``reserved_by``, the matches are excluded rather than
        reserved.

        :return: The accepted matches plus the counters
          ``(already_installed, already_excluded)`` of skipped paths.
        :raises PathAlreadyInstalledOrDiscardedError: An exact path was already
          installed or excluded.
        :raises ExactPathMatchTwiceError: An exact path was already requested
          for the same package.
        """
        is_glob = not isinstance(match_rule, ExactFileSystemPath)
        accepted: list[PathMatch] = []
        skipped_installed = 0
        skipped_excluded = 0

        candidates = _resolve_path(
            match_rule,
            search_dirs,
            dir_only_match,
            match_filter,
            reserved_by,
        )
        for match in candidates:
            installed_into, excluded = self.may_match(match, is_exact_match=not is_glob)
            if installed_into:
                if not is_glob:
                    packages = ", ".join(p.name for p in installed_into)
                    raise PathAlreadyInstalledOrDiscardedError(
                        f'The "{match.path.fs_path}" has been reserved by and installed into {packages}.'
                        f" The definition that triggered this issue is {definition_source}.",
                        match,
                        installed_into,
                        definition_source,
                    )
                skipped_installed += 1
                continue
            if excluded:
                if not is_glob:
                    raise PathAlreadyInstalledOrDiscardedError(
                        f'The "{match.path.fs_path}" has been excluded. If you want this path installed, move it'
                        f" above the exclusion rule that excluded it. The definition that triggered this"
                        f" issue is {definition_source}.",
                        match,
                        installed_into,
                        definition_source,
                    )
                skipped_excluded += 1
                continue
            if not is_glob:
                for pkg in match.into:
                    request_key = (pkg.name, match.path.fs_path)
                    if request_key in self._exact_match_request:
                        raise ExactPathMatchTwiceError(
                            f'The path "{match.path.fs_path}" (via exact match) has already been installed'
                            f" into {pkg.name}. The second installation triggered by {definition_source}",
                            match.path,
                            pkg,
                            definition_source,
                        )
                    self._exact_match_request.add(request_key)

            if reserved_by:
                self._already_matched[match.path.fs_path] = (
                    match.into,
                    definition_source,
                )
            else:
                self.exclude(match.path.fs_path)
            accepted.append(match)
        return accepted, (skipped_installed, skipped_excluded)
def _resolve_path(
    match_rule: MatchRule,
    search_dirs: Iterable["SearchDir"],
    dir_only_match: bool,
    match_filter: Callable[["VirtualPath"], bool] | None,
    into: frozenset[BinaryPackage],
) -> Iterator[PathMatch]:
    """Yield the matches of ``match_rule`` in the search directories.

    With a non-empty ``into`` (install rules), a search directory is only
    consulted for packages that have not had a match in an earlier directory.
    With an empty ``into`` (discard rules), every search directory is visited.
    The root directory itself is never yielded.
    """
    packages_without_match = set(into)
    for sdir in search_dirs:
        if into and packages_without_match.isdisjoint(sdir.applies_to):
            # All the packages, where this search dir applies, already got a match
            continue
        found_any = False
        applicable = sdir.applies_to & packages_without_match
        candidates = match_rule.finditer(
            sdir.search_dir,
            ignore_paths=match_filter,
        )
        for candidate in candidates:
            if dir_only_match and not candidate.is_dir:
                continue
            if candidate.is_root_dir():
                if match_rule is MATCH_ANYTHING:
                    continue
                _error(
                    f"The pattern {match_rule.describe_match_short()} matched the root dir."
                )
            yield PathMatch(candidate, sdir.search_dir, False, applicable)
            # No early exit: everything this search directory offers is wanted.
            found_any = True

        if found_any:
            packages_without_match -= applicable
        if into and not packages_without_match:
            # For install rules, we can stop as soon as all packages had a match
            # For discard rules, all search directories must be visited. Otherwise,
            # you would have to repeat the discard rule once per search dir to be
            # sure something is fully discarded
            break
def _resolve_dest_paths(
    match: PathMatch,
    dest_paths: Sequence[tuple[str, bool]],
    install_context: "InstallRuleContext",
) -> Sequence[tuple[str, "FSPath"]]:
    """Compute the ``(destination, fs_root)`` pairs for ``match``.

    :param match: The match to install.
    :param dest_paths: ``(path, is_format)`` pairs.  Format paths may use
      ``{basename}``, ``{dirname}``, ``{package_name}`` and
      ``{doc_main_package_name}`` and are expanded once per package.
    :param install_context: Provides the file system root of each package.
    :return: One pair per destination per package in ``match.into``.
    :raises ValueError: A (resolved) destination ends with ``/``.
    """
    resolved = []
    for dest_path, dest_path_is_format in dest_paths:
        if not dest_path_is_format:
            if dest_path.endswith("/"):
                raise ValueError(
                    f'Provided destination for "{match.path.path}" ended with "/" ("{dest_path}"),'
                    " which it must not!"
                )
            resolved.extend(
                (dest_path, install_context[pkg.name].fs_root) for pkg in match.into
            )
            continue

        for pkg in match.into:
            parent_dir = match.path.parent_dir
            pkg_context = install_context[pkg.name]
            fs_root = pkg_context.fs_root
            dirname = parent_dir.path if parent_dir is not None else ""
            expanded = dest_path.format(
                basename=match.path.name,
                dirname=dirname,
                package_name=pkg.name,
                doc_main_package_name=pkg_context.doc_main_package.name,
            )
            if expanded.endswith("/"):
                raise ValueError(
                    f'Provided destination (when resolved for {pkg.name}) for "{match.path.path}" ended'
                    f' with "/" ("{dest_path}"), which it must not!'
                )
            resolved.append((expanded, fs_root))
    return resolved
def _resolve_matches(
    matches: list[PathMatch],
    dest_paths: Sequence[tuple[str, bool]] | Callable[[PathMatch], str],
    install_context: "InstallRuleContext",
) -> Iterator[tuple[PathMatch, Sequence[tuple[str, "FSPath"]]]]:
    """Pair every match with its ``(destination, fs_root)`` targets.

    ``dest_paths`` is either a list of ``(path, is_format)`` pairs (resolved
    via ``_resolve_dest_paths``) or a callable that computes the destination
    of a match.

    :raises ValueError: A destination ends with ``/``.
    """
    if not callable(dest_paths):
        for match in matches:
            yield match, _resolve_dest_paths(
                match,
                dest_paths,
                install_context,
            )
        return

    for match in matches:
        computed = dest_paths(match)
        if computed.endswith("/"):
            raise ValueError(
                f'Provided destination for "{match.path.path}" ended with "/" ("{computed}"), which it must not!'
            )
        targets = [(computed, install_context[pkg.name].fs_root) for pkg in match.into]
        yield match, targets
# Base class of the install rules.  It holds the optional condition, the match
# filter and the definition source of a rule and provides the shared
# match/install helpers.
class InstallRule(DebputyDispatchableType):
    # NOTE(review): "_already_matched" and "_exact_match_request" are declared
    # here but are not assigned by the `__init__` of this class - possibly
    # leftovers; confirm before removing.
    __slots__ = (
        "_already_matched",
        "_exact_match_request",
        "_condition",
        "_match_filter",
        "_definition_source",
    )
568 def __init__(
569 self,
570 condition: ManifestCondition | None,
571 definition_source: str,
572 *,
573 match_filter: Callable[["VirtualPath"], bool] | None = None,
574 ) -> None:
575 super().__init__()
576 self._condition = condition
577 self._definition_source = definition_source
578 self._match_filter = match_filter
580 def _check_single_match(
581 self, source: FileSystemMatchRule, matches: list[PathMatch]
582 ) -> None:
583 seen_pkgs = set[BinaryPackage]()
584 for m in matches:
585 problem_pkgs = seen_pkgs & m.into
586 if problem_pkgs: 586 ↛ 587line 586 didn't jump to line 587 because the condition on line 586 was never true
587 pkg_names = ", ".join(sorted(p.name for p in problem_pkgs))
588 _error(
589 f'The pattern "{source.raw_match_rule}" matched multiple entries for the packages: {pkg_names}.'
590 " However, it should match exactly one item. Please tighten the pattern defined"
591 f" in {self._definition_source}."
592 )
593 seen_pkgs.update(m.into)
595 def _match_pattern(
596 self,
597 path_matcher: SourcePathMatcher,
598 fs_match_rule: FileSystemMatchRule,
599 condition_context: ConditionContext,
600 search_dirs: Sequence[SearchDir],
601 into: frozenset[BinaryPackage],
602 ) -> list[PathMatch]:
603 (matched, exclude_counts) = path_matcher.find_and_reserve_all_matches(
604 fs_match_rule.match_rule,
605 search_dirs,
606 fs_match_rule.raw_match_rule.endswith("/"),
607 self._match_filter,
608 into,
609 self._definition_source,
610 )
612 already_installed_paths, already_excluded_paths = exclude_counts
614 if into: 614 ↛ 619line 614 didn't jump to line 619 because the condition on line 614 was always true
615 allow_empty_match = all(not p.should_be_acted_on for p in into)
616 else:
617 # discard rules must match provided at least one search dir exist. If none of them
618 # exist, then we assume the discard rule is for a package that will not be built
619 allow_empty_match = any(s.search_dir.is_dir for s in search_dirs)
620 if self._condition is not None and not self._condition.evaluate( 620 ↛ 623line 620 didn't jump to line 623 because the condition on line 620 was never true
621 condition_context
622 ):
623 allow_empty_match = True
625 if not matched and not allow_empty_match:
626 search_dir_text = ", ".join(x.search_dir.fs_path for x in search_dirs)
627 if already_excluded_paths and already_installed_paths: 627 ↛ 628line 627 didn't jump to line 628 because the condition on line 627 was never true
628 total_paths = already_excluded_paths + already_installed_paths
629 msg = (
630 f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} after ignoring"
631 f" {total_paths} path(s) already been matched previously either by install or"
632 f" exclude rules. If you wanted to install some of these paths into multiple"
633 f" packages, please tweak the definition that installed them to install them"
634 f' into multiple packages (usually change "into: foo" to "into: [foo, bar]".'
635 f" If you wanted to install these paths and exclude rules are getting in your"
636 f" way, then please move this install rule before the exclusion rule that causes"
637 f" issue or, in case of built-in excludes, list the paths explicitly (without"
638 f" using patterns). Source for this issue is {self._definition_source}. Match rule:"
639 f" {fs_match_rule.match_rule.describe_match_exact()}"
640 )
641 elif already_excluded_paths: 641 ↛ 642line 641 didn't jump to line 642 because the condition on line 641 was never true
642 msg = (
643 f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} after ignoring"
644 f" {already_excluded_paths} path(s) that have been excluded."
645 " If you wanted to install some of these paths, please move the install rule"
646 " before the exclusion rule or, in case of built-in excludes, list the paths explicitly"
647 f" (without using patterns). Source for this issue is {self._definition_source}. Match rule:"
648 f" {fs_match_rule.match_rule.describe_match_exact()}"
649 )
650 elif already_installed_paths: 650 ↛ 651line 650 didn't jump to line 651 because the condition on line 650 was never true
651 msg = (
652 f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} after ignoring"
653 f" {already_installed_paths} path(s) already been matched previously."
654 " If you wanted to install some of these paths into multiple packages,"
655 f" please tweak the definition that installed them to install them into"
656 f' multiple packages (usually change "into: foo" to "into: [foo, bar]".'
657 f" Source for this issue is {self._definition_source}. Match rule:"
658 f" {fs_match_rule.match_rule.describe_match_exact()}"
659 )
660 else:
661 # TODO: Try harder to find the match and point out possible typos
662 msg = (
663 f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} (definition:"
664 f" {self._definition_source}). Match rule: {fs_match_rule.match_rule.describe_match_exact()}"
665 )
666 raise NoMatchForInstallPatternError(
667 msg,
668 fs_match_rule,
669 search_dirs,
670 self._definition_source,
671 )
672 return matched
    def _install_matches(
        self,
        path_matcher: SourcePathMatcher,
        matches: list[PathMatch],
        dest_paths: Sequence[tuple[str, bool]] | Callable[[PathMatch], str],
        install_context: "InstallRuleContext",
        into: frozenset[BinaryPackage],
        condition_context: ConditionContext,
    ) -> None:
        """Install ``matches`` at their destinations in the package roots.

        When the rule is disabled (its condition evaluates to false, or no
        package in ``into`` is acted on), nothing is installed; the matches are
        only reserved recursively.

        :param path_matcher: Book-keeping of reserved source paths.
        :param matches: The matches to install; must be non-empty for an
          enabled rule.
        :param dest_paths: ``(path, is_format)`` pairs or a callable computing
          the destination of a match.
        :param install_context: Provides the file system root per package.
        :param into: The packages the rule installs into.
        :param condition_context: Context the rule condition is evaluated in.
        :raises ValueError: ``matches`` is empty for an enabled rule.
        """
        if (
            self._condition is not None
            and not self._condition.evaluate(condition_context)
        ) or not any(p.should_be_acted_on for p in into):
            # Rule is disabled; skip all its actions - also allow empty matches
            # for this particular case.  Though, we do go through the match and
            # ensure all paths are reserved correctly such that the behavior
            # between `binary-arch` vs `binary-indep` vs `binary` is consistent
            for match in matches:
                self._reserve_recursively(
                    path_matcher,
                    match,
                    into,
                )

            return

        if not matches:
            raise ValueError("matches must not be empty")

        for match, dest_paths_and_roots in _resolve_matches(
            matches,
            dest_paths,
            install_context,
        ):
            # Destination directories (one per target) that the content of a
            # matched directory is copied into afterwards.
            install_recursively_into_dirs = []
            for dest, fs_root in dest_paths_and_roots:
                dir_part, basename = os.path.split(dest)
                # We do not associate these with the FS path.  First off,
                # it is complicated to do in most cases (indeed, debhelper
                # does not preserve these directories either) and secondly,
                # it is "only" mtime and mode - mostly irrelevant as the
                # directory is 99.9% likely to be 0755 (we are talking
                # directories like "/usr", "/usr/share").
                dir_path = fs_root.mkdirs(dir_part)
                existing_path = dir_path.get(basename)

                if match.path.is_dir:
                    # An existing non-directory at the destination is replaced
                    # by the directory.
                    if existing_path is not None and not existing_path.is_dir:
                        existing_path.unlink()
                        existing_path = None
                    current_dir = existing_path

                    if current_dir is None:
                        current_dir = dir_path.mkdir(
                            basename, reference_path=match.path
                        )
                    install_recursively_into_dirs.append(current_dir)
                else:
                    # A file or symlink must not take the place of an existing
                    # directory.
                    if existing_path is not None and existing_path.is_dir:
                        _error(
                            f"Cannot install {match.path} ({match.path.fs_path}) as {dest}. That path already exist"
                            f" and is a directory.  This error was triggered via {self._definition_source}."
                        )

                    if match.path.is_symlink:
                        dir_path.add_symlink(
                            basename, match.path.readlink(), reference_path=match.path
                        )
                    else:
                        dir_path.insert_file_from_fs_path(
                            basename,
                            match.path.fs_path,
                            follow_symlinks=False,
                            use_fs_path_mode=True,
                            reference_path=match.path,
                        )
            if install_recursively_into_dirs:
                self._install_dir_recursively(
                    path_matcher,
                    install_recursively_into_dirs,
                    match,
                    into,
                )
758 def _reserve_recursively(
759 self,
760 path_matcher: SourcePathMatcher,
761 match: PathMatch,
762 into: frozenset[BinaryPackage],
763 ) -> None:
764 direct_matched_path = match.path
765 path_matcher.reserve(
766 direct_matched_path,
767 into,
768 self._definition_source,
769 is_exact_match=match.is_exact_match,
770 )
771 if not direct_matched_path.is_dir: 771 ↛ 772line 771 didn't jump to line 772 because the condition on line 771 was never true
772 return
773 stack = list(direct_matched_path.iterdir())
774 while stack:
775 current_path = stack.pop()
776 path_matcher.reserve(
777 current_path,
778 into,
779 self._definition_source,
780 is_exact_match=False,
781 )
782 if current_path.is_dir:
783 stack.extend(current_path.iterdir())
785 def _install_dir_recursively(
786 self,
787 path_matcher: SourcePathMatcher,
788 parent_dirs: Sequence[FSPath],
789 match: PathMatch,
790 into: frozenset[BinaryPackage],
791 ) -> None:
792 stack = [
793 (parent_dirs, e)
794 for e in match.path.iterdir()
795 if not path_matcher.is_reserved(e)
796 ]
798 while stack:
799 current_dirs, dir_entry = stack.pop()
800 path_matcher.reserve(
801 dir_entry,
802 into,
803 self._definition_source,
804 is_exact_match=False,
805 )
806 if dir_entry.is_dir: 806 ↛ 807line 806 didn't jump to line 807 because the condition on line 806 was never true
807 new_dirs = [
808 d.mkdir(dir_entry.name, reference_path=dir_entry)
809 for d in current_dirs
810 ]
811 stack.extend(
812 (new_dirs, de)
813 for de in dir_entry.iterdir()
814 if not path_matcher.is_reserved(de)
815 )
816 elif dir_entry.is_symlink:
817 for current_dir in current_dirs:
818 current_dir.add_symlink(
819 dir_entry.name,
820 dir_entry.readlink(),
821 reference_path=dir_entry,
822 )
823 elif dir_entry.is_file: 823 ↛ 833line 823 didn't jump to line 833 because the condition on line 823 was always true
824 for current_dir in current_dirs:
825 current_dir.insert_file_from_fs_path(
826 dir_entry.name,
827 dir_entry.fs_path,
828 use_fs_path_mode=True,
829 follow_symlinks=False,
830 reference_path=dir_entry,
831 )
832 else:
833 _error(
834 f"Unsupported file type: {dir_entry.fs_path} - neither a file, directory or symlink"
835 )
837 def perform_install(
838 self,
839 path_matcher: SourcePathMatcher,
840 install_context: InstallRuleContext,
841 condition_context: ConditionContext,
842 ) -> None:
843 raise NotImplementedError
845 @classmethod
846 def install_as(
847 cls,
848 source: FileSystemMatchRule,
849 dest_path: str,
850 into: frozenset[BinaryPackage],
851 definition_source: str,
852 condition: ManifestCondition | None,
853 ) -> "InstallRule":
854 return GenericInstallationRule(
855 [source],
856 [(dest_path, False)],
857 into,
858 condition,
859 definition_source,
860 require_single_match=True,
861 )
863 @classmethod
864 def install_dest(
865 cls,
866 sources: Sequence[FileSystemMatchRule],
867 dest_dir: str | None,
868 into: frozenset[BinaryPackage],
869 definition_source: str,
870 condition: ManifestCondition | None,
871 ) -> "InstallRule":
872 if dest_dir is None:
873 dest_dir = "{dirname}/{basename}"
874 else:
875 dest_dir = os.path.join(dest_dir, "{basename}")
876 return GenericInstallationRule(
877 sources,
878 [(dest_dir, True)],
879 into,
880 condition,
881 definition_source,
882 )
884 @classmethod
885 def install_multi_as(
886 cls,
887 source: FileSystemMatchRule,
888 dest_paths: Sequence[str],
889 into: frozenset[BinaryPackage],
890 definition_source: str,
891 condition: ManifestCondition | None,
892 ) -> "InstallRule":
893 if len(dest_paths) < 2: 893 ↛ 894line 893 didn't jump to line 894 because the condition on line 893 was never true
894 raise ValueError(
895 "Please use `install_as` when there is less than 2 dest path"
896 )
897 dps = tuple((dp, False) for dp in dest_paths)
898 return GenericInstallationRule(
899 [source],
900 dps,
901 into,
902 condition,
903 definition_source,
904 require_single_match=True,
905 )
907 @classmethod
908 def install_multi_dest(
909 cls,
910 sources: Sequence[FileSystemMatchRule],
911 dest_dirs: Sequence[str],
912 into: frozenset[BinaryPackage],
913 definition_source: str,
914 condition: ManifestCondition | None,
915 ) -> "InstallRule":
916 if len(dest_dirs) < 2: 916 ↛ 917line 916 didn't jump to line 917 because the condition on line 916 was never true
917 raise ValueError(
918 "Please use `install_dest` when there is less than 2 dest dir"
919 )
920 dest_paths = tuple((os.path.join(dp, "{basename}"), True) for dp in dest_dirs)
921 return GenericInstallationRule(
922 sources,
923 dest_paths,
924 into,
925 condition,
926 definition_source,
927 )
@classmethod
def install_doc(
    cls,
    sources: Sequence[FileSystemMatchRule],
    dest_dir: str | None,
    into: frozenset[BinaryPackage],
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Build a documentation install rule.

    The rule's condition is always `_BUILD_DOCS_BDO`; when `condition` is
    given, both are combined via `ManifestCondition.all_of`.

    :param sources: Match rules selecting the paths to install.
    :param dest_dir: Destination passed on as-is (format flag False). When
      `None`, the template
      `usr/share/doc/{doc_main_package_name}/{basename}` is used with the
      format flag set to True.
    :param into: The binary packages the rule installs into.
    :param definition_source: Where the rule was defined.
    :param condition: Optional extra condition.
    :return: A `GenericInstallationRule` with a single destination.
    """
    if condition is None:
        cond: ManifestCondition = _BUILD_DOCS_BDO
    else:
        cond = ManifestCondition.all_of([_BUILD_DOCS_BDO, condition])

    if dest_dir is None:
        destination = ("usr/share/doc/{doc_main_package_name}/{basename}", True)
    else:
        # NOTE(review): an explicit dest_dir is forwarded without a
        # `{basename}` suffix and without the format flag - confirm that
        # callers rely on this.
        destination = (dest_dir, False)

    return GenericInstallationRule(
        sources,
        [destination],
        into,
        cond,
        definition_source,
    )
@classmethod
def install_doc_as(
    cls,
    source: FileSystemMatchRule,
    dest_path: str,
    into: frozenset[BinaryPackage],
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Build a documentation rule installing one source at an exact path.

    The rule's condition is always `_BUILD_DOCS_BDO`; when `condition` is
    given, both are combined via `ManifestCondition.all_of`.

    :param source: Match rule selecting the single path to install.
    :param dest_path: Destination path, passed on with the format flag set
      to False.
    :param into: The binary packages the rule installs into.
    :param definition_source: Where the rule was defined.
    :param condition: Optional extra condition.
    :return: A `GenericInstallationRule` created with
      `require_single_match=True`.
    """
    if condition is None:
        cond: ManifestCondition = _BUILD_DOCS_BDO
    else:
        cond = ManifestCondition.all_of([_BUILD_DOCS_BDO, condition])

    destination = (dest_path, False)
    return GenericInstallationRule(
        [source],
        [destination],
        into,
        cond,
        definition_source,
        require_single_match=True,
    )
@classmethod
def install_examples(
    cls,
    sources: Sequence[FileSystemMatchRule],
    into: frozenset[BinaryPackage],
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Build a rule that installs matches as documentation examples.

    The destination is the fixed template
    `usr/share/doc/{doc_main_package_name}/examples/{basename}`. The rule's
    condition is always `_BUILD_DOCS_BDO`; when `condition` is given, both
    are combined via `ManifestCondition.all_of`.

    :param sources: Match rules selecting the paths to install.
    :param into: The binary packages the rule installs into.
    :param definition_source: Where the rule was defined.
    :param condition: Optional extra condition.
    :return: A `GenericInstallationRule` with the examples template.
    """
    if condition is None:
        cond: ManifestCondition = _BUILD_DOCS_BDO
    else:
        cond = ManifestCondition.all_of([_BUILD_DOCS_BDO, condition])

    examples_template = "usr/share/doc/{doc_main_package_name}/examples/{basename}"
    return GenericInstallationRule(
        sources,
        [(examples_template, True)],
        into,
        cond,
        definition_source,
    )
@classmethod
def install_man(
    cls,
    sources: Sequence[FileSystemMatchRule],
    into: frozenset[BinaryPackage],
    section: int | None,
    language: str | None,
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Build a rule that installs manual pages.

    The rule's condition is always `_BUILD_DOCS_BDO`; when `condition` is
    given, both are combined via `ManifestCondition.all_of`. The destination
    of each match is computed by the object returned from
    `_dest_path_for_manpage(section, language, definition_source)`.

    :param sources: Match rules selecting the manual pages to install.
    :param into: The binary packages the rule installs into.
    :param section: Manual section, or `None`; forwarded to
      `_dest_path_for_manpage`.
    :param language: Language, or `None`; forwarded to
      `_dest_path_for_manpage`.
    :param definition_source: Where the rule was defined.
    :param condition: Optional extra condition.
    :return: A `GenericInstallationRule` that skips directory matches.
    """
    cond: ManifestCondition = _BUILD_DOCS_BDO
    if condition is not None:
        cond = ManifestCondition.all_of([cond, condition])

    dest_path_computer = _dest_path_for_manpage(
        section, language, definition_source
    )

    return GenericInstallationRule(
        sources,
        dest_path_computer,
        into,
        cond,
        definition_source,
        # Manual pages are files (or symlinks to them), so only directories
        # are excluded. The previous filter, `not m.is_file`, kept
        # everything except regular files and so dropped the pages
        # themselves.
        # NOTE(review): assumes the filter keeps matches for which it
        # returns True - confirm against `InstallRule`.
        match_filter=lambda m: not m.is_dir,
    )
@classmethod
def discard_paths(
    cls,
    paths: Sequence[FileSystemMatchRule],
    definition_source: str,
    condition: ManifestCondition | None,
    *,
    limit_to: Sequence[FileSystemExactMatchRule] | None = None,
) -> "InstallRule":
    """Build a `DiscardRule` for the given paths.

    :param paths: Match rules selecting the paths to discard.
    :param definition_source: Where the rule was defined.
    :param condition: Optional condition attached to the rule.
    :param limit_to: Optional exact match rules restricting the rule; it is
      converted to a tuple, and `None` becomes the empty tuple.
    :return: The created `DiscardRule`.
    """
    if limit_to is None:
        restricted_to: tuple[FileSystemExactMatchRule, ...] = ()
    else:
        restricted_to = tuple(limit_to)
    return DiscardRule(paths, condition, restricted_to, definition_source)
class PPFInstallRule(InstallRule):
    """Install rule that places packager provided files into one package."""

    __slots__ = (
        "_ppfs",
        "_substitution",
        "_into",
    )

    # noinspection PyMissingConstructor
    def __init__(
        self,
        into: BinaryPackage,
        substitution: Substitution,
        ppfs: Sequence["PackagerProvidedFile"],
    ) -> None:
        """Create the rule.

        :param into: The binary package that receives the files.
        :param substitution: Stored on the instance as `_substitution`.
        :param ppfs: The packager provided files to install.
        """
        # The base initialiser is not called directly; it is handed to
        # `run_in_context_of_plugin` together with the "debputy" plugin name,
        # a `None` condition and a fixed definition source.
        base_initialiser = super().__init__
        run_in_context_of_plugin(
            "debputy",
            base_initialiser,
            None,
            "<built-in; PPF install rule>",
        )
        self._into = into
        self._ppfs = ppfs
        self._substitution = substitution

    def perform_install(
        self,
        path_matcher: SourcePathMatcher,
        install_context: InstallRuleContext,
        condition_context: ConditionContext,
    ) -> None:
        """Insert every packager provided file below the package's fs root.

        `path_matcher` and `condition_context` are not used by this rule.
        """
        fs_root = install_context[self._into.name].fs_root
        for provided_file in self._ppfs:
            source_path = provided_file.path.fs_path
            target_dir, target_name = provided_file.compute_dest()
            parent = fs_root.mkdirs(target_dir)
            # The mode comes from the file's definition rather than from the
            # path on disk (`use_fs_path_mode=False`).
            parent.insert_file_from_fs_path(
                target_name,
                source_path,
                follow_symlinks=True,
                use_fs_path_mode=False,
                mode=provided_file.definition.default_mode,
            )
class GenericInstallationRule(InstallRule):
    """Install rule that maps matched sources onto destination paths."""

    __slots__ = (
        "_sources",
        "_into",
        "_dest_paths",
        "_require_single_match",
    )

    def __init__(
        self,
        sources: Sequence[FileSystemMatchRule],
        dest_paths: Sequence[tuple[str, bool]] | Callable[[PathMatch], str],
        into: frozenset[BinaryPackage],
        condition: ManifestCondition | None,
        definition_source: str,
        *,
        require_single_match: bool = False,
        match_filter: Callable[["VirtualPath"], bool] | None = None,
    ) -> None:
        """Create the rule.

        :param sources: Match rules selecting the paths to install.
        :param dest_paths: Either a sequence of `(path, flag)` pairs or a
          callable computing the destination from a match.
        :param into: The binary packages the rule installs into.
        :param condition: Optional condition, forwarded to the base class.
        :param definition_source: Where the rule was defined, forwarded to
          the base class.
        :param require_single_match: When true, `sources` must contain
          exactly one entry and matches are checked by `_check_single_match`
          during installation.
        :param match_filter: Optional filter, forwarded to the base class.
        :raises ValueError: If `require_single_match` is set and `sources`
          does not have length 1.
        """
        super().__init__(
            condition,
            definition_source,
            match_filter=match_filter,
        )
        self._sources = sources
        self._into = into
        self._dest_paths = dest_paths
        self._require_single_match = require_single_match
        if require_single_match and len(sources) != 1:
            raise ValueError("require_single_match implies sources must have len 1")

    def perform_install(
        self,
        path_matcher: SourcePathMatcher,
        install_context: InstallRuleContext,
        condition_context: ConditionContext,
    ) -> None:
        """Match each source in turn and install whatever it matched."""
        for source_rule in self._sources:
            found = self._match_pattern(
                path_matcher,
                source_rule,
                condition_context,
                install_context.search_dirs,
                self._into,
            )
            # The single-match check is only consulted when more than one
            # path matched.
            if self._require_single_match:
                if len(found) > 1:
                    self._check_single_match(source_rule, found)
            self._install_matches(
                path_matcher,
                found,
                self._dest_paths,
                install_context,
                self._into,
                condition_context,
            )
class DiscardRule(InstallRule):
    """Rule that matches paths without installing them into any package."""

    __slots__ = ("_fs_match_rules", "_limit_to")

    def __init__(
        self,
        fs_match_rules: Sequence[FileSystemMatchRule],
        condition: ManifestCondition | None,
        limit_to: Sequence[FileSystemExactMatchRule],
        definition_source: str,
    ) -> None:
        """Create the rule.

        :param fs_match_rules: Match rules selecting the paths to discard.
        :param condition: Optional condition, forwarded to the base class.
        :param limit_to: Search directories the rule is restricted to; an
          empty sequence means all search directories are used.
        :param definition_source: Where the rule was defined.
        """
        super().__init__(condition, definition_source)
        self._limit_to = limit_to
        self._fs_match_rules = fs_match_rules

    def perform_install(
        self,
        path_matcher: SourcePathMatcher,
        install_context: InstallRuleContext,
        condition_context: ConditionContext,
    ) -> None:
        """Run every match rule against the applicable search directories.

        When `limit_to` is non-empty, only search directories whose
        `fs_path` equals one of the limited paths are used, and `_error` is
        called if the number of directories found differs from the number
        of `limit_to` entries.
        """
        search_dirs: Sequence[SearchDir] = install_context.search_dirs
        restriction = self._limit_to
        if restriction:
            wanted_paths = {rule.match_rule.path for rule in restriction}
            search_dirs = tuple(
                candidate
                for candidate in search_dirs
                if candidate.search_dir.fs_path in wanted_paths
            )
            if len(restriction) != len(search_dirs):
                unknown = wanted_paths.difference(
                    known.search_dir.fs_path for known in search_dirs
                )
                paths = ":".join(unknown)
                _error(
                    f"The discard rule defined at {self._definition_source} mentions the following"
                    f" search directories that were not known to debputy: {paths}."
                    " Either the search dir is missing somewhere else or it should be removed from"
                    " the discard rule."
                )

        # The matches are not installed anywhere: `into` is empty and the
        # return value is ignored.
        for rule in self._fs_match_rules:
            self._match_pattern(
                path_matcher,
                rule,
                condition_context,
                search_dirs,
                into=frozenset(),
            )