Coverage for src/debputy/lsp/lsp_dispatch.py: 58%

93 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-03-24 16:38 +0000

1import inspect 

2from typing import ( 

3 Sequence, 

4 Union, 

5 Optional, 

6 TypeVar, 

7 List, 

8 TYPE_CHECKING, 

9 Callable, 

10 Literal, 

11) 

12 

13from debputy import __version__ 

14from debputy.linting.lint_util import AbortTaskError 

15from debputy.lsp.lsp_features import ( 

16 DIAGNOSTIC_HANDLERS, 

17 COMPLETER_HANDLERS, 

18 HOVER_HANDLERS, 

19 SEMANTIC_TOKENS_FULL_HANDLERS, 

20 CODE_ACTION_HANDLERS, 

21 SEMANTIC_TOKENS_LEGEND, 

22 WILL_SAVE_WAIT_UNTIL_HANDLERS, 

23 FORMAT_FILE_HANDLERS, 

24 C, 

25 TEXT_DOC_INLAY_HANDLERS, 

26 HandlerDispatchTable, 

27) 

28from debputy.util import _info, _trace_log 

29 

30if TYPE_CHECKING: 

31 import lsprotocol.types as types 

32 

33 try: 

34 from pygls.server import LanguageServer 

35 from pygls.workspace import TextDocument 

36 except ImportError: 

37 pass 

38 

39 from debputy.lsp.debputy_ls import DebputyLanguageServer 

40 

41 DEBPUTY_LANGUAGE_SERVER = DebputyLanguageServer("debputy", f"v{__version__}") 

42else: 

43 import debputy.lsprotocol.types as types 

44 

45 try: 

46 from pygls.server import LanguageServer 

47 from pygls.workspace import TextDocument 

48 from debputy.lsp.debputy_ls import DebputyLanguageServer 

49 

50 DEBPUTY_LANGUAGE_SERVER = DebputyLanguageServer("debputy", f"v{__version__}") 

51 except ImportError: 

52 

53 class Mock: 

54 

55 def feature(self, *args, **kwargs): 

56 return lambda x: x 

57 

58 DEBPUTY_LANGUAGE_SERVER = Mock() 

59 

60 

61P = TypeVar("P") 

62R = TypeVar("R") 

63L = TypeVar("L", "LanguageServer", "DebputyLanguageServer") 

64 

65 

66@DEBPUTY_LANGUAGE_SERVER.feature(types.INITIALIZE) 

67async def _on_initialize( 

68 ls: "DebputyLanguageServer", 

69 params: types.InitializeParams, 

70) -> None: 

71 await ls.on_initialize(params) 

72 

73 

74@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_DID_OPEN) 

75async def _open_document( 

76 ls: "DebputyLanguageServer", 

77 params: types.DidChangeTextDocumentParams, 

78) -> None: 

79 await _open_or_changed_document(ls, params, "Opened") 

80 

81 

82@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_DID_CHANGE) 

83async def _changed_document( 

84 ls: "DebputyLanguageServer", 

85 params: types.DidChangeTextDocumentParams, 

86) -> None: 

87 await _open_or_changed_document(ls, params, "Changed") 

88 

89 

90@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_DID_CLOSE) 

91def _changed_document( 

92 ls: "DebputyLanguageServer", 

93 params: types.DidCloseTextDocumentParams, 

94) -> None: 

95 ls.close_document(params.text_document.uri) 

96 

97 

98async def _open_or_changed_document( 

99 ls: "DebputyLanguageServer", 

100 params: Union[types.DidOpenTextDocumentParams, types.DidChangeTextDocumentParams], 

101 event_name: Literal["Opened", "Changed"], 

102) -> None: 

103 doc_uri = params.text_document.uri 

104 doc = ls.workspace.get_text_document(doc_uri) 

105 

106 await _diagnostics_for( 

107 ls, 

108 doc, 

109 params.text_document.version, 

110 params, 

111 event_name, 

112 ) 

113 

114 

115async def _diagnostics_for( 

116 ls: "DebputyLanguageServer", 

117 doc: "TextDocument", 

118 expected_version: int, 

119 params: Union[ 

120 types.DidOpenTextDocumentParams, 

121 types.DidChangeTextDocumentParams, 

122 types.DocumentDiagnosticParams, 

123 ], 

124 event_name: Literal["Opened", "Changed"], 

125) -> None: 

126 doc_uri = doc.uri 

127 id_source, language_id, normalized_filename = ls.determine_language_id(doc) 

128 log_func = _info if event_name == "Opened" else _trace_log 

129 handler = _resolve_handler( 

130 DIAGNOSTIC_HANDLERS, 

131 language_id, 

132 normalized_filename, 

133 log_func, 

134 ) 

135 if handler is None: 

136 log_func( 

137 f"{event_name} document: {doc.path} ({language_id}, {id_source}," 

138 f" normalized filename: {normalized_filename}) - no diagnostics handler" 

139 ) 

140 return 

141 log_func( 

142 f"{event_name} document: {doc.path} ({language_id}, {id_source}, normalized filename: {normalized_filename})" 

143 f" - running diagnostics for doc version {expected_version}" 

144 ) 

145 

146 try: 

147 diagnostics = await handler(ls, params) 

148 except AbortTaskError as e: 

149 _trace_log(f"Aborted lint task: {e.message}") 

150 return 

151 

152 ls.record_diagnostics(doc_uri, expected_version, diagnostics, False) 

153 

154 

155@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_COMPLETION) 

