Coverage for src/debputy/plugin/debputy/strip_non_determinism.py: 69%
109 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
1import dataclasses
2import os.path
3import re
4import subprocess
5from contextlib import ExitStack
6from enum import IntEnum
7from typing import Iterator, Optional, List, Callable, Any, Tuple, Union
9from debputy.plugin.api import VirtualPath
10from debputy.plugin.api.impl_types import PackageProcessingContextProvider
11from debputy.util import xargs, _info, escape_shell, _error
class DetectionVerdict(IntEnum):
    """Result of a detection rule's first look at a path.

    The members are ordered integers; the detection loop in this module compares
    verdicts with `>` to keep the strongest one seen for a path.
    """

    # The rule has no interest in the path.
    NOT_RELEVANT = 1
    # The rule cannot decide yet; it wants a file analysis string first.
    NEEDS_FILE_OUTPUT = 2
    # The path should be handed to strip-nondeterminism.
    PROCESS = 3
def _file_starts_with(
    sequences: Union[bytes, Tuple[bytes, ...]],
) -> Callable[[VirtualPath], bool]:
    """Build a content check that matches files starting with a given signature.

    :param sequences: A single byte signature or a collection of alternatives.
    :return: A callable that opens the path in byte mode, reads as many bytes as
      the longest signature and reports whether that prefix starts with any of
      the signatures. With no signatures at all, the callable never matches.
    """
    if isinstance(sequences, bytes):
        signatures: Tuple[bytes, ...] = (sequences,)
    else:
        # `bytes.startswith` only accepts a tuple of alternatives.
        signatures = tuple(sequences)
    # `default=0` keeps an empty collection from raising ValueError in `max`.
    longest_sequence = max((len(s) for s in signatures), default=0)

    def _checker(path: VirtualPath) -> bool:
        if not signatures:
            # Nothing to match against, so do not bother opening the file.
            return False
        with path.open(byte_io=True, buffering=4096) as fd:
            buffer = fd.read(longest_sequence)
        return buffer.startswith(signatures)

    return _checker
def _is_javadoc_file(path: VirtualPath) -> bool:
    """Check whether the head of `path` carries the javadoc generator marker.

    The file is read as raw bytes, like the other content checks in this module.
    HTML files are not guaranteed to be valid in the default text encoding, and a
    decode error here would abort detection over a file that may simply not be
    javadoc output.

    :param path: The file to inspect.
    :return: True if the marker occurs within the first 1024 bytes.
    """
    with path.open(byte_io=True, buffering=4096) as fd:
        head = fd.read(1024)
    return b"<!-- Generated by javadoc" in head
class SndDetectionRule:
    """Base class for rules that decide whether a path needs strip-nondeterminism."""

    def initial_verdict(self, path: VirtualPath) -> DetectionVerdict:
        """Return the rule's first verdict for `path`.

        The base class has no implementation; calling it raises NotImplementedError.
        """
        raise NotImplementedError

    def file_output_verdict(
        self,
        path: VirtualPath,
        file_analysis: Optional[str],
    ) -> bool:
        """Return the rule's decision for `path` given `file_analysis`.

        The base implementation always raises TypeError: reaching it means either
        the rule was asked without having requested a file analysis, or the rule
        did not override this method.
        """
        message = (
            "Should not have been called or the rule forgot to implement this method"
        )
        raise TypeError(message)
