Coverage for src/debputy/lsp/lsp_generic_yaml.py: 79%

309 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-01-27 13:59 +0000

1from typing import ( 

2 Union, 

3 Any, 

4 Optional, 

5 List, 

6 Tuple, 

7 Iterable, 

8 TYPE_CHECKING, 

9 Callable, 

10 Sequence, 

11 get_origin, 

12 Literal, 

13 get_args, 

14) 

15 

16from debputy.linting.lint_util import LintState 

17from debputy.lsp.diagnostics import LintSeverity 

18from debputy.lsp.quickfixes import propose_correct_text_quick_fix 

19from debputy.lsp.vendoring._deb822_repro.locatable import ( 

20 Position as TEPosition, 

21 Range as TERange, 

22) 

23from debputy.manifest_parser.declarative_parser import ( 

24 DeclarativeMappingInputParser, 

25 ParserGenerator, 

26 AttributeDescription, 

27) 

28from debputy.manifest_parser.parser_doc import ( 

29 render_rule, 

30 render_attribute_doc, 

31 doc_args_for_parser_doc, 

32) 

33from debputy.manifest_parser.tagging_types import DebputyDispatchableType 

34from debputy.manifest_parser.util import AttributePath 

35from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

36from debputy.plugin.api.impl import plugin_metadata_for_debputys_own_plugin 

37from debputy.plugin.api.impl_types import ( 

38 DebputyPluginMetadata, 

39 DeclarativeInputParser, 

40 DispatchingParserBase, 

41 InPackageContextParser, 

42 ListWrappedDeclarativeInputParser, 

43) 

44from debputy.util import _info, _warn, detect_possible_typo 

45from debputy.yaml import MANIFEST_YAML 

46from debputy.yaml.compat import ( 

47 Node, 

48 CommentedMap, 

49 LineCol, 

50 CommentedSeq, 

51 CommentedBase, 

52 YAMLError, 

53) 

54 

55if TYPE_CHECKING: 

56 import lsprotocol.types as types 

57else: 

58 import debputy.lsprotocol.types as types 

59 

60try: 

61 from pygls.server import LanguageServer 

62 from debputy.lsp.debputy_ls import DebputyLanguageServer 

63except ImportError: 

64 pass 

65 

66 

67YAML_COMPLETION_HINT_KEY = "___COMPLETE:" 

68YAML_COMPLETION_HINT_VALUE = "___COMPLETE" 

69DEBPUTY_PLUGIN_METADATA = plugin_metadata_for_debputys_own_plugin() 

70 

71 

72def resolve_hover_text_for_value( 

73 feature_set: PluginProvidedFeatureSet, 

74 parser: DeclarativeMappingInputParser, 

75 plugin_metadata: DebputyPluginMetadata, 

76 segment: Union[str, int], 

77 matched: Any, 

78) -> Optional[str]: 

79 

80 hover_doc_text: Optional[str] = None 

81 attr = parser.manifest_attributes.get(segment) 

82 attr_type = attr.attribute_type if attr is not None else None 

83 if attr_type is None: 83 ↛ 84line 83 didn't jump to line 84 because the condition on line 83 was never true

84 _info(f"Matched value for {segment} -- No attr or type") 

85 return None 

86 if isinstance(attr_type, type) and issubclass(attr_type, DebputyDispatchableType): 86 ↛ 104line 86 didn't jump to line 104 because the condition on line 86 was always true

87 parser_generator = feature_set.manifest_parser_generator 

88 parser = parser_generator.dispatch_parser_table_for(attr_type) 

89 if parser is None or not isinstance(matched, str): 89 ↛ 90line 89 didn't jump to line 90 because the condition on line 89 was never true

90 _info( 

91 f"Unknown parser for {segment} or matched is not a str -- {attr_type} {type(matched)=}" 

92 ) 

93 return None 

94 subparser = parser.parser_for(matched) 

95 if subparser is None: 95 ↛ 96line 95 didn't jump to line 96 because the condition on line 95 was never true

96 _info(f"Unknown parser for {matched} (subparser)") 

97 return None 

98 hover_doc_text = render_rule( 

99 matched, 

100 subparser.parser, 

101 plugin_metadata, 

102 ) 

