Coverage for src/debputy/linting/lint_util.py: 56%
425 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-09-07 09:27 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-09-07 09:27 +0000
1import collections
2import contextlib
3import dataclasses
4import datetime
5import os
6import time
7from collections import defaultdict, Counter
8from dataclasses import fields
9from enum import IntEnum
10from functools import lru_cache
11from typing import (
12 List,
13 Optional,
14 Callable,
15 TYPE_CHECKING,
16 Mapping,
17 Sequence,
18 cast,
19 FrozenSet,
20 Self,
21 Set,
22 Dict,
23 Iterable,
24 Awaitable,
25 AsyncIterable,
26 Tuple,
27 TypeVar,
28)
31import debputy.l10n as l10n
32from debputy.commands.debputy_cmd.output import IOBasedOutputStyling
33from debputy.dh.dh_assistant import (
34 extract_dh_addons_from_control,
35 DhSequencerData,
36 parse_drules_for_addons,
37)
38from debputy.exceptions import PureVirtualPathError, DebputyRuntimeError
39from debputy.filesystem_scan import VirtualPathBase
40from debputy.integration_detection import determine_debputy_integration_mode
41from debputy.l10n import Translations
42from debputy.lsp.config.debputy_config import DebputyConfig
43from debputy.lsp.diagnostics import (
44 LintSeverity,
45 LINT_SEVERITY2LSP_SEVERITY,
46 DiagnosticData,
47 NATIVELY_LSP_SUPPORTED_SEVERITIES,
48)
49from debputy.lsp.spellchecking import Spellchecker, default_spellchecker
50from debputy.lsp.vendoring._deb822_repro import Deb822FileElement, parse_deb822_file
51from debputy.packages import SourcePackage, BinaryPackage
52from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
53from debputy.plugin.api.spec import DebputyIntegrationMode
54from debputy.util import _warn, T
55from debputy.lsp.vendoring._deb822_repro.locatable import (
56 Range as TERange,
57 Position as TEPosition,
58 Locatable,
59 START_POSITION,
60)
62if TYPE_CHECKING:
63 import lsprotocol.types as types
64 from debputy.lsp.text_util import LintCapablePositionCodec
65 from debputy.lsp.maint_prefs import (
66 MaintainerPreferenceTable,
67 EffectiveFormattingPreference,
68 )
70else:
71 import debputy.lsprotocol.types as types
# Type variable for helpers that are generic over any `Locatable` subtype.
L = TypeVar("L", bound=Locatable)


# A linter implementation: a callable taking the `LintState` and returning an awaitable of `None`.
AsyncLinterImpl = Callable[["LintState"], Awaitable[None]]
# A formatter implementation: a callable taking the `LintState` and returning either a sequence
# of text edits or `None`.
FormatterImpl = Callable[["LintState"], Optional[Sequence[types.TextEdit]]]
class AbortTaskError(DebputyRuntimeError):
    """Marker exception that adds nothing on top of `DebputyRuntimeError`.

    NOTE(review): presumably raised to abort a running task; the raising code is not in this module.
    """
# Diagnostic sources that are accepted verbatim (no section number attached).
# If you add a new one to this set, remember to mention it in the docs of `LintState.emit_diagnostic`
DIAG_SOURCE_WITHOUT_SECTIONS: FrozenSet[str] = frozenset(("debputy", "dpkg"))

# Diagnostic sources that must be followed by exactly one section (e.g. "Policy 3.4.1").
# If you add a new one to this set, remember to mention it in the docs of `LintState.emit_diagnostic`
DIAG_SOURCE_WITH_SECTIONS: FrozenSet[str] = frozenset(("Policy", "DevRef"))
def te_position_to_lsp(te_position: "TEPosition") -> types.Position:
    """Convert a deb822 `TEPosition` into an LSP `types.Position`.

    :param te_position: The position to convert; its `line_position` and `cursor_position`
      become the first and second argument of `types.Position` respectively.
    :return: The equivalent LSP position.
    """
    line = te_position.line_position
    column = te_position.cursor_position
    return types.Position(line, column)
def te_range_to_lsp(te_range: "TERange") -> types.Range:
    """Convert a deb822 `TERange` into an LSP `types.Range`.

    :param te_range: The range to convert; both end points are converted with
      `te_position_to_lsp`.
    :return: The equivalent LSP range.
    """
    start = te_position_to_lsp(te_range.start_pos)
    end = te_position_to_lsp(te_range.end_pos)
    return types.Range(start, end)
def with_range_in_continuous_parts(
    iterable: Iterable["L"],
    *,
    start_relative_to: "TEPosition" = START_POSITION,
) -> Iterable[Tuple["TERange", "L"]]:
    """Pair each part with its range, assuming the parts follow each other without gaps.

    :param iterable: The parts, in order. Each must provide `size()`.
    :param start_relative_to: The position at which the first part starts.
    :return: A lazily generated stream of `(range, part)` tuples, where each range starts at the
      end position of the previous part's range.
    """
    cursor = start_relative_to
    for element in iterable:
        element_range = element.size().relative_to(cursor)
        yield element_range, element
        # The next part begins exactly where this one ended.
        cursor = element_range.end_pos
