Coverage for src/debputy/commands/deb_packer.py: 56%

199 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2026-01-16 17:20 +0000

1#!/usr/bin/python3 -B 

2import argparse 

3import errno 

4import operator 

5import os 

6import stat 

7import subprocess 

8import tarfile 

9import textwrap 

10from typing import BinaryIO, cast 

11from collections.abc import Iterable, Callable 

12 

13from debputy.intermediate_manifest import TarMember, PathType 

14from debputy.util import ( 

15 _error, 

16 compute_output_filename, 

17 resolve_source_date_epoch, 

18 ColorizedArgumentParser, 

19 setup_logging, 

20 program_name, 

21 assume_not_none, 

22) 

23from debputy.version import version 

24 

25 

# AR header / start of a deb file for reference
# 00000000  21 3c 61 72 63 68 3e 0a  64 65 62 69 61 6e 2d 62  |!<arch>.debian-b|
# 00000010  69 6e 61 72 79 20 20 20  31 36 36 38 39 37 33 36  |inary   16689736|
# 00000020  39 35 20 20 30 20 20 20  20 20 30 20 20 20 20 20  |95  0     0     |
# 00000030  31 30 30 36 34 34 20 20  34 20 20 20 20 20 20 20  |100644  4       |
# 00000040  20 20 60 0a 32 2e 30 0a  63 6f 6e 74 72 6f 6c 2e  |  `.2.0.control.|
# 00000050  74 61 72 2e 78 7a 20 20  31 36 36 38 39 37 33 36  |tar.xz  16689736|
# 00000060  39 35 20 20 30 20 20 20  20 20 30 20 20 20 20 20  |95  0     0     |
# 00000070  31 30 30 36 34 34 20 20  39 33 36 38 20 20 20 20  |100644  9368    |
# 00000080  20 20 60 0a fd 37 7a 58  5a 00 00 04 e6 d6 b4 46  |  `..7zXZ......F|

36 

37 

38class ArMember: 

39 def __init__( 

40 self, 

41 name: str, 

42 mtime: int, 

43 fixed_binary: bytes | None = None, 

44 write_to_impl: Callable[[BinaryIO], None] | None = None, 

45 ) -> None: 

46 self.name = name 

47 self._mtime = mtime 

48 self._write_to_impl = write_to_impl 

49 self.fixed_binary = fixed_binary 

50 

51 @property 

52 def is_fixed_binary(self) -> bool: 

53 return self.fixed_binary is not None 

54 

55 @property 

56 def mtime(self) -> int: 

57 return self._mtime 

58 

59 def write_to(self, fd: BinaryIO) -> None: 

60 writer = self._write_to_impl 

61 assert writer is not None 

62 writer(fd) 

63 

64 

# Size in bytes of one ar member header:
# name(16) + mtime(12) + uid(6) + gid(6) + mode(8) + size(10) + magic(2).
AR_HEADER_LEN = 60
# Placeholder written where a header will go once the member size is known.
AR_HEADER = b" " * AR_HEADER_LEN


def write_header(
    fd: BinaryIO,
    member: "ArMember",
    member_len: int,
    mtime: int,
) -> None:
    """Write the ar header for *member* at the current position of *fd*.

    The header always uses uid 0, gid 0 and mode 100644.

    :param fd: Binary file to write the header to.
    :param member: The member being described; only its ``name`` is used.
    :param member_len: Size of the member content in bytes (without padding).
    :param mtime: Modification time recorded in the header.
    :raises ValueError: If a field does not fit its fixed-width column, since
      the resulting header would not be exactly ``AR_HEADER_LEN`` bytes.
    """
    header = b"%-16s%-12d%-6d%-6d%-8s%-10d\x60\n" % (
        member.name.encode("ascii"),
        mtime,
        0,  # uid
        0,  # gid
        b"100644",  # mode
        member_len,
    )
    # generate_ar_archive overwrites an AR_HEADER_LEN placeholder with this
    # header, so any other length would corrupt the archive.
    if len(header) != AR_HEADER_LEN:
        raise ValueError(
            f"Cannot encode ar header for {member.name!r}: expected"
            f" {AR_HEADER_LEN} bytes, got {len(header)}"
        )
    fd.write(header)


