Coverage for src/debputy/substitution.py: 85%

154 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-10-12 15:06 +0000

1import dataclasses 

2import os 

3import re 

4from enum import IntEnum 

5from typing import FrozenSet, NoReturn, Optional, Set, TYPE_CHECKING, Self 

6from collections.abc import Mapping 

7 

8from debputy.architecture_support import ( 

9 dpkg_architecture_table, 

10 DpkgArchitectureBuildProcessValuesTable, 

11) 

12from debputy.exceptions import DebputySubstitutionError 

13from debputy.util import glob_escape 

14 

15if TYPE_CHECKING: 

16 from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

17 from debputy.plugin.api import VirtualPath 

18 

19 

20SUBST_VAR_RE = re.compile( 

21 r""" 

22 ([{][{][ ]*) 

23 

24 ( 

25 _?[A-Za-z0-9]+ 

26 (?:[-_:][A-Za-z0-9]+)* 

27 ) 

28 

29 ([ ]*[}][}]) 

30""", 

31 re.VERBOSE, 

32) 

33 

34 

35class VariableNameState(IntEnum): 

36 UNDEFINED = 1 

37 RESERVED = 2 

38 DEFINED = 3 

39 

40 

41@dataclasses.dataclass(slots=True, frozen=True) 

42class VariableContext: 

43 debian_dir: "VirtualPath" 

44 

45 

46class Substitution: 

47 def substitute( 

48 self, 

49 value: str, 

50 definition_source: str, 

51 /, 

52 escape_glob_characters: bool = False, 

53 ) -> str: 

54 raise NotImplementedError 

55 

56 def with_extra_substitutions(self, **extra_substitutions: str) -> "Substitution": 

57 raise NotImplementedError 

58 

59 def with_unresolvable_substitutions( 

60 self, *extra_substitutions: str 

61 ) -> "Substitution": 

62 raise NotImplementedError 

63 

64 def variable_state(self, variable_name: str) -> VariableNameState: 

65 return VariableNameState.UNDEFINED 

66 

67 def is_used(self, variable_name: str) -> bool: 

68 return False 

69 

70 def _mark_used(self, variable_name: str) -> None: 

71 pass 

72 

73 def _replacement(self, matched_key: str, definition_source: str) -> str: 

74 self._error( 

75 "Cannot resolve {{" + matched_key + "}}." 

76 f" The error occurred while trying to process {definition_source}" 

77 ) 

78 

79 def _error( 

80 self, 

81 msg: str, 

82 *, 

83 caused_by: BaseException | None = None, 

84 ) -> NoReturn: 

85 raise DebputySubstitutionError(msg) from caused_by 

86 

87 def _apply_substitution( 

88 self, 

89 pattern: re.Pattern[str], 

90 value: str, 

91 definition_source: str, 

92 /, 

93 escape_glob_characters: bool = False, 

94 ) -> str: 

95 replacement = value 

96 offset = 0 

97 for match in pattern.finditer(value): 

98 prefix, matched_key, suffix = match.groups() 

99 replacement_value = self._replacement(matched_key, definition_source) 

100 self._mark_used(matched_key) 

101 if escape_glob_characters: 101 ↛ 102line 101 didn't jump to line 102 because the condition on line 101 was never true

102 replacement_value = glob_escape(replacement_value) 

103 s, e = match.span() 

104 s += offset 

105 e += offset 

106 replacement = replacement[:s] + replacement_value + replacement[e:] 

107 token_fluff_len = len(prefix) + len(suffix) 

108 offset += len(replacement_value) - len(matched_key) - token_fluff_len 

109 return replacement 

110 

111 

112class NullSubstitution(Substitution): 

113 def substitute( 

114 self, 

115 value: str, 

116 definition_source: str, 

117 /, 

118 escape_glob_characters: bool = False, 

119 ) -> str: 

120 return value 

121 

122 def with_extra_substitutions(self, **extra_substitutions: str) -> "Substitution": 

123 return self 

124 

125 def with_unresolvable_substitutions( 

126 self, *extra_substitutions: str 

127 ) -> "Substitution": 

128 return self 

129 