@lru_cache
def _check_diagnostic_source(source: str) -> None:
    """Validate the `source` (authority reference) of a diagnostic.

    Accepted values are either a member of `DIAG_SOURCE_WITHOUT_SECTIONS` verbatim, or a member of
    `DIAG_SOURCE_WITH_SECTIONS` followed by a single space and exactly one section.

    :param source: The source string to validate.
    :raises ValueError: If the source is unknown or does not have exactly one section.
    """
    if source in DIAG_SOURCE_WITHOUT_SECTIONS:
        return
    authority, *sections = source.split(" ")
    if authority not in DIAG_SOURCE_WITH_SECTIONS:
        raise ValueError(
            f'Unknown diagnostic source: "{source}". If you are adding a new source, update lint_util.py'
        )
    if len(sections) != 1:
        raise ValueError(
            f'The diagnostic source "{source}" should have exactly one section associated with it.'
        )
@dataclasses.dataclass(slots=True)
class DebputyMetadata:
    """Information about how `debputy` is used by the package."""

    # The value computed by `determine_debputy_integration_mode`; may be `None`.
    debputy_integration_mode: Optional[DebputyIntegrationMode]

    @classmethod
    def from_data(
        cls,
        source_fields: Mapping[str, str],
        dh_sequencer_data: DhSequencerData,
    ) -> Self:
        """Create the metadata from the source fields and the known `dh` sequences.

        :param source_fields: The fields passed on to `determine_debputy_integration_mode`.
        :param dh_sequencer_data: Only its `sequences` attribute is used.
        :return: A new instance holding the determined integration mode.
        """
        return cls(
            determine_debputy_integration_mode(
                source_fields,
                dh_sequencer_data.sequences,
            )
        )
@dataclasses.dataclass(slots=True, frozen=True)
class RelatedDiagnosticInformation:
    """A text range plus message that gives extra context to a diagnostic."""

    # The range the message refers to.
    text_range: "TERange"
    # The message to associate with the range.
    message: str
    # URI of the document containing the range.
    doc_uri: str

    def to_lsp(self, lint_state: "LintState") -> types.DiagnosticRelatedInformation:
        """Convert this instance into its LSP representation.

        :param lint_state: Provides the position codec and the lines used to translate the range
          into client units.
        :return: The LSP related-information object for this instance.
        """
        client_range = lint_state.position_codec.range_to_client_units(
            lint_state.lines,
            te_range_to_lsp(self.text_range),
        )
        location = types.Location(self.doc_uri, client_range)
        return types.DiagnosticRelatedInformation(location, self.message)
@dataclasses.dataclass(frozen=True, slots=True)
class WorkspaceTextEditSupport:
    """Describes which kinds of workspace text edits are supported."""

    # Whether document changes are supported.
    supports_document_changes: bool = False
    # The supported resource operation kinds; empty by default.
    supported_resource_operation_edit_kinds: Sequence[types.ResourceOperationKind] = (
        dataclasses.field(default_factory=list)
    )

    @property
    def supports_versioned_text_edits(self) -> bool:
        """Whether versioned text edits can be used.

        :return: `True` if document changes are supported or at least one resource operation kind
          is supported, otherwise `False`.
        """
        # Coerce explicitly: a bare `a or b` would leak the sequence itself to the caller
        # instead of the `bool` promised by the annotation.
        return bool(
            self.supports_document_changes
            or self.supported_resource_operation_edit_kinds
        )
