Coverage for src/debputy/lsp/lsp_dispatch.py: 45%
104 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
1import inspect
2import os.path
3from typing import (
4 Union,
5 Optional,
6 TypeVar,
7 List,
8 TYPE_CHECKING,
9 Literal,
10)
11from collections.abc import Sequence, Callable
13from debputy import __version__
14from debputy.linting.lint_util import AbortTaskError
15from debputy.lsp.lsp_features import (
16 LSP_DIAGNOSTIC_HANDLERS,
17 COMPLETER_HANDLERS,
18 HOVER_HANDLERS,
19 SEMANTIC_TOKENS_FULL_HANDLERS,
20 CODE_ACTION_HANDLERS,
21 SEMANTIC_TOKENS_LEGEND,
22 WILL_SAVE_WAIT_UNTIL_HANDLERS,
23 LSP_FORMAT_FILE_HANDLERS,
24 C,
25 TEXT_DOC_INLAY_HANDLERS,
26 HandlerDispatchTable,
27 DOCUMENT_LINK_HANDLERS,
28)
29from debputy.util import _info, _trace_log
# Import the LSP types and create the server object that the handlers below
# register themselves with via `@DEBPUTY_LANGUAGE_SERVER.feature(...)`.
if TYPE_CHECKING:
    # Static analysis branch: import the names directly so a type checker can
    # resolve the annotations used throughout this module.
    import lsprotocol.types as types

    try:
        from pygls.server import LanguageServer
        from pygls.workspace import TextDocument
    except ImportError:
        pass

    from debputy.lsp.debputy_ls import DebputyLanguageServer

    DEBPUTY_LANGUAGE_SERVER = DebputyLanguageServer("debputy", f"v{__version__}")
else:
    # Runtime branch: the types are taken from debputy's own `lsprotocol`
    # module rather than from `lsprotocol` directly.
    import debputy.lsprotocol.types as types

    try:
        from pygls.server import LanguageServer
        from pygls.workspace import TextDocument
        from debputy.lsp.debputy_ls import DebputyLanguageServer

        DEBPUTY_LANGUAGE_SERVER = DebputyLanguageServer("debputy", f"v{__version__}")
    except ImportError:
        # Any ImportError raised inside the `try` (imports or server
        # construction) lands here and replaces the server with a stand-in.

        class Mock:
            # Minimal stand-in for the language server. It only provides
            # `feature`, which is the one attribute the decorators in this
            # module use at import time.

            def feature(self, *args, **kwargs):
                # Ignore the registration arguments and hand back an identity
                # decorator, so the decorated function is left untouched.
                return lambda x: x

        DEBPUTY_LANGUAGE_SERVER = Mock()
# Type variables used by the signatures in this module.
# `P` annotates the request parameters handed to `_dispatch_standard_handler`.
P = TypeVar("P")
# `R` annotates the result that `_dispatch_standard_handler` returns.
R = TypeVar("R")
# Constrained to the two language server classes (given as forward references).
L = TypeVar("L", "LanguageServer", "DebputyLanguageServer")
@DEBPUTY_LANGUAGE_SERVER.feature(types.INITIALIZE)
async def _on_initialize(
    ls: "DebputyLanguageServer",
    params: types.InitializeParams,
) -> None:
    """Forward the ``INITIALIZE`` request to the language server object.

    :param ls: The language server that received the request.
    :param params: The initialization parameters supplied with the request.
    """
    initialize = ls.on_initialize
    # The result of `on_initialize` is deliberately not returned.
    await initialize(params)
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_DID_OPEN)
async def _open_document(
    ls: "DebputyLanguageServer",
    params: types.DidOpenTextDocumentParams,
) -> None:
    """Run diagnostics for a document that was just opened.

    The parameters are the "did open" parameters (not the "did change" ones),
    which is also what `_open_or_changed_document` accepts.

    :param ls: The language server that received the notification.
    :param params: The "did open" notification parameters.
    """
    await _open_or_changed_document(ls, params, "Opened")
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_DID_CHANGE)
async def _changed_document(
    ls: "DebputyLanguageServer",
    params: types.DidChangeTextDocumentParams,
) -> None:
    """Run diagnostics for a document whose content changed.

    :param ls: The language server that received the notification.
    :param params: The "did change" notification parameters.
    """
    await _open_or_changed_document(ls, params, event_name="Changed")
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_DID_CLOSE)
def _close_document(
    ls: "DebputyLanguageServer",
    params: types.DidCloseTextDocumentParams,
) -> None:
    """Tell the language server that a document was closed.

    :param ls: The language server that received the notification.
    :param params: The "did close" notification parameters; only the document
      URI is used.
    """
    closed_uri = params.text_document.uri
    ls.close_document(closed_uri)
