Coverage for src/debputy/filesystem_scan.py: 64%

1363 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2026-02-28 21:56 +0000

1import atexit 

2import contextlib 

3import dataclasses 

4import errno 

5import io 

6import operator 

7import os 

8import shutil 

9import stat 

10import subprocess 

11import tempfile 

12import time 

13import typing 

14from abc import ABC 

15from contextlib import suppress 

16from typing import ( 

17 Optional, 

18 cast, 

19 Any, 

20 ContextManager, 

21 TextIO, 

22 BinaryIO, 

23 Generic, 

24 TypeVar, 

25 overload, 

26 Literal, 

27 Never, 

28) 

29from collections.abc import Iterable, Iterator, Mapping, Callable 

30from weakref import ref, ReferenceType 

31 

32from debputy.exceptions import ( 

33 PureVirtualPathError, 

34 DebputyFSIsROError, 

35 DebputyMetadataAccessError, 

36 TestPathWithNonExistentFSPathError, 

37 SymlinkLoopError, 

38) 

39from debputy.intermediate_manifest import PathType 

40from debputy.manifest_parser.base_types import ( 

41 ROOT_DEFINITION, 

42 StaticFileSystemOwner, 

43 StaticFileSystemGroup, 

44) 

45from debputy.plugin.api.spec import ( 

46 VirtualPath, 

47 PathDef, 

48 PathMetadataReference, 

49 PMT, 

50) 

51from debputy.types import VP 

52from debputy.util import ( 

53 generated_content_dir, 

54 _error, 

55 escape_shell, 

56 assume_not_none, 

57 _normalize_path, 

58 _debug_log, 

59) 

60 

61BY_BASENAME = operator.attrgetter("name") 

62 

63FSP = TypeVar("FSP", bound="FSOverlayBase", covariant=True) 

64FSC = TypeVar("FSC", bound="FSOverlayBase", covariant=True) 

65 

66 

67BinaryOpenMode = Literal[ 

68 "rb", 

69 "r+b", 

70 "wb", 

71 "w+b", 

72 "xb", 

73 "ab", 

74] 

75TextOpenMode = Literal[ 

76 "r", 

77 "r+", 

78 "rt", 

79 "r+t", 

80 "w", 

81 "w+", 

82 "wt", 

83 "w+t", 

84 "x", 

85 "xt", 

86 "a", 

87 "at", 

88] 

89OpenMode = Literal[BinaryOpenMode, TextOpenMode] 

90 

91 

class AlwaysEmptyReadOnlyMetadataReference(PathMetadataReference[PMT]):
    """Metadata reference that never holds a value and rejects every write.

    Reading yields ``None`` when the current plugin is the owning plugin and
    raises otherwise.  Writing always raises; which exception is raised
    depends on whether the current plugin is the owner.
    """

    __slots__ = ("_metadata_type", "_owning_plugin", "_current_plugin")

    def __init__(
        self,
        owning_plugin: str,
        current_plugin: str,
        metadata_type: type[PMT],
    ) -> None:
        self._metadata_type = metadata_type
        self._current_plugin = current_plugin
        self._owning_plugin = owning_plugin

    @property
    def _is_owner(self) -> bool:
        """Whether the current plugin is the plugin owning the metadata."""
        return self._owning_plugin == self._current_plugin

    @property
    def is_present(self) -> bool:
        """Always ``False``: this reference never has a value."""
        return False

    @property
    def can_read(self) -> bool:
        """Only the owning plugin may read."""
        return self._owning_plugin == self._current_plugin

    @property
    def can_write(self) -> bool:
        """Always ``False``: this reference is read-only."""
        return False

    @property
    def value(self) -> PMT | None:
        """Return ``None`` for the owner; raise for everyone else.

        :raises DebputyMetadataAccessError: If the current plugin cannot read.
        """
        if not self.can_read:
            raise DebputyMetadataAccessError(
                f"Cannot read the metadata {self._metadata_type.__name__} owned by"
                f" {self._owning_plugin} as the metadata has not been made"
                f" readable to the plugin {self._current_plugin}."
            )
        return None

    @value.setter
    def value(self, new_value: PMT) -> None:
        """Reject the assignment.

        :raises DebputyFSIsROError: If the current plugin owns the metadata.
        :raises DebputyMetadataAccessError: For any other plugin.
        """
        if not self._is_owner:
            raise DebputyMetadataAccessError(
                f"Cannot set the metadata {self._metadata_type.__name__} owned by"
                f" {self._owning_plugin} as the metadata has not been made"
                f" read-write to the plugin {self._current_plugin}."
            )
        raise DebputyFSIsROError(
            f"Cannot set the metadata {self._metadata_type.__name__} as the path is read-only"
        )

142 

143 

@dataclasses.dataclass(slots=True)
class PathMetadataValue(Generic[PMT]):
    """Storage cell for one piece of plugin-owned metadata on a path."""

    # Name of the plugin that owns the metadata.
    owning_plugin: str
    # The type used to identify the metadata.
    metadata_type: type[PMT]
    # The stored value; None means "no value".
    value: PMT | None = None

    def can_read_value(self, current_plugin: str) -> bool:
        """Return True when `current_plugin` is the owning plugin."""
        return current_plugin == self.owning_plugin

    def can_write_value(self, current_plugin: str) -> bool:
        """Return True when `current_plugin` is the owning plugin."""
        return current_plugin == self.owning_plugin

155 

156 

class PathMetadataReferenceImplementation(PathMetadataReference[PMT]):
    """Metadata reference backed by a `PathMetadataValue` attached to a path.

    Access is granted per plugin by the underlying `PathMetadataValue`.
    Writing additionally requires the owning path to be read-write and not
    detached.
    """

    __slots__ = ("_owning_path", "_current_plugin", "_path_metadata_value")

    def __init__(
        self,
        owning_path: "VirtualPathBase",
        current_plugin: str,
        path_metadata_value: PathMetadataValue[PMT],
    ) -> None:
        self._owning_path = owning_path
        self._current_plugin = current_plugin
        self._path_metadata_value = path_metadata_value

    @property
    def is_present(self) -> bool:
        """True when the metadata is readable and holds a non-None value."""
        if not self.can_read:
            return False
        return self._path_metadata_value.value is not None

    @property
    def can_read(self) -> bool:
        """Whether the current plugin may read the value."""
        return self._path_metadata_value.can_read_value(self._current_plugin)

    @property
    def can_write(self) -> bool:
        """Whether an assignment to `value` would currently succeed.

        Requires plugin-level write access AND a read-write, attached path.
        """
        if not self._path_metadata_value.can_write_value(self._current_plugin):
            return False
        owning_path = self._owning_path
        return owning_path.is_read_write and not owning_path.is_detached

    @property
    def value(self) -> PMT | None:
        """Return the stored value.

        :raises DebputyMetadataAccessError: If the current plugin cannot read.
        """
        if self.can_read:
            return self._path_metadata_value.value
        raise DebputyMetadataAccessError(
            f"Cannot read the metadata {self._metadata_type_name} owned by"
            f" {self._owning_plugin} as the metadata has not been made"
            f" readable to the plugin {self._current_plugin}."
        )

    @value.setter
    def value(self, new_value: PMT) -> None:
        """Store `new_value` (``None`` deletes the value).

        :raises DebputyMetadataAccessError: If the current plugin has no
          write access to the metadata.
        :raises DebputyFSIsROError: If the owning path is read-only.
        :raises TypeError: If the owning path is detached.
        """
        # Check the plugin-level permission only.  Using `can_write` here would
        # also fold in the read-only/detached state of the path, which made the
        # two more specific errors below unreachable and reported a misleading
        # "not made read-write to the plugin" error to the owning plugin.
        if not self._path_metadata_value.can_write_value(self._current_plugin):
            m = "set" if new_value is not None else "delete"
            raise DebputyMetadataAccessError(
                f"Cannot {m} the metadata {self._metadata_type_name} owned by"
                f" {self._owning_plugin} as the metadata has not been made"
                f" read-write to the plugin {self._current_plugin}."
            )
        owning_path = self._owning_path
        if not owning_path.is_read_write:
            raise DebputyFSIsROError(
                f"Cannot set the metadata {self._metadata_type_name} as the path is read-only"
            )
        if owning_path.is_detached:
            raise TypeError(
                f"Cannot set the metadata {self._metadata_type_name} as the path is detached"
            )
        self._path_metadata_value.value = new_value

    @property
    def _is_owner(self) -> bool:
        """Whether the current plugin is the plugin owning the metadata."""
        return self._owning_plugin == self._current_plugin

    @property
    def _owning_plugin(self) -> str:
        """Name of the plugin owning the metadata."""
        return self._path_metadata_value.owning_plugin

    @property
    def _metadata_type_name(self) -> str:
        """The `__name__` of the metadata type (used in error messages)."""
        return self._path_metadata_value.metadata_type.__name__

228 

229 

def _cp_a(source: str, dest: str) -> None:
    """Run `cp -a source dest`; report a failure through `_error`.

    :param source: Path handed to `cp` as the source.
    :param dest: Path handed to `cp` as the destination.
    """
    argv = ["cp", "-a", source, dest]
    try:
        subprocess.check_call(argv)
    except subprocess.CalledProcessError:
        # cp has already written its own diagnostics; point the user at them.
        rendered_command = escape_shell(*argv)
        _error(
            f"The attempt to make an internal copy of {escape_shell(source)} failed. Please review the output of cp"
            f" above to understand what went wrong. The full command was: {rendered_command}"
        )

240 

241 

def _split_path(path: str) -> tuple[bool, bool, list[str]]:
    """Split a path string into lookup components.

    :param path: The path to split (`/` separated).
    :return: A tuple `(absolute, must_be_dir, parts)`.  `absolute` is True when
      the path starts with `/` (the parts then start with `"."`).
      `must_be_dir` is True when the path ends with `/` (the parts then end
      with an extra `"."`).
    """
    must_be_dir = path.endswith("/")
    absolute = path.startswith("/")
    if absolute:
        # Anchor the path so the first component is "." rather than "".
        path = "." + path
    path_parts = path.rstrip("/").split("/")
    if must_be_dir:
        path_parts.append(".")
    return absolute, must_be_dir, path_parts

252 

253 

def _root(path: VP) -> "VirtualPathBase":
    """Walk `parent_dir` links upwards from `path` until the root dir is reached.

    :param path: The path to start from.
    :return: The first path in the parent chain for which `is_root_dir()` is true.
    """
    node = path
    while True:
        if node.is_root_dir():
            break
        above = node.parent_dir
        assert above is not None  # type hint
        node = above
    assert isinstance(node, VirtualPathBase)
    return node

262 

263 

def _check_fs_path_is_file(
    fs_path: str,
    unlink_on_error: Optional["VirtualPath"] = None,
) -> None:
    """Verify that `fs_path` is a regular file with exactly one hard link.

    :param fs_path: The file system path to check (symlinks are not followed).
    :param unlink_on_error: When not None and the check fails, a best-effort
      attempt is made to remove `fs_path` before raising.
    :raises TypeError: If `fs_path` is missing, is not a regular file or has
      more than one hard link.
    """
    try:
        # FIXME: Check mode, and use the Virtual Path to cache the result as a side-effect
        st = os.lstat(fs_path)
    except FileNotFoundError:
        pass
    else:
        if stat.S_ISREG(st.st_mode) and st.st_nlink <= 1:
            return

    if unlink_on_error is not None:
        # Best-effort cleanup: the entry may be gone already or may have been
        # replaced by something `unlink` cannot remove (e.g. a directory).
        # Neither case may mask the TypeError below.
        with suppress(OSError):
            os.unlink(fs_path)
    raise TypeError(
        "The provided FS backing file was deleted, replaced with a non-file entry or it was hard"
        " linked to another file. The entry has been disconnected."
    )

287 

288 

class CurrentPluginContextManager:
    """Tracks which plugin is "current" as a stack of plugin names."""

    __slots__ = ("_plugin_names",)

    def __init__(self, initial_plugin_name: str) -> None:
        self._plugin_names = [initial_plugin_name]

    @property
    def current_plugin_name(self) -> str:
        """The plugin name most recently pushed (top of the stack)."""
        return self._plugin_names[-1]

    @contextlib.contextmanager
    def change_plugin_context(self, new_plugin_name: str) -> Iterator[str]:
        """Make `new_plugin_name` the current plugin for the `with` block.

        :param new_plugin_name: The plugin name to push.
        :return: Context manager yielding `new_plugin_name`.
        """
        self._plugin_names.append(new_plugin_name)
        try:
            yield new_plugin_name
        finally:
            # Restore the previous plugin even if the with-body raised;
            # otherwise the stack would be left pointing at the wrong plugin.
            self._plugin_names.pop()

304 

305 

