Coverage for src/debputy/lsp/lsp_generic_deb822.py: 80%

363 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-03-24 16:38 +0000

1import re 

2from itertools import chain 

3from typing import ( 

4 Optional, 

5 Union, 

6 Sequence, 

7 Tuple, 

8 Any, 

9 Container, 

10 List, 

11 Iterable, 

12 Iterator, 

13 Callable, 

14 cast, 

15 TYPE_CHECKING, 

16) 

17 

18from debputy.linting.lint_util import LintState, te_position_to_lsp 

19from debputy.lsp.debputy_ls import DebputyLanguageServer 

20from debputy.lsp.lsp_debian_control_reference_data import ( 

21 Deb822FileMetadata, 

22 Deb822KnownField, 

23 StanzaMetadata, 

24 F, 

25 S, 

26 SUBSTVAR_RE, 

27) 

28from debputy.lsp.lsp_features import SEMANTIC_TOKEN_TYPES_IDS 

29from debputy.lsp.quickfixes import propose_correct_text_quick_fix 

30from debputy.lsp.text_util import ( 

31 trim_end_of_line_whitespace, 

32 SemanticTokensState, 

33) 

34from debputy.lsp.vendoring._deb822_repro.locatable import ( 

35 START_POSITION, 

36 Range as TERange, 

37 Position as TEPosition, 

38) 

39from debputy.lsp.vendoring._deb822_repro.parsing import ( 

40 Deb822KeyValuePairElement, 

41 Deb822ParagraphElement, 

42 Deb822FileElement, 

43 Deb822CommentElement, 

44) 

45from debputy.lsp.vendoring._deb822_repro.tokens import tokenize_deb822_file, Deb822Token 

46from debputy.lsp.vendoring._deb822_repro.types import TokenOrElement 

47from debputy.lsprotocol.types import ( 

48 CompletionParams, 

49 CompletionList, 

50 CompletionItem, 

51 Position, 

52 MarkupContent, 

53 Hover, 

54 MarkupKind, 

55 HoverParams, 

56 FoldingRangeParams, 

57 FoldingRange, 

58 FoldingRangeKind, 

59 SemanticTokensParams, 

60 SemanticTokens, 

61 TextEdit, 

62 MessageType, 

63 SemanticTokenTypes, 

64) 

65from debputy.util import _info, _warn 

66 

67if TYPE_CHECKING: 

68 import lsprotocol.types as types 

69else: 

70 import debputy.lsprotocol.types as types 

71 

72 

73try: 

74 from pygls.server import LanguageServer 

75 from pygls.workspace import TextDocument 

76except ImportError: 

77 pass 

78 

79 

80_CONTAINS_SPACE_OR_COLON = re.compile(r"[\s:]") 

81 

82 

83def in_range( 

84 te_range: TERange, 

85 cursor_position: Position, 

86 *, 

87 inclusive_end: bool = False, 

88) -> bool: 

89 cursor_line = cursor_position.line 

90 start_pos = te_range.start_pos 

91 end_pos = te_range.end_pos 

92 if cursor_line < start_pos.line_position or cursor_line > end_pos.line_position: 

93 return False 

94 

95 if start_pos.line_position == end_pos.line_position: 

96 start_col = start_pos.cursor_position 

97 cursor_col = cursor_position.character 

98 end_col = end_pos.cursor_position 

99 if inclusive_end: 99 ↛ 101line 99 didn't jump to line 101 because the condition on line 99 was always true

100 return start_col <= cursor_col <= end_col 

101 return start_col <= cursor_col < end_col 

102 

103 if cursor_line == end_pos.line_position: 

104 return cursor_position.character < end_pos.cursor_position 

105 

106 return ( 

107 cursor_line > start_pos.line_position 

108 or start_pos.cursor_position <= cursor_position.character 

109 ) 

110 

111 

112def _field_at_position( 

113 stanza: Deb822ParagraphElement, 

114 stanza_metadata: S, 

115 stanza_range: TERange, 

116 position: Position, 

117) -> Tuple[Optional[Deb822KeyValuePairElement], Optional[F], str, bool]: 

