Coverage for src/debputy/transformation_rules.py: 74%

277 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2026-02-28 21:56 +0000

1import os 

2from typing import ( 

3 NoReturn, 

4 Literal, 

5 TypeVar, 

6 cast, 

7 final, 

8) 

9from collections.abc import Sequence 

10 

11from debputy.exceptions import ( 

12 DebputyRuntimeError, 

13 PureVirtualPathError, 

14 TestPathWithNonExistentFSPathError, 

15) 

16from debputy.filesystem_scan import InMemoryVirtualPathBase 

17from debputy.interpreter import ( 

18 extract_shebang_interpreter_from_file, 

19) 

20from debputy.manifest_conditions import ConditionContext, ManifestCondition 

21from debputy.manifest_parser.base_types import ( 

22 FileSystemMode, 

23 StaticFileSystemOwner, 

24 StaticFileSystemGroup, 

25) 

26from debputy.manifest_parser.tagging_types import DebputyDispatchableType 

27from debputy.manifest_parser.util import AttributePath 

28from debputy.path_matcher import MatchRule 

29from debputy.plugin.api import VirtualPath 

30from debputy.plugins.debputy.types import DebputyCapability 

31from debputy.plugin.plugin_state import ( 

32 run_in_context_of_plugin_wrap_errors, 

33) 

34from debputy.util import _warn 

35 

36 

class TransformationRuntimeError(DebputyRuntimeError):
    """Error raised in this module when a transformation cannot be applied."""

39 

40 

# Policies consulted by the "create symlink" transformation when the path of
# the new symlink already exists in the file system.
CreateSymlinkReplacementRule = Literal[
    # Never replace: any existing path is an error.
    "error-if-exists",
    # Replace existing non-directories; an existing directory is an error.
    "error-if-directory",
    # Replace non-directories and empty directories; a non-empty directory is an error.
    "abort-on-non-empty-directory",
    # Always replace whatever is there (directories are removed recursively).
    "discard-existing",
]

47 

48 

# Lets helpers declare that they return the same `VirtualPath` subtype they were given.
VP = TypeVar(name="VP", bound=VirtualPath)

50 

51 

