Coverage for src/debputy/manifest_parser/util.py: 89%
247 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
1import dataclasses
2from typing import (
3 Union,
4 Optional,
5 List,
6 Tuple,
7 get_origin,
8 get_args,
9 Any,
10 Type,
11 TypeVar,
12 TYPE_CHECKING,
13 Literal,
14 FrozenSet,
15 cast,
16)
17from collections.abc import Iterator, Mapping, Iterable, Container
19from debputy.yaml.compat import CommentedBase
21from debputy.manifest_parser.exceptions import ManifestParseException
23if TYPE_CHECKING:
24 from debputy.manifest_parser.parser_data import ParserContextData
25 from debputy.plugin.api.spec import DebputyIntegrationMode, PackageTypeSelector
# Type variable for annotation classes; bound (by forward reference) to
# "DebputyParseHint".
MP = TypeVar("MP", bound="DebputyParseHint")

# A single path key: a mapping key (str) or a sequence index (int).
StrOrInt = Union[str, int]

# Recursive alias table: maps a key to the key that replaces it plus an
# optional nested alias table for the level below.
AttributePathAliasMapping = Mapping[
    StrOrInt,
    tuple[StrOrInt, Optional["AttributePathAliasMapping"]],
]

# Which part of an entry a reported line/column position should refer to.
LineReportKind = Literal["key", "value", "container"]
# Shared, pre-built selector sets; `resolve_package_type_selectors` hands
# these back as-is instead of allocating a new frozenset.
_PACKAGE_TYPE_DEB_ONLY = frozenset(["deb"])
_ALL_PACKAGE_TYPES = frozenset(["deb", "udeb"])


def resolve_package_type_selectors(
    package_type: "PackageTypeSelector",
) -> frozenset[str]:
    """Normalize a package type selector into a frozenset of type names.

    :param package_type: Either a single package type name or an iterable
      of package type names.
    :return: The selected package types. The module-level constant sets are
      returned unchanged when passed in, and the string ``"deb"`` resolves to
      the shared deb-only set.
    """
    for shared_set in (_ALL_PACKAGE_TYPES, _PACKAGE_TYPE_DEB_ONLY):
        # Identity check on purpose: only the shared instances short-circuit.
        if package_type is shared_set:
            return cast("FrozenSet[str]", package_type)
    if not isinstance(package_type, str):
        return frozenset(package_type)
    if package_type == "deb":
        return _PACKAGE_TYPE_DEB_ONLY
    return frozenset([package_type])
