Coverage for src/debputy/filesystem_scan.py: 67%

1302 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2026-02-14 10:41 +0000

1import atexit 

2import contextlib 

3import dataclasses 

4import errno 

5import io 

6import operator 

7import os 

8import shutil 

9import stat 

10import subprocess 

11import tempfile 

12import time 

13import typing 

14from abc import ABC 

15from contextlib import suppress 

16from typing import ( 

17 Optional, 

18 cast, 

19 Any, 

20 ContextManager, 

21 TextIO, 

22 BinaryIO, 

23 Generic, 

24 TypeVar, 

25 overload, 

26 Literal, 

27 Never, 

28) 

29from collections.abc import Iterable, Iterator, Mapping, Callable 

30from weakref import ref, ReferenceType 

31 

32from debputy.exceptions import ( 

33 PureVirtualPathError, 

34 DebputyFSIsROError, 

35 DebputyMetadataAccessError, 

36 TestPathWithNonExistentFSPathError, 

37 SymlinkLoopError, 

38) 

39from debputy.intermediate_manifest import PathType 

40from debputy.manifest_parser.base_types import ( 

41 ROOT_DEFINITION, 

42 StaticFileSystemOwner, 

43 StaticFileSystemGroup, 

44) 

45from debputy.plugin.api.spec import ( 

46 VirtualPath, 

47 PathDef, 

48 PathMetadataReference, 

49 PMT, 

50) 

51from debputy.types import VP 

52from debputy.util import ( 

53 generated_content_dir, 

54 _error, 

55 escape_shell, 

56 assume_not_none, 

57 _normalize_path, 

58 _debug_log, 

59) 

60 

61BY_BASENAME = operator.attrgetter("name") 

62 

63FSP = TypeVar("FSP", bound="FSOverlayBase", covariant=True) 

64FSC = TypeVar("FSC", bound="FSOverlayBase", covariant=True) 

65 

66 

67BinaryOpenMode = Literal[ 

68 "rb", 

69 "r+b", 

70 "wb", 

71 "w+b", 

72 "xb", 

73 "ab", 

74] 

75TextOpenMode = Literal[ 

76 "r", 

77 "r+", 

78 "rt", 

79 "r+t", 

80 "w", 

81 "w+", 

82 "wt", 

83 "w+t", 

84 "x", 

85 "xt", 

86 "a", 

87 "at", 

88] 

89OpenMode = Literal[BinaryOpenMode, TextOpenMode] 

90 

91 

92class AlwaysEmptyReadOnlyMetadataReference(PathMetadataReference[PMT]): 

93 __slots__ = ("_metadata_type", "_owning_plugin", "_current_plugin") 

94 

95 def __init__( 

96 self, 

97 owning_plugin: str, 

98 current_plugin: str, 

99 metadata_type: type[PMT], 

100 ) -> None: 

101 self._owning_plugin = owning_plugin 

102 self._current_plugin = current_plugin 

103 self._metadata_type = metadata_type 

104 

105 @property 

106 def is_present(self) -> bool: 

107 return False 

108 

109 @property 

110 def can_read(self) -> bool: 

111 return self._owning_plugin == self._current_plugin 

112 

113 @property 

114 def can_write(self) -> bool: 

115 return False 

116 

117 @property 

118 def value(self) -> PMT | None: 

119 if self.can_read: 119 ↛ 121line 119 didn't jump to line 121 because the condition on line 119 was always true

120 return None 

121 raise DebputyMetadataAccessError( 

122 f"Cannot read the metadata {self._metadata_type.__name__} owned by" 

123 f" {self._owning_plugin} as the metadata has not been made" 

124 f" readable to the plugin {self._current_plugin}." 

125 ) 

126 

127 @value.setter 

128 def value(self, new_value: PMT) -> None: 

129 if self._is_owner: 

130 raise DebputyFSIsROError( 

131 f"Cannot set the metadata {self._metadata_type.__name__} as the path is read-only" 

132 ) 

133 raise DebputyMetadataAccessError( 

134 f"Cannot set the metadata {self._metadata_type.__name__} owned by" 

135 f" {self._owning_plugin} as the metadata has not been made" 

136 f" read-write to the plugin {self._current_plugin}." 

137 ) 

138 

139 @property 

140 def _is_owner(self) -> bool: 

141 return self._owning_plugin == self._current_plugin 

142 

143 

144@dataclasses.dataclass(slots=True) 

145class PathMetadataValue(Generic[PMT]): 

146 owning_plugin: str 

147 metadata_type: type[PMT] 

148 value: PMT | None = None 

149 

150 def can_read_value(self, current_plugin: str) -> bool: 

151 return self.owning_plugin == current_plugin 

152 

153 def can_write_value(self, current_plugin: str) -> bool: 

154 return self.owning_plugin == current_plugin 

155 

156 

157class PathMetadataReferenceImplementation(PathMetadataReference[PMT]): 

158 __slots__ = ("_owning_path", "_current_plugin", "_path_metadata_value") 

159 

160 def __init__( 

161 self, 

162 owning_path: "VirtualPathBase", 

163 current_plugin: str, 

164 path_metadata_value: PathMetadataValue[PMT], 

165 ) -> None: 

166 self._owning_path = owning_path 

167 self._current_plugin = current_plugin 

168 self._path_metadata_value = path_metadata_value 

169 

170 @property 

171 def is_present(self) -> bool: 

172 if not self.can_read: 172 ↛ 173line 172 didn't jump to line 173 because the condition on line 172 was never true

173 return False 

174 return self._path_metadata_value.value is not None 

175 

176 @property 

177 def can_read(self) -> bool: 

178 return self._path_metadata_value.can_read_value(self._current_plugin) 

179 

180 @property 

181 def can_write(self) -> bool: 

182 if not self._path_metadata_value.can_write_value(self._current_plugin): 182 ↛ 183line 182 didn't jump to line 183 because the condition on line 182 was never true

183 return False 

184 owning_path = self._owning_path 

185 return owning_path.is_read_write and not owning_path.is_detached 

186 

187 @property 

188 def value(self) -> PMT | None: 

189 if self.can_read: 189 ↛ 191line 189 didn't jump to line 191 because the condition on line 189 was always true

190 return self._path_metadata_value.value 

191 raise DebputyMetadataAccessError( 

192 f"Cannot read the metadata {self._metadata_type_name} owned by" 

193 f" {self._owning_plugin} as the metadata has not been made" 

194 f" readable to the plugin {self._current_plugin}." 

195 ) 

196 

197 @value.setter 

198 def value(self, new_value: PMT) -> None: 

199 if not self.can_write: 199 ↛ 200line 199 didn't jump to line 200 because the condition on line 199 was never true

200 m = "set" if new_value is not None else "delete" 

201 raise DebputyMetadataAccessError( 

202 f"Cannot {m} the metadata {self._metadata_type_name} owned by" 

203 f" {self._owning_plugin} as the metadata has not been made" 

204 f" read-write to the plugin {self._current_plugin}." 

205 ) 

206 owning_path = self._owning_path 

207 if not owning_path.is_read_write: 207 ↛ 208line 207 didn't jump to line 208 because the condition on line 207 was never true

208 raise DebputyFSIsROError( 

209 f"Cannot set the metadata {self._metadata_type_name} as the path is read-only" 

210 ) 

211 if owning_path.is_detached: 211 ↛ 212line 211 didn't jump to line 212 because the condition on line 211 was never true

212 raise TypeError( 

213 f"Cannot set the metadata {self._metadata_type_name} as the path is detached" 

214 ) 

215 self._path_metadata_value.value = new_value 

216 

217 @property 

218 def _is_owner(self) -> bool: 

219 return self._owning_plugin == self._current_plugin 

220 

221 @property 

222 def _owning_plugin(self) -> str: 

223 return self._path_metadata_value.owning_plugin 

224 

225 @property 

226 def _metadata_type_name(self) -> str: 

227 return self._path_metadata_value.metadata_type.__name__ 

228 

229 

230def _cp_a(source: str, dest: str) -> None: 

231 cmd = ["cp", "-a", source, dest] 

232 try: 

233 subprocess.check_call(cmd) 

234 except subprocess.CalledProcessError: 

235 full_command = escape_shell(*cmd) 

236 _error( 

237 f"The attempt to make an internal copy of {escape_shell(source)} failed. Please review the output of cp" 

238 f" above to understand what went wrong. The full command was: {full_command}" 

239 ) 

240 

241 

242def _split_path(path: str) -> tuple[bool, bool, list[str]]: 

243 must_be_dir = True if path.endswith("/") else False 

244 absolute = False 

245 if path.startswith("/"): 

246 absolute = True 

247 path = "." + path 

248 path_parts = path.rstrip("/").split("/") 

249 if must_be_dir: 

250 path_parts.append(".") 

251 return absolute, must_be_dir, path_parts 

252 

253 

254def _root(path: VP) -> "VirtualPathBase": 

255 current = path 

256 while not current.is_root_dir(): 

257 parent = current.parent_dir 

258 assert parent is not None # type hint 

259 current = parent 

260 assert isinstance(current, VirtualPathBase) 

261 return current 

262 

263 

264def _check_fs_path_is_file( 

265 fs_path: str, 

266 unlink_on_error: Optional["VirtualPath"] = None, 

267) -> None: 

268 had_issue = False 

269 try: 

270 # FIXME: Check mode, and use the Virtual Path to cache the result as a side-effect 

271 st = os.lstat(fs_path) 

272 except FileNotFoundError: 

273 had_issue = True 

274 else: 

275 if not stat.S_ISREG(st.st_mode) or st.st_nlink > 1: 275 ↛ 276line 275 didn't jump to line 276 because the condition on line 275 was never true

276 had_issue = True 

277 if not had_issue: 277 ↛ 280line 277 didn't jump to line 280 because the condition on line 277 was always true

278 return 

279 

280 if unlink_on_error: 

281 with suppress(FileNotFoundError): 

282 os.unlink(fs_path) 

283 raise TypeError( 

284 "The provided FS backing file was deleted, replaced with a non-file entry or it was hard" 

285 " linked to another file. The entry has been disconnected." 

286 ) 

287 

288 

289class CurrentPluginContextManager: 

290 __slots__ = ("_plugin_names",) 

291 

292 def __init__(self, initial_plugin_name: str) -> None: 

293 self._plugin_names = [initial_plugin_name] 

294 

295 @property 

296 def current_plugin_name(self) -> str: 

297 return self._plugin_names[-1] 

298 

299 @contextlib.contextmanager 

300 def change_plugin_context(self, new_plugin_name: str) -> Iterator[str]: 

301 self._plugin_names.append(new_plugin_name) 

302 yield new_plugin_name 

303 self._plugin_names.pop() 

304 

305 

306class VirtualPathBase(VirtualPath, ABC): 

307 __slots__ = () 

308 

309 def stat(self) -> os.stat_result: 

310 # TODO: Remove 

311 """Attempt to do stat of the underlying path (if it exists) 

312 

313 *Avoid* using `stat()` whenever possible where a more specialized attribute exist. The 

314 `stat()` call returns the data from the file system and often, `debputy` does *not* track 

315 its state in the file system. As an example, if you want to know the file system mode of 

316 a path, please use the `mode` attribute instead. 

317 

318 This never follow symlinks (it behaves like `os.lstat`). It will raise an error 

319 if the path is not backed by a file system object (that is, `has_fs_path` is False). 

320 

321 :return: The stat result or an error. 

322 """ 