class TransformationRule(DebputyDispatchableType):
    """Base class for rules that rewrite an in-memory file system tree.

    Subclasses override `transform_file_system`. Callers are expected to use
    `run_transform_file_system`, which runs the override through
    `run_in_context_of_plugin_wrap_errors` for `self._debputy_plugin`
    (presumably provided by `DebputyDispatchableType` - confirm there).
    """

    __slots__ = ()

    @final
    def run_transform_file_system(
        self,
        fs_root: InMemoryVirtualPathBase,
        condition_context: ConditionContext,
    ) -> None:
        """Apply this rule to `fs_root` in the context of the plugin owning the rule.

        :param fs_root: Root of the file system tree to transform.
        :param condition_context: Context passed on to `transform_file_system`.
        """
        run_in_context_of_plugin_wrap_errors(
            self._debputy_plugin,
            self.transform_file_system,
            fs_root,
            condition_context,
        )

    def transform_file_system(
        self,
        fs_root: InMemoryVirtualPathBase,
        condition_context: ConditionContext,
    ) -> None:
        """Perform the transformation; must be overridden by subclasses.

        :raises NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    def _evaluate_condition(
        self,
        condition: ManifestCondition | None,
        condition_context: ConditionContext,
        result_if_condition_is_missing: bool = True,
    ) -> bool:
        """Evaluate an optional condition.

        :param condition: The condition to evaluate, or `None` if there is none.
        :param condition_context: Context handed to `condition.evaluate`.
        :param result_if_condition_is_missing: Value returned when `condition` is `None`.
        :return: The result of the condition, or the fallback when it is missing.
        """
        if condition is None:
            return result_if_condition_is_missing
        return condition.evaluate(condition_context)

    def _error(
        self,
        msg: str,
        *,
        caused_by: BaseException | None = None,
    ) -> NoReturn:
        """Abort the transformation with `msg`.

        :param msg: The error message.
        :param caused_by: Optional exception recorded as the cause (`raise ... from`).
        :raises TransformationRuntimeError: Always.
        """
        raise TransformationRuntimeError(msg) from caused_by

    def _match_rule_had_no_matches(
        self, match_rule: MatchRule, definition_source: str
    ) -> NoReturn:
        """Report that `match_rule` (defined at `definition_source`) matched nothing.

        :raises TransformationRuntimeError: Always.
        """
        self._error(
            f'The match rule "{match_rule.describe_match_short()}" in transformation "{definition_source}" did'
            " not match any paths. Either the definition is redundant (and can be omitted) or the match rule is"
            " incorrect."
        )

    def _fs_path_as_dir(
        self,
        path: VP,
        definition_source: str,
    ) -> VP:
        """Return `path` unchanged if it is a directory; otherwise fail.

        :param path: The path that must be a directory.
        :param definition_source: Where the requirement came from (used in the error message).
        :raises TransformationRuntimeError: If `path` is not a directory.
        """
        if path.is_dir:
            return path
        path_type = "file" if path.is_file else 'symlink/"special file system object"'
        # The final fragment used to start with " to", which rendered as "... to to make ...".
        self._error(
            f"The path {path.path} was expected to be a directory (or non-existing) due to"
            f" {definition_source}. However that path existed and is a {path_type}."
            f" You may need a `remove: {path.path}` prior to {definition_source} to"
            " make this transformation succeed."
        )

    def _ensure_is_directory(
        self,
        fs_root: InMemoryVirtualPathBase,
        path_to_directory: str,
        definition_source: str,
    ) -> InMemoryVirtualPathBase:
        """Return the directory at `path_to_directory`, creating missing parts.

        The deepest existing path found by `fs_root.attempt_lookup` must be a
        directory; any remaining segments are created below it with `mkdirs`.

        :param fs_root: Root used for the lookup.
        :param path_to_directory: Path of the directory that must exist afterwards.
        :param definition_source: Where the request came from (used in error messages).
        :raises TransformationRuntimeError: If the deepest existing path is not a directory.
        """
        current, missing_parts = fs_root.attempt_lookup(path_to_directory)
        current = self._fs_path_as_dir(
            cast("InMemoryVirtualPathBase", current), definition_source
        )
        if missing_parts:
            return current.mkdirs("/".join(missing_parts))
        return current

131 

132 

class RemoveTransformationRule(TransformationRule):
    """Transformation that unlinks every path matched by a set of match rules."""

    __slots__ = (
        "_match_rules",
        "_keep_empty_parent_dirs",
        "_definition_source",
    )

    def __init__(
        self,
        match_rules: Sequence[MatchRule],
        keep_empty_parent_dirs: bool,
        definition_source: AttributePath,
    ) -> None:
        """
        :param match_rules: Rules selecting the paths to remove.
        :param keep_empty_parent_dirs: When false, the parent of a removed path is
          pruned via `prune_if_empty_dir()` after the removal.
        :param definition_source: Location of the definition; its `.path` is used in error messages.
        """
        super().__init__()
        self._definition_source = definition_source.path
        self._keep_empty_parent_dirs = keep_empty_parent_dirs
        self._match_rules = match_rules

    def transform_file_system(
        self,
        fs_root: InMemoryVirtualPathBase,
        condition_context: ConditionContext,
    ) -> None:
        """Remove all matched paths (recursively) from `fs_root`.

        :raises TransformationRuntimeError: If a match is the root directory, or if
          no path has been matched by the time a rule has been processed.
        """
        source = self._definition_source
        prune_parents = not self._keep_empty_parent_dirs
        # NOTE(review): this flag is never reset between rules, so once one rule has
        # matched, later rules without matches are not reported. Confirm whether that
        # is intended before tightening it.
        any_match_so_far = False
        for rule in self._match_rules:
            # Fully resolve the matches to avoid RuntimeError caused by collection changing size as a
            # consequence of the removal: https://salsa.debian.org/debian/debputy/-/issues/52
            for victim in list(rule.finditer(fs_root)):
                any_match_so_far = True
                container = victim.parent_dir
                if container is None:
                    self._error(
                        f"Cannot remove the root directory (triggered by {self._definition_source})"
                    )
                victim.unlink(recursive=True)
                if prune_parents:
                    container.prune_if_empty_dir()
            # FIXME: `rm` should probably be forgiving or at least support a condition to avoid failures
            if not any_match_so_far:
                self._match_rule_had_no_matches(rule, source)

174 

175 

class MoveTransformationRule(TransformationRule):
    """Transformation that renames one matched path or moves matches into a directory."""

    __slots__ = (
        "_match_rule",
        "_dest_path",
        "_dest_is_dir",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        match_rule: MatchRule,
        dest_path: str,
        dest_is_dir: bool,
        definition_source: AttributePath,
        condition: ManifestCondition | None,
    ) -> None:
        """
        :param match_rule: Rule selecting the path(s) to move.
        :param dest_path: Destination path.
        :param dest_is_dir: When true, `dest_path` is ensured to be a directory and the
          matches are moved into it.
        :param definition_source: Location of the definition; its `.path` is used in error messages.
        :param condition: Optional condition; the rule is a no-op when it evaluates to false.
        """
        super().__init__()
        self._match_rule = match_rule
        self._dest_path = dest_path
        self._dest_is_dir = dest_is_dir
        self._definition_source = definition_source.path
        self._condition = condition

    def transform_file_system(
        self, fs_root: InMemoryVirtualPathBase, condition_context: ConditionContext
    ) -> None:
        """Move the matched path(s) to the destination.

        If the destination is not an existing directory (and was not declared to be
        one), exactly one match is re-parented and renamed to the destination.
        Otherwise every match is re-parented into the destination directory under
        its own name.

        :raises TransformationRuntimeError: If nothing matched, if several paths matched
          but the destination is not a directory, if a match is the destination itself,
          if two matches share a basename, or if a match would replace an existing
          directory inside the destination directory.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return
        # Eager resolve is necessary to avoid "self-recursive" matching in special cases (e.g., **/*.la)
        matches = list(self._match_rule.finditer(fs_root))
        if not matches:
            self._match_rule_had_no_matches(self._match_rule, self._definition_source)

        target_dir: VirtualPath | None
        if self._dest_is_dir:
            target_dir = self._ensure_is_directory(
                fs_root,
                self._dest_path,
                self._definition_source,
            )
        else:
            dir_part, basename = os.path.split(self._dest_path)
            target_parent_dir = self._ensure_is_directory(
                fs_root,
                dir_part,
                self._definition_source,
            )
            target_dir = target_parent_dir.get(basename)

        if target_dir is None or not target_dir.is_dir:
            # Plain rename. Only reachable via the `else` branch above (the other branch
            # always yields a directory), so `target_parent_dir` and `basename` are bound.
            if len(matches) > 1:
                self._error(
                    f"Could not rename {self._match_rule.describe_match_short()} to {self._dest_path}"
                    f" (from: {self._definition_source}). Multiple paths matched the pattern and the"
                    " destination was not a directory. Either correct the pattern to match only one source"
                    " OR define the destination to be a directory (E.g., add a trailing slash - example:"
                    f' "{self._dest_path}/")'
                )
            p = matches[0]
            if p.path == self._dest_path:
                self._error(
                    f"Error in {self._definition_source}, the source"
                    f" {self._match_rule.describe_match_short()} matched {self._dest_path} making the"
                    " rename redundant!?"
                )
            p.parent_dir = target_parent_dir
            p.name = basename
            return

        assert target_dir is not None and target_dir.is_dir
        basenames: dict[str, VirtualPath] = {}
        target_dir_path = target_dir.path

        for m in matches:
            if m.path == target_dir_path:
                self._error(
                    f"Error in {self._definition_source}, the source {self._match_rule.describe_match_short()}"
                    f" matched {self._dest_path} (among other), but it is not possible to copy a directory into"
                    " itself"
                )
            if m.name in basenames:
                alt_path = basenames[m.name]
                # We document "two *distinct*" paths. However, as the glob matches are written, it should not be
                # possible for a *single* glob to match the same path twice.
                assert alt_path is not m
                self._error(
                    f"Could not rename {self._match_rule.describe_match_short()} to {self._dest_path}"
                    f" (from: {self._definition_source}). Multiple paths matched by the pattern had the"
                    f' same basename "{m.name}" ("{m.path}" vs. "{alt_path.path}"). Please correct the'
                    f" pattern, so it only matches one path with that basename to avoid this conflict."
                )
            # Look for the conflicting entry in the *destination* directory. The previous
            # lookup, `m.get(m.name)`, searched inside the moved path itself and therefore
            # never found the directory the error message talks about. A match that already
            # lives in the destination directory is not a conflict with itself.
            existing = target_dir.get(m.name)
            if existing and existing is not m and existing.is_dir:
                self._error(
                    f"Could not rename {self._match_rule.describe_match_short()} to {self._dest_path}"
                    f" (from: {self._definition_source}). The pattern matched {m.path} which would replace"
                    f" the existing directory {existing.path}. If this replacement is intentional, then please"
                    f' remove "{existing.path}" first (e.g., via `- remove: "{existing.path}"`)'
                )
            basenames[m.name] = m
            m.parent_dir = target_dir

