Coverage for src/debputy/filesystem_scan.py: 70%

1276 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-01-27 13:59 +0000

1import atexit 

2import contextlib 

3import dataclasses 

4import errno 

5import io 

6import operator 

7import os 

8import stat 

9import subprocess 

10import tempfile 

11import time 

12from abc import ABC 

13from contextlib import suppress 

14from typing import ( 

15 List, 

16 Iterable, 

17 Dict, 

18 Optional, 

19 Tuple, 

20 Union, 

21 Iterator, 

22 Mapping, 

23 cast, 

24 Any, 

25 ContextManager, 

26 TextIO, 

27 BinaryIO, 

28 NoReturn, 

29 Type, 

30 Generic, 

31 Callable, 

32 TypeVar, 

33 overload, 

34 Literal, 

35) 

36from weakref import ref, ReferenceType 

37 

38from debputy.exceptions import ( 

39 PureVirtualPathError, 

40 DebputyFSIsROError, 

41 DebputyMetadataAccessError, 

42 TestPathWithNonExistentFSPathError, 

43 SymlinkLoopError, 

44) 

45from debputy.intermediate_manifest import PathType 

46from debputy.manifest_parser.base_types import ( 

47 ROOT_DEFINITION, 

48 StaticFileSystemOwner, 

49 StaticFileSystemGroup, 

50) 

51from debputy.plugin.api.spec import ( 

52 VirtualPath, 

53 PathDef, 

54 PathMetadataReference, 

55 PMT, 

56) 

57from debputy.types import VP 

58from debputy.util import ( 

59 generated_content_dir, 

60 _error, 

61 escape_shell, 

62 assume_not_none, 

63 _normalize_path, 

64 _debug_log, 

65) 

66 

# Sort key that orders path objects by their `name` attribute (the basename).
BY_BASENAME = operator.attrgetter("name")

# Type variables bound to "FSOverlayBase" (not defined in this part of the file).
FSP = TypeVar("FSP", bound="FSOverlayBase", covariant=True)
FSC = TypeVar("FSC", bound="FSOverlayBase", covariant=True)


# Mode strings that open a file in binary mode.
BinaryOpenMode = Literal[
    "rb",
    "r+b",
    "wb",
    "w+b",
    "xb",
    "ab",
]
# Mode strings that open a file in text mode (with or without an explicit "t").
TextOpenMode = Literal[
    "r",
    "r+",
    "rt",
    "r+t",
    "w",
    "w+",
    "wt",
    "w+t",
    "x",
    "xt",
    "a",
    "at",
]
# Any mode string accepted by `open_child`.
OpenMode = Union[BinaryOpenMode, TextOpenMode]

96 

97 

class AlwaysEmptyReadOnlyMetadataReference(PathMetadataReference[PMT]):
    """Metadata reference that never holds a value and rejects every write.

    Reading yields `None` for the owning plugin and raises for any other
    plugin. Writing always raises; which exception depends on whether the
    current plugin owns the metadata.
    """

    __slots__ = ("_metadata_type", "_owning_plugin", "_current_plugin")

    def __init__(
        self,
        owning_plugin: str,
        current_plugin: str,
        metadata_type: Type[PMT],
    ) -> None:
        self._metadata_type = metadata_type
        self._owning_plugin = owning_plugin
        self._current_plugin = current_plugin

    @property
    def _is_owner(self) -> bool:
        """Whether the current plugin is the plugin owning the metadata."""
        return self._owning_plugin == self._current_plugin

    @property
    def is_present(self) -> bool:
        """Always False: this reference never has a value."""
        return False

    @property
    def can_read(self) -> bool:
        """Only the owning plugin may read the (empty) value."""
        return self._owning_plugin == self._current_plugin

    @property
    def can_write(self) -> bool:
        """Always False: this reference is read-only."""
        return False

    @property
    def value(self) -> Optional[PMT]:
        """Return None for a permitted reader.

        :raises DebputyMetadataAccessError: If the current plugin cannot read the metadata.
        """
        if not self.can_read:
            type_name = self._metadata_type.__name__
            raise DebputyMetadataAccessError(
                f"Cannot read the metadata {type_name} owned by"
                f" {self._owning_plugin} as the metadata has not been made"
                f" readable to the plugin {self._current_plugin}."
            )
        return None

    @value.setter
    def value(self, new_value: PMT) -> None:
        """Reject the write.

        :raises DebputyFSIsROError: If the current plugin owns the metadata.
        :raises DebputyMetadataAccessError: If the current plugin is not the owner.
        """
        type_name = self._metadata_type.__name__
        if not self._is_owner:
            raise DebputyMetadataAccessError(
                f"Cannot set the metadata {type_name} owned by"
                f" {self._owning_plugin} as the metadata has not been made"
                f" read-write to the plugin {self._current_plugin}."
            )
        raise DebputyFSIsROError(
            f"Cannot set the metadata {type_name} as the path is read-only"
        )

148 

149 

@dataclasses.dataclass(slots=True)
class PathMetadataValue(Generic[PMT]):
    """Storage cell for one piece of plugin-owned metadata on a path."""

    # Name of the plugin that owns this metadata.
    owning_plugin: str
    # The type of the stored metadata.
    metadata_type: Type[PMT]
    # The stored metadata; None when nothing has been set.
    value: Optional[PMT] = None

    def can_read_value(self, current_plugin: str) -> bool:
        """Return True when `current_plugin` is the owning plugin."""
        return current_plugin == self.owning_plugin

    def can_write_value(self, current_plugin: str) -> bool:
        """Return True when `current_plugin` is the owning plugin."""
        return current_plugin == self.owning_plugin

161 

162 

class PathMetadataReferenceImplementation(PathMetadataReference[PMT]):
    """Metadata reference backed by a `PathMetadataValue` attached to a path.

    Read and write permission are delegated to the backing value; writing
    additionally requires the owning path to be read-write and attached.
    """

    __slots__ = ("_owning_path", "_current_plugin", "_path_metadata_value")

    def __init__(
        self,
        owning_path: VirtualPath,
        current_plugin: str,
        path_metadata_value: PathMetadataValue[PMT],
    ) -> None:
        self._owning_path = owning_path
        self._current_plugin = current_plugin
        self._path_metadata_value = path_metadata_value

    @property
    def is_present(self) -> bool:
        """Whether a value is set and readable by the current plugin."""
        if not self.can_read:
            return False
        return self._path_metadata_value.value is not None

    @property
    def can_read(self) -> bool:
        """Whether the current plugin may read the value."""
        return self._path_metadata_value.can_read_value(self._current_plugin)

    @property
    def can_write(self) -> bool:
        """Whether a write would succeed.

        Requires plugin write access, a read-write owning path and that the
        owning path is not detached.
        """
        if not self._path_metadata_value.can_write_value(self._current_plugin):
            return False
        owning_path = self._owning_path
        return owning_path.is_read_write and not owning_path.is_detached

    @property
    def value(self) -> Optional[PMT]:
        """Return the stored value (possibly None).

        :raises DebputyMetadataAccessError: If the current plugin cannot read the metadata.
        """
        if self.can_read:
            return self._path_metadata_value.value
        raise DebputyMetadataAccessError(
            f"Cannot read the metadata {self._metadata_type_name} owned by"
            f" {self._owning_plugin} as the metadata has not been made"
            f" readable to the plugin {self._current_plugin}."
        )

    @value.setter
    def value(self, new_value: PMT) -> None:
        """Store `new_value` (None deletes the value).

        :raises DebputyMetadataAccessError: If the current plugin lacks write access.
        :raises DebputyFSIsROError: If the owning path is read-only.
        :raises TypeError: If the owning path is detached.
        """
        # Check only the plugin permission here. `can_write` also covers the
        # read-only and detached cases, which would make the two more specific
        # errors below unreachable and blame the plugin for a read-only path.
        if not self._path_metadata_value.can_write_value(self._current_plugin):
            m = "set" if new_value is not None else "delete"
            raise DebputyMetadataAccessError(
                f"Cannot {m} the metadata {self._metadata_type_name} owned by"
                f" {self._owning_plugin} as the metadata has not been made"
                f" read-write to the plugin {self._current_plugin}."
            )
        owning_path = self._owning_path
        if not owning_path.is_read_write:
            raise DebputyFSIsROError(
                f"Cannot set the metadata {self._metadata_type_name} as the path is read-only"
            )
        if owning_path.is_detached:
            raise TypeError(
                f"Cannot set the metadata {self._metadata_type_name} as the path is detached"
            )
        self._path_metadata_value.value = new_value

    @property
    def _is_owner(self) -> bool:
        """Whether the current plugin is the plugin owning the metadata."""
        return self._owning_plugin == self._current_plugin

    @property
    def _owning_plugin(self) -> str:
        """Name of the plugin owning the backing value."""
        return self._path_metadata_value.owning_plugin

    @property
    def _metadata_type_name(self) -> str:
        """Class name of the metadata type, used in error messages."""
        return self._path_metadata_value.metadata_type.__name__

234 

235 

236def _cp_a(source: str, dest: str) -> None: 

237 cmd = ["cp", "-a", source, dest] 

238 try: 

239 subprocess.check_call(cmd) 

240 except subprocess.CalledProcessError: 

241 full_command = escape_shell(*cmd) 

242 _error( 

243 f"The attempt to make an internal copy of {escape_shell(source)} failed. Please review the output of cp" 

244 f" above to understand what went wrong. The full command was: {full_command}" 

245 ) 

246 

247 

248def _split_path(path: str) -> Tuple[bool, bool, List[str]]: 

249 must_be_dir = True if path.endswith("/") else False 

250 absolute = False 

251 if path.startswith("/"): 

252 absolute = True 

253 path = "." + path 

254 path_parts = path.rstrip("/").split("/") 

255 if must_be_dir: 

256 path_parts.append(".") 

257 return absolute, must_be_dir, path_parts 

258 

259 

def _root(path: VP) -> VP:
    """Follow `parent_dir` upwards from `path` and return the topmost path.

    The topmost path is the first one whose `parent_dir` is None.
    """
    node = path
    while (parent := node.parent_dir) is not None:
        node = parent
    return node

267 

268 

269def _check_fs_path_is_file( 

270 fs_path: str, 

271 unlink_on_error: Optional["VirtualPath"] = None, 

272) -> None: 

273 had_issue = False 

274 try: 

275 # FIXME: Check mode, and use the Virtual Path to cache the result as a side-effect 

276 st = os.lstat(fs_path) 

277 except FileNotFoundError: 

278 had_issue = True 

279 else: 

280 if not stat.S_ISREG(st.st_mode) or st.st_nlink > 1: 280 ↛ 281line 280 didn't jump to line 281 because the condition on line 280 was never true

281 had_issue = True 

282 if not had_issue: 282 ↛ 285line 282 didn't jump to line 285 because the condition on line 282 was always true

283 return 

284 

285 if unlink_on_error: 

286 with suppress(FileNotFoundError): 

287 os.unlink(fs_path) 

288 raise TypeError( 

289 "The provided FS backing file was deleted, replaced with a non-file entry or it was hard" 

290 " linked to another file. The entry has been disconnected." 

291 ) 

292 

293 

