Coverage for src/debputy/linting/lint_util.py: 57%

366 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-01-27 13:59 +0000

1import collections 

2import contextlib 

3import dataclasses 

4import datetime 

5import os 

6import time 

7from collections import defaultdict, Counter 

8from enum import IntEnum 

9from functools import lru_cache 

10from typing import ( 

11 List, 

12 Optional, 

13 Callable, 

14 TYPE_CHECKING, 

15 Mapping, 

16 Sequence, 

17 cast, 

18 FrozenSet, 

19 Self, 

20 Set, 

21 Dict, 

22 Iterable, 

23) 

24 

25import debputy.l10n as l10n 

26from debputy.commands.debputy_cmd.output import OutputStylingBase 

27from debputy.dh.dh_assistant import ( 

28 extract_dh_addons_from_control, 

29 DhSequencerData, 

30 parse_drules_for_addons, 

31) 

32from debputy.exceptions import PureVirtualPathError 

33from debputy.filesystem_scan import VirtualPathBase 

34from debputy.integration_detection import determine_debputy_integration_mode 

35from debputy.l10n import Translations 

36from debputy.lsp.diagnostics import ( 

37 LintSeverity, 

38 LINT_SEVERITY2LSP_SEVERITY, 

39 DiagnosticData, 

40 NATIVELY_LSP_SUPPORTED_SEVERITIES, 

41) 

42from debputy.lsp.spellchecking import Spellchecker, default_spellchecker 

43from debputy.lsp.vendoring._deb822_repro import Deb822FileElement, parse_deb822_file 

44from debputy.packages import SourcePackage, BinaryPackage 

45from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

46from debputy.plugin.api.spec import DebputyIntegrationMode 

47from debputy.util import _warn 

48 

49if TYPE_CHECKING: 

50 import lsprotocol.types as types 

51 from debputy.lsp.text_util import LintCapablePositionCodec 

52 from debputy.lsp.maint_prefs import ( 

53 MaintainerPreferenceTable, 

54 EffectiveFormattingPreference, 

55 ) 

56 from debputy.lsp.vendoring._deb822_repro.locatable import ( 

57 Range as TERange, 

58 Position as TEPosition, 

59 ) 

60else: 

61 import debputy.lsprotocol.types as types 

62 

63 

# Callable signatures used for linters and formatters; both take a `LintState`.
LinterImpl = Callable[["LintState"], None]
FormatterImpl = Callable[["LintState"], Optional[Sequence[types.TextEdit]]]


# Diagnostic sources that are accepted verbatim (no section number attached).
# If you add a new one to this set, remember to mention it in the docs of `LintState.emit_diagnostic`
DIAG_SOURCE_WITHOUT_SECTIONS: FrozenSet[str] = frozenset(("debputy", "dpkg"))

# Diagnostic sources that must be followed by exactly one section (e.g. "Policy 3.4.1").
# If you add a new one to this set, remember to mention it in the docs of `LintState.emit_diagnostic`
DIAG_SOURCE_WITH_SECTIONS: FrozenSet[str] = frozenset(("Policy", "DevRef"))

83 

84 

def te_position_to_lsp(te_position: "TEPosition") -> types.Position:
    """Convert a deb822 parser position into an LSP `Position`.

    :param te_position: The position as reported by the deb822 parser.
    :return: An LSP position built from the parser's line and cursor position.
    """
    line = te_position.line_position
    character = te_position.cursor_position
    return types.Position(line, character)

90 

91 

def te_range_to_lsp(te_range: "TERange") -> types.Range:
    """Convert a deb822 parser range into an LSP `Range`.

    :param te_range: The range as reported by the deb822 parser.
    :return: An LSP range whose start and end are the converted start/end positions.
    """
    start = te_position_to_lsp(te_range.start_pos)
    end = te_position_to_lsp(te_range.end_pos)
    return types.Range(start, end)

97 

98 

