Coverage for src/debputy/filesystem_scan.py: 66%
1295 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-19 09:24 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-19 09:24 +0000
1import atexit
2import contextlib
3import dataclasses
4import errno
5import io
6import operator
7import os
8import shutil
9import stat
10import subprocess
11import tempfile
12import time
13from abc import ABC
14from contextlib import suppress
15from typing import (
16 Optional,
17 cast,
18 Any,
19 ContextManager,
20 TextIO,
21 BinaryIO,
22 Generic,
23 TypeVar,
24 overload,
25 Literal,
26 Never,
27)
28from collections.abc import Iterable, Iterator, Mapping, Callable
29from weakref import ref, ReferenceType
31from debputy.exceptions import (
32 PureVirtualPathError,
33 DebputyFSIsROError,
34 DebputyMetadataAccessError,
35 TestPathWithNonExistentFSPathError,
36 SymlinkLoopError,
37)
38from debputy.intermediate_manifest import PathType
39from debputy.manifest_parser.base_types import (
40 ROOT_DEFINITION,
41 StaticFileSystemOwner,
42 StaticFileSystemGroup,
43)
44from debputy.plugin.api.spec import (
45 VirtualPath,
46 PathDef,
47 PathMetadataReference,
48 PMT,
49)
50from debputy.types import VP
51from debputy.util import (
52 generated_content_dir,
53 _error,
54 escape_shell,
55 assume_not_none,
56 _normalize_path,
57 _debug_log,
58)
60BY_BASENAME = operator.attrgetter("name")
62FSP = TypeVar("FSP", bound="FSOverlayBase", covariant=True)
63FSC = TypeVar("FSC", bound="FSOverlayBase", covariant=True)
66BinaryOpenMode = Literal[
67 "rb",
68 "r+b",
69 "wb",
70 "w+b",
71 "xb",
72 "ab",
73]
74TextOpenMode = Literal[
75 "r",
76 "r+",
77 "rt",
78 "r+t",
79 "w",
80 "w+",
81 "wt",
82 "w+t",
83 "x",
84 "xt",
85 "a",
86 "at",
87]
88OpenMode = Literal[BinaryOpenMode, TextOpenMode]
91class AlwaysEmptyReadOnlyMetadataReference(PathMetadataReference[PMT]):
92 __slots__ = ("_metadata_type", "_owning_plugin", "_current_plugin")
94 def __init__(
95 self,
96 owning_plugin: str,
97 current_plugin: str,
98 metadata_type: type[PMT],
99 ) -> None:
100 self._owning_plugin = owning_plugin
101 self._current_plugin = current_plugin
102 self._metadata_type = metadata_type
104 @property
105 def is_present(self) -> bool:
106 return False
108 @property
109 def can_read(self) -> bool:
110 return self._owning_plugin == self._current_plugin
112 @property
113 def can_write(self) -> bool:
114 return False
116 @property
117 def value(self) -> PMT | None:
118 if self.can_read: 118 ↛ 120line 118 didn't jump to line 120 because the condition on line 118 was always true
119 return None
120 raise DebputyMetadataAccessError(
121 f"Cannot read the metadata {self._metadata_type.__name__} owned by"
122 f" {self._owning_plugin} as the metadata has not been made"
123 f" readable to the plugin {self._current_plugin}."
124 )
126 @value.setter
127 def value(self, new_value: PMT) -> None:
128 if self._is_owner:
129 raise DebputyFSIsROError(
130 f"Cannot set the metadata {self._metadata_type.__name__} as the path is read-only"
131 )
132 raise DebputyMetadataAccessError(
133 f"Cannot set the metadata {self._metadata_type.__name__} owned by"
134 f" {self._owning_plugin} as the metadata has not been made"
135 f" read-write to the plugin {self._current_plugin}."
136 )
138 @property
139 def _is_owner(self) -> bool:
140 return self._owning_plugin == self._current_plugin
143@dataclasses.dataclass(slots=True)
144class PathMetadataValue(Generic[PMT]):
145 owning_plugin: str
146 metadata_type: type[PMT]
147 value: PMT | None = None
149 def can_read_value(self, current_plugin: str) -> bool:
150 return self.owning_plugin == current_plugin
152 def can_write_value(self, current_plugin: str) -> bool:
153 return self.owning_plugin == current_plugin
156class PathMetadataReferenceImplementation(PathMetadataReference[PMT]):
157 __slots__ = ("_owning_path", "_current_plugin", "_path_metadata_value")
159 def __init__(
160 self,
161 owning_path: VirtualPath,
162 current_plugin: str,
163 path_metadata_value: PathMetadataValue[PMT],
164 ) -> None:
165 self._owning_path = owning_path
166 self._current_plugin = current_plugin
167 self._path_metadata_value = path_metadata_value
169 @property
170 def is_present(self) -> bool:
171 if not self.can_read: 171 ↛ 172line 171 didn't jump to line 172 because the condition on line 171 was never true
172 return False
173 return self._path_metadata_value.value is not None
175 @property
176 def can_read(self) -> bool:
177 return self._path_metadata_value.can_read_value(self._current_plugin)
179 @property
180 def can_write(self) -> bool:
181 if not self._path_metadata_value.can_write_value(self._current_plugin): 181 ↛ 182line 181 didn't jump to line 182 because the condition on line 181 was never true
182 return False
183 owning_path = self._owning_path
184 return owning_path.is_read_write and not owning_path.is_detached
186 @property
187 def value(self) -> PMT | None:
188 if self.can_read: 188 ↛ 190line 188 didn't jump to line 190 because the condition on line 188 was always true
189 return self._path_metadata_value.value
190 raise DebputyMetadataAccessError(
191 f"Cannot read the metadata {self._metadata_type_name} owned by"
192 f" {self._owning_plugin} as the metadata has not been made"
193 f" readable to the plugin {self._current_plugin}."
194 )
196 @value.setter
197 def value(self, new_value: PMT) -> None:
198 if not self.can_write: 198 ↛ 199line 198 didn't jump to line 199 because the condition on line 198 was never true
199 m = "set" if new_value is not None else "delete"
200 raise DebputyMetadataAccessError(
201 f"Cannot {m} the metadata {self._metadata_type_name} owned by"
202 f" {self._owning_plugin} as the metadata has not been made"
203 f" read-write to the plugin {self._current_plugin}."
204 )
205 owning_path = self._owning_path
206 if not owning_path.is_read_write: 206 ↛ 207line 206 didn't jump to line 207 because the condition on line 206 was never true
207 raise DebputyFSIsROError(
208 f"Cannot set the metadata {self._metadata_type_name} as the path is read-only"
209 )
210 if owning_path.is_detached: 210 ↛ 211line 210 didn't jump to line 211 because the condition on line 210 was never true
211 raise TypeError(
212 f"Cannot set the metadata {self._metadata_type_name} as the path is detached"
213 )
214 self._path_metadata_value.value = new_value
216 @property
217 def _is_owner(self) -> bool:
218 return self._owning_plugin == self._current_plugin
220 @property
221 def _owning_plugin(self) -> str:
222 return self._path_metadata_value.owning_plugin
224 @property
225 def _metadata_type_name(self) -> str:
226 return self._path_metadata_value.metadata_type.__name__
229def _cp_a(source: str, dest: str) -> None:
230 cmd = ["cp", "-a", source, dest]
231 try:
232 subprocess.check_call(cmd)
233 except subprocess.CalledProcessError:
234 full_command = escape_shell(*cmd)
235 _error(
236 f"The attempt to make an internal copy of {escape_shell(source)} failed. Please review the output of cp"
237 f" above to understand what went wrong. The full command was: {full_command}"
238 )
241def _split_path(path: str) -> tuple[bool, bool, list[str]]:
242 must_be_dir = True if path.endswith("/") else False
243 absolute = False
244 if path.startswith("/"):
245 absolute = True
246 path = "." + path
247 path_parts = path.rstrip("/").split("/")
248 if must_be_dir:
249 path_parts.append(".")
250 return absolute, must_be_dir, path_parts
253def _root(path: VP) -> VP:
254 current = path
255 while True:
256 parent = current.parent_dir
257 if parent is None:
258 return current
259 current = parent
262def _check_fs_path_is_file(
263 fs_path: str,
264 unlink_on_error: Optional["VirtualPath"] = None,
265) -> None:
266 had_issue = False
267 try:
268 # FIXME: Check mode, and use the Virtual Path to cache the result as a side-effect
269 st = os.lstat(fs_path)
270 except FileNotFoundError:
271 had_issue = True
272 else:
273 if not stat.S_ISREG(st.st_mode) or st.st_nlink > 1: 273 ↛ 274line 273 didn't jump to line 274 because the condition on line 273 was never true
274 had_issue = True
275 if not had_issue: 275 ↛ 278line 275 didn't jump to line 278 because the condition on line 275 was always true
276 return
278 if unlink_on_error:
279 with suppress(FileNotFoundError):
280 os.unlink(fs_path)
281 raise TypeError(
282 "The provided FS backing file was deleted, replaced with a non-file entry or it was hard"
283 " linked to another file. The entry has been disconnected."
284 )
287class CurrentPluginContextManager:
288 __slots__ = ("_plugin_names",)
290 def __init__(self, initial_plugin_name: str) -> None:
291 self._plugin_names = [initial_plugin_name]
293 @property
294 def current_plugin_name(self) -> str:
295 return self._plugin_names[-1]
297 @contextlib.contextmanager
298 def change_plugin_context(self, new_plugin_name: str) -> Iterator[str]:
299 self._plugin_names.append(new_plugin_name)
300 yield new_plugin_name
301 self._plugin_names.pop()
304class VirtualPathBase(VirtualPath, ABC):
305 __slots__ = ()
307 def _orphan_safe_path(self) -> str:
308 return self.path
310 def _rw_check(self) -> None:
311 if not self.is_read_write:
312 raise DebputyFSIsROError(
313 f'Attempt to write to "{self._orphan_safe_path()}" failed:'
314 " Debputy Virtual File system is R/O."
315 )
317 def lookup(self, path: str) -> Optional["VirtualPathBase"]:
318 match, missing = self.attempt_lookup(path)
319 if missing:
320 return None
321 return match
323 def attempt_lookup(self, path: str) -> tuple["VirtualPathBase", list[str]]:
324 if self.is_detached: 324 ↛ 325line 324 didn't jump to line 325 because the condition on line 324 was never true
325 raise ValueError(
326 f'Cannot perform lookup via "{self._orphan_safe_path()}": The path is detached'
327 )
328 absolute, must_be_dir, path_parts = _split_path(path)
329 current = _root(self) if absolute else self
330 path_parts.reverse()
331 link_expansions = set()
332 while path_parts:
333 dir_part = path_parts.pop()
334 if dir_part == ".":
335 continue
336 if dir_part == "..":
337 p = current.parent_dir
338 if p is None: 338 ↛ 339line 338 didn't jump to line 339 because the condition on line 338 was never true
339 raise ValueError(f'The path "{path}" escapes the root dir')
340 current = p
341 continue
342 try:
343 current = current[dir_part]
344 except KeyError:
345 path_parts.append(dir_part)
346 path_parts.reverse()
347 if must_be_dir:
348 path_parts.pop()
349 return current, path_parts
350 if current.is_symlink and path_parts:
351 if current.path in link_expansions:
352 # This is our loop detection for now. It might have some false positives where you
353 # could safely resolve the same symlink twice. However, given that this use-case is
354 # basically non-existent in practice for packaging, we just stop here for now.
355 raise SymlinkLoopError(
356 f'The path "{path}" traversed the symlink "{current.path}" multiple'
357 " times. Currently, traversing the same symlink twice is considered"
358 " a loop by `debputy` even if the path would eventually resolve."
359 " Consider filing a feature request if you have a benign case that"
360 " triggers this error."
361 )
362 link_expansions.add(current.path)
363 link_target = current.readlink()
364 link_absolute, _, link_path_parts = _split_path(link_target)
365 if link_absolute:
366 current = _root(current)
367 else:
368 current = assume_not_none(current.parent_dir)
369 link_path_parts.reverse()
370 path_parts.extend(link_path_parts)
371 return current, []
373 def mkdirs(self, path: str) -> "VirtualPath":
374 current: VirtualPath
375 current, missing_parts = self.attempt_lookup(
376 f"{path}/" if not path.endswith("/") else path
377 )
378 if not current.is_dir: 378 ↛ 379line 378 didn't jump to line 379 because the condition on line 378 was never true
379 raise ValueError(
380 f'mkdirs of "{path}" failed: This would require {current.path} to not exist OR be'
381 " a directory. However, that path exist AND is a not directory."
382 )
383 for missing_part in missing_parts:
384 assert missing_part not in (".", "..")
385 current = current.mkdir(missing_part)
386 return current
388 def prune_if_empty_dir(self) -> None:
389 """Remove this and all (now) empty parent directories
391 Same as: `rmdir --ignore-fail-on-non-empty --parents`
393 This operation may cause the path (and any of its parent directories) to become "detached"
394 and therefore unsafe to use in further operations.
395 """
396 self._rw_check()
398 if not self.is_dir: 398 ↛ 399line 398 didn't jump to line 399 because the condition on line 398 was never true
399 raise TypeError(f"{self._orphan_safe_path()} is not a directory")
400 if any(self.iterdir):
401 return
402 parent_dir = assume_not_none(self.parent_dir)
404 # Recursive does not matter; we already know the directory is empty.
405 self.unlink()
407 # Note: The root dir must never be deleted. This works because when delegating it to the root
408 # directory, its implementation of this method is a no-op. If this is later rewritten to an
409 # inline loop (rather than recursion), be sure to preserve this feature.
410 parent_dir.prune_if_empty_dir()
412 def _current_plugin(self) -> str:
413 if self.is_detached: 413 ↛ 414line 413 didn't jump to line 414 because the condition on line 413 was never true
414 raise TypeError("Cannot resolve the current plugin; path is detached")
415 current = self
416 while True:
417 next_parent = current.parent_dir
418 if next_parent is None:
419 break
420 current = next_parent
421 assert current is not None
422 return cast("FSRootDir", current)._current_plugin()
424 @overload
425 def open_child( 425 ↛ exitline 425 didn't return from function 'open_child' because
426 self,
427 name: str,
428 mode: TextOpenMode = ...,
429 buffering: int = -1,
430 ) -> TextIO: ...
432 @overload
433 def open_child( 433 ↛ exitline 433 didn't return from function 'open_child' because
434 self,
435 name: str,
436 mode: BinaryOpenMode = ...,
437 buffering: int = -1,
438 ) -> BinaryIO: ...
440 @overload
441 def open_child( 441 ↛ exitline 441 didn't return from function 'open_child' because
442 self,
443 name: str,
444 mode: OpenMode = ...,
445 buffering: int = -1,
446 ) -> TextIO | BinaryIO: ...
448 @contextlib.contextmanager
449 def open_child(
450 self,
451 name: str,
452 mode: OpenMode = "r",
453 buffering: int = -1,
454 ) -> TextIO | BinaryIO:
455 """Open a child path of the current directory in a given mode. Usually used with a context manager
457 The path is opened according to the `mode` parameter similar to the built-in `open` in Python.
458 The following symbols are accepted with the same meaning as Python's open:
459 * `r`
460 * `w`
461 * `x`
462 * `a`
463 * `+`
464 * `b`
465 * `t`
467 Like Python's `open`, this can create a new file provided the file system is in read-write mode.
468 Though unlike Python's open, symlinks are not followed and cannot be opened. Any newly created
469 file will start with (os.stat) mode of 0o0644. The (os.stat) mode of existing paths are left
470 as-is.
472 :param name: The name of the child path to open. Must be a basename.
473 :param mode: The mode to open the file with such as `r` or `w`. See Python's `open` for more
474 examples.
475 :param buffering: Same as open(..., buffering=...) where supported. Notably during
476 testing, the content may be purely in memory and use a BytesIO/StringIO
477 (which does not accept that parameter, but then it is buffered in a different way)
478 :return: The file handle.
479 """
480 existing = self.get(name)
481 if "r" in mode: 481 ↛ 482line 481 didn't jump to line 482 because the condition on line 481 was never true
482 if existing is None:
483 raise ValueError(
484 f"Path {self.path}/{name} does not exist and mode had `r`"
485 )
486 if "+" not in mode:
487 with existing.open(byte_io="b" in mode, buffering=buffering) as fd:
488 yield fd
490 encoding = None if "b" in mode else "utf-8"
492 if existing: 492 ↛ 493line 492 didn't jump to line 493 because the condition on line 492 was never true
493 if "x" in mode:
494 raise ValueError(
495 f"Path {existing.path} already exists and mode had `x`"
496 )
497 with (
498 existing.replace_fs_path_content() as fs_path,
499 open(fs_path, mode, encoding=encoding) as fd,
500 ):
501 yield fd
502 else:
503 assert "r" not in mode
504 # unlink_if_exists=False as a precaution (the "already exists" should not end up here).
505 with (
506 self.add_file(name, mode=0o0644, unlink_if_exists=False) as new_path,
507 open(new_path.fs_path, mode, encoding=encoding) as fd,
508 ):
509 yield fd
512class FSPath(VirtualPathBase, ABC):
513 __slots__ = (
514 "_basename",
515 "_parent_dir",
516 "_children",
517 "_path_cache",
518 "_parent_path_cache",
519 "_last_known_parent_path",
520 "_mode",
521 "_owner",
522 "_group",
523 "_mtime",
524 "_stat_cache",
525 "_metadata",
526 "__weakref__",
527 )
529 def __init__(
530 self,
531 basename: str,
532 parent: Optional["FSPath"],
533 children: dict[str, "FSPath"] | None = None,
534 initial_mode: int | None = None,
535 mtime: float | None = None,
536 stat_cache: os.stat_result | None = None,
537 ) -> None:
538 self._basename = basename
539 self._path_cache: str | None = None
540 self._parent_path_cache: str | None = None
541 self._children = children
542 self._last_known_parent_path: str | None = None
543 self._mode = initial_mode
544 self._mtime = mtime
545 self._stat_cache = stat_cache
546 self._metadata: dict[tuple[str, type[Any]], PathMetadataValue[Any]] = {}
547 self._owner = ROOT_DEFINITION
548 self._group = ROOT_DEFINITION
550 # The self._parent_dir = None is to create `_parent_dir` because the parent_dir setter calls
551 # is_orphaned, which assumes self._parent_dir is an attribute.
552 self._parent_dir: ReferenceType["FSPath"] | None = None
553 if parent is not None:
554 self.parent_dir = parent
556 def __repr__(self) -> str:
557 return (
558 f"{self.__class__.__name__}({self._orphan_safe_path()!r},"
559 f" is_file={self.is_file},"
560 f" is_dir={self.is_dir},"
561 f" is_symlink={self.is_symlink},"
562 f" has_fs_path={self.has_fs_path},"
563 f" children_len={len(self._children) if self._children else 0})"
564 )
566 @property
567 def name(self) -> str:
568 return self._basename
570 @name.setter
571 def name(self, new_name: str) -> None:
572 self._rw_check()
573 if new_name == self._basename: 573 ↛ 574line 573 didn't jump to line 574 because the condition on line 573 was never true
574 return
575 if self.is_detached: 575 ↛ 576line 575 didn't jump to line 576 because the condition on line 575 was never true
576 self._basename = new_name
577 return
578 self._rw_check()
579 parent = self.parent_dir
580 # This little parent_dir dance ensures the parent dir detects the rename properly
581 self.parent_dir = None
582 self._basename = new_name
583 self.parent_dir = parent
585 @property
586 def iterdir(self) -> Iterable["FSPath"]:
587 if self._children is not None:
588 yield from self._children.values()
590 def all_paths(self) -> Iterable["FSPath"]:
591 yield self
592 if not self.is_dir:
593 return
594 by_basename = BY_BASENAME
595 stack = sorted(self.iterdir, key=by_basename, reverse=True)
596 while stack:
597 current = stack.pop()
598 yield current
599 if current.is_dir and not current.is_detached:
600 stack.extend(sorted(current.iterdir, key=by_basename, reverse=True))
602 def walk(self) -> Iterable[tuple["FSPath", list["FSPath"]]]:
603 # FIXME: can this be more "os.walk"-like without making it harder to implement?
604 if not self.is_dir: 604 ↛ 605line 604 didn't jump to line 605 because the condition on line 604 was never true
605 yield self, []
606 return
607 by_basename = BY_BASENAME
608 stack = [self]
609 while stack:
610 current = stack.pop()
611 children = sorted(current.iterdir, key=by_basename)
612 assert not children or current.is_dir
613 yield current, children
614 # Removing the directory counts as discarding the children.
615 if not current.is_detached: 615 ↛ 609line 615 didn't jump to line 609 because the condition on line 615 was always true
616 stack.extend(reversed(children))
618 def _orphan_safe_path(self) -> str:
619 if not self.is_detached or self._last_known_parent_path is not None: 619 ↛ 621line 619 didn't jump to line 621 because the condition on line 619 was always true
620 return self.path
621 return f"<orphaned>/{self.name}"
623 @property
624 def is_detached(self) -> bool:
625 parent = self._parent_dir
626 if parent is None:
627 return True
628 resolved_parent = parent()
629 if resolved_parent is None: 629 ↛ 630line 629 didn't jump to line 630 because the condition on line 629 was never true
630 return True
631 return resolved_parent.is_detached
633 # The __getitem__ behaves like __getitem__ from Dict but __iter__ would ideally work like a Sequence.
634 # However, that does not feel compatible, so lets force people to use .children instead for the Sequence
635 # behavior to avoid surprises for now.
636 # (Maybe it is a non-issue, but it is easier to add the API later than to remove it once we have committed
637 # to using it)
638 __iter__ = None
640 def __getitem__(self, key) -> "FSPath":
641 if self._children is None:
642 raise KeyError(
643 f"{key} (note: {self._orphan_safe_path()!r} has no children)"
644 )
645 if isinstance(key, FSPath): 645 ↛ 646line 645 didn't jump to line 646 because the condition on line 645 was never true
646 key = key.name
647 return self._children[key]
649 def __delitem__(self, key) -> None:
650 self._rw_check()
651 children = self._children
652 if children is None: 652 ↛ 653line 652 didn't jump to line 653 because the condition on line 652 was never true
653 raise KeyError(key)
654 del children[key]
656 def get(self, key: str) -> "Optional[FSPath]":
657 try:
658 return self[key]
659 except KeyError:
660 return None
662 def __contains__(self, item: object) -> bool:
663 if isinstance(item, VirtualPath): 663 ↛ 664line 663 didn't jump to line 664 because the condition on line 663 was never true
664 return item.parent_dir is self
665 if not isinstance(item, str): 665 ↛ 666line 665 didn't jump to line 666 because the condition on line 665 was never true
666 return False
667 m = self.get(item)
668 return m is not None
670 def _add_child(self, child: "FSPath") -> None:
671 self._rw_check()
672 if not self.is_dir: 672 ↛ 673line 672 didn't jump to line 673 because the condition on line 672 was never true
673 raise TypeError(f"{self._orphan_safe_path()!r} is not a directory")
674 if self._children is None:
675 self._children = {}
677 conflict_child = self.get(child.name)
678 if conflict_child is not None: 678 ↛ 679line 678 didn't jump to line 679 because the condition on line 678 was never true
679 conflict_child.unlink(recursive=True)
680 self._children[child.name] = child
682 @property
683 def tar_path(self) -> str:
684 path = self.path
685 if self.is_dir:
686 return path + "/"
687 return path
689 @property
690 def path(self) -> str:
691 parent_path = self.parent_dir_path
692 if (
693 self._parent_path_cache is not None
694 and self._parent_path_cache == parent_path
695 ):
696 return assume_not_none(self._path_cache)
697 if parent_path is None: 697 ↛ 698line 697 didn't jump to line 698 because the condition on line 697 was never true
698 raise ReferenceError(
699 f"The path {self.name} is detached! {self.__class__.__name__}"
700 )
701 self._parent_path_cache = parent_path
702 ret = os.path.join(parent_path, self.name)
703 self._path_cache = ret
704 return ret
706 @property
707 def parent_dir(self) -> Optional["FSPath"]:
708 p_ref = self._parent_dir
709 p = p_ref() if p_ref is not None else None
710 if p is None: 710 ↛ 711line 710 didn't jump to line 711 because the condition on line 710 was never true
711 raise ReferenceError(
712 f"The path {self.name} is detached! {self.__class__.__name__}"
713 )
714 return p
716 @parent_dir.setter
717 def parent_dir(self, new_parent: Optional["FSPath"]) -> None:
718 self._rw_check()
719 if new_parent is not None:
720 if not new_parent.is_dir: 720 ↛ 721line 720 didn't jump to line 721 because the condition on line 720 was never true
721 raise ValueError(
722 f"The parent {new_parent._orphan_safe_path()} must be a directory"
723 )
724 new_parent._rw_check()
725 old_parent = None
726 self._last_known_parent_path = None
727 if not self.is_detached:
728 old_parent = self.parent_dir
729 old_parent_children = assume_not_none(assume_not_none(old_parent)._children)
730 del old_parent_children[self.name]
731 if new_parent is not None:
732 self._parent_dir = ref(new_parent)
733 new_parent._add_child(self)
734 else:
735 if old_parent is not None and not old_parent.is_detached: 735 ↛ 737line 735 didn't jump to line 737 because the condition on line 735 was always true
736 self._last_known_parent_path = old_parent.path
737 self._parent_dir = None
738 self._parent_path_cache = None
740 @property
741 def parent_dir_path(self) -> str | None:
742 if self.is_detached: 742 ↛ 743line 742 didn't jump to line 743 because the condition on line 742 was never true
743 return self._last_known_parent_path
744 return assume_not_none(self.parent_dir).path
746 def chown(
747 self,
748 owner: StaticFileSystemOwner | None,
749 group: StaticFileSystemGroup | None,
750 ) -> None:
751 """Change the owner/group of this path
753 :param owner: The desired owner definition for this path. If None, then no change of owner is performed.
754 :param group: The desired group definition for this path. If None, then no change of group is performed.
755 """
756 self._rw_check()
758 if owner is not None:
759 self._owner = owner.ownership_definition
760 if group is not None:
761 self._group = group.ownership_definition
763 def stat(self) -> os.stat_result:
764 st = self._stat_cache
765 if st is None:
766 st = self._uncached_stat()
767 self._stat_cache = st
768 return st
770 def _uncached_stat(self) -> os.stat_result:
771 return os.lstat(self.fs_path)
773 @property
774 def mode(self) -> int:
775 current_mode = self._mode
776 if current_mode is None: 776 ↛ 777line 776 didn't jump to line 777 because the condition on line 776 was never true
777 current_mode = stat.S_IMODE(self.stat().st_mode)
778 self._mode = current_mode
779 return current_mode
781 @mode.setter
782 def mode(self, new_mode: int) -> None:
783 self._rw_check()
784 min_bit = 0o700 if self.is_dir else 0o400
785 if (new_mode & min_bit) != min_bit: 785 ↛ 786line 785 didn't jump to line 786 because the condition on line 785 was never true
786 omode = oct(new_mode)[2:]
787 omin = oct(min_bit)[2:]
788 raise ValueError(
789 f'Attempt to set mode of path "{self._orphan_safe_path()}" to {omode} rejected;'
790 f" Minimum requirements are {omin} (read-bit and, for dirs, exec bit for user)."
791 " There are no paths that do not need these requirements met and they can cause"
792 " problems during build or on the final system."
793 )
794 self._mode = new_mode
796 def _ensure_min_mode(self) -> None:
797 min_bit = 0o700 if self.is_dir else 0o600
798 if self.has_fs_path and (self.mode & 0o600) != 0o600: 798 ↛ 799line 798 didn't jump to line 799 because the condition on line 798 was never true
799 try:
800 fs_path = self.fs_path
801 except TestPathWithNonExistentFSPathError:
802 pass
803 else:
804 st = os.stat(fs_path)
805 new_fs_mode = stat.S_IMODE(st.st_mode) | min_bit
806 _debug_log(
807 f"Applying chmod {oct(min_bit)[2:]} {fs_path} ({self.path}) to avoid problems down the line"
808 )
809 os.chmod(fs_path, new_fs_mode)
810 self.mode |= min_bit
812 @property
813 def mtime(self) -> float:
814 mtime = self._mtime
815 if mtime is None:
816 mtime = self.stat().st_mtime
817 self._mtime = mtime
818 return mtime
820 @mtime.setter
821 def mtime(self, new_mtime: float) -> None:
822 self._rw_check()
823 self._mtime = new_mtime
825 @property
826 def tar_owner_info(self) -> tuple[str, int, str, int]:
827 owner = self._owner
828 group = self._group
829 return (
830 owner.entity_name,
831 owner.entity_id,
832 group.entity_name,
833 group.entity_id,
834 )
836 @property
837 def _can_replace_inline(self) -> bool:
838 return False
840 @contextlib.contextmanager
841 def add_file(
842 self,
843 name: str,
844 *,
845 unlink_if_exists: bool = True,
846 use_fs_path_mode: bool = False,
847 mode: int = 0o0644,
848 mtime: float | None = None,
849 # Special-case parameters that are not exposed in the API
850 fs_basename_matters: bool = False,
851 subdir_key: str | None = None,
852 ) -> Iterator["FSPath"]:
853 if "/" in name or name in {".", ".."}: 853 ↛ 854line 853 didn't jump to line 854 because the condition on line 853 was never true
854 raise ValueError(f'Invalid file name: "{name}"')
855 if not self.is_dir: 855 ↛ 856line 855 didn't jump to line 856 because the condition on line 855 was never true
856 raise TypeError(
857 f"Cannot create {self._orphan_safe_path()}/{name}:"
858 f" {self._orphan_safe_path()} is not a directory"
859 )
860 self._rw_check()
861 existing = self.get(name)
862 if existing is not None: 862 ↛ 863line 862 didn't jump to line 863 because the condition on line 862 was never true
863 if not unlink_if_exists:
864 raise ValueError(
865 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
866 f" and exist_ok was False"
867 )
868 existing.unlink(recursive=False)
870 if fs_basename_matters and subdir_key is None: 870 ↛ 871line 870 didn't jump to line 871 because the condition on line 870 was never true
871 raise ValueError(
872 "When fs_basename_matters is True, a subdir_key must be provided"
873 )
875 directory = generated_content_dir(subdir_key=subdir_key)
877 if fs_basename_matters: 877 ↛ 878line 877 didn't jump to line 878 because the condition on line 877 was never true
878 fs_path = os.path.join(directory, name)
879 with open(fs_path, "xb") as _:
880 # Ensure that the fs_path exists
881 pass
882 child = FSBackedFilePath(
883 name,
884 self,
885 fs_path,
886 replaceable_inline=True,
887 mtime=mtime,
888 )
889 yield child
890 else:
891 with tempfile.NamedTemporaryFile(
892 dir=directory, suffix=f"__{name}", delete=False
893 ) as fd:
894 fs_path = fd.name
895 child = FSBackedFilePath(
896 name,
897 self,
898 fs_path,
899 replaceable_inline=True,
900 mtime=mtime,
901 )
902 fd.close()
903 yield child
905 if use_fs_path_mode: 905 ↛ 907line 905 didn't jump to line 907 because the condition on line 905 was never true
906 # Ensure the caller can see the current mode
907 os.chmod(fs_path, mode)
908 _check_fs_path_is_file(fs_path, unlink_on_error=child)
909 child._reset_caches()
910 if not use_fs_path_mode: 910 ↛ exitline 910 didn't return from function 'add_file' because the condition on line 910 was always true
911 child.mode = mode
913 def insert_file_from_fs_path(
914 self,
915 name: str,
916 fs_path: str,
917 *,
918 exist_ok: bool = True,
919 use_fs_path_mode: bool = False,
920 mode: int = 0o0644,
921 require_copy_on_write: bool = True,
922 follow_symlinks: bool = True,
923 reference_path: VirtualPath | None = None,
924 ) -> "FSPath":
925 if "/" in name or name in {".", ".."}: 925 ↛ 926line 925 didn't jump to line 926 because the condition on line 925 was never true
926 raise ValueError(f'Invalid file name: "{name}"')
927 if not self.is_dir: 927 ↛ 928line 927 didn't jump to line 928 because the condition on line 927 was never true
928 raise TypeError(
929 f"Cannot create {self._orphan_safe_path()}/{name}:"
930 f" {self._orphan_safe_path()} is not a directory"
931 )
932 self._rw_check()
933 if name in self and not exist_ok: 933 ↛ 934line 933 didn't jump to line 934 because the condition on line 933 was never true
934 raise ValueError(
935 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
936 f" and exist_ok was False"
937 )
938 new_fs_path = fs_path
939 if follow_symlinks:
940 if reference_path is not None: 940 ↛ 941line 940 didn't jump to line 941 because the condition on line 940 was never true
941 raise ValueError(
942 "The reference_path cannot be used with follow_symlinks"
943 )
944 new_fs_path = os.path.realpath(new_fs_path, strict=True)
946 fmode: int | None = mode
947 if use_fs_path_mode:
948 fmode = None
950 st = None
951 if reference_path is None:
952 st = os.lstat(new_fs_path)
953 if stat.S_ISDIR(st.st_mode): 953 ↛ 954line 953 didn't jump to line 954 because the condition on line 953 was never true
954 raise ValueError(
955 f'The provided path "{fs_path}" is a directory. However, this'
956 " method does not support directories"
957 )
959 if not stat.S_ISREG(st.st_mode): 959 ↛ 960line 959 didn't jump to line 960 because the condition on line 959 was never true
960 if follow_symlinks:
961 raise ValueError(
962 f"The resolved fs_path ({new_fs_path}) was not a file."
963 )
964 raise ValueError(f"The provided fs_path ({fs_path}) was not a file.")
965 return FSBackedFilePath(
966 name,
967 self,
968 new_fs_path,
969 initial_mode=fmode,
970 stat_cache=st,
971 replaceable_inline=not require_copy_on_write,
972 reference_path=reference_path,
973 )
975 def add_symlink(
976 self,
977 link_name: str,
978 link_target: str,
979 *,
980 reference_path: VirtualPath | None = None,
981 ) -> "FSPath":
982 if "/" in link_name or link_name in {".", ".."}: 982 ↛ 983line 982 didn't jump to line 983 because the condition on line 982 was never true
983 raise ValueError(
984 f'Invalid file name: "{link_name}" (it must be a valid basename)'
985 )
986 if not self.is_dir: 986 ↛ 987line 986 didn't jump to line 987 because the condition on line 986 was never true
987 raise TypeError(
988 f"Cannot create {self._orphan_safe_path()}/{link_name}:"
989 f" {self._orphan_safe_path()} is not a directory"
990 )
991 self._rw_check()
993 existing = self.get(link_name)
994 if existing: 994 ↛ 996line 994 didn't jump to line 996 because the condition on line 994 was never true
995 # Emulate ln -sf with attempts a non-recursive unlink first.
996 existing.unlink(recursive=False)
998 return SymlinkVirtualPath(
999 link_name,
1000 self,
1001 link_target,
1002 reference_path=reference_path,
1003 )
1005 def mkdir(
1006 self,
1007 name: str,
1008 *,
1009 reference_path: VirtualPath | None = None,
1010 ) -> "FSPath":
1011 if "/" in name or name in {".", ".."}: 1011 ↛ 1012line 1011 didn't jump to line 1012 because the condition on line 1011 was never true
1012 raise ValueError(
1013 f'Invalid file name: "{name}" (it must be a valid basename)'
1014 )
1015 if not self.is_dir: 1015 ↛ 1016line 1015 didn't jump to line 1016 because the condition on line 1015 was never true
1016 raise TypeError(
1017 f"Cannot create {self._orphan_safe_path()}/{name}:"
1018 f" {self._orphan_safe_path()} is not a directory"
1019 )
1020 if reference_path is not None and not reference_path.is_dir: 1020 ↛ 1021line 1020 didn't jump to line 1021 because the condition on line 1020 was never true
1021 raise ValueError(
1022 f'The provided fs_path "{reference_path.fs_path}" exist but it is not a directory!'
1023 )
1024 self._rw_check()
1026 existing = self.get(name)
1027 if existing: 1027 ↛ 1028line 1027 didn't jump to line 1028 because the condition on line 1027 was never true
1028 raise ValueError(f"Path {existing.path} already exist")
1029 return VirtualDirectoryFSPath(name, self, reference_path=reference_path)
1031 def mkdirs(self, path: str) -> "FSPath":
1032 return cast("FSPath", super().mkdirs(path))
1034 @property
1035 def is_read_write(self) -> bool:
1036 """When true, the file system entry may be mutated
1038 :return: Whether file system mutations are permitted.
1039 """
1040 if self.is_detached:
1041 return True
1042 return assume_not_none(self.parent_dir).is_read_write
1044 def unlink(self, *, recursive: bool = False) -> None:
1045 """Unlink a file or a directory
1047 This operation will detach the path from the file system (causing "is_detached" to return True).
1049 Note that the root directory cannot be deleted.
1051 :param recursive: If True, then non-empty directories will be unlinked as well removing everything inside them
1052 as well. When False, an error is raised if the path is a non-empty directory
1053 """
1054 if self.is_detached: 1054 ↛ 1055line 1054 didn't jump to line 1055 because the condition on line 1054 was never true
1055 return
1056 if not recursive and any(self.iterdir): 1056 ↛ 1057line 1056 didn't jump to line 1057 because the condition on line 1056 was never true
1057 raise ValueError(
1058 f'Refusing to unlink "{self.path}": The directory was not empty and recursive was False'
1059 )
1060 # The .parent_dir setter does a _rw_check() for us.
1061 self.parent_dir = None
1063 def _reset_caches(self) -> None:
1064 self._mtime = None
1065 self._stat_cache = None
1067 def metadata(
1068 self,
1069 metadata_type: type[PMT],
1070 *,
1071 owning_plugin: str | None = None,
1072 ) -> PathMetadataReference[PMT]:
1073 current_plugin = self._current_plugin()
1074 if owning_plugin is None: 1074 ↛ 1076line 1074 didn't jump to line 1076 because the condition on line 1074 was always true
1075 owning_plugin = current_plugin
1076 metadata_key = (owning_plugin, metadata_type)
1077 metadata_value = self._metadata.get(metadata_key)
1078 if metadata_value is None:
1079 if self.is_detached: 1079 ↛ 1080line 1079 didn't jump to line 1080 because the condition on line 1079 was never true
1080 raise TypeError(
1081 f"Cannot access the metadata {metadata_type.__name__}: The path is detached."
1082 )
1083 if not self.is_read_write:
1084 return AlwaysEmptyReadOnlyMetadataReference(
1085 owning_plugin,
1086 current_plugin,
1087 metadata_type,
1088 )
1089 metadata_value = PathMetadataValue(owning_plugin, metadata_type)
1090 self._metadata[metadata_key] = metadata_value
1091 return PathMetadataReferenceImplementation(
1092 self,
1093 current_plugin,
1094 metadata_value,
1095 )
1097 @contextlib.contextmanager
1098 def replace_fs_path_content(
1099 self,
1100 *,
1101 use_fs_path_mode: bool = False,
1102 ) -> Iterator[str]:
1103 if not self.is_file: 1103 ↛ 1104line 1103 didn't jump to line 1104 because the condition on line 1103 was never true
1104 raise TypeError(
1105 f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file'
1106 )
1107 self._rw_check()
1108 fs_path = self.fs_path
1109 if not self._can_replace_inline: 1109 ↛ 1121line 1109 didn't jump to line 1121 because the condition on line 1109 was always true
1110 fs_path = self.fs_path
1111 directory = generated_content_dir()
1112 with tempfile.NamedTemporaryFile(
1113 dir=directory, suffix=f"__{self.name}", delete=False
1114 ) as new_path_fd:
1115 new_path_fd.close()
1116 _cp_a(fs_path, new_path_fd.name)
1117 fs_path = new_path_fd.name
1118 self._replaced_path(fs_path)
1119 assert self.fs_path == fs_path
1121 current_mtime = self._mtime
1122 if current_mtime is not None:
1123 os.utime(fs_path, (current_mtime, current_mtime))
1125 current_mode = self.mode
1126 yield fs_path
1127 _check_fs_path_is_file(fs_path, unlink_on_error=self)
1128 if not use_fs_path_mode: 1128 ↛ 1130line 1128 didn't jump to line 1130 because the condition on line 1128 was always true
1129 os.chmod(fs_path, current_mode)
1130 self._reset_caches()
1132 def _replaced_path(self, new_fs_path: str) -> None:
1133 raise NotImplementedError
1136class VirtualFSPathBase(FSPath, ABC):
1137 __slots__ = ()
1139 def __init__(
1140 self,
1141 basename: str,
1142 parent: Optional["FSPath"],
1143 children: dict[str, "FSPath"] | None = None,
1144 initial_mode: int | None = None,
1145 mtime: float | None = None,
1146 stat_cache: os.stat_result | None = None,
1147 ) -> None:
1148 super().__init__(
1149 basename,
1150 parent,
1151 children,
1152 initial_mode=initial_mode,
1153 mtime=mtime,
1154 stat_cache=stat_cache,
1155 )
1157 @property
1158 def mtime(self) -> float:
1159 mtime = self._mtime
1160 if mtime is None:
1161 mtime = time.time()
1162 self._mtime = mtime
1163 return mtime
1165 @property
1166 def has_fs_path(self) -> bool:
1167 return False
1169 def stat(self) -> os.stat_result:
1170 if not self.has_fs_path:
1171 raise PureVirtualPathError(
1172 "stat() is only applicable to paths backed by the file system. The path"
1173 f" {self._orphan_safe_path()!r} is purely virtual"
1174 )
1175 return super().stat()
1177 @property
1178 def fs_path(self) -> str:
1179 if not self.has_fs_path:
1180 raise PureVirtualPathError(
1181 "fs_path is only applicable to paths backed by the file system. The path"
1182 f" {self._orphan_safe_path()!r} is purely virtual"
1183 )
1184 return self.fs_path
1187class FSRootDir(FSPath):
1188 __slots__ = ("_fs_path", "_fs_read_write", "_plugin_context")
1190 def __init__(self, fs_path: str | None = None) -> None:
1191 self._fs_path = fs_path
1192 self._fs_read_write = True
1193 super().__init__(
1194 ".",
1195 None,
1196 children={},
1197 initial_mode=0o755,
1198 )
1199 self._plugin_context = CurrentPluginContextManager("debputy")
1201 @property
1202 def is_detached(self) -> bool:
1203 return False
1205 def _orphan_safe_path(self) -> str:
1206 return self.name
1208 @property
1209 def path(self) -> str:
1210 return self.name
1212 @property
1213 def parent_dir(self) -> Optional["FSPath"]:
1214 return None
1216 @parent_dir.setter
1217 def parent_dir(self, new_parent: FSPath | None) -> None:
1218 if new_parent is not None:
1219 raise ValueError("The root directory cannot become a non-root directory")
1221 @property
1222 def parent_dir_path(self) -> str | None:
1223 return None
1225 @property
1226 def is_dir(self) -> bool:
1227 return True
1229 @property
1230 def is_file(self) -> bool:
1231 return False
1233 @property
1234 def is_symlink(self) -> bool:
1235 return False
1237 def readlink(self) -> str:
1238 raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink')
1240 @property
1241 def has_fs_path(self) -> bool:
1242 return self._fs_path is not None
1244 def stat(self) -> os.stat_result:
1245 if not self.has_fs_path:
1246 raise PureVirtualPathError(
1247 "stat() is only applicable to paths backed by the file system. The path"
1248 f" {self._orphan_safe_path()!r} is purely virtual"
1249 )
1250 return os.stat(self.fs_path)
1252 @property
1253 def fs_path(self) -> str:
1254 if not self.has_fs_path: 1254 ↛ 1255line 1254 didn't jump to line 1255 because the condition on line 1254 was never true
1255 raise PureVirtualPathError(
1256 "fs_path is only applicable to paths backed by the file system. The path"
1257 f" {self._orphan_safe_path()!r} is purely virtual"
1258 )
1259 return assume_not_none(self._fs_path)
1261 @property
1262 def is_read_write(self) -> bool:
1263 return self._fs_read_write
1265 @is_read_write.setter
1266 def is_read_write(self, new_value: bool) -> None:
1267 self._fs_read_write = new_value
1269 def prune_if_empty_dir(self) -> None:
1270 # No-op for the root directory. There is never a case where you want to delete this directory
1271 # (and even if you could, debputy will need it for technical reasons, so the root dir stays)
1272 return
1274 def unlink(self, *, recursive: bool = False) -> None:
1275 # There is never a case where you want to delete this directory (and even if you could,
1276 # debputy will need it for technical reasons, so the root dir stays)
1277 raise TypeError("Cannot delete the root directory")
1279 def _current_plugin(self) -> str:
1280 return self._plugin_context.current_plugin_name
1282 @contextlib.contextmanager
1283 def change_plugin_context(self, new_plugin: str) -> Iterator[str]:
1284 with self._plugin_context.change_plugin_context(new_plugin) as r:
1285 yield r
1288class VirtualPathWithReference(VirtualFSPathBase, ABC):
1289 __slots__ = ("_reference_path",)
1291 def __init__(
1292 self,
1293 basename: str,
1294 parent: FSPath,
1295 *,
1296 default_mode: int,
1297 reference_path: VirtualPath | None = None,
1298 ) -> None:
1299 super().__init__(
1300 basename,
1301 parent=parent,
1302 initial_mode=reference_path.mode if reference_path else default_mode,
1303 )
1304 self._reference_path = reference_path
1306 @property
1307 def has_fs_path(self) -> bool:
1308 ref_path = self._reference_path
1309 return ref_path is not None and ref_path.has_fs_path
1311 @property
1312 def mtime(self) -> float:
1313 mtime = self._mtime
1314 if mtime is None: 1314 ↛ 1321line 1314 didn't jump to line 1321 because the condition on line 1314 was always true
1315 ref_path = self._reference_path
1316 if ref_path: 1316 ↛ 1319line 1316 didn't jump to line 1319 because the condition on line 1316 was always true
1317 mtime = ref_path.mtime
1318 else:
1319 mtime = super().mtime
1320 self._mtime = mtime
1321 return mtime
1323 @mtime.setter
1324 def mtime(self, new_mtime: float) -> None:
1325 self._rw_check()
1326 self._mtime = new_mtime
1328 @property
1329 def fs_path(self) -> str:
1330 ref_path = self._reference_path
1331 if ref_path is not None and ( 1331 ↛ 1335line 1331 didn't jump to line 1335 because the condition on line 1331 was always true
1332 not super().has_fs_path or super().fs_path == ref_path.fs_path
1333 ):
1334 return ref_path.fs_path
1335 return super().fs_path
1337 def stat(self) -> os.stat_result:
1338 ref_path = self._reference_path
1339 if ref_path is not None and (
1340 not super().has_fs_path or super().fs_path == ref_path.fs_path
1341 ):
1342 return ref_path.stat()
1343 return super().stat()
1345 def open(
1346 self,
1347 *,
1348 byte_io: bool = False,
1349 buffering: int = -1,
1350 ) -> TextIO | BinaryIO:
1351 reference_path = self._reference_path
1352 if reference_path is not None and reference_path.fs_path == self.fs_path:
1353 return reference_path.open(byte_io=byte_io, buffering=buffering)
1354 return super().open(byte_io=byte_io, buffering=buffering)
1357class VirtualDirectoryFSPath(VirtualPathWithReference):
1358 __slots__ = ("_reference_path",)
1360 def __init__(
1361 self,
1362 basename: str,
1363 parent: FSPath,
1364 *,
1365 reference_path: VirtualPath | None = None,
1366 ) -> None:
1367 super().__init__(
1368 basename,
1369 parent,
1370 reference_path=reference_path,
1371 default_mode=0o755,
1372 )
1373 self._reference_path = reference_path
1374 assert reference_path is None or reference_path.is_dir
1375 self._ensure_min_mode()
1377 @property
1378 def is_dir(self) -> bool:
1379 return True
1381 @property
1382 def is_file(self) -> bool:
1383 return False
1385 @property
1386 def is_symlink(self) -> bool:
1387 return False
1389 def readlink(self) -> str:
1390 raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink')
1393class SymlinkVirtualPath(VirtualPathWithReference):
1394 __slots__ = ("_link_target",)
1396 def __init__(
1397 self,
1398 basename: str,
1399 parent_dir: FSPath,
1400 link_target: str,
1401 *,
1402 reference_path: VirtualPath | None = None,
1403 ) -> None:
1404 super().__init__(
1405 basename,
1406 parent=parent_dir,
1407 default_mode=_SYMLINK_MODE,
1408 reference_path=reference_path,
1409 )
1410 self._link_target = link_target
1412 @property
1413 def is_dir(self) -> bool:
1414 return False
1416 @property
1417 def is_file(self) -> bool:
1418 return False
1420 @property
1421 def is_symlink(self) -> bool:
1422 return True
1424 def readlink(self) -> str:
1425 return self._link_target
1427 @property
1428 def size(self) -> int:
1429 return len(self.readlink())
1432class FSBackedFilePath(VirtualPathWithReference):
1433 __slots__ = ("_fs_path", "_replaceable_inline")
1435 def __init__(
1436 self,
1437 basename: str,
1438 parent_dir: FSPath,
1439 fs_path: str,
1440 *,
1441 replaceable_inline: bool = False,
1442 initial_mode: int | None = None,
1443 mtime: float | None = None,
1444 stat_cache: os.stat_result | None = None,
1445 reference_path: VirtualPath | None = None,
1446 ) -> None:
1447 super().__init__(
1448 basename,
1449 parent_dir,
1450 default_mode=0o644,
1451 reference_path=reference_path,
1452 )
1453 self._fs_path = fs_path
1454 self._replaceable_inline = replaceable_inline
1455 if initial_mode is not None:
1456 self.mode = initial_mode
1457 if mtime is not None:
1458 self._mtime = mtime
1459 self._stat_cache = stat_cache
1460 assert (
1461 not replaceable_inline or "debputy/scratch-dir/" in fs_path
1462 ), f"{fs_path} should not be inline-replaceable -- {self.path}"
1463 self._ensure_min_mode()
1465 @property
1466 def is_dir(self) -> bool:
1467 return False
1469 @property
1470 def is_file(self) -> bool:
1471 return True
1473 @property
1474 def is_symlink(self) -> bool:
1475 return False
1477 def readlink(self) -> str:
1478 raise TypeError(f'"{self._orphan_safe_path()!r}" is a file; not a symlink')
1480 @property
1481 def has_fs_path(self) -> bool:
1482 return True
1484 @property
1485 def fs_path(self) -> str:
1486 return self._fs_path
1488 @property
1489 def _can_replace_inline(self) -> bool:
1490 return self._replaceable_inline
1492 def _replaced_path(self, new_fs_path: str) -> None:
1493 self._fs_path = new_fs_path
1494 self._reference_path = None
1495 self._replaceable_inline = True
1498_SYMLINK_MODE = 0o777
1501class VirtualTestPath(FSPath):
1502 __slots__ = (
1503 "_path_type",
1504 "_has_fs_path",
1505 "_fs_path",
1506 "_link_target",
1507 "_content",
1508 "_materialized_content",
1509 )
1511 def __init__(
1512 self,
1513 basename: str,
1514 parent_dir: FSPath | None,
1515 mode: int | None = None,
1516 mtime: float | None = None,
1517 is_dir: bool = False,
1518 has_fs_path: bool | None = False,
1519 fs_path: str | None = None,
1520 link_target: str | None = None,
1521 content: str | None = None,
1522 materialized_content: str | None = None,
1523 ) -> None:
1524 if is_dir:
1525 self._path_type = PathType.DIRECTORY
1526 elif link_target is not None:
1527 self._path_type = PathType.SYMLINK
1528 if mode is not None and mode != _SYMLINK_MODE: 1528 ↛ 1529line 1528 didn't jump to line 1529 because the condition on line 1528 was never true
1529 raise ValueError(
1530 f'Please do not assign a mode to symlinks. Triggered for "{basename}".'
1531 )
1532 assert mode is None or mode == _SYMLINK_MODE
1533 else:
1534 self._path_type = PathType.FILE
1536 if mode is not None:
1537 initial_mode = mode
1538 else:
1539 initial_mode = 0o755 if is_dir else 0o644
1541 self._link_target = link_target
1542 if has_fs_path is None:
1543 has_fs_path = bool(fs_path)
1544 self._has_fs_path = has_fs_path
1545 self._fs_path = fs_path
1546 self._materialized_content = materialized_content
1547 super().__init__(
1548 basename,
1549 parent=parent_dir,
1550 initial_mode=initial_mode,
1551 mtime=mtime,
1552 )
1553 self._content = content
1555 @property
1556 def is_dir(self) -> bool:
1557 return self._path_type == PathType.DIRECTORY
1559 @property
1560 def is_file(self) -> bool:
1561 return self._path_type == PathType.FILE
1563 @property
1564 def is_symlink(self) -> bool:
1565 return self._path_type == PathType.SYMLINK
1567 def readlink(self) -> str:
1568 if not self.is_symlink: 1568 ↛ 1569line 1568 didn't jump to line 1569 because the condition on line 1568 was never true
1569 raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
1570 link_target = self._link_target
1571 assert link_target is not None
1572 return link_target
1574 @property
1575 def mtime(self) -> float:
1576 if self._mtime is None:
1577 self._mtime = time.time()
1578 return self._mtime
1580 @mtime.setter
1581 def mtime(self, new_mtime: float) -> None:
1582 self._rw_check()
1583 self._mtime = new_mtime
1585 @property
1586 def has_fs_path(self) -> bool:
1587 return self._has_fs_path
1589 def stat(self) -> os.stat_result:
1590 if self.has_fs_path:
1591 path = self.fs_path
1592 if path is None: 1592 ↛ 1593line 1592 didn't jump to line 1593 because the condition on line 1592 was never true
1593 raise PureVirtualPathError(
1594 f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path"
1595 " cannot provide!"
1596 )
1597 try:
1598 return os.stat(path)
1599 except FileNotFoundError as e:
1600 raise PureVirtualPathError(
1601 f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path"
1602 " cannot provide! (An fs_path was provided, but it did not exist)"
1603 ) from e
1605 raise PureVirtualPathError(
1606 "stat() is only applicable to paths backed by the file system. The path"
1607 f" {self._orphan_safe_path()!r} is purely virtual"
1608 )
1610 @property
1611 def size(self) -> int:
1612 if self._content is not None:
1613 return len(self._content.encode("utf-8"))
1614 if self.is_symlink:
1615 return len(self.readlink())
1616 if not self.has_fs_path or self.fs_path is None:
1617 return 0
1618 return self.stat().st_size
1620 @property
1621 def fs_path(self) -> str:
1622 if self.has_fs_path:
1623 if self._fs_path is None and self._materialized_content is not None:
1624 with tempfile.NamedTemporaryFile(
1625 mode="w+t",
1626 encoding="utf-8",
1627 suffix=f"__{self.name}",
1628 delete=False,
1629 ) as fd:
1630 filepath = fd.name
1631 fd.write(self._materialized_content)
1632 self._fs_path = filepath
1633 atexit.register(lambda: os.unlink(filepath))
1635 path = self._fs_path
1636 if path is None: 1636 ↛ 1637line 1636 didn't jump to line 1637 because the condition on line 1636 was never true
1637 raise PureVirtualPathError(
1638 f"The test wants a real file system entry of {self._orphan_safe_path()!r}, which this "
1639 " mock path cannot provide!"
1640 )
1641 return path
1642 raise PureVirtualPathError(
1643 "fs_path is only applicable to paths backed by the file system. The path"
1644 f" {self._orphan_safe_path()!r} is purely virtual"
1645 )
1647 def replace_fs_path_content(
1648 self,
1649 *,
1650 use_fs_path_mode: bool = False,
1651 ) -> ContextManager[str]:
1652 if self._content is not None: 1652 ↛ 1653line 1652 didn't jump to line 1653 because the condition on line 1652 was never true
1653 raise TypeError(
1654 f"The `replace_fs_path_content()` method was called on {self.path}. Said path was"
1655 " created with `content` but for this method to work, the path should have been"
1656 " created with `materialized_content`"
1657 )
1658 return super().replace_fs_path_content(use_fs_path_mode=use_fs_path_mode)
1660 @contextlib.contextmanager
1661 def open_child(
1662 self,
1663 name: str,
1664 mode: OpenMode = "r",
1665 buffering: int = -1,
1666 ) -> TextIO | BinaryIO:
1667 existing = self.get(name)
1668 if existing or "r" in mode:
1669 with super().open_child(name, mode, buffering=buffering) as fd:
1670 yield fd
1671 return
1672 if "b" in mode:
1673 fd = io.BytesIO(b"")
1674 yield fd
1675 content = fd.getvalue().decode("utf-8")
1676 else:
1677 fd = io.StringIO("")
1678 yield fd
1679 content = fd.getvalue()
1680 VirtualTestPath(
1681 name,
1682 self,
1683 mode=0o644,
1684 content=content,
1685 has_fs_path=True,
1686 )
1688 def open(
1689 self,
1690 *,
1691 byte_io: bool = False,
1692 buffering: int = -1,
1693 ) -> TextIO | BinaryIO:
1694 if self._content is None:
1695 try:
1696 return super().open(byte_io=byte_io, buffering=buffering)
1697 except FileNotFoundError as e:
1698 raise TestPathWithNonExistentFSPathError(
1699 f"The test path {self.path} had an fs_path {self._fs_path}, which does not"
1700 " exist. This exception can only occur in the testsuite. Either have the"
1701 " test provide content for the path (`virtual_path_def(..., content=...) or,"
1702 " if that is too painful in general, have the code accept this error as a "
1703 " test only-case and provide a default."
1704 ) from e
1706 if byte_io:
1707 return io.BytesIO(self._content.encode("utf-8"))
1708 return io.StringIO(self._content)
1710 def _replaced_path(self, new_fs_path: str) -> None:
1711 self._fs_path = new_fs_path
1714class OSFSOverlayBase(VirtualPathBase, Generic[FSP]):
1715 __slots__ = (
1716 "_path",
1717 "_fs_path",
1718 "_parent",
1719 "__weakref__",
1720 )
1722 def __init__(
1723 self,
1724 path: str,
1725 fs_path: str,
1726 parent: FSP | None,
1727 ) -> None:
1728 self._path: str = path
1729 prefix = "/" if fs_path.startswith("/") else ""
1730 self._fs_path: str = prefix + _normalize_path(fs_path, with_prefix=False)
1731 self._parent: ReferenceType[FSP] | None = (
1732 ref(parent) if parent is not None else None
1733 )
1735 @property
1736 def name(self) -> str:
1737 return os.path.basename(self._path)
1739 @property
1740 def path(self) -> str:
1741 return self._path
1743 @property
1744 def parent_dir(self) -> Optional["FSP"]:
1745 parent = self._parent
1746 if parent is None:
1747 return None
1748 resolved = parent()
1749 if resolved is None:
1750 raise RuntimeError("Parent was garbage collected!")
1751 return resolved
1753 @property
1754 def fs_path(self) -> str:
1755 return self._fs_path
1757 def stat(self) -> os.stat_result:
1758 return os.lstat(self.fs_path)
1760 @property
1761 def is_dir(self) -> bool:
1762 # The root path can have a non-existent fs_path (such as d/tmp not always existing)
1763 try:
1764 return stat.S_ISDIR(self.stat().st_mode)
1765 except FileNotFoundError:
1766 return False
1768 @property
1769 def is_file(self) -> bool:
1770 # The root path can have a non-existent fs_path (such as d/tmp not always existing)
1771 try:
1772 return stat.S_ISREG(self.stat().st_mode)
1773 except FileNotFoundError:
1774 return False
1776 @property
1777 def is_symlink(self) -> bool:
1778 # The root path can have a non-existent fs_path (such as d/tmp not always existing)
1779 try:
1780 return stat.S_ISLNK(self.stat().st_mode)
1781 except FileNotFoundError:
1782 return False
1784 @property
1785 def has_fs_path(self) -> bool:
1786 return True
1788 def open(
1789 self,
1790 *,
1791 byte_io: bool = False,
1792 buffering: int = -1,
1793 ) -> TextIO | BinaryIO:
1794 # Allow symlinks for open here, because we can let the OS resolve the symlink reliably in this
1795 # case.
1796 if not self.is_file and not self.is_symlink:
1797 raise TypeError(
1798 f"Cannot open {self.path} for reading: It is not a file nor a symlink"
1799 )
1801 if byte_io:
1802 return open(self.fs_path, "rb", buffering=buffering)
1803 return open(self.fs_path, encoding="utf-8", buffering=buffering)
1805 def metadata(
1806 self,
1807 metadata_type: type[PMT],
1808 *,
1809 owning_plugin: str | None = None,
1810 ) -> PathMetadataReference[PMT]:
1811 current_plugin = self._current_plugin()
1812 if owning_plugin is None:
1813 owning_plugin = current_plugin
1814 return AlwaysEmptyReadOnlyMetadataReference(
1815 owning_plugin,
1816 current_plugin,
1817 metadata_type,
1818 )
1820 def all_paths(self) -> Iterable["OSFSControlPath"]:
1821 yield self
1822 if not self.is_dir:
1823 return
1824 stack = list(self.iterdir)
1825 stack.reverse()
1826 while stack:
1827 current = stack.pop()
1828 yield current
1829 if current.is_dir:
1830 stack.extend(reversed(list(current.iterdir)))
1832 def _resolve_children(
1833 self, new_child: Callable[[str, str, FSP], FSC]
1834 ) -> Mapping[str, FSC]:
1835 if not self.is_dir:
1836 return {}
1837 dir_path = self.path
1838 dir_fs_path = self.fs_path
1839 children = {}
1840 for name in sorted(os.listdir(dir_fs_path), key=os.path.basename):
1841 child_path = os.path.join(dir_path, name) if dir_path != "." else name
1842 child_fs_path = (
1843 os.path.join(dir_fs_path, name) if dir_fs_path != "." else name
1844 )
1845 children[name] = new_child(
1846 child_path,
1847 child_fs_path,
1848 self,
1849 )
1850 return children
1853class OSFSROOverlay(OSFSOverlayBase["FSROOverlay"]):
1854 __slots__ = (
1855 "_stat_cache",
1856 "_readlink_cache",
1857 "_children",
1858 "_stat_failed_cache",
1859 )
1861 def __init__(
1862 self,
1863 path: str,
1864 fs_path: str,
1865 parent: Optional["OSFSROOverlay"],
1866 ) -> None:
1867 super().__init__(path, fs_path, parent=parent)
1868 self._stat_cache: os.stat_result | None = None
1869 self._readlink_cache: str | None = None
1870 self._stat_failed_cache = False
1871 self._children: Mapping[str, OSFSROOverlay] | None = None
1873 @classmethod
1874 def create_root_dir(cls, path: str, fs_path: str) -> "OSFSROOverlay":
1875 return OSFSROOverlay(path, fs_path, None)
1877 @property
1878 def iterdir(self) -> Iterable["OSFSROOverlay"]:
1879 if not self.is_dir:
1880 return
1881 if self._children is None:
1882 self._ensure_children_are_resolved()
1883 yield from assume_not_none(self._children).values()
1885 def lookup(self, path: str) -> Optional["OSFSROOverlay"]:
1886 if not self.is_dir:
1887 return None
1888 if self._children is None:
1889 self._ensure_children_are_resolved()
1891 absolute, _, path_parts = _split_path(path)
1892 current = cast("FSROOverlay", _root(self)) if absolute else self
1893 for no, dir_part in enumerate(path_parts):
1894 if dir_part == ".":
1895 continue
1896 if dir_part == "..":
1897 p = current.parent_dir
1898 if p is None:
1899 raise ValueError(f'The path "{path}" escapes the root dir')
1900 current = cast("FSROOverlay", p)
1901 continue
1902 try:
1903 current = current[dir_part]
1904 except KeyError:
1905 return None
1906 return current
1908 def _ensure_children_are_resolved(self) -> None:
1909 if not self.is_dir or self._children:
1910 return
1911 self._children = self._resolve_children(
1912 lambda n, fsp, p: OSFSROOverlay(n, fsp, p)
1913 )
1915 @property
1916 def is_detached(self) -> bool:
1917 return False
1919 def __getitem__(self, key) -> "VirtualPath":
1920 if not self.is_dir: 1920 ↛ 1922line 1920 didn't jump to line 1922 because the condition on line 1920 was always true
1921 raise KeyError(key)
1922 if self._children is None:
1923 self._ensure_children_are_resolved()
1924 if isinstance(key, FSPath):
1925 key = key.name
1926 return self._children[key]
1928 def __delitem__(self, key) -> Never:
1929 self._error_ro_fs()
1931 @property
1932 def is_read_write(self) -> bool:
1933 return False
1935 def _rw_check(self) -> Never:
1936 self._error_ro_fs()
1938 def _error_ro_fs(self) -> Never:
1939 raise DebputyFSIsROError(
1940 f'Attempt to write to "{self.path}" failed:'
1941 " Debputy Virtual File system is R/O."
1942 )
1944 def stat(self) -> os.stat_result:
1945 if self._stat_failed_cache: 1945 ↛ 1946line 1945 didn't jump to line 1946 because the condition on line 1945 was never true
1946 raise FileNotFoundError(
1947 errno.ENOENT, os.strerror(errno.ENOENT), self.fs_path
1948 )
1950 if self._stat_cache is None: 1950 ↛ 1956line 1950 didn't jump to line 1956 because the condition on line 1950 was always true
1951 try:
1952 self._stat_cache = os.lstat(self.fs_path)
1953 except FileNotFoundError:
1954 self._stat_failed_cache = True
1955 raise
1956 return self._stat_cache
1958 @property
1959 def mode(self) -> int:
1960 return stat.S_IMODE(self.stat().st_mode)
1962 @mode.setter
1963 def mode(self, _unused: int) -> Never:
1964 self._error_ro_fs()
1966 @property
1967 def mtime(self) -> float:
1968 return self.stat().st_mtime
1970 @mtime.setter
1971 def mtime(self, new_mtime: float) -> Never:
1972 self._error_ro_fs()
1974 def readlink(self) -> str:
1975 if not self.is_symlink:
1976 raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
1977 if self._readlink_cache is None:
1978 self._readlink_cache = os.readlink(self.fs_path)
1979 return self._readlink_cache
1981 def chown(
1982 self,
1983 owner: StaticFileSystemOwner | None,
1984 group: StaticFileSystemGroup | None,
1985 ) -> Never:
1986 self._error_ro_fs()
1988 def mkdir(self, name: str) -> Never:
1989 self._error_ro_fs()
1991 def add_file(
1992 self,
1993 name: str,
1994 *,
1995 unlink_if_exists: bool = True,
1996 use_fs_path_mode: bool = False,
1997 mode: int = 0o0644,
1998 mtime: float | None = None,
1999 ) -> Never:
2000 self._error_ro_fs()
2002 def add_symlink(self, link_name: str, link_target: str) -> Never:
2003 self._error_ro_fs()
2005 def unlink(self, *, recursive: bool = False) -> Never:
2006 self._error_ro_fs()
2009class OSFSROOverlayRootDir(OSFSROOverlay):
2010 __slots__ = ("_plugin_context",)
2012 def __init__(self, path: str, fs_path: str) -> None:
2013 super().__init__(path, fs_path, None)
2014 self._plugin_context = CurrentPluginContextManager("debputy")
2016 def _current_plugin(self) -> str:
2017 return self._plugin_context.current_plugin_name
2019 @contextlib.contextmanager
2020 def change_plugin_context(self, new_plugin: str) -> Iterator[str]:
2021 with self._plugin_context.change_plugin_context(new_plugin) as r:
2022 yield r
2025class OSFSControlPath(OSFSOverlayBase["FSControlPath"]):
2027 @property
2028 def iterdir(self) -> Iterable["OSFSControlPath"]:
2029 if not self.is_dir:
2030 return
2031 yield from self._resolve_children(
2032 lambda n, fsp, p: OSFSControlPath(n, fsp, p)
2033 ).values()
2035 def lookup(self, path: str) -> Optional["OSFSControlPath"]:
2036 if not self.is_dir:
2037 return None
2039 absolute, _, path_parts = _split_path(path)
2040 current = cast("FSControlPath", _root(self)) if absolute else self
2041 for no, dir_part in enumerate(path_parts):
2042 if dir_part == ".":
2043 continue
2044 if dir_part == "..":
2045 p = current.parent_dir
2046 if p is None:
2047 raise ValueError(f'The path "{path}" escapes the root dir')
2048 current = cast("FSControlPath", p)
2049 continue
2050 try:
2051 current = current[dir_part]
2052 except KeyError:
2053 return None
2054 return current
2056 @property
2057 def is_detached(self) -> bool:
2058 try:
2059 self.stat()
2060 except FileNotFoundError:
2061 return True
2062 else:
2063 return False
2065 def __getitem__(self, key) -> "VirtualPath":
2066 if not self.is_dir:
2067 raise KeyError(key)
2068 children = self._resolve_children(lambda n, fsp, p: OSFSControlPath(n, fsp, p))
2069 if isinstance(key, FSPath):
2070 key = key.name
2071 return children[key]
2073 def __delitem__(self, key) -> None:
2074 self[key].unlink()
2076 @property
2077 def is_read_write(self) -> bool:
2078 return True
2080 @property
2081 def mode(self) -> int:
2082 return stat.S_IMODE(self.stat().st_mode)
2084 @mode.setter
2085 def mode(self, new_mode: int) -> None:
2086 os.chmod(self.fs_path, new_mode)
2088 @property
2089 def mtime(self) -> float:
2090 return self.stat().st_mtime
2092 @mtime.setter
2093 def mtime(self, new_mtime: float) -> None:
2094 os.utime(self.fs_path, (new_mtime, new_mtime))
2096 def readlink(self) -> Never:
2097 if not self.is_symlink:
2098 raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
2099 assert False
2101 def chown(
2102 self,
2103 owner: StaticFileSystemOwner | None,
2104 group: StaticFileSystemGroup | None,
2105 ) -> None:
2106 raise ValueError(
2107 "No need to chown paths in the control.tar: They are always root:root"
2108 )
2110 def mkdir(self, name: str) -> Never:
2111 raise TypeError("The control.tar never contains subdirectories.")
2113 @contextlib.contextmanager
2114 def add_file(
2115 self,
2116 name: str,
2117 *,
2118 unlink_if_exists: bool = True,
2119 use_fs_path_mode: bool = False,
2120 mode: int = 0o0644,
2121 mtime: float | None = None,
2122 ) -> ContextManager["VirtualPath"]:
2123 if "/" in name or name in {".", ".."}:
2124 raise ValueError(f'Invalid file name: "{name}"')
2125 if not self.is_dir:
2126 raise TypeError(
2127 f"Cannot create {self._orphan_safe_path()}/{name}:"
2128 f" {self._orphan_safe_path()} is not a directory"
2129 )
2130 self._rw_check()
2131 existing = self.get(name)
2132 if existing is not None:
2133 if not unlink_if_exists:
2134 raise ValueError(
2135 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
2136 f" and exist_ok was False"
2137 )
2138 assert existing.is_file
2140 fs_path = os.path.join(self.fs_path, name)
2141 # This truncates the existing file if any, so we do not have to unlink the previous entry.
2142 with open(fs_path, "wb") as fd:
2143 # Ensure that the fs_path exists and default mode is reasonable
2144 os.chmod(fd.fileno(), mode)
2145 child = OSFSControlPath(
2146 name,
2147 fs_path,
2148 self,
2149 )
2150 yield child
2151 _check_fs_path_is_file(fs_path, unlink_on_error=child)
2152 child.mode = mode
2154 @contextlib.contextmanager
2155 def replace_fs_path_content(
2156 self,
2157 *,
2158 use_fs_path_mode: bool = False,
2159 ) -> ContextManager[str]:
2160 if not self.is_file:
2161 raise TypeError(
2162 f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file'
2163 )
2164 restore_mode = self.mode if use_fs_path_mode else None
2165 yield self.fs_path
2166 _check_fs_path_is_file(self.fs_path, self)
2167 if restore_mode is not None:
2168 self.mode = restore_mode
2170 def add_symlink(self, link_name: str, link_target: str) -> Never:
2171 raise TypeError("The control.tar never contains symlinks.")
2173 def unlink(self, *, recursive: bool = False) -> None:
2174 if self._parent is None:
2175 return
2176 # By virtue of the control FS only containing paths, we can assume `recursive` never
2177 # matters and that `os.unlink` will be sufficient.
2178 assert self.is_file
2179 os.unlink(self.fs_path)
2182class FSControlRootDir(OSFSControlPath):
2184 @classmethod
2185 def create_root_dir(cls, fs_path: str) -> "FSControlRootDir":
2186 return FSControlRootDir(".", fs_path, None)
2188 def insert_file_from_fs_path(
2189 self,
2190 name: str,
2191 fs_path: str,
2192 *,
2193 exist_ok: bool = True,
2194 use_fs_path_mode: bool = False,
2195 mode: int = 0o0644,
2196 # Ignored, but accepted for compat with FSPath's variant of this function.
2197 # - This is used by install_or_generate_conffiles.
2198 reference_path: VirtualPath | None = None, # noqa
2199 ) -> "OSFSControlPath":
2200 if "/" in name or name in {".", ".."}:
2201 raise ValueError(f'Invalid file name: "{name}"')
2202 if not self.is_dir:
2203 raise TypeError(
2204 f"Cannot create {self._orphan_safe_path()}/{name}:"
2205 f" {self._orphan_safe_path()} is not a directory"
2206 )
2207 self._rw_check()
2208 if name in self and not exist_ok:
2209 raise ValueError(
2210 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
2211 f" and exist_ok was False"
2212 )
2214 target_path = os.path.join(self.fs_path, name)
2215 if use_fs_path_mode:
2216 shutil.copymode(
2217 fs_path,
2218 target_path,
2219 follow_symlinks=True,
2220 )
2221 else:
2222 shutil.copyfile(
2223 fs_path,
2224 target_path,
2225 follow_symlinks=True,
2226 )
2227 os.chmod(target_path, mode)
2228 return cast("FSControlPath", self[name])
2231def as_path_def(pd: str | PathDef) -> PathDef:
2232 return PathDef(pd) if isinstance(pd, str) else pd
2235def as_path_defs(paths: Iterable[str | PathDef]) -> Iterable[PathDef]:
2236 yield from (as_path_def(p) for p in paths)
2239def build_virtual_fs(
2240 paths: Iterable[str | PathDef],
2241 read_write_fs: bool = False,
2242) -> "FSPath":
2243 root_dir: FSRootDir | None = None
2244 directories: dict[str, FSPath] = {}
2245 non_directories = set()
2247 def _ensure_parent_dirs(p: str) -> None:
2248 current = p.rstrip("/")
2249 missing_dirs = []
2250 while True:
2251 current = os.path.dirname(current)
2252 if current in directories:
2253 break
2254 if current in non_directories: 2254 ↛ 2255line 2254 didn't jump to line 2255 because the condition on line 2254 was never true
2255 raise ValueError(
2256 f'Conflicting definition for "{current}". The path "{p}" wants it as a directory,'
2257 ' but it is defined as a non-directory. (Ensure dirs end with "/")'
2258 )
2259 missing_dirs.append(current)
2260 for dir_path in reversed(missing_dirs):
2261 parent_dir = directories[os.path.dirname(dir_path)]
2262 d = VirtualTestPath(os.path.basename(dir_path), parent_dir, is_dir=True)
2263 directories[dir_path] = d
2265 for path_def in as_path_defs(paths):
2266 path = path_def.path_name
2267 if path in directories or path in non_directories: 2267 ↛ 2268line 2267 didn't jump to line 2268 because the condition on line 2267 was never true
2268 raise ValueError(
2269 f'Duplicate definition of "{path}". Can be false positive if input is not in'
2270 ' "correct order" (ensure directories occur before their children)'
2271 )
2272 if root_dir is None:
2273 root_fs_path = None
2274 if path in (".", "./", "/"):
2275 root_fs_path = path_def.fs_path
2276 root_dir = FSRootDir(fs_path=root_fs_path)
2277 directories["."] = root_dir
2279 if path not in (".", "./", "/") and not path.startswith("./"):
2280 path = "./" + path
2281 if path not in (".", "./", "/"):
2282 _ensure_parent_dirs(path)
2283 if path in (".", "./"):
2284 assert "." in directories
2285 continue
2286 is_dir = False
2287 if path.endswith("/"):
2288 path = path[:-1]
2289 is_dir = True
2290 directory = directories[os.path.dirname(path)]
2291 assert not is_dir or not bool(
2292 path_def.link_target
2293 ), f"is_dir={is_dir} vs. link_target={path_def.link_target}"
2294 fs_path = VirtualTestPath(
2295 os.path.basename(path),
2296 directory,
2297 is_dir=is_dir,
2298 mode=path_def.mode,
2299 mtime=path_def.mtime,
2300 has_fs_path=path_def.has_fs_path,
2301 fs_path=path_def.fs_path,
2302 link_target=path_def.link_target,
2303 content=path_def.content,
2304 materialized_content=path_def.materialized_content,
2305 )
2306 assert not fs_path.is_detached
2307 if fs_path.is_dir:
2308 directories[fs_path.path] = fs_path
2309 else:
2310 non_directories.add(fs_path.path)
2312 if root_dir is None:
2313 root_dir = FSRootDir()
2315 root_dir.is_read_write = read_write_fs
2316 return root_dir