Coverage for src/debputy/linting/lint_util.py: 57%
366 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
1import collections
2import contextlib
3import dataclasses
4import datetime
5import os
6import time
7from collections import defaultdict, Counter
8from enum import IntEnum
9from functools import lru_cache
10from typing import (
11 List,
12 Optional,
13 Callable,
14 TYPE_CHECKING,
15 Mapping,
16 Sequence,
17 cast,
18 FrozenSet,
19 Self,
20 Set,
21 Dict,
22 Iterable,
23)
25import debputy.l10n as l10n
26from debputy.commands.debputy_cmd.output import OutputStylingBase
27from debputy.dh.dh_assistant import (
28 extract_dh_addons_from_control,
29 DhSequencerData,
30 parse_drules_for_addons,
31)
32from debputy.exceptions import PureVirtualPathError
33from debputy.filesystem_scan import VirtualPathBase
34from debputy.integration_detection import determine_debputy_integration_mode
35from debputy.l10n import Translations
36from debputy.lsp.diagnostics import (
37 LintSeverity,
38 LINT_SEVERITY2LSP_SEVERITY,
39 DiagnosticData,
40 NATIVELY_LSP_SUPPORTED_SEVERITIES,
41)
42from debputy.lsp.spellchecking import Spellchecker, default_spellchecker
43from debputy.lsp.vendoring._deb822_repro import Deb822FileElement, parse_deb822_file
44from debputy.packages import SourcePackage, BinaryPackage
45from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
46from debputy.plugin.api.spec import DebputyIntegrationMode
47from debputy.util import _warn
49if TYPE_CHECKING:
50 import lsprotocol.types as types
51 from debputy.lsp.text_util import LintCapablePositionCodec
52 from debputy.lsp.maint_prefs import (
53 MaintainerPreferenceTable,
54 EffectiveFormattingPreference,
55 )
56 from debputy.lsp.vendoring._deb822_repro.locatable import (
57 Range as TERange,
58 Position as TEPosition,
59 )
60else:
61 import debputy.lsprotocol.types as types
# A linter implementation: called with the lint state, returns nothing.
LinterImpl = Callable[
    ["LintState"],
    None,
]
# A formatter implementation: called with the lint state, returns a sequence
# of text edits or `None`.
FormatterImpl = Callable[
    ["LintState"],
    Optional[Sequence[types.TextEdit]],
]
# Diagnostic sources that are accepted verbatim (no section number follows the name).
# If you add a new one to this set, remember to mention it in the docs of `LintState.emit_diagnostic`
DIAG_SOURCE_WITHOUT_SECTIONS: FrozenSet[str] = frozenset(("debputy", "dpkg"))

# Diagnostic sources that must be followed by exactly one space-separated section
# (for example "Policy 3.4.1").
# If you add a new one to this set, remember to mention it in the docs of `LintState.emit_diagnostic`
DIAG_SOURCE_WITH_SECTIONS: FrozenSet[str] = frozenset(("Policy", "DevRef"))
def te_position_to_lsp(te_position: "TEPosition") -> types.Position:
    """Convert a deb822 ("TE") position into an LSP position.

    :param te_position: The position to convert; its `line_position` and
      `cursor_position` are used.
    :return: An LSP `Position` with the line first and the cursor position second.
    """
    line = te_position.line_position
    character = te_position.cursor_position
    return types.Position(line, character)
def te_range_to_lsp(te_range: "TERange") -> types.Range:
    """Convert a deb822 ("TE") range into an LSP range.

    :param te_range: The range to convert; its `start_pos` and `end_pos` are
      each converted with `te_position_to_lsp`.
    :return: An LSP `Range` spanning the converted start and end positions.
    """
    start = te_position_to_lsp(te_range.start_pos)
    end = te_position_to_lsp(te_range.end_pos)
    return types.Range(start, end)
