Coverage for src/debputy/lsp/lsp_dispatch.py: 44%

100 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-09-07 09:27 +0000

1import inspect 

2import os.path 

3from typing import ( 

4 Sequence, 

5 Union, 

6 Optional, 

7 TypeVar, 

8 List, 

9 TYPE_CHECKING, 

10 Callable, 

11 Literal, 

12) 

13 

14from debputy import __version__ 

15from debputy.linting.lint_util import AbortTaskError 

16from debputy.lsp.lsp_features import ( 

17 LSP_DIAGNOSTIC_HANDLERS, 

18 COMPLETER_HANDLERS, 

19 HOVER_HANDLERS, 

20 SEMANTIC_TOKENS_FULL_HANDLERS, 

21 CODE_ACTION_HANDLERS, 

22 SEMANTIC_TOKENS_LEGEND, 

23 WILL_SAVE_WAIT_UNTIL_HANDLERS, 

24 LSP_FORMAT_FILE_HANDLERS, 

25 C, 

26 TEXT_DOC_INLAY_HANDLERS, 

27 HandlerDispatchTable, 

28) 

29from debputy.util import _info, _trace_log 

30 

# Module bootstrap: bind `types` and create the `DEBPUTY_LANGUAGE_SERVER`
# object that the `@DEBPUTY_LANGUAGE_SERVER.feature(...)` decorators below
# register against. Static type checkers and the runtime take different paths.
if TYPE_CHECKING:
    # Type-checker-only branch: use the upstream protocol types directly.
    import lsprotocol.types as types

    try:
        from pygls.server import LanguageServer
        from pygls.workspace import TextDocument
    except ImportError:
        pass

    from debputy.lsp.debputy_ls import DebputyLanguageServer

    DEBPUTY_LANGUAGE_SERVER = DebputyLanguageServer("debputy", f"v{__version__}")
else:
    # Runtime branch: `types` comes from the project-local module instead.
    # NOTE(review): presumably a shim around `lsprotocol.types` - confirm.
    import debputy.lsprotocol.types as types

    try:
        from pygls.server import LanguageServer
        from pygls.workspace import TextDocument
        from debputy.lsp.debputy_ls import DebputyLanguageServer

        DEBPUTY_LANGUAGE_SERVER = DebputyLanguageServer("debputy", f"v{__version__}")
    except ImportError:
        # Any import above failing lands here. The stand-in below keeps the
        # module importable: its `feature(...)` returns an identity decorator,
        # so the handler functions in this module are defined unchanged but
        # are not registered with any real server.

        class Mock:

            def feature(self, *args, **kwargs):
                # Accept and ignore the registration arguments.
                return lambda x: x

        DEBPUTY_LANGUAGE_SERVER = Mock()

60 

61 

# Type variables used in annotations in this module.
# `P`: type of the request parameters handed through to a handler.
P = TypeVar("P")
# `R`: type of the value a handler returns.
R = TypeVar("R")
# `L`: constrained to the two language server classes (forward references,
# since both classes may be unavailable at runtime).
L = TypeVar("L", "LanguageServer", "DebputyLanguageServer")

65 

66 

@DEBPUTY_LANGUAGE_SERVER.feature(types.INITIALIZE)
async def _on_initialize(
    ls: "DebputyLanguageServer",
    params: types.InitializeParams,
) -> None:
    """Forward the ``INITIALIZE`` request to the language server object.

    :param ls: The language server the request arrived on.
    :param params: The initialization parameters sent by the client; passed
      unchanged to ``ls.on_initialize``.
    """
    await ls.on_initialize(params)

73 

74 

@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_DID_OPEN)
async def _open_document(
    ls: "DebputyLanguageServer",
    params: types.DidOpenTextDocumentParams,
) -> None:
    """Run diagnostics for a document that the client has just opened.

    The handler is registered for ``TEXT_DOCUMENT_DID_OPEN``, so the
    parameters are annotated as ``DidOpenTextDocumentParams`` (the "did open"
    notification payload) rather than the "did change" payload.

    :param ls: The language server the notification arrived on.
    :param params: The "did open" notification parameters.
    """
    await _open_or_changed_document(ls, params, "Opened")

81 

82 

@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_DID_CHANGE)
async def _changed_document(
    ls: "DebputyLanguageServer",
    params: types.DidChangeTextDocumentParams,
) -> None:
    """Run diagnostics for a document whose content the client has changed.

    :param ls: The language server the notification arrived on.
    :param params: The "did change" notification parameters.
    """
    await _open_or_changed_document(ls, params, "Changed")

