Coverage for src/debputy/plugins/debputy/package_processors.py: 54%

173 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2026-05-11 16:06 +0000

1import contextlib 

2import functools 

3import gzip 

4import os 

5import re 

6import subprocess 

7from contextlib import ExitStack 

8from typing import IO, Any 

9from collections.abc import Iterator, Callable 

10 

11from debputy.plugin.api import VirtualPath 

12from debputy.util import ( 

13 _error, 

14 xargs, 

15 escape_shell, 

16 _info, 

17 assume_not_none, 

18 _debug_log, 

19) 

20 

21 

22@contextlib.contextmanager 

23def _open_maybe_gzip(path: VirtualPath) -> Iterator[IO[bytes] | gzip.GzipFile]: 

24 if path.name.endswith(".gz"): 

25 with gzip.GzipFile(path.fs_path, "rb") as fd: 

26 yield fd 

27 else: 

28 with path.open(byte_io=True) as fd: 

29 yield fd 

30 

31 

# Matches a roff ".so <target>" include directive, used by stub manpages that
# merely point at another page; group 1 captures the link target.
# NOTE(review): with the greedy "(.*)", the trailing "\s*" can only match the
# line terminator, so group 1 may retain trailing spaces -- confirm callers cope.
_SO_LINK_RE = re.compile(rb"[.]so\s+(.*)\s*")
# Matches the single-quoted payload of a libtool "dependency_libs='...'" line.
_LA_DEP_LIB_RE = re.compile(rb"'.+'")

34 

35 

def _detect_so_link(path: VirtualPath) -> str | None:
    """Return the target of a roff ``.so`` include directive in *path*, if any.

    Scans the (possibly gzip-compressed) manpage line by line and returns the
    decoded target from the first line matching ``_SO_LINK_RE``; returns
    ``None`` when no such directive is present.
    """
    with _open_maybe_gzip(path) as fd:
        for raw_line in fd:
            match = _SO_LINK_RE.search(raw_line)
            if match is None:
                continue
            return match.group(1).decode("utf-8")
        return None

44 

45 

46def _replace_with_symlink(path: VirtualPath, so_link_target: str) -> None: 

47 adjusted_target = so_link_target 

48 parent_dir = path.parent_dir 

49 assert parent_dir is not None # For the type checking 

50 if parent_dir.name == os.path.dirname(adjusted_target): 

51 # Avoid man8/../man8/foo links 

52 adjusted_target = os.path.basename(adjusted_target) 

53 elif "/" in so_link_target: 

54 # symlinks and so links have a different base directory when the link has a "/". 

55 # Adjust with an extra "../" to align the result 

56 adjusted_target = "../" + adjusted_target 

57 

58 path.unlink() 

59 parent_dir.add_symlink(path.name, adjusted_target) 

60 

61 

62@functools.lru_cache(1) 

63def _has_man_recode() -> bool: 

64 # Ideally, we would just use shutil.which or something like that. 

65 # Unfortunately, in debhelper, we experienced problems with which 

66 # returning "yes" for a man tool that actually could not be run 

67 # on salsa CI. 

68 # 

69 # Therefore, we adopt the logic of dh_installman to run the tool 

70 # with --help to confirm it is not broken, because no one could 

71 # figure out what happened in the salsa CI and my life is still 

72 # too short to figure it out. 

73 try: 

74 subprocess.check_call( 

75 ["man-recode", "--help"], 

76 stdin=subprocess.DEVNULL, 

77 stdout=subprocess.DEVNULL, 

78 stderr=subprocess.DEVNULL, 

79 restore_signals=True, 

80 ) 

81 except subprocess.CalledProcessError: 

82 return False 

83 return True 

84 

85 

