Coverage for src/debputy/util.py: 64%

560 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-10-12 15:06 +0000

1import argparse 

2import collections 

3import functools 

4import glob 

5import logging 

6import os 

7import re 

8import shutil 

9import subprocess 

10import sys 

11import time 

12from itertools import zip_longest 

13from pathlib import Path 

14from typing import ( 

15 NoReturn, 

16 TYPE_CHECKING, 

17 Union, 

18 Set, 

19 FrozenSet, 

20 Optional, 

21 TypeVar, 

22 Dict, 

23 Literal, 

24 Tuple, 

25 List, 

26 Any, 

27) 

28from collections.abc import Iterator, Iterable, Sequence, Mapping 

29 

30from debian.deb822 import Deb822 

31 

32from debputy import DEBPUTY_DOC_ROOT_DIR 

33from debputy.architecture_support import DpkgArchitectureBuildProcessValuesTable 

34from debputy.exceptions import DebputySubstitutionError 

35 

try:
    from Levenshtein import distance
except ImportError:

    CAN_DETECT_TYPOS = False

    def detect_possible_typo(
        provided_value: str,
        known_values: Iterable[str],
        *,
        max_edit_distance: int = 2,
    ) -> Sequence[str]:
        """Fallback used when python-Levenshtein is unavailable: no candidates."""
        return ()

else:

    CAN_DETECT_TYPOS = True

    def detect_possible_typo(
        provided_value: str,
        known_values: Iterable[str],
        *,
        max_edit_distance: int = 2,
    ) -> Sequence[str]:
        """Return known values within ``max_edit_distance`` edits of the input.

        The cheap length-difference check prunes values whose size gap alone
        already exceeds the allowed edit distance, avoiding the (more
        expensive) Levenshtein computation for them.
        """
        value_len = len(provided_value)
        return [
            candidate
            for candidate in known_values
            if abs(value_len - len(candidate)) <= max_edit_distance
            and distance(provided_value, candidate) <= max_edit_distance
        ]

70 

71 

72if TYPE_CHECKING: 

73 from debputy.types import EnvironmentModification 

74 from debputy.packages import BinaryPackage 

75 from debputy.substitution import Substitution 

76 

77 

# Generic type variable used by the small generic helpers in this module.
T = TypeVar("T")


# Collapses runs of "/" into a single "/".
SLASH_PRUNE = re.compile("//+")
# Debian binary package name: lowercase alphanumerics plus "-+." (ASCII only).
PKGNAME_REGEX = re.compile(r"[a-z0-9][-+.a-z0-9]+", re.ASCII)
# Debian package version: [epoch:]upstream[-revision] (ASCII only).
PKGVERSION_REGEX = re.compile(
    r"""
    (?: \d+ : )? # Optional epoch
    \d[0-9A-Za-z.+:~]* # Upstream version (with no hyphens)
    (?: - [0-9A-Za-z.+:~]+ )* # Optional debian revision (+ upstreams versions with hyphens)
""",
    re.VERBOSE | re.ASCII,
)
# Package-Type values; note dbgsym packages are regular "deb" packages.
DEFAULT_PACKAGE_TYPE = "deb"
DBGSYM_PACKAGE_TYPE = "deb"
UDEB_PACKAGE_TYPE = "udeb"

# Default guard for generated postinst snippets (run on configure and aborts).
POSTINST_DEFAULT_CONDITION = (
    '[ "$1" = "configure" ]'
    ' || [ "$1" = "abort-upgrade" ]'
    ' || [ "$1" = "abort-deconfigure" ]'
    ' || [ "$1" = "abort-remove" ]'
)


# Regexes backing the shell-escaping helpers (_escape_shell_word).
_SPACE_RE = re.compile(r"\s")
_WORD_EQUAL = re.compile(r"^-*[\w_\-]+=")
_DOUBLE_ESCAPEES = re.compile(r'([\n`$"\\])')
_REGULAR_ESCAPEES = re.compile(r"""([\s!"$()*+#;<>?@'\[\]\\`|~])""")
# Splits the groups of a Build-Profiles formula ("<a b> <c>").
_PROFILE_GROUP_SPLIT = re.compile(r">\s+<")
# Logging state; None/unset until logging is configured elsewhere.
_DEFAULT_LOGGER: logging.Logger | None = None
_STDOUT_HANDLER: logging.StreamHandler[Any] | None = None
_STDERR_HANDLER: logging.StreamHandler[Any] | None = None
# Custom log levels: command echoing sits between INFO and WARNING.
PRINT_COMMAND = logging.INFO + 3
PRINT_BUILD_SYSTEM_COMMAND = PRINT_COMMAND + 3
TRACE_LOG = logging.DEBUG + 3

# Map them back to `INFO`. The names must be unique so the prefix is stripped.
logging.addLevelName(PRINT_COMMAND, "__INFO")
logging.addLevelName(PRINT_BUILD_SYSTEM_COMMAND, "_INFO")
logging.addLevelName(TRACE_LOG, "TRACE")

119 

120 

121def assume_not_none(x: T | None) -> T: 

122 if x is None: # pragma: no cover 

123 raise ValueError( 

124 'Internal error: None was given, but the receiver assumed "not None" here' 

125 ) 

126 return x 

127 

128 

def _non_verbose_info(msg: str) -> None:
    """Log ``msg`` at the build-system command level (no-op before setup)."""
    if _DEFAULT_LOGGER is not None:
        _DEFAULT_LOGGER.log(PRINT_BUILD_SYSTEM_COMMAND, msg)


def _info(msg: str) -> None:
    """Log ``msg`` at INFO level; silently a no-op before logging is set up."""
    if _DEFAULT_LOGGER:
        _DEFAULT_LOGGER.info(msg)
    # No fallback print for info


def _is_trace_log_enabled() -> bool:
    """Whether TRACE-level messages would currently be emitted."""
    return _DEFAULT_LOGGER is not None and _DEFAULT_LOGGER.isEnabledFor(TRACE_LOG)


def _trace_log(msg: str) -> None:
    """Log ``msg`` at TRACE level; no-op before logging is set up."""
    if _DEFAULT_LOGGER:
        _DEFAULT_LOGGER.log(TRACE_LOG, msg)
    # No fallback print for this level


