Coverage for src/debputy/linting/lint_util.py: 56%

426 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-10-12 15:06 +0000

1import collections 

2import contextlib 

3import dataclasses 

4import datetime 

5import os 

6import time 

7from collections import defaultdict, Counter 

8from dataclasses import fields 

9from enum import IntEnum 

10from functools import lru_cache 

11from typing import ( 

12 List, 

13 Optional, 

14 TYPE_CHECKING, 

15 cast, 

16 FrozenSet, 

17 Self, 

18 Set, 

19 Dict, 

20 Tuple, 

21 TypeVar, 

22) 

23from collections.abc import ( 

24 Callable, 

25 Mapping, 

26 Sequence, 

27 Iterable, 

28 Awaitable, 

29 AsyncIterable, 

30) 

31 

32 

33import debputy.l10n as l10n 

34from debputy.commands.debputy_cmd.output import IOBasedOutputStyling 

35from debputy.dh.dh_assistant import ( 

36 extract_dh_addons_from_control, 

37 DhSequencerData, 

38 parse_drules_for_addons, 

39) 

40from debputy.exceptions import PureVirtualPathError, DebputyRuntimeError 

41from debputy.filesystem_scan import VirtualPathBase 

42from debputy.integration_detection import determine_debputy_integration_mode 

43from debputy.l10n import Translations 

44from debputy.lsp.config.debputy_config import DebputyConfig 

45from debputy.lsp.diagnostics import ( 

46 LintSeverity, 

47 LINT_SEVERITY2LSP_SEVERITY, 

48 DiagnosticData, 

49 NATIVELY_LSP_SUPPORTED_SEVERITIES, 

50) 

51from debputy.lsp.spellchecking import Spellchecker, default_spellchecker 

52from debputy.lsp.vendoring._deb822_repro import Deb822FileElement, parse_deb822_file 

53from debputy.packages import SourcePackage, BinaryPackage 

54from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

55from debputy.plugin.api.spec import DebputyIntegrationMode 

56from debputy.util import _warn, T 

57from debputy.lsp.vendoring._deb822_repro.locatable import ( 

58 Range as TERange, 

59 Position as TEPosition, 

60 Locatable, 

61 START_POSITION, 

62) 

63 

64if TYPE_CHECKING: 

65 import lsprotocol.types as types 

66 from debputy.lsp.text_util import LintCapablePositionCodec 

67 from debputy.lsp.maint_prefs import ( 

68 MaintainerPreferenceTable, 

69 EffectiveFormattingPreference, 

70 ) 

71 

72else: 

73 import debputy.lsprotocol.types as types 

74 

75 

# Type variable constrained to `Locatable` subtypes; used by
# `with_range_in_continuous_parts` so the yielded items keep their concrete type.
L = TypeVar("L", bound=Locatable)


# Signature of an async linter: takes the `LintState`, returns an awaitable of `None`.
AsyncLinterImpl = Callable[["LintState"], Awaitable[None]]
# Signature of a formatter: takes the `LintState`, returns text edits or `None`.
FormatterImpl = Callable[["LintState"], Optional[Sequence[types.TextEdit]]]

81 

82 

# NOTE(review): presumably raised to abort a running task; no raise site is
# visible in this part of the file - confirm against the callers.
class AbortTaskError(DebputyRuntimeError):
    pass

85 

86 

# Diagnostic sources that are used verbatim (no section number attached).
# If you add a new one to this set, remember to mention it in the docs of `LintState.emit_diagnostic`
DIAG_SOURCE_WITHOUT_SECTIONS: frozenset[str] = frozenset(("debputy", "dpkg"))

# Diagnostic sources that must be followed by exactly one section (e.g. "Policy 3.4.1").
# If you add a new one to this set, remember to mention it in the docs of `LintState.emit_diagnostic`
DIAG_SOURCE_WITH_SECTIONS: frozenset[str] = frozenset(("Policy", "DevRef"))

102 

103 

def te_position_to_lsp(te_position: "TEPosition") -> types.Position:
    """Build an LSP `Position` from the line and cursor position of a `TEPosition`."""
    line = te_position.line_position
    column = te_position.cursor_position
    return types.Position(line, column)

109 

110 

def te_range_to_lsp(te_range: "TERange") -> types.Range:
    """Build an LSP `Range` from the start and end positions of a `TERange`."""
    start = te_position_to_lsp(te_range.start_pos)
    end = te_position_to_lsp(te_range.end_pos)
    return types.Range(start, end)

116 

117 

def with_range_in_continuous_parts(
    iterable: Iterable["L"],
    *,
    start_relative_to: "TEPosition" = START_POSITION,
) -> Iterable[tuple["TERange", "L"]]:
    """Pair each part with its range, laying the parts out back to back.

    The first part is placed relative to `start_relative_to`; every following
    part starts where the previous part's range ended.

    :param iterable: The parts to lay out, in order.
    :param start_relative_to: Position at which the first part starts.
    :return: Yields `(range, part)` tuples lazily.
    """
    cursor = start_relative_to
    for element in iterable:
        element_range = element.size().relative_to(cursor)
        yield element_range, element
        # The next element begins exactly where this one ended.
        cursor = element_range.end_pos