class LintState:
    """Interface describing the file being checked and the API for emitting diagnostics.

    Most members raise `NotImplementedError` here and are expected to be provided by a concrete
    implementation. `_emit_diagnostic` is the hook that receives the diagnostics created by
    `emit_diagnostic`.
    """

    @property
    def plugin_feature_set(self) -> PluginProvidedFeatureSet:
        """The plugin features known to the running instance of `debputy`

        This is mostly only relevant when working with `debputy.manifest`
        """
        raise NotImplementedError

    @property
    def doc_uri(self) -> str:
        """The URI for the document being scanned.

        This can be useful for providing related location ranges.
        """
        raise NotImplementedError

    @property
    def doc_version(self) -> Optional[int]:
        """The version of the document being scanned, if any."""
        raise NotImplementedError

    @property
    def source_root(self) -> Optional[VirtualPathBase]:
        """The path to the unpacked source root directory if available

        This is the directory that would contain the `debian/` directory. Note, if you need the
        `debian/` directory, please use `debian_dir` instead. There may be cases where the source
        root is unavailable but the `debian/` directory is not.
        """
        raise NotImplementedError

    @property
    def debian_dir(self) -> Optional[VirtualPathBase]:
        """The path to the `debian/` directory if available"""
        raise NotImplementedError

    @property
    def path(self) -> str:
        """The filename or path of the file being scanned.

        Note this path may or may not be accessible to the running `debputy` instance. Nor is it guaranteed
        that the file on the file system (even if accessible) has correct contents. When doing diagnostics
        for an editor, the editor often requests diagnostics for unsaved changes.
        """
        raise NotImplementedError

    @property
    def content(self) -> str:
        """The full contents of the file being checked"""
        raise NotImplementedError

    @property
    def lines(self) -> List[str]:
        # FIXME: Replace with `Sequence[str]` if possible
        """The contents of the file being checked as a list of lines

        Do **not** change the contents of this list as it may be cached.
        """
        raise NotImplementedError

    @property
    def position_codec(self) -> "LintCapablePositionCodec":
        """The codec used to translate positions and ranges to and from client units."""
        raise NotImplementedError

    @property
    def parsed_deb822_file_content(self) -> Optional[Deb822FileElement]:
        """The contents of the file being checked as a parsed deb822 file

        This can sometimes use a cached version of the parsed file and is therefore preferable to
        parsing the file manually from `content` or `lines`.

        Do **not** change the contents of this as it may be cached.
        """
        raise NotImplementedError

    @property
    def source_package(self) -> Optional[SourcePackage]:
        """The source package (source stanza of `debian/control`).

        Will be `None` if the `debian/control` file cannot be parsed as a deb822 file, or if the
        source stanza is not available.
        """
        raise NotImplementedError

    @property
    def binary_packages(self) -> Optional[Mapping[str, BinaryPackage]]:
        """The binary packages (the Package stanzas of `debian/control`).

        Will be `None` if the `debian/control` file cannot be parsed, or if no Package stanzas are
        available.
        """
        raise NotImplementedError

    @property
    def maint_preference_table(self) -> "MaintainerPreferenceTable":
        # TODO: Visible only for tests.
        raise NotImplementedError

    @property
    def effective_preference(self) -> Optional["EffectiveFormattingPreference"]:
        """The effective formatting preference, if any."""
        raise NotImplementedError

    @property
    def debputy_metadata(self) -> DebputyMetadata:
        """Information about `debputy` usage such as which integration mode is being used."""
        source_package = self.source_package
        # Without a source package there are no fields to inspect.
        source_fields = source_package.fields if source_package else {}
        return DebputyMetadata.from_data(source_fields, self.dh_sequencer_data)

    @property
    def dh_sequencer_data(self) -> DhSequencerData:
        """Information about the use of the `dh` sequencer

        This includes which sequences are being used and whether the `dh` sequencer is used at all.
        """
        raise NotImplementedError

    @property
    def workspace_text_edit_support(self) -> WorkspaceTextEditSupport:
        """The kinds of workspace text edits that are supported."""
        raise NotImplementedError

    @property
    def debputy_config(self) -> DebputyConfig:
        """The `debputy` configuration in effect."""
        raise NotImplementedError

    def spellchecker(self) -> "Spellchecker":
        """Provide a spellchecker that ignores the package names of the current package.

        The ignored words are the `Source` field of the source package (when present) and the
        names of all binary packages (when available).
        """
        checker = default_spellchecker()
        package_names = set()
        src_pkg = self.source_package
        bin_pkgs = self.binary_packages
        if src_pkg:
            source_name = src_pkg.fields.get("Source")
            if source_name is not None:
                package_names.add(source_name)
        if bin_pkgs:
            package_names.update(bin_pkgs.keys())
        return checker.context_ignored_words(package_names)

    async def slow_iter(
        self,
        iterable: Iterable[T],
        *,
        yield_every: int = 100,
    ) -> AsyncIterable[T]:
        """Asynchronously iterate over `iterable`.

        :param iterable: The values to iterate over.
        :param yield_every: Not used by this implementation, which passes every value straight
          through.
        """
        for item in iterable:
            yield item

    def translation(self, domain: str) -> Translations:
        """Look up the translations for the given domain via `l10n.translation`."""
        return l10n.translation(domain)

    def related_diagnostic_information(
        self,
        text_range: "TERange",
        message: str,
        *,
        doc_uri: Optional[str] = None,
    ) -> RelatedDiagnosticInformation:
        """Provide a related context for the diagnostic

        The related diagnostic information is typically highlighted with the diagnostic. As an example,
        `debputy lint`'s terminal output will display the message and display the selected range after
        the diagnostic itself.

        :param text_range: The text range to highlight.
        :param message: The message to associate with the provided text range.
        :param doc_uri: The URI of the document that the context is from. When omitted, the text range is
          assumed to be from the "current" file (the `doc_uri` attribute), which is also the default file
          for ranges passed to `emit_diagnostic`.
        :return: The related information object, which can be passed to `emit_diagnostic`.
        """
        effective_uri = self.doc_uri if doc_uri is None else doc_uri
        return RelatedDiagnosticInformation(
            text_range,
            message,
            doc_uri=effective_uri,
        )

    def emit_diagnostic(
        self,
        text_range: "TERange",
        diagnostic_msg: str,
        severity: LintSeverity,
        authority_reference: str,
        *,
        quickfixes: Optional[List[dict]] = None,
        tags: Optional[List[types.DiagnosticTag]] = None,
        related_information: Optional[List[RelatedDiagnosticInformation]] = None,
        diagnostic_applies_to_another_file: Optional[str] = None,
        enable_non_interactive_auto_fix: bool = True,
    ) -> None:
        """Emit a diagnostic for an issue detected in the current file.

        :param text_range: The text range to highlight in the file.
        :param diagnostic_msg: The message to show to the user for this diagnostic
        :param severity: The severity to associate with the diagnostic.
        :param authority_reference: A reference to the authority / guide that this diagnostic is a violation of.

          Use:
           * "Policy 3.4.1" for Debian Policy Manual section 3.4.1
             (replace the section number with the relevant number for your case)
           * "DevRef 6.2.2" for the Debian Developer Reference section 6.2.2
             (replace the section number with the relevant number for your case)
           * "debputy" for diagnostics without a reference or where `debputy` is the authority.
             (This is also used for cases where `debputy` filters the result. Like with spellchecking
             via hunspell, where `debputy` provides its own ignore list on top)
           * "dpkg" is accepted as well (it is listed in `DIAG_SOURCE_WITHOUT_SECTIONS`).

          If you need a new reference, feel free to add it to this list.
        :param quickfixes: If provided, this is a list of possible fixes for this problem.
          Use the quickfixes provided in `debputy.lsp.quickfixes` such as `propose_correct_text_quick_fix`.
        :param tags: TODO: Not yet specified (currently uses LSP format).
        :param related_information: Provide additional context to the diagnostic. This can be used to define
          the source of a conflict. As an example, for duplicate definitions, this can be used to show where
          the definitions are.

          Every item should be created via the `related_diagnostic_information` method.
        :param enable_non_interactive_auto_fix: Allow non-interactive auto-fixing (such as via
          `debputy lint --auto-fix`) of this issue. Set to `False` if the check is likely to have false
          positives.
        :param diagnostic_applies_to_another_file: Special-case parameter for flagging invalid file names.
          Leave this one at `None`, unless you know you need it.

          It has non-obvious semantics and is primarily useful for reporting typos of filenames such as
          `debian/install`, etc.
        :raises ValueError: If `authority_reference` is not a valid diagnostic source.
        :raises NotImplementedError: If any related information refers to another document.
        """
        _check_diagnostic_source(authority_reference)
        lsp_severity = LINT_SEVERITY2LSP_SEVERITY[severity]

        extra_data: DiagnosticData = {
            "enable_non_interactive_auto_fix": enable_non_interactive_auto_fix,
        }
        # Severities without a native LSP counterpart are preserved in the data payload.
        if severity not in NATIVELY_LSP_SUPPORTED_SEVERITIES:
            extra_data["lint_severity"] = severity
        if quickfixes:
            extra_data["quickfixes"] = quickfixes
        if diagnostic_applies_to_another_file is not None:
            extra_data["report_for_related_file"] = diagnostic_applies_to_another_file

        client_range = self.position_codec.range_to_client_units(
            self.lines,
            te_range_to_lsp(text_range),
        )

        related_lsp_format = None
        if related_information:
            # The ranges are translated using the lines of *this* document only.
            if any(info.doc_uri != self.doc_uri for info in related_information):
                raise NotImplementedError("Ranges from another document will be wrong")
            related_lsp_format = [info.to_lsp(self) for info in related_information]

        self._emit_diagnostic(
            types.Diagnostic(
                client_range,
                diagnostic_msg,
                severity=lsp_severity,
                source=authority_reference,
                data=extra_data if extra_data else None,
                tags=tags,
                related_information=related_lsp_format,
            )
        )

    def _emit_diagnostic(self, diagnostic: types.Diagnostic) -> None:
        """Hook receiving every diagnostic created by `emit_diagnostic`."""
        raise NotImplementedError
