Coverage for src/debputy/transformation_rules.py: 74%

282 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-01-27 13:59 +0000

1import dataclasses 

2import os 

3from typing import ( 

4 NoReturn, 

5 Optional, 

6 Callable, 

7 Sequence, 

8 Tuple, 

9 List, 

10 Literal, 

11 Dict, 

12 TypeVar, 

13 cast, 

14 final, 

15) 

16 

17from debputy.exceptions import ( 

18 DebputyRuntimeError, 

19 PureVirtualPathError, 

20 TestPathWithNonExistentFSPathError, 

21) 

22from debputy.filesystem_scan import FSPath 

23from debputy.interpreter import ( 

24 extract_shebang_interpreter_from_file, 

25) 

26from debputy.manifest_conditions import ConditionContext, ManifestCondition 

27from debputy.manifest_parser.base_types import ( 

28 FileSystemMode, 

29 StaticFileSystemOwner, 

30 StaticFileSystemGroup, 

31) 

32from debputy.manifest_parser.tagging_types import DebputyDispatchableType 

33from debputy.manifest_parser.util import AttributePath 

34from debputy.path_matcher import MatchRule 

35from debputy.plugin.api import VirtualPath 

36from debputy.plugin.debputy.types import DebputyCapability 

37from debputy.plugin.plugin_state import ( 

38 run_in_context_of_plugin_wrap_errors, 

39) 

40from debputy.util import _warn 

41 

42 

class TransformationRuntimeError(DebputyRuntimeError):
    """Raised when a transformation rule cannot be applied to the file system."""

45 

46 

# Policies for what to do when the path of a symlink to be created already exists:
#  * "error-if-exists": never replace an existing path.
#  * "error-if-directory": replace anything except a directory.
#  * "abort-on-non-empty-directory": replace anything except a non-empty directory.
#  * "discard-existing": always replace the existing path.
CreateSymlinkReplacementRule = Literal[
    "error-if-exists", "error-if-directory", "abort-on-non-empty-directory", "discard-existing"
]

53 

54 

# Type variable for helpers that hand back the same kind of VirtualPath they were given.
VP = TypeVar("VP", bound=VirtualPath)

56 

57 

@dataclasses.dataclass(slots=True, frozen=True)
class PreProvidedExclusion:
    """Immutable record bundling a pruning callback with a tag and a description."""

    # Identifier of the exclusion (presumably a short machine-friendly name; confirm with callers).
    tag: str
    # Free-form text describing the exclusion.
    description: str
    # Callback that receives an FSPath and returns nothing.
    pruner: Callable[[FSPath], None]

63 

64 