@lru_cache
def _check_diagnostic_source(source: str) -> None:
    """Validate the `source` (authority reference) of a diagnostic.

    A source is valid when it is listed in `DIAG_SOURCE_WITHOUT_SECTIONS`, or when
    it is a name from `DIAG_SOURCE_WITH_SECTIONS` followed by a single space and
    exactly one section token (such as "Policy 3.4.1").

    :param source: The source string to validate.
    :raises ValueError: If the source name is unknown or the number of section
      tokens is not exactly one.
    """
    if source in DIAG_SOURCE_WITHOUT_SECTIONS:
        return
    # `str.split` always yields at least one element, so `name` is always bound.
    name, *sections = source.split(" ")
    if name not in DIAG_SOURCE_WITH_SECTIONS:
        raise ValueError(
            f'Unknown diagnostic source: "{source}". If you are adding a new source, update lint_util.py'
        )
    if len(sections) != 1:
        raise ValueError(
            f'The diagnostic source "{source}" should have exactly one section associated with it.'
        )
@dataclasses.dataclass(slots=True)
class DebputyMetadata:
    """Information about how `debputy` is used by the package."""

    # Result of `determine_debputy_integration_mode`; may be `None`.
    debputy_integration_mode: Optional[DebputyIntegrationMode]

    @classmethod
    def from_data(
        cls,
        source_fields: Mapping[str, str],
        dh_sequencer_data: DhSequencerData,
    ) -> Self:
        """Create the metadata from the source fields and the `dh` sequencer data.

        :param source_fields: The fields of the source stanza.
        :param dh_sequencer_data: Sequencer data; only its `sequences` are used.
        :return: A new instance holding the determined integration mode.
        """
        sequences = dh_sequencer_data.sequences
        mode = determine_debputy_integration_mode(source_fields, sequences)
        return cls(mode)