128 

129 

@lru_cache
def _check_diagnostic_source(source: str) -> None:
    """Validate a diagnostic source / authority reference.

    Accepted forms are a name from `DIAG_SOURCE_WITHOUT_SECTIONS`, or a name from
    `DIAG_SOURCE_WITH_SECTIONS` followed by one space and exactly one section.
    Results are memoized via `lru_cache`.

    :param source: The source string to validate.
    :raises ValueError: If the source is unknown, or a sectioned source does not
      carry exactly one section.
    """
    if source in DIAG_SOURCE_WITHOUT_SECTIONS:
        return
    authority, *sections = source.split(" ")
    if authority not in DIAG_SOURCE_WITH_SECTIONS:
        raise ValueError(
            f'Unknown diagnostic source: "{source}". If you are adding a new source, update lint_util.py'
        )
    if len(sections) != 1:
        raise ValueError(
            f'The diagnostic source "{source}" should have exactly one section associated with it.'
        )

144 

145 

@dataclasses.dataclass(slots=True)
class DebputyMetadata:
    """Information about how `debputy` is used by the package."""

    # Result of `determine_debputy_integration_mode`; may be `None`.
    debputy_integration_mode: DebputyIntegrationMode | None

    @classmethod
    def from_data(
        cls,
        source_fields: Mapping[str, str],
        dh_sequencer_data: DhSequencerData,
    ) -> Self:
        """Create the metadata from the source stanza fields and the dh sequencer data.

        :param source_fields: Fields of the source stanza.
        :param dh_sequencer_data: Its `sequences` are passed to the integration mode detection.
        :return: A new instance holding the detected integration mode.
        """
        return cls(
            determine_debputy_integration_mode(
                source_fields,
                dh_sequencer_data.sequences,
            )
        )

161 

162 

@dataclasses.dataclass(slots=True, frozen=True)
class RelatedDiagnosticInformation:
    """A text range plus message that gives extra context for a diagnostic."""

    # The range to highlight (in deb822 `TERange` terms).
    text_range: "TERange"
    # The message to show for the range.
    message: str
    # URI of the document the range belongs to.
    doc_uri: str

    def to_lsp(self, lint_state: "LintState") -> types.DiagnosticRelatedInformation:
        """Convert to the LSP representation.

        The range is converted to client units using the position codec and
        lines of `lint_state`.
        """
        client_range = lint_state.position_codec.range_to_client_units(
            lint_state.lines,
            te_range_to_lsp(self.text_range),
        )
        location = types.Location(self.doc_uri, client_range)
        return types.DiagnosticRelatedInformation(location, self.message)

180 

181 

182@dataclasses.dataclass(frozen=True, slots=True) 

183class WorkspaceTextEditSupport: 

184 supports_document_changes: bool = False 

185 supported_resource_operation_edit_kinds: Sequence[types.ResourceOperationKind] = ( 

186 dataclasses.field(default_factory=list) 

187 ) 

188 

189 @property 

190 def supports_versioned_text_edits(self) -> bool: 

191 return ( 

192 self.supports_document_changes 

193 or self.supported_resource_operation_edit_kinds 

194 ) 

195 

196 

