Coverage for src/debputy/intermediate_manifest.py: 61%

173 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-10-12 15:06 +0000

1import dataclasses 

2import json 

3import os 

4import stat 

5import sys 

6import tarfile 

7from enum import Enum 

8 

9 

10from typing import Optional, List, Dict, Any, Union, Self, IO 

11from collections.abc import Iterable, Mapping 

12 

# Type alias for a parsed manifest: an ordered list of tar members.
# "TarMember" is a forward reference because the class is defined further down.
IntermediateManifest = list["TarMember"]

14 

15 

class PathType(Enum):
    """Kinds of paths that can appear in the intermediate manifest.

    Each member's value is a ``(manifest key, tarfile type flag)`` pair.
    """

    FILE = ("file", tarfile.REGTYPE)
    DIRECTORY = ("directory", tarfile.DIRTYPE)
    SYMLINK = ("symlink", tarfile.SYMTYPE)
    # TODO: Add hardlink, FIFO, Char device, BLK device, etc.

    @property
    def manifest_key(self) -> str:
        """The string used for this path type in the serialized manifest."""
        return self.value[0]

    @property
    def tarinfo_type(self) -> bytes:
        """The ``tarfile`` type flag to store in ``TarInfo.type``."""
        return self.value[1]

    @property
    def can_be_virtual(self) -> bool:
        """Whether this path type may exist without a backing file system path."""
        return self in (PathType.DIRECTORY, PathType.SYMLINK)

33 

34 

# Reverse lookup table: manifest key (e.g. "file") -> PathType member.
KEY2PATH_TYPE = {pt.manifest_key: pt for pt in PathType}

36 

37 

38def _dirname(path: str) -> str: 

39 path = path.rstrip("/") 

40 if path == ".": 40 ↛ 42line 40 didn't jump to line 42 because the condition on line 40 was always true

41 return path 

42 return os.path.dirname(path) 

43 

44 

def _fs_type_from_st_mode(fs_path: str, st_mode: int) -> PathType:
    """Map the file type bits of *st_mode* to a :class:`PathType`.

    :param fs_path: The path the mode was read from; only used in error messages.
    :param st_mode: A mode value as found in ``os.stat_result.st_mode``.
    :return: ``PathType.FILE`` for regular files, ``PathType.DIRECTORY`` for directories.
    :raises ValueError: For symlinks (they must be declared as virtual entries) and
      for any other file type.
    """
    if stat.S_ISREG(st_mode):
        return PathType.FILE
    if stat.S_ISDIR(st_mode):
        return PathType.DIRECTORY
    if stat.S_ISLNK(st_mode):
        raise ValueError(
            "Symlinks should have been rewritten to use the virtual rule."
            " Otherwise, the link would not be normalized according to Debian Policy."
        )
    # TODO: FIFOs, character devices and block devices are not supported yet;
    #  they currently end up in the generic error below.
    raise ValueError(
        f"The path {fs_path} had an unsupported/unknown file type."
        " Probably a bug in the tool"
    )

67 

68 