class TransformationRule(DebputyDispatchableType):
    """Base class for rules that transform a package's file system tree.

    Subclasses implement :meth:`transform_file_system`.  The rule is executed via
    :meth:`run_transform_file_system`, which is final and delegates the call to
    ``run_in_context_of_plugin_wrap_errors`` together with the plugin stored on the rule.
    """

    __slots__ = ()

    @final
    def run_transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Run :meth:`transform_file_system` through ``run_in_context_of_plugin_wrap_errors``.

        :param fs_root: Root of the file system to transform.
        :param condition_context: Context passed on to the transformation (used when
          evaluating manifest conditions).
        """
        run_in_context_of_plugin_wrap_errors(
            self._debputy_plugin,
            self.transform_file_system,
            fs_root,
            condition_context,
        )

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Apply the transformation to `fs_root`; must be implemented by subclasses.

        :raises NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    def _evaluate_condition(
        self,
        condition: Optional[ManifestCondition],
        condition_context: ConditionContext,
        result_if_condition_is_missing: bool = True,
    ) -> bool:
        """Evaluate an optional manifest condition.

        :param condition: The condition to evaluate, or None if the rule has no condition.
        :param condition_context: Context the condition is evaluated against.
        :param result_if_condition_is_missing: Value returned when `condition` is None.
        :return: The result of the condition, or the fallback when there is no condition.
        """
        if condition is None:
            return result_if_condition_is_missing
        return condition.evaluate(condition_context)

    def _error(
        self,
        msg: str,
        *,
        caused_by: Optional[BaseException] = None,
    ) -> NoReturn:
        """Abort the transformation with the given message.

        :param msg: The error message.
        :param caused_by: Optional exception recorded as the cause of the raised error.
        :raises TransformationRuntimeError: Always.
        """
        raise TransformationRuntimeError(msg) from caused_by

    def _match_rule_had_no_matches(
        self, match_rule: MatchRule, definition_source: str
    ) -> NoReturn:
        """Abort because `match_rule` did not match any path.

        :raises TransformationRuntimeError: Always (via :meth:`_error`).
        """
        self._error(
            f'The match rule "{match_rule.describe_match_short()}" in transformation "{definition_source}" did'
            " not match any paths. Either the definition is redundant (and can be omitted) or the match rule is"
            " incorrect."
        )

    def _fs_path_as_dir(
        self,
        path: VP,
        definition_source: str,
    ) -> VP:
        """Return `path` unchanged if it is a directory; otherwise abort.

        :param path: The path that must be a directory.
        :param definition_source: Where the transformation was defined (used in the error).
        :raises TransformationRuntimeError: If `path` is not a directory.
        """
        if path.is_dir:
            return path
        path_type = "file" if path.is_file else 'symlink/"special file system object"'
        # Note: the final fragment must not start with " to"; the previous fragment
        # already ends with "to" (the message used to read "to to make").
        self._error(
            f"The path {path.path} was expected to be a directory (or non-existing) due to"
            f" {definition_source}. However that path existed and is a {path_type}."
            f" You may need a `remove: {path.path}` prior to {definition_source} to"
            " make this transformation succeed."
        )

    def _ensure_is_directory(
        self,
        fs_root: FSPath,
        path_to_directory: str,
        definition_source: str,
    ) -> FSPath:
        """Look up `path_to_directory` below `fs_root`, creating missing directories.

        :param fs_root: Root of the file system to search.
        :param path_to_directory: The directory that should exist afterwards.
        :param definition_source: Where the transformation was defined (used in errors).
        :return: The directory at `path_to_directory`.
        :raises TransformationRuntimeError: If the path found by the lookup is not a directory.
        """
        # `attempt_lookup` returns a path plus the parts that are still missing
        # (presumably the deepest existing path on the way - confirm against FSPath).
        current, missing_parts = fs_root.attempt_lookup(path_to_directory)
        current = self._fs_path_as_dir(cast("FSPath", current), definition_source)
        if missing_parts:
            return current.mkdirs("/".join(missing_parts))
        return current

142 

143 

class RemoveTransformationRule(TransformationRule):
    """Transformation that removes every path matched by a sequence of match rules."""

    __slots__ = (
        "_match_rules",
        "_keep_empty_parent_dirs",
        "_definition_source",
    )

    def __init__(
        self,
        match_rules: Sequence[MatchRule],
        keep_empty_parent_dirs: bool,
        definition_source: AttributePath,
    ) -> None:
        """
        :param match_rules: Rules selecting the paths to remove.
        :param keep_empty_parent_dirs: When False, the parent of a removed path is pruned
          if the removal left it empty.
        :param definition_source: Where the rule was defined; its `path` is used in errors.
        """
        super().__init__()
        self._match_rules = match_rules
        self._keep_empty_parent_dirs = keep_empty_parent_dirs
        self._definition_source = definition_source.path

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Remove all matched paths from `fs_root`.

        :raises TransformationRuntimeError: If any single match rule matches nothing, or if
          a match has no parent directory (i.e., it is the root directory).
        """
        for match_rule in self._match_rules:
            # Fully resolve the matches to avoid RuntimeError caused by collection changing size as a
            # consequence of the removal: https://salsa.debian.org/debian/debputy/-/issues/52
            matches = list(match_rule.finditer(fs_root))
            # Each rule is checked on its own.  A flag shared across rules would let a rule
            # without matches slip through whenever an earlier rule had matched something.
            # FIXME: `rm` should probably be forgiving or at least support a condition to avoid failures
            if not matches:
                self._match_rule_had_no_matches(match_rule, self._definition_source)
            for m in matches:
                parent = m.parent_dir
                if parent is None:
                    self._error(
                        f"Cannot remove the root directory (triggered by {self._definition_source})"
                    )
                m.unlink(recursive=True)
                if not self._keep_empty_parent_dirs:
                    parent.prune_if_empty_dir()

185 

186 

