Coverage for src/debputy/dh_migration/migration.py: 7%

288 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-09-07 09:27 +0000

1import json 

2import os 

3import re 

4import subprocess 

5from itertools import chain 

6from typing import ( 

7 Optional, 

8 List, 

9 Callable, 

10 Set, 

11 Mapping, 

12 FrozenSet, 

13 Tuple, 

14 Iterable, 

15 Container, 

16) 

17 

18from debian.deb822 import Deb822 

19 

20from debputy.commands.debputy_cmd.context import CommandContext 

21from debputy.commands.debputy_cmd.output import IOBasedOutputStyling 

22from debputy.dh.debhelper_emulation import CannotEmulateExecutableDHConfigFile 

23from debputy.dh.dh_assistant import read_dh_addon_sequences 

24from debputy.dh_migration.migrators import Migrator, MigrationTarget 

25from debputy.dh_migration.migrators_impl import ( 

26 INTEGRATION_MODE_DH_DEBPUTY, 

27 INTEGRATION_MODE_DH_DEBPUTY_RRR, 

28) 

29from debputy.dh_migration.models import ( 

30 FeatureMigration, 

31 AcceptableMigrationIssues, 

32 UnsupportedFeature, 

33 ConflictingChange, 

34) 

35from debputy.highlevel_manifest import HighLevelManifest 

36from debputy.integration_detection import determine_debputy_integration_mode 

37from debputy.manifest_parser.exceptions import ManifestParseException 

38from debputy.plugin.api import VirtualPath 

39from debputy.plugin.api.spec import DebputyIntegrationMode, INTEGRATION_MODE_FULL 

40from debputy.util import _error, _warn, _info, escape_shell, assume_not_none 

41 

# Keyed by the integration mode detected from the packaging; the value is the
# set of migration targets that may be requested from that starting point.
# `_check_migration_target` rejects any requested target outside this set.
SUPPORTED_MIGRATIONS: Mapping[MigrationTarget, FrozenSet[MigrationTarget]] = {
    INTEGRATION_MODE_FULL: frozenset({INTEGRATION_MODE_FULL}),
    INTEGRATION_MODE_DH_DEBPUTY: frozenset(
        {
            "dh-single-to-multi-binary",
            "dh-package-prefixed-config-files",
            INTEGRATION_MODE_DH_DEBPUTY,
            INTEGRATION_MODE_FULL,
        }
    ),
    INTEGRATION_MODE_DH_DEBPUTY_RRR: frozenset(
        {
            "dh-single-to-multi-binary",
            "dh-package-prefixed-config-files",
            INTEGRATION_MODE_DH_DEBPUTY_RRR,
            INTEGRATION_MODE_DH_DEBPUTY,
            INTEGRATION_MODE_FULL,
        }
    ),
}

62 

63 

