Coverage for src/debputy/commands/deb_packer.py: 56%
199 statements
« prev ^ index » next coverage.py v7.8.2, created at 2026-01-16 17:20 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2026-01-16 17:20 +0000
1#!/usr/bin/python3 -B
2import argparse
3import errno
4import operator
5import os
6import stat
7import subprocess
8import tarfile
9import textwrap
10from typing import BinaryIO, cast
11from collections.abc import Iterable, Callable
13from debputy.intermediate_manifest import TarMember, PathType
14from debputy.util import (
15 _error,
16 compute_output_filename,
17 resolve_source_date_epoch,
18 ColorizedArgumentParser,
19 setup_logging,
20 program_name,
21 assume_not_none,
22)
23from debputy.version import version
# AR header / start of a deb file for reference
# 00000000  21 3c 61 72 63 68 3e 0a  64 65 62 69 61 6e 2d 62  |!<arch>.debian-b|
# 00000010  69 6e 61 72 79 20 20 20  31 36 36 38 39 37 33 36  |inary   16689736|
# 00000020  39 35 20 20 30 20 20 20  20 20 30 20 20 20 20 20  |95  0     0     |
# 00000030  31 30 30 36 34 34 20 20  34 20 20 20 20 20 20 20  |100644  4       |
# 00000040  20 20 60 0a 32 2e 30 0a  63 6f 6e 74 72 6f 6c 2e  |  `.2.0.control.|
# 00000050  74 61 72 2e 78 7a 20 20  31 36 36 38 39 37 33 36  |tar.xz  16689736|
# 00000060  39 35 20 20 30 20 20 20  20 20 30 20 20 20 20 20  |95  0     0     |
# 00000070  31 30 30 36 34 34 20 20  39 33 36 38 20 20 20 20  |100644  9368    |
# 00000080  20 20 60 0a fd 37 7a 58  5a 00 00 04 e6 d6 b4 46  |  `..7zXZ......F|
38class ArMember:
39 def __init__(
40 self,
41 name: str,
42 mtime: int,
43 fixed_binary: bytes | None = None,
44 write_to_impl: Callable[[BinaryIO], None] | None = None,
45 ) -> None:
46 self.name = name
47 self._mtime = mtime
48 self._write_to_impl = write_to_impl
49 self.fixed_binary = fixed_binary
51 @property
52 def is_fixed_binary(self) -> bool:
53 return self.fixed_binary is not None
55 @property
56 def mtime(self) -> int:
57 return self._mtime
59 def write_to(self, fd: BinaryIO) -> None:
60 writer = self._write_to_impl
61 assert writer is not None
62 writer(fd)
# Size in bytes of one ar member header.
AR_HEADER_LEN = 60
# All-blank header used as a placeholder until the real one can be written.
AR_HEADER = AR_HEADER_LEN * b" "
def write_header(
    fd: BinaryIO,
    member: "ArMember",
    member_len: int,
    mtime: int,
) -> None:
    """Write the ar member header for *member* to *fd*.

    The header is always exactly 60 bytes, made of space padded, left aligned
    fields: name (16), mtime (12), uid (6), gid (6), mode (8), size (10) and
    the two byte terminator (a backtick followed by a newline).  The uid and
    gid are always 0 and the mode is always 100644.

    :param fd: Binary file to write the header to (at its current position).
    :param member: The member being described; only its ``name`` is used.
    :param member_len: Size of the member content in bytes.
    :param mtime: Timestamp to record for the member.
    :raises ValueError: If a field does not fit its column, since that would
      produce a header of the wrong size and corrupt the archive.
    """
    # Sum of the field widths listed above plus the terminator.
    header_len = 16 + 12 + 6 + 6 + 8 + 10 + 2
    header = b"%-16s%-12d%-6d%-6d%-8s%-10d\x60\n" % (
        member.name.encode("ascii"),
        mtime,
        0,
        0,
        b"100644",
        member_len,
    )
    if len(header) != header_len:
        raise ValueError(
            f"Cannot encode ar header for member {member.name!r}: a field exceeds its fixed width"
        )
    fd.write(header)
