Coverage for src/debputy/transformation_rules.py: 74%
282 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
1import dataclasses
2import os
3from typing import (
4 NoReturn,
5 Optional,
6 Callable,
7 Sequence,
8 Tuple,
9 List,
10 Literal,
11 Dict,
12 TypeVar,
13 cast,
14 final,
15)
17from debputy.exceptions import (
18 DebputyRuntimeError,
19 PureVirtualPathError,
20 TestPathWithNonExistentFSPathError,
21)
22from debputy.filesystem_scan import FSPath
23from debputy.interpreter import (
24 extract_shebang_interpreter_from_file,
25)
26from debputy.manifest_conditions import ConditionContext, ManifestCondition
27from debputy.manifest_parser.base_types import (
28 FileSystemMode,
29 StaticFileSystemOwner,
30 StaticFileSystemGroup,
31)
32from debputy.manifest_parser.tagging_types import DebputyDispatchableType
33from debputy.manifest_parser.util import AttributePath
34from debputy.path_matcher import MatchRule
35from debputy.plugin.api import VirtualPath
36from debputy.plugin.debputy.types import DebputyCapability
37from debputy.plugin.plugin_state import (
38 run_in_context_of_plugin_wrap_errors,
39)
40from debputy.util import _warn
class TransformationRuntimeError(DebputyRuntimeError):
    """Error raised when a transformation rule cannot be applied.

    Raised by ``TransformationRule._error`` and by ``_apply_owner_and_mode``
    in this module.
    """
# Accepted values for the replacement rule of a "create symlink" transformation.
# ``CreateSymlinkPathTransformationRule._handle_existing_path`` consults it when
# the link path already exists:
#   * "error-if-exists":              never replace the existing path
#   * "error-if-directory":           replace anything except a directory
#   * "abort-on-non-empty-directory": replace anything except a non-empty directory
#   * "discard-existing":             always replace the existing path
CreateSymlinkReplacementRule = Literal[
    "error-if-exists",
    "error-if-directory",
    "abort-on-non-empty-directory",
    "discard-existing",
]

# Type variable for helpers that hand back the same kind of path they received
# (see ``TransformationRule._fs_path_as_dir``).
VP = TypeVar("VP", bound=VirtualPath)
@dataclasses.dataclass(frozen=True, slots=True)
class PreProvidedExclusion:
    """Immutable record pairing an exclusion's label with the callable that prunes it.

    NOTE(review): nothing in this module constructs or consumes this type; the
    field descriptions below are derived from the names and annotations only.
    """

    # Identifier of the exclusion (plain string).
    tag: str
    # Free-form text describing the exclusion.
    description: str
    # Callable that receives an ``FSPath`` and returns nothing.
    pruner: Callable[[FSPath], None]