def _is_debug_log_enabled() -> bool:
    """Whether DEBUG-level messages would currently be emitted."""
    return _DEFAULT_LOGGER is not None and _DEFAULT_LOGGER.isEnabledFor(logging.DEBUG)


def _debug_log(msg: str) -> None:
    """Log ``msg`` at DEBUG level; no-op before logging is set up."""
    if _DEFAULT_LOGGER:
        _DEFAULT_LOGGER.debug(msg)
    # No fallback print for this level

170 

171 

def _error(msg: str, *, prog: str | None = None) -> "NoReturn":
    """Report ``msg`` as an error and terminate with exit status 1.

    Uses the configured logger when available; otherwise prints a
    "<prog>: error: ..." line to stderr (``prog`` defaults to argv[0]).
    """
    logger = _DEFAULT_LOGGER
    if logger:
        logger.error(msg)
    else:
        me = prog if prog is not None else os.path.basename(sys.argv[0])
        print(
            f"{me}: error: {msg}",
            file=sys.stderr,
        )
    sys.exit(1)


def _warn(msg: str, *, prog: str | None = None) -> None:
    """Report ``msg`` as a warning via the logger, or stderr before setup."""
    logger = _DEFAULT_LOGGER
    if logger:
        logger.warning(msg)
    else:
        me = prog if prog is not None else os.path.basename(sys.argv[0])
        print(
            f"{me}: warning: {msg}",
            file=sys.stderr,
        )

198 

199 

class ColorizedArgumentParser(argparse.ArgumentParser):
    """ArgumentParser that routes parse errors through debputy's error reporting."""

    def error(self, message: str) -> NoReturn:
        # Mirror argparse's behavior (usage on stderr) but delegate message
        # formatting and the exit to _error so output matches the rest of debputy.
        self.print_usage(sys.stderr)
        _error(message, prog=self.prog)

204 

205 

def ensure_dir(path: str) -> None:
    """Create ``path`` (mode 0755, parents included) unless it is already a directory."""
    if os.path.isdir(path):
        return
    os.makedirs(path, mode=0o755, exist_ok=True)

209 

210 

211def _clean_path(orig_p: str, allow_and_keep_upward_segments: bool = False) -> str: 

212 p = SLASH_PRUNE.sub("/", orig_p) 

213 if "." in p: 213 ↛ 236line 213 didn't jump to line 236 because the condition on line 213 was always true

214 path_base = p 

215 # We permit a single leading "./" because we add that when we normalize a path, and we want normalization 

216 # of a normalized path to be a no-op. 

217 if path_base.startswith("./"): 

218 path_base = path_base[2:] 

219 assert path_base 

220 if allow_and_keep_upward_segments: 

221 stack = [] 

222 for segment in path_base.split("/"): 

223 if segment == ".": 223 ↛ 224line 223 didn't jump to line 224 because the condition on line 223 was never true

224 continue 

225 stack.append(segment) 

226 p = "/".join(stack) 

227 if path_base.startswith("/"): 227 ↛ 228line 227 didn't jump to line 228 because the condition on line 227 was never true

228 p = "/" + p 

229 else: 

230 for segment in path_base.split("/"): 

231 if segment in (".", ".."): 

232 raise ValueError( 

233 'Please provide paths that are normalized (i.e., no ".." or ".").' 

234 f' Offending input "{orig_p}"' 

235 ) 

236 return p 

237 

238 

239def _normalize_path( 

240 path: str, 

241 with_prefix: bool = True, 

242 allow_and_keep_upward_segments: bool = False, 

243) -> str: 

244 path = path.strip("/") 

245 if not path or path == ".": 245 ↛ 246line 245 didn't jump to line 246 because the condition on line 245 was never true

246 return "." 

247 if "//" in path or "." in path: 

248 path = _clean_path( 

249 path, 

250 allow_and_keep_upward_segments=allow_and_keep_upward_segments, 

251 ) 

252 if with_prefix ^ path.startswith("./"): 

253 if with_prefix: 253 ↛ 256line 253 didn't jump to line 256 because the condition on line 253 was always true

254 path = "./" + path 

255 else: 

256 path = path[2:] 

257 return path 

258 

259 

260def _normalize_link_target(link_target: str) -> str: 

261 link_target = SLASH_PRUNE.sub("/", link_target.lstrip("/")) 

262 result: list[str] = [] 

263 for segment in link_target.split("/"): 

264 if segment in (".", ""): 

265 # Ignore these - the empty string is generally a trailing slash 

266 continue 

267 if segment == "..": 

268 # We ignore "root escape attempts" like the OS would (mapping /.. -> /) 

269 if result: 269 ↛ 263line 269 didn't jump to line 263 because the condition on line 269 was always true

270 result.pop() 

271 else: 

272 result.append(segment) 

273 return "/".join(result) 

274 

275 

def manifest_format_doc(anchor: str) -> str:
    """Return the URL to MANIFEST-FORMAT.md, optionally with ``#anchor``."""
    base = f"{DEBPUTY_DOC_ROOT_DIR}/MANIFEST-FORMAT.md"
    if not anchor:
        return base
    return f"{base}#{anchor}"

279 

280 

281def _backslash_escape(m: re.Match[str]) -> str: 

282 return "\\" + m.group(0) 

283 

284 

285def _escape_shell_word(w: str) -> str: 

286 if _SPACE_RE.search(w): 

287 if "=" in w and (m := _WORD_EQUAL.search(w)) is not None: 

288 s = m.span(0) 

289 assert s[0] == 0 

290 prefix = w[0 : s[1]] 

291 escaped_value = _DOUBLE_ESCAPEES.sub(_backslash_escape, w[s[1] :]) 

292 return f'{prefix}"{escaped_value}"' 

293 w = _DOUBLE_ESCAPEES.sub(_backslash_escape, w) 

294 return f'"{w}"' 

295 return _REGULAR_ESCAPEES.sub(_backslash_escape, w) 

296 

297 

