Coverage for src/debputy/util.py: 67%
579 statements
« prev ^ index » next coverage.py v7.8.2, created at 2026-01-26 19:30 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2026-01-26 19:30 +0000
1import argparse
2import collections
3import enum
4import functools
5import glob
6import logging
7import os
8import re
9import shutil
10import subprocess
11import sys
12import time
13from itertools import zip_longest
14from pathlib import Path
15from typing import NoReturn, TYPE_CHECKING, Optional, TypeVar, Literal, Any
16from collections.abc import Iterator, Iterable, Sequence, Mapping
18from debian.deb822 import Deb822
20from debputy.architecture_support import DpkgArchitectureBuildProcessValuesTable
21from debputy.exceptions import DebputySubstitutionError
22from debputy.version import debputy_doc_root_dir
try:
    from Levenshtein import distance
except ImportError:
    # The optional Levenshtein module is unavailable: typo detection is disabled.
    CAN_DETECT_TYPOS = False

    def detect_possible_typo(
        provided_value: str,
        known_values: Iterable[str],
        *,
        max_edit_distance: int = 2,
    ) -> Sequence[str]:
        """Fallback used without Levenshtein; never suggests any candidate."""
        return ()

else:
    CAN_DETECT_TYPOS = True

    def detect_possible_typo(
        provided_value: str,
        known_values: Iterable[str],
        *,
        max_edit_distance: int = 2,
    ) -> Sequence[str]:
        """Return the known values within `max_edit_distance` edits of `provided_value`.

        :param provided_value: The (possibly misspelled) value to look up.
        :param known_values: The valid values to compare against.
        :param max_edit_distance: Largest accepted Levenshtein distance.
        :return: Matching known values, in the order they were provided.
        """
        provided_len = len(provided_value)
        # The length comparison is a cheap pre-filter that is evaluated before
        # the edit distance is computed.
        return [
            candidate
            for candidate in known_values
            if abs(provided_len - len(candidate)) <= max_edit_distance
            and distance(provided_value, candidate) <= max_edit_distance
        ]
60if TYPE_CHECKING:
61 from debputy.types import EnvironmentModification
62 from debputy.packages import BinaryPackage
63 from debputy.substitution import Substitution
# Generic type variable used in the signatures of typed helpers in this module.
T = TypeVar("T")

# Matches a run of two or more consecutive slashes.
SLASH_PRUNE = re.compile("//+")
# One character from [a-z0-9] followed by at least one character from
# [-+.a-z0-9] (ASCII only).
PKGNAME_REGEX = re.compile(r"[a-z0-9][-+.a-z0-9]+", re.ASCII)
# Verbose-mode pattern: whitespace and "#" comments inside it are not part of
# the match.
PKGVERSION_REGEX = re.compile(
    r"""
 (?: \d+ : )? # Optional epoch
 \d[0-9A-Za-z.+:~]* # Upstream version (with no hyphens)
 (?: - [0-9A-Za-z.+:~]+ )* # Optional debian revision (+ upstreams versions with hyphens)
""",
    re.VERBOSE | re.ASCII,
)
class PackageTypeSelector(enum.Flag):
    """Flag describing a set of binary package types (`DEB` and/or `UDEB`)."""

    DEB = enum.auto()
    UDEB = enum.auto()
    # Used as the return annotation of `singleton` below.
    # NOTE(review): presumably meant purely as a typing alias for "exactly one
    # of DEB or UDEB" - confirm how the enum machinery treats this assignment.
    Singleton = Literal[DEB, UDEB]
    ALL = DEB | UDEB

    def __str__(self) -> str:
        """Return the lower-cased names of the contained flags joined by ", "."""
        # 0 or ALL have no name, but only singletons are enumerated.
        l = []
        for p in self:
            name = p.name
            assert name is not None, repr(p)
            l.append(name.lower())
        return ", ".join(l)

    @staticmethod
    def singleton(s: str) -> Singleton:
        """Like PackageTypeSelector[s.lower()], but typed as a
        singleton and rejecting 'DeB' and 'ALL'."""
        match s:
            case "deb":
                return PackageTypeSelector.DEB
            case "udeb":
                return PackageTypeSelector.UDEB
            case _:
                # Anything but the exact lower-case names is rejected.
                raise KeyError(s)
# Shell test that is true when "$1" is one of "configure", "abort-upgrade",
# "abort-deconfigure" or "abort-remove".
POSTINST_DEFAULT_CONDITION = (
    '[ "$1" = "configure" ]'
    ' || [ "$1" = "abort-upgrade" ]'
    ' || [ "$1" = "abort-deconfigure" ]'
    ' || [ "$1" = "abort-remove" ]'
)

# Patterns used by the shell escaping helpers in this module.
_SPACE_RE = re.compile(r"\s")
# Optional leading dashes, a word and "=" at the start of the string (e.g. `--opt=`).
_WORD_EQUAL = re.compile(r"^-*[\w_\-]+=")
# Characters that get a backslash when the word is wrapped in double quotes.
_DOUBLE_ESCAPEES = re.compile(r'([\n`$"\\])')
# Characters that get a backslash when the word is emitted without quotes.
_REGULAR_ESCAPEES = re.compile(r"""([\s!"$()*+#;<>?@'\[\]\\`|~])""")
# Separator between two profile groups in a Build-Profiles value ("> <").
_PROFILE_GROUP_SPLIT = re.compile(r">\s+<")
# Module-wide logging state; all three start out unset.
_DEFAULT_LOGGER: logging.Logger | None = None
_STDOUT_HANDLER: logging.StreamHandler[Any] | None = None
_STDERR_HANDLER: logging.StreamHandler[Any] | None = None
# Custom log levels: the two PRINT_* levels sit between INFO and WARNING,
# TRACE_LOG sits between DEBUG and INFO.
PRINT_COMMAND = logging.INFO + 3
PRINT_BUILD_SYSTEM_COMMAND = PRINT_COMMAND + 3
TRACE_LOG = logging.DEBUG + 3

# Map them back to `INFO`. The names must be unique so the prefix is stripped.
logging.addLevelName(PRINT_COMMAND, "__INFO")
logging.addLevelName(PRINT_BUILD_SYSTEM_COMMAND, "_INFO")
logging.addLevelName(TRACE_LOG, "TRACE")
def assume_not_none(x: T | None) -> T:
    """Return `x` unchanged, raising `ValueError` if it is `None`.

    Lets callers narrow an optional value where `None` would be an internal error.
    """
    if x is not None:
        return x
    raise ValueError(  # pragma: no cover
        'Internal error: None was given, but the receiver assumed "not None" here'
    )
