Coverage for src/debputy/util.py: 61%
548 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-03-24 16:38 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-03-24 16:38 +0000
1import argparse
2import collections
3import functools
4import glob
5import logging
6import os
7import re
8import shutil
9import subprocess
10import sys
11import time
12from itertools import zip_longest
13from pathlib import Path
14from typing import (
15 NoReturn,
16 TYPE_CHECKING,
17 Union,
18 Set,
19 FrozenSet,
20 Optional,
21 TypeVar,
22 Dict,
23 Iterator,
24 Iterable,
25 Literal,
26 Tuple,
27 Sequence,
28 List,
29 Mapping,
30 Any,
31)
33from debian.deb822 import Deb822
35from debputy import DEBPUTY_DOC_ROOT_DIR
36from debputy.architecture_support import DpkgArchitectureBuildProcessValuesTable
37from debputy.exceptions import DebputySubstitutionError
try:
    from Levenshtein import distance
except ImportError:
    # Without the optional Levenshtein module, typo detection degrades to a
    # no-op with the same signature so callers need no special casing.
    CAN_DETECT_TYPOS = False

    def detect_possible_typo(
        provided_value: str,
        known_values: Iterable[str],
        *,
        max_edit_distance: int = 2,
    ) -> Sequence[str]:
        """Fallback used when Levenshtein is unavailable: never suggests anything."""
        return ()

else:
    CAN_DETECT_TYPOS = True

    def detect_possible_typo(
        provided_value: str,
        known_values: Iterable[str],
        *,
        max_edit_distance: int = 2,
    ) -> Sequence[str]:
        """Return the known values within `max_edit_distance` edits of `provided_value`.

        :param provided_value: The (possibly misspelled) value to look up.
        :param known_values: The candidate spellings.
        :param max_edit_distance: Largest accepted Levenshtein distance.
        :return: The matching candidates, in the order they were provided.
        """
        provided_len = len(provided_value)
        # The length check is a cheap pre-filter; it short-circuits before the
        # edit distance is computed.
        return [
            candidate
            for candidate in known_values
            if abs(provided_len - len(candidate)) <= max_edit_distance
            and distance(provided_value, candidate) <= max_edit_distance
        ]
75if TYPE_CHECKING:
76 from debputy.types import EnvironmentModification
77 from debputy.packages import BinaryPackage
78 from debputy.substitution import Substitution
# Generic type variable used in the signatures of assume_not_none and grouper.
T = TypeVar("T")


# Matches a run of two or more slashes; the path helpers collapse it to "/".
SLASH_PRUNE = re.compile("//+")
PKGNAME_REGEX = re.compile(r"[a-z0-9][-+.a-z0-9]+", re.ASCII)
PKGVERSION_REGEX = re.compile(
    r"""
                 (?: \d+ : )?                # Optional epoch
                 \d[0-9A-Za-z.+:~]*          # Upstream version (with no hyphens)
                 (?: - [0-9A-Za-z.+:~]+ )*   # Optional debian revision (+ upstreams versions with hyphens)
""",
    re.VERBOSE | re.ASCII,
)
DEFAULT_PACKAGE_TYPE = "deb"
DBGSYM_PACKAGE_TYPE = "deb"
UDEB_PACKAGE_TYPE = "udeb"

# Shell test matching the maintainer-script arguments listed below.
POSTINST_DEFAULT_CONDITION = (
    '[ "$1" = "configure" ]'
    ' || [ "$1" = "abort-upgrade" ]'
    ' || [ "$1" = "abort-deconfigure" ]'
    ' || [ "$1" = "abort-remove" ]'
)


# Patterns used by the shell-escaping helpers (_escape_shell_word).
_SPACE_RE = re.compile(r"\s")
# A leading "name=" (optionally prefixed by dashes), e.g. "--option=" or "VAR=".
_WORD_EQUAL = re.compile(r"^-*[\w_\-]+=")
# Characters that get a backslash inside a double-quoted word.
_DOUBLE_ESCAPEES = re.compile(r'([\n`$"\\])')
# Characters that get a backslash in an unquoted word.
_REGULAR_ESCAPEES = re.compile(r"""([\s!"$()*+#;<>?@'\[\]\\`|~])""")
# Separator between two "<...>" groups in a Build-Profiles value.
_PROFILE_GROUP_SPLIT = re.compile(r">\s+<")
# Logging state; populated by setup_logging.
_DEFAULT_LOGGER: Optional[logging.Logger] = None
_STDOUT_HANDLER: Optional[logging.StreamHandler[Any]] = None
_STDERR_HANDLER: Optional[logging.StreamHandler[Any]] = None
# Custom log levels, positioned relative to the standard INFO and DEBUG levels.
PRINT_COMMAND = logging.INFO + 3
PRINT_BUILD_SYSTEM_COMMAND = PRINT_COMMAND + 3
TRACE_LOG = logging.DEBUG + 3

# Map them back to `INFO`. The names must be unique so the prefix is stripped.
logging.addLevelName(PRINT_COMMAND, "__INFO")
logging.addLevelName(PRINT_BUILD_SYSTEM_COMMAND, "_INFO")
logging.addLevelName(TRACE_LOG, "TRACE")
def assume_not_none(x: Optional[T]) -> T:
    """Return `x` unchanged, raising ValueError if it is None.

    Lets a caller narrow an Optional value where None would be an internal error.
    """
    if x is not None:
        return x
    raise ValueError(  # pragma: no cover
        'Internal error: None was given, but the receiver assumed "not None" here'
    )
def _non_verbose_info(msg: str) -> None:
    """Log `msg` at the PRINT_BUILD_SYSTEM_COMMAND level; no-op before logging is set up."""
    active_logger = _DEFAULT_LOGGER
    if active_logger is None:
        return
    active_logger.log(PRINT_BUILD_SYSTEM_COMMAND, msg)
def _info(msg: str) -> None:
    """Log `msg` at INFO level; silently dropped when no logger is configured."""
    # No fallback print for info
    if active_logger := _DEFAULT_LOGGER:
        active_logger.info(msg)
def _is_trace_log_enabled() -> bool:
    """Return True when a logger is configured and it accepts TRACE_LOG records."""
    active_logger = _DEFAULT_LOGGER
    if active_logger is None:
        return False
    return active_logger.isEnabledFor(TRACE_LOG)
