Coverage for src/debputy/dh_migration/migration.py: 8%
290 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-19 09:24 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-19 09:24 +0000
1import json
2import os
3import re
4import subprocess
5import sys
6from itertools import chain
7from typing import (
8 Optional,
9 List,
10 Set,
11 FrozenSet,
12 Tuple,
13)
14from collections.abc import Callable, Mapping, Iterable, Container
16from debian.deb822 import Deb822
18from debputy.commands.debputy_cmd.context import CommandContext
19from debputy.commands.debputy_cmd.output import IOBasedOutputStyling
20from debputy.dh.debhelper_emulation import CannotEmulateExecutableDHConfigFile
21from debputy.dh.dh_assistant import read_dh_addon_sequences
22from debputy.dh_migration.migrators import Migrator, MigrationTarget
23from debputy.dh_migration.migrators_impl import (
24 INTEGRATION_MODE_DH_DEBPUTY,
25 INTEGRATION_MODE_DH_DEBPUTY_RRR,
26)
27from debputy.dh_migration.models import (
28 FeatureMigration,
29 AcceptableMigrationIssues,
30 UnsupportedFeature,
31 ConflictingChange,
32 MigrationRequest,
33)
34from debputy.highlevel_manifest import HighLevelManifest
35from debputy.integration_detection import determine_debputy_integration_mode
36from debputy.manifest_parser.exceptions import ManifestParseException
37from debputy.plugin.api import VirtualPath
38from debputy.plugin.api.spec import DebputyIntegrationMode, INTEGRATION_MODE_FULL
39from debputy.util import _error, _warn, _info, escape_shell, assume_not_none
# Keyed by the integration mode detected in the packaging; each value is the
# set of migration targets that may be requested from that starting state.
# A detected mode without an entry only supports migrating to itself (see the
# `.get(..., frozenset([...]))` fallback in `_check_migration_target`).
SUPPORTED_MIGRATIONS: Mapping[MigrationTarget, frozenset[MigrationTarget]] = {
    INTEGRATION_MODE_FULL: frozenset({INTEGRATION_MODE_FULL}),
    INTEGRATION_MODE_DH_DEBPUTY: frozenset(
        {
            "dh-single-to-multi-binary",
            "dh-package-prefixed-config-files",
            INTEGRATION_MODE_DH_DEBPUTY,
            INTEGRATION_MODE_FULL,
        }
    ),
    INTEGRATION_MODE_DH_DEBPUTY_RRR: frozenset(
        {
            "dh-single-to-multi-binary",
            "dh-package-prefixed-config-files",
            INTEGRATION_MODE_DH_DEBPUTY_RRR,
            INTEGRATION_MODE_DH_DEBPUTY,
            INTEGRATION_MODE_FULL,
        }
    ),
}
def _print_migration_summary(
    fo: IOBasedOutputStyling,
    migrations: list[FeatureMigration],
    compat: int,
    min_compat_level: int,
    required_plugins: set[str],
    requested_plugins: set[str] | None,
) -> None:
    """Emit the warnings collected during migration, grouped into sections.

    Sections are, in order: one per migration that has warnings, the
    debhelper compat check, and the missing plugin check.  Each section has a
    title, an underline (omitted when `fo.optimize_for_screen_reader` is set)
    and is separated from the previous section by an empty warning line.  A
    final line reports the total number of warnings when there are any.

    :param fo: Output styling; only `optimize_for_screen_reader` is consulted.
    :param migrations: The feature migrations to summarize.  Migrations with
        nothing to do are skipped.
    :param compat: The debhelper compat level declared by the package.
    :param min_compat_level: The compat level the migrators assumed.
    :param required_plugins: Plugin names (without the `debputy-plugin-`
        prefix) that the migrations need.
    :param requested_plugins: Plugin names the package already requests, or
        `None` when that could not be determined.
    """
    warning_count = 0

    def _section_header(title: str) -> None:
        # Only separate from a previous section; the very first section must
        # not be preceded by an empty line.
        if warning_count:
            _warn("")
        _warn(title)
        if not fo.optimize_for_screen_reader:
            _warn("-" * len(title))

    for migration in migrations:
        if not migration.anything_to_do:
            continue
        if migration.warnings:
            _section_header(f"Summary for migration: {migration.tagline}")
            warning_count += len(migration.warnings)
            for warning in migration.warnings:
                _warn(f" * {warning}")

    if compat < min_compat_level:
        _section_header("Supported debhelper compat check")
        warning_count += 1
        _warn(
            f"The migration tool assumes debhelper compat {min_compat_level}+ semantics, but this package"
            f" is using compat {compat}. Consider upgrading the package to compat {min_compat_level}"
            " first."
        )

    if required_plugins:
        if requested_plugins is None:
            # Without d/control we cannot tell what is requested, so every
            # required plugin is reported.
            plugins_to_report = required_plugins
        else:
            plugins_to_report = required_plugins - requested_plugins

        if plugins_to_report:
            # Sorted so the message is stable between runs (sets are unordered).
            needed_plugins = ", ".join(
                f"debputy-plugin-{n}" for n in sorted(plugins_to_report)
            )
            # The header must be emitted *before* counting this warning;
            # otherwise the separator line is printed unconditionally.
            _section_header("Missing debputy plugin check")
            warning_count += 1
            if requested_plugins is None:
                _warn(
                    f"The migration tool could not read d/control and therefore cannot tell if all the required"
                    f" plugins have been requested. Please ensure that the package Build-Depends on: {needed_plugins}"
                )
            else:
                _warn(
                    f"The migration tool asserted that the following `debputy` plugins would be required, which"
                    f" are not explicitly requested. Please add the following to Build-Depends: {needed_plugins}"
                )

    if warning_count:
        _warn("")
        _warn(
            f"/!\\ Total number of warnings or manual migrations required: {warning_count}"
        )
