Coverage for src/debputy/filesystem_scan.py: 66%

1295 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-10-19 09:24 +0000

1import atexit 

2import contextlib 

3import dataclasses 

4import errno 

5import io 

6import operator 

7import os 

8import shutil 

9import stat 

10import subprocess 

11import tempfile 

12import time 

13from abc import ABC 

14from contextlib import suppress 

15from typing import ( 

16 Optional, 

17 cast, 

18 Any, 

19 ContextManager, 

20 TextIO, 

21 BinaryIO, 

22 Generic, 

23 TypeVar, 

24 overload, 

25 Literal, 

26 Never, 

27) 

28from collections.abc import Iterable, Iterator, Mapping, Callable 

29from weakref import ref, ReferenceType 

30 

31from debputy.exceptions import ( 

32 PureVirtualPathError, 

33 DebputyFSIsROError, 

34 DebputyMetadataAccessError, 

35 TestPathWithNonExistentFSPathError, 

36 SymlinkLoopError, 

37) 

38from debputy.intermediate_manifest import PathType 

39from debputy.manifest_parser.base_types import ( 

40 ROOT_DEFINITION, 

41 StaticFileSystemOwner, 

42 StaticFileSystemGroup, 

43) 

44from debputy.plugin.api.spec import ( 

45 VirtualPath, 

46 PathDef, 

47 PathMetadataReference, 

48 PMT, 

49) 

50from debputy.types import VP 

51from debputy.util import ( 

52 generated_content_dir, 

53 _error, 

54 escape_shell, 

55 assume_not_none, 

56 _normalize_path, 

57 _debug_log, 

58) 

59 

60BY_BASENAME = operator.attrgetter("name") 

61 

62FSP = TypeVar("FSP", bound="FSOverlayBase", covariant=True) 

63FSC = TypeVar("FSC", bound="FSOverlayBase", covariant=True) 

64 

65 

66BinaryOpenMode = Literal[ 

67 "rb", 

68 "r+b", 

69 "wb", 

70 "w+b", 

71 "xb", 

72 "ab", 

73] 

74TextOpenMode = Literal[ 

75 "r", 

76 "r+", 

77 "rt", 

78 "r+t", 

79 "w", 

80 "w+", 

81 "wt", 

82 "w+t", 

83 "x", 

84 "xt", 

85 "a", 

86 "at", 

87] 

88OpenMode = Literal[BinaryOpenMode, TextOpenMode] 

89 

90 

91class AlwaysEmptyReadOnlyMetadataReference(PathMetadataReference[PMT]): 

92 __slots__ = ("_metadata_type", "_owning_plugin", "_current_plugin") 

93 

94 def __init__( 

95 self, 

96 owning_plugin: str, 

97 current_plugin: str, 

98 metadata_type: type[PMT], 

99 ) -> None: 

100 self._owning_plugin = owning_plugin 

101 self._current_plugin = current_plugin 

102 self._metadata_type = metadata_type 

103 

104 @property 

105 def is_present(self) -> bool: 

106 return False 

107 

108 @property 

109 def can_read(self) -> bool: 

110 return self._owning_plugin == self._current_plugin 

111 

112 @property 

113 def can_write(self) -> bool: 

114 return False 

115 

116 @property 

117 def value(self) -> PMT | None: 

118 if self.can_read: 118 ↛ 120line 118 didn't jump to line 120 because the condition on line 118 was always true

119 return None 

120 raise DebputyMetadataAccessError( 

121 f"Cannot read the metadata {self._metadata_type.__name__} owned by" 

122 f" {self._owning_plugin} as the metadata has not been made" 

123 f" readable to the plugin {self._current_plugin}." 

124 ) 

125 

126 @value.setter 

127 def value(self, new_value: PMT) -> None: 

128 if self._is_owner: 

129 raise DebputyFSIsROError( 

130 f"Cannot set the metadata {self._metadata_type.__name__} as the path is read-only" 

131 ) 

132 raise DebputyMetadataAccessError( 

133 f"Cannot set the metadata {self._metadata_type.__name__} owned by" 

134 f" {self._owning_plugin} as the metadata has not been made" 

135 f" read-write to the plugin {self._current_plugin}." 

136 ) 

137 

138 @property 

139 def _is_owner(self) -> bool: 

140 return self._owning_plugin == self._current_plugin 

141 

142 

143@dataclasses.dataclass(slots=True) 

144class PathMetadataValue(Generic[PMT]): 

145 owning_plugin: str 

146 metadata_type: type[PMT] 

147 value: PMT | None = None 

148 

149 def can_read_value(self, current_plugin: str) -> bool: 

150 return self.owning_plugin == current_plugin 

151 

152 def can_write_value(self, current_plugin: str) -> bool: 

153 return self.owning_plugin == current_plugin 

154 

155 

156class PathMetadataReferenceImplementation(PathMetadataReference[PMT]): 

157 __slots__ = ("_owning_path", "_current_plugin", "_path_metadata_value") 

158 

159 def __init__( 

160 self, 

161 owning_path: VirtualPath, 

162 current_plugin: str, 

163 path_metadata_value: PathMetadataValue[PMT], 

164 ) -> None: 

165 self._owning_path = owning_path 

166 self._current_plugin = current_plugin 

167 self._path_metadata_value = path_metadata_value 

168 

169 @property 

170 def is_present(self) -> bool: 

171 if not self.can_read: 171 ↛ 172line 171 didn't jump to line 172 because the condition on line 171 was never true

172 return False 

173 return self._path_metadata_value.value is not None 

174 

175 @property 

176 def can_read(self) -> bool: 

177 return self._path_metadata_value.can_read_value(self._current_plugin) 

178 

179 @property 

180 def can_write(self) -> bool: 

181 if not self._path_metadata_value.can_write_value(self._current_plugin): 181 ↛ 182line 181 didn't jump to line 182 because the condition on line 181 was never true

182 return False 

183 owning_path = self._owning_path 

184 return owning_path.is_read_write and not owning_path.is_detached 

185 

186 @property 

187 def value(self) -> PMT | None: 

188 if self.can_read: 188 ↛ 190line 188 didn't jump to line 190 because the condition on line 188 was always true

189 return self._path_metadata_value.value 

190 raise DebputyMetadataAccessError( 

191 f"Cannot read the metadata {self._metadata_type_name} owned by" 

192 f" {self._owning_plugin} as the metadata has not been made" 

193 f" readable to the plugin {self._current_plugin}." 

194 ) 

195 

196 @value.setter 

197 def value(self, new_value: PMT) -> None: 

198 if not self.can_write: 198 ↛ 199line 198 didn't jump to line 199 because the condition on line 198 was never true

199 m = "set" if new_value is not None else "delete" 

200 raise DebputyMetadataAccessError( 

201 f"Cannot {m} the metadata {self._metadata_type_name} owned by" 

202 f" {self._owning_plugin} as the metadata has not been made" 

203 f" read-write to the plugin {self._current_plugin}." 

204 ) 

205 owning_path = self._owning_path 

206 if not owning_path.is_read_write: 206 ↛ 207line 206 didn't jump to line 207 because the condition on line 206 was never true

207 raise DebputyFSIsROError( 

208 f"Cannot set the metadata {self._metadata_type_name} as the path is read-only" 

209 ) 

210 if owning_path.is_detached: 210 ↛ 211line 210 didn't jump to line 211 because the condition on line 210 was never true

211 raise TypeError( 

212 f"Cannot set the metadata {self._metadata_type_name} as the path is detached" 

213 ) 

214 self._path_metadata_value.value = new_value 

215 

216 @property 

217 def _is_owner(self) -> bool: 

218 return self._owning_plugin == self._current_plugin 

219 

220 @property 

221 def _owning_plugin(self) -> str: 

222 return self._path_metadata_value.owning_plugin 

223 

224 @property 

225 def _metadata_type_name(self) -> str: 

226 return self._path_metadata_value.metadata_type.__name__ 

227 

228 

229def _cp_a(source: str, dest: str) -> None: 

230 cmd = ["cp", "-a", source, dest] 

231 try: 

232 subprocess.check_call(cmd) 

233 except subprocess.CalledProcessError: 

234 full_command = escape_shell(*cmd) 

235 _error( 

236 f"The attempt to make an internal copy of {escape_shell(source)} failed. Please review the output of cp" 

237 f" above to understand what went wrong. The full command was: {full_command}" 

238 ) 

239 

240 

241def _split_path(path: str) -> tuple[bool, bool, list[str]]: 

242 must_be_dir = True if path.endswith("/") else False 

243 absolute = False 

244 if path.startswith("/"): 

245 absolute = True 

246 path = "." + path 

247 path_parts = path.rstrip("/").split("/") 

248 if must_be_dir: 

249 path_parts.append(".") 

250 return absolute, must_be_dir, path_parts 

251 

252 

253def _root(path: VP) -> VP: 

254 current = path 

255 while True: 

256 parent = current.parent_dir 

257 if parent is None: 

258 return current 

259 current = parent 

260 

261 

262def _check_fs_path_is_file( 

263 fs_path: str, 

264 unlink_on_error: Optional["VirtualPath"] = None, 

265) -> None: 

266 had_issue = False 

267 try: 

268 # FIXME: Check mode, and use the Virtual Path to cache the result as a side-effect 

269 st = os.lstat(fs_path) 

270 except FileNotFoundError: 

271 had_issue = True 

272 else: 

273 if not stat.S_ISREG(st.st_mode) or st.st_nlink > 1: 273 ↛ 274line 273 didn't jump to line 274 because the condition on line 273 was never true

274 had_issue = True 

275 if not had_issue: 275 ↛ 278line 275 didn't jump to line 278 because the condition on line 275 was always true

276 return 

277 

278 if unlink_on_error: 

279 with suppress(FileNotFoundError): 

280 os.unlink(fs_path) 

281 raise TypeError( 

282 "The provided FS backing file was deleted, replaced with a non-file entry or it was hard" 

283 " linked to another file. The entry has been disconnected." 

284 ) 

285 

286 

287class CurrentPluginContextManager: 

288 __slots__ = ("_plugin_names",) 

289 

290 def __init__(self, initial_plugin_name: str) -> None: 

291 self._plugin_names = [initial_plugin_name] 

292 

293 @property 

294 def current_plugin_name(self) -> str: 

295 return self._plugin_names[-1] 

296 

297 @contextlib.contextmanager 

298 def change_plugin_context(self, new_plugin_name: str) -> Iterator[str]: 

299 self._plugin_names.append(new_plugin_name) 

300 yield new_plugin_name 

301 self._plugin_names.pop() 

302 

303 

304class VirtualPathBase(VirtualPath, ABC): 

305 __slots__ = () 

306 

307 def _orphan_safe_path(self) -> str: 

308 return self.path 

309 

310 def _rw_check(self) -> None: 

311 if not self.is_read_write: 