async def _open_or_changed_document(
    ls: "DebputyLanguageServer",
    params: types.DidOpenTextDocumentParams | types.DidChangeTextDocumentParams,
    event_name: Literal["Opened", "Changed"],
) -> None:
    """Look up the document named in `params` and run diagnostics for it.

    :param ls: The language server that received the notification.
    :param params: The open/change notification parameters. The document URI
      and version are read from `params.text_document`.
    :param event_name: Which event triggered the call; passed on to
      `_diagnostics_for`.
    """
    document = ls.workspace.get_text_document(params.text_document.uri)
    version = params.text_document.version
    await _diagnostics_for(ls, document, version, params, event_name)
async def _diagnostics_for(
    ls: "DebputyLanguageServer",
    doc: "TextDocument",
    expected_version: int,
    params: (
        types.DidOpenTextDocumentParams
        | types.DidChangeTextDocumentParams
        | types.DocumentDiagnosticParams
    ),
    event_name: Literal["Opened", "Changed"],
) -> None:
    """Run the diagnostics handler for `doc` and record its result.

    :param ls: The language server; used to determine the language and to
      record the diagnostics.
    :param doc: The document to produce diagnostics for.
    :param expected_version: The document version the diagnostics relate to.
    :param params: The request/notification parameters handed to the handler.
    :param event_name: Name of the triggering event. It is used in the log
      messages and selects the log function (`_info` for "Opened",
      `_trace_log` otherwise).
    """
    doc_uri = doc.uri
    id_source, language_id, normalized_filename = ls.determine_language_id(doc)
    log_func = _trace_log if event_name != "Opened" else _info
    handler = _resolve_handler(
        LSP_DIAGNOSTIC_HANDLERS,
        language_id,
        normalized_filename,
        log_func,
    )
    # Common prefix shared by both log messages below.
    doc_summary = (
        f"{event_name} document: {doc.path} ({language_id}, {id_source},"
        f" normalized filename: {normalized_filename})"
    )
    if handler is None:
        log_func(f"{doc_summary} - no diagnostics handler")
        return
    log_func(f"{doc_summary} - running diagnostics for doc version {expected_version}")

    try:
        diagnostics = await handler(ls, params)
    except AbortTaskError as e:
        # An aborted lint task is not an error; nothing is recorded for it.
        _trace_log(f"Aborted lint task: {e.message}")
    else:
        # A falsy handler result is recorded as an empty list.
        ls.record_diagnostics(
            doc_uri,
            expected_version,
            diagnostics if diagnostics else [],
            False,
        )
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_COMPLETION)
async def _completions(
    ls: "DebputyLanguageServer",
    params: types.CompletionParams,
) -> types.CompletionList | Sequence[types.CompletionItem] | None:
    """Dispatch a completion request to the handler registered for the document.

    :param ls: The language server that received the request.
    :param params: The completion request parameters.
    :return: Whatever the resolved handler returns, or `None` when no handler
      is registered for the document.
    """
    return await _dispatch_standard_handler(
        ls,
        doc_uri=params.text_document.uri,
        params=params,
        handler_table=COMPLETER_HANDLERS,
        request_type="Complete request",
    )
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_HOVER)
async def _hover(
    ls: "DebputyLanguageServer",
    # Hover requests carry hover parameters, not completion parameters. The
    # annotation is quoted so it is not evaluated when the module is imported.
    params: "types.HoverParams",
) -> types.Hover | None:
    """Dispatch a hover request to the handler registered for the document.

    :param ls: The language server that received the request.
    :param params: The hover request parameters.
    :return: Whatever the resolved handler returns, or `None` when no handler
      is registered for the document.
    """
    return await _dispatch_standard_handler(
        ls,
        params.text_document.uri,
        params,
        HOVER_HANDLERS,
        "Hover doc request",
    )
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_INLAY_HINT)
async def _doc_inlay_hint(
    ls: "DebputyLanguageServer",
    params: types.InlayHintParams,
) -> list[types.InlayHint] | None:
    """Dispatch an inlay hint request to the handler registered for the document.

    :param ls: The language server that received the request.
    :param params: The inlay hint request parameters.
    :return: Whatever the resolved handler returns, or `None` when no handler
      is registered for the document.
    """
    return await _dispatch_standard_handler(
        ls,
        doc_uri=params.text_document.uri,
        params=params,
        handler_table=TEXT_DOC_INLAY_HANDLERS,
        request_type="Inlay hint (doc) request",
    )
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_CODE_ACTION)
async def _code_actions(
    ls: "DebputyLanguageServer",
    params: types.CodeActionParams,
) -> list[types.Command | types.CodeAction] | None:
    """Dispatch a code action request to the handler registered for the document.

    :param ls: The language server that received the request.
    :param params: The code action request parameters.
    :return: Whatever the resolved handler returns, or `None` when no handler
      is registered for the document.
    """
    return await _dispatch_standard_handler(
        ls,
        doc_uri=params.text_document.uri,
        params=params,
        handler_table=CODE_ACTION_HANDLERS,
        request_type="Code action request",
    )
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_FOLDING_RANGE)
async def _folding_ranges(
    ls: "DebputyLanguageServer",
    params: types.FoldingRangeParams,
) -> Sequence[types.FoldingRange] | None:
    """Dispatch a folding range request to the handler registered for the document.

    :param ls: The language server that received the request.
    :param params: The folding range request parameters.
    :return: Whatever the resolved handler returns, or `None` when no handler
      is registered for the document.
    """
    # Folding requests must be resolved against the folding range handlers.
    # Dispatching via `HOVER_HANDLERS` would hand `FoldingRangeParams` to a
    # hover handler and return hover results to a folding range request.
    # NOTE(review): assumes `debputy.lsp.lsp_features` exposes
    # `FOLDING_RANGE_HANDLERS`; confirm the name. It is imported here because
    # the module-level import list does not include it.
    from debputy.lsp.lsp_features import FOLDING_RANGE_HANDLERS

    return await _dispatch_standard_handler(
        ls,
        params.text_document.uri,
        params,
        FOLDING_RANGE_HANDLERS,
        "Folding range request",
    )
