Coverage for src/debputy/plugin/debputy/strip_non_determinism.py: 69%

109 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-01-27 13:59 +0000

1import dataclasses 

2import os.path 

3import re 

4import subprocess 

5from contextlib import ExitStack 

6from enum import IntEnum 

7from typing import Iterator, Optional, List, Callable, Any, Tuple, Union 

8 

9from debputy.plugin.api import VirtualPath 

10from debputy.plugin.api.impl_types import PackageProcessingContextProvider 

11from debputy.util import xargs, _info, escape_shell, _error 

12 

13 

class DetectionVerdict(IntEnum):
    """Outcome of asking a detection rule about a single path.

    The numeric values are ordered by "strength": the detection loop in this
    module compares verdicts with ``>`` and keeps the largest one, so the
    relative order of the members must be preserved.
    """

    # The rule does not apply to the path.
    NOT_RELEVANT = 1
    # The rule cannot decide from its initial check alone and wants a
    # follow-up call to ``file_output_verdict``.
    NEEDS_FILE_OUTPUT = 2
    # The path should be handed to strip-nondeterminism.
    PROCESS = 3

18 

19 

def _file_starts_with(
    sequences: Union[bytes, Tuple[bytes, ...]]
) -> Callable[[VirtualPath], bool]:
    """Build a content check that matches files beginning with a magic sequence.

    :param sequences: Either a single byte string or a tuple of alternative
      byte strings.
    :return: A callable that opens the given path in byte mode, reads as many
      bytes as the longest candidate and reports whether the data starts with
      any of the candidates.
    """
    # Normalise to a tuple up front; `bytes.startswith` accepts a tuple of
    # alternatives, so the checker needs no special case for a single value.
    candidates = (sequences,) if isinstance(sequences, bytes) else sequences
    read_size = max(len(candidate) for candidate in candidates)

    def _checker(path: VirtualPath) -> bool:
        with path.open(byte_io=True, buffering=4096) as fd:
            head = fd.read(read_size)
        return head.startswith(candidates)

    return _checker

35 

36 

def _is_javadoc_file(path: VirtualPath) -> bool:
    """Tell whether the file carries the javadoc generator marker near its start.

    The file is read in byte mode so that files which are not valid text in
    the default text encoding cannot abort detection with a decoding error;
    the marker itself is plain ASCII.

    :param path: The path to inspect.
    :return: True if ``<!-- Generated by javadoc`` occurs within the first
      1024 bytes of the file.
    """
    with path.open(byte_io=True, buffering=4096) as fd:
        head = fd.read(1024)
    return b"<!-- Generated by javadoc" in head

41 

42 

class SndDetectionRule:
    """Base class for the rules that decide which files to process.

    Subclasses must override `initial_verdict`. Rules that can return
    `DetectionVerdict.NEEDS_FILE_OUTPUT` are expected to override
    `file_output_verdict` as well.
    """

    def initial_verdict(self, path: VirtualPath) -> DetectionVerdict:
        """Classify `path`; the base implementation always raises.

        :raises NotImplementedError: Always, as subclasses must override this.
        """
        raise NotImplementedError

    def file_output_verdict(
        self,
        path: VirtualPath,
        file_analysis: Optional[str],
    ) -> bool:
        """Follow-up decision for rules that asked for file output.

        :raises TypeError: Always in the base implementation; reaching it means
          either the caller or the rule is wrong.
        """
        message = (
            "Should not have been called or the rule forgot to implement this method"
        )
        raise TypeError(message)

55 

56 

@dataclasses.dataclass(frozen=True, slots=True)
class ExtensionPlusFileOutputRule(SndDetectionRule):
    """Rule keyed on file extension, optionally refined by a regex.

    Without a `file_pattern`, a matching extension is enough to process the
    file. With one, the rule asks for a follow-up `file_output_verdict` call.
    """

    # Extensions (including the leading dot) this rule applies to.
    extensions: Tuple[str, ...]
    # Pattern searched for in the `file_analysis` text, if any.
    file_pattern: Optional[re.Pattern[str]] = None

    def initial_verdict(self, path: VirtualPath) -> DetectionVerdict:
        """Decide from the extension of `path.name` alone."""
        suffix = os.path.splitext(path.name)[1]
        if suffix not in self.extensions:
            return DetectionVerdict.NOT_RELEVANT
        return (
            DetectionVerdict.PROCESS
            if self.file_pattern is None
            else DetectionVerdict.NEEDS_FILE_OUTPUT
        )

    def file_output_verdict(
        self,
        path: VirtualPath,
        file_analysis: Optional[str],
    ) -> bool:
        """Return True if `file_pattern` matches somewhere in `file_analysis`.

        Both the pattern and the analysis text must be present.
        """
        pattern = self.file_pattern
        assert pattern is not None and file_analysis is not None
        return pattern.search(file_analysis) is not None

