Coverage for src/debputy/lsp/debputy_ls.py: 51%

461 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-10-12 15:06 +0000

1import asyncio 

2import collections 

3import dataclasses 

4import os 

5import time 

6from collections.abc import AsyncIterable 

7from contextlib import suppress 

8from typing import ( 

9 Optional, 

10 List, 

11 Any, 

12 TYPE_CHECKING, 

13 Tuple, 

14 Literal, 

15 Set, 

16 Dict, 

17 Type, 

18) 

19from collections.abc import Mapping, Container, Iterable, Sequence, Callable 

20 

21import debputy.l10n as l10n 

22from debputy.commands.debputy_cmd.output import ( 

23 OutputStyle, 

24 MarkdownOutputStyle, 

25) 

26from debputy.dh.dh_assistant import ( 

27 parse_drules_for_addons, 

28 DhSequencerData, 

29 extract_dh_addons_from_control, 

30) 

31from debputy.filesystem_scan import FSROOverlay, VirtualPathBase 

32from debputy.l10n import Translations 

33from debputy.linting.lint_util import ( 

34 LintState, 

35 AsyncLinterImpl, 

36 AbortTaskError, 

37 WorkspaceTextEditSupport, 

38) 

39from debputy.lsp.apt_cache import AptCache 

40from debputy.lsp.config.debputy_config import load_debputy_config, DebputyConfig 

41from debputy.lsp.diagnostics import DiagnosticReport 

42from debputy.lsp.maint_prefs import ( 

43 MaintainerPreferenceTable, 

44 determine_effective_preference, 

45 EffectiveFormattingPreference, 

46) 

47from debputy.lsp.text_util import LintCapablePositionCodec 

48from debputy.lsp.vendoring._deb822_repro import Deb822FileElement, parse_deb822_file 

49from debputy.packages import ( 

50 SourcePackage, 

51 BinaryPackage, 

52 DctrlParser, 

53) 

54from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

55from debputy.util import _info, _warn, T 

56from debputy.yaml import MANIFEST_YAML, YAMLError 

57from debputy.yaml.compat import CommentedMap 

58 

if TYPE_CHECKING:
    import lsprotocol.types as types
    from pygls.server import LanguageServer
    from pygls.workspace import TextDocument
    from pygls.uris import from_fs_path, to_fs_path

else:
    import debputy.lsprotocol.types as types

    try:
        from pygls.server import LanguageServer
        from pygls.workspace import TextDocument
        from pygls.uris import from_fs_path, to_fs_path
    except ImportError as e:
        # The `as e` target is unbound again when this except clause ends, so
        # the placeholder below cannot refer to `e` at call time. Keep the
        # exception alive under a separate module-level name instead.
        _PYGLS_IMPORT_ERROR = e

        class LanguageServer:
            def __init__(self, *args, **kwargs) -> None:
                """Placeholder to work if pygls is not installed"""
                # Should not be called; if it is, surface the original import failure.
                raise _PYGLS_IMPORT_ERROR  # pragma: no cover

80 

81 

82@dataclasses.dataclass(slots=True) 

83class FileCache: 

84 doc_uri: str 

85 path: str 

86 is_open_in_editor: bool | None = None 

87 last_doc_version: int | None = None 

88 last_mtime: float | None = None 

89 is_valid: bool = False 

90 

91 def _update_cache(self, doc: "TextDocument", source: str) -> None: 

92 raise NotImplementedError 

93 

94 def _clear_cache(self) -> None: 

95 raise NotImplementedError 

96 

97 def resolve_cache(self, ls: "DebputyLanguageServer") -> bool: 

98 doc = ls.workspace.text_documents.get(self.doc_uri) 

99 if doc is None: 99 ↛ 100line 99 didn't jump to line 100 because the condition on line 99 was never true

100 doc = ls.workspace.get_text_document(self.doc_uri) 

101 is_open = False 

102 else: 

103 is_open = True 

104 new_content: str | None = None 

105 if is_open: 105 ↛ 118line 105 didn't jump to line 118 because the condition on line 105 was always true

106 last_doc_version = self.last_doc_version 

107 current_doc_version = doc.version 