def _print_migration_summary(
    fo: IOBasedOutputStyling,
    migrations: List[FeatureMigration],
    compat: int,
    min_compat_level: int,
    required_plugins: Set[str],
    requested_plugins: Optional[Set[str]],
) -> None:
    """Emit the warning summary for a migration run via `_warn`.

    Up to three kinds of sections are emitted, each preceded by a heading:
    one per migration that has warnings, one when the package compat level is
    below the assumed minimum, and one when required `debputy` plugins are
    not (or cannot be confirmed to be) requested. A final line with the total
    number of warnings is emitted when there was at least one.

    :param fo: Output styling; heading underlines are omitted when
      `fo.optimize_for_screen_reader` is set.
    :param migrations: The feature migrations to summarize. Migrations with
      nothing to do or without warnings are skipped.
    :param compat: The debhelper compat level used by the package.
    :param min_compat_level: The compat level the migration tool assumes.
    :param required_plugins: Plugin names (without the `debputy-plugin-`
      prefix) that the migrations require.
    :param requested_plugins: Plugin names already requested by the package,
      or `None` when that could not be determined.
    """
    warning_count = 0

    def _section_heading(title: str) -> None:
        # A blank separator line is only wanted *between* sections, so it is
        # decided on the warnings counted before this section starts.
        if warning_count:
            _warn("")
        _warn(title)
        if not fo.optimize_for_screen_reader:
            _warn("-" * len(title))

    for migration in migrations:
        if not migration.anything_to_do or not migration.warnings:
            continue
        _section_heading(f"Summary for migration: {migration.tagline}")
        warning_count += len(migration.warnings)
        for warning in migration.warnings:
            _warn(f" * {warning}")

    if compat < min_compat_level:
        _section_heading("Supported debhelper compat check")
        warning_count += 1
        _warn(
            f"The migration tool assumes debhelper compat {min_compat_level}+ semantics, but this package"
            f" is using compat {compat}. Consider upgrading the package to compat {min_compat_level}"
            " first."
        )

    if required_plugins:
        if requested_plugins is None:
            # Without d/control nothing can be confirmed; report everything.
            plugins_to_report = required_plugins
        else:
            plugins_to_report = required_plugins - requested_plugins

        if plugins_to_report:
            # Sorted so the message is stable between runs.
            needed_plugins = ", ".join(
                f"debputy-plugin-{n}" for n in sorted(plugins_to_report)
            )
            _section_heading("Missing debputy plugin check")
            warning_count += 1
            if requested_plugins is None:
                _warn(
                    f"The migration tool could not read d/control and therefore cannot tell if all the required"
                    f" plugins have been requested. Please ensure that the package Build-Depends on: {needed_plugins}"
                )
            else:
                _warn(
                    f"The migration tool asserted that the following `debputy` plugins would be required, which"
                    f" are not explicitly requested. Please add the following to Build-Depends: {needed_plugins}"
                )

    if warning_count:
        _warn("")
        _warn(
            f"/!\\ Total number of warnings or manual migrations required: {warning_count}"
        )

136 

137 

def _dh_compat_level() -> Optional[int]:
    """Return the declared debhelper compat level, or `None` if unavailable.

    The level is read from the `declared-compat-level` key of the JSON that
    `dh_assistant active-compat-level` prints.

    :return: The compat level as an `int`. `None` is returned when
      `dh_assistant` is missing or exits non-zero, when its output is not
      valid JSON of the expected shape, or when the value is not an `int`.
    """
    try:
        res = subprocess.check_output(
            ["dh_assistant", "active-compat-level"], stderr=subprocess.DEVNULL
        )
    except (subprocess.CalledProcessError, FileNotFoundError):
        # Non-zero exit, or `dh_assistant` is not installed / not on PATH.
        return None

    try:
        compat = json.loads(res)["declared-compat-level"]
    except (ValueError, KeyError, TypeError):
        # ValueError: malformed JSON (or undecodable bytes);
        # KeyError: key absent; TypeError: top-level value is not a mapping.
        return None

    return compat if isinstance(compat, int) else None

154 

155 

def _requested_debputy_plugins(debian_dir: VirtualPath) -> Optional[Set[str]]:
    """Collect the `debputy` plugins named in the source build-dependencies.

    :param debian_dir: Directory that is queried for a `control` entry.
    :return: The names of all `debputy-plugin-*` packages (with that prefix
      removed) found in the `Build-Depends`, `Build-Depends-Indep` and
      `Build-Depends-Arch` fields of the first paragraph, or `None` when
      there is no `control` entry.
    """
    control_path = debian_dir.get("control")
    if not control_path:
        return None

    plugin_prefix = "debputy-plugin-"
    # Only the package name at the very start of a clause is looked at.
    package_name_re = re.compile("^([a-z0-9][-+.a-z0-9]+)", re.ASCII)

    with control_path.open() as fd:
        paragraphs = list(Deb822.iter_paragraphs(fd))
    source_stanza = paragraphs[0] if paragraphs else {}

    found = set()
    for field_name in ("Build-Depends", "Build-Depends-Indep", "Build-Depends-Arch"):
        field_value = source_stanza.get(field_name)
        if not field_value:
            continue

        for clause in field_value.split(","):
            name_match = package_name_re.match(clause.strip())
            if name_match is None:
                continue
            package_name = name_match.group(1)
            if package_name.startswith(plugin_prefix):
                found.add(package_name[len(plugin_prefix) :])
    return found