class LintState:
    """State of the file being checked plus the API for emitting diagnostics.

    Most properties of this base class raise `NotImplementedError`; subclasses
    (such as `LintStateImpl`) provide the actual values.
    """

    @property
    def plugin_feature_set(self) -> PluginProvidedFeatureSet:
        """The plugin features known to the running instance of `debputy`

        This is mostly only relevant when working with `debputy.manifest`
        """
        raise NotImplementedError

    @property
    def doc_uri(self) -> str:
        """The URI for the document being scanned.

        This can be useful for providing related location ranges.
        """
        raise NotImplementedError

    @property
    def doc_version(self) -> int | None:
        """The version of the document, or `None`."""
        raise NotImplementedError

    @property
    def source_root(self) -> VirtualPathBase | None:
        """The path to the unpacked source root directory if available

        This is the directory that would contain the `debian/` directory. Note, if you need the
        `debian/` directory, please use `debian_dir` instead. There may be cases where the source
        root is unavailable but the `debian/` directory is not.
        """
        raise NotImplementedError

    @property
    def debian_dir(self) -> VirtualPathBase | None:
        """The path to the `debian/` directory if available"""
        raise NotImplementedError

    @property
    def path(self) -> str:
        """The filename or path of the file being scanned.

        Note this path may or may not be accessible to the running `debputy` instance. Nor is it guaranteed
        that the file on the file system (even if accessible) has correct contents. When doing diagnostics
        for an editor, the editor often requests diagnostics for unsaved changes.
        """
        raise NotImplementedError

    @property
    def content(self) -> str:
        """The full contents of the file being checked"""
        raise NotImplementedError

    @property
    def lines(self) -> list[str]:
        # FIXME: Replace with `Sequence[str]` if possible
        """The contents of the file being checked as a list of lines

        Do **not** change the contents of this list as it may be cached.
        """
        raise NotImplementedError

    @property
    def position_codec(self) -> "LintCapablePositionCodec":
        """The codec used to convert ranges into client units."""
        raise NotImplementedError

    @property
    def parsed_deb822_file_content(self) -> Deb822FileElement | None:
        """The contents of the file being checked as a parsed deb822 file

        This can sometimes use a cached version of the parsed file and is therefore preferable to
        parsing the file manually from `content` or `lines`.

        Do **not** change the contents of this as it may be cached.
        """
        raise NotImplementedError

    @property
    def source_package(self) -> SourcePackage | None:
        """The source package (source stanza of `debian/control`).

        Will be `None` if the `debian/control` file cannot be parsed as a deb822 file, or if the
        source stanza is not available.
        """
        raise NotImplementedError

    @property
    def binary_packages(self) -> Mapping[str, BinaryPackage] | None:
        """The binary packages (the Package stanzas of `debian/control`).

        Will be `None` if the `debian/control` file cannot be parsed, or if no Package stanzas are
        available.
        """
        raise NotImplementedError

    @property
    def maint_preference_table(self) -> "MaintainerPreferenceTable":
        # TODO: Visible only for tests.
        raise NotImplementedError

    @property
    def effective_preference(self) -> Optional["EffectiveFormattingPreference"]:
        """The effective formatting preference, or `None`."""
        raise NotImplementedError

    @property
    def debputy_metadata(self) -> DebputyMetadata:
        """Information about `debputy` usage such as which integration mode is being used."""
        source_stanza = self.source_package
        if source_stanza:
            stanza_fields = source_stanza.fields
        else:
            # No source stanza available: detect based on the dh data alone.
            stanza_fields = {}
        return DebputyMetadata.from_data(stanza_fields, self.dh_sequencer_data)

    @property
    def dh_sequencer_data(self) -> DhSequencerData:
        """Information about the use of the `dh` sequencer

        This includes which sequences are being used and whether the `dh` sequencer is used at all.
        """
        raise NotImplementedError

    @property
    def workspace_text_edit_support(self) -> WorkspaceTextEditSupport:
        """Which kinds of workspace text edits are supported."""
        raise NotImplementedError

    @property
    def debputy_config(self) -> DebputyConfig:
        """The `debputy` configuration in effect."""
        raise NotImplementedError

    def spellchecker(self) -> "Spellchecker":
        """Return the default spellchecker with package names added as ignored words.

        The `Source` field of the source package (when present) and the names of
        all binary packages are ignored by the returned spellchecker context.
        """
        checker = default_spellchecker()
        words_to_ignore = set()
        src_pkg = self.source_package
        bin_pkgs = self.binary_packages
        if src_pkg:
            source_name = src_pkg.fields.get("Source")
            if source_name is not None:
                words_to_ignore.add(source_name)
        if bin_pkgs:
            words_to_ignore.update(bin_pkgs.keys())
        return checker.context_ignored_words(words_to_ignore)

    async def slow_iter(
        self,
        iterable: Iterable[T],
        *,
        yield_every: int = 100,
    ) -> AsyncIterable[T]:
        """Iterate over `iterable` as an async iterable.

        This base implementation yields every value directly and does not use
        `yield_every`.
        """
        for item in iterable:
            yield item

    def translation(self, domain: str) -> Translations:
        """Look up the translations for `domain` via `debputy.l10n`."""
        return l10n.translation(domain)

    def related_diagnostic_information(
        self,
        text_range: "TERange",
        message: str,
        *,
        doc_uri: str | None = None,
    ) -> RelatedDiagnosticInformation:
        """Provide a related context for the diagnostic

        The related diagnostic information is typically highlighted with the diagnostic. As an example,
        `debputy lint`'s terminal output will display the message and display the selected range after
        the diagnostic itself.

        :param text_range: The text range to highlight.
        :param message: The message to associate with the provided text range.
        :param doc_uri: The URI of the document that the context is from. When omitted, the text range is
          assumed to be from the "current" file (the `doc_uri` attribute), which is also the default file
          for ranges passed to `emit_diagnostic`.
        :return: The related information, ready to be passed to `emit_diagnostic`.
        """
        effective_uri = self.doc_uri if doc_uri is None else doc_uri
        return RelatedDiagnosticInformation(
            text_range,
            message,
            doc_uri=effective_uri,
        )

    def emit_diagnostic(
        self,
        text_range: "TERange",
        diagnostic_msg: str,
        severity: LintSeverity,
        authority_reference: str,
        *,
        quickfixes: list[dict] | None = None,
        tags: list[types.DiagnosticTag] | None = None,
        related_information: list[RelatedDiagnosticInformation] | None = None,
        diagnostic_applies_to_another_file: str | None = None,
        enable_non_interactive_auto_fix: bool = True,
    ) -> None:
        """Emit a diagnostic for an issue detected in the current file.

        :param text_range: The text range to highlight in the file.
        :param diagnostic_msg: The message to show to the user for this diagnostic
        :param severity: The severity to associate with the diagnostic.
        :param authority_reference: A reference to the authority / guide that this diagnostic is a violation of.

          Use:
           * "Policy 3.4.1" for Debian Policy Manual section 3.4.1
             (replace the section number with the relevant number for your case)
           * "DevRef 6.2.2" for the Debian Developer Reference section 6.2.2
             (replace the section number with the relevant number for your case)
           * "debputy" for diagnostics without a reference or where `debputy` is the authority.
             (This is also used for cases where `debputy` filters the result. Like with spellchecking
              via hunspell, where `debputy` provides its own ignore list on top)

          If you need a new reference, feel free to add it to this list.
        :param quickfixes: If provided, this is a list of possible fixes for this problem.
          Use the quickfixes provided in `debputy.lsp.quickfixes` such as `propose_correct_text_quick_fix`.
        :param tags: TODO: Not yet specified (currently uses LSP format).
        :param related_information: Provide additional context to the diagnostic. This can be used to define
          the source of a conflict. As an example, for duplicate definitions, this can be used to show where
          the definitions are.

          Every item should be created via the `related_diagnostic_information` method.
        :param enable_non_interactive_auto_fix: Allow non-interactive auto-fixing (such as via
          `debputy lint --auto-fix`) of this issue. Set to `False` if the check is likely to have false
          positives.
        :param diagnostic_applies_to_another_file: Special-case parameter for flagging invalid file names.
          Leave this one at `None`, unless you know you need it.

          It has non-obvious semantics and is primarily useful for reporting typos of filenames such as
          `debian/install`, etc.
        :raises ValueError: If `authority_reference` is not a recognised source.
        :raises NotImplementedError: If any related information refers to another document.
        """
        _check_diagnostic_source(authority_reference)
        lsp_severity = LINT_SEVERITY2LSP_SEVERITY[severity]

        diag_data: DiagnosticData = {
            "enable_non_interactive_auto_fix": enable_non_interactive_auto_fix,
        }
        # Severities LSP cannot express natively are carried in the data payload.
        if severity not in NATIVELY_LSP_SUPPORTED_SEVERITIES:
            diag_data["lint_severity"] = severity
        if quickfixes:
            diag_data["quickfixes"] = quickfixes
        if diagnostic_applies_to_another_file is not None:
            diag_data["report_for_related_file"] = diagnostic_applies_to_another_file

        client_range = self.position_codec.range_to_client_units(
            self.lines,
            te_range_to_lsp(text_range),
        )

        related_lsp_format = None
        if related_information:
            # Related ranges are converted with this document's lines, so they
            # must all belong to this document.
            for info in related_information:
                if info.doc_uri != self.doc_uri:
                    raise NotImplementedError(
                        "Ranges from another document will be wrong"
                    )
            related_lsp_format = [info.to_lsp(self) for info in related_information]

        self._emit_diagnostic(
            types.Diagnostic(
                client_range,
                diagnostic_msg,
                severity=lsp_severity,
                source=authority_reference,
                data=diag_data or None,
                tags=tags,
                related_information=related_lsp_format,
            )
        )

    def _emit_diagnostic(self, diagnostic: types.Diagnostic) -> None:
        """Record the finished LSP diagnostic; must be provided by subclasses."""
        raise NotImplementedError

