Coverage for src/debputy/installations.py: 69%
514 statements
« prev ^ index » next coverage.py v7.8.2, created at 2026-04-14 21:38 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2026-04-14 21:38 +0000
1import collections
2import dataclasses
3import os.path
4import re
5from enum import IntEnum
6from typing import TYPE_CHECKING, cast, Any
7from collections.abc import Callable, Iterator, Sequence, Iterable, Mapping
9from debputy.exceptions import DebputyRuntimeError
10from debputy.filesystem_scan import InMemoryVirtualPathBase
11from debputy.manifest_conditions import (
12 ConditionContext,
13 ManifestCondition,
14 _BUILD_DOCS_BDO,
15)
16from debputy.manifest_parser.base_types import (
17 FileSystemMatchRule,
18 FileSystemExactMatchRule,
19)
20from debputy.manifest_parser.tagging_types import DebputyDispatchableType
21from debputy.packages import BinaryPackage
22from debputy.path_matcher import MatchRule, ExactFileSystemPath, MATCH_ANYTHING
23from debputy.plugin.plugin_state import run_in_context_of_plugin
24from debputy.substitution import Substitution
25from debputy.util import _error, _warn
27if TYPE_CHECKING:
28 from debputy.packager_provided_files import PackagerProvidedFile
29 from debputy.plugin.api import VirtualPath
30 from debputy.plugin.api.impl_types import PluginProvidedDiscardRule
# Matches a line starting with `.TH`; group 1 is the third field (digits followed by
# any non-quote, non-space characters), with optional surrounding double quotes.
_MAN_TH_LINE = re.compile(r'^[.]TH\s+\S+\s+"?(\d+[^"\s]*)"?')
# Matches a line starting with `.Dt`; group 1 is the third field (digits followed by
# any non-space characters).
_MAN_DT_LINE = re.compile(r"^[.]Dt\s+\S+\s+(\d+\S*)")
# Finds a trailing `.<section>` extension (first character 1-9), optionally followed
# by `.gz`; group 1 is the section.
_MAN_SECTION_BASENAME = re.compile(r"[.]([1-9]\w*)(?:[.]gz)?$")
# Captures the leading run of digits of a section string (e.g. the `3` of `3pm`).
_MAN_REAL_SECTION = re.compile(r"^(\d+)")
# Matches the final `.<extension>` of a name; used with `sub("", ...)` to strip it.
_MAN_INST_BASENAME = re.compile(r"[.][^.]+$")
# Finds a `man/<lang>[.<suffix>]/man<1-9>/` path fragment; group 1 is the language
# code (`xx` or `xx_YY`) when one is present.
MAN_GUESS_LANG_FROM_PATH = re.compile(
    r"(?:^|/)man/(?:([a-z][a-z](?:_[A-Z][A-Z])?)(?:\.[^/]+)?)?/man[1-9]/"
)
# Finds `.<lang>.` followed by a digit 1-9 or `man`; group 1 is the language code
# (`xx` or `xx_YY`).
MAN_GUESS_FROM_BASENAME = re.compile(r"[.]([a-z][a-z](?:_[A-Z][A-Z])?)[.](?:[1-9]|man)")
class InstallRuleError(DebputyRuntimeError):
    """Common base class for the install rule errors defined in this module."""
class PathAlreadyInstalledOrDiscardedError(InstallRuleError):
    """An exact path match hit a path that was already installed or discarded.

    The raise sites in this module construct the error as
    ``(message, match, into, definition_source)``. ``args[0]`` is therefore the
    human-readable message and the structured details start at ``args[1]``,
    which is the same layout the sibling error classes use.
    """

    @property
    def path(self) -> str:
        """The file system path of the offending match."""
        subject = self.args[1]
        # The raise sites in this module pass the whole PathMatch; unwrap it to
        # the same `fs_path` string that the error message quotes.
        if isinstance(subject, PathMatch):
            return subject.path.fs_path
        return cast("str", subject)

    @property
    def into(self) -> frozenset[BinaryPackage]:
        """The packages the path had already been installed into (may be empty)."""
        return cast(frozenset[BinaryPackage], self.args[2])

    @property
    def definition_source(self) -> str:
        """The definition source of the rule that triggered the error."""
        return cast("str", self.args[3])
class ExactPathMatchTwiceError(InstallRuleError):
    """The same path was requested via an exact match twice for one package.

    Positional layout of ``args``: ``(message, path, into, definition_source)``.
    """

    @property
    def path(self) -> str:
        """The path argument the error was raised with (``args[1]``)."""
        raw_path = self.args[1]
        return cast("str", raw_path)

    @property
    def into(self) -> BinaryPackage:
        """The package the path had already been installed into (``args[2]``)."""
        package = self.args[2]
        return cast("BinaryPackage", package)

    @property
    def definition_source(self) -> str:
        """The definition source that triggered the second installation (``args[3]``)."""
        source = self.args[3]
        return cast("str", source)
class NoMatchForInstallPatternError(InstallRuleError):
    """A pattern matched nothing in any of the search directories.

    Positional layout of ``args``: ``(message, pattern, search_dirs, definition_source)``.
    """

    @property
    def pattern(self) -> str:
        """The pattern argument the error was raised with (``args[1]``)."""
        raw_pattern = self.args[1]
        return cast("str", raw_pattern)

    @property
    def search_dirs(self) -> Sequence["SearchDir"]:
        """The search directories that were consulted (``args[2]``)."""
        consulted = self.args[2]
        return cast("Sequence[SearchDir]", consulted)

    @property
    def definition_source(self) -> str:
        """The definition source of the rule that had no match (``args[3]``)."""
        source = self.args[3]
        return cast("str", source)
@dataclasses.dataclass(slots=True, frozen=True)
class SearchDir:
    """A directory that match rules are evaluated against, plus the packages it applies to."""

    # Directory handed to `MatchRule.finditer` when resolving a pattern.
    search_dir: "VirtualPath"
    # Packages this directory is searched for; `_resolve_path` intersects it with
    # the packages that still lack a match.
    applies_to: frozenset[BinaryPackage]
@dataclasses.dataclass(slots=True, frozen=True)
class BinaryPackageInstallRuleContext:
    """Per-package state used while resolving and performing installs."""

    binary_package: BinaryPackage
    # Root of the in-memory file system that destination paths are created in.
    fs_root: InMemoryVirtualPathBase
    # Package whose name is substituted for `{doc_main_package_name}` in
    # destination templates.
    doc_main_package: BinaryPackage

    def replace(self, **changes: Any) -> "BinaryPackageInstallRuleContext":
        """Return a copy of this context with the given fields replaced."""
        return dataclasses.replace(self, **changes)