def _trace_log(msg: str) -> None:
    """Log `msg` at the TRACE_LOG level; silently dropped when no logger is configured."""
    # No fallback print for this level
    if active_logger := _DEFAULT_LOGGER:
        active_logger.log(TRACE_LOG, msg)
def _is_debug_log_enabled() -> bool:
    """Return True when a logger is configured and it accepts DEBUG records."""
    active_logger = _DEFAULT_LOGGER
    if active_logger is None:
        return False
    return active_logger.isEnabledFor(logging.DEBUG)
def _debug_log(msg: str) -> None:
    """Log `msg` at DEBUG level; silently dropped when no logger is configured."""
    # No fallback print for this level
    if active_logger := _DEFAULT_LOGGER:
        active_logger.debug(msg)
def _error(msg: str, *, prog: Optional[str] = None) -> "NoReturn":
    """Report `msg` as an error and terminate the process with exit code 1.

    :param msg: The error message.
    :param prog: Program name for the fallback output; defaults to the
      basename of `sys.argv[0]`. Only used when no logger is configured.
    """
    active_logger = _DEFAULT_LOGGER
    if not active_logger:
        # Logging is not set up yet, so write directly to stderr.
        me = prog if prog is not None else os.path.basename(sys.argv[0])
        print(f"{me}: error: {msg}", file=sys.stderr)
    else:
        active_logger.error(msg)
    sys.exit(1)
def _warn(msg: str, *, prog: Optional[str] = None) -> None:
    """Report `msg` as a warning.

    :param msg: The warning message.
    :param prog: Program name for the fallback output; defaults to the
      basename of `sys.argv[0]`. Only used when no logger is configured.
    """
    active_logger = _DEFAULT_LOGGER
    if active_logger:
        active_logger.warning(msg)
        return
    # Logging is not set up yet, so write directly to stderr.
    me = prog if prog is not None else os.path.basename(sys.argv[0])
    print(f"{me}: warning: {msg}", file=sys.stderr)
class ColorizedArgumentParser(argparse.ArgumentParser):
    """ArgumentParser that reports parse errors through `_error`."""

    def error(self, message: str) -> NoReturn:
        """Print the usage text to stderr, then report `message` via `_error` (which exits)."""
        self.print_usage(file=sys.stderr)
        _error(message, prog=self.prog)
def ensure_dir(path: str) -> None:
    """Create `path` (and missing parents) with mode 0o755 unless it is already a directory."""
    if os.path.isdir(path):
        return
    os.makedirs(path, mode=0o755, exist_ok=True)
def _clean_path(orig_p: str) -> str:
    """Collapse repeated slashes in `orig_p` and reject "." / ".." segments.

    :param orig_p: The path to clean.
    :return: The path with runs of "/" collapsed to a single "/".
    :raises ValueError: If any segment (after an optional single leading
      "./") is "." or "..".
    """
    cleaned = SLASH_PRUNE.sub("/", orig_p)
    if "." not in cleaned:
        return cleaned
    # We permit a single leading "./" because we add that when we normalize a path, and we want normalization
    # of a normalized path to be a no-op.
    remainder = cleaned[2:] if cleaned.startswith("./") else cleaned
    assert remainder
    if any(part in (".", "..") for part in remainder.split("/")):
        raise ValueError(
            'Please provide paths that are normalized (i.e., no ".." or ".").'
            f' Offending input "{orig_p}"'
        )
    return cleaned
def _normalize_path(path: str, with_prefix: bool = True) -> str:
    """Normalize `path` to the "./foo/bar" form (or "foo/bar" without prefix).

    :param path: The path to normalize; leading and trailing slashes are dropped.
    :param with_prefix: Whether the result should carry a leading "./".
    :return: The normalized path; "." when the input is empty, "." or only slashes.
    """
    stripped = path.strip("/")
    if stripped in ("", "."):
        return "."
    if "//" in stripped or "." in stripped:
        stripped = _clean_path(stripped)
    has_prefix = stripped.startswith("./")
    if with_prefix and not has_prefix:
        return "./" + stripped
    if has_prefix and not with_prefix:
        return stripped[2:]
    return stripped
def _normalize_link_target(link_target: str) -> str:
    """Resolve ".", ".." and empty segments in `link_target`.

    Leading slashes are dropped, so the result never starts with "/".

    :param link_target: The symlink target to normalize.
    :return: The remaining segments joined by "/".
    """
    collapsed = SLASH_PRUNE.sub("/", link_target.lstrip("/"))
    resolved: List[str] = []
    for part in collapsed.split("/"):
        if part == "..":
            # We ignore "root escape attempts" like the OS would (mapping /.. -> /)
            if resolved:
                resolved.pop()
        elif part not in (".", ""):
            # "." and "" are skipped - the empty string is generally a trailing slash
            resolved.append(part)
    return "/".join(resolved)
def manifest_format_doc(anchor: str) -> str:
    """Return the path of MANIFEST-FORMAT.md, with `#anchor` appended when `anchor` is non-empty."""
    doc_path = f"{DEBPUTY_DOC_ROOT_DIR}/MANIFEST-FORMAT.md"
    if not anchor:
        return doc_path
    return f"{doc_path}#{anchor}"
def _backslash_escape(m: re.Match[str]) -> str:
    """Regex substitution callback that prefixes the matched text with a backslash."""
    matched_text = m.group(0)
    return f"\\{matched_text}"
def _escape_shell_word(w: str) -> str:
    """Escape a single word so it can be shown as part of a shell command line.

    Words without whitespace get backslash escapes. Words with whitespace are
    wrapped in double quotes; for a `name=value` word only the value part is
    quoted.
    """
    if not _SPACE_RE.search(w):
        return _REGULAR_ESCAPEES.sub(_backslash_escape, w)

    assignment = _WORD_EQUAL.search(w) if "=" in w else None
    if assignment is None:
        quoted_body = _DOUBLE_ESCAPEES.sub(_backslash_escape, w)
        return f'"{quoted_body}"'

    start, end = assignment.span(0)
    assert start == 0
    escaped_value = _DOUBLE_ESCAPEES.sub(_backslash_escape, w[end:])
    return f'{w[:end]}"{escaped_value}"'
