Coverage for src/debputy/linting/lint_impl.py: 11%
377 statements
« prev ^ index » next coverage.py v7.8.2, created at 2026-04-19 20:37 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2026-04-19 20:37 +0000
1import dataclasses
2import difflib
3import os
4import stat
5import sys
6import textwrap
7from contextlib import suppress
8from typing import (
9 NoReturn,
10 TYPE_CHECKING,
11)
12from collections.abc import Mapping, Sequence, Callable
14from debputy.commands.debputy_cmd.context import CommandContext
15from debputy.commands.debputy_cmd.output import _output_styling, IOBasedOutputStyling
16from debputy.filesystem_scan import OSFSROOverlay
17from debputy.linting.lint_util import (
18 LintReport,
19 LintStateImpl,
20 FormatterImpl,
21 TermLintReport,
22 LintDiagnosticResultState,
23 AsyncLinterImpl,
24)
25from debputy.lsp.config.debputy_config import DebputyConfig, load_debputy_config
26from debputy.lsp.diagnostics import DiagnosticData
27from debputy.lsp.lsp_features import CLI_FORMAT_FILE_HANDLERS, CLI_DIAGNOSTIC_HANDLERS
28from debputy.lsp.maint_prefs import (
29 MaintainerPreferenceTable,
30 EffectiveFormattingPreference,
31 determine_effective_preference,
32)
33from debputy.lsp.quickfixes import provide_standard_quickfixes_from_diagnostics_lint
34from debputy.lsp.spellchecking import disable_spellchecking
35from debputy.lsp.text_edit import (
36 get_well_formatted_edit,
37 merge_sort_text_edits,
38 apply_text_edits,
39 OverLappingTextEditException,
40)
41from debian._deb822_repro import (
42 Deb822FileElement,
43 Deb822ParagraphElement,
44)
45from debputy.packages import SourcePackage, BinaryPackage
46from debputy.plugin.api import VirtualPath
47from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
48from debputy.util import _warn, _error, _info
49from debputy.yaml import MANIFEST_YAML, YAMLError
50from debputy.yaml.compat import CommentedMap
52if TYPE_CHECKING:
53 import lsprotocol.types as types
54else:
55 import debputy.lsprotocol.types as types
# Handlers used by the lint command. This module iterates the keys as file name stems
# (relative to the current directory) and looks handlers up with `.get(...)`.
LINTER_FORMATS = CLI_DIAGNOSTIC_HANDLERS
# Handlers used by the reformat command; iterated and looked up the same way.
REFORMAT_FORMATS = CLI_FORMAT_FILE_HANDLERS
62@dataclasses.dataclass(slots=True)
63class LintContext:
64 plugin_feature_set: PluginProvidedFeatureSet
65 maint_preference_table: MaintainerPreferenceTable
66 source_root: VirtualPath | None
67 debian_dir: VirtualPath | None
68 debputy_config: DebputyConfig
69 parsed_deb822_file_content: Deb822FileElement | None = None
70 source_package: SourcePackage | None = None
71 binary_packages: Mapping[str, BinaryPackage] | None = None
72 effective_preference: EffectiveFormattingPreference | None = None
73 style_tool: str | None = None
74 unsupported_preference_reason: str | None = None
75 salsa_ci: CommentedMap | None = None
77 def state_for(
78 self,
79 path: str,
80 content: str,
81 lines: list[str],
82 lint_implementation: AsyncLinterImpl | None,
83 ) -> LintStateImpl:
84 return LintStateImpl(
85 self.plugin_feature_set,
86 self.maint_preference_table,
87 self.source_root,
88 self.debian_dir,
89 path,
90 content,
91 lines,
92 self.debputy_config,
93 self.source_package,
94 self.binary_packages,
95 self.effective_preference,
96 lint_implementation,
97 )
def gather_lint_info(context: CommandContext) -> LintContext:
    """Collect the package-wide information needed before linting or reformatting.

    All paths are resolved relative to the current working directory.

    :param context: The command context; used for loading plugins and for its
      `debian/control` parser.
    :return: A `LintContext`. The parsed control file data is only set when
      `debian/control` exists, and the style related fields are only set when
      either a source package or a CI configuration was found.
    """
    source_root = OSFSROOverlay.create_root_dir(".", ".")
    debian_dir = source_root.get("debian")
    # A non-directory called `debian` is treated the same as a missing directory.
    if debian_dir is not None and not debian_dir.is_dir:
        debian_dir = None
    lint_context = LintContext(
        context.load_plugins(),
        MaintainerPreferenceTable.load_preferences(),
        source_root,
        debian_dir,
        load_debputy_config(),
    )
    try:
        # Read as UTF-8 explicitly (like the other file reads in this module)
        # rather than relying on the locale's default encoding.
        with open("debian/control", encoding="utf-8") as fd:
            deb822_file, source_package, binary_packages = (
                context.dctrl_parser.parse_source_debian_control(fd, ignore_errors=True)
            )
    except FileNotFoundError:
        source_package = None
    else:
        lint_context.parsed_deb822_file_content = deb822_file
        lint_context.source_package = source_package
        lint_context.binary_packages = binary_packages

    salsa_ci_map: CommentedMap | None = None
    # The first CI file that exists wins; later candidates are not consulted.
    for ci_file in ("debian/salsa-ci.yml", "debian/gitlab-ci.yml", ".gitlab-ci.yml"):
        try:
            with open(ci_file, encoding="utf-8") as fd:
                salsa_ci_map = MANIFEST_YAML.load(fd)
                if not isinstance(salsa_ci_map, CommentedMap):
                    salsa_ci_map = None
                break
        except FileNotFoundError:
            pass
        except YAMLError:
            # An unparsable CI file is treated as "no CI configuration".
            break

    if source_package is not None or salsa_ci_map is not None:
        pref, tool, pref_reason = determine_effective_preference(
            lint_context.maint_preference_table,
            source_package,
            salsa_ci_map,
        )
        lint_context.effective_preference = pref
        lint_context.style_tool = tool
        lint_context.unsupported_preference_reason = pref_reason

    return lint_context
