Coverage for src/debputy/lsp/lsp_features.py: 69%

183 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-09-07 09:27 +0000

1import collections 

2import dataclasses 

3import inspect 

4import sys 

5from typing import ( 

6 Callable, 

7 TypeVar, 

8 Sequence, 

9 Union, 

10 Dict, 

11 List, 

12 Optional, 

13 AsyncIterator, 

14 Self, 

15 Generic, 

16 Protocol, 

17 TYPE_CHECKING, 

18 Literal, 

19) 

20 

21from debputy.commands.debputy_cmd.context import CommandContext 

22from debputy.commands.debputy_cmd.output import _output_styling 

23from debputy.lsp.lsp_self_check import LSP_CHECKS 

24 

25try: 

26 from pygls.server import LanguageServer 

27 from debputy.lsp.debputy_ls import DebputyLanguageServer 

28except ImportError: 

29 pass 

30 

31from debputy.linting.lint_util import AsyncLinterImpl, LintState 

32from debputy.lsp.quickfixes import provide_standard_quickfixes_from_diagnostics_ls 

33from debputy.lsp.text_util import on_save_trim_end_of_line_whitespace 

34 

35if TYPE_CHECKING: 

36 import lsprotocol.types as types 

37 

38 Reformatter = Callable[[LintState], Optional[Sequence[types.TextEdit]]] 

39else: 

40 import debputy.lsprotocol.types as types 

41 

# Type variable for the callables that are stored in the dispatch tables and
# passed through the registering decorators of this module.
C = TypeVar("C", bound=Callable)

# Legend of the semantic token types. The position of an entry in
# `token_types` is the ID recorded for it in SEMANTIC_TOKEN_TYPES_IDS.
SEMANTIC_TOKENS_LEGEND = types.SemanticTokensLegend(
    token_modifiers=[],
    token_types=[
        types.SemanticTokenTypes.Keyword.value,
        types.SemanticTokenTypes.EnumMember.value,
        types.SemanticTokenTypes.Comment.value,
        types.SemanticTokenTypes.String.value,
        types.SemanticTokenTypes.Macro.value,
        types.SemanticTokenTypes.Operator.value,
        types.SemanticTokenTypes.TypeParameter.value,
        types.SemanticTokenTypes.Variable.value,
    ],
)

# Reverse lookup: token type value -> its index in the legend above.
SEMANTIC_TOKEN_TYPES_IDS = {
    token_type: token_id
    for token_id, token_type in enumerate(SEMANTIC_TOKENS_LEGEND.token_types)
}

# Alias for a callable taking the language server plus "did open" / "did change"
# parameters and returning an async iterator of optional diagnostic lists.
DiagnosticHandler = Callable[
    [
        "DebputyLanguageServer",
        Union["types.DidOpenTextDocumentParams", "types.DidChangeTextDocumentParams"],
    ],
    AsyncIterator[Optional[List[types.Diagnostic]]],
]

68 

69 

@dataclasses.dataclass(slots=True)
class LanguageDispatchTable(Generic[C]):
    """Handlers registered under one language ID.

    Besides the default handler for the language ID itself, the table keeps two
    finer-grained lookups (by basename and by path name).
    """

    # Language ID this table was created for.
    language_id: str
    # Handlers keyed by basename.
    basename_based_lookups: Dict[str, C] = dataclasses.field(default_factory=dict)
    # Handlers keyed by path name.
    path_name_based_lookups: Dict[str, C] = dataclasses.field(default_factory=dict)
    # Handler for the language ID as such; None until one has been registered.
    default_handler: Optional[C] = None

76 

77 

class HandlerDispatchTable(Generic[C], Dict[str, LanguageDispatchTable[C]]):
    """Dict of language ID -> LanguageDispatchTable that creates entries on demand."""

    def __missing__(self, key: str) -> LanguageDispatchTable[C]:
        # Called by dict subscription for an unknown key: create an empty table
        # for that language ID, remember it, and hand it back.
        table = self[key] = LanguageDispatchTable(key)
        return table

83 

84 