def escape_shell(*args: str) -> str:
    """Escape each argument as a shell word and join the results with single spaces."""
    escaped_words = map(_escape_shell_word, args)
    return " ".join(escaped_words)
def render_command(
    *args: str,
    cwd: Optional[str] = None,
    env_mod: Optional["EnvironmentModification"] = None,
) -> str:
    """Render a command as a shell-like string for display.

    :param args: The command and its arguments.
    :param cwd: Directory the command runs in, if it is not the current one.
    :param env_mod: Environment modifications to show. Its `removals` are
      rendered via `env --unset=...` and its `replacements` (key/value
      pairs) as `KEY=VALUE` words before the command.
    :return: The rendered command line.
    """
    env_mod_parts: List[str] = []
    chdir_via_env = False
    if env_mod:
        if bool(env_mod.removals):
            # Unsetting variables needs the `env` wrapper, which can also do
            # the directory change.
            env_mod_parts.append("env")
            if cwd is not None:
                env_mod_parts.append(f"--chdir={escape_shell(cwd)}")
                chdir_via_env = True
            env_mod_parts.extend(f"--unset={escape_shell(v)}" for v in env_mod.removals)
        env_mod_parts.extend(
            f"{escape_shell(k)}={escape_shell(v)}" for k, v in env_mod.replacements
        )

    # Previously the parts were computed and then dropped, so the rendered
    # command never showed the environment modifications.
    env_mod_prefix = " ".join(env_mod_parts) + " " if env_mod_parts else ""

    chdir_prefix = ""
    # Skip the `cd` when `env --chdir` already covers it; emitting both would
    # apply a relative directory twice.
    if cwd is not None and cwd != "." and not chdir_via_env:
        chdir_prefix = f"cd {escape_shell(cwd)} && "
    return f"{chdir_prefix}{env_mod_prefix}{escape_shell(*args)}"
def print_command(
    *args: str,
    cwd: Optional[str] = None,
    env_mod: Optional["EnvironmentModification"] = None,
    print_at_log_level: int = PRINT_COMMAND,
) -> None:
    """Print the rendered command to stdout if `print_at_log_level` is enabled.

    Does nothing when logging is not set up or the level is disabled.
    """
    active_logger = _DEFAULT_LOGGER
    if active_logger is None:
        return
    if not active_logger.isEnabledFor(print_at_log_level):
        return

    rendered_cmd = render_command(*args, cwd=cwd, env_mod=env_mod)
    if (stdout_handler := _STDOUT_HANDLER) is not None:
        stdout_handler.flush()
    # Ensure command is output immediately so it is hanging after its output.
    # TODO: This should `file` in case something in debputy redirects stdout
    #  (nothing does that for now)
    print(f" {rendered_cmd}")
    sys.stdout.flush()
def run_command(
    *args: str,
    cwd: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
    env_mod: Optional["EnvironmentModification"] = None,
    print_at_log_level: int = PRINT_COMMAND,
    raise_file_not_found_on_missing_command: bool = False,
) -> None:
    """Print and run a command, turning failures into `_error` calls.

    :param args: The command and its arguments.
    :param cwd: Working directory for the command.
    :param env: Base environment; `os.environ` is used when None.
    :param env_mod: Modifications applied to the base environment via
      `env_mod.compute_env`.
    :param print_at_log_level: Log level at which the command is printed.
    :param raise_file_not_found_on_missing_command: Re-raise
      FileNotFoundError instead of reporting it through `_error`.
    """
    print_command(
        *args,
        cwd=cwd,
        env_mod=env_mod,
        print_at_log_level=print_at_log_level,
    )
    if env_mod:
        base_env = os.environ if env is None else env
        env = env_mod.compute_env(base_env)
        if env is os.environ:
            # Nothing changed; let subprocess inherit the environment.
            env = None
    try:
        subprocess.check_call(args, cwd=cwd, env=env)
    # At least "clean_logic.py" relies on catching FileNotFoundError
    except KeyboardInterrupt:
        _error(f"Interrupted (SIGINT) while running {escape_shell(*args)}")
    except FileNotFoundError:
        if raise_file_not_found_on_missing_command:
            raise
        command = args[0]
        if "/" in command:
            _error(f"Could not run {escape_shell(command)}: Path does not exist")
        # Use the `command not found` to aid existing log pattern
        _error(f"{escape_shell(command)}: command not found")
    except subprocess.CalledProcessError as e:
        _error(f"The command {escape_shell(*args)} failed with status: {e.returncode}")
def run_build_system_command(
    *args: str,
    cwd: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
    env_mod: Optional["EnvironmentModification"] = None,
    print_at_log_level: int = PRINT_BUILD_SYSTEM_COMMAND,
    raise_file_not_found_on_missing_command: bool = False,
) -> None:
    """Run a command via `run_command`, printing at PRINT_BUILD_SYSTEM_COMMAND by default."""
    forwarded_options = {
        "cwd": cwd,
        "env": env,
        "env_mod": env_mod,
        "print_at_log_level": print_at_log_level,
        "raise_file_not_found_on_missing_command": raise_file_not_found_on_missing_command,
    }
    run_command(*args, **forwarded_options)
