Coverage for src/debputy/lsp/lsp_features.py: 65%

152 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-03-24 16:38 +0000

1import collections 

2import dataclasses 

3import inspect 

4import sys 

5from typing import ( 

6 Callable, 

7 TypeVar, 

8 Sequence, 

9 Union, 

10 Dict, 

11 List, 

12 Optional, 

13 AsyncIterator, 

14 Self, 

15 Generic, 

16 Protocol, 

17 TYPE_CHECKING, 

18) 

19 

20from debputy.commands.debputy_cmd.context import CommandContext 

21from debputy.commands.debputy_cmd.output import _output_styling 

22from debputy.lsp.lsp_self_check import LSP_CHECKS 

23 

24try: 

25 from pygls.server import LanguageServer 

26 from debputy.lsp.debputy_ls import DebputyLanguageServer 

27except ImportError: 

28 pass 

29 

30from debputy.linting.lint_util import AsyncLinterImpl 

31from debputy.lsp.quickfixes import provide_standard_quickfixes_from_diagnostics_ls 

32from debputy.lsp.text_util import on_save_trim_end_of_line_whitespace 

33 

34if TYPE_CHECKING: 

35 import lsprotocol.types as types 

36else: 

37 import debputy.lsprotocol.types as types 

38 

# Generic placeholder for "some callable"; used to type the handler tables and
# the registration decorators so a decorated function keeps its own type.
C = TypeVar("C", bound=Callable)

40 

# Legend of the semantic token types announced by this module. No token
# modifiers are declared.
SEMANTIC_TOKENS_LEGEND = types.SemanticTokensLegend(
    token_types=[
        token_type.value
        for token_type in (
            types.SemanticTokenTypes.Keyword,
            types.SemanticTokenTypes.EnumMember,
            types.SemanticTokenTypes.Comment,
            types.SemanticTokenTypes.String,
            types.SemanticTokenTypes.Macro,
        )
    ],
    token_modifiers=[],
)
# Reverse lookup: token type value -> its index in the legend's `token_types`.
SEMANTIC_TOKEN_TYPES_IDS = {
    token_type: position
    for position, token_type in enumerate(SEMANTIC_TOKENS_LEGEND.token_types)
}

54 

# Callable signature for a diagnostics handler: it receives the language
# server plus the "did open"/"did change" parameters and produces an async
# iterator whose items are either a list of diagnostics or None.
DiagnosticHandler = Callable[
    [
        "DebputyLanguageServer",
        Union["types.DidOpenTextDocumentParams", "types.DidChangeTextDocumentParams"],
    ],
    AsyncIterator[Union[List[types.Diagnostic], None]],
]

62 

63 

@dataclasses.dataclass(slots=True)
class LanguageDispatchTable(Generic[C]):
    """Handlers registered under a single language ID.

    A table can hold one default handler for the language as a whole and any
    number of handlers keyed by filename.
    """

    # The language ID this table belongs to.
    language_id: str
    # Filename -> handler; starts out empty.
    filename_based_lookups: Dict[str, C] = dataclasses.field(default_factory=dict)
    # Handler for the language ID itself; None until one is registered.
    default_handler: Optional[C] = dataclasses.field(default=None)

69 

70 

class HandlerDispatchTable(Generic[C], Dict[str, LanguageDispatchTable[C]]):
    """Mapping from language ID to its `LanguageDispatchTable`.

    Subscripting with an unknown language ID creates, stores and returns an
    empty table for that ID. Membership tests and `.get()` do not create
    entries.
    """

    def __missing__(self, key: str) -> LanguageDispatchTable[C]:
        # Store the fresh table so later lookups return the same object.
        table = self[key] = LanguageDispatchTable(key)
        return table

76 

77 

class DiagnosticHandlerProtocol(Protocol):
    """Structural type of the handlers stored in `DIAGNOSTIC_HANDLERS`.

    A conforming handler is an async callable taking the language server and
    the request/notification parameters, and returning either a list of
    diagnostics or None.
    """

    async def __call__(
        self,
        ls: "DebputyLanguageServer",
        params: Union[
            types.DidOpenTextDocumentParams,
            types.DidChangeTextDocumentParams,
            types.DocumentDiagnosticParams,
        ],
    ) -> Optional[List[types.Diagnostic]]: ...

88 

89 