108 if ( 108 ↛ 116line 108 didn't jump to line 116 because the condition on line 108 was always true

109 not self.is_open_in_editor 

110 or last_doc_version is None 

111 or current_doc_version is None 

112 or last_doc_version < current_doc_version 

113 ): 

114 new_content = doc.source 

115 

116 self.last_doc_version = doc.version 

117 self.is_open_in_editor = True 

118 elif doc.uri.startswith("file://"): 

119 try: 

120 with open(self.path) as fd: 

121 st = os.fstat(fd.fileno()) 

122 current_mtime = st.st_mtime 

123 last_mtime = self.last_mtime or current_mtime - 1 

124 if self.is_open_in_editor or current_mtime > last_mtime: 

125 new_content = fd.read() 

126 self.last_mtime = current_mtime 

127 except FileNotFoundError: 

128 self._clear_cache() 

129 self.is_valid = False 

130 return False 

131 self.is_open_in_editor = is_open 

132 if new_content is not None: 132 ↛ 134line 132 didn't jump to line 134 because the condition on line 132 was always true

133 self._update_cache(doc, new_content) 

134 self.is_valid = True 

135 return True 

136 

137 

@dataclasses.dataclass(slots=True)
class Deb822FileCache(FileCache):
    """File cache holding the parsed deb822 representation of a document."""

    # Result of the most recent parse; `None` until parsed or after a clear.
    deb822_file: Deb822FileElement | None = None

    def _update_cache(self, doc: "TextDocument", source: str) -> None:
        """Parse `source` as deb822, accepting error tokens and duplicated fields."""
        source_lines = source.splitlines(keepends=True)
        self.deb822_file = parse_deb822_file(
            source_lines,
            accept_files_with_error_tokens=True,
            accept_files_with_duplicated_fields=True,
        )

    def _clear_cache(self) -> None:
        """Forget the parsed file."""
        self.deb822_file = None

152 

153 

@dataclasses.dataclass(slots=True)
class DctrlFileCache(Deb822FileCache):
    """File cache for `debian/control`: the deb822 parse plus the derived packages."""

    # Parser used by `_update_cache`; it is not reset by `_clear_cache`.
    dctrl_parser: DctrlParser | None = None
    source_package: SourcePackage | None = None
    binary_packages: Mapping[str, BinaryPackage] | None = None

    def _update_cache(self, doc: "TextDocument", source: str) -> None:
        """Re-parse `source` with the configured parser (errors ignored) and store all results."""
        deb822_file, source_package, binary_packages = (
            self.dctrl_parser.parse_source_debian_control(
                source.splitlines(keepends=True),
                ignore_errors=True,
            )
        )
        self.deb822_file = deb822_file
        self.source_package = source_package
        self.binary_packages = binary_packages

    def _clear_cache(self) -> None:
        """Forget the parsed file and the packages derived from it."""
        # Explicit base-class call on purpose: `dataclass(slots=True)` re-creates
        # the class, which breaks zero-argument `super()` on CPython < 3.14
        # (TypeError: obj must be an instance or subtype of type).
        Deb822FileCache._clear_cache(self)
        self.source_package = None
        self.binary_packages = None

175 

176 

@dataclasses.dataclass(slots=True)
class SalsaCICache(FileCache):
    """File cache holding the parsed YAML mapping of a Salsa CI pipeline file."""

    # Last successfully loaded top-level mapping; `None` until loaded or after a clear.
    parsed_content: CommentedMap | None = None

    def _update_cache(self, doc: "TextDocument", source: str) -> None:
        """Load `source` as YAML and keep it if the top level is a mapping.

        Content that fails to parse, or whose top level is not a mapping,
        leaves the previously stored value untouched.
        """
        with suppress(YAMLError):
            loaded = MANIFEST_YAML.load(source)
            if isinstance(loaded, CommentedMap):
                self.parsed_content = loaded

    def _clear_cache(self) -> None:
        """Forget the parsed content."""
        self.parsed_content = None

191 

192 

