Coverage for src/debputy/linting/lint_util.py: 57%
392 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-03-24 16:38 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-03-24 16:38 +0000
1import collections
2import contextlib
3import dataclasses
4import datetime
5import os
6import time
7from collections import defaultdict, Counter
8from enum import IntEnum
9from functools import lru_cache
10from typing import (
11 List,
12 Optional,
13 Callable,
14 TYPE_CHECKING,
15 Mapping,
16 Sequence,
17 cast,
18 FrozenSet,
19 Self,
20 Set,
21 Dict,
22 Iterable,
23 Awaitable,
24 AsyncIterable,
25)
27import debputy.l10n as l10n
28from debputy.commands.debputy_cmd.output import OutputStylingBase
29from debputy.dh.dh_assistant import (
30 extract_dh_addons_from_control,
31 DhSequencerData,
32 parse_drules_for_addons,
33)
34from debputy.exceptions import PureVirtualPathError, DebputyRuntimeError
35from debputy.filesystem_scan import VirtualPathBase
36from debputy.integration_detection import determine_debputy_integration_mode
37from debputy.l10n import Translations
38from debputy.lsp.diagnostics import (
39 LintSeverity,
40 LINT_SEVERITY2LSP_SEVERITY,
41 DiagnosticData,
42 NATIVELY_LSP_SUPPORTED_SEVERITIES,
43)
44from debputy.lsp.spellchecking import Spellchecker, default_spellchecker
45from debputy.lsp.vendoring._deb822_repro import Deb822FileElement, parse_deb822_file
46from debputy.packages import SourcePackage, BinaryPackage
47from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
48from debputy.plugin.api.spec import DebputyIntegrationMode
49from debputy.util import _warn, T
51if TYPE_CHECKING:
52 import lsprotocol.types as types
53 from debputy.lsp.text_util import LintCapablePositionCodec
54 from debputy.lsp.maint_prefs import (
55 MaintainerPreferenceTable,
56 EffectiveFormattingPreference,
57 )
58 from debputy.lsp.vendoring._deb822_repro.locatable import (
59 Range as TERange,
60 Position as TEPosition,
61 )
62else:
63 import debputy.lsprotocol.types as types
# Signature of an asynchronous linter: it is called with the `LintState` and awaited;
# the awaitable resolves to `None`.
AsyncLinterImpl = Callable[["LintState"], Awaitable[None]]
# Signature of a formatter: it is called with the `LintState` and returns either a
# sequence of text edits or `None`.
FormatterImpl = Callable[["LintState"], Optional[Sequence[types.TextEdit]]]
class AbortTaskError(DebputyRuntimeError):
    """Marker subclass of `DebputyRuntimeError`; it adds no behaviour of its own.

    NOTE(review): presumably raised to abort an in-progress task. The raising and
    catching sites are not part of this module - confirm against the callers.
    """
# Diagnostic sources (authority references) that are accepted verbatim, without a section.
# If you add a new one to this set, remember to mention it in the docs of `LintState.emit_diagnostic`
DIAG_SOURCE_WITHOUT_SECTIONS: FrozenSet[str] = frozenset(("debputy", "dpkg"))

# Diagnostic sources that must be followed by exactly one section (such as "Policy 3.4.1").
# If you add a new one to this set, remember to mention it in the docs of `LintState.emit_diagnostic`
DIAG_SOURCE_WITH_SECTIONS: FrozenSet[str] = frozenset(("Policy", "DevRef"))
def te_position_to_lsp(te_position: "TEPosition") -> types.Position:
    """Convert a deb822 parser position into an LSP position.

    :param te_position: The position to convert; its `line_position` and
      `cursor_position` attributes are read.
    :return: A `types.Position` built from (line_position, cursor_position), in that order.
    """
    line = te_position.line_position
    column = te_position.cursor_position
    return types.Position(line, column)
def te_range_to_lsp(te_range: "TERange") -> types.Range:
    """Convert a deb822 parser range into an LSP range.

    :param te_range: The range to convert; its `start_pos` and `end_pos` are each
      converted via `te_position_to_lsp`.
    :return: A `types.Range` built from the converted (start, end) positions.
    """
    start = te_position_to_lsp(te_range.start_pos)
    end = te_position_to_lsp(te_range.end_pos)
    return types.Range(start, end)
