Coverage for src/debputy/filesystem_scan.py: 70%

1279 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-03-24 16:38 +0000

1import atexit 

2import contextlib 

3import dataclasses 

4import errno 

5import io 

6import operator 

7import os 

8import stat 

9import subprocess 

10import tempfile 

11import time 

12from abc import ABC 

13from contextlib import suppress 

14from typing import ( 

15 List, 

16 Iterable, 

17 Dict, 

18 Optional, 

19 Tuple, 

20 Union, 

21 Iterator, 

22 Mapping, 

23 cast, 

24 Any, 

25 ContextManager, 

26 TextIO, 

27 BinaryIO, 

28 NoReturn, 

29 Type, 

30 Generic, 

31 Callable, 

32 TypeVar, 

33 overload, 

34 Literal, 

35) 

36from weakref import ref, ReferenceType 

37 

38from debputy.exceptions import ( 

39 PureVirtualPathError, 

40 DebputyFSIsROError, 

41 DebputyMetadataAccessError, 

42 TestPathWithNonExistentFSPathError, 

43 SymlinkLoopError, 

44) 

45from debputy.intermediate_manifest import PathType 

46from debputy.manifest_parser.base_types import ( 

47 ROOT_DEFINITION, 

48 StaticFileSystemOwner, 

49 StaticFileSystemGroup, 

50) 

51from debputy.plugin.api.spec import ( 

52 VirtualPath, 

53 PathDef, 

54 PathMetadataReference, 

55 PMT, 

56) 

57from debputy.types import VP 

58from debputy.util import ( 

59 generated_content_dir, 

60 _error, 

61 escape_shell, 

62 assume_not_none, 

63 _normalize_path, 

64 _debug_log, 

65) 

66 

# Sort key that orders path objects by their `name` attribute.
BY_BASENAME = operator.attrgetter("name")

# Covariant type variables bounded by "FSOverlayBase" (a forward reference; the
# class is not defined in this part of the file).
FSP = TypeVar("FSP", bound="FSOverlayBase", covariant=True)
FSC = TypeVar("FSC", bound="FSOverlayBase", covariant=True)


# Mode strings that select binary I/O.
BinaryOpenMode = Literal[
    "rb",
    "r+b",
    "wb",
    "w+b",
    "xb",
    "ab",
]
# Mode strings that select text I/O (with or without the explicit "t").
TextOpenMode = Literal[
    "r",
    "r+",
    "rt",
    "r+t",
    "w",
    "w+",
    "wt",
    "w+t",
    "x",
    "xt",
    "a",
    "at",
]
# Any mode string from either of the two groups above.
OpenMode = Union[BinaryOpenMode, TextOpenMode]

96 

97 

class AlwaysEmptyReadOnlyMetadataReference(PathMetadataReference[PMT]):
    """Metadata reference that never holds a value and never accepts one.

    Reading yields ``None`` for the owning plugin and raises for any other
    plugin; writing always raises.
    """

    __slots__ = ("_metadata_type", "_owning_plugin", "_current_plugin")

    def __init__(
        self,
        owning_plugin: str,
        current_plugin: str,
        metadata_type: Type[PMT],
    ) -> None:
        """Record the owning plugin, the accessing plugin and the metadata type."""
        self._metadata_type = metadata_type
        self._owning_plugin = owning_plugin
        self._current_plugin = current_plugin

    @property
    def _is_owner(self) -> bool:
        """Whether the accessing plugin is the plugin owning the metadata."""
        return self._owning_plugin == self._current_plugin

    @property
    def is_present(self) -> bool:
        """Always ``False``: this reference never holds a value."""
        return False

    @property
    def can_read(self) -> bool:
        """Only the owning plugin may read."""
        return self._current_plugin == self._owning_plugin

    @property
    def can_write(self) -> bool:
        """Always ``False``: this reference is read-only."""
        return False

    @property
    def value(self) -> Optional[PMT]:
        """Return ``None`` when readable.

        :raises DebputyMetadataAccessError: If the current plugin may not read the metadata.
        """
        if not self.can_read:
            raise DebputyMetadataAccessError(
                f"Cannot read the metadata {self._metadata_type.__name__} owned by"
                f" {self._owning_plugin} as the metadata has not been made"
                f" readable to the plugin {self._current_plugin}."
            )
        return None

    @value.setter
    def value(self, new_value: PMT) -> None:
        """Reject every assignment.

        :raises DebputyFSIsROError: If the current plugin owns the metadata.
        :raises DebputyMetadataAccessError: If another plugin owns the metadata.
        """
        if not self._is_owner:
            raise DebputyMetadataAccessError(
                f"Cannot set the metadata {self._metadata_type.__name__} owned by"
                f" {self._owning_plugin} as the metadata has not been made"
                f" read-write to the plugin {self._current_plugin}."
            )
        raise DebputyFSIsROError(
            f"Cannot set the metadata {self._metadata_type.__name__} as the path is read-only"
        )

148 

149 

@dataclasses.dataclass(slots=True)
class PathMetadataValue(Generic[PMT]):
    """Storage cell for one piece of plugin-owned metadata attached to a path."""

    # Name of the plugin that owns this metadata.
    owning_plugin: str
    # The type used for the stored value.
    metadata_type: Type[PMT]
    # The stored value; None means nothing has been stored.
    value: Optional[PMT] = None

    def can_read_value(self, current_plugin: str) -> bool:
        """Return True when ``current_plugin`` is the owning plugin."""
        return current_plugin == self.owning_plugin

    def can_write_value(self, current_plugin: str) -> bool:
        """Return True when ``current_plugin`` is the owning plugin."""
        return current_plugin == self.owning_plugin

161 

162 

class PathMetadataReferenceImplementation(PathMetadataReference[PMT]):
    """Metadata reference backed by a ``PathMetadataValue`` attached to a path.

    Read access is decided by the backing value; write access additionally
    requires the owning path to be read-write and not detached.
    """

    __slots__ = ("_owning_path", "_current_plugin", "_path_metadata_value")

    def __init__(
        self,
        owning_path: VirtualPath,
        current_plugin: str,
        path_metadata_value: PathMetadataValue[PMT],
    ) -> None:
        """Bind the reference to a path, the accessing plugin and the backing value."""
        self._path_metadata_value = path_metadata_value
        self._current_plugin = current_plugin
        self._owning_path = owning_path

    @property
    def _owning_plugin(self) -> str:
        """Name of the plugin owning the backing value."""
        return self._path_metadata_value.owning_plugin

    @property
    def _metadata_type_name(self) -> str:
        """``__name__`` of the metadata type, used in error messages."""
        return self._path_metadata_value.metadata_type.__name__

    @property
    def _is_owner(self) -> bool:
        """Whether the accessing plugin is the owning plugin."""
        return self._owning_plugin == self._current_plugin

    @property
    def is_present(self) -> bool:
        """True when the metadata is readable and a value has been stored."""
        if self.can_read:
            return self._path_metadata_value.value is not None
        return False

    @property
    def can_read(self) -> bool:
        """Whether the backing value grants read access to the current plugin."""
        return self._path_metadata_value.can_read_value(self._current_plugin)

    @property
    def can_write(self) -> bool:
        """Whether the current plugin may write and the path is writable and attached."""
        if self._path_metadata_value.can_write_value(self._current_plugin):
            target = self._owning_path
            return target.is_read_write and not target.is_detached
        return False

    @property
    def value(self) -> Optional[PMT]:
        """Return the stored value.

        :raises DebputyMetadataAccessError: If the current plugin may not read the metadata.
        """
        if not self.can_read:
            raise DebputyMetadataAccessError(
                f"Cannot read the metadata {self._metadata_type_name} owned by"
                f" {self._owning_plugin} as the metadata has not been made"
                f" readable to the plugin {self._current_plugin}."
            )
        return self._path_metadata_value.value

    @value.setter
    def value(self, new_value: PMT) -> None:
        """Store ``new_value`` in the backing value.

        :raises DebputyMetadataAccessError: If ``can_write`` is False.
        :raises DebputyFSIsROError: If the owning path is not read-write.
        :raises TypeError: If the owning path is detached.
        """
        if not self.can_write:
            # Assigning None is reported as a deletion in the message.
            m = "delete" if new_value is None else "set"
            raise DebputyMetadataAccessError(
                f"Cannot {m} the metadata {self._metadata_type_name} owned by"
                f" {self._owning_plugin} as the metadata has not been made"
                f" read-write to the plugin {self._current_plugin}."
            )
        target = self._owning_path
        if not target.is_read_write:
            raise DebputyFSIsROError(
                f"Cannot set the metadata {self._metadata_type_name} as the path is read-only"
            )
        if target.is_detached:
            raise TypeError(
                f"Cannot set the metadata {self._metadata_type_name} as the path is detached"
            )
        self._path_metadata_value.value = new_value

234 

235 

def _cp_a(source: str, dest: str) -> None:
    """Copy ``source`` to ``dest`` by running ``cp -a``.

    A non-zero exit status from ``cp`` is reported through ``_error`` together
    with the shell-escaped command line.
    """
    argv = ["cp", "-a", source, dest]
    try:
        subprocess.check_call(argv)
    except subprocess.CalledProcessError:
        rendered_command = escape_shell(*argv)
        _error(
            f"The attempt to make an internal copy of {escape_shell(source)} failed. Please review the output of cp"
            f" above to understand what went wrong. The full command was: {rendered_command}"
        )

246 

247 

def _split_path(path: str) -> Tuple[bool, bool, List[str]]:
    """Split ``path`` into lookup components.

    :param path: A "/"-separated path.
    :return: A tuple ``(absolute, must_be_dir, parts)``. ``absolute`` is True when
      the path starts with "/", ``must_be_dir`` is True when it ends with "/".
      ``parts`` are the "/"-separated segments; an absolute path is anchored with
      a leading "." segment and a path ending in "/" gets a trailing "." segment.
    """
    is_absolute = path.startswith("/")
    wants_dir = path.endswith("/")
    # Anchor absolute paths at "." so the first segment is never the empty string.
    anchored = f".{path}" if is_absolute else path
    segments = anchored.rstrip("/").split("/")
    if wants_dir:
        segments.append(".")
    return is_absolute, wants_dir, segments

258 

259 

def _root(path: VP) -> VP:
    """Follow ``parent_dir`` links upward and return the first path without a parent."""
    node = path
    while (above := node.parent_dir) is not None:
        node = above
    return node

267 

268 

def _check_fs_path_is_file(
    fs_path: str,
    unlink_on_error: Optional["VirtualPath"] = None,
) -> None:
    """Verify that ``fs_path`` is a regular file with at most one hard link.

    :param fs_path: File system path to inspect (symlinks are not followed).
    :param unlink_on_error: When truthy and the check fails, ``fs_path`` is removed
      before the error is raised (a missing file is tolerated).
    :raises TypeError: If the path is missing, not a regular file, or has more
      than one hard link.
    """
    try:
        # FIXME: Check mode, and use the Virtual Path to cache the result as a side-effect
        st = os.lstat(fs_path)
    except FileNotFoundError:
        acceptable = False
    else:
        acceptable = stat.S_ISREG(st.st_mode) and st.st_nlink <= 1

    if acceptable:
        return

    if unlink_on_error:
        with suppress(FileNotFoundError):
            os.unlink(fs_path)
    raise TypeError(
        "The provided FS backing file was deleted, replaced with a non-file entry or it was hard"
        " linked to another file. The entry has been disconnected."
    )

292 

293 