323 raise NotImplementedError() 

324 

325 @property 

326 def size(self) -> int: 

327 """Resolve the file size (`st_size`) 

328 

329 This may be using `stat()` and therefore `fs_path`. 

330 

331 :return: The size of the file in bytes 

332 """ 

333 return self.stat().st_size 

334 

335 def _orphan_safe_path(self) -> str: 

336 return self.path 

337 

338 def _rw_check(self) -> None: 

339 if not self.is_read_write: 

340 raise DebputyFSIsROError( 

341 f'Attempt to write to "{self._orphan_safe_path()}" failed:' 

342 " Debputy Virtual File system is R/O." 

343 ) 

344 

345 @property 

346 def is_detached(self) -> bool: 

347 raise NotImplementedError 

348 

349 def is_root_dir(self) -> bool: 

350 # The root directory is never detachable in the current setup 

351 return not self.is_detached and self.parent_dir is None 

352 

353 def lookup(self, path: str) -> Optional["VirtualPathBase"]: 

354 match, missing = self.attempt_lookup(path) 

355 if missing: 

356 return None 

357 return match 

358 

359 def attempt_lookup(self, path: str) -> tuple["VirtualPathBase", list[str]]: 

360 if self.is_detached: 360 ↛ 361line 360 didn't jump to line 361 because the condition on line 360 was never true

361 raise ValueError( 

362 f'Cannot perform lookup via "{self._orphan_safe_path()}": The path is detached' 

363 ) 

364 absolute, must_be_dir, path_parts = _split_path(path) 

365 current = _root(self) if absolute else self 

366 path_parts.reverse() 

367 link_expansions = set() 

368 while path_parts: 

369 dir_part = path_parts.pop() 

370 if dir_part == ".": 

371 continue 

372 if dir_part == "..": 

373 if current.is_root_dir(): 373 ↛ 374line 373 didn't jump to line 374 because the condition on line 373 was never true

374 raise ValueError(f'The path "{path}" escapes the root dir') 

375 p = current.parent_dir 

376 assert p is not None # type hint 

377 current = cast("VirtualPathBase", p) 

378 continue 

379 try: 

380 current = cast("VirtualPathBase", current[dir_part]) 

381 except KeyError: 

382 path_parts.append(dir_part) 

383 path_parts.reverse() 

384 if must_be_dir: 

385 path_parts.pop() 

386 return current, path_parts 

387 if current.is_symlink and path_parts: 

388 if current.path in link_expansions: 

389 # This is our loop detection for now. It might have some false positives where you 

390 # could safely resolve the same symlink twice. However, given that this use-case is 

391 # basically non-existent in practice for packaging, we just stop here for now. 

392 raise SymlinkLoopError( 

393 f'The path "{path}" traversed the symlink "{current.path}" multiple' 

394 " times. Currently, traversing the same symlink twice is considered" 

395 " a loop by `debputy` even if the path would eventually resolve." 

396 " Consider filing a feature request if you have a benign case that" 

397 " triggers this error." 

398 ) 

399 link_expansions.add(current.path) 

400 link_target = current.readlink() 

401 link_absolute, _, link_path_parts = _split_path(link_target) 

402 if link_absolute: 

403 current = _root(current) 

404 else: 

405 current = cast( 

406 "VirtualPathBase", assume_not_none(current.parent_dir) 

407 ) 

408 link_path_parts.reverse() 

409 path_parts.extend(link_path_parts) 

410 return current, [] 

411 

412 def mkdirs(self, path: str) -> "VirtualPath": 

413 current: VirtualPath 

414 current, missing_parts = self.attempt_lookup( 

415 f"{path}/" if not path.endswith("/") else path 

416 ) 

417 if not current.is_dir: 417 ↛ 418line 417 didn't jump to line 418 because the condition on line 417 was never true

418 raise ValueError( 

419 f'mkdirs of "{path}" failed: This would require {current.path} to not exist OR be' 

420 " a directory. However, that path exist AND is a not directory." 

421 ) 

422 for missing_part in missing_parts: 

423 assert missing_part not in (".", "..") 

424 current = current.mkdir(missing_part) 

425 return current 

426 

427 def prune_if_empty_dir(self) -> None: 

428 """Remove this and all (now) empty parent directories 

429 

430 Same as: `rmdir --ignore-fail-on-non-empty --parents` 

431 

432 This operation may cause the path (and any of its parent directories) to become "detached" 

433 and therefore unsafe to use in further operations. 

434 """ 

435 self._rw_check() 

436 

437 if not self.is_dir: 437 ↛ 438line 437 didn't jump to line 438 because the condition on line 437 was never true

438 raise TypeError(f"{self._orphan_safe_path()} is not a directory") 

439 if any(self.iterdir()): 

440 return 

441 parent_dir = typing.cast("VirtualPathBase", assume_not_none(self.parent_dir)) 

442 

443 # Recursive does not matter; we already know the directory is empty. 

444 self.unlink() 

445 

446 # Note: The root dir must never be deleted. This works because when delegating it to the root 

447 # directory, its implementation of this method is a no-op. If this is later rewritten to an 

448 # inline loop (rather than recursion), be sure to preserve this feature. 

449 parent_dir.prune_if_empty_dir() 

450 

451 def _current_plugin(self) -> str: 

452 if self.is_detached: 452 ↛ 453line 452 didn't jump to line 453 because the condition on line 452 was never true

453 raise TypeError("Cannot resolve the current plugin; path is detached") 

454 return cast("FSRootDir", _root(self))._current_plugin() 

455 

456 @overload 

457 def open_child( 457 ↛ exitline 457 didn't return from function 'open_child' because

458 self, 

459 name: str, 

460 mode: TextOpenMode = "r", 

461 buffering: int = -1, 

462 ) -> TextIO: ... 

463 

464 @overload 

465 def open_child( 465 ↛ exitline 465 didn't return from function 'open_child' because

466 self, 

467 name: str, 

468 mode: BinaryOpenMode, 

469 buffering: int = -1, 

470 ) -> BinaryIO: ... 

471 

472 @contextlib.contextmanager 

473 def open_child(self, name, mode="r", buffering=-1): 

474 """Open a child path of the current directory in a given mode. Usually used with a context manager 

475 

476 The path is opened according to the `mode` parameter similar to the built-in `open` in Python. 

477 The following symbols are accepted with the same meaning as Python's open: 

478 * `r` 

479 * `w` 

480 * `x` 

481 * `a` 

482 * `+` 

483 * `b` 

484 * `t` 

485 

486 Like Python's `open`, this can create a new file provided the file system is in read-write mode. 

487 Though unlike Python's open, symlinks are not followed and cannot be opened. Any newly created 

488 file will start with (os.stat) mode of 0o0644. The (os.stat) mode of existing paths are left 

489 as-is. 

490 

491 :param name: The name of the child path to open. Must be a basename. 

492 :param mode: The mode to open the file with such as `r` or `w`. See Python's `open` for more 

493 examples. 

494 :param buffering: Same as open(..., buffering=...) where supported. Notably during 

495 testing, the content may be purely in memory and use a BytesIO/StringIO 

496 (which does not accept that parameter, but then it is buffered in a different way) 

497 :return: The file handle. 

498 """ 

499 existing = self.get(name) 

500 if "r" in mode: 500 ↛ 501line 500 didn't jump to line 501 because the condition on line 500 was never true

501 if existing is None: 

502 raise ValueError( 

503 f"Path {self.path}/{name} does not exist and mode had `r`" 

504 ) 

505 if "+" not in mode: 

506 # open(byte_io="b" in mode) matches no typed signature overload. 

507 if "b" in mode: 

508 with existing.open(byte_io=True, buffering=buffering) as fd: 

509 yield fd 

510 else: 

511 with existing.open(byte_io=False, buffering=buffering) as fd: 

512 yield fd 

513 

514 encoding = None if "b" in mode else "utf-8" 

515 

516 if existing: 516 ↛ 517line 516 didn't jump to line 517 because the condition on line 516 was never true

517 if "x" in mode: 

518 raise ValueError( 

519 f"Path {existing.path} already exists and mode had `x`" 

520 ) 

521 with ( 

522 existing.replace_fs_path_content() as fs_path, 

523 open(fs_path, mode, encoding=encoding) as fd, 

524 ): 

525 yield fd 

526 else: 

527 assert "r" not in mode 

528 # unlink_if_exists=False as a precaution (the "already exists" should not end up here). 

529 with ( 

530 self.add_file(name, mode=0o0644, unlink_if_exists=False) as new_path, 

531 open(new_path.fs_path, mode, encoding=encoding) as fd, 

532 ): 

533 yield fd 

534 

535 

536class FSPath(VirtualPathBase, ABC): 

537 __slots__ = ( 

538 "_basename", 

539 "_parent_dir", 

540 "_children", 

541 "_path_cache", 

542 "_parent_path_cache", 

543 "_last_known_parent_path", 

544 "_mode", 

545 "_owner", 

546 "_group", 

547 "_mtime", 

548 "_stat_cache", 

549 "_metadata", 

550 "__weakref__", 

551 ) 

552 

553 def __init__( 

554 self, 

555 basename: str, 

556 parent: Optional["FSPath"], 

557 children: dict[str, "FSPath"] | None = None, 

558 initial_mode: int | None = None, 

559 mtime: float | None = None, 

560 stat_cache: os.stat_result | None = None, 

561 ) -> None: 

562 self._basename = basename 

563 self._path_cache: str | None = None 

564 self._parent_path_cache: str | None = None 

565 self._children = children 

566 self._last_known_parent_path: str | None = None 

567 self._mode = initial_mode 

568 self._mtime = mtime 

569 self._stat_cache = stat_cache 

570 self._metadata: dict[tuple[str, type[Any]], PathMetadataValue[Any]] = {} 

571 self._owner = ROOT_DEFINITION 

572 self._group = ROOT_DEFINITION 

573 

574 # The self._parent_dir = None is to create `_parent_dir` because the parent_dir setter calls 

575 # is_orphaned, which assumes self._parent_dir is an attribute. 

576 self._parent_dir: ReferenceType["FSPath"] | None = None 

577 if parent is not None: 

578 self.parent_dir = parent 

579 

580 def __repr__(self) -> str: 

581 return ( 

582 f"{self.__class__.__name__}({self._orphan_safe_path()!r}," 

583 f" is_file={self.is_file}," 

584 f" is_dir={self.is_dir}," 

585 f" is_symlink={self.is_symlink}," 

586 f" has_fs_path={self.has_fs_path}," 

587 f" children_len={len(self._children) if self._children else 0})" 

588 ) 

589 

590 @property 

591 def name(self) -> str: 

592 return self._basename 

593 

594 @name.setter 

595 def name(self, new_name: str) -> None: 

596 self._rw_check() 

597 if new_name == self._basename: 597 ↛ 598line 597 didn't jump to line 598 because the condition on line 597 was never true

598 return 

599 if self.is_detached: 599 ↛ 600line 599 didn't jump to line 600 because the condition on line 599 was never true

600 self._basename = new_name 

601 return 

602 self._rw_check() 

603 parent = self.parent_dir 

604 # This little parent_dir dance ensures the parent dir detects the rename properly 