def escape_shell(*args: str) -> str:
    """Escape each argument for shell display and join them with spaces."""
    return " ".join(map(_escape_shell_word, args))

300 

301 

def render_command(
    *args: str,
    cwd: str | None = None,
    env_mod: Optional["EnvironmentModification"] = None,
) -> str:
    """Render a command line for display purposes (not for execution).

    :param args: The command and its arguments.
    :param cwd: Optional working directory, rendered as a leading "cd ... &&".
    :param env_mod: Optional environment modification, rendered as an env(1)
      style prefix ("env --unset=... K=V ...").
    :return: A human-readable, shell-escaped rendering of the command.
    """
    env_mod_prefix = ""
    if env_mod:
        env_mod_parts = []
        if bool(env_mod.removals):
            env_mod_parts.append("env")
            if cwd is not None:
                env_mod_parts.append(f"--chdir={escape_shell(cwd)}")
            env_mod_parts.extend(f"--unset={escape_shell(v)}" for v in env_mod.removals)
        env_mod_parts.extend(
            f"{escape_shell(k)}={escape_shell(v)}" for k, v in env_mod.replacements
        )
        # Bug fix: the computed parts were previously discarded (env_mod_prefix
        # was never assigned), so the rendered command silently omitted the
        # environment modifications.
        if env_mod_parts:
            env_mod_prefix = " ".join(env_mod_parts) + " "

    chdir_prefix = ""
    if cwd is not None and cwd != ".":
        chdir_prefix = f"cd {escape_shell(cwd)} && "
    return f"{chdir_prefix}{env_mod_prefix}{escape_shell(*args)}"

323 

324 

def print_command(
    *args: str,
    cwd: str | None = None,
    env_mod: Optional["EnvironmentModification"] = None,
    print_at_log_level: int = PRINT_COMMAND,
) -> None:
    """Echo a rendered command line on stdout when the log level permits.

    The stdout log handler is flushed first so the echoed command appears
    before any output the command itself subsequently produces.
    """
    # Nothing to do if logging is not set up or the level is filtered out.
    if _DEFAULT_LOGGER is None or not _DEFAULT_LOGGER.isEnabledFor(print_at_log_level):
        return

    rendered_cmd = render_command(
        *args,
        cwd=cwd,
        env_mod=env_mod,
    )
    global _STDOUT_HANDLER
    handler = _STDOUT_HANDLER
    if handler is not None:
        handler.flush()
    # Ensure command is output immediately so it is hanging after its output.
    # TODO: This should `file` in case something in debputy redirects stdout
    # (nothing does that for now)
    print(f" {rendered_cmd}")
    sys.stdout.flush()

348 

349 

def run_command(
    *args: str,
    cwd: str | None = None,
    env: Mapping[str, str] | None = None,
    env_mod: Optional["EnvironmentModification"] = None,
    print_at_log_level: int = PRINT_COMMAND,
    raise_file_not_found_on_missing_command: bool = False,
) -> None:
    """Echo and run an external command, aborting debputy on failure.

    :param args: The command and its arguments.
    :param cwd: Optional working directory for the command.
    :param env: Base environment (defaults to the current os.environ).
    :param env_mod: Modification applied on top of ``env`` before running.
    :param print_at_log_level: Log level gating the command echo.
    :param raise_file_not_found_on_missing_command: When True, re-raise
      FileNotFoundError for a missing executable instead of exiting.
    """
    print_command(
        *args,
        cwd=cwd,
        env_mod=env_mod,
        print_at_log_level=print_at_log_level,
    )
    if env_mod:
        if env is None:
            env = os.environ
        env = env_mod.compute_env(env)
        # subprocess expects env=None for "inherit the current environment".
        if env is os.environ:
            env = None
    try:
        subprocess.check_call(args, cwd=cwd, env=env)
    # At least "clean_logic.py" relies on catching FileNotFoundError
    except KeyboardInterrupt:
        _error(f"Interrupted (SIGINT) while running {escape_shell(*args)}")
    except FileNotFoundError:
        if raise_file_not_found_on_missing_command:
            raise
        if "/" in args[0]:
            _error(f"Could not run {escape_shell(args[0])}: Path does not exist")
        # Use the `command not found` to aid existing log pattern
        _error(f"{escape_shell(args[0])}: command not found")
    except subprocess.CalledProcessError as e:
        _error(f"The command {escape_shell(*args)} failed with status: {e.returncode}")

384 

385 

def run_build_system_command(
    *args: str,
    cwd: str | None = None,
    env: Mapping[str, str] | None = None,
    env_mod: Optional["EnvironmentModification"] = None,
    print_at_log_level: int = PRINT_BUILD_SYSTEM_COMMAND,
    raise_file_not_found_on_missing_command: bool = False,
) -> None:
    """Run a build-system command.

    Identical to :func:`run_command` except the command is echoed at the
    build-system command log level by default.
    """
    run_command(
        *args,
        cwd=cwd,
        env=env,
        env_mod=env_mod,
        print_at_log_level=print_at_log_level,
        raise_file_not_found_on_missing_command=raise_file_not_found_on_missing_command,
    )

402 

403 