57@dataclasses.dataclass(frozen=True, slots=True)
58class ExtensionPlusFileOutputRule(SndDetectionRule):
59 extensions: Tuple[str, ...]
60 file_pattern: Optional[re.Pattern[str]] = None
62 def initial_verdict(self, path: VirtualPath) -> DetectionVerdict:
63 _, ext = os.path.splitext(path.name)
64 if ext not in self.extensions: 64 ↛ 66line 64 didn't jump to line 66 because the condition on line 64 was always true
65 return DetectionVerdict.NOT_RELEVANT
66 if self.file_pattern is None:
67 return DetectionVerdict.PROCESS
68 return DetectionVerdict.NEEDS_FILE_OUTPUT
70 def file_output_verdict(
71 self,
72 path: VirtualPath,
73 file_analysis: Optional[str],
74 ) -> bool:
75 file_pattern = self.file_pattern
76 assert file_pattern is not None and file_analysis is not None
77 m = file_pattern.search(file_analysis)
78 return m is not None
81@dataclasses.dataclass(frozen=True, slots=True)
82class ExtensionPlusContentCheck(SndDetectionRule):
83 extensions: Tuple[str, ...]
84 content_check: Callable[[VirtualPath], bool]
86 def initial_verdict(self, path: VirtualPath) -> DetectionVerdict:
87 _, ext = os.path.splitext(path.name)
88 if ext not in self.extensions:
89 return DetectionVerdict.NOT_RELEVANT
90 content_verdict = self.content_check(path)
91 if content_verdict: 91 ↛ 92line 91 didn't jump to line 92 because the condition on line 91 was never true
92 return DetectionVerdict.PROCESS
93 return DetectionVerdict.NOT_RELEVANT
class PyzipFileCheck(SndDetectionRule):
    """Rule for files that start with a `#!` line followed by zip data."""

    def _is_pyzip_file(self, path: VirtualPath) -> bool:
        """Check the first 32 bytes for `#!` at the start and a newline + zip signature."""
        with path.open(byte_io=True, buffering=4096) as fd:
            head = fd.read(32)
        return head.startswith(b"#!") and b"\nPK\x03\x04" in head

    def initial_verdict(self, path: VirtualPath) -> DetectionVerdict:
        """Return PROCESS for files passing the pyzip check, NOT_RELEVANT otherwise."""
        if not self._is_pyzip_file(path):
            return DetectionVerdict.NOT_RELEVANT
        return DetectionVerdict.PROCESS
# These detection rules should be aligned with `get_normalizer_for_file` in File::StripNondeterminism.
# Note: if we send too many files, it is just bad for performance. If we send too few files, we
# risk non-determinism in the final output.
# Ordered list of rules consulted for every file; the detection loop stops at the
# first rule that returns PROCESS for a path.
SND_DETECTION_RULES: List[SndDetectionRule] = [
    ExtensionPlusContentCheck(
        extensions=(".a",),
        content_check=_file_starts_with(
            (
                b"!<arch>\n",
                b"!<thin>\n",
            ),
        ),
    ),
    ExtensionPlusContentCheck(
        extensions=(".png",),
        content_check=_file_starts_with(b"\x89PNG\x0D\x0A\x1A\x0A"),
    ),
    ExtensionPlusContentCheck(
        extensions=(".gz", ".dz"),
        content_check=_file_starts_with(b"\x1F\x8B"),
    ),
    ExtensionPlusContentCheck(
        extensions=(
            # .zip related
            ".zip",
            ".pk3",
            ".epub",
            ".whl",
            ".xpi",
            ".htb",
            ".zhfst",
            ".par",
            ".codadef",
            # .jar related
            ".jar",
            ".war",
            ".hpi",
            ".apk",
            ".sym",
        ),
        content_check=_file_starts_with(
            (
                # Only the 4-byte record signatures are matched. The byte after
                # `PK\x03\x04` is the "version needed to extract" field, which
                # varies between archives, so pinning it would skip real zip files.
                b"PK\x03\x04",
                b"PK\x05\x06",
                b"PK\x07\x08",
            )
        ),
    ),
    ExtensionPlusContentCheck(
        extensions=(
            ".mo",
            ".gmo",
        ),
        # The same 4-byte magic in both byte orders.
        content_check=_file_starts_with(
            (
                b"\x95\x04\x12\xde",
                b"\xde\x12\x04\x95",
            )
        ),
    ),
    ExtensionPlusContentCheck(
        extensions=(".uimage",),
        content_check=_file_starts_with(b"\x27\x05\x19\x56"),
    ),
    ExtensionPlusContentCheck(
        extensions=(".bflt",),
        # ASCII "bFLT"
        content_check=_file_starts_with(b"\x62\x46\x4C\x54"),
    ),
    ExtensionPlusContentCheck(
        extensions=(".jmod",),
        content_check=_file_starts_with(b"JM"),
    ),
    ExtensionPlusContentCheck(
        extensions=(".html",),
        content_check=_is_javadoc_file,
    ),
    PyzipFileCheck(),
    ExtensionPlusFileOutputRule(
        extensions=(".cpio",),
        # XXX: Add file output check (requires the file output support)
    ),
]
def _detect_paths_with_possible_non_determinism(
    fs_root: VirtualPath,
) -> Iterator[VirtualPath]:
    """Yield the files under `fs_root` that some detection rule wants processed.

    Every file is run through `SND_DETECTION_RULES` in order. The first rule that
    answers PROCESS causes the path to be yielded and ends the rule scan for that
    path. Rules answering NEEDS_FILE_OUTPUT are collected per path; handling them
    is not implemented yet, so the function asserts that none were collected.

    :param fs_root: Root of the file system tree to scan.
    :return: Iterator over the paths to process.
    """
    needs_file_output = []
    for path in fs_root.all_paths():
        if not path.is_file:
            continue
        verdict = DetectionVerdict.NOT_RELEVANT
        needs_file_output_rules = []
        for rule in SND_DETECTION_RULES:
            rule_verdict = rule.initial_verdict(path)
            if rule_verdict == DetectionVerdict.PROCESS:
                verdict = rule_verdict
                yield path
                break
            if rule_verdict == DetectionVerdict.NEEDS_FILE_OUTPUT:
                verdict = rule_verdict
                # Record the rule based on its own answer, not on the verdict
                # accumulated so far. Otherwise every later NOT_RELEVANT rule would
                # be queued for a file analysis it never asked for.
                needs_file_output_rules.append(rule)

        if verdict == DetectionVerdict.NEEDS_FILE_OUTPUT:
            needs_file_output.append((path, needs_file_output_rules))

    assert not needs_file_output
    # FIXME: Implement file check
