Coverage for src/debputy/plugins/debputy/package_processors.py: 54%

175 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-10-12 15:06 +0000

1import contextlib 

2import functools 

3import gzip 

4import os 

5import re 

6import subprocess 

7from contextlib import ExitStack 

8from typing import Optional, IO, Any, List, Dict, Union 

9from collections.abc import Iterator, Callable 

10 

11from debputy.plugin.api import VirtualPath 

12from debputy.util import ( 

13 _error, 

14 xargs, 

15 escape_shell, 

16 _info, 

17 assume_not_none, 

18 print_command, 

19 _debug_log, 

20) 

21 

22 

@contextlib.contextmanager
def _open_maybe_gzip(path: VirtualPath) -> Iterator[IO[bytes] | gzip.GzipFile]:
    """Open *path* for binary reading, transparently decompressing ``.gz`` files.

    :param path: The path to open.  A ``.gz`` suffix on the file name selects
      gzip decompression; anything else is opened as plain bytes.
    """
    if not path.name.endswith(".gz"):
        with path.open(byte_io=True) as fd:
            yield fd
        return
    with gzip.GzipFile(path.fs_path, "rb") as fd:
        yield fd

31 

32 

# Matches a roff ".so <target>" inclusion directive inside a manpage; group 1
# captures the (bytes) path of the page being included.
_SO_LINK_RE = re.compile(rb"[.]so\s+(.*)\s*")
# Matches the single-quoted value on a libtool .la "dependency_libs=..." line
# so it can be blanked out to ''.
_LA_DEP_LIB_RE = re.compile(rb"'.+'")

35 

36 

def _detect_so_link(path: VirtualPath) -> str | None:
    """Scan a (possibly gzip-compressed) manpage for a roff ``.so`` directive.

    :param path: The manpage to inspect.
    :return: The ``.so`` inclusion target decoded as UTF-8, or ``None`` when
      no such directive is present.
    """
    with _open_maybe_gzip(path) as fd:
        match = next(
            (m for m in map(_SO_LINK_RE.search, fd) if m is not None),
            None,
        )
    if match is None:
        return None
    return match.group(1).decode("utf-8")

45 

46 

def _replace_with_symlink(path: VirtualPath, so_link_target: str) -> None:
    """Replace *path* with a symlink pointing at *so_link_target*.

    The raw ``.so`` target is rewritten so the resulting symlink resolves
    correctly relative to the directory that contains *path*.
    """
    parent_dir = path.parent_dir
    assert parent_dir is not None  # For the type checking
    target_dir, target_base = os.path.split(so_link_target)
    if parent_dir.name == target_dir:
        # Avoid man8/../man8/foo links
        link_target = target_base
    elif "/" in so_link_target:
        # symlinks and so links have a different base directory when the link has a "/".
        # Adjust with an extra "../" to align the result
        link_target = "../" + so_link_target
    else:
        link_target = so_link_target

    path.unlink()
    parent_dir.add_symlink(path.name, link_target)

61 

62 

63@functools.lru_cache(1) 

64def _has_man_recode() -> bool: 

65 # Ideally, we would just use shutil.which or something like that. 

66 # Unfortunately, in debhelper, we experienced problems with which 

67 # returning "yes" for a man tool that actually could not be run 

68 # on salsa CI. 

69 # 

70 # Therefore, we adopt the logic of dh_installman to run the tool 

71 # with --help to confirm it is not broken, because no one could 

72 # figure out what happened in the salsa CI and my life is still 

73 # too short to figure it out. 

74 try: 

75 subprocess.check_call( 

76 ["man-recode", "--help"], 

77 stdin=subprocess.DEVNULL, 

78 stdout=subprocess.DEVNULL, 

79 stderr=subprocess.DEVNULL, 

80 restore_signals=True, 

81 ) 

82 except subprocess.CalledProcessError: 

83 return False 

84 return True 

85 

86 