@lru_cache
def _check_diagnostic_source(source: str) -> None:
    """Validate the authority reference (diagnostic source) of a diagnostic.

    A source is valid when it is listed in `DIAG_SOURCE_WITHOUT_SECTIONS`, or when it
    consists of a name from `DIAG_SOURCE_WITH_SECTIONS` followed by a single space and
    exactly one section (for example "Policy 3.4.1").

    Results are memoized with `lru_cache`, so an accepted source is only validated once.

    :param source: The authority reference to validate.
    :raises ValueError: If the source is unknown, or if a sectioned source does not
      have exactly one section.
    """
    if source in DIAG_SOURCE_WITHOUT_SECTIONS:
        return
    # `str.split` always yields at least one element, so `authority` is always bound.
    authority, *sections = source.split(" ")
    if authority not in DIAG_SOURCE_WITH_SECTIONS:
        raise ValueError(
            f'Unknown diagnostic source: "{source}". If you are adding a new source, update lint_util.py'
        )
    if len(sections) != 1:
        raise ValueError(
            f'The diagnostic source "{source}" should have exactly one section associated with it.'
        )
@dataclasses.dataclass(slots=True)
class DebputyMetadata:
    """Information about how `debputy` is used by the package being linted."""

    # The value returned by `determine_debputy_integration_mode`; may be `None`.
    debputy_integration_mode: Optional[DebputyIntegrationMode]

    @classmethod
    def from_data(
        cls,
        source_fields: Mapping[str, str],
        dh_sequencer_data: DhSequencerData,
    ) -> Self:
        """Create the metadata from the source fields and the `dh` sequencer data.

        :param source_fields: The fields of the source stanza (possibly empty).
        :param dh_sequencer_data: The `dh` sequencer data; only its `sequences` are used.
        :return: A new instance holding the determined integration mode.
        """
        return cls(
            determine_debputy_integration_mode(
                source_fields,
                dh_sequencer_data.sequences,
            )
        )
@dataclasses.dataclass(slots=True, frozen=True)
class RelatedDiagnosticInformation:
    """A text range with a message that gives extra context for a diagnostic."""

    # The range (in deb822 parser coordinates) that the message refers to.
    text_range: "TERange"
    # The message to associate with the range.
    message: str
    # The URI of the document that the range belongs to.
    doc_uri: str

    def to_lsp(self, lint_state: "LintState") -> types.DiagnosticRelatedInformation:
        """Convert this item to its LSP representation.

        The range is converted to an LSP range and then to client units using the
        position codec and the lines of the provided `lint_state`.

        :param lint_state: Provides the `position_codec` and `lines` for the conversion.
        :return: The LSP `DiagnosticRelatedInformation` for this item.
        """
        client_range = lint_state.position_codec.range_to_client_units(
            lint_state.lines,
            te_range_to_lsp(self.text_range),
        )
        location = types.Location(self.doc_uri, client_range)
        return types.DiagnosticRelatedInformation(location, self.message)
