Coverage for src/debputy/util.py: 62%

544 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-01-27 13:59 +0000

1import argparse 

2import collections 

3import functools 

4import glob 

5import logging 

6import os 

7import re 

8import shutil 

9import subprocess 

10import sys 

11import time 

12from itertools import zip_longest 

13from pathlib import Path 

14from typing import ( 

15 NoReturn, 

16 TYPE_CHECKING, 

17 Union, 

18 Set, 

19 FrozenSet, 

20 Optional, 

21 TypeVar, 

22 Dict, 

23 Iterator, 

24 Iterable, 

25 Literal, 

26 Tuple, 

27 Sequence, 

28 List, 

29 Mapping, 

30 Any, 

31) 

32 

33from debian.deb822 import Deb822 

34 

35from debputy import DEBPUTY_DOC_ROOT_DIR 

36from debputy.architecture_support import DpkgArchitectureBuildProcessValuesTable 

37from debputy.exceptions import DebputySubstitutionError 

38 

39try: 

40 from Levenshtein import distance 

41except ImportError: 

42 

43 CAN_DETECT_TYPOS = False 

44 

45 def detect_possible_typo( 

46 provided_value: str, 

47 known_values: Iterable[str], 

48 *, 

49 max_edit_distance: int = 2, 

50 ) -> Sequence[str]: 

51 return () 

52 

53else: 

54 

55 CAN_DETECT_TYPOS = True 

56 

57 def detect_possible_typo( 

58 provided_value: str, 

59 known_values: Iterable[str], 

60 *, 

61 max_edit_distance: int = 2, 

62 ) -> Sequence[str]: 

63 k_len = len(provided_value) 

64 candidates = [] 

65 for known_value in known_values: 

66 if abs(k_len - len(known_value)) > max_edit_distance: 

67 continue 

68 d = distance(provided_value, known_value) 

69 if d > max_edit_distance: 

70 continue 

71 candidates.append(known_value) 

72 return candidates 

73 

74 

75if TYPE_CHECKING: 

76 from debputy.types import EnvironmentModification 

77 from debputy.packages import BinaryPackage 

78 from debputy.substitution import Substitution 

79 

80 

81T = TypeVar("T") 

82 

83 

84SLASH_PRUNE = re.compile("//+") 

85PKGNAME_REGEX = re.compile(r"[a-z0-9][-+.a-z0-9]+", re.ASCII) 

86PKGVERSION_REGEX = re.compile( 

87 r""" 

88 (?: \d+ : )? # Optional epoch 

89 \d[0-9A-Za-z.+:~]* # Upstream version (with no hyphens) 

90 (?: - [0-9A-Za-z.+:~]+ )* # Optional debian revision (+ upstreams versions with hyphens) 

91""", 

92 re.VERBOSE | re.ASCII, 

93) 

94DEFAULT_PACKAGE_TYPE = "deb" 

95DBGSYM_PACKAGE_TYPE = "deb" 

96UDEB_PACKAGE_TYPE = "udeb" 

97 

98POSTINST_DEFAULT_CONDITION = ( 

99 '[ "$1" = "configure" ]' 

100 ' || [ "$1" = "abort-upgrade" ]' 

101 ' || [ "$1" = "abort-deconfigure" ]' 

102 ' || [ "$1" = "abort-remove" ]' 

103) 

104 

105 

106_SPACE_RE = re.compile(r"\s") 

107_WORD_EQUAL = re.compile(r"^-*[\w_\-]+=") 

108_DOUBLE_ESCAPEES = re.compile(r'([\n`$"\\])') 

109_REGULAR_ESCAPEES = re.compile(r"""([\s!"$()*+#;<>?@'\[\]\\`|~])""") 

110_PROFILE_GROUP_SPLIT = re.compile(r">\s+<") 

111_DEFAULT_LOGGER: Optional[logging.Logger] = None 

112_STDOUT_HANDLER: Optional[logging.StreamHandler[Any]] = None 

113_STDERR_HANDLER: Optional[logging.StreamHandler[Any]] = None 

114PRINT_COMMAND = logging.INFO + 3 

115PRINT_BUILD_SYSTEM_COMMAND = PRINT_COMMAND + 3 

116TRACE_LOG = logging.DEBUG + 3 

117 

118# Map them back to `INFO`. The names must be unique so the prefix is stripped. 

119logging.addLevelName(PRINT_COMMAND, "__INFO") 

120logging.addLevelName(PRINT_BUILD_SYSTEM_COMMAND, "_INFO") 

121logging.addLevelName(TRACE_LOG, "TRACE") 

122 

123 

124def assume_not_none(x: Optional[T]) -> T: 

125 if x is None: # pragma: no cover 

126 raise ValueError( 

127 'Internal error: None was given, but the receiver assumed "not None" here' 

128 ) 

129 return x 

130 

131 

132def _non_verbose_info(msg: str) -> None: 

133 global _DEFAULT_LOGGER 

134 logger = _DEFAULT_LOGGER 

135 if logger is not None: 

136 logger.log(PRINT_BUILD_SYSTEM_COMMAND, msg) 

137 

138 

139def _info(msg: str) -> None: 

140 global _DEFAULT_LOGGER 

141 logger = _DEFAULT_LOGGER 

142 if logger: 

143 logger.info(msg) 

144 # No fallback print for info 

145 

146 

147def _is_trace_log_enabled() -> bool: 

148 global _DEFAULT_LOGGER 

149 logger = _DEFAULT_LOGGER 

150 return logger is not None and logger.isEnabledFor(TRACE_LOG) 

151 

152 

