Coverage for src/debputy/substitution.py: 85%

153 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2026-02-07 09:46 +0000

1import dataclasses 

2import os 

3import re 

4from enum import IntEnum 

5from typing import FrozenSet, NoReturn, Optional, Set, TYPE_CHECKING, Self 

6from collections.abc import Mapping 

7 

8from debputy.architecture_support import DpkgArchitectureBuildProcessValuesTable 

9from debputy.exceptions import DebputySubstitutionError 

10from debputy.util import glob_escape 

11 

12if TYPE_CHECKING: 

13 from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

14 from debputy.plugin.api import VirtualPath 

15 

16 

17SUBST_VAR_RE = re.compile( 

18 r""" 

19 ([{][{][ ]*) 

20 

21 ( 

22 _?[A-Za-z0-9]+ 

23 (?:[-_:][A-Za-z0-9]+)* 

24 ) 

25 

26 ([ ]*[}][}]) 

27""", 

28 re.VERBOSE, 

29) 

30 

31 

32class VariableNameState(IntEnum): 

33 UNDEFINED = 1 

34 RESERVED = 2 

35 DEFINED = 3 

36 

37 

38@dataclasses.dataclass(slots=True, frozen=True) 

39class VariableContext: 

40 debian_dir: "VirtualPath" 

41 

42 

43class Substitution: 

44 def substitute( 

45 self, 

46 value: str, 

47 definition_source: str, 

48 /, 

49 escape_glob_characters: bool = False, 

50 ) -> str: 

51 raise NotImplementedError 

52 

53 def with_extra_substitutions(self, **extra_substitutions: str) -> "Substitution": 

54 raise NotImplementedError 

55 

56 def with_unresolvable_substitutions( 

57 self, *extra_substitutions: str 

58 ) -> "Substitution": 

59 raise NotImplementedError 

60 

61 def variable_state(self, variable_name: str) -> VariableNameState: 

62 return VariableNameState.UNDEFINED 

63 

64 def is_used(self, variable_name: str) -> bool: 

65 return False 

66 

67 def _mark_used(self, variable_name: str) -> None: 

68 pass 

69 

70 def _replacement(self, matched_key: str, definition_source: str) -> str: 

71 self._error( 

72 "Cannot resolve {{" + matched_key + "}}." 

73 f" The error occurred while trying to process {definition_source}" 

74 ) 

75 

76 def _error( 

77 self, 

78 msg: str, 

79 *, 

80 caused_by: BaseException | None = None, 

81 ) -> NoReturn: 

82 raise DebputySubstitutionError(msg) from caused_by 

83 

84 def _apply_substitution( 

85 self, 

86 pattern: re.Pattern[str], 

87 value: str, 

88 definition_source: str, 

89 /, 

90 escape_glob_characters: bool = False, 

91 ) -> str: 

92 replacement = value 

93 offset = 0 

94 for match in pattern.finditer(value): 

95 prefix, matched_key, suffix = match.groups() 

96 replacement_value = self._replacement(matched_key, definition_source) 

97 self._mark_used(matched_key) 

98 if escape_glob_characters: 98 ↛ 99line 98 didn't jump to line 99 because the condition on line 98 was never true

99 replacement_value = glob_escape(replacement_value) 

100 s, e = match.span() 

101 s += offset 

102 e += offset 

103 replacement = replacement[:s] + replacement_value + replacement[e:] 

104 token_fluff_len = len(prefix) + len(suffix) 

105 offset += len(replacement_value) - len(matched_key) - token_fluff_len 

106 return replacement 

107 

108 

109class _NullSubstitution(Substitution): 

110 def substitute( 

111 self, 

112 value: str, 

113 definition_source: str, 

114 /, 

115 escape_glob_characters: bool = False, 

116 ) -> str: 

117 return value 

118 

119 def with_extra_substitutions(self, **extra_substitutions: str) -> "Substitution": 

120 return self 

121 

122 def with_unresolvable_substitutions( 

123 self, *extra_substitutions: str 

124 ) -> "Substitution": 

125 return self 

126 

127 