605 self.parent_dir = None 

606 self._basename = new_name 

607 self.parent_dir = parent 

608 

609 def iterdir(self) -> Iterable["FSPath"]: 

610 if self._children is not None: 

611 yield from self._children.values() 

612 

613 def all_paths(self) -> Iterable["FSPath"]: 

614 yield self 

615 if not self.is_dir: 

616 return 

617 by_basename = BY_BASENAME 

618 stack = sorted(self.iterdir(), key=by_basename, reverse=True) 

619 while stack: 

620 current = stack.pop() 

621 yield current 

622 if current.is_dir and not current.is_detached: 

623 stack.extend(sorted(current.iterdir(), key=by_basename, reverse=True)) 

624 

625 def walk(self) -> Iterable[tuple["FSPath", list["FSPath"]]]: 

626 # FIXME: can this be more "os.walk"-like without making it harder to implement? 

627 if not self.is_dir: 627 ↛ 628line 627 didn't jump to line 628 because the condition on line 627 was never true

628 yield self, [] 

629 return 

630 by_basename = BY_BASENAME 

631 stack = [self] 

632 while stack: 

633 current = stack.pop() 

634 children = sorted(current.iterdir(), key=by_basename) 

635 assert not children or current.is_dir 

636 yield current, children 

637 # Removing the directory counts as discarding the children. 

638 if not current.is_detached: 638 ↛ 632line 638 didn't jump to line 632 because the condition on line 638 was always true

639 stack.extend(reversed(children)) 

640 

641 def _orphan_safe_path(self) -> str: 

642 if not self.is_detached or self._last_known_parent_path is not None: 642 ↛ 644line 642 didn't jump to line 644 because the condition on line 642 was always true

643 return self.path 

644 return f"<orphaned>/{self.name}" 

645 

646 @property 

647 def is_detached(self) -> bool: 

648 parent = self._parent_dir 

649 if parent is None: 

650 return True 

651 resolved_parent = parent() 

652 if resolved_parent is None: 652 ↛ 653line 652 didn't jump to line 653 because the condition on line 652 was never true

653 return True 

654 return resolved_parent.is_detached 

655 

656 # The __getitem__ behaves like __getitem__ from Dict but __iter__ would ideally work like a Sequence. 

657 # However, that does not feel compatible, so lets force people to use .children instead for the Sequence 

658 # behavior to avoid surprises for now. 

659 # (Maybe it is a non-issue, but it is easier to add the API later than to remove it once we have committed 

660 # to using it) 

661 __iter__ = None 

662 

663 def __getitem__(self, key) -> "FSPath": 

664 if self._children is None: 

665 raise KeyError( 

666 f"{key} (note: {self._orphan_safe_path()!r} has no children)" 

667 ) 

668 if isinstance(key, FSPath): 668 ↛ 669line 668 didn't jump to line 669 because the condition on line 668 was never true

669 key = key.name 

670 return self._children[key] 

671 

672 def __delitem__(self, key) -> None: 

673 self._rw_check() 

674 children = self._children 

675 if children is None: 675 ↛ 676line 675 didn't jump to line 676 because the condition on line 675 was never true

676 raise KeyError(key) 

677 del children[key] 

678 

679 def get(self, key: str) -> "Optional[FSPath]": 

680 try: 

681 return self[key] 

682 except KeyError: 

683 return None 

684 

685 def __contains__(self, item: object) -> bool: 

686 if isinstance(item, VirtualPath): 686 ↛ 687line 686 didn't jump to line 687 because the condition on line 686 was never true

687 return item.parent_dir is self 

688 if not isinstance(item, str): 688 ↛ 689line 688 didn't jump to line 689 because the condition on line 688 was never true

689 return False 

690 m = self.get(item) 

691 return m is not None 

692 

693 def _add_child(self, child: "FSPath") -> None: 

694 self._rw_check() 

695 if not self.is_dir: 695 ↛ 696line 695 didn't jump to line 696 because the condition on line 695 was never true

696 raise TypeError(f"{self._orphan_safe_path()!r} is not a directory") 

697 if self._children is None: 

698 self._children = {} 

699 

700 conflict_child = self.get(child.name) 

701 if conflict_child is not None: 701 ↛ 702line 701 didn't jump to line 702 because the condition on line 701 was never true

702 conflict_child.unlink(recursive=True) 

703 self._children[child.name] = child 

704 

705 @property 

706 def tar_path(self) -> str: 

707 path = self.path 

708 if self.is_dir: 

709 return path + "/" 

710 return path 

711 

712 @property 

713 def path(self) -> str: 

714 parent_path = self.parent_dir_path 

715 if ( 

716 self._parent_path_cache is not None 

717 and self._parent_path_cache == parent_path 

718 ): 

719 return assume_not_none(self._path_cache) 

720 if parent_path is None: 720 ↛ 721line 720 didn't jump to line 721 because the condition on line 720 was never true

721 raise ReferenceError( 

722 f"The path {self.name} is detached! {self.__class__.__name__}" 

723 ) 

724 self._parent_path_cache = parent_path 

725 ret = os.path.join(parent_path, self.name) 

726 self._path_cache = ret 

727 return ret 

728 

729 @property 

730 def parent_dir(self) -> Optional["FSPath"]: 

731 p_ref = self._parent_dir 

732 p = p_ref() if p_ref is not None else None 

733 if p is None: 733 ↛ 734line 733 didn't jump to line 734 because the condition on line 733 was never true

734 raise ReferenceError( 

735 f"The path {self.name} is detached! {self.__class__.__name__}" 

736 ) 

737 return p 

738 

739 @parent_dir.setter 

740 def parent_dir(self, new_parent: Optional["FSPath"]) -> None: 

741 self._rw_check() 

742 if new_parent is not None: 

743 if not new_parent.is_dir: 743 ↛ 744line 743 didn't jump to line 744 because the condition on line 743 was never true

744 raise ValueError( 

745 f"The parent {new_parent._orphan_safe_path()} must be a directory" 

746 ) 

747 new_parent._rw_check() 

748 old_parent = None 

749 self._last_known_parent_path = None 

750 if not self.is_detached: 

751 old_parent = self.parent_dir 

752 old_parent_children = assume_not_none(assume_not_none(old_parent)._children) 

753 del old_parent_children[self.name] 

754 if new_parent is not None: 

755 self._parent_dir = ref(new_parent) 

756 new_parent._add_child(self) 

757 else: 

758 if old_parent is not None and not old_parent.is_detached: 758 ↛ 760line 758 didn't jump to line 760 because the condition on line 758 was always true

759 self._last_known_parent_path = old_parent.path 

760 self._parent_dir = None 

761 self._parent_path_cache = None 

762 

763 @property 

764 def parent_dir_path(self) -> str | None: 

765 if self.is_detached: 765 ↛ 766line 765 didn't jump to line 766 because the condition on line 765 was never true

766 return self._last_known_parent_path 

767 return assume_not_none(self.parent_dir).path 

768 

769 def chown( 

770 self, 

771 owner: StaticFileSystemOwner | None, 

772 group: StaticFileSystemGroup | None, 

773 ) -> None: 

774 """Change the owner/group of this path 

775 

776 :param owner: The desired owner definition for this path. If None, then no change of owner is performed. 

777 :param group: The desired group definition for this path. If None, then no change of group is performed. 

778 """ 

779 self._rw_check() 

780 

781 if owner is not None: 

782 self._owner = owner.ownership_definition 

783 if group is not None: 

784 self._group = group.ownership_definition 

785 

786 def stat(self) -> os.stat_result: 

787 st = self._stat_cache 

788 if st is None: 

789 st = self._uncached_stat() 

790 self._stat_cache = st 

791 return st 

792 

793 def _uncached_stat(self) -> os.stat_result: 

794 return os.lstat(self.fs_path) 

795 

796 @property 

797 def mode(self) -> int: 

798 current_mode = self._mode 

799 if current_mode is None: 799 ↛ 800line 799 didn't jump to line 800 because the condition on line 799 was never true

800 current_mode = stat.S_IMODE(self.stat().st_mode) 

801 self._mode = current_mode 

802 return current_mode 

803 

804 @mode.setter 

805 def mode(self, new_mode: int) -> None: 

806 self._rw_check() 

807 min_bit = 0o700 if self.is_dir else 0o400 

808 if (new_mode & min_bit) != min_bit: 808 ↛ 809line 808 didn't jump to line 809 because the condition on line 808 was never true

809 omode = oct(new_mode)[2:] 

810 omin = oct(min_bit)[2:] 

811 raise ValueError( 

812 f'Attempt to set mode of path "{self._orphan_safe_path()}" to {omode} rejected;' 

813 f" Minimum requirements are {omin} (read-bit and, for dirs, exec bit for user)." 

814 " There are no paths that do not need these requirements met and they can cause" 

815 " problems during build or on the final system." 

816 ) 

817 self._mode = new_mode 

818 

819 def _ensure_min_mode(self) -> None: 

820 min_bit = 0o700 if self.is_dir else 0o600 

821 if self.has_fs_path and (self.mode & 0o600) != 0o600: 821 ↛ 822line 821 didn't jump to line 822 because the condition on line 821 was never true

822 try: 

823 fs_path = self.fs_path 

824 except TestPathWithNonExistentFSPathError: 

825 pass 

826 else: 

827 st = os.stat(fs_path) 

828 new_fs_mode = stat.S_IMODE(st.st_mode) | min_bit 

829 _debug_log( 

830 f"Applying chmod {oct(min_bit)[2:]} {fs_path} ({self.path}) to avoid problems down the line" 

831 ) 

832 os.chmod(fs_path, new_fs_mode) 

833 self.mode |= min_bit 

834 

835 def _resolve_initial_mtime(self) -> float: 

836 return self.stat().st_mtime 

837 

838 @property 

839 def mtime(self) -> float: 

840 mtime = self._mtime 

841 if mtime is None: 

842 mtime = self._resolve_initial_mtime() 

843 self._mtime = mtime 

844 return mtime 

845 

846 @mtime.setter 

847 def mtime(self, new_mtime: float) -> None: 

848 self._rw_check() 

849 self._mtime = new_mtime 

850 

851 @property 

852 def tar_owner_info(self) -> tuple[str, int, str, int]: 

853 owner = self._owner 

854 group = self._group 

855 return ( 

856 owner.entity_name, 

857 owner.entity_id, 

858 group.entity_name, 

859 group.entity_id, 

860 ) 

861 

862 @property 

863 def _can_replace_inline(self) -> bool: 

864 return False 

865 

866 @contextlib.contextmanager 

867 def add_file( 

868 self, 

869 name: str, 

870 *, 

871 unlink_if_exists: bool = True, 

872 use_fs_path_mode: bool = False, 

873 mode: int = 0o0644, 

874 mtime: float | None = None, 

875 # Special-case parameters that are not exposed in the API 

876 fs_basename_matters: bool = False, 

877 subdir_key: str | None = None, 

878 ) -> Iterator["FSPath"]: 

879 if "/" in name or name in {".", ".."}: 879 ↛ 880line 879 didn't jump to line 880 because the condition on line 879 was never true

880 raise ValueError(f'Invalid file name: "{name}"') 

881 if not self.is_dir: 881 ↛ 882line 881 didn't jump to line 882 because the condition on line 881 was never true

