Coverage for src/debputy/dh_migration/migration.py: 7%

289 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-09-15 18:18 +0000

1import json 

2import os 

3import re 

4import subprocess 

5import sys 

6from itertools import chain 

7from typing import ( 

8 Optional, 

9 List, 

10 Callable, 

11 Set, 

12 Mapping, 

13 FrozenSet, 

14 Tuple, 

15 Iterable, 

16 Container, 

17) 

18 

19from debian.deb822 import Deb822 

20 

21from debputy.commands.debputy_cmd.context import CommandContext 

22from debputy.commands.debputy_cmd.output import IOBasedOutputStyling 

23from debputy.dh.debhelper_emulation import CannotEmulateExecutableDHConfigFile 

24from debputy.dh.dh_assistant import read_dh_addon_sequences 

25from debputy.dh_migration.migrators import Migrator, MigrationTarget 

26from debputy.dh_migration.migrators_impl import ( 

27 INTEGRATION_MODE_DH_DEBPUTY, 

28 INTEGRATION_MODE_DH_DEBPUTY_RRR, 

29) 

30from debputy.dh_migration.models import ( 

31 FeatureMigration, 

32 AcceptableMigrationIssues, 

33 UnsupportedFeature, 

34 ConflictingChange, 

35) 

36from debputy.highlevel_manifest import HighLevelManifest 

37from debputy.integration_detection import determine_debputy_integration_mode 

38from debputy.manifest_parser.exceptions import ManifestParseException 

39from debputy.plugin.api import VirtualPath 

40from debputy.plugin.api.spec import DebputyIntegrationMode, INTEGRATION_MODE_FULL 

41from debputy.util import _error, _warn, _info, escape_shell, assume_not_none 

42 

# Keyed by the integration mode detected in the packaging; each value holds the
# migration targets that may be requested from that starting point.
SUPPORTED_MIGRATIONS: Mapping[MigrationTarget, FrozenSet[MigrationTarget]] = {
    INTEGRATION_MODE_FULL: frozenset({INTEGRATION_MODE_FULL}),
    INTEGRATION_MODE_DH_DEBPUTY: frozenset(
        {
            "dh-single-to-multi-binary",
            "dh-package-prefixed-config-files",
            INTEGRATION_MODE_DH_DEBPUTY,
            INTEGRATION_MODE_FULL,
        }
    ),
    INTEGRATION_MODE_DH_DEBPUTY_RRR: frozenset(
        {
            "dh-single-to-multi-binary",
            "dh-package-prefixed-config-files",
            INTEGRATION_MODE_DH_DEBPUTY_RRR,
            INTEGRATION_MODE_DH_DEBPUTY,
            INTEGRATION_MODE_FULL,
        }
    ),
}

63 

64 

