Coverage for src/debputy/filesystem_scan.py: 66%

1294 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-09-07 09:27 +0000

1import atexit 

2import contextlib 

3import dataclasses 

4import errno 

5import io 

6import operator 

7import os 

8import shutil 

9import stat 

10import subprocess 

11import tempfile 

12import time 

13from abc import ABC 

14from contextlib import suppress 

15from typing import ( 

16 List, 

17 Iterable, 

18 Dict, 

19 Optional, 

20 Tuple, 

21 Union, 

22 Iterator, 

23 Mapping, 

24 cast, 

25 Any, 

26 ContextManager, 

27 TextIO, 

28 BinaryIO, 

29 NoReturn, 

30 Type, 

31 Generic, 

32 Callable, 

33 TypeVar, 

34 overload, 

35 Literal, 

36) 

37from weakref import ref, ReferenceType 

38 

39from debputy.exceptions import ( 

40 PureVirtualPathError, 

41 DebputyFSIsROError, 

42 DebputyMetadataAccessError, 

43 TestPathWithNonExistentFSPathError, 

44 SymlinkLoopError, 

45) 

46from debputy.intermediate_manifest import PathType 

47from debputy.manifest_parser.base_types import ( 

48 ROOT_DEFINITION, 

49 StaticFileSystemOwner, 

50 StaticFileSystemGroup, 

51) 

52from debputy.plugin.api.spec import ( 

53 VirtualPath, 

54 PathDef, 

55 PathMetadataReference, 

56 PMT, 

57) 

58from debputy.types import VP 

59from debputy.util import ( 

60 generated_content_dir, 

61 _error, 

62 escape_shell, 

63 assume_not_none, 

64 _normalize_path, 

65 _debug_log, 

66) 

67 

# Sort key: orders path-like objects by their ``name`` attribute.
BY_BASENAME = operator.attrgetter("name")

# Covariant type variables bound to "FSOverlayBase" (referenced by name only here).
FSP = TypeVar("FSP", bound="FSOverlayBase", covariant=True)
FSC = TypeVar("FSC", bound="FSOverlayBase", covariant=True)

# Mode strings that open a path as a binary stream.
BinaryOpenMode = Literal["rb", "r+b", "wb", "w+b", "xb", "ab"]

# Mode strings that open a path as a text stream.
TextOpenMode = Literal[
    "r", "r+", "rt", "r+t",
    "w", "w+", "wt", "w+t",
    "x", "xt",
    "a", "at",
]

# Any mode string accepted when opening a path.
OpenMode = Union[BinaryOpenMode, TextOpenMode]

97 

98 

class AlwaysEmptyReadOnlyMetadataReference(PathMetadataReference[PMT]):
    """Metadata reference that never holds a value and can never be written.

    Reading is permitted only when the current plugin is also the owning
    plugin; in that case the value is always ``None``.  Every write attempt
    raises.
    """

    __slots__ = ("_metadata_type", "_owning_plugin", "_current_plugin")

    def __init__(
        self,
        owning_plugin: str,
        current_plugin: str,
        metadata_type: Type[PMT],
    ) -> None:
        """Record which plugin owns the metadata, which plugin is asking, and the metadata type."""
        self._metadata_type = metadata_type
        self._current_plugin = current_plugin
        self._owning_plugin = owning_plugin

    @property
    def _is_owner(self) -> bool:
        """Whether the current plugin is the plugin that owns the metadata."""
        return self._owning_plugin == self._current_plugin

    @property
    def is_present(self) -> bool:
        """Always ``False``: this reference never holds a value."""
        return False

    @property
    def can_read(self) -> bool:
        """Whether the current plugin may read the (always absent) value."""
        return self._owning_plugin == self._current_plugin

    @property
    def can_write(self) -> bool:
        """Always ``False``: the reference is read-only."""
        return False

    @property
    def value(self) -> Optional[PMT]:
        """Return ``None`` for a plugin that may read the metadata.

        :raises DebputyMetadataAccessError: If the current plugin may not read the metadata.
        """
        if not self.can_read:
            raise DebputyMetadataAccessError(
                f"Cannot read the metadata {self._metadata_type.__name__} owned by"
                f" {self._owning_plugin} as the metadata has not been made"
                f" readable to the plugin {self._current_plugin}."
            )
        return None

    @value.setter
    def value(self, new_value: PMT) -> None:
        """Always reject the assignment.

        :raises DebputyFSIsROError: If the current plugin owns the metadata (the path is read-only).
        :raises DebputyMetadataAccessError: If the current plugin does not own the metadata.
        """
        if not self._is_owner:
            raise DebputyMetadataAccessError(
                f"Cannot set the metadata {self._metadata_type.__name__} owned by"
                f" {self._owning_plugin} as the metadata has not been made"
                f" read-write to the plugin {self._current_plugin}."
            )
        raise DebputyFSIsROError(
            f"Cannot set the metadata {self._metadata_type.__name__} as the path is read-only"
        )

149 

150 

@dataclasses.dataclass(slots=True)
class PathMetadataValue(Generic[PMT]):
    """Storage cell for one piece of plugin-owned metadata attached to a path."""

    # Name of the plugin that owns this metadata.
    owning_plugin: str
    # The type of the metadata stored in ``value``.
    metadata_type: Type[PMT]
    # The stored metadata; ``None`` means no value is set.
    value: Optional[PMT] = None

    def _is_owned_by(self, plugin_name: str) -> bool:
        """Whether ``plugin_name`` is the owning plugin."""
        return self.owning_plugin == plugin_name

    def can_read_value(self, current_plugin: str) -> bool:
        """Return ``True`` when ``current_plugin`` may read the value (owner only)."""
        return self._is_owned_by(current_plugin)

    def can_write_value(self, current_plugin: str) -> bool:
        """Return ``True`` when ``current_plugin`` may write the value (owner only)."""
        return self._is_owned_by(current_plugin)

162 

163 

class PathMetadataReferenceImplementation(PathMetadataReference[PMT]):
    """Reference to a metadata value stored on a concrete path.

    Access is checked against the plugin owning the metadata; writes also
    require the owning path to be read-write and not detached.
    """

    __slots__ = ("_owning_path", "_current_plugin", "_path_metadata_value")

    def __init__(
        self,
        owning_path: VirtualPath,
        current_plugin: str,
        path_metadata_value: PathMetadataValue[PMT],
    ) -> None:
        """Bind the reference to a path, the accessing plugin and the stored value."""
        self._owning_path = owning_path
        self._current_plugin = current_plugin
        self._path_metadata_value = path_metadata_value

    @property
    def is_present(self) -> bool:
        """Whether a value is set and the current plugin is allowed to read it."""
        if not self.can_read:
            return False
        return self._path_metadata_value.value is not None

    @property
    def can_read(self) -> bool:
        """Whether the current plugin may read the metadata."""
        return self._path_metadata_value.can_read_value(self._current_plugin)

    @property
    def can_write(self) -> bool:
        """Whether an assignment to ``value`` would currently succeed."""
        if not self._path_metadata_value.can_write_value(self._current_plugin):
            return False
        owning_path = self._owning_path
        return owning_path.is_read_write and not owning_path.is_detached

    @property
    def value(self) -> Optional[PMT]:
        """Return the stored metadata (possibly ``None``).

        :raises DebputyMetadataAccessError: If the current plugin may not read the metadata.
        """
        if self.can_read:
            return self._path_metadata_value.value
        raise DebputyMetadataAccessError(
            f"Cannot read the metadata {self._metadata_type_name} owned by"
            f" {self._owning_plugin} as the metadata has not been made"
            f" readable to the plugin {self._current_plugin}."
        )

    @value.setter
    def value(self, new_value: PMT) -> None:
        """Store ``new_value`` (``None`` deletes the value).

        :raises DebputyMetadataAccessError: If the current plugin may not write the metadata.
        :raises DebputyFSIsROError: If the owning path is read-only.
        :raises TypeError: If the owning path is detached.
        """
        # Check only the plugin permission here.  `can_write` also folds in the
        # read-only and detached states, which would make the two more specific
        # errors below unreachable and report a misleading "plugin access" error.
        if not self._path_metadata_value.can_write_value(self._current_plugin):
            m = "set" if new_value is not None else "delete"
            raise DebputyMetadataAccessError(
                f"Cannot {m} the metadata {self._metadata_type_name} owned by"
                f" {self._owning_plugin} as the metadata has not been made"
                f" read-write to the plugin {self._current_plugin}."
            )
        owning_path = self._owning_path
        if not owning_path.is_read_write:
            raise DebputyFSIsROError(
                f"Cannot set the metadata {self._metadata_type_name} as the path is read-only"
            )
        if owning_path.is_detached:
            raise TypeError(
                f"Cannot set the metadata {self._metadata_type_name} as the path is detached"
            )
        self._path_metadata_value.value = new_value

    @property
    def _is_owner(self) -> bool:
        """Whether the current plugin is the plugin that owns the metadata."""
        return self._owning_plugin == self._current_plugin

    @property
    def _owning_plugin(self) -> str:
        """Name of the plugin owning the underlying metadata value."""
        return self._path_metadata_value.owning_plugin

    @property
    def _metadata_type_name(self) -> str:
        """Class name of the metadata type (used in error messages)."""
        return self._path_metadata_value.metadata_type.__name__

235 

236 

237def _cp_a(source: str, dest: str) -> None: 

238 cmd = ["cp", "-a", source, dest] 

239 try: 

240 subprocess.check_call(cmd) 

241 except subprocess.CalledProcessError: 

242 full_command = escape_shell(*cmd) 

243 _error( 

244 f"The attempt to make an internal copy of {escape_shell(source)} failed. Please review the output of cp" 

245 f" above to understand what went wrong. The full command was: {full_command}" 

246 ) 

247 

248 

249def _split_path(path: str) -> Tuple[bool, bool, List[str]]: 

250 must_be_dir = True if path.endswith("/") else False 

251 absolute = False 

252 if path.startswith("/"): 

253 absolute = True 

254 path = "." + path 

255 path_parts = path.rstrip("/").split("/") 

256 if must_be_dir: 

257 path_parts.append(".") 

258 return absolute, must_be_dir, path_parts 

259 

260 

def _root(path: VP) -> VP:
    """Follow ``parent_dir`` upwards from ``path`` and return the topmost path.

    The topmost path is the first one whose ``parent_dir`` is ``None``.
    """
    node = path
    while (parent := node.parent_dir) is not None:
        node = parent
    return node

268 

269 

270def _check_fs_path_is_file( 

271 fs_path: str, 

272 unlink_on_error: Optional["VirtualPath"] = None, 

273) -> None: 

274 had_issue = False 

275 try: 

276 # FIXME: Check mode, and use the Virtual Path to cache the result as a side-effect 

277 st = os.lstat(fs_path) 

278 except FileNotFoundError: 

279 had_issue = True 

280 else: 

281 if not stat.S_ISREG(st.st_mode) or st.st_nlink > 1: 281 ↛ 282line 281 didn't jump to line 282 because the condition on line 281 was never true

282 had_issue = True 

283 if not had_issue: 283 ↛ 286line 283 didn't jump to line 286 because the condition on line 283 was always true

284 return 

285 

286 if unlink_on_error: 

287 with suppress(FileNotFoundError): 

288 os.unlink(fs_path) 

289 raise TypeError( 

290 "The provided FS backing file was deleted, replaced with a non-file entry or it was hard" 

291 " linked to another file. The entry has been disconnected." 

292 ) 

293 

294 