class VirtualPathBase(VirtualPath, ABC):
    """Base class providing lookup, directory creation and open helpers."""

    __slots__ = ()

    def stat(self) -> os.stat_result:
        """Attempt to do stat of the underlying path (if it exists)

        *Avoid* using `stat()` whenever possible where a more specialized attribute exists. The
        `stat()` call returns the data from the file system and often, `debputy` does *not* track
        its state in the file system. As an example, if you want to know the file system mode of
        a path, please use the `mode` attribute instead.

        This never follows symlinks (it behaves like `os.lstat`). It will raise an error
        if the path is not backed by a file system object (that is, `has_fs_path` is False).

        :return: The stat result or an error.
        """
        # TODO: Remove
        raise NotImplementedError()

    @property
    def size(self) -> int:
        """Resolve the file size (`st_size`)

        This may be using `stat()` and therefore `fs_path`.

        :return: The size of the file in bytes
        """
        return self.stat().st_size

    def _orphan_safe_path(self) -> str:
        """Return a path string for use in messages."""
        return self.path

    def _rw_check(self) -> None:
        """Raise `DebputyFSIsROError` unless the path is read-write."""
        if not self.is_read_write:
            raise DebputyFSIsROError(
                f'Attempt to write to "{self._orphan_safe_path()}" failed:'
                " Debputy Virtual File system is R/O."
            )

    @property
    def is_detached(self) -> bool:
        raise NotImplementedError

    def is_root_dir(self) -> bool:
        """Return True when this path is attached and has no parent dir."""
        # The root directory is never detachable in the current setup
        return not self.is_detached and self.parent_dir is None

    def lookup(self, path: str) -> Optional["VirtualPathBase"]:
        """Resolve `path`; return the match or None if any part is missing."""
        match, missing = self.attempt_lookup(path)
        if missing:
            return None
        return match

    def attempt_lookup(self, path: str) -> tuple["VirtualPathBase", list[str]]:
        """Resolve as much of `path` as exists.

        Symlinks met before the final component are expanded.

        :param path: The path to resolve.  Absolute paths are resolved from the
          root; other paths from this path.
        :return: A tuple of the deepest path that was found and the list of
          path parts that could not be resolved (empty on a full match).
        :raises ValueError: If this path is detached or `path` escapes the root.
        :raises SymlinkLoopError: If the same symlink is traversed twice.
        """
        if self.is_detached:
            raise ValueError(
                f'Cannot perform lookup via "{self._orphan_safe_path()}": The path is detached'
            )
        absolute, must_be_dir, path_parts = _split_path(path)
        current = _root(self) if absolute else self
        # The parts are kept reversed so the next part can be popped off the end.
        path_parts.reverse()
        link_expansions: set[str] = set()
        while path_parts:
            dir_part = path_parts.pop()
            if dir_part == ".":
                continue
            if dir_part == "..":
                if current.is_root_dir():
                    raise ValueError(f'The path "{path}" escapes the root dir')
                p = current.parent_dir
                assert p is not None  # type hint
                current = cast("VirtualPathBase", p)
                continue
            try:
                current = cast("VirtualPathBase", current[dir_part])
            except KeyError:
                # Hand back the unresolved remainder in natural order.
                path_parts.append(dir_part)
                path_parts.reverse()
                if must_be_dir:
                    # Drop the trailing "." that `_split_path` added.
                    path_parts.pop()
                return current, path_parts
            if current.is_symlink and path_parts:
                if current.path in link_expansions:
                    # This is our loop detection for now. It might have some false positives where you
                    # could safely resolve the same symlink twice. However, given that this use-case is
                    # basically non-existent in practice for packaging, we just stop here for now.
                    raise SymlinkLoopError(
                        f'The path "{path}" traversed the symlink "{current.path}" multiple'
                        " times. Currently, traversing the same symlink twice is considered"
                        " a loop by `debputy` even if the path would eventually resolve."
                        " Consider filing a feature request if you have a benign case that"
                        " triggers this error."
                    )
                link_expansions.add(current.path)
                link_target = current.readlink()
                link_absolute, _, link_path_parts = _split_path(link_target)
                if link_absolute:
                    current = _root(current)
                else:
                    current = cast(
                        "VirtualPathBase", assume_not_none(current.parent_dir)
                    )
                # Queue the link target's parts ahead of the remaining parts.
                link_path_parts.reverse()
                path_parts.extend(link_path_parts)
        return current, []

    def mkdirs(self, path: str) -> "VirtualPath":
        """Ensure `path` exists as a directory, creating missing parts.

        :param path: The directory path to create.
        :return: The path of the (possibly new) directory.
        :raises ValueError: If an existing part of the path is not a directory.
        """
        current: VirtualPath
        current, missing_parts = self.attempt_lookup(
            f"{path}/" if not path.endswith("/") else path
        )
        if not current.is_dir:
            raise ValueError(
                f'mkdirs of "{path}" failed: This would require {current.path} to not exist OR be'
                " a directory. However, that path exist AND is a not directory."
            )
        for missing_part in missing_parts:
            assert missing_part not in (".", "..")
            current = current.mkdir(missing_part)
        return current

    def prune_if_empty_dir(self) -> None:
        """Remove this and all (now) empty parent directories

        Same as: `rmdir --ignore-fail-on-non-empty --parents`

        This operation may cause the path (and any of its parent directories) to become "detached"
        and therefore unsafe to use in further operations.
        """
        self._rw_check()

        if not self.is_dir:
            raise TypeError(f"{self._orphan_safe_path()} is not a directory")
        # No-op for the root directory. There is never a case where you want to delete this directory
        # (and even if you could, debputy will need it for technical reasons, so the root dir stays)
        if any(self.iterdir()) or self.is_root_dir():
            return
        # Grab the parent before unlinking this path.
        parent_dir = self.parent_dir

        # Recursive does not matter; we already know the directory is empty.
        self.unlink()

        if parent_dir:
            parent_dir.prune_if_empty_dir()

    def _current_plugin(self) -> str:
        """Return the current plugin as reported by the root directory.

        :raises TypeError: If this path is detached.
        """
        if self.is_detached:
            raise TypeError("Cannot resolve the current plugin; path is detached")
        return cast("FSRootDir", _root(self))._current_plugin()

    @overload
    def open_child(
        self,
        name: str,
        mode: TextOpenMode = "r",
        buffering: int = -1,
    ) -> TextIO: ...

    @overload
    def open_child(
        self,
        name: str,
        mode: BinaryOpenMode,
        buffering: int = -1,
    ) -> BinaryIO: ...

    @contextlib.contextmanager
    def open_child(self, name, mode="r", buffering=-1):
        """Open a child path of the current directory in a given mode. Usually used with a context manager

        The path is opened according to the `mode` parameter similar to the built-in `open` in Python.
        The following symbols are accepted with the same meaning as Python's open:
         * `r`
         * `w`
         * `x`
         * `a`
         * `+`
         * `b`
         * `t`

        Like Python's `open`, this can create a new file provided the file system is in read-write mode.
        Though unlike Python's open, symlinks are not followed and cannot be opened. Any newly created
        file will start with (os.stat) mode of 0o0644. The (os.stat) mode of existing paths are left
        as-is.

        :param name: The name of the child path to open. Must be a basename.
        :param mode: The mode to open the file with such as `r` or `w`. See Python's `open` for more
          examples.
        :param buffering: Same as open(..., buffering=...) where supported. Notably during
          testing, the content may be purely in memory and use a BytesIO/StringIO
          (which does not accept that parameter, but then it is buffered in a different way)
        :return: The file handle.
        :raises ValueError: If the mode has `r` and the child does not exist, or the mode has
          `x` and the child already exists.
        """
        existing = self.get(name)
        if "r" in mode:
            if existing is None:
                raise ValueError(
                    f"Path {self.path}/{name} does not exist and mode had `r`"
                )
            if "+" not in mode:
                # open(byte_io="b" in mode) matches no typed signature overload.
                if "b" in mode:
                    with existing.open(byte_io=True, buffering=buffering) as fd:
                        yield fd
                else:
                    with existing.open(byte_io=False, buffering=buffering) as fd:
                        yield fd
                # Pure read is done here. Without this return the generator would
                # fall through to the write path below and yield a second time,
                # which contextlib reports as "generator didn't stop".
                return

        encoding = None if "b" in mode else "utf-8"

        if existing:
            if "x" in mode:
                raise ValueError(
                    f"Path {existing.path} already exists and mode had `x`"
                )
            with (
                existing.replace_fs_path_content() as fs_path,
                open(fs_path, mode, encoding=encoding) as fd,
            ):
                yield fd
        else:
            assert "r" not in mode
            # unlink_if_exists=False as a precaution (the "already exists" should not end up here).
            with (
                self.add_file(name, mode=0o0644, unlink_if_exists=False) as new_path,
                open(new_path.fs_path, mode, encoding=encoding) as fd,
            ):
                yield fd

534 

535 

536class InMemoryVirtualPathBase(VirtualPathBase, ABC): 

537 __slots__ = ( 

538 "_basename", 

539 "_parent_dir", 

540 "_path_cache", 

541 "_parent_path_cache", 

542 "_last_known_parent_path", 

543 "_mode", 

544 "_owner", 

545 "_group", 

546 "_mtime", 

547 "_stat_cache", 

548 "_metadata", 

549 "__weakref__", 

550 ) 

551 

def __init__(
    self,
    basename: str,
    parent: Optional["InMemoryVirtualPathBase"],
    initial_mode: int | None = None,
    mtime: float | None = None,
    stat_cache: os.stat_result | None = None,
) -> None:
    """Set up the path state and, when a parent is given, attach to it.

    :param basename: The name of this path.
    :param parent: The directory to attach to, or None to start unattached.
    :param initial_mode: Initial value for the mode (None leaves it unset).
    :param mtime: Initial value for the mtime (None leaves it unset).
    :param stat_cache: Initial value for the cached stat result.
    """
    self._basename = basename
    self._mode = initial_mode
    self._mtime = mtime
    self._stat_cache = stat_cache

    # Path caches; all start out empty.
    self._path_cache: str | None = None
    self._parent_path_cache: str | None = None
    self._last_known_parent_path: str | None = None

    self._metadata: dict[tuple[str, type[Any]], PathMetadataValue[Any]] = {}

    # The `_owner` and `_group` is directly access outside the class via `tar_owner_info`
    self._owner = ROOT_DEFINITION
    self._group = ROOT_DEFINITION

    # The self._parent_dir = None is to create `_parent_dir` because the parent_dir setter calls
    # is_orphaned, which assumes self._parent_dir is an attribute.
    self._parent_dir: ReferenceType["InMemoryVirtualPathBase"] | None = None
    if parent is None:
        return
    self.parent_dir = parent

577 

@property
def name(self) -> str:
    """The basename of this path."""
    return self._basename


@name.setter
def name(self, new_name: str) -> None:
    """Rename this path.

    :param new_name: The new basename.  Assigning the current name is a no-op.
    :raises DebputyFSIsROError: If the path is not read-write (via `_rw_check`).
    """
    self._rw_check()
    if new_name == self._basename:
        return
    if self.is_detached:
        self._basename = new_name
        # The cached path embeds the old basename; drop the cache key so
        # `path` recomputes it instead of returning the stale value.
        self._parent_path_cache = None
        return
    parent = self.parent_dir
    # This little parent_dir dance ensures the parent dir detects the rename properly
    self.parent_dir = None
    self._basename = new_name
    self.parent_dir = parent

596 

# Overridden for new return type
def iterdir(self) -> Iterable["InMemoryVirtualPathBase"]:
    """Not implemented at this level; always raises `NotImplementedError`."""
    raise NotImplementedError

600 

def all_paths(self) -> Iterable["InMemoryVirtualPathBase"]:
    """Yield this path followed by everything beneath it.

    Each directory is yielded before its children and siblings come in
    basename order.  The children of a path that is no longer a directory, or
    that has become detached by the time iteration resumes, are not visited.
    """
    yield self
    if self.is_dir:
        # Reverse-sorted so that popping from the end yields ascending order.
        pending = sorted(self.iterdir(), key=BY_BASENAME, reverse=True)
        while pending:
            node = pending.pop()
            yield node
            if not node.is_dir or node.is_detached:
                continue
            pending.extend(sorted(node.iterdir(), key=BY_BASENAME, reverse=True))

612 

def walk(
    self,
) -> Iterable[tuple["InMemoryVirtualPathBase", list["InMemoryVirtualPathBase"]]]:
    """Yield `(path, children)` pairs, parents before their children.

    `children` is sorted by basename.  For a non-directory a single pair with
    an empty child list is produced.  The yielded list is the same object that
    is used to continue the walk once iteration resumes.
    """
    # FIXME: can this be more "os.walk"-like without making it harder to implement?
    if not self.is_dir:
        yield self, []
        return
    pending = [self]
    while pending:
        node = pending.pop()
        entries = sorted(node.iterdir(), key=BY_BASENAME)
        assert not entries or node.is_dir
        yield node, entries
        # Removing the directory counts as discarding the children.
        if node.is_detached:
            continue
        pending.extend(reversed(entries))

630 

def _orphan_safe_path(self) -> str:
    """Return `path`, or an `<orphaned>/...` placeholder.

    The placeholder is used only for a detached path that has no last known
    parent path.
    """
    if self.is_detached and self._last_known_parent_path is None:
        return f"<orphaned>/{self.name}"
    return self.path

635 

@property
def is_detached(self) -> bool:
    """True when no live parent exists or when the parent itself is detached."""
    parent_ref = self._parent_dir
    # The parent is held via a weak reference; calling it resolves the referent.
    live_parent = parent_ref() if parent_ref is not None else None
    if live_parent is None:
        return True
    return live_parent.is_detached

645 

