Coverage for src/debputy/transformation_rules.py: 74%
283 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
1import dataclasses
2import os
3from typing import (
4 NoReturn,
5 Optional,
6 Tuple,
7 List,
8 Literal,
9 Dict,
10 TypeVar,
11 cast,
12 final,
13)
14from collections.abc import Callable, Sequence
16from debputy.exceptions import (
17 DebputyRuntimeError,
18 PureVirtualPathError,
19 TestPathWithNonExistentFSPathError,
20)
21from debputy.filesystem_scan import FSPath
22from debputy.interpreter import (
23 extract_shebang_interpreter_from_file,
24)
25from debputy.manifest_conditions import ConditionContext, ManifestCondition
26from debputy.manifest_parser.base_types import (
27 FileSystemMode,
28 StaticFileSystemOwner,
29 StaticFileSystemGroup,
30)
31from debputy.manifest_parser.tagging_types import DebputyDispatchableType
32from debputy.manifest_parser.util import AttributePath
33from debputy.path_matcher import MatchRule
34from debputy.plugin.api import VirtualPath
35from debputy.plugins.debputy.types import DebputyCapability
36from debputy.plugin.plugin_state import (
37 run_in_context_of_plugin_wrap_errors,
38)
39from debputy.util import _warn
class TransformationRuntimeError(DebputyRuntimeError):
    """Error raised by this module when a transformation cannot be applied."""
# The replacement strategies understood by `CreateSymlinkPathTransformationRule`
# when something already exists at the location of the symlink to be created.
CreateSymlinkReplacementRule = Literal[
    "error-if-exists", "error-if-directory", "abort-on-non-empty-directory", "discard-existing"
]


# Lets helpers declare that they return the same kind of path they were given.
VP = TypeVar("VP", bound=VirtualPath)
@dataclasses.dataclass(frozen=True, slots=True)
class PreProvidedExclusion:
    """Immutable, slotted record bundling a tag, a description and a pruner callback."""

    tag: str
    description: str
    # Per its annotation: called with an `FSPath` and returns nothing.
    pruner: Callable[[FSPath], None]