def _print_migration_summary(
    fo: IOBasedOutputStyling,
    migrations: List[FeatureMigration],
    compat: int,
    min_compat_level: int,
    required_plugins: Set[str],
    requested_plugins: Optional[Set[str]],
) -> None:
    """Emit the warning summary for a migration run via `_warn`.

    Up to three kinds of sections are printed, followed by a total:

    * one section per migration that has something to do and has warnings,
    * a debhelper compat section when `compat` is below `min_compat_level`,
    * a plugin section when required plugins are missing from the requested
      ones, or when `requested_plugins` is None (d/control was not readable).

    :param fo: Output styling; headings are only underlined when it is not
      optimizing for screen readers.
    :param migrations: The feature migrations to summarize.
    :param compat: The debhelper compat level of the package.
    :param min_compat_level: The compat level the migration assumes.
    :param required_plugins: Plugin names (without the `debputy-plugin-`
      prefix) that the migrations need.
    :param requested_plugins: Plugin names the package already requests, or
      None if that could not be determined.
    """
    warning_count = 0

    def _start_section(title: str) -> None:
        # A blank separator line is only wanted when an earlier section has
        # already printed something, so this must run before the counter is
        # incremented for the new section.
        if warning_count:
            _warn("")
        _warn(title)
        if not fo.optimize_for_screen_reader:
            _warn("-" * len(title))

    for migration in migrations:
        if not migration.anything_to_do or not migration.warnings:
            continue
        _start_section(f"Summary for migration: {migration.tagline}")
        warning_count += len(migration.warnings)
        for warning in migration.warnings:
            _warn(f" * {warning}")

    if compat < min_compat_level:
        _start_section("Supported debhelper compat check")
        warning_count += 1
        _warn(
            f"The migration tool assumes debhelper compat {min_compat_level}+ semantics, but this package"
            f" is using compat {compat}. Consider upgrading the package to compat {min_compat_level}"
            " first."
        )

    if required_plugins:
        if requested_plugins is None:
            # Without d/control we cannot tell what is requested; list everything.
            plugins_to_report = required_plugins
            explanation = (
                "The migration tool could not read d/control and therefore cannot tell if all the required"
                " plugins have been requested. Please ensure that the package Build-Depends on:"
            )
        else:
            plugins_to_report = required_plugins - requested_plugins
            explanation = (
                "The migration tool asserted that the following `debputy` plugins would be required, which"
                " are not explicitly requested. Please add the following to Build-Depends:"
            )
        if plugins_to_report:
            # Sorted so the message is stable between runs (sets are unordered).
            needed_plugins = ", ".join(
                f"debputy-plugin-{n}" for n in sorted(plugins_to_report)
            )
            _start_section("Missing debputy plugin check")
            warning_count += 1
            _warn(f"{explanation} {needed_plugins}")

    if warning_count:
        _warn("")
        _warn(
            f"/!\\ Total number of warnings or manual migrations required: {warning_count}"
        )

137 

138 

def _dh_compat_level() -> Optional[int]:
    """Return the declared debhelper compat level, or None if it is unavailable.

    Runs `dh_assistant active-compat-level` and reads the
    `declared-compat-level` key from its JSON output.

    :return: The compat level as an int. None when the command cannot be
      started, exits non-zero, prints something that is not the expected JSON
      document, or reports a non-integer level.
    """
    try:
        res = subprocess.check_output(
            ["dh_assistant", "active-compat-level"], stderr=subprocess.DEVNULL
        )
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: non-zero exit. OSError: the command could not be
        # started at all (e.g. dh_assistant is not installed).
        return None

    try:
        compat = json.loads(res)["declared-compat-level"]
    except (ValueError, KeyError, TypeError):
        # ValueError covers malformed JSON / undecodable bytes, KeyError a
        # missing key and TypeError a top-level document that is not a mapping.
        return None

    return compat if isinstance(compat, int) else None

155 

156 

def _requested_debputy_plugins(debian_dir: VirtualPath) -> Optional[Set[str]]:
    """Collect the debputy plugins named in the source build-dependencies.

    The first paragraph of the `control` file is inspected. For every clause
    in `Build-Depends`, `Build-Depends-Indep` and `Build-Depends-Arch` whose
    leading package name starts with `debputy-plugin-`, the remainder of the
    name is recorded.

    :param debian_dir: Directory in which the `control` file is looked up.
    :return: The set of plugin names (prefix stripped), or None when
      `debian_dir` has no `control` entry.
    """
    control_path = debian_dir.get("control")
    if not control_path:
        return None

    plugin_prefix = "debputy-plugin-"
    # Only the package name at the very start of a clause is considered.
    package_name_re = re.compile("^([a-z0-9][-+.a-z0-9]+)", re.ASCII)
    found = set()

    with control_path.open() as fd:
        paragraphs = list(Deb822.iter_paragraphs(fd))
    source_stanza = paragraphs[0] if paragraphs else {}

    for field_name in ("Build-Depends", "Build-Depends-Indep", "Build-Depends-Arch"):
        raw_value = source_stanza.get(field_name)
        if not raw_value:
            continue
        for clause in raw_value.split(","):
            name_match = package_name_re.match(clause.strip())
            if name_match is None:
                continue
            package_name = name_match.group(1)
            if package_name.startswith(plugin_prefix):
                found.add(package_name[len(plugin_prefix) :])
    return found

