Coverage for src/debputy/lsp/lsp_dispatch.py: 45%

104 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-10-12 15:06 +0000

1import inspect 

2import os.path 

3from typing import ( 

4 Union, 

5 Optional, 

6 TypeVar, 

7 List, 

8 TYPE_CHECKING, 

9 Literal, 

10) 

11from collections.abc import Sequence, Callable 

12 

13from debputy import __version__ 

14from debputy.linting.lint_util import AbortTaskError 

15from debputy.lsp.lsp_features import ( 

16 LSP_DIAGNOSTIC_HANDLERS, 

17 COMPLETER_HANDLERS, 

18 HOVER_HANDLERS, 

19 SEMANTIC_TOKENS_FULL_HANDLERS, 

20 CODE_ACTION_HANDLERS, 

21 SEMANTIC_TOKENS_LEGEND, 

22 WILL_SAVE_WAIT_UNTIL_HANDLERS, 

23 LSP_FORMAT_FILE_HANDLERS, 

24 C, 

25 TEXT_DOC_INLAY_HANDLERS, 

26 HandlerDispatchTable, 

27 DOCUMENT_LINK_HANDLERS, 

28) 

29from debputy.util import _info, _trace_log 

30 

31if TYPE_CHECKING: 

32 import lsprotocol.types as types 

33 

34 try: 

35 from pygls.server import LanguageServer 

36 from pygls.workspace import TextDocument 

37 except ImportError: 

38 pass 

39 

40 from debputy.lsp.debputy_ls import DebputyLanguageServer 

41 

42 DEBPUTY_LANGUAGE_SERVER = DebputyLanguageServer("debputy", f"v{__version__}") 

43else: 

44 import debputy.lsprotocol.types as types 

45 

46 try: 

47 from pygls.server import LanguageServer 

48 from pygls.workspace import TextDocument 

49 from debputy.lsp.debputy_ls import DebputyLanguageServer 

50 

51 DEBPUTY_LANGUAGE_SERVER = DebputyLanguageServer("debputy", f"v{__version__}") 

52 except ImportError: 

53 

54 class Mock: 

55 

56 def feature(self, *args, **kwargs): 

57 return lambda x: x 

58 

59 DEBPUTY_LANGUAGE_SERVER = Mock() 

60 

61 

62P = TypeVar("P") 

63R = TypeVar("R") 

64L = TypeVar("L", "LanguageServer", "DebputyLanguageServer") 

65 

66 

67@DEBPUTY_LANGUAGE_SERVER.feature(types.INITIALIZE) 

68async def _on_initialize( 

69 ls: "DebputyLanguageServer", 

70 params: types.InitializeParams, 

71) -> None: 

72 await ls.on_initialize(params) 

73 

74 

75@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_DID_OPEN) 

76async def _open_document( 

77 ls: "DebputyLanguageServer", 

78 params: types.DidChangeTextDocumentParams, 

79) -> None: 

80 await _open_or_changed_document(ls, params, "Opened") 

81 

82 

83@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_DID_CHANGE) 

84async def _changed_document( 

85 ls: "DebputyLanguageServer", 

86 params: types.DidChangeTextDocumentParams, 

87) -> None: 

88 await _open_or_changed_document(ls, params, "Changed") 

89 

90 

91@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_DID_CLOSE) 

92def _close_document( 

93 ls: "DebputyLanguageServer", 

94 params: types.DidCloseTextDocumentParams, 

95) -> None: 

96 ls.close_document(params.text_document.uri) 

97 

98 

99async def _open_or_changed_document( 

100 ls: "DebputyLanguageServer", 

101 params: types.DidOpenTextDocumentParams | types.DidChangeTextDocumentParams, 

102 event_name: Literal["Opened", "Changed"], 

103) -> None: 

104 doc_uri = params.text_document.uri 

105 doc = ls.workspace.get_text_document(doc_uri) 

106 

107 await _diagnostics_for( 

108 ls, 

109 doc, 

110 params.text_document.version, 

111 params, 

112 event_name, 

113 ) 