def debian_policy_normalize_symlink_target(
    link_path: str,
    link_target: str,
    normalize_link_path: bool = False,
) -> str:
    """Normalize a symlink target per Debian Policy (§10.5).

    Policy requires the target to be relative when link and target share
    their top-level directory, and absolute otherwise.

    :param link_path: Path of the symlink itself; must be normalized with a
      leading "./" unless ``normalize_link_path`` is set.
    :param link_target: Where the symlink points (absolute, or relative to
      the symlink's own directory).
    :param normalize_link_path: Normalize ``link_path`` here instead of
      requiring the caller to have done so.
    :return: The Policy-normalized link target.
    :raises ValueError: If ``link_path`` was not normalized.
    """
    if normalize_link_path:
        link_path = _normalize_path(link_path)
    elif not link_path.startswith("./"):
        raise ValueError("Link part was not normalized")

    # Drop the "./" prefix; everything below works on root-relative paths.
    link_path = link_path[2:]

    # Resolve a relative target against the symlink's directory first.
    if not link_target.startswith("/"):
        link_target = "/" + os.path.dirname(link_path) + "/" + link_target

    link_path_parts = link_path.split("/")
    link_target_parts = [
        s for s in _normalize_link_target(link_target).split("/") if s != "."
    ]

    assert link_path_parts

    if link_target_parts and link_path_parts[0] == link_target_parts[0]:
        # Per Debian Policy, must be relative

        # First determine the length of the overlap
        common_segment_count = 1
        shortest_path_length = min(len(link_target_parts), len(link_path_parts))
        while (
            common_segment_count < shortest_path_length
            and link_target_parts[common_segment_count]
            == link_path_parts[common_segment_count]
        ):
            common_segment_count += 1

        if common_segment_count == shortest_path_length and len(
            link_path_parts
        ) - 1 == len(link_target_parts):
            # The target is exactly the directory the link lives in.
            normalized_link_target = "."
        else:
            # Climb out of the non-shared part of the link's directory ...
            up_dir_count = len(link_path_parts) - 1 - common_segment_count
            normalized_link_target_parts = []
            if up_dir_count:
                up_dir_part = "../" * up_dir_count
                # We overshoot with a single '/', so rstrip it away
                normalized_link_target_parts.append(up_dir_part.rstrip("/"))
            # Add the relevant down parts
            normalized_link_target_parts.extend(
                link_target_parts[common_segment_count:]
            )

            normalized_link_target = "/".join(normalized_link_target_parts)
    else:
        # Per Debian Policy, must be absolute
        normalized_link_target = "/" + "/".join(link_target_parts)

    return normalized_link_target

461 

462 

def has_glob_magic(pattern: str) -> bool:
    """True if ``pattern`` contains glob metacharacters ("{" included, for brace expansion)."""
    return "{" in pattern or glob.has_magic(pattern)

465 

466 

def glob_escape(replacement_value: str) -> str:
    """Escape ``replacement_value`` so globbing treats it as a literal string.

    Bug fix: the early-return previously combined the two checks with ``or``
    instead of ``and``, so any value without "{" (e.g. "foo*") was returned
    unescaped even though it contained glob magic characters.

    :param replacement_value: The literal text to protect from glob expansion.
    :return: The input with all glob metacharacters wrapped in "[...]".
    """
    if not glob.has_magic(replacement_value) and "{" not in replacement_value:
        return replacement_value
    return (
        replacement_value.replace("[", "[[]")
        .replace("]", "[]]")
        .replace("*", "[*]")
        .replace("?", "[?]")
        .replace("{", "[{]")
        .replace("}", "[}]")
    )

478 

479 

480# TODO: This logic should probably be moved to `python-debian` 

481def active_profiles_match( 

482 profiles_raw: str, 

483 active_build_profiles: set[str] | frozenset[str], 

484) -> bool: 

485 profiles_raw = profiles_raw.strip() 

486 if profiles_raw[0] != "<" or profiles_raw[-1] != ">" or profiles_raw == "<>": 486 ↛ 487line 486 didn't jump to line 487 because the condition on line 486 was never true

487 raise ValueError( 

488 'Invalid Build-Profiles: Must start start and end with "<" + ">" but cannot be a literal "<>"' 

489 ) 

490 profile_groups = _PROFILE_GROUP_SPLIT.split(profiles_raw[1:-1]) 

491 for profile_group_raw in profile_groups: 491 ↛ 507line 491 didn't jump to line 507 because the loop on line 491 didn't complete

492 should_process_package = True 

493 for profile_name in profile_group_raw.split(): 

494 negation = False 

495 if profile_name[0] == "!": 495 ↛ 499line 495 didn't jump to line 499 because the condition on line 495 was always true

496 negation = True 

497 profile_name = profile_name[1:] 

498 

499 matched_profile = profile_name in active_build_profiles 

500 if matched_profile == negation: 500 ↛ 501line 500 didn't jump to line 501 because the condition on line 500 was never true

501 should_process_package = False 

502 break 

503 

504 if should_process_package: 504 ↛ 491line 504 didn't jump to line 491 because the condition on line 504 was always true

505 return True 

506 

507 return False 

508 

509 

510def _parse_build_profiles(build_profiles_raw: str) -> frozenset[frozenset[str]]: 

511 profiles_raw = build_profiles_raw.strip() 

512 if profiles_raw[0] != "<" or profiles_raw[-1] != ">" or profiles_raw == "<>": 512 ↛ 513line 512 didn't jump to line 513 because the condition on line 512 was never true

513 raise ValueError( 

514 'Invalid Build-Profiles: Must start start and end with "<" + ">" but cannot be a literal "<>"' 

515 ) 

516 profile_groups = _PROFILE_GROUP_SPLIT.split(profiles_raw[1:-1]) 

517 return frozenset(frozenset(g.split()) for g in profile_groups) 

518 

519 

520def resolve_source_date_epoch( 

521 command_line_value: int | None, 

522 *, 

523 substitution: Optional["Substitution"] = None, 

524) -> int: 

525 mtime = command_line_value 

526 if mtime is None and "SOURCE_DATE_EPOCH" in os.environ: 

527 sde_raw = os.environ["SOURCE_DATE_EPOCH"] 

528 if sde_raw == "": 

529 _error("SOURCE_DATE_EPOCH is set but empty.") 

530 mtime = int(sde_raw) 

531 if mtime is None and substitution is not None: 

532 try: 

533 sde_raw = substitution.substitute( 

534 "{{SOURCE_DATE_EPOCH}}", 

535 "Internal resolution", 

536 ) 

537 mtime = int(sde_raw) 

538 except (DebputySubstitutionError, ValueError): 

539 pass 

540 if mtime is None: 

541 mtime = int(time.time()) 

542 os.environ["SOURCE_DATE_EPOCH"] = str(mtime) 

543 return mtime 

544 

545 