312 raise DebputyFSIsROError( 

313 f'Attempt to write to "{self._orphan_safe_path()}" failed:' 

314 " Debputy Virtual File system is R/O." 

315 ) 

316 

317 def lookup(self, path: str) -> Optional["VirtualPathBase"]: 

318 match, missing = self.attempt_lookup(path) 

319 if missing: 

320 return None 

321 return match 

322 

323 def attempt_lookup(self, path: str) -> tuple["VirtualPathBase", list[str]]: 

324 if self.is_detached: 324 ↛ 325line 324 didn't jump to line 325 because the condition on line 324 was never true

325 raise ValueError( 

326 f'Cannot perform lookup via "{self._orphan_safe_path()}": The path is detached' 

327 ) 

328 absolute, must_be_dir, path_parts = _split_path(path) 

329 current = _root(self) if absolute else self 

330 path_parts.reverse() 

331 link_expansions = set() 

332 while path_parts: 

333 dir_part = path_parts.pop() 

334 if dir_part == ".": 

335 continue 

336 if dir_part == "..": 

337 p = current.parent_dir 

338 if p is None: 338 ↛ 339line 338 didn't jump to line 339 because the condition on line 338 was never true

339 raise ValueError(f'The path "{path}" escapes the root dir') 

340 current = p 

341 continue 

342 try: 

343 current = current[dir_part] 

344 except KeyError: 

345 path_parts.append(dir_part) 

346 path_parts.reverse() 

347 if must_be_dir: 

348 path_parts.pop() 

349 return current, path_parts 

350 if current.is_symlink and path_parts: 

351 if current.path in link_expansions: 

352 # This is our loop detection for now. It might have some false positives where you 

353 # could safely resolve the same symlink twice. However, given that this use-case is 

354 # basically non-existent in practice for packaging, we just stop here for now. 

355 raise SymlinkLoopError( 

356 f'The path "{path}" traversed the symlink "{current.path}" multiple' 

357 " times. Currently, traversing the same symlink twice is considered" 

358 " a loop by `debputy` even if the path would eventually resolve." 

359 " Consider filing a feature request if you have a benign case that" 

360 " triggers this error." 

361 ) 

362 link_expansions.add(current.path) 

363 link_target = current.readlink() 

364 link_absolute, _, link_path_parts = _split_path(link_target) 

365 if link_absolute: 

366 current = _root(current) 

367 else: 

368 current = assume_not_none(current.parent_dir) 

369 link_path_parts.reverse() 

370 path_parts.extend(link_path_parts) 

371 return current, [] 

372 

373 def mkdirs(self, path: str) -> "VirtualPath": 

374 current: VirtualPath 

375 current, missing_parts = self.attempt_lookup( 

376 f"{path}/" if not path.endswith("/") else path 

377 ) 

378 if not current.is_dir: 378 ↛ 379line 378 didn't jump to line 379 because the condition on line 378 was never true

379 raise ValueError( 

380 f'mkdirs of "{path}" failed: This would require {current.path} to not exist OR be' 

381 " a directory. However, that path exist AND is a not directory." 

382 ) 

383 for missing_part in missing_parts: 

384 assert missing_part not in (".", "..") 

385 current = current.mkdir(missing_part) 

386 return current 

387 

388 def prune_if_empty_dir(self) -> None: 

389 """Remove this and all (now) empty parent directories 

390 

391 Same as: `rmdir --ignore-fail-on-non-empty --parents` 

392 

393 This operation may cause the path (and any of its parent directories) to become "detached" 

394 and therefore unsafe to use in further operations. 

395 """ 

396 self._rw_check() 

397 

398 if not self.is_dir: 398 ↛ 399line 398 didn't jump to line 399 because the condition on line 398 was never true

399 raise TypeError(f"{self._orphan_safe_path()} is not a directory") 

400 if any(self.iterdir): 

401 return 

402 parent_dir = assume_not_none(self.parent_dir) 

403 

404 # Recursive does not matter; we already know the directory is empty. 

405 self.unlink() 

406 

407 # Note: The root dir must never be deleted. This works because when delegating it to the root 

408 # directory, its implementation of this method is a no-op. If this is later rewritten to an 

409 # inline loop (rather than recursion), be sure to preserve this feature. 

410 parent_dir.prune_if_empty_dir() 

411 

412 def _current_plugin(self) -> str: 

413 if self.is_detached: 413 ↛ 414line 413 didn't jump to line 414 because the condition on line 413 was never true

414 raise TypeError("Cannot resolve the current plugin; path is detached") 

415 current = self 

416 while True: 

417 next_parent = current.parent_dir 

418 if next_parent is None: 

419 break 

420 current = next_parent 

421 assert current is not None 

422 return cast("FSRootDir", current)._current_plugin() 

423 

424 @overload 

425 def open_child( 425 ↛ exitline 425 didn't return from function 'open_child' because

426 self, 

427 name: str, 

428 mode: TextOpenMode = ..., 

429 buffering: int = -1, 

430 ) -> TextIO: ... 

431 

432 @overload 

433 def open_child( 433 ↛ exitline 433 didn't return from function 'open_child' because

434 self, 

435 name: str, 

436 mode: BinaryOpenMode = ..., 

437 buffering: int = -1, 

438 ) -> BinaryIO: ... 

439 

440 @overload 

441 def open_child( 441 ↛ exitline 441 didn't return from function 'open_child' because

442 self, 

443 name: str, 

444 mode: OpenMode = ..., 

445 buffering: int = -1, 

446 ) -> TextIO | BinaryIO: ... 

447 

448 @contextlib.contextmanager 

449 def open_child( 

450 self, 

451 name: str, 

452 mode: OpenMode = "r", 

453 buffering: int = -1, 

454 ) -> TextIO | BinaryIO: 

455 """Open a child path of the current directory in a given mode. Usually used with a context manager 

456 

457 The path is opened according to the `mode` parameter similar to the built-in `open` in Python. 

458 The following symbols are accepted with the same meaning as Python's open: 

459 * `r` 

460 * `w` 

461 * `x` 

462 * `a` 

463 * `+` 

464 * `b` 

465 * `t` 

466 

467 Like Python's `open`, this can create a new file provided the file system is in read-write mode. 

468 Though unlike Python's open, symlinks are not followed and cannot be opened. Any newly created 

469 file will start with (os.stat) mode of 0o0644. The (os.stat) mode of existing paths are left 

470 as-is. 

471 

472 :param name: The name of the child path to open. Must be a basename. 

473 :param mode: The mode to open the file with such as `r` or `w`. See Python's `open` for more 

474 examples. 

475 :param buffering: Same as open(..., buffering=...) where supported. Notably during 

476 testing, the content may be purely in memory and use a BytesIO/StringIO 

477 (which does not accept that parameter, but then it is buffered in a different way) 

478 :return: The file handle. 

479 """ 

480 existing = self.get(name) 

481 if "r" in mode: 481 ↛ 482line 481 didn't jump to line 482 because the condition on line 481 was never true

482 if existing is None: 

483 raise ValueError( 

484 f"Path {self.path}/{name} does not exist and mode had `r`" 

485 ) 

486 if "+" not in mode: 

487 with existing.open(byte_io="b" in mode, buffering=buffering) as fd: 

488 yield fd 

489 

490 encoding = None if "b" in mode else "utf-8" 

491 

492 if existing: 492 ↛ 493line 492 didn't jump to line 493 because the condition on line 492 was never true

493 if "x" in mode: 

494 raise ValueError( 

495 f"Path {existing.path} already exists and mode had `x`" 

496 ) 

497 with ( 

498 existing.replace_fs_path_content() as fs_path, 

499 open(fs_path, mode, encoding=encoding) as fd, 

500 ): 

501 yield fd 

502 else: 

503 assert "r" not in mode 

504 # unlink_if_exists=False as a precaution (the "already exists" should not end up here). 

505 with ( 

506 self.add_file(name, mode=0o0644, unlink_if_exists=False) as new_path, 

507 open(new_path.fs_path, mode, encoding=encoding) as fd, 

508 ): 

509 yield fd 

510 

511 

512class FSPath(VirtualPathBase, ABC): 

513 __slots__ = ( 

514 "_basename", 

515 "_parent_dir", 

516 "_children", 

517 "_path_cache", 

518 "_parent_path_cache", 

519 "_last_known_parent_path", 

520 "_mode", 

521 "_owner", 

522 "_group", 

523 "_mtime", 

524 "_stat_cache", 

525 "_metadata", 

526 "__weakref__", 

527 ) 

528 

529 def __init__( 

530 self, 

531 basename: str, 

532 parent: Optional["FSPath"], 

533 children: dict[str, "FSPath"] | None = None, 

534 initial_mode: int | None = None, 

535 mtime: float | None = None, 

536 stat_cache: os.stat_result | None = None, 

537 ) -> None: 

538 self._basename = basename 

539 self._path_cache: str | None = None 

540 self._parent_path_cache: str | None = None 

541 self._children = children 

542 self._last_known_parent_path: str | None = None 

543 self._mode = initial_mode 

544 self._mtime = mtime 

545 self._stat_cache = stat_cache 

546 self._metadata: dict[tuple[str, type[Any]], PathMetadataValue[Any]] = {} 

547 self._owner = ROOT_DEFINITION 

548 self._group = ROOT_DEFINITION 

549 

550 # The self._parent_dir = None is to create `_parent_dir` because the parent_dir setter calls 

551 # is_orphaned, which assumes self._parent_dir is an attribute. 

552 self._parent_dir: ReferenceType["FSPath"] | None = None 

553 if parent is not None: 

554 self.parent_dir = parent 

555 

556 def __repr__(self) -> str: 

557 return ( 

558 f"{self.__class__.__name__}({self._orphan_safe_path()!r}," 

559 f" is_file={self.is_file}," 

560 f" is_dir={self.is_dir}," 

561 f" is_symlink={self.is_symlink}," 

562 f" has_fs_path={self.has_fs_path}," 

563 f" children_len={len(self._children) if self._children else 0})" 

564 ) 

565 

566 @property 

567 def name(self) -> str: 

568 return self._basename 

569 

570 @name.setter 

571 def name(self, new_name: str) -> None: 

572 self._rw_check() 

573 if new_name == self._basename: 573 ↛ 574line 573 didn't jump to line 574 because the condition on line 573 was never true

574 return 

575 if self.is_detached: 575 ↛ 576line 575 didn't jump to line 576 because the condition on line 575 was never true

576 self._basename = new_name 

577 return 

578 self._rw_check() 

579 parent = self.parent_dir 

580 # This little parent_dir dance ensures the parent dir detects the rename properly 

581 self.parent_dir = None 

582 self._basename = new_name 

583 self.parent_dir = parent 

584 

585 @property 

586 def iterdir(self) -> Iterable["FSPath"]: 

587 if self._children is not None: 

588 yield from self._children.values() 

589 

590 def all_paths(self) -> Iterable["FSPath"]: 

