Coverage for src/debputy/commands/deb_materialization.py: 9%
245 statements
« prev ^ index » next coverage.py v7.8.2, created at 2026-04-14 21:38 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2026-04-14 21:38 +0000
1#!/usr/bin/python3 -B
2import argparse
3import collections
4import contextlib
5import json
6import logging
7import os
8import subprocess
9import sys
10import tempfile
11import textwrap
12from datetime import datetime
13from typing import Optional, List
14from collections.abc import Iterator
16from debputy.intermediate_manifest import (
17 TarMember,
18 PathType,
19 output_intermediate_manifest,
20 output_intermediate_manifest_to_fd,
21)
22from debputy.util import (
23 _error,
24 _info,
25 compute_output_filename,
26 resolve_source_date_epoch,
27 ColorizedArgumentParser,
28 setup_logging,
29 detect_fakeroot,
30 print_command,
31 program_name,
32 escape_shell,
33)
34from debputy.version import DEBPUTY_ROOT_DIR, version
def parse_args() -> argparse.Namespace:
    """Parse the command line of the deb materialization tool.

    Everything after the first literal ``--`` in ``sys.argv`` is cut off before
    argparse sees the arguments and is stored verbatim on the returned namespace
    as ``upstream_args`` (an empty list when there is no ``--``).

    When ``--verbose`` is given, the root logger level is set to ``INFO``.

    Returns the parsed namespace; ``command`` holds the selected sub-command
    (``materialize-deb`` or ``build-materialized-deb``).
    """
    description = textwrap.dedent("""\
        This is a low level tool for materializing deb packages from intermediate debputy manifests or assembling
        the deb from a materialization.

        The tool is not intended to be run directly by end users.
    """)

    parser = ColorizedArgumentParser(
        description=description,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        allow_abbrev=False,
        prog=program_name(),
    )

    parser.add_argument("--version", action="version", version=version())
    parser.add_argument(
        "--verbose",
        default=False,
        action="store_true",
        dest="verbose",
        help="Make command verbose",
    )

    subparsers = parser.add_subparsers(dest="command", required=True)

    materialize_deb_parser = subparsers.add_parser(
        "materialize-deb",
        allow_abbrev=False,
        help="Generate .deb/.udebs structure from a root directory and"
        " a *intermediate* debputy manifest",
    )
    materialize_deb_parser.add_argument(
        "control_root_dir",
        metavar="control-root-dir",
        help="A directory that contains the control files (usually debian/<pkg>/DEBIAN)",
    )
    materialize_deb_parser.add_argument(
        "materialization_output",
        metavar="materialization_output",
        help="Where the resulting structure should be placed. Should not exist",
    )
    materialize_deb_parser.add_argument(
        "--discard-existing-output",
        dest="discard_existing_output",
        default=False,
        action="store_true",
        help="If passed, then the output location may exist."
        " If it does, it will be *deleted*.",
    )
    materialize_deb_parser.add_argument(
        "--source-date-epoch",
        dest="source_date_epoch",
        action="store",
        type=int,
        default=None,
        help="Source date epoch (can also be given via the SOURCE_DATE_EPOCH environ"
        " variable)",
    )
    materialize_deb_parser.add_argument(
        "--may-move-control-files",
        dest="may_move_control_files",
        action="store_true",
        default=False,
        help="Whether the command may optimize by moving (rather than copying) DEBIAN files",
    )
    materialize_deb_parser.add_argument(
        "--may-move-data-files",
        dest="may_move_data_files",
        action="store_true",
        default=False,
        help="Whether the command may optimize by moving (rather than copying) when materializing",
    )

    materialize_deb_parser.add_argument(
        "--intermediate-package-manifest",
        dest="package_manifest",
        metavar="JSON_FILE",
        action="store",
        default=None,
        help="INTERMEDIATE package manifest (JSON!)",
    )

    materialize_deb_parser.add_argument(
        "--udeb",
        dest="udeb",
        default=False,
        action="store_true",
        help="Whether this is a udeb package. Affects extension and default compression",
    )

    materialize_deb_parser.add_argument(
        "--build-method",
        dest="build_method",
        choices=["debputy", "dpkg-deb"],
        type=str,
        default=None,
        help="Immediately assemble the deb as well using the selected method",
    )
    materialize_deb_parser.add_argument(
        "--assembled-deb-output",
        dest="assembled_deb_output",
        type=str,
        default=None,
        help="Where to place the resulting deb. Only applicable with --build-method",
    )

    # Added for "help only" - you cannot trigger this option in practice,
    # because everything from "--" onwards is removed before parsing (see below).
    materialize_deb_parser.add_argument(
        "--",
        metavar="DPKG_DEB_ARGS",
        action="extend",
        nargs="+",
        dest="unused",
        help="Arguments to be passed to dpkg-deb"
        " (same as you might pass to dh_builddeb).",
    )

    build_deb_structure = subparsers.add_parser(
        "build-materialized-deb",
        allow_abbrev=False,
        help="Produce a .deb from a directory produced by the"
        " materialize-deb command",
    )
    build_deb_structure.add_argument(
        "materialized_deb_root_dir",
        metavar="materialized-deb-root-dir",
        help="The output directory of the materialize-deb command",
    )
    build_deb_structure.add_argument(
        "build_method",
        metavar="build-method",
        # argparse only honours `default` on a positional when it is optional;
        # without nargs="?" the declared default could never take effect.
        nargs="?",
        choices=["debputy", "dpkg-deb"],
        type=str,
        default="dpkg-deb",
        help="Which tool should assemble the deb",
    )
    build_deb_structure.add_argument(
        "--output", type=str, default=None, help="Where to place the resulting deb"
    )

    # Split off the pass-through arguments ourselves so argparse never sees them.
    argv = sys.argv
    try:
        separator_index = argv.index("--")
    except ValueError:
        # list.index only raises ValueError: there is no "--" on the command line.
        upstream_args = []
    else:
        upstream_args = argv[separator_index + 1 :]
        argv = argv[:separator_index]

    parsed_args = parser.parse_args(argv[1:])
    parsed_args.upstream_args = upstream_args
    if parsed_args.verbose:
        logging.getLogger().setLevel(logging.INFO)

    return parsed_args
