Coverage for src/debputy/plugins/debputy/package_processors.py: 54%
173 statements
« prev ^ index » next coverage.py v7.8.2, created at 2026-04-19 20:37 +0000
1import contextlib
2import functools
3import gzip
4import os
5import re
6import subprocess
7from contextlib import ExitStack
8from typing import IO, Any
9from collections.abc import Iterator, Callable
11from debputy.plugin.api import VirtualPath
12from debputy.util import (
13 _error,
14 xargs,
15 escape_shell,
16 _info,
17 assume_not_none,
18 _debug_log,
19)
22@contextlib.contextmanager
23def _open_maybe_gzip(path: VirtualPath) -> Iterator[IO[bytes] | gzip.GzipFile]:
24 if path.name.endswith(".gz"):
25 with gzip.GzipFile(path.fs_path, "rb") as fd:
26 yield fd
27 else:
28 with path.open(byte_io=True) as fd:
29 yield fd
# Matches the payload of a roff ".so" include request (e.g. b".so man1/foo.1"),
# capturing the include target; used to detect stub manpages.
_SO_LINK_RE = re.compile(rb"[.]so\s+(.*)\s*")
# Matches the single-quoted value of a libtool .la "dependency_libs" field.
_LA_DEP_LIB_RE = re.compile(rb"'.+'")
def _detect_so_link(path: VirtualPath) -> str | None:
    """Return the target of a ".so <target>" stub manpage, or ``None``.

    Scans *path* line by line (decompressing ``.gz`` transparently) and
    returns the first ".so" include target found, decoded as UTF-8.
    """
    with _open_maybe_gzip(path) as fd:
        candidates = (_SO_LINK_RE.search(line) for line in fd)
        hit = next((m for m in candidates if m is not None), None)
        if hit is not None:
            return hit.group(1).decode("utf-8")
    return None
46def _replace_with_symlink(path: VirtualPath, so_link_target: str) -> None:
47 adjusted_target = so_link_target
48 parent_dir = path.parent_dir
49 assert parent_dir is not None # For the type checking
50 if parent_dir.name == os.path.dirname(adjusted_target):
51 # Avoid man8/../man8/foo links
52 adjusted_target = os.path.basename(adjusted_target)
53 elif "/" in so_link_target:
54 # symlinks and so links have a different base directory when the link has a "/".
55 # Adjust with an extra "../" to align the result
56 adjusted_target = "../" + adjusted_target
58 path.unlink()
59 parent_dir.add_symlink(path.name, adjusted_target)
62@functools.lru_cache(1)
63def _has_man_recode() -> bool:
64 # Ideally, we would just use shutil.which or something like that.
65 # Unfortunately, in debhelper, we experienced problems with which
66 # returning "yes" for a man tool that actually could not be run
67 # on salsa CI.
68 #
69 # Therefore, we adopt the logic of dh_installman to run the tool
70 # with --help to confirm it is not broken, because no one could
71 # figure out what happened in the salsa CI and my life is still
72 # too short to figure it out.
73 try:
74 subprocess.check_call(
75 ["man-recode", "--help"],
76 stdin=subprocess.DEVNULL,
77 stdout=subprocess.DEVNULL,
78 stderr=subprocess.DEVNULL,
79 restore_signals=True,
80 )
81 except subprocess.CalledProcessError:
82 return False
83 return True
def process_manpages(fs_root: VirtualPath, _unused1: Any, _unused2: Any) -> None:
    """Normalize manpages beneath usr/share/man.

    Stub pages consisting of a single ".so" request are replaced by
    symlinks to their target; all other pages are re-encoded to UTF-8
    via `man-recode` (when available), and ``.gz`` pages are then
    re-compressed with `gzip -9n`.
    """
    man_dir = fs_root.lookup("./usr/share/man")
    if not man_dir:
        return

    re_encode = []
    for path in (p for p in man_dir.all_paths() if p.is_file and p.has_fs_path):
        size = path.size
        if size == 0:
            continue
        so_link_target = None
        if size <= 1024:
            # debhelper has a 1024 byte guard on the basis that ".so file tend to be small".
            # That guard worked well for debhelper, so lets keep it for now on that basis alone.
            so_link_target = _detect_so_link(path)
        if so_link_target:
            _replace_with_symlink(path, so_link_target)
        else:
            re_encode.append(path)

    if not re_encode or not _has_man_recode():
        return

    with ExitStack() as manager:
        manpages = [
            manager.enter_context(p.replace_fs_path_content()) for p in re_encode
        ]
        static_cmd = ["man-recode", "--to-code", "UTF-8", "--suffix", ".encoded"]
        for cmd in xargs(static_cmd, manpages):
            _info(f"Ensuring manpages have utf-8 encoding via: {escape_shell(*cmd)}")
            try:
                subprocess.check_call(
                    cmd,
                    stdin=subprocess.DEVNULL,
                    restore_signals=True,
                )
            except subprocess.CalledProcessError:
                _error(
                    "The man-recode process failed. Please review the output of `man-recode` to understand"
                    " what went wrong."
                )
        for manpage in manpages:
            dest_name = manpage
            if dest_name.endswith(".gz"):
                # man-recode strips the compression extension before adding
                # its --suffix, so "foo.1.gz" became "foo.1.encoded".
                encoded_name = dest_name[:-3] + ".encoded"
                with open(dest_name, "wb") as out:
                    _debug_log(
                        f"Recompressing {dest_name} via gzip -9nc {escape_shell(encoded_name)}"
                    )
                    try:
                        subprocess.check_call(
                            [
                                "gzip",
                                "-9nc",
                                encoded_name,
                            ],
                            stdin=subprocess.DEVNULL,
                            stdout=out,
                        )
                    except subprocess.CalledProcessError:
                        # Bugfix: the message previously claimed `gzip -nc`,
                        # but the command actually run is `gzip -9nc`.
                        _error(
                            f"The command {escape_shell('gzip', '-9nc', f'{encoded_name}')} > {dest_name} failed!"
                        )
                # Bugfix: remove the intermediate .encoded file; the non-gz
                # branch consumes it via os.rename, but this branch used to
                # leave it behind after recompressing.
                os.unlink(encoded_name)
            else:
                os.rename(f"{dest_name}.encoded", manpage)
