Coverage for src/debputy/util.py: 64%
560 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
1import argparse
2import collections
3import functools
4import glob
5import logging
6import os
7import re
8import shutil
9import subprocess
10import sys
11import time
12from itertools import zip_longest
13from pathlib import Path
14from typing import (
15 NoReturn,
16 TYPE_CHECKING,
17 Union,
18 Set,
19 FrozenSet,
20 Optional,
21 TypeVar,
22 Dict,
23 Literal,
24 Tuple,
25 List,
26 Any,
27)
28from collections.abc import Iterator, Iterable, Sequence, Mapping
30from debian.deb822 import Deb822
32from debputy import DEBPUTY_DOC_ROOT_DIR
33from debputy.architecture_support import DpkgArchitectureBuildProcessValuesTable
34from debputy.exceptions import DebputySubstitutionError
# Optional typo-detection support: python3-levenshtein provides a fast edit
# distance implementation. Without it, typo detection degrades to a no-op
# (same API, never any suggestions) rather than failing at import time.
try:
    from Levenshtein import distance
except ImportError:
    CAN_DETECT_TYPOS = False

    def detect_possible_typo(
        provided_value: str,
        known_values: Iterable[str],
        *,
        max_edit_distance: int = 2,
    ) -> Sequence[str]:
        """Return candidate corrections for a possibly misspelled value.

        Fallback used when `Levenshtein` is unavailable; always returns an
        empty sequence.
        """
        return ()

else:
    CAN_DETECT_TYPOS = True

    def detect_possible_typo(
        provided_value: str,
        known_values: Iterable[str],
        *,
        max_edit_distance: int = 2,
    ) -> Sequence[str]:
        """Return the known values within `max_edit_distance` of `provided_value`.

        :param provided_value: The (possibly misspelled) input value.
        :param known_values: Valid values to compare against.
        :param max_edit_distance: Maximum Levenshtein distance for a match.
        :return: Matching known values (possibly empty).
        """
        k_len = len(provided_value)
        candidates = []
        for known_value in known_values:
            # Cheap pre-filter: a length difference larger than the edit
            # budget can never be within the distance limit.
            if abs(k_len - len(known_value)) > max_edit_distance:
                continue
            d = distance(provided_value, known_value)
            if d > max_edit_distance:
                continue
            candidates.append(known_value)
        return candidates
72if TYPE_CHECKING:
73 from debputy.types import EnvironmentModification
74 from debputy.packages import BinaryPackage
75 from debputy.substitution import Substitution
T = TypeVar("T")


# Collapses runs of two or more slashes into a single "/".
SLASH_PRUNE = re.compile("//+")
# Debian binary package name (lowercase alphanumerics plus "-+.", min. 2 chars).
PKGNAME_REGEX = re.compile(r"[a-z0-9][-+.a-z0-9]+", re.ASCII)
# Debian package version: optional epoch, upstream version, optional revision.
PKGVERSION_REGEX = re.compile(
    r"""
    (?: \d+ : )?                # Optional epoch
    \d[0-9A-Za-z.+:~]*          # Upstream version (with no hyphens)
    (?: - [0-9A-Za-z.+:~]+ )*   # Optional debian revision (+ upstreams versions with hyphens)
""",
    re.VERBOSE | re.ASCII,
)
DEFAULT_PACKAGE_TYPE = "deb"
DBGSYM_PACKAGE_TYPE = "deb"
UDEB_PACKAGE_TYPE = "udeb"

# Default maintainer-script guard for postinst snippets.
POSTINST_DEFAULT_CONDITION = (
    '[ "$1" = "configure" ]'
    ' || [ "$1" = "abort-upgrade" ]'
    ' || [ "$1" = "abort-deconfigure" ]'
    ' || [ "$1" = "abort-remove" ]'
)


# Regexes used for shell-escaping words in rendered commands.
_SPACE_RE = re.compile(r"\s")
_WORD_EQUAL = re.compile(r"^-*[\w_\-]+=")
_DOUBLE_ESCAPEES = re.compile(r'([\n`$"\\])')
_REGULAR_ESCAPEES = re.compile(r"""([\s!"$()*+#;<>?@'\[\]\\`|~])""")
# Splits "<g1> <g2>" Build-Profiles groups on the ">" / "<" boundary.
_PROFILE_GROUP_SPLIT = re.compile(r">\s+<")
# Logging state; set up via `setup_logging()`.
_DEFAULT_LOGGER: logging.Logger | None = None
_STDOUT_HANDLER: logging.StreamHandler[Any] | None = None
_STDERR_HANDLER: logging.StreamHandler[Any] | None = None
# Custom log levels for echoing commands being run (slightly above INFO).
PRINT_COMMAND = logging.INFO + 3
PRINT_BUILD_SYSTEM_COMMAND = PRINT_COMMAND + 3
TRACE_LOG = logging.DEBUG + 3

# Map them back to `INFO`. The names must be unique so the prefix is stripped.
logging.addLevelName(PRINT_COMMAND, "__INFO")
logging.addLevelName(PRINT_BUILD_SYSTEM_COMMAND, "_INFO")
logging.addLevelName(TRACE_LOG, "TRACE")
def assume_not_none(x: T | None) -> T:
    """Assert that `x` is not None and return it unchanged.

    :raises ValueError: If `x` is None (indicates an internal error).
    """
    if x is not None:
        return x
    raise ValueError(  # pragma: no cover
        'Internal error: None was given, but the receiver assumed "not None" here'
    )
def _non_verbose_info(msg: str) -> None:
    """Log `msg` at the build-system command level, if logging is set up."""
    active_logger = _DEFAULT_LOGGER
    if active_logger is None:
        return
    active_logger.log(PRINT_BUILD_SYSTEM_COMMAND, msg)
