Coverage for src/debputy/util.py: 61%

548 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-03-24 16:38 +0000

1import argparse 

2import collections 

3import functools 

4import glob 

5import logging 

6import os 

7import re 

8import shutil 

9import subprocess 

10import sys 

11import time 

12from itertools import zip_longest 

13from pathlib import Path 

14from typing import ( 

15 NoReturn, 

16 TYPE_CHECKING, 

17 Union, 

18 Set, 

19 FrozenSet, 

20 Optional, 

21 TypeVar, 

22 Dict, 

23 Iterator, 

24 Iterable, 

25 Literal, 

26 Tuple, 

27 Sequence, 

28 List, 

29 Mapping, 

30 Any, 

31) 

32 

33from debian.deb822 import Deb822 

34 

35from debputy import DEBPUTY_DOC_ROOT_DIR 

36from debputy.architecture_support import DpkgArchitectureBuildProcessValuesTable 

37from debputy.exceptions import DebputySubstitutionError 

38 

try:
    from Levenshtein import distance
except ImportError:

    CAN_DETECT_TYPOS = False

    def detect_possible_typo(
        provided_value: str,
        known_values: Iterable[str],
        *,
        max_edit_distance: int = 2,
    ) -> Sequence[str]:
        """Fallback when python-Levenshtein is unavailable: no candidates."""
        return ()

else:

    CAN_DETECT_TYPOS = True

    def detect_possible_typo(
        provided_value: str,
        known_values: Iterable[str],
        *,
        max_edit_distance: int = 2,
    ) -> Sequence[str]:
        """Return the known values within *max_edit_distance* edits of the input."""
        value_len = len(provided_value)
        return [
            candidate
            for candidate in known_values
            # Cheap length filter first; a length gap larger than the allowed
            # edit distance can never be within range.
            if abs(value_len - len(candidate)) <= max_edit_distance
            and distance(provided_value, candidate) <= max_edit_distance
        ]

73 

74 

75if TYPE_CHECKING: 

76 from debputy.types import EnvironmentModification 

77 from debputy.packages import BinaryPackage 

78 from debputy.substitution import Substitution 

79 

80 

# Generic type variable used throughout this module's helpers.
T = TypeVar("T")


# Collapses runs of "/" (e.g. "a//b" -> "a/b").
SLASH_PRUNE = re.compile("//+")
# Valid Debian binary package name (Policy 5.6.7).
PKGNAME_REGEX = re.compile(r"[a-z0-9][-+.a-z0-9]+", re.ASCII)
# Valid Debian package version (Policy 5.6.12).
PKGVERSION_REGEX = re.compile(
    r"""
    (?: \d+ : )?                # Optional epoch
    \d[0-9A-Za-z.+:~]*          # Upstream version (with no hyphens)
    (?: - [0-9A-Za-z.+:~]+ )*   # Optional debian revision (+ upstreams versions with hyphens)
""",
    re.VERBOSE | re.ASCII,
)
DEFAULT_PACKAGE_TYPE = "deb"
DBGSYM_PACKAGE_TYPE = "deb"
UDEB_PACKAGE_TYPE = "udeb"

# Default guard for postinst maintainer-script snippets.
POSTINST_DEFAULT_CONDITION = (
    '[ "$1" = "configure" ]'
    ' || [ "$1" = "abort-upgrade" ]'
    ' || [ "$1" = "abort-deconfigure" ]'
    ' || [ "$1" = "abort-remove" ]'
)


# Regexes used by the shell-escaping helpers below.
_SPACE_RE = re.compile(r"\s")
_WORD_EQUAL = re.compile(r"^-*[\w_\-]+=")
_DOUBLE_ESCAPEES = re.compile(r'([\n`$"\\])')
_REGULAR_ESCAPEES = re.compile(r"""([\s!"$()*+#;<>?@'\[\]\\`|~])""")
# Splits "<a b> <c>" style Build-Profiles formulas into their groups.
_PROFILE_GROUP_SPLIT = re.compile(r">\s+<")
# Logging state; populated by setup_logging().
_DEFAULT_LOGGER: Optional[logging.Logger] = None
_STDOUT_HANDLER: Optional[logging.StreamHandler[Any]] = None
_STDERR_HANDLER: Optional[logging.StreamHandler[Any]] = None
# Custom log levels: command echoing sits between INFO and WARNING,
# TRACE sits between DEBUG and INFO.
PRINT_COMMAND = logging.INFO + 3
PRINT_BUILD_SYSTEM_COMMAND = PRINT_COMMAND + 3
TRACE_LOG = logging.DEBUG + 3

# Map them back to `INFO`. The names must be unique so the prefix is stripped.
logging.addLevelName(PRINT_COMMAND, "__INFO")
logging.addLevelName(PRINT_BUILD_SYSTEM_COMMAND, "_INFO")
logging.addLevelName(TRACE_LOG, "TRACE")

122 

123 

def assume_not_none(x: Optional[T]) -> T:
    """Narrow an Optional to its non-None type, raising if the assumption fails.

    :raises ValueError: if *x* is in fact None (an internal error).
    """
    if x is not None:
        return x
    raise ValueError(  # pragma: no cover
        'Internal error: None was given, but the receiver assumed "not None" here'
    )

130 

131 

def _non_verbose_info(msg: str) -> None:
    """Emit *msg* at the build-system command level, if logging is configured."""
    if _DEFAULT_LOGGER is not None:
        _DEFAULT_LOGGER.log(PRINT_BUILD_SYSTEM_COMMAND, msg)

137 

138 

def _info(msg: str) -> None:
    """Log *msg* at INFO level; a no-op before logging is set up."""
    if _DEFAULT_LOGGER:
        _DEFAULT_LOGGER.info(msg)
    # No fallback print for info

145 

146 

def _is_trace_log_enabled() -> bool:
    """True when a logger is configured and TRACE_LOG messages would be emitted."""
    logger = _DEFAULT_LOGGER
    if logger is None:
        return False
    return logger.isEnabledFor(TRACE_LOG)

151 

152 