@lru_cache
def _check_diagnostic_source(source: str) -> None:
    """Validate the `source` (authority reference) of a diagnostic.

    Accepted values are the entries of `DIAG_SOURCE_WITHOUT_SECTIONS` as-is, or an entry of
    `DIAG_SOURCE_WITH_SECTIONS` followed by a single space and exactly one section.

    :param source: The authority reference to validate.
    :raises ValueError: If the source is unknown or does not have exactly one section.
    """
    if source in DIAG_SOURCE_WITHOUT_SECTIONS:
        return
    # The first space-separated word names the authority; the rest are its sections.
    authority, *sections = source.split(" ")
    if authority not in DIAG_SOURCE_WITH_SECTIONS:
        raise ValueError(
            f'Unknown diagnostic source: "{source}". If you are adding a new source, update lint_util.py'
        )
    if len(sections) != 1:
        raise ValueError(
            f'The diagnostic source "{source}" should have exactly one section associated with it.'
        )

113 

114 

@dataclasses.dataclass(slots=True)
class DebputyMetadata:
    """Information about how `debputy` is used by the package."""

    # The result of `determine_debputy_integration_mode` (may be `None`).
    debputy_integration_mode: Optional[DebputyIntegrationMode]

    @classmethod
    def from_data(
        cls,
        source_fields: Mapping[str, str],
        dh_sequencer_data: DhSequencerData,
    ) -> Self:
        """Create the metadata from the source fields and the `dh` sequencer data.

        :param source_fields: The fields of the source stanza.
        :param dh_sequencer_data: The sequencer data; only its `sequences` are used.
        :return: A new instance holding the determined integration mode.
        """
        sequences = dh_sequencer_data.sequences
        return cls(determine_debputy_integration_mode(source_fields, sequences))

130 

131 

