Coverage for src/debputy/linting/lint_impl.py: 11%
377 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
1import dataclasses
2import difflib
3import os
4import stat
5import sys
6import textwrap
7from contextlib import suppress
8from typing import (
9 NoReturn,
10 TYPE_CHECKING,
11)
12from collections.abc import Mapping, Sequence, Callable
14from debputy.commands.debputy_cmd.context import CommandContext
15from debputy.commands.debputy_cmd.output import _output_styling, IOBasedOutputStyling
16from debputy.filesystem_scan import FSROOverlay
17from debputy.linting.lint_util import (
18 LintReport,
19 LintStateImpl,
20 FormatterImpl,
21 TermLintReport,
22 LintDiagnosticResultState,
23 AsyncLinterImpl,
24)
25from debputy.lsp.config.debputy_config import DebputyConfig, load_debputy_config
26from debputy.lsp.diagnostics import DiagnosticData
27from debputy.lsp.lsp_features import CLI_FORMAT_FILE_HANDLERS, CLI_DIAGNOSTIC_HANDLERS
28from debputy.lsp.maint_prefs import (
29 MaintainerPreferenceTable,
30 EffectiveFormattingPreference,
31 determine_effective_preference,
32)
33from debputy.lsp.quickfixes import provide_standard_quickfixes_from_diagnostics_lint
34from debputy.lsp.spellchecking import disable_spellchecking
35from debputy.lsp.text_edit import (
36 get_well_formatted_edit,
37 merge_sort_text_edits,
38 apply_text_edits,
39 OverLappingTextEditException,
40)
41from debputy.lsp.vendoring._deb822_repro import (
42 Deb822FileElement,
43 Deb822ParagraphElement,
44)
45from debputy.packages import SourcePackage, BinaryPackage
46from debputy.plugin.api import VirtualPath
47from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
48from debputy.util import _warn, _error, _info
49from debputy.yaml import MANIFEST_YAML, YAMLError
50from debputy.yaml.compat import CommentedMap
52if TYPE_CHECKING:
53 import lsprotocol.types as types
54else:
55 import debputy.lsprotocol.types as types
58LINTER_FORMATS = CLI_DIAGNOSTIC_HANDLERS
59REFORMAT_FORMATS = CLI_FORMAT_FILE_HANDLERS
62@dataclasses.dataclass(slots=True)
63class LintContext:
64 plugin_feature_set: PluginProvidedFeatureSet
65 maint_preference_table: MaintainerPreferenceTable
66 source_root: VirtualPath | None
67 debian_dir: VirtualPath | None
68 debputy_config: DebputyConfig
69 parsed_deb822_file_content: Deb822FileElement | None = None
70 source_package: SourcePackage | None = None
71 binary_packages: Mapping[str, BinaryPackage] | None = None
72 effective_preference: EffectiveFormattingPreference | None = None
73 style_tool: str | None = None
74 unsupported_preference_reason: str | None = None
75 salsa_ci: CommentedMap | None = None
77 def state_for(
78 self,
79 path: str,
80 content: str,
81 lines: list[str],
82 lint_implementation: AsyncLinterImpl | None,
83 ) -> LintStateImpl:
84 return LintStateImpl(
85 self.plugin_feature_set,
86 self.maint_preference_table,
87 self.source_root,
88 self.debian_dir,
89 path,
90 content,
91 lines,
92 self.debputy_config,
93 self.source_package,
94 self.binary_packages,
95 self.effective_preference,
96 lint_implementation,
97 )
100def gather_lint_info(context: CommandContext) -> LintContext:
101 source_root = FSROOverlay.create_root_dir(".", ".")
102 debian_dir = source_root.get("debian")
103 if debian_dir is not None and not debian_dir.is_dir:
104 debian_dir = None
105 lint_context = LintContext(
106 context.load_plugins(),
107 MaintainerPreferenceTable.load_preferences(),
108 source_root,
109 debian_dir,
110 load_debputy_config(),
111 )
112 try:
113 with open("debian/control") as fd:
114 deb822_file, source_package, binary_packages = (
115 context.dctrl_parser.parse_source_debian_control(fd, ignore_errors=True)
116 )
117 except FileNotFoundError:
118 source_package = None
119 else:
120 lint_context.parsed_deb822_file_content = deb822_file
121 lint_context.source_package = source_package
122 lint_context.binary_packages = binary_packages
123 salsa_ci_map: CommentedMap | None = None
124 for ci_file in ("debian/salsa-ci.yml", "debian/gitlab-ci.yml", ".gitlab-ci.yml"):
125 try:
126 with open(ci_file) as fd:
127 salsa_ci_map = MANIFEST_YAML.load(fd)
128 if not isinstance(salsa_ci_map, CommentedMap):
129 salsa_ci_map = None
130 break
131 except FileNotFoundError:
132 pass
133 except YAMLError:
134 break
135 if source_package is not None or salsa_ci_map is not None:
136 pref, tool, pref_reason = determine_effective_preference(
137 lint_context.maint_preference_table,
138 source_package,
139 salsa_ci_map,
140 )
141 lint_context.effective_preference = pref
142 lint_context.style_tool = tool
143 lint_context.unsupported_preference_reason = pref_reason
145 return lint_context
148def initialize_lint_report(context: CommandContext) -> LintReport:
149 lint_report_format = context.parsed_args.lint_report_format
150 report_output = context.parsed_args.report_output
152 if lint_report_format == "term":
153 fo = _output_styling(context.parsed_args, sys.stdout)
154 if report_output is not None:
155 _warn("--report-output is redundant for the `term` report")
156 return TermLintReport(fo)
157 if lint_report_format == "junit4-xml":
158 try:
159 import junit_xml
160 except ImportError:
161 _error(
162 "The `junit4-xml` report format requires `python3-junit.xml` to be installed"
163 )
165 from debputy.linting.lint_report_junit import JunitLintReport
167 if report_output is None:
168 report_output = "debputy-lint-junit.xml"
170 return JunitLintReport(report_output)
172 raise AssertionError(f"Missing case for lint_report_format: {lint_report_format}")
175async def perform_linting(context: CommandContext) -> None:
176 parsed_args = context.parsed_args
177 if not parsed_args.spellcheck:
178 disable_spellchecking()
179 linter_exit_code = parsed_args.linter_exit_code
180 lint_report = initialize_lint_report(context)
181 lint_context = gather_lint_info(context)
183 for name_stem in LINTER_FORMATS:
184 filename = f"./{name_stem}"
185 if not os.path.isfile(filename):
186 continue
187 await perform_linting_of_file(
188 lint_context,
189 filename,
190 name_stem,
191 context.parsed_args.auto_fix,
192 lint_report,
193 )
194 if lint_report.number_of_invalid_diagnostics:
195 _warn(
196 "Some diagnostics did not explicitly set severity. Please report the bug and include the output"
197 )
198 if lint_report.number_of_broken_diagnostics:
199 _error(
200 "Some sub-linters reported issues. Please report the bug and include the output"
201 )
203 if parsed_args.warn_about_check_manifest and os.path.isfile(
204 "debian/debputy.manifest"
205 ):
206 _info("Note: Due to a limitation in the linter, debian/debputy.manifest is")
207 _info("only **partially** checked by this command at the time of writing.")
208 _info("Please use `debputy check-manifest` to fully check the manifest.")
210 lint_report.finish_report()
212 if linter_exit_code:
213 _exit_with_lint_code(lint_report)
216def perform_reformat(
217 context: CommandContext,
218 *,
219 named_style: str | None = None,
220) -> None:
221 parsed_args = context.parsed_args
222 fo = _output_styling(context.parsed_args, sys.stdout)
223 lint_context = gather_lint_info(context)
224 write_style = parsed_args.write_style
226 if named_style is not None:
227 style = lint_context.maint_preference_table.named_styles.get(named_style)
228 if style is None:
229 styles = ", ".join(lint_context.maint_preference_table.named_styles)
230 _error(f'There is no style named "{style}". Options include: {styles}')
231 if (
232 lint_context.effective_preference is not None
233 and lint_context.effective_preference != style
234 ):
235 _info(
236 f'Note that the style "{named_style}" does not match the style that `debputy` was configured to use.'
237 )
238 _info("This may be a non-issue (if the configuration is out of date).")
239 lint_context.effective_preference = style
241 elif write_style:
242 _error("The `--write-style` option requires a named style passed via `--style`")
244 if lint_context.effective_preference is None:
245 if lint_context.unsupported_preference_reason is not None:
246 _warn(
247 "While `debputy` could identify a formatting for this package, it does not support it."
248 )
249 _warn(f"{lint_context.unsupported_preference_reason}")
250 if lint_context.style_tool is not None:
251 _info(
252 f"The following tool might be able to apply the style: {lint_context.style_tool}"
253 )
254 if parsed_args.supported_style_required:
255 _error(
256 "Sorry; `debputy` does not support the style. Use --unknown-or-unsupported-style-is-ok to make"
257 " this a non-error (note that `debputy` will not reformat the packaging in this case; just not"
258 " exit with an error code)."
259 )
260 else:
261 print(
262 textwrap.dedent(
263 """\
264 You can enable set a style by doing either of:
266 * You can set `X-Style: black` in the source stanza of `debian/control` to pick
267 `black` as the preferred style for this package.
268 - Note: `black` is an opinionated style that follows the spirit of the `black` code formatter
269 for Python.
270 - If you use `pre-commit`, then there is a formatting hook at
271 https://salsa.debian.org/debian/debputy-pre-commit-hooks
273 * If you use the Debian Salsa CI pipeline, then you can set SALSA_CI_DISABLE_WRAP_AND_SORT (`no`)
274 or SALSA_CI_ENABLE_WRAP_AND_SORT (`yes`) to the relevant value and `debputy` will pick up the
275 configuration from there.
276 - Note: The option must be in `.gitlab-ci.yml`, `debian/gitlab-ci.yml` or `debian/salsa-ci.yml`
277 to work. The Salsa CI pipeline will use `wrap-and-sort` while `debputy` uses its own emulation
278 of `wrap-and-sort` (`debputy` also needs to apply the style via `debputy lsp server`).
280 * The `debputy` code also comes with a built-in style database. This may be interesting for
281 packaging teams, so set a default team style that applies to all packages maintained by
282 that packaging team.
283 - Individuals can also add their style, which can useful for ad-hoc packaging teams, where
284 `debputy` will automatically apply a style if *all* co-maintainers agree to it.
286 Note the above list is an ordered list of how `debputy` determines which style to use in case
287 multiple options are available.
288 """
289 )
290 )
291 if parsed_args.supported_style_required:
292 if lint_context.style_tool is not None:
293 _error(
294 "Sorry, `debputy reformat` does not support the packaging style. However, the"
295 f" formatting is supposedly handled by: {lint_context.style_tool}"
296 )
297 _error(
298 "Sorry; `debputy` does not know which style to use for this package. Please either set a"
299 "style or use --unknown-or-unsupported-style-is-ok to make this a non-error"
300 )
301 _info("")
302 _info(
303 "Doing nothing since no supported style could be identified as requested."
304 " See above how to set a style."
305 )
306 _info("Use --supported-style-is-required if this should be an error instead.")
307 sys.exit(0)
309 changes = False
310 auto_fix = context.parsed_args.auto_fix
311 modifiers = {}
312 if write_style:
314 def _commit_style(raw: str) -> str:
315 deb822_file, _, _ = context.dctrl_parser.parse_source_debian_control(
316 raw.splitlines(keepends=True), ignore_errors=True
317 )
319 for p in deb822_file.iter_parts():
320 if p.is_error:
321 _warn(
322 "Cannot commit the style due to syntax errors in debian/control. Fix these first."
323 )
324 return raw
325 if isinstance(p, Deb822ParagraphElement):
326 inserted = "X-Style" not in p
327 p["X-Style"] = named_style
328 # If we inserted the field, then we have opinions on its placement. But if it was already
329 # there, then we do not re-order it.
330 if inserted:
331 with suppress(KeyError):
332 p.order_before("X-Style", "Description")
333 break
335 return deb822_file.dump()
337 modifiers["debian/control"] = _commit_style
339 for name_stem in REFORMAT_FORMATS:
340 formatter = REFORMAT_FORMATS.get(name_stem)
341 filename = f"./{name_stem}"
342 if formatter is None or not os.path.isfile(filename):
343 continue
345 modifier = modifiers.get(name_stem)
347 reformatted = perform_reformat_of_file(
348 fo,
349 lint_context,
350 filename,
351 formatter,
352 auto_fix,
353 custom_modifier=modifier,
354 )
355 if reformatted:
356 changes = True
358 if changes and parsed_args.linter_exit_code:
359 sys.exit(2)
362def perform_reformat_of_file(
363 fo: IOBasedOutputStyling,
364 lint_context: LintContext,
365 filename: str,
366 formatter: FormatterImpl,
367 auto_fix: bool,
368 *,
369 custom_modifier: Callable[[str], str] | None = None,
370) -> bool:
371 with open(filename, encoding="utf-8") as fd:
372 text = fd.read()
374 lines = text.splitlines(keepends=True)
375 lint_state = lint_context.state_for(
376 filename,
377 text,
378 lines,
379 None,
380 )
381 edits = formatter(lint_state)
383 if not edits and not custom_modifier:
384 return False
386 if edits:
387 try:
388 replacement = apply_text_edits(text, lines, edits)
389 except OverLappingTextEditException:
390 _error(
391 f"The reformater for {filename} produced overlapping edits (which is broken and will not work)"
392 )
393 else:
394 replacement = text
396 if custom_modifier:
397 replacement = custom_modifier(replacement)
399 unified_diff = difflib.unified_diff(
400 text.splitlines(keepends=True),
401 replacement.splitlines(keepends=True),
402 fromfile=filename,
403 tofile=filename,
404 )
405 for line in unified_diff:
406 print(line, end="")
408 if auto_fix:
409 output_filename = f"{filename}.tmp"
410 with open(output_filename, "w", encoding="utf-8") as fd:
411 fd.write(replacement)
412 orig_mode = stat.S_IMODE(os.stat(filename).st_mode)
413 os.chmod(output_filename, orig_mode)
414 os.rename(output_filename, filename)
415 print(
416 fo.colored(
417 f"Reformatted {filename}.",
418 fg="green",
419 style="bold",
420 )
421 )
423 return True
426def _exit_with_lint_code(lint_report: LintReport) -> NoReturn:
427 diagnostics_count = lint_report.diagnostics_count
428 if (
429 diagnostics_count[types.DiagnosticSeverity.Error]
430 or diagnostics_count[types.DiagnosticSeverity.Warning]
431 ):
432 sys.exit(2)
433 sys.exit(0)
436async def perform_linting_of_file(
437 lint_context: LintContext,
438 filename: str,
439 file_format: str,
440 auto_fixing_enabled: bool,
441 lint_report: LintReport,
442) -> None:
443 handler = LINTER_FORMATS.get(file_format)
444 if handler is None:
445 return
446 with open(filename, encoding="utf-8") as fd:
447 text = fd.read()
449 if auto_fixing_enabled:
450 await _auto_fix_run(
451 lint_context,
452 filename,
453 text,
454 handler,
455 lint_report,
456 )
457 else:
458 await _diagnostics_run(
459 lint_context,
460 filename,
461 text,
462 handler,
463 lint_report,
464 )
467def _overlapping_edit(
468 last_edit_range: types.Range,
469 last_fix_range: types.Range,
470) -> bool:
471 last_edit_start_pos = last_edit_range.start
472 last_fix_start_pos = last_fix_range.start
473 if last_edit_start_pos.line < last_fix_start_pos.line:
474 return True
475 if (
476 last_edit_start_pos.line == last_fix_start_pos.character
477 and last_edit_start_pos.character < last_fix_start_pos.character
478 ):
479 return True
481 if last_fix_range.end == last_fix_start_pos:
482 return False
484 if last_fix_range.end < last_edit_start_pos:
485 return False
486 return True
489def _max_range(
490 r1: types.Range,
491 r2: types.Range,
492) -> types.Range:
493 if r2.end > r1.end:
494 return r2
495 if r2.end < r1.end:
496 return r1
497 if r2.start > r1.start:
498 return r2
499 return r1
502_INITIAL_FIX_RANGE = types.Range(
503 types.Position(0, 0),
504 types.Position(0, 0),
505)
508def _is_non_interactive_auto_fix_allowed(diagnostic: types.Diagnostic) -> bool:
509 diag_data: DiagnosticData | None = diagnostic.data
510 return diag_data is None or diag_data.get("enable_non_interactive_auto_fix", True)
513async def _auto_fix_run(
514 lint_context: LintContext,
515 filename: str,
516 text: str,
517 linter: AsyncLinterImpl,
518 lint_report: LintReport,
519) -> None:
520 another_round = True
521 unfixed_diagnostics: list[types.Diagnostic] = []
522 remaining_rounds = 10
523 fixed_count = 0
524 too_many_rounds = False
525 lines = text.splitlines(keepends=True)
526 lint_state = lint_context.state_for(
527 filename,
528 text,
529 lines,
530 linter,
531 )
532 current_issues = await lint_state.gather_diagnostics()
533 issue_count_start = len(current_issues) if current_issues else 0
534 while another_round and current_issues:
535 another_round = False
536 last_fix_range = _INITIAL_FIX_RANGE
537 unfixed_diagnostics.clear()
538 edits = []
539 fixed_diagnostics = []
540 for diagnostic in current_issues:
541 if not _is_non_interactive_auto_fix_allowed(diagnostic):
542 unfixed_diagnostics.append(diagnostic)
543 continue
544 actions = provide_standard_quickfixes_from_diagnostics_lint(
545 lint_state,
546 types.CodeActionParams(
547 types.TextDocumentIdentifier(lint_state.doc_uri),
548 diagnostic.range,
549 types.CodeActionContext(
550 [diagnostic],
551 ),
552 ),
553 )
554 auto_fixing_edits = resolve_auto_fixer(lint_state.doc_uri, actions)
556 if not auto_fixing_edits:
557 unfixed_diagnostics.append(diagnostic)
558 continue
560 sorted_edits = merge_sort_text_edits(
561 [get_well_formatted_edit(e) for e in auto_fixing_edits],
562 )
563 last_edit = sorted_edits[-1]
564 last_edit_range = last_edit.range
565 if _overlapping_edit(last_edit_range, last_fix_range):
566 if not another_round:
567 if remaining_rounds > 0:
568 remaining_rounds -= 1
569 print(
570 "Detected overlapping edit; scheduling another edit round."
571 )
572 another_round = True
573 else:
574 _warn(
575 "Too many overlapping edits; stopping after this round (circuit breaker)."
576 )
577 too_many_rounds = True
578 continue
579 edits.extend(sorted_edits)
580 fixed_diagnostics.append(diagnostic)
581 last_fix_range = _max_range(last_fix_range, sorted_edits[-1].range)
583 if another_round and not edits:
584 _error(
585 "Internal error: Detected an overlapping edit and yet had no edits to perform..."
586 )
588 fixed_count += len(fixed_diagnostics)
590 try:
591 text = apply_text_edits(
592 text,
593 lines,
594 edits,
595 )
596 except OverLappingTextEditException:
597 _error(
598 f"Failed to apply edits for f{filename} due to over lapping edits. Please file a bug"
599 f" against `debputy` with a contents of the `debian` directory or a minimal reproducer"
600 )
601 lines = text.splitlines(keepends=True)
603 with lint_report.line_state(lint_state):
604 for diagnostic in fixed_diagnostics:
605 lint_report.report_diagnostic(
606 diagnostic,
607 result_state=LintDiagnosticResultState.FIXED,
608 )
609 lint_state.content = text
610 lint_state.lines = lines
611 lint_state.clear_cache()
612 current_issues = await lint_state.gather_diagnostics()
614 if fixed_count:
615 output_filename = f"{filename}.tmp"
616 with open(output_filename, "w", encoding="utf-8") as fd:
617 fd.write(text)
618 orig_mode = stat.S_IMODE(os.stat(filename).st_mode)
619 os.chmod(output_filename, orig_mode)
620 os.rename(output_filename, filename)
621 lines = text.splitlines(keepends=True)
622 lint_state.content = text
623 lint_state.lines = lines
624 remaining_issues = await lint_state.gather_diagnostics() or []
625 else:
626 remaining_issues = current_issues or []
628 with lint_report.line_state(lint_state):
629 for diagnostic in remaining_issues:
630 lint_report.report_diagnostic(diagnostic)
632 if isinstance(lint_report, TermLintReport):
633 # TODO: Not optimal, but will do for now.
634 fo = lint_report.fo
635 print()
636 if fixed_count:
637 remaining_issues_count = len(remaining_issues)
638 print(
639 fo.colored(
640 f"Fixes applied to {filename}: {fixed_count}."
641 f" Number of issues went from {issue_count_start} to {remaining_issues_count}",
642 fg="green",
643 style="bold",
644 )
645 )
646 elif remaining_issues:
647 print(
648 fo.colored(
649 f"None of the issues in {filename} could be fixed automatically. Sorry!",
650 fg="yellow",
651 bg="black",
652 style="bold",
653 )
654 )
655 else:
656 assert not current_issues
657 print(
658 fo.colored(
659 f"No issues detected in {filename}",
660 fg="green",
661 style="bold",
662 )
663 )
664 if too_many_rounds:
665 print(
666 fo.colored(
667 f"Not all fixes for issues in {filename} could be applied due to overlapping edits.",
668 fg="yellow",
669 bg="black",
670 style="bold",
671 )
672 )
673 print(
674 "Running once more may cause more fixes to be applied. However, you may be facing"
675 " pathological performance."
676 )
679async def _diagnostics_run(
680 lint_context: LintContext,
681 filename: str,
682 text: str,
683 linter: AsyncLinterImpl,
684 lint_report: LintReport,
685) -> None:
686 lines = text.splitlines(keepends=True)
687 lint_state = lint_context.state_for(filename, text, lines, linter)
688 with lint_report.line_state(lint_state):
689 issues = await lint_state.gather_diagnostics()
690 for diagnostic in issues:
691 actions = provide_standard_quickfixes_from_diagnostics_lint(
692 lint_state,
693 types.CodeActionParams(
694 types.TextDocumentIdentifier(lint_state.doc_uri),
695 diagnostic.range,
696 types.CodeActionContext(
697 [diagnostic],
698 ),
699 ),
700 )
701 auto_fixer = resolve_auto_fixer(lint_state.doc_uri, actions)
702 has_auto_fixer = bool(auto_fixer) and _is_non_interactive_auto_fix_allowed(
703 diagnostic
704 )
706 result_state = LintDiagnosticResultState.REPORTED
707 if has_auto_fixer:
708 result_state = LintDiagnosticResultState.AUTO_FIXABLE
709 elif has_at_least_lsp_quickfix(actions):
710 result_state = LintDiagnosticResultState.MANUAL_FIXABLE
712 lint_report.report_diagnostic(diagnostic, result_state=result_state)
715def has_at_least_lsp_quickfix(
716 actions: list[types.Command | types.CodeAction] | None,
717) -> bool:
718 if actions is None:
719 return False
720 for action in actions:
721 if (
722 not isinstance(action, types.CodeAction)
723 or action.kind != types.CodeActionKind.QuickFix
724 ):
725 continue
726 if action.edit is None and action.command is None:
727 continue
728 return True
729 return False
732def resolve_auto_fixer(
733 document_ref: str,
734 actions: list[types.Command | types.CodeAction] | None,
735) -> Sequence[types.TextEdit | types.AnnotatedTextEdit]:
736 if actions is None or len(actions) != 1:
737 return tuple()
738 action = actions[0]
739 if not isinstance(action, types.CodeAction):
740 return tuple()
741 workspace_edit = action.edit
742 if workspace_edit is None or action.command is not None:
743 return tuple()
744 document_changes = workspace_edit.document_changes
745 if document_changes:
746 if len(document_changes) != 1:
747 return tuple()
748 doc_edit = document_changes[0]
749 if not isinstance(doc_edit, types.TextDocumentEdit):
750 return tuple()
751 if doc_edit.text_document.uri != document_ref:
752 return tuple()
753 return doc_edit.edits
754 if (
755 not workspace_edit.changes
756 or len(workspace_edit.changes) != 1
757 or document_ref not in workspace_edit.changes
758 ):
759 return tuple()
760 return workspace_edit.changes[document_ref]