Coverage for src/debputy/linting/lint_util.py: 56%
426 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
1import collections
2import contextlib
3import dataclasses
4import datetime
5import os
6import time
7from collections import defaultdict, Counter
8from dataclasses import fields
9from enum import IntEnum
10from functools import lru_cache
11from typing import (
12 List,
13 Optional,
14 TYPE_CHECKING,
15 cast,
16 FrozenSet,
17 Self,
18 Set,
19 Dict,
20 Tuple,
21 TypeVar,
22)
23from collections.abc import (
24 Callable,
25 Mapping,
26 Sequence,
27 Iterable,
28 Awaitable,
29 AsyncIterable,
30)
33import debputy.l10n as l10n
34from debputy.commands.debputy_cmd.output import IOBasedOutputStyling
35from debputy.dh.dh_assistant import (
36 extract_dh_addons_from_control,
37 DhSequencerData,
38 parse_drules_for_addons,
39)
40from debputy.exceptions import PureVirtualPathError, DebputyRuntimeError
41from debputy.filesystem_scan import VirtualPathBase
42from debputy.integration_detection import determine_debputy_integration_mode
43from debputy.l10n import Translations
44from debputy.lsp.config.debputy_config import DebputyConfig
45from debputy.lsp.diagnostics import (
46 LintSeverity,
47 LINT_SEVERITY2LSP_SEVERITY,
48 DiagnosticData,
49 NATIVELY_LSP_SUPPORTED_SEVERITIES,
50)
51from debputy.lsp.spellchecking import Spellchecker, default_spellchecker
52from debputy.lsp.vendoring._deb822_repro import Deb822FileElement, parse_deb822_file
53from debputy.packages import SourcePackage, BinaryPackage
54from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
55from debputy.plugin.api.spec import DebputyIntegrationMode
56from debputy.util import _warn, T
57from debputy.lsp.vendoring._deb822_repro.locatable import (
58 Range as TERange,
59 Position as TEPosition,
60 Locatable,
61 START_POSITION,
62)
64if TYPE_CHECKING:
65 import lsprotocol.types as types
66 from debputy.lsp.text_util import LintCapablePositionCodec
67 from debputy.lsp.maint_prefs import (
68 MaintainerPreferenceTable,
69 EffectiveFormattingPreference,
70 )
72else:
73 import debputy.lsprotocol.types as types
# Type variable for any `Locatable` element. It lets `with_range_in_continuous_parts`
# hand back the very same element type that it was given.
L = TypeVar("L", bound=Locatable)


# Signature of an asynchronous linter: it receives the `LintState` of the file being
# checked and returns an awaitable that produces no value.
AsyncLinterImpl = Callable[["LintState"], Awaitable[None]]
# Signature of a formatter: it receives the `LintState` and returns either a sequence of
# text edits or `None`.
FormatterImpl = Callable[["LintState"], Optional[Sequence[types.TextEdit]]]
class AbortTaskError(DebputyRuntimeError):
    """Marker exception derived from `DebputyRuntimeError`.

    NOTE(review): judging by the name, this is raised to abort an in-progress task. No raise
    or catch site is visible in this module - confirm against the callers.
    """
# Diagnostic sources (authority references) that are accepted verbatim, i.e. without a
# section number following the name.
# If you add a new one to this set, remember to mention it in the docs of `LintState.emit_diagnostic`
DIAG_SOURCE_WITHOUT_SECTIONS: frozenset[str] = frozenset(("debputy", "dpkg"))
# Diagnostic sources (authority references) that must be followed by exactly one section,
# separated by a single space (for instance "Policy 3.4.1").
# If you add a new one to this set, remember to mention it in the docs of `LintState.emit_diagnostic`
DIAG_SOURCE_WITH_SECTIONS: frozenset[str] = frozenset(("Policy", "DevRef"))
def te_position_to_lsp(te_position: "TEPosition") -> types.Position:
    """Convert a deb822 `TEPosition` into an LSP `Position`.

    :param te_position: The position to convert.
    :return: An LSP position built from the line and cursor position of `te_position`.
    """
    line = te_position.line_position
    character = te_position.cursor_position
    return types.Position(line, character)
def te_range_to_lsp(te_range: "TERange") -> types.Range:
    """Convert a deb822 `TERange` into an LSP `Range`.

    :param te_range: The range to convert.
    :return: An LSP range whose start and end are the converted `start_pos` and `end_pos`.
    """
    lsp_start = te_position_to_lsp(te_range.start_pos)
    lsp_end = te_position_to_lsp(te_range.end_pos)
    return types.Range(lsp_start, lsp_end)
def with_range_in_continuous_parts(
    iterable: Iterable["L"],
    *,
    start_relative_to: "TEPosition" = START_POSITION,
) -> Iterable[tuple["TERange", "L"]]:
    """Pair each part with the range it occupies when the parts are laid out back to back.

    :param iterable: The parts in the order they appear. Each part must provide `size()`.
    :param start_relative_to: The position where the first part begins.
    :return: Tuples of `(range, part)`; every range starts where the previous one ended.
    """
    cursor = start_relative_to
    for element in iterable:
        element_range = element.size().relative_to(cursor)
        yield element_range, element
        # The following part begins exactly where this one stopped.
        cursor = element_range.end_pos