118 te_range = TERange(stanza_range.start_pos, stanza_range.start_pos) 

119 for token_or_element in stanza.iter_parts(): 119 ↛ 147line 119 didn't jump to line 147 because the loop on line 119 didn't complete

120 te_range = token_or_element.size().relative_to(te_range.end_pos) 

121 if not in_range(te_range, position): 

122 continue 

123 if isinstance(token_or_element, Deb822KeyValuePairElement): 123 ↛ 119line 123 didn't jump to line 119 because the condition on line 123 was always true

124 value_range = token_or_element.value_element.range_in_parent().relative_to( 

125 te_range.start_pos 

126 ) 

127 known_field = stanza_metadata.get(token_or_element.field_name) 

128 in_value = in_range(value_range, position) 

129 interpreter = ( 

130 known_field.field_value_class.interpreter() 

131 if known_field is not None 

132 else None 

133 ) 

134 matched_value = "" 

135 if in_value and interpreter is not None: 

136 interpreted = token_or_element.interpret_as(interpreter) 

137 for value_ref in interpreted.iter_value_references(): 

138 value_token_range = ( 

139 value_ref.locatable.range_in_parent().relative_to( 

140 value_range.start_pos 

141 ) 

142 ) 

143 if in_range(value_token_range, position, inclusive_end=True): 143 ↛ 137line 143 didn't jump to line 137 because the condition on line 143 was always true

144 matched_value = value_ref.value 

145 break 

146 return token_or_element, known_field, matched_value, in_value 

147 return None, None, "", False 

148 

149 

150def _allow_stanza_continuation( 

151 token_or_element: TokenOrElement, 

152 is_completion: bool, 

153) -> bool: 

154 if not is_completion: 

155 return False 

156 if token_or_element.is_error or token_or_element.is_comment: 

157 return True 

158 return ( 

159 token_or_element.is_whitespace 

160 and token_or_element.convert_to_text().count("\n") < 2 

161 ) 

162 

163 

164def _at_cursor( 

165 deb822_file: Deb822FileElement, 

166 file_metadata: Deb822FileMetadata[S, F], 

167 doc: "TextDocument", 

168 lines: List[str], 

169 client_position: Position, 

170 is_completion: bool = False, 

171) -> Tuple[ 

172 Position, 

173 Optional[str], 

174 str, 

175 bool, 

176 Optional[S], 

177 Optional[F], 

178 Iterable[Deb822ParagraphElement], 

179]: 

180 server_position = doc.position_codec.position_from_client_units( 

181 lines, 

182 client_position, 

183 ) 

184 te_range = TERange( 

185 START_POSITION, 

186 START_POSITION, 

187 ) 

188 paragraph_no = -1 

189 previous_stanza: Optional[Deb822ParagraphElement] = None 

190 next_stanza: Optional[Deb822ParagraphElement] = None 

191 current_word = doc.word_at_position(client_position) 

192 in_value: bool = False 

193 file_iter = iter(deb822_file.iter_parts()) 

194 matched_token: Optional[TokenOrElement] = None 

195 matched_field: Optional[str] = None 

196 stanza_metadata: Optional[S] = None 

197 known_field: Optional[F] = None 

198 

199 for token_or_element in file_iter: 199 ↛ 223line 199 didn't jump to line 223 because the loop on line 199 didn't complete

200 te_range = token_or_element.size().relative_to(te_range.end_pos) 

201 if isinstance(token_or_element, Deb822ParagraphElement): 

202 previous_stanza = token_or_element 

203 paragraph_no += 1 

204 elif not _allow_stanza_continuation(token_or_element, is_completion): 

205 previous_stanza = None 

206 if not in_range(te_range, server_position): 

207 continue 

208 matched_token = token_or_element 

209 if isinstance(token_or_element, Deb822ParagraphElement): 