class LintState:
    """Interface describing the file being linted and the way to emit diagnostics for it.

    Most properties raise `NotImplementedError` here and must be provided by a subclass
    (such as `LintStateImpl`).
    """

    @property
    def plugin_feature_set(self) -> PluginProvidedFeatureSet:
        """The plugin features known to the running instance of `debputy`

        This is mostly only relevant when working with `debputy.manifest`
        """
        raise NotImplementedError

    @property
    def doc_uri(self) -> str:
        """The URI for the document being scanned.

        This can be useful for providing related location ranges.
        """
        raise NotImplementedError

    @property
    def source_root(self) -> Optional[VirtualPathBase]:
        """The path to the unpacked source root directory if available

        This is the directory that would contain the `debian/` directory. Note, if you need the
        `debian/` directory, please use `debian_dir` instead. There may be cases where the source
        root is unavailable but the `debian/` directory is not.
        """
        raise NotImplementedError

    @property
    def debian_dir(self) -> Optional[VirtualPathBase]:
        """The path to the `debian/` directory if available"""
        raise NotImplementedError

    @property
    def path(self) -> str:
        """The filename or path of the file being scanned.

        Note this path may or may not be accessible to the running `debputy` instance. Nor is it
        guaranteed that the file on the file system (even if accessible) has correct contents. When
        doing diagnostics for an editor, the editor often requests diagnostics for unsaved changes.
        """
        raise NotImplementedError

    @property
    def content(self) -> str:
        """The full contents of the file being checked"""
        raise NotImplementedError

    @property
    def lines(self) -> List[str]:
        # FIXME: Replace with `Sequence[str]` if possible
        """The contents of the file being checked as a list of lines

        Do **not** change the contents of this list as it may be cached.
        """
        raise NotImplementedError

    @property
    def position_codec(self) -> "LintCapablePositionCodec":
        """The codec used to translate positions/ranges into client units."""
        raise NotImplementedError

    @property
    def parsed_deb822_file_content(self) -> Optional[Deb822FileElement]:
        """The contents of the file being checked as a parsed deb822 file

        This can sometimes use a cached version of the parsed file and is therefore preferable to
        parsing the file manually from `content` or `lines`.

        Do **not** change the contents of this as it may be cached.
        """
        raise NotImplementedError

    @property
    def source_package(self) -> Optional[SourcePackage]:
        """The source package (source stanza of `debian/control`).

        Will be `None` if the `debian/control` file cannot be parsed as a deb822 file, or if the
        source stanza is not available.
        """
        raise NotImplementedError

    @property
    def binary_packages(self) -> Optional[Mapping[str, BinaryPackage]]:
        """The binary packages (the Package stanzas of `debian/control`).

        Will be `None` if the `debian/control` file cannot be parsed, or if no Package stanzas are
        available.
        """
        raise NotImplementedError

    @property
    def maint_preference_table(self) -> "MaintainerPreferenceTable":
        # TODO: Remove (unused)
        raise NotImplementedError

    @property
    def effective_preference(self) -> Optional["EffectiveFormattingPreference"]:
        raise NotImplementedError

    @property
    def debputy_metadata(self) -> DebputyMetadata:
        """Information about `debputy` usage such as which integration mode is being used."""
        src_pkg = self.source_package
        if src_pkg:
            src_fields = src_pkg.fields
        else:
            # No source stanza available; fall back to "no fields".
            src_fields = {}
        return DebputyMetadata.from_data(src_fields, self.dh_sequencer_data)

    @property
    def dh_sequencer_data(self) -> DhSequencerData:
        """Information about the use of the `dh` sequencer

        This includes which sequences are being used and whether the `dh` sequencer is used at all.
        """
        raise NotImplementedError

    def spellchecker(self) -> "Spellchecker":
        """Provide the default spellchecker with package names added as ignored words.

        The ignored words are the value of the `Source` field (when present) and the names of the
        binary packages (when available).
        """
        base_checker = default_spellchecker()
        words_to_ignore = set()
        src_pkg = self.source_package
        bin_pkgs = self.binary_packages
        if src_pkg:
            source_name = src_pkg.fields.get("Source")
            if source_name is not None:
                words_to_ignore.add(source_name)
        if bin_pkgs:
            # Iterating the mapping yields its keys.
            words_to_ignore.update(bin_pkgs)
        return base_checker.context_ignored_words(words_to_ignore)

    def translation(self, domain: str) -> Translations:
        """Look up the translations for the given domain via `debputy.l10n`."""
        return l10n.translation(domain)

    def emit_diagnostic(
        self,
        text_range: "TERange",
        diagnostic_msg: str,
        severity: LintSeverity,
        authority_reference: str,
        *,
        quickfixes: Optional[List[dict]] = None,
        tags: Optional[List[types.DiagnosticTag]] = None,
        related_information: Optional[List[types.DiagnosticRelatedInformation]] = None,
        diagnostic_applies_to_another_file: Optional[str] = None,
        enable_non_interactive_auto_fix: bool = True,
    ) -> None:
        """Emit a diagnostic for an issue detected in the current file.

        :param text_range: The text range to highlight in the file.
        :param diagnostic_msg: The message to show to the user for this diagnostic
        :param severity: The severity to associate with the diagnostic.
        :param authority_reference: A reference to the authority / guide that this diagnostic is a
          violation of.

          Use:
            * "Policy 3.4.1" for Debian Policy Manual section 3.4.1
            * "DevRef 6.2.2" for the Debian Developer Reference section 6.2.2
            * "debputy" for diagnostics without a reference or where `debputy` is the authority.
              (This is also used for cases where `debputy` filters the result. Like with
              spellchecking via hunspell, where `debputy` provides its own ignore list on top)
            * "dpkg" is accepted as well (it is listed in `DIAG_SOURCE_WITHOUT_SECTIONS`).

          If you need a new reference, feel free to add it to this list.
        :param quickfixes: If provided, this is a list of possible fixes for this problem.
          Use the quickfixes provided in `debputy.lsp.quickfixes` such as
          `propose_correct_text_quick_fix`.
        :param tags: TODO: Not yet specified (currently uses LSP format).
        :param related_information: TODO: Not yet specified (currently uses LSP format).
        :param enable_non_interactive_auto_fix: Allow non-interactive auto-fixing (such as via
          `debputy lint --auto-fix`) of this issue. Set to `False` if the check is likely to have
          false positives.
        :param diagnostic_applies_to_another_file: Special-case parameter for flagging invalid file
          names. Leave this one at `None`, unless you know you need it.

          It has non-obvious semantics and is primarily useful for reporting typos of filenames
          such as `debian/install`, etc.
        :raises ValueError: If `authority_reference` is not an accepted diagnostic source.
        """
        _check_diagnostic_source(authority_reference)
        lsp_severity = LINT_SEVERITY2LSP_SEVERITY[severity]

        diag_data: DiagnosticData = {
            "enable_non_interactive_auto_fix": enable_non_interactive_auto_fix,
        }
        # Severities without a native LSP counterpart are carried along in the data payload.
        if severity not in NATIVELY_LSP_SUPPORTED_SEVERITIES:
            diag_data["lint_severity"] = severity
        if quickfixes:
            diag_data["quickfixes"] = quickfixes
        if diagnostic_applies_to_another_file is not None:
            diag_data["report_for_related_file"] = diagnostic_applies_to_another_file

        client_range = self.position_codec.range_to_client_units(
            self.lines,
            te_range_to_lsp(text_range),
        )
        self._emit_diagnostic(
            types.Diagnostic(
                client_range,
                diagnostic_msg,
                severity=lsp_severity,
                source=authority_reference,
                data=diag_data or None,
                tags=tags,
                related_information=related_information,
            )
        )

    def _emit_diagnostic(self, diagnostic: types.Diagnostic) -> None:
        """Subclass hook that receives every diagnostic built by `emit_diagnostic`."""
        raise NotImplementedError