class CurrentPluginContextManager:
    """Track which plugin is "current" as a stack of plugin names."""

    __slots__ = ("_plugin_names",)

    def __init__(self, initial_plugin_name: str) -> None:
        """Start the stack with ``initial_plugin_name`` as the current plugin."""
        self._plugin_names = [initial_plugin_name]

    @property
    def current_plugin_name(self) -> str:
        """Name of the plugin on top of the stack."""
        return self._plugin_names[-1]

    @contextlib.contextmanager
    def change_plugin_context(self, new_plugin_name: str) -> Iterator[str]:
        """Make ``new_plugin_name`` the current plugin for the duration of a ``with`` block.

        The previous plugin is restored when the block exits, including when it
        exits via an exception.

        :param new_plugin_name: The plugin name to push.
        :return: A context manager yielding ``new_plugin_name``.
        """
        self._plugin_names.append(new_plugin_name)
        try:
            yield new_plugin_name
        finally:
            # Without the `finally`, an exception in the with-body would leave the
            # pushed name on the stack and corrupt the "current plugin" afterwards.
            self._plugin_names.pop()

310 

311 

class VirtualPathBase(VirtualPath, ABC):
    """Shared lookup, directory-creation and file-opening logic for virtual paths."""

    __slots__ = ()

    def _orphan_safe_path(self) -> str:
        """Return the path string used in messages (here simply ``self.path``)."""
        return self.path

    def _rw_check(self) -> None:
        """Raise ``DebputyFSIsROError`` unless this path is read-write."""
        if not self.is_read_write:
            raise DebputyFSIsROError(
                f'Attempt to write to "{self._orphan_safe_path()}" failed:'
                " Debputy Virtual File system is R/O."
            )

    def lookup(self, path: str) -> Optional["VirtualPathBase"]:
        """Resolve ``path`` and return the match, or ``None`` if any part is missing."""
        match, missing = self.attempt_lookup(path)
        if missing:
            return None
        return match

    def attempt_lookup(self, path: str) -> Tuple["VirtualPathBase", List[str]]:
        """Resolve as much of ``path`` as exists.

        Absolute paths are resolved from the root, other paths from this path.
        Symlinks met before the final segment are expanded.

        :param path: The path to resolve.
        :return: The deepest path reached and the list of segments that could
          not be resolved (empty when the whole path exists).
        :raises ValueError: If this path is detached or ``path`` escapes the root.
        :raises SymlinkLoopError: If the same symlink is traversed twice.
        """
        if self.is_detached:
            raise ValueError(
                f'Cannot perform lookup via "{self._orphan_safe_path()}": The path is detached'
            )
        absolute, must_be_dir, path_parts = _split_path(path)
        current = _root(self) if absolute else self
        # Reversed so the next segment to process can be popped from the end.
        path_parts.reverse()
        link_expansions = set()
        while path_parts:
            dir_part = path_parts.pop()
            if dir_part == ".":
                continue
            if dir_part == "..":
                p = current.parent_dir
                if p is None:
                    raise ValueError(f'The path "{path}" escapes the root dir')
                current = p
                continue
            try:
                current = current[dir_part]
            except KeyError:
                # Restore the unresolved segments in their original order.
                path_parts.append(dir_part)
                path_parts.reverse()
                if must_be_dir:
                    # Drop the trailing "." that _split_path added for the "/" suffix.
                    path_parts.pop()
                return current, path_parts
            if current.is_symlink and path_parts:
                if current.path in link_expansions:
                    # This is our loop detection for now. It might have some false positives where you
                    # could safely resolve the same symlink twice. However, given that this use-case is
                    # basically non-existent in practice for packaging, we just stop here for now.
                    raise SymlinkLoopError(
                        f'The path "{path}" traversed the symlink "{current.path}" multiple'
                        " times. Currently, traversing the same symlink twice is considered"
                        " a loop by `debputy` even if the path would eventually resolve."
                        " Consider filing a feature request if you have a benign case that"
                        " triggers this error."
                    )
                link_expansions.add(current.path)
                link_target = current.readlink()
                link_absolute, _, link_path_parts = _split_path(link_target)
                if link_absolute:
                    current = _root(current)
                else:
                    current = assume_not_none(current.parent_dir)
                # Queue the link target's segments ahead of the remaining ones.
                link_path_parts.reverse()
                path_parts.extend(link_path_parts)
        return current, []

    def mkdirs(self, path: str) -> "VirtualPath":
        """Ensure ``path`` exists as a directory, creating missing segments.

        :param path: The directory path to create.
        :return: The directory at ``path``.
        :raises ValueError: If an existing part of ``path`` is not a directory.
        """
        current: VirtualPath
        current, missing_parts = self.attempt_lookup(
            f"{path}/" if not path.endswith("/") else path
        )
        if not current.is_dir:
            raise ValueError(
                f'mkdirs of "{path}" failed: This would require {current.path} to not exist OR be'
                " a directory. However, that path exist AND is a not directory."
            )
        for missing_part in missing_parts:
            assert missing_part not in (".", "..")
            current = current.mkdir(missing_part)
        return current

    def prune_if_empty_dir(self) -> None:
        """Remove this and all (now) empty parent directories

        Same as: `rmdir --ignore-fail-on-non-empty --parents`

        This operation may cause the path (and any of its parent directories) to become "detached"
        and therefore unsafe to use in further operations.
        """
        self._rw_check()

        if not self.is_dir:
            raise TypeError(f"{self._orphan_safe_path()} is not a directory")
        if any(self.iterdir):
            return
        parent_dir = assume_not_none(self.parent_dir)

        # Recursive does not matter; we already know the directory is empty.
        self.unlink()

        # Note: The root dir must never be deleted. This works because when delegating it to the root
        # directory, its implementation of this method is a no-op. If this is later rewritten to an
        # inline loop (rather than recursion), be sure to preserve this feature.
        parent_dir.prune_if_empty_dir()

    def _current_plugin(self) -> str:
        """Return the current plugin name as reported by the topmost parent directory.

        :raises TypeError: If this path is detached.
        """
        if self.is_detached:
            raise TypeError("Cannot resolve the current plugin; path is detached")
        current = self
        while True:
            next_parent = current.parent_dir
            if next_parent is None:
                break
            current = next_parent
        assert current is not None
        return cast("FSRootDir", current)._current_plugin()

    @overload
    def open_child(
        self,
        name: str,
        mode: TextOpenMode = ...,
        buffering: int = -1,
    ) -> TextIO: ...

    @overload
    def open_child(
        self,
        name: str,
        mode: BinaryOpenMode = ...,
        buffering: int = -1,
    ) -> BinaryIO: ...

    @contextlib.contextmanager
    def open_child(
        self,
        name: str,
        mode: OpenMode = "r",
        buffering: int = -1,
    ) -> Union[TextIO, BinaryIO]:
        """Open a child path of the current directory in a given mode. Usually used with a context manager

        The path is opened according to the `mode` parameter similar to the built-in `open` in Python.
        The following symbols are accepted with the same meaning as Python's open:
         * `r`
         * `w`
         * `x`
         * `a`
         * `+`
         * `b`
         * `t`

        Like Python's `open`, this can create a new file provided the file system is in read-write mode.
        Though unlike Python's open, symlinks are not followed and cannot be opened. Any newly created
        file will start with (os.stat) mode of 0o0644. The (os.stat) mode of existing paths are left
        as-is.

        :param name: The name of the child path to open. Must be a basename.
        :param mode: The mode to open the file with such as `r` or `w`. See Python's `open` for more
          examples.
        :param buffering: Same as open(..., buffering=...) where supported. Notably during
          testing, the content may be purely in memory and use a BytesIO/StringIO
          (which does not accept that parameter, but then it is buffered in a different way)
        :return: The file handle.
        :raises ValueError: If the mode has `r` and the child does not exist, or the mode
          has `x` and the child already exists.
        """
        existing = self.get(name)
        if "r" in mode:
            if existing is None:
                raise ValueError(
                    f"Path {self.path}/{name} does not exist and mode had `r`"
                )
            if "+" not in mode:
                with existing.open(byte_io="b" in mode, buffering=buffering) as fd:
                    yield fd
                # A context manager generator must yield exactly once.  Without this
                # return, a pure read would fall through, re-open the path for
                # writing and yield a second time.
                return

        encoding = None if "b" in mode else "utf-8"

        if existing:
            if "x" in mode:
                raise ValueError(
                    f"Path {existing.path} already exists and mode had `x`"
                )
            with (
                existing.replace_fs_path_content() as fs_path,
                open(fs_path, mode, buffering=buffering, encoding=encoding) as fd,
            ):
                yield fd
        else:
            assert "r" not in mode
            # The backing file is created by `add_file` before we open it, so an
            # exclusive-create (`x`) open would always fail with FileExistsError.
            # Exclusivity was already established above (no existing child), so the
            # backing file is opened for plain writing instead.
            fs_mode = mode.replace("x", "w")
            # unlink_if_exists=False as a precaution (the "already exists" should not end up here).
            with (
                self.add_file(name, mode=0o0644, unlink_if_exists=False) as new_path,
                open(
                    new_path.fs_path,
                    fs_mode,
                    buffering=buffering,
                    encoding=encoding,
                ) as fd,
            ):
                yield fd

510 

511 

512class FSPath(VirtualPathBase, ABC): 

513 __slots__ = ( 

514 "_basename", 

515 "_parent_dir", 

516 "_children", 

517 "_path_cache", 

518 "_parent_path_cache", 

519 "_last_known_parent_path", 

520 "_mode", 

521 "_owner", 

522 "_group", 

523 "_mtime", 

524 "_stat_cache", 

525 "_metadata", 

526 "__weakref__", 

527 ) 

528 

def __init__(
    self,
    basename: str,
    parent: Optional["FSPath"],
    children: Optional[Dict[str, "FSPath"]] = None,
    initial_mode: Optional[int] = None,
    mtime: Optional[float] = None,
    stat_cache: Optional[os.stat_result] = None,
) -> None:
    """Initialise the path and, when ``parent`` is given, attach it to that directory.

    :param basename: Name of this path inside its parent directory.
    :param parent: Directory to attach to, or ``None`` to start unattached.
    :param children: Initial mapping of child basename to child path, if any.
    :param initial_mode: Initial mode, or ``None`` to leave it unset.
    :param mtime: Initial mtime, or ``None`` to leave it unset.
    :param stat_cache: Pre-computed stat result, or ``None`` to leave it unset.
    """
    # Identity and tree content.
    self._basename = basename
    self._children = children

    # Path string caches; all start empty.
    self._path_cache: Optional[str] = None
    self._parent_path_cache: Optional[str] = None
    self._last_known_parent_path: Optional[str] = None

    # File attributes.
    self._mode = initial_mode
    self._mtime = mtime
    self._stat_cache = stat_cache
    self._owner = ROOT_DEFINITION
    self._group = ROOT_DEFINITION

    # Plugin metadata, keyed by a (str, type) tuple.
    self._metadata: Dict[Tuple[str, Type[Any]], PathMetadataValue[Any]] = {}

    # `_parent_dir` must exist as an attribute before the `parent_dir` setter
    # runs, because the setter inspects it (via the detached check).
    self._parent_dir: Optional[ReferenceType["FSPath"]] = None
    if parent is None:
        return
    self.parent_dir = parent

555 

def __repr__(self) -> str:
    """Return a debug representation with the path, its type flags and child count."""
    type_name = self.__class__.__name__
    fields = [
        repr(self._orphan_safe_path()),
        f"is_file={self.is_file}",
        f"is_dir={self.is_dir}",
        f"is_symlink={self.is_symlink}",
        f"has_fs_path={self.has_fs_path}",
        f"children_len={len(self._children) if self._children else 0}",
    ]
    return f"{type_name}({', '.join(fields)})"

565 

@property
def name(self) -> str:
    """The basename of this path."""
    return self._basename