210 stanza_metadata = file_metadata.guess_stanza_classification_by_idx( 

211 paragraph_no 

212 ) 

213 kvpair, known_field, current_word, in_value = _field_at_position( 

214 token_or_element, 

215 stanza_metadata, 

216 te_range, 

217 server_position, 

218 ) 

219 if kvpair is not None: 219 ↛ 221line 219 didn't jump to line 221 because the condition on line 219 was always true

220 matched_field = kvpair.field_name 

221 break 

222 

223 if matched_token is not None and _allow_stanza_continuation( 

224 matched_token, 

225 is_completion, 

226 ): 

227 next_te = next(file_iter, None) 

228 if isinstance(next_te, Deb822ParagraphElement): 

229 next_stanza = next_te 

230 

231 stanza_parts = (p for p in (previous_stanza, next_stanza) if p is not None) 

232 

233 if stanza_metadata is None and is_completion: 

234 if paragraph_no < 0: 234 ↛ 235line 234 didn't jump to line 235 because the condition on line 234 was never true

235 paragraph_no = 0 

236 stanza_metadata = file_metadata.guess_stanza_classification_by_idx(paragraph_no) 

237 

238 return ( 

239 server_position, 

240 matched_field, 

241 current_word, 

242 in_value, 

243 stanza_metadata, 

244 known_field, 

245 stanza_parts, 

246 ) 

247 

248 

249def deb822_completer( 

250 ls: "DebputyLanguageServer", 

251 params: CompletionParams, 

252 file_metadata: Deb822FileMetadata[Any, Any], 

253) -> Optional[Union[CompletionList, Sequence[CompletionItem]]]: 

254 doc = ls.workspace.get_text_document(params.text_document.uri) 

255 lines = doc.lines 

256 lint_state = ls.lint_state(doc) 

257 deb822_file = lint_state.parsed_deb822_file_content 

258 if deb822_file is None: 258 ↛ 259line 258 didn't jump to line 259 because the condition on line 258 was never true

259 _warn("The deb822 result missing failed!?") 

260 ls.show_message_log( 

261 "Internal error; could not get deb822 content!?", MessageType.Warning 

262 ) 

263 return None 

264 

265 ( 

266 server_pos, 

267 current_field, 

268 word_at_position, 

269 in_value, 

270 stanza_metadata, 

271 known_field, 

272 matched_stanzas, 

273 ) = _at_cursor( 

274 deb822_file, 

275 file_metadata, 

276 doc, 

277 lines, 

278 params.position, 

279 is_completion=True, 

280 ) 

281 

282 if lines[server_pos.line].startswith("#"): 282 ↛ 283line 282 didn't jump to line 283 because the condition on line 282 was never true

283 return 

284 

285 items: Optional[Sequence[CompletionItem]] 

286 markdown_kind = ls.completion_item_document_markup( 

287 MarkupKind.Markdown, MarkupKind.PlainText 

288 ) 

289 if in_value: 

290 _info(f"Completion for field value {current_field} -- {word_at_position}") 

291 if known_field is None: 291 ↛ 292line 291 didn't jump to line 292 because the condition on line 291 was never true

292 return None 

293 value_being_completed = word_at_position 

294 items = known_field.value_options_for_completer( 

295 lint_state, 

296 list(matched_stanzas), 

297 value_being_completed, 

298 markdown_kind, 

299 ) 

300 else: 

301 _info("Completing field name") 

302 assert stanza_metadata is not None 

303 items = _complete_field_name( 

304 lint_state, 

305 stanza_metadata, 

306 matched_stanzas, 

307 markdown_kind, 

308 ) 

309 

310 _info( 

311 f"Completion candidates: {[i.label for i in items] if items is not None else 'None'}" 

312 ) 

313 

314 return items 

315 

316 

