Coverage for src/debputy/commands/deb_materialization.py: 9%
246 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
1#!/usr/bin/python3 -B
2import argparse
3import collections
4import contextlib
5import json
6import logging
7import os
8import subprocess
9import sys
10import tempfile
11import textwrap
12from datetime import datetime
13from typing import Optional, List, Dict, Tuple
14from collections.abc import Iterator
16from debputy import DEBPUTY_ROOT_DIR
17from debputy.intermediate_manifest import (
18 TarMember,
19 PathType,
20 output_intermediate_manifest,
21 output_intermediate_manifest_to_fd,
22)
23from debputy.util import (
24 _error,
25 _info,
26 compute_output_filename,
27 resolve_source_date_epoch,
28 ColorizedArgumentParser,
29 setup_logging,
30 detect_fakeroot,
31 print_command,
32 program_name,
33 escape_shell,
34)
35from debputy.version import __version__
38def parse_args() -> argparse.Namespace:
39 description = textwrap.dedent(
40 """\
41 This is a low level tool for materializing deb packages from intermediate debputy manifests or assembling
42 the deb from a materialization.
44 The tool is not intended to be run directly by end users.
45 """
46 )
48 parser = ColorizedArgumentParser(
49 description=description,
50 formatter_class=argparse.RawDescriptionHelpFormatter,
51 allow_abbrev=False,
52 prog=program_name(),
53 )
55 parser.add_argument("--version", action="version", version=__version__)
56 parser.add_argument(
57 "--verbose",
58 default=False,
59 action="store_true",
60 dest="verbose",
61 help="Make command verbose",
62 )
64 subparsers = parser.add_subparsers(dest="command", required=True)
66 materialize_deb_parser = subparsers.add_parser(
67 "materialize-deb",
68 allow_abbrev=False,
69 help="Generate .deb/.udebs structure from a root directory and"
70 " a *intermediate* debputy manifest",
71 )
72 materialize_deb_parser.add_argument(
73 "control_root_dir",
74 metavar="control-root-dir",
75 help="A directory that contains the control files (usually debian/<pkg>/DEBIAN)",
76 )
77 materialize_deb_parser.add_argument(
78 "materialization_output",
79 metavar="materialization_output",
80 help="Where to place the resulting structure should be placed. Should not exist",
81 )
82 materialize_deb_parser.add_argument(
83 "--discard-existing-output",
84 dest="discard_existing_output",
85 default=False,
86 action="store_true",
87 help="If passed, then the output location may exist."
88 " If it does, it will be *deleted*.",
89 )
90 materialize_deb_parser.add_argument(
91 "--source-date-epoch",
92 dest="source_date_epoch",
93 action="store",
94 type=int,
95 default=None,
96 help="Source date epoch (can also be given via the SOURCE_DATE_EPOCH environ"
97 " variable",
98 )
99 materialize_deb_parser.add_argument(
100 "--may-move-control-files",
101 dest="may_move_control_files",
102 action="store_true",
103 default=False,
104 help="Whether the command may optimize by moving (rather than copying) DEBIAN files",
105 )
106 materialize_deb_parser.add_argument(
107 "--may-move-data-files",
108 dest="may_move_data_files",
109 action="store_true",
110 default=False,
111 help="Whether the command may optimize by moving (rather than copying) when materializing",
112 )
114 materialize_deb_parser.add_argument(
115 "--intermediate-package-manifest",
116 dest="package_manifest",
117 metavar="JSON_FILE",
118 action="store",
119 default=None,
120 help="INTERMEDIATE package manifest (JSON!)",
121 )
123 materialize_deb_parser.add_argument(
124 "--udeb",
125 dest="udeb",
126 default=False,
127 action="store_true",
128 help="Whether this is udeb package. Affects extension and default compression",
129 )
131 materialize_deb_parser.add_argument(
132 "--build-method",
133 dest="build_method",
134 choices=["debputy", "dpkg-deb"],
135 type=str,
136 default=None,
137 help="Immediately assemble the deb as well using the selected method",
138 )
139 materialize_deb_parser.add_argument(
140 "--assembled-deb-output",
141 dest="assembled_deb_output",
142 type=str,
143 default=None,
144 help="Where to place the resulting deb. Only applicable with --build-method",
145 )
147 # Added for "help only" - you cannot trigger this option in practice
148 materialize_deb_parser.add_argument(
149 "--",
150 metavar="DPKG_DEB_ARGS",
151 action="extend",
152 nargs="+",
153 dest="unused",
154 help="Arguments to be passed to dpkg-deb"
155 " (same as you might pass to dh_builddeb).",
156 )
158 build_deb_structure = subparsers.add_parser(
159 "build-materialized-deb",
160 allow_abbrev=False,
161 help="Produce a .deb from a directory produced by the"
162 " materialize-deb-structure command",
163 )
164 build_deb_structure.add_argument(
165 "materialized_deb_root_dir",
166 metavar="materialized-deb-root-dir",
167 help="The output directory of the materialize-deb-structure command",
168 )
169 build_deb_structure.add_argument(
170 "build_method",
171 metavar="build-method",
172 choices=["debputy", "dpkg-deb"],
173 type=str,
174 default="dpkg-deb",
175 help="Which tool should assemble the deb",
176 )
177 build_deb_structure.add_argument(
178 "--output", type=str, default=None, help="Where to place the resulting deb"
179 )
181 argv = sys.argv
182 try:
183 i = argv.index("--")
184 upstream_args = argv[i + 1 :]
185 argv = argv[:i]
186 except (IndexError, ValueError):
187 upstream_args = []
188 parsed_args = parser.parse_args(argv[1:])
189 setattr(parsed_args, "upstream_args", upstream_args)
190 if parsed_args.verbose:
191 logging.getLogger().setLevel(logging.INFO)
193 return parsed_args
196def _run(cmd: list[str]) -> None:
197 print_command(*cmd)
198 try:
199 subprocess.check_call(cmd)
200 except FileNotFoundError:
201 _error(f" {escape_shell(*cmd)} failed! Command was not available in PATH")
202 except subprocess.CalledProcessError:
203 _error(f" {escape_shell(*cmd)} had a non-zero exit code.")
206def strip_path_prefix(member_path: str) -> str:
207 if not member_path.startswith("./"):
208 _error(
209 f'Invalid manifest: "{member_path}" does not start with "./", but all paths should'
210 )
211 return member_path[2:]
214def _perform_data_tar_materialization(
215 output_packaging_root: str,
216 intermediate_manifest: list[TarMember],
217 may_move_data_files: bool,
218) -> list[tuple[str, TarMember]]:
219 start_time = datetime.now()
220 replacement_manifest_paths = []
221 _info("Materializing data.tar part of the deb:")
223 directories = ["mkdir"]
224 symlinks = []
225 bulk_copies: dict[str, list[str]] = collections.defaultdict(list)
226 copies = []
227 renames = []
229 for tar_member in intermediate_manifest:
230 member_path = strip_path_prefix(tar_member.member_path)
231 new_fs_path = (
232 os.path.join("deb-root", member_path) if member_path else "deb-root"
233 )
234 materialization_path = (
235 f"{output_packaging_root}/{member_path}"
236 if member_path
237 else output_packaging_root
238 )
239 replacement_tar_member = tar_member
240 materialization_parent_dir = os.path.dirname(materialization_path.rstrip("/"))
241 if tar_member.path_type == PathType.DIRECTORY:
242 directories.append(materialization_path)
243 elif tar_member.path_type == PathType.SYMLINK:
244 symlinks.append((tar_member.link_target, materialization_path))
245 elif tar_member.fs_path is not None:
246 if tar_member.link_target:
247 # Not sure if hardlinks gets here yet as we do not support hardlinks
248 _error("Internal error; hardlink not supported")
250 if may_move_data_files and tar_member.may_steal_fs_path:
251 renames.append((tar_member.fs_path, materialization_path))
252 elif os.path.basename(tar_member.fs_path) == os.path.basename(
253 materialization_path
254 ):
255 bulk_copies[materialization_parent_dir].append(tar_member.fs_path)
256 else:
257 copies.append((tar_member.fs_path, materialization_path))
258 else:
259 _error(f"Internal error; unsupported path type {tar_member.path_type}")
261 if tar_member.fs_path is not None:
262 replacement_tar_member = tar_member.clone_and_replace(
263 fs_path=new_fs_path, may_steal_fs_path=False
264 )
266 replacement_manifest_paths.append(
267 (materialization_path, replacement_tar_member)
268 )
270 if len(directories) > 1:
271 _run(directories)
273 for dest_dir, files in bulk_copies.items():
274 cmd = ["cp", "--reflink=auto", "-t", dest_dir]
275 cmd.extend(files)
276 _run(cmd)
278 for source, dest in copies:
279 _run(["cp", "--reflink=auto", source, dest])
281 for source, dest in renames:
282 print_command("mv", source, dest)
283 os.rename(source, dest)
285 for link_target, link_path in symlinks:
286 print_command("ln", "-s", link_target, link_path)
287 os.symlink(link_target, link_path)
289 end_time = datetime.now()
291 _info(f"Materialization of data.tar finished, took: {end_time - start_time}")
293 return replacement_manifest_paths
296def materialize_deb(
297 control_root_dir: str,
298 intermediate_manifest_path: str | None,
299 source_date_epoch: int,
300 dpkg_deb_options: list[str],
301 is_udeb: bool,
302 output_dir: str,
303 may_move_control_files: bool,
304 may_move_data_files: bool,
305) -> None:
306 if not os.path.isfile(f"{control_root_dir}/control"):
307 _error(
308 f'The directory "{control_root_dir}" does not look like a package root dir (there is no control file)'
309 )
310 intermediate_manifest: list[TarMember] = parse_manifest(intermediate_manifest_path)
312 output_packaging_root = os.path.join(output_dir, "deb-root")
313 os.mkdir(output_dir)
315 replacement_manifest_paths = _perform_data_tar_materialization(
316 output_packaging_root, intermediate_manifest, may_move_data_files
317 )
318 for materialization_path, tar_member in reversed(replacement_manifest_paths):
319 # TODO: Hardlinks should probably skip these commands
320 if tar_member.path_type != PathType.SYMLINK:
321 os.chmod(materialization_path, tar_member.mode, follow_symlinks=False)
322 os.utime(
323 materialization_path,
324 (tar_member.mtime, tar_member.mtime),
325 follow_symlinks=False,
326 )
328 materialized_ctrl_dir = f"{output_packaging_root}/DEBIAN"
329 if may_move_control_files:
330 print_command("mv", control_root_dir, materialized_ctrl_dir)
331 os.rename(control_root_dir, materialized_ctrl_dir)
332 else:
333 os.mkdir(materialized_ctrl_dir)
334 copy_cmd = ["cp", "-a"]
335 copy_cmd.extend(
336 os.path.join(control_root_dir, f) for f in os.listdir(control_root_dir)
337 )
338 copy_cmd.append(materialized_ctrl_dir)
339 _run(copy_cmd)
341 output_intermediate_manifest(
342 os.path.join(output_dir, "deb-structure-intermediate-manifest.json"),
343 [t[1] for t in replacement_manifest_paths],
344 )
346 with open(os.path.join(output_dir, "env-and-cli.json"), "w") as fd:
347 serial_format = {
348 "env": {
349 "SOURCE_DATE_EPOCH": str(source_date_epoch),
350 "DPKG_DEB_COMPRESSOR_LEVEL": os.environ.get(
351 "DPKG_DEB_COMPRESSOR_LEVEL"
352 ),
353 "DPKG_DEB_COMPRESSOR_TYPE": os.environ.get("DPKG_DEB_COMPRESSOR_TYPE"),
354 "DPKG_DEB_THREADS_MAX": os.environ.get("DPKG_DEB_THREADS_MAX"),
355 },
356 "cli": {"dpkg-deb": dpkg_deb_options},
357 "udeb": is_udeb,
358 }
359 json.dump(serial_format, fd)
362def apply_fs_metadata(
363 materialized_path: str,
364 tar_member: TarMember,
365 apply_ownership: bool,
366 is_using_fakeroot: bool,
367) -> None:
368 if apply_ownership:
369 os.chown(
370 materialized_path, tar_member.uid, tar_member.gid, follow_symlinks=False
371 )
372 # To avoid surprises, align these with the manifest. Just in case the transport did not preserve the metadata.
373 # Also, unsure whether metadata changes cause directory mtimes to change, so resetting them unconditionally
374 # also prevents that problem.
375 if tar_member.path_type != PathType.SYMLINK:
376 os.chmod(materialized_path, tar_member.mode, follow_symlinks=False)
377 os.utime(
378 materialized_path, (tar_member.mtime, tar_member.mtime), follow_symlinks=False
379 )
380 if is_using_fakeroot:
381 st = os.stat(materialized_path, follow_symlinks=False)
382 if st.st_uid != tar_member.uid or st.st_gid != tar_member.gid:
383 _error(
384 'Change of ownership failed. The chown call "succeeded" but stat does not give the right result.'
385 " Most likely a fakeroot bug. Note, when verifying this, use os.chown + os.stat from python"
386 " (the chmod/stat shell commands might use a different syscall that fakeroot accurately emulates)"
387 )
390def _dpkg_deb_root_requirements(
391 intermediate_manifest: list[TarMember],
392) -> tuple[list[str], bool, bool]:
393 needs_root = any(tm.uid != 0 or tm.gid != 0 for tm in intermediate_manifest)
394 if needs_root:
395 if os.getuid() != 0:
396 _error(
397 'Must be run as root/fakeroot when using the method "dpkg-deb" due to the contents'
398 )
399 is_using_fakeroot = detect_fakeroot()
400 deb_cmd = ["dpkg-deb"]
401 _info("Applying ownership, mode, and utime from the intermediate manifest...")
402 else:
403 # fakeroot does not matter in this case
404 is_using_fakeroot = False
405 deb_cmd = ["dpkg-deb", "--root-owner-group"]
406 _info("Applying mode and utime from the intermediate manifest...")
407 return deb_cmd, needs_root, is_using_fakeroot
410@contextlib.contextmanager
411def maybe_with_materialized_manifest(
412 content: list[TarMember] | None,
413) -> Iterator[str | None]:
414 if content is not None:
415 with tempfile.NamedTemporaryFile(
416 prefix="debputy-mat-build",
417 mode="w+t",
418 suffix=".json",
419 encoding="utf-8",
420 ) as fd:
421 output_intermediate_manifest_to_fd(fd, content)
422 fd.flush()
423 yield fd.name
424 else:
425 yield None
428def _prep_assembled_deb_output_path(
429 output_path: str | None,
430 materialized_deb_structure: str,
431 deb_root: str,
432 method: str,
433 is_udeb: bool,
434) -> str:
435 if output_path is None:
436 ext = "udeb" if is_udeb else "deb"
437 output_dir = os.path.join(materialized_deb_structure, "output")
438 if not os.path.isdir(output_dir):
439 os.mkdir(output_dir)
440 output = os.path.join(output_dir, f"{method}.{ext}")
441 elif os.path.isdir(output_path):
442 output = os.path.join(
443 output_path,
444 compute_output_filename(os.path.join(deb_root, "DEBIAN"), is_udeb),
445 )
446 else:
447 output = output_path
448 return output
451def _apply_env(env: dict[str, str | None]) -> None:
452 for name, value in env.items():
453 if value is not None:
454 os.environ[name] = value
455 else:
456 try:
457 del os.environ[name]
458 except KeyError:
459 pass
462def assemble_deb(
463 materialized_deb_structure: str,
464 method: str,
465 output_path: str | None,
466 combined_materialization_and_assembly: bool,
467) -> None:
468 deb_root = os.path.join(materialized_deb_structure, "deb-root")
470 with open(os.path.join(materialized_deb_structure, "env-and-cli.json")) as fd:
471 serial_format = json.load(fd)
473 env = serial_format.get("env") or {}
474 cli = serial_format.get("cli") or {}
475 is_udeb = serial_format.get("udeb")
476 source_date_epoch = env.get("SOURCE_DATE_EPOCH")
477 dpkg_deb_options = cli.get("dpkg-deb") or []
478 intermediate_manifest_path = os.path.join(
479 materialized_deb_structure, "deb-structure-intermediate-manifest.json"
480 )
481 original_intermediate_manifest = TarMember.parse_intermediate_manifest(
482 intermediate_manifest_path
483 )
484 _info(
485 "Rebasing relative paths in the intermediate manifest so they are relative to current working directory ..."
486 )
487 intermediate_manifest = [
488 (
489 tar_member.clone_and_replace(
490 fs_path=os.path.join(materialized_deb_structure, tar_member.fs_path)
491 )
492 if tar_member.fs_path is not None and not tar_member.fs_path.startswith("/")
493 else tar_member
494 )
495 for tar_member in original_intermediate_manifest
496 ]
497 materialized_manifest = None
498 if method == "debputy":
499 materialized_manifest = intermediate_manifest
501 if source_date_epoch is None:
502 _error(
503 "Cannot reproduce the deb. No source date epoch provided in the materialized deb root."
504 )
505 _apply_env(env)
507 output = _prep_assembled_deb_output_path(
508 output_path,
509 materialized_deb_structure,
510 deb_root,
511 method,
512 is_udeb,
513 )
515 with maybe_with_materialized_manifest(materialized_manifest) as tmp_file:
516 if method == "dpkg-deb":
517 deb_cmd, needs_root, is_using_fakeroot = _dpkg_deb_root_requirements(
518 intermediate_manifest
519 )
520 if needs_root or not combined_materialization_and_assembly:
521 for tar_member in reversed(intermediate_manifest):
522 p = os.path.join(
523 deb_root, strip_path_prefix(tar_member.member_path)
524 )
525 apply_fs_metadata(p, tar_member, needs_root, is_using_fakeroot)
526 elif method == "debputy":
527 deb_packer = os.path.join(DEBPUTY_ROOT_DIR, "deb_packer.py")
528 assert tmp_file is not None
529 deb_cmd = [
530 deb_packer,
531 "--intermediate-package-manifest",
532 tmp_file,
533 "--source-date-epoch",
534 source_date_epoch,
535 ]
536 else:
537 _error(f"Internal error: Unsupported assembly method: {method}")
539 if is_udeb:
540 deb_cmd.extend(["-z6", "-Zxz", "-Sextreme"])
541 deb_cmd.extend(dpkg_deb_options)
542 deb_cmd.extend(["--build", deb_root, output])
543 start_time = datetime.now()
544 _run(deb_cmd)
545 end_time = datetime.now()
546 _info(f" - assembly command took {end_time - start_time}")
549def parse_manifest(manifest_path: "Optional[str]") -> "List[TarMember]":
550 if manifest_path is None:
551 _error("--intermediate-package-manifest is mandatory for now")
552 return TarMember.parse_intermediate_manifest(manifest_path)
555def main() -> None:
556 setup_logging()
557 parsed_args = parse_args()
558 if parsed_args.command == "materialize-deb":
559 mtime = resolve_source_date_epoch(parsed_args.source_date_epoch)
560 dpkg_deb_args = parsed_args.upstream_args or []
561 output_dir = parsed_args.materialization_output
562 if os.path.exists(output_dir):
563 if not parsed_args.discard_existing_output:
564 _error(
565 "The output path already exists. Please either choose a non-existing path, delete the path"
566 " or use --discard-existing-output (to have this command remove it as necessary)."
567 )
568 _info(
569 f'Removing existing path "{output_dir}" as requested by --discard-existing-output'
570 )
571 _run(["rm", "-fr", output_dir])
573 materialize_deb(
574 parsed_args.control_root_dir,
575 parsed_args.package_manifest,
576 mtime,
577 dpkg_deb_args,
578 parsed_args.udeb,
579 output_dir,
580 parsed_args.may_move_control_files,
581 parsed_args.may_move_data_files,
582 )
584 if parsed_args.build_method is not None:
585 assemble_deb(
586 output_dir,
587 parsed_args.build_method,
588 parsed_args.assembled_deb_output,
589 True,
590 )
592 elif parsed_args.command == "build-materialized-deb":
593 assemble_deb(
594 parsed_args.materialized_deb_root_dir,
595 parsed_args.build_method,
596 parsed_args.output,
597 False,
598 )
599 else:
600 _error(f'Internal error: Unimplemented command "{parsed_args.command}"')
603if __name__ == "__main__":
604 main()