Coverage for src/debputy/linting/lint_util.py: 56%

425 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-09-07 09:27 +0000

1import collections 

2import contextlib 

3import dataclasses 

4import datetime 

5import os 

6import time 

7from collections import defaultdict, Counter 

8from dataclasses import fields 

9from enum import IntEnum 

10from functools import lru_cache 

11from typing import ( 

12 List, 

13 Optional, 

14 Callable, 

15 TYPE_CHECKING, 

16 Mapping, 

17 Sequence, 

18 cast, 

19 FrozenSet, 

20 Self, 

21 Set, 

22 Dict, 

23 Iterable, 

24 Awaitable, 

25 AsyncIterable, 

26 Tuple, 

27 TypeVar, 

28) 

29 

30 

31import debputy.l10n as l10n 

32from debputy.commands.debputy_cmd.output import IOBasedOutputStyling 

33from debputy.dh.dh_assistant import ( 

34 extract_dh_addons_from_control, 

35 DhSequencerData, 

36 parse_drules_for_addons, 

37) 

38from debputy.exceptions import PureVirtualPathError, DebputyRuntimeError 

39from debputy.filesystem_scan import VirtualPathBase 

40from debputy.integration_detection import determine_debputy_integration_mode 

41from debputy.l10n import Translations 

42from debputy.lsp.config.debputy_config import DebputyConfig 

43from debputy.lsp.diagnostics import ( 

44 LintSeverity, 

45 LINT_SEVERITY2LSP_SEVERITY, 

46 DiagnosticData, 

47 NATIVELY_LSP_SUPPORTED_SEVERITIES, 

48) 

49from debputy.lsp.spellchecking import Spellchecker, default_spellchecker 

50from debputy.lsp.vendoring._deb822_repro import Deb822FileElement, parse_deb822_file 

51from debputy.packages import SourcePackage, BinaryPackage 

52from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

53from debputy.plugin.api.spec import DebputyIntegrationMode 

54from debputy.util import _warn, T 

55from debputy.lsp.vendoring._deb822_repro.locatable import ( 

56 Range as TERange, 

57 Position as TEPosition, 

58 Locatable, 

59 START_POSITION, 

60) 

61 

62if TYPE_CHECKING: 

63 import lsprotocol.types as types 

64 from debputy.lsp.text_util import LintCapablePositionCodec 

65 from debputy.lsp.maint_prefs import ( 

66 MaintainerPreferenceTable, 

67 EffectiveFormattingPreference, 

68 ) 

69 

70else: 

71 import debputy.lsprotocol.types as types 

72 

73 

74L = TypeVar("L", bound=Locatable) 

75 

76 

77AsyncLinterImpl = Callable[["LintState"], Awaitable[None]] 

78FormatterImpl = Callable[["LintState"], Optional[Sequence[types.TextEdit]]] 

79 

80 

81class AbortTaskError(DebputyRuntimeError): 

82 pass 

83 

84 

85# If you add a new one to this set, remember to mention it in the docs of `LintState.emit_diagnostic` 

86DIAG_SOURCE_WITHOUT_SECTIONS: FrozenSet[str] = frozenset( 

87 { 

88 "debputy", 

89 "dpkg", 

90 } 

91) 

92 

93# If you add a new one to this set, remember to mention it in the docs of `LintState.emit_diagnostic` 

94DIAG_SOURCE_WITH_SECTIONS: FrozenSet[str] = frozenset( 

95 { 

96 "Policy", 

97 "DevRef", 

98 } 

99) 

100 

101 

102def te_position_to_lsp(te_position: "TEPosition") -> types.Position: 

103 return types.Position( 

104 te_position.line_position, 

105 te_position.cursor_position, 

106 ) 

107 

108 

109def te_range_to_lsp(te_range: "TERange") -> types.Range: 

110 return types.Range( 

111 te_position_to_lsp(te_range.start_pos), 

112 te_position_to_lsp(te_range.end_pos), 

113 ) 

114 

115 

116def with_range_in_continuous_parts( 

117 iterable: Iterable["L"], 

118 *, 

119 start_relative_to: "TEPosition" = START_POSITION, 

120) -> Iterable[Tuple["TERange", "L"]]: 

121 current_pos = start_relative_to 

122 for part in iterable: 

123 part_range = part.size().relative_to(current_pos) 

124 yield part_range, part 

125 current_pos = part_range.end_pos 

126 

127 

128@lru_cache 

