Coverage for src/debputy/lsp/debputy_ls.py: 51%
460 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-09-07 09:27 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-09-07 09:27 +0000
1import asyncio
2import collections
3import dataclasses
4import os
5import time
6from collections.abc import AsyncIterable
7from contextlib import suppress
8from typing import (
9 Optional,
10 List,
11 Any,
12 Mapping,
13 Container,
14 TYPE_CHECKING,
15 Tuple,
16 Literal,
17 Set,
18 Dict,
19 Iterable,
20 Sequence,
21 Callable,
22 Type,
23)
25import debputy.l10n as l10n
26from debputy.commands.debputy_cmd.output import (
27 OutputStyle,
28 MarkdownOutputStyle,
29)
30from debputy.dh.dh_assistant import (
31 parse_drules_for_addons,
32 DhSequencerData,
33 extract_dh_addons_from_control,
34)
35from debputy.filesystem_scan import FSROOverlay, VirtualPathBase
36from debputy.l10n import Translations
37from debputy.linting.lint_util import (
38 LintState,
39 AsyncLinterImpl,
40 AbortTaskError,
41 WorkspaceTextEditSupport,
42)
43from debputy.lsp.apt_cache import AptCache
44from debputy.lsp.config.debputy_config import load_debputy_config, DebputyConfig
45from debputy.lsp.diagnostics import DiagnosticReport
46from debputy.lsp.maint_prefs import (
47 MaintainerPreferenceTable,
48 determine_effective_preference,
49 EffectiveFormattingPreference,
50)
51from debputy.lsp.text_util import LintCapablePositionCodec
52from debputy.lsp.vendoring._deb822_repro import Deb822FileElement, parse_deb822_file
53from debputy.packages import (
54 SourcePackage,
55 BinaryPackage,
56 DctrlParser,
57)
58from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
59from debputy.util import _info, _warn, T
60from debputy.yaml import MANIFEST_YAML, YAMLError
61from debputy.yaml.compat import CommentedMap
63if TYPE_CHECKING:
64 import lsprotocol.types as types
65 from pygls.server import LanguageServer
66 from pygls.workspace import TextDocument
67 from pygls.uris import from_fs_path, to_fs_path
69else:
70 import debputy.lsprotocol.types as types
72 try:
73 from pygls.server import LanguageServer
74 from pygls.workspace import TextDocument
75 from pygls.uris import from_fs_path, to_fs_path
76 except ImportError as e:
78 class LanguageServer:
79 def __init__(self, *args, **kwargs) -> None:
80 """Placeholder to work if pygls is not installed"""
81 # Should not be called
82 global e
83 raise e # pragma: no cover
86@dataclasses.dataclass(slots=True)
87class FileCache:
88 doc_uri: str
89 path: str
90 is_open_in_editor: Optional[bool] = None
91 last_doc_version: Optional[int] = None
92 last_mtime: Optional[float] = None
93 is_valid: bool = False
95 def _update_cache(self, doc: "TextDocument", source: str) -> None:
96 raise NotImplementedError
98 def _clear_cache(self) -> None:
99 raise NotImplementedError
101 def resolve_cache(self, ls: "DebputyLanguageServer") -> bool:
102 doc = ls.workspace.text_documents.get(self.doc_uri)
103 if doc is None: 103 ↛ 104line 103 didn't jump to line 104 because the condition on line 103 was never true
104 doc = ls.workspace.get_text_document(self.doc_uri)
105 is_open = False
106 else:
107 is_open = True
108 new_content: Optional[str] = None
109 if is_open: 109 ↛ 122line 109 didn't jump to line 122 because the condition on line 109 was always true
110 last_doc_version = self.last_doc_version
111 current_doc_version = doc.version
112 if ( 112 ↛ 120line 112 didn't jump to line 120 because the condition on line 112 was always true
113 not self.is_open_in_editor
114 or last_doc_version is None
115 or current_doc_version is None
116 or last_doc_version < current_doc_version
117 ):
118 new_content = doc.source
120 self.last_doc_version = doc.version
121 self.is_open_in_editor = True
122 elif doc.uri.startswith("file://"):
123 try:
124 with open(self.path) as fd:
125 st = os.fstat(fd.fileno())
126 current_mtime = st.st_mtime
127 last_mtime = self.last_mtime or current_mtime - 1
128 if self.is_open_in_editor or current_mtime > last_mtime:
129 new_content = fd.read()
130 self.last_mtime = current_mtime
131 except FileNotFoundError:
132 self._clear_cache()
133 self.is_valid = False
134 return False
135 self.is_open_in_editor = is_open
136 if new_content is not None: 136 ↛ 138line 136 didn't jump to line 138 because the condition on line 136 was always true
137 self._update_cache(doc, new_content)
138 self.is_valid = True
139 return True
142@dataclasses.dataclass(slots=True)
143class Deb822FileCache(FileCache):
144 deb822_file: Optional[Deb822FileElement] = None
146 def _update_cache(self, doc: "TextDocument", source: str) -> None:
147 deb822_file = parse_deb822_file(
148 source.splitlines(keepends=True),
149 accept_files_with_error_tokens=True,
150 accept_files_with_duplicated_fields=True,
151 )
152 self.deb822_file = deb822_file
154 def _clear_cache(self) -> None:
155 self.deb822_file = None
158@dataclasses.dataclass(slots=True)
159class DctrlFileCache(Deb822FileCache):
160 dctrl_parser: Optional[DctrlParser] = None
161 source_package: Optional[SourcePackage] = None
162 binary_packages: Optional[Mapping[str, BinaryPackage]] = None
164 def _update_cache(self, doc: "TextDocument", source: str) -> None:
165 deb822_file, source_package, binary_packages = (
166 self.dctrl_parser.parse_source_debian_control(
167 source.splitlines(keepends=True),
168 ignore_errors=True,
169 )
170 )
171 self.deb822_file = deb822_file
172 self.source_package = source_package
173 self.binary_packages = binary_packages
175 def _clear_cache(self) -> None:
176 super()._clear_cache()
177 self.source_package = None
178 self.binary_packages = None
181@dataclasses.dataclass(slots=True)
182class SalsaCICache(FileCache):
183 parsed_content: Optional[CommentedMap] = None
185 def _update_cache(self, doc: "TextDocument", source: str) -> None:
186 try:
187 value = MANIFEST_YAML.load(source)
188 if isinstance(value, CommentedMap):
189 self.parsed_content = value
190 except YAMLError:
191 pass
193 def _clear_cache(self) -> None:
194 self.parsed_content = None
197@dataclasses.dataclass(slots=True)
198class DebianRulesCache(FileCache):
199 sequences: Optional[Set[str]] = None
200 saw_dh: bool = False
202 def _update_cache(self, doc: "TextDocument", source: str) -> None:
203 sequences = set()
204 self.saw_dh = parse_drules_for_addons(
205 source.splitlines(),
206 sequences,
207 )
208 self.sequences = sequences
210 def _clear_cache(self) -> None:
211 self.sequences = None
212 self.saw_dh = False
215class LSProvidedLintState(LintState):
216 def __init__(
217 self,
218 ls: "DebputyLanguageServer",
219 doc: "TextDocument",
220 source_root: str,
221 debian_dir_path: str,
222 dctrl_parser: DctrlParser,
223 ) -> None:
224 self._ls = ls
225 self._doc = doc
226 # Cache lines (doc.lines re-splits everytime)
227 self._lines = doc.lines
228 self._source_root = FSROOverlay.create_root_dir(".", source_root)
229 debian_dir = self._source_root.get("debian")
230 if debian_dir is not None and not debian_dir.is_dir: 230 ↛ 231line 230 didn't jump to line 231 because the condition on line 230 was never true
231 debian_dir = None
232 self._debian_dir = debian_dir
233 self._diagnostics: Optional[List[types.Diagnostic]] = None
234 self._last_emitted_diagnostic_count = 0
235 dctrl_file = os.path.join(debian_dir_path, "control")
237 if dctrl_file != doc.path:
239 self._dctrl_cache: DctrlFileCache = ls.file_cache_for(
240 from_fs_path(dctrl_file),
241 dctrl_file,
242 DctrlFileCache,
243 lambda uri, path: DctrlFileCache(uri, path, dctrl_parser=dctrl_parser),
244 )
245 self._deb822_file: Deb822FileCache = ls.file_cache_for(
246 doc.uri,
247 doc.path,
248 Deb822FileCache,
249 lambda uri, path: Deb822FileCache(uri, path),
250 )
251 else:
252 self._dctrl_cache = ls.file_cache_for(
253 doc.uri,
254 doc.path,
255 DctrlFileCache,
256 lambda uri, path: DctrlFileCache(uri, path, dctrl_parser=dctrl_parser),
257 )
258 self._deb822_file = self._dctrl_cache
260 self._salsa_ci_caches = [
261 ls.file_cache_for(
262 from_fs_path(os.path.join(debian_dir_path, p)),
263 os.path.join(debian_dir_path, p),
264 SalsaCICache,
265 lambda uri, path: SalsaCICache(uri, path),
266 )
267 for p in ("salsa-ci.yml", os.path.join("..", ".gitlab-ci.yml"))
268 ]
269 drules_path = os.path.join(debian_dir_path, "rules")
270 self._drules_cache = ls.file_cache_for(
271 from_fs_path(drules_path) if doc.path != drules_path else doc.uri,
272 drules_path,
273 DebianRulesCache,
274 lambda uri, path: DebianRulesCache(uri, path),
275 )
277 @property
278 def plugin_feature_set(self) -> PluginProvidedFeatureSet:
279 return self._ls.plugin_feature_set
281 @property
282 def doc_uri(self) -> str:
283 return self._doc.uri
285 @property
286 def doc_version(self) -> Optional[int]:
287 return self._doc.version
289 @property
290 def source_root(self) -> Optional[VirtualPathBase]:
291 return self._source_root
293 @property
294 def debian_dir(self) -> Optional[VirtualPathBase]:
295 return self._debian_dir
297 @property
298 def path(self) -> str:
299 return self._doc.path
301 @property
302 def content(self) -> str:
303 return self._doc.source
305 @property
306 def lines(self) -> List[str]:
307 return self._lines
309 @property
310 def position_codec(self) -> LintCapablePositionCodec:
311 return self._doc.position_codec
313 def _resolve_dctrl(self) -> Optional[DctrlFileCache]:
314 dctrl_cache = self._dctrl_cache
315 dctrl_cache.resolve_cache(self._ls)
316 return dctrl_cache
318 @property
319 def parsed_deb822_file_content(self) -> Optional[Deb822FileElement]:
320 cache = self._deb822_file
321 cache.resolve_cache(self._ls)
322 return cache.deb822_file
324 @property
325 def source_package(self) -> Optional[SourcePackage]:
326 return self._resolve_dctrl().source_package
328 @property
329 def binary_packages(self) -> Optional[Mapping[str, BinaryPackage]]:
330 return self._resolve_dctrl().binary_packages
332 def _resolve_salsa_ci(self) -> Optional[CommentedMap]:
333 for salsa_ci_cache in self._salsa_ci_caches:
334 if salsa_ci_cache.resolve_cache(self._ls):
335 return salsa_ci_cache.parsed_content
336 return None
338 @property
339 def effective_preference(self) -> Optional[EffectiveFormattingPreference]:
340 source_package = self.source_package
341 salsa_ci = self._resolve_salsa_ci()
342 if source_package is None and salsa_ci is None:
343 return None
344 style, _, _ = determine_effective_preference(
345 self.maint_preference_table,
346 source_package,
347 salsa_ci,
348 )
349 return style
351 @property
352 def maint_preference_table(self) -> MaintainerPreferenceTable:
353 return self._ls.maint_preferences
355 @property
356 def salsa_ci(self) -> Optional[CommentedMap]:
357 return None
359 @property
360 def dh_sequencer_data(self) -> DhSequencerData:
361 dh_sequences: Set[str] = set()
362 saw_dh = False
363 src_pkg = self.source_package
364 drules_cache = self._drules_cache
365 if drules_cache.resolve_cache(self._ls):
366 saw_dh = drules_cache.saw_dh
367 if drules_cache.sequences:
368 dh_sequences.update(drules_cache.sequences)
369 if src_pkg:
370 extract_dh_addons_from_control(src_pkg.fields, dh_sequences)
372 return DhSequencerData(
373 frozenset(dh_sequences),
374 saw_dh,
375 )
377 @property
378 def workspace_text_edit_support(self) -> WorkspaceTextEditSupport:
379 return self._ls.workspace_text_edit_support
381 @property
382 def debputy_config(self) -> DebputyConfig:
383 return self._ls.debputy_config
385 def translation(self, domain: str) -> Translations:
386 return self._ls.translation(domain)
388 async def slow_iter(
389 self,
390 iterable: Iterable[T],
391 *,
392 yield_every: int = 100,
393 ) -> AsyncIterable[T]:
394 counter = 0
395 for value in iterable:
396 counter += 1
397 if counter >= yield_every:
398 await asyncio.sleep(0)
399 self._abort_on_outdated_doc_version()
400 counter = 0
401 yield value
402 if counter:
403 await asyncio.sleep(0)
404 self._abort_on_outdated_doc_version()
406 async def run_diagnostics(
407 self,
408 linter: AsyncLinterImpl,
409 ) -> List[types.Diagnostic]:
410 if self._diagnostics is not None:
411 raise RuntimeError(
412 "run_diagnostics cannot be run while it is already running"
413 )
414 self._diagnostics = diagnostics = []
416 await linter(self)
418 self._diagnostics = None
419 return diagnostics
421 def _emit_diagnostic(self, diagnostic: types.Diagnostic) -> None:
422 diagnostics = self._diagnostics
423 if diagnostics is None:
424 raise TypeError("Cannot run emit_diagnostic outside of run_diagnostics")
426 diagnostics.append(diagnostic)
427 self._last_emitted_diagnostic_count += 1
428 if self._last_emitted_diagnostic_count >= 100:
429 self._abort_on_outdated_doc_version()
430 self._ls.record_diagnostics(
431 self.doc_uri,
432 self._doc.version,
433 diagnostics,
434 is_partial=True,
435 )
436 self._last_emitted_diagnostic_count = 0
438 def _abort_on_outdated_doc_version(self) -> None:
439 expected_version = self._doc.version
440 current_doc = self._ls.workspace.get_text_document(self.doc_uri)
441 if current_doc.version != expected_version:
442 raise AbortTaskError(
443 f"Cancel (obsolete) diagnostics for doc version {expected_version}: document version changed"
444 )
447def _preference(
448 client_preference: Optional[List[types.MarkupKind]],
449 options: Container[types.MarkupKind],
450 fallback_kind: types.MarkupKind,
451) -> types.MarkupKind:
452 if not client_preference: 452 ↛ 454line 452 didn't jump to line 454 because the condition on line 452 was always true
453 return fallback_kind
454 for markdown_kind in client_preference:
455 if markdown_kind in options:
456 return markdown_kind
457 return fallback_kind
460class DebputyLanguageServer(LanguageServer):
462 def __init__(
463 self,
464 *args: Any,
465 **kwargs: Any,
466 ) -> None:
467 super().__init__(*args, **kwargs)
468 self._dctrl_parser: Optional[DctrlParser] = None
469 self._plugin_feature_set: Optional[PluginProvidedFeatureSet] = None
470 self._trust_language_ids: Optional[bool] = None
471 self._finished_initialization = False
472 self.maint_preferences = MaintainerPreferenceTable({}, {})
473 self.apt_cache = AptCache()
474 self.background_tasks = set()
475 self.client_locale: Optional[str] = None
476 self.forced_locale: Optional[str] = None
477 self._active_locale: Optional[List[str]] = None
478 self._diagnostic_reports: Dict[str, DiagnosticReport] = {}
479 self.workspace_text_edit_support = WorkspaceTextEditSupport()
480 self.debputy_config = load_debputy_config()
481 self._file_state_caches: Dict[str, Dict[Type[FileCache], FileCache]] = (
482 collections.defaultdict(dict)
483 )
484 self.hover_output_style = OutputStyle()
486 def finish_startup_initialization(self) -> None:
487 if self._finished_initialization:
488 return
490 assert self._dctrl_parser is not None
491 assert self._plugin_feature_set is not None
492 assert self._trust_language_ids is not None
493 self.maint_preferences = self.maint_preferences.load_preferences()
494 _info(
495 f"Loaded style preferences: {len(self.maint_preferences.maintainer_preferences)} unique maintainer preferences recorded"
496 )
497 if (
498 self.hover_markup_format(
499 types.MarkupKind.Markdown, types.MarkupKind.PlainText
500 )
501 == types.MarkupKind.Markdown
502 ):
503 self.hover_output_style = MarkdownOutputStyle()
504 self._finished_initialization = True
506 async def on_initialize(self, params: types.InitializeParams) -> None:
507 task = self.loop.create_task(self._load_apt_cache(), name="Index apt cache")
508 self.background_tasks.add(task)
509 task.add_done_callback(self.background_tasks.discard)
510 self.client_locale = params.locale
511 if self.forced_locale is not None:
512 _info(
513 f"Ignoring client locale: {self.client_locale}. Using {self.forced_locale} instead as requested [--force-locale]"
514 )
515 else:
516 _info(
517 f"Client locale: {self.client_locale}. Use --force-locale to override"
518 )
519 _info(f"Cwd: {os.getcwd()}")
520 self._update_locale()
521 self.workspace_text_edit_support = WorkspaceTextEditSupport(
522 supported_resource_operation_edit_kinds=self._supported_resource_operation_edit_kinds,
523 supports_document_changes=self._supports_ws_document_changes,
524 )
525 self._mask_ghost_requests()
527 def _mask_ghost_requests(self) -> None:
528 try:
529 if types.TEXT_DOCUMENT_DOCUMENT_HIGHLIGHT not in self.lsp.fm.features:
531 # Work around `kate` bug (https://bugs.kde.org/show_bug.cgi?id=506664) for selected requests
532 # that are really annoying (that is, triggers all the time).
533 def _ghost_request(*_args: Any) -> None:
534 _info(
535 "Ignoring unsupported request from client that it should not have sent."
536 )
538 self.lsp.fm.features[types.TEXT_DOCUMENT_DOCUMENT_HIGHLIGHT] = (
539 _ghost_request
540 )
541 except (AttributeError, TypeError, KeyError, ValueError) as e:
542 _info(
543 f"Could install ghost handler, continuing without. Error was: {str(e)}"
544 )
545 else:
546 _info(
547 f"Injecting fake {types.TEXT_DOCUMENT_DOCUMENT_HIGHLIGHT} handler to work around bugs like KDE#506664"
548 )
550 def _update_locale(self) -> None:
551 if self.forced_locale is not None:
552 self._active_locale = [self.forced_locale]
553 elif self.client_locale is not None:
554 self._active_locale = [self.client_locale]
555 else:
556 self._active_locale = None
558 def file_cache_for(
559 self,
560 uri: str,
561 path: str,
562 cache_type: Type[T],
563 initializer: Callable[[str, str], T],
564 ) -> T:
565 inner_cache = self._file_state_caches.get(path)
566 result = inner_cache.get(cache_type) if inner_cache else None
567 if result is None:
568 result = initializer(uri, path)
569 if uri not in self.workspace.text_documents:
570 # We do not get a proper notification on when to discard the cache.
571 # For now, we simply do not cache them at all to avoid "leaking by infinite cache".
572 #
573 # Note that even if we did cache them, we would have to track whether the files have
574 # changed (which we do not), so the cache itself would not be useful.
575 return result
576 assert isinstance(result, cache_type)
577 if inner_cache is None: 577 ↛ 580line 577 didn't jump to line 580 because the condition on line 577 was always true
578 inner_cache = {}
579 self._file_state_caches[path] = inner_cache
580 inner_cache[cache_type] = result
581 else:
582 assert isinstance(result, cache_type)
583 return result
585 def shutdown(self) -> None:
586 for task in self.background_tasks:
587 _info(f"Cancelling task: {task.get_name()}")
588 self.loop.call_soon_threadsafe(task.cancel)
589 return super().shutdown()
591 def translation(self, domain: str) -> Translations:
592 return l10n.translation(
593 domain,
594 languages=self._active_locale,
595 )
597 async def _load_apt_cache(self) -> None:
598 if self.apt_cache.state in ("loading", "loaded"):
599 _info(
600 f"The apt cache data is already in state {self.apt_cache.state}, not re-triggering"
601 )
602 return
603 _info("Starting load of apt cache data")
604 start = time.time()
605 try:
606 await self.apt_cache.load()
607 except ValueError as ex:
608 _warn(f"Could not load apt cache: {ex}")
609 else:
610 end = time.time()
611 _info(
612 f"Loading apt cache finished after {end-start} seconds and is now in state {self.apt_cache.state}"
613 )
615 @property
616 def plugin_feature_set(self) -> PluginProvidedFeatureSet:
617 res = self._plugin_feature_set
618 if res is None: 618 ↛ 619line 618 didn't jump to line 619 because the condition on line 618 was never true
619 raise RuntimeError(
620 "Initialization error: The plugin feature set has not been initialized before it was needed."
621 )
622 return res
624 @plugin_feature_set.setter
625 def plugin_feature_set(self, plugin_feature_set: PluginProvidedFeatureSet) -> None:
626 if self._plugin_feature_set is not None: 626 ↛ 627line 626 didn't jump to line 627 because the condition on line 626 was never true
627 raise RuntimeError(
628 "The plugin_feature_set attribute cannot be changed once set"
629 )
630 self._plugin_feature_set = plugin_feature_set
632 @property
633 def dctrl_parser(self) -> DctrlParser:
634 res = self._dctrl_parser
635 if res is None: 635 ↛ 636line 635 didn't jump to line 636 because the condition on line 635 was never true
636 raise RuntimeError(
637 "Initialization error: The dctrl_parser has not been initialized before it was needed."
638 )
639 return res
641 @dctrl_parser.setter
642 def dctrl_parser(self, parser: DctrlParser) -> None:
643 if self._dctrl_parser is not None: 643 ↛ 644line 643 didn't jump to line 644 because the condition on line 643 was never true
644 raise RuntimeError("The dctrl_parser attribute cannot be changed once set")
645 self._dctrl_parser = parser
647 def lint_state(self, doc: "TextDocument") -> LSProvidedLintState:
648 dir_path = os.path.dirname(doc.path)
650 while dir_path and dir_path != "/" and os.path.basename(dir_path) != "debian": 650 ↛ 651line 650 didn't jump to line 651 because the condition on line 650 was never true
651 dir_path = os.path.dirname(dir_path)
653 source_root = os.path.dirname(dir_path)
655 return LSProvidedLintState(self, doc, source_root, dir_path, self.dctrl_parser)
657 @property
658 def _client_hover_markup_formats(self) -> Optional[List[types.MarkupKind]]:
659 try:
660 return (
661 self.client_capabilities.text_document.hover.content_format
662 ) # type: ignore
663 except AttributeError:
664 return None
666 def hover_markup_format(
667 self,
668 *options: types.MarkupKind,
669 fallback_kind: types.MarkupKind = types.MarkupKind.PlainText,
670 ) -> types.MarkupKind:
671 """Pick the client preferred hover markup format from a set of options
673 :param options: The markup kinds possible.
674 :param fallback_kind: If no overlapping option was found in the client preferences
675 (or client did not announce a value at all), this parameter is returned instead.
676 :returns: The client's preferred markup format from the provided options, or,
677 (if there is no overlap), the `fallback_kind` value is returned.
678 """
679 client_preference = self._client_hover_markup_formats
680 return _preference(client_preference, frozenset(options), fallback_kind)
682 @property
683 def _client_completion_item_document_markup_formats(
684 self,
685 ) -> Optional[List[types.MarkupKind]]:
686 try:
687 return (
688 self.client_capabilities.text_document.completion.completion_item.documentation_format # type : ignore
689 )
690 except AttributeError:
691 return None
693 @property
694 def _supports_ws_document_changes(self) -> bool:
695 try:
696 return (
697 self.client_capabilities.workspace.workspace_edit.document_changes # type : ignore
698 )
699 except AttributeError:
700 return False
702 @property
703 def _supported_resource_operation_edit_kinds(
704 self,
705 ) -> Sequence[types.ResourceOperationKind]:
706 try:
707 return (
708 self.client_capabilities.workspace.workspace_edit.resource_operations # type : ignore
709 )
710 except AttributeError:
711 return []
713 def completion_item_document_markup(
714 self,
715 *options: types.MarkupKind,
716 fallback_kind: types.MarkupKind = types.MarkupKind.PlainText,
717 ) -> types.MarkupKind:
718 """Pick the client preferred completion item documentation markup format from a set of options
720 :param options: The markup kinds possible.
721 :param fallback_kind: If no overlapping option was found in the client preferences
722 (or client did not announce a value at all), this parameter is returned instead.
723 :returns: The client's preferred markup format from the provided options, or,
724 (if there is no overlap), the `fallback_kind` value is returned.
725 """
727 client_preference = self._client_completion_item_document_markup_formats
728 return _preference(client_preference, frozenset(options), fallback_kind)
730 @property
731 def trust_language_ids(self) -> bool:
732 v = self._trust_language_ids
733 if v is None:
734 return True
735 return v
737 @trust_language_ids.setter
738 def trust_language_ids(self, new_value: bool) -> None:
739 self._trust_language_ids = new_value
741 def determine_language_id(
742 self,
743 doc: "TextDocument",
744 ) -> Tuple[Literal["editor-provided", "filename"], str, str]:
745 lang_id = doc.language_id
746 path = doc.path
747 try:
748 last_idx = path.rindex("debian/")
749 except ValueError:
750 cleaned_filename = os.path.basename(path)
751 else:
752 cleaned_filename = path[last_idx:]
754 if self.trust_language_ids and lang_id and not lang_id.isspace():
755 if lang_id not in ("fundamental",):
756 return "editor-provided", lang_id, cleaned_filename
757 _info(
758 f"Ignoring editor provided language ID: {lang_id} (reverting to filename based detection instead)"
759 )
761 return "filename", cleaned_filename, cleaned_filename
763 def close_document(self, uri: str) -> None:
764 path = to_fs_path(uri)
765 with suppress(KeyError):
766 del self._diagnostic_reports[uri]
767 with suppress(KeyError):
768 del self._file_state_caches[path]
770 async def slow_iter(
771 self,
772 iterable: Iterable[T],
773 *,
774 yield_every: int = 100,
775 ) -> AsyncIterable[T]:
776 counter = 0
777 for value in iterable:
778 counter += 1
779 if counter >= yield_every: 779 ↛ 780line 779 didn't jump to line 780 because the condition on line 779 was never true
780 await asyncio.sleep(0)
781 counter = 0
782 yield value
783 if counter: 783 ↛ exitline 783 didn't return from function 'slow_iter' because the condition on line 783 was always true
784 await asyncio.sleep(0)
786 def record_diagnostics(
787 self,
788 doc_uri: str,
789 version: int,
790 diagnostics: List[types.Diagnostic],
791 is_partial: bool,
792 ) -> None:
793 self._diagnostic_reports[doc_uri] = DiagnosticReport(
794 doc_uri,
795 version,
796 f"{version}@{doc_uri}",
797 is_partial,
798 diagnostics,
799 )
800 self.publish_diagnostics(doc_uri, diagnostics, version)
802 def diagnostics_in_range(
803 self,
804 uri: str,
805 text_range: types.Range,
806 ) -> Optional[List[types.Diagnostic]]:
807 report = self._diagnostic_reports.get(uri)
808 if report is None or report.is_in_progress:
809 return None
810 doc = self.workspace.get_text_document(uri)
811 if doc.version is not None and doc.version != report.doc_version:
812 return None
814 return report.diagnostics_in_range(text_range)