@dataclasses.dataclass(slots=True, frozen=True)
class InstallSearchDirContext:
    """Bundle of search directory information (not consumed in this part of the file)."""

    search_dirs: Sequence[SearchDir]
    # NOTE(review): presumably the directories later scanned for paths that no
    # rule installed - confirm against the caller.
    check_for_uninstalled_dirs: Sequence["VirtualPath"]
    # TODO: Support search dirs per-package
    # NOTE(review): presumably keyed by package name - confirm against the caller.
    debian_pkg_dirs: Mapping[str, "VirtualPath"] = dataclasses.field(
        default_factory=dict
    )
@dataclasses.dataclass(slots=True)
class InstallRuleContext:
    """Search directories plus the per-package contexts, indexable by package name."""

    # TODO: Search dirs should be per-package
    search_dirs: Sequence[SearchDir]
    # Per-package contexts; looked up via `install_context[pkg.name]`.
    binary_package_contexts: dict[str, BinaryPackageInstallRuleContext] = (
        dataclasses.field(default_factory=dict)
    )

    def __getitem__(self, item: str) -> BinaryPackageInstallRuleContext:
        """Return the context stored under `item`; raises `KeyError` if absent."""
        return self.binary_package_contexts[item]

    def __setitem__(self, key: str, value: BinaryPackageInstallRuleContext) -> None:
        """Store `value` as the context for `key`."""
        self.binary_package_contexts[key] = value

    def replace(self, **changes: Any) -> "InstallRuleContext":
        """Return a copy of this context with the given fields replaced."""
        return dataclasses.replace(self, **changes)
@dataclasses.dataclass(slots=True, frozen=True)
class PathMatch:
    """A single path found by a match rule inside a search directory."""

    # The matched path.
    path: "VirtualPath"
    # The search directory the path was found in.
    search_dir: "VirtualPath"
    # Forwarded to `SourcePathMatcher.reserve` by `InstallRule._reserve_recursively`.
    # Note that `_resolve_path` always constructs matches with `False` here.
    is_exact_match: bool
    # Packages the match applies to (empty for matches produced with an empty `into`).
    into: frozenset[BinaryPackage]
class DiscardState(IntEnum):
    """Cached discard verdict for a path, as tracked by `SourcePathMatcher`."""

    # No verdict cached yet; the plugin provided discard rules must be consulted.
    UNCHECKED = 0
    # Checked, and neither the path nor any of its parents was discarded.
    NOT_DISCARDED = 1
    # A plugin provided discard rule matched the path or one of its parents.
    DISCARDED_BY_PLUGIN_PROVIDED_RULE = 2
    # Set by `SourcePathMatcher.exclude`.
    DISCARDED_BY_MANIFEST_RULE = 3
def _determine_manpage_section(
    match_rule: PathMatch,
    provided_section: int | None,
    definition_source: str,
) -> str | None:
    """Determine the section string (e.g. ``"3"`` or ``"3pm"``) for a man page.

    An explicitly provided section always wins. Otherwise the first `.TH` / `.Dt`
    line of the file that matches is used, falling back to the extension of the
    basename when the content gives no usable answer.

    :param match_rule: The match describing the man page.
    :param provided_section: Section given explicitly by the rule, if any.
    :param definition_source: Where the rule was defined (used in the warning).
    :return: The detected section, or None when it could not be determined.
    """
    if provided_section is not None:
        return str(provided_section)

    detected_section = None
    # Only the ASCII `.TH` / `.Dt` line is of interest, so undecodable bytes are
    # replaced rather than raising UnicodeDecodeError. That would otherwise happen
    # for pages in a legacy encoding or for compressed pages, and it would prevent
    # the basename fallback below from ever running.
    with open(match_rule.path.fs_path, encoding="utf-8", errors="replace") as fd:
        for line in fd:
            if not line.startswith((".TH", ".Dt")):
                continue

            m = _MAN_DT_LINE.match(line) or _MAN_TH_LINE.match(line)
            if not m:
                continue
            detected_section = m.group(1)
            if "." in detected_section:
                _warn(
                    f"Ignoring detected section {detected_section} in {match_rule.path.fs_path}"
                    f" (detected via {definition_source}): It looks too much like a version"
                )
                detected_section = None
            # Only the first matching title line is considered.
            break
    if detected_section is None:
        m = _MAN_SECTION_BASENAME.search(os.path.basename(match_rule.path.path))
        if m:
            detected_section = m.group(1)

    return detected_section
def _determine_manpage_real_section(
    match_rule: PathMatch,
    section: str | None,
    definition_source: str,
) -> int:
    """Reduce a section string (e.g. ``"3pm"``) to its numeric section (``3``).

    :param match_rule: The match describing the man page (used in messages).
    :param section: The section string, or None when it could not be detected.
    :param definition_source: Where the rule was defined (used in the error).
    :return: The numeric section, which is between 1 and 9 inclusive.

    Reports via `_error` when the section is missing or outside of 1..9.
    """
    real_section = None
    if section is not None:
        m = _MAN_REAL_SECTION.match(section)
        if m:
            real_section = int(m.group(1))
    # The lower bound is 1, not 0, so the check agrees with the message below.
    if real_section is None or not 1 <= real_section <= 9:
        if real_section is not None:
            _warn(
                f"Computed section for {match_rule.path.fs_path} was {real_section} (section: {section}),"
                f" which is not a valid section (must be between 1 and 9 incl.)"
            )
        _error(
            f"Could not determine the section for {match_rule.path.fs_path} automatically. The man page"
            f" was detected via {definition_source}. Consider using `section: <number>` to"
            " explicitly declare the section. Keep in mind that it applies to all man pages for that"
            " rule and you may have to split the rule into two for this reason."
        )
    return real_section
def _determine_manpage_language(
    match_rule: PathMatch,
    provided_language: str | None,
) -> str | None:
    """Work out the language directory component for a man page, if any.

    :param match_rule: The match describing the man page.
    :param provided_language: Either an explicit language, one of the keywords
      ``derive-from-basename`` / ``derive-from-path``, or None (treated like
      ``derive-from-path``). An explicit ``C`` means "no language".
    :return: The language code, or None when there is none.
    """
    if provided_language == "derive-from-basename":
        basename_match = MAN_GUESS_FROM_BASENAME.search(match_rule.path.name)
        return None if basename_match is None else basename_match.group(1)

    if provided_language is None or provided_language == "derive-from-path":
        path_match = MAN_GUESS_LANG_FROM_PATH.search(match_rule.path.path)
        return None if path_match is None else path_match.group(1)

    # Explicit language: "C" maps to "no language directory".
    return None if provided_language == "C" else provided_language
