Coverage for src/debputy/util.py: 67%

579 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2026-01-26 19:30 +0000

1import argparse 

2import collections 

3import enum 

4import functools 

5import glob 

6import logging 

7import os 

8import re 

9import shutil 

10import subprocess 

11import sys 

12import time 

13from itertools import zip_longest 

14from pathlib import Path 

15from typing import NoReturn, TYPE_CHECKING, Optional, TypeVar, Literal, Any 

16from collections.abc import Iterator, Iterable, Sequence, Mapping 

17 

18from debian.deb822 import Deb822 

19 

20from debputy.architecture_support import DpkgArchitectureBuildProcessValuesTable 

21from debputy.exceptions import DebputySubstitutionError 

22from debputy.version import debputy_doc_root_dir 

23 

try:
    from Levenshtein import distance
except ImportError:
    # python3-levenshtein is optional; degrade gracefully to "no typo detection".
    CAN_DETECT_TYPOS = False

    def detect_possible_typo(
        provided_value: str,
        known_values: Iterable[str],
        *,
        max_edit_distance: int = 2,
    ) -> Sequence[str]:
        """Fallback used when python3-Levenshtein is unavailable: reports no candidates."""
        return ()

else:
    CAN_DETECT_TYPOS = True

    def detect_possible_typo(
        provided_value: str,
        known_values: Iterable[str],
        *,
        max_edit_distance: int = 2,
    ) -> Sequence[str]:
        """Return the known values within *max_edit_distance* edits of *provided_value*.

        The cheap length check runs first so the (more expensive) edit-distance
        computation is skipped for values that cannot possibly be close enough.
        """
        value_len = len(provided_value)
        return [
            candidate
            for candidate in known_values
            if abs(value_len - len(candidate)) <= max_edit_distance
            and distance(provided_value, candidate) <= max_edit_distance
        ]

58 

59 

60if TYPE_CHECKING: 

61 from debputy.types import EnvironmentModification 

62 from debputy.packages import BinaryPackage 

63 from debputy.substitution import Substitution 

64 

65 

# Generic type variable used by helpers in this module.
T = TypeVar("T")


# Collapses runs of slashes ("a//b" -> "a/b").
SLASH_PRUNE = re.compile("//+")
# Valid Debian binary package name (lower-case alphanumerics plus "-+.").
PKGNAME_REGEX = re.compile(r"[a-z0-9][-+.a-z0-9]+", re.ASCII)
# Valid Debian package version: [epoch:]upstream[-revision].
PKGVERSION_REGEX = re.compile(
    r"""
    (?: \d+ : )? # Optional epoch
    \d[0-9A-Za-z.+:~]* # Upstream version (with no hyphens)
    (?: - [0-9A-Za-z.+:~]+ )* # Optional debian revision (+ upstreams versions with hyphens)
""",
    re.VERBOSE | re.ASCII,
)

79 

80 

class PackageTypeSelector(enum.Flag):
    """Flag selecting which package types (deb and/or udeb) an operation applies to."""

    DEB = enum.auto()
    UDEB = enum.auto()
    # NOTE(review): a typing.Literal alias inside an enum.Flag body is unusual;
    # verify it is not accidentally promoted to an enum member on all targeted
    # Python versions.
    Singleton = Literal[DEB, UDEB]
    ALL = DEB | UDEB

    def __str__(self) -> str:
        # 0 or ALL have no name, but only singletons are enumerated.
        l = []
        for p in self:
            name = p.name
            assert name is not None, repr(p)
            l.append(name.lower())
        return ", ".join(l)

    @staticmethod
    def singleton(s: str) -> Singleton:
        """Like PackageTypeSelector[s.lower()], but typed as a
        singleton and rejecting 'DeB' and 'ALL'."""
        match s:
            case "deb":
                return PackageTypeSelector.DEB
            case "udeb":
                return PackageTypeSelector.UDEB
            case _:
                raise KeyError(s)

107 

108 

# Default guard for generated postinst snippets: run for "configure" plus the
# abort-* states where the package must restore its previous state.
POSTINST_DEFAULT_CONDITION = (
    '[ "$1" = "configure" ]'
    ' || [ "$1" = "abort-upgrade" ]'
    ' || [ "$1" = "abort-deconfigure" ]'
    ' || [ "$1" = "abort-remove" ]'
)


# Any whitespace character (decides whether a shell word needs quoting).
_SPACE_RE = re.compile(r"\s")
# Matches a leading "-*KEY=" prefix of a KEY=VALUE style argument.
_WORD_EQUAL = re.compile(r"^-*[\w_\-]+=")
# Characters that must be backslash-escaped inside double quotes.
_DOUBLE_ESCAPEES = re.compile(r'([\n`$"\\])')
# Characters that must be backslash-escaped outside quotes.
_REGULAR_ESCAPEES = re.compile(r"""([\s!"$()*+#;<>?@'\[\]\\`|~])""")
# Splits "<group1> <group2>" Build-Profiles formulas into their groups.
_PROFILE_GROUP_SPLIT = re.compile(r">\s+<")
# Process-wide logging state; populated by the logging setup code.
_DEFAULT_LOGGER: logging.Logger | None = None
_STDOUT_HANDLER: logging.StreamHandler[Any] | None = None
_STDERR_HANDLER: logging.StreamHandler[Any] | None = None
# Custom log levels: command echoing sits just above INFO; TRACE sits
# between DEBUG and INFO.
PRINT_COMMAND = logging.INFO + 3
PRINT_BUILD_SYSTEM_COMMAND = PRINT_COMMAND + 3
TRACE_LOG = logging.DEBUG + 3

# Map them back to `INFO`. The names must be unique so the prefix is stripped.
logging.addLevelName(PRINT_COMMAND, "__INFO")
logging.addLevelName(PRINT_BUILD_SYSTEM_COMMAND, "_INFO")
logging.addLevelName(TRACE_LOG, "TRACE")

133 

134 

135def assume_not_none(x: T | None) -> T: 

136 if x is None: # pragma: no cover 

137 raise ValueError( 

138 'Internal error: None was given, but the receiver assumed "not None" here' 

139 ) 

140 return x 

141 

142 