def _info(msg: str) -> None:
    """Log `msg` at INFO level; a silent no-op when logging is not set up."""
    active_logger = _DEFAULT_LOGGER
    if active_logger:
        active_logger.info(msg)
    # No fallback print for info
def _is_trace_log_enabled() -> bool:
    """Return True when a logger is configured and TRACE_LOG is enabled."""
    active_logger = _DEFAULT_LOGGER
    return active_logger is not None and active_logger.isEnabledFor(TRACE_LOG)
def _trace_log(msg: str) -> None:
    """Log `msg` at the custom TRACE level; no-op without a configured logger."""
    active_logger = _DEFAULT_LOGGER
    if active_logger is not None:
        active_logger.log(TRACE_LOG, msg)
    # No fallback print for this level
def _is_debug_log_enabled() -> bool:
    """Return True when a logger is configured and DEBUG is enabled."""
    active_logger = _DEFAULT_LOGGER
    return active_logger is not None and active_logger.isEnabledFor(logging.DEBUG)
def _debug_log(msg: str) -> None:
    """Log `msg` at DEBUG level; no-op without a configured logger."""
    active_logger = _DEFAULT_LOGGER
    if active_logger is not None:
        active_logger.debug(msg)
    # No fallback print for this level
def _error(msg: str, *, prog: str | None = None) -> "NoReturn":
    """Report an error and terminate with exit status 1.

    Uses the configured logger when available; otherwise prints a
    "<prog>: error: <msg>" line to stderr.
    """
    active_logger = _DEFAULT_LOGGER
    if active_logger:
        active_logger.error(msg)
    else:
        me = prog if prog is not None else os.path.basename(sys.argv[0])
        print(f"{me}: error: {msg}", file=sys.stderr)
    sys.exit(1)
def _warn(msg: str, *, prog: str | None = None) -> None:
    """Report a warning (does not terminate).

    Uses the configured logger when available; otherwise prints a
    "<prog>: warning: <msg>" line to stderr.
    """
    active_logger = _DEFAULT_LOGGER
    if active_logger:
        active_logger.warning(msg)
    else:
        me = prog if prog is not None else os.path.basename(sys.argv[0])
        print(f"{me}: warning: {msg}", file=sys.stderr)
class ColorizedArgumentParser(argparse.ArgumentParser):
    """ArgumentParser variant that routes parse errors through `_error`.

    This keeps debputy's error formatting/exit behavior consistent between
    argparse failures and internal errors.
    """

    def error(self, message: str) -> NoReturn:
        # Mirror argparse's default behavior (usage first), but emit the
        # message via the shared error helper (which exits with status 1).
        self.print_usage(sys.stderr)
        _error(message, prog=self.prog)
def ensure_dir(path: str) -> None:
    """Create `path` (mode 0755, including parents) unless it already exists."""
    if os.path.isdir(path):
        return
    os.makedirs(path, mode=0o755, exist_ok=True)
def _clean_path(orig_p: str, allow_and_keep_upward_segments: bool = False) -> str:
    """Collapse duplicated slashes and handle "."/".." segments in a path.

    With `allow_and_keep_upward_segments`, ".." segments are kept verbatim
    and "." segments are dropped; otherwise any "." or ".." segment is
    rejected with a ValueError.
    """
    p = SLASH_PRUNE.sub("/", orig_p)
    if "." not in p:
        return p
    # We permit a single leading "./" because we add that when we normalize a path, and we want normalization
    # of a normalized path to be a no-op.
    path_base = p[2:] if p.startswith("./") else p
    assert path_base
    segments = path_base.split("/")
    if allow_and_keep_upward_segments:
        kept = [segment for segment in segments if segment != "."]
        p = "/".join(kept)
        # NOTE(review): for absolute inputs this adds a second leading "/"
        # (the empty head segment already restores one); this branch appears
        # unused in practice - confirm before relying on it.
        if path_base.startswith("/"):
            p = "/" + p
    elif any(segment in (".", "..") for segment in segments):
        raise ValueError(
            'Please provide paths that are normalized (i.e., no ".." or ".").'
            f' Offending input "{orig_p}"'
        )
    return p
239def _normalize_path(
240 path: str,
241 with_prefix: bool = True,
242 allow_and_keep_upward_segments: bool = False,
243) -> str:
244 path = path.strip("/")
245 if not path or path == ".": 245 ↛ 246line 245 didn't jump to line 246 because the condition on line 245 was never true
246 return "."
247 if "//" in path or "." in path:
248 path = _clean_path(
249 path,
250 allow_and_keep_upward_segments=allow_and_keep_upward_segments,
251 )
252 if with_prefix ^ path.startswith("./"):
253 if with_prefix: 253 ↛ 256line 253 didn't jump to line 256 because the condition on line 253 was always true
254 path = "./" + path
255 else:
256 path = path[2:]
257 return path
def _normalize_link_target(link_target: str) -> str:
    """Normalize a symlink target, resolving "", "." and ".." segments.

    Upward segments that would escape the root are silently dropped,
    mirroring how the OS maps "/.." to "/".
    """
    pruned = SLASH_PRUNE.sub("/", link_target.lstrip("/"))
    resolved: list[str] = []
    for segment in pruned.split("/"):
        if segment in (".", ""):
            # Ignore these - the empty string is generally a trailing slash
            continue
        if segment != "..":
            resolved.append(segment)
        elif resolved:
            # We ignore "root escape attempts" like the OS would (mapping /.. -> /)
            resolved.pop()
    return "/".join(resolved)