591 yield self 

592 if not self.is_dir: 

593 return 

594 by_basename = BY_BASENAME 

595 stack = sorted(self.iterdir, key=by_basename, reverse=True) 

596 while stack: 

597 current = stack.pop() 

598 yield current 

599 if current.is_dir and not current.is_detached: 

600 stack.extend(sorted(current.iterdir, key=by_basename, reverse=True)) 

601 

602 def walk(self) -> Iterable[tuple["FSPath", list["FSPath"]]]: 

603 # FIXME: can this be more "os.walk"-like without making it harder to implement? 

604 if not self.is_dir: 604 ↛ 605line 604 didn't jump to line 605 because the condition on line 604 was never true

605 yield self, [] 

606 return 

607 by_basename = BY_BASENAME 

608 stack = [self] 

609 while stack: 

610 current = stack.pop() 

611 children = sorted(current.iterdir, key=by_basename) 

612 assert not children or current.is_dir 

613 yield current, children 

614 # Removing the directory counts as discarding the children. 

615 if not current.is_detached: 615 ↛ 609line 615 didn't jump to line 609 because the condition on line 615 was always true

616 stack.extend(reversed(children)) 

617 

618 def _orphan_safe_path(self) -> str: 

619 if not self.is_detached or self._last_known_parent_path is not None: 619 ↛ 621line 619 didn't jump to line 621 because the condition on line 619 was always true

620 return self.path 

621 return f"<orphaned>/{self.name}" 

622 

623 @property 

624 def is_detached(self) -> bool: 

625 parent = self._parent_dir 

626 if parent is None: 

627 return True 

628 resolved_parent = parent() 

629 if resolved_parent is None: 629 ↛ 630line 629 didn't jump to line 630 because the condition on line 629 was never true

630 return True 

631 return resolved_parent.is_detached 

632 

633 # The __getitem__ behaves like __getitem__ from Dict but __iter__ would ideally work like a Sequence. 

634 # However, that does not feel compatible, so lets force people to use .children instead for the Sequence 

635 # behavior to avoid surprises for now. 

636 # (Maybe it is a non-issue, but it is easier to add the API later than to remove it once we have committed 

637 # to using it) 

638 __iter__ = None 

639 

640 def __getitem__(self, key) -> "FSPath": 

641 if self._children is None: 

642 raise KeyError( 

643 f"{key} (note: {self._orphan_safe_path()!r} has no children)" 

644 ) 

645 if isinstance(key, FSPath): 645 ↛ 646line 645 didn't jump to line 646 because the condition on line 645 was never true

646 key = key.name 

647 return self._children[key] 

648 

649 def __delitem__(self, key) -> None: 

650 self._rw_check() 

651 children = self._children 

652 if children is None: 652 ↛ 653line 652 didn't jump to line 653 because the condition on line 652 was never true

653 raise KeyError(key) 

654 del children[key] 

655 

656 def get(self, key: str) -> "Optional[FSPath]": 

657 try: 

658 return self[key] 

659 except KeyError: 

660 return None 

661 

662 def __contains__(self, item: object) -> bool: 

663 if isinstance(item, VirtualPath): 663 ↛ 664line 663 didn't jump to line 664 because the condition on line 663 was never true

664 return item.parent_dir is self 

665 if not isinstance(item, str): 665 ↛ 666line 665 didn't jump to line 666 because the condition on line 665 was never true

666 return False 

667 m = self.get(item) 

668 return m is not None 

669 

670 def _add_child(self, child: "FSPath") -> None: 

671 self._rw_check() 

672 if not self.is_dir: 672 ↛ 673line 672 didn't jump to line 673 because the condition on line 672 was never true

673 raise TypeError(f"{self._orphan_safe_path()!r} is not a directory") 

674 if self._children is None: 

675 self._children = {} 

676 

677 conflict_child = self.get(child.name) 

678 if conflict_child is not None: 678 ↛ 679line 678 didn't jump to line 679 because the condition on line 678 was never true

679 conflict_child.unlink(recursive=True) 

680 self._children[child.name] = child 

681 

682 @property 

683 def tar_path(self) -> str: 

684 path = self.path 

685 if self.is_dir: 

686 return path + "/" 

687 return path 

688 

689 @property 

690 def path(self) -> str: 

691 parent_path = self.parent_dir_path 

692 if ( 

693 self._parent_path_cache is not None 

694 and self._parent_path_cache == parent_path 

695 ): 

696 return assume_not_none(self._path_cache) 

697 if parent_path is None: 697 ↛ 698line 697 didn't jump to line 698 because the condition on line 697 was never true

698 raise ReferenceError( 

699 f"The path {self.name} is detached! {self.__class__.__name__}" 

700 ) 

701 self._parent_path_cache = parent_path 

702 ret = os.path.join(parent_path, self.name) 

703 self._path_cache = ret 

704 return ret 

705 

706 @property 

707 def parent_dir(self) -> Optional["FSPath"]: 

708 p_ref = self._parent_dir 

709 p = p_ref() if p_ref is not None else None 

710 if p is None: 710 ↛ 711line 710 didn't jump to line 711 because the condition on line 710 was never true

711 raise ReferenceError( 

712 f"The path {self.name} is detached! {self.__class__.__name__}" 

713 ) 

714 return p 

715 

716 @parent_dir.setter 

717 def parent_dir(self, new_parent: Optional["FSPath"]) -> None: 

718 self._rw_check() 

719 if new_parent is not None: 

720 if not new_parent.is_dir: 720 ↛ 721line 720 didn't jump to line 721 because the condition on line 720 was never true

721 raise ValueError( 

722 f"The parent {new_parent._orphan_safe_path()} must be a directory" 

723 ) 

724 new_parent._rw_check() 

725 old_parent = None 

726 self._last_known_parent_path = None 

727 if not self.is_detached: 

728 old_parent = self.parent_dir 

729 old_parent_children = assume_not_none(assume_not_none(old_parent)._children) 

730 del old_parent_children[self.name] 

731 if new_parent is not None: 

732 self._parent_dir = ref(new_parent) 

733 new_parent._add_child(self) 

734 else: 

735 if old_parent is not None and not old_parent.is_detached: 735 ↛ 737line 735 didn't jump to line 737 because the condition on line 735 was always true

736 self._last_known_parent_path = old_parent.path 

737 self._parent_dir = None 

738 self._parent_path_cache = None 

739 

740 @property 

741 def parent_dir_path(self) -> str | None: 

742 if self.is_detached: 742 ↛ 743line 742 didn't jump to line 743 because the condition on line 742 was never true

743 return self._last_known_parent_path 

744 return assume_not_none(self.parent_dir).path 

745 

746 def chown( 

747 self, 

748 owner: StaticFileSystemOwner | None, 

749 group: StaticFileSystemGroup | None, 

750 ) -> None: 

751 """Change the owner/group of this path 

752 

753 :param owner: The desired owner definition for this path. If None, then no change of owner is performed. 

754 :param group: The desired group definition for this path. If None, then no change of group is performed. 

755 """ 

756 self._rw_check() 

757 

758 if owner is not None: 

759 self._owner = owner.ownership_definition 

760 if group is not None: 

761 self._group = group.ownership_definition 

762 

763 def stat(self) -> os.stat_result: 

764 st = self._stat_cache 

765 if st is None: 

766 st = self._uncached_stat() 

767 self._stat_cache = st 

768 return st 

769 

770 def _uncached_stat(self) -> os.stat_result: 

771 return os.lstat(self.fs_path) 

772 

773 @property 

774 def mode(self) -> int: 

775 current_mode = self._mode 

776 if current_mode is None: 776 ↛ 777line 776 didn't jump to line 777 because the condition on line 776 was never true

777 current_mode = stat.S_IMODE(self.stat().st_mode) 

778 self._mode = current_mode 

779 return current_mode 

780 

781 @mode.setter 

782 def mode(self, new_mode: int) -> None: 

783 self._rw_check() 

784 min_bit = 0o700 if self.is_dir else 0o400 

785 if (new_mode & min_bit) != min_bit: 785 ↛ 786line 785 didn't jump to line 786 because the condition on line 785 was never true

786 omode = oct(new_mode)[2:] 

787 omin = oct(min_bit)[2:] 

788 raise ValueError( 

789 f'Attempt to set mode of path "{self._orphan_safe_path()}" to {omode} rejected;' 

790 f" Minimum requirements are {omin} (read-bit and, for dirs, exec bit for user)." 

791 " There are no paths that do not need these requirements met and they can cause" 

792 " problems during build or on the final system." 

793 ) 

794 self._mode = new_mode 

795 

796 def _ensure_min_mode(self) -> None: 

797 min_bit = 0o700 if self.is_dir else 0o600 

798 if self.has_fs_path and (self.mode & 0o600) != 0o600: 798 ↛ 799line 798 didn't jump to line 799 because the condition on line 798 was never true

799 try: 

800 fs_path = self.fs_path 

801 except TestPathWithNonExistentFSPathError: 

802 pass 

803 else: 

804 st = os.stat(fs_path) 

805 new_fs_mode = stat.S_IMODE(st.st_mode) | min_bit 

806 _debug_log( 

807 f"Applying chmod {oct(min_bit)[2:]} {fs_path} ({self.path}) to avoid problems down the line" 

808 ) 

809 os.chmod(fs_path, new_fs_mode) 

810 self.mode |= min_bit 

811 

812 @property 

813 def mtime(self) -> float: 

814 mtime = self._mtime 

815 if mtime is None: 

816 mtime = self.stat().st_mtime 

817 self._mtime = mtime 

818 return mtime 

819 

820 @mtime.setter 

821 def mtime(self, new_mtime: float) -> None: 

822 self._rw_check() 

823 self._mtime = new_mtime 

824 

825 @property 

826 def tar_owner_info(self) -> tuple[str, int, str, int]: 

827 owner = self._owner 

828 group = self._group 

829 return ( 

830 owner.entity_name, 

831 owner.entity_id, 

832 group.entity_name, 

833 group.entity_id, 

834 ) 

835 

836 @property 

837 def _can_replace_inline(self) -> bool: 

838 return False 

839 

840 @contextlib.contextmanager 

841 def add_file( 

842 self, 

843 name: str, 

844 *, 

845 unlink_if_exists: bool = True, 

846 use_fs_path_mode: bool = False, 

847 mode: int = 0o0644, 

848 mtime: float | None = None, 

849 # Special-case parameters that are not exposed in the API 

850 fs_basename_matters: bool = False, 

851 subdir_key: str | None = None, 

852 ) -> Iterator["FSPath"]: 

853 if "/" in name or name in {".", ".."}: 853 ↛ 854line 853 didn't jump to line 854 because the condition on line 853 was never true

854 raise ValueError(f'Invalid file name: "{name}"') 

855 if not self.is_dir: 855 ↛ 856line 855 didn't jump to line 856 because the condition on line 855 was never true