183 

184 

def _check_migration_target(
    context: CommandContext,
    migration_target: Optional[DebputyIntegrationMode],
) -> DebputyIntegrationMode:
    """Resolve which migration target to use and report the choice via `_info`.

    The integration mode is detected from the dh addon sequences when those
    can be read. An explicitly requested target is used as-is, after
    `_error` has been called if it differs from the detected mode and is not
    listed for it in `SUPPORTED_MIGRATIONS`. Without a request, the detected
    mode is used, falling back to `INTEGRATION_MODE_DH_DEBPUTY`.

    :param context: Command context providing the debian dir and source package.
    :param migration_target: The requested target, or None to auto-select.
    :return: The resolved migration target.
    """
    detected: Optional[DebputyIntegrationMode] = None
    addon_sequences = read_dh_addon_sequences(context.debian_dir)
    if addon_sequences is not None:
        bd_sequences, dr_sequences, _ = addon_sequences
        all_sequences = bd_sequences | dr_sequences
        detected = determine_debputy_integration_mode(
            context.source_package().fields,
            all_sequences,
        )

    if migration_target is None:
        # Nothing requested: prefer what the packaging says, else the default.
        if detected is not None:
            _info(f'Using "{detected}" as migration (based on the packaging)')
            return detected
        fallback = INTEGRATION_MODE_DH_DEBPUTY
        _info(
            f'Using "{fallback}" as default migration target. Use --migration-target to choose!'
        )
        return fallback

    if detected is not None and migration_target != detected:
        allowed_targets = SUPPORTED_MIGRATIONS.get(detected, frozenset([detected]))
        if migration_target not in allowed_targets:
            _error(
                f"Cannot migrate apply migration {migration_target} as it conflicts with the current state (which is {detected})"
            )

    _info(f'Using "{migration_target}" as migration (requested)')
    return migration_target

230 

231 

def _read_git_status_lines(
    lines: Iterable[bytes],
) -> Iterable[Tuple[str, str, Optional[str]]]:
    """Decode NUL-split records of `git status --porcelain=v1 -z` output.

    Each non-empty record is expected to be a two-character status marker, a
    space and a filename. When the marker contains `R` or `C`, the following
    record is consumed as the second filename of that entry.

    :param lines: The raw records (empty records are skipped).
    :return: An iterable of `(status_marker, filename, second_filename)`,
      where `second_filename` is None unless the marker contains `R` or `C`.
    """
    records = iter(lines)
    for record in records:
        if not record:
            # We get a final empty line
            continue
        if len(record) < 4:
            _error(
                f"Internal error: Got out of sync with the `git status --porcelain=v1 -z` output ({len(record)=})"
            )
        status_marker = record[:2].decode("utf-8")
        if record[2] != 32:  # Space
            _error(
                "Internal error: Got out of sync with the `git status --porcelain=v1 -z` output (expected status + filename)"
            )
        if any(flag in status_marker for flag in ("R", "C")):
            # The second path of the entry arrives as its own record.
            try:
                second_filename = next(records).decode("utf-8")
            except StopIteration:
                _error(
                    "Internal error: Expected one more line of output from `git status` but it was not there"
                )
        else:
            second_filename = None
        filename = record[3:].decode("utf-8")
        yield status_marker, filename, second_filename

264 

265 