646 # The __getitem__ behaves like __getitem__ from Dict but __iter__ would ideally work like a Sequence. 

647 # However, that does not feel compatible, so lets force people to use .children instead for the Sequence 

648 # behavior to avoid surprises for now. 

649 # (Maybe it is a non-issue, but it is easier to add the API later than to remove it once we have committed 

650 # to using it) 

651 __iter__ = None 

652 

# Overridden for new return type
def __getitem__(self, key: object) -> "InMemoryVirtualPathBase":
    """Not implemented at this level; always raises `NotImplementedError`."""
    raise NotImplementedError

656 

# Overridden for new return type
def get(self, key: str) -> "InMemoryVirtualPathBase | None":
    """Delegate to the parent class' `get`; only the declared return type differs."""
    inherited_result = super().get(key)
    return typing.cast("InMemoryVirtualPathBase | None", inherited_result)

660 

def _add_child(self, child: "InMemoryVirtualPathBase") -> None:
    """Default implementation: always raises `TypeError`."""
    location = self._orphan_safe_path()
    raise TypeError(
        f"{location!r} is not a directory (or did not implement this method)"
    )

665 

def _remove_child(self, child: "InMemoryVirtualPathBase") -> None:
    """Default implementation: always raises `TypeError`."""
    location = self._orphan_safe_path()
    raise TypeError(
        f"{location!r} is not a directory (or did not implement this method)"
    )

670 

@property
def path(self) -> str:
    """The parent's path joined with this path's name.

    The result is cached and reused for as long as the parent's path is
    unchanged.

    :raises ReferenceError: If no parent path is available.
    """
    parent_path = self.parent_dir_path
    cached_for = self._parent_path_cache
    if cached_for is not None and cached_for == parent_path:
        return assume_not_none(self._path_cache)
    if parent_path is None:
        raise ReferenceError(
            f"The path {self.name} is detached! {self.__class__.__name__}"
        )
    self._parent_path_cache = parent_path
    joined = os.path.join(parent_path, self.name)
    self._path_cache = joined
    return joined

687 

@property
def parent_dir(self) -> Optional["InMemoryVirtualPathBase"]:
    """The parent directory, resolved from the weak reference.

    :raises ReferenceError: If there is no parent reference or its referent is gone.
    """
    parent_ref = self._parent_dir
    resolved = None if parent_ref is None else parent_ref()
    if resolved is None:
        raise ReferenceError(
            f"The path {self.name} is detached! {self.__class__.__name__}"
        )
    return resolved


@parent_dir.setter
def parent_dir(self, new_parent: Optional["InMemoryVirtualPathBase"]) -> None:
    """Move this path under `new_parent`, or detach it when given None.

    :param new_parent: The new parent directory or None.
    :raises ValueError: If `new_parent` is not a directory.
    """
    self._rw_check()
    attaching = new_parent is not None
    if attaching:
        if not new_parent.is_dir:
            raise ValueError(
                f"The parent {new_parent._orphan_safe_path()!r} must be a directory"
            )
        new_parent._rw_check()
    self._last_known_parent_path = None
    previous_parent = None
    if not self.is_detached:
        # Leave the current parent before (possibly) joining the new one.
        previous_parent = self.parent_dir
        assume_not_none(previous_parent)._remove_child(self)
    if attaching:
        self._parent_dir = ref(new_parent)
        new_parent._add_child(self)
    else:
        # Remember where the path used to live.
        if previous_parent is not None and not previous_parent.is_detached:
            self._last_known_parent_path = previous_parent.path
        self._parent_dir = None
    # Any cached path was computed against the old parent.
    self._parent_path_cache = None

720 

721 @property 

722 def parent_dir_path(self) -> str | None: 

723 if self.is_detached: 723 ↛ 724line 723 didn't jump to line 724 because the condition on line 723 was never true

724 return self._last_known_parent_path 

725 return assume_not_none(self.parent_dir).path 

726 

def chown(
    self,
    owner: StaticFileSystemOwner | None,
    group: StaticFileSystemGroup | None,
) -> None:
    """Change the owner/group of this path

    :param owner: The desired owner definition for this path. If None, then no change of owner is performed.
    :param group: The desired group definition for this path. If None, then no change of group is performed.
    """
    self._rw_check()

    # Only the `ownership_definition` of each argument is stored.
    if owner is not None:
        new_owner = owner.ownership_definition
        self._owner = new_owner
    if group is not None:
        new_group = group.ownership_definition
        self._group = new_group

743 

def stat(self) -> os.stat_result:
    """Return the stat result, calling `_uncached_stat()` only when nothing is cached."""
    cached = self._stat_cache
    if cached is not None:
        return cached
    fresh = self._uncached_stat()
    self._stat_cache = fresh
    return fresh

750 

def _uncached_stat(self) -> os.stat_result:
    """Not implemented at this level; `stat()` calls this when nothing is cached."""
    raise NotImplementedError

753 

@property
def mode(self) -> int:
    """The mode of this path; taken from `stat()` on first use when unset."""
    known = self._mode
    if known is not None:
        return known
    from_stat = stat.S_IMODE(self.stat().st_mode)
    self._mode = from_stat
    return from_stat


@mode.setter
def mode(self, new_mode: int) -> None:
    """Set the mode.

    :param new_mode: The new mode.  Must include 0o700 for directories and
      0o400 for other paths.
    :raises ValueError: If `new_mode` lacks the required bits.
    """
    self._rw_check()
    required = 0o700 if self.is_dir else 0o400
    if (new_mode & required) == required:
        self._mode = new_mode
        return
    omode = oct(new_mode)[2:]
    omin = oct(required)[2:]
    raise ValueError(
        f'Attempt to set mode of path "{self._orphan_safe_path()}" to {omode} rejected;'
        f" Minimum requirements are {omin} (read-bit and, for dirs, exec bit for user)."
        " There are no paths that do not need these requirements met and they can cause"
        " problems during build or on the final system."
    )

776 

def _ensure_min_mode(self) -> None:
    """Make sure the user permission bits meet the minimum.

    The minimum is 0o700 for directories and 0o600 for other paths.  When the
    path has a file system path and the mode lacks the minimum, the file
    system entry is chmod-ed as well.  The mode of this path is always
    OR-ed with the minimum.
    """
    min_bit = 0o700 if self.is_dir else 0o600
    # Compare against `min_bit` (not a fixed 0o600) so that a directory that
    # only lacks the user exec bit is fixed on the file system as well.
    if self.has_fs_path and (self.mode & min_bit) != min_bit:
        try:
            fs_path = self.fs_path
        except TestPathWithNonExistentFSPathError:
            # No usable file system path; only the mode of this path is adjusted.
            pass
        else:
            st = os.stat(fs_path)
            new_fs_mode = stat.S_IMODE(st.st_mode) | min_bit
            _debug_log(
                f"Applying chmod {oct(min_bit)[2:]} {fs_path} ({self.path}) to avoid problems down the line"
            )
            os.chmod(fs_path, new_fs_mode)
    self.mode |= min_bit

792 

def _resolve_initial_mtime(self) -> float:
    """Not implemented at this level; `mtime` calls this when no mtime is stored."""
    raise NotImplementedError

795 

@property
def mtime(self) -> float:
    """The mtime of this path; resolved via `_resolve_initial_mtime()` on first use when unset."""
    known = self._mtime
    if known is not None:
        return known
    resolved = self._resolve_initial_mtime()
    self._mtime = resolved
    return resolved


@mtime.setter
def mtime(self, new_mtime: float) -> None:
    """Set the mtime after the read-write check."""
    self._rw_check()
    self._mtime = new_mtime

808 

@property
def _can_replace_inline(self) -> bool:
    """Always ``False`` at this level."""
    return False

812 

@contextlib.contextmanager
def add_file(
    self,
    name: str,
    *,
    unlink_if_exists: bool = True,
    use_fs_path_mode: bool = False,
    mode: int = 0o0644,
    mtime: float | None = None,
    # Special-case parameters that are not exposed in the API
    fs_basename_matters: bool = False,
    subdir_key: str | None = None,
) -> Iterator["InMemoryVirtualPathBase"]:
    """Create a new file in this directory; used as a context manager.

    An empty backing file is created in the generated content directory and
    the new child path is yielded.  Once the `with` body completes, the
    backing file is checked with `_check_fs_path_is_file`, the child's caches
    are reset and (unless `use_fs_path_mode`) the child's mode is set.

    :param name: Basename of the new file (no `/`, not `.` or `..`).
    :param unlink_if_exists: When True, an existing child with that name is
      unlinked first; when False, an existing child is an error.
    :param use_fs_path_mode: When True, the backing file is chmod-ed to `mode`
      after the `with` body and the child's mode is not assigned directly.
    :param mode: The mode for the new file.
    :param mtime: Passed on to the new child path.
    :param fs_basename_matters: When True, the backing file is named exactly
      `name` (requires `subdir_key`).
    :param subdir_key: Passed to `generated_content_dir`.
    :raises ValueError: On an invalid name, an existing child with
      `unlink_if_exists=False`, or `fs_basename_matters` without `subdir_key`.
    :raises TypeError: If this path is not a directory.
    """
    if "/" in name or name in {".", ".."}:
        raise ValueError(f'Invalid file name: "{name}"')
    if not self.is_dir:
        raise TypeError(
            f"Cannot create {self._orphan_safe_path()}/{name}:"
            f" {self._orphan_safe_path()} is not a directory"
        )
    self._rw_check()
    existing = self.get(name)
    if existing is not None:
        if not unlink_if_exists:
            # The message names the actual parameter (it used to say `exist_ok`).
            raise ValueError(
                f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
                " and unlink_if_exists was False"
            )
        existing.unlink(recursive=False)

    if fs_basename_matters and subdir_key is None:
        raise ValueError(
            "When fs_basename_matters is True, a subdir_key must be provided"
        )

    directory = generated_content_dir(subdir_key=subdir_key)

    if fs_basename_matters:
        fs_path = os.path.join(directory, name)
        with open(fs_path, "xb"):
            # Ensure that the fs_path exists
            pass
    else:
        # delete=False: the file must outlive the handle, only the name is needed.
        with tempfile.NamedTemporaryFile(
            dir=directory, suffix=f"__{name}", delete=False
        ) as fd:
            fs_path = fd.name

    child = FSBackedFilePath(
        name,
        self,
        fs_path,
        replaceable_inline=True,
        mtime=mtime,
    )
    yield child

    if use_fs_path_mode:
        # Ensure the caller can see the current mode
        os.chmod(fs_path, mode)
    _check_fs_path_is_file(fs_path, unlink_on_error=child)
    child._reset_caches()
    if not use_fs_path_mode:
        child.mode = mode

885 

def insert_file_from_fs_path(
    self,
    name: str,
    fs_path: str,
    *,
    exist_ok: bool = True,
    use_fs_path_mode: bool = False,
    mode: int = 0o0644,
    require_copy_on_write: bool = True,
    follow_symlinks: bool = True,
    reference_path: VirtualPath | None = None,
) -> "InMemoryVirtualPathBase":
    """Attach an existing file system file to this directory as ``name``.

    :param name: Basename of the new child. It must not contain "/" and must
      not be "." or "..".
    :param fs_path: Path on the real file system that backs the new child.
    :param exist_ok: When False, a ``ValueError`` is raised if this directory
      already contains ``name``.
    :param use_fs_path_mode: When True, no explicit initial mode is passed to
      the new path (``mode`` is ignored).
    :param mode: Initial mode for the new path unless ``use_fs_path_mode`` is True.
    :param require_copy_on_write: When True, the new path is created as *not*
      replaceable inline.
    :param follow_symlinks: When True, ``fs_path`` is resolved via
      ``os.path.realpath(..., strict=True)`` before use. Cannot be combined
      with ``reference_path``.
    :param reference_path: Optional path forwarded to the new child. When it
      is provided, the ``lstat`` based validation of ``fs_path`` is skipped.
    :return: The newly created ``FSBackedFilePath``.
    :raises ValueError: On an invalid ``name``, an existing child when
      ``exist_ok`` is False, ``reference_path`` combined with
      ``follow_symlinks``, or when the path is a directory / not a regular file.
    :raises TypeError: If this path is not a directory.
    """
    if "/" in name or name in {".", ".."}:
        raise ValueError(f'Invalid file name: "{name}"')
    if not self.is_dir:
        raise TypeError(
            f"Cannot create {self._orphan_safe_path()}/{name}:"
            f" {self._orphan_safe_path()} is not a directory"
        )
    self._rw_check()
    if name in self and not exist_ok:
        raise ValueError(
            f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
            f" and exist_ok was False"
        )

    if follow_symlinks and reference_path is not None:
        raise ValueError("The reference_path cannot be used with follow_symlinks")
    resolved_fs_path = (
        os.path.realpath(fs_path, strict=True) if follow_symlinks else fs_path
    )

    # ``None`` tells the new path that no explicit mode was requested.
    fmode: int | None = None if use_fs_path_mode else mode

    st = None
    if reference_path is None:
        # Without a reference path, validate the backing file ourselves and
        # hand the stat result over as the initial stat cache.
        st = os.lstat(resolved_fs_path)
        file_type_bits = st.st_mode
        if stat.S_ISDIR(file_type_bits):
            raise ValueError(
                f'The provided path "{fs_path}" is a directory. However, this'
                " method does not support directories"
            )
        if not stat.S_ISREG(file_type_bits):
            if follow_symlinks:
                raise ValueError(
                    f"The resolved fs_path ({resolved_fs_path}) was not a file."
                )
            raise ValueError(f"The provided fs_path ({fs_path}) was not a file.")

    return FSBackedFilePath(
        name,
        self,
        resolved_fs_path,
        initial_mode=fmode,
        stat_cache=st,
        replaceable_inline=not require_copy_on_write,
        reference_path=reference_path,
    )