class TransformationRule(DebputyDispatchableType):
    """Base class for rules that rewrite a package file system tree.

    Subclasses override :meth:`transform_file_system`.  Callers are expected to
    use :meth:`run_transform_file_system`, which dispatches to the override via
    ``run_in_context_of_plugin_wrap_errors`` using the plugin recorded on the
    instance (``self._debputy_plugin``).
    """

    __slots__ = ()

    @final
    def run_transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Run :meth:`transform_file_system` in the context of the owning plugin.

        :param fs_root: Root of the file system tree to transform.
        :param condition_context: Context passed through to the rule (used for
          evaluating manifest conditions).
        """
        run_in_context_of_plugin_wrap_errors(
            self._debputy_plugin,
            self.transform_file_system,
            fs_root,
            condition_context,
        )

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Apply the rule to ``fs_root``; must be overridden by subclasses.

        :raises NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    def _evaluate_condition(
        self,
        condition: Optional[ManifestCondition],
        condition_context: ConditionContext,
        result_if_condition_is_missing: bool = True,
    ) -> bool:
        """Evaluate an optional manifest condition.

        :param condition: The condition to evaluate, or ``None`` if the rule has none.
        :param condition_context: Context handed to ``condition.evaluate``.
        :param result_if_condition_is_missing: Value returned when ``condition`` is ``None``.
        :return: The outcome of the condition, or the fallback when there is no condition.
        """
        if condition is None:
            return result_if_condition_is_missing
        return condition.evaluate(condition_context)

    def _error(
        self,
        msg: str,
        *,
        caused_by: Optional[BaseException] = None,
    ) -> NoReturn:
        """Abort the transformation with ``msg``.

        :param msg: The error message.
        :param caused_by: Optional exception to chain as the cause.
        :raises TransformationRuntimeError: Always.
        """
        raise TransformationRuntimeError(msg) from caused_by

    def _match_rule_had_no_matches(
        self, match_rule: MatchRule, definition_source: str
    ) -> NoReturn:
        """Abort because ``match_rule`` matched nothing.

        :param match_rule: The rule that had no matches (used in the message).
        :param definition_source: Where the transformation was defined (used in the message).
        :raises TransformationRuntimeError: Always.
        """
        self._error(
            f'The match rule "{match_rule.describe_match_short()}" in transformation "{definition_source}" did'
            " not match any paths. Either the definition is redundant (and can be omitted) or the match rule is"
            " incorrect."
        )

    def _fs_path_as_dir(
        self,
        path: VP,
        definition_source: str,
    ) -> VP:
        """Return ``path`` unchanged if it is a directory; otherwise abort.

        :param path: The path that is required to be a directory.
        :param definition_source: Where the transformation was defined (used in the message).
        :return: ``path`` itself.
        :raises TransformationRuntimeError: If ``path`` is not a directory.
        """
        if path.is_dir:
            return path
        path_type = "file" if path.is_file else 'symlink/"special file system object"'
        # The final fragment previously started with " to" while the preceding one
        # ended with " to", which rendered as "... to to make ...".
        self._error(
            f"The path {path.path} was expected to be a directory (or non-existing) due to"
            f" {definition_source}. However that path existed and is a {path_type}."
            f" You may need a `remove: {path.path}` prior to {definition_source}"
            " to make this transformation succeed."
        )

    def _ensure_is_directory(
        self,
        fs_root: FSPath,
        path_to_directory: str,
        definition_source: str,
    ) -> FSPath:
        """Look up ``path_to_directory`` below ``fs_root``, creating missing parts.

        The deepest existing path found by ``fs_root.attempt_lookup`` must be a
        directory; any remaining path segments are created via ``mkdirs``.

        :param fs_root: Root of the file system tree.
        :param path_to_directory: Path of the wanted directory.
        :param definition_source: Where the transformation was defined (used in error messages).
        :return: The directory at ``path_to_directory``.
        :raises TransformationRuntimeError: If the deepest existing path is not a directory.
        """
        current, missing_parts = fs_root.attempt_lookup(path_to_directory)
        current = self._fs_path_as_dir(cast("FSPath", current), definition_source)
        if missing_parts:
            return current.mkdirs("/".join(missing_parts))
        return current
class RemoveTransformationRule(TransformationRule):
    """Transformation that unlinks every path matched by a set of match rules."""

    __slots__ = (
        "_match_rules",
        "_keep_empty_parent_dirs",
        "_definition_source",
    )

    def __init__(
        self,
        match_rules: Sequence[MatchRule],
        keep_empty_parent_dirs: bool,
        definition_source: AttributePath,
    ) -> None:
        """Store the rule configuration.

        :param match_rules: Rules selecting the paths to remove.
        :param keep_empty_parent_dirs: When false, the parent directory of each
          removed path is pruned if the removal left it empty.
        :param definition_source: Location of the definition; only its ``path``
          is kept (for error messages).
        """
        super().__init__()
        self._definition_source = definition_source.path
        self._keep_empty_parent_dirs = keep_empty_parent_dirs
        self._match_rules = match_rules

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Remove all matched paths (recursively) from ``fs_root``.

        :raises TransformationRuntimeError: If a match resolves to the root
          directory, or if nothing has matched by the time a rule has been processed.
        """
        prune_parents = not self._keep_empty_parent_dirs
        source = self._definition_source
        # NOTE(review): this flag is shared by all match rules and never reset, so
        # once any rule has matched, later rules without matches are accepted.
        found_something = False
        for rule in self._match_rules:
            # Take a snapshot of the matches before removing anything; removing while
            # iterating triggers a RuntimeError as the underlying collection changes size:
            # https://salsa.debian.org/debian/debputy/-/issues/52
            for victim in tuple(rule.finditer(fs_root)):
                found_something = True
                containing_dir = victim.parent_dir
                if containing_dir is None:
                    self._error(
                        f"Cannot remove the root directory (triggered by {self._definition_source})"
                    )
                victim.unlink(recursive=True)
                if prune_parents:
                    containing_dir.prune_if_empty_dir()
            # FIXME: `rm` should probably be forgiving or at least support a condition to avoid failures
            if not found_something:
                self._match_rule_had_no_matches(rule, source)