# The workspace text edit support reported by `LintStateImpl` (document changes enabled).
CLI_WORKSPACE_TEXT_EDIT_SUPPORT = WorkspaceTextEditSupport(
    supports_document_changes=True,
)


@dataclasses.dataclass(slots=True)
class LintStateImpl(LintState):
    """`LintState` implementation backed by plain attributes.

    The parsed deb822 content and the `dh` sequencer data are computed on first access and
    cached until `clear_cache` is called. Diagnostics can only be emitted while
    `gather_diagnostics` is running.
    """

    plugin_feature_set: PluginProvidedFeatureSet = dataclasses.field(repr=False)
    maint_preference_table: "MaintainerPreferenceTable" = dataclasses.field(repr=False)
    source_root: Optional[VirtualPathBase]
    debian_dir: Optional[VirtualPathBase]
    path: str
    content: str
    lines: List[str]
    debputy_config: DebputyConfig
    source_package: Optional[SourcePackage] = None
    binary_packages: Optional[Mapping[str, BinaryPackage]] = None
    effective_preference: Optional["EffectiveFormattingPreference"] = None
    # The linter run by `gather_diagnostics`; `None` means diagnostics cannot be gathered.
    lint_implementation: Optional["AsyncLinterImpl"] = None
    # Cache for `parsed_deb822_file_content`.
    _parsed_cache: Optional[Deb822FileElement] = None
    # Cache for `dh_sequencer_data`.
    _dh_sequencer_cache: Optional[DhSequencerData] = None
    # The list being filled by `gather_diagnostics`; `None` while it is not running.
    _diagnostics: Optional[List[types.Diagnostic]] = None

    @property
    def doc_uri(self) -> str:
        """A `file://` URI built from `path` joined onto `os.path.curdir`."""
        path = self.path
        # NOTE(review): despite its name, this is only `path` joined onto "." - it is not
        # made absolute. Kept as-is since the URI is compared against itself elsewhere.
        abs_path = os.path.join(os.path.curdir, path)
        return f"file://{abs_path}"

    @property
    def doc_version(self) -> Optional[int]:
        """Always `None` for this implementation."""
        return None

    @property
    def position_codec(self) -> "LintCapablePositionCodec":
        """The shared `LINTER_POSITION_CODEC` instance."""
        return LINTER_POSITION_CODEC

    @property
    def parsed_deb822_file_content(self) -> Optional[Deb822FileElement]:
        """The lines parsed as a deb822 file (parsed on first access, then cached).

        The parser is told to accept files with error tokens and duplicated fields.
        """
        cache = self._parsed_cache
        if cache is None:
            cache = parse_deb822_file(
                self.lines,
                accept_files_with_error_tokens=True,
                accept_files_with_duplicated_fields=True,
            )
            self._parsed_cache = cache
        return cache

    @property
    def dh_sequencer_data(self) -> DhSequencerData:
        """The `dh` sequencer data (computed on first access, then cached).

        The sequences are collected from the `rules` file in `debian_dir` (when it exists and can
        be opened) and from the fields of the source package (when available).
        """
        dh_sequencer_cache = self._dh_sequencer_cache
        if dh_sequencer_cache is None:
            debian_dir = self.debian_dir
            dh_sequences: Set[str] = set()
            saw_dh = False
            src_pkg = self.source_package
            drules = debian_dir.get("rules") if debian_dir is not None else None
            if drules and drules.is_file:
                try:
                    with drules.open() as fd:
                        saw_dh = parse_drules_for_addons(fd, dh_sequences)
                except PureVirtualPathError:
                    # Best effort: the path cannot be opened, so only the control data is used.
                    pass
            if src_pkg:
                extract_dh_addons_from_control(src_pkg.fields, dh_sequences)

            dh_sequencer_cache = DhSequencerData(
                frozenset(dh_sequences),
                saw_dh,
            )
            self._dh_sequencer_cache = dh_sequencer_cache
        return dh_sequencer_cache

    @property
    def workspace_text_edit_support(self) -> WorkspaceTextEditSupport:
        """The shared `CLI_WORKSPACE_TEXT_EDIT_SUPPORT` instance."""
        return CLI_WORKSPACE_TEXT_EDIT_SUPPORT

    async def gather_diagnostics(self) -> List[types.Diagnostic]:
        """Run the lint implementation and return the diagnostics it emitted.

        :return: The diagnostics emitted during the run, in emission order.
        :raises RuntimeError: If a run is already in progress.
        :raises TypeError: If the instance was created without a lint implementation.
        """
        if self._diagnostics is not None:
            raise RuntimeError(
                "run_diagnostics cannot be run while it is already running"
            )
        linter = self.lint_implementation
        if linter is None:
            raise TypeError(
                "run_diagnostics cannot be run:"
                " LintState was created without a lint implementation (such as for reformat-only)"
            )
        self._diagnostics = diagnostics = []

        try:
            await linter(self)
        finally:
            # Always leave the "running" state. Otherwise a linter that raised would make
            # every later call fail with the "already running" error above.
            self._diagnostics = None
        return diagnostics

    def clear_cache(self) -> None:
        """Drop the cached parse result and the cached `dh` sequencer data."""
        self._parsed_cache = None
        self._dh_sequencer_cache = None

    def _emit_diagnostic(self, diagnostic: types.Diagnostic) -> None:
        """Record a diagnostic for the current `gather_diagnostics` run.

        :raises TypeError: If called while `gather_diagnostics` is not running.
        """
        diagnostics = self._diagnostics
        if diagnostics is None:
            raise TypeError("Cannot run emit_diagnostic outside of gather_diagnostics")
        diagnostics.append(diagnostic)