947 

def add_symlink(
    self,
    link_name: str,
    link_target: str,
    *,
    reference_path: VirtualPath | None = None,
) -> "InMemoryVirtualPathBase":
    """Create a symlink called ``link_name`` in this directory.

    Any existing entry with the same name is unlinked first
    (non-recursively), similar to ``ln -sf``.

    :param link_name: Basename of the symlink. It must not contain "/" and
      must not be "." or "..".
    :param link_target: The target the symlink points to.
    :param reference_path: Optional reference path forwarded to the new symlink.
    :return: The newly created ``SymlinkVirtualPath``.
    :raises ValueError: If ``link_name`` is not a valid basename.
    :raises TypeError: If this path is not a directory.
    """
    is_valid_basename = "/" not in link_name and link_name not in {".", ".."}
    if not is_valid_basename:
        raise ValueError(
            f'Invalid file name: "{link_name}" (it must be a valid basename)'
        )
    if not self.is_dir:
        raise TypeError(
            f"Cannot create {self._orphan_safe_path()}/{link_name}:"
            f" {self._orphan_safe_path()} is not a directory"
        )
    self._rw_check()

    previous_entry = self.get(link_name)
    if previous_entry:
        # Non-recursive on purpose: the unlink call decides what happens
        # when the existing entry is a non-empty directory.
        previous_entry.unlink(recursive=False)

    return SymlinkVirtualPath(
        link_name,
        self,
        link_target,
        reference_path=reference_path,
    )

977 

def mkdir(
    self,
    name: str,
    *,
    reference_path: VirtualPath | None = None,
) -> "InMemoryVirtualPathBase":
    """Create a new directory called ``name`` inside this directory.

    :param name: Basename of the directory. It must not contain "/" and must
      not be "." or "..".
    :param reference_path: Optional reference path for the new directory. If
      provided, it must itself be a directory.
    :return: The newly created ``VirtualDirectoryFSPath``.
    :raises ValueError: If ``name`` is not a valid basename, if
      ``reference_path`` is not a directory, or if ``name`` already exists.
    :raises TypeError: If this path is not a directory.
    """
    is_valid_basename = "/" not in name and name not in {".", ".."}
    if not is_valid_basename:
        raise ValueError(
            f'Invalid file name: "{name}" (it must be a valid basename)'
        )
    if not self.is_dir:
        raise TypeError(
            f"Cannot create {self._orphan_safe_path()}/{name}:"
            f" {self._orphan_safe_path()} is not a directory"
        )
    if not (reference_path is None or reference_path.is_dir):
        raise ValueError(
            f'The provided fs_path "{reference_path.fs_path}" exist but it is not a directory!'
        )
    self._rw_check()

    # Unlike add_symlink, an existing entry is never replaced.
    clash = self.get(name)
    if clash:
        raise ValueError(f"Path {clash.path} already exist")
    return VirtualDirectoryFSPath(name, self, reference_path=reference_path)

1003 

def mkdirs(self, path: str) -> "InMemoryVirtualPathBase":
    """Delegate to the parent class ``mkdirs`` and narrow the static return type.

    :param path: The path to create; passed to the parent implementation as-is.
    :return: Whatever the parent implementation returned (the ``cast`` has no
      runtime effect).
    """
    created = super().mkdirs(path)
    return cast("InMemoryVirtualPathBase", created)

1006 

@property
def is_read_write(self) -> bool:
    """When true, the file system entry may be mutated

    A detached path is always considered mutable. An attached path takes
    the answer from its parent directory.

    :return: Whether file system mutations are permitted.
    """
    if not self.is_detached:
        return assume_not_none(self.parent_dir).is_read_write
    return True

1016 

def unlink(self, *, recursive: bool = False) -> None:
    """Unlink a file or a directory

    This operation will detach the path from the file system (causing "is_detached" to return True).
    Calling it on an already detached path is a no-op.

    Note that the root directory cannot be deleted.

    :param recursive: If True, then non-empty directories will be unlinked as well removing everything inside them
      as well. When False, an error is raised if the path is a non-empty directory
    :raises ValueError: If the path is a non-empty directory and ``recursive`` is False.
    """
    if self.is_detached:
        return
    if not recursive:
        has_content = self.is_dir and any(self.iterdir())
        if has_content:
            raise ValueError(
                f'Refusing to unlink "{self.path}": The directory was not empty and recursive was False'
            )
    # The .parent_dir setter does a _rw_check() for us.
    self.parent_dir = None

1035 

def _reset_caches(self) -> None:
    """Forget the cached mtime and the cached stat result of this path."""
    self._mtime = self._stat_cache = None

1039 

def metadata(
    self,
    metadata_type: type[PMT],
    *,
    owning_plugin: str | None = None,
) -> PathMetadataReference[PMT]:
    """Return a reference to the metadata of ``metadata_type`` for this path.

    :param metadata_type: The type of metadata that is requested.
    :param owning_plugin: Name of the plugin owning the metadata. Defaults
      to the plugin that is currently active.
    :return: A reference bound to the stored metadata entry. If no entry
      exists yet, one is created on read-write paths, while read-only paths
      get an ``AlwaysEmptyReadOnlyMetadataReference`` and nothing is stored.
    :raises TypeError: If no entry exists and the path is detached.
    """
    active_plugin = self._current_plugin()
    owner = active_plugin if owning_plugin is None else owning_plugin
    key = (owner, metadata_type)

    stored = self._metadata.get(key)
    if stored is not None:
        return PathMetadataReferenceImplementation(self, active_plugin, stored)

    if self.is_detached:
        raise TypeError(
            f"Cannot access the metadata {metadata_type.__name__}: The path is detached."
        )
    if not self.is_read_write:
        return AlwaysEmptyReadOnlyMetadataReference(
            owner,
            active_plugin,
            metadata_type,
        )

    stored = PathMetadataValue(owner, metadata_type)
    self._metadata[key] = stored
    return PathMetadataReferenceImplementation(self, active_plugin, stored)

1069 

@contextlib.contextmanager
def replace_fs_path_content(
    self,
    *,
    use_fs_path_mode: bool = False,
) -> Iterator[str]:
    """Context manager that yields a file system path whose content may be replaced.

    If the path cannot be replaced inline, its backing file is first copied
    to a new temporary file in the generated content directory and the path
    is re-pointed at that copy (via ``_replaced_path``).

    The steps after the ``yield`` are not wrapped in ``try``/``finally``, so
    they only run when the ``with`` body completes without raising.

    :param use_fs_path_mode: When False (the default), the mode the path had
      before the ``yield`` is re-applied to the file with ``os.chmod``
      afterwards. When True, that ``chmod`` is skipped.
    :return: A context manager yielding the file system path to modify.
    :raises TypeError: If this path is not a file.
    """
    if not self.is_file:
        raise TypeError(
            f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file'
        )
    self._rw_check()
    fs_path = self.fs_path
    if not self._can_replace_inline:
        # The current backing file must not be modified; work on a copy.
        fs_path = self.fs_path
        directory = generated_content_dir()
        with tempfile.NamedTemporaryFile(
            dir=directory, suffix=f"__{self.name}", delete=False
        ) as new_path_fd:
            # Only the reserved file name is needed (delete=False keeps the
            # file around); the descriptor itself is closed right away.
            new_path_fd.close()
            # NOTE(review): _cp_a is defined elsewhere; presumably it copies
            # the file including its attributes -- confirm.
            _cp_a(fs_path, new_path_fd.name)
            fs_path = new_path_fd.name
            self._replaced_path(fs_path)
            assert self.fs_path == fs_path

    # Apply a known mtime (if any) to the file before handing it out.
    current_mtime = self._mtime
    if current_mtime is not None:
        os.utime(fs_path, (current_mtime, current_mtime))

    # Captured before the yield so it can be re-applied afterwards.
    current_mode = self.mode
    yield fs_path
    _check_fs_path_is_file(fs_path, unlink_on_error=self)
    if not use_fs_path_mode:
        os.chmod(fs_path, current_mode)
    self._reset_caches()

1104 

def _replaced_path(self, new_fs_path: str) -> None:
    """Hook invoked by ``replace_fs_path_content`` with the new backing path.

    This base version is not implemented; subclasses that support
    replacing their backing file must override it.

    :param new_fs_path: The file system path that now backs this path.
    :raises NotImplementedError: Always, in this base version.
    """
    raise NotImplementedError()

1107 

1108 

class InMemoryVirtualPath(InMemoryVirtualPathBase, ABC):
    """In-memory path that keeps its children in a ``dict`` keyed by basename.

    The dict is ``None`` until the first child is added (unless one was
    passed to the constructor).
    """

    __slots__ = ("_children",)

    def __init__(
        self,
        basename: str,
        parent: Optional["InMemoryVirtualPathBase"],
        children: dict[str, "InMemoryVirtualPathBase"] | None = None,
        initial_mode: int | None = None,
        mtime: float | None = None,
        stat_cache: os.stat_result | None = None,
    ) -> None:
        """Initialise the path; ``children`` is stored as-is (not copied)."""
        super().__init__(basename, parent, initial_mode, mtime, stat_cache)
        self._children = children

    def __repr__(self) -> str:
        known_children = self._children
        child_count = len(known_children) if known_children else 0
        return (
            f"{self.__class__.__name__}({self._orphan_safe_path()!r},"
            f" is_file={self.is_file},"
            f" is_dir={self.is_dir},"
            f" is_symlink={self.is_symlink},"
            f" has_fs_path={self.has_fs_path},"
            f" children_len={child_count})"
        )

    def iterdir(self) -> Iterable["InMemoryVirtualPathBase"]:
        """Yield the children of this path (nothing if there are none)."""
        known_children = self._children
        if known_children is None:
            return
        yield from known_children.values()

    def __getitem__(self, key) -> "InMemoryVirtualPathBase":
        """Look up a child by basename, or by a path object (its name is used).

        :raises KeyError: If there is no such child.
        """
        known_children = self._children
        if known_children is None:
            raise KeyError(
                f"{key} (note: {self._orphan_safe_path()!r} has no children)"
            )
        lookup = key.name if isinstance(key, InMemoryVirtualPathBase) else key
        return known_children[lookup]

    def __delitem__(self, key) -> None:
        """Remove the child stored under ``key`` (after a read-write check).

        :raises KeyError: If there is no such child.
        """
        self._rw_check()
        known_children = self._children
        if known_children is None:
            raise KeyError(key)
        del known_children[key]

    def _remove_child(self, child: "InMemoryVirtualPathBase") -> None:
        """Drop ``child`` from the children dict; no read-write check is done here."""
        del assume_not_none(self._children)[child.name]

    def _add_child(self, child: "InMemoryVirtualPathBase") -> None:
        """Register ``child`` under its name, replacing any same-named child.

        :raises TypeError: If this path is not a directory.
        """
        self._rw_check()
        if not self.is_dir:
            raise TypeError(f"{self._orphan_safe_path()!r} is not a directory")
        if self._children is None:
            self._children = {}

        # A same-named child is unlinked (recursively) before the new one
        # takes over the name.
        previous = self.get(child.name)
        if previous is not None:
            previous.unlink(recursive=True)
        self._children[child.name] = child

    def _uncached_stat(self) -> os.stat_result:
        """Return a fresh ``lstat`` of the backing file system path."""
        return os.lstat(self.fs_path)

    def _resolve_initial_mtime(self) -> float:
        """Use the ``st_mtime`` of ``self.stat()`` as the initial mtime."""
        return self.stat().st_mtime

1181 

1182 

