Coverage for src/debputy/dh_migration/migration.py: 7%
289 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-09-15 18:18 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-09-15 18:18 +0000
1import json
2import os
3import re
4import subprocess
5import sys
6from itertools import chain
7from typing import (
8 Optional,
9 List,
10 Callable,
11 Set,
12 Mapping,
13 FrozenSet,
14 Tuple,
15 Iterable,
16 Container,
17)
19from debian.deb822 import Deb822
21from debputy.commands.debputy_cmd.context import CommandContext
22from debputy.commands.debputy_cmd.output import IOBasedOutputStyling
23from debputy.dh.debhelper_emulation import CannotEmulateExecutableDHConfigFile
24from debputy.dh.dh_assistant import read_dh_addon_sequences
25from debputy.dh_migration.migrators import Migrator, MigrationTarget
26from debputy.dh_migration.migrators_impl import (
27 INTEGRATION_MODE_DH_DEBPUTY,
28 INTEGRATION_MODE_DH_DEBPUTY_RRR,
29)
30from debputy.dh_migration.models import (
31 FeatureMigration,
32 AcceptableMigrationIssues,
33 UnsupportedFeature,
34 ConflictingChange,
35)
36from debputy.highlevel_manifest import HighLevelManifest
37from debputy.integration_detection import determine_debputy_integration_mode
38from debputy.manifest_parser.exceptions import ManifestParseException
39from debputy.plugin.api import VirtualPath
40from debputy.plugin.api.spec import DebputyIntegrationMode, INTEGRATION_MODE_FULL
41from debputy.util import _error, _warn, _info, escape_shell, assume_not_none
# Keyed by the integration mode detected for the package; each value is the
# set of migration targets that may be requested when starting from that mode.
# `_check_migration_target` falls back to "only the detected mode itself" for
# modes that have no entry here.
SUPPORTED_MIGRATIONS: Mapping[MigrationTarget, FrozenSet[MigrationTarget]] = {
    INTEGRATION_MODE_FULL: frozenset({INTEGRATION_MODE_FULL}),
    INTEGRATION_MODE_DH_DEBPUTY: frozenset(
        {
            "dh-single-to-multi-binary",
            "dh-package-prefixed-config-files",
            INTEGRATION_MODE_DH_DEBPUTY,
            INTEGRATION_MODE_FULL,
        }
    ),
    INTEGRATION_MODE_DH_DEBPUTY_RRR: frozenset(
        {
            "dh-single-to-multi-binary",
            "dh-package-prefixed-config-files",
            INTEGRATION_MODE_DH_DEBPUTY_RRR,
            INTEGRATION_MODE_DH_DEBPUTY,
            INTEGRATION_MODE_FULL,
        }
    ),
}
def _print_migration_summary(
    fo: IOBasedOutputStyling,
    migrations: List[FeatureMigration],
    compat: int,
    min_compat_level: int,
    required_plugins: Set[str],
    requested_plugins: Optional[Set[str]],
) -> None:
    """Emit the warnings collected during a migration run.

    Three kinds of sections are written via `_warn`: one per migration that
    has something to do and carries warnings, one when `compat` is below
    `min_compat_level`, and one when required `debputy` plugins are missing
    from (or cannot be checked against) the requested ones. A final line
    with the total count is written when at least one warning was counted.

    :param fo: Output styling; section underlines are skipped when it is
      optimized for screen readers.
    :param migrations: The feature migrations to summarize.
    :param compat: The debhelper compat level the package uses.
    :param min_compat_level: The compat level the migration tool assumes.
    :param required_plugins: Plugin names (without the `debputy-plugin-`
      prefix) that the migrations need.
    :param requested_plugins: Plugin names the package already requests, or
      `None` when that could not be determined.
    """
    warning_count = 0

    def _start_section(title: str, underline: str) -> None:
        # A blank separator is only wanted when an earlier section already
        # produced output; callers bump `warning_count` *after* this call.
        if warning_count:
            _warn("")
        _warn(title)
        if not fo.optimize_for_screen_reader:
            _warn(underline)

    for migration in migrations:
        if not migration.anything_to_do:
            continue
        underline = "-" * len(migration.tagline)
        if migration.warnings:
            _start_section(
                f"Summary for migration: {migration.tagline}",
                f"-----------------------{underline}",
            )
            warning_count += len(migration.warnings)
            for warning in migration.warnings:
                _warn(f" * {warning}")

    if compat < min_compat_level:
        _start_section(
            "Supported debhelper compat check",
            "--------------------------------",
        )
        warning_count += 1
        _warn(
            f"The migration tool assumes debhelper compat {min_compat_level}+ semantics, but this package"
            f" is using compat {compat}. Consider upgrading the package to compat {min_compat_level}"
            " first."
        )

    if required_plugins:
        if requested_plugins is None:
            # Without d/control every required plugin has to be listed.
            plugins_to_report = required_plugins
            explanation = (
                "The migration tool could not read d/control and therefore cannot tell if all the required"
                " plugins have been requested. Please ensure that the package Build-Depends on: "
            )
        else:
            plugins_to_report = required_plugins - requested_plugins
            explanation = (
                "The migration tool asserted that the following `debputy` plugins would be required, which"
                " are not explicitly requested. Please add the following to Build-Depends: "
            )
        if plugins_to_report:
            # Sorted so the message is stable across runs (set order is not).
            needed_plugins = ", ".join(
                f"debputy-plugin-{n}" for n in sorted(plugins_to_report)
            )
            _start_section(
                "Missing debputy plugin check",
                "----------------------------",
            )
            warning_count += 1
            _warn(f"{explanation}{needed_plugins}")

    if warning_count:
        _warn("")
        _warn(
            f"/!\\ Total number of warnings or manual migrations required: {warning_count}"
        )