class CurrentPluginContextManager:
    """Stack of plugin names; the top of the stack is the current plugin."""

    __slots__ = ("_plugin_names",)

    def __init__(self, initial_plugin_name: str) -> None:
        """Start the stack with ``initial_plugin_name`` as the current plugin."""
        self._plugin_names = [initial_plugin_name]

    @property
    def current_plugin_name(self) -> str:
        """Name of the plugin on top of the stack."""
        return self._plugin_names[-1]

    @contextlib.contextmanager
    def change_plugin_context(self, new_plugin_name: str) -> Iterator[str]:
        """Make ``new_plugin_name`` the current plugin for the duration of a ``with`` block.

        :param new_plugin_name: The plugin name to push.
        :return: A context manager yielding ``new_plugin_name``.
        """
        self._plugin_names.append(new_plugin_name)
        try:
            yield new_plugin_name
        finally:
            # Pop even when the body raises; otherwise the stack would keep
            # reporting the wrong plugin after the exception propagates.
            self._plugin_names.pop()

309 

310 

class VirtualPathBase(VirtualPath, ABC):
    """Shared lookup, directory-creation and child-opening logic for virtual paths."""

    __slots__ = ()

    def _orphan_safe_path(self) -> str:
        """Return a path string for use in messages; here simply ``self.path``."""
        return self.path

    def _rw_check(self) -> None:
        """Raise ``DebputyFSIsROError`` unless this path is read-write."""
        if not self.is_read_write:
            raise DebputyFSIsROError(
                f'Attempt to write to "{self._orphan_safe_path()}" failed:'
                " Debputy Virtual File system is R/O."
            )

    def lookup(self, path: str) -> Optional["VirtualPathBase"]:
        """Resolve ``path`` and return the match, or ``None`` if any part is missing."""
        match, missing = self.attempt_lookup(path)
        if missing:
            return None
        return match

    def attempt_lookup(self, path: str) -> Tuple["VirtualPathBase", List[str]]:
        """Resolve as much of ``path`` as exists.

        Absolute paths are resolved from the root; relative paths from this path.
        Symlinks encountered before the final segment are expanded.

        :param path: The path to resolve.
        :return: The deepest existing path reached and the list of segments that
          could not be resolved (empty on a full match).
        :raises ValueError: If this path is detached or ``path`` escapes the root.
        :raises SymlinkLoopError: If the same symlink is traversed twice.
        """
        if self.is_detached:
            raise ValueError(
                f'Cannot perform lookup via "{self._orphan_safe_path()}": The path is detached'
            )
        absolute, must_be_dir, path_parts = _split_path(path)
        current = _root(self) if absolute else self
        # Reversed so the next segment can be taken with a cheap pop().
        path_parts.reverse()
        link_expansions = set()
        while path_parts:
            dir_part = path_parts.pop()
            if dir_part == ".":
                continue
            if dir_part == "..":
                p = current.parent_dir
                if p is None:
                    raise ValueError(f'The path "{path}" escapes the root dir')
                current = p
                continue
            try:
                current = current[dir_part]
            except KeyError:
                # Restore the unresolved remainder in its natural order.
                path_parts.append(dir_part)
                path_parts.reverse()
                if must_be_dir:
                    # Drop the trailing "." marker added by _split_path.
                    path_parts.pop()
                return current, path_parts
            if current.is_symlink and path_parts:
                if current.path in link_expansions:
                    # This is our loop detection for now. It might have some false positives where you
                    # could safely resolve the same symlink twice. However, given that this use-case is
                    # basically non-existent in practice for packaging, we just stop here for now.
                    raise SymlinkLoopError(
                        f'The path "{path}" traversed the symlink "{current.path}" multiple'
                        " times. Currently, traversing the same symlink twice is considered"
                        " a loop by `debputy` even if the path would eventually resolve."
                        " Consider filing a feature request if you have a benign case that"
                        " triggers this error."
                    )
                link_expansions.add(current.path)
                link_target = current.readlink()
                link_absolute, _, link_path_parts = _split_path(link_target)
                if link_absolute:
                    current = _root(current)
                else:
                    current = assume_not_none(current.parent_dir)
                # Queue the link target's segments ahead of the remaining ones.
                link_path_parts.reverse()
                path_parts.extend(link_path_parts)
        return current, []

    def mkdirs(self, path: str) -> "VirtualPath":
        """Ensure ``path`` exists as a directory, creating missing segments.

        :param path: The directory path to create.
        :return: The directory at ``path``.
        :raises ValueError: If an existing segment of ``path`` is not a directory.
        """
        current: VirtualPath
        current, missing_parts = self.attempt_lookup(
            f"{path}/" if not path.endswith("/") else path
        )
        if not current.is_dir:
            raise ValueError(
                f'mkdirs of "{path}" failed: This would require {current.path} to not exist OR be'
                " a directory. However, that path exist AND is a not directory."
            )
        for missing_part in missing_parts:
            assert missing_part not in (".", "..")
            current = current.mkdir(missing_part)
        return current

    def prune_if_empty_dir(self) -> None:
        """Remove this and all (now) empty parent directories

        Same as: `rmdir --ignore-fail-on-non-empty --parents`

        This operation may cause the path (and any of its parent directories) to become "detached"
        and therefore unsafe to use in further operations.
        """
        self._rw_check()

        if not self.is_dir:
            raise TypeError(f"{self._orphan_safe_path()} is not a directory")
        if any(self.iterdir):
            return
        parent_dir = assume_not_none(self.parent_dir)

        # Recursive does not matter; we already know the directory is empty.
        self.unlink()

        # Note: The root dir must never be deleted. This works because when delegating it to the root
        # directory, its implementation of this method is a no-op. If this is later rewritten to an
        # inline loop (rather than recursion), be sure to preserve this feature.
        parent_dir.prune_if_empty_dir()

    def _current_plugin(self) -> str:
        """Return the current plugin name as reported by the root of this tree.

        :raises TypeError: If this path is detached.
        """
        if self.is_detached:
            raise TypeError("Cannot resolve the current plugin; path is detached")
        current = self
        while True:
            next_parent = current.parent_dir
            if next_parent is None:
                break
            current = next_parent
        assert current is not None
        return cast("FSRootDir", current)._current_plugin()

    @overload
    def open_child(
        self,
        name: str,
        mode: TextOpenMode = ...,
        buffering: int = -1,
    ) -> TextIO: ...

    @overload
    def open_child(
        self,
        name: str,
        mode: BinaryOpenMode = ...,
        buffering: int = -1,
    ) -> BinaryIO: ...

    @contextlib.contextmanager
    def open_child(
        self,
        name: str,
        mode: OpenMode = "r",
        buffering: int = -1,
    ) -> Union[TextIO, BinaryIO]:
        """Open a child path of the current directory in a given mode. Usually used with a context manager

        The path is opened according to the `mode` parameter similar to the built-in `open` in Python.
        The following symbols are accepted with the same meaning as Python's open:
         * `r`
         * `w`
         * `x`
         * `a`
         * `+`
         * `b`
         * `t`

        Like Python's `open`, this can create a new file provided the file system is in read-write mode.
        Though unlike Python's open, symlinks are not followed and cannot be opened. Any newly created
        file will start with (os.stat) mode of 0o0644. The (os.stat) mode of existing paths are left
        as-is.

        :param name: The name of the child path to open. Must be a basename.
        :param mode: The mode to open the file with such as `r` or `w`. See Python's `open` for more
          examples.
        :param buffering: Same as open(..., buffering=...) where supported. Notably during
          testing, the content may be purely in memory and use a BytesIO/StringIO
          (which does not accept that parameter, but then it is buffered in a different way)
        :return: The file handle.
        :raises ValueError: If the mode has `r` and the child does not exist, or the
          mode has `x` and the child already exists.
        """
        existing = self.get(name)
        if "r" in mode:
            if existing is None:
                raise ValueError(
                    f"Path {self.path}/{name} does not exist and mode had `r`"
                )
            if "+" not in mode:
                with existing.open(byte_io="b" in mode, buffering=buffering) as fd:
                    yield fd
                # A context-manager generator must yield exactly once. Without this
                # return, a pure read would fall through into the write path below
                # and hit a second yield when the caller leaves the `with` block.
                return

        encoding = None if "b" in mode else "utf-8"

        if existing:
            if "x" in mode:
                raise ValueError(
                    f"Path {existing.path} already exists and mode had `x`"
                )
            with (
                existing.replace_fs_path_content() as fs_path,
                open(fs_path, mode, encoding=encoding) as fd,
            ):
                yield fd
        else:
            assert "r" not in mode
            # unlink_if_exists=False as a precaution (the "already exists" should not end up here).
            with (
                self.add_file(name, mode=0o0644, unlink_if_exists=False) as new_path,
                open(new_path.fs_path, mode, encoding=encoding) as fd,
            ):
                yield fd

509 

510 

511class FSPath(VirtualPathBase, ABC): 

512 __slots__ = ( 

513 "_basename", 

514 "_parent_dir", 

515 "_children", 

516 "_path_cache", 

517 "_parent_path_cache", 

518 "_last_known_parent_path", 

519 "_mode", 

520 "_owner", 

521 "_group", 

522 "_mtime", 

523 "_stat_cache", 

524 "_metadata", 

525 "__weakref__", 

526 ) 

527 

def __init__(
    self,
    basename: str,
    parent: Optional["FSPath"],
    children: Optional[Dict[str, "FSPath"]] = None,
    initial_mode: Optional[int] = None,
    mtime: Optional[float] = None,
    stat_cache: Optional[os.stat_result] = None,
) -> None:
    """Initialise the node and, when ``parent`` is given, attach it to that parent.

    :param basename: Name of this path within its parent.
    :param parent: Directory to attach to, or None to start without a parent.
    :param children: Initial mapping of child names to child paths, if any.
    :param initial_mode: Initial value for the cached mode.
    :param mtime: Initial value for the cached mtime.
    :param stat_cache: Initial value for the cached stat result.
    """
    # Identity and children.
    self._basename = basename
    self._children = children

    # Path caches; filled in when the `path` property is computed.
    self._path_cache: Optional[str] = None
    self._parent_path_cache: Optional[str] = None
    self._last_known_parent_path: Optional[str] = None

    # Cached file attributes and ownership (ownership defaults to ROOT_DEFINITION).
    self._mode = initial_mode
    self._mtime = mtime
    self._stat_cache = stat_cache
    self._owner = self._group = ROOT_DEFINITION
    self._metadata: Dict[Tuple[str, Type[Any]], PathMetadataValue[Any]] = {}

    # `_parent_dir` must exist as an attribute before the parent_dir setter runs,
    # because the setter consults is_detached, which reads self._parent_dir.
    self._parent_dir: Optional[ReferenceType["FSPath"]] = None
    if parent is None:
        return
    self.parent_dir = parent

554 

def __repr__(self) -> str:
    """Return a debug representation with the path, its kind flags and child count."""
    kids = self._children
    child_count = len(kids) if kids else 0
    return (
        f"{self.__class__.__name__}({self._orphan_safe_path()!r},"
        f" is_file={self.is_file},"
        f" is_dir={self.is_dir},"
        f" is_symlink={self.is_symlink},"
        f" has_fs_path={self.has_fs_path},"
        f" children_len={child_count})"
    )

564 

@property
def name(self) -> str:
    """The basename of this path."""
    return self._basename

@name.setter
def name(self, new_name: str) -> None:
    """Rename this path; requires the path to be read-write.

    A detached path is renamed in place. An attached path is removed from its
    parent, renamed, and attached again.
    """
    self._rw_check()
    if self._basename == new_name:
        return
    if self.is_detached:
        self._basename = new_name
        return
    self._rw_check()
    previous_parent = self.parent_dir
    # This little parent_dir dance ensures the parent dir detects the rename properly
    self.parent_dir = None
    self._basename = new_name
    self.parent_dir = previous_parent

583 

@property
def iterdir(self) -> Iterable["FSPath"]:
    """Iterate over the direct children; yields nothing when there is no child mapping."""
    kids = self._children
    if kids is None:
        return
    yield from kids.values()

588 

def all_paths(self) -> Iterable["FSPath"]:
    """Yield this path and everything below it, depth first, children sorted by name.

    Children of a directory that is detached by the time it is visited are not
    descended into.
    """
    yield self
    if not self.is_dir:
        return
    # Reverse-sorted so that pop() hands out the entries in ascending name order.
    pending = sorted(self.iterdir, key=BY_BASENAME, reverse=True)
    while pending:
        node = pending.pop()
        yield node
        if not node.is_dir or node.is_detached:
            continue
        pending.extend(sorted(node.iterdir, key=BY_BASENAME, reverse=True))