153def _trace_log(msg: str) -> None: 

154 global _DEFAULT_LOGGER 

155 logger = _DEFAULT_LOGGER 

156 if logger: 

157 logger.log(TRACE_LOG, msg) 

158 # No fallback print for this level 

159 

160 

161def _is_debug_log_enabled() -> bool: 

162 global _DEFAULT_LOGGER 

163 logger = _DEFAULT_LOGGER 

164 return logger is not None and logger.isEnabledFor(logging.DEBUG) 

165 

166 

167def _debug_log(msg: str) -> None: 

168 global _DEFAULT_LOGGER 

169 logger = _DEFAULT_LOGGER 

170 if logger: 

171 logger.debug(msg) 

172 # No fallback print for this level 

173 

174 

175def _error(msg: str, *, prog: Optional[str] = None) -> "NoReturn": 

176 global _DEFAULT_LOGGER 

177 logger = _DEFAULT_LOGGER 

178 if logger: 

179 logger.error(msg) 

180 else: 

181 me = os.path.basename(sys.argv[0]) if prog is None else prog 

182 print( 

183 f"{me}: error: {msg}", 

184 file=sys.stderr, 

185 ) 

186 sys.exit(1) 

187 

188 

189def _warn(msg: str, *, prog: Optional[str] = None) -> None: 

190 global _DEFAULT_LOGGER 

191 logger = _DEFAULT_LOGGER 

192 if logger: 

193 logger.warning(msg) 

194 else: 

195 me = os.path.basename(sys.argv[0]) if prog is None else prog 

196 

197 print( 

198 f"{me}: warning: {msg}", 

199 file=sys.stderr, 

200 ) 

201 

202 

203class ColorizedArgumentParser(argparse.ArgumentParser): 

204 def error(self, message: str) -> NoReturn: 

205 self.print_usage(sys.stderr) 

206 _error(message, prog=self.prog) 

207 

208 

209def ensure_dir(path: str) -> None: 

210 if not os.path.isdir(path): 210 ↛ exitline 210 didn't return from function 'ensure_dir' because the condition on line 210 was always true

211 os.makedirs(path, mode=0o755, exist_ok=True) 

212 

213 

214def _clean_path(orig_p: str) -> str: 

215 p = SLASH_PRUNE.sub("/", orig_p) 

216 if "." in p: 216 ↛ 229line 216 didn't jump to line 229 because the condition on line 216 was always true

217 path_base = p 

218 # We permit a single leading "./" because we add that when we normalize a path, and we want normalization 

219 # of a normalized path to be a no-op. 

220 if path_base.startswith("./"): 

221 path_base = path_base[2:] 

222 assert path_base 

223 for segment in path_base.split("/"): 

224 if segment in (".", ".."): 

225 raise ValueError( 

226 'Please provide paths that are normalized (i.e., no ".." or ".").' 

227 f' Offending input "{orig_p}"' 

228 ) 

229 return p 

230 

231 

232def _normalize_path(path: str, with_prefix: bool = True) -> str: 

233 path = path.strip("/") 

234 if not path or path == ".": 234 ↛ 235line 234 didn't jump to line 235 because the condition on line 234 was never true

235 return "." 

236 if "//" in path or "." in path: 

237 path = _clean_path(path) 

238 if with_prefix ^ path.startswith("./"): 

239 if with_prefix: 239 ↛ 242line 239 didn't jump to line 242 because the condition on line 239 was always true

240 path = "./" + path 

241 else: 

242 path = path[2:] 

243 return path 

244 

245 

246def _normalize_link_target(link_target: str) -> str: 

247 link_target = SLASH_PRUNE.sub("/", link_target.lstrip("/")) 

248 result: List[str] = [] 

249 for segment in link_target.split("/"): 

250 if segment in (".", ""): 

251 # Ignore these - the empty string is generally a trailing slash 

252 continue 

253 if segment == "..": 

254 # We ignore "root escape attempts" like the OS would (mapping /.. -> /) 

255 if result: 255 ↛ 249line 255 didn't jump to line 249 because the condition on line 255 was always true

256 result.pop() 

257 else: 

258 result.append(segment) 

259 return "/".join(result) 

260 

261 

262def manifest_format_doc(anchor: str) -> str: 

263 manifest_format = f"{DEBPUTY_DOC_ROOT_DIR}/MANIFEST-FORMAT.md" 

264 return f"{manifest_format}#{anchor}" if anchor else manifest_format 

265 

266 

267def _backslash_escape(m: re.Match[str]) -> str: 

268 return "\\" + m.group(0) 

269 

270 

271def _escape_shell_word(w: str) -> str: 

272 if _SPACE_RE.search(w): 

273 if "=" in w and (m := _WORD_EQUAL.search(w)) is not None: 

274 s = m.span(0) 

275 assert s[0] == 0 

276 prefix = w[0 : s[1]] 

277 escaped_value = _DOUBLE_ESCAPEES.sub(_backslash_escape, w[s[1] :]) 

278 return f'{prefix}"{escaped_value}"' 

279 w = _DOUBLE_ESCAPEES.sub(_backslash_escape, w) 

280 return f'"{w}"' 

281 return _REGULAR_ESCAPEES.sub(_backslash_escape, w) 

282 

283 

284def escape_shell(*args: str) -> str: 

285 return " ".join(_escape_shell_word(w) for w in args) 

286 

287 

288def render_command( 

289 *args: str, 

290 cwd: Optional[str] = None, 

291 env_mod: Optional["EnvironmentModification"] = None, 

292) -> str: 

293 env_mod_prefix = "" 

294 if env_mod: 