130 

131NULL_SUBSTITUTION = NullSubstitution() 

132del NullSubstitution 

133 

134 

135class SubstitutionImpl(Substitution): 

136 __slots__ = ( 

137 "_used", 

138 "_env", 

139 "_plugin_feature_set", 

140 "_static_variables", 

141 "_unresolvable_substitutions", 

142 "_dpkg_arch_table", 

143 "_parent", 

144 "_variable_context", 

145 ) 

146 

147 def __init__( 

148 self, 

149 /, 

150 plugin_feature_set: Optional["PluginProvidedFeatureSet"] = None, 

151 static_variables: Mapping[str, str] | None = None, 

152 unresolvable_substitutions: frozenset[str] = frozenset(), 

153 dpkg_arch_table: DpkgArchitectureBuildProcessValuesTable | None = None, 

154 environment: Mapping[str, str] | None = None, 

155 parent: Optional["SubstitutionImpl"] = None, 

156 variable_context: VariableContext | None = None, 

157 ) -> None: 

158 self._used: set[str] = set() 

159 self._plugin_feature_set = plugin_feature_set 

160 self._static_variables = ( 

161 dict(static_variables) if static_variables is not None else None 

162 ) 

163 self._unresolvable_substitutions = unresolvable_substitutions 

164 self._dpkg_arch_table = ( 

165 dpkg_arch_table 

166 if dpkg_arch_table is not None 

167 else dpkg_architecture_table() 

168 ) 

169 self._env = environment if environment is not None else os.environ 

170 self._parent = parent 

171 if variable_context is not None: 

172 self._variable_context = variable_context 

173 elif self._parent is not None: 173 ↛ 176line 173 didn't jump to line 176 because the condition on line 173 was always true

174 self._variable_context = self._parent._variable_context 

175 else: 

176 raise ValueError( 

177 "variable_context is required either directly or via the parent" 

178 ) 

179 

180 def copy_for_subst_test( 

181 self, 

182 plugin_feature_set: "PluginProvidedFeatureSet", 

183 variable_context: VariableContext, 

184 *, 

185 extra_substitutions: Mapping[str, str] | None = None, 

186 environment: Mapping[str, str] | None = None, 

187 ) -> "Self": 

188 extra_substitutions_impl = ( 

189 dict(self._static_variables.items()) if self._static_variables else {} 

190 ) 

191 if extra_substitutions: 191 ↛ 192line 191 didn't jump to line 192 because the condition on line 191 was never true

192 extra_substitutions_impl.update(extra_substitutions) 

193 return self.__class__( 

194 plugin_feature_set=plugin_feature_set, 

195 variable_context=variable_context, 

196 static_variables=extra_substitutions_impl, 

197 unresolvable_substitutions=self._unresolvable_substitutions, 

198 dpkg_arch_table=self._dpkg_arch_table, 

199 environment=environment if environment is not None else {}, 

200 ) 

201 

202 def variable_state(self, key: str) -> VariableNameState: 

203 if key.startswith("DEB_"): 

204 if key in self._dpkg_arch_table: 

205 return VariableNameState.DEFINED 

206 return VariableNameState.RESERVED 

207 plugin_feature_set = self._plugin_feature_set 

208 if ( 

209 plugin_feature_set is not None 

210 and key in plugin_feature_set.manifest_variables 

211 ): 

212 return VariableNameState.DEFINED 

213 if key.startswith("env:"): 

214 k = key[4:] 

215 if k in self._env: 215 ↛ 216line 215 didn't jump to line 216 because the condition on line 215 was never true

216 return VariableNameState.DEFINED 

217 return VariableNameState.RESERVED 

218 if self._static_variables is not None and key in self._static_variables: 218 ↛ 219line 218 didn't jump to line 219 because the condition on line 218 was never true

219 return VariableNameState.DEFINED 

220 if key in self._unresolvable_substitutions: 

221 return VariableNameState.RESERVED 

222 if self._parent is not None: 

223 return self._parent.variable_state(key) 

224 return VariableNameState.UNDEFINED 

225 

226 def is_used(self, variable_name: str) -> bool: 

