Coverage for src/debputy/lsp/lsp_features.py: 70%
187 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
1import collections
2import dataclasses
3import inspect
4import sys
5from typing import (
6 TypeVar,
7 Union,
8 Dict,
9 List,
10 Optional,
11 Self,
12 Generic,
13 Protocol,
14 TYPE_CHECKING,
15 Literal,
16)
17from collections.abc import Callable, Sequence, AsyncIterator
19from debputy.commands.debputy_cmd.context import CommandContext
20from debputy.commands.debputy_cmd.output import _output_styling
21from debputy.lsp.lsp_self_check import LSP_CHECKS
23try:
24 from pygls.server import LanguageServer
25 from debputy.lsp.debputy_ls import DebputyLanguageServer
26except ImportError:
27 pass
29from debputy.linting.lint_util import AsyncLinterImpl, LintState
30from debputy.lsp.quickfixes import provide_standard_quickfixes_from_diagnostics_ls
31from debputy.lsp.text_util import on_save_trim_end_of_line_whitespace
33if TYPE_CHECKING:
34 import lsprotocol.types as types
36 Reformatter = Callable[[LintState], Optional[Sequence[types.TextEdit]]]
37else:
38 import debputy.lsprotocol.types as types
# Type variable for the handler callables stored in the dispatch tables of this module.
C = TypeVar("C", bound=Callable)
# Semantic tokens legend: the token types declared by this module, in a fixed
# order, and no token modifiers.
SEMANTIC_TOKENS_LEGEND = types.SemanticTokensLegend(
    token_types=[
        token_type.value
        for token_type in (
            types.SemanticTokenTypes.Keyword,
            types.SemanticTokenTypes.EnumMember,
            types.SemanticTokenTypes.Comment,
            types.SemanticTokenTypes.String,
            types.SemanticTokenTypes.Macro,
            types.SemanticTokenTypes.Operator,
            types.SemanticTokenTypes.TypeParameter,
            types.SemanticTokenTypes.Variable,
        )
    ],
    token_modifiers=[],
)
# Maps each token type of the legend to its index in ``token_types``.
SEMANTIC_TOKEN_TYPES_IDS = {
    token_type: index
    for index, token_type in enumerate(SEMANTIC_TOKENS_LEGEND.token_types)
}
# Callable alias: takes the language server plus the did-open / did-change
# parameters and returns an async iterator over diagnostic lists (or None).
DiagnosticHandler = Callable[
    [
        "DebputyLanguageServer",
        Union["types.DidOpenTextDocumentParams", "types.DidChangeTextDocumentParams"],
    ],
    AsyncIterator[list[types.Diagnostic] | None],
]
@dataclasses.dataclass(slots=True)
class LanguageDispatchTable(Generic[C]):
    """Handlers registered under a single language ID.

    A table holds an optional default handler plus handlers keyed by basename
    and by path name.
    """

    language_id: str
    # Handlers keyed by a file basename.
    basename_based_lookups: dict[str, C] = dataclasses.field(default_factory=dict)
    # Handlers keyed by a path name.
    path_name_based_lookups: dict[str, C] = dataclasses.field(default_factory=dict)
    # Handler registered for the language ID itself; None until one is registered.
    default_handler: Optional[C] = None
class HandlerDispatchTable(Generic[C], dict[str, LanguageDispatchTable[C]]):
    """Mapping from language ID to its `LanguageDispatchTable`.

    Looking up an unknown language ID with ``table[key]`` creates, stores and
    returns an empty dispatch table for that ID.
    """

    def __missing__(self, key: str) -> LanguageDispatchTable[C]:
        # Called by ``dict.__getitem__`` when ``key`` is absent.
        new_table = self[key] = LanguageDispatchTable(key)
        return new_table
