Coverage for src/debputy/filesystem_scan.py: 64%
1363 statements
« prev ^ index » next coverage.py v7.8.2, created at 2026-02-28 21:56 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2026-02-28 21:56 +0000
1import atexit
2import contextlib
3import dataclasses
4import errno
5import io
6import operator
7import os
8import shutil
9import stat
10import subprocess
11import tempfile
12import time
13import typing
14from abc import ABC
15from contextlib import suppress
16from typing import (
17 Optional,
18 cast,
19 Any,
20 ContextManager,
21 TextIO,
22 BinaryIO,
23 Generic,
24 TypeVar,
25 overload,
26 Literal,
27 Never,
28)
29from collections.abc import Iterable, Iterator, Mapping, Callable
30from weakref import ref, ReferenceType
32from debputy.exceptions import (
33 PureVirtualPathError,
34 DebputyFSIsROError,
35 DebputyMetadataAccessError,
36 TestPathWithNonExistentFSPathError,
37 SymlinkLoopError,
38)
39from debputy.intermediate_manifest import PathType
40from debputy.manifest_parser.base_types import (
41 ROOT_DEFINITION,
42 StaticFileSystemOwner,
43 StaticFileSystemGroup,
44)
45from debputy.plugin.api.spec import (
46 VirtualPath,
47 PathDef,
48 PathMetadataReference,
49 PMT,
50)
51from debputy.types import VP
52from debputy.util import (
53 generated_content_dir,
54 _error,
55 escape_shell,
56 assume_not_none,
57 _normalize_path,
58 _debug_log,
59)
# Key function that extracts the `name` attribute; used as a sort key for paths.
BY_BASENAME = operator.attrgetter("name")

# Covariant type variables bound (by forward reference) to `FSOverlayBase`,
# which is not defined in this part of the file.
FSP = TypeVar("FSP", bound="FSOverlayBase", covariant=True)
FSC = TypeVar("FSC", bound="FSOverlayBase", covariant=True)


# Mode strings that contain `b` (binary I/O).
BinaryOpenMode = Literal[
    "rb",
    "r+b",
    "wb",
    "w+b",
    "xb",
    "ab",
]
# Mode strings without `b` (text I/O), with or without the explicit `t`.
TextOpenMode = Literal[
    "r",
    "r+",
    "rt",
    "r+t",
    "w",
    "w+",
    "wt",
    "w+t",
    "x",
    "xt",
    "a",
    "at",
]
# Union of every accepted mode string.
OpenMode = Literal[BinaryOpenMode, TextOpenMode]
class AlwaysEmptyReadOnlyMetadataReference(PathMetadataReference[PMT]):
    """Metadata reference that never holds a value and rejects every write.

    The reference is readable only when the current plugin is the owning
    plugin; in that case the value is always `None`.
    """

    __slots__ = ("_metadata_type", "_owning_plugin", "_current_plugin")

    def __init__(
        self,
        owning_plugin: str,
        current_plugin: str,
        metadata_type: type[PMT],
    ) -> None:
        """Record the owner, the accessing plugin and the metadata type."""
        self._metadata_type = metadata_type
        self._current_plugin = current_plugin
        self._owning_plugin = owning_plugin

    @property
    def _is_owner(self) -> bool:
        """Whether the accessing plugin is the plugin owning the metadata."""
        return self._owning_plugin == self._current_plugin

    @property
    def is_present(self) -> bool:
        """Always `False`; this reference never has a value."""
        return False

    @property
    def can_read(self) -> bool:
        """Only the owning plugin may read."""
        return self._owning_plugin == self._current_plugin

    @property
    def can_write(self) -> bool:
        """Always `False`; this reference is read-only."""
        return False

    @property
    def value(self) -> PMT | None:
        """Return `None` for the owner.

        :raises DebputyMetadataAccessError: If the current plugin cannot read the metadata.
        """
        if not self.can_read:
            raise DebputyMetadataAccessError(
                f"Cannot read the metadata {self._metadata_type.__name__} owned by"
                f" {self._owning_plugin} as the metadata has not been made"
                f" readable to the plugin {self._current_plugin}."
            )
        return None

    @value.setter
    def value(self, new_value: PMT) -> None:
        """Always fail; the error type depends on whether the caller is the owner."""
        if not self._is_owner:
            raise DebputyMetadataAccessError(
                f"Cannot set the metadata {self._metadata_type.__name__} owned by"
                f" {self._owning_plugin} as the metadata has not been made"
                f" read-write to the plugin {self._current_plugin}."
            )
        raise DebputyFSIsROError(
            f"Cannot set the metadata {self._metadata_type.__name__} as the path is read-only"
        )
@dataclasses.dataclass(slots=True)
class PathMetadataValue(Generic[PMT]):
    """Storage cell for one piece of plugin-owned metadata on a path."""

    # Name of the plugin that owns the metadata.
    owning_plugin: str
    # The type of the stored metadata.
    metadata_type: type[PMT]
    # The stored metadata; `None` means "no value".
    value: PMT | None = None

    def can_read_value(self, current_plugin: str) -> bool:
        """Return True when `current_plugin` is the owning plugin."""
        return current_plugin == self.owning_plugin

    def can_write_value(self, current_plugin: str) -> bool:
        """Return True when `current_plugin` is the owning plugin."""
        return current_plugin == self.owning_plugin
class PathMetadataReferenceImplementation(PathMetadataReference[PMT]):
    """Metadata reference backed by a `PathMetadataValue` attached to a path.

    Read/write permission is delegated to the `PathMetadataValue`; writing
    additionally requires the owning path to be read-write and not detached.
    """

    __slots__ = ("_owning_path", "_current_plugin", "_path_metadata_value")

    def __init__(
        self,
        owning_path: "VirtualPathBase",
        current_plugin: str,
        path_metadata_value: PathMetadataValue[PMT],
    ) -> None:
        """Bind the reference to a path, the accessing plugin and the stored value."""
        self._owning_path = owning_path
        self._current_plugin = current_plugin
        self._path_metadata_value = path_metadata_value

    @property
    def is_present(self) -> bool:
        """True when the metadata is readable by the current plugin and has a value."""
        if not self.can_read:
            return False
        return self._path_metadata_value.value is not None

    @property
    def can_read(self) -> bool:
        """Whether the current plugin may read the metadata."""
        return self._path_metadata_value.can_read_value(self._current_plugin)

    @property
    def can_write(self) -> bool:
        """Whether a write would succeed (plugin permission AND writable, attached path)."""
        if not self._path_metadata_value.can_write_value(self._current_plugin):
            return False
        owning_path = self._owning_path
        return owning_path.is_read_write and not owning_path.is_detached

    @property
    def value(self) -> PMT | None:
        """Return the stored metadata.

        :raises DebputyMetadataAccessError: If the current plugin cannot read the metadata.
        """
        if self.can_read:
            return self._path_metadata_value.value
        raise DebputyMetadataAccessError(
            f"Cannot read the metadata {self._metadata_type_name} owned by"
            f" {self._owning_plugin} as the metadata has not been made"
            f" readable to the plugin {self._current_plugin}."
        )

    @value.setter
    def value(self, new_value: PMT) -> None:
        """Store (or, with `None`, delete) the metadata.

        :raises DebputyMetadataAccessError: If the current plugin lacks write access.
        :raises DebputyFSIsROError: If the owning path is read-only.
        :raises TypeError: If the owning path is detached.
        """
        # Only the plugin permission is tested here.  Testing `can_write` would
        # also be False for read-only or detached paths, which made the two
        # more specific errors below unreachable and reported a misleading
        # "not made read-write to the plugin" message to the owner.
        if not self._path_metadata_value.can_write_value(self._current_plugin):
            m = "set" if new_value is not None else "delete"
            raise DebputyMetadataAccessError(
                f"Cannot {m} the metadata {self._metadata_type_name} owned by"
                f" {self._owning_plugin} as the metadata has not been made"
                f" read-write to the plugin {self._current_plugin}."
            )
        owning_path = self._owning_path
        if not owning_path.is_read_write:
            raise DebputyFSIsROError(
                f"Cannot set the metadata {self._metadata_type_name} as the path is read-only"
            )
        if owning_path.is_detached:
            raise TypeError(
                f"Cannot set the metadata {self._metadata_type_name} as the path is detached"
            )
        self._path_metadata_value.value = new_value

    @property
    def _is_owner(self) -> bool:
        """Whether the accessing plugin is the plugin owning the metadata."""
        return self._owning_plugin == self._current_plugin

    @property
    def _owning_plugin(self) -> str:
        """Name of the plugin that owns the metadata."""
        return self._path_metadata_value.owning_plugin

    @property
    def _metadata_type_name(self) -> str:
        """`__name__` of the metadata type; used in error messages."""
        return self._path_metadata_value.metadata_type.__name__
def _cp_a(source: str, dest: str) -> None:
    """Run `cp -a source dest` and report a failure via `_error`.

    :param source: The path to copy from.
    :param dest: The path to copy to.
    """
    argv = ["cp", "-a", source, dest]
    try:
        subprocess.check_call(argv)
    except subprocess.CalledProcessError:
        rendered_command = escape_shell(*argv)
        _error(
            f"The attempt to make an internal copy of {escape_shell(source)} failed. Please review the output of cp"
            f" above to understand what went wrong. The full command was: {rendered_command}"
        )
def _split_path(path: str) -> tuple[bool, bool, list[str]]:
    """Split a path into its `/`-separated parts.

    :param path: The path to split.
    :return: A tuple of (`absolute`, `must_be_dir`, `parts`).  `absolute` is True
      when the path starts with `/` (the parts then begin with `.`).  `must_be_dir`
      is True when the path ends with `/` (the parts then end with an extra `.`).
    """
    must_be_dir = path.endswith("/")
    absolute = path.startswith("/")
    # An absolute path is re-anchored as "./..." so the first part becomes ".".
    anchored = f".{path}" if absolute else path
    parts = anchored.rstrip("/").split("/")
    if must_be_dir:
        parts.append(".")
    return absolute, must_be_dir, parts
def _root(path: VP) -> "VirtualPathBase":
    """Walk `parent_dir` links upwards from `path` and return the root directory.

    :param path: Any path in the tree.
    :return: The first ancestor (or `path` itself) for which `is_root_dir()` is true.
    """
    node = path
    while True:
        if node.is_root_dir():
            break
        above = node.parent_dir
        assert above is not None  # type hint
        node = above
    assert isinstance(node, VirtualPathBase)
    return node
def _check_fs_path_is_file(
    fs_path: str,
    unlink_on_error: Optional["VirtualPath"] = None,
) -> None:
    """Verify that `fs_path` is a regular file with a single hard link.

    :param fs_path: The file system path to inspect (symlinks are not followed).
    :param unlink_on_error: When truthy, `fs_path` is removed from the file system
      before the error is raised.
    :raises TypeError: If the path is missing, is not a regular file or has more
      than one hard link.
    """
    try:
        # FIXME: Check mode, and use the Virtual Path to cache the result as a side-effect
        st = os.lstat(fs_path)
    except FileNotFoundError:
        acceptable = False
    else:
        acceptable = stat.S_ISREG(st.st_mode) and st.st_nlink <= 1
    if acceptable:
        return

    if unlink_on_error:
        with suppress(FileNotFoundError):
            os.unlink(fs_path)
    raise TypeError(
        "The provided FS backing file was deleted, replaced with a non-file entry or it was hard"
        " linked to another file. The entry has been disconnected."
    )
class CurrentPluginContextManager:
    """Track which plugin is "current" as a stack of plugin names."""

    __slots__ = ("_plugin_names",)

    def __init__(self, initial_plugin_name: str) -> None:
        """Start the stack with `initial_plugin_name` as the current plugin."""
        self._plugin_names = [initial_plugin_name]

    @property
    def current_plugin_name(self) -> str:
        """The most recently entered plugin name."""
        return self._plugin_names[-1]

    @contextlib.contextmanager
    def change_plugin_context(self, new_plugin_name: str) -> Iterator[str]:
        """Make `new_plugin_name` the current plugin for the duration of the `with` block.

        :param new_plugin_name: The plugin name to make current.
        :return: A context manager yielding `new_plugin_name`.
        """
        self._plugin_names.append(new_plugin_name)
        try:
            yield new_plugin_name
        finally:
            # Always restore the previous plugin, also when the body raises;
            # otherwise the stack would keep reporting the wrong plugin.
            self._plugin_names.pop()
