Coverage for src/debputy/filesystem_scan.py: 66%

1295 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-10-12 15:06 +0000

1import atexit 

2import contextlib 

3import dataclasses 

4import errno 

5import io 

6import operator 

7import os 

8import shutil 

9import stat 

10import subprocess 

11import tempfile 

12import time 

13from abc import ABC 

14from contextlib import suppress 

15from typing import ( 

16 List, 

17 Dict, 

18 Optional, 

19 Tuple, 

20 Union, 

21 cast, 

22 Any, 

23 ContextManager, 

24 TextIO, 

25 BinaryIO, 

26 NoReturn, 

27 Type, 

28 Generic, 

29 TypeVar, 

30 overload, 

31 Literal, 

32) 

33from collections.abc import Iterable, Iterator, Mapping, Callable 

34from weakref import ref, ReferenceType 

35 

36from debputy.exceptions import ( 

37 PureVirtualPathError, 

38 DebputyFSIsROError, 

39 DebputyMetadataAccessError, 

40 TestPathWithNonExistentFSPathError, 

41 SymlinkLoopError, 

42) 

43from debputy.intermediate_manifest import PathType 

44from debputy.manifest_parser.base_types import ( 

45 ROOT_DEFINITION, 

46 StaticFileSystemOwner, 

47 StaticFileSystemGroup, 

48) 

49from debputy.plugin.api.spec import ( 

50 VirtualPath, 

51 PathDef, 

52 PathMetadataReference, 

53 PMT, 

54) 

55from debputy.types import VP 

56from debputy.util import ( 

57 generated_content_dir, 

58 _error, 

59 escape_shell, 

60 assume_not_none, 

61 _normalize_path, 

62 _debug_log, 

63) 

64 

# Sort key that orders path objects by their `name` attribute (the basename).
BY_BASENAME = operator.attrgetter("name")

# Covariant type variables; both are bound to the forward reference "FSOverlayBase".
FSP = TypeVar("FSP", bound="FSOverlayBase", covariant=True)
FSC = TypeVar("FSC", bound="FSOverlayBase", covariant=True)


# Mode strings that open a file in binary mode.
BinaryOpenMode = Literal["rb", "r+b", "wb", "w+b", "xb", "ab"]
# Mode strings that open a file in text mode (with or without the explicit `t`).
TextOpenMode = Literal[
    "r", "r+", "rt", "r+t", "w", "w+", "wt", "w+t", "x", "xt", "a", "at"
]
# Any mode string accepted by `open_child`.
OpenMode = Union[BinaryOpenMode, TextOpenMode]

94 

95 

class AlwaysEmptyReadOnlyMetadataReference(PathMetadataReference[PMT]):
    """Metadata reference that never holds a value and rejects every write.

    Reading is only permitted when the current plugin is the owning plugin;
    in that case the value is always `None`.
    """

    __slots__ = ("_metadata_type", "_owning_plugin", "_current_plugin")

    def __init__(
        self,
        owning_plugin: str,
        current_plugin: str,
        metadata_type: type[PMT],
    ) -> None:
        """Record the owning plugin, the accessing plugin and the metadata type."""
        self._metadata_type = metadata_type
        self._owning_plugin = owning_plugin
        self._current_plugin = current_plugin

    @property
    def _is_owner(self) -> bool:
        """Whether the accessing plugin is the plugin that owns the metadata."""
        return self._owning_plugin == self._current_plugin

    @property
    def is_present(self) -> bool:
        """Always `False`: this reference never carries a value."""
        return False

    @property
    def can_read(self) -> bool:
        """Whether the current plugin may read the (always empty) value."""
        return self._owning_plugin == self._current_plugin

    @property
    def can_write(self) -> bool:
        """Always `False`: this reference is read-only."""
        return False

    @property
    def value(self) -> PMT | None:
        """Return `None` for readers with access.

        :raises DebputyMetadataAccessError: If the current plugin cannot read the metadata.
        """
        if not self.can_read:
            raise DebputyMetadataAccessError(
                f"Cannot read the metadata {self._metadata_type.__name__} owned by"
                f" {self._owning_plugin} as the metadata has not been made"
                f" readable to the plugin {self._current_plugin}."
            )
        return None

    @value.setter
    def value(self, new_value: PMT) -> None:
        """Always fail; the exception type depends on whether the caller owns the metadata.

        :raises DebputyMetadataAccessError: If the current plugin is not the owner.
        :raises DebputyFSIsROError: If the current plugin is the owner (the path is read-only).
        """
        if not self._is_owner:
            raise DebputyMetadataAccessError(
                f"Cannot set the metadata {self._metadata_type.__name__} owned by"
                f" {self._owning_plugin} as the metadata has not been made"
                f" read-write to the plugin {self._current_plugin}."
            )
        raise DebputyFSIsROError(
            f"Cannot set the metadata {self._metadata_type.__name__} as the path is read-only"
        )

146 

147 

@dataclasses.dataclass(slots=True)
class PathMetadataValue(Generic[PMT]):
    """Storage cell for one piece of path metadata together with its owner."""

    # Name of the plugin that owns this metadata.
    owning_plugin: str
    # The type of the stored metadata value.
    metadata_type: type[PMT]
    # The stored value; `None` means no value is present.
    value: PMT | None = None

    def can_read_value(self, current_plugin: str) -> bool:
        """Return whether `current_plugin` may read the value (only the owner may)."""
        is_owner = current_plugin == self.owning_plugin
        return is_owner

    def can_write_value(self, current_plugin: str) -> bool:
        """Return whether `current_plugin` may write the value (only the owner may)."""
        is_owner = current_plugin == self.owning_plugin
        return is_owner

159 

160 

class PathMetadataReferenceImplementation(PathMetadataReference[PMT]):
    """Metadata reference backed by a `PathMetadataValue` attached to a path.

    Access is decided per plugin via the backing `PathMetadataValue`; writes
    additionally require the owning path to be read-write and not detached.
    """

    __slots__ = ("_owning_path", "_current_plugin", "_path_metadata_value")

    def __init__(
        self,
        owning_path: VirtualPath,
        current_plugin: str,
        path_metadata_value: PathMetadataValue[PMT],
    ) -> None:
        """Bind the reference to a path, the accessing plugin and the stored value."""
        self._owning_path = owning_path
        self._current_plugin = current_plugin
        self._path_metadata_value = path_metadata_value

    @property
    def is_present(self) -> bool:
        """Whether a value is stored and readable by the current plugin."""
        if not self.can_read:
            return False
        return self._path_metadata_value.value is not None

    @property
    def can_read(self) -> bool:
        """Whether the current plugin may read the value."""
        return self._path_metadata_value.can_read_value(self._current_plugin)

    @property
    def can_write(self) -> bool:
        """Whether the current plugin may write the value right now.

        Requires plugin-level write access and a read-write, attached owning path.
        """
        if not self._path_metadata_value.can_write_value(self._current_plugin):
            return False
        owning_path = self._owning_path
        return owning_path.is_read_write and not owning_path.is_detached

    @property
    def value(self) -> PMT | None:
        """Return the stored value.

        :raises DebputyMetadataAccessError: If the current plugin cannot read the metadata.
        """
        if self.can_read:
            return self._path_metadata_value.value
        raise DebputyMetadataAccessError(
            f"Cannot read the metadata {self._metadata_type_name} owned by"
            f" {self._owning_plugin} as the metadata has not been made"
            f" readable to the plugin {self._current_plugin}."
        )

    @value.setter
    def value(self, new_value: PMT) -> None:
        """Store `new_value` (or clear the value when `new_value` is `None`).

        :raises DebputyMetadataAccessError: If the current plugin lacks write access.
        :raises DebputyFSIsROError: If the owning path is read-only.
        :raises TypeError: If the owning path is detached.
        """
        # Check only the plugin-level permission here. `can_write` also folds in
        # the read-only/detached state of the path, which would make the two more
        # specific checks below unreachable and report the wrong reason.
        if not self._path_metadata_value.can_write_value(self._current_plugin):
            m = "set" if new_value is not None else "delete"
            raise DebputyMetadataAccessError(
                f"Cannot {m} the metadata {self._metadata_type_name} owned by"
                f" {self._owning_plugin} as the metadata has not been made"
                f" read-write to the plugin {self._current_plugin}."
            )
        owning_path = self._owning_path
        if not owning_path.is_read_write:
            raise DebputyFSIsROError(
                f"Cannot set the metadata {self._metadata_type_name} as the path is read-only"
            )
        if owning_path.is_detached:
            raise TypeError(
                f"Cannot set the metadata {self._metadata_type_name} as the path is detached"
            )
        self._path_metadata_value.value = new_value

    @property
    def _is_owner(self) -> bool:
        """Whether the current plugin is the plugin owning the metadata."""
        return self._owning_plugin == self._current_plugin

    @property
    def _owning_plugin(self) -> str:
        """Name of the plugin owning the metadata."""
        return self._path_metadata_value.owning_plugin

    @property
    def _metadata_type_name(self) -> str:
        """Class name of the metadata type (used in error messages)."""
        return self._path_metadata_value.metadata_type.__name__

232 

233 

234def _cp_a(source: str, dest: str) -> None: 

235 cmd = ["cp", "-a", source, dest] 

236 try: 

237 subprocess.check_call(cmd) 

238 except subprocess.CalledProcessError: 

239 full_command = escape_shell(*cmd) 

240 _error( 

241 f"The attempt to make an internal copy of {escape_shell(source)} failed. Please review the output of cp" 

242 f" above to understand what went wrong. The full command was: {full_command}" 

243 ) 

244 

245 

246def _split_path(path: str) -> tuple[bool, bool, list[str]]: 

247 must_be_dir = True if path.endswith("/") else False 

248 absolute = False 

249 if path.startswith("/"): 

250 absolute = True 

251 path = "." + path 

252 path_parts = path.rstrip("/").split("/") 

253 if must_be_dir: 

254 path_parts.append(".") 

255 return absolute, must_be_dir, path_parts 

256 

257 

258def _root(path: VP) -> VP: 

259 current = path 

260 while True: 

261 parent = current.parent_dir 

262 if parent is None: 

263 return current 

264 current = parent 

265 

266 

267def _check_fs_path_is_file( 

268 fs_path: str, 

269 unlink_on_error: Optional["VirtualPath"] = None, 

270) -> None: 

271 had_issue = False 

272 try: 

273 # FIXME: Check mode, and use the Virtual Path to cache the result as a side-effect 

274 st = os.lstat(fs_path) 

275 except FileNotFoundError: 

276 had_issue = True 

277 else: 

278 if not stat.S_ISREG(st.st_mode) or st.st_nlink > 1: 278 ↛ 279line 278 didn't jump to line 279 because the condition on line 278 was never true

279 had_issue = True 

280 if not had_issue: 280 ↛ 283line 280 didn't jump to line 283 because the condition on line 280 was always true

281 return 

282 

283 if unlink_on_error: 

284 with suppress(FileNotFoundError): 

285 os.unlink(fs_path) 

286 raise TypeError( 

287 "The provided FS backing file was deleted, replaced with a non-file entry or it was hard" 

288 " linked to another file. The entry has been disconnected." 

289 ) 

290 

291 

class CurrentPluginContextManager:
    """Track which plugin is currently active as a stack of plugin names."""

    __slots__ = ("_plugin_names",)

    def __init__(self, initial_plugin_name: str) -> None:
        """Start with `initial_plugin_name` as the active plugin."""
        self._plugin_names = [initial_plugin_name]

    @property
    def current_plugin_name(self) -> str:
        """Name of the plugin at the top of the stack."""
        return self._plugin_names[-1]

    @contextlib.contextmanager
    def change_plugin_context(self, new_plugin_name: str) -> Iterator[str]:
        """Temporarily make `new_plugin_name` the active plugin.

        :param new_plugin_name: The plugin name to activate inside the `with` block.
        :return: A context manager yielding `new_plugin_name`.
        """
        self._plugin_names.append(new_plugin_name)
        try:
            yield new_plugin_name
        finally:
            # Restore the previous plugin even when the `with` body raises;
            # otherwise the stack would keep the stale name forever.
            self._plugin_names.pop()

