Coverage for src/debputy/lsp/lsp_features.py: 65%
152 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-03-24 16:38 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-03-24 16:38 +0000
1import collections
2import dataclasses
3import inspect
4import sys
5from typing import (
6 Callable,
7 TypeVar,
8 Sequence,
9 Union,
10 Dict,
11 List,
12 Optional,
13 AsyncIterator,
14 Self,
15 Generic,
16 Protocol,
17 TYPE_CHECKING,
18)
20from debputy.commands.debputy_cmd.context import CommandContext
21from debputy.commands.debputy_cmd.output import _output_styling
22from debputy.lsp.lsp_self_check import LSP_CHECKS
24try:
25 from pygls.server import LanguageServer
26 from debputy.lsp.debputy_ls import DebputyLanguageServer
27except ImportError:
28 pass
30from debputy.linting.lint_util import AsyncLinterImpl
31from debputy.lsp.quickfixes import provide_standard_quickfixes_from_diagnostics_ls
32from debputy.lsp.text_util import on_save_trim_end_of_line_whitespace
34if TYPE_CHECKING:
35 import lsprotocol.types as types
36else:
37 import debputy.lsprotocol.types as types
# Type variable for handler callables; it parameterises the dispatch tables and
# the registration helpers in this module.
C = TypeVar("C", bound=Callable)
# Semantic token legend: the token types listed below and no token modifiers.
SEMANTIC_TOKENS_LEGEND = types.SemanticTokensLegend(
    token_types=[
        token_type.value
        for token_type in (
            types.SemanticTokenTypes.Keyword,
            types.SemanticTokenTypes.EnumMember,
            types.SemanticTokenTypes.Comment,
            types.SemanticTokenTypes.String,
            types.SemanticTokenTypes.Macro,
        )
    ],
    token_modifiers=[],
)
# Maps each token type name in the legend to its index within the legend.
SEMANTIC_TOKEN_TYPES_IDS = {
    token_type: position
    for position, token_type in enumerate(SEMANTIC_TOKENS_LEGEND.token_types)
}
# Callable alias: takes the language server plus did-open/did-change parameters
# and returns an async iterator of optional diagnostic lists.
DiagnosticHandler = Callable[
    [
        "DebputyLanguageServer",
        Union["types.DidOpenTextDocumentParams", "types.DidChangeTextDocumentParams"],
    ],
    AsyncIterator[Union[List[types.Diagnostic], None]],
]
@dataclasses.dataclass(slots=True)
class LanguageDispatchTable(Generic[C]):
    """Handlers registered under a single language ID."""

    # The language ID this table was created for.
    language_id: str
    # Handlers keyed by filename; starts out empty.
    filename_based_lookups: Dict[str, C] = dataclasses.field(default_factory=dict)
    # Handler registered for the language ID itself; None until one is set.
    default_handler: Union[C, None] = None
class HandlerDispatchTable(Generic[C], Dict[str, LanguageDispatchTable[C]]):
    """Mapping of language ID to its dispatch table, created on first subscript."""

    def __missing__(self, key: str) -> LanguageDispatchTable[C]:
        # Invoked by ``self[key]`` for an unknown key: store a fresh table under
        # that key and hand it back. ``key in self`` and ``.get()`` do not
        # trigger this.
        return self.setdefault(key, LanguageDispatchTable(key))
class DiagnosticHandlerProtocol(Protocol):
    """Structural type of the async callables stored in ``DIAGNOSTIC_HANDLERS``."""

    async def __call__(
        self,
        ls: "DebputyLanguageServer",
        params: Union[
            types.DidOpenTextDocumentParams,
            types.DidChangeTextDocumentParams,
            types.DocumentDiagnosticParams,
        ],
    ) -> Optional[List[types.Diagnostic]]:
        """Return the diagnostics for the document named in *params*, or None."""
        ...