def compute_output_filename(control_root_dir: str, is_udeb: bool) -> str:
    """Compute the .deb/.udeb file name from a binary package's control file.

    Reads ``control_root_dir/control`` and renders the conventional
    "name_version_arch.extension" file name with the epoch stripped from
    the version.  ``Architecture-Variant`` (when present) overrides
    ``Architecture`` and ``is_udeb`` forces the "udeb" extension.
    """
    with open(os.path.join(control_root_dir, "control")) as fd:
        control_file = Deb822(fd)

    package_name = control_file["Package"]
    package_version = control_file["Version"]
    if "Architecture-Variant" in control_file:
        package_architecture = control_file["Architecture-Variant"]
    else:
        package_architecture = control_file["Architecture"]
    if is_udeb:
        extension = "udeb"
    else:
        extension = control_file.get("Package-Type") or "deb"
    if ":" in package_version:
        # Strip the epoch; it is not part of the file name.
        package_version = package_version.split(":", 1)[1]
    return f"{package_name}_{package_version}_{package_architecture}.{extension}"

563 

564 

# Lazily resolved location of debputy's scratch directory (see scratch_dir()).
_SCRATCH_DIR = None
# True when this run is driven by debhelper (changes where scratch data lives).
_DH_INTEGRATION_MODE = False


def integrated_with_debhelper() -> None:
    """Mark this run as integrated with debhelper (affects scratch_dir())."""
    global _DH_INTEGRATION_MODE
    _DH_INTEGRATION_MODE = True

572 

573 

def scratch_dir(*, create_if_not_exists: bool = True) -> str:
    """Return (and lazily set up) debputy's scratch directory.

    The directory lives under debian/.debputy normally, or under
    debian/.debhelper when running integrated with debhelper.  The chosen
    location is cached in a module global for the rest of the run.
    """
    global _SCRATCH_DIR
    if _SCRATCH_DIR is not None:
        return _SCRATCH_DIR
    debputy_scratch_dir = "debian/.debputy/scratch-dir"
    is_debputy_dir = True
    if os.path.isdir("debian/.debputy") and not _DH_INTEGRATION_MODE:
        _SCRATCH_DIR = debputy_scratch_dir
    elif os.path.isdir("debian/.debhelper") or _DH_INTEGRATION_MODE:
        # Under dh, reuse debhelper's directory (it already has ignore markers).
        _SCRATCH_DIR = "debian/.debhelper/_debputy/scratch-dir"
        is_debputy_dir = False
    else:
        _SCRATCH_DIR = debputy_scratch_dir
    if create_if_not_exists:
        ensure_dir(_SCRATCH_DIR)
        if is_debputy_dir:
            # Our own root directory needs the VCS/cache ignore markers.
            generated_root_directory("debian/.debputy", internal_only=True)
    return _SCRATCH_DIR

592 

593 

def generated_root_directory(path: str, *, internal_only: bool = False) -> None:
    """Mark a generated root directory with VCS/backup ignore markers.

    Writes a catch-all .gitignore and, for internal-only directories, a
    CACHEDIR.TAG so cache-aware backup tools skip the directory.
    """
    root_dir = Path(path)
    (root_dir / ".gitignore").write_text("*\n")
    # TODO: Should we add a "CACHEDIR.TAG" here? (Requires a relevant ignore rule
    if not internal_only:
        return
    (root_dir / "CACHEDIR.TAG").write_bytes(
        b"Signature: 8a477f597d28d172789f06886806bc55"
    )

602 

603 

# PID-scoped container directory name; set on first generated_content_dir() call.
_RUNTIME_CONTAINER_DIR_KEY: str | None = None

605 

606 

def generated_content_dir(
    *,
    package: Optional["BinaryPackage"] = None,
    subdir_key: str | None = None,
) -> str:
    """Return a per-run directory for generated filesystem content.

    The directory lives inside the scratch dir under a PID-scoped container
    directory, subdivided per package (or "no-package") and optionally per
    ``subdir_key``.  It is created on demand.
    """
    global _RUNTIME_CONTAINER_DIR_KEY
    container_dir = _RUNTIME_CONTAINER_DIR_KEY
    first_run = False

    if container_dir is None:
        first_run = True
        container_dir = f"_pb-{os.getpid()}"
        _RUNTIME_CONTAINER_DIR_KEY = container_dir

    directory = os.path.join(scratch_dir(), container_dir)

    if first_run and os.path.isdir(directory):
        # In the unlikely case there is a re-run with exactly the same pid, `debputy` should not
        # see "stale" data.
        # TODO: Ideally, we would always clean up this directory on failure, but `atexit` is not
        # reliable enough for that and we do not have an obvious hook for it.
        shutil.rmtree(directory)

    directory = os.path.join(
        directory,
        "generated-fs-content",
        f"pkg_{package.name}" if package else "no-package",
    )
    if subdir_key is not None:
        directory = os.path.join(directory, subdir_key)

    os.makedirs(directory, exist_ok=True)
    return directory

640 

641 

# Selected %Config values from the Perl interpreter (see resolve_perl_config()).
PerlConfigVars = collections.namedtuple(
    "PerlConfigVars", ["vendorlib", "vendorarch", "cross_inc_dir", "ld", "path_sep"]
)
# Version/ABI identity of the build Perl interpreter (see _perl_config_data()).
PerlConfigData = collections.namedtuple("PerlConfigData", ["version", "debian_abi"])
# Cache for resolve_perl_config(), keyed by the effective architecture.
_PERL_MODULE_DIRS: dict[str, PerlConfigVars] = {}

647 

648 

@functools.lru_cache(1)
def _perl_config_data() -> PerlConfigData:
    """Query the build Perl for its version and Debian ABI (result cached).

    NOTE(review): ``$Config{debian_abi}`` appears to be a Debian-specific
    Config key and may be empty — see perlxs_api_dependency() for the
    fallback handling.
    """
    d = (
        subprocess.check_output(
            [
                "perl",
                "-MConfig",
                "-e",
                'print "$Config{version}\n$Config{debian_abi}\n"',
            ]
        )
        .decode("utf-8")
        .splitlines()
    )
    return PerlConfigData(*d)

664 

665 

def _perl_version() -> str:
    """Return the version of the build Perl interpreter."""
    config = _perl_config_data()
    return config.version

668 

669 