def _non_verbose_info(msg: str) -> None:
    """Log `msg` at the `PRINT_BUILD_SYSTEM_COMMAND` level, if a logger is configured."""
    if (active_logger := _DEFAULT_LOGGER) is None:
        return
    active_logger.log(PRINT_BUILD_SYSTEM_COMMAND, msg)
def _info(msg: str) -> None:
    """Log `msg` at INFO level; silently does nothing when no logger is configured."""
    active_logger = _DEFAULT_LOGGER
    if not active_logger:
        # No fallback print for info
        return
    active_logger.info(msg)
def _is_trace_log_enabled() -> bool:
    """Tell whether a logger is configured and would emit `TRACE_LOG` records."""
    active_logger = _DEFAULT_LOGGER
    if active_logger is None:
        return False
    return active_logger.isEnabledFor(TRACE_LOG)
def _trace_log(msg: str) -> None:
    """Log `msg` at the `TRACE_LOG` level; a no-op when no logger is configured."""
    active_logger = _DEFAULT_LOGGER
    if not active_logger:
        # No fallback print for this level
        return
    active_logger.log(TRACE_LOG, msg)
def _is_debug_log_enabled() -> bool:
    """Tell whether a logger is configured and would emit DEBUG records."""
    active_logger = _DEFAULT_LOGGER
    if active_logger is None:
        return False
    return active_logger.isEnabledFor(logging.DEBUG)
def _debug_log(msg: str) -> None:
    """Log `msg` at DEBUG level; a no-op when no logger is configured."""
    active_logger = _DEFAULT_LOGGER
    if not active_logger:
        # No fallback print for this level
        return
    active_logger.debug(msg)
def _error(msg: str, *, prog: str | None = None) -> "NoReturn":
    """Report `msg` as an error and terminate the process with exit code 1.

    :param msg: The error message.
    :param prog: Program name for the fallback message printed to stderr when
      no logger is configured; defaults to the basename of `sys.argv[0]`.
    """
    active_logger = _DEFAULT_LOGGER
    if not active_logger:
        me = prog if prog is not None else os.path.basename(sys.argv[0])
        print(f"{me}: error: {msg}", file=sys.stderr)
    else:
        active_logger.error(msg)
    sys.exit(1)
def _warn(msg: str, *, prog: str | None = None) -> None:
    """Report `msg` as a warning.

    :param msg: The warning message.
    :param prog: Program name for the fallback message printed to stderr when
      no logger is configured; defaults to the basename of `sys.argv[0]`.
    """
    active_logger = _DEFAULT_LOGGER
    if active_logger:
        active_logger.warning(msg)
        return
    me = prog if prog is not None else os.path.basename(sys.argv[0])
    print(f"{me}: warning: {msg}", file=sys.stderr)
class ColorizedArgumentParser(argparse.ArgumentParser):
    """Argument parser that reports usage errors through this module's `_error`."""

    def error(self, message: str) -> NoReturn:
        """Print the usage text to stderr, then hand `message` to `_error`."""
        self.print_usage(file=sys.stderr)
        _error(message, prog=self.prog)
def ensure_dir(path: str) -> None:
    """Create `path` (and missing parents) with mode 0o755 unless it is already a directory."""
    if os.path.isdir(path):
        return
    os.makedirs(path, mode=0o755, exist_ok=True)
def _clean_path(orig_p: str, allow_and_keep_upward_segments: bool = False) -> str:
    """Collapse repeated slashes in `orig_p` and validate or drop "." segments.

    :param orig_p: The path to clean.
    :param allow_and_keep_upward_segments: When true, "." segments are dropped
      and ".." segments are kept verbatim. When false, any "." or ".." segment
      (beyond a single leading "./") is rejected.
    :return: The cleaned path. With `allow_and_keep_upward_segments`, a single
      leading "./" is not part of the result.
    :raises ValueError: If a "." or ".." segment is present and
      `allow_and_keep_upward_segments` is false.
    """
    p = SLASH_PRUNE.sub("/", orig_p)
    if "." not in p:
        # No dot at all means there cannot be any "." or ".." segment.
        return p

    path_base = p
    # We permit a single leading "./" because we add that when we normalize a path, and we want normalization
    # of a normalized path to be a no-op.
    if path_base.startswith("./"):
        path_base = path_base[2:]
        assert path_base

    if allow_and_keep_upward_segments:
        p = "/".join(segment for segment in path_base.split("/") if segment != ".")
        # Splitting an absolute path gives an empty first segment, so the join
        # normally keeps the leading slash already. Only restore it when it was
        # lost (e.g. "/." collapses to ""); prepending it unconditionally
        # turned "/a/./b" into "//a/b".
        if path_base.startswith("/") and not p.startswith("/"):
            p = "/" + p
        return p

    for segment in path_base.split("/"):
        if segment in (".", ".."):
            raise ValueError(
                'Please provide paths that are normalized (i.e., no ".." or ".").'
                f' Offending input "{orig_p}"'
            )
    return p
def _normalize_path(
    path: str,
    with_prefix: bool = True,
    allow_and_keep_upward_segments: bool = False,
) -> str:
    """Normalize `path`, with or without a leading "./".

    Leading and trailing slashes are removed. An empty result or "." is
    returned as ".". Paths containing "//" or a dot are passed through
    `_clean_path`.

    :param path: The path to normalize.
    :param with_prefix: Whether the result should start with "./".
    :param allow_and_keep_upward_segments: Forwarded to `_clean_path`.
    :return: The normalized path.
    """
    normalized = path.strip("/")
    if normalized in ("", "."):
        return "."
    if "//" in normalized or "." in normalized:
        normalized = _clean_path(
            normalized,
            allow_and_keep_upward_segments=allow_and_keep_upward_segments,
        )
    has_prefix = normalized.startswith("./")
    if with_prefix and not has_prefix:
        return "./" + normalized
    if has_prefix and not with_prefix:
        return normalized[2:]
    return normalized
