Coverage for src/debputy/linting/lint_impl.py: 11%
377 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-09-07 09:27 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-09-07 09:27 +0000
1import dataclasses
2import os
3import stat
4import subprocess
5import sys
6import textwrap
7from contextlib import suppress
8from typing import (
9 Optional,
10 List,
11 Union,
12 NoReturn,
13 Mapping,
14 Sequence,
15 TYPE_CHECKING,
16 Callable,
17)
19from debputy.commands.debputy_cmd.context import CommandContext
20from debputy.commands.debputy_cmd.output import _output_styling, IOBasedOutputStyling
21from debputy.filesystem_scan import FSROOverlay
22from debputy.linting.lint_util import (
23 LintReport,
24 LintStateImpl,
25 FormatterImpl,
26 TermLintReport,
27 LintDiagnosticResultState,
28 AsyncLinterImpl,
29)
30from debputy.lsp.config.debputy_config import DebputyConfig, load_debputy_config
31from debputy.lsp.diagnostics import DiagnosticData
32from debputy.lsp.lsp_features import CLI_FORMAT_FILE_HANDLERS, CLI_DIAGNOSTIC_HANDLERS
33from debputy.lsp.maint_prefs import (
34 MaintainerPreferenceTable,
35 EffectiveFormattingPreference,
36 determine_effective_preference,
37)
38from debputy.lsp.quickfixes import provide_standard_quickfixes_from_diagnostics_lint
39from debputy.lsp.spellchecking import disable_spellchecking
40from debputy.lsp.text_edit import (
41 get_well_formatted_edit,
42 merge_sort_text_edits,
43 apply_text_edits,
44 OverLappingTextEditException,
45)
46from debputy.lsp.vendoring._deb822_repro import (
47 Deb822FileElement,
48 Deb822ParagraphElement,
49)
50from debputy.packages import SourcePackage, BinaryPackage
51from debputy.plugin.api import VirtualPath
52from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
53from debputy.util import _warn, _error, _info
54from debputy.yaml import MANIFEST_YAML, YAMLError
55from debputy.yaml.compat import CommentedMap
57if TYPE_CHECKING:
58 import lsprotocol.types as types
59else:
60 import debputy.lsprotocol.types as types
# Handler tables used by the CLI commands in this module. Both are iterated by
# key (a path relative to the current directory, used as `./<key>`) and queried
# with `.get(key)` to obtain the handler for that file.
LINTER_FORMATS = CLI_DIAGNOSTIC_HANDLERS
REFORMAT_FORMATS = CLI_FORMAT_FILE_HANDLERS
67@dataclasses.dataclass(slots=True)
68class LintContext:
69 plugin_feature_set: PluginProvidedFeatureSet
70 maint_preference_table: MaintainerPreferenceTable
71 source_root: Optional[VirtualPath]
72 debian_dir: Optional[VirtualPath]
73 debputy_config: DebputyConfig
74 parsed_deb822_file_content: Optional[Deb822FileElement] = None
75 source_package: Optional[SourcePackage] = None
76 binary_packages: Optional[Mapping[str, BinaryPackage]] = None
77 effective_preference: Optional[EffectiveFormattingPreference] = None
78 style_tool: Optional[str] = None
79 unsupported_preference_reason: Optional[str] = None
80 salsa_ci: Optional[CommentedMap] = None
82 def state_for(
83 self,
84 path: str,
85 content: str,
86 lines: List[str],
87 lint_implementation: Optional[AsyncLinterImpl],
88 ) -> LintStateImpl:
89 return LintStateImpl(
90 self.plugin_feature_set,
91 self.maint_preference_table,
92 self.source_root,
93 self.debian_dir,
94 path,
95 content,
96 lines,
97 self.debputy_config,
98 self.source_package,
99 self.binary_packages,
100 self.effective_preference,
101 lint_implementation,
102 )
def gather_lint_info(context: CommandContext) -> LintContext:
    """Collect the package-wide information needed for linting and reformatting.

    All paths are resolved relative to the current working directory. The
    function parses `debian/control` (when present), looks for a Salsa CI
    configuration and derives the effective formatting preference from them.

    :param context: The command context; used for loading plugins and for
      parsing `debian/control`.
    :return: A populated `LintContext`.
    """
    source_root = FSROOverlay.create_root_dir(".", ".")
    debian_dir = source_root.get("debian")
    if debian_dir is not None and not debian_dir.is_dir:
        # A non-directory named `debian` is treated as "no debian directory".
        debian_dir = None
    lint_context = LintContext(
        context.load_plugins(),
        MaintainerPreferenceTable.load_preferences(),
        source_root,
        debian_dir,
        load_debputy_config(),
    )
    try:
        # Explicit encoding: the other readers in this module decode the
        # packaging files as UTF-8 as well, independent of the locale.
        with open("debian/control", "rt", encoding="utf-8") as fd:
            deb822_file, source_package, binary_packages = (
                context.dctrl_parser.parse_source_debian_control(fd, ignore_errors=True)
            )
    except FileNotFoundError:
        source_package = None
    else:
        lint_context.parsed_deb822_file_content = deb822_file
        lint_context.source_package = source_package
        lint_context.binary_packages = binary_packages

    salsa_ci_map: Optional[CommentedMap] = None
    # The first CI file that exists wins, even if it turns out to be unusable.
    for ci_file in ("debian/salsa-ci.yml", ".gitlab-ci.yml"):
        try:
            with open(ci_file, "rt", encoding="utf-8") as fd:
                salsa_ci_map = MANIFEST_YAML.load(fd)
                if not isinstance(salsa_ci_map, CommentedMap):
                    salsa_ci_map = None
                break
        except FileNotFoundError:
            pass
        except YAMLError:
            # Unparsable CI file: give up without trying the next candidate.
            break
    # Record the parsed CI configuration in the dedicated context field.
    lint_context.salsa_ci = salsa_ci_map

    if source_package is not None or salsa_ci_map is not None:
        pref, tool, pref_reason = determine_effective_preference(
            lint_context.maint_preference_table,
            source_package,
            salsa_ci_map,
        )
        lint_context.effective_preference = pref
        lint_context.style_tool = tool
        lint_context.unsupported_preference_reason = pref_reason

    return lint_context