def generate_ar_archive(
    output_filename: str,
    mtime: int,
    members: Iterable[ArMember],
    prefer_raw_exceptions: bool,
) -> None:
    """Write an ar archive containing *members* to *output_filename*.

    Members with fixed content are written directly.  For other members the
    size is not known up front, so a blank placeholder header is written, the
    member writes its content, and the real header is then written over the
    placeholder.

    :param output_filename: Path of the archive to create (overwritten).
    :param mtime: Timestamp recorded in every member header.
    :param members: The members, in archive order.
    :param prefer_raw_exceptions: If true, any ``OSError`` is re-raised as-is.
      Otherwise disk full, generic I/O and read-only file system errors are
      reported via ``_error``; all other errors are still re-raised.
    """
    try:
        # NOTE(review): unbuffered, presumably so that tell() reflects data
        # written to the descriptor by something other than this file object
        # (see ArMember.write_to) - confirm.
        with open(output_filename, "wb", buffering=0) as fd:
            fd.write(b"!<arch>\n")
            for member in members:
                if member.is_fixed_binary:
                    fixed_binary = assume_not_none(member.fixed_binary)
                    content_len = len(fixed_binary)
                    write_header(fd, member, content_len, mtime)
                    fd.write(fixed_binary)
                else:
                    header_pos = fd.tell()
                    fd.write(AR_HEADER)
                    member.write_to(fd)
                    current_pos = fd.tell()
                    content_len = current_pos - header_pos - AR_HEADER_LEN
                    assert content_len >= 0
                    # Go back and replace the placeholder with the real header.
                    fd.seek(header_pos, os.SEEK_SET)
                    write_header(fd, member, content_len, mtime)
                    fd.seek(current_pos, os.SEEK_SET)
                if content_len % 2:
                    # ar members start on even offsets; odd sized content is
                    # followed by a newline that is not counted in its size.
                    fd.write(b"\n")
    except OSError as e:
        if prefer_raw_exceptions:
            raise
        if e.errno == errno.ENOSPC:
            _error(
                f"Unable to write {output_filename}. The file system device reported disk full: {e}"
            )
        elif e.errno == errno.EIO:
            _error(
                f"Unable to write {output_filename}. The file system reported a generic I/O error: {e}"
            )
        elif e.errno == errno.EROFS:
            _error(
                f"Unable to write {output_filename}. The file system is read-only: {e}"
            )
        raise
    print(f"Generated {output_filename}")
def _generate_tar_file(
    tar_members: Iterable[TarMember],
    compression_cmd: list[str],
    write_to: BinaryIO,
) -> None:
    """Stream *tar_members* as a GNU tar archive through a compressor.

    The tar stream is fed to the standard input of *compression_cmd*, whose
    standard output is *write_to*.  A non-zero exit code from the command is
    reported via ``_error``.
    """
    with subprocess.Popen(
        compression_cmd, stdin=subprocess.PIPE, stdout=write_to
    ) as compressor:
        with tarfile.open(
            mode="w|",
            fileobj=compressor.stdin,
            format=tarfile.GNU_FORMAT,
            errorlevel=1,
        ) as archive:
            for entry in tar_members:
                info: tarfile.TarInfo = entry.create_tar_info(archive)
                if entry.path_type == PathType.FILE:
                    # Only regular files carry content.
                    with open(assume_not_none(entry.fs_path), "rb") as content:
                        archive.addfile(info, fileobj=content)
                else:
                    archive.addfile(info)
    compressor.wait()
    exit_code = compressor.returncode
    if exit_code != 0:
        _error(
            f"Compression command {compression_cmd} failed with code {exit_code}"
        )
def generate_tar_file_member(
    tar_members: Iterable[TarMember],
    compression_cmd: list[str],
) -> Callable[[BinaryIO], None]:
    """Return a callback that writes the compressed tar of *tar_members*.

    Nothing is generated until the returned callback is invoked with the
    binary file that should receive the output of *compression_cmd*.
    """

    def _write_compressed_tar(output: BinaryIO) -> None:
        _generate_tar_file(tar_members, compression_cmd, output)

    return _write_compressed_tar
