Coverage for src/debputy/commands/deb_materialization.py: 9%

245 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2026-04-14 21:38 +0000

1#!/usr/bin/python3 -B 

2import argparse 

3import collections 

4import contextlib 

5import json 

6import logging 

7import os 

8import subprocess 

9import sys 

10import tempfile 

11import textwrap 

12from datetime import datetime 

13from typing import Optional, List 

14from collections.abc import Iterator 

15 

16from debputy.intermediate_manifest import ( 

17 TarMember, 

18 PathType, 

19 output_intermediate_manifest, 

20 output_intermediate_manifest_to_fd, 

21) 

22from debputy.util import ( 

23 _error, 

24 _info, 

25 compute_output_filename, 

26 resolve_source_date_epoch, 

27 ColorizedArgumentParser, 

28 setup_logging, 

29 detect_fakeroot, 

30 print_command, 

31 program_name, 

32 escape_shell, 

33) 

34from debputy.version import DEBPUTY_ROOT_DIR, version 

35 

36 

37def parse_args() -> argparse.Namespace: 

38 description = textwrap.dedent("""\ 

39 This is a low level tool for materializing deb packages from intermediate debputy manifests or assembling 

40 the deb from a materialization. 

41 

42 The tool is not intended to be run directly by end users. 

43 """) 

44 

45 parser = ColorizedArgumentParser( 

46 description=description, 

47 formatter_class=argparse.RawDescriptionHelpFormatter, 

48 allow_abbrev=False, 

49 prog=program_name(), 

50 ) 

51 

52 parser.add_argument("--version", action="version", version=version()) 

53 parser.add_argument( 

54 "--verbose", 

55 default=False, 

56 action="store_true", 

57 dest="verbose", 

58 help="Make command verbose", 

59 ) 

60 

61 subparsers = parser.add_subparsers(dest="command", required=True) 

62 

63 materialize_deb_parser = subparsers.add_parser( 

64 "materialize-deb", 

65 allow_abbrev=False, 

66 help="Generate .deb/.udebs structure from a root directory and" 

67 " a *intermediate* debputy manifest", 

68 ) 

69 materialize_deb_parser.add_argument( 

70 "control_root_dir", 

71 metavar="control-root-dir", 

72 help="A directory that contains the control files (usually debian/<pkg>/DEBIAN)", 

73 ) 

74 materialize_deb_parser.add_argument( 

75 "materialization_output", 

76 metavar="materialization_output", 

77 help="Where to place the resulting structure should be placed. Should not exist", 

78 ) 

79 materialize_deb_parser.add_argument( 

80 "--discard-existing-output", 

81 dest="discard_existing_output", 

82 default=False, 

83 action="store_true", 

84 help="If passed, then the output location may exist." 

85 " If it does, it will be *deleted*.", 

86 ) 

87 materialize_deb_parser.add_argument( 

88 "--source-date-epoch", 

89 dest="source_date_epoch", 

90 action="store", 

91 type=int, 

92 default=None, 

93 help="Source date epoch (can also be given via the SOURCE_DATE_EPOCH environ" 

94 " variable", 

95 ) 

96 materialize_deb_parser.add_argument( 

97 "--may-move-control-files", 

98 dest="may_move_control_files", 

99 action="store_true", 

100 default=False, 

101 help="Whether the command may optimize by moving (rather than copying) DEBIAN files", 

102 ) 

103 materialize_deb_parser.add_argument( 

104 "--may-move-data-files", 

105 dest="may_move_data_files", 

106 action="store_true", 

107 default=False, 

108 help="Whether the command may optimize by moving (rather than copying) when materializing", 

109 ) 

110 

111 materialize_deb_parser.add_argument( 

112 "--intermediate-package-manifest", 

113 dest="package_manifest", 

114 metavar="JSON_FILE", 

115 action="store", 

116 default=None, 

117 help="INTERMEDIATE package manifest (JSON!)", 

118 ) 

119 

120 materialize_deb_parser.add_argument( 

121 "--udeb", 

122 dest="udeb", 

123 default=False, 

124 action="store_true", 

125 help="Whether this is udeb package. Affects extension and default compression", 

126 ) 

127 

128 materialize_deb_parser.add_argument( 

129 "--build-method", 

130 dest="build_method", 

131 choices=["debputy", "dpkg-deb"], 

132 type=str, 

133 default=None, 

134 help="Immediately assemble the deb as well using the selected method", 

135 ) 

136 materialize_deb_parser.add_argument( 

137 "--assembled-deb-output", 

138 dest="assembled_deb_output", 

139 type=str, 

140 default=None, 

141 help="Where to place the resulting deb. Only applicable with --build-method", 

142 ) 