307 

308 

class VirtualPathBase(VirtualPath, ABC):
    """Shared lookup, directory-creation and child-opening logic for virtual paths."""

    __slots__ = ()

    def _orphan_safe_path(self) -> str:
        """Return a path string suitable for messages (here simply `self.path`)."""
        return self.path

    def _rw_check(self) -> None:
        """Raise `DebputyFSIsROError` unless this path is read-write."""
        if not self.is_read_write:
            raise DebputyFSIsROError(
                f'Attempt to write to "{self._orphan_safe_path()}" failed:'
                " Debputy Virtual File system is R/O."
            )

    def lookup(self, path: str) -> Optional["VirtualPathBase"]:
        """Resolve `path` relative to this path.

        :return: The matching path, or `None` if any part of it is missing.
        """
        match, missing = self.attempt_lookup(path)
        if missing:
            return None
        return match

    def attempt_lookup(self, path: str) -> tuple["VirtualPathBase", list[str]]:
        """Resolve as much of `path` as exists.

        :param path: The path to resolve. A leading `/` starts from the root;
          `.` and `..` segments are interpreted; symlinks that are not the
          final segment are expanded.
        :return: A tuple of the deepest path that could be resolved and the
          list of segments that could not be resolved (empty on a full match).
        :raises ValueError: If this path is detached or `path` escapes the root.
        :raises SymlinkLoopError: If the same symlink is traversed twice.
        """
        if self.is_detached:
            raise ValueError(
                f'Cannot perform lookup via "{self._orphan_safe_path()}": The path is detached'
            )
        absolute, must_be_dir, path_parts = _split_path(path)
        current = _root(self) if absolute else self
        # The list is used as a stack: the next segment to resolve is at the end.
        path_parts.reverse()
        link_expansions: set[str] = set()
        while path_parts:
            dir_part = path_parts.pop()
            if dir_part == ".":
                continue
            if dir_part == "..":
                p = current.parent_dir
                if p is None:
                    raise ValueError(f'The path "{path}" escapes the root dir')
                current = p
                continue
            try:
                current = current[dir_part]
            except KeyError:
                # Hand back the unresolved segments in their original order.
                path_parts.append(dir_part)
                path_parts.reverse()
                if must_be_dir:
                    # Drop the trailing "." that `_split_path` appended.
                    path_parts.pop()
                return current, path_parts
            if current.is_symlink and path_parts:
                if current.path in link_expansions:
                    # This is our loop detection for now. It might have some false positives where you
                    # could safely resolve the same symlink twice. However, given that this use-case is
                    # basically non-existent in practice for packaging, we just stop here for now.
                    raise SymlinkLoopError(
                        f'The path "{path}" traversed the symlink "{current.path}" multiple'
                        " times. Currently, traversing the same symlink twice is considered"
                        " a loop by `debputy` even if the path would eventually resolve."
                        " Consider filing a feature request if you have a benign case that"
                        " triggers this error."
                    )
                link_expansions.add(current.path)
                link_target = current.readlink()
                link_absolute, _, link_path_parts = _split_path(link_target)
                if link_absolute:
                    current = _root(current)
                else:
                    current = assume_not_none(current.parent_dir)
                # Splice the link target's segments in front of the remaining ones.
                link_path_parts.reverse()
                path_parts.extend(link_path_parts)
        return current, []

    def mkdirs(self, path: str) -> "VirtualPath":
        """Ensure `path` exists as a directory, creating missing directories.

        :return: The directory at `path`.
        :raises ValueError: If an existing part of `path` is not a directory.
        """
        current: VirtualPath
        current, missing_parts = self.attempt_lookup(
            f"{path}/" if not path.endswith("/") else path
        )
        if not current.is_dir:
            raise ValueError(
                f'mkdirs of "{path}" failed: This would require {current.path} to not exist OR be'
                " a directory. However, that path exist AND is a not directory."
            )
        for missing_part in missing_parts:
            assert missing_part not in (".", "..")
            current = current.mkdir(missing_part)
        return current

    def prune_if_empty_dir(self) -> None:
        """Remove this and all (now) empty parent directories

        Same as: `rmdir --ignore-fail-on-non-empty --parents`

        This operation may cause the path (and any of its parent directories) to become "detached"
        and therefore unsafe to use in further operations.
        """
        self._rw_check()

        if not self.is_dir:
            raise TypeError(f"{self._orphan_safe_path()} is not a directory")
        if any(self.iterdir):
            return
        parent_dir = assume_not_none(self.parent_dir)

        # Recursive does not matter; we already know the directory is empty.
        self.unlink()

        # Note: The root dir must never be deleted. This works because when delegating it to the root
        # directory, its implementation of this method is a no-op. If this is later rewritten to an
        # inline loop (rather than recursion), be sure to preserve this feature.
        parent_dir.prune_if_empty_dir()

    def _current_plugin(self) -> str:
        """Return the current plugin name by delegating to the root directory.

        :raises TypeError: If this path is detached.
        """
        if self.is_detached:
            raise TypeError("Cannot resolve the current plugin; path is detached")
        return cast("FSRootDir", _root(self))._current_plugin()

    @overload
    def open_child(
        self,
        name: str,
        mode: TextOpenMode = ...,
        buffering: int = -1,
    ) -> TextIO: ...

    @overload
    def open_child(
        self,
        name: str,
        mode: BinaryOpenMode = ...,
        buffering: int = -1,
    ) -> BinaryIO: ...

    @contextlib.contextmanager
    def open_child(
        self,
        name: str,
        mode: OpenMode = "r",
        buffering: int = -1,
    ) -> TextIO | BinaryIO:
        """Open a child path of the current directory in a given mode.  Usually used with a context manager

        The path is opened according to the `mode` parameter similar to the built-in `open` in Python.
        The following symbols are accepted with the same meaning as Python's open:
         * `r`
         * `w`
         * `x`
         * `a`
         * `+`
         * `b`
         * `t`

        Like Python's `open`, this can create a new file provided the file system is in read-write mode.
        Though unlike Python's open, symlinks are not followed and cannot be opened.  Any newly created
        file will start with (os.stat) mode of 0o0644.  The (os.stat) mode of existing paths are left
        as-is.

        :param name: The name of the child path to open.  Must be a basename.
        :param mode: The mode to open the file with such as `r` or `w`.  See Python's `open` for more
          examples.
        :param buffering: Same as open(..., buffering=...) where supported.  Notably during
          testing, the content may be purely in memory and use a BytesIO/StringIO
          (which does not accept that parameter, but then it is buffered in a different way)
        :return: The file handle.
        :raises ValueError: If the mode has `r` and the child does not exist, or the mode has `x`
          and the child already exists.
        """
        existing = self.get(name)
        if "r" in mode:
            if existing is None:
                raise ValueError(
                    f"Path {self.path}/{name} does not exist and mode had `r`"
                )
            if "+" not in mode:
                with existing.open(byte_io="b" in mode, buffering=buffering) as fd:
                    yield fd
                # Pure read mode is done here. Falling through would reach a second
                # `yield`, which `contextlib.contextmanager` rejects with a
                # RuntimeError ("generator didn't stop").
                return

        encoding = None if "b" in mode else "utf-8"

        if existing:
            if "x" in mode:
                raise ValueError(
                    f"Path {existing.path} already exists and mode had `x`"
                )
            with (
                existing.replace_fs_path_content() as fs_path,
                open(fs_path, mode, encoding=encoding) as fd,
            ):
                yield fd
        else:
            assert "r" not in mode
            # unlink_if_exists=False as a precaution (the "already exists" should not end up here).
            with (
                self.add_file(name, mode=0o0644, unlink_if_exists=False) as new_path,
                open(new_path.fs_path, mode, encoding=encoding) as fd,
            ):
                yield fd

507 

508 

509class FSPath(VirtualPathBase, ABC): 

510 __slots__ = ( 

511 "_basename", 

512 "_parent_dir", 

513 "_children", 

514 "_path_cache", 

515 "_parent_path_cache", 

516 "_last_known_parent_path", 

517 "_mode", 

518 "_owner", 

519 "_group", 

520 "_mtime", 

521 "_stat_cache", 

522 "_metadata", 

523 "__weakref__", 

524 ) 

525 

def __init__(
    self,
    basename: str,
    parent: Optional["FSPath"],
    children: dict[str, "FSPath"] | None = None,
    initial_mode: int | None = None,
    mtime: float | None = None,
    stat_cache: os.stat_result | None = None,
) -> None:
    """Initialise the path and, when `parent` is given, attach it to that parent.

    :param basename: The name of this path inside its parent.
    :param parent: The parent directory, or `None` to start out unattached.
    :param children: Initial mapping of child basenames to child paths.
    :param initial_mode: Initial mode; `None` leaves it to be resolved later.
    :param mtime: Initial mtime; `None` leaves it to be resolved later.
    :param stat_cache: A pre-computed stat result to cache, if any.
    """
    # Identity and tree structure.
    self._basename = basename
    self._children = children

    # Path caches; all start out empty.
    self._path_cache: str | None = None
    self._parent_path_cache: str | None = None
    self._last_known_parent_path: str | None = None

    # Metadata, with ownership defaulting to ROOT_DEFINITION.
    self._mode = initial_mode
    self._mtime = mtime
    self._stat_cache = stat_cache
    self._owner = ROOT_DEFINITION
    self._group = ROOT_DEFINITION
    self._metadata: dict[tuple[str, type[Any]], PathMetadataValue[Any]] = {}

    # The self._parent_dir = None is to create `_parent_dir` because the parent_dir setter calls
    # is_orphaned, which assumes self._parent_dir is an attribute.
    self._parent_dir: ReferenceType["FSPath"] | None = None
    if parent is None:
        return
    self.parent_dir = parent

552 

553 def __repr__(self) -> str: 

554 return ( 

555 f"{self.__class__.__name__}({self._orphan_safe_path()!r}," 

556 f" is_file={self.is_file}," 

557 f" is_dir={self.is_dir}," 

558 f" is_symlink={self.is_symlink}," 

559 f" has_fs_path={self.has_fs_path}," 

560 f" children_len={len(self._children) if self._children else 0})" 

561 ) 

562 

@property
def name(self) -> str:
    """The basename of this path."""
    return self._basename

@name.setter
def name(self, new_name: str) -> None:
    """Rename this path.

    An attached path is detached from its parent, renamed and re-attached so
    that the parent registers it under the new name.

    :raises DebputyFSIsROError: If the path is not read-write (via `_rw_check`).
    """
    # A single check suffices; nothing between here and the re-parenting
    # below changes the read-write state.
    self._rw_check()
    if new_name == self._basename:
        return
    if self.is_detached:
        self._basename = new_name
        return
    parent = self.parent_dir
    # This little parent_dir dance ensures the parent dir detects the rename properly
    self.parent_dir = None
    self._basename = new_name
    self.parent_dir = parent

581 

@property
def iterdir(self) -> Iterable["FSPath"]:
    """Yield the direct children of this path (nothing when there are none)."""
    entries = self._children
    if entries is None:
        return
    yield from entries.values()

586 

