Coverage for src/debputy/lsp/debputy_ls.py: 57%

347 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-01-27 13:59 +0000

1import dataclasses 

2import os 

3import time 

4from typing import ( 

5 Optional, 

6 List, 

7 Any, 

8 Mapping, 

9 Container, 

10 TYPE_CHECKING, 

11 Tuple, 

12 Literal, 

13 Set, 

14) 

15 

16import debputy.l10n as l10n 

17from debputy.dh.dh_assistant import ( 

18 parse_drules_for_addons, 

19 DhSequencerData, 

20 extract_dh_addons_from_control, 

21) 

22from debputy.filesystem_scan import FSROOverlay, VirtualPathBase 

23from debputy.l10n import Translations 

24from debputy.linting.lint_util import ( 

25 LintState, 

26 LinterImpl, 

27) 

28from debputy.lsp.apt_cache import AptCache 

29from debputy.lsp.maint_prefs import ( 

30 MaintainerPreferenceTable, 

31 MaintainerPreference, 

32 determine_effective_preference, 

33) 

34from debputy.lsp.text_util import LintCapablePositionCodec 

35from debputy.lsp.vendoring._deb822_repro import Deb822FileElement, parse_deb822_file 

36from debputy.packages import ( 

37 SourcePackage, 

38 BinaryPackage, 

39 DctrlParser, 

40) 

41from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

42from debputy.util import _info, _warn 

43from debputy.yaml import MANIFEST_YAML, YAMLError 

44from debputy.yaml.compat import CommentedMap 

45 

if TYPE_CHECKING:
    import lsprotocol.types as types
    from pygls.server import LanguageServer
    from pygls.workspace import TextDocument
    from pygls.uris import from_fs_path

else:
    import debputy.lsprotocol.types as types

    try:
        from pygls.server import LanguageServer
        from pygls.workspace import TextDocument
        from pygls.uris import from_fs_path
    except ImportError as e:
        # The name bound by `except ... as e` is deleted as soon as the except
        # clause finishes.  Keep a reference of our own so the placeholder can
        # still re-raise the original import failure later on.
        _PYGLS_IMPORT_ERROR = e

        class LanguageServer:
            def __init__(self, *args, **kwargs) -> None:
                """Placeholder to work if pygls is not installed

                :raises ImportError: Always; re-raises the error from the failed
                  pygls import.
                """
                # Should not be called
                raise _PYGLS_IMPORT_ERROR  # pragma: no cover

67 

68 

69@dataclasses.dataclass(slots=True) 

70class FileCache: 

71 doc_uri: str 

72 path: str 

73 is_open_in_editor: Optional[bool] = None 

74 last_doc_version: Optional[int] = None 

75 last_mtime: Optional[float] = None 

76 is_valid: bool = False 

77 

78 def _update_cache(self, doc: "TextDocument", source: str) -> None: 

79 raise NotImplementedError 

80 

81 def _clear_cache(self) -> None: 

82 raise NotImplementedError 

83 

84 def resolve_cache(self, ls: "DebputyLanguageServer") -> bool: 

85 doc = ls.workspace.text_documents.get(self.doc_uri) 

86 if doc is None: 86 ↛ 87line 86 didn't jump to line 87 because the condition on line 86 was never true

87 doc = ls.workspace.get_text_document(self.doc_uri) 

88 is_open = False 

89 else: 

90 is_open = True 

91 new_content: Optional[str] = None 

92 if is_open: 92 ↛ 104line 92 didn't jump to line 104 because the condition on line 92 was always true

93 last_doc_version = self.last_doc_version 

94 dctrl_doc_version = doc.version 

95 if ( 95 ↛ 103line 95 didn't jump to line 103

96 not self.is_open_in_editor 

97 or last_doc_version is None 

98 or dctrl_doc_version is None 

99 or last_doc_version < dctrl_doc_version 

100 ): 

101 new_content = doc.source 

102 

103 self.last_doc_version = doc.version 

104 elif doc.uri.startswith("file://"): 

105 try: 

106 with open(self.path) as fd: 

107 st = os.fstat(fd.fileno()) 

108 current_mtime = st.st_mtime 

