Coverage for src/debputy/commands/deb_materialization.py: 9%

245 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-01-27 13:59 +0000

1#!/usr/bin/python3 -B 

2import argparse 

3import collections 

4import contextlib 

5import json 

6import logging 

7import os 

8import subprocess 

9import sys 

10import tempfile 

11import textwrap 

12from datetime import datetime 

13from typing import Optional, List, Iterator, Dict, Tuple 

14 

15from debputy import DEBPUTY_ROOT_DIR 

16from debputy.intermediate_manifest import ( 

17 TarMember, 

18 PathType, 

19 output_intermediate_manifest, 

20 output_intermediate_manifest_to_fd, 

21) 

22from debputy.util import ( 

23 _error, 

24 _info, 

25 compute_output_filename, 

26 resolve_source_date_epoch, 

27 ColorizedArgumentParser, 

28 setup_logging, 

29 detect_fakeroot, 

30 print_command, 

31 program_name, 

32 escape_shell, 

33) 

34from debputy.version import __version__ 

35 

36 

def parse_args() -> argparse.Namespace:
    """Parse the command line (``sys.argv``) for the deb materialization tool.

    Everything after the first literal ``--`` on the command line is split off
    before argparse sees it and is stored verbatim as ``upstream_args`` on the
    returned namespace (these are the options destined for dpkg-deb).  When
    ``--verbose`` is given, the root logger level is set to INFO as a side effect.

    :return: The parsed namespace, including the selected ``command`` and the
      extra ``upstream_args`` attribute (a possibly empty list of strings).
    """
    description = textwrap.dedent(
        """\
        This is a low level tool for materializing deb packages from intermediate debputy manifests or assembling
        the deb from a materialization.

        The tool is not intended to be run directly by end users.
        """
    )

    parser = ColorizedArgumentParser(
        description=description,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        allow_abbrev=False,
        prog=program_name(),
    )

    parser.add_argument("--version", action="version", version=__version__)
    parser.add_argument(
        "--verbose",
        default=False,
        action="store_true",
        dest="verbose",
        help="Make command verbose",
    )

    subparsers = parser.add_subparsers(dest="command", required=True)

    materialize_deb_parser = subparsers.add_parser(
        "materialize-deb",
        allow_abbrev=False,
        help="Generate .deb/.udebs structure from a root directory and"
        " an *intermediate* debputy manifest",
    )
    materialize_deb_parser.add_argument(
        "control_root_dir",
        metavar="control-root-dir",
        help="A directory that contains the control files (usually debian/<pkg>/DEBIAN)",
    )
    materialize_deb_parser.add_argument(
        "materialization_output",
        metavar="materialization_output",
        help="Where the resulting structure should be placed. Should not exist",
    )
    materialize_deb_parser.add_argument(
        "--discard-existing-output",
        dest="discard_existing_output",
        default=False,
        action="store_true",
        help="If passed, then the output location may exist."
        " If it does, it will be *deleted*.",
    )
    materialize_deb_parser.add_argument(
        "--source-date-epoch",
        dest="source_date_epoch",
        action="store",
        type=int,
        default=None,
        help="Source date epoch (can also be given via the SOURCE_DATE_EPOCH environ"
        " variable)",
    )
    materialize_deb_parser.add_argument(
        "--may-move-control-files",
        dest="may_move_control_files",
        action="store_true",
        default=False,
        help="Whether the command may optimize by moving (rather than copying) DEBIAN files",
    )
    materialize_deb_parser.add_argument(
        "--may-move-data-files",
        dest="may_move_data_files",
        action="store_true",
        default=False,
        help="Whether the command may optimize by moving (rather than copying) when materializing",
    )

    materialize_deb_parser.add_argument(
        "--intermediate-package-manifest",
        dest="package_manifest",
        metavar="JSON_FILE",
        action="store",
        default=None,
        help="INTERMEDIATE package manifest (JSON!)",
    )

    materialize_deb_parser.add_argument(
        "--udeb",
        dest="udeb",
        default=False,
        action="store_true",
        help="Whether this is a udeb package. Affects extension and default compression",
    )

    materialize_deb_parser.add_argument(
        "--build-method",
        dest="build_method",
        choices=["debputy", "dpkg-deb"],
        type=str,
        default=None,
        help="Immediately assemble the deb as well using the selected method",
    )
    materialize_deb_parser.add_argument(
        "--assembled-deb-output",
        dest="assembled_deb_output",
        type=str,
        default=None,
        help="Where to place the resulting deb. Only applicable with --build-method",
    )

    # Added for "help only" - you cannot trigger this option in practice
    # (the "--" and everything after it is removed from argv before parsing, see below).
    materialize_deb_parser.add_argument(
        "--",
        metavar="DPKG_DEB_ARGS",
        action="extend",
        nargs="+",
        dest="unused",
        help="Arguments to be passed to dpkg-deb"
        " (same as you might pass to dh_builddeb).",
    )

    build_deb_structure = subparsers.add_parser(
        "build-materialized-deb",
        allow_abbrev=False,
        help="Produce a .deb from a directory produced by the"
        " materialize-deb command",
    )
    build_deb_structure.add_argument(
        "materialized_deb_root_dir",
        metavar="materialized-deb-root-dir",
        help="The output directory of the materialize-deb command",
    )
    # NOTE(review): argparse only applies `default` to a positional when nargs is
    # "?" or "*", so this default looks unused and the argument is effectively
    # mandatory - confirm whether it was meant to be optional.
    build_deb_structure.add_argument(
        "build_method",
        metavar="build-method",
        choices=["debputy", "dpkg-deb"],
        type=str,
        default="dpkg-deb",
        help="Which tool should assemble the deb",
    )
    build_deb_structure.add_argument(
        "--output", type=str, default=None, help="Where to place the resulting deb"
    )

    argv = sys.argv
    try:
        separator_index = argv.index("--")
    except ValueError:
        # No "--" on the command line; nothing to forward to dpkg-deb.
        upstream_args = []
    else:
        upstream_args = argv[separator_index + 1 :]
        argv = argv[:separator_index]
    parsed_args = parser.parse_args(argv[1:])
    parsed_args.upstream_args = upstream_args
    if parsed_args.verbose:
        logging.getLogger().setLevel(logging.INFO)

    return parsed_args

