Coverage for src/debputy/manifest_parser/util.py: 89%
234 statements
« prev ^ index » next coverage.py v7.8.2, created at 2026-02-14 10:41 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2026-02-14 10:41 +0000
1import dataclasses
2import typing
3from typing import (
4 Optional,
5 get_origin,
6 get_args,
7 Any,
8 TypeVar,
9 TYPE_CHECKING,
10 Literal,
11)
12from collections.abc import Iterator, Mapping, Iterable, Container
14from debputy.yaml.compat import CommentedBase
16from debputy.manifest_parser.exceptions import ManifestParseException
18if TYPE_CHECKING:
19 from debputy.manifest_parser.parser_data import ParserContextData
20 from debputy.manifest_parser.parse_hints import DebputyParseHint
21 from debputy.plugin.api.spec import DebputyIntegrationMode
24MP = TypeVar("MP", bound="DebputyParseHint")
25StrOrInt = str | int
26AttributePathAliasMapping = Mapping[
27 StrOrInt, tuple[StrOrInt, Optional["AttributePathAliasMapping"]]
28]
29LineReportKind = Literal["key", "value", "container"]
32class AttributePath:
33 __slots__ = ("parent", "container", "name", "alias_mapping", "path_hint")
35 # Summary: if parent is defined, name/key is too.
37 @typing.overload
38 def __init__( 38 ↛ exitline 38 didn't return from function '__init__' because
39 self,
40 parent: Optional["AttributePath"],
41 key: str | int,
42 *,
43 container: Any | None = None,
44 alias_mapping: AttributePathAliasMapping | None = None,
45 ) -> None: ...
47 @typing.overload
48 def __init__( 48 ↛ exitline 48 didn't return from function '__init__' because
49 self,
50 parent: None,
51 key: None,
52 *,
53 container: Any | None = None,
54 alias_mapping: AttributePathAliasMapping | None = None,
55 ) -> None: ...
57 def __init__(
58 self,
59 parent,
60 key,
61 *,
62 container=None,
63 alias_mapping=None,
64 ) -> None:
65 self.parent = parent
66 self.container = container
67 self.name = key
68 self.path_hint: str | None = None
69 self.alias_mapping = alias_mapping
71 @classmethod
72 def root_path(cls, container: Any | None) -> "AttributePath":
73 return AttributePath(None, None, container=container)
75 @classmethod
76 def builtin_path(cls) -> "AttributePath":
77 return AttributePath(None, "$builtin$")
79 @classmethod
80 def test_path(cls) -> "AttributePath":
81 return AttributePath(None, "$test$")
83 def copy_with_path_hint(self, path_hint: str) -> "AttributePath":
84 p = self.__class__(self.parent, self.name, alias_mapping=self.alias_mapping)
85 p.path_hint = path_hint
86 return p
88 def path_segments(self) -> Iterable[str | int]:
89 for name, _path_hint in self._iter_path():
90 yield name
92 def _resolve_path(self, report_kind: LineReportKind) -> str:
93 parent = self.parent
94 key = self.name
95 if report_kind == "container":
96 named_parent = parent is not None and parent.parent is not None
97 key = parent.name if named_parent else None
98 parent = parent.parent if named_parent else None
99 container = parent.container if parent is not None else None
101 if isinstance(container, CommentedBase):
102 lc = container.lc
103 try:
104 if isinstance(key, str):
105 if report_kind == "key":
106 lc_data = lc.key(key)
107 else:
108 lc_data = lc.value(key)
109 else:
110 lc_data = lc.item(key)
111 except (AttributeError, RuntimeError, LookupError, TypeError):
112 lc_data = None
113 else:
114 lc_data = None
116 parts: list[str] = []
117 path_hint: str | None = None
119 for k, s_path_hint in self._iter_path():
120 if s_path_hint is not None:
121 path_hint = s_path_hint
122 if isinstance(k, int):
123 parts.append(f"[{k}]")
124 else:
125 if parts:
126 parts.append(".")
127 parts.append(k)
129 if lc_data is not None:
130 line_pos, col = lc_data
131 # Translate 0-based (index) to 1-based (line number)
132 line_pos += 1
133 parts.append(f" [Line {line_pos} column {col}]")
135 elif path_hint: 135 ↛ 136line 135 didn't jump to line 136 because the condition on line 135 was never true
136 parts.append(f" <Search for: {path_hint}>")
137 if not parts: 137 ↛ 138line 137 didn't jump to line 138 because the condition on line 137 was never true
138 return "document root"
139 return "".join(parts)
141 @property
142 def path_container_lc(self) -> str:
143 return self._resolve_path("container")
145 @property
146 def path_key_lc(self) -> str:
147 return self._resolve_path("key")
149 @property
150 def path(self) -> str:
151 return self._resolve_path("value")
153 def __str__(self) -> str:
154 return self.path
156 def __getitem__(self, item: str | int) -> "AttributePath":
157 alias_mapping = None
158 if self.alias_mapping:
159 match = self.alias_mapping.get(item)
160 if match:
161 item, alias_mapping = match
162 if item == "":
163 # Support `sources[0]` mapping to `source` by `sources -> source` and `0 -> ""`.
164 return AttributePath(
165 self.parent,
166 self.name,
167 alias_mapping=alias_mapping,
168 container=self.container,
169 )
170 container = self.container
171 if container is not None:
172 try:
173 child_container = self.container[item]
174 except (AttributeError, RuntimeError, LookupError, TypeError):
175 child_container = None
176 else:
177 child_container = None
178 return AttributePath(
179 self,
180 item,
181 alias_mapping=alias_mapping,
182 container=child_container,
183 )
185 def _iter_path(self) -> Iterator[tuple[str | int, str | None]]:
186 "Parents, from the (excluded) root to (included) self."
187 if self.name is not None:
188 parent = self.parent
189 if parent is not None:
190 yield from parent._iter_path()
191 yield self.name, self.path_hint
194def check_integration_mode(
195 path: AttributePath,
196 parser_context: Optional["ParserContextData"] = None,
197 expected_debputy_integration_mode: (
198 Container["DebputyIntegrationMode"] | None
199 ) = None,
200) -> None:
201 if expected_debputy_integration_mode is None:
202 return
203 if parser_context is None:
204 raise AssertionError(
205 f"Cannot use integration mode restriction when parsing {path.path} since it is not parsed in the manifest context"
206 )
207 if parser_context.debputy_integration_mode not in expected_debputy_integration_mode: 207 ↛ 208line 207 didn't jump to line 208 because the condition on line 207 was never true
208 raise ManifestParseException(
209 f"The attribute {path.path} cannot be used as it is not allowed for"
210 f" the current debputy integration mode ({parser_context.debputy_integration_mode})."
211 f" Please remove the manifest definition or change the integration mode"
212 )
215@dataclasses.dataclass(slots=True, frozen=True)
216class _SymbolicModeSegment:
217 base_mode: int
218 base_mask: int
219 cap_x_mode: int
220 cap_x_mask: int
222 def apply(self, current_mode: int, is_dir: bool) -> int:
223 if current_mode & 0o111 or is_dir:
224 chosen_mode = self.cap_x_mode
225 mode_mask = self.cap_x_mask
226 else:
227 chosen_mode = self.base_mode
228 mode_mask = self.base_mask
229 # set ("="): mode mask clears relevant segment and current_mode are the desired bits
230 # add ("+"): mode mask keeps everything and current_mode are the desired bits
231 # remove ("-"): mode mask clears relevant bits and current_mode are 0
232 return (current_mode & mode_mask) | chosen_mode
235def _symbolic_mode_bit_inverse(v: int) -> int:
236 # The & part is necessary because otherwise python narrows the inversion to the minimum number of bits
237 # required, which is not what we want.
238 return ~v & 0o7777
241def parse_symbolic_mode(
242 symbolic_mode: str,
243 attribute_path: AttributePath | None,
244) -> Iterator[_SymbolicModeSegment]:
245 sticky_bit = 0o01000
246 setuid_bit = 0o04000
247 setgid_bit = 0o02000
248 mode_group_flag = 0o7
249 subject_mask_and_shift = {
250 "u": (mode_group_flag << 6, 6),
251 "g": (mode_group_flag << 3, 3),
252 "o": (mode_group_flag << 0, 0),
253 }
254 bits = {
255 "r": (0o4, 0o4),
256 "w": (0o2, 0o2),
257 "x": (0o1, 0o1),
258 "X": (0o0, 0o1),
259 "s": (0o0, 0o0), # Special-cased below (it depends on the subject)
260 "t": (0o0, 0o0), # Special-cased below
261 }
262 modifiers = {
263 "+",
264 "-",
265 "=",
266 }
267 in_path = f" in {attribute_path.path}" if attribute_path is not None else ""
268 for orig_part in symbolic_mode.split(","):
269 base_mode = 0
270 cap_x_mode = 0
271 part = orig_part
272 subjects = set()
273 while part and part[0] in ("u", "g", "o", "a"):
274 subject = part[0]
275 if subject == "a":
276 subjects = {"u", "g", "o"}
277 else:
278 subjects.add(subject)
279 part = part[1:]
280 if not subjects:
281 subjects = {"u", "g", "o"}
283 if part and part[0] in modifiers: 283 ↛ 285line 283 didn't jump to line 285 because the condition on line 283 was always true
284 modifier = part[0]
285 elif not part:
286 raise ValueError(
287 f'Invalid symbolic mode{in_path}: expected [+-=] to be present (from "{orig_part}")'
288 )
289 else:
290 raise ValueError(
291 f'Invalid symbolic mode{in_path}: Expected "{part[0]}" to be one of [+-=]'
292 f' (from "{orig_part}")'
293 )
294 part = part[1:]
295 s_bit_seen = False
296 t_bit_seen = False
297 while part and part[0] in bits:
298 if part == "s":
299 s_bit_seen = True
300 elif part == "t": 300 ↛ 301line 300 didn't jump to line 301 because the condition on line 300 was never true
301 t_bit_seen = True
302 elif part in ("u", "g", "o"): 302 ↛ 303line 302 didn't jump to line 303 because the condition on line 302 was never true
303 raise NotImplementedError(
304 f"Cannot parse symbolic mode{in_path}: Sorry, we do not support referencing an"
305 " existing subject's permissions (a=u) in symbolic modes."
306 )
307 else:
308 matched_bits = bits.get(part[0])
309 if matched_bits is None: 309 ↛ 310line 309 didn't jump to line 310 because the condition on line 309 was never true
310 valid_bits = "".join(bits)
311 raise ValueError(
312 f'Invalid symbolic mode{in_path}: Expected "{part[0]}" to be one of the letters'
313 f' in "{valid_bits}" (from "{orig_part}")'
314 )
315 base_mode_bits, cap_x_mode_bits = bits[part[0]]
316 base_mode |= base_mode_bits
317 cap_x_mode |= cap_x_mode_bits
318 part = part[1:]
320 if part: 320 ↛ 321line 320 didn't jump to line 321 because the condition on line 320 was never true
321 raise ValueError(
322 f'Invalid symbolic mode{in_path}: Could not parse "{part[0]}" from "{orig_part}"'
323 )
325 final_base_mode = 0
326 final_cap_x_mode = 0
327 segment_mask = 0
328 for subject in subjects:
329 mask, shift = subject_mask_and_shift[subject]
330 segment_mask |= mask
331 final_base_mode |= base_mode << shift
332 final_cap_x_mode |= cap_x_mode << shift
333 if modifier == "=":
334 segment_mask |= setuid_bit if "u" in subjects else 0
335 segment_mask |= setgid_bit if "g" in subjects else 0
336 segment_mask |= sticky_bit if "o" in subjects else 0
337 if s_bit_seen:
338 if "u" in subjects: 338 ↛ 341line 338 didn't jump to line 341 because the condition on line 338 was always true
339 final_base_mode |= setuid_bit
340 final_cap_x_mode |= setuid_bit
341 if "g" in subjects:
342 final_base_mode |= setgid_bit
343 final_cap_x_mode |= setgid_bit
344 if t_bit_seen: 344 ↛ 345line 344 didn't jump to line 345 because the condition on line 344 was never true
345 final_base_mode |= sticky_bit
346 final_cap_x_mode |= sticky_bit
347 if modifier == "+":
348 final_base_mask = ~0
349 final_cap_x_mask = ~0
350 elif modifier == "-":
351 final_base_mask = _symbolic_mode_bit_inverse(final_base_mode)
352 final_cap_x_mask = _symbolic_mode_bit_inverse(final_cap_x_mode)
353 final_base_mode = 0
354 final_cap_x_mode = 0
355 elif modifier == "=":
356 # FIXME: Handle "unmentioned directory's setgid/setuid bits"
357 inverted_mask = _symbolic_mode_bit_inverse(segment_mask)
358 final_base_mask = inverted_mask
359 final_cap_x_mask = inverted_mask
360 else:
361 raise AssertionError(
362 f"Unknown modifier in symbolic mode: {modifier} - should not have happened"
363 )
364 yield _SymbolicModeSegment(
365 base_mode=final_base_mode,
366 base_mask=final_base_mask,
367 cap_x_mode=final_cap_x_mode,
368 cap_x_mask=final_cap_x_mask,
369 )
372def unpack_type(
373 orig_type: Any,
374 parsing_typed_dict_attribute: bool,
375) -> tuple[Any, Any | None, tuple[Any, ...]]:
376 raw_type = orig_type
377 origin = get_origin(raw_type)
378 args = get_args(raw_type)
379 if not parsing_typed_dict_attribute and repr(origin) in ( 379 ↛ 383line 379 didn't jump to line 383 because the condition on line 379 was never true
380 "typing.NotRequired",
381 "typing.Required",
382 ):
383 raise ValueError(
384 f"The Required/NotRequired attributes cannot be used outside typed dicts,"
385 f" the type that triggered the error: {orig_type}"
386 )
388 while repr(origin) in ("typing.NotRequired", "typing.Required"):
389 if len(args) != 1: 389 ↛ 390line 389 didn't jump to line 390 because the condition on line 389 was never true
390 raise ValueError(
391 f"The type {raw_type} should have exactly one type parameter"
392 )
393 raw_type = args[0]
394 origin = get_origin(raw_type)
395 args = get_args(raw_type)
397 assert not isinstance(raw_type, tuple)
399 return raw_type, origin, args
402def find_annotation(
403 annotations: tuple[Any, ...],
404 anno_class: type[MP],
405) -> MP | None:
406 m = None
407 for anno in annotations:
408 if isinstance(anno, anno_class):
409 if m is not None: 409 ↛ 410line 409 didn't jump to line 410 because the condition on line 409 was never true
410 raise ValueError(
411 f"The annotation {anno_class.__name__} was used more than once"
412 )
413 m = anno
414 return m