600 

def walk(self) -> Iterable[Tuple["FSPath", List["FSPath"]]]:
    """Yield ``(path, children)`` pairs for this path and everything below it.

    ``children`` is sorted by name. A non-directory start path yields a single
    pair with an empty child list.
    """
    # FIXME: can this be more "os.walk"-like without making it harder to implement?
    if not self.is_dir:
        yield self, []
        return
    todo = [self]
    while todo:
        node = todo.pop()
        kids = sorted(node.iterdir, key=BY_BASENAME)
        assert not kids or node.is_dir
        yield node, kids
        # Removing the directory counts as discarding the children.
        if node.is_detached:
            continue
        # Reversed so that pop() visits the children in ascending name order.
        todo.extend(kids[::-1])

616 

def _orphan_safe_path(self) -> str:
    """Return a path string for messages that also works for detached paths.

    A detached path with no remembered parent path is rendered as
    ``<orphaned>/<name>``; otherwise ``self.path`` is used.
    """
    if self.is_detached and self._last_known_parent_path is None:
        return f"<orphaned>/{self.name}"
    return self.path

621 

@property
def is_detached(self) -> bool:
    """True when this path has no live parent or any ancestor is detached."""
    parent_ref = self._parent_dir
    # The parent is held through a weak reference, which may have expired.
    live_parent = parent_ref() if parent_ref is not None else None
    if live_parent is None:
        return True
    return live_parent.is_detached

631 

632 # The __getitem__ behaves like __getitem__ from Dict but __iter__ would ideally work like a Sequence. 

633 # However, that does not feel compatible, so lets force people to use .children instead for the Sequence 

634 # behavior to avoid surprises for now. 

635 # (Maybe it is a non-issue, but it is easier to add the API later than to remove it once we have committed 

636 # to using it) 

637 __iter__ = None 

638 

def __getitem__(self, key) -> "FSPath":
    """Return the direct child for ``key`` (a name, or an ``FSPath`` whose name is used).

    :raises KeyError: If there is no such child or this path has no children.
    """
    kids = self._children
    if kids is None:
        raise KeyError(
            f"{key} (note: {self._orphan_safe_path()!r} has no children)"
        )
    child_name = key.name if isinstance(key, FSPath) else key
    return kids[child_name]

647 

def __delitem__(self, key) -> None:
    """Remove the child entry stored under ``key``; requires the path to be read-write.

    :raises KeyError: If there is no such entry or this path has no children.
    """
    self._rw_check()
    kids = self._children
    if kids is not None:
        del kids[key]
        return
    raise KeyError(key)

654 

def get(self, key: str) -> "Optional[FSPath]":
    """Return the direct child named ``key``, or ``None`` when it does not exist."""
    try:
        found = self[key]
    except KeyError:
        found = None
    return found

660 

def __contains__(self, item: object) -> bool:
    """Membership test for direct children.

    A ``VirtualPath`` is contained when its parent is this very object; a string
    is contained when a child with that name exists. Anything else is not.
    """
    if isinstance(item, VirtualPath):
        return item.parent_dir is self
    if isinstance(item, str):
        return self.get(item) is not None
    return False

668 

def _add_child(self, child: "FSPath") -> None:
    """Register ``child`` under its name, replacing any existing child of that name.

    :raises TypeError: If this path is not a directory.
    """
    self._rw_check()
    if not self.is_dir:
        raise TypeError(f"{self._orphan_safe_path()!r} is not a directory")
    if self._children is None:
        self._children = {}

    child_name = child.name
    previous = self.get(child_name)
    if previous is not None:
        # An existing entry with the same name is removed (recursively) first.
        previous.unlink(recursive=True)
    self._children[child_name] = child

680 

@property
def tar_path(self) -> str:
    """The path of this entry, with a trailing "/" appended for directories."""
    base = self.path
    return f"{base}/" if self.is_dir else base

687 

@property
def path(self) -> str:
    """The parent's path joined with this path's name.

    The result is cached and reused for as long as the parent path is unchanged.

    :raises ReferenceError: If no parent path is available.
    """
    current_parent_path = self.parent_dir_path
    cached_parent_path = self._parent_path_cache
    if cached_parent_path is not None and cached_parent_path == current_parent_path:
        return assume_not_none(self._path_cache)
    if current_parent_path is None:
        raise ReferenceError(
            f"The path {self.name} is detached! {self.__class__.__name__}"
        )
    # Remember which parent path the cached value was computed from.
    self._parent_path_cache = current_parent_path
    joined = os.path.join(current_parent_path, self.name)
    self._path_cache = joined
    return joined

704 

@property
def parent_dir(self) -> Optional["FSPath"]:
    """The directory this path is attached to.

    :raises ReferenceError: If there is no parent reference or the weakly
      referenced parent no longer exists.
    """
    p_ref = self._parent_dir
    # The parent is stored as a weak reference; calling it yields the parent or None.
    p = p_ref() if p_ref is not None else None
    if p is None:
        raise ReferenceError(
            f"The path {self.name} is detached! {self.__class__.__name__}"
        )
    return p

@parent_dir.setter
def parent_dir(self, new_parent: Optional["FSPath"]) -> None:
    """Move this path to ``new_parent``, or detach it when ``new_parent`` is None.

    Both this path and the new parent must be read-write.

    :raises ValueError: If ``new_parent`` is not a directory.
    """
    self._rw_check()
    if new_parent is not None:
        if not new_parent.is_dir:
            raise ValueError(
                f"The parent {new_parent._orphan_safe_path()} must be a directory"
            )
        new_parent._rw_check()
    old_parent = None
    self._last_known_parent_path = None
    if not self.is_detached:
        # Remove this path from the current parent's child mapping first.
        old_parent = self.parent_dir
        old_parent_children = assume_not_none(assume_not_none(old_parent)._children)
        del old_parent_children[self.name]
    if new_parent is not None:
        self._parent_dir = ref(new_parent)
        new_parent._add_child(self)
    else:
        # Detaching: remember where the path used to live so that messages can
        # still show a meaningful path.
        if old_parent is not None and not old_parent.is_detached:
            self._last_known_parent_path = old_parent.path
        self._parent_dir = None
    # Invalidate the cached parent path so `path` is recomputed on next access.
    self._parent_path_cache = None

738 

@property
def parent_dir_path(self) -> Optional[str]:
    """Path of the parent directory; for a detached path, the last known parent path (may be None)."""
    return (
        self._last_known_parent_path
        if self.is_detached
        else assume_not_none(self.parent_dir).path
    )

744 

def chown(
    self,
    owner: Optional[StaticFileSystemOwner],
    group: Optional[StaticFileSystemGroup],
) -> None:
    """Update the recorded owner and/or group of this path.

    :param owner: New owner definition; None leaves the owner untouched.
    :param group: New group definition; None leaves the group untouched.
    """
    self._rw_check()

    new_owner = None if owner is None else owner.ownership_definition
    if new_owner is not None or owner is not None:
        self._owner = new_owner
    if group is None:
        return
    self._group = group.ownership_definition

761 

def stat(self) -> os.stat_result:
    """Return the stat result for this path, computing and caching it on first use."""
    cached = self._stat_cache
    if cached is not None:
        return cached
    fresh = self._uncached_stat()
    self._stat_cache = fresh
    return fresh

768 

def _uncached_stat(self) -> os.stat_result:
    """Run ``os.lstat`` on the backing file system path (symlinks are not followed)."""
    backing_path = self.fs_path
    return os.lstat(backing_path)

771 

@property
def mode(self) -> int:
    """Permission bits of this path; taken from ``stat()`` and cached when not set yet."""
    known = self._mode
    if known is not None:
        return known
    known = stat.S_IMODE(self.stat().st_mode)
    self._mode = known
    return known

@mode.setter
def mode(self, new_mode: int) -> None:
    """Set the permission bits; requires the path to be read-write.

    :raises ValueError: If ``new_mode`` lacks 0o700 for a directory or 0o400 for
      any other path.
    """
    self._rw_check()
    min_bit = 0o700 if self.is_dir else 0o400
    if (new_mode & min_bit) == min_bit:
        self._mode = new_mode
        return
    omode = oct(new_mode)[2:]
    omin = oct(min_bit)[2:]
    raise ValueError(
        f'Attempt to set mode of path "{self._orphan_safe_path()}" to {omode} rejected;'
        f" Minimum requirements are {omin} (read-bit and, for dirs, exec bit for user)."
        " There are no paths that do not need these requirements met and they can cause"
        " problems during build or on the final system."
    )

794 

def _ensure_min_mode(self) -> None:
    """Make sure the owner has at least rw (rwx for directories) on this path.

    When the path has a backing file system path whose recorded mode lacks the
    minimum bits, the backing file is chmod'ed as well. The recorded mode always
    gets the minimum bits added.
    """
    min_bit = 0o700 if self.is_dir else 0o600
    # Compare against min_bit rather than a fixed 0o600 so that a directory
    # missing only the owner exec bit is also repaired on disk.
    if self.has_fs_path and (self.mode & min_bit) != min_bit:
        try:
            fs_path = self.fs_path
        except TestPathWithNonExistentFSPathError:
            # No backing path to chmod; only the recorded mode is adjusted.
            pass
        else:
            st = os.stat(fs_path)
            new_fs_mode = stat.S_IMODE(st.st_mode) | min_bit
            _debug_log(
                f"Applying chmod {oct(min_bit)[2:]} {fs_path} ({self.path}) to avoid problems down the line"
            )
            os.chmod(fs_path, new_fs_mode)
    self.mode |= min_bit

810 

@property
def mtime(self) -> float:
    """Modification time of this path; taken from ``stat()`` and cached when not set yet."""
    known = self._mtime
    if known is not None:
        return known
    known = self.stat().st_mtime
    self._mtime = known
    return known

@mtime.setter
def mtime(self, new_mtime: float) -> None:
    """Record a new modification time; requires the path to be read-write."""
    self._rw_check()
    self._mtime = new_mtime

823 

@property
def tar_owner_info(self) -> Tuple[str, int, str, int]:
    """Return ``(owner name, owner id, group name, group id)`` for this path."""
    who, grp = self._owner, self._group
    owner_name, owner_id = who.entity_name, who.entity_id
    group_name, group_id = grp.entity_name, grp.entity_id
    return owner_name, owner_id, group_name, group_id

834 

@property
def _can_replace_inline(self) -> bool:
    """Always False in this base implementation."""
    return False

838 