295 env_mod_parts = [] 

296 if bool(env_mod.removals): 

297 env_mod_parts.append("env") 

298 if cwd is not None: 

299 env_mod_parts.append(f"--chdir={escape_shell(cwd)}") 

300 env_mod_parts.extend(f"--unset={escape_shell(v)}" for v in env_mod.removals) 

301 env_mod_parts.extend( 

302 f"{escape_shell(k)}={escape_shell(v)}" for k, v in env_mod.replacements 

303 ) 

304 

305 chdir_prefix = "" 

306 if cwd is not None and cwd != ".": 

307 chdir_prefix = f"cd {escape_shell(cwd)} && " 

308 return f"{chdir_prefix}{env_mod_prefix}{escape_shell(*args)}" 

309 

310 

311def print_command( 

312 *args: str, 

313 cwd: Optional[str] = None, 

314 env_mod: Optional["EnvironmentModification"] = None, 

315 print_at_log_level: int = PRINT_COMMAND, 

316) -> None: 

317 if _DEFAULT_LOGGER is None or not _DEFAULT_LOGGER.isEnabledFor(print_at_log_level): 317 ↛ 320line 317 didn't jump to line 320 because the condition on line 317 was always true

318 return 

319 

320 rendered_cmd = render_command( 

321 *args, 

322 cwd=cwd, 

323 env_mod=env_mod, 

324 ) 

325 print(f" {rendered_cmd}") 

326 

327 

328def run_command( 

329 *args: str, 

330 cwd: Optional[str] = None, 

331 env: Optional[Mapping[str, str]] = None, 

332 env_mod: Optional["EnvironmentModification"] = None, 

333 print_at_log_level: int = PRINT_COMMAND, 

334 raise_file_not_found_on_missing_command: bool = False, 

335) -> None: 

336 print_command( 

337 *args, 

338 cwd=cwd, 

339 env_mod=env_mod, 

340 print_at_log_level=print_at_log_level, 

341 ) 

342 if env_mod: 

343 if env is None: 

344 env = os.environ 

345 env = env_mod.compute_env(env) 

346 if env is os.environ: 

347 env = None 

348 try: 

349 subprocess.check_call(args, cwd=cwd, env=env) 

350 # At least "clean_logic.py" relies on catching FileNotFoundError 

351 except KeyboardInterrupt: 

352 _error(f"Interrupted (SIGINT) while running {escape_shell(*args)}") 

353 except FileNotFoundError: 

354 if raise_file_not_found_on_missing_command: 

355 raise 

356 if "/" in args[0]: 

357 _error(f"Could not run {escape_shell(args[0])}: Path does not exist") 

358 # Use the `command not found` to aid existing log pattern 

359 _error(f"{escape_shell(args[0])}: command not found") 

360 except subprocess.CalledProcessError as e: 

361 _error(f"The command {escape_shell(*args)} failed with status: {e.returncode}") 

362 

363 

364def run_build_system_command( 

365 *args: str, 

366 cwd: Optional[str] = None, 

367 env: Optional[Mapping[str, str]] = None, 

368 env_mod: Optional["EnvironmentModification"] = None, 

369 print_at_log_level: int = PRINT_BUILD_SYSTEM_COMMAND, 

370 raise_file_not_found_on_missing_command: bool = False, 

371) -> None: 

372 run_command( 

373 *args, 

374 cwd=cwd, 

375 env=env, 

376 env_mod=env_mod, 

377 print_at_log_level=print_at_log_level, 

378 raise_file_not_found_on_missing_command=raise_file_not_found_on_missing_command, 

379 ) 

380 

381 

382def debian_policy_normalize_symlink_target( 

383 link_path: str, 

384 link_target: str, 

385 normalize_link_path: bool = False, 

386) -> str: 

387 if normalize_link_path: 

388 link_path = _normalize_path(link_path) 

389 elif not link_path.startswith("./"): 389 ↛ 390line 389 didn't jump to line 390 because the condition on line 389 was never true

390 raise ValueError("Link part was not normalized") 

391 

392 link_path = link_path[2:] 

393 

394 if not link_target.startswith("/"): 

395 link_target = "/" + os.path.dirname(link_path) + "/" + link_target 

396 

397 link_path_parts = link_path.split("/") 

398 link_target_parts = [ 

399 s for s in _normalize_link_target(link_target).split("/") if s != "." 

400 ] 

401 

402 assert link_path_parts 

403 

404 if link_target_parts and link_path_parts[0] == link_target_parts[0]: 

405 # Per Debian Policy, must be relative 

406 

407 # First determine the length of the overlap 

408 common_segment_count = 1 

409 shortest_path_length = min(len(link_target_parts), len(link_path_parts)) 

410 while ( 

411 common_segment_count < shortest_path_length 

412 and link_target_parts[common_segment_count] 

413 == link_path_parts[common_segment_count] 

414 ): 

415 common_segment_count += 1 

416 

417 if common_segment_count == shortest_path_length and len( 

418 link_path_parts 

419 ) - 1 == len(link_target_parts): 

420 normalized_link_target = "." 

421 else: 

422 up_dir_count = len(link_path_parts) - 1 - common_segment_count 

423 normalized_link_target_parts = [] 

424 if up_dir_count: 

425 up_dir_part = "../" * up_dir_count 

426 # We overshoot with a single '/', so rstrip it away 

427 normalized_link_target_parts.append(up_dir_part.rstrip("/")) 

428 # Add the relevant down parts 

429 normalized_link_target_parts.extend( 

430 link_target_parts[common_segment_count:] 

431 ) 

432 

433 normalized_link_target = "/".join(normalized_link_target_parts) 