227 if variable_name in self._used: 

228 return True 

229 parent = self._parent 

230 if parent is not None: 

231 return parent.is_used(variable_name) 

232 return False 

233 

234 def _mark_used(self, variable_name: str) -> None: 

235 p = self._parent 

236 while p: 

237 # Find the parent that has the variable if possible. This ensures that is_used works 

238 # correctly. 

239 if p._static_variables is not None and variable_name in p._static_variables: 

240 p._mark_used(variable_name) 

241 break 

242 plugin_feature_set = p._plugin_feature_set 

243 if ( 243 ↛ 250line 243 didn't jump to line 250 because the condition on line 243 was never true

244 plugin_feature_set is not None 

245 and variable_name in plugin_feature_set.manifest_variables 

246 and not plugin_feature_set.manifest_variables[ 

247 variable_name 

248 ].is_documentation_placeholder 

249 ): 

250 p._mark_used(variable_name) 

251 break 

252 p = p._parent 

253 self._used.add(variable_name) 

254 

255 def _replacement(self, key: str, definition_source: str) -> str: 

256 if key.startswith("DEB_") and key in self._dpkg_arch_table: 

257 return self._dpkg_arch_table[key] 

258 if key.startswith("env:"): 258 ↛ 259line 258 didn't jump to line 259 because the condition on line 258 was never true

259 k = key[4:] 

260 if k in self._env: 

261 return self._env[k] 

262 self._error( 

263 f'The environment does not contain the variable "{key}" ' 

264 f"(error occurred while trying to process {definition_source})" 

265 ) 

266 

267 # The order between extra_substitution and plugin_feature_set is leveraged by 

268 # the tests to implement mocking variables. If the order needs tweaking, 

269 # you will need a custom resolver for the tests to support mocking. 

270 static_variables = self._static_variables 

271 if static_variables and key in static_variables: 

272 return static_variables[key] 

273 plugin_feature_set = self._plugin_feature_set 

274 if plugin_feature_set is not None: 

275 provided_var = plugin_feature_set.manifest_variables.get(key) 

276 if ( 

277 provided_var is not None 

278 and not provided_var.is_documentation_placeholder 

279 ): 

280 v = provided_var.resolve(self._variable_context) 

281 # cache it for next time. 

282 if static_variables is None: 

283 static_variables = {} 

284 self._static_variables = static_variables 

285 static_variables[key] = v 

286 return v 

287 if key in self._unresolvable_substitutions: 

288 self._error( 

289 "The variable {{" + key + "}}" 

290 f" is not available while processing {definition_source}." 

291 ) 

292 parent = self._parent 

293 if parent is not None: 

294 return parent._replacement(key, definition_source) 

295 self._error( 

296 "Cannot resolve {{" + key + "}}: it is not a known key." 

297 f" The error occurred while trying to process {definition_source}" 

298 ) 

299 

300 def with_extra_substitutions(self, **extra_substitutions: str) -> "Substitution": 

301 if not extra_substitutions: 301 ↛ 302line 301 didn't jump to line 302 because the condition on line 301 was never true

302 return self 

303 return SubstitutionImpl( 

304 dpkg_arch_table=self._dpkg_arch_table, 

305 environment=self._env, 

306 static_variables=extra_substitutions, 

307 parent=self, 

308 ) 

309 

310 def with_unresolvable_substitutions( 

311 self, 

312 *extra_substitutions: str, 

313 ) -> "Substitution": 

314 if not extra_substitutions: 

315 return self 

316 return SubstitutionImpl( 

317 dpkg_arch_table=self._dpkg_arch_table, 

318 environment=self._env, 

319 unresolvable_substitutions=frozenset(extra_substitutions), 

320 parent=self, 

321 ) 

322 

323 def substitute( 

324 self, 

325 value: str, 

326 definition_source: str, 

327 /, 

328 escape_glob_characters: bool = False, 

329 ) -> str: 

330 if "{{" not in value: 

331 return value 

332 return self._apply_substitution( 

333 SUBST_VAR_RE, 

334 value, 

335 definition_source, 

336 escape_glob_characters=escape_glob_characters, 

337 )