278 

279 

280class CreateSymlinkPathTransformationRule(TransformationRule): 

281 __slots__ = ( 

282 "_link_dest", 

283 "_link_target", 

284 "_replacement_rule", 

285 "_definition_source", 

286 "_condition", 

287 ) 

288 

289 def __init__( 

290 self, 

291 link_target: str, 

292 link_dest: str, 

293 replacement_rule: CreateSymlinkReplacementRule, 

294 definition_source: AttributePath, 

295 condition: ManifestCondition | None, 

296 ) -> None: 

297 super().__init__() 

298 self._link_target = link_target 

299 self._link_dest = link_dest 

300 self._replacement_rule = replacement_rule 

301 self._definition_source = definition_source.path 

302 self._condition = condition 

303 

304 def transform_file_system( 

305 self, 

306 fs_root: InMemoryVirtualPathBase, 

307 condition_context: ConditionContext, 

308 ) -> None: 

309 if not self._evaluate_condition(self._condition, condition_context): 309 ↛ 310line 309 didn't jump to line 310 because the condition on line 309 was never true

310 return 

311 dir_path_part, link_name = os.path.split(self._link_dest) 

312 dir_path = self._ensure_is_directory( 

313 fs_root, 

314 dir_path_part, 

315 self._definition_source, 

316 ) 

