Coverage for src/debputy/lsp/debputy_ls.py: 51%

460 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-09-07 09:27 +0000

1import asyncio 

2import collections 

3import dataclasses 

4import os 

5import time 

6from collections.abc import AsyncIterable 

7from contextlib import suppress 

8from typing import ( 

9 Optional, 

10 List, 

11 Any, 

12 Mapping, 

13 Container, 

14 TYPE_CHECKING, 

15 Tuple, 

16 Literal, 

17 Set, 

18 Dict, 

19 Iterable, 

20 Sequence, 

21 Callable, 

22 Type, 

23) 

24 

25import debputy.l10n as l10n 

26from debputy.commands.debputy_cmd.output import ( 

27 OutputStyle, 

28 MarkdownOutputStyle, 

29) 

30from debputy.dh.dh_assistant import ( 

31 parse_drules_for_addons, 

32 DhSequencerData, 

33 extract_dh_addons_from_control, 

34) 

35from debputy.filesystem_scan import FSROOverlay, VirtualPathBase 

36from debputy.l10n import Translations 

37from debputy.linting.lint_util import ( 

38 LintState, 

39 AsyncLinterImpl, 

40 AbortTaskError, 

41 WorkspaceTextEditSupport, 

42) 

43from debputy.lsp.apt_cache import AptCache 

44from debputy.lsp.config.debputy_config import load_debputy_config, DebputyConfig 

45from debputy.lsp.diagnostics import DiagnosticReport 

46from debputy.lsp.maint_prefs import ( 

47 MaintainerPreferenceTable, 

48 determine_effective_preference, 

49 EffectiveFormattingPreference, 

50) 

51from debputy.lsp.text_util import LintCapablePositionCodec 

52from debputy.lsp.vendoring._deb822_repro import Deb822FileElement, parse_deb822_file 

53from debputy.packages import ( 

54 SourcePackage, 

55 BinaryPackage, 

56 DctrlParser, 

57) 

58from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

59from debputy.util import _info, _warn, T 

60from debputy.yaml import MANIFEST_YAML, YAMLError 

61from debputy.yaml.compat import CommentedMap 

62 

63if TYPE_CHECKING: 

64 import lsprotocol.types as types 

65 from pygls.server import LanguageServer 

66 from pygls.workspace import TextDocument 

67 from pygls.uris import from_fs_path, to_fs_path 

68 

69else: 

70 import debputy.lsprotocol.types as types 

71 

72 try: 

73 from pygls.server import LanguageServer 

74 from pygls.workspace import TextDocument 

75 from pygls.uris import from_fs_path, to_fs_path 

76 except ImportError as e: 

77 

78 class LanguageServer: 

79 def __init__(self, *args, **kwargs) -> None: 

80 """Placeholder to work if pygls is not installed""" 

81 # Should not be called 

82 global e 

83 raise e # pragma: no cover 

84 

85 

86@dataclasses.dataclass(slots=True) 

87class FileCache: 

88 doc_uri: str 

89 path: str 

90 is_open_in_editor: Optional[bool] = None 

91 last_doc_version: Optional[int] = None 

92 last_mtime: Optional[float] = None 

93 is_valid: bool = False 

94 

95 def _update_cache(self, doc: "TextDocument", source: str) -> None: 

96 raise NotImplementedError 

97 

98 def _clear_cache(self) -> None: 

99 raise NotImplementedError 

100 

101 def resolve_cache(self, ls: "DebputyLanguageServer") -> bool: 

102 doc = ls.workspace.text_documents.get(self.doc_uri) 

103 if doc is None: 103 ↛ 104line 103 didn't jump to line 104 because the condition on line 103 was never true

104 doc = ls.workspace.get_text_document(self.doc_uri) 

105 is_open = False 

106 else: 

107 is_open = True 

108 new_content: Optional[str] = None 

109 if is_open: 109 ↛ 122line 109 didn't jump to line 122 because the condition on line 109 was always true

