Coverage for src/debputy/lsp/debputy_ls.py: 57%
347 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
1import dataclasses
2import os
3import time
4from typing import (
5 Optional,
6 List,
7 Any,
8 Mapping,
9 Container,
10 TYPE_CHECKING,
11 Tuple,
12 Literal,
13 Set,
14)
16import debputy.l10n as l10n
17from debputy.dh.dh_assistant import (
18 parse_drules_for_addons,
19 DhSequencerData,
20 extract_dh_addons_from_control,
21)
22from debputy.filesystem_scan import FSROOverlay, VirtualPathBase
23from debputy.l10n import Translations
24from debputy.linting.lint_util import (
25 LintState,
26 LinterImpl,
27)
28from debputy.lsp.apt_cache import AptCache
29from debputy.lsp.maint_prefs import (
30 MaintainerPreferenceTable,
31 MaintainerPreference,
32 determine_effective_preference,
33)
34from debputy.lsp.text_util import LintCapablePositionCodec
35from debputy.lsp.vendoring._deb822_repro import Deb822FileElement, parse_deb822_file
36from debputy.packages import (
37 SourcePackage,
38 BinaryPackage,
39 DctrlParser,
40)
41from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
42from debputy.util import _info, _warn
43from debputy.yaml import MANIFEST_YAML, YAMLError
44from debputy.yaml.compat import CommentedMap
# Type checkers always see the real pygls/lsprotocol names.  At runtime the
# bundled lsprotocol copy is used and pygls is optional.
if TYPE_CHECKING:
    import lsprotocol.types as types
    from pygls.server import LanguageServer
    from pygls.workspace import TextDocument
    from pygls.uris import from_fs_path

else:
    import debputy.lsprotocol.types as types

    try:
        from pygls.server import LanguageServer
        from pygls.workspace import TextDocument
        from pygls.uris import from_fs_path
    except ImportError as e:
        # Python unbinds the `as` target when the except clause ends, so the
        # exception must be kept under a separate module-level name for the
        # placeholder below to be able to re-raise it later.
        _PYGLS_IMPORT_ERROR = e

        class LanguageServer:
            def __init__(self, *args, **kwargs) -> None:
                """Placeholder to work if pygls is not installed"""
                # Should not be called
                raise _PYGLS_IMPORT_ERROR  # pragma: no cover
69@dataclasses.dataclass(slots=True)
70class FileCache:
71 doc_uri: str
72 path: str
73 is_open_in_editor: Optional[bool] = None
74 last_doc_version: Optional[int] = None
75 last_mtime: Optional[float] = None
76 is_valid: bool = False
78 def _update_cache(self, doc: "TextDocument", source: str) -> None:
79 raise NotImplementedError
81 def _clear_cache(self) -> None:
82 raise NotImplementedError
84 def resolve_cache(self, ls: "DebputyLanguageServer") -> bool:
85 doc = ls.workspace.text_documents.get(self.doc_uri)
86 if doc is None: 86 ↛ 87line 86 didn't jump to line 87 because the condition on line 86 was never true
87 doc = ls.workspace.get_text_document(self.doc_uri)
88 is_open = False
89 else:
90 is_open = True
91 new_content: Optional[str] = None
92 if is_open: 92 ↛ 104line 92 didn't jump to line 104 because the condition on line 92 was always true
93 last_doc_version = self.last_doc_version
94 dctrl_doc_version = doc.version
95 if ( 95 ↛ 103line 95 didn't jump to line 103
96 not self.is_open_in_editor
97 or last_doc_version is None
98 or dctrl_doc_version is None
99 or last_doc_version < dctrl_doc_version
100 ):
101 new_content = doc.source
103 self.last_doc_version = doc.version
104 elif doc.uri.startswith("file://"):
105 try:
106 with open(self.path) as fd:
107 st = os.fstat(fd.fileno())
108 current_mtime = st.st_mtime
109 last_mtime = self.last_mtime or current_mtime - 1
110 if self.is_open_in_editor or current_mtime > last_mtime:
111 new_content = fd.read()
112 self.last_mtime = current_mtime
113 except FileNotFoundError:
114 self._clear_cache()
115 self.is_valid = False
116 return False
117 if new_content is not None: 117 ↛ 119line 117 didn't jump to line 119 because the condition on line 117 was always true
118 self._update_cache(doc, new_content)
119 self.is_valid = True
120 return True
@dataclasses.dataclass(slots=True)
class Deb822FileCache(FileCache):
    """File cache that keeps the parsed deb822 form of the document."""

    deb822_file: Optional[Deb822FileElement] = None

    def _update_cache(self, doc: "TextDocument", source: str) -> None:
        """Parse `source` and store the resulting deb822 file element.

        The parser is asked to accept files with error tokens and files with
        duplicated fields.
        """
        source_lines = source.splitlines(keepends=True)
        self.deb822_file = parse_deb822_file(
            source_lines,
            accept_files_with_error_tokens=True,
            accept_files_with_duplicated_fields=True,
        )

    def _clear_cache(self) -> None:
        """Forget the previously parsed file."""
        self.deb822_file = None