class DiagnosticHandlerProtocol(Protocol):
    """Call signature of the handlers stored in LSP_DIAGNOSTIC_HANDLERS."""

    async def __call__(
        self,
        ls: "DebputyLanguageServer",
        params: Union[
            types.DidOpenTextDocumentParams,
            types.DidChangeTextDocumentParams,
            types.DocumentDiagnosticParams,
        ],
    ) -> Optional[List[types.Diagnostic]]:
        """Produce the diagnostics for the document named in ``params``, or None."""
        ...

95 

96 

# Registries keyed by path name. The decorators in this module only add path
# names that start with "debian/".
CLI_DIAGNOSTIC_HANDLERS: Dict[str, AsyncLinterImpl] = {}
CLI_FORMAT_FILE_HANDLERS: Dict[str, "Reformatter"] = {}

# One dispatch table per LSP feature, filled by the registering decorators.
LSP_DIAGNOSTIC_HANDLERS: HandlerDispatchTable[DiagnosticHandlerProtocol] = (
    HandlerDispatchTable[DiagnosticHandlerProtocol]()
)
# NOTE(review): `C` is a TypeVar and is not bound to anything at module scope,
# so the annotations below carry no precise handler type - confirm whether a
# concrete type is wanted here.
COMPLETER_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
HOVER_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
CODE_ACTION_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
FOLDING_RANGE_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
SEMANTIC_TOKENS_FULL_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
WILL_SAVE_WAIT_UNTIL_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
LSP_FORMAT_FILE_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
TEXT_DOC_INLAY_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()

# Alias language ID -> main language ID, as read by describe_lsp_features.
_ALIAS_OF = {}

113 

114 

115@dataclasses.dataclass(slots=True, frozen=True) 

116class BasenameMatchingRule: 

117 rule_type: Literal["basename", "extension"] 

118 value: str 

119 

120 

121@dataclasses.dataclass(slots=True, frozen=True) 

122class SecondaryLanguage: 

123 language_id: str 

124 secondary_lookup: Optional[Literal["path-name", "basename"]] = None 

125 

126 

@dataclasses.dataclass(slots=True, frozen=True)
class LanguageDispatchRule:
    """Describes under which language IDs and file names a handler is registered.

    Instances are normally built with :meth:`new_rule`, which normalizes its
    arguments and derives ``is_debsrc_packaging_file``.
    """

    # Language ID whose default handler the registered handler becomes.
    primary_language_id: str
    # Basename of the file, if any; needed for secondary "basename" lookups.
    basename: Optional[str]
    # Path names of the file; needed for path-name based lookups.
    path_names: Sequence[str]
    # Further language IDs the handler is registered under.
    secondary_language_ids: Sequence[SecondaryLanguage]
    # Whether any of the path names starts with "debian/" (as set by new_rule).
    is_debsrc_packaging_file: bool

    @classmethod
    def new_rule(
        cls,
        primary_language_id: str,
        basename: Optional[str],
        path_names: Union[str, Sequence[str]],
        secondary_language_ids: Sequence[Union[SecondaryLanguage, str]] = (),
    ) -> Self:
        """Create a rule from loosely typed arguments.

        :param primary_language_id: The primary language ID of the rule.
        :param basename: The basename for the rule, or None.
        :param path_names: A single path name or a sequence of path names; stored
          as a tuple either way.
        :param secondary_language_ids: Secondary languages; plain strings are
          wrapped into ``SecondaryLanguage`` instances without a secondary lookup.
        :return: The new rule, with ``is_debsrc_packaging_file`` set when any
          path name starts with ``debian/``.
        """
        path_names_as_seq: Sequence[str] = (
            (path_names,) if isinstance(path_names, str) else tuple(path_names)
        )
        is_debsrc_packaging_file = any(
            pn.startswith("debian/") for pn in path_names_as_seq
        )
        # Instantiate via `cls` so the declared `Self` return type also holds
        # when this constructor is invoked on a subclass.
        return cls(
            primary_language_id,
            basename,
            path_names_as_seq,
            tuple(
                SecondaryLanguage(lang) if isinstance(lang, str) else lang
                for lang in secondary_language_ids
            ),
            is_debsrc_packaging_file,
        )