882 raise TypeError( 

883 f"Cannot create {self._orphan_safe_path()}/{name}:" 

884 f" {self._orphan_safe_path()} is not a directory" 

885 ) 

886 self._rw_check() 

887 existing = self.get(name) 

888 if existing is not None: 888 ↛ 889line 888 didn't jump to line 889 because the condition on line 888 was never true

889 if not unlink_if_exists: 

890 raise ValueError( 

891 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"' 

892 f" and exist_ok was False" 

893 ) 

894 existing.unlink(recursive=False) 

895 

896 if fs_basename_matters and subdir_key is None: 896 ↛ 897line 896 didn't jump to line 897 because the condition on line 896 was never true

897 raise ValueError( 

898 "When fs_basename_matters is True, a subdir_key must be provided" 

899 ) 

900 

901 directory = generated_content_dir(subdir_key=subdir_key) 

902 

903 if fs_basename_matters: 903 ↛ 904line 903 didn't jump to line 904 because the condition on line 903 was never true

904 fs_path = os.path.join(directory, name) 

905 with open(fs_path, "xb") as _: 

906 # Ensure that the fs_path exists 

907 pass 

908 child = FSBackedFilePath( 

909 name, 

910 self, 

911 fs_path, 

912 replaceable_inline=True, 

913 mtime=mtime, 

914 ) 

915 yield child 

916 else: 

917 with tempfile.NamedTemporaryFile( 

918 dir=directory, suffix=f"__{name}", delete=False 

919 ) as fd: 

920 fs_path = fd.name 

921 child = FSBackedFilePath( 

922 name, 

923 self, 

924 fs_path, 

925 replaceable_inline=True, 

926 mtime=mtime, 

927 ) 

928 fd.close() 

929 yield child 

930 

931 if use_fs_path_mode: 931 ↛ 933line 931 didn't jump to line 933 because the condition on line 931 was never true

932 # Ensure the caller can see the current mode 

933 os.chmod(fs_path, mode) 

934 _check_fs_path_is_file(fs_path, unlink_on_error=child) 

935 child._reset_caches() 

936 if not use_fs_path_mode: 936 ↛ exitline 936 didn't return from function 'add_file' because the condition on line 936 was always true

937 child.mode = mode 

938 

939 def insert_file_from_fs_path( 

940 self, 

941 name: str, 

942 fs_path: str, 

943 *, 

944 exist_ok: bool = True, 

945 use_fs_path_mode: bool = False, 

946 mode: int = 0o0644, 

947 require_copy_on_write: bool = True, 

948 follow_symlinks: bool = True, 

949 reference_path: VirtualPath | None = None, 

950 ) -> "FSPath": 

951 if "/" in name or name in {".", ".."}: 951 ↛ 952line 951 didn't jump to line 952 because the condition on line 951 was never true

952 raise ValueError(f'Invalid file name: "{name}"') 

953 if not self.is_dir: 953 ↛ 954line 953 didn't jump to line 954 because the condition on line 953 was never true

954 raise TypeError( 

955 f"Cannot create {self._orphan_safe_path()}/{name}:" 

956 f" {self._orphan_safe_path()} is not a directory" 

957 ) 

958 self._rw_check() 

959 if name in self and not exist_ok: 959 ↛ 960line 959 didn't jump to line 960 because the condition on line 959 was never true

960 raise ValueError( 

961 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"' 

962 f" and exist_ok was False" 

963 ) 

964 new_fs_path = fs_path 

965 if follow_symlinks: 

966 if reference_path is not None: 966 ↛ 967line 966 didn't jump to line 967 because the condition on line 966 was never true

967 raise ValueError( 

968 "The reference_path cannot be used with follow_symlinks" 

969 ) 

970 new_fs_path = os.path.realpath(new_fs_path, strict=True) 

971 

972 fmode: int | None = mode 

973 if use_fs_path_mode: 

974 fmode = None 

975 

976 st = None 

977 if reference_path is None: 

978 st = os.lstat(new_fs_path) 

979 if stat.S_ISDIR(st.st_mode): 979 ↛ 980line 979 didn't jump to line 980 because the condition on line 979 was never true

980 raise ValueError( 

981 f'The provided path "{fs_path}" is a directory. However, this' 

982 " method does not support directories" 

983 ) 

984 

985 if not stat.S_ISREG(st.st_mode): 985 ↛ 986line 985 didn't jump to line 986 because the condition on line 985 was never true

986 if follow_symlinks: 

987 raise ValueError( 

988 f"The resolved fs_path ({new_fs_path}) was not a file." 

989 ) 

990 raise ValueError(f"The provided fs_path ({fs_path}) was not a file.") 

991 return FSBackedFilePath( 

992 name, 

993 self, 

994 new_fs_path, 

995 initial_mode=fmode, 

996 stat_cache=st, 

997 replaceable_inline=not require_copy_on_write, 

998 reference_path=reference_path, 

999 ) 

1000 

1001 def add_symlink( 

1002 self, 

1003 link_name: str, 

1004 link_target: str, 

1005 *, 

1006 reference_path: VirtualPath | None = None, 

1007 ) -> "FSPath": 

1008 if "/" in link_name or link_name in {".", ".."}: 1008 ↛ 1009line 1008 didn't jump to line 1009 because the condition on line 1008 was never true

1009 raise ValueError( 

1010 f'Invalid file name: "{link_name}" (it must be a valid basename)' 

1011 ) 

1012 if not self.is_dir: 1012 ↛ 1013line 1012 didn't jump to line 1013 because the condition on line 1012 was never true

1013 raise TypeError( 

1014 f"Cannot create {self._orphan_safe_path()}/{link_name}:" 

1015 f" {self._orphan_safe_path()} is not a directory" 

1016 ) 

1017 self._rw_check() 

1018 

1019 existing = self.get(link_name) 

1020 if existing: 1020 ↛ 1022line 1020 didn't jump to line 1022 because the condition on line 1020 was never true

1021 # Emulate ln -sf with attempts a non-recursive unlink first. 

1022 existing.unlink(recursive=False) 

1023 

1024 return SymlinkVirtualPath( 

1025 link_name, 

1026 self, 

1027 link_target, 

1028 reference_path=reference_path, 

1029 ) 

1030 

1031 def mkdir( 

1032 self, 

1033 name: str, 

1034 *, 

1035 reference_path: VirtualPath | None = None, 

1036 ) -> "FSPath": 

1037 if "/" in name or name in {".", ".."}: 1037 ↛ 1038line 1037 didn't jump to line 1038 because the condition on line 1037 was never true

1038 raise ValueError( 

1039 f'Invalid file name: "{name}" (it must be a valid basename)' 

1040 ) 

1041 if not self.is_dir: 1041 ↛ 1042line 1041 didn't jump to line 1042 because the condition on line 1041 was never true

1042 raise TypeError( 

1043 f"Cannot create {self._orphan_safe_path()}/{name}:" 

1044 f" {self._orphan_safe_path()} is not a directory" 

1045 ) 

1046 if reference_path is not None and not reference_path.is_dir: 1046 ↛ 1047line 1046 didn't jump to line 1047 because the condition on line 1046 was never true

1047 raise ValueError( 

1048 f'The provided fs_path "{reference_path.fs_path}" exist but it is not a directory!' 

1049 ) 

1050 self._rw_check() 

1051 

1052 existing = self.get(name) 

1053 if existing: 1053 ↛ 1054line 1053 didn't jump to line 1054 because the condition on line 1053 was never true

1054 raise ValueError(f"Path {existing.path} already exist") 

1055 return VirtualDirectoryFSPath(name, self, reference_path=reference_path) 

1056 

1057 def mkdirs(self, path: str) -> "FSPath": 

1058 return cast("FSPath", super().mkdirs(path)) 

1059 

1060 @property 

1061 def is_read_write(self) -> bool: 

1062 """When true, the file system entry may be mutated 

1063 

1064 :return: Whether file system mutations are permitted. 

1065 """ 

1066 if self.is_detached: 

1067 return True 

1068 return assume_not_none(self.parent_dir).is_read_write 

1069 

1070 def unlink(self, *, recursive: bool = False) -> None: 

1071 """Unlink a file or a directory 

1072 

1073 This operation will detach the path from the file system (causing "is_detached" to return True). 

1074 

1075 Note that the root directory cannot be deleted. 

1076 

1077 :param recursive: If True, then non-empty directories will be unlinked as well removing everything inside them 

1078 as well. When False, an error is raised if the path is a non-empty directory 

1079 """ 

1080 if self.is_detached: 1080 ↛ 1081line 1080 didn't jump to line 1081 because the condition on line 1080 was never true

1081 return 

1082 if not recursive and any(self.iterdir()): 1082 ↛ 1083line 1082 didn't jump to line 1083 because the condition on line 1082 was never true

1083 raise ValueError( 

1084 f'Refusing to unlink "{self.path}": The directory was not empty and recursive was False' 

1085 ) 

1086 # The .parent_dir setter does a _rw_check() for us. 

1087 self.parent_dir = None 

1088 

1089 def _reset_caches(self) -> None: 

1090 self._mtime = None 

1091 self._stat_cache = None 

1092 

1093 def metadata( 

1094 self, 

1095 metadata_type: type[PMT], 

1096 *, 

1097 owning_plugin: str | None = None, 

1098 ) -> PathMetadataReference[PMT]: 

1099 current_plugin = self._current_plugin() 

1100 if owning_plugin is None: 1100 ↛ 1102line 1100 didn't jump to line 1102 because the condition on line 1100 was always true

1101 owning_plugin = current_plugin 

1102 metadata_key = (owning_plugin, metadata_type) 

1103 metadata_value = self._metadata.get(metadata_key) 

1104 if metadata_value is None: 

1105 if self.is_detached: 1105 ↛ 1106line 1105 didn't jump to line 1106 because the condition on line 1105 was never true

1106 raise TypeError( 

1107 f"Cannot access the metadata {metadata_type.__name__}: The path is detached." 

1108 ) 

1109 if not self.is_read_write: 

1110 return AlwaysEmptyReadOnlyMetadataReference( 

1111 owning_plugin, 

1112 current_plugin, 

1113 metadata_type, 

1114 ) 

1115 metadata_value = PathMetadataValue(owning_plugin, metadata_type) 

1116 self._metadata[metadata_key] = metadata_value 

1117 return PathMetadataReferenceImplementation( 

1118 self, 

1119 current_plugin, 

1120 metadata_value, 

1121 ) 

1122 

1123 @contextlib.contextmanager 

1124 def replace_fs_path_content( 

1125 self, 

1126 *, 

1127 use_fs_path_mode: bool = False, 

1128 ) -> Iterator[str]: 

1129 if not self.is_file: 1129 ↛ 1130line 1129 didn't jump to line 1130 because the condition on line 1129 was never true

1130 raise TypeError( 

1131 f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file' 

1132 ) 

1133 self._rw_check() 

1134 fs_path = self.fs_path 

1135 if not self._can_replace_inline: 1135 ↛ 1147line 1135 didn't jump to line 1147 because the condition on line 1135 was always true

1136 fs_path = self.fs_path 

1137 directory = generated_content_dir() 

1138 with tempfile.NamedTemporaryFile( 

1139 dir=directory, suffix=f"__{self.name}", delete=False 

1140 ) as new_path_fd: 

1141 new_path_fd.close() 