def perlxs_api_dependency() -> str:
    """Return the perlapi-* substvar dependency for XS modules.

    dh_perl used the build version of perl for this, so we will too. Most of
    the perl cross logic assumes that the major version of build variant of
    Perl is the same as the host variant of Perl.
    """
    config = _perl_config_data()
    abi = config.debian_abi
    # A None or empty ABI means there is no dedicated ABI name.
    if abi:
        return f"perlapi-{abi}"
    return f"perlapi-{config.version}"

677 

678 

def resolve_perl_config(
    dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable,
    dctrl_bin: Optional["BinaryPackage"],
) -> PerlConfigVars:
    """Resolve the Perl %Config values relevant for packaging (cached per arch).

    :param dpkg_architecture_variables: The dpkg-architecture table; decides
      whether we are cross compiling and which multiarch tuple to use.
    :param dctrl_bin: Optional binary package whose architecture overrides the
      host architecture in cross builds.
    :return: The resolved PerlConfigVars for the effective architecture.
    :raises ValueError: If the perl snippet does not produce the expected output.
    """
    global _PERL_MODULE_DIRS
    if dpkg_architecture_variables.is_cross_compiling:
        if dctrl_bin is not None:
            arch = dctrl_bin.resolved_architecture
            ma = dctrl_bin.deb_multiarch
        else:
            arch = dpkg_architecture_variables.current_host_arch
            ma = dpkg_architecture_variables.current_host_multiarch
    else:
        # We are de facto using the build-arch version of perl here; be explicit
        arch = "_build_arch_"
        ma = dpkg_architecture_variables["DEB_BUILD_MULTIARCH"]
    config_vars = _PERL_MODULE_DIRS.get(arch)
    if config_vars is None:
        cmd = ["perl"]
        if dpkg_architecture_variables.is_cross_compiling:
            version = _perl_version()
            cross_inc_dir = f"/usr/lib/{ma}/perl/cross-config-{version}"
            # FIXME: This should not fallback to "build-arch" but on the other hand, we use the perl module dirs
            # for every package at the moment. So mandating correct perl dirs implies mandating perl-xs-dev in
            # cross builds... meh.
            if os.path.exists(os.path.join(cross_inc_dir, "Config.pm")):
                cmd.append(f"-I{cross_inc_dir}")
        else:
            cross_inc_dir = None
        cmd.extend(
            [
                "-MConfig",
                "-e",
                'print "$Config{vendorlib}\n$Config{vendorarch}\n$Config{ld}\n$Config{path_sep}\n"',
            ]
        )
        output = subprocess.check_output(cmd).decode("utf-8").splitlines(keepends=False)
        if len(output) != 4:
            raise ValueError(
                "Internal error: Unable to determine the perl include directories:"
                f" Raw output from perl snippet: {output}"
            )
        config_vars = PerlConfigVars(
            vendorlib="/" + _normalize_path(output[0], with_prefix=False),
            vendorarch="/" + _normalize_path(output[1], with_prefix=False),
            cross_inc_dir=cross_inc_dir,
            ld=output[2],
            path_sep=output[3],
        )
        _PERL_MODULE_DIRS[arch] = config_vars
    return config_vars

730 

731 

@functools.lru_cache(1)
def detect_fakeroot() -> bool:
    """Detect whether we are running under fakeroot(1).

    Heuristic: we appear to be root (uid 0) with LD_PRELOAD set, yet running
    "id -u" without the preload reports a non-root user.
    """
    if os.getuid() != 0 or "LD_PRELOAD" not in os.environ:
        return False
    env = {k: v for k, v in os.environ.items() if k != "LD_PRELOAD"}
    try:
        return subprocess.check_output(["id", "-u"], env=env).strip() != b"0"
    except subprocess.CalledProcessError:
        print(
            'Could not run "id -u" with LD_PRELOAD unset; assuming we are not run under fakeroot',
            file=sys.stderr,
        )
        return False

746 

747 

@functools.lru_cache(1)
def _sc_arg_max() -> int | None:
    """Return the OS ARG_MAX limit in bytes, or None when it cannot be resolved.

    The caller (xargs) falls back to a conservative hard-coded limit on None.
    """
    try:
        return os.sysconf("SC_ARG_MAX")
    except RuntimeError:
        _warn("Could not resolve SC_ARG_MAX, falling back to a hard-coded limit")
        return None

755 

756 

757def _split_xargs_args( 

758 static_cmd: Sequence[str], 

759 max_args_byte_len: int, 

760 varargs: Iterable[str], 

761 reuse_list_ok: bool, 

762) -> Iterator[list[str]]: 

763 static_cmd_len = len(static_cmd) 

764 remaining_len = max_args_byte_len 

765 pending_args = list(static_cmd) 

766 for arg in varargs: 

767 arg_len = len(arg.encode("utf-8")) + 1 # +1 for leading space 

768 remaining_len -= arg_len 

769 if not remaining_len: 

770 if len(pending_args) <= static_cmd_len: 

771 raise ValueError( 

772 f"Could not fit a single argument into the command line !?" 

773 f" {max_args_byte_len} (variable argument limit) < {arg_len} (argument length)" 

774 ) 

775 yield pending_args 

776 remaining_len = max_args_byte_len - arg_len 

777 if reuse_list_ok: 

778 pending_args.clear() 

779 pending_args.extend(static_cmd) 

780 else: 

781 pending_args = list(static_cmd) 

782 pending_args.append(arg) 

783 

784 if len(pending_args) > static_cmd_len: 

785 yield pending_args 

786 

787 