def manifest_format_doc(anchor: str) -> str:
    """Return the MANIFEST-FORMAT doc URL, optionally with a "#anchor"."""
    base = f"{DEBPUTY_DOC_ROOT_DIR}/MANIFEST-FORMAT.md"
    if anchor:
        return f"{base}#{anchor}"
    return base
281def _backslash_escape(m: re.Match[str]) -> str:
282 return "\\" + m.group(0)
def _escape_shell_word(w: str) -> str:
    """Escape one word for display as part of a shell command line.

    Whitespace-containing words are double-quoted (keeping a leading `VAR=`
    prefix outside the quotes for readability); other words get shell
    metacharacters backslash-escaped.
    """
    if not _SPACE_RE.search(w):
        return _REGULAR_ESCAPEES.sub(_backslash_escape, w)
    m = _WORD_EQUAL.search(w) if "=" in w else None
    if m is not None:
        span = m.span(0)
        assert span[0] == 0
        prefix = w[0 : span[1]]
        escaped_value = _DOUBLE_ESCAPEES.sub(_backslash_escape, w[span[1] :])
        return f'{prefix}"{escaped_value}"'
    escaped = _DOUBLE_ESCAPEES.sub(_backslash_escape, w)
    return f'"{escaped}"'
def escape_shell(*args: str) -> str:
    """Render `args` as a single display-friendly shell command string."""
    escaped_words = [_escape_shell_word(w) for w in args]
    return " ".join(escaped_words)
def render_command(
    *args: str,
    cwd: str | None = None,
    env_mod: Optional["EnvironmentModification"] = None,
) -> str:
    """Render a command (plus optional cwd/env changes) for display.

    :param args: The command and its arguments.
    :param cwd: Optional working directory, rendered as a `cd ... &&` prefix
        (or folded into `env --chdir=...` when env removals force an `env`
        prefix anyway).
    :param env_mod: Optional environment modification rendered as an
        `env`/`VAR=value` prefix.
    :return: The human-readable shell rendering of the command.
    """
    env_mod_prefix = ""
    if env_mod:
        env_mod_parts = []
        if bool(env_mod.removals):
            env_mod_parts.append("env")
            if cwd is not None:
                # `env --chdir` takes over the directory change; suppress the
                # separate `cd` prefix below.
                env_mod_parts.append(f"--chdir={escape_shell(cwd)}")
                cwd = None
            env_mod_parts.extend(f"--unset={escape_shell(v)}" for v in env_mod.removals)
        env_mod_parts.extend(
            f"{escape_shell(k)}={escape_shell(v)}" for k, v in env_mod.replacements
        )
        if env_mod_parts:
            # Bug fix: the parts were previously collected but never folded
            # into the prefix, so env modifications were missing from the
            # rendered command.
            env_mod_prefix = " ".join(env_mod_parts) + " "

    chdir_prefix = ""
    if cwd is not None and cwd != ".":
        chdir_prefix = f"cd {escape_shell(cwd)} && "
    return f"{chdir_prefix}{env_mod_prefix}{escape_shell(*args)}"
def print_command(
    *args: str,
    cwd: str | None = None,
    env_mod: Optional["EnvironmentModification"] = None,
    print_at_log_level: int = PRINT_COMMAND,
) -> None:
    """Echo a command about to be run, if logging is verbose enough."""
    active_logger = _DEFAULT_LOGGER
    if active_logger is None or not active_logger.isEnabledFor(print_at_log_level):
        return

    rendered_cmd = render_command(
        *args,
        cwd=cwd,
        env_mod=env_mod,
    )
    handler = _STDOUT_HANDLER
    if handler is not None:
        handler.flush()
    # Ensure command is output immediately so it is hanging after its output.
    # TODO: This should `file` in case something in debputy redirects stdout
    # (nothing does that for now)
    print(f" {rendered_cmd}")
    sys.stdout.flush()
def run_command(
    *args: str,
    cwd: str | None = None,
    env: Mapping[str, str] | None = None,
    env_mod: Optional["EnvironmentModification"] = None,
    print_at_log_level: int = PRINT_COMMAND,
    raise_file_not_found_on_missing_command: bool = False,
) -> None:
    """Run an external command, echoing it and mapping failures to `_error`.

    :raises FileNotFoundError: Only when the executable is missing and
        `raise_file_not_found_on_missing_command` is True.
    """
    print_command(
        *args,
        cwd=cwd,
        env_mod=env_mod,
        print_at_log_level=print_at_log_level,
    )
    if env_mod:
        base_env = os.environ if env is None else env
        env = env_mod.compute_env(base_env)
        if env is os.environ:
            env = None
    try:
        subprocess.check_call(args, cwd=cwd, env=env)
    # At least "clean_logic.py" relies on catching FileNotFoundError
    except KeyboardInterrupt:
        _error(f"Interrupted (SIGINT) while running {escape_shell(*args)}")
    except FileNotFoundError:
        if raise_file_not_found_on_missing_command:
            raise
        if "/" in args[0]:
            _error(f"Could not run {escape_shell(args[0])}: Path does not exist")
        # Use the `command not found` to aid existing log pattern
        _error(f"{escape_shell(args[0])}: command not found")
    except subprocess.CalledProcessError as e:
        _error(f"The command {escape_shell(*args)} failed with status: {e.returncode}")
def run_build_system_command(
    *args: str,
    cwd: str | None = None,
    env: Mapping[str, str] | None = None,
    env_mod: Optional["EnvironmentModification"] = None,
    print_at_log_level: int = PRINT_BUILD_SYSTEM_COMMAND,
    raise_file_not_found_on_missing_command: bool = False,
) -> None:
    """Like `run_command`, but echoed at the build-system command log level."""
    run_command(
        *args,
        cwd=cwd,
        env=env,
        env_mod=env_mod,
        print_at_log_level=print_at_log_level,
        raise_file_not_found_on_missing_command=raise_file_not_found_on_missing_command,
    )