137def _dh_compat_level() -> int | None:
138 try:
139 res = subprocess.check_output(
140 ["dh_assistant", "active-compat-level"], stderr=subprocess.DEVNULL
141 )
142 except subprocess.CalledProcessError:
143 compat = None
144 else:
145 try:
146 compat = json.loads(res)["declared-compat-level"]
147 except RuntimeError:
148 compat = None
149 else:
150 if not isinstance(compat, int):
151 compat = None
152 return compat
def _requested_debputy_plugins(debian_dir: VirtualPath) -> set[str] | None:
    """Collect the debputy plugins named in the source package's build dependencies.

    Reads `control` from `debian_dir` and scans the `Build-Depends`,
    `Build-Depends-Indep` and `Build-Depends-Arch` fields of its first
    paragraph for packages named `debputy-plugin-<name>`.

    :param debian_dir: Directory to look up the `control` file in.
    :return: The `<name>` parts of the matching packages, or `None` when
        there is no `control` file.
    """
    control_path = debian_dir.get("control")
    if not control_path:
        return None

    # Matches the package name at the start of a dependency clause.
    package_name_pattern = re.compile("^([a-z0-9][-+.a-z0-9]+)", re.ASCII)
    plugin_prefix = "debputy-plugin-"

    with control_path.open() as fd:
        paragraphs = list(Deb822.iter_paragraphs(fd))
    source_fields = paragraphs[0] if paragraphs else {}

    found_plugins = set()
    for field_name in ("Build-Depends", "Build-Depends-Indep", "Build-Depends-Arch"):
        raw_value = source_fields.get(field_name)
        if not raw_value:
            continue
        for clause in raw_value.split(","):
            name_match = package_name_pattern.match(clause.strip())
            if name_match is None:
                continue
            package = name_match.group(1)
            if package.startswith(plugin_prefix):
                found_plugins.add(package[len(plugin_prefix) :])
    return found_plugins
def _check_migration_target(
    context: CommandContext,
    migration_target: DebputyIntegrationMode | None,
) -> DebputyIntegrationMode:
    """Resolve which migration target to use and report the choice.

    The integration mode is detected from the dh add-on sequences when those
    can be read.  An explicitly requested target is checked against the
    detected mode via `SUPPORTED_MIGRATIONS` and reported through `_error`
    when the combination is unsupported.

    :param context: Command context providing the debian directory and the
        source package fields.
    :param migration_target: The explicitly requested target, if any.
    :return: The requested target when given, otherwise the detected one,
        otherwise `INTEGRATION_MODE_DH_DEBPUTY`.
    """
    addon_sequences = read_dh_addon_sequences(context.debian_dir)
    if addon_sequences is None:
        detected_target = None
    else:
        bd_sequences, dr_sequences, _ = addon_sequences
        detected_target = determine_debputy_integration_mode(
            context.source_package().fields,
            bd_sequences | dr_sequences,
        )

    if migration_target is not None and detected_target is not None:
        allowed_targets = SUPPORTED_MIGRATIONS.get(
            detected_target,
            frozenset([detected_target]),
        )
        if (
            migration_target != detected_target
            and migration_target not in allowed_targets
        ):
            _error(
                f"Cannot migrate apply migration {migration_target} as it conflicts with the current state (which is {detected_target})"
            )

    # An explicit request always wins over detection.
    if migration_target is not None:
        _info(f'Using "{migration_target}" as migration (requested)')
        return migration_target

    if detected_target is not None:
        _info(f'Using "{detected_target}" as migration (based on the packaging)')
        return detected_target

    fallback_target = INTEGRATION_MODE_DH_DEBPUTY
    _info(
        f'Using "{fallback_target}" as default migration target. Use --migration-target to choose!'
    )
    return fallback_target