def initialize_lint_report(context: CommandContext) -> LintReport:
    """Create the report sink selected via the command line.

    :param context: The command context; its parsed arguments provide
      `lint_report_format` and `report_output`.
    :return: A `TermLintReport` for the `term` format or a `JunitLintReport`
      for the `junit4-xml` format.
    :raises AssertionError: If the requested report format is not handled.
    """
    args = context.parsed_args
    lint_report_format = args.lint_report_format
    destination = args.report_output

    if lint_report_format == "term":
        styling = _output_styling(context.parsed_args, sys.stdout)
        if destination is not None:
            _warn("--report-output is redundant for the `term` report")
        return TermLintReport(styling)

    if lint_report_format != "junit4-xml":
        raise AssertionError(
            f"Missing case for lint_report_format: {lint_report_format}"
        )

    try:
        # Only imported to verify that the optional dependency is available.
        import junit_xml  # noqa: F401
    except ImportError:
        _error(
            "The `junit4-xml` report format requires `python3-junit.xml` to be installed"
        )

    from debputy.linting.lint_report_junit import JunitLintReport

    if destination is None:
        destination = "debputy-lint-junit.xml"
    return JunitLintReport(destination)
async def perform_linting(context: CommandContext) -> None:
    """Run the linter over every known packaging file that exists.

    Files are looked up relative to the current working directory using the
    keys of `LINTER_FORMATS`. When the parsed arguments request it, the
    process exits with a code derived from the reported diagnostics.

    :param context: The command context providing the parsed arguments.
    """
    parsed_args = context.parsed_args
    if not parsed_args.spellcheck:
        disable_spellchecking()
    linter_exit_code = parsed_args.linter_exit_code
    lint_report = initialize_lint_report(context)
    lint_context = gather_lint_info(context)

    for name_stem in LINTER_FORMATS:
        filename = f"./{name_stem}"
        if os.path.isfile(filename):
            await perform_linting_of_file(
                lint_context,
                filename,
                name_stem,
                context.parsed_args.auto_fix,
                lint_report,
            )

    if lint_report.number_of_invalid_diagnostics:
        _warn(
            "Some diagnostics did not explicitly set severity. Please report the bug and include the output"
        )
    if lint_report.number_of_broken_diagnostics:
        _error(
            "Some sub-linters reported issues. Please report the bug and include the output"
        )

    manifest_present = parsed_args.warn_about_check_manifest and os.path.isfile(
        "debian/debputy.manifest"
    )
    if manifest_present:
        for note in (
            "Note: Due to a limitation in the linter, debian/debputy.manifest is",
            "only **partially** checked by this command at the time of writing.",
            "Please use `debputy check-manifest` to fully check the manifest.",
        ):
            _info(note)

    lint_report.finish_report()

    if linter_exit_code:
        _exit_with_lint_code(lint_report)
