Coverage for src/debputy/manifest_parser/util.py: 89%
246 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
1import dataclasses
2from typing import (
3 Iterator,
4 Union,
5 Optional,
6 List,
7 Tuple,
8 Mapping,
9 get_origin,
10 get_args,
11 Any,
12 Type,
13 TypeVar,
14 TYPE_CHECKING,
15 Iterable,
16 Container,
17 Literal,
18 FrozenSet,
19 cast,
20)
22from debputy.yaml.compat import CommentedBase
24from debputy.manifest_parser.exceptions import ManifestParseException
26if TYPE_CHECKING:
27 from debputy.manifest_parser.parser_data import ParserContextData
28 from debputy.plugin.api.spec import DebputyIntegrationMode, PackageTypeSelector
# Type variable used by `find_annotation` so that its return type follows the
# annotation class it was asked to look for.  The bound is a forward reference;
# `DebputyParseHint` is not defined in this part of the file.
MP = TypeVar("MP", bound="DebputyParseHint")
# A single path segment: a mapping key (str) or a sequence index (int).
StrOrInt = Union[str, int]
# Alias table consulted by `AttributePath.__getitem__`: maps the requested
# key/index to a tuple of (replacement key/index, alias table to attach to the
# resulting path, or None).  An empty-string replacement means "stay on the
# current path" rather than descending into a child.
AttributePathAliasMapping = Mapping[
    StrOrInt, Tuple[StrOrInt, Optional["AttributePathAliasMapping"]]
]
# Which position `AttributePath._resolve_path` should report the line/column
# for: the key, the value, or the container holding the attribute.
LineReportKind = Literal["key", "value", "container"]
# Shared, pre-built results for the two most common selectors.
_PACKAGE_TYPE_DEB_ONLY = frozenset(["deb"])
_ALL_PACKAGE_TYPES = frozenset(["deb", "udeb"])


def resolve_package_type_selectors(
    package_type: "PackageTypeSelector",
) -> FrozenSet[str]:
    """Normalize a package type selector into a frozenset of package types.

    :param package_type: Either a single package type name or an iterable of
      package type names.
    :return: A frozenset with the selected package type names.  The shared
      module-level sets are returned as-is (no copy) when the input already is
      one of them, and the shared "deb only" set is returned for the plain
      string `"deb"`.
    """
    for shared_set in (_ALL_PACKAGE_TYPES, _PACKAGE_TYPE_DEB_ONLY):
        if package_type is shared_set:
            # Already one of our own frozensets; hand it back untouched.
            return cast("FrozenSet[str]", package_type)
    if not isinstance(package_type, str):
        return frozenset(package_type)
    if package_type == "deb":
        return _PACKAGE_TYPE_DEB_ONLY
    return frozenset([package_type])