def _trace_log(msg: str) -> None:
    """Log *msg* at the TRACE_LOG level; a no-op before logging is set up."""
    if _DEFAULT_LOGGER:
        _DEFAULT_LOGGER.log(TRACE_LOG, msg)
    # No fallback print for this level

159 

160 

def _is_debug_log_enabled() -> bool:
    """True when a logger is configured and DEBUG messages would be emitted."""
    logger = _DEFAULT_LOGGER
    if logger is None:
        return False
    return logger.isEnabledFor(logging.DEBUG)

165 

166 

def _debug_log(msg: str) -> None:
    """Log *msg* at DEBUG level; a no-op before logging is set up."""
    if _DEFAULT_LOGGER:
        _DEFAULT_LOGGER.debug(msg)
    # No fallback print for this level

173 

174 

def _error(msg: str, *, prog: Optional[str] = None) -> "NoReturn":
    """Report a fatal error and terminate the process with exit code 1.

    Uses the configured logger when available; otherwise prints a
    "<prog>: error: <msg>" line to stderr.
    """
    logger = _DEFAULT_LOGGER
    if not logger:
        tool = prog if prog is not None else os.path.basename(sys.argv[0])
        print(
            f"{tool}: error: {msg}",
            file=sys.stderr,
        )
    else:
        logger.error(msg)
    sys.exit(1)

187 

188 

def _warn(msg: str, *, prog: Optional[str] = None) -> None:
    """Report a warning via the logger, or as "<prog>: warning: <msg>" on stderr."""
    logger = _DEFAULT_LOGGER
    if not logger:
        tool = prog if prog is not None else os.path.basename(sys.argv[0])
        print(
            f"{tool}: warning: {msg}",
            file=sys.stderr,
        )
    else:
        logger.warning(msg)

201 

202 

class ColorizedArgumentParser(argparse.ArgumentParser):
    # ArgumentParser subclass that routes parse errors through debputy's own
    # error reporting (`_error`, which exits with status 1) instead of
    # argparse's default behavior.
    def error(self, message: str) -> NoReturn:
        self.print_usage(sys.stderr)
        _error(message, prog=self.prog)

207 

208 

def ensure_dir(path: str) -> None:
    """Create the directory *path* (mode 0o755, with parents) unless it exists."""
    if os.path.isdir(path):
        return
    os.makedirs(path, mode=0o755, exist_ok=True)

212 

213 

def _clean_path(orig_p: str) -> str:
    """Collapse duplicate slashes and validate that the path is normalized.

    :raises ValueError: if the path contains "." or ".." segments (beyond a
      single permitted leading "./").
    """
    cleaned = SLASH_PRUNE.sub("/", orig_p)
    if "." in cleaned:
        # We permit a single leading "./" because we add that when we normalize a
        # path, and we want normalization of a normalized path to be a no-op.
        remainder = cleaned[2:] if cleaned.startswith("./") else cleaned
        assert remainder
        if any(segment in (".", "..") for segment in remainder.split("/")):
            raise ValueError(
                'Please provide paths that are normalized (i.e., no ".." or ".").'
                f' Offending input "{orig_p}"'
            )
    return cleaned

230 

231 

def _normalize_path(path: str, with_prefix: bool = True) -> str:
    """Normalize *path*, optionally ensuring (or removing) a leading "./"."""
    path = path.strip("/")
    if not path or path == ".":
        return "."
    if "//" in path or "." in path:
        path = _clean_path(path)
    has_prefix = path.startswith("./")
    if with_prefix and not has_prefix:
        path = "./" + path
    elif has_prefix and not with_prefix:
        path = path[2:]
    return path

244 

245 

def _normalize_link_target(link_target: str) -> str:
    """Lexically normalize a symlink target, resolving "." and ".." segments.

    Attempts to escape the root (e.g. "/../x") are discarded, mirroring how
    the OS resolves them (mapping /.. -> /).
    """
    pruned = SLASH_PRUNE.sub("/", link_target.lstrip("/"))
    stack: List[str] = []
    for segment in pruned.split("/"):
        if segment == "..":
            # Ignore "root escape attempts" like the OS would
            if stack:
                stack.pop()
        elif segment not in (".", ""):
            # "." and "" are dropped - the empty string is generally a trailing slash
            stack.append(segment)
    return "/".join(stack)

260 

261 

def manifest_format_doc(anchor: str) -> str:
    """Return a link to MANIFEST-FORMAT.md, optionally pointing at *anchor*."""
    base = f"{DEBPUTY_DOC_ROOT_DIR}/MANIFEST-FORMAT.md"
    if not anchor:
        return base
    return f"{base}#{anchor}"

265 

266 

267def _backslash_escape(m: re.Match[str]) -> str: 

268 return "\\" + m.group(0) 

269 

270 

def _escape_shell_word(w: str) -> str:
    """Quote/escape a single word for display as part of a shell command.

    Words containing whitespace are double-quoted (escaping the characters
    that stay special inside double quotes). For `--option=value with space`
    style words, only the value part is quoted so the option name remains
    readable. Words without whitespace get backslash-escaping of shell
    metacharacters instead.
    """
    if _SPACE_RE.search(w):
        if "=" in w and (m := _WORD_EQUAL.search(w)) is not None:
            s = m.span(0)
            # _WORD_EQUAL is anchored, so the match must start at offset 0.
            assert s[0] == 0
            prefix = w[0 : s[1]]
            escaped_value = _DOUBLE_ESCAPEES.sub(_backslash_escape, w[s[1] :])
            return f'{prefix}"{escaped_value}"'
        w = _DOUBLE_ESCAPEES.sub(_backslash_escape, w)
        return f'"{w}"'
    return _REGULAR_ESCAPEES.sub(_backslash_escape, w)

282 

283 

def escape_shell(*args: str) -> str:
    """Render *args* as a single display-friendly, shell-escaped string."""
    return " ".join(map(_escape_shell_word, args))

286 

287 