class CurrentPluginContextManager:
    """Track which plugin is "current" as a stack of plugin names."""

    __slots__ = ("_plugin_names",)

    def __init__(self, initial_plugin_name: str) -> None:
        self._plugin_names = [initial_plugin_name]

    @property
    def current_plugin_name(self) -> str:
        """The most recently entered plugin name."""
        return self._plugin_names[-1]

    @contextlib.contextmanager
    def change_plugin_context(self, new_plugin_name: str) -> Iterator[str]:
        """Make `new_plugin_name` the current plugin for the duration of the `with` block.

        :param new_plugin_name: Name of the plugin to switch to.
        :return: Context manager yielding `new_plugin_name`.
        """
        self._plugin_names.append(new_plugin_name)
        try:
            yield new_plugin_name
        finally:
            # Restore the previous plugin even when the body raises; otherwise
            # the stack would keep reporting the wrong plugin afterwards.
            self._plugin_names.pop()

309 

310 

class VirtualPathBase(VirtualPath, ABC):
    """Shared behaviour for the virtual path implementations in this module.

    Adds path lookup (with symlink traversal and loop detection), recursive
    directory creation, pruning of empty directories and `open_child`.
    """

    __slots__ = ()

    def _orphan_safe_path(self) -> str:
        """Return a path string for use in messages."""
        return self.path

    def _rw_check(self) -> None:
        """Raise DebputyFSIsROError unless this path is read-write."""
        if not self.is_read_write:
            raise DebputyFSIsROError(
                f'Attempt to write to "{self._orphan_safe_path()}" failed:'
                " Debputy Virtual File system is R/O."
            )

    def lookup(self, path: str) -> Optional["VirtualPathBase"]:
        """Resolve `path` and return the match, or None if any part is missing."""
        match, missing = self.attempt_lookup(path)
        if missing:
            return None
        return match

    def attempt_lookup(self, path: str) -> Tuple["VirtualPathBase", List[str]]:
        """Resolve as much of `path` as exists.

        :param path: Path to resolve; absolute paths are resolved from the
          root, others from this path.
        :return: The deepest path that was found and the list of segments
          that could not be resolved (empty on a full match).
        :raises ValueError: If this path is detached or `path` escapes the root.
        :raises SymlinkLoopError: If the same symlink is traversed twice.
        """
        if self.is_detached:
            raise ValueError(
                f'Cannot perform lookup via "{self._orphan_safe_path()}": The path is detached'
            )
        absolute, must_be_dir, path_parts = _split_path(path)
        current = _root(self) if absolute else self
        # Reversed so the next segment to resolve is popped from the end.
        path_parts.reverse()
        link_expansions = set()
        while path_parts:
            dir_part = path_parts.pop()
            if dir_part == ".":
                continue
            if dir_part == "..":
                p = current.parent_dir
                if p is None:
                    raise ValueError(f'The path "{path}" escapes the root dir')
                current = p
                continue
            try:
                current = current[dir_part]
            except KeyError:
                # Restore the unresolved segments in their original order.
                path_parts.append(dir_part)
                path_parts.reverse()
                if must_be_dir:
                    # Drop the trailing "." that _split_path added.
                    path_parts.pop()
                return current, path_parts
            if current.is_symlink and path_parts:
                if current.path in link_expansions:
                    # This is our loop detection for now. It might have some false positives where you
                    # could safely resolve the same symlink twice. However, given that this use-case is
                    # basically non-existent in practice for packaging, we just stop here for now.
                    raise SymlinkLoopError(
                        f'The path "{path}" traversed the symlink "{current.path}" multiple'
                        " times. Currently, traversing the same symlink twice is considered"
                        " a loop by `debputy` even if the path would eventually resolve."
                        " Consider filing a feature request if you have a benign case that"
                        " triggers this error."
                    )
                link_expansions.add(current.path)
                link_target = current.readlink()
                link_absolute, _, link_path_parts = _split_path(link_target)
                if link_absolute:
                    current = _root(current)
                else:
                    current = assume_not_none(current.parent_dir)
                link_path_parts.reverse()
                path_parts.extend(link_path_parts)
        return current, []

    def mkdirs(self, path: str) -> "VirtualPath":
        """Ensure `path` exists as a directory, creating missing segments.

        :param path: The directory path to create.
        :return: The directory at `path`.
        :raises ValueError: If an existing segment of `path` is not a directory.
        """
        current: VirtualPath
        current, missing_parts = self.attempt_lookup(
            f"{path}/" if not path.endswith("/") else path
        )
        if not current.is_dir:
            raise ValueError(
                f'mkdirs of "{path}" failed: This would require {current.path} to not exist OR be'
                " a directory. However, that path exist AND is a not directory."
            )
        for missing_part in missing_parts:
            assert missing_part not in (".", "..")
            current = current.mkdir(missing_part)
        return current

    def prune_if_empty_dir(self) -> None:
        """Remove this and all (now) empty parent directories

        Same as: `rmdir --ignore-fail-on-non-empty --parents`

        This operation may cause the path (and any of its parent directories) to become "detached"
        and therefore unsafe to use in further operations.
        """
        self._rw_check()

        if not self.is_dir:
            raise TypeError(f"{self._orphan_safe_path()} is not a directory")
        if any(self.iterdir):
            return
        parent_dir = assume_not_none(self.parent_dir)

        # Recursive does not matter; we already know the directory is empty.
        self.unlink()

        # Note: The root dir must never be deleted. This works because when delegating it to the root
        # directory, its implementation of this method is a no-op. If this is later rewritten to an
        # inline loop (rather than recursion), be sure to preserve this feature.
        parent_dir.prune_if_empty_dir()

    def _current_plugin(self) -> str:
        """Return the current plugin name as reported by the root directory.

        :raises TypeError: If this path is detached.
        """
        if self.is_detached:
            raise TypeError("Cannot resolve the current plugin; path is detached")
        return cast("FSRootDir", _root(self))._current_plugin()

    @overload
    def open_child(
        self,
        name: str,
        mode: TextOpenMode = ...,
        buffering: int = -1,
    ) -> TextIO: ...

    @overload
    def open_child(
        self,
        name: str,
        mode: BinaryOpenMode = ...,
        buffering: int = -1,
    ) -> BinaryIO: ...

    @contextlib.contextmanager
    def open_child(
        self,
        name: str,
        mode: OpenMode = "r",
        buffering: int = -1,
    ) -> Iterator[Union[TextIO, BinaryIO]]:
        """Open a child path of the current directory in a given mode. Must be used as a context manager

        The path is opened according to the `mode` parameter similar to the built-in `open` in Python.
        The following symbols are accepted with the same meaning as Python's open:
         * `r`
         * `w`
         * `x`
         * `a`
         * `+`
         * `b`
         * `t`

        Like Python's `open`, this can create a new file provided the file system is in read-write mode.
        Though unlike Python's open, symlinks are not followed and cannot be opened. Any newly created
        file will start with (os.stat) mode of 0o0644. The (os.stat) mode of existing paths are left
        as-is.

        :param name: The name of the child path to open. Must be a basename.
        :param mode: The mode to open the file with such as `r` or `w`. See Python's `open` for more
          examples.
        :param buffering: Same as open(..., buffering=...) where supported. Notably during
          testing, the content may be purely in memory and use a BytesIO/StringIO
          (which does not accept that parameter, but then it is buffered in a different way)
        :return: The file handle.
        :raises ValueError: If the mode has `r` and the child does not exist, or the mode has `x`
          and the child already exists.
        """
        existing = self.get(name)
        if "r" in mode:
            if existing is None:
                raise ValueError(
                    f"Path {self.path}/{name} does not exist and mode had `r`"
                )
            if "+" not in mode:
                with existing.open(byte_io="b" in mode, buffering=buffering) as fd:
                    yield fd
                # A contextmanager generator must yield exactly once. Without
                # this return, control falls through to the write path below
                # and yields a second time.
                return

        encoding = None if "b" in mode else "utf-8"

        if existing:
            if "x" in mode:
                raise ValueError(
                    f"Path {existing.path} already exists and mode had `x`"
                )
            with (
                existing.replace_fs_path_content() as fs_path,
                open(fs_path, mode, encoding=encoding) as fd,
            ):
                yield fd
        else:
            assert "r" not in mode
            # unlink_if_exists=False as a precaution (the "already exists" should not end up here).
            with (
                self.add_file(name, mode=0o0644, unlink_if_exists=False) as new_path,
                open(new_path.fs_path, mode, encoding=encoding) as fd,
            ):
                yield fd

509 

510 

511class FSPath(VirtualPathBase, ABC): 

512 __slots__ = ( 

513 "_basename", 

514 "_parent_dir", 

515 "_children", 

516 "_path_cache", 

517 "_parent_path_cache", 

518 "_last_known_parent_path", 

519 "_mode", 

520 "_owner", 

521 "_group", 

522 "_mtime", 

523 "_stat_cache", 

524 "_metadata", 

525 "__weakref__", 

526 ) 

527 

528 def __init__( 

529 self, 

530 basename: str, 

531 parent: Optional["FSPath"], 

532 children: Optional[Dict[str, "FSPath"]] = None, 

533 initial_mode: Optional[int] = None, 

534 mtime: Optional[float] = None, 

535 stat_cache: Optional[os.stat_result] = None, 

536 ) -> None: 

537 self._basename = basename 

538 self._path_cache: Optional[str] = None 

539 self._parent_path_cache: Optional[str] = None 

540 self._children = children 

541 self._last_known_parent_path: Optional[str] = None 

542 self._mode = initial_mode 

543 self._mtime = mtime 

544 self._stat_cache = stat_cache 

545 self._metadata: Dict[Tuple[str, Type[Any]], PathMetadataValue[Any]] = {} 

546 self._owner = ROOT_DEFINITION 

547 self._group = ROOT_DEFINITION 

548 

549 # The self._parent_dir = None is to create `_parent_dir` because the parent_dir setter calls 

550 # is_orphaned, which assumes self._parent_dir is an attribute. 

551 self._parent_dir: Optional[ReferenceType["FSPath"]] = None 

552 if parent is not None: 

553 self.parent_dir = parent 

554 

555 def __repr__(self) -> str: 

556 return ( 

557 f"{self.__class__.__name__}({self._orphan_safe_path()!r}," 

558 f" is_file={self.is_file}," 

559 f" is_dir={self.is_dir}," 

560 f" is_symlink={self.is_symlink}," 

561 f" has_fs_path={self.has_fs_path}," 

562 f" children_len={len(self._children) if self._children else 0})" 

563 ) 

564 

565 @property 

566 def name(self) -> str: 

567 return self._basename 

568 

569 @name.setter 

570 def name(self, new_name: str) -> None: 

571 self._rw_check() 

572 if new_name == self._basename: 572 ↛ 573line 572 didn't jump to line 573 because the condition on line 572 was never true

573 return 

574 if self.is_detached: 574 ↛ 575line 574 didn't jump to line 575 because the condition on line 574 was never true

575 self._basename = new_name 

576 return 

577 self._rw_check() 

578 parent = self.parent_dir 

579 # This little parent_dir dance ensures the parent dir detects the rename properly 

580 self.parent_dir = None 

581 self._basename = new_name 

582 self.parent_dir = parent 

583 

584 @property 

585 def iterdir(self) -> Iterable["FSPath"]: 

586 if self._children is not None: 

587 yield from self._children.values() 

588 

589 def all_paths(self) -> Iterable["FSPath"]: 

590 yield self 

591 if not self.is_dir: 

592 return 

593 by_basename = BY_BASENAME 

594 stack = sorted(self.iterdir, key=by_basename, reverse=True) 