143 

144 # Added for "help only" - you cannot trigger this option in practice 

145 materialize_deb_parser.add_argument( 

146 "--", 

147 metavar="DPKG_DEB_ARGS", 

148 action="extend", 

149 nargs="+", 

150 dest="unused", 

151 help="Arguments to be passed to dpkg-deb" 

152 " (same as you might pass to dh_builddeb).", 

153 ) 

154 

155 build_deb_structure = subparsers.add_parser( 

156 "build-materialized-deb", 

157 allow_abbrev=False, 

158 help="Produce a .deb from a directory produced by the" 

159 " materialize-deb-structure command", 

160 ) 

161 build_deb_structure.add_argument( 

162 "materialized_deb_root_dir", 

163 metavar="materialized-deb-root-dir", 

164 help="The output directory of the materialize-deb-structure command", 

165 ) 

166 build_deb_structure.add_argument( 

167 "build_method", 

168 metavar="build-method", 

169 choices=["debputy", "dpkg-deb"], 

170 type=str, 

171 default="dpkg-deb", 

172 help="Which tool should assemble the deb", 

173 ) 

174 build_deb_structure.add_argument( 

175 "--output", type=str, default=None, help="Where to place the resulting deb" 

176 ) 

177 

178 argv = sys.argv 

179 try: 

180 i = argv.index("--") 

181 upstream_args = argv[i + 1 :] 

182 argv = argv[:i] 

183 except (IndexError, ValueError): 

184 upstream_args = [] 

185 parsed_args = parser.parse_args(argv[1:]) 

186 setattr(parsed_args, "upstream_args", upstream_args) 

187 if parsed_args.verbose: 

188 logging.getLogger().setLevel(logging.INFO) 

189 

190 return parsed_args 

191 

192 

193def _run(cmd: list[str]) -> None: 

194 print_command(*cmd) 

195 try: 

196 subprocess.check_call(cmd) 

197 except FileNotFoundError: 

198 _error(f" {escape_shell(*cmd)} failed! Command was not available in PATH") 

199 except subprocess.CalledProcessError: 

200 _error(f" {escape_shell(*cmd)} had a non-zero exit code.") 

201 

202 

203def strip_path_prefix(member_path: str) -> str: 

204 if not member_path.startswith("./"): 

205 _error( 

206 f'Invalid manifest: "{member_path}" does not start with "./", but all paths should' 

207 ) 

208 return member_path[2:] 

209 

210 

211def _perform_data_tar_materialization( 

212 output_packaging_root: str, 

213 intermediate_manifest: list[TarMember], 

214 may_move_data_files: bool, 

215) -> list[tuple[str, TarMember]]: 

216 start_time = datetime.now() 

217 replacement_manifest_paths = [] 

218 _info("Materializing data.tar part of the deb:") 

219 

220 directories = ["mkdir"] 

221 symlinks = [] 

222 bulk_copies: dict[str, list[str]] = collections.defaultdict(list) 

223 copies = [] 

224 renames = [] 

225 

226 for tar_member in intermediate_manifest: 

227 member_path = strip_path_prefix(tar_member.member_path) 

228 new_fs_path = ( 

229 os.path.join("deb-root", member_path) if member_path else "deb-root" 

230 ) 

231 materialization_path = ( 

232 f"{output_packaging_root}/{member_path}" 

233 if member_path 

234 else output_packaging_root 

235 ) 

236 replacement_tar_member = tar_member 

237 materialization_parent_dir = os.path.dirname(materialization_path.rstrip("/")) 

238 if tar_member.path_type == PathType.DIRECTORY: 

239 directories.append(materialization_path) 

240 elif tar_member.path_type == PathType.SYMLINK: 

241 symlinks.append((tar_member.link_target, materialization_path)) 

242 elif tar_member.fs_path is not None: 

243 if tar_member.link_target: 

244 # Not sure if hardlinks gets here yet as we do not support hardlinks 

245 _error("Internal error; hardlink not supported") 

246 

247 if may_move_data_files and tar_member.may_steal_fs_path: 

248 renames.append((tar_member.fs_path, materialization_path)) 

249 elif os.path.basename(tar_member.fs_path) == os.path.basename( 

250 materialization_path 

251 ): 

252 bulk_copies[materialization_parent_dir].append(tar_member.fs_path) 

253 else: 

254 copies.append((tar_member.fs_path, materialization_path)) 

255 else: 

256 _error(f"Internal error; unsupported path type {tar_member.path_type}") 

257 

