Coverage for src/debputy/substitution.py: 85%
153 statements
« prev ^ index » next coverage.py v7.8.2, created at 2026-02-07 09:46 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2026-02-07 09:46 +0000
1import dataclasses
2import os
3import re
4from enum import IntEnum
5from typing import FrozenSet, NoReturn, Optional, Set, TYPE_CHECKING, Self
6from collections.abc import Mapping
8from debputy.architecture_support import DpkgArchitectureBuildProcessValuesTable
9from debputy.exceptions import DebputySubstitutionError
10from debputy.util import glob_escape
12if TYPE_CHECKING:
13 from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
14 from debputy.plugin.api import VirtualPath
# Matches one ``{{ NAME }}`` substitution token.  The pattern has exactly three
# capture groups, which `Substitution._apply_substitution` unpacks:
#   1. the opening ``{{`` plus any space characters following it,
#   2. the variable name: alphanumeric segments joined by a single ``-``, ``_``
#      or ``:``, optionally preceded by one leading ``_``,
#   3. any space characters plus the closing ``}}``.
SUBST_VAR_RE = re.compile(
    r"""
    ([{][{][ ]*)

    (
        _?[A-Za-z0-9]+
        (?:[-_:][A-Za-z0-9]+)*
    )

    ([ ]*[}][}])
""",
    re.VERBOSE,
)
class VariableNameState(IntEnum):
    """Classification of a substitution variable name.

    This is the value returned by ``Substitution.variable_state``.
    """

    # The name is not known to the substitution at all (the base class
    # ``Substitution.variable_state`` always answers this).
    UNDEFINED = 1
    # The name cannot be resolved right now but is not free either:
    # ``SubstitutionImpl.variable_state`` reports this for ``DEB_*`` / ``env:*``
    # names without a value and for names registered as unresolvable.
    RESERVED = 2
    # The name currently resolves to a value.
    DEFINED = 3
38@dataclasses.dataclass(slots=True, frozen=True)
39class VariableContext:
40 debian_dir: "VirtualPath"
43class Substitution:
44 def substitute(
45 self,
46 value: str,
47 definition_source: str,
48 /,
49 escape_glob_characters: bool = False,
50 ) -> str:
51 raise NotImplementedError
53 def with_extra_substitutions(self, **extra_substitutions: str) -> "Substitution":
54 raise NotImplementedError
56 def with_unresolvable_substitutions(
57 self, *extra_substitutions: str
58 ) -> "Substitution":
59 raise NotImplementedError
61 def variable_state(self, variable_name: str) -> VariableNameState:
62 return VariableNameState.UNDEFINED
64 def is_used(self, variable_name: str) -> bool:
65 return False
67 def _mark_used(self, variable_name: str) -> None:
68 pass
70 def _replacement(self, matched_key: str, definition_source: str) -> str:
71 self._error(
72 "Cannot resolve {{" + matched_key + "}}."
73 f" The error occurred while trying to process {definition_source}"
74 )
76 def _error(
77 self,
78 msg: str,
79 *,
80 caused_by: BaseException | None = None,
81 ) -> NoReturn:
82 raise DebputySubstitutionError(msg) from caused_by
84 def _apply_substitution(
85 self,
86 pattern: re.Pattern[str],
87 value: str,
88 definition_source: str,
89 /,
90 escape_glob_characters: bool = False,
91 ) -> str:
92 replacement = value
93 offset = 0
94 for match in pattern.finditer(value):
95 prefix, matched_key, suffix = match.groups()
96 replacement_value = self._replacement(matched_key, definition_source)
97 self._mark_used(matched_key)
98 if escape_glob_characters: 98 ↛ 99line 98 didn't jump to line 99 because the condition on line 98 was never true
99 replacement_value = glob_escape(replacement_value)
100 s, e = match.span()
101 s += offset
102 e += offset
103 replacement = replacement[:s] + replacement_value + replacement[e:]
104 token_fluff_len = len(prefix) + len(suffix)
105 offset += len(replacement_value) - len(matched_key) - token_fluff_len
106 return replacement
class _NullSubstitution(Substitution):
    """Substitution that never expands anything.

    `substitute` returns its input untouched, and both ``with_*`` methods
    return the same instance, so derived substitutions are no-ops as well.
    """

    def substitute(
        self,
        value: str,
        definition_source: str,
        /,
        escape_glob_characters: bool = False,
    ) -> str:
        """Return *value* unchanged; the other arguments are ignored."""
        return value

    def with_extra_substitutions(self, **extra_substitutions: str) -> "Substitution":
        """Ignore *extra_substitutions* and return this instance."""
        return self

    def with_unresolvable_substitutions(
        self, *extra_substitutions: str
    ) -> "Substitution":
        """Ignore *extra_substitutions* and return this instance."""
        return self