def debian_policy_normalize_symlink_target(
    link_path: str,
    link_target: str,
    normalize_link_path: bool = False,
) -> str:
    """Normalize a symlink target per Debian Policy (§10.5).

    Targets that share the link's top-level directory become relative; all
    other targets become absolute.

    :param link_path: Path of the symlink itself, in the normalized "./..."
        form unless `normalize_link_path` is True.
    :param link_target: The desired symlink target.
    :param normalize_link_path: When True, normalize `link_path` first.
    :return: The policy-normalized link target.
    :raises ValueError: If `link_path` is not normalized and
        `normalize_link_path` is False.
    """
    if normalize_link_path:
        link_path = _normalize_path(link_path)
    elif not link_path.startswith("./"):
        raise ValueError("Link part was not normalized")

    link_path = link_path[2:]

    if not link_target.startswith("/"):
        # Resolve a relative target against the link's own directory.
        link_target = "/" + os.path.dirname(link_path) + "/" + link_target

    link_path_parts = link_path.split("/")
    link_target_parts = [
        s for s in _normalize_link_target(link_target).split("/") if s != "."
    ]

    assert link_path_parts

    if not link_target_parts or link_path_parts[0] != link_target_parts[0]:
        # Per Debian Policy, must be absolute
        return "/" + "/".join(link_target_parts)

    # Per Debian Policy, must be relative

    # First determine the length of the overlap
    common_segment_count = 1
    shortest_path_length = min(len(link_target_parts), len(link_path_parts))
    while (
        common_segment_count < shortest_path_length
        and link_target_parts[common_segment_count]
        == link_path_parts[common_segment_count]
    ):
        common_segment_count += 1

    if common_segment_count == shortest_path_length and len(
        link_path_parts
    ) - 1 == len(link_target_parts):
        # Link and target live in the same directory with full overlap.
        return "."

    up_dir_count = len(link_path_parts) - 1 - common_segment_count
    relative_parts = []
    if up_dir_count:
        # We overshoot with a single '/', so rstrip it away
        relative_parts.append(("../" * up_dir_count).rstrip("/"))
    # Add the relevant down parts
    relative_parts.extend(link_target_parts[common_segment_count:])

    return "/".join(relative_parts)
def has_glob_magic(pattern: str) -> bool:
    """Return True when `pattern` contains glob magic ("*", "?", "[") or "{"."""
    return "{" in pattern or glob.has_magic(pattern)
def glob_escape(replacement_value: str) -> str:
    """Escape a literal string for safe embedding in a glob pattern.

    Wraps every glob metacharacter ("[", "]", "*", "?", "{", "}") in a
    character class so it matches literally.
    """
    # Bug fix: the early return previously used `or`, which skipped escaping
    # for any value lacking "{" even when it contained other glob magic
    # (e.g. "a*" was returned unescaped).
    if not glob.has_magic(replacement_value) and "{" not in replacement_value:
        return replacement_value
    # NOTE(review): the chained replaces interact for inputs containing "["
    # or "]" (the inserted escapes are themselves re-escaped) - confirm
    # whether such values can occur here.
    return (
        replacement_value.replace("[", "[[]")
        .replace("]", "[]]")
        .replace("*", "[*]")
        .replace("?", "[?]")
        .replace("{", "[{]")
        .replace("}", "[}]")
    )
480# TODO: This logic should probably be moved to `python-debian`
# TODO: This logic should probably be moved to `python-debian`
def active_profiles_match(
    profiles_raw: str,
    active_build_profiles: set[str] | frozenset[str],
) -> bool:
    """Evaluate a Build-Profiles field against the active build profiles.

    The field is a disjunction of groups ("<g1> <g2>"); each group is a
    conjunction of (possibly "!"-negated) profile names.

    :raises ValueError: If the field is not wrapped in "<...>".
    """
    profiles_raw = profiles_raw.strip()
    if profiles_raw[0] != "<" or profiles_raw[-1] != ">" or profiles_raw == "<>":
        raise ValueError(
            'Invalid Build-Profiles: Must start start and end with "<" + ">" but cannot be a literal "<>"'
        )
    for profile_group_raw in _PROFILE_GROUP_SPLIT.split(profiles_raw[1:-1]):
        # A group matches when every term in it is satisfied.
        for term in profile_group_raw.split():
            negation = term.startswith("!")
            profile_name = term[1:] if negation else term
            if (profile_name in active_build_profiles) == negation:
                break
        else:
            return True
    return False
def _parse_build_profiles(build_profiles_raw: str) -> frozenset[frozenset[str]]:
    """Parse a Build-Profiles field into a set of profile-name groups.

    :raises ValueError: If the field is not wrapped in "<...>".
    """
    profiles_raw = build_profiles_raw.strip()
    if profiles_raw[0] != "<" or profiles_raw[-1] != ">" or profiles_raw == "<>":
        raise ValueError(
            'Invalid Build-Profiles: Must start start and end with "<" + ">" but cannot be a literal "<>"'
        )
    groups = _PROFILE_GROUP_SPLIT.split(profiles_raw[1:-1])
    return frozenset(frozenset(group.split()) for group in groups)