def _dh_compat_level() -> Optional[int]:
    """Return the debhelper compat level declared by the package, if known.

    Runs `dh_assistant active-compat-level` and reads the
    `declared-compat-level` key from its JSON output.

    :returns: The declared compat level, or `None` when `dh_assistant`
      cannot be started, exits non-zero, produces output that is not the
      expected JSON object, or reports a value that is not an integer.
    """
    try:
        res = subprocess.check_output(
            ["dh_assistant", "active-compat-level"], stderr=subprocess.DEVNULL
        )
    except (subprocess.CalledProcessError, OSError):
        # OSError covers the tool being absent or not executable.
        return None

    try:
        compat = json.loads(res)["declared-compat-level"]
    except (ValueError, KeyError, TypeError):
        # ValueError: not valid JSON (JSONDecodeError / undecodable bytes)
        # KeyError: a JSON object without the key
        # TypeError: valid JSON that is not an object (e.g. `null`, a number)
        return None

    # `bool` is a subclass of `int`; a JSON `true` is not a compat level.
    if isinstance(compat, bool) or not isinstance(compat, int):
        return None
    return compat
def _requested_debputy_plugins(debian_dir: VirtualPath) -> Optional[Set[str]]:
    """Collect the `debputy` plugins named in the source build dependencies.

    Looks at `Build-Depends`, `Build-Depends-Indep` and `Build-Depends-Arch`
    of the first paragraph of `control` and picks out the leading package
    name of every comma-separated clause that starts with `debputy-plugin-`.

    :param debian_dir: Directory in which the `control` file is looked up.
    :returns: The plugin names with the `debputy-plugin-` prefix removed, or
      `None` when there is no `control` file.
    """
    control_file = debian_dir.get("control")
    if not control_file:
        return None

    plugin_prefix = "debputy-plugin-"
    package_name = re.compile("^([a-z0-9][-+.a-z0-9]+)", re.ASCII)

    with control_file.open() as fd:
        paragraphs = list(Deb822.iter_paragraphs(fd))
    source_paragraph = paragraphs[0] if paragraphs else {}

    found = set()
    for field_name in ("Build-Depends", "Build-Depends-Indep", "Build-Depends-Arch"):
        value = source_paragraph.get(field_name)
        if not value:
            continue
        for clause in value.split(","):
            # Only the first package name of a clause is considered.
            hit = package_name.match(clause.strip())
            if hit is None:
                continue
            package = hit.group(1)
            if package.startswith(plugin_prefix):
                found.add(package[len(plugin_prefix) :])
    return found
