Coverage for src/debputy/lsp/lsp_dispatch.py: 53%

99 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-01-27 13:59 +0000

1import asyncio 

2from typing import ( 

3 Dict, 

4 Sequence, 

5 Union, 

6 Optional, 

7 TypeVar, 

8 List, 

9 TYPE_CHECKING, 

10 Callable, 

11) 

12 

13from debputy import __version__ 

14from debputy.lsp.lsp_features import ( 

15 DIAGNOSTIC_HANDLERS, 

16 COMPLETER_HANDLERS, 

17 HOVER_HANDLERS, 

18 SEMANTIC_TOKENS_FULL_HANDLERS, 

19 CODE_ACTION_HANDLERS, 

20 SEMANTIC_TOKENS_LEGEND, 

21 WILL_SAVE_WAIT_UNTIL_HANDLERS, 

22 FORMAT_FILE_HANDLERS, 

23 C, 

24 TEXT_DOC_INLAY_HANDLERS, 

25 HandlerDispatchTable, 

26) 

27from debputy.util import _info 

28 

29_DOCUMENT_VERSION_TABLE: Dict[str, int] = {} 

30 

31 

32if TYPE_CHECKING: 

33 import lsprotocol.types as types 

34 

35 try: 

36 from pygls.server import LanguageServer 

37 from pygls.workspace import TextDocument 

38 except ImportError: 

39 pass 

40 

41 from debputy.lsp.debputy_ls import DebputyLanguageServer 

42 

43 DEBPUTY_LANGUAGE_SERVER = DebputyLanguageServer("debputy", f"v{__version__}") 

44else: 

45 import debputy.lsprotocol.types as types 

46 

47 try: 

48 from pygls.server import LanguageServer 

49 from pygls.workspace import TextDocument 

50 from debputy.lsp.debputy_ls import DebputyLanguageServer 

51 

52 DEBPUTY_LANGUAGE_SERVER = DebputyLanguageServer("debputy", f"v{__version__}") 

53 except ImportError: 

54 

55 class Mock: 

56 

57 def feature(self, *args, **kwargs): 

58 return lambda x: x 

59 

60 DEBPUTY_LANGUAGE_SERVER = Mock() 

61 

62 

63P = TypeVar("P") 

64R = TypeVar("R") 

65L = TypeVar("L", "LanguageServer", "DebputyLanguageServer") 

66 

67 

68def is_doc_at_version(uri: str, version: int) -> bool: 

69 dv = _DOCUMENT_VERSION_TABLE.get(uri) 

70 return dv == version 

71 

72 

73@DEBPUTY_LANGUAGE_SERVER.feature(types.INITIALIZE) 

74async def _on_initialize( 

75 ls: "DebputyLanguageServer", 

76 params: types.InitializeParams, 

77) -> None: 

78 await ls.on_initialize(params) 

79 

80 

81@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_DID_OPEN) 

82async def _open_document( 

83 ls: "DebputyLanguageServer", 

84 params: types.DidChangeTextDocumentParams, 

85) -> None: 

86 await _open_or_changed_document(ls, params) 

87 

88 

89@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_DID_CHANGE) 

90async def _changed_document( 

91 ls: "DebputyLanguageServer", 

92 params: types.DidChangeTextDocumentParams, 

93) -> None: 

94 await _open_or_changed_document(ls, params) 

95 

96 

97async def _open_or_changed_document( 

98 ls: "DebputyLanguageServer", 

99 params: Union[types.DidOpenTextDocumentParams, types.DidChangeTextDocumentParams], 

100) -> None: 

101 doc_uri = params.text_document.uri 

102 doc = ls.workspace.get_text_document(doc_uri) 

103 

104 def _diag_publisher( 

105 uri: str, 

106 diagnostics: Optional[List[types.Diagnostic]], 

107 version: int, 

108 _is_partial: bool, 

109 ) -> None: 

110 ls.publish_diagnostics(uri, diagnostics, version) 

111 