def debian_policy_normalize_symlink_target(
    link_path: str,
    link_target: str,
    normalize_link_path: bool = False,
) -> str:
    """Rewrite a symlink target into the relative or absolute form this function attributes to Debian Policy.

    The target becomes relative when the link and the target share their
    first path segment, and absolute otherwise.

    :param link_path: Path of the symlink itself. Must start with "./" unless
      `normalize_link_path` is True.
    :param link_target: The symlink target; a relative target is interpreted
      relative to the directory containing `link_path`.
    :param normalize_link_path: Run `link_path` through `_normalize_path` first.
    :return: The normalized link target.
    :raises ValueError: If `link_path` does not start with "./" and
      `normalize_link_path` is False.
    """
    if normalize_link_path:
        link_path = _normalize_path(link_path)
    elif not link_path.startswith("./"):
        raise ValueError("Link part was not normalized")

    # Drop the leading "./"
    link_path = link_path[2:]

    if not link_target.startswith("/"):
        # Make a relative target absolute by anchoring it at the link's directory
        link_target = "/" + os.path.dirname(link_path) + "/" + link_target

    link_path_parts = link_path.split("/")
    link_target_parts = [
        s for s in _normalize_link_target(link_target).split("/") if s != "."
    ]

    assert link_path_parts

    if link_target_parts and link_path_parts[0] == link_target_parts[0]:
        # Per Debian Policy, must be relative

        # First determine the length of the overlap
        common_segment_count = 1
        shortest_path_length = min(len(link_target_parts), len(link_path_parts))
        while (
            common_segment_count < shortest_path_length
            and link_target_parts[common_segment_count]
            == link_path_parts[common_segment_count]
        ):
            common_segment_count += 1

        # The target is exactly the directory containing the link
        if common_segment_count == shortest_path_length and len(
            link_path_parts
        ) - 1 == len(link_target_parts):
            normalized_link_target = "."
        else:
            # Number of ".." needed to go from the link's directory to the common prefix
            up_dir_count = len(link_path_parts) - 1 - common_segment_count
            normalized_link_target_parts = []
            if up_dir_count:
                up_dir_part = "../" * up_dir_count
                # We overshoot with a single '/', so rstrip it away
                normalized_link_target_parts.append(up_dir_part.rstrip("/"))
            # Add the relevant down parts
            normalized_link_target_parts.extend(
                link_target_parts[common_segment_count:]
            )

            normalized_link_target = "/".join(normalized_link_target_parts)
    else:
        # Per Debian Policy, must be absolute
        normalized_link_target = "/" + "/".join(link_target_parts)

    return normalized_link_target
def has_glob_magic(pattern: str) -> bool:
    """Return True if `pattern` contains "{" or anything `glob.has_magic` treats as a wildcard."""
    if "{" in pattern:
        return True
    return glob.has_magic(pattern)
# Wraps each glob-special character in a character class, e.g. "*" -> "[*]".
_GLOB_ESCAPE_TABLE = str.maketrans({char: f"[{char}]" for char in "[]*?{}"})


def glob_escape(replacement_value: str) -> str:
    """Escape `replacement_value` so it matches literally when used in a glob.

    :param replacement_value: The text to embed into a glob pattern.
    :return: The input unchanged when it has neither glob wildcards nor "{";
      otherwise the input with each of "[", "]", "*", "?", "{" and "}"
      wrapped in a character class.
    """
    # Leave the value alone only when it has neither standard glob magic nor
    # a brace. The earlier `or` returned values like "a*b" unescaped.
    if not glob.has_magic(replacement_value) and "{" not in replacement_value:
        return replacement_value
    # A single translate pass never re-escapes the brackets it inserts, which
    # chained `.replace()` calls did (turning "[" into "[[[]]").
    return replacement_value.translate(_GLOB_ESCAPE_TABLE)
466# TODO: This logic should probably be moved to `python-debian`
def active_profiles_match(
    profiles_raw: str,
    active_build_profiles: Union[Set[str], FrozenSet[str]],
) -> bool:
    """Check whether a Build-Profiles value is satisfied by the active build profiles.

    The value is a whitespace-separated sequence of "<...>" groups. The value
    matches when every term of at least one group holds: a plain term
    requires the profile to be active, a "!"-prefixed term requires it to be
    inactive.

    :param profiles_raw: The raw Build-Profiles value, e.g. "<!nocheck> <cross>".
    :param active_build_profiles: Names of the active build profiles.
    :return: True if at least one group is satisfied.
    :raises ValueError: If the value is empty, is not wrapped in "<" and ">",
      or is the literal "<>".
    """
    profiles_raw = profiles_raw.strip()
    # startswith/endswith also reject the empty string with ValueError;
    # indexing it raised IndexError instead.
    if (
        not profiles_raw.startswith("<")
        or not profiles_raw.endswith(">")
        or profiles_raw == "<>"
    ):
        raise ValueError(
            'Invalid Build-Profiles: Must start and end with "<" + ">" but cannot be a literal "<>"'
        )
    for profile_group_raw in _PROFILE_GROUP_SPLIT.split(profiles_raw[1:-1]):
        # A term holds when "profile is active" differs from "term is negated".
        if all(
            (term.removeprefix("!") in active_build_profiles) != term.startswith("!")
            for term in profile_group_raw.split()
        ):
            return True

    return False
def _parse_build_profiles(build_profiles_raw: str) -> FrozenSet[FrozenSet[str]]:
    """Parse a Build-Profiles value into a set of groups, each a set of terms.

    :param build_profiles_raw: The raw value, e.g. "<!nocheck> <cross>".
    :return: One frozenset per "<...>" group, holding its whitespace-separated
      terms (including any "!" prefix).
    :raises ValueError: If the value is empty, is not wrapped in "<" and ">",
      or is the literal "<>".
    """
    profiles_raw = build_profiles_raw.strip()
    # startswith/endswith also reject the empty string with ValueError;
    # indexing it raised IndexError instead.
    if (
        not profiles_raw.startswith("<")
        or not profiles_raw.endswith(">")
        or profiles_raw == "<>"
    ):
        raise ValueError(
            'Invalid Build-Profiles: Must start and end with "<" + ">" but cannot be a literal "<>"'
        )
    profile_groups = _PROFILE_GROUP_SPLIT.split(profiles_raw[1:-1])
    return frozenset(frozenset(g.split()) for g in profile_groups)
def resolve_source_date_epoch(
    command_line_value: Optional[int],
    *,
    substitution: Optional["Substitution"] = None,
) -> int:
    """Determine SOURCE_DATE_EPOCH and export it into `os.environ`.

    Sources are tried in order: the command line value, the SOURCE_DATE_EPOCH
    environment variable, the `{{SOURCE_DATE_EPOCH}}` substitution, and
    finally the current time.

    :param command_line_value: Explicitly requested value, if any.
    :param substitution: Used to resolve `{{SOURCE_DATE_EPOCH}}` when the
      earlier sources gave no value.
    :return: The resolved timestamp.
    """
    resolved = command_line_value

    if resolved is None:
        env_value = os.environ.get("SOURCE_DATE_EPOCH")
        if env_value is not None:
            if env_value == "":
                _error("SOURCE_DATE_EPOCH is set but empty.")
            resolved = int(env_value)

    if resolved is None and substitution is not None:
        try:
            resolved = int(
                substitution.substitute(
                    "{{SOURCE_DATE_EPOCH}}",
                    "Internal resolution",
                )
            )
        except (DebputySubstitutionError, ValueError):
            # Best effort only; fall through to the current time.
            pass

    if resolved is None:
        resolved = int(time.time())

    os.environ["SOURCE_DATE_EPOCH"] = str(resolved)
    return resolved
