Coverage for src/debputy/linting/lint_util.py: 57%

392 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-03-24 16:38 +0000

1import collections 

2import contextlib 

3import dataclasses 

4import datetime 

5import os 

6import time 

7from collections import defaultdict, Counter 

8from enum import IntEnum 

9from functools import lru_cache 

10from typing import ( 

11 List, 

12 Optional, 

13 Callable, 

14 TYPE_CHECKING, 

15 Mapping, 

16 Sequence, 

17 cast, 

18 FrozenSet, 

19 Self, 

20 Set, 

21 Dict, 

22 Iterable, 

23 Awaitable, 

24 AsyncIterable, 

25) 

26 

27import debputy.l10n as l10n 

28from debputy.commands.debputy_cmd.output import OutputStylingBase 

29from debputy.dh.dh_assistant import ( 

30 extract_dh_addons_from_control, 

31 DhSequencerData, 

32 parse_drules_for_addons, 

33) 

34from debputy.exceptions import PureVirtualPathError, DebputyRuntimeError 

35from debputy.filesystem_scan import VirtualPathBase 

36from debputy.integration_detection import determine_debputy_integration_mode 

37from debputy.l10n import Translations 

38from debputy.lsp.diagnostics import ( 

39 LintSeverity, 

40 LINT_SEVERITY2LSP_SEVERITY, 

41 DiagnosticData, 

42 NATIVELY_LSP_SUPPORTED_SEVERITIES, 

43) 

44from debputy.lsp.spellchecking import Spellchecker, default_spellchecker 

45from debputy.lsp.vendoring._deb822_repro import Deb822FileElement, parse_deb822_file 

46from debputy.packages import SourcePackage, BinaryPackage 

47from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

48from debputy.plugin.api.spec import DebputyIntegrationMode 

49from debputy.util import _warn, T 

50 

51if TYPE_CHECKING: 

52 import lsprotocol.types as types 

53 from debputy.lsp.text_util import LintCapablePositionCodec 

54 from debputy.lsp.maint_prefs import ( 

55 MaintainerPreferenceTable, 

56 EffectiveFormattingPreference, 

57 ) 

58 from debputy.lsp.vendoring._deb822_repro.locatable import ( 

59 Range as TERange, 

60 Position as TEPosition, 

61 ) 

62else: 

63 import debputy.lsprotocol.types as types 

64 

65 

66AsyncLinterImpl = Callable[["LintState"], Awaitable[None]] 

67FormatterImpl = Callable[["LintState"], Optional[Sequence[types.TextEdit]]] 

68 

69 

70class AbortTaskError(DebputyRuntimeError): 

71 pass 

72 

73 

74# If you add a new one to this set, remember to mention it in the docs of `LintState.emit_diagnostic` 

75DIAG_SOURCE_WITHOUT_SECTIONS: FrozenSet[str] = frozenset( 

76 { 

77 "debputy", 

78 "dpkg", 

79 } 

80) 

81 

82# If you add a new one to this set, remember to mention it in the docs of `LintState.emit_diagnostic` 

83DIAG_SOURCE_WITH_SECTIONS: FrozenSet[str] = frozenset( 

84 { 

85 "Policy", 

86 "DevRef", 

87 } 

88) 

89 

90 

91def te_position_to_lsp(te_position: "TEPosition") -> types.Position: 

92 return types.Position( 

93 te_position.line_position, 

94 te_position.cursor_position, 

95 ) 

96 

97 

98def te_range_to_lsp(te_range: "TERange") -> types.Range: 

99 return types.Range( 

100 te_position_to_lsp(te_range.start_pos), 

101 te_position_to_lsp(te_range.end_pos), 

102 ) 

103 

104 

105@lru_cache 

106def _check_diagnostic_source(source: str) -> None: 

107 if source in DIAG_SOURCE_WITHOUT_SECTIONS: 

108 return 

109 parts = source.split(" ") 

110 s = parts[0] 

111 if s not in DIAG_SOURCE_WITH_SECTIONS: 111 ↛ 112line 111 didn't jump to line 112 because the condition on line 111 was never true

