Coverage for src/debputy/filesystem_scan.py: 66%
1295 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
1import atexit
2import contextlib
3import dataclasses
4import errno
5import io
6import operator
7import os
8import shutil
9import stat
10import subprocess
11import tempfile
12import time
13from abc import ABC
14from contextlib import suppress
15from typing import (
16 List,
17 Dict,
18 Optional,
19 Tuple,
20 Union,
21 cast,
22 Any,
23 ContextManager,
24 TextIO,
25 BinaryIO,
26 NoReturn,
27 Type,
28 Generic,
29 TypeVar,
30 overload,
31 Literal,
32)
33from collections.abc import Iterable, Iterator, Mapping, Callable
34from weakref import ref, ReferenceType
36from debputy.exceptions import (
37 PureVirtualPathError,
38 DebputyFSIsROError,
39 DebputyMetadataAccessError,
40 TestPathWithNonExistentFSPathError,
41 SymlinkLoopError,
42)
43from debputy.intermediate_manifest import PathType
44from debputy.manifest_parser.base_types import (
45 ROOT_DEFINITION,
46 StaticFileSystemOwner,
47 StaticFileSystemGroup,
48)
49from debputy.plugin.api.spec import (
50 VirtualPath,
51 PathDef,
52 PathMetadataReference,
53 PMT,
54)
55from debputy.types import VP
56from debputy.util import (
57 generated_content_dir,
58 _error,
59 escape_shell,
60 assume_not_none,
61 _normalize_path,
62 _debug_log,
63)
# Sort key that orders path objects by their `name` attribute (used when
# sorting directory entries).
BY_BASENAME = operator.attrgetter("name")

# Covariant type variables bound to "FSOverlayBase".
FSP = TypeVar("FSP", bound="FSOverlayBase", covariant=True)
FSC = TypeVar("FSC", bound="FSOverlayBase", covariant=True)


# Mode strings that select a binary file handle.
BinaryOpenMode = Literal[
    "rb",
    "r+b",
    "wb",
    "w+b",
    "xb",
    "ab",
]
# Mode strings that select a text file handle.
TextOpenMode = Literal[
    "r",
    "r+",
    "rt",
    "r+t",
    "w",
    "w+",
    "wt",
    "w+t",
    "x",
    "xt",
    "a",
    "at",
]
# Any mode string accepted by `open_child`.
OpenMode = Union[BinaryOpenMode, TextOpenMode]
class AlwaysEmptyReadOnlyMetadataReference(PathMetadataReference[PMT]):
    """Metadata reference that never holds a value and rejects every write.

    Reading yields `None`, but only for the plugin that owns the metadata;
    any other plugin gets a `DebputyMetadataAccessError`.
    """

    __slots__ = ("_metadata_type", "_owning_plugin", "_current_plugin")

    def __init__(
        self,
        owning_plugin: str,
        current_plugin: str,
        metadata_type: type[PMT],
    ) -> None:
        self._metadata_type = metadata_type
        self._owning_plugin = owning_plugin
        self._current_plugin = current_plugin

    @property
    def _is_owner(self) -> bool:
        """Whether the current plugin is the plugin owning the metadata."""
        return self._owning_plugin == self._current_plugin

    @property
    def is_present(self) -> bool:
        """Always False: this reference never has a value."""
        return False

    @property
    def can_read(self) -> bool:
        """Only the owning plugin may read."""
        return self._is_owner

    @property
    def can_write(self) -> bool:
        """Always False: this reference is read-only."""
        return False

    @property
    def value(self) -> PMT | None:
        """Return None for the owner; raise DebputyMetadataAccessError otherwise."""
        if not self.can_read:
            raise DebputyMetadataAccessError(
                f"Cannot read the metadata {self._metadata_type.__name__} owned by"
                f" {self._owning_plugin} as the metadata has not been made"
                f" readable to the plugin {self._current_plugin}."
            )
        return None

    @value.setter
    def value(self, new_value: PMT) -> None:
        """Always raise: the error type depends on whether the caller owns the metadata."""
        if not self._is_owner:
            raise DebputyMetadataAccessError(
                f"Cannot set the metadata {self._metadata_type.__name__} owned by"
                f" {self._owning_plugin} as the metadata has not been made"
                f" read-write to the plugin {self._current_plugin}."
            )
        raise DebputyFSIsROError(
            f"Cannot set the metadata {self._metadata_type.__name__} as the path is read-only"
        )
@dataclasses.dataclass(slots=True)
class PathMetadataValue(Generic[PMT]):
    """Storage cell for one piece of path metadata and the plugin that owns it."""

    # Name of the plugin that owns this metadata.
    owning_plugin: str
    # The type of the stored metadata value.
    metadata_type: type[PMT]
    # The stored value; None means no value is set.
    value: PMT | None = None

    def can_read_value(self, current_plugin: str) -> bool:
        """Return True when `current_plugin` is the owning plugin."""
        return current_plugin == self.owning_plugin

    def can_write_value(self, current_plugin: str) -> bool:
        """Return True when `current_plugin` is the owning plugin."""
        return current_plugin == self.owning_plugin
class PathMetadataReferenceImplementation(PathMetadataReference[PMT]):
    """Metadata reference backed by a `PathMetadataValue` attached to a path.

    Access is granted per plugin by the underlying `PathMetadataValue`; writes
    additionally require the owning path to be read-write and not detached.
    """

    __slots__ = ("_owning_path", "_current_plugin", "_path_metadata_value")

    def __init__(
        self,
        owning_path: VirtualPath,
        current_plugin: str,
        path_metadata_value: PathMetadataValue[PMT],
    ) -> None:
        self._owning_path = owning_path
        self._current_plugin = current_plugin
        self._path_metadata_value = path_metadata_value

    @property
    def is_present(self) -> bool:
        """True when the current plugin may read the metadata and a value is set."""
        if not self.can_read:
            return False
        return self._path_metadata_value.value is not None

    @property
    def can_read(self) -> bool:
        """Whether the current plugin is allowed to read the value."""
        return self._path_metadata_value.can_read_value(self._current_plugin)

    @property
    def can_write(self) -> bool:
        """Whether a write would succeed (plugin permitted, path read-write and attached)."""
        if not self._path_metadata_value.can_write_value(self._current_plugin):
            return False
        owning_path = self._owning_path
        return owning_path.is_read_write and not owning_path.is_detached

    @property
    def value(self) -> PMT | None:
        """Return the stored value.

        :raises DebputyMetadataAccessError: If the current plugin may not read it.
        """
        if self.can_read:
            return self._path_metadata_value.value
        raise DebputyMetadataAccessError(
            f"Cannot read the metadata {self._metadata_type_name} owned by"
            f" {self._owning_plugin} as the metadata has not been made"
            f" readable to the plugin {self._current_plugin}."
        )

    @value.setter
    def value(self, new_value: PMT) -> None:
        """Store `new_value` (None deletes the value).

        :raises DebputyMetadataAccessError: If the current plugin may not write it.
        :raises DebputyFSIsROError: If the owning path is read-only.
        :raises TypeError: If the owning path is detached.
        """
        # Only test the plugin permission here. `can_write` also folds in the
        # read-only and detached states, which would make the two dedicated
        # checks below unreachable and report the wrong reason to the caller.
        if not self._path_metadata_value.can_write_value(self._current_plugin):
            m = "set" if new_value is not None else "delete"
            raise DebputyMetadataAccessError(
                f"Cannot {m} the metadata {self._metadata_type_name} owned by"
                f" {self._owning_plugin} as the metadata has not been made"
                f" read-write to the plugin {self._current_plugin}."
            )
        owning_path = self._owning_path
        if not owning_path.is_read_write:
            raise DebputyFSIsROError(
                f"Cannot set the metadata {self._metadata_type_name} as the path is read-only"
            )
        if owning_path.is_detached:
            raise TypeError(
                f"Cannot set the metadata {self._metadata_type_name} as the path is detached"
            )
        self._path_metadata_value.value = new_value

    @property
    def _is_owner(self) -> bool:
        """Whether the current plugin is the plugin owning the metadata."""
        return self._owning_plugin == self._current_plugin

    @property
    def _owning_plugin(self) -> str:
        """Name of the plugin owning the metadata."""
        return self._path_metadata_value.owning_plugin

    @property
    def _metadata_type_name(self) -> str:
        """Name of the metadata type, for use in error messages."""
        return self._path_metadata_value.metadata_type.__name__
def _cp_a(source: str, dest: str) -> None:
    """Copy `source` to `dest` using `cp -a`.

    If `cp` exits unsuccessfully, the failure is reported through `_error`
    together with the shell-escaped command line.
    """
    argv = ["cp", "-a", source, dest]
    try:
        subprocess.check_call(argv)
    except subprocess.CalledProcessError:
        rendered_command = escape_shell(*argv)
        rendered_source = escape_shell(source)
        _error(
            f"The attempt to make an internal copy of {rendered_source} failed. Please review the output of cp"
            f" above to understand what went wrong. The full command was: {rendered_command}"
        )
246def _split_path(path: str) -> tuple[bool, bool, list[str]]:
247 must_be_dir = True if path.endswith("/") else False
248 absolute = False
249 if path.startswith("/"):
250 absolute = True
251 path = "." + path
252 path_parts = path.rstrip("/").split("/")
253 if must_be_dir:
254 path_parts.append(".")
255 return absolute, must_be_dir, path_parts
def _root(path: VP) -> VP:
    """Follow `parent_dir` links upward from `path` and return the topmost path."""
    node = path
    while (above := node.parent_dir) is not None:
        node = above
    return node
267def _check_fs_path_is_file(
268 fs_path: str,
269 unlink_on_error: Optional["VirtualPath"] = None,
270) -> None:
271 had_issue = False
272 try:
273 # FIXME: Check mode, and use the Virtual Path to cache the result as a side-effect
274 st = os.lstat(fs_path)
275 except FileNotFoundError:
276 had_issue = True
277 else:
278 if not stat.S_ISREG(st.st_mode) or st.st_nlink > 1: 278 ↛ 279line 278 didn't jump to line 279 because the condition on line 278 was never true
279 had_issue = True
280 if not had_issue: 280 ↛ 283line 280 didn't jump to line 283 because the condition on line 280 was always true
281 return
283 if unlink_on_error:
284 with suppress(FileNotFoundError):
285 os.unlink(fs_path)
286 raise TypeError(
287 "The provided FS backing file was deleted, replaced with a non-file entry or it was hard"
288 " linked to another file. The entry has been disconnected."
289 )
class CurrentPluginContextManager:
    """Track which plugin is "current" as a stack of plugin names."""

    __slots__ = ("_plugin_names",)

    def __init__(self, initial_plugin_name: str) -> None:
        self._plugin_names = [initial_plugin_name]

    @property
    def current_plugin_name(self) -> str:
        """The most recently entered plugin name."""
        return self._plugin_names[-1]

    @contextlib.contextmanager
    def change_plugin_context(self, new_plugin_name: str) -> Iterator[str]:
        """Make `new_plugin_name` the current plugin for the duration of the `with` block.

        :param new_plugin_name: The plugin name to make current.
        :return: A context manager yielding `new_plugin_name`.
        """
        self._plugin_names.append(new_plugin_name)
        try:
            yield new_plugin_name
        finally:
            # Always restore the previous plugin, even when the body raises;
            # otherwise the stack would keep reporting the wrong plugin.
            self._plugin_names.pop()