def _dest_path_for_manpage(
    provided_section: int | None,
    provided_language: str | None,
    definition_source: str,
) -> Callable[["PathMatch"], str]:
    """Build a resolver that maps a matched man page to its destination path.

    :param provided_section: Explicit section for all pages of the rule, if any.
    :param provided_language: Explicit language or derive keyword, if any.
    :param definition_source: Where the rule was defined (used in diagnostics).
    :return: A callable computing ``usr/share/man/[<lang>/]man<N>/<name>.<section>``.
    """

    def _manpage_dest_path(match_rule: PathMatch) -> str:
        # Strip the final extension; the section is re-appended below.
        stem = _MAN_INST_BASENAME.sub("", match_rule.path.name)
        section = _determine_manpage_section(
            match_rule,
            provided_section,
            definition_source,
        )
        real_section = _determine_manpage_real_section(
            match_rule,
            section,
            definition_source,
        )
        assert section is not None
        language = _determine_manpage_language(match_rule, provided_language)
        lang_dir = ""
        if language is not None:
            lang_dir = f"{language}/"
            # The language moves into the directory, so drop it from the name.
            stem = stem.removesuffix(f".{language}")

        return f"usr/share/man/{lang_dir}man{real_section}/{stem}.{section}"

    return _manpage_dest_path
class SourcePathMatcher:
    """Tracks which source paths have been installed (reserved) or discarded.

    All bookkeeping is keyed by the `fs_path` of the paths involved.
    """

    def __init__(self, auto_discard_rules: list["PluginProvidedDiscardRule"]) -> None:
        """Create a matcher that consults the given plugin provided discard rules."""
        # fs_path -> (packages that reserved the path, definition source)
        self._already_matched: dict[
            str,
            tuple[frozenset[BinaryPackage], str],
        ] = {}
        # (package name, fs_path) pairs that were requested via an exact match
        self._exact_match_request: set[tuple[str, str]] = set()
        # fs_path -> cached discard verdict
        self._discarded: dict[str, DiscardState] = {}
        self._auto_discard_rules = auto_discard_rules
        # discard rule name -> fs_paths the rule discarded
        self.used_auto_discard_rules: dict[str, set[str]] = collections.defaultdict(set)

    def is_reserved(self, path: "VirtualPath") -> bool:
        """Return True if `path` has been matched already or is discarded."""
        fs_path = path.fs_path
        if fs_path in self._already_matched:
            return True
        result = self._discarded.get(fs_path, DiscardState.UNCHECKED)
        if result == DiscardState.UNCHECKED:
            result = self._check_plugin_provided_exclude_state_for(path)
        if result == DiscardState.NOT_DISCARDED:
            return False

        return True

    def exclude(self, path: str) -> None:
        """Mark the given fs_path as discarded by a manifest rule."""
        self._discarded[path] = DiscardState.DISCARDED_BY_MANIFEST_RULE

    def _run_plugin_provided_discard_rules_on(self, path: "VirtualPath") -> bool:
        """Return True if any plugin provided discard rule wants `path` discarded.

        The first rule that fires is recorded in `used_auto_discard_rules`.
        """
        for dr in self._auto_discard_rules:
            verdict = dr.should_discard(path)
            if verdict:
                self.used_auto_discard_rules[dr.name].add(path.fs_path)
                return True
        return False

    def _check_plugin_provided_exclude_state_for(
        self,
        path: "VirtualPath",
    ) -> DiscardState:
        """Compute (and cache) the discard verdict for `path`.

        Walks from `path` towards the root until a cached verdict is found, a
        plugin provided rule fires, or the root is reached. Every path visited
        without a cached verdict receives the final verdict.
        """
        cache_misses = []
        current_path = path
        while True:
            fs_path = current_path.fs_path
            exclude_state = self._discarded.get(fs_path, DiscardState.UNCHECKED)
            if exclude_state != DiscardState.UNCHECKED:
                verdict = exclude_state
                break
            cache_misses.append(fs_path)
            if self._run_plugin_provided_discard_rules_on(current_path):
                verdict = DiscardState.DISCARDED_BY_PLUGIN_PROVIDED_RULE
                break
            # We cannot trust a "NOT_DISCARDED" until we check its parent (the directory could
            # be excluded without the files in it triggering the rule).
            if current_path.is_root_dir():
                verdict = DiscardState.NOT_DISCARDED
                break
            parent_dir = current_path.parent_dir
            assert parent_dir is not None  # type hint
            current_path = parent_dir
        if cache_misses:
            for p in cache_misses:
                self._discarded[p] = verdict
        return verdict

    def may_match(
        self,
        match: PathMatch,
        *,
        is_exact_match: bool = False,
    ) -> tuple[frozenset[BinaryPackage], bool]:
        """Check whether `match` is still available.

        :param match: The candidate match.
        :param is_exact_match: When True, a discard by a plugin provided rule is
          ignored (manifest discards still apply).
        :return: A tuple of (packages the path is already installed into, whether
          the path is discarded). The first item is empty when the path has not
          been reserved.
        """
        m = self._already_matched.get(match.path.fs_path)
        if m:
            return m[0], False
        current_path = match.path.fs_path
        discard_state = self._discarded.get(current_path, DiscardState.UNCHECKED)

        if discard_state == DiscardState.UNCHECKED:
            discard_state = self._check_plugin_provided_exclude_state_for(match.path)

        assert discard_state is not None and discard_state != DiscardState.UNCHECKED

        is_discarded = discard_state != DiscardState.NOT_DISCARDED
        if (
            is_exact_match
            and discard_state == DiscardState.DISCARDED_BY_PLUGIN_PROVIDED_RULE
        ):
            is_discarded = False
        return frozenset(), is_discarded

    def reserve(
        self,
        path: "VirtualPath",
        reserved_by: frozenset[BinaryPackage],
        definition_source: str,
        *,
        is_exact_match: bool = False,
    ) -> None:
        """Record `path` as matched by `reserved_by`.

        For exact matches, the request is additionally recorded per package and
        any discard bookkeeping for the path is dropped.
        """
        fs_path = path.fs_path
        self._already_matched[fs_path] = reserved_by, definition_source
        if not is_exact_match:
            return
        for pkg in reserved_by:
            m_key = (pkg.name, fs_path)
            self._exact_match_request.add(m_key)
        try:
            del self._discarded[fs_path]
        except KeyError:
            # No cached discard verdict for this path; nothing to drop.
            pass
        for discarded_paths in self.used_auto_discard_rules.values():
            discarded_paths.discard(fs_path)

    def detect_missing(self, search_dir: "VirtualPath") -> Iterator["VirtualPath"]:
        """Yield paths below `search_dir` that are neither reserved nor discarded.

        Directories are only yielded themselves when they have no entries.
        """
        stack = list(search_dir.iterdir())
        while stack:
            m = stack.pop()
            if m.is_dir:
                s_len = len(stack)
                stack.extend(m.iterdir())

                # An unchanged stack length means the directory had no entries.
                if s_len == len(stack) and not self.is_reserved(m):
                    # "Explicitly" empty dir
                    yield m
            elif not self.is_reserved(m):
                yield m

    def find_and_reserve_all_matches(
        self,
        match_rule: MatchRule,
        search_dirs: Sequence[SearchDir],
        dir_only_match: bool,
        match_filter: Callable[["VirtualPath"], bool] | None,
        reserved_by: frozenset[BinaryPackage],
        definition_source: str,
    ) -> tuple[list[PathMatch], tuple[int, ...]]:
        """Resolve `match_rule` and reserve (or exclude) every available match.

        :param match_rule: The rule to resolve. Anything but an
          `ExactFileSystemPath` is treated as a glob.
        :param search_dirs: Search directories to resolve the rule in.
        :param dir_only_match: Only accept directories as matches.
        :param match_filter: Passed to the rule as `ignore_paths`.
        :param reserved_by: Packages to reserve for. When empty, the matches are
          excluded instead of reserved.
        :param definition_source: Where the rule was defined.
        :return: The new matches and a tuple of
          (number of skipped already installed paths, number of skipped excluded
          paths). Paths are only skipped for glob rules.
        :raises PathAlreadyInstalledOrDiscardedError: An exact path was already
          installed or excluded.
        :raises ExactPathMatchTwiceError: An exact path was requested twice for
          the same package.
        """
        matched = []
        already_installed_paths = 0
        already_excluded_paths = 0
        glob_expand = not isinstance(match_rule, ExactFileSystemPath)

        for match in _resolve_path(
            match_rule,
            search_dirs,
            dir_only_match,
            match_filter,
            reserved_by,
        ):
            installed_into, excluded = self.may_match(
                match, is_exact_match=not glob_expand
            )
            if installed_into:
                if glob_expand:
                    already_installed_paths += 1
                    continue
                packages = ", ".join(p.name for p in installed_into)
                raise PathAlreadyInstalledOrDiscardedError(
                    f'The "{match.path.fs_path}" has been reserved by and installed into {packages}.'
                    f" The definition that triggered this issue is {definition_source}.",
                    match,
                    installed_into,
                    definition_source,
                )
            if excluded:
                if glob_expand:
                    already_excluded_paths += 1
                    continue
                raise PathAlreadyInstalledOrDiscardedError(
                    f'The "{match.path.fs_path}" has been excluded. If you want this path installed, move it'
                    f" above the exclusion rule that excluded it. The definition that triggered this"
                    f" issue is {definition_source}.",
                    match,
                    installed_into,
                    definition_source,
                )
            if not glob_expand:
                for pkg in match.into:
                    m_key = (pkg.name, match.path.fs_path)
                    if m_key in self._exact_match_request:
                        raise ExactPathMatchTwiceError(
                            f'The path "{match.path.fs_path}" (via exact match) has already been installed'
                            f" into {pkg.name}. The second installation triggered by {definition_source}",
                            match.path,
                            pkg,
                            definition_source,
                        )
                    self._exact_match_request.add(m_key)

            if reserved_by:
                self._already_matched[match.path.fs_path] = (
                    match.into,
                    definition_source,
                )
            else:
                self.exclude(match.path.fs_path)
            matched.append(match)
        exclude_counts = already_installed_paths, already_excluded_paths
        return matched, exclude_counts