129def _check_diagnostic_source(source: str) -> None: 

130 if source in DIAG_SOURCE_WITHOUT_SECTIONS: 

131 return 

132 parts = source.split(" ") 

133 s = parts[0] 

134 if s not in DIAG_SOURCE_WITH_SECTIONS: 134 ↛ 135line 134 didn't jump to line 135 because the condition on line 134 was never true

135 raise ValueError( 

136 f'Unknown diagnostic source: "{source}". If you are adding a new source, update lint_util.py' 

137 ) 

138 if len(parts) != 2: 138 ↛ 139line 138 didn't jump to line 139 because the condition on line 138 was never true

139 raise ValueError( 

140 f'The diagnostic source "{source}" should have exactly one section associated with it.' 

141 ) 

142 

143 

144@dataclasses.dataclass(slots=True) 

145class DebputyMetadata: 

146 debputy_integration_mode: Optional[DebputyIntegrationMode] 

147 

148 @classmethod 

149 def from_data( 

150 cls, 

151 source_fields: Mapping[str, str], 

152 dh_sequencer_data: DhSequencerData, 

153 ) -> Self: 

154 integration_mode = determine_debputy_integration_mode( 

155 source_fields, 

156 dh_sequencer_data.sequences, 

157 ) 

158 return cls(integration_mode) 

159 

160 

161@dataclasses.dataclass(slots=True, frozen=True) 

162class RelatedDiagnosticInformation: 

163 text_range: "TERange" 

164 message: str 

165 doc_uri: str 

166 

167 def to_lsp(self, lint_state: "LintState") -> types.DiagnosticRelatedInformation: 

168 return types.DiagnosticRelatedInformation( 

169 types.Location( 

170 self.doc_uri, 

171 lint_state.position_codec.range_to_client_units( 

172 lint_state.lines, 

173 te_range_to_lsp(self.text_range), 

174 ), 

175 ), 

176 self.message, 

177 ) 

178 

179 

180@dataclasses.dataclass(frozen=True, slots=True) 

181class WorkspaceTextEditSupport: 

182 supports_document_changes: bool = False 

183 supported_resource_operation_edit_kinds: Sequence[types.ResourceOperationKind] = ( 

184 dataclasses.field(default_factory=list) 

185 ) 

186 

187 @property 

188 def supports_versioned_text_edits(self) -> bool: 

189 return ( 

190 self.supports_document_changes 

191 or self.supported_resource_operation_edit_kinds 

192 ) 

193 

194 

195class LintState: 

196 

197 @property 

198 def plugin_feature_set(self) -> PluginProvidedFeatureSet: 

199 """The plugin features known to the running instance of `debputy` 

200 

201 This is mostly only relevant when working with `debputy.manifest` 

202 """ 

203 raise NotImplementedError 

204 

205 @property 

206 def doc_uri(self) -> str: 

207 """The URI for the document being scanned. 

208 

209 This can be useful for providing related location ranges. 

210 """ 

211 raise NotImplementedError 

212 

213 @property 

214 def doc_version(self) -> Optional[int]: 

215 raise NotImplementedError 

216 

217 @property 

218 def source_root(self) -> Optional[VirtualPathBase]: 

219 """The path to the unpacked source root directory if available 

220 

221 This is the directory that would contain the `debian/` directory. Note, if you need the 

222 `debian/` directory, please use `debian_dir` instead. There may be cases where the source 

223 root is unavailable but the `debian/` directory is not. 

224 """ 

225 raise NotImplementedError 

226 

227 @property 

228 def debian_dir(self) -> Optional[VirtualPathBase]: 

229 """The path to the `debian/` directory if available""" 

230 raise NotImplementedError 

231 

232 @property 

233 def path(self) -> str: 

234 """The filename or path of the file being scanned. 

235 

236 Note this path may or may not be accessible to the running `debputy` instance. Nor is it guaranteed 

237 that the file on the file system (even if accessible) has correct contents. When doing diagnostics 

238 for an editor, the editor often requests diagnostics for unsaved changes. 

239 """ 

240 raise NotImplementedError 

241 

242 @property 

243 def content(self) -> str: 

244 """The full contents of the file being checked""" 

245 raise NotImplementedError 

246 

247 @property 

248 def lines(self) -> List[str]: 

249 # FIXME: Replace with `Sequence[str]` if possible 

250 """The contents of the file being checked as a list of lines 

251 

252 Do **not** change the contents of this list as it may be cached. 

253 """ 