112 raise ValueError( 

113 f'Unknown diagnostic source: "{source}". If you are adding a new source, update lint_util.py' 

114 ) 

115 if len(parts) != 2: 115 ↛ 116line 115 didn't jump to line 116 because the condition on line 115 was never true

116 raise ValueError( 

117 f'The diagnostic source "{source}" should have exactly one section associated with it.' 

118 ) 

119 

120 

121@dataclasses.dataclass(slots=True) 

122class DebputyMetadata: 

123 debputy_integration_mode: Optional[DebputyIntegrationMode] 

124 

125 @classmethod 

126 def from_data( 

127 cls, 

128 source_fields: Mapping[str, str], 

129 dh_sequencer_data: DhSequencerData, 

130 ) -> Self: 

131 integration_mode = determine_debputy_integration_mode( 

132 source_fields, 

133 dh_sequencer_data.sequences, 

134 ) 

135 return cls(integration_mode) 

136 

137 

138@dataclasses.dataclass(slots=True, frozen=True) 

139class RelatedDiagnosticInformation: 

140 text_range: "TERange" 

141 message: str 

142 doc_uri: str 

143 

144 def to_lsp(self, lint_state: "LintState") -> types.DiagnosticRelatedInformation: 

145 return types.DiagnosticRelatedInformation( 

146 types.Location( 

147 self.doc_uri, 

148 lint_state.position_codec.range_to_client_units( 

149 lint_state.lines, 

150 te_range_to_lsp(self.text_range), 

151 ), 

152 ), 

153 self.message, 

154 ) 

155 

156 

157class LintState: 

158 

159 @property 

160 def plugin_feature_set(self) -> PluginProvidedFeatureSet: 

161 """The plugin features known to the running instance of `debputy` 

162 

163 This is mostly only relevant when working with `debputy.manifest 

164 """ 

165 raise NotImplementedError 

166 

167 @property 

168 def doc_uri(self) -> str: 

169 """The URI for the document being scanned. 

170 

171 This can be useful for providing related location ranges. 

172 """ 

173 raise NotImplementedError 

174 

175 @property 

176 def source_root(self) -> Optional[VirtualPathBase]: 

177 """The path to the unpacked source root directory if available 

178 

179 This is the directory that would contain the `debian/` directory. Note, if you need the 

180 `debian/` directory, please use `debian_dir` instead. There may be cases where the source 

181 root is unavailable but the `debian/` directory is not. 

182 """ 

183 raise NotImplementedError 

184 

185 @property 

186 def debian_dir(self) -> Optional[VirtualPathBase]: 

187 """The path to the `debian/` directory if available""" 

188 raise NotImplementedError 

189 

190 @property 

191 def path(self) -> str: 

192 """The filename or path of the file being scanned. 

193 

194 Note this path may or may not be accessible to the running `debputy` instance. Nor is it guaranteed 

195 that the file on the file system (even if accessible) has correct contents. When doing diagnostics 

196 for an editor, the editor often requests diagnostics for unsaved changes. 

197 """ 

198 raise NotImplementedError 

199 

200 @property 

201 def content(self) -> str: 

202 """The full contents of the file being checked""" 

203 raise NotImplementedError 

204 

205 @property 

206 def lines(self) -> List[str]: 

207 # FIXME: Replace with `Sequence[str]` if possible 

208 """The contents of the file being checked as a list of lines 

209 

210 Do **not** change the contents of this list as it may be cached. 

211 """ 

212 raise NotImplementedError 

213 

214 @property 

215 def position_codec(self) -> "LintCapablePositionCodec": 

216 raise NotImplementedError 

217 

218 @property 

219 def parsed_deb822_file_content(self) -> Optional[Deb822FileElement]: 

220 """The contents of the file being checked as a parsed deb822 file 

221 

222 This can sometimes use a cached version of the parsed file and is therefore preferable to 

223 parsing the file manually from `content` or `lines`. 

224 

225 Do **not** change the contents of this as it may be cached. 

226 """ 

227 raise NotImplementedError 

228 

229 @property 

230 def source_package(self) -> Optional[SourcePackage]: 