193 

194 

def _run(cmd: List[str]) -> None:
    """Echo *cmd* via ``print_command`` and execute it without a shell.

    A missing executable or a non-zero exit status is reported through ``_error``;
    any other exception raised while spawning the command propagates unchanged.

    :param cmd: The program and its arguments as an argv-style list.
    """
    print_command(*cmd)
    try:
        subprocess.check_call(cmd)
    except (FileNotFoundError, subprocess.CalledProcessError) as failure:
        if isinstance(failure, FileNotFoundError):
            _error(f" {escape_shell(*cmd)} failed! Command was not available in PATH")
        else:
            _error(f" {escape_shell(*cmd)} had a non-zero exit code.")

203 

204 

def strip_path_prefix(member_path: str) -> str:
    """Return *member_path* without its leading ``./``.

    Manifest member paths are expected to start with ``./``; a path that does not
    is reported as an invalid manifest through ``_error``.

    :param member_path: A member path taken from the intermediate manifest.
    :return: The path with the first two characters removed (empty for ``"./"``).
    """
    is_well_formed = member_path.startswith("./")
    if not is_well_formed:
        _error(
            f'Invalid manifest: "{member_path}" does not start with "./", but all paths should'
        )
    return member_path[len("./") :]

211 

212 

def _perform_data_tar_materialization(
    output_packaging_root: str,
    intermediate_manifest: List[TarMember],
    may_move_data_files: bool,
) -> List[Tuple[str, TarMember]]:
    """Create the data.tar file tree below *output_packaging_root*.

    The manifest is scanned once to classify every member, after which the work is
    carried out in phases: one ``mkdir`` for all directories, ``cp`` for files
    (grouped per destination directory when the basename is kept), renames for
    files that may be moved, and finally the symlinks.

    :param output_packaging_root: Directory that becomes the root of the package contents.
    :param intermediate_manifest: The members to materialize.
    :param may_move_data_files: Whether members flagged with ``may_steal_fs_path`` may
      be renamed into place rather than copied.
    :return: One ``(materialized path, tar member)`` pair per manifest entry, in manifest
      order.  Members backed by a file get a clone whose ``fs_path`` points below
      ``deb-root`` and whose ``may_steal_fs_path`` is False.
    """
    started_at = datetime.now()
    _info("Materializing data.tar part of the deb:")

    materialized: List[Tuple[str, TarMember]] = []
    dirs_to_create: List[str] = []
    pending_symlinks = []
    pending_renames = []
    individual_copies = []
    # Destination directory -> sources copied there under their existing basename.
    grouped_copies: Dict[str, List[str]] = {}

    for member in intermediate_manifest:
        relative_path = strip_path_prefix(member.member_path)
        if relative_path:
            target_path = f"{output_packaging_root}/{relative_path}"
        else:
            # The "./" entry is the packaging root itself.
            target_path = output_packaging_root

        source_path = member.fs_path
        if member.path_type == PathType.DIRECTORY:
            dirs_to_create.append(target_path)
        elif member.path_type == PathType.SYMLINK:
            pending_symlinks.append((member.link_target, target_path))
        elif source_path is not None:
            if member.link_target:
                # Not sure if hardlinks gets here yet as we do not support hardlinks
                _error("Internal error; hardlink not supported")

            if may_move_data_files and member.may_steal_fs_path:
                pending_renames.append((source_path, target_path))
            elif os.path.basename(source_path) == os.path.basename(target_path):
                parent_dir = os.path.dirname(target_path.rstrip("/"))
                grouped_copies.setdefault(parent_dir, []).append(source_path)
            else:
                individual_copies.append((source_path, target_path))
        else:
            _error(f"Internal error; unsupported path type {member.path_type}")

        if source_path is None:
            updated_member = member
        else:
            rebased_fs_path = (
                os.path.join("deb-root", relative_path) if relative_path else "deb-root"
            )
            updated_member = member.clone_and_replace(
                fs_path=rebased_fs_path, may_steal_fs_path=False
            )
        materialized.append((target_path, updated_member))

    # Directories first (a single mkdir call, in manifest order), so the later
    # phases have somewhere to put their files.
    if dirs_to_create:
        _run(["mkdir", *dirs_to_create])

    for parent_dir, sources in grouped_copies.items():
        _run(["cp", "--reflink=auto", "-t", parent_dir, *sources])

    for copy_source, copy_dest in individual_copies:
        _run(["cp", "--reflink=auto", copy_source, copy_dest])

    for rename_source, rename_dest in pending_renames:
        print_command("mv", rename_source, rename_dest)
        os.rename(rename_source, rename_dest)

    for target, link_location in pending_symlinks:
        print_command("ln", "-s", target, link_location)
        os.symlink(target, link_location)

    finished_at = datetime.now()
    _info(f"Materialization of data.tar finished, took: {finished_at - started_at}")

    return materialized