128NULL_SUBSTITUTION: Substitution = _NullSubstitution() 

129 

130 

131class SubstitutionImpl(Substitution): 

132 __slots__ = ( 

133 "_used", 

134 "_env", 

135 "_plugin_feature_set", 

136 "_static_variables", 

137 "_unresolvable_substitutions", 

138 "_dpkg_arch_table", 

139 "_parent", 

140 "_variable_context", 

141 ) 

142 

143 def __init__( 

144 self, 

145 /, 

146 plugin_feature_set: Optional["PluginProvidedFeatureSet"] = None, 

147 static_variables: Mapping[str, str] | None = None, 

148 unresolvable_substitutions: frozenset[str] = frozenset(), 

149 dpkg_arch_table: DpkgArchitectureBuildProcessValuesTable | None = None, 

150 environment: Mapping[str, str] | None = None, 

151 parent: Optional["SubstitutionImpl"] = None, 

152 variable_context: VariableContext | None = None, 

153 ) -> None: 

154 self._used: set[str] = set() 

155 self._plugin_feature_set = plugin_feature_set 

156 self._static_variables = ( 

157 dict(static_variables) if static_variables is not None else None 

158 ) 

159 self._unresolvable_substitutions = unresolvable_substitutions 

160 self._dpkg_arch_table = ( 

161 dpkg_arch_table 

162 if dpkg_arch_table is not None 

163 else DpkgArchitectureBuildProcessValuesTable() 

164 ) 

165 self._env = environment if environment is not None else os.environ 

166 self._parent = parent 

167 if variable_context is not None: 

168 self._variable_context = variable_context 

169 elif self._parent is not None: 169 ↛ 172line 169 didn't jump to line 172 because the condition on line 169 was always true

170 self._variable_context = self._parent._variable_context 

171 else: 

172 raise ValueError( 

173 "variable_context is required either directly or via the parent" 

174 ) 

175 

176 def copy_for_subst_test( 

177 self, 

178 plugin_feature_set: "PluginProvidedFeatureSet", 

179 variable_context: VariableContext, 

180 *, 

181 extra_substitutions: Mapping[str, str] | None = None, 

182 environment: Mapping[str, str] | None = None, 

183 ) -> "Self": 

184 extra_substitutions_impl = ( 

185 dict(self._static_variables.items()) if self._static_variables else {} 

186 ) 

187 if extra_substitutions: 187 ↛ 188line 187 didn't jump to line 188 because the condition on line 187 was never true

188 extra_substitutions_impl.update(extra_substitutions) 

189 return self.__class__( 

190 plugin_feature_set=plugin_feature_set, 

191 variable_context=variable_context, 

192 static_variables=extra_substitutions_impl, 

193 unresolvable_substitutions=self._unresolvable_substitutions, 

194 dpkg_arch_table=self._dpkg_arch_table, 

195 environment=environment if environment is not None else {}, 

196 ) 

197 

198 def variable_state(self, key: str) -> VariableNameState: 

199 if key.startswith("DEB_"): 

200 if key in self._dpkg_arch_table: 

201 return VariableNameState.DEFINED 

202 return VariableNameState.RESERVED 

203 plugin_feature_set = self._plugin_feature_set 

204 if ( 

205 plugin_feature_set is not None 

206 and key in plugin_feature_set.manifest_variables 

207 ): 

208 return VariableNameState.DEFINED 

209 if key.startswith("env:"): 

210 k = key[4:] 

211 if k in self._env: 211 ↛ 212line 211 didn't jump to line 212 because the condition on line 211 was never true

212 return VariableNameState.DEFINED 

213 return VariableNameState.RESERVED 

214 if self._static_variables is not None and key in self._static_variables: 214 ↛ 215line 214 didn't jump to line 215 because the condition on line 214 was never true

215 return VariableNameState.DEFINED 

216 if key in self._unresolvable_substitutions: 

217 return VariableNameState.RESERVED 

218 if self._parent is not None: 

219 return self._parent.variable_state(key) 

220 return VariableNameState.UNDEFINED 

221 

222 def is_used(self, variable_name: str) -> bool: 

223 if variable_name in self._used: 

