Coverage for src/debputy/transformation_rules.py: 74%

283 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-10-12 15:06 +0000

1import dataclasses 

2import os 

3from typing import ( 

4 NoReturn, 

5 Optional, 

6 Tuple, 

7 List, 

8 Literal, 

9 Dict, 

10 TypeVar, 

11 cast, 

12 final, 

13) 

14from collections.abc import Callable, Sequence 

15 

16from debputy.exceptions import ( 

17 DebputyRuntimeError, 

18 PureVirtualPathError, 

19 TestPathWithNonExistentFSPathError, 

20) 

21from debputy.filesystem_scan import FSPath 

22from debputy.interpreter import ( 

23 extract_shebang_interpreter_from_file, 

24) 

25from debputy.manifest_conditions import ConditionContext, ManifestCondition 

26from debputy.manifest_parser.base_types import ( 

27 FileSystemMode, 

28 StaticFileSystemOwner, 

29 StaticFileSystemGroup, 

30) 

31from debputy.manifest_parser.tagging_types import DebputyDispatchableType 

32from debputy.manifest_parser.util import AttributePath 

33from debputy.path_matcher import MatchRule 

34from debputy.plugin.api import VirtualPath 

35from debputy.plugins.debputy.types import DebputyCapability 

36from debputy.plugin.plugin_state import ( 

37 run_in_context_of_plugin_wrap_errors, 

38) 

39from debputy.util import _warn 

40 

41 

class TransformationRuntimeError(DebputyRuntimeError):
    """Error raised by this module when a transformation cannot be carried out."""

44 

45 

# The replacement policies understood by `CreateSymlinkPathTransformationRule` when
# something already exists at the location where the symlink is to be created.
CreateSymlinkReplacementRule = Literal[
    "error-if-exists",
    "error-if-directory",
    "abort-on-non-empty-directory",
    "discard-existing",
]


# Type variable bound to `VirtualPath`; lets a helper return the same path type it was given.
VP = TypeVar("VP", bound=VirtualPath)

55 

56 

@dataclasses.dataclass(frozen=True, slots=True)
class PreProvidedExclusion:
    """Frozen, slotted record bundling a tag, a description and a pruner callback.

    The pruner is a callable that receives an `FSPath` and returns nothing.
    NOTE(review): how the tag/description are consumed is not visible in this module.
    """

    tag: str
    description: str
    pruner: Callable[[FSPath], None]

62 

63 