153def _filter_compress_paths() -> Callable[[VirtualPath], Iterator[VirtualPath]]:
154 ignore_dir_basenames = {
155 "_sources",
156 }
157 ignore_basenames = {
158 ".htaccess",
159 "index.sgml",
160 "objects.inv",
161 "search_index.json",
162 "copyright",
163 }
164 ignore_extensions = {
165 ".htm",
166 ".html",
167 ".xhtml",
168 ".gif",
169 ".png",
170 ".jpg",
171 ".jpeg",
172 ".gz",
173 ".taz",
174 ".tgz",
175 ".z",
176 ".bz2",
177 ".epub",
178 ".jar",
179 ".zip",
180 ".odg",
181 ".odp",
182 ".odt",
183 ".css",
184 ".xz",
185 ".lz",
186 ".lzma",
187 ".haddock",
188 ".hs",
189 ".woff",
190 ".woff2",
191 ".svg",
192 ".svgz",
193 ".js",
194 ".devhelp2",
195 ".map", # Technically, dh_compress has this one case-sensitive
196 }
197 ignore_special_cases = ("-gz", "-z", "_z")
199 def _filtered_walk(path: VirtualPath) -> Iterator[VirtualPath]:
200 for path, children in path.walk():
201 if path.name in ignore_dir_basenames: 201 ↛ 202line 201 didn't jump to line 202 because the condition on line 201 was never true
202 children.clear()
203 continue
204 if path.is_dir and path.name == "examples": 204 ↛ 206line 204 didn't jump to line 206 because the condition on line 204 was never true
205 # Ignore anything beneath /usr/share/doc/*/examples
206 parent = path.parent_dir
207 grand_parent = parent.parent_dir if parent else None
208 if grand_parent and grand_parent.absolute == "/usr/share/doc":
209 children.clear()
210 continue
211 name = path.name
212 if (
213 path.is_symlink
214 or not path.is_file
215 or name in ignore_basenames
216 or not path.has_fs_path
217 ):
218 continue
220 name_lc = name.lower()
221 _, ext = os.path.splitext(name_lc)
223 if ext in ignore_extensions or name_lc.endswith(ignore_special_cases): 223 ↛ 224line 223 didn't jump to line 224 because the condition on line 223 was never true
224 continue
225 yield path
227 return _filtered_walk
def _find_compressable_paths(fs_root: VirtualPath) -> Iterator[VirtualPath]:
    """Yield every path in *fs_root* that apply_compression should gzip.

    Covers usr/share/info, usr/share/man and usr/share/doc — the latter
    with a 4096-byte size floor, except changelog/NEWS files which are
    always compressed — plus ``.pcf`` fonts under usr/share/fonts/X11.
    """
    path_filter = _filter_compress_paths()

    for directory, size_floor in (
        ("./usr/share/info", 0),
        ("./usr/share/man", 0),
        ("./usr/share/doc", 4096),
    ):
        base = fs_root.lookup(directory)
        if base is None:
            continue
        candidates = path_filter(base)
        if size_floor:
            # The special-case for changelog and NEWS is from dh_compress. Generally these files
            # have always been compressed regardless of their size.
            candidates = (
                c
                for c in candidates
                if c.size > size_floor
                # Case-insensitivity is a known delta from `dh_compress` at the time of writing.
                or c.name.lower().startswith(("changelog", "news"))
            )
        yield from candidates
    x11_fonts = fs_root.lookup("./usr/share/fonts/X11")
    if x11_fonts:
        yield from (
            f for f in x11_fonts.all_paths() if f.is_file and f.name.endswith(".pcf")
        )