254 raise NotImplementedError 

255 

256 @property 

257 def position_codec(self) -> "LintCapablePositionCodec": 

258 raise NotImplementedError 

259 

260 @property 

261 def parsed_deb822_file_content(self) -> Optional[Deb822FileElement]: 

262 """The contents of the file being checked as a parsed deb822 file 

263 

264 This can sometimes use a cached version of the parsed file and is therefore preferable to 

265 parsing the file manually from `content` or `lines`. 

266 

267 Do **not** change the contents of this as it may be cached. 

268 """ 

269 raise NotImplementedError 

270 

271 @property 

272 def source_package(self) -> Optional[SourcePackage]: 

273 """The source package (source stanza of `debian/control`). 

274 

275 Will be `None` if the `debian/control` file cannot be parsed as a deb822 file, or if the 

276 source stanza is not available. 

277 """ 

278 raise NotImplementedError 

279 

280 @property 

281 def binary_packages(self) -> Optional[Mapping[str, BinaryPackage]]: 

282 """The binary packages (the Package stanzas of `debian/control`). 

283 

284 Will be `None` if the `debian/control` file cannot be parsed, or if no Package stanzas are 

285 available. 

286 """ 

287 raise NotImplementedError 

288 

289 @property 

290 def maint_preference_table(self) -> "MaintainerPreferenceTable": 

291 # TODO: Visible only for tests. 

292 raise NotImplementedError 

293 

294 @property 

295 def effective_preference(self) -> Optional["EffectiveFormattingPreference"]: 

296 raise NotImplementedError 

297 

298 @property 

299 def debputy_metadata(self) -> DebputyMetadata: 

300 """Information about `debputy` usage such as which integration mode is being used.""" 

301 src_pkg = self.source_package 

302 src_fields = src_pkg.fields if src_pkg else {} 

303 return DebputyMetadata.from_data( 

304 src_fields, 

305 self.dh_sequencer_data, 

306 ) 

307 

308 @property 

309 def dh_sequencer_data(self) -> DhSequencerData: 

310 """Information about the use of the `dh` sequencer 

311 

312 This includes which sequences are being used and whether the `dh` sequencer is used at all. 

313 """ 

314 raise NotImplementedError 

315 

316 @property 

317 def workspace_text_edit_support(self) -> WorkspaceTextEditSupport: 

318 raise NotImplementedError 

319 

320 @property 

321 def debputy_config(self) -> DebputyConfig: 

322 raise NotImplementedError 

323 

324 def spellchecker(self) -> "Spellchecker": 

325 checker = default_spellchecker() 

326 ignored_words = set() 

327 source_package = self.source_package 

328 binary_packages = self.binary_packages 

329 if source_package and (name := source_package.fields.get("Source")) is not None: 

330 ignored_words.add(name) 

331 if binary_packages: 

332 ignored_words.update(binary_packages.keys()) 

333 return checker.context_ignored_words(ignored_words) 

334 

335 async def slow_iter( 

336 self, 

337 iterable: Iterable[T], 

338 *, 

339 yield_every: int = 100, 

340 ) -> AsyncIterable[T]: 

341 for value in iterable: 

342 yield value 

343 

344 def translation(self, domain: str) -> Translations: 

345 return l10n.translation( 

346 domain, 

347 ) 

348 

349 def related_diagnostic_information( 

350 self, 

351 text_range: "TERange", 

352 message: str, 

353 *, 

354 doc_uri: Optional[str] = None, 

355 ) -> RelatedDiagnosticInformation: 

356 """Provide a related context for the diagnostic 

357 

358 The related diagnostic information is typically highlighted with the diagnostic. As an example, 

359 `debputy lint`'s terminal output will display the message and display the selected range after 

360 the diagnostic itself. 

361 

362 :param text_range: The text range to highlight. 

363 :param message: The message to associate with the provided text range. 

364 :param doc_uri: The URI of the document that the context is from. When omitted, the text range is 

365 assumed to be from the "current" file (the `doc_uri` attribute), which is also the default file 

366 for ranges passed to `emit_diagnostic`. 

367 :return: 

368 """ 

369 return RelatedDiagnosticInformation( 

370 text_range, 

371 message, 

372 doc_uri=doc_uri if doc_uri is not None else self.doc_uri, 

373 ) 

374 