class LintDiagnosticResultState(IntEnum):
    """The state recorded for a reported diagnostic."""

    # Default state used by `LintReport.report_diagnostic`.
    REPORTED = 1
    # Shown in the terminal report as "[LSP interactive quickfix]".
    MANUAL_FIXABLE = 2
    # Shown in the terminal report as "[Correctable via --auto-fix]".
    AUTO_FIXABLE = 3
    # Shown in the terminal report with the "auto-fixing" tag.
    FIXED = 4
@dataclasses.dataclass(frozen=True, slots=True)
class LintDiagnosticResult:
    """A diagnostic together with the bookkeeping recorded when it was reported."""

    # The diagnostic itself.
    diagnostic: types.Diagnostic
    # The state the diagnostic was reported with.
    result_state: LintDiagnosticResultState
    # Marker exception created when the diagnostic was registered as invalid, else `None`.
    invalid_marker: Optional[RuntimeError]
    # Whether the diagnostic concerns the file as a whole rather than a range in it.
    is_file_level_diagnostic: bool
    # Whether the range of the diagnostic does not fit in the file.
    has_broken_range: bool
    # Whether the diagnostic was reported without a severity.
    missing_severity: bool
    # The file in which the diagnostic was discovered.
    discovered_in: str
    # The related file the diagnostic is reported for, if any.
    report_for_related_file: Optional[str]