293 

294 

def materialize_deb(
    control_root_dir: str,
    intermediate_manifest_path: Optional[str],
    source_date_epoch: int,
    dpkg_deb_options: List[str],
    is_udeb: bool,
    output_dir: str,
    may_move_control_files: bool,
    may_move_data_files: bool,
) -> None:
    """Materialize a deb structure in the (newly created) directory *output_dir*.

    The result consists of a ``deb-root`` tree (package contents plus ``DEBIAN``),
    a rewritten intermediate manifest and an ``env-and-cli.json`` file recording
    the settings a later assembly step needs.

    :param control_root_dir: Directory with the control files; must contain ``control``.
    :param intermediate_manifest_path: Path to the intermediate manifest.
    :param source_date_epoch: Recorded (as a string) in ``env-and-cli.json``.
    :param dpkg_deb_options: Extra dpkg-deb options, recorded in ``env-and-cli.json``.
    :param is_udeb: Recorded in ``env-and-cli.json``.
    :param output_dir: Directory to create; ``os.mkdir`` fails if it already exists.
    :param may_move_control_files: Rename the control directory into place instead of copying it.
    :param may_move_data_files: Passed on to the data.tar materialization.
    """
    control_file = f"{control_root_dir}/control"
    if not os.path.isfile(control_file):
        _error(
            f'The directory "{control_root_dir}" does not look like a package root dir (there is no control file)'
        )
    manifest: List[TarMember] = parse_manifest(intermediate_manifest_path)

    deb_root = os.path.join(output_dir, "deb-root")
    os.mkdir(output_dir)

    materialized_members = _perform_data_tar_materialization(
        deb_root, manifest, may_move_data_files
    )
    # Walk the manifest backwards when applying mode and timestamps.
    for fs_location, member in reversed(materialized_members):
        # TODO: Hardlinks should probably skip these commands
        if member.path_type != PathType.SYMLINK:
            os.chmod(fs_location, member.mode, follow_symlinks=False)
        timestamps = (member.mtime, member.mtime)
        os.utime(fs_location, timestamps, follow_symlinks=False)

    ctrl_dest = f"{deb_root}/DEBIAN"
    if may_move_control_files:
        print_command("mv", control_root_dir, ctrl_dest)
        os.rename(control_root_dir, ctrl_dest)
    else:
        os.mkdir(ctrl_dest)
        control_files = [
            os.path.join(control_root_dir, entry)
            for entry in os.listdir(control_root_dir)
        ]
        _run(["cp", "-a", *control_files, ctrl_dest])

    output_intermediate_manifest(
        os.path.join(output_dir, "deb-structure-intermediate-manifest.json"),
        [member for _, member in materialized_members],
    )

    # Snapshot the dpkg-deb related environment so the assembly step can restore it.
    recorded_env = {"SOURCE_DATE_EPOCH": str(source_date_epoch)}
    for env_name in (
        "DPKG_DEB_COMPRESSOR_LEVEL",
        "DPKG_DEB_COMPRESSOR_TYPE",
        "DPKG_DEB_THREADS_MAX",
    ):
        recorded_env[env_name] = os.environ.get(env_name)
    settings = {
        "env": recorded_env,
        "cli": {"dpkg-deb": dpkg_deb_options},
        "udeb": is_udeb,
    }
    with open(os.path.join(output_dir, "env-and-cli.json"), "w") as settings_fd:
        json.dump(settings, settings_fd)