class LintState:
    """Base interface describing the file being linted and its package context.

    Most properties and `_emit_diagnostic` raise `NotImplementedError` here and are
    expected to be provided by a subclass. The helper methods (`spellchecker`,
    `related_diagnostic_information`, `emit_diagnostic`, ...) are implemented on top
    of those properties.
    """

    @property
    def plugin_feature_set(self) -> PluginProvidedFeatureSet:
        """The plugin features known to the running instance of `debputy`

        This is mostly only relevant when working with `debputy.manifest`
        """
        raise NotImplementedError

    @property
    def doc_uri(self) -> str:
        """The URI for the document being scanned.

        This can be useful for providing related location ranges.
        """
        raise NotImplementedError

    @property
    def source_root(self) -> Optional[VirtualPathBase]:
        """The path to the unpacked source root directory if available

        This is the directory that would contain the `debian/` directory. Note, if you need the
        `debian/` directory, please use `debian_dir` instead. There may be cases where the source
        root is unavailable but the `debian/` directory is not.
        """
        raise NotImplementedError

    @property
    def debian_dir(self) -> Optional[VirtualPathBase]:
        """The path to the `debian/` directory if available"""
        raise NotImplementedError

    @property
    def path(self) -> str:
        """The filename or path of the file being scanned.

        Note this path may or may not be accessible to the running `debputy` instance. Nor is it guaranteed
        that the file on the file system (even if accessible) has correct contents. When doing diagnostics
        for an editor, the editor often requests diagnostics for unsaved changes.
        """
        raise NotImplementedError

    @property
    def content(self) -> str:
        """The full contents of the file being checked"""
        raise NotImplementedError

    @property
    def lines(self) -> List[str]:
        # FIXME: Replace with `Sequence[str]` if possible
        """The contents of the file being checked as a list of lines

        Do **not** change the contents of this list as it may be cached.
        """
        raise NotImplementedError

    @property
    def position_codec(self) -> "LintCapablePositionCodec":
        """The codec used to convert positions and ranges to and from client units."""
        raise NotImplementedError

    @property
    def parsed_deb822_file_content(self) -> Optional[Deb822FileElement]:
        """The contents of the file being checked as a parsed deb822 file

        This can sometimes use a cached version of the parsed file and is therefore preferable to
        parsing the file manually from `content` or `lines`.

        Do **not** change the contents of this as it may be cached.
        """
        raise NotImplementedError

    @property
    def source_package(self) -> Optional[SourcePackage]:
        """The source package (source stanza of `debian/control`).

        Will be `None` if the `debian/control` file cannot be parsed as a deb822 file, or if the
        source stanza is not available.
        """
        raise NotImplementedError

    @property
    def binary_packages(self) -> Optional[Mapping[str, BinaryPackage]]:
        """The binary packages (the Package stanzas of `debian/control`).

        Will be `None` if the `debian/control` file cannot be parsed, or if no Package stanzas are
        available.
        """
        raise NotImplementedError

    @property
    def maint_preference_table(self) -> "MaintainerPreferenceTable":
        # TODO: Remove (unused)
        raise NotImplementedError

    @property
    def effective_preference(self) -> Optional["EffectiveFormattingPreference"]:
        """The effective formatting preference, or `None` (per the annotation)."""
        raise NotImplementedError

    @property
    def debputy_metadata(self) -> DebputyMetadata:
        """Information about `debputy` usage such as which integration mode is being used."""
        source_stanza = self.source_package
        # Without a source stanza, the metadata is derived from an empty field mapping.
        fields = source_stanza.fields if source_stanza else {}
        return DebputyMetadata.from_data(fields, self.dh_sequencer_data)

    @property
    def dh_sequencer_data(self) -> DhSequencerData:
        """Information about the use of the `dh` sequencer

        This includes which sequences are being used and whether the `dh` sequencer is used at all.
        """
        raise NotImplementedError

    def spellchecker(self) -> "Spellchecker":
        """Provide a spellchecker that ignores the package names of the current package.

        The ignored words are the `Source` field of the source package (when present) and
        the names of the binary packages (when available).

        :return: The default spellchecker with the context specific ignored words applied.
        """
        checker = default_spellchecker()
        package_names = set()
        source_package = self.source_package
        binary_packages = self.binary_packages
        if source_package:
            source_name = source_package.fields.get("Source")
            if source_name is not None:
                package_names.add(source_name)
        if binary_packages:
            # Iterating the mapping yields its keys (the binary package names).
            package_names.update(binary_packages)
        return checker.context_ignored_words(package_names)

    async def slow_iter(
        self,
        iterable: Iterable[T],
        *,
        yield_every: int = 100,
    ) -> AsyncIterable[T]:
        """Iterate over `iterable` as an asynchronous generator.

        This implementation yields every value directly and does not use `yield_every`.

        :param iterable: The values to iterate over.
        :param yield_every: Unused by this implementation.
        """
        for item in iterable:
            yield item

    def translation(self, domain: str) -> Translations:
        """Look up the translations for the given domain via `l10n.translation`."""
        return l10n.translation(domain)

    def related_diagnostic_information(
        self,
        text_range: "TERange",
        message: str,
        *,
        doc_uri: Optional[str] = None,
    ) -> RelatedDiagnosticInformation:
        """Provide a related context for the diagnostic

        The related diagnostic information is typically highlighted with the diagnostic. As an example,
        `debputy lint`'s terminal output will display the message and display the selected range after
        the diagnostic itself.

        :param text_range: The text range to highlight.
        :param message: The message to associate with the provided text range.
        :param doc_uri: The URI of the document that the context is from. When omitted, the text range is
          assumed to be from the "current" file (the `doc_uri` attribute), which is also the default file
          for ranges passed to `emit_diagnostic`.
        :return: The related information, which can be passed to `emit_diagnostic`.
        """
        effective_uri = self.doc_uri if doc_uri is None else doc_uri
        return RelatedDiagnosticInformation(
            text_range,
            message,
            doc_uri=effective_uri,
        )

    def emit_diagnostic(
        self,
        text_range: "TERange",
        diagnostic_msg: str,
        severity: LintSeverity,
        authority_reference: str,
        *,
        quickfixes: Optional[List[dict]] = None,
        tags: Optional[List[types.DiagnosticTag]] = None,
        related_information: Optional[List[RelatedDiagnosticInformation]] = None,
        diagnostic_applies_to_another_file: Optional[str] = None,
        enable_non_interactive_auto_fix: bool = True,
    ) -> None:
        """Emit a diagnostic for an issue detected in the current file.

        :param text_range: The text range to highlight in the file.
        :param diagnostic_msg: The message to show to the user for this diagnostic
        :param severity: The severity to associate with the diagnostic.
        :param authority_reference: A reference to the authority / guide that this diagnostic is a violation of.

          Use:
           * "Policy 3.4.1" for Debian Policy Manual section 3.4.1
             (replace the section number with the relevant number for your case)
           * "DevRef 6.2.2" for the Debian Developer Reference section 6.2.2
             (replace the section number with the relevant number for your case)
           * "debputy" for diagnostics without a reference or where `debputy` is the authority.
             (This is also used for cases where `debputy` filters the result. Like with spellchecking
             via hunspell, where `debputy` provides its own ignore list on top)
           * "dpkg" is also accepted as a reference without a section.

          If you need a new reference, feel free to add it to this list.
        :param quickfixes: If provided, this is a list of possible fixes for this problem.
          Use the quickfixes provided in `debputy.lsp.quickfixes` such as `propose_correct_text_quick_fix`.
        :param tags: TODO: Not yet specified (currently uses LSP format).
        :param related_information: Provide additional context to the diagnostic. This can be used to define
          the source of a conflict. As an example, for duplicate definitions, this can be used to show where
          the definitions are.

          Every item should be created via the `related_diagnostic_information` method.
        :param enable_non_interactive_auto_fix: Allow non-interactive auto-fixing (such as via
          `debputy lint --auto-fix`) of this issue. Set to `False` if the check is likely to have false
          positives.
        :param diagnostic_applies_to_another_file: Special-case parameter for flagging invalid file names.
          Leave this one at `None`, unless you know you need it.

          It has non-obvious semantics and is primarily useful for reporting typos of filenames such as
          `debian/install`, etc.
        :raises ValueError: If `authority_reference` is not a valid diagnostic source.
        """
        _check_diagnostic_source(authority_reference)
        lsp_severity = LINT_SEVERITY2LSP_SEVERITY[severity]

        extra_data: DiagnosticData = {
            "enable_non_interactive_auto_fix": enable_non_interactive_auto_fix,
        }
        if severity not in NATIVELY_LSP_SUPPORTED_SEVERITIES:
            # The LSP severity cannot express this one, so carry the original severity along.
            extra_data["lint_severity"] = severity
        if quickfixes:
            extra_data["quickfixes"] = quickfixes
        if diagnostic_applies_to_another_file is not None:
            extra_data["report_for_related_file"] = diagnostic_applies_to_another_file

        client_range = self.position_codec.range_to_client_units(
            self.lines,
            te_range_to_lsp(text_range),
        )

        if related_information:
            related_lsp_format = [info.to_lsp(self) for info in related_information]
        else:
            related_lsp_format = None

        self._emit_diagnostic(
            types.Diagnostic(
                client_range,
                diagnostic_msg,
                severity=lsp_severity,
                source=authority_reference,
                data=extra_data if extra_data else None,
                tags=tags,
                related_information=related_lsp_format,
            )
        )

    def _emit_diagnostic(self, diagnostic: types.Diagnostic) -> None:
        """Receive a fully built diagnostic; must be implemented by subclasses."""
        raise NotImplementedError