856 raise TypeError( 

857 f"Cannot create {self._orphan_safe_path()}/{name}:" 

858 f" {self._orphan_safe_path()} is not a directory" 

859 ) 

860 self._rw_check() 

861 existing = self.get(name) 

862 if existing is not None: 862 ↛ 863line 862 didn't jump to line 863 because the condition on line 862 was never true

863 if not unlink_if_exists: 

864 raise ValueError( 

865 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"' 

866 f" and exist_ok was False" 

867 ) 

868 existing.unlink(recursive=False) 

869 

870 if fs_basename_matters and subdir_key is None: 870 ↛ 871line 870 didn't jump to line 871 because the condition on line 870 was never true

871 raise ValueError( 

872 "When fs_basename_matters is True, a subdir_key must be provided" 

873 ) 

874 

875 directory = generated_content_dir(subdir_key=subdir_key) 

876 

877 if fs_basename_matters: 877 ↛ 878line 877 didn't jump to line 878 because the condition on line 877 was never true

878 fs_path = os.path.join(directory, name) 

879 with open(fs_path, "xb") as _: 

880 # Ensure that the fs_path exists 

881 pass 

882 child = FSBackedFilePath( 

883 name, 

884 self, 

885 fs_path, 

886 replaceable_inline=True, 

887 mtime=mtime, 

888 ) 

889 yield child 

890 else: 

891 with tempfile.NamedTemporaryFile( 

892 dir=directory, suffix=f"__{name}", delete=False 

893 ) as fd: 

894 fs_path = fd.name 

895 child = FSBackedFilePath( 

896 name, 

897 self, 

898 fs_path, 

899 replaceable_inline=True, 

900 mtime=mtime, 

901 ) 

902 fd.close() 

903 yield child 

904 

905 if use_fs_path_mode: 905 ↛ 907line 905 didn't jump to line 907 because the condition on line 905 was never true

906 # Ensure the caller can see the current mode 

907 os.chmod(fs_path, mode) 

908 _check_fs_path_is_file(fs_path, unlink_on_error=child) 

909 child._reset_caches() 

910 if not use_fs_path_mode: 910 ↛ exitline 910 didn't return from function 'add_file' because the condition on line 910 was always true

911 child.mode = mode 

912 

913 def insert_file_from_fs_path( 

914 self, 

915 name: str, 

916 fs_path: str, 

917 *, 

918 exist_ok: bool = True, 

919 use_fs_path_mode: bool = False, 

920 mode: int = 0o0644, 

921 require_copy_on_write: bool = True, 

922 follow_symlinks: bool = True, 

923 reference_path: VirtualPath | None = None, 

924 ) -> "FSPath": 

925 if "/" in name or name in {".", ".."}: 925 ↛ 926line 925 didn't jump to line 926 because the condition on line 925 was never true

926 raise ValueError(f'Invalid file name: "{name}"') 

927 if not self.is_dir: 927 ↛ 928line 927 didn't jump to line 928 because the condition on line 927 was never true

928 raise TypeError( 

929 f"Cannot create {self._orphan_safe_path()}/{name}:" 

930 f" {self._orphan_safe_path()} is not a directory" 

931 ) 

932 self._rw_check() 

933 if name in self and not exist_ok: 933 ↛ 934line 933 didn't jump to line 934 because the condition on line 933 was never true

934 raise ValueError( 

935 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"' 

936 f" and exist_ok was False" 

937 ) 

938 new_fs_path = fs_path 

939 if follow_symlinks: 

940 if reference_path is not None: 940 ↛ 941line 940 didn't jump to line 941 because the condition on line 940 was never true

941 raise ValueError( 

942 "The reference_path cannot be used with follow_symlinks" 

943 ) 

944 new_fs_path = os.path.realpath(new_fs_path, strict=True) 

945 

946 fmode: int | None = mode 

947 if use_fs_path_mode: 

948 fmode = None 

949 

950 st = None 

951 if reference_path is None: 

952 st = os.lstat(new_fs_path) 

953 if stat.S_ISDIR(st.st_mode): 953 ↛ 954line 953 didn't jump to line 954 because the condition on line 953 was never true

954 raise ValueError( 

955 f'The provided path "{fs_path}" is a directory. However, this' 

956 " method does not support directories" 

957 ) 

958 

959 if not stat.S_ISREG(st.st_mode): 959 ↛ 960line 959 didn't jump to line 960 because the condition on line 959 was never true

960 if follow_symlinks: 

961 raise ValueError( 

962 f"The resolved fs_path ({new_fs_path}) was not a file." 

963 ) 

964 raise ValueError(f"The provided fs_path ({fs_path}) was not a file.") 

965 return FSBackedFilePath( 

966 name, 

967 self, 

968 new_fs_path, 

969 initial_mode=fmode, 

970 stat_cache=st, 

971 replaceable_inline=not require_copy_on_write, 

972 reference_path=reference_path, 

973 ) 

974 

975 def add_symlink( 

976 self, 

977 link_name: str, 

978 link_target: str, 

979 *, 

980 reference_path: VirtualPath | None = None, 

981 ) -> "FSPath": 

982 if "/" in link_name or link_name in {".", ".."}: 982 ↛ 983line 982 didn't jump to line 983 because the condition on line 982 was never true

983 raise ValueError( 

984 f'Invalid file name: "{link_name}" (it must be a valid basename)' 

985 ) 

986 if not self.is_dir: 986 ↛ 987line 986 didn't jump to line 987 because the condition on line 986 was never true

987 raise TypeError( 

988 f"Cannot create {self._orphan_safe_path()}/{link_name}:" 

989 f" {self._orphan_safe_path()} is not a directory" 

990 ) 

991 self._rw_check() 

992 

993 existing = self.get(link_name) 

994 if existing: 994 ↛ 996line 994 didn't jump to line 996 because the condition on line 994 was never true

995 # Emulate ln -sf with attempts a non-recursive unlink first. 

996 existing.unlink(recursive=False) 

997 

998 return SymlinkVirtualPath( 

999 link_name, 

1000 self, 

1001 link_target, 

1002 reference_path=reference_path, 

1003 ) 

1004 

1005 def mkdir( 

1006 self, 

1007 name: str, 

1008 *, 

1009 reference_path: VirtualPath | None = None, 

1010 ) -> "FSPath": 

1011 if "/" in name or name in {".", ".."}: 1011 ↛ 1012line 1011 didn't jump to line 1012 because the condition on line 1011 was never true

1012 raise ValueError( 

1013 f'Invalid file name: "{name}" (it must be a valid basename)' 

1014 ) 

1015 if not self.is_dir: 1015 ↛ 1016line 1015 didn't jump to line 1016 because the condition on line 1015 was never true

1016 raise TypeError( 

1017 f"Cannot create {self._orphan_safe_path()}/{name}:" 

1018 f" {self._orphan_safe_path()} is not a directory" 

1019 ) 

1020 if reference_path is not None and not reference_path.is_dir: 1020 ↛ 1021line 1020 didn't jump to line 1021 because the condition on line 1020 was never true

1021 raise ValueError( 

1022 f'The provided fs_path "{reference_path.fs_path}" exist but it is not a directory!' 

1023 ) 

1024 self._rw_check() 

1025 

1026 existing = self.get(name) 

1027 if existing: 1027 ↛ 1028line 1027 didn't jump to line 1028 because the condition on line 1027 was never true

1028 raise ValueError(f"Path {existing.path} already exist") 

1029 return VirtualDirectoryFSPath(name, self, reference_path=reference_path) 

1030 

1031 def mkdirs(self, path: str) -> "FSPath": 

1032 return cast("FSPath", super().mkdirs(path)) 

1033 

1034 @property 

1035 def is_read_write(self) -> bool: 

1036 """When true, the file system entry may be mutated 

1037 

1038 :return: Whether file system mutations are permitted. 

1039 """ 

1040 if self.is_detached: 

1041 return True 

1042 return assume_not_none(self.parent_dir).is_read_write 

1043 

1044 def unlink(self, *, recursive: bool = False) -> None: 

1045 """Unlink a file or a directory 

1046 

1047 This operation will detach the path from the file system (causing "is_detached" to return True). 

1048 

1049 Note that the root directory cannot be deleted. 

1050 

1051 :param recursive: If True, then non-empty directories will be unlinked as well removing everything inside them 

1052 as well. When False, an error is raised if the path is a non-empty directory 

1053 """ 

1054 if self.is_detached: 1054 ↛ 1055line 1054 didn't jump to line 1055 because the condition on line 1054 was never true

1055 return 

1056 if not recursive and any(self.iterdir): 1056 ↛ 1057line 1056 didn't jump to line 1057 because the condition on line 1056 was never true

1057 raise ValueError( 

1058 f'Refusing to unlink "{self.path}": The directory was not empty and recursive was False' 

1059 ) 

1060 # The .parent_dir setter does a _rw_check() for us. 

1061 self.parent_dir = None 

1062 

1063 def _reset_caches(self) -> None: 

1064 self._mtime = None 

1065 self._stat_cache = None 

1066 

1067 def metadata( 

1068 self, 

1069 metadata_type: type[PMT], 

1070 *, 

1071 owning_plugin: str | None = None, 

1072 ) -> PathMetadataReference[PMT]: 

1073 current_plugin = self._current_plugin() 

1074 if owning_plugin is None: 1074 ↛ 1076line 1074 didn't jump to line 1076 because the condition on line 1074 was always true

1075 owning_plugin = current_plugin 

1076 metadata_key = (owning_plugin, metadata_type) 

1077 metadata_value = self._metadata.get(metadata_key) 

1078 if metadata_value is None: 

1079 if self.is_detached: 1079 ↛ 1080line 1079 didn't jump to line 1080 because the condition on line 1079 was never true

1080 raise TypeError( 

1081 f"Cannot access the metadata {metadata_type.__name__}: The path is detached." 

1082 ) 

1083 if not self.is_read_write: 

1084 return AlwaysEmptyReadOnlyMetadataReference( 

1085 owning_plugin, 

1086 current_plugin, 

1087 metadata_type, 

1088 ) 

1089 metadata_value = PathMetadataValue(owning_plugin, metadata_type) 

1090 self._metadata[metadata_key] = metadata_value 

1091 return PathMetadataReferenceImplementation( 

1092 self, 

1093 current_plugin, 

1094 metadata_value, 

1095 ) 

1096 

1097 @contextlib.contextmanager 

1098 def replace_fs_path_content( 

1099 self, 

1100 *, 

1101 use_fs_path_mode: bool = False, 

1102 ) -> Iterator[str]: 

1103 if not self.is_file: 1103 ↛ 1104line 1103 didn't jump to line 1104 because the condition on line 1103 was never true

1104 raise TypeError( 

1105 f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file' 

1106 ) 

1107 self._rw_check() 

1108 fs_path = self.fs_path 