def _non_verbose_info(msg: str) -> None:
    """Emit *msg* at the build-system command level; a no-op before logging setup."""
    if (logger := _DEFAULT_LOGGER) is not None:
        logger.log(PRINT_BUILD_SYSTEM_COMMAND, msg)

148 

149 

def _info(msg: str) -> None:
    """Log an informational message; silently dropped before logging setup
    (deliberately no fallback print)."""
    if (logger := _DEFAULT_LOGGER) is not None:
        logger.info(msg)

156 

157 

def _is_trace_log_enabled() -> bool:
    """True when the custom TRACE log level is currently enabled."""
    logger = _DEFAULT_LOGGER
    if logger is None:
        return False
    return logger.isEnabledFor(TRACE_LOG)

162 

163 

def _trace_log(msg: str) -> None:
    """Log at the custom TRACE level; silently dropped before logging setup
    (deliberately no fallback print)."""
    if (logger := _DEFAULT_LOGGER) is not None:
        logger.log(TRACE_LOG, msg)

170 

171 

def _is_debug_log_enabled() -> bool:
    """True when DEBUG logging is currently enabled."""
    logger = _DEFAULT_LOGGER
    if logger is None:
        return False
    return logger.isEnabledFor(logging.DEBUG)

176 

177 

def _debug_log(msg: str) -> None:
    """Log a DEBUG message; silently dropped before logging setup
    (deliberately no fallback print)."""
    if (logger := _DEFAULT_LOGGER) is not None:
        logger.debug(msg)

184 

185 

def _error(msg: str, *, prog: str | None = None) -> "NoReturn":
    """Report *msg* as an error and terminate with exit status 1.

    Uses the configured logger when available; otherwise prints a
    "<prog>: error: ..." line to stderr.
    """
    logger = _DEFAULT_LOGGER
    if logger is None:
        name = prog if prog is not None else os.path.basename(sys.argv[0])
        print(
            f"{name}: error: {msg}",
            file=sys.stderr,
        )
    else:
        logger.error(msg)
    sys.exit(1)

198 

199 

def _warn(msg: str, *, prog: str | None = None) -> None:
    """Report *msg* as a warning.

    Uses the configured logger when available; otherwise prints a
    "<prog>: warning: ..." line to stderr.
    """
    logger = _DEFAULT_LOGGER
    if logger is None:
        name = prog if prog is not None else os.path.basename(sys.argv[0])
        print(
            f"{name}: warning: {msg}",
            file=sys.stderr,
        )
    else:
        logger.warning(msg)

212 

213 

class ColorizedArgumentParser(argparse.ArgumentParser):
    """ArgumentParser that routes parse errors through debputy's error reporting."""

    def error(self, message: str) -> NoReturn:
        # Mirror argparse's default behavior (usage first), then exit via
        # _error so the message gets the standard formatting/logging.
        self.print_usage(sys.stderr)
        _error(message, prog=self.prog)

218 

219 

def ensure_dir(path: str) -> None:
    """Create *path* (and parents, mode 0755) unless it is already a directory."""
    if os.path.isdir(path):
        return
    os.makedirs(path, mode=0o755, exist_ok=True)

223 

224 

def _clean_path(orig_p: str, allow_and_keep_upward_segments: bool = False) -> str:
    """Collapse duplicate slashes and police "." / ".." segments.

    :param orig_p: The path to clean.
    :param allow_and_keep_upward_segments: When True, "." segments are dropped
        and ".." segments are kept verbatim. When False, any "." or ".."
        segment (beyond a single leading "./") is rejected.
    :return: The cleaned path.
    :raises ValueError: If a "." or ".." segment appears and is not allowed.
    """
    p = SLASH_PRUNE.sub("/", orig_p)
    if "." in p:
        path_base = p
        # We permit a single leading "./" because we add that when we normalize a path, and we want normalization
        # of a normalized path to be a no-op.
        if path_base.startswith("./"):
            path_base = path_base[2:]
        assert path_base
        if allow_and_keep_upward_segments:
            stack = []
            for segment in path_base.split("/"):
                if segment == ".":
                    continue
                stack.append(segment)
            p = "/".join(stack)
            # Bug fix: for an absolute path, the empty leading segment already
            # re-creates the leading "/" in the join; unconditionally
            # prepending produced "//...". Only prepend when it is missing.
            if path_base.startswith("/") and not p.startswith("/"):
                p = "/" + p
        else:
            for segment in path_base.split("/"):
                if segment in (".", ".."):
                    raise ValueError(
                        'Please provide paths that are normalized (i.e., no ".." or ".").'
                        f' Offending input "{orig_p}"'
                    )
    return p

251 

252 

def _normalize_path(
    path: str,
    with_prefix: bool = True,
    allow_and_keep_upward_segments: bool = False,
) -> str:
    """Normalize a path to debputy's canonical "./relative" form.

    Leading/trailing slashes are stripped, duplicate slashes and "."
    segments are cleaned, and the "./" prefix is added or removed according
    to *with_prefix*.
    """
    stripped = path.strip("/")
    if stripped in ("", "."):
        return "."
    if "//" in stripped or "." in stripped:
        stripped = _clean_path(
            stripped,
            allow_and_keep_upward_segments=allow_and_keep_upward_segments,
        )
    has_prefix = stripped.startswith("./")
    if with_prefix and not has_prefix:
        stripped = "./" + stripped
    elif not with_prefix and has_prefix:
        stripped = stripped[2:]
    return stripped

272 

273 

def _normalize_link_target(link_target: str) -> str:
    """Normalize a symlink target.

    Empty and "." segments are dropped and ".." segments pop the previous
    segment; attempts to escape above the root are discarded, mirroring how
    the OS maps "/.." to "/".
    """
    pruned = SLASH_PRUNE.sub("/", link_target.lstrip("/"))
    segments: list[str] = []
    for part in pruned.split("/"):
        if part in (".", ""):
            # The empty string is generally a trailing slash; "." is a no-op.
            continue
        if part == "..":
            if segments:
                segments.pop()
        else:
            segments.append(part)
    return "/".join(segments)

288 

289 

def manifest_format_doc(anchor: str) -> str:
    """Path to the MANIFEST-FORMAT.md documentation, optionally with a #anchor."""
    base = f"{debputy_doc_root_dir()}/MANIFEST-FORMAT.md"
    if not anchor:
        return base
    return f"{base}#{anchor}"

293 

