Coverage for src/debputy/lsp/lsp_features.py: 65%
153 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
1import collections
2import dataclasses
3import inspect
4import sys
5from typing import (
6 Callable,
7 TypeVar,
8 Sequence,
9 Union,
10 Dict,
11 List,
12 Optional,
13 AsyncIterator,
14 Self,
15 Generic,
16 Protocol,
17 Iterable,
18 TYPE_CHECKING,
19 Tuple,
20)
22from debputy.commands.debputy_cmd.context import CommandContext
23from debputy.commands.debputy_cmd.output import _output_styling
24from debputy.lsp.lsp_self_check import LSP_CHECKS
26try:
27 from pygls.server import LanguageServer
28 from debputy.lsp.debputy_ls import DebputyLanguageServer
29except ImportError:
30 pass
32from debputy.linting.lint_util import LinterImpl
33from debputy.lsp.quickfixes import provide_standard_quickfixes_from_diagnostics
34from debputy.lsp.text_util import on_save_trim_end_of_line_whitespace
36if TYPE_CHECKING:
37 import lsprotocol.types as types
38else:
39 import debputy.lsprotocol.types as types
# Type variable for handler callables. The registration decorators in this
# module return the function they were given, so its type is preserved.
C = TypeVar("C", bound=Callable)

# Legend of the semantic token types used by this module; no token modifiers
# are declared.
SEMANTIC_TOKENS_LEGEND = types.SemanticTokensLegend(
    token_modifiers=[],
    token_types=[
        types.SemanticTokenTypes.Keyword.value,
        types.SemanticTokenTypes.EnumMember.value,
        types.SemanticTokenTypes.Comment.value,
        types.SemanticTokenTypes.String.value,
    ],
)

# Maps every token type of the legend to its index within the legend.
SEMANTIC_TOKEN_TYPES_IDS = {
    token_type: position
    for position, token_type in enumerate(SEMANTIC_TOKENS_LEGEND.token_types)
}

# Signature of a diagnostics handler: called with the language server and the
# open/change notification parameters, producing diagnostics asynchronously.
DiagnosticHandler = Callable[
    [
        "DebputyLanguageServer",
        Union["types.DidOpenTextDocumentParams", "types.DidChangeTextDocumentParams"],
    ],
    AsyncIterator[Optional[List[types.Diagnostic]]],
]
@dataclasses.dataclass(slots=True)
class LanguageDispatchTable(Generic[C]):
    """Handlers registered for one language ID.

    A table holds at most one default handler plus any number of handlers
    that are keyed by filename.
    """

    # The language ID this table belongs to.
    language_id: str
    # Handlers keyed by filename.
    filename_based_lookups: Dict[str, C] = dataclasses.field(default_factory=dict)
    # Handler used for the language ID itself; None until one is registered.
    default_handler: Optional[C] = dataclasses.field(default=None)
class HandlerDispatchTable(Generic[C], Dict[str, LanguageDispatchTable[C]]):
    """Mapping from language ID to its `LanguageDispatchTable`.

    Looking up a language ID that is not present yet creates an empty table
    for it, stores that table in the mapping and returns it.
    """

    def __missing__(self, key: str) -> LanguageDispatchTable[C]:
        # Store the fresh table so later lookups return the same object.
        table = self[key] = LanguageDispatchTable(key)
        return table
class DiagnosticHandlerProtocol(Protocol):
    """Call signature of the handlers stored in `DIAGNOSTIC_HANDLERS`.

    A handler receives the language server and the parameters of the request
    or notification that triggered the diagnostics run.
    """

    # NOTE(review): the wrapper registered by `lint_diagnostics` is an async
    # generator, which does not obviously match an `async def` returning an
    # `Iterable` - confirm which of the two is intended.
    async def __call__(
        self,
        ls: "DebputyLanguageServer",
        params: Union[
            types.DidOpenTextDocumentParams,
            types.DidChangeTextDocumentParams,
            types.DocumentDiagnosticParams,
        ],
    ) -> Iterable[Optional[List[types.Diagnostic]]]: ...
# One dispatch table per LSP feature. Handlers are added to these tables by the
# registration decorators of this module.
DIAGNOSTIC_HANDLERS: HandlerDispatchTable[DiagnosticHandlerProtocol] = (
    HandlerDispatchTable[DiagnosticHandlerProtocol]()
)
COMPLETER_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
HOVER_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
CODE_ACTION_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
FOLDING_RANGE_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
SEMANTIC_TOKENS_FULL_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
WILL_SAVE_WAIT_UNTIL_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
FORMAT_FILE_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
TEXT_DOC_INLAY_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()