1109 if not self._can_replace_inline: 1109 ↛ 1121line 1109 didn't jump to line 1121 because the condition on line 1109 was always true

1110 fs_path = self.fs_path 

1111 directory = generated_content_dir() 

1112 with tempfile.NamedTemporaryFile( 

1113 dir=directory, suffix=f"__{self.name}", delete=False 

1114 ) as new_path_fd: 

1115 new_path_fd.close() 

1116 _cp_a(fs_path, new_path_fd.name) 

1117 fs_path = new_path_fd.name 

1118 self._replaced_path(fs_path) 

1119 assert self.fs_path == fs_path 

1120 

1121 current_mtime = self._mtime 

1122 if current_mtime is not None: 

1123 os.utime(fs_path, (current_mtime, current_mtime)) 

1124 

1125 current_mode = self.mode 

1126 yield fs_path 

1127 _check_fs_path_is_file(fs_path, unlink_on_error=self) 

1128 if not use_fs_path_mode: 1128 ↛ 1130line 1128 didn't jump to line 1130 because the condition on line 1128 was always true

1129 os.chmod(fs_path, current_mode) 

1130 self._reset_caches() 

1131 

1132 def _replaced_path(self, new_fs_path: str) -> None: 

1133 raise NotImplementedError 

1134 

1135 

1136class VirtualFSPathBase(FSPath, ABC): 

1137 __slots__ = () 

1138 

1139 def __init__( 

1140 self, 

1141 basename: str, 

1142 parent: Optional["FSPath"], 

1143 children: dict[str, "FSPath"] | None = None, 

1144 initial_mode: int | None = None, 

1145 mtime: float | None = None, 

1146 stat_cache: os.stat_result | None = None, 

1147 ) -> None: 

1148 super().__init__( 

1149 basename, 

1150 parent, 

1151 children, 

1152 initial_mode=initial_mode, 

1153 mtime=mtime, 

1154 stat_cache=stat_cache, 

1155 ) 

1156 

1157 @property 

1158 def mtime(self) -> float: 

1159 mtime = self._mtime 

1160 if mtime is None: 

1161 mtime = time.time() 

1162 self._mtime = mtime 

1163 return mtime 

1164 

1165 @property 

1166 def has_fs_path(self) -> bool: 

1167 return False 

1168 

1169 def stat(self) -> os.stat_result: 

1170 if not self.has_fs_path: 

1171 raise PureVirtualPathError( 

1172 "stat() is only applicable to paths backed by the file system. The path" 

1173 f" {self._orphan_safe_path()!r} is purely virtual" 

1174 ) 

1175 return super().stat() 

1176 

1177 @property 

1178 def fs_path(self) -> str: 

1179 if not self.has_fs_path: 

1180 raise PureVirtualPathError( 

1181 "fs_path is only applicable to paths backed by the file system. The path" 

1182 f" {self._orphan_safe_path()!r} is purely virtual" 

1183 ) 

1184 return self.fs_path 

1185 

1186 

1187class FSRootDir(FSPath): 

1188 __slots__ = ("_fs_path", "_fs_read_write", "_plugin_context") 

1189 

1190 def __init__(self, fs_path: str | None = None) -> None: 

1191 self._fs_path = fs_path 

1192 self._fs_read_write = True 

1193 super().__init__( 

1194 ".", 

1195 None, 

1196 children={}, 

1197 initial_mode=0o755, 

1198 ) 

1199 self._plugin_context = CurrentPluginContextManager("debputy") 

1200 

1201 @property 

1202 def is_detached(self) -> bool: 

1203 return False 

1204 

1205 def _orphan_safe_path(self) -> str: 

1206 return self.name 

1207 

1208 @property 

1209 def path(self) -> str: 

1210 return self.name 

1211 

1212 @property 

1213 def parent_dir(self) -> Optional["FSPath"]: 

1214 return None 

1215 

1216 @parent_dir.setter 

1217 def parent_dir(self, new_parent: FSPath | None) -> None: 

1218 if new_parent is not None: 

1219 raise ValueError("The root directory cannot become a non-root directory") 

1220 

1221 @property 

1222 def parent_dir_path(self) -> str | None: 

1223 return None 

1224 

1225 @property 

1226 def is_dir(self) -> bool: 

1227 return True 

1228 

1229 @property 

1230 def is_file(self) -> bool: 

1231 return False 

1232 

1233 @property 

1234 def is_symlink(self) -> bool: 

1235 return False 

1236 

1237 def readlink(self) -> str: 

1238 raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink') 

1239 

1240 @property 

1241 def has_fs_path(self) -> bool: 

1242 return self._fs_path is not None 

1243 

1244 def stat(self) -> os.stat_result: 

1245 if not self.has_fs_path: 

1246 raise PureVirtualPathError( 

1247 "stat() is only applicable to paths backed by the file system. The path" 

1248 f" {self._orphan_safe_path()!r} is purely virtual" 

1249 ) 

1250 return os.stat(self.fs_path) 

1251 

1252 @property 

1253 def fs_path(self) -> str: 

1254 if not self.has_fs_path: 1254 ↛ 1255line 1254 didn't jump to line 1255 because the condition on line 1254 was never true

1255 raise PureVirtualPathError( 

1256 "fs_path is only applicable to paths backed by the file system. The path" 

1257 f" {self._orphan_safe_path()!r} is purely virtual" 

1258 ) 

1259 return assume_not_none(self._fs_path) 

1260 

1261 @property 

1262 def is_read_write(self) -> bool: 

1263 return self._fs_read_write 

1264 

1265 @is_read_write.setter 

1266 def is_read_write(self, new_value: bool) -> None: 

1267 self._fs_read_write = new_value 

1268 

1269 def prune_if_empty_dir(self) -> None: 

1270 # No-op for the root directory. There is never a case where you want to delete this directory 

1271 # (and even if you could, debputy will need it for technical reasons, so the root dir stays) 

1272 return 

1273 

1274 def unlink(self, *, recursive: bool = False) -> None: 

1275 # There is never a case where you want to delete this directory (and even if you could, 

1276 # debputy will need it for technical reasons, so the root dir stays) 

1277 raise TypeError("Cannot delete the root directory") 

1278 

1279 def _current_plugin(self) -> str: 

1280 return self._plugin_context.current_plugin_name 

1281 

1282 @contextlib.contextmanager 

1283 def change_plugin_context(self, new_plugin: str) -> Iterator[str]: 

1284 with self._plugin_context.change_plugin_context(new_plugin) as r: 

1285 yield r 

1286 

1287 

1288class VirtualPathWithReference(VirtualFSPathBase, ABC): 

1289 __slots__ = ("_reference_path",) 

1290 

1291 def __init__( 

1292 self, 

1293 basename: str, 

1294 parent: FSPath, 

1295 *, 

1296 default_mode: int, 

1297 reference_path: VirtualPath | None = None, 

1298 ) -> None: 

1299 super().__init__( 

1300 basename, 

1301 parent=parent, 

1302 initial_mode=reference_path.mode if reference_path else default_mode, 

1303 ) 

1304 self._reference_path = reference_path 

1305 

1306 @property 

1307 def has_fs_path(self) -> bool: 

1308 ref_path = self._reference_path 

1309 return ref_path is not None and ref_path.has_fs_path 

1310 

1311 @property 

1312 def mtime(self) -> float: 

1313 mtime = self._mtime 

1314 if mtime is None: 1314 ↛ 1321line 1314 didn't jump to line 1321 because the condition on line 1314 was always true

1315 ref_path = self._reference_path 

1316 if ref_path: 1316 ↛ 1319line 1316 didn't jump to line 1319 because the condition on line 1316 was always true

1317 mtime = ref_path.mtime 

1318 else: 

1319 mtime = super().mtime 

1320 self._mtime = mtime 

1321 return mtime 

1322 

1323 @mtime.setter 

1324 def mtime(self, new_mtime: float) -> None: 

1325 self._rw_check() 

1326 self._mtime = new_mtime 

1327 

1328 @property 

1329 def fs_path(self) -> str: 

1330 ref_path = self._reference_path 

1331 if ref_path is not None and ( 1331 ↛ 1335line 1331 didn't jump to line 1335 because the condition on line 1331 was always true

1332 not super().has_fs_path or super().fs_path == ref_path.fs_path 

1333 ): 

1334 return ref_path.fs_path 

1335 return super().fs_path 

1336 

1337 def stat(self) -> os.stat_result: 

1338 ref_path = self._reference_path 

1339 if ref_path is not None and ( 

1340 not super().has_fs_path or super().fs_path == ref_path.fs_path 

1341 ): 

1342 return ref_path.stat() 

1343 return super().stat() 

1344 

1345 def open( 

1346 self, 

1347 *, 

1348 byte_io: bool = False, 

1349 buffering: int = -1, 

1350 ) -> TextIO | BinaryIO: 

1351 reference_path = self._reference_path 

1352 if reference_path is not None and reference_path.fs_path == self.fs_path: 

1353 return reference_path.open(byte_io=byte_io, buffering=buffering) 

1354 return super().open(byte_io=byte_io, buffering=buffering) 

1355 

1356 

1357class VirtualDirectoryFSPath(VirtualPathWithReference): 

1358 __slots__ = ("_reference_path",) 

1359 

1360 def __init__( 

1361 self, 

1362 basename: str, 

1363 parent: FSPath, 

1364 *, 

1365 reference_path: VirtualPath | None = None, 

1366 ) -> None: 

1367 super().__init__( 

1368 basename, 

1369 parent, 

1370 reference_path=reference_path, 

1371 default_mode=0o755, 

1372 ) 

1373 self._reference_path = reference_path 

1374 assert reference_path is None or reference_path.is_dir 

1375 self._ensure_min_mode() 

1376 

1377 @property 

1378 def is_dir(self) -> bool: 

1379 return True 

1380 

1381 @property 

1382 def is_file(self) -> bool: 

1383 return False 

1384 

1385 @property 

1386 def is_symlink(self) -> bool: 

1387 return False 

1388 

1389 def readlink(self) -> str: 

1390 raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink') 

1391 

1392 

1393class SymlinkVirtualPath(VirtualPathWithReference): 

1394 __slots__ = ("_link_target",) 

1395 

1396 def __init__( 

1397 self, 

1398 basename: str, 

1399 parent_dir: FSPath, 

1400 link_target: str, 

1401 *, 

1402 reference_path: VirtualPath | None = None, 

1403 ) -> None: 

1404 super().__init__( 

1405 basename, 

1406 parent=parent_dir, 

1407 default_mode=_SYMLINK_MODE, 

1408 reference_path=reference_path, 

1409 ) 

1410 self._link_target = link_target 

1411 

1412 @property 

1413 def is_dir(self) -> bool: 

1414 return False 

1415 

1416 @property 

1417 def is_file(self) -> bool: 

1418 return False 

1419 

1420 @property 

1421 def is_symlink(self) -> bool: 

1422 return True 

