Coverage for src/debputy/lsp/lsp_features.py: 65%

153 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-01-27 13:59 +0000

1import collections 

2import dataclasses 

3import inspect 

4import sys 

5from typing import ( 

6 Callable, 

7 TypeVar, 

8 Sequence, 

9 Union, 

10 Dict, 

11 List, 

12 Optional, 

13 AsyncIterator, 

14 Self, 

15 Generic, 

16 Protocol, 

17 Iterable, 

18 TYPE_CHECKING, 

19 Tuple, 

20) 

21 

22from debputy.commands.debputy_cmd.context import CommandContext 

23from debputy.commands.debputy_cmd.output import _output_styling 

24from debputy.lsp.lsp_self_check import LSP_CHECKS 

25 

26try: 

27 from pygls.server import LanguageServer 

28 from debputy.lsp.debputy_ls import DebputyLanguageServer 

29except ImportError: 

30 pass 

31 

32from debputy.linting.lint_util import LinterImpl 

33from debputy.lsp.quickfixes import provide_standard_quickfixes_from_diagnostics 

34from debputy.lsp.text_util import on_save_trim_end_of_line_whitespace 

35 

36if TYPE_CHECKING: 

37 import lsprotocol.types as types 

38else: 

39 import debputy.lsprotocol.types as types 

40 

41C = TypeVar("C", bound=Callable) 

42 

43SEMANTIC_TOKENS_LEGEND = types.SemanticTokensLegend( 

44 token_types=[ 

45 types.SemanticTokenTypes.Keyword.value, 

46 types.SemanticTokenTypes.EnumMember.value, 

47 types.SemanticTokenTypes.Comment.value, 

48 types.SemanticTokenTypes.String.value, 

49 ], 

50 token_modifiers=[], 

51) 

52SEMANTIC_TOKEN_TYPES_IDS = { 

53 t: idx for idx, t in enumerate(SEMANTIC_TOKENS_LEGEND.token_types) 

54} 

55 

56DiagnosticHandler = Callable[ 

57 [ 

58 "DebputyLanguageServer", 

59 Union["types.DidOpenTextDocumentParams", "types.DidChangeTextDocumentParams"], 

60 ], 

61 AsyncIterator[Optional[List[types.Diagnostic]]], 

62] 

63 

64 

65@dataclasses.dataclass(slots=True) 

66class LanguageDispatchTable(Generic[C]): 

67 language_id: str 

68 filename_based_lookups: Dict[str, C] = dataclasses.field(default_factory=dict) 

69 default_handler: Optional[C] = None 

70 

71 

72class HandlerDispatchTable(Generic[C], Dict[str, LanguageDispatchTable[C]]): 

73 def __missing__(self, key: str) -> LanguageDispatchTable[C]: 

74 r = LanguageDispatchTable(key) 

75 self[key] = r 

76 return r 

77 

78 

79class DiagnosticHandlerProtocol(Protocol): 

80 async def __call__( 80 ↛ exitline 80 didn't jump to the function exit

81 self, 

82 ls: "DebputyLanguageServer", 

83 params: Union[ 

84 types.DidOpenTextDocumentParams, 

85 types.DidChangeTextDocumentParams, 

86 types.DocumentDiagnosticParams, 

87 ], 

88 ) -> Iterable[Union[List[types.Diagnostic], None]]: ... 

89 

90 

91DIAGNOSTIC_HANDLERS: HandlerDispatchTable[DiagnosticHandlerProtocol] = ( 

92 HandlerDispatchTable[DiagnosticHandlerProtocol]() 

93) 

94COMPLETER_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable() 

95HOVER_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable() 

96CODE_ACTION_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable() 

97FOLDING_RANGE_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable() 

98SEMANTIC_TOKENS_FULL_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable() 

99WILL_SAVE_WAIT_UNTIL_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable() 

100FORMAT_FILE_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable() 

101TEXT_DOC_INLAY_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable() 

102_ALIAS_OF = {} 

103 

104 

105@dataclasses.dataclass(slots=True, frozen=True) 

106class SecondaryLanguage: 

107 language_id: str 

108 filename_based_lookup: bool = False 

109 

110 

111@dataclasses.dataclass(slots=True, frozen=True) 

112class LanguageDispatchRule: 

113 primary_language_id: str 

114 filenames: Sequence[str] 

115 secondary_language_ids: Sequence[SecondaryLanguage] 

116 

117 @classmethod 

118 def new_rule( 

119 cls, 

120 primary_language_id: str, 

121 filenames: Union[str, Sequence[str]], 

122 secondary_language_ids: Sequence[Union[SecondaryLanguage, str]] = (), 

123 ) -> Self: 