class LintState:
    """Interface giving linters access to the file being checked and its context.

    Most members raise `NotImplementedError` here and are expected to be provided
    by a concrete implementation.
    """

    @property
    def plugin_feature_set(self) -> PluginProvidedFeatureSet:
        """The plugin features known to the running instance of `debputy`

        This is mostly only relevant when working with `debputy.manifest`
        """
        raise NotImplementedError

    @property
    def doc_uri(self) -> str:
        """The URI for the document being scanned.

        This can be useful for providing related location ranges.
        """
        raise NotImplementedError

    @property
    def source_root(self) -> Optional[VirtualPathBase]:
        """The path to the unpacked source root directory if available

        This is the directory that would contain the `debian/` directory. Note, if you need the
        `debian/` directory, please use `debian_dir` instead. There may be cases where the source
        root is unavailable but the `debian/` directory is not.
        """
        raise NotImplementedError

    @property
    def debian_dir(self) -> Optional[VirtualPathBase]:
        """The path to the `debian/` directory if available"""
        raise NotImplementedError

    @property
    def path(self) -> str:
        """The filename or path of the file being scanned.

        Note this path may or may not be accessible to the running `debputy` instance. Nor is it guaranteed
        that the file on the file system (even if accessible) has correct contents. When doing diagnostics
        for an editor, the editor often requests diagnostics for unsaved changes.
        """
        raise NotImplementedError

    @property
    def content(self) -> str:
        """The full contents of the file being checked"""
        raise NotImplementedError

    @property
    def lines(self) -> List[str]:
        # FIXME: Replace with `Sequence[str]` if possible
        """The contents of the file being checked as a list of lines

        Do **not** change the contents of this list as it may be cached.
        """
        raise NotImplementedError

    @property
    def position_codec(self) -> "LintCapablePositionCodec":
        """The codec used to convert ranges into client units (see `emit_diagnostic`)."""
        raise NotImplementedError

    @property
    def parsed_deb822_file_content(self) -> Optional[Deb822FileElement]:
        """The contents of the file being checked as a parsed deb822 file

        This can sometimes use a cached version of the parsed file and is therefore preferable to
        parsing the file manually from `content` or `lines`.

        Do **not** change the contents of this as it may be cached.
        """
        raise NotImplementedError

    @property
    def source_package(self) -> Optional[SourcePackage]:
        """The source package (source stanza of `debian/control`).

        Will be `None` if the `debian/control` file cannot be parsed as a deb822 file, or if the
        source stanza is not available.
        """
        raise NotImplementedError

    @property
    def binary_packages(self) -> Optional[Mapping[str, BinaryPackage]]:
        """The binary packages (the Package stanzas of `debian/control`).

        Will be `None` if the `debian/control` file cannot be parsed, or if no Package stanzas are
        available.
        """
        raise NotImplementedError

    @property
    def maint_preference_table(self) -> "MaintainerPreferenceTable":
        # TODO: Remove (unused)
        raise NotImplementedError

    @property
    def effective_preference(self) -> Optional["EffectiveFormattingPreference"]:
        raise NotImplementedError

    @property
    def debputy_metadata(self) -> DebputyMetadata:
        """Information about `debputy` usage such as which integration mode is being used."""
        source_package = self.source_package
        # Without a source package there are no fields to inspect.
        fields = source_package.fields if source_package else {}
        return DebputyMetadata.from_data(fields, self.dh_sequencer_data)

    @property
    def dh_sequencer_data(self) -> DhSequencerData:
        """Information about the use of the `dh` sequencer

        This includes which sequences are being used and whether the `dh` sequencer is used at all.
        """
        raise NotImplementedError

    def spellchecker(self) -> "Spellchecker":
        """Provide a spellchecker that ignores the names of the packages.

        The value of the `Source` field (when present) and the names of the binary
        packages are passed to the default spellchecker as ignored words.
        """
        checker = default_spellchecker()
        source_package = self.source_package
        binary_packages = self.binary_packages
        words_to_ignore = set()
        source_name = source_package.fields.get("Source") if source_package else None
        if source_name is not None:
            words_to_ignore.add(source_name)
        if binary_packages:
            # Updating a set from a mapping adds the mapping's keys.
            words_to_ignore.update(binary_packages)
        return checker.context_ignored_words(words_to_ignore)

    def translation(self, domain: str) -> Translations:
        """Look up the translations for `domain` via `debputy.l10n`."""
        return l10n.translation(domain)

    def emit_diagnostic(
        self,
        text_range: "TERange",
        diagnostic_msg: str,
        severity: LintSeverity,
        authority_reference: str,
        *,
        quickfixes: Optional[List[dict]] = None,
        tags: Optional[List[types.DiagnosticTag]] = None,
        related_information: Optional[List[types.DiagnosticRelatedInformation]] = None,
        diagnostic_applies_to_another_file: Optional[str] = None,
        enable_non_interactive_auto_fix: bool = True,
    ) -> None:
        """Emit a diagnostic for an issue detected in the current file.

        :param text_range: The text range to highlight in the file.
        :param diagnostic_msg: The message to show to the user for this diagnostic
        :param severity: The severity to associate with the diagnostic.
        :param authority_reference: A reference to the authority / guide that this diagnostic is a violation of.

          Use:
           * "Policy 3.4.1" for Debian Policy Manual section 3.4.1
           * "DevRef 6.2.2" for the Debian Developer Reference section 6.2.2
           * "debputy" for diagnostics without a reference or where `debputy` is the authority.
             (This is also used for cases where `debputy` filters the result. Like with spellchecking
              via hunspell, where `debputy` provides its own ignore list on top)

          If you need a new reference, feel free to add it to this list.
        :param quickfixes: If provided, this is a list of possible fixes for this problem.
          Use the quickfixes provided in `debputy.lsp.quickfixes` such as `propose_correct_text_quick_fix`.
        :param tags: TODO: Not yet specified (currently uses LSP format).
        :param related_information: TODO: Not yet specified (currently uses LSP format).
        :param enable_non_interactive_auto_fix: Allow non-interactive auto-fixing (such as via
          `debputy lint --auto-fix`) of this issue. Set to `False` if the check is likely to have false
          positives.
        :param diagnostic_applies_to_another_file: Special-case parameter for flagging invalid file names.
          Leave this one at `None`, unless you know you need it.

          It has non-obvious semantics and is primarily useful for reporting typos of filenames such as
          `debian/install`, etc.
        :raises ValueError: If `authority_reference` is rejected by `_check_diagnostic_source`.
        """
        _check_diagnostic_source(authority_reference)
        lsp_severity = LINT_SEVERITY2LSP_SEVERITY[severity]

        extra_data: DiagnosticData = {
            "enable_non_interactive_auto_fix": enable_non_interactive_auto_fix,
        }
        # Severities that LSP cannot express natively travel in the data payload.
        if severity not in NATIVELY_LSP_SUPPORTED_SEVERITIES:
            extra_data["lint_severity"] = severity
        if quickfixes:
            extra_data["quickfixes"] = quickfixes
        if diagnostic_applies_to_another_file is not None:
            extra_data["report_for_related_file"] = diagnostic_applies_to_another_file

        client_range = self.position_codec.range_to_client_units(
            self.lines,
            te_range_to_lsp(text_range),
        )

        self._emit_diagnostic(
            types.Diagnostic(
                client_range,
                diagnostic_msg,
                severity=lsp_severity,
                source=authority_reference,
                data=extra_data or None,
                tags=tags,
                related_information=related_information,
            )
        )

    def _emit_diagnostic(self, diagnostic: types.Diagnostic) -> None:
        # Hook for implementations: receives every diagnostic built by `emit_diagnostic`.
        raise NotImplementedError
