Coverage for src/debputy/filesystem_scan.py: 70%
1276 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
1import atexit
2import contextlib
3import dataclasses
4import errno
5import io
6import operator
7import os
8import stat
9import subprocess
10import tempfile
11import time
12from abc import ABC
13from contextlib import suppress
14from typing import (
15 List,
16 Iterable,
17 Dict,
18 Optional,
19 Tuple,
20 Union,
21 Iterator,
22 Mapping,
23 cast,
24 Any,
25 ContextManager,
26 TextIO,
27 BinaryIO,
28 NoReturn,
29 Type,
30 Generic,
31 Callable,
32 TypeVar,
33 overload,
34 Literal,
35)
36from weakref import ref, ReferenceType
38from debputy.exceptions import (
39 PureVirtualPathError,
40 DebputyFSIsROError,
41 DebputyMetadataAccessError,
42 TestPathWithNonExistentFSPathError,
43 SymlinkLoopError,
44)
45from debputy.intermediate_manifest import PathType
46from debputy.manifest_parser.base_types import (
47 ROOT_DEFINITION,
48 StaticFileSystemOwner,
49 StaticFileSystemGroup,
50)
51from debputy.plugin.api.spec import (
52 VirtualPath,
53 PathDef,
54 PathMetadataReference,
55 PMT,
56)
57from debputy.types import VP
58from debputy.util import (
59 generated_content_dir,
60 _error,
61 escape_shell,
62 assume_not_none,
63 _normalize_path,
64 _debug_log,
65)
67BY_BASENAME = operator.attrgetter("name")
69FSP = TypeVar("FSP", bound="FSOverlayBase", covariant=True)
70FSC = TypeVar("FSC", bound="FSOverlayBase", covariant=True)
73BinaryOpenMode = Literal[
74 "rb",
75 "r+b",
76 "wb",
77 "w+b",
78 "xb",
79 "ab",
80]
81TextOpenMode = Literal[
82 "r",
83 "r+",
84 "rt",
85 "r+t",
86 "w",
87 "w+",
88 "wt",
89 "w+t",
90 "x",
91 "xt",
92 "a",
93 "at",
94]
95OpenMode = Union[BinaryOpenMode, TextOpenMode]
98class AlwaysEmptyReadOnlyMetadataReference(PathMetadataReference[PMT]):
99 __slots__ = ("_metadata_type", "_owning_plugin", "_current_plugin")
101 def __init__(
102 self,
103 owning_plugin: str,
104 current_plugin: str,
105 metadata_type: Type[PMT],
106 ) -> None:
107 self._owning_plugin = owning_plugin
108 self._current_plugin = current_plugin
109 self._metadata_type = metadata_type
111 @property
112 def is_present(self) -> bool:
113 return False
115 @property
116 def can_read(self) -> bool:
117 return self._owning_plugin == self._current_plugin
119 @property
120 def can_write(self) -> bool:
121 return False
123 @property
124 def value(self) -> Optional[PMT]:
125 if self.can_read: 125 ↛ 127line 125 didn't jump to line 127 because the condition on line 125 was always true
126 return None
127 raise DebputyMetadataAccessError(
128 f"Cannot read the metadata {self._metadata_type.__name__} owned by"
129 f" {self._owning_plugin} as the metadata has not been made"
130 f" readable to the plugin {self._current_plugin}."
131 )
133 @value.setter
134 def value(self, new_value: PMT) -> None:
135 if self._is_owner:
136 raise DebputyFSIsROError(
137 f"Cannot set the metadata {self._metadata_type.__name__} as the path is read-only"
138 )
139 raise DebputyMetadataAccessError(
140 f"Cannot set the metadata {self._metadata_type.__name__} owned by"
141 f" {self._owning_plugin} as the metadata has not been made"
142 f" read-write to the plugin {self._current_plugin}."
143 )
145 @property
146 def _is_owner(self) -> bool:
147 return self._owning_plugin == self._current_plugin
150@dataclasses.dataclass(slots=True)
151class PathMetadataValue(Generic[PMT]):
152 owning_plugin: str
153 metadata_type: Type[PMT]
154 value: Optional[PMT] = None
156 def can_read_value(self, current_plugin: str) -> bool:
157 return self.owning_plugin == current_plugin
159 def can_write_value(self, current_plugin: str) -> bool:
160 return self.owning_plugin == current_plugin
163class PathMetadataReferenceImplementation(PathMetadataReference[PMT]):
164 __slots__ = ("_owning_path", "_current_plugin", "_path_metadata_value")
166 def __init__(
167 self,
168 owning_path: VirtualPath,
169 current_plugin: str,
170 path_metadata_value: PathMetadataValue[PMT],
171 ) -> None:
172 self._owning_path = owning_path
173 self._current_plugin = current_plugin
174 self._path_metadata_value = path_metadata_value
176 @property
177 def is_present(self) -> bool:
178 if not self.can_read: 178 ↛ 179line 178 didn't jump to line 179 because the condition on line 178 was never true
179 return False
180 return self._path_metadata_value.value is not None
182 @property
183 def can_read(self) -> bool:
184 return self._path_metadata_value.can_read_value(self._current_plugin)
186 @property
187 def can_write(self) -> bool:
188 if not self._path_metadata_value.can_write_value(self._current_plugin): 188 ↛ 189line 188 didn't jump to line 189 because the condition on line 188 was never true
189 return False
190 owning_path = self._owning_path
191 return owning_path.is_read_write and not owning_path.is_detached
193 @property
194 def value(self) -> Optional[PMT]:
195 if self.can_read: 195 ↛ 197line 195 didn't jump to line 197 because the condition on line 195 was always true
196 return self._path_metadata_value.value
197 raise DebputyMetadataAccessError(
198 f"Cannot read the metadata {self._metadata_type_name} owned by"
199 f" {self._owning_plugin} as the metadata has not been made"
200 f" readable to the plugin {self._current_plugin}."
201 )
203 @value.setter
204 def value(self, new_value: PMT) -> None:
205 if not self.can_write: 205 ↛ 206line 205 didn't jump to line 206 because the condition on line 205 was never true
206 m = "set" if new_value is not None else "delete"
207 raise DebputyMetadataAccessError(
208 f"Cannot {m} the metadata {self._metadata_type_name} owned by"
209 f" {self._owning_plugin} as the metadata has not been made"
210 f" read-write to the plugin {self._current_plugin}."
211 )
212 owning_path = self._owning_path
213 if not owning_path.is_read_write: 213 ↛ 214line 213 didn't jump to line 214 because the condition on line 213 was never true
214 raise DebputyFSIsROError(
215 f"Cannot set the metadata {self._metadata_type_name} as the path is read-only"
216 )
217 if owning_path.is_detached: 217 ↛ 218line 217 didn't jump to line 218 because the condition on line 217 was never true
218 raise TypeError(
219 f"Cannot set the metadata {self._metadata_type_name} as the path is detached"
220 )
221 self._path_metadata_value.value = new_value
223 @property
224 def _is_owner(self) -> bool:
225 return self._owning_plugin == self._current_plugin
227 @property
228 def _owning_plugin(self) -> str:
229 return self._path_metadata_value.owning_plugin
231 @property
232 def _metadata_type_name(self) -> str:
233 return self._path_metadata_value.metadata_type.__name__
236def _cp_a(source: str, dest: str) -> None:
237 cmd = ["cp", "-a", source, dest]
238 try:
239 subprocess.check_call(cmd)
240 except subprocess.CalledProcessError:
241 full_command = escape_shell(*cmd)
242 _error(
243 f"The attempt to make an internal copy of {escape_shell(source)} failed. Please review the output of cp"
244 f" above to understand what went wrong. The full command was: {full_command}"
245 )
248def _split_path(path: str) -> Tuple[bool, bool, List[str]]:
249 must_be_dir = True if path.endswith("/") else False
250 absolute = False
251 if path.startswith("/"):
252 absolute = True
253 path = "." + path
254 path_parts = path.rstrip("/").split("/")
255 if must_be_dir:
256 path_parts.append(".")
257 return absolute, must_be_dir, path_parts
260def _root(path: VP) -> VP:
261 current = path
262 while True:
263 parent = current.parent_dir
264 if parent is None:
265 return current
266 current = parent
269def _check_fs_path_is_file(
270 fs_path: str,
271 unlink_on_error: Optional["VirtualPath"] = None,
272) -> None:
273 had_issue = False
274 try:
275 # FIXME: Check mode, and use the Virtual Path to cache the result as a side-effect
276 st = os.lstat(fs_path)
277 except FileNotFoundError:
278 had_issue = True
279 else:
280 if not stat.S_ISREG(st.st_mode) or st.st_nlink > 1: 280 ↛ 281line 280 didn't jump to line 281 because the condition on line 280 was never true
281 had_issue = True
282 if not had_issue: 282 ↛ 285line 282 didn't jump to line 285 because the condition on line 282 was always true
283 return
285 if unlink_on_error:
286 with suppress(FileNotFoundError):
287 os.unlink(fs_path)
288 raise TypeError(
289 "The provided FS backing file was deleted, replaced with a non-file entry or it was hard"
290 " linked to another file. The entry has been disconnected."
291 )
294class CurrentPluginContextManager:
295 __slots__ = ("_plugin_names",)
297 def __init__(self, initial_plugin_name: str) -> None:
298 self._plugin_names = [initial_plugin_name]
300 @property
301 def current_plugin_name(self) -> str:
302 return self._plugin_names[-1]
304 @contextlib.contextmanager
305 def change_plugin_context(self, new_plugin_name: str) -> Iterator[str]:
306 self._plugin_names.append(new_plugin_name)
307 yield new_plugin_name
308 self._plugin_names.pop()
311class VirtualPathBase(VirtualPath, ABC):
312 __slots__ = ()
314 def _orphan_safe_path(self) -> str:
315 return self.path
317 def _rw_check(self) -> None:
318 if not self.is_read_write:
319 raise DebputyFSIsROError(
320 f'Attempt to write to "{self._orphan_safe_path()}" failed:'
321 " Debputy Virtual File system is R/O."
322 )
324 def lookup(self, path: str) -> Optional["VirtualPathBase"]:
325 match, missing = self.attempt_lookup(path)
326 if missing:
327 return None
328 return match
330 def attempt_lookup(self, path: str) -> Tuple["VirtualPathBase", List[str]]:
331 if self.is_detached: 331 ↛ 332line 331 didn't jump to line 332 because the condition on line 331 was never true
332 raise ValueError(
333 f'Cannot perform lookup via "{self._orphan_safe_path()}": The path is detached'
334 )
335 absolute, must_be_dir, path_parts = _split_path(path)
336 current = _root(self) if absolute else self
337 path_parts.reverse()
338 link_expansions = set()
339 while path_parts:
340 dir_part = path_parts.pop()
341 if dir_part == ".":
342 continue
343 if dir_part == "..":
344 p = current.parent_dir
345 if p is None: 345 ↛ 346line 345 didn't jump to line 346 because the condition on line 345 was never true
346 raise ValueError(f'The path "{path}" escapes the root dir')
347 current = p
348 continue
349 try:
350 current = current[dir_part]
351 except KeyError:
352 path_parts.append(dir_part)
353 path_parts.reverse()
354 if must_be_dir:
355 path_parts.pop()
356 return current, path_parts
357 if current.is_symlink and path_parts:
358 if current.path in link_expansions:
359 # This is our loop detection for now. It might have some false positives where you
360 # could safely resolve the same symlink twice. However, given that this use-case is
361 # basically non-existent in practice for packaging, we just stop here for now.
362 raise SymlinkLoopError(
363 f'The path "{path}" traversed the symlink "{current.path}" multiple'
364 " times. Currently, traversing the same symlink twice is considered"
365 " a loop by `debputy` even if the path would eventually resolve."
366 " Consider filing a feature request if you have a benign case that"
367 " triggers this error."
368 )
369 link_expansions.add(current.path)
370 link_target = current.readlink()
371 link_absolute, _, link_path_parts = _split_path(link_target)
372 if link_absolute:
373 current = _root(current)
374 else:
375 current = assume_not_none(current.parent_dir)
376 link_path_parts.reverse()
377 path_parts.extend(link_path_parts)
378 return current, []
380 def mkdirs(self, path: str) -> "VirtualPath":
381 current: VirtualPath
382 current, missing_parts = self.attempt_lookup(
383 f"{path}/" if not path.endswith("/") else path
384 )
385 if not current.is_dir: 385 ↛ 386line 385 didn't jump to line 386 because the condition on line 385 was never true
386 raise ValueError(
387 f'mkdirs of "{path}" failed: This would require {current.path} to not exist OR be'
388 " a directory. However, that path exist AND is a not directory."
389 )
390 for missing_part in missing_parts:
391 assert missing_part not in (".", "..")
392 current = current.mkdir(missing_part)
393 return current
395 def prune_if_empty_dir(self) -> None:
396 """Remove this and all (now) empty parent directories
398 Same as: `rmdir --ignore-fail-on-non-empty --parents`
400 This operation may cause the path (and any of its parent directories) to become "detached"
401 and therefore unsafe to use in further operations.
402 """
403 self._rw_check()
405 if not self.is_dir: 405 ↛ 406line 405 didn't jump to line 406 because the condition on line 405 was never true
406 raise TypeError(f"{self._orphan_safe_path()} is not a directory")
407 if any(self.iterdir):
408 return
409 parent_dir = assume_not_none(self.parent_dir)
411 # Recursive does not matter; we already know the directory is empty.
412 self.unlink()
414 # Note: The root dir must never be deleted. This works because when delegating it to the root
415 # directory, its implementation of this method is a no-op. If this is later rewritten to an
416 # inline loop (rather than recursion), be sure to preserve this feature.
417 parent_dir.prune_if_empty_dir()
419 def _current_plugin(self) -> str:
420 if self.is_detached: 420 ↛ 421line 420 didn't jump to line 421 because the condition on line 420 was never true
421 raise TypeError("Cannot resolve the current plugin; path is detached")
422 current = self
423 while True:
424 next_parent = current.parent_dir
425 if next_parent is None:
426 break
427 current = next_parent
428 assert current is not None
429 return cast("FSRootDir", current)._current_plugin()
431 @overload
432 def open_child( 432 ↛ exitline 432 didn't jump to the function exit
433 self,
434 name: str,
435 mode: TextOpenMode = ...,
436 buffering: int = -1,
437 ) -> TextIO: ...
439 @overload
440 def open_child( 440 ↛ exitline 440 didn't jump to the function exit
441 self,
442 name: str,
443 mode: BinaryOpenMode = ...,
444 buffering: int = -1,
445 ) -> BinaryIO: ...
447 @contextlib.contextmanager
448 def open_child(
449 self,
450 name: str,
451 mode: OpenMode = "r",
452 buffering: int = -1,
453 ) -> Union[TextIO, BinaryIO]:
454 """Open a child path of the current directory in a given mode. Usually used with a context manager
456 The path is opened according to the `mode` parameter similar to the built-in `open` in Python.
457 The following symbols are accepted with the same meaning as Python's open:
458 * `r`
459 * `w`
460 * `x`
461 * `a`
462 * `+`
463 * `b`
464 * `t`
466 Like Python's `open`, this can create a new file provided the file system is in read-write mode.
467 Though unlike Python's open, symlinks are not followed and cannot be opened. Any newly created
468 file will start with (os.stat) mode of 0o0644. The (os.stat) mode of existing paths are left
469 as-is.
471 :param name: The name of the child path to open. Must be a basename.
472 :param mode: The mode to open the file with such as `r` or `w`. See Python's `open` for more
473 examples.
474 :param buffering: Same as open(..., buffering=...) where supported. Notably during
475 testing, the content may be purely in memory and use a BytesIO/StringIO
476 (which does not accept that parameter, but then it is buffered in a different way)
477 :return: The file handle.
478 """
479 existing = self.get(name)
480 if "r" in mode: 480 ↛ 481line 480 didn't jump to line 481 because the condition on line 480 was never true
481 if existing is None:
482 raise ValueError(
483 f"Path {self.path}/{name} does not exist and mode had `r`"
484 )
485 if "+" not in mode:
486 with existing.open(byte_io="b" in mode, buffering=buffering) as fd:
487 yield fd
489 encoding = None if "b" in mode else "utf-8"
491 if existing: 491 ↛ 492line 491 didn't jump to line 492 because the condition on line 491 was never true
492 if "x" in mode:
493 raise ValueError(
494 f"Path {existing.path} already exists and mode had `x`"
495 )
496 with (
497 existing.replace_fs_path_content() as fs_path,
498 open(fs_path, mode, encoding=encoding) as fd,
499 ):
500 yield fd
501 else:
502 assert "r" not in mode
503 # unlink_if_exists=False as a precaution (the "already exists" should not end up here).
504 with (
505 self.add_file(name, mode=0o0644, unlink_if_exists=False) as new_path,
506 open(new_path.fs_path, mode, encoding=encoding) as fd,
507 ):
508 yield fd
511class FSPath(VirtualPathBase, ABC):
512 __slots__ = (
513 "_basename",
514 "_parent_dir",
515 "_children",
516 "_path_cache",
517 "_parent_path_cache",
518 "_last_known_parent_path",
519 "_mode",
520 "_owner",
521 "_group",
522 "_mtime",
523 "_stat_cache",
524 "_metadata",
525 "__weakref__",
526 )
528 def __init__(
529 self,
530 basename: str,
531 parent: Optional["FSPath"],
532 children: Optional[Dict[str, "FSPath"]] = None,
533 initial_mode: Optional[int] = None,
534 mtime: Optional[float] = None,
535 stat_cache: Optional[os.stat_result] = None,
536 ) -> None:
537 self._basename = basename
538 self._path_cache: Optional[str] = None
539 self._parent_path_cache: Optional[str] = None
540 self._children = children
541 self._last_known_parent_path: Optional[str] = None
542 self._mode = initial_mode
543 self._mtime = mtime
544 self._stat_cache = stat_cache
545 self._metadata: Dict[Tuple[str, Type[Any]], PathMetadataValue[Any]] = {}
546 self._owner = ROOT_DEFINITION
547 self._group = ROOT_DEFINITION
549 # The self._parent_dir = None is to create `_parent_dir` because the parent_dir setter calls
550 # is_orphaned, which assumes self._parent_dir is an attribute.
551 self._parent_dir: Optional[ReferenceType["FSPath"]] = None
552 if parent is not None:
553 self.parent_dir = parent
555 def __repr__(self) -> str:
556 return (
557 f"{self.__class__.__name__}({self._orphan_safe_path()!r},"
558 f" is_file={self.is_file},"
559 f" is_dir={self.is_dir},"
560 f" is_symlink={self.is_symlink},"
561 f" has_fs_path={self.has_fs_path},"
562 f" children_len={len(self._children) if self._children else 0})"
563 )
565 @property
566 def name(self) -> str:
567 return self._basename
569 @name.setter
570 def name(self, new_name: str) -> None:
571 self._rw_check()
572 if new_name == self._basename: 572 ↛ 573line 572 didn't jump to line 573 because the condition on line 572 was never true
573 return
574 if self.is_detached: 574 ↛ 575line 574 didn't jump to line 575 because the condition on line 574 was never true
575 self._basename = new_name
576 return
577 self._rw_check()
578 parent = self.parent_dir
579 # This little parent_dir dance ensures the parent dir detects the rename properly
580 self.parent_dir = None
581 self._basename = new_name
582 self.parent_dir = parent
584 @property
585 def iterdir(self) -> Iterable["FSPath"]:
586 if self._children is not None:
587 yield from self._children.values()
589 def all_paths(self) -> Iterable["FSPath"]:
590 yield self
591 if not self.is_dir:
592 return
593 by_basename = BY_BASENAME
594 stack = sorted(self.iterdir, key=by_basename, reverse=True)
595 while stack:
596 current = stack.pop()
597 yield current
598 if current.is_dir and not current.is_detached:
599 stack.extend(sorted(current.iterdir, key=by_basename, reverse=True))
601 def walk(self) -> Iterable[Tuple["FSPath", List["FSPath"]]]:
602 # FIXME: can this be more "os.walk"-like without making it harder to implement?
603 if not self.is_dir: 603 ↛ 604line 603 didn't jump to line 604 because the condition on line 603 was never true
604 yield self, []
605 return
606 by_basename = BY_BASENAME
607 stack = [self]
608 while stack:
609 current = stack.pop()
610 children = sorted(current.iterdir, key=by_basename)
611 assert not children or current.is_dir
612 yield current, children
613 # Removing the directory counts as discarding the children.
614 if not current.is_detached: 614 ↛ 608line 614 didn't jump to line 608 because the condition on line 614 was always true
615 stack.extend(reversed(children))
617 def _orphan_safe_path(self) -> str:
618 if not self.is_detached or self._last_known_parent_path is not None: 618 ↛ 620line 618 didn't jump to line 620 because the condition on line 618 was always true
619 return self.path
620 return f"<orphaned>/{self.name}"
622 @property
623 def is_detached(self) -> bool:
624 parent = self._parent_dir
625 if parent is None:
626 return True
627 resolved_parent = parent()
628 if resolved_parent is None: 628 ↛ 629line 628 didn't jump to line 629 because the condition on line 628 was never true
629 return True
630 return resolved_parent.is_detached
632 # The __getitem__ behaves like __getitem__ from Dict but __iter__ would ideally work like a Sequence.
633 # However, that does not feel compatible, so lets force people to use .children instead for the Sequence
634 # behavior to avoid surprises for now.
635 # (Maybe it is a non-issue, but it is easier to add the API later than to remove it once we have committed
636 # to using it)
637 __iter__ = None
639 def __getitem__(self, key) -> "FSPath":
640 if self._children is None:
641 raise KeyError(
642 f"{key} (note: {self._orphan_safe_path()!r} has no children)"
643 )
644 if isinstance(key, FSPath): 644 ↛ 645line 644 didn't jump to line 645 because the condition on line 644 was never true
645 key = key.name
646 return self._children[key]
648 def __delitem__(self, key) -> None:
649 self._rw_check()
650 children = self._children
651 if children is None: 651 ↛ 652line 651 didn't jump to line 652 because the condition on line 651 was never true
652 raise KeyError(key)
653 del children[key]
655 def get(self, key: str) -> "Optional[FSPath]":
656 try:
657 return self[key]
658 except KeyError:
659 return None
661 def __contains__(self, item: object) -> bool:
662 if isinstance(item, VirtualPath): 662 ↛ 663line 662 didn't jump to line 663 because the condition on line 662 was never true
663 return item.parent_dir is self
664 if not isinstance(item, str): 664 ↛ 665line 664 didn't jump to line 665 because the condition on line 664 was never true
665 return False
666 m = self.get(item)
667 return m is not None
669 def _add_child(self, child: "FSPath") -> None:
670 self._rw_check()
671 if not self.is_dir: 671 ↛ 672line 671 didn't jump to line 672 because the condition on line 671 was never true
672 raise TypeError(f"{self._orphan_safe_path()!r} is not a directory")
673 if self._children is None:
674 self._children = {}
676 conflict_child = self.get(child.name)
677 if conflict_child is not None: 677 ↛ 678line 677 didn't jump to line 678 because the condition on line 677 was never true
678 conflict_child.unlink(recursive=True)
679 self._children[child.name] = child
681 @property
682 def tar_path(self) -> str:
683 path = self.path
684 if self.is_dir:
685 return path + "/"
686 return path
688 @property
689 def path(self) -> str:
690 parent_path = self.parent_dir_path
691 if (
692 self._parent_path_cache is not None
693 and self._parent_path_cache == parent_path
694 ):
695 return assume_not_none(self._path_cache)
696 if parent_path is None: 696 ↛ 697line 696 didn't jump to line 697 because the condition on line 696 was never true
697 raise ReferenceError(
698 f"The path {self.name} is detached! {self.__class__.__name__}"
699 )
700 self._parent_path_cache = parent_path
701 ret = os.path.join(parent_path, self.name)
702 self._path_cache = ret
703 return ret
705 @property
706 def parent_dir(self) -> Optional["FSPath"]:
707 p_ref = self._parent_dir
708 p = p_ref() if p_ref is not None else None
709 if p is None: 709 ↛ 710line 709 didn't jump to line 710 because the condition on line 709 was never true
710 raise ReferenceError(
711 f"The path {self.name} is detached! {self.__class__.__name__}"
712 )
713 return p
715 @parent_dir.setter
716 def parent_dir(self, new_parent: Optional["FSPath"]) -> None:
717 self._rw_check()
718 if new_parent is not None:
719 if not new_parent.is_dir: 719 ↛ 720line 719 didn't jump to line 720 because the condition on line 719 was never true
720 raise ValueError(
721 f"The parent {new_parent._orphan_safe_path()} must be a directory"
722 )
723 new_parent._rw_check()
724 old_parent = None
725 self._last_known_parent_path = None
726 if not self.is_detached:
727 old_parent = self.parent_dir
728 old_parent_children = assume_not_none(assume_not_none(old_parent)._children)
729 del old_parent_children[self.name]
730 if new_parent is not None:
731 self._parent_dir = ref(new_parent)
732 new_parent._add_child(self)
733 else:
734 if old_parent is not None and not old_parent.is_detached: 734 ↛ 736line 734 didn't jump to line 736 because the condition on line 734 was always true
735 self._last_known_parent_path = old_parent.path
736 self._parent_dir = None
737 self._parent_path_cache = None
739 @property
740 def parent_dir_path(self) -> Optional[str]:
741 if self.is_detached: 741 ↛ 742line 741 didn't jump to line 742 because the condition on line 741 was never true
742 return self._last_known_parent_path
743 return assume_not_none(self.parent_dir).path
745 def chown(
746 self,
747 owner: Optional[StaticFileSystemOwner],
748 group: Optional[StaticFileSystemGroup],
749 ) -> None:
750 """Change the owner/group of this path
752 :param owner: The desired owner definition for this path. If None, then no change of owner is performed.
753 :param group: The desired group definition for this path. If None, then no change of group is performed.
754 """
755 self._rw_check()
757 if owner is not None:
758 self._owner = owner.ownership_definition
759 if group is not None:
760 self._group = group.ownership_definition
762 def stat(self) -> os.stat_result:
763 st = self._stat_cache
764 if st is None:
765 st = self._uncached_stat()
766 self._stat_cache = st
767 return st
769 def _uncached_stat(self) -> os.stat_result:
770 return os.lstat(self.fs_path)
772 @property
773 def mode(self) -> int:
774 current_mode = self._mode
775 if current_mode is None: 775 ↛ 776line 775 didn't jump to line 776 because the condition on line 775 was never true
776 current_mode = stat.S_IMODE(self.stat().st_mode)
777 self._mode = current_mode
778 return current_mode
780 @mode.setter
781 def mode(self, new_mode: int) -> None:
782 self._rw_check()
783 min_bit = 0o700 if self.is_dir else 0o400
784 if (new_mode & min_bit) != min_bit: 784 ↛ 785line 784 didn't jump to line 785 because the condition on line 784 was never true
785 omode = oct(new_mode)[2:]
786 omin = oct(min_bit)[2:]
787 raise ValueError(
788 f'Attempt to set mode of path "{self._orphan_safe_path()}" to {omode} rejected;'
789 f" Minimum requirements are {omin} (read-bit and, for dirs, exec bit for user)."
790 " There are no paths that do not need these requirements met and they can cause"
791 " problems during build or on the final system."
792 )
793 self._mode = new_mode
795 def _ensure_min_mode(self) -> None:
796 min_bit = 0o700 if self.is_dir else 0o600
797 if self.has_fs_path and (self.mode & 0o600) != 0o600: 797 ↛ 798line 797 didn't jump to line 798 because the condition on line 797 was never true
798 try:
799 fs_path = self.fs_path
800 except TestPathWithNonExistentFSPathError:
801 pass
802 else:
803 st = os.stat(fs_path)
804 new_fs_mode = stat.S_IMODE(st.st_mode) | min_bit
805 _debug_log(
806 f"Applying chmod {oct(min_bit)[2:]} {fs_path} ({self.path}) to avoid problems down the line"
807 )
808 os.chmod(fs_path, new_fs_mode)
809 self.mode |= min_bit
811 @property
812 def mtime(self) -> float:
813 mtime = self._mtime
814 if mtime is None:
815 mtime = self.stat().st_mtime
816 self._mtime = mtime
817 return mtime
819 @mtime.setter
820 def mtime(self, new_mtime: float) -> None:
821 self._rw_check()
822 self._mtime = new_mtime
824 @property
825 def tar_owner_info(self) -> Tuple[str, int, str, int]:
826 owner = self._owner
827 group = self._group
828 return (
829 owner.entity_name,
830 owner.entity_id,
831 group.entity_name,
832 group.entity_id,
833 )
835 @property
836 def _can_replace_inline(self) -> bool:
837 return False
839 @contextlib.contextmanager
840 def add_file(
841 self,
842 name: str,
843 *,
844 unlink_if_exists: bool = True,
845 use_fs_path_mode: bool = False,
846 mode: int = 0o0644,
847 mtime: Optional[float] = None,
848 # Special-case parameters that are not exposed in the API
849 fs_basename_matters: bool = False,
850 subdir_key: Optional[str] = None,
851 ) -> Iterator["FSPath"]:
852 if "/" in name or name in {".", ".."}: 852 ↛ 853line 852 didn't jump to line 853 because the condition on line 852 was never true
853 raise ValueError(f'Invalid file name: "{name}"')
854 if not self.is_dir: 854 ↛ 855line 854 didn't jump to line 855 because the condition on line 854 was never true
855 raise TypeError(
856 f"Cannot create {self._orphan_safe_path()}/{name}:"
857 f" {self._orphan_safe_path()} is not a directory"
858 )
859 self._rw_check()
860 existing = self.get(name)
861 if existing is not None: 861 ↛ 862line 861 didn't jump to line 862 because the condition on line 861 was never true
862 if not unlink_if_exists:
863 raise ValueError(
864 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
865 f" and exist_ok was False"
866 )
867 existing.unlink(recursive=False)
869 if fs_basename_matters and subdir_key is None: 869 ↛ 870line 869 didn't jump to line 870 because the condition on line 869 was never true
870 raise ValueError(
871 "When fs_basename_matters is True, a subdir_key must be provided"
872 )
874 directory = generated_content_dir(subdir_key=subdir_key)
876 if fs_basename_matters: 876 ↛ 877line 876 didn't jump to line 877 because the condition on line 876 was never true
877 fs_path = os.path.join(directory, name)
878 with open(fs_path, "xb") as _:
879 # Ensure that the fs_path exists
880 pass
881 child = FSBackedFilePath(
882 name,
883 self,
884 fs_path,
885 replaceable_inline=True,
886 mtime=mtime,
887 )
888 yield child
889 else:
890 with tempfile.NamedTemporaryFile(
891 dir=directory, suffix=f"__{name}", delete=False
892 ) as fd:
893 fs_path = fd.name
894 child = FSBackedFilePath(
895 name,
896 self,
897 fs_path,
898 replaceable_inline=True,
899 mtime=mtime,
900 )
901 fd.close()
902 yield child
904 if use_fs_path_mode: 904 ↛ 906line 904 didn't jump to line 906 because the condition on line 904 was never true
905 # Ensure the caller can see the current mode
906 os.chmod(fs_path, mode)
907 _check_fs_path_is_file(fs_path, unlink_on_error=child)
908 child._reset_caches()
909 if not use_fs_path_mode: 909 ↛ exitline 909 didn't return from function 'add_file' because the condition on line 909 was always true
910 child.mode = mode
912 def insert_file_from_fs_path(
913 self,
914 name: str,
915 fs_path: str,
916 *,
917 exist_ok: bool = True,
918 use_fs_path_mode: bool = False,
919 mode: int = 0o0644,
920 require_copy_on_write: bool = True,
921 follow_symlinks: bool = True,
922 reference_path: Optional[VirtualPath] = None,
923 ) -> "FSPath":
924 if "/" in name or name in {".", ".."}: 924 ↛ 925line 924 didn't jump to line 925 because the condition on line 924 was never true
925 raise ValueError(f'Invalid file name: "{name}"')
926 if not self.is_dir: 926 ↛ 927line 926 didn't jump to line 927 because the condition on line 926 was never true
927 raise TypeError(
928 f"Cannot create {self._orphan_safe_path()}/{name}:"
929 f" {self._orphan_safe_path()} is not a directory"
930 )
931 self._rw_check()
932 if name in self and not exist_ok: 932 ↛ 933line 932 didn't jump to line 933 because the condition on line 932 was never true
933 raise ValueError(
934 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
935 f" and exist_ok was False"
936 )
937 new_fs_path = fs_path
938 if follow_symlinks:
939 if reference_path is not None: 939 ↛ 940line 939 didn't jump to line 940 because the condition on line 939 was never true
940 raise ValueError(
941 "The reference_path cannot be used with follow_symlinks"
942 )
943 new_fs_path = os.path.realpath(new_fs_path, strict=True)
945 fmode: Optional[int] = mode
946 if use_fs_path_mode:
947 fmode = None
949 st = None
950 if reference_path is None:
951 st = os.lstat(new_fs_path)
952 if stat.S_ISDIR(st.st_mode): 952 ↛ 953line 952 didn't jump to line 953 because the condition on line 952 was never true
953 raise ValueError(
954 f'The provided path "{fs_path}" is a directory. However, this'
955 " method does not support directories"
956 )
958 if not stat.S_ISREG(st.st_mode): 958 ↛ 959line 958 didn't jump to line 959 because the condition on line 958 was never true
959 if follow_symlinks:
960 raise ValueError(
961 f"The resolved fs_path ({new_fs_path}) was not a file."
962 )
963 raise ValueError(f"The provided fs_path ({fs_path}) was not a file.")
964 return FSBackedFilePath(
965 name,
966 self,
967 new_fs_path,
968 initial_mode=fmode,
969 stat_cache=st,
970 replaceable_inline=not require_copy_on_write,
971 reference_path=reference_path,
972 )
974 def add_symlink(
975 self,
976 link_name: str,
977 link_target: str,
978 *,
979 reference_path: Optional[VirtualPath] = None,
980 ) -> "FSPath":
981 if "/" in link_name or link_name in {".", ".."}: 981 ↛ 982line 981 didn't jump to line 982 because the condition on line 981 was never true
982 raise ValueError(
983 f'Invalid file name: "{link_name}" (it must be a valid basename)'
984 )
985 if not self.is_dir: 985 ↛ 986line 985 didn't jump to line 986 because the condition on line 985 was never true
986 raise TypeError(
987 f"Cannot create {self._orphan_safe_path()}/{link_name}:"
988 f" {self._orphan_safe_path()} is not a directory"
989 )
990 self._rw_check()
992 existing = self.get(link_name)
993 if existing: 993 ↛ 995line 993 didn't jump to line 995 because the condition on line 993 was never true
994 # Emulate ln -sf with attempts a non-recursive unlink first.
995 existing.unlink(recursive=False)
997 return SymlinkVirtualPath(
998 link_name,
999 self,
1000 link_target,
1001 reference_path=reference_path,
1002 )
1004 def mkdir(
1005 self,
1006 name: str,
1007 *,
1008 reference_path: Optional[VirtualPath] = None,
1009 ) -> "FSPath":
1010 if "/" in name or name in {".", ".."}: 1010 ↛ 1011line 1010 didn't jump to line 1011 because the condition on line 1010 was never true
1011 raise ValueError(
1012 f'Invalid file name: "{name}" (it must be a valid basename)'
1013 )
1014 if not self.is_dir: 1014 ↛ 1015line 1014 didn't jump to line 1015 because the condition on line 1014 was never true
1015 raise TypeError(
1016 f"Cannot create {self._orphan_safe_path()}/{name}:"
1017 f" {self._orphan_safe_path()} is not a directory"
1018 )
1019 if reference_path is not None and not reference_path.is_dir: 1019 ↛ 1020line 1019 didn't jump to line 1020 because the condition on line 1019 was never true
1020 raise ValueError(
1021 f'The provided fs_path "{reference_path.fs_path}" exist but it is not a directory!'
1022 )
1023 self._rw_check()
1025 existing = self.get(name)
1026 if existing: 1026 ↛ 1027line 1026 didn't jump to line 1027 because the condition on line 1026 was never true
1027 raise ValueError(f"Path {existing.path} already exist")
1028 return VirtualDirectoryFSPath(name, self, reference_path=reference_path)
1030 def mkdirs(self, path: str) -> "FSPath":
1031 return cast("FSPath", super().mkdirs(path))
1033 @property
1034 def is_read_write(self) -> bool:
1035 """When true, the file system entry may be mutated
1037 :return: Whether file system mutations are permitted.
1038 """
1039 if self.is_detached:
1040 return True
1041 return assume_not_none(self.parent_dir).is_read_write
1043 def unlink(self, *, recursive: bool = False) -> None:
1044 """Unlink a file or a directory
1046 This operation will detach the path from the file system (causing "is_detached" to return True).
1048 Note that the root directory cannot be deleted.
1050 :param recursive: If True, then non-empty directories will be unlinked as well removing everything inside them
1051 as well. When False, an error is raised if the path is a non-empty directory
1052 """
1053 if self.is_detached: 1053 ↛ 1054line 1053 didn't jump to line 1054 because the condition on line 1053 was never true
1054 return
1055 if not recursive and any(self.iterdir): 1055 ↛ 1056line 1055 didn't jump to line 1056 because the condition on line 1055 was never true
1056 raise ValueError(
1057 f'Refusing to unlink "{self.path}": The directory was not empty and recursive was False'
1058 )
1059 # The .parent_dir setter does a _rw_check() for us.
1060 self.parent_dir = None
1062 def _reset_caches(self) -> None:
1063 self._mtime = None
1064 self._stat_cache = None
1066 def metadata(
1067 self,
1068 metadata_type: Type[PMT],
1069 *,
1070 owning_plugin: Optional[str] = None,
1071 ) -> PathMetadataReference[PMT]:
1072 current_plugin = self._current_plugin()
1073 if owning_plugin is None: 1073 ↛ 1075line 1073 didn't jump to line 1075 because the condition on line 1073 was always true
1074 owning_plugin = current_plugin
1075 metadata_key = (owning_plugin, metadata_type)
1076 metadata_value = self._metadata.get(metadata_key)
1077 if metadata_value is None:
1078 if self.is_detached: 1078 ↛ 1079line 1078 didn't jump to line 1079 because the condition on line 1078 was never true
1079 raise TypeError(
1080 f"Cannot access the metadata {metadata_type.__name__}: The path is detached."
1081 )
1082 if not self.is_read_write:
1083 return AlwaysEmptyReadOnlyMetadataReference(
1084 owning_plugin,
1085 current_plugin,
1086 metadata_type,
1087 )
1088 metadata_value = PathMetadataValue(owning_plugin, metadata_type)
1089 self._metadata[metadata_key] = metadata_value
1090 return PathMetadataReferenceImplementation(
1091 self,
1092 current_plugin,
1093 metadata_value,
1094 )
1096 @contextlib.contextmanager
1097 def replace_fs_path_content(
1098 self,
1099 *,
1100 use_fs_path_mode: bool = False,
1101 ) -> Iterator[str]:
1102 if not self.is_file: 1102 ↛ 1103line 1102 didn't jump to line 1103 because the condition on line 1102 was never true
1103 raise TypeError(
1104 f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file'
1105 )
1106 self._rw_check()
1107 fs_path = self.fs_path
1108 if not self._can_replace_inline: 1108 ↛ 1120line 1108 didn't jump to line 1120 because the condition on line 1108 was always true
1109 fs_path = self.fs_path
1110 directory = generated_content_dir()
1111 with tempfile.NamedTemporaryFile(
1112 dir=directory, suffix=f"__{self.name}", delete=False
1113 ) as new_path_fd:
1114 new_path_fd.close()
1115 _cp_a(fs_path, new_path_fd.name)
1116 fs_path = new_path_fd.name
1117 self._replaced_path(fs_path)
1118 assert self.fs_path == fs_path
1120 current_mtime = self._mtime
1121 if current_mtime is not None:
1122 os.utime(fs_path, (current_mtime, current_mtime))
1124 current_mode = self.mode
1125 yield fs_path
1126 _check_fs_path_is_file(fs_path, unlink_on_error=self)
1127 if not use_fs_path_mode: 1127 ↛ 1129line 1127 didn't jump to line 1129 because the condition on line 1127 was always true
1128 os.chmod(fs_path, current_mode)
1129 self._reset_caches()
1131 def _replaced_path(self, new_fs_path: str) -> None:
1132 raise NotImplementedError
1135class VirtualFSPathBase(FSPath, ABC):
1136 __slots__ = ()
1138 def __init__(
1139 self,
1140 basename: str,
1141 parent: Optional["FSPath"],
1142 children: Optional[Dict[str, "FSPath"]] = None,
1143 initial_mode: Optional[int] = None,
1144 mtime: Optional[float] = None,
1145 stat_cache: Optional[os.stat_result] = None,
1146 ) -> None:
1147 super().__init__(
1148 basename,
1149 parent,
1150 children,
1151 initial_mode=initial_mode,
1152 mtime=mtime,
1153 stat_cache=stat_cache,
1154 )
1156 @property
1157 def mtime(self) -> float:
1158 mtime = self._mtime
1159 if mtime is None:
1160 mtime = time.time()
1161 self._mtime = mtime
1162 return mtime
1164 @property
1165 def has_fs_path(self) -> bool:
1166 return False
1168 def stat(self) -> os.stat_result:
1169 if not self.has_fs_path:
1170 raise PureVirtualPathError(
1171 "stat() is only applicable to paths backed by the file system. The path"
1172 f" {self._orphan_safe_path()!r} is purely virtual"
1173 )
1174 return super().stat()
1176 @property
1177 def fs_path(self) -> str:
1178 if not self.has_fs_path:
1179 raise PureVirtualPathError(
1180 "fs_path is only applicable to paths backed by the file system. The path"
1181 f" {self._orphan_safe_path()!r} is purely virtual"
1182 )
1183 return self.fs_path
1186class FSRootDir(FSPath):
1187 __slots__ = ("_fs_path", "_fs_read_write", "_plugin_context")
1189 def __init__(self, fs_path: Optional[str] = None) -> None:
1190 self._fs_path = fs_path
1191 self._fs_read_write = True
1192 super().__init__(
1193 ".",
1194 None,
1195 children={},
1196 initial_mode=0o755,
1197 )
1198 self._plugin_context = CurrentPluginContextManager("debputy")
1200 @property
1201 def is_detached(self) -> bool:
1202 return False
1204 def _orphan_safe_path(self) -> str:
1205 return self.name
1207 @property
1208 def path(self) -> str:
1209 return self.name
1211 @property
1212 def parent_dir(self) -> Optional["FSPath"]:
1213 return None
1215 @parent_dir.setter
1216 def parent_dir(self, new_parent: Optional[FSPath]) -> None:
1217 if new_parent is not None:
1218 raise ValueError("The root directory cannot become a non-root directory")
1220 @property
1221 def parent_dir_path(self) -> Optional[str]:
1222 return None
1224 @property
1225 def is_dir(self) -> bool:
1226 return True
1228 @property
1229 def is_file(self) -> bool:
1230 return False
1232 @property
1233 def is_symlink(self) -> bool:
1234 return False
1236 def readlink(self) -> str:
1237 raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink')
1239 @property
1240 def has_fs_path(self) -> bool:
1241 return self._fs_path is not None
1243 def stat(self) -> os.stat_result:
1244 if not self.has_fs_path:
1245 raise PureVirtualPathError(
1246 "stat() is only applicable to paths backed by the file system. The path"
1247 f" {self._orphan_safe_path()!r} is purely virtual"
1248 )
1249 return os.stat(self.fs_path)
1251 @property
1252 def fs_path(self) -> str:
1253 if not self.has_fs_path: 1253 ↛ 1254line 1253 didn't jump to line 1254 because the condition on line 1253 was never true
1254 raise PureVirtualPathError(
1255 "fs_path is only applicable to paths backed by the file system. The path"
1256 f" {self._orphan_safe_path()!r} is purely virtual"
1257 )
1258 return assume_not_none(self._fs_path)
1260 @property
1261 def is_read_write(self) -> bool:
1262 return self._fs_read_write
1264 @is_read_write.setter
1265 def is_read_write(self, new_value: bool) -> None:
1266 self._fs_read_write = new_value
1268 def prune_if_empty_dir(self) -> None:
1269 # No-op for the root directory. There is never a case where you want to delete this directory
1270 # (and even if you could, debputy will need it for technical reasons, so the root dir stays)
1271 return
1273 def unlink(self, *, recursive: bool = False) -> None:
1274 # There is never a case where you want to delete this directory (and even if you could,
1275 # debputy will need it for technical reasons, so the root dir stays)
1276 raise TypeError("Cannot delete the root directory")
1278 def _current_plugin(self) -> str:
1279 return self._plugin_context.current_plugin_name
1281 @contextlib.contextmanager
1282 def change_plugin_context(self, new_plugin: str) -> Iterator[str]:
1283 with self._plugin_context.change_plugin_context(new_plugin) as r:
1284 yield r
1287class VirtualPathWithReference(VirtualFSPathBase, ABC):
1288 __slots__ = ("_reference_path",)
1290 def __init__(
1291 self,
1292 basename: str,
1293 parent: FSPath,
1294 *,
1295 default_mode: int,
1296 reference_path: Optional[VirtualPath] = None,
1297 ) -> None:
1298 super().__init__(
1299 basename,
1300 parent=parent,
1301 initial_mode=reference_path.mode if reference_path else default_mode,
1302 )
1303 self._reference_path = reference_path
1305 @property
1306 def has_fs_path(self) -> bool:
1307 ref_path = self._reference_path
1308 return ref_path is not None and ref_path.has_fs_path
1310 @property
1311 def mtime(self) -> float:
1312 mtime = self._mtime
1313 if mtime is None: 1313 ↛ 1320line 1313 didn't jump to line 1320 because the condition on line 1313 was always true
1314 ref_path = self._reference_path
1315 if ref_path: 1315 ↛ 1318line 1315 didn't jump to line 1318 because the condition on line 1315 was always true
1316 mtime = ref_path.mtime
1317 else:
1318 mtime = super().mtime
1319 self._mtime = mtime
1320 return mtime
1322 @mtime.setter
1323 def mtime(self, new_mtime: float) -> None:
1324 self._rw_check()
1325 self._mtime = new_mtime
1327 @property
1328 def fs_path(self) -> str:
1329 ref_path = self._reference_path
1330 if ref_path is not None and ( 1330 ↛ 1334line 1330 didn't jump to line 1334 because the condition on line 1330 was always true
1331 not super().has_fs_path or super().fs_path == ref_path.fs_path
1332 ):
1333 return ref_path.fs_path
1334 return super().fs_path
1336 def stat(self) -> os.stat_result:
1337 ref_path = self._reference_path
1338 if ref_path is not None and (
1339 not super().has_fs_path or super().fs_path == ref_path.fs_path
1340 ):
1341 return ref_path.stat()
1342 return super().stat()
1344 def open(
1345 self,
1346 *,
1347 byte_io: bool = False,
1348 buffering: int = -1,
1349 ) -> Union[TextIO, BinaryIO]:
1350 reference_path = self._reference_path
1351 if reference_path is not None and reference_path.fs_path == self.fs_path:
1352 return reference_path.open(byte_io=byte_io, buffering=buffering)
1353 return super().open(byte_io=byte_io, buffering=buffering)
1356class VirtualDirectoryFSPath(VirtualPathWithReference):
1357 __slots__ = ("_reference_path",)
1359 def __init__(
1360 self,
1361 basename: str,
1362 parent: FSPath,
1363 *,
1364 reference_path: Optional[VirtualPath] = None,
1365 ) -> None:
1366 super().__init__(
1367 basename,
1368 parent,
1369 reference_path=reference_path,
1370 default_mode=0o755,
1371 )
1372 self._reference_path = reference_path
1373 assert reference_path is None or reference_path.is_dir
1374 self._ensure_min_mode()
1376 @property
1377 def is_dir(self) -> bool:
1378 return True
1380 @property
1381 def is_file(self) -> bool:
1382 return False
1384 @property
1385 def is_symlink(self) -> bool:
1386 return False
1388 def readlink(self) -> str:
1389 raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink')
1392class SymlinkVirtualPath(VirtualPathWithReference):
1393 __slots__ = ("_link_target",)
1395 def __init__(
1396 self,
1397 basename: str,
1398 parent_dir: FSPath,
1399 link_target: str,
1400 *,
1401 reference_path: Optional[VirtualPath] = None,
1402 ) -> None:
1403 super().__init__(
1404 basename,
1405 parent=parent_dir,
1406 default_mode=_SYMLINK_MODE,
1407 reference_path=reference_path,
1408 )
1409 self._link_target = link_target
1411 @property
1412 def is_dir(self) -> bool:
1413 return False
1415 @property
1416 def is_file(self) -> bool:
1417 return False
1419 @property
1420 def is_symlink(self) -> bool:
1421 return True
1423 def readlink(self) -> str:
1424 return self._link_target
1427class FSBackedFilePath(VirtualPathWithReference):
1428 __slots__ = ("_fs_path", "_replaceable_inline")
1430 def __init__(
1431 self,
1432 basename: str,
1433 parent_dir: FSPath,
1434 fs_path: str,
1435 *,
1436 replaceable_inline: bool = False,
1437 initial_mode: Optional[int] = None,
1438 mtime: Optional[float] = None,
1439 stat_cache: Optional[os.stat_result] = None,
1440 reference_path: Optional[VirtualPath] = None,
1441 ) -> None:
1442 super().__init__(
1443 basename,
1444 parent_dir,
1445 default_mode=0o644,
1446 reference_path=reference_path,
1447 )
1448 self._fs_path = fs_path
1449 self._replaceable_inline = replaceable_inline
1450 if initial_mode is not None:
1451 self.mode = initial_mode
1452 if mtime is not None:
1453 self._mtime = mtime
1454 self._stat_cache = stat_cache
1455 assert (
1456 not replaceable_inline or "debputy/scratch-dir/" in fs_path
1457 ), f"{fs_path} should not be inline-replaceable -- {self.path}"
1458 self._ensure_min_mode()
1460 @property
1461 def is_dir(self) -> bool:
1462 return False
1464 @property
1465 def is_file(self) -> bool:
1466 return True
1468 @property
1469 def is_symlink(self) -> bool:
1470 return False
1472 def readlink(self) -> str:
1473 raise TypeError(f'"{self._orphan_safe_path()!r}" is a file; not a symlink')
1475 @property
1476 def has_fs_path(self) -> bool:
1477 return True
1479 @property
1480 def fs_path(self) -> str:
1481 return self._fs_path
1483 @property
1484 def _can_replace_inline(self) -> bool:
1485 return self._replaceable_inline
1487 def _replaced_path(self, new_fs_path: str) -> None:
1488 self._fs_path = new_fs_path
1489 self._reference_path = None
1490 self._replaceable_inline = True
1493_SYMLINK_MODE = 0o777
1496class VirtualTestPath(FSPath):
1497 __slots__ = (
1498 "_path_type",
1499 "_has_fs_path",
1500 "_fs_path",
1501 "_link_target",
1502 "_content",
1503 "_materialized_content",
1504 )
1506 def __init__(
1507 self,
1508 basename: str,
1509 parent_dir: Optional[FSPath],
1510 mode: Optional[int] = None,
1511 mtime: Optional[float] = None,
1512 is_dir: bool = False,
1513 has_fs_path: Optional[bool] = False,
1514 fs_path: Optional[str] = None,
1515 link_target: Optional[str] = None,
1516 content: Optional[str] = None,
1517 materialized_content: Optional[str] = None,
1518 ) -> None:
1519 if is_dir:
1520 self._path_type = PathType.DIRECTORY
1521 elif link_target is not None:
1522 self._path_type = PathType.SYMLINK
1523 if mode is not None and mode != _SYMLINK_MODE: 1523 ↛ 1524line 1523 didn't jump to line 1524 because the condition on line 1523 was never true
1524 raise ValueError(
1525 f'Please do not assign a mode to symlinks. Triggered for "{basename}".'
1526 )
1527 assert mode is None or mode == _SYMLINK_MODE
1528 else:
1529 self._path_type = PathType.FILE
1531 if mode is not None:
1532 initial_mode = mode
1533 else:
1534 initial_mode = 0o755 if is_dir else 0o644
1536 self._link_target = link_target
1537 if has_fs_path is None:
1538 has_fs_path = bool(fs_path)
1539 self._has_fs_path = has_fs_path
1540 self._fs_path = fs_path
1541 self._materialized_content = materialized_content
1542 super().__init__(
1543 basename,
1544 parent=parent_dir,
1545 initial_mode=initial_mode,
1546 mtime=mtime,
1547 )
1548 self._content = content
1550 @property
1551 def is_dir(self) -> bool:
1552 return self._path_type == PathType.DIRECTORY
1554 @property
1555 def is_file(self) -> bool:
1556 return self._path_type == PathType.FILE
1558 @property
1559 def is_symlink(self) -> bool:
1560 return self._path_type == PathType.SYMLINK
1562 def readlink(self) -> str:
1563 if not self.is_symlink: 1563 ↛ 1564line 1563 didn't jump to line 1564 because the condition on line 1563 was never true
1564 raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
1565 link_target = self._link_target
1566 assert link_target is not None
1567 return link_target
1569 @property
1570 def mtime(self) -> float:
1571 if self._mtime is None:
1572 self._mtime = time.time()
1573 return self._mtime
1575 @mtime.setter
1576 def mtime(self, new_mtime: float) -> None:
1577 self._rw_check()
1578 self._mtime = new_mtime
1580 @property
1581 def has_fs_path(self) -> bool:
1582 return self._has_fs_path
1584 def stat(self) -> os.stat_result:
1585 if self.has_fs_path:
1586 path = self.fs_path
1587 if path is None: 1587 ↛ 1588line 1587 didn't jump to line 1588 because the condition on line 1587 was never true
1588 raise PureVirtualPathError(
1589 f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path"
1590 " cannot provide!"
1591 )
1592 try:
1593 return os.stat(path)
1594 except FileNotFoundError as e:
1595 raise PureVirtualPathError(
1596 f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path"
1597 " cannot provide! (An fs_path was provided, but it did not exist)"
1598 ) from e
1600 raise PureVirtualPathError(
1601 "stat() is only applicable to paths backed by the file system. The path"
1602 f" {self._orphan_safe_path()!r} is purely virtual"
1603 )
1605 @property
1606 def size(self) -> int:
1607 if self._content is not None:
1608 return len(self._content.encode("utf-8"))
1609 if self.is_symlink:
1610 return len(self.readlink())
1611 if not self.has_fs_path or self.fs_path is None:
1612 return 0
1613 return self.stat().st_size
1615 @property
1616 def fs_path(self) -> str:
1617 if self.has_fs_path:
1618 if self._fs_path is None and self._materialized_content is not None:
1619 with tempfile.NamedTemporaryFile(
1620 mode="w+t",
1621 encoding="utf-8",
1622 suffix=f"__{self.name}",
1623 delete=False,
1624 ) as fd:
1625 filepath = fd.name
1626 fd.write(self._materialized_content)
1627 self._fs_path = filepath
1628 atexit.register(lambda: os.unlink(filepath)) 1628 ↛ exitline 1628 didn't run the lambda on line 1628
1630 path = self._fs_path
1631 if path is None: 1631 ↛ 1632line 1631 didn't jump to line 1632 because the condition on line 1631 was never true
1632 raise PureVirtualPathError(
1633 f"The test wants a real file system entry of {self._orphan_safe_path()!r}, which this "
1634 " mock path cannot provide!"
1635 )
1636 return path
1637 raise PureVirtualPathError(
1638 "fs_path is only applicable to paths backed by the file system. The path"
1639 f" {self._orphan_safe_path()!r} is purely virtual"
1640 )
1642 def replace_fs_path_content(
1643 self,
1644 *,
1645 use_fs_path_mode: bool = False,
1646 ) -> ContextManager[str]:
1647 if self._content is not None: 1647 ↛ 1648line 1647 didn't jump to line 1648 because the condition on line 1647 was never true
1648 raise TypeError(
1649 f"The `replace_fs_path_content()` method was called on {self.path}. Said path was"
1650 " created with `content` but for this method to work, the path should have been"
1651 " created with `materialized_content`"
1652 )
1653 return super().replace_fs_path_content(use_fs_path_mode=use_fs_path_mode)
1655 @contextlib.contextmanager
1656 def open_child(
1657 self,
1658 name: str,
1659 mode: OpenMode = "r",
1660 buffering: int = -1,
1661 ) -> Union[TextIO, BinaryIO]:
1662 existing = self.get(name)
1663 if existing or "r" in mode:
1664 with super().open_child(name, mode, buffering=buffering) as fd:
1665 yield fd
1666 return
1667 if "b" in mode:
1668 fd = io.BytesIO(b"")
1669 yield fd
1670 content = fd.getvalue().decode("utf-8")
1671 else:
1672 fd = io.StringIO("")
1673 yield fd
1674 content = fd.getvalue()
1675 VirtualTestPath(
1676 name,
1677 self,
1678 mode=0o644,
1679 content=content,
1680 has_fs_path=True,
1681 )
1683 def open(
1684 self,
1685 *,
1686 byte_io: bool = False,
1687 buffering: int = -1,
1688 ) -> Union[TextIO, BinaryIO]:
1689 if self._content is None:
1690 try:
1691 return super().open(byte_io=byte_io, buffering=buffering)
1692 except FileNotFoundError as e:
1693 raise TestPathWithNonExistentFSPathError(
1694 "The test path {self.path} had an fs_path {self._fs_path}, which does not"
1695 " exist. This exception can only occur in the testsuite. Either have the"
1696 " test provide content for the path (`virtual_path_def(..., content=...) or,"
1697 " if that is too painful in general, have the code accept this error as a "
1698 " test only-case and provide a default."
1699 ) from e
1701 if byte_io:
1702 return io.BytesIO(self._content.encode("utf-8"))
1703 return io.StringIO(self._content)
1705 def _replaced_path(self, new_fs_path: str) -> None:
1706 self._fs_path = new_fs_path
1709class FSOverlayBase(VirtualPathBase, Generic[FSP]):
1710 __slots__ = (
1711 "_path",
1712 "_fs_path",
1713 "_parent",
1714 "__weakref__",
1715 )
1717 def __init__(
1718 self,
1719 path: str,
1720 fs_path: str,
1721 parent: Optional[FSP],
1722 ) -> None:
1723 self._path: str = path
1724 prefix = "/" if fs_path.startswith("/") else ""
1725 self._fs_path: str = prefix + _normalize_path(fs_path, with_prefix=False)
1726 self._parent: Optional[ReferenceType[FSP]] = (
1727 ref(parent) if parent is not None else None
1728 )
1730 @property
1731 def name(self) -> str:
1732 return os.path.basename(self._path)
1734 @property
1735 def path(self) -> str:
1736 return self._path
1738 @property
1739 def parent_dir(self) -> Optional["FSP"]:
1740 parent = self._parent
1741 if parent is None:
1742 return None
1743 resolved = parent()
1744 if resolved is None:
1745 raise RuntimeError("Parent was garbage collected!")
1746 return resolved
1748 @property
1749 def fs_path(self) -> str:
1750 return self._fs_path
1752 def stat(self) -> os.stat_result:
1753 return os.lstat(self.fs_path)
1755 @property
1756 def is_dir(self) -> bool:
1757 # The root path can have a non-existent fs_path (such as d/tmp not always existing)
1758 try:
1759 return stat.S_ISDIR(self.stat().st_mode)
1760 except FileNotFoundError: 1760 ↛ 1762line 1760 didn't jump to line 1762
1761 return False
1762 except NotImplementedError:
1763 print(self.__class__)
1765 @property
1766 def is_file(self) -> bool:
1767 # The root path can have a non-existent fs_path (such as d/tmp not always existing)
1768 try:
1769 return stat.S_ISREG(self.stat().st_mode)
1770 except FileNotFoundError:
1771 return False
1773 @property
1774 def is_symlink(self) -> bool:
1775 # The root path can have a non-existent fs_path (such as d/tmp not always existing)
1776 try:
1777 return stat.S_ISLNK(self.stat().st_mode)
1778 except FileNotFoundError:
1779 return False
1781 @property
1782 def has_fs_path(self) -> bool:
1783 return True
1785 def open(
1786 self,
1787 *,
1788 byte_io: bool = False,
1789 buffering: int = -1,
1790 ) -> Union[TextIO, BinaryIO]:
1791 # Allow symlinks for open here, because we can let the OS resolve the symlink reliably in this
1792 # case.
1793 if not self.is_file and not self.is_symlink:
1794 raise TypeError(
1795 f"Cannot open {self.path} for reading: It is not a file nor a symlink"
1796 )
1798 if byte_io:
1799 return open(self.fs_path, "rb", buffering=buffering)
1800 return open(self.fs_path, "rt", encoding="utf-8", buffering=buffering)
1802 def metadata(
1803 self,
1804 metadata_type: Type[PMT],
1805 *,
1806 owning_plugin: Optional[str] = None,
1807 ) -> PathMetadataReference[PMT]:
1808 current_plugin = self._current_plugin()
1809 if owning_plugin is None:
1810 owning_plugin = current_plugin
1811 return AlwaysEmptyReadOnlyMetadataReference(
1812 owning_plugin,
1813 current_plugin,
1814 metadata_type,
1815 )
1817 def all_paths(self) -> Iterable["FSControlPath"]:
1818 yield self
1819 if not self.is_dir:
1820 return
1821 stack = list(self.iterdir)
1822 stack.reverse()
1823 while stack:
1824 current = stack.pop()
1825 yield current
1826 if current.is_dir:
1827 stack.extend(reversed(list(current.iterdir)))
1829 def _resolve_children(
1830 self, new_child: Callable[[str, str, FSP], FSC]
1831 ) -> Mapping[str, FSC]:
1832 if not self.is_dir:
1833 return {}
1834 dir_path = self.path
1835 dir_fs_path = self.fs_path
1836 children = {}
1837 for name in sorted(os.listdir(dir_fs_path), key=os.path.basename):
1838 child_path = os.path.join(dir_path, name) if dir_path != "." else name
1839 child_fs_path = (
1840 os.path.join(dir_fs_path, name) if dir_fs_path != "." else name
1841 )
1842 children[name] = new_child(
1843 child_path,
1844 child_fs_path,
1845 self,
1846 )
1847 return children
1850class FSROOverlay(FSOverlayBase["FSROOverlay"]):
1851 __slots__ = (
1852 "_stat_cache",
1853 "_readlink_cache",
1854 "_children",
1855 "_stat_failed_cache",
1856 )
1858 def __init__(
1859 self,
1860 path: str,
1861 fs_path: str,
1862 parent: Optional["FSROOverlay"],
1863 ) -> None:
1864 super().__init__(path, fs_path, parent=parent)
1865 self._stat_cache: Optional[os.stat_result] = None
1866 self._readlink_cache: Optional[str] = None
1867 self._stat_failed_cache = False
1868 self._children: Optional[Mapping[str, FSROOverlay]] = None
1870 @classmethod
1871 def create_root_dir(cls, path: str, fs_path: str) -> "FSROOverlay":
1872 return FSROOverlay(path, fs_path, None)
1874 @property
1875 def iterdir(self) -> Iterable["FSROOverlay"]:
1876 if not self.is_dir:
1877 return
1878 if self._children is None:
1879 self._ensure_children_are_resolved()
1880 yield from assume_not_none(self._children).values()
1882 def lookup(self, path: str) -> Optional["FSROOverlay"]:
1883 if not self.is_dir:
1884 return None
1885 if self._children is None:
1886 self._ensure_children_are_resolved()
1888 absolute, _, path_parts = _split_path(path)
1889 current = cast("FSROOverlay", _root(self)) if absolute else self
1890 for no, dir_part in enumerate(path_parts):
1891 if dir_part == ".":
1892 continue
1893 if dir_part == "..":
1894 p = current.parent_dir
1895 if p is None:
1896 raise ValueError(f'The path "{path}" escapes the root dir')
1897 current = cast("FSROOverlay", p)
1898 continue
1899 try:
1900 current = current[dir_part]
1901 except KeyError:
1902 return None
1903 return current
1905 def _ensure_children_are_resolved(self) -> None:
1906 if not self.is_dir or self._children:
1907 return
1908 self._children = self._resolve_children(
1909 lambda n, fsp, p: FSROOverlay(n, fsp, p)
1910 )
1912 @property
1913 def is_detached(self) -> bool:
1914 return False
1916 def __getitem__(self, key) -> "VirtualPath":
1917 if not self.is_dir: 1917 ↛ 1919line 1917 didn't jump to line 1919 because the condition on line 1917 was always true
1918 raise KeyError(key)
1919 if self._children is None:
1920 self._ensure_children_are_resolved()
1921 if isinstance(key, FSPath):
1922 key = key.name
1923 return self._children[key]
1925 def __delitem__(self, key) -> None:
1926 self._error_ro_fs()
1928 @property
1929 def is_read_write(self) -> bool:
1930 return False
1932 def _rw_check(self) -> None:
1933 self._error_ro_fs()
1935 def _error_ro_fs(self) -> NoReturn:
1936 raise DebputyFSIsROError(
1937 f'Attempt to write to "{self.path}" failed:'
1938 " Debputy Virtual File system is R/O."
1939 )
1941 def stat(self) -> os.stat_result:
1942 if self._stat_failed_cache: 1942 ↛ 1943line 1942 didn't jump to line 1943 because the condition on line 1942 was never true
1943 raise FileNotFoundError(
1944 errno.ENOENT, os.strerror(errno.ENOENT), self.fs_path
1945 )
1947 if self._stat_cache is None: 1947 ↛ 1953line 1947 didn't jump to line 1953 because the condition on line 1947 was always true
1948 try:
1949 self._stat_cache = os.lstat(self.fs_path)
1950 except FileNotFoundError:
1951 self._stat_failed_cache = True
1952 raise
1953 return self._stat_cache
1955 @property
1956 def mode(self) -> int:
1957 return stat.S_IMODE(self.stat().st_mode)
1959 @mode.setter
1960 def mode(self, _unused: int) -> None:
1961 self._error_ro_fs()
1963 @property
1964 def mtime(self) -> float:
1965 return self.stat().st_mtime
1967 @mtime.setter
1968 def mtime(self, new_mtime: float) -> None:
1969 self._error_ro_fs()
1971 def readlink(self) -> str:
1972 if not self.is_symlink:
1973 raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
1974 if self._readlink_cache is None:
1975 self._readlink_cache = os.readlink(self.fs_path)
1976 return self._readlink_cache
1978 def chown(
1979 self,
1980 owner: Optional[StaticFileSystemOwner],
1981 group: Optional[StaticFileSystemGroup],
1982 ) -> None:
1983 self._error_ro_fs()
1985 def mkdir(self, name: str) -> "VirtualPath":
1986 self._error_ro_fs()
1988 def add_file(
1989 self,
1990 name: str,
1991 *,
1992 unlink_if_exists: bool = True,
1993 use_fs_path_mode: bool = False,
1994 mode: int = 0o0644,
1995 mtime: Optional[float] = None,
1996 ) -> ContextManager["VirtualPath"]:
1997 self._error_ro_fs()
1999 def add_symlink(self, link_name: str, link_target: str) -> "VirtualPath":
2000 self._error_ro_fs()
2002 def unlink(self, *, recursive: bool = False) -> None:
2003 self._error_ro_fs()
2006class FSROOverlayRootDir(FSROOverlay):
2007 __slots__ = ("_plugin_context",)
2009 def __init__(self, path: str, fs_path: str) -> None:
2010 super().__init__(path, fs_path, None)
2011 self._plugin_context = CurrentPluginContextManager("debputy")
2013 def _current_plugin(self) -> str:
2014 return self._plugin_context.current_plugin_name
2016 @contextlib.contextmanager
2017 def change_plugin_context(self, new_plugin: str) -> Iterator[str]:
2018 with self._plugin_context.change_plugin_context(new_plugin) as r:
2019 yield r
2022class FSControlPath(FSOverlayBase["FSControlPath"]):
2024 @property
2025 def iterdir(self) -> Iterable["FSControlPath"]:
2026 if not self.is_dir:
2027 return
2028 yield from self._resolve_children(
2029 lambda n, fsp, p: FSControlPath(n, fsp, p)
2030 ).values()
2032 def lookup(self, path: str) -> Optional["FSControlPath"]:
2033 if not self.is_dir:
2034 return None
2036 absolute, _, path_parts = _split_path(path)
2037 current = cast("FSControlPath", _root(self)) if absolute else self
2038 for no, dir_part in enumerate(path_parts):
2039 if dir_part == ".":
2040 continue
2041 if dir_part == "..":
2042 p = current.parent_dir
2043 if p is None:
2044 raise ValueError(f'The path "{path}" escapes the root dir')
2045 current = cast("FSControlPath", p)
2046 continue
2047 try:
2048 current = current[dir_part]
2049 except KeyError:
2050 return None
2051 return current
2053 @property
2054 def is_detached(self) -> bool:
2055 try:
2056 self.stat()
2057 except FileNotFoundError:
2058 return True
2059 else:
2060 return False
2062 def __getitem__(self, key) -> "VirtualPath":
2063 if not self.is_dir:
2064 raise KeyError(key)
2065 children = self._resolve_children(lambda n, fsp, p: FSControlPath(n, fsp, p))
2066 if isinstance(key, FSPath):
2067 key = key.name
2068 return children[key]
2070 def __delitem__(self, key) -> None:
2071 self[key].unlink()
2073 @property
2074 def is_read_write(self) -> bool:
2075 return True
2077 @property
2078 def mode(self) -> int:
2079 return stat.S_IMODE(self.stat().st_mode)
2081 @mode.setter
2082 def mode(self, new_mode: int) -> None:
2083 os.chmod(self.fs_path, new_mode)
2085 @property
2086 def mtime(self) -> float:
2087 return self.stat().st_mtime
2089 @mtime.setter
2090 def mtime(self, new_mtime: float) -> None:
2091 os.utime(self.fs_path, (new_mtime, new_mtime))
2093 def readlink(self) -> str:
2094 if not self.is_symlink:
2095 raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
2096 assert False
2098 def chown(
2099 self,
2100 owner: Optional[StaticFileSystemOwner],
2101 group: Optional[StaticFileSystemGroup],
2102 ) -> None:
2103 raise ValueError(
2104 "No need to chown paths in the control.tar: They are always root:root"
2105 )
2107 def mkdir(self, name: str) -> "VirtualPath":
2108 raise TypeError("The control.tar never contains subdirectories.")
2110 @contextlib.contextmanager
2111 def add_file(
2112 self,
2113 name: str,
2114 *,
2115 unlink_if_exists: bool = True,
2116 use_fs_path_mode: bool = False,
2117 mode: int = 0o0644,
2118 mtime: Optional[float] = None,
2119 ) -> ContextManager["VirtualPath"]:
2120 if "/" in name or name in {".", ".."}:
2121 raise ValueError(f'Invalid file name: "{name}"')
2122 if not self.is_dir:
2123 raise TypeError(
2124 f"Cannot create {self._orphan_safe_path()}/{name}:"
2125 f" {self._orphan_safe_path()} is not a directory"
2126 )
2127 self._rw_check()
2128 existing = self.get(name)
2129 if existing is not None:
2130 if not unlink_if_exists:
2131 raise ValueError(
2132 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
2133 f" and exist_ok was False"
2134 )
2135 assert existing.is_file
2137 fs_path = os.path.join(self.fs_path, name)
2138 # This truncates the existing file if any, so we do not have to unlink the previous entry.
2139 with open(fs_path, "wb") as fd:
2140 # Ensure that the fs_path exists and default mode is reasonable
2141 os.chmod(fd.fileno(), mode)
2142 child = FSControlPath(
2143 name,
2144 fs_path,
2145 self,
2146 )
2147 yield child
2148 _check_fs_path_is_file(fs_path, unlink_on_error=child)
2149 child.mode = mode
2151 @contextlib.contextmanager
2152 def replace_fs_path_content(
2153 self,
2154 *,
2155 use_fs_path_mode: bool = False,
2156 ) -> ContextManager[str]:
2157 if not self.is_file:
2158 raise TypeError(
2159 f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file'
2160 )
2161 restore_mode = self.mode if use_fs_path_mode else None
2162 yield self.fs_path
2163 _check_fs_path_is_file(self.fs_path, self)
2164 if restore_mode is not None:
2165 self.mode = restore_mode
2167 def add_symlink(self, link_name: str, link_target: str) -> "VirtualPath":
2168 raise TypeError("The control.tar never contains symlinks.")
2170 def unlink(self, *, recursive: bool = False) -> None:
2171 if self._parent is None:
2172 return
2173 # By virtue of the control FS only containing paths, we can assume `recursive` never
2174 # matters and that `os.unlink` will be sufficient.
2175 assert self.is_file
2176 os.unlink(self.fs_path)
2179class FSControlRootDir(FSControlPath):
2181 @classmethod
2182 def create_root_dir(cls, fs_path: str) -> "FSControlRootDir":
2183 return FSControlRootDir(".", fs_path, None)
2186def as_path_def(pd: Union[str, PathDef]) -> PathDef:
2187 return PathDef(pd) if isinstance(pd, str) else pd
2190def as_path_defs(paths: Iterable[Union[str, PathDef]]) -> Iterable[PathDef]:
2191 yield from (as_path_def(p) for p in paths)
2194def build_virtual_fs(
2195 paths: Iterable[Union[str, PathDef]],
2196 read_write_fs: bool = False,
2197) -> "FSPath":
2198 root_dir: Optional[FSRootDir] = None
2199 directories: Dict[str, FSPath] = {}
2200 non_directories = set()
2202 def _ensure_parent_dirs(p: str) -> None:
2203 current = p.rstrip("/")
2204 missing_dirs = []
2205 while True:
2206 current = os.path.dirname(current)
2207 if current in directories:
2208 break
2209 if current in non_directories: 2209 ↛ 2210line 2209 didn't jump to line 2210 because the condition on line 2209 was never true
2210 raise ValueError(
2211 f'Conflicting definition for "{current}". The path "{p}" wants it as a directory,'
2212 ' but it is defined as a non-directory. (Ensure dirs end with "/")'
2213 )
2214 missing_dirs.append(current)
2215 for dir_path in reversed(missing_dirs):
2216 parent_dir = directories[os.path.dirname(dir_path)]
2217 d = VirtualTestPath(os.path.basename(dir_path), parent_dir, is_dir=True)
2218 directories[dir_path] = d
2220 for path_def in as_path_defs(paths):
2221 path = path_def.path_name
2222 if path in directories or path in non_directories: 2222 ↛ 2223line 2222 didn't jump to line 2223 because the condition on line 2222 was never true
2223 raise ValueError(
2224 f'Duplicate definition of "{path}". Can be false positive if input is not in'
2225 ' "correct order" (ensure directories occur before their children)'
2226 )
2227 if root_dir is None:
2228 root_fs_path = None
2229 if path in (".", "./", "/"):
2230 root_fs_path = path_def.fs_path
2231 root_dir = FSRootDir(fs_path=root_fs_path)
2232 directories["."] = root_dir
2234 if path not in (".", "./", "/") and not path.startswith("./"):
2235 path = "./" + path
2236 if path not in (".", "./", "/"):
2237 _ensure_parent_dirs(path)
2238 if path in (".", "./"):
2239 assert "." in directories
2240 continue
2241 is_dir = False
2242 if path.endswith("/"):
2243 path = path[:-1]
2244 is_dir = True
2245 directory = directories[os.path.dirname(path)]
2246 assert not is_dir or not bool(
2247 path_def.link_target
2248 ), f"is_dir={is_dir} vs. link_target={path_def.link_target}"
2249 fs_path = VirtualTestPath(
2250 os.path.basename(path),
2251 directory,
2252 is_dir=is_dir,
2253 mode=path_def.mode,
2254 mtime=path_def.mtime,
2255 has_fs_path=path_def.has_fs_path,
2256 fs_path=path_def.fs_path,
2257 link_target=path_def.link_target,
2258 content=path_def.content,
2259 materialized_content=path_def.materialized_content,
2260 )
2261 assert not fs_path.is_detached
2262 if fs_path.is_dir:
2263 directories[fs_path.path] = fs_path
2264 else:
2265 non_directories.add(fs_path.path)
2267 if root_dir is None:
2268 root_dir = FSRootDir()
2270 root_dir.is_read_write = read_write_fs
2271 return root_dir