def generate_ar_archive(
    output_filename: str,
    mtime: int,
    members: Iterable["ArMember"],
    prefer_raw_exceptions: bool,
) -> None:
    """Write an ar archive containing *members* to *output_filename*.

    Members with ``fixed_binary`` content are written directly.  For the
    others, a placeholder header is written first, the member streams its
    content via ``write_to`` and the header is then rewritten in place with
    the real size.  Every member with an odd size is followed by a single
    newline so that the next header starts on an even offset.

    :param output_filename: Path of the archive to create (overwritten).
    :param mtime: Modification time recorded in every member header.
    :param members: The members to write, in archive order.
    :param prefer_raw_exceptions: When true, ``OSError`` is re-raised as-is.
      Otherwise disk-full, generic I/O and read-only file system errors are
      reported via ``_error`` and any other ``OSError`` is re-raised.
    """
    try:
        # Unbuffered on purpose: members may hand the underlying descriptor
        # to another writer, so tell()/seek() must reflect the real offset.
        with open(output_filename, "wb", buffering=0) as fd:
            fd.write(b"!<arch>\n")
            for member in members:
                fixed_binary = member.fixed_binary
                if fixed_binary is not None:
                    content_len = len(fixed_binary)
                    write_header(fd, member, content_len, mtime)
                    fd.write(fixed_binary)
                else:
                    header_pos = fd.tell()
                    fd.write(AR_HEADER)
                    member.write_to(fd)
                    current_pos = fd.tell()
                    content_len = current_pos - header_pos - AR_HEADER_LEN
                    assert content_len >= 0
                    fd.seek(header_pos, os.SEEK_SET)
                    write_header(fd, member, content_len, mtime)
                    fd.seek(current_pos, os.SEEK_SET)
                if content_len % 2:
                    # ar members start on even offsets; the padding byte is
                    # not part of the size recorded in the header.
                    fd.write(b"\n")
    except OSError as e:
        if prefer_raw_exceptions:
            raise
        if e.errno == errno.ENOSPC:
            _error(
                f"Unable to write {output_filename}. The file system device reported disk full: {str(e)}"
            )
        elif e.errno == errno.EIO:
            _error(
                f"Unable to write {output_filename}. The file system reported a generic I/O error: {str(e)}"
            )
        elif e.errno == errno.EROFS:
            _error(
                f"Unable to write {output_filename}. The file system is read-only: {str(e)}"
            )
        raise
    print(f"Generated {output_filename}")

124 

125 

def _generate_tar_file(
    tar_members: Iterable[TarMember],
    compression_cmd: list[str],
    write_to: BinaryIO,
) -> None:
    """Stream a GNU-format tar of *tar_members* through a compressor.

    *compression_cmd* is started as a subprocess whose stdin receives the tar
    stream and whose stdout is *write_to*.  A non-zero exit code of the
    compressor is reported via ``_error``.
    """
    compress_proc = subprocess.Popen(
        compression_cmd, stdin=subprocess.PIPE, stdout=write_to
    )
    # The tar stream must be closed before the compressor's stdin is, hence
    # the tarfile context is nested inside the process context.
    with compress_proc:
        with tarfile.open(
            mode="w|",
            fileobj=compress_proc.stdin,
            format=tarfile.GNU_FORMAT,
            errorlevel=1,
        ) as tar_fd:
            for tar_member in tar_members:
                tar_info: tarfile.TarInfo = tar_member.create_tar_info(tar_fd)
                if tar_member.path_type != PathType.FILE:
                    # Only regular files carry content.
                    tar_fd.addfile(tar_info)
                    continue
                with open(assume_not_none(tar_member.fs_path), "rb") as mfd:
                    tar_fd.addfile(tar_info, fileobj=mfd)
    compress_proc.wait()
    if compress_proc.returncode != 0:
        _error(
            f"Compression command {compression_cmd} failed with code {compress_proc.returncode}"
        )

154 

155 

def generate_tar_file_member(
    tar_members: Iterable[TarMember],
    compression_cmd: list[str],
) -> Callable[[BinaryIO], None]:
    """Return a writer callback producing the compressed tar of *tar_members*.

    Nothing is generated until the returned callable is invoked with the
    binary file that should receive the compressed output.
    """

    def _write_compressed_tar(fd: BinaryIO) -> None:
        _generate_tar_file(tar_members, compression_cmd, fd)

    return _write_compressed_tar