465 

466 

# Shared text edit support description returned by
# `LintStateImpl.workspace_text_edit_support`: document changes are supported.
CLI_WORKSPACE_TEXT_EDIT_SUPPORT = WorkspaceTextEditSupport(
    supports_document_changes=True,
)

470 

471 

@dataclasses.dataclass(slots=True)
class LintStateImpl(LintState):
    """Concrete `LintState` backed by plain fields and lazily computed caches.

    Diagnostics emitted through `emit_diagnostic` are collected while
    `gather_diagnostics` is running.
    """

    plugin_feature_set: PluginProvidedFeatureSet = dataclasses.field(repr=False)
    maint_preference_table: "MaintainerPreferenceTable" = dataclasses.field(repr=False)
    source_root: VirtualPathBase | None
    debian_dir: VirtualPathBase | None
    path: str
    content: str
    lines: list[str]
    debputy_config: DebputyConfig
    source_package: SourcePackage | None = None
    binary_packages: Mapping[str, BinaryPackage] | None = None
    effective_preference: Optional["EffectiveFormattingPreference"] = None
    # The linter to run from `gather_diagnostics`; may be `None` (the error
    # message there mentions reformat-only use).
    lint_implementation: Optional["AsyncLinterImpl"] = None
    # Lazily populated by `parsed_deb822_file_content`; reset by `clear_cache`.
    _parsed_cache: Deb822FileElement | None = None
    # Lazily populated by `dh_sequencer_data`; reset by `clear_cache`.
    _dh_sequencer_cache: DhSequencerData | None = None
    # Non-`None` only while `gather_diagnostics` is running.
    _diagnostics: list[types.Diagnostic] | None = None

    @property
    def doc_uri(self) -> str:
        """A `file://` URI built from `path`."""
        path = self.path
        # NOTE(review): despite the name, this is `path` joined onto
        # `os.path.curdir` (i.e. "."), not an absolute path - confirm that the
        # resulting URI form is what consumers expect.
        abs_path = os.path.join(os.path.curdir, path)
        return f"file://{abs_path}"

    @property
    def doc_version(self) -> int | None:
        """Always `None` for this implementation."""
        return None

    @property
    def position_codec(self) -> "LintCapablePositionCodec":
        """The shared `LINTER_POSITION_CODEC` instance."""
        return LINTER_POSITION_CODEC

    @property
    def parsed_deb822_file_content(self) -> Deb822FileElement | None:
        """The lines parsed as a deb822 file; parsed on first access and then cached.

        Files with error tokens or duplicated fields are accepted by the parser call.
        """
        cache = self._parsed_cache
        if cache is None:
            cache = parse_deb822_file(
                self.lines,
                accept_files_with_error_tokens=True,
                accept_files_with_duplicated_fields=True,
            )
            self._parsed_cache = cache
        return cache

    @property
    def dh_sequencer_data(self) -> DhSequencerData:
        """The dh sequencer data; computed on first access and then cached.

        Sequences are collected from the `rules` file in the `debian/` directory
        (when available and a file) and from the source package fields.
        """
        dh_sequencer_cache = self._dh_sequencer_cache
        if dh_sequencer_cache is None:
            debian_dir = self.debian_dir
            dh_sequences: set[str] = set()
            saw_dh = False
            src_pkg = self.source_package
            drules = debian_dir.get("rules") if debian_dir is not None else None
            if drules and drules.is_file:
                try:
                    with drules.open() as fd:
                        saw_dh = parse_drules_for_addons(fd, dh_sequences)
                except PureVirtualPathError:
                    # The rules file cannot be opened for a purely virtual
                    # path; continue without its data.
                    pass
            if src_pkg:
                extract_dh_addons_from_control(src_pkg.fields, dh_sequences)

            dh_sequencer_cache = DhSequencerData(
                frozenset(dh_sequences),
                saw_dh,
            )
            self._dh_sequencer_cache = dh_sequencer_cache
        return dh_sequencer_cache

    @property
    def workspace_text_edit_support(self) -> WorkspaceTextEditSupport:
        """The shared `CLI_WORKSPACE_TEXT_EDIT_SUPPORT` instance."""
        return CLI_WORKSPACE_TEXT_EDIT_SUPPORT

    async def gather_diagnostics(self) -> list[types.Diagnostic]:
        """Run the lint implementation and return the diagnostics it emitted.

        :return: The diagnostics emitted during the run, in emission order.
        :raises RuntimeError: If a run is already in progress on this instance.
        :raises TypeError: If no lint implementation was provided.
        """
        if self._diagnostics is not None:
            raise RuntimeError(
                "gather_diagnostics cannot be run while it is already running"
            )
        linter = self.lint_implementation
        if linter is None:
            raise TypeError(
                "gather_diagnostics cannot be run:"
                " LintState was created without a lint implementation (such as for reformat-only)"
            )
        self._diagnostics = diagnostics = []

        try:
            await linter(self)
        finally:
            # Always leave the "running" state, even when the linter raises;
            # otherwise every later call would fail with "already running".
            self._diagnostics = None
        return diagnostics

    def clear_cache(self) -> None:
        """Drop the cached deb822 parse result and dh sequencer data."""
        self._parsed_cache = None
        self._dh_sequencer_cache = None

    def _emit_diagnostic(self, diagnostic: types.Diagnostic) -> None:
        """Append the diagnostic to the list of the current `gather_diagnostics` run.

        :raises TypeError: If called while no `gather_diagnostics` run is active.
        """
        diagnostics = self._diagnostics
        if diagnostics is None:
            raise TypeError("Cannot run emit_diagnostic outside of gather_diagnostics")
        diagnostics.append(diagnostic)