103 else: 

104 _info(f"Unknown value: {matched} -- {segment}") 

105 return hover_doc_text 

106 

107 

108def resolve_hover_text( 

109 feature_set: PluginProvidedFeatureSet, 

110 parser: Optional[Union[DeclarativeInputParser[Any], DispatchingParserBase]], 

111 plugin_metadata: DebputyPluginMetadata, 

112 segments: List[Union[str, int]], 

113 at_depth_idx: int, 

114 matched: Any, 

115 matched_key: bool, 

116) -> Optional[str]: 

117 hover_doc_text: Optional[str] = None 

118 if at_depth_idx == len(segments): 

119 segment = segments[at_depth_idx - 1] 

120 _info(f"Matched {segment} at ==, {matched_key=} ") 

121 hover_doc_text = render_rule( 

122 segment, 

123 parser, 

124 plugin_metadata, 

125 is_root_rule=False, 

126 ) 

127 elif at_depth_idx + 1 == len(segments) and isinstance( 127 ↛ 150line 127 didn't jump to line 150 because the condition on line 127 was always true

128 parser, DeclarativeMappingInputParser 

129 ): 

130 segment = segments[at_depth_idx] 

131 _info(f"Matched {segment} at -1, {matched_key=} ") 

132 if isinstance(segment, str): 132 ↛ 152line 132 didn't jump to line 152 because the condition on line 132 was always true

133 if not matched_key: 

134 hover_doc_text = resolve_hover_text_for_value( 

135 feature_set, 

136 parser, 

137 plugin_metadata, 

138 segment, 

139 matched, 

140 ) 

141 if matched_key or hover_doc_text is None: 

142 rule_name = _guess_rule_name(segments, at_depth_idx) 

143 hover_doc_text = _render_param_doc( 

144 rule_name, 

145 parser, 

146 plugin_metadata, 

147 segment, 

148 ) 

149 else: 

150 _info(f"No doc: {at_depth_idx=} {len(segments)=}") 

151 

152 return hover_doc_text 

153 

154 

155def as_hover_doc( 

156 ls: "DebputyLanguageServer", 

157 hover_doc_text: Optional[str], 

158) -> Optional[types.Hover]: 

159 if hover_doc_text is None: 159 ↛ 160line 159 didn't jump to line 160 because the condition on line 159 was never true

160 return None 

161 return types.Hover( 

162 contents=types.MarkupContent( 

163 kind=ls.hover_markup_format( 

164 types.MarkupKind.Markdown, 

165 types.MarkupKind.PlainText, 

166 ), 

167 value=hover_doc_text, 

168 ), 

169 ) 

170 

171 

172def _render_param_doc( 

173 rule_name: str, 

174 declarative_parser: DeclarativeMappingInputParser, 

175 plugin_metadata: DebputyPluginMetadata, 

176 attribute: str, 

177) -> Optional[str]: 

178 attr = declarative_parser.source_attributes.get(attribute) 

179 if attr is None: 179 ↛ 180line 179 didn't jump to line 180 because the condition on line 179 was never true

180 return None 

181 

182 doc_args, parser_doc = doc_args_for_parser_doc( 

183 rule_name, 

184 declarative_parser, 

185 plugin_metadata, 

186 ) 

187 rendered_docs = render_attribute_doc( 

188 declarative_parser, 

189 declarative_parser.source_attributes, 

190 declarative_parser.input_time_required_parameters, 

191 declarative_parser.at_least_one_of, 

192 parser_doc, 

193 doc_args, 

194 is_interactive=True, 

195 rule_name=rule_name, 

196 ) 

197 

198 for attributes, rendered_doc in rendered_docs: 198 ↛ 207line 198 didn't jump to line 207 because the loop on line 198 didn't complete

199 if attribute in attributes: 

200 full_doc = [ 

201 f"# Attribute `{attribute}`", 

202 "", 

203 ] 

204 full_doc.extend(rendered_doc) 

205 

206 return "\n".join(full_doc) 

207 return None 

208 

209 

210def _guess_rule_name(segments: List[Union[str, int]], idx: int) -> str: 

