Coverage for src/debputy/intermediate_manifest.py: 61%
173 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
1import dataclasses
2import json
3import os
4import stat
5import sys
6import tarfile
7from enum import Enum
10from typing import Optional, List, Dict, Any, Union, Self, IO
11from collections.abc import Iterable, Mapping
# Type alias: an intermediate manifest is a list of TarMember entries.
# "TarMember" is written as a string because the class is defined further
# down in this module.
IntermediateManifest = list["TarMember"]
class PathType(Enum):
    """Path types supported in a manifest.

    Every member's value is a ``(manifest key, tarfile type flag)`` pair.
    """

    FILE = ("file", tarfile.REGTYPE)
    DIRECTORY = ("directory", tarfile.DIRTYPE)
    SYMLINK = ("symlink", tarfile.SYMTYPE)
    # TODO: Add hardlink, FIFO, Char device, BLK device, etc.

    @property
    def manifest_key(self) -> str:
        """Return the string identifying this path type in a manifest."""
        key, _tar_flag = self.value
        return key

    @property
    def tarinfo_type(self) -> bytes:
        """Return the ``tarfile`` type flag paired with this path type."""
        _key, tar_flag = self.value
        return tar_flag

    @property
    def can_be_virtual(self) -> bool:
        """Return True for directories and symlinks, False for anything else."""
        return self is PathType.DIRECTORY or self is PathType.SYMLINK
# Reverse lookup table: manifest key string -> PathType member.
KEY2PATH_TYPE = {path_type.manifest_key: path_type for path_type in PathType}
def _dirname(path: str) -> str:
    """Return the parent directory of *path*, ignoring trailing slashes.

    The root marker "." (also when given as "./") is returned as "."
    rather than being reduced to the empty string.
    """
    stripped = path.rstrip("/")
    return stripped if stripped == "." else os.path.dirname(stripped)
def _fs_type_from_st_mode(fs_path: str, st_mode: int) -> PathType:
    """Map an ``st_mode`` value to the matching ``PathType``.

    :param fs_path: Path the mode belongs to; only used in the error message.
    :param st_mode: Mode bits as found in ``os.stat_result.st_mode``.
    :return: ``PathType.FILE`` for regular files, ``PathType.DIRECTORY`` for
      directories.
    :raises ValueError: For symlinks and for every other file type.
    """
    if stat.S_ISREG(st_mode):
        return PathType.FILE
    if stat.S_ISDIR(st_mode):
        return PathType.DIRECTORY
    if stat.S_ISLNK(st_mode):
        raise ValueError(
            "Symlinks should have been rewritten to use the virtual rule."
            " Otherwise, the link would not be normalized according to Debian Policy."
        )
    # FIFOs, character devices and block devices have no PathType yet, so
    # they end up here together with anything else that is unknown.
    raise ValueError(
        f"The path {fs_path} had an unsupported/unknown file type."
        f" Probably a bug in the tool"
    )