572 

573 

class LintDiagnosticResultState(IntEnum):
    """How a reported diagnostic was (or can be) dealt with."""

    # Default state used by `LintReport.report_diagnostic`.
    REPORTED = 1
    # Shown on the terminal as "[LSP interactive quickfix]".
    MANUAL_FIXABLE = 2
    # Shown on the terminal as "[Correctable via --auto-fix]".
    AUTO_FIXABLE = 3
    # Shown on the terminal with the "auto-fixing" tag; no context is printed.
    FIXED = 4

579 

580 

@dataclasses.dataclass(slots=True, frozen=True)
class LintDiagnosticResult:
    """A diagnostic together with the bookkeeping computed by `LintReport.report_diagnostic`."""

    # The LSP diagnostic itself.
    diagnostic: types.Diagnostic
    # How the diagnostic was (or can be) dealt with.
    result_state: LintDiagnosticResultState
    # A `RuntimeError` instance when the range is broken or the severity was
    # missing; otherwise `None`.
    invalid_marker: RuntimeError | None
    # Whether the diagnostic applies to the file as a whole.
    is_file_level_diagnostic: bool
    # Whether the range does not fit in the file.
    has_broken_range: bool
    # Whether the diagnostic came without a severity.
    missing_severity: bool
    # The file in which the diagnostic was discovered.
    discovered_in: str
    # The related file the diagnostic is reported for, if any.
    report_for_related_file: str | None