@dataclasses.dataclass(slots=True)
class LintStateImpl(LintState):
    """Dataclass based `LintState` where the file data is provided up front.

    The parsed deb822 content and the `dh` sequencer data are computed lazily and cached
    until `clear_cache` is called. Diagnostics are collected while `gather_diagnostics`
    is running.
    """

    plugin_feature_set: PluginProvidedFeatureSet = dataclasses.field(repr=False)
    maint_preference_table: "MaintainerPreferenceTable" = dataclasses.field(repr=False)
    source_root: Optional[VirtualPathBase]
    debian_dir: Optional[VirtualPathBase]
    path: str
    content: str
    lines: List[str]
    source_package: Optional[SourcePackage] = None
    binary_packages: Optional[Mapping[str, BinaryPackage]] = None
    effective_preference: Optional["EffectiveFormattingPreference"] = None
    # The linter run by `gather_diagnostics`; `None` means no linting is possible.
    lint_implementation: Optional["AsyncLinterImpl"] = None
    # Cache for `parsed_deb822_file_content`.
    _parsed_cache: Optional[Deb822FileElement] = None
    # Cache for `dh_sequencer_data`.
    _dh_sequencer_cache: Optional[DhSequencerData] = None
    # The list diagnostics are appended to; only non-`None` while `gather_diagnostics` runs.
    _diagnostics: Optional[List[types.Diagnostic]] = None

    @property
    def doc_uri(self) -> str:
        """A `file://` URI built from `path` joined onto `os.path.curdir`.

        Note that the path is not made absolute; it is only prefixed with the
        current directory marker.
        """
        joined_path = os.path.join(os.path.curdir, self.path)
        return f"file://{joined_path}"

    @property
    def position_codec(self) -> "LintCapablePositionCodec":
        """The shared `LINTER_POSITION_CODEC` instance."""
        return LINTER_POSITION_CODEC

    @property
    def parsed_deb822_file_content(self) -> Optional[Deb822FileElement]:
        """The lines parsed as a deb822 file (parsed on first use and then cached).

        The parser is asked to accept files with error tokens and duplicated fields.
        """
        parsed = self._parsed_cache
        if parsed is None:
            parsed = parse_deb822_file(
                self.lines,
                accept_files_with_error_tokens=True,
                accept_files_with_duplicated_fields=True,
            )
            self._parsed_cache = parsed
        return parsed

    @property
    def dh_sequencer_data(self) -> DhSequencerData:
        """The `dh` sequencer data (computed on first use and then cached).

        The sequences are collected from the `rules` file in `debian_dir` (when it is
        available and can be opened) and from the fields of the source package (when
        available).
        """
        cached = self._dh_sequencer_cache
        if cached is not None:
            return cached

        debian_dir = self.debian_dir
        dh_sequences: Set[str] = set()
        saw_dh = False
        src_pkg = self.source_package
        drules = debian_dir.get("rules") if debian_dir is not None else None
        if drules and drules.is_file:
            try:
                with drules.open() as fd:
                    saw_dh = parse_drules_for_addons(fd, dh_sequences)
            except PureVirtualPathError:
                # The rules file cannot be opened for this kind of path; continue
                # with whatever the control file provides.
                pass
        if src_pkg:
            extract_dh_addons_from_control(src_pkg.fields, dh_sequences)

        cached = DhSequencerData(
            frozenset(dh_sequences),
            saw_dh,
        )
        self._dh_sequencer_cache = cached
        return cached

    async def gather_diagnostics(self) -> List[types.Diagnostic]:
        """Run the lint implementation and return the diagnostics it emitted.

        :return: The diagnostics emitted via `emit_diagnostic` during the run.
        :raises RuntimeError: If a run is already in progress on this instance.
        :raises TypeError: If the instance was created without a lint implementation.
        """
        if self._diagnostics is not None:
            raise RuntimeError(
                "run_diagnostics cannot be run while it is already running"
            )
        linter = self.lint_implementation
        if linter is None:
            raise TypeError(
                "run_diagnostics cannot be run:"
                " LintState was created without a lint implementation (such as for reformat-only)"
            )
        self._diagnostics = diagnostics = []
        try:
            await linter(self)
        finally:
            # Always leave the "running" state. Otherwise, a linter that raises would
            # make every later call fail with the "already running" error.
            self._diagnostics = None
        return diagnostics

    def clear_cache(self) -> None:
        """Drop the cached parsed deb822 content and the cached `dh` sequencer data."""
        self._parsed_cache = None
        self._dh_sequencer_cache = None

    def _emit_diagnostic(self, diagnostic: types.Diagnostic) -> None:
        """Record the diagnostic for the `gather_diagnostics` run in progress.

        :raises TypeError: If called while `gather_diagnostics` is not running.
        """
        diagnostics = self._diagnostics
        if diagnostics is None:
            raise TypeError("Cannot run emit_diagnostic outside of gather_diagnostics")
        diagnostics.append(diagnostic)
