Coverage for src/debputy/lsp/lsp_dispatch.py: 44%
100 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-09-07 09:27 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-09-07 09:27 +0000
1import inspect
2import os.path
3from typing import (
4 Sequence,
5 Union,
6 Optional,
7 TypeVar,
8 List,
9 TYPE_CHECKING,
10 Callable,
11 Literal,
12)
14from debputy import __version__
15from debputy.linting.lint_util import AbortTaskError
16from debputy.lsp.lsp_features import (
17 LSP_DIAGNOSTIC_HANDLERS,
18 COMPLETER_HANDLERS,
19 HOVER_HANDLERS,
20 SEMANTIC_TOKENS_FULL_HANDLERS,
21 CODE_ACTION_HANDLERS,
22 SEMANTIC_TOKENS_LEGEND,
23 WILL_SAVE_WAIT_UNTIL_HANDLERS,
24 LSP_FORMAT_FILE_HANDLERS,
25 C,
26 TEXT_DOC_INLAY_HANDLERS,
27 HandlerDispatchTable,
28)
29from debputy.util import _info, _trace_log
if TYPE_CHECKING:
    # Type-checker view: use the real protocol and server types.
    import lsprotocol.types as types

    try:
        from pygls.server import LanguageServer
        from pygls.workspace import TextDocument
    except ImportError:
        pass

    from debputy.lsp.debputy_ls import DebputyLanguageServer

    DEBPUTY_LANGUAGE_SERVER = DebputyLanguageServer("debputy", f"v{__version__}")
else:
    # Runtime view: protocol names come from debputy's own `lsprotocol.types`
    # module, and the server dependencies are allowed to be missing.
    import debputy.lsprotocol.types as types

    try:
        from pygls.server import LanguageServer
        from pygls.workspace import TextDocument
        from debputy.lsp.debputy_ls import DebputyLanguageServer

        DEBPUTY_LANGUAGE_SERVER = DebputyLanguageServer("debputy", f"v{__version__}")
    except ImportError:

        class Mock:
            """Placeholder server used when the imports above fail.

            It only provides ``feature`` so that the ``@...feature(...)``
            decorators in this module can still be applied at import time.
            """

            def feature(self, *args, **kwargs):
                """Return a decorator that hands back the function unchanged."""

                def _unchanged(func):
                    return func

                return _unchanged

        DEBPUTY_LANGUAGE_SERVER = Mock()
# Generic placeholders for the signatures in this module.
P = TypeVar("P")  # type of the request parameter object passed to a handler
R = TypeVar("R")  # type of the value a handler produces
# The constraints are given as strings because the server classes may not be
# importable at runtime (see the ImportError fallback above).
L = TypeVar("L", "LanguageServer", "DebputyLanguageServer")
@DEBPUTY_LANGUAGE_SERVER.feature(types.INITIALIZE)
async def _on_initialize(
    ls: "DebputyLanguageServer",
    params: types.InitializeParams,
) -> None:
    """Handle the ``initialize`` request by delegating to the server object.

    :param ls: The language server instance receiving the request.
    :param params: The initialization parameters sent by the client.
    """
    # All initialization logic lives on the server; this is only the hook.
    await ls.on_initialize(params)
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_DID_OPEN)
async def _open_document(
    ls: "DebputyLanguageServer",
    params: types.DidOpenTextDocumentParams,
) -> None:
    """Handle a ``textDocument/didOpen`` notification.

    Runs the shared open/changed processing for the document, tagged with the
    "Opened" event name.

    :param ls: The language server instance receiving the notification.
    :param params: The did-open parameters identifying the document.
    """
    # The annotation previously named the did-change parameter type; a did-open
    # notification carries `DidOpenTextDocumentParams`.
    await _open_or_changed_document(ls, params, "Opened")
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_DID_CHANGE)
async def _changed_document(
    ls: "DebputyLanguageServer",
    params: types.DidChangeTextDocumentParams,
) -> None:
    """Handle a ``textDocument/didChange`` notification.

    Runs the shared open/changed processing for the document, tagged with the
    "Changed" event name.

    :param ls: The language server instance receiving the notification.
    :param params: The did-change parameters identifying the document.
    """
    await _open_or_changed_document(ls, params, event_name="Changed")
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_DID_CLOSE)
def _close_document(
    ls: "DebputyLanguageServer",
    params: types.DidCloseTextDocumentParams,
) -> None:
    """Handle a ``textDocument/didClose`` notification.

    :param ls: The language server instance receiving the notification.
    :param params: The did-close parameters; only the document URI is used.
    """
    closed_uri = params.text_document.uri
    ls.close_document(closed_uri)
async def _open_or_changed_document(
    ls: "DebputyLanguageServer",
    params: Union[types.DidOpenTextDocumentParams, types.DidChangeTextDocumentParams],
    event_name: Literal["Opened", "Changed"],
) -> None:
    """Look up the document named in *params* and run diagnostics for it.

    :param ls: The language server instance; its workspace supplies the document.
    :param params: The did-open or did-change parameters from the client.
    :param event_name: Which event triggered the call; passed on for logging.
    """
    doc_ref = params.text_document
    document = ls.workspace.get_text_document(doc_ref.uri)
    # The version reported by the client is passed along as the version the
    # resulting diagnostics are recorded against.
    await _diagnostics_for(ls, document, doc_ref.version, params, event_name)