168 

169 

170def _xz_cmdline( 

171 compression_rule: "Compression", 

172 parsed_args: argparse.Namespace | None, 

173) -> list[str]: 

174 compression_level = compression_rule.effective_compression_level(parsed_args) 

175 threads_max = parsed_args.threads_max 

176 cmdline = [ 

177 "xz", 

178 f"-T+{threads_max}", 

179 "-" + str(compression_level), 

180 # Do not generate warnings when adjusting memory usage, nor 

181 # exit with non-zero due to those not emitted warnings. 

182 "--quiet", 

183 "--no-warn", 

184 # Do not let xz fallback to single-threaded mode, to avoid 

185 # non-reproducible output. 

186 "--no-adjust", 

187 ] 

188 strategy = None if parsed_args is None else parsed_args.compression_strategy 

189 if strategy is None: 189 ↛ 191line 189 didn't jump to line 191 because the condition on line 189 was always true

190 strategy = "none" 

191 if strategy != "none": 191 ↛ 192line 191 didn't jump to line 192 because the condition on line 191 was never true

192 cmdline.append("--" + strategy) 

193 cmdline.append("--no-adjust") 

194 return cmdline 

195 

196 

197def _gzip_cmdline( 

198 compression_rule: "Compression", 

199 parsed_args: argparse.Namespace | None, 

200) -> list[str]: 

201 compression_level = compression_rule.effective_compression_level(parsed_args) 

202 cmdline = ["gzip", "-n" + str(compression_level)] 

203 strategy = None if parsed_args is None else parsed_args.compression_strategy 

204 if strategy is not None and strategy != "none": 

205 raise ValueError( 

206 f"Not implemented: Compression strategy {strategy}" 

207 " for gzip is currently unsupported (but dpkg-deb does)" 

208 ) 

209 return cmdline 

210 

211 

212def _uncompressed_cmdline( 

213 _unused_a: "Compression", 

214 _unused_b: argparse.Namespace | None, 

215) -> list[str]: 

216 return ["cat"] 

217 

218 

219class Compression: 

220 def __init__( 

221 self, 

222 default_compression_level: int, 

223 extension: str, 

224 allowed_strategies: frozenset[str], 

225 cmdline_builder: Callable[ 

226 ["Compression", argparse.Namespace | None], list[str] 

227 ], 

228 ) -> None: 

229 self.default_compression_level = default_compression_level 

230 self.extension = extension 

231 self.allowed_strategies = allowed_strategies 

232 self.cmdline_builder = cmdline_builder 

233 

234 def __repr__(self) -> str: 

235 return f"<{self.__class__.__name__} {self.extension}>" 

236 

237 def effective_compression_level( 

238 self, parsed_args: argparse.Namespace | None 

239 ) -> int: 

240 if parsed_args and parsed_args.compression_level is not None: 240 ↛ 241line 240 didn't jump to line 241 because the condition on line 240 was never true

241 return cast("int", parsed_args.compression_level) 

242 return self.default_compression_level 

243 

244 def as_cmdline(self, parsed_args: argparse.Namespace | None) -> list[str]: 

245 return self.cmdline_builder(self, parsed_args) 

246 

247 def with_extension(self, filename: str) -> str: 

248 return filename + self.extension 

249 

250 

# Supported compression methods, keyed by the name accepted by the -Z option.
COMPRESSIONS = {
    "xz": Compression(
        default_compression_level=6,
        extension=".xz",
        allowed_strategies=frozenset({"none", "extreme"}),
        cmdline_builder=_xz_cmdline,
    ),
    "gzip": Compression(
        default_compression_level=9,
        extension=".gz",
        allowed_strategies=frozenset(
            {"none", "filtered", "huffman", "rle", "fixed"}
        ),
        cmdline_builder=_gzip_cmdline,
    ),
    "none": Compression(
        default_compression_level=0,
        extension="",
        allowed_strategies=frozenset({"none"}),
        cmdline_builder=_uncompressed_cmdline,
    ),
}

261 

262 