230def _read_git_status_lines(
231 lines: Iterable[bytes],
232) -> Iterable[tuple[str, str, str | None]]:
233 line_iter = iter(lines)
234 while True:
235 try:
236 line = next(line_iter)
237 except StopIteration:
238 return
239 if not line:
240 # We get a final empty line
241 continue
242 if len(line) < 4:
243 _error(
244 f"Internal error: Got out of sync with the `git status --porcelain=v1 -z` output ({len(line)=})"
245 )
246 status_marker = line[0:2].decode("utf-8")
247 if line[2] != 32: # Space
248 _error(
249 "Internal error: Got out of sync with the `git status --porcelain=v1 -z` output (expected status + filename)"
250 )
251 if status_marker.startswith(("R", "C")) or status_marker.endswith(("R", "C")):
252 try:
253 second_filename = next(line_iter).decode("utf-8")
254 except StopIteration:
255 _error(
256 "Internal error: Expected one more line of output from `git status` but it was not there"
257 )
258 else:
259 second_filename = None
260 filename = line[3:].decode("utf-8")
261 yield status_marker, filename, second_filename
264def _git_status() -> tuple[bool, Container[str]]:
265 try:
266 top_level = (
267 subprocess.check_output(
268 ["git", "rev-parse", "--show-toplevel"],
269 stderr=subprocess.DEVNULL,
270 cwd="debian",
271 )
272 .strip()
273 .decode("utf-8")
274 )
275 except (subprocess.CalledProcessError, FileNotFoundError):
276 return False, frozenset()
278 if os.path.realpath(os.getcwd()) != os.path.realpath(top_level):
279 # Patches welcome. The primary problem is to have the status output match the `_check_vcs_clashes` method.
280 _error(
281 f"Unsupported git repo: The `debputy` command only supports `git` when cwd is the git root (git root is: {top_level}). Please use --ignore-vcs to continue."
282 )
284 try:
285 r = subprocess.check_output(
286 ["git", "status", "--porcelain=v1", "-z"],
287 stderr=subprocess.DEVNULL,
288 cwd="debian",
289 )
290 except subprocess.CalledProcessError:
291 _error(
292 "The `git status --porcelain=v1 -z` command returned non-zero (note: the command outputs binary). Please use `--ignore-vcs` to continue anyway."
293 )
294 except FileNotFoundError:
295 _error(
296 "Could not run `git status` and there is a `.git` directory. Please use `--ignore-vcs` to continue anyway."
297 )
299 untracked_files = set()
300 for status_marker, filename, second_filename in _read_git_status_lines(
301 r.split(b"\0")
302 ):
303 if status_marker not in ("??", "!!"):
304 _error(
305 "The current git tree is not clean, please commit or stash any pending changes or use `--ignore-vcs` to continue anyway."
306 )
307 # There is never a second file for `??` or `!!`
308 assert second_filename is None
309 untracked_files.add(filename)
310 return True, untracked_files
def _check_vcs_clashes(
    manifest: HighLevelManifest,
    migrations: list[FeatureMigration],
    untracked_files: Container[str],
) -> None:
    """Report migrations that would touch paths listed in `untracked_files`.

    The affected paths are the manifest (when any migration changed it), both
    sides of every pending rename and every pending removal.  When any of
    them is in `untracked_files`, the clashing paths are listed as warnings
    and the problem is reported through `_error`.

    :param manifest: Provides the manifest path.
    :param migrations: The feature migrations whose pending changes to check.
    :param untracked_files: Paths to treat as not recoverable via the VCS.
    """
    affected_paths = []
    if any(m.successful_manifest_changes for m in migrations):
        affected_paths.append(manifest.manifest_path)

    for migration in migrations:
        for previous_path, new_path in migration.rename_paths_on_success:
            affected_paths.extend((previous_path, new_path))

    for migration in migrations:
        affected_paths.extend(migration.remove_paths_on_success)

    clashing_paths = [p for p in affected_paths if p in untracked_files]
    if not clashing_paths:
        return

    print(file=sys.stderr)
    _warn(
        "The following untracked or ignored paths would be affected by this migration"
    )
    _warn("")
    for clashing_path in clashing_paths:
        _warn(f" * {clashing_path}")
    _warn("")
    _warn("Since they are not tracked, the VCS cannot undo the changes to them")
    _warn("if the migration continued")
    print(file=sys.stderr)
    _error(
        "Please stash, commit or remove the above files before continuing. Alternatively, use --ignore-vcs"
    )