520def resolve_source_date_epoch(
521 command_line_value: int | None,
522 *,
523 substitution: Optional["Substitution"] = None,
524) -> int:
525 mtime = command_line_value
526 if mtime is None and "SOURCE_DATE_EPOCH" in os.environ:
527 sde_raw = os.environ["SOURCE_DATE_EPOCH"]
528 if sde_raw == "":
529 _error("SOURCE_DATE_EPOCH is set but empty.")
530 mtime = int(sde_raw)
531 if mtime is None and substitution is not None:
532 try:
533 sde_raw = substitution.substitute(
534 "{{SOURCE_DATE_EPOCH}}",
535 "Internal resolution",
536 )
537 mtime = int(sde_raw)
538 except (DebputySubstitutionError, ValueError):
539 pass
540 if mtime is None:
541 mtime = int(time.time())
542 os.environ["SOURCE_DATE_EPOCH"] = str(mtime)
543 return mtime
def compute_output_filename(control_root_dir: str, is_udeb: bool) -> str:
    """Compute the .deb/.udeb filename from a package's control file.

    :param control_root_dir: Directory containing the binary package's
        `control` file (the "DEBIAN" directory).
    :param is_udeb: When True, force the ".udeb" extension.
    """
    with open(os.path.join(control_root_dir, "control")) as fd:
        control_file = Deb822(fd)

    package_name = control_file["Package"]
    package_version = control_file["Version"]
    # Prefer an explicit architecture variant over the plain field.
    if "Architecture-Variant" in control_file:
        package_architecture = control_file["Architecture-Variant"]
    else:
        package_architecture = control_file["Architecture"]
    extension = "udeb" if is_udeb else (control_file.get("Package-Type") or "deb")
    # The filename uses the version without the epoch.
    if ":" in package_version:
        package_version = package_version.split(":", 1)[1]

    return f"{package_name}_{package_version}_{package_architecture}.{extension}"
# Lazily resolved scratch directory; see `scratch_dir()`.
_SCRATCH_DIR = None
# True when debputy runs via its debhelper integration; this changes where
# the scratch directory is placed (see `scratch_dir()`).
_DH_INTEGRATION_MODE = False


def integrated_with_debhelper() -> None:
    """Record that debputy is being run via its debhelper integration."""
    global _DH_INTEGRATION_MODE
    _DH_INTEGRATION_MODE = True
def scratch_dir(*, create_if_not_exists: bool = True) -> str:
    """Return (and lazily create) debputy's scratch directory.

    In debhelper integration mode the directory lives under
    "debian/.debhelper"; otherwise under "debian/.debputy" (which is then
    marked as a generated directory).
    """
    global _SCRATCH_DIR
    if _SCRATCH_DIR is not None:
        return _SCRATCH_DIR
    debputy_scratch_dir = "debian/.debputy/scratch-dir"
    is_debputy_dir = True
    if os.path.isdir("debian/.debputy") and not _DH_INTEGRATION_MODE:
        _SCRATCH_DIR = debputy_scratch_dir
    elif os.path.isdir("debian/.debhelper") or _DH_INTEGRATION_MODE:
        _SCRATCH_DIR = "debian/.debhelper/_debputy/scratch-dir"
        is_debputy_dir = False
    else:
        _SCRATCH_DIR = debputy_scratch_dir
    if create_if_not_exists:
        ensure_dir(_SCRATCH_DIR)
        if is_debputy_dir:
            # Mark the directory as generated (gitignore + cache tag).
            generated_root_directory("debian/.debputy", internal_only=True)
    return _SCRATCH_DIR
def generated_root_directory(path: str, *, internal_only: bool = False) -> None:
    """Mark a generated root directory so VCS/backup tools ignore it.

    Writes a ".gitignore" matching everything; for internal-only directories
    a "CACHEDIR.TAG" marker is written as well.
    """
    root_dir = Path(path)
    (root_dir / ".gitignore").write_text("*\n")
    # TODO: Should we add a "CACHEDIR.TAG" here? (Requires a relevant ignore rule
    if internal_only:
        (root_dir / "CACHEDIR.TAG").write_bytes(
            b"Signature: 8a477f597d28d172789f06886806bc55"
        )
# Per-process container directory name inside the scratch dir (lazy).
_RUNTIME_CONTAINER_DIR_KEY: str | None = None


def generated_content_dir(
    *,
    package: Optional["BinaryPackage"] = None,
    subdir_key: str | None = None,
) -> str:
    """Return a per-invocation directory for generated file-system content.

    The directory is keyed by the current PID (wiped on the first use in a
    run so re-runs never see stale data) and optionally partitioned per
    package and sub-directory key.
    """
    global _RUNTIME_CONTAINER_DIR_KEY
    first_run = _RUNTIME_CONTAINER_DIR_KEY is None
    if first_run:
        _RUNTIME_CONTAINER_DIR_KEY = f"_pb-{os.getpid()}"

    container_dir = os.path.join(scratch_dir(), _RUNTIME_CONTAINER_DIR_KEY)

    if first_run and os.path.isdir(container_dir):
        # In the unlikely case there is a re-run with exactly the same pid, `debputy` should not
        # see "stale" data.
        # TODO: Ideally, we would always clean up this directory on failure, but `atexit` is not
        # reliable enough for that and we do not have an obvious hook for it.
        shutil.rmtree(container_dir)

    directory = os.path.join(
        container_dir,
        "generated-fs-content",
        f"pkg_{package.name}" if package else "no-package",
    )
    if subdir_key is not None:
        directory = os.path.join(directory, subdir_key)

    os.makedirs(directory, exist_ok=True)
    return directory
# Perl configuration values resolved via `resolve_perl_config()`.
PerlConfigVars = collections.namedtuple(
    "PerlConfigVars", ["vendorlib", "vendorarch", "cross_inc_dir", "ld", "path_sep"]
)
# Version/ABI information about the build Perl (see `_perl_config_data()`).
PerlConfigData = collections.namedtuple("PerlConfigData", ["version", "debian_abi"])
# Cache for `resolve_perl_config()`, keyed by effective architecture.
_PERL_MODULE_DIRS: dict[str, PerlConfigVars] = {}
@functools.lru_cache(1)
def _perl_config_data() -> PerlConfigData:
    """Query the build Perl for its version and Debian ABI (cached)."""
    lines = (
        subprocess.check_output(
            [
                "perl",
                "-MConfig",
                "-e",
                'print "$Config{version}\n$Config{debian_abi}\n"',
            ]
        )
        .decode("utf-8")
        .splitlines()
    )
    return PerlConfigData(*lines)