595 while stack: 

596 current = stack.pop() 

597 yield current 

598 if current.is_dir and not current.is_detached: 

599 stack.extend(sorted(current.iterdir, key=by_basename, reverse=True)) 

600 

601 def walk(self) -> Iterable[Tuple["FSPath", List["FSPath"]]]: 

602 # FIXME: can this be more "os.walk"-like without making it harder to implement? 

603 if not self.is_dir: 603 ↛ 604line 603 didn't jump to line 604 because the condition on line 603 was never true

604 yield self, [] 

605 return 

606 by_basename = BY_BASENAME 

607 stack = [self] 

608 while stack: 

609 current = stack.pop() 

610 children = sorted(current.iterdir, key=by_basename) 

611 assert not children or current.is_dir 

612 yield current, children 

613 # Removing the directory counts as discarding the children. 

614 if not current.is_detached: 614 ↛ 608line 614 didn't jump to line 608 because the condition on line 614 was always true

615 stack.extend(reversed(children)) 

616 

617 def _orphan_safe_path(self) -> str: 

618 if not self.is_detached or self._last_known_parent_path is not None: 618 ↛ 620line 618 didn't jump to line 620 because the condition on line 618 was always true

619 return self.path 

620 return f"<orphaned>/{self.name}" 

621 

622 @property 

623 def is_detached(self) -> bool: 

624 parent = self._parent_dir 

625 if parent is None: 

626 return True 

627 resolved_parent = parent() 

628 if resolved_parent is None: 628 ↛ 629line 628 didn't jump to line 629 because the condition on line 628 was never true

629 return True 

630 return resolved_parent.is_detached 

631 

    # The __getitem__ behaves like __getitem__ from Dict but __iter__ would ideally work like a Sequence.
    # However, that does not feel compatible, so let's force people to use .children instead for the Sequence
    # behavior to avoid surprises for now.
    # (Maybe it is a non-issue, but it is easier to add the API later than to remove it once we have committed
    #  to using it)
    # Setting __iter__ to None makes iter() on an instance raise TypeError.
    __iter__ = None

638 

639 def __getitem__(self, key) -> "FSPath": 

640 if self._children is None: 

641 raise KeyError( 

642 f"{key} (note: {self._orphan_safe_path()!r} has no children)" 

643 ) 

644 if isinstance(key, FSPath): 644 ↛ 645line 644 didn't jump to line 645 because the condition on line 644 was never true

645 key = key.name 

646 return self._children[key] 

647 

648 def __delitem__(self, key) -> None: 

649 self._rw_check() 

650 children = self._children 

651 if children is None: 651 ↛ 652line 651 didn't jump to line 652 because the condition on line 651 was never true

652 raise KeyError(key) 

653 del children[key] 

654 

655 def get(self, key: str) -> "Optional[FSPath]": 

656 try: 

657 return self[key] 

658 except KeyError: 

659 return None 

660 

661 def __contains__(self, item: object) -> bool: 

662 if isinstance(item, VirtualPath): 662 ↛ 663line 662 didn't jump to line 663 because the condition on line 662 was never true

663 return item.parent_dir is self 

664 if not isinstance(item, str): 664 ↛ 665line 664 didn't jump to line 665 because the condition on line 664 was never true

665 return False 

666 m = self.get(item) 

667 return m is not None 

668 

669 def _add_child(self, child: "FSPath") -> None: 

670 self._rw_check() 

671 if not self.is_dir: 671 ↛ 672line 671 didn't jump to line 672 because the condition on line 671 was never true

672 raise TypeError(f"{self._orphan_safe_path()!r} is not a directory") 

673 if self._children is None: 

674 self._children = {} 

675 

676 conflict_child = self.get(child.name) 

677 if conflict_child is not None: 677 ↛ 678line 677 didn't jump to line 678 because the condition on line 677 was never true

678 conflict_child.unlink(recursive=True) 

679 self._children[child.name] = child 

680 

681 @property 

682 def tar_path(self) -> str: 

683 path = self.path 

684 if self.is_dir: 

685 return path + "/" 

686 return path 

687 

688 @property 

689 def path(self) -> str: 

690 parent_path = self.parent_dir_path 

691 if ( 

692 self._parent_path_cache is not None 

693 and self._parent_path_cache == parent_path 

694 ): 

695 return assume_not_none(self._path_cache) 

696 if parent_path is None: 696 ↛ 697line 696 didn't jump to line 697 because the condition on line 696 was never true

697 raise ReferenceError( 

698 f"The path {self.name} is detached! {self.__class__.__name__}" 

699 ) 

700 self._parent_path_cache = parent_path 

701 ret = os.path.join(parent_path, self.name) 

702 self._path_cache = ret 

703 return ret 

704 

    @property
    def parent_dir(self) -> Optional["FSPath"]:
        """The directory this path is attached to.

        :raises ReferenceError: If there is no parent reference or the weakly
          referenced parent is gone.
        """
        p_ref = self._parent_dir
        # The parent is stored as a weak reference; call it to resolve.
        p = p_ref() if p_ref is not None else None
        if p is None:
            raise ReferenceError(
                f"The path {self.name} is detached! {self.__class__.__name__}"
            )
        return p

    @parent_dir.setter
    def parent_dir(self, new_parent: Optional["FSPath"]) -> None:
        """Move this path to `new_parent`, or detach it when `new_parent` is None.

        :raises DebputyFSIsROError: If this path or `new_parent` is read-only.
        :raises ValueError: If `new_parent` is not a directory.
        """
        self._rw_check()
        if new_parent is not None:
            if not new_parent.is_dir:
                raise ValueError(
                    f"The parent {new_parent._orphan_safe_path()} must be a directory"
                )
            new_parent._rw_check()
        old_parent = None
        self._last_known_parent_path = None
        if not self.is_detached:
            # Remove this path from the children of the current parent.
            old_parent = self.parent_dir
            old_parent_children = assume_not_none(assume_not_none(old_parent)._children)
            del old_parent_children[self.name]
        if new_parent is not None:
            self._parent_dir = ref(new_parent)
            new_parent._add_child(self)
        else:
            # Remember where the path used to live so `path` keeps working
            # after the detach.
            if old_parent is not None and not old_parent.is_detached:
                self._last_known_parent_path = old_parent.path
            self._parent_dir = None
        # Force `path` to be recomputed on next access.
        self._parent_path_cache = None

738 

739 @property 

740 def parent_dir_path(self) -> Optional[str]: 

741 if self.is_detached: 741 ↛ 742line 741 didn't jump to line 742 because the condition on line 741 was never true

742 return self._last_known_parent_path 

743 return assume_not_none(self.parent_dir).path 

744 

745 def chown( 

746 self, 

747 owner: Optional[StaticFileSystemOwner], 

748 group: Optional[StaticFileSystemGroup], 

749 ) -> None: 

750 """Change the owner/group of this path 

751 

752 :param owner: The desired owner definition for this path. If None, then no change of owner is performed. 

753 :param group: The desired group definition for this path. If None, then no change of group is performed. 

754 """ 

755 self._rw_check() 

756 

757 if owner is not None: 

758 self._owner = owner.ownership_definition 

759 if group is not None: 

760 self._group = group.ownership_definition 

761 

762 def stat(self) -> os.stat_result: 

763 st = self._stat_cache 

764 if st is None: 

765 st = self._uncached_stat() 

766 self._stat_cache = st 

767 return st 

768 

769 def _uncached_stat(self) -> os.stat_result: 

770 return os.lstat(self.fs_path) 

771 

772 @property 

773 def mode(self) -> int: 

774 current_mode = self._mode 

775 if current_mode is None: 775 ↛ 776line 775 didn't jump to line 776 because the condition on line 775 was never true

776 current_mode = stat.S_IMODE(self.stat().st_mode) 

777 self._mode = current_mode 

778 return current_mode 

779 

780 @mode.setter 

781 def mode(self, new_mode: int) -> None: 

782 self._rw_check() 

783 min_bit = 0o700 if self.is_dir else 0o400 

784 if (new_mode & min_bit) != min_bit: 784 ↛ 785line 784 didn't jump to line 785 because the condition on line 784 was never true

785 omode = oct(new_mode)[2:] 

786 omin = oct(min_bit)[2:] 

787 raise ValueError( 

788 f'Attempt to set mode of path "{self._orphan_safe_path()}" to {omode} rejected;' 

789 f" Minimum requirements are {omin} (read-bit and, for dirs, exec bit for user)." 

790 " There are no paths that do not need these requirements met and they can cause" 

791 " problems during build or on the final system." 

792 ) 

793 self._mode = new_mode 

794 

795 def _ensure_min_mode(self) -> None: 

796 min_bit = 0o700 if self.is_dir else 0o600 

797 if self.has_fs_path and (self.mode & 0o600) != 0o600: 797 ↛ 798line 797 didn't jump to line 798 because the condition on line 797 was never true

798 try: 

799 fs_path = self.fs_path 

800 except TestPathWithNonExistentFSPathError: 

801 pass 

802 else: 

803 st = os.stat(fs_path) 

804 new_fs_mode = stat.S_IMODE(st.st_mode) | min_bit 

805 _debug_log( 

806 f"Applying chmod {oct(min_bit)[2:]} {fs_path} ({self.path}) to avoid problems down the line" 

807 ) 

808 os.chmod(fs_path, new_fs_mode) 

809 self.mode |= min_bit 

810 

811 @property 

812 def mtime(self) -> float: 

813 mtime = self._mtime 

814 if mtime is None: 

815 mtime = self.stat().st_mtime 

816 self._mtime = mtime 

817 return mtime 

818 

819 @mtime.setter 

820 def mtime(self, new_mtime: float) -> None: 

821 self._rw_check() 

822 self._mtime = new_mtime 

823 

824 @property 

825 def tar_owner_info(self) -> Tuple[str, int, str, int]: 

826 owner = self._owner 

827 group = self._group 

828 return ( 

829 owner.entity_name, 

830 owner.entity_id, 

831 group.entity_name, 

832 group.entity_id, 

833 ) 

834 

    @property
    def _can_replace_inline(self) -> bool:
        # This base implementation always answers False.
        return False

838 

839 @contextlib.contextmanager 

840 def add_file( 

841 self, 

842 name: str, 

843 *, 

844 unlink_if_exists: bool = True, 

845 use_fs_path_mode: bool = False, 

846 mode: int = 0o0644, 

847 mtime: Optional[float] = None, 

848 # Special-case parameters that are not exposed in the API 

849 fs_basename_matters: bool = False, 

850 subdir_key: Optional[str] = None, 

851 ) -> Iterator["FSPath"]: 

852 if "/" in name or name in {".", ".."}: 852 ↛ 853line 852 didn't jump to line 853 because the condition on line 852 was never true

853 raise ValueError(f'Invalid file name: "{name}"') 

854 if not self.is_dir: 854 ↛ 855line 854 didn't jump to line 855 because the condition on line 854 was never true

855 raise TypeError( 

856 f"Cannot create {self._orphan_safe_path()}/{name}:" 

857 f" {self._orphan_safe_path()} is not a directory" 

858 ) 

859 self._rw_check() 

860 existing = self.get(name) 

861 if existing is not None: 861 ↛ 862line 861 didn't jump to line 862 because the condition on line 861 was never true

862 if not unlink_if_exists: 

863 raise ValueError( 

864 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"' 

865 f" and exist_ok was False" 

866 ) 