def perform_reformat(
    context: CommandContext,
    *,
    named_style: Optional[str] = None,
) -> None:
    """Reformat the packaging files according to the effective style.

    When no supported style can be determined, guidance is printed and the
    process exits (with an error if the parsed arguments require a supported
    style, otherwise with exit code 0).

    :param context: The command context providing the parsed arguments.
    :param named_style: Name of a style from the maintainer preference table
      to use instead of the detected one. Required when `--write-style` is
      requested; in that case the name is also recorded as `X-Style` in
      `debian/control`.
    """
    parsed_args = context.parsed_args
    fo = _output_styling(context.parsed_args, sys.stdout)
    lint_context = gather_lint_info(context)
    write_style = parsed_args.write_style

    if named_style is not None:
        style = lint_context.maint_preference_table.named_styles.get(named_style)
        if style is None:
            styles = ", ".join(lint_context.maint_preference_table.named_styles)
            # Report the name the user asked for; `style` is always None here.
            _error(
                f'There is no style named "{named_style}". Options include: {styles}'
            )
        if (
            lint_context.effective_preference is not None
            and lint_context.effective_preference != style
        ):
            _info(
                f'Note that the style "{named_style}" does not match the style that `debputy` was configured to use.'
            )
            _info("This may be a non-issue (if the configuration is out of date).")
        lint_context.effective_preference = style
    elif write_style:
        _error("The `--write-style` option requires a named style passed via `--style`")

    if lint_context.effective_preference is None:
        if lint_context.unsupported_preference_reason is not None:
            _warn(
                "While `debputy` could identify a formatting for this package, it does not support it."
            )
            _warn(f"{lint_context.unsupported_preference_reason}")
            if lint_context.style_tool is not None:
                _info(
                    f"The following tool might be able to apply the style: {lint_context.style_tool}"
                )
            if parsed_args.supported_style_required:
                _error(
                    "Sorry; `debputy` does not support the style. Use --unknown-or-unsupported-style-is-ok to make"
                    " this a non-error (note that `debputy` will not reformat the packaging in this case; just not"
                    " exit with an error code)."
                )
        else:
            print(
                textwrap.dedent(
                    """\
                You can enable set a style by doing either of:

                 * You can set `X-Style: black` in the source stanza of `debian/control` to pick
                   `black` as the preferred style for this package.
                   - Note: `black` is an opinionated style that follows the spirit of the `black` code formatter
                     for Python.
                   - If you use `pre-commit`, then there is a formatting hook at
                     https://salsa.debian.org/debian/debputy-pre-commit-hooks

                 * If you use the Debian Salsa CI pipeline, then you can set SALSA_CI_DISABLE_WRAP_AND_SORT (`no`)
                   or SALSA_CI_ENABLE_WRAP_AND_SORT (`yes`) to the relevant value and `debputy` will pick up the
                   configuration from there.
                   - Note: The option must be in `.gitlab-ci.yml` or `debian/salsa-ci.yml` to work. The Salsa CI
                     pipeline will use `wrap-and-sort` while `debputy` uses its own emulation of `wrap-and-sort`
                     (`debputy` also needs to apply the style via `debputy lsp server`).

                 * The `debputy` code also comes with a built-in style database. This may be interesting for
                   packaging teams, so set a default team style that applies to all packages maintained by
                   that packaging team.
                   - Individuals can also add their style, which can useful for ad-hoc packaging teams, where
                     `debputy` will automatically apply a style if *all* co-maintainers agree to it.

                Note the above list is an ordered list of how `debputy` determines which style to use in case
                multiple options are available.
                """
                )
            )
        if parsed_args.supported_style_required:
            if lint_context.style_tool is not None:
                _error(
                    "Sorry, `debputy reformat` does not support the packaging style. However, the"
                    f" formatting is supposedly handled by: {lint_context.style_tool}"
                )
            # Note the leading space on the second literal; without it the
            # message would read "set astyle".
            _error(
                "Sorry; `debputy` does not know which style to use for this package. Please either set a"
                " style or use --unknown-or-unsupported-style-is-ok to make this a non-error"
            )
        _info("")
        _info(
            "Doing nothing since no supported style could be identified as requested."
            " See above how to set a style."
        )
        _info("Use --supported-style-is-required if this should be an error instead.")
        sys.exit(0)

    changes = False
    auto_fix = context.parsed_args.auto_fix
    # Extra per-file transformations applied after the formatter, keyed like
    # `REFORMAT_FORMATS`.
    modifiers = {}
    if write_style:

        def _commit_style(raw: str) -> str:
            """Record `named_style` as `X-Style` in the first paragraph of `raw`."""
            deb822_file, _, _ = context.dctrl_parser.parse_source_debian_control(
                raw.splitlines(keepends=True), ignore_errors=True
            )

            for p in deb822_file.iter_parts():
                if p.is_error:
                    _warn(
                        "Cannot commit the style due to syntax errors in debian/control. Fix these first."
                    )
                    return raw
                if isinstance(p, Deb822ParagraphElement):
                    inserted = "X-Style" not in p
                    p["X-Style"] = named_style
                    # If we inserted the field, then we have opinions on its placement. But if it was already
                    # there, then we do not re-order it.
                    if inserted:
                        with suppress(KeyError):
                            p.order_before("X-Style", "Description")
                    break

            return deb822_file.dump()

        modifiers["debian/control"] = _commit_style

    for name_stem in REFORMAT_FORMATS:
        formatter = REFORMAT_FORMATS.get(name_stem)
        filename = f"./{name_stem}"
        if formatter is None or not os.path.isfile(filename):
            continue

        modifier = modifiers.get(name_stem)

        reformatted = perform_reformat_of_file(
            fo,
            lint_context,
            filename,
            formatter,
            auto_fix,
            custom_modifier=modifier,
        )
        if reformatted:
            changes = True

    if changes and parsed_args.linter_exit_code:
        sys.exit(2)