def compute_output_filename(control_root_dir: str, is_udeb: bool) -> str:
    """Compute the `<name>_<version>_<arch>.<ext>` file name from a control file.

    :param control_root_dir: Directory containing the `control` file.
    :param is_udeb: Force the "udeb" extension.
    :return: The file name; any epoch is removed from the version.
    """
    control_path = os.path.join(control_root_dir, "control")
    with open(control_path, "rt") as fd:
        control = Deb822(fd)

    name = control["Package"]
    version = control["Version"]
    architecture = control["Architecture"]
    declared_type = control.get("Package-Type") or "deb"

    # Only the part after the first ":" (the epoch separator) is kept.
    _, colon, without_epoch = version.partition(":")
    if colon:
        version = without_epoch

    extension = "udeb" if is_udeb else declared_type
    return f"{name}_{version}_{architecture}.{extension}"
# Cached result of scratch_dir(); None until the first call resolves it.
_SCRATCH_DIR = None
# Set by integrated_with_debhelper(); consulted by scratch_dir() when choosing the directory.
_DH_INTEGRATION_MODE = False


def integrated_with_debhelper() -> None:
    """Switch the module into debhelper integration mode by setting `_DH_INTEGRATION_MODE`."""
    global _DH_INTEGRATION_MODE
    _DH_INTEGRATION_MODE = True
def scratch_dir(*, create_if_not_exists: bool = True) -> str:
    """Return the scratch directory, choosing and caching it on first use.

    The directory lives under `debian/.debputy` unless debhelper integration
    mode is enabled, or only `debian/.debhelper` exists; then it lives under
    `debian/.debhelper/_debputy`.

    :param create_if_not_exists: Create the directory when it is first chosen.
    :return: The scratch directory path.
    """
    global _SCRATCH_DIR
    if _SCRATCH_DIR is not None:
        return _SCRATCH_DIR

    if os.path.isdir("debian/.debputy") and not _DH_INTEGRATION_MODE:
        owned_by_debputy = True
    else:
        owned_by_debputy = not (
            os.path.isdir("debian/.debhelper") or _DH_INTEGRATION_MODE
        )

    if owned_by_debputy:
        _SCRATCH_DIR = "debian/.debputy/scratch-dir"
    else:
        _SCRATCH_DIR = "debian/.debhelper/_debputy/scratch-dir"

    if create_if_not_exists:
        ensure_dir(_SCRATCH_DIR)
        if owned_by_debputy:
            generated_root_directory("debian/.debputy", internal_only=True)
    return _SCRATCH_DIR
def generated_root_directory(path: str, *, internal_only: bool = False) -> None:
    """Mark `path` as a generated directory.

    Writes a `.gitignore` that ignores everything and, when `internal_only`
    is set, a `CACHEDIR.TAG` file as well.

    :param path: An existing directory.
    :param internal_only: Also write the `CACHEDIR.TAG` file.
    """
    root = Path(path)
    root.joinpath(".gitignore").write_text("*\n")
    # TODO: Should we add a "CACHEDIR.TAG" here? (Requires a relevant ignore rule
    if not internal_only:
        return
    root.joinpath("CACHEDIR.TAG").write_bytes(
        b"Signature: 8a477f597d28d172789f06886806bc55"
    )
# Name of this process' container directory; assigned on the first
# generated_content_dir() call.
_RUNTIME_CONTAINER_DIR_KEY: Optional[str] = None


def generated_content_dir(
    *,
    package: Optional["BinaryPackage"] = None,
    subdir_key: Optional[str] = None,
) -> str:
    """Return (creating it if needed) a directory for generated file system content.

    :param package: Selects the `pkg_<name>` sub-directory; "no-package" is
      used when omitted.
    :param subdir_key: Optional extra sub-directory below the package directory.
    :return: Path of the directory.
    """
    global _RUNTIME_CONTAINER_DIR_KEY
    first_run = _RUNTIME_CONTAINER_DIR_KEY is None
    if first_run:
        _RUNTIME_CONTAINER_DIR_KEY = f"_pb-{os.getpid()}"

    container_root = os.path.join(scratch_dir(), _RUNTIME_CONTAINER_DIR_KEY)

    if first_run and os.path.isdir(container_root):
        # In the unlikely case there is a re-run with exactly the same pid, `debputy` should not
        # see "stale" data.
        # TODO: Ideally, we would always clean up this directory on failure, but `atexit` is not
        #  reliable enough for that and we do not have an obvious hook for it.
        shutil.rmtree(container_root)

    path_parts = [
        container_root,
        "generated-fs-content",
        f"pkg_{package.name}" if package else "no-package",
    ]
    if subdir_key is not None:
        path_parts.append(subdir_key)
    directory = os.path.join(*path_parts)

    os.makedirs(directory, exist_ok=True)
    return directory
# Perl configuration values resolved by resolve_perl_config.
PerlConfigVars = collections.namedtuple(
    "PerlConfigVars", ["vendorlib", "vendorarch", "cross_inc_dir", "ld", "path_sep"]
)
# Perl version and Debian ABI as reported by _perl_config_data.
PerlConfigData = collections.namedtuple("PerlConfigData", ["version", "debian_abi"])
# Cache of resolved PerlConfigVars, keyed by the architecture key chosen in resolve_perl_config.
_PERL_MODULE_DIRS: Dict[str, PerlConfigVars] = {}
@functools.lru_cache(1)
def _perl_config_data() -> PerlConfigData:
    """Ask `perl` for its `version` and `debian_abi` config values (result is cached)."""
    perl_snippet = 'print "$Config{version}\n$Config{debian_abi}\n"'
    raw_output = subprocess.check_output(["perl", "-MConfig", "-e", perl_snippet])
    output_lines = raw_output.decode("utf-8").splitlines()
    return PerlConfigData(*output_lines)