def initialize_lint_report(context: CommandContext) -> LintReport:
    """Create the report object matching the requested `lint_report_format`.

    :param context: The command context; `lint_report_format` and
      `report_output` are read from its parsed arguments.
    :return: A `TermLintReport` for the `term` format or a `JunitLintReport` for
      the `junit4-xml` format.
    :raises AssertionError: If the format is neither of the two handled here.
    """
    args = context.parsed_args
    requested_format = args.lint_report_format
    output_path = args.report_output

    if requested_format == "term":
        styling = _output_styling(context.parsed_args, sys.stdout)
        if output_path is not None:
            _warn("--report-output is redundant for the `term` report")
        return TermLintReport(styling)

    if requested_format != "junit4-xml":
        raise AssertionError(
            f"Missing case for lint_report_format: {requested_format}"
        )

    # Imported only to verify that the optional dependency is available.
    try:
        import junit_xml
    except ImportError:
        _error(
            "The `junit4-xml` report format requires `python3-junit.xml` to be installed"
        )

    from debputy.linting.lint_report_junit import JunitLintReport

    if output_path is None:
        output_path = "debputy-lint-junit.xml"
    return JunitLintReport(output_path)
async def perform_linting(context: CommandContext) -> None:
    """Lint every supported file found in the current directory and emit the report.

    :param context: The command context; the parsed arguments control spell
      checking, auto-fixing, the manifest hint and whether the process exits
      with a lint-derived exit code.
    """
    args = context.parsed_args
    if not args.spellcheck:
        disable_spellchecking()
    use_lint_exit_code = args.linter_exit_code
    report = initialize_lint_report(context)
    lint_context = gather_lint_info(context)

    for stem in LINTER_FORMATS:
        path = f"./{stem}"
        if os.path.isfile(path):
            await perform_linting_of_file(
                lint_context,
                path,
                stem,
                context.parsed_args.auto_fix,
                report,
            )

    if report.number_of_invalid_diagnostics:
        _warn(
            "Some diagnostics did not explicitly set severity. Please report the bug and include the output"
        )
    if report.number_of_broken_diagnostics:
        _error(
            "Some sub-linters reported issues. Please report the bug and include the output"
        )

    manifest_present = args.warn_about_check_manifest and os.path.isfile(
        "debian/debputy.manifest"
    )
    if manifest_present:
        for note in (
            "Note: Due to a limitation in the linter, debian/debputy.manifest is",
            "only **partially** checked by this command at the time of writing.",
            "Please use `debputy check-manifest` to fully check the manifest.",
        ):
            _info(note)

    report.finish_report()

    if use_lint_exit_code:
        _exit_with_lint_code(report)