@lru_cache
def _check_diagnostic_source(source: str) -> None:
    """Validate the authority reference ("source") of a diagnostic.

    A source is valid when it is listed in `DIAG_SOURCE_WITHOUT_SECTIONS`, or when it is a
    name from `DIAG_SOURCE_WITH_SECTIONS` followed by a single space and one section.

    :param source: The authority reference to validate.
    :raises ValueError: If the source name is unknown or if it does not have exactly one section.
    """
    if source in DIAG_SOURCE_WITHOUT_SECTIONS:
        return
    # `str.split` always yields at least one element, so the unpacking cannot fail.
    authority, *sections = source.split(" ")
    if authority not in DIAG_SOURCE_WITH_SECTIONS:
        raise ValueError(
            f'Unknown diagnostic source: "{source}". If you are adding a new source, update lint_util.py'
        )
    if len(sections) != 1:
        raise ValueError(
            f'The diagnostic source "{source}" should have exactly one section associated with it.'
        )
@dataclasses.dataclass(slots=True)
class DebputyMetadata:
    """Information about how `debputy` is used by the package."""

    # Whatever `determine_debputy_integration_mode` reported (possibly `None`).
    debputy_integration_mode: DebputyIntegrationMode | None

    @classmethod
    def from_data(
        cls,
        source_fields: Mapping[str, str],
        dh_sequencer_data: DhSequencerData,
    ) -> Self:
        """Derive the metadata from the source stanza fields and the `dh` sequencer data.

        :param source_fields: The fields of the source stanza (may be empty).
        :param dh_sequencer_data: The `dh` sequencer data; only its `sequences` are used.
        :return: A new instance holding the detected integration mode.
        """
        return cls(
            determine_debputy_integration_mode(
                source_fields,
                dh_sequencer_data.sequences,
            )
        )
@dataclasses.dataclass(frozen=True, slots=True)
class RelatedDiagnosticInformation:
    """A text range plus message that provides extra context for a diagnostic."""

    # The range to highlight.
    text_range: "TERange"
    # The message shown together with the range.
    message: str
    # URI of the document that `text_range` refers to.
    doc_uri: str

    def to_lsp(self, lint_state: "LintState") -> types.DiagnosticRelatedInformation:
        """Convert this instance into its LSP counterpart.

        :param lint_state: Provides the position codec and the lines used to translate the
          range into client units.
        :return: The LSP related information pointing at `doc_uri` with the translated range.
        """
        codec = lint_state.position_codec
        client_range = codec.range_to_client_units(
            lint_state.lines,
            te_range_to_lsp(self.text_range),
        )
        location = types.Location(self.doc_uri, client_range)
        return types.DiagnosticRelatedInformation(location, self.message)