@dataclasses.dataclass(slots=True)
class LintStateImpl(LintState):
    """Dataclass-backed `LintState` that collects diagnostics into a list.

    The parsed deb822 content and the `dh` sequencer data are computed on first
    access and cached until `clear_cache` is called.
    """

    plugin_feature_set: PluginProvidedFeatureSet = dataclasses.field(repr=False)
    maint_preference_table: "MaintainerPreferenceTable" = dataclasses.field(repr=False)
    source_root: Optional[VirtualPathBase]
    debian_dir: Optional[VirtualPathBase]
    path: str
    content: str
    lines: List[str]
    source_package: Optional[SourcePackage] = None
    binary_packages: Optional[Mapping[str, BinaryPackage]] = None
    effective_preference: Optional["EffectiveFormattingPreference"] = None
    # The linter run by `gather_diagnostics`; `None` makes `gather_diagnostics` raise.
    lint_implementation: Optional["LinterImpl"] = None
    _parsed_cache: Optional[Deb822FileElement] = None
    _dh_sequencer_cache: Optional[DhSequencerData] = None
    # Only non-`None` while `gather_diagnostics` is running.
    _diagnostics: Optional[List[types.Diagnostic]] = None

    @property
    def doc_uri(self) -> str:
        """A `file://` URI built from `path` joined onto `os.path.curdir`."""
        path = self.path
        # NOTE(review): `os.path.curdir` is the relative marker (".", not the working
        # directory), so `abs_path` is only absolute when `path` itself is. Confirm
        # whether consumers expect an absolute URI.
        abs_path = os.path.join(os.path.curdir, path)
        return f"file://{abs_path}"

    @property
    def position_codec(self) -> "LintCapablePositionCodec":
        return LINTER_POSITION_CODEC

    @property
    def parsed_deb822_file_content(self) -> Optional[Deb822FileElement]:
        """The file parsed as deb822 (tolerating error tokens and duplicated fields); cached."""
        cache = self._parsed_cache
        if cache is None:
            cache = parse_deb822_file(
                self.lines,
                accept_files_with_error_tokens=True,
                accept_files_with_duplicated_fields=True,
            )
            self._parsed_cache = cache
        return cache

    @property
    def dh_sequencer_data(self) -> DhSequencerData:
        """Sequencer data gathered from `debian/rules` and the source package fields; cached."""
        dh_sequencer_cache = self._dh_sequencer_cache
        if dh_sequencer_cache is None:
            debian_dir = self.debian_dir
            dh_sequences: Set[str] = set()
            saw_dh = False
            src_pkg = self.source_package
            drules = debian_dir.get("rules") if debian_dir is not None else None
            if drules and drules.is_file:
                try:
                    with drules.open() as fd:
                        saw_dh = parse_drules_for_addons(fd, dh_sequences)
                except PureVirtualPathError:
                    # Best effort: the rules file is skipped when opening it raises this.
                    pass
            if src_pkg:
                extract_dh_addons_from_control(src_pkg.fields, dh_sequences)

            dh_sequencer_cache = DhSequencerData(
                frozenset(dh_sequences),
                saw_dh,
            )
            self._dh_sequencer_cache = dh_sequencer_cache
        return dh_sequencer_cache

    def gather_diagnostics(self) -> List[types.Diagnostic]:
        """Run the lint implementation and return the diagnostics it emitted.

        :return: The diagnostics emitted during this run, in emission order.
        :raises RuntimeError: If a run is already in progress on this instance.
        :raises TypeError: If no lint implementation was provided.
        """
        if self._diagnostics is not None:
            raise RuntimeError(
                "run_diagnostics cannot be run while it is already running"
            )
        linter = self.lint_implementation
        if linter is None:
            raise TypeError(
                "run_diagnostics cannot be run:"
                " LintState was created without a lint implementation (such as for reformat-only)"
            )
        diagnostics: List[types.Diagnostic] = []
        self._diagnostics = diagnostics
        try:
            linter(self)
        finally:
            # Always leave the "running" state; otherwise a linter that raises would
            # make every later call fail with the "already running" error above.
            self._diagnostics = None
        return diagnostics

    def clear_cache(self) -> None:
        """Drop the cached parsed deb822 content and `dh` sequencer data."""
        self._parsed_cache = None
        self._dh_sequencer_cache = None

    def _emit_diagnostic(self, diagnostic: types.Diagnostic) -> None:
        diagnostics = self._diagnostics
        if diagnostics is None:
            raise TypeError("Cannot run emit_diagnostic outside of gather_diagnostics")
        diagnostics.append(diagnostic)