231 """The source package (source stanza of `debian/control`). 

232 

233 Will be `None` if the `debian/control` file cannot be parsed as a deb822 file, or if the 

234 source stanza is not available. 

235 """ 

236 raise NotImplementedError 

237 

238 @property 

239 def binary_packages(self) -> Optional[Mapping[str, BinaryPackage]]: 

240 """The binary packages (the Package stanzas of `debian/control`). 

241 

242 Will be `None` if the `debian/control` file cannot be parsed, or if no Package stanzas are 

243 available. 

244 """ 

245 raise NotImplementedError 

246 

247 @property 

248 def maint_preference_table(self) -> "MaintainerPreferenceTable": 

249 # TODO: Remove (unused) 

250 raise NotImplementedError 

251 

252 @property 

253 def effective_preference(self) -> Optional["EffectiveFormattingPreference"]: 

254 raise NotImplementedError 

255 

256 @property 

257 def debputy_metadata(self) -> DebputyMetadata: 

258 """Information about `debputy` usage such as which integration mode is being used.""" 

259 src_pkg = self.source_package 

260 src_fields = src_pkg.fields if src_pkg else {} 

261 return DebputyMetadata.from_data( 

262 src_fields, 

263 self.dh_sequencer_data, 

264 ) 

265 

266 @property 

267 def dh_sequencer_data(self) -> DhSequencerData: 

268 """Information about the use of the `dh` sequencer 

269 

270 This includes which sequences are being used and whether the `dh` sequencer is used at all. 

271 """ 

272 raise NotImplementedError 

273 

274 def spellchecker(self) -> "Spellchecker": 

275 checker = default_spellchecker() 

276 ignored_words = set() 

277 source_package = self.source_package 

278 binary_packages = self.binary_packages 

279 if source_package and source_package.fields.get("Source") is not None: 

280 ignored_words.add(source_package.fields.get("Source")) 

281 if binary_packages: 

282 ignored_words.update(binary_packages) 

283 return checker.context_ignored_words(ignored_words) 

284 

285 async def slow_iter( 

286 self, 

287 iterable: Iterable[T], 

288 *, 

289 yield_every: int = 100, 

290 ) -> AsyncIterable[T]: 

291 for value in iterable: 

292 yield value 

293 

294 def translation(self, domain: str) -> Translations: 

295 return l10n.translation( 

296 domain, 

297 ) 

298 

299 def related_diagnostic_information( 

300 self, 

301 text_range: "TERange", 

302 message: str, 

303 *, 

304 doc_uri: Optional[str] = None, 

305 ) -> RelatedDiagnosticInformation: 

306 """Provide a related context for the diagnostic 

307 

308 The related diagnostic information is typically highlighted with the diagnostic. As an example, 

309 `debputy lint`'s terminal output will display the message and display the selected range after the 

310 the diagnostic itself. 

311 

312 :param text_range: The text range to highlight. 

313 :param message: The message to associate with the provided text range. 

314 :param doc_uri: The URI of the document that the context is from. When omitted, the text range is 

315 assumed to be from the "current" file (the `doc_uri` attribute), which is also the default file 

316 for ranges passed to `emit_diagnostic`. 

317 :return: 

318 """ 

319 return RelatedDiagnosticInformation( 

320 text_range, 

321 message, 

322 doc_uri=doc_uri if doc_uri is not None else self.doc_uri, 

323 ) 

324 

325 def emit_diagnostic( 

326 self, 

327 text_range: "TERange", 

328 diagnostic_msg: str, 

329 severity: LintSeverity, 

330 authority_reference: str, 

331 *, 

332 quickfixes: Optional[List[dict]] = None, 

333 tags: Optional[List[types.DiagnosticTag]] = None, 

334 related_information: Optional[List[RelatedDiagnosticInformation]] = None, 

335 diagnostic_applies_to_another_file: Optional[str] = None, 

336 enable_non_interactive_auto_fix: bool = True, 

337 ) -> None: 