def _normalize_link_target(link_target: str) -> str:
    """Resolve "." and ".." segments of `link_target` relative to the root.

    :param link_target: The link target; leading slashes are ignored.
    :return: The resolved target without leading or trailing slash.
    """
    collapsed = SLASH_PRUNE.sub("/", link_target.lstrip("/"))
    resolved: list[str] = []
    for part in collapsed.split("/"):
        if part == "..":
            # We ignore "root escape attempts" like the OS would (mapping /.. -> /)
            if resolved:
                resolved.pop()
        elif part not in (".", ""):
            # "." and the empty string (generally a trailing slash) are ignored.
            resolved.append(part)
    return "/".join(resolved)
def manifest_format_doc(anchor: str) -> str:
    """Return the path of MANIFEST-FORMAT.md, with `#anchor` appended when `anchor` is non-empty."""
    doc_path = f"{debputy_doc_root_dir()}/MANIFEST-FORMAT.md"
    if not anchor:
        return doc_path
    return f"{doc_path}#{anchor}"
295def _backslash_escape(m: re.Match[str]) -> str:
296 return "\\" + m.group(0)
def _escape_shell_word(w: str) -> str:
    """Escape a single word for display as part of a shell command.

    Words without whitespace get individual characters backslash-escaped.
    Words with whitespace are wrapped in double quotes; for `name=value`
    style words only the value part is quoted.
    """
    if not _SPACE_RE.search(w):
        return _REGULAR_ESCAPEES.sub(_backslash_escape, w)

    if "=" in w and (m := _WORD_EQUAL.search(w)) is not None:
        start, end = m.span(0)
        assert start == 0
        prefix = w[:end]
        escaped_value = _DOUBLE_ESCAPEES.sub(_backslash_escape, w[end:])
        return f'{prefix}"{escaped_value}"'

    quoted_body = _DOUBLE_ESCAPEES.sub(_backslash_escape, w)
    return f'"{quoted_body}"'
def escape_shell(*args: str) -> str:
    """Escape each word in `args` and join the results with single spaces."""
    return " ".join(map(_escape_shell_word, args))
def render_command(
    *args: str,
    cwd: str | None = None,
    env_mod: Optional["EnvironmentModification"] = None,
) -> str:
    """Render a command as a shell-like string for display.

    :param args: The command and its arguments.
    :param cwd: Directory the command runs in, if any.
    :param env_mod: Environment changes applied to the command, if any.
    :return: The escaped command, prefixed with the environment changes and
      the directory change where applicable.
    """
    env_mod_prefix = ""
    chdir_via_env = False
    if env_mod:
        env_mod_parts = []
        if bool(env_mod.removals):
            # Unsetting variables cannot be expressed as plain `VAR=value`
            # prefixes, so the command is rendered through `env`.
            env_mod_parts.append("env")
            if cwd is not None:
                env_mod_parts.append(f"--chdir={escape_shell(cwd)}")
                chdir_via_env = True
            env_mod_parts.extend(f"--unset={escape_shell(v)}" for v in env_mod.removals)
        env_mod_parts.extend(
            f"{escape_shell(k)}={escape_shell(v)}" for k, v in env_mod.replacements
        )
        # The collected parts used to be discarded, so the rendered command
        # never showed the environment modifications.
        if env_mod_parts:
            env_mod_prefix = " ".join(env_mod_parts) + " "

    chdir_prefix = ""
    # Do not render the directory change twice when `env --chdir` already has it.
    if cwd is not None and cwd != "." and not chdir_via_env:
        chdir_prefix = f"cd {escape_shell(cwd)} && "
    return f"{chdir_prefix}{env_mod_prefix}{escape_shell(*args)}"
def print_command(
    *args: str,
    cwd: str | None = None,
    env_mod: Optional["EnvironmentModification"] = None,
    print_at_log_level: int = PRINT_COMMAND,
) -> None:
    """Print the rendered command to stdout if `print_at_log_level` is enabled.

    Nothing is printed when no logger is configured or when the logger is not
    enabled for `print_at_log_level`.
    """
    logger = _DEFAULT_LOGGER
    if logger is None or not logger.isEnabledFor(print_at_log_level):
        return

    rendered_cmd = render_command(*args, cwd=cwd, env_mod=env_mod)
    if (stdout_handler := _STDOUT_HANDLER) is not None:
        stdout_handler.flush()
    # Ensure command is output immediately so it is hanging after its output.
    # TODO: This should `file` in case something in debputy redirects stdout
    # (nothing does that for now)
    print(f" {rendered_cmd}")
    sys.stdout.flush()
def run_command(
    *args: str,
    cwd: str | None = None,
    env: Mapping[str, str] | None = None,
    env_mod: Optional["EnvironmentModification"] = None,
    print_at_log_level: int = PRINT_COMMAND,
    raise_file_not_found_on_missing_command: bool = False,
) -> None:
    """Print and run a command, reporting any failure through `_error`.

    :param args: The command and its arguments.
    :param cwd: Directory to run the command in.
    :param env: Base environment; the process environment is used when omitted.
    :param env_mod: Modifications applied on top of the base environment.
    :param print_at_log_level: Log level at which the command is printed.
    :param raise_file_not_found_on_missing_command: Re-raise `FileNotFoundError`
      instead of reporting the missing command through `_error`.
    """
    print_command(
        *args,
        cwd=cwd,
        env_mod=env_mod,
        print_at_log_level=print_at_log_level,
    )
    if env_mod:
        base_env = os.environ if env is None else env
        env = env_mod.compute_env(base_env)
        if env is os.environ:
            # Nothing changed: let the child inherit the environment as-is.
            env = None
    try:
        subprocess.check_call(args, cwd=cwd, env=env)
    # At least "clean_logic.py" relies on catching FileNotFoundError
    except KeyboardInterrupt:
        _error(f"Interrupted (SIGINT) while running {escape_shell(*args)}")
    except FileNotFoundError:
        if raise_file_not_found_on_missing_command:
            raise
        program = args[0]
        if "/" in program:
            _error(f"Could not run {escape_shell(program)}: Path does not exist")
        # Use the `command not found` to aid existing log pattern
        _error(f"{escape_shell(program)}: command not found")
    except subprocess.CalledProcessError as e:
        _error(f"The command {escape_shell(*args)} failed with status: {e.returncode}")
