Coverage for src/debputy/plugins/debputy/package_processors.py: 54%
175 statements
« prev ^ index » next    coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
1import contextlib
2import functools
3import gzip
4import os
5import re
6import subprocess
7from contextlib import ExitStack
8from typing import Optional, IO, Any, List, Dict, Union
9from collections.abc import Iterator, Callable
11from debputy.plugin.api import VirtualPath
12from debputy.util import (
13 _error,
14 xargs,
15 escape_shell,
16 _info,
17 assume_not_none,
18 print_command,
19 _debug_log,
20)
23@contextlib.contextmanager
24def _open_maybe_gzip(path: VirtualPath) -> Iterator[IO[bytes] | gzip.GzipFile]:
25 if path.name.endswith(".gz"):
26 with gzip.GzipFile(path.fs_path, "rb") as fd:
27 yield fd
28 else:
29 with path.open(byte_io=True) as fd:
30 yield fd
# Matches a roff ".so <target>" include directive; group 1 captures the target
# path. NOTE(review): the trailing "\s*" is redundant after the greedy "(.*)",
# so trailing spaces (but not the newline) end up inside group 1 - confirm
# whether targets with trailing whitespace occur in practice.
_SO_LINK_RE = re.compile(rb"[.]so\s+(.*)\s*")
# Matches the single-quoted value on a libtool .la "dependency_libs" line.
_LA_DEP_LIB_RE = re.compile(rb"'.+'")
def _detect_so_link(path: VirtualPath) -> str | None:
    """Return the target of a roff ".so" include in *path*, or None.

    Scans the (possibly gzip-compressed) file line by line and returns the
    first ".so" target found, decoded as UTF-8.
    """
    with _open_maybe_gzip(path) as fd:
        for raw_line in fd:
            match = _SO_LINK_RE.search(raw_line)
            if match is not None:
                return match.group(1).decode("utf-8")
        return None
47def _replace_with_symlink(path: VirtualPath, so_link_target: str) -> None:
48 adjusted_target = so_link_target
49 parent_dir = path.parent_dir
50 assert parent_dir is not None # For the type checking
51 if parent_dir.name == os.path.dirname(adjusted_target):
52 # Avoid man8/../man8/foo links
53 adjusted_target = os.path.basename(adjusted_target)
54 elif "/" in so_link_target:
55 # symlinks and so links have a different base directory when the link has a "/".
56 # Adjust with an extra "../" to align the result
57 adjusted_target = "../" + adjusted_target
59 path.unlink()
60 parent_dir.add_symlink(path.name, adjusted_target)
63@functools.lru_cache(1)
64def _has_man_recode() -> bool:
65 # Ideally, we would just use shutil.which or something like that.
66 # Unfortunately, in debhelper, we experienced problems with which
67 # returning "yes" for a man tool that actually could not be run
68 # on salsa CI.
69 #
70 # Therefore, we adopt the logic of dh_installman to run the tool
71 # with --help to confirm it is not broken, because no one could
72 # figure out what happened in the salsa CI and my life is still
73 # too short to figure it out.
74 try:
75 subprocess.check_call(
76 ["man-recode", "--help"],
77 stdin=subprocess.DEVNULL,
78 stdout=subprocess.DEVNULL,
79 stderr=subprocess.DEVNULL,
80 restore_signals=True,
81 )
82 except subprocess.CalledProcessError:
83 return False
84 return True
def process_manpages(fs_root: VirtualPath, _unused1: Any, _unused2: Any) -> None:
    """Normalize manpages beneath ./usr/share/man.

    Two passes over the installed manpages:

      1. Small pages (<= 1024 bytes) consisting of a roff ".so" include are
         replaced by a symlink to the referenced page.
      2. Remaining pages are re-encoded to UTF-8 via `man-recode` (when that
         tool is usable), with ".gz" pages recompressed afterwards.

    The unused parameters exist to match the package-processor signature.
    """
    man_dir = fs_root.lookup("./usr/share/man")
    if not man_dir:
        return

    re_encode: list[VirtualPath] = []
    for path in (p for p in man_dir.all_paths() if p.is_file and p.has_fs_path):
        size = path.size
        if size == 0:
            continue
        so_link_target = None
        if size <= 1024:
            # debhelper has a 1024 byte guard on the basis that ".so file tend to be small".
            # That guard worked well for debhelper, so lets keep it for now on that basis alone.
            so_link_target = _detect_so_link(path)
        if so_link_target:
            _replace_with_symlink(path, so_link_target)
        else:
            re_encode.append(path)

    if not re_encode or not _has_man_recode():
        return

    with ExitStack() as manager:
        # replace_fs_path_content() yields a writable fs path; keeping every
        # context open on the ExitStack commits all replacements together
        # when the stack unwinds.
        manpages = [
            manager.enter_context(p.replace_fs_path_content()) for p in re_encode
        ]
        static_cmd = ["man-recode", "--to-code", "UTF-8", "--suffix", ".encoded"]
        # xargs() batches the paths into however many invocations are needed.
        for cmd in xargs(static_cmd, manpages):
            _info(f"Ensuring manpages have utf-8 encoding via: {escape_shell(*cmd)}")
            try:
                subprocess.check_call(
                    cmd,
                    stdin=subprocess.DEVNULL,
                    restore_signals=True,
                )
            except subprocess.CalledProcessError:
                _error(
                    "The man-recode process failed. Please review the output of `man-recode` to understand"
                    " what went wrong."
                )
        # Move the ".encoded" output produced by man-recode into place.
        for manpage in manpages:
            dest_name = manpage
            if dest_name.endswith(".gz"):
                # The code expects man-recode's output at "<name minus .gz>.encoded",
                # so the re-encoded page must be recompressed into the destination.
                encoded_name = dest_name[:-3] + ".encoded"
                with open(dest_name, "wb") as out:
                    _debug_log(
                        f"Recompressing {dest_name} via gzip -9nc {escape_shell(encoded_name)}"
                    )
                    try:
                        subprocess.check_call(
                            [
                                "gzip",
                                "-9nc",
                                encoded_name,
                            ],
                            stdin=subprocess.DEVNULL,
                            stdout=out,
                        )
                    except subprocess.CalledProcessError:
                        _error(
                            f"The command {escape_shell('gzip', '-nc', f'{encoded_name}')} > {dest_name} failed!"
                        )
                # NOTE(review): the ".encoded" intermediate is not deleted in
                # this branch - confirm whether cleanup happens elsewhere.
            else:
                os.rename(f"{dest_name}.encoded", manpage)