# One dispatch table per LSP feature. Each is filled in at import time by the
# registration helpers in this module (`lint_diagnostics`, the `lsp_*`
# decorators and `lsp_standard_handler`).
DIAGNOSTIC_HANDLERS: HandlerDispatchTable[DiagnosticHandlerProtocol] = (
    HandlerDispatchTable[DiagnosticHandlerProtocol]()
)
COMPLETER_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
HOVER_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
CODE_ACTION_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
FOLDING_RANGE_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
SEMANTIC_TOKENS_FULL_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
WILL_SAVE_WAIT_UNTIL_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
FORMAT_FILE_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
TEXT_DOC_INLAY_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
# Alias language ID -> main language ID, as consumed by `describe_lsp_features`.
# NOTE(review): nothing visible in this file populates this mapping, and the
# value type is presumed to be str - confirm against the code that fills it.
_ALIAS_OF: Dict[str, str] = {}

102 

103 

104@dataclasses.dataclass(slots=True, frozen=True) 

105class SecondaryLanguage: 

106 language_id: str 

107 filename_based_lookup: bool = False 

108 

109 

@dataclasses.dataclass(slots=True, frozen=True)
class LanguageDispatchRule:
    """Describes which language IDs and filenames a handler should serve.

    Instances are normally built through `new_rule`, which normalizes its
    arguments into the tuple-based form stored here.
    """

    # Language ID for which the handler becomes the default handler.
    primary_language_id: str
    # Filenames for which the handler is registered for filename-based lookup.
    filenames: Sequence[str]
    # Further language IDs that should be served by the same handler.
    secondary_language_ids: Sequence[SecondaryLanguage]

    @classmethod
    def new_rule(
        cls,
        primary_language_id: str,
        filenames: Union[str, Sequence[str]],
        secondary_language_ids: Sequence[Union[SecondaryLanguage, str]] = (),
    ) -> Self:
        """Create a rule from loosely typed arguments.

        :param primary_language_id: The main language ID of the rule.
        :param filenames: Either a single filename or a sequence of filenames.
          A lone string is treated as one filename (not as a sequence of
          characters).
        :param secondary_language_ids: Additional language IDs. Plain strings
          are wrapped in `SecondaryLanguage` with its default settings;
          `SecondaryLanguage` instances are used as-is.
        :return: A new rule whose sequence fields are stored as tuples.
        """
        normalized_filenames = (
            (filenames,) if isinstance(filenames, str) else tuple(filenames)
        )
        normalized_secondary = tuple(
            SecondaryLanguage(lang) if isinstance(lang, str) else lang
            for lang in secondary_language_ids
        )
        # Construct via `cls` so the result matches the declared `Self` return
        # type even when called on a subclass.
        return cls(
            primary_language_id,
            normalized_filenames,
            normalized_secondary,
        )

131 

132 

# Ready-made handlers that `lsp_standard_handler` can register on request.
# Maps an LSP topic to a (dispatch table, handler) pair.
_STANDARD_HANDLERS = {
    # Formatting and will-save-wait-until share the same handler.
    types.TEXT_DOCUMENT_FORMATTING: (
        FORMAT_FILE_HANDLERS,
        on_save_trim_end_of_line_whitespace,
    ),
    types.TEXT_DOCUMENT_CODE_ACTION: (
        CODE_ACTION_HANDLERS,
        provide_standard_quickfixes_from_diagnostics_ls,
    ),
    types.TEXT_DOCUMENT_WILL_SAVE_WAIT_UNTIL: (
        WILL_SAVE_WAIT_UNTIL_HANDLERS,
        on_save_trim_end_of_line_whitespace,
    ),
}

147 

148 

def lint_diagnostics(
    file_format: LanguageDispatchRule,
) -> Callable[[AsyncLinterImpl], AsyncLinterImpl]:
    """Decorator factory that registers an async linter as a diagnostics handler.

    :param file_format: The dispatch rule (language IDs / filenames) that the
      decorated linter should be registered for.
    :return: A decorator. It registers a wrapper around the linter in
      `DIAGNOSTIC_HANDLERS` and returns the linter itself unchanged.
    :raises ValueError: (from the returned decorator) if the decorated
      function is not a coroutine function.
    """

    def _wrapper(func: AsyncLinterImpl) -> AsyncLinterImpl:
        if not inspect.iscoroutinefunction(func):
            raise ValueError("Linters must be async")

        async def _lint_wrapper(
            ls: "DebputyLanguageServer",
            params: Union[
                types.DidOpenTextDocumentParams,
                types.DidChangeTextDocumentParams,
                types.DocumentDiagnosticParams,
            ],
        ) -> Optional[List[types.Diagnostic]]:
            # Resolve the document from the request and run the linter
            # through the server's lint state for that document.
            doc = ls.workspace.get_text_document(params.text_document.uri)
            lint_state = ls.lint_state(doc)
            return await lint_state.run_diagnostics(func)

        _register_handler(file_format, DIAGNOSTIC_HANDLERS, _lint_wrapper)

        # The caller keeps the plain linter; only the wrapper is registered.
        return func

    return _wrapper

