Coverage for src/debputy/transformation_rules.py: 74%
277 statements
« prev ^ index » next coverage.py v7.8.2, created at 2026-02-28 21:56 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2026-02-28 21:56 +0000
1import os
2from typing import (
3 NoReturn,
4 Literal,
5 TypeVar,
6 cast,
7 final,
8)
9from collections.abc import Sequence
11from debputy.exceptions import (
12 DebputyRuntimeError,
13 PureVirtualPathError,
14 TestPathWithNonExistentFSPathError,
15)
16from debputy.filesystem_scan import InMemoryVirtualPathBase
17from debputy.interpreter import (
18 extract_shebang_interpreter_from_file,
19)
20from debputy.manifest_conditions import ConditionContext, ManifestCondition
21from debputy.manifest_parser.base_types import (
22 FileSystemMode,
23 StaticFileSystemOwner,
24 StaticFileSystemGroup,
25)
26from debputy.manifest_parser.tagging_types import DebputyDispatchableType
27from debputy.manifest_parser.util import AttributePath
28from debputy.path_matcher import MatchRule
29from debputy.plugin.api import VirtualPath
30from debputy.plugins.debputy.types import DebputyCapability
31from debputy.plugin.plugin_state import (
32 run_in_context_of_plugin_wrap_errors,
33)
34from debputy.util import _warn
class TransformationRuntimeError(DebputyRuntimeError):
    """Raised when a transformation rule cannot be applied to the file system."""
# Names of the policies `CreateSymlinkPathTransformationRule` understands for
# dealing with a path that already exists where the symlink should be created.
CreateSymlinkReplacementRule = Literal[
    "error-if-exists", "error-if-directory", "abort-on-non-empty-directory", "discard-existing"
]