def _git_status() -> Tuple[bool, Container[str]]:
    """Check whether the package is in a clean git work tree.

    `git` is run with the `debian` directory as its working directory.

    :return: `(False, frozenset())` when the git top level cannot be
      determined (command fails or cannot be run). Otherwise `(True, paths)`
      where `paths` holds the filenames reported with the `??` or `!!` status
      markers. `_error` is called when the current directory is not the git
      top level, when `git status` fails, or when any other status marker is
      reported.
    """
    try:
        raw_top_level = subprocess.check_output(
            ["git", "rev-parse", "--show-toplevel"],
            stderr=subprocess.DEVNULL,
            cwd="debian",
        )
        top_level = raw_top_level.strip().decode("utf-8")
    except (subprocess.CalledProcessError, FileNotFoundError):
        return False, frozenset()

    here = os.path.realpath(os.getcwd())
    if here != os.path.realpath(top_level):
        # Patches welcome. The primary problem is to have the status output match the `_check_vcs_clashes` method.
        _error(
            f"Unsupported git repo: The `debputy` command only supports `git` when cwd is the git root (git root is: {top_level}). Please use --ignore-vcs to continue."
        )

    try:
        porcelain_output = subprocess.check_output(
            ["git", "status", "--porcelain=v1", "-z"],
            stderr=subprocess.DEVNULL,
            cwd="debian",
        )
    except subprocess.CalledProcessError:
        _error(
            "The `git status --porcelain=v1 -z` command returned non-zero (note: the command outputs binary). Please use `--ignore-vcs` to continue anyway."
        )
    except FileNotFoundError:
        _error(
            "Could not run `git status` and there is a `.git` directory. Please use `--ignore-vcs` to continue anyway."
        )

    untracked_or_ignored = set()
    entries = _read_git_status_lines(porcelain_output.split(b"\0"))
    for status_marker, filename, second_filename in entries:
        if status_marker != "??" and status_marker != "!!":
            _error(
                "The current git tree is not clean, please commit or stash any pending changes or use `--ignore-vcs` to continue anyway."
            )
        # There is never a second file for `??` or `!!`
        assert second_filename is None
        untracked_or_ignored.add(filename)
    return True, untracked_or_ignored

313 

314 

def _check_vcs_clashes(
    manifest: HighLevelManifest,
    migrations: List[FeatureMigration],
    untracked_files: Container[str],
) -> None:
    """Call `_error` if the migration would touch paths in `untracked_files`.

    The affected paths are the manifest (when any migration has successful
    manifest changes), both ends of every pending rename and every pending
    removal. Clashing paths are listed via `_warn` before `_error` is called.

    :param manifest: The manifest whose path may be rewritten.
    :param migrations: The feature migrations about to be applied.
    :param untracked_files: Paths that the VCS does not track.
    """
    affected_paths = []
    if any(m.successful_manifest_changes for m in migrations):
        affected_paths.append(manifest.manifest_path)

    for migration in migrations:
        for previous_path, new_path in migration.rename_paths_on_success:
            affected_paths.append(previous_path)
            affected_paths.append(new_path)

    for migration in migrations:
        affected_paths.extend(migration.remove_paths_on_success)

    clashing_paths = [p for p in affected_paths if p in untracked_files]
    if not clashing_paths:
        return

    print(file=sys.stderr)
    _warn(
        "The following untracked or ignored paths would be affected by this migration"
    )
    _warn("")
    for clashing_path in clashing_paths:
        _warn(f" * {clashing_path}")
    _warn("")
    _warn("Since they are not tracked, the VCS cannot undo the changes to them")
    _warn("if the migration continued")
    print(file=sys.stderr)
    _error(
        "Please stash, commit or remove the above files before continuing. Alternatively, use --ignore-vcs"
    )

349 

350 