317def deb822_hover( 

318 ls: "DebputyLanguageServer", 

319 params: HoverParams, 

320 file_metadata: Deb822FileMetadata[S, F], 

321 *, 

322 custom_handler: Optional[ 

323 Callable[ 

324 [ 

325 "DebputyLanguageServer", 

326 Position, 

327 Optional[str], 

328 str, 

329 Optional[F], 

330 bool, 

331 "TextDocument", 

332 List[str], 

333 ], 

334 Optional[Hover], 

335 ] 

336 ] = None, 

337) -> Optional[Hover]: 

338 doc = ls.workspace.get_text_document(params.text_document.uri) 

339 lines = doc.lines 

340 deb822_file = ls.lint_state(doc).parsed_deb822_file_content 

341 if deb822_file is None: 341 ↛ 342line 341 didn't jump to line 342 because the condition on line 341 was never true

342 _warn("The deb822 result missing failed!?") 

343 ls.show_message_log( 

344 "Internal error; could not get deb822 content!?", MessageType.Warning 

345 ) 

346 return None 

347 

348 ( 

349 server_pos, 

350 current_field, 

351 word_at_position, 

352 in_value, 

353 _, 

354 known_field, 

355 _, 

356 ) = _at_cursor( 

357 deb822_file, 

358 file_metadata, 

359 doc, 

360 lines, 

361 params.position, 

362 ) 

363 

364 if lines[server_pos.line].startswith("#"): 364 ↛ 365line 364 didn't jump to line 365 because the condition on line 364 was never true

365 return 

366 

367 hover_text = None 

368 if custom_handler is not None: 368 ↛ 383line 368 didn't jump to line 383 because the condition on line 368 was always true

369 res = custom_handler( 

370 ls, 

371 server_pos, 

372 current_field, 

373 word_at_position, 

374 known_field, 

375 in_value, 

376 doc, 

377 lines, 

378 ) 

379 if isinstance(res, Hover): 379 ↛ 380line 379 didn't jump to line 380 because the condition on line 379 was never true

380 return res 

381 hover_text = res 

382 

383 if hover_text is None: 

384 if current_field is None: 384 ↛ 385line 384 didn't jump to line 385 because the condition on line 384 was never true

385 _info("No hover information as we cannot determine which field it is for") 

386 return None 

387 

388 if known_field is None: 388 ↛ 389line 388 didn't jump to line 389 because the condition on line 388 was never true

389 return None 

390 if in_value: 390 ↛ 391line 390 didn't jump to line 391 because the condition on line 390 was never true

391 if not known_field.known_values: 

392 return None 

393 keyword = known_field.known_values.get(word_at_position) 

394 if keyword is None: 

395 return None 

396 hover_text = keyword.long_description_translated(ls) 

397 if hover_text is not None: 

398 header = "`{VALUE}` (Field: {FIELD_NAME})".format( 

399 VALUE=keyword.value, 

400 FIELD_NAME=known_field.name, 

401 ) 

402 hover_text = f"# {header}\n\n{hover_text}" 

403 else: 

404 hover_text = known_field.long_description_translated(ls) 

405 if hover_text is None: 405 ↛ 406line 405 didn't jump to line 406

406 hover_text = ( 

407 f"No documentation is available for the field {current_field}." 

408 ) 

409 hover_text = f"# {known_field.name}\n\n{hover_text}" 

410 

411 if hover_text is None: 411 ↛ 412line 411 didn't jump to line 412 because the condition on line 411 was never true

412 return None 

413 return Hover( 

414 contents=MarkupContent( 

415 kind=ls.hover_markup_format(MarkupKind.Markdown, MarkupKind.PlainText), 

416 value=hover_text, 

417 ) 

418 ) 

419 

420 

421def deb822_token_iter( 

422 tokens: Iterable[Deb822Token], 

423) -> Iterator[Tuple[Deb822Token, int, int, int, int]]: 

424 line_no = 0 

425 line_offset = 0 

426 

427 for token in tokens: 

428 start_line = line_no 

429 start_line_offset = line_offset 

430 

431 newlines = token.text.count("\n") 

432 line_no += newlines 

433 text_len = len(token.text) 

434 if newlines: 