170def _xz_cmdline(
171 compression_rule: "Compression",
172 parsed_args: argparse.Namespace | None,
173) -> list[str]:
174 compression_level = compression_rule.effective_compression_level(parsed_args)
175 threads_max = parsed_args.threads_max
176 cmdline = [
177 "xz",
178 f"-T+{threads_max}",
179 "-" + str(compression_level),
180 # Do not generate warnings when adjusting memory usage, nor
181 # exit with non-zero due to those not emitted warnings.
182 "--quiet",
183 "--no-warn",
184 # Do not let xz fallback to single-threaded mode, to avoid
185 # non-reproducible output.
186 "--no-adjust",
187 ]
188 strategy = None if parsed_args is None else parsed_args.compression_strategy
189 if strategy is None: 189 ↛ 191line 189 didn't jump to line 191 because the condition on line 189 was always true
190 strategy = "none"
191 if strategy != "none": 191 ↛ 192line 191 didn't jump to line 192 because the condition on line 191 was never true
192 cmdline.append("--" + strategy)
193 cmdline.append("--no-adjust")
194 return cmdline
197def _gzip_cmdline(
198 compression_rule: "Compression",
199 parsed_args: argparse.Namespace | None,
200) -> list[str]:
201 compression_level = compression_rule.effective_compression_level(parsed_args)
202 cmdline = ["gzip", "-n" + str(compression_level)]
203 strategy = None if parsed_args is None else parsed_args.compression_strategy
204 if strategy is not None and strategy != "none":
205 raise ValueError(
206 f"Not implemented: Compression strategy {strategy}"
207 " for gzip is currently unsupported (but dpkg-deb does)"
208 )
209 return cmdline
212def _uncompressed_cmdline(
213 _unused_a: "Compression",
214 _unused_b: argparse.Namespace | None,
215) -> list[str]:
216 return ["cat"]
219class Compression:
220 def __init__(
221 self,
222 default_compression_level: int,
223 extension: str,
224 allowed_strategies: frozenset[str],
225 cmdline_builder: Callable[
226 ["Compression", argparse.Namespace | None], list[str]
227 ],
228 ) -> None:
229 self.default_compression_level = default_compression_level
230 self.extension = extension
231 self.allowed_strategies = allowed_strategies
232 self.cmdline_builder = cmdline_builder
234 def __repr__(self) -> str:
235 return f"<{self.__class__.__name__} {self.extension}>"
237 def effective_compression_level(
238 self, parsed_args: argparse.Namespace | None
239 ) -> int:
240 if parsed_args and parsed_args.compression_level is not None: 240 ↛ 241line 240 didn't jump to line 241 because the condition on line 240 was never true
241 return cast("int", parsed_args.compression_level)
242 return self.default_compression_level
244 def as_cmdline(self, parsed_args: argparse.Namespace | None) -> list[str]:
245 return self.cmdline_builder(self, parsed_args)
247 def with_extension(self, filename: str) -> str:
248 return filename + self.extension
# Supported compression methods, keyed by the name accepted by the -Z option.
COMPRESSIONS = {
    "xz": Compression(
        default_compression_level=6,
        extension=".xz",
        allowed_strategies=frozenset({"none", "extreme"}),
        cmdline_builder=_xz_cmdline,
    ),
    "gzip": Compression(
        default_compression_level=9,
        extension=".gz",
        allowed_strategies=frozenset(
            {"none", "filtered", "huffman", "rle", "fixed"}
        ),
        cmdline_builder=_gzip_cmdline,
    ),
    "none": Compression(
        default_compression_level=0,
        extension="",
        allowed_strategies=frozenset({"none"}),
        cmdline_builder=_uncompressed_cmdline,
    ),
}
def _normalize_compression_args(parsed_args: argparse.Namespace) -> argparse.Namespace:
    """Normalize and validate the compression related arguments.

    ``-Zgzip -z0`` is mapped to the "none" algorithm.  The algorithm must be a
    key of ``COMPRESSIONS`` and the strategy, if given, must be one the
    algorithm allows; violations are reported via ``_error``.

    :param parsed_args: The parsed arguments; modified in place.
    :return: The same namespace object.
    """
    if (
        parsed_args.compression_level == 0
        and parsed_args.compression_algorithm == "gzip"
    ):
        print(
            "Note: Mapping compression algorithm to none for compatibility with dpkg-deb (due to -Zgzip -z0)"
        )
        parsed_args.compression_algorithm = "none"

    algorithm = parsed_args.compression_algorithm
    # argparse only applies "choices" to values given on the command line, so
    # a default taken from the environment can be an unsupported name here.
    compression = COMPRESSIONS.get(algorithm)
    if compression is None:
        # NOTE(review): assumes _error does not return - confirm.
        _error(
            f'Unsupported compression algorithm "{algorithm}".'
            f' Allowed values: {", ".join(COMPRESSIONS)}'
        )
    strategy = parsed_args.compression_strategy
    if strategy is not None and strategy not in compression.allowed_strategies:
        _error(
            f'Compression algorithm "{algorithm}" does not support compression strategy'
            f' "{strategy}". Allowed values: {", ".join(sorted(compression.allowed_strategies))}'
        )
    return parsed_args