375 def emit_diagnostic( 

376 self, 

377 text_range: "TERange", 

378 diagnostic_msg: str, 

379 severity: LintSeverity, 

380 authority_reference: str, 

381 *, 

382 quickfixes: Optional[List[dict]] = None, 

383 tags: Optional[List[types.DiagnosticTag]] = None, 

384 related_information: Optional[List[RelatedDiagnosticInformation]] = None, 

385 diagnostic_applies_to_another_file: Optional[str] = None, 

386 enable_non_interactive_auto_fix: bool = True, 

387 ) -> None: 

388 """Emit a diagnostic for an issue detected in the current file. 

389 

390 :param text_range: The text range to highlight in the file. 

391 :param diagnostic_msg: The message to show to the user for this diagnostic 

392 :param severity: The severity to associate with the diagnostic. 

393 :param authority_reference: A reference to the authority / guide that this diagnostic is a violation of. 

394 

395 Use: 

396 * "Policy 3.4.1" for Debian Policy Manual section 3.4.1 

397 (replace the section number with the relevant number for your case) 

398 * "DevRef 6.2.2" for the Debian Developer Reference section 6.2.2 

399 (replace the section number with the relevant number for your case) 

400 * "debputy" for diagnostics without a reference or where `debputy` is the authority. 

401 (This is also used for cases where `debputy` filters the result. Like with spellchecking 

402 via hunspell, where `debputy` provides its own ignore list on top) 

403 

404 If you need a new reference, feel free to add it to this list. 

405 :param quickfixes: If provided, this is a list of possible fixes for this problem. 

406 Use the quickfixes provided in `debputy.lsp.quickfixes` such as `propose_correct_text_quick_fix`. 

407 :param tags: TODO: Not yet specified (currently uses LSP format). 

408 :param related_information: Provide additional context to the diagnostic. This can be used to define 

409 the source of a conflict. As an example, for duplicate definitions, this can be used to show where 

410 the definitions are. 

411 

412 Every item should be created via the `related_diagnostic_information` method. 

413 :param enable_non_interactive_auto_fix: Allow non-interactive auto-fixing (such as via 

414 `debputy lint --auto-fix`) of this issue. Set to `False` if the check is likely to have false 

415 positives. 

416 :param diagnostic_applies_to_another_file: Special-case parameter for flagging invalid file names. 

417 Leave this one at `None`, unless you know you need it. 

418 

419 It has non-obvious semantics and is primarily useful for reporting typos of filenames such as 

420 `debian/install`, etc. 

421 """ 

422 _check_diagnostic_source(authority_reference) 

423 lsp_severity = LINT_SEVERITY2LSP_SEVERITY[severity] 

424 diag_data: DiagnosticData = { 

425 "enable_non_interactive_auto_fix": enable_non_interactive_auto_fix, 

426 } 

427 

428 if severity not in NATIVELY_LSP_SUPPORTED_SEVERITIES: 

429 diag_data["lint_severity"] = severity 

430 if quickfixes: 

431 diag_data["quickfixes"] = quickfixes 

432 if diagnostic_applies_to_another_file is not None: 

433 diag_data["report_for_related_file"] = diagnostic_applies_to_another_file 

434 

435 lsp_range_client_units = self.position_codec.range_to_client_units( 

436 self.lines, 

437 te_range_to_lsp(text_range), 

438 ) 

439 

440 if related_information and any( 440 ↛ 443line 440 didn't jump to line 443 because the condition on line 440 was never true

441 i.doc_uri != self.doc_uri for i in related_information 

442 ): 

443 raise NotImplementedError("Ranges from another document will be wrong") 

444 

445 related_lsp_format = ( 

446 [i.to_lsp(self) for i in related_information] 

447 if related_information 

448 else None 

449 ) 

450 diag = types.Diagnostic( 

451 lsp_range_client_units, 

452 diagnostic_msg, 

453 severity=lsp_severity, 

454 source=authority_reference, 

455 data=diag_data if diag_data else None, 

456 tags=tags, 

457 related_information=related_lsp_format, 

458 ) 

459 self._emit_diagnostic(diag) 

460 

461 def _emit_diagnostic(self, diagnostic: types.Diagnostic) -> None: 

462 raise NotImplementedError 

463 

464 

465CLI_WORKSPACE_TEXT_EDIT_SUPPORT = WorkspaceTextEditSupport( 

466 supports_document_changes=True, 

467) 

468 

469 

470@dataclasses.dataclass(slots=True) 

471class LintStateImpl(LintState): 