359 

360 

def apply_fs_metadata(
    materialized_path: str,
    tar_member: TarMember,
    apply_ownership: bool,
    is_using_fakeroot: bool,
) -> None:
    """Align the metadata of *materialized_path* with *tar_member*.

    :param materialized_path: The path on disk to update (symlinks are not followed).
    :param tar_member: Manifest entry supplying uid, gid, mode and mtime.
    :param apply_ownership: Whether to chown the path to the member's uid/gid.
    :param is_using_fakeroot: When true, the ownership is read back with ``os.stat``
      and a mismatch is reported through ``_error``.
    """
    if apply_ownership:
        os.chown(
            materialized_path, tar_member.uid, tar_member.gid, follow_symlinks=False
        )

    # To avoid surprises, align these with the manifest. Just in case the transport did not preserve the metadata.
    # Also, unsure whether metadata changes cause directory mtimes to change, so resetting them unconditionally
    # also prevents that problem.
    is_symlink = tar_member.path_type == PathType.SYMLINK
    if not is_symlink:
        os.chmod(materialized_path, tar_member.mode, follow_symlinks=False)
    timestamps = (tar_member.mtime, tar_member.mtime)
    os.utime(materialized_path, timestamps, follow_symlinks=False)

    if not is_using_fakeroot:
        return
    # Read the ownership back to verify that the chown actually took effect.
    stat_result = os.stat(materialized_path, follow_symlinks=False)
    if stat_result.st_uid != tar_member.uid or stat_result.st_gid != tar_member.gid:
        _error(
            'Change of ownership failed. The chown call "succeeded" but stat does not give the right result.'
            " Most likely a fakeroot bug. Note, when verifying this, use os.chown + os.stat from python"
            " (the chmod/stat shell commands might use a different syscall that fakeroot accurately emulates)"
        )

387 

388 

def _dpkg_deb_root_requirements(
    intermediate_manifest: List[TarMember],
) -> Tuple[List[str], bool, bool]:
    """Work out how dpkg-deb must be invoked for the given manifest.

    Root is needed as soon as any member has a uid or gid other than 0; in that
    case running as a non-root user is reported through ``_error``.

    :param intermediate_manifest: The members that will end up in the deb.
    :return: A tuple of the dpkg-deb command prefix, whether root is needed, and
      whether fakeroot was detected (always False when root is not needed).
    """
    everything_root_owned = all(
        tm.uid == 0 and tm.gid == 0 for tm in intermediate_manifest
    )
    if everything_root_owned:
        # fakeroot does not matter in this case
        _info("Applying mode and utime from the intermediate manifest...")
        return ["dpkg-deb", "--root-owner-group"], False, False

    if os.getuid() != 0:
        _error(
            'Must be run as root/fakeroot when using the method "dpkg-deb" due to the contents'
        )
    running_under_fakeroot = detect_fakeroot()
    _info("Applying ownership, mode, and utime from the intermediate manifest...")
    return ["dpkg-deb"], True, running_under_fakeroot