class InMemoryOverlayFSDirectory(InMemoryVirtualPathBase):
    """Mutable in-memory directory layered on top of another directory.

    Three pieces of state make up the overlay:

    * ``_virtual_children``: children known to the overlay, by basename.
    * ``_deleted_children``: basenames that were removed from the overlay.
      These hide a same-named entry of the underlying directory.
    * ``_has_underlying_read_children``: whether the children of the
      underlying directory have already been wrapped and merged in.
    """

    __slots__ = (
        "_underlying_directory",
        "_virtual_children",
        "_deleted_children",
        "_has_underlying_read_children",
    )

    def __init__(
        self,
        underlying_directory: VirtualPath,
        basename: str,
        parent: Optional["InMemoryVirtualPathBase"],
        initial_mode: int | None = None,
        mtime: float | None = None,
        stat_cache: os.stat_result | None = None,
    ) -> None:
        super().__init__(
            basename,
            parent,
            initial_mode,
            mtime,
            stat_cache,
        )
        self._underlying_directory: VirtualPath = underlying_directory
        self._virtual_children: dict[str, InMemoryVirtualPathBase] = {}
        self._deleted_children = set[str]()
        self._has_underlying_read_children = False

    def __getitem__(self, key: object) -> "InMemoryVirtualPathBase":
        """Look up a child by basename.

        :raises KeyError: If ``key`` is not a string, was deleted from the
          overlay, or exists neither in the overlay nor in the underlying
          directory.
        """
        if not isinstance(key, str) or key in self._deleted_children:
            raise KeyError(key)
        # Compare against None explicitly; whether a path object is truthy
        # is irrelevant for "does this child exist".
        known = self._virtual_children.get(key)
        if known is not None:
            return known
        if not self._ensure_read_underlying_children():
            # The underlying children were merged earlier, so the miss
            # above is definitive.
            raise KeyError(key)
        return self._virtual_children[key]

    def _ensure_read_underlying_children(self) -> bool:
        """Merge the children of the underlying directory into the overlay once.

        Entries that are already present in the overlay, or that were
        deleted from it, are left alone.

        :return: True if the underlying directory was read by this call,
          False if that had already been done earlier.
        """
        if self._has_underlying_read_children:
            return False
        # Flip the flag before wrapping the children, so that a lookup made
        # while a child is being attached cannot re-enter this method. It is
        # rolled back below if the merge fails, so a later call can retry.
        self._has_underlying_read_children = True
        try:
            for real_child in self._underlying_directory.iterdir():
                basename = real_child.name
                if (
                    basename in self._virtual_children
                    or basename in self._deleted_children
                ):
                    continue
                child: InMemoryVirtualPathBase
                if real_child.is_dir:
                    child = InMemoryOverlayFSDirectory(
                        real_child,
                        basename,
                        self,
                    )
                elif real_child.is_symlink:
                    child = SymlinkVirtualPath(
                        basename,
                        self,
                        real_child.readlink(),
                        reference_path=real_child,
                    )
                else:
                    assert real_child.is_file and real_child.has_fs_path
                    child = FSBackedFilePath(
                        basename,
                        self,
                        real_child.fs_path,
                        reference_path=real_child,
                    )
                self._virtual_children[child.name] = child
        except BaseException:
            self._has_underlying_read_children = False
            raise
        return True

    def _add_child(self, child: "InMemoryVirtualPathBase") -> None:
        """Register ``child`` in the overlay under its name."""
        # A name that is (re-)added is no longer deleted. Without this, a
        # path created after unlinking a same-named path would stay hidden
        # behind the deletion marker.
        self._deleted_children.discard(child.name)
        self._virtual_children[child.name] = child

    def _remove_child(self, child: "InMemoryVirtualPathBase") -> None:
        """Remove ``child`` from the overlay and hide any underlying entry.

        :raises KeyError: If the name is already deleted, or is known
          neither to the overlay nor to the underlying directory.
        """
        name = child.name
        if name in self._deleted_children:
            raise KeyError(name)
        try:
            del self._virtual_children[name]
            deleted = True
        except KeyError:
            deleted = False
        if not deleted and name not in self._underlying_directory:
            raise KeyError(name)
        # Record the deletion so an underlying entry of the same name does
        # not show up again.
        self._deleted_children.add(name)

    def _uncached_stat(self) -> os.stat_result:
        """Return the stat result of the underlying directory.

        :raises PureVirtualPathError: If an ``AttributeError`` surfaces
          while calling ``stat()`` on the underlying directory.
        """
        try:
            return self._underlying_directory.stat()  # type: ignore
        except AttributeError:
            raise PureVirtualPathError("The stat method has not been implemented")

    def _resolve_initial_mtime(self) -> float:
        """Use the mtime of the underlying directory as the initial mtime."""
        return self._underlying_directory.mtime

1274 

1275 

class InMemoryOverlayFSRootDirectory(InMemoryOverlayFSDirectory):
    """Root directory (named ".") of an overlay file system.

    The root has no parent, is never detached, cannot be unlinked and owns
    the plugin context.
    """

    __slots__ = ("_fs_path", "_fs_read_write", "_plugin_context")

    def __init__(self, underlying_directory: VirtualPath) -> None:
        # Set before the parent constructor runs so the read-write state is
        # available as early as possible.
        self._fs_read_write = True
        super().__init__(
            underlying_directory,
            ".",
            None,
            initial_mode=0o755,
        )
        self._plugin_context = CurrentPluginContextManager("debputy")

    def is_root_dir(self) -> bool:
        return True

    @property
    def is_detached(self) -> bool:
        return False

    @property
    def is_read_write(self) -> bool:
        """Whether the file system rooted here may be mutated.

        The generic implementation asks the parent directory. The root has
        no parent, so it answers from its own flag (mirroring
        ``InMemoryVirtualRootDir``).
        """
        return self._fs_read_write

    @is_read_write.setter
    def is_read_write(self, new_value: bool) -> None:
        self._fs_read_write = new_value

    def unlink(self, *, recursive: bool = False) -> None:
        # There is never a case where you want to delete this directory (and even if you could,
        # debputy will need it for technical reasons, so the root dir stays)
        raise TypeError("Cannot delete the root directory")

    def _current_plugin(self) -> str:
        """Return the name of the plugin that is currently active."""
        return self._plugin_context.current_plugin_name

    @contextlib.contextmanager
    def change_plugin_context(self, new_plugin: str) -> Iterator[str]:
        """Temporarily switch the active plugin to ``new_plugin``.

        Yields whatever the underlying plugin context manager yields.
        """
        with self._plugin_context.change_plugin_context(new_plugin) as r:
            yield r

1308 

1309 

class VirtualFSPathBase(InMemoryVirtualPath, ABC):
    """Base class for in-memory paths that are purely virtual by default.

    ``has_fs_path`` is False here, so ``stat()`` and ``fs_path`` raise
    ``PureVirtualPathError`` unless a subclass overrides ``has_fs_path``.
    """

    __slots__ = ()

    def __init__(
        self,
        basename: str,
        parent: Optional["InMemoryVirtualPathBase"],
        children: dict[str, "InMemoryVirtualPathBase"] | None = None,
        initial_mode: int | None = None,
        mtime: float | None = None,
        stat_cache: os.stat_result | None = None,
    ) -> None:
        super().__init__(
            basename,
            parent,
            children,
            initial_mode=initial_mode,
            mtime=mtime,
            stat_cache=stat_cache,
        )

    def _resolve_initial_mtime(self) -> float:
        """Use the current time as the initial mtime."""
        return time.time()

    @property
    def has_fs_path(self) -> bool:
        return False

    def stat(self) -> os.stat_result:
        """Return the stat result of the parent class implementation.

        :raises PureVirtualPathError: If the path has no file system path.
        """
        if not self.has_fs_path:
            raise PureVirtualPathError(
                "stat() is only applicable to paths backed by the file system. The path"
                f" {self._orphan_safe_path()!r} is purely virtual"
            )
        return super().stat()

    @property
    def fs_path(self) -> str:
        """The file system path backing this path.

        :raises PureVirtualPathError: If the path has no file system path.
        :raises NotImplementedError: If a subclass reports ``has_fs_path``
          without overriding this property.
        """
        if not self.has_fs_path:
            raise PureVirtualPathError(
                "fs_path is only applicable to paths backed by the file system. The path"
                f" {self._orphan_safe_path()!r} is purely virtual"
            )
        # This class does not store a file system path itself. Returning
        # ``self.fs_path`` here (as was done before) re-enters this property
        # and recurses without bound, so fail with a clear error instead.
        raise NotImplementedError(
            f"{self.__class__.__name__} reports has_fs_path but does not"
            " override the fs_path property"
        )

1354 

1355 

class InMemoryVirtualRootDir(InMemoryVirtualPath):
    """Root directory (named ".") of an in-memory file system.

    The root has no parent, is never detached, cannot be unlinked, owns the
    read-write flag of the file system and the plugin context. It may
    optionally be backed by a file system path.
    """

    __slots__ = ("_fs_path", "_fs_read_write", "_plugin_context")

    def __init__(self, fs_path: str | None = None) -> None:
        """Create a root directory, optionally backed by ``fs_path``."""
        self._fs_path = fs_path
        self._fs_read_write = True
        super().__init__(".", None, children={}, initial_mode=0o755)
        self._plugin_context = CurrentPluginContextManager("debputy")

    # -- Identity / position in the tree ---------------------------------

    @property
    def is_detached(self) -> bool:
        return False

    def _orphan_safe_path(self) -> str:
        return self.name

    @property
    def path(self) -> str:
        return self.name

    @property
    def parent_dir(self) -> Optional["InMemoryVirtualPathBase"]:
        return None

    @parent_dir.setter
    def parent_dir(self, new_parent: InMemoryVirtualPathBase | None) -> None:
        # Assigning ``None`` is accepted as a no-op; anything else is refused.
        if new_parent is None:
            return
        raise ValueError("The root directory cannot become a non-root directory")

    @property
    def parent_dir_path(self) -> str | None:
        return None

    # -- Path type ---------------------------------------------------------

    @property
    def is_dir(self) -> bool:
        return True

    @property
    def is_file(self) -> bool:
        return False

    @property
    def is_symlink(self) -> bool:
        return False

    def readlink(self) -> str:
        raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink')

    # -- File system backing -------------------------------------------------

    @property
    def has_fs_path(self) -> bool:
        return self._fs_path is not None

    def stat(self) -> os.stat_result:
        """Return ``os.stat`` of the backing path.

        :raises PureVirtualPathError: If the root has no file system path.
        """
        if self.has_fs_path:
            return os.stat(self.fs_path)
        raise PureVirtualPathError(
            "stat() is only applicable to paths backed by the file system. The path"
            f" {self._orphan_safe_path()!r} is purely virtual"
        )

    @property
    def fs_path(self) -> str:
        """The backing file system path.

        :raises PureVirtualPathError: If the root has no file system path.
        """
        if self.has_fs_path:
            return assume_not_none(self._fs_path)
        raise PureVirtualPathError(
            "fs_path is only applicable to paths backed by the file system. The path"
            f" {self._orphan_safe_path()!r} is purely virtual"
        )

    # -- Mutability ----------------------------------------------------------

    @property
    def is_read_write(self) -> bool:
        return self._fs_read_write

    @is_read_write.setter
    def is_read_write(self, new_value: bool) -> None:
        self._fs_read_write = new_value

    def unlink(self, *, recursive: bool = False) -> None:
        # There is never a case where you want to delete this directory (and even if you could,
        # debputy will need it for technical reasons, so the root dir stays)
        raise TypeError("Cannot delete the root directory")

    # -- Plugin context --------------------------------------------------------

    def _current_plugin(self) -> str:
        return self._plugin_context.current_plugin_name

    @contextlib.contextmanager
    def change_plugin_context(self, new_plugin: str) -> Iterator[str]:
        """Temporarily switch the active plugin to ``new_plugin``.

        Yields whatever the underlying plugin context manager yields.
        """
        with self._plugin_context.change_plugin_context(new_plugin) as active:
            yield active

1450 

1451 

class VirtualPathWithReference(VirtualFSPathBase, ABC):
    """Virtual path that may delegate to an optional reference path.

    When a reference path is present, it supplies the initial mode and the
    initial mtime, and (under the conditions visible in ``fs_path``,
    ``stat`` and ``open``) the file system path, stat result and content.
    """

    __slots__ = ("_reference_path",)

    def __init__(
        self,
        basename: str,
        parent: InMemoryVirtualPathBase,
        *,
        default_mode: int,
        reference_path: VirtualPath | None = None,
    ) -> None:
        """Initialise the path.

        :param default_mode: Initial mode used when no (truthy)
          ``reference_path`` is given; otherwise the mode of the reference
          path is used.
        """
        if reference_path:
            initial_mode = reference_path.mode
        else:
            initial_mode = default_mode
        super().__init__(
            basename,
            parent=parent,
            initial_mode=initial_mode,
        )
        self._reference_path = reference_path

    @property
    def has_fs_path(self) -> bool:
        reference = self._reference_path
        if reference is None:
            return False
        return reference.has_fs_path

    def _resolve_initial_mtime(self) -> float:
        reference = self._reference_path
        if not reference:
            return super()._resolve_initial_mtime()
        return reference.mtime

    @property
    def fs_path(self) -> str:
        reference = self._reference_path
        if reference is None:
            return super().fs_path
        # Prefer the parent class' own path only when it has one and that
        # path differs from the one of the reference.
        if super().has_fs_path and super().fs_path != reference.fs_path:
            return super().fs_path
        return reference.fs_path

    def stat(self) -> os.stat_result:
        reference = self._reference_path
        if reference is None:
            return super().stat()
        # Same decision as in ``fs_path``.
        if super().has_fs_path and super().fs_path != reference.fs_path:
            return super().stat()
        return typing.cast(VirtualPathBase, reference).stat()

    @overload
    def open(
        self,
        *,
        byte_io: Literal[False] = False,
        buffering: int = -1,
    ) -> TextIO: ...

    @overload
    def open(
        self,
        *,
        byte_io: Literal[True],
        buffering: Literal[0] = ...,
    ) -> io.FileIO: ...

    @overload
    def open(
        self,
        *,
        byte_io: Literal[True],
        buffering: int = -1,
    ) -> io.BufferedReader: ...

    def open(self, *, byte_io=False, buffering=-1):
        """Open the path, via the reference path if both share the same fs_path."""
        reference = self._reference_path
        shares_backing_file = (
            reference is not None and reference.fs_path == self.fs_path
        )
        opener = reference.open if shares_backing_file else super().open
        return opener(byte_io=byte_io, buffering=buffering)