# Type variable for helpers that hand back the same `VirtualPath` subtype they received.
VP = TypeVar("VP", bound=VirtualPath)
class TransformationRule(DebputyDispatchableType):
    """Base class for rules that transform the in-memory file system.

    Subclasses implement `transform_file_system`. Callers use
    `run_transform_file_system`, which invokes the implementation through
    `run_in_context_of_plugin_wrap_errors` with `self._debputy_plugin`.
    """

    __slots__ = ()

    @final
    def run_transform_file_system(
        self,
        fs_root: InMemoryVirtualPathBase,
        condition_context: ConditionContext,
    ) -> None:
        """Run `transform_file_system` in the context of the plugin owning this rule.

        :param fs_root: Root of the file system to transform.
        :param condition_context: Context passed through to the transformation.
        """
        run_in_context_of_plugin_wrap_errors(
            self._debputy_plugin,
            self.transform_file_system,
            fs_root,
            condition_context,
        )

    def transform_file_system(
        self,
        fs_root: InMemoryVirtualPathBase,
        condition_context: ConditionContext,
    ) -> None:
        """Apply the transformation to `fs_root`; must be overridden by subclasses.

        :raises NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    def _evaluate_condition(
        self,
        condition: ManifestCondition | None,
        condition_context: ConditionContext,
        result_if_condition_is_missing: bool = True,
    ) -> bool:
        """Evaluate an optional condition.

        :param condition: The condition to evaluate, or None if there is none.
        :param condition_context: Context the condition is evaluated against.
        :param result_if_condition_is_missing: Value returned when `condition` is None.
        :return: The result of the condition, or the fallback when it is absent.
        """
        if condition is None:
            return result_if_condition_is_missing
        return condition.evaluate(condition_context)

    def _error(
        self,
        msg: str,
        *,
        caused_by: BaseException | None = None,
    ) -> NoReturn:
        """Abort the transformation with `msg`.

        :param msg: Message for the error.
        :param caused_by: Optional exception to chain as the cause.
        :raises TransformationRuntimeError: Always.
        """
        raise TransformationRuntimeError(msg) from caused_by

    def _match_rule_had_no_matches(
        self, match_rule: MatchRule, definition_source: str
    ) -> NoReturn:
        """Report (via `_error`) that `match_rule` did not match any path.

        :param match_rule: The rule that matched nothing.
        :param definition_source: Where the transformation was defined (used in the message).
        """
        self._error(
            f'The match rule "{match_rule.describe_match_short()}" in transformation "{definition_source}" did'
            " not match any paths. Either the definition is redundant (and can be omitted) or the match rule is"
            " incorrect."
        )

    def _fs_path_as_dir(
        self,
        path: VP,
        definition_source: str,
    ) -> VP:
        """Return `path` unchanged if it is a directory; otherwise abort via `_error`.

        :param path: The path expected to be a directory.
        :param definition_source: Where the transformation was defined (used in the message).
        :return: `path` itself.
        """
        if path.is_dir:
            return path
        path_type = "file" if path.is_file else 'symlink/"special file system object"'
        # The last fragment used to start with " to", which rendered as "... to to make ...".
        self._error(
            f"The path {path.path} was expected to be a directory (or non-existing) due to"
            f" {definition_source}. However that path existed and is a {path_type}."
            f" You may need a `remove: {path.path}` prior to {definition_source} to"
            " make this transformation succeed."
        )

    def _ensure_is_directory(
        self,
        fs_root: InMemoryVirtualPathBase,
        path_to_directory: str,
        definition_source: str,
    ) -> InMemoryVirtualPathBase:
        """Look up `path_to_directory` below `fs_root`, creating missing parts.

        The deepest existing path found by `attempt_lookup` must be a directory
        (checked by `_fs_path_as_dir`); any remaining parts are created with `mkdirs`.

        :param fs_root: Root to resolve the path against.
        :param path_to_directory: Path of the directory that must exist.
        :param definition_source: Where the transformation was defined (used in error messages).
        :return: The directory at `path_to_directory`.
        """
        current, missing_parts = fs_root.attempt_lookup(path_to_directory)
        current = self._fs_path_as_dir(
            cast("InMemoryVirtualPathBase", current), definition_source
        )
        if missing_parts:
            return current.mkdirs("/".join(missing_parts))
        return current
class RemoveTransformationRule(TransformationRule):
    """Transformation that removes the paths matched by a sequence of match rules."""

    __slots__ = (
        "_match_rules",
        "_keep_empty_parent_dirs",
        "_definition_source",
    )

    def __init__(
        self,
        match_rules: Sequence[MatchRule],
        keep_empty_parent_dirs: bool,
        definition_source: AttributePath,
    ) -> None:
        """Store the rules, the pruning policy and the textual definition source.

        :param match_rules: Rules selecting the paths to remove.
        :param keep_empty_parent_dirs: When false, the parent of each removed path
          is passed to `prune_if_empty_dir` after the removal.
        :param definition_source: Where the rule was defined; its `.path` is used in messages.
        """
        super().__init__()
        self._match_rules = match_rules
        self._keep_empty_parent_dirs = keep_empty_parent_dirs
        self._definition_source = definition_source.path

    def transform_file_system(
        self,
        fs_root: InMemoryVirtualPathBase,
        condition_context: ConditionContext,
    ) -> None:
        """Unlink (recursively) every path matched by the rules.

        Aborts via `_error` if a match has no parent directory (the root), or via
        `_match_rule_had_no_matches` when nothing has been matched so far.
        """
        # NOTE(review): the flag is shared by all rules and never reset, so once
        # one rule has matched, later rules without matches are not reported.
        found_something = False
        for rule in self._match_rules:
            # Fully resolve the matches to avoid RuntimeError caused by collection changing size as a
            # consequence of the removal: https://salsa.debian.org/debian/debputy/-/issues/52
            for victim in list(rule.finditer(fs_root)):
                found_something = True
                containing_dir = victim.parent_dir
                if containing_dir is None:
                    self._error(
                        f"Cannot remove the root directory (triggered by {self._definition_source})"
                    )
                victim.unlink(recursive=True)
                if self._keep_empty_parent_dirs:
                    continue
                containing_dir.prune_if_empty_dir()
            # FIXME: `rm` should probably be forgiving or at least support a condition to avoid failures
            if not found_something:
                self._match_rule_had_no_matches(rule, self._definition_source)
class MoveTransformationRule(TransformationRule):
    """Transformation that renames a single path or moves matches into a directory."""

    __slots__ = (
        "_match_rule",
        "_dest_path",
        "_dest_is_dir",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        match_rule: MatchRule,
        dest_path: str,
        dest_is_dir: bool,
        definition_source: AttributePath,
        condition: ManifestCondition | None,
    ) -> None:
        """Store the configuration of the move.

        :param match_rule: Rule selecting the path(s) to move.
        :param dest_path: Destination path.
        :param dest_is_dir: Whether the destination must be treated as a directory
          (it is then created if missing).
        :param definition_source: Where the rule was defined; its `.path` is used in messages.
        :param condition: Optional condition; the rule does nothing when it evaluates to false.
        """
        super().__init__()
        self._match_rule = match_rule
        self._dest_path = dest_path
        self._dest_is_dir = dest_is_dir
        self._definition_source = definition_source.path
        self._condition = condition

    def transform_file_system(
        self, fs_root: InMemoryVirtualPathBase, condition_context: ConditionContext
    ) -> None:
        """Move the matched path(s) to the destination.

        If the destination is (or must be) a directory, every match is moved into
        it. Otherwise exactly one match is allowed and it is renamed to the
        destination. Problems are reported via `_error`.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return
        # Eager resolve is necessary to avoid "self-recursive" matching in special cases (e.g., **/*.la)
        matches = list(self._match_rule.finditer(fs_root))
        if not matches:
            self._match_rule_had_no_matches(self._match_rule, self._definition_source)

        target_dir: VirtualPath | None
        if self._dest_is_dir:
            target_dir = self._ensure_is_directory(
                fs_root,
                self._dest_path,
                self._definition_source,
            )
        else:
            dir_part, basename = os.path.split(self._dest_path)
            target_parent_dir = self._ensure_is_directory(
                fs_root,
                dir_part,
                self._definition_source,
            )
            target_dir = target_parent_dir.get(basename)
            # The rename case is handled here, where the parent directory and
            # the basename are guaranteed to be bound.
            if target_dir is None or not target_dir.is_dir:
                self._rename_single_match(matches, target_parent_dir, basename)
                return

        assert target_dir is not None and target_dir.is_dir
        self._move_into_directory(matches, target_dir)

    def _rename_single_match(
        self,
        matches: Sequence[VirtualPath],
        target_parent_dir: InMemoryVirtualPathBase,
        basename: str,
    ) -> None:
        """Rename the only match to `basename` inside `target_parent_dir`."""
        if len(matches) > 1:
            self._error(
                f"Could not rename {self._match_rule.describe_match_short()} to {self._dest_path}"
                f" (from: {self._definition_source}). Multiple paths matched the pattern and the"
                " destination was not a directory. Either correct the pattern to match only one source"
                " OR define the destination to be a directory (E.g., add a trailing slash - example:"
                f' "{self._dest_path}/")'
            )
        p = matches[0]
        if p.path == self._dest_path:
            self._error(
                f"Error in {self._definition_source}, the source"
                f" {self._match_rule.describe_match_short()} matched {self._dest_path} making the"
                " rename redundant!?"
            )
        p.parent_dir = target_parent_dir
        p.name = basename

    def _move_into_directory(
        self,
        matches: Sequence[VirtualPath],
        target_dir: VirtualPath,
    ) -> None:
        """Move every match into `target_dir`, rejecting conflicting moves."""
        basenames: dict[str, VirtualPath] = {}
        target_dir_path = target_dir.path

        for m in matches:
            if m.path == target_dir_path:
                self._error(
                    f"Error in {self._definition_source}, the source {self._match_rule.describe_match_short()}"
                    f" matched {self._dest_path} (among others), but it is not possible to copy a directory into"
                    " itself"
                )
            if m.name in basenames:
                alt_path = basenames[m.name]
                # We document "two *distinct*" paths. However, as the glob matches are written, it should not be
                # possible for a *single* glob to match the same path twice.
                assert alt_path is not m
                self._error(
                    f"Could not rename {self._match_rule.describe_match_short()} to {self._dest_path}"
                    f" (from: {self._definition_source}). Multiple paths matched by the pattern had the"
                    f' same basename "{m.name}" ("{m.path}" vs. "{alt_path.path}"). Please correct the'
                    f" pattern, so it only matches one path with that basename to avoid this conflict."
                )
            # Look in the *destination* for a directory that would be replaced.
            # The previous lookup (`m.get(m.name)`) searched inside the moved
            # path itself, so this check did not inspect the destination.
            # A match that already lives in the destination is not a conflict.
            existing = target_dir.get(m.name)
            if existing and existing is not m and existing.is_dir:
                self._error(
                    f"Could not rename {self._match_rule.describe_match_short()} to {self._dest_path}"
                    f" (from: {self._definition_source}). The pattern matched {m.path} which would replace"
                    f" the existing directory {existing.path}. If this replacement is intentional, then please"
                    f' remove "{existing.path}" first (e.g., via `- remove: "{existing.path}"`)'
                )
            basenames[m.name] = m
            m.parent_dir = target_dir
