Coverage for src/debputy/dh_migration/migration.py: 8%
289 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
1import json
2import os
3import re
4import subprocess
5import sys
6from itertools import chain
7from typing import (
8 Optional,
9 List,
10 Set,
11 FrozenSet,
12 Tuple,
13)
14from collections.abc import Callable, Mapping, Iterable, Container
16from debian.deb822 import Deb822
18from debputy.commands.debputy_cmd.context import CommandContext
19from debputy.commands.debputy_cmd.output import IOBasedOutputStyling
20from debputy.dh.debhelper_emulation import CannotEmulateExecutableDHConfigFile
21from debputy.dh.dh_assistant import read_dh_addon_sequences
22from debputy.dh_migration.migrators import Migrator, MigrationTarget
23from debputy.dh_migration.migrators_impl import (
24 INTEGRATION_MODE_DH_DEBPUTY,
25 INTEGRATION_MODE_DH_DEBPUTY_RRR,
26)
27from debputy.dh_migration.models import (
28 FeatureMigration,
29 AcceptableMigrationIssues,
30 UnsupportedFeature,
31 ConflictingChange,
32)
33from debputy.highlevel_manifest import HighLevelManifest
34from debputy.integration_detection import determine_debputy_integration_mode
35from debputy.manifest_parser.exceptions import ManifestParseException
36from debputy.plugin.api import VirtualPath
37from debputy.plugin.api.spec import DebputyIntegrationMode, INTEGRATION_MODE_FULL
38from debputy.util import _error, _warn, _info, escape_shell, assume_not_none
# For each integration mode (key), the set of migration targets that may be
# requested while the package is in that mode. Every mode lists itself, so
# re-running a migration for the current mode is always accepted.
SUPPORTED_MIGRATIONS: Mapping[MigrationTarget, frozenset[MigrationTarget]] = {
    INTEGRATION_MODE_FULL: frozenset({INTEGRATION_MODE_FULL}),
    INTEGRATION_MODE_DH_DEBPUTY: frozenset(
        {
            "dh-single-to-multi-binary",
            "dh-package-prefixed-config-files",
            INTEGRATION_MODE_DH_DEBPUTY,
            INTEGRATION_MODE_FULL,
        }
    ),
    INTEGRATION_MODE_DH_DEBPUTY_RRR: frozenset(
        {
            "dh-single-to-multi-binary",
            "dh-package-prefixed-config-files",
            INTEGRATION_MODE_DH_DEBPUTY_RRR,
            INTEGRATION_MODE_DH_DEBPUTY,
            INTEGRATION_MODE_FULL,
        }
    ),
}
def _print_migration_summary(
    fo: IOBasedOutputStyling,
    migrations: list[FeatureMigration],
    compat: int,
    min_compat_level: int,
    required_plugins: set[str],
    requested_plugins: set[str] | None,
) -> None:
    """Emit all warnings gathered during the migration, followed by a total.

    Every section (one per migration with warnings, the compat check and the
    plugin check) is introduced by a title. Sections are separated by a blank
    line, which is only emitted when an earlier section has been printed.

    :param fo: Output styling; when `optimize_for_screen_reader` is set, the
      decorative underline below each section title is omitted.
    :param migrations: The feature migrations to summarize. Migrations with
      nothing to do are skipped.
    :param compat: The debhelper compat level of the package.
    :param min_compat_level: The compat level the migrators assumed.
    :param required_plugins: Names (without the `debputy-plugin-` prefix) of
      the plugins the migrations rely on.
    :param requested_plugins: Names of the plugins the package already
      requests, or `None` when that could not be determined.
    """
    warning_count = 0

    def _section(title: str, underline: str) -> None:
        # Only separate from a previous section; never start with a blank line.
        if warning_count:
            _warn("")
        _warn(title)
        if not fo.optimize_for_screen_reader:
            _warn(underline)

    for migration in migrations:
        if not migration.anything_to_do:
            continue
        underline = "-" * len(migration.tagline)
        if migration.warnings:
            _section(
                f"Summary for migration: {migration.tagline}",
                f"-----------------------{underline}",
            )
            warning_count += len(migration.warnings)
            for warning in migration.warnings:
                _warn(f" * {warning}")

    if compat < min_compat_level:
        _section(
            "Supported debhelper compat check",
            "--------------------------------",
        )
        warning_count += 1
        _warn(
            f"The migration tool assumes debhelper compat {min_compat_level}+ semantics, but this package"
            f" is using compat {compat}. Consider upgrading the package to compat {min_compat_level}"
            " first."
        )

    if required_plugins:
        if requested_plugins is None:
            # Without d/control we cannot tell what is requested; list them all.
            plugins_to_report = required_plugins
        else:
            plugins_to_report = required_plugins - requested_plugins

        if plugins_to_report:
            # Sorted so the message is stable between runs (sets are unordered).
            needed_plugins = ", ".join(
                f"debputy-plugin-{n}" for n in sorted(plugins_to_report)
            )
            # The section header must be printed *before* counting this
            # warning; otherwise the separator would always be emitted.
            _section(
                "Missing debputy plugin check",
                "----------------------------",
            )
            warning_count += 1
            if requested_plugins is None:
                _warn(
                    f"The migration tool could not read d/control and therefore cannot tell if all the required"
                    f" plugins have been requested. Please ensure that the package Build-Depends on: {needed_plugins}"
                )
            else:
                _warn(
                    f"The migration tool asserted that the following `debputy` plugins would be required, which"
                    f" are not explicitly requested. Please add the following to Build-Depends: {needed_plugins}"
                )

    if warning_count:
        _warn("")
        _warn(
            f"/!\\ Total number of warnings or manual migrations required: {warning_count}"
        )