class TransformationRule(DebputyDispatchableType):
    """Base class for rules that transform a file system tree.

    Subclasses override `transform_file_system`. Callers are expected to use
    `run_transform_file_system`, which invokes the override through
    `run_in_context_of_plugin_wrap_errors` for the plugin in `self._debputy_plugin`.
    """

    __slots__ = ()

    @final
    def run_transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Run `transform_file_system` in the context of the plugin owning this rule.

        :param fs_root: Root of the file system to transform.
        :param condition_context: Context passed on to the transformation.
        """
        run_in_context_of_plugin_wrap_errors(
            self._debputy_plugin,
            self.transform_file_system,
            fs_root,
            condition_context,
        )

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Apply the transformation to `fs_root`; subclasses must override this.

        :raises NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    def _evaluate_condition(
        self,
        condition: ManifestCondition | None,
        condition_context: ConditionContext,
        result_if_condition_is_missing: bool = True,
    ) -> bool:
        """Evaluate an optional condition.

        :param condition: The condition to evaluate, or `None` if there is none.
        :param condition_context: Context handed to `condition.evaluate`.
        :param result_if_condition_is_missing: Value returned when `condition` is `None`.
        :return: The outcome of the condition, or the fallback when it is absent.
        """
        if condition is None:
            return result_if_condition_is_missing
        return condition.evaluate(condition_context)

    def _error(
        self,
        msg: str,
        *,
        caused_by: BaseException | None = None,
    ) -> NoReturn:
        """Abort the transformation with `msg`.

        :param msg: The error message.
        :param caused_by: Optional exception to chain as the cause.
        :raises TransformationRuntimeError: Always.
        """
        raise TransformationRuntimeError(msg) from caused_by

    def _match_rule_had_no_matches(
        self, match_rule: MatchRule, definition_source: str
    ) -> NoReturn:
        """Abort because `match_rule` did not match any path.

        :param match_rule: The rule that had no matches.
        :param definition_source: Where the transformation was defined (used in the message).
        :raises TransformationRuntimeError: Always (via `_error`).
        """
        self._error(
            f'The match rule "{match_rule.describe_match_short()}" in transformation "{definition_source}" did'
            " not match any paths. Either the definition is redundant (and can be omitted) or the match rule is"
            " incorrect."
        )

    def _fs_path_as_dir(
        self,
        path: VP,
        definition_source: str,
    ) -> VP:
        """Return `path` unchanged if it is a directory; abort otherwise.

        :param path: The path that must be a directory.
        :param definition_source: Where the transformation was defined (used in the message).
        :return: `path` itself.
        :raises TransformationRuntimeError: If `path` is not a directory.
        """
        if path.is_dir:
            return path
        path_type = "file" if path.is_file else 'symlink/"special file system object"'
        self._error(
            f"The path {path.path} was expected to be a directory (or non-existing) due to"
            f" {definition_source}. However that path existed and is a {path_type}."
            f" You may need a `remove: {path.path}` prior to {definition_source}"
            " to make this transformation succeed."
        )

    def _ensure_is_directory(
        self,
        fs_root: FSPath,
        path_to_directory: str,
        definition_source: str,
    ) -> FSPath:
        """Look up `path_to_directory` under `fs_root`, creating the missing parts.

        :param fs_root: Root to resolve the path against.
        :param path_to_directory: Path of the directory that must exist afterwards.
        :param definition_source: Where the transformation was defined (used in errors).
        :return: The directory at `path_to_directory`.
        :raises TransformationRuntimeError: If the deepest existing part of the path
          is not a directory.
        """
        current, missing_parts = fs_root.attempt_lookup(path_to_directory)
        # The part that already exists must be a directory before anything is created below it.
        current = self._fs_path_as_dir(cast("FSPath", current), definition_source)
        if missing_parts:
            return current.mkdirs("/".join(missing_parts))
        return current

141 

142 

class RemoveTransformationRule(TransformationRule):
    """Transformation that unlinks every path matched by a sequence of match rules."""

    __slots__ = (
        "_match_rules",
        "_keep_empty_parent_dirs",
        "_definition_source",
    )

    def __init__(
        self,
        match_rules: Sequence[MatchRule],
        keep_empty_parent_dirs: bool,
        definition_source: AttributePath,
    ) -> None:
        """Store the rules, the parent-pruning preference and the definition's path."""
        super().__init__()
        self._match_rules = match_rules
        self._keep_empty_parent_dirs = keep_empty_parent_dirs
        self._definition_source = definition_source.path

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Remove all matched paths (recursively) below `fs_root`.

        Unless empty parents are to be kept, the parent of each removed path
        is asked to prune itself if it became empty.
        """
        source = self._definition_source
        prune_parents = not self._keep_empty_parent_dirs
        # NOTE(review): this flag is shared by all rules and never reset, so once any
        # rule has matched, later rules without matches are not reported - confirm
        # whether that is intended.
        found_something = False
        for rule in self._match_rules:
            # Fully resolve the matches to avoid RuntimeError caused by collection changing size as a
            # consequence of the removal: https://salsa.debian.org/debian/debputy/-/issues/52
            for victim in list(rule.finditer(fs_root)):
                found_something = True
                containing_dir = victim.parent_dir
                if containing_dir is None:
                    self._error(
                        f"Cannot remove the root directory (triggered by {source})"
                    )
                victim.unlink(recursive=True)
                if prune_parents:
                    containing_dir.prune_if_empty_dir()
            # FIXME: `rm` should probably be forgiving or at least support a condition to avoid failures
            if not found_something:
                self._match_rule_had_no_matches(rule, source)

184 

185 

class MoveTransformationRule(TransformationRule):
    """Transformation that moves (or renames) the paths matched by a single match rule."""

    __slots__ = (
        "_match_rule",
        "_dest_path",
        "_dest_is_dir",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        match_rule: MatchRule,
        dest_path: str,
        dest_is_dir: bool,
        definition_source: AttributePath,
        condition: ManifestCondition | None,
    ) -> None:
        """Store the move definition.

        :param match_rule: Rule selecting the source path(s).
        :param dest_path: Destination path.
        :param dest_is_dir: Whether the destination must be treated as a directory.
        :param definition_source: Where the rule was defined; its `.path` is used in messages.
        :param condition: Optional condition; the rule is skipped when it evaluates to false.
        """
        super().__init__()
        self._match_rule = match_rule
        self._dest_path = dest_path
        self._dest_is_dir = dest_is_dir
        self._definition_source = definition_source.path
        self._condition = condition

    def transform_file_system(
        self, fs_root: FSPath, condition_context: ConditionContext
    ) -> None:
        """Move the matched paths to the destination.

        When the destination is not an existing directory (and was not declared as
        one), the single match is renamed to the destination. Otherwise every match
        is re-parented into the destination directory.

        :raises TransformationRuntimeError: If nothing matched, if several paths
          matched but the destination is not a directory, if a match is the
          destination itself, if two matches share a basename, or if a match would
          replace an existing directory inside the destination directory.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return
        # Eager resolve is necessary to avoid "self-recursive" matching in special cases (e.g., **/*.la)
        matches = list(self._match_rule.finditer(fs_root))
        if not matches:
            self._match_rule_had_no_matches(self._match_rule, self._definition_source)

        target_dir: VirtualPath | None
        if self._dest_is_dir:
            target_dir = self._ensure_is_directory(
                fs_root,
                self._dest_path,
                self._definition_source,
            )
        else:
            dir_part, basename = os.path.split(self._dest_path)
            target_parent_dir = self._ensure_is_directory(
                fs_root,
                dir_part,
                self._definition_source,
            )
            target_dir = target_parent_dir.get(basename)

        if target_dir is None or not target_dir.is_dir:
            # Rename case. Only reachable via the `else` branch above, since
            # `_ensure_is_directory` yields a directory; hence `target_parent_dir`
            # and `basename` are bound here.
            if len(matches) > 1:
                self._error(
                    f"Could not rename {self._match_rule.describe_match_short()} to {self._dest_path}"
                    f" (from: {self._definition_source}). Multiple paths matched the pattern and the"
                    " destination was not a directory. Either correct the pattern to match only one source"
                    " OR define the destination to be a directory (E.g., add a trailing slash - example:"
                    f' "{self._dest_path}/")'
                )
            p = matches[0]
            if p.path == self._dest_path:
                self._error(
                    f"Error in {self._definition_source}, the source"
                    f" {self._match_rule.describe_match_short()} matched {self._dest_path} making the"
                    " rename redundant!?"
                )
            p.parent_dir = target_parent_dir
            p.name = basename
            return

        assert target_dir is not None and target_dir.is_dir
        basenames: dict[str, VirtualPath] = dict()
        target_dir_path = target_dir.path

        for m in matches:
            if m.path == target_dir_path:
                self._error(
                    f"Error in {self._definition_source}, the source {self._match_rule.describe_match_short()}"
                    f" matched {self._dest_path} (among other), but it is not possible to copy a directory into"
                    " itself"
                )
            if m.name in basenames:
                alt_path = basenames[m.name]
                # We document "two *distinct*" paths. However, as the glob matches are written, it should not be
                # possible for a *single* glob to match the same path twice.
                assert alt_path is not m
                self._error(
                    f"Could not rename {self._match_rule.describe_match_short()} to {self._dest_path}"
                    f" (from: {self._definition_source}). Multiple paths matched the pattern had the"
                    f' same basename "{m.name}" ("{m.path}" vs. "{alt_path.path}"). Please correct the'
                    f" pattern, so it only matches one path with that basename to avoid this conflict."
                )
            # Look for a clash in the *destination* directory (not inside the source
            # itself). A match that already lives in the destination is not a clash.
            existing = target_dir.get(m.name)
            if existing and existing is not m and existing.is_dir:
                self._error(
                    f"Could not rename {self._match_rule.describe_match_short()} to {self._dest_path}"
                    f" (from: {self._definition_source}). The pattern matched {m.path} which would replace"
                    f" the existing directory {existing.path}. If this replacement is intentional, then please"
                    f' remove "{existing.path}" first (e.g., via `- remove: "{existing.path}"`)'
                )
            basenames[m.name] = m
            m.parent_dir = target_dir