1527 

1528 

class VirtualDirectoryFSPath(VirtualPathWithReference):
    """A directory in the in-memory file system, optionally mirroring a reference directory."""

    # The ``_reference_path`` slot is declared by the parent class.
    # Declaring it here again would only shadow (and waste) the parent's slot.
    __slots__ = ()

    def __init__(
        self,
        basename: str,
        parent: InMemoryVirtualPathBase,
        *,
        reference_path: VirtualPath | None = None,
    ) -> None:
        """Create the directory beneath ``parent``.

        :param basename: Name of the directory.
        :param parent: The directory that will contain the new directory.
        :param reference_path: Optional reference directory. When omitted,
          the mode defaults to 0o755.
        """
        # The parent constructor stores ``reference_path`` for us.
        super().__init__(
            basename,
            parent,
            reference_path=reference_path,
            default_mode=0o755,
        )
        assert reference_path is None or reference_path.is_dir
        self._ensure_min_mode()

    @property
    def is_dir(self) -> bool:
        return True

    @property
    def is_file(self) -> bool:
        return False

    @property
    def is_symlink(self) -> bool:
        return False

    def readlink(self) -> str:
        raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink')

1563 

1564 

class SymlinkVirtualPath(VirtualPathWithReference):
    """A symlink in the in-memory file system; the target is stored verbatim."""

    __slots__ = ("_link_target",)

    def __init__(
        self,
        basename: str,
        parent_dir: InMemoryVirtualPathBase,
        link_target: str,
        *,
        reference_path: VirtualPath | None = None,
    ) -> None:
        """Create the symlink ``basename`` -> ``link_target`` beneath ``parent_dir``."""
        super().__init__(
            basename,
            parent=parent_dir,
            reference_path=reference_path,
            default_mode=_SYMLINK_MODE,
        )
        self._link_target = link_target

    @property
    def is_symlink(self) -> bool:
        return True

    @property
    def is_dir(self) -> bool:
        return False

    @property
    def is_file(self) -> bool:
        return False

    def readlink(self) -> str:
        """Return the link target exactly as it was given to the constructor."""
        return self._link_target

    @property
    def size(self) -> int:
        """Length of the link target string (``len`` of the ``str``)."""
        target = self.readlink()
        return len(target)

1602 

1603 

class FSBackedFilePath(VirtualPathWithReference):
    """A regular file whose content lives at a path on the real file system."""

    __slots__ = ("_fs_path", "_replaceable_inline")

    def __init__(
        self,
        basename: str,
        parent_dir: InMemoryVirtualPathBase,
        fs_path: str,
        *,
        replaceable_inline: bool = False,
        initial_mode: int | None = None,
        mtime: float | None = None,
        stat_cache: os.stat_result | None = None,
        reference_path: VirtualPath | None = None,
    ) -> None:
        """Create the file beneath ``parent_dir``.

        :param fs_path: The file system path holding the content.
        :param replaceable_inline: Whether the content at ``fs_path`` may be
          modified in place. Only accepted for paths containing
          "debputy/scratch-dir/" (checked with an assertion).
        :param initial_mode: Mode to set; when None, the mode chosen by the
          parent constructor is kept.
        :param mtime: Known mtime; when None, the stored mtime is left untouched.
        :param stat_cache: Stat result used as the initial stat cache.
        :param reference_path: Optional reference path passed to the parent
          constructor.
        """
        super().__init__(
            basename,
            parent_dir,
            reference_path=reference_path,
            default_mode=0o644,
        )
        self._fs_path = fs_path
        self._replaceable_inline = replaceable_inline

        # Explicit values take precedence over what the parent constructor
        # derived from the reference path / defaults.
        if initial_mode is not None:
            self.mode = initial_mode
        if mtime is not None:
            self._mtime = mtime
        self._stat_cache = stat_cache

        assert (
            not replaceable_inline or "debputy/scratch-dir/" in fs_path
        ), f"{fs_path} should not be inline-replaceable -- {self.path}"
        self._ensure_min_mode()

    @property
    def is_file(self) -> bool:
        return True

    @property
    def is_dir(self) -> bool:
        return False

    @property
    def is_symlink(self) -> bool:
        return False

    def readlink(self) -> str:
        raise TypeError(f'"{self._orphan_safe_path()!r}" is a file; not a symlink')

    @property
    def has_fs_path(self) -> bool:
        return True

    @property
    def fs_path(self) -> str:
        return self._fs_path

    @property
    def _can_replace_inline(self) -> bool:
        return self._replaceable_inline

    def _replaced_path(self, new_fs_path: str) -> None:
        """Switch the backing file to ``new_fs_path``.

        The reference path is dropped and the file is marked as replaceable
        inline from here on.
        """
        self._fs_path = new_fs_path
        self._reference_path = None
        self._replaceable_inline = True

1668 

1669 

# Mode used for symlinks: the default mode of SymlinkVirtualPath and the only
# explicit mode VirtualTestPath accepts for a symlink.
_SYMLINK_MODE = 0o777

1671 

1672 

1673class VirtualTestPath(InMemoryVirtualPath): 

1674 __slots__ = ( 

1675 "_path_type", 

1676 "_has_fs_path", 

1677 "_fs_path", 

1678 "_link_target", 

1679 "_content", 

1680 "_materialized_content", 

1681 ) 

1682 

1683 def __init__( 

1684 self, 

1685 basename: str, 

1686 parent_dir: InMemoryVirtualPathBase | None, 

1687 mode: int | None = None, 

1688 mtime: float | None = None, 

1689 is_dir: bool = False, 

1690 has_fs_path: bool | None = False, 

1691 fs_path: str | None = None, 

1692 link_target: str | None = None, 

1693 content: str | None = None, 

1694 materialized_content: str | None = None, 

1695 ) -> None: 

1696 if is_dir: 

1697 self._path_type = PathType.DIRECTORY 

1698 elif link_target is not None: 

1699 self._path_type = PathType.SYMLINK 

1700 if mode is not None and mode != _SYMLINK_MODE: 1700 ↛ 1701line 1700 didn't jump to line 1701 because the condition on line 1700 was never true

1701 raise ValueError( 

1702 f'Please do not assign a mode to symlinks. Triggered for "{basename}".' 

1703 ) 

1704 assert mode is None or mode == _SYMLINK_MODE 

1705 else: 

1706 self._path_type = PathType.FILE 

1707 

1708 if mode is not None: 

1709 initial_mode = mode 

1710 else: 

1711 initial_mode = 0o755 if is_dir else 0o644 

1712 

1713 self._link_target = link_target 

1714 if has_fs_path is None: 

1715 has_fs_path = bool(fs_path) 

1716 self._has_fs_path = has_fs_path 

1717 self._fs_path = fs_path 

1718 self._materialized_content = materialized_content 

1719 super().__init__( 

1720 basename, 

1721 parent=parent_dir, 

1722 initial_mode=initial_mode, 

1723 mtime=mtime, 

1724 ) 

1725 self._content = content 

1726 

1727 @property 

1728 def is_dir(self) -> bool: 

1729 return self._path_type == PathType.DIRECTORY 

1730 

1731 @property 

1732 def is_file(self) -> bool: 

1733 return self._path_type == PathType.FILE 

1734 

1735 @property 

1736 def is_symlink(self) -> bool: 

1737 return self._path_type == PathType.SYMLINK 

1738 

1739 def readlink(self) -> str: 

1740 if not self.is_symlink: 1740 ↛ 1741line 1740 didn't jump to line 1741 because the condition on line 1740 was never true

1741 raise TypeError(f"readlink is only valid for symlinks ({self.path!r})") 

1742 link_target = self._link_target 

1743 assert link_target is not None 

1744 return link_target 

1745 

1746 def _resolve_initial_mtime(self) -> float: 

1747 return time.time() 

1748 

1749 @property 

1750 def has_fs_path(self) -> bool: 

1751 return self._has_fs_path 

1752 

1753 def stat(self) -> os.stat_result: 

1754 if self.has_fs_path: 

1755 path = self.fs_path 

1756 if path is None: 1756 ↛ 1757line 1756 didn't jump to line 1757 because the condition on line 1756 was never true

1757 raise PureVirtualPathError( 

1758 f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path" 

1759 " cannot provide!" 

1760 ) 

1761 try: 

1762 return os.stat(path) 

1763 except FileNotFoundError as e: 

1764 raise PureVirtualPathError( 

1765 f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path" 

1766 " cannot provide! (An fs_path was provided, but it did not exist)" 

1767 ) from e 

1768 

1769 raise PureVirtualPathError( 

1770 "stat() is only applicable to paths backed by the file system. The path" 

1771 f" {self._orphan_safe_path()!r} is purely virtual" 

1772 ) 

1773 

1774 @property 

1775 def size(self) -> int: 

1776 if self._content is not None: 

1777 return len(self._content.encode("utf-8")) 

1778 if self.is_symlink: 

1779 return len(self.readlink()) 

1780 if not self.has_fs_path or self.fs_path is None: 

1781 return 0 

1782 return self.stat().st_size 

1783 

1784 @property 

1785 def fs_path(self) -> str: 

1786 if self.has_fs_path: 

1787 if self._fs_path is None and self._materialized_content is not None: 

1788 with tempfile.NamedTemporaryFile( 

1789 mode="w+t", 

1790 encoding="utf-8", 

1791 suffix=f"__{self.name}", 

1792 delete=False, 

1793 ) as fd: 

1794 filepath = fd.name 

1795 fd.write(self._materialized_content) 

1796 self._fs_path = filepath 

1797 atexit.register(lambda: os.unlink(filepath)) 

1798 

1799 path = self._fs_path 

1800 if path is None: 1800 ↛ 1801line 1800 didn't jump to line 1801 because the condition on line 1800 was never true

1801 raise PureVirtualPathError( 

1802 f"The test wants a real file system entry of {self._orphan_safe_path()!r}, which this " 

1803 " mock path cannot provide!" 

1804 ) 

1805 return path 

1806 raise PureVirtualPathError( 

1807 "fs_path is only applicable to paths backed by the file system. The path" 

1808 f" {self._orphan_safe_path()!r} is purely virtual" 

1809 ) 

1810 

def replace_fs_path_content(
    self,
    *,
    use_fs_path_mode: bool = False,
) -> ContextManager[str]:
    """Delegate to the parent implementation unless the path has inline content.

    :param use_fs_path_mode: Passed through unchanged to the parent class.
    :raises TypeError: If the path was created with ``content``; only paths
      created with ``materialized_content`` support this operation.
    """
    if self._content is None:
        return super().replace_fs_path_content(use_fs_path_mode=use_fs_path_mode)
    raise TypeError(
        f"The `replace_fs_path_content()` method was called on {self.path}. Said path was"
        " created with `content` but for this method to work, the path should have been"
        " created with `materialized_content`"
    )

1823 

@overload
def open_child(
    self,
    name: str,
    mode: TextOpenMode = "r",
    buffering: int = -1,
) -> TextIO: ...


@overload
def open_child(
    self,
    name: str,
    mode: BinaryOpenMode,
    buffering: int = -1,
) -> BinaryIO: ...


@contextlib.contextmanager
def open_child(self, name, mode="r", buffering=-1):
    """Open the child ``name``, capturing written data for a new child in memory.

    If the child already exists, or the mode contains ``"r"``, the request is
    forwarded to the parent class. Otherwise an in-memory buffer is yielded
    and, once the ``with`` body finishes, a new ``VirtualTestPath`` child is
    created with the buffer's text as its ``content`` (binary data is decoded
    as UTF-8).
    """
    if self.get(name) or "r" in mode:
        with super().open_child(name, mode, buffering=buffering) as fd:
            yield fd
        return

    binary = "b" in mode
    buffer = io.BytesIO(b"") if binary else io.StringIO("")
    yield buffer
    captured = buffer.getvalue()
    content = captured.decode("utf-8") if binary else captured
    # Instantiating the path with `self` as parent is what records the child.
    VirtualTestPath(
        name,
        self,
        mode=0o644,
        content=content,
        has_fs_path=True,
    )

1862 

@overload
def open(
    self,
    *,
    byte_io: Literal[False] = False,
    buffering: int = -1,
) -> TextIO: ...


@overload
def open(
    self,
    *,
    byte_io: Literal[True],
    buffering: Literal[0] = ...,
) -> io.FileIO: ...


@overload
def open(
    self,
    *,
    byte_io: Literal[True],
    buffering: int = -1,
) -> io.BufferedReader: ...


def open(self, *, byte_io=False, buffering=-1):
    """Open the test path for reading.

    Inline ``content`` is served from an in-memory stream (UTF-8 encoded when
    ``byte_io`` is true). Without inline content, the parent implementation
    is used.

    :raises TestPathWithNonExistentFSPathError: If the parent implementation
      raises ``FileNotFoundError``.
    """
    text = self._content
    if text is not None:
        return io.BytesIO(text.encode("utf-8")) if byte_io else io.StringIO(text)

    try:
        return super().open(byte_io=byte_io, buffering=buffering)
    except FileNotFoundError as e:
        raise TestPathWithNonExistentFSPathError(
            f"The test path {self.path} had an fs_path {self._fs_path}, which does not"
            " exist. This exception can only occur in the testsuite. Either have the"
            " test provide content for the path (`virtual_path_def(..., content=...) or,"
            " if that is too painful in general, have the code accept this error as a "
            " test only-case and provide a default."
        ) from e