338 """Emit a diagnostic for an issue detected in the current file. 

339 

340 :param text_range: The text range to highlight in the file. 

341 :param diagnostic_msg: The message to show to the user for this diagnostic 

342 :param severity: The severity to associate with the diagnostic. 

343 :param authority_reference: A reference to the authority / guide that this diagnostic is a violation of. 

344 

345 Use: 

346 * "Policy 3.4.1" for Debian Policy Manual section 3.4.1 

347 (replace the section number with the relevant number for your case) 

348 * "DevRef 6.2.2" for the Debian Developer Reference section 6.2.2 

349 (replace the section number with the relevant number for your case) 

350 * "debputy" for diagnostics without a reference or where `debputy` is the authority. 

351 (This is also used for cases where `debputy` filters the result. Like with spellchecking 

352 via hunspell, where `debputy` provides its own ignore list on top) 

353 

354 If you need a new reference, feel free to add it to this list. 

355 :param quickfixes: If provided, this is a list of possible fixes for this problem. 

356 Use the quickfixes provided in `debputy.lsp.quickfixes` such as `propose_correct_text_quick_fix`. 

357 :param tags: TODO: Not yet specified (currently uses LSP format). 

358 :param related_information: Provide additional context to the diagnostic. This can be used to define 

359 the source of a conflict. As an example, for duplicate definitions, this can be used to show where 

360 the definitions are. 

361 

362 Every item should be created via the `related_diagnostic_information` method. 

363 :param enable_non_interactive_auto_fix: Allow non-interactive auto-fixing (such as via 

364 `debputy lint --auto-fix`) of this issue. Set to `False` if the check is likely to have false 

365 positives. 

366 :param diagnostic_applies_to_another_file: Special-case parameter for flagging invalid file names. 

367 Leave this one at `None`, unless you know you need it. 

368 

369 It has non-obvious semantics and is primarily useful for reporting typos of filenames such as 

370 `debian/install`, etc. 

371 """ 

372 _check_diagnostic_source(authority_reference) 

373 lsp_severity = LINT_SEVERITY2LSP_SEVERITY[severity] 

374 diag_data: DiagnosticData = { 

375 "enable_non_interactive_auto_fix": enable_non_interactive_auto_fix, 

376 } 

377 

378 if severity not in NATIVELY_LSP_SUPPORTED_SEVERITIES: 

379 diag_data["lint_severity"] = severity 

380 if quickfixes: 

381 diag_data["quickfixes"] = quickfixes 

382 if diagnostic_applies_to_another_file is not None: 

383 diag_data["report_for_related_file"] = diagnostic_applies_to_another_file 

384 

385 lsp_range_client_units = self.position_codec.range_to_client_units( 

386 self.lines, 

387 te_range_to_lsp(text_range), 

388 ) 

389 

390 related_lsp_format = ( 

391 [i.to_lsp(self) for i in related_information] 

392 if related_information 

393 else None 

394 ) 

395 diag = types.Diagnostic( 

396 lsp_range_client_units, 

397 diagnostic_msg, 

398 severity=lsp_severity, 

399 source=authority_reference, 

400 data=diag_data if diag_data else None, 

401 tags=tags, 

402 related_information=related_lsp_format, 

403 ) 

404 self._emit_diagnostic(diag) 

405 

406 def _emit_diagnostic(self, diagnostic: types.Diagnostic) -> None: 

407 raise NotImplementedError 

408 

409 

410@dataclasses.dataclass(slots=True) 

411class LintStateImpl(LintState): 

412 plugin_feature_set: PluginProvidedFeatureSet = dataclasses.field(repr=False) 

413 maint_preference_table: "MaintainerPreferenceTable" = dataclasses.field(repr=False) 

414 source_root: Optional[VirtualPathBase] 

415 debian_dir: Optional[VirtualPathBase] 

416 path: str 

417 content: str 

418 lines: List[str] 

419 source_package: Optional[SourcePackage] = None 

420 binary_packages: Optional[Mapping[str, BinaryPackage]] = None 

421 effective_preference: Optional["EffectiveFormattingPreference"] = None 

422 lint_implementation: Optional["AsyncLinterImpl"] = None 

423 _parsed_cache: Optional[Deb822FileElement] = None 

424 _dh_sequencer_cache: Optional[DhSequencerData] = None 

425 _diagnostics: Optional[List[types.Diagnostic]] = None 

426 

427 @property 

428 def doc_uri(self) -> str: 