@name.setter
def name(self, new_name: str) -> None:
    """Rename this path.

    An attached path is detached from its parent, renamed and re-attached so
    the parent's child mapping is re-keyed.  A detached path is simply renamed.

    :raises DebputyFSIsROError: If the path is read-only (raised by ``_rw_check``).
    """
    # One read-write check suffices; nothing between here and the rename
    # changes the read-write state (the original repeated the check).
    self._rw_check()
    if new_name == self._basename:
        return
    if self.is_detached:
        self._basename = new_name
        return
    parent = self.parent_dir
    # This little parent_dir dance ensures the parent dir detects the rename properly
    self.parent_dir = None
    self._basename = new_name
    self.parent_dir = parent

584 

@property
def iterdir(self) -> Iterable["FSPath"]:
    """Iterate over the direct children of this path (nothing if there are none)."""
    children = self._children
    if children is None:
        return
    yield from children.values()

589 

def all_paths(self) -> Iterable["FSPath"]:
    """Yield this path followed by every path beneath it.

    Children are visited depth-first in basename order.  A directory that is
    detached by the time it has been yielded is not descended into.
    """
    yield self
    if not self.is_dir:
        return
    # Reverse-sorted so that popping from the end yields ascending basename order.
    pending = sorted(self.iterdir, key=BY_BASENAME, reverse=True)
    while pending:
        entry = pending.pop()
        yield entry
        if not entry.is_dir or entry.is_detached:
            continue
        pending.extend(sorted(entry.iterdir, key=BY_BASENAME, reverse=True))

601 

def walk(self) -> Iterable[Tuple["FSPath", List["FSPath"]]]:
    """Yield ``(path, children)`` pairs for this path and everything beneath it.

    ``children`` is sorted by basename.  A non-directory start path yields a
    single pair with an empty child list.  Children of a path that is detached
    after being yielded are not visited.
    """
    # FIXME: can this be more "os.walk"-like without making it harder to implement?
    if not self.is_dir:
        yield self, []
        return
    pending = [self]
    while pending:
        visited = pending.pop()
        entries = sorted(visited.iterdir, key=BY_BASENAME)
        assert not entries or visited.is_dir
        yield visited, entries
        # Removing the directory counts as discarding the children.
        if visited.is_detached:
            continue
        pending.extend(entries[::-1])

617 

618 def _orphan_safe_path(self) -> str: 

619 if not self.is_detached or self._last_known_parent_path is not None: 619 ↛ 621line 619 didn't jump to line 621 because the condition on line 619 was always true

620 return self.path 

621 return f"<orphaned>/{self.name}" 

622 

@property
def is_detached(self) -> bool:
    """Whether this path is detached.

    A path is detached when it has no parent reference, when the referenced
    parent no longer exists, or when that parent is itself detached.
    """
    parent_ref = self._parent_dir
    parent = parent_ref() if parent_ref is not None else None
    return True if parent is None else parent.is_detached

632 

633 # The __getitem__ behaves like __getitem__ from Dict but __iter__ would ideally work like a Sequence. 

634 # However, that does not feel compatible, so lets force people to use .children instead for the Sequence 

635 # behavior to avoid surprises for now. 

636 # (Maybe it is a non-issue, but it is easier to add the API later than to remove it once we have committed 

637 # to using it) 

638 __iter__ = None 

639 

def __getitem__(self, key) -> "FSPath":
    """Return the direct child identified by ``key``.

    :param key: A child basename, or an ``FSPath`` whose ``name`` is used as the basename.
    :raises KeyError: If this path has no children or no child with that basename.
    """
    children = self._children
    if children is None:
        raise KeyError(
            f"{key} (note: {self._orphan_safe_path()!r} has no children)"
        )
    basename = key.name if isinstance(key, FSPath) else key
    return children[basename]

648 

def __delitem__(self, key) -> None:
    """Remove the entry ``key`` from the child mapping.

    :raises DebputyFSIsROError: If the path is read-only (raised by ``_rw_check``).
    :raises KeyError: If there are no children or ``key`` is not among them.
    """
    self._rw_check()
    child_map = self._children
    if child_map is not None:
        del child_map[key]
        return
    raise KeyError(key)

655 

def get(self, key: str) -> "Optional[FSPath]":
    """Return the direct child named ``key``, or ``None`` when there is no such child."""
    with suppress(KeyError):
        return self[key]
    return None

661 

def __contains__(self, item: object) -> bool:
    """Test whether ``item`` is a direct child of this path.

    A ``VirtualPath`` is contained when its ``parent_dir`` is this very
    object; a string is contained when a child with that basename exists.
    Anything else is never contained.
    """
    if isinstance(item, VirtualPath):
        return item.parent_dir is self
    return isinstance(item, str) and self.get(item) is not None

669 

670 def _add_child(self, child: "FSPath") -> None: 

671 self._rw_check() 

672 if not self.is_dir: 672 ↛ 673line 672 didn't jump to line 673 because the condition on line 672 was never true

673 raise TypeError(f"{self._orphan_safe_path()!r} is not a directory") 

674 if self._children is None: 

675 self._children = {} 

676 

677 conflict_child = self.get(child.name) 

678 if conflict_child is not None: 678 ↛ 679line 678 didn't jump to line 679 because the condition on line 678 was never true

679 conflict_child.unlink(recursive=True) 

680 self._children[child.name] = child 

681 

@property
def tar_path(self) -> str:
    """The path with a trailing ``/`` appended when this path is a directory."""
    base_path = self.path
    return base_path + "/" if self.is_dir else base_path

688 

@property
def path(self) -> str:
    """The parent directory path joined with this path's name.

    The result is cached and reused for as long as the parent directory path
    is unchanged.

    :raises ReferenceError: If no parent directory path is available.
    """
    current_parent_path = self.parent_dir_path
    cached_parent_path = self._parent_path_cache
    if cached_parent_path is not None and cached_parent_path == current_parent_path:
        return assume_not_none(self._path_cache)
    if current_parent_path is None:
        raise ReferenceError(
            f"The path {self.name} is detached! {self.__class__.__name__}"
        )
    # Remember which parent path the cached value was computed for.
    self._parent_path_cache = current_parent_path
    full_path = os.path.join(current_parent_path, self.name)
    self._path_cache = full_path
    return full_path

705 

@property
def parent_dir(self) -> Optional["FSPath"]:
    """The directory this path is attached to.

    :raises ReferenceError: If there is no parent reference or the referenced
      parent no longer exists.
    """
    parent_ref = self._parent_dir
    parent = None if parent_ref is None else parent_ref()
    if parent is None:
        raise ReferenceError(
            f"The path {self.name} is detached! {self.__class__.__name__}"
        )
    return parent

@parent_dir.setter
def parent_dir(self, new_parent: Optional["FSPath"]) -> None:
    """Move this path to ``new_parent``, or detach it when ``new_parent`` is ``None``.

    The path is removed from its current parent's children (if attached) and
    registered with the new parent.  On detach, the old parent's path is kept
    as the last known parent path provided the old parent is still attached.

    :raises DebputyFSIsROError: If this path or the new parent is read-only
      (raised by ``_rw_check``).
    :raises ValueError: If ``new_parent`` is not a directory.
    """
    self._rw_check()
    if new_parent is not None:
        if not new_parent.is_dir:
            raise ValueError(
                f"The parent {new_parent._orphan_safe_path()} must be a directory"
            )
        new_parent._rw_check()

    self._last_known_parent_path = None
    previous_parent = None
    if not self.is_detached:
        previous_parent = self.parent_dir
        siblings = assume_not_none(previous_parent)._children
        del assume_not_none(siblings)[self.name]

    if new_parent is None:
        if previous_parent is not None and not previous_parent.is_detached:
            self._last_known_parent_path = previous_parent.path
        self._parent_dir = None
    else:
        # Only a weak reference to the parent is stored.
        self._parent_dir = ref(new_parent)
        new_parent._add_child(self)
    self._parent_path_cache = None

739 

@property
def parent_dir_path(self) -> Optional[str]:
    """The path of the parent directory.

    For a detached path this is the last known parent path, which may be ``None``.
    """
    if not self.is_detached:
        return assume_not_none(self.parent_dir).path
    return self._last_known_parent_path

745 

def chown(
    self,
    owner: Optional[StaticFileSystemOwner],
    group: Optional[StaticFileSystemGroup],
) -> None:
    """Change the owner/group of this path

    :param owner: The desired owner definition for this path. If None, then no change of owner is performed.
    :param group: The desired group definition for this path. If None, then no change of group is performed.
    :raises DebputyFSIsROError: If the path is read-only (raised by ``_rw_check``).
    """
    self._rw_check()

    # Apply the two changes independently; a `None` leaves that attribute untouched.
    for attribute, new_entity in (("_owner", owner), ("_group", group)):
        if new_entity is None:
            continue
        setattr(self, attribute, new_entity.ownership_definition)

762 

def stat(self) -> os.stat_result:
    """Return the stat result for this path, computing and caching it on first use."""
    if self._stat_cache is None:
        self._stat_cache = self._uncached_stat()
    return self._stat_cache

769 

def _uncached_stat(self) -> os.stat_result:
    """Stat the backing file system path without following symlinks or consulting the cache."""
    backing_path = self.fs_path
    return os.lstat(backing_path)

772 

@property
def mode(self) -> int:
    """The permission bits of this path, taken from ``stat()`` on first use if unset."""
    if self._mode is None:
        self._mode = stat.S_IMODE(self.stat().st_mode)
    return self._mode

@mode.setter
def mode(self, new_mode: int) -> None:
    """Set the permission bits.

    The new mode must include ``0o700`` for directories and ``0o400`` for
    other paths.

    :raises DebputyFSIsROError: If the path is read-only (raised by ``_rw_check``).
    :raises ValueError: If the required bits are not all set in ``new_mode``.
    """
    self._rw_check()
    required_bits = 0o700 if self.is_dir else 0o400
    if (new_mode & required_bits) == required_bits:
        self._mode = new_mode
        return
    omode = oct(new_mode)[2:]
    omin = oct(required_bits)[2:]
    raise ValueError(
        f'Attempt to set mode of path "{self._orphan_safe_path()}" to {omode} rejected;'
        f" Minimum requirements are {omin} (read-bit and, for dirs, exec bit for user)."
        " There are no paths that do not need these requirements met and they can cause"
        " problems during build or on the final system."
    )

795 

def _ensure_min_mode(self) -> None:
    """Make sure the user permission bits meet the minimum for this path.

    The minimum is ``0o700`` for directories and ``0o600`` for other paths.
    When the path has a backing file system path whose virtual mode lacks any
    of those bits, the backing path is chmod'ed to add them.  The virtual mode
    then has the bits added as well.
    """
    min_bit = 0o700 if self.is_dir else 0o600
    # Compare against `min_bit` rather than a fixed 0o600: otherwise a directory
    # that only lacks the exec bit would never have its backing path fixed.
    if self.has_fs_path and (self.mode & min_bit) != min_bit:
        try:
            fs_path = self.fs_path
        except TestPathWithNonExistentFSPathError:
            # No backing path to chmod for this path; only the virtual mode is updated.
            pass
        else:
            st = os.stat(fs_path)
            new_fs_mode = stat.S_IMODE(st.st_mode) | min_bit
            _debug_log(
                f"Applying chmod {oct(min_bit)[2:]} {fs_path} ({self.path}) to avoid problems down the line"
            )
            os.chmod(fs_path, new_fs_mode)
    self.mode |= min_bit

811 

@property
def mtime(self) -> float:
    """The modification time of this path, taken from ``stat()`` on first use if unset."""
    if self._mtime is None:
        self._mtime = self.stat().st_mtime
    return self._mtime

@mtime.setter
def mtime(self, new_mtime: float) -> None:
    """Set the modification time.

    :raises DebputyFSIsROError: If the path is read-only (raised by ``_rw_check``).
    """
    self._rw_check()
    self._mtime = new_mtime

824 

@property
def tar_owner_info(self) -> Tuple[str, int, str, int]:
    """Return ``(owner name, owner id, group name, group id)`` for this path."""
    owner_def, group_def = self._owner, self._group
    owner_fields = (owner_def.entity_name, owner_def.entity_id)
    group_fields = (group_def.entity_name, group_def.entity_id)
    return owner_fields + group_fields