def process_manpages(fs_root: VirtualPath, _unused1: Any, _unused2: Any) -> None:
    """Normalize manpages beneath ``./usr/share/man``.

    Small pages consisting of a roff ``.so`` include directive are replaced by
    symlinks to the page they reference; every other page is re-encoded to
    UTF-8 via ``man-recode`` (when available), with ``.gz`` pages recompressed
    afterwards.  The two ``_unused`` parameters are ignored hook arguments.
    """
    man_dir = fs_root.lookup("./usr/share/man")
    if not man_dir:
        # The package ships no manpages; nothing to do.
        return

    re_encode: list[VirtualPath] = []
    for path in (p for p in man_dir.all_paths() if p.is_file and p.has_fs_path):
        size = path.size
        if size == 0:
            # An empty file can be neither a ".so" link nor need re-encoding.
            continue
        so_link_target = None
        if size <= 1024:
            # debhelper has a 1024 byte guard on the basis that ".so file tend to be small".
            # That guard worked well for debhelper, so lets keep it for now on that basis alone.
            so_link_target = _detect_so_link(path)
        if so_link_target:
            _replace_with_symlink(path, so_link_target)
        else:
            re_encode.append(path)

    if not re_encode or not _has_man_recode():
        return

    with ExitStack() as manager:
        # Each entry is the filesystem path (a string) of a writable
        # replacement copy of a manpage; the ExitStack keeps every replacement
        # context open until all pages have been processed.
        manpages = [
            manager.enter_context(p.replace_fs_path_content()) for p in re_encode
        ]
        static_cmd = ["man-recode", "--to-code", "UTF-8", "--suffix", ".encoded"]
        # xargs splits the page list into appropriately sized command lines.
        for cmd in xargs(static_cmd, manpages):
            _info(f"Ensuring manpages have utf-8 encoding via: {escape_shell(*cmd)}")
            try:
                subprocess.check_call(
                    cmd,
                    stdin=subprocess.DEVNULL,
                    restore_signals=True,
                )
            except subprocess.CalledProcessError:
                _error(
                    "The man-recode process failed. Please review the output of `man-recode` to understand"
                    " what went wrong."
                )
        # man-recode wrote "<page>.encoded" outputs (presumably stripping a
        # ".gz" extension first -- TODO confirm against man-recode(1)); fold
        # them back over the originals, recompressing the ".gz" pages.
        for manpage in manpages:
            dest_name = manpage
            if dest_name.endswith(".gz"):
                encoded_name = dest_name[:-3] + ".encoded"
                with open(dest_name, "wb") as out:
                    _debug_log(
                        f"Recompressing {dest_name} via gzip -9nc {escape_shell(encoded_name)}"
                    )
                    try:
                        subprocess.check_call(
                            [
                                "gzip",
                                "-9nc",
                                encoded_name,
                            ],
                            stdin=subprocess.DEVNULL,
                            stdout=out,
                        )
                    except subprocess.CalledProcessError:
                        _error(
                            f"The command {escape_shell('gzip', '-nc', f'{encoded_name}')} > {dest_name} failed!"
                        )
            else:
                os.rename(f"{dest_name}.encoded", manpage)

151 

152 

153def _filter_compress_paths() -> Callable[[VirtualPath], Iterator[VirtualPath]]: 

154 ignore_dir_basenames = { 

155 "_sources", 

156 } 

157 ignore_basenames = { 

158 ".htaccess", 

159 "index.sgml", 

160 "objects.inv", 

161 "search_index.json", 

162 "copyright", 

163 } 

164 ignore_extensions = { 

165 ".htm", 

166 ".html", 

167 ".xhtml", 

168 ".gif", 

169 ".png", 

170 ".jpg", 

171 ".jpeg", 

172 ".gz", 

173 ".taz", 

174 ".tgz", 

175 ".z", 

176 ".bz2", 

177 ".epub", 

178 ".jar", 

179 ".zip", 

180 ".odg", 

181 ".odp", 

182 ".odt", 

183 ".css", 

184 ".xz", 

185 ".lz", 

186 ".lzma", 

187 ".haddock", 

188 ".hs", 

189 ".woff", 

190 ".woff2", 

191 ".svg", 

192 ".svgz", 

193 ".js", 

194 ".devhelp2", 

195 ".map", # Technically, dh_compress has this one case-sensitive 

196 ".rda", 

197 ".rdata", 

198 ".rds", 

199 } 