def perform_migration(
    fo: IOBasedOutputStyling,
    manifest: HighLevelManifest,
    acceptable_migration_issues: AcceptableMigrationIssues,
    permit_destructive_changes: bool | None,
    migration_target: DebputyIntegrationMode,
    manifest_parser_factory: Callable[[str], HighLevelManifest],
    migrators: list[Migrator],
    *,
    ignore_vcs: bool = False,
) -> None:
    """Run the migrators and write out (or preview) the resulting changes.

    Steps, in order: determine the debhelper compat level, inspect the git
    state (unless `ignore_vcs`), run every migrator, check the assumed compat
    level and VCS clashes, print the warning summary, write and verify the new
    manifest, and finally apply or list the renames and removals.

    Fatal problems are reported through `_error`.
    NOTE(review): the code relies on `_error` not returning - confirm.

    :param fo: Output styling passed to the feature migrations and summary.
    :param manifest: The manifest being migrated into; its
        `mutable_manifest` must be available.
    :param acceptable_migration_issues: Issues the user has accepted; passed
        to the migrators and consulted for `min-compat-level`.
    :param permit_destructive_changes: `True` applies the changes, `False`
        only reports them.  `None` becomes `True` when a git work tree was
        detected; otherwise the changes are listed together with a hint to
        re-run with `--apply-changes`.
    :param migration_target: The migration target handed to the migrators.
    :param manifest_parser_factory: Called with the path of the newly written
        manifest to verify that it can be parsed.
    :param migrators: The migrators to run, in order.
    :param ignore_vcs: When true, git is neither queried nor used.
    :raises AssertionError: If the generated manifest cannot be parsed.
    """
    migrations = []
    compat = _dh_compat_level()
    if compat is None:
        _error(
            'Cannot detect declared compat level (try running "dh_assistant active-compat-level")'
        )

    debian_dir = manifest.debian_dir
    mutable_manifest = assume_not_none(manifest.mutable_manifest)

    if not ignore_vcs:
        uses_git, untracked_files = _git_status()
    else:
        uses_git = False
        untracked_files = frozenset()

    # With git available, changes default to being applied.
    if permit_destructive_changes is None and uses_git:
        permit_destructive_changes = True

    migration_request = MigrationRequest(
        debian_dir,
        manifest,
        acceptable_migration_issues,
        migration_target,
    )

    try:
        for migrator in migrators:
            feature_migration = FeatureMigration(migrator.__name__, fo)
            migrator(migration_request, feature_migration)
            migrations.append(feature_migration)
    except CannotEmulateExecutableDHConfigFile as e:
        _error(
            f"Unable to process the executable dh config file {e.config_file().fs_path}: {e.message()}"
        )
    except UnsupportedFeature as e:
        msg = (
            f"Unable to migrate automatically due to missing features in debputy. The feature is:"
            f"\n\n * {e.message}"
        )
        keys = e.issue_keys
        if keys:
            primary_key = keys[0]
            alt_keys = ""
            if len(keys) > 1:
                alt_keys = (
                    f' Alternatively you can also use one of: {", ".join(keys[1:])}. Please note that some'
                    " of these may cover more cases."
                )
            msg += (
                f"\n\nUse --acceptable-migration-issues={primary_key} to convert this into a warning and try again."
                " However, you should only do that if you believe you can replace the functionality manually"
                f" or the usage is obsolete / can be removed. {alt_keys}"
            )
        _error(msg)
    except ConflictingChange as e:
        # The adjacent literals are concatenated, so the second one needs its
        # leading space to avoid printing "existingmanifest.".
        _error(
            "The migration tool detected a conflict between data being migrated and data already migrated / in the existing"
            " manifest."
            f"\n\n * {e.message}"
            "\n\nPlease review the situation and resolve the conflict manually."
        )

    # The highest compat level any migrator assumed (0 when none stated one).
    min_compat = max(
        (m.assumed_compat for m in migrations if m.assumed_compat is not None),
        default=0,
    )

    if compat < min_compat and "min-compat-level" not in acceptable_migration_issues:
        # The migration summary special-cases the compat mismatch and warns for us.
        _error(
            f"The migration tool assumes debhelper compat {min_compat} or later but the package is only on"
            f" compat {compat}. This may lead to incorrect result."
            f"\n\nUse --acceptable-migration-issues=min-compat-level to convert this into a warning and"
            f" try again, if you want to continue regardless."
        )

    requested_plugins = _requested_debputy_plugins(debian_dir)
    required_plugins: set[str] = set()
    required_plugins.update(
        chain.from_iterable(
            m.required_plugins for m in migrations if m.required_plugins
        )
    )

    if uses_git:
        _check_vcs_clashes(manifest, migrations, untracked_files)

    _print_migration_summary(
        fo,
        migrations,
        compat,
        min_compat,
        required_plugins,
        requested_plugins,
    )
    migration_count = sum((m.performed_changes for m in migrations), 0)

    if not migration_count:
        _info(
            "debputy was not able to find any (supported) migrations that it could perform for you."
        )
        return

    if any(m.successful_manifest_changes for m in migrations):
        # Always write to a side file first so the result can be verified
        # before it replaces the real manifest.
        new_manifest_path = manifest.manifest_path + ".new"

        with open(new_manifest_path, "w") as fd:
            mutable_manifest.write_to(fd)

        try:
            _info("Verifying the new manifest")
            manifest_parser_factory(new_manifest_path)
        except ManifestParseException as e:
            raise AssertionError(
                "Could not parse the manifest generated from the migrator"
            ) from e

        if permit_destructive_changes:
            # Keep the previous manifest (if any) as a `.orig` backup.
            if os.path.isfile(manifest.manifest_path):
                os.rename(manifest.manifest_path, manifest.manifest_path + ".orig")
            os.rename(new_manifest_path, manifest.manifest_path)
            if uses_git:
                subprocess.check_call(["git", "add", manifest.manifest_path])
            _info(f"Updated manifest {manifest.manifest_path}")
        else:
            _info(
                f'Created draft manifest "{new_manifest_path}" (rename to "{manifest.manifest_path}"'
                " to activate it)"
            )
    else:
        _info("No manifest changes detected; skipping update of manifest.")

    removals: int = sum((len(m.remove_paths_on_success) for m in migrations), 0)
    renames: int = sum((len(m.rename_paths_on_success) for m in migrations), 0)

    if renames:
        if permit_destructive_changes:
            _info("Paths being renamed:")
        else:
            _info("Migration *would* rename the following paths:")
        prefix = "git " if uses_git else ""
        for previous_path, new_path in (
            p for m in migrations for p in m.rename_paths_on_success
        ):
            _info(f" {prefix}mv {escape_shell(previous_path, new_path)}")

    if removals:
        if permit_destructive_changes:
            _info("Removals:")
        else:
            _info("Migration *would* remove the following files:")
        cmd = "git rm" if uses_git else "rm -f"
        for path in (p for m in migrations for p in m.remove_paths_on_success):
            _info(f" {cmd} {escape_shell(path)}")

    if permit_destructive_changes is None:
        # No explicit choice and no git to fall back on: preview only.
        print()
        _info(
            "If you would like to perform the migration, please re-run with --apply-changes."
        )
    elif permit_destructive_changes:
        _apply_renames_and_removals(migrations, uses_git)

        print()
        _info("Migrations performed successfully")
        print()
        _info(
            "Remember to validate the resulting binary packages after rebuilding with debputy"
        )
        if uses_git:
            _info(
                "Use `git commit` to commit the changes or use `git reset` / `git restore` to undo them."
            )
    else:
        print()
        _info("No migrations performed as requested")
def _apply_renames_and_removals(
    migrations: list[FeatureMigration],
    uses_git: bool,
) -> None:
    """Carry out the renames and removals recorded on the migrations.

    Renames are performed first, one at a time.  Removals follow: a single
    `git rm` call covering all paths when `uses_git` is set, otherwise one
    `os.unlink` per path.

    :param migrations: The feature migrations whose `rename_paths_on_success`
        and `remove_paths_on_success` are applied.
    :param uses_git: Whether to go through `git mv` / `git rm` instead of
        plain filesystem operations.
    """
    for migration in migrations:
        for previous_path, new_path in migration.rename_paths_on_success:
            if uses_git:
                subprocess.check_call(["git", "mv", previous_path, new_path])
            else:
                os.rename(previous_path, new_path)

    paths_to_remove = [p for m in migrations for p in m.remove_paths_on_success]
    if not paths_to_remove:
        return

    if uses_git:
        subprocess.check_call(["git", "rm", *paths_to_remove])
    else:
        for doomed_path in paths_to_remove:
            os.unlink(doomed_path)