def apply_compression(fs_root: VirtualPath, _unused1: Any, _unused2: Any) -> None:
    """Gzip-compress eligible files and retarget symlinks to the new names.

    Each compressible path is replaced by a `<name>.gz` sibling produced by
    `gzip -9nc`, after which symlinks whose old target disappeared (but whose
    `.gz` twin exists) are rewritten — iterating to a fixed point so chains
    of symlinks are handled.
    """
    # TODO: Support hardlinks
    # NOTE(review): compressed_files is populated but never read inside this
    # function — possibly vestigial or consumed by a future change; confirm.
    compressed_files: dict[str, str] = {}
    for path in _find_compressable_paths(fs_root):
        parent_dir = assume_not_none(path.parent_dir)
        # Create the .gz sibling (preserving mtime) and stream gzip's output
        # straight into its backing fs path.
        with (
            parent_dir.add_file(f"{path.name}.gz", mtime=path.mtime) as new_file,
            open(new_file.fs_path, "wb") as fd,
        ):
            try:
                subprocess.check_call(["gzip", "-9nc", path.fs_path], stdout=fd)
            except subprocess.CalledProcessError:
                full_command = f"gzip -9nc {escape_shell(path.fs_path)} > {escape_shell(new_file.fs_path)}"
                _error(
                    f"The compression of {path.path} failed. Please review the error message from gzip to"
                    f" understand what went wrong. Full command was: {full_command}"
                )
        compressed_files[path.path] = new_file.path
        # Drop the uncompressed original from the virtual tree.
        del parent_dir[path.name]

    all_remaining_symlinks = {p.path: p for p in fs_root.all_paths() if p.is_symlink}
    changed = True
    while changed:
        # Fixed-point iteration: rewriting one symlink can make another
        # symlink (that pointed at it) eligible on the next pass.
        changed = False
        remaining: list[VirtualPath] = list(all_remaining_symlinks.values())
        for symlink in remaining:
            target = symlink.readlink()
            dir_target, basename_target = os.path.split(target)
            new_basename_target = f"{basename_target}.gz"
            symlink_parent_dir = assume_not_none(symlink.parent_dir)
            # Resolve the directory the link target lives in (relative to
            # the symlink's own directory when the target has no dir part).
            dir_path = (
                symlink_parent_dir.lookup(dir_target)
                if dir_target
                else symlink_parent_dir
            )
            # Rewrite only when the old target is gone AND its .gz twin exists.
            if (
                not dir_path
                or basename_target in dir_path
                or new_basename_target not in dir_path
            ):
                continue
            del all_remaining_symlinks[symlink.path]
            changed = True

            # Keep an existing ".gz" suffix on the link name instead of
            # doubling it up.
            new_link_name = (
                f"{symlink.name}.gz"
                if not symlink.name.endswith(".gz")
                else symlink.name
            )
            symlink_parent_dir.add_symlink(
                new_link_name, os.path.join(dir_target, new_basename_target)
            )
            symlink.unlink()
315def _la_files(fs_root: VirtualPath) -> Iterator[VirtualPath]:
316 lib_dir = fs_root.lookup("/usr/lib")
317 if not lib_dir:
318 return
319 # Original code only iterators directly in /usr/lib. To be a faithful conversion, we do the same
320 # here.
321 # Eagerly resolve the list as the replacement can trigger a runtime error otherwise
322 paths = list(lib_dir.iterdir())
323 yield from (p for p in paths if p.is_file and p.name.endswith(".la"))
# Conceptually, the same feature that dh_gnome provides.
# The clean_la_files function based on the dh_gnome version written by Luca Falavigna in 2010,
# who in turn references a Makefile version of the feature.
# https://salsa.debian.org/gnome-team/gnome-pkg-tools/-/commit/2868e1e41ea45443b0fb340bf4c71c4de87d4a5b
def clean_la_files(
    fs_root: VirtualPath,
    _unused1: Any,
    _unused2: Any,
) -> None:
    """Blank the ``dependency_libs`` field of libtool .la files in /usr/lib.

    Files are rewritten only when a dependency_libs line actually changed.
    """
    for la_file in _la_files(fs_root):
        rewritten = False
        new_content: list[bytes] = []
        with la_file.open(byte_io=True) as fd:
            for line in fd:
                if line.startswith(b"dependency_libs"):
                    cleared = _LA_DEP_LIB_RE.sub(b"''", line)
                    if cleared != line:
                        rewritten = True
                        line = cleared
                new_content.append(line)

        if not rewritten:
            # Nothing to do; avoid touching the file.
            continue
        _info(f"Clearing the dependency_libs line in {la_file.path}")
        with la_file.replace_fs_path_content() as fs_path, open(fs_path, "wb") as wfd:
            wfd.writelines(new_content)