55class AttributePath:
56 __slots__ = ("parent", "container", "name", "alias_mapping", "path_hint")
58 def __init__(
59 self,
60 parent: Optional["AttributePath"],
61 key: str | int | None,
62 *,
63 container: Any | None = None,
64 alias_mapping: AttributePathAliasMapping | None = None,
65 ) -> None:
66 self.parent = parent
67 self.container = container
68 self.name = key
69 self.path_hint: str | None = None
70 self.alias_mapping = alias_mapping
72 @classmethod
73 def root_path(cls, container: Any | None) -> "AttributePath":
74 return AttributePath(None, None, container=container)
76 @classmethod
77 def builtin_path(cls) -> "AttributePath":
78 return AttributePath(None, "$builtin$")
80 @classmethod
81 def test_path(cls) -> "AttributePath":
82 return AttributePath(None, "$test$")
84 def __bool__(self) -> bool:
85 return self.name is not None or self.parent is not None
87 def copy_with_path_hint(self, path_hint: str) -> "AttributePath":
88 p = self.__class__(self.parent, self.name, alias_mapping=self.alias_mapping)
89 p.path_hint = path_hint
90 return p
92 def path_segments(self) -> Iterable[str | int]:
93 segments = list(self._iter_path())
94 segments.reverse()
95 yield from (s.name for s in segments)
97 def _resolve_path(self, report_kind: LineReportKind) -> str:
98 parent = self.parent
99 key = self.name
100 if report_kind == "container":
101 key = parent.name if parent else None
102 parent = parent.parent if parent else None
103 container = parent.container if parent is not None else None
105 if isinstance(container, CommentedBase):
106 lc = container.lc
107 try:
108 if isinstance(key, str):
109 if report_kind == "key":
110 lc_data = lc.key(key)
111 else:
112 lc_data = lc.value(key)
113 else:
114 lc_data = lc.item(key)
115 except (AttributeError, RuntimeError, LookupError, TypeError):
116 lc_data = None
117 else:
118 lc_data = None
120 segments = list(self._iter_path())
121 segments.reverse()
122 parts: list[str] = []
123 path_hint = None
125 for s in segments:
126 k = s.name
127 s_path_hint = s.path_hint
128 if s_path_hint is not None:
129 path_hint = s_path_hint
130 if isinstance(k, int):
131 parts.append(f"[{k}]")
132 elif k is not None: 132 ↛ 125line 132 didn't jump to line 125 because the condition on line 132 was always true
133 if parts:
134 parts.append(".")
135 parts.append(k)
137 if lc_data is not None:
138 line_pos, col = lc_data
139 # Translate 0-based (index) to 1-based (line number)
140 line_pos += 1
141 parts.append(f" [Line {line_pos} column {col}]")
143 elif path_hint: 143 ↛ 144line 143 didn't jump to line 144 because the condition on line 143 was never true
144 parts.append(f" <Search for: {path_hint}>")
145 if not parts: 145 ↛ 146line 145 didn't jump to line 146 because the condition on line 145 was never true
146 return "document root"
147 return "".join(parts)
149 @property
150 def path_container_lc(self) -> str:
151 return self._resolve_path("container")
153 @property
154 def path_key_lc(self) -> str:
155 return self._resolve_path("key")
157 @property
158 def path(self) -> str:
159 return self._resolve_path("value")
161 def __str__(self) -> str:
162 return self.path
164 def __getitem__(self, item: str | int) -> "AttributePath":
165 alias_mapping = None
166 if self.alias_mapping:
167 match = self.alias_mapping.get(item)
168 if match:
169 item, alias_mapping = match
170 if item == "":
171 # Support `sources[0]` mapping to `source` by `sources -> source` and `0 -> ""`.
172 return AttributePath(
173 self.parent,
174 self.name,
175 alias_mapping=alias_mapping,
176 container=self.container,
177 )
178 container = self.container
179 if container is not None:
180 try:
181 child_container = self.container[item]
182 except (AttributeError, RuntimeError, LookupError, TypeError):
183 child_container = None
184 else:
185 child_container = None
186 return AttributePath(
187 self,
188 item,
189 alias_mapping=alias_mapping,
190 container=child_container,
191 )
193 def _iter_path(self) -> Iterator["AttributePath"]:
194 current = self
195 yield current
196 while True:
197 parent = current.parent
198 if not parent:
199 break
200 current = parent
201 yield current
def check_integration_mode(
    path: AttributePath,
    parser_context: Optional["ParserContextData"] = None,
    expected_debputy_integration_mode: (
        Container["DebputyIntegrationMode"] | None
    ) = None,
) -> None:
    """Reject an attribute that is not allowed in the active integration mode.

    :param path: Location of the attribute; used in the error messages.
    :param parser_context: The parser context providing the active
      integration mode.
    :param expected_debputy_integration_mode: The integration modes in which
      the attribute may be used. None means "no restriction".
    :raises AssertionError: If a restriction is given without a parser
      context to check it against.
    :raises ManifestParseException: If the active integration mode is not
      among the permitted ones.
    """
    # No restriction requested: nothing to verify.
    if expected_debputy_integration_mode is None:
        return
    if parser_context is None:
        raise AssertionError(
            f"Cannot use integration mode restriction when parsing {path.path} since it is not parsed in the manifest context"
        )
    if parser_context.debputy_integration_mode in expected_debputy_integration_mode:
        return
    raise ManifestParseException(
        f"The attribute {path.path} cannot be used as it is not allowed for"
        f" the current debputy integration mode ({parser_context.debputy_integration_mode})."
        f" Please remove the manifest definition or change the integration mode"
    )