835 

@property
def _can_replace_inline(self) -> bool:
    """Whether the content can be replaced inline; always ``False`` here."""
    replaceable = False
    return replaceable

839 

@contextlib.contextmanager
def add_file(
    self,
    name: str,
    *,
    unlink_if_exists: bool = True,
    use_fs_path_mode: bool = False,
    mode: int = 0o0644,
    mtime: Optional[float] = None,
    # Special-case parameters that are not exposed in the API
    fs_basename_matters: bool = False,
    subdir_key: Optional[str] = None,
) -> Iterator["FSPath"]:
    """Create a new, empty file in this directory and yield it for the caller to populate.

    A backing file is created in the generated content directory and the new
    child is yielded.  After the ``with`` body completes, the backing file is
    verified to still be a plain file and the child's mode is finalised.

    :param name: Basename of the new file (no ``/``, not ``.`` or ``..``).
    :param unlink_if_exists: When true, an existing child called ``name`` is
      unlinked first; when false, an existing child is an error.
    :param use_fs_path_mode: When true, ``mode`` is applied to the backing file
      with chmod after the body; when false, ``mode`` is set on the new child.
    :param mode: The mode to apply (see ``use_fs_path_mode``).
    :param mtime: The mtime passed on to the new child.
    :param fs_basename_matters: When true, the backing file is created with
      exactly ``name`` as its basename; requires ``subdir_key``.
    :param subdir_key: Passed to ``generated_content_dir`` to select the directory.
    :return: A context manager yielding the new child path.
    :raises ValueError: If ``name`` is invalid, the child already exists and
      ``unlink_if_exists`` is false, or ``fs_basename_matters`` is set without
      a ``subdir_key``.
    :raises TypeError: If this path is not a directory, or the backing file is
      no longer a plain file after the body.
    """
    if "/" in name or name in {".", ".."}:
        raise ValueError(f'Invalid file name: "{name}"')
    if not self.is_dir:
        raise TypeError(
            f"Cannot create {self._orphan_safe_path()}/{name}:"
            f" {self._orphan_safe_path()} is not a directory"
        )
    self._rw_check()
    existing = self.get(name)
    if existing is not None:
        if not unlink_if_exists:
            # The message names the actual parameter (it used to say `exist_ok`).
            raise ValueError(
                f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
                f" and unlink_if_exists was False"
            )
        existing.unlink(recursive=False)

    if fs_basename_matters and subdir_key is None:
        raise ValueError(
            "When fs_basename_matters is True, a subdir_key must be provided"
        )

    directory = generated_content_dir(subdir_key=subdir_key)

    if fs_basename_matters:
        fs_path = os.path.join(directory, name)
        # Exclusive create: ensures the fs_path exists and did not exist before.
        with open(fs_path, "xb"):
            pass
    else:
        # delete=False keeps the file after the handle is closed on leaving the block.
        with tempfile.NamedTemporaryFile(
            dir=directory, suffix=f"__{name}", delete=False
        ) as fd:
            fs_path = fd.name

    child = FSBackedFilePath(
        name,
        self,
        fs_path,
        replaceable_inline=True,
        mtime=mtime,
    )
    yield child

    if use_fs_path_mode:
        # Ensure the caller can see the current mode
        os.chmod(fs_path, mode)
    _check_fs_path_is_file(fs_path, unlink_on_error=child)
    child._reset_caches()
    if not use_fs_path_mode:
        child.mode = mode

912 

def insert_file_from_fs_path(
    self,
    name: str,
    fs_path: str,
    *,
    exist_ok: bool = True,
    use_fs_path_mode: bool = False,
    mode: int = 0o0644,
    require_copy_on_write: bool = True,
    follow_symlinks: bool = True,
    reference_path: Optional[VirtualPath] = None,
) -> "FSPath":
    """Create a file entry called `name` backed by an existing file system path

    :param name: Basename of the new entry. It must not contain "/" and must not
      be "." or "..".
    :param fs_path: The file system path providing the content.
    :param exist_ok: When False, a ValueError is raised if this directory already
      contains an entry called `name`.
    :param use_fs_path_mode: When True, no explicit mode is passed on to the new
      entry (`initial_mode=None`); otherwise `mode` is passed.
    :param mode: The mode to pass on when `use_fs_path_mode` is False.
    :param require_copy_on_write: When True, the new entry is created with
      `replaceable_inline=False`.
    :param follow_symlinks: When True, `fs_path` is resolved via
      `os.path.realpath(..., strict=True)` first. Cannot be combined with
      `reference_path`.
    :param reference_path: Optional reference path handed to the new entry. When
      provided, the file system path is not inspected (no lstat) by this method.
    :return: The new `FSBackedFilePath`, constructed with this path as its parent.
    :raises ValueError: On an invalid name, a conflicting existing entry, an invalid
      parameter combination or when the (resolved) path is not a regular file.
    :raises TypeError: If this path is not a directory.
    """
    if name in {".", ".."} or "/" in name:
        raise ValueError(f'Invalid file name: "{name}"')
    if not self.is_dir:
        raise TypeError(
            f"Cannot create {self._orphan_safe_path()}/{name}:"
            f" {self._orphan_safe_path()} is not a directory"
        )
    self._rw_check()
    if name in self and not exist_ok:
        raise ValueError(
            f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
            f" and exist_ok was False"
        )

    if follow_symlinks:
        if reference_path is not None:
            raise ValueError("The reference_path cannot be used with follow_symlinks")
        resolved_fs_path = os.path.realpath(fs_path, strict=True)
    else:
        resolved_fs_path = fs_path

    # None means "no explicit mode" for the new entry.
    effective_mode: Optional[int] = None if use_fs_path_mode else mode

    stat_result = None
    if reference_path is None:
        # Without a reference path, verify the source ourselves and hand the
        # stat result on so it can be reused as a cache.
        stat_result = os.lstat(resolved_fs_path)
        file_type_bits = stat_result.st_mode
        if stat.S_ISDIR(file_type_bits):
            raise ValueError(
                f'The provided path "{fs_path}" is a directory. However, this'
                " method does not support directories"
            )
        if not stat.S_ISREG(file_type_bits):
            if follow_symlinks:
                raise ValueError(
                    f"The resolved fs_path ({resolved_fs_path}) was not a file."
                )
            raise ValueError(f"The provided fs_path ({fs_path}) was not a file.")

    return FSBackedFilePath(
        name,
        self,
        resolved_fs_path,
        initial_mode=effective_mode,
        stat_cache=stat_result,
        replaceable_inline=not require_copy_on_write,
        reference_path=reference_path,
    )

974 

def add_symlink(
    self,
    link_name: str,
    link_target: str,
    *,
    reference_path: Optional[VirtualPath] = None,
) -> "FSPath":
    """Create a symlink called `link_name` in this directory

    An existing entry with the same name is unlinked (non-recursively) first.

    :param link_name: Basename of the symlink. It must not contain "/" and must
      not be "." or "..".
    :param link_target: The target the symlink points to.
    :param reference_path: Optional reference path handed to the new symlink.
    :return: The new `SymlinkVirtualPath`, constructed with this path as its parent.
    :raises ValueError: If `link_name` is not a valid basename.
    :raises TypeError: If this path is not a directory.
    """
    if "/" in link_name or link_name in {".", ".."}:
        raise ValueError(
            f'Invalid file name: "{link_name}" (it must be a valid basename)'
        )
    if not self.is_dir:
        raise TypeError(
            f"Cannot create {self._orphan_safe_path()}/{link_name}:"
            f" {self._orphan_safe_path()} is not a directory"
        )
    self._rw_check()

    existing = self.get(link_name)
    # Explicit None test: whether the name is taken must not depend on the
    # truthiness of the path object.
    if existing is not None:
        # Emulate ln -sf, which attempts a non-recursive unlink first.
        existing.unlink(recursive=False)

    return SymlinkVirtualPath(
        link_name,
        self,
        link_target,
        reference_path=reference_path,
    )

1004 

def mkdir(
    self,
    name: str,
    *,
    reference_path: Optional[VirtualPath] = None,
) -> "FSPath":
    """Create a directory called `name` in this directory

    :param name: Basename of the new directory. It must not contain "/" and must
      not be "." or "..".
    :param reference_path: Optional reference path handed to the new directory.
      When provided, it must be a directory.
    :return: The new `VirtualDirectoryFSPath`, constructed with this path as its
      parent.
    :raises ValueError: If `name` is not a valid basename, if `reference_path` is
      not a directory or if an entry called `name` already exists.
    :raises TypeError: If this path is not a directory.
    """
    if "/" in name or name in {".", ".."}:
        raise ValueError(
            f'Invalid file name: "{name}" (it must be a valid basename)'
        )
    if not self.is_dir:
        raise TypeError(
            f"Cannot create {self._orphan_safe_path()}/{name}:"
            f" {self._orphan_safe_path()} is not a directory"
        )
    if reference_path is not None and not reference_path.is_dir:
        # Reading fs_path on a reference without file system backing would raise
        # and mask this error, so fall back to the reference's own path.
        if reference_path.has_fs_path:
            described_path = reference_path.fs_path
        else:
            described_path = reference_path.path
        raise ValueError(
            f'The provided fs_path "{described_path}" exist but it is not a directory!'
        )
    self._rw_check()

    existing = self.get(name)
    # Explicit None test: whether the name is taken must not depend on the
    # truthiness of the path object.
    if existing is not None:
        raise ValueError(f"Path {existing.path} already exist")
    return VirtualDirectoryFSPath(name, self, reference_path=reference_path)

1030 

def mkdirs(self, path: str) -> "FSPath":
    """Delegate to the parent class' `mkdirs` and narrow the result type to `FSPath`"""
    created = super().mkdirs(path)
    return cast("FSPath", created)

1033 

@property
def is_read_write(self) -> bool:
    """Whether the file system entry may be mutated

    A detached path is always read-write. An attached path answers with
    whatever its parent directory answers.

    :return: Whether file system mutations are permitted.
    """
    return (
        True if self.is_detached else assume_not_none(self.parent_dir).is_read_write
    )

1043 

def unlink(self, *, recursive: bool = False) -> None:
    """Detach this file or directory from the file system

    After the call, the path has no parent directory (causing "is_detached" to
    return True). Calling this on an already detached path is a no-op.

    Note that the root directory cannot be deleted.

    :param recursive: When True, a non-empty directory is unlinked together with
      everything inside it. When False, a non-empty directory triggers an error.
    :raises ValueError: If the path has children and `recursive` is False.
    """
    if self.is_detached:
        return
    if not recursive:
        has_children = any(self.iterdir)
        if has_children:
            raise ValueError(
                f'Refusing to unlink "{self.path}": The directory was not empty and recursive was False'
            )
    # The .parent_dir setter does a _rw_check() for us.
    self.parent_dir = None

1062 

def _reset_caches(self) -> None:
    """Forget the cached mtime and the cached stat result"""
    self._mtime = self._stat_cache = None

1066 

def metadata(
    self,
    metadata_type: Type[PMT],
    *,
    owning_plugin: Optional[str] = None,
) -> PathMetadataReference[PMT]:
    """Obtain a reference to the metadata of type `metadata_type` for this path

    :param metadata_type: The type of the metadata being requested.
    :param owning_plugin: The plugin owning the metadata. Defaults to the current
      plugin.
    :return: A reference to the metadata value. When no value exists yet and the
      path is read-only, an always-empty read-only reference is returned and
      nothing is stored.
    :raises TypeError: If no value exists yet and the path is detached.
    """
    current_plugin = self._current_plugin()
    owner = current_plugin if owning_plugin is None else owning_plugin
    metadata_key = (owner, metadata_type)
    stored_value = self._metadata.get(metadata_key)
    if stored_value is not None:
        return PathMetadataReferenceImplementation(
            self,
            current_plugin,
            stored_value,
        )

    if self.is_detached:
        raise TypeError(
            f"Cannot access the metadata {metadata_type.__name__}: The path is detached."
        )
    if not self.is_read_write:
        return AlwaysEmptyReadOnlyMetadataReference(
            owner,
            current_plugin,
            metadata_type,
        )
    # First access on a writable path: create the value and remember it.
    stored_value = PathMetadataValue(owner, metadata_type)
    self._metadata[metadata_key] = stored_value
    return PathMetadataReferenceImplementation(
        self,
        current_plugin,
        stored_value,
    )