1423 

1424 def readlink(self) -> str: 

1425 return self._link_target 

1426 

1427 @property 

1428 def size(self) -> int: 

1429 return len(self.readlink()) 

1430 

1431 

1432class FSBackedFilePath(VirtualPathWithReference): 

1433 __slots__ = ("_fs_path", "_replaceable_inline") 

1434 

1435 def __init__( 

1436 self, 

1437 basename: str, 

1438 parent_dir: FSPath, 

1439 fs_path: str, 

1440 *, 

1441 replaceable_inline: bool = False, 

1442 initial_mode: int | None = None, 

1443 mtime: float | None = None, 

1444 stat_cache: os.stat_result | None = None, 

1445 reference_path: VirtualPath | None = None, 

1446 ) -> None: 

1447 super().__init__( 

1448 basename, 

1449 parent_dir, 

1450 default_mode=0o644, 

1451 reference_path=reference_path, 

1452 ) 

1453 self._fs_path = fs_path 

1454 self._replaceable_inline = replaceable_inline 

1455 if initial_mode is not None: 

1456 self.mode = initial_mode 

1457 if mtime is not None: 

1458 self._mtime = mtime 

1459 self._stat_cache = stat_cache 

1460 assert ( 

1461 not replaceable_inline or "debputy/scratch-dir/" in fs_path 

1462 ), f"{fs_path} should not be inline-replaceable -- {self.path}" 

1463 self._ensure_min_mode() 

1464 

1465 @property 

1466 def is_dir(self) -> bool: 

1467 return False 

1468 

1469 @property 

1470 def is_file(self) -> bool: 

1471 return True 

1472 

1473 @property 

1474 def is_symlink(self) -> bool: 

1475 return False 

1476 

1477 def readlink(self) -> str: 

1478 raise TypeError(f'"{self._orphan_safe_path()!r}" is a file; not a symlink') 

1479 

1480 @property 

1481 def has_fs_path(self) -> bool: 

1482 return True 

1483 

1484 @property 

1485 def fs_path(self) -> str: 

1486 return self._fs_path 

1487 

1488 @property 

1489 def _can_replace_inline(self) -> bool: 

1490 return self._replaceable_inline 

1491 

1492 def _replaced_path(self, new_fs_path: str) -> None: 

1493 self._fs_path = new_fs_path 

1494 self._reference_path = None 

1495 self._replaceable_inline = True 

1496 

1497 

1498_SYMLINK_MODE = 0o777 

1499 

1500 

1501class VirtualTestPath(FSPath): 

1502 __slots__ = ( 

1503 "_path_type", 

1504 "_has_fs_path", 

1505 "_fs_path", 

1506 "_link_target", 

1507 "_content", 

1508 "_materialized_content", 

1509 ) 

1510 

1511 def __init__( 

1512 self, 

1513 basename: str, 

1514 parent_dir: FSPath | None, 

1515 mode: int | None = None, 

1516 mtime: float | None = None, 

1517 is_dir: bool = False, 

1518 has_fs_path: bool | None = False, 

1519 fs_path: str | None = None, 

1520 link_target: str | None = None, 

1521 content: str | None = None, 

1522 materialized_content: str | None = None, 

1523 ) -> None: 

1524 if is_dir: 

1525 self._path_type = PathType.DIRECTORY 

1526 elif link_target is not None: 

1527 self._path_type = PathType.SYMLINK 

1528 if mode is not None and mode != _SYMLINK_MODE: 1528 ↛ 1529line 1528 didn't jump to line 1529 because the condition on line 1528 was never true

1529 raise ValueError( 

1530 f'Please do not assign a mode to symlinks. Triggered for "{basename}".' 

1531 ) 

1532 assert mode is None or mode == _SYMLINK_MODE 

1533 else: 

1534 self._path_type = PathType.FILE 

1535 

1536 if mode is not None: 

1537 initial_mode = mode 

1538 else: 

1539 initial_mode = 0o755 if is_dir else 0o644 

1540 

1541 self._link_target = link_target 

1542 if has_fs_path is None: 

1543 has_fs_path = bool(fs_path) 

1544 self._has_fs_path = has_fs_path 

1545 self._fs_path = fs_path 

1546 self._materialized_content = materialized_content 

1547 super().__init__( 

1548 basename, 

1549 parent=parent_dir, 

1550 initial_mode=initial_mode, 

1551 mtime=mtime, 

1552 ) 

1553 self._content = content 

1554 

1555 @property 

1556 def is_dir(self) -> bool: 

1557 return self._path_type == PathType.DIRECTORY 

1558 

1559 @property 

1560 def is_file(self) -> bool: 

1561 return self._path_type == PathType.FILE 

1562 

1563 @property 

1564 def is_symlink(self) -> bool: 

1565 return self._path_type == PathType.SYMLINK 

1566 

1567 def readlink(self) -> str: 

1568 if not self.is_symlink: 1568 ↛ 1569line 1568 didn't jump to line 1569 because the condition on line 1568 was never true

1569 raise TypeError(f"readlink is only valid for symlinks ({self.path!r})") 

1570 link_target = self._link_target 

1571 assert link_target is not None 

1572 return link_target 

1573 

1574 @property 

1575 def mtime(self) -> float: 

1576 if self._mtime is None: 

1577 self._mtime = time.time() 

1578 return self._mtime 

1579 

1580 @mtime.setter 

1581 def mtime(self, new_mtime: float) -> None: 

1582 self._rw_check() 

1583 self._mtime = new_mtime 

1584 

1585 @property 

1586 def has_fs_path(self) -> bool: 

1587 return self._has_fs_path 

1588 

1589 def stat(self) -> os.stat_result: 

1590 if self.has_fs_path: 

1591 path = self.fs_path 

1592 if path is None: 1592 ↛ 1593line 1592 didn't jump to line 1593 because the condition on line 1592 was never true

1593 raise PureVirtualPathError( 

1594 f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path" 

1595 " cannot provide!" 

1596 ) 

1597 try: 

1598 return os.stat(path) 

1599 except FileNotFoundError as e: 

1600 raise PureVirtualPathError( 

1601 f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path" 

1602 " cannot provide! (An fs_path was provided, but it did not exist)" 

1603 ) from e 

1604 

1605 raise PureVirtualPathError( 

1606 "stat() is only applicable to paths backed by the file system. The path" 

1607 f" {self._orphan_safe_path()!r} is purely virtual" 

1608 ) 

1609 

1610 @property 

1611 def size(self) -> int: 

1612 if self._content is not None: 

1613 return len(self._content.encode("utf-8")) 

1614 if self.is_symlink: 

1615 return len(self.readlink()) 

1616 if not self.has_fs_path or self.fs_path is None: 

1617 return 0 

1618 return self.stat().st_size 

1619 

1620 @property 

1621 def fs_path(self) -> str: 

1622 if self.has_fs_path: 

1623 if self._fs_path is None and self._materialized_content is not None: 

1624 with tempfile.NamedTemporaryFile( 

1625 mode="w+t", 

1626 encoding="utf-8", 

1627 suffix=f"__{self.name}", 

1628 delete=False, 

1629 ) as fd: 

1630 filepath = fd.name 

1631 fd.write(self._materialized_content) 

1632 self._fs_path = filepath 

1633 atexit.register(lambda: os.unlink(filepath)) 

1634 

1635 path = self._fs_path 

1636 if path is None: 1636 ↛ 1637line 1636 didn't jump to line 1637 because the condition on line 1636 was never true

1637 raise PureVirtualPathError( 

1638 f"The test wants a real file system entry of {self._orphan_safe_path()!r}, which this " 

1639 " mock path cannot provide!" 

1640 ) 

1641 return path 

1642 raise PureVirtualPathError( 

1643 "fs_path is only applicable to paths backed by the file system. The path" 

1644 f" {self._orphan_safe_path()!r} is purely virtual" 

1645 ) 

1646 

1647 def replace_fs_path_content( 

1648 self, 

1649 *, 

1650 use_fs_path_mode: bool = False, 

1651 ) -> ContextManager[str]: 

1652 if self._content is not None: 1652 ↛ 1653line 1652 didn't jump to line 1653 because the condition on line 1652 was never true

1653 raise TypeError( 

1654 f"The `replace_fs_path_content()` method was called on {self.path}. Said path was" 

1655 " created with `content` but for this method to work, the path should have been" 

1656 " created with `materialized_content`" 

1657 ) 

1658 return super().replace_fs_path_content(use_fs_path_mode=use_fs_path_mode) 

1659 

1660 @contextlib.contextmanager 

1661 def open_child( 

1662 self, 

1663 name: str, 

1664 mode: OpenMode = "r", 

1665 buffering: int = -1, 

1666 ) -> TextIO | BinaryIO: 

1667 existing = self.get(name) 

1668 if existing or "r" in mode: 

1669 with super().open_child(name, mode, buffering=buffering) as fd: 

1670 yield fd 

1671 return 

1672 if "b" in mode: 

1673 fd = io.BytesIO(b"") 

1674 yield fd 

1675 content = fd.getvalue().decode("utf-8") 

1676 else: 

1677 fd = io.StringIO("") 

1678 yield fd 

1679 content = fd.getvalue() 

1680 VirtualTestPath( 

1681 name, 

1682 self, 

1683 mode=0o644, 

1684 content=content, 

1685 has_fs_path=True, 

1686 ) 

1687 

1688 def open( 

1689 self, 

1690 *, 

1691 byte_io: bool = False, 

1692 buffering: int = -1, 

1693 ) -> TextIO | BinaryIO: 

1694 if self._content is None: 

1695 try: 

1696 return super().open(byte_io=byte_io, buffering=buffering) 

1697 except FileNotFoundError as e: 

1698 raise TestPathWithNonExistentFSPathError( 

1699 f"The test path {self.path} had an fs_path {self._fs_path}, which does not" 

1700 " exist. This exception can only occur in the testsuite. Either have the" 

1701 " test provide content for the path (`virtual_path_def(..., content=...) or," 

1702 " if that is too painful in general, have the code accept this error as a " 

1703 " test only-case and provide a default." 

1704 ) from e 

1705 

1706 if byte_io: 

1707 return io.BytesIO(self._content.encode("utf-8")) 

1708 return io.StringIO(self._content) 

1709 

1710 def _replaced_path(self, new_fs_path: str) -> None: 

1711 self._fs_path = new_fs_path 

1712 

1713 

1714class OSFSOverlayBase(VirtualPathBase, Generic[FSP]): 

1715 __slots__ = ( 

1716 "_path", 

1717 "_fs_path", 

1718 "_parent", 

1719 "__weakref__", 

1720 ) 

1721 

1722 def __init__( 

1723 self, 

1724 path: str, 

1725 fs_path: str, 

1726 parent: FSP | None, 

1727 ) -> None: 

1728 self._path: str = path 

1729 prefix = "/" if fs_path.startswith("/") else "" 