224 return True 

225 parent = self._parent 

226 if parent is not None: 

227 return parent.is_used(variable_name) 

228 return False 

229 

230 def _mark_used(self, variable_name: str) -> None: 

231 p = self._parent 

232 while p: 

233 # Find the parent that has the variable if possible. This ensures that is_used works 

234 # correctly. 

235 if p._static_variables is not None and variable_name in p._static_variables: 

236 p._mark_used(variable_name) 

237 break 

238 plugin_feature_set = p._plugin_feature_set 

239 if ( 239 ↛ 246line 239 didn't jump to line 246 because the condition on line 239 was never true

240 plugin_feature_set is not None 

241 and variable_name in plugin_feature_set.manifest_variables 

242 and not plugin_feature_set.manifest_variables[ 

243 variable_name 

244 ].is_documentation_placeholder 

245 ): 

246 p._mark_used(variable_name) 

247 break 

248 p = p._parent 

249 self._used.add(variable_name) 

250 

251 def _replacement(self, key: str, definition_source: str) -> str: 

252 if key.startswith("DEB_") and key in self._dpkg_arch_table: 

253 return self._dpkg_arch_table[key] 

254 if key.startswith("env:"): 254 ↛ 255line 254 didn't jump to line 255 because the condition on line 254 was never true

255 k = key[4:] 

256 if k in self._env: 

257 return self._env[k] 

258 self._error( 

259 f'The environment does not contain the variable "{key}" ' 

260 f"(error occurred while trying to process {definition_source})" 

261 ) 

262 

263 # The order between extra_substitution and plugin_feature_set is leveraged by 

264 # the tests to implement mocking variables. If the order needs tweaking, 

265 # you will need a custom resolver for the tests to support mocking. 

266 static_variables = self._static_variables 

267 if static_variables and key in static_variables: 

268 return static_variables[key] 

269 plugin_feature_set = self._plugin_feature_set 

270 if plugin_feature_set is not None: 

271 provided_var = plugin_feature_set.manifest_variables.get(key) 

272 if ( 

273 provided_var is not None 

274 and not provided_var.is_documentation_placeholder 

275 ): 

276 v = provided_var.resolve(self._variable_context) 

277 # cache it for next time. 

278 if static_variables is None: 

279 static_variables = {} 

280 self._static_variables = static_variables 

281 static_variables[key] = v 

282 return v 

283 if key in self._unresolvable_substitutions: 

284 self._error( 

285 "The variable {{" + key + "}}" 

286 f" is not available while processing {definition_source}." 

287 ) 

288 parent = self._parent 

289 if parent is not None: 

290 return parent._replacement(key, definition_source) 

291 self._error( 

292 "Cannot resolve {{" + key + "}}: it is not a known key." 

293 f" The error occurred while trying to process {definition_source}" 

294 ) 

295 

296 def with_extra_substitutions(self, **extra_substitutions: str) -> "Substitution": 

297 if not extra_substitutions: 297 ↛ 298line 297 didn't jump to line 298 because the condition on line 297 was never true

298 return self 

299 return SubstitutionImpl( 

300 dpkg_arch_table=self._dpkg_arch_table, 

301 environment=self._env, 

302 static_variables=extra_substitutions, 

303 parent=self, 

304 ) 

305 

306 def with_unresolvable_substitutions( 

307 self, 

308 *extra_substitutions: str, 

309 ) -> "Substitution": 

310 if not extra_substitutions: 

311 return self 

312 return SubstitutionImpl( 

313 dpkg_arch_table=self._dpkg_arch_table, 

314 environment=self._env, 

315 unresolvable_substitutions=frozenset(extra_substitutions), 

316 parent=self, 

317 ) 

318 

319 def substitute( 

320 self, 

321 value: str, 

322 definition_source: str, 

323 /, 

324 escape_glob_characters: bool = False, 

325 ) -> str: 

326 if "{{" not in value: 

327 return value 

328 return self._apply_substitution( 

329 SUBST_VAR_RE, 

330 value, 

331 definition_source, 

332 escape_glob_characters=escape_glob_characters, 

333 )