114 

115 

116async def _diagnostics_for( 

117 ls: "DebputyLanguageServer", 

118 doc: "TextDocument", 

119 expected_version: int, 

120 params: ( 

121 types.DidOpenTextDocumentParams 

122 | types.DidChangeTextDocumentParams 

123 | types.DocumentDiagnosticParams 

124 ), 

125 event_name: Literal["Opened", "Changed"], 

126) -> None: 

127 doc_uri = doc.uri 

128 id_source, language_id, normalized_filename = ls.determine_language_id(doc) 

129 log_func = _info if event_name == "Opened" else _trace_log 

130 handler = _resolve_handler( 

131 LSP_DIAGNOSTIC_HANDLERS, 

132 language_id, 

133 normalized_filename, 

134 log_func, 

135 ) 

136 if handler is None: 

137 log_func( 

138 f"{event_name} document: {doc.path} ({language_id}, {id_source}," 

139 f" normalized filename: {normalized_filename}) - no diagnostics handler" 

140 ) 

141 return 

142 log_func( 

143 f"{event_name} document: {doc.path} ({language_id}, {id_source}, normalized filename: {normalized_filename})" 

144 f" - running diagnostics for doc version {expected_version}" 

145 ) 

146 

147 try: 

148 diagnostics = await handler(ls, params) 

149 except AbortTaskError as e: 

150 _trace_log(f"Aborted lint task: {e.message}") 

151 return 

152 

153 ls.record_diagnostics(doc_uri, expected_version, diagnostics or [], False) 

154 

155 

156@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_COMPLETION) 

157async def _completions( 

158 ls: "DebputyLanguageServer", 

159 params: types.CompletionParams, 

160) -> types.CompletionList | Sequence[types.CompletionItem] | None: 

161 return await _dispatch_standard_handler( 

162 ls, 

163 params.text_document.uri, 

164 params, 

165 COMPLETER_HANDLERS, 

166 "Complete request", 

167 ) 

168 

169 

170@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_HOVER) 

171async def _hover( 

172 ls: "DebputyLanguageServer", 

173 params: types.CompletionParams, 

174) -> types.Hover | None: 

175 return await _dispatch_standard_handler( 

176 ls, 

177 params.text_document.uri, 

178 params, 

179 HOVER_HANDLERS, 

180 "Hover doc request", 

181 ) 

182 

183 

184@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_INLAY_HINT) 

185async def _doc_inlay_hint( 

186 ls: "DebputyLanguageServer", 

187 params: types.InlayHintParams, 

188) -> list[types.InlayHint] | None: 

189 return await _dispatch_standard_handler( 

190 ls, 

191 params.text_document.uri, 

192 params, 

193 TEXT_DOC_INLAY_HANDLERS, 

194 "Inlay hint (doc) request", 

195 ) 

196 

197 

198@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_CODE_ACTION) 

199async def _code_actions( 

200 ls: "DebputyLanguageServer", 

201 params: types.CodeActionParams, 

202) -> list[types.Command | types.CodeAction] | None: 

203 return await _dispatch_standard_handler( 

204 ls, 

205 params.text_document.uri, 

206 params, 

207 CODE_ACTION_HANDLERS, 

208 "Code action request", 

209 ) 

210 

211 

212@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_FOLDING_RANGE) 

213async def _folding_ranges( 

214 ls: "DebputyLanguageServer", 

215 params: types.FoldingRangeParams, 

216) -> Sequence[types.FoldingRange] | None: 

217 return await _dispatch_standard_handler( 

218 ls, 

219 params.text_document.uri, 

220 params, 

221 HOVER_HANDLERS, 

222 "Folding range request", 

223 ) 

224 

225 

226@DEBPUTY_LANGUAGE_SERVER.feature( 

227 types.TEXT_DOCUMENT_SEMANTIC_TOKENS_FULL, 

228 types.SemanticTokensRegistrationOptions( 

229 SEMANTIC_TOKENS_LEGEND, 

230 full=True, 

231 ), 

232) 