@dataclasses.dataclass(slots=True)
class DctrlFileCache(Deb822FileCache):
    """File cache for `debian/control` holding the parsed source and binary packages."""

    dctrl_parser: Optional[DctrlParser] = None
    source_package: Optional[SourcePackage] = None
    binary_packages: Optional[Mapping[str, BinaryPackage]] = None

    def _update_cache(self, doc: "TextDocument", source: str) -> None:
        """Parse `source` with `dctrl_parser` (errors ignored) and store all three results."""
        deb822_file, source_package, binary_packages = (
            self.dctrl_parser.parse_source_debian_control(
                source.splitlines(keepends=True),
                ignore_errors=True,
            )
        )
        self.deb822_file = deb822_file
        self.source_package = source_package
        self.binary_packages = binary_packages

    def _clear_cache(self) -> None:
        """Drop the parsed file and the package data derived from it."""
        # Explicit base-class call instead of zero-argument `super()`:
        # `dataclass(slots=True)` builds a replacement class, and on CPython
        # releases before 3.14 the implicit `__class__` cell still refers to
        # the discarded original, which makes `super()` raise TypeError here.
        Deb822FileCache._clear_cache(self)
        self.source_package = None
        self.binary_packages = None
@dataclasses.dataclass(slots=True)
class SalsaCICache(FileCache):
    """File cache for a CI YAML file, keeping its top-level mapping."""

    parsed_content: Optional[CommentedMap] = None

    def _update_cache(self, doc: "TextDocument", source: str) -> None:
        """Load `source` as YAML and store it if it is a mapping.

        When loading fails with a `YAMLError`, or the document is not a
        `CommentedMap`, the previously stored content is left untouched.
        """
        try:
            loaded = MANIFEST_YAML.load(source)
        except YAMLError:
            return
        if isinstance(loaded, CommentedMap):
            self.parsed_content = loaded

    def _clear_cache(self) -> None:
        """Forget the previously parsed content."""
        self.parsed_content = None
@dataclasses.dataclass(slots=True)
class DebianRulesCache(FileCache):
    """File cache for `debian/rules` with the dh add-on sequences found in it."""

    sequences: Optional[Set[str]] = None
    saw_dh: bool = False

    def _update_cache(self, doc: "TextDocument", source: str) -> None:
        """Scan `source` for dh add-ons and record the outcome.

        `parse_drules_for_addons` fills the set passed to it and its return
        value is stored as `saw_dh`.
        """
        found_sequences = set()
        rules_lines = source.splitlines()
        self.saw_dh = parse_drules_for_addons(rules_lines, found_sequences)
        self.sequences = found_sequences

    def _clear_cache(self) -> None:
        """Reset to the "nothing parsed" state."""
        self.sequences = None
        self.saw_dh = False