class LintDiagnosticResultState(IntEnum):
    """How a reported diagnostic was (or can be) handled."""

    # Default state used by `LintReport.report_diagnostic`.
    REPORTED = 1
    # Rendered by the terminal report as "[LSP interactive quickfix]".
    MANUAL_FIXABLE = 2
    # Rendered by the terminal report as "[Correctable via --auto-fix]".
    AUTO_FIXABLE = 3
    # Rendered by the terminal report with the "auto-fixing" tag.
    FIXED = 4
439@dataclasses.dataclass(slots=True, frozen=True)
440class LintDiagnosticResult:
441 diagnostic: types.Diagnostic
442 result_state: LintDiagnosticResultState
443 invalid_marker: Optional[RuntimeError]
444 is_file_level_diagnostic: bool
445 has_broken_range: bool
446 missing_severity: bool
447 discovered_in: str
448 report_for_related_file: Optional[str]
class LintReport:
    """Collects the diagnostics of a lint run together with counters and timings.

    Subclasses hook into `process_diagnostic` and `finish_report` to render the result.
    """

    def __init__(self) -> None:
        # Number of diagnostics per (LSP) severity.
        self.diagnostics_count: Counter[types.DiagnosticSeverity] = Counter()
        self.diagnostics_by_file: Mapping[str, List[LintDiagnosticResult]] = (
            defaultdict(list)
        )
        # Diagnostics that arrived without a severity.
        self.number_of_invalid_diagnostics: int = 0
        # Diagnostics whose range does not fit in the file.
        self.number_of_broken_diagnostics: int = 0
        self.lint_state: Optional[LintState] = None
        self.start_timestamp = datetime.datetime.now()
        # Accumulated time (in `time.perf_counter` seconds) per file path.
        self.durations: Dict[str, float] = collections.defaultdict(lambda: 0.0)
        self._timer = time.perf_counter()

    @contextlib.contextmanager
    def line_state(self, lint_state: LintState) -> Iterable[None]:
        """Make `lint_state` the active state for the duration of the `with` block.

        Time elapsed so far is attributed to the previously active state (if any), the
        time spent inside the block is attributed to `lint_state.path`, and the previous
        state is restored on exit.
        """
        previous = self.lint_state
        if previous is not None:
            path = previous.path
            duration = time.perf_counter() - self._timer
            self.durations[path] += duration

        self.lint_state = lint_state

        try:
            self._timer = time.perf_counter()
            yield
        finally:
            now = time.perf_counter()
            duration = now - self._timer
            self.durations[lint_state.path] += duration
            self._timer = now
            self.lint_state = previous

    def report_diagnostic(
        self,
        diagnostic: types.Diagnostic,
        *,
        result_state: LintDiagnosticResultState = LintDiagnosticResultState.REPORTED,
        in_file: Optional[str] = None,
    ) -> None:
        """Register a diagnostic against the active lint state.

        A missing severity is replaced by `Warning` (on the diagnostic itself) and counted
        in `number_of_invalid_diagnostics`. A range that does not fit the file is counted
        in `number_of_broken_diagnostics`.

        :param diagnostic: The diagnostic to register.
        :param result_state: How the diagnostic was (or can be) handled.
        :param in_file: The file the diagnostic was found in; defaults to the path of
          the active lint state.
        """
        lint_state = self.lint_state
        assert lint_state is not None
        if in_file is None:
            in_file = lint_state.path
        discovered_in_file = in_file
        severity = diagnostic.severity
        missing_severity = False
        error_marker: Optional[RuntimeError] = None
        if severity is None:
            self.number_of_invalid_diagnostics += 1
            severity = types.DiagnosticSeverity.Warning
            diagnostic.severity = severity
            missing_severity = True

        lines = lint_state.lines
        diag_range = diagnostic.range
        start_pos = diag_range.start
        end_pos = diag_range.end
        diag_data = diagnostic.data
        report_for_related_file: Optional[str] = None
        if isinstance(diag_data, dict):
            candidate = diag_data.get("report_for_related_file")
            if isinstance(candidate, str):
                report_for_related_file = candidate
                # The diagnostic is filed under the related file instead.
                in_file = candidate
                # Force it to exist in self.durations, since subclasses can use .items() or "foo" in self.durations.
                if in_file not in self.durations:
                    self.durations[in_file] = 0.0

        if report_for_related_file is not None:
            is_file_level_diagnostic = True
        else:
            is_file_level_diagnostic = _is_file_level_diagnostic(
                lines,
                start_pos.line,
                start_pos.character,
                end_pos.line,
                end_pos.character,
            )
        has_broken_range = not is_file_level_diagnostic and (
            end_pos.line > len(lines) or start_pos.line < 0
        )

        if has_broken_range:
            # Previously this counter was initialised but never updated.
            self.number_of_broken_diagnostics += 1
        if has_broken_range or missing_severity:
            error_marker = RuntimeError("Registration Marker for invalid diagnostic")

        diagnostic_result = LintDiagnosticResult(
            diagnostic,
            result_state,
            error_marker,
            is_file_level_diagnostic,
            has_broken_range,
            missing_severity,
            report_for_related_file=report_for_related_file,
            discovered_in=discovered_in_file,
        )

        self.diagnostics_by_file[in_file].append(diagnostic_result)
        self.diagnostics_count[severity] += 1
        self.process_diagnostic(in_file, lint_state, diagnostic_result)

    def process_diagnostic(
        self,
        filename: str,
        lint_state: LintState,
        diagnostic_result: LintDiagnosticResult,
    ) -> None:
        # Subclass hook
        pass

    def finish_report(self) -> None:
        # Subclass hook
        pass