def _perl_version() -> str:
    """Return the version of the build Perl."""
    return _perl_config_data().version
def perlxs_api_dependency() -> str:
    """Return the perlapi-* dependency used by Perl XS modules.

    dh_perl used the build version of perl for this, so we will too. Most of
    the perl cross logic assumes that the major version of build variant of
    Perl is the same as the host variant of Perl.
    """
    config = _perl_config_data()
    abi = config.debian_abi
    if abi:
        return f"perlapi-{abi}"
    return f"perlapi-{config.version}"
def resolve_perl_config(
    dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable,
    dctrl_bin: Optional["BinaryPackage"],
) -> PerlConfigVars:
    """Resolve (and cache) Perl configuration for the relevant architecture.

    For native builds, the build Perl is queried directly; for cross builds,
    the host architecture's cross-config is used when available.
    """
    global _PERL_MODULE_DIRS
    if dpkg_architecture_variables.is_cross_compiling:
        if dctrl_bin is not None:
            arch = dctrl_bin.resolved_architecture
            ma = dctrl_bin.deb_multiarch
        else:
            arch = dpkg_architecture_variables.current_host_arch
            ma = dpkg_architecture_variables.current_host_multiarch
    else:
        # We are de facto using the build-arch version of perl here; be explicit
        arch = "_build_arch_"
        ma = dpkg_architecture_variables["DEB_BUILD_MULTIARCH"]
    cached = _PERL_MODULE_DIRS.get(arch)
    if cached is not None:
        return cached
    cmd = ["perl"]
    if dpkg_architecture_variables.is_cross_compiling:
        version = _perl_version()
        cross_inc_dir = f"/usr/lib/{ma}/perl/cross-config-{version}"
        # FIXME: This should not fallback to "build-arch" but on the other hand, we use the perl module dirs
        # for every package at the moment. So mandating correct perl dirs implies mandating perl-xs-dev in
        # cross builds... meh.
        if os.path.exists(os.path.join(cross_inc_dir, "Config.pm")):
            cmd.append(f"-I{cross_inc_dir}")
    else:
        cross_inc_dir = None
    cmd.extend(
        [
            "-MConfig",
            "-e",
            'print "$Config{vendorlib}\n$Config{vendorarch}\n$Config{ld}\n$Config{path_sep}\n"',
        ]
    )
    output = subprocess.check_output(cmd).decode("utf-8").splitlines(keepends=False)
    if len(output) != 4:
        raise ValueError(
            "Internal error: Unable to determine the perl include directories:"
            f" Raw output from perl snippet: {output}"
        )
    config_vars = PerlConfigVars(
        vendorlib="/" + _normalize_path(output[0], with_prefix=False),
        vendorarch="/" + _normalize_path(output[1], with_prefix=False),
        cross_inc_dir=cross_inc_dir,
        ld=output[2],
        path_sep=output[3],
    )
    _PERL_MODULE_DIRS[arch] = config_vars
    return config_vars
@functools.lru_cache(1)
def detect_fakeroot() -> bool:
    """Return True when we appear to be running under fakeroot(1).

    Heuristic: we look like uid 0 with LD_PRELOAD set, but `id -u` without
    the preload disagrees.
    """
    if os.getuid() != 0 or "LD_PRELOAD" not in os.environ:
        return False
    env = {k: v for k, v in os.environ.items() if k != "LD_PRELOAD"}
    try:
        return subprocess.check_output(["id", "-u"], env=env).strip() != b"0"
    except subprocess.CalledProcessError:
        print(
            'Could not run "id -u" with LD_PRELOAD unset; assuming we are not run under fakeroot',
            file=sys.stderr,
        )
        return False
748@functools.lru_cache(1)
749def _sc_arg_max() -> int | None:
750 try:
751 return os.sysconf("SC_ARG_MAX")
752 except RuntimeError:
753 _warn("Could not resolve SC_ARG_MAX, falling back to a hard-coded limit")
754 return None
757def _split_xargs_args(
758 static_cmd: Sequence[str],
759 max_args_byte_len: int,
760 varargs: Iterable[str],
761 reuse_list_ok: bool,
762) -> Iterator[list[str]]:
763 static_cmd_len = len(static_cmd)
764 remaining_len = max_args_byte_len
765 pending_args = list(static_cmd)
766 for arg in varargs:
767 arg_len = len(arg.encode("utf-8")) + 1 # +1 for leading space
768 remaining_len -= arg_len
769 if not remaining_len:
770 if len(pending_args) <= static_cmd_len:
771 raise ValueError(
772 f"Could not fit a single argument into the command line !?"
773 f" {max_args_byte_len} (variable argument limit) < {arg_len} (argument length)"
774 )
775 yield pending_args
776 remaining_len = max_args_byte_len - arg_len
777 if reuse_list_ok:
778 pending_args.clear()
779 pending_args.extend(static_cmd)
780 else:
781 pending_args = list(static_cmd)
782 pending_args.append(arg)
784 if len(pending_args) > static_cmd_len:
785 yield pending_args
def xargs(
    static_cmd: Sequence[str],
    varargs: Iterable[str],
    *,
    env: Mapping[str, str] | None = None,
    reuse_list_ok: bool = False,
) -> Iterator[list[str]]:
    """Yield xargs-style command batches that fit within the OS argv limit.

    :param static_cmd: Fixed command prefix for every batch.
    :param varargs: Variable arguments to distribute over the batches.
    :param env: Environment the command will run with (its size counts
        against the limit); defaults to the current environment.
    :param reuse_list_ok: See `_split_xargs_args`.
    """
    max_args_bytes = _sc_arg_max()
    # len overshoots with one space explaining the -1. The _split_xargs_args
    # will account for the space for the first argument
    static_byte_len = (
        len(static_cmd) - 1 + sum(len(a.encode("utf-8")) for a in static_cmd)
    )
    if max_args_bytes is None:
        # The 20 000 limit is from debhelper, and it did not account for environment. So neither will we here.
        max_args_bytes = 20_000
    else:
        if env is None:
            # +2 for nul bytes after key and value
            static_byte_len += sum(len(k) + len(v) + 2 for k, v in os.environb.items())
        else:
            # +2 for nul bytes after key and value
            static_byte_len += sum(
                len(k.encode("utf-8")) + len(v.encode("utf-8")) + 2
                for k, v in env.items()
            )
        # Add a fixed buffer for OS overhead here (in case env and cmd both must be page-aligned or something like
        # that)
        static_byte_len += 2 * 4096
    remain_len = max_args_bytes - static_byte_len
    yield from _split_xargs_args(static_cmd, remain_len, varargs, reuse_list_ok)