182 

183 

def _check_migration_target(
    context: CommandContext,
    migration_target: Optional[DebputyIntegrationMode],
) -> DebputyIntegrationMode:
    """Resolve which migration target to use and report the choice.

    The integration mode is detected from the packaging when the dh add-on
    sequences can be read. A requested target is validated against
    `SUPPORTED_MIGRATIONS` for the detected mode and reported as an error
    via `_error` when it is not supported from that mode.

    :param context: Command context supplying the debian directory and the
      source package fields.
    :param migration_target: The explicitly requested target, or `None` to
      use the detected mode (falling back to `INTEGRATION_MODE_DH_DEBPUTY`).
    :return: The requested target if given, else the detected mode, else
      `INTEGRATION_MODE_DH_DEBPUTY`.
    """
    r = read_dh_addon_sequences(context.debian_dir)
    if r is not None:
        bd_sequences, dr_sequences, _ = r
        detected_migration_target = determine_debputy_integration_mode(
            context.source_package().fields,
            bd_sequences | dr_sequences,
        )
    else:
        detected_migration_target = None

    if migration_target is not None:
        if detected_migration_target is not None:
            # Modes without an entry can only "migrate" to themselves.
            supported_migrations = SUPPORTED_MIGRATIONS.get(
                detected_migration_target,
                frozenset([detected_migration_target]),
            )
            if (
                migration_target != detected_migration_target
                and migration_target not in supported_migrations
            ):
                _error(
                    f"Cannot apply migration {migration_target} as it conflicts with the current state (which is {detected_migration_target})"
                )
        _info(f'Using "{migration_target}" as migration (requested)')
        return migration_target

    if detected_migration_target is not None:
        _info(
            f'Using "{detected_migration_target}" as migration (based on the packaging)'
        )
        return detected_migration_target

    default_migration_target = INTEGRATION_MODE_DH_DEBPUTY
    _info(
        f'Using "{default_migration_target}" as default migration target. Use --migration-target to choose!'
    )
    return default_migration_target

229 

230 

def _read_git_status_lines(
    lines: Iterable[bytes],
) -> Iterable[Tuple[str, str, Optional[str]]]:
    """Parse NUL-split `git status --porcelain=v1 -z` records.

    :param lines: The raw records. Empty records are skipped.
    :return: An iterable of `(status_marker, filename, second_filename)`
      tuples. `second_filename` is taken from the following record when the
      two-character status marker contains `R` or `C`; otherwise it is
      `None`.
    """
    line_iter = iter(lines)
    # NB: the loop variable must stay named `line`; the `{len(line)=}` below
    # puts the expression text into the emitted message.
    for line in line_iter:
        if not line:
            # We get a final empty line
            continue
        if len(line) < 4:
            _error(
                f"Internal error: Got out of sync with the `git status --porcelain=v1 -z` output ({len(line)=})"
            )
        status_marker = line[:2].decode("utf-8")
        if line[2:3] != b" ":
            _error(
                "Internal error: Got out of sync with the `git status --porcelain=v1 -z` output (expected status + filename)"
            )
        has_second_name = any(flag in ("R", "C") for flag in status_marker)
        if not has_second_name:
            second_filename = None
        else:
            # The extra name is delivered as a record of its own.
            try:
                second_filename = next(line_iter).decode("utf-8")
            except StopIteration:
                _error(
                    "Internal error: Expected one more line of output from `git status` but it was not there"
                )
        filename = line[3:].decode("utf-8")
        yield status_marker, filename, second_filename

263 

264 