89 

90 

@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_DID_CLOSE)
def _close_document(
    ls: "DebputyLanguageServer",
    params: types.DidCloseTextDocumentParams,
) -> None:
    """Tell the language server that the client closed a document.

    :param ls: The language server the notification arrived on.
    :param params: The "did close" notification parameters; only the URI of
      the document is used.
    """
    closed_uri = params.text_document.uri
    ls.close_document(closed_uri)

97 

98 

async def _open_or_changed_document(
    ls: "DebputyLanguageServer",
    params: Union[types.DidOpenTextDocumentParams, types.DidChangeTextDocumentParams],
    event_name: Literal["Opened", "Changed"],
) -> None:
    """Shared implementation of the "did open" and "did change" handlers.

    Looks the document up in the server workspace by URI and runs diagnostics
    for the document version named in the notification.

    :param ls: The language server the notification arrived on.
    :param params: The notification parameters (either kind).
    :param event_name: Which event triggered the call; used for logging.
    """
    text_document = params.text_document
    workspace_doc = ls.workspace.get_text_document(text_document.uri)
    await _diagnostics_for(
        ls,
        workspace_doc,
        text_document.version,
        params,
        event_name,
    )

114 

115 

async def _diagnostics_for(
    ls: "DebputyLanguageServer",
    doc: "TextDocument",
    expected_version: int,
    params: Union[
        types.DidOpenTextDocumentParams,
        types.DidChangeTextDocumentParams,
        types.DocumentDiagnosticParams,
    ],
    event_name: Literal["Opened", "Changed"],
) -> None:
    """Run the diagnostics handler for a document and record its result.

    The handler is resolved from ``LSP_DIAGNOSTIC_HANDLERS`` using the
    language ID and normalized filename the server determines for ``doc``.
    Without a handler the function only logs and returns. If the handler
    raises ``AbortTaskError`` the abort is trace-logged and nothing is
    recorded.

    :param ls: The language server that owns the document.
    :param doc: The workspace document to produce diagnostics for.
    :param expected_version: The document version the diagnostics are for;
      handed to ``ls.record_diagnostics``.
    :param params: The request/notification parameters, passed unchanged to
      the handler.
    :param event_name: Which event triggered the call. "Opened" events are
      logged via ``_info``; anything else via ``_trace_log``.
    """
    uri = doc.uri
    id_source, language_id, normalized_filename = ls.determine_language_id(doc)

    if event_name == "Opened":
        log_func = _info
    else:
        log_func = _trace_log

    handler = _resolve_handler(
        LSP_DIAGNOSTIC_HANDLERS,
        language_id,
        normalized_filename,
        log_func,
    )

    # Both log messages share this description of the document.
    description = (
        f"{event_name} document: {doc.path} ({language_id}, {id_source},"
        f" normalized filename: {normalized_filename})"
    )
    if handler is None:
        log_func(f"{description} - no diagnostics handler")
        return
    log_func(
        f"{description} - running diagnostics for doc version {expected_version}"
    )

    try:
        diagnostics = await handler(ls, params)
    except AbortTaskError as e:
        _trace_log(f"Aborted lint task: {e.message}")
    else:
        # A falsy handler result is recorded as an empty list. The trailing
        # `False` is passed through as-is; its meaning is defined by
        # `record_diagnostics`.
        ls.record_diagnostics(uri, expected_version, diagnostics or [], False)

154 

155 

@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_COMPLETION)
async def _completions(
    ls: "DebputyLanguageServer",
    params: types.CompletionParams,
) -> Optional[Union[types.CompletionList, Sequence[types.CompletionItem]]]:
    """Answer a completion request via the handler registered for the document.

    :param ls: The language server the request arrived on.
    :param params: The completion request parameters.
    :returns: Whatever the resolved handler from ``COMPLETER_HANDLERS``
      returns, or ``None`` when no handler applies to the document.
    """
    uri = params.text_document.uri
    return await _dispatch_standard_handler(
        ls, uri, params, COMPLETER_HANDLERS, "Complete request"
    )

168 

169 