class LintReport:
    """Collects reported diagnostics per file, along with counters and timings.

    Subclasses can override `process_diagnostic` and `finish_report` to present the results.
    """

    def __init__(self) -> None:
        self.diagnostics_count: Counter[types.DiagnosticSeverity] = Counter()
        self.diagnostics_by_file: Mapping[str, List[LintDiagnosticResult]] = (
            defaultdict(list)
        )
        self.number_of_invalid_diagnostics: int = 0
        self.number_of_broken_diagnostics: int = 0
        # The state of the file currently being linted (set via `line_state`).
        self.lint_state: Optional[LintState] = None
        self.start_timestamp = datetime.datetime.now()
        # Accumulated time per file path; unknown paths start at 0.0.
        self.durations: Dict[str, float] = collections.defaultdict(float)
        self._timer = time.perf_counter()

    @contextlib.contextmanager
    def line_state(self, lint_state: LintState) -> Iterable[None]:
        """Make `lint_state` the active state for the duration of the `with` block.

        The time spent inside the block is added to `durations` for the path of `lint_state`.
        When another state was already active, the time elapsed so far is credited to that state
        first, and it becomes the active state again when the block exits.
        """
        outer_state = self.lint_state
        if outer_state is not None:
            self.durations[outer_state.path] += time.perf_counter() - self._timer

        self.lint_state = lint_state

        try:
            self._timer = time.perf_counter()
            yield
        finally:
            finished_at = time.perf_counter()
            self.durations[lint_state.path] += finished_at - self._timer
            # Restart the clock for the state that becomes active again.
            self._timer = finished_at
            self.lint_state = outer_state

    def report_diagnostic(
        self,
        diagnostic: types.Diagnostic,
        *,
        result_state: LintDiagnosticResultState = LintDiagnosticResultState.REPORTED,
        in_file: Optional[str] = None,
    ) -> None:
        """Register a diagnostic and pass it on to `process_diagnostic`.

        Must be called while a lint state is active (see `line_state`).

        :param diagnostic: The diagnostic to register. If it has no severity, it is counted as
          invalid and its severity is set to `Warning`.
        :param result_state: The state to record for the diagnostic.
        :param in_file: The file the diagnostic was discovered in. Defaults to the path of the
          active lint state. If the data of the diagnostic contains a string under
          `report_for_related_file`, the diagnostic is filed under that file instead.
        """
        lint_state = self.lint_state
        assert lint_state is not None
        discovered_in_file = lint_state.path if in_file is None else in_file
        in_file = discovered_in_file

        missing_severity = diagnostic.severity is None
        if missing_severity:
            self.number_of_invalid_diagnostics += 1
            diagnostic.severity = types.DiagnosticSeverity.Warning
        severity = diagnostic.severity

        lines = lint_state.lines
        range_start = diagnostic.range.start
        range_end = diagnostic.range.end

        report_for_related_file: Optional[str] = None
        diag_data = diagnostic.data
        if isinstance(diag_data, dict):
            candidate = diag_data.get("report_for_related_file")
            if isinstance(candidate, str):
                report_for_related_file = candidate
                in_file = candidate
                # Force it to exist in self.durations, since subclasses can use .items() or "foo" in self.durations.
                if in_file not in self.durations:
                    self.durations[in_file] = 0

        if report_for_related_file is None:
            is_file_level_diagnostic = _is_file_level_diagnostic(
                lines,
                range_start.line,
                range_start.character,
                range_end.line,
                range_end.character,
            )
        else:
            is_file_level_diagnostic = True

        has_broken_range = not is_file_level_diagnostic and (
            range_end.line > len(lines) or range_start.line < 0
        )

        error_marker: Optional[RuntimeError] = None
        if has_broken_range or missing_severity:
            error_marker = RuntimeError("Registration Marker for invalid diagnostic")

        diagnostic_result = LintDiagnosticResult(
            diagnostic=diagnostic,
            result_state=result_state,
            invalid_marker=error_marker,
            is_file_level_diagnostic=is_file_level_diagnostic,
            has_broken_range=has_broken_range,
            missing_severity=missing_severity,
            discovered_in=discovered_in_file,
            report_for_related_file=report_for_related_file,
        )

        self.diagnostics_by_file[in_file].append(diagnostic_result)
        self.diagnostics_count[severity] += 1
        self.process_diagnostic(in_file, lint_state, diagnostic_result)

    def process_diagnostic(
        self,
        filename: str,
        lint_state: LintState,
        diagnostic_result: LintDiagnosticResult,
    ) -> None:
        """Subclass hook called for every registered diagnostic; does nothing by default."""

    def finish_report(self) -> None:
        """Subclass hook; does nothing by default."""