def render_command(
    *args: str,
    cwd: Optional[str] = None,
    env_mod: Optional["EnvironmentModification"] = None,
) -> str:
    """Render a command (for display only) as a shell-escaped string.

    :param args: The command and its arguments.
    :param cwd: If not None (and not "."), render the working-directory change.
    :param env_mod: Optional environment modifications (removals/replacements)
      rendered as an `env`-style prefix.
    :return: A human-readable rendering of the command.
    """
    env_mod_prefix = ""
    if env_mod:
        env_mod_parts = []
        if bool(env_mod.removals):
            env_mod_parts.append("env")
            if cwd is not None:
                env_mod_parts.append(f"--chdir={escape_shell(cwd)}")
            env_mod_parts.extend(f"--unset={escape_shell(v)}" for v in env_mod.removals)
        env_mod_parts.extend(
            f"{escape_shell(k)}={escape_shell(v)}" for k, v in env_mod.replacements
        )
        # Bug fix: the computed parts were previously discarded (env_mod_prefix
        # stayed ""), leaving env modifications out of the rendering entirely.
        if env_mod_parts:
            env_mod_prefix = " ".join(env_mod_parts) + " "

    chdir_prefix = ""
    # Skip the `cd` prefix if `env --chdir=...` already conveys the directory.
    if cwd is not None and cwd != "." and not env_mod_prefix.startswith("env --chdir="):
        chdir_prefix = f"cd {escape_shell(cwd)} && "
    return f"{chdir_prefix}{env_mod_prefix}{escape_shell(*args)}"

309 

310 

def print_command(
    *args: str,
    cwd: Optional[str] = None,
    env_mod: Optional["EnvironmentModification"] = None,
    print_at_log_level: int = PRINT_COMMAND,
) -> None:
    """Echo a rendered command line to stdout, if the log level permits.

    Does nothing unless logging has been set up and the logger is enabled
    for *print_at_log_level*.
    """
    if _DEFAULT_LOGGER is None or not _DEFAULT_LOGGER.isEnabledFor(print_at_log_level):
        return

    rendered_cmd = render_command(
        *args,
        cwd=cwd,
        env_mod=env_mod,
    )
    global _STDOUT_HANDLER
    handler = _STDOUT_HANDLER
    if handler is not None:
        # Flush pending log output first, so the command appears after it.
        handler.flush()
    # Ensure command is output immediately so it is hanging after its output.
    # TODO: This should `file` in case something in debputy redirects stdout
    # (nothing does that for now)
    print(f" {rendered_cmd}")
    sys.stdout.flush()

334 

335 

def run_command(
    *args: str,
    cwd: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
    env_mod: Optional["EnvironmentModification"] = None,
    print_at_log_level: int = PRINT_COMMAND,
    raise_file_not_found_on_missing_command: bool = False,
) -> None:
    """Run a command, echoing it first and exiting the process on failure.

    :param args: The command and its arguments.
    :param cwd: Optional working directory for the command.
    :param env: Optional base environment (defaults to os.environ).
    :param env_mod: Optional modifications applied on top of *env*.
    :param print_at_log_level: Log level gating whether the command is echoed.
    :param raise_file_not_found_on_missing_command: When True, re-raise
      FileNotFoundError instead of exiting with an error message.
    """
    print_command(
        *args,
        cwd=cwd,
        env_mod=env_mod,
        print_at_log_level=print_at_log_level,
    )
    if env_mod:
        if env is None:
            env = os.environ
        env = env_mod.compute_env(env)
        if env is os.environ:
            # subprocess treats env=None as "inherit"; avoid passing os.environ itself.
            env = None
    try:
        subprocess.check_call(args, cwd=cwd, env=env)
    # At least "clean_logic.py" relies on catching FileNotFoundError
    except KeyboardInterrupt:
        _error(f"Interrupted (SIGINT) while running {escape_shell(*args)}")
    except FileNotFoundError:
        if raise_file_not_found_on_missing_command:
            raise
        if "/" in args[0]:
            _error(f"Could not run {escape_shell(args[0])}: Path does not exist")
        # Use the `command not found` to aid existing log pattern
        _error(f"{escape_shell(args[0])}: command not found")
    except subprocess.CalledProcessError as e:
        _error(f"The command {escape_shell(*args)} failed with status: {e.returncode}")

370 

371 

def run_build_system_command(
    *args: str,
    cwd: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
    env_mod: Optional["EnvironmentModification"] = None,
    print_at_log_level: int = PRINT_BUILD_SYSTEM_COMMAND,
    raise_file_not_found_on_missing_command: bool = False,
) -> None:
    """Run a command as run_command does, echoed at the build-system log level."""
    run_command(
        *args,
        cwd=cwd,
        env=env,
        env_mod=env_mod,
        print_at_log_level=print_at_log_level,
        raise_file_not_found_on_missing_command=raise_file_not_found_on_missing_command,
    )

388 

389 

def debian_policy_normalize_symlink_target(
    link_path: str,
    link_target: str,
    normalize_link_path: bool = False,
) -> str:
    """Normalize a symlink target per Debian Policy.

    Policy wants the target relative when link and target share their
    top-level directory and absolute otherwise.

    :param link_path: Path of the symlink itself; must already be normalized
      with a leading "./" unless *normalize_link_path* is True.
    :param link_target: The target the symlink points at (absolute or
      relative to the link's directory).
    :param normalize_link_path: When True, normalize *link_path* first.
    :return: The policy-normalized link target.
    :raises ValueError: If *link_path* is not normalized and
      *normalize_link_path* is False.
    """
    if normalize_link_path:
        link_path = _normalize_path(link_path)
    elif not link_path.startswith("./"):
        raise ValueError("Link part was not normalized")

    # Strip the "./" prefix so both paths are root-relative without it.
    link_path = link_path[2:]

    if not link_target.startswith("/"):
        # Relative targets are interpreted relative to the link's directory.
        link_target = "/" + os.path.dirname(link_path) + "/" + link_target

    link_path_parts = link_path.split("/")
    link_target_parts = [
        s for s in _normalize_link_target(link_target).split("/") if s != "."
    ]

    assert link_path_parts

    if link_target_parts and link_path_parts[0] == link_target_parts[0]:
        # Per Debian Policy, must be relative

        # First determine the length of the overlap
        common_segment_count = 1
        shortest_path_length = min(len(link_target_parts), len(link_path_parts))
        while (
            common_segment_count < shortest_path_length
            and link_target_parts[common_segment_count]
            == link_path_parts[common_segment_count]
        ):
            common_segment_count += 1

        if common_segment_count == shortest_path_length and len(
            link_path_parts
        ) - 1 == len(link_target_parts):
            # The link lives in the directory it points at; use ".".
            normalized_link_target = "."
        else:
            up_dir_count = len(link_path_parts) - 1 - common_segment_count
            normalized_link_target_parts = []
            if up_dir_count:
                up_dir_part = "../" * up_dir_count
                # We overshoot with a single '/', so rstrip it away
                normalized_link_target_parts.append(up_dir_part.rstrip("/"))
            # Add the relevant down parts
            normalized_link_target_parts.extend(
                link_target_parts[common_segment_count:]
            )

            normalized_link_target = "/".join(normalized_link_target_parts)
    else:
        # Per Debian Policy, must be absolute
        normalized_link_target = "/" + "/".join(link_target_parts)

    return normalized_link_target

