Coverage for src/debputy/filesystem_scan.py: 66%
1294 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-09-07 09:27 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-09-07 09:27 +0000
1import atexit
2import contextlib
3import dataclasses
4import errno
5import io
6import operator
7import os
8import shutil
9import stat
10import subprocess
11import tempfile
12import time
13from abc import ABC
14from contextlib import suppress
15from typing import (
16 List,
17 Iterable,
18 Dict,
19 Optional,
20 Tuple,
21 Union,
22 Iterator,
23 Mapping,
24 cast,
25 Any,
26 ContextManager,
27 TextIO,
28 BinaryIO,
29 NoReturn,
30 Type,
31 Generic,
32 Callable,
33 TypeVar,
34 overload,
35 Literal,
36)
37from weakref import ref, ReferenceType
39from debputy.exceptions import (
40 PureVirtualPathError,
41 DebputyFSIsROError,
42 DebputyMetadataAccessError,
43 TestPathWithNonExistentFSPathError,
44 SymlinkLoopError,
45)
46from debputy.intermediate_manifest import PathType
47from debputy.manifest_parser.base_types import (
48 ROOT_DEFINITION,
49 StaticFileSystemOwner,
50 StaticFileSystemGroup,
51)
52from debputy.plugin.api.spec import (
53 VirtualPath,
54 PathDef,
55 PathMetadataReference,
56 PMT,
57)
58from debputy.types import VP
59from debputy.util import (
60 generated_content_dir,
61 _error,
62 escape_shell,
63 assume_not_none,
64 _normalize_path,
65 _debug_log,
66)
68BY_BASENAME = operator.attrgetter("name")
70FSP = TypeVar("FSP", bound="FSOverlayBase", covariant=True)
71FSC = TypeVar("FSC", bound="FSOverlayBase", covariant=True)
74BinaryOpenMode = Literal[
75 "rb",
76 "r+b",
77 "wb",
78 "w+b",
79 "xb",
80 "ab",
81]
82TextOpenMode = Literal[
83 "r",
84 "r+",
85 "rt",
86 "r+t",
87 "w",
88 "w+",
89 "wt",
90 "w+t",
91 "x",
92 "xt",
93 "a",
94 "at",
95]
96OpenMode = Union[BinaryOpenMode, TextOpenMode]
99class AlwaysEmptyReadOnlyMetadataReference(PathMetadataReference[PMT]):
100 __slots__ = ("_metadata_type", "_owning_plugin", "_current_plugin")
102 def __init__(
103 self,
104 owning_plugin: str,
105 current_plugin: str,
106 metadata_type: Type[PMT],
107 ) -> None:
108 self._owning_plugin = owning_plugin
109 self._current_plugin = current_plugin
110 self._metadata_type = metadata_type
112 @property
113 def is_present(self) -> bool:
114 return False
116 @property
117 def can_read(self) -> bool:
118 return self._owning_plugin == self._current_plugin
120 @property
121 def can_write(self) -> bool:
122 return False
124 @property
125 def value(self) -> Optional[PMT]:
126 if self.can_read: 126 ↛ 128line 126 didn't jump to line 128 because the condition on line 126 was always true
127 return None
128 raise DebputyMetadataAccessError(
129 f"Cannot read the metadata {self._metadata_type.__name__} owned by"
130 f" {self._owning_plugin} as the metadata has not been made"
131 f" readable to the plugin {self._current_plugin}."
132 )
134 @value.setter
135 def value(self, new_value: PMT) -> None:
136 if self._is_owner:
137 raise DebputyFSIsROError(
138 f"Cannot set the metadata {self._metadata_type.__name__} as the path is read-only"
139 )
140 raise DebputyMetadataAccessError(
141 f"Cannot set the metadata {self._metadata_type.__name__} owned by"
142 f" {self._owning_plugin} as the metadata has not been made"
143 f" read-write to the plugin {self._current_plugin}."
144 )
146 @property
147 def _is_owner(self) -> bool:
148 return self._owning_plugin == self._current_plugin
151@dataclasses.dataclass(slots=True)
152class PathMetadataValue(Generic[PMT]):
153 owning_plugin: str
154 metadata_type: Type[PMT]
155 value: Optional[PMT] = None
157 def can_read_value(self, current_plugin: str) -> bool:
158 return self.owning_plugin == current_plugin
160 def can_write_value(self, current_plugin: str) -> bool:
161 return self.owning_plugin == current_plugin
164class PathMetadataReferenceImplementation(PathMetadataReference[PMT]):
165 __slots__ = ("_owning_path", "_current_plugin", "_path_metadata_value")
167 def __init__(
168 self,
169 owning_path: VirtualPath,
170 current_plugin: str,
171 path_metadata_value: PathMetadataValue[PMT],
172 ) -> None:
173 self._owning_path = owning_path
174 self._current_plugin = current_plugin
175 self._path_metadata_value = path_metadata_value
177 @property
178 def is_present(self) -> bool:
179 if not self.can_read: 179 ↛ 180line 179 didn't jump to line 180 because the condition on line 179 was never true
180 return False
181 return self._path_metadata_value.value is not None
183 @property
184 def can_read(self) -> bool:
185 return self._path_metadata_value.can_read_value(self._current_plugin)
187 @property
188 def can_write(self) -> bool:
189 if not self._path_metadata_value.can_write_value(self._current_plugin): 189 ↛ 190line 189 didn't jump to line 190 because the condition on line 189 was never true
190 return False
191 owning_path = self._owning_path
192 return owning_path.is_read_write and not owning_path.is_detached
194 @property
195 def value(self) -> Optional[PMT]:
196 if self.can_read: 196 ↛ 198line 196 didn't jump to line 198 because the condition on line 196 was always true
197 return self._path_metadata_value.value
198 raise DebputyMetadataAccessError(
199 f"Cannot read the metadata {self._metadata_type_name} owned by"
200 f" {self._owning_plugin} as the metadata has not been made"
201 f" readable to the plugin {self._current_plugin}."
202 )
204 @value.setter
205 def value(self, new_value: PMT) -> None:
206 if not self.can_write: 206 ↛ 207line 206 didn't jump to line 207 because the condition on line 206 was never true
207 m = "set" if new_value is not None else "delete"
208 raise DebputyMetadataAccessError(
209 f"Cannot {m} the metadata {self._metadata_type_name} owned by"
210 f" {self._owning_plugin} as the metadata has not been made"
211 f" read-write to the plugin {self._current_plugin}."
212 )
213 owning_path = self._owning_path
214 if not owning_path.is_read_write: 214 ↛ 215line 214 didn't jump to line 215 because the condition on line 214 was never true
215 raise DebputyFSIsROError(
216 f"Cannot set the metadata {self._metadata_type_name} as the path is read-only"
217 )
218 if owning_path.is_detached: 218 ↛ 219line 218 didn't jump to line 219 because the condition on line 218 was never true
219 raise TypeError(
220 f"Cannot set the metadata {self._metadata_type_name} as the path is detached"
221 )
222 self._path_metadata_value.value = new_value
224 @property
225 def _is_owner(self) -> bool:
226 return self._owning_plugin == self._current_plugin
228 @property
229 def _owning_plugin(self) -> str:
230 return self._path_metadata_value.owning_plugin
232 @property
233 def _metadata_type_name(self) -> str:
234 return self._path_metadata_value.metadata_type.__name__
237def _cp_a(source: str, dest: str) -> None:
238 cmd = ["cp", "-a", source, dest]
239 try:
240 subprocess.check_call(cmd)
241 except subprocess.CalledProcessError:
242 full_command = escape_shell(*cmd)
243 _error(
244 f"The attempt to make an internal copy of {escape_shell(source)} failed. Please review the output of cp"
245 f" above to understand what went wrong. The full command was: {full_command}"
246 )
249def _split_path(path: str) -> Tuple[bool, bool, List[str]]:
250 must_be_dir = True if path.endswith("/") else False
251 absolute = False
252 if path.startswith("/"):
253 absolute = True
254 path = "." + path
255 path_parts = path.rstrip("/").split("/")
256 if must_be_dir:
257 path_parts.append(".")
258 return absolute, must_be_dir, path_parts
261def _root(path: VP) -> VP:
262 current = path
263 while True:
264 parent = current.parent_dir
265 if parent is None:
266 return current
267 current = parent
270def _check_fs_path_is_file(
271 fs_path: str,
272 unlink_on_error: Optional["VirtualPath"] = None,
273) -> None:
274 had_issue = False
275 try:
276 # FIXME: Check mode, and use the Virtual Path to cache the result as a side-effect
277 st = os.lstat(fs_path)
278 except FileNotFoundError:
279 had_issue = True
280 else:
281 if not stat.S_ISREG(st.st_mode) or st.st_nlink > 1: 281 ↛ 282line 281 didn't jump to line 282 because the condition on line 281 was never true
282 had_issue = True
283 if not had_issue: 283 ↛ 286line 283 didn't jump to line 286 because the condition on line 283 was always true
284 return
286 if unlink_on_error:
287 with suppress(FileNotFoundError):
288 os.unlink(fs_path)
289 raise TypeError(
290 "The provided FS backing file was deleted, replaced with a non-file entry or it was hard"
291 " linked to another file. The entry has been disconnected."
292 )
295class CurrentPluginContextManager:
296 __slots__ = ("_plugin_names",)
298 def __init__(self, initial_plugin_name: str) -> None:
299 self._plugin_names = [initial_plugin_name]
301 @property
302 def current_plugin_name(self) -> str:
303 return self._plugin_names[-1]
305 @contextlib.contextmanager
306 def change_plugin_context(self, new_plugin_name: str) -> Iterator[str]:
307 self._plugin_names.append(new_plugin_name)
308 yield new_plugin_name
309 self._plugin_names.pop()
312class VirtualPathBase(VirtualPath, ABC):
313 __slots__ = ()
315 def _orphan_safe_path(self) -> str:
316 return self.path
318 def _rw_check(self) -> None:
319 if not self.is_read_write:
320 raise DebputyFSIsROError(
321 f'Attempt to write to "{self._orphan_safe_path()}" failed:'
322 " Debputy Virtual File system is R/O."
323 )
325 def lookup(self, path: str) -> Optional["VirtualPathBase"]:
326 match, missing = self.attempt_lookup(path)
327 if missing:
328 return None
329 return match
331 def attempt_lookup(self, path: str) -> Tuple["VirtualPathBase", List[str]]:
332 if self.is_detached: 332 ↛ 333line 332 didn't jump to line 333 because the condition on line 332 was never true
333 raise ValueError(
334 f'Cannot perform lookup via "{self._orphan_safe_path()}": The path is detached'
335 )
336 absolute, must_be_dir, path_parts = _split_path(path)
337 current = _root(self) if absolute else self
338 path_parts.reverse()
339 link_expansions = set()
340 while path_parts:
341 dir_part = path_parts.pop()
342 if dir_part == ".":
343 continue
344 if dir_part == "..":
345 p = current.parent_dir
346 if p is None: 346 ↛ 347line 346 didn't jump to line 347 because the condition on line 346 was never true
347 raise ValueError(f'The path "{path}" escapes the root dir')
348 current = p
349 continue
350 try:
351 current = current[dir_part]
352 except KeyError:
353 path_parts.append(dir_part)
354 path_parts.reverse()
355 if must_be_dir:
356 path_parts.pop()
357 return current, path_parts
358 if current.is_symlink and path_parts:
359 if current.path in link_expansions:
360 # This is our loop detection for now. It might have some false positives where you
361 # could safely resolve the same symlink twice. However, given that this use-case is
362 # basically non-existent in practice for packaging, we just stop here for now.
363 raise SymlinkLoopError(
364 f'The path "{path}" traversed the symlink "{current.path}" multiple'
365 " times. Currently, traversing the same symlink twice is considered"
366 " a loop by `debputy` even if the path would eventually resolve."
367 " Consider filing a feature request if you have a benign case that"
368 " triggers this error."
369 )
370 link_expansions.add(current.path)
371 link_target = current.readlink()
372 link_absolute, _, link_path_parts = _split_path(link_target)
373 if link_absolute:
374 current = _root(current)
375 else:
376 current = assume_not_none(current.parent_dir)
377 link_path_parts.reverse()
378 path_parts.extend(link_path_parts)
379 return current, []
381 def mkdirs(self, path: str) -> "VirtualPath":
382 current: VirtualPath
383 current, missing_parts = self.attempt_lookup(
384 f"{path}/" if not path.endswith("/") else path
385 )
386 if not current.is_dir: 386 ↛ 387line 386 didn't jump to line 387 because the condition on line 386 was never true
387 raise ValueError(
388 f'mkdirs of "{path}" failed: This would require {current.path} to not exist OR be'
389 " a directory. However, that path exist AND is a not directory."
390 )
391 for missing_part in missing_parts:
392 assert missing_part not in (".", "..")
393 current = current.mkdir(missing_part)
394 return current
396 def prune_if_empty_dir(self) -> None:
397 """Remove this and all (now) empty parent directories
399 Same as: `rmdir --ignore-fail-on-non-empty --parents`
401 This operation may cause the path (and any of its parent directories) to become "detached"
402 and therefore unsafe to use in further operations.
403 """
404 self._rw_check()
406 if not self.is_dir: 406 ↛ 407line 406 didn't jump to line 407 because the condition on line 406 was never true
407 raise TypeError(f"{self._orphan_safe_path()} is not a directory")
408 if any(self.iterdir):
409 return
410 parent_dir = assume_not_none(self.parent_dir)
412 # Recursive does not matter; we already know the directory is empty.
413 self.unlink()
415 # Note: The root dir must never be deleted. This works because when delegating it to the root
416 # directory, its implementation of this method is a no-op. If this is later rewritten to an
417 # inline loop (rather than recursion), be sure to preserve this feature.
418 parent_dir.prune_if_empty_dir()
420 def _current_plugin(self) -> str:
421 if self.is_detached: 421 ↛ 422line 421 didn't jump to line 422 because the condition on line 421 was never true
422 raise TypeError("Cannot resolve the current plugin; path is detached")
423 current = self
424 while True:
425 next_parent = current.parent_dir
426 if next_parent is None:
427 break
428 current = next_parent
429 assert current is not None
430 return cast("FSRootDir", current)._current_plugin()
432 @overload
433 def open_child( 433 ↛ exitline 433 didn't return from function 'open_child' because
434 self,
435 name: str,
436 mode: TextOpenMode = ...,
437 buffering: int = -1,
438 ) -> TextIO: ...
440 @overload
441 def open_child( 441 ↛ exitline 441 didn't return from function 'open_child' because
442 self,
443 name: str,
444 mode: BinaryOpenMode = ...,
445 buffering: int = -1,
446 ) -> BinaryIO: ...
448 @contextlib.contextmanager
449 def open_child(
450 self,
451 name: str,
452 mode: OpenMode = "r",
453 buffering: int = -1,
454 ) -> Union[TextIO, BinaryIO]:
455 """Open a child path of the current directory in a given mode. Usually used with a context manager
457 The path is opened according to the `mode` parameter similar to the built-in `open` in Python.
458 The following symbols are accepted with the same meaning as Python's open:
459 * `r`
460 * `w`
461 * `x`
462 * `a`
463 * `+`
464 * `b`
465 * `t`
467 Like Python's `open`, this can create a new file provided the file system is in read-write mode.
468 Though unlike Python's open, symlinks are not followed and cannot be opened. Any newly created
469 file will start with (os.stat) mode of 0o0644. The (os.stat) mode of existing paths are left
470 as-is.
472 :param name: The name of the child path to open. Must be a basename.
473 :param mode: The mode to open the file with such as `r` or `w`. See Python's `open` for more
474 examples.
475 :param buffering: Same as open(..., buffering=...) where supported. Notably during
476 testing, the content may be purely in memory and use a BytesIO/StringIO
477 (which does not accept that parameter, but then it is buffered in a different way)
478 :return: The file handle.
479 """
480 existing = self.get(name)
481 if "r" in mode: 481 ↛ 482line 481 didn't jump to line 482 because the condition on line 481 was never true
482 if existing is None:
483 raise ValueError(
484 f"Path {self.path}/{name} does not exist and mode had `r`"
485 )
486 if "+" not in mode:
487 with existing.open(byte_io="b" in mode, buffering=buffering) as fd:
488 yield fd
490 encoding = None if "b" in mode else "utf-8"
492 if existing: 492 ↛ 493line 492 didn't jump to line 493 because the condition on line 492 was never true
493 if "x" in mode:
494 raise ValueError(
495 f"Path {existing.path} already exists and mode had `x`"
496 )
497 with (
498 existing.replace_fs_path_content() as fs_path,
499 open(fs_path, mode, encoding=encoding) as fd,
500 ):
501 yield fd
502 else:
503 assert "r" not in mode
504 # unlink_if_exists=False as a precaution (the "already exists" should not end up here).
505 with (
506 self.add_file(name, mode=0o0644, unlink_if_exists=False) as new_path,
507 open(new_path.fs_path, mode, encoding=encoding) as fd,
508 ):
509 yield fd
512class FSPath(VirtualPathBase, ABC):
513 __slots__ = (
514 "_basename",
515 "_parent_dir",
516 "_children",
517 "_path_cache",
518 "_parent_path_cache",
519 "_last_known_parent_path",
520 "_mode",
521 "_owner",
522 "_group",
523 "_mtime",
524 "_stat_cache",
525 "_metadata",
526 "__weakref__",
527 )
529 def __init__(
530 self,
531 basename: str,
532 parent: Optional["FSPath"],
533 children: Optional[Dict[str, "FSPath"]] = None,
534 initial_mode: Optional[int] = None,
535 mtime: Optional[float] = None,
536 stat_cache: Optional[os.stat_result] = None,
537 ) -> None:
538 self._basename = basename
539 self._path_cache: Optional[str] = None
540 self._parent_path_cache: Optional[str] = None
541 self._children = children
542 self._last_known_parent_path: Optional[str] = None
543 self._mode = initial_mode
544 self._mtime = mtime
545 self._stat_cache = stat_cache
546 self._metadata: Dict[Tuple[str, Type[Any]], PathMetadataValue[Any]] = {}
547 self._owner = ROOT_DEFINITION
548 self._group = ROOT_DEFINITION
550 # The self._parent_dir = None is to create `_parent_dir` because the parent_dir setter calls
551 # is_orphaned, which assumes self._parent_dir is an attribute.
552 self._parent_dir: Optional[ReferenceType["FSPath"]] = None
553 if parent is not None:
554 self.parent_dir = parent
556 def __repr__(self) -> str:
557 return (
558 f"{self.__class__.__name__}({self._orphan_safe_path()!r},"
559 f" is_file={self.is_file},"
560 f" is_dir={self.is_dir},"
561 f" is_symlink={self.is_symlink},"
562 f" has_fs_path={self.has_fs_path},"
563 f" children_len={len(self._children) if self._children else 0})"
564 )
566 @property
567 def name(self) -> str:
568 return self._basename
570 @name.setter
571 def name(self, new_name: str) -> None:
572 self._rw_check()
573 if new_name == self._basename: 573 ↛ 574line 573 didn't jump to line 574 because the condition on line 573 was never true
574 return
575 if self.is_detached: 575 ↛ 576line 575 didn't jump to line 576 because the condition on line 575 was never true
576 self._basename = new_name
577 return
578 self._rw_check()
579 parent = self.parent_dir
580 # This little parent_dir dance ensures the parent dir detects the rename properly
581 self.parent_dir = None
582 self._basename = new_name
583 self.parent_dir = parent
585 @property
586 def iterdir(self) -> Iterable["FSPath"]:
587 if self._children is not None:
588 yield from self._children.values()
590 def all_paths(self) -> Iterable["FSPath"]:
591 yield self
592 if not self.is_dir:
593 return
594 by_basename = BY_BASENAME
595 stack = sorted(self.iterdir, key=by_basename, reverse=True)
596 while stack:
597 current = stack.pop()
598 yield current
599 if current.is_dir and not current.is_detached:
600 stack.extend(sorted(current.iterdir, key=by_basename, reverse=True))
602 def walk(self) -> Iterable[Tuple["FSPath", List["FSPath"]]]:
603 # FIXME: can this be more "os.walk"-like without making it harder to implement?
604 if not self.is_dir: 604 ↛ 605line 604 didn't jump to line 605 because the condition on line 604 was never true
605 yield self, []
606 return
607 by_basename = BY_BASENAME
608 stack = [self]
609 while stack:
610 current = stack.pop()
611 children = sorted(current.iterdir, key=by_basename)
612 assert not children or current.is_dir
613 yield current, children
614 # Removing the directory counts as discarding the children.
615 if not current.is_detached: 615 ↛ 609line 615 didn't jump to line 609 because the condition on line 615 was always true
616 stack.extend(reversed(children))
618 def _orphan_safe_path(self) -> str:
619 if not self.is_detached or self._last_known_parent_path is not None: 619 ↛ 621line 619 didn't jump to line 621 because the condition on line 619 was always true
620 return self.path
621 return f"<orphaned>/{self.name}"
623 @property
624 def is_detached(self) -> bool:
625 parent = self._parent_dir
626 if parent is None:
627 return True
628 resolved_parent = parent()
629 if resolved_parent is None: 629 ↛ 630line 629 didn't jump to line 630 because the condition on line 629 was never true
630 return True
631 return resolved_parent.is_detached
633 # The __getitem__ behaves like __getitem__ from Dict but __iter__ would ideally work like a Sequence.
634 # However, that does not feel compatible, so lets force people to use .children instead for the Sequence
635 # behavior to avoid surprises for now.
636 # (Maybe it is a non-issue, but it is easier to add the API later than to remove it once we have committed
637 # to using it)
638 __iter__ = None
640 def __getitem__(self, key) -> "FSPath":
641 if self._children is None:
642 raise KeyError(
643 f"{key} (note: {self._orphan_safe_path()!r} has no children)"
644 )
645 if isinstance(key, FSPath): 645 ↛ 646line 645 didn't jump to line 646 because the condition on line 645 was never true
646 key = key.name
647 return self._children[key]
649 def __delitem__(self, key) -> None:
650 self._rw_check()
651 children = self._children
652 if children is None: 652 ↛ 653line 652 didn't jump to line 653 because the condition on line 652 was never true
653 raise KeyError(key)
654 del children[key]
656 def get(self, key: str) -> "Optional[FSPath]":
657 try:
658 return self[key]
659 except KeyError:
660 return None
662 def __contains__(self, item: object) -> bool:
663 if isinstance(item, VirtualPath): 663 ↛ 664line 663 didn't jump to line 664 because the condition on line 663 was never true
664 return item.parent_dir is self
665 if not isinstance(item, str): 665 ↛ 666line 665 didn't jump to line 666 because the condition on line 665 was never true
666 return False
667 m = self.get(item)
668 return m is not None
670 def _add_child(self, child: "FSPath") -> None:
671 self._rw_check()
672 if not self.is_dir: 672 ↛ 673line 672 didn't jump to line 673 because the condition on line 672 was never true
673 raise TypeError(f"{self._orphan_safe_path()!r} is not a directory")
674 if self._children is None:
675 self._children = {}
677 conflict_child = self.get(child.name)
678 if conflict_child is not None: 678 ↛ 679line 678 didn't jump to line 679 because the condition on line 678 was never true
679 conflict_child.unlink(recursive=True)
680 self._children[child.name] = child
682 @property
683 def tar_path(self) -> str:
684 path = self.path
685 if self.is_dir:
686 return path + "/"
687 return path
689 @property
690 def path(self) -> str:
691 parent_path = self.parent_dir_path
692 if (
693 self._parent_path_cache is not None
694 and self._parent_path_cache == parent_path
695 ):
696 return assume_not_none(self._path_cache)
697 if parent_path is None: 697 ↛ 698line 697 didn't jump to line 698 because the condition on line 697 was never true
698 raise ReferenceError(
699 f"The path {self.name} is detached! {self.__class__.__name__}"
700 )
701 self._parent_path_cache = parent_path
702 ret = os.path.join(parent_path, self.name)
703 self._path_cache = ret
704 return ret
706 @property
707 def parent_dir(self) -> Optional["FSPath"]:
708 p_ref = self._parent_dir
709 p = p_ref() if p_ref is not None else None
710 if p is None: 710 ↛ 711line 710 didn't jump to line 711 because the condition on line 710 was never true
711 raise ReferenceError(
712 f"The path {self.name} is detached! {self.__class__.__name__}"
713 )
714 return p
716 @parent_dir.setter
717 def parent_dir(self, new_parent: Optional["FSPath"]) -> None:
718 self._rw_check()
719 if new_parent is not None:
720 if not new_parent.is_dir: 720 ↛ 721line 720 didn't jump to line 721 because the condition on line 720 was never true
721 raise ValueError(
722 f"The parent {new_parent._orphan_safe_path()} must be a directory"
723 )
724 new_parent._rw_check()
725 old_parent = None
726 self._last_known_parent_path = None
727 if not self.is_detached:
728 old_parent = self.parent_dir
729 old_parent_children = assume_not_none(assume_not_none(old_parent)._children)
730 del old_parent_children[self.name]
731 if new_parent is not None:
732 self._parent_dir = ref(new_parent)
733 new_parent._add_child(self)
734 else:
735 if old_parent is not None and not old_parent.is_detached: 735 ↛ 737line 735 didn't jump to line 737 because the condition on line 735 was always true
736 self._last_known_parent_path = old_parent.path
737 self._parent_dir = None
738 self._parent_path_cache = None
740 @property
741 def parent_dir_path(self) -> Optional[str]:
742 if self.is_detached: 742 ↛ 743line 742 didn't jump to line 743 because the condition on line 742 was never true
743 return self._last_known_parent_path
744 return assume_not_none(self.parent_dir).path
746 def chown(
747 self,
748 owner: Optional[StaticFileSystemOwner],
749 group: Optional[StaticFileSystemGroup],
750 ) -> None:
751 """Change the owner/group of this path
753 :param owner: The desired owner definition for this path. If None, then no change of owner is performed.
754 :param group: The desired group definition for this path. If None, then no change of group is performed.
755 """
756 self._rw_check()
758 if owner is not None:
759 self._owner = owner.ownership_definition
760 if group is not None:
761 self._group = group.ownership_definition
763 def stat(self) -> os.stat_result:
764 st = self._stat_cache
765 if st is None:
766 st = self._uncached_stat()
767 self._stat_cache = st
768 return st
770 def _uncached_stat(self) -> os.stat_result:
771 return os.lstat(self.fs_path)
773 @property
774 def mode(self) -> int:
775 current_mode = self._mode
776 if current_mode is None: 776 ↛ 777line 776 didn't jump to line 777 because the condition on line 776 was never true
777 current_mode = stat.S_IMODE(self.stat().st_mode)
778 self._mode = current_mode
779 return current_mode
781 @mode.setter
782 def mode(self, new_mode: int) -> None:
783 self._rw_check()
784 min_bit = 0o700 if self.is_dir else 0o400
785 if (new_mode & min_bit) != min_bit: 785 ↛ 786line 785 didn't jump to line 786 because the condition on line 785 was never true
786 omode = oct(new_mode)[2:]
787 omin = oct(min_bit)[2:]
788 raise ValueError(
789 f'Attempt to set mode of path "{self._orphan_safe_path()}" to {omode} rejected;'
790 f" Minimum requirements are {omin} (read-bit and, for dirs, exec bit for user)."
791 " There are no paths that do not need these requirements met and they can cause"
792 " problems during build or on the final system."
793 )
794 self._mode = new_mode
796 def _ensure_min_mode(self) -> None:
797 min_bit = 0o700 if self.is_dir else 0o600
798 if self.has_fs_path and (self.mode & 0o600) != 0o600: 798 ↛ 799line 798 didn't jump to line 799 because the condition on line 798 was never true
799 try:
800 fs_path = self.fs_path
801 except TestPathWithNonExistentFSPathError:
802 pass
803 else:
804 st = os.stat(fs_path)
805 new_fs_mode = stat.S_IMODE(st.st_mode) | min_bit
806 _debug_log(
807 f"Applying chmod {oct(min_bit)[2:]} {fs_path} ({self.path}) to avoid problems down the line"
808 )
809 os.chmod(fs_path, new_fs_mode)
810 self.mode |= min_bit
812 @property
813 def mtime(self) -> float:
814 mtime = self._mtime
815 if mtime is None:
816 mtime = self.stat().st_mtime
817 self._mtime = mtime
818 return mtime
820 @mtime.setter
821 def mtime(self, new_mtime: float) -> None:
822 self._rw_check()
823 self._mtime = new_mtime
825 @property
826 def tar_owner_info(self) -> Tuple[str, int, str, int]:
827 owner = self._owner
828 group = self._group
829 return (
830 owner.entity_name,
831 owner.entity_id,
832 group.entity_name,
833 group.entity_id,
834 )
836 @property
837 def _can_replace_inline(self) -> bool:
838 return False
840 @contextlib.contextmanager
841 def add_file(
842 self,
843 name: str,
844 *,
845 unlink_if_exists: bool = True,
846 use_fs_path_mode: bool = False,
847 mode: int = 0o0644,
848 mtime: Optional[float] = None,
849 # Special-case parameters that are not exposed in the API
850 fs_basename_matters: bool = False,
851 subdir_key: Optional[str] = None,
852 ) -> Iterator["FSPath"]:
853 if "/" in name or name in {".", ".."}: 853 ↛ 854line 853 didn't jump to line 854 because the condition on line 853 was never true
854 raise ValueError(f'Invalid file name: "{name}"')
855 if not self.is_dir: 855 ↛ 856line 855 didn't jump to line 856 because the condition on line 855 was never true
856 raise TypeError(
857 f"Cannot create {self._orphan_safe_path()}/{name}:"
858 f" {self._orphan_safe_path()} is not a directory"
859 )
860 self._rw_check()
861 existing = self.get(name)
862 if existing is not None: 862 ↛ 863line 862 didn't jump to line 863 because the condition on line 862 was never true
863 if not unlink_if_exists:
864 raise ValueError(
865 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
866 f" and exist_ok was False"
867 )
868 existing.unlink(recursive=False)
870 if fs_basename_matters and subdir_key is None: 870 ↛ 871line 870 didn't jump to line 871 because the condition on line 870 was never true
871 raise ValueError(
872 "When fs_basename_matters is True, a subdir_key must be provided"
873 )
875 directory = generated_content_dir(subdir_key=subdir_key)
877 if fs_basename_matters: 877 ↛ 878line 877 didn't jump to line 878 because the condition on line 877 was never true
878 fs_path = os.path.join(directory, name)
879 with open(fs_path, "xb") as _:
880 # Ensure that the fs_path exists
881 pass
882 child = FSBackedFilePath(
883 name,
884 self,
885 fs_path,
886 replaceable_inline=True,
887 mtime=mtime,
888 )
889 yield child
890 else:
891 with tempfile.NamedTemporaryFile(
892 dir=directory, suffix=f"__{name}", delete=False
893 ) as fd:
894 fs_path = fd.name
895 child = FSBackedFilePath(
896 name,
897 self,
898 fs_path,
899 replaceable_inline=True,
900 mtime=mtime,
901 )
902 fd.close()
903 yield child
905 if use_fs_path_mode: 905 ↛ 907line 905 didn't jump to line 907 because the condition on line 905 was never true
906 # Ensure the caller can see the current mode
907 os.chmod(fs_path, mode)
908 _check_fs_path_is_file(fs_path, unlink_on_error=child)
909 child._reset_caches()
910 if not use_fs_path_mode: 910 ↛ exitline 910 didn't return from function 'add_file' because the condition on line 910 was always true
911 child.mode = mode
913 def insert_file_from_fs_path(
914 self,
915 name: str,
916 fs_path: str,
917 *,
918 exist_ok: bool = True,
919 use_fs_path_mode: bool = False,
920 mode: int = 0o0644,
921 require_copy_on_write: bool = True,
922 follow_symlinks: bool = True,
923 reference_path: Optional[VirtualPath] = None,
924 ) -> "FSPath":
925 if "/" in name or name in {".", ".."}: 925 ↛ 926line 925 didn't jump to line 926 because the condition on line 925 was never true
926 raise ValueError(f'Invalid file name: "{name}"')
927 if not self.is_dir: 927 ↛ 928line 927 didn't jump to line 928 because the condition on line 927 was never true
928 raise TypeError(
929 f"Cannot create {self._orphan_safe_path()}/{name}:"
930 f" {self._orphan_safe_path()} is not a directory"
931 )
932 self._rw_check()
933 if name in self and not exist_ok: 933 ↛ 934line 933 didn't jump to line 934 because the condition on line 933 was never true
934 raise ValueError(
935 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
936 f" and exist_ok was False"
937 )
938 new_fs_path = fs_path
939 if follow_symlinks:
940 if reference_path is not None: 940 ↛ 941line 940 didn't jump to line 941 because the condition on line 940 was never true
941 raise ValueError(
942 "The reference_path cannot be used with follow_symlinks"
943 )
944 new_fs_path = os.path.realpath(new_fs_path, strict=True)
946 fmode: Optional[int] = mode
947 if use_fs_path_mode:
948 fmode = None
950 st = None
951 if reference_path is None:
952 st = os.lstat(new_fs_path)
953 if stat.S_ISDIR(st.st_mode): 953 ↛ 954line 953 didn't jump to line 954 because the condition on line 953 was never true
954 raise ValueError(
955 f'The provided path "{fs_path}" is a directory. However, this'
956 " method does not support directories"
957 )
959 if not stat.S_ISREG(st.st_mode): 959 ↛ 960line 959 didn't jump to line 960 because the condition on line 959 was never true
960 if follow_symlinks:
961 raise ValueError(
962 f"The resolved fs_path ({new_fs_path}) was not a file."
963 )
964 raise ValueError(f"The provided fs_path ({fs_path}) was not a file.")
965 return FSBackedFilePath(
966 name,
967 self,
968 new_fs_path,
969 initial_mode=fmode,
970 stat_cache=st,
971 replaceable_inline=not require_copy_on_write,
972 reference_path=reference_path,
973 )
975 def add_symlink(
976 self,
977 link_name: str,
978 link_target: str,
979 *,
980 reference_path: Optional[VirtualPath] = None,
981 ) -> "FSPath":
982 if "/" in link_name or link_name in {".", ".."}: 982 ↛ 983line 982 didn't jump to line 983 because the condition on line 982 was never true
983 raise ValueError(
984 f'Invalid file name: "{link_name}" (it must be a valid basename)'
985 )
986 if not self.is_dir: 986 ↛ 987line 986 didn't jump to line 987 because the condition on line 986 was never true
987 raise TypeError(
988 f"Cannot create {self._orphan_safe_path()}/{link_name}:"
989 f" {self._orphan_safe_path()} is not a directory"
990 )
991 self._rw_check()
993 existing = self.get(link_name)
994 if existing: 994 ↛ 996line 994 didn't jump to line 996 because the condition on line 994 was never true
995 # Emulate ln -sf with attempts a non-recursive unlink first.
996 existing.unlink(recursive=False)
998 return SymlinkVirtualPath(
999 link_name,
1000 self,
1001 link_target,
1002 reference_path=reference_path,
1003 )
1005 def mkdir(
1006 self,
1007 name: str,
1008 *,
1009 reference_path: Optional[VirtualPath] = None,
1010 ) -> "FSPath":
1011 if "/" in name or name in {".", ".."}: 1011 ↛ 1012line 1011 didn't jump to line 1012 because the condition on line 1011 was never true
1012 raise ValueError(
1013 f'Invalid file name: "{name}" (it must be a valid basename)'
1014 )
1015 if not self.is_dir: 1015 ↛ 1016line 1015 didn't jump to line 1016 because the condition on line 1015 was never true
1016 raise TypeError(
1017 f"Cannot create {self._orphan_safe_path()}/{name}:"
1018 f" {self._orphan_safe_path()} is not a directory"
1019 )
1020 if reference_path is not None and not reference_path.is_dir: 1020 ↛ 1021line 1020 didn't jump to line 1021 because the condition on line 1020 was never true
1021 raise ValueError(
1022 f'The provided fs_path "{reference_path.fs_path}" exist but it is not a directory!'
1023 )
1024 self._rw_check()
1026 existing = self.get(name)
1027 if existing: 1027 ↛ 1028line 1027 didn't jump to line 1028 because the condition on line 1027 was never true
1028 raise ValueError(f"Path {existing.path} already exist")
1029 return VirtualDirectoryFSPath(name, self, reference_path=reference_path)
1031 def mkdirs(self, path: str) -> "FSPath":
1032 return cast("FSPath", super().mkdirs(path))
1034 @property
1035 def is_read_write(self) -> bool:
1036 """When true, the file system entry may be mutated
1038 :return: Whether file system mutations are permitted.
1039 """
1040 if self.is_detached:
1041 return True
1042 return assume_not_none(self.parent_dir).is_read_write
1044 def unlink(self, *, recursive: bool = False) -> None:
1045 """Unlink a file or a directory
1047 This operation will detach the path from the file system (causing "is_detached" to return True).
1049 Note that the root directory cannot be deleted.
1051 :param recursive: If True, then non-empty directories will be unlinked as well removing everything inside them
1052 as well. When False, an error is raised if the path is a non-empty directory
1053 """
1054 if self.is_detached: 1054 ↛ 1055line 1054 didn't jump to line 1055 because the condition on line 1054 was never true
1055 return
1056 if not recursive and any(self.iterdir): 1056 ↛ 1057line 1056 didn't jump to line 1057 because the condition on line 1056 was never true
1057 raise ValueError(
1058 f'Refusing to unlink "{self.path}": The directory was not empty and recursive was False'
1059 )
1060 # The .parent_dir setter does a _rw_check() for us.
1061 self.parent_dir = None
1063 def _reset_caches(self) -> None:
1064 self._mtime = None
1065 self._stat_cache = None
1067 def metadata(
1068 self,
1069 metadata_type: Type[PMT],
1070 *,
1071 owning_plugin: Optional[str] = None,
1072 ) -> PathMetadataReference[PMT]:
1073 current_plugin = self._current_plugin()
1074 if owning_plugin is None: 1074 ↛ 1076line 1074 didn't jump to line 1076 because the condition on line 1074 was always true
1075 owning_plugin = current_plugin
1076 metadata_key = (owning_plugin, metadata_type)
1077 metadata_value = self._metadata.get(metadata_key)
1078 if metadata_value is None:
1079 if self.is_detached: 1079 ↛ 1080line 1079 didn't jump to line 1080 because the condition on line 1079 was never true
1080 raise TypeError(
1081 f"Cannot access the metadata {metadata_type.__name__}: The path is detached."
1082 )
1083 if not self.is_read_write:
1084 return AlwaysEmptyReadOnlyMetadataReference(
1085 owning_plugin,
1086 current_plugin,
1087 metadata_type,
1088 )
1089 metadata_value = PathMetadataValue(owning_plugin, metadata_type)
1090 self._metadata[metadata_key] = metadata_value
1091 return PathMetadataReferenceImplementation(
1092 self,
1093 current_plugin,
1094 metadata_value,
1095 )
1097 @contextlib.contextmanager
1098 def replace_fs_path_content(
1099 self,
1100 *,
1101 use_fs_path_mode: bool = False,
1102 ) -> Iterator[str]:
1103 if not self.is_file: 1103 ↛ 1104line 1103 didn't jump to line 1104 because the condition on line 1103 was never true
1104 raise TypeError(
1105 f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file'
1106 )
1107 self._rw_check()
1108 fs_path = self.fs_path
1109 if not self._can_replace_inline: 1109 ↛ 1121line 1109 didn't jump to line 1121 because the condition on line 1109 was always true
1110 fs_path = self.fs_path
1111 directory = generated_content_dir()
1112 with tempfile.NamedTemporaryFile(
1113 dir=directory, suffix=f"__{self.name}", delete=False
1114 ) as new_path_fd:
1115 new_path_fd.close()
1116 _cp_a(fs_path, new_path_fd.name)
1117 fs_path = new_path_fd.name
1118 self._replaced_path(fs_path)
1119 assert self.fs_path == fs_path
1121 current_mtime = self._mtime
1122 if current_mtime is not None:
1123 os.utime(fs_path, (current_mtime, current_mtime))
1125 current_mode = self.mode
1126 yield fs_path
1127 _check_fs_path_is_file(fs_path, unlink_on_error=self)
1128 if not use_fs_path_mode: 1128 ↛ 1130line 1128 didn't jump to line 1130 because the condition on line 1128 was always true
1129 os.chmod(fs_path, current_mode)
1130 self._reset_caches()
1132 def _replaced_path(self, new_fs_path: str) -> None:
1133 raise NotImplementedError
1136class VirtualFSPathBase(FSPath, ABC):
1137 __slots__ = ()
1139 def __init__(
1140 self,
1141 basename: str,
1142 parent: Optional["FSPath"],
1143 children: Optional[Dict[str, "FSPath"]] = None,
1144 initial_mode: Optional[int] = None,
1145 mtime: Optional[float] = None,
1146 stat_cache: Optional[os.stat_result] = None,
1147 ) -> None:
1148 super().__init__(
1149 basename,
1150 parent,
1151 children,
1152 initial_mode=initial_mode,
1153 mtime=mtime,
1154 stat_cache=stat_cache,
1155 )
1157 @property
1158 def mtime(self) -> float:
1159 mtime = self._mtime
1160 if mtime is None:
1161 mtime = time.time()
1162 self._mtime = mtime
1163 return mtime
1165 @property
1166 def has_fs_path(self) -> bool:
1167 return False
1169 def stat(self) -> os.stat_result:
1170 if not self.has_fs_path:
1171 raise PureVirtualPathError(
1172 "stat() is only applicable to paths backed by the file system. The path"
1173 f" {self._orphan_safe_path()!r} is purely virtual"
1174 )
1175 return super().stat()
1177 @property
1178 def fs_path(self) -> str:
1179 if not self.has_fs_path:
1180 raise PureVirtualPathError(
1181 "fs_path is only applicable to paths backed by the file system. The path"
1182 f" {self._orphan_safe_path()!r} is purely virtual"
1183 )
1184 return self.fs_path
1187class FSRootDir(FSPath):
1188 __slots__ = ("_fs_path", "_fs_read_write", "_plugin_context")
1190 def __init__(self, fs_path: Optional[str] = None) -> None:
1191 self._fs_path = fs_path
1192 self._fs_read_write = True
1193 super().__init__(
1194 ".",
1195 None,
1196 children={},
1197 initial_mode=0o755,
1198 )
1199 self._plugin_context = CurrentPluginContextManager("debputy")
1201 @property
1202 def is_detached(self) -> bool:
1203 return False
1205 def _orphan_safe_path(self) -> str:
1206 return self.name
1208 @property
1209 def path(self) -> str:
1210 return self.name
1212 @property
1213 def parent_dir(self) -> Optional["FSPath"]:
1214 return None
1216 @parent_dir.setter
1217 def parent_dir(self, new_parent: Optional[FSPath]) -> None:
1218 if new_parent is not None:
1219 raise ValueError("The root directory cannot become a non-root directory")
1221 @property
1222 def parent_dir_path(self) -> Optional[str]:
1223 return None
1225 @property
1226 def is_dir(self) -> bool:
1227 return True
1229 @property
1230 def is_file(self) -> bool:
1231 return False
1233 @property
1234 def is_symlink(self) -> bool:
1235 return False
1237 def readlink(self) -> str:
1238 raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink')
1240 @property
1241 def has_fs_path(self) -> bool:
1242 return self._fs_path is not None
1244 def stat(self) -> os.stat_result:
1245 if not self.has_fs_path:
1246 raise PureVirtualPathError(
1247 "stat() is only applicable to paths backed by the file system. The path"
1248 f" {self._orphan_safe_path()!r} is purely virtual"
1249 )
1250 return os.stat(self.fs_path)
1252 @property
1253 def fs_path(self) -> str:
1254 if not self.has_fs_path: 1254 ↛ 1255line 1254 didn't jump to line 1255 because the condition on line 1254 was never true
1255 raise PureVirtualPathError(
1256 "fs_path is only applicable to paths backed by the file system. The path"
1257 f" {self._orphan_safe_path()!r} is purely virtual"
1258 )
1259 return assume_not_none(self._fs_path)
1261 @property
1262 def is_read_write(self) -> bool:
1263 return self._fs_read_write
1265 @is_read_write.setter
1266 def is_read_write(self, new_value: bool) -> None:
1267 self._fs_read_write = new_value
1269 def prune_if_empty_dir(self) -> None:
1270 # No-op for the root directory. There is never a case where you want to delete this directory
1271 # (and even if you could, debputy will need it for technical reasons, so the root dir stays)
1272 return
1274 def unlink(self, *, recursive: bool = False) -> None:
1275 # There is never a case where you want to delete this directory (and even if you could,
1276 # debputy will need it for technical reasons, so the root dir stays)
1277 raise TypeError("Cannot delete the root directory")
1279 def _current_plugin(self) -> str:
1280 return self._plugin_context.current_plugin_name
1282 @contextlib.contextmanager
1283 def change_plugin_context(self, new_plugin: str) -> Iterator[str]:
1284 with self._plugin_context.change_plugin_context(new_plugin) as r:
1285 yield r
1288class VirtualPathWithReference(VirtualFSPathBase, ABC):
1289 __slots__ = ("_reference_path",)
1291 def __init__(
1292 self,
1293 basename: str,
1294 parent: FSPath,
1295 *,
1296 default_mode: int,
1297 reference_path: Optional[VirtualPath] = None,
1298 ) -> None:
1299 super().__init__(
1300 basename,
1301 parent=parent,
1302 initial_mode=reference_path.mode if reference_path else default_mode,
1303 )
1304 self._reference_path = reference_path
1306 @property
1307 def has_fs_path(self) -> bool:
1308 ref_path = self._reference_path
1309 return ref_path is not None and ref_path.has_fs_path
1311 @property
1312 def mtime(self) -> float:
1313 mtime = self._mtime
1314 if mtime is None: 1314 ↛ 1321line 1314 didn't jump to line 1321 because the condition on line 1314 was always true
1315 ref_path = self._reference_path
1316 if ref_path: 1316 ↛ 1319line 1316 didn't jump to line 1319 because the condition on line 1316 was always true
1317 mtime = ref_path.mtime
1318 else:
1319 mtime = super().mtime
1320 self._mtime = mtime
1321 return mtime
1323 @mtime.setter
1324 def mtime(self, new_mtime: float) -> None:
1325 self._rw_check()
1326 self._mtime = new_mtime
1328 @property
1329 def fs_path(self) -> str:
1330 ref_path = self._reference_path
1331 if ref_path is not None and ( 1331 ↛ 1335line 1331 didn't jump to line 1335 because the condition on line 1331 was always true
1332 not super().has_fs_path or super().fs_path == ref_path.fs_path
1333 ):
1334 return ref_path.fs_path
1335 return super().fs_path
1337 def stat(self) -> os.stat_result:
1338 ref_path = self._reference_path
1339 if ref_path is not None and (
1340 not super().has_fs_path or super().fs_path == ref_path.fs_path
1341 ):
1342 return ref_path.stat()
1343 return super().stat()
1345 def open(
1346 self,
1347 *,
1348 byte_io: bool = False,
1349 buffering: int = -1,
1350 ) -> Union[TextIO, BinaryIO]:
1351 reference_path = self._reference_path
1352 if reference_path is not None and reference_path.fs_path == self.fs_path:
1353 return reference_path.open(byte_io=byte_io, buffering=buffering)
1354 return super().open(byte_io=byte_io, buffering=buffering)
1357class VirtualDirectoryFSPath(VirtualPathWithReference):
1358 __slots__ = ("_reference_path",)
1360 def __init__(
1361 self,
1362 basename: str,
1363 parent: FSPath,
1364 *,
1365 reference_path: Optional[VirtualPath] = None,
1366 ) -> None:
1367 super().__init__(
1368 basename,
1369 parent,
1370 reference_path=reference_path,
1371 default_mode=0o755,
1372 )
1373 self._reference_path = reference_path
1374 assert reference_path is None or reference_path.is_dir
1375 self._ensure_min_mode()
1377 @property
1378 def is_dir(self) -> bool:
1379 return True
1381 @property
1382 def is_file(self) -> bool:
1383 return False
1385 @property
1386 def is_symlink(self) -> bool:
1387 return False
1389 def readlink(self) -> str:
1390 raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink')
1393class SymlinkVirtualPath(VirtualPathWithReference):
1394 __slots__ = ("_link_target",)
1396 def __init__(
1397 self,
1398 basename: str,
1399 parent_dir: FSPath,
1400 link_target: str,
1401 *,
1402 reference_path: Optional[VirtualPath] = None,
1403 ) -> None:
1404 super().__init__(
1405 basename,
1406 parent=parent_dir,
1407 default_mode=_SYMLINK_MODE,
1408 reference_path=reference_path,
1409 )
1410 self._link_target = link_target
1412 @property
1413 def is_dir(self) -> bool:
1414 return False
1416 @property
1417 def is_file(self) -> bool:
1418 return False
1420 @property
1421 def is_symlink(self) -> bool:
1422 return True
1424 def readlink(self) -> str:
1425 return self._link_target
1427 @property
1428 def size(self) -> int:
1429 return len(self.readlink())
1432class FSBackedFilePath(VirtualPathWithReference):
1433 __slots__ = ("_fs_path", "_replaceable_inline")
1435 def __init__(
1436 self,
1437 basename: str,
1438 parent_dir: FSPath,
1439 fs_path: str,
1440 *,
1441 replaceable_inline: bool = False,
1442 initial_mode: Optional[int] = None,
1443 mtime: Optional[float] = None,
1444 stat_cache: Optional[os.stat_result] = None,
1445 reference_path: Optional[VirtualPath] = None,
1446 ) -> None:
1447 super().__init__(
1448 basename,
1449 parent_dir,
1450 default_mode=0o644,
1451 reference_path=reference_path,
1452 )
1453 self._fs_path = fs_path
1454 self._replaceable_inline = replaceable_inline
1455 if initial_mode is not None:
1456 self.mode = initial_mode
1457 if mtime is not None:
1458 self._mtime = mtime
1459 self._stat_cache = stat_cache
1460 assert (
1461 not replaceable_inline or "debputy/scratch-dir/" in fs_path
1462 ), f"{fs_path} should not be inline-replaceable -- {self.path}"
1463 self._ensure_min_mode()
1465 @property
1466 def is_dir(self) -> bool:
1467 return False
1469 @property
1470 def is_file(self) -> bool:
1471 return True
1473 @property
1474 def is_symlink(self) -> bool:
1475 return False
1477 def readlink(self) -> str:
1478 raise TypeError(f'"{self._orphan_safe_path()!r}" is a file; not a symlink')
1480 @property
1481 def has_fs_path(self) -> bool:
1482 return True
1484 @property
1485 def fs_path(self) -> str:
1486 return self._fs_path
1488 @property
1489 def _can_replace_inline(self) -> bool:
1490 return self._replaceable_inline
1492 def _replaced_path(self, new_fs_path: str) -> None:
1493 self._fs_path = new_fs_path
1494 self._reference_path = None
1495 self._replaceable_inline = True
1498_SYMLINK_MODE = 0o777
1501class VirtualTestPath(FSPath):
1502 __slots__ = (
1503 "_path_type",
1504 "_has_fs_path",
1505 "_fs_path",
1506 "_link_target",
1507 "_content",
1508 "_materialized_content",
1509 )
1511 def __init__(
1512 self,
1513 basename: str,
1514 parent_dir: Optional[FSPath],
1515 mode: Optional[int] = None,
1516 mtime: Optional[float] = None,
1517 is_dir: bool = False,
1518 has_fs_path: Optional[bool] = False,
1519 fs_path: Optional[str] = None,
1520 link_target: Optional[str] = None,
1521 content: Optional[str] = None,
1522 materialized_content: Optional[str] = None,
1523 ) -> None:
1524 if is_dir:
1525 self._path_type = PathType.DIRECTORY
1526 elif link_target is not None:
1527 self._path_type = PathType.SYMLINK
1528 if mode is not None and mode != _SYMLINK_MODE: 1528 ↛ 1529line 1528 didn't jump to line 1529 because the condition on line 1528 was never true
1529 raise ValueError(
1530 f'Please do not assign a mode to symlinks. Triggered for "{basename}".'
1531 )
1532 assert mode is None or mode == _SYMLINK_MODE
1533 else:
1534 self._path_type = PathType.FILE
1536 if mode is not None:
1537 initial_mode = mode
1538 else:
1539 initial_mode = 0o755 if is_dir else 0o644
1541 self._link_target = link_target
1542 if has_fs_path is None:
1543 has_fs_path = bool(fs_path)
1544 self._has_fs_path = has_fs_path
1545 self._fs_path = fs_path
1546 self._materialized_content = materialized_content
1547 super().__init__(
1548 basename,
1549 parent=parent_dir,
1550 initial_mode=initial_mode,
1551 mtime=mtime,
1552 )
1553 self._content = content
1555 @property
1556 def is_dir(self) -> bool:
1557 return self._path_type == PathType.DIRECTORY
1559 @property
1560 def is_file(self) -> bool:
1561 return self._path_type == PathType.FILE
1563 @property
1564 def is_symlink(self) -> bool:
1565 return self._path_type == PathType.SYMLINK
1567 def readlink(self) -> str:
1568 if not self.is_symlink: 1568 ↛ 1569line 1568 didn't jump to line 1569 because the condition on line 1568 was never true
1569 raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
1570 link_target = self._link_target
1571 assert link_target is not None
1572 return link_target
1574 @property
1575 def mtime(self) -> float:
1576 if self._mtime is None:
1577 self._mtime = time.time()
1578 return self._mtime
1580 @mtime.setter
1581 def mtime(self, new_mtime: float) -> None:
1582 self._rw_check()
1583 self._mtime = new_mtime
1585 @property
1586 def has_fs_path(self) -> bool:
1587 return self._has_fs_path
1589 def stat(self) -> os.stat_result:
1590 if self.has_fs_path:
1591 path = self.fs_path
1592 if path is None: 1592 ↛ 1593line 1592 didn't jump to line 1593 because the condition on line 1592 was never true
1593 raise PureVirtualPathError(
1594 f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path"
1595 " cannot provide!"
1596 )
1597 try:
1598 return os.stat(path)
1599 except FileNotFoundError as e:
1600 raise PureVirtualPathError(
1601 f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path"
1602 " cannot provide! (An fs_path was provided, but it did not exist)"
1603 ) from e
1605 raise PureVirtualPathError(
1606 "stat() is only applicable to paths backed by the file system. The path"
1607 f" {self._orphan_safe_path()!r} is purely virtual"
1608 )
1610 @property
1611 def size(self) -> int:
1612 if self._content is not None:
1613 return len(self._content.encode("utf-8"))
1614 if self.is_symlink:
1615 return len(self.readlink())
1616 if not self.has_fs_path or self.fs_path is None:
1617 return 0
1618 return self.stat().st_size
1620 @property
1621 def fs_path(self) -> str:
1622 if self.has_fs_path:
1623 if self._fs_path is None and self._materialized_content is not None:
1624 with tempfile.NamedTemporaryFile(
1625 mode="w+t",
1626 encoding="utf-8",
1627 suffix=f"__{self.name}",
1628 delete=False,
1629 ) as fd:
1630 filepath = fd.name
1631 fd.write(self._materialized_content)
1632 self._fs_path = filepath
1633 atexit.register(lambda: os.unlink(filepath))
1635 path = self._fs_path
1636 if path is None: 1636 ↛ 1637line 1636 didn't jump to line 1637 because the condition on line 1636 was never true
1637 raise PureVirtualPathError(
1638 f"The test wants a real file system entry of {self._orphan_safe_path()!r}, which this "
1639 " mock path cannot provide!"
1640 )
1641 return path
1642 raise PureVirtualPathError(
1643 "fs_path is only applicable to paths backed by the file system. The path"
1644 f" {self._orphan_safe_path()!r} is purely virtual"
1645 )
1647 def replace_fs_path_content(
1648 self,
1649 *,
1650 use_fs_path_mode: bool = False,
1651 ) -> ContextManager[str]:
1652 if self._content is not None: 1652 ↛ 1653line 1652 didn't jump to line 1653 because the condition on line 1652 was never true
1653 raise TypeError(
1654 f"The `replace_fs_path_content()` method was called on {self.path}. Said path was"
1655 " created with `content` but for this method to work, the path should have been"
1656 " created with `materialized_content`"
1657 )
1658 return super().replace_fs_path_content(use_fs_path_mode=use_fs_path_mode)
1660 @contextlib.contextmanager
1661 def open_child(
1662 self,
1663 name: str,
1664 mode: OpenMode = "r",
1665 buffering: int = -1,
1666 ) -> Union[TextIO, BinaryIO]:
1667 existing = self.get(name)
1668 if existing or "r" in mode:
1669 with super().open_child(name, mode, buffering=buffering) as fd:
1670 yield fd
1671 return
1672 if "b" in mode:
1673 fd = io.BytesIO(b"")
1674 yield fd
1675 content = fd.getvalue().decode("utf-8")
1676 else:
1677 fd = io.StringIO("")
1678 yield fd
1679 content = fd.getvalue()
1680 VirtualTestPath(
1681 name,
1682 self,
1683 mode=0o644,
1684 content=content,
1685 has_fs_path=True,
1686 )
1688 def open(
1689 self,
1690 *,
1691 byte_io: bool = False,
1692 buffering: int = -1,
1693 ) -> Union[TextIO, BinaryIO]:
1694 if self._content is None:
1695 try:
1696 return super().open(byte_io=byte_io, buffering=buffering)
1697 except FileNotFoundError as e:
1698 raise TestPathWithNonExistentFSPathError(
1699 f"The test path {self.path} had an fs_path {self._fs_path}, which does not"
1700 " exist. This exception can only occur in the testsuite. Either have the"
1701 " test provide content for the path (`virtual_path_def(..., content=...) or,"
1702 " if that is too painful in general, have the code accept this error as a "
1703 " test only-case and provide a default."
1704 ) from e
1706 if byte_io:
1707 return io.BytesIO(self._content.encode("utf-8"))
1708 return io.StringIO(self._content)
1710 def _replaced_path(self, new_fs_path: str) -> None:
1711 self._fs_path = new_fs_path
1714class FSOverlayBase(VirtualPathBase, Generic[FSP]):
1715 __slots__ = (
1716 "_path",
1717 "_fs_path",
1718 "_parent",
1719 "__weakref__",
1720 )
1722 def __init__(
1723 self,
1724 path: str,
1725 fs_path: str,
1726 parent: Optional[FSP],
1727 ) -> None:
1728 self._path: str = path
1729 prefix = "/" if fs_path.startswith("/") else ""
1730 self._fs_path: str = prefix + _normalize_path(fs_path, with_prefix=False)
1731 self._parent: Optional[ReferenceType[FSP]] = (
1732 ref(parent) if parent is not None else None
1733 )
1735 @property
1736 def name(self) -> str:
1737 return os.path.basename(self._path)
1739 @property
1740 def path(self) -> str:
1741 return self._path
1743 @property
1744 def parent_dir(self) -> Optional["FSP"]:
1745 parent = self._parent
1746 if parent is None:
1747 return None
1748 resolved = parent()
1749 if resolved is None:
1750 raise RuntimeError("Parent was garbage collected!")
1751 return resolved
1753 @property
1754 def fs_path(self) -> str:
1755 return self._fs_path
1757 def stat(self) -> os.stat_result:
1758 return os.lstat(self.fs_path)
1760 @property
1761 def is_dir(self) -> bool:
1762 # The root path can have a non-existent fs_path (such as d/tmp not always existing)
1763 try:
1764 return stat.S_ISDIR(self.stat().st_mode)
1765 except FileNotFoundError:
1766 return False
1767 except NotImplementedError:
1768 print(self.__class__)
1770 @property
1771 def is_file(self) -> bool:
1772 # The root path can have a non-existent fs_path (such as d/tmp not always existing)
1773 try:
1774 return stat.S_ISREG(self.stat().st_mode)
1775 except FileNotFoundError:
1776 return False
1778 @property
1779 def is_symlink(self) -> bool:
1780 # The root path can have a non-existent fs_path (such as d/tmp not always existing)
1781 try:
1782 return stat.S_ISLNK(self.stat().st_mode)
1783 except FileNotFoundError:
1784 return False
1786 @property
1787 def has_fs_path(self) -> bool:
1788 return True
1790 def open(
1791 self,
1792 *,
1793 byte_io: bool = False,
1794 buffering: int = -1,
1795 ) -> Union[TextIO, BinaryIO]:
1796 # Allow symlinks for open here, because we can let the OS resolve the symlink reliably in this
1797 # case.
1798 if not self.is_file and not self.is_symlink:
1799 raise TypeError(
1800 f"Cannot open {self.path} for reading: It is not a file nor a symlink"
1801 )
1803 if byte_io:
1804 return open(self.fs_path, "rb", buffering=buffering)
1805 return open(self.fs_path, "rt", encoding="utf-8", buffering=buffering)
1807 def metadata(
1808 self,
1809 metadata_type: Type[PMT],
1810 *,
1811 owning_plugin: Optional[str] = None,
1812 ) -> PathMetadataReference[PMT]:
1813 current_plugin = self._current_plugin()
1814 if owning_plugin is None:
1815 owning_plugin = current_plugin
1816 return AlwaysEmptyReadOnlyMetadataReference(
1817 owning_plugin,
1818 current_plugin,
1819 metadata_type,
1820 )
1822 def all_paths(self) -> Iterable["FSControlPath"]:
1823 yield self
1824 if not self.is_dir:
1825 return
1826 stack = list(self.iterdir)
1827 stack.reverse()
1828 while stack:
1829 current = stack.pop()
1830 yield current
1831 if current.is_dir:
1832 stack.extend(reversed(list(current.iterdir)))
1834 def _resolve_children(
1835 self, new_child: Callable[[str, str, FSP], FSC]
1836 ) -> Mapping[str, FSC]:
1837 if not self.is_dir:
1838 return {}
1839 dir_path = self.path
1840 dir_fs_path = self.fs_path
1841 children = {}
1842 for name in sorted(os.listdir(dir_fs_path), key=os.path.basename):
1843 child_path = os.path.join(dir_path, name) if dir_path != "." else name
1844 child_fs_path = (
1845 os.path.join(dir_fs_path, name) if dir_fs_path != "." else name
1846 )
1847 children[name] = new_child(
1848 child_path,
1849 child_fs_path,
1850 self,
1851 )
1852 return children
1855class FSROOverlay(FSOverlayBase["FSROOverlay"]):
1856 __slots__ = (
1857 "_stat_cache",
1858 "_readlink_cache",
1859 "_children",
1860 "_stat_failed_cache",
1861 )
1863 def __init__(
1864 self,
1865 path: str,
1866 fs_path: str,
1867 parent: Optional["FSROOverlay"],
1868 ) -> None:
1869 super().__init__(path, fs_path, parent=parent)
1870 self._stat_cache: Optional[os.stat_result] = None
1871 self._readlink_cache: Optional[str] = None
1872 self._stat_failed_cache = False
1873 self._children: Optional[Mapping[str, FSROOverlay]] = None
1875 @classmethod
1876 def create_root_dir(cls, path: str, fs_path: str) -> "FSROOverlay":
1877 return FSROOverlay(path, fs_path, None)
1879 @property
1880 def iterdir(self) -> Iterable["FSROOverlay"]:
1881 if not self.is_dir:
1882 return
1883 if self._children is None:
1884 self._ensure_children_are_resolved()
1885 yield from assume_not_none(self._children).values()
1887 def lookup(self, path: str) -> Optional["FSROOverlay"]:
1888 if not self.is_dir:
1889 return None
1890 if self._children is None:
1891 self._ensure_children_are_resolved()
1893 absolute, _, path_parts = _split_path(path)
1894 current = cast("FSROOverlay", _root(self)) if absolute else self
1895 for no, dir_part in enumerate(path_parts):
1896 if dir_part == ".":
1897 continue
1898 if dir_part == "..":
1899 p = current.parent_dir
1900 if p is None:
1901 raise ValueError(f'The path "{path}" escapes the root dir')
1902 current = cast("FSROOverlay", p)
1903 continue
1904 try:
1905 current = current[dir_part]
1906 except KeyError:
1907 return None
1908 return current
1910 def _ensure_children_are_resolved(self) -> None:
1911 if not self.is_dir or self._children:
1912 return
1913 self._children = self._resolve_children(
1914 lambda n, fsp, p: FSROOverlay(n, fsp, p)
1915 )
1917 @property
1918 def is_detached(self) -> bool:
1919 return False
1921 def __getitem__(self, key) -> "VirtualPath":
1922 if not self.is_dir: 1922 ↛ 1924line 1922 didn't jump to line 1924 because the condition on line 1922 was always true
1923 raise KeyError(key)
1924 if self._children is None:
1925 self._ensure_children_are_resolved()
1926 if isinstance(key, FSPath):
1927 key = key.name
1928 return self._children[key]
1930 def __delitem__(self, key) -> None:
1931 self._error_ro_fs()
1933 @property
1934 def is_read_write(self) -> bool:
1935 return False
1937 def _rw_check(self) -> None:
1938 self._error_ro_fs()
1940 def _error_ro_fs(self) -> NoReturn:
1941 raise DebputyFSIsROError(
1942 f'Attempt to write to "{self.path}" failed:'
1943 " Debputy Virtual File system is R/O."
1944 )
1946 def stat(self) -> os.stat_result:
1947 if self._stat_failed_cache: 1947 ↛ 1948line 1947 didn't jump to line 1948 because the condition on line 1947 was never true
1948 raise FileNotFoundError(
1949 errno.ENOENT, os.strerror(errno.ENOENT), self.fs_path
1950 )
1952 if self._stat_cache is None: 1952 ↛ 1958line 1952 didn't jump to line 1958 because the condition on line 1952 was always true
1953 try:
1954 self._stat_cache = os.lstat(self.fs_path)
1955 except FileNotFoundError:
1956 self._stat_failed_cache = True
1957 raise
1958 return self._stat_cache
1960 @property
1961 def mode(self) -> int:
1962 return stat.S_IMODE(self.stat().st_mode)
1964 @mode.setter
1965 def mode(self, _unused: int) -> None:
1966 self._error_ro_fs()
1968 @property
1969 def mtime(self) -> float:
1970 return self.stat().st_mtime
1972 @mtime.setter
1973 def mtime(self, new_mtime: float) -> None:
1974 self._error_ro_fs()
1976 def readlink(self) -> str:
1977 if not self.is_symlink:
1978 raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
1979 if self._readlink_cache is None:
1980 self._readlink_cache = os.readlink(self.fs_path)
1981 return self._readlink_cache
1983 def chown(
1984 self,
1985 owner: Optional[StaticFileSystemOwner],
1986 group: Optional[StaticFileSystemGroup],
1987 ) -> None:
1988 self._error_ro_fs()
1990 def mkdir(self, name: str) -> "VirtualPath":
1991 self._error_ro_fs()
1993 def add_file(
1994 self,
1995 name: str,
1996 *,
1997 unlink_if_exists: bool = True,
1998 use_fs_path_mode: bool = False,
1999 mode: int = 0o0644,
2000 mtime: Optional[float] = None,
2001 ) -> ContextManager["VirtualPath"]:
2002 self._error_ro_fs()
2004 def add_symlink(self, link_name: str, link_target: str) -> "VirtualPath":
2005 self._error_ro_fs()
2007 def unlink(self, *, recursive: bool = False) -> None:
2008 self._error_ro_fs()
2011class FSROOverlayRootDir(FSROOverlay):
2012 __slots__ = ("_plugin_context",)
2014 def __init__(self, path: str, fs_path: str) -> None:
2015 super().__init__(path, fs_path, None)
2016 self._plugin_context = CurrentPluginContextManager("debputy")
2018 def _current_plugin(self) -> str:
2019 return self._plugin_context.current_plugin_name
2021 @contextlib.contextmanager
2022 def change_plugin_context(self, new_plugin: str) -> Iterator[str]:
2023 with self._plugin_context.change_plugin_context(new_plugin) as r:
2024 yield r
2027class FSControlPath(FSOverlayBase["FSControlPath"]):
2029 @property
2030 def iterdir(self) -> Iterable["FSControlPath"]:
2031 if not self.is_dir:
2032 return
2033 yield from self._resolve_children(
2034 lambda n, fsp, p: FSControlPath(n, fsp, p)
2035 ).values()
2037 def lookup(self, path: str) -> Optional["FSControlPath"]:
2038 if not self.is_dir:
2039 return None
2041 absolute, _, path_parts = _split_path(path)
2042 current = cast("FSControlPath", _root(self)) if absolute else self
2043 for no, dir_part in enumerate(path_parts):
2044 if dir_part == ".":
2045 continue
2046 if dir_part == "..":
2047 p = current.parent_dir
2048 if p is None:
2049 raise ValueError(f'The path "{path}" escapes the root dir')
2050 current = cast("FSControlPath", p)
2051 continue
2052 try:
2053 current = current[dir_part]
2054 except KeyError:
2055 return None
2056 return current
2058 @property
2059 def is_detached(self) -> bool:
2060 try:
2061 self.stat()
2062 except FileNotFoundError:
2063 return True
2064 else:
2065 return False
2067 def __getitem__(self, key) -> "VirtualPath":
2068 if not self.is_dir:
2069 raise KeyError(key)
2070 children = self._resolve_children(lambda n, fsp, p: FSControlPath(n, fsp, p))
2071 if isinstance(key, FSPath):
2072 key = key.name
2073 return children[key]
2075 def __delitem__(self, key) -> None:
2076 self[key].unlink()
2078 @property
2079 def is_read_write(self) -> bool:
2080 return True
2082 @property
2083 def mode(self) -> int:
2084 return stat.S_IMODE(self.stat().st_mode)
2086 @mode.setter
2087 def mode(self, new_mode: int) -> None:
2088 os.chmod(self.fs_path, new_mode)
2090 @property
2091 def mtime(self) -> float:
2092 return self.stat().st_mtime
2094 @mtime.setter
2095 def mtime(self, new_mtime: float) -> None:
2096 os.utime(self.fs_path, (new_mtime, new_mtime))
2098 def readlink(self) -> str:
2099 if not self.is_symlink:
2100 raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
2101 assert False
2103 def chown(
2104 self,
2105 owner: Optional[StaticFileSystemOwner],
2106 group: Optional[StaticFileSystemGroup],
2107 ) -> None:
2108 raise ValueError(
2109 "No need to chown paths in the control.tar: They are always root:root"
2110 )
2112 def mkdir(self, name: str) -> "VirtualPath":
2113 raise TypeError("The control.tar never contains subdirectories.")
2115 @contextlib.contextmanager
2116 def add_file(
2117 self,
2118 name: str,
2119 *,
2120 unlink_if_exists: bool = True,
2121 use_fs_path_mode: bool = False,
2122 mode: int = 0o0644,
2123 mtime: Optional[float] = None,
2124 ) -> ContextManager["VirtualPath"]:
2125 if "/" in name or name in {".", ".."}:
2126 raise ValueError(f'Invalid file name: "{name}"')
2127 if not self.is_dir:
2128 raise TypeError(
2129 f"Cannot create {self._orphan_safe_path()}/{name}:"
2130 f" {self._orphan_safe_path()} is not a directory"
2131 )
2132 self._rw_check()
2133 existing = self.get(name)
2134 if existing is not None:
2135 if not unlink_if_exists:
2136 raise ValueError(
2137 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
2138 f" and exist_ok was False"
2139 )
2140 assert existing.is_file
2142 fs_path = os.path.join(self.fs_path, name)
2143 # This truncates the existing file if any, so we do not have to unlink the previous entry.
2144 with open(fs_path, "wb") as fd:
2145 # Ensure that the fs_path exists and default mode is reasonable
2146 os.chmod(fd.fileno(), mode)
2147 child = FSControlPath(
2148 name,
2149 fs_path,
2150 self,
2151 )
2152 yield child
2153 _check_fs_path_is_file(fs_path, unlink_on_error=child)
2154 child.mode = mode
2156 @contextlib.contextmanager
2157 def replace_fs_path_content(
2158 self,
2159 *,
2160 use_fs_path_mode: bool = False,
2161 ) -> ContextManager[str]:
2162 if not self.is_file:
2163 raise TypeError(
2164 f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file'
2165 )
2166 restore_mode = self.mode if use_fs_path_mode else None
2167 yield self.fs_path
2168 _check_fs_path_is_file(self.fs_path, self)
2169 if restore_mode is not None:
2170 self.mode = restore_mode
2172 def add_symlink(self, link_name: str, link_target: str) -> "VirtualPath":
2173 raise TypeError("The control.tar never contains symlinks.")
2175 def unlink(self, *, recursive: bool = False) -> None:
2176 if self._parent is None:
2177 return
2178 # By virtue of the control FS only containing paths, we can assume `recursive` never
2179 # matters and that `os.unlink` will be sufficient.
2180 assert self.is_file
2181 os.unlink(self.fs_path)
2184class FSControlRootDir(FSControlPath):
2186 @classmethod
2187 def create_root_dir(cls, fs_path: str) -> "FSControlRootDir":
2188 return FSControlRootDir(".", fs_path, None)
2190 def insert_file_from_fs_path(
2191 self,
2192 name: str,
2193 fs_path: str,
2194 *,
2195 exist_ok: bool = True,
2196 use_fs_path_mode: bool = False,
2197 mode: int = 0o0644,
2198 # Ignored, but accepted for compat with FSPath's variant of this function.
2199 # - This is used by install_or_generate_conffiles.
2200 reference_path: Optional[VirtualPath] = None, # noqa
2201 ) -> "FSControlPath":
2202 if "/" in name or name in {".", ".."}:
2203 raise ValueError(f'Invalid file name: "{name}"')
2204 if not self.is_dir:
2205 raise TypeError(
2206 f"Cannot create {self._orphan_safe_path()}/{name}:"
2207 f" {self._orphan_safe_path()} is not a directory"
2208 )
2209 self._rw_check()
2210 if name in self and not exist_ok:
2211 raise ValueError(
2212 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
2213 f" and exist_ok was False"
2214 )
2216 target_path = os.path.join(self.fs_path, name)
2217 if use_fs_path_mode:
2218 shutil.copymode(
2219 fs_path,
2220 target_path,
2221 follow_symlinks=True,
2222 )
2223 else:
2224 shutil.copyfile(
2225 fs_path,
2226 target_path,
2227 follow_symlinks=True,
2228 )
2229 os.chmod(target_path, mode)
2230 return cast("FSControlPath", self[name])
2233def as_path_def(pd: Union[str, PathDef]) -> PathDef:
2234 return PathDef(pd) if isinstance(pd, str) else pd
2237def as_path_defs(paths: Iterable[Union[str, PathDef]]) -> Iterable[PathDef]:
2238 yield from (as_path_def(p) for p in paths)
2241def build_virtual_fs(
2242 paths: Iterable[Union[str, PathDef]],
2243 read_write_fs: bool = False,
2244) -> "FSPath":
2245 root_dir: Optional[FSRootDir] = None
2246 directories: Dict[str, FSPath] = {}
2247 non_directories = set()
2249 def _ensure_parent_dirs(p: str) -> None:
2250 current = p.rstrip("/")
2251 missing_dirs = []
2252 while True:
2253 current = os.path.dirname(current)
2254 if current in directories:
2255 break
2256 if current in non_directories: 2256 ↛ 2257line 2256 didn't jump to line 2257 because the condition on line 2256 was never true
2257 raise ValueError(
2258 f'Conflicting definition for "{current}". The path "{p}" wants it as a directory,'
2259 ' but it is defined as a non-directory. (Ensure dirs end with "/")'
2260 )
2261 missing_dirs.append(current)
2262 for dir_path in reversed(missing_dirs):
2263 parent_dir = directories[os.path.dirname(dir_path)]
2264 d = VirtualTestPath(os.path.basename(dir_path), parent_dir, is_dir=True)
2265 directories[dir_path] = d
2267 for path_def in as_path_defs(paths):
2268 path = path_def.path_name
2269 if path in directories or path in non_directories: 2269 ↛ 2270line 2269 didn't jump to line 2270 because the condition on line 2269 was never true
2270 raise ValueError(
2271 f'Duplicate definition of "{path}". Can be false positive if input is not in'
2272 ' "correct order" (ensure directories occur before their children)'
2273 )
2274 if root_dir is None:
2275 root_fs_path = None
2276 if path in (".", "./", "/"):
2277 root_fs_path = path_def.fs_path
2278 root_dir = FSRootDir(fs_path=root_fs_path)
2279 directories["."] = root_dir
2281 if path not in (".", "./", "/") and not path.startswith("./"):
2282 path = "./" + path
2283 if path not in (".", "./", "/"):
2284 _ensure_parent_dirs(path)
2285 if path in (".", "./"):
2286 assert "." in directories
2287 continue
2288 is_dir = False
2289 if path.endswith("/"):
2290 path = path[:-1]
2291 is_dir = True
2292 directory = directories[os.path.dirname(path)]
2293 assert not is_dir or not bool(
2294 path_def.link_target
2295 ), f"is_dir={is_dir} vs. link_target={path_def.link_target}"
2296 fs_path = VirtualTestPath(
2297 os.path.basename(path),
2298 directory,
2299 is_dir=is_dir,
2300 mode=path_def.mode,
2301 mtime=path_def.mtime,
2302 has_fs_path=path_def.has_fs_path,
2303 fs_path=path_def.fs_path,
2304 link_target=path_def.link_target,
2305 content=path_def.content,
2306 materialized_content=path_def.materialized_content,
2307 )
2308 assert not fs_path.is_detached
2309 if fs_path.is_dir:
2310 directories[fs_path.path] = fs_path
2311 else:
2312 non_directories.add(fs_path.path)
2314 if root_dir is None:
2315 root_dir = FSRootDir()
2317 root_dir.is_read_write = read_write_fs
2318 return root_dir