110 last_doc_version = self.last_doc_version 

111 current_doc_version = doc.version 

112 if ( 112 ↛ 120line 112 didn't jump to line 120 because the condition on line 112 was always true

113 not self.is_open_in_editor 

114 or last_doc_version is None 

115 or current_doc_version is None 

116 or last_doc_version < current_doc_version 

117 ): 

118 new_content = doc.source 

119 

120 self.last_doc_version = doc.version 

121 self.is_open_in_editor = True 

122 elif doc.uri.startswith("file://"): 

123 try: 

124 with open(self.path) as fd: 

125 st = os.fstat(fd.fileno()) 

126 current_mtime = st.st_mtime 

127 last_mtime = self.last_mtime or current_mtime - 1 

128 if self.is_open_in_editor or current_mtime > last_mtime: 

129 new_content = fd.read() 

130 self.last_mtime = current_mtime 

131 except FileNotFoundError: 

132 self._clear_cache() 

133 self.is_valid = False 

134 return False 

135 self.is_open_in_editor = is_open 

136 if new_content is not None: 136 ↛ 138line 136 didn't jump to line 138 because the condition on line 136 was always true

137 self._update_cache(doc, new_content) 

138 self.is_valid = True 

139 return True 

140 

141 

142@dataclasses.dataclass(slots=True) 

143class Deb822FileCache(FileCache): 

144 deb822_file: Optional[Deb822FileElement] = None 

145 

146 def _update_cache(self, doc: "TextDocument", source: str) -> None: 

147 deb822_file = parse_deb822_file( 

148 source.splitlines(keepends=True), 

149 accept_files_with_error_tokens=True, 

150 accept_files_with_duplicated_fields=True, 

151 ) 

152 self.deb822_file = deb822_file 

153 

154 def _clear_cache(self) -> None: 

155 self.deb822_file = None 

156 

157 

158@dataclasses.dataclass(slots=True) 

159class DctrlFileCache(Deb822FileCache): 

160 dctrl_parser: Optional[DctrlParser] = None 

161 source_package: Optional[SourcePackage] = None 

162 binary_packages: Optional[Mapping[str, BinaryPackage]] = None 

163 

164 def _update_cache(self, doc: "TextDocument", source: str) -> None: 

165 deb822_file, source_package, binary_packages = ( 

166 self.dctrl_parser.parse_source_debian_control( 

167 source.splitlines(keepends=True), 

168 ignore_errors=True, 

169 ) 

170 ) 

171 self.deb822_file = deb822_file 

172 self.source_package = source_package 

173 self.binary_packages = binary_packages 

174 

175 def _clear_cache(self) -> None: 

176 super()._clear_cache() 

177 self.source_package = None 

178 self.binary_packages = None 

179 

180 

181@dataclasses.dataclass(slots=True) 

182class SalsaCICache(FileCache): 

183 parsed_content: Optional[CommentedMap] = None 

184 

185 def _update_cache(self, doc: "TextDocument", source: str) -> None: 

186 try: 

187 value = MANIFEST_YAML.load(source) 

188 if isinstance(value, CommentedMap): 

189 self.parsed_content = value 

190 except YAMLError: 

191 pass 

192 

193 def _clear_cache(self) -> None: 

194 self.parsed_content = None 

195 

196 

197@dataclasses.dataclass(slots=True) 

198class DebianRulesCache(FileCache): 

199 sequences: Optional[Set[str]] = None 

200 saw_dh: bool = False 

201 

202 def _update_cache(self, doc: "TextDocument", source: str) -> None: 

203 sequences = set() 

204 self.saw_dh = parse_drules_for_addons( 

205 source.splitlines(), 

206 sequences, 

207 ) 

208 self.sequences = sequences 

209 

210 def _clear_cache(self) -> None: 

211 self.sequences = None 

212 self.saw_dh = False 

213 

214 

215class LSProvidedLintState(LintState): 