1903 

def _replaced_path(self, new_fs_path: str) -> None:
    """Record ``new_fs_path`` as the file system path backing this test path."""
    self._fs_path = new_fs_path

1906 

1907 

class OSFSOverlayBase(VirtualPathBase, Generic[FSP]):
    """Base for virtual paths that answer queries from a real file system path.

    The type (dir/file/symlink) is derived from ``stat()`` on demand. The
    parent is held through a weak reference.
    """

    __slots__ = (
        "_path",
        "_fs_path",
        "_parent",
        "__weakref__",
    )

    def __init__(
        self,
        path: str,
        fs_path: str,
        parent: FSP | None,
    ) -> None:
        """Store the virtual ``path``, the normalized ``fs_path`` and the parent.

        :param path: Path of the entry inside the virtual file system.
        :param fs_path: Location of the entry on the real file system.
        :param parent: The containing directory, or ``None`` for a root.
        """
        self._path: str = path
        normalized = _normalize_path(fs_path, with_prefix=False)
        # The normalization is asked to omit its prefix; re-add the leading
        # slash so an absolute fs_path stays absolute.
        self._fs_path: str = ("/" if fs_path.startswith("/") else "") + normalized
        self._parent: ReferenceType[FSP] | None = (
            None if parent is None else ref(parent)
        )

    @property
    def name(self) -> str:
        """The basename of the virtual path."""
        return os.path.basename(self._path)

    @property
    def path(self) -> str:
        """The virtual path as passed to the constructor."""
        return self._path

    @property
    def parent_dir(self) -> Optional["FSP"]:
        """The parent directory, or ``None`` when no parent was given.

        :raises RuntimeError: If the weakly referenced parent no longer exists.
        """
        parent_ref = self._parent
        if parent_ref is None:
            return None
        parent = parent_ref()
        if parent is None:
            raise RuntimeError("Parent was garbage collected!")
        return parent

    @property
    def fs_path(self) -> str:
        """The (normalized) path on the real file system."""
        return self._fs_path

    def stat(self) -> os.stat_result:
        """Return ``os.lstat`` of the fs_path (symlinks are not followed)."""
        return os.lstat(self.fs_path)

    @property
    def is_dir(self) -> bool:
        """Whether the fs_path is a directory; ``False`` if it does not exist."""
        # The root path can have a non-existent fs_path (such as d/tmp not always existing)
        try:
            st_mode = self.stat().st_mode
        except FileNotFoundError:
            return False
        return stat.S_ISDIR(st_mode)

    @property
    def is_file(self) -> bool:
        """Whether the fs_path is a regular file; ``False`` if it does not exist."""
        # The root path can have a non-existent fs_path (such as d/tmp not always existing)
        try:
            st_mode = self.stat().st_mode
        except FileNotFoundError:
            return False
        return stat.S_ISREG(st_mode)

    @property
    def is_symlink(self) -> bool:
        """Whether the fs_path is a symlink; ``False`` if it does not exist."""
        # The root path can have a non-existent fs_path (such as d/tmp not always existing)
        try:
            st_mode = self.stat().st_mode
        except FileNotFoundError:
            return False
        return stat.S_ISLNK(st_mode)

    @property
    def has_fs_path(self) -> bool:
        """Always ``True``: these paths are defined by their fs_path."""
        return True

    @overload
    def open(
        self,
        *,
        byte_io: Literal[False] = False,
        buffering: int = -1,
    ) -> TextIO: ...

    @overload
    def open(
        self,
        *,
        byte_io: Literal[True],
        buffering: Literal[0] = ...,
    ) -> io.FileIO: ...

    @overload
    def open(
        self,
        *,
        byte_io: Literal[True],
        buffering: int = -1,
    ) -> io.BufferedReader: ...

    def open(self, *, byte_io=False, buffering=-1):
        """Open the fs_path for reading, as UTF-8 text or (``byte_io``) as bytes.

        :raises TypeError: If the path is neither a file nor a symlink.
        """
        # Allow symlinks for open here, because we can let the OS resolve the symlink reliably in this
        # case.
        if not (self.is_file or self.is_symlink):
            raise TypeError(
                f"Cannot open {self.path} for reading: It is not a file nor a symlink"
            )

        if byte_io:
            return open(self.fs_path, "rb", buffering=buffering)
        return open(self.fs_path, encoding="utf-8", buffering=buffering)

    def metadata(
        self,
        metadata_type: type[PMT],
        *,
        owning_plugin: str | None = None,
    ) -> PathMetadataReference[PMT]:
        """Return an always-empty, read-only metadata reference.

        ``owning_plugin`` defaults to the current plugin when omitted.
        """
        current_plugin = self._current_plugin()
        return AlwaysEmptyReadOnlyMetadataReference(
            current_plugin if owning_plugin is None else owning_plugin,
            current_plugin,
            metadata_type,
        )

    def all_paths(self) -> Iterable["OSFSControlPath"]:
        """Yield this path and everything beneath it, parents before children."""
        yield cast("OSFSControlPath", self)
        if not self.is_dir:
            return
        # The stack is popped from the end, so entries are pushed in reverse
        # to visit them in `iterdir()` order.
        pending = list(self.iterdir())[::-1]
        while pending:
            current = cast("OSFSControlPath", pending.pop())
            yield current
            if current.is_dir:
                pending.extend(reversed(list(current.iterdir())))

    def _resolve_children(
        self,
        new_child: Callable[[str, str, FSP], FSC],
    ) -> Mapping[str, FSC]:
        """List the directory on disk and build one child per entry.

        :param new_child: Factory called as ``new_child(path, fs_path, parent)``.
        :return: Mapping from entry name to child, in sorted name order; empty
          when this path is not a directory.
        """
        if not self.is_dir:
            return {}

        def _below(base: str, entry: str) -> str:
            # A base of "." is dropped rather than joined as "./entry".
            return entry if base == "." else os.path.join(base, entry)

        virtual_dir = self.path
        physical_dir = self.fs_path
        owner = cast("FSP", self)
        return {
            entry: new_child(
                _below(virtual_dir, entry),
                _below(physical_dir, entry),
                owner,
            )
            for entry in sorted(os.listdir(physical_dir), key=os.path.basename)
        }

2065 

2066 

class OSFSROOverlay(OSFSOverlayBase["OSFSROOverlay"]):
    """Read-only overlay of a real directory tree with cached lookups.

    ``stat()`` results (including a "not found" outcome), symlink targets and
    directory listings are cached after their first use. Every mutating
    operation raises ``DebputyFSIsROError``.
    """

    __slots__ = (
        "_stat_cache",
        "_readlink_cache",
        "_children",
        "_stat_failed_cache",
    )

    def __init__(
        self,
        path: str,
        fs_path: str,
        parent: Optional["OSFSROOverlay"],
    ) -> None:
        """Initialise the path with all caches empty."""
        super().__init__(path, fs_path, parent=parent)
        self._stat_cache: os.stat_result | None = None
        self._readlink_cache: str | None = None
        self._stat_failed_cache = False
        self._children: Mapping[str, OSFSROOverlay] | None = None

    @classmethod
    def create_root_dir(cls, path: str, fs_path: str) -> "OSFSROOverlay":
        """Create a parent-less ``OSFSROOverlay`` for ``fs_path``."""
        return OSFSROOverlay(path, fs_path, None)

    def iterdir(self) -> Iterable["OSFSROOverlay"]:
        """Yield the children of this directory (nothing for non-directories)."""
        if self.is_dir:
            if self._children is None:
                self._ensure_children_are_resolved()
            yield from assume_not_none(self._children).values()

    def lookup(self, path: str) -> Optional["OSFSROOverlay"]:
        """Resolve ``path`` relative to this directory (or the root if absolute).

        :return: The matching path, or ``None`` if this is not a directory or
          a segment does not exist.
        :raises ValueError: If a ``..`` segment would leave the root directory.
        """
        if not self.is_dir:
            return None
        if self._children is None:
            self._ensure_children_are_resolved()

        absolute, _, path_parts = _split_path(path)
        current = cast("OSFSROOverlay", _root(self)) if absolute else self
        for segment in path_parts:
            if segment == ".":
                continue
            if segment != "..":
                try:
                    current = cast("OSFSROOverlay", current[segment])
                except KeyError:
                    return None
                continue
            if current.is_root_dir():
                raise ValueError(f'The path "{path}" escapes the root dir')
            parent = current.parent_dir
            assert parent is not None  # Type hint
            current = cast("OSFSROOverlay", parent)
        return current

    def _ensure_children_are_resolved(self) -> None:
        """Populate the children cache from disk for directories."""
        if self.is_dir and not self._children:
            self._children = self._resolve_children(OSFSROOverlay)

    @property
    def is_detached(self) -> bool:
        """Always ``False`` for this overlay."""
        return False

    def __getitem__(self, key) -> "VirtualPath":
        """Return the child named ``key`` (or named like the given path object).

        :raises KeyError: If this is not a directory or no such child exists.
        """
        if not self.is_dir:
            raise KeyError(key)
        if self._children is None:
            self._ensure_children_are_resolved()
        child_name = key.name if isinstance(key, InMemoryVirtualPathBase) else key
        return assume_not_none(self._children)[child_name]

    def __delitem__(self, key) -> Never:
        """Unsupported: the overlay is read-only."""
        self._error_ro_fs()

    @property
    def is_read_write(self) -> bool:
        """Always ``False``."""
        return False

    def _rw_check(self) -> Never:
        """Reject any write attempt."""
        self._error_ro_fs()

    def _error_ro_fs(self) -> Never:
        """Raise the read-only error used by all mutating operations."""
        raise DebputyFSIsROError(
            f'Attempt to write to "{self.path}" failed:'
            " Debputy Virtual File system is R/O."
        )

    def stat(self) -> os.stat_result:
        """Return the cached ``os.lstat`` result, performing it on first use.

        A ``FileNotFoundError`` is remembered and re-raised on later calls
        without touching the file system again.
        """
        if self._stat_failed_cache:
            raise FileNotFoundError(
                errno.ENOENT, os.strerror(errno.ENOENT), self.fs_path
            )

        cached = self._stat_cache
        if cached is not None:
            return cached
        try:
            cached = os.lstat(self.fs_path)
        except FileNotFoundError:
            self._stat_failed_cache = True
            raise
        self._stat_cache = cached
        return cached

    @property
    def mode(self) -> int:
        """Permission bits of the path, taken from ``stat()``."""
        return stat.S_IMODE(self.stat().st_mode)

    @mode.setter
    def mode(self, _unused: int) -> Never:
        self._error_ro_fs()

    @property
    def mtime(self) -> float:
        """Modification time of the path, taken from ``stat()``."""
        return self.stat().st_mtime

    @mtime.setter
    def mtime(self, new_mtime: float) -> Never:
        self._error_ro_fs()

    def readlink(self) -> str:
        """Return the (cached) symlink target.

        :raises TypeError: If the path is not a symlink.
        """
        if not self.is_symlink:
            raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
        target = self._readlink_cache
        if target is None:
            target = os.readlink(self.fs_path)
            self._readlink_cache = target
        return target

    def chown(
        self,
        owner: StaticFileSystemOwner | None,
        group: StaticFileSystemGroup | None,
    ) -> Never:
        """Unsupported: the overlay is read-only."""
        self._error_ro_fs()

    def mkdir(self, name: str) -> Never:
        """Unsupported: the overlay is read-only."""
        self._error_ro_fs()

    def add_file(
        self,
        name: str,
        *,
        unlink_if_exists: bool = True,
        use_fs_path_mode: bool = False,
        mode: int = 0o0644,
        mtime: float | None = None,
    ) -> Never:
        """Unsupported: the overlay is read-only."""
        self._error_ro_fs()

    def add_symlink(self, link_name: str, link_target: str) -> Never:
        """Unsupported: the overlay is read-only."""
        self._error_ro_fs()

    def unlink(self, *, recursive: bool = False) -> Never:
        """Unsupported: the overlay is read-only."""
        self._error_ro_fs()

2221 

2222 

class OSFSROOverlayRootDir(OSFSROOverlay):
    """Parent-less read-only overlay that tracks the current plugin context."""

    __slots__ = ("_plugin_context",)

    def __init__(self, path: str, fs_path: str) -> None:
        """Create the root with no parent; the plugin context starts as "debputy"."""
        super().__init__(path, fs_path, parent=None)
        self._plugin_context = CurrentPluginContextManager("debputy")

    def _current_plugin(self) -> str:
        """Name of the plugin currently recorded in the plugin context."""
        return self._plugin_context.current_plugin_name

    @contextlib.contextmanager
    def change_plugin_context(self, new_plugin: str) -> Iterator[str]:
        """Run the ``with`` body with ``new_plugin`` as the plugin context.

        Yields whatever the underlying context manager provides.
        """
        with self._plugin_context.change_plugin_context(new_plugin) as active:
            yield active

2237 

2238 