class VirtualPathBase(VirtualPath, ABC):
    """Shared implementation of lookup, mkdirs, pruning and `open_child` for virtual paths."""

    __slots__ = ()

    def stat(self) -> os.stat_result:
        # TODO: Remove
        """Attempt to do stat of the underlying path (if it exists)

        *Avoid* using `stat()` whenever possible where a more specialized attribute exist. The
        `stat()` call returns the data from the file system and often, `debputy` does *not* track
        its state in the file system. As an example, if you want to know the file system mode of
        a path, please use the `mode` attribute instead.

        This never follow symlinks (it behaves like `os.lstat`). It will raise an error
        if the path is not backed by a file system object (that is, `has_fs_path` is False).

        :return: The stat result or an error.
        """
        raise NotImplementedError()

    @property
    def size(self) -> int:
        """Resolve the file size (`st_size`)

        This may be using `stat()` and therefore `fs_path`.

        :return: The size of the file in bytes
        """
        return self.stat().st_size

    def _orphan_safe_path(self) -> str:
        """Return a path string for use in messages; the base version is just `path`."""
        return self.path

    def _rw_check(self) -> None:
        """Raise `DebputyFSIsROError` unless the path is read-write."""
        if not self.is_read_write:
            raise DebputyFSIsROError(
                f'Attempt to write to "{self._orphan_safe_path()}" failed:'
                " Debputy Virtual File system is R/O."
            )

    @property
    def is_detached(self) -> bool:
        """Whether the path is detached; must be provided by subclasses."""
        raise NotImplementedError

    def is_root_dir(self) -> bool:
        """Return True for an attached path without a parent directory."""
        # The root directory is never detachable in the current setup
        return not self.is_detached and self.parent_dir is None

    def lookup(self, path: str) -> Optional["VirtualPathBase"]:
        """Resolve `path` and return the match, or None if any part is missing."""
        match, missing = self.attempt_lookup(path)
        if missing:
            return None
        return match

    def attempt_lookup(self, path: str) -> tuple["VirtualPathBase", list[str]]:
        """Resolve as much of `path` as exists.

        Absolute paths are resolved from the root directory, relative paths from
        this path.  Symlinks are expanded unless they are the final part.

        :param path: The path to resolve.
        :return: The deepest existing path found and the list of parts that
          could not be resolved (empty when the lookup fully succeeded).
        :raises ValueError: If this path is detached or `path` escapes the root via `..`.
        :raises SymlinkLoopError: If the same symlink is traversed twice.
        """
        if self.is_detached:
            raise ValueError(
                f'Cannot perform lookup via "{self._orphan_safe_path()}": The path is detached'
            )
        absolute, must_be_dir, path_parts = _split_path(path)
        current = _root(self) if absolute else self
        # The parts are kept reversed so the next part can be taken with `pop()`.
        path_parts.reverse()
        link_expansions = set()
        while path_parts:
            dir_part = path_parts.pop()
            if dir_part == ".":
                continue
            if dir_part == "..":
                if current.is_root_dir():
                    raise ValueError(f'The path "{path}" escapes the root dir')
                p = current.parent_dir
                assert p is not None  # type hint
                current = cast("VirtualPathBase", p)
                continue
            try:
                current = cast("VirtualPathBase", current[dir_part])
            except KeyError:
                # Put the missing part back and restore the natural order.
                path_parts.append(dir_part)
                path_parts.reverse()
                if must_be_dir:
                    # Drop the "." that `_split_path` appended for the trailing slash.
                    path_parts.pop()
                return current, path_parts
            if current.is_symlink and path_parts:
                if current.path in link_expansions:
                    # This is our loop detection for now. It might have some false positives where you
                    # could safely resolve the same symlink twice. However, given that this use-case is
                    # basically non-existent in practice for packaging, we just stop here for now.
                    raise SymlinkLoopError(
                        f'The path "{path}" traversed the symlink "{current.path}" multiple'
                        " times. Currently, traversing the same symlink twice is considered"
                        " a loop by `debputy` even if the path would eventually resolve."
                        " Consider filing a feature request if you have a benign case that"
                        " triggers this error."
                    )
                link_expansions.add(current.path)
                link_target = current.readlink()
                link_absolute, _, link_path_parts = _split_path(link_target)
                if link_absolute:
                    current = _root(current)
                else:
                    current = cast(
                        "VirtualPathBase", assume_not_none(current.parent_dir)
                    )
                # Queue the link target's parts in front of the remaining parts.
                link_path_parts.reverse()
                path_parts.extend(link_path_parts)
        return current, []

    def mkdirs(self, path: str) -> "VirtualPath":
        """Create every missing directory of `path` and return the final directory.

        :param path: The directory path to ensure.
        :raises ValueError: If an existing part of the path is not a directory.
        """
        current: VirtualPath
        current, missing_parts = self.attempt_lookup(
            f"{path}/" if not path.endswith("/") else path
        )
        if not current.is_dir:
            raise ValueError(
                f'mkdirs of "{path}" failed: This would require {current.path} to not exist OR be'
                " a directory. However, that path exist AND is a not directory."
            )
        for missing_part in missing_parts:
            assert missing_part not in (".", "..")
            current = current.mkdir(missing_part)
        return current

    def prune_if_empty_dir(self) -> None:
        """Remove this and all (now) empty parent directories

        Same as: `rmdir --ignore-fail-on-non-empty --parents`

        This operation may cause the path (and any of its parent directories) to become "detached"
        and therefore unsafe to use in further operations.
        """
        self._rw_check()

        if not self.is_dir:
            raise TypeError(f"{self._orphan_safe_path()} is not a directory")
        # No-op for the root directory. There is never a case where you want to delete this directory
        # (and even if you could, debputy will need it for technical reasons, so the root dir stays)
        if any(self.iterdir()) or self.is_root_dir():
            return
        parent_dir = self.parent_dir

        # Recursive does not matter; we already know the directory is empty.
        self.unlink()

        if parent_dir:
            parent_dir.prune_if_empty_dir()

    def _current_plugin(self) -> str:
        """Return the current plugin as reported by the root directory.

        :raises TypeError: If the path is detached.
        """
        if self.is_detached:
            raise TypeError("Cannot resolve the current plugin; path is detached")
        return cast("FSRootDir", _root(self))._current_plugin()

    @overload
    def open_child(
        self,
        name: str,
        mode: TextOpenMode = "r",
        buffering: int = -1,
    ) -> TextIO: ...

    @overload
    def open_child(
        self,
        name: str,
        mode: BinaryOpenMode,
        buffering: int = -1,
    ) -> BinaryIO: ...

    @contextlib.contextmanager
    def open_child(self, name, mode="r", buffering=-1):
        """Open a child path of the current directory in a given mode.  Usually used with a context manager

        The path is opened according to the `mode` parameter similar to the built-in `open` in Python.
        The following symbols are accepted with the same meaning as Python's open:
          * `r`
          * `w`
          * `x`
          * `a`
          * `+`
          * `b`
          * `t`

        Like Python's `open`, this can create a new file provided the file system is in read-write mode.
        Though unlike Python's open, symlinks are not followed and cannot be opened. Any newly created
        file will start with (os.stat) mode of 0o0644. The (os.stat) mode of existing paths are left
        as-is.

        :param name: The name of the child path to open. Must be a basename.
        :param mode: The mode to open the file with such as `r` or `w`. See Python's `open` for more
          examples.
        :param buffering: Same as open(..., buffering=...) where supported. Notably during
          testing, the content may be purely in memory and use a BytesIO/StringIO
          (which does not accept that parameter, but then it is buffered in a different way)
        :return: The file handle.
        :raises ValueError: If `r` is requested for a missing child or `x` for an existing one.
        """
        existing = self.get(name)
        if "r" in mode:
            if existing is None:
                raise ValueError(
                    f"Path {self.path}/{name} does not exist and mode had `r`"
                )
            if "+" not in mode:
                # open(byte_io="b" in mode) matches no typed signature overload.
                if "b" in mode:
                    with existing.open(byte_io=True, buffering=buffering) as fd:
                        yield fd
                else:
                    with existing.open(byte_io=False, buffering=buffering) as fd:
                        yield fd
                # A read-only open is complete here.  Without this return the
                # generator continued into the write path below and yielded a
                # second time, which `contextlib.contextmanager` reports as
                # RuntimeError("generator didn't stop").
                return

        encoding = None if "b" in mode else "utf-8"

        if existing:
            if "x" in mode:
                raise ValueError(
                    f"Path {existing.path} already exists and mode had `x`"
                )
            with (
                existing.replace_fs_path_content() as fs_path,
                open(fs_path, mode, encoding=encoding) as fd,
            ):
                yield fd
        else:
            assert "r" not in mode
            # unlink_if_exists=False as a precaution (the "already exists" should not end up here).
            with (
                self.add_file(name, mode=0o0644, unlink_if_exists=False) as new_path,
                open(new_path.fs_path, mode, encoding=encoding) as fd,
            ):
                yield fd
536class InMemoryVirtualPathBase(VirtualPathBase, ABC):
537 __slots__ = (
538 "_basename",
539 "_parent_dir",
540 "_path_cache",
541 "_parent_path_cache",
542 "_last_known_parent_path",
543 "_mode",
544 "_owner",
545 "_group",
546 "_mtime",
547 "_stat_cache",
548 "_metadata",
549 "__weakref__",
550 )
    def __init__(
        self,
        basename: str,
        parent: Optional["InMemoryVirtualPathBase"],
        initial_mode: int | None = None,
        mtime: float | None = None,
        stat_cache: os.stat_result | None = None,
    ) -> None:
        """Initialise the path state and, when `parent` is given, attach to it.

        :param basename: The name of this path.
        :param parent: The parent directory, or None to start without a parent.
        :param initial_mode: Initial mode; when None, `mode` resolves it via `stat()` on first access.
        :param mtime: Initial mtime; when None, `mtime` resolves it on first access.
        :param stat_cache: A stat result to seed the `stat()` cache with, if any.
        """
        self._basename = basename
        self._path_cache: str | None = None
        self._parent_path_cache: str | None = None
        self._last_known_parent_path: str | None = None
        self._mode = initial_mode
        self._mtime = mtime
        self._stat_cache = stat_cache
        self._metadata: dict[tuple[str, type[Any]], PathMetadataValue[Any]] = {}
        # The `_owner` and `_group` are accessed directly outside the class via `tar_owner_info`
        self._owner = ROOT_DEFINITION
        self._group = ROOT_DEFINITION

        # `_parent_dir` must exist before the `parent_dir` setter runs: the setter reads
        # `is_detached`, which in turn reads `self._parent_dir`.
        self._parent_dir: ReferenceType["InMemoryVirtualPathBase"] | None = None
        if parent is not None:
            # Goes through the property setter, which also registers this path with the parent.
            self.parent_dir = parent
578 @property
579 def name(self) -> str:
580 return self._basename
582 @name.setter
583 def name(self, new_name: str) -> None:
584 self._rw_check()
585 if new_name == self._basename: 585 ↛ 586line 585 didn't jump to line 586 because the condition on line 585 was never true
586 return
587 if self.is_detached: 587 ↛ 588line 587 didn't jump to line 588 because the condition on line 587 was never true
588 self._basename = new_name
589 return
590 self._rw_check()
591 parent = self.parent_dir
592 # This little parent_dir dance ensures the parent dir detects the rename properly
593 self.parent_dir = None
594 self._basename = new_name
595 self.parent_dir = parent
    # Overridden for new return type
    def iterdir(self) -> Iterable["InMemoryVirtualPathBase"]:
        """Iterate over the children of this path; must be implemented by subclasses."""
        raise NotImplementedError