435 if token.text.endswith("\n"): 435 ↛ 439line 435 didn't jump to line 439 because the condition on line 435 was always true

436 line_offset = 0 

437 else: 

438 # -2, one to remove the "\n" and one to get 0-offset 

439 line_offset = text_len - token.text.rindex("\n") - 2 

440 else: 

441 line_offset += text_len 

442 

443 yield token, start_line, start_line_offset, line_no, line_offset 

444 

445 

446def deb822_folding_ranges( 

447 ls: "DebputyLanguageServer", 

448 params: FoldingRangeParams, 

449 # Unused for now: might be relevant for supporting folding for some fields 

450 _file_metadata: Deb822FileMetadata[Any, Any], 

451) -> Optional[Sequence[FoldingRange]]: 

452 doc = ls.workspace.get_text_document(params.text_document.uri) 

453 comment_start = -1 

454 folding_ranges = [] 

455 for ( 

456 token, 

457 start_line, 

458 start_offset, 

459 end_line, 

460 end_offset, 

461 ) in deb822_token_iter(tokenize_deb822_file(doc.lines)): 

462 if token.is_comment: 

463 if comment_start < 0: 

464 comment_start = start_line 

465 elif comment_start > -1: 

466 comment_start = -1 

467 folding_range = FoldingRange( 

468 comment_start, 

469 end_line, 

470 kind=FoldingRangeKind.Comment, 

471 ) 

472 

473 folding_ranges.append(folding_range) 

474 

475 return folding_ranges 

476 

477 

478class Deb822SemanticTokensState(SemanticTokensState): 

479 

480 __slots__ = ( 

481 "file_metadata", 

482 "keyword_token_code", 

483 "known_value_token_code", 

484 "comment_token_code", 

485 "substvars_token_code", 

486 "allow_overlapping_tokens", 

487 ) 

488 

489 def __init__( 

490 self, 

491 ls: "DebputyLanguageServer", 

492 doc: "TextDocument", 

493 lines: List[str], 

494 tokens: List[int], 

495 file_metadata: Deb822FileMetadata[Any, Any], 

496 keyword_token_code: int, 

497 known_value_token_code: int, 

498 comment_token_code: int, 

499 substvars_token_code: int, 

500 ) -> None: 

501 super().__init__(ls, doc, lines, tokens) 

502 self.file_metadata = file_metadata 

503 self.keyword_token_code = keyword_token_code 

504 self.known_value_token_code = known_value_token_code 

505 self.comment_token_code = comment_token_code 

506 self.substvars_token_code = substvars_token_code 

507 

508 

509def _emit_tokens_for_comment_element( 

510 sem_token_state: Deb822SemanticTokensState, 

511 comment_element: Deb822CommentElement, 

512 comment_start_line: int, 

513 comment_token_code: int, 

514) -> None: 

515 for comment_line_no, comment_token in enumerate( 

516 comment_element.iter_parts(), 

517 start=comment_start_line, 

518 ): 

519 assert comment_token.is_comment 

520 assert isinstance(comment_token, Deb822Token) 

521 sem_token_state.emit_token( 

522 Position(comment_line_no, 0), 

523 len(comment_token.text.rstrip()), 

524 comment_token_code, 

525 ) 

526 

527 

528async def scan_for_syntax_errors_and_token_level_diagnostics( 

529 deb822_file: Deb822FileElement, 

530 lint_state: LintState, 

531) -> int: 

532 first_error = len(lint_state.lines) + 1 

533 spell_checker = lint_state.spellchecker() 

534 

535 async for ( 

536 token, 

537 start_line, 

538 start_offset, 

539 end_line, 

540 end_offset, 

541 ) in lint_state.slow_iter(deb822_token_iter(deb822_file.iter_tokens())): 

542 if token.is_error: 542 ↛ 543line 542 didn't jump to line 543 because the condition on line 542 was never true

543 first_error = min(first_error, start_line) 

544 token_range = TERange( 

545 TEPosition( 

546 start_line, 

547 start_offset, 

548 ), 

549 TEPosition( 

550 end_line, 

551 end_offset, 

552 ), 

553 ) 