# Maps an alias language ID to the language ID it is an alias of; consulted by
# `describe_lsp_features`. Nothing in the part of the file shown here fills it.
_ALIAS_OF: Dict[str, str] = {}
105@dataclasses.dataclass(slots=True, frozen=True)
106class SecondaryLanguage:
107 language_id: str
108 filename_based_lookup: bool = False
@dataclasses.dataclass(slots=True, frozen=True)
class LanguageDispatchRule:
    """Describes which language IDs and filenames a handler is registered for."""

    # Language ID under which the handler becomes the default handler.
    primary_language_id: str
    # Filenames under which the handler is additionally registered.
    filenames: Sequence[str]
    # Further language IDs under which the handler is registered.
    secondary_language_ids: Sequence[SecondaryLanguage]

    @classmethod
    def new_rule(
        cls,
        primary_language_id: str,
        filenames: Union[str, Sequence[str]],
        secondary_language_ids: Sequence[Union[SecondaryLanguage, str]] = (),
    ) -> Self:
        """Create a rule from loosely typed arguments.

        :param primary_language_id: The primary language ID of the rule.
        :param filenames: A single filename or a sequence of filenames; either
          form is stored as a tuple.
        :param secondary_language_ids: Secondary languages; a plain string is
          wrapped into a `SecondaryLanguage` with its default settings.
        :return: The new rule, as an instance of the class the method was
          called on.
        """
        # Instantiate via `cls` (not the hard-coded class name) so the `Self`
        # return annotation also holds for subclasses.
        return cls(
            primary_language_id,
            (filenames,) if isinstance(filenames, str) else tuple(filenames),
            tuple(
                SecondaryLanguage(lang) if isinstance(lang, str) else lang
                for lang in secondary_language_ids
            ),
        )
# Stock handlers that `lsp_standard_handler` can register, keyed by topic.
# Each value pairs the dispatch table to register into with the handler itself.
_STANDARD_HANDLERS: Dict[str, Tuple[HandlerDispatchTable, Callable]] = {
    types.TEXT_DOCUMENT_FORMATTING: (
        FORMAT_FILE_HANDLERS,
        on_save_trim_end_of_line_whitespace,
    ),
    types.TEXT_DOCUMENT_CODE_ACTION: (
        CODE_ACTION_HANDLERS,
        # The quickfix provider only takes the params, so drop the server.
        lambda ls, params: provide_standard_quickfixes_from_diagnostics(params),
    ),
    types.TEXT_DOCUMENT_WILL_SAVE_WAIT_UNTIL: (
        WILL_SAVE_WAIT_UNTIL_HANDLERS,
        on_save_trim_end_of_line_whitespace,
    ),
}

# What `lint_diagnostics` may decorate: a diagnostics handler or a linter.
LI = TypeVar("LI", DiagnosticHandlerProtocol, LinterImpl)
def lint_diagnostics(
    file_format: LanguageDispatchRule,
) -> Callable[[LI], LI]:
    """Return a decorator that registers a linter as diagnostics handler.

    The decorated function is wrapped in an async generator that looks up the
    text document named by the request, obtains the lint state for it and
    yields the result of running the linter through that lint state. The
    wrapper is registered in `DIAGNOSTIC_HANDLERS`; the decorated function
    itself is returned unchanged.

    :param file_format: The rule describing which language IDs and filenames
      the linter is registered for.
    :return: The registering decorator.
    :raises ValueError: When the decorator is applied to a coroutine function.
    """

    def _wrapper(func: LI) -> LI:
        if inspect.iscoroutinefunction(func):
            raise ValueError("Linters are all non-async at the moment")

        # This is an async generator (it yields), hence the `AsyncIterator`
        # return annotation rather than the bare diagnostics type.
        async def _lint_wrapper(
            ls: "DebputyLanguageServer",
            params: Union[
                types.DidOpenTextDocumentParams,
                types.DidChangeTextDocumentParams,
                types.DocumentDiagnosticParams,
            ],
        ) -> AsyncIterator[Optional[List[types.Diagnostic]]]:
            doc = ls.workspace.get_text_document(params.text_document.uri)
            lint_state = ls.lint_state(doc)
            yield lint_state.run_diagnostics(func)

        _register_handler(file_format, DIAGNOSTIC_HANDLERS, _lint_wrapper)

        return func

    return _wrapper