def _resolve_path(
    match_rule: MatchRule,
    search_dirs: Iterable["SearchDir"],
    dir_only_match: bool,
    match_filter: Callable[["VirtualPath"], bool] | None,
    into: frozenset[BinaryPackage],
) -> Iterator[PathMatch]:
    """Yield every match of `match_rule` across the search directories.

    :param match_rule: The rule to resolve.
    :param search_dirs: Search directories, visited in order.
    :param dir_only_match: Skip matches that are not directories.
    :param match_filter: Passed to the rule as `ignore_paths`.
    :param into: Packages the matches are for. When non-empty, the search stops
      once every package had a match. When empty, every search directory is
      visited.
    """
    pending_packages = set(into)
    for candidate_dir in search_dirs:
        if into and pending_packages.isdisjoint(candidate_dir.applies_to):
            # All the packages, where this search dir applies, already got a match
            continue
        targets = candidate_dir.applies_to & pending_packages
        found_any = False
        hits = match_rule.finditer(
            candidate_dir.search_dir,
            ignore_paths=match_filter,
        )
        for hit in hits:
            if dir_only_match and not hit.is_dir:
                continue
            if hit.is_root_dir():
                if match_rule is MATCH_ANYTHING:
                    continue
                _error(
                    f"The pattern {match_rule.describe_match_short()} matched the root dir."
                )
            yield PathMatch(hit, candidate_dir.search_dir, False, targets)
            found_any = True
            # No early exit: everything matching in this search directory is wanted.

        if found_any:
            pending_packages -= targets
        if into and not pending_packages:
            # For install rules, we can stop as soon as all packages had a match
            # For discard rules, all search directories must be visited. Otherwise,
            # you would have to repeat the discard rule once per search dir to be
            # sure something is fully discarded
            break
def _resolve_dest_paths(
    match: PathMatch,
    dest_paths: Sequence[tuple[str, bool]],
    install_context: "InstallRuleContext",
) -> Sequence[tuple[str, "InMemoryVirtualPathBase"]]:
    """Expand the destinations of `match` into (path, file system root) pairs.

    :param match: The match to compute destinations for.
    :param dest_paths: Pairs of (destination, is_format). Format destinations are
      expanded per package with `basename`, `dirname`, `package_name` and
      `doc_main_package_name`. Other destinations are used verbatim.
    :param install_context: Provides the file system root of each package.
    :return: One pair per destination and package in `match.into`.
    :raises ValueError: A destination ends with "/".
    """
    resolved: list[tuple[str, "InMemoryVirtualPathBase"]] = []
    for template, is_format in dest_paths:
        if not is_format:
            if template.endswith("/"):
                raise ValueError(
                    f'Provided destination for "{match.path.path}" ended with "/" ("{template}"),'
                    " which it must not!"
                )
            for pkg in match.into:
                resolved.append((template, install_context[pkg.name].fs_root))
            continue

        for pkg in match.into:
            source_dir = match.path.parent_dir
            pkg_context = install_context[pkg.name]
            target_root = pkg_context.fs_root
            expanded = template.format(
                basename=match.path.name,
                dirname="" if source_dir is None else source_dir.path,
                package_name=pkg.name,
                doc_main_package_name=pkg_context.doc_main_package.name,
            )
            if expanded.endswith("/"):
                raise ValueError(
                    f'Provided destination (when resolved for {pkg.name}) for "{match.path.path}" ended'
                    f' with "/" ("{template}"), which it must not!'
                )
            resolved.append((expanded, target_root))
    return resolved
def _resolve_matches(
    matches: list[PathMatch],
    dest_paths: Sequence[tuple[str, bool]] | Callable[[PathMatch], str],
    install_context: "InstallRuleContext",
) -> Iterator[tuple[PathMatch, Sequence[tuple[str, "InMemoryVirtualPathBase"]]]]:
    """Pair every match with its resolved (destination, file system root) list.

    :param matches: The matches to resolve destinations for.
    :param dest_paths: Either a callable computing the destination of a match or
      a sequence of (destination, is_format) pairs.
    :param install_context: Provides the file system root of each package.
    :raises ValueError: A computed destination ends with "/".
    """
    if not callable(dest_paths):
        for match in matches:
            yield match, _resolve_dest_paths(match, dest_paths, install_context)
        return

    resolver = dest_paths
    for match in matches:
        destination = resolver(match)
        if destination.endswith("/"):
            raise ValueError(
                f'Provided destination for "{match.path.path}" ended with "/" ("{destination}"), which it must not!'
            )
        yield match, [
            (destination, install_context[pkg.name].fs_root) for pkg in match.into
        ]