1096 

@contextlib.contextmanager
def replace_fs_path_content(
    self,
    *,
    use_fs_path_mode: bool = False,
) -> Iterator[str]:
    """Context manager yielding a file system path whose content may be replaced

    When the path cannot be replaced inline, its content is first copied into a
    new file in the generated content directory and the path is re-pointed at
    that copy. After the managed block finishes, the path is verified to still
    be a file and the caches are reset.

    :param use_fs_path_mode: When False, the mode the path had before the block
      ran is re-applied to the file afterwards.
    :raises TypeError: If this path is not a file.
    """
    if not self.is_file:
        raise TypeError(
            f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file'
        )
    self._rw_check()
    fs_path = self.fs_path
    if not self._can_replace_inline:
        scratch_dir = generated_content_dir()
        # Only the name is needed here; the content is filled in by _cp_a.
        with tempfile.NamedTemporaryFile(
            dir=scratch_dir, suffix=f"__{self.name}", delete=False
        ) as scratch_fd:
            scratch_fd.close()
        scratch_path = scratch_fd.name
        _cp_a(fs_path, scratch_path)
        fs_path = scratch_path
        self._replaced_path(fs_path)
        assert self.fs_path == fs_path

    known_mtime = self._mtime
    if known_mtime is not None:
        os.utime(fs_path, (known_mtime, known_mtime))

    mode_before = self.mode
    yield fs_path
    _check_fs_path_is_file(fs_path, unlink_on_error=self)
    if not use_fs_path_mode:
        os.chmod(fs_path, mode_before)
    self._reset_caches()

1131 

def _replaced_path(self, new_fs_path: str) -> None:
    """Hook invoked by `replace_fs_path_content` with the path's new fs_path

    This base version has no implementation and always raises.
    """
    raise NotImplementedError

1134 

1135 

class VirtualFSPathBase(FSPath, ABC):
    """Base class for paths that, unless a subclass says otherwise, have no fs_path"""

    __slots__ = ()

    def __init__(
        self,
        basename: str,
        parent: Optional["FSPath"],
        children: Optional[Dict[str, "FSPath"]] = None,
        initial_mode: Optional[int] = None,
        mtime: Optional[float] = None,
        stat_cache: Optional[os.stat_result] = None,
    ) -> None:
        """Forward all arguments unchanged to the parent class initializer"""
        super().__init__(
            basename,
            parent,
            children,
            initial_mode=initial_mode,
            mtime=mtime,
            stat_cache=stat_cache,
        )

    @property
    def mtime(self) -> float:
        """The mtime of the path; defaults to (and then remembers) the current time"""
        mtime = self._mtime
        if mtime is None:
            mtime = time.time()
            self._mtime = mtime
        return mtime

    @property
    def has_fs_path(self) -> bool:
        """Always False here; subclasses with file system backing override this"""
        return False

    def stat(self) -> os.stat_result:
        """Return the parent class' stat result

        :raises PureVirtualPathError: If the path has no fs_path.
        """
        if not self.has_fs_path:
            raise PureVirtualPathError(
                "stat() is only applicable to paths backed by the file system. The path"
                f" {self._orphan_safe_path()!r} is purely virtual"
            )
        return super().stat()

    @property
    def fs_path(self) -> str:
        """Return the parent class' fs_path

        :raises PureVirtualPathError: If the path has no fs_path.
        """
        if not self.has_fs_path:
            raise PureVirtualPathError(
                "fs_path is only applicable to paths backed by the file system. The path"
                f" {self._orphan_safe_path()!r} is purely virtual"
            )
        # Delegate to the parent class (mirroring stat()); reading self.fs_path
        # here would re-enter this property and recurse without bound.
        return super().fs_path

1185 

1186 

class FSRootDir(FSPath):
    """The root directory: named ".", without a parent and never detached"""

    __slots__ = ("_fs_path", "_fs_read_write", "_plugin_context")

    def __init__(self, fs_path: Optional[str] = None) -> None:
        """Create a read-write root directory, optionally backed by `fs_path`"""
        self._fs_path = fs_path
        self._fs_read_write = True
        super().__init__(".", None, children={}, initial_mode=0o755)
        self._plugin_context = CurrentPluginContextManager("debputy")

    @property
    def is_detached(self) -> bool:
        """The root directory is never detached"""
        return False

    def _orphan_safe_path(self) -> str:
        return self.name

    @property
    def path(self) -> str:
        """The path of the root directory is its name"""
        return self.name

    @property
    def parent_dir(self) -> Optional["FSPath"]:
        """The root directory has no parent"""
        return None

    @parent_dir.setter
    def parent_dir(self, new_parent: Optional[FSPath]) -> None:
        # Assigning None is accepted (and changes nothing); anything else would
        # turn the root into a child.
        if new_parent is not None:
            raise ValueError("The root directory cannot become a non-root directory")

    @property
    def parent_dir_path(self) -> Optional[str]:
        return None

    @property
    def is_dir(self) -> bool:
        return True

    @property
    def is_file(self) -> bool:
        return False

    @property
    def is_symlink(self) -> bool:
        return False

    def readlink(self) -> str:
        """Always raises TypeError, as the root is a directory"""
        raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink')

    @property
    def has_fs_path(self) -> bool:
        """True when the root was created with an fs_path"""
        return self._fs_path is not None

    def stat(self) -> os.stat_result:
        """Return `os.stat` of the fs_path

        :raises PureVirtualPathError: If the root has no fs_path.
        """
        if self.has_fs_path:
            return os.stat(self.fs_path)
        raise PureVirtualPathError(
            "stat() is only applicable to paths backed by the file system. The path"
            f" {self._orphan_safe_path()!r} is purely virtual"
        )

    @property
    def fs_path(self) -> str:
        """The fs_path the root was created with

        :raises PureVirtualPathError: If the root has no fs_path.
        """
        if self.has_fs_path:
            return assume_not_none(self._fs_path)
        raise PureVirtualPathError(
            "fs_path is only applicable to paths backed by the file system. The path"
            f" {self._orphan_safe_path()!r} is purely virtual"
        )

    @property
    def is_read_write(self) -> bool:
        """Whether mutations are permitted; stored directly on the root"""
        return self._fs_read_write

    @is_read_write.setter
    def is_read_write(self, new_value: bool) -> None:
        self._fs_read_write = new_value

    def prune_if_empty_dir(self) -> None:
        # No-op for the root directory. There is never a case where you want to delete this directory
        # (and even if you could, debputy will need it for technical reasons, so the root dir stays)
        return

    def unlink(self, *, recursive: bool = False) -> None:
        """Always raises TypeError; the root directory cannot be deleted"""
        # There is never a case where you want to delete this directory (and even if you could,
        # debputy will need it for technical reasons, so the root dir stays)
        raise TypeError("Cannot delete the root directory")

    def _current_plugin(self) -> str:
        return self._plugin_context.current_plugin_name

    @contextlib.contextmanager
    def change_plugin_context(self, new_plugin: str) -> Iterator[str]:
        """Context manager delegating to the root's plugin context manager"""
        with self._plugin_context.change_plugin_context(new_plugin) as active:
            yield active

1286 

1287 

class VirtualPathWithReference(VirtualFSPathBase, ABC):
    """Path that can defer mtime, fs_path, stat and open to an optional reference path"""

    __slots__ = ("_reference_path",)

    def __init__(
        self,
        basename: str,
        parent: FSPath,
        *,
        default_mode: int,
        reference_path: Optional[VirtualPath] = None,
    ) -> None:
        """Create the path, taking the initial mode from the reference path if given

        :param default_mode: The initial mode used when there is no reference path.
        """
        initial_mode = reference_path.mode if reference_path else default_mode
        super().__init__(
            basename,
            parent=parent,
            initial_mode=initial_mode,
        )
        self._reference_path = reference_path

    @property
    def has_fs_path(self) -> bool:
        """True when there is a reference path and that path has an fs_path"""
        reference = self._reference_path
        if reference is None:
            return False
        return reference.has_fs_path

    @property
    def mtime(self) -> float:
        """The mtime; when unset it is taken from the reference path (if any) and cached"""
        cached = self._mtime
        if cached is not None:
            return cached
        reference = self._reference_path
        resolved = reference.mtime if reference else super().mtime
        self._mtime = resolved
        return resolved

    @mtime.setter
    def mtime(self, new_mtime: float) -> None:
        self._rw_check()
        self._mtime = new_mtime

    @property
    def fs_path(self) -> str:
        """The fs_path, taken from the reference path when that applies"""
        reference = self._reference_path
        if reference is None:
            return super().fs_path
        if not super().has_fs_path or super().fs_path == reference.fs_path:
            return reference.fs_path
        return super().fs_path

    def stat(self) -> os.stat_result:
        """The stat result, taken from the reference path when that applies"""
        reference = self._reference_path
        if reference is None:
            return super().stat()
        if not super().has_fs_path or super().fs_path == reference.fs_path:
            return reference.stat()
        return super().stat()

    def open(
        self,
        *,
        byte_io: bool = False,
        buffering: int = -1,
    ) -> Union[TextIO, BinaryIO]:
        """Open the path, via the reference path when both share the same fs_path"""
        reference = self._reference_path
        if reference is None or reference.fs_path != self.fs_path:
            return super().open(byte_io=byte_io, buffering=buffering)
        return reference.open(byte_io=byte_io, buffering=buffering)

1355 

1356 

class VirtualDirectoryFSPath(VirtualPathWithReference):
    """A directory entry, optionally mirroring a reference directory"""

    # The parent class already declares the "_reference_path" slot; declaring it
    # again would only add a second, shadowing slot to every instance.
    __slots__ = ()

    def __init__(
        self,
        basename: str,
        parent: FSPath,
        *,
        reference_path: Optional[VirtualPath] = None,
    ) -> None:
        """Create the directory with a default mode of 0o755

        :param reference_path: Optional reference path; when provided it must be
          a directory.
        """
        # The parent initializer stores reference_path for us.
        super().__init__(
            basename,
            parent,
            reference_path=reference_path,
            default_mode=0o755,
        )
        assert reference_path is None or reference_path.is_dir
        self._ensure_min_mode()

    @property
    def is_dir(self) -> bool:
        return True

    @property
    def is_file(self) -> bool:
        return False

    @property
    def is_symlink(self) -> bool:
        return False

    def readlink(self) -> str:
        """Always raises TypeError, as this is a directory"""
        raise TypeError(f'"{self._orphan_safe_path()!r}" is a directory; not a symlink')

1391 

1392 

class SymlinkVirtualPath(VirtualPathWithReference):
    """A symlink entry that stores its link target as a string"""

    __slots__ = ("_link_target",)

    def __init__(
        self,
        basename: str,
        parent_dir: FSPath,
        link_target: str,
        *,
        reference_path: Optional[VirtualPath] = None,
    ) -> None:
        """Create the symlink with `_SYMLINK_MODE` as default mode"""
        super().__init__(
            basename,
            parent=parent_dir,
            default_mode=_SYMLINK_MODE,
            reference_path=reference_path,
        )
        self._link_target = link_target

    @property
    def is_dir(self) -> bool:
        return False

    @property
    def is_file(self) -> bool:
        return False

    @property
    def is_symlink(self) -> bool:
        return True

    def readlink(self) -> str:
        """Return the link target the symlink was created with"""
        return self._link_target

    @property
    def size(self) -> int:
        """The length of the link target string"""
        target = self.readlink()
        return len(target)