def _normalize_compression_args(parsed_args: argparse.Namespace) -> argparse.Namespace:
    """Normalize and validate the compression related arguments.

    * ``-Zgzip -z0`` is mapped to the ``none`` algorithm (dpkg-deb
      compatibility); a note is printed when that happens.
    * An algorithm that is not in ``COMPRESSIONS`` is reported via ``_error``.
      This can happen because the ``-Z`` default comes from the environment
      and argparse does not check defaults against ``choices``.
    * A strategy that the selected algorithm does not allow is reported via
      ``_error``.

    :param parsed_args: The parsed arguments; modified in place.
    :return: The same namespace object.
    """
    if (
        parsed_args.compression_level == 0
        and parsed_args.compression_algorithm == "gzip"
    ):
        print(
            "Note: Mapping compression algorithm to none for compatibility with dpkg-deb (due to -Zgzip -z0)"
        )
        setattr(parsed_args, "compression_algorithm", "none")

    algorithm = parsed_args.compression_algorithm
    compression = COMPRESSIONS.get(algorithm)
    if compression is None:
        _error(
            f'Unsupported compression algorithm "{algorithm}".'
            f' Allowed values: {", ".join(sorted(COMPRESSIONS))}'
        )
    strategy = parsed_args.compression_strategy
    if strategy is not None and strategy not in compression.allowed_strategies:
        _error(
            f'Compression algorithm "{parsed_args.compression_algorithm}" does not support compression strategy'
            f' "{strategy}". Allowed values: {", ".join(sorted(compression.allowed_strategies))}'
        )
    return parsed_args

281 

282 

def parse_args() -> argparse.Namespace:
    """Parse the command line and return the normalized arguments.

    Defaults for the compression algorithm, the compression level and the
    thread limit are taken from the ``DPKG_DEB_COMPRESSOR_TYPE``,
    ``DPKG_DEB_COMPRESSOR_LEVEL`` and ``DPKG_DEB_THREADS_MAX`` environment
    variables.  Unparsable numeric values are ignored, and so is a
    compression level outside 0-9, because argparse does not check defaults
    against ``choices``.

    :return: The namespace after ``_normalize_compression_args``.
    """
    try:
        compression_level_default = int(os.environ["DPKG_DEB_COMPRESSOR_LEVEL"])
    except (KeyError, ValueError):
        compression_level_default = None
    else:
        # Same range as the choices accepted by -z below.
        if compression_level_default not in range(10):
            compression_level_default = None

    compression_type = os.environ.get("DPKG_DEB_COMPRESSOR_TYPE", "xz")

    try:
        threads_max = int(os.environ["DPKG_DEB_THREADS_MAX"])
    except (KeyError, ValueError):
        threads_max = 0

    description = textwrap.dedent(
        """\
        THIS IS A PROTOTYPE "dpkg-deb -b" emulator with basic manifest support

        DO NOT USE THIS TOOL DIRECTLY. It has no stability guarantees and will be removed as
        soon as "dpkg-deb -b" grows support for the relevant features.

        This tool is a prototype "dpkg-deb -b"-like interface for compiling a Debian package
        without requiring root even for static ownership. It is a temporary stand-in for
        "dpkg-deb -b" until "dpkg-deb -b" will get support for a manifest.

        The tool operates on an internal JSON based manifest for now, because it was faster
        than building an mtree parser (which is the format that dpkg will likely end up
        using).

        As the tool is not meant to be used directly, it is full of annoying paper cuts that
        I refuse to fix or maintain. Use the high level tool instead.

        """
    )

    parser = ColorizedArgumentParser(
        description=description,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        allow_abbrev=False,
        prog=program_name(),
    )
    parser.add_argument("--version", action="version", version=version())
    parser.add_argument(
        "package_root_dir",
        metavar="PACKAGE_ROOT_DIR",
        help="Root directory of the package. Must contain a DEBIAN directory",
    )
    parser.add_argument(
        "package_output_path",
        metavar="PATH",
        help="Path where the package should be placed. If it is directory,"
        " the base name will be determined from the package metadata",
    )

    parser.add_argument(
        "--intermediate-package-manifest",
        dest="package_manifest",
        metavar="JSON_FILE",
        action="store",
        default=None,
        help="INTERMEDIATE package manifest (JSON!)",
    )
    parser.add_argument(
        "--root-owner-group",
        dest="root_owner_group",
        action="store_true",
        help="Ignored. Accepted for compatibility with dpkg-deb -b",
    )
    parser.add_argument(
        "-b",
        "--build",
        dest="build_param",
        action="store_true",
        help="Ignored. Accepted for compatibility with dpkg-deb",
    )
    parser.add_argument(
        "--source-date-epoch",
        dest="source_date_epoch",
        action="store",
        type=int,
        default=None,
        help="Source date epoch (can also be given via the SOURCE_DATE_EPOCH environ variable)",
    )
    parser.add_argument(
        "-Z",
        dest="compression_algorithm",
        choices=COMPRESSIONS,
        default=compression_type,
        help="The compression algorithm to be used",
    )
    parser.add_argument(
        "-z",
        dest="compression_level",
        metavar="{0-9}",
        choices=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
        default=compression_level_default,
        type=int,
        help="The compression level to be used",
    )
    parser.add_argument(
        "-S",
        dest="compression_strategy",
        # We have a different default for xz when strategy is unset and we are building a udeb
        action="store",
        default=None,
        help="The compression strategy to be used. Concrete values depend on the compression"
        ' algorithm, but the value "none" is always allowed',
    )
    parser.add_argument(
        "--uniform-compression",
        dest="uniform_compression",
        action="store_true",
        default=True,
        help="Whether to use the same compression for the control.tar and the data.tar."
        " The default is to use uniform compression.",
    )
    parser.add_argument(
        "--no-uniform-compression",
        dest="uniform_compression",
        action="store_false",
        default=True,
        help="Disable uniform compression (see --uniform-compression)",
    )
    parser.add_argument(
        "--threads-max",
        dest="threads_max",
        default=threads_max,
        type=int,
        help="Sets the maximum number of threads allowed for compressors that support multi-threaded operations",
    )
    parser.add_argument(
        "-d",
        "--debug",
        dest="debug_mode",
        action="store_true",
        default=False,
        help="Enable debug logging and raw stack traces on errors",
    )

    parsed_args = parser.parse_args()
    parsed_args = _normalize_compression_args(parsed_args)

    return parsed_args