def run_build_system_command(
    *args: str,
    cwd: str | None = None,
    env: Mapping[str, str] | None = None,
    env_mod: Optional["EnvironmentModification"] = None,
    print_at_log_level: int = PRINT_BUILD_SYSTEM_COMMAND,
    raise_file_not_found_on_missing_command: bool = False,
) -> None:
    """Same as `run_command`, but printed at `PRINT_BUILD_SYSTEM_COMMAND` by default."""
    forwarded_options = {
        "cwd": cwd,
        "env": env,
        "env_mod": env_mod,
        "print_at_log_level": print_at_log_level,
        "raise_file_not_found_on_missing_command": raise_file_not_found_on_missing_command,
    }
    run_command(*args, **forwarded_options)
def debian_policy_normalize_symlink_target(
    link_path: str,
    link_target: str,
    normalize_link_path: bool = False,
) -> str:
    """Rewrite a symlink target as relative or absolute, as Debian Policy requires.

    The target becomes relative when the link and the target share their
    top-level directory and absolute otherwise. For example, a link at
    "./usr/bin/foo" pointing to "/usr/lib/bar" yields "../lib/bar".

    :param link_path: Location of the symlink. Must start with "./" unless
      `normalize_link_path` is true.
    :param link_target: The target of the symlink. A target without a leading
      "/" is interpreted relative to the directory of `link_path`.
    :param normalize_link_path: Normalize `link_path` via `_normalize_path`
      first instead of requiring it to be normalized already.
    :return: The rewritten link target.
    :raises ValueError: If `link_path` does not start with "./" and
      `normalize_link_path` is false.
    """
    if normalize_link_path:
        link_path = _normalize_path(link_path)
    elif not link_path.startswith("./"):
        raise ValueError("Link part was not normalized")

    # Drop the leading "./"
    link_path = link_path[2:]

    if not link_target.startswith("/"):
        # Anchor a relative target at the directory containing the link.
        link_target = "/" + os.path.dirname(link_path) + "/" + link_target

    link_path_parts = link_path.split("/")
    link_target_parts = [
        s for s in _normalize_link_target(link_target).split("/") if s != "."
    ]

    assert link_path_parts

    if link_target_parts and link_path_parts[0] == link_target_parts[0]:
        # Per Debian Policy, must be relative

        # First determine the length of the overlap
        common_segment_count = 1
        shortest_path_length = min(len(link_target_parts), len(link_path_parts))
        while (
            common_segment_count < shortest_path_length
            and link_target_parts[common_segment_count]
            == link_path_parts[common_segment_count]
        ):
            common_segment_count += 1

        if common_segment_count == shortest_path_length and len(
            link_path_parts
        ) - 1 == len(link_target_parts):
            # The target is exactly the directory that contains the link.
            normalized_link_target = "."
        else:
            # Number of directory levels to climb from the link's directory up
            # to the last segment shared with the target.
            up_dir_count = len(link_path_parts) - 1 - common_segment_count
            normalized_link_target_parts = []
            if up_dir_count:
                up_dir_part = "../" * up_dir_count
                # We overshoot with a single '/', so rstrip it away
                normalized_link_target_parts.append(up_dir_part.rstrip("/"))
            # Add the relevant down parts
            normalized_link_target_parts.extend(
                link_target_parts[common_segment_count:]
            )

            normalized_link_target = "/".join(normalized_link_target_parts)
    else:
        # Per Debian Policy, must be absolute
        normalized_link_target = "/" + "/".join(link_target_parts)

    return normalized_link_target
def has_glob_magic(pattern: str) -> bool:
    """Tell whether `pattern` contains a "{" or anything `glob.has_magic` reports as magic."""
    if "{" in pattern:
        return True
    return glob.has_magic(pattern)
# Each glob meta character is wrapped in its own character class, in one pass.
_GLOB_ESCAPE_TABLE = str.maketrans({c: f"[{c}]" for c in "[]*?{}"})


def glob_escape(replacement_value: str) -> str:
    """Escape glob meta characters so `replacement_value` only matches itself.

    :param replacement_value: Text to be embedded literally in a glob pattern.
    :return: The value with each of "[", "]", "*", "?", "{" and "}" wrapped in
      a character class, or the value itself when it has neither glob magic
      nor a "{".
    """
    # The value is safe only when it has neither regular glob magic nor a
    # brace. The previous `or` guard returned values such as "foo*" unescaped.
    if not glob.has_magic(replacement_value) and "{" not in replacement_value:
        return replacement_value
    # A single translation pass avoids re-escaping the "]" that chained
    # `.replace()` calls inserted while escaping "[".
    return replacement_value.translate(_GLOB_ESCAPE_TABLE)