@dataclasses.dataclass(slots=True)
class DebianRulesCache(FileCache):
    """File cache holding what `parse_drules_for_addons` extracted from `debian/rules`."""

    # Sequences collected by the last parse; `None` until parsed or after a clear.
    sequences: set[str] | None = None
    # Return value of `parse_drules_for_addons` for the last parse.
    saw_dh: bool = False

    def _update_cache(self, doc: "TextDocument", source: str) -> None:
        """Scan `source` line by line and store the collected sequences and the dh flag."""
        collected: set[str] = set()
        rules_lines = source.splitlines()
        self.saw_dh = parse_drules_for_addons(rules_lines, collected)
        self.sequences = collected

    def _clear_cache(self) -> None:
        """Reset to the "nothing parsed" state."""
        self.sequences = None
        self.saw_dh = False

209 

210 

class LSProvidedLintState(LintState):
    """`LintState` implementation backed by the language server and one of its documents.

    Derived data (parsed deb822 content, `debian/control` packages, Salsa CI
    configuration and `debian/rules` sequences) is obtained through the file
    caches handed out by `DebputyLanguageServer.file_cache_for`.
    """

    def __init__(
        self,
        ls: "DebputyLanguageServer",
        doc: "TextDocument",
        source_root: str,
        debian_dir_path: str,
        dctrl_parser: DctrlParser,
    ) -> None:
        """Set up the lint state for `doc`.

        :param ls: The language server providing the workspace, caches and settings.
        :param doc: The document being processed.
        :param source_root: File system path of the source root.
        :param debian_dir_path: File system path of the `debian` directory.
        :param dctrl_parser: Parser used for the `debian/control` cache.
        """
        self._ls = ls
        self._doc = doc
        # Cache lines (doc.lines re-splits every time)
        self._lines = doc.lines
        self._source_root = FSROOverlay.create_root_dir(".", source_root)
        debian_dir = self._source_root.get("debian")
        if debian_dir is not None and not debian_dir.is_dir:
            debian_dir = None
        self._debian_dir = debian_dir
        self._diagnostics: list[types.Diagnostic] | None = None
        self._last_emitted_diagnostic_count = 0
        dctrl_file = os.path.join(debian_dir_path, "control")

        if dctrl_file != doc.path:
            # The document is not debian/control itself: track both files separately.
            self._dctrl_cache: DctrlFileCache = ls.file_cache_for(
                from_fs_path(dctrl_file),
                dctrl_file,
                DctrlFileCache,
                lambda uri, path: DctrlFileCache(uri, path, dctrl_parser=dctrl_parser),
            )
            self._deb822_file: Deb822FileCache = ls.file_cache_for(
                doc.uri,
                doc.path,
                Deb822FileCache,
                lambda uri, path: Deb822FileCache(uri, path),
            )
        else:
            # The document is debian/control: one cache serves both purposes.
            self._dctrl_cache = ls.file_cache_for(
                doc.uri,
                doc.path,
                DctrlFileCache,
                lambda uri, path: DctrlFileCache(uri, path, dctrl_parser=dctrl_parser),
            )
            self._deb822_file = self._dctrl_cache

        # Candidate Salsa CI files, in the order `_resolve_salsa_ci` tries them.
        self._salsa_ci_caches = [
            ls.file_cache_for(
                from_fs_path(os.path.join(debian_dir_path, p)),
                os.path.join(debian_dir_path, p),
                SalsaCICache,
                lambda uri, path: SalsaCICache(uri, path),
            )
            for p in ("salsa-ci.yml", os.path.join("..", ".gitlab-ci.yml"))
        ]
        drules_path = os.path.join(debian_dir_path, "rules")
        self._drules_cache = ls.file_cache_for(
            from_fs_path(drules_path) if doc.path != drules_path else doc.uri,
            drules_path,
            DebianRulesCache,
            lambda uri, path: DebianRulesCache(uri, path),
        )

    @property
    def plugin_feature_set(self) -> PluginProvidedFeatureSet:
        return self._ls.plugin_feature_set

    @property
    def doc_uri(self) -> str:
        return self._doc.uri

    @property
    def doc_version(self) -> int | None:
        return self._doc.version

    @property
    def source_root(self) -> VirtualPathBase | None:
        return self._source_root

    @property
    def debian_dir(self) -> VirtualPathBase | None:
        return self._debian_dir

    @property
    def path(self) -> str:
        return self._doc.path

    @property
    def content(self) -> str:
        return self._doc.source

    @property
    def lines(self) -> list[str]:
        return self._lines

    @property
    def position_codec(self) -> LintCapablePositionCodec:
        return self._doc.position_codec

    def _resolve_dctrl(self) -> DctrlFileCache | None:
        """Refresh and return the `debian/control` cache."""
        dctrl_cache = self._dctrl_cache
        dctrl_cache.resolve_cache(self._ls)
        return dctrl_cache

    @property
    def parsed_deb822_file_content(self) -> Deb822FileElement | None:
        """The (refreshed) deb822 parse of the document, if any."""
        cache = self._deb822_file
        cache.resolve_cache(self._ls)
        return cache.deb822_file

    @property
    def source_package(self) -> SourcePackage | None:
        return self._resolve_dctrl().source_package

    @property
    def binary_packages(self) -> Mapping[str, BinaryPackage] | None:
        return self._resolve_dctrl().binary_packages

    def _resolve_salsa_ci(self) -> CommentedMap | None:
        """Return the parsed content of the first Salsa CI candidate that resolves."""
        for salsa_ci_cache in self._salsa_ci_caches:
            if salsa_ci_cache.resolve_cache(self._ls):
                return salsa_ci_cache.parsed_content
        return None

    @property
    def effective_preference(self) -> EffectiveFormattingPreference | None:
        """The formatting preference derived from the source package and Salsa CI data.

        :returns: `None` when neither a source package nor Salsa CI data is available.
        """
        source_package = self.source_package
        salsa_ci = self._resolve_salsa_ci()
        if source_package is None and salsa_ci is None:
            return None
        style, _, _ = determine_effective_preference(
            self.maint_preference_table,
            source_package,
            salsa_ci,
        )
        return style

    @property
    def maint_preference_table(self) -> MaintainerPreferenceTable:
        return self._ls.maint_preferences

    @property
    def salsa_ci(self) -> CommentedMap | None:
        # NOTE(review): this always returns None although `_resolve_salsa_ci`
        # is available - confirm whether that is intentional.
        return None

    @property
    def dh_sequencer_data(self) -> DhSequencerData:
        """Combine the sequences found in `debian/rules` and in the source package fields."""
        dh_sequences: set[str] = set()
        saw_dh = False
        src_pkg = self.source_package
        drules_cache = self._drules_cache
        if drules_cache.resolve_cache(self._ls):
            saw_dh = drules_cache.saw_dh
            if drules_cache.sequences:
                dh_sequences.update(drules_cache.sequences)
        if src_pkg:
            extract_dh_addons_from_control(src_pkg.fields, dh_sequences)

        return DhSequencerData(
            frozenset(dh_sequences),
            saw_dh,
        )

    @property
    def workspace_text_edit_support(self) -> WorkspaceTextEditSupport:
        return self._ls.workspace_text_edit_support

    @property
    def debputy_config(self) -> DebputyConfig:
        return self._ls.debputy_config

    def translation(self, domain: str) -> Translations:
        return self._ls.translation(domain)

    async def slow_iter(
        self,
        iterable: Iterable[T],
        *,
        yield_every: int = 100,
    ) -> AsyncIterable[T]:
        """Iterate `iterable`, periodically yielding control to the event loop.

        After every `yield_every` items (and once at the end, if items are
        pending) control is handed to the event loop and the document version
        is re-checked.

        :raises AbortTaskError: If the document version changed in the meantime.
        """
        counter = 0
        for value in iterable:
            counter += 1
            if counter >= yield_every:
                await asyncio.sleep(0)
                self._abort_on_outdated_doc_version()
                counter = 0
            yield value
        if counter:
            await asyncio.sleep(0)
            self._abort_on_outdated_doc_version()

    async def run_diagnostics(
        self,
        linter: AsyncLinterImpl,
    ) -> list[types.Diagnostic]:
        """Run `linter` against this state and return the diagnostics it emitted.

        :raises RuntimeError: If a run is already in progress on this state.
        """
        if self._diagnostics is not None:
            raise RuntimeError(
                "run_diagnostics cannot be run while it is already running"
            )
        self._diagnostics = diagnostics = []
        # Each run counts towards its own partial reports.
        self._last_emitted_diagnostic_count = 0

        try:
            await linter(self)
        finally:
            # Always leave the "running" state, even when the linter raises
            # (e.g. AbortTaskError); otherwise every later run on this state
            # would be rejected as "already running".
            self._diagnostics = None
        return diagnostics

    def _emit_diagnostic(self, diagnostic: types.Diagnostic) -> None:
        """Record `diagnostic`; every 100 diagnostics a partial report is recorded.

        :raises TypeError: If called outside of `run_diagnostics`.
        :raises AbortTaskError: If the document version changed (checked before
          each partial report).
        """
        diagnostics = self._diagnostics
        if diagnostics is None:
            raise TypeError("Cannot run emit_diagnostic outside of run_diagnostics")

        diagnostics.append(diagnostic)
        self._last_emitted_diagnostic_count += 1
        if self._last_emitted_diagnostic_count >= 100:
            self._abort_on_outdated_doc_version()
            self._ls.record_diagnostics(
                self.doc_uri,
                self._doc.version,
                diagnostics,
                is_partial=True,
            )
            self._last_emitted_diagnostic_count = 0

    def _abort_on_outdated_doc_version(self) -> None:
        """Raise `AbortTaskError` if the workspace document has a different version now."""
        expected_version = self._doc.version
        current_doc = self._ls.workspace.get_text_document(self.doc_uri)
        if current_doc.version != expected_version:
            raise AbortTaskError(
                f"Cancel (obsolete) diagnostics for doc version {expected_version}: document version changed"
            )