class LSProvidedLintState(LintState):
    """`LintState` implementation backed by a language server document.

    Related files (`debian/control`, `debian/rules` and the CI YAML files)
    are accessed through `FileCache` instances that are resolved on demand.
    """

    def __init__(
        self,
        ls: "DebputyLanguageServer",
        doc: "TextDocument",
        source_root: str,
        debian_dir_path: str,
        dctrl_parser: DctrlParser,
    ) -> None:
        """Set up the state for linting `doc`.

        :param ls: The language server that owns the workspace.
        :param doc: The document being linted.
        :param source_root: File system path of the source package root.
        :param debian_dir_path: File system path of the `debian` directory.
        :param dctrl_parser: Parser used for `debian/control`.
        """
        self._ls = ls
        self._doc = doc
        # Cache lines (doc.lines re-splits everytime)
        self._lines = doc.lines
        self._source_root = FSROOverlay.create_root_dir(".", source_root)
        debian_dir = self._source_root.get("debian")
        if debian_dir is not None and not debian_dir.is_dir:
            debian_dir = None
        self._debian_dir = debian_dir
        # Non-None only while `run_diagnostics` is executing.
        self._diagnostics: Optional[List[types.Diagnostic]] = None
        dctrl_file = os.path.join(debian_dir_path, "control")

        if dctrl_file != doc.path:
            self._dctrl_cache: DctrlFileCache = DctrlFileCache(
                from_fs_path(dctrl_file),
                dctrl_file,
                dctrl_parser=dctrl_parser,
            )
            self._deb822_file: Deb822FileCache = Deb822FileCache(
                doc.uri,
                doc.path,
            )
        else:
            # The document is debian/control itself; one cache serves both roles.
            self._dctrl_cache: DctrlFileCache = DctrlFileCache(
                doc.uri,
                doc.path,
                dctrl_parser=dctrl_parser,
            )
            self._deb822_file = self._dctrl_cache

        self._salsa_ci_caches = [
            SalsaCICache(
                from_fs_path(os.path.join(debian_dir_path, p)),
                os.path.join(debian_dir_path, p),
            )
            for p in ("salsa-ci.yml", os.path.join("..", ".gitlab-ci.yml"))
        ]
        drules_path = os.path.join(debian_dir_path, "rules")
        self._drules_cache = DebianRulesCache(
            from_fs_path(drules_path) if doc.path != drules_path else doc.uri,
            drules_path,
        )

    @property
    def plugin_feature_set(self) -> PluginProvidedFeatureSet:
        return self._ls.plugin_feature_set

    @property
    def doc_uri(self) -> str:
        return self._doc.uri

    @property
    def source_root(self) -> Optional[VirtualPathBase]:
        return self._source_root

    @property
    def debian_dir(self) -> Optional[VirtualPathBase]:
        return self._debian_dir

    @property
    def path(self) -> str:
        return self._doc.path

    @property
    def content(self) -> str:
        return self._doc.source

    @property
    def lines(self) -> List[str]:
        return self._lines

    @property
    def position_codec(self) -> LintCapablePositionCodec:
        return self._doc.position_codec

    def _resolve_dctrl(self) -> Optional[DctrlFileCache]:
        """Refresh and return the `debian/control` cache."""
        dctrl_cache = self._dctrl_cache
        dctrl_cache.resolve_cache(self._ls)
        return dctrl_cache

    @property
    def parsed_deb822_file_content(self) -> Optional[Deb822FileElement]:
        cache = self._deb822_file
        cache.resolve_cache(self._ls)
        return cache.deb822_file

    @property
    def source_package(self) -> Optional[SourcePackage]:
        return self._resolve_dctrl().source_package

    @property
    def binary_packages(self) -> Optional[Mapping[str, BinaryPackage]]:
        return self._resolve_dctrl().binary_packages

    def _resolve_salsa_ci(self) -> Optional[CommentedMap]:
        """Return the parsed content of the first CI file that resolves, if any."""
        for salsa_ci_cache in self._salsa_ci_caches:
            if salsa_ci_cache.resolve_cache(self._ls):
                return salsa_ci_cache.parsed_content
        return None

    @property
    def effective_preference(self) -> Optional[MaintainerPreference]:
        source_package = self.source_package
        salsa_ci = self._resolve_salsa_ci()
        if source_package is None and salsa_ci is None:
            return None
        style, _, _ = determine_effective_preference(
            self.maint_preference_table,
            source_package,
            salsa_ci,
        )
        return style

    @property
    def maint_preference_table(self) -> MaintainerPreferenceTable:
        return self._ls.maint_preferences

    @property
    def salsa_ci(self) -> Optional[CommentedMap]:
        # NOTE(review): always None even though `_resolve_salsa_ci` exists;
        # confirm whether this is intentional.
        return None

    @property
    def dh_sequencer_data(self) -> DhSequencerData:
        """Collect dh add-on sequences from `debian/rules` and `debian/control`."""
        dh_sequences: Set[str] = set()
        saw_dh = False
        src_pkg = self.source_package
        drules_cache = self._drules_cache
        if drules_cache.resolve_cache(self._ls):
            saw_dh = drules_cache.saw_dh
            if drules_cache.sequences:
                dh_sequences.update(drules_cache.sequences)
        if src_pkg:
            extract_dh_addons_from_control(src_pkg.fields, dh_sequences)

        return DhSequencerData(
            frozenset(dh_sequences),
            saw_dh,
        )

    def translation(self, domain: str) -> Translations:
        return self._ls.translation(domain)

    def run_diagnostics(
        self,
        linter: LinterImpl,
    ) -> List[types.Diagnostic]:
        """Run `linter` against this state and return the diagnostics it emitted.

        :param linter: The linter to invoke with this state.
        :returns: The diagnostics emitted during the run.
        :raises RuntimeError: If called while another run is in progress.
        """
        if self._diagnostics is not None:
            raise RuntimeError(
                "run_diagnostics cannot be run while it is already running"
            )
        self._diagnostics = diagnostics = []

        try:
            linter(self)
        finally:
            # Always leave the "running" state; otherwise a linter that raises
            # would make every later call fail with "already running".
            self._diagnostics = None
        return diagnostics

    def _emit_diagnostic(self, diagnostic: types.Diagnostic) -> None:
        """Record `diagnostic` for the run in progress.

        :raises TypeError: If no `run_diagnostics` call is in progress.
        """
        diagnostics = self._diagnostics
        if diagnostics is None:
            raise TypeError("Cannot run emit_diagnostic outside of run_diagnostics")
        diagnostics.append(diagnostic)