def all_paths(self) -> Iterable["FSPath"]:
    """Yield this path and then every path below it, depth-first.

    Children of a directory are visited sorted by basename. The children of a
    directory that became detached after being yielded are not visited.
    """
    yield self
    if not self.is_dir:
        return
    sort_key = BY_BASENAME
    # Reverse-sorted so that popping from the end visits basenames in ascending order.
    pending = sorted(self.iterdir, key=sort_key, reverse=True)
    while pending:
        entry = pending.pop()
        yield entry
        if not entry.is_dir or entry.is_detached:
            continue
        pending.extend(sorted(entry.iterdir, key=sort_key, reverse=True))

598 

def walk(self) -> Iterable[tuple["FSPath", list["FSPath"]]]:
    """Yield `(path, children)` pairs, depth-first, starting with this path.

    `children` is sorted by basename. If a yielded path is detached by the
    consumer, its children are not visited.
    """
    # FIXME: can this be more "os.walk"-like without making it harder to implement?
    if not self.is_dir:
        yield self, []
        return
    sort_key = BY_BASENAME
    pending = [self]
    while pending:
        entry = pending.pop()
        entry_children = sorted(entry.iterdir, key=sort_key)
        assert not entry_children or entry.is_dir
        yield entry, entry_children
        # Removing the directory counts as discarding the children.
        if entry.is_detached:
            continue
        pending.extend(reversed(entry_children))

614 

615 def _orphan_safe_path(self) -> str: 

616 if not self.is_detached or self._last_known_parent_path is not None: 616 ↛ 618line 616 didn't jump to line 618 because the condition on line 616 was always true

617 return self.path 

618 return f"<orphaned>/{self.name}" 

619 

@property
def is_detached(self) -> bool:
    """Whether this path, or any of its ancestors, has lost its parent."""
    parent_ref = self._parent_dir
    # No reference at all, or a dead weak reference, both mean "no parent".
    live_parent = parent_ref() if parent_ref is not None else None
    if live_parent is None:
        return True
    return live_parent.is_detached

629 

630 # The __getitem__ behaves like __getitem__ from Dict but __iter__ would ideally work like a Sequence. 

631 # However, that does not feel compatible, so lets force people to use .children instead for the Sequence 

632 # behavior to avoid surprises for now. 

633 # (Maybe it is a non-issue, but it is easier to add the API later than to remove it once we have committed 

634 # to using it) 

635 __iter__ = None 

636 

637 def __getitem__(self, key) -> "FSPath": 

638 if self._children is None: 

639 raise KeyError( 

640 f"{key} (note: {self._orphan_safe_path()!r} has no children)" 

641 ) 

642 if isinstance(key, FSPath): 642 ↛ 643line 642 didn't jump to line 643 because the condition on line 642 was never true

643 key = key.name 

644 return self._children[key] 

645 

646 def __delitem__(self, key) -> None: 

647 self._rw_check() 

648 children = self._children 

649 if children is None: 649 ↛ 650line 649 didn't jump to line 650 because the condition on line 649 was never true

650 raise KeyError(key) 

651 del children[key] 

652 

def get(self, key: str) -> "Optional[FSPath]":
    """Return the child named `key`, or `None` when there is no such child."""
    with suppress(KeyError):
        return self[key]
    return None

658 

def __contains__(self, item: object) -> bool:
    """Return whether `item` is a child of this path.

    A `VirtualPath` is contained when its `parent_dir` is this very object; a
    string is contained when a child with that basename exists. Anything
    else is never contained.
    """
    if isinstance(item, VirtualPath):
        return item.parent_dir is self
    if isinstance(item, str):
        found = self.get(item)
        return found is not None
    return False

666 

667 def _add_child(self, child: "FSPath") -> None: 

668 self._rw_check() 

669 if not self.is_dir: 669 ↛ 670line 669 didn't jump to line 670 because the condition on line 669 was never true

670 raise TypeError(f"{self._orphan_safe_path()!r} is not a directory") 

671 if self._children is None: 

672 self._children = {} 

673 

674 conflict_child = self.get(child.name) 

675 if conflict_child is not None: 675 ↛ 676line 675 didn't jump to line 676 because the condition on line 675 was never true

676 conflict_child.unlink(recursive=True) 

677 self._children[child.name] = child 

678 

@property
def tar_path(self) -> str:
    """The path of this entry, with a trailing `/` appended for directories."""
    base = self.path
    suffix = "/" if self.is_dir else ""
    return base + suffix

685 

@property
def path(self) -> str:
    """The full path of this entry, built from the parent's path and the name.

    The result is cached and reused for as long as the parent's path is unchanged.

    :raises ReferenceError: If no parent path is available (the path is detached).
    """
    current_parent = self.parent_dir_path
    cached_parent = self._parent_path_cache
    if cached_parent is not None and cached_parent == current_parent:
        return assume_not_none(self._path_cache)
    if current_parent is None:
        raise ReferenceError(
            f"The path {self.name} is detached! {self.__class__.__name__}"
        )
    self._parent_path_cache = current_parent
    full_path = os.path.join(current_parent, self.name)
    self._path_cache = full_path
    return full_path

702 

@property
def parent_dir(self) -> Optional["FSPath"]:
    """The parent directory of this path.

    :raises ReferenceError: If there is no (live) parent, i.e. the path is detached.
    """
    parent_ref = self._parent_dir
    resolved = None if parent_ref is None else parent_ref()
    if resolved is None:
        raise ReferenceError(
            f"The path {self.name} is detached! {self.__class__.__name__}"
        )
    return resolved

@parent_dir.setter
def parent_dir(self, new_parent: Optional["FSPath"]) -> None:
    """Move this path to `new_parent`, or detach it when `new_parent` is `None`.

    The path is first removed from its current parent's children (if it is
    attached). When detaching, the old parent's path is remembered as the
    last known parent path, provided the old parent is itself still attached.

    :raises ValueError: If `new_parent` is not a directory.
    """
    self._rw_check()
    attaching = new_parent is not None
    if attaching:
        if not new_parent.is_dir:
            raise ValueError(
                f"The parent {new_parent._orphan_safe_path()} must be a directory"
            )
        new_parent._rw_check()

    previous_parent = None
    self._last_known_parent_path = None
    if not self.is_detached:
        previous_parent = self.parent_dir
        siblings = assume_not_none(assume_not_none(previous_parent)._children)
        del siblings[self.name]

    if attaching:
        # Only a weak reference to the parent is kept.
        self._parent_dir = ref(new_parent)
        new_parent._add_child(self)
    else:
        if previous_parent is not None and not previous_parent.is_detached:
            self._last_known_parent_path = previous_parent.path
        self._parent_dir = None
    # The cached path was derived from the old parent; force a rebuild.
    self._parent_path_cache = None

736 

737 @property 

738 def parent_dir_path(self) -> str | None: 

739 if self.is_detached: 739 ↛ 740line 739 didn't jump to line 740 because the condition on line 739 was never true

740 return self._last_known_parent_path 

741 return assume_not_none(self.parent_dir).path 

742 

def chown(
    self,
    owner: StaticFileSystemOwner | None,
    group: StaticFileSystemGroup | None,
) -> None:
    """Change the owner/group of this path

    :param owner: The desired owner definition for this path.  If None, then no change of owner is performed.
    :param group: The desired group definition for this path.  If None, then no change of group is performed.
    """
    self._rw_check()

    # Each half is applied independently; `None` leaves that half untouched.
    for attribute, requested in (("_owner", owner), ("_group", group)):
        if requested is None:
            continue
        setattr(self, attribute, requested.ownership_definition)

759 

def stat(self) -> os.stat_result:
    """Return the stat result for this path, computing and caching it on first use."""
    cached = self._stat_cache
    if cached is not None:
        return cached
    fresh = self._uncached_stat()
    self._stat_cache = fresh
    return fresh

766 

767 def _uncached_stat(self) -> os.stat_result: 

768 return os.lstat(self.fs_path) 

769 

@property
def mode(self) -> int:
    """The permission bits of this path, derived from `stat()` and cached when unset."""
    known = self._mode
    if known is not None:
        return known
    resolved = stat.S_IMODE(self.stat().st_mode)
    self._mode = resolved
    return resolved

@mode.setter
def mode(self, new_mode: int) -> None:
    """Set the mode of this path.

    :raises ValueError: If `new_mode` lacks the minimum bits: 0o700 for
      directories and 0o400 for everything else.
    """
    self._rw_check()
    required = 0o700 if self.is_dir else 0o400
    if (new_mode & required) == required:
        self._mode = new_mode
        return
    omode = oct(new_mode)[2:]
    omin = oct(required)[2:]
    raise ValueError(
        f'Attempt to set mode of path "{self._orphan_safe_path()}" to {omode} rejected;'
        f" Minimum requirements are {omin} (read-bit and, for dirs, exec bit for user)."
        " There are no paths that do not need these requirements met and they can cause"
        " problems during build or on the final system."
    )

792 

793 def _ensure_min_mode(self) -> None: 

794 min_bit = 0o700 if self.is_dir else 0o600 

795 if self.has_fs_path and (self.mode & 0o600) != 0o600: 795 ↛ 796line 795 didn't jump to line 796 because the condition on line 795 was never true

796 try: 

797 fs_path = self.fs_path 

798 except TestPathWithNonExistentFSPathError: 

799 pass 

800 else: 

801 st = os.stat(fs_path) 

802 new_fs_mode = stat.S_IMODE(st.st_mode) | min_bit 

803 _debug_log( 

804 f"Applying chmod {oct(min_bit)[2:]} {fs_path} ({self.path}) to avoid problems down the line" 

805 ) 

806 os.chmod(fs_path, new_fs_mode) 

807 self.mode |= min_bit 

808 

@property
def mtime(self) -> float:
    """The modification time of this path, read from `stat()` and cached when unset."""
    known = self._mtime
    if known is not None:
        return known
    resolved = self.stat().st_mtime
    self._mtime = resolved
    return resolved

@mtime.setter
def mtime(self, new_mtime: float) -> None:
    """Override the modification time (requires a read-write path)."""
    self._rw_check()
    self._mtime = new_mtime

821 

@property
def tar_owner_info(self) -> tuple[str, int, str, int]:
    """Ownership as `(owner name, owner id, group name, group id)`."""
    user, grp = self._owner, self._group
    user_part = (user.entity_name, user.entity_id)
    group_part = (grp.entity_name, grp.entity_id)
    return user_part + group_part

832 

833 @property 

834 def _can_replace_inline(self) -> bool: 

835 return False 

836 

