Coverage for src/debputy/dh_migration/migration.py: 7%
288 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-09-07 09:27 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-09-07 09:27 +0000
1import json
2import os
3import re
4import subprocess
5from itertools import chain
6from typing import (
7 Optional,
8 List,
9 Callable,
10 Set,
11 Mapping,
12 FrozenSet,
13 Tuple,
14 Iterable,
15 Container,
16)
18from debian.deb822 import Deb822
20from debputy.commands.debputy_cmd.context import CommandContext
21from debputy.commands.debputy_cmd.output import IOBasedOutputStyling
22from debputy.dh.debhelper_emulation import CannotEmulateExecutableDHConfigFile
23from debputy.dh.dh_assistant import read_dh_addon_sequences
24from debputy.dh_migration.migrators import Migrator, MigrationTarget
25from debputy.dh_migration.migrators_impl import (
26 INTEGRATION_MODE_DH_DEBPUTY,
27 INTEGRATION_MODE_DH_DEBPUTY_RRR,
28)
29from debputy.dh_migration.models import (
30 FeatureMigration,
31 AcceptableMigrationIssues,
32 UnsupportedFeature,
33 ConflictingChange,
34)
35from debputy.highlevel_manifest import HighLevelManifest
36from debputy.integration_detection import determine_debputy_integration_mode
37from debputy.manifest_parser.exceptions import ManifestParseException
38from debputy.plugin.api import VirtualPath
39from debputy.plugin.api.spec import DebputyIntegrationMode, INTEGRATION_MODE_FULL
40from debputy.util import _error, _warn, _info, escape_shell, assume_not_none
42SUPPORTED_MIGRATIONS: Mapping[MigrationTarget, FrozenSet[MigrationTarget]] = {
43 INTEGRATION_MODE_FULL: frozenset([INTEGRATION_MODE_FULL]),
44 INTEGRATION_MODE_DH_DEBPUTY: frozenset(
45 [
46 "dh-single-to-multi-binary",
47 "dh-package-prefixed-config-files",
48 INTEGRATION_MODE_DH_DEBPUTY,
49 INTEGRATION_MODE_FULL,
50 ]
51 ),
52 INTEGRATION_MODE_DH_DEBPUTY_RRR: frozenset(
53 [
54 "dh-single-to-multi-binary",
55 "dh-package-prefixed-config-files",
56 INTEGRATION_MODE_DH_DEBPUTY_RRR,
57 INTEGRATION_MODE_DH_DEBPUTY,
58 INTEGRATION_MODE_FULL,
59 ]
60 ),
61}
64def _print_migration_summary(
65 fo: IOBasedOutputStyling,
66 migrations: List[FeatureMigration],
67 compat: int,
68 min_compat_level: int,
69 required_plugins: Set[str],
70 requested_plugins: Optional[Set[str]],
71) -> None:
72 warning_count = 0
74 for migration in migrations:
75 if not migration.anything_to_do:
76 continue
77 underline = "-" * len(migration.tagline)
78 if migration.warnings:
79 if warning_count:
80 _warn("")
81 _warn(f"Summary for migration: {migration.tagline}")
82 if not fo.optimize_for_screen_reader:
83 _warn(f"-----------------------{underline}")
84 warning_count += len(migration.warnings)
85 for warning in migration.warnings:
86 _warn(f" * {warning}")
88 if compat < min_compat_level:
89 if warning_count:
90 _warn("")
91 _warn("Supported debhelper compat check")
92 if not fo.optimize_for_screen_reader:
93 _warn("--------------------------------")
94 warning_count += 1
95 _warn(
96 f"The migration tool assumes debhelper compat {min_compat_level}+ semantics, but this package"
97 f" is using compat {compat}. Consider upgrading the package to compat {min_compat_level}"
98 " first."
99 )
101 if required_plugins:
102 if requested_plugins is None:
103 warning_count += 1
104 needed_plugins = ", ".join(f"debputy-plugin-{n}" for n in required_plugins)
105 if warning_count:
106 _warn("")
107 _warn("Missing debputy plugin check")
108 if not fo.optimize_for_screen_reader:
109 _warn("----------------------------")
110 _warn(
111 f"The migration tool could not read d/control and therefore cannot tell if all the required"
112 f" plugins have been requested. Please ensure that the package Build-Depends on: {needed_plugins}"
113 )
114 else:
115 missing_plugins = required_plugins - requested_plugins
116 if missing_plugins:
117 warning_count += 1
118 needed_plugins = ", ".join(
119 f"debputy-plugin-{n}" for n in missing_plugins
120 )
121 if warning_count:
122 _warn("")
123 _warn("Missing debputy plugin check")
124 if not fo.optimize_for_screen_reader:
125 _warn("----------------------------")
126 _warn(
127 f"The migration tool asserted that the following `debputy` plugins would be required, which"
128 f" are not explicitly requested. Please add the following to Build-Depends: {needed_plugins}"
129 )
131 if warning_count:
132 _warn("")
133 _warn(
134 f"/!\\ Total number of warnings or manual migrations required: {warning_count}"
135 )
138def _dh_compat_level() -> Optional[int]:
139 try:
140 res = subprocess.check_output(
141 ["dh_assistant", "active-compat-level"], stderr=subprocess.DEVNULL
142 )
143 except subprocess.CalledProcessError:
144 compat = None
145 else:
146 try:
147 compat = json.loads(res)["declared-compat-level"]
148 except RuntimeError:
149 compat = None
150 else:
151 if not isinstance(compat, int):
152 compat = None
153 return compat
156def _requested_debputy_plugins(debian_dir: VirtualPath) -> Optional[Set[str]]:
157 ctrl_file = debian_dir.get("control")
158 if not ctrl_file:
159 return None
161 dep_regex = re.compile("^([a-z0-9][-+.a-z0-9]+)", re.ASCII)
162 plugins = set()
164 with ctrl_file.open() as fd:
165 ctrl = list(Deb822.iter_paragraphs(fd))
166 source_paragraph = ctrl[0] if ctrl else {}
168 for f in ("Build-Depends", "Build-Depends-Indep", "Build-Depends-Arch"):
169 field = source_paragraph.get(f)
170 if not field:
171 continue
173 for dep_clause in (d.strip() for d in field.split(",")):
174 match = dep_regex.match(dep_clause.strip())
175 if not match:
176 continue
177 dep = match.group(1)
178 if not dep.startswith("debputy-plugin-"):
179 continue
180 plugins.add(dep[15:])
181 return plugins
184def _check_migration_target(
185 context: CommandContext,
186 migration_target: Optional[DebputyIntegrationMode],
187) -> DebputyIntegrationMode:
188 r = read_dh_addon_sequences(context.debian_dir)
189 if r is not None:
190 bd_sequences, dr_sequences, _ = r
191 all_sequences = bd_sequences | dr_sequences
192 detected_migration_target = determine_debputy_integration_mode(
193 context.source_package().fields,
194 all_sequences,
195 )
196 else:
197 detected_migration_target = None
199 if migration_target is not None and detected_migration_target is not None:
200 supported_migrations = SUPPORTED_MIGRATIONS.get(
201 detected_migration_target,
202 frozenset([detected_migration_target]),
203 )
205 if (
206 migration_target != detected_migration_target
207 and migration_target not in supported_migrations
208 ):
209 _error(
210 f"Cannot migrate apply migration {migration_target} as it conflicts with the current state (which is {detected_migration_target})"
211 )
213 if migration_target is not None:
214 resolved_migration_target = migration_target
215 _info(f'Using "{resolved_migration_target}" as migration (requested)')
216 else:
217 if detected_migration_target is not None:
218 _info(
219 f'Using "{detected_migration_target}" as migration (based on the packaging)'
220 )
221 else:
222 detected_migration_target = INTEGRATION_MODE_DH_DEBPUTY
223 _info(
224 f'Using "{detected_migration_target}" as default migration target. Use --migration-target to choose!'
225 )
226 resolved_migration_target = detected_migration_target
228 return resolved_migration_target
231def _read_git_status_lines(
232 lines: Iterable[bytes],
233) -> Iterable[Tuple[str, str, Optional[str]]]:
234 line_iter = iter(lines)
235 while True:
236 try:
237 line = next(line_iter)
238 except StopIteration:
239 return
240 if not line:
241 # We get a final empty line
242 continue
243 if len(line) < 4:
244 _error(
245 f"Internal error: Got out of sync with the `git status --porcelain=v1 -z` output ({len(line)=})"
246 )
247 status_marker = line[0:2].decode("utf-8")
248 if line[2] != 32: # Space
249 _error(
250 "Internal error: Got out of sync with the `git status --porcelain=v1 -z` output (expected status + filename)"
251 )
252 if status_marker.startswith(("R", "C")) or status_marker.endswith(("R", "C")):
253 try:
254 second_filename = next(line_iter).decode("utf-8")
255 except StopIteration:
256 _error(
257 "Internal error: Expected one more line of output from `git status` but it was not there"
258 )
259 else:
260 second_filename = None
261 filename = line[3:].decode("utf-8")
262 yield status_marker, filename, second_filename
265def _git_status() -> Tuple[bool, Container[str]]:
266 try:
267 top_level = (
268 subprocess.check_output(
269 ["git", "rev-parse", "--show-toplevel"],
270 stderr=subprocess.DEVNULL,
271 cwd="debian",
272 )
273 .strip()
274 .decode("utf-8")
275 )
276 except (subprocess.CalledProcessError, FileNotFoundError):
277 return False, frozenset()
279 if os.path.realpath(os.getcwd()) != os.path.realpath(top_level):
280 # Patches welcome. The primary problem is to have the status output match the `_check_vcs_clashes` method.
281 _error(
282 f"Unsupported git repo: The `debputy` command only supports `git` when cwd is the git root (git root is: {top_level}). Please use --ignore-vcs to continue."
283 )
285 try:
286 r = subprocess.check_output(
287 ["git", "status", "--porcelain=v1", "-z"],
288 stderr=subprocess.DEVNULL,
289 cwd="debian",
290 )
291 except subprocess.CalledProcessError:
292 _error(
293 "The `git status --porcelain=v1 -z` command returned non-zero (note: the command outputs binary). Please use `--ignore-vcs` to continue anyway."
294 )
295 except FileNotFoundError:
296 _error(
297 "Could not run `git status` and there is a `.git` directory. Please use `--ignore-vcs` to continue anyway."
298 )
300 untracked_files = set()
301 for status_marker, filename, second_filename in _read_git_status_lines(
302 r.split(b"\0")
303 ):
304 if status_marker not in ("??", "!!"):
305 _error(
306 "The current git tree is not clean, please commit or stash any pending changes or use `--ignore-vcs` to continue anyway."
307 )
308 # There is never a second file for `??` or `!!`
309 assert second_filename is None
310 untracked_files.add(filename)
311 return True, untracked_files
314def _check_vcs_clashes(
315 manifest: HighLevelManifest,
316 migrations: List[FeatureMigration],
317 untracked_files: Container[str],
318) -> None:
319 all_affected_files = []
320 if any(m.successful_manifest_changes for m in migrations):
321 all_affected_files.append(manifest.manifest_path)
323 for previous_path, new_path in (
324 p for m in migrations for p in m.rename_paths_on_success
325 ):
326 all_affected_files.append(previous_path)
327 all_affected_files.append(new_path)
329 all_affected_files.extend(p for m in migrations for p in m.remove_paths_on_success)
331 clashing_paths = [p for p in all_affected_files if p in untracked_files]
333 if clashing_paths:
334 print()
335 _warn(
336 "The following untracked or ignored paths would be affected by this migration"
337 )
338 _warn("")
339 for path in clashing_paths:
340 _warn(f" * {path}")
341 _warn("")
342 _warn("Since they are not tracked, the VCS cannot undo the changes to them")
343 _warn("if the migration continued")
344 print()
345 _error(
346 "Please stash, commit or remove the above files before continuing. Alternatively, use --ignore-vcs"
347 )
350def perform_migration(
351 fo: IOBasedOutputStyling,
352 manifest: HighLevelManifest,
353 acceptable_migration_issues: AcceptableMigrationIssues,
354 permit_destructive_changes: Optional[bool],
355 migration_target: DebputyIntegrationMode,
356 manifest_parser_factory: Callable[[str], HighLevelManifest],
357 migrators: List[Migrator],
358 *,
359 ignore_vcs: bool = False,
360) -> None:
361 migrations = []
362 compat = _dh_compat_level()
363 if compat is None:
364 _error(
365 'Cannot detect declared compat level (try running "dh_assistant active-compat-level")'
366 )
368 debian_dir = manifest.debian_dir
369 mutable_manifest = assume_not_none(manifest.mutable_manifest)
371 if not ignore_vcs:
372 uses_git, untracked_files = _git_status()
373 else:
374 uses_git = False
375 untracked_files = frozenset()
377 if permit_destructive_changes is None and uses_git:
378 permit_destructive_changes = True
380 try:
381 for migrator in migrators:
382 feature_migration = FeatureMigration(migrator.__name__, fo)
383 migrator(
384 debian_dir,
385 manifest,
386 acceptable_migration_issues,
387 feature_migration,
388 migration_target,
389 )
390 migrations.append(feature_migration)
391 except CannotEmulateExecutableDHConfigFile as e:
392 _error(
393 f"Unable to process the executable dh config file {e.config_file().fs_path}: {e.message()}"
394 )
395 except UnsupportedFeature as e:
396 msg = (
397 f"Unable to migrate automatically due to missing features in debputy. The feature is:"
398 f"\n\n * {e.message}"
399 )
400 keys = e.issue_keys
401 if keys:
402 primary_key = keys[0]
403 alt_keys = ""
404 if len(keys) > 1:
405 alt_keys = (
406 f' Alternatively you can also use one of: {", ".join(keys[1:])}. Please note that some'
407 " of these may cover more cases."
408 )
409 msg += (
410 f"\n\nUse --acceptable-migration-issues={primary_key} to convert this into a warning and try again."
411 " However, you should only do that if you believe you can replace the functionality manually"
412 f" or the usage is obsolete / can be removed. {alt_keys}"
413 )
414 _error(msg)
415 except ConflictingChange as e:
416 _error(
417 "The migration tool detected a conflict data being migrated and data already migrated / in the existing"
418 "manifest."
419 f"\n\n * {e.message}"
420 "\n\nPlease review the situation and resolve the conflict manually."
421 )
423 # We start on compat 12 for arch:any due to the new dh_makeshlibs and dh_installinit default
424 # For arch:any, the min is compat 14 due to `dh_dwz` being removed.
425 min_compat = 14 if any(not p.is_arch_all for p in manifest.all_packages) else 12
426 min_compat = max(
427 (m.assumed_compat for m in migrations if m.assumed_compat is not None),
428 default=min_compat,
429 )
431 if compat < min_compat and "min-compat-level" not in acceptable_migration_issues:
432 # The migration summary special-cases the compat mismatch and warns for us.
433 _error(
434 f"The migration tool assumes debhelper compat {min_compat} or later but the package is only on"
435 f" compat {compat}. This may lead to incorrect result."
436 f"\n\nUse --acceptable-migration-issues=min-compat-level to convert this into a warning and"
437 f" try again, if you want to continue regardless."
438 )
440 requested_plugins = _requested_debputy_plugins(debian_dir)
441 required_plugins: Set[str] = set()
442 required_plugins.update(
443 chain.from_iterable(
444 m.required_plugins for m in migrations if m.required_plugins
445 )
446 )
448 if uses_git:
449 _check_vcs_clashes(manifest, migrations, untracked_files)
451 _print_migration_summary(
452 fo,
453 migrations,
454 compat,
455 min_compat,
456 required_plugins,
457 requested_plugins,
458 )
459 migration_count = sum((m.performed_changes for m in migrations), 0)
461 if not migration_count:
462 _info(
463 "debputy was not able to find any (supported) migrations that it could perform for you."
464 )
465 return
467 if any(m.successful_manifest_changes for m in migrations):
468 new_manifest_path = manifest.manifest_path + ".new"
470 with open(new_manifest_path, "w") as fd:
471 mutable_manifest.write_to(fd)
473 try:
474 _info("Verifying the new manifest")
475 manifest_parser_factory(new_manifest_path)
476 except ManifestParseException as e:
477 raise AssertionError(
478 "Could not parse the manifest generated from the migrator"
479 ) from e
481 if permit_destructive_changes:
482 if os.path.isfile(manifest.manifest_path):
483 os.rename(manifest.manifest_path, manifest.manifest_path + ".orig")
484 os.rename(new_manifest_path, manifest.manifest_path)
485 if uses_git:
486 subprocess.check_call(["git", "add", manifest.manifest_path])
487 _info(f"Updated manifest {manifest.manifest_path}")
488 else:
489 _info(
490 f'Created draft manifest "{new_manifest_path}" (rename to "{manifest.manifest_path}"'
491 " to activate it)"
492 )
493 else:
494 _info("No manifest changes detected; skipping update of manifest.")
496 removals: int = sum((len(m.remove_paths_on_success) for m in migrations), 0)
497 renames: int = sum((len(m.rename_paths_on_success) for m in migrations), 0)
499 if renames:
500 if permit_destructive_changes:
501 _info("Paths being renamed:")
502 else:
503 _info("Migration *would* rename the following paths:")
504 prefix = "git " if uses_git else ""
505 for previous_path, new_path in (
506 p for m in migrations for p in m.rename_paths_on_success
507 ):
508 _info(f" {prefix}mv {escape_shell(previous_path, new_path)}")
510 if removals:
511 if permit_destructive_changes:
512 _info("Removals:")
513 else:
514 _info("Migration *would* remove the following files:")
515 cmd = "git rm" if uses_git else "rm -f"
516 for path in (p for m in migrations for p in m.remove_paths_on_success):
517 _info(f" {cmd} {escape_shell(path)}")
519 if permit_destructive_changes is None:
520 print()
521 _info(
522 "If you would like to perform the migration, please re-run with --apply-changes."
523 )
524 elif permit_destructive_changes:
525 _apply_renames_and_removals(migrations, uses_git)
527 print()
528 _info("Migrations performed successfully")
529 print()
530 _info(
531 "Remember to validate the resulting binary packages after rebuilding with debputy"
532 )
533 if uses_git:
534 _info(
535 "Use `git commit` to commit the changes or use `git reset` / `git restore` to undo them."
536 )
537 else:
538 print()
539 _info("No migrations performed as requested")
542def _apply_renames_and_removals(
543 migrations: List[FeatureMigration],
544 uses_git: bool,
545) -> None:
546 for previous_path, new_path in (
547 p for m in migrations for p in m.rename_paths_on_success
548 ):
549 if uses_git:
550 subprocess.check_call(["git", "mv", previous_path, new_path])
551 else:
552 os.rename(previous_path, new_path)
554 files_being_removed = [p for m in migrations for p in m.remove_paths_on_success]
555 if uses_git and files_being_removed:
556 command = ["git", "rm"]
557 command.extend(files_being_removed)
558 subprocess.check_call(command)
559 elif files_being_removed:
560 for path in (p for m in migrations for p in m.remove_paths_on_success):
561 os.unlink(path)