441 

442 

443def _preference( 

444 client_preference: list[types.MarkupKind] | None, 

445 options: Container[types.MarkupKind], 

446 fallback_kind: types.MarkupKind, 

447) -> types.MarkupKind: 

448 if not client_preference: 448 ↛ 450line 448 didn't jump to line 450 because the condition on line 448 was always true

449 return fallback_kind 

450 for markdown_kind in client_preference: 

451 if markdown_kind in options: 

452 return markdown_kind 

453 return fallback_kind 

454 

455 

class DebputyLanguageServer(LanguageServer):
    """Language server holding the debputy specific state shared between requests.

    This includes the plugin feature set, the `debian/control` parser, locale
    handling, per-file caches and the most recent diagnostics per document.
    """

    def __init__(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        super().__init__(*args, **kwargs)
        self._dctrl_parser: DctrlParser | None = None
        self._plugin_feature_set: PluginProvidedFeatureSet | None = None
        self._trust_language_ids: bool | None = None
        self._finished_initialization = False
        self.maint_preferences = MaintainerPreferenceTable({}, {})
        self.apt_cache = AptCache()
        # Strong references to background tasks; each task removes itself when done.
        self.background_tasks = set()
        self.client_locale: str | None = None
        self.forced_locale: str | None = None
        self._active_locale: list[str] | None = None
        self._diagnostic_reports: dict[str, DiagnosticReport] = {}
        self.workspace_text_edit_support = WorkspaceTextEditSupport()
        self.debputy_config = load_debputy_config()
        # File system path -> cache type -> cache instance.
        self._file_state_caches: dict[str, dict[type[FileCache], FileCache]] = (
            collections.defaultdict(dict)
        )
        self.hover_output_style = OutputStyle()

    def finish_startup_initialization(self) -> None:
        """Complete the one-time setup; subsequent calls are no-ops.

        The `dctrl_parser`, `plugin_feature_set` and `trust_language_ids`
        attributes must have been set before this is called.
        """
        if self._finished_initialization:
            return

        assert self._dctrl_parser is not None
        assert self._plugin_feature_set is not None
        assert self._trust_language_ids is not None
        self.maint_preferences = self.maint_preferences.load_preferences()
        _info(
            f"Loaded style preferences: {len(self.maint_preferences.maintainer_preferences)} unique maintainer preferences recorded"
        )
        if (
            self.hover_markup_format(
                types.MarkupKind.Markdown, types.MarkupKind.PlainText
            )
            == types.MarkupKind.Markdown
        ):
            self.hover_output_style = MarkdownOutputStyle()
        self._finished_initialization = True

    async def on_initialize(self, params: types.InitializeParams) -> None:
        """React to the client's initialize request.

        Starts loading the apt cache in the background, records the client
        locale and derives the workspace edit support from the client
        capabilities.
        """
        task = self.loop.create_task(self._load_apt_cache(), name="Index apt cache")
        self.background_tasks.add(task)
        task.add_done_callback(self.background_tasks.discard)
        self.client_locale = params.locale
        if self.forced_locale is not None:
            _info(
                f"Ignoring client locale: {self.client_locale}. Using {self.forced_locale} instead as requested [--force-locale]"
            )
        else:
            _info(
                f"Client locale: {self.client_locale}. Use --force-locale to override"
            )
        _info(f"Cwd: {os.getcwd()}")
        self._update_locale()
        self.workspace_text_edit_support = WorkspaceTextEditSupport(
            supported_resource_operation_edit_kinds=self._supported_resource_operation_edit_kinds,
            supports_document_changes=self._supports_ws_document_changes,
        )
        self._mask_ghost_requests()

    def _mask_ghost_requests(self) -> None:
        """Install a no-op document highlight handler unless a handler is already registered."""

        # Work around `kate` bug (https://bugs.kde.org/show_bug.cgi?id=506664) for selected requests
        # that are really annoying (that is, triggers all the time).
        def _ghost_request(*_args: Any) -> None:
            _info(
                "Ignoring unsupported request from client that it should not have sent."
            )

        try:
            features = self.lsp.fm.features
            if types.TEXT_DOCUMENT_DOCUMENT_HIGHLIGHT in features:
                # A handler is already registered; nothing is injected (and
                # hence nothing is logged about an injection).
                return
            features[types.TEXT_DOCUMENT_DOCUMENT_HIGHLIGHT] = _ghost_request
        except (AttributeError, TypeError, KeyError, ValueError) as e:
            _info(
                f"Could not install ghost handler, continuing without. Error was: {str(e)}"
            )
            return
        _info(
            f"Injecting fake {types.TEXT_DOCUMENT_DOCUMENT_HIGHLIGHT} handler to work around bugs like KDE#506664"
        )

    def _update_locale(self) -> None:
        """Recompute the active locale; a forced locale wins over the client locale."""
        if self.forced_locale is not None:
            self._active_locale = [self.forced_locale]
        elif self.client_locale is not None:
            self._active_locale = [self.client_locale]
        else:
            self._active_locale = None

    def file_cache_for(
        self,
        uri: str,
        path: str,
        cache_type: type[T],
        initializer: Callable[[str, str], T],
    ) -> T:
        """Return the cache of type `cache_type` for `path`, creating it when needed.

        :param uri: The document URI; passed to `initializer` and used to check
          whether the document is open in the editor.
        :param path: The file system path; used as the cache key.
        :param cache_type: The type of cache wanted.
        :param initializer: Called with `(uri, path)` to create a missing cache.
        :returns: The stored cache, or a fresh instance that is not stored when
          the document is not open in the editor.
        """
        inner_cache = self._file_state_caches.get(path)
        result = inner_cache.get(cache_type) if inner_cache else None
        if result is None:
            result = initializer(uri, path)
            if uri not in self.workspace.text_documents:
                # We do not get a proper notification on when to discard the cache.
                # For now, we simply do not cache them at all to avoid "leaking by infinite cache".
                #
                # Note that even if we did cache them, we would have to track whether the files have
                # changed (which we do not), so the cache itself would not be useful.
                return result
            assert isinstance(result, cache_type)
            if inner_cache is None:
                inner_cache = {}
                self._file_state_caches[path] = inner_cache
            inner_cache[cache_type] = result
        else:
            assert isinstance(result, cache_type)
        return result

    def shutdown(self) -> None:
        """Request cancellation of all background tasks, then shut down the server."""
        # Iterate over a snapshot: finished tasks remove themselves from the
        # set via their done callback.
        for task in tuple(self.background_tasks):
            _info(f"Cancelling task: {task.get_name()}")
            self.loop.call_soon_threadsafe(task.cancel)
        return super().shutdown()

    def translation(self, domain: str) -> Translations:
        """Return the translations for `domain` in the currently active locale."""
        return l10n.translation(
            domain,
            languages=self._active_locale,
        )

    async def _load_apt_cache(self) -> None:
        """Load the apt cache unless it is already loading or loaded."""
        if self.apt_cache.state in ("loading", "loaded"):
            _info(
                f"The apt cache data is already in state {self.apt_cache.state}, not re-triggering"
            )
            return
        _info("Starting load of apt cache data")
        start = time.time()
        try:
            await self.apt_cache.load()
        except ValueError as ex:
            _warn(f"Could not load apt cache: {ex}")
        else:
            end = time.time()
            _info(
                f"Loading apt cache finished after {end-start} seconds and is now in state {self.apt_cache.state}"
            )

    @property
    def plugin_feature_set(self) -> PluginProvidedFeatureSet:
        """The plugin feature set.

        :raises RuntimeError: When read before being set, or when set a second time.
        """
        res = self._plugin_feature_set
        if res is None:
            raise RuntimeError(
                "Initialization error: The plugin feature set has not been initialized before it was needed."
            )
        return res

    @plugin_feature_set.setter
    def plugin_feature_set(self, plugin_feature_set: PluginProvidedFeatureSet) -> None:
        if self._plugin_feature_set is not None:
            raise RuntimeError(
                "The plugin_feature_set attribute cannot be changed once set"
            )
        self._plugin_feature_set = plugin_feature_set

    @property
    def dctrl_parser(self) -> DctrlParser:
        """The `debian/control` parser.

        :raises RuntimeError: When read before being set, or when set a second time.
        """
        res = self._dctrl_parser
        if res is None:
            raise RuntimeError(
                "Initialization error: The dctrl_parser has not been initialized before it was needed."
            )
        return res

    @dctrl_parser.setter
    def dctrl_parser(self, parser: DctrlParser) -> None:
        if self._dctrl_parser is not None:
            raise RuntimeError("The dctrl_parser attribute cannot be changed once set")
        self._dctrl_parser = parser

    def lint_state(self, doc: "TextDocument") -> LSProvidedLintState:
        """Create the lint state for `doc`.

        The closest ancestor directory of the document named `debian` is used
        as the debian directory and its parent as the source root.
        """
        dir_path = os.path.dirname(doc.path)

        while dir_path and dir_path != "/" and os.path.basename(dir_path) != "debian":
            dir_path = os.path.dirname(dir_path)

        source_root = os.path.dirname(dir_path)

        return LSProvidedLintState(self, doc, source_root, dir_path, self.dctrl_parser)

    @property
    def _client_hover_markup_formats(self) -> list[types.MarkupKind] | None:
        try:
            return (
                self.client_capabilities.text_document.hover.content_format
            )  # type: ignore
        except AttributeError:
            return None

    def hover_markup_format(
        self,
        *options: types.MarkupKind,
        fallback_kind: types.MarkupKind = types.MarkupKind.PlainText,
    ) -> types.MarkupKind:
        """Pick the client preferred hover markup format from a set of options

        :param options: The markup kinds possible.
        :param fallback_kind: If no overlapping option was found in the client preferences
          (or client did not announce a value at all), this parameter is returned instead.
        :returns: The client's preferred markup format from the provided options, or,
          (if there is no overlap), the `fallback_kind` value is returned.
        """
        client_preference = self._client_hover_markup_formats
        return _preference(client_preference, frozenset(options), fallback_kind)

    @property
    def _client_completion_item_document_markup_formats(
        self,
    ) -> list[types.MarkupKind] | None:
        try:
            return (
                self.client_capabilities.text_document.completion.completion_item.documentation_format  # type: ignore
            )
        except AttributeError:
            return None

    @property
    def _supports_ws_document_changes(self) -> bool:
        """Whether the client announced support for `documentChanges` in workspace edits."""
        try:
            # The capability is optional: normalize an unset value to False.
            return bool(
                self.client_capabilities.workspace.workspace_edit.document_changes  # type: ignore
            )
        except AttributeError:
            return False

    @property
    def _supported_resource_operation_edit_kinds(
        self,
    ) -> Sequence[types.ResourceOperationKind]:
        """The resource operations the client announced for workspace edits (possibly empty)."""
        try:
            # The capability is optional: normalize an unset value to "none".
            return (
                self.client_capabilities.workspace.workspace_edit.resource_operations  # type: ignore
                or []
            )
        except AttributeError:
            return []

    def completion_item_document_markup(
        self,
        *options: types.MarkupKind,
        fallback_kind: types.MarkupKind = types.MarkupKind.PlainText,
    ) -> types.MarkupKind:
        """Pick the client preferred completion item documentation markup format from a set of options

        :param options: The markup kinds possible.
        :param fallback_kind: If no overlapping option was found in the client preferences
          (or client did not announce a value at all), this parameter is returned instead.
        :returns: The client's preferred markup format from the provided options, or,
          (if there is no overlap), the `fallback_kind` value is returned.
        """

        client_preference = self._client_completion_item_document_markup_formats
        return _preference(client_preference, frozenset(options), fallback_kind)

    @property
    def trust_language_ids(self) -> bool:
        """Whether editor provided language IDs are trusted; `True` while unset."""
        v = self._trust_language_ids
        if v is None:
            return True
        return v

    @trust_language_ids.setter
    def trust_language_ids(self, new_value: bool) -> None:
        self._trust_language_ids = new_value

    def determine_language_id(
        self,
        doc: "TextDocument",
    ) -> tuple[Literal["editor-provided", "filename"], str, str]:
        """Determine the language ID to use for `doc`.

        :returns: A tuple of the source of the ID, the ID itself and the
          cleaned filename. The cleaned filename is the path from the last
          `debian/` onwards, or the basename if the path has no `debian/`.
        """
        lang_id = doc.language_id
        path = doc.path
        try:
            last_idx = path.rindex("debian/")
        except ValueError:
            cleaned_filename = os.path.basename(path)
        else:
            cleaned_filename = path[last_idx:]

        if self.trust_language_ids and lang_id and not lang_id.isspace():
            if lang_id not in ("fundamental",):
                return "editor-provided", lang_id, cleaned_filename
            _info(
                f"Ignoring editor provided language ID: {lang_id} (reverting to filename based detection instead)"
            )

        return "filename", cleaned_filename, cleaned_filename

    def close_document(self, uri: str) -> None:
        """Drop the diagnostics report and the file caches kept for `uri`."""
        path = to_fs_path(uri)
        with suppress(KeyError):
            del self._diagnostic_reports[uri]
        with suppress(KeyError):
            del self._file_state_caches[path]

    async def slow_iter(
        self,
        iterable: Iterable[T],
        *,
        yield_every: int = 100,
    ) -> AsyncIterable[T]:
        """Iterate `iterable`, yielding control to the event loop every `yield_every` items."""
        counter = 0
        for value in iterable:
            counter += 1
            if counter >= yield_every:
                await asyncio.sleep(0)
                counter = 0
            yield value
        if counter:
            await asyncio.sleep(0)

    def record_diagnostics(
        self,
        doc_uri: str,
        version: int,
        diagnostics: list[types.Diagnostic],
        is_partial: bool,
    ) -> None:
        """Store `diagnostics` as the latest report for `doc_uri` and publish them."""
        self._diagnostic_reports[doc_uri] = DiagnosticReport(
            doc_uri,
            version,
            f"{version}@{doc_uri}",
            is_partial,
            diagnostics,
        )
        self.publish_diagnostics(doc_uri, diagnostics, version)

    def diagnostics_in_range(
        self,
        uri: str,
        text_range: types.Range,
    ) -> list[types.Diagnostic] | None:
        """Return the recorded diagnostics for `uri` within `text_range`.

        :returns: `None` when there is no report, the report is still in
          progress, or it was recorded for a different document version.
        """
        report = self._diagnostic_reports.get(uri)
        if report is None or report.is_in_progress:
            return None
        doc = self.workspace.get_text_document(uri)
        if doc.version is not None and doc.version != report.doc_version:
            return None

        return report.diagnostics_in_range(text_range)