Coverage for src/debputy/dh_migration/migration.py: 8%

290 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-10-19 09:24 +0000

1import json 

2import os 

3import re 

4import subprocess 

5import sys 

6from itertools import chain 

7from typing import ( 

8 Optional, 

9 List, 

10 Set, 

11 FrozenSet, 

12 Tuple, 

13) 

14from collections.abc import Callable, Mapping, Iterable, Container 

15 

16from debian.deb822 import Deb822 

17 

18from debputy.commands.debputy_cmd.context import CommandContext 

19from debputy.commands.debputy_cmd.output import IOBasedOutputStyling 

20from debputy.dh.debhelper_emulation import CannotEmulateExecutableDHConfigFile 

21from debputy.dh.dh_assistant import read_dh_addon_sequences 

22from debputy.dh_migration.migrators import Migrator, MigrationTarget 

23from debputy.dh_migration.migrators_impl import ( 

24 INTEGRATION_MODE_DH_DEBPUTY, 

25 INTEGRATION_MODE_DH_DEBPUTY_RRR, 

26) 

27from debputy.dh_migration.models import ( 

28 FeatureMigration, 

29 AcceptableMigrationIssues, 

30 UnsupportedFeature, 

31 ConflictingChange, 

32 MigrationRequest, 

33) 

34from debputy.highlevel_manifest import HighLevelManifest 

35from debputy.integration_detection import determine_debputy_integration_mode 

36from debputy.manifest_parser.exceptions import ManifestParseException 

37from debputy.plugin.api import VirtualPath 

38from debputy.plugin.api.spec import DebputyIntegrationMode, INTEGRATION_MODE_FULL 

39from debputy.util import _error, _warn, _info, escape_shell, assume_not_none 

40 

# Keyed by the integration mode detected from the current packaging; each value
# is the set of migration targets that may be requested from that state.
# Detected modes that are not listed here only support "migrating" to
# themselves (see the fallback in `_check_migration_target`).
SUPPORTED_MIGRATIONS: Mapping[MigrationTarget, frozenset[MigrationTarget]] = {
    INTEGRATION_MODE_FULL: frozenset({INTEGRATION_MODE_FULL}),
    INTEGRATION_MODE_DH_DEBPUTY: frozenset(
        {
            "dh-single-to-multi-binary",
            "dh-package-prefixed-config-files",
            INTEGRATION_MODE_DH_DEBPUTY,
            INTEGRATION_MODE_FULL,
        }
    ),
    INTEGRATION_MODE_DH_DEBPUTY_RRR: frozenset(
        {
            "dh-single-to-multi-binary",
            "dh-package-prefixed-config-files",
            INTEGRATION_MODE_DH_DEBPUTY_RRR,
            INTEGRATION_MODE_DH_DEBPUTY,
            INTEGRATION_MODE_FULL,
        }
    ),
}

61 

62 

def _warn_summary_section_header(
    fo: IOBasedOutputStyling,
    title: str,
    *,
    separate_from_previous: bool,
) -> None:
    """Emit the heading of one summary section via `_warn`.

    :param fo: Output styling; the underline is skipped when it is optimized
      for screen readers.
    :param title: The heading text.
    :param separate_from_previous: Whether to emit an empty line first to
      separate this section from earlier output.
    """
    if separate_from_previous:
        _warn("")
    _warn(title)
    if not fo.optimize_for_screen_reader:
        # The underline is exactly as long as the heading it decorates.
        _warn("-" * len(title))