288 

289 

290class CreateSymlinkPathTransformationRule(TransformationRule): 

291 __slots__ = ( 

292 "_link_dest", 

293 "_link_target", 

294 "_replacement_rule", 

295 "_definition_source", 

296 "_condition", 

297 ) 

298 

299 def __init__( 

300 self, 

301 link_target: str, 

302 link_dest: str, 

303 replacement_rule: CreateSymlinkReplacementRule, 

304 definition_source: AttributePath, 

305 condition: ManifestCondition | None, 

306 ) -> None: 

307 super().__init__() 

308 self._link_target = link_target 

309 self._link_dest = link_dest 

310 self._replacement_rule = replacement_rule 

311 self._definition_source = definition_source.path 

312 self._condition = condition 

313 

314 def transform_file_system( 

315 self, 

316 fs_root: FSPath, 

317 condition_context: ConditionContext, 

318 ) -> None: 

319 if not self._evaluate_condition(self._condition, condition_context): 319 ↛ 320line 319 didn't jump to line 320 because the condition on line 319 was never true

320 return 

321 dir_path_part, link_name = os.path.split(self._link_dest) 

322 dir_path = self._ensure_is_directory( 

323 fs_root, 

324 dir_path_part, 

325 self._definition_source, 

326 ) 

327 existing = dir_path.get(link_name) 

