Coverage for src/debputy/lsp/lsp_dispatch.py: 58%
93 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-03-24 16:38 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-03-24 16:38 +0000
1import inspect
2from typing import (
3 Sequence,
4 Union,
5 Optional,
6 TypeVar,
7 List,
8 TYPE_CHECKING,
9 Callable,
10 Literal,
11)
13from debputy import __version__
14from debputy.linting.lint_util import AbortTaskError
15from debputy.lsp.lsp_features import (
16 DIAGNOSTIC_HANDLERS,
17 COMPLETER_HANDLERS,
18 HOVER_HANDLERS,
19 SEMANTIC_TOKENS_FULL_HANDLERS,
20 CODE_ACTION_HANDLERS,
21 SEMANTIC_TOKENS_LEGEND,
22 WILL_SAVE_WAIT_UNTIL_HANDLERS,
23 FORMAT_FILE_HANDLERS,
24 C,
25 TEXT_DOC_INLAY_HANDLERS,
26 HandlerDispatchTable,
27)
28from debputy.util import _info, _trace_log
# Module setup: bind `types`, the optional pygls names and the shared
# DEBPUTY_LANGUAGE_SERVER instance that the `@...feature(...)` decorators
# further down register their handlers on.
if TYPE_CHECKING:
    # Static-analysis branch (TYPE_CHECKING is False at runtime): expose the
    # lsprotocol / pygls / debputy_ls names to type checkers.
    import lsprotocol.types as types

    try:
        from pygls.server import LanguageServer
        from pygls.workspace import TextDocument
    except ImportError:
        pass

    from debputy.lsp.debputy_ls import DebputyLanguageServer

    DEBPUTY_LANGUAGE_SERVER = DebputyLanguageServer("debputy", f"v{__version__}")
else:
    # Runtime branch: `types` is taken from the debputy.lsprotocol package path.
    import debputy.lsprotocol.types as types

    try:
        from pygls.server import LanguageServer
        from pygls.workspace import TextDocument
        from debputy.lsp.debputy_ls import DebputyLanguageServer

        DEBPUTY_LANGUAGE_SERVER = DebputyLanguageServer("debputy", f"v{__version__}")
    except ImportError:
        # One of the imports above failed. Install a stand-in so that the
        # decorators applied at module level below do not fail on import.

        class Mock:
            # Stand-in server object that only implements `feature`.

            def feature(self, *args, **kwargs):
                # Accept any registration arguments and return an identity
                # decorator, leaving the decorated function unchanged.
                return lambda x: x

        DEBPUTY_LANGUAGE_SERVER = Mock()


# Generic placeholders used in the dispatch helper signatures below.
P = TypeVar("P")
R = TypeVar("R")
# Constrained to the two language server classes (given as forward references).
L = TypeVar("L", "LanguageServer", "DebputyLanguageServer")
@DEBPUTY_LANGUAGE_SERVER.feature(types.INITIALIZE)
async def _on_initialize(
    ls: "DebputyLanguageServer", params: types.InitializeParams
) -> None:
    """Forward the ``types.INITIALIZE`` request to the server object.

    The handler itself returns nothing; all work is delegated to
    ``ls.on_initialize``, which is awaited before the handler completes.
    """
    await ls.on_initialize(params)
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_DID_OPEN)
async def _open_document(
    ls: "DebputyLanguageServer",
    params: types.DidOpenTextDocumentParams,
) -> None:
    """Handle ``types.TEXT_DOCUMENT_DID_OPEN`` by running diagnostics.

    :param ls: The language server the notification was delivered to.
    :param params: The did-open payload (annotated as the did-open type; the
      shared helper accepts both the did-open and did-change payloads).
    """
    await _open_or_changed_document(ls, params, "Opened")
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_DID_CHANGE)
async def _changed_document(
    ls: "DebputyLanguageServer", params: types.DidChangeTextDocumentParams
) -> None:
    """Handle ``types.TEXT_DOCUMENT_DID_CHANGE`` by re-running diagnostics.

    Delegates to the shared open/changed helper with the "Changed" event name.
    """
    await _open_or_changed_document(ls, params, event_name="Changed")
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_DID_CLOSE)
def _close_document(
    ls: "DebputyLanguageServer",
    params: types.DidCloseTextDocumentParams,
) -> None:
    """Handle ``types.TEXT_DOCUMENT_DID_CLOSE`` by closing the document.

    This handler was previously also named ``_changed_document``, which
    rebound the module-level name of the did-change handler. It is registered
    through the decorator's feature constant, so the function name only
    matters for readability and tracebacks.

    :param ls: The language server the notification was delivered to.
    :param params: The did-close payload; only the document URI is used.
    """
    ls.close_document(params.text_document.uri)