def perform_reformat(
    context: CommandContext,
    *,
    named_style: str | None = None,
) -> None:
    """Reformat the supported packaging files in the current directory.

    :param context: The command context; `write_style`, `auto_fix`,
      `supported_style_required` and `linter_exit_code` are read from its parsed
      arguments.
    :param named_style: Optional name of a style from the maintainer preference
      table. When given, it overrides the detected style.

    The function exits the process with code 0 when no supported style could be
    determined (and that is not configured to be an error), and with code 2 when
    at least one file was changed and `linter_exit_code` is set.
    """
    parsed_args = context.parsed_args
    fo = _output_styling(context.parsed_args, sys.stdout)
    lint_context = gather_lint_info(context)
    write_style = parsed_args.write_style

    if named_style is not None:
        style = lint_context.maint_preference_table.named_styles.get(named_style)
        if style is None:
            styles = ", ".join(lint_context.maint_preference_table.named_styles)
            # Report the name the user asked for; `style` is always None here.
            _error(
                f'There is no style named "{named_style}". Options include: {styles}'
            )
        if (
            lint_context.effective_preference is not None
            and lint_context.effective_preference != style
        ):
            _info(
                f'Note that the style "{named_style}" does not match the style that `debputy` was configured to use.'
            )
            _info("This may be a non-issue (if the configuration is out of date).")
        lint_context.effective_preference = style
    elif write_style:
        _error("The `--write-style` option requires a named style passed via `--style`")

    if lint_context.effective_preference is None:
        if lint_context.unsupported_preference_reason is not None:
            _warn(
                "While `debputy` could identify a formatting for this package, it does not support it."
            )
            _warn(f"{lint_context.unsupported_preference_reason}")
            if lint_context.style_tool is not None:
                _info(
                    f"The following tool might be able to apply the style: {lint_context.style_tool}"
                )
            if parsed_args.supported_style_required:
                _error(
                    "Sorry; `debputy` does not support the style. Use --unknown-or-unsupported-style-is-ok to make"
                    " this a non-error (note that `debputy` will not reformat the packaging in this case; just not"
                    " exit with an error code)."
                )
        else:
            print(textwrap.dedent("""\
                You can enable set a style by doing either of:

                 * You can set `X-Style: black` in the source stanza of `debian/control` to pick
                   `black` as the preferred style for this package.
                   - Note: `black` is an opinionated style that follows the spirit of the `black` code formatter
                     for Python.
                   - If you use `pre-commit`, then there is a formatting hook at
                     https://salsa.debian.org/debian/debputy-pre-commit-hooks

                 * If you use the Debian Salsa CI pipeline, then you can set SALSA_CI_DISABLE_WRAP_AND_SORT (`no`)
                   or SALSA_CI_ENABLE_WRAP_AND_SORT (`yes`) to the relevant value and `debputy` will pick up the
                   configuration from there.
                   - Note: The option must be in `.gitlab-ci.yml`, `debian/gitlab-ci.yml` or `debian/salsa-ci.yml`
                     to work. The Salsa CI pipeline will use `wrap-and-sort` while `debputy` uses its own emulation
                     of `wrap-and-sort` (`debputy` also needs to apply the style via `debputy lsp server`).

                 * The `debputy` code also comes with a built-in style database. This may be interesting for
                   packaging teams, so set a default team style that applies to all packages maintained by
                   that packaging team.
                   - Individuals can also add their style, which can useful for ad-hoc packaging teams, where
                     `debputy` will automatically apply a style if *all* co-maintainers agree to it.

                Note the above list is an ordered list of how `debputy` determines which style to use in case
                multiple options are available.
                """))
        if parsed_args.supported_style_required:
            if lint_context.style_tool is not None:
                _error(
                    "Sorry, `debputy reformat` does not support the packaging style. However, the"
                    f" formatting is supposedly handled by: {lint_context.style_tool}"
                )
            # The continuation starts with a space so the two literals do not
            # run together as "set astyle".
            _error(
                "Sorry; `debputy` does not know which style to use for this package. Please either set a"
                " style or use --unknown-or-unsupported-style-is-ok to make this a non-error"
            )
        _info("")
        _info(
            "Doing nothing since no supported style could be identified as requested."
            " See above how to set a style."
        )
        _info("Use --supported-style-is-required if this should be an error instead.")
        sys.exit(0)

    changes = False
    auto_fix = context.parsed_args.auto_fix
    # Extra per-file text transformations applied after the formatter's edits.
    modifiers = {}
    if write_style:

        def _commit_style(raw: str) -> str:
            """Record `named_style` as `X-Style` in the first stanza of `raw`."""
            deb822_file, _, _ = context.dctrl_parser.parse_source_debian_control(
                raw.splitlines(keepends=True), ignore_errors=True
            )

            for p in deb822_file.iter_parts():
                if p.is_error:
                    _warn(
                        "Cannot commit the style due to syntax errors in debian/control. Fix these first."
                    )
                    return raw
                if isinstance(p, Deb822ParagraphElement):
                    inserted = "X-Style" not in p
                    p["X-Style"] = named_style
                    # If we inserted the field, then we have opinions on its placement. But if it was already
                    # there, then we do not re-order it.
                    if inserted:
                        with suppress(KeyError):
                            p.order_before("X-Style", "Description")
                    # Only the first stanza is updated.
                    break

            return deb822_file.dump()

        modifiers["debian/control"] = _commit_style

    for name_stem in REFORMAT_FORMATS:
        formatter = REFORMAT_FORMATS.get(name_stem)
        filename = f"./{name_stem}"
        if formatter is None or not os.path.isfile(filename):
            continue

        modifier = modifiers.get(name_stem)

        reformatted = perform_reformat_of_file(
            fo,
            lint_context,
            filename,
            formatter,
            auto_fix,
            custom_modifier=modifier,
        )
        if reformatted:
            changes = True

    if changes and parsed_args.linter_exit_code:
        sys.exit(2)