def _run(cmd: list[str]) -> None:
    """Echo *cmd* via ``print_command`` and execute it with ``subprocess.check_call``.

    A missing executable and a non-zero exit status are each reported through
    ``_error`` with their own message.
    """
    print_command(*cmd)
    try:
        subprocess.check_call(cmd)
    except (FileNotFoundError, subprocess.CalledProcessError) as failure:
        # The shell-escaped rendering is only needed for the error message.
        rendered_cmd = escape_shell(*cmd)
        if isinstance(failure, FileNotFoundError):
            _error(f" {rendered_cmd} failed! Command was not available in PATH")
        else:
            _error(f" {rendered_cmd} had a non-zero exit code.")
def strip_path_prefix(member_path: str) -> str:
    """Return *member_path* without its leading "./".

    Paths that do not start with "./" are reported through ``_error`` as an
    invalid manifest.
    """
    prefix = "./"
    if not member_path.startswith(prefix):
        _error(
            f'Invalid manifest: "{member_path}" does not start with "./", but all paths should'
        )
    return member_path[len(prefix) :]
def _perform_data_tar_materialization(
    output_packaging_root: str,
    intermediate_manifest: list[TarMember],
    may_move_data_files: bool,
) -> list[tuple[str, TarMember]]:
    """Create the data.tar content of the deb below *output_packaging_root*.

    The manifest is walked once to plan the work. The planned operations are
    then executed in a fixed order: a single ``mkdir`` for all directories,
    one ``cp -t`` per destination directory for files that keep their basename,
    individual ``cp`` calls for the remaining files, renames (only when
    *may_move_data_files* is set and the member allows stealing its path),
    and finally the symlinks.

    Returns (materialized path, tar member) pairs in manifest order. Members
    that have a ``fs_path`` are replaced by a clone whose ``fs_path`` points at
    the "deb-root"-relative location and whose ``may_steal_fs_path`` is False.
    """
    started_at = datetime.now()
    _info("Materializing data.tar part of the deb:")

    mkdir_cmd = ["mkdir"]
    pending_symlinks = []
    grouped_copies: dict[str, list[str]] = collections.defaultdict(list)
    single_copies = []
    pending_renames = []
    materialized_members = []

    for member in intermediate_manifest:
        relative_path = strip_path_prefix(member.member_path)
        if relative_path:
            rebased_fs_path = os.path.join("deb-root", relative_path)
            target_path = f"{output_packaging_root}/{relative_path}"
        else:
            # The root entry ("./") maps onto the packaging root itself.
            rebased_fs_path = "deb-root"
            target_path = output_packaging_root

        if member.path_type == PathType.DIRECTORY:
            mkdir_cmd.append(target_path)
        elif member.path_type == PathType.SYMLINK:
            pending_symlinks.append((member.link_target, target_path))
        elif member.fs_path is not None:
            if member.link_target:
                # Not sure if hardlinks gets here yet as we do not support hardlinks
                _error("Internal error; hardlink not supported")

            if may_move_data_files and member.may_steal_fs_path:
                pending_renames.append((member.fs_path, target_path))
            elif os.path.basename(member.fs_path) == os.path.basename(target_path):
                # Same basename: can be batched into one "cp -t <dir>" per directory.
                target_dir = os.path.dirname(target_path.rstrip("/"))
                grouped_copies[target_dir].append(member.fs_path)
            else:
                single_copies.append((member.fs_path, target_path))
        else:
            _error(f"Internal error; unsupported path type {member.path_type}")

        if member.fs_path is not None:
            recorded_member = member.clone_and_replace(
                fs_path=rebased_fs_path, may_steal_fs_path=False
            )
        else:
            recorded_member = member
        materialized_members.append((target_path, recorded_member))

    # Only invoke mkdir when at least one directory was collected.
    if mkdir_cmd[1:]:
        _run(mkdir_cmd)

    for target_dir, sources in grouped_copies.items():
        _run(["cp", "--reflink=auto", "-t", target_dir, *sources])

    for source_path, target_path in single_copies:
        _run(["cp", "--reflink=auto", source_path, target_path])

    for source_path, target_path in pending_renames:
        print_command("mv", source_path, target_path)
        os.rename(source_path, target_path)

    for link_target, link_path in pending_symlinks:
        print_command("ln", "-s", link_target, link_path)
        os.symlink(link_target, link_path)

    elapsed = datetime.now() - started_at
    _info(f"Materialization of data.tar finished, took: {elapsed}")

    return materialized_members