# Maps each LSP severity to the `debputy` lint severity name used for it.
_LS2DEBPUTY_SEVERITY: Mapping[types.DiagnosticSeverity, LintSeverity] = dict(
    (
        (types.DiagnosticSeverity.Error, "error"),
        (types.DiagnosticSeverity.Warning, "warning"),
        (types.DiagnosticSeverity.Information, "informational"),
        (types.DiagnosticSeverity.Hint, "pedantic"),
    )
)
def _term_severity_tag(default_tag: str, fg: str):
    """Create a tag renderer with the given fallback text and foreground colour.

    The returned callable takes the output styling object and an optional lint tag,
    and returns the tag (or `default_tag` when the lint tag is falsy) in bold on black.
    """

    def _render(fo, lint_tag=None):
        return fo.colored(
            lint_tag if lint_tag else default_tag,
            fg=fg,
            bg="black",
            style="bold",
        )

    return _render


# Renderer for the terminal tag of a diagnostic, keyed by LSP severity.
_TERM_SEVERITY2TAG = {
    types.DiagnosticSeverity.Error: _term_severity_tag("error", "red"),
    types.DiagnosticSeverity.Warning: _term_severity_tag("warning", "yellow"),
    types.DiagnosticSeverity.Information: _term_severity_tag("informational", "blue"),
    types.DiagnosticSeverity.Hint: _term_severity_tag("pedantic", "green"),
}
def debputy_severity(diagnostic: types.Diagnostic) -> LintSeverity:
    """Determine the `debputy` lint severity of a diagnostic.

    :param diagnostic: The diagnostic to inspect.
    :return: The `lint_severity` stored in the diagnostic data when that is a dict with
      a non-`None` value for it. Otherwise the mapping of the LSP severity, with
      "warning" as fallback for a missing or unmapped severity.
    """
    data = diagnostic.data
    if isinstance(data, dict):
        explicit_tag = cast("LintSeverity", data.get("lint_severity"))
        if explicit_tag is not None:
            return explicit_tag
    lsp_severity = diagnostic.severity
    if lsp_severity is None:
        return "warning"
    return _LS2DEBPUTY_SEVERITY.get(lsp_severity, "warning")