def perform_reformat_of_file(
    fo: IOBasedOutputStyling,
    lint_context: LintContext,
    filename: str,
    formatter: FormatterImpl,
    auto_fix: bool,
    *,
    custom_modifier: Callable[[str], str] | None = None,
) -> bool:
    """Reformat a single file, printing the resulting diff.

    :param fo: Output styling used for the "Reformatted" confirmation message.
    :param lint_context: Shared context used to build the per-file lint state.
    :param filename: Path of the file to reformat; read (and written) as UTF-8.
    :param formatter: Callable that receives the lint state and returns the
      text edits to apply (a falsy result means "no edits").
    :param auto_fix: When true, the file is replaced with the reformatted text;
      otherwise only the diff is printed.
    :param custom_modifier: Optional extra transformation applied to the text
      after the formatter's edits.
    :return: True if the reformatted text differs from the file's content,
      False when nothing would change.
    """
    with open(filename, encoding="utf-8") as fd:
        text = fd.read()

    lines = text.splitlines(keepends=True)
    lint_state = lint_context.state_for(
        filename,
        text,
        lines,
        None,
    )
    edits = formatter(lint_state)

    if not edits and not custom_modifier:
        return False

    if edits:
        try:
            replacement = apply_text_edits(text, lines, edits)
        except OverLappingTextEditException:
            # NOTE(review): `_error` is expected not to return; otherwise
            # `replacement` would be unbound below - confirm.
            _error(
                f"The reformater for {filename} produced overlapping edits (which is broken and will not work)"
            )
    else:
        replacement = text

    if custom_modifier:
        replacement = custom_modifier(replacement)

    # Edits or a modifier that leave the text unchanged are not a change. Without
    # this check, the file would be rewritten and reported as reformatted anyway.
    if replacement == text:
        return False

    unified_diff = difflib.unified_diff(
        text.splitlines(keepends=True),
        replacement.splitlines(keepends=True),
        fromfile=filename,
        tofile=filename,
    )
    for line in unified_diff:
        print(line, end="")

    if auto_fix:
        # Write to a sibling temporary file and rename it over the original,
        # carrying over the original permission bits first.
        output_filename = f"{filename}.tmp"
        with open(output_filename, "w", encoding="utf-8") as fd:
            fd.write(replacement)
        orig_mode = stat.S_IMODE(os.stat(filename).st_mode)
        os.chmod(output_filename, orig_mode)
        os.rename(output_filename, filename)
        print(
            fo.colored(
                f"Reformatted {filename}.",
                fg="green",
                style="bold",
            )
        )

    return True
def _exit_with_lint_code(lint_report: LintReport) -> NoReturn:
    """Terminate the process with an exit code derived from the report.

    Exits with 2 when the report counted at least one error or warning
    diagnostic, and with 0 otherwise.
    """
    counts = lint_report.diagnostics_count
    severities_that_fail = (
        types.DiagnosticSeverity.Error,
        types.DiagnosticSeverity.Warning,
    )
    found_problem = any(counts[severity] for severity in severities_that_fail)
    sys.exit(2 if found_problem else 0)