def process_manpages(fs_root: VirtualPath, _unused1: Any, _unused2: Any) -> None:
    """Normalize the manpages beneath ./usr/share/man.

    Two transformations are applied:

      1. Small pages (<= 1024 bytes) consisting of a roff ``.so`` inclusion
         are replaced by a symlink to the page they include.
      2. The remaining pages are batch re-encoded to UTF-8 via ``man-recode``
         (skipped entirely when the tool is unavailable), with ``.gz`` pages
         recompressed afterwards.

    The ``_unused*`` parameters exist only to satisfy the package-processor
    callback signature.
    """
    man_dir = fs_root.lookup("./usr/share/man")
    if not man_dir:
        # No manpages in this package; nothing to do.
        return

    re_encode = []
    for path in (p for p in man_dir.all_paths() if p.is_file and p.has_fs_path):
        size = path.size
        if size == 0:
            # Empty file: nothing to link or convert.
            continue
        so_link_target = None
        if size <= 1024:
            # debhelper has a 1024 byte guard on the basis that ".so file tend to be small".
            # That guard worked well for debhelper, so lets keep it for now on that basis alone.
            so_link_target = _detect_so_link(path)
        if so_link_target:
            _replace_with_symlink(path, so_link_target)
        else:
            re_encode.append(path)

    if not re_encode or not _has_man_recode():
        return

    with ExitStack() as manager:
        # replace_fs_path_content() yields a writable fs path per page; the
        # contexts are kept open for the whole batch conversion below.
        manpages = [
            manager.enter_context(p.replace_fs_path_content()) for p in re_encode
        ]
        static_cmd = ["man-recode", "--to-code", "UTF-8", "--suffix", ".encoded"]
        # xargs() splits the file list into command lines of acceptable length.
        for cmd in xargs(static_cmd, manpages):
            _info(f"Ensuring manpages have utf-8 encoding via: {escape_shell(*cmd)}")
            try:
                subprocess.check_call(
                    cmd,
                    stdin=subprocess.DEVNULL,
                    restore_signals=True,
                )
            except subprocess.CalledProcessError:
                _error(
                    "The man-recode process failed. Please review the output of `man-recode` to understand"
                    " what went wrong."
                )
        # man-recode wrote its output next to each input with a ".encoded"
        # suffix; move (or recompress) the results back into place.
        for manpage in manpages:
            dest_name = manpage
            if dest_name.endswith(".gz"):
                # man-recode strips the compression extension before
                # appending ".encoded" (foo.1.gz -> foo.1.encoded).
                encoded_name = dest_name[:-3] + ".encoded"
                with open(dest_name, "wb") as out:
                    _debug_log(
                        f"Recompressing {dest_name} via gzip -9nc {escape_shell(encoded_name)}"
                    )
                    try:
                        subprocess.check_call(
                            [
                                "gzip",
                                "-9nc",
                                encoded_name,
                            ],
                            stdin=subprocess.DEVNULL,
                            stdout=out,
                        )
                    except subprocess.CalledProcessError:
                        _error(
                            f"The command {escape_shell('gzip', '-nc', f'{encoded_name}')} > {dest_name} failed!"
                        )
                # NOTE(review): the intermediate ".encoded" file is not
                # removed here — presumably harmless since package contents
                # come from the virtual tree, but confirm it cannot leak.
            else:
                os.rename(f"{dest_name}.encoded", manpage)

152 

153 