159 

160 

# LSP topic -> (dispatch table to register in, ready-made handler to register).
# Consulted by lsp_standard_handler.
_STANDARD_HANDLERS = {
    # Formatting and the on-save hook share the same whitespace-trimming handler.
    types.TEXT_DOCUMENT_FORMATTING: (
        LSP_FORMAT_FILE_HANDLERS,
        on_save_trim_end_of_line_whitespace,
    ),
    types.TEXT_DOCUMENT_CODE_ACTION: (
        CODE_ACTION_HANDLERS,
        provide_standard_quickfixes_from_diagnostics_ls,
    ),
    types.TEXT_DOCUMENT_WILL_SAVE_WAIT_UNTIL: (
        WILL_SAVE_WAIT_UNTIL_HANDLERS,
        on_save_trim_end_of_line_whitespace,
    ),
}

175 

176 

def lint_diagnostics(
    file_format: LanguageDispatchRule,
) -> Callable[[AsyncLinterImpl], AsyncLinterImpl]:
    """Return a decorator that registers an async linter for ``file_format``.

    The decorated linter is wrapped into an LSP diagnostics handler and
    registered in ``LSP_DIAGNOSTIC_HANDLERS``. When the rule is flagged as a
    debsrc packaging file, the undecorated linter is additionally stored in
    ``CLI_DIAGNOSTIC_HANDLERS`` for each path name starting with ``debian/``.

    :param file_format: The rule describing where the linter applies.
    :return: A decorator that returns the linter unchanged.
    :raises ValueError: (from the decorator) if the linter is not a coroutine
      function.
    """

    def _wrapper(func: AsyncLinterImpl) -> AsyncLinterImpl:
        if not inspect.iscoroutinefunction(func):
            raise ValueError("Linters must be async")

        async def _lint_wrapper(
            ls: "DebputyLanguageServer",
            params: Union[
                types.DidOpenTextDocumentParams,
                types.DidChangeTextDocumentParams,
                types.DocumentDiagnosticParams,
            ],
        ) -> Optional[List[types.Diagnostic]]:
            # Resolve the document from the request and run the linter through
            # the lint state the language server provides for it.
            doc = ls.workspace.get_text_document(params.text_document.uri)
            lint_state = ls.lint_state(doc)
            return await lint_state.run_diagnostics(func)

        _register_handler(file_format, LSP_DIAGNOSTIC_HANDLERS, _lint_wrapper)
        if file_format.is_debsrc_packaging_file:
            for path_name in file_format.path_names:
                if path_name.startswith("debian/"):
                    # This registry stores the linter itself, not the LSP wrapper.
                    CLI_DIAGNOSTIC_HANDLERS[path_name] = func

        return func

    return _wrapper

206 

207 