109 last_mtime = self.last_mtime or current_mtime - 1 

110 if self.is_open_in_editor or current_mtime > last_mtime: 

111 new_content = fd.read() 

112 self.last_mtime = current_mtime 

113 except FileNotFoundError: 

114 self._clear_cache() 

115 self.is_valid = False 

116 return False 

117 if new_content is not None: 117 ↛ 119line 117 didn't jump to line 119 because the condition on line 117 was always true

118 self._update_cache(doc, new_content) 

119 self.is_valid = True 

120 return True 

121 

122 

@dataclasses.dataclass(slots=True)
class Deb822FileCache(FileCache):
    """File cache holding the parsed deb822 representation of a file."""

    deb822_file: Optional[Deb822FileElement] = None

    def _update_cache(self, doc: "TextDocument", source: str) -> None:
        """Parse `source` as a deb822 file and store the result."""
        source_lines = source.splitlines(keepends=True)
        # Error tokens and duplicated fields are explicitly accepted by the
        # parser options below.
        self.deb822_file = parse_deb822_file(
            source_lines,
            accept_files_with_error_tokens=True,
            accept_files_with_duplicated_fields=True,
        )

    def _clear_cache(self) -> None:
        """Drop the parsed deb822 file."""
        self.deb822_file = None

137 

138 

@dataclasses.dataclass(slots=True)
class DctrlFileCache(Deb822FileCache):
    """File cache for `debian/control`, including the parsed packages.

    Besides the deb822 file element, the source package and the binary packages
    returned by the `DctrlParser` are cached.
    """

    dctrl_parser: Optional[DctrlParser] = None
    source_package: Optional[SourcePackage] = None
    binary_packages: Optional[Mapping[str, BinaryPackage]] = None

    def _update_cache(self, doc: "TextDocument", source: str) -> None:
        """Parse `source` with the `dctrl_parser` and store all three results."""
        deb822_file, source_package, binary_packages = (
            self.dctrl_parser.parse_source_debian_control(
                source.splitlines(keepends=True),
                ignore_errors=True,
            )
        )
        self.deb822_file = deb822_file
        self.source_package = source_package
        self.binary_packages = binary_packages

    def _clear_cache(self) -> None:
        """Drop the parsed file and the packages (the parser itself is kept)."""
        # Deliberately not `super()._clear_cache()`: `dataclass(slots=True)`
        # creates a new class object, and on Python versions before 3.14 the
        # zero-argument `super()` still refers to the original class, which
        # makes the call fail with a TypeError.
        Deb822FileCache._clear_cache(self)
        self.source_package = None
        self.binary_packages = None

160 

161 

@dataclasses.dataclass(slots=True)
class SalsaCICache(FileCache):
    """File cache holding the YAML mapping loaded from a CI configuration file."""

    parsed_content: Optional[CommentedMap] = None

    def _update_cache(self, doc: "TextDocument", source: str) -> None:
        """Load `source` as YAML and keep it if the top level is a mapping.

        If the YAML cannot be loaded, or the top-level value is not a
        `CommentedMap`, the previously stored content is left untouched.
        """
        try:
            loaded = MANIFEST_YAML.load(source)
        except YAMLError:
            return
        if isinstance(loaded, CommentedMap):
            self.parsed_content = loaded

    def _clear_cache(self) -> None:
        """Drop the loaded content."""
        self.parsed_content = None

176 

177 

@dataclasses.dataclass(slots=True)
class DebianRulesCache(FileCache):
    """File cache for `debian/rules` holding the dh add-on sequences found in it."""

    sequences: Optional[Set[str]] = None
    saw_dh: bool = False

    def _update_cache(self, doc: "TextDocument", source: str) -> None:
        """Scan `source` for dh add-ons and record the outcome."""
        found_sequences: Set[str] = set()
        # `parse_drules_for_addons` fills the set passed to it; its return value
        # is stored as `saw_dh`.
        saw_dh = parse_drules_for_addons(source.splitlines(), found_sequences)
        self.saw_dh = saw_dh
        self.sequences = found_sequences

    def _clear_cache(self) -> None:
        """Reset to the "nothing parsed" state."""
        self.sequences = None
        self.saw_dh = False