@contextlib.contextmanager
def add_file(
    self,
    name: str,
    *,
    unlink_if_exists: bool = True,
    use_fs_path_mode: bool = False,
    mode: int = 0o0644,
    mtime: Optional[float] = None,
    # Special-case parameters that are not exposed in the API
    fs_basename_matters: bool = False,
    subdir_key: Optional[str] = None,
) -> Iterator["FSPath"]:
    """Create a new file child backed by a freshly created file on disk.

    Used as a context manager: the new child is yielded so the caller can fill in
    the content. Once the ``with`` block finishes, the backing file is verified
    and the mode is applied.

    :param name: Basename of the new child; must not contain "/" or be "." / "..".
    :param unlink_if_exists: Whether an existing child with that name is removed
      first (True) or treated as an error (False).
    :param use_fs_path_mode: When True, ``mode`` is applied to the backing file
      with chmod instead of being set on the child directly.
    :param mode: The mode for the new file.
    :param mtime: Passed on to the new child as its mtime.
    :param fs_basename_matters: When True, the backing file is created with
      exactly ``name`` as its basename; requires ``subdir_key``.
    :param subdir_key: Passed to ``generated_content_dir`` to select the directory
      for the backing file.
    :raises ValueError: On an invalid name, an existing child when
      ``unlink_if_exists`` is False, or ``fs_basename_matters`` without
      ``subdir_key``.
    :raises TypeError: If this path is not a directory, or the backing file is
      no longer a plain file after the ``with`` block.
    """
    if "/" in name or name in {".", ".."}:
        raise ValueError(f'Invalid file name: "{name}"')
    if not self.is_dir:
        raise TypeError(
            f"Cannot create {self._orphan_safe_path()}/{name}:"
            f" {self._orphan_safe_path()} is not a directory"
        )
    self._rw_check()
    existing = self.get(name)
    if existing is not None:
        if not unlink_if_exists:
            # The message names the parameter this method actually has.
            raise ValueError(
                f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
                f" and unlink_if_exists was False"
            )
        existing.unlink(recursive=False)

    if fs_basename_matters and subdir_key is None:
        raise ValueError(
            "When fs_basename_matters is True, a subdir_key must be provided"
        )

    directory = generated_content_dir(subdir_key=subdir_key)

    if fs_basename_matters:
        fs_path = os.path.join(directory, name)
        with open(fs_path, "xb") as _:
            # Ensure that the fs_path exists
            pass
        child = FSBackedFilePath(
            name,
            self,
            fs_path,
            replaceable_inline=True,
            mtime=mtime,
        )
        yield child
    else:
        # delete=False: the temporary file must outlive this context manager.
        with tempfile.NamedTemporaryFile(
            dir=directory, suffix=f"__{name}", delete=False
        ) as fd:
            fs_path = fd.name
            child = FSBackedFilePath(
                name,
                self,
                fs_path,
                replaceable_inline=True,
                mtime=mtime,
            )
            fd.close()
            yield child

    # Everything below runs after the caller's `with` block has completed.
    if use_fs_path_mode:
        # Ensure the caller can see the current mode
        os.chmod(fs_path, mode)
    _check_fs_path_is_file(fs_path, unlink_on_error=child)
    child._reset_caches()
    if not use_fs_path_mode:
        child.mode = mode

911 

def insert_file_from_fs_path(
    self,
    name: str,
    fs_path: str,
    *,
    exist_ok: bool = True,
    use_fs_path_mode: bool = False,
    mode: int = 0o0644,
    require_copy_on_write: bool = True,
    follow_symlinks: bool = True,
    reference_path: Optional[VirtualPath] = None,
) -> "FSPath":
    """Create a file entry in this directory backed by an existing file system path

    :param name: Basename of the new entry. It must not contain "/" and must
      not be "." or "..".
    :param fs_path: The file system path that backs the new entry.
    :param exist_ok: When False, a ValueError is raised if this directory
      already contains an entry called `name`.
    :param use_fs_path_mode: When True, no explicit mode is passed on to the
      new entry (`initial_mode=None`). Otherwise `mode` is passed on.
    :param mode: The mode to use when `use_fs_path_mode` is False.
    :param require_copy_on_write: When True, the new entry is created as NOT
      replaceable inline.
    :param follow_symlinks: When True, `fs_path` is resolved via
      `os.path.realpath(..., strict=True)` before use. This cannot be
      combined with `reference_path`.
    :param reference_path: Optional reference path passed on to the new entry.
      When provided, the file system path is not lstat'ed or type-checked here.
    :return: The newly created `FSBackedFilePath`.
    """
    if name in {".", ".."} or "/" in name:
        raise ValueError(f'Invalid file name: "{name}"')
    if not self.is_dir:
        raise TypeError(
            f"Cannot create {self._orphan_safe_path()}/{name}:"
            f" {self._orphan_safe_path()} is not a directory"
        )
    self._rw_check()
    if name in self and not exist_ok:
        raise ValueError(
            f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
            f" and exist_ok was False"
        )

    if follow_symlinks:
        if reference_path is not None:
            raise ValueError("The reference_path cannot be used with follow_symlinks")
        # strict=True makes the resolution fail for non-existent paths.
        resolved_fs_path = os.path.realpath(fs_path, strict=True)
    else:
        resolved_fs_path = fs_path

    effective_mode: Optional[int] = None if use_fs_path_mode else mode

    stat_result = None
    if reference_path is None:
        # Without a reference path, the file type is validated up front and
        # the lstat result is handed over as the initial stat cache.
        stat_result = os.lstat(resolved_fs_path)
        file_type = stat_result.st_mode
        if stat.S_ISDIR(file_type):
            raise ValueError(
                f'The provided path "{fs_path}" is a directory. However, this'
                " method does not support directories"
            )
        if not stat.S_ISREG(file_type):
            if follow_symlinks:
                raise ValueError(
                    f"The resolved fs_path ({resolved_fs_path}) was not a file."
                )
            raise ValueError(f"The provided fs_path ({fs_path}) was not a file.")

    return FSBackedFilePath(
        name,
        self,
        resolved_fs_path,
        initial_mode=effective_mode,
        stat_cache=stat_result,
        replaceable_inline=not require_copy_on_write,
        reference_path=reference_path,
    )

973 

def add_symlink(
    self,
    link_name: str,
    link_target: str,
    *,
    reference_path: Optional[VirtualPath] = None,
) -> "FSPath":
    """Create a symlink entry in this directory

    :param link_name: Basename of the symlink. It must not contain "/" and
      must not be "." or "..".
    :param link_target: The target the symlink points to.
    :param reference_path: Optional reference path passed on to the new entry.
    :return: The newly created `SymlinkVirtualPath`.
    """
    if link_name in {".", ".."} or "/" in link_name:
        raise ValueError(
            f'Invalid file name: "{link_name}" (it must be a valid basename)'
        )
    if not self.is_dir:
        raise TypeError(
            f"Cannot create {self._orphan_safe_path()}/{link_name}:"
            f" {self._orphan_safe_path()} is not a directory"
        )
    self._rw_check()

    previous_entry = self.get(link_name)
    if previous_entry:
        # Emulate `ln -sf`, which attempts a non-recursive unlink first.
        previous_entry.unlink(recursive=False)

    new_link = SymlinkVirtualPath(
        link_name,
        self,
        link_target,
        reference_path=reference_path,
    )
    return new_link

1003 

def mkdir(
    self,
    name: str,
    *,
    reference_path: Optional[VirtualPath] = None,
) -> "FSPath":
    """Create a new directory entry directly inside this directory

    :param name: Basename of the new directory. It must not contain "/" and
      must not be "." or "..".
    :param reference_path: Optional reference path passed on to the new
      directory. When provided, it must itself be a directory.
    :return: The newly created `VirtualDirectoryFSPath`.
    """
    if name in {".", ".."} or "/" in name:
        raise ValueError(f'Invalid file name: "{name}" (it must be a valid basename)')
    if not self.is_dir:
        raise TypeError(
            f"Cannot create {self._orphan_safe_path()}/{name}:"
            f" {self._orphan_safe_path()} is not a directory"
        )
    if reference_path is not None:
        if not reference_path.is_dir:
            raise ValueError(
                f'The provided fs_path "{reference_path.fs_path}" exist but it is not a directory!'
            )
    self._rw_check()

    clash = self.get(name)
    if clash:
        # Unlike add_symlink, an existing entry is never replaced here.
        raise ValueError(f"Path {clash.path} already exist")
    return VirtualDirectoryFSPath(name, self, reference_path=reference_path)

1029 

def mkdirs(self, path: str) -> "FSPath":
    """Delegate to the parent class `mkdirs` and narrow the result type to `FSPath`

    :param path: The path handed unchanged to the parent class implementation.
    :return: Whatever the parent class implementation returned.
    """
    created = super().mkdirs(path)
    # `cast` is a no-op at runtime; it only informs the type checker.
    return cast("FSPath", created)

1032 

@property
def is_read_write(self) -> bool:
    """When true, the file system entry may be mutated

    A detached path is always considered mutable. An attached path answers
    with whatever its parent directory answers.

    :return: Whether file system mutations are permitted.
    """
    if not self.is_detached:
        return assume_not_none(self.parent_dir).is_read_write
    return True

1042 

def unlink(self, *, recursive: bool = False) -> None:
    """Unlink a file or a directory

    This operation will detach the path from the file system (causing "is_detached" to return True).
    Calling it on a path that is already detached is a no-op.

    Note that the root directory cannot be deleted.

    :param recursive: If True, then non-empty directories will be unlinked as well removing everything inside them
      as well. When False, an error is raised if the path is a non-empty directory
    """
    if self.is_detached:
        return
    if not recursive:
        if any(self.iterdir):
            raise ValueError(
                f'Refusing to unlink "{self.path}": The directory was not empty and recursive was False'
            )
    # Detaching is done by clearing the parent; the .parent_dir setter does
    # a _rw_check() for us.
    self.parent_dir = None

1061 

1062 def _reset_caches(self) -> None: 

1063 self._mtime = None 

1064 self._stat_cache = None 

1065 

def metadata(
    self,
    metadata_type: Type[PMT],
    *,
    owning_plugin: Optional[str] = None,
) -> PathMetadataReference[PMT]:
    """Obtain a reference to the metadata of the given type for this path

    The metadata value is stored per (owning plugin, metadata type) pair and
    is created on first access.

    :param metadata_type: The type of the metadata being requested.
    :param owning_plugin: The plugin owning the metadata. Defaults to the
      current plugin.
    :return: A reference to the metadata. For a path that is not read-write
      and has no stored value yet, an always-empty read-only reference is
      returned and nothing is stored.
    :raises TypeError: If there is no stored value and the path is detached.
    """
    active_plugin = self._current_plugin()
    owner = active_plugin if owning_plugin is None else owning_plugin
    key = (owner, metadata_type)
    value = self._metadata.get(key)
    if value is not None:
        return PathMetadataReferenceImplementation(self, active_plugin, value)

    if self.is_detached:
        raise TypeError(
            f"Cannot access the metadata {metadata_type.__name__}: The path is detached."
        )
    if not self.is_read_write:
        return AlwaysEmptyReadOnlyMetadataReference(
            owner,
            active_plugin,
            metadata_type,
        )
    value = PathMetadataValue(owner, metadata_type)
    self._metadata[key] = value
    return PathMetadataReferenceImplementation(self, active_plugin, value)

1095 

@contextlib.contextmanager
def replace_fs_path_content(
    self,
    *,
    use_fs_path_mode: bool = False,
) -> Iterator[str]:
    """Context manager for rewriting the content of this file via its fs_path

    The manager yields a file system path that the caller may rewrite. If the
    path cannot be replaced inline, the current content is first copied (via
    `_cp_a`) to a fresh file in the generated content directory and this path
    is re-pointed at that copy before yielding.

    When the managed block completes without an exception, the path is
    verified to still be a file, its mode is reset to the mode the path had
    before yielding (unless `use_fs_path_mode` is True) and the caches are
    reset.

    :param use_fs_path_mode: When True, the mode left on the file system path
      by the caller is kept as-is rather than reset.
    :return: A context manager providing the file system path to rewrite.
    :raises TypeError: If this path is not a file.
    """
    if not self.is_file:
        raise TypeError(
            f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file'
        )
    self._rw_check()
    target_path = self.fs_path
    if not self._can_replace_inline:
        source_path = self.fs_path
        scratch_dir = generated_content_dir()
        with tempfile.NamedTemporaryFile(
            dir=scratch_dir, suffix=f"__{self.name}", delete=False
        ) as scratch_fd:
            # Only the reserved name is needed; _cp_a provides the content.
            scratch_fd.close()
            target_path = scratch_fd.name
            _cp_a(source_path, target_path)
            self._replaced_path(target_path)
            assert self.fs_path == target_path

    known_mtime = self._mtime
    if known_mtime is not None:
        os.utime(target_path, (known_mtime, known_mtime))

    # Captured before yielding so the caller's changes cannot influence it.
    mode_before = self.mode
    yield target_path
    _check_fs_path_is_file(target_path, unlink_on_error=self)
    if not use_fs_path_mode:
        os.chmod(target_path, mode_before)
    self._reset_caches()