211 orig_idx = idx 

212 idx -= 1 

213 while idx >= 0: 213 ↛ 218line 213 didn't jump to line 218 because the condition on line 213 was always true

214 segment = segments[idx] 

215 if isinstance(segment, str): 

216 return segment 

217 idx -= 1 

218 _warn(f"Unable to derive rule name from {segments} [{orig_idx}]") 

219 return "<Bug: unknown rule name>" 

220 

221 

222def is_at(position: types.Position, lc_pos: Tuple[int, int]) -> bool: 

223 return position.line == lc_pos[0] and position.character == lc_pos[1] 

224 

225 

226def is_before(position: types.Position, lc_pos: Tuple[int, int]) -> bool: 

227 line, column = lc_pos 

228 if position.line < line: 

229 return True 

230 if position.line == line and position.character < column: 

231 return True 

232 return False 

233 

234 

235def is_after(position: types.Position, lc_pos: Tuple[int, int]) -> bool: 

236 line, column = lc_pos 

237 if position.line > line: 

238 return True 

239 if position.line == line and position.character > column: 

240 return True 

241 return False 

242 

243 

244def error_range_at_position( 

245 lines: List[str], 

246 line_no: int, 

247 char_offset: int, 

248) -> TERange: 

249 line = lines[line_no] 

250 line_len = len(line) 

251 start_idx = char_offset 

252 end_idx = start_idx 

253 

254 if line[start_idx].isspace(): 

255 

256 def _check(x: str) -> bool: 

257 return not x.isspace() 

258 

259 else: 

260 

261 def _check(x: str) -> bool: 

262 return x.isspace() 

263 

264 for i in range(end_idx, line_len): 

265 end_idx = i 

266 if _check(line[i]): 

267 break 

268 

269 for i in range(start_idx, -1, -1): 

270 if i > 0 and _check(line[i]): 

271 break 

272 start_idx = i 

273 

274 return TERange( 

275 TEPosition(line_no, start_idx), 

276 TEPosition(line_no, end_idx), 

277 ) 

278 

279 

280def _escape(v: str) -> str: 

281 return '"' + v.replace("\n", "\\n") + '"' 

282 

283 

284def insert_complete_marker_snippet( 

285 lines: List[str], 

286 server_position: types.Position, 

287) -> bool: 

288 _info(f"Complete at {server_position}") 

289 line_no = server_position.line 

290 line = lines[line_no] if line_no < len(lines) else "" 

291 pos_rhs = ( 

292 line[server_position.character :] 

293 if server_position.character < len(line) 

294 else "" 

295 ) 

296 if pos_rhs and not pos_rhs.isspace(): 

297 _info(f"No insertion: {_escape(line[server_position.character:])}") 

298 return False 

299 lhs_ws = line[: server_position.character] 

300 lhs = lhs_ws.strip() 

301 if lhs.endswith(":"): 

302 _info("Insertion of value (key seen)") 

303 new_line = line[: server_position.character] + YAML_COMPLETION_HINT_VALUE + "\n" 

304 elif lhs.startswith("-"): 

305 _info("Insertion of key or value (list item)") 

306 # Respect the provided indentation 

307 snippet = ( 

308 YAML_COMPLETION_HINT_KEY if ":" not in lhs else YAML_COMPLETION_HINT_VALUE 

309 ) 

310 new_line = line[: server_position.character] + snippet + "\n" 

311 elif not lhs or (lhs_ws and not lhs_ws[0].isspace()): 

312 _info(f"Insertion of key or value: {_escape(line[server_position.character:])}") 

313 # Respect the provided indentation 

314 snippet = ( 

315 YAML_COMPLETION_HINT_KEY if ":" not in lhs else YAML_COMPLETION_HINT_VALUE 

316 ) 

317 new_line = line[: server_position.character] + snippet + "\n" 

318 elif lhs.isalpha() and ":" not in lhs: 

319 _info(f"Expanding value to a key: {_escape(line[server_position.character:])}") 

320 # Respect the provided indentation 

321 new_line = line[: server_position.character] + YAML_COMPLETION_HINT_KEY + "\n" 

322 else: 