@contextlib.contextmanager
def add_file(
    self,
    name: str,
    *,
    unlink_if_exists: bool = True,
    use_fs_path_mode: bool = False,
    mode: int = 0o0644,
    mtime: float | None = None,
    # Special-case parameters that are not exposed in the API
    fs_basename_matters: bool = False,
    subdir_key: str | None = None,
) -> Iterator["FSPath"]:
    """Create a new file child called `name`; used as a context manager.

    An empty backing file is created in the generated-content directory and
    the new child is yielded so the caller can write to it. After the `with`
    body completes, the backing file is validated, the child's caches are
    reset and the mode is applied.

    :param name: The basename of the new file.
    :param unlink_if_exists: If true, an existing child of that name is
      unlinked first; if false, an existing child is an error.
    :param use_fs_path_mode: If true, `mode` is applied to the backing file
      via `os.chmod` instead of being set on the child directly.
    :param mode: The mode for the new file.
    :param mtime: The mtime passed on to the new child.
    :param fs_basename_matters: If true, the backing file is created with
      exactly the basename `name`; requires `subdir_key`.
    :param subdir_key: Key passed to `generated_content_dir`.
    :raises ValueError: If `name` is not a plain basename, if the child exists
      and `unlink_if_exists` is false, or if `fs_basename_matters` is set
      without a `subdir_key`.
    :raises TypeError: If this path is not a directory, or if the backing
      file fails validation after the `with` body.
    """
    if "/" in name or name in {".", ".."}:
        raise ValueError(f'Invalid file name: "{name}"')
    if not self.is_dir:
        raise TypeError(
            f"Cannot create {self._orphan_safe_path()}/{name}:"
            f" {self._orphan_safe_path()} is not a directory"
        )
    self._rw_check()
    existing = self.get(name)
    if existing is not None:
        if not unlink_if_exists:
            # The message names the parameter this method actually has.
            raise ValueError(
                f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
                f" and unlink_if_exists was False"
            )
        existing.unlink(recursive=False)

    if fs_basename_matters and subdir_key is None:
        raise ValueError(
            "When fs_basename_matters is True, a subdir_key must be provided"
        )

    directory = generated_content_dir(subdir_key=subdir_key)

    if fs_basename_matters:
        fs_path = os.path.join(directory, name)
        with open(fs_path, "xb") as _:
            # Ensure that the fs_path exists
            pass
        child = FSBackedFilePath(
            name,
            self,
            fs_path,
            replaceable_inline=True,
            mtime=mtime,
        )
        yield child
    else:
        with tempfile.NamedTemporaryFile(
            dir=directory, suffix=f"__{name}", delete=False
        ) as fd:
            fs_path = fd.name
            child = FSBackedFilePath(
                name,
                self,
                fs_path,
                replaceable_inline=True,
                mtime=mtime,
            )
            # Close our handle before handing the path to the caller.
            fd.close()
            yield child

    if use_fs_path_mode:
        # Ensure the caller can see the current mode
        os.chmod(fs_path, mode)
    _check_fs_path_is_file(fs_path, unlink_on_error=child)
    child._reset_caches()
    if not use_fs_path_mode:
        child.mode = mode

909 

def insert_file_from_fs_path(
    self,
    name: str,
    fs_path: str,
    *,
    exist_ok: bool = True,
    use_fs_path_mode: bool = False,
    mode: int = 0o0644,
    require_copy_on_write: bool = True,
    follow_symlinks: bool = True,
    reference_path: VirtualPath | None = None,
) -> "FSPath":
    """Attach a file from the real file system as a child of this directory

    :param name: Basename of the new child. It must not contain "/" and must not be "." or "..".
    :param fs_path: The file system path that provides the content.
    :param exist_ok: When False, it is an error if this directory already has a child called `name`.
    :param use_fs_path_mode: When True, no explicit initial mode is passed to the new path. Otherwise,
      `mode` is passed as its initial mode.
    :param mode: The initial mode to use unless `use_fs_path_mode` is True.
    :param require_copy_on_write: When True, the new path is not marked as replaceable inline.
    :param follow_symlinks: When True, `fs_path` is resolved with `os.path.realpath(..., strict=True)`
      before use. This cannot be combined with `reference_path`.
    :param reference_path: Optional reference path handed to the new path. When provided, the
      file system path is not inspected (no lstat, no file type checks).
    :return: The newly created path.
    :raises ValueError: On an invalid name, on an existing child when `exist_ok` is False, when
      `reference_path` is combined with `follow_symlinks`, or when the (resolved) path is not a
      regular file.
    :raises TypeError: If this path is not a directory.
    """
    if "/" in name or name in {".", ".."}:
        raise ValueError(f'Invalid file name: "{name}"')
    if not self.is_dir:
        raise TypeError(
            f"Cannot create {self._orphan_safe_path()}/{name}:"
            f" {self._orphan_safe_path()} is not a directory"
        )
    self._rw_check()
    if name in self and not exist_ok:
        raise ValueError(
            f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
            f" and exist_ok was False"
        )

    if follow_symlinks:
        if reference_path is not None:
            raise ValueError("The reference_path cannot be used with follow_symlinks")
        backing_path = os.path.realpath(fs_path, strict=True)
    else:
        backing_path = fs_path

    # None means "no explicit initial mode" for the new path.
    initial_mode: int | None = None if use_fs_path_mode else mode

    # Without a reference path, the backing path must be a regular file; the lstat result
    # is handed to the new path as its stat cache.
    lstat_result = None
    if reference_path is None:
        lstat_result = os.lstat(backing_path)
        file_mode = lstat_result.st_mode
        if stat.S_ISDIR(file_mode):
            raise ValueError(
                f'The provided path "{fs_path}" is a directory. However, this'
                " method does not support directories"
            )
        if not stat.S_ISREG(file_mode):
            if follow_symlinks:
                raise ValueError(
                    f"The resolved fs_path ({backing_path}) was not a file."
                )
            raise ValueError(f"The provided fs_path ({fs_path}) was not a file.")

    return FSBackedFilePath(
        name,
        self,
        backing_path,
        initial_mode=initial_mode,
        stat_cache=lstat_result,
        replaceable_inline=not require_copy_on_write,
        reference_path=reference_path,
    )

971 

def add_symlink(
    self,
    link_name: str,
    link_target: str,
    *,
    reference_path: VirtualPath | None = None,
) -> "FSPath":
    """Create a symlink as a child of this directory

    An existing child with the same name is unlinked first (non-recursively), so the
    unlink fails with a ValueError if that child is a non-empty directory.

    :param link_name: Basename of the symlink. It must not contain "/" and must not be "." or "..".
    :param link_target: The target of the symlink. It is stored as given.
    :param reference_path: Optional reference path handed to the new symlink path.
    :return: The newly created symlink path.
    :raises ValueError: If `link_name` is not a valid basename.
    :raises TypeError: If this path is not a directory.
    """
    if "/" in link_name or link_name in {".", ".."}:
        raise ValueError(
            f'Invalid file name: "{link_name}" (it must be a valid basename)'
        )
    if not self.is_dir:
        raise TypeError(
            f"Cannot create {self._orphan_safe_path()}/{link_name}:"
            f" {self._orphan_safe_path()} is not a directory"
        )
    self._rw_check()

    existing = self.get(link_name)
    # Test for presence explicitly (like add_file does) rather than relying on the
    # truthiness of the path object.
    if existing is not None:
        # Emulate `ln -sf`, which attempts a non-recursive unlink first.
        existing.unlink(recursive=False)

    return SymlinkVirtualPath(
        link_name,
        self,
        link_target,
        reference_path=reference_path,
    )

1001 

def mkdir(
    self,
    name: str,
    *,
    reference_path: VirtualPath | None = None,
) -> "FSPath":
    """Create a new directory as a child of this directory

    :param name: Basename of the directory. It must not contain "/" and must not be "." or "..".
    :param reference_path: Optional reference path handed to the new directory. If provided,
      it must be a directory.
    :return: The newly created directory.
    :raises ValueError: If `name` is not a valid basename, if `reference_path` is not a
      directory, or if a child called `name` already exists.
    :raises TypeError: If this path is not a directory.
    """
    if "/" in name or name in {".", ".."}:
        raise ValueError(
            f'Invalid file name: "{name}" (it must be a valid basename)'
        )
    if not self.is_dir:
        raise TypeError(
            f"Cannot create {self._orphan_safe_path()}/{name}:"
            f" {self._orphan_safe_path()} is not a directory"
        )
    if reference_path is not None and not reference_path.is_dir:
        # Only read fs_path when the reference has one. Otherwise the attribute access
        # can fail on its own and hide the error we actually want to report.
        reference_location = (
            reference_path.fs_path
            if reference_path.has_fs_path
            else reference_path.path
        )
        raise ValueError(
            f'The provided fs_path "{reference_location}" exist but it is not a directory!'
        )
    self._rw_check()

    existing = self.get(name)
    # Test for presence explicitly rather than relying on the truthiness of the path object.
    if existing is not None:
        raise ValueError(f"Path {existing.path} already exist")
    return VirtualDirectoryFSPath(name, self, reference_path=reference_path)

1027 

def mkdirs(self, path: str) -> "FSPath":
    """Delegate to the parent class `mkdirs` and narrow the declared return type

    :param path: The path to pass on to the parent class implementation.
    :return: Whatever the parent class implementation returned, typed as an FSPath.
    """
    created = super().mkdirs(path)
    return cast("FSPath", created)

1030 

@property
def is_read_write(self) -> bool:
    """When true, the file system entry may be mutated

    A detached path is always considered mutable. An attached path defers to
    its parent directory.

    :return: Whether file system mutations are permitted.
    """
    return (
        True if self.is_detached else assume_not_none(self.parent_dir).is_read_write
    )

1040 

def unlink(self, *, recursive: bool = False) -> None:
    """Unlink a file or a directory

    This operation will detach the path from the file system (causing "is_detached" to return True).
    Unlinking an already detached path does nothing.

    Note that the root directory cannot be deleted.

    :param recursive: If True, then non-empty directories will be unlinked as well removing everything inside them
      as well. When False, an error is raised if the path is a non-empty directory
    :raises ValueError: If the path has children and `recursive` is False.
    """
    if self.is_detached:
        return
    if recursive or not any(self.iterdir):
        # The .parent_dir setter does a _rw_check() for us.
        self.parent_dir = None
        return
    raise ValueError(
        f'Refusing to unlink "{self.path}": The directory was not empty and recursive was False'
    )

1059 

def _reset_caches(self) -> None:
    """Forget the cached mtime and the cached stat result"""
    self._mtime = self._stat_cache = None

1063 

def metadata(
    self,
    metadata_type: type[PMT],
    *,
    owning_plugin: str | None = None,
) -> PathMetadataReference[PMT]:
    """Provide a reference to the metadata of the given type for this path

    The metadata is keyed by (owning plugin, metadata type). If nothing is stored under
    that key yet, an entry is created on demand, except on a read-only path where an
    always-empty read-only reference is returned instead.

    :param metadata_type: The type of the metadata.
    :param owning_plugin: Name of the plugin owning the metadata. Defaults to the current plugin.
    :return: A reference to the metadata.
    :raises TypeError: If no metadata is stored under the key and the path is detached.
    """
    current_plugin = self._current_plugin()
    owner = current_plugin if owning_plugin is None else owning_plugin
    key = (owner, metadata_type)

    stored = self._metadata.get(key)
    if stored is not None:
        return PathMetadataReferenceImplementation(
            self,
            current_plugin,
            stored,
        )

    if self.is_detached:
        raise TypeError(
            f"Cannot access the metadata {metadata_type.__name__}: The path is detached."
        )
    if not self.is_read_write:
        return AlwaysEmptyReadOnlyMetadataReference(
            owner,
            current_plugin,
            metadata_type,
        )

    stored = PathMetadataValue(owner, metadata_type)
    self._metadata[key] = stored
    return PathMetadataReferenceImplementation(
        self,
        current_plugin,
        stored,
    )

1093 

@contextlib.contextmanager
def replace_fs_path_content(
    self,
    *,
    use_fs_path_mode: bool = False,
) -> Iterator[str]:
    """Context manager for replacing the content of this file via its file system path

    The context manager yields a file system path that the caller may rewrite. If the
    path cannot be replaced inline, the current content is first copied to a fresh file
    in the generated content directory and this path is switched over to that copy.

    After the managed block, the yielded path is checked to still be a file, the mode
    seen before the block is restored (unless `use_fs_path_mode` is True) and the caches
    are reset.

    :param use_fs_path_mode: When True, the mode of the file is left as the caller left it.
    :raises TypeError: If this path is not a file.
    """
    if not self.is_file:
        raise TypeError(
            f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file'
        )
    self._rw_check()

    fs_path = self.fs_path
    if not self._can_replace_inline:
        source_path = self.fs_path
        scratch_dir = generated_content_dir()
        with tempfile.NamedTemporaryFile(
            dir=scratch_dir, suffix=f"__{self.name}", delete=False
        ) as scratch_fd:
            # Only the name is needed; the content is provided by the copy below.
            scratch_fd.close()
            _cp_a(source_path, scratch_fd.name)
            fs_path = scratch_fd.name
            self._replaced_path(fs_path)
        assert self.fs_path == fs_path

    known_mtime = self._mtime
    if known_mtime is not None:
        os.utime(fs_path, (known_mtime, known_mtime))

    # Captured before handing over control, so it can be restored afterwards.
    mode_before = self.mode
    yield fs_path
    _check_fs_path_is_file(fs_path, unlink_on_error=self)
    if not use_fs_path_mode:
        os.chmod(fs_path, mode_before)
    self._reset_caches()

