Coverage for src/debputy/dh_migration/migration.py: 8%

289 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-10-12 15:06 +0000

1import json 

2import os 

3import re 

4import subprocess 

5import sys 

6from itertools import chain 

7from typing import ( 

8 Optional, 

9 List, 

10 Set, 

11 FrozenSet, 

12 Tuple, 

13) 

14from collections.abc import Callable, Mapping, Iterable, Container 

15 

16from debian.deb822 import Deb822 

17 

18from debputy.commands.debputy_cmd.context import CommandContext 

19from debputy.commands.debputy_cmd.output import IOBasedOutputStyling 

20from debputy.dh.debhelper_emulation import CannotEmulateExecutableDHConfigFile 

21from debputy.dh.dh_assistant import read_dh_addon_sequences 

22from debputy.dh_migration.migrators import Migrator, MigrationTarget 

23from debputy.dh_migration.migrators_impl import ( 

24 INTEGRATION_MODE_DH_DEBPUTY, 

25 INTEGRATION_MODE_DH_DEBPUTY_RRR, 

26) 

27from debputy.dh_migration.models import ( 

28 FeatureMigration, 

29 AcceptableMigrationIssues, 

30 UnsupportedFeature, 

31 ConflictingChange, 

32) 

33from debputy.highlevel_manifest import HighLevelManifest 

34from debputy.integration_detection import determine_debputy_integration_mode 

35from debputy.manifest_parser.exceptions import ManifestParseException 

36from debputy.plugin.api import VirtualPath 

37from debputy.plugin.api.spec import DebputyIntegrationMode, INTEGRATION_MODE_FULL 

38from debputy.util import _error, _warn, _info, escape_shell, assume_not_none 

39 

40SUPPORTED_MIGRATIONS: Mapping[MigrationTarget, frozenset[MigrationTarget]] = { 

41 INTEGRATION_MODE_FULL: frozenset([INTEGRATION_MODE_FULL]), 

42 INTEGRATION_MODE_DH_DEBPUTY: frozenset( 

43 [ 

44 "dh-single-to-multi-binary", 

45 "dh-package-prefixed-config-files", 

46 INTEGRATION_MODE_DH_DEBPUTY, 

47 INTEGRATION_MODE_FULL, 

48 ] 

49 ), 

50 INTEGRATION_MODE_DH_DEBPUTY_RRR: frozenset( 

51 [ 

52 "dh-single-to-multi-binary", 

53 "dh-package-prefixed-config-files", 

54 INTEGRATION_MODE_DH_DEBPUTY_RRR, 

55 INTEGRATION_MODE_DH_DEBPUTY, 

56 INTEGRATION_MODE_FULL, 

57 ] 

58 ), 

59} 

60 

61 

62def _print_migration_summary( 

63 fo: IOBasedOutputStyling, 

64 migrations: list[FeatureMigration], 

65 compat: int, 

66 min_compat_level: int, 

67 required_plugins: set[str], 

68 requested_plugins: set[str] | None, 

69) -> None: 

70 warning_count = 0 

71 

72 for migration in migrations: 

73 if not migration.anything_to_do: 

74 continue 

75 underline = "-" * len(migration.tagline) 

76 if migration.warnings: 

77 if warning_count: 

78 _warn("") 

79 _warn(f"Summary for migration: {migration.tagline}") 

80 if not fo.optimize_for_screen_reader: 

81 _warn(f"-----------------------{underline}") 

82 warning_count += len(migration.warnings) 

83 for warning in migration.warnings: 

84 _warn(f" * {warning}") 

85 

86 if compat < min_compat_level: 

87 if warning_count: 

88 _warn("") 

89 _warn("Supported debhelper compat check") 

90 if not fo.optimize_for_screen_reader: 

91 _warn("--------------------------------") 

92 warning_count += 1 

93 _warn( 

94 f"The migration tool assumes debhelper compat {min_compat_level}+ semantics, but this package" 

95 f" is using compat {compat}. Consider upgrading the package to compat {min_compat_level}" 

96 " first." 

97 ) 

98 

99 if required_plugins: 

100 if requested_plugins is None: 

101 warning_count += 1 

102 needed_plugins = ", ".join(f"debputy-plugin-{n}" for n in required_plugins) 

