Coverage for src/debputy/lsp/lsp_features.py: 70%

187 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-10-12 15:06 +0000

1import collections 

2import dataclasses 

3import inspect 

4import sys 

5from typing import ( 

6 TypeVar, 

7 Union, 

8 Dict, 

9 List, 

10 Optional, 

11 Self, 

12 Generic, 

13 Protocol, 

14 TYPE_CHECKING, 

15 Literal, 

16) 

17from collections.abc import Callable, Sequence, AsyncIterator 

18 

19from debputy.commands.debputy_cmd.context import CommandContext 

20from debputy.commands.debputy_cmd.output import _output_styling 

21from debputy.lsp.lsp_self_check import LSP_CHECKS 

22 

23try: 

24 from pygls.server import LanguageServer 

25 from debputy.lsp.debputy_ls import DebputyLanguageServer 

26except ImportError: 

27 pass 

28 

29from debputy.linting.lint_util import AsyncLinterImpl, LintState 

30from debputy.lsp.quickfixes import provide_standard_quickfixes_from_diagnostics_ls 

31from debputy.lsp.text_util import on_save_trim_end_of_line_whitespace 

32 

33if TYPE_CHECKING: 

34 import lsprotocol.types as types 

35 

36 Reformatter = Callable[[LintState], Optional[Sequence[types.TextEdit]]] 

37else: 

38 import debputy.lsprotocol.types as types 

39 

# Type variable for the handler callables stored in the dispatch tables below.
C = TypeVar("C", bound=Callable)

# Legend of the semantic token types used by this module; no token modifiers
# are declared.
SEMANTIC_TOKENS_LEGEND = types.SemanticTokensLegend(
    token_types=[
        token_type.value
        for token_type in (
            types.SemanticTokenTypes.Keyword,
            types.SemanticTokenTypes.EnumMember,
            types.SemanticTokenTypes.Comment,
            types.SemanticTokenTypes.String,
            types.SemanticTokenTypes.Macro,
            types.SemanticTokenTypes.Operator,
            types.SemanticTokenTypes.TypeParameter,
            types.SemanticTokenTypes.Variable,
        )
    ],
    token_modifiers=[],
)
# Maps each token type of the legend to its position (index) in the legend.
SEMANTIC_TOKEN_TYPES_IDS = {
    token_type: position
    for position, token_type in enumerate(SEMANTIC_TOKENS_LEGEND.token_types)
}

# Callable alias: takes the language server plus "did open"/"did change"
# parameters and returns an async iterator of optional diagnostic lists.
DiagnosticHandler = Callable[
    [
        "DebputyLanguageServer",
        Union["types.DidOpenTextDocumentParams", "types.DidChangeTextDocumentParams"],
    ],
    AsyncIterator[Optional[list[types.Diagnostic]]],
]

66 

67 

@dataclasses.dataclass(slots=True)
class LanguageDispatchTable(Generic[C]):
    """Handlers registered for one language ID.

    Besides an optional default handler, the table holds two lookup
    dictionaries: one keyed by basename and one keyed by path name.
    """

    # The language ID this table was created for.
    language_id: str
    # Handlers keyed by basename.
    basename_based_lookups: dict[str, C] = dataclasses.field(default_factory=dict)
    # Handlers keyed by path name.
    path_name_based_lookups: dict[str, C] = dataclasses.field(default_factory=dict)
    # Handler for the language ID itself; None until one has been registered.
    default_handler: C | None = None

74 

75 

class HandlerDispatchTable(Generic[C], dict[str, LanguageDispatchTable[C]]):
    """Dictionary from language ID to its ``LanguageDispatchTable``.

    Subscripting with an unknown language ID creates, stores and returns an
    empty table for that ID instead of raising ``KeyError``.
    """

    def __missing__(self, key: str) -> LanguageDispatchTable[C]:
        """Create and store the table for ``key`` on its first subscript access."""
        table = self[key] = LanguageDispatchTable(key)
        return table

81 

82 