1128 

def _replaced_path(self, new_fs_path: str) -> None:
    """Hook called by replace_fs_path_content when the content was moved to a new file

    This implementation always raises NotImplementedError.

    :param new_fs_path: The file system path that now holds the content.
    """
    raise NotImplementedError

1131 

1132 

class VirtualFSPathBase(FSPath, ABC):
    """Base class for paths that are not backed by the file system by default

    This class reports `has_fs_path` as False. Subclasses that can be backed by the
    file system override `has_fs_path` and must then also override `fs_path`.
    """

    __slots__ = ()

    def __init__(
        self,
        basename: str,
        parent: Optional["FSPath"],
        children: dict[str, "FSPath"] | None = None,
        initial_mode: int | None = None,
        mtime: float | None = None,
        stat_cache: os.stat_result | None = None,
    ) -> None:
        super().__init__(
            basename,
            parent,
            children,
            initial_mode=initial_mode,
            mtime=mtime,
            stat_cache=stat_cache,
        )

    @property
    def mtime(self) -> float:
        """The mtime of the path; defaults to the time of the first read when unset"""
        mtime = self._mtime
        if mtime is None:
            mtime = time.time()
            self._mtime = mtime
        return mtime

    @property
    def has_fs_path(self) -> bool:
        return False

    def stat(self) -> os.stat_result:
        """Delegate to the parent class `stat` when the path has a file system path

        :raises PureVirtualPathError: If the path has no file system path.
        """
        if not self.has_fs_path:
            raise PureVirtualPathError(
                "stat() is only applicable to paths backed by the file system. The path"
                f" {self._orphan_safe_path()!r} is purely virtual"
            )
        return super().stat()

    @property
    def fs_path(self) -> str:
        """The file system path backing this path

        :raises PureVirtualPathError: If the path has no file system path.
        :raises NotImplementedError: If `has_fs_path` is True but the class did not
          override this property.
        """
        if not self.has_fs_path:
            raise PureVirtualPathError(
                "fs_path is only applicable to paths backed by the file system. The path"
                f" {self._orphan_safe_path()!r} is purely virtual"
            )
        # This class has no file system path of its own to return. Reading self.fs_path
        # here would re-enter this property and recurse without bound.
        raise NotImplementedError(
            f"{type(self).__name__} reports has_fs_path but does not override fs_path"
        )

1182 

1183 

class FSRootDir(FSPath):
    """The root directory of a file system tree

    The root is always a directory called ".", is never detached, has no parent and
    cannot be unlinked. It holds the read-write flag and the plugin context.
    """

    __slots__ = ("_fs_path", "_fs_read_write", "_plugin_context")

    def __init__(self, fs_path: str | None = None) -> None:
        """
        :param fs_path: Optional file system path backing the root directory.
        """
        self._fs_path = fs_path
        self._fs_read_write = True
        super().__init__(
            ".",
            None,
            children={},
            initial_mode=0o755,
        )
        self._plugin_context = CurrentPluginContextManager("debputy")

    # -- Identity and position in the tree --

    @property
    def is_detached(self) -> bool:
        return False

    def _orphan_safe_path(self) -> str:
        return self.name

    @property
    def path(self) -> str:
        return self.name

    @property
    def parent_dir(self) -> Optional["FSPath"]:
        return None

    @parent_dir.setter
    def parent_dir(self, new_parent: FSPath | None) -> None:
        # Assigning None is accepted and changes nothing; anything else is an error.
        if new_parent is None:
            return
        raise ValueError("The root directory cannot become a non-root directory")

    @property
    def parent_dir_path(self) -> str | None:
        return None

    # -- Path type --

    @property
    def is_dir(self) -> bool:
        return True

    @property
    def is_file(self) -> bool:
        return False

    @property
    def is_symlink(self) -> bool:
        return False

    def readlink(self) -> str:
        """Always raises TypeError as the root directory is not a symlink"""
        raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink')

    # -- File system backing --

    @property
    def has_fs_path(self) -> bool:
        return self._fs_path is not None

    def stat(self) -> os.stat_result:
        """Stat the backing file system path

        :raises PureVirtualPathError: If the root has no file system path.
        """
        if self.has_fs_path:
            return os.stat(self.fs_path)
        raise PureVirtualPathError(
            "stat() is only applicable to paths backed by the file system. The path"
            f" {self._orphan_safe_path()!r} is purely virtual"
        )

    @property
    def fs_path(self) -> str:
        """The file system path backing the root directory

        :raises PureVirtualPathError: If the root has no file system path.
        """
        if self.has_fs_path:
            return assume_not_none(self._fs_path)
        raise PureVirtualPathError(
            "fs_path is only applicable to paths backed by the file system. The path"
            f" {self._orphan_safe_path()!r} is purely virtual"
        )

    # -- Mutability --

    @property
    def is_read_write(self) -> bool:
        return self._fs_read_write

    @is_read_write.setter
    def is_read_write(self, new_value: bool) -> None:
        self._fs_read_write = new_value

    def prune_if_empty_dir(self) -> None:
        # No-op for the root directory. There is never a case where you want to delete this directory
        # (and even if you could, debputy will need it for technical reasons, so the root dir stays)
        return

    def unlink(self, *, recursive: bool = False) -> None:
        """Always raises TypeError as the root directory cannot be deleted"""
        # There is never a case where you want to delete this directory (and even if you could,
        # debputy will need it for technical reasons, so the root dir stays)
        raise TypeError("Cannot delete the root directory")

    # -- Plugin context --

    def _current_plugin(self) -> str:
        return self._plugin_context.current_plugin_name

    @contextlib.contextmanager
    def change_plugin_context(self, new_plugin: str) -> Iterator[str]:
        """Context manager that delegates to the plugin context's `change_plugin_context`

        :param new_plugin: Passed on to the plugin context.
        """
        with self._plugin_context.change_plugin_context(new_plugin) as active_plugin:
            yield active_plugin

1283 

1284 

class VirtualPathWithReference(VirtualFSPathBase, ABC):
    """Virtual path that can use another path (the reference path) as its data source

    When a reference path is present, its mode is used as the initial mode and, unless
    set otherwise, its mtime, fs_path, stat result and content are used for this path.
    """

    __slots__ = ("_reference_path",)

    def __init__(
        self,
        basename: str,
        parent: FSPath,
        *,
        default_mode: int,
        reference_path: VirtualPath | None = None,
    ) -> None:
        """
        :param basename: Basename of the path.
        :param parent: The parent directory.
        :param default_mode: Initial mode to use when there is no reference path.
        :param reference_path: Optional reference path.
        """
        super().__init__(
            basename,
            parent=parent,
            # Presence is tested with "is not None" throughout this class, so the
            # outcome never depends on the truthiness of the reference path object.
            initial_mode=(
                reference_path.mode if reference_path is not None else default_mode
            ),
        )
        self._reference_path = reference_path

    @property
    def has_fs_path(self) -> bool:
        ref_path = self._reference_path
        return ref_path is not None and ref_path.has_fs_path

    @property
    def mtime(self) -> float:
        """The mtime; taken from the reference path (if any) on first read and then cached"""
        mtime = self._mtime
        if mtime is None:
            ref_path = self._reference_path
            if ref_path is not None:
                mtime = ref_path.mtime
            else:
                mtime = super().mtime
            self._mtime = mtime
        return mtime

    @mtime.setter
    def mtime(self, new_mtime: float) -> None:
        self._rw_check()
        self._mtime = new_mtime

    @property
    def fs_path(self) -> str:
        ref_path = self._reference_path
        if ref_path is not None and (
            not super().has_fs_path or super().fs_path == ref_path.fs_path
        ):
            return ref_path.fs_path
        return super().fs_path

    def stat(self) -> os.stat_result:
        ref_path = self._reference_path
        if ref_path is not None and (
            not super().has_fs_path or super().fs_path == ref_path.fs_path
        ):
            return ref_path.stat()
        return super().stat()

    def open(
        self,
        *,
        byte_io: bool = False,
        buffering: int = -1,
    ) -> TextIO | BinaryIO:
        """Open the path for reading

        The call is forwarded to the reference path when it shares the fs_path of this
        path. Otherwise, the parent class implementation is used.

        :param byte_io: Passed on to the underlying `open`.
        :param buffering: Passed on to the underlying `open`.
        """
        reference_path = self._reference_path
        if reference_path is not None and reference_path.fs_path == self.fs_path:
            return reference_path.open(byte_io=byte_io, buffering=buffering)
        return super().open(byte_io=byte_io, buffering=buffering)

1352 

1353 

class VirtualDirectoryFSPath(VirtualPathWithReference):
    """A directory with an optional reference path (default mode 0o755)"""

    # The "_reference_path" slot is declared by VirtualPathWithReference. Declaring it
    # again here would only add a second, shadowing slot to every instance.
    __slots__ = ()

    def __init__(
        self,
        basename: str,
        parent: FSPath,
        *,
        reference_path: VirtualPath | None = None,
    ) -> None:
        """
        :param basename: Basename of the directory.
        :param parent: The parent directory.
        :param reference_path: Optional reference path; must be a directory when provided.
        """
        # The parent class initializer stores the reference path.
        super().__init__(
            basename,
            parent,
            reference_path=reference_path,
            default_mode=0o755,
        )
        assert reference_path is None or reference_path.is_dir
        self._ensure_min_mode()

    @property
    def is_dir(self) -> bool:
        return True

    @property
    def is_file(self) -> bool:
        return False

    @property
    def is_symlink(self) -> bool:
        return False

    def readlink(self) -> str:
        """Always raises TypeError as a directory is not a symlink"""
        raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink')

1388 

1389 

class SymlinkVirtualPath(VirtualPathWithReference):
    """A symlink with an optional reference path"""

    __slots__ = ("_link_target",)

    def __init__(
        self,
        basename: str,
        parent_dir: FSPath,
        link_target: str,
        *,
        reference_path: VirtualPath | None = None,
    ) -> None:
        """
        :param basename: Basename of the symlink.
        :param parent_dir: The parent directory.
        :param link_target: The target of the symlink; returned as-is by `readlink`.
        :param reference_path: Optional reference path.
        """
        super().__init__(
            basename,
            parent=parent_dir,
            default_mode=_SYMLINK_MODE,
            reference_path=reference_path,
        )
        self._link_target = link_target

    @property
    def is_dir(self) -> bool:
        return False

    @property
    def is_file(self) -> bool:
        return False

    @property
    def is_symlink(self) -> bool:
        return True

    def readlink(self) -> str:
        """Provide the link target given at construction"""
        return self._link_target

    @property
    def size(self) -> int:
        """The length of the link target string"""
        target = self.readlink()
        return len(target)

1427 

1428 