154def _filter_compress_paths() -> Callable[[VirtualPath], Iterator[VirtualPath]]:
155 ignore_dir_basenames = {
156 "_sources",
157 }
158 ignore_basenames = {
159 ".htaccess",
160 "index.sgml",
161 "objects.inv",
162 "search_index.json",
163 "copyright",
164 }
165 ignore_extensions = {
166 ".htm",
167 ".html",
168 ".xhtml",
169 ".gif",
170 ".png",
171 ".jpg",
172 ".jpeg",
173 ".gz",
174 ".taz",
175 ".tgz",
176 ".z",
177 ".bz2",
178 ".epub",
179 ".jar",
180 ".zip",
181 ".odg",
182 ".odp",
183 ".odt",
184 ".css",
185 ".xz",
186 ".lz",
187 ".lzma",
188 ".haddock",
189 ".hs",
190 ".woff",
191 ".woff2",
192 ".svg",
193 ".svgz",
194 ".js",
195 ".devhelp2",
196 ".map", # Technically, dh_compress has this one case-sensitive
197 }
198 ignore_special_cases = ("-gz", "-z", "_z")
200 def _filtered_walk(path: VirtualPath) -> Iterator[VirtualPath]:
201 for path, children in path.walk():
202 if path.name in ignore_dir_basenames: 202 ↛ 203line 202 didn't jump to line 203 because the condition on line 202 was never true
203 children.clear()
204 continue
205 if path.is_dir and path.name == "examples": 205 ↛ 207line 205 didn't jump to line 207 because the condition on line 205 was never true
206 # Ignore anything beneath /usr/share/doc/*/examples
207 parent = path.parent_dir
208 grand_parent = parent.parent_dir if parent else None
209 if grand_parent and grand_parent.absolute == "/usr/share/doc":
210 children.clear()
211 continue
212 name = path.name
213 if (
214 path.is_symlink
215 or not path.is_file
216 or name in ignore_basenames
217 or not path.has_fs_path
218 ):
219 continue
221 name_lc = name.lower()
222 _, ext = os.path.splitext(name_lc)
224 if ext in ignore_extensions or name_lc.endswith(ignore_special_cases): 224 ↛ 225line 224 didn't jump to line 225 because the condition on line 224 was never true
225 continue
226 yield path
228 return _filtered_walk
def _find_compressable_paths(fs_root: VirtualPath) -> Iterator[VirtualPath]:
    """Yield every path in the package tree that should be gzip-compressed.

    Covers ./usr/share/info, ./usr/share/man and ./usr/share/doc (the latter
    with a 4096-byte size floor) plus ".pcf" fonts under /usr/share/fonts/X11.
    """
    matcher = _filter_compress_paths()

    candidate_dirs = (
        ("./usr/share/info", 0),
        ("./usr/share/man", 0),
        ("./usr/share/doc", 4096),
    )
    for dir_name, size_threshold in candidate_dirs:
        base_dir = fs_root.lookup(dir_name)
        if base_dir is None:
            continue
        if not size_threshold:
            yield from matcher(base_dir)
            continue
        # The special-case for changelog and NEWS is from dh_compress. Generally these files
        # have always been compressed regardless of their size.
        for candidate in matcher(base_dir):
            if candidate.size > size_threshold or candidate.name.startswith(
                ("changelog", "NEWS")
            ):
                yield candidate
    x11_font_dir = fs_root.lookup("./usr/share/fonts/X11")
    if x11_font_dir:
        for font in x11_font_dir.all_paths():
            if font.is_file and font.name.endswith(".pcf"):
                yield font