class MoveTransformationRule(TransformationRule):
    """Transformation that moves (or renames) the paths matched by a match rule."""

    __slots__ = (
        "_match_rule",
        "_dest_path",
        "_dest_is_dir",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        match_rule: MatchRule,
        dest_path: str,
        dest_is_dir: bool,
        definition_source: AttributePath,
        condition: Optional[ManifestCondition],
    ) -> None:
        """
        :param match_rule: Rule selecting the source path(s).
        :param dest_path: Destination path.
        :param dest_is_dir: Whether the destination must be treated as a directory (it is
          created if missing) rather than as the new name of a single path.
        :param definition_source: Where the rule was defined; its `path` is used in errors.
        :param condition: Optional condition; the rule is skipped when it evaluates to False.
        """
        super().__init__()
        self._match_rule = match_rule
        self._dest_path = dest_path
        self._dest_is_dir = dest_is_dir
        self._definition_source = definition_source.path
        self._condition = condition

    def transform_file_system(
        self, fs_root: FSPath, condition_context: ConditionContext
    ) -> None:
        """Move the matched paths to the destination.

        If the destination is not an existing directory (and was not declared to be one),
        the single match is renamed to the destination.  Otherwise, every match is moved
        into the destination directory.

        :raises TransformationRuntimeError: If nothing matched, if several paths matched
          but the destination is not a directory, if the source equals the destination,
          if two matches share a basename, or if a match would replace an existing
          directory inside the destination.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return
        # Eager resolve is necessary to avoid "self-recursive" matching in special cases (e.g., **/*.la)
        matches = list(self._match_rule.finditer(fs_root))
        if not matches:
            self._match_rule_had_no_matches(self._match_rule, self._definition_source)

        target_dir: Optional[VirtualPath]
        if self._dest_is_dir:
            target_dir = self._ensure_is_directory(
                fs_root,
                self._dest_path,
                self._definition_source,
            )
        else:
            dir_part, basename = os.path.split(self._dest_path)
            target_parent_dir = self._ensure_is_directory(
                fs_root,
                dir_part,
                self._definition_source,
            )
            target_dir = target_parent_dir.get(basename)

        if target_dir is None or not target_dir.is_dir:
            # Rename case.  Only reachable via the `else` branch above (the other branch
            # always yields a directory), so `target_parent_dir` and `basename` are bound.
            if len(matches) > 1:
                self._error(
                    f"Could not rename {self._match_rule.describe_match_short()} to {self._dest_path}"
                    f" (from: {self._definition_source}). Multiple paths matched the pattern and the"
                    " destination was not a directory. Either correct the pattern to match only one source"
                    " OR define the destination to be a directory (E.g., add a trailing slash - example:"
                    f' "{self._dest_path}/")'
                )
            p = matches[0]
            if p.path == self._dest_path:
                self._error(
                    f"Error in {self._definition_source}, the source"
                    f" {self._match_rule.describe_match_short()} matched {self._dest_path} making the"
                    " rename redundant!?"
                )
            p.parent_dir = target_parent_dir
            p.name = basename
            return

        assert target_dir is not None and target_dir.is_dir
        basenames: Dict[str, VirtualPath] = dict()
        target_dir_path = target_dir.path

        for m in matches:
            if m.path == target_dir_path:
                self._error(
                    f"Error in {self._definition_source}, the source {self._match_rule.describe_match_short()}"
                    f" matched {self._dest_path} (among other), but it is not possible to copy a directory into"
                    " itself"
                )
            if m.name in basenames:
                alt_path = basenames[m.name]
                # We document "two *distinct*" paths. However, as the glob matches are written, it should not be
                # possible for a *single* glob to match the same path twice.
                assert alt_path is not m
                self._error(
                    f"Could not rename {self._match_rule.describe_match_short()} to {self._dest_path}"
                    f" (from: {self._definition_source}). Multiple paths matched the pattern had the"
                    f' same basename "{m.name}" ("{m.path}" vs. "{alt_path.path}"). Please correct the'
                    f" pattern, so it only matches one path with that basename to avoid this conflict."
                )
            # The conflict check must look in the *destination* directory (not inside the
            # source path itself).  A match that already lives in the destination is not
            # in conflict with itself.
            existing = target_dir.get(m.name)
            if existing and existing is not m and existing.is_dir:
                self._error(
                    f"Could not rename {self._match_rule.describe_match_short()} to {self._dest_path}"
                    f" (from: {self._definition_source}). The pattern matched {m.path} which would replace"
                    f" the existing directory {existing.path}. If this replacement is intentional, then please"
                    f' remove "{existing.path}" first (e.g., via `- remove: "{existing.path}"`)'
                )
            basenames[m.name] = m
            m.parent_dir = target_dir

289 

290 

class CreateSymlinkPathTransformationRule(TransformationRule):
    """Transformation that creates a symlink, optionally replacing an existing path."""

    __slots__ = (
        "_link_dest",
        "_link_target",
        "_replacement_rule",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        link_target: str,
        link_dest: str,
        replacement_rule: CreateSymlinkReplacementRule,
        definition_source: AttributePath,
        condition: Optional[ManifestCondition],
    ) -> None:
        """
        :param link_target: What the symlink points to.
        :param link_dest: Path at which the symlink is created.
        :param replacement_rule: Policy used when `link_dest` already exists.
        :param definition_source: Where the rule was defined; its `path` is used in errors.
        :param condition: Optional condition; the rule is skipped when it evaluates to False.
        """
        super().__init__()
        self._link_target = link_target
        self._link_dest = link_dest
        self._replacement_rule = replacement_rule
        self._definition_source = definition_source.path
        self._condition = condition

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Create the symlink below `fs_root`, creating its parent directory if needed.

        :raises TransformationRuntimeError: If an existing path may not be replaced under
          the active replacement rule, or if the parent path is not a directory.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return
        parent_part, link_name = os.path.split(self._link_dest)
        parent_dir = self._ensure_is_directory(
            fs_root, parent_part, self._definition_source
        )
        occupant = parent_dir.get(link_name)
        if occupant:
            self._handle_existing_path(occupant)
        parent_dir.add_symlink(link_name, self._link_target)

    def _handle_existing_path(self, existing: VirtualPath) -> None:
        """Remove `existing` if the replacement rule permits it; otherwise abort.

        :raises TransformationRuntimeError: If the replacement rule forbids the removal.
        """
        rule = self._replacement_rule
        if rule == "abort-on-non-empty-directory":
            may_replace = not (existing.is_dir and any(existing.iterdir))
            refusal = "the path is a non-empty directory"
        elif rule == "discard-existing":
            may_replace = True
            refusal = "<<internal error: you should not see an error with this message>>"
        elif rule == "error-if-directory":
            may_replace = not existing.is_dir
            refusal = "the path is a directory"
        else:
            assert rule == "error-if-exists"
            may_replace = False
            refusal = "the path exists"

        if not may_replace:
            self._error(
                f"Refusing to replace {existing.path} with a symlink; {refusal} and"
                f" the active replacement-rule was {self._replacement_rule}. You can"
                f' set the replacement-rule to "discard-existing", if you are not interested'
                f" in the contents of {existing.path}. This error was triggered by {self._definition_source}."
            )
        else:
            existing.unlink(recursive=True)

358 

359 

class CreateDirectoryTransformationRule(TransformationRule):
    """Transformation that ensures directories exist and applies mode/ownership to them."""

    __slots__ = (
        "_directories",
        "_owner",
        "_group",
        "_mode",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        directories: Sequence[str],
        owner: Optional[StaticFileSystemOwner],
        group: Optional[StaticFileSystemGroup],
        mode: Optional[FileSystemMode],
        definition_source: str,
        condition: Optional[ManifestCondition],
    ) -> None:
        """
        :param directories: Paths of the directories to create.
        :param owner: Owner passed to `chown` for each directory (may be None).
        :param group: Group passed to `chown` for each directory (may be None).
        :param mode: Mode definition applied to each directory; None leaves the mode as-is.
        :param definition_source: Where the rule was defined (used in errors).
        :param condition: Optional condition; the rule is skipped when it evaluates to False.
        """
        super().__init__()
        self._directories = directories
        self._owner = owner
        self._group = group
        self._mode = mode
        self._definition_source = definition_source
        self._condition = condition

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Create each configured directory and apply the requested mode and ownership.

        :raises TransformationRuntimeError: If a path is not a directory or the desired
          mode cannot be computed.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return
        requested_mode = self._mode
        for directory in self._directories:
            created = self._ensure_is_directory(
                fs_root, directory, self._definition_source
            )
            if requested_mode is not None:
                try:
                    new_mode = requested_mode.compute_mode(created.mode, created.is_dir)
                except ValueError as e:
                    self._error(
                        f"Could not compute desired mode for {created.path} as"
                        f" requested in {self._definition_source}: {e.args[0]}",
                        caused_by=e,
                    )
                else:
                    created.mode = new_mode
            created.chown(self._owner, self._group)

415 

416 

def _apply_owner_and_mode(
    path: VirtualPath,
    owner: Optional[StaticFileSystemOwner],
    group: Optional[StaticFileSystemGroup],
    mode: Optional[FileSystemMode],
    capabilities: Optional[str],
    capability_mode: Optional[FileSystemMode],
    definition_source: str,
) -> None:
    """Apply ownership, mode and (for files only) capabilities to `path`.

    :param path: The path to update.
    :param owner: New owner; `chown` is only called when owner or group is given.
    :param group: New group; `chown` is only called when owner or group is given.
    :param mode: Mode definition to apply; None leaves the mode untouched.
    :param capabilities: Capabilities to record on the path; only applied to files.
    :param capability_mode: Mode stored with the capabilities; must not be None when
      capabilities are applied.
    :param definition_source: Where the request was defined (used in messages and stored
      with the capabilities).
    :raises TransformationRuntimeError: If the desired mode cannot be computed.
    """
    if not (owner is None and group is None):
        path.chown(owner, group)

    if mode is not None:
        try:
            new_mode = mode.compute_mode(path.mode, path.is_dir)
        except ValueError as e:
            raise TransformationRuntimeError(
                f"Could not compute desired mode for {path.path} as"
                f" requested in {definition_source}: {e.args[0]}"
            ) from e
        path.mode = new_mode

    if not path.is_file or capabilities is None:
        return

    cap_ref = path.metadata(DebputyCapability)
    previous = cap_ref.value
    if previous is not None:
        # Capabilities are replaced rather than merged; tell the user about it.
        _warn(
            f"Replacing the capabilities set on path {path.path} from {previous.definition_source} due"
            f" to {definition_source}."
        )
    assert capability_mode is not None
    cap_ref.value = DebputyCapability(
        capabilities,
        capability_mode,
        definition_source,
    )

452 

453 

class PathMetadataTransformationRule(TransformationRule):
    """Transformation that sets owner, group, mode and/or capabilities on matched paths."""

    __slots__ = (
        "_match_rules",
        "_owner",
        "_group",
        "_mode",
        "_capabilities",
        "_capability_mode",
        "_recursive",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        match_rules: Sequence[MatchRule],
        owner: Optional[StaticFileSystemOwner],
        group: Optional[StaticFileSystemGroup],
        mode: Optional[FileSystemMode],
        recursive: bool,
        capabilities: Optional[str],
        capability_mode: Optional[FileSystemMode],
        definition_source: str,
        condition: Optional[ManifestCondition],
    ) -> None:
        """
        :param match_rules: Rules selecting the paths to update.
        :param owner: New owner, if any.
        :param group: New group, if any.
        :param mode: Mode definition to apply, if any.
        :param recursive: Whether matched directories are also processed recursively.
        :param capabilities: Capabilities to set on files, if any.
        :param capability_mode: Mode stored with the capabilities; must be given if and
          only if `capabilities` is given.
        :param definition_source: Where the rule was defined (used in messages).
        :param condition: Optional condition; the rule is skipped when it evaluates to False.
        :raises ValueError: If only one of `capabilities` / `capability_mode` is provided.
        """
        super().__init__()
        self._match_rules = match_rules
        self._owner = owner
        self._group = group
        self._mode = mode
        self._capabilities = capabilities
        self._capability_mode = capability_mode
        self._recursive = recursive
        self._definition_source = definition_source
        self._condition = condition
        if capabilities is None and capability_mode is not None:
            raise ValueError("capability_mode without capabilities")
        if capabilities is not None and capability_mode is None:
            raise ValueError("capabilities without capability_mode")

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Apply the configured metadata to every (non-symlink) matched path.

        Symlinks are always skipped.  In recursive mode, the content of matched
        directories is processed after all match rules have been handled.

        :raises TransformationRuntimeError: If a match rule has no usable match or the
          desired mode cannot be computed.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return

        def apply_metadata(target: VirtualPath) -> None:
            _apply_owner_and_mode(
                target,
                self._owner,
                self._group,
                self._mode,
                self._capabilities,
                self._capability_mode,
                self._definition_source,
            )

        recursive = self._recursive
        pending_dirs: Optional[List[FSPath]] = [] if recursive else None
        # Without owner, group and mode, only the capabilities remain - and those are
        # only ever applied to files.
        needs_file_match = (
            self._owner is None and self._group is None and self._mode is None
        )

        for match_rule in self._match_rules:
            match_ok = False
            saw_symlink = False
            saw_directory = False

            for path in match_rule.finditer(fs_root):
                if path.is_symlink:
                    saw_symlink = True
                    continue
                if path.is_file or not needs_file_match:
                    match_ok = True
                is_directory = path.is_dir
                if is_directory:
                    saw_directory = True
                    if recursive and needs_file_match and not match_ok:
                        match_ok = any(p.is_file for p in path.all_paths())
                apply_metadata(path)
                if is_directory and pending_dirs is not None:
                    pending_dirs.append(path)

            if match_ok:
                continue
            if needs_file_match and (saw_directory or saw_symlink):
                _warn(
                    f"The match rule {match_rule.describe_match_short()} (from {self._definition_source})"
                    " did not match any files, but given the attributes it can only apply to files."
                )
            elif saw_symlink:
                _warn(
                    f"The match rule {match_rule.describe_match_short()} (from {self._definition_source})"
                    ' matched symlinks, but "path-metadata" cannot apply to symlinks.'
                )
            self._match_rule_had_no_matches(match_rule, self._definition_source)

        if pending_dirs:
            for recurse_dir in pending_dirs:
                for path in recurse_dir.all_paths():
                    if not path.is_symlink:
                        apply_metadata(path)

567 

568 

class ModeNormalizationTransformationRule(TransformationRule):
    """Transformation applying built-in mode normalizations; the first matching rule wins."""

    __slots__ = ("_normalizations",)

    def __init__(
        self,
        normalizations: Sequence[Tuple[MatchRule, FileSystemMode]],
    ) -> None:
        """
        :param normalizations: Pairs of match rule and the mode definition to apply to
          the paths matched by that rule.
        """
        # A bit of a hack since it is initialized outside `debputy`. It probably should not
        # be a "TransformationRule" (hindsight and all)
        run_in_context_of_plugin_wrap_errors("debputy", super().__init__)
        self._normalizations = normalizations

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Normalize the mode of every non-symlink path, handling each path at most once.

        :raises AssertionError: If a mode cannot be computed (the rules are built-in, so
          this indicates an internal error).
        """
        handled = set()

        def already_handled(candidate) -> bool:
            return candidate.path in handled

        for match_rule, fs_mode in self._normalizations:
            for path in match_rule.finditer(fs_root, ignore_paths=already_handled):
                if path.is_symlink or path.path in handled:
                    continue
                handled.add(path.path)
                try:
                    normalized_mode = fs_mode.compute_mode(path.mode, path.is_dir)
                except ValueError as e:
                    raise AssertionError(
                        "Error while applying built-in mode normalization rule"
                    ) from e
                else:
                    path.mode = normalized_mode