1130 

1131 def _replaced_path(self, new_fs_path: str) -> None: 

1132 raise NotImplementedError 

1133 

1134 

class VirtualFSPathBase(FSPath, ABC):
    """Base class for paths that are not backed by the file system by default

    `has_fs_path` is False here, so `stat()` and `fs_path` raise
    `PureVirtualPathError` unless a subclass overrides `has_fs_path`.
    """

    __slots__ = ()

    def __init__(
        self,
        basename: str,
        parent: Optional["FSPath"],
        children: Optional[Dict[str, "FSPath"]] = None,
        initial_mode: Optional[int] = None,
        mtime: Optional[float] = None,
        stat_cache: Optional[os.stat_result] = None,
    ) -> None:
        """Forward all arguments unchanged to the parent class constructor"""
        super().__init__(
            basename,
            parent,
            children,
            initial_mode=initial_mode,
            mtime=mtime,
            stat_cache=stat_cache,
        )

    @property
    def mtime(self) -> float:
        """The mtime of the path

        When no mtime is known, the current time is used and remembered, so
        that subsequent reads return the same value.
        """
        mtime = self._mtime
        if mtime is None:
            mtime = time.time()
            self._mtime = mtime
        return mtime

    @property
    def has_fs_path(self) -> bool:
        """Always False at this level; subclasses may override"""
        return False

    def stat(self) -> os.stat_result:
        """Delegate to the parent class `stat()` when backed by the file system

        :raises PureVirtualPathError: If `has_fs_path` is False.
        """
        if not self.has_fs_path:
            raise PureVirtualPathError(
                "stat() is only applicable to paths backed by the file system. The path"
                f" {self._orphan_safe_path()!r} is purely virtual"
            )
        return super().stat()

    @property
    def fs_path(self) -> str:
        """Delegate to the parent class `fs_path` when backed by the file system

        :raises PureVirtualPathError: If `has_fs_path` is False.
        """
        if not self.has_fs_path:
            raise PureVirtualPathError(
                "fs_path is only applicable to paths backed by the file system. The path"
                f" {self._orphan_safe_path()!r} is purely virtual"
            )
        # This used to read `self.fs_path`, i.e. the property called itself
        # without bound whenever `has_fs_path` was True. Delegate upwards
        # instead, mirroring stat().
        # NOTE(review): assumes a parent class provides `fs_path` -- confirm.
        return super().fs_path

1184 

1185 

class FSRootDir(FSPath):
    """The root directory (named ".") of a file system tree

    The root is never detached, has no parent, cannot be unlinked and tracks
    both the read-write state and the current plugin context.
    """

    __slots__ = ("_fs_path", "_fs_read_write", "_plugin_context")

    def __init__(self, fs_path: Optional[str] = None) -> None:
        """
        :param fs_path: Optional file system path backing the root directory.
        """
        self._fs_path = fs_path
        self._fs_read_write = True
        super().__init__(".", None, children={}, initial_mode=0o755)
        self._plugin_context = CurrentPluginContextManager("debputy")

    @property
    def is_detached(self) -> bool:
        """Always False for the root directory"""
        return False

    def _orphan_safe_path(self) -> str:
        """The path of the root directory, which is its name"""
        return self.name

    @property
    def path(self) -> str:
        """The path of the root directory, which is its name"""
        return self.name

    @property
    def parent_dir(self) -> Optional["FSPath"]:
        """Always None; the root directory has no parent"""
        return None

    @parent_dir.setter
    def parent_dir(self, new_parent: Optional[FSPath]) -> None:
        """Accept only None (a no-op); anything else raises ValueError"""
        if new_parent is None:
            return
        raise ValueError("The root directory cannot become a non-root directory")

    @property
    def parent_dir_path(self) -> Optional[str]:
        """Always None; the root directory has no parent"""
        return None

    @property
    def is_dir(self) -> bool:
        """Always True"""
        return True

    @property
    def is_file(self) -> bool:
        """Always False"""
        return False

    @property
    def is_symlink(self) -> bool:
        """Always False"""
        return False

    def readlink(self) -> str:
        """Always raises TypeError as the root directory is not a symlink"""
        raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink')

    @property
    def has_fs_path(self) -> bool:
        """Whether a file system path was provided for the root directory"""
        return self._fs_path is not None

    def stat(self) -> os.stat_result:
        """Run `os.stat` on the file system path of the root directory

        :raises PureVirtualPathError: If no file system path was provided.
        """
        if self.has_fs_path:
            return os.stat(self.fs_path)
        raise PureVirtualPathError(
            "stat() is only applicable to paths backed by the file system. The path"
            f" {self._orphan_safe_path()!r} is purely virtual"
        )

    @property
    def fs_path(self) -> str:
        """The file system path of the root directory

        :raises PureVirtualPathError: If no file system path was provided.
        """
        if self.has_fs_path:
            return assume_not_none(self._fs_path)
        raise PureVirtualPathError(
            "fs_path is only applicable to paths backed by the file system. The path"
            f" {self._orphan_safe_path()!r} is purely virtual"
        )

    @property
    def is_read_write(self) -> bool:
        """The read-write flag stored on the root directory"""
        return self._fs_read_write

    @is_read_write.setter
    def is_read_write(self, new_value: bool) -> None:
        """Replace the read-write flag stored on the root directory"""
        self._fs_read_write = new_value

    def prune_if_empty_dir(self) -> None:
        """Do nothing; the root directory always stays"""
        # There is never a case where you want to delete this directory (and even if you
        # could, debputy will need it for technical reasons, so the root dir stays)
        return None

    def unlink(self, *, recursive: bool = False) -> None:
        """Always raises TypeError; the root directory cannot be deleted"""
        # There is never a case where you want to delete this directory (and even if you could,
        # debputy will need it for technical reasons, so the root dir stays)
        raise TypeError("Cannot delete the root directory")

    def _current_plugin(self) -> str:
        """The name of the plugin currently recorded in the plugin context"""
        return self._plugin_context.current_plugin_name

    @contextlib.contextmanager
    def change_plugin_context(self, new_plugin: str) -> Iterator[str]:
        """Context manager wrapping `change_plugin_context` of the plugin context

        :param new_plugin: Name of the plugin to switch to.
        :return: A context manager yielding whatever the wrapped manager yields.
        """
        with self._plugin_context.change_plugin_context(new_plugin) as entered:
            yield entered

1285 

1286 

class VirtualPathWithReference(VirtualFSPathBase, ABC):
    """Virtual path that can borrow its properties from a reference path

    When a reference path is present, the initial mode, the mtime, the
    file system path and the stat data can be taken from it.
    """

    __slots__ = ("_reference_path",)

    def __init__(
        self,
        basename: str,
        parent: FSPath,
        *,
        default_mode: int,
        reference_path: Optional[VirtualPath] = None,
    ) -> None:
        """
        :param basename: Basename of the path.
        :param parent: The directory that will contain the path.
        :param default_mode: Mode to use when there is no (truthy) reference path.
        :param reference_path: Optional path to borrow properties from.
        """
        if reference_path:
            starting_mode = reference_path.mode
        else:
            starting_mode = default_mode
        super().__init__(basename, parent=parent, initial_mode=starting_mode)
        self._reference_path = reference_path

    @property
    def has_fs_path(self) -> bool:
        """True when there is a reference path and it has a file system path"""
        reference = self._reference_path
        if reference is None:
            return False
        return reference.has_fs_path

    @property
    def mtime(self) -> float:
        """The mtime of the path

        An explicitly known mtime wins. Otherwise, the mtime of the reference
        path (if any) or else the parent class default is used and remembered.
        """
        cached = self._mtime
        if cached is not None:
            return cached
        reference = self._reference_path
        resolved = reference.mtime if reference else super().mtime
        self._mtime = resolved
        return resolved

    @mtime.setter
    def mtime(self, new_mtime: float) -> None:
        """Set the mtime after checking that the path may be mutated"""
        self._rw_check()
        self._mtime = new_mtime

    @property
    def fs_path(self) -> str:
        """The file system path, preferring the one from the reference path"""
        reference = self._reference_path
        if reference is None:
            return super().fs_path
        if not super().has_fs_path or super().fs_path == reference.fs_path:
            return reference.fs_path
        return super().fs_path

    def stat(self) -> os.stat_result:
        """The stat result, preferring the one from the reference path"""
        reference = self._reference_path
        if reference is None:
            return super().stat()
        if not super().has_fs_path or super().fs_path == reference.fs_path:
            return reference.stat()
        return super().stat()

    def open(
        self,
        *,
        byte_io: bool = False,
        buffering: int = -1,
    ) -> Union[TextIO, BinaryIO]:
        """Open the path for reading

        The call is forwarded to the reference path when it exists and shares
        the file system path with this path; otherwise to the parent class.

        :param byte_io: Passed on unchanged.
        :param buffering: Passed on unchanged.
        :return: The file handle returned by whichever `open` was called.
        """
        reference = self._reference_path
        if reference is None or reference.fs_path != self.fs_path:
            return super().open(byte_io=byte_io, buffering=buffering)
        return reference.open(byte_io=byte_io, buffering=buffering)

1354 

1355 

class VirtualDirectoryFSPath(VirtualPathWithReference):
    """A directory in the virtual file system, optionally tied to a reference directory"""

    # The `_reference_path` slot is declared by VirtualPathWithReference.
    # Declaring it again here would create a second, shadowing slot, so no
    # additional slots are declared.
    __slots__ = ()

    def __init__(
        self,
        basename: str,
        parent: FSPath,
        *,
        reference_path: Optional[VirtualPath] = None,
    ) -> None:
        """
        :param basename: Basename of the directory.
        :param parent: The directory that will contain this directory.
        :param reference_path: Optional reference path. When given, it must
          be a directory.
        """
        # The parent constructor stores `reference_path` for us.
        super().__init__(
            basename,
            parent,
            reference_path=reference_path,
            default_mode=0o755,
        )
        assert reference_path is None or reference_path.is_dir
        self._ensure_min_mode()

    @property
    def is_dir(self) -> bool:
        """Always True"""
        return True

    @property
    def is_file(self) -> bool:
        """Always False"""
        return False

    @property
    def is_symlink(self) -> bool:
        """Always False"""
        return False

    def readlink(self) -> str:
        """Always raises TypeError as a directory is not a symlink"""
        raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink')

1390 

1391 

class SymlinkVirtualPath(VirtualPathWithReference):
    """A symlink in the virtual file system; it remembers its link target"""

    __slots__ = ("_link_target",)

    def __init__(
        self,
        basename: str,
        parent_dir: FSPath,
        link_target: str,
        *,
        reference_path: Optional[VirtualPath] = None,
    ) -> None:
        """
        :param basename: Basename of the symlink.
        :param parent_dir: The directory that will contain the symlink.
        :param link_target: The target the symlink points to.
        :param reference_path: Optional reference path passed on to the
          parent class.
        """
        super().__init__(
            basename,
            parent=parent_dir,
            reference_path=reference_path,
            default_mode=_SYMLINK_MODE,
        )
        self._link_target = link_target

    @property
    def is_dir(self) -> bool:
        """Always False"""
        return False

    @property
    def is_file(self) -> bool:
        """Always False"""
        return False

    @property
    def is_symlink(self) -> bool:
        """Always True"""
        return True

    def readlink(self) -> str:
        """Return the link target provided at construction time"""
        return self._link_target

    @property
    def size(self) -> int:
        """The length of the link target as returned by `readlink()`"""
        target = self.readlink()
        return len(target)

1429 

1430 