def perform_reformat_of_file(
    fo: IOBasedOutputStyling,
    lint_context: LintContext,
    filename: str,
    formatter: FormatterImpl,
    auto_fix: bool,
    *,
    custom_modifier: Optional[Callable[[str], str]] = None,
) -> bool:
    """Reformat a single file and show the resulting diff.

    The new content is written to `<filename>.tmp` and compared with the
    original via `diff -u`. With `auto_fix`, the temporary file replaces the
    original (keeping its permission bits); otherwise it is removed again.

    :param fo: Output styling used for the "Reformatted" status line.
    :param lint_context: Shared context used to build the per-file state.
    :param filename: Path of the file to reformat.
    :param formatter: Callable producing the text edits for the file.
    :param auto_fix: Whether to write the result back to `filename`.
    :param custom_modifier: Optional transformation applied to the text after
      the formatter's edits.
    :return: `True` if reformatting changes the content of the file, `False`
      otherwise.
    """
    with open(filename, "rt", encoding="utf-8") as fd:
        text = fd.read()

    lines = text.splitlines(keepends=True)
    lint_state = lint_context.state_for(
        filename,
        text,
        lines,
        None,
    )
    edits = formatter(lint_state)

    if not edits and not custom_modifier:
        return False

    if edits:
        try:
            replacement = apply_text_edits(text, lines, edits)
        except OverLappingTextEditException:
            _error(
                f"The reformater for {filename} produced overlapping edits (which is broken and will not work)"
            )
    else:
        replacement = text

    if custom_modifier:
        replacement = custom_modifier(replacement)

    if replacement == text:
        # Nothing actually changed (e.g., the modifier was a no-op); do not
        # report the file as reformatted.
        return False

    output_filename = f"{filename}.tmp"
    try:
        with open(output_filename, "wt", encoding="utf-8") as fd:
            fd.write(replacement)

        # `diff` exits with 0 (identical) or 1 (differences); anything else
        # is a failure of the tool itself.
        r = subprocess.run(["diff", "-u", filename, output_filename]).returncode
        if r != 0 and r != 1:
            _warn(f"diff -u {filename} {output_filename} failed!?")
        if auto_fix:
            orig_mode = stat.S_IMODE(os.stat(filename).st_mode)
            os.chmod(output_filename, orig_mode)
            os.rename(output_filename, filename)
            print(
                fo.colored(
                    f"Reformatted {filename}.",
                    fg="green",
                    style="bold",
                )
            )
    finally:
        # Remove the temporary file unless it was renamed into place; this
        # also cleans up when one of the steps above fails.
        with suppress(FileNotFoundError):
            os.unlink(output_filename)

    return True