def _git_status() -> Tuple[bool, Container[str]]:
    """Check that the git work tree is clean and list its untracked paths.

    All `git` commands are run with `debian` as working directory.

    :return: `(False, frozenset())` when `git rev-parse --show-toplevel`
      fails or cannot be started. Otherwise `(True, paths)` where `paths`
      holds the filenames reported with status `??` or `!!`. Any other
      status, a git root different from the current directory, or a failing
      `git status` is reported via `_error`.
    """
    try:
        rev_parse_output = subprocess.check_output(
            ["git", "rev-parse", "--show-toplevel"],
            stderr=subprocess.DEVNULL,
            cwd="debian",
        )
        top_level = rev_parse_output.strip().decode("utf-8")
    except (subprocess.CalledProcessError, FileNotFoundError):
        return False, frozenset()

    cwd_is_git_root = os.path.realpath(os.getcwd()) == os.path.realpath(top_level)
    if not cwd_is_git_root:
        # Patches welcome. The primary problem is to have the status output match the `_check_vcs_clashes` method.
        _error(
            f"Unsupported git repo: The `debputy` command only supports `git` when cwd is the git root (git root is: {top_level}). Please use --ignore-vcs to continue."
        )

    try:
        status_output = subprocess.check_output(
            ["git", "status", "--porcelain=v1", "-z"],
            stderr=subprocess.DEVNULL,
            cwd="debian",
        )
    except subprocess.CalledProcessError:
        _error(
            "The `git status --porcelain=v1 -z` command returned non-zero (note: the command outputs binary). Please use `--ignore-vcs` to continue anyway."
        )
    except FileNotFoundError:
        _error(
            "Could not run `git status` and there is a `.git` directory. Please use `--ignore-vcs` to continue anyway."
        )

    untracked_files = set()
    records = _read_git_status_lines(status_output.split(b"\0"))
    for status_marker, filename, second_filename in records:
        if status_marker != "??" and status_marker != "!!":
            _error(
                "The current git tree is not clean, please commit or stash any pending changes or use `--ignore-vcs` to continue anyway."
            )
        # There is never a second file for `??` or `!!`
        assert second_filename is None
        untracked_files.add(filename)
    return True, untracked_files

312 

313 

def _check_vcs_clashes(
    manifest: HighLevelManifest,
    migrations: List[FeatureMigration],
    untracked_files: Container[str],
) -> None:
    """Report an error if the migration would touch untracked paths.

    :param manifest: The manifest; its path counts as affected when any
      migration has successful manifest changes.
    :param migrations: The migrations whose renames (both old and new path)
      and removals are checked.
    :param untracked_files: Paths that the VCS does not track.
    """
    candidate_paths = []
    if any(m.successful_manifest_changes for m in migrations):
        candidate_paths.append(manifest.manifest_path)

    for migration in migrations:
        for previous_path, new_path in migration.rename_paths_on_success:
            candidate_paths.extend((previous_path, new_path))

    for migration in migrations:
        candidate_paths.extend(migration.remove_paths_on_success)

    clashing_paths = [p for p in candidate_paths if p in untracked_files]
    if not clashing_paths:
        return

    print()
    _warn(
        "The following untracked or ignored paths would be affected by this migration"
    )
    _warn("")
    for clashing_path in clashing_paths:
        _warn(f" * {clashing_path}")
    _warn("")
    _warn("Since they are not tracked, the VCS cannot undo the changes to them")
    _warn("if the migration continued")
    print()
    _error(
        "Please stash, commit or remove the above files before continuing. Alternatively, use --ignore-vcs"
    )

348 

349 