434 else: 

435 # Per Debian Policy, must be absolute 

436 normalized_link_target = "/" + "/".join(link_target_parts) 

437 

438 return normalized_link_target 

439 

440 

441def has_glob_magic(pattern: str) -> bool: 

442 return glob.has_magic(pattern) or "{" in pattern 

443 

444 

445def glob_escape(replacement_value: str) -> str: 

446 if not glob.has_magic(replacement_value) or "{" not in replacement_value: 

447 return replacement_value 

448 return ( 

449 replacement_value.replace("[", "[[]") 

450 .replace("]", "[]]") 

451 .replace("*", "[*]") 

452 .replace("?", "[?]") 

453 .replace("{", "[{]") 

454 .replace("}", "[}]") 

455 ) 

456 

457 

458# TODO: This logic should probably be moved to `python-debian` 

459def active_profiles_match( 

460 profiles_raw: str, 

461 active_build_profiles: Union[Set[str], FrozenSet[str]], 

462) -> bool: 

463 profiles_raw = profiles_raw.strip() 

464 if profiles_raw[0] != "<" or profiles_raw[-1] != ">" or profiles_raw == "<>": 464 ↛ 465line 464 didn't jump to line 465 because the condition on line 464 was never true

465 raise ValueError( 

466 'Invalid Build-Profiles: Must start start and end with "<" + ">" but cannot be a literal "<>"' 

467 ) 

468 profile_groups = _PROFILE_GROUP_SPLIT.split(profiles_raw[1:-1]) 

469 for profile_group_raw in profile_groups: 469 ↛ 485line 469 didn't jump to line 485 because the loop on line 469 didn't complete

470 should_process_package = True 

471 for profile_name in profile_group_raw.split(): 

472 negation = False 

473 if profile_name[0] == "!": 473 ↛ 477line 473 didn't jump to line 477 because the condition on line 473 was always true

474 negation = True 

475 profile_name = profile_name[1:] 

476 

477 matched_profile = profile_name in active_build_profiles 

478 if matched_profile == negation: 478 ↛ 479line 478 didn't jump to line 479 because the condition on line 478 was never true

479 should_process_package = False 

480 break 

481 

482 if should_process_package: 482 ↛ 469line 482 didn't jump to line 469 because the condition on line 482 was always true

483 return True 

484 

485 return False 

486 

487 

488def _parse_build_profiles(build_profiles_raw: str) -> FrozenSet[FrozenSet[str]]: 

489 profiles_raw = build_profiles_raw.strip() 

490 if profiles_raw[0] != "<" or profiles_raw[-1] != ">" or profiles_raw == "<>": 490 ↛ 491line 490 didn't jump to line 491 because the condition on line 490 was never true

491 raise ValueError( 

492 'Invalid Build-Profiles: Must start start and end with "<" + ">" but cannot be a literal "<>"' 

493 ) 

494 profile_groups = _PROFILE_GROUP_SPLIT.split(profiles_raw[1:-1]) 

495 return frozenset(frozenset(g.split()) for g in profile_groups) 

496 

497 

498def resolve_source_date_epoch( 

499 command_line_value: Optional[int], 

500 *, 

501 substitution: Optional["Substitution"] = None, 

502) -> int: 

503 mtime = command_line_value 

504 if mtime is None and "SOURCE_DATE_EPOCH" in os.environ: 

505 sde_raw = os.environ["SOURCE_DATE_EPOCH"] 

506 if sde_raw == "": 

507 _error("SOURCE_DATE_EPOCH is set but empty.") 

508 mtime = int(sde_raw) 

509 if mtime is None and substitution is not None: 

510 try: 

511 sde_raw = substitution.substitute( 

512 "{{SOURCE_DATE_EPOCH}}", 

513 "Internal resolution", 

514 ) 

515 mtime = int(sde_raw) 

516 except (DebputySubstitutionError, ValueError): 

517 pass 

518 if mtime is None: 

519 mtime = int(time.time()) 

520 os.environ["SOURCE_DATE_EPOCH"] = str(mtime) 

521 return mtime 

522 

523 

524def compute_output_filename(control_root_dir: str, is_udeb: bool) -> str: 

525 with open(os.path.join(control_root_dir, "control"), "rt") as fd: 

526 control_file = Deb822(fd) 

527 

528 package_name = control_file["Package"] 

529 package_version = control_file["Version"] 

530 package_architecture = control_file["Architecture"] 

531 extension = control_file.get("Package-Type") or "deb" 

532 if ":" in package_version: 

533 package_version = package_version.split(":", 1)[1] 

534 if is_udeb: 

535 extension = "udeb" 

536 

537 return f"{package_name}_{package_version}_{package_architecture}.{extension}" 

538 

539 

540_SCRATCH_DIR = None 

541_DH_INTEGRATION_MODE = False 

542 

543 

544def integrated_with_debhelper() -> None: 

545 global _DH_INTEGRATION_MODE 

546 _DH_INTEGRATION_MODE = True 

547 

548 

549def scratch_dir(*, create_if_not_exists: bool = True) -> str: 

550 global _SCRATCH_DIR 

551 if _SCRATCH_DIR is not None: 

552 return _SCRATCH_DIR 

553 debputy_scratch_dir = "debian/.debputy/scratch-dir" 

554 is_debputy_dir = True 

555 if os.path.isdir("debian/.debputy") and not _DH_INTEGRATION_MODE: 555 ↛ 556line 555 didn't jump to line 556 because the condition on line 555 was never true

556 _SCRATCH_DIR = debputy_scratch_dir 