class CreateSymlinkPathTransformationRule(TransformationRule):
    """Transformation that creates a symlink, optionally replacing an existing path."""

    __slots__ = (
        "_link_dest",
        "_link_target",
        "_replacement_rule",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        link_target: str,
        link_dest: str,
        replacement_rule: CreateSymlinkReplacementRule,
        definition_source: AttributePath,
        condition: ManifestCondition | None,
    ) -> None:
        """Store the configuration of the symlink.

        :param link_target: What the symlink points to.
        :param link_dest: Path at which the symlink is created.
        :param replacement_rule: Policy applied when `link_dest` already exists.
        :param definition_source: Where the rule was defined; its `.path` is used in messages.
        :param condition: Optional condition; the rule does nothing when it evaluates to false.
        """
        super().__init__()
        self._link_target = link_target
        self._link_dest = link_dest
        self._replacement_rule = replacement_rule
        self._definition_source = definition_source.path
        self._condition = condition

    def transform_file_system(
        self,
        fs_root: InMemoryVirtualPathBase,
        condition_context: ConditionContext,
    ) -> None:
        """Create the symlink, first dealing with any path already at its location."""
        if not self._evaluate_condition(self._condition, condition_context):
            return
        parent_part, link_name = os.path.split(self._link_dest)
        parent_dir = self._ensure_is_directory(
            fs_root,
            parent_part,
            self._definition_source,
        )
        occupant = parent_dir.get(link_name)
        if occupant:
            self._handle_existing_path(occupant)
        parent_dir.add_symlink(link_name, self._link_target)

    def _handle_existing_path(self, existing: VirtualPath) -> None:
        """Remove `existing` if the replacement rule permits it, else abort via `_error`."""
        rule = self._replacement_rule
        if rule == "abort-on-non-empty-directory":
            # Anything but a directory that has at least one entry may be replaced.
            may_replace = not (existing.is_dir and any(existing.iterdir()))
            reason = "the path is a non-empty directory"
        elif rule == "discard-existing":
            may_replace = True
            reason = "<<internal error: you should not see an error with this message>>"
        elif rule == "error-if-directory":
            may_replace = not existing.is_dir
            reason = "the path is a directory"
        else:
            assert rule == "error-if-exists"
            may_replace = False
            reason = "the path exists"

        if not may_replace:
            self._error(
                f"Refusing to replace {existing.path} with a symlink; {reason} and"
                f" the active replacement-rule was {self._replacement_rule}. You can"
                f' set the replacement-rule to "discard-existing", if you are not interested'
                f" in the contents of {existing.path}. This error was triggered by {self._definition_source}."
            )
        existing.unlink(recursive=True)