1142 _cp_a(fs_path, new_path_fd.name) 

1143 fs_path = new_path_fd.name 

1144 self._replaced_path(fs_path) 

1145 assert self.fs_path == fs_path 

1146 

1147 current_mtime = self._mtime 

1148 if current_mtime is not None: 

1149 os.utime(fs_path, (current_mtime, current_mtime)) 

1150 

1151 current_mode = self.mode 

1152 yield fs_path 

1153 _check_fs_path_is_file(fs_path, unlink_on_error=self) 

1154 if not use_fs_path_mode: 1154 ↛ 1156line 1154 didn't jump to line 1156 because the condition on line 1154 was always true

1155 os.chmod(fs_path, current_mode) 

1156 self._reset_caches() 

1157 

1158 def _replaced_path(self, new_fs_path: str) -> None: 

1159 raise NotImplementedError 

1160 

1161 

1162class VirtualFSPathBase(FSPath, ABC): 

1163 __slots__ = () 

1164 

1165 def __init__( 

1166 self, 

1167 basename: str, 

1168 parent: Optional["FSPath"], 

1169 children: dict[str, "FSPath"] | None = None, 

1170 initial_mode: int | None = None, 

1171 mtime: float | None = None, 

1172 stat_cache: os.stat_result | None = None, 

1173 ) -> None: 

1174 super().__init__( 

1175 basename, 

1176 parent, 

1177 children, 

1178 initial_mode=initial_mode, 

1179 mtime=mtime, 

1180 stat_cache=stat_cache, 

1181 ) 

1182 

1183 def _resolve_initial_mtime(self) -> float: 

1184 return time.time() 

1185 

1186 @property 

1187 def has_fs_path(self) -> bool: 

1188 return False 

1189 

1190 def stat(self) -> os.stat_result: 

1191 if not self.has_fs_path: 

1192 raise PureVirtualPathError( 

1193 "stat() is only applicable to paths backed by the file system. The path" 

1194 f" {self._orphan_safe_path()!r} is purely virtual" 

1195 ) 

1196 return super().stat() 

1197 

1198 @property 

1199 def fs_path(self) -> str: 

1200 if not self.has_fs_path: 

1201 raise PureVirtualPathError( 

1202 "fs_path is only applicable to paths backed by the file system. The path" 

1203 f" {self._orphan_safe_path()!r} is purely virtual" 

1204 ) 

1205 return self.fs_path 

1206 

1207 

1208class FSRootDir(FSPath): 

1209 __slots__ = ("_fs_path", "_fs_read_write", "_plugin_context") 

1210 

1211 def __init__(self, fs_path: str | None = None) -> None: 

1212 self._fs_path = fs_path 

1213 self._fs_read_write = True 

1214 super().__init__( 

1215 ".", 

1216 None, 

1217 children={}, 

1218 initial_mode=0o755, 

1219 ) 

1220 self._plugin_context = CurrentPluginContextManager("debputy") 

1221 

1222 @property 

1223 def is_detached(self) -> bool: 

1224 return False 

1225 

1226 def _orphan_safe_path(self) -> str: 

1227 return self.name 

1228 

1229 @property 

1230 def path(self) -> str: 

1231 return self.name 

1232 

1233 @property 

1234 def parent_dir(self) -> Optional["FSPath"]: 

1235 return None 

1236 

1237 @parent_dir.setter 

1238 def parent_dir(self, new_parent: FSPath | None) -> None: 

1239 if new_parent is not None: 

1240 raise ValueError("The root directory cannot become a non-root directory") 

1241 

1242 @property 

1243 def parent_dir_path(self) -> str | None: 

1244 return None 

1245 

1246 @property 

1247 def is_dir(self) -> bool: 

1248 return True 

1249 

1250 @property 

1251 def is_file(self) -> bool: 

1252 return False 

1253 

1254 @property 

1255 def is_symlink(self) -> bool: 

1256 return False 

1257 

1258 def readlink(self) -> str: 

1259 raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink') 

1260 

1261 @property 

1262 def has_fs_path(self) -> bool: 

1263 return self._fs_path is not None 

1264 

1265 def stat(self) -> os.stat_result: 

1266 if not self.has_fs_path: 

1267 raise PureVirtualPathError( 

1268 "stat() is only applicable to paths backed by the file system. The path" 

1269 f" {self._orphan_safe_path()!r} is purely virtual" 

1270 ) 

1271 return os.stat(self.fs_path) 

1272 

1273 @property 

1274 def fs_path(self) -> str: 

1275 if not self.has_fs_path: 1275 ↛ 1276line 1275 didn't jump to line 1276 because the condition on line 1275 was never true

1276 raise PureVirtualPathError( 

1277 "fs_path is only applicable to paths backed by the file system. The path" 

1278 f" {self._orphan_safe_path()!r} is purely virtual" 

1279 ) 

1280 return assume_not_none(self._fs_path) 

1281 

1282 @property 

1283 def is_read_write(self) -> bool: 

1284 return self._fs_read_write 

1285 

1286 @is_read_write.setter 

1287 def is_read_write(self, new_value: bool) -> None: 

1288 self._fs_read_write = new_value 

1289 

1290 def prune_if_empty_dir(self) -> None: 

1291 # No-op for the root directory. There is never a case where you want to delete this directory 

1292 # (and even if you could, debputy will need it for technical reasons, so the root dir stays) 

1293 return 

1294 

1295 def unlink(self, *, recursive: bool = False) -> None: 

1296 # There is never a case where you want to delete this directory (and even if you could, 

1297 # debputy will need it for technical reasons, so the root dir stays) 

1298 raise TypeError("Cannot delete the root directory") 

1299 

1300 def _current_plugin(self) -> str: 

1301 return self._plugin_context.current_plugin_name 

1302 

1303 @contextlib.contextmanager 

1304 def change_plugin_context(self, new_plugin: str) -> Iterator[str]: 

1305 with self._plugin_context.change_plugin_context(new_plugin) as r: 

1306 yield r 

1307 

1308 

1309class VirtualPathWithReference(VirtualFSPathBase, ABC): 

1310 __slots__ = ("_reference_path",) 

1311 

1312 def __init__( 

1313 self, 

1314 basename: str, 

1315 parent: FSPath, 

1316 *, 

1317 default_mode: int, 

1318 reference_path: VirtualPath | None = None, 

1319 ) -> None: 

1320 super().__init__( 

1321 basename, 

1322 parent=parent, 

1323 initial_mode=reference_path.mode if reference_path else default_mode, 

1324 ) 

1325 self._reference_path = reference_path 

1326 

1327 @property 

1328 def has_fs_path(self) -> bool: 

1329 ref_path = self._reference_path 

1330 return ref_path is not None and ref_path.has_fs_path 

1331 

1332 def _resolve_initial_mtime(self) -> float: 

1333 ref_path = self._reference_path 

1334 if ref_path: 1334 ↛ 1336line 1334 didn't jump to line 1336 because the condition on line 1334 was always true

1335 return ref_path.mtime 

1336 return super()._resolve_initial_mtime() 

1337 

1338 @property 

1339 def fs_path(self) -> str: 

1340 ref_path = self._reference_path 

1341 if ref_path is not None and ( 1341 ↛ 1345line 1341 didn't jump to line 1345 because the condition on line 1341 was always true

1342 not super().has_fs_path or super().fs_path == ref_path.fs_path 

1343 ): 

1344 return ref_path.fs_path 

1345 return super().fs_path 

1346 

1347 def stat(self) -> os.stat_result: 

1348 ref_path = self._reference_path 

1349 if ref_path is not None and ( 

1350 not super().has_fs_path or super().fs_path == ref_path.fs_path 

1351 ): 

1352 return typing.cast(VirtualPathBase, ref_path).stat() 

1353 return super().stat() 

1354 

1355 @overload 

1356 def open( 1356 ↛ exitline 1356 didn't return from function 'open' because

1357 self, 

1358 *, 

1359 byte_io: Literal[False] = False, 

1360 buffering: int = -1, 

1361 ) -> TextIO: ... 

1362 

1363 @overload 

1364 def open( 1364 ↛ exitline 1364 didn't return from function 'open' because

1365 self, 

1366 *, 

1367 byte_io: Literal[True], 

1368 buffering: Literal[0] = ..., 

1369 ) -> io.FileIO: ... 

1370 

1371 @overload 

1372 def open( 1372 ↛ exitline 1372 didn't return from function 'open' because

1373 self, 

1374 *, 

1375 byte_io: Literal[True], 

1376 buffering: int = -1, 

1377 ) -> io.BufferedReader: ... 

1378 

1379 def open(self, *, byte_io=False, buffering=-1): 

1380 reference_path = self._reference_path 

1381 if reference_path is not None and reference_path.fs_path == self.fs_path: 

1382 return reference_path.open(byte_io=byte_io, buffering=buffering) 

1383 return super().open(byte_io=byte_io, buffering=buffering) 

1384 

1385 

1386class VirtualDirectoryFSPath(VirtualPathWithReference): 

1387 __slots__ = ("_reference_path",) 

1388 

1389 def __init__( 

1390 self, 

1391 basename: str, 

1392 parent: FSPath, 

1393 *, 

1394 reference_path: VirtualPath | None = None, 

1395 ) -> None: 

1396 super().__init__( 

1397 basename, 

1398 parent, 

1399 reference_path=reference_path, 

1400 default_mode=0o755, 

1401 ) 

1402 self._reference_path = reference_path 

1403 assert reference_path is None or reference_path.is_dir 

1404 self._ensure_min_mode() 

1405 

1406 @property 

1407 def is_dir(self) -> bool: 

1408 return True 

1409 

1410 @property 

1411 def is_file(self) -> bool: 

1412 return False 

1413 

1414 @property 

1415 def is_symlink(self) -> bool: 

1416 return False 

1417 

1418 def readlink(self) -> str: 

1419 raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink') 

1420 

1421 

1422class SymlinkVirtualPath(VirtualPathWithReference): 

1423 __slots__ = ("_link_target",) 

1424 

1425 def __init__( 

1426 self, 

1427 basename: str, 

1428 parent_dir: FSPath, 

1429 link_target: str, 

1430 *, 

1431 reference_path: VirtualPath | None = None, 

1432 ) -> None: 

1433 super().__init__( 

1434 basename, 

1435 parent=parent_dir, 

1436 default_mode=_SYMLINK_MODE, 

1437 reference_path=reference_path, 

1438 ) 

1439 self._link_target = link_target 

1440 

1441 @property 

1442 def is_dir(self) -> bool: 

1443 return False 

1444 

1445 @property 

1446 def is_file(self) -> bool: 

1447 return False 

1448 

1449 @property 

1450 def is_symlink(self) -> bool: 

1451 return True 

1452 

1453 def readlink(self) -> str: 

1454 return self._link_target 

1455 

1456 @property 

1457 def size(self) -> int: 

1458 return len(self.readlink()) 

1459 

1460 

1461class FSBackedFilePath(VirtualPathWithReference): 

1462 __slots__ = ("_fs_path", "_replaceable_inline") 

1463 

1464 def __init__( 

1465 self, 

1466 basename: str, 

1467 parent_dir: FSPath, 

1468 fs_path: str, 

1469 *, 

1470 replaceable_inline: bool = False, 

1471 initial_mode: int | None = None, 

1472 mtime: float | None = None, 

1473 stat_cache: os.stat_result | None = None, 

1474 reference_path: VirtualPath | None = None, 

1475 ) -> None: 