337 

338 

@dataclasses.dataclass(slots=True)
class LintStateImpl(LintState):
    """`LintState` backed by plain attributes, with lazily computed caches.

    The parsed deb822 content and the `dh` sequencer data are computed on first access and kept
    until `clear_cache` is called.
    """

    plugin_feature_set: PluginProvidedFeatureSet = dataclasses.field(repr=False)
    maint_preference_table: "MaintainerPreferenceTable" = dataclasses.field(repr=False)
    source_root: Optional[VirtualPathBase]
    debian_dir: Optional[VirtualPathBase]
    path: str
    content: str
    lines: List[str]
    source_package: Optional[SourcePackage] = None
    binary_packages: Optional[Mapping[str, BinaryPackage]] = None
    effective_preference: Optional["EffectiveFormattingPreference"] = None
    lint_implementation: Optional["LinterImpl"] = None
    _parsed_cache: Optional[Deb822FileElement] = None
    _dh_sequencer_cache: Optional[DhSequencerData] = None
    # Non-None only while `gather_diagnostics` is running.
    _diagnostics: Optional[List[types.Diagnostic]] = None

    @property
    def doc_uri(self) -> str:
        path = self.path
        # NOTE(review): despite the name, this is `./<path>` rather than an absolute path, so the
        # result is `file://./<path>`. Left as-is since callers may rely on it - confirm.
        abs_path = os.path.join(os.path.curdir, path)
        return f"file://{abs_path}"

    @property
    def position_codec(self) -> "LintCapablePositionCodec":
        return LINTER_POSITION_CODEC

    @property
    def parsed_deb822_file_content(self) -> Optional[Deb822FileElement]:
        cache = self._parsed_cache
        if cache is None:
            cache = parse_deb822_file(
                self.lines,
                accept_files_with_error_tokens=True,
                accept_files_with_duplicated_fields=True,
            )
            self._parsed_cache = cache
        return cache

    @property
    def dh_sequencer_data(self) -> DhSequencerData:
        dh_sequencer_cache = self._dh_sequencer_cache
        if dh_sequencer_cache is None:
            debian_dir = self.debian_dir
            dh_sequences: Set[str] = set()
            saw_dh = False
            src_pkg = self.source_package
            drules = debian_dir.get("rules") if debian_dir is not None else None
            if drules and drules.is_file:
                try:
                    with drules.open() as fd:
                        saw_dh = parse_drules_for_addons(fd, dh_sequences)
                except PureVirtualPathError:
                    # The rules file cannot be opened; rely on the control file only.
                    pass
            if src_pkg:
                extract_dh_addons_from_control(src_pkg.fields, dh_sequences)

            dh_sequencer_cache = DhSequencerData(
                frozenset(dh_sequences),
                saw_dh,
            )
            self._dh_sequencer_cache = dh_sequencer_cache
        return dh_sequencer_cache

    def gather_diagnostics(self) -> List[types.Diagnostic]:
        """Run the lint implementation and return the diagnostics it emitted.

        :return: The diagnostics emitted (via `emit_diagnostic`) during the run.
        :raises RuntimeError: If a run is already in progress.
        :raises TypeError: If this state was created without a lint implementation.
        """
        if self._diagnostics is not None:
            raise RuntimeError(
                "run_diagnostics cannot be run while it is already running"
            )
        linter = self.lint_implementation
        if linter is None:
            raise TypeError(
                "run_diagnostics cannot be run:"
                " LintState was created without a lint implementation (such as for reformat-only)"
            )
        diagnostics: List[types.Diagnostic] = []
        self._diagnostics = diagnostics
        try:
            linter(self)
        finally:
            # Always leave the "running" state; otherwise an exception from the linter would make
            # every later call fail with the "already running" error above.
            self._diagnostics = None
        return diagnostics

    def clear_cache(self) -> None:
        """Drop the cached parsed deb822 content and `dh` sequencer data."""
        self._parsed_cache = None
        self._dh_sequencer_cache = None

    def _emit_diagnostic(self, diagnostic: types.Diagnostic) -> None:
        diagnostics = self._diagnostics
        if diagnostics is None:
            raise TypeError("Cannot run emit_diagnostic outside of gather_diagnostics")
        diagnostics.append(diagnostic)

