Coverage for src/debputy/util.py: 62%
544 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
1import argparse
2import collections
3import functools
4import glob
5import logging
6import os
7import re
8import shutil
9import subprocess
10import sys
11import time
12from itertools import zip_longest
13from pathlib import Path
14from typing import (
15 NoReturn,
16 TYPE_CHECKING,
17 Union,
18 Set,
19 FrozenSet,
20 Optional,
21 TypeVar,
22 Dict,
23 Iterator,
24 Iterable,
25 Literal,
26 Tuple,
27 Sequence,
28 List,
29 Mapping,
30 Any,
31)
33from debian.deb822 import Deb822
35from debputy import DEBPUTY_DOC_ROOT_DIR
36from debputy.architecture_support import DpkgArchitectureBuildProcessValuesTable
37from debputy.exceptions import DebputySubstitutionError
# Optional typo detection: the python3-levenshtein extension provides the
# fast edit-distance used for "did you mean" suggestions. Without it, the
# API stays available but never suggests anything.
try:
    from Levenshtein import distance
except ImportError:
    CAN_DETECT_TYPOS = False

    def detect_possible_typo(
        provided_value: str,
        known_values: Iterable[str],
        *,
        max_edit_distance: int = 2,
    ) -> Sequence[str]:
        """Fallback used when Levenshtein is unavailable: no suggestions."""
        return ()

else:
    CAN_DETECT_TYPOS = True

    def detect_possible_typo(
        provided_value: str,
        known_values: Iterable[str],
        *,
        max_edit_distance: int = 2,
    ) -> Sequence[str]:
        """Return the known values within ``max_edit_distance`` of the input.

        :param provided_value: The (possibly misspelled) value the user gave.
        :param known_values: The valid values to compare against.
        :param max_edit_distance: Maximum Levenshtein distance for a match.
        :return: All candidates within the distance threshold.
        """
        input_len = len(provided_value)
        matches = []
        for candidate in known_values:
            # Cheap pre-filter: a length difference beyond the threshold
            # cannot be within the edit distance either.
            if abs(input_len - len(candidate)) > max_edit_distance:
                continue
            if distance(provided_value, candidate) <= max_edit_distance:
                matches.append(candidate)
        return matches
75if TYPE_CHECKING:
76 from debputy.types import EnvironmentModification
77 from debputy.packages import BinaryPackage
78 from debputy.substitution import Substitution
81T = TypeVar("T")
84SLASH_PRUNE = re.compile("//+")
85PKGNAME_REGEX = re.compile(r"[a-z0-9][-+.a-z0-9]+", re.ASCII)
86PKGVERSION_REGEX = re.compile(
87 r"""
88 (?: \d+ : )? # Optional epoch
89 \d[0-9A-Za-z.+:~]* # Upstream version (with no hyphens)
90 (?: - [0-9A-Za-z.+:~]+ )* # Optional debian revision (+ upstreams versions with hyphens)
91""",
92 re.VERBOSE | re.ASCII,
93)
94DEFAULT_PACKAGE_TYPE = "deb"
95DBGSYM_PACKAGE_TYPE = "deb"
96UDEB_PACKAGE_TYPE = "udeb"
98POSTINST_DEFAULT_CONDITION = (
99 '[ "$1" = "configure" ]'
100 ' || [ "$1" = "abort-upgrade" ]'
101 ' || [ "$1" = "abort-deconfigure" ]'
102 ' || [ "$1" = "abort-remove" ]'
103)
106_SPACE_RE = re.compile(r"\s")
107_WORD_EQUAL = re.compile(r"^-*[\w_\-]+=")
108_DOUBLE_ESCAPEES = re.compile(r'([\n`$"\\])')
109_REGULAR_ESCAPEES = re.compile(r"""([\s!"$()*+#;<>?@'\[\]\\`|~])""")
110_PROFILE_GROUP_SPLIT = re.compile(r">\s+<")
111_DEFAULT_LOGGER: Optional[logging.Logger] = None
112_STDOUT_HANDLER: Optional[logging.StreamHandler[Any]] = None
113_STDERR_HANDLER: Optional[logging.StreamHandler[Any]] = None
114PRINT_COMMAND = logging.INFO + 3
115PRINT_BUILD_SYSTEM_COMMAND = PRINT_COMMAND + 3
116TRACE_LOG = logging.DEBUG + 3
118# Map them back to `INFO`. The names must be unique so the prefix is stripped.
119logging.addLevelName(PRINT_COMMAND, "__INFO")
120logging.addLevelName(PRINT_BUILD_SYSTEM_COMMAND, "_INFO")
121logging.addLevelName(TRACE_LOG, "TRACE")
def assume_not_none(x: Optional[T]) -> T:
    """Narrow an Optional value to its non-None type.

    :param x: A value the caller "knows" cannot be None.
    :return: The value unchanged.
    :raises ValueError: If the assumption was wrong (internal error).
    """
    if x is not None:
        return x
    raise ValueError(  # pragma: no cover
        'Internal error: None was given, but the receiver assumed "not None" here'
    )
def _non_verbose_info(msg: str) -> None:
    """Log at the build-system command level (visible without verbose mode)."""
    logger = _DEFAULT_LOGGER
    if logger is not None:
        logger.log(PRINT_BUILD_SYSTEM_COMMAND, msg)
def _info(msg: str) -> None:
    """Log at INFO level when logging is configured (no fallback print)."""
    if _DEFAULT_LOGGER:
        _DEFAULT_LOGGER.info(msg)
def _is_trace_log_enabled() -> bool:
    """True when a logger is configured and TRACE output would be emitted."""
    logger = _DEFAULT_LOGGER
    return logger is not None and logger.isEnabledFor(TRACE_LOG)
def _trace_log(msg: str) -> None:
    """Log at the custom TRACE level (no fallback print)."""
    if _DEFAULT_LOGGER:
        _DEFAULT_LOGGER.log(TRACE_LOG, msg)
