Coverage for src/debputy/lsp/debputy_ls.py: 54%

401 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-03-24 16:38 +0000

1import asyncio 

2import dataclasses 

3import os 

4import time 

5from collections.abc import AsyncIterable 

6from typing import ( 

7 Optional, 

8 List, 

9 Any, 

10 Mapping, 

11 Container, 

12 TYPE_CHECKING, 

13 Tuple, 

14 Literal, 

15 Set, 

16 Dict, 

17 Iterable, 

18) 

19 

20import debputy.l10n as l10n 

21from debputy.dh.dh_assistant import ( 

22 parse_drules_for_addons, 

23 DhSequencerData, 

24 extract_dh_addons_from_control, 

25) 

26from debputy.filesystem_scan import FSROOverlay, VirtualPathBase 

27from debputy.l10n import Translations 

28from debputy.linting.lint_util import ( 

29 LintState, 

30 AsyncLinterImpl, 

31 AbortTaskError, 

32) 

33from debputy.lsp.apt_cache import AptCache 

34from debputy.lsp.diagnostics import DiagnosticReport 

35from debputy.lsp.maint_prefs import ( 

36 MaintainerPreferenceTable, 

37 MaintainerPreference, 

38 determine_effective_preference, 

39) 

40from debputy.lsp.text_util import LintCapablePositionCodec 

41from debputy.lsp.vendoring._deb822_repro import Deb822FileElement, parse_deb822_file 

42from debputy.packages import ( 

43 SourcePackage, 

44 BinaryPackage, 

45 DctrlParser, 

46) 

47from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

48from debputy.util import _info, _warn, T 

49from debputy.yaml import MANIFEST_YAML, YAMLError 

50from debputy.yaml.compat import CommentedMap 

51 

if TYPE_CHECKING:
    import lsprotocol.types as types
    from pygls.server import LanguageServer
    from pygls.workspace import TextDocument
    from pygls.uris import from_fs_path

else:
    import debputy.lsprotocol.types as types

    try:
        from pygls.server import LanguageServer
        from pygls.workspace import TextDocument
        from pygls.uris import from_fs_path
    except ImportError as e:
        # Python unbinds the `as` target when the except clause ends, so the
        # exception is stored under a separate module-level name; otherwise the
        # placeholder below would fail with NameError rather than re-raising
        # the original ImportError.
        _pygls_import_error = e

        class LanguageServer:
            """Stand-in used when pygls cannot be imported.

            Instantiating it re-raises the ImportError that was recorded while
            importing pygls.
            """

            def __init__(self, *args, **kwargs) -> None:
                """Placeholder to work if pygls is not installed"""
                # Should not be called
                raise _pygls_import_error  # pragma: no cover

73 

74 

75@dataclasses.dataclass(slots=True) 

76class FileCache: 

77 doc_uri: str 

78 path: str 

79 is_open_in_editor: Optional[bool] = None 

80 last_doc_version: Optional[int] = None 

81 last_mtime: Optional[float] = None 

82 is_valid: bool = False 

83 

84 def _update_cache(self, doc: "TextDocument", source: str) -> None: 

85 raise NotImplementedError 

86 

87 def _clear_cache(self) -> None: 

88 raise NotImplementedError 

89 

90 def resolve_cache(self, ls: "DebputyLanguageServer") -> bool: 

91 doc = ls.workspace.text_documents.get(self.doc_uri) 

92 if doc is None: 92 ↛ 93line 92 didn't jump to line 93 because the condition on line 92 was never true

93 doc = ls.workspace.get_text_document(self.doc_uri) 

94 is_open = False 

95 else: 

96 is_open = True 

97 new_content: Optional[str] = None 

98 if is_open: 98 ↛ 110line 98 didn't jump to line 110 because the condition on line 98 was always true

99 last_doc_version = self.last_doc_version 

100 dctrl_doc_version = doc.version 

101 if ( 101 ↛ 109line 101 didn't jump to line 109

102 not self.is_open_in_editor 

103 or last_doc_version is None 

104 or dctrl_doc_version is None 

105 or last_doc_version < dctrl_doc_version 

106 ): 