class LintDiagnosticResultState(IntEnum):
    """The state of a diagnostic as recorded in a `LintReport`."""

    # The default state used by `LintReport.report_diagnostic`.
    REPORTED = 1
    # Rendered by `TermLintReport` with the "[LSP interactive quickfix]" marker.
    MANUAL_FIXABLE = 2
    # Rendered by `TermLintReport` with the "[Correctable via --auto-fix]" marker.
    AUTO_FIXABLE = 3
    # Rendered by `TermLintReport` with the "auto-fixing" tag and without further context.
    FIXED = 4
@dataclasses.dataclass(slots=True, frozen=True)
class LintDiagnosticResult:
    """A diagnostic together with the bookkeeping computed by `LintReport.report_diagnostic`."""

    # The reported diagnostic (its severity is filled in if it was missing).
    diagnostic: types.Diagnostic
    # The state the diagnostic was reported with.
    result_state: LintDiagnosticResultState
    # Set when the range is broken or the severity was missing; `None` otherwise.
    invalid_marker: Optional[RuntimeError]
    # Whether the diagnostic applies to the file as a whole rather than to a text range.
    is_file_level_diagnostic: bool
    # Whether the range of the diagnostic does not fit within the lines of the file.
    has_broken_range: bool
    # Whether the diagnostic was reported without a severity.
    missing_severity: bool
    # The file that was being checked when the diagnostic was reported.
    discovered_in: str
    # The related file the diagnostic is reported for, if any.
    report_for_related_file: Optional[str]