async def _open_or_changed_document(
    ls: "DebputyLanguageServer",
    params: Union[types.DidOpenTextDocumentParams, types.DidChangeTextDocumentParams],
    event_name: Literal["Opened", "Changed"],
) -> None:
    """Look up the document named in *params* and run diagnostics for it.

    The document is fetched from the server workspace by URI, and the version
    carried in *params* is passed on as the expected document version.
    """
    text_document = params.text_document
    doc = ls.workspace.get_text_document(text_document.uri)
    await _diagnostics_for(ls, doc, text_document.version, params, event_name)
async def _diagnostics_for(
    ls: "DebputyLanguageServer",
    doc: "TextDocument",
    expected_version: int,
    params: Union[
        types.DidOpenTextDocumentParams,
        types.DidChangeTextDocumentParams,
        types.DocumentDiagnosticParams,
    ],
    event_name: Literal["Opened", "Changed"],
) -> None:
    """Run the diagnostics handler for *doc* and record its result.

    The handler is resolved from ``DIAGNOSTIC_HANDLERS`` using the language id
    and normalized filename the server determines for *doc*. "Opened" events
    are logged via ``_info``; anything else goes to ``_trace_log``. If no
    handler matches, or the handler raises ``AbortTaskError``, nothing is
    recorded.
    """
    doc_uri = doc.uri
    id_source, language_id, normalized_filename = ls.determine_language_id(doc)

    if event_name == "Opened":
        log_func = _info
    else:
        log_func = _trace_log

    handler = _resolve_handler(
        DIAGNOSTIC_HANDLERS, language_id, normalized_filename, log_func
    )

    # Both log messages share this prefix; only the trailing status differs.
    description = (
        f"{event_name} document: {doc.path} ({language_id}, {id_source},"
        f" normalized filename: {normalized_filename})"
    )
    if handler is None:
        log_func(f"{description} - no diagnostics handler")
        return
    log_func(f"{description} - running diagnostics for doc version {expected_version}")

    try:
        diagnostics = await handler(ls, params)
    except AbortTaskError as e:
        _trace_log(f"Aborted lint task: {e.message}")
    else:
        ls.record_diagnostics(doc_uri, expected_version, diagnostics, False)
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_COMPLETION)
async def _completions(
    ls: "DebputyLanguageServer",
    params: types.CompletionParams,
) -> Optional[Union[types.CompletionList, Sequence[types.CompletionItem]]]:
    """Answer ``types.TEXT_DOCUMENT_COMPLETION`` via ``COMPLETER_HANDLERS``.

    Returns whatever the resolved handler returns, or ``None`` when no handler
    is registered for the document.
    """
    uri = params.text_document.uri
    return await _dispatch_standard_handler(
        ls,
        doc_uri=uri,
        params=params,
        handler_table=COMPLETER_HANDLERS,
        request_type="Complete request",
    )
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_HOVER)
async def _hover(
    ls: "DebputyLanguageServer",
    # Quoted so the annotation is not evaluated when the module is imported.
    params: "types.HoverParams",
) -> Optional[types.Hover]:
    """Answer ``types.TEXT_DOCUMENT_HOVER`` via ``HOVER_HANDLERS``.

    :param ls: The language server the request was delivered to.
    :param params: The hover request payload (previously mis-annotated as the
      completion payload type); only ``text_document.uri`` is read here.
    :return: The resolved handler's result, or ``None`` when no handler is
      registered for the document.
    """
    return await _dispatch_standard_handler(
        ls,
        params.text_document.uri,
        params,
        HOVER_HANDLERS,
        "Hover doc request",
    )
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_INLAY_HINT)
async def _doc_inlay_hint(
    ls: "DebputyLanguageServer",
    params: types.InlayHintParams,
) -> Optional[List[types.InlayHint]]:
    """Answer ``types.TEXT_DOCUMENT_INLAY_HINT`` via ``TEXT_DOC_INLAY_HANDLERS``.

    Returns whatever the resolved handler returns, or ``None`` when no handler
    is registered for the document.
    """
    uri = params.text_document.uri
    return await _dispatch_standard_handler(
        ls,
        doc_uri=uri,
        params=params,
        handler_table=TEXT_DOC_INLAY_HANDLERS,
        request_type="Inlay hint (doc) request",
    )
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_CODE_ACTION)
async def _code_actions(
    ls: "DebputyLanguageServer",
    params: types.CodeActionParams,
) -> Optional[List[Union[types.Command, types.CodeAction]]]:
    """Answer ``types.TEXT_DOCUMENT_CODE_ACTION`` via ``CODE_ACTION_HANDLERS``.

    Returns whatever the resolved handler returns, or ``None`` when no handler
    is registered for the document.
    """
    uri = params.text_document.uri
    return await _dispatch_standard_handler(
        ls,
        doc_uri=uri,
        params=params,
        handler_table=CODE_ACTION_HANDLERS,
        request_type="Code action request",
    )
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_FOLDING_RANGE)
async def _folding_ranges(
    ls: "DebputyLanguageServer",
    params: types.FoldingRangeParams,
) -> Optional[Sequence[types.FoldingRange]]:
    """Answer ``types.TEXT_DOCUMENT_FOLDING_RANGE`` via the folding handlers.

    This previously dispatched through ``HOVER_HANDLERS``, so a folding range
    request was answered by whichever hover handler matched the document.

    :param ls: The language server the request was delivered to.
    :param params: The folding range request payload.
    :return: The resolved handler's result, or ``None`` when no handler is
      registered for the document.
    """
    # Imported here because the module-level import list does not include it.
    # NOTE(review): assumes `lsp_features` exposes the folding table under this
    # name, following the naming of the other handler tables - confirm.
    from debputy.lsp.lsp_features import FOLDING_RANGE_HANDLERS

    return await _dispatch_standard_handler(
        ls,
        params.text_document.uri,
        params,
        FOLDING_RANGE_HANDLERS,
        "Folding range request",
    )