class CreateDirectoryTransformationRule(TransformationRule):
    """Transformation that ensures directories exist and applies mode and ownership."""

    __slots__ = (
        "_directories",
        "_owner",
        "_group",
        "_mode",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        directories: Sequence[str],
        owner: StaticFileSystemOwner | None,
        group: StaticFileSystemGroup | None,
        mode: FileSystemMode | None,
        definition_source: str,
        condition: ManifestCondition | None,
    ) -> None:
        """Store the configuration of the rule.

        :param directories: Paths of the directories that must exist.
        :param owner: Owner passed to `chown` for each directory (may be None).
        :param group: Group passed to `chown` for each directory (may be None).
        :param mode: Mode specification to apply, or None to leave the mode alone.
        :param definition_source: Where the rule was defined (used in messages).
        :param condition: Optional condition; the rule does nothing when it evaluates to false.
        """
        super().__init__()
        self._directories = directories
        self._owner = owner
        self._group = group
        self._mode = mode
        self._definition_source = definition_source
        self._condition = condition

    def transform_file_system(
        self,
        fs_root: InMemoryVirtualPathBase,
        condition_context: ConditionContext,
    ) -> None:
        """Ensure each directory exists, then set its mode (if requested) and ownership."""
        if not self._evaluate_condition(self._condition, condition_context):
            return
        owner, group, mode = self._owner, self._group, self._mode
        for directory in self._directories:
            created = self._ensure_is_directory(
                fs_root,
                directory,
                self._definition_source,
            )

            if mode is not None:
                try:
                    new_mode = mode.compute_mode(created.mode, created.is_dir)
                except ValueError as e:
                    self._error(
                        f"Could not compute desired mode for {created.path} as"
                        f" requested in {self._definition_source}: {e.args[0]}",
                        caused_by=e,
                    )
                else:
                    created.mode = new_mode
            created.chown(owner, group)