class AttributePath:
    """A location inside a parsed document, used to produce readable error paths.

    Paths form a linked list from a leaf up to a root via `parent`.  Each node
    stores:

    * `name`: the key (str) or index (int) of this segment, or `None` for the
      document root.
    * `container`: the data found at this path (if known).  It is used to look
      up line/column information when the data comes from a `CommentedBase`.
    * `alias_mapping`: optional key rewriting rules applied when descending via
      `path[key]`.
    * `path_hint`: optional text shown as `<Search for: ...>` when no
      line/column information is available.

    A root path (no parent and no name) is falsy; every other path is truthy.
    """

    __slots__ = ("parent", "container", "name", "alias_mapping", "path_hint")

    def __init__(
        self,
        parent: Optional["AttributePath"],
        key: Optional[Union[str, int]],
        *,
        container: Optional[Any] = None,
        alias_mapping: Optional["AttributePathAliasMapping"] = None,
    ) -> None:
        self.parent = parent
        self.container = container
        self.name = key
        self.path_hint: Optional[str] = None
        self.alias_mapping = alias_mapping

    @classmethod
    def root_path(cls, container: Optional[Any]) -> "AttributePath":
        """Create the (falsy) document root path holding `container`."""
        return cls(None, None, container=container)

    @classmethod
    def builtin_path(cls) -> "AttributePath":
        """Create a parentless path named `$builtin$`."""
        return cls(None, "$builtin$")

    @classmethod
    def test_path(cls) -> "AttributePath":
        """Create a parentless path named `$test$`."""
        return cls(None, "$test$")

    def __bool__(self) -> bool:
        # Only the document root (no name, no parent) is falsy.
        return self.name is not None or self.parent is not None

    def copy_with_path_hint(self, path_hint: str) -> "AttributePath":
        """Return a copy of this path with `path_hint` set.

        The copy keeps the parent, name, alias mapping and container of the
        original.  Keeping the container matters: children created from the
        copy use it to find their own data, and thereby line/column details.
        """
        p = self.__class__(
            self.parent,
            self.name,
            alias_mapping=self.alias_mapping,
            container=self.container,
        )
        p.path_hint = path_hint
        return p

    def path_segments(self) -> Iterable[Union[str, int]]:
        """Yield the keys/indices from the outermost segment down to this one.

        The nameless document root contributes no segment (also when called on
        the root itself), so only `str`/`int` values are ever produced.
        """
        segments = list(self._iter_path())
        segments.reverse()
        yield from (s.name for s in segments if s.name is not None)

    def _resolve_path(self, report_kind: "LineReportKind") -> str:
        """Render the path as text, with position details when available.

        :param report_kind: Which position to report.  `"key"` and `"value"`
          report the position of this attribute's key or value; `"container"`
          reports the position of the value one level up (the container that
          holds this attribute).
        :return: The dotted/indexed path (e.g. `a.b[0]`), followed by either
          ` [Line N column M]`, ` <Search for: hint>` or nothing.  If there
          are no named segments and no suffix, `document root` is returned.
        """
        parent = self.parent
        key = self.name
        if report_kind == "container":
            # Shift one level up: look up the parent's key in the grandparent.
            key = parent.name if parent else None
            parent = parent.parent if parent else None
        # `is not None` (rather than truthiness) on purpose: the root path is
        # falsy but still carries the top-level container.
        container = parent.container if parent is not None else None

        lc_data = None
        if container is not None and isinstance(container, CommentedBase):
            lc = container.lc
            try:
                if isinstance(key, str):
                    if report_kind == "key":
                        lc_data = lc.key(key)
                    else:
                        lc_data = lc.value(key)
                else:
                    lc_data = lc.item(key)
            except (AttributeError, RuntimeError, LookupError, TypeError):
                # Best effort only; position data is a nicety for messages.
                lc_data = None

        segments = list(self._iter_path())
        segments.reverse()
        parts: List[str] = []
        path_hint = None

        for s in segments:
            k = s.name
            s_path_hint = s.path_hint
            if s_path_hint is not None:
                # The innermost hint wins as segments go from outer to inner.
                path_hint = s_path_hint
            if isinstance(k, int):
                parts.append(f"[{k}]")
            elif k is not None:
                if parts:
                    parts.append(".")
                parts.append(k)

        if lc_data is not None:
            line_pos, col = lc_data
            # Translate 0-based (index) to 1-based (line number)
            line_pos += 1
            parts.append(f" [Line {line_pos} column {col}]")
        elif path_hint:
            parts.append(f" <Search for: {path_hint}>")
        if not parts:
            return "document root"
        return "".join(parts)

    @property
    def path_container_lc(self) -> str:
        """The path text with the position of the enclosing container."""
        return self._resolve_path("container")

    @property
    def path_key_lc(self) -> str:
        """The path text with the position of this attribute's key."""
        return self._resolve_path("key")

    @property
    def path(self) -> str:
        """The path text with the position of this attribute's value."""
        return self._resolve_path("value")

    def __str__(self) -> str:
        return self.path

    def __getitem__(self, item: Union[str, int]) -> "AttributePath":
        """Return the path of the child `item`, applying any alias mapping.

        If the alias mapping rewrites `item` to the empty string, no child is
        created; instead a path equivalent to this one is returned with the
        alias mapping from the match attached.
        """
        alias_mapping = None
        if self.alias_mapping:
            match = self.alias_mapping.get(item)
            if match:
                item, alias_mapping = match
                if item == "":
                    # Support `sources[0]` mapping to `source` by `sources -> source` and `0 -> ""`.
                    return AttributePath(
                        self.parent,
                        self.name,
                        alias_mapping=alias_mapping,
                        container=self.container,
                    )
        container = self.container
        if container is not None:
            try:
                child_container = container[item]
            except (AttributeError, RuntimeError, LookupError, TypeError):
                # The data does not have (or cannot be indexed by) this item;
                # the path is still valid for reporting purposes.
                child_container = None
        else:
            child_container = None
        return AttributePath(
            self,
            item,
            alias_mapping=alias_mapping,
            container=child_container,
        )

    def _iter_path(self) -> Iterator["AttributePath"]:
        """Yield this path and then its ancestors, innermost first.

        Iteration stops before a falsy parent, so a document root is only
        produced when it is the starting point.
        """
        current = self
        yield current
        while True:
            parent = current.parent
            if not parent:
                break
            current = parent
            yield current