407 

408 

@contextlib.contextmanager
def maybe_with_materialized_manifest(
    content: Optional[List[TarMember]],
) -> Iterator[Optional[str]]:
    """Context manager exposing *content* as a temporary JSON manifest file.

    :param content: The manifest to write out, or None for "no manifest".
    :return: Yields the name of a temporary file holding the manifest (it only
      exists while the context is active), or None when *content* is None.
    """
    if content is None:
        yield None
        return

    with tempfile.NamedTemporaryFile(
        prefix="debputy-mat-build",
        mode="w+t",
        suffix=".json",
        encoding="utf-8",
    ) as manifest_fd:
        output_intermediate_manifest_to_fd(manifest_fd, content)
        # Flush so that whoever opens the file by name sees the full content.
        manifest_fd.flush()
        yield manifest_fd.name

425 

426 

def _prep_assembled_deb_output_path(
    output_path: Optional[str],
    materialized_deb_structure: str,
    deb_root: str,
    method: str,
    is_udeb: bool,
) -> str:
    """Decide where the assembled deb should be written.

    :param output_path: Requested output location, or None for the default.
    :param materialized_deb_structure: The materialization directory; the default
      output goes into its ``output`` subdirectory, which is created if missing.
    :param deb_root: The ``deb-root`` directory (its ``DEBIAN`` directory is passed to
      ``compute_output_filename`` when *output_path* is a directory).
    :param method: Assembly method; used as the default file's base name.
    :param is_udeb: Selects the ``udeb`` extension for the default file name.
    :return: The path of the deb file to produce.
    """
    if output_path is not None:
        if not os.path.isdir(output_path):
            # An explicit file path is used as given.
            return output_path
        return os.path.join(
            output_path,
            compute_output_filename(os.path.join(deb_root, "DEBIAN"), is_udeb),
        )

    extension = "udeb" if is_udeb else "deb"
    default_dir = os.path.join(materialized_deb_structure, "output")
    if not os.path.isdir(default_dir):
        os.mkdir(default_dir)
    return os.path.join(default_dir, f"{method}.{extension}")

448 

449 

def _apply_env(env: Dict[str, Optional[str]]) -> None:
    """Make ``os.environ`` match *env*.

    :param env: Maps variable names to the value to set; a value of None means
      the variable is removed from the environment (a no-op when it is not set).
    """
    for variable, wanted in env.items():
        if wanted is None:
            os.environ.pop(variable, None)
        else:
            os.environ[variable] = wanted

459 

460 