def _dh_compat_level() -> int | None:
    """Determine the debhelper compat level declared by the package.

    Runs `dh_assistant active-compat-level` and reads the
    `declared-compat-level` key from its JSON output.

    :return: The declared compat level, or `None` when `dh_assistant` is not
      available, exits non-zero, or its output is not a JSON object holding an
      integer under `declared-compat-level`.
    """
    try:
        res = subprocess.check_output(
            ["dh_assistant", "active-compat-level"],
            stderr=subprocess.DEVNULL,
        )
    except (subprocess.CalledProcessError, FileNotFoundError):
        # Non-zero exit, or the tool is not installed at all.
        return None

    try:
        compat = json.loads(res)["declared-compat-level"]
    except (ValueError, KeyError, TypeError):
        # ValueError covers json.JSONDecodeError and undecodable bytes;
        # KeyError / TypeError cover well-formed JSON of an unexpected shape.
        return None

    return compat if isinstance(compat, int) else None
def _requested_debputy_plugins(debian_dir: VirtualPath) -> set[str] | None:
    """Find the debputy plugins named in the package's build dependencies.

    Looks at `Build-Depends`, `Build-Depends-Indep` and `Build-Depends-Arch`
    in the first paragraph of the `control` file and picks every dependency
    whose package name starts with `debputy-plugin-`.

    :param debian_dir: Directory in which the `control` file is looked up.
    :return: The plugin names with the `debputy-plugin-` prefix removed, or
      `None` when there is no `control` file.
    """
    control = debian_dir.get("control")
    if not control:
        return None

    plugin_prefix = "debputy-plugin-"
    # Only the leading package name of each clause matters; version
    # constraints, alternatives and restrictions are ignored.
    package_name = re.compile("^([a-z0-9][-+.a-z0-9]+)", re.ASCII)

    with control.open() as fd:
        paragraphs = list(Deb822.iter_paragraphs(fd))
    source_paragraph = paragraphs[0] if paragraphs else {}

    found = set()
    for field_name in ("Build-Depends", "Build-Depends-Indep", "Build-Depends-Arch"):
        value = source_paragraph.get(field_name)
        if not value:
            continue
        for clause in value.split(","):
            hit = package_name.match(clause.strip())
            if hit is None:
                continue
            name = hit.group(1)
            if name.startswith(plugin_prefix):
                found.add(name[len(plugin_prefix) :])
    return found