429 path = self.path 

430 abs_path = os.path.join(os.path.curdir, path) 

431 return f"file://{abs_path}" 

432 

433 @property 

434 def position_codec(self) -> "LintCapablePositionCodec": 

435 return LINTER_POSITION_CODEC 

436 

437 @property 

438 def parsed_deb822_file_content(self) -> Optional[Deb822FileElement]: 

439 cache = self._parsed_cache 

440 if cache is None: 440 ↛ 447line 440 didn't jump to line 447 because the condition on line 440 was always true

441 cache = parse_deb822_file( 

442 self.lines, 

443 accept_files_with_error_tokens=True, 

444 accept_files_with_duplicated_fields=True, 

445 ) 

446 self._parsed_cache = cache 

447 return cache 

448 

449 @property 

450 def dh_sequencer_data(self) -> DhSequencerData: 

451 dh_sequencer_cache = self._dh_sequencer_cache 

452 if dh_sequencer_cache is None: 

453 debian_dir = self.debian_dir 

454 dh_sequences: Set[str] = set() 

455 saw_dh = False 

456 src_pkg = self.source_package 

457 drules = debian_dir.get("rules") if debian_dir is not None else None 

458 if drules and drules.is_file: 

459 try: 

460 with drules.open() as fd: 

461 saw_dh = parse_drules_for_addons(fd, dh_sequences) 

462 except PureVirtualPathError: 

463 pass 

464 if src_pkg: 

465 extract_dh_addons_from_control(src_pkg.fields, dh_sequences) 

466 

467 dh_sequencer_cache = DhSequencerData( 

468 frozenset(dh_sequences), 

469 saw_dh, 

470 ) 

471 self._dh_sequencer_cache = dh_sequencer_cache 

472 return dh_sequencer_cache 

473 

474 async def gather_diagnostics(self) -> List[types.Diagnostic]: 

475 if self._diagnostics is not None: 475 ↛ 476line 475 didn't jump to line 476 because the condition on line 475 was never true

476 raise RuntimeError( 

477 "run_diagnostics cannot be run while it is already running" 

478 ) 

479 linter = self.lint_implementation 

480 if linter is None: 480 ↛ 481line 480 didn't jump to line 481 because the condition on line 480 was never true

481 raise TypeError( 

482 "run_diagnostics cannot be run:" 

483 " LintState was created without a lint implementation (such as for reformat-only)" 

484 ) 

485 self._diagnostics = diagnostics = [] 

486 

487 await linter(self) 

488 

489 self._diagnostics = None 

490 return diagnostics 

491 

492 def clear_cache(self) -> None: 

493 self._parsed_cache = None 

494 self._dh_sequencer_cache = None 

495 

496 def _emit_diagnostic(self, diagnostic: types.Diagnostic) -> None: 

497 diagnostics = self._diagnostics 

498 if diagnostics is None: 498 ↛ 499line 498 didn't jump to line 499 because the condition on line 498 was never true

499 raise TypeError("Cannot run emit_diagnostic outside of gather_diagnostics") 

500 diagnostics.append(diagnostic) 

501 

502 

503class LintDiagnosticResultState(IntEnum): 

504 REPORTED = 1 

505 MANUAL_FIXABLE = 2 

506 AUTO_FIXABLE = 3 

507 FIXED = 4 

508 

509 

510@dataclasses.dataclass(slots=True, frozen=True) 

511class LintDiagnosticResult: 

512 diagnostic: types.Diagnostic 

513 result_state: LintDiagnosticResultState 

514 invalid_marker: Optional[RuntimeError] 

515 is_file_level_diagnostic: bool 

516 has_broken_range: bool 

517 missing_severity: bool 

518 discovered_in: str 

519 report_for_related_file: Optional[str] 

520 

521 

522class LintReport: 

523 

524 def __init__(self) -> None: 

525 self.diagnostics_count: Counter[types.DiagnosticSeverity] = Counter() 

526 self.diagnostics_by_file: Mapping[str, List[LintDiagnosticResult]] = ( 

527 defaultdict(list) 

528 ) 

529 self.number_of_invalid_diagnostics: int = 0 