447 

448 

def has_glob_magic(pattern: str) -> bool:
    """True if *pattern* has glob magic, including debputy's "{...}" extension."""
    return "{" in pattern or glob.has_magic(pattern)

451 

452 

def glob_escape(replacement_value: str) -> str:
    """Escape glob metacharacters (including "{"/"}") in *replacement_value*.

    Bug fix: the guard previously used `or`, so any value without "{" was
    returned unescaped even when it contained glob magic such as "*" or "?".
    The early return must only trigger when the value has *no* magic at all.
    """
    if not glob.has_magic(replacement_value) and "{" not in replacement_value:
        return replacement_value
    return (
        replacement_value.replace("[", "[[]")
        .replace("]", "[]]")
        .replace("*", "[*]")
        .replace("?", "[?]")
        .replace("{", "[{]")
        .replace("}", "[}]")
    )

464 

465 

466# TODO: This logic should probably be moved to `python-debian` 

467def active_profiles_match( 

468 profiles_raw: str, 

469 active_build_profiles: Union[Set[str], FrozenSet[str]], 

470) -> bool: 

471 profiles_raw = profiles_raw.strip() 

472 if profiles_raw[0] != "<" or profiles_raw[-1] != ">" or profiles_raw == "<>": 472 ↛ 473line 472 didn't jump to line 473 because the condition on line 472 was never true

473 raise ValueError( 

474 'Invalid Build-Profiles: Must start start and end with "<" + ">" but cannot be a literal "<>"' 

475 ) 

476 profile_groups = _PROFILE_GROUP_SPLIT.split(profiles_raw[1:-1]) 

477 for profile_group_raw in profile_groups: 477 ↛ 493line 477 didn't jump to line 493 because the loop on line 477 didn't complete

478 should_process_package = True 

479 for profile_name in profile_group_raw.split(): 

480 negation = False 

481 if profile_name[0] == "!": 481 ↛ 485line 481 didn't jump to line 485 because the condition on line 481 was always true

482 negation = True 

483 profile_name = profile_name[1:] 

484 

485 matched_profile = profile_name in active_build_profiles 

486 if matched_profile == negation: 486 ↛ 487line 486 didn't jump to line 487 because the condition on line 486 was never true

487 should_process_package = False 

488 break 

489 

490 if should_process_package: 490 ↛ 477line 490 didn't jump to line 477 because the condition on line 490 was always true

491 return True 

492 

493 return False 

494 

495 

496def _parse_build_profiles(build_profiles_raw: str) -> FrozenSet[FrozenSet[str]]: 

497 profiles_raw = build_profiles_raw.strip() 

498 if profiles_raw[0] != "<" or profiles_raw[-1] != ">" or profiles_raw == "<>": 498 ↛ 499line 498 didn't jump to line 499 because the condition on line 498 was never true

499 raise ValueError( 

500 'Invalid Build-Profiles: Must start start and end with "<" + ">" but cannot be a literal "<>"' 

501 ) 

502 profile_groups = _PROFILE_GROUP_SPLIT.split(profiles_raw[1:-1]) 

503 return frozenset(frozenset(g.split()) for g in profile_groups) 

504 

505 

def resolve_source_date_epoch(
    command_line_value: Optional[int],
    *,
    substitution: Optional["Substitution"] = None,
) -> int:
    """Resolve SOURCE_DATE_EPOCH (for reproducible timestamps).

    Precedence: the explicit command-line value, then the SOURCE_DATE_EPOCH
    environment variable, then the manifest substitution (when provided),
    and finally the current time. The resolved value is exported back into
    os.environ["SOURCE_DATE_EPOCH"].

    :param command_line_value: Value given on the command line, if any.
    :param substitution: Optional substitution engine used as a fallback
      source for {{SOURCE_DATE_EPOCH}}.
    :return: The resolved epoch (seconds).
    """
    mtime = command_line_value
    if mtime is None and "SOURCE_DATE_EPOCH" in os.environ:
        sde_raw = os.environ["SOURCE_DATE_EPOCH"]
        if sde_raw == "":
            _error("SOURCE_DATE_EPOCH is set but empty.")
        mtime = int(sde_raw)
    if mtime is None and substitution is not None:
        try:
            sde_raw = substitution.substitute(
                "{{SOURCE_DATE_EPOCH}}",
                "Internal resolution",
            )
            mtime = int(sde_raw)
        except (DebputySubstitutionError, ValueError):
            # Best-effort fallback; an unresolvable substitution is not fatal.
            pass
    if mtime is None:
        mtime = int(time.time())
    os.environ["SOURCE_DATE_EPOCH"] = str(mtime)
    return mtime

530 

531 