200 ignore_special_cases = ("-gz", "-z", "_z") 

201 

202 def _filtered_walk(path: VirtualPath) -> Iterator[VirtualPath]: 

203 for path, children in path.walk(): 

204 if path.name in ignore_dir_basenames: 204 ↛ 205line 204 didn't jump to line 205 because the condition on line 204 was never true

205 children.clear() 

206 continue 

207 if path.is_dir and path.name == "examples": 207 ↛ 209line 207 didn't jump to line 209 because the condition on line 207 was never true

208 # Ignore anything beneath /usr/share/doc/*/examples 

209 parent = path.parent_dir 

210 grand_parent = parent.parent_dir if parent else None 

211 if grand_parent and grand_parent.absolute == "/usr/share/doc": 

212 children.clear() 

213 continue 

214 name = path.name 

215 if ( 

216 path.is_symlink 

217 or not path.is_file 

218 or name in ignore_basenames 

219 or not path.has_fs_path 

220 ): 

221 continue 

222 

223 name_lc = name.lower() 

224 _, ext = os.path.splitext(name_lc) 

225 

226 if ext in ignore_extensions or name_lc.endswith(ignore_special_cases): 226 ↛ 227line 226 didn't jump to line 227 because the condition on line 226 was never true

227 continue 

228 yield path 

229 

230 return _filtered_walk 

231 

232 

def _find_compressable_paths(fs_root: VirtualPath) -> Iterator[VirtualPath]:
    """Yield the paths beneath *fs_root* that should be gzip-compressed.

    Mirrors dh_compress: everything eligible under the info and man trees,
    files above 4096 bytes under ``/usr/share/doc`` (with changelog/NEWS
    files included regardless of size), plus X11 ``.pcf`` fonts.
    """
    path_filter = _filter_compress_paths()

    search_spec = (
        ("./usr/share/info", 0),
        ("./usr/share/man", 0),
        ("./usr/share/doc", 4096),
    )
    for base_dir, compress_size_threshold in search_spec:
        base = fs_root.lookup(base_dir)
        if base is None:
            continue
        candidates = path_filter(base)
        if compress_size_threshold:
            # The special-case for changelog and NEWS is from dh_compress. Generally these files
            # have always been compressed regardless of their size.
            # Case-insensitivity is a known delta from `dh_compress` at the time of writing.
            candidates = (
                candidate
                for candidate in candidates
                if candidate.size > compress_size_threshold
                or candidate.name.lower().startswith(("changelog", "news"))
            )
        yield from candidates
    x11_path = fs_root.lookup("./usr/share/fonts/X11")
    if x11_path:
        for font in x11_path.all_paths():
            if font.is_file and font.name.endswith(".pcf"):
                yield font

261 

262 