def _perl_version() -> str:
    """Return the `version` value reported by `_perl_config_data`."""
    perl_config = _perl_config_data()
    return perl_config.version
def perlxs_api_dependency() -> str:
    """Return the `perlapi-<abi>` name, using perl's `debian_abi` or else its version."""
    # dh_perl used the build version of perl for this, so we will too.  Most of the perl cross logic
    # assumes that the major version of build variant of Perl is the same as the host variant of Perl.
    perl_config = _perl_config_data()
    abi = perl_config.debian_abi
    if abi is None or abi == "":
        abi = perl_config.version
    return f"perlapi-{abi}"
def resolve_perl_config(
    dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable,
    dctrl_bin: Optional["BinaryPackage"],
) -> PerlConfigVars:
    """Resolve perl's vendorlib/vendorarch/ld/path_sep config values, cached per architecture key.

    :param dpkg_architecture_variables: Provides the cross-compilation state
      and the architecture / multiarch values.
    :param dctrl_bin: When cross compiling, the package whose architecture
      and multiarch values are used; the current host values are used when None.
    :return: The resolved perl configuration.
    :raises ValueError: If the perl snippet does not print exactly four lines.
    """
    global _PERL_MODULE_DIRS
    if dpkg_architecture_variables.is_cross_compiling:
        if dctrl_bin is not None:
            arch = dctrl_bin.resolved_architecture
            ma = dctrl_bin.deb_multiarch
        else:
            arch = dpkg_architecture_variables.current_host_arch
            ma = dpkg_architecture_variables.current_host_multiarch
    else:
        # We are de facto using the build-arch version of perl here; be explicit
        arch = "_build_arch_"
        ma = dpkg_architecture_variables["DEB_BUILD_MULTIARCH"]
    config_vars = _PERL_MODULE_DIRS.get(arch)
    if config_vars is None:
        # Cache miss: query perl and remember the answer under `arch`.
        cmd = ["perl"]
        if dpkg_architecture_variables.is_cross_compiling:
            version = _perl_version()
            cross_inc_dir = f"/usr/lib/{ma}/perl/cross-config-{version}"
            # FIXME: This should not fallback to "build-arch" but on the other hand, we use the perl module dirs
            #  for every package at the moment. So mandating correct perl dirs implies mandating perl-xs-dev in
            #  cross builds... meh.
            # NOTE(review): cross_inc_dir keeps its value even when Config.pm is missing
            #  and the -I option is skipped - confirm that is intended.
            if os.path.exists(os.path.join(cross_inc_dir, "Config.pm")):
                cmd.append(f"-I{cross_inc_dir}")
        else:
            cross_inc_dir = None
        cmd.extend(
            [
                "-MConfig",
                "-e",
                'print "$Config{vendorlib}\n$Config{vendorarch}\n$Config{ld}\n$Config{path_sep}\n"',
            ]
        )
        output = subprocess.check_output(cmd).decode("utf-8").splitlines(keepends=False)
        if len(output) != 4:
            raise ValueError(
                "Internal error: Unable to determine the perl include directories:"
                f" Raw output from perl snippet: {output}"
            )
        config_vars = PerlConfigVars(
            vendorlib="/" + _normalize_path(output[0], with_prefix=False),
            vendorarch="/" + _normalize_path(output[1], with_prefix=False),
            cross_inc_dir=cross_inc_dir,
            ld=output[2],
            path_sep=output[3],
        )
        _PERL_MODULE_DIRS[arch] = config_vars
    return config_vars
@functools.lru_cache(1)
def detect_fakeroot() -> bool:
    """Detect whether the reported uid 0 comes from an LD_PRELOAD wrapper (result is cached).

    :return: True when the process reports uid 0 with LD_PRELOAD set, but
      `id -u` reports a different uid once LD_PRELOAD is removed.
    """
    running_as_root = os.getuid() == 0
    if not (running_as_root and "LD_PRELOAD" in os.environ):
        return False

    env_without_preload = {k: v for k, v in os.environ.items() if k != "LD_PRELOAD"}
    try:
        uid_output = subprocess.check_output(["id", "-u"], env=env_without_preload)
    except subprocess.CalledProcessError:
        print(
            'Could not run "id -u" with LD_PRELOAD unset; assuming we are not run under fakeroot',
            file=sys.stderr,
        )
        return False
    return uid_output.strip() != b"0"
@functools.lru_cache(1)
def _sc_arg_max() -> Optional[int]:
    """Return the system's SC_ARG_MAX value, or None if it cannot be resolved (cached)."""
    try:
        return os.sysconf("SC_ARG_MAX")
    # os.sysconf raises ValueError for an unknown name and OSError when the
    # system does not support it. RuntimeError is kept so the set of handled
    # exceptions only grows.
    except (ValueError, OSError, RuntimeError):
        _warn("Could not resolve SC_ARG_MAX, falling back to a hard-coded limit")
        return None
def _split_xargs_args(
    static_cmd: Sequence[str],
    max_args_byte_len: int,
    varargs: Iterable[str],
    reuse_list_ok: bool,
) -> Iterator[List[str]]:
    """Split `varargs` into command lines that each fit within a byte budget.

    :param static_cmd: Words placed at the start of every command line.
    :param max_args_byte_len: Byte budget for the variable arguments of one
      command line; each argument costs its UTF-8 length plus one.
    :param varargs: The arguments to distribute.
    :param reuse_list_ok: If True, the same list object is reused between
      yields, so the consumer must finish with it before advancing.
    :return: Lists of `static_cmd` followed by a batch of arguments. Nothing
      is yielded when `varargs` is empty.
    :raises ValueError: If a single argument exceeds the budget on its own.
    """
    static_cmd_len = len(static_cmd)
    remaining_len = max_args_byte_len
    pending_args = list(static_cmd)
    for arg in varargs:
        arg_len = len(arg.encode("utf-8")) + 1  # +1 for leading space
        if arg_len > max_args_byte_len:
            # Checked for every argument, so an oversized one cannot slip
            # into a later batch.
            raise ValueError(
                f"Could not fit a single argument into the command line !?"
                f" {max_args_byte_len} (variable argument limit) < {arg_len} (argument length)"
            )
        # Flush on overflow. The earlier `not remaining_len` test only fired
        # when the budget hit exactly zero and missed real overflows.
        if arg_len > remaining_len:
            yield pending_args
            remaining_len = max_args_byte_len
            if reuse_list_ok:
                pending_args.clear()
                pending_args.extend(static_cmd)
            else:
                pending_args = list(static_cmd)
        pending_args.append(arg)
        remaining_len -= arg_len

    if len(pending_args) > static_cmd_len:
        yield pending_args