class TermLintReport(LintReport):
    """Lint report that prints every diagnostic (via `print`) as it is processed."""

    def __init__(self, fo: OutputStylingBase) -> None:
        """
        :param fo: The output styling object used to colour the output.
        """
        super().__init__()
        self.fo = fo

    def finish_report(self) -> None:
        # Nothing to do for now
        pass

    def process_diagnostic(
        self,
        filename: str,
        lint_state: LintState,
        diagnostic_result: LintDiagnosticResult,
    ) -> None:
        """Print a diagnostic and, where applicable, the highlighted lines it covers.

        :param filename: The file name to show in the output.
        :param lint_state: The lint state providing the lines of the file.
        :param diagnostic_result: The registered diagnostic to print.
        """
        diagnostic = diagnostic_result.diagnostic
        fo = self.fo
        severity = diagnostic.severity
        assert severity is not None
        if diagnostic_result.result_state != LintDiagnosticResultState.FIXED:
            tag_unresolved = _TERM_SEVERITY2TAG[severity]
            lint_tag: Optional[LintSeverity] = debputy_severity(diagnostic)
            tag = tag_unresolved(fo, lint_tag)
        else:
            tag = fo.colored(
                "auto-fixing",
                fg="magenta",
                bg="black",
                style="bold",
            )

        if diagnostic_result.is_file_level_diagnostic:
            start_line = 0
            start_position = 0
            end_line = 0
            end_position = 0
        else:
            start_line = diagnostic.range.start.line
            start_position = diagnostic.range.start.character
            end_line = diagnostic.range.end.line
            end_position = diagnostic.range.end.character

        authority = diagnostic.source
        assert authority is not None
        diag_tags = f" [{authority}]"
        lines = lint_state.lines
        # Width needed to print the largest line number of the file.
        line_no_width = len(str(len(lines)))

        if diagnostic_result.result_state == LintDiagnosticResultState.AUTO_FIXABLE:
            diag_tags += "[Correctable via --auto-fix]"
        elif diagnostic_result.result_state == LintDiagnosticResultState.MANUAL_FIXABLE:
            diag_tags += "[LSP interactive quickfix]"

        code = f"[{diagnostic.code}]: " if diagnostic.code else ""
        msg = f"{code}{diagnostic.message}"

        print(
            f"{tag}: File: {filename}:{start_line+1}:{start_position}:{end_line+1}:{end_position}: {msg}{diag_tags}",
        )
        if diagnostic_result.missing_severity:
            _warn(
                " This warning did not have an explicit severity; Used Warning as a fallback!"
            )
        if diagnostic_result.result_state == LintDiagnosticResultState.FIXED:
            # If it is fixed, there is no reason to show additional context.
            return
        if diagnostic_result.is_file_level_diagnostic:
            print(" File-level diagnostic")
            return
        if diagnostic_result.has_broken_range:
            _warn(
                "Bug in the underlying linter: The line numbers of the warning does not fit in the file..."
            )
            return
        lines_to_print = _lines_to_print(diagnostic.range)
        # Clamp to the file: a range ending on line `len(lines)` with a non-zero
        # character is not flagged as broken, but would otherwise index past the end.
        stop_line = min(start_line + lines_to_print, len(lines))
        for line_no in range(start_line, stop_line):
            line = _highlight_range(fo, lines[line_no], line_no, diagnostic.range)
            print(f" {line_no+1:{line_no_width}}: {line}")