79 

80 

@dataclasses.dataclass(frozen=True, slots=True)
class ExtensionPlusContentCheck(SndDetectionRule):
    """Rule that requires both a known extension and a positive content check."""

    # Extensions (including the leading dot) this rule applies to.
    extensions: Tuple[str, ...]
    # Callable that inspects the file; only invoked when the extension matched.
    content_check: Callable[[VirtualPath], bool]

    def initial_verdict(self, path: VirtualPath) -> DetectionVerdict:
        """Return PROCESS when extension and content both match."""
        suffix = os.path.splitext(path.name)[1]
        # The `and` short-circuits, so the file is only opened for paths whose
        # extension is listed.
        if suffix in self.extensions and self.content_check(path):
            return DetectionVerdict.PROCESS
        return DetectionVerdict.NOT_RELEVANT

94 

95 

class PyzipFileCheck(SndDetectionRule):
    """Rule for zip data prefixed with a ``#!`` line, regardless of extension."""

    def _is_pyzip_file(self, path: VirtualPath) -> bool:
        """Check the first 32 bytes for ``#!`` followed by a zip signature.

        The zip signature must directly follow a newline within those bytes.
        """
        with path.open(byte_io=True, buffering=4096) as fd:
            head = fd.read(32)
        return head.startswith(b"#!") and b"\nPK\x03\x04" in head

    def initial_verdict(self, path: VirtualPath) -> DetectionVerdict:
        """Return PROCESS for pyzip files and NOT_RELEVANT for everything else."""
        return (
            DetectionVerdict.PROCESS
            if self._is_pyzip_file(path)
            else DetectionVerdict.NOT_RELEVANT
        )

109 

110 

# These detection rules should be aligned with `get_normalizer_for_file` in File::StripNondeterminism.
# Note that if we send too many files, it is just bad for performance. If we send too few files, we
# risk non-determinism in the final output.
SND_DETECTION_RULES: List[SndDetectionRule] = [
    ExtensionPlusContentCheck(
        extensions=(".a",),
        content_check=_file_starts_with(
            (
                b"!<arch>\n",
                b"!<thin>\n",
            ),
        ),
    ),
    ExtensionPlusContentCheck(
        extensions=(".png",),
        content_check=_file_starts_with(b"\x89PNG\x0D\x0A\x1A\x0A"),
    ),
    ExtensionPlusContentCheck(
        extensions=(".gz", ".dz"),
        content_check=_file_starts_with(b"\x1F\x8B"),
    ),
    ExtensionPlusContentCheck(
        extensions=(
            # .zip related
            ".zip",
            ".pk3",
            ".epub",
            ".whl",
            ".xpi",
            ".htb",
            ".zhfst",
            ".par",
            ".codadef",
            # .jar related
            ".jar",
            ".war",
            ".hpi",
            ".apk",
            ".sym",
        ),
        content_check=_file_starts_with(
            (
                # The local file header signature is exactly four bytes. The
                # byte after it is part of the "version needed to extract"
                # field and varies between archives, so it must not be part
                # of the magic (otherwise most archives are skipped).
                b"PK\x03\x04",
                b"PK\x05\x06",
                b"PK\x07\x08",
            )
        ),
    ),
    ExtensionPlusContentCheck(
        extensions=(
            ".mo",
            ".gmo",
        ),
        content_check=_file_starts_with(
            (
                b"\x95\x04\x12\xde",
                b"\xde\x12\x04\x95",
            )
        ),
    ),
    ExtensionPlusContentCheck(
        extensions=(".uimage",),
        content_check=_file_starts_with(b"\x27\x05\x19\x56"),
    ),
    ExtensionPlusContentCheck(
        extensions=(".bflt",),
        content_check=_file_starts_with(b"\x62\x46\x4C\x54"),
    ),
    ExtensionPlusContentCheck(
        extensions=(".jmod",),
        content_check=_file_starts_with(b"JM"),
    ),
    ExtensionPlusContentCheck(
        extensions=(".html",),
        content_check=_is_javadoc_file,
    ),
    PyzipFileCheck(),
    ExtensionPlusFileOutputRule(
        extensions=(".cpio",),
        # XXX: Add file output check (requires the file output support)
    ),
]