def _check_migration_target(
    context: CommandContext,
    migration_target: Optional[DebputyIntegrationMode],
) -> DebputyIntegrationMode:
    """Resolve which migration target to use and check it is permitted.

    The current integration mode is detected from the dh add-on sequences
    and the source package fields (when the sequences can be read). An
    explicitly requested target wins, but it is reported via `_error` when
    it differs from the detected mode and is not listed for that mode in
    `SUPPORTED_MIGRATIONS`. Without a request, the detected mode is used,
    and `INTEGRATION_MODE_DH_DEBPUTY` when nothing could be detected.

    :param context: Command context providing the debian directory and the
      source package.
    :param migration_target: The target requested by the user, if any.
    :returns: The migration target to use.
    """
    r = read_dh_addon_sequences(context.debian_dir)
    if r is not None:
        bd_sequences, dr_sequences, _ = r
        all_sequences = bd_sequences | dr_sequences
        detected_migration_target = determine_debputy_integration_mode(
            context.source_package().fields,
            all_sequences,
        )
    else:
        detected_migration_target = None

    if migration_target is not None:
        if detected_migration_target is not None:
            # Modes without an explicit entry may only "migrate" to themselves.
            supported_migrations = SUPPORTED_MIGRATIONS.get(
                detected_migration_target,
                frozenset([detected_migration_target]),
            )
            if (
                migration_target != detected_migration_target
                and migration_target not in supported_migrations
            ):
                _error(
                    f"Cannot apply migration {migration_target} as it conflicts with the current state (which is {detected_migration_target})"
                )
        _info(f'Using "{migration_target}" as migration (requested)')
        return migration_target

    if detected_migration_target is not None:
        _info(
            f'Using "{detected_migration_target}" as migration (based on the packaging)'
        )
        return detected_migration_target

    detected_migration_target = INTEGRATION_MODE_DH_DEBPUTY
    _info(
        f'Using "{detected_migration_target}" as default migration target. Use --migration-target to choose!'
    )
    return detected_migration_target
def _read_git_status_lines(
    lines: Iterable[bytes],
) -> Iterable[Tuple[str, str, Optional[str]]]:
    """Parse the NUL-split records of `git status --porcelain=v1 -z`.

    Each non-empty record is a two character status, a space and a file
    name. When either status character is `R` or `C`, the following record
    is consumed as the second file name of that entry.

    :param lines: The raw records; empty records are skipped.
    :returns: Tuples of (status marker, file name, second file name or
      `None`), all decoded as UTF-8.
    """
    line_iter = iter(lines)
    # The inner `next()` below advances the same iterator as this loop.
    for line in line_iter:
        if not line:
            # We get a final empty line
            continue
        if len(line) < 4:
            _error(
                f"Internal error: Got out of sync with the `git status --porcelain=v1 -z` output ({len(line)=})"
            )
        status_marker = line[0:2].decode("utf-8")
        if line[2:3] != b" ":
            _error(
                "Internal error: Got out of sync with the `git status --porcelain=v1 -z` output (expected status + filename)"
            )
        has_second_name = any(flag in ("R", "C") for flag in status_marker)
        if has_second_name:
            try:
                second_filename = next(line_iter).decode("utf-8")
            except StopIteration:
                _error(
                    "Internal error: Expected one more line of output from `git status` but it was not there"
                )
        else:
            second_filename = None
        filename = line[3:].decode("utf-8")
        yield status_marker, filename, second_filename