# One dispatch table per LSP feature. Handlers are added through the decorators
# and registration helpers defined further down in this module.
DIAGNOSTIC_HANDLERS: HandlerDispatchTable[DiagnosticHandlerProtocol] = (
    HandlerDispatchTable[DiagnosticHandlerProtocol]()
)
COMPLETER_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
HOVER_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
CODE_ACTION_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
FOLDING_RANGE_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
SEMANTIC_TOKENS_FULL_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
WILL_SAVE_WAIT_UNTIL_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
FORMAT_FILE_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
TEXT_DOC_INLAY_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()

# Alias language ID -> main language ID. Nothing in this module adds entries;
# presumably populated elsewhere, if at all (TODO confirm).
_ALIAS_OF: Dict[str, str] = {}
104@dataclasses.dataclass(slots=True, frozen=True)
105class SecondaryLanguage:
106 language_id: str
107 filename_based_lookup: bool = False
110@dataclasses.dataclass(slots=True, frozen=True)
111class LanguageDispatchRule:
112 primary_language_id: str
113 filenames: Sequence[str]
114 secondary_language_ids: Sequence[SecondaryLanguage]
116 @classmethod
117 def new_rule(
118 cls,
119 primary_language_id: str,
120 filenames: Union[str, Sequence[str]],
121 secondary_language_ids: Sequence[Union[SecondaryLanguage, str]] = (),
122 ) -> Self:
123 return LanguageDispatchRule(
124 primary_language_id,
125 (filenames,) if isinstance(filenames, str) else tuple(filenames),
126 tuple(
127 SecondaryLanguage(l) if isinstance(l, str) else l
128 for l in secondary_language_ids
129 ),
130 )
# LSP topic -> (dispatch table, stock handler); consulted by
# `lsp_standard_handler`. Formatting and will-save-wait-until share one handler.
_STANDARD_HANDLERS = {
    types.TEXT_DOCUMENT_FORMATTING: (
        FORMAT_FILE_HANDLERS,
        on_save_trim_end_of_line_whitespace,
    ),
    types.TEXT_DOCUMENT_CODE_ACTION: (
        CODE_ACTION_HANDLERS,
        provide_standard_quickfixes_from_diagnostics_ls,
    ),
    types.TEXT_DOCUMENT_WILL_SAVE_WAIT_UNTIL: (
        WILL_SAVE_WAIT_UNTIL_HANDLERS,
        on_save_trim_end_of_line_whitespace,
    ),
}
def lint_diagnostics(
    file_format: LanguageDispatchRule,
) -> Callable[[AsyncLinterImpl], AsyncLinterImpl]:
    """Decorator factory registering an async linter as diagnostics handler.

    :param file_format: The rule describing which language IDs and filenames
      the linter is registered for.
    :returns: A decorator. It registers a wrapper around the linter in
      `DIAGNOSTIC_HANDLERS` and returns the linter itself unchanged.
    :raises ValueError: (from the decorator) if the decorated function is not
      a coroutine function.
    """

    def _wrapper(func: AsyncLinterImpl) -> AsyncLinterImpl:
        if not inspect.iscoroutinefunction(func):
            raise ValueError("Linters must be async")

        async def _lint_wrapper(
            ls: "DebputyLanguageServer",
            params: Union[
                types.DidOpenTextDocumentParams,
                types.DidChangeTextDocumentParams,
                types.DocumentDiagnosticParams,
            ],
        ) -> Optional[List[types.Diagnostic]]:
            # Resolve the document from the request and run the linter through
            # the server-provided lint state for that document.
            doc = ls.workspace.get_text_document(params.text_document.uri)
            lint_state = ls.lint_state(doc)
            return await lint_state.run_diagnostics(func)

        _register_handler(file_format, DIAGNOSTIC_HANDLERS, _lint_wrapper)

        # The wrapper is what gets registered; the caller keeps the original.
        return func

    return _wrapper