def _is_debug_log_enabled() -> bool:
    """True when a logger is configured and DEBUG output would be emitted."""
    logger = _DEFAULT_LOGGER
    return logger is not None and logger.isEnabledFor(logging.DEBUG)
def _debug_log(msg: str) -> None:
    """Log at DEBUG level when logging is configured (no fallback print)."""
    if _DEFAULT_LOGGER:
        _DEFAULT_LOGGER.debug(msg)
def _error(msg: str, *, prog: Optional[str] = None) -> "NoReturn":
    """Report a fatal error and terminate with exit status 1.

    Falls back to a plain stderr print when logging is not yet set up.

    :param msg: The error message.
    :param prog: Program name to use in the fallback print (defaults to
      the basename of ``sys.argv[0]``).
    """
    logger = _DEFAULT_LOGGER
    if logger:
        logger.error(msg)
    else:
        me = prog if prog is not None else os.path.basename(sys.argv[0])
        print(
            f"{me}: error: {msg}",
            file=sys.stderr,
        )
    sys.exit(1)
def _warn(msg: str, *, prog: Optional[str] = None) -> None:
    """Emit a warning via the logger, or stderr when logging is not set up.

    :param msg: The warning message.
    :param prog: Program name for the fallback print (defaults to the
      basename of ``sys.argv[0]``).
    """
    logger = _DEFAULT_LOGGER
    if logger:
        logger.warning(msg)
    else:
        me = prog if prog is not None else os.path.basename(sys.argv[0])
        print(
            f"{me}: warning: {msg}",
            file=sys.stderr,
        )
class ColorizedArgumentParser(argparse.ArgumentParser):
    """ArgumentParser variant that routes errors through debputy's logging."""

    def error(self, message: str) -> NoReturn:
        # Mirror argparse's behavior (usage then message), but delegate to
        # _error so the output is formatted like other debputy failures.
        self.print_usage(sys.stderr)
        _error(message, prog=self.prog)
def ensure_dir(path: str) -> None:
    """Ensure ``path`` exists as a directory (created with mode 0755)."""
    if os.path.isdir(path):
        return
    os.makedirs(path, mode=0o755, exist_ok=True)
def _clean_path(orig_p: str) -> str:
    """Collapse duplicate slashes and reject non-normalized path segments.

    :param orig_p: The path to sanitize.
    :return: The path with runs of "/" collapsed.
    :raises ValueError: If the path contains "." or ".." segments. A single
      leading "./" is tolerated, since normalization itself adds one and
      normalizing a normalized path must be a no-op.
    """
    p = SLASH_PRUNE.sub("/", orig_p)
    if "." in p:
        # We permit a single leading "./" (see docstring).
        path_base = p[2:] if p.startswith("./") else p
        assert path_base
        if any(segment in (".", "..") for segment in path_base.split("/")):
            raise ValueError(
                'Please provide paths that are normalized (i.e., no ".." or ".").'
                f' Offending input "{orig_p}"'
            )
    return p
def _normalize_path(path: str, with_prefix: bool = True) -> str:
    """Normalize a path to debputy's canonical "./relative" form.

    :param path: The path to normalize.
    :param with_prefix: When True, the result carries a leading "./";
      when False, any such prefix is stripped instead.
    :return: The normalized path ("." for the root/empty path).
    """
    path = path.strip("/")
    if not path or path == ".":
        return "."
    if "//" in path or "." in path:
        path = _clean_path(path)
    has_prefix = path.startswith("./")
    if with_prefix and not has_prefix:
        path = "./" + path
    elif not with_prefix and has_prefix:
        path = path[2:]
    return path
def _normalize_link_target(link_target: str) -> str:
    """Lexically normalize a symlink target, resolving "." and ".." segments.

    Attempts to escape the root (e.g. "/../x") are ignored the same way the
    OS resolves them (mapping /.. -> /).
    """
    pruned = SLASH_PRUNE.sub("/", link_target.lstrip("/"))
    stack: List[str] = []
    for segment in pruned.split("/"):
        if segment in (".", ""):
            # Ignore these - the empty string is generally a trailing slash.
            continue
        if segment == "..":
            # Pop one level when possible; at the root ".." is a no-op.
            if stack:
                stack.pop()
        else:
            stack.append(segment)
    return "/".join(stack)
def manifest_format_doc(anchor: str) -> str:
    """Return the URL of the manifest format docs, optionally with an anchor."""
    base = f"{DEBPUTY_DOC_ROOT_DIR}/MANIFEST-FORMAT.md"
    if anchor:
        return f"{base}#{anchor}"
    return base
def _backslash_escape(m: re.Match[str]) -> str:
    """``re.sub`` callback: prefix the matched text with a backslash."""
    return f"\\{m.group(0)}"
def _escape_shell_word(w: str) -> str:
    """Quote a single word for display as a shell command.

    Words containing whitespace are double-quoted; a leading `VAR=` or
    `--option=` prefix is kept outside the quotes for readability. Other
    words get their shell metacharacters backslash-escaped.
    """
    if _SPACE_RE.search(w) is None:
        return _REGULAR_ESCAPEES.sub(_backslash_escape, w)
    if "=" in w:
        m = _WORD_EQUAL.search(w)
        if m is not None:
            start, end = m.span(0)
            assert start == 0
            escaped_value = _DOUBLE_ESCAPEES.sub(_backslash_escape, w[end:])
            return f'{w[:end]}"{escaped_value}"'
    escaped = _DOUBLE_ESCAPEES.sub(_backslash_escape, w)
    return f'"{escaped}"'
def escape_shell(*args: str) -> str:
    """Render args as a human-readable, copy-pasteable shell command line."""
    return " ".join(map(_escape_shell_word, args))