494# TODO: This logic should probably be moved to `python-debian`
495# See bug1120283.py.
496def active_profiles_match(
497 profiles_raw: str,
498 active_build_profiles: set[str] | frozenset[str],
499) -> bool:
500 profiles_raw = profiles_raw.strip()
501 if profiles_raw[0] != "<" or profiles_raw[-1] != ">" or profiles_raw == "<>": 501 ↛ 502line 501 didn't jump to line 502 because the condition on line 501 was never true
502 raise ValueError(
503 'Invalid Build-Profiles: Must start start and end with "<" + ">" but cannot be a literal "<>"'
504 )
505 profile_groups = _PROFILE_GROUP_SPLIT.split(profiles_raw[1:-1])
506 for profile_group_raw in profile_groups:
507 should_process_package = True
508 for profile_name in profile_group_raw.split():
509 negation = False
510 if profile_name[0] == "!":
511 negation = True
512 profile_name = profile_name[1:]
514 matched_profile = profile_name in active_build_profiles
515 if matched_profile == negation:
516 should_process_package = False
517 break
519 if should_process_package:
520 return True
522 return False
525def _parse_build_profiles(build_profiles_raw: str) -> frozenset[frozenset[str]]:
526 profiles_raw = build_profiles_raw.strip()
527 if profiles_raw[0] != "<" or profiles_raw[-1] != ">" or profiles_raw == "<>": 527 ↛ 528line 527 didn't jump to line 528 because the condition on line 527 was never true
528 raise ValueError(
529 'Invalid Build-Profiles: Must start start and end with "<" + ">" but cannot be a literal "<>"'
530 )
531 profile_groups = _PROFILE_GROUP_SPLIT.split(profiles_raw[1:-1])
532 return frozenset(frozenset(g.split()) for g in profile_groups)
535def resolve_source_date_epoch(
536 command_line_value: int | None,
537 *,
538 substitution: Optional["Substitution"] = None,
539) -> int:
540 mtime = command_line_value
541 if mtime is None and "SOURCE_DATE_EPOCH" in os.environ:
542 sde_raw = os.environ["SOURCE_DATE_EPOCH"]
543 if sde_raw == "":
544 _error("SOURCE_DATE_EPOCH is set but empty.")
545 mtime = int(sde_raw)
546 if mtime is None and substitution is not None:
547 try:
548 sde_raw = substitution.substitute(
549 "{{SOURCE_DATE_EPOCH}}",
550 "Internal resolution",
551 )
552 mtime = int(sde_raw)
553 except (DebputySubstitutionError, ValueError):
554 pass
555 if mtime is None:
556 mtime = int(time.time())
557 os.environ["SOURCE_DATE_EPOCH"] = str(mtime)
558 return mtime
def compute_output_filename(control_root_dir: str, is_udeb: bool) -> str:
    """Compute the package file name from the `control` file in `control_root_dir`.

    :param control_root_dir: Directory containing the `control` file.
    :param is_udeb: Force the "udeb" extension.
    :return: `<Package>_<Version without epoch>_<architecture>.<extension>`,
      where the architecture is `Architecture-Variant` if present and
      `Architecture` otherwise.
    """
    control_path = os.path.join(control_root_dir, "control")
    with open(control_path) as fd:
        control_file = Deb822(fd)

    package_name = control_file["Package"]
    package_version = control_file["Version"]
    arch_field = (
        "Architecture-Variant"
        if "Architecture-Variant" in control_file
        else "Architecture"
    )
    package_architecture = control_file[arch_field]
    declared_type = control_file.get("Package-Type")

    # The epoch (everything up to the first ":") is not part of the file name.
    _, epoch_separator, version_without_epoch = package_version.partition(":")
    if epoch_separator:
        package_version = version_without_epoch

    extension = "udeb" if is_udeb else (declared_type or "deb")
    return f"{package_name}_{package_version}_{package_architecture}.{extension}"
# Cached result of `scratch_dir()`; unset until the first call.
_SCRATCH_DIR = None
# Switched on by `integrated_with_debhelper()`.
_DH_INTEGRATION_MODE = False


def integrated_with_debhelper() -> None:
    """Record that debputy runs integrated with debhelper."""
    global _DH_INTEGRATION_MODE
    _DH_INTEGRATION_MODE = True
def scratch_dir(*, create_if_not_exists: bool = True) -> str:
    """Return the scratch directory, selecting (and caching) it on first use.

    The debhelper-hosted location is used when `debian/.debputy` is not in use
    and either `debian/.debhelper` exists or debhelper integration is enabled.
    Otherwise the directory lives below `debian/.debputy`.

    :param create_if_not_exists: Create the directory when it is first selected.
    :return: The path of the scratch directory.
    """
    global _SCRATCH_DIR
    if _SCRATCH_DIR is not None:
        return _SCRATCH_DIR

    if os.path.isdir("debian/.debputy") and not _DH_INTEGRATION_MODE:
        hosted_by_debhelper = False
    else:
        hosted_by_debhelper = (
            os.path.isdir("debian/.debhelper") or _DH_INTEGRATION_MODE
        )

    if hosted_by_debhelper:
        selected = "debian/.debhelper/_debputy/scratch-dir"
    else:
        selected = "debian/.debputy/scratch-dir"
    _SCRATCH_DIR = selected

    if create_if_not_exists:
        ensure_dir(selected)
        if not hosted_by_debhelper:
            generated_root_directory("debian/.debputy", internal_only=True)
    return selected
def generated_root_directory(path: str, *, internal_only: bool = False) -> None:
    """Mark the existing directory `path` as generated content.

    A `.gitignore` ignoring everything is always written. With
    `internal_only`, a `CACHEDIR.TAG` file is written as well.
    """
    root_dir = Path(path)
    root_dir.joinpath(".gitignore").write_text("*\n")
    # TODO: Should we add a "CACHEDIR.TAG" here? (Requires a relevant ignore rule
    if not internal_only:
        return
    root_dir.joinpath("CACHEDIR.TAG").write_bytes(
        b"Signature: 8a477f597d28d172789f06886806bc55"
    )
# Name of this process' container directory; chosen on first use.
_RUNTIME_CONTAINER_DIR_KEY: str | None = None


def generated_content_dir(
    *,
    package: Optional["BinaryPackage"] = None,
    subdir_key: str | None = None,
) -> str:
    """Return (creating it if needed) a directory for generated file system content.

    :param package: Package the content belongs to; "no-package" is used when omitted.
    :param subdir_key: Optional extra sub-directory below the package directory.
    :return: The path of the directory.
    """
    global _RUNTIME_CONTAINER_DIR_KEY
    first_run = _RUNTIME_CONTAINER_DIR_KEY is None
    if first_run:
        _RUNTIME_CONTAINER_DIR_KEY = f"_pb-{os.getpid()}"

    container_root = os.path.join(scratch_dir(), _RUNTIME_CONTAINER_DIR_KEY)

    if first_run and os.path.isdir(container_root):
        # In the unlikely case there is a re-run with exactly the same pid, `debputy` should not
        # see "stale" data.
        # TODO: Ideally, we would always clean up this directory on failure, but `atexit` is not
        # reliable enough for that and we do not have an obvious hook for it.
        shutil.rmtree(container_root)

    package_segment = f"pkg_{package.name}" if package else "no-package"
    segments = [container_root, "generated-fs-content", package_segment]
    if subdir_key is not None:
        segments.append(subdir_key)
    directory = os.path.join(*segments)

    os.makedirs(directory, exist_ok=True)
    return directory
# Perl configuration values as resolved by `resolve_perl_config`.
PerlConfigVars = collections.namedtuple(
    "PerlConfigVars", ["vendorlib", "vendorarch", "cross_inc_dir", "ld", "path_sep"]
)
# Perl's `version` and `debian_abi` as read by `_perl_config_data`.
PerlConfigData = collections.namedtuple("PerlConfigData", ["version", "debian_abi"])
# Cache of resolved `PerlConfigVars`, keyed by architecture.
_PERL_MODULE_DIRS: dict[str, PerlConfigVars] = {}
@functools.lru_cache(1)
def _perl_config_data() -> PerlConfigData:
    """Ask `perl` for its `version` and `debian_abi` config values (cached)."""
    perl_cmd = [
        "perl",
        "-MConfig",
        "-e",
        'print "$Config{version}\n$Config{debian_abi}\n"',
    ]
    raw_output = subprocess.check_output(perl_cmd)
    # One config value per output line, in the order of the namedtuple fields.
    values = raw_output.decode("utf-8").splitlines()
    return PerlConfigData(*values)