def compute_output_filename(control_root_dir: str, is_udeb: bool) -> str:
    """Derive the .deb/.udeb output filename from a package's control file.

    Reads the "control" file beneath *control_root_dir* and renders
    "<name>_<version-without-epoch>_<arch>.<extension>".
    """
    with open(os.path.join(control_root_dir, "control"), "rt") as fd:
        control_file = Deb822(fd)

    package_name = control_file["Package"]
    package_version = control_file["Version"]
    package_architecture = control_file["Architecture"]
    extension = "udeb" if is_udeb else (control_file.get("Package-Type") or "deb")
    # The epoch is not part of the filename; drop everything up to the first ":".
    _, colon, rest = package_version.partition(":")
    if colon:
        package_version = rest

    return f"{package_name}_{package_version}_{package_architecture}.{extension}"

546 

547 

# Lazily resolved location of debputy's scratch directory (see scratch_dir()).
_SCRATCH_DIR = None
# Set via integrated_with_debhelper() when running as part of a debhelper sequence.
_DH_INTEGRATION_MODE = False

550 

551 

def integrated_with_debhelper() -> None:
    # Signal that debputy is running inside a debhelper sequence; this makes
    # scratch_dir() prefer debian/.debhelper over debian/.debputy.
    global _DH_INTEGRATION_MODE
    _DH_INTEGRATION_MODE = True

555 

556 

def scratch_dir(*, create_if_not_exists: bool = True) -> str:
    """Return (and cache) the path of debputy's scratch directory.

    Prefers debian/.debputy/scratch-dir, falling back to a directory below
    debian/.debhelper when integrating with debhelper.

    :param create_if_not_exists: When True (default), create the directory
      and mark the debputy root directory as generated.
    """
    global _SCRATCH_DIR
    if _SCRATCH_DIR is not None:
        return _SCRATCH_DIR
    debputy_scratch_dir = "debian/.debputy/scratch-dir"
    is_debputy_dir = True
    if os.path.isdir("debian/.debputy") and not _DH_INTEGRATION_MODE:
        _SCRATCH_DIR = debputy_scratch_dir
    elif os.path.isdir("debian/.debhelper") or _DH_INTEGRATION_MODE:
        _SCRATCH_DIR = "debian/.debhelper/_debputy/scratch-dir"
        is_debputy_dir = False
    else:
        _SCRATCH_DIR = debputy_scratch_dir
    if create_if_not_exists:
        ensure_dir(_SCRATCH_DIR)
        if is_debputy_dir:
            # Only our own root dir gets the .gitignore/CACHEDIR.TAG markers.
            generated_root_directory("debian/.debputy", internal_only=True)
    return _SCRATCH_DIR

575 

576 

def generated_root_directory(path: str, *, internal_only: bool = False) -> None:
    """Mark *path* as fully generated: git-ignored and (optionally) cache-tagged."""
    root_dir = Path(path)
    (root_dir / ".gitignore").write_text("*\n")
    # TODO: Should we add a "CACHEDIR.TAG" here? (Requires a relevant ignore rule
    if not internal_only:
        return
    tag = root_dir / "CACHEDIR.TAG"
    tag.write_bytes(b"Signature: 8a477f597d28d172789f06886806bc55")

585 

586 

# Name of the per-process subdirectory inside the scratch dir; assigned on
# first use by generated_content_dir().
_RUNTIME_CONTAINER_DIR_KEY: Optional[str] = None

588 

589 

def generated_content_dir(
    *,
    package: Optional["BinaryPackage"] = None,
    subdir_key: Optional[str] = None,
) -> str:
    """Return (creating it) a per-process directory for generated fs content.

    Content is grouped beneath the scratch dir by pid, then per package
    (or "no-package") plus an optional sub-key.
    """
    global _RUNTIME_CONTAINER_DIR_KEY
    container_dir = _RUNTIME_CONTAINER_DIR_KEY
    first_run = False

    if container_dir is None:
        first_run = True
        container_dir = f"_pb-{os.getpid()}"
        _RUNTIME_CONTAINER_DIR_KEY = container_dir

    directory = os.path.join(scratch_dir(), container_dir)

    if first_run and os.path.isdir(directory):
        # In the unlikely case there is a re-run with exactly the same pid, `debputy` should not
        # see "stale" data.
        # TODO: Ideally, we would always clean up this directory on failure, but `atexit` is not
        # reliable enough for that and we do not have an obvious hook for it.
        shutil.rmtree(directory)

    directory = os.path.join(
        directory,
        "generated-fs-content",
        f"pkg_{package.name}" if package else "no-package",
    )
    if subdir_key is not None:
        directory = os.path.join(directory, subdir_key)

    os.makedirs(directory, exist_ok=True)
    return directory

623 

624 

# Perl configuration facts extracted via `perl -MConfig` (see resolve_perl_config
# and _perl_config_data below).
PerlConfigVars = collections.namedtuple(
    "PerlConfigVars", ["vendorlib", "vendorarch", "cross_inc_dir", "ld", "path_sep"]
)
PerlConfigData = collections.namedtuple("PerlConfigData", ["version", "debian_abi"])
# Cache of PerlConfigVars keyed by architecture ("_build_arch_" when not cross-compiling).
_PERL_MODULE_DIRS: Dict[str, PerlConfigVars] = {}

630 

631 

@functools.lru_cache(1)
def _perl_config_data() -> PerlConfigData:
    # Query the (build) perl for its version and Debian ABI via -MConfig.
    # Cached: the answer cannot change within a single run.
    d = (
        subprocess.check_output(
            [
                "perl",
                "-MConfig",
                "-e",
                'print "$Config{version}\n$Config{debian_abi}\n"',
            ]
        )
        .decode("utf-8")
        .splitlines()
    )
    return PerlConfigData(*d)

647 

648 

def _perl_version() -> str:
    # Version string of the build perl (e.g. "5.36.0").
    return _perl_config_data().version

651 

652 

def perlxs_api_dependency() -> str:
    """Return the perlapi-* dependency substvar value for XS modules.

    dh_perl used the build version of perl for this, so we will too. Most of
    the perl cross logic assumes that the major version of build variant of
    Perl is the same as the host variant of Perl.
    """
    config = _perl_config_data()
    abi = config.debian_abi
    if abi is not None and abi != "":
        return f"perlapi-{abi}"
    return f"perlapi-{config.version}"

660 

661 