async def _diagnostics_for(
    ls: "DebputyLanguageServer",
    doc: "TextDocument",
    expected_version: int,
    params: Union[
        types.DidOpenTextDocumentParams,
        types.DidChangeTextDocumentParams,
        types.DocumentDiagnosticParams,
    ],
    event_name: Literal["Opened", "Changed"],
) -> None:
    """Run the diagnostics handler for *doc* and record its result on the server.

    Returns without recording anything when no diagnostics handler matches the
    document, or when the handler raises ``AbortTaskError``.

    :param ls: The language server instance.
    :param doc: The document to produce diagnostics for.
    :param expected_version: The document version the diagnostics belong to.
    :param params: The request/notification parameters handed to the handler.
    :param event_name: The triggering event; "Opened" is logged via ``_info``,
        anything else via ``_trace_log``.
    """
    uri = doc.uri
    id_source, language_id, normalized_filename = ls.determine_language_id(doc)

    if event_name == "Opened":
        log_func = _info
    else:
        log_func = _trace_log

    handler = _resolve_handler(
        LSP_DIAGNOSTIC_HANDLERS,
        language_id,
        normalized_filename,
        log_func,
    )

    # Both log messages below share this description of the document.
    doc_description = (
        f"{event_name} document: {doc.path} ({language_id}, {id_source},"
        f" normalized filename: {normalized_filename})"
    )
    if handler is None:
        log_func(f"{doc_description} - no diagnostics handler")
        return
    log_func(
        f"{doc_description} - running diagnostics for doc version {expected_version}"
    )

    try:
        found = await handler(ls, params)
    except AbortTaskError as e:
        _trace_log(f"Aborted lint task: {e.message}")
    else:
        # A falsy handler result is recorded as an empty list.
        ls.record_diagnostics(uri, expected_version, found if found else [], False)
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_COMPLETION)
async def _completions(
    ls: "DebputyLanguageServer",
    params: types.CompletionParams,
) -> Optional[Union[types.CompletionList, Sequence[types.CompletionItem]]]:
    """Handle a ``textDocument/completion`` request.

    :param ls: The language server instance receiving the request.
    :param params: The completion parameters from the client.
    :return: Whatever the matching completion handler returns, or ``None``
        when no handler is registered for the document.
    """
    target_uri = params.text_document.uri
    return await _dispatch_standard_handler(
        ls, target_uri, params, COMPLETER_HANDLERS, "Complete request"
    )
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_HOVER)
async def _hover(
    ls: "DebputyLanguageServer",
    params: types.HoverParams,
) -> Optional[types.Hover]:
    """Handle a ``textDocument/hover`` request.

    :param ls: The language server instance receiving the request.
    :param params: The hover parameters from the client. (The annotation
        previously named the completion parameter type.)
    :return: Whatever the matching hover handler returns, or ``None`` when no
        handler is registered for the document.
    """
    return await _dispatch_standard_handler(
        ls,
        params.text_document.uri,
        params,
        HOVER_HANDLERS,
        "Hover doc request",
    )
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_INLAY_HINT)
async def _doc_inlay_hint(
    ls: "DebputyLanguageServer",
    params: types.InlayHintParams,
) -> Optional[List[types.InlayHint]]:
    """Handle a ``textDocument/inlayHint`` request.

    :param ls: The language server instance receiving the request.
    :param params: The inlay hint parameters from the client.
    :return: Whatever the matching inlay hint handler returns, or ``None``
        when no handler is registered for the document.
    """
    hints = await _dispatch_standard_handler(
        ls,
        doc_uri=params.text_document.uri,
        params=params,
        handler_table=TEXT_DOC_INLAY_HANDLERS,
        request_type="Inlay hint (doc) request",
    )
    return hints
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_CODE_ACTION)
async def _code_actions(
    ls: "DebputyLanguageServer",
    params: types.CodeActionParams,
) -> Optional[List[Union[types.Command, types.CodeAction]]]:
    """Handle a ``textDocument/codeAction`` request.

    :param ls: The language server instance receiving the request.
    :param params: The code action parameters from the client.
    :return: Whatever the matching code action handler returns, or ``None``
        when no handler is registered for the document.
    """
    target_uri = params.text_document.uri
    return await _dispatch_standard_handler(
        ls, target_uri, params, CODE_ACTION_HANDLERS, "Code action request"
    )
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_FOLDING_RANGE)
async def _folding_ranges(
    ls: "DebputyLanguageServer",
    params: types.FoldingRangeParams,
) -> Optional[Sequence[types.FoldingRange]]:
    """Handle a ``textDocument/foldingRange`` request.

    :param ls: The language server instance receiving the request.
    :param params: The folding range parameters from the client.
    :return: Whatever the matching folding range handler returns, or ``None``
        when no handler is registered for the document.
    """
    # This request was previously dispatched through HOVER_HANDLERS, which sent
    # folding range requests to hover handlers.
    # NOTE(review): assumes `debputy.lsp.lsp_features` exposes a
    # FOLDING_RANGE_HANDLERS table like the other handler tables - confirm, and
    # consider moving this to the module-level import list.
    from debputy.lsp.lsp_features import FOLDING_RANGE_HANDLERS

    return await _dispatch_standard_handler(
        ls,
        params.text_document.uri,
        params,
        FOLDING_RANGE_HANDLERS,
        "Folding range request",
    )