async def perform_linting_of_file(
    lint_context: LintContext,
    filename: str,
    file_format: str,
    auto_fixing_enabled: bool,
    lint_report: LintReport,
) -> None:
    """Lint one file with the handler registered for `file_format`.

    Does nothing when no handler is registered for the format. Otherwise the
    file is read as UTF-8 and passed to either the auto-fixing run or the
    diagnostics-only run, depending on `auto_fixing_enabled`.
    """
    handler = LINTER_FORMATS.get(file_format)
    if handler is None:
        return

    with open(filename, encoding="utf-8") as fd:
        text = fd.read()

    run = _auto_fix_run if auto_fixing_enabled else _diagnostics_run
    await run(
        lint_context,
        filename,
        text,
        handler,
        lint_report,
    )
def _overlapping_edit(
    last_edit_range: types.Range,
    last_fix_range: types.Range,
) -> bool:
    """Tell whether an edit conflicts with the fixes already accepted this round.

    :param last_edit_range: Range of the last edit of the candidate fix.
    :param last_fix_range: The furthest range covered by the fixes accepted so far.
    :return: True if the candidate must be postponed: it starts before the start
      of `last_fix_range`, or `last_fix_range` is non-empty and does not end
      strictly before the candidate starts. False otherwise.
    """
    last_edit_start_pos = last_edit_range.start
    last_fix_start_pos = last_fix_range.start
    # An edit starting before the last accepted fix is out of order.
    if last_edit_start_pos.line < last_fix_start_pos.line:
        return True
    # Same line: compare line to line, then order by character offset.
    if (
        last_edit_start_pos.line == last_fix_start_pos.line
        and last_edit_start_pos.character < last_fix_start_pos.character
    ):
        return True

    # An empty fix range cannot overlap with an edit starting at or after it.
    if last_fix_range.end == last_fix_start_pos:
        return False

    if last_fix_range.end < last_edit_start_pos:
        return False
    return True
def _max_range(
    r1: types.Range,
    r2: types.Range,
) -> types.Range:
    """Return whichever range ends later, preferring the later start on a tie.

    `r1` is returned when neither range wins on end nor on start.
    """
    first_end, second_end = r1.end, r2.end
    if second_end > first_end:
        return r2
    # `r2` can now only win when the ends tie and it starts strictly later.
    if second_end < first_end or not (r2.start > r1.start):
        return r1
    return r2
# Zero-width range at line 0, character 0. `_auto_fix_run` resets its "last fix"
# tracker to this value at the start of every fixing round.
_INITIAL_FIX_RANGE = types.Range(
    types.Position(0, 0),
    types.Position(0, 0),
)
def _is_non_interactive_auto_fix_allowed(diagnostic: types.Diagnostic) -> bool:
    """Tell whether the diagnostic may be fixed without asking the user.

    Diagnostics without data are allowed. Otherwise the value stored under
    `enable_non_interactive_auto_fix` in the data decides, with a missing key
    meaning "allowed".
    """
    data: DiagnosticData | None = diagnostic.data
    if data is None:
        return True
    return data.get("enable_non_interactive_auto_fix", True)