323 c = ( 

324 line[server_position.character] 

325 if server_position.character < len(line) 

326 else "(OOB)" 

327 ) 

328 _info(f"Not touching line: {_escape(line)} -- {_escape(c)}") 

329 return False 

330 _info(f'Evaluating complete on synthetic line: "{new_line}"') 

331 if line_no < len(lines): 331 ↛ 333line 331 didn't jump to line 333 because the condition on line 331 was always true

332 lines[line_no] = new_line 

333 elif line_no == len(lines): 

334 lines.append(new_line) 

335 else: 

336 return False 

337 return True 

338 

339 

340def yaml_key_range( 

341 key: Optional[str], 

342 line: int, 

343 col: int, 

344) -> "TERange": 

345 key_len = len(key) if key else 1 

346 return TERange.between( 

347 TEPosition(line, col), 

348 TEPosition(line, col + key_len), 

349 ) 

350 

351 

352def yaml_flag_unknown_key( 

353 lint_state: LintState, 

354 key: Optional[str], 

355 expected_keys: Iterable[str], 

356 line: int, 

357 col: int, 

358 *, 

359 message_format: str = 'Unknown or unsupported key "{key}".', 

360 unknown_keys_diagnostic_severity: Optional[LintSeverity] = "error", 

361) -> Optional[str]: 

362 key_range = yaml_key_range(key, line, col) 

363 

364 candidates = detect_possible_typo(key, expected_keys) if key is not None else () 

365 extra = "" 

366 corrected_key = None 

367 if candidates: 

368 extra = f' It looks like a typo of "{candidates[0]}".' 

369 # TODO: We should be able to tell that `install-doc` and `install-docs` are the same. 

370 # That would enable this to work in more cases. 

371 corrected_key = candidates[0] if len(candidates) == 1 else None 

372 if unknown_keys_diagnostic_severity is None: 372 ↛ 373line 372 didn't jump to line 373 because the condition on line 372 was never true

373 message_format = f"Possible typo of {candidates[0]}." 

374 extra = "" 

375 elif unknown_keys_diagnostic_severity is None: 375 ↛ 376line 375 didn't jump to line 376 because the condition on line 375 was never true

376 return None 

377 

378 if key is None: 

379 message_format = "Missing key" 

380 if unknown_keys_diagnostic_severity is not None: 380 ↛ 388line 380 didn't jump to line 388 because the condition on line 380 was always true

381 lint_state.emit_diagnostic( 

382 key_range, 

383 message_format.format(key=key) + extra, 

384 unknown_keys_diagnostic_severity, 

385 "debputy", 

386 quickfixes=[propose_correct_text_quick_fix(n) for n in candidates], 

387 ) 

388 return corrected_key 

389 

390 

391def resolve_keyword( 

392 current_parser: Union[DeclarativeInputParser[Any], DispatchingParserBase], 

393 current_plugin: DebputyPluginMetadata, 

394 segments: List[Union[str, int]], 

395 segment_idx: int, 

396 parser_generator: ParserGenerator, 

397 *, 

398 is_completion_attempt: bool = False, 

399) -> Optional[ 

400 Tuple[ 

401 Union[DeclarativeInputParser[Any], DispatchingParserBase], 

402 DebputyPluginMetadata, 

403 int, 

404 ] 

405]: 

406 if segment_idx >= len(segments): 

407 return current_parser, current_plugin, segment_idx 

408 current_segment = segments[segment_idx] 

409 if isinstance(current_parser, ListWrappedDeclarativeInputParser): 

410 if isinstance(current_segment, int): 410 ↛ 417line 410 didn't jump to line 417 because the condition on line 410 was always true

411 current_parser = current_parser.delegate 

412 segment_idx += 1 

413 if segment_idx >= len(segments): 413 ↛ 414line 413 didn't jump to line 414 because the condition on line 413 was never true

414 return current_parser, current_plugin, segment_idx 

415 current_segment = segments[segment_idx] 

416 

417 if not isinstance(current_segment, str): 417 ↛ 418line 417 didn't jump to line 418 because the condition on line 417 was never true

418 return None 

419 