# Fallback translation from an LSP severity to the `debputy` lint severity name.
_LS2DEBPUTY_SEVERITY: Mapping[types.DiagnosticSeverity, LintSeverity] = dict(
    (
        (types.DiagnosticSeverity.Error, "error"),
        (types.DiagnosticSeverity.Warning, "warning"),
        (types.DiagnosticSeverity.Information, "informational"),
        (types.DiagnosticSeverity.Hint, "pedantic"),
    )
)
def _term_tag_renderer(default_label, colour):
    """Create a `(fo, lint_tag=None)` callable rendering a bold severity tag on black.

    The returned callable renders `lint_tag` when it is truthy and `default_label` otherwise.
    """

    def _render(fo, lint_tag=None):
        return fo.colored(
            lint_tag if lint_tag else default_label,
            fg=colour,
            bg="black",
            style="bold",
        )

    return _render


# Per LSP severity: the callable that renders the tag shown in the terminal output.
_TERM_SEVERITY2TAG = {
    types.DiagnosticSeverity.Error: _term_tag_renderer("error", "red"),
    types.DiagnosticSeverity.Warning: _term_tag_renderer("warning", "yellow"),
    types.DiagnosticSeverity.Information: _term_tag_renderer("informational", "blue"),
    types.DiagnosticSeverity.Hint: _term_tag_renderer("pedantic", "green"),
}
def debputy_severity(diagnostic: types.Diagnostic) -> LintSeverity:
    """Determine the `debputy` lint severity of a diagnostic.

    :param diagnostic: The diagnostic to inspect.
    :return: The `lint_severity` stored in the data of the diagnostic when present. Otherwise, the
      name mapped from the LSP severity, with `"warning"` as the fallback for a missing or
      unknown severity.
    """
    data = diagnostic.data
    if isinstance(data, dict):
        explicit = cast("LintSeverity", data.get("lint_severity"))
        if explicit is not None:
            return explicit

    lsp_severity = diagnostic.severity
    if lsp_severity is None:
        return "warning"
    return _LS2DEBPUTY_SEVERITY.get(lsp_severity, "warning")