def xargs(
    static_cmd: Sequence[str],
    varargs: Iterable[str],
    *,
    env: Mapping[str, str] | None = None,
    reuse_list_ok: bool = False,
) -> Iterator[list[str]]:
    """Split ``static_cmd`` + ``varargs`` into command lines below ARG_MAX.

    The budget accounts for the static command, the environment (which also
    counts against ARG_MAX) and a fixed safety margin.  When ARG_MAX cannot
    be determined, a conservative fixed limit is used instead.
    """
    max_args_bytes = _sc_arg_max()
    # len overshoots with one space explaining the -1. The _split_xargs_args
    # will account for the space for the first argument
    static_byte_len = (
        len(static_cmd) - 1 + sum(len(a.encode("utf-8")) for a in static_cmd)
    )
    if max_args_bytes is None:
        # The 20 000 limit is from debhelper, and it did not account for environment. So neither will we here.
        max_args_bytes = 20_000
    else:
        if env is None:
            # +2 for nul bytes after key and value
            env_byte_len = sum(len(k) + len(v) + 2 for k, v in os.environb.items())
        else:
            # +2 for nul bytes after key and value
            env_byte_len = sum(
                len(k.encode("utf-8")) + len(v.encode("utf-8")) + 2
                for k, v in env.items()
            )
        static_byte_len += env_byte_len
        # Add a fixed buffer for OS overhead here (in case env and cmd both must be page-aligned or something like
        # that)
        static_byte_len += 2 * 4096
    yield from _split_xargs_args(
        static_cmd, max_args_bytes - static_byte_len, varargs, reuse_list_ok
    )

819 

820 

821# itertools recipe 

822def grouper( 

823 iterable: Iterable[T], 

824 n: int, 

825 *, 

826 incomplete: Literal["fill", "strict", "ignore"] = "fill", 

827 fillvalue: T | None = None, 

828) -> Iterator[tuple[T, ...]]: 

829 """Collect data into non-overlapping fixed-length chunks or blocks""" 

830 # grouper('ABCDEFG', 3, fillvalue='x') --> ABC DEF Gxx 

831 # grouper('ABCDEFG', 3, incomplete='strict') --> ABC DEF ValueError 

832 # grouper('ABCDEFG', 3, incomplete='ignore') --> ABC DEF 

833 args = [iter(iterable)] * n 

834 if incomplete == "fill": 834 ↛ 835line 834 didn't jump to line 835 because the condition on line 834 was never true

835 return zip_longest(*args, fillvalue=fillvalue) 

836 if incomplete == "strict": 836 ↛ 838line 836 didn't jump to line 838 because the condition on line 836 was always true

837 return zip(*args, strict=True) 

838 if incomplete == "ignore": 

839 return zip(*args) 

840 else: 

841 raise ValueError("Expected fill, strict, or ignore") 

842 

843 

# Module flag: becomes True once the logging handlers have been configured.
_LOGGING_SET_UP = False

845 

846 

847def _check_color() -> tuple[bool, bool, str | None]: 

848 dpkg_or_default = os.environ.get( 

849 "DPKG_COLORS", "never" if "NO_COLOR" in os.environ else "auto" 

850 ) 

851 requested_color = os.environ.get("DEBPUTY_COLORS", dpkg_or_default) 

852 bad_request = None 

853 if requested_color not in {"auto", "always", "never"}: 853 ↛ 854line 853 didn't jump to line 854 because the condition on line 853 was never true

854 bad_request = requested_color 

855 requested_color = "auto" 

856 

857 if requested_color == "auto": 857 ↛ 861line 857 didn't jump to line 861 because the condition on line 857 was always true

858 stdout_color = sys.stdout.isatty() 

859 stderr_color = sys.stdout.isatty() 

860 else: 

861 enable = requested_color == "always" 

862 stdout_color = enable 

863 stderr_color = enable 

864 return stdout_color, stderr_color, bad_request 

865 

866 

def program_name() -> str:
    """Best-effort name of the running program (used for messages/usage)."""
    argv0 = sys.argv[0]
    name = os.path.basename(argv0).removesuffix(".py")
    if name == "__main__":
        # Invoked as a package __main__; use the package directory's name.
        name = os.path.basename(os.path.dirname(argv0))
    # FIXME: Not optimal that we have to hardcode these kind of things here
    return "debputy" if name == "debputy_cmd" else name

877 

878 

def package_cross_check_precheck(
    pkg_a: "BinaryPackage",
    pkg_b: "BinaryPackage",
) -> tuple[bool, bool]:
    """Whether these two packages can do content cross-checks

    :param pkg_a: The first package
    :param pkg_b: The second package
    :return: A tuple of two booleans. If the first is True, then binary_package_a may do content cross-checks
      that involves binary_package_b. If the second is True, then binary_package_b may do content cross-checks
      that involves binary_package_a. Both can be True and both can be False at the same time, which
      happens in common cases (arch:all + arch:any cases both to be False as a common example).
    """

    # Handle the two most obvious base-cases
    if not pkg_a.should_be_acted_on or not pkg_b.should_be_acted_on:
        return False, False
    if pkg_a.is_arch_all ^ pkg_b.is_arch_all:
        return False, False

    a_may_see_b = True
    b_may_see_a = True

    a_bp = pkg_a.fields.get("Build-Profiles", "")
    b_bp = pkg_b.fields.get("Build-Profiles", "")

    if a_bp != b_bp:
        a_bp_set = _parse_build_profiles(a_bp) if a_bp != "" else frozenset()
        b_bp_set = _parse_build_profiles(b_bp) if b_bp != "" else frozenset()

        # Check for build profiles being identical but just ordered differently.
        if a_bp_set != b_bp_set:
            # For simplicity, we let groups cancel each other out. If one side has no clauses
            # left, then it will always be built when the other is built.
            #
            # Eventually, someone will be here with a special case where more complex logic is
            # required. Good luck to you! Remember to add test cases for it (the existing logic
            # has some for a reason and if the logic is going to be more complex, it will need
            # tests cases to assert it fixes the problem and does not regress)
            if a_bp_set - b_bp_set:
                a_may_see_b = False
            if b_bp_set - a_bp_set:
                b_may_see_a = False

    if pkg_a.declared_architecture != pkg_b.declared_architecture:
        # Also here we could do a subset check, but wildcards vs. non-wildcards make that a pain
        if pkg_a.declared_architecture != "any":
            b_may_see_a = False
        # Bug fix: this previously re-tested pkg_a's architecture; the second
        # clause must mirror the first for pkg_b so that an arch-specific
        # pkg_b blocks pkg_a's cross-checks (and not the other way around).
        if pkg_b.declared_architecture != "any":
            a_may_see_b = False

    return a_may_see_b, b_may_see_a

931 

932 