225@dataclasses.dataclass(slots=True, frozen=True)
226class _SymbolicModeSegment:
227 base_mode: int
228 base_mask: int
229 cap_x_mode: int
230 cap_x_mask: int
232 def apply(self, current_mode: int, is_dir: bool) -> int:
233 if current_mode & 0o111 or is_dir:
234 chosen_mode = self.cap_x_mode
235 mode_mask = self.cap_x_mask
236 else:
237 chosen_mode = self.base_mode
238 mode_mask = self.base_mask
239 # set ("="): mode mask clears relevant segment and current_mode are the desired bits
240 # add ("+"): mode mask keeps everything and current_mode are the desired bits
241 # remove ("-"): mode mask clears relevant bits and current_mode are 0
242 return (current_mode & mode_mask) | chosen_mode
245def _symbolic_mode_bit_inverse(v: int) -> int:
246 # The & part is necessary because otherwise python narrows the inversion to the minimum number of bits
247 # required, which is not what we want.
248 return ~v & 0o7777
251def parse_symbolic_mode(
252 symbolic_mode: str,
253 attribute_path: AttributePath | None,
254) -> Iterator[_SymbolicModeSegment]:
255 sticky_bit = 0o01000
256 setuid_bit = 0o04000
257 setgid_bit = 0o02000
258 mode_group_flag = 0o7
259 subject_mask_and_shift = {
260 "u": (mode_group_flag << 6, 6),
261 "g": (mode_group_flag << 3, 3),
262 "o": (mode_group_flag << 0, 0),
263 }
264 bits = {
265 "r": (0o4, 0o4),
266 "w": (0o2, 0o2),
267 "x": (0o1, 0o1),
268 "X": (0o0, 0o1),
269 "s": (0o0, 0o0), # Special-cased below (it depends on the subject)
270 "t": (0o0, 0o0), # Special-cased below
271 }
272 modifiers = {
273 "+",
274 "-",
275 "=",
276 }
277 in_path = f" in {attribute_path.path}" if attribute_path is not None else ""
278 for orig_part in symbolic_mode.split(","):
279 base_mode = 0
280 cap_x_mode = 0
281 part = orig_part
282 subjects = set()
283 while part and part[0] in ("u", "g", "o", "a"):
284 subject = part[0]
285 if subject == "a":
286 subjects = {"u", "g", "o"}
287 else:
288 subjects.add(subject)
289 part = part[1:]
290 if not subjects:
291 subjects = {"u", "g", "o"}
293 if part and part[0] in modifiers: 293 ↛ 295line 293 didn't jump to line 295 because the condition on line 293 was always true
294 modifier = part[0]
295 elif not part:
296 raise ValueError(
297 f'Invalid symbolic mode{in_path}: expected [+-=] to be present (from "{orig_part}")'
298 )
299 else:
300 raise ValueError(
301 f'Invalid symbolic mode{in_path}: Expected "{part[0]}" to be one of [+-=]'
302 f' (from "{orig_part}")'
303 )
304 part = part[1:]
305 s_bit_seen = False
306 t_bit_seen = False
307 while part and part[0] in bits:
308 if part == "s":
309 s_bit_seen = True
310 elif part == "t": 310 ↛ 311line 310 didn't jump to line 311 because the condition on line 310 was never true
311 t_bit_seen = True
312 elif part in ("u", "g", "o"): 312 ↛ 313line 312 didn't jump to line 313 because the condition on line 312 was never true
313 raise NotImplementedError(
314 f"Cannot parse symbolic mode{in_path}: Sorry, we do not support referencing an"
315 " existing subject's permissions (a=u) in symbolic modes."
316 )
317 else:
318 matched_bits = bits.get(part[0])
319 if matched_bits is None: 319 ↛ 320line 319 didn't jump to line 320 because the condition on line 319 was never true
320 valid_bits = "".join(bits)
321 raise ValueError(
322 f'Invalid symbolic mode{in_path}: Expected "{part[0]}" to be one of the letters'
323 f' in "{valid_bits}" (from "{orig_part}")'
324 )
325 base_mode_bits, cap_x_mode_bits = bits[part[0]]
326 base_mode |= base_mode_bits
327 cap_x_mode |= cap_x_mode_bits
328 part = part[1:]
330 if part: 330 ↛ 331line 330 didn't jump to line 331 because the condition on line 330 was never true
331 raise ValueError(
332 f'Invalid symbolic mode{in_path}: Could not parse "{part[0]}" from "{orig_part}"'
333 )
335 final_base_mode = 0
336 final_cap_x_mode = 0
337 segment_mask = 0
338 for subject in subjects:
339 mask, shift = subject_mask_and_shift[subject]
340 segment_mask |= mask
341 final_base_mode |= base_mode << shift
342 final_cap_x_mode |= cap_x_mode << shift
343 if modifier == "=":
344 segment_mask |= setuid_bit if "u" in subjects else 0
345 segment_mask |= setgid_bit if "g" in subjects else 0
346 segment_mask |= sticky_bit if "o" in subjects else 0
347 if s_bit_seen:
348 if "u" in subjects: 348 ↛ 351line 348 didn't jump to line 351 because the condition on line 348 was always true
349 final_base_mode |= setuid_bit
350 final_cap_x_mode |= setuid_bit
351 if "g" in subjects:
352 final_base_mode |= setgid_bit
353 final_cap_x_mode |= setgid_bit
354 if t_bit_seen: 354 ↛ 355line 354 didn't jump to line 355 because the condition on line 354 was never true
355 final_base_mode |= sticky_bit
356 final_cap_x_mode |= sticky_bit
357 if modifier == "+":
358 final_base_mask = ~0
359 final_cap_x_mask = ~0
360 elif modifier == "-":
361 final_base_mask = _symbolic_mode_bit_inverse(final_base_mode)
362 final_cap_x_mask = _symbolic_mode_bit_inverse(final_cap_x_mode)
363 final_base_mode = 0
364 final_cap_x_mode = 0
365 elif modifier == "=":
366 # FIXME: Handle "unmentioned directory's setgid/setuid bits"
367 inverted_mask = _symbolic_mode_bit_inverse(segment_mask)
368 final_base_mask = inverted_mask
369 final_cap_x_mask = inverted_mask
370 else:
371 raise AssertionError(
372 f"Unknown modifier in symbolic mode: {modifier} - should not have happened"
373 )
374 yield _SymbolicModeSegment(
375 base_mode=final_base_mode,
376 base_mask=final_base_mask,
377 cap_x_mode=final_cap_x_mode,
378 cap_x_mask=final_cap_x_mask,
379 )
382def unpack_type(
383 orig_type: Any,
384 parsing_typed_dict_attribute: bool,
385) -> tuple[Any, Any | None, tuple[Any, ...]]:
386 raw_type = orig_type
387 origin = get_origin(raw_type)
388 args = get_args(raw_type)
389 if not parsing_typed_dict_attribute and repr(origin) in ( 389 ↛ 393line 389 didn't jump to line 393 because the condition on line 389 was never true
390 "typing.NotRequired",
391 "typing.Required",
392 ):
393 raise ValueError(
394 f"The Required/NotRequired attributes cannot be used outside typed dicts,"
395 f" the type that triggered the error: {orig_type}"
396 )
398 while repr(origin) in ("typing.NotRequired", "typing.Required"):
399 if len(args) != 1: 399 ↛ 400line 399 didn't jump to line 400 because the condition on line 399 was never true
400 raise ValueError(
401 f"The type {raw_type} should have exactly one type parameter"
402 )
403 raw_type = args[0]
404 origin = get_origin(raw_type)
405 args = get_args(raw_type)
407 assert not isinstance(raw_type, tuple)
409 return raw_type, origin, args
def find_annotation(
    annotations: tuple[Any, ...],
    anno_class: type[MP],
) -> MP | None:
    """Return the single annotation that is an instance of `anno_class`.

    :param annotations: The annotation objects to search.
    :param anno_class: The annotation class to look for.
    :return: The matching annotation, or None if there is none.
    :raises ValueError: If more than one annotation matches.
    """
    found = None
    for candidate in annotations:
        if not isinstance(candidate, anno_class):
            continue
        if found is not None:
            raise ValueError(
                f"The annotation {anno_class.__name__} was used more than once"
            )
        found = candidate
    return found