Coverage for src/debputy/plugins/debputy/strip_non_determinism.py: 69%

110 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-10-12 15:06 +0000

1import dataclasses 

2import os.path 

3import re 

4import subprocess 

5from contextlib import ExitStack 

6from enum import IntEnum 

7from typing import Optional, List, Any, Tuple, Union 

8from collections.abc import Iterator, Callable 

9 

10from debputy.plugin.api import VirtualPath 

11from debputy.plugin.api.impl_types import PackageProcessingContextProvider 

12from debputy.util import xargs, _info, escape_shell, _error 

13 

14 

class DetectionVerdict(IntEnum):
    """Result of asking a detection rule about a single path.

    The numeric values are ordered by "strength": the detection loop in this
    module keeps the largest verdict seen across all rules, so the relative
    order of the members is significant.
    """

    # The rule does not consider the path a candidate.
    NOT_RELEVANT = 1
    # The rule cannot decide from its initial check alone; a follow-up call to
    # `file_output_verdict` is required.
    NEEDS_FILE_OUTPUT = 2
    # The path should be handed to strip-nondeterminism.
    PROCESS = 3

19 

20 

def _file_starts_with(
    sequences: bytes | tuple[bytes, ...],
) -> Callable[[VirtualPath], bool]:
    """Build a content check that matches files beginning with given bytes.

    :param sequences: Either a single byte prefix or a tuple of alternative
      byte prefixes.
    :return: A callable that opens the path in byte mode, reads as many bytes
      as the longest prefix and returns True if that data starts with any of
      the prefixes.
    """
    # Normalise to a tuple so that one code path serves both input shapes.
    prefixes = (sequences,) if isinstance(sequences, bytes) else sequences
    # Never read more than the longest candidate prefix.
    read_size = max(len(prefix) for prefix in prefixes)

    def _checker(path: VirtualPath) -> bool:
        with path.open(byte_io=True, buffering=4096) as fd:
            head = fd.read(read_size)
        return head.startswith(prefixes)

    return _checker

36 

37 

def _is_javadoc_file(path: VirtualPath) -> bool:
    """Tell whether a file looks like javadoc generated HTML.

    The file counts as javadoc output when the marker
    `<!-- Generated by javadoc` occurs within its first 1024 bytes.

    :param path: The file to inspect.
    :return: True if the marker was found, False otherwise.
    """
    # Read raw bytes rather than text: the marker is plain ASCII, and a byte
    # read cannot fail on HTML files that are not decodable as text (which
    # would otherwise abort detection for the whole package). This also
    # matches how the other content checks in this module read files.
    with path.open(byte_io=True, buffering=4096) as fd:
        head = fd.read(1024)
    return b"<!-- Generated by javadoc" in head

42 

43 