def _git_status() -> Tuple[bool, Container[str]]:
    """Check whether the package is in a clean git work tree.

    `git` is run with `debian` as working directory. If the top level
    cannot be determined, the package is treated as not using git. The
    repository is only supported when its top level is the current
    directory, and any status entry other than untracked (`??`) or ignored
    (`!!`) is reported via `_error`.

    :returns: A tuple of (whether git is in use, the untracked and ignored
      paths reported by `git status`).
    """
    try:
        raw_top_level = subprocess.check_output(
            ["git", "rev-parse", "--show-toplevel"],
            stderr=subprocess.DEVNULL,
            cwd="debian",
        )
    except (subprocess.CalledProcessError, FileNotFoundError):
        return False, frozenset()
    top_level = raw_top_level.strip().decode("utf-8")

    current_dir = os.path.realpath(os.getcwd())
    if current_dir != os.path.realpath(top_level):
        # Patches welcome. The primary problem is to have the status output match the `_check_vcs_clashes` method.
        _error(
            f"Unsupported git repo: The `debputy` command only supports `git` when cwd is the git root (git root is: {top_level}). Please use --ignore-vcs to continue."
        )

    try:
        r = subprocess.check_output(
            ["git", "status", "--porcelain=v1", "-z"],
            stderr=subprocess.DEVNULL,
            cwd="debian",
        )
    except subprocess.CalledProcessError:
        _error(
            "The `git status --porcelain=v1 -z` command returned non-zero (note: the command outputs binary). Please use `--ignore-vcs` to continue anyway."
        )
    except FileNotFoundError:
        _error(
            "Could not run `git status` and there is a `.git` directory. Please use `--ignore-vcs` to continue anyway."
        )

    untracked_files = set()
    status_entries = _read_git_status_lines(r.split(b"\0"))
    for status_marker, filename, second_filename in status_entries:
        if status_marker != "??" and status_marker != "!!":
            _error(
                "The current git tree is not clean, please commit or stash any pending changes or use `--ignore-vcs` to continue anyway."
            )
        # There is never a second file for `??` or `!!`
        assert second_filename is None
        untracked_files.add(filename)
    return True, untracked_files
def _check_vcs_clashes(
    manifest: HighLevelManifest,
    migrations: List[FeatureMigration],
    untracked_files: Container[str],
) -> None:
    """Report when the migration would touch paths the VCS does not track.

    The affected paths are the manifest (when any migration changed it),
    both ends of every rename and every removal. If any of them is in
    `untracked_files`, they are listed as warnings followed by `_error`.

    :param manifest: The manifest whose path may be rewritten.
    :param migrations: The migrations whose renames/removals are checked.
    :param untracked_files: Paths reported as untracked or ignored.
    """
    affected_paths = []
    if any(m.successful_manifest_changes for m in migrations):
        affected_paths.append(manifest.manifest_path)

    for m in migrations:
        for previous_path, new_path in m.rename_paths_on_success:
            affected_paths.extend((previous_path, new_path))

    for m in migrations:
        affected_paths.extend(m.remove_paths_on_success)

    clashing_paths = [p for p in affected_paths if p in untracked_files]
    if not clashing_paths:
        return

    print(file=sys.stderr)
    _warn(
        "The following untracked or ignored paths would be affected by this migration"
    )
    _warn("")
    for path in clashing_paths:
        _warn(f" * {path}")
    _warn("")
    _warn("Since they are not tracked, the VCS cannot undo the changes to them")
    _warn("if the migration continued")
    print(file=sys.stderr)
    _error(
        "Please stash, commit or remove the above files before continuing. Alternatively, use --ignore-vcs"
    )