182@dataclasses.dataclass(frozen=True, slots=True)
183class WorkspaceTextEditSupport:
184 supports_document_changes: bool = False
185 supported_resource_operation_edit_kinds: Sequence[types.ResourceOperationKind] = (
186 dataclasses.field(default_factory=list)
187 )
189 @property
190 def supports_versioned_text_edits(self) -> bool:
191 return (
192 self.supports_document_changes
193 or self.supported_resource_operation_edit_kinds
194 )
class LintState:
    """Interface giving a linter access to the file being checked and its context.

    Most properties raise `NotImplementedError` here and must be provided by a subclass.
    """

    @property
    def plugin_feature_set(self) -> PluginProvidedFeatureSet:
        """The plugin features known to the running instance of `debputy`

        This is mostly only relevant when working with `debputy.manifest`
        """
        raise NotImplementedError

    @property
    def doc_uri(self) -> str:
        """The URI for the document being scanned.

        This can be useful for providing related location ranges.
        """
        raise NotImplementedError

    @property
    def doc_version(self) -> int | None:
        """The version of the document being scanned, or `None` when there is no version."""
        raise NotImplementedError

    @property
    def source_root(self) -> VirtualPathBase | None:
        """The path to the unpacked source root directory if available

        This is the directory that would contain the `debian/` directory. Note, if you need the
        `debian/` directory, please use `debian_dir` instead. There may be cases where the source
        root is unavailable but the `debian/` directory is not.
        """
        raise NotImplementedError

    @property
    def debian_dir(self) -> VirtualPathBase | None:
        """The path to the `debian/` directory if available"""
        raise NotImplementedError

    @property
    def path(self) -> str:
        """The filename or path of the file being scanned.

        Note this path may or may not be accessible to the running `debputy` instance. Nor is it guaranteed
        that the file on the file system (even if accessible) has correct contents. When doing diagnostics
        for an editor, the editor often requests diagnostics for unsaved changes.
        """
        raise NotImplementedError

    @property
    def content(self) -> str:
        """The full contents of the file being checked"""
        raise NotImplementedError

    @property
    def lines(self) -> list[str]:
        # FIXME: Replace with `Sequence[str]` if possible
        """The contents of the file being checked as a list of lines

        Do **not** change the contents of this list as it may be cached.
        """
        raise NotImplementedError

    @property
    def position_codec(self) -> "LintCapablePositionCodec":
        """The codec used for translating positions and ranges to and from client units."""
        raise NotImplementedError

    @property
    def parsed_deb822_file_content(self) -> Deb822FileElement | None:
        """The contents of the file being checked as a parsed deb822 file

        This can sometimes use a cached version of the parsed file and is therefore preferable to
        parsing the file manually from `content` or `lines`.

        Do **not** change the contents of this as it may be cached.
        """
        raise NotImplementedError

    @property
    def source_package(self) -> SourcePackage | None:
        """The source package (source stanza of `debian/control`).

        Will be `None` if the `debian/control` file cannot be parsed as a deb822 file, or if the
        source stanza is not available.
        """
        raise NotImplementedError

    @property
    def binary_packages(self) -> Mapping[str, BinaryPackage] | None:
        """The binary packages (the Package stanzas of `debian/control`).

        Will be `None` if the `debian/control` file cannot be parsed, or if no Package stanzas are
        available.
        """
        raise NotImplementedError

    @property
    def maint_preference_table(self) -> "MaintainerPreferenceTable":
        # TODO: Visible only for tests.
        raise NotImplementedError

    @property
    def effective_preference(self) -> Optional["EffectiveFormattingPreference"]:
        """The formatting preference in effect for this file, if any."""
        raise NotImplementedError

    @property
    def debputy_metadata(self) -> DebputyMetadata:
        """Information about `debputy` usage such as which integration mode is being used."""
        source_package = self.source_package
        # Without a source package there are no fields to inspect; use an empty mapping.
        source_fields = source_package.fields if source_package else {}
        return DebputyMetadata.from_data(source_fields, self.dh_sequencer_data)

    @property
    def dh_sequencer_data(self) -> DhSequencerData:
        """Information about the use of the `dh` sequencer

        This includes which sequences are being used and whether the `dh` sequencer is used at all.
        """
        raise NotImplementedError

    @property
    def workspace_text_edit_support(self) -> WorkspaceTextEditSupport:
        """Which kinds of workspace text edits the client supports."""
        raise NotImplementedError

    @property
    def debputy_config(self) -> DebputyConfig:
        """The `debputy` configuration that applies to this run."""
        raise NotImplementedError

    def spellchecker(self) -> "Spellchecker":
        """Provide a spellchecker that ignores the package names of this source.

        :return: The default spellchecker, set up to ignore the source package name (the
          `Source` field, when present) and the names of all binary packages.
        """
        base_checker = default_spellchecker()
        package_names = set()
        src = self.source_package
        binaries = self.binary_packages
        if src:
            source_name = src.fields.get("Source")
            if source_name is not None:
                package_names.add(source_name)
        if binaries:
            package_names.update(binaries.keys())
        return base_checker.context_ignored_words(package_names)

    async def slow_iter(
        self,
        iterable: Iterable[T],
        *,
        yield_every: int = 100,
    ) -> AsyncIterable[T]:
        """Iterate over `iterable` asynchronously.

        :param iterable: The values to iterate over.
        :param yield_every: Not used by this implementation.
          NOTE(review): presumably subclasses use it to decide how often to hand back
          control to the event loop - confirm against the overriding implementations.
        :return: An asynchronous iterator producing the values of `iterable` in order.
        """
        for item in iterable:
            yield item

    def translation(self, domain: str) -> Translations:
        """Look up the translations for the given domain.

        :param domain: The translation domain.
        :return: The result of `l10n.translation` for that domain.
        """
        return l10n.translation(domain)

    def related_diagnostic_information(
        self,
        text_range: "TERange",
        message: str,
        *,
        doc_uri: str | None = None,
    ) -> RelatedDiagnosticInformation:
        """Provide a related context for the diagnostic

        The related diagnostic information is typically highlighted with the diagnostic. As an example,
        `debputy lint`'s terminal output will display the message and display the selected range after
        the diagnostic itself.

        :param text_range: The text range to highlight.
        :param message: The message to associate with the provided text range.
        :param doc_uri: The URI of the document that the context is from. When omitted, the text range is
          assumed to be from the "current" file (the `doc_uri` attribute), which is also the default file
          for ranges passed to `emit_diagnostic`.
        :return: The related information, ready to be passed to `emit_diagnostic`.
        """
        effective_uri = self.doc_uri if doc_uri is None else doc_uri
        return RelatedDiagnosticInformation(
            text_range,
            message,
            doc_uri=effective_uri,
        )

    def emit_diagnostic(
        self,
        text_range: "TERange",
        diagnostic_msg: str,
        severity: LintSeverity,
        authority_reference: str,
        *,
        quickfixes: list[dict] | None = None,
        tags: list[types.DiagnosticTag] | None = None,
        related_information: list[RelatedDiagnosticInformation] | None = None,
        diagnostic_applies_to_another_file: str | None = None,
        enable_non_interactive_auto_fix: bool = True,
    ) -> None:
        """Emit a diagnostic for an issue detected in the current file.

        :param text_range: The text range to highlight in the file.
        :param diagnostic_msg: The message to show to the user for this diagnostic
        :param severity: The severity to associate with the diagnostic.
        :param authority_reference: A reference to the authority / guide that this diagnostic is a violation of.

          Use:
           * "Policy 3.4.1" for Debian Policy Manual section 3.4.1
             (replace the section number with the relevant number for your case)
           * "DevRef 6.2.2" for the Debian Developer Reference section 6.2.2
             (replace the section number with the relevant number for your case)
           * "debputy" for diagnostics without a reference or where `debputy` is the authority.
             (This is also used for cases where `debputy` filters the result. Like with spellchecking
             via hunspell, where `debputy` provides its own ignore list on top)
           * "dpkg" is accepted as well; like "debputy", it takes no section number.

          If you need a new reference, feel free to add it to this list.
        :param quickfixes: If provided, this is a list of possible fixes for this problem.
          Use the quickfixes provided in `debputy.lsp.quickfixes` such as `propose_correct_text_quick_fix`.
        :param tags: TODO: Not yet specified (currently uses LSP format).
        :param related_information: Provide additional context to the diagnostic. This can be used to define
          the source of a conflict. As an example, for duplicate definitions, this can be used to show where
          the definitions are.

          Every item should be created via the `related_diagnostic_information` method.
        :param enable_non_interactive_auto_fix: Allow non-interactive auto-fixing (such as via
          `debputy lint --auto-fix`) of this issue. Set to `False` if the check is likely to have false
          positives.
        :param diagnostic_applies_to_another_file: Special-case parameter for flagging invalid file names.
          Leave this one at `None`, unless you know you need it.

          It has non-obvious semantics and is primarily useful for reporting typos of filenames such as
          `debian/install`, etc.
        :raises ValueError: If `authority_reference` is not a known source or lacks its section.
        :raises NotImplementedError: If any related information refers to another document.
        """
        _check_diagnostic_source(authority_reference)
        lsp_severity = LINT_SEVERITY2LSP_SEVERITY[severity]

        diag_data: DiagnosticData = {
            "enable_non_interactive_auto_fix": enable_non_interactive_auto_fix,
        }
        if severity not in NATIVELY_LSP_SUPPORTED_SEVERITIES:
            # LSP cannot express this severity, so the original one travels in the data.
            diag_data["lint_severity"] = severity
        if quickfixes:
            diag_data["quickfixes"] = quickfixes
        if diagnostic_applies_to_another_file is not None:
            diag_data["report_for_related_file"] = diagnostic_applies_to_another_file

        client_range = self.position_codec.range_to_client_units(
            self.lines,
            te_range_to_lsp(text_range),
        )

        if related_information:
            # Ranges are translated with the lines of *this* document, so context from any
            # other document cannot be translated correctly.
            for info in related_information:
                if info.doc_uri != self.doc_uri:
                    raise NotImplementedError(
                        "Ranges from another document will be wrong"
                    )
            related_lsp_format = [info.to_lsp(self) for info in related_information]
        else:
            related_lsp_format = None

        self._emit_diagnostic(
            types.Diagnostic(
                client_range,
                diagnostic_msg,
                severity=lsp_severity,
                source=authority_reference,
                data=diag_data if diag_data else None,
                tags=tags,
                related_information=related_lsp_format,
            )
        )

    def _emit_diagnostic(self, diagnostic: types.Diagnostic) -> None:
        """Receive a fully built diagnostic; must be implemented by subclasses."""
        raise NotImplementedError