216 def __init__( 

217 self, 

218 ls: "DebputyLanguageServer", 

219 doc: "TextDocument", 

220 source_root: str, 

221 debian_dir_path: str, 

222 dctrl_parser: DctrlParser, 

223 ) -> None: 

224 self._ls = ls 

225 self._doc = doc 

226 # Cache lines (doc.lines re-splits everytime) 

227 self._lines = doc.lines 

228 self._source_root = FSROOverlay.create_root_dir(".", source_root) 

229 debian_dir = self._source_root.get("debian") 

230 if debian_dir is not None and not debian_dir.is_dir: 230 ↛ 231line 230 didn't jump to line 231 because the condition on line 230 was never true

231 debian_dir = None 

232 self._debian_dir = debian_dir 

233 self._diagnostics: Optional[List[types.Diagnostic]] = None 

234 self._last_emitted_diagnostic_count = 0 

235 dctrl_file = os.path.join(debian_dir_path, "control") 

236 

237 if dctrl_file != doc.path: 

238 

239 self._dctrl_cache: DctrlFileCache = ls.file_cache_for( 

240 from_fs_path(dctrl_file), 

241 dctrl_file, 

242 DctrlFileCache, 

243 lambda uri, path: DctrlFileCache(uri, path, dctrl_parser=dctrl_parser), 

244 ) 

245 self._deb822_file: Deb822FileCache = ls.file_cache_for( 

246 doc.uri, 

247 doc.path, 

248 Deb822FileCache, 

249 lambda uri, path: Deb822FileCache(uri, path), 

250 ) 

251 else: 

252 self._dctrl_cache = ls.file_cache_for( 

253 doc.uri, 

254 doc.path, 

255 DctrlFileCache, 

256 lambda uri, path: DctrlFileCache(uri, path, dctrl_parser=dctrl_parser), 

257 ) 

258 self._deb822_file = self._dctrl_cache 

259 

260 self._salsa_ci_caches = [ 

261 ls.file_cache_for( 

262 from_fs_path(os.path.join(debian_dir_path, p)), 

263 os.path.join(debian_dir_path, p), 

264 SalsaCICache, 

265 lambda uri, path: SalsaCICache(uri, path), 

266 ) 

267 for p in ("salsa-ci.yml", os.path.join("..", ".gitlab-ci.yml")) 

268 ] 

269 drules_path = os.path.join(debian_dir_path, "rules") 

270 self._drules_cache = ls.file_cache_for( 

271 from_fs_path(drules_path) if doc.path != drules_path else doc.uri, 

272 drules_path, 

273 DebianRulesCache, 

274 lambda uri, path: DebianRulesCache(uri, path), 

275 ) 

276 

277 @property 

278 def plugin_feature_set(self) -> PluginProvidedFeatureSet: 

279 return self._ls.plugin_feature_set 

280 

281 @property 

282 def doc_uri(self) -> str: 

283 return self._doc.uri 

284 

285 @property 

286 def doc_version(self) -> Optional[int]: 

287 return self._doc.version 

288 

289 @property 

290 def source_root(self) -> Optional[VirtualPathBase]: 

291 return self._source_root 

292 

293 @property 

294 def debian_dir(self) -> Optional[VirtualPathBase]: 

295 return self._debian_dir 

296 

297 @property 

298 def path(self) -> str: 

299 return self._doc.path 

300 

301 @property 

302 def content(self) -> str: 

303 return self._doc.source 

304 

305 @property 

306 def lines(self) -> List[str]: 

307 return self._lines 

308 

309 @property 

310 def position_codec(self) -> LintCapablePositionCodec: 

311 return self._doc.position_codec 

312 

313 def _resolve_dctrl(self) -> Optional[DctrlFileCache]: 

314 dctrl_cache = self._dctrl_cache 

315 dctrl_cache.resolve_cache(self._ls) 

316 return dctrl_cache 

317 

318 @property 