def xargs(
    static_cmd: Sequence[str],
    varargs: Iterable[str],
    *,
    env: Optional[Mapping[str, str]] = None,
    reuse_list_ok: bool = False,
) -> Iterator[List[str]]:
    """Yield command lines of `static_cmd` plus as many `varargs` as fit the size limit.

    :param static_cmd: Words placed at the start of every command line.
    :param varargs: The arguments to distribute over the command lines.
    :param env: Environment the commands will run with; the process
      environment is measured when None.
    :param reuse_list_ok: Allow the same list object to be reused between yields.
    """
    limit = _sc_arg_max()
    # len overshoots with one space explaining the -1.  The _split_xargs_args
    # will account for the space for the first argument
    overhead = sum(len(a.encode("utf-8")) for a in static_cmd) + len(static_cmd) - 1
    if limit is None:
        # The 20 000 limit is from debhelper, and it did not account for environment. So neither will we here.
        limit = 20_000
    else:
        # +2 for nul bytes after key and value
        if env is None:
            overhead += sum(len(k) + len(v) + 2 for k, v in os.environb.items())
        else:
            overhead += sum(
                len(k.encode("utf-8")) + len(v.encode("utf-8")) + 2
                for k, v in env.items()
            )
        # Add a fixed buffer for OS overhead here (in case env and cmd both must be page-aligned or something like
        # that)
        overhead += 2 * 4096
    yield from _split_xargs_args(static_cmd, limit - overhead, varargs, reuse_list_ok)
804# itertools recipe
805def grouper(
806 iterable: Iterable[T],
807 n: int,
808 *,
809 incomplete: Literal["fill", "strict", "ignore"] = "fill",
810 fillvalue: Optional[T] = None,
811) -> Iterator[Tuple[T, ...]]:
812 """Collect data into non-overlapping fixed-length chunks or blocks"""
813 # grouper('ABCDEFG', 3, fillvalue='x') --> ABC DEF Gxx
814 # grouper('ABCDEFG', 3, incomplete='strict') --> ABC DEF ValueError
815 # grouper('ABCDEFG', 3, incomplete='ignore') --> ABC DEF
816 args = [iter(iterable)] * n
817 if incomplete == "fill": 817 ↛ 818line 817 didn't jump to line 818 because the condition on line 817 was never true
818 return zip_longest(*args, fillvalue=fillvalue)
819 if incomplete == "strict": 819 ↛ 821line 819 didn't jump to line 821 because the condition on line 819 was always true
820 return zip(*args, strict=True)
821 if incomplete == "ignore":
822 return zip(*args)
823 else:
824 raise ValueError("Expected fill, strict, or ignore")
# Set to True at the end of setup_logging(); guards against configuring logging twice by accident.
_LOGGING_SET_UP = False
def _check_color() -> Tuple[bool, bool, Optional[str]]:
    """Decide whether stdout and stderr output should be colored.

    The request comes from DEBPUTY_COLORS, falling back to DPKG_COLORS, and
    then to "never" if NO_COLOR is set or "auto" otherwise. With "auto", each
    stream is colored only when it is a TTY.

    :return: `(stdout_color, stderr_color, bad_request)`, where `bad_request`
      is the rejected value when the request was not one of "auto", "always"
      or "never" (it is then treated as "auto"), and None otherwise.
    """
    dpkg_or_default = os.environ.get(
        "DPKG_COLORS", "never" if "NO_COLOR" in os.environ else "auto"
    )
    requested_color = os.environ.get("DEBPUTY_COLORS", dpkg_or_default)
    bad_request = None
    if requested_color not in {"auto", "always", "never"}:
        bad_request = requested_color
        requested_color = "auto"

    if requested_color == "auto":
        stdout_color = sys.stdout.isatty()
        # Each stream is probed separately. This previously asked stdout
        # again, so redirecting only one stream gave the wrong answer.
        stderr_color = sys.stderr.isatty()
    else:
        enable = requested_color == "always"
        stdout_color = enable
        stderr_color = enable
    return stdout_color, stderr_color, bad_request
def program_name() -> str:
    """Derive the program name from `sys.argv[0]`.

    A trailing ".py" is removed, a `__main__` script takes the name of its
    directory, and "debputy_cmd" is reported as "debputy".
    """
    script_path = sys.argv[0]
    name = os.path.basename(script_path).removesuffix(".py")
    if name == "__main__":
        name = os.path.basename(os.path.dirname(script_path))
    # FIXME: Not optimal that we have to hardcode these kind of things here
    return "debputy" if name == "debputy_cmd" else name