258 if tar_member.fs_path is not None: 

259 replacement_tar_member = tar_member.clone_and_replace( 

260 fs_path=new_fs_path, may_steal_fs_path=False 

261 ) 

262 

263 replacement_manifest_paths.append( 

264 (materialization_path, replacement_tar_member) 

265 ) 

266 

267 if len(directories) > 1: 

268 _run(directories) 

269 

270 for dest_dir, files in bulk_copies.items(): 

271 cmd = ["cp", "--reflink=auto", "-t", dest_dir] 

272 cmd.extend(files) 

273 _run(cmd) 

274 

275 for source, dest in copies: 

276 _run(["cp", "--reflink=auto", source, dest]) 

277 

278 for source, dest in renames: 

279 print_command("mv", source, dest) 

280 os.rename(source, dest) 

281 

282 for link_target, link_path in symlinks: 

283 print_command("ln", "-s", link_target, link_path) 

284 os.symlink(link_target, link_path) 

285 

286 end_time = datetime.now() 

287 

288 _info(f"Materialization of data.tar finished, took: {end_time - start_time}") 

289 

290 return replacement_manifest_paths 

291 

292 

293def materialize_deb( 

294 control_root_dir: str, 

295 intermediate_manifest_path: str | None, 

296 source_date_epoch: int, 

297 dpkg_deb_options: list[str], 

298 is_udeb: bool, 

299 output_dir: str, 

300 may_move_control_files: bool, 

301 may_move_data_files: bool, 

302) -> None: 

303 if not os.path.isfile(f"{control_root_dir}/control"): 

304 _error( 

305 f'The directory "{control_root_dir}" does not look like a package root dir (there is no control file)' 

306 ) 

307 intermediate_manifest: list[TarMember] = parse_manifest(intermediate_manifest_path) 

308 

309 output_packaging_root = os.path.join(output_dir, "deb-root") 

310 os.mkdir(output_dir) 

311 

312 replacement_manifest_paths = _perform_data_tar_materialization( 

313 output_packaging_root, intermediate_manifest, may_move_data_files 

314 ) 

315 for materialization_path, tar_member in reversed(replacement_manifest_paths): 

316 # TODO: Hardlinks should probably skip these commands 

317 if tar_member.path_type != PathType.SYMLINK: 

318 os.chmod(materialization_path, tar_member.mode, follow_symlinks=False) 

319 os.utime( 

320 materialization_path, 

321 (tar_member.mtime, tar_member.mtime), 

322 follow_symlinks=False, 

323 ) 

324 

325 materialized_ctrl_dir = f"{output_packaging_root}/DEBIAN" 

326 if may_move_control_files: 

327 print_command("mv", control_root_dir, materialized_ctrl_dir) 

328 os.rename(control_root_dir, materialized_ctrl_dir) 

329 else: 

330 os.mkdir(materialized_ctrl_dir) 

331 copy_cmd = ["cp", "-a"] 

332 copy_cmd.extend( 

333 os.path.join(control_root_dir, f) for f in os.listdir(control_root_dir) 

334 ) 

335 copy_cmd.append(materialized_ctrl_dir) 

336 _run(copy_cmd) 

337 

338 output_intermediate_manifest( 

339 os.path.join(output_dir, "deb-structure-intermediate-manifest.json"), 

340 [t[1] for t in replacement_manifest_paths], 

341 ) 

342 

343 with open(os.path.join(output_dir, "env-and-cli.json"), "w") as fd: 

344 serial_format = { 

345 "env": { 

346 "SOURCE_DATE_EPOCH": str(source_date_epoch), 

347 "DPKG_DEB_COMPRESSOR_LEVEL": os.environ.get( 

348 "DPKG_DEB_COMPRESSOR_LEVEL" 

349 ), 

350 "DPKG_DEB_COMPRESSOR_TYPE": os.environ.get("DPKG_DEB_COMPRESSOR_TYPE"), 

351 "DPKG_DEB_THREADS_MAX": os.environ.get("DPKG_DEB_THREADS_MAX"), 

352 }, 

353 "cli": {"dpkg-deb": dpkg_deb_options}, 

354 "udeb": is_udeb, 

355 } 

356 json.dump(serial_format, fd) 

357 

358 

359def apply_fs_metadata( 

360 materialized_path: str, 

361 tar_member: TarMember, 

362 apply_ownership: bool, 

363 is_using_fakeroot: bool, 

364) -> None: 

365 if apply_ownership: 

366 os.chown( 

367 materialized_path, tar_member.uid, tar_member.gid, follow_symlinks=False 

368 ) 