def render_command(
    *args: str,
    cwd: Optional[str] = None,
    env_mod: Optional["EnvironmentModification"] = None,
) -> str:
    """Render a command (with env modifications) for display purposes.

    :param args: The command and its arguments.
    :param cwd: When not None (and not "."), the command is prefixed with
      a `cd` to this directory.
    :param env_mod: Optional environment modification rendered as an
      `env`/`VAR=value` prefix in front of the command.
    :return: A human-readable shell rendering of the command.
    """
    env_mod_prefix = ""
    if env_mod:
        env_mod_parts = []
        if bool(env_mod.removals):
            # Unsetting variables requires the `env` program; when `env` is
            # used anyway, render the chdir through `env --chdir`.
            env_mod_parts.append("env")
            if cwd is not None:
                env_mod_parts.append(f"--chdir={escape_shell(cwd)}")
            env_mod_parts.extend(f"--unset={escape_shell(v)}" for v in env_mod.removals)
        env_mod_parts.extend(
            f"{escape_shell(k)}={escape_shell(v)}" for k, v in env_mod.replacements
        )
        # FIX: previously the computed parts were discarded (env_mod_prefix was
        # never assigned), so env modifications were silently missing from the
        # rendered command.
        if env_mod_parts:
            env_mod_parts.append("")  # yields a trailing space before the command
            env_mod_prefix = " ".join(env_mod_parts)

    chdir_prefix = ""
    if cwd is not None and cwd != ".":
        chdir_prefix = f"cd {escape_shell(cwd)} && "
    return f"{chdir_prefix}{env_mod_prefix}{escape_shell(*args)}"
def print_command(
    *args: str,
    cwd: Optional[str] = None,
    env_mod: Optional["EnvironmentModification"] = None,
    print_at_log_level: int = PRINT_COMMAND,
) -> None:
    """Echo a command at the given log level (no-op below the threshold).

    :param args: The command and its arguments.
    :param cwd: Optional working directory to include in the rendering.
    :param env_mod: Optional environment modification to include.
    :param print_at_log_level: The level the echo is gated on.
    """
    logger = _DEFAULT_LOGGER
    if logger is None or not logger.isEnabledFor(print_at_log_level):
        return

    rendered_cmd = render_command(
        *args,
        cwd=cwd,
        env_mod=env_mod,
    )
    print(f" {rendered_cmd}")
def run_command(
    *args: str,
    cwd: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
    env_mod: Optional["EnvironmentModification"] = None,
    print_at_log_level: int = PRINT_COMMAND,
    raise_file_not_found_on_missing_command: bool = False,
) -> None:
    """Echo and run a command, translating failures into debputy errors.

    :param args: The command and its arguments.
    :param cwd: Optional working directory for the command.
    :param env: Optional replacement environment for the command.
    :param env_mod: Optional modification applied on top of ``env`` (or
      ``os.environ`` when ``env`` is None).
    :param print_at_log_level: Log level used when echoing the command.
    :param raise_file_not_found_on_missing_command: When True, re-raise
      FileNotFoundError rather than exiting with an error message.
    """
    print_command(
        *args,
        cwd=cwd,
        env_mod=env_mod,
        print_at_log_level=print_at_log_level,
    )
    if env_mod:
        base_env = os.environ if env is None else env
        computed_env = env_mod.compute_env(base_env)
        # compute_env may hand back os.environ unchanged; subprocess treats
        # None as "inherit the environment", which is the same thing.
        env = None if computed_env is os.environ else computed_env
    try:
        subprocess.check_call(args, cwd=cwd, env=env)
    # At least "clean_logic.py" relies on catching FileNotFoundError
    except KeyboardInterrupt:
        _error(f"Interrupted (SIGINT) while running {escape_shell(*args)}")
    except FileNotFoundError:
        if raise_file_not_found_on_missing_command:
            raise
        if "/" in args[0]:
            _error(f"Could not run {escape_shell(args[0])}: Path does not exist")
        # Use the `command not found` to aid existing log pattern
        _error(f"{escape_shell(args[0])}: command not found")
    except subprocess.CalledProcessError as e:
        _error(f"The command {escape_shell(*args)} failed with status: {e.returncode}")
def run_build_system_command(
    *args: str,
    cwd: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
    env_mod: Optional["EnvironmentModification"] = None,
    print_at_log_level: int = PRINT_BUILD_SYSTEM_COMMAND,
    raise_file_not_found_on_missing_command: bool = False,
) -> None:
    """Like :func:`run_command`, but echoed at the build-system log level."""
    run_command(
        *args,
        cwd=cwd,
        env=env,
        env_mod=env_mod,
        print_at_log_level=print_at_log_level,
        raise_file_not_found_on_missing_command=raise_file_not_found_on_missing_command,
    )
def debian_policy_normalize_symlink_target(
    link_path: str,
    link_target: str,
    normalize_link_path: bool = False,
) -> str:
    """Normalize a symlink target per Debian Policy §10.5.

    Policy wants relative targets when link and target share the same
    top-level directory and absolute targets otherwise.

    :param link_path: The (normalized, "./"-prefixed) path of the symlink.
    :param link_target: The desired target of the symlink.
    :param normalize_link_path: When True, normalize ``link_path`` first.
    :return: The policy-normalized link target.
    :raises ValueError: If ``link_path`` is not normalized and
      ``normalize_link_path`` is False.
    """
    if normalize_link_path:
        link_path = _normalize_path(link_path)
    elif not link_path.startswith("./"):
        raise ValueError("Link part was not normalized")

    link_path = link_path[2:]

    if not link_target.startswith("/"):
        # Resolve a relative target against the directory holding the link.
        link_target = "/" + os.path.dirname(link_path) + "/" + link_target

    link_path_parts = link_path.split("/")
    link_target_parts = [
        s for s in _normalize_link_target(link_target).split("/") if s != "."
    ]

    assert link_path_parts

    if not link_target_parts or link_path_parts[0] != link_target_parts[0]:
        # Different top-level directories: Policy requires an absolute target.
        return "/" + "/".join(link_target_parts)

    # Same top-level directory: Policy requires a relative target.
    # First determine how many leading segments the two paths share.
    shared_count = 1
    shortest_len = min(len(link_target_parts), len(link_path_parts))
    while (
        shared_count < shortest_len
        and link_target_parts[shared_count] == link_path_parts[shared_count]
    ):
        shared_count += 1

    if shared_count == shortest_len and len(link_path_parts) - 1 == len(
        link_target_parts
    ):
        # The target is exactly the directory containing the link.
        return "."

    up_dir_count = len(link_path_parts) - 1 - shared_count
    relative_parts = []
    if up_dir_count:
        # One "../" per level to climb; we overshoot with one "/", so rstrip it.
        relative_parts.append(("../" * up_dir_count).rstrip("/"))
    # Add the relevant down parts.
    relative_parts.extend(link_target_parts[shared_count:])
    return "/".join(relative_parts)
