Coverage for src/debputy/lsp/lsp_dispatch.py: 53%
99 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
1import asyncio
2from typing import (
3 Dict,
4 Sequence,
5 Union,
6 Optional,
7 TypeVar,
8 List,
9 TYPE_CHECKING,
10 Callable,
11)
13from debputy import __version__
14from debputy.lsp.lsp_features import (
15 DIAGNOSTIC_HANDLERS,
16 COMPLETER_HANDLERS,
17 HOVER_HANDLERS,
18 SEMANTIC_TOKENS_FULL_HANDLERS,
19 CODE_ACTION_HANDLERS,
20 SEMANTIC_TOKENS_LEGEND,
21 WILL_SAVE_WAIT_UNTIL_HANDLERS,
22 FORMAT_FILE_HANDLERS,
23 C,
24 TEXT_DOC_INLAY_HANDLERS,
25 HandlerDispatchTable,
26)
27from debputy.util import _info
29_DOCUMENT_VERSION_TABLE: Dict[str, int] = {}
32if TYPE_CHECKING:
33 import lsprotocol.types as types
35 try:
36 from pygls.server import LanguageServer
37 from pygls.workspace import TextDocument
38 except ImportError:
39 pass
41 from debputy.lsp.debputy_ls import DebputyLanguageServer
43 DEBPUTY_LANGUAGE_SERVER = DebputyLanguageServer("debputy", f"v{__version__}")
44else:
45 import debputy.lsprotocol.types as types
47 try:
48 from pygls.server import LanguageServer
49 from pygls.workspace import TextDocument
50 from debputy.lsp.debputy_ls import DebputyLanguageServer
52 DEBPUTY_LANGUAGE_SERVER = DebputyLanguageServer("debputy", f"v{__version__}")
53 except ImportError:
55 class Mock:
57 def feature(self, *args, **kwargs):
58 return lambda x: x
60 DEBPUTY_LANGUAGE_SERVER = Mock()
63P = TypeVar("P")
64R = TypeVar("R")
65L = TypeVar("L", "LanguageServer", "DebputyLanguageServer")
68def is_doc_at_version(uri: str, version: int) -> bool:
69 dv = _DOCUMENT_VERSION_TABLE.get(uri)
70 return dv == version
73@DEBPUTY_LANGUAGE_SERVER.feature(types.INITIALIZE)
74async def _on_initialize(
75 ls: "DebputyLanguageServer",
76 params: types.InitializeParams,
77) -> None:
78 await ls.on_initialize(params)
81@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_DID_OPEN)
82async def _open_document(
83 ls: "DebputyLanguageServer",
84 params: types.DidChangeTextDocumentParams,
85) -> None:
86 await _open_or_changed_document(ls, params)
89@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_DID_CHANGE)
90async def _changed_document(
91 ls: "DebputyLanguageServer",
92 params: types.DidChangeTextDocumentParams,
93) -> None:
94 await _open_or_changed_document(ls, params)
97async def _open_or_changed_document(
98 ls: "DebputyLanguageServer",
99 params: Union[types.DidOpenTextDocumentParams, types.DidChangeTextDocumentParams],
100) -> None:
101 doc_uri = params.text_document.uri
102 doc = ls.workspace.get_text_document(doc_uri)
104 def _diag_publisher(
105 uri: str,
106 diagnostics: Optional[List[types.Diagnostic]],
107 version: int,
108 _is_partial: bool,
109 ) -> None:
110 ls.publish_diagnostics(uri, diagnostics, version)
112 await _diagnostics_for(
113 ls,
114 doc,
115 params.text_document.version,
116 params,
117 _diag_publisher,
118 )
121async def _diagnostics_for(
122 ls: "DebputyLanguageServer",
123 doc: "TextDocument",
124 expected_version: int,
125 params: Union[
126 types.DidOpenTextDocumentParams,
127 types.DidChangeTextDocumentParams,
128 types.DocumentDiagnosticParams,
129 ],
130 publisher: Callable[[str, Optional[List["types.Diagnostic"]], int, bool], None],
131) -> None:
132 doc_uri = doc.uri
133 _DOCUMENT_VERSION_TABLE[doc_uri] = expected_version
134 id_source, language_id, normalized_filename = ls.determine_language_id(doc)
135 handler = _resolve_handler(DIAGNOSTIC_HANDLERS, language_id, normalized_filename)
136 if handler is None:
137 _info(
138 f"Opened/Changed document: {doc.path} ({language_id}, {id_source},"
139 f" normalized filename: {normalized_filename}) - no diagnostics handler"
140 )
141 return
142 _info(
143 f"Opened/Changed document: {doc.path} ({language_id}, {id_source}, normalized filename: {normalized_filename})"
144 f" - running diagnostics for doc version {expected_version}"
145 )
146 last_publish_count = -1
148 diagnostics_scanner = handler(ls, params)
149 diagnostics: Optional[List[types.Diagnostic]] = None
150 async for diagnostics in diagnostics_scanner:
151 await asyncio.sleep(0)
152 if not is_doc_at_version(doc_uri, expected_version):
153 # This basically happens with very edit, so lets not notify the client
154 # for that.
155 _info(
156 f"Cancel (obsolete) diagnostics for doc version {expected_version}: document version changed"
157 )
158 break
159 if diagnostics is None or last_publish_count != len(diagnostics):
160 last_publish_count = len(diagnostics) if diagnostics is not None else 0
161 publisher(
162 doc.uri,
163 diagnostics,
164 expected_version,
165 True,
166 )
167 publisher(
168 doc.uri,
169 diagnostics,
170 expected_version,
171 False,
172 )
175@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_COMPLETION)
176def _completions(
177 ls: "DebputyLanguageServer",
178 params: types.CompletionParams,
179) -> Optional[Union[types.CompletionList, Sequence[types.CompletionItem]]]:
180 return _dispatch_standard_handler(
181 ls,
182 params.text_document.uri,
183 params,
184 COMPLETER_HANDLERS,
185 "Complete request",
186 )
189@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_HOVER)
190def _hover(
191 ls: "DebputyLanguageServer",
192 params: types.CompletionParams,
193) -> Optional[types.Hover]:
194 return _dispatch_standard_handler(
195 ls,
196 params.text_document.uri,
197 params,
198 HOVER_HANDLERS,
199 "Hover doc request",
200 )
203@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_INLAY_HINT)
204def _doc_inlay_hint(
205 ls: "DebputyLanguageServer",
206 params: types.InlayHintParams,
207) -> Optional[List[types.InlayHint]]:
208 return _dispatch_standard_handler(
209 ls,
210 params.text_document.uri,
211 params,
212 TEXT_DOC_INLAY_HANDLERS,
213 "Inlay hint (doc) request",
214 )
217@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_CODE_ACTION)
218def _code_actions(
219 ls: "DebputyLanguageServer",
220 params: types.CodeActionParams,
221) -> Optional[List[Union[types.Command, types.CodeAction]]]:
222 return _dispatch_standard_handler(
223 ls,
224 params.text_document.uri,
225 params,
226 CODE_ACTION_HANDLERS,
227 "Code action request",
228 )
231@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_FOLDING_RANGE)
232def _folding_ranges(
233 ls: "DebputyLanguageServer",
234 params: types.FoldingRangeParams,
235) -> Optional[Sequence[types.FoldingRange]]:
236 return _dispatch_standard_handler(
237 ls,
238 params.text_document.uri,
239 params,
240 HOVER_HANDLERS,
241 "Folding range request",
242 )
245@DEBPUTY_LANGUAGE_SERVER.feature(
246 types.TEXT_DOCUMENT_SEMANTIC_TOKENS_FULL,
247 types.SemanticTokensRegistrationOptions(
248 SEMANTIC_TOKENS_LEGEND,
249 full=True,
250 ),
251)
252def _semantic_tokens_full(
253 ls: "DebputyLanguageServer",
254 params: types.SemanticTokensParams,
255) -> Optional[types.SemanticTokens]:
256 return _dispatch_standard_handler(
257 ls,
258 params.text_document.uri,
259 params,
260 SEMANTIC_TOKENS_FULL_HANDLERS,
261 "Semantic tokens request",
262 )
265@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_WILL_SAVE_WAIT_UNTIL)
266def _will_save_wait_until(
267 ls: "DebputyLanguageServer",
268 params: types.WillSaveTextDocumentParams,
269) -> Optional[Sequence[types.TextEdit]]:
270 return _dispatch_standard_handler(
271 ls,
272 params.text_document.uri,
273 params,
274 WILL_SAVE_WAIT_UNTIL_HANDLERS,
275 "On-save formatting",
276 )
279@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_FORMATTING)
280def _format_document(
281 ls: "DebputyLanguageServer",
282 params: types.WillSaveTextDocumentParams,
283) -> Optional[Sequence[types.TextEdit]]:
284 return _dispatch_standard_handler(
285 ls,
286 params.text_document.uri,
287 params,
288 FORMAT_FILE_HANDLERS,
289 "Full document formatting",
290 )
293def _dispatch_standard_handler(
294 ls: "DebputyLanguageServer",
295 doc_uri: str,
296 params: P,
297 handler_table: HandlerDispatchTable[C],
298 request_type: str,
299) -> Optional[R]:
300 doc = ls.workspace.get_text_document(doc_uri)
302 id_source, language_id, normalized_filename = ls.determine_language_id(doc)
303 handler = _resolve_handler(handler_table, language_id, normalized_filename)
304 if handler is None:
305 _info(
306 f"{request_type} for document: {doc.path} ({language_id}, {id_source},"
307 f" normalized filename: {normalized_filename}) - no handler"
308 )
309 return None
310 _info(
311 f"{request_type} for document: {doc.path} ({language_id}, {id_source},"
312 f" normalized filename: {normalized_filename}) - delegating to handler"
313 )
315 return handler(
316 ls,
317 params,
318 )
321def _resolve_handler(
322 handler_table: HandlerDispatchTable[C],
323 language_id: str,
324 normalized_filename: str,
325) -> Optional[C]:
326 dispatch_table = handler_table.get(language_id) if language_id != "" else None
327 _info(f"LID: {language_id} - {dispatch_table}")
328 if dispatch_table is None:
329 filename_based_table = handler_table[""]
330 return filename_based_table.filename_based_lookups.get(normalized_filename)
331 return dispatch_table.filename_based_lookups.get(
332 normalized_filename, dispatch_table.default_handler
333 )