def _check_migration_target(
    context: CommandContext,
    migration_target: DebputyIntegrationMode | None,
) -> DebputyIntegrationMode:
    """Resolve which migration target to use and announce the choice.

    The integration mode is detected from the packaging when the dh add-on
    sequences can be read. An explicitly requested target wins, provided it is
    listed in `SUPPORTED_MIGRATIONS` for the detected mode; otherwise the
    conflict is reported via `_error`. Without a requested target, the
    detected mode is used, falling back to `INTEGRATION_MODE_DH_DEBPUTY`.

    :param context: Command context providing the debian directory and the
      source package.
    :param migration_target: The target requested by the user, if any.
    :return: The migration target to use.
    """
    detected = None
    addon_sequences = read_dh_addon_sequences(context.debian_dir)
    if addon_sequences is not None:
        bd_sequences, dr_sequences, _ = addon_sequences
        combined_sequences = bd_sequences | dr_sequences
        detected = determine_debputy_integration_mode(
            context.source_package().fields,
            combined_sequences,
        )

    if migration_target is None:
        if detected is None:
            fallback = INTEGRATION_MODE_DH_DEBPUTY
            _info(
                f'Using "{fallback}" as default migration target. Use --migration-target to choose!'
            )
            return fallback
        _info(f'Using "{detected}" as migration (based on the packaging)')
        return detected

    if detected is not None:
        # Modes without an explicit entry can only "migrate" to themselves.
        allowed_targets = SUPPORTED_MIGRATIONS.get(detected, frozenset([detected]))
        if migration_target != detected and migration_target not in allowed_targets:
            _error(
                f"Cannot migrate apply migration {migration_target} as it conflicts with the current state (which is {detected})"
            )

    _info(f'Using "{migration_target}" as migration (requested)')
    return migration_target
def _read_git_status_lines(
    lines: Iterable[bytes],
) -> Iterable[tuple[str, str, str | None]]:
    """Parse the NUL-split records of `git status --porcelain=v1 -z`.

    :param lines: The raw records. Empty records are skipped.
    :return: Tuples of `(status marker, filename, second filename)`. The
      second filename is taken from the following record when the status
      marker starts or ends with `R` or `C`; otherwise it is `None`.
    """
    records = iter(lines)
    two_path_markers = ("R", "C")
    for record in records:
        if not record:
            # We get a final empty line
            continue
        if len(record) < 4:
            _error(
                f"Internal error: Got out of sync with the `git status --porcelain=v1 -z` output ({len(record)=})"
            )
        status_marker = record[:2].decode("utf-8")
        if record[2] != ord(" "):
            _error(
                "Internal error: Got out of sync with the `git status --porcelain=v1 -z` output (expected status + filename)"
            )
        has_second_path = status_marker.startswith(
            two_path_markers
        ) or status_marker.endswith(two_path_markers)
        if has_second_path:
            # The extra path is its own record; consume it from the same
            # iterator so the outer loop never sees it.
            try:
                second_filename = next(records).decode("utf-8")
            except StopIteration:
                _error(
                    "Internal error: Expected one more line of output from `git status` but it was not there"
                )
        else:
            second_filename = None
        filename = record[3:].decode("utf-8")
        yield status_marker, filename, second_filename