1730 self._fs_path: str = prefix + _normalize_path(fs_path, with_prefix=False) 

1731 self._parent: ReferenceType[FSP] | None = ( 

1732 ref(parent) if parent is not None else None 

1733 ) 

1734 

1735 @property 

1736 def name(self) -> str: 

1737 return os.path.basename(self._path) 

1738 

1739 @property 

1740 def path(self) -> str: 

1741 return self._path 

1742 

1743 @property 

1744 def parent_dir(self) -> Optional["FSP"]: 

1745 parent = self._parent 

1746 if parent is None: 

1747 return None 

1748 resolved = parent() 

1749 if resolved is None: 

1750 raise RuntimeError("Parent was garbage collected!") 

1751 return resolved 

1752 

1753 @property 

1754 def fs_path(self) -> str: 

1755 return self._fs_path 

1756 

1757 def stat(self) -> os.stat_result: 

1758 return os.lstat(self.fs_path) 

1759 

1760 @property 

1761 def is_dir(self) -> bool: 

1762 # The root path can have a non-existent fs_path (such as d/tmp not always existing) 

1763 try: 

1764 return stat.S_ISDIR(self.stat().st_mode) 

1765 except FileNotFoundError: 

1766 return False 

1767 

1768 @property 

1769 def is_file(self) -> bool: 

1770 # The root path can have a non-existent fs_path (such as d/tmp not always existing) 

1771 try: 

1772 return stat.S_ISREG(self.stat().st_mode) 

1773 except FileNotFoundError: 

1774 return False 

1775 

1776 @property 

1777 def is_symlink(self) -> bool: 

1778 # The root path can have a non-existent fs_path (such as d/tmp not always existing) 

1779 try: 

1780 return stat.S_ISLNK(self.stat().st_mode) 

1781 except FileNotFoundError: 

1782 return False 

1783 

1784 @property 

1785 def has_fs_path(self) -> bool: 

1786 return True 

1787 

1788 def open( 

1789 self, 

1790 *, 

1791 byte_io: bool = False, 

1792 buffering: int = -1, 

1793 ) -> TextIO | BinaryIO: 

1794 # Allow symlinks for open here, because we can let the OS resolve the symlink reliably in this 

1795 # case. 

1796 if not self.is_file and not self.is_symlink: 

1797 raise TypeError( 

1798 f"Cannot open {self.path} for reading: It is not a file nor a symlink" 

1799 ) 

1800 

1801 if byte_io: 

1802 return open(self.fs_path, "rb", buffering=buffering) 

1803 return open(self.fs_path, encoding="utf-8", buffering=buffering) 

1804 

1805 def metadata( 

1806 self, 

1807 metadata_type: type[PMT], 

1808 *, 

1809 owning_plugin: str | None = None, 

1810 ) -> PathMetadataReference[PMT]: 

1811 current_plugin = self._current_plugin() 

1812 if owning_plugin is None: 

1813 owning_plugin = current_plugin 

1814 return AlwaysEmptyReadOnlyMetadataReference( 

1815 owning_plugin, 

1816 current_plugin, 

1817 metadata_type, 

1818 ) 

1819 

1820 def all_paths(self) -> Iterable["OSFSControlPath"]: 

1821 yield self 

1822 if not self.is_dir: 

1823 return 

1824 stack = list(self.iterdir) 

1825 stack.reverse() 

1826 while stack: 

1827 current = stack.pop() 

1828 yield current 

1829 if current.is_dir: 

1830 stack.extend(reversed(list(current.iterdir))) 

1831 

1832 def _resolve_children( 

1833 self, new_child: Callable[[str, str, FSP], FSC] 

1834 ) -> Mapping[str, FSC]: 

1835 if not self.is_dir: 

1836 return {} 

1837 dir_path = self.path 

1838 dir_fs_path = self.fs_path 

1839 children = {} 

1840 for name in sorted(os.listdir(dir_fs_path), key=os.path.basename): 

1841 child_path = os.path.join(dir_path, name) if dir_path != "." else name 

1842 child_fs_path = ( 

1843 os.path.join(dir_fs_path, name) if dir_fs_path != "." else name 

1844 ) 

1845 children[name] = new_child( 

1846 child_path, 

1847 child_fs_path, 

1848 self, 

1849 ) 

1850 return children 

1851 

1852 

1853class OSFSROOverlay(OSFSOverlayBase["FSROOverlay"]): 

1854 __slots__ = ( 

1855 "_stat_cache", 

1856 "_readlink_cache", 

1857 "_children", 

1858 "_stat_failed_cache", 

1859 ) 

1860 

1861 def __init__( 

1862 self, 

1863 path: str, 

1864 fs_path: str, 

1865 parent: Optional["OSFSROOverlay"], 

1866 ) -> None: 

1867 super().__init__(path, fs_path, parent=parent) 

1868 self._stat_cache: os.stat_result | None = None 

1869 self._readlink_cache: str | None = None 

1870 self._stat_failed_cache = False 

1871 self._children: Mapping[str, OSFSROOverlay] | None = None 

1872 

1873 @classmethod 

1874 def create_root_dir(cls, path: str, fs_path: str) -> "OSFSROOverlay": 

1875 return OSFSROOverlay(path, fs_path, None) 

1876 

1877 @property 

1878 def iterdir(self) -> Iterable["OSFSROOverlay"]: 

1879 if not self.is_dir: 

1880 return 

1881 if self._children is None: 

1882 self._ensure_children_are_resolved() 

1883 yield from assume_not_none(self._children).values() 

1884 

1885 def lookup(self, path: str) -> Optional["OSFSROOverlay"]: 

1886 if not self.is_dir: 

1887 return None 

1888 if self._children is None: 

1889 self._ensure_children_are_resolved() 

1890 

1891 absolute, _, path_parts = _split_path(path) 

1892 current = cast("FSROOverlay", _root(self)) if absolute else self 

1893 for no, dir_part in enumerate(path_parts): 

1894 if dir_part == ".": 

1895 continue 

1896 if dir_part == "..": 

1897 p = current.parent_dir 

1898 if p is None: 

1899 raise ValueError(f'The path "{path}" escapes the root dir') 

1900 current = cast("FSROOverlay", p) 

1901 continue 

1902 try: 

1903 current = current[dir_part] 

1904 except KeyError: 

1905 return None 

1906 return current 

1907 

1908 def _ensure_children_are_resolved(self) -> None: 

1909 if not self.is_dir or self._children: 

1910 return 

1911 self._children = self._resolve_children( 

1912 lambda n, fsp, p: OSFSROOverlay(n, fsp, p) 

1913 ) 

1914 

1915 @property 

1916 def is_detached(self) -> bool: 

1917 return False 

1918 

1919 def __getitem__(self, key) -> "VirtualPath": 

1920 if not self.is_dir: 1920 ↛ 1922line 1920 didn't jump to line 1922 because the condition on line 1920 was always true

1921 raise KeyError(key) 

1922 if self._children is None: 

1923 self._ensure_children_are_resolved() 

1924 if isinstance(key, FSPath): 

1925 key = key.name 

1926 return self._children[key] 

1927 

1928 def __delitem__(self, key) -> Never: 

1929 self._error_ro_fs() 

1930 

1931 @property 

1932 def is_read_write(self) -> bool: 

1933 return False 

1934 

1935 def _rw_check(self) -> Never: 

1936 self._error_ro_fs() 

1937 

1938 def _error_ro_fs(self) -> Never: 

1939 raise DebputyFSIsROError( 

1940 f'Attempt to write to "{self.path}" failed:' 

1941 " Debputy Virtual File system is R/O." 

1942 ) 

1943 

1944 def stat(self) -> os.stat_result: 

1945 if self._stat_failed_cache: 1945 ↛ 1946line 1945 didn't jump to line 1946 because the condition on line 1945 was never true

1946 raise FileNotFoundError( 

1947 errno.ENOENT, os.strerror(errno.ENOENT), self.fs_path 

1948 ) 

1949 

1950 if self._stat_cache is None: 1950 ↛ 1956line 1950 didn't jump to line 1956 because the condition on line 1950 was always true

1951 try: 

1952 self._stat_cache = os.lstat(self.fs_path) 

1953 except FileNotFoundError: 

1954 self._stat_failed_cache = True 

1955 raise 

1956 return self._stat_cache 

1957 

1958 @property 

1959 def mode(self) -> int: 

1960 return stat.S_IMODE(self.stat().st_mode) 

1961 

1962 @mode.setter 

1963 def mode(self, _unused: int) -> Never: 

1964 self._error_ro_fs() 

1965 

1966 @property 

1967 def mtime(self) -> float: 

1968 return self.stat().st_mtime 

1969 

1970 @mtime.setter 

1971 def mtime(self, new_mtime: float) -> Never: 

1972 self._error_ro_fs() 

1973 

1974 def readlink(self) -> str: 

1975 if not self.is_symlink: 

1976 raise TypeError(f"readlink is only valid for symlinks ({self.path!r})") 

1977 if self._readlink_cache is None: 

1978 self._readlink_cache = os.readlink(self.fs_path) 

1979 return self._readlink_cache 

1980 

1981 def chown( 

1982 self, 

1983 owner: StaticFileSystemOwner | None, 

1984 group: StaticFileSystemGroup | None, 

1985 ) -> Never: 

1986 self._error_ro_fs() 

1987 

1988 def mkdir(self, name: str) -> Never: 

1989 self._error_ro_fs() 

1990 

1991 def add_file( 

1992 self, 

1993 name: str, 

1994 *, 

1995 unlink_if_exists: bool = True, 

1996 use_fs_path_mode: bool = False, 

1997 mode: int = 0o0644, 

1998 mtime: float | None = None, 

1999 ) -> Never: 

2000 self._error_ro_fs() 

2001 

2002 def add_symlink(self, link_name: str, link_target: str) -> Never: 

2003 self._error_ro_fs() 

2004 

2005 def unlink(self, *, recursive: bool = False) -> Never: 

2006 self._error_ro_fs() 

2007 

2008 

2009class OSFSROOverlayRootDir(OSFSROOverlay): 

2010 __slots__ = ("_plugin_context",) 

2011 

2012 def __init__(self, path: str, fs_path: str) -> None: 

2013 super().__init__(path, fs_path, None) 

2014 self._plugin_context = CurrentPluginContextManager("debputy") 

2015 

2016 def _current_plugin(self) -> str: 

2017 return self._plugin_context.current_plugin_name 

2018 

2019 @contextlib.contextmanager 

2020 def change_plugin_context(self, new_plugin: str) -> Iterator[str]: 

2021 with self._plugin_context.change_plugin_context(new_plugin) as r: 

2022 yield r 

2023 

2024 

2025class OSFSControlPath(OSFSOverlayBase["FSControlPath"]): 

2026 

2027 @property 

2028 def iterdir(self) -> Iterable["OSFSControlPath"]: 