def has_glob_magic(pattern: str) -> bool:
    """True if the pattern uses glob magic (incl. debputy's `{...}` groups)."""
    return "{" in pattern or glob.has_magic(pattern)
def glob_escape(replacement_value: str) -> str:
    """Escape a string for literal use inside a glob pattern.

    Escapes the standard glob metacharacters plus "{"/"}" (used by debputy's
    brace expansion) by wrapping each in a "[...]" character class.

    :param replacement_value: The literal text to protect.
    :return: A pattern matching exactly ``replacement_value``.
    """
    # FIX 1: the old guard used `or`, so any value with standard glob magic
    # but no "{" was returned unescaped.
    if not glob.has_magic(replacement_value) and "{" not in replacement_value:
        return replacement_value
    # FIX 2: escape in a single pass. The previous chained str.replace calls
    # re-processed the "]" inserted while escaping "[", corrupting the result.
    return re.sub(r"[\[\]*?{}]", lambda m: f"[{m.group(0)}]", replacement_value)
# TODO: This logic should probably be moved to `python-debian`
def active_profiles_match(
    profiles_raw: str,
    active_build_profiles: Union[Set[str], FrozenSet[str]],
) -> bool:
    """Evaluate a Build-Profiles restriction formula.

    The formula is a disjunction of groups ("<g1> <g2>"); each group is a
    conjunction of (possibly negated) profile names.

    :param profiles_raw: The raw Build-Profiles field value.
    :param active_build_profiles: The profiles active for this build.
    :return: True when at least one group is satisfied.
    :raises ValueError: If the field is not a valid restriction formula.
    """
    profiles_raw = profiles_raw.strip()
    if profiles_raw[0] != "<" or profiles_raw[-1] != ">" or profiles_raw == "<>":
        raise ValueError(
            'Invalid Build-Profiles: Must start start and end with "<" + ">" but cannot be a literal "<>"'
        )
    for profile_group_raw in _PROFILE_GROUP_SPLIT.split(profiles_raw[1:-1]):
        group_satisfied = True
        for term in profile_group_raw.split():
            negated = term.startswith("!")
            profile_name = term[1:] if negated else term
            # The term fails when the match status equals the negation flag.
            if (profile_name in active_build_profiles) == negated:
                group_satisfied = False
                break
        if group_satisfied:
            return True
    return False
def _parse_build_profiles(build_profiles_raw: str) -> FrozenSet[FrozenSet[str]]:
    """Parse Build-Profiles into a frozenset of groups (each a frozenset).

    :raises ValueError: If the field is not a valid restriction formula.
    """
    profiles_raw = build_profiles_raw.strip()
    if profiles_raw[0] != "<" or profiles_raw[-1] != ">" or profiles_raw == "<>":
        raise ValueError(
            'Invalid Build-Profiles: Must start start and end with "<" + ">" but cannot be a literal "<>"'
        )
    groups = _PROFILE_GROUP_SPLIT.split(profiles_raw[1:-1])
    return frozenset(frozenset(group.split()) for group in groups)
def resolve_source_date_epoch(
    command_line_value: Optional[int],
    *,
    substitution: Optional["Substitution"] = None,
) -> int:
    """Determine the SOURCE_DATE_EPOCH clamp time for reproducible builds.

    Resolution order: explicit command-line value, the SOURCE_DATE_EPOCH
    environment variable, the manifest substitution (when provided), and
    finally the current time. The result is exported back into
    ``os.environ["SOURCE_DATE_EPOCH"]``.

    :param command_line_value: Explicit value (wins over everything else).
    :param substitution: Optional substitution engine to consult.
    :return: The resolved epoch in seconds.
    """
    mtime = command_line_value
    if mtime is None and "SOURCE_DATE_EPOCH" in os.environ:
        sde_raw = os.environ["SOURCE_DATE_EPOCH"]
        if sde_raw == "":
            _error("SOURCE_DATE_EPOCH is set but empty.")
        mtime = int(sde_raw)
    if mtime is None and substitution is not None:
        try:
            mtime = int(
                substitution.substitute(
                    "{{SOURCE_DATE_EPOCH}}",
                    "Internal resolution",
                )
            )
        except (DebputySubstitutionError, ValueError):
            # Variable not defined (or not a number); fall through to "now".
            pass
    if mtime is None:
        mtime = int(time.time())
    os.environ["SOURCE_DATE_EPOCH"] = str(mtime)
    return mtime
def compute_output_filename(control_root_dir: str, is_udeb: bool) -> str:
    """Compute the .deb/.udeb filename from a package's control file.

    :param control_root_dir: Directory containing the binary package's
      `control` file (the DEBIAN directory).
    :param is_udeb: When True, force the ".udeb" extension.
    :return: "<name>_<version-without-epoch>_<arch>.<extension>".
    """
    with open(os.path.join(control_root_dir, "control"), "rt") as fd:
        control_file = Deb822(fd)

    package_name = control_file["Package"]
    package_version = control_file["Version"]
    package_architecture = control_file["Architecture"]
    extension = "udeb" if is_udeb else (control_file.get("Package-Type") or "deb")
    # The epoch is never part of the filename.
    if ":" in package_version:
        package_version = package_version.split(":", 1)[1]

    return f"{package_name}_{package_version}_{package_architecture}.{extension}"