472 plugin_feature_set: PluginProvidedFeatureSet = dataclasses.field(repr=False) 

473 maint_preference_table: "MaintainerPreferenceTable" = dataclasses.field(repr=False) 

474 source_root: Optional[VirtualPathBase] 

475 debian_dir: Optional[VirtualPathBase] 

476 path: str 

477 content: str 

478 lines: List[str] 

479 debputy_config: DebputyConfig 

480 source_package: Optional[SourcePackage] = None 

481 binary_packages: Optional[Mapping[str, BinaryPackage]] = None 

482 effective_preference: Optional["EffectiveFormattingPreference"] = None 

483 lint_implementation: Optional["AsyncLinterImpl"] = None 

484 _parsed_cache: Optional[Deb822FileElement] = None 

485 _dh_sequencer_cache: Optional[DhSequencerData] = None 

486 _diagnostics: Optional[List[types.Diagnostic]] = None 

487 

488 @property 

489 def doc_uri(self) -> str: 

490 path = self.path 

491 abs_path = os.path.join(os.path.curdir, path) 

492 return f"file://{abs_path}" 

493 

494 @property 

495 def doc_version(self) -> Optional[int]: 

496 return None 

497 

498 @property 

499 def position_codec(self) -> "LintCapablePositionCodec": 

500 return LINTER_POSITION_CODEC 

501 

502 @property 

503 def parsed_deb822_file_content(self) -> Optional[Deb822FileElement]: 

504 cache = self._parsed_cache 

505 if cache is None: 

506 cache = parse_deb822_file( 

507 self.lines, 

508 accept_files_with_error_tokens=True, 

509 accept_files_with_duplicated_fields=True, 

510 ) 

511 self._parsed_cache = cache 

512 return cache 

513 

514 @property 

515 def dh_sequencer_data(self) -> DhSequencerData: 

516 dh_sequencer_cache = self._dh_sequencer_cache 

517 if dh_sequencer_cache is None: 

518 debian_dir = self.debian_dir 

519 dh_sequences: Set[str] = set() 

520 saw_dh = False 

521 src_pkg = self.source_package 

522 drules = debian_dir.get("rules") if debian_dir is not None else None 

523 if drules and drules.is_file: 

524 try: 

525 with drules.open() as fd: 

526 saw_dh = parse_drules_for_addons(fd, dh_sequences) 

527 except PureVirtualPathError: 

528 pass 

529 if src_pkg: 

530 extract_dh_addons_from_control(src_pkg.fields, dh_sequences) 

531 

532 dh_sequencer_cache = DhSequencerData( 

533 frozenset(dh_sequences), 

534 saw_dh, 

535 ) 

536 self._dh_sequencer_cache = dh_sequencer_cache 

537 return dh_sequencer_cache 

538 

539 @property 

540 def workspace_text_edit_support(self) -> WorkspaceTextEditSupport: 

541 return CLI_WORKSPACE_TEXT_EDIT_SUPPORT 

542 

543 async def gather_diagnostics(self) -> List[types.Diagnostic]: 

544 if self._diagnostics is not None: 544 ↛ 545line 544 didn't jump to line 545 because the condition on line 544 was never true

545 raise RuntimeError( 

546 "run_diagnostics cannot be run while it is already running" 

547 ) 

548 linter = self.lint_implementation 

549 if linter is None: 549 ↛ 550line 549 didn't jump to line 550 because the condition on line 549 was never true

550 raise TypeError( 

551 "run_diagnostics cannot be run:" 

552 " LintState was created without a lint implementation (such as for reformat-only)" 

553 ) 

554 self._diagnostics = diagnostics = [] 

555 

556 await linter(self) 

557 

558 self._diagnostics = None 

559 return diagnostics 

560 

561 def clear_cache(self) -> None: 

562 self._parsed_cache = None 

563 self._dh_sequencer_cache = None 

564 

565 def _emit_diagnostic(self, diagnostic: types.Diagnostic) -> None: 

566 diagnostics = self._diagnostics 

567 if diagnostics is None: 567 ↛ 568line 567 didn't jump to line 568 because the condition on line 567 was never true

568 raise TypeError("Cannot run emit_diagnostic outside of gather_diagnostics") 

569 diagnostics.append(diagnostic) 

570 

571 

572class LintDiagnosticResultState(IntEnum): 

573 REPORTED = 1 

574 MANUAL_FIXABLE = 2 