class MoveTransformationRule(TransformationRule):
    """Transformation that renames one path or moves matched paths into a directory.

    If the destination is (or resolves to) an existing directory, every match is
    re-parented into it.  Otherwise exactly one match is allowed and it is
    renamed to the destination path.
    """

    __slots__ = (
        "_match_rule",
        "_dest_path",
        "_dest_is_dir",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        match_rule: MatchRule,
        dest_path: str,
        dest_is_dir: bool,
        definition_source: AttributePath,
        condition: Optional[ManifestCondition],
    ) -> None:
        """Store the rule configuration.

        :param match_rule: Rule selecting the source path(s).
        :param dest_path: Destination path.
        :param dest_is_dir: Whether the destination must be treated as a directory
          (it is created if missing).
        :param definition_source: Location of the definition; only its ``path``
          is kept (for error messages).
        :param condition: Optional condition; the rule is skipped when it is false.
        """
        super().__init__()
        self._match_rule = match_rule
        self._dest_path = dest_path
        self._dest_is_dir = dest_is_dir
        self._definition_source = definition_source.path
        self._condition = condition

    def transform_file_system(
        self, fs_root: FSPath, condition_context: ConditionContext
    ) -> None:
        """Perform the move/rename on ``fs_root``.

        :raises TransformationRuntimeError: If nothing matched, if several paths
          matched but the destination is not a directory, if the rename would be
          a no-op, if a directory would be moved into itself, if two matches
          share a basename, or if a match would replace an existing directory in
          the destination.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return
        # Eager resolve is necessary to avoid "self-recursive" matching in special cases (e.g., **/*.la)
        matches = list(self._match_rule.finditer(fs_root))
        if not matches:
            self._match_rule_had_no_matches(self._match_rule, self._definition_source)

        target_dir: Optional[VirtualPath]
        if self._dest_is_dir:
            # `_ensure_is_directory` always returns a directory, so the rename
            # branch below (the only user of `target_parent_dir`) is not taken.
            target_dir = self._ensure_is_directory(
                fs_root,
                self._dest_path,
                self._definition_source,
            )
        else:
            dir_part, basename = os.path.split(self._dest_path)
            target_parent_dir = self._ensure_is_directory(
                fs_root,
                dir_part,
                self._definition_source,
            )
            target_dir = target_parent_dir.get(basename)

        if target_dir is None or not target_dir.is_dir:
            # Rename: the destination names a single (possibly not yet existing) non-directory path.
            if len(matches) > 1:
                self._error(
                    f"Could not rename {self._match_rule.describe_match_short()} to {self._dest_path}"
                    f" (from: {self._definition_source}). Multiple paths matched the pattern and the"
                    " destination was not a directory. Either correct the pattern to only match only source"
                    " OR define the destination to be a directory (E.g., add a trailing slash - example:"
                    f' "{self._dest_path}/")'
                )
            p = matches[0]
            if p.path == self._dest_path:
                self._error(
                    f"Error in {self._definition_source}, the source"
                    f" {self._match_rule.describe_match_short()} matched {self._dest_path} making the"
                    " rename redundant!?"
                )
            p.parent_dir = target_parent_dir
            p.name = basename
            return

        assert target_dir is not None and target_dir.is_dir
        # Tracks which match claimed each basename so clashes can be reported.
        basenames: Dict[str, VirtualPath] = dict()
        target_dir_path = target_dir.path

        for m in matches:
            if m.path == target_dir_path:
                self._error(
                    f"Error in {self._definition_source}, the source {self._match_rule.describe_match_short()}"
                    f" matched {self._dest_path} (among other), but it is not possible to copy a directory into"
                    " itself"
                )
            if m.name in basenames:
                alt_path = basenames[m.name]
                # We document "two *distinct*" paths. However, as the glob matches are written, it should not be
                # possible for a *single* glob to match the same path twice.
                assert alt_path is not m
                self._error(
                    f"Could not rename {self._match_rule.describe_match_short()} to {self._dest_path}"
                    f" (from: {self._definition_source}). Multiple paths matched the pattern had the"
                    f' same basename "{m.name}" ("{m.path}" vs. "{alt_path.path}"). Please correct the'
                    f" pattern, so it only matches one path with that basename to avoid this conflict."
                )
            # The conflict is with an entry of the same name in the *destination*
            # directory (not with a child of the source path itself).  A source that
            # already lives in the destination does not conflict with itself.
            existing = target_dir.get(m.name)
            if existing and existing is not m and existing.is_dir:
                self._error(
                    f"Could not rename {self._match_rule.describe_match_short()} to {self._dest_path}"
                    f" (from: {self._definition_source}). The pattern matched {m.path} which would replace"
                    f" the existing directory {existing.path}. If this replacement is intentional, then please"
                    f' remove "{existing.path}" first (e.g., via `- remove: "{existing.path}"`)'
                )
            basenames[m.name] = m
            m.parent_dir = target_dir
class CreateSymlinkPathTransformationRule(TransformationRule):
    """Transformation that creates a symlink, optionally replacing an existing path."""

    __slots__ = (
        "_link_dest",
        "_link_target",
        "_replacement_rule",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        link_target: str,
        link_dest: str,
        replacement_rule: CreateSymlinkReplacementRule,
        definition_source: AttributePath,
        condition: Optional[ManifestCondition],
    ) -> None:
        """Store the rule configuration.

        :param link_target: What the symlink points to.
        :param link_dest: Path at which the symlink is created.
        :param replacement_rule: Policy applied when ``link_dest`` already exists.
        :param definition_source: Location of the definition; only its ``path``
          is kept (for error messages).
        :param condition: Optional condition; the rule is skipped when it is false.
        """
        super().__init__()
        self._link_target = link_target
        self._link_dest = link_dest
        self._replacement_rule = replacement_rule
        self._definition_source = definition_source.path
        self._condition = condition

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Create the symlink below ``fs_root``, creating parent directories as needed.

        :raises TransformationRuntimeError: If an existing path may not be
          replaced under the configured replacement rule.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return
        parent_path, link_name = os.path.split(self._link_dest)
        parent_dir = self._ensure_is_directory(
            fs_root,
            parent_path,
            self._definition_source,
        )
        occupant = parent_dir.get(link_name)
        if occupant:
            self._handle_existing_path(occupant)
        parent_dir.add_symlink(link_name, self._link_target)

    def _handle_existing_path(self, existing: VirtualPath) -> None:
        """Unlink ``existing`` if the replacement rule permits it; otherwise abort.

        :param existing: The path currently occupying the symlink's location.
        :raises TransformationRuntimeError: If the rule forbids the replacement.
        """
        rule = self._replacement_rule
        if rule == "discard-existing":
            may_replace = True
            obstacle = "<<internal error: you should not see an error with this message>>"
        elif rule == "abort-on-non-empty-directory":
            # Only a directory with at least one entry blocks the replacement.
            may_replace = not (existing.is_dir and any(existing.iterdir))
            obstacle = "the path is a non-empty directory"
        elif rule == "error-if-directory":
            may_replace = not existing.is_dir
            obstacle = "the path is a directory"
        else:
            assert rule == "error-if-exists"
            may_replace = False
            obstacle = "the path exists"

        if not may_replace:
            self._error(
                f"Refusing to replace {existing.path} with a symlink; {obstacle} and"
                f" the active replacement-rule was {self._replacement_rule}. You can"
                f' set the replacement-rule to "discard-existing", if you are not interested'
                f" in the contents of {existing.path}. This error was triggered by {self._definition_source}."
            )
        existing.unlink(recursive=True)
class CreateDirectoryTransformationRule(TransformationRule):
    """Transformation that ensures directories exist and sets their mode and ownership."""

    __slots__ = (
        "_directories",
        "_owner",
        "_group",
        "_mode",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        directories: Sequence[str],
        owner: Optional[StaticFileSystemOwner],
        group: Optional[StaticFileSystemGroup],
        mode: Optional[FileSystemMode],
        definition_source: str,
        condition: Optional[ManifestCondition],
    ) -> None:
        """Store the rule configuration.

        :param directories: Paths of the directories to ensure.
        :param owner: Owner passed to ``chown`` (may be ``None``).
        :param group: Group passed to ``chown`` (may be ``None``).
        :param mode: Mode specification; when ``None`` the mode is left untouched.
        :param definition_source: Where the rule was defined (for error messages).
        :param condition: Optional condition; the rule is skipped when it is false.
        """
        super().__init__()
        self._directories = directories
        self._owner = owner
        self._group = group
        self._mode = mode
        self._definition_source = definition_source
        self._condition = condition

    def _desired_mode_for(self, dir_path: FSPath, mode: FileSystemMode) -> int:
        """Compute the mode ``dir_path`` should get, aborting on a ``ValueError``."""
        try:
            return mode.compute_mode(dir_path.mode, dir_path.is_dir)
        except ValueError as e:
            self._error(
                f"Could not compute desired mode for {dir_path.path} as"
                f" requested in {self._definition_source}: {e.args[0]}",
                caused_by=e,
            )

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Create each configured directory, then apply mode (if any) and ownership.

        :raises TransformationRuntimeError: If a path exists but is not a
          directory, or if the desired mode cannot be computed.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return
        mode_spec = self._mode
        for directory in self._directories:
            ensured = self._ensure_is_directory(
                fs_root,
                directory,
                self._definition_source,
            )
            if mode_spec is not None:
                ensured.mode = self._desired_mode_for(ensured, mode_spec)
            # chown is called unconditionally, also when owner and group are both None.
            ensured.chown(self._owner, self._group)
def _apply_owner_and_mode(
    path: VirtualPath,
    owner: Optional[StaticFileSystemOwner],
    group: Optional[StaticFileSystemGroup],
    mode: Optional[FileSystemMode],
    capabilities: Optional[str],
    capability_mode: Optional[FileSystemMode],
    definition_source: str,
) -> None:
    """Apply ownership, mode and (for files) capabilities to a single path.

    :param path: The path to update.
    :param owner: New owner, or ``None``.
    :param group: New group, or ``None``.
    :param mode: Mode specification, or ``None`` to leave the mode untouched.
    :param capabilities: Capabilities to record on the path; only applied when
      the path is a file.
    :param capability_mode: Mode stored with the capabilities; must not be
      ``None`` when capabilities are applied.
    :param definition_source: Where the request came from (recorded with the
      capabilities and used in messages).
    :raises TransformationRuntimeError: If the desired mode cannot be computed.
    """
    # Ownership is only touched when at least one of owner/group was given.
    if not (owner is None and group is None):
        path.chown(owner, group)

    if mode is not None:
        try:
            new_mode = mode.compute_mode(path.mode, path.is_dir)
        except ValueError as e:
            raise TransformationRuntimeError(
                f"Could not compute desired mode for {path.path} as"
                f" requested in {definition_source}: {e.args[0]}"
            ) from e
        path.mode = new_mode

    if not path.is_file or capabilities is None:
        return

    cap_ref = path.metadata(DebputyCapability)
    previous = cap_ref.value
    if previous is not None:
        # An earlier definition already set capabilities on this path; tell the
        # user that it is being overridden.
        _warn(
            f"Replacing the capabilities set on path {path.path} from {previous.definition_source} due"
            f" to {definition_source}."
        )
    assert capability_mode is not None
    cap_ref.value = DebputyCapability(
        capabilities,
        capability_mode,
        definition_source,
    )
class PathMetadataTransformationRule(TransformationRule):
    """Transformation that sets owner, group, mode and/or capabilities on matched paths.

    Symlinks are always skipped.  With ``recursive`` enabled, the same metadata
    is additionally applied to everything below each matched directory.
    """

    __slots__ = (
        "_match_rules",
        "_owner",
        "_group",
        "_mode",
        "_capabilities",
        "_capability_mode",
        "_recursive",
        "_definition_source",
        "_condition",
    )

    def __init__(
        self,
        match_rules: Sequence[MatchRule],
        owner: Optional[StaticFileSystemOwner],
        group: Optional[StaticFileSystemGroup],
        mode: Optional[FileSystemMode],
        recursive: bool,
        capabilities: Optional[str],
        capability_mode: Optional[FileSystemMode],
        definition_source: str,
        condition: Optional[ManifestCondition],
    ) -> None:
        """Store the rule configuration.

        :param match_rules: Rules selecting the paths to update.
        :param owner: New owner, or ``None``.
        :param group: New group, or ``None``.
        :param mode: Mode specification, or ``None``.
        :param recursive: Whether matched directories are processed recursively.
        :param capabilities: Capabilities to set on files, or ``None``.
        :param capability_mode: Mode accompanying ``capabilities``.
        :param definition_source: Where the rule was defined (for messages).
        :param condition: Optional condition; the rule is skipped when it is false.
        :raises ValueError: If exactly one of ``capabilities`` and
          ``capability_mode`` is provided.
        """
        super().__init__()
        self._match_rules = match_rules
        self._owner = owner
        self._group = group
        self._mode = mode
        self._capabilities = capabilities
        self._capability_mode = capability_mode
        self._recursive = recursive
        self._definition_source = definition_source
        self._condition = condition
        # The two capability settings only make sense as a pair.
        if capabilities is None:
            if capability_mode is not None:
                raise ValueError("capability_mode without capabilities")
        elif capability_mode is None:
            raise ValueError("capabilities without capability_mode")

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Apply the configured metadata to every matched, non-symlink path.

        :raises TransformationRuntimeError: If a match rule had no acceptable
          match, or if a mode cannot be computed.
        """
        if not self._evaluate_condition(self._condition, condition_context):
            return

        recursive = self._recursive
        # Matched directories queued for the recursive pass (None when not recursive).
        recursion_roots: Optional[List[FSPath]] = [] if recursive else None
        # When none of owner/group/mode is set, a rule only counts as matched if
        # it hits a file.
        needs_file_match = (
            self._owner is None and self._group is None and self._mode is None
        )

        def apply_metadata(target: VirtualPath) -> None:
            _apply_owner_and_mode(
                target,
                self._owner,
                self._group,
                self._mode,
                self._capabilities,
                self._capability_mode,
                self._definition_source,
            )

        for match_rule in self._match_rules:
            match_ok = saw_symlink = saw_directory = False

            for path in match_rule.finditer(fs_root):
                if path.is_symlink:
                    saw_symlink = True
                    continue
                if path.is_file or not needs_file_match:
                    match_ok = True
                if path.is_dir:
                    saw_directory = True
                    if not match_ok and needs_file_match and recursive:
                        # A directory satisfies the file requirement if it contains a file.
                        match_ok = any(p.is_file for p in path.all_paths())
                apply_metadata(path)
                if path.is_dir and recursion_roots is not None:
                    recursion_roots.append(path)

            if match_ok:
                continue
            if needs_file_match and (saw_directory or saw_symlink):
                _warn(
                    f"The match rule {match_rule.describe_match_short()} (from {self._definition_source})"
                    " did not match any files, but given the attributes it can only apply to files."
                )
            elif saw_symlink:
                _warn(
                    f"The match rule {match_rule.describe_match_short()} (from {self._definition_source})"
                    ' matched symlinks, but "path-metadata" cannot apply to symlinks.'
                )
            self._match_rule_had_no_matches(match_rule, self._definition_source)

        for recurse_dir in recursion_roots or ():
            for path in recurse_dir.all_paths():
                if not path.is_symlink:
                    apply_metadata(path)
