Coverage for src/debputy/lsp/lsp_features.py: 69%
183 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-09-07 09:27 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-09-07 09:27 +0000
1import collections
2import dataclasses
3import inspect
4import sys
5from typing import (
6 Callable,
7 TypeVar,
8 Sequence,
9 Union,
10 Dict,
11 List,
12 Optional,
13 AsyncIterator,
14 Self,
15 Generic,
16 Protocol,
17 TYPE_CHECKING,
18 Literal,
19)
21from debputy.commands.debputy_cmd.context import CommandContext
22from debputy.commands.debputy_cmd.output import _output_styling
23from debputy.lsp.lsp_self_check import LSP_CHECKS
25try:
26 from pygls.server import LanguageServer
27 from debputy.lsp.debputy_ls import DebputyLanguageServer
28except ImportError:
29 pass
31from debputy.linting.lint_util import AsyncLinterImpl, LintState
32from debputy.lsp.quickfixes import provide_standard_quickfixes_from_diagnostics_ls
33from debputy.lsp.text_util import on_save_trim_end_of_line_whitespace
35if TYPE_CHECKING:
36 import lsprotocol.types as types
38 Reformatter = Callable[[LintState], Optional[Sequence[types.TextEdit]]]
39else:
40 import debputy.lsprotocol.types as types
# Type variable for the handler callables stored in the dispatch tables below.
C = TypeVar("C", bound=Callable)

# Legend of the semantic token types used by this module. No token modifiers
# are declared.
SEMANTIC_TOKENS_LEGEND = types.SemanticTokensLegend(
    token_types=[
        types.SemanticTokenTypes.Keyword.value,
        types.SemanticTokenTypes.EnumMember.value,
        types.SemanticTokenTypes.Comment.value,
        types.SemanticTokenTypes.String.value,
        types.SemanticTokenTypes.Macro.value,
        types.SemanticTokenTypes.Operator.value,
        types.SemanticTokenTypes.TypeParameter.value,
        types.SemanticTokenTypes.Variable.value,
    ],
    token_modifiers=[],
)
# Maps each token type in the legend to its index within `token_types`.
SEMANTIC_TOKEN_TYPES_IDS = {
    t: idx for idx, t in enumerate(SEMANTIC_TOKENS_LEGEND.token_types)
}

# Callable alias: takes the language server plus did-open/did-change params and
# produces an async iterator of optional diagnostic lists.
DiagnosticHandler = Callable[
    [
        "DebputyLanguageServer",
        Union["types.DidOpenTextDocumentParams", "types.DidChangeTextDocumentParams"],
    ],
    AsyncIterator[Optional[List[types.Diagnostic]]],
]
@dataclasses.dataclass(slots=True)
class LanguageDispatchTable(Generic[C]):
    """Handlers registered for one language ID.

    Holds an optional default handler plus two lookup dicts (keyed by basename
    and by path name) that `_register_handler` fills in.
    """

    # The language ID this table belongs to.
    language_id: str
    # Handlers keyed by file basename.
    basename_based_lookups: Dict[str, C] = dataclasses.field(default_factory=dict)
    # Handlers keyed by path name.
    path_name_based_lookups: Dict[str, C] = dataclasses.field(default_factory=dict)
    # Handler registered for the language ID itself; None until one is registered.
    default_handler: Optional[C] = None
class HandlerDispatchTable(Generic[C], Dict[str, LanguageDispatchTable[C]]):
    """Dict of language ID -> `LanguageDispatchTable` that auto-creates entries.

    Subscripting with an unknown language ID stores and returns a fresh, empty
    `LanguageDispatchTable` for it. Membership tests (`in`) and `.get()` do not
    go through `__missing__` and therefore never create entries.
    """

    def __missing__(self, key: str) -> LanguageDispatchTable[C]:
        # Only called when `key` is absent, so this always inserts the new table.
        return self.setdefault(key, LanguageDispatchTable(key))
class DiagnosticHandlerProtocol(Protocol):
    """Structural type of the diagnostic handlers stored in `LSP_DIAGNOSTIC_HANDLERS`.

    A handler is an async callable taking the language server and the request
    or notification params; it returns a list of diagnostics or None.
    """

    async def __call__(
        self,
        ls: "DebputyLanguageServer",
        params: Union[
            types.DidOpenTextDocumentParams,
            types.DidChangeTextDocumentParams,
            types.DocumentDiagnosticParams,
        ],
    ) -> Optional[List[types.Diagnostic]]:
        ...
# CLI-side registries keyed by path name. The decorators in this module only
# add path names that start with "debian/".
CLI_DIAGNOSTIC_HANDLERS: Dict[str, AsyncLinterImpl] = {}
# "Reformatter" is quoted because the alias is only defined under TYPE_CHECKING.
CLI_FORMAT_FILE_HANDLERS: Dict[str, "Reformatter"] = {}


