Coverage for src/debputy/plugins/debputy/strip_non_determinism.py: 69%
110 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
1import dataclasses
2import os.path
3import re
4import subprocess
5from contextlib import ExitStack
6from enum import IntEnum
7from typing import Optional, List, Any, Tuple, Union
8from collections.abc import Iterator, Callable
10from debputy.plugin.api import VirtualPath
11from debputy.plugin.api.impl_types import PackageProcessingContextProvider
12from debputy.util import xargs, _info, escape_shell, _error
class DetectionVerdict(IntEnum):
    """Outcome of asking a detection rule about a single path.

    The numeric values are ordered, so a larger value is a "stronger"
    verdict; callers compare verdicts with ``>``.
    """

    # The rule does not apply to the path.
    NOT_RELEVANT = 1
    # The rule might apply, but needs the file-type analysis text to decide.
    NEEDS_FILE_OUTPUT = 2
    # The path should be handed to strip-nondeterminism.
    PROCESS = 3
def _file_starts_with(
    sequences: bytes | tuple[bytes, ...],
) -> Callable[[VirtualPath], bool]:
    """Build a content check that matches files beginning with given bytes.

    :param sequences: A single byte prefix, or a tuple of alternative
      prefixes of which any one may match.
    :return: A callable that opens the path in byte mode, reads as many
      bytes as the longest prefix, and reports whether that data starts
      with one of the prefixes.
    """
    prefixes = (sequences,) if isinstance(sequences, bytes) else sequences
    # Only as many bytes as the longest candidate are ever needed.
    read_size = max(map(len, prefixes))

    def _checker(path: VirtualPath) -> bool:
        with path.open(byte_io=True, buffering=4096) as fd:
            return fd.read(read_size).startswith(prefixes)

    return _checker
def _is_javadoc_file(path: VirtualPath) -> bool:
    """Report whether the file carries the javadoc generator marker.

    The first 1024 bytes of the file are searched for the
    ``<!-- Generated by javadoc`` comment.

    The file is read in byte mode on purpose: any ``.html`` file can reach
    this check, and its encoding is unknown.  Reading it as text would
    presumably fail with a decoding error on content that is not valid in
    the default text encoding, even though the marker itself is plain ASCII.

    :param path: The candidate file.
    :return: True if the marker is found in the leading 1024 bytes.
    """
    with path.open(byte_io=True, buffering=4096) as fd:
        head = fd.read(1024)
    return b"<!-- Generated by javadoc" in head
class SndDetectionRule:
    """Base class for rules that decide whether a path needs processing.

    Subclasses must implement :meth:`initial_verdict`.  Rules that can
    return ``DetectionVerdict.NEEDS_FILE_OUTPUT`` must also implement
    :meth:`file_output_verdict`.
    """

    def initial_verdict(self, path: VirtualPath) -> DetectionVerdict:
        """Return the verdict for ``path``; must be overridden."""
        raise NotImplementedError

    def file_output_verdict(
        self,
        path: VirtualPath,
        file_analysis: str | None,
    ) -> bool:
        """Decide on ``path`` given the file analysis text.

        The base implementation always raises ``TypeError``: reaching it
        means either the method was called for a rule that never asked for
        file output, or the rule did not override it.
        """
        message = (
            "Should not have been called or the rule forgot to implement this method"
        )
        raise TypeError(message)
@dataclasses.dataclass(frozen=True, slots=True)
class ExtensionPlusFileOutputRule(SndDetectionRule):
    """Rule keyed on file extension, optionally refined by file analysis.

    Without a ``file_pattern`` a matching extension is enough to select the
    path.  With a pattern, a matching extension only requests the file
    analysis, which is then searched with the pattern.
    """

    # Extensions (including the leading dot) this rule applies to.
    extensions: tuple[str, ...]
    # Optional pattern searched for in the file analysis text.
    file_pattern: re.Pattern[str] | None = None

    def initial_verdict(self, path: VirtualPath) -> DetectionVerdict:
        """Classify ``path`` by the extension of its name."""
        suffix = os.path.splitext(path.name)[1]
        if suffix in self.extensions:
            if self.file_pattern is None:
                return DetectionVerdict.PROCESS
            return DetectionVerdict.NEEDS_FILE_OUTPUT
        return DetectionVerdict.NOT_RELEVANT

    def file_output_verdict(
        self,
        path: VirtualPath,
        file_analysis: str | None,
    ) -> bool:
        """Return True if the rule's pattern is found in ``file_analysis``.

        Both the pattern and the analysis text must be present; this is
        asserted.
        """
        pattern = self.file_pattern
        assert pattern is not None and file_analysis is not None
        return pattern.search(file_analysis) is not None