def materialize_deb(
    control_root_dir: str,
    intermediate_manifest_path: str | None,
    source_date_epoch: int,
    dpkg_deb_options: list[str],
    is_udeb: bool,
    output_dir: str,
    may_move_control_files: bool,
    may_move_data_files: bool,
) -> None:
    """Materialize a deb structure into the (newly created) *output_dir*.

    The result consists of:

    * ``deb-root``: the data files from the intermediate manifest plus a
      ``DEBIAN`` directory holding the control files (moved or copied from
      *control_root_dir* depending on *may_move_control_files*).
    * ``deb-structure-intermediate-manifest.json``: the manifest rewritten to
      reference the materialized files.
    * ``env-and-cli.json``: the source date epoch, the ``DPKG_DEB_*``
      environment variables, the dpkg-deb options and the udeb flag.

    *control_root_dir* must contain a ``control`` file; otherwise the problem
    is reported through ``_error``.
    """
    control_file = f"{control_root_dir}/control"
    if not os.path.isfile(control_file):
        _error(
            f'The directory "{control_root_dir}" does not look like a package root dir (there is no control file)'
        )
    intermediate_manifest: list[TarMember] = parse_manifest(intermediate_manifest_path)

    os.mkdir(output_dir)
    output_packaging_root = os.path.join(output_dir, "deb-root")

    materialized_members = _perform_data_tar_materialization(
        output_packaging_root, intermediate_manifest, may_move_data_files
    )
    # Walk the manifest backwards when fixing up mode and mtime.
    for fs_location, member in reversed(materialized_members):
        # TODO: Hardlinks should probably skip these commands
        if member.path_type != PathType.SYMLINK:
            os.chmod(fs_location, member.mode, follow_symlinks=False)
        os.utime(fs_location, (member.mtime, member.mtime), follow_symlinks=False)

    materialized_ctrl_dir = f"{output_packaging_root}/DEBIAN"
    if not may_move_control_files:
        os.mkdir(materialized_ctrl_dir)
        control_files = [
            os.path.join(control_root_dir, name)
            for name in os.listdir(control_root_dir)
        ]
        _run(["cp", "-a", *control_files, materialized_ctrl_dir])
    else:
        print_command("mv", control_root_dir, materialized_ctrl_dir)
        os.rename(control_root_dir, materialized_ctrl_dir)

    manifest_output_path = os.path.join(
        output_dir, "deb-structure-intermediate-manifest.json"
    )
    output_intermediate_manifest(
        manifest_output_path,
        [member for _, member in materialized_members],
    )

    # Record what a later assembly step needs to rebuild the deb.
    recorded_env: dict[str, str | None] = {
        "SOURCE_DATE_EPOCH": str(source_date_epoch),
    }
    for variable in (
        "DPKG_DEB_COMPRESSOR_LEVEL",
        "DPKG_DEB_COMPRESSOR_TYPE",
        "DPKG_DEB_THREADS_MAX",
    ):
        recorded_env[variable] = os.environ.get(variable)
    serial_format = {
        "env": recorded_env,
        "cli": {"dpkg-deb": dpkg_deb_options},
        "udeb": is_udeb,
    }
    with open(os.path.join(output_dir, "env-and-cli.json"), "w") as fd:
        json.dump(serial_format, fd)