class TransformationRule(DebputyDispatchableType):
    """Base class for rules that transform a file system tree.

    Subclasses override `transform_file_system`. Callers are expected to use
    `run_transform_file_system`, which is `@final` and routes the call through
    `run_in_context_of_plugin_wrap_errors` for the plugin owning the rule.
    """

    __slots__ = ()

    @final
    def run_transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Run `transform_file_system` in the context of the owning plugin.

        :param fs_root: Root of the file system tree to transform.
        :param condition_context: Context handed on to the transformation.
        """
        run_in_context_of_plugin_wrap_errors(
            self._debputy_plugin,
            self.transform_file_system,
            fs_root,
            condition_context,
        )

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Apply the transformation to `fs_root`; must be overridden.

        :raises NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    def _evaluate_condition(
        self,
        condition: ManifestCondition | None,
        condition_context: ConditionContext,
        result_if_condition_is_missing: bool = True,
    ) -> bool:
        """Evaluate an optional condition.

        :param condition: The condition to evaluate, or `None` if there is none.
        :param condition_context: Context passed to `condition.evaluate`.
        :param result_if_condition_is_missing: Value returned when `condition`
          is `None`.
        :return: The evaluation result, or the fallback when no condition exists.
        """
        if condition is None:
            return result_if_condition_is_missing
        return condition.evaluate(condition_context)

    def _error(
        self,
        msg: str,
        *,
        caused_by: BaseException | None = None,
    ) -> NoReturn:
        """Abort the transformation with `msg`.

        :param msg: The error message.
        :param caused_by: Optional exception to chain as the cause.
        :raises TransformationRuntimeError: Always.
        """
        raise TransformationRuntimeError(msg) from caused_by

    def _match_rule_had_no_matches(
        self, match_rule: MatchRule, definition_source: str
    ) -> NoReturn:
        """Abort because `match_rule` did not match any path.

        :param match_rule: The rule without matches (used in the message).
        :param definition_source: Where the transformation was defined.
        :raises TransformationRuntimeError: Always (via `_error`).
        """
        self._error(
            f'The match rule "{match_rule.describe_match_short()}" in transformation "{definition_source}" did'
            " not match any paths. Either the definition is redundant (and can be omitted) or the match rule is"
            " incorrect."
        )

    def _fs_path_as_dir(
        self,
        path: VP,
        definition_source: str,
    ) -> VP:
        """Return `path` unchanged if it is a directory; otherwise abort.

        :param path: The path that must be a directory.
        :param definition_source: Where the transformation was defined.
        :return: `path` itself.
        :raises TransformationRuntimeError: If `path` is not a directory.
        """
        if path.is_dir:
            return path
        path_type = "file" if path.is_file else 'symlink/"special file system object"'
        # The original message ended one fragment with " to" and started the
        # next with " to", producing "to to make"; the duplicate is removed.
        self._error(
            f"The path {path.path} was expected to be a directory (or non-existing) due to"
            f" {definition_source}. However that path existed and is a {path_type}."
            f" You may need a `remove: {path.path}` prior to {definition_source}"
            " to make this transformation succeed."
        )

    def _ensure_is_directory(
        self,
        fs_root: FSPath,
        path_to_directory: str,
        definition_source: str,
    ) -> FSPath:
        """Return the directory at `path_to_directory`, creating missing parts.

        :param fs_root: Root from which `path_to_directory` is looked up.
        :param path_to_directory: Path of the wanted directory.
        :param definition_source: Where the transformation was defined.
        :return: The (possibly newly created) directory.
        :raises TransformationRuntimeError: If the deepest existing path found
          by the lookup is not a directory.
        """
        current, missing_parts = fs_root.attempt_lookup(path_to_directory)
        current = self._fs_path_as_dir(cast("FSPath", current), definition_source)
        if missing_parts:
            # Only the part of the path that does not exist yet is created.
            return current.mkdirs("/".join(missing_parts))
        return current
class RemoveTransformationRule(TransformationRule):
    """Transformation that unlinks (recursively) every path its match rules find."""

    __slots__ = (
        "_match_rules",
        "_keep_empty_parent_dirs",
        "_definition_source",
    )

    def __init__(
        self,
        match_rules: Sequence[MatchRule],
        keep_empty_parent_dirs: bool,
        definition_source: AttributePath,
    ) -> None:
        """Store the rules, the parent-pruning preference and the source path."""
        super().__init__()
        self._match_rules = match_rules
        self._keep_empty_parent_dirs = keep_empty_parent_dirs
        self._definition_source = definition_source.path

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Remove all matched paths below `fs_root`.

        Unless empty parent directories are to be kept, the parent of each
        removed path is pruned if it became empty. Matching the root directory
        or (see note below) not matching anything is reported as an error.
        """
        prune_parents = not self._keep_empty_parent_dirs
        # NOTE(review): this flag is shared by all rules and never reset, so a
        # later rule without matches goes unreported once any earlier rule has
        # matched. Confirm whether that is intended.
        removed_something = False
        for rule in self._match_rules:
            # Snapshot the matches up front; removing paths while iterating lazily triggers a
            # RuntimeError (collection changed size): https://salsa.debian.org/debian/debputy/-/issues/52
            for victim in list(rule.finditer(fs_root)):
                removed_something = True
                containing_dir = victim.parent_dir
                if containing_dir is None:
                    self._error(
                        f"Cannot remove the root directory (triggered by {self._definition_source})"
                    )
                victim.unlink(recursive=True)
                if prune_parents:
                    containing_dir.prune_if_empty_dir()
            # FIXME: `rm` should probably be forgiving or at least support a condition to avoid failures
            if not removed_something:
                self._match_rule_had_no_matches(rule, self._definition_source)