def assemble_deb(
    materialized_deb_structure: str,
    method: str,
    output_path: Optional[str],
    combined_materialization_and_assembly: bool,
) -> None:
    """Assemble a deb from a previously materialized deb structure.

    The environment and dpkg-deb options recorded in ``env-and-cli.json`` are
    restored before the assembly command is run.

    :param materialized_deb_structure: Directory holding ``deb-root``,
      ``env-and-cli.json`` and ``deb-structure-intermediate-manifest.json``.
    :param method: Either ``"dpkg-deb"`` or ``"debputy"``.
    :param output_path: Where to place the deb; None selects the default location.
    :param combined_materialization_and_assembly: True when the structure was
      materialized by the same invocation.  With ``dpkg-deb`` the file system metadata
      is then only re-applied if root is needed.
    """
    deb_root = os.path.join(materialized_deb_structure, "deb-root")

    settings_path = os.path.join(materialized_deb_structure, "env-and-cli.json")
    with open(settings_path, "r") as settings_fd:
        settings = json.load(settings_fd)

    env = settings.get("env") or {}
    cli = settings.get("cli") or {}
    is_udeb = settings.get("udeb")
    source_date_epoch = env.get("SOURCE_DATE_EPOCH")
    dpkg_deb_options = cli.get("dpkg-deb") or []

    stored_manifest = TarMember.parse_intermediate_manifest(
        os.path.join(
            materialized_deb_structure, "deb-structure-intermediate-manifest.json"
        )
    )
    _info(
        "Rebasing relative paths in the intermediate manifest so they are relative to current working directory ..."
    )
    intermediate_manifest = []
    for stored_member in stored_manifest:
        fs_path = stored_member.fs_path
        if fs_path is not None and not fs_path.startswith("/"):
            stored_member = stored_member.clone_and_replace(
                fs_path=os.path.join(materialized_deb_structure, fs_path)
            )
        intermediate_manifest.append(stored_member)

    # Only the "debputy" method consumes the manifest as a file.
    materialized_manifest = intermediate_manifest if method == "debputy" else None

    if source_date_epoch is None:
        _error(
            "Cannot reproduce the deb. No source date epoch provided in the materialized deb root."
        )
    _apply_env(env)

    output = _prep_assembled_deb_output_path(
        output_path,
        materialized_deb_structure,
        deb_root,
        method,
        is_udeb,
    )

    # The command must run inside the context: the temporary manifest file is
    # only available while the context is active.
    with maybe_with_materialized_manifest(materialized_manifest) as tmp_file:
        if method == "dpkg-deb":
            deb_cmd, needs_root, is_using_fakeroot = _dpkg_deb_root_requirements(
                intermediate_manifest
            )
            if needs_root or not combined_materialization_and_assembly:
                for tar_member in reversed(intermediate_manifest):
                    member_fs_path = os.path.join(
                        deb_root, strip_path_prefix(tar_member.member_path)
                    )
                    apply_fs_metadata(
                        member_fs_path, tar_member, needs_root, is_using_fakeroot
                    )
        elif method == "debputy":
            deb_packer = os.path.join(DEBPUTY_ROOT_DIR, "deb_packer.py")
            assert tmp_file is not None
            deb_cmd = [
                deb_packer,
                "--intermediate-package-manifest",
                tmp_file,
                "--source-date-epoch",
                source_date_epoch,
            ]
        else:
            _error(f"Internal error: Unsupported assembly method: {method}")

        if is_udeb:
            deb_cmd.extend(["-z6", "-Zxz", "-Sextreme"])
        deb_cmd.extend(dpkg_deb_options)
        deb_cmd.extend(["--build", deb_root, output])

        started_at = datetime.now()
        _run(deb_cmd)
        finished_at = datetime.now()
        _info(f" - assembly command took {finished_at - started_at}")

546 

547 

def parse_manifest(manifest_path: "Optional[str]") -> "List[TarMember]":
    """Load the intermediate manifest stored at *manifest_path*.

    :param manifest_path: Path to the manifest; None is reported through ``_error``
      because the manifest is currently mandatory.
    :return: The members parsed by ``TarMember.parse_intermediate_manifest``.
    """
    manifest_missing = manifest_path is None
    if manifest_missing:
        _error("--intermediate-package-manifest is mandatory for now")
    return TarMember.parse_intermediate_manifest(manifest_path)

552 

553 

def main() -> None:
    """Command line entry point: parse the arguments and run the chosen command.

    ``materialize-deb`` materializes the deb structure (after optionally removing an
    existing output path) and assembles it right away when a build method was
    given; ``build-materialized-deb`` only performs the assembly.
    """
    setup_logging()
    args = parse_args()
    command = args.command

    if command == "build-materialized-deb":
        assemble_deb(
            args.materialized_deb_root_dir,
            args.build_method,
            args.output,
            False,
        )
        return

    if command != "materialize-deb":
        _error(f'Internal error: Unimplemented command "{command}"')
        return

    source_date_epoch = resolve_source_date_epoch(args.source_date_epoch)
    dpkg_deb_args = args.upstream_args or []
    output_dir = args.materialization_output

    if os.path.exists(output_dir):
        if not args.discard_existing_output:
            _error(
                "The output path already exists. Please either choose a non-existing path, delete the path"
                " or use --discard-existing-output (to have this command remove it as necessary)."
            )
        _info(
            f'Removing existing path "{output_dir}" as requested by --discard-existing-output'
        )
        _run(["rm", "-fr", output_dir])

    materialize_deb(
        args.control_root_dir,
        args.package_manifest,
        source_date_epoch,
        dpkg_deb_args,
        args.udeb,
        output_dir,
        args.may_move_control_files,
        args.may_move_data_files,
    )

    if args.build_method is not None:
        # Materialization and assembly happen in the same invocation.
        assemble_deb(
            output_dir,
            args.build_method,
            args.assembled_deb_output,
            True,
        )

600 

601 

# Run the command line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()