def apply_fs_metadata(
    materialized_path: str,
    tar_member: TarMember,
    apply_ownership: bool,
    is_using_fakeroot: bool,
) -> None:
    """Apply the metadata recorded in *tar_member* to *materialized_path*.

    Ownership is only changed when *apply_ownership* is set. The mode is
    applied to everything except symlinks and the mtime is always reset. None
    of the calls follow symlinks. With *is_using_fakeroot* set, the ownership
    is read back afterwards and a mismatch is reported through ``_error``.
    """
    if apply_ownership:
        os.chown(
            materialized_path,
            tar_member.uid,
            tar_member.gid,
            follow_symlinks=False,
        )

    # Align mode and mtime with the manifest to avoid surprises, in case the transport
    # did not preserve the metadata. It is also unclear whether metadata changes cause
    # directory mtimes to change; resetting them unconditionally sidesteps that as well.
    if tar_member.path_type != PathType.SYMLINK:
        os.chmod(materialized_path, tar_member.mode, follow_symlinks=False)
    os.utime(
        materialized_path,
        (tar_member.mtime, tar_member.mtime),
        follow_symlinks=False,
    )

    if not is_using_fakeroot:
        return

    observed = os.stat(materialized_path, follow_symlinks=False)
    ownership_matches = (
        observed.st_uid == tar_member.uid and observed.st_gid == tar_member.gid
    )
    if not ownership_matches:
        _error(
            'Change of ownership failed. The chown call "succeeded" but stat does not give the right result.'
            " Most likely a fakeroot bug. Note, when verifying this, use os.chown + os.stat from python"
            " (the chmod/stat shell commands might use a different syscall that fakeroot accurately emulates)"
        )
def _dpkg_deb_root_requirements(
    intermediate_manifest: list[TarMember],
) -> tuple[list[str], bool, bool]:
    """Work out how dpkg-deb must be invoked for the given manifest.

    Returns ``(deb_cmd, needs_root, is_using_fakeroot)``. Root is needed as soon
    as any member has a uid or gid other than 0; in that case the process must
    run with uid 0, otherwise the problem is reported through ``_error``. When
    every member is owned by 0:0, ``--root-owner-group`` is passed to dpkg-deb
    instead.
    """
    everything_root_owned = all(
        tm.uid == 0 and tm.gid == 0 for tm in intermediate_manifest
    )
    if everything_root_owned:
        # fakeroot does not matter in this case
        _info("Applying mode and utime from the intermediate manifest...")
        return ["dpkg-deb", "--root-owner-group"], False, False

    if os.getuid() != 0:
        _error(
            'Must be run as root/fakeroot when using the method "dpkg-deb" due to the contents'
        )
    is_using_fakeroot = detect_fakeroot()
    _info("Applying ownership, mode, and utime from the intermediate manifest...")
    return ["dpkg-deb"], True, is_using_fakeroot