1476 super().__init__( 

1477 basename, 

1478 parent_dir, 

1479 default_mode=0o644, 

1480 reference_path=reference_path, 

1481 ) 

1482 self._fs_path = fs_path 

1483 self._replaceable_inline = replaceable_inline 

1484 if initial_mode is not None: 

1485 self.mode = initial_mode 

1486 if mtime is not None: 

1487 self._mtime = mtime 

1488 self._stat_cache = stat_cache 

1489 assert ( 

1490 not replaceable_inline or "debputy/scratch-dir/" in fs_path 

1491 ), f"{fs_path} should not be inline-replaceable -- {self.path}" 

1492 self._ensure_min_mode() 

1493 

1494 @property 

1495 def is_dir(self) -> bool: 

1496 return False 

1497 

1498 @property 

1499 def is_file(self) -> bool: 

1500 return True 

1501 

1502 @property 

1503 def is_symlink(self) -> bool: 

1504 return False 

1505 

1506 def readlink(self) -> str: 

1507 raise TypeError(f'"{self._orphan_safe_path()!r}" is a file; not a symlink') 

1508 

1509 @property 

1510 def has_fs_path(self) -> bool: 

1511 return True 

1512 

1513 @property 

1514 def fs_path(self) -> str: 

1515 return self._fs_path 

1516 

1517 @property 

1518 def _can_replace_inline(self) -> bool: 

1519 return self._replaceable_inline 

1520 

1521 def _replaced_path(self, new_fs_path: str) -> None: 

1522 self._fs_path = new_fs_path 

1523 self._reference_path = None 

1524 self._replaceable_inline = True 

1525 

1526 

1527_SYMLINK_MODE = 0o777 

1528 

1529 

1530class VirtualTestPath(FSPath): 

1531 __slots__ = ( 

1532 "_path_type", 

1533 "_has_fs_path", 

1534 "_fs_path", 

1535 "_link_target", 

1536 "_content", 

1537 "_materialized_content", 

1538 ) 

1539 

1540 def __init__( 

1541 self, 

1542 basename: str, 

1543 parent_dir: FSPath | None, 

1544 mode: int | None = None, 

1545 mtime: float | None = None, 

1546 is_dir: bool = False, 

1547 has_fs_path: bool | None = False, 

1548 fs_path: str | None = None, 

1549 link_target: str | None = None, 

1550 content: str | None = None, 

1551 materialized_content: str | None = None, 

1552 ) -> None: 

1553 if is_dir: 

1554 self._path_type = PathType.DIRECTORY 

1555 elif link_target is not None: 

1556 self._path_type = PathType.SYMLINK 

1557 if mode is not None and mode != _SYMLINK_MODE: 1557 ↛ 1558line 1557 didn't jump to line 1558 because the condition on line 1557 was never true

1558 raise ValueError( 

1559 f'Please do not assign a mode to symlinks. Triggered for "{basename}".' 

1560 ) 

1561 assert mode is None or mode == _SYMLINK_MODE 

1562 else: 

1563 self._path_type = PathType.FILE 

1564 

1565 if mode is not None: 

1566 initial_mode = mode 

1567 else: 

1568 initial_mode = 0o755 if is_dir else 0o644 

1569 

1570 self._link_target = link_target 

1571 if has_fs_path is None: 

1572 has_fs_path = bool(fs_path) 

1573 self._has_fs_path = has_fs_path 

1574 self._fs_path = fs_path 

1575 self._materialized_content = materialized_content 

1576 super().__init__( 

1577 basename, 

1578 parent=parent_dir, 

1579 initial_mode=initial_mode, 

1580 mtime=mtime, 

1581 ) 

1582 self._content = content 

1583 

1584 @property 

1585 def is_dir(self) -> bool: 

1586 return self._path_type == PathType.DIRECTORY 

1587 

1588 @property 

1589 def is_file(self) -> bool: 

1590 return self._path_type == PathType.FILE 

1591 

1592 @property 

1593 def is_symlink(self) -> bool: 

1594 return self._path_type == PathType.SYMLINK 

1595 

1596 def readlink(self) -> str: 

1597 if not self.is_symlink: 1597 ↛ 1598line 1597 didn't jump to line 1598 because the condition on line 1597 was never true

1598 raise TypeError(f"readlink is only valid for symlinks ({self.path!r})") 

1599 link_target = self._link_target 

1600 assert link_target is not None 

1601 return link_target 

1602 

1603 def _resolve_initial_mtime(self) -> float: 

1604 return time.time() 

1605 

1606 @property 

1607 def has_fs_path(self) -> bool: 

1608 return self._has_fs_path 

1609 

1610 def stat(self) -> os.stat_result: 

1611 if self.has_fs_path: 

1612 path = self.fs_path 

1613 if path is None: 1613 ↛ 1614line 1613 didn't jump to line 1614 because the condition on line 1613 was never true

1614 raise PureVirtualPathError( 

1615 f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path" 

1616 " cannot provide!" 

1617 ) 

1618 try: 

1619 return os.stat(path) 

1620 except FileNotFoundError as e: 

1621 raise PureVirtualPathError( 

1622 f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path" 

1623 " cannot provide! (An fs_path was provided, but it did not exist)" 

1624 ) from e 

1625 

1626 raise PureVirtualPathError( 

1627 "stat() is only applicable to paths backed by the file system. The path" 

1628 f" {self._orphan_safe_path()!r} is purely virtual" 

1629 ) 

1630 

1631 @property 

1632 def size(self) -> int: 

1633 if self._content is not None: 

1634 return len(self._content.encode("utf-8")) 

1635 if self.is_symlink: 

1636 return len(self.readlink()) 

1637 if not self.has_fs_path or self.fs_path is None: 

1638 return 0 

1639 return self.stat().st_size 

1640 

1641 @property 

1642 def fs_path(self) -> str: 

1643 if self.has_fs_path: 

1644 if self._fs_path is None and self._materialized_content is not None: 

1645 with tempfile.NamedTemporaryFile( 

1646 mode="w+t", 

1647 encoding="utf-8", 

1648 suffix=f"__{self.name}", 

1649 delete=False, 

1650 ) as fd: 

1651 filepath = fd.name 

1652 fd.write(self._materialized_content) 

1653 self._fs_path = filepath 

1654 atexit.register(lambda: os.unlink(filepath)) 

1655 

1656 path = self._fs_path 

1657 if path is None: 1657 ↛ 1658line 1657 didn't jump to line 1658 because the condition on line 1657 was never true

1658 raise PureVirtualPathError( 

1659 f"The test wants a real file system entry of {self._orphan_safe_path()!r}, which this " 

1660 " mock path cannot provide!" 

1661 ) 

1662 return path 

1663 raise PureVirtualPathError( 

1664 "fs_path is only applicable to paths backed by the file system. The path" 

1665 f" {self._orphan_safe_path()!r} is purely virtual" 

1666 ) 

1667 

1668 def replace_fs_path_content( 

1669 self, 

1670 *, 

1671 use_fs_path_mode: bool = False, 

1672 ) -> ContextManager[str]: 

1673 if self._content is not None: 1673 ↛ 1674line 1673 didn't jump to line 1674 because the condition on line 1673 was never true

1674 raise TypeError( 

1675 f"The `replace_fs_path_content()` method was called on {self.path}. Said path was" 

1676 " created with `content` but for this method to work, the path should have been" 

1677 " created with `materialized_content`" 

1678 ) 

1679 return super().replace_fs_path_content(use_fs_path_mode=use_fs_path_mode) 

1680 

1681 @overload 

1682 def open_child( 1682 ↛ exitline 1682 didn't return from function 'open_child' because

1683 self, 

1684 name: str, 

1685 mode: TextOpenMode = "r", 

1686 buffering: int = -1, 

1687 ) -> TextIO: ... 

1688 

1689 @overload 

1690 def open_child( 1690 ↛ exitline 1690 didn't return from function 'open_child' because

1691 self, 

1692 name: str, 

1693 mode: BinaryOpenMode, 

1694 buffering: int = -1, 

1695 ) -> BinaryIO: ... 

1696 

1697 @contextlib.contextmanager 

1698 def open_child(self, name, mode="r", buffering=-1): 

1699 existing = self.get(name) 

1700 if existing or "r" in mode: 

1701 with super().open_child(name, mode, buffering=buffering) as fd: 

1702 yield fd 

1703 return 

1704 if "b" in mode: 

1705 fd = io.BytesIO(b"") 

1706 yield fd 

1707 content = fd.getvalue().decode("utf-8") 

1708 else: 

1709 fd = io.StringIO("") 

1710 yield fd 

1711 content = fd.getvalue() 

1712 VirtualTestPath( 

1713 name, 

1714 self, 

1715 mode=0o644, 

1716 content=content, 

1717 has_fs_path=True, 

1718 ) 

1719 

1720 @overload 

1721 def open( 1721 ↛ exitline 1721 didn't return from function 'open' because

1722 self, 

1723 *, 

1724 byte_io: Literal[False] = False, 

1725 buffering: int = -1, 

1726 ) -> TextIO: ... 

1727 

1728 @overload 

1729 def open( 1729 ↛ exitline 1729 didn't return from function 'open' because

1730 self, 

1731 *, 

1732 byte_io: Literal[True], 

1733 buffering: Literal[0] = ..., 

1734 ) -> io.FileIO: ... 

1735 

1736 @overload 

1737 def open( 1737 ↛ exitline 1737 didn't return from function 'open' because

1738 self, 

1739 *, 

1740 byte_io: Literal[True], 

1741 buffering: int = -1, 

1742 ) -> io.BufferedReader: ... 

1743 

1744 def open(self, *, byte_io=False, buffering=-1): 

1745 if self._content is None: 

1746 try: 

1747 return super().open(byte_io=byte_io, buffering=buffering) 

1748 except FileNotFoundError as e: 

1749 raise TestPathWithNonExistentFSPathError( 

1750 f"The test path {self.path} had an fs_path {self._fs_path}, which does not" 

1751 " exist. This exception can only occur in the testsuite. Either have the" 

1752 " test provide content for the path (`virtual_path_def(..., content=...) or," 

1753 " if that is too painful in general, have the code accept this error as a " 

1754 " test only-case and provide a default." 

1755 ) from e 

1756 

1757 if byte_io: 

1758 return io.BytesIO(self._content.encode("utf-8")) 

1759 return io.StringIO(self._content) 

1760 

1761 def _replaced_path(self, new_fs_path: str) -> None: 

1762 self._fs_path = new_fs_path 

1763 

1764 

1765class OSFSOverlayBase(VirtualPathBase, Generic[FSP]): 

1766 __slots__ = ( 

1767 "_path", 

1768 "_fs_path", 

1769 "_parent", 

1770 "__weakref__", 

1771 ) 

1772 

1773 def __init__( 

1774 self, 

1775 path: str, 

1776 fs_path: str, 

1777 parent: FSP | None, 

1778 ) -> None: 

1779 self._path: str = path 

1780 prefix = "/" if fs_path.startswith("/") else "" 

1781 self._fs_path: str = prefix + _normalize_path(fs_path, with_prefix=False) 