# Workspace edit support used by `LintStateImpl`: document changes are supported and no
# resource operation kinds are declared.
CLI_WORKSPACE_TEXT_EDIT_SUPPORT = WorkspaceTextEditSupport(supports_document_changes=True)
@dataclasses.dataclass(slots=True)
class LintStateImpl(LintState):
    """Concrete `LintState` backed by plain attributes.

    It uses `CLI_WORKSPACE_TEXT_EDIT_SUPPORT` and `LINTER_POSITION_CODEC`, has no document
    version, and caches the parsed deb822 file and the `dh` sequencer data until
    `clear_cache` is called.
    """

    plugin_feature_set: PluginProvidedFeatureSet = dataclasses.field(repr=False)
    maint_preference_table: "MaintainerPreferenceTable" = dataclasses.field(repr=False)
    source_root: VirtualPathBase | None
    debian_dir: VirtualPathBase | None
    path: str
    content: str
    lines: list[str]
    debputy_config: DebputyConfig
    source_package: SourcePackage | None = None
    binary_packages: Mapping[str, BinaryPackage] | None = None
    effective_preference: Optional["EffectiveFormattingPreference"] = None
    # The linter run by `gather_diagnostics`; may be `None` (e.g. reformat-only usage).
    lint_implementation: Optional["AsyncLinterImpl"] = None
    _parsed_cache: Deb822FileElement | None = None
    _dh_sequencer_cache: DhSequencerData | None = None
    # Collects diagnostics while `gather_diagnostics` is running; `None` otherwise.
    _diagnostics: list[types.Diagnostic] | None = None

    @property
    def doc_uri(self) -> str:
        """A `file://` URI derived from `path`.

        NOTE(review): despite the local name `abs_path`, joining with `os.path.curdir`
        produces a path relative to "." rather than an absolute one.
        """
        path = self.path
        abs_path = os.path.join(os.path.curdir, path)
        return f"file://{abs_path}"

    @property
    def doc_version(self) -> int | None:
        """Always `None`; this implementation does not track document versions."""
        return None

    @property
    def position_codec(self) -> "LintCapablePositionCodec":
        """The shared `LINTER_POSITION_CODEC` instance."""
        return LINTER_POSITION_CODEC

    @property
    def parsed_deb822_file_content(self) -> Deb822FileElement | None:
        """The file parsed as deb822; parsed on first access and cached afterwards.

        Error tokens and duplicated fields are accepted by the parser.
        """
        cache = self._parsed_cache
        if cache is None:
            cache = parse_deb822_file(
                self.lines,
                accept_files_with_error_tokens=True,
                accept_files_with_duplicated_fields=True,
            )
            self._parsed_cache = cache
        return cache

    @property
    def dh_sequencer_data(self) -> DhSequencerData:
        """The `dh` sequencer data; computed on first access and cached afterwards.

        The sequences are collected from `debian/rules` (when it is a readable file) and from
        the fields of the source package (when available).
        """
        dh_sequencer_cache = self._dh_sequencer_cache
        if dh_sequencer_cache is None:
            debian_dir = self.debian_dir
            dh_sequences: set[str] = set()
            saw_dh = False
            src_pkg = self.source_package
            drules = debian_dir.get("rules") if debian_dir is not None else None
            if drules and drules.is_file:
                try:
                    with drules.open() as fd:
                        saw_dh = parse_drules_for_addons(fd, dh_sequences)
                except PureVirtualPathError:
                    # The rules file cannot be opened; continue without its data.
                    pass
            if src_pkg:
                extract_dh_addons_from_control(src_pkg.fields, dh_sequences)

            dh_sequencer_cache = DhSequencerData(
                frozenset(dh_sequences),
                saw_dh,
            )
            self._dh_sequencer_cache = dh_sequencer_cache
        return dh_sequencer_cache

    @property
    def workspace_text_edit_support(self) -> WorkspaceTextEditSupport:
        """The shared `CLI_WORKSPACE_TEXT_EDIT_SUPPORT` instance."""
        return CLI_WORKSPACE_TEXT_EDIT_SUPPORT

    async def gather_diagnostics(self) -> list[types.Diagnostic]:
        """Run the lint implementation and return the diagnostics it emitted.

        :return: The diagnostics emitted via `emit_diagnostic` during the run.
        :raises RuntimeError: If a run is already in progress.
        :raises TypeError: If the instance was created without a lint implementation.
        """
        if self._diagnostics is not None:
            raise RuntimeError(
                "run_diagnostics cannot be run while it is already running"
            )
        linter = self.lint_implementation
        if linter is None:
            raise TypeError(
                "run_diagnostics cannot be run:"
                " LintState was created without a lint implementation (such as for reformat-only)"
            )
        diagnostics: list[types.Diagnostic] = []
        self._diagnostics = diagnostics
        try:
            await linter(self)
        finally:
            # Always leave the "running" state. Otherwise a linter that raises would make
            # every later call fail with the "already running" error above.
            self._diagnostics = None
        return diagnostics

    def clear_cache(self) -> None:
        """Drop the cached parsed deb822 file and the cached `dh` sequencer data."""
        self._parsed_cache = None
        self._dh_sequencer_cache = None

    def _emit_diagnostic(self, diagnostic: types.Diagnostic) -> None:
        """Record a diagnostic for the run that is currently in progress.

        :raises TypeError: If called while `gather_diagnostics` is not running.
        """
        diagnostics = self._diagnostics
        if diagnostics is None:
            raise TypeError("Cannot run emit_diagnostic outside of gather_diagnostics")
        diagnostics.append(diagnostic)