174 

175 

def lsp_completer(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator that registers a function in `COMPLETER_HANDLERS` for `file_format`."""
    decorator = _registering_wrapper(file_format, COMPLETER_HANDLERS)
    return decorator

178 

179 

def lsp_code_actions(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator that registers a function in `CODE_ACTION_HANDLERS` for `file_format`."""
    decorator = _registering_wrapper(file_format, CODE_ACTION_HANDLERS)
    return decorator

182 

183 

def lsp_hover(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator that registers a function in `HOVER_HANDLERS` for `file_format`."""
    decorator = _registering_wrapper(file_format, HOVER_HANDLERS)
    return decorator

186 

187 

def lsp_text_doc_inlay_hints(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator that registers a function in `TEXT_DOC_INLAY_HANDLERS` for `file_format`."""
    decorator = _registering_wrapper(file_format, TEXT_DOC_INLAY_HANDLERS)
    return decorator

190 

191 

def lsp_folding_ranges(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator that registers a function in `FOLDING_RANGE_HANDLERS` for `file_format`."""
    decorator = _registering_wrapper(file_format, FOLDING_RANGE_HANDLERS)
    return decorator

194 

195 

def lsp_will_save_wait_until(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator that registers a function in `WILL_SAVE_WAIT_UNTIL_HANDLERS` for `file_format`."""
    decorator = _registering_wrapper(file_format, WILL_SAVE_WAIT_UNTIL_HANDLERS)
    return decorator

198 

199 

def lsp_format_document(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator that registers a function in `FORMAT_FILE_HANDLERS` for `file_format`."""
    decorator = _registering_wrapper(file_format, FORMAT_FILE_HANDLERS)
    return decorator

202 

203 

def lsp_semantic_tokens_full(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator that registers a function in `SEMANTIC_TOKENS_FULL_HANDLERS` for `file_format`."""
    decorator = _registering_wrapper(file_format, SEMANTIC_TOKENS_FULL_HANDLERS)
    return decorator

206 

207 

def lsp_standard_handler(
    file_format: LanguageDispatchRule,
    topic: str,
) -> None:
    """Register the ready-made handler for `topic` under `file_format`.

    :param file_format: The dispatch rule to register the handler for.
    :param topic: The LSP topic; must be a key of `_STANDARD_HANDLERS`.
    :raises ValueError: If there is no standard handler for `topic`.
    """
    if (entry := _STANDARD_HANDLERS.get(topic)) is None:
        raise ValueError(f"No standard handler for {topic}")

    # Each entry is a (dispatch table, handler) pair.
    _register_handler(file_format, *entry)

219 

220 

def _registering_wrapper(
    file_formats: LanguageDispatchRule,
    handler_dict: HandlerDispatchTable[C],
) -> Callable[[C], C]:
    """Build a decorator that registers its target in `handler_dict`.

    :param file_formats: The dispatch rule to register the target for.
    :param handler_dict: The dispatch table receiving the registration.
    :return: A decorator that registers the function and hands it back
      unchanged.
    """

    def _register_and_return(target: C) -> C:
        _register_handler(file_formats, handler_dict, target)
        return target

    return _register_and_return

230 

231 

def _register_handler(
    file_format: LanguageDispatchRule,
    handler_dict: HandlerDispatchTable[C],
    handler: C,
) -> None:
    """Record `handler` in `handler_dict` for everything `file_format` covers.

    The handler becomes the default handler of the primary language ID, is
    added to the filename-based table (kept under the empty-string language
    ID) for each filename, and is registered for each secondary language,
    either per filename or as that language's default handler.

    Registration is not atomic: entries written before a conflict is
    detected stay in place.

    :raises AssertionError: If any of the slots to be filled is already taken.
    """
    primary_id = file_format.primary_language_id
    primary_table = handler_dict[primary_id]
    # Filename-based dispatch lives under the "" language ID.
    by_filename = handler_dict[""].filename_based_lookups

    if primary_table.default_handler is not None:
        raise AssertionError(
            f"There is already a handler for language ID {primary_id}"
        )
    primary_table.default_handler = handler

    for filename in file_format.filenames:
        if by_filename.get(filename) is not None:
            raise AssertionError(f"There is already a handler for filename {filename}")
        by_filename[filename] = handler

    for secondary in file_format.secondary_language_ids:
        table = handler_dict[secondary.language_id]

        if not secondary.filename_based_lookup:
            # Plain secondary language: the handler becomes its default.
            if table.default_handler is not None:
                raise AssertionError(
                    f"There is already a primary handler for language ID {secondary.language_id}"
                )
            table.default_handler = handler
            continue

        # Filename-scoped secondary language: register once per filename.
        for filename in file_format.filenames:
            if table.filename_based_lookups.get(filename) is not None:
                raise AssertionError(
                    f"There is already a handler for filename {filename} under language ID {secondary.language_id}"
                )
            table.filename_based_lookups[filename] = handler

270 

271 

def ensure_lsp_features_are_loaded() -> None:
    """Import the linter implementations so the LSP handlers get registered."""
    # FIXME: This import is needed to force loading of the LSP files. But it only works
    #  for files with a linter (which currently happens to be all of them, but this is
    #  a bit fragile).
    from debputy.linting.lint_impl import LINTER_FORMATS as linter_formats

    # Referencing the imported name keeps the import from looking unused.
    assert linter_formats

279 

280 

def describe_lsp_features(context: CommandContext) -> None:
    """Print an overview of the available LSP features to stdout.

    The output has three sections: the features per (non-alias) language ID,
    the known aliases grouped by their main language ID, and the result of
    each self-check in `LSP_CHECKS`.

    :param context: The command context; its parsed arguments are used to
      set up output styling.
    """
    fo = _output_styling(context.parsed_args, sys.stdout)
    ensure_lsp_features_are_loaded()

    feature_list = [
        ("diagnostics (lint)", DIAGNOSTIC_HANDLERS),
        ("code actions/quickfixes", CODE_ACTION_HANDLERS),
        ("completion suggestions", COMPLETER_HANDLERS),
        ("hover docs", HOVER_HANDLERS),
        ("folding ranges", FOLDING_RANGE_HANDLERS),
        ("semantic tokens", SEMANTIC_TOKENS_FULL_HANDLERS),
        ("on-save handler", WILL_SAVE_WAIT_UNTIL_HANDLERS),
        ("inlay hint (doc)", TEXT_DOC_INLAY_HANDLERS),
        ("format file handler", FORMAT_FILE_HANDLERS),
    ]
    print("LSP language IDs and their features:")
    # NOTE(review): the tables also contain the "" key used for filename-based
    # dispatch, so it is listed here like a language ID - confirm that is
    # intended.
    all_ids = sorted({lid for _, table in feature_list for lid in table})
    for lang_id in all_ids:
        if lang_id in _ALIAS_OF:
            # Aliases are listed in their own section below.
            continue
        features = [name for name, table in feature_list if lang_id in table]
        print(f" * {lang_id}:")
        for feature in features:
            print(f" - {feature}")

    aliases_by_main_id = collections.defaultdict(list)
    for lang_id in all_ids:
        main_lang = _ALIAS_OF.get(lang_id)
        if main_lang is None:
            continue
        aliases_by_main_id[main_lang].append(lang_id)

    print()
    print("Aliases:")
    # Distinct loop names: the grouping dict must not be rebound while it is
    # being iterated.
    for main_id, alias_ids in aliases_by_main_id.items():
        print(f" * {main_id}: {', '.join(alias_ids)}")

    print()
    print("General features:")
    for self_check in LSP_CHECKS:
        if self_check.test():
            print(f" * {self_check.feature}: {fo.colored('enabled', fg='green')}")
            continue

        # A failed mandatory check is reported as "missing" (red); a failed
        # optional one as "disabled" (yellow).
        if self_check.is_mandatory:
            label, color = "missing", "red"
        else:
            label, color = "disabled", "yellow"
        disabled = fo.colored(
            label,
            fg=color,
            bg="black",
            style="bold",
        )

        if self_check.how_to_fix:
            print(f" * {self_check.feature}: {disabled}")
            print(f" - {self_check.how_to_fix}")
        else:
            problem_suffix = f" ({self_check.problem})"
            print(f" * {self_check.feature}: {disabled}{problem_suffix}")