def perform_migration(
    fo: IOBasedOutputStyling,
    manifest: HighLevelManifest,
    acceptable_migration_issues: AcceptableMigrationIssues,
    permit_destructive_changes: Optional[bool],
    migration_target: DebputyIntegrationMode,
    manifest_parser_factory: Callable[[str], HighLevelManifest],
    migrators: List[Migrator],
    *,
    ignore_vcs: bool = False,
) -> None:
    """Run the migrators and, when permitted, apply their changes.

    Steps, in order: detect the compat level, check the git state (unless
    `ignore_vcs`), run every migrator, enforce the minimum compat level,
    print the warning summary, write and re-parse the new manifest, list
    the renames/removals and finally apply them when destructive changes
    are permitted.

    :param fo: Output styling handed to each `FeatureMigration` and to the
      summary.
    :param manifest: The manifest being migrated into; its
      `mutable_manifest` must be available.
    :param acceptable_migration_issues: Issues the user accepted; passed to
      every migrator and consulted for `min-compat-level`.
    :param permit_destructive_changes: `True` applies the changes, `False`
      never does. `None` means "not specified": it becomes `True` when git
      is in use and otherwise results in a dry run with a hint to re-run
      with `--apply-changes`.
    :param migration_target: The integration mode being migrated to; passed
      to every migrator.
    :param manifest_parser_factory: Called with the path of the newly
      written manifest to verify that it parses.
    :param migrators: Callables invoked as `migrator(debian_dir, manifest,
      acceptable_migration_issues, feature_migration, migration_target)`.
    :param ignore_vcs: Skip the git checks and treat the package as not
      using git.
    :raises AssertionError: If the generated manifest cannot be parsed.
    """
    migrations = []
    compat = _dh_compat_level()
    if compat is None:
        _error(
            'Cannot detect declared compat level (try running "dh_assistant active-compat-level")'
        )

    debian_dir = manifest.debian_dir
    mutable_manifest = assume_not_none(manifest.mutable_manifest)

    if not ignore_vcs:
        uses_git, untracked_files = _git_status()
    else:
        uses_git = False
        untracked_files = frozenset()

    # With git in use the changes can be undone, so apply them by default.
    if permit_destructive_changes is None and uses_git:
        permit_destructive_changes = True

    try:
        for migrator in migrators:
            feature_migration = FeatureMigration(migrator.__name__, fo)
            migrator(
                debian_dir,
                manifest,
                acceptable_migration_issues,
                feature_migration,
                migration_target,
            )
            migrations.append(feature_migration)
    except CannotEmulateExecutableDHConfigFile as e:
        _error(
            f"Unable to process the executable dh config file {e.config_file().fs_path}: {e.message()}"
        )
    except UnsupportedFeature as e:
        msg = (
            f"Unable to migrate automatically due to missing features in debputy. The feature is:"
            f"\n\n * {e.message}"
        )
        keys = e.issue_keys
        if keys:
            primary_key = keys[0]
            alt_keys = ""
            if len(keys) > 1:
                alt_keys = (
                    f' Alternatively you can also use one of: {", ".join(keys[1:])}. Please note that some'
                    " of these may cover more cases."
                )
            msg += (
                f"\n\nUse --acceptable-migration-issues={primary_key} to convert this into a warning and try again."
                " However, you should only do that if you believe you can replace the functionality manually"
                f" or the usage is obsolete / can be removed. {alt_keys}"
            )
        _error(msg)
    except ConflictingChange as e:
        _error(
            "The migration tool detected a conflict between data being migrated and data already migrated / in the existing"
            " manifest."
            f"\n\n * {e.message}"
            "\n\nPlease review the situation and resolve the conflict manually."
        )

    # We start on compat 12 for arch:all due to the new dh_makeshlibs and dh_installinit default
    # For arch:any, the min is compat 14 due to `dh_dwz` being removed.
    min_compat = 14 if any(not p.is_arch_all for p in manifest.all_packages) else 12
    # NOTE(review): the baseline above is only used when no migration declares
    # an assumed compat; a declared value below the baseline replaces it.
    # Confirm that lowering the baseline this way is intended.
    min_compat = max(
        (m.assumed_compat for m in migrations if m.assumed_compat is not None),
        default=min_compat,
    )

    if compat < min_compat and "min-compat-level" not in acceptable_migration_issues:
        # The migration summary special-cases the compat mismatch and warns for us.
        _error(
            f"The migration tool assumes debhelper compat {min_compat} or later but the package is only on"
            f" compat {compat}. This may lead to incorrect result."
            f"\n\nUse --acceptable-migration-issues=min-compat-level to convert this into a warning and"
            f" try again, if you want to continue regardless."
        )

    requested_plugins = _requested_debputy_plugins(debian_dir)
    required_plugins: Set[str] = set()
    required_plugins.update(
        chain.from_iterable(
            m.required_plugins for m in migrations if m.required_plugins
        )
    )

    if uses_git:
        _check_vcs_clashes(manifest, migrations, untracked_files)

    _print_migration_summary(
        fo,
        migrations,
        compat,
        min_compat,
        required_plugins,
        requested_plugins,
    )
    migration_count = sum((m.performed_changes for m in migrations), 0)

    if not migration_count:
        _info(
            "debputy was not able to find any (supported) migrations that it could perform for you."
        )
        return

    if any(m.successful_manifest_changes for m in migrations):
        # Write next to the real manifest first, so the result can be
        # verified before anything is replaced.
        new_manifest_path = manifest.manifest_path + ".new"

        with open(new_manifest_path, "w") as fd:
            mutable_manifest.write_to(fd)

        try:
            _info("Verifying the new manifest")
            manifest_parser_factory(new_manifest_path)
        except ManifestParseException as e:
            raise AssertionError(
                "Could not parse the manifest generated from the migrator"
            ) from e

        if permit_destructive_changes:
            if os.path.isfile(manifest.manifest_path):
                os.rename(manifest.manifest_path, manifest.manifest_path + ".orig")
            os.rename(new_manifest_path, manifest.manifest_path)
            if uses_git:
                subprocess.check_call(["git", "add", manifest.manifest_path])
            _info(f"Updated manifest {manifest.manifest_path}")
        else:
            _info(
                f'Created draft manifest "{new_manifest_path}" (rename to "{manifest.manifest_path}"'
                " to activate it)"
            )
    else:
        _info("No manifest changes detected; skipping update of manifest.")

    removals: int = sum((len(m.remove_paths_on_success) for m in migrations), 0)
    renames: int = sum((len(m.rename_paths_on_success) for m in migrations), 0)

    if renames:
        if permit_destructive_changes:
            _info("Paths being renamed:")
        else:
            _info("Migration *would* rename the following paths:")
        prefix = "git " if uses_git else ""
        for previous_path, new_path in (
            p for m in migrations for p in m.rename_paths_on_success
        ):
            _info(f" {prefix}mv {escape_shell(previous_path, new_path)}")

    if removals:
        if permit_destructive_changes:
            _info("Removals:")
        else:
            _info("Migration *would* remove the following files:")
        cmd = "git rm" if uses_git else "rm -f"
        for path in (p for m in migrations for p in m.remove_paths_on_success):
            _info(f" {cmd} {escape_shell(path)}")

    if permit_destructive_changes is None:
        # Not specified and no git to fall back on: dry run only.
        print()
        _info(
            "If you would like to perform the migration, please re-run with --apply-changes."
        )
    elif permit_destructive_changes:
        _apply_renames_and_removals(migrations, uses_git)

        print()
        _info("Migrations performed successfully")
        print()
        _info(
            "Remember to validate the resulting binary packages after rebuilding with debputy"
        )
        if uses_git:
            _info(
                "Use `git commit` to commit the changes or use `git reset` / `git restore` to undo them."
            )
    else:
        print()
        _info("No migrations performed as requested")
def _apply_renames_and_removals(
    migrations: List[FeatureMigration],
    uses_git: bool,
) -> None:
    """Perform the renames and removals recorded by the migrations.

    Renames are done first, one path pair at a time. Removals follow: with
    git, a single `git rm` is run for all paths; without git, each path is
    unlinked individually.

    :param migrations: The migrations whose `rename_paths_on_success` and
      `remove_paths_on_success` are applied.
    :param uses_git: Whether to go through `git mv` / `git rm` rather than
      plain file system operations.
    """
    for m in migrations:
        for previous_path, new_path in m.rename_paths_on_success:
            if uses_git:
                subprocess.check_call(["git", "mv", previous_path, new_path])
            else:
                os.rename(previous_path, new_path)

    files_being_removed = [p for m in migrations for p in m.remove_paths_on_success]
    if not files_being_removed:
        return

    if uses_git:
        subprocess.check_call(["git", "rm", *files_being_removed])
        return

    for path in files_being_removed:
        os.unlink(path)