369def _preference(
370 client_preference: Optional[List[types.MarkupKind]],
371 options: Container[types.MarkupKind],
372 fallback_kind: types.MarkupKind,
373) -> types.MarkupKind:
374 if not client_preference: 374 ↛ 376line 374 didn't jump to line 376 because the condition on line 374 was always true
375 return fallback_kind
376 for markdown_kind in client_preference:
377 if markdown_kind in options:
378 return markdown_kind
379 return fallback_kind
class DebputyLanguageServer(LanguageServer):
    """Language server carrying the debputy-specific shared state.

    Besides what the base class provides, it holds the `debian/control`
    parser, the plugin feature set, maintainer preferences, the apt cache
    and the locale used for translations.
    """

    def __init__(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        super().__init__(*args, **kwargs)
        self._dctrl_parser: Optional[DctrlParser] = None
        self._plugin_feature_set: Optional[PluginProvidedFeatureSet] = None
        self._trust_language_ids: Optional[bool] = None
        self._finished_initialization = False
        self.maint_preferences = MaintainerPreferenceTable({}, {})
        self.apt_cache = AptCache()
        # Holds references to running background tasks; each task removes
        # itself through a done-callback (see `on_initialize`).
        self.background_tasks = set()
        self.client_locale: Optional[str] = None
        self.forced_locale: Optional[str] = None
        self._active_locale: Optional[List[str]] = None

    def finish_startup_initialization(self) -> None:
        """Load the maintainer preferences once all required attributes are set.

        Does nothing on repeated calls.
        """
        if self._finished_initialization:
            return
        assert self._dctrl_parser is not None
        assert self._plugin_feature_set is not None
        assert self._trust_language_ids is not None
        self.maint_preferences = self.maint_preferences.load_preferences()
        _info(
            f"Loaded style preferences: {len(self.maint_preferences.maintainer_preferences)} unique maintainer preferences recorded"
        )
        self._finished_initialization = True

    async def on_initialize(self, params: types.InitializeParams) -> None:
        """Start loading the apt cache in the background and record the client locale."""
        task = self.loop.create_task(self._load_apt_cache(), name="Index apt cache")
        self.background_tasks.add(task)
        task.add_done_callback(self.background_tasks.discard)
        self.client_locale = params.locale
        if self.forced_locale is not None:
            _info(
                f"Ignoring client locale: {self.client_locale}. Using {self.forced_locale} instead as requested [--force-locale]"
            )
        else:
            _info(
                f"Client locale: {self.client_locale}. Use --force-locale to override"
            )
        self._update_locale()

    def _update_locale(self) -> None:
        """Recompute the active locale; a forced locale wins over the client's."""
        if self.forced_locale is not None:
            self._active_locale = [self.forced_locale]
        elif self.client_locale is not None:
            self._active_locale = [self.client_locale]
        else:
            self._active_locale = None

    def shutdown(self) -> None:
        """Request cancellation of all background tasks, then shut down the base server."""
        # Iterate over a snapshot: the done-callbacks discard tasks from the
        # set, which must not happen while the set itself is being iterated.
        for task in tuple(self.background_tasks):
            _info(f"Cancelling task: {task.get_name()}")
            self.loop.call_soon_threadsafe(task.cancel)
        return super().shutdown()

    def translation(self, domain: str) -> Translations:
        return l10n.translation(
            domain,
            languages=self._active_locale,
        )

    async def _load_apt_cache(self) -> None:
        """Load the apt cache unless it is already loading or loaded."""
        if self.apt_cache.state in ("loading", "loaded"):
            _info(
                f"The apt cache data is already in state {self.apt_cache.state}, not re-triggering"
            )
            return
        _info("Starting load of apt cache data")
        # A monotonic clock is unaffected by wall-clock adjustments, which
        # makes it the right tool for measuring a duration.
        start = time.monotonic()
        try:
            await self.apt_cache.load()
        except ValueError as ex:
            _warn(f"Could not load apt cache: {ex}")
        else:
            end = time.monotonic()
            _info(
                f"Loading apt cache finished after {end-start} seconds and is now in state {self.apt_cache.state}"
            )

    @property
    def plugin_feature_set(self) -> PluginProvidedFeatureSet:
        res = self._plugin_feature_set
        if res is None:
            raise RuntimeError(
                "Initialization error: The plugin feature set has not been initialized before it was needed."
            )
        return res

    @plugin_feature_set.setter
    def plugin_feature_set(self, plugin_feature_set: PluginProvidedFeatureSet) -> None:
        if self._plugin_feature_set is not None:
            raise RuntimeError(
                "The plugin_feature_set attribute cannot be changed once set"
            )
        self._plugin_feature_set = plugin_feature_set

    @property
    def dctrl_parser(self) -> DctrlParser:
        res = self._dctrl_parser
        if res is None:
            raise RuntimeError(
                "Initialization error: The dctrl_parser has not been initialized before it was needed."
            )
        return res

    @dctrl_parser.setter
    def dctrl_parser(self, parser: DctrlParser) -> None:
        if self._dctrl_parser is not None:
            raise RuntimeError("The dctrl_parser attribute cannot be changed once set")
        self._dctrl_parser = parser

    def lint_state(self, doc: "TextDocument") -> LSProvidedLintState:
        """Create the lint state for `doc`.

        The `debian` directory is located by walking up from the document's
        directory until a directory named `debian` is found (or the walk
        runs out of path); its parent is used as the source root.
        """
        dir_path = os.path.dirname(doc.path)

        while dir_path and dir_path != "/" and os.path.basename(dir_path) != "debian":
            dir_path = os.path.dirname(dir_path)

        source_root = os.path.dirname(dir_path)

        return LSProvidedLintState(self, doc, source_root, dir_path, self.dctrl_parser)

    @property
    def _client_hover_markup_formats(self) -> Optional[List[types.MarkupKind]]:
        try:
            return (
                self.client_capabilities.text_document.hover.content_format
            )  # type: ignore
        except AttributeError:
            # Some part of the capability chain was not announced by the client.
            return None

    def hover_markup_format(
        self,
        *options: types.MarkupKind,
        fallback_kind: types.MarkupKind = types.MarkupKind.PlainText,
    ) -> types.MarkupKind:
        """Pick the client preferred hover markup format from a set of options

        :param options: The markup kinds possible.
        :param fallback_kind: If no overlapping option was found in the client preferences
          (or client did not announce a value at all), this parameter is returned instead.
        :returns: The client's preferred markup format from the provided options, or,
          (if there is no overlap), the `fallback_kind` value is returned.
        """
        client_preference = self._client_hover_markup_formats
        return _preference(client_preference, frozenset(options), fallback_kind)

    @property
    def _client_completion_item_document_markup_formats(
        self,
    ) -> Optional[List[types.MarkupKind]]:
        try:
            return (
                self.client_capabilities.text_document.completion.completion_item.documentation_format  # type: ignore
            )
        except AttributeError:
            # Some part of the capability chain was not announced by the client.
            return None

    def completion_item_document_markup(
        self,
        *options: types.MarkupKind,
        fallback_kind: types.MarkupKind = types.MarkupKind.PlainText,
    ) -> types.MarkupKind:
        """Pick the client preferred completion item documentation markup format from a set of options

        :param options: The markup kinds possible.
        :param fallback_kind: If no overlapping option was found in the client preferences
          (or client did not announce a value at all), this parameter is returned instead.
        :returns: The client's preferred markup format from the provided options, or,
          (if there is no overlap), the `fallback_kind` value is returned.
        """

        client_preference = self._client_completion_item_document_markup_formats
        return _preference(client_preference, frozenset(options), fallback_kind)

    @property
    def trust_language_ids(self) -> bool:
        v = self._trust_language_ids
        if v is None:
            return True
        return v

    @trust_language_ids.setter
    def trust_language_ids(self, new_value: bool) -> None:
        self._trust_language_ids = new_value

    def determine_language_id(
        self,
        doc: "TextDocument",
    ) -> Tuple[Literal["editor-provided", "filename"], str, str]:
        """Determine the language ID to use for `doc`.

        :param doc: The document to classify.
        :returns: A tuple of the source of the ID (`"editor-provided"` or
          `"filename"`), the language ID and the cleaned filename.  The
          cleaned filename is the part of the path from its last `debian`
          directory onwards, or the basename if there is no such directory.
        """
        lang_id = doc.language_id
        path = doc.path
        # Match "debian" as a whole path component; a plain substring search
        # for "debian/" would also hit directories such as "python-debian/".
        last_idx = path.rfind("/debian/")
        if last_idx >= 0:
            cleaned_filename = path[last_idx + 1 :]
        elif path.startswith("debian/"):
            cleaned_filename = path
        else:
            cleaned_filename = os.path.basename(path)

        if self.trust_language_ids and lang_id and not lang_id.isspace():
            if lang_id not in ("fundamental",):
                return "editor-provided", lang_id, cleaned_filename
            _info(
                f"Ignoring editor provided language ID: {lang_id} (reverting to filename based detection instead)"
            )

        return "filename", cleaned_filename, cleaned_filename