601 def all_paths(self) -> Iterable["InMemoryVirtualPathBase"]:
602 yield self
603 if not self.is_dir:
604 return
605 by_basename = BY_BASENAME
606 stack = sorted(self.iterdir(), key=by_basename, reverse=True)
607 while stack:
608 current = stack.pop()
609 yield current
610 if current.is_dir and not current.is_detached:
611 stack.extend(sorted(current.iterdir(), key=by_basename, reverse=True))
613 def walk(
614 self,
615 ) -> Iterable[tuple["InMemoryVirtualPathBase", list["InMemoryVirtualPathBase"]]]:
616 # FIXME: can this be more "os.walk"-like without making it harder to implement?
617 if not self.is_dir: 617 ↛ 618line 617 didn't jump to line 618 because the condition on line 617 was never true
618 yield self, []
619 return
620 by_basename = BY_BASENAME
621 stack = [self]
622 while stack:
623 current = stack.pop()
624 children = sorted(current.iterdir(), key=by_basename)
625 assert not children or current.is_dir
626 yield current, children
627 # Removing the directory counts as discarding the children.
628 if not current.is_detached: 628 ↛ 622line 628 didn't jump to line 622 because the condition on line 628 was always true
629 stack.extend(reversed(children))
631 def _orphan_safe_path(self) -> str:
632 if not self.is_detached or self._last_known_parent_path is not None: 632 ↛ 634line 632 didn't jump to line 634 because the condition on line 632 was always true
633 return self.path
634 return f"<orphaned>/{self.name}"
636 @property
637 def is_detached(self) -> bool:
638 parent = self._parent_dir
639 if parent is None:
640 return True
641 resolved_parent = parent()
642 if resolved_parent is None: 642 ↛ 643line 642 didn't jump to line 643 because the condition on line 642 was never true
643 return True
644 return resolved_parent.is_detached
646 # The __getitem__ behaves like __getitem__ from Dict but __iter__ would ideally work like a Sequence.
647 # However, that does not feel compatible, so lets force people to use .children instead for the Sequence
648 # behavior to avoid surprises for now.
649 # (Maybe it is a non-issue, but it is easier to add the API later than to remove it once we have committed
650 # to using it)
651 __iter__ = None
    # Overridden for new return type
    def __getitem__(self, key: object) -> "InMemoryVirtualPathBase":
        """Look up a child by key; must be implemented by subclasses."""
        raise NotImplementedError
    # Overridden for new return type
    def get(self, key: str) -> "InMemoryVirtualPathBase | None":
        """Delegate to the parent class `get`; only the static return type is narrowed."""
        return typing.cast("InMemoryVirtualPathBase | None", super().get(key))
    def _add_child(self, child: "InMemoryVirtualPathBase") -> None:
        """Register `child` under this path.

        This base version always raises `TypeError`; it is called by the
        `parent_dir` setter on the new parent.
        """
        raise TypeError(
            f"{self._orphan_safe_path()!r} is not a directory (or did not implement this method)"
        )
    def _remove_child(self, child: "InMemoryVirtualPathBase") -> None:
        """Unregister `child` from this path.

        This base version always raises `TypeError`; it is called by the
        `parent_dir` setter on the old parent.
        """
        raise TypeError(
            f"{self._orphan_safe_path()!r} is not a directory (or did not implement this method)"
        )
671 @property
672 def path(self) -> str:
673 parent_path = self.parent_dir_path
674 if (
675 self._parent_path_cache is not None
676 and self._parent_path_cache == parent_path
677 ):
678 return assume_not_none(self._path_cache)
679 if parent_path is None: 679 ↛ 680line 679 didn't jump to line 680 because the condition on line 679 was never true
680 raise ReferenceError(
681 f"The path {self.name} is detached! {self.__class__.__name__}"
682 )
683 self._parent_path_cache = parent_path
684 ret = os.path.join(parent_path, self.name)
685 self._path_cache = ret
686 return ret
    @property
    def parent_dir(self) -> Optional["InMemoryVirtualPathBase"]:
        """The parent directory, resolved through the stored weak reference.

        :raises ReferenceError: If there is no parent reference or the parent is gone.
        """
        p_ref = self._parent_dir
        p = p_ref() if p_ref is not None else None
        if p is None:
            raise ReferenceError(
                f"The path {self.name} is detached! {self.__class__.__name__}"
            )
        return p

    @parent_dir.setter
    def parent_dir(self, new_parent: Optional["InMemoryVirtualPathBase"]) -> None:
        """Move this path to `new_parent`, or detach it when `new_parent` is None.

        :raises DebputyFSIsROError: If this path or the new parent is read-only.
        :raises ValueError: If `new_parent` is not a directory.
        """
        self._rw_check()
        if new_parent is not None:
            if not new_parent.is_dir:
                raise ValueError(
                    f"The parent {new_parent._orphan_safe_path()!r} must be a directory"
                )
            new_parent._rw_check()
        old_parent = None
        self._last_known_parent_path = None
        if not self.is_detached:
            # Unregister from the current parent before re-parenting.
            old_parent = self.parent_dir
            assume_not_none(old_parent)._remove_child(self)
        if new_parent is not None:
            # Only a weak reference to the parent is stored.
            self._parent_dir = ref(new_parent)
            new_parent._add_child(self)
        else:
            if old_parent is not None and not old_parent.is_detached:
                # Remember where the path used to live so `path` keeps working while detached.
                self._last_known_parent_path = old_parent.path
            self._parent_dir = None
        # NOTE(review): indentation was lost in the extracted source; this reset is
        # assumed to apply to both branches - confirm against the real file.
        self._parent_path_cache = None
721 @property
722 def parent_dir_path(self) -> str | None:
723 if self.is_detached: 723 ↛ 724line 723 didn't jump to line 724 because the condition on line 723 was never true
724 return self._last_known_parent_path
725 return assume_not_none(self.parent_dir).path
727 def chown(
728 self,
729 owner: StaticFileSystemOwner | None,
730 group: StaticFileSystemGroup | None,
731 ) -> None:
732 """Change the owner/group of this path
734 :param owner: The desired owner definition for this path. If None, then no change of owner is performed.
735 :param group: The desired group definition for this path. If None, then no change of group is performed.
736 """
737 self._rw_check()
739 if owner is not None:
740 self._owner = owner.ownership_definition
741 if group is not None:
742 self._group = group.ownership_definition
744 def stat(self) -> os.stat_result:
745 st = self._stat_cache
746 if st is None:
747 st = self._uncached_stat()
748 self._stat_cache = st
749 return st
    def _uncached_stat(self) -> os.stat_result:
        """Produce a fresh stat result for `stat()` to cache; must be implemented by subclasses."""
        raise NotImplementedError
754 @property
755 def mode(self) -> int:
756 current_mode = self._mode
757 if current_mode is None: 757 ↛ 758line 757 didn't jump to line 758 because the condition on line 757 was never true
758 current_mode = stat.S_IMODE(self.stat().st_mode)
759 self._mode = current_mode
760 return current_mode
762 @mode.setter
763 def mode(self, new_mode: int) -> None:
764 self._rw_check()
765 min_bit = 0o700 if self.is_dir else 0o400
766 if (new_mode & min_bit) != min_bit: 766 ↛ 767line 766 didn't jump to line 767 because the condition on line 766 was never true
767 omode = oct(new_mode)[2:]
768 omin = oct(min_bit)[2:]
769 raise ValueError(
770 f'Attempt to set mode of path "{self._orphan_safe_path()}" to {omode} rejected;'
771 f" Minimum requirements are {omin} (read-bit and, for dirs, exec bit for user)."
772 " There are no paths that do not need these requirements met and they can cause"
773 " problems during build or on the final system."
774 )
775 self._mode = new_mode
    def _ensure_min_mode(self) -> None:
        """Add the minimum user permission bits to the path and, where needed, its backing file.

        The minimum is 0o700 for directories and 0o600 for everything else.
        """
        min_bit = 0o700 if self.is_dir else 0o600
        # NOTE(review): the check uses 0o600 even for directories although `min_bit` is
        # 0o700 there; a directory with mode 0o600 skips the chmod - confirm this is intended.
        if self.has_fs_path and (self.mode & 0o600) != 0o600:
            try:
                fs_path = self.fs_path
            except TestPathWithNonExistentFSPathError:
                # No usable backing path; only the virtual mode is updated below.
                pass
            else:
                st = os.stat(fs_path)
                new_fs_mode = stat.S_IMODE(st.st_mode) | min_bit
                _debug_log(
                    f"Applying chmod {oct(min_bit)[2:]} {fs_path} ({self.path}) to avoid problems down the line"
                )
                os.chmod(fs_path, new_fs_mode)
        # NOTE(review): indentation was lost in the extracted source; this statement is
        # assumed to run unconditionally - confirm against the real file.
        self.mode |= min_bit
    def _resolve_initial_mtime(self) -> float:
        """Provide the mtime used when none was set explicitly; must be implemented by subclasses."""
        raise NotImplementedError
796 @property
797 def mtime(self) -> float:
798 mtime = self._mtime
799 if mtime is None:
800 mtime = self._resolve_initial_mtime()
801 self._mtime = mtime
802 return mtime
804 @mtime.setter
805 def mtime(self, new_mtime: float) -> None:
806 self._rw_check()
807 self._mtime = new_mtime
    @property
    def _can_replace_inline(self) -> bool:
        """Always False in this base class."""
        return False
813 @contextlib.contextmanager
814 def add_file(
815 self,
816 name: str,
817 *,
818 unlink_if_exists: bool = True,
819 use_fs_path_mode: bool = False,
820 mode: int = 0o0644,
821 mtime: float | None = None,
822 # Special-case parameters that are not exposed in the API
823 fs_basename_matters: bool = False,
824 subdir_key: str | None = None,
825 ) -> Iterator["InMemoryVirtualPathBase"]:
826 if "/" in name or name in {".", ".."}: 826 ↛ 827line 826 didn't jump to line 827 because the condition on line 826 was never true
827 raise ValueError(f'Invalid file name: "{name}"')
828 if not self.is_dir: 828 ↛ 829line 828 didn't jump to line 829 because the condition on line 828 was never true
829 raise TypeError(
830 f"Cannot create {self._orphan_safe_path()}/{name}:"
831 f" {self._orphan_safe_path()} is not a directory"
832 )
833 self._rw_check()
834 existing = self.get(name)
835 if existing is not None: 835 ↛ 836line 835 didn't jump to line 836 because the condition on line 835 was never true
836 if not unlink_if_exists:
837 raise ValueError(
838 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
839 f" and exist_ok was False"
840 )
841 existing.unlink(recursive=False)
843 if fs_basename_matters and subdir_key is None: 843 ↛ 844line 843 didn't jump to line 844 because the condition on line 843 was never true
844 raise ValueError(
845 "When fs_basename_matters is True, a subdir_key must be provided"
846 )
848 directory = generated_content_dir(subdir_key=subdir_key)
850 if fs_basename_matters: 850 ↛ 851line 850 didn't jump to line 851 because the condition on line 850 was never true
851 fs_path = os.path.join(directory, name)
852 with open(fs_path, "xb") as _:
853 # Ensure that the fs_path exists
854 pass
855 child = FSBackedFilePath(
856 name,
857 self,
858 fs_path,
859 replaceable_inline=True,
860 mtime=mtime,
861 )
862 yield child
863 else:
864 with tempfile.NamedTemporaryFile(
865 dir=directory, suffix=f"__{name}", delete=False
866 ) as fd:
867 fs_path = fd.name
868 child = FSBackedFilePath(
869 name,
870 self,
871 fs_path,
872 replaceable_inline=True,
873 mtime=mtime,
874 )
875 fd.close()
876 yield child
878 if use_fs_path_mode: 878 ↛ 880line 878 didn't jump to line 880 because the condition on line 878 was never true
879 # Ensure the caller can see the current mode
880 os.chmod(fs_path, mode)
881 _check_fs_path_is_file(fs_path, unlink_on_error=child)
882 child._reset_caches()
883 if not use_fs_path_mode: 883 ↛ exitline 883 didn't return from function 'add_file' because the condition on line 883 was always true
884 child.mode = mode
def insert_file_from_fs_path(
    self,
    name: str,
    fs_path: str,
    *,
    exist_ok: bool = True,
    use_fs_path_mode: bool = False,
    mode: int = 0o0644,
    require_copy_on_write: bool = True,
    follow_symlinks: bool = True,
    reference_path: VirtualPath | None = None,
) -> "InMemoryVirtualPathBase":
    """Attach an existing file system file as the child ``name`` of this directory.

    :param name: Basename of the new child; must not contain "/" nor be "." or "..".
    :param fs_path: File system path providing the content.
    :param exist_ok: When False, a ValueError is raised if ``name`` is already present.
    :param use_fs_path_mode: When True, no explicit mode is handed to the new path
      (``initial_mode=None``); otherwise ``mode`` is used.
    :param mode: The mode to use unless ``use_fs_path_mode`` is True.
    :param require_copy_on_write: When True, the new path is created as not being
      replaceable inline.
    :param follow_symlinks: When True, ``fs_path`` is resolved strictly via
      ``os.path.realpath`` first. Cannot be combined with ``reference_path``.
    :param reference_path: Optional reference path passed on to the new path. When
      provided, the ``lstat``-based type checks of ``fs_path`` are skipped.
    :return: The newly created path.
    :raises ValueError: On an invalid name, an unwanted existing entry, an invalid
      option combination or when the (resolved) fs path is not a regular file.
    :raises TypeError: If this path is not a directory.
    """
    invalid_name = "/" in name or name in {".", ".."}
    if invalid_name:
        raise ValueError(f'Invalid file name: "{name}"')
    if not self.is_dir:
        raise TypeError(
            f"Cannot create {self._orphan_safe_path()}/{name}:"
            f" {self._orphan_safe_path()} is not a directory"
        )
    self._rw_check()
    if name in self and not exist_ok:
        raise ValueError(
            f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
            f" and exist_ok was False"
        )

    resolved_path = fs_path
    if follow_symlinks:
        if reference_path is not None:
            raise ValueError(
                "The reference_path cannot be used with follow_symlinks"
            )
        resolved_path = os.path.realpath(resolved_path, strict=True)

    effective_mode: int | None = None if use_fs_path_mode else mode

    lstat_result = None
    if reference_path is None:
        # Without a reference path, the on-disk entry itself must be a regular file.
        lstat_result = os.lstat(resolved_path)
        file_type_bits = lstat_result.st_mode
        if stat.S_ISDIR(file_type_bits):
            raise ValueError(
                f'The provided path "{fs_path}" is a directory. However, this'
                " method does not support directories"
            )
        if not stat.S_ISREG(file_type_bits):
            if follow_symlinks:
                raise ValueError(
                    f"The resolved fs_path ({resolved_path}) was not a file."
                )
            raise ValueError(f"The provided fs_path ({fs_path}) was not a file.")

    return FSBackedFilePath(
        name,
        self,
        resolved_path,
        initial_mode=effective_mode,
        stat_cache=lstat_result,
        replaceable_inline=not require_copy_on_write,
        reference_path=reference_path,
    )