class LintDiagnosticResultState(IntEnum):
    """How a reported diagnostic was (or can be) dealt with."""

    # The diagnostic is only reported (this is the default of `LintReport.report_diagnostic`).
    REPORTED = 1
    # Shown by the terminal report with the "[LSP interactive quickfix]" tag.
    MANUAL_FIXABLE = 2
    # Shown by the terminal report with the "[Correctable via --auto-fix]" tag.
    AUTO_FIXABLE = 3
    # Shown by the terminal report as "auto-fixing"; no further context is printed.
    FIXED = 4
@dataclasses.dataclass(frozen=True, slots=True)
class LintDiagnosticResult:
    """A diagnostic together with the bookkeeping that `LintReport` derived for it."""

    # The diagnostic itself (in LSP format).
    diagnostic: types.Diagnostic
    # How the diagnostic was (or can be) dealt with.
    result_state: LintDiagnosticResultState
    # Set when the diagnostic has a broken range or lacked a severity; `None` otherwise.
    invalid_marker: RuntimeError | None
    # Whether the diagnostic concerns the file as a whole rather than a part of it.
    is_file_level_diagnostic: bool
    # Whether the range of the diagnostic does not fit in the file.
    has_broken_range: bool
    # Whether the diagnostic came without a severity (a fallback was applied).
    missing_severity: bool
    # The file in which the diagnostic was discovered.
    discovered_in: str
    # The file the diagnostic should be reported for, when that is another file.
    report_for_related_file: str | None