class DiagnosticHandlerProtocol(Protocol):
    """Call signature of the handlers stored in ``LSP_DIAGNOSTIC_HANDLERS``."""

    async def __call__(
        self,
        ls: "DebputyLanguageServer",
        params: (
            types.DidOpenTextDocumentParams
            | types.DidChangeTextDocumentParams
            | types.DocumentDiagnosticParams
        ),
    ) -> list[types.Diagnostic] | None:
        """Return the diagnostics for the document named by ``params``, or None."""
        ...

93 

94 

# Command-line registries keyed by path name. They are filled by
# `lint_diagnostics` and `lsp_cli_reformat_document` respectively, and only
# for path names that start with "debian/".
CLI_DIAGNOSTIC_HANDLERS: dict[str, AsyncLinterImpl] = {}
CLI_FORMAT_FILE_HANDLERS: dict[str, "Reformatter"] = {}

# One dispatch table per LSP feature; each is filled by the registration
# decorator of the matching feature further down in this module.
LSP_DIAGNOSTIC_HANDLERS: HandlerDispatchTable[DiagnosticHandlerProtocol] = (
    HandlerDispatchTable[DiagnosticHandlerProtocol]()
)
COMPLETER_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
HOVER_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
CODE_ACTION_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
FOLDING_RANGE_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
SEMANTIC_TOKENS_FULL_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
WILL_SAVE_WAIT_UNTIL_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
LSP_FORMAT_FILE_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
TEXT_DOC_INLAY_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()
DOCUMENT_LINK_HANDLERS: HandlerDispatchTable[C] = HandlerDispatchTable()

# Alias language ID -> main language ID. `describe_lsp_features` lists IDs
# found here under "Aliases" instead of as separate entries.
# NOTE(review): nothing in the visible part of this module populates it.
_ALIAS_OF = {}

112 

113 

114@dataclasses.dataclass(slots=True, frozen=True) 

115class BasenameMatchingRule: 

116 rule_type: Literal["basename", "extension"] 

117 value: str 

118 

119 

120@dataclasses.dataclass(slots=True, frozen=True) 

121class SecondaryLanguage: 

122 language_id: str 

123 secondary_lookup: Literal["path-name", "basename"] | None = None 

124 

125 

126@dataclasses.dataclass(slots=True, frozen=True) 

127class LanguageDispatchRule: 

128 primary_language_id: str 

129 basename: str | None 

130 path_names: Sequence[str] 

131 secondary_language_ids: Sequence[SecondaryLanguage] 

132 is_debsrc_packaging_file: bool 

133 

134 @classmethod 

135 def new_rule( 

136 cls, 

137 primary_language_id: str, 

138 basename: str | None, 

139 path_names: str | Sequence[str], 

140 secondary_language_ids: Sequence[SecondaryLanguage | str] = (), 

141 ) -> Self: 

142 path_names_as_seq: Sequence[str] = ( 

143 (path_names,) if isinstance(path_names, str) else tuple(path_names) 

144 ) 

145 is_debsrc_packaging_file = any( 

146 pn.startswith("debian/") for pn in path_names_as_seq 

147 ) 

148 return LanguageDispatchRule( 

149 primary_language_id, 

150 basename, 

151 path_names_as_seq, 

152 tuple( 

153 SecondaryLanguage(l) if isinstance(l, str) else l 

154 for l in secondary_language_ids 

155 ), 

156 is_debsrc_packaging_file, 

157 ) 

158 

159 

# LSP method name -> (dispatch table to register in, stock handler to register).
# Consulted by `lsp_standard_handler`.
_STANDARD_HANDLERS = {
    # Formatting requests trim end-of-line whitespace.
    types.TEXT_DOCUMENT_FORMATTING: (
        LSP_FORMAT_FILE_HANDLERS,
        on_save_trim_end_of_line_whitespace,
    ),
    # Code actions are the standard quickfixes derived from diagnostics.
    types.TEXT_DOCUMENT_CODE_ACTION: (
        CODE_ACTION_HANDLERS,
        provide_standard_quickfixes_from_diagnostics_ls,
    ),
    # The on-save hook uses the same whitespace trimming as formatting.
    types.TEXT_DOCUMENT_WILL_SAVE_WAIT_UNTIL: (
        WILL_SAVE_WAIT_UNTIL_HANDLERS,
        on_save_trim_end_of_line_whitespace,
    ),
}