class VirtualPathBase(VirtualPath, ABC):
    """Shared lookup, directory creation and child-opening logic for virtual paths."""

    __slots__ = ()

    def _orphan_safe_path(self) -> str:
        """Return the path string to use in messages."""
        return self.path

    def _rw_check(self) -> None:
        """Raise `DebputyFSIsROError` unless this path is read-write."""
        if not self.is_read_write:
            raise DebputyFSIsROError(
                f'Attempt to write to "{self._orphan_safe_path()}" failed:'
                " Debputy Virtual File system is R/O."
            )

    def lookup(self, path: str) -> Optional["VirtualPathBase"]:
        """Resolve `path` and return the match, or None if any part of it is missing."""
        match, missing = self.attempt_lookup(path)
        if missing:
            return None
        return match

    def attempt_lookup(self, path: str) -> tuple["VirtualPathBase", list[str]]:
        """Resolve as much of `path` as exists.

        Absolute paths are resolved from the root; other paths from this path.
        Symlinks met before the final segment are expanded.

        :param path: The path to resolve.
        :return: The deepest path that could be resolved and the list of segments
          that could not be resolved (empty when the full path exists).
        :raises ValueError: If this path is detached or `path` escapes the root.
        :raises SymlinkLoopError: If the same symlink is traversed twice.
        """
        if self.is_detached:
            raise ValueError(
                f'Cannot perform lookup via "{self._orphan_safe_path()}": The path is detached'
            )
        absolute, must_be_dir, path_parts = _split_path(path)
        current = _root(self) if absolute else self
        # Reversed so the next segment to process can be popped off the end.
        path_parts.reverse()
        link_expansions = set()
        while path_parts:
            dir_part = path_parts.pop()
            if dir_part == ".":
                continue
            if dir_part == "..":
                p = current.parent_dir
                if p is None:
                    raise ValueError(f'The path "{path}" escapes the root dir')
                current = p
                continue
            try:
                current = current[dir_part]
            except KeyError:
                # Restore the unresolved remainder in its natural order.
                path_parts.append(dir_part)
                path_parts.reverse()
                if must_be_dir:
                    # Drop the trailing "." marker that `_split_path` added.
                    path_parts.pop()
                return current, path_parts
            if current.is_symlink and path_parts:
                if current.path in link_expansions:
                    # This is our loop detection for now. It might have some false positives where you
                    # could safely resolve the same symlink twice. However, given that this use-case is
                    # basically non-existent in practice for packaging, we just stop here for now.
                    raise SymlinkLoopError(
                        f'The path "{path}" traversed the symlink "{current.path}" multiple'
                        " times. Currently, traversing the same symlink twice is considered"
                        " a loop by `debputy` even if the path would eventually resolve."
                        " Consider filing a feature request if you have a benign case that"
                        " triggers this error."
                    )
                link_expansions.add(current.path)
                link_target = current.readlink()
                link_absolute, _, link_path_parts = _split_path(link_target)
                if link_absolute:
                    current = _root(current)
                else:
                    current = assume_not_none(current.parent_dir)
                # Queue the link target's segments ahead of the remaining ones.
                link_path_parts.reverse()
                path_parts.extend(link_path_parts)
        return current, []

    def mkdirs(self, path: str) -> "VirtualPath":
        """Ensure `path` exists as a directory, creating missing segments.

        :param path: The directory path to create.
        :return: The directory at `path`.
        :raises ValueError: If an existing part of the path is not a directory.
        """
        current: VirtualPath
        current, missing_parts = self.attempt_lookup(
            f"{path}/" if not path.endswith("/") else path
        )
        if not current.is_dir:
            raise ValueError(
                f'mkdirs of "{path}" failed: This would require {current.path} to not exist OR be'
                " a directory. However, that path exist AND is a not directory."
            )
        for missing_part in missing_parts:
            assert missing_part not in (".", "..")
            current = current.mkdir(missing_part)
        return current

    def prune_if_empty_dir(self) -> None:
        """Remove this and all (now) empty parent directories

        Same as: `rmdir --ignore-fail-on-non-empty --parents`

        This operation may cause the path (and any of its parent directories) to become "detached"
        and therefore unsafe to use in further operations.
        """
        self._rw_check()

        if not self.is_dir:
            raise TypeError(f"{self._orphan_safe_path()} is not a directory")
        if any(self.iterdir):
            return
        parent_dir = assume_not_none(self.parent_dir)

        # Recursive does not matter; we already know the directory is empty.
        self.unlink()

        # Note: The root dir must never be deleted. This works because when delegating it to the root
        # directory, its implementation of this method is a no-op. If this is later rewritten to an
        # inline loop (rather than recursion), be sure to preserve this feature.
        parent_dir.prune_if_empty_dir()

    def _current_plugin(self) -> str:
        """Return the current plugin name as reported by the topmost parent of this path.

        :raises TypeError: If this path is detached.
        """
        if self.is_detached:
            raise TypeError("Cannot resolve the current plugin; path is detached")
        current = self
        while True:
            next_parent = current.parent_dir
            if next_parent is None:
                break
            current = next_parent
        assert current is not None
        return cast("FSRootDir", current)._current_plugin()

    @overload
    def open_child(
        self,
        name: str,
        mode: TextOpenMode = ...,
        buffering: int = -1,
    ) -> TextIO: ...

    @overload
    def open_child(
        self,
        name: str,
        mode: BinaryOpenMode = ...,
        buffering: int = -1,
    ) -> BinaryIO: ...

    @contextlib.contextmanager
    def open_child(
        self,
        name: str,
        mode: OpenMode = "r",
        buffering: int = -1,
    ) -> TextIO | BinaryIO:
        """Open a child path of the current directory in a given mode. Usually used with a context manager

        The path is opened according to the `mode` parameter similar to the built-in `open` in Python.
        The following symbols are accepted with the same meaning as Python's open:
         * `r`
         * `w`
         * `x`
         * `a`
         * `+`
         * `b`
         * `t`

        Like Python's `open`, this can create a new file provided the file system is in read-write mode.
        Though unlike Python's open, symlinks are not followed and cannot be opened. Any newly created
        file will start with (os.stat) mode of 0o0644. The (os.stat) mode of existing paths are left
        as-is.

        :param name: The name of the child path to open. Must be a basename.
        :param mode: The mode to open the file with such as `r` or `w`. See Python's `open` for more
          examples.
        :param buffering: Same as open(..., buffering=...) where supported. Notably during
          testing, the content may be purely in memory and use a BytesIO/StringIO
          (which does not accept that parameter, but then it is buffered in a different way)
        :return: The file handle.
        :raises ValueError: If the mode has `r` and the path does not exist, or the mode has
          `x` and the path already exists.
        """
        existing = self.get(name)
        if "r" in mode:
            if existing is None:
                raise ValueError(
                    f"Path {self.path}/{name} does not exist and mode had `r`"
                )
            if "+" not in mode:
                with existing.open(byte_io="b" in mode, buffering=buffering) as fd:
                    yield fd
                # A context-manager generator must yield exactly once. Without
                # this return, control would fall through to the write path
                # below and yield a second handle.
                return

        encoding = None if "b" in mode else "utf-8"

        if existing:
            if "x" in mode:
                raise ValueError(
                    f"Path {existing.path} already exists and mode had `x`"
                )
            with (
                existing.replace_fs_path_content() as fs_path,
                open(fs_path, mode, encoding=encoding) as fd,
            ):
                yield fd
        else:
            assert "r" not in mode
            # unlink_if_exists=False as a precaution (the "already exists" should not end up here).
            with (
                self.add_file(name, mode=0o0644, unlink_if_exists=False) as new_path,
                open(new_path.fs_path, mode, encoding=encoding) as fd,
            ):
                yield fd
509class FSPath(VirtualPathBase, ABC):
510 __slots__ = (
511 "_basename",
512 "_parent_dir",
513 "_children",
514 "_path_cache",
515 "_parent_path_cache",
516 "_last_known_parent_path",
517 "_mode",
518 "_owner",
519 "_group",
520 "_mtime",
521 "_stat_cache",
522 "_metadata",
523 "__weakref__",
524 )
def __init__(
    self,
    basename: str,
    parent: Optional["FSPath"],
    children: dict[str, "FSPath"] | None = None,
    initial_mode: int | None = None,
    mtime: float | None = None,
    stat_cache: os.stat_result | None = None,
) -> None:
    """Initialise the path and, when `parent` is given, attach it to that parent.

    :param basename: The name of this path.
    :param parent: The parent directory, or None to leave the path unattached.
    :param children: Initial mapping of child names to child paths.
    :param initial_mode: Initial mode; stored as-is.
    :param mtime: Initial modification time; stored as-is.
    :param stat_cache: A pre-computed stat result to cache.
    """
    # `_parent_dir` must exist before the `parent_dir` setter runs because the
    # setter consults `is_detached`, which reads that attribute.
    self._parent_dir: ReferenceType["FSPath"] | None = None

    self._basename = basename
    self._children = children
    self._mode = initial_mode
    self._mtime = mtime
    self._stat_cache = stat_cache
    self._owner = ROOT_DEFINITION
    self._group = ROOT_DEFINITION
    self._metadata: dict[tuple[str, type[Any]], PathMetadataValue[Any]] = {}

    # Path caches start out empty.
    self._path_cache: str | None = None
    self._parent_path_cache: str | None = None
    self._last_known_parent_path: str | None = None

    if parent is not None:
        self.parent_dir = parent
def __repr__(self) -> str:
    """Return a debug representation with the path, its kind flags and child count."""
    class_name = self.__class__.__name__
    shown_path = self._orphan_safe_path()
    is_file = self.is_file
    is_dir = self.is_dir
    is_symlink = self.is_symlink
    has_fs_path = self.has_fs_path
    child_count = len(self._children) if self._children else 0
    details = ", ".join(
        (
            f"{shown_path!r}",
            f"is_file={is_file}",
            f"is_dir={is_dir}",
            f"is_symlink={is_symlink}",
            f"has_fs_path={has_fs_path}",
            f"children_len={child_count}",
        )
    )
    return f"{class_name}({details})"
@property
def name(self) -> str:
    """The basename of this path."""
    return self._basename

@name.setter
def name(self, new_name: str) -> None:
    """Rename this path; requires the path to be read-write."""
    self._rw_check()
    if self._basename == new_name:
        return
    if not self.is_detached:
        self._rw_check()
        current_parent = self.parent_dir
        # Detach, rename, re-attach: this way the parent re-registers the
        # path under its new name.
        self.parent_dir = None
        self._basename = new_name
        self.parent_dir = current_parent
        return
    # A detached path is not registered with any parent; just rename it.
    self._basename = new_name