def add_symlink(
    self,
    link_name: str,
    link_target: str,
    *,
    reference_path: VirtualPath | None = None,
) -> "InMemoryVirtualPathBase":
    """Create the symlink ``link_name`` pointing to ``link_target`` in this directory.

    An existing entry with the same name is unlinked (non-recursively) first.

    :param link_name: Basename of the symlink; must not contain "/" nor be "." or "..".
    :param link_target: The target of the symlink.
    :param reference_path: Optional reference path passed on to the new symlink.
    :return: The newly created symlink path.
    :raises ValueError: If ``link_name`` is not a valid basename.
    :raises TypeError: If this path is not a directory.
    """
    invalid_name = "/" in link_name or link_name in {".", ".."}
    if invalid_name:
        raise ValueError(
            f'Invalid file name: "{link_name}" (it must be a valid basename)'
        )
    if not self.is_dir:
        raise TypeError(
            f"Cannot create {self._orphan_safe_path()}/{link_name}:"
            f" {self._orphan_safe_path()} is not a directory"
        )
    self._rw_check()

    previous = self.get(link_name)
    if previous:
        # Emulate `ln -sf`, which attempts a non-recursive unlink first.
        previous.unlink(recursive=False)

    return SymlinkVirtualPath(
        link_name,
        self,
        link_target,
        reference_path=reference_path,
    )
def mkdir(
    self,
    name: str,
    *,
    reference_path: VirtualPath | None = None,
) -> "InMemoryVirtualPathBase":
    """Create the directory ``name`` directly beneath this directory.

    :param name: Basename of the directory; must not contain "/" nor be "." or "..".
    :param reference_path: Optional reference path; when given it must be a directory.
    :return: The newly created directory path.
    :raises ValueError: If ``name`` is not a valid basename, ``reference_path`` is not
      a directory or an entry called ``name`` already exists.
    :raises TypeError: If this path is not a directory.
    """
    invalid_name = "/" in name or name in {".", ".."}
    if invalid_name:
        raise ValueError(
            f'Invalid file name: "{name}" (it must be a valid basename)'
        )
    if not self.is_dir:
        raise TypeError(
            f"Cannot create {self._orphan_safe_path()}/{name}:"
            f" {self._orphan_safe_path()} is not a directory"
        )
    if reference_path is not None and not reference_path.is_dir:
        raise ValueError(
            f'The provided fs_path "{reference_path.fs_path}" exist but it is not a directory!'
        )
    self._rw_check()

    clash = self.get(name)
    if clash:
        raise ValueError(f"Path {clash.path} already exist")
    return VirtualDirectoryFSPath(name, self, reference_path=reference_path)
def mkdirs(self, path: str) -> "InMemoryVirtualPathBase":
    """Delegate to the parent class' ``mkdirs`` and narrow the declared return type."""
    created = super().mkdirs(path)
    return cast("InMemoryVirtualPathBase", created)
@property
def is_read_write(self) -> bool:
    """When true, the file system entry may be mutated

    A detached path is always read-write; otherwise the answer is delegated to
    the parent directory.

    :return: Whether file system mutations are permitted.
    """
    if not self.is_detached:
        return assume_not_none(self.parent_dir).is_read_write
    return True
def unlink(self, *, recursive: bool = False) -> None:
    """Unlink a file or a directory

    This operation will detach the path from the file system (causing "is_detached" to return True).
    Calling it on an already detached path is a no-op.

    Note that the root directory cannot be deleted.

    :param recursive: If True, then non-empty directories will be unlinked as well removing everything inside them
      as well. When False, an error is raised if the path is a non-empty directory
    :raises ValueError: If the path is a non-empty directory and ``recursive`` is False.
    """
    if self.is_detached:
        return
    if not recursive and self.is_dir:
        has_children = any(self.iterdir())
        if has_children:
            raise ValueError(
                f'Refusing to unlink "{self.path}": The directory was not empty and recursive was False'
            )
    # The .parent_dir setter does a _rw_check() for us.
    self.parent_dir = None
1036 def _reset_caches(self) -> None:
1037 self._mtime = None
1038 self._stat_cache = None
def metadata(
    self,
    metadata_type: type[PMT],
    *,
    owning_plugin: str | None = None,
) -> PathMetadataReference[PMT]:
    """Return a reference to the metadata of ``metadata_type`` for this path.

    :param metadata_type: The type of the metadata to look up.
    :param owning_plugin: The plugin owning the metadata; defaults to the current plugin.
    :return: A reference to the stored value. If no value is stored yet and the
      path is read-only, an always-empty read-only reference is returned instead.
    :raises TypeError: If no value is stored yet and the path is detached.
    """
    active_plugin = self._current_plugin()
    owner = active_plugin if owning_plugin is None else owning_plugin
    key = (owner, metadata_type)
    stored = self._metadata.get(key)
    if stored is None:
        if self.is_detached:
            raise TypeError(
                f"Cannot access the metadata {metadata_type.__name__}: The path is detached."
            )
        if not self.is_read_write:
            return AlwaysEmptyReadOnlyMetadataReference(
                owner,
                active_plugin,
                metadata_type,
            )
        # First access on a mutable path: create and remember the value slot.
        stored = PathMetadataValue(owner, metadata_type)
        self._metadata[key] = stored
    return PathMetadataReferenceImplementation(
        self,
        active_plugin,
        stored,
    )
@contextlib.contextmanager
def replace_fs_path_content(
    self,
    *,
    use_fs_path_mode: bool = False,
) -> Iterator[str]:
    """Context manager yielding a file system path whose content may be rewritten.

    Unless the path can be replaced inline, the current content is first copied to
    a new temporary file in the generated content directory and this path is
    switched over to that copy (via `_replaced_path`).

    After the managed block completes normally, the file is checked to still be a
    file, the previous mode is re-applied (unless `use_fs_path_mode` is True) and
    the caches are reset. None of this happens if the managed block raises.

    :param use_fs_path_mode: When True, the mode of the file is left as the managed
      block left it instead of being reset to the mode recorded before the block.
    :raises TypeError: If the path is not a file.
    """
    if not self.is_file:
        raise TypeError(
            f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file'
        )
    self._rw_check()
    fs_path = self.fs_path
    if not self._can_replace_inline:
        # NOTE(review): this re-reads the property already read just above; it
        # looks redundant.
        fs_path = self.fs_path
        directory = generated_content_dir()
        with tempfile.NamedTemporaryFile(
            dir=directory, suffix=f"__{self.name}", delete=False
        ) as new_path_fd:
            # Only the name is needed; the file (kept due to delete=False) is
            # populated by the copy below.
            new_path_fd.close()
            _cp_a(fs_path, new_path_fd.name)
            fs_path = new_path_fd.name
            self._replaced_path(fs_path)
            assert self.fs_path == fs_path

    # Apply the cached mtime (if any) to the file handed to the caller.
    current_mtime = self._mtime
    if current_mtime is not None:
        os.utime(fs_path, (current_mtime, current_mtime))

    # Remember the mode before the caller gets to touch the file.
    current_mode = self.mode
    yield fs_path
    _check_fs_path_is_file(fs_path, unlink_on_error=self)
    if not use_fs_path_mode:
        os.chmod(fs_path, current_mode)
    self._reset_caches()
1105 def _replaced_path(self, new_fs_path: str) -> None:
1106 raise NotImplementedError
class InMemoryVirtualPath(InMemoryVirtualPathBase, ABC):
    """In-memory path that stores its children in an optional dict keyed by basename."""

    __slots__ = ("_children",)

    def __init__(
        self,
        basename: str,
        parent: Optional["InMemoryVirtualPathBase"],
        children: dict[str, "InMemoryVirtualPathBase"] | None = None,
        initial_mode: int | None = None,
        mtime: float | None = None,
        stat_cache: os.stat_result | None = None,
    ) -> None:
        super().__init__(basename, parent, initial_mode, mtime, stat_cache)
        self._children = children

    def __repr__(self) -> str:
        parts = [
            f"{self.__class__.__name__}({self._orphan_safe_path()!r}",
            f" is_file={self.is_file}",
            f" is_dir={self.is_dir}",
            f" is_symlink={self.is_symlink}",
            f" has_fs_path={self.has_fs_path}",
            f" children_len={len(self._children) if self._children else 0})",
        ]
        return ",".join(parts)

    def iterdir(self) -> Iterable["InMemoryVirtualPathBase"]:
        """Iterate over the direct children (nothing when there are none)."""
        known_children = self._children
        if known_children is None:
            return
        yield from known_children.values()

    def __getitem__(self, key) -> "InMemoryVirtualPathBase":
        """Look up a child by basename (or by a path object, using its name)."""
        known_children = self._children
        if known_children is None:
            raise KeyError(
                f"{key} (note: {self._orphan_safe_path()!r} has no children)"
            )
        lookup_name = key.name if isinstance(key, InMemoryVirtualPathBase) else key
        return known_children[lookup_name]

    def __delitem__(self, key) -> None:
        """Remove the child stored under ``key``; requires a read-write path."""
        self._rw_check()
        known_children = self._children
        if known_children is None:
            raise KeyError(key)
        del known_children[key]

    def _remove_child(self, child: "InMemoryVirtualPathBase") -> None:
        known_children = assume_not_none(self._children)
        del known_children[child.name]

    def _add_child(self, child: "InMemoryVirtualPathBase") -> None:
        """Register ``child``, recursively unlinking any existing child of the same name."""
        self._rw_check()
        if not self.is_dir:
            raise TypeError(f"{self._orphan_safe_path()!r} is not a directory")
        if self._children is None:
            self._children = {}

        previous = self.get(child.name)
        if previous is not None:
            previous.unlink(recursive=True)
        self._children[child.name] = child

    def _uncached_stat(self) -> os.stat_result:
        return os.lstat(self.fs_path)

    def _resolve_initial_mtime(self) -> float:
        stat_result = self.stat()
        return stat_result.st_mtime