867 existing.unlink(recursive=False) 

868 

869 if fs_basename_matters and subdir_key is None: 869 ↛ 870line 869 didn't jump to line 870 because the condition on line 869 was never true

870 raise ValueError( 

871 "When fs_basename_matters is True, a subdir_key must be provided" 

872 ) 

873 

874 directory = generated_content_dir(subdir_key=subdir_key) 

875 

876 if fs_basename_matters: 876 ↛ 877line 876 didn't jump to line 877 because the condition on line 876 was never true

877 fs_path = os.path.join(directory, name) 

878 with open(fs_path, "xb") as _: 

879 # Ensure that the fs_path exists 

880 pass 

881 child = FSBackedFilePath( 

882 name, 

883 self, 

884 fs_path, 

885 replaceable_inline=True, 

886 mtime=mtime, 

887 ) 

888 yield child 

889 else: 

890 with tempfile.NamedTemporaryFile( 

891 dir=directory, suffix=f"__{name}", delete=False 

892 ) as fd: 

893 fs_path = fd.name 

894 child = FSBackedFilePath( 

895 name, 

896 self, 

897 fs_path, 

898 replaceable_inline=True, 

899 mtime=mtime, 

900 ) 

901 fd.close() 

902 yield child 

903 

904 if use_fs_path_mode: 904 ↛ 906line 904 didn't jump to line 906 because the condition on line 904 was never true

905 # Ensure the caller can see the current mode 

906 os.chmod(fs_path, mode) 

907 _check_fs_path_is_file(fs_path, unlink_on_error=child) 

908 child._reset_caches() 

909 if not use_fs_path_mode: 909 ↛ exitline 909 didn't return from function 'add_file' because the condition on line 909 was always true

910 child.mode = mode 

911 

def insert_file_from_fs_path(
    self,
    name: str,
    fs_path: str,
    *,
    exist_ok: bool = True,
    use_fs_path_mode: bool = False,
    mode: int = 0o0644,
    require_copy_on_write: bool = True,
    follow_symlinks: bool = True,
    reference_path: Optional[VirtualPath] = None,
) -> "FSPath":
    """Attach an existing file from the file system as a child of this directory

    :param name: Basename of the new child. It must not contain "/" nor be "." or "..".
    :param fs_path: The file system path that provides the content.
    :param exist_ok: When False, a ValueError is raised if this directory already has
      an entry called `name`.
    :param use_fs_path_mode: When True, no explicit mode is passed to the new path
      (the `mode` parameter is then ignored).
    :param mode: The mode passed to the new path unless `use_fs_path_mode` is True.
    :param require_copy_on_write: When True, the new path is created as *not* being
      replaceable inline.
    :param follow_symlinks: When True, `fs_path` is resolved with a strict
      `os.path.realpath` before use. Cannot be combined with `reference_path`.
    :param reference_path: Optional reference path passed on to the new child. When
      given, the fs_path is neither lstat'ed nor type-checked by this method.
    :return: The new `FSBackedFilePath`.
    :raises ValueError: On an invalid name, an existing entry with `exist_ok=False`,
      `reference_path` combined with `follow_symlinks`, or a non-regular fs_path.
    :raises TypeError: If this path is not a directory.
    """
    if "/" in name or name in {".", ".."}:
        raise ValueError(f'Invalid file name: "{name}"')
    if not self.is_dir:
        raise TypeError(
            f"Cannot create {self._orphan_safe_path()}/{name}:"
            f" {self._orphan_safe_path()} is not a directory"
        )
    self._rw_check()
    if name in self and not exist_ok:
        raise ValueError(
            f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
            f" and exist_ok was False"
        )

    resolved_fs_path = fs_path
    if follow_symlinks:
        if reference_path is not None:
            raise ValueError("The reference_path cannot be used with follow_symlinks")
        resolved_fs_path = os.path.realpath(resolved_fs_path, strict=True)

    # Passing None as the initial mode leaves the mode to the new path itself.
    initial_mode: Optional[int] = None if use_fs_path_mode else mode

    stat_result = None
    if reference_path is None:
        # Without a reference path, validate the file type here and hand the
        # lstat result on so the new path can reuse it.
        stat_result = os.lstat(resolved_fs_path)
        if stat.S_ISDIR(stat_result.st_mode):
            raise ValueError(
                f'The provided path "{fs_path}" is a directory. However, this'
                " method does not support directories"
            )
        if not stat.S_ISREG(stat_result.st_mode):
            if follow_symlinks:
                problem = f"The resolved fs_path ({resolved_fs_path}) was not a file."
            else:
                problem = f"The provided fs_path ({fs_path}) was not a file."
            raise ValueError(problem)

    return FSBackedFilePath(
        name,
        self,
        resolved_fs_path,
        initial_mode=initial_mode,
        stat_cache=stat_result,
        replaceable_inline=not require_copy_on_write,
        reference_path=reference_path,
    )

973 

def add_symlink(
    self,
    link_name: str,
    link_target: str,
    *,
    reference_path: Optional[VirtualPath] = None,
) -> "FSPath":
    """Create a symlink as a child of this directory

    An existing entry called `link_name` is unlinked (non-recursively) first.

    :param link_name: Basename of the symlink. It must not contain "/" nor be "." or "..".
    :param link_target: The target of the symlink.
    :param reference_path: Optional reference path passed on to the new symlink.
    :return: The new `SymlinkVirtualPath`.
    :raises ValueError: If `link_name` is not a valid basename.
    :raises TypeError: If this path is not a directory.
    """
    name_is_basename = "/" not in link_name and link_name not in {".", ".."}
    if not name_is_basename:
        raise ValueError(
            f'Invalid file name: "{link_name}" (it must be a valid basename)'
        )
    if not self.is_dir:
        raise TypeError(
            f"Cannot create {self._orphan_safe_path()}/{link_name}:"
            f" {self._orphan_safe_path()} is not a directory"
        )
    self._rw_check()

    previous = self.get(link_name)
    if previous:
        # Emulate `ln -sf`, which attempts a non-recursive unlink first.
        previous.unlink(recursive=False)

    return SymlinkVirtualPath(
        link_name,
        self,
        link_target,
        reference_path=reference_path,
    )

1003 

def mkdir(
    self,
    name: str,
    *,
    reference_path: Optional[VirtualPath] = None,
) -> "FSPath":
    """Create a new directory as a child of this directory

    :param name: Basename of the new directory. It must not contain "/" nor be "." or "..".
    :param reference_path: Optional reference path for the new directory. When given,
      it must be a directory.
    :return: The new `VirtualDirectoryFSPath`.
    :raises ValueError: If `name` is not a valid basename, if `reference_path` is not
      a directory, or if an entry called `name` already exists.
    :raises TypeError: If this path is not a directory.
    """
    name_is_basename = "/" not in name and name not in {".", ".."}
    if not name_is_basename:
        raise ValueError(f'Invalid file name: "{name}" (it must be a valid basename)')
    if not self.is_dir:
        raise TypeError(
            f"Cannot create {self._orphan_safe_path()}/{name}:"
            f" {self._orphan_safe_path()} is not a directory"
        )
    if not (reference_path is None or reference_path.is_dir):
        raise ValueError(
            f'The provided fs_path "{reference_path.fs_path}" exist but it is not a directory!'
        )
    self._rw_check()

    # Unlike add_symlink, an existing entry is never replaced.
    clash = self.get(name)
    if clash:
        raise ValueError(f"Path {clash.path} already exist")
    return VirtualDirectoryFSPath(name, self, reference_path=reference_path)

1029 

def mkdirs(self, path: str) -> "FSPath":
    """Delegate to the parent class' `mkdirs` and narrow the result type to `FSPath`"""
    created = super().mkdirs(path)
    return cast("FSPath", created)

1032 

@property
def is_read_write(self) -> bool:
    """When true, the file system entry may be mutated

    A detached path is always mutable; otherwise the answer is taken from
    the parent directory.

    :return: Whether file system mutations are permitted.
    """
    return True if self.is_detached else assume_not_none(self.parent_dir).is_read_write

1042 

def unlink(self, *, recursive: bool = False) -> None:
    """Unlink a file or a directory

    This operation will detach the path from the file system (causing "is_detached" to return True).
    Unlinking a path that is already detached is a no-op.

    Note that the root directory cannot be deleted.

    :param recursive: If True, then non-empty directories will be unlinked as well removing everything inside them
      as well. When False, an error is raised if the path is a non-empty directory
    :raises ValueError: If the path has children and `recursive` is False.
    """
    if self.is_detached:
        return
    blocked_by_children = not recursive and any(self.iterdir)
    if blocked_by_children:
        raise ValueError(
            f'Refusing to unlink "{self.path}": The directory was not empty and recursive was False'
        )
    # Detaching is done by clearing the parent; the .parent_dir setter does
    # a _rw_check() for us.
    self.parent_dir = None

1061 

def _reset_caches(self) -> None:
    """Forget the cached mtime and the cached stat result"""
    self._mtime = self._stat_cache = None

1065 

def metadata(
    self,
    metadata_type: Type[PMT],
    *,
    owning_plugin: Optional[str] = None,
) -> PathMetadataReference[PMT]:
    """Obtain a reference to the metadata of the given type for this path

    :param metadata_type: The type of the metadata being requested.
    :param owning_plugin: The plugin owning the metadata. Defaults to the
      current plugin.
    :return: A reference to the metadata value. If no value is stored yet and the
      path is read-only, an always-empty read-only reference is returned instead.
    :raises TypeError: If no value is stored yet and the path is detached.
    """
    active_plugin = self._current_plugin()
    owner = active_plugin if owning_plugin is None else owning_plugin
    key = (owner, metadata_type)
    stored = self._metadata.get(key)
    if stored is not None:
        return PathMetadataReferenceImplementation(
            self,
            active_plugin,
            stored,
        )

    if self.is_detached:
        raise TypeError(
            f"Cannot access the metadata {metadata_type.__name__}: The path is detached."
        )
    if not self.is_read_write:
        # Nothing is recorded for read-only paths.
        return AlwaysEmptyReadOnlyMetadataReference(
            owner,
            active_plugin,
            metadata_type,
        )

    stored = PathMetadataValue(owner, metadata_type)
    self._metadata[key] = stored
    return PathMetadataReferenceImplementation(
        self,
        active_plugin,
        stored,
    )

1095 

@contextlib.contextmanager
def replace_fs_path_content(
    self,
    *,
    use_fs_path_mode: bool = False,
) -> Iterator[str]:
    """Context manager yielding a file system path whose content may be replaced

    If the path cannot be replaced inline, its content is first copied to a new
    file in the generated content directory and the path is switched over to
    that copy. The steps after the `yield` only run when the managed block
    completes without raising.

    :param use_fs_path_mode: When False, the mode the path had before the `yield` is
      re-applied to the file afterwards. When True, the file's mode is left alone.
    :return: A context manager providing the fs path to modify.
    :raises TypeError: If this path is not a file.
    """
    if not self.is_file:
        raise TypeError(
            f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file'
        )
    self._rw_check()
    target_path = self.fs_path
    if not self._can_replace_inline:
        source_path = self.fs_path
        scratch_dir = generated_content_dir()
        with tempfile.NamedTemporaryFile(
            dir=scratch_dir, suffix=f"__{self.name}", delete=False
        ) as scratch_fd:
            # Only the name is needed; the content is provided by _cp_a.
            scratch_fd.close()
            _cp_a(source_path, scratch_fd.name)
            target_path = scratch_fd.name
            self._replaced_path(target_path)
            assert self.fs_path == target_path

    cached_mtime = self._mtime
    if cached_mtime is not None:
        os.utime(target_path, (cached_mtime, cached_mtime))

    # Captured before the caller gets to touch the file.
    mode_before = self.mode
    yield target_path
    _check_fs_path_is_file(target_path, unlink_on_error=self)
    if not use_fs_path_mode:
        os.chmod(target_path, mode_before)
    self._reset_caches()