194 

195 

class LSProvidedLintState(LintState):
    """`LintState` implementation backed by a language server document.

    Data derived from related files (`debian/control`, `debian/rules`, the CI
    configuration files) is loaded on demand via `FileCache` instances.
    """

    def __init__(
        self,
        ls: "DebputyLanguageServer",
        doc: "TextDocument",
        source_root: str,
        debian_dir_path: str,
        dctrl_parser: DctrlParser,
    ) -> None:
        """Set up the state for linting `doc`.

        :param ls: The language server providing the workspace, the plugin
          feature set, the maintainer preferences and the translations.
        :param doc: The document being linted.
        :param source_root: File system path used as root of the source tree.
        :param debian_dir_path: File system path of the directory in which
          `control`, `rules` and `salsa-ci.yml` are looked up.
        :param dctrl_parser: Parser used for `debian/control`.
        """
        self._ls = ls
        self._doc = doc
        # Cache lines (doc.lines re-splits every time)
        self._lines = doc.lines
        self._source_root = FSROOverlay.create_root_dir(".", source_root)
        debian_dir = self._source_root.get("debian")
        if debian_dir is not None and not debian_dir.is_dir:
            debian_dir = None
        self._debian_dir = debian_dir
        # Only non-None while `run_diagnostics` is executing.
        self._diagnostics: Optional[List[types.Diagnostic]] = None
        dctrl_file = os.path.join(debian_dir_path, "control")

        if dctrl_file != doc.path:
            # The linted document is not debian/control itself; track both
            # files separately.
            self._dctrl_cache: DctrlFileCache = DctrlFileCache(
                from_fs_path(dctrl_file),
                dctrl_file,
                dctrl_parser=dctrl_parser,
            )
            self._deb822_file: Deb822FileCache = Deb822FileCache(
                doc.uri,
                doc.path,
            )
        else:
            # The linted document is debian/control; one cache serves both roles.
            self._dctrl_cache: DctrlFileCache = DctrlFileCache(
                doc.uri,
                doc.path,
                dctrl_parser=dctrl_parser,
            )
            self._deb822_file = self._dctrl_cache

        self._salsa_ci_caches = [
            SalsaCICache(
                from_fs_path(os.path.join(debian_dir_path, p)),
                os.path.join(debian_dir_path, p),
            )
            for p in ("salsa-ci.yml", os.path.join("..", ".gitlab-ci.yml"))
        ]
        drules_path = os.path.join(debian_dir_path, "rules")
        self._drules_cache = DebianRulesCache(
            from_fs_path(drules_path) if doc.path != drules_path else doc.uri,
            drules_path,
        )

    @property
    def plugin_feature_set(self) -> PluginProvidedFeatureSet:
        return self._ls.plugin_feature_set

    @property
    def doc_uri(self) -> str:
        return self._doc.uri

    @property
    def source_root(self) -> Optional[VirtualPathBase]:
        return self._source_root

    @property
    def debian_dir(self) -> Optional[VirtualPathBase]:
        return self._debian_dir

    @property
    def path(self) -> str:
        return self._doc.path

    @property
    def content(self) -> str:
        return self._doc.source

    @property
    def lines(self) -> List[str]:
        return self._lines

    @property
    def position_codec(self) -> LintCapablePositionCodec:
        return self._doc.position_codec

    def _resolve_dctrl(self) -> Optional[DctrlFileCache]:
        """Refresh and return the `debian/control` cache.

        The cache object is returned even when resolving failed; its package
        attributes are None in that case (the cache gets cleared).
        """
        dctrl_cache = self._dctrl_cache
        dctrl_cache.resolve_cache(self._ls)
        return dctrl_cache

    @property
    def parsed_deb822_file_content(self) -> Optional[Deb822FileElement]:
        cache = self._deb822_file
        cache.resolve_cache(self._ls)
        return cache.deb822_file

    @property
    def source_package(self) -> Optional[SourcePackage]:
        return self._resolve_dctrl().source_package

    @property
    def binary_packages(self) -> Optional[Mapping[str, BinaryPackage]]:
        return self._resolve_dctrl().binary_packages

    def _resolve_salsa_ci(self) -> Optional[CommentedMap]:
        """Return the content of the first CI configuration that resolves.

        The candidates are tried in the order they were registered in
        `__init__`; None is returned if none of them could be resolved.
        """
        for salsa_ci_cache in self._salsa_ci_caches:
            if salsa_ci_cache.resolve_cache(self._ls):
                return salsa_ci_cache.parsed_content
        return None

    @property
    def effective_preference(self) -> Optional[MaintainerPreference]:
        source_package = self.source_package
        salsa_ci = self._resolve_salsa_ci()
        if source_package is None and salsa_ci is None:
            return None
        style, _, _ = determine_effective_preference(
            self.maint_preference_table,
            source_package,
            salsa_ci,
        )
        return style

    @property
    def maint_preference_table(self) -> MaintainerPreferenceTable:
        return self._ls.maint_preferences

    @property
    def salsa_ci(self) -> Optional[CommentedMap]:
        # NOTE(review): always None even though `_resolve_salsa_ci()` exists;
        # confirm whether this is intentional.
        return None

    @property
    def dh_sequencer_data(self) -> DhSequencerData:
        """Combine the dh add-ons found in `debian/rules` and `debian/control`."""
        dh_sequences: Set[str] = set()
        saw_dh = False
        src_pkg = self.source_package
        drules_cache = self._drules_cache
        if drules_cache.resolve_cache(self._ls):
            saw_dh = drules_cache.saw_dh
            if drules_cache.sequences:
                dh_sequences.update(drules_cache.sequences)
        if src_pkg:
            extract_dh_addons_from_control(src_pkg.fields, dh_sequences)

        return DhSequencerData(
            frozenset(dh_sequences),
            saw_dh,
        )

    def translation(self, domain: str) -> Translations:
        return self._ls.translation(domain)

    def run_diagnostics(
        self,
        linter: LinterImpl,
    ) -> List[types.Diagnostic]:
        """Run `linter` on this state and return the diagnostics it emitted.

        :param linter: The linter to run; it is called with this state as its
          only argument.
        :returns: The diagnostics collected via `_emit_diagnostic` during the
          run.
        :raises RuntimeError: If a run is already in progress on this state.
        """
        if self._diagnostics is not None:
            raise RuntimeError(
                "run_diagnostics cannot be run while it is already running"
            )
        self._diagnostics = diagnostics = []

        try:
            linter(self)
        finally:
            # Always leave the "running" state, also when the linter raised;
            # otherwise every later call would fail with the error above.
            self._diagnostics = None
        return diagnostics

    def _emit_diagnostic(self, diagnostic: types.Diagnostic) -> None:
        """Record `diagnostic` for the `run_diagnostics` call in progress.

        :raises TypeError: If no `run_diagnostics` call is in progress.
        """
        diagnostics = self._diagnostics
        if diagnostics is None:
            raise TypeError("Cannot run emit_diagnostic outside of run_diagnostics")
        diagnostics.append(diagnostic)