class InMemoryOverlayFSDirectory(InMemoryVirtualPathBase):
    """Directory layering in-memory changes on top of an underlying directory.

    Children of the underlying directory are wrapped lazily on the first lookup
    that cannot be answered from the in-memory children. Removed names are tracked
    in ``_deleted_children`` so that they no longer resolve.
    """

    __slots__ = (
        "_underlying_directory",
        "_virtual_children",
        "_deleted_children",
        "_has_underlying_read_children",
    )

    def __init__(
        self,
        underlying_directory: VirtualPath,
        basename: str,
        parent: Optional["InMemoryVirtualPathBase"],
        initial_mode: int | None = None,
        mtime: float | None = None,
        stat_cache: os.stat_result | None = None,
    ) -> None:
        super().__init__(
            basename,
            parent,
            initial_mode,
            mtime,
            stat_cache,
        )
        self._underlying_directory: VirtualPath = underlying_directory
        self._virtual_children: dict[str, InMemoryVirtualPathBase] = {}
        self._deleted_children = set[str]()
        self._has_underlying_read_children = False

    def __getitem__(self, key: object) -> "InMemoryVirtualPathBase":
        """Look up a child by basename.

        :raises KeyError: If ``key`` is not a string, was deleted or does not exist.
        """
        if not isinstance(key, str) or key in self._deleted_children:
            raise KeyError(key)
        child = self._virtual_children.get(key)
        if child is not None:
            return child
        if not self._ensure_read_underlying_children():
            # The underlying directory was already loaded, so the key is unknown.
            raise KeyError(key)
        return self._virtual_children[key]

    def _ensure_read_underlying_children(self) -> bool:
        """Wrap the children of the underlying directory, if not done already.

        :return: True if the underlying children were loaded by this call, False if
          they had already been loaded.
        """
        if self._has_underlying_read_children:
            return False
        # Mark as loaded up front so a lookup triggered while the children are
        # being constructed cannot re-enter this method.
        self._has_underlying_read_children = True
        try:
            for real_child in self._underlying_directory.iterdir():
                basename = real_child.name
                if (
                    basename in self._virtual_children
                    or basename in self._deleted_children
                ):
                    # In-memory changes take precedence over the underlying entry.
                    continue
                if real_child.is_dir:
                    child = InMemoryOverlayFSDirectory(
                        real_child,
                        basename,
                        self,
                    )
                elif real_child.is_symlink:
                    child = SymlinkVirtualPath(
                        basename,
                        self,
                        real_child.readlink(),
                        reference_path=real_child,
                    )
                else:
                    assert real_child.is_file and real_child.has_fs_path
                    child = FSBackedFilePath(
                        basename,
                        self,
                        real_child.fs_path,
                        reference_path=real_child,
                    )
                self._virtual_children[child.name] = child
        except BaseException:
            # Allow a later lookup to retry after a failed (partial) load.
            self._has_underlying_read_children = False
            raise
        return True

    def _add_child(self, child: "InMemoryVirtualPathBase") -> None:
        # Re-adding a previously deleted name must make it resolvable again.
        self._deleted_children.discard(child.name)
        self._virtual_children[child.name] = child

    def _remove_child(self, child: "InMemoryVirtualPathBase") -> None:
        """Remove ``child`` and record its name as deleted.

        :raises KeyError: If the name was already deleted or is neither an
          in-memory child nor present in the underlying directory.
        """
        if child.name in self._deleted_children:
            raise KeyError(child.name)
        try:
            del self._virtual_children[child.name]
            deleted = True
        except KeyError:
            deleted = False
        if not deleted and child.name not in self._underlying_directory:
            raise KeyError(child.name)
        self._deleted_children.add(child.name)

    def _uncached_stat(self) -> os.stat_result:
        try:
            return self._underlying_directory.stat()  # type: ignore
        except AttributeError:
            raise PureVirtualPathError("The stat method has not been implemented")

    def _resolve_initial_mtime(self) -> float:
        return self._underlying_directory.mtime
class InMemoryOverlayFSRootDirectory(InMemoryOverlayFSDirectory):
    """Root directory (".") of an overlay file system.

    The root is never detached, cannot be unlinked and owns both the plugin
    context and the read-write flag.
    """

    __slots__ = ("_fs_path", "_fs_read_write", "_plugin_context")

    def __init__(self, underlying_directory: VirtualPath) -> None:
        self._fs_read_write = True
        super().__init__(
            underlying_directory,
            ".",
            None,
            initial_mode=0o755,
        )
        self._plugin_context = CurrentPluginContextManager("debputy")

    def is_root_dir(self) -> bool:
        return True

    @property
    def is_detached(self) -> bool:
        return False

    @property
    def is_read_write(self) -> bool:
        # The root has no parent to delegate to; it must answer from its own flag.
        return self._fs_read_write

    @is_read_write.setter
    def is_read_write(self, new_value: bool) -> None:
        self._fs_read_write = new_value

    def unlink(self, *, recursive: bool = False) -> None:
        # There is never a case where you want to delete this directory (and even if you could,
        # debputy will need it for technical reasons, so the root dir stays)
        raise TypeError("Cannot delete the root directory")

    def _current_plugin(self) -> str:
        return self._plugin_context.current_plugin_name

    @contextlib.contextmanager
    def change_plugin_context(self, new_plugin: str) -> Iterator[str]:
        """Temporarily switch the current plugin to ``new_plugin``."""
        with self._plugin_context.change_plugin_context(new_plugin) as r:
            yield r
class VirtualFSPathBase(InMemoryVirtualPath, ABC):
    """Base for in-memory paths that are purely virtual unless a subclass says otherwise."""

    __slots__ = ()

    def __init__(
        self,
        basename: str,
        parent: Optional["InMemoryVirtualPathBase"],
        children: dict[str, "InMemoryVirtualPathBase"] | None = None,
        initial_mode: int | None = None,
        mtime: float | None = None,
        stat_cache: os.stat_result | None = None,
    ) -> None:
        super().__init__(
            basename,
            parent,
            children,
            initial_mode=initial_mode,
            mtime=mtime,
            stat_cache=stat_cache,
        )

    def _resolve_initial_mtime(self) -> float:
        return time.time()

    @property
    def has_fs_path(self) -> bool:
        return False

    def stat(self) -> os.stat_result:
        """Return the stat result of the path.

        :raises PureVirtualPathError: If the path is not backed by the file system.
        """
        if not self.has_fs_path:
            raise PureVirtualPathError(
                "stat() is only applicable to paths backed by the file system. The path"
                f" {self._orphan_safe_path()!r} is purely virtual"
            )
        return super().stat()

    @property
    def fs_path(self) -> str:
        """The file system path backing this path.

        :raises PureVirtualPathError: If the path is not backed by the file system.
        :raises NotImplementedError: If a subclass reports having an fs path but does
          not provide it.
        """
        if not self.has_fs_path:
            raise PureVirtualPathError(
                "fs_path is only applicable to paths backed by the file system. The path"
                f" {self._orphan_safe_path()!r} is purely virtual"
            )
        # This base class has no storage for an fs path. Returning `self.fs_path`
        # here would recurse forever, so fail explicitly instead.
        raise NotImplementedError(
            f"{self.__class__.__name__} reports has_fs_path but does not provide fs_path"
        )
class InMemoryVirtualRootDir(InMemoryVirtualPath):
    """Root directory (".") of an in-memory file system.

    The root optionally maps to a file system path, owns the read-write flag and
    the plugin context, and can neither be detached, re-parented nor unlinked.
    """

    __slots__ = ("_fs_path", "_fs_read_write", "_plugin_context")

    def __init__(self, fs_path: str | None = None) -> None:
        self._fs_path = fs_path
        self._fs_read_write = True
        super().__init__(
            ".",
            None,
            children={},
            initial_mode=0o755,
        )
        self._plugin_context = CurrentPluginContextManager("debputy")

    @property
    def is_detached(self) -> bool:
        return False

    def _orphan_safe_path(self) -> str:
        return self.name

    @property
    def path(self) -> str:
        return self.name

    @property
    def parent_dir(self) -> Optional["InMemoryVirtualPathBase"]:
        return None

    @parent_dir.setter
    def parent_dir(self, new_parent: InMemoryVirtualPathBase | None) -> None:
        # Assigning None is accepted (and ignored); anything else is an error.
        if new_parent is None:
            return
        raise ValueError("The root directory cannot become a non-root directory")

    @property
    def parent_dir_path(self) -> str | None:
        return None

    @property
    def is_dir(self) -> bool:
        return True

    @property
    def is_file(self) -> bool:
        return False

    @property
    def is_symlink(self) -> bool:
        return False

    def readlink(self) -> str:
        raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink')

    @property
    def has_fs_path(self) -> bool:
        return self._fs_path is not None

    def stat(self) -> os.stat_result:
        """Return ``os.stat`` of the backing fs path.

        :raises PureVirtualPathError: If the root has no fs path.
        """
        if self.has_fs_path:
            return os.stat(self.fs_path)
        raise PureVirtualPathError(
            "stat() is only applicable to paths backed by the file system. The path"
            f" {self._orphan_safe_path()!r} is purely virtual"
        )

    @property
    def fs_path(self) -> str:
        """The backing fs path.

        :raises PureVirtualPathError: If the root has no fs path.
        """
        if self.has_fs_path:
            return assume_not_none(self._fs_path)
        raise PureVirtualPathError(
            "fs_path is only applicable to paths backed by the file system. The path"
            f" {self._orphan_safe_path()!r} is purely virtual"
        )

    @property
    def is_read_write(self) -> bool:
        return self._fs_read_write

    @is_read_write.setter
    def is_read_write(self, new_value: bool) -> None:
        self._fs_read_write = new_value

    def unlink(self, *, recursive: bool = False) -> None:
        # There is never a case where you want to delete this directory (and even if you could,
        # debputy will need it for technical reasons, so the root dir stays)
        raise TypeError("Cannot delete the root directory")

    def _current_plugin(self) -> str:
        return self._plugin_context.current_plugin_name

    @contextlib.contextmanager
    def change_plugin_context(self, new_plugin: str) -> Iterator[str]:
        """Temporarily switch the current plugin to ``new_plugin``."""
        with self._plugin_context.change_plugin_context(new_plugin) as active:
            yield active
class VirtualPathWithReference(VirtualFSPathBase, ABC):
    """Virtual path that can delegate mode, mtime, fs path, stat and content to a reference path."""

    __slots__ = ("_reference_path",)

    def __init__(
        self,
        basename: str,
        parent: InMemoryVirtualPathBase,
        *,
        default_mode: int,
        reference_path: VirtualPath | None = None,
    ) -> None:
        # The mode of the reference path (when provided) wins over the default.
        super().__init__(
            basename,
            parent=parent,
            initial_mode=reference_path.mode if reference_path else default_mode,
        )
        self._reference_path = reference_path

    @property
    def has_fs_path(self) -> bool:
        ref_path = self._reference_path
        if ref_path is None:
            return False
        return ref_path.has_fs_path

    def _resolve_initial_mtime(self) -> float:
        ref_path = self._reference_path
        if not ref_path:
            return super()._resolve_initial_mtime()
        return ref_path.mtime

    @property
    def fs_path(self) -> str:
        ref_path = self._reference_path
        if ref_path is None:
            return super().fs_path
        if not super().has_fs_path or super().fs_path == ref_path.fs_path:
            return ref_path.fs_path
        return super().fs_path

    def stat(self) -> os.stat_result:
        ref_path = self._reference_path
        if ref_path is None:
            return super().stat()
        if not super().has_fs_path or super().fs_path == ref_path.fs_path:
            return typing.cast(VirtualPathBase, ref_path).stat()
        return super().stat()

    @overload
    def open(
        self,
        *,
        byte_io: Literal[False] = False,
        buffering: int = -1,
    ) -> TextIO: ...

    @overload
    def open(
        self,
        *,
        byte_io: Literal[True],
        buffering: Literal[0] = ...,
    ) -> io.FileIO: ...

    @overload
    def open(
        self,
        *,
        byte_io: Literal[True],
        buffering: int = -1,
    ) -> io.BufferedReader: ...

    def open(self, *, byte_io=False, buffering=-1):
        """Open the content, via the reference path when both share the same fs path."""
        reference_path = self._reference_path
        if reference_path is None or reference_path.fs_path != self.fs_path:
            return super().open(byte_io=byte_io, buffering=buffering)
        return reference_path.open(byte_io=byte_io, buffering=buffering)