def _print_migration_summary(
    fo: IOBasedOutputStyling,
    migrations: list[FeatureMigration],
    compat: int,
    min_compat_level: int,
    required_plugins: set[str],
    requested_plugins: set[str] | None,
) -> None:
    """Emit all warnings collected during the migration as a sectioned summary.

    Sections are emitted (in order) for: each migration that has something to
    do and has warnings, a debhelper compat level below the assumed minimum,
    and required `debputy` plugins that are not known to be requested. An
    empty line separates a section from any previously emitted warnings, and a
    total is printed at the end when at least one warning was counted.

    :param fo: Output styling (controls whether headings are underlined).
    :param migrations: The feature migrations that were run.
    :param compat: The debhelper compat level of the package.
    :param min_compat_level: The compat level the migrators assumed.
    :param required_plugins: Plugin names (without the `debputy-plugin-`
      prefix) required by the migrations.
    :param requested_plugins: Plugin names the package already requests, or
      `None` when that could not be determined.
    """
    warning_count = 0

    for migration in migrations:
        if not migration.anything_to_do or not migration.warnings:
            continue
        _warn_summary_section_header(
            fo,
            f"Summary for migration: {migration.tagline}",
            separate_from_previous=warning_count > 0,
        )
        warning_count += len(migration.warnings)
        for warning in migration.warnings:
            _warn(f" * {warning}")

    if compat < min_compat_level:
        _warn_summary_section_header(
            fo,
            "Supported debhelper compat check",
            separate_from_previous=warning_count > 0,
        )
        warning_count += 1
        _warn(
            f"The migration tool assumes debhelper compat {min_compat_level}+ semantics, but this package"
            f" is using compat {compat}. Consider upgrading the package to compat {min_compat_level}"
            " first."
        )

    if required_plugins:
        if requested_plugins is None:
            # We could not tell what is requested, so report everything needed.
            plugins_to_report = required_plugins
            explanation = (
                "The migration tool could not read d/control and therefore cannot tell if all the required"
                " plugins have been requested. Please ensure that the package Build-Depends on: "
            )
        else:
            plugins_to_report = required_plugins - requested_plugins
            explanation = (
                "The migration tool asserted that the following `debputy` plugins would be required, which"
                " are not explicitly requested. Please add the following to Build-Depends: "
            )
        if plugins_to_report:
            # The separator decision must be made *before* counting this
            # warning; otherwise a blank line is emitted even when this is the
            # very first section of the summary.
            _warn_summary_section_header(
                fo,
                "Missing debputy plugin check",
                separate_from_previous=warning_count > 0,
            )
            warning_count += 1
            # Sorted to make the output deterministic (sets are unordered).
            needed_plugins = ", ".join(
                f"debputy-plugin-{n}" for n in sorted(plugins_to_report)
            )
            _warn(f"{explanation}{needed_plugins}")

    if warning_count:
        _warn("")
        _warn(
            f"/!\\ Total number of warnings or manual migrations required: {warning_count}"
        )

135 

136 

137def _dh_compat_level() -> int | None: 

138 try: 

139 res = subprocess.check_output( 

140 ["dh_assistant", "active-compat-level"], stderr=subprocess.DEVNULL 

141 ) 

142 except subprocess.CalledProcessError: 

143 compat = None 

144 else: 

145 try: 

146 compat = json.loads(res)["declared-compat-level"] 

147 except RuntimeError: 

148 compat = None 

149 else: 

150 if not isinstance(compat, int): 

151 compat = None 

152 return compat 

153 

154 

def _requested_debputy_plugins(debian_dir: VirtualPath) -> set[str] | None:
    """Collect the `debputy` plugins named in the build-dependencies.

    Looks at the first paragraph of the `control` file in `debian_dir` and
    scans the `Build-Depends`, `Build-Depends-Indep` and `Build-Depends-Arch`
    fields for packages named `debputy-plugin-<name>`.

    :param debian_dir: Directory expected to contain the `control` file.
    :return: The `<name>` part of every matching package, or `None` when
      there is no `control` file.
    """
    control_path = debian_dir.get("control")
    if not control_path:
        return None

    with control_path.open() as fd:
        paragraphs = list(Deb822.iter_paragraphs(fd))
    source_paragraph = paragraphs[0] if paragraphs else {}

    plugin_prefix = "debputy-plugin-"
    # Only the leading package name of each clause is of interest (any version
    # constraint, alternative or restriction after it is ignored).
    package_name_pattern = re.compile("^([a-z0-9][-+.a-z0-9]+)", re.ASCII)
    found = set()

    for field_name in ("Build-Depends", "Build-Depends-Indep", "Build-Depends-Arch"):
        raw_value = source_paragraph.get(field_name)
        if not raw_value:
            continue

        for clause in raw_value.split(","):
            hit = package_name_pattern.match(clause.strip())
            if not hit:
                continue
            package = hit.group(1)
            if package.startswith(plugin_prefix):
                found.add(package[len(plugin_prefix) :])
    return found