def _perl_version() -> str:
    """Return the `version` field of the cached Perl configuration."""
    config = _perl_config_data()
    return config.version
def perlxs_api_dependency() -> str:
    """Return the `perlapi-*` name, based on `debian_abi` or else on the Perl version."""
    # dh_perl used the build version of perl for this, so we will too. Most of the perl cross logic
    # assumes that the major version of build variant of Perl is the same as the host variant of Perl.
    config = _perl_config_data()
    abi = config.debian_abi
    api_suffix = config.version if abi is None or abi == "" else abi
    return f"perlapi-{api_suffix}"
def resolve_perl_config(
    dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable,
    dctrl_bin: Optional["BinaryPackage"],
) -> PerlConfigVars:
    """Resolve Perl's vendor directories, linker and path separator.

    The values are obtained by running `perl` and are cached per architecture
    in `_PERL_MODULE_DIRS`.

    :param dpkg_architecture_variables: Source of the cross-compilation state
      and of the host/build architecture values.
    :param dctrl_bin: When cross compiling, the package whose architecture and
      multiarch tuple are used; the current host values are used if it is None.
    :return: The resolved `PerlConfigVars`.
    :raises ValueError: If the perl snippet does not print exactly four lines.
    """
    global _PERL_MODULE_DIRS
    if dpkg_architecture_variables.is_cross_compiling:
        if dctrl_bin is not None:
            arch = dctrl_bin.resolved_architecture
            ma = dctrl_bin.deb_multiarch
        else:
            arch = dpkg_architecture_variables.current_host_arch
            ma = dpkg_architecture_variables.current_host_multiarch
    else:
        # We are de facto using the build-arch version of perl here; be explicit
        arch = "_build_arch_"
        ma = dpkg_architecture_variables["DEB_BUILD_MULTIARCH"]
    config_vars = _PERL_MODULE_DIRS.get(arch)
    if config_vars is None:
        cmd = ["perl"]
        if dpkg_architecture_variables.is_cross_compiling:
            version = _perl_version()
            cross_inc_dir = f"/usr/lib/{ma}/perl/cross-config-{version}"
            # FIXME: This should not fallback to "build-arch" but on the other hand, we use the perl module dirs
            # for every package at the moment. So mandating correct perl dirs implies mandating perl-xs-dev in
            # cross builds... meh.
            # NOTE(review): when Config.pm is missing, `cross_inc_dir` is still
            # stored in the result although it is not passed to perl - confirm
            # that this is intended.
            if os.path.exists(os.path.join(cross_inc_dir, "Config.pm")):
                cmd.append(f"-I{cross_inc_dir}")
        else:
            cross_inc_dir = None
        cmd.extend(
            [
                "-MConfig",
                "-e",
                'print "$Config{vendorlib}\n$Config{vendorarch}\n$Config{ld}\n$Config{path_sep}\n"',
            ]
        )
        # One value per line, in the order requested by the snippet above.
        output = subprocess.check_output(cmd).decode("utf-8").splitlines(keepends=False)
        if len(output) != 4:
            raise ValueError(
                "Internal error: Unable to determine the perl include directories:"
                f" Raw output from perl snippet: {output}"
            )
        config_vars = PerlConfigVars(
            vendorlib="/" + _normalize_path(output[0], with_prefix=False),
            vendorarch="/" + _normalize_path(output[1], with_prefix=False),
            cross_inc_dir=cross_inc_dir,
            ld=output[2],
            path_sep=output[3],
        )
        _PERL_MODULE_DIRS[arch] = config_vars
    return config_vars
@functools.lru_cache(1)
def detect_fakeroot() -> bool:
    """Tell whether the process appears to run under fakeroot (cached).

    The answer is true when the process reports uid 0 with LD_PRELOAD set,
    but `id -u` run without LD_PRELOAD reports a different uid.
    """
    if os.getuid() != 0 or "LD_PRELOAD" not in os.environ:
        return False
    env_without_preload = {k: v for k, v in os.environ.items() if k != "LD_PRELOAD"}
    try:
        uid_output = subprocess.check_output(["id", "-u"], env=env_without_preload)
    except subprocess.CalledProcessError:
        print(
            'Could not run "id -u" with LD_PRELOAD unset; assuming we are not run under fakeroot',
            file=sys.stderr,
        )
        return False
    return uid_output.strip() != b"0"
763@functools.lru_cache(1)
764def _sc_arg_max() -> int | None:
765 try:
766 return os.sysconf("SC_ARG_MAX")
767 except RuntimeError:
768 _warn("Could not resolve SC_ARG_MAX, falling back to a hard-coded limit")
769 return None
772def _split_xargs_args(
773 static_cmd: Sequence[str],
774 max_args_byte_len: int,
775 varargs: Iterable[str],
776 reuse_list_ok: bool,
777) -> Iterator[list[str]]:
778 static_cmd_len = len(static_cmd)
779 remaining_len = max_args_byte_len
780 pending_args = list(static_cmd)
781 for arg in varargs:
782 arg_len = len(arg.encode("utf-8")) + 1 # +1 for leading space
783 remaining_len -= arg_len
784 if not remaining_len:
785 if len(pending_args) <= static_cmd_len:
786 raise ValueError(
787 f"Could not fit a single argument into the command line !?"
788 f" {max_args_byte_len} (variable argument limit) < {arg_len} (argument length)"
789 )
790 yield pending_args
791 remaining_len = max_args_byte_len - arg_len
792 if reuse_list_ok:
793 pending_args.clear()
794 pending_args.extend(static_cmd)
795 else:
796 pending_args = list(static_cmd)
797 pending_args.append(arg)
799 if len(pending_args) > static_cmd_len:
800 yield pending_args
def xargs(
    static_cmd: Sequence[str],
    varargs: Iterable[str],
    *,
    env: Mapping[str, str] | None = None,
    reuse_list_ok: bool = False,
) -> Iterator[list[str]]:
    """Yield command lines made of `static_cmd` plus as many `varargs` as fit.

    :param static_cmd: Words that start every generated command line.
    :param varargs: The variable arguments to distribute over command lines.
    :param env: Environment the commands will run with; the process
      environment is assumed when omitted.
    :param reuse_list_ok: Forwarded to `_split_xargs_args`.
    :return: Iterator of complete command lines.
    """
    limit = _sc_arg_max()
    # len overshoots with one space explaining the -1. The _split_xargs_args
    # will account for the space for the first argument
    fixed_cost = len(static_cmd) - 1 + sum(len(a.encode("utf-8")) for a in static_cmd)
    if limit is None:
        # The 20 000 limit is from debhelper, and it did not account for environment. So neither will we here.
        limit = 20_000
    else:
        # +2 for nul bytes after key and value
        if env is None:
            env_cost = sum(len(k) + len(v) + 2 for k, v in os.environb.items())
        else:
            env_cost = sum(
                len(k.encode("utf-8")) + len(v.encode("utf-8")) + 2
                for k, v in env.items()
            )
        fixed_cost += env_cost
        # Add a fixed buffer for OS overhead here (in case env and cmd both must be page-aligned or something like
        # that)
        fixed_cost += 2 * 4096
    yield from _split_xargs_args(
        static_cmd, limit - fixed_cost, varargs, reuse_list_ok
    )