# itertools recipe
def grouper(
    iterable: Iterable[T],
    n: int,
    *,
    incomplete: Literal["fill", "strict", "ignore"] = "fill",
    fillvalue: T | None = None,
) -> Iterator[tuple[T, ...]]:
    """Collect data into non-overlapping fixed-length chunks or blocks.

    grouper('ABCDEFG', 3, fillvalue='x') --> ABC DEF Gxx
    grouper('ABCDEFG', 3, incomplete='strict') --> ABC DEF ValueError
    grouper('ABCDEFG', 3, incomplete='ignore') --> ABC DEF
    """
    chunk_iters = [iter(iterable)] * n
    if incomplete == "fill":
        return zip_longest(*chunk_iters, fillvalue=fillvalue)
    if incomplete == "strict":
        return zip(*chunk_iters, strict=True)
    if incomplete == "ignore":
        return zip(*chunk_iters)
    raise ValueError("Expected fill, strict, or ignore")
844_LOGGING_SET_UP = False
847def _check_color() -> tuple[bool, bool, str | None]:
848 dpkg_or_default = os.environ.get(
849 "DPKG_COLORS", "never" if "NO_COLOR" in os.environ else "auto"
850 )
851 requested_color = os.environ.get("DEBPUTY_COLORS", dpkg_or_default)
852 bad_request = None
853 if requested_color not in {"auto", "always", "never"}: 853 ↛ 854line 853 didn't jump to line 854 because the condition on line 853 was never true
854 bad_request = requested_color
855 requested_color = "auto"
857 if requested_color == "auto": 857 ↛ 861line 857 didn't jump to line 861 because the condition on line 857 was always true
858 stdout_color = sys.stdout.isatty()
859 stderr_color = sys.stdout.isatty()
860 else:
861 enable = requested_color == "always"
862 stdout_color = enable
863 stderr_color = enable
864 return stdout_color, stderr_color, bad_request
def program_name() -> str:
    """Return the (cleaned-up) name of the running program."""
    name = os.path.basename(sys.argv[0])
    if name.endswith(".py"):
        name = name[:-3]
    if name == "__main__":
        # Executed as a package/directory; use the directory name instead.
        name = os.path.basename(os.path.dirname(sys.argv[0]))
    # FIXME: Not optimal that we have to hardcode these kind of things here
    if name == "debputy_cmd":
        name = "debputy"
    return name
def package_cross_check_precheck(
    pkg_a: "BinaryPackage",
    pkg_b: "BinaryPackage",
) -> tuple[bool, bool]:
    """Whether these two packages can do content cross-checks

    :param pkg_a: The first package
    :param pkg_b: The second package
    :return: A tuple of two booleans. If the first is True, then binary_package_a may do content cross-checks
      that involves binary_package_b. If the second is True, then binary_package_b may do content cross-checks
      that involves binary_package_a. Both can be True and both can be False at the same time, which
      happens in common cases (arch:all + arch:any cases both to be False as a common example).
    """

    # Handle the two most obvious base-cases
    if not pkg_a.should_be_acted_on or not pkg_b.should_be_acted_on:
        return False, False
    if pkg_a.is_arch_all ^ pkg_b.is_arch_all:
        return False, False

    a_may_see_b = True
    b_may_see_a = True

    a_bp = pkg_a.fields.get("Build-Profiles", "")
    b_bp = pkg_b.fields.get("Build-Profiles", "")

    if a_bp != b_bp:
        a_bp_set = _parse_build_profiles(a_bp) if a_bp != "" else frozenset()
        b_bp_set = _parse_build_profiles(b_bp) if b_bp != "" else frozenset()

        # Check for build profiles being identically but just ordered differently.
        if a_bp_set != b_bp_set:
            # For simplicity, we let groups cancel each other out. If one side has no clauses
            # left, then it will always be built when the other is built.
            #
            # Eventually, someone will be here with a special case where more complex logic is
            # required. Good luck to you! Remember to add test cases for it (the existing logic
            # has some for a reason and if the logic is going to be more complex, it will need
            # tests cases to assert it fixes the problem and does not regress)
            if a_bp_set - b_bp_set:
                a_may_see_b = False
            if b_bp_set - a_bp_set:
                b_may_see_a = False

    if pkg_a.declared_architecture != pkg_b.declared_architecture:
        # Also here we could do a subset check, but wildcards vs. non-wildcards make that a pain
        if pkg_a.declared_architecture != "any":
            b_may_see_a = False
        # Bug fix: this second guard previously re-tested pkg_a's
        # architecture (copy-paste), so pkg_b's restriction never disabled
        # a_may_see_b.
        if pkg_b.declared_architecture != "any":
            a_may_see_b = False

    return a_may_see_b, b_may_see_a