class LintReport:
    """Collects diagnostics per file together with counters and timing information.

    Subclasses can hook into `process_diagnostic` and `finish_report` to present the results.
    """

    def __init__(self) -> None:
        self.diagnostics_count: Counter[types.DiagnosticSeverity] = Counter()
        self.diagnostics_by_file: Mapping[str, list[LintDiagnosticResult]] = (
            defaultdict(list)
        )
        self.number_of_invalid_diagnostics: int = 0
        # NOTE(review): nothing in this class updates this counter, not even for diagnostics
        # flagged with `has_broken_range` - confirm whether callers maintain it.
        self.number_of_broken_diagnostics: int = 0
        self.lint_state: LintState | None = None
        self.start_timestamp = datetime.datetime.now()
        self.durations: dict[str, float] = collections.defaultdict(float)
        self._timer = time.perf_counter()

    @contextlib.contextmanager
    def line_state(self, lint_state: LintState) -> Iterable[None]:
        """Make `lint_state` the active state for the duration of the `with` block.

        The time spent inside the block is added to `durations` under the path of
        `lint_state`. The previously active state (if any) is charged for the time elapsed
        up to this point and is restored when the block ends.

        :param lint_state: The state of the file that is about to be processed.
        """
        outer_state = self.lint_state
        if outer_state is not None:
            elapsed = time.perf_counter() - self._timer
            self.durations[outer_state.path] += elapsed

        self.lint_state = lint_state

        try:
            self._timer = time.perf_counter()
            yield
        finally:
            finished_at = time.perf_counter()
            self.durations[lint_state.path] += finished_at - self._timer
            self._timer = finished_at
            self.lint_state = outer_state

    def report_diagnostic(
        self,
        diagnostic: types.Diagnostic,
        *,
        result_state: LintDiagnosticResultState = LintDiagnosticResultState.REPORTED,
        in_file: str | None = None,
    ) -> None:
        """Register a diagnostic with the report and pass it on to `process_diagnostic`.

        A diagnostic without a severity is counted as invalid and gets the `Warning`
        severity assigned (the diagnostic object is modified).

        :param diagnostic: The diagnostic to register.
        :param result_state: How the diagnostic was (or can be) dealt with.
        :param in_file: The file the diagnostic was found in. Defaults to the path of the
          active lint state.
        """
        lint_state = self.lint_state
        assert lint_state is not None
        discovered_in_file = lint_state.path if in_file is None else in_file
        report_in_file = discovered_in_file

        severity = diagnostic.severity
        missing_severity = severity is None
        if missing_severity:
            self.number_of_invalid_diagnostics += 1
            severity = types.DiagnosticSeverity.Warning
            diagnostic.severity = severity

        lines = lint_state.lines
        start_pos = diagnostic.range.start
        end_pos = diagnostic.range.end

        related_file: str | None = None
        diag_data = diagnostic.data
        if isinstance(diag_data, dict):
            candidate = diag_data.get("report_for_related_file")
            if isinstance(candidate, str):
                related_file = candidate
                report_in_file = candidate
                # Force it to exist in self.durations, since subclasses can use .items() or "foo" in self.durations.
                if report_in_file not in self.durations:
                    self.durations[report_in_file] = 0

        # A diagnostic redirected to another file always counts as file-level.
        is_file_level_diagnostic = related_file is not None or _is_file_level_diagnostic(
            lines,
            start_pos.line,
            start_pos.character,
            end_pos.line,
            end_pos.character,
        )
        has_broken_range = not is_file_level_diagnostic and (
            end_pos.line > len(lines) or start_pos.line < 0
        )

        error_marker: RuntimeError | None = (
            RuntimeError("Registration Marker for invalid diagnostic")
            if has_broken_range or missing_severity
            else None
        )

        diagnostic_result = LintDiagnosticResult(
            diagnostic=diagnostic,
            result_state=result_state,
            invalid_marker=error_marker,
            is_file_level_diagnostic=is_file_level_diagnostic,
            has_broken_range=has_broken_range,
            missing_severity=missing_severity,
            discovered_in=discovered_in_file,
            report_for_related_file=related_file,
        )

        self.diagnostics_by_file[report_in_file].append(diagnostic_result)
        self.diagnostics_count[severity] += 1
        self.process_diagnostic(report_in_file, lint_state, diagnostic_result)

    def process_diagnostic(
        self,
        filename: str,
        lint_state: LintState,
        diagnostic_result: LintDiagnosticResult,
    ) -> None:
        """Subclass hook; called for every diagnostic right after it has been registered."""

    def finish_report(self) -> None:
        """Subclass hook; does nothing by default."""