class FSBackedFilePath(VirtualPathWithReference):
    """A file in the virtual file system whose content lives at a file system path"""

    __slots__ = ("_fs_path", "_replaceable_inline")

    def __init__(
        self,
        basename: str,
        parent_dir: FSPath,
        fs_path: str,
        *,
        replaceable_inline: bool = False,
        initial_mode: Optional[int] = None,
        mtime: Optional[float] = None,
        stat_cache: Optional[os.stat_result] = None,
        reference_path: Optional[VirtualPath] = None,
    ) -> None:
        """
        :param basename: Basename of the file.
        :param parent_dir: The directory that will contain the file.
        :param fs_path: The file system path holding the content.
        :param replaceable_inline: Whether the content at `fs_path` may be
          rewritten in place. Asserted to only be True for paths containing
          "debputy/scratch-dir/".
        :param initial_mode: When not None, assigned to `mode` after the
          parent constructor has run.
        :param mtime: When not None, used as the mtime of the path.
        :param stat_cache: Initial stat cache (may be None).
        :param reference_path: Optional reference path passed on to the
          parent class.
        """
        super().__init__(
            basename,
            parent_dir,
            reference_path=reference_path,
            default_mode=0o644,
        )
        self._fs_path = fs_path
        self._replaceable_inline = replaceable_inline
        if initial_mode is not None:
            self.mode = initial_mode
        if mtime is not None:
            self._mtime = mtime
        self._stat_cache = stat_cache
        if replaceable_inline:
            assert (
                "debputy/scratch-dir/" in fs_path
            ), f"{fs_path} should not be inline-replaceable -- {self.path}"
        self._ensure_min_mode()

    @property
    def is_dir(self) -> bool:
        """Always False"""
        return False

    @property
    def is_file(self) -> bool:
        """Always True"""
        return True

    @property
    def is_symlink(self) -> bool:
        """Always False"""
        return False

    def readlink(self) -> str:
        """Always raises TypeError as a file is not a symlink"""
        raise TypeError(f'"{self._orphan_safe_path()!r}" is a file; not a symlink')

    @property
    def has_fs_path(self) -> bool:
        """Always True"""
        return True

    @property
    def fs_path(self) -> str:
        """The file system path currently holding the content"""
        return self._fs_path

    @property
    def _can_replace_inline(self) -> bool:
        """Whether the content may be rewritten in place"""
        return self._replaceable_inline

    def _replaced_path(self, new_fs_path: str) -> None:
        """Re-point the file at `new_fs_path`

        The reference path is dropped and the file is marked as replaceable
        inline from now on.

        :param new_fs_path: The file system path that now holds the content.
        """
        self._fs_path = new_fs_path
        self._replaceable_inline = True
        self._reference_path = None

1495 

1496 

1497_SYMLINK_MODE = 0o777 

1498 

1499 

class VirtualTestPath(FSPath):
    """Mock path for the test suite

    The path can be a directory, a symlink or a file. A file can have
    in-memory `content`, `materialized_content` (written to a temporary file
    when the fs_path is first requested) or an explicit `fs_path`.
    """

    __slots__ = (
        "_path_type",
        "_has_fs_path",
        "_fs_path",
        "_link_target",
        "_content",
        "_materialized_content",
    )

    def __init__(
        self,
        basename: str,
        parent_dir: Optional[FSPath],
        mode: Optional[int] = None,
        mtime: Optional[float] = None,
        is_dir: bool = False,
        has_fs_path: Optional[bool] = False,
        fs_path: Optional[str] = None,
        link_target: Optional[str] = None,
        content: Optional[str] = None,
        materialized_content: Optional[str] = None,
    ) -> None:
        """
        :param basename: Basename of the path.
        :param parent_dir: The directory containing the path (if any).
        :param mode: Explicit mode. When None, 0o755 is used for directories
          and 0o644 otherwise. Symlinks only accept None or `_SYMLINK_MODE`.
        :param mtime: Optional mtime passed on to the parent class.
        :param is_dir: Whether the path is a directory. Takes precedence over
          `link_target` when determining the path type.
        :param has_fs_path: Whether the path claims to be backed by the file
          system. When None, it is derived from the truthiness of `fs_path`.
        :param fs_path: Optional file system path backing the path.
        :param link_target: When not None (and `is_dir` is False), the path
          is a symlink with this target.
        :param content: Optional in-memory content served by `open()`.
        :param materialized_content: Optional content that is written to a
          temporary file the first time `fs_path` is requested.
        :raises ValueError: If a symlink is given a mode other than
          `_SYMLINK_MODE`.
        """
        if is_dir:
            self._path_type = PathType.DIRECTORY
        elif link_target is not None:
            self._path_type = PathType.SYMLINK
            if mode is not None and mode != _SYMLINK_MODE:
                raise ValueError(
                    f'Please do not assign a mode to symlinks. Triggered for "{basename}".'
                )
            assert mode is None or mode == _SYMLINK_MODE
        else:
            self._path_type = PathType.FILE

        if mode is not None:
            initial_mode = mode
        else:
            initial_mode = 0o755 if is_dir else 0o644

        self._link_target = link_target
        if has_fs_path is None:
            has_fs_path = bool(fs_path)
        self._has_fs_path = has_fs_path
        self._fs_path = fs_path
        self._materialized_content = materialized_content
        super().__init__(
            basename,
            parent=parent_dir,
            initial_mode=initial_mode,
            mtime=mtime,
        )
        self._content = content

    @property
    def is_dir(self) -> bool:
        """Whether the path was created as a directory"""
        return self._path_type == PathType.DIRECTORY

    @property
    def is_file(self) -> bool:
        """Whether the path was created as a file"""
        return self._path_type == PathType.FILE

    @property
    def is_symlink(self) -> bool:
        """Whether the path was created as a symlink"""
        return self._path_type == PathType.SYMLINK

    def readlink(self) -> str:
        """Return the link target of the symlink

        :raises TypeError: If the path is not a symlink.
        """
        if not self.is_symlink:
            raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
        link_target = self._link_target
        assert link_target is not None
        return link_target

    @property
    def mtime(self) -> float:
        """The mtime of the path; defaults to (and remembers) the current time"""
        if self._mtime is None:
            self._mtime = time.time()
        return self._mtime

    @mtime.setter
    def mtime(self, new_mtime: float) -> None:
        """Set the mtime after checking that the path may be mutated"""
        self._rw_check()
        self._mtime = new_mtime

    @property
    def has_fs_path(self) -> bool:
        """Whether the path claims to be backed by the file system"""
        return self._has_fs_path

    def stat(self) -> os.stat_result:
        """Run `os.stat` on the file system path of this mock path

        :raises PureVirtualPathError: If the path is not backed by the file
          system, or if the file system path does not exist.
        """
        if self.has_fs_path:
            path = self.fs_path
            if path is None:
                raise PureVirtualPathError(
                    f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path"
                    " cannot provide!"
                )
            try:
                return os.stat(path)
            except FileNotFoundError as e:
                raise PureVirtualPathError(
                    f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path"
                    " cannot provide! (An fs_path was provided, but it did not exist)"
                ) from e

        raise PureVirtualPathError(
            "stat() is only applicable to paths backed by the file system. The path"
            f" {self._orphan_safe_path()!r} is purely virtual"
        )

    @property
    def size(self) -> int:
        """The size of the path

        In order of precedence: the UTF-8 encoded length of the in-memory
        content, the length of the link target for symlinks, 0 for paths
        without a file system path, and otherwise the size reported by
        `stat()`.
        """
        if self._content is not None:
            return len(self._content.encode("utf-8"))
        if self.is_symlink:
            return len(self.readlink())
        if not self.has_fs_path or self.fs_path is None:
            return 0
        return self.stat().st_size

    @property
    def fs_path(self) -> str:
        """The file system path backing this mock path

        If the path has `materialized_content` but no file system path yet,
        the content is written to a temporary file on first access. That file
        is registered for removal at interpreter exit.

        :raises PureVirtualPathError: If the path is not backed by the file
          system or no file system path can be provided.
        """
        if self.has_fs_path:
            if self._fs_path is None and self._materialized_content is not None:
                with tempfile.NamedTemporaryFile(
                    mode="w+t",
                    encoding="utf-8",
                    suffix=f"__{self.name}",
                    delete=False,
                ) as fd:
                    filepath = fd.name
                    fd.write(self._materialized_content)
                self._fs_path = filepath
                atexit.register(lambda: os.unlink(filepath))

            path = self._fs_path
            if path is None:
                raise PureVirtualPathError(
                    f"The test wants a real file system entry of {self._orphan_safe_path()!r}, which this "
                    " mock path cannot provide!"
                )
            return path
        raise PureVirtualPathError(
            "fs_path is only applicable to paths backed by the file system. The path"
            f" {self._orphan_safe_path()!r} is purely virtual"
        )

    def replace_fs_path_content(
        self,
        *,
        use_fs_path_mode: bool = False,
    ) -> ContextManager[str]:
        """Delegate to the parent class unless the path has in-memory content

        :param use_fs_path_mode: Passed on unchanged to the parent class.
        :raises TypeError: If the path was created with `content`.
        """
        if self._content is not None:
            raise TypeError(
                f"The `replace_fs_path_content()` method was called on {self.path}. Said path was"
                " created with `content` but for this method to work, the path should have been"
                " created with `materialized_content`"
            )
        return super().replace_fs_path_content(use_fs_path_mode=use_fs_path_mode)

    @contextlib.contextmanager
    def open_child(
        self,
        name: str,
        mode: OpenMode = "r",
        buffering: int = -1,
    ) -> Union[TextIO, BinaryIO]:
        """Context manager for opening a child of this directory

        For existing children and for any mode containing "r", the call is
        forwarded to the parent class. Otherwise, an in-memory buffer is
        yielded and, when the managed block completes, a new child
        `VirtualTestPath` holding the written content is created.

        :param name: Basename of the child.
        :param mode: The open mode.
        :param buffering: Passed on to the parent class when forwarding.
        """
        existing = self.get(name)
        if existing or "r" in mode:
            with super().open_child(name, mode, buffering=buffering) as fd:
                yield fd
            return
        if "b" in mode:
            fd = io.BytesIO(b"")
            yield fd
            # The content is kept as text; binary writes are decoded as UTF-8.
            content = fd.getvalue().decode("utf-8")
        else:
            fd = io.StringIO("")
            yield fd
            content = fd.getvalue()
        # Constructing the path with `self` as the parent directory is what
        # attaches it; the instance itself is not needed here.
        VirtualTestPath(
            name,
            self,
            mode=0o644,
            content=content,
            has_fs_path=True,
        )

    def open(
        self,
        *,
        byte_io: bool = False,
        buffering: int = -1,
    ) -> Union[TextIO, BinaryIO]:
        """Open the path for reading

        With in-memory content, an in-memory buffer over that content is
        returned. Otherwise, the call is forwarded to the parent class.

        :param byte_io: When True, the content is provided as bytes.
        :param buffering: Passed on to the parent class when forwarding.
        :raises TestPathWithNonExistentFSPathError: If the parent class
          `open` fails with FileNotFoundError.
        """
        if self._content is None:
            try:
                return super().open(byte_io=byte_io, buffering=buffering)
            except FileNotFoundError as e:
                # The first fragment must be an f-string; without the prefix
                # the placeholders were emitted literally.
                raise TestPathWithNonExistentFSPathError(
                    f"The test path {self.path} had an fs_path {self._fs_path}, which does not"
                    " exist. This exception can only occur in the testsuite. Either have the"
                    " test provide content for the path (`virtual_path_def(..., content=...) or,"
                    " if that is too painful in general, have the code accept this error as a "
                    " test only-case and provide a default."
                ) from e

        if byte_io:
            return io.BytesIO(self._content.encode("utf-8"))
        return io.StringIO(self._content)

    def _replaced_path(self, new_fs_path: str) -> None:
        """Re-point the mock path at `new_fs_path`"""
        self._fs_path = new_fs_path

1711 

1712 