319 def parsed_deb822_file_content(self) -> Optional[Deb822FileElement]: 

320 cache = self._deb822_file 

321 cache.resolve_cache(self._ls) 

322 return cache.deb822_file 

323 

324 @property 

325 def source_package(self) -> Optional[SourcePackage]: 

326 return self._resolve_dctrl().source_package 

327 

328 @property 

329 def binary_packages(self) -> Optional[Mapping[str, BinaryPackage]]: 

330 return self._resolve_dctrl().binary_packages 

331 

332 def _resolve_salsa_ci(self) -> Optional[CommentedMap]: 

333 for salsa_ci_cache in self._salsa_ci_caches: 

334 if salsa_ci_cache.resolve_cache(self._ls): 

335 return salsa_ci_cache.parsed_content 

336 return None 

337 

338 @property 

339 def effective_preference(self) -> Optional[EffectiveFormattingPreference]: 

340 source_package = self.source_package 

341 salsa_ci = self._resolve_salsa_ci() 

342 if source_package is None and salsa_ci is None: 

343 return None 

344 style, _, _ = determine_effective_preference( 

345 self.maint_preference_table, 

346 source_package, 

347 salsa_ci, 

348 ) 

349 return style 

350 

351 @property 

352 def maint_preference_table(self) -> MaintainerPreferenceTable: 

353 return self._ls.maint_preferences 

354 

355 @property 

356 def salsa_ci(self) -> Optional[CommentedMap]: 

357 return None 

358 

359 @property 

360 def dh_sequencer_data(self) -> DhSequencerData: 

361 dh_sequences: Set[str] = set() 

362 saw_dh = False 

363 src_pkg = self.source_package 

364 drules_cache = self._drules_cache 

365 if drules_cache.resolve_cache(self._ls): 

366 saw_dh = drules_cache.saw_dh 

367 if drules_cache.sequences: 

368 dh_sequences.update(drules_cache.sequences) 

369 if src_pkg: 

370 extract_dh_addons_from_control(src_pkg.fields, dh_sequences) 

371 

372 return DhSequencerData( 

373 frozenset(dh_sequences), 

374 saw_dh, 

375 ) 

376 

377 @property 

378 def workspace_text_edit_support(self) -> WorkspaceTextEditSupport: 

379 return self._ls.workspace_text_edit_support 

380 

381 @property 

382 def debputy_config(self) -> DebputyConfig: 

383 return self._ls.debputy_config 

384 

385 def translation(self, domain: str) -> Translations: 

386 return self._ls.translation(domain) 

387 

388 async def slow_iter( 

389 self, 

390 iterable: Iterable[T], 

391 *, 

392 yield_every: int = 100, 

393 ) -> AsyncIterable[T]: 

394 counter = 0 

395 for value in iterable: 

396 counter += 1 

397 if counter >= yield_every: 

398 await asyncio.sleep(0) 

399 self._abort_on_outdated_doc_version() 

400 counter = 0 

401 yield value 

402 if counter: 

403 await asyncio.sleep(0) 

404 self._abort_on_outdated_doc_version() 

405 

406 async def run_diagnostics( 

407 self, 

408 linter: AsyncLinterImpl, 

409 ) -> List[types.Diagnostic]: 

410 if self._diagnostics is not None: 

411 raise RuntimeError( 

412 "run_diagnostics cannot be run while it is already running" 

413 ) 

414 self._diagnostics = diagnostics = [] 

415 

416 await linter(self) 

417 

418 self._diagnostics = None 

419 return diagnostics 

420 

421 def _emit_diagnostic(self, diagnostic: types.Diagnostic) -> None: 

422 diagnostics = self._diagnostics 

423 if diagnostics is None: 

424 raise TypeError("Cannot run emit_diagnostic outside of run_diagnostics") 

425 

426 diagnostics.append(diagnostic) 

427 self._last_emitted_diagnostic_count += 1 