@DEBPUTY_LANGUAGE_SERVER.feature(
    types.TEXT_DOCUMENT_SEMANTIC_TOKENS_FULL,
    types.SemanticTokensRegistrationOptions(
        SEMANTIC_TOKENS_LEGEND,
        full=True,
    ),
)
async def _semantic_tokens_full(
    ls: "DebputyLanguageServer",
    params: types.SemanticTokensParams,
) -> Optional[types.SemanticTokens]:
    """Answer ``types.TEXT_DOCUMENT_SEMANTIC_TOKENS_FULL`` requests.

    The feature is registered with ``SEMANTIC_TOKENS_LEGEND`` and ``full=True``.
    The request is dispatched through ``SEMANTIC_TOKENS_FULL_HANDLERS``;
    ``None`` is returned when no handler is registered for the document.
    """
    uri = params.text_document.uri
    return await _dispatch_standard_handler(
        ls,
        doc_uri=uri,
        params=params,
        handler_table=SEMANTIC_TOKENS_FULL_HANDLERS,
        request_type="Semantic tokens request",
    )
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_WILL_SAVE_WAIT_UNTIL)
async def _will_save_wait_until(
    ls: "DebputyLanguageServer",
    params: types.WillSaveTextDocumentParams,
) -> Optional[Sequence[types.TextEdit]]:
    """Answer ``types.TEXT_DOCUMENT_WILL_SAVE_WAIT_UNTIL`` requests.

    The request is dispatched through ``WILL_SAVE_WAIT_UNTIL_HANDLERS``;
    ``None`` is returned when no handler is registered for the document.
    """
    uri = params.text_document.uri
    return await _dispatch_standard_handler(
        ls,
        doc_uri=uri,
        params=params,
        handler_table=WILL_SAVE_WAIT_UNTIL_HANDLERS,
        request_type="On-save formatting",
    )