def apply_compression(fs_root: VirtualPath, _unused1: Any, _unused2: Any) -> None:
    """Gzip-compress eligible files and retarget symlinks to the ".gz" names.

    First compresses every path from _find_compressable_paths() into a
    sibling "<name>.gz" entry (via `gzip -9nc`) and removes the original.
    Then iterates to a fixpoint over the package's symlinks, rewriting any
    link whose target now only exists in compressed form.

    The unused parameters exist to match the package-processor signature.
    """
    # TODO: Support hardlinks
    # NOTE(review): compressed_files is populated but never read in this
    # function - confirm whether it is leftover bookkeeping.
    compressed_files: dict[str, str] = {}
    for path in _find_compressable_paths(fs_root):
        parent_dir = assume_not_none(path.parent_dir)
        # Register the ".gz" sibling (preserving mtime) and stream gzip's
        # output straight into its fs path.
        with (
            parent_dir.add_file(f"{path.name}.gz", mtime=path.mtime) as new_file,
            open(new_file.fs_path, "wb") as fd,
        ):
            try:
                subprocess.check_call(["gzip", "-9nc", path.fs_path], stdout=fd)
            except subprocess.CalledProcessError:
                full_command = f"gzip -9nc {escape_shell(path.fs_path)} > {escape_shell(new_file.fs_path)}"
                _error(
                    f"The compression of {path.path} failed. Please review the error message from gzip to"
                    f" understand what went wrong. Full command was: {full_command}"
                )
        compressed_files[path.path] = new_file.path
        # Drop the uncompressed original from the tree.
        del parent_dir[path.name]

    # Re-point symlinks at the compressed names. Chains of symlinks mean one
    # rewrite can enable another, so loop until no link changes.
    all_remaining_symlinks = {p.path: p for p in fs_root.all_paths() if p.is_symlink}
    changed = True
    while changed:
        changed = False
        remaining: list[VirtualPath] = list(all_remaining_symlinks.values())
        for symlink in remaining:
            target = symlink.readlink()
            dir_target, basename_target = os.path.split(target)
            new_basename_target = f"{basename_target}.gz"
            symlink_parent_dir = assume_not_none(symlink.parent_dir)
            dir_path = symlink_parent_dir
            if dir_target != "":
                dir_path = dir_path.lookup(dir_target)
            # Only rewrite when the original target is gone AND the ".gz"
            # replacement exists in the target directory.
            if (
                not dir_path
                or basename_target in dir_path
                or new_basename_target not in dir_path
            ):
                continue
            del all_remaining_symlinks[symlink.path]
            changed = True

            # Keep an existing ".gz" link name; otherwise append ".gz".
            new_link_name = (
                f"{symlink.name}.gz"
                if not symlink.name.endswith(".gz")
                else symlink.name
            )
            symlink_parent_dir.add_symlink(
                new_link_name, os.path.join(dir_target, new_basename_target)
            )
            symlink.unlink()
313def _la_files(fs_root: VirtualPath) -> Iterator[VirtualPath]:
314 lib_dir = fs_root.lookup("/usr/lib")
315 if not lib_dir:
316 return
317 # Original code only iterators directly in /usr/lib. To be a faithful conversion, we do the same
318 # here.
319 # Eagerly resolve the list as the replacement can trigger a runtime error otherwise
320 paths = list(lib_dir.iterdir)
321 yield from (p for p in paths if p.is_file and p.name.endswith(".la"))
324# Conceptually, the same feature that dh_gnome provides.
325# The clean_la_files function based on the dh_gnome version written by Luca Falavigna in 2010,
326# who in turn references a Makefile version of the feature.
327# https://salsa.debian.org/gnome-team/gnome-pkg-tools/-/commit/2868e1e41ea45443b0fb340bf4c71c4de87d4a5b
def clean_la_files(
    fs_root: VirtualPath,
    _unused1: Any,
    _unused2: Any,
) -> None:
    """Blank the dependency_libs value in libtool ".la" files under /usr/lib.

    Rewrites a file only when the substitution actually changed a
    "dependency_libs" line. The unused parameters exist to match the
    package-processor signature.
    """
    for la_file in _la_files(fs_root):
        rewritten = False
        new_content = []
        with la_file.open(byte_io=True) as fd:
            for raw_line in fd:
                if raw_line.startswith(b"dependency_libs"):
                    cleaned = _LA_DEP_LIB_RE.sub(b"''", raw_line)
                    if cleaned != raw_line:
                        rewritten = True
                        raw_line = cleaned
                new_content.append(raw_line)

        if not rewritten:
            continue
        _info(f"Clearing the dependency_libs line in {la_file.path}")
        with la_file.replace_fs_path_content() as fs_path, open(fs_path, "wb") as wfd:
            wfd.writelines(new_content)