# Translation from an LSP severity to the `debputy` severity name. `debputy_severity` uses
# it when the diagnostic does not carry an explicit "lint_severity" in its data.
_LS2DEBPUTY_SEVERITY: Mapping[types.DiagnosticSeverity, LintSeverity] = {
    types.DiagnosticSeverity.Error: "error",
    types.DiagnosticSeverity.Warning: "warning",
    types.DiagnosticSeverity.Information: "informational",
    types.DiagnosticSeverity.Hint: "pedantic",
}


def _term_severity_tag(default_tag, foreground):
    """Create a renderer for the coloured severity tag shown in terminal output.

    :param default_tag: The text used when the renderer gets no (or an empty) `lint_tag`.
    :param foreground: The foreground colour of the tag.
    :return: A callable `(fo, lint_tag=None)` that returns the styled tag.
    """

    def _render(fo, lint_tag=None):
        return fo.colored(
            lint_tag if lint_tag else default_tag,
            fg=foreground,
            bg="black",
            style="bold",
        )

    return _render


# Renderers for the severity tag of terminal output, keyed by LSP severity.
_TERM_SEVERITY2TAG = {
    types.DiagnosticSeverity.Error: _term_severity_tag("error", "red"),
    types.DiagnosticSeverity.Warning: _term_severity_tag("warning", "yellow"),
    types.DiagnosticSeverity.Information: _term_severity_tag("informational", "blue"),
    types.DiagnosticSeverity.Hint: _term_severity_tag("pedantic", "green"),
}
def debputy_severity(diagnostic: types.Diagnostic) -> LintSeverity:
    """Determine the `debputy` severity of a diagnostic.

    :param diagnostic: The diagnostic to inspect.
    :return: The "lint_severity" stored in the data of the diagnostic when there is one.
      Otherwise the translation of the LSP severity, with "warning" as the fallback for a
      missing or unknown severity.
    """
    data = diagnostic.data
    if isinstance(data, dict):
        explicit_severity = cast("LintSeverity", data.get("lint_severity"))
        if explicit_severity is not None:
            return explicit_severity

    lsp_severity = diagnostic.severity
    if lsp_severity is None:
        return "warning"
    return _LS2DEBPUTY_SEVERITY.get(lsp_severity, "warning")
class TermLintReport(LintReport):
    """A lint report that prints every diagnostic to standard output as it is reported."""

    def __init__(self, fo: IOBasedOutputStyling) -> None:
        """
        :param fo: The output styling used for colouring tags and highlighted text.
        """
        super().__init__()
        self.fo = fo

    def finish_report(self) -> None:
        # Nothing to do for now
        pass

    def process_diagnostic(
        self,
        filename: str,
        lint_state: LintState,
        diagnostic_result: LintDiagnosticResult,
    ) -> None:
        """Print the diagnostic followed by the affected lines and related information.

        :param filename: The file the diagnostic is reported for.
        :param lint_state: The state of the file being checked; provides the lines to print.
        :param diagnostic_result: The diagnostic with its bookkeeping.
        """
        diagnostic = diagnostic_result.diagnostic
        fo = self.fo
        severity = diagnostic.severity
        assert severity is not None
        if diagnostic_result.result_state != LintDiagnosticResultState.FIXED:
            tag_unresolved = _TERM_SEVERITY2TAG[severity]
            lint_tag: LintSeverity | None = debputy_severity(diagnostic)
            tag = tag_unresolved(fo, lint_tag)
        else:
            tag = fo.colored(
                "auto-fixing",
                fg="magenta",
                bg="black",
                style="bold",
            )

        if diagnostic_result.is_file_level_diagnostic:
            # File-level diagnostics are always shown as pointing at the start of the file.
            start_line = 0
            start_position = 0
            end_line = 0
            end_position = 0
        else:
            start_line = diagnostic.range.start.line
            start_position = diagnostic.range.start.character
            end_line = diagnostic.range.end.line
            end_position = diagnostic.range.end.character

        authority = diagnostic.source
        assert authority is not None
        diag_tags = f" [{authority}]"
        lines = lint_state.lines
        # Width of the largest line number, so that the printed line numbers line up.
        line_no_format_width = len(str(len(lines)))

        if diagnostic_result.result_state == LintDiagnosticResultState.AUTO_FIXABLE:
            diag_tags += "[Correctable via --auto-fix]"
        elif diagnostic_result.result_state == LintDiagnosticResultState.MANUAL_FIXABLE:
            diag_tags += "[LSP interactive quickfix]"

        code = f"[{diagnostic.code}]: " if diagnostic.code else ""
        msg = f"{code}{diagnostic.message}"

        print(
            f"{tag}: File: {filename}:{start_line+1}:{start_position}:{end_line+1}:{end_position}: {msg}{diag_tags}",
        )
        if diagnostic_result.missing_severity:
            _warn(
                " This warning did not have an explicit severity; Used Warning as a fallback!"
            )
        if diagnostic_result.result_state == LintDiagnosticResultState.FIXED:
            # If it is fixed, there is no reason to show additional context.
            return
        if diagnostic_result.is_file_level_diagnostic:
            print(" File-level diagnostic")
            return
        if diagnostic_result.has_broken_range:
            _warn(
                "Bug in the underlying linter: The line numbers of the warning does not fit in the file..."
            )
            return
        self._print_range_context(diagnostic.range, lines, line_no_format_width)
        related_info_list = diagnostic.related_information or []
        hint_tag = fo.colored(
            "Related information",
            fg="magenta",
            bg="black",
            style="bold",
        )
        for related_info in related_info_list:
            if related_info.location.uri != lint_state.doc_uri:
                # Only the lines of the current document are available for printing.
                continue
            print(f" {hint_tag}: {related_info.message}")
            self._print_range_context(
                related_info.location.range, lines, line_no_format_width
            )

    def _print_range_context(
        self,
        print_range: types.Range,
        lines: list[str],
        line_no_format_width: int,
    ) -> None:
        """Print the lines covered by `print_range` with the range itself highlighted.

        Only lines that exist in `lines` are printed; the part of a range that lies outside
        the file is skipped.

        :param print_range: The range to show.
        :param lines: The lines of the file.
        :param line_no_format_width: The width used for the line number column.
        """
        lines_to_print = _lines_to_print(print_range)
        fo = self.fo
        start_line = print_range.start.line
        # Clamp the window to the file. A range may end on the line just after the last one
        # with a non-zero character, and the ranges of related information are never
        # validated; indexing `lines` unclamped would then raise `IndexError` (or wrap
        # around for a negative line number).
        first_line = max(start_line, 0)
        stop_line = min(start_line + lines_to_print, len(lines))
        for line_no in range(first_line, stop_line):
            line = _highlight_range(fo, lines[line_no], line_no, print_range)
            print(f" {line_no + 1:{line_no_format_width}}: {line}")