def _exit_with_lint_code(lint_report: LintReport) -> NoReturn:
    """Terminate the process with an exit code reflecting the lint result.

    Exits with 2 when at least one error or warning diagnostic was counted
    in `lint_report`, and with 0 otherwise.
    """
    counts = lint_report.diagnostics_count
    blocking_severities = (
        types.DiagnosticSeverity.Error,
        types.DiagnosticSeverity.Warning,
    )
    # `any` short-circuits, so warnings are only consulted without errors.
    has_blocking = any(counts[severity] for severity in blocking_severities)
    sys.exit(2 if has_blocking else 0)
async def perform_linting_of_file(
    lint_context: LintContext,
    filename: str,
    file_format: str,
    auto_fixing_enabled: bool,
    lint_report: LintReport,
) -> None:
    """Lint one file, optionally applying automatic fixes.

    :param lint_context: Shared context used to build the per-file state.
    :param filename: Path of the file to read.
    :param file_format: Key into `LINTER_FORMATS` selecting the linter. The
      call is a no-op when no linter is registered for it.
    :param auto_fixing_enabled: Apply available fixes instead of only
      reporting the diagnostics.
    :param lint_report: Report that receives the diagnostics.
    """
    handler = LINTER_FORMATS.get(file_format)
    if handler is None:
        # No linter for this format; the file is not even opened.
        return

    with open(filename, "rt", encoding="utf-8") as fd:
        text = fd.read()

    # Both runners share the same signature; pick one and delegate.
    runner = _auto_fix_run if auto_fixing_enabled else _diagnostics_run
    await runner(
        lint_context,
        filename,
        text,
        handler,
        lint_report,
    )
