Coverage for src/debputy/filesystem_scan.py: 67%
1302 statements
« prev ^ index » next coverage.py v7.8.2, created at 2026-02-14 10:41 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2026-02-14 10:41 +0000
1import atexit
2import contextlib
3import dataclasses
4import errno
5import io
6import operator
7import os
8import shutil
9import stat
10import subprocess
11import tempfile
12import time
13import typing
14from abc import ABC
15from contextlib import suppress
16from typing import (
17 Optional,
18 cast,
19 Any,
20 ContextManager,
21 TextIO,
22 BinaryIO,
23 Generic,
24 TypeVar,
25 overload,
26 Literal,
27 Never,
28)
29from collections.abc import Iterable, Iterator, Mapping, Callable
30from weakref import ref, ReferenceType
32from debputy.exceptions import (
33 PureVirtualPathError,
34 DebputyFSIsROError,
35 DebputyMetadataAccessError,
36 TestPathWithNonExistentFSPathError,
37 SymlinkLoopError,
38)
39from debputy.intermediate_manifest import PathType
40from debputy.manifest_parser.base_types import (
41 ROOT_DEFINITION,
42 StaticFileSystemOwner,
43 StaticFileSystemGroup,
44)
45from debputy.plugin.api.spec import (
46 VirtualPath,
47 PathDef,
48 PathMetadataReference,
49 PMT,
50)
51from debputy.types import VP
52from debputy.util import (
53 generated_content_dir,
54 _error,
55 escape_shell,
56 assume_not_none,
57 _normalize_path,
58 _debug_log,
59)
61BY_BASENAME = operator.attrgetter("name")
63FSP = TypeVar("FSP", bound="FSOverlayBase", covariant=True)
64FSC = TypeVar("FSC", bound="FSOverlayBase", covariant=True)
67BinaryOpenMode = Literal[
68 "rb",
69 "r+b",
70 "wb",
71 "w+b",
72 "xb",
73 "ab",
74]
75TextOpenMode = Literal[
76 "r",
77 "r+",
78 "rt",
79 "r+t",
80 "w",
81 "w+",
82 "wt",
83 "w+t",
84 "x",
85 "xt",
86 "a",
87 "at",
88]
89OpenMode = Literal[BinaryOpenMode, TextOpenMode]
92class AlwaysEmptyReadOnlyMetadataReference(PathMetadataReference[PMT]):
93 __slots__ = ("_metadata_type", "_owning_plugin", "_current_plugin")
95 def __init__(
96 self,
97 owning_plugin: str,
98 current_plugin: str,
99 metadata_type: type[PMT],
100 ) -> None:
101 self._owning_plugin = owning_plugin
102 self._current_plugin = current_plugin
103 self._metadata_type = metadata_type
105 @property
106 def is_present(self) -> bool:
107 return False
109 @property
110 def can_read(self) -> bool:
111 return self._owning_plugin == self._current_plugin
113 @property
114 def can_write(self) -> bool:
115 return False
117 @property
118 def value(self) -> PMT | None:
119 if self.can_read: 119 ↛ 121line 119 didn't jump to line 121 because the condition on line 119 was always true
120 return None
121 raise DebputyMetadataAccessError(
122 f"Cannot read the metadata {self._metadata_type.__name__} owned by"
123 f" {self._owning_plugin} as the metadata has not been made"
124 f" readable to the plugin {self._current_plugin}."
125 )
127 @value.setter
128 def value(self, new_value: PMT) -> None:
129 if self._is_owner:
130 raise DebputyFSIsROError(
131 f"Cannot set the metadata {self._metadata_type.__name__} as the path is read-only"
132 )
133 raise DebputyMetadataAccessError(
134 f"Cannot set the metadata {self._metadata_type.__name__} owned by"
135 f" {self._owning_plugin} as the metadata has not been made"
136 f" read-write to the plugin {self._current_plugin}."
137 )
139 @property
140 def _is_owner(self) -> bool:
141 return self._owning_plugin == self._current_plugin
144@dataclasses.dataclass(slots=True)
145class PathMetadataValue(Generic[PMT]):
146 owning_plugin: str
147 metadata_type: type[PMT]
148 value: PMT | None = None
150 def can_read_value(self, current_plugin: str) -> bool:
151 return self.owning_plugin == current_plugin
153 def can_write_value(self, current_plugin: str) -> bool:
154 return self.owning_plugin == current_plugin
157class PathMetadataReferenceImplementation(PathMetadataReference[PMT]):
158 __slots__ = ("_owning_path", "_current_plugin", "_path_metadata_value")
160 def __init__(
161 self,
162 owning_path: "VirtualPathBase",
163 current_plugin: str,
164 path_metadata_value: PathMetadataValue[PMT],
165 ) -> None:
166 self._owning_path = owning_path
167 self._current_plugin = current_plugin
168 self._path_metadata_value = path_metadata_value
170 @property
171 def is_present(self) -> bool:
172 if not self.can_read: 172 ↛ 173line 172 didn't jump to line 173 because the condition on line 172 was never true
173 return False
174 return self._path_metadata_value.value is not None
176 @property
177 def can_read(self) -> bool:
178 return self._path_metadata_value.can_read_value(self._current_plugin)
180 @property
181 def can_write(self) -> bool:
182 if not self._path_metadata_value.can_write_value(self._current_plugin): 182 ↛ 183line 182 didn't jump to line 183 because the condition on line 182 was never true
183 return False
184 owning_path = self._owning_path
185 return owning_path.is_read_write and not owning_path.is_detached
187 @property
188 def value(self) -> PMT | None:
189 if self.can_read: 189 ↛ 191line 189 didn't jump to line 191 because the condition on line 189 was always true
190 return self._path_metadata_value.value
191 raise DebputyMetadataAccessError(
192 f"Cannot read the metadata {self._metadata_type_name} owned by"
193 f" {self._owning_plugin} as the metadata has not been made"
194 f" readable to the plugin {self._current_plugin}."
195 )
197 @value.setter
198 def value(self, new_value: PMT) -> None:
199 if not self.can_write: 199 ↛ 200line 199 didn't jump to line 200 because the condition on line 199 was never true
200 m = "set" if new_value is not None else "delete"
201 raise DebputyMetadataAccessError(
202 f"Cannot {m} the metadata {self._metadata_type_name} owned by"
203 f" {self._owning_plugin} as the metadata has not been made"
204 f" read-write to the plugin {self._current_plugin}."
205 )
206 owning_path = self._owning_path
207 if not owning_path.is_read_write: 207 ↛ 208line 207 didn't jump to line 208 because the condition on line 207 was never true
208 raise DebputyFSIsROError(
209 f"Cannot set the metadata {self._metadata_type_name} as the path is read-only"
210 )
211 if owning_path.is_detached: 211 ↛ 212line 211 didn't jump to line 212 because the condition on line 211 was never true
212 raise TypeError(
213 f"Cannot set the metadata {self._metadata_type_name} as the path is detached"
214 )
215 self._path_metadata_value.value = new_value
217 @property
218 def _is_owner(self) -> bool:
219 return self._owning_plugin == self._current_plugin
221 @property
222 def _owning_plugin(self) -> str:
223 return self._path_metadata_value.owning_plugin
225 @property
226 def _metadata_type_name(self) -> str:
227 return self._path_metadata_value.metadata_type.__name__
230def _cp_a(source: str, dest: str) -> None:
231 cmd = ["cp", "-a", source, dest]
232 try:
233 subprocess.check_call(cmd)
234 except subprocess.CalledProcessError:
235 full_command = escape_shell(*cmd)
236 _error(
237 f"The attempt to make an internal copy of {escape_shell(source)} failed. Please review the output of cp"
238 f" above to understand what went wrong. The full command was: {full_command}"
239 )
242def _split_path(path: str) -> tuple[bool, bool, list[str]]:
243 must_be_dir = True if path.endswith("/") else False
244 absolute = False
245 if path.startswith("/"):
246 absolute = True
247 path = "." + path
248 path_parts = path.rstrip("/").split("/")
249 if must_be_dir:
250 path_parts.append(".")
251 return absolute, must_be_dir, path_parts
254def _root(path: VP) -> "VirtualPathBase":
255 current = path
256 while not current.is_root_dir():
257 parent = current.parent_dir
258 assert parent is not None # type hint
259 current = parent
260 assert isinstance(current, VirtualPathBase)
261 return current
264def _check_fs_path_is_file(
265 fs_path: str,
266 unlink_on_error: Optional["VirtualPath"] = None,
267) -> None:
268 had_issue = False
269 try:
270 # FIXME: Check mode, and use the Virtual Path to cache the result as a side-effect
271 st = os.lstat(fs_path)
272 except FileNotFoundError:
273 had_issue = True
274 else:
275 if not stat.S_ISREG(st.st_mode) or st.st_nlink > 1: 275 ↛ 276line 275 didn't jump to line 276 because the condition on line 275 was never true
276 had_issue = True
277 if not had_issue: 277 ↛ 280line 277 didn't jump to line 280 because the condition on line 277 was always true
278 return
280 if unlink_on_error:
281 with suppress(FileNotFoundError):
282 os.unlink(fs_path)
283 raise TypeError(
284 "The provided FS backing file was deleted, replaced with a non-file entry or it was hard"
285 " linked to another file. The entry has been disconnected."
286 )
289class CurrentPluginContextManager:
290 __slots__ = ("_plugin_names",)
292 def __init__(self, initial_plugin_name: str) -> None:
293 self._plugin_names = [initial_plugin_name]
295 @property
296 def current_plugin_name(self) -> str:
297 return self._plugin_names[-1]
299 @contextlib.contextmanager
300 def change_plugin_context(self, new_plugin_name: str) -> Iterator[str]:
301 self._plugin_names.append(new_plugin_name)
302 yield new_plugin_name
303 self._plugin_names.pop()
306class VirtualPathBase(VirtualPath, ABC):
307 __slots__ = ()
309 def stat(self) -> os.stat_result:
310 # TODO: Remove
311 """Attempt to do stat of the underlying path (if it exists)
313 *Avoid* using `stat()` whenever possible where a more specialized attribute exist. The
314 `stat()` call returns the data from the file system and often, `debputy` does *not* track
315 its state in the file system. As an example, if you want to know the file system mode of
316 a path, please use the `mode` attribute instead.
318 This never follow symlinks (it behaves like `os.lstat`). It will raise an error
319 if the path is not backed by a file system object (that is, `has_fs_path` is False).
321 :return: The stat result or an error.
322 """
323 raise NotImplementedError()
325 @property
326 def size(self) -> int:
327 """Resolve the file size (`st_size`)
329 This may be using `stat()` and therefore `fs_path`.
331 :return: The size of the file in bytes
332 """
333 return self.stat().st_size
335 def _orphan_safe_path(self) -> str:
336 return self.path
338 def _rw_check(self) -> None:
339 if not self.is_read_write:
340 raise DebputyFSIsROError(
341 f'Attempt to write to "{self._orphan_safe_path()}" failed:'
342 " Debputy Virtual File system is R/O."
343 )
345 @property
346 def is_detached(self) -> bool:
347 raise NotImplementedError
349 def is_root_dir(self) -> bool:
350 # The root directory is never detachable in the current setup
351 return not self.is_detached and self.parent_dir is None
353 def lookup(self, path: str) -> Optional["VirtualPathBase"]:
354 match, missing = self.attempt_lookup(path)
355 if missing:
356 return None
357 return match
359 def attempt_lookup(self, path: str) -> tuple["VirtualPathBase", list[str]]:
360 if self.is_detached: 360 ↛ 361line 360 didn't jump to line 361 because the condition on line 360 was never true
361 raise ValueError(
362 f'Cannot perform lookup via "{self._orphan_safe_path()}": The path is detached'
363 )
364 absolute, must_be_dir, path_parts = _split_path(path)
365 current = _root(self) if absolute else self
366 path_parts.reverse()
367 link_expansions = set()
368 while path_parts:
369 dir_part = path_parts.pop()
370 if dir_part == ".":
371 continue
372 if dir_part == "..":
373 if current.is_root_dir(): 373 ↛ 374line 373 didn't jump to line 374 because the condition on line 373 was never true
374 raise ValueError(f'The path "{path}" escapes the root dir')
375 p = current.parent_dir
376 assert p is not None # type hint
377 current = cast("VirtualPathBase", p)
378 continue
379 try:
380 current = cast("VirtualPathBase", current[dir_part])
381 except KeyError:
382 path_parts.append(dir_part)
383 path_parts.reverse()
384 if must_be_dir:
385 path_parts.pop()
386 return current, path_parts
387 if current.is_symlink and path_parts:
388 if current.path in link_expansions:
389 # This is our loop detection for now. It might have some false positives where you
390 # could safely resolve the same symlink twice. However, given that this use-case is
391 # basically non-existent in practice for packaging, we just stop here for now.
392 raise SymlinkLoopError(
393 f'The path "{path}" traversed the symlink "{current.path}" multiple'
394 " times. Currently, traversing the same symlink twice is considered"
395 " a loop by `debputy` even if the path would eventually resolve."
396 " Consider filing a feature request if you have a benign case that"
397 " triggers this error."
398 )
399 link_expansions.add(current.path)
400 link_target = current.readlink()
401 link_absolute, _, link_path_parts = _split_path(link_target)
402 if link_absolute:
403 current = _root(current)
404 else:
405 current = cast(
406 "VirtualPathBase", assume_not_none(current.parent_dir)
407 )
408 link_path_parts.reverse()
409 path_parts.extend(link_path_parts)
410 return current, []
412 def mkdirs(self, path: str) -> "VirtualPath":
413 current: VirtualPath
414 current, missing_parts = self.attempt_lookup(
415 f"{path}/" if not path.endswith("/") else path
416 )
417 if not current.is_dir: 417 ↛ 418line 417 didn't jump to line 418 because the condition on line 417 was never true
418 raise ValueError(
419 f'mkdirs of "{path}" failed: This would require {current.path} to not exist OR be'
420 " a directory. However, that path exist AND is a not directory."
421 )
422 for missing_part in missing_parts:
423 assert missing_part not in (".", "..")
424 current = current.mkdir(missing_part)
425 return current
427 def prune_if_empty_dir(self) -> None:
428 """Remove this and all (now) empty parent directories
430 Same as: `rmdir --ignore-fail-on-non-empty --parents`
432 This operation may cause the path (and any of its parent directories) to become "detached"
433 and therefore unsafe to use in further operations.
434 """
435 self._rw_check()
437 if not self.is_dir: 437 ↛ 438line 437 didn't jump to line 438 because the condition on line 437 was never true
438 raise TypeError(f"{self._orphan_safe_path()} is not a directory")
439 if any(self.iterdir()):
440 return
441 parent_dir = typing.cast("VirtualPathBase", assume_not_none(self.parent_dir))
443 # Recursive does not matter; we already know the directory is empty.
444 self.unlink()
446 # Note: The root dir must never be deleted. This works because when delegating it to the root
447 # directory, its implementation of this method is a no-op. If this is later rewritten to an
448 # inline loop (rather than recursion), be sure to preserve this feature.
449 parent_dir.prune_if_empty_dir()
451 def _current_plugin(self) -> str:
452 if self.is_detached: 452 ↛ 453line 452 didn't jump to line 453 because the condition on line 452 was never true
453 raise TypeError("Cannot resolve the current plugin; path is detached")
454 return cast("FSRootDir", _root(self))._current_plugin()
456 @overload
457 def open_child( 457 ↛ exitline 457 didn't return from function 'open_child' because
458 self,
459 name: str,
460 mode: TextOpenMode = "r",
461 buffering: int = -1,
462 ) -> TextIO: ...
464 @overload
465 def open_child( 465 ↛ exitline 465 didn't return from function 'open_child' because
466 self,
467 name: str,
468 mode: BinaryOpenMode,
469 buffering: int = -1,
470 ) -> BinaryIO: ...
472 @contextlib.contextmanager
473 def open_child(self, name, mode="r", buffering=-1):
474 """Open a child path of the current directory in a given mode. Usually used with a context manager
476 The path is opened according to the `mode` parameter similar to the built-in `open` in Python.
477 The following symbols are accepted with the same meaning as Python's open:
478 * `r`
479 * `w`
480 * `x`
481 * `a`
482 * `+`
483 * `b`
484 * `t`
486 Like Python's `open`, this can create a new file provided the file system is in read-write mode.
487 Though unlike Python's open, symlinks are not followed and cannot be opened. Any newly created
488 file will start with (os.stat) mode of 0o0644. The (os.stat) mode of existing paths are left
489 as-is.
491 :param name: The name of the child path to open. Must be a basename.
492 :param mode: The mode to open the file with such as `r` or `w`. See Python's `open` for more
493 examples.
494 :param buffering: Same as open(..., buffering=...) where supported. Notably during
495 testing, the content may be purely in memory and use a BytesIO/StringIO
496 (which does not accept that parameter, but then it is buffered in a different way)
497 :return: The file handle.
498 """
499 existing = self.get(name)
500 if "r" in mode: 500 ↛ 501line 500 didn't jump to line 501 because the condition on line 500 was never true
501 if existing is None:
502 raise ValueError(
503 f"Path {self.path}/{name} does not exist and mode had `r`"
504 )
505 if "+" not in mode:
506 # open(byte_io="b" in mode) matches no typed signature overload.
507 if "b" in mode:
508 with existing.open(byte_io=True, buffering=buffering) as fd:
509 yield fd
510 else:
511 with existing.open(byte_io=False, buffering=buffering) as fd:
512 yield fd
514 encoding = None if "b" in mode else "utf-8"
516 if existing: 516 ↛ 517line 516 didn't jump to line 517 because the condition on line 516 was never true
517 if "x" in mode:
518 raise ValueError(
519 f"Path {existing.path} already exists and mode had `x`"
520 )
521 with (
522 existing.replace_fs_path_content() as fs_path,
523 open(fs_path, mode, encoding=encoding) as fd,
524 ):
525 yield fd
526 else:
527 assert "r" not in mode
528 # unlink_if_exists=False as a precaution (the "already exists" should not end up here).
529 with (
530 self.add_file(name, mode=0o0644, unlink_if_exists=False) as new_path,
531 open(new_path.fs_path, mode, encoding=encoding) as fd,
532 ):
533 yield fd
536class FSPath(VirtualPathBase, ABC):
537 __slots__ = (
538 "_basename",
539 "_parent_dir",
540 "_children",
541 "_path_cache",
542 "_parent_path_cache",
543 "_last_known_parent_path",
544 "_mode",
545 "_owner",
546 "_group",
547 "_mtime",
548 "_stat_cache",
549 "_metadata",
550 "__weakref__",
551 )
553 def __init__(
554 self,
555 basename: str,
556 parent: Optional["FSPath"],
557 children: dict[str, "FSPath"] | None = None,
558 initial_mode: int | None = None,
559 mtime: float | None = None,
560 stat_cache: os.stat_result | None = None,
561 ) -> None:
562 self._basename = basename
563 self._path_cache: str | None = None
564 self._parent_path_cache: str | None = None
565 self._children = children
566 self._last_known_parent_path: str | None = None
567 self._mode = initial_mode
568 self._mtime = mtime
569 self._stat_cache = stat_cache
570 self._metadata: dict[tuple[str, type[Any]], PathMetadataValue[Any]] = {}
571 self._owner = ROOT_DEFINITION
572 self._group = ROOT_DEFINITION
574 # The self._parent_dir = None is to create `_parent_dir` because the parent_dir setter calls
575 # is_orphaned, which assumes self._parent_dir is an attribute.
576 self._parent_dir: ReferenceType["FSPath"] | None = None
577 if parent is not None:
578 self.parent_dir = parent
580 def __repr__(self) -> str:
581 return (
582 f"{self.__class__.__name__}({self._orphan_safe_path()!r},"
583 f" is_file={self.is_file},"
584 f" is_dir={self.is_dir},"
585 f" is_symlink={self.is_symlink},"
586 f" has_fs_path={self.has_fs_path},"
587 f" children_len={len(self._children) if self._children else 0})"
588 )
590 @property
591 def name(self) -> str:
592 return self._basename
594 @name.setter
595 def name(self, new_name: str) -> None:
596 self._rw_check()
597 if new_name == self._basename: 597 ↛ 598line 597 didn't jump to line 598 because the condition on line 597 was never true
598 return
599 if self.is_detached: 599 ↛ 600line 599 didn't jump to line 600 because the condition on line 599 was never true
600 self._basename = new_name
601 return
602 self._rw_check()
603 parent = self.parent_dir
604 # This little parent_dir dance ensures the parent dir detects the rename properly
605 self.parent_dir = None
606 self._basename = new_name
607 self.parent_dir = parent
609 def iterdir(self) -> Iterable["FSPath"]:
610 if self._children is not None:
611 yield from self._children.values()
613 def all_paths(self) -> Iterable["FSPath"]:
614 yield self
615 if not self.is_dir:
616 return
617 by_basename = BY_BASENAME
618 stack = sorted(self.iterdir(), key=by_basename, reverse=True)
619 while stack:
620 current = stack.pop()
621 yield current
622 if current.is_dir and not current.is_detached:
623 stack.extend(sorted(current.iterdir(), key=by_basename, reverse=True))
625 def walk(self) -> Iterable[tuple["FSPath", list["FSPath"]]]:
626 # FIXME: can this be more "os.walk"-like without making it harder to implement?
627 if not self.is_dir: 627 ↛ 628line 627 didn't jump to line 628 because the condition on line 627 was never true
628 yield self, []
629 return
630 by_basename = BY_BASENAME
631 stack = [self]
632 while stack:
633 current = stack.pop()
634 children = sorted(current.iterdir(), key=by_basename)
635 assert not children or current.is_dir
636 yield current, children
637 # Removing the directory counts as discarding the children.
638 if not current.is_detached: 638 ↛ 632line 638 didn't jump to line 632 because the condition on line 638 was always true
639 stack.extend(reversed(children))
641 def _orphan_safe_path(self) -> str:
642 if not self.is_detached or self._last_known_parent_path is not None: 642 ↛ 644line 642 didn't jump to line 644 because the condition on line 642 was always true
643 return self.path
644 return f"<orphaned>/{self.name}"
646 @property
647 def is_detached(self) -> bool:
648 parent = self._parent_dir
649 if parent is None:
650 return True
651 resolved_parent = parent()
652 if resolved_parent is None: 652 ↛ 653line 652 didn't jump to line 653 because the condition on line 652 was never true
653 return True
654 return resolved_parent.is_detached
656 # The __getitem__ behaves like __getitem__ from Dict but __iter__ would ideally work like a Sequence.
657 # However, that does not feel compatible, so lets force people to use .children instead for the Sequence
658 # behavior to avoid surprises for now.
659 # (Maybe it is a non-issue, but it is easier to add the API later than to remove it once we have committed
660 # to using it)
661 __iter__ = None
663 def __getitem__(self, key) -> "FSPath":
664 if self._children is None:
665 raise KeyError(
666 f"{key} (note: {self._orphan_safe_path()!r} has no children)"
667 )
668 if isinstance(key, FSPath): 668 ↛ 669line 668 didn't jump to line 669 because the condition on line 668 was never true
669 key = key.name
670 return self._children[key]
672 def __delitem__(self, key) -> None:
673 self._rw_check()
674 children = self._children
675 if children is None: 675 ↛ 676line 675 didn't jump to line 676 because the condition on line 675 was never true
676 raise KeyError(key)
677 del children[key]
679 def get(self, key: str) -> "Optional[FSPath]":
680 try:
681 return self[key]
682 except KeyError:
683 return None
685 def __contains__(self, item: object) -> bool:
686 if isinstance(item, VirtualPath): 686 ↛ 687line 686 didn't jump to line 687 because the condition on line 686 was never true
687 return item.parent_dir is self
688 if not isinstance(item, str): 688 ↛ 689line 688 didn't jump to line 689 because the condition on line 688 was never true
689 return False
690 m = self.get(item)
691 return m is not None
693 def _add_child(self, child: "FSPath") -> None:
694 self._rw_check()
695 if not self.is_dir: 695 ↛ 696line 695 didn't jump to line 696 because the condition on line 695 was never true
696 raise TypeError(f"{self._orphan_safe_path()!r} is not a directory")
697 if self._children is None:
698 self._children = {}
700 conflict_child = self.get(child.name)
701 if conflict_child is not None: 701 ↛ 702line 701 didn't jump to line 702 because the condition on line 701 was never true
702 conflict_child.unlink(recursive=True)
703 self._children[child.name] = child
705 @property
706 def tar_path(self) -> str:
707 path = self.path
708 if self.is_dir:
709 return path + "/"
710 return path
712 @property
713 def path(self) -> str:
714 parent_path = self.parent_dir_path
715 if (
716 self._parent_path_cache is not None
717 and self._parent_path_cache == parent_path
718 ):
719 return assume_not_none(self._path_cache)
720 if parent_path is None: 720 ↛ 721line 720 didn't jump to line 721 because the condition on line 720 was never true
721 raise ReferenceError(
722 f"The path {self.name} is detached! {self.__class__.__name__}"
723 )
724 self._parent_path_cache = parent_path
725 ret = os.path.join(parent_path, self.name)
726 self._path_cache = ret
727 return ret
729 @property
730 def parent_dir(self) -> Optional["FSPath"]:
731 p_ref = self._parent_dir
732 p = p_ref() if p_ref is not None else None
733 if p is None: 733 ↛ 734line 733 didn't jump to line 734 because the condition on line 733 was never true
734 raise ReferenceError(
735 f"The path {self.name} is detached! {self.__class__.__name__}"
736 )
737 return p
739 @parent_dir.setter
740 def parent_dir(self, new_parent: Optional["FSPath"]) -> None:
741 self._rw_check()
742 if new_parent is not None:
743 if not new_parent.is_dir: 743 ↛ 744line 743 didn't jump to line 744 because the condition on line 743 was never true
744 raise ValueError(
745 f"The parent {new_parent._orphan_safe_path()} must be a directory"
746 )
747 new_parent._rw_check()
748 old_parent = None
749 self._last_known_parent_path = None
750 if not self.is_detached:
751 old_parent = self.parent_dir
752 old_parent_children = assume_not_none(assume_not_none(old_parent)._children)
753 del old_parent_children[self.name]
754 if new_parent is not None:
755 self._parent_dir = ref(new_parent)
756 new_parent._add_child(self)
757 else:
758 if old_parent is not None and not old_parent.is_detached: 758 ↛ 760line 758 didn't jump to line 760 because the condition on line 758 was always true
759 self._last_known_parent_path = old_parent.path
760 self._parent_dir = None
761 self._parent_path_cache = None
763 @property
764 def parent_dir_path(self) -> str | None:
765 if self.is_detached: 765 ↛ 766line 765 didn't jump to line 766 because the condition on line 765 was never true
766 return self._last_known_parent_path
767 return assume_not_none(self.parent_dir).path
769 def chown(
770 self,
771 owner: StaticFileSystemOwner | None,
772 group: StaticFileSystemGroup | None,
773 ) -> None:
774 """Change the owner/group of this path
776 :param owner: The desired owner definition for this path. If None, then no change of owner is performed.
777 :param group: The desired group definition for this path. If None, then no change of group is performed.
778 """
779 self._rw_check()
781 if owner is not None:
782 self._owner = owner.ownership_definition
783 if group is not None:
784 self._group = group.ownership_definition
786 def stat(self) -> os.stat_result:
787 st = self._stat_cache
788 if st is None:
789 st = self._uncached_stat()
790 self._stat_cache = st
791 return st
793 def _uncached_stat(self) -> os.stat_result:
794 return os.lstat(self.fs_path)
796 @property
797 def mode(self) -> int:
798 current_mode = self._mode
799 if current_mode is None: 799 ↛ 800line 799 didn't jump to line 800 because the condition on line 799 was never true
800 current_mode = stat.S_IMODE(self.stat().st_mode)
801 self._mode = current_mode
802 return current_mode
804 @mode.setter
805 def mode(self, new_mode: int) -> None:
806 self._rw_check()
807 min_bit = 0o700 if self.is_dir else 0o400
808 if (new_mode & min_bit) != min_bit: 808 ↛ 809line 808 didn't jump to line 809 because the condition on line 808 was never true
809 omode = oct(new_mode)[2:]
810 omin = oct(min_bit)[2:]
811 raise ValueError(
812 f'Attempt to set mode of path "{self._orphan_safe_path()}" to {omode} rejected;'
813 f" Minimum requirements are {omin} (read-bit and, for dirs, exec bit for user)."
814 " There are no paths that do not need these requirements met and they can cause"
815 " problems during build or on the final system."
816 )
817 self._mode = new_mode
819 def _ensure_min_mode(self) -> None:
820 min_bit = 0o700 if self.is_dir else 0o600
821 if self.has_fs_path and (self.mode & 0o600) != 0o600: 821 ↛ 822line 821 didn't jump to line 822 because the condition on line 821 was never true
822 try:
823 fs_path = self.fs_path
824 except TestPathWithNonExistentFSPathError:
825 pass
826 else:
827 st = os.stat(fs_path)
828 new_fs_mode = stat.S_IMODE(st.st_mode) | min_bit
829 _debug_log(
830 f"Applying chmod {oct(min_bit)[2:]} {fs_path} ({self.path}) to avoid problems down the line"
831 )
832 os.chmod(fs_path, new_fs_mode)
833 self.mode |= min_bit
835 def _resolve_initial_mtime(self) -> float:
836 return self.stat().st_mtime
838 @property
839 def mtime(self) -> float:
840 mtime = self._mtime
841 if mtime is None:
842 mtime = self._resolve_initial_mtime()
843 self._mtime = mtime
844 return mtime
846 @mtime.setter
847 def mtime(self, new_mtime: float) -> None:
848 self._rw_check()
849 self._mtime = new_mtime
851 @property
852 def tar_owner_info(self) -> tuple[str, int, str, int]:
853 owner = self._owner
854 group = self._group
855 return (
856 owner.entity_name,
857 owner.entity_id,
858 group.entity_name,
859 group.entity_id,
860 )
862 @property
863 def _can_replace_inline(self) -> bool:
864 return False
866 @contextlib.contextmanager
867 def add_file(
868 self,
869 name: str,
870 *,
871 unlink_if_exists: bool = True,
872 use_fs_path_mode: bool = False,
873 mode: int = 0o0644,
874 mtime: float | None = None,
875 # Special-case parameters that are not exposed in the API
876 fs_basename_matters: bool = False,
877 subdir_key: str | None = None,
878 ) -> Iterator["FSPath"]:
879 if "/" in name or name in {".", ".."}: 879 ↛ 880line 879 didn't jump to line 880 because the condition on line 879 was never true
880 raise ValueError(f'Invalid file name: "{name}"')
881 if not self.is_dir: 881 ↛ 882line 881 didn't jump to line 882 because the condition on line 881 was never true
882 raise TypeError(
883 f"Cannot create {self._orphan_safe_path()}/{name}:"
884 f" {self._orphan_safe_path()} is not a directory"
885 )
886 self._rw_check()
887 existing = self.get(name)
888 if existing is not None: 888 ↛ 889line 888 didn't jump to line 889 because the condition on line 888 was never true
889 if not unlink_if_exists:
890 raise ValueError(
891 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
892 f" and exist_ok was False"
893 )
894 existing.unlink(recursive=False)
896 if fs_basename_matters and subdir_key is None: 896 ↛ 897line 896 didn't jump to line 897 because the condition on line 896 was never true
897 raise ValueError(
898 "When fs_basename_matters is True, a subdir_key must be provided"
899 )
901 directory = generated_content_dir(subdir_key=subdir_key)
903 if fs_basename_matters: 903 ↛ 904line 903 didn't jump to line 904 because the condition on line 903 was never true
904 fs_path = os.path.join(directory, name)
905 with open(fs_path, "xb") as _:
906 # Ensure that the fs_path exists
907 pass
908 child = FSBackedFilePath(
909 name,
910 self,
911 fs_path,
912 replaceable_inline=True,
913 mtime=mtime,
914 )
915 yield child
916 else:
917 with tempfile.NamedTemporaryFile(
918 dir=directory, suffix=f"__{name}", delete=False
919 ) as fd:
920 fs_path = fd.name
921 child = FSBackedFilePath(
922 name,
923 self,
924 fs_path,
925 replaceable_inline=True,
926 mtime=mtime,
927 )
928 fd.close()
929 yield child
931 if use_fs_path_mode: 931 ↛ 933line 931 didn't jump to line 933 because the condition on line 931 was never true
932 # Ensure the caller can see the current mode
933 os.chmod(fs_path, mode)
934 _check_fs_path_is_file(fs_path, unlink_on_error=child)
935 child._reset_caches()
936 if not use_fs_path_mode: 936 ↛ exitline 936 didn't return from function 'add_file' because the condition on line 936 was always true
937 child.mode = mode
939 def insert_file_from_fs_path(
940 self,
941 name: str,
942 fs_path: str,
943 *,
944 exist_ok: bool = True,
945 use_fs_path_mode: bool = False,
946 mode: int = 0o0644,
947 require_copy_on_write: bool = True,
948 follow_symlinks: bool = True,
949 reference_path: VirtualPath | None = None,
950 ) -> "FSPath":
951 if "/" in name or name in {".", ".."}: 951 ↛ 952line 951 didn't jump to line 952 because the condition on line 951 was never true
952 raise ValueError(f'Invalid file name: "{name}"')
953 if not self.is_dir: 953 ↛ 954line 953 didn't jump to line 954 because the condition on line 953 was never true
954 raise TypeError(
955 f"Cannot create {self._orphan_safe_path()}/{name}:"
956 f" {self._orphan_safe_path()} is not a directory"
957 )
958 self._rw_check()
959 if name in self and not exist_ok: 959 ↛ 960line 959 didn't jump to line 960 because the condition on line 959 was never true
960 raise ValueError(
961 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
962 f" and exist_ok was False"
963 )
964 new_fs_path = fs_path
965 if follow_symlinks:
966 if reference_path is not None: 966 ↛ 967line 966 didn't jump to line 967 because the condition on line 966 was never true
967 raise ValueError(
968 "The reference_path cannot be used with follow_symlinks"
969 )
970 new_fs_path = os.path.realpath(new_fs_path, strict=True)
972 fmode: int | None = mode
973 if use_fs_path_mode:
974 fmode = None
976 st = None
977 if reference_path is None:
978 st = os.lstat(new_fs_path)
979 if stat.S_ISDIR(st.st_mode): 979 ↛ 980line 979 didn't jump to line 980 because the condition on line 979 was never true
980 raise ValueError(
981 f'The provided path "{fs_path}" is a directory. However, this'
982 " method does not support directories"
983 )
985 if not stat.S_ISREG(st.st_mode): 985 ↛ 986line 985 didn't jump to line 986 because the condition on line 985 was never true
986 if follow_symlinks:
987 raise ValueError(
988 f"The resolved fs_path ({new_fs_path}) was not a file."
989 )
990 raise ValueError(f"The provided fs_path ({fs_path}) was not a file.")
991 return FSBackedFilePath(
992 name,
993 self,
994 new_fs_path,
995 initial_mode=fmode,
996 stat_cache=st,
997 replaceable_inline=not require_copy_on_write,
998 reference_path=reference_path,
999 )
1001 def add_symlink(
1002 self,
1003 link_name: str,
1004 link_target: str,
1005 *,
1006 reference_path: VirtualPath | None = None,
1007 ) -> "FSPath":
1008 if "/" in link_name or link_name in {".", ".."}: 1008 ↛ 1009line 1008 didn't jump to line 1009 because the condition on line 1008 was never true
1009 raise ValueError(
1010 f'Invalid file name: "{link_name}" (it must be a valid basename)'
1011 )
1012 if not self.is_dir: 1012 ↛ 1013line 1012 didn't jump to line 1013 because the condition on line 1012 was never true
1013 raise TypeError(
1014 f"Cannot create {self._orphan_safe_path()}/{link_name}:"
1015 f" {self._orphan_safe_path()} is not a directory"
1016 )
1017 self._rw_check()
1019 existing = self.get(link_name)
1020 if existing: 1020 ↛ 1022line 1020 didn't jump to line 1022 because the condition on line 1020 was never true
1021 # Emulate ln -sf with attempts a non-recursive unlink first.
1022 existing.unlink(recursive=False)
1024 return SymlinkVirtualPath(
1025 link_name,
1026 self,
1027 link_target,
1028 reference_path=reference_path,
1029 )
1031 def mkdir(
1032 self,
1033 name: str,
1034 *,
1035 reference_path: VirtualPath | None = None,
1036 ) -> "FSPath":
1037 if "/" in name or name in {".", ".."}: 1037 ↛ 1038line 1037 didn't jump to line 1038 because the condition on line 1037 was never true
1038 raise ValueError(
1039 f'Invalid file name: "{name}" (it must be a valid basename)'
1040 )
1041 if not self.is_dir: 1041 ↛ 1042line 1041 didn't jump to line 1042 because the condition on line 1041 was never true
1042 raise TypeError(
1043 f"Cannot create {self._orphan_safe_path()}/{name}:"
1044 f" {self._orphan_safe_path()} is not a directory"
1045 )
1046 if reference_path is not None and not reference_path.is_dir: 1046 ↛ 1047line 1046 didn't jump to line 1047 because the condition on line 1046 was never true
1047 raise ValueError(
1048 f'The provided fs_path "{reference_path.fs_path}" exist but it is not a directory!'
1049 )
1050 self._rw_check()
1052 existing = self.get(name)
1053 if existing: 1053 ↛ 1054line 1053 didn't jump to line 1054 because the condition on line 1053 was never true
1054 raise ValueError(f"Path {existing.path} already exist")
1055 return VirtualDirectoryFSPath(name, self, reference_path=reference_path)
1057 def mkdirs(self, path: str) -> "FSPath":
1058 return cast("FSPath", super().mkdirs(path))
1060 @property
1061 def is_read_write(self) -> bool:
1062 """When true, the file system entry may be mutated
1064 :return: Whether file system mutations are permitted.
1065 """
1066 if self.is_detached:
1067 return True
1068 return assume_not_none(self.parent_dir).is_read_write
1070 def unlink(self, *, recursive: bool = False) -> None:
1071 """Unlink a file or a directory
1073 This operation will detach the path from the file system (causing "is_detached" to return True).
1075 Note that the root directory cannot be deleted.
1077 :param recursive: If True, then non-empty directories will be unlinked as well removing everything inside them
1078 as well. When False, an error is raised if the path is a non-empty directory
1079 """
1080 if self.is_detached: 1080 ↛ 1081line 1080 didn't jump to line 1081 because the condition on line 1080 was never true
1081 return
1082 if not recursive and any(self.iterdir()): 1082 ↛ 1083line 1082 didn't jump to line 1083 because the condition on line 1082 was never true
1083 raise ValueError(
1084 f'Refusing to unlink "{self.path}": The directory was not empty and recursive was False'
1085 )
1086 # The .parent_dir setter does a _rw_check() for us.
1087 self.parent_dir = None
1089 def _reset_caches(self) -> None:
1090 self._mtime = None
1091 self._stat_cache = None
1093 def metadata(
1094 self,
1095 metadata_type: type[PMT],
1096 *,
1097 owning_plugin: str | None = None,
1098 ) -> PathMetadataReference[PMT]:
1099 current_plugin = self._current_plugin()
1100 if owning_plugin is None: 1100 ↛ 1102line 1100 didn't jump to line 1102 because the condition on line 1100 was always true
1101 owning_plugin = current_plugin
1102 metadata_key = (owning_plugin, metadata_type)
1103 metadata_value = self._metadata.get(metadata_key)
1104 if metadata_value is None:
1105 if self.is_detached: 1105 ↛ 1106line 1105 didn't jump to line 1106 because the condition on line 1105 was never true
1106 raise TypeError(
1107 f"Cannot access the metadata {metadata_type.__name__}: The path is detached."
1108 )
1109 if not self.is_read_write:
1110 return AlwaysEmptyReadOnlyMetadataReference(
1111 owning_plugin,
1112 current_plugin,
1113 metadata_type,
1114 )
1115 metadata_value = PathMetadataValue(owning_plugin, metadata_type)
1116 self._metadata[metadata_key] = metadata_value
1117 return PathMetadataReferenceImplementation(
1118 self,
1119 current_plugin,
1120 metadata_value,
1121 )
1123 @contextlib.contextmanager
1124 def replace_fs_path_content(
1125 self,
1126 *,
1127 use_fs_path_mode: bool = False,
1128 ) -> Iterator[str]:
1129 if not self.is_file: 1129 ↛ 1130line 1129 didn't jump to line 1130 because the condition on line 1129 was never true
1130 raise TypeError(
1131 f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file'
1132 )
1133 self._rw_check()
1134 fs_path = self.fs_path
1135 if not self._can_replace_inline: 1135 ↛ 1147line 1135 didn't jump to line 1147 because the condition on line 1135 was always true
1136 fs_path = self.fs_path
1137 directory = generated_content_dir()
1138 with tempfile.NamedTemporaryFile(
1139 dir=directory, suffix=f"__{self.name}", delete=False
1140 ) as new_path_fd:
1141 new_path_fd.close()
1142 _cp_a(fs_path, new_path_fd.name)
1143 fs_path = new_path_fd.name
1144 self._replaced_path(fs_path)
1145 assert self.fs_path == fs_path
1147 current_mtime = self._mtime
1148 if current_mtime is not None:
1149 os.utime(fs_path, (current_mtime, current_mtime))
1151 current_mode = self.mode
1152 yield fs_path
1153 _check_fs_path_is_file(fs_path, unlink_on_error=self)
1154 if not use_fs_path_mode: 1154 ↛ 1156line 1154 didn't jump to line 1156 because the condition on line 1154 was always true
1155 os.chmod(fs_path, current_mode)
1156 self._reset_caches()
1158 def _replaced_path(self, new_fs_path: str) -> None:
1159 raise NotImplementedError
1162class VirtualFSPathBase(FSPath, ABC):
1163 __slots__ = ()
1165 def __init__(
1166 self,
1167 basename: str,
1168 parent: Optional["FSPath"],
1169 children: dict[str, "FSPath"] | None = None,
1170 initial_mode: int | None = None,
1171 mtime: float | None = None,
1172 stat_cache: os.stat_result | None = None,
1173 ) -> None:
1174 super().__init__(
1175 basename,
1176 parent,
1177 children,
1178 initial_mode=initial_mode,
1179 mtime=mtime,
1180 stat_cache=stat_cache,
1181 )
1183 def _resolve_initial_mtime(self) -> float:
1184 return time.time()
1186 @property
1187 def has_fs_path(self) -> bool:
1188 return False
1190 def stat(self) -> os.stat_result:
1191 if not self.has_fs_path:
1192 raise PureVirtualPathError(
1193 "stat() is only applicable to paths backed by the file system. The path"
1194 f" {self._orphan_safe_path()!r} is purely virtual"
1195 )
1196 return super().stat()
1198 @property
1199 def fs_path(self) -> str:
1200 if not self.has_fs_path:
1201 raise PureVirtualPathError(
1202 "fs_path is only applicable to paths backed by the file system. The path"
1203 f" {self._orphan_safe_path()!r} is purely virtual"
1204 )
1205 return self.fs_path
1208class FSRootDir(FSPath):
1209 __slots__ = ("_fs_path", "_fs_read_write", "_plugin_context")
1211 def __init__(self, fs_path: str | None = None) -> None:
1212 self._fs_path = fs_path
1213 self._fs_read_write = True
1214 super().__init__(
1215 ".",
1216 None,
1217 children={},
1218 initial_mode=0o755,
1219 )
1220 self._plugin_context = CurrentPluginContextManager("debputy")
1222 @property
1223 def is_detached(self) -> bool:
1224 return False
1226 def _orphan_safe_path(self) -> str:
1227 return self.name
1229 @property
1230 def path(self) -> str:
1231 return self.name
1233 @property
1234 def parent_dir(self) -> Optional["FSPath"]:
1235 return None
1237 @parent_dir.setter
1238 def parent_dir(self, new_parent: FSPath | None) -> None:
1239 if new_parent is not None:
1240 raise ValueError("The root directory cannot become a non-root directory")
1242 @property
1243 def parent_dir_path(self) -> str | None:
1244 return None
1246 @property
1247 def is_dir(self) -> bool:
1248 return True
1250 @property
1251 def is_file(self) -> bool:
1252 return False
1254 @property
1255 def is_symlink(self) -> bool:
1256 return False
1258 def readlink(self) -> str:
1259 raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink')
1261 @property
1262 def has_fs_path(self) -> bool:
1263 return self._fs_path is not None
1265 def stat(self) -> os.stat_result:
1266 if not self.has_fs_path:
1267 raise PureVirtualPathError(
1268 "stat() is only applicable to paths backed by the file system. The path"
1269 f" {self._orphan_safe_path()!r} is purely virtual"
1270 )
1271 return os.stat(self.fs_path)
1273 @property
1274 def fs_path(self) -> str:
1275 if not self.has_fs_path: 1275 ↛ 1276line 1275 didn't jump to line 1276 because the condition on line 1275 was never true
1276 raise PureVirtualPathError(
1277 "fs_path is only applicable to paths backed by the file system. The path"
1278 f" {self._orphan_safe_path()!r} is purely virtual"
1279 )
1280 return assume_not_none(self._fs_path)
1282 @property
1283 def is_read_write(self) -> bool:
1284 return self._fs_read_write
1286 @is_read_write.setter
1287 def is_read_write(self, new_value: bool) -> None:
1288 self._fs_read_write = new_value
1290 def prune_if_empty_dir(self) -> None:
1291 # No-op for the root directory. There is never a case where you want to delete this directory
1292 # (and even if you could, debputy will need it for technical reasons, so the root dir stays)
1293 return
1295 def unlink(self, *, recursive: bool = False) -> None:
1296 # There is never a case where you want to delete this directory (and even if you could,
1297 # debputy will need it for technical reasons, so the root dir stays)
1298 raise TypeError("Cannot delete the root directory")
1300 def _current_plugin(self) -> str:
1301 return self._plugin_context.current_plugin_name
1303 @contextlib.contextmanager
1304 def change_plugin_context(self, new_plugin: str) -> Iterator[str]:
1305 with self._plugin_context.change_plugin_context(new_plugin) as r:
1306 yield r
1309class VirtualPathWithReference(VirtualFSPathBase, ABC):
1310 __slots__ = ("_reference_path",)
1312 def __init__(
1313 self,
1314 basename: str,
1315 parent: FSPath,
1316 *,
1317 default_mode: int,
1318 reference_path: VirtualPath | None = None,
1319 ) -> None:
1320 super().__init__(
1321 basename,
1322 parent=parent,
1323 initial_mode=reference_path.mode if reference_path else default_mode,
1324 )
1325 self._reference_path = reference_path
1327 @property
1328 def has_fs_path(self) -> bool:
1329 ref_path = self._reference_path
1330 return ref_path is not None and ref_path.has_fs_path
1332 def _resolve_initial_mtime(self) -> float:
1333 ref_path = self._reference_path
1334 if ref_path: 1334 ↛ 1336line 1334 didn't jump to line 1336 because the condition on line 1334 was always true
1335 return ref_path.mtime
1336 return super()._resolve_initial_mtime()
1338 @property
1339 def fs_path(self) -> str:
1340 ref_path = self._reference_path
1341 if ref_path is not None and ( 1341 ↛ 1345line 1341 didn't jump to line 1345 because the condition on line 1341 was always true
1342 not super().has_fs_path or super().fs_path == ref_path.fs_path
1343 ):
1344 return ref_path.fs_path
1345 return super().fs_path
1347 def stat(self) -> os.stat_result:
1348 ref_path = self._reference_path
1349 if ref_path is not None and (
1350 not super().has_fs_path or super().fs_path == ref_path.fs_path
1351 ):
1352 return typing.cast(VirtualPathBase, ref_path).stat()
1353 return super().stat()
1355 @overload
1356 def open( 1356 ↛ exitline 1356 didn't return from function 'open' because
1357 self,
1358 *,
1359 byte_io: Literal[False] = False,
1360 buffering: int = -1,
1361 ) -> TextIO: ...
1363 @overload
1364 def open( 1364 ↛ exitline 1364 didn't return from function 'open' because
1365 self,
1366 *,
1367 byte_io: Literal[True],
1368 buffering: Literal[0] = ...,
1369 ) -> io.FileIO: ...
1371 @overload
1372 def open( 1372 ↛ exitline 1372 didn't return from function 'open' because
1373 self,
1374 *,
1375 byte_io: Literal[True],
1376 buffering: int = -1,
1377 ) -> io.BufferedReader: ...
1379 def open(self, *, byte_io=False, buffering=-1):
1380 reference_path = self._reference_path
1381 if reference_path is not None and reference_path.fs_path == self.fs_path:
1382 return reference_path.open(byte_io=byte_io, buffering=buffering)
1383 return super().open(byte_io=byte_io, buffering=buffering)
1386class VirtualDirectoryFSPath(VirtualPathWithReference):
1387 __slots__ = ("_reference_path",)
1389 def __init__(
1390 self,
1391 basename: str,
1392 parent: FSPath,
1393 *,
1394 reference_path: VirtualPath | None = None,
1395 ) -> None:
1396 super().__init__(
1397 basename,
1398 parent,
1399 reference_path=reference_path,
1400 default_mode=0o755,
1401 )
1402 self._reference_path = reference_path
1403 assert reference_path is None or reference_path.is_dir
1404 self._ensure_min_mode()
1406 @property
1407 def is_dir(self) -> bool:
1408 return True
1410 @property
1411 def is_file(self) -> bool:
1412 return False
1414 @property
1415 def is_symlink(self) -> bool:
1416 return False
1418 def readlink(self) -> str:
1419 raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink')
1422class SymlinkVirtualPath(VirtualPathWithReference):
1423 __slots__ = ("_link_target",)
1425 def __init__(
1426 self,
1427 basename: str,
1428 parent_dir: FSPath,
1429 link_target: str,
1430 *,
1431 reference_path: VirtualPath | None = None,
1432 ) -> None:
1433 super().__init__(
1434 basename,
1435 parent=parent_dir,
1436 default_mode=_SYMLINK_MODE,
1437 reference_path=reference_path,
1438 )
1439 self._link_target = link_target
1441 @property
1442 def is_dir(self) -> bool:
1443 return False
1445 @property
1446 def is_file(self) -> bool:
1447 return False
1449 @property
1450 def is_symlink(self) -> bool:
1451 return True
1453 def readlink(self) -> str:
1454 return self._link_target
1456 @property
1457 def size(self) -> int:
1458 return len(self.readlink())
1461class FSBackedFilePath(VirtualPathWithReference):
1462 __slots__ = ("_fs_path", "_replaceable_inline")
1464 def __init__(
1465 self,
1466 basename: str,
1467 parent_dir: FSPath,
1468 fs_path: str,
1469 *,
1470 replaceable_inline: bool = False,
1471 initial_mode: int | None = None,
1472 mtime: float | None = None,
1473 stat_cache: os.stat_result | None = None,
1474 reference_path: VirtualPath | None = None,
1475 ) -> None:
1476 super().__init__(
1477 basename,
1478 parent_dir,
1479 default_mode=0o644,
1480 reference_path=reference_path,
1481 )
1482 self._fs_path = fs_path
1483 self._replaceable_inline = replaceable_inline
1484 if initial_mode is not None:
1485 self.mode = initial_mode
1486 if mtime is not None:
1487 self._mtime = mtime
1488 self._stat_cache = stat_cache
1489 assert (
1490 not replaceable_inline or "debputy/scratch-dir/" in fs_path
1491 ), f"{fs_path} should not be inline-replaceable -- {self.path}"
1492 self._ensure_min_mode()
1494 @property
1495 def is_dir(self) -> bool:
1496 return False
1498 @property
1499 def is_file(self) -> bool:
1500 return True
1502 @property
1503 def is_symlink(self) -> bool:
1504 return False
1506 def readlink(self) -> str:
1507 raise TypeError(f'"{self._orphan_safe_path()!r}" is a file; not a symlink')
1509 @property
1510 def has_fs_path(self) -> bool:
1511 return True
1513 @property
1514 def fs_path(self) -> str:
1515 return self._fs_path
1517 @property
1518 def _can_replace_inline(self) -> bool:
1519 return self._replaceable_inline
1521 def _replaced_path(self, new_fs_path: str) -> None:
1522 self._fs_path = new_fs_path
1523 self._reference_path = None
1524 self._replaceable_inline = True
1527_SYMLINK_MODE = 0o777
1530class VirtualTestPath(FSPath):
1531 __slots__ = (
1532 "_path_type",
1533 "_has_fs_path",
1534 "_fs_path",
1535 "_link_target",
1536 "_content",
1537 "_materialized_content",
1538 )
1540 def __init__(
1541 self,
1542 basename: str,
1543 parent_dir: FSPath | None,
1544 mode: int | None = None,
1545 mtime: float | None = None,
1546 is_dir: bool = False,
1547 has_fs_path: bool | None = False,
1548 fs_path: str | None = None,
1549 link_target: str | None = None,
1550 content: str | None = None,
1551 materialized_content: str | None = None,
1552 ) -> None:
1553 if is_dir:
1554 self._path_type = PathType.DIRECTORY
1555 elif link_target is not None:
1556 self._path_type = PathType.SYMLINK
1557 if mode is not None and mode != _SYMLINK_MODE: 1557 ↛ 1558line 1557 didn't jump to line 1558 because the condition on line 1557 was never true
1558 raise ValueError(
1559 f'Please do not assign a mode to symlinks. Triggered for "{basename}".'
1560 )
1561 assert mode is None or mode == _SYMLINK_MODE
1562 else:
1563 self._path_type = PathType.FILE
1565 if mode is not None:
1566 initial_mode = mode
1567 else:
1568 initial_mode = 0o755 if is_dir else 0o644
1570 self._link_target = link_target
1571 if has_fs_path is None:
1572 has_fs_path = bool(fs_path)
1573 self._has_fs_path = has_fs_path
1574 self._fs_path = fs_path
1575 self._materialized_content = materialized_content
1576 super().__init__(
1577 basename,
1578 parent=parent_dir,
1579 initial_mode=initial_mode,
1580 mtime=mtime,
1581 )
1582 self._content = content
1584 @property
1585 def is_dir(self) -> bool:
1586 return self._path_type == PathType.DIRECTORY
1588 @property
1589 def is_file(self) -> bool:
1590 return self._path_type == PathType.FILE
1592 @property
1593 def is_symlink(self) -> bool:
1594 return self._path_type == PathType.SYMLINK
1596 def readlink(self) -> str:
1597 if not self.is_symlink: 1597 ↛ 1598line 1597 didn't jump to line 1598 because the condition on line 1597 was never true
1598 raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
1599 link_target = self._link_target
1600 assert link_target is not None
1601 return link_target
1603 def _resolve_initial_mtime(self) -> float:
1604 return time.time()
1606 @property
1607 def has_fs_path(self) -> bool:
1608 return self._has_fs_path
1610 def stat(self) -> os.stat_result:
1611 if self.has_fs_path:
1612 path = self.fs_path
1613 if path is None: 1613 ↛ 1614line 1613 didn't jump to line 1614 because the condition on line 1613 was never true
1614 raise PureVirtualPathError(
1615 f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path"
1616 " cannot provide!"
1617 )
1618 try:
1619 return os.stat(path)
1620 except FileNotFoundError as e:
1621 raise PureVirtualPathError(
1622 f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path"
1623 " cannot provide! (An fs_path was provided, but it did not exist)"
1624 ) from e
1626 raise PureVirtualPathError(
1627 "stat() is only applicable to paths backed by the file system. The path"
1628 f" {self._orphan_safe_path()!r} is purely virtual"
1629 )
1631 @property
1632 def size(self) -> int:
1633 if self._content is not None:
1634 return len(self._content.encode("utf-8"))
1635 if self.is_symlink:
1636 return len(self.readlink())
1637 if not self.has_fs_path or self.fs_path is None:
1638 return 0
1639 return self.stat().st_size
1641 @property
1642 def fs_path(self) -> str:
1643 if self.has_fs_path:
1644 if self._fs_path is None and self._materialized_content is not None:
1645 with tempfile.NamedTemporaryFile(
1646 mode="w+t",
1647 encoding="utf-8",
1648 suffix=f"__{self.name}",
1649 delete=False,
1650 ) as fd:
1651 filepath = fd.name
1652 fd.write(self._materialized_content)
1653 self._fs_path = filepath
1654 atexit.register(lambda: os.unlink(filepath))
1656 path = self._fs_path
1657 if path is None: 1657 ↛ 1658line 1657 didn't jump to line 1658 because the condition on line 1657 was never true
1658 raise PureVirtualPathError(
1659 f"The test wants a real file system entry of {self._orphan_safe_path()!r}, which this "
1660 " mock path cannot provide!"
1661 )
1662 return path
1663 raise PureVirtualPathError(
1664 "fs_path is only applicable to paths backed by the file system. The path"
1665 f" {self._orphan_safe_path()!r} is purely virtual"
1666 )
1668 def replace_fs_path_content(
1669 self,
1670 *,
1671 use_fs_path_mode: bool = False,
1672 ) -> ContextManager[str]:
1673 if self._content is not None: 1673 ↛ 1674line 1673 didn't jump to line 1674 because the condition on line 1673 was never true
1674 raise TypeError(
1675 f"The `replace_fs_path_content()` method was called on {self.path}. Said path was"
1676 " created with `content` but for this method to work, the path should have been"
1677 " created with `materialized_content`"
1678 )
1679 return super().replace_fs_path_content(use_fs_path_mode=use_fs_path_mode)
1681 @overload
1682 def open_child( 1682 ↛ exitline 1682 didn't return from function 'open_child' because
1683 self,
1684 name: str,
1685 mode: TextOpenMode = "r",
1686 buffering: int = -1,
1687 ) -> TextIO: ...
1689 @overload
1690 def open_child( 1690 ↛ exitline 1690 didn't return from function 'open_child' because
1691 self,
1692 name: str,
1693 mode: BinaryOpenMode,
1694 buffering: int = -1,
1695 ) -> BinaryIO: ...
1697 @contextlib.contextmanager
1698 def open_child(self, name, mode="r", buffering=-1):
1699 existing = self.get(name)
1700 if existing or "r" in mode:
1701 with super().open_child(name, mode, buffering=buffering) as fd:
1702 yield fd
1703 return
1704 if "b" in mode:
1705 fd = io.BytesIO(b"")
1706 yield fd
1707 content = fd.getvalue().decode("utf-8")
1708 else:
1709 fd = io.StringIO("")
1710 yield fd
1711 content = fd.getvalue()
1712 VirtualTestPath(
1713 name,
1714 self,
1715 mode=0o644,
1716 content=content,
1717 has_fs_path=True,
1718 )
1720 @overload
1721 def open( 1721 ↛ exitline 1721 didn't return from function 'open' because
1722 self,
1723 *,
1724 byte_io: Literal[False] = False,
1725 buffering: int = -1,
1726 ) -> TextIO: ...
1728 @overload
1729 def open( 1729 ↛ exitline 1729 didn't return from function 'open' because
1730 self,
1731 *,
1732 byte_io: Literal[True],
1733 buffering: Literal[0] = ...,
1734 ) -> io.FileIO: ...
1736 @overload
1737 def open( 1737 ↛ exitline 1737 didn't return from function 'open' because
1738 self,
1739 *,
1740 byte_io: Literal[True],
1741 buffering: int = -1,
1742 ) -> io.BufferedReader: ...
1744 def open(self, *, byte_io=False, buffering=-1):
1745 if self._content is None:
1746 try:
1747 return super().open(byte_io=byte_io, buffering=buffering)
1748 except FileNotFoundError as e:
1749 raise TestPathWithNonExistentFSPathError(
1750 f"The test path {self.path} had an fs_path {self._fs_path}, which does not"
1751 " exist. This exception can only occur in the testsuite. Either have the"
1752 " test provide content for the path (`virtual_path_def(..., content=...) or,"
1753 " if that is too painful in general, have the code accept this error as a "
1754 " test only-case and provide a default."
1755 ) from e
1757 if byte_io:
1758 return io.BytesIO(self._content.encode("utf-8"))
1759 return io.StringIO(self._content)
1761 def _replaced_path(self, new_fs_path: str) -> None:
1762 self._fs_path = new_fs_path
1765class OSFSOverlayBase(VirtualPathBase, Generic[FSP]):
1766 __slots__ = (
1767 "_path",
1768 "_fs_path",
1769 "_parent",
1770 "__weakref__",
1771 )
1773 def __init__(
1774 self,
1775 path: str,
1776 fs_path: str,
1777 parent: FSP | None,
1778 ) -> None:
1779 self._path: str = path
1780 prefix = "/" if fs_path.startswith("/") else ""
1781 self._fs_path: str = prefix + _normalize_path(fs_path, with_prefix=False)
1782 self._parent: ReferenceType[FSP] | None = (
1783 ref(parent) if parent is not None else None
1784 )
1786 @property
1787 def name(self) -> str:
1788 return os.path.basename(self._path)
1790 @property
1791 def path(self) -> str:
1792 return self._path
1794 @property
1795 def parent_dir(self) -> Optional["FSP"]:
1796 parent = self._parent
1797 if parent is None:
1798 return None
1799 resolved = parent()
1800 if resolved is None:
1801 raise RuntimeError("Parent was garbage collected!")
1802 return resolved
1804 @property
1805 def fs_path(self) -> str:
1806 return self._fs_path
1808 def stat(self) -> os.stat_result:
1809 return os.lstat(self.fs_path)
1811 @property
1812 def is_dir(self) -> bool:
1813 # The root path can have a non-existent fs_path (such as d/tmp not always existing)
1814 try:
1815 return stat.S_ISDIR(self.stat().st_mode)
1816 except FileNotFoundError:
1817 return False
1819 @property
1820 def is_file(self) -> bool:
1821 # The root path can have a non-existent fs_path (such as d/tmp not always existing)
1822 try:
1823 return stat.S_ISREG(self.stat().st_mode)
1824 except FileNotFoundError:
1825 return False
1827 @property
1828 def is_symlink(self) -> bool:
1829 # The root path can have a non-existent fs_path (such as d/tmp not always existing)
1830 try:
1831 return stat.S_ISLNK(self.stat().st_mode)
1832 except FileNotFoundError:
1833 return False
1835 @property
1836 def has_fs_path(self) -> bool:
1837 return True
1839 @overload
1840 def open( 1840 ↛ exitline 1840 didn't return from function 'open' because
1841 self,
1842 *,
1843 byte_io: Literal[False] = False,
1844 buffering: int = -1,
1845 ) -> TextIO: ...
1847 @overload
1848 def open( 1848 ↛ exitline 1848 didn't return from function 'open' because
1849 self,
1850 *,
1851 byte_io: Literal[True],
1852 buffering: Literal[0] = ...,
1853 ) -> io.FileIO: ...
1855 @overload
1856 def open( 1856 ↛ exitline 1856 didn't return from function 'open' because
1857 self,
1858 *,
1859 byte_io: Literal[True],
1860 buffering: int = -1,
1861 ) -> io.BufferedReader: ...
1863 def open(self, *, byte_io=False, buffering=-1):
1864 # Allow symlinks for open here, because we can let the OS resolve the symlink reliably in this
1865 # case.
1866 if not self.is_file and not self.is_symlink:
1867 raise TypeError(
1868 f"Cannot open {self.path} for reading: It is not a file nor a symlink"
1869 )
1871 if byte_io:
1872 return open(self.fs_path, "rb", buffering=buffering)
1873 return open(self.fs_path, encoding="utf-8", buffering=buffering)
1875 def metadata(
1876 self,
1877 metadata_type: type[PMT],
1878 *,
1879 owning_plugin: str | None = None,
1880 ) -> PathMetadataReference[PMT]:
1881 current_plugin = self._current_plugin()
1882 if owning_plugin is None:
1883 owning_plugin = current_plugin
1884 return AlwaysEmptyReadOnlyMetadataReference(
1885 owning_plugin,
1886 current_plugin,
1887 metadata_type,
1888 )
1890 def all_paths(self) -> Iterable["OSFSControlPath"]:
1891 yield cast("OSFSControlPath", self)
1892 if not self.is_dir:
1893 return
1894 stack = list(self.iterdir())
1895 stack.reverse()
1896 while stack:
1897 current = cast("OSFSControlPath", stack.pop())
1898 yield current
1899 if current.is_dir:
1900 stack.extend(reversed(list(current.iterdir())))
1902 def _resolve_children(
1903 self,
1904 new_child: Callable[[str, str, FSP], FSC],
1905 ) -> Mapping[str, FSC]:
1906 if not self.is_dir:
1907 return {}
1908 dir_path = self.path
1909 dir_fs_path = self.fs_path
1910 children = {}
1911 for name in sorted(os.listdir(dir_fs_path), key=os.path.basename):
1912 child_path = os.path.join(dir_path, name) if dir_path != "." else name
1913 child_fs_path = (
1914 os.path.join(dir_fs_path, name) if dir_fs_path != "." else name
1915 )
1916 children[name] = new_child(
1917 child_path,
1918 child_fs_path,
1919 cast("FSP", self),
1920 )
1921 return children
1924class OSFSROOverlay(OSFSOverlayBase["OSFSROOverlay"]):
1925 __slots__ = (
1926 "_stat_cache",
1927 "_readlink_cache",
1928 "_children",
1929 "_stat_failed_cache",
1930 )
1932 def __init__(
1933 self,
1934 path: str,
1935 fs_path: str,
1936 parent: Optional["OSFSROOverlay"],
1937 ) -> None:
1938 super().__init__(path, fs_path, parent=parent)
1939 self._stat_cache: os.stat_result | None = None
1940 self._readlink_cache: str | None = None
1941 self._stat_failed_cache = False
1942 self._children: Mapping[str, OSFSROOverlay] | None = None
1944 @classmethod
1945 def create_root_dir(cls, path: str, fs_path: str) -> "OSFSROOverlay":
1946 return OSFSROOverlay(path, fs_path, None)
1948 def iterdir(self) -> Iterable["OSFSROOverlay"]:
1949 if not self.is_dir:
1950 return
1951 if self._children is None:
1952 self._ensure_children_are_resolved()
1953 yield from assume_not_none(self._children).values()
1955 def lookup(self, path: str) -> Optional["OSFSROOverlay"]:
1956 if not self.is_dir:
1957 return None
1958 if self._children is None:
1959 self._ensure_children_are_resolved()
1961 absolute, _, path_parts = _split_path(path)
1962 current = cast("OSFSROOverlay", _root(self)) if absolute else self
1963 for no, dir_part in enumerate(path_parts):
1964 if dir_part == ".":
1965 continue
1966 if dir_part == "..":
1967 if current.is_root_dir():
1968 raise ValueError(f'The path "{path}" escapes the root dir')
1969 p = current.parent_dir
1970 assert p is not None # Type hint
1971 current = cast("OSFSROOverlay", p)
1972 continue
1973 try:
1974 current = cast("OSFSROOverlay", current[dir_part])
1975 except KeyError:
1976 return None
1977 return current
1979 def _ensure_children_are_resolved(self) -> None:
1980 if not self.is_dir or self._children:
1981 return
1982 self._children = self._resolve_children(
1983 lambda n, fsp, p: OSFSROOverlay(n, fsp, p)
1984 )
1986 @property
1987 def is_detached(self) -> bool:
1988 return False
1990 def __getitem__(self, key) -> "VirtualPath":
1991 if not self.is_dir: 1991 ↛ 1993line 1991 didn't jump to line 1993 because the condition on line 1991 was always true
1992 raise KeyError(key)
1993 if self._children is None:
1994 self._ensure_children_are_resolved()
1995 if isinstance(key, FSPath):
1996 key = key.name
1997 return assume_not_none(self._children)[key]
1999 def __delitem__(self, key) -> Never:
2000 self._error_ro_fs()
2002 @property
2003 def is_read_write(self) -> bool:
2004 return False
2006 def _rw_check(self) -> Never:
2007 self._error_ro_fs()
2009 def _error_ro_fs(self) -> Never:
2010 raise DebputyFSIsROError(
2011 f'Attempt to write to "{self.path}" failed:'
2012 " Debputy Virtual File system is R/O."
2013 )
2015 def stat(self) -> os.stat_result:
2016 if self._stat_failed_cache: 2016 ↛ 2017line 2016 didn't jump to line 2017 because the condition on line 2016 was never true
2017 raise FileNotFoundError(
2018 errno.ENOENT, os.strerror(errno.ENOENT), self.fs_path
2019 )
2021 if self._stat_cache is None: 2021 ↛ 2027line 2021 didn't jump to line 2027 because the condition on line 2021 was always true
2022 try:
2023 self._stat_cache = os.lstat(self.fs_path)
2024 except FileNotFoundError:
2025 self._stat_failed_cache = True
2026 raise
2027 return self._stat_cache
2029 @property
2030 def mode(self) -> int:
2031 return stat.S_IMODE(self.stat().st_mode)
2033 @mode.setter
2034 def mode(self, _unused: int) -> Never:
2035 self._error_ro_fs()
2037 @property
2038 def mtime(self) -> float:
2039 return self.stat().st_mtime
2041 @mtime.setter
2042 def mtime(self, new_mtime: float) -> Never:
2043 self._error_ro_fs()
2045 def readlink(self) -> str:
2046 if not self.is_symlink:
2047 raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
2048 if self._readlink_cache is None:
2049 self._readlink_cache = os.readlink(self.fs_path)
2050 return self._readlink_cache
2052 def chown(
2053 self,
2054 owner: StaticFileSystemOwner | None,
2055 group: StaticFileSystemGroup | None,
2056 ) -> Never:
2057 self._error_ro_fs()
2059 def mkdir(self, name: str) -> Never:
2060 self._error_ro_fs()
2062 def add_file(
2063 self,
2064 name: str,
2065 *,
2066 unlink_if_exists: bool = True,
2067 use_fs_path_mode: bool = False,
2068 mode: int = 0o0644,
2069 mtime: float | None = None,
2070 ) -> Never:
2071 self._error_ro_fs()
2073 def add_symlink(self, link_name: str, link_target: str) -> Never:
2074 self._error_ro_fs()
2076 def unlink(self, *, recursive: bool = False) -> Never:
2077 self._error_ro_fs()
2080class OSFSROOverlayRootDir(OSFSROOverlay):
2081 __slots__ = ("_plugin_context",)
2083 def __init__(self, path: str, fs_path: str) -> None:
2084 super().__init__(path, fs_path, None)
2085 self._plugin_context = CurrentPluginContextManager("debputy")
2087 def _current_plugin(self) -> str:
2088 return self._plugin_context.current_plugin_name
2090 @contextlib.contextmanager
2091 def change_plugin_context(self, new_plugin: str) -> Iterator[str]:
2092 with self._plugin_context.change_plugin_context(new_plugin) as r:
2093 yield r
2096class OSFSControlPath(OSFSOverlayBase["OSFSControlPath"]):
2098 def iterdir(self) -> Iterable["OSFSControlPath"]:
2099 if not self.is_dir:
2100 return
2101 yield from self._resolve_children(
2102 lambda n, fsp, p: OSFSControlPath(n, fsp, p)
2103 ).values()
2105 def lookup(self, path: str) -> Optional["OSFSControlPath"]:
2106 if not self.is_dir:
2107 return None
2109 absolute, _, path_parts = _split_path(path)
2110 current = cast("OSFSControlPath", _root(self)) if absolute else self
2111 for no, dir_part in enumerate(path_parts):
2112 if dir_part == ".":
2113 continue
2114 if dir_part == "..":
2115 if current.is_root_dir():
2116 raise ValueError(f'The path "{path}" escapes the root dir')
2117 p = current.parent_dir
2118 assert p is not None # type hint
2119 current = cast("OSFSControlPath", p)
2120 continue
2121 try:
2122 current = cast("OSFSControlPath", current[dir_part])
2123 except KeyError:
2124 return None
2125 return current
2127 @property
2128 def is_detached(self) -> bool:
2129 try:
2130 self.stat()
2131 except FileNotFoundError:
2132 return True
2133 else:
2134 return False
2136 def __getitem__(self, key) -> "VirtualPath":
2137 if not self.is_dir:
2138 raise KeyError(key)
2139 children = self._resolve_children(lambda n, fsp, p: OSFSControlPath(n, fsp, p))
2140 if isinstance(key, FSPath):
2141 key = key.name
2142 return children[key]
2144 def __delitem__(self, key) -> None:
2145 self[key].unlink()
2147 @property
2148 def is_read_write(self) -> bool:
2149 return True
2151 @property
2152 def mode(self) -> int:
2153 return stat.S_IMODE(self.stat().st_mode)
2155 @mode.setter
2156 def mode(self, new_mode: int) -> None:
2157 os.chmod(self.fs_path, new_mode)
2159 @property
2160 def mtime(self) -> float:
2161 return self.stat().st_mtime
2163 @mtime.setter
2164 def mtime(self, new_mtime: float) -> None:
2165 os.utime(self.fs_path, (new_mtime, new_mtime))
2167 def readlink(self) -> Never:
2168 if not self.is_symlink:
2169 raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
2170 assert False
2172 def chown(
2173 self,
2174 owner: StaticFileSystemOwner | None,
2175 group: StaticFileSystemGroup | None,
2176 ) -> None:
2177 raise ValueError(
2178 "No need to chown paths in the control.tar: They are always root:root"
2179 )
2181 def mkdir(self, name: str) -> Never:
2182 raise TypeError("The control.tar never contains subdirectories.")
2184 @contextlib.contextmanager
2185 def add_file(
2186 self,
2187 name: str,
2188 *,
2189 unlink_if_exists: bool = True,
2190 use_fs_path_mode: bool = False,
2191 mode: int = 0o0644,
2192 mtime: float | None = None,
2193 ) -> Iterator["VirtualPath"]:
2194 if "/" in name or name in {".", ".."}:
2195 raise ValueError(f'Invalid file name: "{name}"')
2196 if not self.is_dir:
2197 raise TypeError(
2198 f"Cannot create {self._orphan_safe_path()}/{name}:"
2199 f" {self._orphan_safe_path()} is not a directory"
2200 )
2201 self._rw_check()
2202 existing = self.get(name)
2203 if existing is not None:
2204 if not unlink_if_exists:
2205 raise ValueError(
2206 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
2207 f" and exist_ok was False"
2208 )
2209 assert existing.is_file
2211 fs_path = os.path.join(self.fs_path, name)
2212 # This truncates the existing file if any, so we do not have to unlink the previous entry.
2213 with open(fs_path, "wb") as fd:
2214 # Ensure that the fs_path exists and default mode is reasonable
2215 os.chmod(fd.fileno(), mode)
2216 child = OSFSControlPath(
2217 name,
2218 fs_path,
2219 self,
2220 )
2221 yield child
2222 _check_fs_path_is_file(fs_path, unlink_on_error=child)
2223 child.mode = mode
2225 @contextlib.contextmanager
2226 def replace_fs_path_content(
2227 self,
2228 *,
2229 use_fs_path_mode: bool = False,
2230 ) -> Iterator[str]:
2231 if not self.is_file:
2232 raise TypeError(
2233 f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file'
2234 )
2235 restore_mode = self.mode if use_fs_path_mode else None
2236 yield self.fs_path
2237 _check_fs_path_is_file(self.fs_path, self)
2238 if restore_mode is not None:
2239 self.mode = restore_mode
2241 def add_symlink(self, link_name: str, link_target: str) -> Never:
2242 raise TypeError("The control.tar never contains symlinks.")
2244 def unlink(self, *, recursive: bool = False) -> None:
2245 if self._parent is None:
2246 return
2247 # By virtue of the control FS only containing paths, we can assume `recursive` never
2248 # matters and that `os.unlink` will be sufficient.
2249 assert self.is_file
2250 os.unlink(self.fs_path)
2253class FSControlRootDir(OSFSControlPath):
2255 @classmethod
2256 def create_root_dir(cls, fs_path: str) -> "FSControlRootDir":
2257 return FSControlRootDir(".", fs_path, None)
2259 def insert_file_from_fs_path(
2260 self,
2261 name: str,
2262 fs_path: str,
2263 *,
2264 exist_ok: bool = True,
2265 use_fs_path_mode: bool = False,
2266 mode: int = 0o0644,
2267 # Ignored, but accepted for compat with FSPath's variant of this function.
2268 # - This is used by install_or_generate_conffiles.
2269 reference_path: VirtualPath | None = None, # noqa
2270 ) -> "OSFSControlPath":
2271 if "/" in name or name in {".", ".."}:
2272 raise ValueError(f'Invalid file name: "{name}"')
2273 if not self.is_dir:
2274 raise TypeError(
2275 f"Cannot create {self._orphan_safe_path()}/{name}:"
2276 f" {self._orphan_safe_path()} is not a directory"
2277 )
2278 self._rw_check()
2279 if name in self and not exist_ok:
2280 raise ValueError(
2281 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
2282 f" and exist_ok was False"
2283 )
2285 target_path = os.path.join(self.fs_path, name)
2286 if use_fs_path_mode:
2287 shutil.copymode(
2288 fs_path,
2289 target_path,
2290 follow_symlinks=True,
2291 )
2292 else:
2293 shutil.copyfile(
2294 fs_path,
2295 target_path,
2296 follow_symlinks=True,
2297 )
2298 os.chmod(target_path, mode)
2299 return cast("OSFSControlPath", self[name])
2302def as_path_def(pd: str | PathDef) -> PathDef:
2303 return PathDef(pd) if isinstance(pd, str) else pd
2306def as_path_defs(paths: Iterable[str | PathDef]) -> Iterable[PathDef]:
2307 yield from (as_path_def(p) for p in paths)
2310def build_virtual_fs(
2311 paths: Iterable[str | PathDef],
2312 read_write_fs: bool = False,
2313) -> "FSPath":
2314 root_dir: FSRootDir | None = None
2315 directories: dict[str, FSPath] = {}
2316 non_directories = set()
2318 def _ensure_parent_dirs(p: str) -> None:
2319 current = p.rstrip("/")
2320 missing_dirs = []
2321 while True:
2322 current = os.path.dirname(current)
2323 if current in directories:
2324 break
2325 if current in non_directories: 2325 ↛ 2326line 2325 didn't jump to line 2326 because the condition on line 2325 was never true
2326 raise ValueError(
2327 f'Conflicting definition for "{current}". The path "{p}" wants it as a directory,'
2328 ' but it is defined as a non-directory. (Ensure dirs end with "/")'
2329 )
2330 missing_dirs.append(current)
2331 for dir_path in reversed(missing_dirs):
2332 parent_dir = directories[os.path.dirname(dir_path)]
2333 d = VirtualTestPath(os.path.basename(dir_path), parent_dir, is_dir=True)
2334 directories[dir_path] = d
2336 for path_def in as_path_defs(paths):
2337 path = path_def.path_name
2338 if path in directories or path in non_directories: 2338 ↛ 2339line 2338 didn't jump to line 2339 because the condition on line 2338 was never true
2339 raise ValueError(
2340 f'Duplicate definition of "{path}". Can be false positive if input is not in'
2341 ' "correct order" (ensure directories occur before their children)'
2342 )
2343 if root_dir is None:
2344 root_fs_path = None
2345 if path in (".", "./", "/"):
2346 root_fs_path = path_def.fs_path
2347 root_dir = FSRootDir(fs_path=root_fs_path)
2348 directories["."] = root_dir
2350 if path not in (".", "./", "/") and not path.startswith("./"):
2351 path = "./" + path
2352 if path not in (".", "./", "/"):
2353 _ensure_parent_dirs(path)
2354 if path in (".", "./"):
2355 assert "." in directories
2356 continue
2357 is_dir = False
2358 if path.endswith("/"):
2359 path = path[:-1]
2360 is_dir = True
2361 directory = directories[os.path.dirname(path)]
2362 assert not is_dir or not bool(
2363 path_def.link_target
2364 ), f"is_dir={is_dir} vs. link_target={path_def.link_target}"
2365 fs_path = VirtualTestPath(
2366 os.path.basename(path),
2367 directory,
2368 is_dir=is_dir,
2369 mode=path_def.mode,
2370 mtime=path_def.mtime,
2371 has_fs_path=path_def.has_fs_path,
2372 fs_path=path_def.fs_path,
2373 link_target=path_def.link_target,
2374 content=path_def.content,
2375 materialized_content=path_def.materialized_content,
2376 )
2377 assert not fs_path.is_detached
2378 if fs_path.is_dir:
2379 directories[fs_path.path] = fs_path
2380 else:
2381 non_directories.add(fs_path.path)
2383 if root_dir is None:
2384 root_dir = FSRootDir()
2386 root_dir.is_read_write = read_write_fs
2387 return root_dir