557 elif os.path.isdir("debian/.debhelper") or _DH_INTEGRATION_MODE: 557 ↛ 558line 557 didn't jump to line 558 because the condition on line 557 was never true

558 _SCRATCH_DIR = "debian/.debhelper/_debputy/scratch-dir" 

559 is_debputy_dir = False 

560 else: 

561 _SCRATCH_DIR = debputy_scratch_dir 

562 if create_if_not_exists: 562 ↛ 566line 562 didn't jump to line 566 because the condition on line 562 was always true

563 ensure_dir(_SCRATCH_DIR) 

564 if is_debputy_dir: 564 ↛ 566line 564 didn't jump to line 566 because the condition on line 564 was always true

565 generated_root_directory("debian/.debputy", internal_only=True) 

566 return _SCRATCH_DIR 

567 

568 

569def generated_root_directory(path: str, *, internal_only: bool = False) -> None: 

570 root_dir = Path(path) 

571 (root_dir / ".gitignore").write_text("*\n") 

572 # TODO: Should we add a "CACHEDIR.TAG" here? (Requires a relevant ignore rule 

573 if internal_only: 573 ↛ exitline 573 didn't return from function 'generated_root_directory' because the condition on line 573 was always true

574 (root_dir / "CACHEDIR.TAG").write_bytes( 

575 b"Signature: 8a477f597d28d172789f06886806bc55" 

576 ) 

577 

578 

579_RUNTIME_CONTAINER_DIR_KEY: Optional[str] = None 

580 

581 

582def generated_content_dir( 

583 *, 

584 package: Optional["BinaryPackage"] = None, 

585 subdir_key: Optional[str] = None, 

586) -> str: 

587 global _RUNTIME_CONTAINER_DIR_KEY 

588 container_dir = _RUNTIME_CONTAINER_DIR_KEY 

589 first_run = False 

590 

591 if container_dir is None: 

592 first_run = True 

593 container_dir = f"_pb-{os.getpid()}" 

594 _RUNTIME_CONTAINER_DIR_KEY = container_dir 

595 

596 directory = os.path.join(scratch_dir(), container_dir) 

597 

598 if first_run and os.path.isdir(directory): 598 ↛ 603line 598 didn't jump to line 603 because the condition on line 598 was never true

599 # In the unlikely case there is a re-run with exactly the same pid, `debputy` should not 

600 # see "stale" data. 

601 # TODO: Ideally, we would always clean up this directory on failure, but `atexit` is not 

602 # reliable enough for that and we do not have an obvious hook for it. 

603 shutil.rmtree(directory) 

604 

605 directory = os.path.join( 

606 directory, 

607 "generated-fs-content", 

608 f"pkg_{package.name}" if package else "no-package", 

609 ) 

610 if subdir_key is not None: 610 ↛ 611line 610 didn't jump to line 611 because the condition on line 610 was never true

611 directory = os.path.join(directory, subdir_key) 

612 

613 os.makedirs(directory, exist_ok=True) 

614 return directory 

615 

616 

617PerlConfigVars = collections.namedtuple( 

618 "PerlConfigVars", ["vendorlib", "vendorarch", "cross_inc_dir", "ld", "path_sep"] 

619) 

620PerlConfigData = collections.namedtuple("PerlConfigData", ["version", "debian_abi"]) 

621_PERL_MODULE_DIRS: Dict[str, PerlConfigVars] = {} 

622 

623 

624@functools.lru_cache(1) 

625def _perl_config_data() -> PerlConfigData: 

626 d = ( 

627 subprocess.check_output( 

628 [ 

629 "perl", 

630 "-MConfig", 

631 "-e", 

632 'print "$Config{version}\n$Config{debian_abi}\n"', 

633 ] 

634 ) 

635 .decode("utf-8") 

636 .splitlines() 

637 ) 

638 return PerlConfigData(*d) 

639 

640 

641def _perl_version() -> str: 

642 return _perl_config_data().version 

643 

644 

645def perlxs_api_dependency() -> str: 

646 # dh_perl used the build version of perl for this, so we will too. Most of the perl cross logic 

647 # assumes that the major version of build variant of Perl is the same as the host variant of Perl. 

648 config = _perl_config_data() 

649 if config.debian_abi is not None and config.debian_abi != "": 

650 return f"perlapi-{config.debian_abi}" 

651 return f"perlapi-{config.version}" 

652 

653 

654def resolve_perl_config( 

655 dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable, 

656 dctrl_bin: Optional["BinaryPackage"], 

657) -> PerlConfigVars: 

658 global _PERL_MODULE_DIRS 

659 if dpkg_architecture_variables.is_cross_compiling: 659 ↛ 660line 659 didn't jump to line 660 because the condition on line 659 was never true

660 if dctrl_bin is not None: 

661 arch = dctrl_bin.resolved_architecture 

662 ma = dctrl_bin.deb_multiarch 

663 else: 

664 arch = dpkg_architecture_variables.current_host_arch 

665 ma = dpkg_architecture_variables.current_host_multiarch 

666 else: 

667 # We are de facto using the build-arch version of perl here; be explicit 

668 arch = "_build_arch_" 

669 ma = dpkg_architecture_variables["DEB_BUILD_MULTIARCH"] 

670 config_vars = _PERL_MODULE_DIRS.get(arch) 

671 if config_vars is None: 

672 cmd = ["perl"] 

673 if dpkg_architecture_variables.is_cross_compiling: 673 ↛ 674line 673 didn't jump to line 674 because the condition on line 673 was never true

674 version = _perl_version() 

675 cross_inc_dir = f"/usr/lib/{ma}/perl/cross-config-{version}" 