1782 self._parent: ReferenceType[FSP] | None = ( 

1783 ref(parent) if parent is not None else None 

1784 ) 

1785 

1786 @property 

1787 def name(self) -> str: 

1788 return os.path.basename(self._path) 

1789 

1790 @property 

1791 def path(self) -> str: 

1792 return self._path 

1793 

1794 @property 

1795 def parent_dir(self) -> Optional["FSP"]: 

1796 parent = self._parent 

1797 if parent is None: 

1798 return None 

1799 resolved = parent() 

1800 if resolved is None: 

1801 raise RuntimeError("Parent was garbage collected!") 

1802 return resolved 

1803 

1804 @property 

1805 def fs_path(self) -> str: 

1806 return self._fs_path 

1807 

1808 def stat(self) -> os.stat_result: 

1809 return os.lstat(self.fs_path) 

1810 

1811 @property 

1812 def is_dir(self) -> bool: 

1813 # The root path can have a non-existent fs_path (such as d/tmp not always existing) 

1814 try: 

1815 return stat.S_ISDIR(self.stat().st_mode) 

1816 except FileNotFoundError: 

1817 return False 

1818 

1819 @property 

1820 def is_file(self) -> bool: 

1821 # The root path can have a non-existent fs_path (such as d/tmp not always existing) 

1822 try: 

1823 return stat.S_ISREG(self.stat().st_mode) 

1824 except FileNotFoundError: 

1825 return False 

1826 

1827 @property 

1828 def is_symlink(self) -> bool: 

1829 # The root path can have a non-existent fs_path (such as d/tmp not always existing) 

1830 try: 

1831 return stat.S_ISLNK(self.stat().st_mode) 

1832 except FileNotFoundError: 

1833 return False 

1834 

1835 @property 

1836 def has_fs_path(self) -> bool: 

1837 return True 

1838 

1839 @overload 

1840 def open( 1840 ↛ exitline 1840 didn't return from function 'open' because

1841 self, 

1842 *, 

1843 byte_io: Literal[False] = False, 

1844 buffering: int = -1, 

1845 ) -> TextIO: ... 

1846 

1847 @overload 

1848 def open( 1848 ↛ exitline 1848 didn't return from function 'open' because

1849 self, 

1850 *, 

1851 byte_io: Literal[True], 

1852 buffering: Literal[0] = ..., 

1853 ) -> io.FileIO: ... 

1854 

1855 @overload 

1856 def open( 1856 ↛ exitline 1856 didn't return from function 'open' because

1857 self, 

1858 *, 

1859 byte_io: Literal[True], 

1860 buffering: int = -1, 

1861 ) -> io.BufferedReader: ... 

1862 

1863 def open(self, *, byte_io=False, buffering=-1): 

1864 # Allow symlinks for open here, because we can let the OS resolve the symlink reliably in this 

1865 # case. 

1866 if not self.is_file and not self.is_symlink: 

1867 raise TypeError( 

1868 f"Cannot open {self.path} for reading: It is not a file nor a symlink" 

1869 ) 

1870 

1871 if byte_io: 

1872 return open(self.fs_path, "rb", buffering=buffering) 

1873 return open(self.fs_path, encoding="utf-8", buffering=buffering) 

1874 

1875 def metadata( 

1876 self, 

1877 metadata_type: type[PMT], 

1878 *, 

1879 owning_plugin: str | None = None, 

1880 ) -> PathMetadataReference[PMT]: 

1881 current_plugin = self._current_plugin() 

1882 if owning_plugin is None: 

1883 owning_plugin = current_plugin 

1884 return AlwaysEmptyReadOnlyMetadataReference( 

1885 owning_plugin, 

1886 current_plugin, 

1887 metadata_type, 

1888 ) 

1889 

1890 def all_paths(self) -> Iterable["OSFSControlPath"]: 

1891 yield cast("OSFSControlPath", self) 

1892 if not self.is_dir: 

1893 return 

1894 stack = list(self.iterdir()) 

1895 stack.reverse() 

1896 while stack: 

1897 current = cast("OSFSControlPath", stack.pop()) 

1898 yield current 

1899 if current.is_dir: 

1900 stack.extend(reversed(list(current.iterdir()))) 

1901 

1902 def _resolve_children( 

1903 self, 

1904 new_child: Callable[[str, str, FSP], FSC], 

1905 ) -> Mapping[str, FSC]: 

1906 if not self.is_dir: 

1907 return {} 

1908 dir_path = self.path 

1909 dir_fs_path = self.fs_path 

1910 children = {} 

1911 for name in sorted(os.listdir(dir_fs_path), key=os.path.basename): 

1912 child_path = os.path.join(dir_path, name) if dir_path != "." else name 

1913 child_fs_path = ( 

1914 os.path.join(dir_fs_path, name) if dir_fs_path != "." else name 

1915 ) 

1916 children[name] = new_child( 

1917 child_path, 

1918 child_fs_path, 

1919 cast("FSP", self), 

1920 ) 

1921 return children 

1922 

1923 

1924class OSFSROOverlay(OSFSOverlayBase["OSFSROOverlay"]): 

1925 __slots__ = ( 

1926 "_stat_cache", 

1927 "_readlink_cache", 

1928 "_children", 

1929 "_stat_failed_cache", 

1930 ) 

1931 

1932 def __init__( 

1933 self, 

1934 path: str, 

1935 fs_path: str, 

1936 parent: Optional["OSFSROOverlay"], 

1937 ) -> None: 

1938 super().__init__(path, fs_path, parent=parent) 

1939 self._stat_cache: os.stat_result | None = None 

1940 self._readlink_cache: str | None = None 

1941 self._stat_failed_cache = False 

1942 self._children: Mapping[str, OSFSROOverlay] | None = None 

1943 

1944 @classmethod 

1945 def create_root_dir(cls, path: str, fs_path: str) -> "OSFSROOverlay": 

1946 return OSFSROOverlay(path, fs_path, None) 

1947 

1948 def iterdir(self) -> Iterable["OSFSROOverlay"]: 

1949 if not self.is_dir: 

1950 return 

1951 if self._children is None: 

1952 self._ensure_children_are_resolved() 

1953 yield from assume_not_none(self._children).values() 

1954 

1955 def lookup(self, path: str) -> Optional["OSFSROOverlay"]: 

1956 if not self.is_dir: 

1957 return None 

1958 if self._children is None: 

1959 self._ensure_children_are_resolved() 

1960 

1961 absolute, _, path_parts = _split_path(path) 

1962 current = cast("OSFSROOverlay", _root(self)) if absolute else self 

1963 for no, dir_part in enumerate(path_parts): 

1964 if dir_part == ".": 

1965 continue 

1966 if dir_part == "..": 

1967 if current.is_root_dir(): 

1968 raise ValueError(f'The path "{path}" escapes the root dir') 

1969 p = current.parent_dir 

1970 assert p is not None # Type hint 

1971 current = cast("OSFSROOverlay", p) 

1972 continue 

1973 try: 

1974 current = cast("OSFSROOverlay", current[dir_part]) 

1975 except KeyError: 

1976 return None 

1977 return current 

1978 

1979 def _ensure_children_are_resolved(self) -> None: 

1980 if not self.is_dir or self._children: 

1981 return 

1982 self._children = self._resolve_children( 

1983 lambda n, fsp, p: OSFSROOverlay(n, fsp, p) 

1984 ) 

1985 

1986 @property 

1987 def is_detached(self) -> bool: 

1988 return False 

1989 

1990 def __getitem__(self, key) -> "VirtualPath": 

1991 if not self.is_dir: 1991 ↛ 1993line 1991 didn't jump to line 1993 because the condition on line 1991 was always true

1992 raise KeyError(key) 

1993 if self._children is None: 

1994 self._ensure_children_are_resolved() 

1995 if isinstance(key, FSPath): 

1996 key = key.name 

1997 return assume_not_none(self._children)[key] 

1998 

1999 def __delitem__(self, key) -> Never: 

2000 self._error_ro_fs() 

2001 

2002 @property 

2003 def is_read_write(self) -> bool: 

2004 return False 

2005 

2006 def _rw_check(self) -> Never: 

2007 self._error_ro_fs() 

2008 

2009 def _error_ro_fs(self) -> Never: 

2010 raise DebputyFSIsROError( 

2011 f'Attempt to write to "{self.path}" failed:' 

2012 " Debputy Virtual File system is R/O." 

2013 ) 

2014 

2015 def stat(self) -> os.stat_result: 

2016 if self._stat_failed_cache: 2016 ↛ 2017line 2016 didn't jump to line 2017 because the condition on line 2016 was never true

2017 raise FileNotFoundError( 

2018 errno.ENOENT, os.strerror(errno.ENOENT), self.fs_path 

2019 ) 

2020 

2021 if self._stat_cache is None: 2021 ↛ 2027line 2021 didn't jump to line 2027 because the condition on line 2021 was always true

2022 try: 

2023 self._stat_cache = os.lstat(self.fs_path) 

2024 except FileNotFoundError: 

2025 self._stat_failed_cache = True 

2026 raise 

2027 return self._stat_cache 

2028 

2029 @property 

2030 def mode(self) -> int: 

2031 return stat.S_IMODE(self.stat().st_mode) 

2032 

2033 @mode.setter 

2034 def mode(self, _unused: int) -> Never: 

2035 self._error_ro_fs() 

2036 

2037 @property 

2038 def mtime(self) -> float: 

2039 return self.stat().st_mtime 

2040 

2041 @mtime.setter 

2042 def mtime(self, new_mtime: float) -> Never: 

2043 self._error_ro_fs() 

2044 

2045 def readlink(self) -> str: 

2046 if not self.is_symlink: 

2047 raise TypeError(f"readlink is only valid for symlinks ({self.path!r})") 

2048 if self._readlink_cache is None: 

2049 self._readlink_cache = os.readlink(self.fs_path) 

2050 return self._readlink_cache 

2051 

2052 def chown( 

2053 self, 

2054 owner: StaticFileSystemOwner | None, 

2055 group: StaticFileSystemGroup | None, 

2056 ) -> Never: 

2057 self._error_ro_fs() 

2058 

2059 def mkdir(self, name: str) -> Never: 

2060 self._error_ro_fs() 

2061 

2062 def add_file( 

2063 self, 

2064 name: str, 

2065 *, 

2066 unlink_if_exists: bool = True, 

2067 use_fs_path_mode: bool = False, 

2068 mode: int = 0o0644, 

2069 mtime: float | None = None, 

2070 ) -> Never: 

2071 self._error_ro_fs() 

2072 

2073 def add_symlink(self, link_name: str, link_target: str) -> Never: 

2074 self._error_ro_fs() 

2075 

2076 def unlink(self, *, recursive: bool = False) -> Never: 

2077 self._error_ro_fs() 

2078 

2079 

2080class OSFSROOverlayRootDir(OSFSROOverlay): 

2081 __slots__ = ("_plugin_context",) 

2082 

2083 def __init__(self, path: str, fs_path: str) -> None: 

2084 super().__init__(path, fs_path, None) 

2085 self._plugin_context = CurrentPluginContextManager("debputy") 

2086 

2087 def _current_plugin(self) -> str: 

2088 return self._plugin_context.current_plugin_name 

2089 

2090 @contextlib.contextmanager 

2091 def change_plugin_context(self, new_plugin: str) -> Iterator[str]: 

