Coverage for src/debputy/lsp/debputy_ls.py: 51%
461 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
1import asyncio
2import collections
3import dataclasses
4import os
5import time
6from collections.abc import AsyncIterable
7from contextlib import suppress
8from typing import (
9 Optional,
10 List,
11 Any,
12 TYPE_CHECKING,
13 Tuple,
14 Literal,
15 Set,
16 Dict,
17 Type,
18)
19from collections.abc import Mapping, Container, Iterable, Sequence, Callable
21import debputy.l10n as l10n
22from debputy.commands.debputy_cmd.output import (
23 OutputStyle,
24 MarkdownOutputStyle,
25)
26from debputy.dh.dh_assistant import (
27 parse_drules_for_addons,
28 DhSequencerData,
29 extract_dh_addons_from_control,
30)
31from debputy.filesystem_scan import FSROOverlay, VirtualPathBase
32from debputy.l10n import Translations
33from debputy.linting.lint_util import (
34 LintState,
35 AsyncLinterImpl,
36 AbortTaskError,
37 WorkspaceTextEditSupport,
38)
39from debputy.lsp.apt_cache import AptCache
40from debputy.lsp.config.debputy_config import load_debputy_config, DebputyConfig
41from debputy.lsp.diagnostics import DiagnosticReport
42from debputy.lsp.maint_prefs import (
43 MaintainerPreferenceTable,
44 determine_effective_preference,
45 EffectiveFormattingPreference,
46)
47from debputy.lsp.text_util import LintCapablePositionCodec
48from debputy.lsp.vendoring._deb822_repro import Deb822FileElement, parse_deb822_file
49from debputy.packages import (
50 SourcePackage,
51 BinaryPackage,
52 DctrlParser,
53)
54from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
55from debputy.util import _info, _warn, T
56from debputy.yaml import MANIFEST_YAML, YAMLError
57from debputy.yaml.compat import CommentedMap
59if TYPE_CHECKING:
60 import lsprotocol.types as types
61 from pygls.server import LanguageServer
62 from pygls.workspace import TextDocument
63 from pygls.uris import from_fs_path, to_fs_path
65else:
66 import debputy.lsprotocol.types as types
68 try:
69 from pygls.server import LanguageServer
70 from pygls.workspace import TextDocument
71 from pygls.uris import from_fs_path, to_fs_path
72 except ImportError as e:
74 class LanguageServer:
75 def __init__(self, *args, **kwargs) -> None:
76 """Placeholder to work if pygls is not installed"""
77 # Should not be called
78 global e
79 raise e # pragma: no cover
82@dataclasses.dataclass(slots=True)
83class FileCache:
84 doc_uri: str
85 path: str
86 is_open_in_editor: bool | None = None
87 last_doc_version: int | None = None
88 last_mtime: float | None = None
89 is_valid: bool = False
91 def _update_cache(self, doc: "TextDocument", source: str) -> None:
92 raise NotImplementedError
94 def _clear_cache(self) -> None:
95 raise NotImplementedError
97 def resolve_cache(self, ls: "DebputyLanguageServer") -> bool:
98 doc = ls.workspace.text_documents.get(self.doc_uri)
99 if doc is None: 99 ↛ 100line 99 didn't jump to line 100 because the condition on line 99 was never true
100 doc = ls.workspace.get_text_document(self.doc_uri)
101 is_open = False
102 else:
103 is_open = True
104 new_content: str | None = None
105 if is_open: 105 ↛ 118line 105 didn't jump to line 118 because the condition on line 105 was always true
106 last_doc_version = self.last_doc_version
107 current_doc_version = doc.version
108 if ( 108 ↛ 116line 108 didn't jump to line 116 because the condition on line 108 was always true
109 not self.is_open_in_editor
110 or last_doc_version is None
111 or current_doc_version is None
112 or last_doc_version < current_doc_version
113 ):
114 new_content = doc.source
116 self.last_doc_version = doc.version
117 self.is_open_in_editor = True
118 elif doc.uri.startswith("file://"):
119 try:
120 with open(self.path) as fd:
121 st = os.fstat(fd.fileno())
122 current_mtime = st.st_mtime
123 last_mtime = self.last_mtime or current_mtime - 1
124 if self.is_open_in_editor or current_mtime > last_mtime:
125 new_content = fd.read()
126 self.last_mtime = current_mtime
127 except FileNotFoundError:
128 self._clear_cache()
129 self.is_valid = False
130 return False
131 self.is_open_in_editor = is_open
132 if new_content is not None: 132 ↛ 134line 132 didn't jump to line 134 because the condition on line 132 was always true
133 self._update_cache(doc, new_content)
134 self.is_valid = True
135 return True
138@dataclasses.dataclass(slots=True)
139class Deb822FileCache(FileCache):
140 deb822_file: Deb822FileElement | None = None
142 def _update_cache(self, doc: "TextDocument", source: str) -> None:
143 deb822_file = parse_deb822_file(
144 source.splitlines(keepends=True),
145 accept_files_with_error_tokens=True,
146 accept_files_with_duplicated_fields=True,
147 )
148 self.deb822_file = deb822_file
150 def _clear_cache(self) -> None:
151 self.deb822_file = None
154@dataclasses.dataclass(slots=True)
155class DctrlFileCache(Deb822FileCache):
156 dctrl_parser: DctrlParser | None = None
157 source_package: SourcePackage | None = None
158 binary_packages: Mapping[str, BinaryPackage] | None = None
160 def _update_cache(self, doc: "TextDocument", source: str) -> None:
161 deb822_file, source_package, binary_packages = (
162 self.dctrl_parser.parse_source_debian_control(
163 source.splitlines(keepends=True),
164 ignore_errors=True,
165 )
166 )
167 self.deb822_file = deb822_file
168 self.source_package = source_package
169 self.binary_packages = binary_packages
171 def _clear_cache(self) -> None:
172 super()._clear_cache()
173 self.source_package = None
174 self.binary_packages = None
177@dataclasses.dataclass(slots=True)
178class SalsaCICache(FileCache):
179 parsed_content: CommentedMap | None = None
181 def _update_cache(self, doc: "TextDocument", source: str) -> None:
182 try:
183 value = MANIFEST_YAML.load(source)
184 if isinstance(value, CommentedMap):
185 self.parsed_content = value
186 except YAMLError:
187 pass
189 def _clear_cache(self) -> None:
190 self.parsed_content = None
193@dataclasses.dataclass(slots=True)
194class DebianRulesCache(FileCache):
195 sequences: set[str] | None = None
196 saw_dh: bool = False
198 def _update_cache(self, doc: "TextDocument", source: str) -> None:
199 sequences = set()
200 self.saw_dh = parse_drules_for_addons(
201 source.splitlines(),
202 sequences,
203 )
204 self.sequences = sequences
206 def _clear_cache(self) -> None:
207 self.sequences = None
208 self.saw_dh = False
211class LSProvidedLintState(LintState):
212 def __init__(
213 self,
214 ls: "DebputyLanguageServer",
215 doc: "TextDocument",
216 source_root: str,
217 debian_dir_path: str,
218 dctrl_parser: DctrlParser,
219 ) -> None:
220 self._ls = ls
221 self._doc = doc
222 # Cache lines (doc.lines re-splits everytime)
223 self._lines = doc.lines
224 self._source_root = FSROOverlay.create_root_dir(".", source_root)
225 debian_dir = self._source_root.get("debian")
226 if debian_dir is not None and not debian_dir.is_dir: 226 ↛ 227line 226 didn't jump to line 227 because the condition on line 226 was never true
227 debian_dir = None
228 self._debian_dir = debian_dir
229 self._diagnostics: list[types.Diagnostic] | None = None
230 self._last_emitted_diagnostic_count = 0
231 dctrl_file = os.path.join(debian_dir_path, "control")
233 if dctrl_file != doc.path:
235 self._dctrl_cache: DctrlFileCache = ls.file_cache_for(
236 from_fs_path(dctrl_file),
237 dctrl_file,
238 DctrlFileCache,
239 lambda uri, path: DctrlFileCache(uri, path, dctrl_parser=dctrl_parser),
240 )
241 self._deb822_file: Deb822FileCache = ls.file_cache_for(
242 doc.uri,
243 doc.path,
244 Deb822FileCache,
245 lambda uri, path: Deb822FileCache(uri, path),
246 )
247 else:
248 self._dctrl_cache = ls.file_cache_for(
249 doc.uri,
250 doc.path,
251 DctrlFileCache,
252 lambda uri, path: DctrlFileCache(uri, path, dctrl_parser=dctrl_parser),
253 )
254 self._deb822_file = self._dctrl_cache
256 self._salsa_ci_caches = [
257 ls.file_cache_for(
258 from_fs_path(os.path.join(debian_dir_path, p)),
259 os.path.join(debian_dir_path, p),
260 SalsaCICache,
261 lambda uri, path: SalsaCICache(uri, path),
262 )
263 for p in ("salsa-ci.yml", os.path.join("..", ".gitlab-ci.yml"))
264 ]
265 drules_path = os.path.join(debian_dir_path, "rules")
266 self._drules_cache = ls.file_cache_for(
267 from_fs_path(drules_path) if doc.path != drules_path else doc.uri,
268 drules_path,
269 DebianRulesCache,
270 lambda uri, path: DebianRulesCache(uri, path),
271 )
273 @property
274 def plugin_feature_set(self) -> PluginProvidedFeatureSet:
275 return self._ls.plugin_feature_set
277 @property
278 def doc_uri(self) -> str:
279 return self._doc.uri
281 @property
282 def doc_version(self) -> int | None:
283 return self._doc.version
285 @property
286 def source_root(self) -> VirtualPathBase | None:
287 return self._source_root
289 @property
290 def debian_dir(self) -> VirtualPathBase | None:
291 return self._debian_dir
293 @property
294 def path(self) -> str:
295 return self._doc.path
297 @property
298 def content(self) -> str:
299 return self._doc.source
301 @property
302 def lines(self) -> list[str]:
303 return self._lines
305 @property
306 def position_codec(self) -> LintCapablePositionCodec:
307 return self._doc.position_codec
309 def _resolve_dctrl(self) -> DctrlFileCache | None:
310 dctrl_cache = self._dctrl_cache
311 dctrl_cache.resolve_cache(self._ls)
312 return dctrl_cache
314 @property
315 def parsed_deb822_file_content(self) -> Deb822FileElement | None:
316 cache = self._deb822_file
317 cache.resolve_cache(self._ls)
318 return cache.deb822_file
320 @property
321 def source_package(self) -> SourcePackage | None:
322 return self._resolve_dctrl().source_package
324 @property
325 def binary_packages(self) -> Mapping[str, BinaryPackage] | None:
326 return self._resolve_dctrl().binary_packages
328 def _resolve_salsa_ci(self) -> CommentedMap | None:
329 for salsa_ci_cache in self._salsa_ci_caches:
330 if salsa_ci_cache.resolve_cache(self._ls):
331 return salsa_ci_cache.parsed_content
332 return None
334 @property
335 def effective_preference(self) -> EffectiveFormattingPreference | None:
336 source_package = self.source_package
337 salsa_ci = self._resolve_salsa_ci()
338 if source_package is None and salsa_ci is None:
339 return None
340 style, _, _ = determine_effective_preference(
341 self.maint_preference_table,
342 source_package,
343 salsa_ci,
344 )
345 return style
347 @property
348 def maint_preference_table(self) -> MaintainerPreferenceTable:
349 return self._ls.maint_preferences
351 @property
352 def salsa_ci(self) -> CommentedMap | None:
353 return None
355 @property
356 def dh_sequencer_data(self) -> DhSequencerData:
357 dh_sequences: set[str] = set()
358 saw_dh = False
359 src_pkg = self.source_package
360 drules_cache = self._drules_cache
361 if drules_cache.resolve_cache(self._ls):
362 saw_dh = drules_cache.saw_dh
363 if drules_cache.sequences:
364 dh_sequences.update(drules_cache.sequences)
365 if src_pkg:
366 extract_dh_addons_from_control(src_pkg.fields, dh_sequences)
368 return DhSequencerData(
369 frozenset(dh_sequences),
370 saw_dh,
371 )
373 @property
374 def workspace_text_edit_support(self) -> WorkspaceTextEditSupport:
375 return self._ls.workspace_text_edit_support
377 @property
378 def debputy_config(self) -> DebputyConfig:
379 return self._ls.debputy_config
381 def translation(self, domain: str) -> Translations:
382 return self._ls.translation(domain)
384 async def slow_iter(
385 self,
386 iterable: Iterable[T],
387 *,
388 yield_every: int = 100,
389 ) -> AsyncIterable[T]:
390 counter = 0
391 for value in iterable:
392 counter += 1
393 if counter >= yield_every:
394 await asyncio.sleep(0)
395 self._abort_on_outdated_doc_version()
396 counter = 0
397 yield value
398 if counter:
399 await asyncio.sleep(0)
400 self._abort_on_outdated_doc_version()
402 async def run_diagnostics(
403 self,
404 linter: AsyncLinterImpl,
405 ) -> list[types.Diagnostic]:
406 if self._diagnostics is not None:
407 raise RuntimeError(
408 "run_diagnostics cannot be run while it is already running"
409 )
410 self._diagnostics = diagnostics = []
412 await linter(self)
414 self._diagnostics = None
415 return diagnostics
417 def _emit_diagnostic(self, diagnostic: types.Diagnostic) -> None:
418 diagnostics = self._diagnostics
419 if diagnostics is None:
420 raise TypeError("Cannot run emit_diagnostic outside of run_diagnostics")
422 diagnostics.append(diagnostic)
423 self._last_emitted_diagnostic_count += 1
424 if self._last_emitted_diagnostic_count >= 100:
425 self._abort_on_outdated_doc_version()
426 self._ls.record_diagnostics(
427 self.doc_uri,
428 self._doc.version,
429 diagnostics,
430 is_partial=True,
431 )
432 self._last_emitted_diagnostic_count = 0
434 def _abort_on_outdated_doc_version(self) -> None:
435 expected_version = self._doc.version
436 current_doc = self._ls.workspace.get_text_document(self.doc_uri)
437 if current_doc.version != expected_version:
438 raise AbortTaskError(
439 f"Cancel (obsolete) diagnostics for doc version {expected_version}: document version changed"
440 )
443def _preference(
444 client_preference: list[types.MarkupKind] | None,
445 options: Container[types.MarkupKind],
446 fallback_kind: types.MarkupKind,
447) -> types.MarkupKind:
448 if not client_preference: 448 ↛ 450line 448 didn't jump to line 450 because the condition on line 448 was always true
449 return fallback_kind
450 for markdown_kind in client_preference:
451 if markdown_kind in options:
452 return markdown_kind
453 return fallback_kind
456class DebputyLanguageServer(LanguageServer):
458 def __init__(
459 self,
460 *args: Any,
461 **kwargs: Any,
462 ) -> None:
463 super().__init__(*args, **kwargs)
464 self._dctrl_parser: DctrlParser | None = None
465 self._plugin_feature_set: PluginProvidedFeatureSet | None = None
466 self._trust_language_ids: bool | None = None
467 self._finished_initialization = False
468 self.maint_preferences = MaintainerPreferenceTable({}, {})
469 self.apt_cache = AptCache()
470 self.background_tasks = set()
471 self.client_locale: str | None = None
472 self.forced_locale: str | None = None
473 self._active_locale: list[str] | None = None
474 self._diagnostic_reports: dict[str, DiagnosticReport] = {}
475 self.workspace_text_edit_support = WorkspaceTextEditSupport()
476 self.debputy_config = load_debputy_config()
477 self._file_state_caches: dict[str, dict[type[FileCache], FileCache]] = (
478 collections.defaultdict(dict)
479 )
480 self.hover_output_style = OutputStyle()
482 def finish_startup_initialization(self) -> None:
483 if self._finished_initialization:
484 return
486 assert self._dctrl_parser is not None
487 assert self._plugin_feature_set is not None
488 assert self._trust_language_ids is not None
489 self.maint_preferences = self.maint_preferences.load_preferences()
490 _info(
491 f"Loaded style preferences: {len(self.maint_preferences.maintainer_preferences)} unique maintainer preferences recorded"
492 )
493 if (
494 self.hover_markup_format(
495 types.MarkupKind.Markdown, types.MarkupKind.PlainText
496 )
497 == types.MarkupKind.Markdown
498 ):
499 self.hover_output_style = MarkdownOutputStyle()
500 self._finished_initialization = True
502 async def on_initialize(self, params: types.InitializeParams) -> None:
503 task = self.loop.create_task(self._load_apt_cache(), name="Index apt cache")
504 self.background_tasks.add(task)
505 task.add_done_callback(self.background_tasks.discard)
506 self.client_locale = params.locale
507 if self.forced_locale is not None:
508 _info(
509 f"Ignoring client locale: {self.client_locale}. Using {self.forced_locale} instead as requested [--force-locale]"
510 )
511 else:
512 _info(
513 f"Client locale: {self.client_locale}. Use --force-locale to override"
514 )
515 _info(f"Cwd: {os.getcwd()}")
516 self._update_locale()
517 self.workspace_text_edit_support = WorkspaceTextEditSupport(
518 supported_resource_operation_edit_kinds=self._supported_resource_operation_edit_kinds,
519 supports_document_changes=self._supports_ws_document_changes,
520 )
521 self._mask_ghost_requests()
523 def _mask_ghost_requests(self) -> None:
524 try:
525 if types.TEXT_DOCUMENT_DOCUMENT_HIGHLIGHT not in self.lsp.fm.features:
527 # Work around `kate` bug (https://bugs.kde.org/show_bug.cgi?id=506664) for selected requests
528 # that are really annoying (that is, triggers all the time).
529 def _ghost_request(*_args: Any) -> None:
530 _info(
531 "Ignoring unsupported request from client that it should not have sent."
532 )
534 self.lsp.fm.features[types.TEXT_DOCUMENT_DOCUMENT_HIGHLIGHT] = (
535 _ghost_request
536 )
537 except (AttributeError, TypeError, KeyError, ValueError) as e:
538 _info(
539 f"Could install ghost handler, continuing without. Error was: {str(e)}"
540 )
541 else:
542 _info(
543 f"Injecting fake {types.TEXT_DOCUMENT_DOCUMENT_HIGHLIGHT} handler to work around bugs like KDE#506664"
544 )
546 def _update_locale(self) -> None:
547 if self.forced_locale is not None:
548 self._active_locale = [self.forced_locale]
549 elif self.client_locale is not None:
550 self._active_locale = [self.client_locale]
551 else:
552 self._active_locale = None
554 def file_cache_for(
555 self,
556 uri: str,
557 path: str,
558 cache_type: type[T],
559 initializer: Callable[[str, str], T],
560 ) -> T:
561 inner_cache = self._file_state_caches.get(path)
562 result = inner_cache.get(cache_type) if inner_cache else None
563 if result is None:
564 result = initializer(uri, path)
565 if uri not in self.workspace.text_documents:
566 # We do not get a proper notification on when to discard the cache.
567 # For now, we simply do not cache them at all to avoid "leaking by infinite cache".
568 #
569 # Note that even if we did cache them, we would have to track whether the files have
570 # changed (which we do not), so the cache itself would not be useful.
571 return result
572 assert isinstance(result, cache_type)
573 if inner_cache is None: 573 ↛ 576line 573 didn't jump to line 576 because the condition on line 573 was always true
574 inner_cache = {}
575 self._file_state_caches[path] = inner_cache
576 inner_cache[cache_type] = result
577 else:
578 assert isinstance(result, cache_type)
579 return result
581 def shutdown(self) -> None:
582 for task in self.background_tasks:
583 _info(f"Cancelling task: {task.get_name()}")
584 self.loop.call_soon_threadsafe(task.cancel)
585 return super().shutdown()
587 def translation(self, domain: str) -> Translations:
588 return l10n.translation(
589 domain,
590 languages=self._active_locale,
591 )
593 async def _load_apt_cache(self) -> None:
594 if self.apt_cache.state in ("loading", "loaded"):
595 _info(
596 f"The apt cache data is already in state {self.apt_cache.state}, not re-triggering"
597 )
598 return
599 _info("Starting load of apt cache data")
600 start = time.time()
601 try:
602 await self.apt_cache.load()
603 except ValueError as ex:
604 _warn(f"Could not load apt cache: {ex}")
605 else:
606 end = time.time()
607 _info(
608 f"Loading apt cache finished after {end-start} seconds and is now in state {self.apt_cache.state}"
609 )
611 @property
612 def plugin_feature_set(self) -> PluginProvidedFeatureSet:
613 res = self._plugin_feature_set
614 if res is None: 614 ↛ 615line 614 didn't jump to line 615 because the condition on line 614 was never true
615 raise RuntimeError(
616 "Initialization error: The plugin feature set has not been initialized before it was needed."
617 )
618 return res
620 @plugin_feature_set.setter
621 def plugin_feature_set(self, plugin_feature_set: PluginProvidedFeatureSet) -> None:
622 if self._plugin_feature_set is not None: 622 ↛ 623line 622 didn't jump to line 623 because the condition on line 622 was never true
623 raise RuntimeError(
624 "The plugin_feature_set attribute cannot be changed once set"
625 )
626 self._plugin_feature_set = plugin_feature_set
628 @property
629 def dctrl_parser(self) -> DctrlParser:
630 res = self._dctrl_parser
631 if res is None: 631 ↛ 632line 631 didn't jump to line 632 because the condition on line 631 was never true
632 raise RuntimeError(
633 "Initialization error: The dctrl_parser has not been initialized before it was needed."
634 )
635 return res
637 @dctrl_parser.setter
638 def dctrl_parser(self, parser: DctrlParser) -> None:
639 if self._dctrl_parser is not None: 639 ↛ 640line 639 didn't jump to line 640 because the condition on line 639 was never true
640 raise RuntimeError("The dctrl_parser attribute cannot be changed once set")
641 self._dctrl_parser = parser
643 def lint_state(self, doc: "TextDocument") -> LSProvidedLintState:
644 dir_path = os.path.dirname(doc.path)
646 while dir_path and dir_path != "/" and os.path.basename(dir_path) != "debian": 646 ↛ 647line 646 didn't jump to line 647 because the condition on line 646 was never true
647 dir_path = os.path.dirname(dir_path)
649 source_root = os.path.dirname(dir_path)
651 return LSProvidedLintState(self, doc, source_root, dir_path, self.dctrl_parser)
653 @property
654 def _client_hover_markup_formats(self) -> list[types.MarkupKind] | None:
655 try:
656 return (
657 self.client_capabilities.text_document.hover.content_format
658 ) # type: ignore
659 except AttributeError:
660 return None
662 def hover_markup_format(
663 self,
664 *options: types.MarkupKind,
665 fallback_kind: types.MarkupKind = types.MarkupKind.PlainText,
666 ) -> types.MarkupKind:
667 """Pick the client preferred hover markup format from a set of options
669 :param options: The markup kinds possible.
670 :param fallback_kind: If no overlapping option was found in the client preferences
671 (or client did not announce a value at all), this parameter is returned instead.
672 :returns: The client's preferred markup format from the provided options, or,
673 (if there is no overlap), the `fallback_kind` value is returned.
674 """
675 client_preference = self._client_hover_markup_formats
676 return _preference(client_preference, frozenset(options), fallback_kind)
678 @property
679 def _client_completion_item_document_markup_formats(
680 self,
681 ) -> list[types.MarkupKind] | None:
682 try:
683 return (
684 self.client_capabilities.text_document.completion.completion_item.documentation_format # type : ignore
685 )
686 except AttributeError:
687 return None
689 @property
690 def _supports_ws_document_changes(self) -> bool:
691 try:
692 return (
693 self.client_capabilities.workspace.workspace_edit.document_changes # type : ignore
694 )
695 except AttributeError:
696 return False
698 @property
699 def _supported_resource_operation_edit_kinds(
700 self,
701 ) -> Sequence[types.ResourceOperationKind]:
702 try:
703 return (
704 self.client_capabilities.workspace.workspace_edit.resource_operations # type : ignore
705 )
706 except AttributeError:
707 return []
709 def completion_item_document_markup(
710 self,
711 *options: types.MarkupKind,
712 fallback_kind: types.MarkupKind = types.MarkupKind.PlainText,
713 ) -> types.MarkupKind:
714 """Pick the client preferred completion item documentation markup format from a set of options
716 :param options: The markup kinds possible.
717 :param fallback_kind: If no overlapping option was found in the client preferences
718 (or client did not announce a value at all), this parameter is returned instead.
719 :returns: The client's preferred markup format from the provided options, or,
720 (if there is no overlap), the `fallback_kind` value is returned.
721 """
723 client_preference = self._client_completion_item_document_markup_formats
724 return _preference(client_preference, frozenset(options), fallback_kind)
726 @property
727 def trust_language_ids(self) -> bool:
728 v = self._trust_language_ids
729 if v is None:
730 return True
731 return v
733 @trust_language_ids.setter
734 def trust_language_ids(self, new_value: bool) -> None:
735 self._trust_language_ids = new_value
737 def determine_language_id(
738 self,
739 doc: "TextDocument",
740 ) -> tuple[Literal["editor-provided", "filename"], str, str]:
741 lang_id = doc.language_id
742 path = doc.path
743 try:
744 last_idx = path.rindex("debian/")
745 except ValueError:
746 cleaned_filename = os.path.basename(path)
747 else:
748 cleaned_filename = path[last_idx:]
750 if self.trust_language_ids and lang_id and not lang_id.isspace():
751 if lang_id not in ("fundamental",):
752 return "editor-provided", lang_id, cleaned_filename
753 _info(
754 f"Ignoring editor provided language ID: {lang_id} (reverting to filename based detection instead)"
755 )
757 return "filename", cleaned_filename, cleaned_filename
759 def close_document(self, uri: str) -> None:
760 path = to_fs_path(uri)
761 with suppress(KeyError):
762 del self._diagnostic_reports[uri]
763 with suppress(KeyError):
764 del self._file_state_caches[path]
766 async def slow_iter(
767 self,
768 iterable: Iterable[T],
769 *,
770 yield_every: int = 100,
771 ) -> AsyncIterable[T]:
772 counter = 0
773 for value in iterable:
774 counter += 1
775 if counter >= yield_every: 775 ↛ 776line 775 didn't jump to line 776 because the condition on line 775 was never true
776 await asyncio.sleep(0)
777 counter = 0
778 yield value
779 if counter: 779 ↛ exitline 779 didn't return from function 'slow_iter' because the condition on line 779 was always true
780 await asyncio.sleep(0)
782 def record_diagnostics(
783 self,
784 doc_uri: str,
785 version: int,
786 diagnostics: list[types.Diagnostic],
787 is_partial: bool,
788 ) -> None:
789 self._diagnostic_reports[doc_uri] = DiagnosticReport(
790 doc_uri,
791 version,
792 f"{version}@{doc_uri}",
793 is_partial,
794 diagnostics,
795 )
796 self.publish_diagnostics(doc_uri, diagnostics, version)
798 def diagnostics_in_range(
799 self,
800 uri: str,
801 text_range: types.Range,
802 ) -> list[types.Diagnostic] | None:
803 report = self._diagnostic_reports.get(uri)
804 if report is None or report.is_in_progress:
805 return None
806 doc = self.workspace.get_text_document(uri)
807 if doc.version is not None and doc.version != report.doc_version:
808 return None
810 return report.diagnostics_in_range(text_range)