def _filter_compress_paths() -> Callable[[VirtualPath], Iterator[VirtualPath]]:
    """Build a tree-walk filter selecting files eligible for gzip compression.

    :return: A callable that walks a directory tree and yields the regular
      files that should be compressed, applying the exclusion rules below
      (they mirror dh_compress's behaviour).
    """
    # Directories whose entire subtree is exempt from compression.
    ignore_dir_basenames = {
        "_sources",
    }
    # Specific file names that are never compressed, wherever they appear.
    ignore_basenames = {
        ".htaccess",
        "index.sgml",
        "objects.inv",
        "search_index.json",
        "copyright",
    }
    # Extensions that are never compressed (already-compressed formats, web
    # assets, fonts, etc.).  Compared against the lower-cased name.
    ignore_extensions = {
        ".htm",
        ".html",
        ".xhtml",
        ".gif",
        ".png",
        ".jpg",
        ".jpeg",
        ".gz",
        ".taz",
        ".tgz",
        ".z",
        ".bz2",
        ".epub",
        ".jar",
        ".zip",
        ".odg",
        ".odp",
        ".odt",
        ".css",
        ".xz",
        ".lz",
        ".lzma",
        ".haddock",
        ".hs",
        ".woff",
        ".woff2",
        ".svg",
        ".svgz",
        ".js",
        ".devhelp2",
        ".map",  # Technically, dh_compress has this one case-sensitive
    }
    # Name endings (checked on the lower-cased name) that also exempt a file.
    ignore_special_cases = ("-gz", "-z", "_z")

    def _filtered_walk(path: VirtualPath) -> Iterator[VirtualPath]:
        for path, children in path.walk():
            if path.name in ignore_dir_basenames:
                # Prune the whole subtree.
                children.clear()
                continue
            if path.is_dir and path.name == "examples":
                # Ignore anything beneath /usr/share/doc/*/examples
                parent = path.parent_dir
                grand_parent = parent.parent_dir if parent else None
                if grand_parent and grand_parent.absolute == "/usr/share/doc":
                    children.clear()
                    continue
            name = path.name
            # Only plain, materialized files are candidates.
            if (
                path.is_symlink
                or not path.is_file
                or name in ignore_basenames
                or not path.has_fs_path
            ):
                continue

            name_lc = name.lower()
            _, ext = os.path.splitext(name_lc)

            if ext in ignore_extensions or name_lc.endswith(ignore_special_cases):
                continue
            yield path

    return _filtered_walk

229 

230 

def _find_compressable_paths(fs_root: VirtualPath) -> Iterator[VirtualPath]:
    """Yield every path in the package tree that should be gzip-compressed.

    Scans ./usr/share/info, ./usr/share/man and ./usr/share/doc (the latter
    with a 4096-byte size threshold), then adds X11 ``.pcf`` bitmap fonts.
    """
    path_filter = _filter_compress_paths()

    for p, compress_size_threshold in (
        ("./usr/share/info", 0),
        ("./usr/share/man", 0),
        ("./usr/share/doc", 4096),
    ):
        path = fs_root.lookup(p)
        if path is None:
            continue
        paths = path_filter(path)
        if compress_size_threshold:
            # The special-case for changelog and NEWS is from dh_compress. Generally these files
            # have always been compressed regardless of their size.
            paths = (
                p
                for p in paths
                if p.size > compress_size_threshold
                or p.name.startswith(("changelog", "NEWS"))
            )
        yield from paths
    # X11 bitmap fonts are compressed irrespective of the rules above.
    x11_path = fs_root.lookup("./usr/share/fonts/X11")
    if x11_path:
        yield from (
            p for p in x11_path.all_paths() if p.is_file and p.name.endswith(".pcf")
        )

258 

259 