class SndDetectionRule:
    """Base class for rules that decide if a path may need normalization.

    Subclasses must override `initial_verdict`. Rules that can return
    `DetectionVerdict.NEEDS_FILE_OUTPUT` must also override
    `file_output_verdict`.
    """

    def initial_verdict(self, path: VirtualPath) -> DetectionVerdict:
        """Classify `path`; must be implemented by subclasses.

        :param path: The path to classify.
        :raises NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    def file_output_verdict(
        self,
        path: VirtualPath,
        file_analysis: str | None,
    ) -> bool:
        """Second stage decision for rules that asked for file output.

        :param path: The path being classified.
        :param file_analysis: Analysis text for the path, or None.
          (Presumably the output of file(1); that support is not
          implemented in this module yet.)
        :raises TypeError: Always, in this base implementation; reaching it
          means the rule did not override this method or it was called
          by mistake.
        """
        message = (
            "Should not have been called or the rule forgot to implement this method"
        )
        raise TypeError(message)

56 

57 

@dataclasses.dataclass(frozen=True, slots=True)
class ExtensionPlusFileOutputRule(SndDetectionRule):
    """Rule matching on file extension, optionally refined by a regex.

    Without a `file_pattern`, a matching extension is sufficient for the
    path to be processed. With one, the rule requests the file analysis text
    and matches the pattern against it.
    """

    # Extensions (with the leading dot, as produced by `os.path.splitext`)
    # that this rule applies to.
    extensions: tuple[str, ...]
    # Optional pattern searched for in the file analysis text.
    file_pattern: re.Pattern[str] | None = None

    def initial_verdict(self, path: VirtualPath) -> DetectionVerdict:
        """Classify `path` from its extension and the presence of a pattern."""
        extension = os.path.splitext(path.name)[1]
        if extension not in self.extensions:
            return DetectionVerdict.NOT_RELEVANT
        if self.file_pattern is not None:
            return DetectionVerdict.NEEDS_FILE_OUTPUT
        return DetectionVerdict.PROCESS

    def file_output_verdict(
        self,
        path: VirtualPath,
        file_analysis: str | None,
    ) -> bool:
        """Return True if `file_pattern` is found in `file_analysis`.

        Both the pattern and the analysis text must be present; this is
        enforced with an assertion.
        """
        pattern = self.file_pattern
        assert pattern is not None and file_analysis is not None
        return pattern.search(file_analysis) is not None

80 

81 

@dataclasses.dataclass(frozen=True, slots=True)
class ExtensionPlusContentCheck(SndDetectionRule):
    """Rule that requires both a known extension and a content match."""

    # Extensions (with the leading dot, as produced by `os.path.splitext`)
    # that this rule applies to.
    extensions: tuple[str, ...]
    # Callable that inspects the file and returns True when it matches.
    content_check: Callable[[VirtualPath], bool]

    def initial_verdict(self, path: VirtualPath) -> DetectionVerdict:
        """Return PROCESS if extension and content both match.

        The content check is only run for paths whose extension matched.
        """
        extension = os.path.splitext(path.name)[1]
        if extension in self.extensions and self.content_check(path):
            return DetectionVerdict.PROCESS
        return DetectionVerdict.NOT_RELEVANT

95 

96 

class PyzipFileCheck(SndDetectionRule):
    """Rule detecting zip data prefixed by a `#!` line, whatever the name."""

    def _is_pyzip_file(self, path: VirtualPath) -> bool:
        """Check the first 32 bytes for `#!` followed by a zip header.

        :return: True if the data starts with `#!` and contains a newline
          immediately followed by the zip local file header signature.
        """
        with path.open(byte_io=True, buffering=4096) as fd:
            head = fd.read(32)
        return head.startswith(b"#!") and b"\nPK\x03\x04" in head

    def initial_verdict(self, path: VirtualPath) -> DetectionVerdict:
        """Return PROCESS for pyzip files and NOT_RELEVANT for the rest."""
        return (
            DetectionVerdict.PROCESS
            if self._is_pyzip_file(path)
            else DetectionVerdict.NOT_RELEVANT
        )

110 

111 

112# These detection rules should be aligned with `get_normalizer_for_file` in File::StripNondeterminism. 

113# Note if we send a file too much, it is just bad for performance. If we send a file too little, we 

114# risk non-determinism in the final output. 

SND_DETECTION_RULES: list[SndDetectionRule] = [
    # Static libraries: regular and thin ar archives.
    ExtensionPlusContentCheck(
        extensions=(".a",),
        content_check=_file_starts_with(
            (
                b"!<arch>\n",
                b"!<thin>\n",
            ),
        ),
    ),
    ExtensionPlusContentCheck(
        extensions=(".png",),
        content_check=_file_starts_with(b"\x89PNG\x0d\x0a\x1a\x0a"),
    ),
    ExtensionPlusContentCheck(
        extensions=(".gz", ".dz"),
        content_check=_file_starts_with(b"\x1f\x8b"),
    ),
    ExtensionPlusContentCheck(
        extensions=(
            # .zip related
            ".zip",
            ".pk3",
            ".epub",
            ".whl",
            ".xpi",
            ".htb",
            ".zhfst",
            ".par",
            ".codadef",
            # .jar related
            ".jar",
            ".war",
            ".hpi",
            ".apk",
            ".sym",
        ),
        content_check=_file_starts_with(
            (
                # Local file header signature. Only the four signature bytes
                # are matched: the bytes after them are the "version needed
                # to extract" field, which varies between archives, so
                # requiring a specific value would skip ordinary zip files.
                b"PK\x03\x04",
                # End of central directory record (an empty archive).
                b"PK\x05\x06",
                # Data descriptor / spanning marker.
                b"PK\x07\x08",
            )
        ),
    ),
    # gettext catalogs, in either byte order.
    ExtensionPlusContentCheck(
        extensions=(
            ".mo",
            ".gmo",
        ),
        content_check=_file_starts_with(
            (
                b"\x95\x04\x12\xde",
                b"\xde\x12\x04\x95",
            )
        ),
    ),
    ExtensionPlusContentCheck(
        extensions=(".uimage",),
        content_check=_file_starts_with(b"\x27\x05\x19\x56"),
    ),
    ExtensionPlusContentCheck(
        extensions=(".bflt",),
        content_check=_file_starts_with(b"\x62\x46\x4c\x54"),
    ),
    ExtensionPlusContentCheck(
        extensions=(".jmod",),
        content_check=_file_starts_with(b"JM"),
    ),
    ExtensionPlusContentCheck(
        extensions=(".html",),
        content_check=_is_javadoc_file,
    ),
    # Matches on content only, so it applies to files with any name.
    PyzipFileCheck(),
    ExtensionPlusFileOutputRule(
        extensions=(".cpio",),
        # XXX: Add file output check (requires the file output support)
    ),
]

194 

195 

def _detect_paths_with_possible_non_determinism(
    fs_root: VirtualPath,
) -> Iterator[VirtualPath]:
    """Yield the files below `fs_root` that some detection rule selects.

    Every regular file is checked against `SND_DETECTION_RULES` in order. The
    first rule answering `DetectionVerdict.PROCESS` causes the path to be
    yielded and ends the rule evaluation for that path.

    :param fs_root: Root of the file system tree to scan.
    :return: An iterator over the paths to hand to strip-nondeterminism.
    """
    needs_file_output = []
    for path in fs_root.all_paths():
        if not path.is_file:
            continue
        verdict = DetectionVerdict.NOT_RELEVANT
        needs_file_output_rules = []
        for rule in SND_DETECTION_RULES:
            rule_verdict = rule.initial_verdict(path)
            if rule_verdict > verdict:
                verdict = rule_verdict
            if rule_verdict == DetectionVerdict.PROCESS:
                yield path
                break
            # Decide on this rule's own answer rather than the running
            # maximum; otherwise every rule evaluated after the first
            # NEEDS_FILE_OUTPUT answer would be recorded as well, including
            # rules that considered the path NOT_RELEVANT.
            if rule_verdict == DetectionVerdict.NEEDS_FILE_OUTPUT:
                needs_file_output_rules.append(rule)

        if verdict == DetectionVerdict.NEEDS_FILE_OUTPUT:
            needs_file_output.append((path, needs_file_output_rules))

    # The second stage (file output analysis) does not exist yet, so no rule
    # is expected to request it at this point.
    assert not needs_file_output
    # FIXME: Implement file check

220 

221 

def _apply_strip_non_determinism(timestamp: str, paths: list[VirtualPath]) -> None:
    """Run `strip-nondeterminism` on the given paths.

    :param timestamp: Value passed to the tool via `--timestamp=`.
    :param paths: The paths whose content should be normalized.
    """
    static_cmd = [
        "strip-nondeterminism",
        f"--timestamp={timestamp}",
        "--normalizers=+all",
    ]
    # The ExitStack keeps every `replace_fs_path_content()` context open until
    # all commands have completed.
    with ExitStack() as manager:
        affected_files = [
            manager.enter_context(p.replace_fs_path_content()) for p in paths
        ]
        # xargs yields one or more complete command lines built from the
        # static part plus the affected files.
        for cmd in xargs(static_cmd, affected_files):
            _info(
                f"Removing (possible) unnecessary non-deterministic content via: {escape_shell(*cmd)}"
            )
            try:
                subprocess.check_call(
                    cmd,
                    stdin=subprocess.DEVNULL,
                    restore_signals=True,
                )
            except subprocess.CalledProcessError:
                _error(
                    "Attempting to remove unnecessary non-deterministic content failed. Please review"
                    " the error from strip-nondeterminism above to understand what went wrong."
                )

247 

248 

def strip_non_determinism(
    fs_root: VirtualPath, _: Any, context: PackageProcessingContextProvider
) -> None:
    """Detect candidate files below `fs_root` and normalize them.

    :param fs_root: Root of the file system tree to process.
    :param _: Unused.
    :param context: Gives access to the manifest substitution, which is used
      to resolve the timestamp passed to strip-nondeterminism.
    """
    candidates = list(_detect_paths_with_possible_non_determinism(fs_root))

    if candidates:
        # Only resolve the timestamp when there is something to process.
        timestamp = context._manifest.substitution.substitute(
            "{{_DEBPUTY_SND_SOURCE_DATE_EPOCH}}", "Internal; strip-nondeterminism"
        )
        _apply_strip_non_determinism(timestamp, candidates)
        return

    _info("Detected no paths to be processed by strip-nondeterminism")