124 return LanguageDispatchRule( 

125 primary_language_id, 

126 (filenames,) if isinstance(filenames, str) else tuple(filenames), 

127 tuple( 

128 SecondaryLanguage(l) if isinstance(l, str) else l 

129 for l in secondary_language_ids 

130 ), 

131 ) 

132 

133 

134_STANDARD_HANDLERS = { 134 ↛ exitline 134 didn't jump to the function exit

135 types.TEXT_DOCUMENT_FORMATTING: ( 

136 FORMAT_FILE_HANDLERS, 

137 on_save_trim_end_of_line_whitespace, 

138 ), 

139 types.TEXT_DOCUMENT_CODE_ACTION: ( 

140 CODE_ACTION_HANDLERS, 

141 lambda ls, params: provide_standard_quickfixes_from_diagnostics(params), 

142 ), 

143 types.TEXT_DOCUMENT_WILL_SAVE_WAIT_UNTIL: ( 

144 WILL_SAVE_WAIT_UNTIL_HANDLERS, 

145 on_save_trim_end_of_line_whitespace, 

146 ), 

147} 

148 

149LI = TypeVar("LI", DiagnosticHandlerProtocol, LinterImpl) 

150 

151 

152def lint_diagnostics( 

153 file_format: LanguageDispatchRule, 

154) -> Callable[[LI], LI]: 

155 

156 def _wrapper(func: LI) -> LI: 

157 if not inspect.iscoroutinefunction(func): 157 ↛ 172line 157 didn't jump to line 172 because the condition on line 157 was always true

158 

159 async def _lint_wrapper( 

160 ls: "DebputyLanguageServer", 

161 params: Union[ 

162 types.DidOpenTextDocumentParams, 

163 types.DidChangeTextDocumentParams, 

164 types.DocumentDiagnosticParams, 

165 ], 

166 ) -> Optional[List[types.Diagnostic]]: 

167 doc = ls.workspace.get_text_document(params.text_document.uri) 

168 lint_state = ls.lint_state(doc) 

169 yield lint_state.run_diagnostics(func) 

170 

171 else: 

172 raise ValueError("Linters are all non-async at the moment") 

173 

174 _register_handler(file_format, DIAGNOSTIC_HANDLERS, _lint_wrapper) 

175 

176 return func 

177 

178 return _wrapper 

179 

180 

181def lsp_completer(file_format: LanguageDispatchRule) -> Callable[[C], C]: 

182 return _registering_wrapper(file_format, COMPLETER_HANDLERS) 

183 

184 

185def lsp_code_actions(file_format: LanguageDispatchRule) -> Callable[[C], C]: 

186 return _registering_wrapper(file_format, CODE_ACTION_HANDLERS) 

187 

188 

189def lsp_hover(file_format: LanguageDispatchRule) -> Callable[[C], C]: 

190 return _registering_wrapper(file_format, HOVER_HANDLERS) 

191 

192 

193def lsp_text_doc_inlay_hints(file_format: LanguageDispatchRule) -> Callable[[C], C]: 

194 return _registering_wrapper(file_format, TEXT_DOC_INLAY_HANDLERS) 

195 

196 

197def lsp_folding_ranges(file_format: LanguageDispatchRule) -> Callable[[C], C]: 

198 return _registering_wrapper(file_format, FOLDING_RANGE_HANDLERS) 

199 

200 

201def lsp_will_save_wait_until(file_format: LanguageDispatchRule) -> Callable[[C], C]: 

202 return _registering_wrapper(file_format, WILL_SAVE_WAIT_UNTIL_HANDLERS) 

203 

204 

205def lsp_format_document(file_format: LanguageDispatchRule) -> Callable[[C], C]: 

206 return _registering_wrapper(file_format, FORMAT_FILE_HANDLERS) 

207 

208 

209def lsp_semantic_tokens_full(file_format: LanguageDispatchRule) -> Callable[[C], C]: 

210 return _registering_wrapper(file_format, SEMANTIC_TOKENS_FULL_HANDLERS) 

211 

212 

213def lsp_standard_handler( 

214 file_format: LanguageDispatchRule, 

215 topic: str, 

216) -> None: 

217 res = _STANDARD_HANDLERS.get(topic) 

218 if res is None: 218 ↛ 219line 218 didn't jump to line 219 because the condition on line 218 was never true

219 raise ValueError(f"No standard handler for {topic}") 

220 

221 table, handler = res 

222 

223 _register_handler(file_format, table, handler) 

224 

225 

226def _registering_wrapper( 

227 file_formats: LanguageDispatchRule, 

228 handler_dict: HandlerDispatchTable[C], 

229) -> Callable[[C], C]: 

230 def _wrapper(func: C) -> C: 

231 _register_handler(file_formats, handler_dict, func) 

232 return func 

233 