420 if is_completion_attempt and current_segment.endswith( 

421 (YAML_COMPLETION_HINT_KEY, YAML_COMPLETION_HINT_VALUE) 

422 ): 

423 return current_parser, current_plugin, segment_idx 

424 

425 if isinstance(current_parser, InPackageContextParser): 

426 return resolve_keyword( 

427 current_parser.delegate, 

428 current_plugin, 

429 segments, 

430 segment_idx + 1, 

431 parser_generator, 

432 is_completion_attempt=is_completion_attempt, 

433 ) 

434 elif isinstance(current_parser, DispatchingParserBase): 

435 if not current_parser.is_known_keyword(current_segment): 435 ↛ 436line 435 didn't jump to line 436 because the condition on line 435 was never true

436 if is_completion_attempt: 

437 return current_parser, current_plugin, segment_idx 

438 return None 

439 subparser = current_parser.parser_for(current_segment) 

440 segment_idx += 1 

441 if segment_idx < len(segments): 

442 return resolve_keyword( 

443 subparser.parser, 

444 subparser.plugin_metadata, 

445 segments, 

446 segment_idx, 

447 parser_generator, 

448 is_completion_attempt=is_completion_attempt, 

449 ) 

450 return subparser.parser, subparser.plugin_metadata, segment_idx 

451 elif isinstance(current_parser, DeclarativeMappingInputParser): 451 ↛ 473line 451 didn't jump to line 473 because the condition on line 451 was always true

452 attr = current_parser.manifest_attributes.get(current_segment) 

453 attr_type = attr.attribute_type if attr is not None else None 

454 if ( 

455 attr_type is not None 

456 and isinstance(attr_type, type) 

457 and issubclass(attr_type, DebputyDispatchableType) 

458 ): 

459 subparser = parser_generator.dispatch_parser_table_for(attr_type) 

460 if subparser is not None and ( 

461 is_completion_attempt or segment_idx + 1 < len(segments) 

462 ): 

463 return resolve_keyword( 

464 subparser, 

465 current_plugin, 

466 segments, 

467 segment_idx + 1, 

468 parser_generator, 

469 is_completion_attempt=is_completion_attempt, 

470 ) 

471 return current_parser, current_plugin, segment_idx 

472 else: 

473 _info(f"Unknown parser: {current_parser.__class__}") 

474 return None 

475 

476 

477def _trace_cursor( 

478 content: Any, 

479 attribute_path: AttributePath, 

480 server_position: types.Position, 

481) -> Optional[Tuple[bool, AttributePath, Any, Any]]: 

482 matched_key: Optional[Union[str, int]] = None 

483 matched: Optional[Node] = None 

484 matched_was_key: bool = False 

485 

486 if isinstance(content, CommentedMap): 

487 dict_lc: LineCol = content.lc 

488 for k, v in content.items(): 

489 k_lc = dict_lc.key(k) 

490 if is_before(server_position, k_lc): 490 ↛ 491line 490 didn't jump to line 491 because the condition on line 490 was never true

491 break 

492 v_lc = dict_lc.value(k) 

493 if is_before(server_position, v_lc): 

494 # TODO: Handle ":" and "whitespace" 

495 matched = k 

496 matched_key = k 

497 matched_was_key = True 

498 break 

499 matched = v 

500 matched_key = k 

501 elif isinstance(content, CommentedSeq): 501 ↛ 510line 501 didn't jump to line 510 because the condition on line 501 was always true

502 list_lc: LineCol = content.lc 

503 for idx, value in enumerate(content): 

504 i_lc = list_lc.item(idx) 

505 if is_before(server_position, i_lc): 505 ↛ 506line 505 didn't jump to line 506 because the condition on line 505 was never true

506 break 

507 matched_key = idx 

508 matched = value 

509 

510 if matched is not None: 510 ↛ 516line 510 didn't jump to line 516 because the condition on line 510 was always true

511 assert matched_key is not None 

512 sub_path = attribute_path[matched_key] 

513 if not matched_was_key and isinstance(matched, CommentedBase): 

514 return _trace_cursor(matched, sub_path, server_position) 

515 return matched_was_key, sub_path, matched, content 

516 return None 

517 

518 