@dataclasses.dataclass(frozen=True, slots=True)
class ExtensionPlusContentCheck(SndDetectionRule):
    """Rule that requires both a matching extension and a content check."""

    # Extensions (including the leading dot) this rule applies to.
    extensions: tuple[str, ...]
    # Predicate run on the path; only invoked when the extension matched.
    content_check: Callable[[VirtualPath], bool]

    def initial_verdict(self, path: VirtualPath) -> DetectionVerdict:
        """Select ``path`` when its extension and its content both match."""
        suffix = os.path.splitext(path.name)[1]
        # Short-circuit keeps the content check off paths with other extensions.
        if suffix in self.extensions and self.content_check(path):
            return DetectionVerdict.PROCESS
        return DetectionVerdict.NOT_RELEVANT
class PyzipFileCheck(SndDetectionRule):
    """Rule that selects files which start with ``#!`` and embed a zip header.

    The check is independent of the file name; only the first 32 bytes of
    the content are inspected.
    """

    def _is_pyzip_file(self, path: VirtualPath) -> bool:
        """Return True if the leading 32 bytes look like a shebang plus zip."""
        with path.open(byte_io=True, buffering=4096) as fd:
            head = fd.read(32)
        # Must begin with "#!" and contain a zip local header at a line start.
        return head.startswith(b"#!") and b"\nPK\x03\x04" in head

    def initial_verdict(self, path: VirtualPath) -> DetectionVerdict:
        """Select ``path`` when the content check matches."""
        return (
            DetectionVerdict.PROCESS
            if self._is_pyzip_file(path)
            else DetectionVerdict.NOT_RELEVANT
        )