676 # FIXME: This should not fallback to "build-arch" but on the other hand, we use the perl module dirs 

677 # for every package at the moment. So mandating correct perl dirs implies mandating perl-xs-dev in 

678 # cross builds... meh. 

679 if os.path.exists(os.path.join(cross_inc_dir, "Config.pm")): 

680 cmd.append(f"-I{cross_inc_dir}") 

681 else: 

682 cross_inc_dir = None 

683 cmd.extend( 

684 [ 

685 "-MConfig", 

686 "-e", 

687 'print "$Config{vendorlib}\n$Config{vendorarch}\n$Config{ld}\n$Config{path_sep}\n"', 

688 ] 

689 ) 

690 output = subprocess.check_output(cmd).decode("utf-8").splitlines(keepends=False) 

691 if len(output) != 4: 691 ↛ 692line 691 didn't jump to line 692 because the condition on line 691 was never true

692 raise ValueError( 

693 "Internal error: Unable to determine the perl include directories:" 

694 f" Raw output from perl snippet: {output}" 

695 ) 

696 config_vars = PerlConfigVars( 

697 vendorlib="/" + _normalize_path(output[0], with_prefix=False), 

698 vendorarch="/" + _normalize_path(output[1], with_prefix=False), 

699 cross_inc_dir=cross_inc_dir, 

700 ld=output[2], 

701 path_sep=output[3], 

702 ) 

703 _PERL_MODULE_DIRS[arch] = config_vars 

704 return config_vars 

705 

706 

707@functools.lru_cache(1) 

708def detect_fakeroot() -> bool: 

709 if os.getuid() != 0 or "LD_PRELOAD" not in os.environ: 

710 return False 

711 env = dict(os.environ) 

712 del env["LD_PRELOAD"] 

713 try: 

714 return subprocess.check_output(["id", "-u"], env=env).strip() != b"0" 

715 except subprocess.CalledProcessError: 

716 print( 

717 'Could not run "id -u" with LD_PRELOAD unset; assuming we are not run under fakeroot', 

718 file=sys.stderr, 

719 ) 

720 return False 

721 

722 

723@functools.lru_cache(1) 

724def _sc_arg_max() -> Optional[int]: 

725 try: 

726 return os.sysconf("SC_ARG_MAX") 

727 except RuntimeError: 

728 _warn("Could not resolve SC_ARG_MAX, falling back to a hard-coded limit") 

729 return None 

730 

731 

732def _split_xargs_args( 

733 static_cmd: Sequence[str], 

734 max_args_byte_len: int, 

735 varargs: Iterable[str], 

736 reuse_list_ok: bool, 

737) -> Iterator[List[str]]: 

738 static_cmd_len = len(static_cmd) 

739 remaining_len = max_args_byte_len 

740 pending_args = list(static_cmd) 

741 for arg in varargs: 

742 arg_len = len(arg.encode("utf-8")) + 1 # +1 for leading space 

743 remaining_len -= arg_len 

744 if not remaining_len: 

745 if len(pending_args) <= static_cmd_len: 

746 raise ValueError( 

747 f"Could not fit a single argument into the command line !?" 

748 f" {max_args_byte_len} (variable argument limit) < {arg_len} (argument length)" 

749 ) 

750 yield pending_args 

751 remaining_len = max_args_byte_len - arg_len 

752 if reuse_list_ok: 

753 pending_args.clear() 

754 pending_args.extend(static_cmd) 

755 else: 

756 pending_args = list(static_cmd) 

757 pending_args.append(arg) 

758 

759 if len(pending_args) > static_cmd_len: 

760 yield pending_args 

761 

762 

763def xargs( 

764 static_cmd: Sequence[str], 

765 varargs: Iterable[str], 

766 *, 

767 env: Optional[Mapping[str, str]] = None, 

768 reuse_list_ok: bool = False, 

769) -> Iterator[List[str]]: 

770 max_args_bytes = _sc_arg_max() 

771 # len overshoots with one space explaining the -1. The _split_xargs_args 

772 # will account for the space for the first argument 

773 static_byte_len = ( 

774 len(static_cmd) - 1 + sum(len(a.encode("utf-8")) for a in static_cmd) 

775 ) 

776 if max_args_bytes is not None: 

777 if env is None: 

778 # +2 for nul bytes after key and value 

779 static_byte_len += sum(len(k) + len(v) + 2 for k, v in os.environb.items()) 

780 else: 

781 # +2 for nul bytes after key and value 

782 static_byte_len += sum( 

783 len(k.encode("utf-8")) + len(v.encode("utf-8")) + 2 

784 for k, v in env.items() 

785 ) 

786 # Add a fixed buffer for OS overhead here (in case env and cmd both must be page-aligned or something like 

787 # that) 

788 static_byte_len += 2 * 4096 

789 else: 

790 # The 20 000 limit is from debhelper, and it did not account for environment. So neither will we here. 

791 max_args_bytes = 20_000 

792 remain_len = max_args_bytes - static_byte_len 

793 yield from _split_xargs_args(static_cmd, remain_len, varargs, reuse_list_ok) 

794 

795 

796# itertools recipe 

797def grouper( 

798 iterable: Iterable[T], 

799 n: int, 

800 *, 

801 incomplete: Literal["fill", "strict", "ignore"] = "fill", 

802 fillvalue: Optional[T] = None, 

803) -> Iterator[Tuple[T, ...]]: 

804 """Collect data into non-overlapping fixed-length chunks or blocks""" 

805 # grouper('ABCDEFG', 3, fillvalue='x') --> ABC DEF Gxx 