def resolve_perl_config(
    dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable,
    dctrl_bin: Optional["BinaryPackage"],
) -> PerlConfigVars:
    """Resolve (and cache per architecture) perl's module dirs and related facts.

    When cross-compiling, the cross config for the host architecture is used
    if available; otherwise the build perl's own configuration applies.

    :param dpkg_architecture_variables: The dpkg-architecture table.
    :param dctrl_bin: Optional binary package used to pick the architecture
      when cross-compiling.
    :raises ValueError: If the perl probe does not produce the expected output.
    """
    global _PERL_MODULE_DIRS
    if dpkg_architecture_variables.is_cross_compiling:
        if dctrl_bin is not None:
            arch = dctrl_bin.resolved_architecture
            ma = dctrl_bin.deb_multiarch
        else:
            arch = dpkg_architecture_variables.current_host_arch
            ma = dpkg_architecture_variables.current_host_multiarch
    else:
        # We are de facto using the build-arch version of perl here; be explicit
        arch = "_build_arch_"
        ma = dpkg_architecture_variables["DEB_BUILD_MULTIARCH"]
    config_vars = _PERL_MODULE_DIRS.get(arch)
    if config_vars is None:
        cmd = ["perl"]
        if dpkg_architecture_variables.is_cross_compiling:
            version = _perl_version()
            cross_inc_dir = f"/usr/lib/{ma}/perl/cross-config-{version}"
            # FIXME: This should not fallback to "build-arch" but on the other hand, we use the perl module dirs
            # for every package at the moment. So mandating correct perl dirs implies mandating perl-xs-dev in
            # cross builds... meh.
            if os.path.exists(os.path.join(cross_inc_dir, "Config.pm")):
                cmd.append(f"-I{cross_inc_dir}")
        else:
            cross_inc_dir = None
        cmd.extend(
            [
                "-MConfig",
                "-e",
                'print "$Config{vendorlib}\n$Config{vendorarch}\n$Config{ld}\n$Config{path_sep}\n"',
            ]
        )
        output = subprocess.check_output(cmd).decode("utf-8").splitlines(keepends=False)
        if len(output) != 4:
            raise ValueError(
                "Internal error: Unable to determine the perl include directories:"
                f" Raw output from perl snippet: {output}"
            )
        config_vars = PerlConfigVars(
            vendorlib="/" + _normalize_path(output[0], with_prefix=False),
            vendorarch="/" + _normalize_path(output[1], with_prefix=False),
            cross_inc_dir=cross_inc_dir,
            ld=output[2],
            path_sep=output[3],
        )
        _PERL_MODULE_DIRS[arch] = config_vars
    return config_vars

713 

714 

@functools.lru_cache(1)
def detect_fakeroot() -> bool:
    """Detect whether the process runs under fakeroot.

    Heuristic: we appear to be root (uid 0) with LD_PRELOAD set, yet running
    "id -u" without the preload reports a non-root uid.
    """
    if os.getuid() != 0 or "LD_PRELOAD" not in os.environ:
        return False
    env = dict(os.environ)
    del env["LD_PRELOAD"]
    try:
        return subprocess.check_output(["id", "-u"], env=env).strip() != b"0"
    except subprocess.CalledProcessError:
        # Best effort: if the probe fails, assume no fakeroot rather than crash.
        print(
            'Could not run "id -u" with LD_PRELOAD unset; assuming we are not run under fakeroot',
            file=sys.stderr,
        )
        return False

729 

730 

@functools.lru_cache(1)
def _sc_arg_max() -> Optional[int]:
    # OS limit for the combined size of argv + environment (None when the
    # platform cannot report it; callers then use a conservative fallback).
    try:
        return os.sysconf("SC_ARG_MAX")
    except RuntimeError:
        _warn("Could not resolve SC_ARG_MAX, falling back to a hard-coded limit")
        return None

738 

739 

740def _split_xargs_args( 

741 static_cmd: Sequence[str], 

742 max_args_byte_len: int, 

743 varargs: Iterable[str], 

744 reuse_list_ok: bool, 

745) -> Iterator[List[str]]: 

746 static_cmd_len = len(static_cmd) 

747 remaining_len = max_args_byte_len 

748 pending_args = list(static_cmd) 

749 for arg in varargs: 

750 arg_len = len(arg.encode("utf-8")) + 1 # +1 for leading space 

751 remaining_len -= arg_len 

752 if not remaining_len: 

753 if len(pending_args) <= static_cmd_len: 

754 raise ValueError( 

755 f"Could not fit a single argument into the command line !?" 

756 f" {max_args_byte_len} (variable argument limit) < {arg_len} (argument length)" 

757 ) 

758 yield pending_args 

759 remaining_len = max_args_byte_len - arg_len 

760 if reuse_list_ok: 

761 pending_args.clear() 

762 pending_args.extend(static_cmd) 

763 else: 

764 pending_args = list(static_cmd) 

765 pending_args.append(arg) 

766 

767 if len(pending_args) > static_cmd_len: 

768 yield pending_args 

769 

770 

def xargs(
    static_cmd: Sequence[str],
    varargs: Iterable[str],
    *,
    env: Optional[Mapping[str, str]] = None,
    reuse_list_ok: bool = False,
) -> Iterator[List[str]]:
    """Split a command with many arguments into runnable chunks (xargs-style).

    The budget is derived from SC_ARG_MAX (minus the static command, the
    environment and a safety margin) with a debhelper-compatible fallback
    of 20 000 bytes when SC_ARG_MAX is unavailable.

    :param static_cmd: Command (plus fixed arguments) repeated in every chunk.
    :param varargs: Variable arguments to distribute across the chunks.
    :param env: Environment the command will run with (defaults to os.environ)
      - only used for sizing the budget.
    :param reuse_list_ok: See _split_xargs_args.
    """
    max_args_bytes = _sc_arg_max()
    # len overshoots with one space explaining the -1. The _split_xargs_args
    # will account for the space for the first argument
    static_byte_len = (
        len(static_cmd) - 1 + sum(len(a.encode("utf-8")) for a in static_cmd)
    )
    if max_args_bytes is not None:
        if env is None:
            # +2 for nul bytes after key and value
            static_byte_len += sum(len(k) + len(v) + 2 for k, v in os.environb.items())
        else:
            # +2 for nul bytes after key and value
            static_byte_len += sum(
                len(k.encode("utf-8")) + len(v.encode("utf-8")) + 2
                for k, v in env.items()
            )
        # Add a fixed buffer for OS overhead here (in case env and cmd both must be page-aligned or something like
        # that)
        static_byte_len += 2 * 4096
    else:
        # The 20 000 limit is from debhelper, and it did not account for environment. So neither will we here.
        max_args_bytes = 20_000
    remain_len = max_args_bytes - static_byte_len
    yield from _split_xargs_args(static_cmd, remain_len, varargs, reuse_list_ok)