class FSBackedFilePath(VirtualPathWithReference):
    """A file whose content lives at a path in the real file system"""

    __slots__ = ("_fs_path", "_replaceable_inline")

    def __init__(
        self,
        basename: str,
        parent_dir: FSPath,
        fs_path: str,
        *,
        replaceable_inline: bool = False,
        initial_mode: int | None = None,
        mtime: float | None = None,
        stat_cache: os.stat_result | None = None,
        reference_path: VirtualPath | None = None,
    ) -> None:
        """
        :param basename: Basename of the file.
        :param parent_dir: The parent directory.
        :param fs_path: The file system path holding the content.
        :param replaceable_inline: Whether the content at `fs_path` may be replaced in
          place. Only accepted (via an assertion) for paths containing "debputy/scratch-dir/".
        :param initial_mode: When not None, assigned as the mode of the path.
        :param mtime: When not None, used as the mtime of the path.
        :param stat_cache: Stored as the cached stat result for the path.
        :param reference_path: Optional reference path.
        """
        super().__init__(
            basename,
            parent_dir,
            default_mode=0o644,
            reference_path=reference_path,
        )
        self._fs_path = fs_path
        self._replaceable_inline = replaceable_inline
        if initial_mode is not None:
            self.mode = initial_mode
        if mtime is not None:
            self._mtime = mtime
        self._stat_cache = stat_cache
        if replaceable_inline:
            assert (
                "debputy/scratch-dir/" in fs_path
            ), f"{fs_path} should not be inline-replaceable -- {self.path}"
        self._ensure_min_mode()

    @property
    def is_dir(self) -> bool:
        return False

    @property
    def is_file(self) -> bool:
        return True

    @property
    def is_symlink(self) -> bool:
        return False

    def readlink(self) -> str:
        """Always raises TypeError as a file is not a symlink"""
        raise TypeError(f'"{self._orphan_safe_path()!r}" is a file; not a symlink')

    @property
    def has_fs_path(self) -> bool:
        return True

    @property
    def fs_path(self) -> str:
        return self._fs_path

    @property
    def _can_replace_inline(self) -> bool:
        return self._replaceable_inline

    def _replaced_path(self, new_fs_path: str) -> None:
        """Switch to `new_fs_path`, drop the reference path and allow inline replacement

        :param new_fs_path: The file system path that now holds the content.
        """
        self._reference_path = None
        self._replaceable_inline = True
        self._fs_path = new_fs_path

1493 

1494 

# Mode used for symlinks (default mode of SymlinkVirtualPath and the only explicit
# mode VirtualTestPath accepts for a symlink).
_SYMLINK_MODE = 0o777

1496 

1497 

class VirtualTestPath(FSPath):
    """Mock path for tests

    The path can be a directory, a symlink or a file. File content can be provided
    in-memory (`content`) or be written to a temporary file on first use of `fs_path`
    (`materialized_content`).
    """

    __slots__ = (
        "_path_type",
        "_has_fs_path",
        "_fs_path",
        "_link_target",
        "_content",
        "_materialized_content",
    )

    def __init__(
        self,
        basename: str,
        parent_dir: FSPath | None,
        mode: int | None = None,
        mtime: float | None = None,
        is_dir: bool = False,
        has_fs_path: bool | None = False,
        fs_path: str | None = None,
        link_target: str | None = None,
        content: str | None = None,
        materialized_content: str | None = None,
    ) -> None:
        """
        :param basename: Basename of the path.
        :param parent_dir: The parent directory (if any).
        :param mode: Initial mode. Defaults to 0o755 for directories and 0o644 otherwise.
          For symlinks, only None or the symlink mode is accepted.
        :param mtime: Passed on to the parent class initializer.
        :param is_dir: When True, the path is a directory.
        :param has_fs_path: Whether the path claims to have a file system path. When None,
          it is derived from whether `fs_path` is non-empty.
        :param fs_path: Optional file system path backing the path.
        :param link_target: When not None (and `is_dir` is False), the path is a symlink with
          this target.
        :param content: Optional in-memory content served by `open` and `size`.
        :param materialized_content: Optional content that is written to a temporary file
          the first time `fs_path` is read without an `fs_path` being set.
        :raises ValueError: If a symlink is given a mode other than the symlink mode.
        """
        if is_dir:
            self._path_type = PathType.DIRECTORY
        elif link_target is not None:
            self._path_type = PathType.SYMLINK
            if mode is not None and mode != _SYMLINK_MODE:
                raise ValueError(
                    f'Please do not assign a mode to symlinks. Triggered for "{basename}".'
                )
        else:
            self._path_type = PathType.FILE

        if mode is not None:
            initial_mode = mode
        else:
            initial_mode = 0o755 if is_dir else 0o644

        self._link_target = link_target
        if has_fs_path is None:
            has_fs_path = bool(fs_path)
        self._has_fs_path = has_fs_path
        self._fs_path = fs_path
        self._materialized_content = materialized_content
        super().__init__(
            basename,
            parent=parent_dir,
            initial_mode=initial_mode,
            mtime=mtime,
        )
        self._content = content

    @property
    def is_dir(self) -> bool:
        return self._path_type == PathType.DIRECTORY

    @property
    def is_file(self) -> bool:
        return self._path_type == PathType.FILE

    @property
    def is_symlink(self) -> bool:
        return self._path_type == PathType.SYMLINK

    def readlink(self) -> str:
        """Provide the link target

        :raises TypeError: If the path is not a symlink.
        """
        if not self.is_symlink:
            raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
        link_target = self._link_target
        assert link_target is not None
        return link_target

    @property
    def mtime(self) -> float:
        """The mtime; defaults to the time of the first read when unset"""
        if self._mtime is None:
            self._mtime = time.time()
        return self._mtime

    @mtime.setter
    def mtime(self, new_mtime: float) -> None:
        self._rw_check()
        self._mtime = new_mtime

    @property
    def has_fs_path(self) -> bool:
        return self._has_fs_path

    def stat(self) -> os.stat_result:
        """Stat the backing file system path

        :raises PureVirtualPathError: If the path has no (existing) file system path.
        """
        if self.has_fs_path:
            path = self.fs_path
            if path is None:
                raise PureVirtualPathError(
                    f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path"
                    " cannot provide!"
                )
            try:
                return os.stat(path)
            except FileNotFoundError as e:
                raise PureVirtualPathError(
                    f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path"
                    " cannot provide! (An fs_path was provided, but it did not exist)"
                ) from e

        raise PureVirtualPathError(
            "stat() is only applicable to paths backed by the file system. The path"
            f" {self._orphan_safe_path()!r} is purely virtual"
        )

    @property
    def size(self) -> int:
        """The size of the path

        In order of precedence: the UTF-8 encoded length of the in-memory content, the
        length of the link target for symlinks, 0 when there is no file system path and
        otherwise the size reported by `stat`.
        """
        if self._content is not None:
            return len(self._content.encode("utf-8"))
        if self.is_symlink:
            return len(self.readlink())
        if not self.has_fs_path or self.fs_path is None:
            return 0
        return self.stat().st_size

    @property
    def fs_path(self) -> str:
        """The file system path backing the path

        If no path was provided but `materialized_content` was, the content is written
        to a temporary file on first access. That file is removed at interpreter exit.

        :raises PureVirtualPathError: If the path has no file system path.
        """
        if self.has_fs_path:
            if self._fs_path is None and self._materialized_content is not None:
                with tempfile.NamedTemporaryFile(
                    mode="w+t",
                    encoding="utf-8",
                    suffix=f"__{self.name}",
                    delete=False,
                ) as fd:
                    filepath = fd.name
                    fd.write(self._materialized_content)
                self._fs_path = filepath

                def _remove_materialized_file() -> None:
                    # The file may already have been removed by the time the interpreter
                    # exits; that must not surface as an error from the exit handler.
                    with contextlib.suppress(FileNotFoundError):
                        os.unlink(filepath)

                atexit.register(_remove_materialized_file)

            path = self._fs_path
            if path is None:
                raise PureVirtualPathError(
                    f"The test wants a real file system entry of {self._orphan_safe_path()!r}, which this "
                    " mock path cannot provide!"
                )
            return path
        raise PureVirtualPathError(
            "fs_path is only applicable to paths backed by the file system. The path"
            f" {self._orphan_safe_path()!r} is purely virtual"
        )

    def replace_fs_path_content(
        self,
        *,
        use_fs_path_mode: bool = False,
    ) -> ContextManager[str]:
        """Delegate to the parent class implementation unless in-memory content is used

        :param use_fs_path_mode: Passed on to the parent class implementation.
        :raises TypeError: If the path was created with `content`.
        """
        if self._content is not None:
            raise TypeError(
                f"The `replace_fs_path_content()` method was called on {self.path}. Said path was"
                " created with `content` but for this method to work, the path should have been"
                " created with `materialized_content`"
            )
        return super().replace_fs_path_content(use_fs_path_mode=use_fs_path_mode)

    @contextlib.contextmanager
    def open_child(
        self,
        name: str,
        mode: OpenMode = "r",
        buffering: int = -1,
    ) -> Iterator[TextIO | BinaryIO]:
        """Context manager for opening a child of this path

        For an existing child, or when the mode contains "r", the parent class
        implementation is used. Otherwise, an in-memory buffer is provided and whatever
        was written to it becomes the `content` of a new child once the managed block
        completes.

        :param name: Basename of the child.
        :param mode: The open mode.
        :param buffering: Passed on to the parent class implementation.
        """
        existing = self.get(name)
        if existing or "r" in mode:
            with super().open_child(name, mode, buffering=buffering) as fd:
                yield fd
            return
        if "b" in mode:
            fd = io.BytesIO(b"")
            yield fd
            content = fd.getvalue().decode("utf-8")
        else:
            fd = io.StringIO("")
            yield fd
            content = fd.getvalue()
        # Constructing the path with this path as parent is all that is needed here;
        # the new instance is not used any further.
        VirtualTestPath(
            name,
            self,
            mode=0o644,
            content=content,
            has_fs_path=True,
        )

    def open(
        self,
        *,
        byte_io: bool = False,
        buffering: int = -1,
    ) -> TextIO | BinaryIO:
        """Open the path for reading

        In-memory content (if any) is served from an in-memory buffer. Otherwise, the
        parent class implementation is used.

        :param byte_io: When True, a binary stream is provided.
        :param buffering: Passed on to the parent class implementation.
        :raises TestPathWithNonExistentFSPathError: If the parent class implementation
          fails with FileNotFoundError.
        """
        if self._content is None:
            try:
                return super().open(byte_io=byte_io, buffering=buffering)
            except FileNotFoundError as e:
                raise TestPathWithNonExistentFSPathError(
                    f"The test path {self.path} had an fs_path {self._fs_path}, which does not"
                    " exist. This exception can only occur in the testsuite. Either have the"
                    " test provide content for the path (`virtual_path_def(..., content=...) or,"
                    " if that is too painful in general, have the code accept this error as a "
                    " test only-case and provide a default."
                ) from e

        if byte_io:
            return io.BytesIO(self._content.encode("utf-8"))
        return io.StringIO(self._content)

    def _replaced_path(self, new_fs_path: str) -> None:
        self._fs_path = new_fs_path

1709 

1710 