367 

368 

369def _preference( 

370 client_preference: Optional[List[types.MarkupKind]], 

371 options: Container[types.MarkupKind], 

372 fallback_kind: types.MarkupKind, 

373) -> types.MarkupKind: 

374 if not client_preference: 374 ↛ 376line 374 didn't jump to line 376 because the condition on line 374 was always true

375 return fallback_kind 

376 for markdown_kind in client_preference: 

377 if markdown_kind in options: 

378 return markdown_kind 

379 return fallback_kind 

380 

381 

class DebputyLanguageServer(LanguageServer):
    """Language server with the debputy specific state attached.

    The server carries the `DctrlParser`, the plugin feature set, the
    maintainer preferences, the apt cache and the locale settings that the
    request handlers need.
    """

    def __init__(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Create the server; all arguments are passed on to `LanguageServer`."""
        super().__init__(*args, **kwargs)
        self._dctrl_parser: Optional[DctrlParser] = None
        self._plugin_feature_set: Optional[PluginProvidedFeatureSet] = None
        self._trust_language_ids: Optional[bool] = None
        self._finished_initialization = False
        self.maint_preferences = MaintainerPreferenceTable({}, {})
        self.apt_cache = AptCache()
        # Tasks are kept here until they are done (see `on_initialize`).
        self.background_tasks = set()
        self.client_locale: Optional[str] = None
        self.forced_locale: Optional[str] = None
        self._active_locale: Optional[List[str]] = None

    def finish_startup_initialization(self) -> None:
        """Load the maintainer preferences; only the first call has an effect.

        The `dctrl_parser`, `plugin_feature_set` and `trust_language_ids`
        attributes must have been set before this is called.
        """
        if self._finished_initialization:
            return
        assert self._dctrl_parser is not None
        assert self._plugin_feature_set is not None
        assert self._trust_language_ids is not None
        self.maint_preferences = self.maint_preferences.load_preferences()
        _info(
            f"Loaded style preferences: {len(self.maint_preferences.maintainer_preferences)} unique maintainer preferences recorded"
        )
        self._finished_initialization = True

    async def on_initialize(self, params: types.InitializeParams) -> None:
        """Start loading the apt cache in the background and record the client locale.

        :param params: The initialize parameters sent by the client.
        """
        task = self.loop.create_task(self._load_apt_cache(), name="Index apt cache")
        # Hold a reference until the task is done; the callback drops it again.
        self.background_tasks.add(task)
        task.add_done_callback(self.background_tasks.discard)
        self.client_locale = params.locale
        if self.forced_locale is not None:
            _info(
                f"Ignoring client locale: {self.client_locale}. Using {self.forced_locale} instead as requested [--force-locale]"
            )
        else:
            _info(
                f"Client locale: {self.client_locale}. Use --force-locale to override"
            )
        self._update_locale()

    def _update_locale(self) -> None:
        """Recompute the active locale; a forced locale wins over the client's."""
        if self.forced_locale is not None:
            self._active_locale = [self.forced_locale]
        elif self.client_locale is not None:
            self._active_locale = [self.client_locale]
        else:
            self._active_locale = None

    def shutdown(self) -> None:
        """Request cancellation of all background tasks, then shut down the server."""
        # Iterate over a snapshot: the done-callback registered in
        # `on_initialize` removes finished tasks from the set, which must not
        # happen while the set itself is being iterated.
        for task in tuple(self.background_tasks):
            _info(f"Cancelling task: {task.get_name()}")
            self.loop.call_soon_threadsafe(task.cancel)
        return super().shutdown()

    def translation(self, domain: str) -> Translations:
        """Return the translations for `domain` in the active locale."""
        return l10n.translation(
            domain,
            languages=self._active_locale,
        )

    async def _load_apt_cache(self) -> None:
        """Load the apt cache unless it is already loading or loaded.

        A `ValueError` from the load is logged as a warning and not propagated.
        """
        if self.apt_cache.state in ("loading", "loaded"):
            _info(
                f"The apt cache data is already in state {self.apt_cache.state}, not re-triggering"
            )
            return
        _info("Starting load of apt cache data")
        # A monotonic clock is the right tool for measuring elapsed time; the
        # wall clock may be adjusted while the load is running.
        start = time.monotonic()
        try:
            await self.apt_cache.load()
        except ValueError as ex:
            _warn(f"Could not load apt cache: {ex}")
        else:
            end = time.monotonic()
            _info(
                f"Loading apt cache finished after {end-start} seconds and is now in state {self.apt_cache.state}"
            )

    @property
    def plugin_feature_set(self) -> PluginProvidedFeatureSet:
        res = self._plugin_feature_set
        if res is None:
            raise RuntimeError(
                "Initialization error: The plugin feature set has not been initialized before it was needed."
            )
        return res

    @plugin_feature_set.setter
    def plugin_feature_set(self, plugin_feature_set: PluginProvidedFeatureSet) -> None:
        if self._plugin_feature_set is not None:
            raise RuntimeError(
                "The plugin_feature_set attribute cannot be changed once set"
            )
        self._plugin_feature_set = plugin_feature_set

    @property
    def dctrl_parser(self) -> DctrlParser:
        res = self._dctrl_parser
        if res is None:
            raise RuntimeError(
                "Initialization error: The dctrl_parser has not been initialized before it was needed."
            )
        return res

    @dctrl_parser.setter
    def dctrl_parser(self, parser: DctrlParser) -> None:
        if self._dctrl_parser is not None:
            raise RuntimeError("The dctrl_parser attribute cannot be changed once set")
        self._dctrl_parser = parser

    def lint_state(self, doc: "TextDocument") -> LSProvidedLintState:
        """Create a new lint state for `doc`.

        The directory of the document and its parents are searched for a
        directory named `debian`; the parent of the directory where the search
        stops is used as the source root.
        """
        dir_path = os.path.dirname(doc.path)

        while dir_path and dir_path != "/" and os.path.basename(dir_path) != "debian":
            dir_path = os.path.dirname(dir_path)

        source_root = os.path.dirname(dir_path)

        return LSProvidedLintState(self, doc, source_root, dir_path, self.dctrl_parser)

    @property
    def _client_hover_markup_formats(self) -> Optional[List[types.MarkupKind]]:
        try:
            return (
                self.client_capabilities.text_document.hover.content_format  # type: ignore
            )
        except AttributeError:
            # Some part of the capability chain was not provided by the client.
            return None

    def hover_markup_format(
        self,
        *options: types.MarkupKind,
        fallback_kind: types.MarkupKind = types.MarkupKind.PlainText,
    ) -> types.MarkupKind:
        """Pick the client preferred hover markup format from a set of options

        :param options: The markup kinds possible.
        :param fallback_kind: If no overlapping option was found in the client preferences
          (or client did not announce a value at all), this parameter is returned instead.
        :returns: The client's preferred markup format from the provided options, or,
          (if there is no overlap), the `fallback_kind` value is returned.
        """
        client_preference = self._client_hover_markup_formats
        return _preference(client_preference, frozenset(options), fallback_kind)

    @property
    def _client_completion_item_document_markup_formats(
        self,
    ) -> Optional[List[types.MarkupKind]]:
        try:
            return (
                self.client_capabilities.text_document.completion.completion_item.documentation_format  # type: ignore
            )
        except AttributeError:
            # Some part of the capability chain was not provided by the client.
            return None

    def completion_item_document_markup(
        self,
        *options: types.MarkupKind,
        fallback_kind: types.MarkupKind = types.MarkupKind.PlainText,
    ) -> types.MarkupKind:
        """Pick the client preferred completion item documentation markup format from a set of options

        :param options: The markup kinds possible.
        :param fallback_kind: If no overlapping option was found in the client preferences
          (or client did not announce a value at all), this parameter is returned instead.
        :returns: The client's preferred markup format from the provided options, or,
          (if there is no overlap), the `fallback_kind` value is returned.
        """

        client_preference = self._client_completion_item_document_markup_formats
        return _preference(client_preference, frozenset(options), fallback_kind)

    @property
    def trust_language_ids(self) -> bool:
        v = self._trust_language_ids
        # Language IDs are trusted as long as nothing else has been configured.
        if v is None:
            return True
        return v

    @trust_language_ids.setter
    def trust_language_ids(self, new_value: bool) -> None:
        self._trust_language_ids = new_value

    def determine_language_id(
        self,
        doc: "TextDocument",
    ) -> Tuple[Literal["editor-provided", "filename"], str, str]:
        """Determine the language ID to use for `doc`.

        :param doc: The document to determine the language ID for.
        :returns: A tuple of the source of the ID (`editor-provided` or
          `filename`), the language ID and the cleaned filename. The cleaned
          filename is the part of the path from the last occurrence of
          `debian/`, or the basename if the path does not contain `debian/`.
          With `filename` as source, the cleaned filename doubles as the ID.
        """
        lang_id = doc.language_id
        path = doc.path
        try:
            last_idx = path.rindex("debian/")
        except ValueError:
            cleaned_filename = os.path.basename(path)
        else:
            cleaned_filename = path[last_idx:]

        if self.trust_language_ids and lang_id and not lang_id.isspace():
            if lang_id not in ("fundamental",):
                return "editor-provided", lang_id, cleaned_filename
            _info(
                f"Ignoring editor provided language ID: {lang_id} (reverting to filename based detection instead)"
            )

        return "filename", cleaned_filename, cleaned_filename