class MoveTransformationRule(TransformationRule):
    """Transformation that moves/renames the paths matched by a single match rule.

    When the destination is (or resolves to) a directory, all matches are
    moved into it. Otherwise exactly one match is expected and it is renamed
    to the destination path.
    """

    __slots__ = (
        "_match_rule",
        "_dest_path",
        "_dest_is_dir",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        match_rule: MatchRule,
        dest_path: str,
        dest_is_dir: bool,
        definition_source: AttributePath,
        condition: ManifestCondition | None,
    ) -> None:
        """Store the configuration of the move.

        :param match_rule: Rule selecting the source path(s).
        :param dest_path: Destination path.
        :param dest_is_dir: Whether the destination must be treated as a directory.
        :param definition_source: Where the rule was defined (its `.path` is kept).
        :param condition: Optional condition guarding the transformation.
        """
        super().__init__()
        self._match_rule = match_rule
        self._dest_path = dest_path
        self._dest_is_dir = dest_is_dir
        self._definition_source = definition_source.path
        self._condition = condition

    def transform_file_system(
        self, fs_root: FSPath, condition_context: ConditionContext
    ) -> None:
        """Perform the move below `fs_root`.

        :raises TransformationRuntimeError: If nothing matched, if several paths
          matched a non-directory destination, if the rename is redundant, if a
          directory would be moved into itself, if two matches share a basename,
          or if a match would replace an existing directory in the destination.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return
        # Eager resolve is necessary to avoid "self-recursive" matching in special cases (e.g., **/*.la)
        matches = list(self._match_rule.finditer(fs_root))
        if not matches:
            self._match_rule_had_no_matches(self._match_rule, self._definition_source)

        target_dir: VirtualPath | None
        if self._dest_is_dir:
            target_dir = self._ensure_is_directory(
                fs_root,
                self._dest_path,
                self._definition_source,
            )
        else:
            dir_part, basename = os.path.split(self._dest_path)
            target_parent_dir = self._ensure_is_directory(
                fs_root,
                dir_part,
                self._definition_source,
            )
            target_dir = target_parent_dir.get(basename)

        if target_dir is None or not target_dir.is_dir:
            # Plain rename. This branch is only reachable via the `else` above
            # (`_ensure_is_directory` always yields a directory), so
            # `target_parent_dir` and `basename` are bound here.
            if len(matches) > 1:
                self._error(
                    f"Could not rename {self._match_rule.describe_match_short()} to {self._dest_path}"
                    f" (from: {self._definition_source}). Multiple paths matched the pattern and the"
                    " destination was not a directory. Either correct the pattern to only match only source"
                    " OR define the destination to be a directory (E.g., add a trailing slash - example:"
                    f' "{self._dest_path}/")'
                )
            p = matches[0]
            if p.path == self._dest_path:
                self._error(
                    f"Error in {self._definition_source}, the source"
                    f" {self._match_rule.describe_match_short()} matched {self._dest_path} making the"
                    " rename redundant!?"
                )
            p.parent_dir = target_parent_dir
            p.name = basename
            return

        assert target_dir is not None and target_dir.is_dir
        basenames: dict[str, VirtualPath] = dict()
        target_dir_path = target_dir.path

        for m in matches:
            if m.path == target_dir_path:
                # Fixed: a space was missing between the match description and "matched".
                self._error(
                    f"Error in {self._definition_source}, the source {self._match_rule.describe_match_short()}"
                    f" matched {self._dest_path} (among other), but it is not possible to copy a directory into"
                    " itself"
                )
            if m.name in basenames:
                alt_path = basenames[m.name]
                # We document "two *distinct*" paths. However, as the glob matches are written, it should not be
                # possible for a *single* glob to match the same path twice.
                assert alt_path is not m
                self._error(
                    f"Could not rename {self._match_rule.describe_match_short()} to {self._dest_path}"
                    f" (from: {self._definition_source}). Multiple paths matched the pattern had the"
                    f' same basename "{m.name}" ("{m.path}" vs. "{alt_path.path}"). Please correct the'
                    f" pattern, so it only matches one path with that basename to avoid this conflict."
                )
            # Fixed: look for a clashing entry in the *destination* directory. The
            # previous lookup (`m.get(m.name)`) searched inside the source path
            # itself, so the guard described by the message below never applied.
            # A match that already lives in the destination is not a clash.
            existing = target_dir.get(m.name)
            if existing and existing is not m and existing.is_dir:
                self._error(
                    f"Could not rename {self._match_rule.describe_match_short()} to {self._dest_path}"
                    f" (from: {self._definition_source}). The pattern matched {m.path} which would replace"
                    f" the existing directory {existing.path}. If this replacement is intentional, then please"
                    f' remove "{existing.path}" first (e.g., via `- remove: "{existing.path}"`)'
                )
            basenames[m.name] = m
            m.parent_dir = target_dir
class CreateSymlinkPathTransformationRule(TransformationRule):
    """Transformation that creates a symlink, optionally replacing what is in its way."""

    __slots__ = (
        "_link_dest",
        "_link_target",
        "_replacement_rule",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        link_target: str,
        link_dest: str,
        replacement_rule: CreateSymlinkReplacementRule,
        definition_source: AttributePath,
        condition: ManifestCondition | None,
    ) -> None:
        """Remember the link to create, the replacement policy and the guard condition."""
        super().__init__()
        self._link_target = link_target
        self._link_dest = link_dest
        self._replacement_rule = replacement_rule
        self._definition_source = definition_source.path
        self._condition = condition

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Create the symlink below `fs_root` if the condition holds.

        The parent directory is created as needed and an already existing path
        with the link's name is dealt with according to the replacement rule.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return
        parent_part, link_name = os.path.split(self._link_dest)
        parent_dir = self._ensure_is_directory(
            fs_root,
            parent_part,
            self._definition_source,
        )
        in_the_way = parent_dir.get(link_name)
        if in_the_way:
            self._handle_existing_path(in_the_way)
        parent_dir.add_symlink(link_name, self._link_target)

    def _handle_existing_path(self, existing: VirtualPath) -> None:
        """Remove `existing` if the replacement rule allows it; otherwise abort.

        :raises TransformationRuntimeError: If the rule forbids the replacement.
        """
        rule = self._replacement_rule
        if rule == "discard-existing":
            may_remove = True
            reason = "<<internal error: you should not see an error with this message>>"
        elif rule == "error-if-directory":
            may_remove = not existing.is_dir
            reason = "the path is a directory"
        elif rule == "abort-on-non-empty-directory":
            # Anything but a directory with content may be replaced.
            may_remove = not existing.is_dir or not any(existing.iterdir)
            reason = "the path is a non-empty directory"
        else:
            assert rule == "error-if-exists"
            may_remove = False
            reason = "the path exists"

        if not may_remove:
            self._error(
                f"Refusing to replace {existing.path} with a symlink; {reason} and"
                f" the active replacement-rule was {self._replacement_rule}. You can"
                f' set the replacement-rule to "discard-existing", if you are not interested'
                f" in the contents of {existing.path}. This error was triggered by {self._definition_source}."
            )
        existing.unlink(recursive=True)
class CreateDirectoryTransformationRule(TransformationRule):
    """Transformation that ensures directories exist and sets their mode and ownership."""

    __slots__ = (
        "_directories",
        "_owner",
        "_group",
        "_mode",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        directories: Sequence[str],
        owner: StaticFileSystemOwner | None,
        group: StaticFileSystemGroup | None,
        mode: FileSystemMode | None,
        definition_source: str,
        condition: ManifestCondition | None,
    ) -> None:
        """Remember which directories to create and the metadata to give them."""
        super().__init__()
        self._directories = directories
        self._owner = owner
        self._group = group
        self._mode = mode
        self._definition_source = definition_source
        self._condition = condition

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Create each configured directory below `fs_root` if the condition holds.

        When a mode is configured it is computed from the directory's current
        mode and applied; `chown` is always called with the configured owner
        and group.

        :raises TransformationRuntimeError: If the mode cannot be computed.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return
        owner, group, mode = self._owner, self._group, self._mode
        for wanted in self._directories:
            created = self._ensure_is_directory(
                fs_root,
                wanted,
                self._definition_source,
            )

            if mode is not None:
                try:
                    new_mode = mode.compute_mode(created.mode, created.is_dir)
                except ValueError as e:
                    self._error(
                        f"Could not compute desired mode for {created.path} as"
                        f" requested in {self._definition_source}: {e.args[0]}",
                        caused_by=e,
                    )
                else:
                    created.mode = new_mode
            created.chown(owner, group)
def _apply_owner_and_mode(
    path: VirtualPath,
    owner: StaticFileSystemOwner | None,
    group: StaticFileSystemGroup | None,
    mode: FileSystemMode | None,
    capabilities: str | None,
    capability_mode: FileSystemMode | None,
    definition_source: str,
) -> None:
    """Apply ownership, mode and (for files) capabilities to `path`.

    :param path: The path to update.
    :param owner: New owner; `chown` is skipped when owner and group are both `None`.
    :param group: New group; see `owner`.
    :param mode: Mode specification applied on top of the current mode, if given.
    :param capabilities: Capabilities to record; only applied to files.
    :param capability_mode: Mode stored with the capabilities; must not be
      `None` when capabilities are applied.
    :param definition_source: Where the request was defined (used in messages).
    :raises TransformationRuntimeError: If the mode cannot be computed.
    """
    if not (owner is None and group is None):
        path.chown(owner, group)

    if mode is not None:
        try:
            new_mode = mode.compute_mode(path.mode, path.is_dir)
        except ValueError as e:
            raise TransformationRuntimeError(
                f"Could not compute desired mode for {path.path} as"
                f" requested in {definition_source}: {e.args[0]}"
            ) from e
        else:
            path.mode = new_mode

    if not path.is_file or capabilities is None:
        return

    cap_ref = path.metadata(DebputyCapability)
    previous = cap_ref.value
    if previous is not None:
        # An earlier definition already set capabilities; tell the user it is overridden.
        _warn(
            f"Replacing the capabilities set on path {path.path} from {previous.definition_source} due"
            f" to {definition_source}."
        )
    assert capability_mode is not None
    cap_ref.value = DebputyCapability(
        capabilities,
        capability_mode,
        definition_source,
    )
class PathMetadataTransformationRule(TransformationRule):
    """Transformation that sets owner, group, mode and capabilities on matched paths."""

    __slots__ = (
        "_match_rules",
        "_owner",
        "_group",
        "_mode",
        "_capabilities",
        "_capability_mode",
        "_recursive",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        match_rules: Sequence[MatchRule],
        owner: StaticFileSystemOwner | None,
        group: StaticFileSystemGroup | None,
        mode: FileSystemMode | None,
        recursive: bool,
        capabilities: str | None,
        capability_mode: FileSystemMode | None,
        definition_source: str,
        condition: ManifestCondition | None,
    ) -> None:
        """Store the configuration.

        :raises ValueError: If only one of `capabilities` and `capability_mode`
          is provided.
        """
        super().__init__()
        self._match_rules = match_rules
        self._owner = owner
        self._group = group
        self._mode = mode
        self._capabilities = capabilities
        self._capability_mode = capability_mode
        self._recursive = recursive
        self._definition_source = definition_source
        self._condition = condition
        # Capabilities and their mode only make sense as a pair.
        if capabilities is None:
            if capability_mode is not None:
                raise ValueError("capability_mode without capabilities")
        elif capability_mode is None:
            raise ValueError("capabilities without capability_mode")

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Apply the metadata to every non-symlink match (and, if recursive, below matched dirs).

        :raises TransformationRuntimeError: If a match rule had no usable match
          or a mode could not be computed.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return
        owner = self._owner
        group = self._group
        mode = self._mode
        capabilities = self._capabilities
        capability_mode = self._capability_mode
        definition_source = self._definition_source
        dirs_to_recurse: list[FSPath] | None = [] if self._recursive else None
        # With neither owner, group nor mode requested, only the capabilities
        # remain, and `_apply_owner_and_mode` applies those to files only.
        needs_file_match = owner is None and group is None and mode is None

        for match_rule in self._match_rules:
            match_ok = saw_symlink = saw_directory = False

            for candidate in match_rule.finditer(fs_root):
                if candidate.is_symlink:
                    saw_symlink = True
                    continue
                if candidate.is_file or not needs_file_match:
                    match_ok = True
                if candidate.is_dir:
                    saw_directory = True
                    if not match_ok and needs_file_match and self._recursive:
                        # A directory counts when the recursion will reach a file.
                        match_ok = any(p.is_file for p in candidate.all_paths())
                _apply_owner_and_mode(
                    candidate,
                    owner,
                    group,
                    mode,
                    capabilities,
                    capability_mode,
                    definition_source,
                )
                if candidate.is_dir and dirs_to_recurse is not None:
                    dirs_to_recurse.append(candidate)

            if match_ok:
                continue
            if needs_file_match and (saw_directory or saw_symlink):
                _warn(
                    f"The match rule {match_rule.describe_match_short()} (from {self._definition_source})"
                    " did not match any files, but given the attributes it can only apply to files."
                )
            elif saw_symlink:
                _warn(
                    f"The match rule {match_rule.describe_match_short()} (from {self._definition_source})"
                    ' matched symlinks, but "path-metadata" cannot apply to symlinks.'
                )
            self._match_rule_had_no_matches(match_rule, self._definition_source)

        if not dirs_to_recurse:
            return
        for recurse_dir in dirs_to_recurse:
            for nested in recurse_dir.all_paths():
                if not nested.is_symlink:
                    _apply_owner_and_mode(
                        nested,
                        owner,
                        group,
                        mode,
                        capabilities,
                        capability_mode,
                        definition_source,
                    )
class ModeNormalizationTransformationRule(TransformationRule):
    """Transformation applying built-in mode normalizations, at most once per path."""

    __slots__ = ("_normalizations",)

    def __init__(
        self,
        normalizations: Sequence[tuple[MatchRule, FileSystemMode]],
    ) -> None:
        """Store the (match rule, mode) pairs to apply."""
        # A bit of a hack since it is initialized outside `debputy`. It probably should not
        # be a "TransformationRule" (hindsight and all)
        run_in_context_of_plugin_wrap_errors("debputy", super().__init__)
        self._normalizations = normalizations

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Normalize the mode of each non-symlink path matched by the rules.

        A path is handled by the first rule matching it; later rules skip it.

        :raises AssertionError: If a mode cannot be computed.
        """
        handled = set()

        def _already_handled(p) -> bool:
            return p.path in handled

        for match_rule, fs_mode in self._normalizations:
            for path in match_rule.finditer(fs_root, ignore_paths=_already_handled):
                if path.is_symlink or path.path in handled:
                    continue
                handled.add(path.path)
                try:
                    new_mode = fs_mode.compute_mode(path.mode, path.is_dir)
                except ValueError as e:
                    raise AssertionError(
                        "Error while applying built-in mode normalization rule"
                    ) from e
                else:
                    path.mode = new_mode
class NormalizeShebangLineTransformation(TransformationRule):
    """Transformation that rewrites shebang lines flagged as needing a fix-up."""

    def __init__(self) -> None:
        """Initialize the rule in the context of the `debputy` plugin."""
        # A bit of a hack since it is initialized outside `debputy`. It probably should not
        # be a "TransformationRule" (hindsight and all)
        run_in_context_of_plugin_wrap_errors("debputy", super().__init__)

    def transform_file_system(
        self,
        fs_root: VirtualPath,
        condition_context: ConditionContext,
    ) -> None:
        """Inspect every file below `fs_root` and replace shebang lines where needed."""
        for candidate in fs_root.all_paths():
            if not candidate.is_file:
                continue
            try:
                with candidate.open(byte_io=True, buffering=4096) as fd:
                    shebang = extract_shebang_interpreter_from_file(fd)
            except (PureVirtualPathError, TestPathWithNonExistentFSPathError):
                # Do not make tests unnecessarily complex to write
                continue
            if shebang is not None and shebang.fixup_needed:
                shebang.replace_shebang_line(candidate)