601 

602 

class NormalizeShebangLineTransformation(TransformationRule):
    """Transformation that rewrites the shebang line of files whose interpreter needs a fixup."""

    # Declared for consistency with the other transformation rules; without it, instances
    # would get a `__dict__` even though the base class is slotted.
    __slots__ = ()

    def __init__(self) -> None:
        # A bit of a hack since it is initialized outside `debputy`. It probably should not
        # be a "TransformationRule" (hindsight and all)
        run_in_context_of_plugin_wrap_errors("debputy", super().__init__)

    def transform_file_system(
        self,
        fs_root: VirtualPath,
        condition_context: ConditionContext,
    ) -> None:
        """Inspect every file below `fs_root` and replace shebang lines that need a fixup.

        Files that cannot be opened because they are purely virtual (or are test paths
        without a backing file system path) are skipped.

        :param fs_root: Root of the file system to scan.
        :param condition_context: Unused by this transformation.
        """
        for path in fs_root.all_paths():
            if not path.is_file:
                continue
            try:
                with path.open(byte_io=True, buffering=4096) as fd:
                    interpreter = extract_shebang_interpreter_from_file(fd)
            except (PureVirtualPathError, TestPathWithNonExistentFSPathError):
                # Do not make tests unnecessarily complex to write
                continue
            if interpreter is None:
                continue

            if interpreter.fixup_needed:
                interpreter.replace_shebang_line(path)