430 

431 

class LintDiagnosticResultState(IntEnum):
    """The outcome recorded for a reported diagnostic."""

    # Default state when reporting a diagnostic.
    REPORTED = 1
    # Shown in the terminal report as "[LSP interactive quickfix]".
    MANUAL_FIXABLE = 2
    # Shown in the terminal report as "[Correctable via --auto-fix]".
    AUTO_FIXABLE = 3
    # Shown in the terminal report with the "auto-fixing" tag.
    FIXED = 4

437 

438 

@dataclasses.dataclass(slots=True, frozen=True)
class LintDiagnosticResult:
    """A diagnostic together with the bookkeeping `LintReport` records for it."""

    # The diagnostic as reported.
    diagnostic: types.Diagnostic
    # The outcome recorded for the diagnostic.
    result_state: LintDiagnosticResultState
    # Marker created by `LintReport.report_diagnostic` when the range is broken or the severity
    # was missing; `None` otherwise.
    invalid_marker: Optional[RuntimeError]
    # Whether the diagnostic concerns the file as a whole rather than a part of it.
    is_file_level_diagnostic: bool
    # Whether the range of the diagnostic does not fit in the file.
    has_broken_range: bool
    # Whether the diagnostic was reported without a severity.
    missing_severity: bool
    # The file in which the diagnostic was discovered.
    discovered_in: str
    # The related file the diagnostic is reported for, if any.
    report_for_related_file: Optional[str]

449 

450 

