Coverage for src/debputy/lsp/debputy_ls.py: 54%
401 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-03-24 16:38 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-03-24 16:38 +0000
1import asyncio
2import dataclasses
3import os
4import time
5from collections.abc import AsyncIterable
6from typing import (
7 Optional,
8 List,
9 Any,
10 Mapping,
11 Container,
12 TYPE_CHECKING,
13 Tuple,
14 Literal,
15 Set,
16 Dict,
17 Iterable,
18)
20import debputy.l10n as l10n
21from debputy.dh.dh_assistant import (
22 parse_drules_for_addons,
23 DhSequencerData,
24 extract_dh_addons_from_control,
25)
26from debputy.filesystem_scan import FSROOverlay, VirtualPathBase
27from debputy.l10n import Translations
28from debputy.linting.lint_util import (
29 LintState,
30 AsyncLinterImpl,
31 AbortTaskError,
32)
33from debputy.lsp.apt_cache import AptCache
34from debputy.lsp.diagnostics import DiagnosticReport
35from debputy.lsp.maint_prefs import (
36 MaintainerPreferenceTable,
37 MaintainerPreference,
38 determine_effective_preference,
39)
40from debputy.lsp.text_util import LintCapablePositionCodec
41from debputy.lsp.vendoring._deb822_repro import Deb822FileElement, parse_deb822_file
42from debputy.packages import (
43 SourcePackage,
44 BinaryPackage,
45 DctrlParser,
46)
47from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
48from debputy.util import _info, _warn, T
49from debputy.yaml import MANIFEST_YAML, YAMLError
50from debputy.yaml.compat import CommentedMap
52if TYPE_CHECKING:
53 import lsprotocol.types as types
54 from pygls.server import LanguageServer
55 from pygls.workspace import TextDocument
56 from pygls.uris import from_fs_path
58else:
59 import debputy.lsprotocol.types as types
61 try:
62 from pygls.server import LanguageServer
63 from pygls.workspace import TextDocument
64 from pygls.uris import from_fs_path
65 except ImportError as e:
67 class LanguageServer:
68 def __init__(self, *args, **kwargs) -> None:
69 """Placeholder to work if pygls is not installed"""
70 # Should not be called
71 global e
72 raise e # pragma: no cover
75@dataclasses.dataclass(slots=True)
76class FileCache:
77 doc_uri: str
78 path: str
79 is_open_in_editor: Optional[bool] = None
80 last_doc_version: Optional[int] = None
81 last_mtime: Optional[float] = None
82 is_valid: bool = False
84 def _update_cache(self, doc: "TextDocument", source: str) -> None:
85 raise NotImplementedError
87 def _clear_cache(self) -> None:
88 raise NotImplementedError
90 def resolve_cache(self, ls: "DebputyLanguageServer") -> bool:
91 doc = ls.workspace.text_documents.get(self.doc_uri)
92 if doc is None: 92 ↛ 93line 92 didn't jump to line 93 because the condition on line 92 was never true
93 doc = ls.workspace.get_text_document(self.doc_uri)
94 is_open = False
95 else:
96 is_open = True
97 new_content: Optional[str] = None
98 if is_open: 98 ↛ 110line 98 didn't jump to line 110 because the condition on line 98 was always true
99 last_doc_version = self.last_doc_version
100 dctrl_doc_version = doc.version
101 if ( 101 ↛ 109line 101 didn't jump to line 109
102 not self.is_open_in_editor
103 or last_doc_version is None
104 or dctrl_doc_version is None
105 or last_doc_version < dctrl_doc_version
106 ):
107 new_content = doc.source
109 self.last_doc_version = doc.version
110 elif doc.uri.startswith("file://"):
111 try:
112 with open(self.path) as fd:
113 st = os.fstat(fd.fileno())
114 current_mtime = st.st_mtime
115 last_mtime = self.last_mtime or current_mtime - 1
116 if self.is_open_in_editor or current_mtime > last_mtime:
117 new_content = fd.read()
118 self.last_mtime = current_mtime
119 except FileNotFoundError:
120 self._clear_cache()
121 self.is_valid = False
122 return False
123 if new_content is not None: 123 ↛ 125line 123 didn't jump to line 125 because the condition on line 123 was always true
124 self._update_cache(doc, new_content)
125 self.is_valid = True
126 return True
129@dataclasses.dataclass(slots=True)
130class Deb822FileCache(FileCache):
131 deb822_file: Optional[Deb822FileElement] = None
133 def _update_cache(self, doc: "TextDocument", source: str) -> None:
134 deb822_file = parse_deb822_file(
135 source.splitlines(keepends=True),
136 accept_files_with_error_tokens=True,
137 accept_files_with_duplicated_fields=True,
138 )
139 self.deb822_file = deb822_file
141 def _clear_cache(self) -> None:
142 self.deb822_file = None
145@dataclasses.dataclass(slots=True)
146class DctrlFileCache(Deb822FileCache):
147 dctrl_parser: Optional[DctrlParser] = None
148 source_package: Optional[SourcePackage] = None
149 binary_packages: Optional[Mapping[str, BinaryPackage]] = None
151 def _update_cache(self, doc: "TextDocument", source: str) -> None:
152 deb822_file, source_package, binary_packages = (
153 self.dctrl_parser.parse_source_debian_control(
154 source.splitlines(keepends=True),
155 ignore_errors=True,
156 )
157 )
158 self.deb822_file = deb822_file
159 self.source_package = source_package
160 self.binary_packages = binary_packages
162 def _clear_cache(self) -> None:
163 super()._clear_cache()
164 self.source_package = None
165 self.binary_packages = None
168@dataclasses.dataclass(slots=True)
169class SalsaCICache(FileCache):
170 parsed_content: Optional[CommentedMap] = None
172 def _update_cache(self, doc: "TextDocument", source: str) -> None:
173 try:
174 value = MANIFEST_YAML.load(source)
175 if isinstance(value, CommentedMap):
176 self.parsed_content = value
177 except YAMLError:
178 pass
180 def _clear_cache(self) -> None:
181 self.parsed_content = None
184@dataclasses.dataclass(slots=True)
185class DebianRulesCache(FileCache):
186 sequences: Optional[Set[str]] = None
187 saw_dh: bool = False
189 def _update_cache(self, doc: "TextDocument", source: str) -> None:
190 sequences = set()
191 self.saw_dh = parse_drules_for_addons(
192 source.splitlines(),
193 sequences,
194 )
195 self.sequences = sequences
197 def _clear_cache(self) -> None:
198 self.sequences = None
199 self.saw_dh = False
202class LSProvidedLintState(LintState):
203 def __init__(
204 self,
205 ls: "DebputyLanguageServer",
206 doc: "TextDocument",
207 source_root: str,
208 debian_dir_path: str,
209 dctrl_parser: DctrlParser,
210 ) -> None:
211 self._ls = ls
212 self._doc = doc
213 # Cache lines (doc.lines re-splits everytime)
214 self._lines = doc.lines
215 self._source_root = FSROOverlay.create_root_dir(".", source_root)
216 debian_dir = self._source_root.get("debian")
217 if debian_dir is not None and not debian_dir.is_dir: 217 ↛ 218line 217 didn't jump to line 218 because the condition on line 217 was never true
218 debian_dir = None
219 self._debian_dir = debian_dir
220 self._diagnostics: Optional[List[types.Diagnostic]] = None
221 self._last_emitted_diagnostic_count = 0
222 dctrl_file = os.path.join(debian_dir_path, "control")
224 if dctrl_file != doc.path: 224 ↛ 225line 224 didn't jump to line 225 because the condition on line 224 was never true
225 self._dctrl_cache: DctrlFileCache = DctrlFileCache(
226 from_fs_path(dctrl_file),
227 dctrl_file,
228 dctrl_parser=dctrl_parser,
229 )
230 self._deb822_file: Deb822FileCache = Deb822FileCache(
231 doc.uri,
232 doc.path,
233 )
234 else:
235 self._dctrl_cache: DctrlFileCache = DctrlFileCache(
236 doc.uri,
237 doc.path,
238 dctrl_parser=dctrl_parser,
239 )
240 self._deb822_file = self._dctrl_cache
242 self._salsa_ci_caches = [
243 SalsaCICache(
244 from_fs_path(os.path.join(debian_dir_path, p)),
245 os.path.join(debian_dir_path, p),
246 )
247 for p in ("salsa-ci.yml", os.path.join("..", ".gitlab-ci.yml"))
248 ]
249 drules_path = os.path.join(debian_dir_path, "rules")
250 self._drules_cache = DebianRulesCache(
251 from_fs_path(drules_path) if doc.path != drules_path else doc.uri,
252 drules_path,
253 )
255 @property
256 def plugin_feature_set(self) -> PluginProvidedFeatureSet:
257 return self._ls.plugin_feature_set
259 @property
260 def doc_uri(self) -> str:
261 return self._doc.uri
263 @property
264 def source_root(self) -> Optional[VirtualPathBase]:
265 return self._source_root
267 @property
268 def debian_dir(self) -> Optional[VirtualPathBase]:
269 return self._debian_dir
271 @property
272 def path(self) -> str:
273 return self._doc.path
275 @property
276 def content(self) -> str:
277 return self._doc.source
279 @property
280 def lines(self) -> List[str]:
281 return self._lines
283 @property
284 def position_codec(self) -> LintCapablePositionCodec:
285 return self._doc.position_codec
287 def _resolve_dctrl(self) -> Optional[DctrlFileCache]:
288 dctrl_cache = self._dctrl_cache
289 dctrl_cache.resolve_cache(self._ls)
290 return dctrl_cache
292 @property
293 def parsed_deb822_file_content(self) -> Optional[Deb822FileElement]:
294 cache = self._deb822_file
295 cache.resolve_cache(self._ls)
296 return cache.deb822_file
298 @property
299 def source_package(self) -> Optional[SourcePackage]:
300 return self._resolve_dctrl().source_package
302 @property
303 def binary_packages(self) -> Optional[Mapping[str, BinaryPackage]]:
304 return self._resolve_dctrl().binary_packages
306 def _resolve_salsa_ci(self) -> Optional[CommentedMap]:
307 for salsa_ci_cache in self._salsa_ci_caches:
308 if salsa_ci_cache.resolve_cache(self._ls):
309 return salsa_ci_cache.parsed_content
310 return None
312 @property
313 def effective_preference(self) -> Optional[MaintainerPreference]:
314 source_package = self.source_package
315 salsa_ci = self._resolve_salsa_ci()
316 if source_package is None and salsa_ci is None:
317 return None
318 style, _, _ = determine_effective_preference(
319 self.maint_preference_table,
320 source_package,
321 salsa_ci,
322 )
323 return style
325 @property
326 def maint_preference_table(self) -> MaintainerPreferenceTable:
327 return self._ls.maint_preferences
329 @property
330 def salsa_ci(self) -> Optional[CommentedMap]:
331 return None
333 @property
334 def dh_sequencer_data(self) -> DhSequencerData:
335 dh_sequences: Set[str] = set()
336 saw_dh = False
337 src_pkg = self.source_package
338 drules_cache = self._drules_cache
339 if drules_cache.resolve_cache(self._ls):
340 saw_dh = drules_cache.saw_dh
341 if drules_cache.sequences:
342 dh_sequences.update(drules_cache.sequences)
343 if src_pkg:
344 extract_dh_addons_from_control(src_pkg.fields, dh_sequences)
346 return DhSequencerData(
347 frozenset(dh_sequences),
348 saw_dh,
349 )
351 def translation(self, domain: str) -> Translations:
352 return self._ls.translation(domain)
354 async def slow_iter(
355 self,
356 iterable: Iterable[T],
357 *,
358 yield_every: int = 100,
359 ) -> AsyncIterable[T]:
360 counter = 0
361 for value in iterable:
362 counter += 1
363 if counter >= yield_every:
364 await asyncio.sleep(0)
365 self._abort_on_outdated_doc_version()
366 counter = 0
367 yield value
368 if counter:
369 await asyncio.sleep(0)
370 self._abort_on_outdated_doc_version()
372 async def run_diagnostics(
373 self,
374 linter: AsyncLinterImpl,
375 ) -> List[types.Diagnostic]:
376 if self._diagnostics is not None:
377 raise RuntimeError(
378 "run_diagnostics cannot be run while it is already running"
379 )
380 self._diagnostics = diagnostics = []
382 await linter(self)
384 self._diagnostics = None
385 return diagnostics
387 def _emit_diagnostic(self, diagnostic: types.Diagnostic) -> None:
388 diagnostics = self._diagnostics
389 if diagnostics is None:
390 raise TypeError("Cannot run emit_diagnostic outside of run_diagnostics")
392 diagnostics.append(diagnostic)
393 self._last_emitted_diagnostic_count += 1
394 if self._last_emitted_diagnostic_count >= 100:
395 self._abort_on_outdated_doc_version()
396 self._ls.record_diagnostics(
397 self.doc_uri,
398 self._doc.version,
399 diagnostics,
400 is_partial=True,
401 )
402 self._last_emitted_diagnostic_count = 0
404 def _abort_on_outdated_doc_version(self) -> None:
405 expected_version = self._doc.version
406 current_doc = self._ls.workspace.get_text_document(self.doc_uri)
407 if current_doc.version != expected_version:
408 raise AbortTaskError(
409 f"Cancel (obsolete) diagnostics for doc version {expected_version}: document version changed"
410 )
413def _preference(
414 client_preference: Optional[List[types.MarkupKind]],
415 options: Container[types.MarkupKind],
416 fallback_kind: types.MarkupKind,
417) -> types.MarkupKind:
418 if not client_preference: 418 ↛ 420line 418 didn't jump to line 420 because the condition on line 418 was always true
419 return fallback_kind
420 for markdown_kind in client_preference:
421 if markdown_kind in options:
422 return markdown_kind
423 return fallback_kind
426class DebputyLanguageServer(LanguageServer):
428 def __init__(
429 self,
430 *args: Any,
431 **kwargs: Any,
432 ) -> None:
433 super().__init__(*args, **kwargs)
434 self._dctrl_parser: Optional[DctrlParser] = None
435 self._plugin_feature_set: Optional[PluginProvidedFeatureSet] = None
436 self._trust_language_ids: Optional[bool] = None
437 self._finished_initialization = False
438 self.maint_preferences = MaintainerPreferenceTable({}, {})
439 self.apt_cache = AptCache()
440 self.background_tasks = set()
441 self.client_locale: Optional[str] = None
442 self.forced_locale: Optional[str] = None
443 self._active_locale: Optional[List[str]] = None
444 self._diagnostic_reports: Dict[str, DiagnosticReport] = {}
446 def finish_startup_initialization(self) -> None:
447 if self._finished_initialization:
448 return
449 assert self._dctrl_parser is not None
450 assert self._plugin_feature_set is not None
451 assert self._trust_language_ids is not None
452 self.maint_preferences = self.maint_preferences.load_preferences()
453 _info(
454 f"Loaded style preferences: {len(self.maint_preferences.maintainer_preferences)} unique maintainer preferences recorded"
455 )
456 self._finished_initialization = True
458 async def on_initialize(self, params: types.InitializeParams) -> None:
459 task = self.loop.create_task(self._load_apt_cache(), name="Index apt cache")
460 self.background_tasks.add(task)
461 task.add_done_callback(self.background_tasks.discard)
462 self.client_locale = params.locale
463 if self.forced_locale is not None:
464 _info(
465 f"Ignoring client locale: {self.client_locale}. Using {self.forced_locale} instead as requested [--force-locale]"
466 )
467 else:
468 _info(
469 f"Client locale: {self.client_locale}. Use --force-locale to override"
470 )
471 _info(f"Cwd: {os.getcwd()}")
472 self._update_locale()
474 def _update_locale(self) -> None:
475 if self.forced_locale is not None:
476 self._active_locale = [self.forced_locale]
477 elif self.client_locale is not None:
478 self._active_locale = [self.client_locale]
479 else:
480 self._active_locale = None
482 def shutdown(self) -> None:
483 for task in self.background_tasks:
484 _info(f"Cancelling task: {task.get_name()}")
485 self.loop.call_soon_threadsafe(task.cancel)
486 return super().shutdown()
488 def translation(self, domain: str) -> Translations:
489 return l10n.translation(
490 domain,
491 languages=self._active_locale,
492 )
494 async def _load_apt_cache(self) -> None:
495 if self.apt_cache.state in ("loading", "loaded"):
496 _info(
497 f"The apt cache data is already in state {self.apt_cache.state}, not re-triggering"
498 )
499 return
500 _info("Starting load of apt cache data")
501 start = time.time()
502 try:
503 await self.apt_cache.load()
504 except ValueError as ex:
505 _warn(f"Could not load apt cache: {ex}")
506 else:
507 end = time.time()
508 _info(
509 f"Loading apt cache finished after {end-start} seconds and is now in state {self.apt_cache.state}"
510 )
512 @property
513 def plugin_feature_set(self) -> PluginProvidedFeatureSet:
514 res = self._plugin_feature_set
515 if res is None: 515 ↛ 516line 515 didn't jump to line 516 because the condition on line 515 was never true
516 raise RuntimeError(
517 "Initialization error: The plugin feature set has not been initialized before it was needed."
518 )
519 return res
521 @plugin_feature_set.setter
522 def plugin_feature_set(self, plugin_feature_set: PluginProvidedFeatureSet) -> None:
523 if self._plugin_feature_set is not None: 523 ↛ 524line 523 didn't jump to line 524 because the condition on line 523 was never true
524 raise RuntimeError(
525 "The plugin_feature_set attribute cannot be changed once set"
526 )
527 self._plugin_feature_set = plugin_feature_set
529 @property
530 def dctrl_parser(self) -> DctrlParser:
531 res = self._dctrl_parser
532 if res is None: 532 ↛ 533line 532 didn't jump to line 533 because the condition on line 532 was never true
533 raise RuntimeError(
534 "Initialization error: The dctrl_parser has not been initialized before it was needed."
535 )
536 return res
538 @dctrl_parser.setter
539 def dctrl_parser(self, parser: DctrlParser) -> None:
540 if self._dctrl_parser is not None: 540 ↛ 541line 540 didn't jump to line 541 because the condition on line 540 was never true
541 raise RuntimeError("The dctrl_parser attribute cannot be changed once set")
542 self._dctrl_parser = parser
544 def lint_state(self, doc: "TextDocument") -> LSProvidedLintState:
545 dir_path = os.path.dirname(doc.path)
547 while dir_path and dir_path != "/" and os.path.basename(dir_path) != "debian": 547 ↛ 548line 547 didn't jump to line 548 because the condition on line 547 was never true
548 dir_path = os.path.dirname(dir_path)
550 source_root = os.path.dirname(dir_path)
552 return LSProvidedLintState(self, doc, source_root, dir_path, self.dctrl_parser)
554 @property
555 def _client_hover_markup_formats(self) -> Optional[List[types.MarkupKind]]:
556 try:
557 return (
558 self.client_capabilities.text_document.hover.content_format
559 ) # type: ignore
560 except AttributeError:
561 return None
563 def hover_markup_format(
564 self,
565 *options: types.MarkupKind,
566 fallback_kind: types.MarkupKind = types.MarkupKind.PlainText,
567 ) -> types.MarkupKind:
568 """Pick the client preferred hover markup format from a set of options
570 :param options: The markup kinds possible.
571 :param fallback_kind: If no overlapping option was found in the client preferences
572 (or client did not announce a value at all), this parameter is returned instead.
573 :returns: The client's preferred markup format from the provided options, or,
574 (if there is no overlap), the `fallback_kind` value is returned.
575 """
576 client_preference = self._client_hover_markup_formats
577 return _preference(client_preference, frozenset(options), fallback_kind)
579 @property
580 def _client_completion_item_document_markup_formats(
581 self,
582 ) -> Optional[List[types.MarkupKind]]:
583 try:
584 return (
585 self.client_capabilities.text_document.completion.completion_item.documentation_format # type : ignore
586 )
587 except AttributeError:
588 return None
590 def completion_item_document_markup(
591 self,
592 *options: types.MarkupKind,
593 fallback_kind: types.MarkupKind = types.MarkupKind.PlainText,
594 ) -> types.MarkupKind:
595 """Pick the client preferred completion item documentation markup format from a set of options
597 :param options: The markup kinds possible.
598 :param fallback_kind: If no overlapping option was found in the client preferences
599 (or client did not announce a value at all), this parameter is returned instead.
600 :returns: The client's preferred markup format from the provided options, or,
601 (if there is no overlap), the `fallback_kind` value is returned.
602 """
604 client_preference = self._client_completion_item_document_markup_formats
605 return _preference(client_preference, frozenset(options), fallback_kind)
607 @property
608 def trust_language_ids(self) -> bool:
609 v = self._trust_language_ids
610 if v is None:
611 return True
612 return v
614 @trust_language_ids.setter
615 def trust_language_ids(self, new_value: bool) -> None:
616 self._trust_language_ids = new_value
618 def determine_language_id(
619 self,
620 doc: "TextDocument",
621 ) -> Tuple[Literal["editor-provided", "filename"], str, str]:
622 lang_id = doc.language_id
623 path = doc.path
624 try:
625 last_idx = path.rindex("debian/")
626 except ValueError:
627 cleaned_filename = os.path.basename(path)
628 else:
629 cleaned_filename = path[last_idx:]
631 if self.trust_language_ids and lang_id and not lang_id.isspace():
632 if lang_id not in ("fundamental",):
633 return "editor-provided", lang_id, cleaned_filename
634 _info(
635 f"Ignoring editor provided language ID: {lang_id} (reverting to filename based detection instead)"
636 )
638 return "filename", cleaned_filename, cleaned_filename
640 def close_document(self, uri: str) -> None:
641 try:
642 del self._diagnostic_reports[uri]
643 except KeyError:
644 pass
646 async def slow_iter(
647 self,
648 iterable: Iterable[T],
649 *,
650 yield_every: int = 100,
651 ) -> AsyncIterable[T]:
652 counter = 0
653 for value in iterable:
654 counter += 1
655 if counter >= yield_every: 655 ↛ 656line 655 didn't jump to line 656 because the condition on line 655 was never true
656 await asyncio.sleep(0)
657 counter = 0
658 yield value
659 if counter: 659 ↛ exitline 659 didn't return from function 'slow_iter' because the condition on line 659 was always true
660 await asyncio.sleep(0)
662 def record_diagnostics(
663 self,
664 doc_uri: str,
665 version: int,
666 diagnostics: List[types.Diagnostic],
667 is_partial: bool,
668 ) -> None:
669 self._diagnostic_reports[doc_uri] = DiagnosticReport(
670 doc_uri,
671 version,
672 f"{version}@{doc_uri}",
673 is_partial,
674 diagnostics,
675 )
676 self.publish_diagnostics(doc_uri, diagnostics, version)
678 def diagnostics_in_range(
679 self,
680 uri: str,
681 text_range: types.Range,
682 ) -> Optional[List[types.Diagnostic]]:
683 report = self._diagnostic_reports.get(uri)
684 if report is None or report.is_in_progress:
685 return None
686 doc = self.workspace.get_text_document(uri)
687 if doc.version is not None and doc.version != report.doc_version:
688 return None
690 return report.diagnostics_in_range(text_range)