294 

295def _backslash_escape(m: re.Match[str]) -> str: 

296 return "\\" + m.group(0) 

297 

298 

def _escape_shell_word(w: str) -> str:
    """Escape a single word for display as part of a shell command line."""
    if _SPACE_RE.search(w) is None:
        # No whitespace: escape individual metacharacters instead of quoting.
        return _REGULAR_ESCAPEES.sub(_backslash_escape, w)
    if "=" in w:
        m = _WORD_EQUAL.search(w)
        if m is not None:
            # KEY=value: leave the KEY= prefix bare and double-quote only the value.
            assert m.start(0) == 0
            end = m.end(0)
            escaped_value = _DOUBLE_ESCAPEES.sub(_backslash_escape, w[end:])
            return f'{w[0:end]}"{escaped_value}"'
    quoted = _DOUBLE_ESCAPEES.sub(_backslash_escape, w)
    return f'"{quoted}"'

310 

311 

def escape_shell(*args: str) -> str:
    """Render *args* as a single display-friendly, shell-escaped command line."""
    return " ".join(map(_escape_shell_word, args))

314 

315 

def render_command(
    *args: str,
    cwd: str | None = None,
    env_mod: Optional["EnvironmentModification"] = None,
) -> str:
    """Render a command (with optional cwd / env changes) for human display.

    :param args: The command and its arguments.
    :param cwd: Optional working directory, rendered as a leading "cd … && ".
    :param env_mod: Optional environment modification, rendered via "env" /
        KEY=VALUE prefixes.
    :return: A shell-escaped rendering of the command.
    """
    env_mod_prefix = ""
    if env_mod:
        env_mod_parts = []
        if bool(env_mod.removals):
            env_mod_parts.append("env")
            env_mod_parts.extend(f"--unset={escape_shell(v)}" for v in env_mod.removals)
        env_mod_parts.extend(
            f"{escape_shell(k)}={escape_shell(v)}" for k, v in env_mod.replacements
        )
        # Bug fix: the assembled parts were previously discarded, so env_mod
        # never showed up in the rendered command. (The stray "--chdir" part
        # was also dropped; the working directory is rendered via chdir_prefix
        # below and would otherwise appear twice.)
        if env_mod_parts:
            env_mod_prefix = " ".join(env_mod_parts) + " "

    chdir_prefix = ""
    if cwd is not None and cwd != ".":
        chdir_prefix = f"cd {escape_shell(cwd)} && "
    return f"{chdir_prefix}{env_mod_prefix}{escape_shell(*args)}"

337 

338 

def print_command(
    *args: str,
    cwd: str | None = None,
    env_mod: Optional["EnvironmentModification"] = None,
    print_at_log_level: int = PRINT_COMMAND,
) -> None:
    """Echo a command (with cwd/env decorations) when the log level permits."""
    logger = _DEFAULT_LOGGER
    if logger is None or not logger.isEnabledFor(print_at_log_level):
        return

    rendered_cmd = render_command(
        *args,
        cwd=cwd,
        env_mod=env_mod,
    )
    handler = _STDOUT_HANDLER
    if handler is not None:
        # Flush pending log output first so the echoed command appears in order
        # (i.e., immediately before its own output).
        handler.flush()
    # TODO: This should `file` in case something in debputy redirects stdout
    # (nothing does that for now)
    print(f" {rendered_cmd}")
    sys.stdout.flush()

362 

363 

def run_command(
    *args: str,
    cwd: str | None = None,
    env: Mapping[str, str] | None = None,
    env_mod: Optional["EnvironmentModification"] = None,
    print_at_log_level: int = PRINT_COMMAND,
    raise_file_not_found_on_missing_command: bool = False,
) -> None:
    """Run a command, echoing it first; failures abort the process via _error.

    :param args: The command and its arguments.
    :param cwd: Optional working directory for the command.
    :param env: Base environment for the command (defaults to os.environ).
    :param env_mod: Optional modification applied on top of *env*.
    :param print_at_log_level: Log level used when echoing the command.
    :param raise_file_not_found_on_missing_command: If True, re-raise
        FileNotFoundError instead of exiting with an error message.
    """
    print_command(
        *args,
        cwd=cwd,
        env_mod=env_mod,
        print_at_log_level=print_at_log_level,
    )
    if env_mod:
        if env is None:
            env = os.environ
        env = env_mod.compute_env(env)
        if env is os.environ:
            # Pass None so the child simply inherits the unmodified environment.
            env = None
    try:
        subprocess.check_call(args, cwd=cwd, env=env)
    # At least "clean_logic.py" relies on catching FileNotFoundError
    except KeyboardInterrupt:
        _error(f"Interrupted (SIGINT) while running {escape_shell(*args)}")
    except FileNotFoundError:
        if raise_file_not_found_on_missing_command:
            raise
        if "/" in args[0]:
            _error(f"Could not run {escape_shell(args[0])}: Path does not exist")
        # Use the `command not found` to aid existing log pattern
        _error(f"{escape_shell(args[0])}: command not found")
    except subprocess.CalledProcessError as e:
        _error(f"The command {escape_shell(*args)} failed with status: {e.returncode}")

398 

399 

def run_build_system_command(
    *args: str,
    cwd: str | None = None,
    env: Mapping[str, str] | None = None,
    env_mod: Optional["EnvironmentModification"] = None,
    print_at_log_level: int = PRINT_BUILD_SYSTEM_COMMAND,
    raise_file_not_found_on_missing_command: bool = False,
) -> None:
    """Thin wrapper around run_command that echoes at the build-system log level."""
    run_command(
        *args,
        cwd=cwd,
        env=env,
        env_mod=env_mod,
        print_at_log_level=print_at_log_level,
        raise_file_not_found_on_missing_command=raise_file_not_found_on_missing_command,
    )

416 

417 