193 

194 

def _detect_paths_with_possible_non_determinism(
    fs_root: VirtualPath,
) -> Iterator[VirtualPath]:
    """Yield the files below `fs_root` that some detection rule wants processed.

    Every path from `fs_root.all_paths()` that is a file is offered to the
    rules in `SND_DETECTION_RULES` in order. The first rule answering
    `DetectionVerdict.PROCESS` causes the path to be yielded and ends the
    evaluation for that path.

    :param fs_root: Root of the file system tree to scan.
    :return: An iterator over the paths to hand to strip-nondeterminism.
    """
    needs_file_output = []
    for path in fs_root.all_paths():
        if not path.is_file:
            continue
        verdict = DetectionVerdict.NOT_RELEVANT
        needs_file_output_rules = []
        for rule in SND_DETECTION_RULES:
            v = rule.initial_verdict(path)
            if v > verdict:
                verdict = v
            if v == DetectionVerdict.PROCESS:
                yield path
                break
            # Only remember the rules that themselves asked for file output.
            # Testing the running maximum (`verdict`) here would also record
            # every later rule that answered NOT_RELEVANT.
            if v == DetectionVerdict.NEEDS_FILE_OUTPUT:
                needs_file_output_rules.append(rule)

        if verdict == DetectionVerdict.NEEDS_FILE_OUTPUT:
            needs_file_output.append((path, needs_file_output_rules))

    # The follow-up check is not implemented, so no rule may currently ask
    # for it.
    assert not needs_file_output
    # FIXME: Implement file check

219 

220 

def _apply_strip_non_determinism(timestamp: str, paths: List[VirtualPath]) -> None:
    """Run `strip-nondeterminism` with all normalizers on the given paths.

    :param timestamp: Value passed verbatim as the `--timestamp` option.
    :param paths: The paths to process. Each is opened for content
      replacement for the duration of the call.
    """
    static_cmd = [
        "strip-nondeterminism",
        f"--timestamp={timestamp}",
        "--normalizers=+all",
    ]
    with ExitStack() as manager:
        # Keep every replacement context open until all commands have run;
        # the ExitStack closes them together afterwards.
        affected_files = [
            manager.enter_context(p.replace_fs_path_content()) for p in paths
        ]
        # `xargs` yields one or more command lines built from the static
        # prefix and the file names.
        for cmd in xargs(static_cmd, affected_files):
            _info(
                f"Removing (possible) unnecessary non-deterministic content via: {escape_shell(*cmd)}"
            )
            try:
                subprocess.check_call(
                    cmd,
                    stdin=subprocess.DEVNULL,
                    restore_signals=True,
                )
            except subprocess.CalledProcessError:
                _error(
                    "Attempting to remove unnecessary non-deterministic content failed. Please review"
                    " the error from strip-nondeterminism above to understand what went wrong."
                )

246 

247 

def strip_non_determinism(
    fs_root: VirtualPath, _: Any, context: PackageProcessingContextProvider
) -> None:
    """Detect files with possible non-determinism and normalize them.

    :param fs_root: Root of the file system tree to scan.
    :param _: Unused.
    :param context: Provides the manifest whose substitution resolves the
      timestamp passed to strip-nondeterminism.
    """
    candidates = list(_detect_paths_with_possible_non_determinism(fs_root))

    if candidates:
        # Resolve the timestamp only when there is something to process.
        timestamp = context._manifest.substitution.substitute(
            "{{_DEBPUTY_SND_SOURCE_DATE_EPOCH}}", "Internal; strip-nondeterminism"
        )
        _apply_strip_non_determinism(timestamp, candidates)
        return

    _info("Detected no paths to be processed by strip-nondeterminism")

262 

263 _apply_strip_non_determinism(source_date_epoch, paths)