class FSOverlayBase(VirtualPathBase, Generic[FSP]):
    """Base class for paths that mirror an existing file system tree

    The path type is derived from an `lstat` of the file system path, and metadata
    references are always empty and read-only. The parent is only weakly referenced.
    """

    __slots__ = (
        "_path",
        "_fs_path",
        "_parent",
        "__weakref__",
    )

    def __init__(
        self,
        path: str,
        fs_path: str,
        parent: FSP | None,
    ) -> None:
        """
        :param path: The path as reported by `path`.
        :param fs_path: The file system path. It is normalized; a leading "/" is kept.
        :param parent: The parent path (if any). Only a weak reference is stored.
        """
        self._path: str = path
        prefix = "/" if fs_path.startswith("/") else ""
        self._fs_path: str = prefix + _normalize_path(fs_path, with_prefix=False)
        self._parent: ReferenceType[FSP] | None = (
            ref(parent) if parent is not None else None
        )

    @property
    def name(self) -> str:
        return os.path.basename(self._path)

    @property
    def path(self) -> str:
        return self._path

    @property
    def parent_dir(self) -> Optional["FSP"]:
        """The parent path or None if there is no parent

        :raises RuntimeError: If the weakly referenced parent no longer exists.
        """
        parent = self._parent
        if parent is None:
            return None
        resolved = parent()
        if resolved is None:
            raise RuntimeError("Parent was garbage collected!")
        return resolved

    @property
    def fs_path(self) -> str:
        return self._fs_path

    def stat(self) -> os.stat_result:
        """Provide the `os.lstat` result for the file system path"""
        return os.lstat(self.fs_path)

    @property
    def is_dir(self) -> bool:
        # The root path can have a non-existent fs_path (such as d/tmp not always existing)
        try:
            return stat.S_ISDIR(self.stat().st_mode)
        except (FileNotFoundError, NotImplementedError):
            # A stat() that raises NotImplementedError is treated as "not a directory".
            # This keeps the result falsy as before, but as a proper bool and without
            # writing to stdout.
            return False

    @property
    def is_file(self) -> bool:
        # The root path can have a non-existent fs_path (such as d/tmp not always existing)
        try:
            return stat.S_ISREG(self.stat().st_mode)
        except FileNotFoundError:
            return False

    @property
    def is_symlink(self) -> bool:
        # The root path can have a non-existent fs_path (such as d/tmp not always existing)
        try:
            return stat.S_ISLNK(self.stat().st_mode)
        except FileNotFoundError:
            return False

    @property
    def has_fs_path(self) -> bool:
        return True

    def open(
        self,
        *,
        byte_io: bool = False,
        buffering: int = -1,
    ) -> TextIO | BinaryIO:
        """Open the file system path for reading

        :param byte_io: When True, the file is opened in binary mode. Otherwise, it is
          opened as UTF-8 text.
        :param buffering: Passed on to the builtin `open`.
        :raises TypeError: If the path is neither a file nor a symlink.
        """
        # Allow symlinks for open here, because we can let the OS resolve the symlink reliably in this
        # case.
        if not self.is_file and not self.is_symlink:
            raise TypeError(
                f"Cannot open {self.path} for reading: It is not a file nor a symlink"
            )

        if byte_io:
            return open(self.fs_path, "rb", buffering=buffering)
        return open(self.fs_path, encoding="utf-8", buffering=buffering)

    def metadata(
        self,
        metadata_type: type[PMT],
        *,
        owning_plugin: str | None = None,
    ) -> PathMetadataReference[PMT]:
        """Provide an always-empty, read-only metadata reference

        :param metadata_type: The type of the metadata.
        :param owning_plugin: Name of the plugin owning the metadata. Defaults to the current plugin.
        """
        current_plugin = self._current_plugin()
        if owning_plugin is None:
            owning_plugin = current_plugin
        return AlwaysEmptyReadOnlyMetadataReference(
            owning_plugin,
            current_plugin,
            metadata_type,
        )

    def all_paths(self) -> Iterable["FSControlPath"]:
        """Iterate over this path and everything beneath it

        Each directory is yielded before its content, and the children of a directory
        are visited in the order `iterdir` provides them.
        """
        yield self
        if not self.is_dir:
            return
        # The stack is kept reversed, so that pop() yields the children in iterdir order.
        stack = list(self.iterdir)
        stack.reverse()
        while stack:
            current = stack.pop()
            yield current
            if current.is_dir:
                stack.extend(reversed(list(current.iterdir)))

    def _resolve_children(
        self, new_child: Callable[[str, str, FSP], FSC]
    ) -> Mapping[str, FSC]:
        """Create a child path for every entry in the directory on the file system

        :param new_child: Factory called with (path, fs_path, parent) for each entry.
        :return: The children by basename, in sorted order. Empty if this path is not
          a directory.
        """
        if not self.is_dir:
            return {}
        dir_path = self.path
        dir_fs_path = self.fs_path
        children = {}
        for name in sorted(os.listdir(dir_fs_path), key=os.path.basename):
            # Avoid a "./" prefix for entries directly beneath ".".
            child_path = os.path.join(dir_path, name) if dir_path != "." else name
            child_fs_path = (
                os.path.join(dir_fs_path, name) if dir_fs_path != "." else name
            )
            children[name] = new_child(
                child_path,
                child_fs_path,
                self,
            )
        return children

1850 

1851 

class FSROOverlay(FSOverlayBase["FSROOverlay"]):
    """Read-only overlay node backed by a path on the real file system.

    The ``os.lstat`` result, the symlink target and the child mapping are
    cached on the instance after their first use. Every mutating operation
    raises ``DebputyFSIsROError``.
    """

    __slots__ = (
        "_stat_cache",
        "_readlink_cache",
        "_children",
        "_stat_failed_cache",
    )

    def __init__(
        self,
        path: str,
        fs_path: str,
        parent: Optional["FSROOverlay"],
    ) -> None:
        super().__init__(path, fs_path, parent=parent)
        self._stat_cache: os.stat_result | None = None
        self._readlink_cache: str | None = None
        # Set once lstat has raised FileNotFoundError so later calls can fail
        # without touching the file system again.
        self._stat_failed_cache = False
        # None means "not resolved yet"; an empty mapping is a valid result.
        self._children: Mapping[str, FSROOverlay] | None = None

    @classmethod
    def create_root_dir(cls, path: str, fs_path: str) -> "FSROOverlay":
        """Create a parentless ``FSROOverlay`` for ``fs_path``."""
        return FSROOverlay(path, fs_path, None)

    @property
    def iterdir(self) -> Iterable["FSROOverlay"]:
        """Yield the children of this directory (nothing for non-directories)."""
        if not self.is_dir:
            return
        if self._children is None:
            self._ensure_children_are_resolved()
        yield from assume_not_none(self._children).values()

    def lookup(self, path: str) -> Optional["FSROOverlay"]:
        """Resolve ``path`` relative to this directory.

        :param path: The path to look up; absolute paths start at the root.
        :return: The matching node, or ``None`` when this is not a directory
          or a path segment does not exist.
        :raises ValueError: If a ``..`` segment would step above the root.
        """
        if not self.is_dir:
            return None
        if self._children is None:
            self._ensure_children_are_resolved()

        absolute, _, path_parts = _split_path(path)
        current = cast("FSROOverlay", _root(self)) if absolute else self
        for dir_part in path_parts:
            if dir_part == ".":
                continue
            if dir_part == "..":
                p = current.parent_dir
                if p is None:
                    raise ValueError(f'The path "{path}" escapes the root dir')
                current = cast("FSROOverlay", p)
                continue
            try:
                current = current[dir_part]
            except KeyError:
                return None
        return current

    def _ensure_children_are_resolved(self) -> None:
        """Populate the child cache unless it has already been resolved."""
        # Compare against None rather than relying on truthiness: an empty
        # directory resolves to an empty mapping, which must still count as
        # "resolved" (this matches the `is None` checks in the callers).
        if not self.is_dir or self._children is not None:
            return
        self._children = self._resolve_children(
            lambda n, fsp, p: FSROOverlay(n, fsp, p)
        )

    @property
    def is_detached(self) -> bool:
        return False

    def __getitem__(self, key) -> "VirtualPath":
        """Return the child named ``key`` (or named like the ``FSPath`` key).

        :raises KeyError: If this is not a directory or no such child exists.
        """
        if not self.is_dir:
            raise KeyError(key)
        if self._children is None:
            self._ensure_children_are_resolved()
        if isinstance(key, FSPath):
            key = key.name
        return self._children[key]

    def __delitem__(self, key) -> None:
        self._error_ro_fs()

    @property
    def is_read_write(self) -> bool:
        return False

    def _rw_check(self) -> None:
        self._error_ro_fs()

    def _error_ro_fs(self) -> NoReturn:
        """Raise the error used for every attempted write."""
        raise DebputyFSIsROError(
            f'Attempt to write to "{self.path}" failed:'
            " Debputy Virtual File system is R/O."
        )

    def stat(self) -> os.stat_result:
        """Return the cached ``os.lstat`` result for ``fs_path``.

        :raises FileNotFoundError: If the path does not exist. The failure is
          remembered, so subsequent calls raise without a new system call.
        """
        if self._stat_failed_cache:
            raise FileNotFoundError(
                errno.ENOENT, os.strerror(errno.ENOENT), self.fs_path
            )

        if self._stat_cache is None:
            try:
                self._stat_cache = os.lstat(self.fs_path)
            except FileNotFoundError:
                self._stat_failed_cache = True
                raise
        return self._stat_cache

    @property
    def mode(self) -> int:
        return stat.S_IMODE(self.stat().st_mode)

    @mode.setter
    def mode(self, _unused: int) -> None:
        self._error_ro_fs()

    @property
    def mtime(self) -> float:
        return self.stat().st_mtime

    @mtime.setter
    def mtime(self, new_mtime: float) -> None:
        self._error_ro_fs()

    def readlink(self) -> str:
        """Return the (cached) symlink target.

        :raises TypeError: If this path is not a symlink.
        """
        if not self.is_symlink:
            raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
        if self._readlink_cache is None:
            self._readlink_cache = os.readlink(self.fs_path)
        return self._readlink_cache

    def chown(
        self,
        owner: StaticFileSystemOwner | None,
        group: StaticFileSystemGroup | None,
    ) -> None:
        self._error_ro_fs()

    def mkdir(self, name: str) -> "VirtualPath":
        self._error_ro_fs()

    def add_file(
        self,
        name: str,
        *,
        unlink_if_exists: bool = True,
        use_fs_path_mode: bool = False,
        mode: int = 0o0644,
        mtime: float | None = None,
    ) -> ContextManager["VirtualPath"]:
        self._error_ro_fs()

    def add_symlink(self, link_name: str, link_target: str) -> "VirtualPath":
        self._error_ro_fs()

    def unlink(self, *, recursive: bool = False) -> None:
        self._error_ro_fs()

2006 

2007 

class FSROOverlayRootDir(FSROOverlay):
    """Parentless ``FSROOverlay`` that also tracks the current plugin name.

    The plugin context starts out as ``"debputy"``.
    """

    __slots__ = ("_plugin_context",)

    def __init__(self, path: str, fs_path: str) -> None:
        super().__init__(path, fs_path, None)
        self._plugin_context = CurrentPluginContextManager("debputy")

    def _current_plugin(self) -> str:
        """Return the plugin name held by the plugin context."""
        context = self._plugin_context
        return context.current_plugin_name

    @contextlib.contextmanager
    def change_plugin_context(self, new_plugin: str) -> Iterator[str]:
        """Delegate to the plugin context's ``change_plugin_context``.

        Yields whatever the underlying context manager yields.
        """
        switcher = self._plugin_context.change_plugin_context(new_plugin)
        with switcher as active_plugin:
            yield active_plugin

2022 

2023 