802 

803 

804# itertools recipe 

def grouper(
    iterable: Iterable[T],
    n: int,
    *,
    incomplete: Literal["fill", "strict", "ignore"] = "fill",
    fillvalue: Optional[T] = None,
) -> Iterator[Tuple[T, ...]]:
    """Collect data into non-overlapping fixed-length chunks or blocks"""
    # grouper('ABCDEFG', 3, fillvalue='x') --> ABC DEF Gxx
    # grouper('ABCDEFG', 3, incomplete='strict') --> ABC DEF ValueError
    # grouper('ABCDEFG', 3, incomplete='ignore') --> ABC DEF
    # n references to a *single* iterator, so zip() advances it n at a time.
    iterators = [iter(iterable)] * n
    if incomplete == "strict":
        return zip(*iterators, strict=True)
    if incomplete == "fill":
        return zip_longest(*iterators, fillvalue=fillvalue)
    if incomplete == "ignore":
        return zip(*iterators)
    raise ValueError("Expected fill, strict, or ignore")

825 

826 

# Guards setup_logging() against being applied twice (unless reconfiguring).
_LOGGING_SET_UP = False

828 

829 

830def _check_color() -> Tuple[bool, bool, Optional[str]]: 

831 dpkg_or_default = os.environ.get( 

832 "DPKG_COLORS", "never" if "NO_COLOR" in os.environ else "auto" 

833 ) 

834 requested_color = os.environ.get("DEBPUTY_COLORS", dpkg_or_default) 

835 bad_request = None 

836 if requested_color not in {"auto", "always", "never"}: 836 ↛ 837line 836 didn't jump to line 837 because the condition on line 836 was never true

837 bad_request = requested_color 

838 requested_color = "auto" 

839 

840 if requested_color == "auto": 840 ↛ 844line 840 didn't jump to line 844 because the condition on line 840 was always true

841 stdout_color = sys.stdout.isatty() 

842 stderr_color = sys.stdout.isatty() 

843 else: 

844 enable = requested_color == "always" 

845 stdout_color = enable 

846 stderr_color = enable 

847 return stdout_color, stderr_color, bad_request 

848 

849 

def program_name() -> str:
    """Best-effort name of the running tool (as invoked), for messages."""
    name = os.path.basename(sys.argv[0])
    if name.endswith(".py"):
        name = name[: -len(".py")]
    if name == "__main__":
        # `python -m pkg` style invocation; use the package directory's name.
        name = os.path.basename(os.path.dirname(sys.argv[0]))
    # FIXME: Not optimal that we have to hardcode these kind of things here
    return "debputy" if name == "debputy_cmd" else name

860 

861 

def package_cross_check_precheck(
    pkg_a: "BinaryPackage",
    pkg_b: "BinaryPackage",
) -> Tuple[bool, bool]:
    """Whether these two packages can do content cross-checks

    :param pkg_a: The first package
    :param pkg_b: The second package
    :return: A tuple of two booleans. If the first is True, then binary_package_a may do content cross-checks
      that involves binary_package_b. If the second is True, then binary_package_b may do content cross-checks
      that involves binary_package_a. Both can be True and both can be False at the same time, which
      happens in common cases (arch:all + arch:any cases both to be False as a common example).
    """

    # Handle the two most obvious base-cases
    if not pkg_a.should_be_acted_on or not pkg_b.should_be_acted_on:
        return False, False
    if pkg_a.is_arch_all ^ pkg_b.is_arch_all:
        return False, False

    a_may_see_b = True
    b_may_see_a = True

    a_bp = pkg_a.fields.get("Build-Profiles", "")
    b_bp = pkg_b.fields.get("Build-Profiles", "")

    if a_bp != b_bp:
        a_bp_set = _parse_build_profiles(a_bp) if a_bp != "" else frozenset()
        b_bp_set = _parse_build_profiles(b_bp) if b_bp != "" else frozenset()

        # Check for build profiles being identical but just ordered differently.
        if a_bp_set != b_bp_set:
            # For simplicity, we let groups cancel each other out. If one side has no clauses
            # left, then it will always be built when the other is built.
            #
            # Eventually, someone will be here with a special case where more complex logic is
            # required. Good luck to you! Remember to add test cases for it (the existing logic
            # has some for a reason and if the logic is going to be more complex, it will need
            # tests cases to assert it fixes the problem and does not regress)
            if a_bp_set - b_bp_set:
                a_may_see_b = False
            if b_bp_set - a_bp_set:
                b_may_see_a = False

    if pkg_a.declared_architecture != pkg_b.declared_architecture:
        # Also here we could do a subset check, but wildcards vs. non-wildcards make that a pain
        if pkg_a.declared_architecture != "any":
            b_may_see_a = False
        # Bug fix: this previously re-tested pkg_a's architecture, so pkg_b's
        # restriction never revoked a_may_see_b.
        if pkg_b.declared_architecture != "any":
            a_may_see_b = False

    return a_may_see_b, b_may_see_a

914 

915 

def change_log_level(
    log_level: int,
) -> None:
    """Change the effective log level of debputy's logging.

    The level is applied to the default (program) logger when one has been
    initialized, and always to the root logger.
    """
    default_logger = _DEFAULT_LOGGER
    if default_logger is not None:
        default_logger.setLevel(log_level)
    logging.getLogger("").setLevel(log_level)

922 

923 