def lsp_completer(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering a function in `COMPLETER_HANDLERS`.

    The decorated function is registered for `file_format` and handed back
    unchanged.
    """
    decorator = _registering_wrapper(file_format, COMPLETER_HANDLERS)
    return decorator
def lsp_code_actions(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering a function in `CODE_ACTION_HANDLERS`.

    The decorated function is registered for `file_format` and handed back
    unchanged.
    """
    decorator = _registering_wrapper(file_format, CODE_ACTION_HANDLERS)
    return decorator
def lsp_hover(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering a function in `HOVER_HANDLERS`.

    The decorated function is registered for `file_format` and handed back
    unchanged.
    """
    decorator = _registering_wrapper(file_format, HOVER_HANDLERS)
    return decorator
def lsp_text_doc_inlay_hints(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering a function in `TEXT_DOC_INLAY_HANDLERS`.

    The decorated function is registered for `file_format` and handed back
    unchanged.
    """
    decorator = _registering_wrapper(file_format, TEXT_DOC_INLAY_HANDLERS)
    return decorator
def lsp_folding_ranges(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering a function in `FOLDING_RANGE_HANDLERS`.

    The decorated function is registered for `file_format` and handed back
    unchanged.
    """
    decorator = _registering_wrapper(file_format, FOLDING_RANGE_HANDLERS)
    return decorator
def lsp_will_save_wait_until(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering a function in `WILL_SAVE_WAIT_UNTIL_HANDLERS`.

    The decorated function is registered for `file_format` and handed back
    unchanged.
    """
    decorator = _registering_wrapper(file_format, WILL_SAVE_WAIT_UNTIL_HANDLERS)
    return decorator
def lsp_format_document(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering a function in `FORMAT_FILE_HANDLERS`.

    The decorated function is registered for `file_format` and handed back
    unchanged.
    """
    decorator = _registering_wrapper(file_format, FORMAT_FILE_HANDLERS)
    return decorator
def lsp_semantic_tokens_full(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering a function in `SEMANTIC_TOKENS_FULL_HANDLERS`.

    The decorated function is registered for `file_format` and handed back
    unchanged.
    """
    decorator = _registering_wrapper(file_format, SEMANTIC_TOKENS_FULL_HANDLERS)
    return decorator
def lsp_standard_handler(
    file_format: LanguageDispatchRule,
    topic: str,
) -> None:
    """Register the stock handler for `topic` for the given file format.

    :param file_format: The rule describing which language IDs and filenames
      the handler is registered for.
    :param topic: Key into `_STANDARD_HANDLERS` selecting the stock handler.
    :raises ValueError: When there is no stock handler for `topic`.
    """
    if topic not in _STANDARD_HANDLERS:
        raise ValueError(f"No standard handler for {topic}")

    # Each entry pairs the dispatch table with the handler to put into it.
    table, handler = _STANDARD_HANDLERS[topic]
    _register_handler(file_format, table, handler)
def _registering_wrapper(
    file_formats: LanguageDispatchRule,
    handler_dict: HandlerDispatchTable[C],
) -> Callable[[C], C]:
    """Build a decorator that registers its target in `handler_dict`.

    :param file_formats: The rule describing which language IDs and filenames
      the decorated function is registered for.
    :param handler_dict: The dispatch table to register the function in.
    :return: A decorator that performs the registration and returns the
      decorated function unchanged.
    """

    def _decorate(func: C) -> C:
        _register_handler(file_formats, handler_dict, func)
        return func

    return _decorate
def _register_handler(
    file_format: LanguageDispatchRule,
    handler_dict: HandlerDispatchTable[C],
    handler: C,
) -> None:
    """Register `handler` in `handler_dict` as described by `file_format`.

    The handler becomes the default handler of the primary language ID and is
    stored per filename in the table kept under the empty language ID (`""`).
    For every secondary language it is either stored per filename in that
    language's table or set as that language's default handler, depending on
    the secondary language's `filename_based_lookup` flag.

    :param file_format: The rule describing the language IDs and filenames.
    :param handler_dict: The dispatch table to register the handler in.
    :param handler: The handler to register.
    :raises AssertionError: When one of the slots is already occupied.
      Registrations made before the conflict was found are not rolled back.
    """
    primary_id = file_format.primary_language_id
    primary_table = handler_dict[primary_id]
    # The table under the empty language ID holds the filename-keyed handlers.
    by_filename = handler_dict[""].filename_based_lookups

    if primary_table.default_handler is not None:
        raise AssertionError(
            f"There is already a handler for language ID {file_format.primary_language_id}"
        )
    primary_table.default_handler = handler

    for filename in file_format.filenames:
        if by_filename.get(filename) is not None:
            raise AssertionError(f"There is already a handler for filename {filename}")
        by_filename[filename] = handler

    for secondary_language in file_format.secondary_language_ids:
        secondary_table = handler_dict[secondary_language.language_id]

        if not secondary_language.filename_based_lookup:
            # Plain secondary language: claim its default handler slot.
            if secondary_table.default_handler is not None:
                raise AssertionError(
                    f"There is already a primary handler for language ID {secondary_language.language_id}"
                )
            secondary_table.default_handler = handler
            continue

        # Filename-based secondary language: claim one slot per filename.
        secondary_lookups = secondary_table.filename_based_lookups
        for filename in file_format.filenames:
            if secondary_lookups.get(filename) is not None:
                raise AssertionError(
                    f"There is already a handler for filename {filename} under language ID {secondary_language.language_id}"
                )
            secondary_lookups[filename] = handler
def ensure_lsp_features_are_loaded() -> None:
    """Import the linter formats so that the LSP modules get loaded."""
    # FIXME: The import below exists purely to force the LSP files to be
    #  loaded. That only covers files that have a linter; at the moment that
    #  happens to be every one of them, which makes this somewhat fragile.
    from debputy.linting.lint_impl import LINTER_FORMATS

    assert LINTER_FORMATS
def describe_lsp_features(context: CommandContext) -> None:
    """Print the supported LSP language IDs, their aliases and general features.

    Three sections are written to standard output: every language ID found in
    the handler tables with the features registered for it, the aliases
    grouped by the language ID they point to, and the outcome of each
    self-check in `LSP_CHECKS`.

    :param context: The command context; its parsed arguments are used to set
      up the output styling.
    """
    fo = _output_styling(context.parsed_args, sys.stdout)
    ensure_lsp_features_are_loaded()

    feature_list = [
        ("diagnostics (lint)", DIAGNOSTIC_HANDLERS),
        ("code actions/quickfixes", CODE_ACTION_HANDLERS),
        ("completion suggestions", COMPLETER_HANDLERS),
        ("hover docs", HOVER_HANDLERS),
        ("folding ranges", FOLDING_RANGE_HANDLERS),
        ("semantic tokens", SEMANTIC_TOKENS_FULL_HANDLERS),
        ("on-save handler", WILL_SAVE_WAIT_UNTIL_HANDLERS),
        ("inlay hint (doc)", TEXT_DOC_INLAY_HANDLERS),
        ("format file handler", FORMAT_FILE_HANDLERS),
    ]
    print("LSP language IDs and their features:")
    # NOTE(review): the handler tables also contain the "" key that
    # `_register_handler` uses for filename-based lookups, so it is listed
    # here like any language ID - confirm that this is intended.
    all_ids = sorted({lid for _, table in feature_list for lid in table})
    for lang_id in all_ids:
        if lang_id in _ALIAS_OF:
            # Aliases are reported in their own section below.
            continue
        features = [name for name, table in feature_list if lang_id in table]
        print(f" * {lang_id}:")
        for feature in features:
            print(f" - {feature}")

    aliases = collections.defaultdict(list)
    for lang_id in all_ids:
        main_lang = _ALIAS_OF.get(lang_id)
        if main_lang is None:
            continue
        aliases[main_lang].append(lang_id)

    print()
    print("Aliases:")
    # Use a distinct loop name so the `aliases` mapping is not rebound while
    # it is being iterated.
    for main_id, alias_ids in aliases.items():
        print(f" * {main_id}: {', '.join(alias_ids)}")

    print()
    print("General features:")
    for self_check in LSP_CHECKS:
        if self_check.test():
            print(f" * {self_check.feature}: {fo.colored('enabled', fg='green')}")
            continue

        # A failed mandatory check is reported as "missing", an optional one
        # as "disabled".
        if self_check.is_mandatory:
            label, colour = "missing", "red"
        else:
            label, colour = "disabled", "yellow"
        disabled = fo.colored(
            label,
            fg=colour,
            bg="black",
            style="bold",
        )

        if self_check.how_to_fix:
            print(f" * {self_check.feature}: {disabled}")
            print(f" - {self_check.how_to_fix}")
        else:
            problem_suffix = f" ({self_check.problem})"
            print(f" * {self_check.feature}: {disabled}{problem_suffix}")