async def _auto_fix_run(
    lint_context: LintContext,
    filename: str,
    text: str,
    linter: AsyncLinterImpl,
    lint_report: LintReport,
) -> None:
    """Lint `filename`, apply every unambiguous automatic fix and report the result.

    Fixes are applied in rounds. A fix whose edits conflict with a fix already
    accepted in the current round is postponed to a later round; at most 10
    such extra rounds are scheduled. Fixed diagnostics are reported as `FIXED`
    and whatever remains afterwards is reported normally. The file is only
    rewritten when at least one fix was applied.

    :param lint_context: Shared context used to build the per-file lint state.
    :param filename: Path of the file being linted (rewritten on fixes).
    :param text: Current content of the file.
    :param linter: The linter implementation for this file format.
    :param lint_report: Report receiving the diagnostics.
    """
    another_round = True
    # Circuit breaker for pathological cases with many overlapping edits.
    remaining_rounds = 10
    fixed_count = 0
    too_many_rounds = False
    lines = text.splitlines(keepends=True)
    lint_state = lint_context.state_for(
        filename,
        text,
        lines,
        linter,
    )
    current_issues = await lint_state.gather_diagnostics()
    issue_count_start = len(current_issues) if current_issues else 0
    while another_round and current_issues:
        another_round = False
        last_fix_range = _INITIAL_FIX_RANGE
        edits = []
        fixed_diagnostics = []
        for diagnostic in current_issues:
            if not _is_non_interactive_auto_fix_allowed(diagnostic):
                continue
            actions = provide_standard_quickfixes_from_diagnostics_lint(
                lint_state,
                types.CodeActionParams(
                    types.TextDocumentIdentifier(lint_state.doc_uri),
                    diagnostic.range,
                    types.CodeActionContext(
                        [diagnostic],
                    ),
                ),
            )
            auto_fixing_edits = resolve_auto_fixer(lint_state.doc_uri, actions)

            if not auto_fixing_edits:
                continue

            sorted_edits = merge_sort_text_edits(
                [get_well_formatted_edit(e) for e in auto_fixing_edits],
            )
            last_edit = sorted_edits[-1]
            last_edit_range = last_edit.range
            if _overlapping_edit(last_edit_range, last_fix_range):
                # Postpone this fix; it is retried after the document has been
                # updated with the fixes accepted in this round.
                if not another_round:
                    if remaining_rounds > 0:
                        remaining_rounds -= 1
                        print(
                            "Detected overlapping edit; scheduling another edit round."
                        )
                        another_round = True
                    else:
                        _warn(
                            "Too many overlapping edits; stopping after this round (circuit breaker)."
                        )
                        too_many_rounds = True
                continue
            edits.extend(sorted_edits)
            fixed_diagnostics.append(diagnostic)
            last_fix_range = _max_range(last_fix_range, sorted_edits[-1].range)

        if another_round and not edits:
            _error(
                "Internal error: Detected an overlapping edit and yet had no edits to perform..."
            )

        fixed_count += len(fixed_diagnostics)

        try:
            text = apply_text_edits(
                text,
                lines,
                edits,
            )
        except OverLappingTextEditException:
            _error(
                f"Failed to apply edits for {filename} due to over lapping edits. Please file a bug"
                f" against `debputy` with a contents of the `debian` directory or a minimal reproducer"
            )
        lines = text.splitlines(keepends=True)

        # Report the fixed diagnostics before the state is updated to the new text.
        with lint_report.line_state(lint_state):
            for diagnostic in fixed_diagnostics:
                lint_report.report_diagnostic(
                    diagnostic,
                    result_state=LintDiagnosticResultState.FIXED,
                )
        lint_state.content = text
        lint_state.lines = lines
        lint_state.clear_cache()
        current_issues = await lint_state.gather_diagnostics()

    if fixed_count:
        # Write to a sibling temporary file and rename it over the original,
        # carrying over the original permission bits first.
        output_filename = f"{filename}.tmp"
        with open(output_filename, "w", encoding="utf-8") as fd:
            fd.write(text)
        orig_mode = stat.S_IMODE(os.stat(filename).st_mode)
        os.chmod(output_filename, orig_mode)
        os.rename(output_filename, filename)
        lines = text.splitlines(keepends=True)
        lint_state.content = text
        lint_state.lines = lines
        remaining_issues = await lint_state.gather_diagnostics() or []
    else:
        remaining_issues = current_issues or []

    with lint_report.line_state(lint_state):
        for diagnostic in remaining_issues:
            lint_report.report_diagnostic(diagnostic)

    if isinstance(lint_report, TermLintReport):
        # TODO: Not optimal, but will do for now.
        fo = lint_report.fo
        print()
        if fixed_count:
            remaining_issues_count = len(remaining_issues)
            print(
                fo.colored(
                    f"Fixes applied to {filename}: {fixed_count}."
                    f" Number of issues went from {issue_count_start} to {remaining_issues_count}",
                    fg="green",
                    style="bold",
                )
            )
        elif remaining_issues:
            print(
                fo.colored(
                    f"None of the issues in {filename} could be fixed automatically. Sorry!",
                    fg="yellow",
                    bg="black",
                    style="bold",
                )
            )
        else:
            assert not current_issues
            print(
                fo.colored(
                    f"No issues detected in {filename}",
                    fg="green",
                    style="bold",
                )
            )
        if too_many_rounds:
            print(
                fo.colored(
                    f"Not all fixes for issues in {filename} could be applied due to overlapping edits.",
                    fg="yellow",
                    bg="black",
                    style="bold",
                )
            )
            print(
                "Running once more may cause more fixes to be applied. However, you may be facing"
                " pathological performance."
            )