174 

175 

def lint_diagnostics(
    file_format: LanguageDispatchRule,
) -> Callable[[AsyncLinterImpl], AsyncLinterImpl]:
    """Return a decorator that registers an async linter for ``file_format``.

    The decorator wraps the linter in an LSP diagnostics handler and registers
    that wrapper in ``LSP_DIAGNOSTIC_HANDLERS``. When the rule is flagged as a
    Debian source packaging file, the undecorated linter is additionally
    stored in ``CLI_DIAGNOSTIC_HANDLERS`` under every path name that starts
    with ``debian/``.

    :param file_format: The rule describing which documents the linter handles.
    :return: A decorator that returns the linter unchanged.
    :raises ValueError: (from the decorator) if the linter is not a coroutine
      function.
    """

    def _wrapper(func: AsyncLinterImpl) -> AsyncLinterImpl:
        if not inspect.iscoroutinefunction(func):
            raise ValueError("Linters must be async")

        async def _lint_wrapper(
            ls: "DebputyLanguageServer",
            params: (
                types.DidOpenTextDocumentParams
                | types.DidChangeTextDocumentParams
                | types.DocumentDiagnosticParams
            ),
        ) -> list[types.Diagnostic] | None:
            # Resolve the document from the request and run the linter through
            # the lint state the server keeps for that document.
            doc = ls.workspace.get_text_document(params.text_document.uri)
            lint_state = ls.lint_state(doc)
            return await lint_state.run_diagnostics(func)

        _register_handler(file_format, LSP_DIAGNOSTIC_HANDLERS, _lint_wrapper)
        if file_format.is_debsrc_packaging_file:
            for path_name in file_format.path_names:
                if path_name.startswith("debian/"):
                    # The CLI registry stores the raw linter, not the LSP wrapper.
                    CLI_DIAGNOSTIC_HANDLERS[path_name] = func

        return func

    return _wrapper

205 

206 