def debian_policy_normalize_symlink_target(
    link_path: str,
    link_target: str,
    normalize_link_path: bool = False,
) -> str:
    """Normalize a symlink target per Debian Policy (§10.5).

    Policy wants the target relative when link and target share their
    top-level directory and absolute otherwise.

    :param link_path: Path of the symlink itself, in the normalized "./..."
        form (unless *normalize_link_path* is True).
    :param link_target: Where the symlink points (absolute, or relative to
        the symlink's directory).
    :param normalize_link_path: If True, normalize *link_path* first.
    :return: The policy-normalized link target.
    :raises ValueError: If *link_path* is not normalized and normalization
        was not requested.
    """
    if normalize_link_path:
        link_path = _normalize_path(link_path)
    elif not link_path.startswith("./"):
        raise ValueError("Link part was not normalized")

    # Drop the leading "./" marker.
    link_path = link_path[2:]

    # Resolve a relative target against the directory containing the symlink.
    if not link_target.startswith("/"):
        link_target = "/" + os.path.dirname(link_path) + "/" + link_target

    link_path_parts = link_path.split("/")
    link_target_parts = [
        s for s in _normalize_link_target(link_target).split("/") if s != "."
    ]

    assert link_path_parts

    if link_target_parts and link_path_parts[0] == link_target_parts[0]:
        # Per Debian Policy, must be relative

        # First determine the length of the overlap
        common_segment_count = 1
        shortest_path_length = min(len(link_target_parts), len(link_path_parts))
        while (
            common_segment_count < shortest_path_length
            and link_target_parts[common_segment_count]
            == link_path_parts[common_segment_count]
        ):
            common_segment_count += 1

        if common_segment_count == shortest_path_length and len(
            link_path_parts
        ) - 1 == len(link_target_parts):
            # The target is exactly the directory containing the link.
            normalized_link_target = "."
        else:
            # Walk up past the unshared part of the link's directory, then
            # descend into the unshared part of the target.
            up_dir_count = len(link_path_parts) - 1 - common_segment_count
            normalized_link_target_parts = []
            if up_dir_count:
                up_dir_part = "../" * up_dir_count
                # We overshoot with a single '/', so rstrip it away
                normalized_link_target_parts.append(up_dir_part.rstrip("/"))
            # Add the relevant down parts
            normalized_link_target_parts.extend(
                link_target_parts[common_segment_count:]
            )

            normalized_link_target = "/".join(normalized_link_target_parts)
    else:
        # Per Debian Policy, must be absolute
        normalized_link_target = "/" + "/".join(link_target_parts)

    return normalized_link_target

475 

476 

def has_glob_magic(pattern: str) -> bool:
    """True when *pattern* contains glob metacharacters (including "{" for brace expansion)."""
    return "{" in pattern or glob.has_magic(pattern)

479 

480 

def glob_escape(replacement_value: str) -> str:
    """Escape glob metacharacters in *replacement_value* so it matches literally.

    :param replacement_value: Text that should be treated literally in a glob.
    :return: The value with "[", "]", "*", "?", "{" and "}" escaped.
    """
    # Bug fix: the early-return previously used `or`, so any value with glob
    # magic but no "{" (e.g. "*.txt") was returned unescaped. Escape unless
    # the value has neither kind of magic.
    if not glob.has_magic(replacement_value) and "{" not in replacement_value:
        return replacement_value
    return (
        replacement_value.replace("[", "[[]")
        .replace("]", "[]]")
        .replace("*", "[*]")
        .replace("?", "[?]")
        .replace("{", "[{]")
        .replace("}", "[}]")
    )

492 

493 

# TODO: This logic should probably be moved to `python-debian`
# See bug1120283.py.
def active_profiles_match(
    profiles_raw: str,
    active_build_profiles: set[str] | frozenset[str],
) -> bool:
    """Whether a Build-Profiles formula matches the active build profiles.

    The formula is a disjunction of <...> groups; each group is a conjunction
    of (possibly "!"-negated) profile names.

    :param profiles_raw: The raw Build-Profiles field value.
    :param active_build_profiles: The profiles active for this build.
    :return: True when at least one group is fully satisfied.
    :raises ValueError: If the formula is not well-formed.
    """
    profiles_raw = profiles_raw.strip()
    # Robustness fix: an empty value previously raised IndexError; report it
    # as the malformed input it is. (Also fixed the doubled "start start" in
    # the error message.)
    if (
        not profiles_raw
        or profiles_raw[0] != "<"
        or profiles_raw[-1] != ">"
        or profiles_raw == "<>"
    ):
        raise ValueError(
            'Invalid Build-Profiles: Must start and end with "<" + ">" but cannot be a literal "<>"'
        )
    profile_groups = _PROFILE_GROUP_SPLIT.split(profiles_raw[1:-1])
    for profile_group_raw in profile_groups:
        should_process_package = True
        for profile_name in profile_group_raw.split():
            negation = False
            if profile_name[0] == "!":
                negation = True
                profile_name = profile_name[1:]

            matched_profile = profile_name in active_build_profiles
            if matched_profile == negation:
                # A required profile is inactive (or a negated one is active):
                # this group cannot match.
                should_process_package = False
                break

        if should_process_package:
            return True

    return False

523 

524 

def _parse_build_profiles(build_profiles_raw: str) -> frozenset[frozenset[str]]:
    """Parse a Build-Profiles formula into a set of profile-name groups.

    :param build_profiles_raw: The raw Build-Profiles field value.
    :return: One frozenset of (possibly "!"-prefixed) names per <...> group.
    :raises ValueError: If the formula is not well-formed.
    """
    profiles_raw = build_profiles_raw.strip()
    # Robustness fix: an empty value previously raised IndexError; report it
    # as the malformed input it is. (Also fixed the doubled "start start" in
    # the error message.)
    if (
        not profiles_raw
        or profiles_raw[0] != "<"
        or profiles_raw[-1] != ">"
        or profiles_raw == "<>"
    ):
        raise ValueError(
            'Invalid Build-Profiles: Must start and end with "<" + ">" but cannot be a literal "<>"'
        )
    profile_groups = _PROFILE_GROUP_SPLIT.split(profiles_raw[1:-1])
    return frozenset(frozenset(g.split()) for g in profile_groups)

533 

534 

