Coverage for src/debputy/commands/deb_packer.py: 56%
199 statements
« prev ^ index » next coverage.py v7.8.2, created at 2026-04-14 21:38 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2026-04-14 21:38 +0000
1#!/usr/bin/python3 -B
2import argparse
3import errno
4import operator
5import os
6import stat
7import subprocess
8import tarfile
9import textwrap
10from typing import BinaryIO, cast
11from collections.abc import Iterable, Callable
13from debputy.intermediate_manifest import TarMember, PathType
14from debputy.util import (
15 _error,
16 compute_output_filename,
17 resolve_source_date_epoch,
18 ColorizedArgumentParser,
19 setup_logging,
20 program_name,
21 assume_not_none,
22)
23from debputy.version import version
25# AR header / start of a deb file for reference
26# 00000000 21 3c 61 72 63 68 3e 0a 64 65 62 69 61 6e 2d 62 |!<arch>.debian-b|
27# 00000010 69 6e 61 72 79 20 20 20 31 36 36 38 39 37 33 36 |inary 16689736|
28# 00000020 39 35 20 20 30 20 20 20 20 20 30 20 20 20 20 20 |95 0 0 |
29# 00000030 31 30 30 36 34 34 20 20 34 20 20 20 20 20 20 20 |100644 4 |
30# 00000040 20 20 60 0a 32 2e 30 0a 63 6f 6e 74 72 6f 6c 2e | `.2.0.control.|
31# 00000050 74 61 72 2e 78 7a 20 20 31 36 36 38 39 37 33 36 |tar.xz 16689736|
32# 00000060 39 35 20 20 30 20 20 20 20 20 30 20 20 20 20 20 |95 0 0 |
33# 00000070 31 30 30 36 34 34 20 20 39 33 36 38 20 20 20 20 |100644 9368 |
34# 00000080 20 20 60 0a fd 37 7a 58 5a 00 00 04 e6 d6 b4 46 | `..7zXZ......F|
37class ArMember:
38 def __init__(
39 self,
40 name: str,
41 mtime: int,
42 fixed_binary: bytes | None = None,
43 write_to_impl: Callable[[BinaryIO], None] | None = None,
44 ) -> None:
45 self.name = name
46 self._mtime = mtime
47 self._write_to_impl = write_to_impl
48 self.fixed_binary = fixed_binary
50 @property
51 def is_fixed_binary(self) -> bool:
52 return self.fixed_binary is not None
54 @property
55 def mtime(self) -> int:
56 return self._mtime
58 def write_to(self, fd: BinaryIO) -> None:
59 writer = self._write_to_impl
60 assert writer is not None
61 writer(fd)
64AR_HEADER_LEN = 60
65AR_HEADER = b" " * AR_HEADER_LEN
68def write_header(
69 fd: BinaryIO,
70 member: ArMember,
71 member_len: int,
72 mtime: int,
73) -> None:
74 header = b"%-16s%-12d0 0 100644 %-10d\x60\n" % (
75 member.name.encode("ascii"),
76 mtime,
77 member_len,
78 )
79 fd.write(header)
82def generate_ar_archive(
83 output_filename: str,
84 mtime: int,
85 members: Iterable[ArMember],
86 prefer_raw_exceptions: bool,
87) -> None:
88 try:
89 with open(output_filename, "wb", buffering=0) as fd:
90 fd.write(b"!<arch>\n")
91 for member in members:
92 if member.is_fixed_binary:
93 fixed_binary = assume_not_none(member.fixed_binary)
94 write_header(fd, member, len(fixed_binary), mtime)
95 fd.write(fixed_binary)
96 else:
97 header_pos = fd.tell()
98 fd.write(AR_HEADER)
99 member.write_to(fd)
100 current_pos = fd.tell()
101 fd.seek(header_pos, os.SEEK_SET)
102 content_len = current_pos - header_pos - AR_HEADER_LEN
103 assert content_len >= 0
104 write_header(fd, member, content_len, mtime)
105 fd.seek(current_pos, os.SEEK_SET)
106 except OSError as e:
107 if prefer_raw_exceptions:
108 raise
109 if e.errno == errno.ENOSPC:
110 _error(
111 f"Unable to write {output_filename}. The file system device reported disk full: {str(e)}"
112 )
113 elif e.errno == errno.EIO:
114 _error(
115 f"Unable to write {output_filename}. The file system reported a generic I/O error: {str(e)}"
116 )
117 elif e.errno == errno.EROFS:
118 _error(
119 f"Unable to write {output_filename}. The file system is read-only: {str(e)}"
120 )
121 raise
122 print(f"Generated {output_filename}")
125def _generate_tar_file(
126 tar_members: Iterable[TarMember],
127 compression_cmd: list[str],
128 write_to: BinaryIO,
129) -> None:
130 with (
131 subprocess.Popen(
132 compression_cmd, stdin=subprocess.PIPE, stdout=write_to
133 ) as compress_proc,
134 tarfile.open(
135 mode="w|",
136 fileobj=compress_proc.stdin,
137 format=tarfile.GNU_FORMAT,
138 errorlevel=1,
139 ) as tar_fd,
140 ):
141 for tar_member in tar_members:
142 tar_info: tarfile.TarInfo = tar_member.create_tar_info(tar_fd)
143 if tar_member.path_type == PathType.FILE:
144 with open(assume_not_none(tar_member.fs_path), "rb") as mfd:
145 tar_fd.addfile(tar_info, fileobj=mfd)
146 else:
147 tar_fd.addfile(tar_info)
148 compress_proc.wait()
149 if compress_proc.returncode != 0: 149 ↛ 150line 149 didn't jump to line 150 because the condition on line 149 was never true
150 _error(
151 f"Compression command {compression_cmd} failed with code {compress_proc.returncode}"
152 )
155def generate_tar_file_member(
156 tar_members: Iterable[TarMember],
157 compression_cmd: list[str],
158) -> Callable[[BinaryIO], None]:
159 def _impl(fd: BinaryIO) -> None:
160 _generate_tar_file(
161 tar_members,
162 compression_cmd,
163 fd,
164 )
166 return _impl
169def _xz_cmdline(
170 compression_rule: "Compression",
171 parsed_args: argparse.Namespace | None,
172) -> list[str]:
173 compression_level = compression_rule.effective_compression_level(parsed_args)
174 threads_max = parsed_args.threads_max
175 cmdline = [
176 "xz",
177 f"-T+{threads_max}",
178 "-" + str(compression_level),
179 # Do not generate warnings when adjusting memory usage, nor
180 # exit with non-zero due to those not emitted warnings.
181 "--quiet",
182 "--no-warn",
183 # Do not let xz fallback to single-threaded mode, to avoid
184 # non-reproducible output.
185 "--no-adjust",
186 ]
187 strategy = None if parsed_args is None else parsed_args.compression_strategy
188 if strategy is None: 188 ↛ 190line 188 didn't jump to line 190 because the condition on line 188 was always true
189 strategy = "none"
190 if strategy != "none": 190 ↛ 191line 190 didn't jump to line 191 because the condition on line 190 was never true
191 cmdline.append("--" + strategy)
192 cmdline.append("--no-adjust")
193 return cmdline
196def _gzip_cmdline(
197 compression_rule: "Compression",
198 parsed_args: argparse.Namespace | None,
199) -> list[str]:
200 compression_level = compression_rule.effective_compression_level(parsed_args)
201 cmdline = ["gzip", "-n" + str(compression_level)]
202 strategy = None if parsed_args is None else parsed_args.compression_strategy
203 if strategy is not None and strategy != "none":
204 raise ValueError(
205 f"Not implemented: Compression strategy {strategy}"
206 " for gzip is currently unsupported (but dpkg-deb does)"
207 )
208 return cmdline
211def _uncompressed_cmdline(
212 _unused_a: "Compression",
213 _unused_b: argparse.Namespace | None,
214) -> list[str]:
215 return ["cat"]
218class Compression:
219 def __init__(
220 self,
221 default_compression_level: int,
222 extension: str,
223 allowed_strategies: frozenset[str],
224 cmdline_builder: Callable[
225 ["Compression", argparse.Namespace | None], list[str]
226 ],
227 ) -> None:
228 self.default_compression_level = default_compression_level
229 self.extension = extension
230 self.allowed_strategies = allowed_strategies
231 self.cmdline_builder = cmdline_builder
233 def __repr__(self) -> str:
234 return f"<{self.__class__.__name__} {self.extension}>"
236 def effective_compression_level(
237 self, parsed_args: argparse.Namespace | None
238 ) -> int:
239 if parsed_args and parsed_args.compression_level is not None: 239 ↛ 240line 239 didn't jump to line 240 because the condition on line 239 was never true
240 return cast("int", parsed_args.compression_level)
241 return self.default_compression_level
243 def as_cmdline(self, parsed_args: argparse.Namespace | None) -> list[str]:
244 return self.cmdline_builder(self, parsed_args)
246 def with_extension(self, filename: str) -> str:
247 return filename + self.extension
250COMPRESSIONS = {
251 "xz": Compression(6, ".xz", frozenset({"none", "extreme"}), _xz_cmdline),
252 "gzip": Compression(
253 9,
254 ".gz",
255 frozenset({"none", "filtered", "huffman", "rle", "fixed"}),
256 _gzip_cmdline,
257 ),
258 "none": Compression(0, "", frozenset({"none"}), _uncompressed_cmdline),
259}
262def _normalize_compression_args(parsed_args: argparse.Namespace) -> argparse.Namespace:
263 if (
264 parsed_args.compression_level == 0
265 and parsed_args.compression_algorithm == "gzip"
266 ):
267 print(
268 "Note: Mapping compression algorithm to none for compatibility with dpkg-deb (due to -Zgzip -z0)"
269 )
270 setattr(parsed_args, "compression_algorithm", "none")
272 compression = COMPRESSIONS[parsed_args.compression_algorithm]
273 strategy = parsed_args.compression_strategy
274 if strategy is not None and strategy not in compression.allowed_strategies:
275 _error(
276 f'Compression algorithm "{parsed_args.compression_algorithm}" does not support compression strategy'
277 f' "{strategy}". Allowed values: {", ".join(sorted(compression.allowed_strategies))}'
278 )
279 return parsed_args
282def parse_args() -> argparse.Namespace:
283 try:
284 compression_level_default = int(os.environ["DPKG_DEB_COMPRESSOR_LEVEL"])
285 except (KeyError, ValueError):
286 compression_level_default = None
288 try:
289 compression_type = os.environ["DPKG_DEB_COMPRESSOR_TYPE"]
290 except (KeyError, ValueError):
291 compression_type = "xz"
293 try:
294 threads_max = int(os.environ["DPKG_DEB_THREADS_MAX"])
295 except (KeyError, ValueError):
296 threads_max = 0
298 description = textwrap.dedent("""\
299 THIS IS A PROTOTYPE "dpkg-deb -b" emulator with basic manifest support
301 DO NOT USE THIS TOOL DIRECTLY. It has not stability guarantees and will be removed as
302 soon as "dpkg-deb -b" grows support for the relevant features.
304 This tool is a prototype "dpkg-deb -b"-like interface for compiling a Debian package
305 without requiring root even for static ownership. It is a temporary stand-in for
306 "dpkg-deb -b" until "dpkg-deb -b" will get support for a manifest.
308 The tool operates on an internal JSON based manifest for now, because it was faster
309 than building an mtree parser (which is the format that dpkg will likely end up
310 using).
312 As the tool is not meant to be used directly, it is full of annoying paper cuts that
313 I refuse to fix or maintain. Use the high level tool instead.
315 """)
317 parser = ColorizedArgumentParser(
318 description=description,
319 formatter_class=argparse.RawDescriptionHelpFormatter,
320 allow_abbrev=False,
321 prog=program_name(),
322 )
323 parser.add_argument("--version", action="version", version=version())
324 parser.add_argument(
325 "package_root_dir",
326 metavar="PACKAGE_ROOT_DIR",
327 help="Root directory of the package. Must contain a DEBIAN directory",
328 )
329 parser.add_argument(
330 "package_output_path",
331 metavar="PATH",
332 help="Path where the package should be placed. If it is directory,"
333 " the base name will be determined from the package metadata",
334 )
336 parser.add_argument(
337 "--intermediate-package-manifest",
338 dest="package_manifest",
339 metavar="JSON_FILE",
340 action="store",
341 default=None,
342 help="INTERMEDIATE package manifest (JSON!)",
343 )
344 parser.add_argument(
345 "--root-owner-group",
346 dest="root_owner_group",
347 action="store_true",
348 help="Ignored. Accepted for compatibility with dpkg-deb -b",
349 )
350 parser.add_argument(
351 "-b",
352 "--build",
353 dest="build_param",
354 action="store_true",
355 help="Ignored. Accepted for compatibility with dpkg-deb",
356 )
357 parser.add_argument(
358 "--source-date-epoch",
359 dest="source_date_epoch",
360 action="store",
361 type=int,
362 default=None,
363 help="Source date epoch (can also be given via the SOURCE_DATE_EPOCH environ variable",
364 )
365 parser.add_argument(
366 "-Z",
367 dest="compression_algorithm",
368 choices=COMPRESSIONS,
369 default=compression_type,
370 help="The compression algorithm to be used",
371 )
372 parser.add_argument(
373 "-z",
374 dest="compression_level",
375 metavar="{0-9}",
376 choices=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
377 default=compression_level_default,
378 type=int,
379 help="The compression level to be used",
380 )
381 parser.add_argument(
382 "-S",
383 dest="compression_strategy",
384 # We have a different default for xz when strategy is unset and we are building a udeb
385 action="store",
386 default=None,
387 help="The compression algorithm to be used. Concrete values depend on the compression"
388 ' algorithm, but the value "none" is always allowed',
389 )
390 parser.add_argument(
391 "--uniform-compression",
392 dest="uniform_compression",
393 action="store_true",
394 default=True,
395 help="Whether to use the same compression for the control.tar and the data.tar."
396 " The default is to use uniform compression.",
397 )
398 parser.add_argument(
399 "--no-uniform-compression",
400 dest="uniform_compression",
401 action="store_false",
402 default=True,
403 help="Disable uniform compression (see --uniform-compression)",
404 )
405 parser.add_argument(
406 "--threads-max",
407 dest="threads_max",
408 default=threads_max,
409 type=int,
410 help="Sets the maximum number of threads allowed for compressors that support multi-threaded operations",
411 )
412 parser.add_argument(
413 "-d",
414 "--debug",
415 dest="debug_mode",
416 action="store_true",
417 default=False,
418 help="Enable debug logging and raw stack traces on errors",
419 )
421 parsed_args = parser.parse_args()
422 parsed_args = _normalize_compression_args(parsed_args)
424 return parsed_args
427def _ctrl_member(
428 member_path: str,
429 fs_path: str | None = None,
430 path_type: PathType = PathType.FILE,
431 mode: int = 0o644,
432 mtime: int = 0,
433) -> TarMember:
434 if fs_path is None: 434 ↛ 435line 434 didn't jump to line 435 because the condition on line 434 was never true
435 assert member_path.startswith("./")
436 fs_path = "DEBIAN" + member_path[1:]
437 return TarMember(
438 member_path=member_path,
439 path_type=path_type,
440 fs_path=fs_path,
441 mode=mode,
442 owner="root",
443 uid=0,
444 group="root",
445 gid=0,
446 mtime=mtime,
447 )
450CTRL_MEMBER_SCRIPTS = {
451 "postinst",
452 "preinst",
453 "postrm",
454 "prerm",
455 "config",
456 "isinstallable",
457}
460def _ctrl_tar_members(package_root_dir: str, mtime: int) -> Iterable[TarMember]:
461 debian_root = os.path.join(package_root_dir, "DEBIAN")
462 dir_st = os.stat(debian_root)
463 dir_mtime = int(dir_st.st_mtime)
464 yield _ctrl_member(
465 "./",
466 debian_root,
467 path_type=PathType.DIRECTORY,
468 mode=0o0755,
469 mtime=min(mtime, dir_mtime),
470 )
471 with os.scandir(debian_root) as dir_iter:
472 for ctrl_member in sorted(dir_iter, key=operator.attrgetter("name")):
473 st = os.stat(ctrl_member)
474 if not stat.S_ISREG(st.st_mode): 474 ↛ 475line 474 didn't jump to line 475 because the condition on line 474 was never true
475 _error(
476 f"{ctrl_member.path} is not a file and all control.tar members ought to be files!"
477 )
478 file_mtime = int(st.st_mtime)
479 yield _ctrl_member(
480 f"./{ctrl_member.name}",
481 path_type=PathType.FILE,
482 fs_path=ctrl_member.path,
483 mode=0o0755 if ctrl_member.name in CTRL_MEMBER_SCRIPTS else 0o0644,
484 mtime=min(mtime, file_mtime),
485 )
488def parse_manifest(manifest_path: str | None) -> list["TarMember"]:
489 if manifest_path is None: 489 ↛ 490line 489 didn't jump to line 490 because the condition on line 489 was never true
490 _error(f"--intermediate-package-manifest is mandatory for now")
491 return TarMember.parse_intermediate_manifest(manifest_path)
494def main() -> None:
495 setup_logging()
496 parsed_args = parse_args()
497 root_dir: str = parsed_args.package_root_dir
498 output_path: str = parsed_args.package_output_path
499 mtime = resolve_source_date_epoch(parsed_args.source_date_epoch)
501 data_compression: Compression = COMPRESSIONS[parsed_args.compression_algorithm]
502 data_compression_cmd = data_compression.as_cmdline(parsed_args)
503 if parsed_args.uniform_compression:
504 ctrl_compression = data_compression
505 ctrl_compression_cmd = data_compression_cmd
506 else:
507 ctrl_compression = COMPRESSIONS["gzip"]
508 ctrl_compression_cmd = COMPRESSIONS["gzip"].as_cmdline(None)
510 if output_path.endswith("/") or os.path.isdir(output_path):
511 deb_file = os.path.join(
512 output_path,
513 compute_output_filename(os.path.join(root_dir, "DEBIAN"), False),
514 )
515 else:
516 deb_file = output_path
518 pack(
519 deb_file,
520 ctrl_compression,
521 data_compression,
522 root_dir,
523 parsed_args.package_manifest,
524 mtime,
525 ctrl_compression_cmd,
526 data_compression_cmd,
527 prefer_raw_exceptions=not parsed_args.debug_mode,
528 )
531def pack(
532 deb_file: str,
533 ctrl_compression: Compression,
534 data_compression: Compression,
535 root_dir: str,
536 package_manifest: str | None,
537 mtime: int,
538 ctrl_compression_cmd: list[str],
539 data_compression_cmd: list[str],
540 prefer_raw_exceptions: bool = False,
541) -> None:
542 data_tar_members = parse_manifest(package_manifest)
543 members = [
544 ArMember("debian-binary", mtime, fixed_binary=b"2.0\n"),
545 ArMember(
546 ctrl_compression.with_extension("control.tar"),
547 mtime,
548 write_to_impl=generate_tar_file_member(
549 _ctrl_tar_members(root_dir, mtime),
550 ctrl_compression_cmd,
551 ),
552 ),
553 ArMember(
554 data_compression.with_extension("data.tar"),
555 mtime,
556 write_to_impl=generate_tar_file_member(
557 data_tar_members,
558 data_compression_cmd,
559 ),
560 ),
561 ]
562 generate_ar_archive(deb_file, mtime, members, prefer_raw_exceptions)
565if __name__ == "__main__":
566 main()