103 if warning_count: 

104 _warn("") 

105 _warn("Missing debputy plugin check") 

106 if not fo.optimize_for_screen_reader: 

107 _warn("----------------------------") 

108 _warn( 

109 f"The migration tool could not read d/control and therefore cannot tell if all the required" 

110 f" plugins have been requested. Please ensure that the package Build-Depends on: {needed_plugins}" 

111 ) 

112 else: 

113 missing_plugins = required_plugins - requested_plugins 

114 if missing_plugins: 

115 warning_count += 1 

116 needed_plugins = ", ".join( 

117 f"debputy-plugin-{n}" for n in missing_plugins 

118 ) 

119 if warning_count: 

120 _warn("") 

121 _warn("Missing debputy plugin check") 

122 if not fo.optimize_for_screen_reader: 

123 _warn("----------------------------") 

124 _warn( 

125 f"The migration tool asserted that the following `debputy` plugins would be required, which" 

126 f" are not explicitly requested. Please add the following to Build-Depends: {needed_plugins}" 

127 ) 

128 

129 if warning_count: 

130 _warn("") 

131 _warn( 

132 f"/!\\ Total number of warnings or manual migrations required: {warning_count}" 

133 ) 

134 

135 

136def _dh_compat_level() -> int | None: 

137 try: 

138 res = subprocess.check_output( 

139 ["dh_assistant", "active-compat-level"], stderr=subprocess.DEVNULL 

140 ) 

141 except subprocess.CalledProcessError: 

142 compat = None 

143 else: 

144 try: 

145 compat = json.loads(res)["declared-compat-level"] 

146 except RuntimeError: 

147 compat = None 

148 else: 

149 if not isinstance(compat, int): 

150 compat = None 

151 return compat 

152 

153 

154def _requested_debputy_plugins(debian_dir: VirtualPath) -> set[str] | None: 

155 ctrl_file = debian_dir.get("control") 

156 if not ctrl_file: 

157 return None 

158 

159 dep_regex = re.compile("^([a-z0-9][-+.a-z0-9]+)", re.ASCII) 

160 plugins = set() 

161 

162 with ctrl_file.open() as fd: 

163 ctrl = list(Deb822.iter_paragraphs(fd)) 

164 source_paragraph = ctrl[0] if ctrl else {} 

165 

166 for f in ("Build-Depends", "Build-Depends-Indep", "Build-Depends-Arch"): 

167 field = source_paragraph.get(f) 

168 if not field: 

169 continue 

170 

171 for dep_clause in (d.strip() for d in field.split(",")): 

172 match = dep_regex.match(dep_clause.strip()) 

173 if not match: 

174 continue 

175 dep = match.group(1) 

176 if not dep.startswith("debputy-plugin-"): 

177 continue 

178 plugins.add(dep[15:]) 

179 return plugins 

180 

181 

182def _check_migration_target( 

183 context: CommandContext, 

184 migration_target: DebputyIntegrationMode | None, 

185) -> DebputyIntegrationMode: 

186 r = read_dh_addon_sequences(context.debian_dir) 

187 if r is not None: 

188 bd_sequences, dr_sequences, _ = r 

189 all_sequences = bd_sequences | dr_sequences 

190 detected_migration_target = determine_debputy_integration_mode( 

191 context.source_package().fields, 

192 all_sequences, 

193 ) 

194 else: 

195 detected_migration_target = None 

196 

197 if migration_target is not None and detected_migration_target is not None: 

198 supported_migrations = SUPPORTED_MIGRATIONS.get( 

199 detected_migration_target, 

200 frozenset([detected_migration_target]), 

201 ) 

202 

203 if ( 

204 migration_target != detected_migration_target 

205 and migration_target not in supported_migrations 

206 ): 

207 _error( 

208 f"Cannot migrate apply migration {migration_target} as it conflicts with the current state (which is {detected_migration_target})" 

209 ) 

210 

211 if migration_target is not None: 

212 resolved_migration_target = migration_target 

213 _info(f'Using "{resolved_migration_target}" as migration (requested)') 

214 else: 

215 if detected_migration_target is not None: 

216 _info( 

217 f'Using "{detected_migration_target}" as migration (based on the packaging)' 

218 ) 

219 else: 