def resolve_source_date_epoch(
    command_line_value: int | None,
    *,
    substitution: Optional["Substitution"] = None,
) -> int:
    """Resolve SOURCE_DATE_EPOCH (command line > environment > substitution > now).

    As a side effect, the resolved value is exported via os.environ so child
    processes observe a consistent clamp time.
    """
    mtime = command_line_value
    if mtime is None and "SOURCE_DATE_EPOCH" in os.environ:
        raw_value = os.environ["SOURCE_DATE_EPOCH"]
        if raw_value == "":
            _error("SOURCE_DATE_EPOCH is set but empty.")
        mtime = int(raw_value)
    if mtime is None and substitution is not None:
        try:
            mtime = int(
                substitution.substitute(
                    "{{SOURCE_DATE_EPOCH}}",
                    "Internal resolution",
                )
            )
        except (DebputySubstitutionError, ValueError):
            # Best effort only; fall through to the current time below.
            pass
    if mtime is None:
        mtime = int(time.time())
    os.environ["SOURCE_DATE_EPOCH"] = str(mtime)
    return mtime

559 

560 

def compute_output_filename(control_root_dir: str, is_udeb: bool) -> str:
    """Compute the output filename (name_version_arch.ext) for a binary package.

    Reads the package's DEBIAN/control file; the epoch (if any) is stripped
    from the version as it is not part of the filename.
    """
    with open(os.path.join(control_root_dir, "control")) as fd:
        control_file = Deb822(fd)

    package_name = control_file["Package"]
    package_version = control_file["Version"]
    arch_field = (
        "Architecture-Variant"
        if "Architecture-Variant" in control_file
        else "Architecture"
    )
    package_architecture = control_file[arch_field]
    if is_udeb:
        extension = "udeb"
    else:
        extension = control_file.get("Package-Type") or "deb"
    if ":" in package_version:
        # Drop the epoch (everything up to and including the first colon).
        package_version = package_version.partition(":")[2]

    return f"{package_name}_{package_version}_{package_architecture}.{extension}"

578 

579 

# Cached result of scratch_dir(); resolved at most once per process.
_SCRATCH_DIR = None
# Set via integrated_with_debhelper() when running from a dh sequence.
_DH_INTEGRATION_MODE = False

582 

583 

def integrated_with_debhelper() -> None:
    """Record that debputy is being run as part of a debhelper (dh) sequence.

    Affects where scratch_dir() places its data.
    """
    global _DH_INTEGRATION_MODE
    _DH_INTEGRATION_MODE = True

587 

588 

def scratch_dir(*, create_if_not_exists: bool = True) -> str:
    """Return (and cache) debputy's scratch directory.

    Prefers debian/.debputy; uses debian/.debhelper/_debputy when running in
    debhelper integration mode (or when only debian/.debhelper exists).

    :param create_if_not_exists: If True, create the directory (and mark the
        debputy root as generated) before returning.
    :return: The scratch directory path (relative to the source root).
    """
    global _SCRATCH_DIR
    if _SCRATCH_DIR is not None:
        return _SCRATCH_DIR
    debputy_scratch_dir = "debian/.debputy/scratch-dir"
    is_debputy_dir = True
    if os.path.isdir("debian/.debputy") and not _DH_INTEGRATION_MODE:
        _SCRATCH_DIR = debputy_scratch_dir
    elif os.path.isdir("debian/.debhelper") or _DH_INTEGRATION_MODE:
        # Nest under debhelper's directory so dh's own clean-up removes us too.
        _SCRATCH_DIR = "debian/.debhelper/_debputy/scratch-dir"
        is_debputy_dir = False
    else:
        _SCRATCH_DIR = debputy_scratch_dir
    if create_if_not_exists:
        ensure_dir(_SCRATCH_DIR)
        if is_debputy_dir:
            generated_root_directory("debian/.debputy", internal_only=True)
    return _SCRATCH_DIR

607 

608 

def generated_root_directory(path: str, *, internal_only: bool = False) -> None:
    """Mark *path* as an entirely generated directory.

    Writes a ".gitignore" that ignores everything and, for internal-only
    directories, a CACHEDIR.TAG so backup/cache-aware tools skip it.
    """
    root_dir = Path(path)
    (root_dir / ".gitignore").write_text("*\n")
    if not internal_only:
        return
    cachedir_tag = root_dir / "CACHEDIR.TAG"
    cachedir_tag.write_bytes(b"Signature: 8a477f597d28d172789f06886806bc55")

618 

# Per-process subdirectory name inside the scratch dir (resolved on first use).
_RUNTIME_CONTAINER_DIR_KEY: str | None = None

620 

621 

def generated_content_dir(
    *,
    package: Optional["BinaryPackage"] = None,
    subdir_key: str | None = None,
) -> str:
    """Return (and create) a per-process directory for generated fs content.

    :param package: Optional binary package; gives the content a per-package
        subdirectory.
    :param subdir_key: Optional extra subdirectory below the package level.
    :return: The directory path (created if missing).
    """
    global _RUNTIME_CONTAINER_DIR_KEY
    container_dir = _RUNTIME_CONTAINER_DIR_KEY
    first_run = False

    if container_dir is None:
        first_run = True
        # Key the container on the pid so concurrent runs do not collide.
        container_dir = f"_pb-{os.getpid()}"
        _RUNTIME_CONTAINER_DIR_KEY = container_dir

    directory = os.path.join(scratch_dir(), container_dir)

    if first_run and os.path.isdir(directory):
        # In the unlikely case there is a re-run with exactly the same pid, `debputy` should not
        # see "stale" data.
        # TODO: Ideally, we would always clean up this directory on failure, but `atexit` is not
        # reliable enough for that and we do not have an obvious hook for it.
        shutil.rmtree(directory)

    directory = os.path.join(
        directory,
        "generated-fs-content",
        f"pkg_{package.name}" if package else "no-package",
    )
    if subdir_key is not None:
        directory = os.path.join(directory, subdir_key)

    os.makedirs(directory, exist_ok=True)
    return directory

655 

656 

# Perl configuration values resolved via `perl -MConfig` (per architecture).
PerlConfigVars = collections.namedtuple(
    "PerlConfigVars", ["vendorlib", "vendorarch", "cross_inc_dir", "ld", "path_sep"]
)
# Version/Debian-ABI data for the build-arch perl interpreter.
PerlConfigData = collections.namedtuple("PerlConfigData", ["version", "debian_abi"])
# Cache for resolve_perl_config(), keyed by (host) architecture.
_PERL_MODULE_DIRS: dict[str, PerlConfigVars] = {}

662 

663 