317 existing = dir_path.get(link_name) 

318 if existing: 

319 self._handle_existing_path(existing) 

320 dir_path.add_symlink(link_name, self._link_target) 

321 

322 def _handle_existing_path(self, existing: VirtualPath) -> None: 

323 replacement_rule = self._replacement_rule 

324 if replacement_rule == "abort-on-non-empty-directory": 

325 unlink = not existing.is_dir or not any(existing.iterdir()) 

326 reason = "the path is a non-empty directory" 

327 elif replacement_rule == "discard-existing": 327 ↛ 328line 327 didn't jump to line 328 because the condition on line 327 was never true

328 unlink = True 

329 reason = "<<internal error: you should not see an error with this message>>" 

330 elif replacement_rule == "error-if-directory": 

331 unlink = not existing.is_dir 

332 reason = "the path is a directory" 

333 else: 

334 assert replacement_rule == "error-if-exists" 

335 unlink = False 

336 reason = "the path exists" 

337 

338 if unlink: 

339 existing.unlink(recursive=True) 

340 else: 

341 self._error( 

342 f"Refusing to replace {existing.path} with a symlink; {reason} and" 

343 f" the active replacement-rule was {self._replacement_rule}. You can" 

344 f' set the replacement-rule to "discard-existing", if you are not interested' 

345 f" in the contents of {existing.path}. This error was triggered by {self._definition_source}." 

346 ) 

347 

348 