1130 

def _replaced_path(self, new_fs_path: str) -> None:
    """Hook for switching the path over to `new_fs_path`; not implemented at this level

    :param new_fs_path: The file system path that now backs this path.
    """
    raise NotImplementedError()

1133 

1134 

class VirtualFSPathBase(FSPath, ABC):
    """Base class for paths that are not backed by the file system by default

    At this level, `has_fs_path` is always False, so `stat()` and `fs_path`
    raise `PureVirtualPathError` unless a subclass overrides `has_fs_path`.
    """

    __slots__ = ()

    def __init__(
        self,
        basename: str,
        parent: Optional["FSPath"],
        children: Optional[Dict[str, "FSPath"]] = None,
        initial_mode: Optional[int] = None,
        mtime: Optional[float] = None,
        stat_cache: Optional[os.stat_result] = None,
    ) -> None:
        """Pass all arguments through to the parent class unchanged"""
        super().__init__(
            basename,
            parent,
            children,
            initial_mode=initial_mode,
            mtime=mtime,
            stat_cache=stat_cache,
        )

    @property
    def mtime(self) -> float:
        """The mtime of the path

        When no mtime has been recorded, the current time is used and
        remembered, so repeated reads return the same value.
        """
        mtime = self._mtime
        if mtime is None:
            mtime = time.time()
            self._mtime = mtime
        return mtime

    @property
    def has_fs_path(self) -> bool:
        """Always False at this level; subclasses may override"""
        return False

    def stat(self) -> os.stat_result:
        """Stat the path via the parent class

        :raises PureVirtualPathError: If `has_fs_path` is False.
        """
        if not self.has_fs_path:
            raise PureVirtualPathError(
                "stat() is only applicable to paths backed by the file system. The path"
                f" {self._orphan_safe_path()!r} is purely virtual"
            )
        return super().stat()

    @property
    def fs_path(self) -> str:
        """The file system path backing this path

        :raises PureVirtualPathError: If `has_fs_path` is False.
        :raises NotImplementedError: If a subclass reports `has_fs_path` without
          overriding this property.
        """
        if not self.has_fs_path:
            raise PureVirtualPathError(
                "fs_path is only applicable to paths backed by the file system. The path"
                f" {self._orphan_safe_path()!r} is purely virtual"
            )
        # This class has no fs path of its own. Returning `self.fs_path` here
        # (as was done previously) re-enters this property and recurses until
        # a RecursionError; fail explicitly instead.
        raise NotImplementedError(
            f"{self.__class__.__name__} reports has_fs_path but does not implement fs_path"
        )

1184 

1185 

class FSRootDir(FSPath):
    """The root directory of a virtual file system

    The root is always a directory, is never detached, has no parent and
    cannot be unlinked. It also holds the read-write flag and the plugin
    context for the tree.
    """

    __slots__ = ("_fs_path", "_fs_read_write", "_plugin_context")

    def __init__(self, fs_path: Optional[str] = None) -> None:
        """
        :param fs_path: Optional file system path backing the root directory.
        """
        self._fs_path = fs_path
        self._fs_read_write = True
        super().__init__(".", None, children={}, initial_mode=0o755)
        self._plugin_context = CurrentPluginContextManager("debputy")

    @property
    def is_detached(self) -> bool:
        """The root directory is never detached"""
        return False

    def _orphan_safe_path(self) -> str:
        return self.name

    @property
    def path(self) -> str:
        """The path of the root directory, which is its name"""
        return self.name

    @property
    def parent_dir(self) -> Optional["FSPath"]:
        """The root directory has no parent"""
        return None

    @parent_dir.setter
    def parent_dir(self, new_parent: Optional[FSPath]) -> None:
        # Assigning None is accepted and does nothing.
        if new_parent is None:
            return
        raise ValueError("The root directory cannot become a non-root directory")

    @property
    def parent_dir_path(self) -> Optional[str]:
        return None

    @property
    def is_dir(self) -> bool:
        return True

    @property
    def is_file(self) -> bool:
        return False

    @property
    def is_symlink(self) -> bool:
        return False

    def readlink(self) -> str:
        """Always raises TypeError as the root directory is not a symlink"""
        raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink')

    @property
    def has_fs_path(self) -> bool:
        """True when the root was created with an fs_path"""
        backing_path = self._fs_path
        return backing_path is not None

    def stat(self) -> os.stat_result:
        """Stat the backing file system path

        :raises PureVirtualPathError: If the root has no fs path.
        """
        if self.has_fs_path:
            return os.stat(self.fs_path)
        raise PureVirtualPathError(
            "stat() is only applicable to paths backed by the file system. The path"
            f" {self._orphan_safe_path()!r} is purely virtual"
        )

    @property
    def fs_path(self) -> str:
        """The file system path backing the root directory

        :raises PureVirtualPathError: If the root has no fs path.
        """
        if self.has_fs_path:
            return assume_not_none(self._fs_path)
        raise PureVirtualPathError(
            "fs_path is only applicable to paths backed by the file system. The path"
            f" {self._orphan_safe_path()!r} is purely virtual"
        )

    @property
    def is_read_write(self) -> bool:
        """Whether mutations are permitted; stored on the root itself"""
        return self._fs_read_write

    @is_read_write.setter
    def is_read_write(self, new_value: bool) -> None:
        self._fs_read_write = new_value

    def prune_if_empty_dir(self) -> None:
        # No-op for the root directory. There is never a case where you want to delete this directory
        # (and even if you could, debputy will need it for technical reasons, so the root dir stays)
        return None

    def unlink(self, *, recursive: bool = False) -> None:
        """Always raises TypeError"""
        # There is never a case where you want to delete this directory (and even if you could,
        # debputy will need it for technical reasons, so the root dir stays)
        raise TypeError("Cannot delete the root directory")

    def _current_plugin(self) -> str:
        return self._plugin_context.current_plugin_name

    @contextlib.contextmanager
    def change_plugin_context(self, new_plugin: str) -> Iterator[str]:
        """Switch the plugin context to `new_plugin` for the duration of the block"""
        with self._plugin_context.change_plugin_context(new_plugin) as plugin_name:
            yield plugin_name

1285 

1286 

class VirtualPathWithReference(VirtualFSPathBase, ABC):
    """Virtual path that may borrow properties from an optional reference path

    When a reference path is present, it supplies the initial mode and the
    default mtime, and can serve `fs_path`, `stat()` and `open()`.
    """

    __slots__ = ("_reference_path",)

    def __init__(
        self,
        basename: str,
        parent: FSPath,
        *,
        default_mode: int,
        reference_path: Optional[VirtualPath] = None,
    ) -> None:
        """
        :param basename: The basename of the path.
        :param parent: The parent directory.
        :param default_mode: The initial mode used when there is no reference path.
        :param reference_path: Optional reference path; when set, its mode is used
          as the initial mode.
        """
        if reference_path:
            starting_mode = reference_path.mode
        else:
            starting_mode = default_mode
        super().__init__(
            basename,
            parent=parent,
            initial_mode=starting_mode,
        )
        self._reference_path = reference_path

    @property
    def has_fs_path(self) -> bool:
        """True when there is a reference path and it has an fs path"""
        reference = self._reference_path
        if reference is None:
            return False
        return reference.has_fs_path

    @property
    def mtime(self) -> float:
        """The mtime, taken from the reference path when none has been set

        The value is remembered after the first lookup.
        """
        cached = self._mtime
        if cached is not None:
            return cached
        reference = self._reference_path
        resolved = reference.mtime if reference else super().mtime
        self._mtime = resolved
        return resolved

    @mtime.setter
    def mtime(self, new_mtime: float) -> None:
        self._rw_check()
        self._mtime = new_mtime

    @property
    def fs_path(self) -> str:
        """The fs path of the reference path when applicable, otherwise the parent class' fs path"""
        reference = self._reference_path
        if reference is None:
            return super().fs_path
        if not super().has_fs_path or super().fs_path == reference.fs_path:
            return reference.fs_path
        return super().fs_path

    def stat(self) -> os.stat_result:
        """Stat via the reference path when applicable, otherwise via the parent class"""
        reference = self._reference_path
        if reference is None:
            return super().stat()
        if not super().has_fs_path or super().fs_path == reference.fs_path:
            return reference.stat()
        return super().stat()

    def open(
        self,
        *,
        byte_io: bool = False,
        buffering: int = -1,
    ) -> Union[TextIO, BinaryIO]:
        """Open the path for reading

        The reference path is opened instead when it shares this path's fs path.
        """
        reference = self._reference_path
        use_reference = reference is not None and reference.fs_path == self.fs_path
        if use_reference:
            return reference.open(byte_io=byte_io, buffering=buffering)
        return super().open(byte_io=byte_io, buffering=buffering)

1354 

1355 

class VirtualDirectoryFSPath(VirtualPathWithReference):
    """A virtual directory, optionally based on a reference directory"""

    # "_reference_path" is already declared as a slot by
    # VirtualPathWithReference; re-declaring it here would only shadow the
    # inherited slot (and reserve a second, unused one per instance).
    __slots__ = ()

    def __init__(
        self,
        basename: str,
        parent: FSPath,
        *,
        reference_path: Optional[VirtualPath] = None,
    ) -> None:
        """
        :param basename: The basename of the directory.
        :param parent: The parent directory.
        :param reference_path: Optional reference path, which must be a directory.
          When absent, the initial mode is 0o755.
        """
        # The parent constructor stores reference_path in self._reference_path.
        super().__init__(
            basename,
            parent,
            reference_path=reference_path,
            default_mode=0o755,
        )
        assert reference_path is None or reference_path.is_dir
        self._ensure_min_mode()

    @property
    def is_dir(self) -> bool:
        return True

    @property
    def is_file(self) -> bool:
        return False

    @property
    def is_symlink(self) -> bool:
        return False

    def readlink(self) -> str:
        """Always raises TypeError as a directory is not a symlink"""
        raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink')

1390 

1391 

class SymlinkVirtualPath(VirtualPathWithReference):
    """A virtual symlink with a fixed link target"""

    __slots__ = ("_link_target",)

    def __init__(
        self,
        basename: str,
        parent_dir: FSPath,
        link_target: str,
        *,
        reference_path: Optional[VirtualPath] = None,
    ) -> None:
        """
        :param basename: The basename of the symlink.
        :param parent_dir: The directory containing the symlink.
        :param link_target: The value returned by `readlink()`.
        :param reference_path: Optional reference path passed on to the parent class.
        """
        super().__init__(
            basename,
            parent=parent_dir,
            reference_path=reference_path,
            default_mode=_SYMLINK_MODE,
        )
        self._link_target = link_target

    @property
    def is_dir(self) -> bool:
        return False

    @property
    def is_file(self) -> bool:
        return False

    @property
    def is_symlink(self) -> bool:
        return True

    def readlink(self) -> str:
        """Return the link target given at construction"""
        target = self._link_target
        return target