530 self.number_of_broken_diagnostics: int = 0 

531 self.lint_state: Optional[LintState] = None 

532 self.start_timestamp = datetime.datetime.now() 

533 self.durations: Dict[str, float] = collections.defaultdict(lambda: 0.0) 

534 self._timer = time.perf_counter() 

535 

536 @contextlib.contextmanager 

537 def line_state(self, lint_state: LintState) -> Iterable[None]: 

538 previous = self.lint_state 

539 if previous is not None: 

540 path = previous.path 

541 duration = time.perf_counter() - self._timer 

542 self.durations[path] += duration 

543 

544 self.lint_state = lint_state 

545 

546 try: 

547 self._timer = time.perf_counter() 

548 yield 

549 finally: 

550 now = time.perf_counter() 

551 duration = now - self._timer 

552 self.durations[lint_state.path] += duration 

553 self._timer = now 

554 self.lint_state = previous 

555 

556 def report_diagnostic( 

557 self, 

558 diagnostic: types.Diagnostic, 

559 *, 

560 result_state: LintDiagnosticResultState = LintDiagnosticResultState.REPORTED, 

561 in_file: Optional[str] = None, 

562 ) -> None: 

563 lint_state = self.lint_state 

564 assert lint_state is not None 

565 if in_file is None: 

566 in_file = lint_state.path 

567 discovered_in_file = in_file 

568 severity = diagnostic.severity 

569 missing_severity = False 

570 error_marker: Optional[RuntimeError] = None 

571 if severity is None: 

572 self.number_of_invalid_diagnostics += 1 

573 severity = types.DiagnosticSeverity.Warning 

574 diagnostic.severity = severity 

575 missing_severity = True 

576 

577 lines = lint_state.lines 

578 diag_range = diagnostic.range 

579 start_pos = diag_range.start 

580 end_pos = diag_range.end 

581 diag_data = diagnostic.data 

582 if isinstance(diag_data, dict): 

583 report_for_related_file = diag_data.get("report_for_related_file") 

584 if report_for_related_file is None or not isinstance( 

585 report_for_related_file, str 

586 ): 

587 report_for_related_file = None 

588 else: 

589 in_file = report_for_related_file 

590 # Force it to exist in self.durations, since subclasses can use .items() or "foo" in self.durations. 

591 if in_file not in self.durations: 

592 self.durations[in_file] = 0 

593 else: 

594 report_for_related_file = None 

595 if report_for_related_file is not None: 

596 is_file_level_diagnostic = True 

597 else: 

598 is_file_level_diagnostic = _is_file_level_diagnostic( 

599 lines, 

600 start_pos.line, 

601 start_pos.character, 

602 end_pos.line, 

603 end_pos.character, 

604 ) 

605 has_broken_range = not is_file_level_diagnostic and ( 

606 end_pos.line > len(lines) or start_pos.line < 0 

607 ) 

608 

609 if has_broken_range or missing_severity: 

610 error_marker = RuntimeError("Registration Marker for invalid diagnostic") 

611 

612 diagnostic_result = LintDiagnosticResult( 

613 diagnostic, 

614 result_state, 

615 error_marker, 

616 is_file_level_diagnostic, 

617 has_broken_range, 

618 missing_severity, 

619 report_for_related_file=report_for_related_file, 

620 discovered_in=discovered_in_file, 

621 ) 

622 

623 self.diagnostics_by_file[in_file].append(diagnostic_result) 

624 self.diagnostics_count[severity] += 1 

625 self.process_diagnostic(in_file, lint_state, diagnostic_result) 

626 

627 def process_diagnostic( 

628 self, 

629 filename: str, 

630 lint_state: LintState, 

631 diagnostic_result: LintDiagnosticResult, 

632 ) -> None: 

633 # Subclass hook 

634 pass 

635 

636 def finish_report(self) -> None: 

637 # Subclass hook 

638 pass 

639 

640 

641_LS2DEBPUTY_SEVERITY: Mapping[types.DiagnosticSeverity, LintSeverity] = { 

642 types.DiagnosticSeverity.Error: "error", 

643 types.DiagnosticSeverity.Warning: "warning", 

644 types.DiagnosticSeverity.Information: "informational", 

645 types.DiagnosticSeverity.Hint: "pedantic", 

646} 