328 if existing: 

329 self._handle_existing_path(existing) 

330 dir_path.add_symlink(link_name, self._link_target) 

331 

332 def _handle_existing_path(self, existing: VirtualPath) -> None: 

333 replacement_rule = self._replacement_rule 

334 if replacement_rule == "abort-on-non-empty-directory": 

335 unlink = not existing.is_dir or not any(existing.iterdir) 

336 reason = "the path is a non-empty directory" 

337 elif replacement_rule == "discard-existing": 337 ↛ 338line 337 didn't jump to line 338 because the condition on line 337 was never true

338 unlink = True 

339 reason = "<<internal error: you should not see an error with this message>>" 

340 elif replacement_rule == "error-if-directory": 

341 unlink = not existing.is_dir 

342 reason = "the path is a directory" 

343 else: 

344 assert replacement_rule == "error-if-exists" 

345 unlink = False 

346 reason = "the path exists" 

347 

348 if unlink: 

349 existing.unlink(recursive=True) 

350 else: 

351 self._error( 

352 f"Refusing to replace {existing.path} with a symlink; {reason} and" 

353 f" the active replacement-rule was {self._replacement_rule}. You can" 

354 f' set the replacement-rule to "discard-existing", if you are not interested' 

355 f" in the contents of {existing.path}. This error was triggered by {self._definition_source}." 

356 ) 

357 

358 

class CreateDirectoryTransformationRule(TransformationRule):
    """Transformation that ensures directories exist and sets their mode and ownership."""

    __slots__ = (
        "_directories",
        "_owner",
        "_group",
        "_mode",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        directories: Sequence[str],
        owner: StaticFileSystemOwner | None,
        group: StaticFileSystemGroup | None,
        mode: FileSystemMode | None,
        definition_source: str,
        condition: ManifestCondition | None,
    ) -> None:
        """Store the directories to create and the metadata to apply to them."""
        super().__init__()
        self._directories = directories
        self._owner = owner
        self._group = group
        self._mode = mode
        self._definition_source = definition_source
        self._condition = condition

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Ensure each directory exists, then apply the mode (if any) and call `chown`.

        :raises TransformationRuntimeError: If the desired mode cannot be computed.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return
        wanted_owner, wanted_group, wanted_mode = self._owner, self._group, self._mode
        for directory in self._directories:
            created = self._ensure_is_directory(
                fs_root,
                directory,
                self._definition_source,
            )

            if wanted_mode is not None:
                try:
                    new_mode = wanted_mode.compute_mode(created.mode, created.is_dir)
                except ValueError as e:
                    self._error(
                        f"Could not compute desired mode for {created.path} as"
                        f" requested in {self._definition_source}: {e.args[0]}",
                        caused_by=e,
                    )
                else:
                    created.mode = new_mode
            created.chown(wanted_owner, wanted_group)