107 new_content = doc.source 

108 

109 self.last_doc_version = doc.version 

110 elif doc.uri.startswith("file://"): 

111 try: 

112 with open(self.path) as fd: 

113 st = os.fstat(fd.fileno()) 

114 current_mtime = st.st_mtime 

115 last_mtime = self.last_mtime or current_mtime - 1 

116 if self.is_open_in_editor or current_mtime > last_mtime: 

117 new_content = fd.read() 

118 self.last_mtime = current_mtime 

119 except FileNotFoundError: 

120 self._clear_cache() 

121 self.is_valid = False 

122 return False 

123 if new_content is not None: 123 ↛ 125line 123 didn't jump to line 125 because the condition on line 123 was always true

124 self._update_cache(doc, new_content) 

125 self.is_valid = True 

126 return True 

127 

128 

@dataclasses.dataclass(slots=True)
class Deb822FileCache(FileCache):
    """File cache holding the parsed deb822 representation of the document."""

    deb822_file: Optional[Deb822FileElement] = None

    def _update_cache(self, doc: "TextDocument", source: str) -> None:
        """Parse `source` as deb822 and store the result.

        The parser is asked to accept files with error tokens and with
        duplicated fields.
        """
        content_lines = source.splitlines(keepends=True)
        self.deb822_file = parse_deb822_file(
            content_lines,
            accept_files_with_error_tokens=True,
            accept_files_with_duplicated_fields=True,
        )

    def _clear_cache(self) -> None:
        """Forget the parsed deb822 file."""
        self.deb822_file = None

143 

144 

@dataclasses.dataclass(slots=True)
class DctrlFileCache(Deb822FileCache):
    """File cache for `debian/control` that also keeps the parsed packages."""

    dctrl_parser: Optional[DctrlParser] = None
    source_package: Optional[SourcePackage] = None
    binary_packages: Optional[Mapping[str, BinaryPackage]] = None

    def _update_cache(self, doc: "TextDocument", source: str) -> None:
        """Parse `source` with the configured `dctrl_parser` and store the results.

        The parser is invoked with `ignore_errors=True`.
        """
        deb822_file, source_package, binary_packages = (
            self.dctrl_parser.parse_source_debian_control(
                source.splitlines(keepends=True),
                ignore_errors=True,
            )
        )
        self.deb822_file = deb822_file
        self.source_package = source_package
        self.binary_packages = binary_packages

    def _clear_cache(self) -> None:
        """Forget the parsed deb822 file and the derived package data."""
        # `dataclass(slots=True)` rebuilds the class, which breaks zero-argument
        # `super()` on Python versions before 3.14 (TypeError at call time).
        # The explicit two-argument form resolves the class by name and works
        # on all versions.
        super(DctrlFileCache, self)._clear_cache()
        self.source_package = None
        self.binary_packages = None

166 

167 

@dataclasses.dataclass(slots=True)
class SalsaCICache(FileCache):
    """File cache holding the YAML mapping parsed from a CI configuration file."""

    parsed_content: Optional[CommentedMap] = None

    def _update_cache(self, doc: "TextDocument", source: str) -> None:
        """Load `source` as YAML and keep it if the top-level value is a mapping.

        On a YAML error, or when the top-level value is not a `CommentedMap`,
        the previously stored content is left untouched.
        """
        try:
            loaded = MANIFEST_YAML.load(source)
        except YAMLError:
            return
        if not isinstance(loaded, CommentedMap):
            return
        self.parsed_content = loaded

    def _clear_cache(self) -> None:
        """Forget the parsed YAML content."""
        self.parsed_content = None

182 

183 