def _git_status() -> tuple[bool, Container[str]]:
    """Check whether the package is in a clean git work tree.

    All `git` commands are run with `debian` as their working directory.

    :return: `(False, frozenset())` when `git rev-parse --show-toplevel`
      cannot be run or fails. Otherwise `(True, paths)` where `paths` are the
      untracked (`??`) and ignored (`!!`) paths reported by `git status`.
      Other situations (cwd is not the git root, `git status` failing, or any
      other status marker) are reported via `_error`.
    """
    try:
        raw_top_level = subprocess.check_output(
            ["git", "rev-parse", "--show-toplevel"],
            stderr=subprocess.DEVNULL,
            cwd="debian",
        )
        top_level = raw_top_level.strip().decode("utf-8")
    except (subprocess.CalledProcessError, FileNotFoundError):
        return False, frozenset()

    cwd_is_git_root = os.path.realpath(os.getcwd()) == os.path.realpath(top_level)
    if not cwd_is_git_root:
        # Patches welcome. The primary problem is to have the status output match the `_check_vcs_clashes` method.
        _error(
            f"Unsupported git repo: The `debputy` command only supports `git` when cwd is the git root (git root is: {top_level}). Please use --ignore-vcs to continue."
        )

    try:
        status_output = subprocess.check_output(
            ["git", "status", "--porcelain=v1", "-z"],
            stderr=subprocess.DEVNULL,
            cwd="debian",
        )
    except subprocess.CalledProcessError:
        _error(
            "The `git status --porcelain=v1 -z` command returned non-zero (note: the command outputs binary). Please use `--ignore-vcs` to continue anyway."
        )
    except FileNotFoundError:
        _error(
            "Could not run `git status` and there is a `.git` directory. Please use `--ignore-vcs` to continue anyway."
        )

    untracked_or_ignored = set()
    records = status_output.split(b"\0")
    for status_marker, filename, second_filename in _read_git_status_lines(records):
        if status_marker not in ("??", "!!"):
            _error(
                "The current git tree is not clean, please commit or stash any pending changes or use `--ignore-vcs` to continue anyway."
            )
        # There is never a second file for `??` or `!!`
        assert second_filename is None
        untracked_or_ignored.add(filename)
    return True, untracked_or_ignored
def _check_vcs_clashes(
    manifest: HighLevelManifest,
    migrations: list[FeatureMigration],
    untracked_files: Container[str],
) -> None:
    """Refuse to continue when the migration would touch untracked paths.

    The affected paths are the manifest (when any migration has manifest
    changes), both ends of every rename, and every path to be removed. If any
    of them is in `untracked_files`, they are listed as warnings and the
    problem is reported via `_error`.

    :param manifest: The manifest; only its `manifest_path` is used.
    :param migrations: The feature migrations that are about to be applied.
    :param untracked_files: Paths that the VCS does not track.
    """
    affected_paths = []
    if any(m.successful_manifest_changes for m in migrations):
        affected_paths.append(manifest.manifest_path)

    for migration in migrations:
        for previous_path, new_path in migration.rename_paths_on_success:
            affected_paths.append(previous_path)
            affected_paths.append(new_path)

    for migration in migrations:
        affected_paths.extend(migration.remove_paths_on_success)

    clashing_paths = [p for p in affected_paths if p in untracked_files]
    if not clashing_paths:
        return

    print(file=sys.stderr)
    _warn(
        "The following untracked or ignored paths would be affected by this migration"
    )
    _warn("")
    for path in clashing_paths:
        _warn(f" * {path}")
    _warn("")
    _warn("Since they are not tracked, the VCS cannot undo the changes to them")
    _warn("if the migration continued")
    print(file=sys.stderr)
    _error(
        "Please stash, commit or remove the above files before continuing. Alternatively, use --ignore-vcs"
    )