554 lint_state.emit_diagnostic( 

555 token_range, 

556 "Syntax error", 

557 "error", 

558 "debputy", 

559 ) 

560 elif token.is_comment: 

561 for word, col_pos, end_col_pos in spell_checker.iter_words(token.text): 

562 corrections = spell_checker.provide_corrections_for(word) 

563 if not corrections: 563 ↛ 565line 563 didn't jump to line 565 because the condition on line 563 was always true

564 continue 

565 word_range = TERange.between( 

566 TEPosition( 

567 start_line, 

568 col_pos, 

569 ), 

570 TEPosition( 

571 start_line, 

572 end_col_pos, 

573 ), 

574 ) 

575 lint_state.emit_diagnostic( 

576 word_range, 

577 f'Spelling "{word}"', 

578 "spelling", 

579 "debputy", 

580 quickfixes=[propose_correct_text_quick_fix(c) for c in corrections], 

581 enable_non_interactive_auto_fix=False, 

582 ) 

583 return first_error 

584 

585 

586async def _deb822_paragraph_semantic_tokens_full( 

587 ls: "DebputyLanguageServer", 

588 sem_token_state: Deb822SemanticTokensState, 

589 stanza: Deb822ParagraphElement, 

590 stanza_idx: int, 

591) -> None: 

592 doc = sem_token_state.doc 

593 keyword_token_code = sem_token_state.keyword_token_code 

594 known_value_token_code = sem_token_state.known_value_token_code 

595 comment_token_code = sem_token_state.comment_token_code 

596 

597 stanza_position = stanza.position_in_file() 

598 stanza_metadata = sem_token_state.file_metadata.classify_stanza( 

599 stanza, 

600 stanza_idx=stanza_idx, 

601 ) 

602 async for kvpair in ls.slow_iter( 

603 stanza.iter_parts_of_type(Deb822KeyValuePairElement), yield_every=25 

604 ): 

605 kvpair_position = kvpair.position_in_parent().relative_to(stanza_position) 

606 field_start = kvpair.field_token.position_in_parent().relative_to( 

607 kvpair_position 

608 ) 

609 comment = kvpair.comment_element 

610 if comment: 

611 comment_start_line = field_start.line_position - len(comment) 

612 _emit_tokens_for_comment_element( 

613 sem_token_state, 

614 comment, 

615 comment_start_line, 

616 comment_token_code, 

617 ) 

618 

619 field_size = doc.position_codec.client_num_units(kvpair.field_name) 

620 

621 sem_token_state.emit_token( 

622 te_position_to_lsp(field_start), 

623 field_size, 

624 keyword_token_code, 

625 ) 

626 

627 known_field: Optional[Deb822KnownField] = stanza_metadata.get(kvpair.field_name) 

628 if known_field is not None: 

629 if known_field.spellcheck_value: 

630 continue 

631 known_values: Container[str] = known_field.known_values or frozenset() 

632 interpretation = known_field.field_value_class.interpreter() 

633 field_disallows_substvars = ( 

634 known_field.is_substvars_disabled_even_if_allowed_by_stanza 

635 ) 

636 allow_substvars = ( 

637 stanza_metadata.is_substvars_allowed_in_stanza 

638 and not field_disallows_substvars 

639 ) 

640 else: 

641 known_values = frozenset() 

642 interpretation = None 

643 allow_substvars = stanza_metadata.is_substvars_allowed_in_stanza 

644 

645 value_element_pos = kvpair.value_element.position_in_parent().relative_to( 

646 kvpair_position 

647 ) 

648 if interpretation is None: 

649 for value_line in kvpair.value_element.value_lines: 

650 comment_element = value_line.comment_element 

651 if comment_element: 

652 assert comment_element.position_in_parent().line_position == 0 

653 comment_start_line = ( 

654 value_line.position_in_parent() 

655 .relative_to(value_element_pos) 

656 .line_position 

657 ) 