@dataclasses.dataclass(slots=True)
class DebianRulesCache(FileCache):
    """File cache holding what `parse_drules_for_addons` found in `debian/rules`."""

    sequences: Optional[Set[str]] = None
    saw_dh: bool = False

    def _update_cache(self, doc: "TextDocument", source: str) -> None:
        """Scan `source` line by line and store the sequences and the dh flag."""
        found_sequences: Set[str] = set()
        # The helper fills `found_sequences` in place; its return value is
        # stored as `saw_dh`.
        saw_dh = parse_drules_for_addons(source.splitlines(), found_sequences)
        self.saw_dh = saw_dh
        self.sequences = found_sequences

    def _clear_cache(self) -> None:
        """Reset to the "nothing parsed" state."""
        self.saw_dh = False
        self.sequences = None

200 

201 

class LSProvidedLintState(LintState):
    """`LintState` implementation backed by a language-server document.

    Related packaging files (`debian/control`, `debian/rules`, the CI
    configuration files) are accessed through `FileCache` instances that are
    resolved on demand against the language server's workspace.
    """

    def __init__(
        self,
        ls: "DebputyLanguageServer",
        doc: "TextDocument",
        source_root: str,
        debian_dir_path: str,
        dctrl_parser: DctrlParser,
    ) -> None:
        """Set up the lint state for `doc`.

        :param ls: The language server providing workspace, plugins and preferences.
        :param doc: The document being linted.
        :param source_root: File system path used as root of the source overlay.
        :param debian_dir_path: File system path of the `debian` directory; the
          related packaging files are looked up relative to it.
        :param dctrl_parser: Parser used for `debian/control`.
        """
        self._ls = ls
        self._doc = doc
        # Cache lines (doc.lines re-splits everytime)
        self._lines = doc.lines
        self._source_root = FSROOverlay.create_root_dir(".", source_root)
        debian_dir = self._source_root.get("debian")
        if debian_dir is not None and not debian_dir.is_dir:
            # A non-directory named "debian" is treated as "no debian directory".
            debian_dir = None
        self._debian_dir = debian_dir
        # Non-None only while `run_diagnostics` is in progress.
        self._diagnostics: Optional[List[types.Diagnostic]] = None
        self._last_emitted_diagnostic_count = 0
        dctrl_file = os.path.join(debian_dir_path, "control")

        self._dctrl_cache: DctrlFileCache
        self._deb822_file: Deb822FileCache
        if dctrl_file != doc.path:
            # The document is some other file: track debian/control separately.
            self._dctrl_cache = DctrlFileCache(
                from_fs_path(dctrl_file),
                dctrl_file,
                dctrl_parser=dctrl_parser,
            )
            self._deb822_file = Deb822FileCache(
                doc.uri,
                doc.path,
            )
        else:
            # The document is debian/control itself: one cache serves both roles.
            self._dctrl_cache = DctrlFileCache(
                doc.uri,
                doc.path,
                dctrl_parser=dctrl_parser,
            )
            self._deb822_file = self._dctrl_cache

        self._salsa_ci_caches = [
            SalsaCICache(
                from_fs_path(os.path.join(debian_dir_path, p)),
                os.path.join(debian_dir_path, p),
            )
            for p in ("salsa-ci.yml", os.path.join("..", ".gitlab-ci.yml"))
        ]
        drules_path = os.path.join(debian_dir_path, "rules")
        self._drules_cache = DebianRulesCache(
            from_fs_path(drules_path) if doc.path != drules_path else doc.uri,
            drules_path,
        )

    @property
    def plugin_feature_set(self) -> PluginProvidedFeatureSet:
        return self._ls.plugin_feature_set

    @property
    def doc_uri(self) -> str:
        return self._doc.uri

    @property
    def source_root(self) -> Optional[VirtualPathBase]:
        return self._source_root

    @property
    def debian_dir(self) -> Optional[VirtualPathBase]:
        return self._debian_dir

    @property
    def path(self) -> str:
        return self._doc.path

    @property
    def content(self) -> str:
        return self._doc.source

    @property
    def lines(self) -> List[str]:
        return self._lines

    @property
    def position_codec(self) -> LintCapablePositionCodec:
        return self._doc.position_codec

    def _resolve_dctrl(self) -> Optional[DctrlFileCache]:
        """Refresh and return the `debian/control` cache."""
        dctrl_cache = self._dctrl_cache
        dctrl_cache.resolve_cache(self._ls)
        return dctrl_cache

    @property
    def parsed_deb822_file_content(self) -> Optional[Deb822FileElement]:
        cache = self._deb822_file
        cache.resolve_cache(self._ls)
        return cache.deb822_file

    @property
    def source_package(self) -> Optional[SourcePackage]:
        return self._resolve_dctrl().source_package

    @property
    def binary_packages(self) -> Optional[Mapping[str, BinaryPackage]]:
        return self._resolve_dctrl().binary_packages

    def _resolve_salsa_ci(self) -> Optional[CommentedMap]:
        """Return the parsed content of the first CI file cache that resolves."""
        for salsa_ci_cache in self._salsa_ci_caches:
            if salsa_ci_cache.resolve_cache(self._ls):
                return salsa_ci_cache.parsed_content
        return None

    @property
    def effective_preference(self) -> Optional[MaintainerPreference]:
        """The preference derived from the source package and CI configuration.

        Returns None when neither a source package nor a CI configuration is
        available.
        """
        source_package = self.source_package
        salsa_ci = self._resolve_salsa_ci()
        if source_package is None and salsa_ci is None:
            return None
        style, _, _ = determine_effective_preference(
            self.maint_preference_table,
            source_package,
            salsa_ci,
        )
        return style

    @property
    def maint_preference_table(self) -> MaintainerPreferenceTable:
        return self._ls.maint_preferences

    @property
    def salsa_ci(self) -> Optional[CommentedMap]:
        return None

    @property
    def dh_sequencer_data(self) -> DhSequencerData:
        """Sequences collected from `debian/rules` and the source package fields."""
        dh_sequences: Set[str] = set()
        saw_dh = False
        src_pkg = self.source_package
        drules_cache = self._drules_cache
        if drules_cache.resolve_cache(self._ls):
            saw_dh = drules_cache.saw_dh
            if drules_cache.sequences:
                dh_sequences.update(drules_cache.sequences)
        if src_pkg:
            extract_dh_addons_from_control(src_pkg.fields, dh_sequences)

        return DhSequencerData(
            frozenset(dh_sequences),
            saw_dh,
        )

    def translation(self, domain: str) -> Translations:
        return self._ls.translation(domain)

    async def slow_iter(
        self,
        iterable: Iterable[T],
        *,
        yield_every: int = 100,
    ) -> AsyncIterable[T]:
        """Iterate `iterable`, periodically yielding control to the event loop.

        After every `yield_every` items (and once more at the end if any items
        were seen since the last pause) control is handed back to the event
        loop, after which the iteration is aborted with `AbortTaskError` if the
        document version changed in the meantime.
        """
        counter = 0
        for value in iterable:
            counter += 1
            if counter >= yield_every:
                await asyncio.sleep(0)
                self._abort_on_outdated_doc_version()
                counter = 0
            yield value
        if counter:
            await asyncio.sleep(0)
            self._abort_on_outdated_doc_version()

    async def run_diagnostics(
        self,
        linter: AsyncLinterImpl,
    ) -> List[types.Diagnostic]:
        """Run `linter` against this state and return the diagnostics it emitted.

        :raises RuntimeError: If a run is already in progress on this state.
        """
        if self._diagnostics is not None:
            raise RuntimeError(
                "run_diagnostics cannot be run while it is already running"
            )
        self._diagnostics = diagnostics = []

        try:
            await linter(self)
        finally:
            # Always leave the "running" state, also when the linter raises
            # (e.g. AbortTaskError); otherwise every later run on this state
            # would be rejected as "already running".
            self._diagnostics = None
        return diagnostics

    def _emit_diagnostic(self, diagnostic: types.Diagnostic) -> None:
        """Record `diagnostic`; every 100 diagnostics a partial report is recorded.

        :raises TypeError: If called outside of `run_diagnostics`.
        """
        diagnostics = self._diagnostics
        if diagnostics is None:
            raise TypeError("Cannot run emit_diagnostic outside of run_diagnostics")

        diagnostics.append(diagnostic)
        self._last_emitted_diagnostic_count += 1
        if self._last_emitted_diagnostic_count >= 100:
            self._abort_on_outdated_doc_version()
            self._ls.record_diagnostics(
                self.doc_uri,
                self._doc.version,
                diagnostics,
                is_partial=True,
            )
            self._last_emitted_diagnostic_count = 0

    def _abort_on_outdated_doc_version(self) -> None:
        """Raise `AbortTaskError` if the workspace now has another document version."""
        expected_version = self._doc.version
        current_doc = self._ls.workspace.get_text_document(self.doc_uri)
        if current_doc.version != expected_version:
            raise AbortTaskError(
                f"Cancel (obsolete) diagnostics for doc version {expected_version}: document version changed"
            )