806 # grouper('ABCDEFG', 3, incomplete='strict') --> ABC DEF ValueError 

807 # grouper('ABCDEFG', 3, incomplete='ignore') --> ABC DEF 

808 args = [iter(iterable)] * n 

809 if incomplete == "fill": 809 ↛ 810line 809 didn't jump to line 810 because the condition on line 809 was never true

810 return zip_longest(*args, fillvalue=fillvalue) 

811 if incomplete == "strict": 811 ↛ 813line 811 didn't jump to line 813 because the condition on line 811 was always true

812 return zip(*args, strict=True) 

813 if incomplete == "ignore": 

814 return zip(*args) 

815 else: 

816 raise ValueError("Expected fill, strict, or ignore") 

817 

818 

819_LOGGING_SET_UP = False 

820 

821 

822def _check_color() -> Tuple[bool, bool, Optional[str]]: 

823 dpkg_or_default = os.environ.get( 

824 "DPKG_COLORS", "never" if "NO_COLOR" in os.environ else "auto" 

825 ) 

826 requested_color = os.environ.get("DEBPUTY_COLORS", dpkg_or_default) 

827 bad_request = None 

828 if requested_color not in {"auto", "always", "never"}: 828 ↛ 829line 828 didn't jump to line 829 because the condition on line 828 was never true

829 bad_request = requested_color 

830 requested_color = "auto" 

831 

832 if requested_color == "auto": 832 ↛ 836line 832 didn't jump to line 836 because the condition on line 832 was always true

833 stdout_color = sys.stdout.isatty() 

834 stderr_color = sys.stdout.isatty() 

835 else: 

836 enable = requested_color == "always" 

837 stdout_color = enable 

838 stderr_color = enable 

839 return stdout_color, stderr_color, bad_request 

840 

841 

842def program_name() -> str: 

843 name = os.path.basename(sys.argv[0]) 

844 if name.endswith(".py"): 844 ↛ 845line 844 didn't jump to line 845 because the condition on line 844 was never true

845 name = name[:-3] 

846 if name == "__main__": 846 ↛ 847line 846 didn't jump to line 847 because the condition on line 846 was never true

847 name = os.path.basename(os.path.dirname(sys.argv[0])) 

848 # FIXME: Not optimal that we have to hardcode these kind of things here 

849 if name == "debputy_cmd": 849 ↛ 850line 849 didn't jump to line 850 because the condition on line 849 was never true

850 name = "debputy" 

851 return name 

852 

853 

854def package_cross_check_precheck( 

855 pkg_a: "BinaryPackage", 

856 pkg_b: "BinaryPackage", 

857) -> Tuple[bool, bool]: 

858 """Whether these two packages can do content cross-checks 

859 

860 :param pkg_a: The first package 

861 :param pkg_b: The second package 

862 :return: A tuple if two booleans. If the first is True, then binary_package_a may do content cross-checks 

863 that invoĺves binary_package_b. If the second is True, then binary_package_b may do content cross-checks 

864 that involves binary_package_a. Both can be True and both can be False at the same time, which 

865 happens in common cases (arch:all + arch:any cases both to be False as a common example). 

866 """ 

867 

868 # Handle the two most obvious base-cases 

869 if not pkg_a.should_be_acted_on or not pkg_b.should_be_acted_on: 

870 return False, False 

871 if pkg_a.is_arch_all ^ pkg_b.is_arch_all: 

872 return False, False 

873 

874 a_may_see_b = True 

875 b_may_see_a = True 

876 

877 a_bp = pkg_a.fields.get("Build-Profiles", "") 

878 b_bp = pkg_b.fields.get("Build-Profiles", "") 

879 

880 if a_bp != b_bp: 

881 a_bp_set = _parse_build_profiles(a_bp) if a_bp != "" else frozenset() 

882 b_bp_set = _parse_build_profiles(b_bp) if b_bp != "" else frozenset() 

883 

884 # Check for build profiles being identically but just ordered differently. 

885 if a_bp_set != b_bp_set: 

886 # For simplicity, we let groups cancel each other out. If one side has no clauses 

887 # left, then it will always be built when the other is built. 

888 # 

889 # Eventually, someone will be here with a special case where more complex logic is 

890 # required. Good luck to you! Remember to add test cases for it (the existing logic 

891 # has some for a reason and if the logic is going to be more complex, it will need 

892 # tests cases to assert it fixes the problem and does not regress) 

893 if a_bp_set - b_bp_set: 

894 a_may_see_b = False 

895 if b_bp_set - a_bp_set: 

896 b_may_see_a = False 

897 

898 if pkg_a.declared_architecture != pkg_b.declared_architecture: 

899 # Also here we could do a subset check, but wildcards vs. non-wildcards make that a pain 

900 if pkg_a.declared_architecture != "any": 900 ↛ 902line 900 didn't jump to line 902 because the condition on line 900 was always true

901 b_may_see_a = False 

902 if pkg_a.declared_architecture != "any": 902 ↛ 905line 902 didn't jump to line 905 because the condition on line 902 was always true

903 a_may_see_b = False 

904 

905 return a_may_see_b, b_may_see_a 

906 

907 

908def change_log_level( 

909 log_level: int, 

910) -> None: 

911 if _DEFAULT_LOGGER is not None: 

912 _DEFAULT_LOGGER.setLevel(log_level) 

913 logging.getLogger("").setLevel(log_level) 

914 

915 

916def current_log_level() -> Optional[int]: 

917 if _DEFAULT_LOGGER is not None: 

918 return _DEFAULT_LOGGER.level 

919 return None 