234 return _wrapper 

235 

236 

237def _register_handler( 

238 file_format: LanguageDispatchRule, 

239 handler_dict: HandlerDispatchTable[C], 

240 handler: C, 

241) -> None: 

242 primary_table = handler_dict[file_format.primary_language_id] 

243 filename_based_dispatch = handler_dict[""] 

244 

245 if primary_table.default_handler is not None: 

246 raise AssertionError( 

247 f"There is already a handler for language ID {file_format.primary_language_id}" 

248 ) 

249 

250 primary_table.default_handler = handler 

251 for filename in file_format.filenames: 

252 filename_based_handler = filename_based_dispatch.filename_based_lookups.get( 

253 filename 

254 ) 

255 if filename_based_handler is not None: 

256 raise AssertionError(f"There is already a handler for filename {filename}") 

257 filename_based_dispatch.filename_based_lookups[filename] = handler 

258 

259 for secondary_language in file_format.secondary_language_ids: 

260 secondary_table = handler_dict[secondary_language.language_id] 

261 if secondary_language.filename_based_lookup: 

262 for filename in file_format.filenames: 

263 secondary_handler = secondary_table.filename_based_lookups.get(filename) 

264 if secondary_handler is not None: 

265 raise AssertionError( 

266 f"There is already a handler for filename {filename} under language ID {secondary_language.language_id}" 

267 ) 

268 secondary_table.filename_based_lookups[filename] = handler 

269 elif secondary_table.default_handler is not None: 

270 raise AssertionError( 

271 f"There is already a primary handler for language ID {secondary_language.language_id}" 

272 ) 

273 else: 

274 secondary_table.default_handler = handler 

275 

276 

277def ensure_lsp_features_are_loaded() -> None: 

278 # FIXME: This import is needed to force loading of the LSP files. But it only works 

279 # for files with a linter (which currently happens to be all of them, but this is 

280 # a bit fragile). 

281 from debputy.linting.lint_impl import LINTER_FORMATS 

282 

283 assert LINTER_FORMATS 

284 

285 

286def describe_lsp_features(context: CommandContext) -> None: 

287 fo = _output_styling(context.parsed_args, sys.stdout) 

288 ensure_lsp_features_are_loaded() 

289 

290 feature_list = [ 

291 ("diagnostics (lint)", DIAGNOSTIC_HANDLERS), 

292 ("code actions/quickfixes", CODE_ACTION_HANDLERS), 

293 ("completion suggestions", COMPLETER_HANDLERS), 

294 ("hover docs", HOVER_HANDLERS), 

295 ("folding ranges", FOLDING_RANGE_HANDLERS), 

296 ("semantic tokens", SEMANTIC_TOKENS_FULL_HANDLERS), 

297 ("on-save handler", WILL_SAVE_WAIT_UNTIL_HANDLERS), 

298 ("inlay hint (doc)", TEXT_DOC_INLAY_HANDLERS), 

299 ("format file handler", FORMAT_FILE_HANDLERS), 

300 ] 

301 print("LSP language IDs and their features:") 

302 all_ids = sorted(set(lid for _, t in feature_list for lid in t)) 

303 for lang_id in all_ids: 

304 if lang_id in _ALIAS_OF: 

305 continue 

306 features = [n for n, t in feature_list if lang_id in t] 

307 print(f" * {lang_id}:") 

308 for feature in features: 

309 print(f" - {feature}") 

310 

311 aliases = collections.defaultdict(list) 

312 for lang_id in all_ids: 

313 main_lang = _ALIAS_OF.get(lang_id) 

314 if main_lang is None: 

315 continue 

316 aliases[main_lang].append(lang_id) 

317 

318 print() 

319 print("Aliases:") 

320 for main_id, aliases in aliases.items(): 

321 print(f" * {main_id}: {', '.join(aliases)}") 

322 

323 print() 

324 print("General features:") 

325 for self_check in LSP_CHECKS: 

326 is_ok = self_check.test() 

327 if is_ok: 

328 print(f" * {self_check.feature}: {fo.colored('enabled', fg='green')}") 

329 else: 

330 if self_check.is_mandatory: 

331 disabled = fo.colored( 

332 "missing", 

333 fg="red", 

334 bg="black", 

335 style="bold", 

336 ) 

337 else: 

338 disabled = fo.colored( 

339 "disabled", 

340 fg="yellow", 

341 bg="black", 

342 style="bold", 

343 ) 

344 

345 if self_check.how_to_fix: 

346 print(f" * {self_check.feature}: {disabled}") 

347 print(f" - {self_check.how_to_fix}") 

348 else: 

349 problem_suffix = f" ({self_check.problem})" 

350 print(f" * {self_check.feature}: {disabled}{problem_suffix}")