class LintReport:
    """Collects the diagnostics of a lint run along with per-file timing information.

    Subclasses can override `process_diagnostic` and `finish_report` to present the result.
    """

    def __init__(self) -> None:
        self.diagnostics_count: Counter[types.DiagnosticSeverity] = Counter()
        self.diagnostics_by_file: Mapping[str, List[LintDiagnosticResult]] = (
            defaultdict(list)
        )
        self.number_of_invalid_diagnostics: int = 0
        self.number_of_broken_diagnostics: int = 0
        # The state of the file currently being linted (managed by `line_state`).
        self.lint_state: Optional[LintState] = None
        self.start_timestamp = datetime.datetime.now()
        # Accumulated time per file path, measured with `time.perf_counter`.
        self.durations: Dict[str, float] = collections.defaultdict(lambda: 0.0)
        self._timer = time.perf_counter()

    @contextlib.contextmanager
    def line_state(self, lint_state: LintState) -> Iterable[None]:
        """Make `lint_state` the current state for the duration of the `with` block.

        The time spent inside the block is added to the duration of `lint_state.path`.
        When another state was active, the time spent so far is first added to that
        state's path and the state is restored when the block is left.

        :param lint_state: The state of the file that is about to be linted.
        """
        outer_state = self.lint_state
        if outer_state is not None:
            # Account the time used so far to the outer file before switching.
            self.durations[outer_state.path] += time.perf_counter() - self._timer

        self.lint_state = lint_state

        try:
            self._timer = time.perf_counter()
            yield
        finally:
            finished_at = time.perf_counter()
            self.durations[lint_state.path] += finished_at - self._timer
            self._timer = finished_at
            self.lint_state = outer_state

    def report_diagnostic(
        self,
        diagnostic: types.Diagnostic,
        *,
        result_state: LintDiagnosticResultState = LintDiagnosticResultState.REPORTED,
        in_file: Optional[str] = None,
    ) -> None:
        """Record a diagnostic for the current file and pass it to `process_diagnostic`.

        Must be called while a lint state is active (see `line_state`).

        :param diagnostic: The diagnostic to record. A missing severity is replaced with
          `Warning` (on the diagnostic itself) and counted as an invalid diagnostic.
        :param result_state: The state to record for the diagnostic.
        :param in_file: The file the diagnostic was found in. Defaults to the path of the
          current lint state.
        """
        lint_state = self.lint_state
        assert lint_state is not None
        if in_file is None:
            in_file = lint_state.path
        discovered_in_file = in_file

        severity = diagnostic.severity
        missing_severity = severity is None
        if missing_severity:
            self.number_of_invalid_diagnostics += 1
            severity = diagnostic.severity = types.DiagnosticSeverity.Warning

        lines = lint_state.lines
        start_pos = diagnostic.range.start
        end_pos = diagnostic.range.end

        diag_data = diagnostic.data
        report_for_related_file = (
            diag_data.get("report_for_related_file")
            if isinstance(diag_data, dict)
            else None
        )
        if isinstance(report_for_related_file, str):
            in_file = report_for_related_file
            # Force it to exist in self.durations, since subclasses can use .items() or "foo" in self.durations.
            self.durations.setdefault(in_file, 0)
        else:
            # Missing or not a string; treat it as not provided.
            report_for_related_file = None

        # A diagnostic reported for a related file is always considered file-level.
        is_file_level_diagnostic = (
            report_for_related_file is not None
            or _is_file_level_diagnostic(
                lines,
                start_pos.line,
                start_pos.character,
                end_pos.line,
                end_pos.character,
            )
        )
        range_outside_file = end_pos.line > len(lines) or start_pos.line < 0
        has_broken_range = not is_file_level_diagnostic and range_outside_file

        error_marker: Optional[RuntimeError] = (
            RuntimeError("Registration Marker for invalid diagnostic")
            if has_broken_range or missing_severity
            else None
        )

        diagnostic_result = LintDiagnosticResult(
            diagnostic,
            result_state,
            error_marker,
            is_file_level_diagnostic,
            has_broken_range,
            missing_severity,
            report_for_related_file=report_for_related_file,
            discovered_in=discovered_in_file,
        )

        self.diagnostics_by_file[in_file].append(diagnostic_result)
        self.diagnostics_count[severity] += 1
        self.process_diagnostic(in_file, lint_state, diagnostic_result)

    def process_diagnostic(
        self,
        filename: str,
        lint_state: LintState,
        diagnostic_result: LintDiagnosticResult,
    ) -> None:
        """Subclass hook: called for every diagnostic recorded via `report_diagnostic`."""
        # Subclass hook
        pass

    def finish_report(self) -> None:
        """Subclass hook: does nothing by default."""
        # Subclass hook
        pass
# Maps an LSP severity to the `debputy` severity name; used by `debputy_severity`
# when the diagnostic does not carry an explicit "lint_severity".
_LS2DEBPUTY_SEVERITY: Mapping[types.DiagnosticSeverity, LintSeverity] = {
    types.DiagnosticSeverity.Error: "error",
    types.DiagnosticSeverity.Warning: "warning",
    types.DiagnosticSeverity.Information: "informational",
    types.DiagnosticSeverity.Hint: "pedantic",
}
def _severity_tag_renderer(default_label, color):
    """Create a renderer for the colored severity tag used in the terminal output.

    The renderer is called as `renderer(fo, lint_tag=None)` and returns the result of
    `fo.colored(...)` for `lint_tag` (or `default_label` when `lint_tag` is falsy), using
    `color` as the foreground on a black background in bold.
    """

    def _render(fo, lint_tag=None):
        return fo.colored(
            lint_tag if lint_tag else default_label,
            fg=color,
            bg="black",
            style="bold",
        )

    return _render