920 

921 

922def setup_logging( 

923 *, 

924 log_only_to_stderr: bool = False, 

925 reconfigure_logging: bool = False, 

926) -> None: 

927 global _LOGGING_SET_UP, _DEFAULT_LOGGER, _STDOUT_HANDLER, _STDERR_HANDLER 

928 if _LOGGING_SET_UP and not reconfigure_logging: 928 ↛ 929line 928 didn't jump to line 929 because the condition on line 928 was never true

929 raise RuntimeError( 

930 "Logging has already been configured." 

931 " Use reconfigure_logging=True if you need to reconfigure it" 

932 ) 

933 stdout_color, stderr_color, bad_request = _check_color() 

934 colors: Optional[Dict[str, str]] = None 

935 

936 if stdout_color or stderr_color: 936 ↛ 937line 936 didn't jump to line 937 because the condition on line 936 was never true

937 try: 

938 import colorlog 

939 

940 except ImportError: 

941 stdout_color = False 

942 stderr_color = False 

943 else: 

944 colors = dict(colorlog.default_log_colors) 

945 # Add our custom levels. 

946 colors["_INFO"] = colors["INFO"] 

947 colors["__INFO"] = colors["INFO"] 

948 

949 if log_only_to_stderr: 

950 stdout = sys.stderr 

951 stdout_color = stderr_color 

952 else: 

953 stdout = sys.stderr 

954 

955 class LogLevelFilter(logging.Filter): 

956 def __init__(self, threshold: int, above: bool): 

957 super().__init__() 

958 self.threshold = threshold 

959 self.above = above 

960 

961 def filter(self, record: logging.LogRecord) -> bool: 

962 if self.above: 

963 return record.levelno >= self.threshold 

964 else: 

965 return record.levelno < self.threshold 

966 

967 color_format = ( 

968 "{bold}{name}{reset}: {bold}{log_color}{levelnamelower}{reset}: {message}" 

969 ) 

970 colorless_format = "{name}: {levelnamelower}: {message}" 

971 

972 existing_stdout_handler = _STDOUT_HANDLER 

973 existing_stderr_handler = _STDERR_HANDLER 

974 

975 if stdout_color: 975 ↛ 976line 975 didn't jump to line 976 because the condition on line 975 was never true

976 stdout_handler = colorlog.StreamHandler(stdout) 

977 stdout_handler.setFormatter( 

978 colorlog.ColoredFormatter( 

979 color_format, 

980 style="{", 

981 force_color=True, 

982 log_colors=colors, 

983 ) 

984 ) 

985 logger = colorlog.getLogger() 

986 if existing_stdout_handler is not None: 

987 logger.removeHandler(existing_stdout_handler) 

988 _STDOUT_HANDLER = stdout_handler 

989 logger.addHandler(stdout_handler) 

990 else: 

991 stdout_handler = logging.StreamHandler(stdout) 

992 stdout_handler.setFormatter(logging.Formatter(colorless_format, style="{")) 

993 logger = logging.getLogger() 

994 if existing_stdout_handler is not None: 

995 logger.removeHandler(existing_stdout_handler) 

996 _STDOUT_HANDLER = stdout_handler 

997 logger.addHandler(stdout_handler) 

998 

999 if stderr_color: 999 ↛ 1000line 999 didn't jump to line 1000 because the condition on line 999 was never true

1000 stderr_handler = colorlog.StreamHandler(sys.stderr) 

1001 stderr_handler.setFormatter( 

1002 colorlog.ColoredFormatter( 

1003 color_format, 

1004 style="{", 

1005 force_color=True, 

1006 log_colors=colors, 

1007 ) 

1008 ) 

1009 logger = logging.getLogger() 

1010 if existing_stderr_handler is not None: 

1011 logger.removeHandler(existing_stderr_handler) 

1012 _STDERR_HANDLER = stderr_handler 

1013 logger.addHandler(stderr_handler) 

1014 else: 

1015 stderr_handler = logging.StreamHandler(sys.stderr) 

1016 stderr_handler.setFormatter(logging.Formatter(colorless_format, style="{")) 

1017 logger = logging.getLogger() 

1018 if existing_stderr_handler is not None: 

1019 logger.removeHandler(existing_stderr_handler) 

1020 _STDERR_HANDLER = stderr_handler 

1021 logger.addHandler(stderr_handler) 

1022 

1023 stdout_handler.addFilter(LogLevelFilter(logging.WARN, False)) 

1024 stderr_handler.addFilter(LogLevelFilter(logging.WARN, True)) 

1025 

1026 name = program_name() 

1027 

1028 old_factory = logging.getLogRecordFactory() 

1029 

1030 def record_factory( 

1031 *args: Any, **kwargs: Any 

1032 ) -> logging.LogRecord: # pragma: no cover 

1033 record = old_factory(*args, **kwargs) 

1034 record.levelname = record.levelname.lstrip("_") 

1035 record.levelnamelower = record.levelname.lower() 

1036 return record 

1037 

1038 logging.setLogRecordFactory(record_factory) 

1039 

1040 logging.getLogger().setLevel(logging.WARN) 

1041 _DEFAULT_LOGGER = logging.getLogger(name) 

1042 

1043 if bad_request: 1043 ↛ 1044line 1043 didn't jump to line 1044 because the condition on line 1043 was never true

1044 _DEFAULT_LOGGER.warning( 

1045 f'Invalid color request for "{bad_request}" in either DEBPUTY_COLORS or DPKG_COLORS.' 

1046 ' Resetting to "auto".' 

1047 ) 

1048 

1049 _LOGGING_SET_UP = True