428 if self._last_emitted_diagnostic_count >= 100: 

429 self._abort_on_outdated_doc_version() 

430 self._ls.record_diagnostics( 

431 self.doc_uri, 

432 self._doc.version, 

433 diagnostics, 

434 is_partial=True, 

435 ) 

436 self._last_emitted_diagnostic_count = 0 

437 

438 def _abort_on_outdated_doc_version(self) -> None: 

439 expected_version = self._doc.version 

440 current_doc = self._ls.workspace.get_text_document(self.doc_uri) 

441 if current_doc.version != expected_version: 

442 raise AbortTaskError( 

443 f"Cancel (obsolete) diagnostics for doc version {expected_version}: document version changed" 

444 ) 

445 

446 

447def _preference( 

448 client_preference: Optional[List[types.MarkupKind]], 

449 options: Container[types.MarkupKind], 

450 fallback_kind: types.MarkupKind, 

451) -> types.MarkupKind: 

452 if not client_preference: 452 ↛ 454line 452 didn't jump to line 454 because the condition on line 452 was always true

453 return fallback_kind 

454 for markdown_kind in client_preference: 

455 if markdown_kind in options: 

456 return markdown_kind 

457 return fallback_kind 

458 

459 

460class DebputyLanguageServer(LanguageServer): 

461 

462 def __init__( 

463 self, 

464 *args: Any, 

465 **kwargs: Any, 

466 ) -> None: 

467 super().__init__(*args, **kwargs) 

468 self._dctrl_parser: Optional[DctrlParser] = None 

469 self._plugin_feature_set: Optional[PluginProvidedFeatureSet] = None 

470 self._trust_language_ids: Optional[bool] = None 

471 self._finished_initialization = False 

472 self.maint_preferences = MaintainerPreferenceTable({}, {}) 

473 self.apt_cache = AptCache() 

474 self.background_tasks = set() 

475 self.client_locale: Optional[str] = None 

476 self.forced_locale: Optional[str] = None 

477 self._active_locale: Optional[List[str]] = None 

478 self._diagnostic_reports: Dict[str, DiagnosticReport] = {} 

479 self.workspace_text_edit_support = WorkspaceTextEditSupport() 

480 self.debputy_config = load_debputy_config() 

481 self._file_state_caches: Dict[str, Dict[Type[FileCache], FileCache]] = ( 

482 collections.defaultdict(dict) 

483 ) 

484 self.hover_output_style = OutputStyle() 

485 

486 def finish_startup_initialization(self) -> None: 

487 if self._finished_initialization: 

488 return 

489 

490 assert self._dctrl_parser is not None 

491 assert self._plugin_feature_set is not None 

492 assert self._trust_language_ids is not None 

493 self.maint_preferences = self.maint_preferences.load_preferences() 

494 _info( 

495 f"Loaded style preferences: {len(self.maint_preferences.maintainer_preferences)} unique maintainer preferences recorded" 

496 ) 

497 if ( 

498 self.hover_markup_format( 

499 types.MarkupKind.Markdown, types.MarkupKind.PlainText 

500 ) 

501 == types.MarkupKind.Markdown 

502 ): 

503 self.hover_output_style = MarkdownOutputStyle() 

504 self._finished_initialization = True 

505 

506 async def on_initialize(self, params: types.InitializeParams) -> None: 

507 task = self.loop.create_task(self._load_apt_cache(), name="Index apt cache") 

508 self.background_tasks.add(task) 

509 task.add_done_callback(self.background_tasks.discard) 

510 self.client_locale = params.locale 

511 if self.forced_locale is not None: 

512 _info( 

513 f"Ignoring client locale: {self.client_locale}. Using {self.forced_locale} instead as requested [--force-locale]" 

514 ) 

515 else: 

516 _info( 

517 f"Client locale: {self.client_locale}. Use --force-locale to override" 

518 ) 

519 _info(f"Cwd: {os.getcwd()}") 

520 self._update_locale() 