591 

592 

class LintReport:
    """Collects diagnostics per file, together with counters and timing data.

    Subclasses can hook into `process_diagnostic` and `finish_report`.
    """

    def __init__(self) -> None:
        self.diagnostics_count: Counter[types.DiagnosticSeverity] = Counter()
        self.diagnostics_by_file: Mapping[str, list[LintDiagnosticResult]] = (
            defaultdict(list)
        )
        self.number_of_invalid_diagnostics: int = 0
        self.number_of_broken_diagnostics: int = 0
        # The lint state currently being reported on (set via `line_state`).
        self.lint_state: LintState | None = None
        self.start_timestamp = datetime.datetime.now()
        # Accumulated time per file path.
        self.durations: dict[str, float] = collections.defaultdict(float)
        self._timer = time.perf_counter()

    @contextlib.contextmanager
    def line_state(self, lint_state: LintState) -> Iterable[None]:
        """Make `lint_state` the active state for the duration of the `with` block.

        Time spent inside the block is added to `durations` under the path of
        `lint_state`. When another state was active on entry, the time elapsed
        so far is credited to that state first, and it becomes active again on exit.
        """
        outer_state = self.lint_state
        if outer_state is not None:
            # Credit the time elapsed so far to the state being suspended.
            self.durations[outer_state.path] += time.perf_counter() - self._timer

        self.lint_state = lint_state

        try:
            self._timer = time.perf_counter()
            yield
        finally:
            finished_at = time.perf_counter()
            self.durations[lint_state.path] += finished_at - self._timer
            self._timer = finished_at
            self.lint_state = outer_state

    def report_diagnostic(
        self,
        diagnostic: types.Diagnostic,
        *,
        result_state: LintDiagnosticResultState = LintDiagnosticResultState.REPORTED,
        in_file: str | None = None,
    ) -> None:
        """Register a diagnostic for the active lint state.

        A diagnostic without severity is counted as invalid and gets `Warning`
        assigned. When the diagnostic data names a related file (the
        `report_for_related_file` key holding a string), the diagnostic is filed
        under that file and treated as file-level.

        :param diagnostic: The diagnostic to register.
        :param result_state: How the diagnostic was (or can be) dealt with.
        :param in_file: The file the diagnostic was found in; defaults to the path
          of the active lint state.
        """
        lint_state = self.lint_state
        assert lint_state is not None
        if in_file is None:
            in_file = lint_state.path
        discovered_in_file = in_file

        severity = diagnostic.severity
        missing_severity = severity is None
        if missing_severity:
            self.number_of_invalid_diagnostics += 1
            severity = types.DiagnosticSeverity.Warning
            diagnostic.severity = severity

        lines = lint_state.lines
        diag_range = diagnostic.range
        range_start = diag_range.start
        range_end = diag_range.end

        report_for_related_file = None
        diag_data = diagnostic.data
        if isinstance(diag_data, dict):
            candidate = diag_data.get("report_for_related_file")
            if isinstance(candidate, str):
                report_for_related_file = candidate
                in_file = candidate
                # Force it to exist in self.durations, since subclasses can use .items() or "foo" in self.durations.
                if in_file not in self.durations:
                    self.durations[in_file] = 0

        if report_for_related_file is None:
            is_file_level_diagnostic = _is_file_level_diagnostic(
                lines,
                range_start.line,
                range_start.character,
                range_end.line,
                range_end.character,
            )
        else:
            is_file_level_diagnostic = True

        has_broken_range = not is_file_level_diagnostic and (
            range_end.line > len(lines) or range_start.line < 0
        )

        error_marker: RuntimeError | None = None
        if has_broken_range or missing_severity:
            error_marker = RuntimeError("Registration Marker for invalid diagnostic")

        diagnostic_result = LintDiagnosticResult(
            diagnostic,
            result_state,
            error_marker,
            is_file_level_diagnostic,
            has_broken_range,
            missing_severity,
            report_for_related_file=report_for_related_file,
            discovered_in=discovered_in_file,
        )

        self.diagnostics_by_file[in_file].append(diagnostic_result)
        self.diagnostics_count[severity] += 1
        self.process_diagnostic(in_file, lint_state, diagnostic_result)

    def process_diagnostic(
        self,
        filename: str,
        lint_state: LintState,
        diagnostic_result: LintDiagnosticResult,
    ) -> None:
        """Subclass hook; called for every diagnostic registered via `report_diagnostic`."""

    def finish_report(self) -> None:
        """Subclass hook; does nothing in this base class."""