@property
def iterdir(self) -> Iterable["FSPath"]:
    """Iterate over the direct children of this path (nothing if there are none)."""
    children = self._children
    if children is None:
        return
    yield from children.values()
def all_paths(self) -> Iterable["FSPath"]:
    """Yield this path followed by every path beneath it.

    Children are visited sorted by basename and a directory is yielded before
    its content. Directories that have become detached by the time they are
    processed are not descended into.
    """
    yield self
    if not self.is_dir:
        return
    # Reverse-sorted so that popping from the end yields ascending order.
    pending = sorted(self.iterdir, key=BY_BASENAME, reverse=True)
    while pending:
        entry = pending.pop()
        yield entry
        if not entry.is_dir or entry.is_detached:
            continue
        pending.extend(sorted(entry.iterdir, key=BY_BASENAME, reverse=True))
def walk(self) -> Iterable[tuple["FSPath", list["FSPath"]]]:
    """Yield `(path, children)` pairs for this path and everything beneath it.

    The children list is sorted by basename. The list is used after the yield
    to decide what to visit next, and a path that has become detached by then
    is not descended into.
    """
    # FIXME: can this be more "os.walk"-like without making it harder to implement?
    if not self.is_dir:
        yield self, []
        return
    pending = [self]
    while pending:
        visiting = pending.pop()
        entries = sorted(visiting.iterdir, key=BY_BASENAME)
        assert not entries or visiting.is_dir
        yield visiting, entries
        # Removing the directory counts as discarding the children.
        if visiting.is_detached:
            continue
        pending.extend(reversed(entries))
def _orphan_safe_path(self) -> str:
    """Return a path string for messages that also works for detached paths."""
    if self.is_detached and self._last_known_parent_path is None:
        # Detached with no recorded parent path: only the name is known.
        return f"<orphaned>/{self.name}"
    return self.path
@property
def is_detached(self) -> bool:
    """True when this path has no live parent or its parent is itself detached."""
    parent_ref = self._parent_dir
    live_parent = parent_ref() if parent_ref is not None else None
    if live_parent is None:
        # Either never attached, or the weakly referenced parent is gone.
        return True
    return live_parent.is_detached
630 # The __getitem__ behaves like __getitem__ from Dict but __iter__ would ideally work like a Sequence.
631 # However, that does not feel compatible, so lets force people to use .children instead for the Sequence
632 # behavior to avoid surprises for now.
633 # (Maybe it is a non-issue, but it is easier to add the API later than to remove it once we have committed
634 # to using it)
635 __iter__ = None
def __getitem__(self, key) -> "FSPath":
    """Return the child named `key` (an `FSPath` key is looked up by its name).

    :raises KeyError: If there is no such child.
    """
    if self._children is None:
        raise KeyError(
            f"{key} (note: {self._orphan_safe_path()!r} has no children)"
        )
    child_name = key.name if isinstance(key, FSPath) else key
    return self._children[child_name]
def __delitem__(self, key) -> None:
    """Remove the child registered under `key`; requires the path to be read-write.

    :raises KeyError: If there is no such child.
    """
    self._rw_check()
    registry = self._children
    if registry is None:
        raise KeyError(key)
    del registry[key]
def get(self, key: str) -> "Optional[FSPath]":
    """Return the child named `key`, or None when there is no such child."""
    with suppress(KeyError):
        return self[key]
    return None
def __contains__(self, item: object) -> bool:
    """Test membership by path object (its parent is this path) or by child name."""
    if isinstance(item, VirtualPath):
        return item.parent_dir is self
    return isinstance(item, str) and self.get(item) is not None
def _add_child(self, child: "FSPath") -> None:
    """Register `child` under its name, recursively unlinking any existing child of that name.

    :raises TypeError: If this path is not a directory.
    """
    self._rw_check()
    if not self.is_dir:
        raise TypeError(f"{self._orphan_safe_path()!r} is not a directory")
    if self._children is None:
        self._children = {}

    previous = self.get(child.name)
    if previous is not None:
        previous.unlink(recursive=True)
    self._children[child.name] = child
@property
def tar_path(self) -> str:
    """The path of this entry, with a trailing "/" appended for directories."""
    base = self.path
    return f"{base}/" if self.is_dir else base
@property
def path(self) -> str:
    """The parent's path joined with this path's name.

    The result is cached and reused for as long as the parent path is unchanged.

    :raises ReferenceError: If no parent path is available.
    """
    parent_path = self.parent_dir_path
    cached_for = self._parent_path_cache
    if cached_for is not None and cached_for == parent_path:
        return assume_not_none(self._path_cache)
    if parent_path is None:
        raise ReferenceError(
            f"The path {self.name} is detached! {self.__class__.__name__}"
        )
    self._parent_path_cache = parent_path
    joined = os.path.join(parent_path, self.name)
    self._path_cache = joined
    return joined
@property
def parent_dir(self) -> Optional["FSPath"]:
    """The directory this path is attached to.

    The parent is stored as a weak reference.

    :raises ReferenceError: If no parent is set or the referenced parent no
      longer exists.
    """
    p_ref = self._parent_dir
    p = p_ref() if p_ref is not None else None
    if p is None:
        raise ReferenceError(
            f"The path {self.name} is detached! {self.__class__.__name__}"
        )
    return p

@parent_dir.setter
def parent_dir(self, new_parent: Optional["FSPath"]) -> None:
    """Move this path to `new_parent`, or detach it when `new_parent` is None.

    Both this path and the new parent (if any) must be read-write.

    :raises ValueError: If `new_parent` is not a directory.
    """
    self._rw_check()
    if new_parent is not None:
        if not new_parent.is_dir:
            raise ValueError(
                f"The parent {new_parent._orphan_safe_path()} must be a directory"
            )
        new_parent._rw_check()
    old_parent = None
    self._last_known_parent_path = None
    if not self.is_detached:
        # Unregister from the current parent before changing anything else.
        old_parent = self.parent_dir
        old_parent_children = assume_not_none(assume_not_none(old_parent)._children)
        del old_parent_children[self.name]
    if new_parent is not None:
        self._parent_dir = ref(new_parent)
        new_parent._add_child(self)
    else:
        if old_parent is not None and not old_parent.is_detached:
            # Remember where the path used to live so it can still be
            # described after it has been detached.
            self._last_known_parent_path = old_parent.path
        self._parent_dir = None
    # Drop the cached parent path so `path` is recomputed on next access.
    self._parent_path_cache = None
@property
def parent_dir_path(self) -> str | None:
    """The parent's path; for a detached path, the last known parent path (may be None)."""
    if not self.is_detached:
        return assume_not_none(self.parent_dir).path
    return self._last_known_parent_path
def chown(
    self,
    owner: StaticFileSystemOwner | None,
    group: StaticFileSystemGroup | None,
) -> None:
    """Change the owner/group of this path

    :param owner: The desired owner definition for this path. If None, then no change of owner is performed.
    :param group: The desired group definition for this path. If None, then no change of group is performed.
    """
    self._rw_check()

    # Apply owner first, then group; a None entry leaves that field untouched.
    for slot_name, requested in (("_owner", owner), ("_group", group)):
        if requested is None:
            continue
        setattr(self, slot_name, requested.ownership_definition)
def stat(self) -> os.stat_result:
    """Return the stat result for this path, computing and caching it on first use."""
    cached = self._stat_cache
    if cached is not None:
        return cached
    fresh = self._uncached_stat()
    self._stat_cache = fresh
    return fresh
def _uncached_stat(self) -> os.stat_result:
    """`lstat` the backing file system path (symlinks are not followed)."""
    backing_path = self.fs_path
    return os.lstat(backing_path)
@property
def mode(self) -> int:
    """The permission bits of this path; taken from `stat()` and cached if not set yet."""
    known_mode = self._mode
    if known_mode is not None:
        return known_mode
    derived_mode = stat.S_IMODE(self.stat().st_mode)
    self._mode = derived_mode
    return derived_mode

@mode.setter
def mode(self, new_mode: int) -> None:
    """Set the mode; requires the path to be read-write.

    :raises ValueError: If `new_mode` lacks the owner read bit or, for
      directories, any of the owner read/write/exec bits.
    """
    self._rw_check()
    required_bits = 0o700 if self.is_dir else 0o400
    if (new_mode & required_bits) == required_bits:
        self._mode = new_mode
        return
    omode = oct(new_mode)[2:]
    omin = oct(required_bits)[2:]
    raise ValueError(
        f'Attempt to set mode of path "{self._orphan_safe_path()}" to {omode} rejected;'
        f" Minimum requirements are {omin} (read-bit and, for dirs, exec bit for user)."
        " There are no paths that do not need these requirements met and they can cause"
        " problems during build or on the final system."
    )
def _ensure_min_mode(self) -> None:
    """Add the minimum owner permission bits to this path.

    The minimum is 0o700 for directories and 0o600 for everything else. When
    the path has a file system path and its mode lacks owner read/write, the
    file on disk is chmod'ed as well.
    """
    min_bit = 0o700 if self.is_dir else 0o600
    # NOTE(review): the guard tests 0o600 rather than `min_bit`, so a directory
    # that only lacks the owner exec bit is not chmod'ed on disk - confirm
    # whether that is intended.
    if self.has_fs_path and (self.mode & 0o600) != 0o600:
        try:
            fs_path = self.fs_path
        except TestPathWithNonExistentFSPathError:
            # No real file to chmod; only the recorded mode is updated.
            pass
        else:
            st = os.stat(fs_path)
            new_fs_mode = stat.S_IMODE(st.st_mode) | min_bit
            _debug_log(
                f"Applying chmod {oct(min_bit)[2:]} {fs_path} ({self.path}) to avoid problems down the line"
            )
            os.chmod(fs_path, new_fs_mode)
    # Goes through the `mode` setter, so the path must be read-write.
    self.mode |= min_bit
@property
def mtime(self) -> float:
    """The modification time; taken from `stat()` and cached if not set yet."""
    known_mtime = self._mtime
    if known_mtime is not None:
        return known_mtime
    derived_mtime = self.stat().st_mtime
    self._mtime = derived_mtime
    return derived_mtime

@mtime.setter
def mtime(self, new_mtime: float) -> None:
    """Set the modification time; requires the path to be read-write."""
    self._rw_check()
    self._mtime = new_mtime
@property
def tar_owner_info(self) -> tuple[str, int, str, int]:
    """Return `(owner name, owner id, group name, group id)` for this path."""
    owner_def, group_def = self._owner, self._group
    owner_name, owner_id = owner_def.entity_name, owner_def.entity_id
    group_name, group_id = group_def.entity_name, group_def.entity_id
    return owner_name, owner_id, group_name, group_id
@property
def _can_replace_inline(self) -> bool:
    """Whether the content may be replaced in place; this implementation always says no."""
    return False