class TermLintReport(LintReport):
    """`LintReport` that prints every diagnostic to the terminal as it is reported."""

    def __init__(self, fo: IOBasedOutputStyling) -> None:
        super().__init__()
        # Used for colouring the output.
        self.fo = fo

    def finish_report(self) -> None:
        # Nothing to do for now
        pass

    def process_diagnostic(
        self,
        filename: str,
        lint_state: LintState,
        diagnostic_result: LintDiagnosticResult,
    ) -> None:
        """Print the diagnostic, followed by the affected lines and any related information.

        The context lines are omitted for fixed diagnostics, file-level diagnostics and
        diagnostics with a broken range. Related information from other documents is skipped.
        """
        diagnostic = diagnostic_result.diagnostic
        fo = self.fo
        result_state = diagnostic_result.result_state
        severity = diagnostic.severity
        assert severity is not None

        if result_state == LintDiagnosticResultState.FIXED:
            tag = fo.colored(
                "auto-fixing",
                fg="magenta",
                bg="black",
                style="bold",
            )
        else:
            render_tag = _TERM_SEVERITY2TAG[severity]
            lint_tag: Optional[LintSeverity] = debputy_severity(diagnostic)
            tag = render_tag(fo, lint_tag)

        if diagnostic_result.is_file_level_diagnostic:
            start_line = start_position = end_line = end_position = 0
        else:
            diag_range = diagnostic.range
            start_line = diag_range.start.line
            start_position = diag_range.start.character
            end_line = diag_range.end.line
            end_position = diag_range.end.character

        authority = diagnostic.source
        assert authority is not None
        lines = lint_state.lines
        line_no_format_width = len(str(len(lines)))

        fixability_notes = {
            LintDiagnosticResultState.AUTO_FIXABLE: "[Correctable via --auto-fix]",
            LintDiagnosticResultState.MANUAL_FIXABLE: "[LSP interactive quickfix]",
        }
        diag_tags = f" [{authority}]" + fixability_notes.get(result_state, "")

        code = f"[{diagnostic.code}]: " if diagnostic.code else ""
        msg = f"{code}{diagnostic.message}"

        print(
            f"{tag}: File: {filename}:{start_line+1}:{start_position}:{end_line+1}:{end_position}: {msg}{diag_tags}",
        )
        if diagnostic_result.missing_severity:
            _warn(
                " This warning did not have an explicit severity; Used Warning as a fallback!"
            )
        if result_state == LintDiagnosticResultState.FIXED:
            # If it is fixed, there is no reason to show additional context.
            return
        if diagnostic_result.is_file_level_diagnostic:
            print(" File-level diagnostic")
            return
        if diagnostic_result.has_broken_range:
            _warn(
                "Bug in the underlying linter: The line numbers of the warning does not fit in the file..."
            )
            return

        self._print_range_context(diagnostic.range, lines, line_no_format_width)
        hint_tag = fo.colored(
            "Related information",
            fg="magenta",
            bg="black",
            style="bold",
        )
        for related_info in diagnostic.related_information or []:
            # Only ranges in the current document can be shown with the lines at hand.
            if related_info.location.uri == lint_state.doc_uri:
                print(f" {hint_tag}: {related_info.message}")
                self._print_range_context(
                    related_info.location.range, lines, line_no_format_width
                )

    def _print_range_context(
        self,
        print_range: types.Range,
        lines: List[str],
        line_no_format_width: int,
    ) -> None:
        """Print the lines covered by `print_range` with the range itself highlighted.

        :param print_range: The range to show.
        :param lines: The lines of the file that the range refers to.
        :param line_no_format_width: The width used for formatting the line numbers.
        """
        fo = self.fo
        first_line = print_range.start.line
        last_line_exclusive = first_line + _lines_to_print(print_range)
        for line_no in range(first_line, last_line_exclusive):
            line = _highlight_range(fo, lines[line_no], line_no, print_range)
            print(f" {line_no + 1:{line_no_format_width}}: {line}")
class LinterPositionCodec:
    """Position codec where client units are plain string indices.

    Ranges and positions are returned as they are, except that `position_from_client_units`
    clamps positions beyond the last line.
    """

    def client_num_units(self, chars: str):
        """The number of client units in `chars`, which is its length."""
        return len(chars)

    def position_from_client_units(
        self,
        lines: List[str],
        position: types.Position,
    ) -> types.Position:
        """Return `position`, clamped to the end of the last line if it lies beyond the lines.

        For an empty list of lines, the position (0, 0) is returned.
        """
        line_count = len(lines)
        if line_count == 0:
            return types.Position(0, 0)
        if position.line < line_count:
            return position
        return types.Position(line_count - 1, self.client_num_units(lines[-1]))

    def position_to_client_units(
        self,
        _lines: List[str],
        position: types.Position,
    ) -> types.Position:
        """Return `position` as is."""
        return position

    def range_from_client_units(
        self, _lines: List[str], range: types.Range
    ) -> types.Range:
        """Return `range` as is."""
        return range

    def range_to_client_units(
        self, _lines: List[str], range: types.Range
    ) -> types.Range:
        """Return `range` as is."""
        return range


# Shared instance; the codec holds no state.
LINTER_POSITION_CODEC = LinterPositionCodec()
def _lines_to_print(range_: types.Range) -> int:
    """Count the lines to show for `range_`.

    The end line is only included when the range covers at least one character of it.
    """
    line_span = range_.end.line - range_.start.line
    return line_span + 1 if range_.end.character > 0 else line_span
def _highlight_range(
    fo: IOBasedOutputStyling,
    line: str,
    line_no: int,
    range_: types.Range,
) -> str:
    """Render `line` with the part covered by `range_` in bold red.

    :param fo: Used for colouring the covered part.
    :param line: The line to render; trailing line endings are removed.
    :param line_no: The index of the line, which is compared with the start and end line of the
      range.
    :param range_: The range to highlight.
    :return: The line without its line ending and with the covered part highlighted.
    """
    text = line.rstrip("\r\n")
    on_first_line = line_no == range_.start.line
    on_last_line = line_no == range_.end.line

    # Lines in the middle of the range are highlighted from start to end.
    start_pos = range_.start.character if on_first_line else 0
    end_pos = range_.end.character if on_last_line else len(text)

    prefix = text[0:start_pos] if on_first_line else ""
    suffix = text[end_pos:] if on_last_line else ""
    marked_part = fo.colored(text[start_pos:end_pos], fg="red", style="bold")

    return prefix + marked_part + suffix
933def _is_file_level_diagnostic(
934 lines: List[str],
935 start_line: int,
936 start_position: int,
937 end_line: int,
938 end_position: int,
939) -> bool:
940 if start_line != 0 or start_position != 0:
941 return False
942 line_count = len(lines)
943 if end_line + 1 == line_count and end_position == 0:
944 return True
945 return end_line == line_count and line_count and end_position == len(lines[-1])