@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_HOVER)
async def _hover(
    ls: "DebputyLanguageServer",
    # Hover requests carry `HoverParams`, not `CompletionParams`. The
    # annotation is quoted so it is never evaluated when the module is
    # imported.
    params: "types.HoverParams",
) -> Optional[types.Hover]:
    """Answer a hover request via the handler registered for the document.

    :param ls: The language server the request arrived on.
    :param params: The hover request parameters.
    :returns: Whatever the resolved handler from ``HOVER_HANDLERS`` returns,
      or ``None`` when no handler applies to the document.
    """
    return await _dispatch_standard_handler(
        ls,
        params.text_document.uri,
        params,
        HOVER_HANDLERS,
        "Hover doc request",
    )

182 

183 

@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_INLAY_HINT)
async def _doc_inlay_hint(
    ls: "DebputyLanguageServer",
    params: types.InlayHintParams,
) -> Optional[List[types.InlayHint]]:
    """Answer an inlay hint request via the handler registered for the document.

    :param ls: The language server the request arrived on.
    :param params: The inlay hint request parameters.
    :returns: Whatever the resolved handler from ``TEXT_DOC_INLAY_HANDLERS``
      returns, or ``None`` when no handler applies to the document.
    """
    uri = params.text_document.uri
    return await _dispatch_standard_handler(
        ls, uri, params, TEXT_DOC_INLAY_HANDLERS, "Inlay hint (doc) request"
    )

196 

197 

@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_CODE_ACTION)
async def _code_actions(
    ls: "DebputyLanguageServer",
    params: types.CodeActionParams,
) -> Optional[List[Union[types.Command, types.CodeAction]]]:
    """Answer a code action request via the handler registered for the document.

    :param ls: The language server the request arrived on.
    :param params: The code action request parameters.
    :returns: Whatever the resolved handler from ``CODE_ACTION_HANDLERS``
      returns, or ``None`` when no handler applies to the document.
    """
    uri = params.text_document.uri
    return await _dispatch_standard_handler(
        ls, uri, params, CODE_ACTION_HANDLERS, "Code action request"
    )

210 

211 

@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_FOLDING_RANGE)
async def _folding_ranges(
    ls: "DebputyLanguageServer",
    params: types.FoldingRangeParams,
) -> Optional[Sequence[types.FoldingRange]]:
    """Answer a folding range request via the handler registered for the document.

    This used to dispatch through ``HOVER_HANDLERS``, which would hand
    folding range parameters to a hover handler. It now dispatches through
    the folding range table.

    :param ls: The language server the request arrived on.
    :param params: The folding range request parameters.
    :returns: Whatever the resolved folding range handler returns, or
      ``None`` when no handler applies to the document.
    """
    # NOTE(review): assumes `debputy.lsp.lsp_features` exposes the folding
    # range dispatch table under this name - confirm. Imported locally
    # because the module-level import list does not include it.
    from debputy.lsp.lsp_features import FOLDING_RANGE_HANDLERS

    return await _dispatch_standard_handler(
        ls,
        params.text_document.uri,
        params,
        FOLDING_RANGE_HANDLERS,
        "Folding range request",
    )

224 

225 

@DEBPUTY_LANGUAGE_SERVER.feature(
    types.TEXT_DOCUMENT_SEMANTIC_TOKENS_FULL,
    types.SemanticTokensRegistrationOptions(
        SEMANTIC_TOKENS_LEGEND,
        full=True,
    ),
)
async def semantic_tokens_full(
    ls: "DebputyLanguageServer",
    params: types.SemanticTokensParams,
) -> Optional[types.SemanticTokens]:
    """Answer a full-document semantic tokens request.

    The feature is registered with ``SEMANTIC_TOKENS_LEGEND`` and
    ``full=True`` as its registration options.

    :param ls: The language server the request arrived on.
    :param params: The semantic tokens request parameters.
    :returns: Whatever the resolved handler from
      ``SEMANTIC_TOKENS_FULL_HANDLERS`` returns, or ``None`` when no handler
      applies to the document.
    """
    uri = params.text_document.uri
    return await _dispatch_standard_handler(
        ls, uri, params, SEMANTIC_TOKENS_FULL_HANDLERS, "Semantic tokens request"
    )

244 

245 

@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_WILL_SAVE_WAIT_UNTIL)
async def _will_save_wait_until(
    ls: "DebputyLanguageServer",
    params: types.WillSaveTextDocumentParams,
) -> Optional[Sequence[types.TextEdit]]:
    """Answer a "will save, wait until" request (on-save formatting).

    :param ls: The language server the request arrived on.
    :param params: The will-save request parameters.
    :returns: Whatever the resolved handler from
      ``WILL_SAVE_WAIT_UNTIL_HANDLERS`` returns, or ``None`` when no handler
      applies to the document.
    """
    uri = params.text_document.uri
    return await _dispatch_standard_handler(
        ls, uri, params, WILL_SAVE_WAIT_UNTIL_HANDLERS, "On-save formatting"
    )