369 # To avoid surprises, align these with the manifest. Just in case the transport did not preserve the metadata. 

370 # Also, unsure whether metadata changes cause directory mtimes to change, so resetting them unconditionally 

371 # also prevents that problem. 

372 if tar_member.path_type != PathType.SYMLINK: 

373 os.chmod(materialized_path, tar_member.mode, follow_symlinks=False) 

374 os.utime( 

375 materialized_path, (tar_member.mtime, tar_member.mtime), follow_symlinks=False 

376 ) 

377 if is_using_fakeroot: 

378 st = os.stat(materialized_path, follow_symlinks=False) 

379 if st.st_uid != tar_member.uid or st.st_gid != tar_member.gid: 

380 _error( 

381 'Change of ownership failed. The chown call "succeeded" but stat does not give the right result.' 

382 " Most likely a fakeroot bug. Note, when verifying this, use os.chown + os.stat from python" 

383 " (the chmod/stat shell commands might use a different syscall that fakeroot accurately emulates)" 

384 ) 

385 

386 

387def _dpkg_deb_root_requirements( 

388 intermediate_manifest: list[TarMember], 

389) -> tuple[list[str], bool, bool]: 

390 needs_root = any(tm.uid != 0 or tm.gid != 0 for tm in intermediate_manifest) 

391 if needs_root: 

392 if os.getuid() != 0: 

393 _error( 

394 'Must be run as root/fakeroot when using the method "dpkg-deb" due to the contents' 

395 ) 

396 is_using_fakeroot = detect_fakeroot() 

397 deb_cmd = ["dpkg-deb"] 

398 _info("Applying ownership, mode, and utime from the intermediate manifest...") 

399 else: 

400 # fakeroot does not matter in this case 

401 is_using_fakeroot = False 

402 deb_cmd = ["dpkg-deb", "--root-owner-group"] 

403 _info("Applying mode and utime from the intermediate manifest...") 

404 return deb_cmd, needs_root, is_using_fakeroot 

405 

406 

407@contextlib.contextmanager 

408def maybe_with_materialized_manifest( 

409 content: list[TarMember] | None, 

410) -> Iterator[str | None]: 

411 if content is not None: 

412 with tempfile.NamedTemporaryFile( 

413 prefix="debputy-mat-build", 

414 mode="w+t", 

415 suffix=".json", 

416 encoding="utf-8", 

417 ) as fd: 

418 output_intermediate_manifest_to_fd(fd, content) 

419 fd.flush() 

420 yield fd.name 

421 else: 

422 yield None 

423 

424 

425def _prep_assembled_deb_output_path( 

426 output_path: str | None, 

427 materialized_deb_structure: str, 

428 deb_root: str, 

429 method: str, 

430 is_udeb: bool, 

431) -> str: 

432 if output_path is None: 

433 ext = "udeb" if is_udeb else "deb" 

434 output_dir = os.path.join(materialized_deb_structure, "output") 

435 if not os.path.isdir(output_dir): 

436 os.mkdir(output_dir) 

437 output = os.path.join(output_dir, f"{method}.{ext}") 

438 elif os.path.isdir(output_path): 

439 output = os.path.join( 

440 output_path, 

441 compute_output_filename(os.path.join(deb_root, "DEBIAN"), is_udeb), 

442 ) 

443 else: 

444 output = output_path 

445 return output 

446 

447 

448def _apply_env(env: dict[str, str | None]) -> None: 

449 for name, value in env.items(): 

450 if value is not None: 

451 os.environ[name] = value 

452 else: 

453 try: 

454 del os.environ[name] 

455 except KeyError: 

456 pass 

457 

458 

459def assemble_deb( 

460 materialized_deb_structure: str, 

461 method: str, 

462 output_path: str | None, 

463 combined_materialization_and_assembly: bool, 

464) -> None: 

465 deb_root = os.path.join(materialized_deb_structure, "deb-root") 

466 

467 with open(os.path.join(materialized_deb_structure, "env-and-cli.json")) as fd: 

468 serial_format = json.load(fd) 

469 

470 env = serial_format.get("env") or {} 

471 cli = serial_format.get("cli") or {} 

472 is_udeb = serial_format.get("udeb") 

473 source_date_epoch = env.get("SOURCE_DATE_EPOCH") 

474 dpkg_deb_options = cli.get("dpkg-deb") or [] 

475 intermediate_manifest_path = os.path.join( 

476 materialized_deb_structure, "deb-structure-intermediate-manifest.json" 

477 ) 

478 original_intermediate_manifest = TarMember.parse_intermediate_manifest( 

479 intermediate_manifest_path 

480 ) 