def parse_args() -> argparse.Namespace:
    """Parse the command line arguments of the deb packer.

    The defaults for ``-z``, ``-Z`` and ``--threads-max`` are taken from the
    environment variables ``DPKG_DEB_COMPRESSOR_LEVEL``,
    ``DPKG_DEB_COMPRESSOR_TYPE`` and ``DPKG_DEB_THREADS_MAX``.  Unset (or, for
    the numeric ones, non-integer) variables fall back to no explicit level,
    "xz" and 0 respectively.

    :return: The parsed arguments after ``_normalize_compression_args`` has
      been applied to them.
    """
    try:
        compression_level_default = int(os.environ["DPKG_DEB_COMPRESSOR_LEVEL"])
    except (KeyError, ValueError):
        compression_level_default = None

    compression_type = os.environ.get("DPKG_DEB_COMPRESSOR_TYPE", "xz")

    try:
        threads_max = int(os.environ["DPKG_DEB_THREADS_MAX"])
    except (KeyError, ValueError):
        threads_max = 0

    description = textwrap.dedent(
        """\
        THIS IS A PROTOTYPE "dpkg-deb -b" emulator with basic manifest support

        DO NOT USE THIS TOOL DIRECTLY. It has no stability guarantees and will be removed as
        soon as "dpkg-deb -b" grows support for the relevant features.

        This tool is a prototype "dpkg-deb -b"-like interface for compiling a Debian package
        without requiring root even for static ownership. It is a temporary stand-in for
        "dpkg-deb -b" until "dpkg-deb -b" will get support for a manifest.

        The tool operates on an internal JSON based manifest for now, because it was faster
        than building an mtree parser (which is the format that dpkg will likely end up
        using).

        As the tool is not meant to be used directly, it is full of annoying paper cuts that
        I refuse to fix or maintain. Use the high level tool instead.

        """
    )

    parser = ColorizedArgumentParser(
        description=description,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        allow_abbrev=False,
        prog=program_name(),
    )
    parser.add_argument("--version", action="version", version=version())
    parser.add_argument(
        "package_root_dir",
        metavar="PACKAGE_ROOT_DIR",
        help="Root directory of the package. Must contain a DEBIAN directory",
    )
    parser.add_argument(
        "package_output_path",
        metavar="PATH",
        help="Path where the package should be placed. If it is a directory,"
        " the base name will be determined from the package metadata",
    )

    parser.add_argument(
        "--intermediate-package-manifest",
        dest="package_manifest",
        metavar="JSON_FILE",
        action="store",
        default=None,
        help="INTERMEDIATE package manifest (JSON!)",
    )
    parser.add_argument(
        "--root-owner-group",
        dest="root_owner_group",
        action="store_true",
        help="Ignored. Accepted for compatibility with dpkg-deb -b",
    )
    parser.add_argument(
        "-b",
        "--build",
        dest="build_param",
        action="store_true",
        help="Ignored. Accepted for compatibility with dpkg-deb",
    )
    parser.add_argument(
        "--source-date-epoch",
        dest="source_date_epoch",
        action="store",
        type=int,
        default=None,
        help="Source date epoch (can also be given via the SOURCE_DATE_EPOCH environment variable)",
    )
    parser.add_argument(
        "-Z",
        dest="compression_algorithm",
        choices=COMPRESSIONS,
        default=compression_type,
        help="The compression algorithm to be used",
    )
    parser.add_argument(
        "-z",
        dest="compression_level",
        metavar="{0-9}",
        choices=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
        default=compression_level_default,
        type=int,
        help="The compression level to be used",
    )
    parser.add_argument(
        "-S",
        dest="compression_strategy",
        # We have a different default for xz when strategy is unset and we are building a udeb
        action="store",
        default=None,
        help="The compression strategy to be used. Concrete values depend on the compression"
        ' algorithm, but the value "none" is always allowed',
    )
    parser.add_argument(
        "--uniform-compression",
        dest="uniform_compression",
        action="store_true",
        default=True,
        help="Whether to use the same compression for the control.tar and the data.tar."
        " The default is to use uniform compression.",
    )
    parser.add_argument(
        "--no-uniform-compression",
        dest="uniform_compression",
        action="store_false",
        default=True,
        help="Disable uniform compression (see --uniform-compression)",
    )
    parser.add_argument(
        "--threads-max",
        dest="threads_max",
        default=threads_max,
        type=int,
        help="Sets the maximum number of threads allowed for compressors that support multi-threaded operations",
    )
    parser.add_argument(
        "-d",
        "--debug",
        dest="debug_mode",
        action="store_true",
        default=False,
        help="Enable debug logging and raw stack traces on errors",
    )

    parsed_args = parser.parse_args()
    parsed_args = _normalize_compression_args(parsed_args)

    return parsed_args