def _apply_owner_and_mode(
    path: VirtualPath,
    owner: StaticFileSystemOwner | None,
    group: StaticFileSystemGroup | None,
    mode: FileSystemMode | None,
    capabilities: str | None,
    capability_mode: FileSystemMode | None,
    definition_source: str,
) -> None:
    """Apply ownership, mode and (for files) capabilities to `path`.

    :param path: The path to update.
    :param owner: New owner; `chown` is only called when owner or group is given.
    :param group: New group; `chown` is only called when owner or group is given.
    :param mode: Mode specification to apply, or None to leave the mode alone.
    :param capabilities: Capabilities to record on the path; only used for files.
    :param capability_mode: Mode stored with the capabilities; must not be None
      when capabilities are applied.
    :param definition_source: Where the request was defined (used in messages).
    :raises TransformationRuntimeError: If the desired mode cannot be computed.
    """
    if not (owner is None and group is None):
        path.chown(owner, group)

    if mode is not None:
        try:
            new_mode = mode.compute_mode(path.mode, path.is_dir)
        except ValueError as e:
            raise TransformationRuntimeError(
                f"Could not compute desired mode for {path.path} as"
                f" requested in {definition_source}: {e.args[0]}"
            ) from e
        path.mode = new_mode

    # Capabilities are only recorded on files.
    if not path.is_file or capabilities is None:
        return

    cap_ref = path.metadata(DebputyCapability)
    previous = cap_ref.value
    if previous is not None:
        _warn(
            f"Replacing the capabilities set on path {path.path} from {previous.definition_source} due"
            f" to {definition_source}."
        )
    assert capability_mode is not None
    cap_ref.value = DebputyCapability(
        capabilities,
        capability_mode,
        definition_source,
    )