112 await _diagnostics_for( 

113 ls, 

114 doc, 

115 params.text_document.version, 

116 params, 

117 _diag_publisher, 

118 ) 

119 

120 

121async def _diagnostics_for( 

122 ls: "DebputyLanguageServer", 

123 doc: "TextDocument", 

124 expected_version: int, 

125 params: Union[ 

126 types.DidOpenTextDocumentParams, 

127 types.DidChangeTextDocumentParams, 

128 types.DocumentDiagnosticParams, 

129 ], 

130 publisher: Callable[[str, Optional[List["types.Diagnostic"]], int, bool], None], 

131) -> None: 

132 doc_uri = doc.uri 

133 _DOCUMENT_VERSION_TABLE[doc_uri] = expected_version 

134 id_source, language_id, normalized_filename = ls.determine_language_id(doc) 

135 handler = _resolve_handler(DIAGNOSTIC_HANDLERS, language_id, normalized_filename) 

136 if handler is None: 

137 _info( 

138 f"Opened/Changed document: {doc.path} ({language_id}, {id_source}," 

139 f" normalized filename: {normalized_filename}) - no diagnostics handler" 

140 ) 

141 return 

142 _info( 

143 f"Opened/Changed document: {doc.path} ({language_id}, {id_source}, normalized filename: {normalized_filename})" 

144 f" - running diagnostics for doc version {expected_version}" 

145 ) 

146 last_publish_count = -1 

147 

148 diagnostics_scanner = handler(ls, params) 

149 diagnostics: Optional[List[types.Diagnostic]] = None 

150 async for diagnostics in diagnostics_scanner: 

151 await asyncio.sleep(0) 

152 if not is_doc_at_version(doc_uri, expected_version): 

153 # This basically happens with very edit, so lets not notify the client 

154 # for that. 

155 _info( 

156 f"Cancel (obsolete) diagnostics for doc version {expected_version}: document version changed" 

157 ) 

158 break 

159 if diagnostics is None or last_publish_count != len(diagnostics): 

160 last_publish_count = len(diagnostics) if diagnostics is not None else 0 

161 publisher( 

162 doc.uri, 

163 diagnostics, 

164 expected_version, 

165 True, 

166 ) 

167 publisher( 

168 doc.uri, 

169 diagnostics, 

170 expected_version, 

171 False, 

172 ) 

173 

174 

175@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_COMPLETION) 

176def _completions( 

177 ls: "DebputyLanguageServer", 

178 params: types.CompletionParams, 

179) -> Optional[Union[types.CompletionList, Sequence[types.CompletionItem]]]: 

180 return _dispatch_standard_handler( 

181 ls, 

182 params.text_document.uri, 

183 params, 

184 COMPLETER_HANDLERS, 

185 "Complete request", 

186 ) 

187 

188 

189@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_HOVER) 

190def _hover( 

191 ls: "DebputyLanguageServer", 

192 params: types.CompletionParams, 

193) -> Optional[types.Hover]: 

194 return _dispatch_standard_handler( 

195 ls, 

196 params.text_document.uri, 

197 params, 

198 HOVER_HANDLERS, 

199 "Hover doc request", 

200 ) 

201 

202 

203@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_INLAY_HINT) 

204def _doc_inlay_hint( 

205 ls: "DebputyLanguageServer", 

206 params: types.InlayHintParams, 

207) -> Optional[List[types.InlayHint]]: 

208 return _dispatch_standard_handler( 

209 ls, 

210 params.text_document.uri, 

211 params, 

212 TEXT_DOC_INLAY_HANDLERS, 

213 "Inlay hint (doc) request", 

214 ) 

215 

216 

217@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_CODE_ACTION) 

218def _code_actions( 

219 ls: "DebputyLanguageServer", 

220 params: types.CodeActionParams, 

221) -> Optional[List[Union[types.Command, types.CodeAction]]]: 