2092 with self._plugin_context.change_plugin_context(new_plugin) as r: 

2093 yield r 

2094 

2095 

2096class OSFSControlPath(OSFSOverlayBase["OSFSControlPath"]): 

2097 

2098 def iterdir(self) -> Iterable["OSFSControlPath"]: 

2099 if not self.is_dir: 

2100 return 

2101 yield from self._resolve_children( 

2102 lambda n, fsp, p: OSFSControlPath(n, fsp, p) 

2103 ).values() 

2104 

2105 def lookup(self, path: str) -> Optional["OSFSControlPath"]: 

2106 if not self.is_dir: 

2107 return None 

2108 

2109 absolute, _, path_parts = _split_path(path) 

2110 current = cast("OSFSControlPath", _root(self)) if absolute else self 

2111 for no, dir_part in enumerate(path_parts): 

2112 if dir_part == ".": 

2113 continue 

2114 if dir_part == "..": 

2115 if current.is_root_dir(): 

2116 raise ValueError(f'The path "{path}" escapes the root dir') 

2117 p = current.parent_dir 

2118 assert p is not None # type hint 

2119 current = cast("OSFSControlPath", p) 

2120 continue 

2121 try: 

2122 current = cast("OSFSControlPath", current[dir_part]) 

2123 except KeyError: 

2124 return None 

2125 return current 

2126 

2127 @property 

2128 def is_detached(self) -> bool: 

2129 try: 

2130 self.stat() 

2131 except FileNotFoundError: 

2132 return True 

2133 else: 

2134 return False 

2135 

2136 def __getitem__(self, key) -> "VirtualPath": 

2137 if not self.is_dir: 

2138 raise KeyError(key) 

2139 children = self._resolve_children(lambda n, fsp, p: OSFSControlPath(n, fsp, p)) 

2140 if isinstance(key, FSPath): 

2141 key = key.name 

2142 return children[key] 

2143 

2144 def __delitem__(self, key) -> None: 

2145 self[key].unlink() 

2146 

2147 @property 

2148 def is_read_write(self) -> bool: 

2149 return True 

2150 

2151 @property 

2152 def mode(self) -> int: 

2153 return stat.S_IMODE(self.stat().st_mode) 

2154 

2155 @mode.setter 

2156 def mode(self, new_mode: int) -> None: 

2157 os.chmod(self.fs_path, new_mode) 

2158 

2159 @property 

2160 def mtime(self) -> float: 

2161 return self.stat().st_mtime 

2162 

2163 @mtime.setter 

2164 def mtime(self, new_mtime: float) -> None: 

2165 os.utime(self.fs_path, (new_mtime, new_mtime)) 

2166 

2167 def readlink(self) -> Never: 

2168 if not self.is_symlink: 

2169 raise TypeError(f"readlink is only valid for symlinks ({self.path!r})") 

2170 assert False 

2171 

2172 def chown( 

2173 self, 

2174 owner: StaticFileSystemOwner | None, 

2175 group: StaticFileSystemGroup | None, 

2176 ) -> None: 

2177 raise ValueError( 

2178 "No need to chown paths in the control.tar: They are always root:root" 

2179 ) 

2180 

2181 def mkdir(self, name: str) -> Never: 

2182 raise TypeError("The control.tar never contains subdirectories.") 

2183 

2184 @contextlib.contextmanager 

2185 def add_file( 

2186 self, 

2187 name: str, 

2188 *, 

2189 unlink_if_exists: bool = True, 

2190 use_fs_path_mode: bool = False, 

2191 mode: int = 0o0644, 

2192 mtime: float | None = None, 

2193 ) -> Iterator["VirtualPath"]: 

2194 if "/" in name or name in {".", ".."}: 

2195 raise ValueError(f'Invalid file name: "{name}"') 

2196 if not self.is_dir: 

2197 raise TypeError( 

2198 f"Cannot create {self._orphan_safe_path()}/{name}:" 

2199 f" {self._orphan_safe_path()} is not a directory" 

2200 ) 

2201 self._rw_check() 

2202 existing = self.get(name) 

2203 if existing is not None: 

2204 if not unlink_if_exists: 

2205 raise ValueError( 

2206 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"' 

2207 f" and exist_ok was False" 

2208 ) 

2209 assert existing.is_file 

2210 

2211 fs_path = os.path.join(self.fs_path, name) 

2212 # This truncates the existing file if any, so we do not have to unlink the previous entry. 

2213 with open(fs_path, "wb") as fd: 

2214 # Ensure that the fs_path exists and default mode is reasonable 

2215 os.chmod(fd.fileno(), mode) 

2216 child = OSFSControlPath( 

2217 name, 

2218 fs_path, 

2219 self, 

2220 ) 

2221 yield child 

2222 _check_fs_path_is_file(fs_path, unlink_on_error=child) 

2223 child.mode = mode 

2224 

2225 @contextlib.contextmanager 

2226 def replace_fs_path_content( 

2227 self, 

2228 *, 

2229 use_fs_path_mode: bool = False, 

2230 ) -> Iterator[str]: 

2231 if not self.is_file: 

2232 raise TypeError( 

2233 f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file' 

2234 ) 

2235 restore_mode = self.mode if use_fs_path_mode else None 

2236 yield self.fs_path 

2237 _check_fs_path_is_file(self.fs_path, self) 

2238 if restore_mode is not None: 

2239 self.mode = restore_mode 

2240 

2241 def add_symlink(self, link_name: str, link_target: str) -> Never: 

2242 raise TypeError("The control.tar never contains symlinks.") 

2243 

2244 def unlink(self, *, recursive: bool = False) -> None: 

2245 if self._parent is None: 

2246 return 

2247 # By virtue of the control FS only containing paths, we can assume `recursive` never 

2248 # matters and that `os.unlink` will be sufficient. 

2249 assert self.is_file 

2250 os.unlink(self.fs_path) 

2251 

2252 

2253class FSControlRootDir(OSFSControlPath): 

2254 

2255 @classmethod 

2256 def create_root_dir(cls, fs_path: str) -> "FSControlRootDir": 

2257 return FSControlRootDir(".", fs_path, None) 

2258 

2259 def insert_file_from_fs_path( 

2260 self, 

2261 name: str, 

2262 fs_path: str, 

2263 *, 

2264 exist_ok: bool = True, 

2265 use_fs_path_mode: bool = False, 

2266 mode: int = 0o0644, 

2267 # Ignored, but accepted for compat with FSPath's variant of this function. 

2268 # - This is used by install_or_generate_conffiles. 

2269 reference_path: VirtualPath | None = None, # noqa 

2270 ) -> "OSFSControlPath": 

2271 if "/" in name or name in {".", ".."}: 

2272 raise ValueError(f'Invalid file name: "{name}"') 

2273 if not self.is_dir: 

2274 raise TypeError( 

2275 f"Cannot create {self._orphan_safe_path()}/{name}:" 

2276 f" {self._orphan_safe_path()} is not a directory" 

2277 ) 

2278 self._rw_check() 

2279 if name in self and not exist_ok: 

2280 raise ValueError( 

2281 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"' 

2282 f" and exist_ok was False" 

2283 ) 

2284 

2285 target_path = os.path.join(self.fs_path, name) 

2286 if use_fs_path_mode: 

2287 shutil.copymode( 

2288 fs_path, 

2289 target_path, 

2290 follow_symlinks=True, 

2291 ) 

2292 else: 

2293 shutil.copyfile( 

2294 fs_path, 

2295 target_path, 

2296 follow_symlinks=True, 

2297 ) 

2298 os.chmod(target_path, mode) 

2299 return cast("OSFSControlPath", self[name]) 

2300 

2301 

2302def as_path_def(pd: str | PathDef) -> PathDef: 

2303 return PathDef(pd) if isinstance(pd, str) else pd 

2304 

2305 

2306def as_path_defs(paths: Iterable[str | PathDef]) -> Iterable[PathDef]: 

2307 yield from (as_path_def(p) for p in paths) 

2308 

2309 

2310def build_virtual_fs( 

2311 paths: Iterable[str | PathDef], 

2312 read_write_fs: bool = False, 

2313) -> "FSPath": 

2314 root_dir: FSRootDir | None = None 

2315 directories: dict[str, FSPath] = {} 

2316 non_directories = set() 

2317 

2318 def _ensure_parent_dirs(p: str) -> None: 

2319 current = p.rstrip("/") 

2320 missing_dirs = [] 

2321 while True: 

2322 current = os.path.dirname(current) 

2323 if current in directories: 

2324 break 

2325 if current in non_directories: 2325 ↛ 2326line 2325 didn't jump to line 2326 because the condition on line 2325 was never true

2326 raise ValueError( 

2327 f'Conflicting definition for "{current}". The path "{p}" wants it as a directory,' 

2328 ' but it is defined as a non-directory. (Ensure dirs end with "/")' 

2329 ) 

2330 missing_dirs.append(current) 

2331 for dir_path in reversed(missing_dirs): 

2332 parent_dir = directories[os.path.dirname(dir_path)] 

2333 d = VirtualTestPath(os.path.basename(dir_path), parent_dir, is_dir=True) 

2334 directories[dir_path] = d 

2335 

2336 for path_def in as_path_defs(paths): 

2337 path = path_def.path_name 

2338 if path in directories or path in non_directories: 2338 ↛ 2339line 2338 didn't jump to line 2339 because the condition on line 2338 was never true

2339 raise ValueError( 

2340 f'Duplicate definition of "{path}". Can be false positive if input is not in' 

2341 ' "correct order" (ensure directories occur before their children)' 

2342 ) 

2343 if root_dir is None: 

2344 root_fs_path = None 

2345 if path in (".", "./", "/"): 

2346 root_fs_path = path_def.fs_path 

2347 root_dir = FSRootDir(fs_path=root_fs_path) 

2348 directories["."] = root_dir 

2349 

2350 if path not in (".", "./", "/") and not path.startswith("./"): 

2351 path = "./" + path 

2352 if path not in (".", "./", "/"): 

2353 _ensure_parent_dirs(path) 

2354 if path in (".", "./"): 

2355 assert "." in directories 

2356 continue 

2357 is_dir = False 

2358 if path.endswith("/"): 

2359 path = path[:-1] 

2360 is_dir = True 

2361 directory = directories[os.path.dirname(path)] 

2362 assert not is_dir or not bool( 

2363 path_def.link_target 

2364 ), f"is_dir={is_dir} vs. link_target={path_def.link_target}" 

2365 fs_path = VirtualTestPath( 

2366 os.path.basename(path), 

2367 directory, 

2368 is_dir=is_dir, 

2369 mode=path_def.mode, 

2370 mtime=path_def.mtime, 

2371 has_fs_path=path_def.has_fs_path, 

2372 fs_path=path_def.fs_path, 

2373 link_target=path_def.link_target, 

2374 content=path_def.content, 

2375 materialized_content=path_def.materialized_content, 

2376 ) 

2377 assert not fs_path.is_detached 

2378 if fs_path.is_dir: 

2379 directories[fs_path.path] = fs_path 

2380 else: 

2381 non_directories.add(fs_path.path) 

2382 

2383 if root_dir is None: 

2384 root_dir = FSRootDir() 

2385 

2386 root_dir.is_read_write = read_write_fs 

2387 return root_dir