658 _emit_tokens_for_comment_element( 

659 sem_token_state, 

660 comment_element, 

661 comment_start_line, 

662 comment_token_code, 

663 ) 

664 continue 

665 else: 

666 parts = kvpair.interpret_as(interpretation).iter_parts() 

667 for te in parts: 

668 if te.is_whitespace: 

669 continue 

670 if te.is_separator: 

671 continue 

672 value_range_in_parent_te = te.range_in_parent() 

673 value_range_te = value_range_in_parent_te.relative_to(value_element_pos) 

674 value = te.convert_to_text() 

675 if te.is_comment: 

676 token_type = comment_token_code 

677 value = value.rstrip() 

678 elif value in known_values: 

679 token_type = known_value_token_code 

680 elif allow_substvars and "${" in value: 

681 _process_value_with_substvars( 

682 sem_token_state, 

683 value, 

684 value_range_te, 

685 None, 

686 ) 

687 continue 

688 else: 

689 continue 

690 value_len = doc.position_codec.client_num_units(value) 

691 sem_token_state.emit_token( 

692 te_position_to_lsp(value_range_te.start_pos), 

693 value_len, 

694 token_type, 

695 ) 

696 

697 

698def _split_into_substvars( 

699 value: str, 

700 base_token_type: Optional[int], 

701 substvar_token_type: int, 

702) -> Iterable[Tuple[str, Optional[int]]]: 

703 

704 i = 0 

705 next_search = i 

706 full_value_len = len(value) 

707 while i < full_value_len: 

708 try: 

709 subst_var_start = value.index("${", next_search) 

710 subst_var_end = value.index("}", subst_var_start + 2) 

711 except ValueError: 

712 token = value[i:full_value_len] 

713 if token: 713 ↛ 715line 713 didn't jump to line 715 because the condition on line 713 was always true

714 yield token, base_token_type 

715 return 

716 

717 subst_var_end += 1 

718 subst_var = value[subst_var_start:subst_var_end] 

719 if subst_var != "${}" and not SUBSTVAR_RE.match(subst_var): 719 ↛ 720line 719 didn't jump to line 720 because the condition on line 719 was never true

720 subst_var = None 

721 

722 if subst_var is None: 722 ↛ 723line 722 didn't jump to line 723 because the condition on line 722 was never true

723 next_search = subst_var_end 

724 continue 

725 

726 token = value[i:subst_var_start] 

727 if token: 

728 yield token, base_token_type 

729 yield subst_var, substvar_token_type 

730 i = subst_var_end 

731 next_search = i 

732 

733 

734def _process_value_with_substvars( 

735 sem_token_state: Deb822SemanticTokensState, 

736 value: str, 

737 value_range_te: "TERange", 

738 base_token_type: Optional[int], 

739) -> None: 

740 pos_codec = sem_token_state.doc.position_codec 

741 

742 # TODO: Support overlapping tokens if the editor does. 

743 

744 line = value_range_te.start_pos.line_position 

745 token_pos = value_range_te.start_pos.cursor_position 

746 substvar_token_code = sem_token_state.substvars_token_code 

747 for token, token_type in _split_into_substvars( 

748 value, 

749 base_token_type, 

750 substvar_token_code, 

751 ): 

752 token_len = len(token) 

753 if token_type is not None: 

754 sem_token_state.emit_token( 

755 types.Position(line, token_pos), 

756 pos_codec.client_num_units(token), 

757 token_type, 

758 ) 

759 token_pos += token_len 

760 

761 

762def deb822_format_file( 

763 lint_state: LintState, 

764 file_metadata: Deb822FileMetadata[Any, Any], 

765) -> Optional[Sequence[TextEdit]]: 

766 effective_preference = lint_state.effective_preference 

767 if effective_preference is None: 

768 return trim_end_of_line_whitespace(lint_state.position_codec, lint_state.lines) 

769 formatter = effective_preference.deb822_formatter() 

770 lines = lint_state.lines 