def check_integration_mode(
    path: AttributePath,
    parser_context: Optional["ParserContextData"] = None,
    expected_debputy_integration_mode: Optional[
        Container["DebputyIntegrationMode"]
    ] = None,
) -> None:
    """Verify that the attribute at `path` is allowed in the active integration mode.

    :param path: Path of the attribute being parsed (used in error messages).
    :param parser_context: The parser context providing the active
      `debputy_integration_mode`.  Required when a restriction is given.
    :param expected_debputy_integration_mode: The integration modes in which
      the attribute may be used, or `None` for "no restriction".
    :raises AssertionError: If a restriction is given without a parser context.
    :raises ManifestParseException: If the active integration mode is not
      among the permitted ones.
    """
    permitted_modes = expected_debputy_integration_mode
    if permitted_modes is None:
        # Nothing to enforce.
        return
    if parser_context is None:
        raise AssertionError(
            f"Cannot use integration mode restriction when parsing {path.path} since it is not parsed in the manifest context"
        )
    active_mode = parser_context.debputy_integration_mode
    if active_mode in permitted_modes:
        return
    raise ManifestParseException(
        f"The attribute {path.path} cannot be used as it is not allowed for"
        f" the current debputy integration mode ({active_mode})."
        f" Please remove the manifest definition or change the integration mode"
    )
228@dataclasses.dataclass(slots=True, frozen=True)
229class _SymbolicModeSegment:
230 base_mode: int
231 base_mask: int
232 cap_x_mode: int
233 cap_x_mask: int
235 def apply(self, current_mode: int, is_dir: bool) -> int:
236 if current_mode & 0o111 or is_dir:
237 chosen_mode = self.cap_x_mode
238 mode_mask = self.cap_x_mask
239 else:
240 chosen_mode = self.base_mode
241 mode_mask = self.base_mask
242 # set ("="): mode mask clears relevant segment and current_mode are the desired bits
243 # add ("+"): mode mask keeps everything and current_mode are the desired bits
244 # remove ("-"): mode mask clears relevant bits and current_mode are 0
245 return (current_mode & mode_mask) | chosen_mode
def _symbolic_mode_bit_inverse(v: int) -> int:
    """Return the complement of `v` restricted to the 12 mode bits (0o7777)."""
    # A plain `~v` would give a negative number in Python; limiting the result
    # to the permission/special bits yields the mask we actually want.
    all_mode_bits = 0o7777
    return all_mode_bits & ~v