class FSOverlayBase(VirtualPathBase, Generic[FSP]):
    """Base class for paths that directly mirror a path on the file system

    File type queries are answered by `lstat`'ing the file system path. The
    parent is only weakly referenced.
    """

    __slots__ = (
        "_path",
        "_fs_path",
        "_parent",
        "__weakref__",
    )

    def __init__(
        self,
        path: str,
        fs_path: str,
        parent: Optional[FSP],
    ) -> None:
        """
        :param path: The path of this entry as exposed via `path`.
        :param fs_path: The file system path mirrored by this entry. It is
          normalized via `_normalize_path`; a leading "/" is preserved.
        :param parent: The parent entry (stored as a weak reference) or None.
        """
        self._path: str = path
        prefix = "/" if fs_path.startswith("/") else ""
        self._fs_path: str = prefix + _normalize_path(fs_path, with_prefix=False)
        self._parent: Optional[ReferenceType[FSP]] = (
            ref(parent) if parent is not None else None
        )

    @property
    def name(self) -> str:
        """The basename of `path`"""
        return os.path.basename(self._path)

    @property
    def path(self) -> str:
        """The path provided at construction time"""
        return self._path

    @property
    def parent_dir(self) -> Optional["FSP"]:
        """The parent entry, or None if the entry was created without one

        :raises RuntimeError: If the weakly referenced parent no longer exists.
        """
        parent = self._parent
        if parent is None:
            return None
        resolved = parent()
        if resolved is None:
            raise RuntimeError("Parent was garbage collected!")
        return resolved

    @property
    def fs_path(self) -> str:
        """The (normalized) file system path mirrored by this entry"""
        return self._fs_path

    def stat(self) -> os.stat_result:
        """`lstat` the file system path (symlinks are not followed)"""
        return os.lstat(self.fs_path)

    @property
    def is_dir(self) -> bool:
        """Whether the file system path is a directory (False if it does not exist)"""
        # The root path can have a non-existent fs_path (such as d/tmp not always existing)
        try:
            return stat.S_ISDIR(self.stat().st_mode)
        except FileNotFoundError:
            return False
        except NotImplementedError:
            # A `stat()` that is not implemented cannot classify the path.
            # Answer with an explicit False; this handler used to print the
            # class to stdout and fall through to an implicit `None`.
            return False

    @property
    def is_file(self) -> bool:
        """Whether the file system path is a regular file (False if it does not exist)"""
        # The root path can have a non-existent fs_path (such as d/tmp not always existing)
        try:
            return stat.S_ISREG(self.stat().st_mode)
        except FileNotFoundError:
            return False

    @property
    def is_symlink(self) -> bool:
        """Whether the file system path is a symlink (False if it does not exist)"""
        # The root path can have a non-existent fs_path (such as d/tmp not always existing)
        try:
            return stat.S_ISLNK(self.stat().st_mode)
        except FileNotFoundError:
            return False

    @property
    def has_fs_path(self) -> bool:
        """Always True"""
        return True

    def open(
        self,
        *,
        byte_io: bool = False,
        buffering: int = -1,
    ) -> Union[TextIO, BinaryIO]:
        """Open the file system path for reading

        :param byte_io: When True, open in binary mode. Otherwise, open in
          text mode with UTF-8 encoding.
        :param buffering: Passed on to the built-in `open`.
        :raises TypeError: If the path is neither a file nor a symlink.
        """
        # Allow symlinks for open here, because we can let the OS resolve the symlink reliably in this
        # case.
        if not self.is_file and not self.is_symlink:
            raise TypeError(
                f"Cannot open {self.path} for reading: It is not a file nor a symlink"
            )

        if byte_io:
            return open(self.fs_path, "rb", buffering=buffering)
        return open(self.fs_path, "rt", encoding="utf-8", buffering=buffering)

    def metadata(
        self,
        metadata_type: Type[PMT],
        *,
        owning_plugin: Optional[str] = None,
    ) -> PathMetadataReference[PMT]:
        """Return an always-empty, read-only metadata reference

        :param metadata_type: The type of the metadata being requested.
        :param owning_plugin: The plugin owning the metadata. Defaults to the
          current plugin.
        """
        current_plugin = self._current_plugin()
        if owning_plugin is None:
            owning_plugin = current_plugin
        return AlwaysEmptyReadOnlyMetadataReference(
            owning_plugin,
            current_plugin,
            metadata_type,
        )

    def all_paths(self) -> Iterable["FSControlPath"]:
        """Iterate over this path and everything beneath it

        The traversal is depth-first: each directory is yielded before its
        content, and children are visited in `iterdir` order.
        """
        yield self
        if not self.is_dir:
            return
        # The stack is popped from the end, so children are pushed in
        # reverse to have the first child come out first.
        stack = list(self.iterdir)
        stack.reverse()
        while stack:
            current = stack.pop()
            yield current
            if current.is_dir:
                stack.extend(reversed(list(current.iterdir)))

    def _resolve_children(
        self, new_child: Callable[[str, str, FSP], FSC]
    ) -> Mapping[str, FSC]:
        """Create child entries for everything listed in this directory

        :param new_child: Factory called with (child path, child fs_path,
          this entry) for each directory entry.
        :return: Mapping from basename to child, inserted in sorted name
          order. Empty if this entry is not a directory.
        """
        if not self.is_dir:
            return {}
        dir_path = self.path
        dir_fs_path = self.fs_path
        children = {}
        for name in sorted(os.listdir(dir_fs_path), key=os.path.basename):
            # Children of "." are named without a "./" prefix.
            child_path = os.path.join(dir_path, name) if dir_path != "." else name
            child_fs_path = (
                os.path.join(dir_fs_path, name) if dir_fs_path != "." else name
            )
            children[name] = new_child(
                child_path,
                child_fs_path,
                self,
            )
        return children

1852 

1853 

1854class FSROOverlay(FSOverlayBase["FSROOverlay"]): 

1855 __slots__ = ( 

1856 "_stat_cache", 

1857 "_readlink_cache", 

1858 "_children", 

1859 "_stat_failed_cache", 

1860 ) 

1861 

def __init__(
    self,
    path: str,
    fs_path: str,
    parent: Optional["FSROOverlay"],
) -> None:
    """
    :param path: Passed on to the parent class constructor.
    :param fs_path: Passed on to the parent class constructor.
    :param parent: The parent entry, or None; passed on to the parent class
      constructor.
    """
    super().__init__(path, fs_path, parent=parent)
    # All caches start out empty; nothing is looked up at construction time.
    self._children: Optional[Mapping[str, FSROOverlay]] = None
    self._stat_cache: Optional[os.stat_result] = None
    self._stat_failed_cache = False
    self._readlink_cache: Optional[str] = None

1873 

1874 @classmethod 

1875 def create_root_dir(cls, path: str, fs_path: str) -> "FSROOverlay": 

1876 return FSROOverlay(path, fs_path, None) 

1877 

1878 @property 

1879 def iterdir(self) -> Iterable["FSROOverlay"]: 

1880 if not self.is_dir: 

1881 return 

1882 if self._children is None: 

1883 self._ensure_children_are_resolved() 

1884 yield from assume_not_none(self._children).values() 

1885 

1886 def lookup(self, path: str) -> Optional["FSROOverlay"]: 

1887 if not self.is_dir: 

1888 return None 

1889 if self._children is None: 

1890 self._ensure_children_are_resolved() 

1891 

1892 absolute, _, path_parts = _split_path(path) 

1893 current = cast("FSROOverlay", _root(self)) if absolute else self 

1894 for no, dir_part in enumerate(path_parts): 

1895 if dir_part == ".": 

1896 continue 

1897 if dir_part == "..": 

1898 p = current.parent_dir 

1899 if p is None: 

1900 raise ValueError(f'The path "{path}" escapes the root dir') 

1901 current = cast("FSROOverlay", p) 

1902 continue 

1903 try: 

1904 current = current[dir_part] 

1905 except KeyError: 

1906 return None 

1907 return current 

1908 

1909 def _ensure_children_are_resolved(self) -> None: 

1910 if not self.is_dir or self._children: 

1911 return 

1912 self._children = self._resolve_children( 

1913 lambda n, fsp, p: FSROOverlay(n, fsp, p) 

1914 ) 

1915 

1916 @property 

1917 def is_detached(self) -> bool: 

1918 return False 

1919 

1920 def __getitem__(self, key) -> "VirtualPath": 

1921 if not self.is_dir: 1921 ↛ 1923line 1921 didn't jump to line 1923 because the condition on line 1921 was always true

1922 raise KeyError(key) 

1923 if self._children is None: 

1924 self._ensure_children_are_resolved() 

1925 if isinstance(key, FSPath): 

1926 key = key.name 

1927 return self._children[key] 

1928 

1929 def __delitem__(self, key) -> None: 

1930 self._error_ro_fs() 

1931 

1932 @property 

1933 def is_read_write(self) -> bool: 

1934 return False 

1935 

1936 def _rw_check(self) -> None: 

1937 self._error_ro_fs() 

1938 

1939 def _error_ro_fs(self) -> NoReturn: 

1940 raise DebputyFSIsROError( 

1941 f'Attempt to write to "{self.path}" failed:' 

1942 " Debputy Virtual File system is R/O." 

1943 ) 

1944 

1945 def stat(self) -> os.stat_result: 

1946 if self._stat_failed_cache: 1946 ↛ 1947line 1946 didn't jump to line 1947 because the condition on line 1946 was never true

1947 raise FileNotFoundError( 

1948 errno.ENOENT, os.strerror(errno.ENOENT), self.fs_path 

1949 ) 

1950 

1951 if self._stat_cache is None: 1951 ↛ 1957line 1951 didn't jump to line 1957 because the condition on line 1951 was always true

1952 try: 

1953 self._stat_cache = os.lstat(self.fs_path) 

1954 except FileNotFoundError: 

1955 self._stat_failed_cache = True 

1956 raise 

1957 return self._stat_cache 

1958 

1959 @property 

1960 def mode(self) -> int: 

1961 return stat.S_IMODE(self.stat().st_mode) 

1962 

1963 @mode.setter 

1964 def mode(self, _unused: int) -> None: 

1965 self._error_ro_fs() 

1966 

1967 @property 

1968 def mtime(self) -> float: 

1969 return self.stat().st_mtime 

1970 

1971 @mtime.setter 

1972 def mtime(self, new_mtime: float) -> None: 

1973 self._error_ro_fs() 

1974 

1975 def readlink(self) -> str: 

1976 if not self.is_symlink: 

1977 raise TypeError(f"readlink is only valid for symlinks ({self.path!r})") 

1978 if self._readlink_cache is None: 

1979 self._readlink_cache = os.readlink(self.fs_path) 

1980 return self._readlink_cache 

1981 

1982 def chown( 

1983 self, 

1984 owner: Optional[StaticFileSystemOwner], 

1985 group: Optional[StaticFileSystemGroup], 

1986 ) -> None: 

1987 self._error_ro_fs() 

1988 

1989 def mkdir(self, name: str) -> "VirtualPath": 

1990 self._error_ro_fs() 

1991 

1992 def add_file( 

1993 self, 

1994 name: str, 

1995 *, 

1996 unlink_if_exists: bool = True, 

1997 use_fs_path_mode: bool = False, 

1998 mode: int = 0o0644, 

1999 mtime: Optional[float] = None, 

2000 ) -> ContextManager["VirtualPath"]: 

2001 self._error_ro_fs() 

2002 

2003 def add_symlink(self, link_name: str, link_target: str) -> "VirtualPath": 

2004 self._error_ro_fs() 

2005 

2006 def unlink(self, *, recursive: bool = False) -> None: 

2007 self._error_ro_fs() 

2008 

2009 

class FSROOverlayRootDir(FSROOverlay):
    """Parentless ``FSROOverlay`` that carries its own plugin context."""

    __slots__ = ("_plugin_context",)

    def __init__(self, path: str, fs_path: str) -> None:
        """Create the root node for *path* backed by *fs_path*.

        The plugin context is created with ``"debputy"`` (presumably the
        initial plugin name; see ``CurrentPluginContextManager``).
        """
        super().__init__(path, fs_path, None)
        self._plugin_context = CurrentPluginContextManager("debputy")

    def _current_plugin(self) -> str:
        """Return the plugin name currently recorded in the plugin context."""
        context = self._plugin_context
        return context.current_plugin_name

    @contextlib.contextmanager
    def change_plugin_context(self, new_plugin: str) -> Iterator[str]:
        """Delegate to the plugin context's ``change_plugin_context``.

        Whatever the underlying context manager provides is yielded unchanged.
        """
        switcher = self._plugin_context.change_plugin_context(new_plugin)
        with switcher as provided:
            yield provided