class FSControlPath(FSOverlayBase["FSControlPath"]):
    """Read-write overlay node for paths destined for the control.tar.

    Nothing is cached: children are re-listed from the file system whenever
    they are needed. Subdirectories, symlinks and ownership changes are
    rejected.
    """

    @property
    def iterdir(self) -> Iterable["FSControlPath"]:
        """Yield the children of this directory (nothing for non-directories)."""
        if not self.is_dir:
            return
        yield from self._resolve_children(
            lambda n, fsp, p: FSControlPath(n, fsp, p)
        ).values()

    def lookup(self, path: str) -> Optional["FSControlPath"]:
        """Resolve ``path`` relative to this directory.

        :param path: The path to look up; absolute paths start at the root.
        :return: The matching node, or ``None`` when this is not a directory
          or a path segment does not exist.
        :raises ValueError: If a ``..`` segment would step above the root.
        """
        if not self.is_dir:
            return None

        absolute, _, path_parts = _split_path(path)
        current = cast("FSControlPath", _root(self)) if absolute else self
        for dir_part in path_parts:
            if dir_part == ".":
                continue
            if dir_part == "..":
                p = current.parent_dir
                if p is None:
                    raise ValueError(f'The path "{path}" escapes the root dir')
                current = cast("FSControlPath", p)
                continue
            try:
                current = current[dir_part]
            except KeyError:
                return None
        return current

    @property
    def is_detached(self) -> bool:
        """Whether ``stat()`` fails with ``FileNotFoundError`` for this path."""
        try:
            self.stat()
        except FileNotFoundError:
            return True
        else:
            return False

    def __getitem__(self, key) -> "VirtualPath":
        """Return the child named ``key`` (or named like the ``FSPath`` key).

        :raises KeyError: If this is not a directory or no such child exists.
        """
        if not self.is_dir:
            raise KeyError(key)
        children = self._resolve_children(lambda n, fsp, p: FSControlPath(n, fsp, p))
        if isinstance(key, FSPath):
            key = key.name
        return children[key]

    def __delitem__(self, key) -> None:
        self[key].unlink()

    @property
    def is_read_write(self) -> bool:
        return True

    @property
    def mode(self) -> int:
        return stat.S_IMODE(self.stat().st_mode)

    @mode.setter
    def mode(self, new_mode: int) -> None:
        os.chmod(self.fs_path, new_mode)

    @property
    def mtime(self) -> float:
        return self.stat().st_mtime

    @mtime.setter
    def mtime(self, new_mtime: float) -> None:
        os.utime(self.fs_path, (new_mtime, new_mtime))

    def readlink(self) -> str:
        """Reject the call: symlinks are not supported here.

        :raises TypeError: If this path is not a symlink.
        :raises AssertionError: If it is a symlink (see ``add_symlink``, which
          refuses to create them).
        """
        if not self.is_symlink:
            raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
        # Raised explicitly instead of `assert False`, which is removed under
        # `python -O` and would make this method silently return None.
        raise AssertionError(
            f"Unexpected symlink among the control.tar paths ({self.path!r})"
        )

    def chown(
        self,
        owner: StaticFileSystemOwner | None,
        group: StaticFileSystemGroup | None,
    ) -> None:
        raise ValueError(
            "No need to chown paths in the control.tar: They are always root:root"
        )

    def mkdir(self, name: str) -> "VirtualPath":
        raise TypeError("The control.tar never contains subdirectories.")

    @contextlib.contextmanager
    def add_file(
        self,
        name: str,
        *,
        unlink_if_exists: bool = True,
        use_fs_path_mode: bool = False,
        mode: int = 0o0644,
        mtime: float | None = None,
    ) -> ContextManager["VirtualPath"]:
        """Create (or truncate) the file ``name`` in this directory.

        The new child is yielded so the caller can fill in its content. Once
        the ``with`` body completes, the path is checked with
        ``_check_fs_path_is_file`` and ``mode`` is applied again.

        :param name: Basename of the file; must not contain ``/`` nor be
          ``.`` or ``..``.
        :param unlink_if_exists: When ``False``, an existing entry called
          ``name`` is an error.
        :param use_fs_path_mode: Accepted but not used by this implementation.
        :param mode: Mode applied to the file.
        :param mtime: Accepted but not used by this implementation.
        :raises ValueError: On an invalid name, or when the entry exists and
          ``unlink_if_exists`` is ``False``.
        :raises TypeError: If this path is not a directory.
        """
        if "/" in name or name in {".", ".."}:
            raise ValueError(f'Invalid file name: "{name}"')
        if not self.is_dir:
            raise TypeError(
                f"Cannot create {self._orphan_safe_path()}/{name}:"
                f" {self._orphan_safe_path()} is not a directory"
            )
        self._rw_check()
        existing = self.get(name)
        if existing is not None:
            if not unlink_if_exists:
                # The message names the parameter this method actually has.
                raise ValueError(
                    f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
                    f" and unlink_if_exists was False"
                )
            assert existing.is_file

        fs_path = os.path.join(self.fs_path, name)
        # This truncates the existing file if any, so we do not have to unlink the previous entry.
        with open(fs_path, "wb") as fd:
            # Ensure that the fs_path exists and default mode is reasonable
            os.chmod(fd.fileno(), mode)
        child = FSControlPath(
            name,
            fs_path,
            self,
        )
        yield child
        _check_fs_path_is_file(fs_path, unlink_on_error=child)
        child.mode = mode

    @contextlib.contextmanager
    def replace_fs_path_content(
        self,
        *,
        use_fs_path_mode: bool = False,
    ) -> ContextManager[str]:
        """Yield ``fs_path`` so the caller can rewrite the file's content.

        Afterwards the path is checked with ``_check_fs_path_is_file``.

        :param use_fs_path_mode: When true, the mode read before yielding is
          written back afterwards.
        :raises TypeError: If this path is not a file.
        """
        if not self.is_file:
            raise TypeError(
                f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file'
            )
        restore_mode = self.mode if use_fs_path_mode else None
        yield self.fs_path
        _check_fs_path_is_file(self.fs_path, self)
        if restore_mode is not None:
            self.mode = restore_mode

    def add_symlink(self, link_name: str, link_target: str) -> "VirtualPath":
        raise TypeError("The control.tar never contains symlinks.")

    def unlink(self, *, recursive: bool = False) -> None:
        """Remove this file from the file system; a no-op without a parent."""
        if self._parent is None:
            return
        # By virtue of the control FS only containing paths, we can assume `recursive` never
        # matters and that `os.unlink` will be sufficient.
        assert self.is_file
        os.unlink(self.fs_path)

2179 

2180 

class FSControlRootDir(FSControlPath):
    """Parentless ``FSControlPath`` with a helper for copying files in."""

    @classmethod
    def create_root_dir(cls, fs_path: str) -> "FSControlRootDir":
        """Create the root node (path ``"."``) backed by ``fs_path``."""
        return FSControlRootDir(".", fs_path, None)

    def insert_file_from_fs_path(
        self,
        name: str,
        fs_path: str,
        *,
        exist_ok: bool = True,
        use_fs_path_mode: bool = False,
        mode: int = 0o0644,
        # Ignored, but accepted for compat with FSPath's variant of this function.
        #  - This is used by install_or_generate_conffiles.
        reference_path: VirtualPath | None = None,  # noqa
    ) -> "FSControlPath":
        """Copy the file at ``fs_path`` into this directory as ``name``.

        :param name: Basename of the new entry; must not contain ``/`` nor be
          ``.`` or ``..``.
        :param fs_path: Source file on the file system (symlinks are followed).
        :param exist_ok: When ``False``, an existing entry called ``name`` is
          an error.
        :param use_fs_path_mode: When true, the permission bits of ``fs_path``
          are copied to the new file; otherwise ``mode`` is applied.
        :param mode: Mode used when ``use_fs_path_mode`` is false.
        :param reference_path: Ignored.
        :return: The node for the inserted file.
        :raises ValueError: On an invalid name, or when the entry exists and
          ``exist_ok`` is ``False``.
        :raises TypeError: If this path is not a directory.
        """
        if "/" in name or name in {".", ".."}:
            raise ValueError(f'Invalid file name: "{name}"')
        if not self.is_dir:
            raise TypeError(
                f"Cannot create {self._orphan_safe_path()}/{name}:"
                f" {self._orphan_safe_path()} is not a directory"
            )
        self._rw_check()
        if name in self and not exist_ok:
            raise ValueError(
                f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
                f" and exist_ok was False"
            )

        target_path = os.path.join(self.fs_path, name)
        # The content must be copied in both cases: `shutil.copymode` only
        # transfers permission bits and requires the target to exist already.
        shutil.copyfile(
            fs_path,
            target_path,
            follow_symlinks=True,
        )
        if use_fs_path_mode:
            shutil.copymode(
                fs_path,
                target_path,
                follow_symlinks=True,
            )
        else:
            os.chmod(target_path, mode)
        return cast("FSControlPath", self[name])

2228 

2229 

def as_path_def(pd: str | PathDef) -> PathDef:
    """Wrap a plain string in a ``PathDef``; return anything else unchanged."""
    if isinstance(pd, str):
        return PathDef(pd)
    return pd

2232 

2233 

def as_path_defs(paths: Iterable[str | PathDef]) -> Iterable[PathDef]:
    """Lazily apply ``as_path_def`` to every item of ``paths``, in order."""
    for item in paths:
        yield as_path_def(item)

2236 

2237 

def build_virtual_fs(
    paths: Iterable[str | PathDef],
    read_write_fs: bool = False,
) -> "FSPath":
    """Build a virtual file system from a list of path definitions.

    Paths ending in ``/`` are created as directories, everything else as
    non-directories. Missing parent directories are created implicitly.
    Paths are normalised to start with ``./``; ``.``, ``./`` and ``/`` all
    denote the root directory.

    :param paths: Path names or ``PathDef`` instances. Directories should be
      listed before their children.
    :param read_write_fs: Value assigned to ``is_read_write`` on the root.
    :return: The root directory of the resulting file system.
    :raises ValueError: On a duplicate definition, or when a path needs a
      parent directory that was defined as a non-directory.
    """
    root_aliases = (".", "./", "/")
    root_dir: FSRootDir | None = None
    directories: dict[str, FSPath] = {}
    non_directories = set()

    def _ensure_parent_dirs(p: str) -> None:
        # Walk upwards until an already known directory is reached, noting
        # every ancestor that still has to be created.
        missing_dirs = []
        current = os.path.dirname(p.rstrip("/"))
        while current not in directories:
            if current in non_directories:
                raise ValueError(
                    f'Conflicting definition for "{current}". The path "{p}" wants it as a directory,'
                    ' but it is defined as a non-directory. (Ensure dirs end with "/")'
                )
            missing_dirs.append(current)
            current = os.path.dirname(current)
        # Create outermost first so each parent exists before its child.
        for dir_path in reversed(missing_dirs):
            parent_dir = directories[os.path.dirname(dir_path)]
            directories[dir_path] = VirtualTestPath(
                os.path.basename(dir_path), parent_dir, is_dir=True
            )

    for path_def in as_path_defs(paths):
        path = path_def.path_name
        if path in directories or path in non_directories:
            raise ValueError(
                f'Duplicate definition of "{path}". Can be false positive if input is not in'
                ' "correct order" (ensure directories occur before their children)'
            )
        if root_dir is None:
            # The root only gets an fs_path when the first definition is the
            # root itself.
            root_fs_path = path_def.fs_path if path in root_aliases else None
            root_dir = FSRootDir(fs_path=root_fs_path)
            directories["."] = root_dir

        if path not in root_aliases and not path.startswith("./"):
            path = "./" + path
        if path in root_aliases:
            # "/" is handled here together with "." and "./"; previously it
            # fell through and failed with KeyError('') on the parent lookup.
            assert "." in directories
            continue
        _ensure_parent_dirs(path)

        is_dir = path.endswith("/")
        if is_dir:
            path = path[:-1]
        directory = directories[os.path.dirname(path)]
        assert not is_dir or not bool(
            path_def.link_target
        ), f"is_dir={is_dir} vs. link_target={path_def.link_target}"
        fs_path = VirtualTestPath(
            os.path.basename(path),
            directory,
            is_dir=is_dir,
            mode=path_def.mode,
            mtime=path_def.mtime,
            has_fs_path=path_def.has_fs_path,
            fs_path=path_def.fs_path,
            link_target=path_def.link_target,
            content=path_def.content,
            materialized_content=path_def.materialized_content,
        )
        assert not fs_path.is_detached
        if fs_path.is_dir:
            directories[fs_path.path] = fs_path
        else:
            non_directories.add(fs_path.path)

    if root_dir is None:
        root_dir = FSRootDir()

    root_dir.is_read_write = read_write_fs
    return root_dir

2315 return root_dir