@contextlib.contextmanager
def add_file(
    self,
    name: str,
    *,
    unlink_if_exists: bool = True,
    use_fs_path_mode: bool = False,
    mode: int = 0o0644,
    mtime: float | None = None,
    # Special-case parameters that are not exposed in the API
    fs_basename_matters: bool = False,
    subdir_key: str | None = None,
) -> Iterator["FSPath"]:
    """Create a new file named `name` in this directory; used as a context manager.

    The new path is yielded so the caller can fill in its backing file. Once
    the `with` body has completed, the backing file is verified to still be a
    plain file and the mode is applied.

    :param name: Basename of the new file.
    :param unlink_if_exists: If True, an existing child of that name is unlinked
      (non-recursively) first; if False, an existing child is an error.
    :param use_fs_path_mode: If True, `mode` is applied to the backing file with
      chmod afterwards; otherwise `mode` is set on the new path itself.
    :param mode: The mode for the new file.
    :param mtime: Passed on to the new path as its mtime.
    :param fs_basename_matters: If True, the backing file is created with exactly
      `name` as its basename; requires `subdir_key`.
    :param subdir_key: Passed to `generated_content_dir` to select the directory
      holding the backing file.
    :raises ValueError: If `name` is not a valid basename, the name is taken and
      `unlink_if_exists` is False, or `fs_basename_matters` is set without a
      `subdir_key`.
    :raises TypeError: If this path is not a directory.
    """
    if "/" in name or name in {".", ".."}:
        raise ValueError(f'Invalid file name: "{name}"')
    if not self.is_dir:
        raise TypeError(
            f"Cannot create {self._orphan_safe_path()}/{name}:"
            f" {self._orphan_safe_path()} is not a directory"
        )
    self._rw_check()
    existing = self.get(name)
    if existing is not None:
        if not unlink_if_exists:
            # Name the parameter this method actually has (the message used
            # to refer to a non-existent `exist_ok`).
            raise ValueError(
                f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
                f" and unlink_if_exists was False"
            )
        existing.unlink(recursive=False)

    if fs_basename_matters and subdir_key is None:
        raise ValueError(
            "When fs_basename_matters is True, a subdir_key must be provided"
        )

    directory = generated_content_dir(subdir_key=subdir_key)

    if fs_basename_matters:
        fs_path = os.path.join(directory, name)
        with open(fs_path, "xb") as _:
            # Ensure that the fs_path exists
            pass
        child = FSBackedFilePath(
            name,
            self,
            fs_path,
            replaceable_inline=True,
            mtime=mtime,
        )
        yield child
    else:
        with tempfile.NamedTemporaryFile(
            dir=directory, suffix=f"__{name}", delete=False
        ) as fd:
            fs_path = fd.name
            child = FSBackedFilePath(
                name,
                self,
                fs_path,
                replaceable_inline=True,
                mtime=mtime,
            )
            # Only the (kept) file on disk is needed, not the handle.
            fd.close()
            yield child

    if use_fs_path_mode:
        # Ensure the caller can see the current mode
        os.chmod(fs_path, mode)
    _check_fs_path_is_file(fs_path, unlink_on_error=child)
    child._reset_caches()
    if not use_fs_path_mode:
        child.mode = mode
def insert_file_from_fs_path(
    self,
    name: str,
    fs_path: str,
    *,
    exist_ok: bool = True,
    use_fs_path_mode: bool = False,
    mode: int = 0o0644,
    require_copy_on_write: bool = True,
    follow_symlinks: bool = True,
    reference_path: VirtualPath | None = None,
) -> "FSPath":
    """Add a child named `name` backed by the existing file at `fs_path`.

    :param name: Basename of the new child.
    :param fs_path: File system path providing the content.
    :param exist_ok: If False, an existing child of that name is an error.
    :param use_fs_path_mode: If True, no explicit mode is passed to the new path;
      otherwise `mode` is used.
    :param mode: The mode to use unless `use_fs_path_mode` is True.
    :param require_copy_on_write: If True, the new path is created as not
      replaceable inline.
    :param follow_symlinks: If True, `fs_path` is resolved with a strict
      `os.path.realpath` first. Cannot be combined with `reference_path`.
    :param reference_path: Passed on to the new path. When given, `fs_path` is
      not stat'ed or type-checked.
    :return: The newly created path.
    :raises ValueError: On an invalid name, a name conflict with `exist_ok` False,
      `reference_path` combined with `follow_symlinks`, or when the (resolved)
      path is not a regular file.
    :raises TypeError: If this path is not a directory.
    """
    if "/" in name or name in {".", ".."}:
        raise ValueError(f'Invalid file name: "{name}"')
    if not self.is_dir:
        raise TypeError(
            f"Cannot create {self._orphan_safe_path()}/{name}:"
            f" {self._orphan_safe_path()} is not a directory"
        )
    self._rw_check()
    if name in self and not exist_ok:
        raise ValueError(
            f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
            f" and exist_ok was False"
        )

    backing_path = fs_path
    if follow_symlinks:
        if reference_path is not None:
            raise ValueError(
                "The reference_path cannot be used with follow_symlinks"
            )
        backing_path = os.path.realpath(backing_path, strict=True)

    # None leaves the mode unset on the new path.
    fmode: int | None = None if use_fs_path_mode else mode

    st = None
    if reference_path is None:
        st = os.lstat(backing_path)
        file_type = st.st_mode
        if stat.S_ISDIR(file_type):
            raise ValueError(
                f'The provided path "{fs_path}" is a directory. However, this'
                " method does not support directories"
            )
        if not stat.S_ISREG(file_type):
            if follow_symlinks:
                raise ValueError(
                    f"The resolved fs_path ({backing_path}) was not a file."
                )
            raise ValueError(f"The provided fs_path ({fs_path}) was not a file.")

    return FSBackedFilePath(
        name,
        self,
        backing_path,
        initial_mode=fmode,
        stat_cache=st,
        replaceable_inline=not require_copy_on_write,
        reference_path=reference_path,
    )
def add_symlink(
    self,
    link_name: str,
    link_target: str,
    *,
    reference_path: VirtualPath | None = None,
) -> "FSPath":
    """Create a symlink called `link_name` in this directory

    An existing entry with the same name is unlinked (non-recursively) first.

    :param link_name: Basename of the symlink; it may not contain "/" nor be "." or "..".
    :param link_target: The target of the symlink.
    :param reference_path: Optional reference path passed on to the new symlink.
    :return: The newly created symlink path.
    """
    if link_name in {".", ".."} or "/" in link_name:
        raise ValueError(
            f'Invalid file name: "{link_name}" (it must be a valid basename)'
        )
    if not self.is_dir:
        raise TypeError(
            f"Cannot create {self._orphan_safe_path()}/{link_name}:"
            f" {self._orphan_safe_path()} is not a directory"
        )
    self._rw_check()

    previous_entry = self.get(link_name)
    if previous_entry:
        # Emulate `ln -sf`, which attempts a non-recursive unlink first.
        previous_entry.unlink(recursive=False)

    return SymlinkVirtualPath(
        link_name,
        self,
        link_target,
        reference_path=reference_path,
    )
def mkdir(
    self,
    name: str,
    *,
    reference_path: VirtualPath | None = None,
) -> "FSPath":
    """Create a new directory called `name` in this directory

    :param name: Basename of the directory; it may not contain "/" nor be "." or "..".
    :param reference_path: Optional reference path for the new directory. It must be
      a directory.
    :return: The newly created directory.
    :raises ValueError: If the name is invalid, the reference path is not a directory
      or the name is already present.
    :raises TypeError: If this path is not a directory.
    """
    if "/" in name or name in {".", ".."}:
        raise ValueError(
            f'Invalid file name: "{name}" (it must be a valid basename)'
        )
    if not self.is_dir:
        raise TypeError(
            f"Cannot create {self._orphan_safe_path()}/{name}:"
            f" {self._orphan_safe_path()} is not a directory"
        )
    if reference_path is not None and not reference_path.is_dir:
        # Purely virtual reference paths raise on `.fs_path`; fall back to the virtual
        # path so that the intended ValueError is not masked by a PureVirtualPathError.
        described_path = (
            reference_path.fs_path
            if reference_path.has_fs_path
            else reference_path.path
        )
        raise ValueError(
            f'The provided fs_path "{described_path}" exist but it is not a directory!'
        )
    self._rw_check()

    existing = self.get(name)
    if existing:
        raise ValueError(f"Path {existing.path} already exist")
    return VirtualDirectoryFSPath(name, self, reference_path=reference_path)
def mkdirs(self, path: str) -> "FSPath":
    """Delegate to the parent class's `mkdirs` and narrow the static return type to FSPath"""
    created = super().mkdirs(path)
    return cast("FSPath", created)
@property
def is_read_write(self) -> bool:
    """When true, the file system entry may be mutated

    Detached paths are always considered mutable; attached paths defer to their
    parent directory.

    :return: Whether file system mutations are permitted.
    """
    if not self.is_detached:
        return assume_not_none(self.parent_dir).is_read_write
    return True
def unlink(self, *, recursive: bool = False) -> None:
    """Unlink a file or a directory

    This operation will detach the path from the file system (causing "is_detached" to return True).
    Calling it on an already detached path is a no-op.

    Note that the root directory cannot be deleted.

    :param recursive: If True, then non-empty directories will be unlinked as well removing everything inside them
      as well. When False, an error is raised if the path is a non-empty directory
    """
    if self.is_detached:
        return
    refuse = (not recursive) and any(self.iterdir)
    if refuse:
        raise ValueError(
            f'Refusing to unlink "{self.path}": The directory was not empty and recursive was False'
        )
    # Detaching happens via the .parent_dir setter, which also does the _rw_check() for us.
    self.parent_dir = None
def _reset_caches(self) -> None:
    """Forget the cached mtime and the cached stat result"""
    self._mtime = self._stat_cache = None
def metadata(
    self,
    metadata_type: type[PMT],
    *,
    owning_plugin: str | None = None,
) -> PathMetadataReference[PMT]:
    """Obtain a reference to the metadata of the given type for this path

    :param metadata_type: The type of the metadata.
    :param owning_plugin: The plugin owning the metadata. Defaults to the current plugin.
    :return: A reference to the metadata. When no value exists yet and the path is
      read-only, an always-empty read-only reference is returned instead.
    :raises TypeError: If no value exists yet and the path is detached.
    """
    active_plugin = self._current_plugin()
    owner = active_plugin if owning_plugin is None else owning_plugin
    lookup_key = (owner, metadata_type)
    stored_value = self._metadata.get(lookup_key)
    if stored_value is None:
        if self.is_detached:
            raise TypeError(
                f"Cannot access the metadata {metadata_type.__name__}: The path is detached."
            )
        if not self.is_read_write:
            return AlwaysEmptyReadOnlyMetadataReference(
                owner,
                active_plugin,
                metadata_type,
            )
        # First access on a mutable path: create and remember the value holder.
        stored_value = PathMetadataValue(owner, metadata_type)
        self._metadata[lookup_key] = stored_value
    return PathMetadataReferenceImplementation(
        self,
        active_plugin,
        stored_value,
    )