2024 

2025 

class FSControlPath(FSOverlayBase["FSControlPath"]):
    """Read-write path that operates directly on the real file system.

    Judging by its error messages, this models the content of a
    ``control.tar``: creating subdirectories or symlinks and changing
    ownership are all rejected. Directory listings are not cached; children
    are re-read from ``fs_path`` on every access.
    """

    @property
    def iterdir(self) -> Iterable["FSControlPath"]:
        """Yield the children of this directory (nothing for non-directories)."""
        if not self.is_dir:
            return
        yield from self._resolve_children(
            lambda n, fsp, p: FSControlPath(n, fsp, p)
        ).values()

    def lookup(self, path: str) -> Optional["FSControlPath"]:
        """Resolve *path* relative to this directory (or the root if absolute).

        :param path: The path to look up; ``.`` and ``..`` segments are
          supported.
        :return: The matching node, or ``None`` if this is not a directory
          or a segment does not exist.
        :raises ValueError: If a ``..`` segment would leave the root directory.
        """
        if not self.is_dir:
            return None

        absolute, _, path_parts = _split_path(path)
        current = cast("FSControlPath", _root(self)) if absolute else self
        for dir_part in path_parts:
            if dir_part == ".":
                continue
            if dir_part == "..":
                p = current.parent_dir
                if p is None:
                    raise ValueError(f'The path "{path}" escapes the root dir')
                current = cast("FSControlPath", p)
                continue
            try:
                current = current[dir_part]
            except KeyError:
                return None
        return current

    @property
    def is_detached(self) -> bool:
        """``True`` when ``stat()`` reports that the path no longer exists."""
        try:
            self.stat()
        except FileNotFoundError:
            return True
        else:
            return False

    def __getitem__(self, key) -> "VirtualPath":
        """Return the child named *key* (an ``FSPath`` is matched by its name).

        :raises KeyError: If this is not a directory or there is no such child.
        """
        if not self.is_dir:
            raise KeyError(key)
        children = self._resolve_children(lambda n, fsp, p: FSControlPath(n, fsp, p))
        if isinstance(key, FSPath):
            key = key.name
        return children[key]

    def __delitem__(self, key) -> None:
        """Unlink the child named *key*."""
        self[key].unlink()

    @property
    def is_read_write(self) -> bool:
        """Always ``True``: these paths can be modified."""
        return True

    @property
    def mode(self) -> int:
        """The permission bits (``stat.S_IMODE``) of the path."""
        return stat.S_IMODE(self.stat().st_mode)

    @mode.setter
    def mode(self, new_mode: int) -> None:
        os.chmod(self.fs_path, new_mode)

    @property
    def mtime(self) -> float:
        """The modification time (``st_mtime``) of the path."""
        return self.stat().st_mtime

    @mtime.setter
    def mtime(self, new_mtime: float) -> None:
        # Access and modification time are both set to the new value.
        os.utime(self.fs_path, (new_mtime, new_mtime))

    def readlink(self) -> str:
        """Reject the call: symlinks are not expected among these paths.

        :raises TypeError: If the path is not a symlink.
        :raises AssertionError: If the path unexpectedly is a symlink.
        """
        if not self.is_symlink:
            raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
        # Raised explicitly instead of `assert False`: assert statements are
        # removed under `python -O`, which would make this method fall through
        # and return `None` despite the `-> str` annotation.
        raise AssertionError("The control.tar never contains symlinks.")

    def chown(
        self,
        owner: Optional[StaticFileSystemOwner],
        group: Optional[StaticFileSystemGroup],
    ) -> None:
        """Not supported; always raises ``ValueError``."""
        raise ValueError(
            "No need to chown paths in the control.tar: They are always root:root"
        )

    def mkdir(self, name: str) -> "VirtualPath":
        """Not supported; always raises ``TypeError``."""
        raise TypeError("The control.tar never contains subdirectories.")

    @contextlib.contextmanager
    def add_file(
        self,
        name: str,
        *,
        unlink_if_exists: bool = True,
        use_fs_path_mode: bool = False,
        mode: int = 0o0644,
        mtime: Optional[float] = None,
    ) -> ContextManager["VirtualPath"]:
        """Create (or truncate) the file *name* in this directory.

        The file exists, empty and with *mode* applied, before the new path
        is yielded. When the ``with`` block ends normally, the path is checked
        to still be a file and *mode* is applied once more.

        :param name: Basename of the file; must not contain ``/`` or be
          ``.``/``..``.
        :param unlink_if_exists: When ``False``, an existing entry called
          *name* is an error instead of being truncated.
        :param use_fs_path_mode: Accepted but not used by this implementation.
        :param mode: The mode of the new file.
        :param mtime: Accepted but not used by this implementation.
        :raises ValueError: If *name* is invalid, or the entry exists and
          *unlink_if_exists* is ``False``.
        :raises TypeError: If this path is not a directory.
        """
        if "/" in name or name in {".", ".."}:
            raise ValueError(f'Invalid file name: "{name}"')
        if not self.is_dir:
            raise TypeError(
                f"Cannot create {self._orphan_safe_path()}/{name}:"
                f" {self._orphan_safe_path()} is not a directory"
            )
        self._rw_check()
        existing = self.get(name)
        if existing is not None:
            if not unlink_if_exists:
                raise ValueError(
                    f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
                    f" and exist_ok was False"
                )
            assert existing.is_file

        fs_path = os.path.join(self.fs_path, name)
        # This truncates the existing file if any, so we do not have to unlink the previous entry.
        with open(fs_path, "wb") as fd:
            # Ensure that the fs_path exists and default mode is reasonable
            os.chmod(fd.fileno(), mode)
        child = FSControlPath(
            name,
            fs_path,
            self,
        )
        yield child
        _check_fs_path_is_file(fs_path, unlink_on_error=child)
        child.mode = mode

    @contextlib.contextmanager
    def replace_fs_path_content(
        self,
        *,
        use_fs_path_mode: bool = False,
    ) -> ContextManager[str]:
        """Yield ``fs_path`` so the caller can rewrite the file in place.

        When the ``with`` block ends normally, the path is checked to still
        be a file.

        :param use_fs_path_mode: When ``True``, the mode the file had on entry
          is re-applied afterwards.
        :raises TypeError: If this path is not a file.
        """
        if not self.is_file:
            raise TypeError(
                f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file'
            )
        restore_mode = self.mode if use_fs_path_mode else None
        yield self.fs_path
        _check_fs_path_is_file(self.fs_path, self)
        if restore_mode is not None:
            self.mode = restore_mode

    def add_symlink(self, link_name: str, link_target: str) -> "VirtualPath":
        """Not supported; always raises ``TypeError``."""
        raise TypeError("The control.tar never contains symlinks.")

    def unlink(self, *, recursive: bool = False) -> None:
        """Remove this file from the file system; a no-op for a parentless path.

        :param recursive: Has no effect here.
        """
        if self._parent is None:
            return
        # By virtue of the control FS only containing paths, we can assume `recursive` never
        # matters and that `os.unlink` will be sufficient.
        assert self.is_file
        os.unlink(self.fs_path)

2181 

2182 

class FSControlRootDir(FSControlPath):
    """The parentless top-level ``FSControlPath``."""

    @classmethod
    def create_root_dir(cls, fs_path: str) -> "FSControlRootDir":
        """Create the root node: path ``.``, no parent, backed by *fs_path*."""
        root = FSControlRootDir(".", fs_path, None)
        return root

2188 

2189 

def as_path_def(pd: Union[str, PathDef]) -> PathDef:
    """Coerce *pd* into a ``PathDef``.

    A string is wrapped as ``PathDef(pd)``; anything else is returned as-is.
    """
    if isinstance(pd, str):
        return PathDef(pd)
    return pd

2192 

2193 

def as_path_defs(paths: Iterable[Union[str, PathDef]]) -> Iterable[PathDef]:
    """Lazily convert every item of *paths* with ``as_path_def``."""
    for entry in paths:
        yield as_path_def(entry)

2196 

2197 

def build_virtual_fs(
    paths: Iterable[Union[str, PathDef]],
    read_write_fs: bool = False,
) -> "FSPath":
    """Build a virtual file system tree from a list of path definitions.

    Each definition is added below a single ``FSRootDir``; parent directories
    that were not defined explicitly are created on demand. A path name
    ending with ``/`` denotes a directory. The names ``.``, ``./`` and ``/``
    all refer to the root directory itself.

    :param paths: Path names or ``PathDef`` instances describing the content.
    :param read_write_fs: Assigned to ``is_read_write`` of the root directory.
    :return: The root directory of the new file system.
    :raises ValueError: If a path is defined twice, or a path needs a
      directory where a non-directory was already defined.
    """
    # Every spelling of "the root directory itself".
    root_aliases = (".", "./", "/")
    root_dir: Optional[FSRootDir] = None
    directories: Dict[str, FSPath] = {}
    non_directories = set()

    def _ensure_parent_dirs(p: str) -> None:
        # Walk upwards until a known directory is found, then create the
        # missing directories from the top down.
        current = p.rstrip("/")
        missing_dirs = []
        while True:
            current = os.path.dirname(current)
            if current in directories:
                break
            if current in non_directories:
                raise ValueError(
                    f'Conflicting definition for "{current}". The path "{p}" wants it as a directory,'
                    ' but it is defined as a non-directory. (Ensure dirs end with "/")'
                )
            missing_dirs.append(current)
        for dir_path in reversed(missing_dirs):
            parent_dir = directories[os.path.dirname(dir_path)]
            d = VirtualTestPath(os.path.basename(dir_path), parent_dir, is_dir=True)
            directories[dir_path] = d

    for path_def in as_path_defs(paths):
        path = path_def.path_name
        if path in directories or path in non_directories:
            raise ValueError(
                f'Duplicate definition of "{path}". Can be false positive if input is not in'
                ' "correct order" (ensure directories occur before their children)'
            )
        if root_dir is None:
            # The root is created by the first definition; it only gets an
            # fs_path when that first definition is for the root itself.
            root_fs_path = None
            if path in root_aliases:
                root_fs_path = path_def.fs_path
            root_dir = FSRootDir(fs_path=root_fs_path)
            directories["."] = root_dir

        if path not in root_aliases and not path.startswith("./"):
            path = "./" + path
        if path not in root_aliases:
            _ensure_parent_dirs(path)
        if path in root_aliases:
            # The root already exists at this point. "/" must be skipped here
            # as well: otherwise it would be stripped down to "" below and the
            # parent lookup would fail with a KeyError.
            assert "." in directories
            continue
        is_dir = False
        if path.endswith("/"):
            path = path[:-1]
            is_dir = True
        directory = directories[os.path.dirname(path)]
        assert not is_dir or not bool(
            path_def.link_target
        ), f"is_dir={is_dir} vs. link_target={path_def.link_target}"
        fs_path = VirtualTestPath(
            os.path.basename(path),
            directory,
            is_dir=is_dir,
            mode=path_def.mode,
            mtime=path_def.mtime,
            has_fs_path=path_def.has_fs_path,
            fs_path=path_def.fs_path,
            link_target=path_def.link_target,
            content=path_def.content,
            materialized_content=path_def.materialized_content,
        )
        assert not fs_path.is_detached
        if fs_path.is_dir:
            directories[fs_path.path] = fs_path
        else:
            non_directories.add(fs_path.path)

    if root_dir is None:
        root_dir = FSRootDir()

    root_dir.is_read_write = read_write_fs
    return root_dir