class LinterPositionCodec:
    """Position codec in which a client unit is one character of a Python string.

    All conversions to client units are the identity. Converting a position from client
    units only makes sure that the line exists.
    """

    def client_num_units(self, chars: str) -> int:
        """Return the number of client units in `chars`, which is its length."""
        return len(chars)

    def position_from_client_units(
        self,
        lines: list[str],
        position: types.Position,
    ) -> types.Position:
        """Convert a client position, moving it into the file if its line is past the end.

        :param lines: The lines of the document.
        :param position: The position in client units.
        :return: `(0, 0)` for an empty document, the end of the last line when the line of
          `position` is beyond the document, and `position` itself otherwise.
        """
        if not lines:
            return types.Position(0, 0)
        line_count = len(lines)
        if position.line < line_count:
            return position
        return types.Position(line_count - 1, self.client_num_units(lines[-1]))

    def position_to_client_units(
        self,
        _lines: list[str],
        position: types.Position,
    ) -> types.Position:
        """Return `position` unchanged."""
        return position

    def range_from_client_units(
        self, _lines: list[str], range: types.Range
    ) -> types.Range:
        """Return `range` unchanged."""
        return range

    def range_to_client_units(
        self, _lines: list[str], range: types.Range
    ) -> types.Range:
        """Return `range` unchanged."""
        return range


# Shared codec instance; `LintStateImpl.position_codec` returns it.
LINTER_POSITION_CODEC = LinterPositionCodec()
904def _lines_to_print(range_: types.Range) -> int:
905 count = range_.end.line - range_.start.line
906 if range_.end.character > 0:
907 count += 1
908 return count
911def _highlight_range(
912 fo: IOBasedOutputStyling,
913 line: str,
914 line_no: int,
915 range_: types.Range,
916) -> str:
917 line_wo_nl = line.rstrip("\r\n")
918 start_pos = 0
919 prefix = ""
920 suffix = ""
921 if line_no == range_.start.line:
922 start_pos = range_.start.character
923 prefix = line_wo_nl[0:start_pos]
924 if line_no == range_.end.line:
925 end_pos = range_.end.character
926 suffix = line_wo_nl[end_pos:]
927 else:
928 end_pos = len(line_wo_nl)
930 marked_part = fo.colored(line_wo_nl[start_pos:end_pos], fg="red", style="bold")
932 return prefix + marked_part + suffix
935def _is_file_level_diagnostic(
936 lines: list[str],
937 start_line: int,
938 start_position: int,
939 end_line: int,
940 end_position: int,
941) -> bool:
942 if start_line != 0 or start_position != 0:
943 return False
944 line_count = len(lines)
945 if end_line + 1 == line_count and end_position == 0:
946 return True
947 return end_line == line_count and line_count and end_position == len(lines[-1])