class LintReport:
    """Collects reported diagnostics per file along with counters and per-file durations.

    Subclasses can react to diagnostics by overriding `process_diagnostic` and `finish_report`.
    """

    def __init__(self) -> None:
        self.diagnostics_count: Counter[types.DiagnosticSeverity] = Counter()
        self.diagnostics_by_file: Mapping[str, List[LintDiagnosticResult]] = (
            defaultdict(list)
        )
        self.number_of_invalid_diagnostics: int = 0
        self.number_of_broken_diagnostics: int = 0
        self.lint_state: Optional[LintState] = None
        self.start_timestamp = datetime.datetime.now()
        self.durations: Dict[str, float] = collections.defaultdict(lambda: 0.0)
        self._timer = time.perf_counter()

    @contextlib.contextmanager
    def line_state(self, lint_state: LintState) -> Iterable[None]:
        """Make `lint_state` the active state for the duration of the `with` block.

        Time spent inside the block is added to `durations` under the path of `lint_state`. When
        another state was active on entry, the time elapsed so far is first added to that state's
        path, and that state becomes active again on exit.
        """
        outer_state = self.lint_state
        if outer_state is not None:
            self.durations[outer_state.path] += time.perf_counter() - self._timer

        self.lint_state = lint_state

        try:
            self._timer = time.perf_counter()
            yield
        finally:
            finished_at = time.perf_counter()
            self.durations[lint_state.path] += finished_at - self._timer
            self._timer = finished_at
            self.lint_state = outer_state

    def report_diagnostic(
        self,
        diagnostic: types.Diagnostic,
        *,
        result_state: LintDiagnosticResultState = LintDiagnosticResultState.REPORTED,
        in_file: Optional[str] = None,
    ) -> None:
        """Record a diagnostic and hand it to `process_diagnostic`.

        :param diagnostic: The diagnostic to record. If it has no severity, it is given the
          `Warning` severity (the diagnostic object is updated) and counted as invalid.
        :param result_state: The outcome to record for the diagnostic.
        :param in_file: The file the diagnostic was found in. Defaults to the path of the active
          lint state.
        """
        lint_state = self.lint_state
        assert lint_state is not None
        discovered_in_file = lint_state.path if in_file is None else in_file
        target_file = discovered_in_file

        severity = diagnostic.severity
        missing_severity = severity is None
        if missing_severity:
            self.number_of_invalid_diagnostics += 1
            severity = types.DiagnosticSeverity.Warning
            diagnostic.severity = severity

        lines = lint_state.lines
        start_pos = diagnostic.range.start
        end_pos = diagnostic.range.end

        # Only a string value counts as a related file; anything else is ignored.
        diag_data = diagnostic.data
        related_file = None
        if isinstance(diag_data, dict):
            candidate = diag_data.get("report_for_related_file")
            if isinstance(candidate, str):
                related_file = candidate

        if related_file is not None:
            target_file = related_file
            # Force it to exist in self.durations, since subclasses can use .items() or "foo" in self.durations.
            self.durations.setdefault(target_file, 0)
            is_file_level_diagnostic = True
        else:
            is_file_level_diagnostic = _is_file_level_diagnostic(
                lines,
                start_pos.line,
                start_pos.character,
                end_pos.line,
                end_pos.character,
            )

        has_broken_range = not is_file_level_diagnostic and (
            end_pos.line > len(lines) or start_pos.line < 0
        )

        error_marker: Optional[RuntimeError] = None
        if has_broken_range or missing_severity:
            error_marker = RuntimeError("Registration Marker for invalid diagnostic")

        diagnostic_result = LintDiagnosticResult(
            diagnostic,
            result_state,
            error_marker,
            is_file_level_diagnostic,
            has_broken_range,
            missing_severity,
            report_for_related_file=related_file,
            discovered_in=discovered_in_file,
        )

        self.diagnostics_by_file[target_file].append(diagnostic_result)
        self.diagnostics_count[severity] += 1
        self.process_diagnostic(target_file, lint_state, diagnostic_result)

    def process_diagnostic(
        self,
        filename: str,
        lint_state: LintState,
        diagnostic_result: LintDiagnosticResult,
    ) -> None:
        # Subclass hook
        pass

    def finish_report(self) -> None:
        # Subclass hook
        pass

568 

569 

# Fallback `debputy` severity name for each LSP severity.
_LS2DEBPUTY_SEVERITY: Mapping[types.DiagnosticSeverity, LintSeverity] = {
    types.DiagnosticSeverity.Error: "error",
    types.DiagnosticSeverity.Warning: "warning",
    types.DiagnosticSeverity.Information: "informational",
    types.DiagnosticSeverity.Hint: "pedantic",
}