# Lazily resolved scratch directory and the debhelper-integration flag.
_SCRATCH_DIR = None
_DH_INTEGRATION_MODE = False


def integrated_with_debhelper() -> None:
    """Record that debputy is running as part of a debhelper (dh) sequence."""
    global _DH_INTEGRATION_MODE
    _DH_INTEGRATION_MODE = True
def scratch_dir(*, create_if_not_exists: bool = True) -> str:
    """Return (and lazily create) debputy's scratch directory.

    Uses `debian/.debputy/scratch-dir` normally and a sub-directory of
    `debian/.debhelper` when integrated with debhelper.

    :param create_if_not_exists: When True, ensure the directory exists and
      mark its root as generated (gitignore/cache tag).
    :return: The scratch directory path (cached after the first call).
    """
    global _SCRATCH_DIR
    if _SCRATCH_DIR is not None:
        return _SCRATCH_DIR
    debputy_scratch_dir = "debian/.debputy/scratch-dir"
    is_debputy_dir = True
    if not _DH_INTEGRATION_MODE and os.path.isdir("debian/.debputy"):
        _SCRATCH_DIR = debputy_scratch_dir
    elif _DH_INTEGRATION_MODE or os.path.isdir("debian/.debhelper"):
        _SCRATCH_DIR = "debian/.debhelper/_debputy/scratch-dir"
        is_debputy_dir = False
    else:
        _SCRATCH_DIR = debputy_scratch_dir
    if create_if_not_exists:
        ensure_dir(_SCRATCH_DIR)
        if is_debputy_dir:
            generated_root_directory("debian/.debputy", internal_only=True)
    return _SCRATCH_DIR
def generated_root_directory(path: str, *, internal_only: bool = False) -> None:
    """Mark a generated directory so VCS and backup tools will ignore it.

    :param path: The directory to mark (must already exist).
    :param internal_only: When True, also drop a CACHEDIR.TAG so archiving
      and backup tools skip the directory entirely.
    """
    root = Path(path)
    (root / ".gitignore").write_text("*\n")
    if internal_only:
        # CACHEDIR.TAG signature per the Cache Directory Tagging spec.
        (root / "CACHEDIR.TAG").write_bytes(
            b"Signature: 8a477f597d28d172789f06886806bc55"
        )
# PID-keyed container name under the scratch dir (set on first use).
_RUNTIME_CONTAINER_DIR_KEY: Optional[str] = None


def generated_content_dir(
    *,
    package: Optional["BinaryPackage"] = None,
    subdir_key: Optional[str] = None,
) -> str:
    """Return a per-invocation directory for generated file-system content.

    The directory lives in a PID-keyed container below the scratch dir; a
    re-run that happens to reuse the PID starts from a clean slate.

    :param package: Scope the directory to this binary package (if any).
    :param subdir_key: Optional extra sub-directory discriminator.
    :return: The (created) directory path.
    """
    global _RUNTIME_CONTAINER_DIR_KEY
    container_dir = _RUNTIME_CONTAINER_DIR_KEY
    first_run = container_dir is None
    if first_run:
        container_dir = f"_pb-{os.getpid()}"
        _RUNTIME_CONTAINER_DIR_KEY = container_dir

    directory = os.path.join(scratch_dir(), container_dir)

    if first_run and os.path.isdir(directory):
        # In the unlikely case there is a re-run with exactly the same pid, `debputy` should not
        # see "stale" data.
        # TODO: Ideally, we would always clean up this directory on failure, but `atexit` is not
        # reliable enough for that and we do not have an obvious hook for it.
        shutil.rmtree(directory)

    directory = os.path.join(
        directory,
        "generated-fs-content",
        f"pkg_{package.name}" if package else "no-package",
    )
    if subdir_key is not None:
        directory = os.path.join(directory, subdir_key)

    os.makedirs(directory, exist_ok=True)
    return directory
# Structured results of querying perl's %Config (see resolve_perl_config).
PerlConfigVars = collections.namedtuple(
    "PerlConfigVars", ["vendorlib", "vendorarch", "cross_inc_dir", "ld", "path_sep"]
)
PerlConfigData = collections.namedtuple("PerlConfigData", ["version", "debian_abi"])
# Per-architecture cache for resolve_perl_config().
_PERL_MODULE_DIRS: Dict[str, PerlConfigVars] = {}
@functools.lru_cache(1)
def _perl_config_data() -> PerlConfigData:
    """Query the build perl for its version and Debian ABI (cached)."""
    lines = (
        subprocess.check_output(
            [
                "perl",
                "-MConfig",
                "-e",
                'print "$Config{version}\n$Config{debian_abi}\n"',
            ]
        )
        .decode("utf-8")
        .splitlines()
    )
    return PerlConfigData(*lines)
def _perl_version() -> str:
    """Version of the build perl (e.g. "5.36.0")."""
    return _perl_config_data().version
def perlxs_api_dependency() -> str:
    """Compute the `perlapi-*` substvar dependency for XS modules.

    Prefers perl's Debian ABI name and falls back to the upstream version.
    """
    # dh_perl used the build version of perl for this, so we will too. Most of the perl cross logic
    # assumes that the major version of build variant of Perl is the same as the host variant of Perl.
    config = _perl_config_data()
    abi = config.debian_abi
    if abi is not None and abi != "":
        return f"perlapi-{abi}"
    return f"perlapi-{config.version}"