# LSP-side registries, one dispatch table per feature. They are populated via
# `_register_handler` by the decorators in this module.
LSP_DIAGNOSTIC_HANDLERS: HandlerDispatchTable[DiagnosticHandlerProtocol] = (
    HandlerDispatchTable[DiagnosticHandlerProtocol]()
)
COMPLETER_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
HOVER_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
CODE_ACTION_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
FOLDING_RANGE_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
SEMANTIC_TOKENS_FULL_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
WILL_SAVE_WAIT_UNTIL_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
LSP_FORMAT_FILE_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
TEXT_DOC_INLAY_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
# Consulted by `describe_lsp_features`, which looks up a language ID here to
# find its main language ID. Nothing visible in this file populates it.
# NOTE(review): presumably maps alias language ID -> main language ID; confirm.
_ALIAS_OF = {}
115@dataclasses.dataclass(slots=True, frozen=True)
116class BasenameMatchingRule:
117 rule_type: Literal["basename", "extension"]
118 value: str
121@dataclasses.dataclass(slots=True, frozen=True)
122class SecondaryLanguage:
123 language_id: str
124 secondary_lookup: Optional[Literal["path-name", "basename"]] = None
127@dataclasses.dataclass(slots=True, frozen=True)
128class LanguageDispatchRule:
129 primary_language_id: str
130 basename: Optional[str]
131 path_names: Sequence[str]
132 secondary_language_ids: Sequence[SecondaryLanguage]
133 is_debsrc_packaging_file: bool
135 @classmethod
136 def new_rule(
137 cls,
138 primary_language_id: str,
139 basename: Optional[str],
140 path_names: Union[str, Sequence[str]],
141 secondary_language_ids: Sequence[Union[SecondaryLanguage, str]] = (),
142 ) -> Self:
143 path_names_as_seq: Sequence[str] = (
144 (path_names,) if isinstance(path_names, str) else tuple(path_names)
145 )
146 is_debsrc_packaging_file = any(
147 pn.startswith("debian/") for pn in path_names_as_seq
148 )
149 return LanguageDispatchRule(
150 primary_language_id,
151 basename,
152 path_names_as_seq,
153 tuple(
154 SecondaryLanguage(l) if isinstance(l, str) else l
155 for l in secondary_language_ids
156 ),
157 is_debsrc_packaging_file,
158 )
# Maps an LSP method name to a (dispatch table, shared handler) pair.
# `lsp_standard_handler` looks a topic up here and registers the shared handler
# in the paired table for the requested file format.
_STANDARD_HANDLERS = {
    types.TEXT_DOCUMENT_FORMATTING: (
        LSP_FORMAT_FILE_HANDLERS,
        on_save_trim_end_of_line_whitespace,
    ),
    types.TEXT_DOCUMENT_CODE_ACTION: (
        CODE_ACTION_HANDLERS,
        provide_standard_quickfixes_from_diagnostics_ls,
    ),
    types.TEXT_DOCUMENT_WILL_SAVE_WAIT_UNTIL: (
        WILL_SAVE_WAIT_UNTIL_HANDLERS,
        on_save_trim_end_of_line_whitespace,
    ),
}
def lint_diagnostics(
    file_format: LanguageDispatchRule,
) -> Callable[[AsyncLinterImpl], AsyncLinterImpl]:
    """Return a decorator that registers an async linter for *file_format*.

    The decorator:

    * registers an LSP diagnostics handler in `LSP_DIAGNOSTIC_HANDLERS` that
      resolves the text document from the request params, obtains its lint
      state from the language server and runs the linter through it;
    * for rules flagged as debsrc packaging files, also registers the linter
      itself in `CLI_DIAGNOSTIC_HANDLERS` under every path name starting with
      "debian/".

    The decorated function is returned unchanged.

    :raises ValueError: If the decorated function is not a coroutine function.
    """

    def _wrapper(func: AsyncLinterImpl) -> AsyncLinterImpl:
        if not inspect.iscoroutinefunction(func):
            raise ValueError("Linters must be async")

        async def _lint_wrapper(
            ls: "DebputyLanguageServer",
            params: Union[
                types.DidOpenTextDocumentParams,
                types.DidChangeTextDocumentParams,
                types.DocumentDiagnosticParams,
            ],
        ) -> Optional[List[types.Diagnostic]]:
            doc = ls.workspace.get_text_document(params.text_document.uri)
            lint_state = ls.lint_state(doc)
            return await lint_state.run_diagnostics(func)

        _register_handler(file_format, LSP_DIAGNOSTIC_HANDLERS, _lint_wrapper)
        if file_format.is_debsrc_packaging_file:
            for path_name in file_format.path_names:
                if path_name.startswith("debian/"):
                    # The CLI registry stores the raw linter, not the LSP wrapper.
                    CLI_DIAGNOSTIC_HANDLERS[path_name] = func

        return func

    return _wrapper