181 

182 

def _check_migration_target(
    context: CommandContext,
    migration_target: DebputyIntegrationMode | None,
) -> DebputyIntegrationMode:
    """Resolve which migration target to use.

    The current integration mode is detected from the dh add-on sequences of
    the package (when they can be read). An explicitly requested target wins,
    but it is rejected via `_error` when it differs from the detected mode and
    is not listed for that mode in `SUPPORTED_MIGRATIONS`. Without a request,
    the detected mode is used, falling back to `INTEGRATION_MODE_DH_DEBPUTY`.

    :param context: Command context providing the debian directory and the
      source package.
    :param migration_target: The requested target, or `None` to auto-detect.
    :return: The migration target to use.
    """
    sequences = read_dh_addon_sequences(context.debian_dir)
    detected_migration_target = None
    if sequences is not None:
        bd_sequences, dr_sequences, _ = sequences
        detected_migration_target = determine_debputy_integration_mode(
            context.source_package().fields,
            bd_sequences | dr_sequences,
        )

    if migration_target is not None:
        if (
            detected_migration_target is not None
            and migration_target != detected_migration_target
        ):
            # A detected mode without an entry only supports itself.
            supported_migrations = SUPPORTED_MIGRATIONS.get(
                detected_migration_target,
                frozenset([detected_migration_target]),
            )
            if migration_target not in supported_migrations:
                _error(
                    f"Cannot apply migration {migration_target} as it conflicts with the current state (which is {detected_migration_target})"
                )
        _info(f'Using "{migration_target}" as migration (requested)')
        return migration_target

    if detected_migration_target is not None:
        _info(
            f'Using "{detected_migration_target}" as migration (based on the packaging)'
        )
        return detected_migration_target

    _info(
        f'Using "{INTEGRATION_MODE_DH_DEBPUTY}" as default migration target. Use --migration-target to choose!'
    )
    return INTEGRATION_MODE_DH_DEBPUTY

228 

229 

230def _read_git_status_lines( 

231 lines: Iterable[bytes], 

232) -> Iterable[tuple[str, str, str | None]]: 

233 line_iter = iter(lines) 

234 while True: 

235 try: 

236 line = next(line_iter) 

237 except StopIteration: 

238 return 

239 if not line: 

240 # We get a final empty line 

241 continue 

242 if len(line) < 4: 

243 _error( 

244 f"Internal error: Got out of sync with the `git status --porcelain=v1 -z` output ({len(line)=})" 

245 ) 

246 status_marker = line[0:2].decode("utf-8") 

247 if line[2] != 32: # Space 

248 _error( 

249 "Internal error: Got out of sync with the `git status --porcelain=v1 -z` output (expected status + filename)" 

250 ) 

251 if status_marker.startswith(("R", "C")) or status_marker.endswith(("R", "C")): 

252 try: 

253 second_filename = next(line_iter).decode("utf-8") 

254 except StopIteration: 

255 _error( 

256 "Internal error: Expected one more line of output from `git status` but it was not there" 

257 ) 

258 else: 

259 second_filename = None 

260 filename = line[3:].decode("utf-8") 

261 yield status_marker, filename, second_filename 

262 

263 