@DEBPUTY_LANGUAGE_SERVER.feature(
    types.TEXT_DOCUMENT_SEMANTIC_TOKENS_FULL,
    types.SemanticTokensRegistrationOptions(
        SEMANTIC_TOKENS_LEGEND,
        full=True,
    ),
)
async def semantic_tokens_full(
    ls: "DebputyLanguageServer",
    params: types.SemanticTokensParams,
) -> Optional[types.SemanticTokens]:
    """Handle a ``textDocument/semanticTokens/full`` request.

    :param ls: The language server instance receiving the request.
    :param params: The semantic token parameters from the client.
    :return: Whatever the matching semantic tokens handler returns, or
        ``None`` when no handler is registered for the document.
    """
    tokens = await _dispatch_standard_handler(
        ls,
        doc_uri=params.text_document.uri,
        params=params,
        handler_table=SEMANTIC_TOKENS_FULL_HANDLERS,
        request_type="Semantic tokens request",
    )
    return tokens
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_WILL_SAVE_WAIT_UNTIL)
async def _will_save_wait_until(
    ls: "DebputyLanguageServer",
    params: types.WillSaveTextDocumentParams,
) -> Optional[Sequence[types.TextEdit]]:
    """Handle a ``textDocument/willSaveWaitUntil`` request.

    :param ls: The language server instance receiving the request.
    :param params: The will-save parameters from the client.
    :return: Whatever the matching on-save handler returns, or ``None`` when
        no handler is registered for the document.
    """
    target_uri = params.text_document.uri
    return await _dispatch_standard_handler(
        ls, target_uri, params, WILL_SAVE_WAIT_UNTIL_HANDLERS, "On-save formatting"
    )
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_FORMATTING)
async def _format_document(
    ls: "DebputyLanguageServer",
    params: types.DocumentFormattingParams,
) -> Optional[Sequence[types.TextEdit]]:
    """Handle a ``textDocument/formatting`` request.

    :param ls: The language server instance receiving the request.
    :param params: The document formatting parameters from the client. (The
        annotation previously named the will-save parameter type.)
    :return: Whatever the matching formatting handler returns, or ``None``
        when no handler is registered for the document.
    """
    return await _dispatch_standard_handler(
        ls,
        params.text_document.uri,
        params,
        LSP_FORMAT_FILE_HANDLERS,
        "Full document formatting",
    )
async def _dispatch_standard_handler(
    ls: "DebputyLanguageServer",
    doc_uri: str,
    params: P,
    handler_table: HandlerDispatchTable[C],
    request_type: str,
) -> Optional[R]:
    """Find the handler for a document in *handler_table* and invoke it.

    :param ls: The language server instance; passed to the handler as well.
    :param doc_uri: URI of the document the request concerns.
    :param params: The request parameters, passed to the handler unchanged.
    :param handler_table: The dispatch table to resolve the handler from.
    :param request_type: Human-readable request name used in log messages.
    :return: The handler's result (awaited when the handler is a coroutine
        function), or ``None`` when no handler matches the document.
    """
    document = ls.workspace.get_text_document(doc_uri)
    id_source, language_id, normalized_filename = ls.determine_language_id(document)
    handler = _resolve_handler(handler_table, language_id, normalized_filename, _info)

    # Both log messages below share this description of the request.
    request_description = (
        f"{request_type} for document: {document.path} ({language_id}, {id_source},"
        f" normalized filename: {normalized_filename})"
    )
    if handler is None:
        _info(f"{request_description} - no handler")
        return None
    _info(f"{request_description} - delegating to handler {handler.__qualname__}")

    # Handlers may be plain functions or coroutine functions.
    if not inspect.iscoroutinefunction(handler):
        return handler(ls, params)
    return await handler(ls, params)
301def _resolve_handler(
302 handler_table: HandlerDispatchTable[C],
303 language_id: str,
304 normalized_filename: str,
305 log_func: Callable[[str], None],
306) -> Optional[C]:
307 dispatch_table = handler_table.get(language_id) if language_id != "" else None
308 log_func(f"resolve_handler({language_id=}, {normalized_filename=})")
309 if dispatch_table is None:
310 filename_based_table = handler_table[""]
311 m = filename_based_table.basename_based_lookups.get(
312 os.path.basename(normalized_filename)
313 )
314 if m is not None:
315 return m
316 return filename_based_table.path_name_based_lookups.get(normalized_filename)
317 m = dispatch_table.basename_based_lookups.get(
318 os.path.basename(normalized_filename),
319 dispatch_table.default_handler,
320 )
321 if m is not None:
322 return m
323 return dispatch_table.path_name_based_lookups.get(
324 normalized_filename, dispatch_table.default_handler
325 )