1430 

1431 

class FSBackedFilePath(VirtualPathWithReference):
    """A file entry whose content lives at a path on the file system"""

    __slots__ = ("_fs_path", "_replaceable_inline")

    def __init__(
        self,
        basename: str,
        parent_dir: FSPath,
        fs_path: str,
        *,
        replaceable_inline: bool = False,
        initial_mode: Optional[int] = None,
        mtime: Optional[float] = None,
        stat_cache: Optional[os.stat_result] = None,
        reference_path: Optional[VirtualPath] = None,
    ) -> None:
        """Create the file entry

        :param fs_path: The file system path holding the content.
        :param replaceable_inline: Whether the content at `fs_path` may be replaced
          in place. Only permitted for paths containing "debputy/scratch-dir/".
        :param initial_mode: When not None, assigned as the mode after the parent
          initializer has run (which uses the reference path's mode or 0o644).
        :param mtime: When not None, stored as the mtime.
        :param stat_cache: Stored as the cached stat result.
        """
        super().__init__(
            basename,
            parent_dir,
            default_mode=0o644,
            reference_path=reference_path,
        )
        self._fs_path = fs_path
        self._replaceable_inline = replaceable_inline
        if initial_mode is not None:
            self.mode = initial_mode
        if mtime is not None:
            self._mtime = mtime
        self._stat_cache = stat_cache
        in_scratch_dir = "debputy/scratch-dir/" in fs_path
        assert (
            not replaceable_inline or in_scratch_dir
        ), f"{fs_path} should not be inline-replaceable -- {self.path}"
        self._ensure_min_mode()

    @property
    def is_dir(self) -> bool:
        return False

    @property
    def is_file(self) -> bool:
        return True

    @property
    def is_symlink(self) -> bool:
        return False

    def readlink(self) -> str:
        """Always raises TypeError, as this is a file"""
        raise TypeError(f'"{self._orphan_safe_path()!r}" is a file; not a symlink')

    @property
    def has_fs_path(self) -> bool:
        return True

    @property
    def fs_path(self) -> str:
        """The file system path currently holding the content"""
        return self._fs_path

    @property
    def _can_replace_inline(self) -> bool:
        return self._replaceable_inline

    def _replaced_path(self, new_fs_path: str) -> None:
        # The new path becomes the content; the reference path is dropped and the
        # file is marked as replaceable in place.
        self._fs_path = new_fs_path
        self._reference_path = None
        self._replaceable_inline = True

1496 

1497 

# Mode used as the default for SymlinkVirtualPath; it is also the only explicit
# mode that VirtualTestPath accepts for a symlink.
_SYMLINK_MODE = 0o777

1499 

1500 

class VirtualTestPath(FSPath):
    """Path implementation for tests; content may be inline, materialized or on disk"""

    __slots__ = (
        "_path_type",
        "_has_fs_path",
        "_fs_path",
        "_link_target",
        "_content",
        "_materialized_content",
    )

    def __init__(
        self,
        basename: str,
        parent_dir: Optional[FSPath],
        mode: Optional[int] = None,
        mtime: Optional[float] = None,
        is_dir: bool = False,
        has_fs_path: Optional[bool] = False,
        fs_path: Optional[str] = None,
        link_target: Optional[str] = None,
        content: Optional[str] = None,
        materialized_content: Optional[str] = None,
    ) -> None:
        """Create a test path

        :param mode: The mode; defaults to 0o755 for directories and 0o644
          otherwise. For symlinks only `_SYMLINK_MODE` (or None) is accepted.
        :param is_dir: When True, the path is a directory. Otherwise it is a
          symlink if `link_target` is not None and a file if it is None.
        :param has_fs_path: Whether the path claims to have an fs_path. When None,
          it is derived from whether `fs_path` is non-empty.
        :param fs_path: The file system path backing the path (if any).
        :param content: Inline content served by `open()` and measured by `size`.
        :param materialized_content: Content written to a temporary file the first
          time `fs_path` is read while no fs_path has been set.
        :raises ValueError: If a symlink is given a mode other than `_SYMLINK_MODE`.
        """
        if is_dir:
            self._path_type = PathType.DIRECTORY
        elif link_target is not None:
            self._path_type = PathType.SYMLINK
            if mode is not None and mode != _SYMLINK_MODE:
                raise ValueError(
                    f'Please do not assign a mode to symlinks. Triggered for "{basename}".'
                )
        else:
            self._path_type = PathType.FILE

        if mode is not None:
            initial_mode = mode
        else:
            initial_mode = 0o755 if is_dir else 0o644

        self._link_target = link_target
        if has_fs_path is None:
            has_fs_path = bool(fs_path)
        self._has_fs_path = has_fs_path
        self._fs_path = fs_path
        self._materialized_content = materialized_content
        super().__init__(
            basename,
            parent=parent_dir,
            initial_mode=initial_mode,
            mtime=mtime,
        )
        self._content = content

    @property
    def is_dir(self) -> bool:
        return self._path_type == PathType.DIRECTORY

    @property
    def is_file(self) -> bool:
        return self._path_type == PathType.FILE

    @property
    def is_symlink(self) -> bool:
        return self._path_type == PathType.SYMLINK

    def readlink(self) -> str:
        """Return the link target

        :raises TypeError: If the path is not a symlink.
        """
        if not self.is_symlink:
            raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
        link_target = self._link_target
        assert link_target is not None
        return link_target

    @property
    def mtime(self) -> float:
        """The mtime; defaults to (and then remembers) the current time"""
        if self._mtime is None:
            self._mtime = time.time()
        return self._mtime

    @mtime.setter
    def mtime(self, new_mtime: float) -> None:
        self._rw_check()
        self._mtime = new_mtime

    @property
    def has_fs_path(self) -> bool:
        return self._has_fs_path

    def stat(self) -> os.stat_result:
        """Return `os.stat` of the fs_path

        :raises PureVirtualPathError: If the path has no usable fs_path or the
          fs_path does not exist.
        """
        if not self.has_fs_path:
            raise PureVirtualPathError(
                "stat() is only applicable to paths backed by the file system. The path"
                f" {self._orphan_safe_path()!r} is purely virtual"
            )
        # The fs_path property raises PureVirtualPathError itself when there is
        # nothing to return; it never returns None.
        path = self.fs_path
        try:
            return os.stat(path)
        except FileNotFoundError as e:
            raise PureVirtualPathError(
                f"The test wants a real stat of {self._orphan_safe_path()!r}, which this mock path"
                " cannot provide! (An fs_path was provided, but it did not exist)"
            ) from e

    @property
    def size(self) -> int:
        """The size of the path

        Inline content is measured as UTF-8 bytes, symlinks by the length of their
        link target and everything else via `stat()`. A path with nothing to stat
        has size 0.
        """
        if self._content is not None:
            return len(self._content.encode("utf-8"))
        if self.is_symlink:
            return len(self.readlink())
        if not self.has_fs_path:
            return 0
        if self._fs_path is None and self._materialized_content is None:
            # Check the underlying attributes: reading the fs_path property here
            # would raise instead of answering None.
            return 0
        return self.stat().st_size

    @property
    def fs_path(self) -> str:
        """The file system path backing this path

        When no fs_path has been set but materialized content was provided, the
        content is written to a temporary file first. That file is scheduled for
        removal at interpreter exit.

        :raises PureVirtualPathError: If the path has no fs_path to offer.
        """
        if not self.has_fs_path:
            raise PureVirtualPathError(
                "fs_path is only applicable to paths backed by the file system. The path"
                f" {self._orphan_safe_path()!r} is purely virtual"
            )
        if self._fs_path is None and self._materialized_content is not None:
            with tempfile.NamedTemporaryFile(
                mode="w+t",
                encoding="utf-8",
                suffix=f"__{self.name}",
                delete=False,
            ) as fd:
                filepath = fd.name
                fd.write(self._materialized_content)
            self._fs_path = filepath
            atexit.register(lambda: os.unlink(filepath))

        path = self._fs_path
        if path is None:
            raise PureVirtualPathError(
                f"The test wants a real file system entry of {self._orphan_safe_path()!r}, which this "
                " mock path cannot provide!"
            )
        return path

    def replace_fs_path_content(
        self,
        *,
        use_fs_path_mode: bool = False,
    ) -> ContextManager[str]:
        """Delegate to the parent class' implementation

        :raises TypeError: If the path was created with `content`.
        """
        if self._content is not None:
            raise TypeError(
                f"The `replace_fs_path_content()` method was called on {self.path}. Said path was"
                " created with `content` but for this method to work, the path should have been"
                " created with `materialized_content`"
            )
        return super().replace_fs_path_content(use_fs_path_mode=use_fs_path_mode)

    @contextlib.contextmanager
    def open_child(
        self,
        name: str,
        mode: OpenMode = "r",
        buffering: int = -1,
    ) -> Iterator[Union[TextIO, BinaryIO]]:
        """Context manager yielding a stream for the child called `name`

        For an existing child, or when `mode` contains "r", this delegates to the
        parent class. Otherwise an in-memory stream is yielded and, once the
        managed block finishes, a new child is created from what was written.
        """
        existing = self.get(name)
        if existing is not None or "r" in mode:
            with super().open_child(name, mode, buffering=buffering) as fd:
                yield fd
            return
        if "b" in mode:
            byte_buffer = io.BytesIO(b"")
            yield byte_buffer
            content = byte_buffer.getvalue().decode("utf-8")
        else:
            text_buffer = io.StringIO("")
            yield text_buffer
            content = text_buffer.getvalue()
        # Constructed with this path as parent; the instance is not needed here.
        VirtualTestPath(
            name,
            self,
            mode=0o644,
            content=content,
            has_fs_path=True,
        )

    def open(
        self,
        *,
        byte_io: bool = False,
        buffering: int = -1,
    ) -> Union[TextIO, BinaryIO]:
        """Open the path for reading

        Inline content is served from memory; otherwise the parent class'
        implementation is used.

        :raises TestPathWithNonExistentFSPathError: If the parent implementation
          fails with FileNotFoundError.
        """
        if self._content is None:
            try:
                return super().open(byte_io=byte_io, buffering=buffering)
            except FileNotFoundError as e:
                raise TestPathWithNonExistentFSPathError(
                    f"The test path {self.path} had an fs_path {self._fs_path}, which does not"
                    " exist. This exception can only occur in the testsuite. Either have the"
                    " test provide content for the path (`virtual_path_def(..., content=...) or,"
                    " if that is too painful in general, have the code accept this error as a "
                    " test only-case and provide a default."
                ) from e

        if byte_io:
            return io.BytesIO(self._content.encode("utf-8"))
        return io.StringIO(self._content)

    def _replaced_path(self, new_fs_path: str) -> None:
        self._fs_path = new_fs_path

1712 

1713 