def apply_compression(fs_root: VirtualPath, _unused1: Any, _unused2: Any) -> None:
    """Gzip-compress eligible files and retarget symlinks to the new names.

    First compresses every path selected by :func:`_find_compressable_paths`
    (replacing ``foo`` with ``foo.gz`` in the virtual tree), then repeatedly
    rewrites symlinks whose target was compressed so they point at the
    ``.gz`` name instead.

    The ``_unused*`` parameters exist only to satisfy the package-processor
    callback signature.
    """
    # TODO: Support hardlinks
    # NOTE(review): compressed_files is populated but never read in this
    # function — presumably kept for the hardlink TODO above; confirm.
    compressed_files: dict[str, str] = {}
    for path in _find_compressable_paths(fs_root):
        parent_dir = assume_not_none(path.parent_dir)
        # Create the ".gz" sibling (preserving mtime) and stream gzip output
        # directly into its backing file.
        with (
            parent_dir.add_file(f"{path.name}.gz", mtime=path.mtime) as new_file,
            open(new_file.fs_path, "wb") as fd,
        ):
            try:
                subprocess.check_call(["gzip", "-9nc", path.fs_path], stdout=fd)
            except subprocess.CalledProcessError:
                full_command = f"gzip -9nc {escape_shell(path.fs_path)} > {escape_shell(new_file.fs_path)}"
                _error(
                    f"The compression of {path.path} failed. Please review the error message from gzip to"
                    f" understand what went wrong. Full command was: {full_command}"
                )
        compressed_files[path.path] = new_file.path
        # Drop the uncompressed original from the virtual tree.
        del parent_dir[path.name]

    # Fix up symlinks whose target got compressed.  Iterate to a fixed point
    # so chains of symlinks (link -> link -> file) are resolved no matter the
    # order in which they are visited.
    all_remaining_symlinks = {p.path: p for p in fs_root.all_paths() if p.is_symlink}
    changed = True
    while changed:
        changed = False
        remaining: list[VirtualPath] = list(all_remaining_symlinks.values())
        for symlink in remaining:
            target = symlink.readlink()
            dir_target, basename_target = os.path.split(target)
            new_basename_target = f"{basename_target}.gz"
            symlink_parent_dir = assume_not_none(symlink.parent_dir)
            dir_path = symlink_parent_dir
            if dir_target != "":
                dir_path = dir_path.lookup(dir_target)
            # Only rewrite when the old target is gone AND the ".gz" variant
            # exists in the target directory.
            if (
                not dir_path
                or basename_target in dir_path
                or new_basename_target not in dir_path
            ):
                continue
            del all_remaining_symlinks[symlink.path]
            changed = True

            # Rename the link itself to carry ".gz" as well (unless it
            # already does).
            new_link_name = (
                f"{symlink.name}.gz"
                if not symlink.name.endswith(".gz")
                else symlink.name
            )
            symlink_parent_dir.add_symlink(
                new_link_name, os.path.join(dir_target, new_basename_target)
            )
            symlink.unlink()

312 

def _la_files(fs_root: VirtualPath) -> Iterator[VirtualPath]:
    """Yield the libtool archive (``.la``) files directly under /usr/lib."""
    lib_dir = fs_root.lookup("/usr/lib")
    if not lib_dir:
        return
    # Original code only iterators directly in /usr/lib. To be a faithful conversion, we do the same
    # here.
    # Eagerly resolve the list as the replacement can trigger a runtime error otherwise
    for entry in list(lib_dir.iterdir):
        if entry.is_file and entry.name.endswith(".la"):
            yield entry

322 

323 

# Conceptually, the same feature that dh_gnome provides.
# The clean_la_files function based on the dh_gnome version written by Luca Falavigna in 2010,
# who in turn references a Makefile version of the feature.
# https://salsa.debian.org/gnome-team/gnome-pkg-tools/-/commit/2868e1e41ea45443b0fb340bf4c71c4de87d4a5b
def clean_la_files(
    fs_root: VirtualPath,
    _unused1: Any,
    _unused2: Any,
) -> None:
    """Blank out the dependency_libs value in libtool archive (.la) files.

    For each ``.la`` file directly beneath /usr/lib, the quoted value on any
    ``dependency_libs`` line is replaced with ``''``.  Files that are already
    clean are left untouched.  The ``_unused*`` parameters exist only to
    satisfy the package-processor callback signature.
    """
    for la_file in _la_files(fs_root):
        modified = False
        new_content: list[bytes] = []
        with la_file.open(byte_io=True) as fd:
            for raw_line in fd:
                if raw_line.startswith(b"dependency_libs"):
                    cleaned = _LA_DEP_LIB_RE.sub(b"''", raw_line)
                    if cleaned != raw_line:
                        modified = True
                        raw_line = cleaned
                new_content.append(raw_line)

        if not modified:
            # Nothing was rewritten; keep the original file as-is.
            continue
        _info(f"Clearing the dependency_libs line in {la_file.path}")
        with la_file.replace_fs_path_content() as fs_path, open(fs_path, "wb") as wfd:
            wfd.writelines(new_content)