414 

415 

def _apply_owner_and_mode(
    path: VirtualPath,
    owner: StaticFileSystemOwner | None,
    group: StaticFileSystemGroup | None,
    mode: FileSystemMode | None,
    capabilities: str | None,
    capability_mode: FileSystemMode | None,
    definition_source: str,
) -> None:
    """Apply ownership, mode and (for files) capabilities to `path`.

    Each piece of metadata is only touched when it was requested: `chown` when an
    owner or group is given, the mode when `mode` is given, and the capability
    metadata when `path` is a file and `capabilities` is given.

    :raises TransformationRuntimeError: If the desired mode cannot be computed.
    """
    if not (owner is None and group is None):
        path.chown(owner, group)

    if mode is not None:
        try:
            new_mode = mode.compute_mode(path.mode, path.is_dir)
        except ValueError as e:
            raise TransformationRuntimeError(
                f"Could not compute desired mode for {path.path} as"
                f" requested in {definition_source}: {e.args[0]}"
            ) from e
        path.mode = new_mode

    if not path.is_file or capabilities is None:
        return

    cap_ref = path.metadata(DebputyCapability)
    previous = cap_ref.value
    if previous is not None:
        # An earlier definition already set capabilities on this path; say so before overriding.
        _warn(
            f"Replacing the capabilities set on path {path.path} from {previous.definition_source} due"
            f" to {definition_source}."
        )
    assert capability_mode is not None
    cap_ref.value = DebputyCapability(
        capabilities,
        capability_mode,
        definition_source,
    )

451 

452 

class PathMetadataTransformationRule(TransformationRule):
    """Transformation that applies owner, group, mode and capabilities to matched paths."""

    __slots__ = (
        "_match_rules",
        "_owner",
        "_group",
        "_mode",
        "_capabilities",
        "_capability_mode",
        "_recursive",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        match_rules: Sequence[MatchRule],
        owner: StaticFileSystemOwner | None,
        group: StaticFileSystemGroup | None,
        mode: FileSystemMode | None,
        recursive: bool,
        capabilities: str | None,
        capability_mode: FileSystemMode | None,
        definition_source: str,
        condition: ManifestCondition | None,
    ) -> None:
        """Store the definition.

        :raises ValueError: If only one of `capabilities` and `capability_mode` is given.
        """
        super().__init__()
        self._match_rules = match_rules
        self._owner = owner
        self._group = group
        self._mode = mode
        self._capabilities = capabilities
        self._capability_mode = capability_mode
        self._recursive = recursive
        self._definition_source = definition_source
        self._condition = condition
        # Capabilities and their mode only make sense as a pair.
        if capabilities is None and capability_mode is not None:
            raise ValueError("capability_mode without capabilities")
        if capabilities is not None and capability_mode is None:
            raise ValueError("capabilities without capability_mode")

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Apply the metadata to every non-symlink match.

        In recursive mode, matched directories are collected and the metadata is
        afterwards applied to every non-symlink path beneath them as well.

        :raises TransformationRuntimeError: If a match rule had no acceptable match.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return
        owner = self._owner
        group = self._group
        mode = self._mode
        capabilities = self._capabilities
        capability_mode = self._capability_mode
        definition_source = self._definition_source
        recurse_into: list[FSPath] | None = [] if self._recursive else None
        # Without owner, group or mode there are only capabilities left to apply, and
        # `_apply_owner_and_mode` sets those on files only, so a file must be matched.
        needs_file_match = owner is None and group is None and mode is None

        for match_rule in self._match_rules:
            match_ok = False
            saw_symlink = False
            saw_directory = False

            for path in match_rule.finditer(fs_root):
                if path.is_symlink:
                    saw_symlink = True
                    continue
                if path.is_file or not needs_file_match:
                    match_ok = True
                if path.is_dir:
                    saw_directory = True
                    if not match_ok and needs_file_match and self._recursive:
                        # A directory counts as a match when it contains at least one file.
                        match_ok = any(p.is_file for p in path.all_paths())
                _apply_owner_and_mode(
                    path,
                    owner,
                    group,
                    mode,
                    capabilities,
                    capability_mode,
                    definition_source,
                )
                if path.is_dir and recurse_into is not None:
                    recurse_into.append(path)

            if not match_ok:
                if needs_file_match and (saw_directory or saw_symlink):
                    _warn(
                        f"The match rule {match_rule.describe_match_short()} (from {self._definition_source})"
                        " did not match any files, but given the attributes it can only apply to files."
                    )
                elif saw_symlink:
                    _warn(
                        f"The match rule {match_rule.describe_match_short()} (from {self._definition_source})"
                        ' matched symlinks, but "path-metadata" cannot apply to symlinks.'
                    )
                self._match_rule_had_no_matches(match_rule, self._definition_source)

        if recurse_into:
            for recurse_dir in recurse_into:
                for path in recurse_dir.all_paths():
                    if path.is_symlink:
                        continue
                    _apply_owner_and_mode(
                        path,
                        owner,
                        group,
                        mode,
                        capabilities,
                        capability_mode,
                        definition_source,
                    )