519def maybe_quote_yaml_value(v: str) -> str: 

520 if v and v[0].isdigit(): 

521 try: 

522 float(v) 

523 return f"'{v}'" 

524 except ValueError: 

525 pass 

526 return v 

527 

528 

529def _complete_value(v: Any) -> str: 

530 if isinstance(v, str): 530 ↛ 532line 530 didn't jump to line 532 because the condition on line 530 was always true

531 return maybe_quote_yaml_value(v) 

532 return str(v) 

533 

534 

535def completion_from_attr( 

536 attr: AttributeDescription, 

537 pg: ParserGenerator, 

538 matched: Any, 

539) -> Optional[Union[types.CompletionList, Sequence[types.CompletionItem]]]: 

540 type_mapping = pg.get_mapped_type_from_target_type(attr.attribute_type) 

541 if type_mapping is not None: 541 ↛ 542line 541 didn't jump to line 542 because the condition on line 541 was never true

542 attr_type = type_mapping.source_type 

543 else: 

544 attr_type = attr.attribute_type 

545 

546 orig = get_origin(attr_type) 

547 valid_values: Sequence[Any] = tuple() 

548 

549 if orig == Literal: 

550 valid_values = get_args(attr_type) 

551 elif orig == bool or attr.attribute_type == bool: 551 ↛ 553line 551 didn't jump to line 553 because the condition on line 551 was always true

552 valid_values = ("true", "false") 

553 elif isinstance(orig, type) and issubclass(orig, DebputyDispatchableType): 

554 parser = pg.dispatch_parser_table_for(orig) 

555 _info(f"M: {parser}") 

556 

557 if matched in valid_values: 557 ↛ 558line 557 didn't jump to line 558 because the condition on line 557 was never true

558 _info(f"Already filled: {matched} is one of {valid_values}") 

559 return None 

560 if valid_values: 560 ↛ 562line 560 didn't jump to line 562 because the condition on line 560 was always true

561 return [types.CompletionItem(_complete_value(x)) for x in valid_values] 

562 return None 

563 

564 

565def generic_yaml_hover( 

566 ls: "DebputyLanguageServer", 

567 params: types.HoverParams, 

568 root_parser_initializer: Callable[ 

569 [ParserGenerator], Union[DeclarativeInputParser[Any], DispatchingParserBase] 

570 ], 

571) -> Optional[types.Hover]: 

572 doc = ls.workspace.get_text_document(params.text_document.uri) 

573 lines = doc.lines 

574 position_codec = doc.position_codec 

575 server_position = position_codec.position_from_client_units(lines, params.position) 

576 

577 try: 

578 content = MANIFEST_YAML.load("".join(lines)) 

579 except YAMLError: 

580 return None 

581 attribute_root_path = AttributePath.root_path(content) 

582 m = _trace_cursor(content, attribute_root_path, server_position) 

583 if m is None: 583 ↛ 584line 583 didn't jump to line 584 because the condition on line 583 was never true

584 _info("No match") 

585 return None 

586 matched_key, attr_path, matched, _ = m 

587 _info(f"Matched path: {matched} (path: {attr_path.path}) [{matched_key=}]") 

588 

589 feature_set = ls.plugin_feature_set 

590 parser_generator = feature_set.manifest_parser_generator 

591 root_parser = root_parser_initializer(parser_generator) 

592 segments = list(attr_path.path_segments()) 

593 km = resolve_keyword( 

594 root_parser, 

595 DEBPUTY_PLUGIN_METADATA, 

596 segments, 

597 0, 

598 parser_generator, 

599 ) 

600 if km is None: 600 ↛ 601line 600 didn't jump to line 601 because the condition on line 600 was never true

601 _info("No keyword match") 

602 return None 

603 parser, plugin_metadata, at_depth_idx = km 

604 _info(f"Match leaf parser {at_depth_idx}/{len(segments)} -- {parser.__class__}") 

605 hover_doc_text = resolve_hover_text( 

606 feature_set, 

607 parser, 

608 plugin_metadata, 

609 segments, 

610 at_depth_idx, 

611 matched, 

612 matched_key, 

613 ) 

614 return as_hover_doc(ls, hover_doc_text)