220 detected_migration_target = INTEGRATION_MODE_DH_DEBPUTY 

221 _info( 

222 f'Using "{detected_migration_target}" as default migration target. Use --migration-target to choose!' 

223 ) 

224 resolved_migration_target = detected_migration_target 

225 

226 return resolved_migration_target 

227 

228 

229def _read_git_status_lines( 

230 lines: Iterable[bytes], 

231) -> Iterable[tuple[str, str, str | None]]: 

232 line_iter = iter(lines) 

233 while True: 

234 try: 

235 line = next(line_iter) 

236 except StopIteration: 

237 return 

238 if not line: 

239 # We get a final empty line 

240 continue 

241 if len(line) < 4: 

242 _error( 

243 f"Internal error: Got out of sync with the `git status --porcelain=v1 -z` output ({len(line)=})" 

244 ) 

245 status_marker = line[0:2].decode("utf-8") 

246 if line[2] != 32: # Space 

247 _error( 

248 "Internal error: Got out of sync with the `git status --porcelain=v1 -z` output (expected status + filename)" 

249 ) 

250 if status_marker.startswith(("R", "C")) or status_marker.endswith(("R", "C")): 

251 try: 

252 second_filename = next(line_iter).decode("utf-8") 

253 except StopIteration: 

254 _error( 

255 "Internal error: Expected one more line of output from `git status` but it was not there" 

256 ) 

257 else: 

258 second_filename = None 

259 filename = line[3:].decode("utf-8") 

260 yield status_marker, filename, second_filename 

261 

262 

263def _git_status() -> tuple[bool, Container[str]]: 

264 try: 

265 top_level = ( 

266 subprocess.check_output( 

267 ["git", "rev-parse", "--show-toplevel"], 

268 stderr=subprocess.DEVNULL, 

269 cwd="debian", 

270 ) 

271 .strip() 

272 .decode("utf-8") 

273 ) 

274 except (subprocess.CalledProcessError, FileNotFoundError): 

275 return False, frozenset() 

276 

277 if os.path.realpath(os.getcwd()) != os.path.realpath(top_level): 

278 # Patches welcome. The primary problem is to have the status output match the `_check_vcs_clashes` method. 

279 _error( 

280 f"Unsupported git repo: The `debputy` command only supports `git` when cwd is the git root (git root is: {top_level}). Please use --ignore-vcs to continue." 

281 ) 

282 

283 try: 

284 r = subprocess.check_output( 

285 ["git", "status", "--porcelain=v1", "-z"], 

286 stderr=subprocess.DEVNULL, 

287 cwd="debian", 

288 ) 

289 except subprocess.CalledProcessError: 

290 _error( 

291 "The `git status --porcelain=v1 -z` command returned non-zero (note: the command outputs binary). Please use `--ignore-vcs` to continue anyway." 

292 ) 

293 except FileNotFoundError: 

294 _error( 

295 "Could not run `git status` and there is a `.git` directory. Please use `--ignore-vcs` to continue anyway." 

296 ) 

297 

298 untracked_files = set() 

299 for status_marker, filename, second_filename in _read_git_status_lines( 

300 r.split(b"\0") 

301 ): 

302 if status_marker not in ("??", "!!"): 

303 _error( 

304 "The current git tree is not clean, please commit or stash any pending changes or use `--ignore-vcs` to continue anyway." 

305 ) 

306 # There is never a second file for `??` or `!!` 

307 assert second_filename is None 

308 untracked_files.add(filename) 

309 return True, untracked_files 

310 

311 

312def _check_vcs_clashes( 

313 manifest: HighLevelManifest, 

314 migrations: list[FeatureMigration], 

315 untracked_files: Container[str], 

316) -> None: 

317 all_affected_files = [] 

318 if any(m.successful_manifest_changes for m in migrations): 

319 all_affected_files.append(manifest.manifest_path) 

320 

321 for previous_path, new_path in ( 

322 p for m in migrations for p in m.rename_paths_on_success 

323 ): 

324 all_affected_files.append(previous_path) 

325 all_affected_files.append(new_path) 

326 

327 all_affected_files.extend(p for m in migrations for p in m.remove_paths_on_success) 

328 

329 clashing_paths = [p for p in all_affected_files if p in untracked_files] 