@dataclasses.dataclass(slots=True)
class TarMember:
    """One entry of the intermediate manifest, convertible into a tar member.

    An entry is either backed by a path on the file system (``fs_path``) or it
    is a virtual entry (``is_virtual_entry``), in which case ``fs_path`` is
    ``None`` and the tar header is built purely from the fields stored here.
    """

    member_path: str
    path_type: PathType
    fs_path: str | None
    mode: int
    owner: str
    uid: int
    group: str
    gid: int
    mtime: float
    link_target: str = ""
    is_virtual_entry: bool = False
    may_steal_fs_path: bool = False

    def create_tar_info(self, tar_fd: tarfile.TarFile) -> tarfile.TarInfo:
        """Build the ``TarInfo`` header for this member.

        Virtual entries get a fresh zero-sized header; other entries start from
        ``tar_fd.gettarinfo`` on ``fs_path``. In both cases mode, ownership and
        mtime (truncated to an integer) are then taken from this object.

        :param tar_fd: The tar file the header is created for.
        :return: The populated ``TarInfo``.
        :raises ValueError: If the header for a non-virtual entry could not be prepared.
        """
        tar_info: tarfile.TarInfo | None
        if self.is_virtual_entry:
            assert self.path_type.can_be_virtual
            tar_info = tar_fd.tarinfo(self.member_path)
            tar_info.size = 0
            tar_info.type = self.path_type.tarinfo_type
            tar_info.linkpath = self.link_target
        else:
            try:
                tar_info = tar_fd.gettarinfo(
                    name=self.fs_path, arcname=self.member_path
                )
            except (TypeError, ValueError) as e:
                raise ValueError(
                    f"Unable to prepare tar info for {self.member_path}"
                ) from e
            if tar_info is None:
                # gettarinfo returns None for file types tarfile cannot archive;
                # report that instead of failing on an attribute access below.
                raise ValueError(f"Unable to prepare tar info for {self.member_path}")
            # TODO: Eventually, we should be able to unconditionally rely on link_target. However,
            #  until we got symlinks and hardlinks correctly done in the JSON generator, it will be
            #  conditional for now.
            if self.link_target != "":
                tar_info.linkpath = self.link_target
        tar_info.mode = self.mode
        tar_info.uname = self.owner
        tar_info.uid = self.uid
        tar_info.gname = self.group
        tar_info.gid = self.gid
        tar_info.mtime = int(self.mtime)

        return tar_info

    @classmethod
    def from_file(
        cls,
        member_path: str,
        fs_path: str,
        mode: int | None = None,
        owner: str = "root",
        uid: int = 0,
        group: str = "root",
        gid: int = 0,
        path_mtime: float | int | None = None,
        clamp_mtime_to: int | None = None,
        path_type: PathType | None = None,
        may_steal_fs_path: bool = False,
    ) -> "TarMember":
        """Create a non-virtual member backed by *fs_path*.

        :param member_path: Path of the member inside the tar file.
        :param fs_path: Path on the file system providing the content.
        :param mode: Mode to record; read via ``os.lstat`` when omitted.
        :param path_mtime: Modification time; read via ``os.lstat`` when omitted.
        :param clamp_mtime_to: If given, an upper bound applied to the mtime.
        :param path_type: Type of the path; derived via ``os.lstat`` when omitted.
        :param may_steal_fs_path: Stored on the member as-is; when true, *fs_path*
          is asserted to contain ``debputy/scratch-dir/``.
        :return: The new member.
        """
        # Avoid lstat'ing if we can as it makes it easier to do tests of the code
        # (as we do not need an existing physical fs path)
        if path_type is None or path_mtime is None or mode is None:
            st_result = os.lstat(fs_path)
            st_mode = st_result.st_mode
            if mode is None:
                mode = st_mode
            if path_mtime is None:
                path_mtime = st_result.st_mtime
            if path_type is None:
                path_type = _fs_type_from_st_mode(fs_path, st_mode)

        if clamp_mtime_to is not None and path_mtime > clamp_mtime_to:
            path_mtime = clamp_mtime_to

        if may_steal_fs_path:
            assert (
                "debputy/scratch-dir/" in fs_path
            ), f"{fs_path} should not have been stealable"

        return cls(
            member_path=member_path,
            path_type=path_type,
            fs_path=fs_path,
            mode=mode,
            owner=owner,
            uid=uid,
            group=group,
            gid=gid,
            mtime=float(path_mtime),
            is_virtual_entry=False,
            may_steal_fs_path=may_steal_fs_path,
        )

    @classmethod
    def virtual_path(
        cls,
        member_path: str,
        path_type: PathType,
        mtime: float,
        mode: int,
        link_target: str = "",
        owner: str = "root",
        uid: int = 0,
        group: str = "root",
        gid: int = 0,
    ) -> Self:
        """Create a virtual member, i.e. one without a file system path.

        :raises ValueError: If *path_type* cannot be virtual, if a symlink has no
          *link_target*, or if a non-symlink has one.
        """
        if not path_type.can_be_virtual:
            raise ValueError(f"The path type {path_type.name} cannot be virtual")
        # A link target must be present exactly when the path is a symlink.
        if (path_type == PathType.SYMLINK) ^ bool(link_target):
            if not link_target:
                raise ValueError("Symlinks must have a link target")
            # TODO: Dear future programmer. Hardlinks will appear here some day and you will have to fix this
            #  code then!
            raise ValueError("Non-symlinks must not have a link target")
        return cls(
            member_path=member_path,
            path_type=path_type,
            fs_path=None,
            link_target=link_target,
            mode=mode,
            owner=owner,
            uid=uid,
            group=group,
            gid=gid,
            mtime=mtime,
            is_virtual_entry=True,
        )

    def clone_and_replace(self, /, **changes: Any) -> "TarMember":
        """Return a copy of this member with the given fields replaced."""
        return dataclasses.replace(self, **changes)

    def to_manifest(self) -> dict[str, Any]:
        """Convert this member into the dict form used in the JSON manifest.

        The mode is rendered as an octal string and the path type as its
        manifest key. Redundant fields are dropped.

        :raises TypeError: If the mode cannot be rendered as an octal string.
        """
        d = dataclasses.asdict(self)
        try:
            d["mode"] = oct(self.mode)
        except (TypeError, ValueError) as e:
            raise TypeError(f"Bad mode in TarMember {self.member_path}") from e
        d["path_type"] = self.path_type.manifest_key
        # "compress" the output by removing redundant fields
        if not self.link_target:
            del d["link_target"]
        if self.is_virtual_entry:
            assert self.fs_path is None
            del d["fs_path"]
        else:
            del d["is_virtual_entry"]
        return d

    @classmethod
    def parse_intermediate_manifest(cls, manifest_path: str) -> IntermediateManifest:
        """Load and validate a JSON manifest.

        :param manifest_path: Path of the manifest file, or ``"-"`` to read stdin.
        :return: The members in manifest order.
        :raises ValueError: If the manifest is empty, does not start with the root
          directory ``"./"``, or lists a path before the directory containing it.
        """
        directories = {"."}
        if manifest_path == "-":
            # Note: using stdin as a context manager closes it afterwards.
            with sys.stdin as fd:
                data = json.load(fd)
        else:
            with open(manifest_path) as fd:
                data = json.load(fd)
        contents = [TarMember.from_dict(m) for m in data]
        if not contents:
            raise ValueError(
                "Empty manifest (note that the root directory should always be present)"
            )
        if contents[0].member_path != "./":
            raise ValueError('The first member must always be the root directory "./"')
        for tar_member in contents:
            directory = _dirname(tar_member.member_path)
            if directory not in directories:
                raise ValueError(
                    f'The path "{tar_member.member_path}" came before the directory it is in (or the path'
                    f" is not a directory). Either way leads to a broken deb."
                )
            if tar_member.path_type == PathType.DIRECTORY:
                directories.add(tar_member.member_path.rstrip("/"))
        return contents

    @classmethod
    def from_dict(cls, d: Any) -> "TarMember":
        """Create a member from one decoded manifest entry, validating it.

        :param d: A mapping as produced by :meth:`to_manifest`.
        :raises ValueError: If the entry is inconsistent (bad mode, virtual entry
          with a file system path, directory without trailing slash, ...).
        :raises KeyError: If a required key is missing or the path type is unknown.
        """
        member_path = d["member_path"]
        raw_mode = d["mode"]
        # Also reject non-string modes here rather than failing on a missing method.
        if not isinstance(raw_mode, str) or not raw_mode.startswith("0o"):
            raise ValueError(f"Bad mode for {member_path}")
        is_virtual_entry = d.get("is_virtual_entry") or False
        path_type = KEY2PATH_TYPE[d["path_type"]]
        fs_path = d.get("fs_path")
        mode = int(raw_mode[2:], 8)
        if is_virtual_entry:
            if not path_type.can_be_virtual:
                raise ValueError(
                    f"Bad file type or is_virtual_entry for {member_path}."
                    " The file type cannot be virtual"
                )
            if fs_path is not None:
                raise ValueError(
                    f'Invalid declaration for "{member_path}".'
                    " The path is listed as a virtual entry but has a file system path"
                )
        elif fs_path is None:
            raise ValueError(
                f'Invalid declaration for "{member_path}".'
                " The path is neither a virtual path nor does it have a file system path!"
            )
        if path_type == PathType.DIRECTORY and not member_path.endswith("/"):
            raise ValueError(
                f'Invalid declaration for "{member_path}".'
                " The path is listed as a directory but does not end with a slash"
            )

        link_target = d.get("link_target")
        if path_type == PathType.SYMLINK:
            if mode != 0o777:
                raise ValueError(
                    f'Invalid declaration for "{member_path}".'
                    f" Symlinks must have mode 0o0777, got {oct(mode)[2:]}."
                )
            if not link_target:
                raise ValueError(
                    f'Invalid declaration for "{member_path}".'
                    " Symlinks must have a link_target"
                )
        elif link_target is not None and link_target != "":
            # TODO: Eventually hardlinks should have them too. But that is a problem for a future programmer
            raise ValueError(
                f'Invalid declaration for "{member_path}".'
                " Only symlinks can have a link_target"
            )
        else:
            link_target = ""
        may_steal_fs_path = d.get("may_steal_fs_path") or False

        if may_steal_fs_path:
            if fs_path is None:
                # Virtual entries have nothing to steal; reject the entry instead of
                # failing with a TypeError in the membership test below.
                raise ValueError(
                    f'Invalid declaration for "{member_path}".'
                    " Only paths with a file system path can have may_steal_fs_path"
                )
            assert (
                "debputy/scratch-dir/" in fs_path
            ), f"{fs_path} should not have been stealable"
        return cls(
            member_path=member_path,
            path_type=path_type,
            fs_path=fs_path,
            mode=mode,
            owner=d["owner"],
            uid=d["uid"],
            group=d["group"],
            gid=d["gid"],
            mtime=float(d["mtime"]),
            link_target=link_target,
            is_virtual_entry=is_virtual_entry,
            may_steal_fs_path=may_steal_fs_path,
        )

320 

321 

def output_intermediate_manifest(
    manifest_output_file: str,
    members: Iterable[TarMember],
) -> None:
    """Write *members* as a JSON manifest to the file *manifest_output_file*.

    The file is opened in text mode for writing, so existing content is replaced.
    """
    with open(manifest_output_file, "w") as fd:
        output_intermediate_manifest_to_fd(fd, members)

328 

329 

def output_intermediate_manifest_to_fd(
    fd: IO[str], members: Iterable[TarMember]
) -> None:
    """Write *members* as a JSON list to the already opened text stream *fd*.

    Each member is converted with ``to_manifest`` before the whole list is dumped.
    """
    serial_format = [m.to_manifest() for m in members]
    json.dump(serial_format, fd)