559class InstallRule(DebputyDispatchableType):
560 __slots__ = (
561 "_already_matched",
562 "_exact_match_request",
563 "_condition",
564 "_match_filter",
565 "_definition_source",
566 )
568 def __init__(
569 self,
570 condition: ManifestCondition | None,
571 definition_source: str,
572 *,
573 match_filter: Callable[["VirtualPath"], bool] | None = None,
574 ) -> None:
575 super().__init__()
576 self._condition = condition
577 self._definition_source = definition_source
578 self._match_filter = match_filter
580 def _check_single_match(
581 self, source: FileSystemMatchRule, matches: list[PathMatch]
582 ) -> None:
583 seen_pkgs = set[BinaryPackage]()
584 for m in matches:
585 problem_pkgs = seen_pkgs & m.into
586 if problem_pkgs: 586 ↛ 587line 586 didn't jump to line 587 because the condition on line 586 was never true
587 pkg_names = ", ".join(sorted(p.name for p in problem_pkgs))
588 _error(
589 f'The pattern "{source.raw_match_rule}" matched multiple entries for the packages: {pkg_names}.'
590 " However, it should match exactly one item. Please tighten the pattern defined"
591 f" in {self._definition_source}."
592 )
593 seen_pkgs.update(m.into)
    def _match_pattern(
        self,
        path_matcher: SourcePathMatcher,
        fs_match_rule: FileSystemMatchRule,
        condition_context: ConditionContext,
        search_dirs: Sequence[SearchDir],
        into: frozenset[BinaryPackage],
    ) -> list[PathMatch]:
        """Resolve and reserve all matches for `fs_match_rule`.

        :param path_matcher: Tracks reserved and discarded paths.
        :param fs_match_rule: The pattern to resolve. A raw pattern ending with
          "/" only matches directories.
        :param condition_context: Used for evaluating the rule's condition.
        :param search_dirs: Search directories to resolve the pattern in.
        :param into: Packages to match for (empty for discard rules).
        :return: The matches that were newly reserved (or excluded).
        :raises NoMatchForInstallPatternError: Nothing matched and an empty
          match is not acceptable for this rule.
        """
        matched, exclude_counts = path_matcher.find_and_reserve_all_matches(
            fs_match_rule.match_rule,
            search_dirs,
            fs_match_rule.raw_match_rule.endswith("/"),
            self._match_filter,
            into,
            self._definition_source,
        )

        already_installed_paths, already_excluded_paths = exclude_counts

        if into:
            # An empty match is fine when none of the packages will be acted on.
            allow_empty_match = all(not p.should_be_acted_on for p in into)
        else:
            # discard rules must match provided at least one search dir exist.  If none of them
            # exist, then we assume the discard rule is for a package that will not be built
            # NOTE(review): the expression below allows an empty match when at least one
            # search dir exists, which reads as the inverse of the comment above. Confirm
            # the intended behavior before relying on either.
            allow_empty_match = any(s.search_dir.is_dir for s in search_dirs)
        if self._condition is not None and not self._condition.evaluate(
            condition_context
        ):
            # A rule whose condition is false may match nothing.
            allow_empty_match = True

        if not matched and not allow_empty_match:
            search_dir_text = ", ".join(x.search_dir.fs_path for x in search_dirs)
            # Pick the message that best explains why nothing was left to match.
            if already_excluded_paths and already_installed_paths:
                total_paths = already_excluded_paths + already_installed_paths
                msg = (
                    f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} after ignoring"
                    f" {total_paths} path(s) already been matched previously either by install or"
                    f" exclude rules. If you wanted to install some of these paths into multiple"
                    f" packages, please tweak the definition that installed them to install them"
                    f' into multiple packages (usually change "into: foo" to "into: [foo, bar]".'
                    f" If you wanted to install these paths and exclude rules are getting in your"
                    f" way, then please move this install rule before the exclusion rule that causes"
                    f" issue or, in case of built-in excludes, list the paths explicitly (without"
                    f" using patterns). Source for this issue is {self._definition_source}. Match rule:"
                    f" {fs_match_rule.match_rule.describe_match_exact()}"
                )
            elif already_excluded_paths:
                msg = (
                    f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} after ignoring"
                    f" {already_excluded_paths} path(s) that have been excluded."
                    " If you wanted to install some of these paths, please move the install rule"
                    " before the exclusion rule or, in case of built-in excludes, list the paths explicitly"
                    f" (without using patterns). Source for this issue is {self._definition_source}. Match rule:"
                    f" {fs_match_rule.match_rule.describe_match_exact()}"
                )
            elif already_installed_paths:
                msg = (
                    f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} after ignoring"
                    f" {already_installed_paths} path(s) already been matched previously."
                    " If you wanted to install some of these paths into multiple packages,"
                    f" please tweak the definition that installed them to install them into"
                    f' multiple packages (usually change "into: foo" to "into: [foo, bar]".'
                    f" Source for this issue is {self._definition_source}. Match rule:"
                    f" {fs_match_rule.match_rule.describe_match_exact()}"
                )
            else:
                # TODO: Try harder to find the match and point out possible typos
                msg = (
                    f"There were no matches for {fs_match_rule.raw_match_rule} in {search_dir_text} (definition:"
                    f" {self._definition_source}). Match rule: {fs_match_rule.match_rule.describe_match_exact()}"
                )
            raise NoMatchForInstallPatternError(
                msg,
                fs_match_rule,
                search_dirs,
                self._definition_source,
            )
        return matched
    def _install_matches(
        self,
        path_matcher: SourcePathMatcher,
        matches: list[PathMatch],
        dest_paths: Sequence[tuple[str, bool]] | Callable[[PathMatch], str],
        install_context: "InstallRuleContext",
        into: frozenset[BinaryPackage],
        condition_context: ConditionContext,
    ) -> None:
        """Install `matches` at their destinations in the package file systems.

        :param path_matcher: Tracks reserved and discarded paths.
        :param matches: The matches to install. Must be non-empty unless the rule
          is disabled.
        :param dest_paths: Destination pairs or a destination resolver, as
          accepted by `_resolve_matches`.
        :param install_context: Provides the file system root of each package.
        :param into: The packages the rule installs into.
        :param condition_context: Used for evaluating the rule's condition.
        :raises ValueError: The rule is enabled but `matches` is empty.
        """
        if (
            self._condition is not None
            and not self._condition.evaluate(condition_context)
        ) or not any(p.should_be_acted_on for p in into):
            # Rule is disabled; skip all its actions - also allow empty matches
            # for this particular case.  Though, we do go through the match and
            # ensure all paths are reserved correctly such that the behavior
            # between `binary-arch` vs `binary-indep` vs `binary` is consistent
            for match in matches:
                self._reserve_recursively(
                    path_matcher,
                    match,
                    into,
                )

            return

        if not matches:
            raise ValueError("matches must not be empty")

        for match, dest_paths_and_roots in _resolve_matches(
            matches,
            dest_paths,
            install_context,
        ):
            # Destination directories whose content must be filled in recursively.
            install_recursively_into_dirs = []
            for dest, fs_root in dest_paths_and_roots:
                dir_part, basename = os.path.split(dest)
                # We do not associate these with the FS path.  First off,
                # it is complicated to do in most cases (indeed, debhelper
                # does not preserve these directories either) and secondly,
                # it is "only" mtime and mode - mostly irrelevant as the
                # directory is 99.9% likely to be 0755 (we are talking
                # directories like "/usr", "/usr/share").
                dir_path = fs_root.mkdirs(dir_part)
                existing_path = dir_path.get(basename)

                if match.path.is_dir:
                    # A non-directory at the destination is replaced by the directory.
                    if existing_path is not None and not existing_path.is_dir:
                        existing_path.unlink()
                        existing_path = None
                    current_dir = existing_path

                    if current_dir is None:
                        current_dir = dir_path.mkdir(
                            basename, reference_path=match.path
                        )
                    install_recursively_into_dirs.append(current_dir)
                else:
                    if existing_path is not None and existing_path.is_dir:
                        _error(
                            f"Cannot install {match.path} ({match.path.fs_path}) as {dest}. That path already exist"
                            f" and is a directory.  This error was triggered via {self._definition_source}."
                        )

                    if match.path.is_symlink:
                        dir_path.add_symlink(
                            basename, match.path.readlink(), reference_path=match.path
                        )
                    else:
                        dir_path.insert_file_from_fs_path(
                            basename,
                            match.path.fs_path,
                            follow_symlinks=False,
                            use_fs_path_mode=True,
                            reference_path=match.path,
                        )
            if install_recursively_into_dirs:
                self._install_dir_recursively(
                    path_matcher,
                    install_recursively_into_dirs,
                    match,
                    into,
                )