@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_FORMATTING)
async def _format_document(
    ls: "DebputyLanguageServer",
    # Quoted so the annotation is not evaluated when the module is imported.
    params: "types.DocumentFormattingParams",
) -> Optional[Sequence[types.TextEdit]]:
    """Answer ``types.TEXT_DOCUMENT_FORMATTING`` via ``FORMAT_FILE_HANDLERS``.

    :param ls: The language server the request was delivered to.
    :param params: The document formatting request payload (previously
      mis-annotated as the will-save payload type); only
      ``text_document.uri`` is read here.
    :return: The resolved handler's result, or ``None`` when no handler is
      registered for the document.
    """
    return await _dispatch_standard_handler(
        ls,
        params.text_document.uri,
        params,
        FORMAT_FILE_HANDLERS,
        "Full document formatting",
    )
async def _dispatch_standard_handler(
    ls: "DebputyLanguageServer",
    doc_uri: str,
    params: P,
    handler_table: HandlerDispatchTable[C],
    request_type: str,
) -> Optional[R]:
    """Resolve the handler for the document at *doc_uri* and invoke it.

    The document is fetched from the server workspace, and the handler is
    looked up in *handler_table* by language id and normalized filename. The
    outcome of the lookup is logged via ``_info``, with *request_type* as the
    message prefix. Coroutine-function handlers are awaited; any other handler
    is called directly and its return value passed through unchanged.

    :return: The handler's result, or ``None`` when no handler was resolved.
    """
    doc = ls.workspace.get_text_document(doc_uri)
    id_source, language_id, normalized_filename = ls.determine_language_id(doc)
    handler = _resolve_handler(handler_table, language_id, normalized_filename, _info)

    # Both log messages share this prefix; only the trailing status differs.
    description = (
        f"{request_type} for document: {doc.path} ({language_id}, {id_source},"
        f" normalized filename: {normalized_filename})"
    )
    if handler is None:
        _info(f"{description} - no handler")
        return None
    _info(f"{description} - delegating to handler")

    if not inspect.iscoroutinefunction(handler):
        return handler(ls, params)
    return await handler(ls, params)
def _resolve_handler(
    handler_table: "HandlerDispatchTable[C]",
    language_id: str,
    normalized_filename: str,
    log_func: Callable[[str], None],
) -> "Optional[C]":
    """Pick the handler for a document from *handler_table*.

    Lookup order:

    1. If *language_id* is non-empty and has an entry in *handler_table*, use
       that entry: a filename-specific handler if one is registered for
       *normalized_filename*, otherwise the entry's ``default_handler``.
    2. Otherwise fall back to the entry stored under the empty-string key,
       which is consulted by filename only (no default handler).

    :param handler_table: Mapping from language id to a dispatch entry that
      exposes ``filename_based_lookups`` and ``default_handler``.
    :param language_id: Language id of the document; may be ``""``.
    :param normalized_filename: Filename key used for filename-based lookups.
    :param log_func: Receives one trace message describing the lookup.
    :return: The resolved handler, or ``None`` when nothing matches. A missing
      empty-string fallback entry also yields ``None`` rather than raising
      ``KeyError``.
    """
    dispatch_table = handler_table.get(language_id) if language_id != "" else None
    log_func(f"resolve_handler({language_id=}, {normalized_filename=})")
    if dispatch_table is not None:
        return dispatch_table.filename_based_lookups.get(
            normalized_filename, dispatch_table.default_handler
        )

    # No language-specific entry: only the filename-keyed fallback can match.
    filename_based_table = handler_table.get("")
    if filename_based_table is None:
        return None
    return filename_based_table.filename_based_lookups.get(normalized_filename)