330 

331 if clashing_paths: 

332 print(file=sys.stderr) 

333 _warn( 

334 "The following untracked or ignored paths would be affected by this migration" 

335 ) 

336 _warn("") 

337 for path in clashing_paths: 

338 _warn(f" * {path}") 

339 _warn("") 

340 _warn("Since they are not tracked, the VCS cannot undo the changes to them") 

341 _warn("if the migration continued") 

342 print(file=sys.stderr) 

343 _error( 

344 "Please stash, commit or remove the above files before continuing. Alternatively, use --ignore-vcs" 

345 ) 

346 

347 

348def perform_migration( 

349 fo: IOBasedOutputStyling, 

350 manifest: HighLevelManifest, 

351 acceptable_migration_issues: AcceptableMigrationIssues, 

352 permit_destructive_changes: bool | None, 

353 migration_target: DebputyIntegrationMode, 

354 manifest_parser_factory: Callable[[str], HighLevelManifest], 

355 migrators: list[Migrator], 

356 *, 

357 ignore_vcs: bool = False, 

358) -> None: 

359 migrations = [] 

360 compat = _dh_compat_level() 

361 if compat is None: 

362 _error( 

363 'Cannot detect declared compat level (try running "dh_assistant active-compat-level")' 

364 ) 

365 

366 debian_dir = manifest.debian_dir 

367 mutable_manifest = assume_not_none(manifest.mutable_manifest) 

368 

369 if not ignore_vcs: 

370 uses_git, untracked_files = _git_status() 

371 else: 

372 uses_git = False 

373 untracked_files = frozenset() 

374 

375 if permit_destructive_changes is None and uses_git: 

376 permit_destructive_changes = True 

377 

378 try: 

379 for migrator in migrators: 

380 feature_migration = FeatureMigration(migrator.__name__, fo) 

381 migrator( 

382 debian_dir, 

383 manifest, 

384 acceptable_migration_issues, 

385 feature_migration, 

386 migration_target, 

387 ) 

388 migrations.append(feature_migration) 

389 except CannotEmulateExecutableDHConfigFile as e: 

390 _error( 

391 f"Unable to process the executable dh config file {e.config_file().fs_path}: {e.message()}" 

392 ) 

393 except UnsupportedFeature as e: 

394 msg = ( 

395 f"Unable to migrate automatically due to missing features in debputy. The feature is:" 

396 f"\n\n * {e.message}" 

397 ) 

398 keys = e.issue_keys 

399 if keys: 

400 primary_key = keys[0] 

401 alt_keys = "" 

402 if len(keys) > 1: 

403 alt_keys = ( 

404 f' Alternatively you can also use one of: {", ".join(keys[1:])}. Please note that some' 

405 " of these may cover more cases." 

406 ) 

407 msg += ( 

408 f"\n\nUse --acceptable-migration-issues={primary_key} to convert this into a warning and try again." 

409 " However, you should only do that if you believe you can replace the functionality manually" 

410 f" or the usage is obsolete / can be removed. {alt_keys}" 

411 ) 

412 _error(msg) 

413 except ConflictingChange as e: 

414 _error( 

415 "The migration tool detected a conflict data being migrated and data already migrated / in the existing" 

416 "manifest." 

417 f"\n\n * {e.message}" 

418 "\n\nPlease review the situation and resolve the conflict manually." 

419 ) 

420 

421 min_compat = max( 

422 (m.assumed_compat for m in migrations if m.assumed_compat is not None), 

423 default=0, 

424 ) 

425 

426 if compat < min_compat and "min-compat-level" not in acceptable_migration_issues: 

427 # The migration summary special-cases the compat mismatch and warns for us. 

428 _error( 

429 f"The migration tool assumes debhelper compat {min_compat} or later but the package is only on" 

430 f" compat {compat}. This may lead to incorrect result." 

431 f"\n\nUse --acceptable-migration-issues=min-compat-level to convert this into a warning and" 

432 f" try again, if you want to continue regardless." 

433 ) 

434 

435 requested_plugins = _requested_debputy_plugins(debian_dir) 

436 required_plugins: set[str] = set() 

437 required_plugins.update( 

438 chain.from_iterable( 

439 m.required_plugins for m in migrations if m.required_plugins 

440 ) 

441 ) 