@contextlib.contextmanager
def replace_fs_path_content(
    self,
    *,
    use_fs_path_mode: bool = False,
) -> Iterator[str]:
    """Context manager for replacing the content of this file via its file system path

    The context manager yields a file system path that the caller may rewrite. Unless
    the path can be replaced inline, the current content is first copied into a new
    file in the generated content directory and this path is re-pointed at the copy.

    :param use_fs_path_mode: When False, the mode that the path had before the
      replacement is re-applied to the file afterwards.
    :raises TypeError: If the path is not a file.
    """
    if not self.is_file:
        raise TypeError(
            f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file'
        )
    self._rw_check()
    target_path = self.fs_path
    if not self._can_replace_inline:
        source_path = self.fs_path
        scratch_dir = generated_content_dir()
        with tempfile.NamedTemporaryFile(
            dir=scratch_dir, suffix=f"__{self.name}", delete=False
        ) as scratch_fd:
            # Only the name is needed; the content is provided by the copy.
            scratch_fd.close()
            _cp_a(source_path, scratch_fd.name)
            target_path = scratch_fd.name
        self._replaced_path(target_path)
        assert self.fs_path == target_path

    preserved_mtime = self._mtime
    if preserved_mtime is not None:
        os.utime(target_path, (preserved_mtime, preserved_mtime))

    # Captured before handing over control, so it can be restored afterwards.
    preserved_mode = self.mode
    yield target_path
    _check_fs_path_is_file(target_path, unlink_on_error=self)
    if not use_fs_path_mode:
        os.chmod(target_path, preserved_mode)
    self._reset_caches()
def _replaced_path(self, new_fs_path: str) -> None:
    """Hook called with the new file system path after a replacement; not implemented here"""
    raise NotImplementedError()
class VirtualFSPathBase(FSPath, ABC):
    """Base class for FSPath variants that are (by default) purely virtual

    By default, `has_fs_path` is False and both `stat()` and `fs_path` raise
    PureVirtualPathError. Subclasses that are backed by the file system override
    `has_fs_path` and `fs_path`.
    """

    __slots__ = ()

    def __init__(
        self,
        basename: str,
        parent: Optional["FSPath"],
        children: dict[str, "FSPath"] | None = None,
        initial_mode: int | None = None,
        mtime: float | None = None,
        stat_cache: os.stat_result | None = None,
    ) -> None:
        super().__init__(
            basename,
            parent,
            children,
            initial_mode=initial_mode,
            mtime=mtime,
            stat_cache=stat_cache,
        )

    @property
    def mtime(self) -> float:
        """The mtime of the path; defaults to (and then remembers) the current time"""
        mtime = self._mtime
        if mtime is None:
            mtime = time.time()
            self._mtime = mtime
        return mtime

    @property
    def has_fs_path(self) -> bool:
        return False

    def stat(self) -> os.stat_result:
        """Stat the underlying file system path

        :raises PureVirtualPathError: If the path is not backed by the file system.
        """
        if not self.has_fs_path:
            raise PureVirtualPathError(
                "stat() is only applicable to paths backed by the file system. The path"
                f" {self._orphan_safe_path()!r} is purely virtual"
            )
        return super().stat()

    @property
    def fs_path(self) -> str:
        """The file system path backing this path

        :raises PureVirtualPathError: If the path is not backed by the file system.
        :raises NotImplementedError: If a subclass reports `has_fs_path` without
          overriding this property.
        """
        if not self.has_fs_path:
            raise PureVirtualPathError(
                "fs_path is only applicable to paths backed by the file system. The path"
                f" {self._orphan_safe_path()!r} is purely virtual"
            )
        # This class has no file system path of its own. Returning `self.fs_path` here
        # would re-enter this very property and recurse until RecursionError.
        raise NotImplementedError(
            f"{type(self).__name__} reports has_fs_path but does not implement fs_path"
        )
class FSRootDir(FSPath):
    """The root directory (".") of an FSPath tree

    The root never has a parent, is never detached and cannot be unlinked. It also
    owns the plugin context and the read/write flag of the tree.
    """

    __slots__ = ("_fs_path", "_fs_read_write", "_plugin_context")

    def __init__(self, fs_path: str | None = None) -> None:
        # Both attributes are assigned before the base initializer runs.
        self._fs_path = fs_path
        self._fs_read_write = True
        super().__init__(".", None, children={}, initial_mode=0o755)
        self._plugin_context = CurrentPluginContextManager("debputy")

    @property
    def is_detached(self) -> bool:
        """The root directory is never detached"""
        return False

    def _orphan_safe_path(self) -> str:
        return self.name

    @property
    def path(self) -> str:
        """The path of the root directory is its name"""
        return self.name

    @property
    def parent_dir(self) -> Optional["FSPath"]:
        """The root directory has no parent"""
        return None

    @parent_dir.setter
    def parent_dir(self, new_parent: FSPath | None) -> None:
        # Assigning `None` is accepted as a no-op; anything else is an error.
        if new_parent is None:
            return
        raise ValueError("The root directory cannot become a non-root directory")

    @property
    def parent_dir_path(self) -> str | None:
        return None

    @property
    def is_dir(self) -> bool:
        return True

    @property
    def is_file(self) -> bool:
        return False

    @property
    def is_symlink(self) -> bool:
        return False

    def readlink(self) -> str:
        """Always raises TypeError, since the root is a directory"""
        raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink')

    @property
    def has_fs_path(self) -> bool:
        """True when the root was created with a file system path"""
        return self._fs_path is not None

    def stat(self) -> os.stat_result:
        """Stat the file system path of the root

        :raises PureVirtualPathError: If the root has no file system path.
        """
        if self.has_fs_path:
            return os.stat(self.fs_path)
        raise PureVirtualPathError(
            "stat() is only applicable to paths backed by the file system. The path"
            f" {self._orphan_safe_path()!r} is purely virtual"
        )

    @property
    def fs_path(self) -> str:
        """The file system path of the root

        :raises PureVirtualPathError: If the root has no file system path.
        """
        if self.has_fs_path:
            return assume_not_none(self._fs_path)
        raise PureVirtualPathError(
            "fs_path is only applicable to paths backed by the file system. The path"
            f" {self._orphan_safe_path()!r} is purely virtual"
        )

    @property
    def is_read_write(self) -> bool:
        """Whether the tree is currently mutable (settable on the root)"""
        return self._fs_read_write

    @is_read_write.setter
    def is_read_write(self, new_value: bool) -> None:
        self._fs_read_write = new_value

    def prune_if_empty_dir(self) -> None:
        # Deliberately a no-op: The root directory is never removed (debputy needs it
        # for technical reasons, so it stays even when empty).
        return

    def unlink(self, *, recursive: bool = False) -> None:
        # The root directory is never removed (debputy needs it for technical reasons),
        # so any attempt to delete it is rejected.
        raise TypeError("Cannot delete the root directory")

    def _current_plugin(self) -> str:
        return self._plugin_context.current_plugin_name

    @contextlib.contextmanager
    def change_plugin_context(self, new_plugin: str) -> Iterator[str]:
        """Context manager that delegates to the plugin context manager of the root"""
        with self._plugin_context.change_plugin_context(new_plugin) as plugin_name:
            yield plugin_name
class VirtualPathWithReference(VirtualFSPathBase, ABC):
    """Virtual path that can borrow mode, mtime, fs_path, stat and content from a reference path"""

    __slots__ = ("_reference_path",)

    def __init__(
        self,
        basename: str,
        parent: FSPath,
        *,
        default_mode: int,
        reference_path: VirtualPath | None = None,
    ) -> None:
        # The mode of the reference path wins over the default mode.
        start_mode = reference_path.mode if reference_path else default_mode
        super().__init__(basename, parent=parent, initial_mode=start_mode)
        self._reference_path = reference_path

    @property
    def has_fs_path(self) -> bool:
        """True when there is a reference path and it has a file system path"""
        reference = self._reference_path
        if reference is None:
            return False
        return reference.has_fs_path

    @property
    def mtime(self) -> float:
        """The mtime; lazily taken from the reference path (if any) and then remembered"""
        cached = self._mtime
        if cached is not None:
            return cached
        reference = self._reference_path
        cached = reference.mtime if reference else super().mtime
        self._mtime = cached
        return cached

    @mtime.setter
    def mtime(self, new_mtime: float) -> None:
        self._rw_check()
        self._mtime = new_mtime

    @property
    def fs_path(self) -> str:
        """The file system path; the one of the reference path where applicable"""
        reference = self._reference_path
        if reference is None:
            return super().fs_path
        if super().has_fs_path and super().fs_path != reference.fs_path:
            return super().fs_path
        return reference.fs_path

    def stat(self) -> os.stat_result:
        """Stat the path; delegated to the reference path where applicable"""
        reference = self._reference_path
        if reference is None:
            return super().stat()
        if super().has_fs_path and super().fs_path != reference.fs_path:
            return super().stat()
        return reference.stat()

    def open(
        self,
        *,
        byte_io: bool = False,
        buffering: int = -1,
    ) -> TextIO | BinaryIO:
        """Open the path for reading; delegated to the reference path when both share the same fs_path"""
        reference = self._reference_path
        shares_fs_path = reference is not None and reference.fs_path == self.fs_path
        if shares_fs_path:
            return reference.open(byte_io=byte_io, buffering=buffering)
        return super().open(byte_io=byte_io, buffering=buffering)
class VirtualDirectoryFSPath(VirtualPathWithReference):
    """A virtual directory, optionally mirroring a reference directory"""

    # `_reference_path` is already declared as a slot by VirtualPathWithReference.
    # Declaring it again here would shadow the base class slot (and waste space).
    __slots__ = ()

    def __init__(
        self,
        basename: str,
        parent: FSPath,
        *,
        reference_path: VirtualPath | None = None,
    ) -> None:
        """Create the directory `basename` inside `parent`

        :param basename: The basename of the directory.
        :param parent: The parent directory.
        :param reference_path: Optional reference path; it must be a directory.
        """
        # The base initializer stores `reference_path`; no need to assign it again.
        super().__init__(
            basename,
            parent,
            reference_path=reference_path,
            default_mode=0o755,
        )
        assert reference_path is None or reference_path.is_dir
        self._ensure_min_mode()

    @property
    def is_dir(self) -> bool:
        return True

    @property
    def is_file(self) -> bool:
        return False

    @property
    def is_symlink(self) -> bool:
        return False

    def readlink(self) -> str:
        """Always raises TypeError, since this path is a directory"""
        raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink')
class SymlinkVirtualPath(VirtualPathWithReference):
    """A virtual symlink that remembers its link target"""

    __slots__ = ("_link_target",)

    def __init__(
        self,
        basename: str,
        parent_dir: FSPath,
        link_target: str,
        *,
        reference_path: VirtualPath | None = None,
    ) -> None:
        super().__init__(
            basename,
            parent_dir,
            reference_path=reference_path,
            default_mode=_SYMLINK_MODE,
        )
        self._link_target = link_target

    @property
    def is_dir(self) -> bool:
        return False

    @property
    def is_file(self) -> bool:
        return False

    @property
    def is_symlink(self) -> bool:
        return True

    def readlink(self) -> str:
        """Return the link target given at construction"""
        return self._link_target

    @property
    def size(self) -> int:
        """The length of the link target (as returned by `readlink()`)"""
        target = self.readlink()
        return len(target)