def apply_compression(fs_root: VirtualPath, _unused1: Any, _unused2: Any) -> None:
    """Gzip-compress eligible files and retarget symlinks at the ``.gz`` names.

    Every path selected by ``_find_compressable_paths`` is compressed with
    ``gzip -9n`` into a sibling ``<name>.gz`` entry (preserving mtime) and the
    uncompressed entry is removed.  Afterwards, symlinks whose targets were
    compressed are renamed and retargeted to the ``.gz`` variants; this runs
    as a fixed-point loop so chains of symlinks are handled.  The two
    ``_unused`` parameters are ignored hook arguments.
    """
    # TODO: Support hardlinks
    # Map of original path -> compressed path; currently only recorded, not
    # yet consumed (see the hardlink TODO above).
    compressed_files: dict[str, str] = {}
    for path in _find_compressable_paths(fs_root):
        parent_dir = assume_not_none(path.parent_dir)
        with (
            parent_dir.add_file(f"{path.name}.gz", mtime=path.mtime) as new_file,
            open(new_file.fs_path, "wb") as fd,
        ):
            try:
                subprocess.check_call(["gzip", "-9nc", path.fs_path], stdout=fd)
            except subprocess.CalledProcessError:
                full_command = f"gzip -9nc {escape_shell(path.fs_path)} > {escape_shell(new_file.fs_path)}"
                _error(
                    f"The compression of {path.path} failed. Please review the error message from gzip to"
                    f" understand what went wrong. Full command was: {full_command}"
                )
        compressed_files[path.path] = new_file.path
        # Drop the uncompressed entry now that the ".gz" sibling exists.
        del parent_dir[path.name]

    # Retarget symlinks whose (possibly relative) target was just compressed.
    # Repeat until a pass makes no progress so that links which only become
    # resolvable after another link was renamed are eventually picked up.
    all_remaining_symlinks = {p.path: p for p in fs_root.all_paths() if p.is_symlink}
    changed = True
    while changed:
        changed = False
        remaining: list[VirtualPath] = list(all_remaining_symlinks.values())
        for symlink in remaining:
            target = symlink.readlink()
            dir_target, basename_target = os.path.split(target)
            new_basename_target = f"{basename_target}.gz"
            symlink_parent_dir = assume_not_none(symlink.parent_dir)
            # Resolve the directory the link target lives in; a target without
            # a directory part is relative to the link's own directory.
            dir_path = (
                symlink_parent_dir.lookup(dir_target)
                if dir_target
                else symlink_parent_dir
            )
            # Skip when the target directory is unknown, the uncompressed
            # target still exists, or no compressed variant is present.
            if (
                not dir_path
                or basename_target in dir_path
                or new_basename_target not in dir_path
            ):
                continue
            del all_remaining_symlinks[symlink.path]
            changed = True

            # Rename the link itself to carry a ".gz" suffix as well (unless
            # it already has one) and point it at the compressed target.
            new_link_name = (
                f"{symlink.name}.gz"
                if not symlink.name.endswith(".gz")
                else symlink.name
            )
            symlink_parent_dir.add_symlink(
                new_link_name, os.path.join(dir_target, new_basename_target)
            )
            symlink.unlink()

316 

317 

318def _la_files(fs_root: VirtualPath) -> Iterator[VirtualPath]: 

319 lib_dir = fs_root.lookup("/usr/lib") 

320 if not lib_dir: 

321 return 

322 # Original code only iterators directly in /usr/lib. To be a faithful conversion, we do the same 

323 # here. 

324 # Eagerly resolve the list as the replacement can trigger a runtime error otherwise 

325 paths = list(lib_dir.iterdir()) 

326 yield from (p for p in paths if p.is_file and p.name.endswith(".la")) 

327 

328 

# Conceptually, the same feature that dh_gnome provides.
# The clean_la_files function based on the dh_gnome version written by Luca Falavigna in 2010,
# who in turn references a Makefile version of the feature.
# https://salsa.debian.org/gnome-team/gnome-pkg-tools/-/commit/2868e1e41ea45443b0fb340bf4c71c4de87d4a5b
def clean_la_files(
    fs_root: VirtualPath,
    _unused1: Any,
    _unused2: Any,
) -> None:
    """Blank out the ``dependency_libs`` entry of libtool ``.la`` files.

    Each ``.la`` file directly beneath ``/usr/lib`` whose ``dependency_libs``
    line carries a non-empty quoted value is rewritten with that value cleared
    to ``''``; files that are already clean are left untouched.  The two
    ``_unused`` parameters are ignored hook arguments.
    """
    for path in _la_files(fs_root):
        rewritten_lines = []
        needs_rewrite = False
        with path.open(byte_io=True) as fd:
            for raw_line in fd:
                if raw_line.startswith(b"dependency_libs"):
                    cleared = _LA_DEP_LIB_RE.sub(b"''", raw_line)
                    if cleared != raw_line:
                        needs_rewrite = True
                        raw_line = cleared
                rewritten_lines.append(raw_line)

        if not needs_rewrite:
            continue
        _info(f"Clearing the dependency_libs line in {path.path}")
        with path.replace_fs_path_content() as fs_path, open(fs_path, "wb") as wfd:
            wfd.writelines(rewritten_lines)