@functools.lru_cache(1)
def _perl_config_data() -> PerlConfigData:
    """Query the (build-arch) perl for its version and Debian ABI (cached)."""
    snippet = 'print "$Config{version}\n$Config{debian_abi}\n"'
    raw = subprocess.check_output(["perl", "-MConfig", "-e", snippet])
    lines = raw.decode("utf-8").splitlines()
    return PerlConfigData(*lines)

679 

680 

def _perl_version() -> str:
    """Version string of the (build-arch) perl interpreter."""
    return _perl_config_data().version

683 

684 

def perlxs_api_dependency() -> str:
    """Return the perlapi-* substvar/dependency for XS modules.

    dh_perl used the build version of perl for this, so we will too. Most of
    the perl cross logic assumes that the major version of the build variant
    of Perl is the same as the host variant of Perl.
    """
    config = _perl_config_data()
    abi = config.debian_abi
    # Prefer the Debian ABI when present and non-empty.
    if abi:
        return f"perlapi-{abi}"
    return f"perlapi-{config.version}"

692 

693 

def resolve_perl_config(
    dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable,
    dctrl_bin: Optional["BinaryPackage"],
) -> PerlConfigVars:
    """Resolve (and cache) perl's configuration for the relevant architecture.

    :param dpkg_architecture_variables: The dpkg architecture table (decides
        whether this is a cross build).
    :param dctrl_bin: Optional binary package supplying the host architecture;
        ignored for non-cross builds.
    :return: The PerlConfigVars for the resolved architecture.
    :raises ValueError: If perl's output cannot be parsed.
    """
    global _PERL_MODULE_DIRS
    if dpkg_architecture_variables.is_cross_compiling:
        if dctrl_bin is not None:
            arch = dctrl_bin.resolved_architecture
            ma = dctrl_bin.deb_multiarch
        else:
            arch = dpkg_architecture_variables.current_host_arch
            ma = dpkg_architecture_variables.current_host_multiarch
    else:
        # We are de facto using the build-arch version of perl here; be explicit
        arch = "_build_arch_"
        ma = dpkg_architecture_variables["DEB_BUILD_MULTIARCH"]
    config_vars = _PERL_MODULE_DIRS.get(arch)
    if config_vars is None:
        cmd = ["perl"]
        if dpkg_architecture_variables.is_cross_compiling:
            version = _perl_version()
            cross_inc_dir = f"/usr/lib/{ma}/perl/cross-config-{version}"
            # FIXME: This should not fallback to "build-arch" but on the other hand, we use the perl module dirs
            # for every package at the moment. So mandating correct perl dirs implies mandating perl-xs-dev in
            # cross builds... meh.
            if os.path.exists(os.path.join(cross_inc_dir, "Config.pm")):
                cmd.append(f"-I{cross_inc_dir}")
        else:
            cross_inc_dir = None
        cmd.extend(
            [
                "-MConfig",
                "-e",
                'print "$Config{vendorlib}\n$Config{vendorarch}\n$Config{ld}\n$Config{path_sep}\n"',
            ]
        )
        output = subprocess.check_output(cmd).decode("utf-8").splitlines(keepends=False)
        if len(output) != 4:
            raise ValueError(
                "Internal error: Unable to determine the perl include directories:"
                f" Raw output from perl snippet: {output}"
            )
        config_vars = PerlConfigVars(
            vendorlib="/" + _normalize_path(output[0], with_prefix=False),
            vendorarch="/" + _normalize_path(output[1], with_prefix=False),
            cross_inc_dir=cross_inc_dir,
            ld=output[2],
            path_sep=output[3],
        )
        _PERL_MODULE_DIRS[arch] = config_vars
    return config_vars

745 

746 

@functools.lru_cache(1)
def detect_fakeroot() -> bool:
    """Whether this process appears to run under fakeroot (cached).

    Heuristic: the uid reads as 0 with LD_PRELOAD set, but "id -u" executed
    without the preload reports a different uid.
    """
    if os.getuid() != 0 or "LD_PRELOAD" not in os.environ:
        return False
    env = dict(os.environ)
    del env["LD_PRELOAD"]
    try:
        real_uid = subprocess.check_output(["id", "-u"], env=env).strip()
    except subprocess.CalledProcessError:
        print(
            'Could not run "id -u" with LD_PRELOAD unset; assuming we are not run under fakeroot',
            file=sys.stderr,
        )
        return False
    return real_uid != b"0"

761 

762 

@functools.lru_cache(1)
def _sc_arg_max() -> int | None:
    """Return os.sysconf("SC_ARG_MAX"), or None when it cannot be determined."""
    try:
        limit = os.sysconf("SC_ARG_MAX")
    except RuntimeError:
        _warn("Could not resolve SC_ARG_MAX, falling back to a hard-coded limit")
        return None
    return limit

770 

771 

772def _split_xargs_args( 

773 static_cmd: Sequence[str], 

774 max_args_byte_len: int, 

775 varargs: Iterable[str], 

776 reuse_list_ok: bool, 

777) -> Iterator[list[str]]: 

778 static_cmd_len = len(static_cmd) 

779 remaining_len = max_args_byte_len 

780 pending_args = list(static_cmd) 

781 for arg in varargs: 

782 arg_len = len(arg.encode("utf-8")) + 1 # +1 for leading space 

783 remaining_len -= arg_len 

784 if not remaining_len: 

785 if len(pending_args) <= static_cmd_len: 

786 raise ValueError( 

787 f"Could not fit a single argument into the command line !?" 

788 f" {max_args_byte_len} (variable argument limit) < {arg_len} (argument length)" 

789 ) 

790 yield pending_args 

791 remaining_len = max_args_byte_len - arg_len 

792 if reuse_list_ok: 

793 pending_args.clear() 

794 pending_args.extend(static_cmd) 

795 else: 

796 pending_args = list(static_cmd) 

797 pending_args.append(arg) 

798 

799 if len(pending_args) > static_cmd_len: 

800 yield pending_args 

801 

802 