575 AUTO_FIXABLE = 3 

576 FIXED = 4 

577 

578 

579@dataclasses.dataclass(slots=True, frozen=True) 

580class LintDiagnosticResult: 

581 diagnostic: types.Diagnostic 

582 result_state: LintDiagnosticResultState 

583 invalid_marker: Optional[RuntimeError] 

584 is_file_level_diagnostic: bool 

585 has_broken_range: bool 

586 missing_severity: bool 

587 discovered_in: str 

588 report_for_related_file: Optional[str] 

589 

590 

591class LintReport: 

592 

593 def __init__(self) -> None: 

594 self.diagnostics_count: Counter[types.DiagnosticSeverity] = Counter() 

595 self.diagnostics_by_file: Mapping[str, List[LintDiagnosticResult]] = ( 

596 defaultdict(list) 

597 ) 

598 self.number_of_invalid_diagnostics: int = 0 

599 self.number_of_broken_diagnostics: int = 0 

600 self.lint_state: Optional[LintState] = None 

601 self.start_timestamp = datetime.datetime.now() 

602 self.durations: Dict[str, float] = collections.defaultdict(lambda: 0.0) 

603 self._timer = time.perf_counter() 

604 

605 @contextlib.contextmanager 

606 def line_state(self, lint_state: LintState) -> Iterable[None]: 

607 previous = self.lint_state 

608 if previous is not None: 

609 path = previous.path 

610 duration = time.perf_counter() - self._timer 

611 self.durations[path] += duration 

612 

613 self.lint_state = lint_state 

614 

615 try: 

616 self._timer = time.perf_counter() 

617 yield 

618 finally: 

619 now = time.perf_counter() 

620 duration = now - self._timer 

621 self.durations[lint_state.path] += duration 

622 self._timer = now 

623 self.lint_state = previous 

624 

625 def report_diagnostic( 

626 self, 

627 diagnostic: types.Diagnostic, 

628 *, 

629 result_state: LintDiagnosticResultState = LintDiagnosticResultState.REPORTED, 

630 in_file: Optional[str] = None, 

631 ) -> None: 

632 lint_state = self.lint_state 

633 assert lint_state is not None 

634 if in_file is None: 

635 in_file = lint_state.path 

636 discovered_in_file = in_file 

637 severity = diagnostic.severity 

638 missing_severity = False 

639 error_marker: Optional[RuntimeError] = None 

640 if severity is None: 

641 self.number_of_invalid_diagnostics += 1 

642 severity = types.DiagnosticSeverity.Warning 

643 diagnostic.severity = severity 

644 missing_severity = True 

645 

646 lines = lint_state.lines 

647 diag_range = diagnostic.range 

648 start_pos = diag_range.start 

649 end_pos = diag_range.end 

650 diag_data = diagnostic.data 

651 if isinstance(diag_data, dict): 

652 report_for_related_file = diag_data.get("report_for_related_file") 

653 if report_for_related_file is None or not isinstance( 

654 report_for_related_file, str 

655 ): 

656 report_for_related_file = None 

657 else: 

658 in_file = report_for_related_file 

659 # Force it to exist in self.durations, since subclasses can use .items() or "foo" in self.durations. 

660 if in_file not in self.durations: 

661 self.durations[in_file] = 0 

662 else: 

663 report_for_related_file = None 

664 if report_for_related_file is not None: 

665 is_file_level_diagnostic = True 

666 else: 

667 is_file_level_diagnostic = _is_file_level_diagnostic( 

668 lines, 

669 start_pos.line, 

670 start_pos.character, 

671 end_pos.line, 

672 end_pos.character, 

673 ) 

674 has_broken_range = not is_file_level_diagnostic and ( 

675 end_pos.line > len(lines) or start_pos.line < 0 

676 ) 

677 

678 if has_broken_range or missing_severity: 

679 error_marker = RuntimeError("Registration Marker for invalid diagnostic") 

680 

681 diagnostic_result = LintDiagnosticResult( 

682 diagnostic, 

683 result_state, 

684 error_marker, 

685 is_file_level_diagnostic, 

686 has_broken_range, 

687 missing_severity, 

688 report_for_related_file=report_for_related_file, 

689 discovered_in=discovered_in_file, 

690 ) 

691 

692 self.diagnostics_by_file[in_file].append(diagnostic_result) 

693 self.diagnostics_count[severity] += 1 

694 self.process_diagnostic(in_file, lint_state, diagnostic_result) 