710 

711 

# Maps each LSP severity to the `debputy` lint severity name used by
# `debputy_severity` when the diagnostic carries no explicit `lint_severity`.
_LS2DEBPUTY_SEVERITY: Mapping[types.DiagnosticSeverity, LintSeverity] = {
    types.DiagnosticSeverity.Error: "error",
    types.DiagnosticSeverity.Warning: "warning",
    types.DiagnosticSeverity.Information: "informational",
    types.DiagnosticSeverity.Hint: "pedantic",
}

718 

719 

def _severity_tag_renderer(default_label, color):
    """Create a callable `(fo, lint_tag=None)` rendering a bold, colored severity tag.

    The rendered text is `lint_tag` when it is truthy and `default_label` otherwise.
    """

    def _render(fo, lint_tag=None):
        return fo.colored(
            lint_tag if lint_tag else default_label,
            fg=color,
            bg="black",
            style="bold",
        )

    return _render


# Maps each LSP severity to the renderer of its terminal tag.
_TERM_SEVERITY2TAG = {
    types.DiagnosticSeverity.Error: _severity_tag_renderer("error", "red"),
    types.DiagnosticSeverity.Warning: _severity_tag_renderer("warning", "yellow"),
    types.DiagnosticSeverity.Information: _severity_tag_renderer(
        "informational", "blue"
    ),
    types.DiagnosticSeverity.Hint: _severity_tag_renderer("pedantic", "green"),
}

746 

747 

def debputy_severity(diagnostic: types.Diagnostic) -> LintSeverity:
    """Determine the `debputy` lint severity of a diagnostic.

    An explicit `lint_severity` entry in the diagnostic data (when the data is a
    dict) wins. Otherwise the LSP severity is mapped via `_LS2DEBPUTY_SEVERITY`,
    with "warning" used when the severity is missing or unmapped.
    """
    payload = diagnostic.data
    if isinstance(payload, dict):
        explicit = cast("LintSeverity", payload.get("lint_severity"))
        if explicit is not None:
            return explicit
    lsp_severity = diagnostic.severity
    if lsp_severity is None:
        return "warning"
    return _LS2DEBPUTY_SEVERITY.get(lsp_severity, "warning")

759 

760 

class TermLintReport(LintReport):
    """A `LintReport` that prints every diagnostic to the terminal as it is registered."""

    def __init__(self, fo: IOBasedOutputStyling) -> None:
        """
        :param fo: The output styling used for coloring the printed text.
        """
        super().__init__()
        self.fo = fo

    def finish_report(self) -> None:
        # Nothing to do for now
        pass

    def process_diagnostic(
        self,
        filename: str,
        lint_state: LintState,
        diagnostic_result: LintDiagnosticResult,
    ) -> None:
        """Print the diagnostic, followed by the affected lines where applicable.

        The headline is always printed. No line context is printed for fixed
        diagnostics, file-level diagnostics or diagnostics with a broken range.
        Related information is printed only when it belongs to the document of
        `lint_state`.

        :param filename: The file name to show in the headline.
        :param lint_state: Provides the lines and the document URI.
        :param diagnostic_result: The diagnostic and its bookkeeping data.
        """
        diagnostic = diagnostic_result.diagnostic
        fo = self.fo
        severity = diagnostic.severity
        assert severity is not None
        if diagnostic_result.result_state != LintDiagnosticResultState.FIXED:
            tag_unresolved = _TERM_SEVERITY2TAG[severity]
            lint_tag: LintSeverity | None = debputy_severity(diagnostic)
            tag = tag_unresolved(fo, lint_tag)
        else:
            tag = fo.colored(
                "auto-fixing",
                fg="magenta",
                bg="black",
                style="bold",
            )

        if diagnostic_result.is_file_level_diagnostic:
            start_line = 0
            start_position = 0
            end_line = 0
            end_position = 0
        else:
            start_line = diagnostic.range.start.line
            start_position = diagnostic.range.start.character
            end_line = diagnostic.range.end.line
            end_position = diagnostic.range.end.character

        authority = diagnostic.source
        assert authority is not None
        diag_tags = f" [{authority}]"
        lines = lint_state.lines
        # Width needed to print the largest line number of the file.
        line_no_format_width = len(str(len(lines)))

        if diagnostic_result.result_state == LintDiagnosticResultState.AUTO_FIXABLE:
            diag_tags += "[Correctable via --auto-fix]"
        elif diagnostic_result.result_state == LintDiagnosticResultState.MANUAL_FIXABLE:
            diag_tags += "[LSP interactive quickfix]"

        code = f"[{diagnostic.code}]: " if diagnostic.code else ""
        msg = f"{code}{diagnostic.message}"

        print(
            f"{tag}: File: {filename}:{start_line+1}:{start_position}:{end_line+1}:{end_position}: {msg}{diag_tags}",
        )
        if diagnostic_result.missing_severity:
            _warn(
                " This warning did not have an explicit severity; Used Warning as a fallback!"
            )
        if diagnostic_result.result_state == LintDiagnosticResultState.FIXED:
            # If it is fixed, there is no reason to show additional context.
            return
        if diagnostic_result.is_file_level_diagnostic:
            print(" File-level diagnostic")
            return
        if diagnostic_result.has_broken_range:
            _warn(
                "Bug in the underlying linter: The line numbers of the warning does not fit in the file..."
            )
            return
        self._print_range_context(diagnostic.range, lines, line_no_format_width)
        related_info_list = diagnostic.related_information or []
        hint_tag = fo.colored(
            "Related information",
            fg="magenta",
            bg="black",
            style="bold",
        )
        for related_info in related_info_list:
            # The lines at hand are those of this document; ranges from other
            # documents cannot be shown with them.
            if related_info.location.uri != lint_state.doc_uri:
                continue
            print(f" {hint_tag}: {related_info.message}")
            self._print_range_context(
                related_info.location.range, lines, line_no_format_width
            )

    def _print_range_context(
        self,
        print_range: types.Range,
        lines: list[str],
        line_no_format_width: int,
    ) -> None:
        """Print the lines covered by `print_range` with the range highlighted.

        :param print_range: The range to print.
        :param lines: The lines of the file the range refers to.
        :param line_no_format_width: Minimum width used for the line numbers.
        """
        lines_to_print = _lines_to_print(print_range)
        fo = self.fo
        start_line = print_range.start.line
        # Clamp to the lines that exist. A range ending on line `len(lines)`
        # with a non-zero character is not flagged as broken by
        # `report_diagnostic`, and the ranges of related information are not
        # validated at all; without the clamp those raise `IndexError`.
        first_line = max(start_line, 0)
        stop_line = min(start_line + lines_to_print, len(lines))
        for line_no in range(first_line, stop_line):
            line = _highlight_range(fo, lines[line_no], line_no, print_range)
            print(f" {line_no + 1:{line_no_format_width}}: {line}")