222 return _dispatch_standard_handler( 

223 ls, 

224 params.text_document.uri, 

225 params, 

226 CODE_ACTION_HANDLERS, 

227 "Code action request", 

228 ) 

229 

230 

231@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_FOLDING_RANGE) 

232def _folding_ranges( 

233 ls: "DebputyLanguageServer", 

234 params: types.FoldingRangeParams, 

235) -> Optional[Sequence[types.FoldingRange]]: 

236 return _dispatch_standard_handler( 

237 ls, 

238 params.text_document.uri, 

239 params, 

240 HOVER_HANDLERS, 

241 "Folding range request", 

242 ) 

243 

244 

245@DEBPUTY_LANGUAGE_SERVER.feature( 

246 types.TEXT_DOCUMENT_SEMANTIC_TOKENS_FULL, 

247 types.SemanticTokensRegistrationOptions( 

248 SEMANTIC_TOKENS_LEGEND, 

249 full=True, 

250 ), 

251) 

252def _semantic_tokens_full( 

253 ls: "DebputyLanguageServer", 

254 params: types.SemanticTokensParams, 

255) -> Optional[types.SemanticTokens]: 

256 return _dispatch_standard_handler( 

257 ls, 

258 params.text_document.uri, 

259 params, 

260 SEMANTIC_TOKENS_FULL_HANDLERS, 

261 "Semantic tokens request", 

262 ) 

263 

264 

265@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_WILL_SAVE_WAIT_UNTIL) 

266def _will_save_wait_until( 

267 ls: "DebputyLanguageServer", 

268 params: types.WillSaveTextDocumentParams, 

269) -> Optional[Sequence[types.TextEdit]]: 

270 return _dispatch_standard_handler( 

271 ls, 

272 params.text_document.uri, 

273 params, 

274 WILL_SAVE_WAIT_UNTIL_HANDLERS, 

275 "On-save formatting", 

276 ) 

277 

278 

279@DEBPUTY_LANGUAGE_SERVER.feature(types.TEXT_DOCUMENT_FORMATTING) 

280def _format_document( 

281 ls: "DebputyLanguageServer", 

282 params: types.WillSaveTextDocumentParams, 

283) -> Optional[Sequence[types.TextEdit]]: 

284 return _dispatch_standard_handler( 

285 ls, 

286 params.text_document.uri, 

287 params, 

288 FORMAT_FILE_HANDLERS, 

289 "Full document formatting", 

290 ) 

291 

292 

293def _dispatch_standard_handler( 

294 ls: "DebputyLanguageServer", 

295 doc_uri: str, 

296 params: P, 

297 handler_table: HandlerDispatchTable[C], 

298 request_type: str, 

299) -> Optional[R]: 

300 doc = ls.workspace.get_text_document(doc_uri) 

301 

302 id_source, language_id, normalized_filename = ls.determine_language_id(doc) 

303 handler = _resolve_handler(handler_table, language_id, normalized_filename) 

304 if handler is None: 

305 _info( 

306 f"{request_type} for document: {doc.path} ({language_id}, {id_source}," 

307 f" normalized filename: {normalized_filename}) - no handler" 

308 ) 

309 return None 

310 _info( 

311 f"{request_type} for document: {doc.path} ({language_id}, {id_source}," 

312 f" normalized filename: {normalized_filename}) - delegating to handler" 

313 ) 

314 

315 return handler( 

316 ls, 

317 params, 

318 ) 

319 

320 

321def _resolve_handler( 

322 handler_table: HandlerDispatchTable[C], 

323 language_id: str, 

324 normalized_filename: str, 

325) -> Optional[C]: 

326 dispatch_table = handler_table.get(language_id) if language_id != "" else None 

327 _info(f"LID: {language_id} - {dispatch_table}") 

328 if dispatch_table is None: 

329 filename_based_table = handler_table[""] 

330 return filename_based_table.filename_based_lookups.get(normalized_filename) 

331 return dispatch_table.filename_based_lookups.get( 

332 normalized_filename, dispatch_table.default_handler 

333 )