233async def semantic_tokens_full( 

234 ls: "DebputyLanguageServer", 

235 params: types.SemanticTokensParams, 

236) -> types.SemanticTokens | None: 

237 return await _dispatch_standard_handler( 

238 ls, 

239 params.text_document.uri, 

240 params, 

241 SEMANTIC_TOKENS_FULL_HANDLERS, 

242 "Semantic tokens request", 

243 ) 

244 

245 

246@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_WILL_SAVE_WAIT_UNTIL) 

247async def _will_save_wait_until( 

248 ls: "DebputyLanguageServer", 

249 params: types.WillSaveTextDocumentParams, 

250) -> Sequence[types.TextEdit] | None: 

251 return await _dispatch_standard_handler( 

252 ls, 

253 params.text_document.uri, 

254 params, 

255 WILL_SAVE_WAIT_UNTIL_HANDLERS, 

256 "On-save formatting", 

257 ) 

258 

259 

260@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_FORMATTING) 

261async def _format_document( 

262 ls: "DebputyLanguageServer", 

263 params: types.WillSaveTextDocumentParams, 

264) -> Sequence[types.TextEdit] | None: 

265 return await _dispatch_standard_handler( 

266 ls, 

267 params.text_document.uri, 

268 params, 

269 LSP_FORMAT_FILE_HANDLERS, 

270 "Full document formatting", 

271 ) 

272 

273 

274@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_DOCUMENT_LINK) 

275async def _document_link( 

276 ls: "DebputyLanguageServer", 

277 params: types.DocumentLinkParams, 

278) -> Optional[Sequence[types.DocumentLink]]: 

279 return await _dispatch_standard_handler( 

280 ls, 

281 params.text_document.uri, 

282 params, 

283 DOCUMENT_LINK_HANDLERS, 

284 "Document links request", 

285 ) 

286 

287 

288async def _dispatch_standard_handler( 

289 ls: "DebputyLanguageServer", 

290 doc_uri: str, 

291 params: P, 

292 handler_table: HandlerDispatchTable[C], 

293 request_type: str, 

294) -> R | None: 

295 doc = ls.workspace.get_text_document(doc_uri) 

296 

297 id_source, language_id, normalized_filename = ls.determine_language_id(doc) 

298 handler = _resolve_handler(handler_table, language_id, normalized_filename, _info) 

299 if handler is None: 

300 _info( 

301 f"{request_type} for document: {doc.path} ({language_id}, {id_source}," 

302 f" normalized filename: {normalized_filename}) - no handler" 

303 ) 

304 return None 

305 _info( 

306 f"{request_type} for document: {doc.path} ({language_id}, {id_source}," 

307 f" normalized filename: {normalized_filename}) - delegating to handler {handler.__qualname__}" 

308 ) 

309 

310 if inspect.iscoroutinefunction(handler): 

311 return await handler(ls, params) 

312 return handler(ls, params) 

313 

314 

315def _resolve_handler( 

316 handler_table: HandlerDispatchTable[C], 

317 language_id: str, 

318 normalized_filename: str, 

319 log_func: Callable[[str], None], 

320) -> C | None: 

321 dispatch_table = handler_table.get(language_id) if language_id != "" else None 

322 log_func(f"resolve_handler({language_id=}, {normalized_filename=})") 

323 if dispatch_table is None: 

324 filename_based_table = handler_table[""] 

325 m = filename_based_table.basename_based_lookups.get( 

326 os.path.basename(normalized_filename) 

327 ) 

328 if m is not None: 

329 return m 

330 return filename_based_table.path_name_based_lookups.get(normalized_filename) 

331 m = dispatch_table.basename_based_lookups.get( 

332 os.path.basename(normalized_filename), 

333 dispatch_table.default_handler, 

334 ) 

335 if m is not None: 

336 return m 

337 return dispatch_table.path_name_based_lookups.get( 

338 normalized_filename, dispatch_table.default_handler 

339 )