def _overlapping_edit(
    last_edit_range: types.Range,
    last_fix_range: types.Range,
) -> bool:
    """Decide whether an edit conflicts with the fixes already scheduled.

    :param last_edit_range: Range of the candidate edit.
    :param last_fix_range: Range of the furthest fix accepted so far in the
      current round.
    :return: `True` if the candidate must be deferred to a later round.
    """
    last_edit_start_pos = last_edit_range.start
    last_fix_start_pos = last_fix_range.start
    # An edit that begins before the start of the last fix is treated as
    # overlapping: first by line ...
    if last_edit_start_pos.line < last_fix_start_pos.line:
        return True
    # ... and then, on the same line, by character offset.
    if (
        last_edit_start_pos.line == last_fix_start_pos.line
        and last_edit_start_pos.character < last_fix_start_pos.character
    ):
        return True

    # An empty fix range cannot overlap an edit starting at or after it.
    if last_fix_range.end == last_fix_start_pos:
        return False

    # The fix ends strictly before the edit begins: no overlap.
    if last_fix_range.end < last_edit_start_pos:
        return False
    return True
def _max_range(
    r1: types.Range,
    r2: types.Range,
) -> types.Range:
    """Return whichever of the two ranges reaches furthest.

    Ranges are ordered by their end position first. When the ends tie, `r2`
    is only preferred if it starts after `r1`; otherwise `r1` is returned.
    """
    end_a, end_b = r1.end, r2.end
    if end_b > end_a:
        return r2
    if end_b < end_a:
        return r1
    # Same end position: fall back to comparing the start positions.
    return r2 if r2.start > r1.start else r1
# Empty range at the very start of the document (line 0, character 0). Used
# as the "no fix accepted yet" value at the beginning of each auto-fix round.
_INITIAL_FIX_RANGE = types.Range(
    types.Position(0, 0),
    types.Position(0, 0),
)
def _is_non_interactive_auto_fix_allowed(diagnostic: types.Diagnostic) -> bool:
    """Tell whether a diagnostic may be fixed without asking the user.

    Fixing is allowed unless the diagnostic's data explicitly sets
    `enable_non_interactive_auto_fix` to a false value.
    """
    diag_data: Optional[DiagnosticData] = diagnostic.data
    if diag_data is None:
        # No extra data attached: nothing opts out of auto-fixing.
        return True
    return diag_data.get("enable_non_interactive_auto_fix", True)