258 

259 

@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_FORMATTING)
async def _format_document(
    ls: "DebputyLanguageServer",
    # Formatting requests carry `DocumentFormattingParams`, not the will-save
    # payload. The annotation is quoted so it is never evaluated when the
    # module is imported.
    params: "types.DocumentFormattingParams",
) -> Optional[Sequence[types.TextEdit]]:
    """Answer a full-document formatting request.

    :param ls: The language server the request arrived on.
    :param params: The document formatting request parameters.
    :returns: Whatever the resolved handler from ``LSP_FORMAT_FILE_HANDLERS``
      returns, or ``None`` when no handler applies to the document.
    """
    return await _dispatch_standard_handler(
        ls,
        params.text_document.uri,
        params,
        LSP_FORMAT_FILE_HANDLERS,
        "Full document formatting",
    )

272 

273 

async def _dispatch_standard_handler(
    ls: "DebputyLanguageServer",
    doc_uri: str,
    params: P,
    handler_table: HandlerDispatchTable[C],
    request_type: str,
) -> Optional[R]:
    """Resolve the handler for a document and invoke it with the request.

    The document is fetched from the server workspace, its language ID and
    normalized filename are determined by the server, and the matching
    handler is looked up in ``handler_table``. Both coroutine functions and
    plain callables are supported as handlers.

    :param ls: The language server the request arrived on.
    :param doc_uri: URI of the document the request concerns.
    :param params: The request parameters, passed unchanged to the handler.
    :param handler_table: The dispatch table for this kind of request.
    :param request_type: Human-readable request name; used in log messages.
    :returns: The handler's result, or ``None`` when no handler was found.
    """
    doc = ls.workspace.get_text_document(doc_uri)
    id_source, language_id, normalized_filename = ls.determine_language_id(doc)
    handler = _resolve_handler(handler_table, language_id, normalized_filename, _info)

    # Both log messages share this description of the request.
    context = (
        f"{request_type} for document: {doc.path} ({language_id}, {id_source},"
        f" normalized filename: {normalized_filename})"
    )
    if handler is None:
        _info(f"{context} - no handler")
        return None
    _info(f"{context} - delegating to handler {handler.__qualname__}")

    outcome = handler(ls, params)
    # Only handlers defined as coroutine functions are awaited; the result of
    # any other callable is returned as-is.
    if inspect.iscoroutinefunction(handler):
        return await outcome
    return outcome

299 

300 

def _resolve_handler(
    handler_table: HandlerDispatchTable[C],
    language_id: str,
    normalized_filename: str,
    log_func: Callable[[str], None],
) -> Optional[C]:
    """Find the handler for a document in a dispatch table.

    When ``handler_table`` has an entry for a non-empty ``language_id``, that
    entry is searched in this order: basename of ``normalized_filename``,
    then the full ``normalized_filename``, then the entry's
    ``default_handler``. Previously the default handler was consulted before
    the path-based lookups and therefore shadowed them whenever it was set.

    Otherwise (empty or unknown language ID) the entry stored under the ``""``
    key is searched by basename and then by full name; its default handler is
    not consulted.

    :param handler_table: Mapping from language ID to a rule object exposing
      ``basename_based_lookups``, ``path_name_based_lookups`` and
      ``default_handler``.
    :param language_id: The language ID determined for the document; may be
      the empty string.
    :param normalized_filename: The normalized filename of the document.
    :param log_func: Receives a single log line describing the lookup.
    :returns: The matching handler, or ``None`` when nothing matches.
    """
    dispatch_table = handler_table.get(language_id) if language_id != "" else None
    log_func(f"resolve_handler({language_id=}, {normalized_filename=})")

    if dispatch_table is None:
        # No language specific rule: fall back to the purely filename based
        # rule, which never falls back to a default handler.
        dispatch_table = handler_table[""]
        fallback_handler = None
    else:
        fallback_handler = dispatch_table.default_handler

    by_basename = dispatch_table.basename_based_lookups.get(
        os.path.basename(normalized_filename)
    )
    if by_basename is not None:
        return by_basename
    # The default is only used once both specific lookups have missed.
    return dispatch_table.path_name_based_lookups.get(
        normalized_filename,
        fallback_handler,
    )