async def _diagnostics_run(
    lint_context: LintContext,
    filename: str,
    text: str,
    linter: AsyncLinterImpl,
    lint_report: LintReport,
) -> None:
    """Lint `filename` without modifying it and report every diagnostic.

    Each diagnostic is reported as `AUTO_FIXABLE` when it has an unambiguous
    automatic fix that may be applied non-interactively, as `MANUAL_FIXABLE`
    when at least one quick fix is offered, and as `REPORTED` otherwise.

    :param lint_context: Shared context used to build the per-file lint state.
    :param filename: Path of the file being linted.
    :param text: Content of the file.
    :param linter: The linter implementation for this file format.
    :param lint_report: Report receiving the diagnostics.
    """
    lines = text.splitlines(keepends=True)
    lint_state = lint_context.state_for(filename, text, lines, linter)
    with lint_report.line_state(lint_state):
        # `_auto_fix_run` treats the result as possibly None/empty; do the same
        # here so that a linter without findings does not break the loop.
        issues = await lint_state.gather_diagnostics() or []
        for diagnostic in issues:
            actions = provide_standard_quickfixes_from_diagnostics_lint(
                lint_state,
                types.CodeActionParams(
                    types.TextDocumentIdentifier(lint_state.doc_uri),
                    diagnostic.range,
                    types.CodeActionContext(
                        [diagnostic],
                    ),
                ),
            )
            auto_fixer = resolve_auto_fixer(lint_state.doc_uri, actions)
            has_auto_fixer = bool(auto_fixer) and _is_non_interactive_auto_fix_allowed(
                diagnostic
            )

            result_state = LintDiagnosticResultState.REPORTED
            if has_auto_fixer:
                result_state = LintDiagnosticResultState.AUTO_FIXABLE
            elif has_at_least_lsp_quickfix(actions):
                result_state = LintDiagnosticResultState.MANUAL_FIXABLE

            lint_report.report_diagnostic(diagnostic, result_state=result_state)
def has_at_least_lsp_quickfix(
    actions: list[types.Command | types.CodeAction] | None,
) -> bool:
    """Tell whether any of the actions is a usable quick fix.

    An action counts when it is a `CodeAction` of kind `QuickFix` that carries
    an edit, a command, or both.

    :param actions: The actions offered for a diagnostic, or None.
    :return: True if at least one action qualifies, otherwise False.
    """
    if actions is None:
        return False
    return any(
        isinstance(candidate, types.CodeAction)
        and candidate.kind == types.CodeActionKind.QuickFix
        and not (candidate.edit is None and candidate.command is None)
        for candidate in actions
    )
def resolve_auto_fixer(
    document_ref: str,
    actions: list[types.Command | types.CodeAction] | None,
) -> Sequence[types.TextEdit | types.AnnotatedTextEdit]:
    """Extract the text edits of the automatic fix, if there is exactly one.

    Edits are only returned when `actions` holds a single `CodeAction` that has
    a workspace edit and no command, and that edit touches `document_ref` alone.

    :param document_ref: URI of the document the edits must apply to.
    :param actions: The actions offered for a diagnostic, or None.
    :return: The edits for `document_ref`, or an empty tuple when the actions do
      not amount to one unambiguous fix.
    """
    no_fix = ()
    if actions is None or len(actions) != 1:
        return no_fix
    only_action = actions[0]
    if not isinstance(only_action, types.CodeAction):
        return no_fix
    workspace_edit = only_action.edit
    if workspace_edit is None or only_action.command is not None:
        return no_fix

    # `document_changes` takes precedence over `changes` when it is non-empty.
    document_changes = workspace_edit.document_changes
    if document_changes:
        if len(document_changes) != 1:
            return no_fix
        doc_edit = document_changes[0]
        if (
            not isinstance(doc_edit, types.TextDocumentEdit)
            or doc_edit.text_document.uri != document_ref
        ):
            return no_fix
        return doc_edit.edits

    changes_by_uri = workspace_edit.changes
    if (
        changes_by_uri
        and len(changes_by_uri) == 1
        and document_ref in changes_by_uri
    ):
        return changes_by_uri[document_ref]
    return no_fix