def change_log_level(
    log_level: int,
) -> None:
    """Set the log level on debputy's logger and on the root logger."""
    active_logger = _DEFAULT_LOGGER
    if active_logger is not None:
        active_logger.setLevel(log_level)
    logging.getLogger("").setLevel(log_level)
def current_log_level() -> int | None:
    """Return the level of the debputy logger, or None if logging was never set up."""
    logger = _DEFAULT_LOGGER
    return logger.level if logger is not None else None
def setup_logging(
    *,
    log_only_to_stderr: bool = False,
    reconfigure_logging: bool = False,
) -> None:
    """Configure process-wide logging for debputy.

    Installs one handler for stdout (records strictly below WARNING) and one
    for stderr (records at WARNING and above).  When `_check_color()` says the
    relevant stream should be colorized and `colorlog` is importable, the
    handlers use colorized formatters; otherwise plain formatters are used.
    Also installs a log record factory that strips the leading underscores of
    debputy's custom "_INFO"/"__INFO" level names and exposes a lowercase
    ``levelnamelower`` attribute for the formatters.

    :param log_only_to_stderr: If True, route all log records to stderr
      (useful when stdout is reserved for machine-readable output).
    :param reconfigure_logging: Permit calling this function again after a
      previous successful setup.
    :raises RuntimeError: If logging was already set up and
      ``reconfigure_logging`` was not passed.
    """
    global _LOGGING_SET_UP, _DEFAULT_LOGGER, _STDOUT_HANDLER, _STDERR_HANDLER
    if _LOGGING_SET_UP and not reconfigure_logging:
        raise RuntimeError(
            "Logging has already been configured."
            " Use reconfigure_logging=True if you need to reconfigure it"
        )
    stdout_color, stderr_color, bad_request = _check_color()
    colors: dict[str, str] | None = None

    if stdout_color or stderr_color:
        try:
            import colorlog

        except ImportError:
            # Color was requested but colorlog is unavailable; degrade to
            # plain output rather than failing.
            stdout_color = False
            stderr_color = False
        else:
            colors = dict(colorlog.default_log_colors)
            # Add our custom levels.
            colors["_INFO"] = colors["INFO"]
            colors["__INFO"] = colors["INFO"]

    if log_only_to_stderr:
        stdout = sys.stderr
        stdout_color = stderr_color
    else:
        stdout = sys.stdout

    class LogLevelFilter(logging.Filter):
        # Keeps records at/above the threshold (above=True) or strictly
        # below it (above=False); used to split records between the two
        # stream handlers.
        def __init__(self, threshold: int, above: bool) -> None:
            super().__init__()
            self.threshold = threshold
            self.above = above

        def filter(self, record: logging.LogRecord) -> bool:
            if self.above:
                return record.levelno >= self.threshold
            else:
                return record.levelno < self.threshold

    color_format = (
        "{bold}{name}{reset}: {bold}{log_color}{levelnamelower}{reset}: {message}"
    )
    colorless_format = "{name}: {levelnamelower}: {message}"

    def _install_handler(
        stream: Any,
        use_color: bool,
        existing_handler: logging.Handler | None,
    ) -> logging.Handler:
        # Build a (possibly colorized) stream handler, replace the previous
        # handler of the same role (if any) on the root logger and return the
        # new handler.  Factors out the four nearly identical copy-pasted
        # branches of the original implementation.
        if use_color:
            handler: logging.Handler = colorlog.StreamHandler(stream)
            handler.setFormatter(
                colorlog.ColoredFormatter(
                    color_format,
                    style="{",
                    force_color=True,
                    log_colors=colors,
                )
            )
        else:
            handler = logging.StreamHandler(stream)
            handler.setFormatter(logging.Formatter(colorless_format, style="{"))
        root_logger = logging.getLogger()
        if existing_handler is not None:
            root_logger.removeHandler(existing_handler)
        root_logger.addHandler(handler)
        return handler

    stdout_handler = _install_handler(stdout, stdout_color, _STDOUT_HANDLER)
    _STDOUT_HANDLER = stdout_handler
    stderr_handler = _install_handler(sys.stderr, stderr_color, _STDERR_HANDLER)
    _STDERR_HANDLER = stderr_handler

    # logging.WARNING instead of the undocumented WARN alias (same value).
    stdout_handler.addFilter(LogLevelFilter(logging.WARNING, False))
    stderr_handler.addFilter(LogLevelFilter(logging.WARNING, True))

    name = program_name()

    old_factory = logging.getLogRecordFactory()

    def record_factory(
        *args: Any, **kwargs: Any
    ) -> logging.LogRecord:  # pragma: no cover
        record = old_factory(*args, **kwargs)
        # Custom levels are registered with leading underscores ("_INFO");
        # strip them and provide the lowercase variant for the formatters.
        record.levelname = record.levelname.lstrip("_")
        record.levelnamelower = record.levelname.lower()
        return record

    logging.setLogRecordFactory(record_factory)

    logging.getLogger().setLevel(logging.WARNING)
    _DEFAULT_LOGGER = logging.getLogger(name)

    if bad_request:
        _DEFAULT_LOGGER.warning(
            f'Invalid color request for "{bad_request}" in either DEBPUTY_COLORS or DPKG_COLORS.'
            ' Resetting to "auto".'
        )

    _LOGGING_SET_UP = True