758 def _reserve_recursively(
759 self,
760 path_matcher: SourcePathMatcher,
761 match: PathMatch,
762 into: frozenset[BinaryPackage],
763 ) -> None:
764 direct_matched_path = match.path
765 path_matcher.reserve(
766 direct_matched_path,
767 into,
768 self._definition_source,
769 is_exact_match=match.is_exact_match,
770 )
771 if not direct_matched_path.is_dir: 771 ↛ 772line 771 didn't jump to line 772 because the condition on line 771 was never true
772 return
773 stack = list(direct_matched_path.iterdir())
774 while stack:
775 current_path = stack.pop()
776 path_matcher.reserve(
777 current_path,
778 into,
779 self._definition_source,
780 is_exact_match=False,
781 )
782 if current_path.is_dir:
783 stack.extend(current_path.iterdir())
785 def _install_dir_recursively(
786 self,
787 path_matcher: SourcePathMatcher,
788 parent_dirs: Sequence[InMemoryVirtualPathBase],
789 match: PathMatch,
790 into: frozenset[BinaryPackage],
791 ) -> None:
792 stack = [
793 (parent_dirs, e)
794 for e in match.path.iterdir()
795 if not path_matcher.is_reserved(e)
796 ]
798 while stack:
799 current_dirs, dir_entry = stack.pop()
800 path_matcher.reserve(
801 dir_entry,
802 into,
803 self._definition_source,
804 is_exact_match=False,
805 )
806 if dir_entry.is_dir: 806 ↛ 807line 806 didn't jump to line 807 because the condition on line 806 was never true
807 new_dirs = [
808 d.mkdir(dir_entry.name, reference_path=dir_entry)
809 for d in current_dirs
810 ]
811 stack.extend(
812 (new_dirs, de)
813 for de in dir_entry.iterdir()
814 if not path_matcher.is_reserved(de)
815 )
816 elif dir_entry.is_symlink:
817 for current_dir in current_dirs:
818 current_dir.add_symlink(
819 dir_entry.name,
820 dir_entry.readlink(),
821 reference_path=dir_entry,
822 )
823 elif dir_entry.is_file: 823 ↛ 833line 823 didn't jump to line 833 because the condition on line 823 was always true
824 for current_dir in current_dirs:
825 current_dir.insert_file_from_fs_path(
826 dir_entry.name,
827 dir_entry.fs_path,
828 use_fs_path_mode=True,
829 follow_symlinks=False,
830 reference_path=dir_entry,
831 )
832 else:
833 _error(
834 f"Unsupported file type: {dir_entry.fs_path} - neither a file, directory or symlink"
835 )
    def perform_install(
        self,
        path_matcher: SourcePathMatcher,
        install_context: InstallRuleContext,
        condition_context: ConditionContext,
    ) -> None:
        """Perform the installation described by this rule.

        This base implementation always raises `NotImplementedError`.
        """
        raise NotImplementedError