695 

696 def process_diagnostic( 

697 self, 

698 filename: str, 

699 lint_state: LintState, 

700 diagnostic_result: LintDiagnosticResult, 

701 ) -> None: 

702 # Subclass hook 

703 pass 

704 

705 def finish_report(self) -> None: 

706 # Subclass hook 

707 pass 

708 

709 

710_LS2DEBPUTY_SEVERITY: Mapping[types.DiagnosticSeverity, LintSeverity] = { 

711 types.DiagnosticSeverity.Error: "error", 

712 types.DiagnosticSeverity.Warning: "warning", 

713 types.DiagnosticSeverity.Information: "informational", 

714 types.DiagnosticSeverity.Hint: "pedantic", 

715} 

716 

717 

718_TERM_SEVERITY2TAG = { 

719 types.DiagnosticSeverity.Error: lambda fo, lint_tag=None: fo.colored( 

720 lint_tag if lint_tag else "error", 

721 fg="red", 

722 bg="black", 

723 style="bold", 

724 ), 

725 types.DiagnosticSeverity.Warning: lambda fo, lint_tag=None: fo.colored( 

726 lint_tag if lint_tag else "warning", 

727 fg="yellow", 

728 bg="black", 

729 style="bold", 

730 ), 

731 types.DiagnosticSeverity.Information: lambda fo, lint_tag=None: fo.colored( 

732 lint_tag if lint_tag else "informational", 

733 fg="blue", 

734 bg="black", 

735 style="bold", 

736 ), 

737 types.DiagnosticSeverity.Hint: lambda fo, lint_tag=None: fo.colored( 

738 lint_tag if lint_tag else "pedantic", 

739 fg="green", 

740 bg="black", 

741 style="bold", 

742 ), 

743} 

744 

745 

746def debputy_severity(diagnostic: types.Diagnostic) -> LintSeverity: 

747 lint_tag: Optional[LintSeverity] = None 

748 if isinstance(diagnostic.data, dict): 

749 lint_tag = cast("LintSeverity", diagnostic.data.get("lint_severity")) 

750 

751 if lint_tag is not None: 

752 return lint_tag 

753 severity = diagnostic.severity 

754 if severity is None: 

755 return "warning" 

756 return _LS2DEBPUTY_SEVERITY.get(severity, "warning") 

757 

758 

759class TermLintReport(LintReport): 

760 

761 def __init__(self, fo: IOBasedOutputStyling) -> None: 

762 super().__init__() 

763 self.fo = fo 

764 

765 def finish_report(self) -> None: 

766 # Nothing to do for now 

767 pass 

768 

769 def process_diagnostic( 

770 self, 

771 filename: str, 

772 lint_state: LintState, 

773 diagnostic_result: LintDiagnosticResult, 

774 ) -> None: 

775 diagnostic = diagnostic_result.diagnostic 

776 fo = self.fo 

777 severity = diagnostic.severity 

778 assert severity is not None 

779 if diagnostic_result.result_state != LintDiagnosticResultState.FIXED: 

780 tag_unresolved = _TERM_SEVERITY2TAG[severity] 

781 lint_tag: Optional[LintSeverity] = debputy_severity(diagnostic) 

782 tag = tag_unresolved(fo, lint_tag) 

783 else: 

784 tag = fo.colored( 

785 "auto-fixing", 

786 fg="magenta", 

787 bg="black", 

788 style="bold", 

789 ) 

790 

791 if diagnostic_result.is_file_level_diagnostic: 

792 start_line = 0 

793 start_position = 0 

794 end_line = 0 

795 end_position = 0 

796 else: 

797 start_line = diagnostic.range.start.line 

798 start_position = diagnostic.range.start.character 

799 end_line = diagnostic.range.end.line 

800 end_position = diagnostic.range.end.character 

801 

802 authority = diagnostic.source 

803 assert authority is not None 

804 diag_tags = f" [{authority}]" 

805 lines = lint_state.lines 

806 line_no_format_width = len(str(len(lines))) 

807 

808 if diagnostic_result.result_state == LintDiagnosticResultState.AUTO_FIXABLE: 

809 diag_tags += "[Correctable via --auto-fix]" 

810 elif diagnostic_result.result_state == LintDiagnosticResultState.MANUAL_FIXABLE: 

811 diag_tags += "[LSP interactive quickfix]" 

812 

813 code = f"[{diagnostic.code}]: " if diagnostic.code else "" 