def _severity_tagger(default_tag, fg_color):
    """Create a callable `(fo, lint_tag=None)` that renders a bold severity tag.

    The callable renders `lint_tag` when it is truthy and `default_tag` otherwise, using
    `fg_color` on a black background.
    """

    def _render(fo, lint_tag=None):
        return fo.colored(
            lint_tag if lint_tag else default_tag,
            fg=fg_color,
            bg="black",
            style="bold",
        )

    return _render


# Renderers for the terminal tag of each LSP severity.
_TERM_SEVERITY2TAG = {
    types.DiagnosticSeverity.Error: _severity_tagger("error", "red"),
    types.DiagnosticSeverity.Warning: _severity_tagger("warning", "yellow"),
    types.DiagnosticSeverity.Information: _severity_tagger("informational", "blue"),
    types.DiagnosticSeverity.Hint: _severity_tagger("pedantic", "green"),
}

604 

605 

def debputy_severity(diagnostic: types.Diagnostic) -> LintSeverity:
    """Determine the `debputy` severity name for a diagnostic.

    An explicit `lint_severity` entry in the diagnostic's data wins. Otherwise the LSP severity
    is translated, with "warning" as the fallback for a missing or unknown severity.

    :param diagnostic: The diagnostic to inspect.
    :return: The `debputy` severity name.
    """
    data = diagnostic.data
    if isinstance(data, dict):
        explicit = cast("LintSeverity", data.get("lint_severity"))
        if explicit is not None:
            return explicit

    lsp_severity = diagnostic.severity
    if lsp_severity is None:
        return "warning"
    return _LS2DEBPUTY_SEVERITY.get(lsp_severity, "warning")

617 

618 

class TermLintReport(LintReport):
    """`LintReport` that prints every diagnostic as it is processed."""

    def __init__(self, fo: OutputStylingBase) -> None:
        """
        :param fo: The output styling used for coloring tags and highlighted text.
        """
        super().__init__()
        self.fo = fo

    def finish_report(self) -> None:
        # Nothing to do for now
        pass

    def process_diagnostic(
        self,
        filename: str,
        lint_state: LintState,
        diagnostic_result: LintDiagnosticResult,
    ) -> None:
        """Print the diagnostic and, where applicable, the affected lines with highlighting.

        :param filename: The file the diagnostic is reported for.
        :param lint_state: The lint state that provides the lines of the file.
        :param diagnostic_result: The diagnostic along with its bookkeeping data.
        """
        diagnostic = diagnostic_result.diagnostic
        fo = self.fo
        severity = diagnostic.severity
        assert severity is not None
        if diagnostic_result.result_state != LintDiagnosticResultState.FIXED:
            tag_unresolved = _TERM_SEVERITY2TAG[severity]
            lint_tag: Optional[LintSeverity] = debputy_severity(diagnostic)
            tag = tag_unresolved(fo, lint_tag)
        else:
            tag = fo.colored(
                "auto-fixing",
                fg="magenta",
                bg="black",
                style="bold",
            )

        if diagnostic_result.is_file_level_diagnostic:
            start_line = 0
            start_position = 0
            end_line = 0
            end_position = 0
        else:
            start_line = diagnostic.range.start.line
            start_position = diagnostic.range.start.character
            end_line = diagnostic.range.end.line
            end_position = diagnostic.range.end.character

        authority = diagnostic.source
        assert authority is not None
        diag_tags = f" [{authority}]"
        lines = lint_state.lines
        line_no_width = len(str(len(lines)))

        if diagnostic_result.result_state == LintDiagnosticResultState.AUTO_FIXABLE:
            diag_tags += "[Correctable via --auto-fix]"
        elif diagnostic_result.result_state == LintDiagnosticResultState.MANUAL_FIXABLE:
            diag_tags += "[LSP interactive quickfix]"

        code = f"[{diagnostic.code}]: " if diagnostic.code else ""
        msg = f"{code}{diagnostic.message}"

        print(
            f"{tag}: File: {filename}:{start_line+1}:{start_position}:{end_line+1}:{end_position}: {msg}{diag_tags}",
        )
        if diagnostic_result.missing_severity:
            _warn(
                " This warning did not have an explicit severity; Used Warning as a fallback!"
            )
        if diagnostic_result.result_state == LintDiagnosticResultState.FIXED:
            # If it is fixed, there is no reason to show additional context.
            return
        if diagnostic_result.is_file_level_diagnostic:
            print(" File-level diagnostic")
            return
        if diagnostic_result.has_broken_range:
            _warn(
                "Bug in the underlying linter: The line numbers of the warning does not fit in the file..."
            )
            return
        lines_to_print = _lines_to_print(diagnostic.range)
        # A range may end on line `len(lines)` with a non-zero character without being flagged as
        # broken; clamp so that we never index past the last line of the file.
        last_line_excl = min(start_line + lines_to_print, len(lines))
        for line_no in range(start_line, last_line_excl):
            line = _highlight_range(fo, lines[line_no], line_no, diagnostic.range)
            print(f" {line_no+1:{line_no_width}}: {line}")