def parse_symbolic_mode(
    symbolic_mode: str,
    attribute_path: Optional[AttributePath],
) -> Iterator[_SymbolicModeSegment]:
    """Parse a chmod-style symbolic mode (e.g. `u+rwX,go=rX`) into segments.

    Each comma-separated clause has the form `[ugoa]*[+-=][rwxXst]*` and is
    translated into one `_SymbolicModeSegment`.  When no subject is given, the
    clause applies to user, group and other.

    This is a generator: parsing happens lazily, so errors are raised while
    iterating over the result rather than at call time.

    :param symbolic_mode: The symbolic mode string to parse.
    :param attribute_path: Where the mode was defined; only used to make error
      messages more helpful.  May be `None`.
    :return: One segment per clause, in the order they appear.
    :raises ValueError: If a clause lacks an operator or contains characters
      that cannot be parsed.  This includes copying permissions from another
      subject (such as `a=u`), which is not supported.
    """
    sticky_bit = 0o01000
    setuid_bit = 0o04000
    setgid_bit = 0o02000
    mode_group_flag = 0o7
    subject_mask_and_shift = {
        "u": (mode_group_flag << 6, 6),
        "g": (mode_group_flag << 3, 3),
        "o": (mode_group_flag << 0, 0),
    }
    # letter -> (bits when "cap X" does not apply, bits when it does)
    bits = {
        "r": (0o4, 0o4),
        "w": (0o2, 0o2),
        "x": (0o1, 0o1),
        "X": (0o0, 0o1),
        "s": (0o0, 0o0),  # Special-cased below (it depends on the subject)
        "t": (0o0, 0o0),  # Special-cased below
    }
    modifiers = {
        "+",
        "-",
        "=",
    }
    in_path = f" in {attribute_path.path}" if attribute_path is not None else ""
    for orig_part in symbolic_mode.split(","):
        base_mode = 0
        cap_x_mode = 0
        part = orig_part
        subjects = set()
        while part and part[0] in ("u", "g", "o", "a"):
            subject = part[0]
            if subject == "a":
                subjects = {"u", "g", "o"}
            else:
                subjects.add(subject)
            part = part[1:]
        if not subjects:
            subjects = {"u", "g", "o"}

        if part and part[0] in modifiers:
            modifier = part[0]
        elif not part:
            raise ValueError(
                f'Invalid symbolic mode{in_path}: expected [+-=] to be present (from "{orig_part}")'
            )
        else:
            raise ValueError(
                f'Invalid symbolic mode{in_path}: Expected "{part[0]}" to be one of [+-=]'
                f' (from "{orig_part}")'
            )
        part = part[1:]
        s_bit_seen = False
        t_bit_seen = False
        while part and part[0] in bits:
            # Look at the current letter only; comparing the entire remainder
            # would miss "s"/"t" unless they happened to be the last letter.
            letter = part[0]
            if letter == "s":
                s_bit_seen = True
            elif letter == "t":
                t_bit_seen = True
            else:
                base_mode_bits, cap_x_mode_bits = bits[letter]
                base_mode |= base_mode_bits
                cap_x_mode |= cap_x_mode_bits
            part = part[1:]

        if part:
            # Anything left is not a permission letter.  This also covers
            # references to another subject's permissions (e.g. "a=u").
            raise ValueError(
                f'Invalid symbolic mode{in_path}: Could not parse "{part[0]}" from "{orig_part}"'
            )

        final_base_mode = 0
        final_cap_x_mode = 0
        segment_mask = 0
        for subject in subjects:
            mask, shift = subject_mask_and_shift[subject]
            segment_mask |= mask
            final_base_mode |= base_mode << shift
            final_cap_x_mode |= cap_x_mode << shift
        if modifier == "=":
            # "=" also resets the special bit belonging to each subject.
            segment_mask |= setuid_bit if "u" in subjects else 0
            segment_mask |= setgid_bit if "g" in subjects else 0
            segment_mask |= sticky_bit if "o" in subjects else 0
        if s_bit_seen:
            if "u" in subjects:
                final_base_mode |= setuid_bit
                final_cap_x_mode |= setuid_bit
            if "g" in subjects:
                final_base_mode |= setgid_bit
                final_cap_x_mode |= setgid_bit
        if t_bit_seen:
            final_base_mode |= sticky_bit
            final_cap_x_mode |= sticky_bit
        if modifier == "+":
            # Keep every existing bit and add the requested ones.
            final_base_mask = ~0
            final_cap_x_mask = ~0
        elif modifier == "-":
            # Clear the requested bits and add nothing.
            final_base_mask = _symbolic_mode_bit_inverse(final_base_mode)
            final_cap_x_mask = _symbolic_mode_bit_inverse(final_cap_x_mode)
            final_base_mode = 0
            final_cap_x_mode = 0
        elif modifier == "=":
            # FIXME: Handle "unmentioned directory's setgid/setuid bits"
            inverted_mask = _symbolic_mode_bit_inverse(segment_mask)
            final_base_mask = inverted_mask
            final_cap_x_mask = inverted_mask
        else:
            raise AssertionError(
                f"Unknown modifier in symbolic mode: {modifier} - should not have happened"
            )
        yield _SymbolicModeSegment(
            base_mode=final_base_mode,
            base_mask=final_base_mask,
            cap_x_mode=final_cap_x_mode,
            cap_x_mask=final_cap_x_mask,
        )