def package_cross_check_precheck(
    pkg_a: "BinaryPackage",
    pkg_b: "BinaryPackage",
) -> Tuple[bool, bool]:
    """Whether these two packages can do content cross-checks

    :param pkg_a: The first package
    :param pkg_b: The second package
    :return: A tuple of two booleans. If the first is True, then pkg_a may do content cross-checks
      that involve pkg_b. If the second is True, then pkg_b may do content cross-checks
      that involve pkg_a. Both can be True and both can be False at the same time, which
      happens in common cases (arch:all + arch:any cases both to be False as a common example).
    """

    # Handle the two most obvious base-cases
    if not pkg_a.should_be_acted_on or not pkg_b.should_be_acted_on:
        return False, False
    if pkg_a.is_arch_all ^ pkg_b.is_arch_all:
        return False, False

    a_may_see_b = True
    b_may_see_a = True

    a_bp = pkg_a.fields.get("Build-Profiles", "")
    b_bp = pkg_b.fields.get("Build-Profiles", "")

    if a_bp != b_bp:
        a_bp_set = _parse_build_profiles(a_bp) if a_bp != "" else frozenset()
        b_bp_set = _parse_build_profiles(b_bp) if b_bp != "" else frozenset()

        # Check for build profiles being identically but just ordered differently.
        if a_bp_set != b_bp_set:
            # For simplicity, we let groups cancel each other out. If one side has no clauses
            # left, then it will always be built when the other is built.
            #
            # Eventually, someone will be here with a special case where more complex logic is
            # required. Good luck to you! Remember to add test cases for it (the existing logic
            # has some for a reason and if the logic is going to be more complex, it will need
            # tests cases to assert it fixes the problem and does not regress)
            if a_bp_set - b_bp_set:
                a_may_see_b = False
            if b_bp_set - a_bp_set:
                b_may_see_a = False

    if pkg_a.declared_architecture != pkg_b.declared_architecture:
        # Also here we could do a subset check, but wildcards vs. non-wildcards make that a pain
        if pkg_a.declared_architecture != "any":
            # pkg_a is restricted, so pkg_b cannot rely on it being present.
            b_may_see_a = False
        # This previously tested pkg_a a second time, so a restricted pkg_b
        # was never taken into account.
        if pkg_b.declared_architecture != "any":
            a_may_see_b = False

    return a_may_see_b, b_may_see_a
def change_log_level(
    log_level: int,
) -> None:
    """Set `log_level` on the module's default logger (if configured) and on the root logger."""
    if (default_logger := _DEFAULT_LOGGER) is not None:
        default_logger.setLevel(log_level)
    root_logger = logging.getLogger("")
    root_logger.setLevel(log_level)
def current_log_level() -> Optional[int]:
    """Return the level of the module's default logger, or None when logging is not set up."""
    default_logger = _DEFAULT_LOGGER
    return None if default_logger is None else default_logger.level
def setup_logging(
    *,
    log_only_to_stderr: bool = False,
    reconfigure_logging: bool = False,
) -> None:
    """Configure the root logger handlers and the module's default logger.

    Records below WARN go to the "stdout" handler and records at WARN or above
    go to the stderr handler. Colored output is used per stream when
    `_check_color` asks for it and `colorlog` can be imported.

    :param log_only_to_stderr: Send the "stdout" records to stderr as well, so
      that nothing is logged to stdout.
    :param reconfigure_logging: Permit calling this again; the handlers
      installed by the previous call are replaced.
    :raises RuntimeError: If logging is already configured and
      `reconfigure_logging` is False.
    """
    global _LOGGING_SET_UP, _DEFAULT_LOGGER, _STDOUT_HANDLER, _STDERR_HANDLER
    if _LOGGING_SET_UP and not reconfigure_logging:
        raise RuntimeError(
            "Logging has already been configured."
            " Use reconfigure_logging=True if you need to reconfigure it"
        )
    stdout_color, stderr_color, bad_request = _check_color()
    colors: Optional[Dict[str, str]] = None

    if stdout_color or stderr_color:
        try:
            import colorlog

        except ImportError:
            stdout_color = False
            stderr_color = False
        else:
            colors = dict(colorlog.default_log_colors)
            # Add our custom levels.
            colors["_INFO"] = colors["INFO"]
            colors["__INFO"] = colors["INFO"]

    if log_only_to_stderr:
        stdout = sys.stderr
        stdout_color = stderr_color
    else:
        # Both branches used to pick sys.stderr, which made the
        # `log_only_to_stderr` option meaningless for the stream choice.
        stdout = sys.stdout

    class LogLevelFilter(logging.Filter):
        """Pass records at/above (`above=True`) or strictly below a threshold."""

        def __init__(self, threshold: int, above: bool):
            super().__init__()
            self.threshold = threshold
            self.above = above

        def filter(self, record: logging.LogRecord) -> bool:
            if self.above:
                return record.levelno >= self.threshold
            else:
                return record.levelno < self.threshold

    color_format = (
        "{bold}{name}{reset}: {bold}{log_color}{levelnamelower}{reset}: {message}"
    )
    colorless_format = "{name}: {levelnamelower}: {message}"

    def _build_handler(stream: Any, use_color: bool) -> "logging.StreamHandler[Any]":
        # `use_color` is only True when the colorlog import above succeeded.
        if use_color:
            handler = colorlog.StreamHandler(stream)
            handler.setFormatter(
                colorlog.ColoredFormatter(
                    color_format,
                    style="{",
                    force_color=True,
                    log_colors=colors,
                )
            )
        else:
            handler = logging.StreamHandler(stream)
            handler.setFormatter(logging.Formatter(colorless_format, style="{"))
        return handler

    root_logger = logging.getLogger()

    # Replace (rather than stack) the handlers from a previous configuration.
    stdout_handler = _build_handler(stdout, stdout_color)
    if _STDOUT_HANDLER is not None:
        root_logger.removeHandler(_STDOUT_HANDLER)
    _STDOUT_HANDLER = stdout_handler
    root_logger.addHandler(stdout_handler)

    stderr_handler = _build_handler(sys.stderr, stderr_color)
    if _STDERR_HANDLER is not None:
        root_logger.removeHandler(_STDERR_HANDLER)
    _STDERR_HANDLER = stderr_handler
    root_logger.addHandler(stderr_handler)

    stdout_handler.addFilter(LogLevelFilter(logging.WARN, False))
    stderr_handler.addFilter(LogLevelFilter(logging.WARN, True))

    name = program_name()

    old_factory = logging.getLogRecordFactory()

    def record_factory(
        *args: Any, **kwargs: Any
    ) -> logging.LogRecord:  # pragma: no cover
        record = old_factory(*args, **kwargs)
        # Strip the "_" prefixes that keep the custom INFO level names unique,
        # and provide the lower-case name used by the format strings.
        record.levelname = record.levelname.lstrip("_")
        record.levelnamelower = record.levelname.lower()
        return record

    logging.setLogRecordFactory(record_factory)

    logging.getLogger().setLevel(logging.WARN)
    _DEFAULT_LOGGER = logging.getLogger(name)

    if bad_request:
        _DEFAULT_LOGGER.warning(
            f'Invalid color request for "{bad_request}" in either DEBPUTY_COLORS or DPKG_COLORS.'
            ' Resetting to "auto".'
        )

    _LOGGING_SET_UP = True