def perform_migration(
    fo: IOBasedOutputStyling,
    manifest: HighLevelManifest,
    acceptable_migration_issues: AcceptableMigrationIssues,
    permit_destructive_changes: bool | None,
    migration_target: DebputyIntegrationMode,
    manifest_parser_factory: Callable[[str], HighLevelManifest],
    migrators: list[Migrator],
    *,
    ignore_vcs: bool = False,
) -> None:
    """Run the migrators, report the outcome and optionally apply the changes.

    Steps, in order: detect the compat level, inspect the git state (unless
    `ignore_vcs`), run every migrator, validate the compat level, check for
    clashes with untracked files, print the warning summary, write and verify
    the new manifest, and finally list (and possibly perform) renames and
    removals.

    :param fo: Output styling, passed to each `FeatureMigration` and to the
      summary.
    :param manifest: The manifest being migrated; its `mutable_manifest` must
      be available.
    :param acceptable_migration_issues: Issues the user accepted. Passed to
      each migrator; `min-compat-level` additionally downgrades the compat
      level mismatch from an error to a warning in the summary.
    :param permit_destructive_changes: `True` applies the changes and `False`
      only reports them. `None` behaves like `True` when a git work tree was
      detected; otherwise the changes are only reported together with a hint
      about `--apply-changes`.
    :param migration_target: The migration target handed to each migrator.
    :param manifest_parser_factory: Called with the path of the newly written
      manifest to verify that it can be parsed.
    :param migrators: The migrators to run, in order.
    :param ignore_vcs: When set, git is not consulted at all.
    :raises AssertionError: If the manifest generated by the migrators cannot
      be parsed again.
    """
    migrations = []
    compat = _dh_compat_level()
    if compat is None:
        _error(
            'Cannot detect declared compat level (try running "dh_assistant active-compat-level")'
        )

    debian_dir = manifest.debian_dir
    mutable_manifest = assume_not_none(manifest.mutable_manifest)

    if not ignore_vcs:
        uses_git, untracked_files = _git_status()
    else:
        uses_git = False
        untracked_files = frozenset()

    # With git in place the changes can be undone, so apply them by default.
    if permit_destructive_changes is None and uses_git:
        permit_destructive_changes = True

    try:
        for migrator in migrators:
            feature_migration = FeatureMigration(migrator.__name__, fo)
            migrator(
                debian_dir,
                manifest,
                acceptable_migration_issues,
                feature_migration,
                migration_target,
            )
            migrations.append(feature_migration)
    except CannotEmulateExecutableDHConfigFile as e:
        _error(
            f"Unable to process the executable dh config file {e.config_file().fs_path}: {e.message()}"
        )
    except UnsupportedFeature as e:
        msg = (
            f"Unable to migrate automatically due to missing features in debputy. The feature is:"
            f"\n\n * {e.message}"
        )
        keys = e.issue_keys
        if keys:
            primary_key = keys[0]
            alt_keys = ""
            if len(keys) > 1:
                alt_keys = (
                    f' Alternatively you can also use one of: {", ".join(keys[1:])}. Please note that some'
                    " of these may cover more cases."
                )
            msg += (
                f"\n\nUse --acceptable-migration-issues={primary_key} to convert this into a warning and try again."
                " However, you should only do that if you believe you can replace the functionality manually"
                f" or the usage is obsolete / can be removed. {alt_keys}"
            )
        _error(msg)
    except ConflictingChange as e:
        _error(
            "The migration tool detected a conflict between data being migrated and data already migrated /"
            " in the existing manifest."
            f"\n\n * {e.message}"
            "\n\nPlease review the situation and resolve the conflict manually."
        )

    # The highest compat level any migrator assumed (0 when none assumed one).
    min_compat = max(
        (m.assumed_compat for m in migrations if m.assumed_compat is not None),
        default=0,
    )

    if compat < min_compat and "min-compat-level" not in acceptable_migration_issues:
        # The migration summary special-cases the compat mismatch and warns for us.
        _error(
            f"The migration tool assumes debhelper compat {min_compat} or later but the package is only on"
            f" compat {compat}. This may lead to incorrect result."
            f"\n\nUse --acceptable-migration-issues=min-compat-level to convert this into a warning and"
            f" try again, if you want to continue regardless."
        )

    requested_plugins = _requested_debputy_plugins(debian_dir)
    required_plugins: set[str] = set()
    required_plugins.update(
        chain.from_iterable(
            m.required_plugins for m in migrations if m.required_plugins
        )
    )

    if uses_git:
        _check_vcs_clashes(manifest, migrations, untracked_files)

    _print_migration_summary(
        fo,
        migrations,
        compat,
        min_compat,
        required_plugins,
        requested_plugins,
    )
    migration_count = sum((m.performed_changes for m in migrations), 0)

    if not migration_count:
        _info(
            "debputy was not able to find any (supported) migrations that it could perform for you."
        )
        return

    if any(m.successful_manifest_changes for m in migrations):
        # Write next to the real manifest first, so the result can be
        # verified before anything is replaced.
        new_manifest_path = manifest.manifest_path + ".new"

        with open(new_manifest_path, "w") as fd:
            mutable_manifest.write_to(fd)

        try:
            _info("Verifying the new manifest")
            manifest_parser_factory(new_manifest_path)
        except ManifestParseException as e:
            raise AssertionError(
                "Could not parse the manifest generated from the migrator"
            ) from e

        if permit_destructive_changes:
            if os.path.isfile(manifest.manifest_path):
                os.rename(manifest.manifest_path, manifest.manifest_path + ".orig")
            os.rename(new_manifest_path, manifest.manifest_path)
            if uses_git:
                subprocess.check_call(["git", "add", manifest.manifest_path])
            _info(f"Updated manifest {manifest.manifest_path}")
        else:
            _info(
                f'Created draft manifest "{new_manifest_path}" (rename to "{manifest.manifest_path}"'
                " to activate it)"
            )
    else:
        _info("No manifest changes detected; skipping update of manifest.")

    removals: int = sum((len(m.remove_paths_on_success) for m in migrations), 0)
    renames: int = sum((len(m.rename_paths_on_success) for m in migrations), 0)

    if renames:
        if permit_destructive_changes:
            _info("Paths being renamed:")
        else:
            _info("Migration *would* rename the following paths:")
        prefix = "git " if uses_git else ""
        for previous_path, new_path in (
            p for m in migrations for p in m.rename_paths_on_success
        ):
            _info(f" {prefix}mv {escape_shell(previous_path, new_path)}")

    if removals:
        if permit_destructive_changes:
            _info("Removals:")
        else:
            _info("Migration *would* remove the following files:")
        cmd = "git rm" if uses_git else "rm -f"
        for path in (p for m in migrations for p in m.remove_paths_on_success):
            _info(f" {cmd} {escape_shell(path)}")

    if permit_destructive_changes is None:
        # Neither requested nor implied by git: this was a dry-run.
        print()
        _info(
            "If you would like to perform the migration, please re-run with --apply-changes."
        )
    elif permit_destructive_changes:
        _apply_renames_and_removals(migrations, uses_git)

        print()
        _info("Migrations performed successfully")
        print()
        _info(
            "Remember to validate the resulting binary packages after rebuilding with debputy"
        )
        if uses_git:
            _info(
                "Use `git commit` to commit the changes or use `git reset` / `git restore` to undo them."
            )
    else:
        print()
        _info("No migrations performed as requested")
def _apply_renames_and_removals(
    migrations: list[FeatureMigration],
    uses_git: bool,
) -> None:
    """Carry out the renames and removals recorded by the migrations.

    All renames are performed first, then all removals.

    :param migrations: The feature migrations whose `rename_paths_on_success`
      and `remove_paths_on_success` should be applied.
    :param uses_git: When set, `git mv` / `git rm` are used instead of
      `os.rename` / `os.unlink`.
    """
    for migration in migrations:
        for previous_path, new_path in migration.rename_paths_on_success:
            if uses_git:
                subprocess.check_call(["git", "mv", previous_path, new_path])
            else:
                os.rename(previous_path, new_path)

    doomed_paths = [p for m in migrations for p in m.remove_paths_on_success]
    if not doomed_paths:
        return

    if uses_git:
        # A single invocation covering every path.
        subprocess.check_call(["git", "rm", *doomed_paths])
    else:
        for path in doomed_paths:
            os.unlink(path)