647 

648 

649_TERM_SEVERITY2TAG = { 649 ↛ exitline 649 didn't jump to the function exit

650 types.DiagnosticSeverity.Error: lambda fo, lint_tag=None: fo.colored( 

651 lint_tag if lint_tag else "error", 

652 fg="red", 

653 bg="black", 

654 style="bold", 

655 ), 

656 types.DiagnosticSeverity.Warning: lambda fo, lint_tag=None: fo.colored( 

657 lint_tag if lint_tag else "warning", 

658 fg="yellow", 

659 bg="black", 

660 style="bold", 

661 ), 

662 types.DiagnosticSeverity.Information: lambda fo, lint_tag=None: fo.colored( 

663 lint_tag if lint_tag else "informational", 

664 fg="blue", 

665 bg="black", 

666 style="bold", 

667 ), 

668 types.DiagnosticSeverity.Hint: lambda fo, lint_tag=None: fo.colored( 

669 lint_tag if lint_tag else "pedantic", 

670 fg="green", 

671 bg="black", 

672 style="bold", 

673 ), 

674} 

675 

676 

677def debputy_severity(diagnostic: types.Diagnostic) -> LintSeverity: 

678 lint_tag: Optional[LintSeverity] = None 

679 if isinstance(diagnostic.data, dict): 

680 lint_tag = cast("LintSeverity", diagnostic.data.get("lint_severity")) 

681 

682 if lint_tag is not None: 

683 return lint_tag 

684 severity = diagnostic.severity 

685 if severity is None: 

686 return "warning" 

687 return _LS2DEBPUTY_SEVERITY.get(severity, "warning") 

688 

689 

690class TermLintReport(LintReport): 

691 

692 def __init__(self, fo: OutputStylingBase) -> None: 

693 super().__init__() 

694 self.fo = fo 

695 

696 def finish_report(self) -> None: 

697 # Nothing to do for now 

698 pass 

699 

700 def process_diagnostic( 

701 self, 

702 filename: str, 

703 lint_state: LintState, 

704 diagnostic_result: LintDiagnosticResult, 

705 ) -> None: 

706 diagnostic = diagnostic_result.diagnostic 

707 fo = self.fo 

708 severity = diagnostic.severity 

709 assert severity is not None 

710 if diagnostic_result.result_state != LintDiagnosticResultState.FIXED: 

711 tag_unresolved = _TERM_SEVERITY2TAG[severity] 

712 lint_tag: Optional[LintSeverity] = debputy_severity(diagnostic) 

713 tag = tag_unresolved(fo, lint_tag) 

714 else: 

715 tag = fo.colored( 

716 "auto-fixing", 

717 fg="magenta", 

718 bg="black", 

719 style="bold", 

720 ) 

721 

722 if diagnostic_result.is_file_level_diagnostic: 

723 start_line = 0 

724 start_position = 0 

725 end_line = 0 

726 end_position = 0 

727 else: 

728 start_line = diagnostic.range.start.line 

729 start_position = diagnostic.range.start.character 

730 end_line = diagnostic.range.end.line 

731 end_position = diagnostic.range.end.character 

732 

733 authority = diagnostic.source 

734 assert authority is not None 

735 diag_tags = f" [{authority}]" 

736 lines = lint_state.lines 

737 line_no_format_width = len(str(len(lines))) 

738 

739 if diagnostic_result.result_state == LintDiagnosticResultState.AUTO_FIXABLE: 

740 diag_tags += "[Correctable via --auto-fix]" 

741 elif diagnostic_result.result_state == LintDiagnosticResultState.MANUAL_FIXABLE: 

742 diag_tags += "[LSP interactive quickfix]" 

743 

744 code = f"[{diagnostic.code}]: " if diagnostic.code else "" 

745 msg = f"{code}{diagnostic.message}" 

746 

747 print( 

748 f"{tag}: File: {filename}:{start_line+1}:{start_position}:{end_line+1}:{end_position}: {msg}{diag_tags}", 

749 ) 

750 if diagnostic_result.missing_severity: 