def _git_status() -> tuple[bool, Container[str]]:
    """Check whether the package is in a clean `git` work tree.

    All `git` commands are run with `debian` as working directory. The call is
    aborted via `_error` when the current directory is not the git top-level,
    when `git status` cannot be run or when the tree has pending changes.

    :return: `(False, frozenset())` when `git rev-parse` fails or cannot be
      started; otherwise `(True, paths)` where `paths` are the files that
      `git status` reported with the `??` or `!!` marker.
    """
    try:
        raw_top_level = subprocess.check_output(
            ["git", "rev-parse", "--show-toplevel"],
            stderr=subprocess.DEVNULL,
            cwd="debian",
        )
    except (subprocess.CalledProcessError, FileNotFoundError):
        return False, frozenset()
    top_level = raw_top_level.strip().decode("utf-8")

    if os.path.realpath(os.getcwd()) != os.path.realpath(top_level):
        # Patches welcome. The primary problem is to have the status output match the `_check_vcs_clashes` method.
        _error(
            f"Unsupported git repo: The `debputy` command only supports `git` when cwd is the git root (git root is: {top_level}). Please use --ignore-vcs to continue."
        )

    try:
        status_output = subprocess.check_output(
            ["git", "status", "--porcelain=v1", "-z"],
            stderr=subprocess.DEVNULL,
            cwd="debian",
        )
    except subprocess.CalledProcessError:
        _error(
            "The `git status --porcelain=v1 -z` command returned non-zero (note: the command outputs binary). Please use `--ignore-vcs` to continue anyway."
        )
    except FileNotFoundError:
        _error(
            "Could not run `git status` and there is a `.git` directory. Please use `--ignore-vcs` to continue anyway."
        )

    untracked_files = set()
    records = status_output.split(b"\0")
    for status_marker, filename, second_filename in _read_git_status_lines(records):
        if status_marker not in ("??", "!!"):
            _error(
                "The current git tree is not clean, please commit or stash any pending changes or use `--ignore-vcs` to continue anyway."
            )
        # There is never a second file for `??` or `!!`
        assert second_filename is None
        untracked_files.add(filename)
    return True, untracked_files

311 

312 

def _check_vcs_clashes(
    manifest: HighLevelManifest,
    migrations: list[FeatureMigration],
    untracked_files: Container[str],
) -> None:
    """Abort when the migration would touch paths listed in `untracked_files`.

    The affected paths are the manifest itself (when any migration has
    successful manifest changes), both ends of every rename and every path to
    be removed. Any of those found in `untracked_files` is listed in a warning
    before `_error` is called.

    :param manifest: The manifest being migrated (provides the manifest path).
    :param migrations: The feature migrations that were run.
    :param untracked_files: Paths the VCS reported as untracked or ignored.
    """
    affected_paths = []
    if any(m.successful_manifest_changes for m in migrations):
        affected_paths.append(manifest.manifest_path)

    for migration in migrations:
        for previous_path, new_path in migration.rename_paths_on_success:
            affected_paths.extend((previous_path, new_path))

    for migration in migrations:
        affected_paths.extend(migration.remove_paths_on_success)

    clashing_paths = [p for p in affected_paths if p in untracked_files]
    if not clashing_paths:
        return

    print(file=sys.stderr)
    _warn(
        "The following untracked or ignored paths would be affected by this migration"
    )
    _warn("")
    for clashing_path in clashing_paths:
        _warn(f" * {clashing_path}")
    _warn("")
    _warn("Since they are not tracked, the VCS cannot undo the changes to them")
    _warn("if the migration continued")
    print(file=sys.stderr)
    _error(
        "Please stash, commit or remove the above files before continuing. Alternatively, use --ignore-vcs"
    )

347 

348 