class DiagnosticHandlerProtocol(Protocol):
    """Call signature of the handlers stored in ``LSP_DIAGNOSTIC_HANDLERS``."""

    async def __call__(
        self,
        ls: "DebputyLanguageServer",
        params: Union[
            types.DidOpenTextDocumentParams,
            types.DidChangeTextDocumentParams,
            types.DocumentDiagnosticParams,
        ],
    ) -> Optional[list[types.Diagnostic]]:
        """Return the diagnostics for the document named in ``params``, or None."""
        ...
# Command-line handlers keyed by path name. The decorators in this module only
# store path names that start with "debian/".
CLI_DIAGNOSTIC_HANDLERS: dict[str, AsyncLinterImpl] = {}
CLI_FORMAT_FILE_HANDLERS: dict[str, "Reformatter"] = {}

# Language server handlers: one dispatch table per feature, listed in the same
# order as `describe_lsp_features` reports them.
LSP_DIAGNOSTIC_HANDLERS: HandlerDispatchTable[DiagnosticHandlerProtocol] = (
    HandlerDispatchTable[DiagnosticHandlerProtocol]()
)
CODE_ACTION_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
COMPLETER_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
HOVER_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
FOLDING_RANGE_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
SEMANTIC_TOKENS_FULL_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
WILL_SAVE_WAIT_UNTIL_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
TEXT_DOC_INLAY_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
LSP_FORMAT_FILE_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
DOCUMENT_LINK_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()

# Alias language ID -> main language ID. `describe_lsp_features` skips aliases
# in the main listing and reports them in a separate section.
# NOTE(review): values are assumed to be language ID strings - confirm.
_ALIAS_OF: dict[str, str] = {}
114@dataclasses.dataclass(slots=True, frozen=True)
115class BasenameMatchingRule:
116 rule_type: Literal["basename", "extension"]
117 value: str
120@dataclasses.dataclass(slots=True, frozen=True)
121class SecondaryLanguage:
122 language_id: str
123 secondary_lookup: Literal["path-name", "basename"] | None = None
126@dataclasses.dataclass(slots=True, frozen=True)
127class LanguageDispatchRule:
128 primary_language_id: str
129 basename: str | None
130 path_names: Sequence[str]
131 secondary_language_ids: Sequence[SecondaryLanguage]
132 is_debsrc_packaging_file: bool
134 @classmethod
135 def new_rule(
136 cls,
137 primary_language_id: str,
138 basename: str | None,
139 path_names: str | Sequence[str],
140 secondary_language_ids: Sequence[SecondaryLanguage | str] = (),
141 ) -> Self:
142 path_names_as_seq: Sequence[str] = (
143 (path_names,) if isinstance(path_names, str) else tuple(path_names)
144 )
145 is_debsrc_packaging_file = any(
146 pn.startswith("debian/") for pn in path_names_as_seq
147 )
148 return LanguageDispatchRule(
149 primary_language_id,
150 basename,
151 path_names_as_seq,
152 tuple(
153 SecondaryLanguage(l) if isinstance(l, str) else l
154 for l in secondary_language_ids
155 ),
156 is_debsrc_packaging_file,
157 )
# Ready-made handlers available through `lsp_standard_handler`. Each topic maps
# to a pair: the dispatch table to register in and the handler to register.
_STANDARD_HANDLERS = {
    # Formatting requests trim end-of-line whitespace.
    types.TEXT_DOCUMENT_FORMATTING: (
        LSP_FORMAT_FILE_HANDLERS,
        on_save_trim_end_of_line_whitespace,
    ),
    # Code actions are the standard quickfixes derived from diagnostics.
    types.TEXT_DOCUMENT_CODE_ACTION: (
        CODE_ACTION_HANDLERS,
        provide_standard_quickfixes_from_diagnostics_ls,
    ),
    # The will-save-wait-until topic uses the same whitespace trimming as formatting.
    types.TEXT_DOCUMENT_WILL_SAVE_WAIT_UNTIL: (
        WILL_SAVE_WAIT_UNTIL_HANDLERS,
        on_save_trim_end_of_line_whitespace,
    ),
}
def lint_diagnostics(
    file_format: LanguageDispatchRule,
) -> Callable[[AsyncLinterImpl], AsyncLinterImpl]:
    """Return a decorator that registers an async linter for ``file_format``.

    The decorator wraps the linter in a language server handler and registers
    that handler in ``LSP_DIAGNOSTIC_HANDLERS``. When the rule is flagged as a
    Debian source packaging file, the unwrapped linter is also stored in
    ``CLI_DIAGNOSTIC_HANDLERS`` under every path name starting with ``debian/``.
    The linter itself is returned unchanged.

    :raises ValueError: (by the decorator) if the linter is not a coroutine function.
    """

    def _wrapper(func: AsyncLinterImpl) -> AsyncLinterImpl:
        if not inspect.iscoroutinefunction(func):
            raise ValueError("Linters must be async")

        async def _lint_wrapper(
            ls: "DebputyLanguageServer",
            params: (
                types.DidOpenTextDocumentParams
                | types.DidChangeTextDocumentParams
                | types.DocumentDiagnosticParams
            ),
        ) -> list[types.Diagnostic] | None:
            # Resolve the document named in the request and run the linter
            # through the lint state the server provides for it.
            doc = ls.workspace.get_text_document(params.text_document.uri)
            lint_state = ls.lint_state(doc)
            return await lint_state.run_diagnostics(func)

        _register_handler(file_format, LSP_DIAGNOSTIC_HANDLERS, _lint_wrapper)
        if file_format.is_debsrc_packaging_file:
            for path_name in file_format.path_names:
                if path_name.startswith("debian/"):
                    CLI_DIAGNOSTIC_HANDLERS[path_name] = func

        return func

    return _wrapper