751 _warn( 

752 " This warning did not have an explicit severity; Used Warning as a fallback!" 

753 ) 

754 if diagnostic_result.result_state == LintDiagnosticResultState.FIXED: 

755 # If it is fixed, there is no reason to show additional context. 

756 return 

757 if diagnostic_result.is_file_level_diagnostic: 

758 print(" File-level diagnostic") 

759 return 

760 if diagnostic_result.has_broken_range: 

761 _warn( 

762 "Bug in the underlying linter: The line numbers of the warning does not fit in the file..." 

763 ) 

764 return 

765 self._print_range_context(diagnostic.range, lines, line_no_format_width) 

766 related_info_list = diagnostic.related_information or [] 

767 hint_tag = fo.colored( 

768 "Related information", 

769 fg="magenta", 

770 bg="black", 

771 style="bold", 

772 ) 

773 for related_info in related_info_list: 

774 if related_info.location.uri != lint_state.doc_uri: 

775 continue 

776 print(f" {hint_tag}: {related_info.message}") 

777 self._print_range_context( 

778 related_info.location.range, lines, line_no_format_width 

779 ) 

780 

781 def _print_range_context( 

782 self, 

783 print_range: types.Range, 

784 lines: List[str], 

785 line_no_format_width: int, 

786 ) -> None: 

787 lines_to_print = _lines_to_print(print_range) 

788 fo = self.fo 

789 start_line = print_range.start.line 

790 for line_no in range(start_line, start_line + lines_to_print): 

791 line = _highlight_range(fo, lines[line_no], line_no, print_range) 

792 print(f" {line_no + 1:{line_no_format_width}}: {line}") 

793 

794 

795class LinterPositionCodec: 

796 

797 def client_num_units(self, chars: str): 

798 return len(chars) 

799 

800 def position_from_client_units( 

801 self, 

802 lines: List[str], 

803 position: types.Position, 

804 ) -> types.Position: 

805 

806 if len(lines) == 0: 

807 return types.Position(0, 0) 

808 if position.line >= len(lines): 

809 return types.Position(len(lines) - 1, self.client_num_units(lines[-1])) 

810 return position 

811 

812 def position_to_client_units( 

813 self, 

814 _lines: List[str], 

815 position: types.Position, 

816 ) -> types.Position: 

817 return position 

818 

819 def range_from_client_units( 

820 self, _lines: List[str], range: types.Range 

821 ) -> types.Range: 

822 return range 

823 

824 def range_to_client_units( 

825 self, _lines: List[str], range: types.Range 

826 ) -> types.Range: 

827 return range 

828 

829 

830LINTER_POSITION_CODEC = LinterPositionCodec() 

831 

832 

833def _lines_to_print(range_: types.Range) -> int: 

834 count = range_.end.line - range_.start.line 

835 if range_.end.character > 0: 

836 count += 1 

837 return count 

838 

839 

840def _highlight_range( 

841 fo: OutputStylingBase, 

842 line: str, 

843 line_no: int, 

844 range_: types.Range, 

845) -> str: 

846 line_wo_nl = line.rstrip("\r\n") 

847 start_pos = 0 

848 prefix = "" 

849 suffix = "" 

850 if line_no == range_.start.line: 

851 start_pos = range_.start.character 

852 prefix = line_wo_nl[0:start_pos] 

853 if line_no == range_.end.line: 

854 end_pos = range_.end.character 

855 suffix = line_wo_nl[end_pos:] 

856 else: 

857 end_pos = len(line_wo_nl) 

858 

859 marked_part = fo.colored(line_wo_nl[start_pos:end_pos], fg="red", style="bold") 

860 

861 return prefix + marked_part + suffix 

862 

863 

864def _is_file_level_diagnostic( 

865 lines: List[str], 

866 start_line: int, 

867 start_position: int, 

868 end_line: int, 

869 end_position: int, 

870) -> bool: 

871 if start_line != 0 or start_position != 0: 

872 return False 

873 line_count = len(lines) 

874 if end_line + 1 == line_count and end_position == 0: 

875 return True 

876 return end_line == line_count and line_count and end_position == len(lines[-1])