def lsp_completer(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering a function in ``COMPLETER_HANDLERS`` for ``file_format``."""
    register = _registering_wrapper(file_format, COMPLETER_HANDLERS)
    return register

209 

210 

def lsp_code_actions(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering a function in ``CODE_ACTION_HANDLERS`` for ``file_format``."""
    register = _registering_wrapper(file_format, CODE_ACTION_HANDLERS)
    return register

213 

214 

def lsp_hover(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering a function in ``HOVER_HANDLERS`` for ``file_format``."""
    register = _registering_wrapper(file_format, HOVER_HANDLERS)
    return register

217 

218 

def lsp_text_doc_inlay_hints(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering a function in ``TEXT_DOC_INLAY_HANDLERS`` for ``file_format``."""
    register = _registering_wrapper(file_format, TEXT_DOC_INLAY_HANDLERS)
    return register

221 

222 

def lsp_folding_ranges(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering a function in ``FOLDING_RANGE_HANDLERS`` for ``file_format``."""
    register = _registering_wrapper(file_format, FOLDING_RANGE_HANDLERS)
    return register

225 

226 

def lsp_will_save_wait_until(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering a function in ``WILL_SAVE_WAIT_UNTIL_HANDLERS`` for ``file_format``."""
    register = _registering_wrapper(file_format, WILL_SAVE_WAIT_UNTIL_HANDLERS)
    return register

229 

230 

def lsp_format_document(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering a function in ``LSP_FORMAT_FILE_HANDLERS`` for ``file_format``."""
    register = _registering_wrapper(file_format, LSP_FORMAT_FILE_HANDLERS)
    return register

233 

234 

def lsp_cli_reformat_document(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator that registers a command-line reformatter.

    The decorated function is stored in ``CLI_FORMAT_FILE_HANDLERS`` under
    every path name of ``file_format`` that starts with ``debian/``; other
    path names are skipped. The function itself is returned unchanged.
    """

    def _store_reformatter(func: C) -> C:
        CLI_FORMAT_FILE_HANDLERS.update(
            (path_name, func)
            for path_name in file_format.path_names
            if path_name.startswith("debian/")
        )
        return func

    return _store_reformatter

243 

244 

def lsp_semantic_tokens_full(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering a function in ``SEMANTIC_TOKENS_FULL_HANDLERS`` for ``file_format``."""
    register = _registering_wrapper(file_format, SEMANTIC_TOKENS_FULL_HANDLERS)
    return register

247 

248 

def lsp_document_link(file_format: LanguageDispatchRule) -> Callable[[C], C]:
    """Return a decorator registering a function in ``DOCUMENT_LINK_HANDLERS`` for ``file_format``."""
    register = _registering_wrapper(file_format, DOCUMENT_LINK_HANDLERS)
    return register

251 

252 

def lsp_standard_handler(
    file_format: LanguageDispatchRule,
    topic: str,
) -> None:
    """Register the stock handler for ``topic`` for the given file format.

    :param file_format: The rule the stock handler is registered for.
    :param topic: A key of ``_STANDARD_HANDLERS``.
    :raises ValueError: If ``topic`` has no entry in ``_STANDARD_HANDLERS``.
    """
    if topic not in _STANDARD_HANDLERS:
        raise ValueError(f"No standard handler for {topic}")

    table, handler = _STANDARD_HANDLERS[topic]
    _register_handler(file_format, table, handler)

264 

265 

def _registering_wrapper(
    file_formats: LanguageDispatchRule,
    handler_dict: HandlerDispatchTable[C],
) -> Callable[[C], C]:
    """Build a decorator that registers its target in ``handler_dict``.

    :param file_formats: The rule passed on to ``_register_handler``.
    :param handler_dict: The dispatch table the function is registered in.
    :return: A decorator that registers the function and returns it unchanged.
    """

    def _register_and_return(func: C) -> C:
        _register_handler(file_formats, handler_dict, func)
        return func

    return _register_and_return

275 

276 

277def _register_handler( 

278 file_format: LanguageDispatchRule, 

279 handler_dict: HandlerDispatchTable[C], 

280 handler: C, 

281) -> None: 

282 primary_table = handler_dict[file_format.primary_language_id] 

283 filename_based_dispatch = handler_dict[""] 

284 

285 if primary_table.default_handler is not None: 

286 raise AssertionError( 

287 f"There is already a handler for language ID {file_format.primary_language_id}" 

288 ) 

289 

290 primary_table.default_handler = handler 

291 for filename in file_format.path_names: 

292 filename_based_handler = filename_based_dispatch.path_name_based_lookups.get( 

293 filename 

294 ) 

295 if filename_based_handler is not None: 

296 raise AssertionError(f"There is already a handler for filename {filename}") 

297 filename_based_dispatch.path_name_based_lookups[filename] = handler 

298 

299 for secondary_language in file_format.secondary_language_ids: 

300 secondary_table = handler_dict[secondary_language.language_id] 

301 if secondary_language.secondary_lookup == "path-name": 

302 if not file_format.path_names: 

303 raise AssertionError( 

304 f"secondary_lookup=path-name requires the language to have path-names. Please correct definition of {file_format.primary_language_id}" 

305 ) 

306 for filename in file_format.path_names: 

307 secondary_handler = secondary_table.path_name_based_lookups.get( 

308 filename 

309 ) 

310 if secondary_handler is not None: 

311 raise AssertionError( 

312 f"There is already a handler for filename {filename} under language ID {secondary_language.language_id}" 

313 ) 

314 secondary_table.path_name_based_lookups[filename] = handler 

315 elif secondary_language.secondary_lookup == "basename": 

316 basename = file_format.basename 

317 if not basename: 

318 raise AssertionError( 

319 f"secondary_lookup=basename requires the language to have a basename. Please correct definition of {file_format.primary_language_id}" 

320 ) 

321 secondary_handler = secondary_table.basename_based_lookups.get(basename) 

322 if secondary_handler is not None: 

323 raise AssertionError( 

324 f"There is already a handler for basename {basename} under language ID {secondary_language.language_id}" 

325 ) 

326 secondary_table.basename_based_lookups[basename] = handler 

327 elif secondary_table.default_handler is not None: 

328 raise AssertionError( 

329 f"There is already a primary handler for language ID {secondary_language.language_id}" 

330 ) 

331 else: 

332 secondary_table.default_handler = handler 

333 

334 

def ensure_cli_lsp_features_are_loaded() -> None:
    """Import the modules whose import-time side effects register the LSP handlers."""
    # Registration happens as a side effect of these imports, so they must
    # stay even though the imported names are otherwise unused.
    import debputy.lsp.languages as lsp_languages
    from debputy.linting.lint_impl import LINTER_FORMATS

    # Reference both names so that no static analysis tool is tempted to
    # optimize the imports away.
    assert lsp_languages and LINTER_FORMATS

345 

346 

def describe_lsp_features(context: CommandContext) -> None:
    """Print an overview of the LSP features to standard output.

    Three sections are printed: every registered language ID with the
    features that have a table for it, the language ID aliases, and the
    result of each self check in ``LSP_CHECKS``.

    :param context: Command context; its ``parsed_args`` select the output
      styling.
    """
    fo = _output_styling(context.parsed_args, sys.stdout)
    ensure_cli_lsp_features_are_loaded()

    feature_list = [
        ("diagnostics (lint)", LSP_DIAGNOSTIC_HANDLERS),
        ("code actions/quickfixes", CODE_ACTION_HANDLERS),
        ("completion suggestions", COMPLETER_HANDLERS),
        ("hover docs", HOVER_HANDLERS),
        ("folding ranges", FOLDING_RANGE_HANDLERS),
        ("semantic tokens", SEMANTIC_TOKENS_FULL_HANDLERS),
        ("on-save handler", WILL_SAVE_WAIT_UNTIL_HANDLERS),
        ("inlay hint (doc)", TEXT_DOC_INLAY_HANDLERS),
        ("format file handler", LSP_FORMAT_FILE_HANDLERS),
        ("document link handler", DOCUMENT_LINK_HANDLERS),
    ]

    # Section 1: language IDs (aliases excluded) and their features.
    print("LSP language IDs and their features:")
    all_ids = sorted({lid for _, table in feature_list for lid in table})
    for lang_id in all_ids:
        if lang_id in _ALIAS_OF:
            continue
        print(f" * {lang_id}:")
        for feature_name, table in feature_list:
            if lang_id in table:
                print(f" - {feature_name}")

    # Section 2: aliases grouped by the main language ID they point to.
    alias_groups = collections.defaultdict(list)
    for lang_id in all_ids:
        main_lang = _ALIAS_OF.get(lang_id)
        if main_lang is not None:
            alias_groups[main_lang].append(lang_id)

    print()
    print("Aliases:")
    for main_id, alias_ids in alias_groups.items():
        print(f" * {main_id}: {', '.join(alias_ids)}")

    # Section 3: self checks; failed mandatory checks are shown as "missing",
    # failed optional ones as "disabled".
    print()
    print("General features:")
    for self_check in LSP_CHECKS:
        if self_check.test():
            print(f" * {self_check.feature}: {fo.colored('enabled', fg='green')}")
            continue

        label, colour = (
            ("missing", "red") if self_check.is_mandatory else ("disabled", "yellow")
        )
        disabled = fo.colored(
            label,
            fg=colour,
            bg="black",
            style="bold",
        )

        if self_check.how_to_fix:
            print(f" * {self_check.feature}: {disabled}")
            print(f" - {self_check.how_to_fix}")
        else:
            problem_suffix = f" ({self_check.problem})"
            print(f" * {self_check.feature}: {disabled}{problem_suffix}")