def lsp_completer(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering the function in `COMPLETER_HANDLERS` for *file_format*."""
    register = _registering_wrapper(file_format, COMPLETER_HANDLERS)
    return register
def lsp_code_actions(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering the function in `CODE_ACTION_HANDLERS` for *file_format*."""
    register = _registering_wrapper(file_format, CODE_ACTION_HANDLERS)
    return register
def lsp_hover(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering the function in `HOVER_HANDLERS` for *file_format*."""
    register = _registering_wrapper(file_format, HOVER_HANDLERS)
    return register
def lsp_text_doc_inlay_hints(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering the function in `TEXT_DOC_INLAY_HANDLERS` for *file_format*."""
    register = _registering_wrapper(file_format, TEXT_DOC_INLAY_HANDLERS)
    return register
def lsp_folding_ranges(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering the function in `FOLDING_RANGE_HANDLERS` for *file_format*."""
    register = _registering_wrapper(file_format, FOLDING_RANGE_HANDLERS)
    return register
def lsp_will_save_wait_until(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering the function in `WILL_SAVE_WAIT_UNTIL_HANDLERS` for *file_format*."""
    register = _registering_wrapper(file_format, WILL_SAVE_WAIT_UNTIL_HANDLERS)
    return register
def lsp_format_document(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering the function in `LSP_FORMAT_FILE_HANDLERS` for *file_format*."""
    register = _registering_wrapper(file_format, LSP_FORMAT_FILE_HANDLERS)
    return register
def lsp_cli_reformat_document(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering a CLI reformatter for *file_format*.

    The decorated function is stored in `CLI_FORMAT_FILE_HANDLERS` under every
    path name of the rule that starts with "debian/" and is returned unchanged.
    """

    def _decorate(reformatter: C) -> C:
        CLI_FORMAT_FILE_HANDLERS.update(
            (candidate, reformatter)
            for candidate in file_format.path_names
            if candidate.startswith("debian/")
        )
        return reformatter

    return _decorate
def lsp_semantic_tokens_full(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering the function in `SEMANTIC_TOKENS_FULL_HANDLERS` for *file_format*."""
    register = _registering_wrapper(file_format, SEMANTIC_TOKENS_FULL_HANDLERS)
    return register
def lsp_standard_handler(
    file_format: LanguageDispatchRule,
    topic: str,
) -> None:
    """Register the shared standard handler for *topic* under *file_format*.

    :param file_format: Dispatch rule to register the handler for.
    :param topic: A key of `_STANDARD_HANDLERS`.
    :raises ValueError: If no standard handler exists for *topic*.
    """
    if topic not in _STANDARD_HANDLERS:
        raise ValueError(f"No standard handler for {topic}")

    dispatch_table, standard_handler = _STANDARD_HANDLERS[topic]
    _register_handler(file_format, dispatch_table, standard_handler)
def _registering_wrapper(
    file_formats: LanguageDispatchRule,
    handler_dict: HandlerDispatchTable[C],
) -> Callable[[C], C]:
    """Build a decorator that registers its target in *handler_dict*.

    The decorator passes the decorated callable to `_register_handler` for
    *file_formats* and then returns that callable unchanged.
    """

    def _decorate(target: C) -> C:
        _register_handler(file_formats, handler_dict, target)
        return target

    return _decorate
def _register_handler(
    file_format: LanguageDispatchRule,
    handler_dict: HandlerDispatchTable[C],
    handler: C,
) -> None:
    """Register *handler* in *handler_dict* for everything *file_format* covers.

    Registration happens in three steps:

    1. as the default handler of the primary language ID;
    2. under each path name of the rule in the table of the empty language ID
       (""), which holds the path-name based lookups;
    3. for each secondary language, by path name, by basename or as that
       language's default handler, depending on its `secondary_lookup`.

    :raises AssertionError: If any target slot is already taken, or if a
        secondary lookup needs path names / a basename that the rule lacks.
    """
    primary = handler_dict[file_format.primary_language_id]
    path_lookups = handler_dict[""].path_name_based_lookups

    if primary.default_handler is not None:
        raise AssertionError(
            f"There is already a handler for language ID {file_format.primary_language_id}"
        )
    primary.default_handler = handler

    for filename in file_format.path_names:
        if path_lookups.get(filename) is not None:
            raise AssertionError(f"There is already a handler for filename {filename}")
        path_lookups[filename] = handler

    for secondary_language in file_format.secondary_language_ids:
        table = handler_dict[secondary_language.language_id]
        lookup_kind = secondary_language.secondary_lookup

        if lookup_kind == "path-name":
            if not file_format.path_names:
                raise AssertionError(
                    f"secondary_lookup=path-name requires the language to have path-names. Please correct definition of {file_format.primary_language_id}"
                )
            by_path_name = table.path_name_based_lookups
            for filename in file_format.path_names:
                if by_path_name.get(filename) is not None:
                    raise AssertionError(
                        f"There is already a handler for filename {filename} under language ID {secondary_language.language_id}"
                    )
                by_path_name[filename] = handler
            continue

        if lookup_kind == "basename":
            basename = file_format.basename
            if not basename:
                raise AssertionError(
                    f"secondary_lookup=basename requires the language to have a basename. Please correct definition of {file_format.primary_language_id}"
                )
            by_basename = table.basename_based_lookups
            if by_basename.get(basename) is not None:
                raise AssertionError(
                    f"There is already a handler for basename {basename} under language ID {secondary_language.language_id}"
                )
            by_basename[basename] = handler
            continue

        # No secondary lookup: the handler becomes that language's default.
        if table.default_handler is not None:
            raise AssertionError(
                f"There is already a primary handler for language ID {secondary_language.language_id}"
            )
        table.default_handler = handler
def ensure_cli_lsp_features_are_loaded() -> None:
    """Import the modules whose import side effects register the LSP/CLI handlers."""
    # These imports are needed to force loading of the LSP files. The relevant registration
    # happens as a side effect of the imports.
    import debputy.lsp.languages as lsp_languages
    from debputy.linting.lint_impl import LINTER_FORMATS

    # Ensure no static analysis tool is tempted to optimize out the imports. We need them
    # for the side effect.
    assert lsp_languages
    assert LINTER_FORMATS
def describe_lsp_features(context: CommandContext) -> None:
    """Print an overview of the known LSP language IDs and features to stdout.

    The output has three sections: the features registered per language ID
    (language IDs listed in `_ALIAS_OF` are skipped there), the aliases grouped
    by their main language ID, and the result of every check in `LSP_CHECKS`.

    :param context: Command context; its parsed arguments select the output
        styling used for the colored status words.
    """
    fo = _output_styling(context.parsed_args, sys.stdout)
    ensure_cli_lsp_features_are_loaded()

    feature_list = [
        ("diagnostics (lint)", LSP_DIAGNOSTIC_HANDLERS),
        ("code actions/quickfixes", CODE_ACTION_HANDLERS),
        ("completion suggestions", COMPLETER_HANDLERS),
        ("hover docs", HOVER_HANDLERS),
        ("folding ranges", FOLDING_RANGE_HANDLERS),
        ("semantic tokens", SEMANTIC_TOKENS_FULL_HANDLERS),
        ("on-save handler", WILL_SAVE_WAIT_UNTIL_HANDLERS),
        ("inlay hint (doc)", TEXT_DOC_INLAY_HANDLERS),
        ("format file handler", LSP_FORMAT_FILE_HANDLERS),
    ]
    print("LSP language IDs and their features:")
    # NOTE(review): `_register_handler` subscripts each table with "" for the
    # path-name lookups, so the empty language ID is part of `all_ids` and is
    # listed like any other language. Confirm whether that is intended.
    all_ids = sorted({lid for _, t in feature_list for lid in t})
    for lang_id in all_ids:
        if lang_id in _ALIAS_OF:
            continue
        features = [n for n, t in feature_list if lang_id in t]
        print(f" * {lang_id}:")
        for feature in features:
            print(f" - {feature}")

    # Group alias language IDs by the main language ID they point to.
    aliases_by_main = collections.defaultdict(list)
    for lang_id in all_ids:
        main_lang = _ALIAS_OF.get(lang_id)
        if main_lang is None:
            continue
        aliases_by_main[main_lang].append(lang_id)

    print()
    print("Aliases:")
    # Distinct loop names: the loop must not rebind the dict it iterates over.
    for main_id, alias_ids in aliases_by_main.items():
        print(f" * {main_id}: {', '.join(alias_ids)}")

    print()
    print("General features:")
    for self_check in LSP_CHECKS:
        is_ok = self_check.test()
        if is_ok:
            print(f" * {self_check.feature}: {fo.colored('enabled', fg='green')}")
        else:
            if self_check.is_mandatory:
                disabled = fo.colored(
                    "missing",
                    fg="red",
                    bg="black",
                    style="bold",
                )
            else:
                disabled = fo.colored(
                    "disabled",
                    fg="yellow",
                    bg="black",
                    style="bold",
                )

            if self_check.how_to_fix:
                print(f" * {self_check.feature}: {disabled}")
                print(f" - {self_check.how_to_fix}")
            else:
                problem_suffix = f" ({self_check.problem})"
                print(f" * {self_check.feature}: {disabled}{problem_suffix}")