def resolve_perl_config(
    dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable,
    dctrl_bin: Optional["BinaryPackage"],
) -> PerlConfigVars:
    """Resolve (and cache per architecture) perl's vendor dirs and config.

    :param dpkg_architecture_variables: The dpkg architecture table (used to
      detect cross builds and look up multiarch values).
    :param dctrl_bin: Optional binary package to take the architecture from.
    :return: The PerlConfigVars for the relevant architecture.
    :raises ValueError: If perl's output cannot be parsed.
    """
    global _PERL_MODULE_DIRS
    if dpkg_architecture_variables.is_cross_compiling:
        if dctrl_bin is not None:
            arch = dctrl_bin.resolved_architecture
            ma = dctrl_bin.deb_multiarch
        else:
            arch = dpkg_architecture_variables.current_host_arch
            ma = dpkg_architecture_variables.current_host_multiarch
    else:
        # We are de facto using the build-arch version of perl here; be explicit
        arch = "_build_arch_"
        ma = dpkg_architecture_variables["DEB_BUILD_MULTIARCH"]
    cached = _PERL_MODULE_DIRS.get(arch)
    if cached is not None:
        return cached
    cmd = ["perl"]
    if dpkg_architecture_variables.is_cross_compiling:
        version = _perl_version()
        cross_inc_dir = f"/usr/lib/{ma}/perl/cross-config-{version}"
        # FIXME: This should not fallback to "build-arch" but on the other hand, we use the perl module dirs
        # for every package at the moment. So mandating correct perl dirs implies mandating perl-xs-dev in
        # cross builds... meh.
        if os.path.exists(os.path.join(cross_inc_dir, "Config.pm")):
            cmd.append(f"-I{cross_inc_dir}")
    else:
        cross_inc_dir = None
    cmd.extend(
        [
            "-MConfig",
            "-e",
            'print "$Config{vendorlib}\n$Config{vendorarch}\n$Config{ld}\n$Config{path_sep}\n"',
        ]
    )
    output = subprocess.check_output(cmd).decode("utf-8").splitlines(keepends=False)
    if len(output) != 4:
        raise ValueError(
            "Internal error: Unable to determine the perl include directories:"
            f" Raw output from perl snippet: {output}"
        )
    config_vars = PerlConfigVars(
        vendorlib="/" + _normalize_path(output[0], with_prefix=False),
        vendorarch="/" + _normalize_path(output[1], with_prefix=False),
        cross_inc_dir=cross_inc_dir,
        ld=output[2],
        path_sep=output[3],
    )
    _PERL_MODULE_DIRS[arch] = config_vars
    return config_vars
@functools.lru_cache(1)
def detect_fakeroot() -> bool:
    """Detect whether we are running under fakeroot (cached).

    Heuristic: we appear to be root, LD_PRELOAD is set, and `id -u` run
    without the preload disagrees about being uid 0.
    """
    if os.getuid() != 0 or "LD_PRELOAD" not in os.environ:
        return False
    env = dict(os.environ)
    env.pop("LD_PRELOAD")
    try:
        return subprocess.check_output(["id", "-u"], env=env).strip() != b"0"
    except subprocess.CalledProcessError:
        print(
            'Could not run "id -u" with LD_PRELOAD unset; assuming we are not run under fakeroot',
            file=sys.stderr,
        )
        return False
@functools.lru_cache(1)
def _sc_arg_max() -> Optional[int]:
    """Return SC_ARG_MAX (cached), or None when the platform cannot say."""
    try:
        return os.sysconf("SC_ARG_MAX")
    except RuntimeError:
        _warn("Could not resolve SC_ARG_MAX, falling back to a hard-coded limit")
        return None
def _split_xargs_args(
    static_cmd: Sequence[str],
    max_args_byte_len: int,
    varargs: Iterable[str],
    reuse_list_ok: bool,
) -> Iterator[List[str]]:
    """Split ``varargs`` into chunks so each command stays within the limit.

    :param static_cmd: The fixed part of the command (always included).
    :param max_args_byte_len: Byte budget for the variable arguments.
    :param varargs: The variable arguments to distribute over the chunks.
    :param reuse_list_ok: When True, the same list object may be yielded
      repeatedly (callers must not hold references across iterations).
    :raises ValueError: If a single argument alone exceeds the budget.
    """
    static_cmd_len = len(static_cmd)
    remaining_len = max_args_byte_len
    pending_args = list(static_cmd)
    for arg in varargs:
        arg_len = len(arg.encode("utf-8")) + 1  # +1 for leading space
        remaining_len -= arg_len
        # FIX: this used `if not remaining_len:` which only triggered when the
        # budget hit exactly zero; overshooting it (negative remainder) slipped
        # through and produced over-long command lines.
        if remaining_len < 0:
            if len(pending_args) <= static_cmd_len:
                raise ValueError(
                    f"Could not fit a single argument into the command line !?"
                    f" {max_args_byte_len} (variable argument limit) < {arg_len} (argument length)"
                )
            yield pending_args
            remaining_len = max_args_byte_len - arg_len
            if reuse_list_ok:
                pending_args.clear()
                pending_args.extend(static_cmd)
            else:
                pending_args = list(static_cmd)
        pending_args.append(arg)

    if len(pending_args) > static_cmd_len:
        yield pending_args
def xargs(
    static_cmd: Sequence[str],
    varargs: Iterable[str],
    *,
    env: Optional[Mapping[str, str]] = None,
    reuse_list_ok: bool = False,
) -> Iterator[List[str]]:
    """Yield command lines covering ``varargs`` without exceeding ARG_MAX.

    :param static_cmd: The fixed command prefix for every invocation.
    :param varargs: The variable arguments to spread over the invocations.
    :param env: The environment the command will run with; its size counts
      against the kernel's argument limit.
    :param reuse_list_ok: See :func:`_split_xargs_args`.
    """
    max_args_bytes = _sc_arg_max()
    # len overshoots with one space explaining the -1. The _split_xargs_args
    # will account for the space for the first argument
    static_byte_len = (
        len(static_cmd) - 1 + sum(len(a.encode("utf-8")) for a in static_cmd)
    )
    if max_args_bytes is None:
        # The 20 000 limit is from debhelper, and it did not account for environment. So neither will we here.
        max_args_bytes = 20_000
    else:
        if env is None:
            # +2 for nul bytes after key and value
            static_byte_len += sum(len(k) + len(v) + 2 for k, v in os.environb.items())
        else:
            # +2 for nul bytes after key and value
            static_byte_len += sum(
                len(k.encode("utf-8")) + len(v.encode("utf-8")) + 2
                for k, v in env.items()
            )
        # Add a fixed buffer for OS overhead here (in case env and cmd both must be page-aligned or something like
        # that)
        static_byte_len += 2 * 4096
    yield from _split_xargs_args(
        static_cmd, max_args_bytes - static_byte_len, varargs, reuse_list_ok
    )