class FSBackedFilePath(VirtualPathWithReference):
    """A file whose content is provided by a path on the file system"""

    __slots__ = ("_fs_path", "_replaceable_inline")

    def __init__(
        self,
        basename: str,
        parent_dir: FSPath,
        fs_path: str,
        *,
        replaceable_inline: bool = False,
        initial_mode: int | None = None,
        mtime: float | None = None,
        stat_cache: os.stat_result | None = None,
        reference_path: VirtualPath | None = None,
    ) -> None:
        """Create the file `basename` inside `parent_dir`, backed by `fs_path`

        :param replaceable_inline: Whether the content at `fs_path` may be replaced in
          place. Only paths containing "debputy/scratch-dir/" may be marked as such.
        :param initial_mode: When not None, assigned as the mode of the path.
        :param mtime: When not None, used as the mtime of the path.
        :param stat_cache: Stored as the stat cache of the path (may be None).
        :param reference_path: Optional reference path.
        """
        super().__init__(
            basename,
            parent_dir,
            reference_path=reference_path,
            default_mode=0o644,
        )
        self._fs_path = fs_path
        self._replaceable_inline = replaceable_inline
        if initial_mode is not None:
            # An explicit mode overrides whatever the base initializer picked.
            self.mode = initial_mode
        if mtime is not None:
            self._mtime = mtime
        self._stat_cache = stat_cache
        inline_is_acceptable = (
            not replaceable_inline or "debputy/scratch-dir/" in fs_path
        )
        assert (
            inline_is_acceptable
        ), f"{fs_path} should not be inline-replaceable -- {self.path}"
        self._ensure_min_mode()

    @property
    def is_dir(self) -> bool:
        return False

    @property
    def is_file(self) -> bool:
        return True

    @property
    def is_symlink(self) -> bool:
        return False

    def readlink(self) -> str:
        """Always raises TypeError, since this path is a file"""
        raise TypeError(f'"{self._orphan_safe_path()!r}" is a file; not a symlink')

    @property
    def has_fs_path(self) -> bool:
        """Always True; the file is backed by the file system"""
        return True

    @property
    def fs_path(self) -> str:
        """The file system path currently providing the content"""
        return self._fs_path

    @property
    def _can_replace_inline(self) -> bool:
        return self._replaceable_inline

    def _replaced_path(self, new_fs_path: str) -> None:
        # After a replacement, the path points at the new file, no longer mirrors the
        # reference path and may be replaced in place from then on.
        self._replaceable_inline = True
        self._reference_path = None
        self._fs_path = new_fs_path
# Mode used for symlinks: read/write/execute for user, group and others (0o777).
_SYMLINK_MODE = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO
class VirtualTestPath(FSPath):
    """Mock path used by the test suite

    The path can be a directory, a symlink or a file. A file may have in-memory
    `content`, `materialized_content` (written to a temporary file on demand) or an
    explicit `fs_path`.
    """

    __slots__ = (
        "_path_type",
        "_has_fs_path",
        "_fs_path",
        "_link_target",
        "_content",
        "_materialized_content",
    )

    def __init__(
        self,
        basename: str,
        parent_dir: FSPath | None,
        mode: int | None = None,
        mtime: float | None = None,
        is_dir: bool = False,
        has_fs_path: bool | None = False,
        fs_path: str | None = None,
        link_target: str | None = None,
        content: str | None = None,
        materialized_content: str | None = None,
    ) -> None:
        """Create the test path

        :param is_dir: When True, the path is a directory (takes precedence over
          `link_target`).
        :param link_target: When not None (and `is_dir` is False), the path is a symlink.
        :param mode: The initial mode. Defaults to 0o755 for directories and 0o644
          otherwise. Symlinks only accept None or the symlink mode.
        :param has_fs_path: Whether the path has a file system path. When None, it is
          derived from whether `fs_path` is truthy.
        """
        if is_dir:
            self._path_type = PathType.DIRECTORY
        elif link_target is None:
            self._path_type = PathType.FILE
        else:
            self._path_type = PathType.SYMLINK
            if mode is not None and mode != _SYMLINK_MODE:
                raise ValueError(
                    f'Please do not assign a mode to symlinks. Triggered for "{basename}".'
                )
            assert mode is None or mode == _SYMLINK_MODE

        if mode is None:
            initial_mode = 0o755 if is_dir else 0o644
        else:
            initial_mode = mode

        self._link_target = link_target
        self._has_fs_path = bool(fs_path) if has_fs_path is None else has_fs_path
        self._fs_path = fs_path
        self._materialized_content = materialized_content
        super().__init__(
            basename,
            parent=parent_dir,
            initial_mode=initial_mode,
            mtime=mtime,
        )
        self._content = content

    @property
    def is_dir(self) -> bool:
        return self._path_type == PathType.DIRECTORY

    @property
    def is_file(self) -> bool:
        return self._path_type == PathType.FILE

    @property
    def is_symlink(self) -> bool:
        return self._path_type == PathType.SYMLINK

    def readlink(self) -> str:
        """Return the link target

        :raises TypeError: If the path is not a symlink.
        """
        if not self.is_symlink:
            raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
        target = self._link_target
        assert target is not None
        return target

    @property
    def mtime(self) -> float:
        """The mtime; defaults to (and then remembers) the current time"""
        if self._mtime is None:
            self._mtime = time.time()
        return self._mtime

    @mtime.setter
    def mtime(self, new_mtime: float) -> None:
        self._rw_check()
        self._mtime = new_mtime

    @property
    def has_fs_path(self) -> bool:
        return self._has_fs_path

    def stat(self) -> os.stat_result:
        """Stat the file system path of the test path

        :raises PureVirtualPathError: If the path has no file system path or if the
          file system path does not exist.
        """
        if not self.has_fs_path:
            raise PureVirtualPathError(
                "stat() is only applicable to paths backed by the file system. The path"
                f" {self._orphan_safe_path()!r} is purely virtual"
            )
        real_path = self.fs_path
        if real_path is None:
            raise PureVirtualPathError(
                f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path"
                " cannot provide!"
            )
        try:
            return os.stat(real_path)
        except FileNotFoundError as e:
            raise PureVirtualPathError(
                f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path"
                " cannot provide! (An fs_path was provided, but it did not exist)"
            ) from e

    @property
    def size(self) -> int:
        """The size of the path

        In-memory content is measured as UTF-8 bytes, symlinks by the length of their
        target and paths without a file system path are reported as 0.
        """
        in_memory = self._content
        if in_memory is not None:
            return len(in_memory.encode("utf-8"))
        if self.is_symlink:
            return len(self.readlink())
        if self.has_fs_path and self.fs_path is not None:
            return self.stat().st_size
        return 0

    @property
    def fs_path(self) -> str:
        """The file system path of the test path

        When the path only has `materialized_content`, the content is written to a
        temporary file on first access. The file is removed again at exit.

        :raises PureVirtualPathError: If no file system path can be provided.
        """
        if not self.has_fs_path:
            raise PureVirtualPathError(
                "fs_path is only applicable to paths backed by the file system. The path"
                f" {self._orphan_safe_path()!r} is purely virtual"
            )
        if self._fs_path is None and self._materialized_content is not None:
            with tempfile.NamedTemporaryFile(
                mode="w+t",
                encoding="utf-8",
                suffix=f"__{self.name}",
                delete=False,
            ) as tmp_fd:
                materialized_path = tmp_fd.name
                tmp_fd.write(self._materialized_content)
            self._fs_path = materialized_path
            atexit.register(os.unlink, materialized_path)

        real_path = self._fs_path
        if real_path is None:
            raise PureVirtualPathError(
                f"The test wants a real file system entry of {self._orphan_safe_path()!r}, which this "
                " mock path cannot provide!"
            )
        return real_path

    def replace_fs_path_content(
        self,
        *,
        use_fs_path_mode: bool = False,
    ) -> ContextManager[str]:
        """Like the inherited method, but rejects paths created with in-memory `content`"""
        if self._content is not None:
            raise TypeError(
                f"The `replace_fs_path_content()` method was called on {self.path}. Said path was"
                " created with `content` but for this method to work, the path should have been"
                " created with `materialized_content`"
            )
        return super().replace_fs_path_content(use_fs_path_mode=use_fs_path_mode)

    @contextlib.contextmanager
    def open_child(
        self,
        name: str,
        mode: OpenMode = "r",
        buffering: int = -1,
    ) -> TextIO | BinaryIO:
        """Open a child of this path

        Existing children and read modes are delegated to the inherited method.
        Otherwise, an in-memory buffer is provided and a new child with the written
        content is created once the context manager exits normally.
        """
        known_child = self.get(name)
        if known_child or "r" in mode:
            with super().open_child(name, mode, buffering=buffering) as child_fd:
                yield child_fd
            return

        wants_bytes = "b" in mode
        buffer = io.BytesIO(b"") if wants_bytes else io.StringIO("")
        yield buffer
        written = buffer.getvalue()
        text = written.decode("utf-8") if wants_bytes else written
        # The constructor attaches the new child to this directory.
        VirtualTestPath(
            name,
            self,
            mode=0o644,
            content=text,
            has_fs_path=True,
        )

    def open(
        self,
        *,
        byte_io: bool = False,
        buffering: int = -1,
    ) -> TextIO | BinaryIO:
        """Open the path for reading

        In-memory content is served from a fresh in-memory buffer. Otherwise, the
        inherited method is used.

        :raises TestPathWithNonExistentFSPathError: If the inherited method fails with
          FileNotFoundError.
        """
        in_memory = self._content
        if in_memory is not None:
            if byte_io:
                return io.BytesIO(in_memory.encode("utf-8"))
            return io.StringIO(in_memory)
        try:
            return super().open(byte_io=byte_io, buffering=buffering)
        except FileNotFoundError as e:
            raise TestPathWithNonExistentFSPathError(
                f"The test path {self.path} had an fs_path {self._fs_path}, which does not"
                " exist. This exception can only occur in the testsuite. Either have the"
                " test provide content for the path (`virtual_path_def(..., content=...) or,"
                " if that is too painful in general, have the code accept this error as a "
                " test only-case and provide a default."
            ) from e

    def _replaced_path(self, new_fs_path: str) -> None:
        self._fs_path = new_fs_path