class FSOverlayBase(VirtualPathBase, Generic[FSP]):
    """Base for paths that answer queries directly from a file system path"""

    __slots__ = (
        "_path",
        "_fs_path",
        "_parent",
        "__weakref__",
    )

    def __init__(
        self,
        path: str,
        fs_path: str,
        parent: Optional[FSP],
    ) -> None:
        """Create the path

        :param path: The path of the entry, stored as given.
        :param fs_path: The file system path. It is normalized, keeping a leading
          "/" if it had one.
        :param parent: The parent path (if any). Only a weak reference is kept.
        """
        self._path: str = path
        prefix = "/" if fs_path.startswith("/") else ""
        self._fs_path: str = prefix + _normalize_path(fs_path, with_prefix=False)
        self._parent: Optional[ReferenceType[FSP]] = (
            ref(parent) if parent is not None else None
        )

    @property
    def name(self) -> str:
        """The basename of the path"""
        return os.path.basename(self._path)

    @property
    def path(self) -> str:
        return self._path

    @property
    def parent_dir(self) -> Optional["FSP"]:
        """The parent path, or None if the path was created without one

        :raises RuntimeError: If the weakly referenced parent no longer exists.
        """
        parent = self._parent
        if parent is None:
            return None
        resolved = parent()
        if resolved is None:
            raise RuntimeError("Parent was garbage collected!")
        return resolved

    @property
    def fs_path(self) -> str:
        return self._fs_path

    def stat(self) -> os.stat_result:
        """Return `os.lstat` of the fs_path (symlinks are not followed)"""
        return os.lstat(self.fs_path)

    @property
    def is_dir(self) -> bool:
        # The root path can have a non-existent fs_path (such as d/tmp not always existing)
        try:
            return stat.S_ISDIR(self.stat().st_mode)
        except FileNotFoundError:
            return False
        except NotImplementedError:
            # Still answered with a falsy value as before, but as a proper bool
            # and without printing to stdout.
            return False

    @property
    def is_file(self) -> bool:
        # The root path can have a non-existent fs_path (such as d/tmp not always existing)
        try:
            return stat.S_ISREG(self.stat().st_mode)
        except FileNotFoundError:
            return False

    @property
    def is_symlink(self) -> bool:
        # The root path can have a non-existent fs_path (such as d/tmp not always existing)
        try:
            return stat.S_ISLNK(self.stat().st_mode)
        except FileNotFoundError:
            return False

    @property
    def has_fs_path(self) -> bool:
        return True

    def open(
        self,
        *,
        byte_io: bool = False,
        buffering: int = -1,
    ) -> Union[TextIO, BinaryIO]:
        """Open the fs_path for reading (binary, or text decoded as UTF-8)

        :raises TypeError: If the path is neither a file nor a symlink.
        """
        # Allow symlinks for open here, because we can let the OS resolve the symlink reliably in this
        # case.
        if not self.is_file and not self.is_symlink:
            raise TypeError(
                f"Cannot open {self.path} for reading: It is not a file nor a symlink"
            )

        if byte_io:
            return open(self.fs_path, "rb", buffering=buffering)
        return open(self.fs_path, "rt", encoding="utf-8", buffering=buffering)

    def metadata(
        self,
        metadata_type: Type[PMT],
        *,
        owning_plugin: Optional[str] = None,
    ) -> PathMetadataReference[PMT]:
        """Return an always-empty, read-only metadata reference

        :param owning_plugin: The plugin owning the metadata. Defaults to the
          current plugin.
        """
        current_plugin = self._current_plugin()
        if owning_plugin is None:
            owning_plugin = current_plugin
        return AlwaysEmptyReadOnlyMetadataReference(
            owning_plugin,
            current_plugin,
            metadata_type,
        )

    def all_paths(self) -> Iterable["FSControlPath"]:
        """Yield this path followed by everything beneath it

        Each directory is yielded before its content and siblings are visited in
        `iterdir` order.
        """
        yield self
        if not self.is_dir:
            return
        # The stack is kept reversed so that pop() hands out entries in order.
        stack = list(self.iterdir)
        stack.reverse()
        while stack:
            current = stack.pop()
            yield current
            if current.is_dir:
                stack.extend(reversed(list(current.iterdir)))

    def _resolve_children(
        self, new_child: Callable[[str, str, FSP], FSC]
    ) -> Mapping[str, FSC]:
        """Build the name-to-child mapping by listing the fs_path

        :param new_child: Called with (path, fs_path, parent) for each directory
          entry to create the child.
        :return: The children keyed by name, inserted in sorted name order. Empty
          if this path is not a directory.
        """
        if not self.is_dir:
            return {}
        dir_path = self.path
        dir_fs_path = self.fs_path
        children = {}
        for name in sorted(os.listdir(dir_fs_path), key=os.path.basename):
            child_path = os.path.join(dir_path, name) if dir_path != "." else name
            child_fs_path = (
                os.path.join(dir_fs_path, name) if dir_fs_path != "." else name
            )
            children[name] = new_child(
                child_path,
                child_fs_path,
                self,
            )
        return children

1853 

1854 

1855class FSROOverlay(FSOverlayBase["FSROOverlay"]): 

1856 __slots__ = ( 

1857 "_stat_cache", 

1858 "_readlink_cache", 

1859 "_children", 

1860 "_stat_failed_cache", 

1861 ) 

1862 

def __init__(
    self,
    path: str,
    fs_path: str,
    parent: Optional["FSROOverlay"],
) -> None:
    """Create the overlay path with all caches empty"""
    super().__init__(path, fs_path, parent=parent)
    # Caches; none of them is filled here.
    self._children: Optional[Mapping[str, FSROOverlay]] = None
    self._readlink_cache: Optional[str] = None
    self._stat_cache: Optional[os.stat_result] = None
    self._stat_failed_cache = False

1874 

1875 @classmethod 

1876 def create_root_dir(cls, path: str, fs_path: str) -> "FSROOverlay": 

1877 return FSROOverlay(path, fs_path, None) 

1878 

1879 @property 

1880 def iterdir(self) -> Iterable["FSROOverlay"]: 

1881 if not self.is_dir: 

1882 return 

1883 if self._children is None: 

1884 self._ensure_children_are_resolved() 

1885 yield from assume_not_none(self._children).values() 

1886 

1887 def lookup(self, path: str) -> Optional["FSROOverlay"]: 

1888 if not self.is_dir: 

1889 return None 

1890 if self._children is None: 

1891 self._ensure_children_are_resolved() 

1892 

1893 absolute, _, path_parts = _split_path(path) 

1894 current = cast("FSROOverlay", _root(self)) if absolute else self 

1895 for no, dir_part in enumerate(path_parts): 

1896 if dir_part == ".": 

1897 continue 

1898 if dir_part == "..": 

1899 p = current.parent_dir 

1900 if p is None: 

1901 raise ValueError(f'The path "{path}" escapes the root dir') 

1902 current = cast("FSROOverlay", p) 

1903 continue 

1904 try: 

1905 current = current[dir_part] 

1906 except KeyError: 

1907 return None 

1908 return current 

1909 

1910 def _ensure_children_are_resolved(self) -> None: 

1911 if not self.is_dir or self._children: 

1912 return 

1913 self._children = self._resolve_children( 

1914 lambda n, fsp, p: FSROOverlay(n, fsp, p) 

1915 ) 

1916 

1917 @property 

1918 def is_detached(self) -> bool: 

1919 return False 

1920 

1921 def __getitem__(self, key) -> "VirtualPath": 

1922 if not self.is_dir: 1922 ↛ 1924line 1922 didn't jump to line 1924 because the condition on line 1922 was always true

1923 raise KeyError(key) 

1924 if self._children is None: 

1925 self._ensure_children_are_resolved() 

1926 if isinstance(key, FSPath): 

1927 key = key.name 

1928 return self._children[key] 

1929 

1930 def __delitem__(self, key) -> None: 

1931 self._error_ro_fs() 

1932 

1933 @property 

1934 def is_read_write(self) -> bool: 

1935 return False 

1936 

1937 def _rw_check(self) -> None: 

1938 self._error_ro_fs() 

1939 

1940 def _error_ro_fs(self) -> NoReturn: 

1941 raise DebputyFSIsROError( 

1942 f'Attempt to write to "{self.path}" failed:' 

1943 " Debputy Virtual File system is R/O." 

1944 ) 

1945 

1946 def stat(self) -> os.stat_result: 

1947 if self._stat_failed_cache: 1947 ↛ 1948line 1947 didn't jump to line 1948 because the condition on line 1947 was never true

1948 raise FileNotFoundError( 

1949 errno.ENOENT, os.strerror(errno.ENOENT), self.fs_path 

1950 ) 

1951 

1952 if self._stat_cache is None: 1952 ↛ 1958line 1952 didn't jump to line 1958 because the condition on line 1952 was always true

1953 try: 

1954 self._stat_cache = os.lstat(self.fs_path) 

1955 except FileNotFoundError: 

1956 self._stat_failed_cache = True 

1957 raise 

1958 return self._stat_cache 

1959 

1960 @property 

1961 def mode(self) -> int: 

1962 return stat.S_IMODE(self.stat().st_mode) 

1963 

1964 @mode.setter 

1965 def mode(self, _unused: int) -> None: 

1966 self._error_ro_fs() 

1967 

1968 @property 

1969 def mtime(self) -> float: 

1970 return self.stat().st_mtime 

1971 

1972 @mtime.setter 

1973 def mtime(self, new_mtime: float) -> None: 

1974 self._error_ro_fs() 

1975 

1976 def readlink(self) -> str: 

1977 if not self.is_symlink: 

1978 raise TypeError(f"readlink is only valid for symlinks ({self.path!r})") 

1979 if self._readlink_cache is None: 

1980 self._readlink_cache = os.readlink(self.fs_path) 

1981 return self._readlink_cache 

1982 

1983 def chown( 

1984 self, 

1985 owner: Optional[StaticFileSystemOwner], 

1986 group: Optional[StaticFileSystemGroup], 

1987 ) -> None: 

1988 self._error_ro_fs() 

1989 

1990 def mkdir(self, name: str) -> "VirtualPath": 

1991 self._error_ro_fs() 

1992 

1993 def add_file( 

1994 self, 

1995 name: str, 

1996 *, 

1997 unlink_if_exists: bool = True, 

1998 use_fs_path_mode: bool = False, 

1999 mode: int = 0o0644, 

2000 mtime: Optional[float] = None, 

2001 ) -> ContextManager["VirtualPath"]: 

2002 self._error_ro_fs() 

2003 

2004 def add_symlink(self, link_name: str, link_target: str) -> "VirtualPath": 

2005 self._error_ro_fs() 

2006 

2007 def unlink(self, *, recursive: bool = False) -> None: 

2008 self._error_ro_fs() 

2009 

2010 

class FSROOverlayRootDir(FSROOverlay):
    """Parent-less `FSROOverlay` that also tracks the current plugin name.

    The plugin context starts out as "debputy" and can be switched
    temporarily via `change_plugin_context`.
    """

    __slots__ = ("_plugin_context",)

    def __init__(self, path: str, fs_path: str) -> None:
        # This variant never has a parent, so none is passed on.
        super().__init__(path, fs_path, None)
        self._plugin_context = CurrentPluginContextManager("debputy")

    def _current_plugin(self) -> str:
        """Return the plugin name currently held by the plugin context."""
        context = self._plugin_context
        return context.current_plugin_name

    @contextlib.contextmanager
    def change_plugin_context(self, new_plugin: str) -> Iterator[str]:
        """Delegate to the plugin context manager and yield what it yields."""
        context = self._plugin_context
        with context.change_plugin_context(new_plugin) as active:
            yield active

2025 

2026 