771 deb822_file = lint_state.parsed_deb822_file_content 

772 if deb822_file is None: 

773 _warn("The deb822 result missing failed!?") 

774 return None 

775 

776 return list( 

777 file_metadata.reformat( 

778 effective_preference, 

779 deb822_file, 

780 formatter, 

781 lint_state.content, 

782 lint_state.position_codec, 

783 lines, 

784 ) 

785 ) 

786 

787 

788async def deb822_semantic_tokens_full( 

789 ls: "DebputyLanguageServer", 

790 request: SemanticTokensParams, 

791 file_metadata: Deb822FileMetadata[Any, Any], 

792) -> Optional[SemanticTokens]: 

793 doc = ls.workspace.get_text_document(request.text_document.uri) 

794 position_codec = doc.position_codec 

795 lines = doc.lines 

796 deb822_file = ls.lint_state(doc).parsed_deb822_file_content 

797 if deb822_file is None: 797 ↛ 798line 797 didn't jump to line 798 because the condition on line 797 was never true

798 _warn("The deb822 result missing failed!?") 

799 ls.show_message_log( 

800 "Internal error; could not get deb822 content!?", MessageType.Warning 

801 ) 

802 return None 

803 

804 tokens: List[int] = [] 

805 comment_token_code = SEMANTIC_TOKEN_TYPES_IDS[SemanticTokenTypes.Comment.value] 

806 sem_token_state = Deb822SemanticTokensState( 

807 ls, 

808 doc, 

809 lines, 

810 tokens, 

811 file_metadata, 

812 SEMANTIC_TOKEN_TYPES_IDS[SemanticTokenTypes.Keyword], 

813 SEMANTIC_TOKEN_TYPES_IDS[SemanticTokenTypes.EnumMember], 

814 comment_token_code, 

815 SEMANTIC_TOKEN_TYPES_IDS[SemanticTokenTypes.Macro], 

816 ) 

817 

818 stanza_idx = 0 

819 

820 async for part in ls.slow_iter(deb822_file.iter_parts(), yield_every=20): 

821 if part.is_comment: 

822 pos = part.position_in_file() 

823 sem_token_state.emit_token( 

824 te_position_to_lsp(pos), 

825 # Avoid trailing newline 

826 position_codec.client_num_units(part.convert_to_text().rstrip()), 

827 comment_token_code, 

828 ) 

829 elif isinstance(part, Deb822ParagraphElement): 

830 await _deb822_paragraph_semantic_tokens_full( 

831 ls, 

832 sem_token_state, 

833 part, 

834 stanza_idx, 

835 ) 

836 stanza_idx += 1 

837 if not tokens: 837 ↛ 838line 837 didn't jump to line 838 because the condition on line 837 was never true

838 return None 

839 return SemanticTokens(tokens) 

840 

841 

842def _complete_field_name( 

843 lint_state: LintState, 

844 stanza_metadata: StanzaMetadata[Any], 

845 matched_stanzas: Iterable[Deb822ParagraphElement], 

846 markdown_kind: MarkupKind, 

847) -> Sequence[CompletionItem]: 

848 items = [] 

849 matched_stanzas = list(matched_stanzas) 

850 seen_fields = set( 

851 stanza_metadata.normalize_field_name(f.lower()) 

852 for f in chain.from_iterable( 

853 # The typing from python3-debian is not entirely optimal here. The iter always return a 

854 # `str`, but the provided type is `ParagraphKey` (because `__getitem__` supports those) 

855 # and that is not exclusively a `str`. 

856 # 

857 # So, this cast for now 

858 cast("Iterable[str]", s) 

859 for s in matched_stanzas 

860 ) 

861 ) 

862 for cand_key, cand in stanza_metadata.items(): 

863 if stanza_metadata.normalize_field_name(cand_key.lower()) in seen_fields: 

864 continue 

865 item = cand.complete_field(lint_state, matched_stanzas, markdown_kind) 

866 if item is not None: 

867 items.append(item) 

868 return items