521 self.workspace_text_edit_support = WorkspaceTextEditSupport( 

522 supported_resource_operation_edit_kinds=self._supported_resource_operation_edit_kinds, 

523 supports_document_changes=self._supports_ws_document_changes, 

524 ) 

525 self._mask_ghost_requests() 

526 

527 def _mask_ghost_requests(self) -> None: 

528 try: 

529 if types.TEXT_DOCUMENT_DOCUMENT_HIGHLIGHT not in self.lsp.fm.features: 

530 

531 # Work around `kate` bug (https://bugs.kde.org/show_bug.cgi?id=506664) for selected requests 

532 # that are really annoying (that is, triggers all the time). 

533 def _ghost_request(*_args: Any) -> None: 

534 _info( 

535 "Ignoring unsupported request from client that it should not have sent." 

536 ) 

537 

538 self.lsp.fm.features[types.TEXT_DOCUMENT_DOCUMENT_HIGHLIGHT] = ( 

539 _ghost_request 

540 ) 

541 except (AttributeError, TypeError, KeyError, ValueError) as e: 

542 _info( 

543 f"Could install ghost handler, continuing without. Error was: {str(e)}" 

544 ) 

545 else: 

546 _info( 

547 f"Injecting fake {types.TEXT_DOCUMENT_DOCUMENT_HIGHLIGHT} handler to work around bugs like KDE#506664" 

548 ) 

549 

550 def _update_locale(self) -> None: 

551 if self.forced_locale is not None: 

552 self._active_locale = [self.forced_locale] 

553 elif self.client_locale is not None: 

554 self._active_locale = [self.client_locale] 

555 else: 

556 self._active_locale = None 

557 

558 def file_cache_for( 

559 self, 

560 uri: str, 

561 path: str, 

562 cache_type: Type[T], 

563 initializer: Callable[[str, str], T], 

564 ) -> T: 

565 inner_cache = self._file_state_caches.get(path) 

566 result = inner_cache.get(cache_type) if inner_cache else None 

567 if result is None: 

568 result = initializer(uri, path) 

569 if uri not in self.workspace.text_documents: 

570 # We do not get a proper notification on when to discard the cache. 

571 # For now, we simply do not cache them at all to avoid "leaking by infinite cache". 

572 # 

573 # Note that even if we did cache them, we would have to track whether the files have 

574 # changed (which we do not), so the cache itself would not be useful. 

575 return result 

576 assert isinstance(result, cache_type) 

577 if inner_cache is None: 577 ↛ 580line 577 didn't jump to line 580 because the condition on line 577 was always true

578 inner_cache = {} 

579 self._file_state_caches[path] = inner_cache 

580 inner_cache[cache_type] = result 

581 else: 

582 assert isinstance(result, cache_type) 

583 return result 

584 

585 def shutdown(self) -> None: 

586 for task in self.background_tasks: 

587 _info(f"Cancelling task: {task.get_name()}") 

588 self.loop.call_soon_threadsafe(task.cancel) 

589 return super().shutdown() 

590 

591 def translation(self, domain: str) -> Translations: 

592 return l10n.translation( 

593 domain, 

594 languages=self._active_locale, 

595 ) 

596 

597 async def _load_apt_cache(self) -> None: 

598 if self.apt_cache.state in ("loading", "loaded"): 

599 _info( 

600 f"The apt cache data is already in state {self.apt_cache.state}, not re-triggering" 

601 ) 

602 return 

603 _info("Starting load of apt cache data") 

604 start = time.time() 

605 try: 

606 await self.apt_cache.load() 

607 except ValueError as ex: 

608 _warn(f"Could not load apt cache: {ex}") 

609 else: 

610 end = time.time() 

611 _info( 

612 f"Loading apt cache finished after {end-start} seconds and is now in state {self.apt_cache.state}" 

613 ) 

614 

615 @property 

616 def plugin_feature_set(self) -> PluginProvidedFeatureSet: 