def perform_migration(
    fo: IOBasedOutputStyling,
    manifest: HighLevelManifest,
    acceptable_migration_issues: AcceptableMigrationIssues,
    permit_destructive_changes: Optional[bool],
    migration_target: DebputyIntegrationMode,
    manifest_parser_factory: Callable[[str], HighLevelManifest],
    migrators: List[Migrator],
    *,
    ignore_vcs: bool = False,
) -> None:
    """Run the migrators and, when permitted, apply the resulting changes.

    The migrators are run first; their combined result is then checked
    (compat level, VCS clashes), summarized, written out as a new manifest
    and, when destructive changes are permitted, applied to the file system.

    :param fo: Output styling passed on to each `FeatureMigration` and to the
      summary.
    :param manifest: The manifest being migrated; its mutable manifest must
      be available.
    :param acceptable_migration_issues: Issues the user accepted. Passed to
      every migrator; `min-compat-level` downgrades the compat check from an
      error to a warning.
    :param permit_destructive_changes: `True` applies the changes, `False`
      only reports them. `None` becomes `True` when git is in use, and
      otherwise only reports the changes and hints at `--apply-changes`.
    :param migration_target: The integration mode to migrate to.
    :param manifest_parser_factory: Called with the path of the newly written
      manifest to verify that it can be parsed.
    :param migrators: The migrators to run, in order.
    :param ignore_vcs: When `True`, the git state is not inspected and no git
      commands are used.
    :raises AssertionError: If the generated manifest cannot be parsed.
    """
    migrations = []
    compat = _dh_compat_level()
    if compat is None:
        _error(
            'Cannot detect declared compat level (try running "dh_assistant active-compat-level")'
        )

    debian_dir = manifest.debian_dir
    mutable_manifest = assume_not_none(manifest.mutable_manifest)

    if not ignore_vcs:
        uses_git, untracked_files = _git_status()
    else:
        uses_git = False
        untracked_files = frozenset()

    # With git in use the changes are applied by default; the closing message
    # points at `git reset` / `git restore` for undoing them.
    if permit_destructive_changes is None and uses_git:
        permit_destructive_changes = True

    try:
        for migrator in migrators:
            feature_migration = FeatureMigration(migrator.__name__, fo)
            migrator(
                debian_dir,
                manifest,
                acceptable_migration_issues,
                feature_migration,
                migration_target,
            )
            migrations.append(feature_migration)
    except CannotEmulateExecutableDHConfigFile as e:
        _error(
            f"Unable to process the executable dh config file {e.config_file().fs_path}: {e.message()}"
        )
    except UnsupportedFeature as e:
        msg = (
            f"Unable to migrate automatically due to missing features in debputy. The feature is:"
            f"\n\n * {e.message}"
        )
        keys = e.issue_keys
        if keys:
            primary_key = keys[0]
            alt_keys = ""
            if len(keys) > 1:
                alt_keys = (
                    f' Alternatively you can also use one of: {", ".join(keys[1:])}. Please note that some'
                    " of these may cover more cases."
                )
            # `alt_keys` carries its own leading space, so it is appended
            # directly to avoid a double (or trailing) space.
            msg += (
                f"\n\nUse --acceptable-migration-issues={primary_key} to convert this into a warning and try again."
                " However, you should only do that if you believe you can replace the functionality manually"
                f" or the usage is obsolete / can be removed.{alt_keys}"
            )
        _error(msg)
    except ConflictingChange as e:
        _error(
            "The migration tool detected a conflict between data being migrated and data already migrated /"
            " in the existing manifest."
            f"\n\n * {e.message}"
            "\n\nPlease review the situation and resolve the conflict manually."
        )

    # We start on compat 12 for arch:all due to the new dh_makeshlibs and dh_installinit default
    # For arch:any, the min is compat 14 due to `dh_dwz` being removed.
    min_compat = 14 if any(not p.is_arch_all for p in manifest.all_packages) else 12
    # If any migration declared an assumed compat level, the highest of those
    # replaces the package-type based value above.
    min_compat = max(
        (m.assumed_compat for m in migrations if m.assumed_compat is not None),
        default=min_compat,
    )

    if compat < min_compat and "min-compat-level" not in acceptable_migration_issues:
        # The migration summary special-cases the compat mismatch and warns for us.
        _error(
            f"The migration tool assumes debhelper compat {min_compat} or later but the package is only on"
            f" compat {compat}. This may lead to incorrect result."
            f"\n\nUse --acceptable-migration-issues=min-compat-level to convert this into a warning and"
            f" try again, if you want to continue regardless."
        )

    requested_plugins = _requested_debputy_plugins(debian_dir)
    required_plugins: Set[str] = set()
    required_plugins.update(
        chain.from_iterable(
            m.required_plugins for m in migrations if m.required_plugins
        )
    )

    if uses_git:
        _check_vcs_clashes(manifest, migrations, untracked_files)

    _print_migration_summary(
        fo,
        migrations,
        compat,
        min_compat,
        required_plugins,
        requested_plugins,
    )
    migration_count = sum((m.performed_changes for m in migrations), 0)

    if not migration_count:
        _info(
            "debputy was not able to find any (supported) migrations that it could perform for you."
        )
        return

    if any(m.successful_manifest_changes for m in migrations):
        # The manifest is always written next to the real one first, so it
        # can be verified before anything is replaced.
        new_manifest_path = manifest.manifest_path + ".new"

        with open(new_manifest_path, "w") as fd:
            mutable_manifest.write_to(fd)

        try:
            _info("Verifying the new manifest")
            manifest_parser_factory(new_manifest_path)
        except ManifestParseException as e:
            raise AssertionError(
                "Could not parse the manifest generated from the migrator"
            ) from e

        if permit_destructive_changes:
            # Keep the previous manifest (if any) as a `.orig` backup.
            if os.path.isfile(manifest.manifest_path):
                os.rename(manifest.manifest_path, manifest.manifest_path + ".orig")
            os.rename(new_manifest_path, manifest.manifest_path)
            if uses_git:
                subprocess.check_call(["git", "add", manifest.manifest_path])
            _info(f"Updated manifest {manifest.manifest_path}")
        else:
            _info(
                f'Created draft manifest "{new_manifest_path}" (rename to "{manifest.manifest_path}"'
                " to activate it)"
            )
    else:
        _info("No manifest changes detected; skipping update of manifest.")

    removals: int = sum((len(m.remove_paths_on_success) for m in migrations), 0)
    renames: int = sum((len(m.rename_paths_on_success) for m in migrations), 0)

    if renames:
        if permit_destructive_changes:
            _info("Paths being renamed:")
        else:
            _info("Migration *would* rename the following paths:")
        prefix = "git " if uses_git else ""
        for previous_path, new_path in (
            p for m in migrations for p in m.rename_paths_on_success
        ):
            _info(f" {prefix}mv {escape_shell(previous_path, new_path)}")

    if removals:
        if permit_destructive_changes:
            _info("Removals:")
        else:
            _info("Migration *would* remove the following files:")
        cmd = "git rm" if uses_git else "rm -f"
        for path in (p for m in migrations for p in m.remove_paths_on_success):
            _info(f" {cmd} {escape_shell(path)}")

    if permit_destructive_changes is None:
        # Neither requested nor declined (and no git): only report.
        print()
        _info(
            "If you would like to perform the migration, please re-run with --apply-changes."
        )
    elif permit_destructive_changes:
        _apply_renames_and_removals(migrations, uses_git)

        print()
        _info("Migrations performed successfully")
        print()
        _info(
            "Remember to validate the resulting binary packages after rebuilding with debputy"
        )
        if uses_git:
            _info(
                "Use `git commit` to commit the changes or use `git reset` / `git restore` to undo them."
            )
    else:
        print()
        _info("No migrations performed as requested")

540 

541 

def _apply_renames_and_removals(
    migrations: List[FeatureMigration],
    uses_git: bool,
) -> None:
    """Carry out the renames and removals recorded by the migrations.

    All renames are done first, then all removals.

    :param migrations: The migrations providing `rename_paths_on_success`
      pairs and `remove_paths_on_success` paths.
    :param uses_git: When `True`, `git mv` / `git rm` are used (one `git rm`
      call for all removals); otherwise `os.rename` / `os.unlink`.
    """
    for migration in migrations:
        for previous_path, new_path in migration.rename_paths_on_success:
            if uses_git:
                subprocess.check_call(["git", "mv", previous_path, new_path])
            else:
                os.rename(previous_path, new_path)

    files_being_removed = []
    for migration in migrations:
        files_being_removed.extend(migration.remove_paths_on_success)

    if not files_being_removed:
        return

    if uses_git:
        subprocess.check_call(["git", "rm", *files_being_removed])
    else:
        for doomed_path in files_being_removed:
            os.unlink(doomed_path)