class OSFSControlPath(OSFSOverlayBase["OSFSControlPath"]):
    """Read-write path in the directory that becomes the control.tar.

    Nothing is cached: children, mode and mtime are read from (and written
    to) the file system on every access. Sub-directories and symlinks are
    rejected by ``mkdir`` and ``add_symlink``.
    """

    def iterdir(self) -> Iterable["OSFSControlPath"]:
        """Yield a fresh listing of the directory (nothing for non-directories)."""
        if not self.is_dir:
            return
        yield from self._resolve_children(
            lambda n, fsp, p: OSFSControlPath(n, fsp, p)
        ).values()

    def lookup(self, path: str) -> Optional["OSFSControlPath"]:
        """Resolve ``path`` relative to this directory (or the root if absolute).

        :return: The matching path, or ``None`` if this is not a directory or
          a segment does not exist.
        :raises ValueError: If a ``..`` segment would leave the root directory.
        """
        if not self.is_dir:
            return None

        absolute, _, path_parts = _split_path(path)
        current = cast("OSFSControlPath", _root(self)) if absolute else self
        for dir_part in path_parts:
            if dir_part == ".":
                continue
            if dir_part == "..":
                if current.is_root_dir():
                    raise ValueError(f'The path "{path}" escapes the root dir')
                p = current.parent_dir
                assert p is not None  # type hint
                current = cast("OSFSControlPath", p)
                continue
            try:
                current = cast("OSFSControlPath", current[dir_part])
            except KeyError:
                return None
        return current

    @property
    def is_detached(self) -> bool:
        """``True`` when the fs_path no longer exists on disk."""
        try:
            self.stat()
        except FileNotFoundError:
            return True
        else:
            return False

    def __getitem__(self, key) -> "VirtualPath":
        """Return the child named ``key`` from a fresh directory listing.

        :raises KeyError: If this is not a directory or no such child exists.
        """
        if not self.is_dir:
            raise KeyError(key)
        children = self._resolve_children(lambda n, fsp, p: OSFSControlPath(n, fsp, p))
        if isinstance(key, InMemoryVirtualPathBase):
            key = key.name
        return children[key]

    def __delitem__(self, key) -> None:
        """Unlink the child named ``key``."""
        self[key].unlink()

    @property
    def is_read_write(self) -> bool:
        """Always ``True``."""
        return True

    @property
    def mode(self) -> int:
        """Permission bits of the path, read from the file system."""
        return stat.S_IMODE(self.stat().st_mode)

    @mode.setter
    def mode(self, new_mode: int) -> None:
        os.chmod(self.fs_path, new_mode)

    @property
    def mtime(self) -> float:
        """Modification time of the path, read from the file system."""
        return self.stat().st_mtime

    @mtime.setter
    def mtime(self, new_mtime: float) -> None:
        os.utime(self.fs_path, (new_mtime, new_mtime))

    def readlink(self) -> Never:
        """Never returns: reading a link target is not implemented here.

        :raises TypeError: If the path is not a symlink.
        :raises AssertionError: If the path *is* a symlink.
        """
        if not self.is_symlink:
            raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
        # Raised explicitly rather than via `assert False`: an assert is
        # stripped under `python -O`, which would let this fall through and
        # return None despite the `Never` return type.
        raise AssertionError(
            f"Unexpected symlink in the control file system ({self.path!r})"
        )

    def chown(
        self,
        owner: StaticFileSystemOwner | None,
        group: StaticFileSystemGroup | None,
    ) -> None:
        """Always raises ``ValueError``: ownership is not configurable here."""
        raise ValueError(
            "No need to chown paths in the control.tar: They are always root:root"
        )

    def mkdir(self, name: str) -> Never:
        """Always raises ``TypeError``: sub-directories are not supported."""
        raise TypeError("The control.tar never contains subdirectories.")

    @contextlib.contextmanager
    def add_file(
        self,
        name: str,
        *,
        unlink_if_exists: bool = True,
        use_fs_path_mode: bool = False,
        mode: int = 0o0644,
        mtime: float | None = None,
    ) -> Iterator["VirtualPath"]:
        """Create (or truncate) the file ``name`` and yield it for population.

        After the ``with`` body, the path is checked to still be a file and
        ``mode`` is applied to it.

        :param name: Basename of the new file; no ``/``, ``.`` or ``..``.
        :param unlink_if_exists: If false, an existing entry is an error.
        :param use_fs_path_mode: Accepted but not used by this implementation.
        :param mode: Mode applied on creation and again after the body.
        :param mtime: Accepted but not used by this implementation.
        :raises ValueError: On an invalid name, or an existing entry when
          ``unlink_if_exists`` is false.
        :raises TypeError: If this path is not a directory.
        """
        if "/" in name or name in {".", ".."}:
            raise ValueError(f'Invalid file name: "{name}"')
        if not self.is_dir:
            raise TypeError(
                f"Cannot create {self._orphan_safe_path()}/{name}:"
                f" {self._orphan_safe_path()} is not a directory"
            )
        self._rw_check()
        existing = self.get(name)
        if existing is not None:
            if not unlink_if_exists:
                # The message names the parameter this method really has
                # (it previously referred to a non-existent `exist_ok`).
                raise ValueError(
                    f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
                    f" and unlink_if_exists was False"
                )
            assert existing.is_file

        fs_path = os.path.join(self.fs_path, name)
        # This truncates the existing file if any, so we do not have to unlink the previous entry.
        with open(fs_path, "wb") as fd:
            # Ensure that the fs_path exists and default mode is reasonable
            os.chmod(fd.fileno(), mode)
        child = OSFSControlPath(
            name,
            fs_path,
            self,
        )
        yield child
        _check_fs_path_is_file(fs_path, unlink_on_error=child)
        child.mode = mode

    @contextlib.contextmanager
    def replace_fs_path_content(
        self,
        *,
        use_fs_path_mode: bool = False,
    ) -> Iterator[str]:
        """Yield the fs_path so the caller can replace the file's content.

        After the ``with`` body, the path is checked to still be a file.

        :raises TypeError: If the path is not a file.
        """
        if not self.is_file:
            raise TypeError(
                f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file'
            )
        # NOTE(review): the pre-existing mode is re-applied only when
        # `use_fs_path_mode` is true. The parameter name suggests the opposite
        # (keep whatever mode the fs path ends up with) - confirm against the
        # VirtualPath API documentation before relying on either reading.
        restore_mode = self.mode if use_fs_path_mode else None
        yield self.fs_path
        _check_fs_path_is_file(self.fs_path, self)
        if restore_mode is not None:
            self.mode = restore_mode

    def add_symlink(self, link_name: str, link_target: str) -> Never:
        """Always raises ``TypeError``: symlinks are not supported."""
        raise TypeError("The control.tar never contains symlinks.")

    def unlink(self, *, recursive: bool = False) -> None:
        """Remove the file from disk; a path without a parent is left alone."""
        if self._parent is None:
            return
        # By virtue of the control FS only containing paths, we can assume `recursive` never
        # matters and that `os.unlink` will be sufficient.
        assert self.is_file
        os.unlink(self.fs_path)

2394 

2395 

class FSControlRootDir(OSFSControlPath):
    """Root directory of the control file system."""

    @classmethod
    def create_root_dir(cls, fs_path: str) -> "FSControlRootDir":
        """Create the root (virtual path ``"."``) backed by ``fs_path``."""
        return FSControlRootDir(".", fs_path, None)

    def insert_file_from_fs_path(
        self,
        name: str,
        fs_path: str,
        *,
        exist_ok: bool = True,
        use_fs_path_mode: bool = False,
        mode: int = 0o0644,
        # Ignored, but accepted for compat with FSPath's variant of this function.
        # - This is used by install_or_generate_conffiles.
        reference_path: VirtualPath | None = None,  # noqa
    ) -> "OSFSControlPath":
        """Copy the file at ``fs_path`` into this directory as ``name``.

        :param name: Basename of the new file; no ``/``, ``.`` or ``..``.
        :param fs_path: Source file to copy (symlinks are followed).
        :param exist_ok: If false, an existing entry called ``name`` is an error.
        :param use_fs_path_mode: If true, the copy receives the source file's
          permission bits; otherwise ``mode`` is applied.
        :param mode: Mode of the copy when ``use_fs_path_mode`` is false.
        :param reference_path: Ignored.
        :return: The path object for the inserted file.
        :raises ValueError: On an invalid name, or an existing entry when
          ``exist_ok`` is false.
        :raises TypeError: If this path is not a directory.
        """
        if "/" in name or name in {".", ".."}:
            raise ValueError(f'Invalid file name: "{name}"')
        if not self.is_dir:
            raise TypeError(
                f"Cannot create {self._orphan_safe_path()}/{name}:"
                f" {self._orphan_safe_path()} is not a directory"
            )
        self._rw_check()
        if name in self and not exist_ok:
            raise ValueError(
                f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
                f" and exist_ok was False"
            )

        target_path = os.path.join(self.fs_path, name)
        # The content must be copied in both cases. Previously the
        # `use_fs_path_mode` branch only copied the permission bits, which
        # left the content behind (and failed when the target did not exist).
        shutil.copyfile(
            fs_path,
            target_path,
            follow_symlinks=True,
        )
        if use_fs_path_mode:
            shutil.copymode(
                fs_path,
                target_path,
                follow_symlinks=True,
            )
        else:
            os.chmod(target_path, mode)
        return cast("OSFSControlPath", self[name])

2443 

2444 

def as_path_def(pd: str | PathDef) -> PathDef:
    """Wrap a plain string in a ``PathDef``; return anything else unchanged."""
    if isinstance(pd, str):
        return PathDef(pd)
    return pd

2447 

2448 

def as_path_defs(paths: Iterable[str | PathDef]) -> Iterable[PathDef]:
    """Lazily convert each entry of ``paths`` via ``as_path_def``."""
    for entry in paths:
        yield as_path_def(entry)

2451 

2452 

def build_virtual_fs(
    paths: Iterable[str | PathDef],
    read_write_fs: bool = False,
) -> "InMemoryVirtualPathBase":
    """Build an in-memory file system from path definitions.

    Paths are normalised to start with ``./``. A trailing ``/`` marks a
    directory. Missing parent directories are created implicitly. The root may
    be listed explicitly as ``.``, ``./`` or ``/``; when it is the first
    entry, its ``fs_path`` is used for the root directory.

    :param paths: Path names or ``PathDef`` objects, directories before their
      children.
    :param read_write_fs: Value assigned to ``is_read_write`` on the root.
    :return: The root directory of the new file system.
    :raises ValueError: On a duplicate definition, or when a path needs a
      parent directory that was defined as a non-directory.
    """
    root_aliases = (".", "./", "/")
    root_dir: InMemoryVirtualRootDir | None = None
    directories: dict[str, InMemoryVirtualPathBase] = {}
    non_directories = set()

    def _ensure_parent_dirs(p: str) -> None:
        # Walk upwards until a known directory is found, then create the
        # missing ones top-down.
        current = p.rstrip("/")
        missing_dirs = []
        while True:
            current = os.path.dirname(current)
            if current in directories:
                break
            if current in non_directories:
                raise ValueError(
                    f'Conflicting definition for "{current}". The path "{p}" wants it as a directory,'
                    ' but it is defined as a non-directory. (Ensure dirs end with "/")'
                )
            missing_dirs.append(current)
        for dir_path in reversed(missing_dirs):
            parent_dir = directories[os.path.dirname(dir_path)]
            d = VirtualTestPath(os.path.basename(dir_path), parent_dir, is_dir=True)
            directories[dir_path] = d

    for path_def in as_path_defs(paths):
        path = path_def.path_name
        if path in directories or path in non_directories:
            raise ValueError(
                f'Duplicate definition of "{path}". Can be false positive if input is not in'
                ' "correct order" (ensure directories occur before their children)'
            )
        if root_dir is None:
            root_fs_path = None
            if path in root_aliases:
                root_fs_path = path_def.fs_path
            root_dir = InMemoryVirtualRootDir(fs_path=root_fs_path)
            directories["."] = root_dir

        if path not in root_aliases and not path.startswith("./"):
            path = "./" + path
        if path in root_aliases:
            # The root already exists at this point. "/" is skipped like "."
            # and "./"; it used to fall through and fail with a KeyError on
            # `directories[""]`.
            assert "." in directories
            continue
        _ensure_parent_dirs(path)
        is_dir = False
        if path.endswith("/"):
            path = path[:-1]
            is_dir = True
        directory = directories[os.path.dirname(path)]
        assert not is_dir or not bool(
            path_def.link_target
        ), f"is_dir={is_dir} vs. link_target={path_def.link_target}"
        fs_path = VirtualTestPath(
            os.path.basename(path),
            directory,
            is_dir=is_dir,
            mode=path_def.mode,
            mtime=path_def.mtime,
            has_fs_path=path_def.has_fs_path,
            fs_path=path_def.fs_path,
            link_target=path_def.link_target,
            content=path_def.content,
            materialized_content=path_def.materialized_content,
        )
        assert not fs_path.is_detached
        if fs_path.is_dir:
            directories[fs_path.path] = fs_path
        else:
            non_directories.add(fs_path.path)

    if root_dir is None:
        root_dir = InMemoryVirtualRootDir()

    root_dir.is_read_write = read_write_fs
    return root_dir
2530 return root_dir