617 res = self._plugin_feature_set 

618 if res is None: 618 ↛ 619line 618 didn't jump to line 619 because the condition on line 618 was never true

619 raise RuntimeError( 

620 "Initialization error: The plugin feature set has not been initialized before it was needed." 

621 ) 

622 return res 

623 

624 @plugin_feature_set.setter 

625 def plugin_feature_set(self, plugin_feature_set: PluginProvidedFeatureSet) -> None: 

626 if self._plugin_feature_set is not None: 626 ↛ 627line 626 didn't jump to line 627 because the condition on line 626 was never true

627 raise RuntimeError( 

628 "The plugin_feature_set attribute cannot be changed once set" 

629 ) 

630 self._plugin_feature_set = plugin_feature_set 

631 

632 @property 

633 def dctrl_parser(self) -> DctrlParser: 

634 res = self._dctrl_parser 

635 if res is None: 635 ↛ 636line 635 didn't jump to line 636 because the condition on line 635 was never true

636 raise RuntimeError( 

637 "Initialization error: The dctrl_parser has not been initialized before it was needed." 

638 ) 

639 return res 

640 

641 @dctrl_parser.setter 

642 def dctrl_parser(self, parser: DctrlParser) -> None: 

643 if self._dctrl_parser is not None: 643 ↛ 644line 643 didn't jump to line 644 because the condition on line 643 was never true

644 raise RuntimeError("The dctrl_parser attribute cannot be changed once set") 

645 self._dctrl_parser = parser 

646 

647 def lint_state(self, doc: "TextDocument") -> LSProvidedLintState: 

648 dir_path = os.path.dirname(doc.path) 

649 

650 while dir_path and dir_path != "/" and os.path.basename(dir_path) != "debian": 650 ↛ 651line 650 didn't jump to line 651 because the condition on line 650 was never true

651 dir_path = os.path.dirname(dir_path) 

652 

653 source_root = os.path.dirname(dir_path) 

654 

655 return LSProvidedLintState(self, doc, source_root, dir_path, self.dctrl_parser) 

656 

657 @property 

658 def _client_hover_markup_formats(self) -> Optional[List[types.MarkupKind]]: 

659 try: 

660 return ( 

661 self.client_capabilities.text_document.hover.content_format 

662 ) # type: ignore 

663 except AttributeError: 

664 return None 

665 

666 def hover_markup_format( 

667 self, 

668 *options: types.MarkupKind, 

669 fallback_kind: types.MarkupKind = types.MarkupKind.PlainText, 

670 ) -> types.MarkupKind: 

671 """Pick the client preferred hover markup format from a set of options 

672 

673 :param options: The markup kinds possible. 

674 :param fallback_kind: If no overlapping option was found in the client preferences 

675 (or client did not announce a value at all), this parameter is returned instead. 

676 :returns: The client's preferred markup format from the provided options, or, 

677 (if there is no overlap), the `fallback_kind` value is returned. 

678 """ 

679 client_preference = self._client_hover_markup_formats 

680 return _preference(client_preference, frozenset(options), fallback_kind) 

681 

682 @property 

683 def _client_completion_item_document_markup_formats( 

684 self, 

685 ) -> Optional[List[types.MarkupKind]]: 

686 try: 

687 return ( 

688 self.client_capabilities.text_document.completion.completion_item.documentation_format # type : ignore 

689 ) 

690 except AttributeError: 

691 return None 

692 

693 @property 

694 def _supports_ws_document_changes(self) -> bool: 

695 try: 

696 return ( 

697 self.client_capabilities.workspace.workspace_edit.document_changes # type : ignore 

698 ) 

699 except AttributeError: 

700 return False 

701 

702 @property 

703 def _supported_resource_operation_edit_kinds( 

704 self, 

705 ) -> Sequence[types.ResourceOperationKind]: 

706 try: 