class FSOverlayBase(VirtualPathBase, Generic[FSP]):
    """Base class for paths that directly mirror an entry on the file system

    The file type checks are answered via `stat()`, which uses `os.lstat` in this
    class (that is, symlinks are not followed). The parent is only weakly referenced.
    """

    __slots__ = (
        "_path",
        "_fs_path",
        "_parent",
        "__weakref__",
    )

    def __init__(
        self,
        path: str,
        fs_path: str,
        parent: FSP | None,
    ) -> None:
        """Create the path

        :param path: The (virtual) path of the entry.
        :param fs_path: The file system path of the entry. It is normalized, but a
          leading "/" is preserved.
        :param parent: The parent directory (None for the root).
        """
        self._path: str = path
        prefix = "/" if fs_path.startswith("/") else ""
        self._fs_path: str = prefix + _normalize_path(fs_path, with_prefix=False)
        self._parent: ReferenceType[FSP] | None = (
            ref(parent) if parent is not None else None
        )

    @property
    def name(self) -> str:
        """The basename of the path"""
        return os.path.basename(self._path)

    @property
    def path(self) -> str:
        return self._path

    @property
    def parent_dir(self) -> Optional["FSP"]:
        """The parent directory or None if there is no parent

        :raises RuntimeError: If the (weakly referenced) parent was garbage collected.
        """
        parent = self._parent
        if parent is None:
            return None
        resolved = parent()
        if resolved is None:
            raise RuntimeError("Parent was garbage collected!")
        return resolved

    @property
    def fs_path(self) -> str:
        return self._fs_path

    def stat(self) -> os.stat_result:
        """Return the `os.lstat` result of the file system path"""
        return os.lstat(self.fs_path)

    @property
    def is_dir(self) -> bool:
        """Whether the file system path is a directory (False if it does not exist)"""
        # The root path can have a non-existent fs_path (such as d/tmp not always existing)
        # Any other error from stat() is propagated, as is the case for is_file and
        # is_symlink, rather than being printed to stdout and turned into `None`.
        try:
            return stat.S_ISDIR(self.stat().st_mode)
        except FileNotFoundError:
            return False

    @property
    def is_file(self) -> bool:
        """Whether the file system path is a regular file (False if it does not exist)"""
        # The root path can have a non-existent fs_path (such as d/tmp not always existing)
        try:
            return stat.S_ISREG(self.stat().st_mode)
        except FileNotFoundError:
            return False

    @property
    def is_symlink(self) -> bool:
        """Whether the file system path is a symlink (False if it does not exist)"""
        # The root path can have a non-existent fs_path (such as d/tmp not always existing)
        try:
            return stat.S_ISLNK(self.stat().st_mode)
        except FileNotFoundError:
            return False

    @property
    def has_fs_path(self) -> bool:
        return True

    def open(
        self,
        *,
        byte_io: bool = False,
        buffering: int = -1,
    ) -> TextIO | BinaryIO:
        """Open the path for reading

        :param byte_io: When True, open in binary mode. Otherwise, open as UTF-8 text.
        :param buffering: Passed on to the built-in `open`.
        :raises TypeError: If the path is neither a file nor a symlink.
        """
        # Allow symlinks for open here, because we can let the OS resolve the symlink reliably in this
        # case.
        if not self.is_file and not self.is_symlink:
            raise TypeError(
                f"Cannot open {self.path} for reading: It is not a file nor a symlink"
            )

        if byte_io:
            return open(self.fs_path, "rb", buffering=buffering)
        return open(self.fs_path, encoding="utf-8", buffering=buffering)

    def metadata(
        self,
        metadata_type: type[PMT],
        *,
        owning_plugin: str | None = None,
    ) -> PathMetadataReference[PMT]:
        """Return an always-empty, read-only metadata reference

        :param metadata_type: The type of the metadata.
        :param owning_plugin: The plugin owning the metadata. Defaults to the current plugin.
        """
        current_plugin = self._current_plugin()
        if owning_plugin is None:
            owning_plugin = current_plugin
        return AlwaysEmptyReadOnlyMetadataReference(
            owning_plugin,
            current_plugin,
            metadata_type,
        )

    def all_paths(self) -> Iterable["FSControlPath"]:
        """Yield this path followed by everything beneath it

        A directory is yielded before its content and the content of a directory is
        visited in the order provided by `iterdir`.
        """
        yield self
        if not self.is_dir:
            return
        # The stack holds the pending paths in reverse order, so that `pop()` returns
        # them in iteration order.
        stack = list(self.iterdir)
        stack.reverse()
        while stack:
            current = stack.pop()
            yield current
            if current.is_dir:
                stack.extend(reversed(list(current.iterdir)))

    def _resolve_children(
        self, new_child: Callable[[str, str, FSP], FSC]
    ) -> Mapping[str, FSC]:
        """List the directory on the file system and create a child per entry

        :param new_child: Factory called with the path, the file system path and the
          parent (this path) of each child.
        :return: The children by basename, in sorted order (empty for non-directories).
        """
        if not self.is_dir:
            return {}
        dir_path = self.path
        dir_fs_path = self.fs_path
        children = {}
        # os.listdir returns plain names, so they can be sorted directly.
        for name in sorted(os.listdir(dir_fs_path)):
            child_path = os.path.join(dir_path, name) if dir_path != "." else name
            child_fs_path = (
                os.path.join(dir_fs_path, name) if dir_fs_path != "." else name
            )
            children[name] = new_child(
                child_path,
                child_fs_path,
                self,
            )
        return children
class FSROOverlay(FSOverlayBase["FSROOverlay"]):
    """Read-only view of a file system tree

    The stat result, the symlink target and the children are cached on first use.
    Every mutating operation raises DebputyFSIsROError.
    """

    __slots__ = (
        "_stat_cache",
        "_readlink_cache",
        "_children",
        "_stat_failed_cache",
    )

    def __init__(
        self,
        path: str,
        fs_path: str,
        parent: Optional["FSROOverlay"],
    ) -> None:
        super().__init__(path, fs_path, parent=parent)
        self._stat_cache: os.stat_result | None = None
        self._readlink_cache: str | None = None
        self._stat_failed_cache = False
        self._children: Mapping[str, FSROOverlay] | None = None

    @classmethod
    def create_root_dir(cls, path: str, fs_path: str) -> "FSROOverlay":
        """Create a parent-less FSROOverlay for the given path"""
        return FSROOverlay(path, fs_path, None)

    @property
    def iterdir(self) -> Iterable["FSROOverlay"]:
        """Iterate over the children (nothing is yielded for non-directories)"""
        if not self.is_dir:
            return
        if self._children is None:
            self._ensure_children_are_resolved()
        yield from assume_not_none(self._children).values()

    def lookup(self, path: str) -> Optional["FSROOverlay"]:
        """Resolve `path` relative to this directory (or to the root for absolute paths)

        :return: The path found or None, if this is not a directory or a segment
          does not exist.
        :raises ValueError: If the path uses ".." to go above the root directory.
        """
        if not self.is_dir:
            return None
        if self._children is None:
            self._ensure_children_are_resolved()

        absolute, _, path_parts = _split_path(path)
        current = cast("FSROOverlay", _root(self)) if absolute else self
        for dir_part in path_parts:
            if dir_part == ".":
                continue
            if dir_part == "..":
                p = current.parent_dir
                if p is None:
                    raise ValueError(f'The path "{path}" escapes the root dir')
                current = cast("FSROOverlay", p)
                continue
            try:
                current = current[dir_part]
            except KeyError:
                return None
        return current

    def _ensure_children_are_resolved(self) -> None:
        # Compare against None rather than relying on truthiness: An empty directory
        # has an empty (falsy) mapping, which is still a valid cached result.
        if not self.is_dir or self._children is not None:
            return
        self._children = self._resolve_children(
            lambda n, fsp, p: FSROOverlay(n, fsp, p)
        )

    @property
    def is_detached(self) -> bool:
        return False

    def __getitem__(self, key) -> "VirtualPath":
        """Return the child with the given name (or with the name of the given FSPath)

        :raises KeyError: If this is not a directory or there is no such child.
        """
        if not self.is_dir:
            raise KeyError(key)
        if self._children is None:
            self._ensure_children_are_resolved()
        if isinstance(key, FSPath):
            key = key.name
        return self._children[key]

    def __delitem__(self, key) -> None:
        self._error_ro_fs()

    @property
    def is_read_write(self) -> bool:
        return False

    def _rw_check(self) -> None:
        self._error_ro_fs()

    def _error_ro_fs(self) -> NoReturn:
        raise DebputyFSIsROError(
            f'Attempt to write to "{self.path}" failed:'
            " Debputy Virtual File system is R/O."
        )

    def stat(self) -> os.stat_result:
        """Return the (cached) `os.lstat` result of the file system path

        :raises FileNotFoundError: If the path does not exist. The failure is
          remembered, so later calls fail without asking the file system again.
        """
        if self._stat_failed_cache:
            raise FileNotFoundError(
                errno.ENOENT, os.strerror(errno.ENOENT), self.fs_path
            )

        if self._stat_cache is None:
            try:
                self._stat_cache = os.lstat(self.fs_path)
            except FileNotFoundError:
                self._stat_failed_cache = True
                raise
        return self._stat_cache

    @property
    def mode(self) -> int:
        """The permission bits from the stat result"""
        return stat.S_IMODE(self.stat().st_mode)

    @mode.setter
    def mode(self, _unused: int) -> None:
        self._error_ro_fs()

    @property
    def mtime(self) -> float:
        return self.stat().st_mtime

    @mtime.setter
    def mtime(self, new_mtime: float) -> None:
        self._error_ro_fs()

    def readlink(self) -> str:
        """Return the (cached) target of the symlink

        :raises TypeError: If the path is not a symlink.
        """
        if not self.is_symlink:
            raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
        if self._readlink_cache is None:
            self._readlink_cache = os.readlink(self.fs_path)
        return self._readlink_cache

    def chown(
        self,
        owner: StaticFileSystemOwner | None,
        group: StaticFileSystemGroup | None,
    ) -> None:
        self._error_ro_fs()

    def mkdir(self, name: str) -> "VirtualPath":
        self._error_ro_fs()

    def add_file(
        self,
        name: str,
        *,
        unlink_if_exists: bool = True,
        use_fs_path_mode: bool = False,
        mode: int = 0o0644,
        mtime: float | None = None,
    ) -> ContextManager["VirtualPath"]:
        self._error_ro_fs()

    def add_symlink(self, link_name: str, link_target: str) -> "VirtualPath":
        self._error_ro_fs()

    def unlink(self, *, recursive: bool = False) -> None:
        self._error_ro_fs()
class FSROOverlayRootDir(FSROOverlay):
    """Parent-less FSROOverlay that additionally tracks the current plugin context"""

    __slots__ = ("_plugin_context",)

    def __init__(self, path: str, fs_path: str) -> None:
        super().__init__(path, fs_path, None)
        self._plugin_context = CurrentPluginContextManager("debputy")

    def _current_plugin(self) -> str:
        context = self._plugin_context
        return context.current_plugin_name

    @contextlib.contextmanager
    def change_plugin_context(self, new_plugin: str) -> Iterator[str]:
        """Context manager that delegates to the plugin context manager of this root"""
        with self._plugin_context.change_plugin_context(new_plugin) as plugin_name:
            yield plugin_name
2024class FSControlPath(FSOverlayBase["FSControlPath"]):
@property
def iterdir(self) -> Iterable["FSControlPath"]:
    """Iterate over the children, which are re-read from the file system on every access"""
    if not self.is_dir:
        return
    children = self._resolve_children(
        lambda n, fsp, p: FSControlPath(n, fsp, p)
    )
    yield from children.values()