# These detection rules should be aligned with `get_normalizer_for_file` in File::StripNondeterminism.
# Note if we send a file too much, it is just bad for performance. If we send a file too little, we
# risk non-determinism in the final output.
SND_DETECTION_RULES: list[SndDetectionRule] = [
    ExtensionPlusContentCheck(
        extensions=(".a",),
        content_check=_file_starts_with(
            (
                b"!<arch>\n",
                b"!<thin>\n",
            ),
        ),
    ),
    ExtensionPlusContentCheck(
        extensions=(".png",),
        content_check=_file_starts_with(b"\x89PNG\x0d\x0a\x1a\x0a"),
    ),
    ExtensionPlusContentCheck(
        extensions=(".gz", ".dz"),
        content_check=_file_starts_with(b"\x1f\x8b"),
    ),
    ExtensionPlusContentCheck(
        extensions=(
            # .zip related
            ".zip",
            ".pk3",
            ".epub",
            ".whl",
            ".xpi",
            ".htb",
            ".zhfst",
            ".par",
            ".codadef",
            # .jar related
            ".jar",
            ".war",
            ".hpi",
            ".apk",
            ".sym",
        ),
        content_check=_file_starts_with(
            (
                # Zip local file header.  Only the 4 byte signature is matched:
                # the bytes after it are the "version needed to extract" field,
                # which varies between archives, so requiring a fixed value
                # there would skip ordinary zip/jar files.
                b"PK\x03\x04",
                b"PK\x05\x06",
                b"PK\x07\x08",
            )
        ),
    ),
    ExtensionPlusContentCheck(
        extensions=(
            ".mo",
            ".gmo",
        ),
        content_check=_file_starts_with(
            (
                b"\x95\x04\x12\xde",
                b"\xde\x12\x04\x95",
            )
        ),
    ),
    ExtensionPlusContentCheck(
        extensions=(".uimage",),
        content_check=_file_starts_with(b"\x27\x05\x19\x56"),
    ),
    ExtensionPlusContentCheck(
        extensions=(".bflt",),
        content_check=_file_starts_with(b"\x62\x46\x4c\x54"),
    ),
    ExtensionPlusContentCheck(
        extensions=(".jmod",),
        content_check=_file_starts_with(b"JM"),
    ),
    ExtensionPlusContentCheck(
        extensions=(".html",),
        content_check=_is_javadoc_file,
    ),
    PyzipFileCheck(),
    ExtensionPlusFileOutputRule(
        extensions=(".cpio",),
        # XXX: Add file output check (requires the file output support)
    ),
]
def _detect_paths_with_possible_non_determinism(
    fs_root: VirtualPath,
) -> Iterator[VirtualPath]:
    """Yield the files under ``fs_root`` that a detection rule selects.

    Every file is offered to the rules in ``SND_DETECTION_RULES`` in order.
    The first rule answering ``PROCESS`` causes the path to be yielded and
    ends the evaluation for that path.  Rules answering
    ``NEEDS_FILE_OUTPUT`` are collected per path for a later file-analysis
    pass, which is not implemented yet.

    :param fs_root: Root of the tree to scan.
    :return: Iterator over the selected paths.
    """
    needs_file_output = []
    for path in fs_root.all_paths():
        if not path.is_file:
            continue
        verdict = DetectionVerdict.NOT_RELEVANT
        needs_file_output_rules = []
        for rule in SND_DETECTION_RULES:
            v = rule.initial_verdict(path)
            if v == DetectionVerdict.PROCESS:
                verdict = v
                yield path
                break
            # Record only the rules that themselves asked for file output.
            # Deciding on the accumulated verdict instead would also record
            # every later rule that considered the path not relevant.
            if v == DetectionVerdict.NEEDS_FILE_OUTPUT:
                verdict = v
                needs_file_output_rules.append(rule)

        if verdict == DetectionVerdict.NEEDS_FILE_OUTPUT:
            needs_file_output.append((path, needs_file_output_rules))

    # No file-analysis pass exists yet, so nothing may be pending here.
    # Note this only runs once the generator has been fully consumed.
    assert not needs_file_output
    # FIXME: Implement file check
def _apply_strip_non_determinism(timestamp: str, paths: list[VirtualPath]) -> None:
    """Run ``strip-nondeterminism`` with all normalizers on the given paths.

    :param timestamp: Value passed to the tool via ``--timestamp=``.
    :param paths: Paths whose content is to be rewritten.  Each is entered
      via ``replace_fs_path_content()`` and all of them stay open until
      every command has run.
    """
    base_command = [
        "strip-nondeterminism",
        f"--timestamp={timestamp}",
        "--normalizers=+all",
    ]
    with ExitStack() as stack:
        fs_paths = []
        for virtual_path in paths:
            fs_paths.append(stack.enter_context(virtual_path.replace_fs_path_content()))

        # xargs yields one or more complete command lines for the file list.
        for command in xargs(base_command, fs_paths):
            _info(
                f"Removing (possible) unnecessary non-deterministic content via: {escape_shell(*command)}"
            )
            try:
                subprocess.check_call(
                    command,
                    stdin=subprocess.DEVNULL,
                    restore_signals=True,
                )
            except subprocess.CalledProcessError:
                _error(
                    "Attempting to remove unnecessary non-deterministic content failed. Please review"
                    " the error from strip-nondeterminism above understand what went wrong."
                )
def strip_non_determinism(
    fs_root: VirtualPath, _: Any, context: PackageProcessingContextProvider
) -> None:
    """Detect candidate files under ``fs_root`` and normalize them.

    :param fs_root: Root of the tree to scan.
    :param _: Unused.
    :param context: Used to reach the manifest's substitution, from which
      the timestamp for strip-nondeterminism is resolved.
    """
    candidates = list(_detect_paths_with_possible_non_determinism(fs_root))

    if candidates:
        # Only resolve the timestamp when there is something to process.
        timestamp = context._manifest.substitution.substitute(
            "{{_DEBPUTY_SND_SOURCE_DATE_EPOCH}}", "Internal; strip-nondeterminism"
        )
        _apply_strip_non_determinism(timestamp, candidates)
        return

    _info("Detected no paths to be processed by strip-nondeterminism")