836# itertools recipe
837def grouper(
838 iterable: Iterable[T],
839 n: int,
840 *,
841 incomplete: Literal["fill", "strict", "ignore"] = "fill",
842 fillvalue: T | None = None,
843) -> Iterator[tuple[T, ...]]:
844 """Collect data into non-overlapping fixed-length chunks or blocks"""
845 # grouper('ABCDEFG', 3, fillvalue='x') --> ABC DEF Gxx
846 # grouper('ABCDEFG', 3, incomplete='strict') --> ABC DEF ValueError
847 # grouper('ABCDEFG', 3, incomplete='ignore') --> ABC DEF
848 args = [iter(iterable)] * n
849 if incomplete == "fill": 849 ↛ 850line 849 didn't jump to line 850 because the condition on line 849 was never true
850 return zip_longest(*args, fillvalue=fillvalue)
851 if incomplete == "strict": 851 ↛ 853line 851 didn't jump to line 853 because the condition on line 851 was always true
852 return zip(*args, strict=True)
853 if incomplete == "ignore":
854 return zip(*args)
855 else:
856 raise ValueError("Expected fill, strict, or ignore")
859_LOGGING_SET_UP = False
862def _check_color() -> tuple[bool, bool, str | None]:
863 dpkg_or_default = os.environ.get(
864 "DPKG_COLORS", "never" if "NO_COLOR" in os.environ else "auto"
865 )
866 requested_color = os.environ.get("DEBPUTY_COLORS", dpkg_or_default)
867 bad_request = None
868 if requested_color not in {"auto", "always", "never"}: 868 ↛ 869line 868 didn't jump to line 869 because the condition on line 868 was never true
869 bad_request = requested_color
870 requested_color = "auto"
872 if requested_color == "auto": 872 ↛ 876line 872 didn't jump to line 876 because the condition on line 872 was always true
873 stdout_color = sys.stdout.isatty()
874 stderr_color = sys.stdout.isatty()
875 else:
876 enable = requested_color == "always"
877 stdout_color = enable
878 stderr_color = enable
879 return stdout_color, stderr_color, bad_request
def program_name() -> str:
    """Derive the program name from `sys.argv[0]`.

    A ".py" suffix is dropped, a `__main__` script is named after its
    directory and "debputy_cmd" is reported as "debputy".
    """
    script = sys.argv[0]
    name = os.path.basename(script).removesuffix(".py")
    if name == "__main__":
        name = os.path.basename(os.path.dirname(script))
    # FIXME: Not optimal that we have to hardcode these kind of things here
    return "debputy" if name == "debputy_cmd" else name
def package_cross_check_precheck(
    pkg_a: "BinaryPackage",
    pkg_b: "BinaryPackage",
) -> tuple[bool, bool]:
    """Whether these two packages can do content cross-checks

    :param pkg_a: The first package
    :param pkg_b: The second package
    :return: A tuple of two booleans. If the first is True, then pkg_a may do content cross-checks
      that involve pkg_b. If the second is True, then pkg_b may do content cross-checks
      that involve pkg_a. Both can be True and both can be False at the same time, which
      happens in common cases (arch:all + arch:any cases both to be False as a common example).
    """

    # Handle the two most obvious base-cases
    if not pkg_a.should_be_acted_on or not pkg_b.should_be_acted_on:
        return False, False
    if pkg_a.is_arch_all ^ pkg_b.is_arch_all:
        return False, False

    a_may_see_b = True
    b_may_see_a = True

    a_bp = pkg_a.fields.get("Build-Profiles", "")
    b_bp = pkg_b.fields.get("Build-Profiles", "")

    if a_bp != b_bp:
        a_bp_set = _parse_build_profiles(a_bp) if a_bp != "" else frozenset()
        b_bp_set = _parse_build_profiles(b_bp) if b_bp != "" else frozenset()

        # Check for build profiles being identically but just ordered differently.
        if a_bp_set != b_bp_set:
            # For simplicity, we let groups cancel each other out. If one side has no clauses
            # left, then it will always be built when the other is built.
            #
            # Eventually, someone will be here with a special case where more complex logic is
            # required. Good luck to you! Remember to add test cases for it (the existing logic
            # has some for a reason and if the logic is going to be more complex, it will need
            # tests cases to assert it fixes the problem and does not regress)
            if a_bp_set - b_bp_set:
                a_may_see_b = False
            if b_bp_set - a_bp_set:
                b_may_see_a = False

    if pkg_a.declared_architecture != pkg_b.declared_architecture:
        # Also here we could do a subset check, but wildcards vs. non-wildcards make that a pain
        if pkg_a.declared_architecture != "any":
            # pkg_a is not built everywhere pkg_b is, so pkg_b cannot rely on it.
            b_may_see_a = False
        # Symmetric check on pkg_b. This used to test pkg_a a second time, so
        # both flags were cleared together and pkg_b was never considered.
        if pkg_b.declared_architecture != "any":
            a_may_see_b = False

    return a_may_see_b, b_may_see_a