class LinterPositionCodec:
    """Position codec for the linter, which counts client units with `len`.

    Positions and ranges are passed through unchanged, except that
    `position_from_client_units` clamps positions beyond the last line.
    """

    def client_num_units(self, chars: str):
        """Return the number of client units in `chars` (its length)."""
        return len(chars)

    def position_from_client_units(
        self,
        lines: List[str],
        position: types.Position,
    ) -> types.Position:
        """Return `position`, clamped to the end of the last line when it is beyond the file.

        For an empty `lines`, the position (0, 0) is returned.
        """
        line_count = len(lines)
        if not line_count:
            return types.Position(0, 0)
        if position.line < line_count:
            return position
        return types.Position(line_count - 1, self.client_num_units(lines[-1]))

    def position_to_client_units(
        self,
        _lines: List[str],
        position: types.Position,
    ) -> types.Position:
        """Return `position` unchanged."""
        return position

    def range_from_client_units(
        self, _lines: List[str], range: types.Range
    ) -> types.Range:
        """Return `range` unchanged."""
        return range

    def range_to_client_units(
        self, _lines: List[str], range: types.Range
    ) -> types.Range:
        """Return `range` unchanged."""
        return range


# Shared codec instance.
LINTER_POSITION_CODEC = LinterPositionCodec()
def _lines_to_print(range_: types.Range) -> int:
    """Count the lines touched by `range_`.

    The end line is only counted when the range ends after its first character
    (that is, when the end character is greater than zero).
    """
    start, end = range_.start, range_.end
    includes_end_line = end.character > 0
    return end.line - start.line + (1 if includes_end_line else 0)
def _highlight_range(
    fo: OutputStylingBase,
    line: str,
    line_no: int,
    range_: types.Range,
) -> str:
    """Return `line` (without its line ending) with the part inside `range_` highlighted.

    :param fo: The output styling object used to colour the highlighted part.
    :param line: The text of the line, optionally with trailing CR/LF characters.
    :param line_no: The number of the line, comparable to the line numbers in `range_`.
    :param range_: The range to highlight.
    :return: The unstyled prefix, the highlighted part (bold red) and the unstyled suffix.
    """
    text = line.rstrip("\r\n")
    # The highlight starts at column 0 unless this is the first line of the range ...
    begin = range_.start.character if line_no == range_.start.line else 0
    # ... and runs to the end of the line unless this is the last line of the range.
    stop = range_.end.character if line_no == range_.end.line else len(text)
    highlighted = fo.colored(text[begin:stop], fg="red", style="bold")
    return text[:begin] + highlighted + text[stop:]
def _is_file_level_diagnostic(
    lines: List[str],
    start_line: int,
    start_position: int,
    end_line: int,
    end_position: int,
) -> bool:
    """Check whether a range should be treated as a file-level diagnostic.

    The range must start at line 0, character 0, and end either at character 0 of
    the last line, or on line `len(lines)` at a character equal to the length of
    the last line.

    :param lines: The lines of the file.
    :param start_line: The line of the start position.
    :param start_position: The character of the start position.
    :param end_line: The line of the end position.
    :param end_position: The character of the end position.
    :return: `True` if the range matches one of the file-level patterns.
    """
    if start_line != 0 or start_position != 0:
        return False
    line_count = len(lines)
    if end_line + 1 == line_count and end_position == 0:
        return True
    # `line_count > 0` both guards `lines[-1]` and keeps the result a real `bool`;
    # a bare `and line_count` would return the int `0` for an empty file.
    return (
        line_count > 0
        and end_line == line_count
        and end_position == len(lines[-1])
    )