814 msg = f"{code}{diagnostic.message}" 

815 

816 print( 

817 f"{tag}: File: {filename}:{start_line+1}:{start_position}:{end_line+1}:{end_position}: {msg}{diag_tags}", 

818 ) 

819 if diagnostic_result.missing_severity: 

820 _warn( 

821 " This warning did not have an explicit severity; Used Warning as a fallback!" 

822 ) 

823 if diagnostic_result.result_state == LintDiagnosticResultState.FIXED: 

824 # If it is fixed, there is no reason to show additional context. 

825 return 

826 if diagnostic_result.is_file_level_diagnostic: 

827 print(" File-level diagnostic") 

828 return 

829 if diagnostic_result.has_broken_range: 

830 _warn( 

831 "Bug in the underlying linter: The line numbers of the warning does not fit in the file..." 

832 ) 

833 return 

834 self._print_range_context(diagnostic.range, lines, line_no_format_width) 

835 related_info_list = diagnostic.related_information or [] 

836 hint_tag = fo.colored( 

837 "Related information", 

838 fg="magenta", 

839 bg="black", 

840 style="bold", 

841 ) 

842 for related_info in related_info_list: 

843 if related_info.location.uri != lint_state.doc_uri: 

844 continue 

845 print(f" {hint_tag}: {related_info.message}") 

846 self._print_range_context( 

847 related_info.location.range, lines, line_no_format_width 

848 ) 

849 

850 def _print_range_context( 

851 self, 

852 print_range: types.Range, 

853 lines: List[str], 

854 line_no_format_width: int, 

855 ) -> None: 

856 lines_to_print = _lines_to_print(print_range) 

857 fo = self.fo 

858 start_line = print_range.start.line 

859 for line_no in range(start_line, start_line + lines_to_print): 

860 line = _highlight_range(fo, lines[line_no], line_no, print_range) 

861 print(f" {line_no + 1:{line_no_format_width}}: {line}") 

862 

863 

864class LinterPositionCodec: 

865 

866 def client_num_units(self, chars: str): 

867 return len(chars) 

868 

869 def position_from_client_units( 

870 self, 

871 lines: List[str], 

872 position: types.Position, 

873 ) -> types.Position: 

874 

875 if len(lines) == 0: 

876 return types.Position(0, 0) 

877 if position.line >= len(lines): 

878 return types.Position(len(lines) - 1, self.client_num_units(lines[-1])) 

879 return position 

880 

881 def position_to_client_units( 

882 self, 

883 _lines: List[str], 

884 position: types.Position, 

885 ) -> types.Position: 

886 return position 

887 

888 def range_from_client_units( 

889 self, _lines: List[str], range: types.Range 

890 ) -> types.Range: 

891 return range 

892 

893 def range_to_client_units( 

894 self, _lines: List[str], range: types.Range 

895 ) -> types.Range: 

896 return range 

897 

898 

899LINTER_POSITION_CODEC = LinterPositionCodec() 

900 

901 

902def _lines_to_print(range_: types.Range) -> int: 

903 count = range_.end.line - range_.start.line 

904 if range_.end.character > 0: 

905 count += 1 

906 return count 

907 

908 

909def _highlight_range( 

910 fo: IOBasedOutputStyling, 

911 line: str, 

912 line_no: int, 

913 range_: types.Range, 

914) -> str: 

915 line_wo_nl = line.rstrip("\r\n") 

916 start_pos = 0 

917 prefix = "" 

918 suffix = "" 

919 if line_no == range_.start.line: 

920 start_pos = range_.start.character 

921 prefix = line_wo_nl[0:start_pos] 

922 if line_no == range_.end.line: 

923 end_pos = range_.end.character 

924 suffix = line_wo_nl[end_pos:] 

925 else: 

926 end_pos = len(line_wo_nl) 

927 

928 marked_part = fo.colored(line_wo_nl[start_pos:end_pos], fg="red", style="bold") 

929 

930 return prefix + marked_part + suffix 

931 

932 

933def _is_file_level_diagnostic( 

934 lines: List[str], 

935 start_line: int, 

936 start_position: int, 

937 end_line: int, 

938 end_position: int, 

939) -> bool: 

940 if start_line != 0 or start_position != 0: 

941 return False 

942 line_count = len(lines) 

943 if end_line + 1 == line_count and end_position == 0: 

944 return True 

945 return end_line == line_count and line_count and end_position == len(lines[-1])