def perform_migration(
    fo: IOBasedOutputStyling,
    manifest: HighLevelManifest,
    acceptable_migration_issues: AcceptableMigrationIssues,
    permit_destructive_changes: bool | None,
    migration_target: DebputyIntegrationMode,
    manifest_parser_factory: Callable[[str], HighLevelManifest],
    migrators: list[Migrator],
    *,
    ignore_vcs: bool = False,
) -> None:
    """Run the migrators and (optionally) apply the resulting changes.

    The steps are: detect the debhelper compat level, check the `git` state
    (unless `ignore_vcs`), run every migrator, verify the compat level and
    VCS clashes, print the warning summary, write and re-parse the new
    manifest and finally list (and possibly perform) renames and removals.
    Fatal problems are reported through `_error`, which is expected not to
    return.

    :param fo: Output styling used for the feature migrations and summary.
    :param manifest: The manifest to migrate into; its mutable variant is
      written out when there are manifest changes.
    :param acceptable_migration_issues: Issues the user accepted; only
      checked here for `min-compat-level`, otherwise passed to the migrators.
    :param permit_destructive_changes: `True` to apply changes, `False` to
      only describe them. `None` becomes `True` when `git` is in use;
      otherwise nothing is renamed or removed and a hint is printed.
    :param migration_target: The target passed on to the migrators.
    :param manifest_parser_factory: Called with the path of the newly written
      manifest to verify that it can be parsed.
    :param migrators: The migrators to run, in order.
    :param ignore_vcs: Skip all `git` checks and `git` based operations.
    :raises AssertionError: If the generated manifest cannot be parsed.
    """
    migrations = []
    compat = _dh_compat_level()
    if compat is None:
        _error(
            'Cannot detect declared compat level (try running "dh_assistant active-compat-level")'
        )

    debian_dir = manifest.debian_dir
    mutable_manifest = assume_not_none(manifest.mutable_manifest)

    if not ignore_vcs:
        uses_git, untracked_files = _git_status()
    else:
        uses_git = False
        untracked_files = frozenset()

    if permit_destructive_changes is None and uses_git:
        # With a clean git tree, the user can undo the changes via git.
        permit_destructive_changes = True

    migration_request = MigrationRequest(
        debian_dir,
        manifest,
        acceptable_migration_issues,
        migration_target,
    )

    try:
        for migrator in migrators:
            feature_migration = FeatureMigration(migrator.__name__, fo)
            migrator(migration_request, feature_migration)
            migrations.append(feature_migration)
    except CannotEmulateExecutableDHConfigFile as e:
        _error(
            f"Unable to process the executable dh config file {e.config_file().fs_path}: {e.message()}"
        )
    except UnsupportedFeature as e:
        msg = (
            "Unable to migrate automatically due to missing features in debputy. The feature is:"
            f"\n\n * {e.message}"
        )
        keys = e.issue_keys
        if keys:
            primary_key = keys[0]
            # `alt_keys` carries its own leading space, so the message has no
            # trailing/double space regardless of whether it is empty.
            alt_keys = ""
            if len(keys) > 1:
                alt_keys = (
                    f' Alternatively you can also use one of: {", ".join(keys[1:])}. Please note that some'
                    " of these may cover more cases."
                )
            msg += (
                f"\n\nUse --acceptable-migration-issues={primary_key} to convert this into a warning and try again."
                " However, you should only do that if you believe you can replace the functionality manually"
                f" or the usage is obsolete / can be removed.{alt_keys}"
            )
        _error(msg)
    except ConflictingChange as e:
        _error(
            "The migration tool detected a conflict between data being migrated and data already migrated /"
            " in the existing manifest."
            f"\n\n * {e.message}"
            "\n\nPlease review the situation and resolve the conflict manually."
        )

    min_compat = max(
        (m.assumed_compat for m in migrations if m.assumed_compat is not None),
        default=0,
    )

    if compat < min_compat and "min-compat-level" not in acceptable_migration_issues:
        # The migration summary special-cases the compat mismatch and warns for us.
        _error(
            f"The migration tool assumes debhelper compat {min_compat} or later but the package is only on"
            f" compat {compat}. This may lead to incorrect result."
            "\n\nUse --acceptable-migration-issues=min-compat-level to convert this into a warning and"
            " try again, if you want to continue regardless."
        )

    requested_plugins = _requested_debputy_plugins(debian_dir)
    required_plugins: set[str] = set()
    required_plugins.update(
        chain.from_iterable(
            m.required_plugins for m in migrations if m.required_plugins
        )
    )

    if uses_git:
        _check_vcs_clashes(manifest, migrations, untracked_files)

    _print_migration_summary(
        fo,
        migrations,
        compat,
        min_compat,
        required_plugins,
        requested_plugins,
    )
    migration_count = sum((m.performed_changes for m in migrations), 0)

    if not migration_count:
        _info(
            "debputy was not able to find any (supported) migrations that it could perform for you."
        )
        return

    if any(m.successful_manifest_changes for m in migrations):
        new_manifest_path = manifest.manifest_path + ".new"

        with open(new_manifest_path, "w") as fd:
            mutable_manifest.write_to(fd)

        try:
            _info("Verifying the new manifest")
            manifest_parser_factory(new_manifest_path)
        except ManifestParseException as e:
            # The migrators generated the content, so a parse failure is a
            # bug in the tool rather than a user error.
            raise AssertionError(
                "Could not parse the manifest generated from the migrator"
            ) from e

        if permit_destructive_changes:
            if os.path.isfile(manifest.manifest_path):
                # Keep the previous manifest around as a backup.
                os.rename(manifest.manifest_path, manifest.manifest_path + ".orig")
            os.rename(new_manifest_path, manifest.manifest_path)
            if uses_git:
                subprocess.check_call(["git", "add", manifest.manifest_path])
            _info(f"Updated manifest {manifest.manifest_path}")
        else:
            _info(
                f'Created draft manifest "{new_manifest_path}" (rename to "{manifest.manifest_path}"'
                " to activate it)"
            )
    else:
        _info("No manifest changes detected; skipping update of manifest.")

    removals: int = sum((len(m.remove_paths_on_success) for m in migrations), 0)
    renames: int = sum((len(m.rename_paths_on_success) for m in migrations), 0)

    if renames:
        if permit_destructive_changes:
            _info("Paths being renamed:")
        else:
            _info("Migration *would* rename the following paths:")
        prefix = "git " if uses_git else ""
        for previous_path, new_path in (
            p for m in migrations for p in m.rename_paths_on_success
        ):
            _info(f" {prefix}mv {escape_shell(previous_path, new_path)}")

    if removals:
        if permit_destructive_changes:
            _info("Removals:")
        else:
            _info("Migration *would* remove the following files:")
        cmd = "git rm" if uses_git else "rm -f"
        for path in (p for m in migrations for p in m.remove_paths_on_success):
            _info(f" {cmd} {escape_shell(path)}")

    if permit_destructive_changes is None:
        # Neither requested nor declined (and no git to fall back on).
        print()
        _info(
            "If you would like to perform the migration, please re-run with --apply-changes."
        )
    elif permit_destructive_changes:
        _apply_renames_and_removals(migrations, uses_git)

        print()
        _info("Migrations performed successfully")
        print()
        _info(
            "Remember to validate the resulting binary packages after rebuilding with debputy"
        )
        if uses_git:
            _info(
                "Use `git commit` to commit the changes or use `git reset` / `git restore` to undo them."
            )
    else:
        print()
        _info("No migrations performed as requested")

537 

538 

def _apply_renames_and_removals(
    migrations: list[FeatureMigration],
    uses_git: bool,
) -> None:
    """Perform the renames and removals recorded by the migrations.

    Renames are done first, one path pair at a time; removals follow. With
    `uses_git`, the operations go through `git mv` and a single `git rm`
    call; otherwise `os.rename` and `os.unlink` are used.

    :param migrations: Feature migrations providing `rename_paths_on_success`
      and `remove_paths_on_success`.
    :param uses_git: Whether to perform the operations via `git`.
    """
    for migration in migrations:
        for previous_path, new_path in migration.rename_paths_on_success:
            if uses_git:
                subprocess.check_call(["git", "mv", previous_path, new_path])
            else:
                os.rename(previous_path, new_path)

    files_being_removed = [p for m in migrations for p in m.remove_paths_on_success]
    if not files_being_removed:
        return

    if uses_git:
        subprocess.check_call(["git", "rm", *files_being_removed])
    else:
        for doomed_path in files_being_removed:
            os.unlink(doomed_path)