def _ctrl_member(
    member_path: str,
    fs_path: str | None = None,
    path_type: PathType = PathType.FILE,
    mode: int = 0o644,
    mtime: int = 0,
) -> TarMember:
    """Create a root owned ``TarMember`` for a control.tar entry.

    When *fs_path* is omitted, it is derived from *member_path* (which must
    then start with "./") by replacing the leading "." with "DEBIAN".
    """
    source_path = fs_path
    if source_path is None:
        assert member_path.startswith("./")
        source_path = "DEBIAN" + member_path[1:]
    return TarMember(
        member_path=member_path,
        path_type=path_type,
        fs_path=source_path,
        mode=mode,
        owner="root",
        uid=0,
        group="root",
        gid=0,
        mtime=mtime,
    )
# Names of the control.tar members that are given mode 0755 rather than 0644.
CTRL_MEMBER_SCRIPTS = {
    "preinst",
    "postinst",
    "prerm",
    "postrm",
    "config",
    "isinstallable",
}
def _ctrl_tar_members(package_root_dir: str, mtime: int) -> Iterable[TarMember]:
    """Yield the control.tar members for the DEBIAN directory of a package.

    The first member is the "./" directory itself, followed by every entry of
    the DEBIAN directory sorted by name.  An entry that is not a regular file
    is reported via ``_error``.  Every mtime is clamped so it never exceeds
    *mtime*.
    """
    debian_root = os.path.join(package_root_dir, "DEBIAN")
    root_mtime = int(os.stat(debian_root).st_mtime)
    yield _ctrl_member(
        "./",
        debian_root,
        path_type=PathType.DIRECTORY,
        mode=0o0755,
        mtime=min(mtime, root_mtime),
    )
    by_name = operator.attrgetter("name")
    with os.scandir(debian_root) as entries:
        for entry in sorted(entries, key=by_name):
            entry_st = os.stat(entry)
            if not stat.S_ISREG(entry_st.st_mode):
                _error(
                    f"{entry.path} is not a file and all control.tar members ought to be files!"
                )
            if entry.name in CTRL_MEMBER_SCRIPTS:
                entry_mode = 0o0755
            else:
                entry_mode = 0o0644
            yield _ctrl_member(
                f"./{entry.name}",
                path_type=PathType.FILE,
                fs_path=entry.path,
                mode=entry_mode,
                mtime=min(mtime, int(entry_st.st_mtime)),
            )