class VirtualDirectoryFSPath(VirtualPathWithReference):
    """Virtual directory, optionally mirroring an existing reference directory."""

    # "_reference_path" is already a slot of VirtualPathWithReference; declaring it
    # again here would shadow the inherited slot and waste space per instance.
    __slots__ = ()

    def __init__(
        self,
        basename: str,
        parent: InMemoryVirtualPathBase,
        *,
        reference_path: VirtualPath | None = None,
    ) -> None:
        # The parent constructor stores reference_path in self._reference_path.
        super().__init__(
            basename,
            parent,
            reference_path=reference_path,
            default_mode=0o755,
        )
        assert reference_path is None or reference_path.is_dir
        self._ensure_min_mode()

    @property
    def is_dir(self) -> bool:
        return True

    @property
    def is_file(self) -> bool:
        return False

    @property
    def is_symlink(self) -> bool:
        return False

    def readlink(self) -> str:
        raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink')
class SymlinkVirtualPath(VirtualPathWithReference):
    """Virtual symlink that stores its link target as a string."""

    __slots__ = ("_link_target",)

    def __init__(
        self,
        basename: str,
        parent_dir: InMemoryVirtualPathBase,
        link_target: str,
        *,
        reference_path: VirtualPath | None = None,
    ) -> None:
        super().__init__(
            basename,
            parent=parent_dir,
            default_mode=_SYMLINK_MODE,
            reference_path=reference_path,
        )
        self._link_target = link_target

    @property
    def is_symlink(self) -> bool:
        return True

    @property
    def is_dir(self) -> bool:
        return False

    @property
    def is_file(self) -> bool:
        return False

    def readlink(self) -> str:
        return self._link_target

    @property
    def size(self) -> int:
        # The size is the length of the link target string.
        target = self.readlink()
        return len(target)
class FSBackedFilePath(VirtualPathWithReference):
    """Regular file whose content lives at a path on the file system."""

    __slots__ = ("_fs_path", "_replaceable_inline")

    def __init__(
        self,
        basename: str,
        parent_dir: InMemoryVirtualPathBase,
        fs_path: str,
        *,
        replaceable_inline: bool = False,
        initial_mode: int | None = None,
        mtime: float | None = None,
        stat_cache: os.stat_result | None = None,
        reference_path: VirtualPath | None = None,
    ) -> None:
        super().__init__(
            basename,
            parent_dir,
            default_mode=0o644,
            reference_path=reference_path,
        )
        self._fs_path = fs_path
        self._replaceable_inline = replaceable_inline
        # Explicit values override what the parent constructor derived.
        if initial_mode is not None:
            self.mode = initial_mode
        if mtime is not None:
            self._mtime = mtime
        self._stat_cache = stat_cache
        assert (
            not replaceable_inline or "debputy/scratch-dir/" in fs_path
        ), f"{fs_path} should not be inline-replaceable -- {self.path}"
        self._ensure_min_mode()

    @property
    def is_file(self) -> bool:
        return True

    @property
    def is_dir(self) -> bool:
        return False

    @property
    def is_symlink(self) -> bool:
        return False

    def readlink(self) -> str:
        raise TypeError(f'"{self._orphan_safe_path()!r}" is a file; not a symlink')

    @property
    def has_fs_path(self) -> bool:
        return True

    @property
    def fs_path(self) -> str:
        return self._fs_path

    @property
    def _can_replace_inline(self) -> bool:
        return self._replaceable_inline

    def _replaced_path(self, new_fs_path: str) -> None:
        """Switch to ``new_fs_path``, drop the reference path and allow inline replacement."""
        self._fs_path = new_fs_path
        self._reference_path = None
        self._replaceable_inline = True
# Mode used for symlinks: the default mode of SymlinkVirtualPath and the only
# explicit mode that VirtualTestPath accepts for a symlink.
_SYMLINK_MODE = 0o777
class VirtualTestPath(InMemoryVirtualPath):
    """Mock path (per its error messages, intended for the test suite).

    The path type is derived from the constructor arguments: a directory when
    `is_dir` is set, a symlink when `link_target` is given and a file otherwise.
    Content can be provided purely in memory (`content`) or as text that is
    written to a temporary file on first access to `fs_path`
    (`materialized_content`).
    """

    __slots__ = (
        "_path_type",
        "_has_fs_path",
        "_fs_path",
        "_link_target",
        "_content",
        "_materialized_content",
    )

    def __init__(
        self,
        basename: str,
        parent_dir: InMemoryVirtualPathBase | None,
        mode: int | None = None,
        mtime: float | None = None,
        is_dir: bool = False,
        has_fs_path: bool | None = False,
        fs_path: str | None = None,
        link_target: str | None = None,
        content: str | None = None,
        materialized_content: str | None = None,
    ) -> None:
        if is_dir:
            self._path_type = PathType.DIRECTORY
        elif link_target is not None:
            self._path_type = PathType.SYMLINK
            if mode is not None and mode != _SYMLINK_MODE:
                raise ValueError(
                    f'Please do not assign a mode to symlinks. Triggered for "{basename}".'
                )
            assert mode is None or mode == _SYMLINK_MODE
        else:
            self._path_type = PathType.FILE

        # Without an explicit mode, directories get 0o755 and everything else 0o644.
        if mode is not None:
            initial_mode = mode
        else:
            initial_mode = 0o755 if is_dir else 0o644

        self._link_target = link_target
        # has_fs_path=None means "derive it from whether an fs_path was given".
        if has_fs_path is None:
            has_fs_path = bool(fs_path)
        self._has_fs_path = has_fs_path
        self._fs_path = fs_path
        self._materialized_content = materialized_content
        super().__init__(
            basename,
            parent=parent_dir,
            initial_mode=initial_mode,
            mtime=mtime,
        )
        self._content = content

    @property
    def is_dir(self) -> bool:
        return self._path_type == PathType.DIRECTORY

    @property
    def is_file(self) -> bool:
        return self._path_type == PathType.FILE

    @property
    def is_symlink(self) -> bool:
        return self._path_type == PathType.SYMLINK

    def readlink(self) -> str:
        """Return the link target; raises TypeError if the path is not a symlink."""
        if not self.is_symlink:
            raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
        link_target = self._link_target
        assert link_target is not None
        return link_target

    def _resolve_initial_mtime(self) -> float:
        return time.time()

    @property
    def has_fs_path(self) -> bool:
        return self._has_fs_path

    def stat(self) -> os.stat_result:
        """Return `os.stat` of the fs path.

        :raises PureVirtualPathError: If the path has no fs path or the fs path
          does not exist.
        """
        if self.has_fs_path:
            path = self.fs_path
            if path is None:
                raise PureVirtualPathError(
                    f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path"
                    " cannot provide!"
                )
            try:
                return os.stat(path)
            except FileNotFoundError as e:
                raise PureVirtualPathError(
                    f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path"
                    " cannot provide! (An fs_path was provided, but it did not exist)"
                ) from e

        raise PureVirtualPathError(
            "stat() is only applicable to paths backed by the file system. The path"
            f" {self._orphan_safe_path()!r} is purely virtual"
        )

    @property
    def size(self) -> int:
        # In order of precedence: UTF-8 byte length of the in-memory content, the
        # length of the symlink target, 0 when there is no fs path, else st_size.
        if self._content is not None:
            return len(self._content.encode("utf-8"))
        if self.is_symlink:
            return len(self.readlink())
        if not self.has_fs_path or self.fs_path is None:
            return 0
        return self.stat().st_size

    @property
    def fs_path(self) -> str:
        """The fs path of the mock path.

        When no fs path was given but `materialized_content` was, the content is
        written to a temporary file on first access; the file is registered for
        removal at interpreter exit.

        :raises PureVirtualPathError: If the path has no fs path.
        """
        if self.has_fs_path:
            if self._fs_path is None and self._materialized_content is not None:
                with tempfile.NamedTemporaryFile(
                    mode="w+t",
                    encoding="utf-8",
                    suffix=f"__{self.name}",
                    delete=False,
                ) as fd:
                    filepath = fd.name
                    fd.write(self._materialized_content)
                self._fs_path = filepath
                atexit.register(lambda: os.unlink(filepath))

            path = self._fs_path
            if path is None:
                raise PureVirtualPathError(
                    f"The test wants a real file system entry of {self._orphan_safe_path()!r}, which this "
                    " mock path cannot provide!"
                )
            return path
        raise PureVirtualPathError(
            "fs_path is only applicable to paths backed by the file system. The path"
            f" {self._orphan_safe_path()!r} is purely virtual"
        )

    def replace_fs_path_content(
        self,
        *,
        use_fs_path_mode: bool = False,
    ) -> ContextManager[str]:
        """Delegate to the parent implementation, rejecting paths created with `content`.

        :raises TypeError: If the path was created with in-memory `content`.
        """
        if self._content is not None:
            raise TypeError(
                f"The `replace_fs_path_content()` method was called on {self.path}. Said path was"
                " created with `content` but for this method to work, the path should have been"
                " created with `materialized_content`"
            )
        return super().replace_fs_path_content(use_fs_path_mode=use_fs_path_mode)

    @overload
    def open_child(
        self,
        name: str,
        mode: TextOpenMode = "r",
        buffering: int = -1,
    ) -> TextIO: ...

    @overload
    def open_child(
        self,
        name: str,
        mode: BinaryOpenMode,
        buffering: int = -1,
    ) -> BinaryIO: ...

    @contextlib.contextmanager
    def open_child(self, name, mode="r", buffering=-1):
        """Open the child `name` as a context manager.

        Existing children and read modes are delegated to the parent
        implementation. Otherwise an in-memory buffer is yielded and, once the
        managed block completes, a new child is created holding what was written.
        """
        existing = self.get(name)
        if existing or "r" in mode:
            with super().open_child(name, mode, buffering=buffering) as fd:
                yield fd
            return
        if "b" in mode:
            fd = io.BytesIO(b"")
            yield fd
            # The content is stored as text, so binary output is decoded as UTF-8.
            content = fd.getvalue().decode("utf-8")
        else:
            fd = io.StringIO("")
            yield fd
            content = fd.getvalue()
        # Constructed only for its effect; the instance is created with this path
        # as its parent directory.
        VirtualTestPath(
            name,
            self,
            mode=0o644,
            content=content,
            has_fs_path=True,
        )

    @overload
    def open(
        self,
        *,
        byte_io: Literal[False] = False,
        buffering: int = -1,
    ) -> TextIO: ...

    @overload
    def open(
        self,
        *,
        byte_io: Literal[True],
        buffering: Literal[0] = ...,
    ) -> io.FileIO: ...

    @overload
    def open(
        self,
        *,
        byte_io: Literal[True],
        buffering: int = -1,
    ) -> io.BufferedReader: ...

    def open(self, *, byte_io=False, buffering=-1):
        """Open the content for reading.

        In-memory `content` is served from a `BytesIO`/`StringIO`; otherwise the
        parent implementation is used.

        :raises TestPathWithNonExistentFSPathError: If the parent implementation
          fails with FileNotFoundError.
        """
        if self._content is None:
            try:
                return super().open(byte_io=byte_io, buffering=buffering)
            except FileNotFoundError as e:
                raise TestPathWithNonExistentFSPathError(
                    f"The test path {self.path} had an fs_path {self._fs_path}, which does not"
                    " exist. This exception can only occur in the testsuite. Either have the"
                    " test provide content for the path (`virtual_path_def(..., content=...) or,"
                    " if that is too painful in general, have the code accept this error as a "
                    " test only-case and provide a default."
                ) from e

        if byte_io:
            return io.BytesIO(self._content.encode("utf-8"))
        return io.StringIO(self._content)

    def _replaced_path(self, new_fs_path: str) -> None:
        self._fs_path = new_fs_path
1908class OSFSOverlayBase(VirtualPathBase, Generic[FSP]):
1909 __slots__ = (
1910 "_path",
1911 "_fs_path",
1912 "_parent",
1913 "__weakref__",
1914 )
def __init__(
    self,
    path: str,
    fs_path: str,
    parent: FSP | None,
) -> None:
    """Store the path, the normalized fs path and a weak reference to the parent.

    :param path: The path of the entry.
    :param fs_path: The backing fs path; it is normalized and a leading "/" is kept.
    :param parent: The parent entry or None. Only a weak reference is kept.
    """
    self._path: str = path
    leading_slash = "/" if fs_path.startswith("/") else ""
    normalized = _normalize_path(fs_path, with_prefix=False)
    self._fs_path: str = leading_slash + normalized
    parent_ref: ReferenceType[FSP] | None = None
    if parent is not None:
        parent_ref = ref(parent)
    self._parent: ReferenceType[FSP] | None = parent_ref