@DEBPUTY_LANGUAGE_SERVER.feature(
    types.TEXT_DOCUMENT_SEMANTIC_TOKENS_FULL,
    types.SemanticTokensRegistrationOptions(
        SEMANTIC_TOKENS_LEGEND,
        full=True,
    ),
)
async def semantic_tokens_full(
    ls: "DebputyLanguageServer",
    params: types.SemanticTokensParams,
) -> types.SemanticTokens | None:
    """Dispatch a full semantic tokens request to the handler for the document.

    The feature is registered with `SEMANTIC_TOKENS_LEGEND` and `full=True`.

    :param ls: The language server that received the request.
    :param params: The semantic tokens request parameters.
    :return: Whatever the resolved handler returns, or `None` when no handler
      is registered for the document.
    """
    return await _dispatch_standard_handler(
        ls,
        doc_uri=params.text_document.uri,
        params=params,
        handler_table=SEMANTIC_TOKENS_FULL_HANDLERS,
        request_type="Semantic tokens request",
    )
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_WILL_SAVE_WAIT_UNTIL)
async def _will_save_wait_until(
    ls: "DebputyLanguageServer",
    params: types.WillSaveTextDocumentParams,
) -> Sequence[types.TextEdit] | None:
    """Dispatch a "will save, wait until" request to the document's handler.

    :param ls: The language server that received the request.
    :param params: The will-save request parameters.
    :return: Whatever the resolved handler returns, or `None` when no handler
      is registered for the document.
    """
    return await _dispatch_standard_handler(
        ls,
        doc_uri=params.text_document.uri,
        params=params,
        handler_table=WILL_SAVE_WAIT_UNTIL_HANDLERS,
        request_type="On-save formatting",
    )
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_FORMATTING)
async def _format_document(
    ls: "DebputyLanguageServer",
    # Formatting requests carry document formatting parameters, not will-save
    # parameters. The annotation is quoted so it is not evaluated when the
    # module is imported.
    params: "types.DocumentFormattingParams",
) -> Sequence[types.TextEdit] | None:
    """Dispatch a full document formatting request to the document's handler.

    :param ls: The language server that received the request.
    :param params: The document formatting request parameters.
    :return: Whatever the resolved handler returns, or `None` when no handler
      is registered for the document.
    """
    return await _dispatch_standard_handler(
        ls,
        params.text_document.uri,
        params,
        LSP_FORMAT_FILE_HANDLERS,
        "Full document formatting",
    )
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_DOCUMENT_LINK)
async def _document_link(
    ls: "DebputyLanguageServer",
    params: types.DocumentLinkParams,
) -> Optional[Sequence[types.DocumentLink]]:
    """Dispatch a document link request to the handler registered for the document.

    :param ls: The language server that received the request.
    :param params: The document link request parameters.
    :return: Whatever the resolved handler returns, or `None` when no handler
      is registered for the document.
    """
    return await _dispatch_standard_handler(
        ls,
        doc_uri=params.text_document.uri,
        params=params,
        handler_table=DOCUMENT_LINK_HANDLERS,
        request_type="Document links request",
    )