def change_log_level(
    log_level: int,
) -> None:
    """Apply ``log_level`` to the default logger (if any) and to the root logger.

    :param log_level: The numeric logging level to set.
    """
    default_logger = _DEFAULT_LOGGER
    if default_logger is not None:
        default_logger.setLevel(log_level)
    # The root logger is always updated, even when no default logger has been
    # created yet.
    root_logger = logging.getLogger("")
    root_logger.setLevel(log_level)
def current_log_level() -> int | None:
    """Return the level of the default logger.

    :return: The ``level`` attribute of the default logger, or ``None`` when no
      default logger has been set.
    """
    default_logger = _DEFAULT_LOGGER
    return None if default_logger is None else default_logger.level
def setup_logging(
    *,
    log_only_to_stderr: bool = False,
    reconfigure_logging: bool = False,
) -> None:
    """Install the stdout/stderr log handlers on the root logger.

    Two stream handlers are attached to the root logger: one that only emits
    records below ``logging.WARN`` and one that only emits records at
    ``logging.WARN`` or above. The former writes to ``sys.stdout`` (or to
    ``sys.stderr`` when ``log_only_to_stderr`` is true); the latter always
    writes to ``sys.stderr``. Handlers installed by a previous call are
    removed from the root logger before their replacements are added.

    The function also installs a log record factory that strips leading
    underscores from ``levelname`` and adds a ``levelnamelower`` attribute
    (used by the log formats), resets the root logger level to
    ``logging.WARN`` and sets the module's default logger to the logger named
    after ``program_name()``.

    :param log_only_to_stderr: If true, the "below WARN" handler writes to
      ``sys.stderr`` instead of ``sys.stdout`` and uses the stderr color
      setting.
    :param reconfigure_logging: Must be true when logging has already been set
      up; otherwise the call is rejected.
    :raises RuntimeError: If logging was already set up and
      ``reconfigure_logging`` is false.
    """
    global _LOGGING_SET_UP, _DEFAULT_LOGGER, _STDOUT_HANDLER, _STDERR_HANDLER
    if _LOGGING_SET_UP and not reconfigure_logging:
        raise RuntimeError(
            "Logging has already been configured."
            " Use reconfigure_logging=True if you need to reconfigure it"
        )
    stdout_color, stderr_color, bad_request = _check_color()
    colors: dict[str, str] | None = None

    if stdout_color or stderr_color:
        try:
            import colorlog

        except ImportError:
            # Without colorlog we silently fall back to colorless output.
            stdout_color = False
            stderr_color = False
        else:
            colors = dict(colorlog.default_log_colors)
            # Add our custom levels.
            colors["_INFO"] = colors["INFO"]
            colors["__INFO"] = colors["INFO"]

    if log_only_to_stderr:
        stdout = sys.stderr
        stdout_color = stderr_color
    else:
        stdout = sys.stdout

    class LogLevelFilter(logging.Filter):
        """Pass records on one side of ``threshold`` only.

        With ``above`` true, records at or above the threshold pass; otherwise
        only records strictly below it pass.
        """

        def __init__(self, threshold: int, above: bool):
            super().__init__()
            self.threshold = threshold
            self.above = above

        def filter(self, record: logging.LogRecord) -> bool:
            if self.above:
                return record.levelno >= self.threshold
            else:
                return record.levelno < self.threshold

    color_format = (
        "{bold}{name}{reset}: {bold}{log_color}{levelnamelower}{reset}: {message}"
    )
    colorless_format = "{name}: {levelnamelower}: {message}"

    def _new_handler(stream: Any, use_color: bool) -> logging.Handler:
        # Build a stream handler with the colored or the plain formatter.
        # `colorlog` is only referenced when `use_color` is true, which implies
        # the import above succeeded.
        if use_color:
            handler = colorlog.StreamHandler(stream)
            handler.setFormatter(
                colorlog.ColoredFormatter(
                    color_format,
                    style="{",
                    force_color=True,
                    log_colors=colors,
                )
            )
        else:
            handler = logging.StreamHandler(stream)
            handler.setFormatter(logging.Formatter(colorless_format, style="{"))
        return handler

    def _swap_handler(
        previous: logging.Handler | None,
        replacement: logging.Handler,
    ) -> None:
        # Replace a handler installed by an earlier call (if any) on the root
        # logger, so reconfiguring does not produce duplicate output.
        root_logger = logging.getLogger()
        if previous is not None:
            root_logger.removeHandler(previous)
        root_logger.addHandler(replacement)

    stdout_handler = _new_handler(stdout, stdout_color)
    _swap_handler(_STDOUT_HANDLER, stdout_handler)
    _STDOUT_HANDLER = stdout_handler

    stderr_handler = _new_handler(sys.stderr, stderr_color)
    _swap_handler(_STDERR_HANDLER, stderr_handler)
    _STDERR_HANDLER = stderr_handler

    # Split the output by severity: below WARN on the "stdout" handler, WARN
    # and above on the stderr handler.
    stdout_handler.addFilter(LogLevelFilter(logging.WARN, False))
    stderr_handler.addFilter(LogLevelFilter(logging.WARN, True))

    name = program_name()

    old_factory = logging.getLogRecordFactory()

    # Only wrap the record factory when the current one is not already our
    # wrapper. Otherwise every reconfiguration would add another layer to the
    # factory chain, making each log record more expensive to create.
    if not getattr(old_factory, "_debputy_record_factory", False):

        def record_factory(
            *args: Any, **kwargs: Any
        ) -> logging.LogRecord:  # pragma: no cover
            record = old_factory(*args, **kwargs)
            # Custom levels are registered with leading underscores; present
            # them under their plain name.
            record.levelname = record.levelname.lstrip("_")
            record.levelnamelower = record.levelname.lower()
            return record

        setattr(record_factory, "_debputy_record_factory", True)
        logging.setLogRecordFactory(record_factory)

    logging.getLogger().setLevel(logging.WARN)
    _DEFAULT_LOGGER = logging.getLogger(name)

    if bad_request:
        _DEFAULT_LOGGER.warning(
            f'Invalid color request for "{bad_request}" in either DEBPUTY_COLORS or DPKG_COLORS.'
            ' Resetting to "auto".'
        )

    _LOGGING_SET_UP = True