698 

699 

class LinterPositionCodec:
    """Position codec for the linter, where client units are plain string indices.

    Apart from clamping positions beyond the last line, all conversions return their input.
    """

    def client_num_units(self, chars: str) -> int:
        """Return the number of client units in `chars` (its length)."""
        return len(chars)

    def position_from_client_units(
        self,
        lines: List[str],
        position: types.Position,
    ) -> types.Position:
        """Return `position`, clamped to the end of the last line when its line is out of range.

        For an empty `lines`, the position (0, 0) is returned.
        """
        if not lines:
            return types.Position(0, 0)
        line_count = len(lines)
        if position.line < line_count:
            return position
        return types.Position(line_count - 1, self.client_num_units(lines[-1]))

    def position_to_client_units(
        self,
        _lines: List[str],
        position: types.Position,
    ) -> types.Position:
        """Return `position` unchanged."""
        return position

    def range_from_client_units(
        self, _lines: List[str], range: types.Range
    ) -> types.Range:
        """Return `range` unchanged."""
        return range

    def range_to_client_units(
        self, _lines: List[str], range: types.Range
    ) -> types.Range:
        """Return `range` unchanged."""
        return range


# Shared codec instance.
LINTER_POSITION_CODEC = LinterPositionCodec()

736 

737 

def _lines_to_print(range_: types.Range) -> int:
    """Compute how many lines, starting at the range's first line, the range touches.

    The end line only counts when the range ends beyond its first character.
    """
    line_span = range_.end.line - range_.start.line
    return line_span + 1 if range_.end.character > 0 else line_span

743 

744 

def _highlight_range(
    fo: OutputStylingBase,
    line: str,
    line_no: int,
    range_: types.Range,
) -> str:
    """Render `line` with the part covered by `range_` highlighted in bold red.

    :param fo: The output styling used for the highlighting.
    :param line: The line to render; trailing CR/LF characters are stripped.
    :param line_no: The (0-based) number of the line in the file.
    :param range_: The range to highlight.
    :return: The line without its newline, with the covered part colored.
    """
    text = line.rstrip("\r\n")
    # The highlight starts at column 0 and runs to the end of the line unless this is the first
    # or the last line of the range, respectively.
    highlight_from = range_.start.character if line_no == range_.start.line else 0
    highlight_to = range_.end.character if line_no == range_.end.line else len(text)

    before = text[:highlight_from]
    after = text[highlight_to:]
    marked = fo.colored(text[highlight_from:highlight_to], fg="red", style="bold")
    return before + marked + after

767 

768 

769def _is_file_level_diagnostic( 

770 lines: List[str], 

771 start_line: int, 

772 start_position: int, 

773 end_line: int, 

774 end_position: int, 

775) -> bool: 

776 if start_line != 0 or start_position != 0: 

777 return False 

778 line_count = len(lines) 

779 if end_line + 1 == line_count and end_position == 0: 

780 return True 

781 return end_line == line_count and line_count and end_position == len(lines[-1])