845 @classmethod
846 def install_as(
847 cls,
848 source: FileSystemMatchRule,
849 dest_path: str,
850 into: frozenset[BinaryPackage],
851 definition_source: str,
852 condition: ManifestCondition | None,
853 ) -> "InstallRule":
854 return GenericInstallationRule(
855 [source],
856 [(dest_path, False)],
857 into,
858 condition,
859 definition_source,
860 require_single_match=True,
861 )
863 @classmethod
864 def install_dest(
865 cls,
866 sources: Sequence[FileSystemMatchRule],
867 dest_dir: str | None,
868 into: frozenset[BinaryPackage],
869 definition_source: str,
870 condition: ManifestCondition | None,
871 ) -> "InstallRule":
872 if dest_dir is None:
873 dest_dir = "{dirname}/{basename}"
874 else:
875 dest_dir = os.path.join(dest_dir, "{basename}")
876 return GenericInstallationRule(
877 sources,
878 [(dest_dir, True)],
879 into,
880 condition,
881 definition_source,
882 )
884 @classmethod
885 def install_multi_as(
886 cls,
887 source: FileSystemMatchRule,
888 dest_paths: Sequence[str],
889 into: frozenset[BinaryPackage],
890 definition_source: str,
891 condition: ManifestCondition | None,
892 ) -> "InstallRule":
893 if len(dest_paths) < 2: 893 ↛ 894line 893 didn't jump to line 894 because the condition on line 893 was never true
894 raise ValueError(
895 "Please use `install_as` when there is less than 2 dest path"
896 )
897 dps = tuple((dp, False) for dp in dest_paths)
898 return GenericInstallationRule(
899 [source],
900 dps,
901 into,
902 condition,
903 definition_source,
904 require_single_match=True,
905 )
@classmethod
def install_multi_dest(
    cls,
    sources: Sequence[FileSystemMatchRule],
    dest_dirs: Sequence[str],
    into: frozenset[BinaryPackage],
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Build a rule that installs every match into several directories.

    :param sources: Match rules selecting the paths to install.
    :param dest_dirs: Two or more target directories; each one is joined
      with ``"{basename}"`` to form a destination template.
    :param into: The binary packages the rule installs into.
    :param definition_source: Where the rule was defined (passed through).
    :param condition: Optional condition attached to the rule.
    :return: A ``GenericInstallationRule`` with one template per directory.
    :raises ValueError: If fewer than two destination directories are given.
    """
    if not len(dest_dirs) >= 2:
        raise ValueError(
            "Please use `install_dest` when there is less than 2 dest dir"
        )
    templates = tuple(
        [(os.path.join(dest_dir, "{basename}"), True) for dest_dir in dest_dirs]
    )
    return GenericInstallationRule(
        sources,
        templates,
        into,
        condition,
        definition_source,
    )
@classmethod
def install_doc(
    cls,
    sources: Sequence[FileSystemMatchRule],
    dest_dir: str | None,
    into: frozenset[BinaryPackage],
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Build a rule that installs the matches of ``sources`` as documentation.

    The rule is always guarded by ``_BUILD_DOCS_BDO``; a caller supplied
    ``condition`` is combined with it through ``ManifestCondition.all_of``.

    :param sources: Match rules selecting the paths to install.
    :param dest_dir: Target directory. When ``None``, the template
      ``"usr/share/doc/{doc_main_package_name}/{basename}"`` is used.
      Otherwise each match is placed at ``dest_dir`` joined with the name
      of the matched path.
    :param into: The binary packages the rule installs into.
    :param definition_source: Where the rule was defined (passed through).
    :param condition: Optional extra condition.
    :return: The resulting ``GenericInstallationRule``.
    """
    cond: ManifestCondition = (
        _BUILD_DOCS_BDO
        if condition is None
        else ManifestCondition.all_of([_BUILD_DOCS_BDO, condition])
    )

    if dest_dir is not None:

        def _dest_path_resolver(match: PathMatch) -> str:
            # Explicit directory: keep the matched path's own name.
            return os.path.join(dest_dir, match.path.name)

        return GenericInstallationRule(
            sources,
            _dest_path_resolver,
            into,
            cond,
            definition_source,
        )

    # No directory requested: fall back to the default doc template.
    return GenericInstallationRule(
        sources,
        [("usr/share/doc/{doc_main_package_name}/{basename}", True)],
        into,
        cond,
        definition_source,
    )
@classmethod
def install_doc_as(
    cls,
    source: FileSystemMatchRule,
    dest_path: str,
    into: frozenset[BinaryPackage],
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Build a rule that installs one documentation source at ``dest_path``.

    The rule is always guarded by ``_BUILD_DOCS_BDO``; a caller supplied
    ``condition`` is combined with it through ``ManifestCondition.all_of``.

    :param source: The single match rule selecting the path to install.
    :param dest_path: The destination path for the match.
    :param into: The binary packages the rule installs into.
    :param definition_source: Where the rule was defined (passed through).
    :param condition: Optional extra condition.
    :return: A ``GenericInstallationRule`` created with
      ``require_single_match=True``.
    """
    cond: ManifestCondition = (
        _BUILD_DOCS_BDO
        if condition is None
        else ManifestCondition.all_of([_BUILD_DOCS_BDO, condition])
    )
    destination = (dest_path, False)
    return GenericInstallationRule(
        [source],
        [destination],
        into,
        cond,
        definition_source,
        require_single_match=True,
    )
@classmethod
def install_examples(
    cls,
    sources: Sequence[FileSystemMatchRule],
    into: frozenset[BinaryPackage],
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Build a rule that installs the matches of ``sources`` as examples.

    Matches go to the fixed template
    ``"usr/share/doc/{doc_main_package_name}/examples/{basename}"``. The
    rule is always guarded by ``_BUILD_DOCS_BDO``; a caller supplied
    ``condition`` is combined with it through ``ManifestCondition.all_of``.

    :param sources: Match rules selecting the paths to install.
    :param into: The binary packages the rule installs into.
    :param definition_source: Where the rule was defined (passed through).
    :param condition: Optional extra condition.
    :return: The resulting ``GenericInstallationRule``.
    """
    cond: ManifestCondition = (
        _BUILD_DOCS_BDO
        if condition is None
        else ManifestCondition.all_of([_BUILD_DOCS_BDO, condition])
    )
    examples_template = "usr/share/doc/{doc_main_package_name}/examples/{basename}"
    return GenericInstallationRule(
        sources,
        [(examples_template, True)],
        into,
        cond,
        definition_source,
    )
@classmethod
def install_man(
    cls,
    sources: Sequence[FileSystemMatchRule],
    into: frozenset[BinaryPackage],
    section: int | None,
    language: str | None,
    definition_source: str,
    condition: ManifestCondition | None,
) -> "InstallRule":
    """Build a rule that installs the matches of ``sources`` as manpages.

    The rule is always guarded by ``_BUILD_DOCS_BDO``; a caller supplied
    ``condition`` is combined with it through ``ManifestCondition.all_of``.
    Destination paths are produced per match by the callable returned from
    ``_dest_path_for_manpage(section, language, definition_source)``.

    :param sources: Match rules selecting the manpages to install.
    :param into: The binary packages the rule installs into.
    :param section: Manpage section forwarded to ``_dest_path_for_manpage``
      (may be ``None``).
    :param language: Manpage language forwarded to
      ``_dest_path_for_manpage`` (may be ``None``).
    :param definition_source: Where the rule was defined (passed through).
    :param condition: Optional extra condition.
    :return: A ``GenericInstallationRule`` whose match filter skips
      directories.
    """
    cond: ManifestCondition = _BUILD_DOCS_BDO
    if condition is not None:
        cond = ManifestCondition.all_of([cond, condition])

    dest_path_computer = _dest_path_for_manpage(
        section,
        language,
        definition_source,
    )

    return GenericInstallationRule(
        sources,
        dest_path_computer,
        into,
        cond,
        definition_source,
        # Manpages are files, so only directories may be filtered out.
        # Filtering on `not m.is_file` would reject every regular file and
        # leave nothing to install.
        # NOTE(review): assumes `match_filter` keeps the paths for which the
        # callable returns True - the consumer is outside this block.
        match_filter=lambda m: not m.is_dir,
    )
@classmethod
def discard_paths(
    cls,
    paths: Sequence[FileSystemMatchRule],
    definition_source: str,
    condition: ManifestCondition | None,
    *,
    limit_to: Sequence[FileSystemExactMatchRule] | None = None,
) -> "InstallRule":
    """Build a ``DiscardRule`` for the given ``paths``.

    :param paths: Match rules selecting the paths to discard.
    :param definition_source: Where the rule was defined (passed through).
    :param condition: Optional condition attached to the rule.
    :param limit_to: Optional exact-match rules restricting the rule. It is
      handed to ``DiscardRule`` as a tuple; ``None`` becomes an empty tuple.
    :return: The resulting ``DiscardRule``.
    """
    limit_rules = () if limit_to is None else tuple(limit_to)
    return DiscardRule(
        paths,
        condition,
        limit_rules,
        definition_source,
    )
class PPFInstallRule(InstallRule):
    """Install rule that places packager provided files into one package."""

    __slots__ = (
        "_ppfs",
        "_substitution",
        "_into",
    )

    # noinspection PyMissingConstructor
    def __init__(
        self,
        into: BinaryPackage,
        substitution: Substitution,
        ppfs: Sequence["PackagerProvidedFile"],
    ) -> None:
        """Store the target package, substitution and packager provided files.

        The base class initializer is not called directly. It is invoked
        through ``run_in_context_of_plugin`` under the ``"debputy"`` name,
        with no condition and a fixed built-in definition source.
        """
        base_init = super().__init__
        run_in_context_of_plugin(
            "debputy",
            base_init,
            None,
            "<built-in; PPF install rule>",
        )
        self._into = into
        self._ppfs = ppfs
        self._substitution = substitution

    def perform_install(
        self,
        path_matcher: SourcePathMatcher,
        install_context: InstallRuleContext,
        condition_context: ConditionContext,
    ) -> None:
        """Insert every packager provided file into the package's fs root.

        ``path_matcher`` and ``condition_context`` are accepted for
        interface compatibility but are not used here.
        """
        package_root = install_context[self._into.name].fs_root
        for ppf in self._ppfs:
            fs_source = ppf.path.fs_path
            target_dir, target_name = ppf.compute_dest()
            parent_dir = package_root.mkdirs(target_dir)

            # The mode comes from the PPF definition rather than from the
            # file on disk (`use_fs_path_mode=False`).
            parent_dir.insert_file_from_fs_path(
                target_name,
                fs_source,
                follow_symlinks=True,
                use_fs_path_mode=False,
                mode=ppf.definition.default_mode,
            )
class GenericInstallationRule(InstallRule):
    """Install rule that maps matched source paths onto destination paths."""

    __slots__ = (
        "_sources",
        "_into",
        "_dest_paths",
        "_require_single_match",
    )

    def __init__(
        self,
        sources: Sequence[FileSystemMatchRule],
        dest_paths: Sequence[tuple[str, bool]] | Callable[[PathMatch], str],
        into: frozenset[BinaryPackage],
        condition: ManifestCondition | None,
        definition_source: str,
        *,
        require_single_match: bool = False,
        match_filter: Callable[["VirtualPath"], bool] | None = None,
    ) -> None:
        """Record the rule's sources, destinations and target packages.

        :param sources: Match rules selecting the paths to install.
        :param dest_paths: Either a sequence of ``(path, flag)`` pairs or a
          callable that computes the destination path from a ``PathMatch``.
        :param into: The binary packages the rule installs into.
        :param condition: Optional condition, forwarded to the base class.
        :param definition_source: Where the rule was defined, forwarded to
          the base class.
        :param require_single_match: When true, ``sources`` must contain
          exactly one rule, and ``perform_install`` runs
          ``_check_single_match`` if that rule matches more than once.
        :param match_filter: Optional filter, forwarded to the base class.
        :raises ValueError: If ``require_single_match`` is set and
          ``sources`` does not have exactly one element.
        """
        super().__init__(
            condition,
            definition_source,
            match_filter=match_filter,
        )
        self._sources = sources
        self._into = into
        self._dest_paths = dest_paths
        self._require_single_match = require_single_match
        if require_single_match and len(sources) != 1:
            raise ValueError("require_single_match implies sources must have len 1")

    def perform_install(
        self,
        path_matcher: SourcePathMatcher,
        install_context: InstallRuleContext,
        condition_context: ConditionContext,
    ) -> None:
        """Match each source in turn and install its matches."""
        single_match_only = self._require_single_match
        target_packages = self._into
        for source_rule in self._sources:
            found = self._match_pattern(
                path_matcher,
                source_rule,
                condition_context,
                install_context.search_dirs,
                target_packages,
            )
            if single_match_only and len(found) > 1:
                self._check_single_match(source_rule, found)
            self._install_matches(
                path_matcher,
                found,
                self._dest_paths,
                install_context,
                target_packages,
                condition_context,
            )
class DiscardRule(InstallRule):
    """Install rule that matches paths without installing them anywhere."""

    __slots__ = ("_fs_match_rules", "_limit_to")

    def __init__(
        self,
        fs_match_rules: Sequence[FileSystemMatchRule],
        condition: ManifestCondition | None,
        limit_to: Sequence[FileSystemExactMatchRule],
        definition_source: str,
    ) -> None:
        """Record the match rules and the optional search dir restriction.

        :param fs_match_rules: Match rules selecting the paths to discard.
        :param condition: Optional condition, forwarded to the base class.
        :param limit_to: Exact-match rules naming the search directories
          the rule is restricted to; an empty sequence means no restriction.
        :param definition_source: Where the rule was defined, forwarded to
          the base class.
        """
        super().__init__(condition, definition_source)
        self._limit_to = limit_to
        self._fs_match_rules = fs_match_rules

    def perform_install(
        self,
        path_matcher: SourcePathMatcher,
        install_context: InstallRuleContext,
        condition_context: ConditionContext,
    ) -> None:
        """Run every match rule against the applicable search directories.

        With a non-empty ``limit_to`` only search directories whose
        ``fs_path`` is named there are used. A ``limit_to`` entry that has
        no corresponding search directory is reported through ``_error``.
        """
        limit_to = self._limit_to
        if not limit_to:
            search_dirs: Sequence[SearchDir] = install_context.search_dirs
        else:
            requested = {rule.match_rule.path for rule in limit_to}
            search_dirs = tuple(
                candidate
                for candidate in install_context.search_dirs
                if candidate.search_dir.fs_path in requested
            )
            if len(limit_to) != len(search_dirs):
                known = (candidate.search_dir.fs_path for candidate in search_dirs)
                paths = ":".join(requested.difference(known))
                _error(
                    f"The discard rule defined at {self._definition_source} mentions the following"
                    f" search directories that were not known to debputy: {paths}."
                    " Either the search dir is missing somewhere else or it should be removed from"
                    " the discard rule."
                )

        # The matches themselves are not used. The rule targets no package
        # (`into` is empty), so nothing is installed.
        no_packages = frozenset()
        for rule in self._fs_match_rules:
            self._match_pattern(
                path_matcher,
                rule,
                condition_context,
                search_dirs,
                into=no_packages,
            )