428 

429 

def _ctrl_member(
    member_path: str,
    fs_path: str | None = None,
    path_type: PathType = PathType.FILE,
    mode: int = 0o644,
    mtime: int = 0,
) -> TarMember:
    """Create a root-owned ``TarMember`` for the control tarball.

    When *fs_path* is omitted it is derived from *member_path* by replacing
    the leading ``.`` with ``DEBIAN`` (*member_path* must start with ``./``).
    """
    source_path = fs_path
    if source_path is None:
        assert member_path.startswith("./")
        source_path = "DEBIAN" + member_path[1:]
    # Control members are always owned by root:root (0:0).
    ownership = {"owner": "root", "uid": 0, "group": "root", "gid": 0}
    return TarMember(
        member_path=member_path,
        path_type=path_type,
        fs_path=source_path,
        mode=mode,
        mtime=mtime,
        **ownership,
    )

451 

452 

# Names of control members that are packed with mode 0755; every other
# control member is packed with mode 0644 (see _ctrl_tar_members).
CTRL_MEMBER_SCRIPTS = {
    "preinst",
    "postinst",
    "prerm",
    "postrm",
    "config",
    "isinstallable",
}

461 

462 

def _ctrl_tar_members(package_root_dir: str, mtime: int) -> Iterable[TarMember]:
    """Yield the members of the control tarball for *package_root_dir*.

    The first member is the ``./`` directory entry, followed by one entry
    per file in ``<package_root_dir>/DEBIAN`` in name order.  Each mtime is
    clamped so it never exceeds *mtime*.  Names listed in
    ``CTRL_MEMBER_SCRIPTS`` get mode 0755, all others 0644.  Anything that is
    not a regular file is reported via ``_error``.
    """
    debian_root = os.path.join(package_root_dir, "DEBIAN")
    root_mtime = int(os.stat(debian_root).st_mtime)
    yield _ctrl_member(
        "./",
        debian_root,
        path_type=PathType.DIRECTORY,
        mode=0o0755,
        mtime=min(mtime, root_mtime),
    )
    with os.scandir(debian_root) as dir_iter:
        for ctrl_member in sorted(dir_iter, key=lambda entry: entry.name):
            st = os.stat(ctrl_member)
            if not stat.S_ISREG(st.st_mode):
                _error(
                    f"{ctrl_member.path} is not a file and all control.tar members ought to be files!"
                )
            if ctrl_member.name in CTRL_MEMBER_SCRIPTS:
                member_mode = 0o0755
            else:
                member_mode = 0o0644
            yield _ctrl_member(
                f"./{ctrl_member.name}",
                path_type=PathType.FILE,
                fs_path=ctrl_member.path,
                mode=member_mode,
                mtime=min(mtime, int(st.st_mtime)),
            )