796# itertools recipe
797def grouper(
798 iterable: Iterable[T],
799 n: int,
800 *,
801 incomplete: Literal["fill", "strict", "ignore"] = "fill",
802 fillvalue: Optional[T] = None,
803) -> Iterator[Tuple[T, ...]]:
804 """Collect data into non-overlapping fixed-length chunks or blocks"""
805 # grouper('ABCDEFG', 3, fillvalue='x') --> ABC DEF Gxx
806 # grouper('ABCDEFG', 3, incomplete='strict') --> ABC DEF ValueError
807 # grouper('ABCDEFG', 3, incomplete='ignore') --> ABC DEF
808 args = [iter(iterable)] * n
809 if incomplete == "fill": 809 ↛ 810line 809 didn't jump to line 810 because the condition on line 809 was never true
810 return zip_longest(*args, fillvalue=fillvalue)
811 if incomplete == "strict": 811 ↛ 813line 811 didn't jump to line 813 because the condition on line 811 was always true
812 return zip(*args, strict=True)
813 if incomplete == "ignore":
814 return zip(*args)
815 else:
816 raise ValueError("Expected fill, strict, or ignore")
_LOGGING_SET_UP = False


def _check_color() -> Tuple[bool, bool, Optional[str]]:
    """Determine stdout/stderr colorization from the environment.

    Honors DEBPUTY_COLORS, then DPKG_COLORS, then NO_COLOR; the default is
    "auto" (color only when the stream is a TTY).

    :return: (stdout_color, stderr_color, invalid_request_or_None)
    """
    dpkg_or_default = os.environ.get(
        "DPKG_COLORS", "never" if "NO_COLOR" in os.environ else "auto"
    )
    requested = os.environ.get("DEBPUTY_COLORS", dpkg_or_default)
    bad_request = None
    if requested not in {"auto", "always", "never"}:
        bad_request = requested
        requested = "auto"

    if requested == "auto":
        # NOTE(review): both values test sys.stdout here — possibly
        # intentional so the two streams stay consistent; confirm before
        # "fixing" the second to sys.stderr.
        stdout_color = sys.stdout.isatty()
        stderr_color = sys.stdout.isatty()
    else:
        stdout_color = stderr_color = requested == "always"
    return stdout_color, stderr_color, bad_request
842def program_name() -> str:
843 name = os.path.basename(sys.argv[0])
844 if name.endswith(".py"): 844 ↛ 845line 844 didn't jump to line 845 because the condition on line 844 was never true
845 name = name[:-3]
846 if name == "__main__": 846 ↛ 847line 846 didn't jump to line 847 because the condition on line 846 was never true
847 name = os.path.basename(os.path.dirname(sys.argv[0]))
848 # FIXME: Not optimal that we have to hardcode these kind of things here
849 if name == "debputy_cmd": 849 ↛ 850line 849 didn't jump to line 850 because the condition on line 849 was never true
850 name = "debputy"
851 return name
def package_cross_check_precheck(
    pkg_a: "BinaryPackage",
    pkg_b: "BinaryPackage",
) -> Tuple[bool, bool]:
    """Whether these two packages can do content cross-checks

    :param pkg_a: The first package
    :param pkg_b: The second package
    :return: A tuple of two booleans. If the first is True, then pkg_a may do
      content cross-checks that involve pkg_b. If the second is True, then
      pkg_b may do content cross-checks that involve pkg_a. Both can be True
      and both can be False at the same time, which happens in common cases
      (arch:all + arch:any cases both to be False as a common example).
    """

    # Handle the two most obvious base-cases
    if not pkg_a.should_be_acted_on or not pkg_b.should_be_acted_on:
        return False, False
    if pkg_a.is_arch_all ^ pkg_b.is_arch_all:
        return False, False

    a_may_see_b = True
    b_may_see_a = True

    a_bp = pkg_a.fields.get("Build-Profiles", "")
    b_bp = pkg_b.fields.get("Build-Profiles", "")

    if a_bp != b_bp:
        a_bp_set = _parse_build_profiles(a_bp) if a_bp != "" else frozenset()
        b_bp_set = _parse_build_profiles(b_bp) if b_bp != "" else frozenset()

        # Check for build profiles being identically but just ordered differently.
        if a_bp_set != b_bp_set:
            # For simplicity, we let groups cancel each other out. If one side has no clauses
            # left, then it will always be built when the other is built.
            #
            # Eventually, someone will be here with a special case where more complex logic is
            # required. Good luck to you! Remember to add test cases for it (the existing logic
            # has some for a reason and if the logic is going to be more complex, it will need
            # tests cases to assert it fixes the problem and does not regress)
            if a_bp_set - b_bp_set:
                a_may_see_b = False
            if b_bp_set - a_bp_set:
                b_may_see_a = False

    if pkg_a.declared_architecture != pkg_b.declared_architecture:
        # Also here we could do a subset check, but wildcards vs. non-wildcards make that a pain
        if pkg_a.declared_architecture != "any":
            b_may_see_a = False
        # FIX: this second guard previously re-tested pkg_a (copy-paste bug);
        # pkg_a's ability to see pkg_b depends on *pkg_b*'s architecture
        # restriction, symmetric to the check above.
        if pkg_b.declared_architecture != "any":
            a_may_see_b = False

    return a_may_see_b, b_may_see_a