def change_log_level(
    log_level: int,
) -> None:
    """Set the log level on debputy's logger (when configured) and the root logger."""
    logger = _DEFAULT_LOGGER
    if logger is not None:
        logger.setLevel(log_level)
    # The root logger must follow along or its own threshold would still
    # suppress records below it.
    logging.getLogger().setLevel(log_level)

939 

940 

def current_log_level() -> int | None:
    """Return the level of debputy's logger, or None if logging is not set up."""
    logger = _DEFAULT_LOGGER
    return logger.level if logger is not None else None

945 

946 

def setup_logging(
    *,
    log_only_to_stderr: bool = False,
    reconfigure_logging: bool = False,
) -> None:
    """Configure debputy's logging (handlers, formatters, colors).

    Installs one handler for records below WARN (normally on stdout) and one
    for WARN and above (on stderr), optionally colorized via ``colorlog``
    depending on :func:`_check_color`.

    :param log_only_to_stderr: When True, route the "stdout" handler to
      stderr as well (useful when stdout must stay machine-readable).
    :param reconfigure_logging: Allow being called again; replaces the
      handlers installed by a previous call.
    :raises RuntimeError: If logging was already set up and
      ``reconfigure_logging`` is False.
    """
    global _LOGGING_SET_UP, _DEFAULT_LOGGER, _STDOUT_HANDLER, _STDERR_HANDLER
    if _LOGGING_SET_UP and not reconfigure_logging:
        raise RuntimeError(
            "Logging has already been configured."
            " Use reconfigure_logging=True if you need to reconfigure it"
        )
    stdout_color, stderr_color, bad_request = _check_color()
    colors: dict[str, str] | None = None

    # Colors require the optional "colorlog" package; degrade gracefully
    # to colorless output if it is not installed.
    if stdout_color or stderr_color:
        try:
            import colorlog

        except ImportError:
            stdout_color = False
            stderr_color = False
        else:
            colors = dict(colorlog.default_log_colors)
            # Add our custom levels.
            colors["_INFO"] = colors["INFO"]
            colors["__INFO"] = colors["INFO"]

    if log_only_to_stderr:
        stdout = sys.stderr
        # The "stdout" handler now shares the stderr stream, so it must use
        # the stderr color decision.
        stdout_color = stderr_color
    else:
        stdout = sys.stdout

    # Filter used to split records between the two handlers at a threshold:
    # above=True keeps records at/above the threshold, above=False keeps
    # records below it.
    class LogLevelFilter(logging.Filter):
        def __init__(self, threshold: int, above: bool):
            super().__init__()
            self.threshold = threshold
            self.above = above

        def filter(self, record: logging.LogRecord) -> bool:
            if self.above:
                return record.levelno >= self.threshold
            else:
                return record.levelno < self.threshold

    # "levelnamelower" is a custom attribute injected by record_factory below.
    color_format = (
        "{bold}{name}{reset}: {bold}{log_color}{levelnamelower}{reset}: {message}"
    )
    colorless_format = "{name}: {levelnamelower}: {message}"

    # Remember handlers from a previous call so reconfiguration replaces
    # them instead of stacking duplicates.
    existing_stdout_handler = _STDOUT_HANDLER
    existing_stderr_handler = _STDERR_HANDLER

    if stdout_color:
        stdout_handler = colorlog.StreamHandler(stdout)
        stdout_handler.setFormatter(
            colorlog.ColoredFormatter(
                color_format,
                style="{",
                force_color=True,
                log_colors=colors,
            )
        )
        logger = colorlog.getLogger()
        if existing_stdout_handler is not None:
            logger.removeHandler(existing_stdout_handler)
        _STDOUT_HANDLER = stdout_handler
        logger.addHandler(stdout_handler)
    else:
        stdout_handler = logging.StreamHandler(stdout)
        stdout_handler.setFormatter(logging.Formatter(colorless_format, style="{"))
        logger = logging.getLogger()
        if existing_stdout_handler is not None:
            logger.removeHandler(existing_stdout_handler)
        _STDOUT_HANDLER = stdout_handler
        logger.addHandler(stdout_handler)

    if stderr_color:
        stderr_handler = colorlog.StreamHandler(sys.stderr)
        stderr_handler.setFormatter(
            colorlog.ColoredFormatter(
                color_format,
                style="{",
                force_color=True,
                log_colors=colors,
            )
        )
        logger = logging.getLogger()
        if existing_stderr_handler is not None:
            logger.removeHandler(existing_stderr_handler)
        _STDERR_HANDLER = stderr_handler
        logger.addHandler(stderr_handler)
    else:
        stderr_handler = logging.StreamHandler(sys.stderr)
        stderr_handler.setFormatter(logging.Formatter(colorless_format, style="{"))
        logger = logging.getLogger()
        if existing_stderr_handler is not None:
            logger.removeHandler(existing_stderr_handler)
        _STDERR_HANDLER = stderr_handler
        logger.addHandler(stderr_handler)

    # Route records below WARN to the stdout handler, WARN and above to stderr.
    stdout_handler.addFilter(LogLevelFilter(logging.WARN, False))
    stderr_handler.addFilter(LogLevelFilter(logging.WARN, True))

    name = program_name()

    # Chain onto the current record factory (which may already be ours on
    # reconfiguration) to strip the "_" prefix of custom levels and provide
    # the "levelnamelower" attribute used by the formats above.
    old_factory = logging.getLogRecordFactory()

    def record_factory(
        *args: Any, **kwargs: Any
    ) -> logging.LogRecord:  # pragma: no cover
        record = old_factory(*args, **kwargs)
        record.levelname = record.levelname.lstrip("_")
        record.levelnamelower = record.levelname.lower()
        return record

    logging.setLogRecordFactory(record_factory)

    logging.getLogger().setLevel(logging.WARN)
    _DEFAULT_LOGGER = logging.getLogger(name)

    # Now that a logger exists, report any invalid color request detected
    # by _check_color() earlier.
    if bad_request:
        _DEFAULT_LOGGER.warning(
            f'Invalid color request for "{bad_request}" in either DEBPUTY_COLORS or DPKG_COLORS.'
            ' Resetting to "auto".'
        )

    _LOGGING_SET_UP = True