async def _auto_fix_run(
    lint_context: LintContext,
    filename: str,
    text: str,
    linter: AsyncLinterImpl,
    lint_report: LintReport,
) -> None:
    """Lint a file and apply every fix that can be applied automatically.

    Fixes are applied in rounds. Within a round, a fix whose edits overlap
    with an already accepted fix is postponed and triggers another round; at
    most 10 such extra rounds are scheduled. If any fix was applied, the
    file is rewritten (keeping its permission bits). The fixed and the
    remaining diagnostics are reported to `lint_report`.

    :param lint_context: Shared context used to build the per-file state.
    :param filename: Path of the file being linted (and possibly rewritten).
    :param text: Current content of the file.
    :param linter: The linter used to gather diagnostics.
    :param lint_report: Report that receives the diagnostics.
    """
    another_round = True
    remaining_rounds = 10
    fixed_count = 0
    too_many_rounds = False
    lines = text.splitlines(keepends=True)
    lint_state = lint_context.state_for(
        filename,
        text,
        lines,
        linter,
    )
    current_issues = await lint_state.gather_diagnostics()
    issue_count_start = len(current_issues) if current_issues else 0
    while another_round and current_issues:
        another_round = False
        last_fix_range = _INITIAL_FIX_RANGE
        edits = []
        fixed_diagnostics = []
        for diagnostic in current_issues:
            if not _is_non_interactive_auto_fix_allowed(diagnostic):
                continue
            actions = provide_standard_quickfixes_from_diagnostics_lint(
                lint_state,
                types.CodeActionParams(
                    types.TextDocumentIdentifier(lint_state.doc_uri),
                    diagnostic.range,
                    types.CodeActionContext(
                        [diagnostic],
                    ),
                ),
            )
            auto_fixing_edits = resolve_auto_fixer(lint_state.doc_uri, actions)

            if not auto_fixing_edits:
                continue

            sorted_edits = merge_sort_text_edits(
                [get_well_formatted_edit(e) for e in auto_fixing_edits],
            )
            last_edit = sorted_edits[-1]
            last_edit_range = last_edit.range
            if _overlapping_edit(last_edit_range, last_fix_range):
                # Postpone this fix; it is retried in the next round (if the
                # circuit breaker still permits one).
                if not another_round:
                    if remaining_rounds > 0:
                        remaining_rounds -= 1
                        print(
                            "Detected overlapping edit; scheduling another edit round."
                        )
                        another_round = True
                    else:
                        _warn(
                            "Too many overlapping edits; stopping after this round (circuit breaker)."
                        )
                        too_many_rounds = True
                continue
            edits.extend(sorted_edits)
            fixed_diagnostics.append(diagnostic)
            last_fix_range = _max_range(last_fix_range, sorted_edits[-1].range)

        if another_round and not edits:
            _error(
                "Internal error: Detected an overlapping edit and yet had no edits to perform..."
            )

        fixed_count += len(fixed_diagnostics)

        try:
            text = apply_text_edits(
                text,
                lines,
                edits,
            )
        except OverLappingTextEditException:
            _error(
                f"Failed to apply edits for {filename} due to over lapping edits. Please file a bug"
                f" against `debputy` with a contents of the `debian` directory or a minimal reproducer"
            )
        lines = text.splitlines(keepends=True)

        # Report the fixed diagnostics against the state they were found in,
        # i.e., before the state is updated with the new content.
        with lint_report.line_state(lint_state):
            for diagnostic in fixed_diagnostics:
                lint_report.report_diagnostic(
                    diagnostic,
                    result_state=LintDiagnosticResultState.FIXED,
                )
        lint_state.content = text
        lint_state.lines = lines
        lint_state.clear_cache()
        current_issues = await lint_state.gather_diagnostics()

    if fixed_count:
        # Write to a temporary file and rename it into place.
        output_filename = f"{filename}.tmp"
        with open(output_filename, "wt", encoding="utf-8") as fd:
            fd.write(text)
        orig_mode = stat.S_IMODE(os.stat(filename).st_mode)
        os.chmod(output_filename, orig_mode)
        os.rename(output_filename, filename)
        lines = text.splitlines(keepends=True)
        lint_state.content = text
        lint_state.lines = lines
        remaining_issues = await lint_state.gather_diagnostics() or []
    else:
        remaining_issues = current_issues or []

    with lint_report.line_state(lint_state):
        for diagnostic in remaining_issues:
            lint_report.report_diagnostic(diagnostic)

    if isinstance(lint_report, TermLintReport):
        # TODO: Not optimal, but will do for now.
        fo = lint_report.fo
        print()
        if fixed_count:
            remaining_issues_count = len(remaining_issues)
            print(
                fo.colored(
                    f"Fixes applied to {filename}: {fixed_count}."
                    f" Number of issues went from {issue_count_start} to {remaining_issues_count}",
                    fg="green",
                    style="bold",
                )
            )
        elif remaining_issues:
            print(
                fo.colored(
                    f"None of the issues in {filename} could be fixed automatically. Sorry!",
                    fg="yellow",
                    bg="black",
                    style="bold",
                )
            )
        else:
            assert not current_issues
            print(
                fo.colored(
                    f"No issues detected in {filename}",
                    fg="green",
                    style="bold",
                )
            )
        if too_many_rounds:
            print(
                fo.colored(
                    f"Not all fixes for issues in {filename} could be applied due to overlapping edits.",
                    fg="yellow",
                    bg="black",
                    style="bold",
                )
            )
            print(
                "Running once more may cause more fixes to be applied. However, you may be facing"
                " pathological performance."
            )