156async def _completions( 

157 ls: "DebputyLanguageServer", 

158 params: types.CompletionParams, 

159) -> Optional[Union[types.CompletionList, Sequence[types.CompletionItem]]]: 

160 return await _dispatch_standard_handler( 

161 ls, 

162 params.text_document.uri, 

163 params, 

164 COMPLETER_HANDLERS, 

165 "Complete request", 

166 ) 

167 

168 

169@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_HOVER) 

170async def _hover( 

171 ls: "DebputyLanguageServer", 

172 params: types.CompletionParams, 

173) -> Optional[types.Hover]: 

174 return await _dispatch_standard_handler( 

175 ls, 

176 params.text_document.uri, 

177 params, 

178 HOVER_HANDLERS, 

179 "Hover doc request", 

180 ) 

181 

182 

183@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_INLAY_HINT) 

184async def _doc_inlay_hint( 

185 ls: "DebputyLanguageServer", 

186 params: types.InlayHintParams, 

187) -> Optional[List[types.InlayHint]]: 

188 return await _dispatch_standard_handler( 

189 ls, 

190 params.text_document.uri, 

191 params, 

192 TEXT_DOC_INLAY_HANDLERS, 

193 "Inlay hint (doc) request", 

194 ) 

195 

196 

197@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_CODE_ACTION) 

198async def _code_actions( 

199 ls: "DebputyLanguageServer", 

200 params: types.CodeActionParams, 

201) -> Optional[List[Union[types.Command, types.CodeAction]]]: 

202 return await _dispatch_standard_handler( 

203 ls, 

204 params.text_document.uri, 

205 params, 

206 CODE_ACTION_HANDLERS, 

207 "Code action request", 

208 ) 

209 

210 

211@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_FOLDING_RANGE) 

212async def _folding_ranges( 

213 ls: "DebputyLanguageServer", 

214 params: types.FoldingRangeParams, 

215) -> Optional[Sequence[types.FoldingRange]]: 

216 return await _dispatch_standard_handler( 

217 ls, 

218 params.text_document.uri, 

219 params, 

220 HOVER_HANDLERS, 

221 "Folding range request", 

222 ) 

223 

224 

225@DEBPUTY_LANGUAGE_SERVER.feature( 

226 types.TEXT_DOCUMENT_SEMANTIC_TOKENS_FULL, 

227 types.SemanticTokensRegistrationOptions( 

228 SEMANTIC_TOKENS_LEGEND, 

229 full=True, 

230 ), 

231) 

232async def _semantic_tokens_full( 

233 ls: "DebputyLanguageServer", 

234 params: types.SemanticTokensParams, 

235) -> Optional[types.SemanticTokens]: 

236 return await _dispatch_standard_handler( 

237 ls, 

238 params.text_document.uri, 

239 params, 

240 SEMANTIC_TOKENS_FULL_HANDLERS, 

241 "Semantic tokens request", 

242 ) 

243 

244 

245@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_WILL_SAVE_WAIT_UNTIL) 

246async def _will_save_wait_until( 

247 ls: "DebputyLanguageServer", 

248 params: types.WillSaveTextDocumentParams, 

249) -> Optional[Sequence[types.TextEdit]]: 

250 return await _dispatch_standard_handler( 

251 ls, 

252 params.text_document.uri, 

253 params, 

254 WILL_SAVE_WAIT_UNTIL_HANDLERS, 

255 "On-save formatting", 

256 ) 

257 

258 

259@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_FORMATTING) 

260async def _format_document( 

261 ls: "DebputyLanguageServer", 

262 params: types.WillSaveTextDocumentParams, 

263) -> Optional[Sequence[types.TextEdit]]: 

264 return await _dispatch_standard_handler( 

265 ls, 

266 params.text_document.uri, 

267 params, 

268 FORMAT_FILE_HANDLERS, 

269 "Full document formatting", 

270 ) 

271 

272 

273async def _dispatch_standard_handler( 

274 ls: "DebputyLanguageServer", 

275 doc_uri: str, 

276 params: P, 

277 handler_table: HandlerDispatchTable[C], 

278 request_type: str, 

279) -> Optional[R]: 

280 doc = ls.workspace.get_text_document(doc_uri) 

281 

282 id_source, language_id, normalized_filename = ls.determine_language_id(doc) 

283 handler = _resolve_handler(handler_table, language_id, normalized_filename, _info) 

284 if handler is None: 

285 _info( 

286 f"{request_type} for document: {doc.path} ({language_id}, {id_source}," 

287 f" normalized filename: {normalized_filename}) - no handler" 

288 ) 

289 return None 

290 _info( 

291 f"{request_type} for document: {doc.path} ({language_id}, {id_source}," 

292 f" normalized filename: {normalized_filename}) - delegating to handler" 

293 ) 

294 

295 if inspect.iscoroutinefunction(handler): 

296 return await handler(ls, params) 

297 return handler(ls, params) 

298 

299 

300def _resolve_handler( 

301 handler_table: HandlerDispatchTable[C], 

302 language_id: str, 

303 normalized_filename: str, 

304 log_func: Callable[[str], None], 

305) -> Optional[C]: 

306 dispatch_table = handler_table.get(language_id) if language_id != "" else None 

307 log_func(f"resolve_handler({language_id=}, {normalized_filename=})") 

308 if dispatch_table is None: 

309 filename_based_table = handler_table[""] 

310 return filename_based_table.filename_based_lookups.get(normalized_filename) 

311 return dispatch_table.filename_based_lookups.get( 

312 normalized_filename, dispatch_table.default_handler 

313 )