@contextlib.contextmanager
def maybe_with_materialized_manifest(
    content: list[TarMember] | None,
) -> Iterator[str | None]:
    """Context manager yielding the path of a temporary manifest file, or None.

    When *content* is None, nothing is written and None is yielded. Otherwise
    *content* is written to a named temporary JSON file, which is flushed before
    its name is yielded; the file lives only for the duration of the ``with``
    block.
    """
    if content is None:
        yield None
        return

    with tempfile.NamedTemporaryFile(
        mode="w+t",
        encoding="utf-8",
        prefix="debputy-mat-build",
        suffix=".json",
    ) as manifest_file:
        output_intermediate_manifest_to_fd(manifest_file, content)
        # Flush so a reader that opens the file by name sees the full content.
        manifest_file.flush()
        yield manifest_file.name
425def _prep_assembled_deb_output_path(
426 output_path: str | None,
427 materialized_deb_structure: str,
428 deb_root: str,
429 method: str,
430 is_udeb: bool,
431) -> str:
432 if output_path is None:
433 ext = "udeb" if is_udeb else "deb"
434 output_dir = os.path.join(materialized_deb_structure, "output")
435 if not os.path.isdir(output_dir):
436 os.mkdir(output_dir)
437 output = os.path.join(output_dir, f"{method}.{ext}")
438 elif os.path.isdir(output_path):
439 output = os.path.join(
440 output_path,
441 compute_output_filename(os.path.join(deb_root, "DEBIAN"), is_udeb),
442 )
443 else:
444 output = output_path
445 return output
448def _apply_env(env: dict[str, str | None]) -> None:
449 for name, value in env.items():
450 if value is not None:
451 os.environ[name] = value
452 else:
453 try:
454 del os.environ[name]
455 except KeyError:
456 pass
def assemble_deb(
    materialized_deb_structure: str,
    method: str,
    output_path: str | None,
    combined_materialization_and_assembly: bool,
) -> None:
    """Assemble a deb from a directory produced by ``materialize_deb``.

    Reads ``env-and-cli.json`` and the stored intermediate manifest from
    *materialized_deb_structure*, re-applies the recorded environment and then
    runs the assembly command for *method*:

    * ``"dpkg-deb"``: filesystem metadata from the manifest is applied to
      ``deb-root`` first when root is needed or when this is not a combined
      materialization and assembly run.
    * ``"debputy"``: ``deb_packer.py`` is run with a temporary copy of the
      rebased manifest and the recorded source date epoch.

    Any other method, or a missing source date epoch, is reported through
    ``_error``.
    """
    deb_root = os.path.join(materialized_deb_structure, "deb-root")

    settings_path = os.path.join(materialized_deb_structure, "env-and-cli.json")
    with open(settings_path) as fd:
        serial_format = json.load(fd)

    env = serial_format.get("env") or {}
    cli = serial_format.get("cli") or {}
    is_udeb = serial_format.get("udeb")
    source_date_epoch = env.get("SOURCE_DATE_EPOCH")
    dpkg_deb_options = cli.get("dpkg-deb") or []

    stored_manifest = TarMember.parse_intermediate_manifest(
        os.path.join(
            materialized_deb_structure, "deb-structure-intermediate-manifest.json"
        )
    )
    _info(
        "Rebasing relative paths in the intermediate manifest so they are relative to current working directory ..."
    )
    intermediate_manifest = []
    for stored_member in stored_manifest:
        stored_fs_path = stored_member.fs_path
        if stored_fs_path is not None and not stored_fs_path.startswith("/"):
            # Relative paths are stored relative to the materialized structure.
            stored_member = stored_member.clone_and_replace(
                fs_path=os.path.join(materialized_deb_structure, stored_fs_path)
            )
        intermediate_manifest.append(stored_member)

    # Only the "debputy" method consumes the manifest as a file.
    materialized_manifest = intermediate_manifest if method == "debputy" else None

    if source_date_epoch is None:
        _error(
            "Cannot reproduce the deb. No source date epoch provided in the materialized deb root."
        )
    _apply_env(env)

    output = _prep_assembled_deb_output_path(
        output_path,
        materialized_deb_structure,
        deb_root,
        method,
        is_udeb,
    )

    with maybe_with_materialized_manifest(materialized_manifest) as tmp_file:
        if method == "dpkg-deb":
            deb_cmd, needs_root, is_using_fakeroot = _dpkg_deb_root_requirements(
                intermediate_manifest
            )
            if needs_root or not combined_materialization_and_assembly:
                for member in reversed(intermediate_manifest):
                    member_fs_path = os.path.join(
                        deb_root, strip_path_prefix(member.member_path)
                    )
                    apply_fs_metadata(
                        member_fs_path, member, needs_root, is_using_fakeroot
                    )
        elif method == "debputy":
            deb_packer = os.path.join(DEBPUTY_ROOT_DIR, "deb_packer.py")
            assert tmp_file is not None
            deb_cmd = [
                deb_packer,
                "--intermediate-package-manifest",
                tmp_file,
                "--source-date-epoch",
                source_date_epoch,
            ]
        else:
            _error(f"Internal error: Unsupported assembly method: {method}")

        if is_udeb:
            deb_cmd.extend(["-z6", "-Zxz", "-Sextreme"])
        deb_cmd.extend(dpkg_deb_options)
        deb_cmd.extend(["--build", deb_root, output])

        started_at = datetime.now()
        _run(deb_cmd)
        elapsed = datetime.now() - started_at
        _info(f" - assembly command took {elapsed}")