489 

490 

def parse_manifest(manifest_path: str | None) -> list["TarMember"]:
    """Load the data tarball members from the intermediate manifest.

    A missing manifest path is reported via ``_error``; the manifest is
    currently mandatory.
    """
    if manifest_path is None:
        _error("--intermediate-package-manifest is mandatory for now")
    return TarMember.parse_intermediate_manifest(manifest_path)

495 

496 

def main() -> None:
    """Command line entry point: parse the arguments and build the deb.

    The data tarball uses the compression selected on the command line.  The
    control tarball uses the same compression unless uniform compression was
    disabled, in which case it uses gzip with default settings.  When the
    output path ends with ``/`` or is an existing directory, the file name
    is computed from the package's ``DEBIAN`` directory.
    """
    setup_logging()
    parsed_args = parse_args()
    root_dir: str = parsed_args.package_root_dir
    output_path: str = parsed_args.package_output_path
    mtime = resolve_source_date_epoch(parsed_args.source_date_epoch)

    data_compression: Compression = COMPRESSIONS[parsed_args.compression_algorithm]
    data_compression_cmd = data_compression.as_cmdline(parsed_args)
    if parsed_args.uniform_compression:
        ctrl_compression = data_compression
        ctrl_compression_cmd = data_compression_cmd
    else:
        ctrl_compression = COMPRESSIONS["gzip"]
        ctrl_compression_cmd = COMPRESSIONS["gzip"].as_cmdline(None)

    if output_path.endswith("/") or os.path.isdir(output_path):
        deb_file = os.path.join(
            output_path,
            compute_output_filename(os.path.join(root_dir, "DEBIAN"), False),
        )
    else:
        deb_file = output_path

    pack(
        deb_file,
        ctrl_compression,
        data_compression,
        root_dir,
        parsed_args.package_manifest,
        mtime,
        ctrl_compression_cmd,
        data_compression_cmd,
        # --debug promises raw stack traces on errors, so raw exceptions are
        # preferred exactly when debug mode is enabled.
        prefer_raw_exceptions=parsed_args.debug_mode,
    )

532 

533 

def pack(
    deb_file: str,
    ctrl_compression: Compression,
    data_compression: Compression,
    root_dir: str,
    package_manifest: str | None,
    mtime: int,
    ctrl_compression_cmd: list[str],
    data_compression_cmd: list[str],
    prefer_raw_exceptions: bool = False,
) -> None:
    """Assemble *deb_file* from the package root and the manifest.

    The archive consists of three members, in this order: ``debian-binary``
    (fixed content ``2.0``), the control tarball built from
    ``<root_dir>/DEBIAN`` and the data tarball built from the members listed
    in *package_manifest*.  The tarball member names carry the extension of
    their respective compression.
    """
    data_tar_members = parse_manifest(package_manifest)

    format_marker = ArMember("debian-binary", mtime, fixed_binary=b"2.0\n")
    control_tar = ArMember(
        ctrl_compression.with_extension("control.tar"),
        mtime,
        write_to_impl=generate_tar_file_member(
            _ctrl_tar_members(root_dir, mtime),
            ctrl_compression_cmd,
        ),
    )
    data_tar = ArMember(
        data_compression.with_extension("data.tar"),
        mtime,
        write_to_impl=generate_tar_file_member(
            data_tar_members,
            data_compression_cmd,
        ),
    )
    generate_ar_archive(
        deb_file,
        mtime,
        [format_marker, control_tar, data_tar],
        prefer_raw_exceptions,
    )

566 

567 

# Run the packer only when executed as a script, not when imported.
if __name__ == "__main__":
    main()