2029 if not self.is_dir: 

2030 return 

2031 yield from self._resolve_children( 

2032 lambda n, fsp, p: OSFSControlPath(n, fsp, p) 

2033 ).values() 

2034 

2035 def lookup(self, path: str) -> Optional["OSFSControlPath"]: 

2036 if not self.is_dir: 

2037 return None 

2038 

2039 absolute, _, path_parts = _split_path(path) 

2040 current = cast("FSControlPath", _root(self)) if absolute else self 

2041 for no, dir_part in enumerate(path_parts): 

2042 if dir_part == ".": 

2043 continue 

2044 if dir_part == "..": 

2045 p = current.parent_dir 

2046 if p is None: 

2047 raise ValueError(f'The path "{path}" escapes the root dir') 

2048 current = cast("FSControlPath", p) 

2049 continue 

2050 try: 

2051 current = current[dir_part] 

2052 except KeyError: 

2053 return None 

2054 return current 

2055 

2056 @property 

2057 def is_detached(self) -> bool: 

2058 try: 

2059 self.stat() 

2060 except FileNotFoundError: 

2061 return True 

2062 else: 

2063 return False 

2064 

2065 def __getitem__(self, key) -> "VirtualPath": 

2066 if not self.is_dir: 

2067 raise KeyError(key) 

2068 children = self._resolve_children(lambda n, fsp, p: OSFSControlPath(n, fsp, p)) 

2069 if isinstance(key, FSPath): 

2070 key = key.name 

2071 return children[key] 

2072 

2073 def __delitem__(self, key) -> None: 

2074 self[key].unlink() 

2075 

2076 @property 

2077 def is_read_write(self) -> bool: 

2078 return True 

2079 

2080 @property 

2081 def mode(self) -> int: 

2082 return stat.S_IMODE(self.stat().st_mode) 

2083 

2084 @mode.setter 

2085 def mode(self, new_mode: int) -> None: 

2086 os.chmod(self.fs_path, new_mode) 

2087 

2088 @property 

2089 def mtime(self) -> float: 

2090 return self.stat().st_mtime 

2091 

2092 @mtime.setter 

2093 def mtime(self, new_mtime: float) -> None: 

2094 os.utime(self.fs_path, (new_mtime, new_mtime)) 

2095 

2096 def readlink(self) -> Never: 

2097 if not self.is_symlink: 

2098 raise TypeError(f"readlink is only valid for symlinks ({self.path!r})") 

2099 assert False 

2100 

2101 def chown( 

2102 self, 

2103 owner: StaticFileSystemOwner | None, 

2104 group: StaticFileSystemGroup | None, 

2105 ) -> None: 

2106 raise ValueError( 

2107 "No need to chown paths in the control.tar: They are always root:root" 

2108 ) 

2109 

2110 def mkdir(self, name: str) -> Never: 

2111 raise TypeError("The control.tar never contains subdirectories.") 

2112 

2113 @contextlib.contextmanager 

2114 def add_file( 

2115 self, 

2116 name: str, 

2117 *, 

2118 unlink_if_exists: bool = True, 

2119 use_fs_path_mode: bool = False, 

2120 mode: int = 0o0644, 

2121 mtime: float | None = None, 

2122 ) -> ContextManager["VirtualPath"]: 

2123 if "/" in name or name in {".", ".."}: 

2124 raise ValueError(f'Invalid file name: "{name}"') 

2125 if not self.is_dir: 

2126 raise TypeError( 

2127 f"Cannot create {self._orphan_safe_path()}/{name}:" 

2128 f" {self._orphan_safe_path()} is not a directory" 

2129 ) 

2130 self._rw_check() 

2131 existing = self.get(name) 

2132 if existing is not None: 

2133 if not unlink_if_exists: 

2134 raise ValueError( 

2135 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"' 

2136 f" and exist_ok was False" 

2137 ) 

2138 assert existing.is_file 

2139 

2140 fs_path = os.path.join(self.fs_path, name) 

2141 # This truncates the existing file if any, so we do not have to unlink the previous entry. 

2142 with open(fs_path, "wb") as fd: 

2143 # Ensure that the fs_path exists and default mode is reasonable 

2144 os.chmod(fd.fileno(), mode) 

2145 child = OSFSControlPath( 

2146 name, 

2147 fs_path, 

2148 self, 

2149 ) 

2150 yield child 

2151 _check_fs_path_is_file(fs_path, unlink_on_error=child) 

2152 child.mode = mode 

2153 

2154 @contextlib.contextmanager 

2155 def replace_fs_path_content( 

2156 self, 

2157 *, 

2158 use_fs_path_mode: bool = False, 

2159 ) -> ContextManager[str]: 

2160 if not self.is_file: 

2161 raise TypeError( 

2162 f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file' 

2163 ) 

2164 restore_mode = self.mode if use_fs_path_mode else None 

2165 yield self.fs_path 

2166 _check_fs_path_is_file(self.fs_path, self) 

2167 if restore_mode is not None: 

2168 self.mode = restore_mode 

2169 

2170 def add_symlink(self, link_name: str, link_target: str) -> Never: 

2171 raise TypeError("The control.tar never contains symlinks.") 

2172 

2173 def unlink(self, *, recursive: bool = False) -> None: 

2174 if self._parent is None: 

2175 return 

2176 # By virtue of the control FS only containing paths, we can assume `recursive` never 

2177 # matters and that `os.unlink` will be sufficient. 

2178 assert self.is_file 

2179 os.unlink(self.fs_path) 

2180 

2181 

2182class FSControlRootDir(OSFSControlPath): 

2183 

2184 @classmethod 

2185 def create_root_dir(cls, fs_path: str) -> "FSControlRootDir": 

2186 return FSControlRootDir(".", fs_path, None) 

2187 

2188 def insert_file_from_fs_path( 

2189 self, 

2190 name: str, 

2191 fs_path: str, 

2192 *, 

2193 exist_ok: bool = True, 

2194 use_fs_path_mode: bool = False, 

2195 mode: int = 0o0644, 

2196 # Ignored, but accepted for compat with FSPath's variant of this function. 

2197 # - This is used by install_or_generate_conffiles. 

2198 reference_path: VirtualPath | None = None, # noqa 

2199 ) -> "OSFSControlPath": 

2200 if "/" in name or name in {".", ".."}: 

2201 raise ValueError(f'Invalid file name: "{name}"') 

2202 if not self.is_dir: 

2203 raise TypeError( 

2204 f"Cannot create {self._orphan_safe_path()}/{name}:" 

2205 f" {self._orphan_safe_path()} is not a directory" 

2206 ) 

2207 self._rw_check() 

2208 if name in self and not exist_ok: 

2209 raise ValueError( 

2210 f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"' 

2211 f" and exist_ok was False" 

2212 ) 

2213 

2214 target_path = os.path.join(self.fs_path, name) 

2215 if use_fs_path_mode: 

2216 shutil.copymode( 

2217 fs_path, 

2218 target_path, 

2219 follow_symlinks=True, 

2220 ) 

2221 else: 

2222 shutil.copyfile( 

2223 fs_path, 

2224 target_path, 

2225 follow_symlinks=True, 

2226 ) 

2227 os.chmod(target_path, mode) 

2228 return cast("FSControlPath", self[name]) 

2229 

2230 

2231def as_path_def(pd: str | PathDef) -> PathDef: 

2232 return PathDef(pd) if isinstance(pd, str) else pd 

2233 

2234 

2235def as_path_defs(paths: Iterable[str | PathDef]) -> Iterable[PathDef]: 

2236 yield from (as_path_def(p) for p in paths) 

2237 

2238 

2239def build_virtual_fs( 

2240 paths: Iterable[str | PathDef], 

2241 read_write_fs: bool = False, 

2242) -> "FSPath": 

2243 root_dir: FSRootDir | None = None 

2244 directories: dict[str, FSPath] = {} 

2245 non_directories = set() 

2246 

2247 def _ensure_parent_dirs(p: str) -> None: 

2248 current = p.rstrip("/") 

2249 missing_dirs = [] 

2250 while True: 

2251 current = os.path.dirname(current) 

2252 if current in directories: 

2253 break 

2254 if current in non_directories: 2254 ↛ 2255line 2254 didn't jump to line 2255 because the condition on line 2254 was never true

2255 raise ValueError( 

2256 f'Conflicting definition for "{current}". The path "{p}" wants it as a directory,' 

2257 ' but it is defined as a non-directory. (Ensure dirs end with "/")' 

2258 ) 

2259 missing_dirs.append(current) 

2260 for dir_path in reversed(missing_dirs): 

2261 parent_dir = directories[os.path.dirname(dir_path)] 

2262 d = VirtualTestPath(os.path.basename(dir_path), parent_dir, is_dir=True) 

2263 directories[dir_path] = d 

2264 

2265 for path_def in as_path_defs(paths): 

2266 path = path_def.path_name 

2267 if path in directories or path in non_directories: 2267 ↛ 2268line 2267 didn't jump to line 2268 because the condition on line 2267 was never true

2268 raise ValueError( 

2269 f'Duplicate definition of "{path}". Can be false positive if input is not in' 

2270 ' "correct order" (ensure directories occur before their children)' 

2271 ) 

2272 if root_dir is None: 

2273 root_fs_path = None 

2274 if path in (".", "./", "/"): 

2275 root_fs_path = path_def.fs_path 

2276 root_dir = FSRootDir(fs_path=root_fs_path) 

2277 directories["."] = root_dir 

2278 

2279 if path not in (".", "./", "/") and not path.startswith("./"): 

2280 path = "./" + path 

2281 if path not in (".", "./", "/"): 

2282 _ensure_parent_dirs(path) 

2283 if path in (".", "./"): 

2284 assert "." in directories 

2285 continue 

2286 is_dir = False 

2287 if path.endswith("/"): 

2288 path = path[:-1] 

2289 is_dir = True 

2290 directory = directories[os.path.dirname(path)] 

2291 assert not is_dir or not bool( 

2292 path_def.link_target 

2293 ), f"is_dir={is_dir} vs. link_target={path_def.link_target}" 

2294 fs_path = VirtualTestPath( 

2295 os.path.basename(path), 

2296 directory, 

2297 is_dir=is_dir, 

2298 mode=path_def.mode, 

2299 mtime=path_def.mtime, 

2300 has_fs_path=path_def.has_fs_path, 

2301 fs_path=path_def.fs_path, 

2302 link_target=path_def.link_target, 

2303 content=path_def.content, 

2304 materialized_content=path_def.materialized_content, 

2305 ) 

2306 assert not fs_path.is_detached 

2307 if fs_path.is_dir: 

2308 directories[fs_path.path] = fs_path 

2309 else: 

2310 non_directories.add(fs_path.path) 

2311 

2312 if root_dir is None: 

2313 root_dir = FSRootDir() 

2314 

2315 root_dir.is_read_write = read_write_fs 

2316 return root_dir