def parse_manifest(manifest_path: str | None) -> list["TarMember"]:
    """Load the tar members described by the intermediate manifest.

    A missing manifest path is reported via ``_error``.
    """
    if manifest_path is not None:
        return TarMember.parse_intermediate_manifest(manifest_path)
    _error("--intermediate-package-manifest is mandatory for now")
    return TarMember.parse_intermediate_manifest(manifest_path)
def main() -> None:
    """Command line entry point: parse the arguments and build the deb.

    The data.tar uses the compression selected on the command line.  The
    control.tar uses the same compression unless uniform compression was
    disabled, in which case it uses gzip with default settings.  If the output
    path ends with "/" or is an existing directory, the file name is computed
    from the package's DEBIAN directory.
    """
    setup_logging()
    parsed_args = parse_args()
    root_dir: str = parsed_args.package_root_dir
    output_path: str = parsed_args.package_output_path
    mtime = resolve_source_date_epoch(parsed_args.source_date_epoch)

    data_compression: Compression = COMPRESSIONS[parsed_args.compression_algorithm]
    data_compression_cmd = data_compression.as_cmdline(parsed_args)
    if parsed_args.uniform_compression:
        ctrl_compression = data_compression
        ctrl_compression_cmd = data_compression_cmd
    else:
        ctrl_compression = COMPRESSIONS["gzip"]
        ctrl_compression_cmd = ctrl_compression.as_cmdline(None)

    if output_path.endswith("/") or os.path.isdir(output_path):
        deb_file = os.path.join(
            output_path,
            compute_output_filename(os.path.join(root_dir, "DEBIAN"), False),
        )
    else:
        deb_file = output_path

    pack(
        deb_file,
        ctrl_compression,
        data_compression,
        root_dir,
        parsed_args.package_manifest,
        mtime,
        ctrl_compression_cmd,
        data_compression_cmd,
        # --debug is documented as enabling raw stack traces on errors, so
        # raw exceptions are preferred exactly when debug mode is on.
        prefer_raw_exceptions=parsed_args.debug_mode,
    )
def pack(
    deb_file: str,
    ctrl_compression: Compression,
    data_compression: Compression,
    root_dir: str,
    package_manifest: str | None,
    mtime: int,
    ctrl_compression_cmd: list[str],
    data_compression_cmd: list[str],
    prefer_raw_exceptions: bool = False,
) -> None:
    """Assemble the deb file *deb_file*.

    The archive consists of three members, in order: "debian-binary" with the
    content "2.0\\n", the control.tar built from the DEBIAN directory below
    *root_dir*, and the data.tar built from the members listed in
    *package_manifest*.  The tar member names carry the extension of their
    respective compression.
    """
    data_tar_members = parse_manifest(package_manifest)

    version_member = ArMember("debian-binary", mtime, fixed_binary=b"2.0\n")
    control_member = ArMember(
        ctrl_compression.with_extension("control.tar"),
        mtime,
        write_to_impl=generate_tar_file_member(
            _ctrl_tar_members(root_dir, mtime),
            ctrl_compression_cmd,
        ),
    )
    data_member = ArMember(
        data_compression.with_extension("data.tar"),
        mtime,
        write_to_impl=generate_tar_file_member(
            data_tar_members,
            data_compression_cmd,
        ),
    )

    generate_ar_archive(
        deb_file,
        mtime,
        [version_member, control_member, data_member],
        prefer_raw_exceptions,
    )
568if __name__ == "__main__":
569 main()