def lsp_completer(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering a function in `COMPLETER_HANDLERS` for *file_format*."""
    return _registering_wrapper(
        file_formats=file_format,
        handler_dict=COMPLETER_HANDLERS,
    )
def lsp_code_actions(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering a function in `CODE_ACTION_HANDLERS` for *file_format*."""
    return _registering_wrapper(
        file_formats=file_format,
        handler_dict=CODE_ACTION_HANDLERS,
    )
def lsp_hover(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering a function in `HOVER_HANDLERS` for *file_format*."""
    return _registering_wrapper(
        file_formats=file_format,
        handler_dict=HOVER_HANDLERS,
    )
def lsp_text_doc_inlay_hints(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering a function in `TEXT_DOC_INLAY_HANDLERS` for *file_format*."""
    return _registering_wrapper(
        file_formats=file_format,
        handler_dict=TEXT_DOC_INLAY_HANDLERS,
    )
def lsp_folding_ranges(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering a function in `FOLDING_RANGE_HANDLERS` for *file_format*."""
    return _registering_wrapper(
        file_formats=file_format,
        handler_dict=FOLDING_RANGE_HANDLERS,
    )
def lsp_will_save_wait_until(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering a function in `WILL_SAVE_WAIT_UNTIL_HANDLERS` for *file_format*."""
    return _registering_wrapper(
        file_formats=file_format,
        handler_dict=WILL_SAVE_WAIT_UNTIL_HANDLERS,
    )
def lsp_format_document(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering a function in `FORMAT_FILE_HANDLERS` for *file_format*."""
    return _registering_wrapper(
        file_formats=file_format,
        handler_dict=FORMAT_FILE_HANDLERS,
    )
def lsp_semantic_tokens_full(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering a function in `SEMANTIC_TOKENS_FULL_HANDLERS` for *file_format*."""
    return _registering_wrapper(
        file_formats=file_format,
        handler_dict=SEMANTIC_TOKENS_FULL_HANDLERS,
    )
def lsp_standard_handler(
    file_format: LanguageDispatchRule,
    topic: str,
) -> None:
    """Register the stock handler for *topic* for the given file format.

    :param file_format: The rule describing which language IDs and filenames
      the handler is registered for.
    :param topic: The LSP topic; must be a key of `_STANDARD_HANDLERS`.
    :raises ValueError: If there is no standard handler for *topic*.
    """
    if topic not in _STANDARD_HANDLERS:
        raise ValueError(f"No standard handler for {topic}")

    dispatch_table, standard_handler = _STANDARD_HANDLERS[topic]
    _register_handler(file_format, dispatch_table, standard_handler)
def _registering_wrapper(
    file_formats: LanguageDispatchRule,
    handler_dict: HandlerDispatchTable[C],
) -> Callable[[C], C]:
    """Build a decorator that registers its target in *handler_dict*.

    :param file_formats: The rule describing which language IDs and filenames
      the decorated function is registered for.
    :param handler_dict: The dispatch table to register the function in.
    :returns: A decorator that registers the function and returns it unchanged.
    """

    def _register_and_return(func: C) -> C:
        _register_handler(file_formats, handler_dict, func)
        return func

    return _register_and_return
def _register_handler(
    file_format: LanguageDispatchRule,
    handler_dict: HandlerDispatchTable[C],
    handler: C,
) -> None:
    """Register *handler* in *handler_dict* for everything *file_format* names.

    The handler becomes the default handler of the primary language ID, is
    added per filename to the table stored under the empty language ID, and is
    registered for each secondary language (per filename or as default handler,
    depending on the secondary language's `filename_based_lookup` flag).

    Checks and registrations are interleaved, so entries written before a
    failing check stay in place.

    :raises AssertionError: If any of the slots is already occupied.
    """
    # Subscripting creates missing tables; keep the primary lookup first.
    language_table = handler_dict[file_format.primary_language_id]
    global_filename_table = handler_dict[""]

    if language_table.default_handler is not None:
        raise AssertionError(
            f"There is already a handler for language ID {file_format.primary_language_id}"
        )
    language_table.default_handler = handler

    global_lookups = global_filename_table.filename_based_lookups
    for filename in file_format.filenames:
        if global_lookups.get(filename) is not None:
            raise AssertionError(f"There is already a handler for filename {filename}")
        global_lookups[filename] = handler

    for secondary_language in file_format.secondary_language_ids:
        secondary_table = handler_dict[secondary_language.language_id]

        if not secondary_language.filename_based_lookup:
            # Plain secondary language: the handler becomes its default handler.
            if secondary_table.default_handler is not None:
                raise AssertionError(
                    f"There is already a primary handler for language ID {secondary_language.language_id}"
                )
            secondary_table.default_handler = handler
            continue

        # Filename-based secondary language: register per filename instead.
        secondary_lookups = secondary_table.filename_based_lookups
        for filename in file_format.filenames:
            if secondary_lookups.get(filename) is not None:
                raise AssertionError(
                    f"There is already a handler for filename {filename} under language ID {secondary_language.language_id}"
                )
            secondary_lookups[filename] = handler
def ensure_lsp_features_are_loaded() -> None:
    """Force the modules providing the LSP features to be imported."""
    # FIXME: Importing the linter formats is what pulls in the LSP files. That
    # only covers files that have a linter. Currently that is all of them, but
    # relying on it is a bit fragile.
    from debputy.linting.lint_impl import LINTER_FORMATS

    assert LINTER_FORMATS
def describe_lsp_features(context: CommandContext) -> None:
    """Print the known LSP language IDs, their features and the self-checks.

    The output goes to standard output and has three parts: the features per
    language ID (alias IDs are skipped), the aliases grouped by main language
    ID, and the result of every check in `LSP_CHECKS`.

    :param context: The command context; its parsed arguments are used to set
      up the output styling.
    """
    fo = _output_styling(context.parsed_args, sys.stdout)
    ensure_lsp_features_are_loaded()

    feature_list = [
        ("diagnostics (lint)", DIAGNOSTIC_HANDLERS),
        ("code actions/quickfixes", CODE_ACTION_HANDLERS),
        ("completion suggestions", COMPLETER_HANDLERS),
        ("hover docs", HOVER_HANDLERS),
        ("folding ranges", FOLDING_RANGE_HANDLERS),
        ("semantic tokens", SEMANTIC_TOKENS_FULL_HANDLERS),
        ("on-save handler", WILL_SAVE_WAIT_UNTIL_HANDLERS),
        ("inlay hint (doc)", TEXT_DOC_INLAY_HANDLERS),
        ("format file handler", FORMAT_FILE_HANDLERS),
    ]
    print("LSP language IDs and their features:")
    # NOTE(review): `_register_handler` always creates a table under the empty
    # language ID for filename-based dispatch, so "" is listed here as well -
    # confirm that is intended.
    all_ids = sorted({lid for _, t in feature_list for lid in t})
    for lang_id in all_ids:
        if lang_id in _ALIAS_OF:
            continue
        features = [n for n, t in feature_list if lang_id in t]
        print(f" * {lang_id}:")
        for feature in features:
            print(f" - {feature}")

    # Group alias IDs under their main language ID. Named distinctly from the
    # loop variable below so the dict is not rebound while being iterated.
    aliases_by_main_id = collections.defaultdict(list)
    for lang_id in all_ids:
        main_lang = _ALIAS_OF.get(lang_id)
        if main_lang is None:
            continue
        aliases_by_main_id[main_lang].append(lang_id)

    print()
    print("Aliases:")
    for main_id, alias_ids in aliases_by_main_id.items():
        print(f" * {main_id}: {', '.join(alias_ids)}")

    print()
    print("General features:")
    for self_check in LSP_CHECKS:
        if self_check.test():
            print(f" * {self_check.feature}: {fo.colored('enabled', fg='green')}")
            continue

        # Failed check: mandatory ones are "missing", optional ones "disabled".
        if self_check.is_mandatory:
            label, colour = "missing", "red"
        else:
            label, colour = "disabled", "yellow"
        disabled = fo.colored(
            label,
            fg=colour,
            bg="black",
            style="bold",
        )

        if self_check.how_to_fix:
            print(f" * {self_check.feature}: {disabled}")
            print(f" - {self_check.how_to_fix}")
        else:
            problem_suffix = f" ({self_check.problem})"
            print(f" * {self_check.feature}: {disabled}{problem_suffix}")