def perform_migration(
    fo: IOBasedOutputStyling,
    manifest: HighLevelManifest,
    acceptable_migration_issues: AcceptableMigrationIssues,
    permit_destructive_changes: Optional[bool],
    migration_target: DebputyIntegrationMode,
    manifest_parser_factory: Callable[[str], HighLevelManifest],
    migrators: List[Migrator],
    *,
    ignore_vcs: bool = False,
) -> None:
    """Run the migrators and, if permitted, apply their changes.

    The steps are: detect the debhelper compat level, inspect the git state
    (unless `ignore_vcs`), run every migrator, check the compat level and VCS
    clashes, print the warning summary, write and verify the new manifest and
    finally report (and optionally perform) the pending renames and removals.

    :param fo: Output styling passed to each `FeatureMigration` and the summary.
    :param manifest: The manifest being migrated into.
    :param acceptable_migration_issues: Issues that are downgraded to warnings.
    :param permit_destructive_changes: True to apply changes, False to only
      report them. None means "not specified"; it becomes True when git is in
      use, otherwise nothing is applied and a hint is printed.
    :param migration_target: The integration mode passed to each migrator.
    :param manifest_parser_factory: Called with the path of the newly written
      manifest to verify that it can be parsed.
    :param migrators: The migrators to run, in order.
    :param ignore_vcs: When True, skip all git checks and git commands.
    :raises AssertionError: If the generated manifest fails to parse.
    """
    migrations = []
    compat = _dh_compat_level()
    if compat is None:
        _error(
            'Cannot detect declared compat level (try running "dh_assistant active-compat-level")'
        )

    debian_dir = manifest.debian_dir
    mutable_manifest = assume_not_none(manifest.mutable_manifest)

    if not ignore_vcs:
        uses_git, untracked_files = _git_status()
    else:
        uses_git = False
        untracked_files = frozenset()

    # With git available the changes can be undone, so default to applying them.
    if permit_destructive_changes is None and uses_git:
        permit_destructive_changes = True

    try:
        for migrator in migrators:
            feature_migration = FeatureMigration(migrator.__name__, fo)
            migrator(
                debian_dir,
                manifest,
                acceptable_migration_issues,
                feature_migration,
                migration_target,
            )
            migrations.append(feature_migration)
    except CannotEmulateExecutableDHConfigFile as e:
        _error(
            f"Unable to process the executable dh config file {e.config_file().fs_path}: {e.message()}"
        )
    except UnsupportedFeature as e:
        msg = (
            f"Unable to migrate automatically due to missing features in debputy. The feature is:"
            f"\n\n * {e.message}"
        )
        keys = e.issue_keys
        if keys:
            primary_key = keys[0]
            alt_keys = ""
            if len(keys) > 1:
                alt_keys = (
                    f' Alternatively you can also use one of: {", ".join(keys[1:])}. Please note that some'
                    " of these may cover more cases."
                )
            msg += (
                f"\n\nUse --acceptable-migration-issues={primary_key} to convert this into a warning and try again."
                " However, you should only do that if you believe you can replace the functionality manually"
                f" or the usage is obsolete / can be removed. {alt_keys}"
            )
        _error(msg)
    except ConflictingChange as e:
        _error(
            "The migration tool detected a conflict between data being migrated and data already migrated / in the"
            " existing manifest."
            f"\n\n * {e.message}"
            "\n\nPlease review the situation and resolve the conflict manually."
        )

    # We start on compat 12 for arch:all due to the new dh_makeshlibs and dh_installinit default
    # For arch:any, the min is compat 14 due to `dh_dwz` being removed.
    min_compat = 14 if any(not p.is_arch_all for p in manifest.all_packages) else 12
    # NOTE(review): when any migration declares an assumed compat, the highest of
    # those replaces the default above even if it is lower - confirm this is intended.
    min_compat = max(
        (m.assumed_compat for m in migrations if m.assumed_compat is not None),
        default=min_compat,
    )

    if compat < min_compat and "min-compat-level" not in acceptable_migration_issues:
        # The migration summary special-cases the compat mismatch and warns for us.
        _error(
            f"The migration tool assumes debhelper compat {min_compat} or later but the package is only on"
            f" compat {compat}. This may lead to incorrect result."
            f"\n\nUse --acceptable-migration-issues=min-compat-level to convert this into a warning and"
            f" try again, if you want to continue regardless."
        )

    requested_plugins = _requested_debputy_plugins(debian_dir)
    required_plugins: Set[str] = set()
    required_plugins.update(
        chain.from_iterable(
            m.required_plugins for m in migrations if m.required_plugins
        )
    )

    if uses_git:
        _check_vcs_clashes(manifest, migrations, untracked_files)

    _print_migration_summary(
        fo,
        migrations,
        compat,
        min_compat,
        required_plugins,
        requested_plugins,
    )
    migration_count = sum((m.performed_changes for m in migrations), 0)

    if not migration_count:
        _info(
            "debputy was not able to find any (supported) migrations that it could perform for you."
        )
        return

    if any(m.successful_manifest_changes for m in migrations):
        new_manifest_path = manifest.manifest_path + ".new"

        with open(new_manifest_path, "w") as fd:
            mutable_manifest.write_to(fd)

        # Re-parse the generated manifest before it replaces the real one.
        try:
            _info("Verifying the new manifest")
            manifest_parser_factory(new_manifest_path)
        except ManifestParseException as e:
            raise AssertionError(
                "Could not parse the manifest generated from the migrator"
            ) from e

        if permit_destructive_changes:
            if os.path.isfile(manifest.manifest_path):
                os.rename(manifest.manifest_path, manifest.manifest_path + ".orig")
            os.rename(new_manifest_path, manifest.manifest_path)
            if uses_git:
                subprocess.check_call(["git", "add", manifest.manifest_path])
            _info(f"Updated manifest {manifest.manifest_path}")
        else:
            _info(
                f'Created draft manifest "{new_manifest_path}" (rename to "{manifest.manifest_path}"'
                " to activate it)"
            )
    else:
        _info("No manifest changes detected; skipping update of manifest.")

    removals: int = sum((len(m.remove_paths_on_success) for m in migrations), 0)
    renames: int = sum((len(m.rename_paths_on_success) for m in migrations), 0)

    if renames:
        if permit_destructive_changes:
            _info("Paths being renamed:")
        else:
            _info("Migration *would* rename the following paths:")
        prefix = "git " if uses_git else ""
        for previous_path, new_path in (
            p for m in migrations for p in m.rename_paths_on_success
        ):
            _info(f" {prefix}mv {escape_shell(previous_path, new_path)}")

    if removals:
        if permit_destructive_changes:
            _info("Removals:")
        else:
            _info("Migration *would* remove the following files:")
        cmd = "git rm" if uses_git else "rm -f"
        for path in (p for m in migrations for p in m.remove_paths_on_success):
            _info(f" {cmd} {escape_shell(path)}")

    if permit_destructive_changes is None:
        print()
        _info(
            "If you would like to perform the migration, please re-run with --apply-changes."
        )
    elif permit_destructive_changes:
        _apply_renames_and_removals(migrations, uses_git)

        print()
        _info("Migrations performed successfully")
        print()
        _info(
            "Remember to validate the resulting binary packages after rebuilding with debputy"
        )
        if uses_git:
            _info(
                "Use `git commit` to commit the changes or use `git reset` / `git restore` to undo them."
            )
    else:
        print()
        _info("No migrations performed as requested")

541 

542 

def _apply_renames_and_removals(
    migrations: List[FeatureMigration],
    uses_git: bool,
) -> None:
    """Carry out the renames and removals recorded on the migrations.

    All renames are performed first, then all removals.

    :param migrations: Feature migrations providing `rename_paths_on_success`
      pairs and `remove_paths_on_success` paths.
    :param uses_git: When True, `git mv` / `git rm` are invoked; otherwise
      `os.rename` / `os.unlink` are used.
    """
    for migration in migrations:
        for previous_path, new_path in migration.rename_paths_on_success:
            if uses_git:
                subprocess.check_call(["git", "mv", previous_path, new_path])
            else:
                os.rename(previous_path, new_path)

    files_being_removed = [p for m in migrations for p in m.remove_paths_on_success]
    if not files_being_removed:
        return

    if uses_git:
        # One `git rm` invocation covering every path.
        subprocess.check_call(["git", "rm", *files_being_removed])
    else:
        for doomed_path in files_being_removed:
            os.unlink(doomed_path)