class CreateDirectoryTransformationRule(TransformationRule):
    """Transformation that ensures directories exist and sets their mode and ownership."""

    __slots__ = (
        "_directories",
        "_owner",
        "_group",
        "_mode",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        directories: Sequence[str],
        owner: StaticFileSystemOwner | None,
        group: StaticFileSystemGroup | None,
        mode: FileSystemMode | None,
        definition_source: str,
        condition: ManifestCondition | None,
    ) -> None:
        """
        :param directories: Paths of the directories to ensure.
        :param owner: Owner passed to `chown` for each directory (may be `None`).
        :param group: Group passed to `chown` for each directory (may be `None`).
        :param mode: Mode specification applied to each directory, if any.
        :param definition_source: Where the rule was defined (used in error messages).
        :param condition: Optional condition; the rule is a no-op when it evaluates to false.
        """
        super().__init__()
        self._condition = condition
        self._definition_source = definition_source
        self._mode = mode
        self._group = group
        self._owner = owner
        self._directories = directories

    def transform_file_system(
        self,
        fs_root: InMemoryVirtualPathBase,
        condition_context: ConditionContext,
    ) -> None:
        """Ensure each directory exists, then apply the mode (if any) and call `chown`.

        :raises TransformationRuntimeError: If a path is not a directory or the
          requested mode cannot be computed.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return
        wanted_owner, wanted_group, mode_spec = self._owner, self._group, self._mode
        for wanted_dir in self._directories:
            dir_path = self._ensure_is_directory(
                fs_root, wanted_dir, self._definition_source
            )

            if mode_spec is not None:
                try:
                    new_mode = mode_spec.compute_mode(dir_path.mode, dir_path.is_dir)
                except ValueError as e:
                    self._error(
                        f"Could not compute desired mode for {dir_path.path} as"
                        f" requested in {self._definition_source}: {e.args[0]}",
                        caused_by=e,
                    )
                else:
                    dir_path.mode = new_mode
            # Called unconditionally, even when both owner and group are `None`.
            dir_path.chown(wanted_owner, wanted_group)

404 

405 

def _apply_owner_and_mode(
    path: VirtualPath,
    owner: StaticFileSystemOwner | None,
    group: StaticFileSystemGroup | None,
    mode: FileSystemMode | None,
    capabilities: str | None,
    capability_mode: FileSystemMode | None,
    definition_source: str,
) -> None:
    """Apply ownership, mode and (for files) capabilities to `path`.

    :param path: The path to update.
    :param owner: New owner; `chown` is only called if `owner` or `group` is set.
    :param group: New group; see `owner`.
    :param mode: Mode specification to apply, if any.
    :param capabilities: Capabilities to record on `path`; only applied to files.
    :param capability_mode: Mode stored with the capabilities; must be set
      whenever capabilities are applied.
    :param definition_source: Where the request came from (used in messages).
    :raises TransformationRuntimeError: If the requested mode cannot be computed.
    """
    if not (owner is None and group is None):
        path.chown(owner, group)

    if mode is not None:
        try:
            new_mode = mode.compute_mode(path.mode, path.is_dir)
        except ValueError as e:
            raise TransformationRuntimeError(
                f"Could not compute desired mode for {path.path} as"
                f" requested in {definition_source}: {e.args[0]}"
            ) from e
        else:
            path.mode = new_mode

    # Capabilities are only recorded on regular files.
    if not path.is_file or capabilities is None:
        return

    cap_slot = path.metadata(DebputyCapability)
    previous = cap_slot.value
    if previous is not None:
        _warn(
            f"Replacing the capabilities set on path {path.path} from {previous.definition_source} due"
            f" to {definition_source}."
        )
    assert capability_mode is not None
    cap_slot.value = DebputyCapability(
        capabilities,
        capability_mode,
        definition_source,
    )

441 

442 

class PathMetadataTransformationRule(TransformationRule):
    """Transformation that sets owner, group, mode and capabilities on matched paths."""

    __slots__ = (
        "_match_rules",
        "_owner",
        "_group",
        "_mode",
        "_capabilities",
        "_capability_mode",
        "_recursive",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        match_rules: Sequence[MatchRule],
        owner: StaticFileSystemOwner | None,
        group: StaticFileSystemGroup | None,
        mode: FileSystemMode | None,
        recursive: bool,
        capabilities: str | None,
        capability_mode: FileSystemMode | None,
        definition_source: str,
        condition: ManifestCondition | None,
    ) -> None:
        """
        :param match_rules: Rules selecting the paths to update.
        :param owner: New owner, if any.
        :param group: New group, if any.
        :param mode: Mode specification to apply, if any.
        :param recursive: When true, matched directories are also processed recursively.
        :param capabilities: Capabilities to record on matched files, if any.
        :param capability_mode: Mode stored with the capabilities; must be given
          exactly when `capabilities` is given.
        :param definition_source: Where the rule was defined (used in messages).
        :param condition: Optional condition; the rule is a no-op when it evaluates to false.
        :raises ValueError: If only one of `capabilities` / `capability_mode` is given.
        """
        super().__init__()
        self._match_rules = match_rules
        self._owner, self._group, self._mode = owner, group, mode
        self._capabilities = capabilities
        self._capability_mode = capability_mode
        self._recursive = recursive
        self._definition_source = definition_source
        self._condition = condition
        if capabilities is None and capability_mode is not None:
            raise ValueError("capability_mode without capabilities")
        if capabilities is not None and capability_mode is None:
            raise ValueError("capabilities without capability_mode")

    def transform_file_system(
        self,
        fs_root: InMemoryVirtualPathBase,
        condition_context: ConditionContext,
    ) -> None:
        """Apply the metadata to every non-symlink match (and below it, if recursive).

        :raises TransformationRuntimeError: If a match rule has no usable matches
          or a requested mode cannot be computed.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return

        def apply_to(target: VirtualPath) -> None:
            _apply_owner_and_mode(
                target,
                self._owner,
                self._group,
                self._mode,
                self._capabilities,
                self._capability_mode,
                self._definition_source,
            )

        recursive = self._recursive
        dirs_to_recurse: list[InMemoryVirtualPathBase] | None = (
            [] if recursive else None
        )
        # Without owner, group or mode only capabilities remain, and those only apply to files.
        needs_file_match = (
            self._owner is None and self._group is None and self._mode is None
        )

        for rule in self._match_rules:
            match_ok = saw_symlink = saw_directory = False

            for candidate in rule.finditer(fs_root):
                if candidate.is_symlink:
                    saw_symlink = True
                    continue
                if candidate.is_file or not needs_file_match:
                    match_ok = True
                if candidate.is_dir:
                    saw_directory = True
                    if not match_ok and needs_file_match and recursive:
                        # A directory counts as a match if it contains at least one file.
                        match_ok = any(p.is_file for p in candidate.all_paths())
                apply_to(candidate)
                if candidate.is_dir and dirs_to_recurse is not None:
                    dirs_to_recurse.append(candidate)

            if match_ok:
                continue
            # Give a more specific hint before the generic "no matches" error.
            if needs_file_match and (saw_directory or saw_symlink):
                _warn(
                    f"The match rule {rule.describe_match_short()} (from {self._definition_source})"
                    " did not match any files, but given the attributes it can only apply to files."
                )
            elif saw_symlink:
                _warn(
                    f"The match rule {rule.describe_match_short()} (from {self._definition_source})"
                    ' matched symlinks, but "path-metadata" cannot apply to symlinks.'
                )
            self._match_rule_had_no_matches(rule, self._definition_source)

        if dirs_to_recurse:
            for recurse_dir in dirs_to_recurse:
                for nested in recurse_dir.all_paths():
                    if not nested.is_symlink:
                        apply_to(nested)