class FSControlPath(FSOverlayBase["FSControlPath"]):
    """Read-write path for control files, backed directly by the file system.

    Nothing is cached here: children are listed from disk on every access,
    and mode/mtime changes are applied to the backing path immediately.
    """

    @property
    def iterdir(self) -> Iterable["FSControlPath"]:
        """Yield the children found on disk right now; nothing for non-directories."""
        if not self.is_dir:
            return
        yield from self._resolve_children(
            lambda n, fsp, p: FSControlPath(n, fsp, p)
        ).values()

    def lookup(self, path: str) -> Optional["FSControlPath"]:
        """Resolve `path` segment by segment, starting at this directory.

        The walk starts from the root instead when `_split_path` reports the
        path as absolute. "." segments are skipped and ".." moves to the
        parent directory.

        :return: The resolved path, or None when this path is not a directory
          or a segment does not exist.
        :raises ValueError: If a ".." segment is applied to a path without a
          parent directory.
        """
        if not self.is_dir:
            return None

        absolute, _, path_parts = _split_path(path)
        current = cast("FSControlPath", _root(self)) if absolute else self
        for dir_part in path_parts:
            if dir_part == ".":
                continue
            if dir_part == "..":
                p = current.parent_dir
                if p is None:
                    raise ValueError(f'The path "{path}" escapes the root dir')
                current = cast("FSControlPath", p)
                continue
            try:
                current = current[dir_part]
            except KeyError:
                return None
        return current

    @property
    def is_detached(self) -> bool:
        """True when `stat()` reports that the backing path no longer exists."""
        try:
            self.stat()
        except FileNotFoundError:
            return True
        else:
            return False

    def __getitem__(self, key) -> "VirtualPath":
        """Return the direct child named `key` (or named like the `FSPath` given).

        :raises KeyError: If this is not a directory or has no such child.
        """
        if not self.is_dir:
            raise KeyError(key)
        children = self._resolve_children(lambda n, fsp, p: FSControlPath(n, fsp, p))
        if isinstance(key, FSPath):
            key = key.name
        return children[key]

    def __delitem__(self, key) -> None:
        self[key].unlink()

    @property
    def is_read_write(self) -> bool:
        return True

    @property
    def mode(self) -> int:
        """Permission bits of the backing path (`stat.S_IMODE` of `st_mode`)."""
        return stat.S_IMODE(self.stat().st_mode)

    @mode.setter
    def mode(self, new_mode: int) -> None:
        os.chmod(self.fs_path, new_mode)

    @property
    def mtime(self) -> float:
        return self.stat().st_mtime

    @mtime.setter
    def mtime(self, new_mtime: float) -> None:
        # Access and modification time are both set to the new value.
        os.utime(self.fs_path, (new_mtime, new_mtime))

    def readlink(self) -> str:
        """Always fails: reading link targets is not implemented for control paths.

        :raises TypeError: If this path is not a symlink.
        :raises AssertionError: If this path is a symlink.
        """
        if not self.is_symlink:
            raise TypeError(f"readlink is only valid for symlinks ({self.path!r})")
        # Raised explicitly (not via `assert False`) so the failure survives
        # `python -O` instead of silently returning None.
        raise AssertionError(
            f"Unexpected symlink in the control file system ({self.path!r})"
        )

    def chown(
        self,
        owner: Optional[StaticFileSystemOwner],
        group: Optional[StaticFileSystemGroup],
    ) -> None:
        raise ValueError(
            "No need to chown paths in the control.tar: They are always root:root"
        )

    def mkdir(self, name: str) -> "VirtualPath":
        raise TypeError("The control.tar never contains subdirectories.")

    @contextlib.contextmanager
    def add_file(
        self,
        name: str,
        *,
        unlink_if_exists: bool = True,
        use_fs_path_mode: bool = False,
        mode: int = 0o0644,
        mtime: Optional[float] = None,
    ) -> ContextManager["VirtualPath"]:
        """Create (or truncate) the file `name` in this directory.

        The file exists, empty and with `mode` applied, before the new child
        is yielded. After the `with` body, the path is verified to still be a
        file and `mode` is applied again. `use_fs_path_mode` and `mtime` are
        accepted but not used by this implementation.

        :raises ValueError: If `name` contains "/" or is "." / "..", or if the
          file already exists and `unlink_if_exists` is False.
        :raises TypeError: If this path is not a directory.
        """
        if "/" in name or name in {".", ".."}:
            raise ValueError(f'Invalid file name: "{name}"')
        if not self.is_dir:
            raise TypeError(
                f"Cannot create {self._orphan_safe_path()}/{name}:"
                f" {self._orphan_safe_path()} is not a directory"
            )
        self._rw_check()
        existing = self.get(name)
        if existing is not None:
            if not unlink_if_exists:
                raise ValueError(
                    f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
                    f" and unlink_if_exists was False"
                )
            assert existing.is_file

        fs_path = os.path.join(self.fs_path, name)
        # This truncates the existing file if any, so we do not have to unlink the previous entry.
        with open(fs_path, "wb") as fd:
            # Ensure that the fs_path exists and default mode is reasonable
            os.chmod(fd.fileno(), mode)
        child = FSControlPath(
            name,
            fs_path,
            self,
        )
        yield child
        _check_fs_path_is_file(fs_path, unlink_on_error=child)
        child.mode = mode

    @contextlib.contextmanager
    def replace_fs_path_content(
        self,
        *,
        use_fs_path_mode: bool = False,
    ) -> ContextManager[str]:
        """Yield the backing file path so its content can be replaced in place.

        After the `with` body, the path is verified to still be a file. With
        `use_fs_path_mode`, the mode read before yielding is applied again.

        :raises TypeError: If this path is not a file.
        """
        if not self.is_file:
            raise TypeError(
                f'Cannot replace contents of "{self._orphan_safe_path()}" as it is not a file'
            )
        restore_mode = self.mode if use_fs_path_mode else None
        yield self.fs_path
        _check_fs_path_is_file(self.fs_path, self)
        if restore_mode is not None:
            self.mode = restore_mode

    def add_symlink(self, link_name: str, link_target: str) -> "VirtualPath":
        raise TypeError("The control.tar never contains symlinks.")

    def unlink(self, *, recursive: bool = False) -> None:
        """Remove the backing file; a no-op for a path without a parent."""
        if self._parent is None:
            return
        # By virtue of the control FS only containing paths, we can assume `recursive` never
        # matters and that `os.unlink` will be sufficient.
        assert self.is_file
        os.unlink(self.fs_path)

2182 

2183 

class FSControlRootDir(FSControlPath):
    """Root directory of the control file system (path ".", no parent)."""

    @classmethod
    def create_root_dir(cls, fs_path: str) -> "FSControlRootDir":
        """Create the root directory object backed by `fs_path`."""
        return FSControlRootDir(".", fs_path, None)

    def insert_file_from_fs_path(
        self,
        name: str,
        fs_path: str,
        *,
        exist_ok: bool = True,
        use_fs_path_mode: bool = False,
        mode: int = 0o0644,
        # Ignored, but accepted for compat with FSPath's variant of this function.
        #  - This is used by install_or_generate_conffiles.
        reference_path: Optional[VirtualPath] = None,  # noqa
    ) -> "FSControlPath":
        """Copy the file at `fs_path` into this directory as `name`.

        :param name: Base name of the new file; must not contain "/" or be
          "." / "..".
        :param fs_path: File to copy from (symlinks are followed).
        :param exist_ok: When False, an existing entry called `name` is an error.
        :param use_fs_path_mode: Copy the permission bits from `fs_path`
          instead of applying `mode`.
        :param mode: Permission bits applied when `use_fs_path_mode` is False.
        :return: The path object for the inserted file.
        :raises ValueError: On an invalid name, or if `name` already exists
          and `exist_ok` is False.
        :raises TypeError: If this path is not a directory.
        """
        if "/" in name or name in {".", ".."}:
            raise ValueError(f'Invalid file name: "{name}"')
        if not self.is_dir:
            raise TypeError(
                f"Cannot create {self._orphan_safe_path()}/{name}:"
                f" {self._orphan_safe_path()} is not a directory"
            )
        self._rw_check()
        if name in self and not exist_ok:
            raise ValueError(
                f'The path "{self._orphan_safe_path()}" already contains a file called "{name}"'
                f" and exist_ok was False"
            )

        target_path = os.path.join(self.fs_path, name)
        # The content is always copied. `shutil.copymode` on its own transfers
        # only the permission bits, so it cannot create or fill the target.
        shutil.copyfile(
            fs_path,
            target_path,
            follow_symlinks=True,
        )
        if use_fs_path_mode:
            shutil.copymode(
                fs_path,
                target_path,
                follow_symlinks=True,
            )
        else:
            os.chmod(target_path, mode)
        return cast("FSControlPath", self[name])

2231 

2232 

def as_path_def(pd: Union[str, PathDef]) -> PathDef:
    """Return `pd` as a `PathDef`, wrapping it first when it is a plain string."""
    if isinstance(pd, str):
        return PathDef(pd)
    return pd

2235 

2236 

def as_path_defs(paths: Iterable[Union[str, PathDef]]) -> Iterable[PathDef]:
    """Lazily convert each entry of `paths` with `as_path_def`, in order."""
    for entry in paths:
        yield as_path_def(entry)

2239 

2240 

def build_virtual_fs(
    paths: Iterable[Union[str, PathDef]],
    read_write_fs: bool = False,
) -> "FSPath":
    """Build a virtual file system tree from a sequence of path definitions.

    Each entry is a path name or a `PathDef`. A trailing "/" marks the entry
    as a directory. Names not starting with "./" get that prefix added.
    Parent directories that were not defined explicitly are created on
    demand. The root may be given as ".", "./" or "/"; if it comes first, its
    `fs_path` is used for the root directory.

    :param paths: The path definitions, directories before their children.
    :param read_write_fs: Value assigned to the root's `is_read_write`.
    :return: The root directory of the new tree.
    :raises ValueError: On a duplicate definition, or when a path needs a
      parent directory that was defined as a non-directory.
    """
    # All accepted spellings of the root directory.
    root_aliases = (".", "./", "/")
    root_dir: Optional[FSRootDir] = None
    directories: Dict[str, FSPath] = {}
    non_directories = set()

    def _ensure_parent_dirs(p: str) -> None:
        # Walk upwards until a known directory is found, then create the
        # missing ones from the outermost inwards.
        current = p.rstrip("/")
        missing_dirs = []
        while True:
            current = os.path.dirname(current)
            if current in directories:
                break
            if current in non_directories:
                raise ValueError(
                    f'Conflicting definition for "{current}". The path "{p}" wants it as a directory,'
                    ' but it is defined as a non-directory. (Ensure dirs end with "/")'
                )
            missing_dirs.append(current)
        for dir_path in reversed(missing_dirs):
            parent_dir = directories[os.path.dirname(dir_path)]
            d = VirtualTestPath(os.path.basename(dir_path), parent_dir, is_dir=True)
            directories[dir_path] = d

    for path_def in as_path_defs(paths):
        path = path_def.path_name
        # NOTE(review): this check uses the name as given, before the "./"
        # prefix and trailing "/" handling below - confirm that is intended.
        if path in directories or path in non_directories:
            raise ValueError(
                f'Duplicate definition of "{path}". Can be false positive if input is not in'
                ' "correct order" (ensure directories occur before their children)'
            )
        if root_dir is None:
            root_fs_path = None
            if path in root_aliases:
                root_fs_path = path_def.fs_path
            root_dir = FSRootDir(fs_path=root_fs_path)
            directories["."] = root_dir

        if path not in root_aliases and not path.startswith("./"):
            path = "./" + path
        if path not in root_aliases:
            _ensure_parent_dirs(path)
        # "/" must be skipped here as well: it is accepted as a root spelling
        # above, and would otherwise be looked up as directories[""] below.
        if path in root_aliases:
            assert "." in directories
            continue
        is_dir = False
        if path.endswith("/"):
            path = path[:-1]
            is_dir = True
        directory = directories[os.path.dirname(path)]
        assert not is_dir or not bool(
            path_def.link_target
        ), f"is_dir={is_dir} vs. link_target={path_def.link_target}"
        fs_path = VirtualTestPath(
            os.path.basename(path),
            directory,
            is_dir=is_dir,
            mode=path_def.mode,
            mtime=path_def.mtime,
            has_fs_path=path_def.has_fs_path,
            fs_path=path_def.fs_path,
            link_target=path_def.link_target,
            content=path_def.content,
            materialized_content=path_def.materialized_content,
        )
        assert not fs_path.is_detached
        if fs_path.is_dir:
            directories[fs_path.path] = fs_path
        else:
            non_directories.add(fs_path.path)

    if root_dir is None:
        root_dir = FSRootDir()

    root_dir.is_read_write = read_write_fs
    return root_dir