def current_log_level() -> Optional[int]:
    """Return the level of the default logger.

    Returns None when the default logger has not been initialized
    (i.e., logging has not been set up yet).
    """
    return None if _DEFAULT_LOGGER is None else _DEFAULT_LOGGER.level

928 

929 

def setup_logging(
    *,
    log_only_to_stderr: bool = False,
    reconfigure_logging: bool = False,
) -> None:
    """Configure the process-wide logging used by debputy.

    Installs two handlers on the root logger — one for low-severity records
    and one for WARN-and-above — with optional color support via the
    `colorlog` module, and initializes `_DEFAULT_LOGGER` under the running
    program's name.

    :param log_only_to_stderr: If True, the low-severity handler also writes
      to stderr and reuses the stderr color decision.
    :param reconfigure_logging: Must be True when calling this function a
      second time; otherwise a RuntimeError is raised to catch accidental
      double setup. On reconfiguration, handlers installed by a previous
      call are replaced rather than stacked.
    :raises RuntimeError: If logging was already set up and
      ``reconfigure_logging`` was not requested.
    """
    global _LOGGING_SET_UP, _DEFAULT_LOGGER, _STDOUT_HANDLER, _STDERR_HANDLER
    if _LOGGING_SET_UP and not reconfigure_logging:
        raise RuntimeError(
            "Logging has already been configured."
            " Use reconfigure_logging=True if you need to reconfigure it"
        )
    stdout_color, stderr_color, bad_request = _check_color()
    colors: Optional[Dict[str, str]] = None

    if stdout_color or stderr_color:
        # Color was requested; degrade silently to colorless output when the
        # optional `colorlog` module is not installed.
        try:
            import colorlog

        except ImportError:
            stdout_color = False
            stderr_color = False
        else:
            colors = dict(colorlog.default_log_colors)
            # Add our custom levels.
            colors["_INFO"] = colors["INFO"]
            colors["__INFO"] = colors["INFO"]

    if log_only_to_stderr:
        stdout = sys.stderr
        stdout_color = stderr_color
    else:
        # NOTE(review): both branches assign sys.stderr, so the low-severity
        # handler always writes to stderr regardless of `log_only_to_stderr`
        # (the flag then only influences the color choice). Confirm whether
        # this was meant to be sys.stdout.
        stdout = sys.stderr

    class LogLevelFilter(logging.Filter):
        # Passes records strictly below the threshold when above=False, or at
        # or above it when above=True; used to split records between the two
        # handlers installed below.
        def __init__(self, threshold: int, above: bool):
            super().__init__()
            self.threshold = threshold
            self.above = above

        def filter(self, record: logging.LogRecord) -> bool:
            if self.above:
                return record.levelno >= self.threshold
            else:
                return record.levelno < self.threshold

    # "{}"-style formats; `levelnamelower` is injected by the record factory
    # installed further down.
    color_format = (
        "{bold}{name}{reset}: {bold}{log_color}{levelnamelower}{reset}: {message}"
    )
    colorless_format = "{name}: {levelnamelower}: {message}"

    # Remember handlers from a previous call so reconfiguration replaces them
    # instead of adding duplicates to the root logger.
    existing_stdout_handler = _STDOUT_HANDLER
    existing_stderr_handler = _STDERR_HANDLER

    if stdout_color:
        stdout_handler = colorlog.StreamHandler(stdout)
        stdout_handler.setFormatter(
            colorlog.ColoredFormatter(
                color_format,
                style="{",
                force_color=True,
                log_colors=colors,
            )
        )
        logger = colorlog.getLogger()
        if existing_stdout_handler is not None:
            logger.removeHandler(existing_stdout_handler)
        _STDOUT_HANDLER = stdout_handler
        logger.addHandler(stdout_handler)
    else:
        stdout_handler = logging.StreamHandler(stdout)
        stdout_handler.setFormatter(logging.Formatter(colorless_format, style="{"))
        logger = logging.getLogger()
        if existing_stdout_handler is not None:
            logger.removeHandler(existing_stdout_handler)
        _STDOUT_HANDLER = stdout_handler
        logger.addHandler(stdout_handler)

    if stderr_color:
        stderr_handler = colorlog.StreamHandler(sys.stderr)
        stderr_handler.setFormatter(
            colorlog.ColoredFormatter(
                color_format,
                style="{",
                force_color=True,
                log_colors=colors,
            )
        )
        logger = logging.getLogger()
        if existing_stderr_handler is not None:
            logger.removeHandler(existing_stderr_handler)
        _STDERR_HANDLER = stderr_handler
        logger.addHandler(stderr_handler)
    else:
        stderr_handler = logging.StreamHandler(sys.stderr)
        stderr_handler.setFormatter(logging.Formatter(colorless_format, style="{"))
        logger = logging.getLogger()
        if existing_stderr_handler is not None:
            logger.removeHandler(existing_stderr_handler)
        _STDERR_HANDLER = stderr_handler
        logger.addHandler(stderr_handler)

    # Severity split: below WARN goes to the "stdout" handler, WARN and above
    # to the stderr handler.
    stdout_handler.addFilter(LogLevelFilter(logging.WARN, False))
    stderr_handler.addFilter(LogLevelFilter(logging.WARN, True))

    name = program_name()

    old_factory = logging.getLogRecordFactory()

    def record_factory(
        *args: Any, **kwargs: Any
    ) -> logging.LogRecord:  # pragma: no cover
        # Strip the leading "_"/"__" of the custom level names (see the
        # `colors` dict above) and expose a lowercase variant for the
        # log formats.
        record = old_factory(*args, **kwargs)
        record.levelname = record.levelname.lstrip("_")
        record.levelnamelower = record.levelname.lower()
        return record

    logging.setLogRecordFactory(record_factory)

    logging.getLogger().setLevel(logging.WARN)
    _DEFAULT_LOGGER = logging.getLogger(name)

    if bad_request:
        _DEFAULT_LOGGER.warning(
            f'Invalid color request for "{bad_request}" in either DEBPUTY_COLORS or DPKG_COLORS.'
            ' Resetting to "auto".'
        )

    _LOGGING_SET_UP = True