556 

557 

class ModeNormalizationTransformationRule(TransformationRule):
    """Transformation applying built-in mode normalizations; the first matching rule wins per path."""

    __slots__ = ("_normalizations",)

    def __init__(
        self,
        normalizations: Sequence[tuple[MatchRule, FileSystemMode]],
    ) -> None:
        """
        :param normalizations: Ordered `(match rule, mode)` pairs; each path is
          only normalized by the first pair that matches it.
        """
        # A bit of a hack since it is initialized outside `debputy`. It probably should not
        # be a "TransformationRule" (hindsight and all)
        run_in_context_of_plugin_wrap_errors("debputy", super().__init__)
        self._normalizations = normalizations

    def transform_file_system(
        self,
        fs_root: InMemoryVirtualPathBase,
        condition_context: ConditionContext,
    ) -> None:
        """Normalize the mode of every matched non-symlink path at most once.

        :raises AssertionError: If a built-in mode cannot be computed for a path.
        """
        handled = set()

        def already_handled(candidate):
            return candidate.path in handled

        for rule, mode_spec in self._normalizations:
            for entry in rule.finditer(fs_root, ignore_paths=already_handled):
                if entry.is_symlink or already_handled(entry):
                    continue
                handled.add(entry.path)
                try:
                    new_mode = mode_spec.compute_mode(entry.mode, entry.is_dir)
                except ValueError as e:
                    raise AssertionError(
                        "Error while applying built-in mode normalization rule"
                    ) from e
                else:
                    entry.mode = new_mode

590 

591 

class NormalizeShebangLineTransformation(TransformationRule):
    """Transformation that rewrites shebang lines flagged as needing a fixup."""

    def __init__(self) -> None:
        # A bit of a hack since it is initialized outside `debputy`. It probably should not
        # be a "TransformationRule" (hindsight and all)
        run_in_context_of_plugin_wrap_errors("debputy", super().__init__)

    def transform_file_system(
        self,
        fs_root: VirtualPath,
        condition_context: ConditionContext,
    ) -> None:
        """Inspect every file below `fs_root` and replace shebang lines where needed."""
        for candidate in fs_root.all_paths():
            if not candidate.is_file:
                continue
            try:
                with candidate.open(byte_io=True, buffering=4096) as fd:
                    shebang = extract_shebang_interpreter_from_file(fd)
            except (PureVirtualPathError, TestPathWithNonExistentFSPathError):
                # Do not make tests unnecessarily complex to write
                continue
            # `None` means no interpreter was extracted from the file.
            if shebang is not None and shebang.fixup_needed:
                shebang.replace_shebang_line(candidate)