@property
def name(self) -> str:
    """The basename of the path."""
    basename = os.path.basename(self._path)
    return basename
@property
def path(self) -> str:
    """The path exactly as provided at construction."""
    stored_path = self._path
    return stored_path
@property
def parent_dir(self) -> Optional["FSP"]:
    """The parent entry, or None when there is none.

    :raises RuntimeError: If the weakly referenced parent no longer exists.
    """
    parent_ref = self._parent
    if parent_ref is None:
        return None
    parent = parent_ref()
    if parent is not None:
        return parent
    raise RuntimeError("Parent was garbage collected!")
@property
def fs_path(self) -> str:
    """The normalized fs path computed at construction."""
    stored_fs_path = self._fs_path
    return stored_fs_path
def stat(self) -> os.stat_result:
    """Return ``os.lstat`` of the fs path (symlinks are not followed)."""
    target = self.fs_path
    return os.lstat(target)
@property
def is_dir(self) -> bool:
    """Whether the fs path is a directory (False if it does not exist)."""
    # The root path can have a non-existent fs_path (such as d/tmp not always existing)
    try:
        st_mode = self.stat().st_mode
    except FileNotFoundError:
        return False
    return stat.S_ISDIR(st_mode)
@property
def is_file(self) -> bool:
    """Whether the fs path is a regular file (False if it does not exist)."""
    # The root path can have a non-existent fs_path (such as d/tmp not always existing)
    try:
        st_mode = self.stat().st_mode
    except FileNotFoundError:
        return False
    return stat.S_ISREG(st_mode)
@property
def is_symlink(self) -> bool:
    """Whether the fs path is a symlink (False if it does not exist)."""
    # The root path can have a non-existent fs_path (such as d/tmp not always existing)
    try:
        st_mode = self.stat().st_mode
    except FileNotFoundError:
        return False
    return stat.S_ISLNK(st_mode)
1978 @property
1979 def has_fs_path(self) -> bool:
1980 return True
1982 @overload
1983 def open( 1983 ↛ exitline 1983 didn't return from function 'open' because
1984 self,
1985 *,
1986 byte_io: Literal[False] = False,
1987 buffering: int = -1,
1988 ) -> TextIO: ...
1990 @overload
1991 def open( 1991 ↛ exitline 1991 didn't return from function 'open' because
1992 self,
1993 *,
1994 byte_io: Literal[True],
1995 buffering: Literal[0] = ...,
1996 ) -> io.FileIO: ...
1998 @overload
1999 def open( 1999 ↛ exitline 1999 didn't return from function 'open' because
2000 self,
2001 *,
2002 byte_io: Literal[True],
2003 buffering: int = -1,
2004 ) -> io.BufferedReader: ...
2006 def open(self, *, byte_io=False, buffering=-1):
2007 # Allow symlinks for open here, because we can let the OS resolve the symlink reliably in this
2008 # case.
2009 if not self.is_file and not self.is_symlink:
2010 raise TypeError(
2011 f"Cannot open {self.path} for reading: It is not a file nor a symlink"
2012 )
2014 if byte_io:
2015 return open(self.fs_path, "rb", buffering=buffering)
2016 return open(self.fs_path, encoding="utf-8", buffering=buffering)
2018 def metadata(
2019 self,
2020 metadata_type: type[PMT],
2021 *,
2022 owning_plugin: str | None = None,
2023 ) -> PathMetadataReference[PMT]:
2024 current_plugin = self._current_plugin()
2025 if owning_plugin is None:
2026 owning_plugin = current_plugin
2027 return AlwaysEmptyReadOnlyMetadataReference(
2028 owning_plugin,
2029 current_plugin,
2030 metadata_type,
2031 )
2033 def all_paths(self) -> Iterable["OSFSControlPath"]:
2034 yield cast("OSFSControlPath", self)
2035 if not self.is_dir:
2036 return
2037 stack = list(self.iterdir())
2038 stack.reverse()
2039 while stack:
2040 current = cast("OSFSControlPath", stack.pop())
2041 yield current
2042 if current.is_dir:
2043 stack.extend(reversed(list(current.iterdir())))
2045 def _resolve_children(
2046 self,
2047 new_child: Callable[[str, str, FSP], FSC],
2048 ) -> Mapping[str, FSC]:
2049 if not self.is_dir:
2050 return {}
2051 dir_path = self.path
2052 dir_fs_path = self.fs_path
2053 children = {}
2054 for name in sorted(os.listdir(dir_fs_path), key=os.path.basename):
2055 child_path = os.path.join(dir_path, name) if dir_path != "." else name
2056 child_fs_path = (
2057 os.path.join(dir_fs_path, name) if dir_fs_path != "." else name
2058 )
2059 children[name] = new_child(
2060 child_path,
2061 child_fs_path,
2062 cast("FSP", self),
2063 )
2064 return children
class OSFSROOverlay(OSFSOverlayBase["OSFSROOverlay"]):
    """Read-only overlay of an OS directory tree.

    The `lstat` result (including a "not found" outcome), the symlink target and
    the child listing are cached on first use.  Every mutating operation raises
    `DebputyFSIsROError`.
    """

    __slots__ = (
        "_stat_cache",
        "_readlink_cache",
        "_children",
        "_stat_failed_cache",
    )

    def __init__(
        self,
        path: str,
        fs_path: str,
        parent: Optional["OSFSROOverlay"],
    ) -> None:
        super().__init__(path, fs_path, parent=parent)
        self._stat_cache: os.stat_result | None = None
        self._readlink_cache: str | None = None
        # Set once lstat raised FileNotFoundError, so the failure is replayed
        # without hitting the file system again.
        self._stat_failed_cache = False
        # None means "not listed yet"; an empty mapping is a valid cached result.
        self._children: Mapping[str, OSFSROOverlay] | None = None

    @classmethod
    def create_root_dir(cls, path: str, fs_path: str) -> "OSFSROOverlay":
        """Create a parent-less `OSFSROOverlay` for `fs_path` exposed as `path`."""
        return OSFSROOverlay(path, fs_path, None)

    def iterdir(self) -> Iterable["OSFSROOverlay"]:
        """Yield the (cached) children of this directory; nothing for non-directories."""
        if not self.is_dir:
            return
        if self._children is None:
            self._ensure_children_are_resolved()
        yield from assume_not_none(self._children).values()

    def lookup(self, path: str) -> Optional["OSFSROOverlay"]:
        """Resolve `path` relative to this directory (or the root if it is absolute).

        :param path: The path to look up; "." and ".." segments are handled.
        :return: The matching node, or None if this is not a directory or a
          segment does not exist.
        :raises ValueError: If ".." would step above the root directory.
        """
        if not self.is_dir:
            return None
        if self._children is None:
            self._ensure_children_are_resolved()

        absolute, _, path_parts = _split_path(path)
        current = cast("OSFSROOverlay", _root(self)) if absolute else self
        for dir_part in path_parts:
            if dir_part == ".":
                continue
            if dir_part == "..":
                if current.is_root_dir():
                    raise ValueError(f'The path "{path}" escapes the root dir')
                p = current.parent_dir
                assert p is not None  # Type hint
                current = cast("OSFSROOverlay", p)
                continue
            try:
                current = cast("OSFSROOverlay", current[dir_part])
            except KeyError:
                return None
        return current

    def _ensure_children_are_resolved(self) -> None:
        """Populate the child cache once (no-op for non-directories)."""
        # Compare against None rather than relying on truthiness: an empty
        # directory caches an empty mapping, which must still count as resolved.
        if not self.is_dir or self._children is not None:
            return
        self._children = self._resolve_children(
            lambda n, fsp, p: OSFSROOverlay(n, fsp, p)
        )

    @property
    def is_detached(self) -> bool:
        return False

    def __getitem__(self, key) -> "VirtualPath":
        """Return the child named `key` (or named like the given in-memory path).

        :raises KeyError: If this is not a directory or has no such child.
        """
        if not self.is_dir:
            raise KeyError(key)
        if self._children is None:
            self._ensure_children_are_resolved()
        if isinstance(key, InMemoryVirtualPathBase):
            key = key.name
        return assume_not_none(self._children)[key]

    def __delitem__(self, key) -> Never:
        self._error_ro_fs()

    @property
    def is_read_write(self) -> bool:
        return False

    def _rw_check(self) -> Never:
        self._error_ro_fs()

    def _error_ro_fs(self) -> Never:
        """Raise the error used for every attempted write on this overlay."""
        raise DebputyFSIsROError(
            f'Attempt to write to "{self.path}" failed:'
            " Debputy Virtual File system is R/O."
        )

    def stat(self) -> os.stat_result:
        """Return the cached `lstat` result, performing the call on first use.

        :raises FileNotFoundError: If the backing path does not exist.  The
          failure is cached and replayed on later calls.
        """
        if self._stat_failed_cache:
            raise FileNotFoundError(
                errno.ENOENT, os.strerror(errno.ENOENT), self.fs_path
            )

        if self._stat_cache is None:
            try:
                self._stat_cache = os.lstat(self.fs_path)
            except FileNotFoundError:
                self._stat_failed_cache = True
                raise
        return self._stat_cache

    @property
    def mode(self) -> int:
        """The permission bits of the path."""
        return stat.S_IMODE(self.stat().st_mode)

    @mode.setter
    def mode(self, _unused: int) -> Never:
        self._error_ro_fs()

    @property
    def mtime(self) -> float:
        return self.stat().st_mtime

    @mtime.setter
    def mtime(self, new_mtime: float) -> Never:
        self._error_ro_fs()

    def readlink(self) -> str:
        """Return the (cached) symlink target.

        :raises TypeError: If the path is not a symlink.
        """
        if not self.is_symlink:
            raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
        if self._readlink_cache is None:
            self._readlink_cache = os.readlink(self.fs_path)
        return self._readlink_cache

    def chown(
        self,
        owner: StaticFileSystemOwner | None,
        group: StaticFileSystemGroup | None,
    ) -> Never:
        self._error_ro_fs()

    def mkdir(self, name: str) -> Never:
        self._error_ro_fs()

    def add_file(
        self,
        name: str,
        *,
        unlink_if_exists: bool = True,
        use_fs_path_mode: bool = False,
        mode: int = 0o0644,
        mtime: float | None = None,
    ) -> Never:
        self._error_ro_fs()

    def add_symlink(self, link_name: str, link_target: str) -> Never:
        self._error_ro_fs()

    def unlink(self, *, recursive: bool = False) -> Never:
        self._error_ro_fs()
class OSFSROOverlayRootDir(OSFSROOverlay):
    """Parent-less read-only overlay that also tracks the current plugin context.

    The context starts out as "debputy".
    """

    __slots__ = ("_plugin_context",)

    def __init__(self, path: str, fs_path: str) -> None:
        super().__init__(path, fs_path, parent=None)
        self._plugin_context = CurrentPluginContextManager("debputy")

    def _current_plugin(self) -> str:
        """Name of the plugin that is active in the tracked context."""
        context = self._plugin_context
        return context.current_plugin_name

    @contextlib.contextmanager
    def change_plugin_context(self, new_plugin: str) -> Iterator[str]:
        """Switch the tracked plugin context to `new_plugin` for the `with` body.

        Delegates to the underlying context manager and yields whatever it yields.
        """
        switching = self._plugin_context.change_plugin_context(new_plugin)
        with switching as entered:
            yield entered
