Coverage for src/debputy/substitution.py: 85%
154 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
1import dataclasses
2import os
3import re
4from enum import IntEnum
5from typing import FrozenSet, NoReturn, Optional, Set, TYPE_CHECKING, Self
6from collections.abc import Mapping
8from debputy.architecture_support import (
9 dpkg_architecture_table,
10 DpkgArchitectureBuildProcessValuesTable,
11)
12from debputy.exceptions import DebputySubstitutionError
13from debputy.util import glob_escape
15if TYPE_CHECKING:
16 from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
17 from debputy.plugin.api import VirtualPath
20SUBST_VAR_RE = re.compile(
21 r"""
22 ([{][{][ ]*)
24 (
25 _?[A-Za-z0-9]+
26 (?:[-_:][A-Za-z0-9]+)*
27 )
29 ([ ]*[}][}])
30""",
31 re.VERBOSE,
32)
35class VariableNameState(IntEnum):
36 UNDEFINED = 1
37 RESERVED = 2
38 DEFINED = 3
41@dataclasses.dataclass(slots=True, frozen=True)
42class VariableContext:
43 debian_dir: "VirtualPath"
46class Substitution:
47 def substitute(
48 self,
49 value: str,
50 definition_source: str,
51 /,
52 escape_glob_characters: bool = False,
53 ) -> str:
54 raise NotImplementedError
56 def with_extra_substitutions(self, **extra_substitutions: str) -> "Substitution":
57 raise NotImplementedError
59 def with_unresolvable_substitutions(
60 self, *extra_substitutions: str
61 ) -> "Substitution":
62 raise NotImplementedError
64 def variable_state(self, variable_name: str) -> VariableNameState:
65 return VariableNameState.UNDEFINED
67 def is_used(self, variable_name: str) -> bool:
68 return False
70 def _mark_used(self, variable_name: str) -> None:
71 pass
73 def _replacement(self, matched_key: str, definition_source: str) -> str:
74 self._error(
75 "Cannot resolve {{" + matched_key + "}}."
76 f" The error occurred while trying to process {definition_source}"
77 )
79 def _error(
80 self,
81 msg: str,
82 *,
83 caused_by: BaseException | None = None,
84 ) -> NoReturn:
85 raise DebputySubstitutionError(msg) from caused_by
87 def _apply_substitution(
88 self,
89 pattern: re.Pattern[str],
90 value: str,
91 definition_source: str,
92 /,
93 escape_glob_characters: bool = False,
94 ) -> str:
95 replacement = value
96 offset = 0
97 for match in pattern.finditer(value):
98 prefix, matched_key, suffix = match.groups()
99 replacement_value = self._replacement(matched_key, definition_source)
100 self._mark_used(matched_key)
101 if escape_glob_characters: 101 ↛ 102line 101 didn't jump to line 102 because the condition on line 101 was never true
102 replacement_value = glob_escape(replacement_value)
103 s, e = match.span()
104 s += offset
105 e += offset
106 replacement = replacement[:s] + replacement_value + replacement[e:]
107 token_fluff_len = len(prefix) + len(suffix)
108 offset += len(replacement_value) - len(matched_key) - token_fluff_len
109 return replacement
112class NullSubstitution(Substitution):
113 def substitute(
114 self,
115 value: str,
116 definition_source: str,
117 /,
118 escape_glob_characters: bool = False,
119 ) -> str:
120 return value
122 def with_extra_substitutions(self, **extra_substitutions: str) -> "Substitution":
123 return self
125 def with_unresolvable_substitutions(
126 self, *extra_substitutions: str
127 ) -> "Substitution":
128 return self
131NULL_SUBSTITUTION = NullSubstitution()
132del NullSubstitution
135class SubstitutionImpl(Substitution):
136 __slots__ = (
137 "_used",
138 "_env",
139 "_plugin_feature_set",
140 "_static_variables",
141 "_unresolvable_substitutions",
142 "_dpkg_arch_table",
143 "_parent",
144 "_variable_context",
145 )
147 def __init__(
148 self,
149 /,
150 plugin_feature_set: Optional["PluginProvidedFeatureSet"] = None,
151 static_variables: Mapping[str, str] | None = None,
152 unresolvable_substitutions: frozenset[str] = frozenset(),
153 dpkg_arch_table: DpkgArchitectureBuildProcessValuesTable | None = None,
154 environment: Mapping[str, str] | None = None,
155 parent: Optional["SubstitutionImpl"] = None,
156 variable_context: VariableContext | None = None,
157 ) -> None:
158 self._used: set[str] = set()
159 self._plugin_feature_set = plugin_feature_set
160 self._static_variables = (
161 dict(static_variables) if static_variables is not None else None
162 )
163 self._unresolvable_substitutions = unresolvable_substitutions
164 self._dpkg_arch_table = (
165 dpkg_arch_table
166 if dpkg_arch_table is not None
167 else dpkg_architecture_table()
168 )
169 self._env = environment if environment is not None else os.environ
170 self._parent = parent
171 if variable_context is not None:
172 self._variable_context = variable_context
173 elif self._parent is not None: 173 ↛ 176line 173 didn't jump to line 176 because the condition on line 173 was always true
174 self._variable_context = self._parent._variable_context
175 else:
176 raise ValueError(
177 "variable_context is required either directly or via the parent"
178 )
180 def copy_for_subst_test(
181 self,
182 plugin_feature_set: "PluginProvidedFeatureSet",
183 variable_context: VariableContext,
184 *,
185 extra_substitutions: Mapping[str, str] | None = None,
186 environment: Mapping[str, str] | None = None,
187 ) -> "Self":
188 extra_substitutions_impl = (
189 dict(self._static_variables.items()) if self._static_variables else {}
190 )
191 if extra_substitutions: 191 ↛ 192line 191 didn't jump to line 192 because the condition on line 191 was never true
192 extra_substitutions_impl.update(extra_substitutions)
193 return self.__class__(
194 plugin_feature_set=plugin_feature_set,
195 variable_context=variable_context,
196 static_variables=extra_substitutions_impl,
197 unresolvable_substitutions=self._unresolvable_substitutions,
198 dpkg_arch_table=self._dpkg_arch_table,
199 environment=environment if environment is not None else {},
200 )
202 def variable_state(self, key: str) -> VariableNameState:
203 if key.startswith("DEB_"):
204 if key in self._dpkg_arch_table:
205 return VariableNameState.DEFINED
206 return VariableNameState.RESERVED
207 plugin_feature_set = self._plugin_feature_set
208 if (
209 plugin_feature_set is not None
210 and key in plugin_feature_set.manifest_variables
211 ):
212 return VariableNameState.DEFINED
213 if key.startswith("env:"):
214 k = key[4:]
215 if k in self._env: 215 ↛ 216line 215 didn't jump to line 216 because the condition on line 215 was never true
216 return VariableNameState.DEFINED
217 return VariableNameState.RESERVED
218 if self._static_variables is not None and key in self._static_variables: 218 ↛ 219line 218 didn't jump to line 219 because the condition on line 218 was never true
219 return VariableNameState.DEFINED
220 if key in self._unresolvable_substitutions:
221 return VariableNameState.RESERVED
222 if self._parent is not None:
223 return self._parent.variable_state(key)
224 return VariableNameState.UNDEFINED
226 def is_used(self, variable_name: str) -> bool:
227 if variable_name in self._used:
228 return True
229 parent = self._parent
230 if parent is not None:
231 return parent.is_used(variable_name)
232 return False
234 def _mark_used(self, variable_name: str) -> None:
235 p = self._parent
236 while p:
237 # Find the parent that has the variable if possible. This ensures that is_used works
238 # correctly.
239 if p._static_variables is not None and variable_name in p._static_variables:
240 p._mark_used(variable_name)
241 break
242 plugin_feature_set = p._plugin_feature_set
243 if ( 243 ↛ 250line 243 didn't jump to line 250 because the condition on line 243 was never true
244 plugin_feature_set is not None
245 and variable_name in plugin_feature_set.manifest_variables
246 and not plugin_feature_set.manifest_variables[
247 variable_name
248 ].is_documentation_placeholder
249 ):
250 p._mark_used(variable_name)
251 break
252 p = p._parent
253 self._used.add(variable_name)
255 def _replacement(self, key: str, definition_source: str) -> str:
256 if key.startswith("DEB_") and key in self._dpkg_arch_table:
257 return self._dpkg_arch_table[key]
258 if key.startswith("env:"): 258 ↛ 259line 258 didn't jump to line 259 because the condition on line 258 was never true
259 k = key[4:]
260 if k in self._env:
261 return self._env[k]
262 self._error(
263 f'The environment does not contain the variable "{key}" '
264 f"(error occurred while trying to process {definition_source})"
265 )
267 # The order between extra_substitution and plugin_feature_set is leveraged by
268 # the tests to implement mocking variables. If the order needs tweaking,
269 # you will need a custom resolver for the tests to support mocking.
270 static_variables = self._static_variables
271 if static_variables and key in static_variables:
272 return static_variables[key]
273 plugin_feature_set = self._plugin_feature_set
274 if plugin_feature_set is not None:
275 provided_var = plugin_feature_set.manifest_variables.get(key)
276 if (
277 provided_var is not None
278 and not provided_var.is_documentation_placeholder
279 ):
280 v = provided_var.resolve(self._variable_context)
281 # cache it for next time.
282 if static_variables is None:
283 static_variables = {}
284 self._static_variables = static_variables
285 static_variables[key] = v
286 return v
287 if key in self._unresolvable_substitutions:
288 self._error(
289 "The variable {{" + key + "}}"
290 f" is not available while processing {definition_source}."
291 )
292 parent = self._parent
293 if parent is not None:
294 return parent._replacement(key, definition_source)
295 self._error(
296 "Cannot resolve {{" + key + "}}: it is not a known key."
297 f" The error occurred while trying to process {definition_source}"
298 )
300 def with_extra_substitutions(self, **extra_substitutions: str) -> "Substitution":
301 if not extra_substitutions: 301 ↛ 302line 301 didn't jump to line 302 because the condition on line 301 was never true
302 return self
303 return SubstitutionImpl(
304 dpkg_arch_table=self._dpkg_arch_table,
305 environment=self._env,
306 static_variables=extra_substitutions,
307 parent=self,
308 )
310 def with_unresolvable_substitutions(
311 self,
312 *extra_substitutions: str,
313 ) -> "Substitution":
314 if not extra_substitutions:
315 return self
316 return SubstitutionImpl(
317 dpkg_arch_table=self._dpkg_arch_table,
318 environment=self._env,
319 unresolvable_substitutions=frozenset(extra_substitutions),
320 parent=self,
321 )
323 def substitute(
324 self,
325 value: str,
326 definition_source: str,
327 /,
328 escape_glob_characters: bool = False,
329 ) -> str:
330 if "{{" not in value:
331 return value
332 return self._apply_substitution(
333 SUBST_VAR_RE,
334 value,
335 definition_source,
336 escape_glob_characters=escape_glob_characters,
337 )