async def _dispatch_standard_handler(
    ls: "DebputyLanguageServer",
    doc_uri: str,
    params: P,
    handler_table: HandlerDispatchTable[C],
    request_type: str,
) -> R | None:
    """Resolve the handler for a document and delegate the request to it.

    :param ls: The language server; used to look up the document and to
      determine its language.
    :param doc_uri: URI of the document the request is about.
    :param params: The request parameters, passed to the handler unchanged.
    :param handler_table: The dispatch table to resolve the handler from.
    :param request_type: Human readable request name used in log messages.
    :return: The handler's result (awaited when the handler is a coroutine
      function), or `None` when no handler could be resolved.
    """
    document = ls.workspace.get_text_document(doc_uri)

    id_source, language_id, normalized_filename = ls.determine_language_id(document)
    handler = _resolve_handler(handler_table, language_id, normalized_filename, _info)
    # Common prefix shared by both log messages below.
    request_summary = (
        f"{request_type} for document: {document.path} ({language_id}, {id_source},"
        f" normalized filename: {normalized_filename})"
    )
    if handler is None:
        _info(f"{request_summary} - no handler")
        return None
    _info(f"{request_summary} - delegating to handler {handler.__qualname__}")

    # Handlers may be plain functions or coroutine functions; only the latter
    # produce something that must be awaited.
    outcome = handler(ls, params)
    if inspect.iscoroutinefunction(handler):
        outcome = await outcome
    return outcome
def _resolve_handler(
    handler_table: HandlerDispatchTable[C],
    language_id: str,
    normalized_filename: str,
    log_func: Callable[[str], None],
) -> C | None:
    """Find the handler for a document in a dispatch table.

    When `language_id` is non-empty and has an entry in `handler_table`, that
    entry is used and its `default_handler` serves as the fallback. Otherwise
    the entry registered under the empty string (the filename based table) is
    used, without any default fallback.

    Within the selected entry, the lookup order is:

    1. the basename of `normalized_filename` in `basename_based_lookups`,
    2. `normalized_filename` itself in `path_name_based_lookups`,
    3. the language entry's `default_handler` (language entries only).

    :param handler_table: Mapping from language ID to a dispatch entry.
    :param language_id: The language ID of the document; may be empty.
    :param normalized_filename: The normalized file name of the document.
    :param log_func: Called once with a message describing the lookup.
    :return: The resolved handler, or `None` when nothing matches (including
      when no filename based table is registered).
    """
    dispatch_table = handler_table.get(language_id) if language_id != "" else None
    log_func(f"resolve_handler({language_id=}, {normalized_filename=})")
    default_handler = None
    if dispatch_table is None:
        # Unknown or empty language ID: fall back to the filename based table.
        dispatch_table = handler_table.get("")
        if dispatch_table is None:
            return None
    else:
        default_handler = dispatch_table.default_handler

    handler = dispatch_table.basename_based_lookups.get(
        os.path.basename(normalized_filename)
    )
    if handler is None:
        # The specific path based lookup is consulted before the default
        # handler; otherwise a default handler would shadow every path entry.
        handler = dispatch_table.path_name_based_lookups.get(normalized_filename)
    return handler if handler is not None else default_handler