864 

865 

class LinterPositionCodec:
    """Position codec for the linter.

    Client units are counted with `len`, and positions / ranges are passed
    through unchanged, except that `position_from_client_units` clamps
    positions beyond the last line.
    """

    def client_num_units(self, chars: str):
        """Return the number of client units in `chars` (its length)."""
        return len(chars)

    def position_from_client_units(
        self,
        lines: list[str],
        position: types.Position,
    ) -> types.Position:
        """Return `position`, clamped to the end of the last line when out of bounds.

        For an empty `lines`, position (0, 0) is returned.
        """
        line_count = len(lines)
        if not line_count:
            return types.Position(0, 0)
        if position.line < line_count:
            return position
        # Beyond the last line: use the end of the last line instead.
        return types.Position(line_count - 1, self.client_num_units(lines[-1]))

    def position_to_client_units(
        self,
        _lines: list[str],
        position: types.Position,
    ) -> types.Position:
        """Return `position` unchanged."""
        return position

    def range_from_client_units(
        self, _lines: list[str], range: types.Range
    ) -> types.Range:
        """Return `range` unchanged."""
        return range

    def range_to_client_units(
        self, _lines: list[str], range: types.Range
    ) -> types.Range:
        """Return `range` unchanged."""
        return range

899 

900 

# Shared codec instance returned by `LintStateImpl.position_codec`.
LINTER_POSITION_CODEC = LinterPositionCodec()

902 

903 

904def _lines_to_print(range_: types.Range) -> int: 

905 count = range_.end.line - range_.start.line 

906 if range_.end.character > 0: 

907 count += 1 

908 return count 

909 

910 

911def _highlight_range( 

912 fo: IOBasedOutputStyling, 

913 line: str, 

914 line_no: int, 

915 range_: types.Range, 

916) -> str: 

917 line_wo_nl = line.rstrip("\r\n") 

918 start_pos = 0 

919 prefix = "" 

920 suffix = "" 

921 if line_no == range_.start.line: 

922 start_pos = range_.start.character 

923 prefix = line_wo_nl[0:start_pos] 

924 if line_no == range_.end.line: 

925 end_pos = range_.end.character 

926 suffix = line_wo_nl[end_pos:] 

927 else: 

928 end_pos = len(line_wo_nl) 

929 

930 marked_part = fo.colored(line_wo_nl[start_pos:end_pos], fg="red", style="bold") 

931 

932 return prefix + marked_part + suffix 

933 

934 

935def _is_file_level_diagnostic( 

936 lines: list[str], 

937 start_line: int, 

938 start_position: int, 

939 end_line: int, 

940 end_position: int, 

941) -> bool: 

942 if start_line != 0 or start_position != 0: 

943 return False 

944 line_count = len(lines) 

945 if end_line + 1 == line_count and end_position == 0: 

946 return True 

947 return end_line == line_count and line_count and end_position == len(lines[-1])