def lsp_completer(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator that registers a completion handler for ``file_format``.

    The decorated function is stored in ``COMPLETER_HANDLERS`` and returned unchanged.
    """
    return _registering_wrapper(
        file_formats=file_format,
        handler_dict=COMPLETER_HANDLERS,
    )
def lsp_code_actions(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator that registers a code action handler for ``file_format``.

    The decorated function is stored in ``CODE_ACTION_HANDLERS`` and returned unchanged.
    """
    return _registering_wrapper(
        file_formats=file_format,
        handler_dict=CODE_ACTION_HANDLERS,
    )
def lsp_hover(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator that registers a hover handler for ``file_format``.

    The decorated function is stored in ``HOVER_HANDLERS`` and returned unchanged.
    """
    return _registering_wrapper(
        file_formats=file_format,
        handler_dict=HOVER_HANDLERS,
    )
def lsp_text_doc_inlay_hints(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator that registers an inlay hint handler for ``file_format``.

    The decorated function is stored in ``TEXT_DOC_INLAY_HANDLERS`` and returned unchanged.
    """
    return _registering_wrapper(
        file_formats=file_format,
        handler_dict=TEXT_DOC_INLAY_HANDLERS,
    )
def lsp_folding_ranges(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator that registers a folding range handler for ``file_format``.

    The decorated function is stored in ``FOLDING_RANGE_HANDLERS`` and returned unchanged.
    """
    return _registering_wrapper(
        file_formats=file_format,
        handler_dict=FOLDING_RANGE_HANDLERS,
    )
def lsp_will_save_wait_until(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator that registers an on-save handler for ``file_format``.

    The decorated function is stored in ``WILL_SAVE_WAIT_UNTIL_HANDLERS`` and
    returned unchanged.
    """
    return _registering_wrapper(
        file_formats=file_format,
        handler_dict=WILL_SAVE_WAIT_UNTIL_HANDLERS,
    )
def lsp_format_document(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator that registers a format-file handler for ``file_format``.

    The decorated function is stored in ``LSP_FORMAT_FILE_HANDLERS`` and returned unchanged.
    """
    return _registering_wrapper(
        file_formats=file_format,
        handler_dict=LSP_FORMAT_FILE_HANDLERS,
    )
def lsp_cli_reformat_document(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator that records a reformatter in ``CLI_FORMAT_FILE_HANDLERS``.

    The decorated function is stored under every path name of ``file_format``
    that starts with ``debian/``; other path names are skipped. The function is
    returned unchanged.
    """

    def _wrapper(func: C) -> C:
        CLI_FORMAT_FILE_HANDLERS.update(
            (path_name, func)
            for path_name in file_format.path_names
            if path_name.startswith("debian/")
        )
        return func

    return _wrapper
def lsp_semantic_tokens_full(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator that registers a semantic tokens handler for ``file_format``.

    The decorated function is stored in ``SEMANTIC_TOKENS_FULL_HANDLERS`` and
    returned unchanged.
    """
    return _registering_wrapper(
        file_formats=file_format,
        handler_dict=SEMANTIC_TOKENS_FULL_HANDLERS,
    )
def lsp_document_link(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator that registers a document link handler for ``file_format``.

    The decorated function is stored in ``DOCUMENT_LINK_HANDLERS`` and returned unchanged.
    """
    return _registering_wrapper(
        file_formats=file_format,
        handler_dict=DOCUMENT_LINK_HANDLERS,
    )
def lsp_standard_handler(
    file_format: LanguageDispatchRule,
    topic: str,
) -> None:
    """Register the ready-made handler for ``topic`` under ``file_format``.

    :param file_format: Rule describing where the handler is registered.
    :param topic: A key of ``_STANDARD_HANDLERS``.
    :raises ValueError: if there is no standard handler for ``topic``.
    """
    if topic not in _STANDARD_HANDLERS:
        raise ValueError(f"No standard handler for {topic}")

    # Each entry is a (dispatch table, handler) pair.
    _register_handler(file_format, *_STANDARD_HANDLERS[topic])
def _registering_wrapper(
    file_formats: LanguageDispatchRule,
    handler_dict: HandlerDispatchTable[C],
) -> Callable[[C], C]:
    """Build a decorator that registers its target in ``handler_dict``.

    The decorator passes the decorated function to `_register_handler` together
    with ``file_formats`` and ``handler_dict``, then returns the function unchanged.
    """

    def _register_and_return(func: C) -> C:
        _register_handler(file_formats, handler_dict, func)
        return func

    return _register_and_return
277def _register_handler(
278 file_format: LanguageDispatchRule,
279 handler_dict: HandlerDispatchTable[C],
280 handler: C,
281) -> None:
282 primary_table = handler_dict[file_format.primary_language_id]
283 filename_based_dispatch = handler_dict[""]
285 if primary_table.default_handler is not None:
286 raise AssertionError(
287 f"There is already a handler for language ID {file_format.primary_language_id}"
288 )
290 primary_table.default_handler = handler
291 for filename in file_format.path_names:
292 filename_based_handler = filename_based_dispatch.path_name_based_lookups.get(
293 filename
294 )
295 if filename_based_handler is not None:
296 raise AssertionError(f"There is already a handler for filename {filename}")
297 filename_based_dispatch.path_name_based_lookups[filename] = handler
299 for secondary_language in file_format.secondary_language_ids:
300 secondary_table = handler_dict[secondary_language.language_id]
301 if secondary_language.secondary_lookup == "path-name":
302 if not file_format.path_names:
303 raise AssertionError(
304 f"secondary_lookup=path-name requires the language to have path-names. Please correct definition of {file_format.primary_language_id}"
305 )
306 for filename in file_format.path_names:
307 secondary_handler = secondary_table.path_name_based_lookups.get(
308 filename
309 )
310 if secondary_handler is not None:
311 raise AssertionError(
312 f"There is already a handler for filename {filename} under language ID {secondary_language.language_id}"
313 )
314 secondary_table.path_name_based_lookups[filename] = handler
315 elif secondary_language.secondary_lookup == "basename":
316 basename = file_format.basename
317 if not basename:
318 raise AssertionError(
319 f"secondary_lookup=basename requires the language to have a basename. Please correct definition of {file_format.primary_language_id}"
320 )
321 secondary_handler = secondary_table.basename_based_lookups.get(basename)
322 if secondary_handler is not None:
323 raise AssertionError(
324 f"There is already a handler for basename {basename} under language ID {secondary_language.language_id}"
325 )
326 secondary_table.basename_based_lookups[basename] = handler
327 elif secondary_table.default_handler is not None:
328 raise AssertionError(
329 f"There is already a primary handler for language ID {secondary_language.language_id}"
330 )
331 else:
332 secondary_table.default_handler = handler
def ensure_cli_lsp_features_are_loaded() -> None:
    """Import the modules whose import-time side effects perform the registration."""
    # Importing these modules forces the LSP files to be loaded; the relevant
    # registration happens as a side effect of the imports.
    import debputy.lsp.languages as lsp_languages
    from debputy.linting.lint_impl import LINTER_FORMATS

    # Reference both names so that no static analysis tool is tempted to
    # optimize the imports away. They are needed for their side effects.
    assert lsp_languages and LINTER_FORMATS
def describe_lsp_features(context: CommandContext) -> None:
    """Print a report of the registered LSP features and the general self-checks.

    The report has three sections: the features available per language ID,
    the aliases grouped by their main language ID, and the result of every
    check in ``LSP_CHECKS``.

    :param context: Command context; its ``parsed_args`` select the output styling.
    """
    fo = _output_styling(context.parsed_args, sys.stdout)
    ensure_cli_lsp_features_are_loaded()

    feature_list = [
        ("diagnostics (lint)", LSP_DIAGNOSTIC_HANDLERS),
        ("code actions/quickfixes", CODE_ACTION_HANDLERS),
        ("completion suggestions", COMPLETER_HANDLERS),
        ("hover docs", HOVER_HANDLERS),
        ("folding ranges", FOLDING_RANGE_HANDLERS),
        ("semantic tokens", SEMANTIC_TOKENS_FULL_HANDLERS),
        ("on-save handler", WILL_SAVE_WAIT_UNTIL_HANDLERS),
        ("inlay hint (doc)", TEXT_DOC_INLAY_HANDLERS),
        ("format file handler", LSP_FORMAT_FILE_HANDLERS),
        ("document link handler", DOCUMENT_LINK_HANDLERS),
    ]
    print("LSP language IDs and their features:")
    # NOTE(review): the tables also contain the "" key that `_register_handler`
    # uses for path-name based dispatch, so it is listed as a language ID too.
    # Confirm that this is intended.
    all_ids = sorted({lid for _, t in feature_list for lid in t})
    for lang_id in all_ids:
        if lang_id in _ALIAS_OF:
            # Aliases are reported in their own section below.
            continue
        print(f" * {lang_id}:")
        for feature_name, table in feature_list:
            if lang_id in table:
                print(f" - {feature_name}")

    aliases_by_main_id = collections.defaultdict(list)
    for lang_id in all_ids:
        main_lang = _ALIAS_OF.get(lang_id)
        if main_lang is not None:
            aliases_by_main_id[main_lang].append(lang_id)

    print()
    print("Aliases:")
    # The loop variable has its own name so it does not rebind the mapping
    # being iterated.
    for main_id, alias_ids in aliases_by_main_id.items():
        print(f" * {main_id}: {', '.join(alias_ids)}")

    print()
    print("General features:")
    for self_check in LSP_CHECKS:
        if self_check.test():
            print(f" * {self_check.feature}: {fo.colored('enabled', fg='green')}")
            continue

        # A failed mandatory check is reported as "missing", an optional one
        # as "disabled".
        if self_check.is_mandatory:
            disabled = fo.colored(
                "missing",
                fg="red",
                bg="black",
                style="bold",
            )
        else:
            disabled = fo.colored(
                "disabled",
                fg="yellow",
                bg="black",
                style="bold",
            )

        if self_check.how_to_fix:
            print(f" * {self_check.feature}: {disabled}")
            print(f" - {self_check.how_to_fix}")
        else:
            print(f" * {self_check.feature}: {disabled} ({self_check.problem})")