707 return ( 

708 self.client_capabilities.workspace.workspace_edit.resource_operations # type : ignore 

709 ) 

710 except AttributeError: 

711 return [] 

712 

713 def completion_item_document_markup( 

714 self, 

715 *options: types.MarkupKind, 

716 fallback_kind: types.MarkupKind = types.MarkupKind.PlainText, 

717 ) -> types.MarkupKind: 

718 """Pick the client preferred completion item documentation markup format from a set of options 

719 

720 :param options: The markup kinds possible. 

721 :param fallback_kind: If no overlapping option was found in the client preferences 

722 (or client did not announce a value at all), this parameter is returned instead. 

723 :returns: The client's preferred markup format from the provided options, or, 

724 (if there is no overlap), the `fallback_kind` value is returned. 

725 """ 

726 

727 client_preference = self._client_completion_item_document_markup_formats 

728 return _preference(client_preference, frozenset(options), fallback_kind) 

729 

730 @property 

731 def trust_language_ids(self) -> bool: 

732 v = self._trust_language_ids 

733 if v is None: 

734 return True 

735 return v 

736 

737 @trust_language_ids.setter 

738 def trust_language_ids(self, new_value: bool) -> None: 

739 self._trust_language_ids = new_value 

740 

741 def determine_language_id( 

742 self, 

743 doc: "TextDocument", 

744 ) -> Tuple[Literal["editor-provided", "filename"], str, str]: 

745 lang_id = doc.language_id 

746 path = doc.path 

747 try: 

748 last_idx = path.rindex("debian/") 

749 except ValueError: 

750 cleaned_filename = os.path.basename(path) 

751 else: 

752 cleaned_filename = path[last_idx:] 

753 

754 if self.trust_language_ids and lang_id and not lang_id.isspace(): 

755 if lang_id not in ("fundamental",): 

756 return "editor-provided", lang_id, cleaned_filename 

757 _info( 

758 f"Ignoring editor provided language ID: {lang_id} (reverting to filename based detection instead)" 

759 ) 

760 

761 return "filename", cleaned_filename, cleaned_filename 

762 

763 def close_document(self, uri: str) -> None: 

764 path = to_fs_path(uri) 

765 with suppress(KeyError): 

766 del self._diagnostic_reports[uri] 

767 with suppress(KeyError): 

768 del self._file_state_caches[path] 

769 

770 async def slow_iter( 

771 self, 

772 iterable: Iterable[T], 

773 *, 

774 yield_every: int = 100, 

775 ) -> AsyncIterable[T]: 

776 counter = 0 

777 for value in iterable: 

778 counter += 1 

779 if counter >= yield_every: 779 ↛ 780line 779 didn't jump to line 780 because the condition on line 779 was never true

780 await asyncio.sleep(0) 

781 counter = 0 

782 yield value 

783 if counter: 783 ↛ exitline 783 didn't return from function 'slow_iter' because the condition on line 783 was always true

784 await asyncio.sleep(0) 

785 

786 def record_diagnostics( 

787 self, 

788 doc_uri: str, 

789 version: int, 

790 diagnostics: List[types.Diagnostic], 

791 is_partial: bool, 

792 ) -> None: 

793 self._diagnostic_reports[doc_uri] = DiagnosticReport( 

794 doc_uri, 

795 version, 

796 f"{version}@{doc_uri}", 

797 is_partial, 

798 diagnostics, 

799 ) 

800 self.publish_diagnostics(doc_uri, diagnostics, version) 

801 

802 def diagnostics_in_range( 

803 self, 

804 uri: str, 

805 text_range: types.Range, 

806 ) -> Optional[List[types.Diagnostic]]: 

807 report = self._diagnostic_reports.get(uri) 

808 if report is None or report.is_in_progress: 

809 return None 

810 doc = self.workspace.get_text_document(uri) 

811 if doc.version is not None and doc.version != report.doc_version: 

812 return None 

813 

814 return report.diagnostics_in_range(text_range)