def parse_manifest(manifest_path: "Optional[str]") -> "List[TarMember]":
    """Load the intermediate manifest stored at *manifest_path*.

    A missing path (None) is reported through ``_error``, as the manifest is
    currently mandatory.
    """
    if manifest_path is None:
        _error("--intermediate-package-manifest is mandatory for now")
    manifest = TarMember.parse_intermediate_manifest(manifest_path)
    return manifest
def main() -> None:
    """Entry point: set up logging, parse the command line and run the sub-command.

    ``materialize-deb`` materializes the deb structure (removing an existing
    output path first when ``--discard-existing-output`` was given) and, when
    ``--build-method`` was passed, assembles the deb right away.
    ``build-materialized-deb`` only assembles a previously materialized structure.
    """
    setup_logging()
    parsed_args = parse_args()

    if parsed_args.command == "build-materialized-deb":
        assemble_deb(
            materialized_deb_structure=parsed_args.materialized_deb_root_dir,
            method=parsed_args.build_method,
            output_path=parsed_args.output,
            combined_materialization_and_assembly=False,
        )
    elif parsed_args.command == "materialize-deb":
        mtime = resolve_source_date_epoch(parsed_args.source_date_epoch)
        dpkg_deb_args = parsed_args.upstream_args or []
        output_dir = parsed_args.materialization_output

        if os.path.exists(output_dir):
            if not parsed_args.discard_existing_output:
                _error(
                    "The output path already exists. Please either choose a non-existing path, delete the path"
                    " or use --discard-existing-output (to have this command remove it as necessary)."
                )
            _info(
                f'Removing existing path "{output_dir}" as requested by --discard-existing-output'
            )
            _run(["rm", "-fr", output_dir])

        materialize_deb(
            control_root_dir=parsed_args.control_root_dir,
            intermediate_manifest_path=parsed_args.package_manifest,
            source_date_epoch=mtime,
            dpkg_deb_options=dpkg_deb_args,
            is_udeb=parsed_args.udeb,
            output_dir=output_dir,
            may_move_control_files=parsed_args.may_move_control_files,
            may_move_data_files=parsed_args.may_move_data_files,
        )

        build_method = parsed_args.build_method
        if build_method is not None:
            # Materialization and assembly happen in the same run.
            assemble_deb(
                materialized_deb_structure=output_dir,
                method=build_method,
                output_path=parsed_args.assembled_deb_output,
                combined_materialization_and_assembly=True,
            )
    else:
        _error(f'Internal error: Unimplemented command "{parsed_args.command}"')
# Run the command line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()