def change_log_level(
    log_level: int,
) -> None:
    """Set the log level on both debputy's logger and the root logger."""
    if _DEFAULT_LOGGER is not None:
        _DEFAULT_LOGGER.setLevel(log_level)
    logging.getLogger("").setLevel(log_level)
def current_log_level() -> Optional[int]:
    """Level of debputy's logger, or None when logging is not set up."""
    if _DEFAULT_LOGGER is None:
        return None
    return _DEFAULT_LOGGER.level
def setup_logging(
    *,
    log_only_to_stderr: bool = False,
    reconfigure_logging: bool = False,
) -> None:
    """Configure debputy's logging: handlers, colors, and custom levels.

    Installs a below-WARN handler and a WARN-and-above handler (both on
    stderr in the current code), optionally colorized via `colorlog`, and a
    log record factory that strips the uniqueness prefix from the custom
    "__INFO"/"_INFO" level names.

    :param log_only_to_stderr: When True, route all output to stderr (useful
      when stdout carries machine-readable output).
    :param reconfigure_logging: Permit calling this more than once.
    :raises RuntimeError: If called twice without ``reconfigure_logging``.
    """
    global _LOGGING_SET_UP, _DEFAULT_LOGGER, _STDOUT_HANDLER, _STDERR_HANDLER
    if _LOGGING_SET_UP and not reconfigure_logging:
        raise RuntimeError(
            "Logging has already been configured."
            " Use reconfigure_logging=True if you need to reconfigure it"
        )
    stdout_color, stderr_color, bad_request = _check_color()
    colors: Optional[Dict[str, str]] = None

    if stdout_color or stderr_color:
        try:
            import colorlog

        except ImportError:
            # Color was requested but python3-colorlog is missing: degrade
            # gracefully to colorless output.
            stdout_color = False
            stderr_color = False
        else:
            colors = dict(colorlog.default_log_colors)
            # Add our custom levels.
            colors["_INFO"] = colors["INFO"]
            colors["__INFO"] = colors["INFO"]

    if log_only_to_stderr:
        stdout = sys.stderr
        stdout_color = stderr_color
    else:
        # NOTE(review): this branch also assigns sys.stderr (identical to the
        # branch above) — presumably informational output is deliberately kept
        # off stdout; confirm before changing to sys.stdout.
        stdout = sys.stderr

    class LogLevelFilter(logging.Filter):
        # Splits records between the two handlers: "above" handlers accept
        # records at/above the threshold, the other accepts records below it.
        def __init__(self, threshold: int, above: bool):
            super().__init__()
            self.threshold = threshold
            self.above = above

        def filter(self, record: logging.LogRecord) -> bool:
            if self.above:
                return record.levelno >= self.threshold
            else:
                return record.levelno < self.threshold

    color_format = (
        "{bold}{name}{reset}: {bold}{log_color}{levelnamelower}{reset}: {message}"
    )
    colorless_format = "{name}: {levelnamelower}: {message}"

    # Remember any handlers from a previous setup_logging() call so they can
    # be replaced instead of accumulating duplicates.
    existing_stdout_handler = _STDOUT_HANDLER
    existing_stderr_handler = _STDERR_HANDLER

    if stdout_color:
        stdout_handler = colorlog.StreamHandler(stdout)
        stdout_handler.setFormatter(
            colorlog.ColoredFormatter(
                color_format,
                style="{",
                force_color=True,
                log_colors=colors,
            )
        )
        logger = colorlog.getLogger()
        if existing_stdout_handler is not None:
            logger.removeHandler(existing_stdout_handler)
        _STDOUT_HANDLER = stdout_handler
        logger.addHandler(stdout_handler)
    else:
        stdout_handler = logging.StreamHandler(stdout)
        stdout_handler.setFormatter(logging.Formatter(colorless_format, style="{"))
        logger = logging.getLogger()
        if existing_stdout_handler is not None:
            logger.removeHandler(existing_stdout_handler)
        _STDOUT_HANDLER = stdout_handler
        logger.addHandler(stdout_handler)

    if stderr_color:
        stderr_handler = colorlog.StreamHandler(sys.stderr)
        stderr_handler.setFormatter(
            colorlog.ColoredFormatter(
                color_format,
                style="{",
                force_color=True,
                log_colors=colors,
            )
        )
        logger = logging.getLogger()
        if existing_stderr_handler is not None:
            logger.removeHandler(existing_stderr_handler)
        _STDERR_HANDLER = stderr_handler
        logger.addHandler(stderr_handler)
    else:
        stderr_handler = logging.StreamHandler(sys.stderr)
        stderr_handler.setFormatter(logging.Formatter(colorless_format, style="{"))
        logger = logging.getLogger()
        if existing_stderr_handler is not None:
            logger.removeHandler(existing_stderr_handler)
        _STDERR_HANDLER = stderr_handler
        logger.addHandler(stderr_handler)

    # Route sub-WARN records to the "stdout" handler and WARN+ to stderr.
    stdout_handler.addFilter(LogLevelFilter(logging.WARN, False))
    stderr_handler.addFilter(LogLevelFilter(logging.WARN, True))

    name = program_name()

    old_factory = logging.getLogRecordFactory()

    def record_factory(
        *args: Any, **kwargs: Any
    ) -> logging.LogRecord:  # pragma: no cover
        record = old_factory(*args, **kwargs)
        # Strip the uniqueness prefix from "__INFO"/"_INFO" and provide the
        # lowercase level name used by the formats above.
        record.levelname = record.levelname.lstrip("_")
        record.levelnamelower = record.levelname.lower()
        return record

    logging.setLogRecordFactory(record_factory)

    logging.getLogger().setLevel(logging.WARN)
    _DEFAULT_LOGGER = logging.getLogger(name)

    if bad_request:
        _DEFAULT_LOGGER.warning(
            f'Invalid color request for "{bad_request}" in either DEBPUTY_COLORS or DPKG_COLORS.'
            ' Resetting to "auto".'
        )

    _LOGGING_SET_UP = True