@dataclasses.dataclass(slots=True)
class TarMember:
    """A single entry of an intermediate manifest.

    An entry holds the data needed to produce a ``tarfile.TarInfo`` (see
    ``create_tar_info``) and can be converted to and from the dict form
    used in the JSON manifest (see ``to_manifest`` and ``from_dict``).
    """

    # Path of the entry inside the tarball (passed as the archive name).
    member_path: str
    path_type: PathType
    # Path on the file system backing the entry; None for virtual entries.
    fs_path: str | None
    mode: int
    owner: str
    uid: int
    group: str
    gid: int
    mtime: float
    # Only set for symlinks; the empty string means "no link target".
    link_target: str = ""
    # Virtual entries are built from the fields alone, without an fs_path.
    is_virtual_entry: bool = False
    may_steal_fs_path: bool = False

    def create_tar_info(self, tar_fd: tarfile.TarFile) -> tarfile.TarInfo:
        """Build the ``TarInfo`` for this entry.

        :param tar_fd: The tar file the info is created for.
        :return: A ``TarInfo`` with mode, ownership and mtime taken from this
          entry (the mtime is truncated to an integer).
        :raises ValueError: If the tar info cannot be derived from ``fs_path``.
        """
        tar_info: tarfile.TarInfo | None
        if self.is_virtual_entry:
            assert self.path_type.can_be_virtual
            tar_info = tar_fd.tarinfo(self.member_path)
            tar_info.size = 0
            tar_info.type = self.path_type.tarinfo_type
            tar_info.linkpath = self.link_target
        else:
            try:
                tar_info = tar_fd.gettarinfo(
                    name=self.fs_path, arcname=self.member_path
                )
            except (TypeError, ValueError) as e:
                raise ValueError(
                    f"Unable to prepare tar info for {self.member_path}"
                ) from e
            if tar_info is None:
                # gettarinfo() returns None (instead of raising) for file
                # types that tarfile cannot represent.
                raise ValueError(
                    f"Unable to prepare tar info for {self.member_path}:"
                    f" unsupported file type at {self.fs_path}"
                )
            # TODO: Eventually, we should be able to unconditionally rely on link_target.  However,
            #  until we got symlinks and hardlinks correctly done in the JSON generator, it will be
            #  conditional for now.
            if self.link_target != "":
                tar_info.linkpath = self.link_target
        tar_info.mode = self.mode
        tar_info.uname = self.owner
        tar_info.uid = self.uid
        tar_info.gname = self.group
        tar_info.gid = self.gid
        tar_info.mtime = int(self.mtime)

        return tar_info

    @classmethod
    def from_file(
        cls,
        member_path: str,
        fs_path: str,
        mode: int | None = None,
        owner: str = "root",
        uid: int = 0,
        group: str = "root",
        gid: int = 0,
        path_mtime: float | int | None = None,
        clamp_mtime_to: int | None = None,
        path_type: PathType | None = None,
        may_steal_fs_path: bool = False,
    ) -> "TarMember":
        """Create a non-virtual entry backed by ``fs_path``.

        ``mode``, ``path_mtime`` and ``path_type`` are read from the file
        system (via ``os.lstat``) only when they are not provided.

        :param clamp_mtime_to: When given, an mtime later than this value is
          replaced by this value.
        :param may_steal_fs_path: Stored on the entry as-is; when true,
          ``fs_path`` must contain "debputy/scratch-dir/".
        :raises ValueError: If the path type has to be detected and the path
          is a symlink or an unsupported file type.
        """
        # Avoid lstat'ing if we can as it makes it easier to do tests of the code
        # (as we do not need an existing physical fs path)
        if path_type is None or path_mtime is None or mode is None:
            st_result = os.lstat(fs_path)
            st_mode = st_result.st_mode
            if mode is None:
                # NOTE(review): st_mode also carries the file type bits;
                # presumably only the permission bits are wanted - confirm.
                mode = st_mode
            if path_mtime is None:
                path_mtime = st_result.st_mtime
            if path_type is None:
                path_type = _fs_type_from_st_mode(fs_path, st_mode)

        if clamp_mtime_to is not None and path_mtime > clamp_mtime_to:
            path_mtime = clamp_mtime_to

        if may_steal_fs_path:
            assert (
                "debputy/scratch-dir/" in fs_path
            ), f"{fs_path} should not have been stealable"

        return cls(
            member_path=member_path,
            path_type=path_type,
            fs_path=fs_path,
            mode=mode,
            owner=owner,
            uid=uid,
            group=group,
            gid=gid,
            mtime=float(path_mtime),
            is_virtual_entry=False,
            may_steal_fs_path=may_steal_fs_path,
        )

    @classmethod
    def virtual_path(
        cls,
        member_path: str,
        path_type: PathType,
        mtime: float,
        mode: int,
        link_target: str = "",
        owner: str = "root",
        uid: int = 0,
        group: str = "root",
        gid: int = 0,
    ) -> Self:
        """Create a virtual entry, i.e. one without a file system path.

        :raises ValueError: If ``path_type`` cannot be virtual, if a symlink
          has no link target, or if a non-symlink has one.
        """
        if not path_type.can_be_virtual:
            raise ValueError(f"The path type {path_type.name} cannot be virtual")
        # XOR: exactly one of "is a symlink" / "has a link target" holds,
        # which is an error in either direction.
        if (path_type == PathType.SYMLINK) ^ bool(link_target):
            if not link_target:
                raise ValueError("Symlinks must have a link target")
            # TODO: Dear future programmer. Hardlinks will appear here some day and you will have to fix this
            #  code then!
            raise ValueError("Non-symlinks must not have a link target")
        return cls(
            member_path=member_path,
            path_type=path_type,
            fs_path=None,
            link_target=link_target,
            mode=mode,
            owner=owner,
            uid=uid,
            group=group,
            gid=gid,
            mtime=mtime,
            is_virtual_entry=True,
        )

    def clone_and_replace(self, /, **changes: Any) -> "TarMember":
        """Return a copy of this entry with the given fields replaced."""
        return dataclasses.replace(self, **changes)

    def to_manifest(self) -> dict[str, Any]:
        """Convert this entry to its manifest dict form.

        The mode is written as an octal string ("0o...") and the path type
        as its manifest key.

        :raises TypeError: If the mode cannot be converted with ``oct``.
        """
        d = dataclasses.asdict(self)
        try:
            d["mode"] = oct(self.mode)
        except (TypeError, ValueError) as e:
            raise TypeError(f"Bad mode in TarMember {self.member_path}") from e
        d["path_type"] = self.path_type.manifest_key
        # "compress" the output by removing redundant fields
        if not self.link_target:
            del d["link_target"]
        if self.is_virtual_entry:
            assert self.fs_path is None
            del d["fs_path"]
        else:
            del d["is_virtual_entry"]
        return d

    @classmethod
    def parse_intermediate_manifest(cls, manifest_path: str) -> IntermediateManifest:
        """Load and validate a JSON manifest.

        :param manifest_path: File to read, or "-" to read from stdin.
        :return: The entries in manifest order.
        :raises ValueError: If the manifest is empty, does not start with the
          root directory "./", or lists a path before the directory
          containing it.
        """
        directories = {"."}
        if manifest_path == "-":
            with sys.stdin as fd:
                data = json.load(fd)
        else:
            # JSON text is UTF-8; do not depend on the locale's encoding.
            with open(manifest_path, encoding="utf-8") as fd:
                data = json.load(fd)
        contents = [cls.from_dict(m) for m in data]
        if not contents:
            raise ValueError(
                "Empty manifest (note that the root directory should always be present)"
            )
        if contents[0].member_path != "./":
            raise ValueError('The first member must always be the root directory "./"')
        for tar_member in contents:
            directory = _dirname(tar_member.member_path)
            if directory not in directories:
                raise ValueError(
                    f'The path "{tar_member.member_path}" came before the directory it is in (or the path'
                    f" is not a directory). Either way leads to a broken deb."
                )
            if tar_member.path_type == PathType.DIRECTORY:
                # Stored without the trailing slash so it matches _dirname().
                directories.add(tar_member.member_path.rstrip("/"))
        return contents

    @classmethod
    def from_dict(cls, d: Any) -> "TarMember":
        """Create an entry from its manifest dict form (see ``to_manifest``).

        :param d: Mapping with the keys "member_path", "mode", "path_type",
          "owner", "uid", "group", "gid" and "mtime", plus the optional keys
          "fs_path", "link_target", "is_virtual_entry" and
          "may_steal_fs_path".
        :raises ValueError: If the declaration is inconsistent (bad mode,
          virtual entry with an fs_path, directory without trailing slash,
          symlink without target, ...).
        :raises KeyError: If a required key or the path type is unknown.
        """
        member_path = d["member_path"]
        raw_mode = d["mode"]
        # A non-string mode (e.g. a JSON number) is as invalid as a string
        # without the octal prefix; report both the same way.
        if not isinstance(raw_mode, str) or not raw_mode.startswith("0o"):
            raise ValueError(f"Bad mode for {member_path}")
        is_virtual_entry = d.get("is_virtual_entry") or False
        path_type = KEY2PATH_TYPE[d["path_type"]]
        fs_path = d.get("fs_path")
        mode = int(raw_mode[2:], 8)
        if is_virtual_entry:
            if not path_type.can_be_virtual:
                raise ValueError(
                    f"Bad file type or is_virtual_entry for {d['member_path']}."
                    " The file type cannot be virtual"
                )
            if fs_path is not None:
                raise ValueError(
                    f'Invalid declaration for "{member_path}".'
                    " The path is listed as a virtual entry but has a file system path"
                )
        elif fs_path is None:
            raise ValueError(
                f'Invalid declaration for "{member_path}".'
                " The path is neither a virtual path nor does it have a file system path!"
            )
        if path_type == PathType.DIRECTORY and not member_path.endswith("/"):
            raise ValueError(
                f'Invalid declaration for "{member_path}".'
                " The path is listed as a directory but does not end with a slash"
            )

        link_target = d.get("link_target")
        if path_type == PathType.SYMLINK:
            if mode != 0o777:
                raise ValueError(
                    f'Invalid declaration for "{member_path}".'
                    f" Symlinks must have mode 0o0777, got {oct(mode)[2:]}."
                )
            if not link_target:
                raise ValueError(
                    f'Invalid declaration for "{member_path}".'
                    " Symlinks must have a link_target"
                )
        elif link_target is not None and link_target != "":
            # TODO: Eventually hardlinks should have them too. But that is a problem for a future programmer
            raise ValueError(
                f'Invalid declaration for "{member_path}".'
                " Only symlinks can have a link_target"
            )
        else:
            link_target = ""
        may_steal_fs_path = d.get("may_steal_fs_path") or False

        if may_steal_fs_path:
            # fs_path is None for virtual entries; guard the membership test
            # so the assertion fails cleanly instead of raising TypeError.
            assert (
                fs_path is not None and "debputy/scratch-dir/" in fs_path
            ), f"{fs_path} should not have been stealable"
        return cls(
            member_path=member_path,
            path_type=path_type,
            fs_path=fs_path,
            mode=mode,
            owner=d["owner"],
            uid=d["uid"],
            group=d["group"],
            gid=d["gid"],
            mtime=float(d["mtime"]),
            link_target=link_target,
            is_virtual_entry=is_virtual_entry,
            may_steal_fs_path=may_steal_fs_path,
        )
def output_intermediate_manifest(
    manifest_output_file: str,
    members: Iterable[TarMember],
) -> None:
    """Write *members* as a JSON manifest to the file *manifest_output_file*.

    The file is opened for writing in text mode; the serialization itself
    is delegated to ``output_intermediate_manifest_to_fd``.
    """
    with open(manifest_output_file, "w") as manifest_fd:
        output_intermediate_manifest_to_fd(manifest_fd, members)
def output_intermediate_manifest_to_fd(
    fd: IO[str], members: Iterable[TarMember]
) -> None:
    """Serialize *members* as a JSON list and write it to *fd*.

    Each member contributes the dict returned by its ``to_manifest()``.
    """
    json.dump([member.to_manifest() for member in members], fd)