411 

412 

413def _preference( 

414 client_preference: Optional[List[types.MarkupKind]], 

415 options: Container[types.MarkupKind], 

416 fallback_kind: types.MarkupKind, 

417) -> types.MarkupKind: 

418 if not client_preference: 418 ↛ 420line 418 didn't jump to line 420 because the condition on line 418 was always true

419 return fallback_kind 

420 for markdown_kind in client_preference: 

421 if markdown_kind in options: 

422 return markdown_kind 

423 return fallback_kind 

424 

425 

class DebputyLanguageServer(LanguageServer):
    """Language server holding the debputy-specific shared state.

    This covers the control-file parser, the plugin feature set, maintainer
    preferences, the apt cache, locale selection and the most recent
    diagnostic report per document.
    """

    def __init__(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        super().__init__(*args, **kwargs)
        self._dctrl_parser: Optional[DctrlParser] = None
        self._plugin_feature_set: Optional[PluginProvidedFeatureSet] = None
        self._trust_language_ids: Optional[bool] = None
        self._finished_initialization = False
        self.maint_preferences = MaintainerPreferenceTable({}, {})
        self.apt_cache = AptCache()
        # Strong references to background tasks; tasks remove themselves via a
        # done-callback (see `on_initialize`).
        self.background_tasks = set()
        self.client_locale: Optional[str] = None
        self.forced_locale: Optional[str] = None
        self._active_locale: Optional[List[str]] = None
        # Latest diagnostic report per document URI.
        self._diagnostic_reports: Dict[str, DiagnosticReport] = {}

    def finish_startup_initialization(self) -> None:
        """Load the maintainer preferences once the required attributes are set.

        Does nothing if it has already completed once.
        """
        if self._finished_initialization:
            return
        assert self._dctrl_parser is not None
        assert self._plugin_feature_set is not None
        assert self._trust_language_ids is not None
        self.maint_preferences = self.maint_preferences.load_preferences()
        _info(
            f"Loaded style preferences: {len(self.maint_preferences.maintainer_preferences)} unique maintainer preferences recorded"
        )
        self._finished_initialization = True

    async def on_initialize(self, params: types.InitializeParams) -> None:
        """Start loading the apt cache in the background and record the client locale."""
        task = self.loop.create_task(self._load_apt_cache(), name="Index apt cache")
        self.background_tasks.add(task)
        task.add_done_callback(self.background_tasks.discard)
        self.client_locale = params.locale
        if self.forced_locale is not None:
            _info(
                f"Ignoring client locale: {self.client_locale}. Using {self.forced_locale} instead as requested [--force-locale]"
            )
        else:
            _info(
                f"Client locale: {self.client_locale}. Use --force-locale to override"
            )
        _info(f"Cwd: {os.getcwd()}")
        self._update_locale()

    def _update_locale(self) -> None:
        """Select the active locale: forced locale first, then the client's, else none."""
        if self.forced_locale is not None:
            self._active_locale = [self.forced_locale]
        elif self.client_locale is not None:
            self._active_locale = [self.client_locale]
        else:
            self._active_locale = None

    def shutdown(self) -> None:
        """Request cancellation of all background tasks, then shut down the server."""
        # Iterate over a snapshot: finished tasks remove themselves from the set
        # through their done-callback, and mutating a set while iterating over
        # it raises RuntimeError.
        for task in tuple(self.background_tasks):
            _info(f"Cancelling task: {task.get_name()}")
            self.loop.call_soon_threadsafe(task.cancel)
        return super().shutdown()

    def translation(self, domain: str) -> Translations:
        return l10n.translation(
            domain,
            languages=self._active_locale,
        )

    async def _load_apt_cache(self) -> None:
        """Load the apt cache unless it is already loading or loaded."""
        if self.apt_cache.state in ("loading", "loaded"):
            _info(
                f"The apt cache data is already in state {self.apt_cache.state}, not re-triggering"
            )
            return
        _info("Starting load of apt cache data")
        # A monotonic clock is used so the reported duration is not distorted
        # by wall-clock adjustments.
        start = time.monotonic()
        try:
            await self.apt_cache.load()
        except ValueError as ex:
            _warn(f"Could not load apt cache: {ex}")
        else:
            end = time.monotonic()
            _info(
                f"Loading apt cache finished after {end-start} seconds and is now in state {self.apt_cache.state}"
            )

    @property
    def plugin_feature_set(self) -> PluginProvidedFeatureSet:
        res = self._plugin_feature_set
        if res is None:
            raise RuntimeError(
                "Initialization error: The plugin feature set has not been initialized before it was needed."
            )
        return res

    @plugin_feature_set.setter
    def plugin_feature_set(self, plugin_feature_set: PluginProvidedFeatureSet) -> None:
        if self._plugin_feature_set is not None:
            raise RuntimeError(
                "The plugin_feature_set attribute cannot be changed once set"
            )
        self._plugin_feature_set = plugin_feature_set

    @property
    def dctrl_parser(self) -> DctrlParser:
        res = self._dctrl_parser
        if res is None:
            raise RuntimeError(
                "Initialization error: The dctrl_parser has not been initialized before it was needed."
            )
        return res

    @dctrl_parser.setter
    def dctrl_parser(self, parser: DctrlParser) -> None:
        if self._dctrl_parser is not None:
            raise RuntimeError("The dctrl_parser attribute cannot be changed once set")
        self._dctrl_parser = parser

    def lint_state(self, doc: "TextDocument") -> LSProvidedLintState:
        """Create a lint state for `doc`.

        The closest ancestor directory of the document that is named `debian`
        is used as the debian directory and its parent as the source root.
        """
        dir_path = os.path.dirname(doc.path)

        while dir_path and dir_path != "/" and os.path.basename(dir_path) != "debian":
            dir_path = os.path.dirname(dir_path)

        source_root = os.path.dirname(dir_path)

        return LSProvidedLintState(self, doc, source_root, dir_path, self.dctrl_parser)

    @property
    def _client_hover_markup_formats(self) -> Optional[List[types.MarkupKind]]:
        try:
            return (
                self.client_capabilities.text_document.hover.content_format
            )  # type: ignore
        except AttributeError:
            return None

    def hover_markup_format(
        self,
        *options: types.MarkupKind,
        fallback_kind: types.MarkupKind = types.MarkupKind.PlainText,
    ) -> types.MarkupKind:
        """Pick the client preferred hover markup format from a set of options

        :param options: The markup kinds possible.
        :param fallback_kind: If no overlapping option was found in the client preferences
          (or client did not announce a value at all), this parameter is returned instead.
        :returns: The client's preferred markup format from the provided options, or,
          (if there is no overlap), the `fallback_kind` value is returned.
        """
        client_preference = self._client_hover_markup_formats
        return _preference(client_preference, frozenset(options), fallback_kind)

    @property
    def _client_completion_item_document_markup_formats(
        self,
    ) -> Optional[List[types.MarkupKind]]:
        try:
            return (
                self.client_capabilities.text_document.completion.completion_item.documentation_format  # type: ignore
            )
        except AttributeError:
            return None

    def completion_item_document_markup(
        self,
        *options: types.MarkupKind,
        fallback_kind: types.MarkupKind = types.MarkupKind.PlainText,
    ) -> types.MarkupKind:
        """Pick the client preferred completion item documentation markup format from a set of options

        :param options: The markup kinds possible.
        :param fallback_kind: If no overlapping option was found in the client preferences
          (or client did not announce a value at all), this parameter is returned instead.
        :returns: The client's preferred markup format from the provided options, or,
          (if there is no overlap), the `fallback_kind` value is returned.
        """

        client_preference = self._client_completion_item_document_markup_formats
        return _preference(client_preference, frozenset(options), fallback_kind)

    @property
    def trust_language_ids(self) -> bool:
        """Whether editor-provided language IDs are trusted (True when unset)."""
        v = self._trust_language_ids
        if v is None:
            return True
        return v

    @trust_language_ids.setter
    def trust_language_ids(self, new_value: bool) -> None:
        self._trust_language_ids = new_value

    def determine_language_id(
        self,
        doc: "TextDocument",
    ) -> Tuple[Literal["editor-provided", "filename"], str, str]:
        """Determine the language ID to use for `doc`.

        :returns: A tuple of the source of the ID (`"editor-provided"` or
          `"filename"`), the ID itself and the cleaned file name. The cleaned
          file name is the part of the path starting at the last `debian`
          directory component, or the base name if there is no such component.
        """
        lang_id = doc.language_id
        path = doc.path
        # Match "debian" as a whole path component (as `lint_state` does), so a
        # directory such as "notdebian/" is not mistaken for the debian directory.
        component_idx = path.rfind("/debian/")
        if component_idx >= 0:
            cleaned_filename = path[component_idx + 1 :]
        elif path.startswith("debian/"):
            cleaned_filename = path
        else:
            cleaned_filename = os.path.basename(path)

        if self.trust_language_ids and lang_id and not lang_id.isspace():
            if lang_id not in ("fundamental",):
                return "editor-provided", lang_id, cleaned_filename
            _info(
                f"Ignoring editor provided language ID: {lang_id} (reverting to filename based detection instead)"
            )

        return "filename", cleaned_filename, cleaned_filename

    def close_document(self, uri: str) -> None:
        """Drop the stored diagnostic report for `uri`, if any."""
        try:
            del self._diagnostic_reports[uri]
        except KeyError:
            pass

    async def slow_iter(
        self,
        iterable: Iterable[T],
        *,
        yield_every: int = 100,
    ) -> AsyncIterable[T]:
        """Iterate `iterable`, handing control to the event loop every `yield_every` items.

        Control is handed over once more at the end if any items were seen
        since the last pause.
        """
        counter = 0
        for value in iterable:
            counter += 1
            if counter >= yield_every:
                await asyncio.sleep(0)
                counter = 0
            yield value
        if counter:
            await asyncio.sleep(0)

    def record_diagnostics(
        self,
        doc_uri: str,
        version: int,
        diagnostics: List[types.Diagnostic],
        is_partial: bool,
    ) -> None:
        """Store a diagnostic report for `doc_uri` and publish the diagnostics."""
        self._diagnostic_reports[doc_uri] = DiagnosticReport(
            doc_uri,
            version,
            f"{version}@{doc_uri}",
            is_partial,
            diagnostics,
        )
        self.publish_diagnostics(doc_uri, diagnostics, version)

    def diagnostics_in_range(
        self,
        uri: str,
        text_range: types.Range,
    ) -> Optional[List[types.Diagnostic]]:
        """Return the stored diagnostics for `uri` within `text_range`.

        Returns None when there is no report, the report is still in progress,
        or the report was made for another version of the document.
        """
        report = self._diagnostic_reports.get(uri)
        if report is None or report.is_in_progress:
            return None
        doc = self.workspace.get_text_document(uri)
        if doc.version is not None and doc.version != report.doc_version:
            return None

        return report.diagnostics_in_range(text_range)