def _apply_strip_non_determinism(timestamp: str, paths: List[VirtualPath]) -> None:
    """Run `strip-nondeterminism` over `paths` with all normalizers enabled.

    :param timestamp: Value passed to the tool's `--timestamp` option.
    :param paths: Paths whose content should be normalized.
    """
    base_command = [
        "strip-nondeterminism",
        f"--timestamp={timestamp}",
        "--normalizers=+all",
    ]
    with ExitStack() as stack:
        # Presumably each context yields a file system path whose content may be
        # rewritten; all of them stay open until every command has run.
        writable_paths = []
        for virtual_path in paths:
            writable_paths.append(
                stack.enter_context(virtual_path.replace_fs_path_content())
            )
        # `xargs` may produce several command lines for the given arguments.
        for command in xargs(base_command, writable_paths):
            _info(
                f"Removing (possible) unnecessary non-deterministic content via: {escape_shell(*command)}"
            )
            try:
                subprocess.check_call(
                    command,
                    stdin=subprocess.DEVNULL,
                    restore_signals=True,
                )
            except subprocess.CalledProcessError:
                _error(
                    "Attempting to remove unnecessary non-deterministic content failed. Please review"
                    " the error from strip-nondeterminism above understand what went wrong."
                )
def strip_non_determinism(
    fs_root: VirtualPath, _: Any, context: PackageProcessingContextProvider
) -> None:
    """Detect files with possible non-determinism under `fs_root` and normalize them.

    :param fs_root: Root of the file system tree to process.
    :param _: Unused.
    :param context: Provides the manifest whose substitution resolves the timestamp
      handed to strip-nondeterminism.
    """
    candidates = list(_detect_paths_with_possible_non_determinism(fs_root))

    if candidates:
        timestamp = context._manifest.substitution.substitute(
            "{{_DEBPUTY_SND_SOURCE_DATE_EPOCH}}", "Internal; strip-nondeterminism"
        )
        _apply_strip_non_determinism(timestamp, candidates)
    else:
        _info("Detected no paths to be processed by strip-nondeterminism")