442 

443 if uses_git: 

444 _check_vcs_clashes(manifest, migrations, untracked_files) 

445 

446 _print_migration_summary( 

447 fo, 

448 migrations, 

449 compat, 

450 min_compat, 

451 required_plugins, 

452 requested_plugins, 

453 ) 

454 migration_count = sum((m.performed_changes for m in migrations), 0) 

455 

456 if not migration_count: 

457 _info( 

458 "debputy was not able to find any (supported) migrations that it could perform for you." 

459 ) 

460 return 

461 

462 if any(m.successful_manifest_changes for m in migrations): 

463 new_manifest_path = manifest.manifest_path + ".new" 

464 

465 with open(new_manifest_path, "w") as fd: 

466 mutable_manifest.write_to(fd) 

467 

468 try: 

469 _info("Verifying the new manifest") 

470 manifest_parser_factory(new_manifest_path) 

471 except ManifestParseException as e: 

472 raise AssertionError( 

473 "Could not parse the manifest generated from the migrator" 

474 ) from e 

475 

476 if permit_destructive_changes: 

477 if os.path.isfile(manifest.manifest_path): 

478 os.rename(manifest.manifest_path, manifest.manifest_path + ".orig") 

479 os.rename(new_manifest_path, manifest.manifest_path) 

480 if uses_git: 

481 subprocess.check_call(["git", "add", manifest.manifest_path]) 

482 _info(f"Updated manifest {manifest.manifest_path}") 

483 else: 

484 _info( 

485 f'Created draft manifest "{new_manifest_path}" (rename to "{manifest.manifest_path}"' 

486 " to activate it)" 

487 ) 

488 else: 

489 _info("No manifest changes detected; skipping update of manifest.") 

490 

491 removals: int = sum((len(m.remove_paths_on_success) for m in migrations), 0) 

492 renames: int = sum((len(m.rename_paths_on_success) for m in migrations), 0) 

493 

494 if renames: 

495 if permit_destructive_changes: 

496 _info("Paths being renamed:") 

497 else: 

498 _info("Migration *would* rename the following paths:") 

499 prefix = "git " if uses_git else "" 

500 for previous_path, new_path in ( 

501 p for m in migrations for p in m.rename_paths_on_success 

502 ): 

503 _info(f" {prefix}mv {escape_shell(previous_path, new_path)}") 

504 

505 if removals: 

506 if permit_destructive_changes: 

507 _info("Removals:") 

508 else: 

509 _info("Migration *would* remove the following files:") 

510 cmd = "git rm" if uses_git else "rm -f" 

511 for path in (p for m in migrations for p in m.remove_paths_on_success): 

512 _info(f" {cmd} {escape_shell(path)}") 

513 

514 if permit_destructive_changes is None: 

515 print() 

516 _info( 

517 "If you would like to perform the migration, please re-run with --apply-changes." 

518 ) 

519 elif permit_destructive_changes: 

520 _apply_renames_and_removals(migrations, uses_git) 

521 

522 print() 

523 _info("Migrations performed successfully") 

524 print() 

525 _info( 

526 "Remember to validate the resulting binary packages after rebuilding with debputy" 

527 ) 

528 if uses_git: 

529 _info( 

530 "Use `git commit` to commit the changes or use `git reset` / `git restore` to undo them." 

531 ) 

532 else: 

533 print() 

534 _info("No migrations performed as requested") 

535 

536 

537def _apply_renames_and_removals( 

538 migrations: list[FeatureMigration], 

539 uses_git: bool, 

540) -> None: 

541 for previous_path, new_path in ( 

542 p for m in migrations for p in m.rename_paths_on_success 

543 ): 

544 if uses_git: 

545 subprocess.check_call(["git", "mv", previous_path, new_path]) 

546 else: 

547 os.rename(previous_path, new_path) 

548 

549 files_being_removed = [p for m in migrations for p in m.remove_paths_on_success] 

550 if uses_git and files_being_removed: 

551 command = ["git", "rm"] 

552 command.extend(files_being_removed) 

553 subprocess.check_call(command) 

554 elif files_being_removed: 

555 for path in (p for m in migrations for p in m.remove_paths_on_success): 

556 os.unlink(path)