class OSFSControlPath(OSFSOverlayBase["OSFSControlPath"]):
    """Read-write path inside the directory backing a control.tar.

    Nothing is cached: children are re-listed from the file system on every
    access.  Subdirectories, symlinks and ownership changes are rejected.
    """

    def iterdir(self) -> Iterable["OSFSControlPath"]:
        """Yield a freshly created node per directory entry (nothing for non-directories)."""
        if not self.is_dir:
            return
        yield from self._resolve_children(
            lambda n, fsp, p: OSFSControlPath(n, fsp, p)
        ).values()

    def lookup(self, path: str) -> Optional["OSFSControlPath"]:
        """Resolve `path` relative to this directory (or the root if it is absolute).

        :param path: The path to look up; "." and ".." segments are handled.
        :return: The matching node, or None if this is not a directory or a
          segment does not exist.
        :raises ValueError: If ".." would step above the root directory.
        """
        if not self.is_dir:
            return None

        absolute, _, path_parts = _split_path(path)
        current = cast("OSFSControlPath", _root(self)) if absolute else self
        for dir_part in path_parts:
            if dir_part == ".":
                continue
            if dir_part == "..":
                if current.is_root_dir():
                    raise ValueError(f'The path "{path}" escapes the root dir')
                p = current.parent_dir
                assert p is not None  # type hint
                current = cast("OSFSControlPath", p)
                continue
            try:
                current = cast("OSFSControlPath", current[dir_part])
            except KeyError:
                return None
        return current

    @property
    def is_detached(self) -> bool:
        """True if the backing path no longer exists on the file system."""
        try:
            self.stat()
        except FileNotFoundError:
            return True
        else:
            return False

    def __getitem__(self, key) -> "VirtualPath":
        """Return the child named `key` (or named like the given in-memory path).

        :raises KeyError: If this is not a directory or has no such child.
        """
        if not self.is_dir:
            raise KeyError(key)
        children = self._resolve_children(lambda n, fsp, p: OSFSControlPath(n, fsp, p))
        if isinstance(key, InMemoryVirtualPathBase):
            key = key.name
        return children[key]

    def __delitem__(self, key) -> None:
        self[key].unlink()

    @property
    def is_read_write(self) -> bool:
        return True

    @property
    def mode(self) -> int:
        """The permission bits of the backing path."""
        return stat.S_IMODE(self.stat().st_mode)

    @mode.setter
    def mode(self, new_mode: int) -> None:
        os.chmod(self.fs_path, new_mode)

    @property
    def mtime(self) -> float:
        return self.stat().st_mtime

    @mtime.setter
    def mtime(self, new_mtime: float) -> None:
        # Sets both the access and the modification time to `new_mtime`.
        os.utime(self.fs_path, (new_mtime, new_mtime))

    def readlink(self) -> Never:
        """Never returns a target.

        :raises TypeError: If the path is not a symlink.
        :raises AssertionError: If the path is a symlink.
        """
        if not self.is_symlink:
            raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
        # Raised explicitly (rather than `assert False`) so this method still
        # never returns when assertions are disabled with `python -O`.
        raise AssertionError("The control.tar never contains symlinks.")

    def chown(
        self,
        owner: StaticFileSystemOwner | None,
        group: StaticFileSystemGroup | None,
    ) -> Never:
        raise ValueError(
            "No need to chown paths in the control.tar: They are always root:root"
        )

    def mkdir(self, name: str) -> Never:
        raise TypeError("The control.tar never contains subdirectories.")

    @contextlib.contextmanager
    def add_file(
        self,
        name: str,
        *,
        unlink_if_exists: bool = True,
        use_fs_path_mode: bool = False,
        mode: int = 0o0644,
        mtime: float | None = None,
    ) -> Iterator["VirtualPath"]:
        """Create (or truncate) the file `name` in this directory and yield its node.

        The file exists and is empty with mode `mode` when the node is yielded.
        After the `with` body the path is verified to be a file and `mode` is
        applied again.

        :param name: Basename of the new file.
        :param unlink_if_exists: Whether an existing file of that name may be
          replaced.
        :param use_fs_path_mode: Accepted but not used by this implementation.
        :param mode: The mode of the resulting file.
        :param mtime: Accepted but not used by this implementation.
        :raises ValueError: If `name` is not a plain basename, or the file exists
          and `unlink_if_exists` is False.
        :raises TypeError: If this path is not a directory.
        """
        if "/" in name or name in {".", ".."}:
            raise ValueError(f'Invalid file name: "{name}"')
        if not self.is_dir:
            raise TypeError(
                f"Cannot create {self._orphan_safe_path()}/{name}:"
                f" {self._orphan_safe_path()} is not a directory"
            )
        self._rw_check()
        existing = self.get(name)
        if existing is not None:
            if not unlink_if_exists:
                # Name the parameter this method actually has.
                raise ValueError(
                    f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
                    f" and unlink_if_exists was False"
                )
            assert existing.is_file

        fs_path = os.path.join(self.fs_path, name)
        # This truncates the existing file if any, so we do not have to unlink the previous entry.
        with open(fs_path, "wb") as fd:
            # Ensure that the fs_path exists and default mode is reasonable
            os.chmod(fd.fileno(), mode)
        child = OSFSControlPath(
            name,
            fs_path,
            self,
        )
        yield child
        _check_fs_path_is_file(fs_path, unlink_on_error=child)
        child.mode = mode

    @contextlib.contextmanager
    def replace_fs_path_content(
        self,
        *,
        use_fs_path_mode: bool = False,
    ) -> Iterator[str]:
        """Yield the backing path so the caller can rewrite the file's content.

        After the `with` body the path is verified to still be a file.

        :param use_fs_path_mode: If True, the mode recorded before yielding is
          applied again afterwards.
        :raises TypeError: If this path is not a file.
        """
        if not self.is_file:
            raise TypeError(
                f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file'
            )
        # NOTE(review): the previous mode is only re-applied when
        # `use_fs_path_mode` is True, which reads as inverted relative to the
        # parameter name - confirm the intended semantics with callers.
        restore_mode = self.mode if use_fs_path_mode else None
        yield self.fs_path
        _check_fs_path_is_file(self.fs_path, self)
        if restore_mode is not None:
            self.mode = restore_mode

    def add_symlink(self, link_name: str, link_target: str) -> Never:
        raise TypeError("The control.tar never contains symlinks.")

    def unlink(self, *, recursive: bool = False) -> None:
        """Remove the backing file; a no-op on a path without a parent (the root)."""
        if self._parent is None:
            return
        # By virtue of the control FS only containing paths, we can assume `recursive` never
        # matters and that `os.unlink` will be sufficient.
        assert self.is_file
        os.unlink(self.fs_path)
class FSControlRootDir(OSFSControlPath):
    """Root directory of the control file system."""

    @classmethod
    def create_root_dir(cls, fs_path: str) -> "FSControlRootDir":
        """Create the root node (virtual path ".") backed by `fs_path`."""
        return FSControlRootDir(".", fs_path, None)

    def insert_file_from_fs_path(
        self,
        name: str,
        fs_path: str,
        *,
        exist_ok: bool = True,
        use_fs_path_mode: bool = False,
        mode: int = 0o0644,
        # Ignored, but accepted for compat with FSPath's variant of this function.
        #  - This is used by install_or_generate_conffiles.
        reference_path: VirtualPath | None = None,  # noqa
    ) -> "OSFSControlPath":
        """Copy the file at `fs_path` into this directory as `name`.

        :param name: Basename of the file to create.
        :param fs_path: Source file on the file system (symlinks are followed).
        :param exist_ok: Whether an existing entry called `name` may be replaced.
        :param use_fs_path_mode: If True, the copy gets the permission bits of
          `fs_path`; otherwise it gets `mode`.
        :param mode: Mode for the copy when `use_fs_path_mode` is False.
        :param reference_path: Ignored.
        :return: The node for the inserted file.
        :raises ValueError: If `name` is not a plain basename, or the entry
          exists and `exist_ok` is False.
        :raises TypeError: If this path is not a directory.
        """
        if "/" in name or name in {".", ".."}:
            raise ValueError(f'Invalid file name: "{name}"')
        if not self.is_dir:
            raise TypeError(
                f"Cannot create {self._orphan_safe_path()}/{name}:"
                f" {self._orphan_safe_path()} is not a directory"
            )
        self._rw_check()
        if name in self and not exist_ok:
            raise ValueError(
                f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
                f" and exist_ok was False"
            )

        target_path = os.path.join(self.fs_path, name)
        # The content must be copied in both cases: `shutil.copymode` on its own
        # only transfers permission bits (and needs the target to exist already).
        shutil.copyfile(
            fs_path,
            target_path,
            follow_symlinks=True,
        )
        if use_fs_path_mode:
            shutil.copymode(
                fs_path,
                target_path,
                follow_symlinks=True,
            )
        else:
            os.chmod(target_path, mode)
        return cast("OSFSControlPath", self[name])
def as_path_def(pd: str | PathDef) -> PathDef:
    """Wrap a plain string in a `PathDef`; anything else is returned unchanged."""
    if isinstance(pd, str):
        return PathDef(pd)
    return pd
def as_path_defs(paths: Iterable[str | PathDef]) -> Iterable[PathDef]:
    """Lazily convert each entry of `paths` via `as_path_def`, preserving order."""
    for entry in paths:
        yield as_path_def(entry)
def build_virtual_fs(
    paths: Iterable[str | PathDef],
    read_write_fs: bool = False,
) -> "InMemoryVirtualPathBase":
    """Build an in-memory virtual file system from path definitions.

    Paths are taken relative to the root ("./" is prefixed when missing).  A
    trailing "/" marks a directory.  Missing parent directories are created
    implicitly.  If the first definition is the root itself (".", "./" or "/"),
    its `fs_path` is used for the root directory.

    :param paths: The path definitions; plain strings are converted via
      `as_path_defs`.
    :param read_write_fs: Value assigned to the root's `is_read_write` once the
      tree has been built.
    :return: The root directory of the new file system.
    :raises ValueError: If a path is defined twice or a parent of a path was
      defined as a non-directory.
    """
    # Every spelling that denotes the root directory itself.
    root_aliases = (".", "./", "/")
    root_dir: InMemoryVirtualRootDir | None = None
    directories: dict[str, InMemoryVirtualPathBase] = {}
    non_directories = set()

    def _ensure_parent_dirs(p: str) -> None:
        # Walk upwards until a known directory is found, then create the
        # missing ones top-down so each has its parent available.
        current = p.rstrip("/")
        missing_dirs = []
        while True:
            current = os.path.dirname(current)
            if current in directories:
                break
            if current in non_directories:
                raise ValueError(
                    f'Conflicting definition for "{current}". The path "{p}" wants it as a directory,'
                    ' but it is defined as a non-directory. (Ensure dirs end with "/")'
                )
            missing_dirs.append(current)
        for dir_path in reversed(missing_dirs):
            parent_dir = directories[os.path.dirname(dir_path)]
            d = VirtualTestPath(os.path.basename(dir_path), parent_dir, is_dir=True)
            directories[dir_path] = d

    for path_def in as_path_defs(paths):
        path = path_def.path_name
        # NOTE(review): this check uses the path as given (before the "./"
        # prefix is added and a trailing "/" is stripped), whereas the keys
        # recorded below come from the created paths - confirm it can match.
        if path in directories or path in non_directories:
            raise ValueError(
                f'Duplicate definition of "{path}". Can be false positive if input is not in'
                ' "correct order" (ensure directories occur before their children)'
            )
        if root_dir is None:
            root_fs_path = None
            if path in root_aliases:
                root_fs_path = path_def.fs_path
            root_dir = InMemoryVirtualRootDir(fs_path=root_fs_path)
            directories["."] = root_dir

        if path in root_aliases:
            # The root already exists at this point.  "/" is included here so it
            # is skipped like "." and "./" instead of falling through to a
            # lookup of the parent directory "" (which raised KeyError).
            assert "." in directories
            continue
        if not path.startswith("./"):
            path = "./" + path
        _ensure_parent_dirs(path)
        is_dir = False
        if path.endswith("/"):
            path = path[:-1]
            is_dir = True
        directory = directories[os.path.dirname(path)]
        assert not is_dir or not bool(
            path_def.link_target
        ), f"is_dir={is_dir} vs. link_target={path_def.link_target}"
        fs_path = VirtualTestPath(
            os.path.basename(path),
            directory,
            is_dir=is_dir,
            mode=path_def.mode,
            mtime=path_def.mtime,
            has_fs_path=path_def.has_fs_path,
            fs_path=path_def.fs_path,
            link_target=path_def.link_target,
            content=path_def.content,
            materialized_content=path_def.materialized_content,
        )
        assert not fs_path.is_detached
        if fs_path.is_dir:
            directories[fs_path.path] = fs_path
        else:
            non_directories.add(fs_path.path)

    if root_dir is None:
        # No definitions at all: still hand back an (empty) root.
        root_dir = InMemoryVirtualRootDir()

    root_dir.is_read_write = read_write_fs
    return root_dir