class ModeNormalizationTransformationRule(TransformationRule):
    """Built-in rule that normalizes file modes; each path is handled at most once."""

    __slots__ = ("_normalizations",)

    def __init__(
        self,
        normalizations: Sequence[Tuple[MatchRule, FileSystemMode]],
    ) -> None:
        """Store the ``(match rule, mode)`` pairs, which are applied in order."""
        # A bit of a hack since it is initialized outside `debputy`. It probably should not
        # be a "TransformationRule" (hindsight and all)
        run_in_context_of_plugin_wrap_errors("debputy", super().__init__)
        self._normalizations = normalizations

    def transform_file_system(
        self,
        fs_root: FSPath,
        condition_context: ConditionContext,
    ) -> None:
        """Apply the first applicable normalization to each non-symlink path.

        :raises AssertionError: If a mode cannot be computed.
        """
        handled = set()

        def already_handled(p):
            return p.path in handled

        for match_rule, fs_mode in self._normalizations:
            for path in match_rule.finditer(fs_root, ignore_paths=already_handled):
                if path.is_symlink or path.path in handled:
                    continue
                # Record the path first, so later normalizations leave it alone.
                handled.add(path.path)
                try:
                    normalized_mode = fs_mode.compute_mode(path.mode, path.is_dir)
                except ValueError as e:
                    raise AssertionError(
                        "Error while applying built-in mode normalization rule"
                    ) from e
                path.mode = normalized_mode
class NormalizeShebangLineTransformation(TransformationRule):
    """Built-in rule that rewrites shebang lines flagged as needing a fixup."""

    def __init__(self) -> None:
        # A bit of a hack since it is initialized outside `debputy`. It probably should not
        # be a "TransformationRule" (hindsight and all)
        run_in_context_of_plugin_wrap_errors("debputy", super().__init__)

    def transform_file_system(
        self,
        fs_root: VirtualPath,
        condition_context: ConditionContext,
    ) -> None:
        """Inspect every regular file below ``fs_root`` and fix its shebang line if needed."""
        for candidate in fs_root.all_paths():
            if not candidate.is_file:
                continue
            try:
                with candidate.open(byte_io=True, buffering=4096) as fd:
                    interpreter = extract_shebang_interpreter_from_file(fd)
            except (PureVirtualPathError, TestPathWithNonExistentFSPathError):
                # Do not make tests unnecessarily complex to write
                continue
            # `None` means that no interpreter was extracted from the file.
            if interpreter is not None and interpreter.fixup_needed:
                interpreter.replace_shebang_line(candidate)