class PathMetadataTransformationRule(TransformationRule):
    """Transformation that applies ownership, mode and capabilities to matched paths."""

    __slots__ = (
        "_match_rules",
        "_owner",
        "_group",
        "_mode",
        "_capabilities",
        "_capability_mode",
        "_recursive",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        match_rules: Sequence[MatchRule],
        owner: StaticFileSystemOwner | None,
        group: StaticFileSystemGroup | None,
        mode: FileSystemMode | None,
        recursive: bool,
        capabilities: str | None,
        capability_mode: FileSystemMode | None,
        definition_source: str,
        condition: ManifestCondition | None,
    ) -> None:
        """Store the configuration of the rule.

        :param match_rules: Rules selecting the paths to update.
        :param owner: Owner to apply, if any.
        :param group: Group to apply, if any.
        :param mode: Mode specification to apply, if any.
        :param recursive: Whether matched directories are also processed recursively.
        :param capabilities: Capabilities to record on files, if any.
        :param capability_mode: Mode stored with the capabilities.
        :param definition_source: Where the rule was defined (used in messages).
        :param condition: Optional condition; the rule does nothing when it evaluates to false.
        :raises ValueError: If only one of `capabilities` and `capability_mode` is given.
        """
        super().__init__()
        self._match_rules = match_rules
        self._owner = owner
        self._group = group
        self._mode = mode
        self._capabilities = capabilities
        self._capability_mode = capability_mode
        self._recursive = recursive
        self._definition_source = definition_source
        self._condition = condition
        # The two capability settings only make sense as a pair.
        if capabilities is None and capability_mode is not None:
            raise ValueError("capability_mode without capabilities")
        if capabilities is not None and capability_mode is None:
            raise ValueError("capabilities without capability_mode")

    def transform_file_system(
        self,
        fs_root: InMemoryVirtualPathBase,
        condition_context: ConditionContext,
    ) -> None:
        """Apply the metadata to every non-symlink match (and below matched directories if recursive).

        A rule without an acceptable match is reported via
        `_match_rule_had_no_matches`, preceded by a warning when the reason is
        that it only matched directories and/or symlinks.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return
        owner = self._owner
        group = self._group
        mode = self._mode
        capabilities = self._capabilities
        capability_mode = self._capability_mode
        definition_source = self._definition_source
        recursive = self._recursive

        def apply_metadata(target: VirtualPath) -> None:
            _apply_owner_and_mode(
                target,
                owner,
                group,
                mode,
                capabilities,
                capability_mode,
                definition_source,
            )

        # Matched directories are collected for a second, recursive pass.
        recurse_dirs: list[InMemoryVirtualPathBase] | None = [] if recursive else None
        # Without owner, group and mode only the capabilities remain, and those need a file.
        needs_file_match = owner is None and group is None and mode is None

        for match_rule in self._match_rules:
            match_ok = False
            saw_symlink = False
            saw_directory = False

            for path in match_rule.finditer(fs_root):
                if path.is_symlink:
                    saw_symlink = True
                    continue
                if path.is_file or not needs_file_match:
                    match_ok = True
                if path.is_dir:
                    saw_directory = True
                    if not match_ok and needs_file_match and recursive:
                        # A directory counts as a match if a file exists somewhere below it.
                        match_ok = any(p.is_file for p in path.all_paths())
                apply_metadata(path)
                if path.is_dir and recurse_dirs is not None:
                    recurse_dirs.append(path)

            if match_ok:
                continue
            if needs_file_match and (saw_directory or saw_symlink):
                _warn(
                    f"The match rule {match_rule.describe_match_short()} (from {self._definition_source})"
                    " did not match any files, but given the attributes it can only apply to files."
                )
            elif saw_symlink:
                _warn(
                    f"The match rule {match_rule.describe_match_short()} (from {self._definition_source})"
                    ' matched symlinks, but "path-metadata" cannot apply to symlinks.'
                )
            self._match_rule_had_no_matches(match_rule, self._definition_source)

        for recurse_dir in recurse_dirs or ():
            for path in recurse_dir.all_paths():
                if path.is_symlink:
                    continue
                apply_metadata(path)
class ModeNormalizationTransformationRule(TransformationRule):
    """Transformation that applies built-in mode normalizations to matched paths."""

    __slots__ = ("_normalizations",)

    def __init__(
        self,
        normalizations: Sequence[tuple[MatchRule, FileSystemMode]],
    ) -> None:
        """Store the `(match rule, mode)` pairs to apply.

        :param normalizations: Pairs of a match rule and the mode specification
          used for the paths it matches.
        """
        # A bit of a hack since it is initialized outside `debputy`. It probably should not
        # be a "TransformationRule" (hindsight and all)
        run_in_context_of_plugin_wrap_errors("debputy", super().__init__)
        self._normalizations = normalizations

    def transform_file_system(
        self,
        fs_root: InMemoryVirtualPathBase,
        condition_context: ConditionContext,
    ) -> None:
        """Normalize the mode of each matched non-symlink path, at most once per path.

        :raises AssertionError: If a mode cannot be computed for a path.
        """
        handled = set()

        def already_handled(candidate: VirtualPath) -> bool:
            return candidate.path in handled

        for match_rule, fs_mode in self._normalizations:
            for path in match_rule.finditer(fs_root, ignore_paths=already_handled):
                if path.is_symlink:
                    continue
                key = path.path
                if key in handled:
                    continue
                # Only the first normalization that matches a path is applied to it.
                handled.add(key)
                try:
                    new_mode = fs_mode.compute_mode(path.mode, path.is_dir)
                except ValueError as e:
                    raise AssertionError(
                        "Error while applying built-in mode normalization rule"
                    ) from e
                path.mode = new_mode
class NormalizeShebangLineTransformation(TransformationRule):
    """Transformation that rewrites shebang lines flagged as needing a fix-up."""

    # No per-instance state; declared for consistency with the base class and
    # the other transformation rules, which all define `__slots__`.
    __slots__ = ()

    def __init__(self) -> None:
        # A bit of a hack since it is initialized outside `debputy`. It probably should not
        # be a "TransformationRule" (hindsight and all)
        run_in_context_of_plugin_wrap_errors("debputy", super().__init__)

    def transform_file_system(
        self,
        fs_root: VirtualPath,
        condition_context: ConditionContext,
    ) -> None:
        """Check every file below `fs_root` and replace shebang lines that need a fix-up.

        Files that cannot be opened because they have no backing file system
        path (`PureVirtualPathError`, `TestPathWithNonExistentFSPathError`) are skipped.
        """
        for path in fs_root.all_paths():
            if not path.is_file:
                continue
            try:
                with path.open(byte_io=True, buffering=4096) as fd:
                    interpreter = extract_shebang_interpreter_from_file(fd)
            except (PureVirtualPathError, TestPathWithNonExistentFSPathError):
                # Do not make tests unnecessarily complex to write
                continue
            if interpreter is None:
                continue

            if interpreter.fixup_needed:
                interpreter.replace_shebang_line(path)