def xargs(
    static_cmd: Sequence[str],
    varargs: Iterable[str],
    *,
    env: Mapping[str, str] | None = None,
    reuse_list_ok: bool = False,
) -> Iterator[list[str]]:
    """Split a command with many arguments into xargs-style invocations.

    :param static_cmd: The fixed part of the command line.
    :param varargs: The variable arguments to distribute over invocations.
    :param env: Environment the command will run with (its size counts against
        the OS argument limit); defaults to os.environ.
    :param reuse_list_ok: If True, the same list object may be yielded
        repeatedly (the consumer must not keep references across iterations).
    :return: Iterator of complete argument lists, each within the limit.
    """
    max_args_bytes = _sc_arg_max()
    # len overshoots with one space explaining the -1. The _split_xargs_args
    # will account for the space for the first argument
    static_byte_len = (
        len(static_cmd) - 1 + sum(len(a.encode("utf-8")) for a in static_cmd)
    )
    if max_args_bytes is not None:
        if env is None:
            # +2 for nul bytes after key and value
            static_byte_len += sum(len(k) + len(v) + 2 for k, v in os.environb.items())
        else:
            # +2 for nul bytes after key and value
            static_byte_len += sum(
                len(k.encode("utf-8")) + len(v.encode("utf-8")) + 2
                for k, v in env.items()
            )
        # Add a fixed buffer for OS overhead here (in case env and cmd both must be page-aligned or something like
        # that)
        static_byte_len += 2 * 4096
    else:
        # The 20 000 limit is from debhelper, and it did not account for environment. So neither will we here.
        max_args_bytes = 20_000
    remain_len = max_args_bytes - static_byte_len
    yield from _split_xargs_args(static_cmd, remain_len, varargs, reuse_list_ok)

834 

835 

836# itertools recipe 

837def grouper( 

838 iterable: Iterable[T], 

839 n: int, 

840 *, 

841 incomplete: Literal["fill", "strict", "ignore"] = "fill", 

842 fillvalue: T | None = None, 

843) -> Iterator[tuple[T, ...]]: 

844 """Collect data into non-overlapping fixed-length chunks or blocks""" 

845 # grouper('ABCDEFG', 3, fillvalue='x') --> ABC DEF Gxx 

846 # grouper('ABCDEFG', 3, incomplete='strict') --> ABC DEF ValueError 

847 # grouper('ABCDEFG', 3, incomplete='ignore') --> ABC DEF 

848 args = [iter(iterable)] * n 

849 if incomplete == "fill": 849 ↛ 850line 849 didn't jump to line 850 because the condition on line 849 was never true

850 return zip_longest(*args, fillvalue=fillvalue) 

851 if incomplete == "strict": 851 ↛ 853line 851 didn't jump to line 853 because the condition on line 851 was always true

852 return zip(*args, strict=True) 

853 if incomplete == "ignore": 

854 return zip(*args) 

855 else: 

856 raise ValueError("Expected fill, strict, or ignore") 

857 

858 

# Guards against double-initialization of the logging subsystem.
_LOGGING_SET_UP = False

860 

861 

862def _check_color() -> tuple[bool, bool, str | None]: 

863 dpkg_or_default = os.environ.get( 

864 "DPKG_COLORS", "never" if "NO_COLOR" in os.environ else "auto" 

865 ) 

866 requested_color = os.environ.get("DEBPUTY_COLORS", dpkg_or_default) 

867 bad_request = None 

868 if requested_color not in {"auto", "always", "never"}: 868 ↛ 869line 868 didn't jump to line 869 because the condition on line 868 was never true

869 bad_request = requested_color 

870 requested_color = "auto" 

871 

872 if requested_color == "auto": 872 ↛ 876line 872 didn't jump to line 876 because the condition on line 872 was always true

873 stdout_color = sys.stdout.isatty() 

874 stderr_color = sys.stdout.isatty() 

875 else: 

876 enable = requested_color == "always" 

877 stdout_color = enable 

878 stderr_color = enable 

879 return stdout_color, stderr_color, bad_request 

880 

881 

def program_name() -> str:
    """Best-effort name of the running program, for use in messages."""
    name = os.path.basename(sys.argv[0])
    name = name.removesuffix(".py")
    if name == "__main__":
        # Executed as a package/directory; use the directory name instead.
        name = os.path.basename(os.path.dirname(sys.argv[0]))
    # FIXME: Not optimal that we have to hardcode these kind of things here
    if name == "debputy_cmd":
        name = "debputy"
    return name

892 

893 

def package_cross_check_precheck(
    pkg_a: "BinaryPackage",
    pkg_b: "BinaryPackage",
) -> tuple[bool, bool]:
    """Whether these two packages can do content cross-checks

    :param pkg_a: The first package
    :param pkg_b: The second package
    :return: A tuple of two booleans. If the first is True, then binary_package_a may do content cross-checks
      that involves binary_package_b. If the second is True, then binary_package_b may do content cross-checks
      that involves binary_package_a. Both can be True and both can be False at the same time, which
      happens in common cases (arch:all + arch:any cases both to be False as a common example).
    """

    # Handle the two most obvious base-cases
    if not pkg_a.should_be_acted_on or not pkg_b.should_be_acted_on:
        return False, False
    if pkg_a.is_arch_all ^ pkg_b.is_arch_all:
        return False, False

    a_may_see_b = True
    b_may_see_a = True

    a_bp = pkg_a.fields.get("Build-Profiles", "")
    b_bp = pkg_b.fields.get("Build-Profiles", "")

    if a_bp != b_bp:
        a_bp_set = _parse_build_profiles(a_bp) if a_bp != "" else frozenset()
        b_bp_set = _parse_build_profiles(b_bp) if b_bp != "" else frozenset()

        # Check for build profiles being identically but just ordered differently.
        if a_bp_set != b_bp_set:
            # For simplicity, we let groups cancel each other out. If one side has no clauses
            # left, then it will always be built when the other is built.
            #
            # Eventually, someone will be here with a special case where more complex logic is
            # required. Good luck to you! Remember to add test cases for it (the existing logic
            # has some for a reason and if the logic is going to be more complex, it will need
            # tests cases to assert it fixes the problem and does not regress)
            if a_bp_set - b_bp_set:
                a_may_see_b = False
            if b_bp_set - a_bp_set:
                b_may_see_a = False

    if pkg_a.declared_architecture != pkg_b.declared_architecture:
        # Also here we could do a subset check, but wildcards vs. non-wildcards make that a pain
        if pkg_a.declared_architecture != "any":
            b_may_see_a = False
        # Bug fix: this previously re-tested pkg_a.declared_architecture (copy-paste),
        # so a_may_see_b was decided by the wrong package's architecture.
        if pkg_b.declared_architecture != "any":
            a_may_see_b = False

    return a_may_see_b, b_may_see_a