1425 

1426 

class FSBackedFilePath(VirtualPathWithReference):
    """A file whose content lives at a path on the file system"""

    __slots__ = ("_fs_path", "_replaceable_inline")

    def __init__(
        self,
        basename: str,
        parent_dir: FSPath,
        fs_path: str,
        *,
        replaceable_inline: bool = False,
        initial_mode: Optional[int] = None,
        mtime: Optional[float] = None,
        stat_cache: Optional[os.stat_result] = None,
        reference_path: Optional[VirtualPath] = None,
    ) -> None:
        """
        :param basename: The basename of the file.
        :param parent_dir: The directory containing the file.
        :param fs_path: The file system path providing the content.
        :param replaceable_inline: Whether the content at `fs_path` may be replaced
          in place. Only accepted for paths containing "debputy/scratch-dir/".
        :param initial_mode: When not None, assigned as the mode of the path.
        :param mtime: When not None, recorded as the mtime of the path.
        :param stat_cache: Stat result stored as the cached stat of the path.
        :param reference_path: Optional reference path passed on to the parent class.
        """
        super().__init__(
            basename,
            parent_dir,
            reference_path=reference_path,
            default_mode=0o644,
        )
        self._fs_path = fs_path
        self._replaceable_inline = replaceable_inline
        if initial_mode is not None:
            self.mode = initial_mode
        if mtime is not None:
            self._mtime = mtime
        self._stat_cache = stat_cache
        if replaceable_inline:
            assert (
                "debputy/scratch-dir/" in fs_path
            ), f"{fs_path} should not be inline-replaceable -- {self.path}"
        self._ensure_min_mode()

    @property
    def is_dir(self) -> bool:
        return False

    @property
    def is_file(self) -> bool:
        return True

    @property
    def is_symlink(self) -> bool:
        return False

    def readlink(self) -> str:
        """Always raises TypeError as a file is not a symlink"""
        raise TypeError(f'"{self._orphan_safe_path()!r}" is a file; not a symlink')

    @property
    def has_fs_path(self) -> bool:
        """Always True"""
        return True

    @property
    def fs_path(self) -> str:
        """The file system path currently backing this file"""
        return self._fs_path

    @property
    def _can_replace_inline(self) -> bool:
        return self._replaceable_inline

    def _replaced_path(self, new_fs_path: str) -> None:
        # After switching to the new fs path, the reference path is dropped
        # and the file is marked as replaceable inline.
        self._fs_path = new_fs_path
        self._reference_path = None
        self._replaceable_inline = True

1491 

1492 

# Mode used for symlinks: read, write and execute for user, group and others (0o777).
_SYMLINK_MODE = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO

1494 

1495 

class VirtualTestPath(FSPath):
    """Mock path for the test suite

    The path can be a directory, a symlink or a file. File content can be
    provided in memory (`content`), be written to a temporary file on demand
    (`materialized_content`), or come from a real `fs_path`.
    """

    __slots__ = (
        "_path_type",
        "_has_fs_path",
        "_fs_path",
        "_link_target",
        "_content",
        "_materialized_content",
    )

    def __init__(
        self,
        basename: str,
        parent_dir: Optional[FSPath],
        mode: Optional[int] = None,
        mtime: Optional[float] = None,
        is_dir: bool = False,
        has_fs_path: Optional[bool] = False,
        fs_path: Optional[str] = None,
        link_target: Optional[str] = None,
        content: Optional[str] = None,
        materialized_content: Optional[str] = None,
    ) -> None:
        """
        :param basename: The basename of the path.
        :param parent_dir: The parent directory (if any).
        :param mode: The initial mode. Defaults to 0o755 for directories and 0o644
          otherwise. Symlinks only accept None or the symlink mode.
        :param mtime: The initial mtime (if any).
        :param is_dir: Whether the path is a directory. Takes precedence over
          `link_target` when deciding the path type.
        :param has_fs_path: Whether the path claims to have an fs path. When None,
          it is derived from whether `fs_path` is non-empty.
        :param fs_path: The file system path backing the path (if any).
        :param link_target: When not None (and `is_dir` is False), the path is a
          symlink with this target.
        :param content: In-memory content served by `open()`.
        :param materialized_content: Content written to a temporary file the first
          time `fs_path` is read without an explicit fs path.
        :raises ValueError: If a symlink is given a mode other than the symlink mode.
        """
        if is_dir:
            self._path_type = PathType.DIRECTORY
        elif link_target is not None:
            self._path_type = PathType.SYMLINK
            if mode is not None and mode != _SYMLINK_MODE:
                raise ValueError(
                    f'Please do not assign a mode to symlinks. Triggered for "{basename}".'
                )
            assert mode is None or mode == _SYMLINK_MODE
        else:
            self._path_type = PathType.FILE

        if mode is not None:
            initial_mode = mode
        else:
            initial_mode = 0o755 if is_dir else 0o644

        self._link_target = link_target
        if has_fs_path is None:
            has_fs_path = bool(fs_path)
        self._has_fs_path = has_fs_path
        self._fs_path = fs_path
        self._materialized_content = materialized_content
        super().__init__(
            basename,
            parent=parent_dir,
            initial_mode=initial_mode,
            mtime=mtime,
        )
        self._content = content

    @property
    def is_dir(self) -> bool:
        return self._path_type == PathType.DIRECTORY

    @property
    def is_file(self) -> bool:
        return self._path_type == PathType.FILE

    @property
    def is_symlink(self) -> bool:
        return self._path_type == PathType.SYMLINK

    def readlink(self) -> str:
        """Return the link target

        :raises TypeError: If the path is not a symlink.
        """
        if not self.is_symlink:
            raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
        link_target = self._link_target
        assert link_target is not None
        return link_target

    @property
    def mtime(self) -> float:
        """The mtime; defaults to (and then remembers) the current time"""
        if self._mtime is None:
            self._mtime = time.time()
        return self._mtime

    @mtime.setter
    def mtime(self, new_mtime: float) -> None:
        self._rw_check()
        self._mtime = new_mtime

    @property
    def has_fs_path(self) -> bool:
        return self._has_fs_path

    def stat(self) -> os.stat_result:
        """Stat the backing fs path

        :raises PureVirtualPathError: If the path has no fs path or the fs path
          does not exist on the file system.
        """
        if self.has_fs_path:
            path = self.fs_path
            if path is None:
                raise PureVirtualPathError(
                    f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path"
                    " cannot provide!"
                )
            try:
                return os.stat(path)
            except FileNotFoundError as e:
                raise PureVirtualPathError(
                    f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path"
                    " cannot provide! (An fs_path was provided, but it did not exist)"
                ) from e

        raise PureVirtualPathError(
            "stat() is only applicable to paths backed by the file system. The path"
            f" {self._orphan_safe_path()!r} is purely virtual"
        )

    @property
    def size(self) -> int:
        """The size of the path

        In order of precedence: the UTF-8 encoded length of the in-memory
        content, the length of the link target for symlinks, 0 when there is no
        fs path, and otherwise the size reported by `stat()`.
        """
        if self._content is not None:
            return len(self._content.encode("utf-8"))
        if self.is_symlink:
            return len(self.readlink())
        if not self.has_fs_path or self.fs_path is None:
            return 0
        return self.stat().st_size

    @property
    def fs_path(self) -> str:
        """The file system path backing this path

        If no fs path was given but `materialized_content` was, the content is
        written to a temporary file on first access. That file is removed at
        interpreter exit.

        :raises PureVirtualPathError: If the path has no fs path to offer.
        """
        if self.has_fs_path:
            if self._fs_path is None and self._materialized_content is not None:
                with tempfile.NamedTemporaryFile(
                    mode="w+t",
                    encoding="utf-8",
                    suffix=f"__{self.name}",
                    delete=False,
                ) as fd:
                    filepath = fd.name
                    fd.write(self._materialized_content)
                self._fs_path = filepath
                atexit.register(lambda: os.unlink(filepath))

            path = self._fs_path
            if path is None:
                raise PureVirtualPathError(
                    f"The test wants a real file system entry of {self._orphan_safe_path()!r}, which this "
                    " mock path cannot provide!"
                )
            return path
        raise PureVirtualPathError(
            "fs_path is only applicable to paths backed by the file system. The path"
            f" {self._orphan_safe_path()!r} is purely virtual"
        )

    def replace_fs_path_content(
        self,
        *,
        use_fs_path_mode: bool = False,
    ) -> ContextManager[str]:
        """Delegate to the parent class after rejecting paths created with `content`

        :raises TypeError: If the path was created with in-memory `content`.
        """
        if self._content is not None:
            raise TypeError(
                f"The `replace_fs_path_content()` method was called on {self.path}. Said path was"
                " created with `content` but for this method to work, the path should have been"
                " created with `materialized_content`"
            )
        return super().replace_fs_path_content(use_fs_path_mode=use_fs_path_mode)

    @contextlib.contextmanager
    def open_child(
        self,
        name: str,
        mode: OpenMode = "r",
        buffering: int = -1,
    ) -> Union[TextIO, BinaryIO]:
        """Context manager for opening a child of this path

        Existing children and all read modes are handled by the parent class.
        Otherwise, the caller writes into an in-memory buffer and a new child
        with that content is created once the managed block completes.
        """
        existing = self.get(name)
        if existing or "r" in mode:
            with super().open_child(name, mode, buffering=buffering) as fd:
                yield fd
            return
        if "b" in mode:
            fd = io.BytesIO(b"")
            yield fd
            # The content of a test path is stored as text.
            content = fd.getvalue().decode("utf-8")
        else:
            fd = io.StringIO("")
            yield fd
            content = fd.getvalue()
        # Constructing the path with this directory as parent is all that is
        # done here; the instance itself is not kept.
        VirtualTestPath(
            name,
            self,
            mode=0o644,
            content=content,
            has_fs_path=True,
        )

    def open(
        self,
        *,
        byte_io: bool = False,
        buffering: int = -1,
    ) -> Union[TextIO, BinaryIO]:
        """Open the path for reading

        In-memory `content` is served from an in-memory buffer. Without it,
        the parent class opens the path.

        :raises TestPathWithNonExistentFSPathError: If the parent class' open
          fails with FileNotFoundError.
        """
        if self._content is None:
            try:
                return super().open(byte_io=byte_io, buffering=buffering)
            except FileNotFoundError as e:
                # The first fragment must be an f-string; otherwise the
                # placeholders end up verbatim in the message.
                raise TestPathWithNonExistentFSPathError(
                    f"The test path {self.path} had an fs_path {self._fs_path}, which does not"
                    " exist. This exception can only occur in the testsuite. Either have the"
                    " test provide content for the path (`virtual_path_def(..., content=...) or,"
                    " if that is too painful in general, have the code accept this error as a "
                    " test only-case and provide a default."
                ) from e

        if byte_io:
            return io.BytesIO(self._content.encode("utf-8"))
        return io.StringIO(self._content)

    def _replaced_path(self, new_fs_path: str) -> None:
        self._fs_path = new_fs_path

1707 

1708 

