Coverage for src/debputy/commands/deb_packer.py: 56%
198 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
1#!/usr/bin/python3 -B
2import argparse
3import errno
4import operator
5import os
6import stat
7import subprocess
8import tarfile
9import textwrap
10from typing import Optional, List, FrozenSet, BinaryIO, cast
11from collections.abc import Iterable, Callable
13from debputy.intermediate_manifest import TarMember, PathType
14from debputy.util import (
15 _error,
16 compute_output_filename,
17 resolve_source_date_epoch,
18 ColorizedArgumentParser,
19 setup_logging,
20 program_name,
21 assume_not_none,
22)
23from debputy.version import __version__
26# AR header / start of a deb file for reference
27# 00000000 21 3c 61 72 63 68 3e 0a 64 65 62 69 61 6e 2d 62 |!<arch>.debian-b|
28# 00000010 69 6e 61 72 79 20 20 20 31 36 36 38 39 37 33 36 |inary 16689736|
29# 00000020 39 35 20 20 30 20 20 20 20 20 30 20 20 20 20 20 |95 0 0 |
30# 00000030 31 30 30 36 34 34 20 20 34 20 20 20 20 20 20 20 |100644 4 |
31# 00000040 20 20 60 0a 32 2e 30 0a 63 6f 6e 74 72 6f 6c 2e | `.2.0.control.|
32# 00000050 74 61 72 2e 78 7a 20 20 31 36 36 38 39 37 33 36 |tar.xz 16689736|
33# 00000060 39 35 20 20 30 20 20 20 20 20 30 20 20 20 20 20 |95 0 0 |
34# 00000070 31 30 30 36 34 34 20 20 39 33 36 38 20 20 20 20 |100644 9368 |
35# 00000080 20 20 60 0a fd 37 7a 58 5a 00 00 04 e6 d6 b4 46 | `..7zXZ......F|
38class ArMember:
39 def __init__(
40 self,
41 name: str,
42 mtime: int,
43 fixed_binary: bytes | None = None,
44 write_to_impl: Callable[[BinaryIO], None] | None = None,
45 ) -> None:
46 self.name = name
47 self._mtime = mtime
48 self._write_to_impl = write_to_impl
49 self.fixed_binary = fixed_binary
51 @property
52 def is_fixed_binary(self) -> bool:
53 return self.fixed_binary is not None
55 @property
56 def mtime(self) -> int:
57 return self.mtime
59 def write_to(self, fd: BinaryIO) -> None:
60 writer = self._write_to_impl
61 assert writer is not None
62 writer(fd)
65AR_HEADER_LEN = 60
66AR_HEADER = b" " * AR_HEADER_LEN
69def write_header(
70 fd: BinaryIO,
71 member: ArMember,
72 member_len: int,
73 mtime: int,
74) -> None:
75 header = b"%-16s%-12d0 0 100644 %-10d\x60\n" % (
76 member.name.encode("ascii"),
77 mtime,
78 member_len,
79 )
80 fd.write(header)
83def generate_ar_archive(
84 output_filename: str,
85 mtime: int,
86 members: Iterable[ArMember],
87 prefer_raw_exceptions: bool,
88) -> None:
89 try:
90 with open(output_filename, "wb", buffering=0) as fd:
91 fd.write(b"!<arch>\n")
92 for member in members:
93 if member.is_fixed_binary:
94 fixed_binary = assume_not_none(member.fixed_binary)
95 write_header(fd, member, len(fixed_binary), mtime)
96 fd.write(fixed_binary)
97 else:
98 header_pos = fd.tell()
99 fd.write(AR_HEADER)
100 member.write_to(fd)
101 current_pos = fd.tell()
102 fd.seek(header_pos, os.SEEK_SET)
103 content_len = current_pos - header_pos - AR_HEADER_LEN
104 assert content_len >= 0
105 write_header(fd, member, content_len, mtime)
106 fd.seek(current_pos, os.SEEK_SET)
107 except OSError as e:
108 if prefer_raw_exceptions:
109 raise
110 if e.errno == errno.ENOSPC:
111 _error(
112 f"Unable to write {output_filename}. The file system device reported disk full: {str(e)}"
113 )
114 elif e.errno == errno.EIO:
115 _error(
116 f"Unable to write {output_filename}. The file system reported a generic I/O error: {str(e)}"
117 )
118 elif e.errno == errno.EROFS:
119 _error(
120 f"Unable to write {output_filename}. The file system is read-only: {str(e)}"
121 )
122 raise
123 print(f"Generated {output_filename}")
126def _generate_tar_file(
127 tar_members: Iterable[TarMember],
128 compression_cmd: list[str],
129 write_to: BinaryIO,
130) -> None:
131 with (
132 subprocess.Popen(
133 compression_cmd, stdin=subprocess.PIPE, stdout=write_to
134 ) as compress_proc,
135 tarfile.open(
136 mode="w|",
137 fileobj=compress_proc.stdin,
138 format=tarfile.GNU_FORMAT,
139 errorlevel=1,
140 ) as tar_fd,
141 ):
142 for tar_member in tar_members:
143 tar_info: tarfile.TarInfo = tar_member.create_tar_info(tar_fd)
144 if tar_member.path_type == PathType.FILE:
145 with open(assume_not_none(tar_member.fs_path), "rb") as mfd:
146 tar_fd.addfile(tar_info, fileobj=mfd)
147 else:
148 tar_fd.addfile(tar_info)
149 compress_proc.wait()
150 if compress_proc.returncode != 0: 150 ↛ 151line 150 didn't jump to line 151 because the condition on line 150 was never true
151 _error(
152 f"Compression command {compression_cmd} failed with code {compress_proc.returncode}"
153 )
156def generate_tar_file_member(
157 tar_members: Iterable[TarMember],
158 compression_cmd: list[str],
159) -> Callable[[BinaryIO], None]:
160 def _impl(fd: BinaryIO) -> None:
161 _generate_tar_file(
162 tar_members,
163 compression_cmd,
164 fd,
165 )
167 return _impl
170def _xz_cmdline(
171 compression_rule: "Compression",
172 parsed_args: argparse.Namespace | None,
173) -> list[str]:
174 compression_level = compression_rule.effective_compression_level(parsed_args)
175 cmdline = ["xz", "-T2", "-" + str(compression_level)]
176 strategy = None if parsed_args is None else parsed_args.compression_strategy
177 if strategy is None: 177 ↛ 179line 177 didn't jump to line 179 because the condition on line 177 was always true
178 strategy = "none"
179 if strategy != "none": 179 ↛ 180line 179 didn't jump to line 180 because the condition on line 179 was never true
180 cmdline.append("--" + strategy)
181 cmdline.append("--no-adjust")
182 return cmdline
185def _gzip_cmdline(
186 compression_rule: "Compression",
187 parsed_args: argparse.Namespace | None,
188) -> list[str]:
189 compression_level = compression_rule.effective_compression_level(parsed_args)
190 cmdline = ["gzip", "-n" + str(compression_level)]
191 strategy = None if parsed_args is None else parsed_args.compression_strategy
192 if strategy is not None and strategy != "none":
193 raise ValueError(
194 f"Not implemented: Compression strategy {strategy}"
195 " for gzip is currently unsupported (but dpkg-deb does)"
196 )
197 return cmdline
200def _uncompressed_cmdline(
201 _unused_a: "Compression",
202 _unused_b: argparse.Namespace | None,
203) -> list[str]:
204 return ["cat"]
207class Compression:
208 def __init__(
209 self,
210 default_compression_level: int,
211 extension: str,
212 allowed_strategies: frozenset[str],
213 cmdline_builder: Callable[
214 ["Compression", argparse.Namespace | None], list[str]
215 ],
216 ) -> None:
217 self.default_compression_level = default_compression_level
218 self.extension = extension
219 self.allowed_strategies = allowed_strategies
220 self.cmdline_builder = cmdline_builder
222 def __repr__(self) -> str:
223 return f"<{self.__class__.__name__} {self.extension}>"
225 def effective_compression_level(
226 self, parsed_args: argparse.Namespace | None
227 ) -> int:
228 if parsed_args and parsed_args.compression_level is not None: 228 ↛ 229line 228 didn't jump to line 229 because the condition on line 228 was never true
229 return cast("int", parsed_args.compression_level)
230 return self.default_compression_level
232 def as_cmdline(self, parsed_args: argparse.Namespace | None) -> list[str]:
233 return self.cmdline_builder(self, parsed_args)
235 def with_extension(self, filename: str) -> str:
236 return filename + self.extension
239COMPRESSIONS = {
240 "xz": Compression(6, ".xz", frozenset({"none", "extreme"}), _xz_cmdline),
241 "gzip": Compression(
242 9,
243 ".gz",
244 frozenset({"none", "filtered", "huffman", "rle", "fixed"}),
245 _gzip_cmdline,
246 ),
247 "none": Compression(0, "", frozenset({"none"}), _uncompressed_cmdline),
248}
251def _normalize_compression_args(parsed_args: argparse.Namespace) -> argparse.Namespace:
252 if (
253 parsed_args.compression_level == 0
254 and parsed_args.compression_algorithm == "gzip"
255 ):
256 print(
257 "Note: Mapping compression algorithm to none for compatibility with dpkg-deb (due to -Zgzip -z0)"
258 )
259 setattr(parsed_args, "compression_algorithm", "none")
261 compression = COMPRESSIONS[parsed_args.compression_algorithm]
262 strategy = parsed_args.compression_strategy
263 if strategy is not None and strategy not in compression.allowed_strategies:
264 _error(
265 f'Compression algorithm "{parsed_args.compression_algorithm}" does not support compression strategy'
266 f' "{strategy}". Allowed values: {", ".join(sorted(compression.allowed_strategies))}'
267 )
268 return parsed_args
271def parse_args() -> argparse.Namespace:
272 try:
273 compression_level_default = int(os.environ["DPKG_DEB_COMPRESSOR_LEVEL"])
274 except (KeyError, ValueError):
275 compression_level_default = None
277 try:
278 compression_type = os.environ["DPKG_DEB_COMPRESSOR_TYPE"]
279 except (KeyError, ValueError):
280 compression_type = "xz"
282 try:
283 threads_max = int(os.environ["DPKG_DEB_THREADS_MAX"])
284 except (KeyError, ValueError):
285 threads_max = None
287 description = textwrap.dedent(
288 """\
289 THIS IS A PROTOTYPE "dpkg-deb -b" emulator with basic manifest support
291 DO NOT USE THIS TOOL DIRECTLY. It has not stability guarantees and will be removed as
292 soon as "dpkg-deb -b" grows support for the relevant features.
294 This tool is a prototype "dpkg-deb -b"-like interface for compiling a Debian package
295 without requiring root even for static ownership. It is a temporary stand-in for
296 "dpkg-deb -b" until "dpkg-deb -b" will get support for a manifest.
298 The tool operates on an internal JSON based manifest for now, because it was faster
299 than building an mtree parser (which is the format that dpkg will likely end up
300 using).
302 As the tool is not meant to be used directly, it is full of annoying paper cuts that
303 I refuse to fix or maintain. Use the high level tool instead.
305 """
306 )
308 parser = ColorizedArgumentParser(
309 description=description,
310 formatter_class=argparse.RawDescriptionHelpFormatter,
311 allow_abbrev=False,
312 prog=program_name(),
313 )
314 parser.add_argument("--version", action="version", version=__version__)
315 parser.add_argument(
316 "package_root_dir",
317 metavar="PACKAGE_ROOT_DIR",
318 help="Root directory of the package. Must contain a DEBIAN directory",
319 )
320 parser.add_argument(
321 "package_output_path",
322 metavar="PATH",
323 help="Path where the package should be placed. If it is directory,"
324 " the base name will be determined from the package metadata",
325 )
327 parser.add_argument(
328 "--intermediate-package-manifest",
329 dest="package_manifest",
330 metavar="JSON_FILE",
331 action="store",
332 default=None,
333 help="INTERMEDIATE package manifest (JSON!)",
334 )
335 parser.add_argument(
336 "--root-owner-group",
337 dest="root_owner_group",
338 action="store_true",
339 help="Ignored. Accepted for compatibility with dpkg-deb -b",
340 )
341 parser.add_argument(
342 "-b",
343 "--build",
344 dest="build_param",
345 action="store_true",
346 help="Ignored. Accepted for compatibility with dpkg-deb",
347 )
348 parser.add_argument(
349 "--source-date-epoch",
350 dest="source_date_epoch",
351 action="store",
352 type=int,
353 default=None,
354 help="Source date epoch (can also be given via the SOURCE_DATE_EPOCH environ variable",
355 )
356 parser.add_argument(
357 "-Z",
358 dest="compression_algorithm",
359 choices=COMPRESSIONS,
360 default=compression_type,
361 help="The compression algorithm to be used",
362 )
363 parser.add_argument(
364 "-z",
365 dest="compression_level",
366 metavar="{0-9}",
367 choices=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
368 default=compression_level_default,
369 type=int,
370 help="The compression level to be used",
371 )
372 parser.add_argument(
373 "-S",
374 dest="compression_strategy",
375 # We have a different default for xz when strategy is unset and we are building a udeb
376 action="store",
377 default=None,
378 help="The compression algorithm to be used. Concrete values depend on the compression"
379 ' algorithm, but the value "none" is always allowed',
380 )
381 parser.add_argument(
382 "--uniform-compression",
383 dest="uniform_compression",
384 action="store_true",
385 default=True,
386 help="Whether to use the same compression for the control.tar and the data.tar."
387 " The default is to use uniform compression.",
388 )
389 parser.add_argument(
390 "--no-uniform-compression",
391 dest="uniform_compression",
392 action="store_false",
393 default=True,
394 help="Disable uniform compression (see --uniform-compression)",
395 )
396 parser.add_argument(
397 "--threads-max",
398 dest="threads_max",
399 default=threads_max,
400 # TODO: Support this properly
401 type=int,
402 help="Ignored; accepted for compatibility",
403 )
404 parser.add_argument(
405 "-d",
406 "--debug",
407 dest="debug_mode",
408 action="store_true",
409 default=False,
410 help="Enable debug logging and raw stack traces on errors",
411 )
413 parsed_args = parser.parse_args()
414 parsed_args = _normalize_compression_args(parsed_args)
416 return parsed_args
419def _ctrl_member(
420 member_path: str,
421 fs_path: str | None = None,
422 path_type: PathType = PathType.FILE,
423 mode: int = 0o644,
424 mtime: int = 0,
425) -> TarMember:
426 if fs_path is None: 426 ↛ 427line 426 didn't jump to line 427 because the condition on line 426 was never true
427 assert member_path.startswith("./")
428 fs_path = "DEBIAN" + member_path[1:]
429 return TarMember(
430 member_path=member_path,
431 path_type=path_type,
432 fs_path=fs_path,
433 mode=mode,
434 owner="root",
435 uid=0,
436 group="root",
437 gid=0,
438 mtime=mtime,
439 )
442CTRL_MEMBER_SCRIPTS = {
443 "postinst",
444 "preinst",
445 "postrm",
446 "prerm",
447 "config",
448 "isinstallable",
449}
452def _ctrl_tar_members(package_root_dir: str, mtime: int) -> Iterable[TarMember]:
453 debian_root = os.path.join(package_root_dir, "DEBIAN")
454 dir_st = os.stat(debian_root)
455 dir_mtime = int(dir_st.st_mtime)
456 yield _ctrl_member(
457 "./",
458 debian_root,
459 path_type=PathType.DIRECTORY,
460 mode=0o0755,
461 mtime=min(mtime, dir_mtime),
462 )
463 with os.scandir(debian_root) as dir_iter:
464 for ctrl_member in sorted(dir_iter, key=operator.attrgetter("name")):
465 st = os.stat(ctrl_member)
466 if not stat.S_ISREG(st.st_mode): 466 ↛ 467line 466 didn't jump to line 467 because the condition on line 466 was never true
467 _error(
468 f"{ctrl_member.path} is not a file and all control.tar members ought to be files!"
469 )
470 file_mtime = int(st.st_mtime)
471 yield _ctrl_member(
472 f"./{ctrl_member.name}",
473 path_type=PathType.FILE,
474 fs_path=ctrl_member.path,
475 mode=0o0755 if ctrl_member.name in CTRL_MEMBER_SCRIPTS else 0o0644,
476 mtime=min(mtime, file_mtime),
477 )
480def parse_manifest(manifest_path: "Optional[str]") -> "List[TarMember]":
481 if manifest_path is None: 481 ↛ 482line 481 didn't jump to line 482 because the condition on line 481 was never true
482 _error(f"--intermediate-package-manifest is mandatory for now")
483 return TarMember.parse_intermediate_manifest(manifest_path)
486def main() -> None:
487 setup_logging()
488 parsed_args = parse_args()
489 root_dir: str = parsed_args.package_root_dir
490 output_path: str = parsed_args.package_output_path
491 mtime = resolve_source_date_epoch(parsed_args.source_date_epoch)
493 data_compression: Compression = COMPRESSIONS[parsed_args.compression_algorithm]
494 data_compression_cmd = data_compression.as_cmdline(parsed_args)
495 if parsed_args.uniform_compression:
496 ctrl_compression = data_compression
497 ctrl_compression_cmd = data_compression_cmd
498 else:
499 ctrl_compression = COMPRESSIONS["gzip"]
500 ctrl_compression_cmd = COMPRESSIONS["gzip"].as_cmdline(None)
502 if output_path.endswith("/") or os.path.isdir(output_path):
503 deb_file = os.path.join(
504 output_path,
505 compute_output_filename(os.path.join(root_dir, "DEBIAN"), False),
506 )
507 else:
508 deb_file = output_path
510 pack(
511 deb_file,
512 ctrl_compression,
513 data_compression,
514 root_dir,
515 parsed_args.package_manifest,
516 mtime,
517 ctrl_compression_cmd,
518 data_compression_cmd,
519 prefer_raw_exceptions=not parsed_args.debug_mode,
520 )
523def pack(
524 deb_file: str,
525 ctrl_compression: Compression,
526 data_compression: Compression,
527 root_dir: str,
528 package_manifest: "Optional[str]",
529 mtime: int,
530 ctrl_compression_cmd: list[str],
531 data_compression_cmd: list[str],
532 prefer_raw_exceptions: bool = False,
533) -> None:
534 data_tar_members = parse_manifest(package_manifest)
535 members = [
536 ArMember("debian-binary", mtime, fixed_binary=b"2.0\n"),
537 ArMember(
538 ctrl_compression.with_extension("control.tar"),
539 mtime,
540 write_to_impl=generate_tar_file_member(
541 _ctrl_tar_members(root_dir, mtime),
542 ctrl_compression_cmd,
543 ),
544 ),
545 ArMember(
546 data_compression.with_extension("data.tar"),
547 mtime,
548 write_to_impl=generate_tar_file_member(
549 data_tar_members,
550 data_compression_cmd,
551 ),
552 ),
553 ]
554 generate_ar_archive(deb_file, mtime, members, prefer_raw_exceptions)
557if __name__ == "__main__":
558 main()