566 

567 

class ModeNormalizationTransformationRule(TransformationRule):
    """Transformation applying a list of (match rule, mode) normalizations.

    Each path is normalized at most once; the first rule that matches it wins.
    """

    __slots__ = ("_normalizations",)

    def __init__(
        self,
        normalizations: Sequence[tuple[MatchRule, FileSystemMode]],
    ) -> None:
        """Store the normalizations to apply."""
        # A bit of a hack since it is initialized outside `debputy`. It probably should not
        # be a "TransformationRule" (hindsight and all)
        run_in_context_of_plugin_wrap_errors("debputy", super().__init__)
        self._normalizations = normalizations

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Set the computed mode on every non-symlink match not handled by an earlier rule.

        :raises AssertionError: If a mode cannot be computed.
        """
        handled = set()

        def _already_handled(candidate):
            # Given to `finditer` as `ignore_paths`, so handled paths are skipped.
            return candidate.path in handled

        for match_rule, fs_mode in self._normalizations:
            for path in match_rule.finditer(fs_root, ignore_paths=_already_handled):
                if path.is_symlink or path.path in handled:
                    continue
                handled.add(path.path)
                try:
                    new_mode = fs_mode.compute_mode(path.mode, path.is_dir)
                except ValueError as e:
                    raise AssertionError(
                        "Error while applying built-in mode normalization rule"
                    ) from e
                path.mode = new_mode

600 

601 

class NormalizeShebangLineTransformation(TransformationRule):
    """Transformation that rewrites shebang lines flagged as needing a fix-up."""

    def __init__(self) -> None:
        """Initialize the base class in the context of the `debputy` plugin."""
        # A bit of a hack since it is initialized outside `debputy`. It probably should not
        # be a "TransformationRule" (hindsight and all)
        run_in_context_of_plugin_wrap_errors("debputy", super().__init__)

    def transform_file_system(
        self,
        fs_root: VirtualPath,
        condition_context: ConditionContext,
    ) -> None:
        """Inspect every file below `fs_root` and replace shebang lines where needed."""
        for candidate in fs_root.all_paths():
            if not candidate.is_file:
                continue
            try:
                with candidate.open(byte_io=True, buffering=4096) as fd:
                    interpreter = extract_shebang_interpreter_from_file(fd)
            except (PureVirtualPathError, TestPathWithNonExistentFSPathError):
                # Do not make tests unnecessarily complex to write
                continue
            if interpreter is not None and interpreter.fixup_needed:
                interpreter.replace_shebang_line(candidate)