class FSOverlayBase(VirtualPathBase, Generic[FSP]):
    """Base class for paths that mirror an existing file system tree

    The path type checks are answered by `lstat`'ing the fs path. The parent
    is only held via a weak reference.
    """

    __slots__ = (
        "_path",
        "_fs_path",
        "_parent",
        "__weakref__",
    )

    def __init__(
        self,
        path: str,
        fs_path: str,
        parent: Optional[FSP],
    ) -> None:
        """
        :param path: The path of this entry as exposed by `path`.
        :param fs_path: The file system path of this entry. It is normalized; a
          leading "/" is preserved.
        :param parent: The parent entry (None for the root).
        """
        self._path: str = path
        prefix = "/" if fs_path.startswith("/") else ""
        self._fs_path: str = prefix + _normalize_path(fs_path, with_prefix=False)
        self._parent: Optional[ReferenceType[FSP]] = (
            ref(parent) if parent is not None else None
        )

    @property
    def name(self) -> str:
        """The basename of `path`"""
        return os.path.basename(self._path)

    @property
    def path(self) -> str:
        return self._path

    @property
    def parent_dir(self) -> Optional["FSP"]:
        """The parent entry, or None if there is none

        :raises RuntimeError: If the parent has been garbage collected.
        """
        parent = self._parent
        if parent is None:
            return None
        resolved = parent()
        if resolved is None:
            raise RuntimeError("Parent was garbage collected!")
        return resolved

    @property
    def fs_path(self) -> str:
        return self._fs_path

    def stat(self) -> os.stat_result:
        """`lstat` the fs path (symlinks are not followed)"""
        return os.lstat(self.fs_path)

    @property
    def is_dir(self) -> bool:
        """Whether the fs path is a directory

        False is returned when the fs path does not exist or when `stat()` is
        not implemented.
        """
        # The root path can have a non-existent fs_path (such as d/tmp not always existing)
        try:
            return stat.S_ISDIR(self.stat().st_mode)
        except FileNotFoundError:
            return False
        except NotImplementedError:
            # This handler used to print the class to stdout and fall through,
            # implicitly returning None. Keep the (falsy) answer but make it
            # an explicit bool and leave stdout alone.
            return False

    @property
    def is_file(self) -> bool:
        """Whether the fs path is a regular file (False if it does not exist)"""
        # The root path can have a non-existent fs_path (such as d/tmp not always existing)
        try:
            return stat.S_ISREG(self.stat().st_mode)
        except FileNotFoundError:
            return False

    @property
    def is_symlink(self) -> bool:
        """Whether the fs path is a symlink (False if it does not exist)"""
        # The root path can have a non-existent fs_path (such as d/tmp not always existing)
        try:
            return stat.S_ISLNK(self.stat().st_mode)
        except FileNotFoundError:
            return False

    @property
    def has_fs_path(self) -> bool:
        return True

    def open(
        self,
        *,
        byte_io: bool = False,
        buffering: int = -1,
    ) -> Union[TextIO, BinaryIO]:
        """Open the fs path for reading

        :param byte_io: When True, open in binary mode; otherwise as UTF-8 text.
        :param buffering: Passed on to the builtin `open`.
        :raises TypeError: If the path is neither a file nor a symlink.
        """
        # Allow symlinks for open here, because we can let the OS resolve the symlink reliably in this
        # case.
        if not self.is_file and not self.is_symlink:
            raise TypeError(
                f"Cannot open {self.path} for reading: It is not a file nor a symlink"
            )

        if byte_io:
            return open(self.fs_path, "rb", buffering=buffering)
        return open(self.fs_path, "rt", encoding="utf-8", buffering=buffering)

    def metadata(
        self,
        metadata_type: Type[PMT],
        *,
        owning_plugin: Optional[str] = None,
    ) -> PathMetadataReference[PMT]:
        """Return an always-empty, read-only metadata reference

        :param metadata_type: The type of the metadata being requested.
        :param owning_plugin: The plugin owning the metadata. Defaults to the
          current plugin.
        """
        current_plugin = self._current_plugin()
        if owning_plugin is None:
            owning_plugin = current_plugin
        return AlwaysEmptyReadOnlyMetadataReference(
            owning_plugin,
            current_plugin,
            metadata_type,
        )

    def all_paths(self) -> Iterable["FSControlPath"]:
        """Iterate over this path and everything beneath it

        A directory is yielded before its content, and the content of a
        directory is visited in `iterdir` order.
        """
        yield self
        if not self.is_dir:
            return
        # The stack is popped from the end, so entries are pushed in reverse
        # to have them visited in iterdir order.
        stack = list(self.iterdir)
        stack.reverse()
        while stack:
            current = stack.pop()
            yield current
            if current.is_dir:
                stack.extend(reversed(list(current.iterdir)))

    def _resolve_children(
        self, new_child: Callable[[str, str, FSP], FSC]
    ) -> Mapping[str, FSC]:
        """Build the mapping of children by listing the directory on the file system

        :param new_child: Factory called with (path, fs_path, parent) for each entry.
        :return: Mapping from basename to child, inserted in sorted name order.
          Empty when this path is not a directory.
        """
        if not self.is_dir:
            return {}
        dir_path = self.path
        dir_fs_path = self.fs_path
        children = {}
        for name in sorted(os.listdir(dir_fs_path), key=os.path.basename):
            # Avoid a "./" prefix for entries directly beneath ".".
            child_path = os.path.join(dir_path, name) if dir_path != "." else name
            child_fs_path = (
                os.path.join(dir_fs_path, name) if dir_fs_path != "." else name
            )
            children[name] = new_child(
                child_path,
                child_fs_path,
                self,
            )
        return children

1848 

1849 

class FSROOverlay(FSOverlayBase["FSROOverlay"]):
    """Read-only overlay of a path in the physical file system.

    The ``lstat`` result, the symlink target and the directory listing are
    computed on first use and then cached on the instance.  Every mutating
    operation raises :class:`DebputyFSIsROError`.
    """

    __slots__ = (
        "_stat_cache",
        "_readlink_cache",
        "_children",
        "_stat_failed_cache",
    )

    def __init__(
        self,
        path: str,
        fs_path: str,
        parent: Optional["FSROOverlay"],
    ) -> None:
        super().__init__(path, fs_path, parent=parent)
        self._stat_cache: Optional[os.stat_result] = None
        self._readlink_cache: Optional[str] = None
        # Set once ``os.lstat`` raised FileNotFoundError so it is not retried.
        self._stat_failed_cache = False
        # ``None`` means "not listed yet"; an empty mapping is a valid result.
        self._children: Optional[Mapping[str, FSROOverlay]] = None

    @classmethod
    def create_root_dir(cls, path: str, fs_path: str) -> "FSROOverlay":
        """Create a parent-less overlay for ``fs_path`` exposed as ``path``."""
        return FSROOverlay(path, fs_path, None)

    @property
    def iterdir(self) -> Iterable["FSROOverlay"]:
        """Yield the (cached) children; yields nothing for non-directories."""
        if not self.is_dir:
            return
        if self._children is None:
            self._ensure_children_are_resolved()
        yield from assume_not_none(self._children).values()

    def lookup(self, path: str) -> Optional["FSROOverlay"]:
        """Resolve ``path`` relative to this directory (or the root if absolute).

        :return: The matching path, or ``None`` when this path is not a
          directory or a segment does not exist.
        :raises ValueError: If a ``..`` segment would go above the root.
        """
        if not self.is_dir:
            return None
        if self._children is None:
            self._ensure_children_are_resolved()

        absolute, _, path_parts = _split_path(path)
        current = cast("FSROOverlay", _root(self)) if absolute else self
        for dir_part in path_parts:
            if dir_part == ".":
                continue
            if dir_part == "..":
                p = current.parent_dir
                if p is None:
                    raise ValueError(f'The path "{path}" escapes the root dir')
                current = cast("FSROOverlay", p)
                continue
            try:
                current = current[dir_part]
            except KeyError:
                return None
        return current

    def _ensure_children_are_resolved(self) -> None:
        """Populate the child cache unless it has already been filled."""
        # Compare against None rather than using truthiness: an empty
        # directory caches ``{}``, which must count as "already resolved"
        # instead of triggering another directory listing on every call.
        if not self.is_dir or self._children is not None:
            return
        self._children = self._resolve_children(
            lambda n, fsp, p: FSROOverlay(n, fsp, p)
        )

    @property
    def is_detached(self) -> bool:
        return False

    def __getitem__(self, key) -> "VirtualPath":
        """Return the child named ``key`` (an ``FSPath`` key is mapped to its name).

        :raises KeyError: If this is not a directory or has no such child.
        """
        if not self.is_dir:
            raise KeyError(key)
        if self._children is None:
            self._ensure_children_are_resolved()
        if isinstance(key, FSPath):
            key = key.name
        return self._children[key]

    def __delitem__(self, key) -> None:
        self._error_ro_fs()

    @property
    def is_read_write(self) -> bool:
        return False

    def _rw_check(self) -> None:
        self._error_ro_fs()

    def _error_ro_fs(self) -> NoReturn:
        """Raise the error shared by every mutating operation."""
        raise DebputyFSIsROError(
            f'Attempt to write to "{self.path}" failed:'
            " Debputy Virtual File system is R/O."
        )

    def stat(self) -> os.stat_result:
        """Return the cached ``os.lstat`` result for ``fs_path``.

        :raises FileNotFoundError: If the path does not exist.  The failure
          is remembered, so later calls raise without touching the disk.
        """
        if self._stat_failed_cache:
            raise FileNotFoundError(
                errno.ENOENT, os.strerror(errno.ENOENT), self.fs_path
            )

        if self._stat_cache is None:
            try:
                self._stat_cache = os.lstat(self.fs_path)
            except FileNotFoundError:
                self._stat_failed_cache = True
                raise
        return self._stat_cache

    @property
    def mode(self) -> int:
        return stat.S_IMODE(self.stat().st_mode)

    @mode.setter
    def mode(self, _unused: int) -> None:
        self._error_ro_fs()

    @property
    def mtime(self) -> float:
        return self.stat().st_mtime

    @mtime.setter
    def mtime(self, new_mtime: float) -> None:
        self._error_ro_fs()

    def readlink(self) -> str:
        """Return the (cached) symlink target.

        :raises TypeError: If this path is not a symlink.
        """
        if not self.is_symlink:
            raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
        if self._readlink_cache is None:
            self._readlink_cache = os.readlink(self.fs_path)
        return self._readlink_cache

    def chown(
        self,
        owner: Optional[StaticFileSystemOwner],
        group: Optional[StaticFileSystemGroup],
    ) -> None:
        self._error_ro_fs()

    def mkdir(self, name: str) -> "VirtualPath":
        self._error_ro_fs()

    def add_file(
        self,
        name: str,
        *,
        unlink_if_exists: bool = True,
        use_fs_path_mode: bool = False,
        mode: int = 0o0644,
        mtime: Optional[float] = None,
    ) -> ContextManager["VirtualPath"]:
        self._error_ro_fs()

    def add_symlink(self, link_name: str, link_target: str) -> "VirtualPath":
        self._error_ro_fs()

    def unlink(self, *, recursive: bool = False) -> None:
        self._error_ro_fs()

2004 

2005 