2034 def lookup(self, path: str) -> Optional["FSControlPath"]:
2035 if not self.is_dir:
2036 return None
2038 absolute, _, path_parts = _split_path(path)
2039 current = cast("FSControlPath", _root(self)) if absolute else self
2040 for no, dir_part in enumerate(path_parts):
2041 if dir_part == ".":
2042 continue
2043 if dir_part == "..":
2044 p = current.parent_dir
2045 if p is None:
2046 raise ValueError(f'The path "{path}" escapes the root dir')
2047 current = cast("FSControlPath", p)
2048 continue
2049 try:
2050 current = current[dir_part]
2051 except KeyError:
2052 return None
2053 return current
2055 @property
2056 def is_detached(self) -> bool:
2057 try:
2058 self.stat()
2059 except FileNotFoundError:
2060 return True
2061 else:
2062 return False
2064 def __getitem__(self, key) -> "VirtualPath":
2065 if not self.is_dir:
2066 raise KeyError(key)
2067 children = self._resolve_children(lambda n, fsp, p: FSControlPath(n, fsp, p))
2068 if isinstance(key, FSPath):
2069 key = key.name
2070 return children[key]
2072 def __delitem__(self, key) -> None:
2073 self[key].unlink()
2075 @property
2076 def is_read_write(self) -> bool:
2077 return True
2079 @property
2080 def mode(self) -> int:
2081 return stat.S_IMODE(self.stat().st_mode)
2083 @mode.setter
2084 def mode(self, new_mode: int) -> None:
2085 os.chmod(self.fs_path, new_mode)
2087 @property
2088 def mtime(self) -> float:
2089 return self.stat().st_mtime
2091 @mtime.setter
2092 def mtime(self, new_mtime: float) -> None:
2093 os.utime(self.fs_path, (new_mtime, new_mtime))
2095 def readlink(self) -> str:
2096 if not self.is_symlink:
2097 raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
2098 assert False
2100 def chown(
2101 self,
2102 owner: StaticFileSystemOwner | None,
2103 group: StaticFileSystemGroup | None,
2104 ) -> None:
2105 raise ValueError(
2106 "No need to chown paths in the control.tar: They are always root:root"
2107 )
2109 def mkdir(self, name: str) -> "VirtualPath":
2110 raise TypeError("The control.tar never contains subdirectories.")
2112 @contextlib.contextmanager
2113 def add_file(
2114 self,
2115 name: str,
2116 *,
2117 unlink_if_exists: bool = True,
2118 use_fs_path_mode: bool = False,
2119 mode: int = 0o0644,
2120 mtime: float | None = None,
2121 ) -> ContextManager["VirtualPath"]:
2122 if "/" in name or name in {".", ".."}:
2123 raise ValueError(f'Invalid file name: "{name}"')
2124 if not self.is_dir:
2125 raise TypeError(
2126 f"Cannot create {self._orphan_safe_path()}/{name}:"
2127 f" {self._orphan_safe_path()} is not a directory"
2128 )
2129 self._rw_check()
2130 existing = self.get(name)
2131 if existing is not None:
2132 if not unlink_if_exists:
2133 raise ValueError(
2134 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
2135 f" and exist_ok was False"
2136 )
2137 assert existing.is_file
2139 fs_path = os.path.join(self.fs_path, name)
2140 # This truncates the existing file if any, so we do not have to unlink the previous entry.
2141 with open(fs_path, "wb") as fd:
2142 # Ensure that the fs_path exists and default mode is reasonable
2143 os.chmod(fd.fileno(), mode)
2144 child = FSControlPath(
2145 name,
2146 fs_path,
2147 self,
2148 )
2149 yield child
2150 _check_fs_path_is_file(fs_path, unlink_on_error=child)
2151 child.mode = mode
2153 @contextlib.contextmanager
2154 def replace_fs_path_content(
2155 self,
2156 *,
2157 use_fs_path_mode: bool = False,
2158 ) -> ContextManager[str]:
2159 if not self.is_file:
2160 raise TypeError(
2161 f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file'
2162 )
2163 restore_mode = self.mode if use_fs_path_mode else None
2164 yield self.fs_path
2165 _check_fs_path_is_file(self.fs_path, self)
2166 if restore_mode is not None:
2167 self.mode = restore_mode
2169 def add_symlink(self, link_name: str, link_target: str) -> "VirtualPath":
2170 raise TypeError("The control.tar never contains symlinks.")
2172 def unlink(self, *, recursive: bool = False) -> None:
2173 if self._parent is None:
2174 return
2175 # By virtue of the control FS only containing paths, we can assume `recursive` never
2176 # matters and that `os.unlink` will be sufficient.
2177 assert self.is_file
2178 os.unlink(self.fs_path)
class FSControlRootDir(FSControlPath):
    """Root directory of the file system tree backing a control.tar."""

    @classmethod
    def create_root_dir(cls, fs_path: str) -> "FSControlRootDir":
        """Create the root entry (named ``.``, no parent) backed by ``fs_path``."""
        return cls(".", fs_path, None)

    def insert_file_from_fs_path(
        self,
        name: str,
        fs_path: str,
        *,
        exist_ok: bool = True,
        use_fs_path_mode: bool = False,
        mode: int = 0o0644,
        # Ignored, but accepted for compat with FSPath's variant of this function.
        #  - This is used by install_or_generate_conffiles.
        reference_path: VirtualPath | None = None,  # noqa
    ) -> "FSControlPath":
        """Copy the file at ``fs_path`` into this directory as ``name``.

        :param name: Basename of the new entry; must not contain ``/`` nor be
          ``.`` or ``..``.
        :param fs_path: Source file to copy (symlinks are followed).
        :param exist_ok: If False, an existing entry called ``name`` is an
          error; otherwise it is overwritten.
        :param use_fs_path_mode: If True, the copy gets the permission bits of
          ``fs_path``; otherwise it gets ``mode``.
        :param mode: Mode for the copy when ``use_fs_path_mode`` is False.
        :param reference_path: Ignored.
        :return: The path object for the inserted file.
        :raises ValueError: If ``name`` is invalid, or the entry exists and
          ``exist_ok`` is False.
        :raises TypeError: If this path is not a directory.
        """
        if "/" in name or name in {".", ".."}:
            raise ValueError(f'Invalid file name: "{name}"')
        if not self.is_dir:
            raise TypeError(
                f"Cannot create {self._orphan_safe_path()}/{name}:"
                f" {self._orphan_safe_path()} is not a directory"
            )
        self._rw_check()
        if name in self and not exist_ok:
            raise ValueError(
                f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
                f" and exist_ok was False"
            )

        target_path = os.path.join(self.fs_path, name)
        # The content must be copied in both cases.  Previously the
        # `use_fs_path_mode` branch only called `shutil.copymode`, which copies
        # permission bits but no data and fails if the target does not exist.
        shutil.copyfile(
            fs_path,
            target_path,
            follow_symlinks=True,
        )
        if use_fs_path_mode:
            shutil.copymode(
                fs_path,
                target_path,
                follow_symlinks=True,
            )
        else:
            os.chmod(target_path, mode)
        return cast("FSControlPath", self[name])
def as_path_def(pd: str | PathDef) -> PathDef:
    """Coerce ``pd`` into a ``PathDef``.

    A string is wrapped in ``PathDef(...)``; any other value is returned
    unchanged.
    """
    if isinstance(pd, str):
        return PathDef(pd)
    return pd
def as_path_defs(paths: Iterable[str | PathDef]) -> Iterable[PathDef]:
    """Lazily convert every entry of ``paths`` with ``as_path_def``."""
    for entry in paths:
        yield as_path_def(entry)
def build_virtual_fs(
    paths: Iterable[str | PathDef],
    read_write_fs: bool = False,
) -> "FSPath":
    """Build an in-memory file system tree from a list of path definitions.

    Each entry is either a plain path string or a ``PathDef``.  Paths ending
    in ``/`` become directories, everything else a non-directory.  Missing
    parent directories are created implicitly.  Paths that do not start with
    ``./`` are prefixed with it.  The root may be spelled ``.``, ``./`` or
    ``/``; if it is the first entry, its ``fs_path`` is used for the root.

    :param paths: The path definitions; directories should be listed before
      their children (see the duplicate check below).
    :param read_write_fs: Value assigned to ``is_read_write`` on the root.
    :return: The root directory of the new tree.
    :raises ValueError: On a duplicate definition, or when a path needs a
      parent directory that was defined as a non-directory.
    """
    root_aliases = (".", "./", "/")
    root_dir: FSRootDir | None = None
    directories: dict[str, FSPath] = {}
    non_directories: set[str] = set()

    def _ensure_parent_dirs(p: str) -> None:
        # Walk upwards until a known directory is found, remembering every
        # missing level on the way ...
        current = p.rstrip("/")
        missing_dirs = []
        while True:
            current = os.path.dirname(current)
            if current in directories:
                break
            if current in non_directories:
                raise ValueError(
                    f'Conflicting definition for "{current}".  The path "{p}" wants it as a directory,'
                    ' but it is defined as a non-directory. (Ensure dirs end with "/")'
                )
            missing_dirs.append(current)
        # ... then create the missing levels top-down, so each one finds its
        # parent already registered.
        for dir_path in reversed(missing_dirs):
            parent_dir = directories[os.path.dirname(dir_path)]
            d = VirtualTestPath(os.path.basename(dir_path), parent_dir, is_dir=True)
            directories[dir_path] = d

    for path_def in as_path_defs(paths):
        path = path_def.path_name
        if path in directories or path in non_directories:
            raise ValueError(
                f'Duplicate definition of "{path}". Can be false positive if input is not in'
                ' "correct order" (ensure directories occur before their children)'
            )
        if root_dir is None:
            # The root is created on the first entry; only an explicit root
            # entry can supply its fs_path.
            root_fs_path = None
            if path in root_aliases:
                root_fs_path = path_def.fs_path
            root_dir = FSRootDir(fs_path=root_fs_path)
            directories["."] = root_dir

        if path not in root_aliases and not path.startswith("./"):
            path = "./" + path
        if path in root_aliases:
            # The root already exists at this point.  "/" is skipped here as
            # well: it used to fall through, get reduced to "" and fail with
            # a KeyError on `directories[""]`.
            assert "." in directories
            continue
        _ensure_parent_dirs(path)
        is_dir = False
        if path.endswith("/"):
            path = path[:-1]
            is_dir = True
        directory = directories[os.path.dirname(path)]
        assert not is_dir or not bool(
            path_def.link_target
        ), f"is_dir={is_dir} vs. link_target={path_def.link_target}"
        fs_path = VirtualTestPath(
            os.path.basename(path),
            directory,
            is_dir=is_dir,
            mode=path_def.mode,
            mtime=path_def.mtime,
            has_fs_path=path_def.has_fs_path,
            fs_path=path_def.fs_path,
            link_target=path_def.link_target,
            content=path_def.content,
            materialized_content=path_def.materialized_content,
        )
        assert not fs_path.is_detached
        if fs_path.is_dir:
            directories[fs_path.path] = fs_path
        else:
            non_directories.add(fs_path.path)

    if root_dir is None:
        root_dir = FSRootDir()

    root_dir.is_read_write = read_write_fs
    return root_dir