# Maps an LSP severity to the renderer of its terminal tag.
_TERM_SEVERITY2TAG = {
    types.DiagnosticSeverity.Error: _severity_tag_renderer("error", "red"),
    types.DiagnosticSeverity.Warning: _severity_tag_renderer("warning", "yellow"),
    types.DiagnosticSeverity.Information: _severity_tag_renderer(
        "informational", "blue"
    ),
    types.DiagnosticSeverity.Hint: _severity_tag_renderer("pedantic", "green"),
}
def debputy_severity(diagnostic: types.Diagnostic) -> LintSeverity:
    """Determine the `debputy` severity of a diagnostic.

    An explicit "lint_severity" in the data of the diagnostic takes precedence. Otherwise,
    the LSP severity is translated via `_LS2DEBPUTY_SEVERITY`, with "warning" as the
    fallback for a missing or unknown severity.

    :param diagnostic: The diagnostic to inspect.
    :return: The `debputy` severity of the diagnostic.
    """
    data = diagnostic.data
    if isinstance(data, dict):
        explicit_severity: Optional[LintSeverity] = cast(
            "LintSeverity", data.get("lint_severity")
        )
        if explicit_severity is not None:
            return explicit_severity

    lsp_severity = diagnostic.severity
    if lsp_severity is None:
        return "warning"
    return _LS2DEBPUTY_SEVERITY.get(lsp_severity, "warning")
class TermLintReport(LintReport):
    """A `LintReport` that prints every diagnostic to the terminal as it is recorded."""

    def __init__(self, fo: OutputStylingBase) -> None:
        """
        :param fo: The output styling used for coloring the output.
        """
        super().__init__()
        self.fo = fo

    def finish_report(self) -> None:
        # Nothing to do for now
        pass

    def process_diagnostic(
        self,
        filename: str,
        lint_state: LintState,
        diagnostic_result: LintDiagnosticResult,
    ) -> None:
        """Print the diagnostic followed by the text it applies to (when applicable).

        The context is omitted for fixed, file-level and broken diagnostics. Related
        information is only shown when it refers to the document of `lint_state`.

        :param filename: The file name to show for the diagnostic.
        :param lint_state: The state of the file that was linted; provides the lines
          used for showing the context.
        :param diagnostic_result: The diagnostic to print. Its severity must be set.
        """
        diagnostic = diagnostic_result.diagnostic
        fo = self.fo
        result_state = diagnostic_result.result_state
        severity = diagnostic.severity
        assert severity is not None
        if result_state != LintDiagnosticResultState.FIXED:
            tag_unresolved = _TERM_SEVERITY2TAG[severity]
            lint_tag: Optional[LintSeverity] = debputy_severity(diagnostic)
            tag = tag_unresolved(fo, lint_tag)
        else:
            tag = fo.colored(
                "auto-fixing",
                fg="magenta",
                bg="black",
                style="bold",
            )

        if diagnostic_result.is_file_level_diagnostic:
            start_line = start_position = end_line = end_position = 0
        else:
            diag_range = diagnostic.range
            start_line = diag_range.start.line
            start_position = diag_range.start.character
            end_line = diag_range.end.line
            end_position = diag_range.end.character

        authority = diagnostic.source
        assert authority is not None
        diag_tags = f" [{authority}]"
        lines = lint_state.lines
        # The width needed for the largest line number, used to align the context.
        line_no_format_width = len(str(len(lines)))

        if result_state == LintDiagnosticResultState.AUTO_FIXABLE:
            diag_tags += "[Correctable via --auto-fix]"
        elif result_state == LintDiagnosticResultState.MANUAL_FIXABLE:
            diag_tags += "[LSP interactive quickfix]"

        code = f"[{diagnostic.code}]: " if diagnostic.code else ""
        msg = f"{code}{diagnostic.message}"

        print(
            f"{tag}: File: {filename}:{start_line+1}:{start_position}:{end_line+1}:{end_position}: {msg}{diag_tags}",
        )
        if diagnostic_result.missing_severity:
            _warn(
                " This warning did not have an explicit severity; Used Warning as a fallback!"
            )
        if result_state == LintDiagnosticResultState.FIXED:
            # If it is fixed, there is no reason to show additional context.
            return
        if diagnostic_result.is_file_level_diagnostic:
            print(" File-level diagnostic")
            return
        if diagnostic_result.has_broken_range:
            _warn(
                "Bug in the underlying linter: The line numbers of the warning does not fit in the file..."
            )
            return
        self._print_range_context(diagnostic.range, lines, line_no_format_width)
        related_info_list = diagnostic.related_information or []
        hint_tag = fo.colored(
            "Related information",
            fg="magenta",
            bg="black",
            style="bold",
        )
        for related_info in related_info_list:
            # Only `lines` of the current document are available, so context from
            # other documents cannot be shown.
            if related_info.location.uri != lint_state.doc_uri:
                continue
            print(f" {hint_tag}: {related_info.message}")
            self._print_range_context(
                related_info.location.range, lines, line_no_format_width
            )

    def _print_range_context(
        self,
        print_range: types.Range,
        lines: List[str],
        line_no_format_width: int,
    ) -> None:
        """Print the lines covered by `print_range` with the range highlighted.

        :param print_range: The range to show.
        :param lines: The lines of the file that the range applies to.
        :param line_no_format_width: The width to align the line numbers to.
        """
        fo = self.fo
        start_line = print_range.start.line
        # Only print lines that exist. A range may end one line past the last line with
        # a non-zero character without being flagged as broken, and the ranges of related
        # information are not validated at all; neither should crash the report.
        first_line = max(start_line, 0)
        stop_line = min(start_line + _lines_to_print(print_range), len(lines))
        for line_no in range(first_line, stop_line):
            line = _highlight_range(fo, lines[line_no], line_no, print_range)
            print(f" {line_no + 1:{line_no_format_width}}: {line}")