class FSROOverlayRootDir(FSROOverlay):
    """Parent-less read-only overlay that also holds the plugin context."""

    __slots__ = ("_plugin_context",)

    def __init__(self, path: str, fs_path: str) -> None:
        super().__init__(path, fs_path, None)
        # The context manager is seeded with "debputy" as the plugin name.
        self._plugin_context = CurrentPluginContextManager("debputy")

    def _current_plugin(self) -> str:
        """Return the plugin name currently held by the plugin context."""
        plugin_context = self._plugin_context
        return plugin_context.current_plugin_name

    @contextlib.contextmanager
    def change_plugin_context(self, new_plugin: str) -> Iterator[str]:
        """Delegate to the plugin context manager and yield what it yields."""
        switcher = self._plugin_context.change_plugin_context(new_plugin)
        with switcher as active_plugin:
            yield active_plugin

2020 

2021 

class FSControlPath(FSOverlayBase["FSControlPath"]):
    """Read-write overlay for paths destined for the control.tar.

    Nothing is cached: children are re-listed from ``fs_path`` on every
    access and metadata changes are applied directly to the file on disk.
    Subdirectories, symlinks and ownership changes are rejected.
    """

    @property
    def iterdir(self) -> Iterable["FSControlPath"]:
        """Yield the children as currently found on disk."""
        if not self.is_dir:
            return
        yield from self._resolve_children(
            lambda n, fsp, p: FSControlPath(n, fsp, p)
        ).values()

    def lookup(self, path: str) -> Optional["FSControlPath"]:
        """Resolve ``path`` relative to this directory (or the root if absolute).

        :return: The matching path, or ``None`` when this path is not a
          directory or a segment does not exist.
        :raises ValueError: If a ``..`` segment would go above the root.
        """
        if not self.is_dir:
            return None

        absolute, _, path_parts = _split_path(path)
        current = cast("FSControlPath", _root(self)) if absolute else self
        for dir_part in path_parts:
            if dir_part == ".":
                continue
            if dir_part == "..":
                p = current.parent_dir
                if p is None:
                    raise ValueError(f'The path "{path}" escapes the root dir')
                current = cast("FSControlPath", p)
                continue
            try:
                current = current[dir_part]
            except KeyError:
                return None
        return current

    @property
    def is_detached(self) -> bool:
        """True when ``stat()`` reports that the path no longer exists."""
        try:
            self.stat()
        except FileNotFoundError:
            return True
        else:
            return False

    def __getitem__(self, key) -> "VirtualPath":
        """Return the child named ``key`` (an ``FSPath`` key is mapped to its name).

        :raises KeyError: If this is not a directory or has no such child.
        """
        if not self.is_dir:
            raise KeyError(key)
        children = self._resolve_children(lambda n, fsp, p: FSControlPath(n, fsp, p))
        if isinstance(key, FSPath):
            key = key.name
        return children[key]

    def __delitem__(self, key) -> None:
        self[key].unlink()

    @property
    def is_read_write(self) -> bool:
        return True

    @property
    def mode(self) -> int:
        return stat.S_IMODE(self.stat().st_mode)

    @mode.setter
    def mode(self, new_mode: int) -> None:
        os.chmod(self.fs_path, new_mode)

    @property
    def mtime(self) -> float:
        return self.stat().st_mtime

    @mtime.setter
    def mtime(self, new_mtime: float) -> None:
        os.utime(self.fs_path, (new_mtime, new_mtime))

    def readlink(self) -> str:
        """Always fails: symlinks are never created in the control file system.

        :raises TypeError: If this path is not a symlink.
        :raises AssertionError: If it unexpectedly is one.
        """
        if not self.is_symlink:
            raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
        # Explicit raise instead of ``assert False`` so the invariant also
        # holds under ``python -O`` rather than silently returning ``None``.
        raise AssertionError("The control.tar never contains symlinks.")

    def chown(
        self,
        owner: Optional[StaticFileSystemOwner],
        group: Optional[StaticFileSystemGroup],
    ) -> None:
        raise ValueError(
            "No need to chown paths in the control.tar: They are always root:root"
        )

    def mkdir(self, name: str) -> "VirtualPath":
        raise TypeError("The control.tar never contains subdirectories.")

    @contextlib.contextmanager
    def add_file(
        self,
        name: str,
        *,
        unlink_if_exists: bool = True,
        use_fs_path_mode: bool = False,
        mode: int = 0o0644,
        mtime: Optional[float] = None,
    ) -> Iterator["VirtualPath"]:
        """Create (or truncate) the file ``name`` in this directory.

        Used as a context manager: the new child is yielded so the caller
        can fill in the content; afterwards the path is verified to still
        be a file and ``mode`` is applied to it.

        :param name: Basename of the file; no ``/``, ``.`` or ``..``.
        :param unlink_if_exists: When False, an existing entry called
          ``name`` is an error instead of being truncated.
        :param use_fs_path_mode: Accepted for interface compatibility; not
          used by this implementation.
        :param mode: Mode applied both at creation and after the block.
        :param mtime: Accepted for interface compatibility; not used by
          this implementation.
        :raises ValueError: On an invalid name, or when the file exists
          and ``unlink_if_exists`` is False.
        :raises TypeError: If this path is not a directory.
        """
        if "/" in name or name in {".", ".."}:
            raise ValueError(f'Invalid file name: "{name}"')
        if not self.is_dir:
            raise TypeError(
                f"Cannot create {self._orphan_safe_path()}/{name}:"
                f" {self._orphan_safe_path()} is not a directory"
            )
        self._rw_check()
        existing = self.get(name)
        if existing is not None:
            if not unlink_if_exists:
                # The message names the parameter that is actually in the
                # signature (it used to mention a non-existent "exist_ok").
                raise ValueError(
                    f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
                    f" and unlink_if_exists was False"
                )
            assert existing.is_file

        fs_path = os.path.join(self.fs_path, name)
        # This truncates the existing file if any, so we do not have to unlink the previous entry.
        with open(fs_path, "wb") as fd:
            # Ensure that the fs_path exists and default mode is reasonable
            os.chmod(fd.fileno(), mode)
        child = FSControlPath(
            name,
            fs_path,
            self,
        )
        yield child
        _check_fs_path_is_file(fs_path, unlink_on_error=child)
        child.mode = mode

    @contextlib.contextmanager
    def replace_fs_path_content(
        self,
        *,
        use_fs_path_mode: bool = False,
    ) -> Iterator[str]:
        """Yield ``fs_path`` so the caller can rewrite the file in place.

        After the block, the path is verified to still be a file.

        :raises TypeError: If this path is not a file.
        """
        if not self.is_file:
            raise TypeError(
                f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file'
            )
        # NOTE(review): the pre-existing mode is only re-applied when
        # use_fs_path_mode is True -- confirm this polarity is intended.
        restore_mode = self.mode if use_fs_path_mode else None
        yield self.fs_path
        _check_fs_path_is_file(self.fs_path, self)
        if restore_mode is not None:
            self.mode = restore_mode

    def add_symlink(self, link_name: str, link_target: str) -> "VirtualPath":
        raise TypeError("The control.tar never contains symlinks.")

    def unlink(self, *, recursive: bool = False) -> None:
        """Remove this file from disk; a no-op for the parent-less root."""
        if self._parent is None:
            return
        # By virtue of the control FS only containing paths, we can assume `recursive` never
        # matters and that `os.unlink` will be sufficient.
        assert self.is_file
        os.unlink(self.fs_path)

2177 

2178 

class FSControlRootDir(FSControlPath):
    """Parent-less ``FSControlPath`` whose virtual path is ``"."``."""

    @classmethod
    def create_root_dir(cls, fs_path: str) -> "FSControlRootDir":
        """Build the root directory object backed by ``fs_path`` on disk."""
        root = FSControlRootDir(".", fs_path, parent=None)
        return root

2184 

2185 

def as_path_def(pd: Union[str, PathDef]) -> PathDef:
    """Wrap a plain string in a ``PathDef``; return anything else unchanged."""
    if isinstance(pd, str):
        return PathDef(pd)
    return pd

2188 

2189 

def as_path_defs(paths: Iterable[Union[str, PathDef]]) -> Iterable[PathDef]:
    """Lazily convert each entry of ``paths`` with ``as_path_def``."""
    for entry in paths:
        yield as_path_def(entry)

2192 

2193 

def build_virtual_fs(
    paths: Iterable[Union[str, PathDef]],
    read_write_fs: bool = False,
) -> "FSPath":
    """Build an in-memory virtual file system from path definitions.

    Each entry is a ``PathDef`` or a plain string (converted via
    ``as_path_defs``).  Names ending in ``/`` become directories, and
    missing parent directories are created implicitly.  Names are
    normalized to start with ``./``.  The root may be given explicitly as
    ``.``, ``./`` or ``/``; if it is the first entry, its ``fs_path`` is
    passed on to the root directory.

    :param paths: The path definitions; directories should occur before
      their children.
    :param read_write_fs: Value assigned to ``is_read_write`` on the root.
    :return: The root directory of the new file system.
    :raises ValueError: On a duplicate definition, or when a path needs a
      parent that was already defined as a non-directory.
    """
    root_aliases = (".", "./", "/")
    root_dir: Optional[FSRootDir] = None
    directories: Dict[str, FSPath] = {}
    non_directories = set()

    def _ensure_parent_dirs(p: str) -> None:
        # Walk upwards until a known directory is found, then create the
        # missing ones top-down so every parent exists before its child.
        current = p.rstrip("/")
        missing_dirs = []
        while True:
            current = os.path.dirname(current)
            if current in directories:
                break
            if current in non_directories:
                raise ValueError(
                    f'Conflicting definition for "{current}". The path "{p}" wants it as a directory,'
                    ' but it is defined as a non-directory. (Ensure dirs end with "/")'
                )
            missing_dirs.append(current)
        for dir_path in reversed(missing_dirs):
            parent_dir = directories[os.path.dirname(dir_path)]
            d = VirtualTestPath(os.path.basename(dir_path), parent_dir, is_dir=True)
            directories[dir_path] = d

    for path_def in as_path_defs(paths):
        path = path_def.path_name
        if path in directories or path in non_directories:
            raise ValueError(
                f'Duplicate definition of "{path}". Can be false positive if input is not in'
                ' "correct order" (ensure directories occur before their children)'
            )
        if root_dir is None:
            root_fs_path = None
            if path in root_aliases:
                root_fs_path = path_def.fs_path
            root_dir = FSRootDir(fs_path=root_fs_path)
            directories["."] = root_dir

        if path in root_aliases:
            # The root was handled above.  "/" is skipped here as well:
            # it used to fall through, be stripped to "" and fail with a
            # KeyError on the parent directory lookup.
            assert "." in directories
            continue
        if not path.startswith("./"):
            path = "./" + path
        _ensure_parent_dirs(path)
        is_dir = False
        if path.endswith("/"):
            path = path[:-1]
            is_dir = True
        directory = directories[os.path.dirname(path)]
        assert not is_dir or not bool(
            path_def.link_target
        ), f"is_dir={is_dir} vs. link_target={path_def.link_target}"
        fs_path = VirtualTestPath(
            os.path.basename(path),
            directory,
            is_dir=is_dir,
            mode=path_def.mode,
            mtime=path_def.mtime,
            has_fs_path=path_def.has_fs_path,
            fs_path=path_def.fs_path,
            link_target=path_def.link_target,
            content=path_def.content,
            materialized_content=path_def.materialized_content,
        )
        assert not fs_path.is_detached
        if fs_path.is_dir:
            directories[fs_path.path] = fs_path
        else:
            non_directories.add(fs_path.path)

    if root_dir is None:
        root_dir = FSRootDir()

    root_dir.is_read_write = read_write_fs
    return root_dir