def unpack_type(
    orig_type: Any,
    parsing_typed_dict_attribute: bool,
) -> Tuple[Any, Optional[Any], Tuple[Any, ...]]:
    """Strip `Required`/`NotRequired` wrappers from a type and describe the result.

    :param orig_type: The type (annotation) to unpack.
    :param parsing_typed_dict_attribute: Whether the type is the annotation of
      a typed dict attribute.  The `Required`/`NotRequired` wrappers are only
      accepted when this is true.
    :return: A tuple of the unwrapped type, its origin (per `get_origin`) and
      its type arguments (per `get_args`).
    :raises ValueError: If a `Required`/`NotRequired` wrapper is used outside
      a typed dict attribute, or if such a wrapper does not have exactly one
      type parameter.
    """
    requiredness_wrappers = ("typing.NotRequired", "typing.Required")

    def _is_requiredness_wrapper(candidate_origin: Any) -> bool:
        # The wrappers are recognized by the repr of their origin.
        return repr(candidate_origin) in requiredness_wrappers

    raw_type = orig_type
    origin, args = get_origin(raw_type), get_args(raw_type)
    if _is_requiredness_wrapper(origin) and not parsing_typed_dict_attribute:
        raise ValueError(
            "The Required/NotRequired attributes cannot be used outside typed dicts,"
            f" the type that triggered the error: {orig_type}"
        )

    # Peel off the wrappers one layer at a time.
    while _is_requiredness_wrapper(origin):
        if len(args) != 1:
            raise ValueError(
                f"The type {raw_type} should have exactly one type parameter"
            )
        raw_type = args[0]
        origin, args = get_origin(raw_type), get_args(raw_type)

    assert not isinstance(raw_type, tuple)

    return raw_type, origin, args
def find_annotation(
    annotations: Tuple[Any, ...],
    anno_class: Type[MP],
) -> Optional[MP]:
    """Find the single annotation that is an instance of `anno_class`.

    :param annotations: The annotations to search through.
    :param anno_class: The class of the annotation to look for.
    :return: The matching annotation, or `None` if there is no match.
    :raises ValueError: If more than one annotation matches.
    """
    found: Optional[MP] = None
    for candidate in annotations:
        if not isinstance(candidate, anno_class):
            continue
        if found is not None:
            raise ValueError(
                f"The annotation {anno_class.__name__} was used more than once"
            )
        found = candidate
    return found