class LinterPositionCodec:
    """Position codec for the linter, where client units are plain character counts.

    All conversions are identity operations, except `position_from_client_units`, which
    additionally moves positions beyond the last line to the end of the last line.
    """

    def client_num_units(self, chars: str):
        """Return the number of client units of `chars`, which is its length."""
        return len(chars)

    def position_from_client_units(
        self,
        lines: List[str],
        position: types.Position,
    ) -> types.Position:
        """Convert a client position, keeping it within the lines of the file.

        :param lines: The lines of the file.
        :param position: The position to convert.
        :return: Position (0, 0) when there are no lines, the end of the last line when
          the position is on a line beyond the last one, and `position` itself otherwise.
        """
        if not lines:
            return types.Position(0, 0)
        line_count = len(lines)
        if position.line < line_count:
            return position
        return types.Position(line_count - 1, self.client_num_units(lines[-1]))

    def position_to_client_units(
        self,
        _lines: List[str],
        position: types.Position,
    ) -> types.Position:
        """Return `position` unchanged."""
        return position

    def range_from_client_units(
        self, _lines: List[str], range: types.Range
    ) -> types.Range:
        """Return `range` unchanged."""
        return range

    def range_to_client_units(
        self, _lines: List[str], range: types.Range
    ) -> types.Range:
        """Return `range` unchanged."""
        return range
# Shared codec instance; this is what `LintStateImpl.position_codec` returns.
LINTER_POSITION_CODEC = LinterPositionCodec()
def _lines_to_print(range_: types.Range) -> int:
    """Compute how many lines to print for showing the given range.

    The end line is only included when the range extends into it (that is, when the
    end character is greater than zero).

    :param range_: The range to show.
    :return: The number of lines to print, counted from the start line of the range.
    """
    line_span = range_.end.line - range_.start.line
    includes_end_line = range_.end.character > 0
    return line_span + 1 if includes_end_line else line_span
def _highlight_range(
    fo: OutputStylingBase,
    line: str,
    line_no: int,
    range_: types.Range,
) -> str:
    """Highlight the part of a line that is covered by the given range.

    :param fo: The output styling used for coloring the covered part (bold red).
    :param line: The line to highlight; trailing newline characters are removed.
    :param line_no: The line number of `line`, in the same numbering as the range.
    :param range_: The range to highlight.
    :return: The line without trailing newline characters, with the covered part colored.
    """
    text = line.rstrip("\r\n")
    # The range only restricts the highlight on its first and its last line; any line
    # in between is covered from its start to its end.
    highlight_from = range_.start.character if line_no == range_.start.line else 0
    highlight_to = range_.end.character if line_no == range_.end.line else len(text)

    before = text[0:highlight_from] if line_no == range_.start.line else ""
    after = text[highlight_to:] if line_no == range_.end.line else ""
    highlighted = fo.colored(text[highlight_from:highlight_to], fg="red", style="bold")
    return before + highlighted + after
864def _is_file_level_diagnostic(
865 lines: List[str],
866 start_line: int,
867 start_position: int,
868 end_line: int,
869 end_position: int,
870) -> bool:
871 if start_line != 0 or start_position != 0:
872 return False
873 line_count = len(lines)
874 if end_line + 1 == line_count and end_position == 0:
875 return True
876 return end_line == line_count and line_count and end_position == len(lines[-1])