async def _diagnostics_run(
    lint_context: LintContext,
    filename: str,
    text: str,
    linter: AsyncLinterImpl,
    lint_report: LintReport,
) -> None:
    """Lint a file and report its diagnostics without modifying the file.

    Each diagnostic is classified as auto-fixable, manually fixable (a quick
    fix exists) or merely reported.

    :param lint_context: Shared context used to build the per-file state.
    :param filename: Path of the file being linted.
    :param text: Content of the file.
    :param linter: The linter used to gather diagnostics.
    :param lint_report: Report that receives the diagnostics.
    """
    lines = text.splitlines(keepends=True)
    lint_state = lint_context.state_for(filename, text, lines, linter)
    with lint_report.line_state(lint_state):
        # Treat "no result" like "no diagnostics", matching `_auto_fix_run`.
        issues = await lint_state.gather_diagnostics() or []
        for diagnostic in issues:
            actions = provide_standard_quickfixes_from_diagnostics_lint(
                lint_state,
                types.CodeActionParams(
                    types.TextDocumentIdentifier(lint_state.doc_uri),
                    diagnostic.range,
                    types.CodeActionContext(
                        [diagnostic],
                    ),
                ),
            )
            auto_fixer = resolve_auto_fixer(lint_state.doc_uri, actions)
            has_auto_fixer = bool(auto_fixer) and _is_non_interactive_auto_fix_allowed(
                diagnostic
            )

            result_state = LintDiagnosticResultState.REPORTED
            if has_auto_fixer:
                result_state = LintDiagnosticResultState.AUTO_FIXABLE
            elif has_at_least_lsp_quickfix(actions):
                result_state = LintDiagnosticResultState.MANUAL_FIXABLE

            lint_report.report_diagnostic(diagnostic, result_state=result_state)
def has_at_least_lsp_quickfix(
    actions: Optional[List[Union[types.Command, types.CodeAction]]],
) -> bool:
    """Check whether any of the actions is a usable LSP quick fix.

    An action qualifies when it is a `CodeAction` of kind `QuickFix` that
    carries an edit or a command (or both).

    :param actions: The code actions offered for a diagnostic, if any.
    :return: `True` if at least one action qualifies; `False` otherwise
      (including when `actions` is `None`).
    """
    if actions is None:
        return False
    return any(
        isinstance(candidate, types.CodeAction)
        and candidate.kind == types.CodeActionKind.QuickFix
        and not (candidate.edit is None and candidate.command is None)
        for candidate in actions
    )
def resolve_auto_fixer(
    document_ref: str,
    actions: Optional[List[Union[types.Command, types.CodeAction]]],
) -> Sequence[Union[types.TextEdit, types.AnnotatedTextEdit]]:
    """Extract the text edits of an unambiguous automatic fix.

    Edits are only returned when there is exactly one action, it is a
    `CodeAction` with a workspace edit and no command, and that edit touches
    only the document identified by `document_ref`.

    :param document_ref: URI of the document the edits must apply to.
    :param actions: The code actions offered for a diagnostic, if any.
    :return: The edits to apply, or an empty tuple when no automatic fix can
      be derived.
    """
    if actions is None or len(actions) != 1:
        return ()
    action = actions[0]
    if not isinstance(action, types.CodeAction):
        return ()
    workspace_edit = action.edit
    if workspace_edit is None or action.command is not None:
        return ()

    document_changes = workspace_edit.document_changes
    if document_changes:
        # `document_changes` takes precedence over `changes` when present.
        sole_change = document_changes[0] if len(document_changes) == 1 else None
        if (
            isinstance(sole_change, types.TextDocumentEdit)
            and sole_change.text_document.uri == document_ref
        ):
            return sole_change.edits
        return ()

    per_document = workspace_edit.changes
    if per_document and len(per_document) == 1 and document_ref in per_document:
        return per_document[document_ref]
    return ()