946 

947 

def change_log_level(
    log_level: int,
) -> None:
    """Apply *log_level* to the program's default logger (if set up) and the root logger."""
    program_logger = _DEFAULT_LOGGER
    if program_logger is not None:
        program_logger.setLevel(log_level)
    logging.getLogger("").setLevel(log_level)

954 

955 

def current_log_level() -> int | None:
    """Return the level of the program's default logger, or None if logging was never set up."""
    program_logger = _DEFAULT_LOGGER
    return program_logger.level if program_logger is not None else None

960 

961 

def setup_logging(
    *,
    log_only_to_stderr: bool = False,
    reconfigure_logging: bool = False,
) -> None:
    """Configure process-wide logging for the program.

    Installs two handlers on the root logger: one for records below WARN
    (stdout by default) and one for WARN and above (stderr). When the
    environment requests color (see `_check_color`) and `colorlog` is
    importable, colorized formatters are used; otherwise plain ones.
    Also installs a log record factory that strips leading underscores from
    level names (used by the custom "_INFO"/"__INFO" levels) and exposes a
    lowercase ``levelnamelower`` attribute consumed by the log formats.

    :param log_only_to_stderr: If True, route *all* output (including the
      below-WARN handler) to stderr instead of stdout.
    :param reconfigure_logging: Must be True when calling this function again
      after a prior successful call; existing handlers are then replaced.
    :raises RuntimeError: If logging was already set up and
      ``reconfigure_logging`` was not requested.
    """
    global _LOGGING_SET_UP, _DEFAULT_LOGGER, _STDOUT_HANDLER, _STDERR_HANDLER
    if _LOGGING_SET_UP and not reconfigure_logging:
        raise RuntimeError(
            "Logging has already been configured."
            " Use reconfigure_logging=True if you need to reconfigure it"
        )
    stdout_color, stderr_color, bad_request = _check_color()
    colors: dict[str, str] | None = None

    if stdout_color or stderr_color:
        try:
            import colorlog

        except ImportError:
            # Color was requested/auto-detected but colorlog is unavailable;
            # degrade gracefully to plain (uncolored) output.
            stdout_color = False
            stderr_color = False
        else:
            colors = dict(colorlog.default_log_colors)
            # Add our custom levels.
            colors["_INFO"] = colors["INFO"]
            colors["__INFO"] = colors["INFO"]

    if log_only_to_stderr:
        stdout = sys.stderr
        stdout_color = stderr_color
    else:
        stdout = sys.stdout

    class LogLevelFilter(logging.Filter):
        # Splits records by severity: `above=True` passes records at or above
        # `threshold`; `above=False` passes records strictly below it.
        def __init__(self, threshold: int, above: bool):
            super().__init__()
            self.threshold = threshold
            self.above = above

        def filter(self, record: logging.LogRecord) -> bool:
            if self.above:
                return record.levelno >= self.threshold
            else:
                return record.levelno < self.threshold

    color_format = (
        "{bold}{name}{reset}: {bold}{log_color}{levelnamelower}{reset}: {message}"
    )
    colorless_format = "{name}: {levelnamelower}: {message}"

    # Remember handlers from a previous call so reconfiguration replaces them
    # instead of stacking duplicates on the root logger.
    existing_stdout_handler = _STDOUT_HANDLER
    existing_stderr_handler = _STDER_HANDLER if False else _STDERR_HANDLER

    if stdout_color:
        stdout_handler = colorlog.StreamHandler(stdout)
        stdout_handler.setFormatter(
            colorlog.ColoredFormatter(
                color_format,
                style="{",
                force_color=True,
                log_colors=colors,
            )
        )
        logger = colorlog.getLogger()
        if existing_stdout_handler is not None:
            logger.removeHandler(existing_stdout_handler)
        _STDOUT_HANDLER = stdout_handler
        logger.addHandler(stdout_handler)
    else:
        stdout_handler = logging.StreamHandler(stdout)
        stdout_handler.setFormatter(logging.Formatter(colorless_format, style="{"))
        logger = logging.getLogger()
        if existing_stdout_handler is not None:
            logger.removeHandler(existing_stdout_handler)
        _STDOUT_HANDLER = stdout_handler
        logger.addHandler(stdout_handler)

    if stderr_color:
        stderr_handler = colorlog.StreamHandler(sys.stderr)
        stderr_handler.setFormatter(
            colorlog.ColoredFormatter(
                color_format,
                style="{",
                force_color=True,
                log_colors=colors,
            )
        )
        logger = logging.getLogger()
        if existing_stderr_handler is not None:
            logger.removeHandler(existing_stderr_handler)
        _STDERR_HANDLER = stderr_handler
        logger.addHandler(stderr_handler)
    else:
        stderr_handler = logging.StreamHandler(sys.stderr)
        stderr_handler.setFormatter(logging.Formatter(colorless_format, style="{"))
        logger = logging.getLogger()
        if existing_stderr_handler is not None:
            logger.removeHandler(existing_stderr_handler)
        _STDERR_HANDLER = stderr_handler
        logger.addHandler(stderr_handler)

    # Route below-WARN records to the stdout handler, WARN+ to stderr.
    stdout_handler.addFilter(LogLevelFilter(logging.WARN, False))
    stderr_handler.addFilter(LogLevelFilter(logging.WARN, True))

    name = program_name()

    old_factory = logging.getLogRecordFactory()

    def record_factory(
        *args: Any, **kwargs: Any
    ) -> logging.LogRecord:  # pragma: no cover
        record = old_factory(*args, **kwargs)
        # Strip the leading underscores of our custom level names and provide
        # the lowercase variant used by the formats defined above.
        record.levelname = record.levelname.lstrip("_")
        record.levelnamelower = record.levelname.lower()
        return record

    logging.setLogRecordFactory(record_factory)

    logging.getLogger().setLevel(logging.WARN)
    _DEFAULT_LOGGER = logging.getLogger(name)

    if bad_request:
        _DEFAULT_LOGGER.warning(
            f'Invalid color request for "{bad_request}" in either DEBPUTY_COLORS or DPKG_COLORS.'
            ' Resetting to "auto".'
        )

    _LOGGING_SET_UP = True