481 _info( 

482 "Rebasing relative paths in the intermediate manifest so they are relative to current working directory ..." 

483 ) 

484 intermediate_manifest = [ 

485 ( 

486 tar_member.clone_and_replace( 

487 fs_path=os.path.join(materialized_deb_structure, tar_member.fs_path) 

488 ) 

489 if tar_member.fs_path is not None and not tar_member.fs_path.startswith("/") 

490 else tar_member 

491 ) 

492 for tar_member in original_intermediate_manifest 

493 ] 

494 materialized_manifest = None 

495 if method == "debputy": 

496 materialized_manifest = intermediate_manifest 

497 

498 if source_date_epoch is None: 

499 _error( 

500 "Cannot reproduce the deb. No source date epoch provided in the materialized deb root." 

501 ) 

502 _apply_env(env) 

503 

504 output = _prep_assembled_deb_output_path( 

505 output_path, 

506 materialized_deb_structure, 

507 deb_root, 

508 method, 

509 is_udeb, 

510 ) 

511 

512 with maybe_with_materialized_manifest(materialized_manifest) as tmp_file: 

513 if method == "dpkg-deb": 

514 deb_cmd, needs_root, is_using_fakeroot = _dpkg_deb_root_requirements( 

515 intermediate_manifest 

516 ) 

517 if needs_root or not combined_materialization_and_assembly: 

518 for tar_member in reversed(intermediate_manifest): 

519 p = os.path.join( 

520 deb_root, strip_path_prefix(tar_member.member_path) 

521 ) 

522 apply_fs_metadata(p, tar_member, needs_root, is_using_fakeroot) 

523 elif method == "debputy": 

524 deb_packer = os.path.join(DEBPUTY_ROOT_DIR, "deb_packer.py") 

525 assert tmp_file is not None 

526 deb_cmd = [ 

527 deb_packer, 

528 "--intermediate-package-manifest", 

529 tmp_file, 

530 "--source-date-epoch", 

531 source_date_epoch, 

532 ] 

533 else: 

534 _error(f"Internal error: Unsupported assembly method: {method}") 

535 

536 if is_udeb: 

537 deb_cmd.extend(["-z6", "-Zxz", "-Sextreme"]) 

538 deb_cmd.extend(dpkg_deb_options) 

539 deb_cmd.extend(["--build", deb_root, output]) 

540 start_time = datetime.now() 

541 _run(deb_cmd) 

542 end_time = datetime.now() 

543 _info(f" - assembly command took {end_time - start_time}") 

544 

545 

546def parse_manifest(manifest_path: "Optional[str]") -> "List[TarMember]": 

547 if manifest_path is None: 

548 _error("--intermediate-package-manifest is mandatory for now") 

549 return TarMember.parse_intermediate_manifest(manifest_path) 

550 

551 

552def main() -> None: 

553 setup_logging() 

554 parsed_args = parse_args() 

555 if parsed_args.command == "materialize-deb": 

556 mtime = resolve_source_date_epoch(parsed_args.source_date_epoch) 

557 dpkg_deb_args = parsed_args.upstream_args or [] 

558 output_dir = parsed_args.materialization_output 

559 if os.path.exists(output_dir): 

560 if not parsed_args.discard_existing_output: 

561 _error( 

562 "The output path already exists. Please either choose a non-existing path, delete the path" 

563 " or use --discard-existing-output (to have this command remove it as necessary)." 

564 ) 

565 _info( 

566 f'Removing existing path "{output_dir}" as requested by --discard-existing-output' 

567 ) 

568 _run(["rm", "-fr", output_dir]) 

569 

570 materialize_deb( 

571 parsed_args.control_root_dir, 

572 parsed_args.package_manifest, 

573 mtime, 

574 dpkg_deb_args, 

575 parsed_args.udeb, 

576 output_dir, 

577 parsed_args.may_move_control_files, 

578 parsed_args.may_move_data_files, 

579 ) 

580 

581 if parsed_args.build_method is not None: 

582 assemble_deb( 

583 output_dir, 

584 parsed_args.build_method, 

585 parsed_args.assembled_deb_output, 

586 True, 

587 ) 

588 

589 elif parsed_args.command == "build-materialized-deb": 

590 assemble_deb( 

591 parsed_args.materialized_deb_root_dir, 

592 parsed_args.build_method, 

593 parsed_args.output, 

594 False, 

595 ) 

596 else: 

597 _error(f'Internal error: Unimplemented command "{parsed_args.command}"') 

598 

599 

600if __name__ == "__main__": 

601 main()