# Shared no-op substitution: its ``substitute`` returns the input unchanged.
NULL_SUBSTITUTION: Substitution = _NullSubstitution()
class SubstitutionImpl(Substitution):
    """Layered implementation of ``{{NAME}}`` substitution.

    A variable is looked up, in order, in: the dpkg architecture table (for
    ``DEB_*`` names), the environment (for ``env:*`` names), this layer's
    static variables, the plugin-provided manifest variables, and finally the
    parent layer.  Names registered as unresolvable on a layer are rejected
    before the parent is consulted.
    """

    # NOTE(review): the base class ``Substitution`` declares no ``__slots__``,
    # so instances still carry a ``__dict__``; this declaration alone does not
    # prevent ad-hoc attributes.
    __slots__ = (
        "_used",
        "_env",
        "_plugin_feature_set",
        "_static_variables",
        "_unresolvable_substitutions",
        "_dpkg_arch_table",
        "_parent",
        "_variable_context",
    )

    def __init__(
        self,
        /,
        plugin_feature_set: Optional["PluginProvidedFeatureSet"] = None,
        static_variables: Mapping[str, str] | None = None,
        unresolvable_substitutions: frozenset[str] = frozenset(),
        dpkg_arch_table: DpkgArchitectureBuildProcessValuesTable | None = None,
        environment: Mapping[str, str] | None = None,
        parent: Optional["SubstitutionImpl"] = None,
        variable_context: VariableContext | None = None,
    ) -> None:
        """Create one substitution layer.

        :param plugin_feature_set: Source of plugin-provided manifest
          variables, if any.
        :param static_variables: Fixed name/value pairs for this layer; the
          mapping is copied.
        :param unresolvable_substitutions: Names that must not be resolved by
          this layer (nor delegated to its parent).
        :param dpkg_arch_table: Table answering ``DEB_*`` variables; a fresh
          ``DpkgArchitectureBuildProcessValuesTable`` is created when omitted.
        :param environment: Mapping backing ``env:*`` variables; defaults to
          ``os.environ``.
        :param parent: Layer to fall back to for names unknown to this layer.
        :param variable_context: Context passed to plugin variables when they
          are resolved; inherited from *parent* when omitted.
        :raises ValueError: If neither *variable_context* nor *parent* is
          given.
        """
        self._used: set[str] = set()
        self._plugin_feature_set = plugin_feature_set
        self._static_variables = (
            dict(static_variables) if static_variables is not None else None
        )
        self._unresolvable_substitutions = unresolvable_substitutions
        self._dpkg_arch_table = (
            dpkg_arch_table
            if dpkg_arch_table is not None
            else DpkgArchitectureBuildProcessValuesTable()
        )
        self._env = environment if environment is not None else os.environ
        self._parent = parent
        if variable_context is not None:
            self._variable_context = variable_context
        elif self._parent is not None:
            self._variable_context = self._parent._variable_context
        else:
            raise ValueError(
                "variable_context is required either directly or via the parent"
            )

    def copy_for_subst_test(
        self,
        plugin_feature_set: "PluginProvidedFeatureSet",
        variable_context: VariableContext,
        *,
        extra_substitutions: Mapping[str, str] | None = None,
        environment: Mapping[str, str] | None = None,
    ) -> "Self":
        """Return a parent-less copy of this layer with replaced inputs.

        The copy shares this layer's unresolvable names and dpkg architecture
        table, and starts with a copy of its static variables updated with
        *extra_substitutions*.  When *environment* is omitted, the copy gets
        an empty environment rather than ``os.environ``.

        NOTE(review): `_replacement` caches resolved plugin variables in the
        static variables, so values cached on this layer are carried into the
        copy and take precedence over *plugin_feature_set* there - confirm
        that this is intended.
        """
        extra_substitutions_impl = (
            dict(self._static_variables) if self._static_variables else {}
        )
        if extra_substitutions:
            extra_substitutions_impl.update(extra_substitutions)
        return self.__class__(
            plugin_feature_set=plugin_feature_set,
            variable_context=variable_context,
            static_variables=extra_substitutions_impl,
            unresolvable_substitutions=self._unresolvable_substitutions,
            dpkg_arch_table=self._dpkg_arch_table,
            environment=environment if environment is not None else {},
        )

    def variable_state(self, key: str) -> VariableNameState:
        """Classify *key* as defined, reserved or undefined.

        ``DEB_*`` and ``env:*`` names are always at least ``RESERVED`` and
        are never delegated to the parent layer.
        """
        if key.startswith("DEB_"):
            if key in self._dpkg_arch_table:
                return VariableNameState.DEFINED
            return VariableNameState.RESERVED
        plugin_feature_set = self._plugin_feature_set
        if (
            plugin_feature_set is not None
            and key in plugin_feature_set.manifest_variables
        ):
            return VariableNameState.DEFINED
        if key.startswith("env:"):
            k = key[4:]
            if k in self._env:
                return VariableNameState.DEFINED
            return VariableNameState.RESERVED
        if self._static_variables is not None and key in self._static_variables:
            return VariableNameState.DEFINED
        if key in self._unresolvable_substitutions:
            return VariableNameState.RESERVED
        if self._parent is not None:
            return self._parent.variable_state(key)
        return VariableNameState.UNDEFINED

    def is_used(self, variable_name: str) -> bool:
        """Return whether *variable_name* was expanded by this layer or an ancestor."""
        if variable_name in self._used:
            return True
        parent = self._parent
        if parent is not None:
            return parent.is_used(variable_name)
        return False

    def _mark_used(self, variable_name: str) -> None:
        """Record *variable_name* as used on this layer and on its provider."""
        p = self._parent
        while p is not None:
            # Find the parent that has the variable if possible.  This ensures
            # that is_used works correctly.
            if p._static_variables is not None and variable_name in p._static_variables:
                p._mark_used(variable_name)
                break
            plugin_feature_set = p._plugin_feature_set
            if (
                plugin_feature_set is not None
                and variable_name in plugin_feature_set.manifest_variables
                and not plugin_feature_set.manifest_variables[
                    variable_name
                ].is_documentation_placeholder
            ):
                p._mark_used(variable_name)
                break
            p = p._parent
        self._used.add(variable_name)

    def _replacement(self, key: str, definition_source: str) -> str:
        """Resolve *key* to its value.

        :param key: Variable name as it appeared between the braces.
        :param definition_source: Description of the text being processed;
          only used in error messages.
        :raises DebputySubstitutionError: If an ``env:*`` variable is not in
          the environment, if *key* is registered as unresolvable, or if no
          layer knows *key*.
        """
        if key.startswith("DEB_") and key in self._dpkg_arch_table:
            return self._dpkg_arch_table[key]
        if key.startswith("env:"):
            k = key[4:]
            if k in self._env:
                return self._env[k]
            self._error(
                f'The environment does not contain the variable "{key}" '
                f"(error occurred while trying to process {definition_source})"
            )

        # The order between extra_substitution and plugin_feature_set is leveraged by
        # the tests to implement mocking variables. If the order needs tweaking,
        # you will need a custom resolver for the tests to support mocking.
        static_variables = self._static_variables
        if static_variables and key in static_variables:
            return static_variables[key]
        plugin_feature_set = self._plugin_feature_set
        if plugin_feature_set is not None:
            provided_var = plugin_feature_set.manifest_variables.get(key)
            if (
                provided_var is not None
                and not provided_var.is_documentation_placeholder
            ):
                v = provided_var.resolve(self._variable_context)
                # cache it for next time.
                if static_variables is None:
                    static_variables = {}
                    self._static_variables = static_variables
                static_variables[key] = v
                return v
        if key in self._unresolvable_substitutions:
            self._error(
                "The variable {{" + key + "}}"
                f" is not available while processing {definition_source}."
            )
        parent = self._parent
        if parent is not None:
            return parent._replacement(key, definition_source)
        self._error(
            "Cannot resolve {{" + key + "}}: it is not a known key."
            f" The error occurred while trying to process {definition_source}"
        )

    def with_extra_substitutions(self, **extra_substitutions: str) -> "Substitution":
        """Return a child layer that defines *extra_substitutions*.

        Returns this instance itself when no substitutions are given.  The
        child shares this layer's dpkg architecture table and environment and
        falls back to this layer for every other name.
        """
        if not extra_substitutions:
            return self
        return SubstitutionImpl(
            dpkg_arch_table=self._dpkg_arch_table,
            environment=self._env,
            static_variables=extra_substitutions,
            parent=self,
        )

    def with_unresolvable_substitutions(
        self,
        *extra_substitutions: str,
    ) -> "Substitution":
        """Return a child layer in which the given names cannot be resolved.

        Returns this instance itself when no names are given.  The child
        shares this layer's dpkg architecture table and environment and falls
        back to this layer for every other name.
        """
        if not extra_substitutions:
            return self
        return SubstitutionImpl(
            dpkg_arch_table=self._dpkg_arch_table,
            environment=self._env,
            unresolvable_substitutions=frozenset(extra_substitutions),
            parent=self,
        )

    def substitute(
        self,
        value: str,
        definition_source: str,
        /,
        escape_glob_characters: bool = False,
    ) -> str:
        """Return *value* with every ``{{NAME}}`` token expanded.

        :param value: The text to expand.
        :param definition_source: Description of where *value* came from;
          only used in error messages.
        :param escape_glob_characters: Whether replacement values are passed
          through ``glob_escape`` before insertion.
        :raises DebputySubstitutionError: If a variable cannot be resolved.
        """
        # Cheap pre-check: without "{{" the pattern cannot match anything.
        if "{{" not in value:
            return value
        return self._apply_substitution(
            SUBST_VAR_RE,
            value,
            definition_source,
            escape_glob_characters=escape_glob_characters,
        )