def lsp_completer(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering its target in COMPLETER_HANDLERS for ``file_format``."""
    register = _registering_wrapper(file_format, COMPLETER_HANDLERS)
    return register

210 

211 

def lsp_code_actions(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering its target in CODE_ACTION_HANDLERS for ``file_format``."""
    register = _registering_wrapper(file_format, CODE_ACTION_HANDLERS)
    return register

214 

215 

def lsp_hover(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering its target in HOVER_HANDLERS for ``file_format``."""
    register = _registering_wrapper(file_format, HOVER_HANDLERS)
    return register

218 

219 

def lsp_text_doc_inlay_hints(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering its target in TEXT_DOC_INLAY_HANDLERS for ``file_format``."""
    register = _registering_wrapper(file_format, TEXT_DOC_INLAY_HANDLERS)
    return register

222 

223 

def lsp_folding_ranges(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering its target in FOLDING_RANGE_HANDLERS for ``file_format``."""
    register = _registering_wrapper(file_format, FOLDING_RANGE_HANDLERS)
    return register

226 

227 

def lsp_will_save_wait_until(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering its target in WILL_SAVE_WAIT_UNTIL_HANDLERS for ``file_format``."""
    register = _registering_wrapper(file_format, WILL_SAVE_WAIT_UNTIL_HANDLERS)
    return register

230 

231 

def lsp_format_document(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering its target in LSP_FORMAT_FILE_HANDLERS for ``file_format``."""
    register = _registering_wrapper(file_format, LSP_FORMAT_FILE_HANDLERS)
    return register

234 

235 

def lsp_cli_reformat_document(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator that stores its target in CLI_FORMAT_FILE_HANDLERS.

    The target is stored once per path name of ``file_format`` that starts with
    ``debian/``; other path names are ignored. The target is returned unchanged.
    """

    def _wrapper(func: C) -> C:
        debian_paths = (
            candidate
            for candidate in file_format.path_names
            if candidate.startswith("debian/")
        )
        for debian_path in debian_paths:
            CLI_FORMAT_FILE_HANDLERS[debian_path] = func
        return func

    return _wrapper

244 

245 

def lsp_semantic_tokens_full(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering its target in SEMANTIC_TOKENS_FULL_HANDLERS for ``file_format``."""
    register = _registering_wrapper(file_format, SEMANTIC_TOKENS_FULL_HANDLERS)
    return register

248 

249 

def lsp_standard_handler(
    file_format: LanguageDispatchRule,
    topic: str,
) -> None:
    """Register the ready-made handler for ``topic`` for ``file_format``.

    :param file_format: The rule describing where the handler applies.
    :param topic: A key of ``_STANDARD_HANDLERS``.
    :raises ValueError: If there is no standard handler for ``topic``.
    """
    if (entry := _STANDARD_HANDLERS.get(topic)) is None:
        raise ValueError(f"No standard handler for {topic}")

    dispatch_table, standard_handler = entry
    _register_handler(file_format, dispatch_table, standard_handler)

261 

262 

def _registering_wrapper(
    file_formats: LanguageDispatchRule,
    handler_dict: HandlerDispatchTable[C],
) -> Callable[[C], C]:
    """Build a decorator that registers its target in ``handler_dict``.

    The decorator registers the target for ``file_formats`` via
    ``_register_handler`` and returns the target unchanged.
    """

    def _decorator(target: C) -> C:
        _register_handler(file_formats, handler_dict, target)
        return target

    return _decorator

272 

273 

274def _register_handler( 

275 file_format: LanguageDispatchRule, 

276 handler_dict: HandlerDispatchTable[C], 

277 handler: C, 

278) -> None: 

279 primary_table = handler_dict[file_format.primary_language_id] 

280 filename_based_dispatch = handler_dict[""] 

281 

282 if primary_table.default_handler is not None: 

283 raise AssertionError( 

284 f"There is already a handler for language ID {file_format.primary_language_id}" 

285 ) 

286 

287 primary_table.default_handler = handler 

288 for filename in file_format.path_names: 

289 filename_based_handler = filename_based_dispatch.path_name_based_lookups.get( 

290 filename 

291 ) 

292 if filename_based_handler is not None: 

293 raise AssertionError(f"There is already a handler for filename {filename}") 

294 filename_based_dispatch.path_name_based_lookups[filename] = handler 

295 

296 for secondary_language in file_format.secondary_language_ids: 

297 secondary_table = handler_dict[secondary_language.language_id] 

298 if secondary_language.secondary_lookup == "path-name": 

299 if not file_format.path_names: 

300 raise AssertionError( 

301 f"secondary_lookup=path-name requires the language to have path-names. Please correct definition of {file_format.primary_language_id}" 

302 ) 

303 for filename in file_format.path_names: 

304 secondary_handler = secondary_table.path_name_based_lookups.get( 

305 filename 

306 ) 

307 if secondary_handler is not None: 

308 raise AssertionError( 

309 f"There is already a handler for filename {filename} under language ID {secondary_language.language_id}" 

310 ) 

311 secondary_table.path_name_based_lookups[filename] = handler 

312 elif secondary_language.secondary_lookup == "basename": 

313 basename = file_format.basename 

314 if not basename: 

315 raise AssertionError( 

316 f"secondary_lookup=basename requires the language to have a basename. Please correct definition of {file_format.primary_language_id}" 

317 ) 

318 secondary_handler = secondary_table.basename_based_lookups.get(basename) 

319 if secondary_handler is not None: 

320 raise AssertionError( 

321 f"There is already a handler for basename {basename} under language ID {secondary_language.language_id}" 

322 ) 

323 secondary_table.basename_based_lookups[basename] = handler 

324 elif secondary_table.default_handler is not None: 

325 raise AssertionError( 

326 f"There is already a primary handler for language ID {secondary_language.language_id}" 

327 ) 

328 else: 

329 secondary_table.default_handler = handler 

330 

331 

def ensure_cli_lsp_features_are_loaded() -> None:
    """Import the modules whose import-time side effects register the LSP features."""
    # The registration happens while these modules are being imported; that
    # side effect is the only reason for importing them here.
    import debputy.lsp.languages as lsp_languages
    from debputy.linting.lint_impl import LINTER_FORMATS

    # Reference both names so that no static analysis tool is tempted to drop
    # the imports as unused.
    assert lsp_languages
    assert LINTER_FORMATS

342 

343 

def describe_lsp_features(context: CommandContext) -> None:
    """Print an overview of the LSP features to standard output.

    The overview has three sections: the known language IDs with the features
    registered for each, the aliases grouped by their main language ID, and the
    outcome of every check in ``LSP_CHECKS``.

    :param context: Command context; its ``parsed_args`` select the output
      styling.
    """
    fo = _output_styling(context.parsed_args, sys.stdout)
    ensure_cli_lsp_features_are_loaded()

    feature_list = [
        ("diagnostics (lint)", LSP_DIAGNOSTIC_HANDLERS),
        ("code actions/quickfixes", CODE_ACTION_HANDLERS),
        ("completion suggestions", COMPLETER_HANDLERS),
        ("hover docs", HOVER_HANDLERS),
        ("folding ranges", FOLDING_RANGE_HANDLERS),
        ("semantic tokens", SEMANTIC_TOKENS_FULL_HANDLERS),
        ("on-save handler", WILL_SAVE_WAIT_UNTIL_HANDLERS),
        ("inlay hint (doc)", TEXT_DOC_INLAY_HANDLERS),
        ("format file handler", LSP_FORMAT_FILE_HANDLERS),
    ]
    print("LSP language IDs and their features:")
    all_ids = sorted({lid for _, table in feature_list for lid in table})
    for lang_id in all_ids:
        # Aliases are listed in their own section below.
        if lang_id in _ALIAS_OF:
            continue
        features = [name for name, table in feature_list if lang_id in table]
        print(f" * {lang_id}:")
        for feature in features:
            print(f" - {feature}")

    # Group the alias IDs by the main language ID they point to. The dict and
    # the per-entry list use distinct names so the loop below does not rebind
    # the mapping it is iterating over.
    aliases_by_main_id = collections.defaultdict(list)
    for lang_id in all_ids:
        main_lang = _ALIAS_OF.get(lang_id)
        if main_lang is None:
            continue
        aliases_by_main_id[main_lang].append(lang_id)

    print()
    print("Aliases:")
    for main_id, alias_ids in aliases_by_main_id.items():
        print(f" * {main_id}: {', '.join(alias_ids)}")

    print()
    print("General features:")
    for self_check in LSP_CHECKS:
        is_ok = self_check.test()
        if is_ok:
            print(f" * {self_check.feature}: {fo.colored('enabled', fg='green')}")
            continue

        # A failed mandatory check is reported as "missing", an optional one
        # as "disabled".
        if self_check.is_mandatory:
            disabled = fo.colored(
                "missing",
                fg="red",
                bg="black",
                style="bold",
            )
        else:
            disabled = fo.colored(
                "disabled",
                fg="yellow",
                bg="black",
                style="bold",
            )

        if self_check.how_to_fix:
            print(f" * {self_check.feature}: {disabled}")
            print(f" - {self_check.how_to_fix}")
        else:
            problem_suffix = f" ({self_check.problem})"
            print(f" * {self_check.feature}: {disabled}{problem_suffix}")