Coverage for src/debputy/manifest_parser/declarative_parser.py: 72%
800 statements
coverage.py v7.8.2, created at 2026-01-16 17:20 +0000
1import collections
2import dataclasses
3import typing
4from types import UnionType
5from typing import (
6 Any,
7 TypedDict,
8 get_type_hints,
9 Annotated,
10 get_args,
11 get_origin,
12 TypeVar,
13 Generic,
14 Optional,
15 cast,
16 Type,
17 Union,
18 List,
19 NotRequired,
20 Literal,
21 TYPE_CHECKING,
22)
23from collections.abc import Callable, Mapping, Collection, Iterable, Sequence, Container
26from debputy.manifest_parser.base_types import FileSystemMatchRule
27from debputy.manifest_parser.exceptions import (
28 ManifestParseException,
29)
30from debputy.manifest_parser.mapper_code import (
31 normalize_into_list,
32 wrap_into_list,
33 map_each_element,
34)
35from debputy.manifest_parser.parse_hints import (
36 ConditionalRequired,
37 DebputyParseHint,
38 TargetAttribute,
39 ManifestAttribute,
40 ConflictWithSourceAttribute,
41 NotPathHint,
42)
43from debputy.manifest_parser.parser_data import ParserContextData
44from debputy.manifest_parser.tagging_types import (
45 DebputyParsedContent,
46 DebputyDispatchableType,
47 TypeMapping,
48)
49from debputy.manifest_parser.util import (
50 AttributePath,
51 unpack_type,
52 find_annotation,
53 check_integration_mode,
54)
55from debputy.plugin.api.impl_types import (
56 DeclarativeInputParser,
57 TD,
58 ListWrappedDeclarativeInputParser,
59 DispatchingObjectParser,
60 DispatchingTableParser,
61 TTP,
62 TP,
63 InPackageContextParser,
64)
65from debputy.plugin.api.spec import (
66 ParserDocumentation,
67 DebputyIntegrationMode,
68 StandardParserAttributeDocumentation,
69 undocumented_attr,
70 ParserAttributeDocumentation,
71 reference_documentation,
72)
73from debputy.util import _info, _warn, assume_not_none
76if TYPE_CHECKING:
77 from debputy.lsp.diagnostics import LintSeverity
80try:
81 from Levenshtein import distance
82except ImportError:
83 _WARN_ONCE = False
85 def _detect_possible_typo(
86 _key: str,
87 _value: object,
88 _manifest_attributes: Mapping[str, "AttributeDescription"],
89 _path: "AttributePath",
90 ) -> None:
91 global _WARN_ONCE
92 if not _WARN_ONCE:
93 _WARN_ONCE = True
94 _info(
95 "Install python3-levenshtein to have debputy try to detect typos in the manifest."
96 )
98else:
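# Summary of the implementation below (editor's note): a manifest key is reported as a possible typo when
# its Levenshtein distance to a known attribute name is at most 2 (and their lengths differ by at most 2).
# Candidates whose declared type also accepts the provided value are preferred over purely textual matches.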
100 def _detect_possible_typo(
101 key: str,
102 value: object,
103 manifest_attributes: Mapping[str, "AttributeDescription"],
104 path: "AttributePath",
105 ) -> None:
106 k_len = len(key)
107 key_path = path[key]
108 matches: list[str] = []
109 current_match_strength = 0
110 for acceptable_key, attr in manifest_attributes.items():
111 if abs(k_len - len(acceptable_key)) > 2:
112 continue
113 d = distance(key, acceptable_key)
114 if d > 2:
115 continue
116 try:
117 attr.type_validator.ensure_type(value, key_path)
118 except ManifestParseException:
119 if attr.type_validator.base_type_match(value):
120 match_strength = 1
121 else:
122 match_strength = 0
123 else:
124 match_strength = 2
126 if match_strength < current_match_strength:
127 continue
128 if match_strength > current_match_strength:
129 current_match_strength = match_strength
130 matches.clear()
131 matches.append(acceptable_key)
133 if not matches:
134 return
135 ref = f'at "{path.path}"' if path else "at the manifest root level"
136 if len(matches) == 1:
137 possible_match = repr(matches[0])
138 _warn(
139 f'Possible typo: The key "{key}" {ref} should probably have been {possible_match}'
140 )
141 else:
142 matches.sort()
143 possible_matches = ", ".join(repr(a) for a in matches)
144 _warn(
145 f'Possible typo: The key "{key}" {ref} should probably have been one of {possible_matches}'
146 )
149SF = TypeVar("SF")
150T = TypeVar("T")
151S = TypeVar("S")
154_NONE_TYPE = type(None)
157# These must be able to appear in an "isinstance" check and must be builtin types.
158BASIC_SIMPLE_TYPES = {
159 str: "string",
160 int: "integer",
161 bool: "boolean",
162}
165class AttributeTypeHandler:
166 __slots__ = ("_description", "_ensure_type", "base_type", "mapper")
168 def __init__(
169 self,
170 description: str,
171 ensure_type: Callable[[Any, AttributePath], None],
172 *,
173 base_type: type[Any] | None = None,
174 mapper: None | (
175 Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]
176 ) = None,
177 ) -> None:
178 self._description = description
179 self._ensure_type = ensure_type
180 self.base_type = base_type
181 self.mapper = mapper
183 def describe_type(self) -> str:
184 return self._description
186 def ensure_type(self, obj: object, path: AttributePath) -> None:
187 self._ensure_type(obj, path)
189 def base_type_match(self, obj: object) -> bool:
190 base_type = self.base_type
191 return base_type is not None and isinstance(obj, base_type)
193 def map_type(
194 self,
195 value: Any,
196 path: AttributePath,
197 parser_context: Optional["ParserContextData"],
198 ) -> Any:
199 mapper = self.mapper
200 if mapper is not None:
201 return mapper(value, path, parser_context)
202 return value
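# Note on combine_mapper (below): when this handler already has a mapper, the returned handler applies the
# existing mapper first and then feeds its result to the supplied mapper.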
204 def combine_mapper(
205 self,
206 mapper: None | (
207 Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]
208 ),
209 ) -> "AttributeTypeHandler":
210 if mapper is None:
211 return self
212 if self.mapper is not None:
213 m = self.mapper
215 def _combined_mapper(
216 value: Any,
217 path: AttributePath,
218 parser_context: Optional["ParserContextData"],
219 ) -> Any:
220 return mapper(m(value, path, parser_context), path, parser_context)
222 else:
223 _combined_mapper = mapper
225 return AttributeTypeHandler(
226 self._description,
227 self._ensure_type,
228 base_type=self.base_type,
229 mapper=_combined_mapper,
230 )
233@dataclasses.dataclass(slots=True)
234class AttributeDescription:
235 source_attribute_name: str
236 target_attribute: str
237 attribute_type: Any
238 type_validator: AttributeTypeHandler
239 annotations: tuple[Any, ...]
240 conflicting_attributes: frozenset[str]
241 conditional_required: Optional["ConditionalRequired"]
242 parse_hints: Optional["DetectedDebputyParseHint"] = None
243 is_optional: bool = False
246def _extract_path_hint(v: Any, attribute_path: AttributePath) -> bool:
247 if attribute_path.path_hint is not None: 247 ↛ 248 (line 247 didn't jump to line 248 because the condition on line 247 was never true)
248 return True
249 if isinstance(v, str):
250 attribute_path.path_hint = v
251 return True
252 elif isinstance(v, list) and len(v) > 0 and isinstance(v[0], str):
253 attribute_path.path_hint = v[0]
254 return True
255 return False
258@dataclasses.dataclass(slots=True, frozen=True)
259class DeclarativeNonMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]):
260 alt_form_parser: AttributeDescription
261 inline_reference_documentation: ParserDocumentation | None = None
262 expected_debputy_integration_mode: Container[DebputyIntegrationMode] | None = None
264 def parse_input(
265 self,
266 value: object,
267 path: AttributePath,
268 *,
269 parser_context: Optional["ParserContextData"] = None,
270 ) -> TD:
271 check_integration_mode(
272 path,
273 parser_context,
274 self.expected_debputy_integration_mode,
275 )
276 if self.reference_documentation_url is not None:
277 doc_ref = f" (Documentation: {self.reference_documentation_url})"
278 else:
279 doc_ref = ""
281 alt_form_parser = self.alt_form_parser
282 if value is None: 282 ↛ 283 (line 282 didn't jump to line 283 because the condition on line 282 was never true)
283 form_note = f" The value must have type: {alt_form_parser.type_validator.describe_type()}"
284 if self.reference_documentation_url is not None:
285 doc_ref = f" Please see {self.reference_documentation_url} for the documentation."
286 raise ManifestParseException(
287 f"The attribute {path.path} was missing a value. {form_note}{doc_ref}"
288 )
289 _extract_path_hint(value, path)
290 alt_form_parser.type_validator.ensure_type(value, path)
291 attribute = alt_form_parser.target_attribute
292 alias_mapping = {
293 attribute: ("", None),
294 }
295 v = alt_form_parser.type_validator.map_type(value, path, parser_context)
296 path.alias_mapping = alias_mapping
297 return cast("TD", {attribute: v})
300@dataclasses.dataclass(slots=True)
301class DeclarativeMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]):
302 input_time_required_parameters: frozenset[str]
303 all_parameters: frozenset[str]
304 manifest_attributes: Mapping[str, "AttributeDescription"]
305 source_attributes: Mapping[str, "AttributeDescription"]
306 at_least_one_of: frozenset[frozenset[str]]
307 alt_form_parser: AttributeDescription | None
308 mutually_exclusive_attributes: frozenset[frozenset[str]] = frozenset()
309 _per_attribute_conflicts_cache: Mapping[str, frozenset[str]] | None = None
310 inline_reference_documentation: ParserDocumentation | None = None
311 path_hint_source_attributes: Sequence[str] = tuple()
312 expected_debputy_integration_mode: Container[DebputyIntegrationMode] | None = None
314 def _parse_alt_form(
315 self,
316 value: object,
317 path: AttributePath,
318 *,
319 parser_context: Optional["ParserContextData"] = None,
320 ) -> TD:
321 alt_form_parser = self.alt_form_parser
322 if alt_form_parser is None: 322 ↛ 323 (line 322 didn't jump to line 323 because the condition on line 322 was never true)
323 raise ManifestParseException(
324 f"The attribute {path.path} must be a mapping.{self._doc_url_error_suffix()}"
325 )
326 _extract_path_hint(value, path)
327 alt_form_parser.type_validator.ensure_type(value, path)
328 assert (
329 value is not None
330 ), "The alternative form was None, but the parser should have rejected None earlier."
331 attribute = alt_form_parser.target_attribute
332 alias_mapping = {
333 attribute: ("", None),
334 }
335 v = alt_form_parser.type_validator.map_type(value, path, parser_context)
336 path.alias_mapping = alias_mapping
337 return cast("TD", {attribute: v})
339 def _validate_expected_keys(
340 self,
341 value: dict[Any, Any],
342 path: AttributePath,
343 *,
344 parser_context: Optional["ParserContextData"] = None,
345 ) -> None:
346 unknown_keys = value.keys() - self.all_parameters
347 doc_ref = self._doc_url_error_suffix()
348 if unknown_keys: 348 ↛ 349 (line 348 didn't jump to line 349 because the condition on line 348 was never true)
349 for k in unknown_keys:
350 if isinstance(k, str):
351 _detect_possible_typo(k, value[k], self.manifest_attributes, path)
352 unused_keys = self.all_parameters - value.keys()
353 if unused_keys:
354 k = ", ".join(unused_keys)
355 raise ManifestParseException(
356 f'Unknown keys "{unknown_keys}" at {path.path_container_lc}. Keys that could be used here are: {k}.{doc_ref}'
357 )
358 raise ManifestParseException(
359 f'Unknown keys "{unknown_keys}" at {path.path_container_lc}. Please remove them.{doc_ref}'
360 )
361 missing_keys = self.input_time_required_parameters - value.keys()
362 if missing_keys:
363 required = ", ".join(repr(k) for k in sorted(missing_keys))
364 raise ManifestParseException(
365 f"The following keys were required but not present at {path.path_container_lc}: {required}{doc_ref}"
366 )
367 for maybe_required in self.all_parameters - value.keys():
368 attr = self.manifest_attributes[maybe_required]
369 assert attr.conditional_required is None or parser_context is not None
370 if ( 370 ↛ 376 (line 370 didn't jump to line 376 because the condition on line 370 was never true)
371 attr.conditional_required is not None
372 and attr.conditional_required.condition_applies(
373 assume_not_none(parser_context)
374 )
375 ):
376 reason = attr.conditional_required.reason
377 raise ManifestParseException(
378 f'Missing the *conditionally* required attribute "{maybe_required}" at {path.path_container_lc}. {reason}{doc_ref}'
379 )
380 for keyset in self.at_least_one_of:
381 matched_keys = value.keys() & keyset
382 if not matched_keys: 382 ↛ 383 (line 382 didn't jump to line 383 because the condition on line 382 was never true)
383 conditionally_required = ", ".join(repr(k) for k in sorted(keyset))
384 raise ManifestParseException(
385 f"At least one of the following keys must be present at {path.path_container_lc}:"
386 f" {conditionally_required}{doc_ref}"
387 )
388 for group in self.mutually_exclusive_attributes:
389 matched = value.keys() & group
390 if len(matched) > 1: 390 ↛ 391 (line 390 didn't jump to line 391 because the condition on line 390 was never true)
391 ck = ", ".join(repr(k) for k in sorted(matched))
392 raise ManifestParseException(
393 f"Could not parse {path.path_container_lc}: The following attributes are"
394 f" mutually exclusive: {ck}{doc_ref}"
395 )
397 def _parse_typed_dict_form(
398 self,
399 value: dict[Any, Any],
400 path: AttributePath,
401 *,
402 parser_context: Optional["ParserContextData"] = None,
403 ) -> TD:
404 self._validate_expected_keys(value, path, parser_context=parser_context)
405 result = {}
406 per_attribute_conflicts = self._per_attribute_conflicts()
407 alias_mapping = {}
408 for path_hint_source_attributes in self.path_hint_source_attributes:
409 v = value.get(path_hint_source_attributes)
410 if v is not None and _extract_path_hint(v, path):
411 break
412 for k, v in value.items():
413 attr = self.manifest_attributes[k]
414 matched = value.keys() & per_attribute_conflicts[k]
415 if matched: 415 ↛ 416 (line 415 didn't jump to line 416 because the condition on line 415 was never true)
416 ck = ", ".join(repr(k) for k in sorted(matched))
417 raise ManifestParseException(
418 f'The attribute "{k}" at {path.path} cannot be used with the following'
419 f" attributes: {ck}{self._doc_url_error_suffix()}"
420 )
421 nk = attr.target_attribute
422 key_path = path[k]
423 attr.type_validator.ensure_type(v, key_path)
424 if v is None: 424 ↛ 425 (line 424 didn't jump to line 425 because the condition on line 424 was never true)
425 continue
426 if k != nk:
427 alias_mapping[nk] = k, None
428 v = attr.type_validator.map_type(v, key_path, parser_context)
429 result[nk] = v
430 if alias_mapping:
431 path.alias_mapping = alias_mapping
432 return cast("TD", result)
434 def _doc_url_error_suffix(self, *, see_url_version: bool = False) -> str:
435 doc_url = self.reference_documentation_url
436 if doc_url is not None:
437 if see_url_version: 437 ↛ 438 (line 437 didn't jump to line 438 because the condition on line 437 was never true)
438 return f" Please see {doc_url} for the documentation."
439 return f" (Documentation: {doc_url})"
440 return ""
442 def parse_input(
443 self,
444 value: object,
445 path: AttributePath,
446 *,
447 parser_context: Optional["ParserContextData"] = None,
448 ) -> TD:
449 check_integration_mode(
450 path,
451 parser_context,
452 self.expected_debputy_integration_mode,
453 )
454 if value is None: 454 ↛ 455 (line 454 didn't jump to line 455 because the condition on line 454 was never true)
455 form_note = " The attribute must be a mapping."
456 if self.alt_form_parser is not None:
457 form_note = (
458 " The attribute can be a mapping or a non-mapping format"
459 ' (usually, "non-mapping format" means a string or a list of strings).'
460 )
461 doc_ref = self._doc_url_error_suffix(see_url_version=True)
462 raise ManifestParseException(
463 f"The attribute {path.path} was missing a value. {form_note}{doc_ref}"
464 )
466 if not isinstance(value, dict):
467 return self._parse_alt_form(value, path, parser_context=parser_context)
468 return self._parse_typed_dict_form(value, path, parser_context=parser_context)
470 def _per_attribute_conflicts(self) -> Mapping[str, frozenset[str]]:
471 conflicts = self._per_attribute_conflicts_cache
472 if conflicts is not None:
473 return conflicts
474 attrs = self.source_attributes
475 conflicts = {
476 a.source_attribute_name: frozenset(
477 attrs[ca].source_attribute_name for ca in a.conflicting_attributes
478 )
479 for a in attrs.values()
480 }
481 self._per_attribute_conflicts_cache = conflicts
482 return self._per_attribute_conflicts_cache
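# Note on _is_path_attribute_candidate (below): an attribute can donate a "path hint" for error reporting when
# no parse hint forbids it and its target type is FileSystemMatchRule (or a list thereof).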
485def _is_path_attribute_candidate(
486 source_attribute: AttributeDescription, target_attribute: AttributeDescription
487) -> bool:
488 if (
489 source_attribute.parse_hints
490 and not source_attribute.parse_hints.applicable_as_path_hint
491 ):
492 return False
493 target_type = target_attribute.attribute_type
494 _, origin, args = unpack_type(target_type, False)
495 match_type = target_type
496 if origin == list:
497 match_type = args[0]
498 return isinstance(match_type, type) and issubclass(match_type, FileSystemMatchRule)
501if typing.is_typeddict(DebputyParsedContent): 501 ↛ 505 (line 501 didn't jump to line 505 because the condition on line 501 was always true)
502 is_typeddict = typing.is_typeddict
503else:
505 def is_typeddict(t: Any) -> bool:
506 if typing.is_typeddict(t):
507 return True
508 return isinstance(t, type) and issubclass(t, DebputyParsedContent)
511class ParserGenerator:
512 def __init__(self) -> None:
513 self._registered_types: dict[Any, TypeMapping[Any, Any]] = {}
514 self._object_parsers: dict[str, DispatchingObjectParser] = {}
515 self._table_parsers: dict[
516 type[DebputyDispatchableType], DispatchingTableParser[Any]
517 ] = {}
518 self._in_package_context_parser: dict[str, Any] = {}
520 def register_mapped_type(self, mapped_type: TypeMapping[Any, Any]) -> None:
521 existing = self._registered_types.get(mapped_type.target_type)
522 if existing is not None: 522 ↛ 523 (line 522 didn't jump to line 523 because the condition on line 522 was never true)
523 raise ValueError(f"The type {existing} is already registered")
524 self._registered_types[mapped_type.target_type] = mapped_type
526 def get_mapped_type_from_target_type(
527 self,
528 mapped_type: type[T],
529 ) -> TypeMapping[Any, T] | None:
530 return self._registered_types.get(mapped_type)
532 def discard_mapped_type(self, mapped_type: type[T]) -> None:
533 del self._registered_types[mapped_type]
535 def add_table_parser(self, rt: type[DebputyDispatchableType], path: str) -> None:
536 assert rt not in self._table_parsers
537 self._table_parsers[rt] = DispatchingTableParser(rt, path)
539 def add_object_parser(
540 self,
541 path: str,
542 *,
543 parser_documentation: ParserDocumentation | None = None,
544 expected_debputy_integration_mode: None | (
545 Container[DebputyIntegrationMode]
546 ) = None,
547 unknown_keys_diagnostic_severity: Optional["LintSeverity"] = "error",
548 allow_unknown_keys: bool = False,
549 ) -> DispatchingObjectParser:
550 assert path not in self._in_package_context_parser
551 assert path not in self._object_parsers
552 object_parser = DispatchingObjectParser(
553 path,
554 parser_documentation=parser_documentation,
555 expected_debputy_integration_mode=expected_debputy_integration_mode,
556 unknown_keys_diagnostic_severity=unknown_keys_diagnostic_severity,
557 allow_unknown_keys=allow_unknown_keys,
558 )
559 self._object_parsers[path] = object_parser
560 return object_parser
562 def add_in_package_context_parser(
563 self,
564 path: str,
565 delegate: DeclarativeInputParser[Any],
566 ) -> None:
567 assert path not in self._in_package_context_parser
568 assert path not in self._object_parsers
569 self._in_package_context_parser[path] = InPackageContextParser(path, delegate)
571 @property
572 def dispatchable_table_parsers(
573 self,
574 ) -> Mapping[type[DebputyDispatchableType], DispatchingTableParser[Any]]:
575 return self._table_parsers
577 @property
578 def dispatchable_object_parsers(self) -> Mapping[str, DispatchingObjectParser]:
579 return self._object_parsers
581 def dispatch_parser_table_for(
582 self, rule_type: TTP
583 ) -> DispatchingTableParser[TP] | None:
584 return cast(
585 "Optional[DispatchingTableParser[TP]]", self._table_parsers.get(rule_type)
586 )
588 def generate_parser(
589 self,
590 parsed_content: type[TD],
591 *,
592 source_content: SF | None = None,
593 allow_optional: bool = False,
594 inline_reference_documentation: ParserDocumentation | None = None,
595 expected_debputy_integration_mode: None | (
596 Container[DebputyIntegrationMode]
597 ) = None,
598 automatic_docs: None | (
599 Mapping[type[Any], Sequence[StandardParserAttributeDocumentation]]
600 ) = None,
601 ) -> DeclarativeInputParser[TD]:
602 """Derive a parser from a TypedDict
604 Generates a parser for a segment of the manifest (think the `install-docs` snippet) from a TypedDict
605 or two that are used as a description.
607 In its simplest use-case, the caller provides a TypedDict of the expected attributes along with
608 their types. As an example:
610 >>> class InstallDocsRule(DebputyParsedContent):
611 ... sources: List[str]
612 ... into: List[str]
613 >>> pg = ParserGenerator()
614 >>> simple_parser = pg.generate_parser(InstallDocsRule)
616 This will create a parser that can interpret something like:
618 ```yaml
619 install-docs:
620 sources: ["docs/*"]
621 into: ["my-pkg"]
622 ```
624 While this is sufficient for programmers, it is a bit rigid for the packager writing the manifest. Therefore,
625 you can also provide a TypedDict describing the input, enabling more flexibility:
627 >>> class InstallDocsRule(DebputyParsedContent):
628 ... sources: List[str]
629 ... into: List[str]
630 >>> class InputDocsRuleInputFormat(TypedDict):
631 ... source: NotRequired[Annotated[str, DebputyParseHint.target_attribute("sources")]]
632 ... sources: NotRequired[List[str]]
633 ... into: Union[str, List[str]]
634 >>> pg = ParserGenerator()
635 >>> flexible_parser = pg.generate_parser(
636 ... InstallDocsRule,
637 ... source_content=InputDocsRuleInputFormat,
638 ... )
640 In this case, the `sources` field can either come from a single `source` in the manifest (which must be a string)
641 or `sources` (which must be a list of strings). The parser also ensures that only one of `source` or `sources`
642 is used, so the input is never ambiguous. For the `into` parameter, the parser will accept either a string
643 or a list of strings. Regardless of how the input was provided, the parser will normalize the input so that
644 both `sources` and `into` in the result are lists of strings. As an example, this parser can accept
645 both the previous input and the following input:
647 ```yaml
648 install-docs:
649 source: "docs/*"
650 into: "my-pkg"
651 ```
653 The `source` and `into` attributes are then normalized to lists as if the user had written them as lists
654 with a single string in them. As noted above, the name of the `source` attribute will also be normalized
655 while parsing.
657 In the cases where only one field is required by the user, it can sometimes make sense to allow a non-dict
658 as part of the input. Example:
660 >>> class DiscardRule(DebputyParsedContent):
661 ... paths: List[str]
662 >>> class DiscardRuleInputDictFormat(TypedDict):
663 ... path: NotRequired[Annotated[str, DebputyParseHint.target_attribute("paths")]]
664 ... paths: NotRequired[List[str]]
665 >>> # This format relies on DiscardRule having exactly one Required attribute
666 >>> DiscardRuleInputWithAltFormat = Union[
667 ... DiscardRuleInputDictFormat,
668 ... str,
669 ... List[str],
670 ... ]
671 >>> pg = ParserGenerator()
672 >>> flexible_parser = pg.generate_parser(
673 ... DiscardRule,
674 ... source_content=DiscardRuleInputWithAltFormat,
675 ... )
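 As an illustrative sketch only (the rule name `discard` is an assumption made for this example, not
 something this parser defines), such a parser would accept either of the following and normalize both
 into a `paths` list:

 ```yaml
 discard: "*.la"
 ```

 or

 ```yaml
 discard: ["*.la", "*.a"]
 ```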
678 Supported types:
679 * `List` - must have a fixed type argument (such as `List[str]`)
680 * `str`
681 * `int`
682 * `BinaryPackage` - When provided (or required), the user must provide a package name listed
683 in the debian/control file. The code receives the BinaryPackage instance
684 matching that input.
685 * `FileSystemMode` - When provided (or required), the user must provide a file system mode in any
686 format that `debputy` supports (such as `0644` or `a=rw,go=rw`); see the illustrative snippet after this list.
687 * `FileSystemOwner` - When provided (or required), the user must provide a file system owner that is
688 available statically on all Debian systems (must be in `base-passwd`).
689 The user has multiple options for how to specify it (either via name or id).
690 * `FileSystemGroup` - When provided (or required), the user must provide a file system group that is
691 available statically on all Debian systems (must be in `base-passwd`).
692 The user has multiple options for how to specify it (either via name or id).
693 * `ManifestCondition` - When provided (or required), the user must specify a conditional rule to apply.
694 Usually, it is better to extend `DebputyParsedContentStandardConditional`, which
695 provides the `debputy` default `when` parameter for conditionals.
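 Purely as an illustrative sketch (the rule and attribute names below are assumptions made for this example
 and not part of `debputy`'s manifest format), a snippet using some of these types could look like:

 ```yaml
 some-rule:
   into: my-pkg          # matched against a BinaryPackage from debian/control
   mode: "a=rw,go=rw"    # parsed as a FileSystemMode
   owner: root           # parsed as a FileSystemOwner (name or id accepted)
 ```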
697 Supported special type-like parameters:
699 * `Required` / `NotRequired` to mark a field as `Required` or `NotRequired`. Must be provided at the
700 outermost level. Cannot vary between `parsed_content` and `source_content`.
701 * `Annotated`. Accepted at the outermost level (inside Required/NotRequired) but ignored at the moment.
702 * `Union`. Must be at the outermost level (inside `Annotated` and/or `Required`/`NotRequired` if these are present).
703 Automapping (see below) is restricted to two members in the Union.
705 Notable unsupported types:
706 * `Mapping` and all variants thereof (such as `dict`). In the future, nested `TypedDict`s may be allowed.
707 * `Optional` (or `Union[..., None]`): Use `NotRequired` for optional fields.
709 Automatic mapping rules from `source_content` to `parsed_content`:
710 - `Union[T, List[T]]` can be narrowed automatically to `List[T]`. Transformation is basically:
711 `lambda value: value if isinstance(value, list) else [value]`
712 - `T` can be mapped automatically to `List[T]`; the transformation is `lambda value: [value]` (illustrated below).
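 As a conceptual sketch (purely illustrative; `as_list` below is not part of the API, it merely mimics
 the generated narrowing):

 >>> as_list = lambda value: value if isinstance(value, list) else [value]
 >>> as_list("docs/*")
 ['docs/*']
 >>> as_list(["docs/*", "docs/images/*"])
 ['docs/*', 'docs/images/*']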
714 Additionally, types can be annotated (`Annotated[str, ...]`) with `DebputyParseHint`s. Check its classmethods
715 for concrete features that may be useful to you.
717 :param parsed_content: A DebputyParsedContent / TypedDict describing the desired model of the input once parsed.
718 (DebputyParsedContent is a TypedDict subclass that works around some inadequate type checkers).
719 It can also be a `List[DebputyParsedContent]`. In that case, `source_content` must be a
720 `List[TypedDict[...]]`.
721 :param source_content: Optionally, a TypedDict describing the input allowed by the user. This can be useful
722 for describing more input variations than `parsed_content` allows, which the parser will normalize for you. If omitted,
723 the parsed_content is also considered the source_content (which affects what annotations are allowed in it).
724 Note you should never pass the parsed_content as source_content directly.
725 :param allow_optional: In rare cases, you want to support an explicit `None` value as opposed to simply omitting the attribute. In this case, you
726 should set this to True. Though, in 99.9% of all cases, you want `NotRequired` rather than `Optional` (and
727 can keep this False).
728 :param inline_reference_documentation: Optionally, programmatic documentation
729 :param expected_debputy_integration_mode: If provided, this declares the integration modes where the
730 result of the parser can be used. This is primarily useful for "fail-fast" on incorrect usage.
731 When the restriction is not satisfiable, the generated parser will trigger a parse error immediately
732 (resulting in a "compile time" failure rather than a "runtime" failure).
733 :return: An input parser capable of reading input matching the TypedDict(s) used as reference.
734 """
735 orig_parsed_content = parsed_content
736 if source_content is parsed_content: 736 ↛ 737 (line 736 didn't jump to line 737 because the condition on line 736 was never true)
737 raise ValueError(
738 "Do not provide source_content if it is the same as parsed_content"
739 )
740 is_list_wrapped = False
741 if get_origin(orig_parsed_content) == list:
742 parsed_content = get_args(orig_parsed_content)[0]
743 is_list_wrapped = True
745 if isinstance(parsed_content, type) and issubclass(
746 parsed_content, DebputyDispatchableType
747 ):
748 parser = self.dispatch_parser_table_for(parsed_content)
749 if parser is None: 749 ↛ 750 (line 749 didn't jump to line 750 because the condition on line 749 was never true)
750 raise ValueError(
751 f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}."
752 f" The class {parsed_content.__qualname__} is not a pre-registered type."
753 )
754 # FIXME: Only the list wrapped version has documentation.
755 if is_list_wrapped: 755 ↛ 761 (line 755 didn't jump to line 761 because the condition on line 755 was always true)
756 parser = ListWrappedDeclarativeInputParser(
757 parser,
758 inline_reference_documentation=inline_reference_documentation,
759 expected_debputy_integration_mode=expected_debputy_integration_mode,
760 )
761 return parser
763 if not is_typeddict(parsed_content): 763 ↛ 764 (line 763 didn't jump to line 764 because the condition on line 763 was never true)
764 raise ValueError(
765 f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}."
766 ' Only "TypedDict"-based types and a subset of "DebputyDispatchableType" are supported.'
767 )
768 if is_list_wrapped and source_content is not None:
769 if get_origin(source_content) != list: 769 ↛ 770 (line 769 didn't jump to line 770 because the condition on line 769 was never true)
770 raise ValueError(
771 "If the parsed_content is a List type, then source_format must be a List type as well."
772 )
773 source_content = get_args(source_content)[0]
775 target_attributes = self._parse_types(
776 parsed_content,
777 allow_source_attribute_annotations=source_content is None,
778 forbid_optional=not allow_optional,
779 )
780 required_target_parameters = frozenset(parsed_content.__required_keys__)
781 parsed_alt_form = None
782 non_mapping_source_only = False
784 if source_content is not None:
785 default_target_attribute = None
786 if len(required_target_parameters) == 1:
787 default_target_attribute = next(iter(required_target_parameters))
789 source_typed_dict, alt_source_forms = _extract_typed_dict(
790 source_content,
791 default_target_attribute,
792 )
793 if alt_source_forms:
794 parsed_alt_form = self._parse_alt_form(
795 alt_source_forms,
796 default_target_attribute,
797 )
798 if source_typed_dict is not None:
799 source_content_attributes = self._parse_types(
800 source_typed_dict,
801 allow_target_attribute_annotation=True,
802 allow_source_attribute_annotations=True,
803 forbid_optional=not allow_optional,
804 )
805 source_content_parameter = "source_content"
806 source_and_parsed_differs = True
807 else:
808 source_typed_dict = parsed_content
809 source_content_attributes = target_attributes
810 source_content_parameter = "parsed_content"
811 source_and_parsed_differs = True
812 non_mapping_source_only = True
813 else:
814 source_typed_dict = parsed_content
815 source_content_attributes = target_attributes
816 source_content_parameter = "parsed_content"
817 source_and_parsed_differs = False
819 sources = collections.defaultdict(set)
820 seen_targets = set()
821 seen_source_names: dict[str, str] = {}
822 source_attributes: dict[str, AttributeDescription] = {}
823 path_hint_source_attributes = []
825 for k in source_content_attributes:
826 ia = source_content_attributes[k]
828 ta = (
829 target_attributes.get(ia.target_attribute)
830 if source_and_parsed_differs
831 else ia
832 )
833 if ta is None: 833 ↛ 835 (line 833 didn't jump to line 835 because the condition on line 833 was never true)
834 # Error message would be wrong if this assertion is false.
835 assert source_and_parsed_differs
836 raise ValueError(
837 f'The attribute "{k}" from the "source_content" parameter should have mapped'
838 f' to "{ia.target_attribute}", but that parameter does not exist in "parsed_content"'
839 )
840 if _is_path_attribute_candidate(ia, ta):
841 path_hint_source_attributes.append(ia.source_attribute_name)
842 existing_source_name = seen_source_names.get(ia.source_attribute_name)
843 if existing_source_name: 843 ↛ 844 (line 843 didn't jump to line 844 because the condition on line 843 was never true)
844 raise ValueError(
845 f'The attribute "{k}" and "{existing_source_name}" both share the source name'
846 f' "{ia.source_attribute_name}". Please change the {source_content_parameter} parameter,'
847 f' so only one attribute uses "{ia.source_attribute_name}".'
848 )
849 seen_source_names[ia.source_attribute_name] = k
850 seen_targets.add(ta.target_attribute)
851 sources[ia.target_attribute].add(k)
852 if source_and_parsed_differs:
853 bridge_mapper = self._type_normalize(
854 k, ia.attribute_type, ta.attribute_type, False
855 )
856 ia.type_validator = ia.type_validator.combine_mapper(bridge_mapper)
857 source_attributes[k] = ia
859 def _as_attr_names(td_name: Iterable[str]) -> frozenset[str]:
860 return frozenset(
861 source_content_attributes[a].source_attribute_name for a in td_name
862 )
864 _check_attributes(
865 parsed_content,
866 source_typed_dict,
867 source_content_attributes,
868 sources,
869 )
871 at_least_one_of = frozenset(
872 _as_attr_names(g)
873 for k, g in sources.items()
874 if len(g) > 1 and k in required_target_parameters
875 )
877 if source_and_parsed_differs and seen_targets != target_attributes.keys(): 877 ↛ 878 (line 877 didn't jump to line 878 because the condition on line 877 was never true)
878 missing = ", ".join(
879 repr(k) for k in (target_attributes.keys() - seen_targets)
880 )
881 raise ValueError(
882 'The following attributes in "parsed_content" did not have a source field in "source_content":'
883 f" {missing}"
884 )
885 all_mutually_exclusive_fields = frozenset(
886 _as_attr_names(g) for g in sources.values() if len(g) > 1
887 )
889 all_parameters = (
890 source_typed_dict.__required_keys__ | source_typed_dict.__optional_keys__
891 )
892 _check_conflicts(
893 source_content_attributes,
894 source_typed_dict.__required_keys__,
895 all_parameters,
896 )
898 manifest_attributes = {
899 a.source_attribute_name: a for a in source_content_attributes.values()
900 }
902 if parsed_alt_form is not None:
903 target_attribute = parsed_alt_form.target_attribute
904 if ( 904 ↛ 909 (line 904 didn't jump to line 909 because the condition on line 904 was never true)
905 target_attribute not in required_target_parameters
906 and required_target_parameters
907 or len(required_target_parameters) > 1
908 ):
909 raise NotImplementedError(
910 "When using alternative source formats (Union[TypedDict, ...]), then the"
911 " target must have at most one require parameter"
912 )
913 bridge_mapper = self._type_normalize(
914 target_attribute,
915 parsed_alt_form.attribute_type,
916 target_attributes[target_attribute].attribute_type,
917 False,
918 )
919 parsed_alt_form.type_validator = (
920 parsed_alt_form.type_validator.combine_mapper(bridge_mapper)
921 )
923 inline_reference_documentation = (
924 _verify_and_auto_correct_inline_reference_documentation(
925 parsed_content,
926 source_typed_dict,
927 source_content_attributes,
928 inline_reference_documentation,
929 parsed_alt_form is not None,
930 automatic_docs,
931 )
932 )
933 if non_mapping_source_only:
934 parser = DeclarativeNonMappingInputParser(
935 assume_not_none(parsed_alt_form),
936 inline_reference_documentation=inline_reference_documentation,
937 expected_debputy_integration_mode=expected_debputy_integration_mode,
938 )
939 else:
940 parser = DeclarativeMappingInputParser(
941 _as_attr_names(source_typed_dict.__required_keys__),
942 _as_attr_names(all_parameters),
943 manifest_attributes,
944 source_attributes,
945 mutually_exclusive_attributes=all_mutually_exclusive_fields,
946 alt_form_parser=parsed_alt_form,
947 at_least_one_of=at_least_one_of,
948 inline_reference_documentation=inline_reference_documentation,
949 path_hint_source_attributes=tuple(path_hint_source_attributes),
950 expected_debputy_integration_mode=expected_debputy_integration_mode,
951 )
952 if is_list_wrapped:
953 parser = ListWrappedDeclarativeInputParser(
954 parser,
955 expected_debputy_integration_mode=expected_debputy_integration_mode,
956 )
957 return parser
959 def _as_type_validator(
960 self,
961 attribute: str,
962 provided_type: Any,
963 parsing_typed_dict_attribute: bool,
964 ) -> AttributeTypeHandler:
965 assert not isinstance(provided_type, tuple)
967 if isinstance(provided_type, type) and issubclass(
968 provided_type, DebputyDispatchableType
969 ):
970 return _dispatch_parser(provided_type)
972 unmapped_type = self._strip_mapped_types(
973 provided_type,
974 parsing_typed_dict_attribute,
975 )
976 type_normalizer = self._type_normalize(
977 attribute,
978 unmapped_type,
979 provided_type,
980 parsing_typed_dict_attribute,
981 )
982 t_unmapped, t_unmapped_orig, t_unmapped_args = unpack_type(
983 unmapped_type,
984 parsing_typed_dict_attribute,
985 )
986 _, t_provided_orig, t_provided_args = unpack_type(
987 provided_type,
988 parsing_typed_dict_attribute,
989 )
991 if ( 991 ↛ 997 (line 991 didn't jump to line 997 because the condition on line 991 was never true)
992 t_unmapped_orig == Union
993 and t_unmapped_args
994 and len(t_unmapped_args) == 2
995 and any(v is _NONE_TYPE for v in t_unmapped_args)
996 ):
997 _, _, args = unpack_type(provided_type, parsing_typed_dict_attribute)
998 actual_type = [a for a in args if a is not _NONE_TYPE][0]
999 validator = self._as_type_validator(
1000 attribute, actual_type, parsing_typed_dict_attribute
1001 )
1003 def _validator(v: Any, path: AttributePath) -> None:
1004 if v is None:
1005 return
1006 validator.ensure_type(v, path)
1008 return AttributeTypeHandler(
1009 validator.describe_type(),
1010 _validator,
1011 base_type=validator.base_type,
1012 mapper=type_normalizer,
1013 )
1015 if unmapped_type in BASIC_SIMPLE_TYPES:
1016 type_name = BASIC_SIMPLE_TYPES[unmapped_type]
1018 type_mapping = self._registered_types.get(provided_type)
1019 if type_mapping is not None:
1020 simple_type = f" ({type_name})"
1021 type_name = type_mapping.target_type.__name__
1022 else:
1023 simple_type = ""
1025 def _validator(v: Any, path: AttributePath) -> None:
1026 if not isinstance(v, unmapped_type):
1027 _validation_type_error(
1028 path, f"The attribute must be a {type_name}{simple_type}"
1029 )
1031 return AttributeTypeHandler(
1032 type_name,
1033 _validator,
1034 base_type=unmapped_type,
1035 mapper=type_normalizer,
1036 )
1037 if t_unmapped_orig == list:
1038 if not t_unmapped_args: 1038 ↛ 1039 (line 1038 didn't jump to line 1039 because the condition on line 1038 was never true)
1039 raise ValueError(
1040 f'The attribute "{attribute}" is List but does not have Generics (Must use List[X])'
1041 )
1043 generic_type = t_unmapped_args[0]
1044 key_mapper = self._as_type_validator(
1045 attribute,
1046 generic_type,
1047 parsing_typed_dict_attribute,
1048 )
1050 def _validator(v: Any, path: AttributePath) -> None:
1051 if not isinstance(v, list): 1051 ↛ 1052 (line 1051 didn't jump to line 1052 because the condition on line 1051 was never true)
1052 _validation_type_error(path, "The attribute must be a list")
1053 for i, v in enumerate(v):
1054 key_mapper.ensure_type(v, path[i])
1056 list_mapper = (
1057 map_each_element(key_mapper.mapper)
1058 if key_mapper.mapper is not None
1059 else None
1060 )
1062 return AttributeTypeHandler(
1063 f"List of {key_mapper.describe_type()}",
1064 _validator,
1065 base_type=list,
1066 mapper=type_normalizer,
1067 ).combine_mapper(list_mapper)
1068 if is_typeddict(provided_type):
1069 subparser = self.generate_parser(cast("Type[TD]", provided_type))
1070 return AttributeTypeHandler(
1071 description=f"{provided_type.__name__} (Typed Mapping)",
1072 ensure_type=lambda v, ap: None,
1073 base_type=dict,
1074 mapper=lambda v, ap, cv: subparser.parse_input(
1075 v, ap, parser_context=cv
1076 ),
1077 )
1078 if t_unmapped_orig == dict:
1079 if not t_unmapped_args or len(t_unmapped_args) != 2: 1079 ↛ 1080 (line 1079 didn't jump to line 1080 because the condition on line 1079 was never true)
1080 raise ValueError(
1081 f'The attribute "{attribute}" is Dict but does not have Generics (Must use Dict[str, Y])'
1082 )
1083 if t_unmapped_args[0] != str: 1083 ↛ 1084 (line 1083 didn't jump to line 1084 because the condition on line 1083 was never true)
1084 raise ValueError(
1085 f'The attribute "{attribute}" is Dict and has a non-str type as key.'
1086 " Currently, only `str` is supported (Dict[str, Y])"
1087 )
1088 key_mapper = self._as_type_validator(
1089 attribute,
1090 t_unmapped_args[0],
1091 parsing_typed_dict_attribute,
1092 )
1093 value_mapper = self._as_type_validator(
1094 attribute,
1095 t_unmapped_args[1],
1096 parsing_typed_dict_attribute,
1097 )
1099 if key_mapper.base_type is None: 1099 ↛ 1100 (line 1099 didn't jump to line 1100 because the condition on line 1099 was never true)
1100 raise ValueError(
1101 f'The attribute "{attribute}" is Dict and the key did not have a trivial base type. Key types'
1102 f" without trivial base types (such as `str`) are not supported at the moment."
1103 )
1105 if value_mapper.mapper is not None: 1105 ↛ 1106 (line 1105 didn't jump to line 1106 because the condition on line 1105 was never true)
1106 raise ValueError(
1107 f'The attribute "{attribute}" is Dict and the value requires mapping.'
1108 " Currently, this is not supported. Consider a simpler type (such as Dict[str, str] or Dict[str, Any])."
1109 " Better typing may come later"
1110 )
1112 def _validator(uv: Any, path: AttributePath) -> None:
1113 if not isinstance(uv, dict): 1113 ↛ 1114 (line 1113 didn't jump to line 1114 because the condition on line 1113 was never true)
1114 _validation_type_error(path, "The attribute must be a mapping")
1115 key_name = "the first key in the mapping"
1116 for i, (k, v) in enumerate(uv.items()):
1117 if not key_mapper.base_type_match(k): 1117 ↛ 1118 (line 1117 didn't jump to line 1118 because the condition on line 1117 was never true)
1118 kp = path.copy_with_path_hint(key_name)
1119 _validation_type_error(
1120 kp,
1121 f'The key number {i + 1} in attribute "{kp}" must be a {key_mapper.describe_type()}',
1122 )
1123 key_name = f"the key after {k}"
1124 value_mapper.ensure_type(v, path[k])
1126 return AttributeTypeHandler(
1127 f"Mapping of {value_mapper.describe_type()}",
1128 _validator,
1129 base_type=dict,
1130 mapper=type_normalizer,
1131 ).combine_mapper(key_mapper.mapper)
1132 if t_unmapped_orig in (Union, UnionType):
1133 if _is_two_arg_x_list_x(t_provided_args):
1134 # Force the order to be "X, List[X]" as it simplifies the code
1135 x_list_x = (
1136 t_provided_args
1137 if get_origin(t_provided_args[1]) == list
1138 else (t_provided_args[1], t_provided_args[0])
1139 )
1141 # X, List[X] could match if X was List[Y]. However, our code below assumes
1142 # that X is a non-list. The `_is_two_arg_x_list_x` returns False for this
1143 # case to avoid this assert and fall into the "generic case".
1144 assert get_origin(x_list_x[0]) != list
1145 x_subtype_checker = self._as_type_validator(
1146 attribute,
1147 x_list_x[0],
1148 parsing_typed_dict_attribute,
1149 )
1150 list_x_subtype_checker = self._as_type_validator(
1151 attribute,
1152 x_list_x[1],
1153 parsing_typed_dict_attribute,
1154 )
1155 type_description = x_subtype_checker.describe_type()
1156 type_description = f"{type_description} or a list of {type_description}"
1158 def _validator(v: Any, path: AttributePath) -> None:
1159 if isinstance(v, list):
1160 list_x_subtype_checker.ensure_type(v, path)
1161 else:
1162 x_subtype_checker.ensure_type(v, path)
1164 return AttributeTypeHandler(
1165 type_description,
1166 _validator,
1167 mapper=type_normalizer,
1168 )
1169 else:
1170 subtype_checker = [
1171 self._as_type_validator(attribute, a, parsing_typed_dict_attribute)
1172 for a in t_unmapped_args
1173 ]
1174 type_description = "one-of: " + ", ".join(
1175 f"{sc.describe_type()}" for sc in subtype_checker
1176 )
1177 mapper = subtype_checker[0].mapper
1178 if any(mapper != sc.mapper for sc in subtype_checker): 1178 ↛ 1179 (line 1178 didn't jump to line 1179 because the condition on line 1178 was never true)
1179 raise ValueError(
1180 f'Cannot handle the union "{provided_type}" as the target types need different'
1181 " type normalization/mapping logic. Unions are generally limited to Union[X, List[X]]"
1182 " where X is a non-collection type."
1183 )
1185 def _validator(v: Any, path: AttributePath) -> None:
1186 partial_matches = []
1187 for sc in subtype_checker: 1187 ↛ 1195 (line 1187 didn't jump to line 1195 because the loop on line 1187 didn't complete)
1188 try:
1189 sc.ensure_type(v, path)
1190 return
1191 except ManifestParseException as e:
1192 if sc.base_type_match(v): 1192 ↛ 1193 (line 1192 didn't jump to line 1193 because the condition on line 1192 was never true)
1193 partial_matches.append((sc, e))
1195 if len(partial_matches) == 1:
1196 raise partial_matches[0][1]
1197 _validation_type_error(
1198 path, f"Could not match against: {type_description}"
1199 )
1201 return AttributeTypeHandler(
1202 type_description,
1203 _validator,
1204 mapper=type_normalizer,
1205 )
1206 if t_unmapped_orig == Literal:
1207 # We want `x` for string values; repr would give 'x'
1208 pretty = ", ".join(
1209 f"`{v}`" if isinstance(v, str) else str(v) for v in t_unmapped_args
1210 )
1212 def _validator(v: Any, path: AttributePath) -> None:
1213 if v not in t_unmapped_args:
1214 value_hint = ""
1215 if isinstance(v, str): 1215 ↛ 1217 (line 1215 didn't jump to line 1217 because the condition on line 1215 was always true)
1216 value_hint = f"({v}) "
1217 _validation_type_error(
1218 path,
1219 f"Value {value_hint}must be one of the following literal values: {pretty}",
1220 )
1222 return AttributeTypeHandler(
1223 f"One of the following literal values: {pretty}",
1224 _validator,
1225 )
1227 if provided_type == Any: 1227 ↛ 1232 (line 1227 didn't jump to line 1232 because the condition on line 1227 was always true)
1228 return AttributeTypeHandler(
1229 "any (unvalidated)",
1230 lambda *a: None,
1231 )
1232 raise ValueError(
1233 f'The attribute "{attribute}" had/contained a type {provided_type}, which is not supported'
1234 )
1236 def _parse_types(
1237 self,
1238 spec: type[TypedDict],
1239 allow_target_attribute_annotation: bool = False,
1240 allow_source_attribute_annotations: bool = False,
1241 forbid_optional: bool = True,
1242 ) -> dict[str, AttributeDescription]:
1243 annotations = get_type_hints(spec, include_extras=True)
1244 return {
1245 k: self._attribute_description(
1246 k,
1247 t,
1248 k in spec.__required_keys__,
1249 allow_target_attribute_annotation=allow_target_attribute_annotation,
1250 allow_source_attribute_annotations=allow_source_attribute_annotations,
1251 forbid_optional=forbid_optional,
1252 )
1253 for k, t in annotations.items()
1254 }
1256 def _attribute_description(
1257 self,
1258 attribute: str,
1259 orig_td: Any,
1260 is_required: bool,
1261 forbid_optional: bool = True,
1262 allow_target_attribute_annotation: bool = False,
1263 allow_source_attribute_annotations: bool = False,
1264 ) -> AttributeDescription:
1265 td, anno, is_optional = _parse_type(
1266 attribute, orig_td, forbid_optional=forbid_optional
1267 )
1268 type_validator = self._as_type_validator(attribute, td, True)
1269 parsed_annotations = DetectedDebputyParseHint.parse_annotations(
1270 anno,
1271 f' Seen with attribute "{attribute}".',
1272 attribute,
1273 is_required,
1274 allow_target_attribute_annotation=allow_target_attribute_annotation,
1275 allow_source_attribute_annotations=allow_source_attribute_annotations,
1276 )
1277 return AttributeDescription(
1278 target_attribute=parsed_annotations.target_attribute,
1279 attribute_type=td,
1280 type_validator=type_validator,
1281 annotations=anno,
1282 is_optional=is_optional,
1283 conflicting_attributes=parsed_annotations.conflict_with_source_attributes,
1284 conditional_required=parsed_annotations.conditional_required,
1285 source_attribute_name=assume_not_none(
1286 parsed_annotations.source_manifest_attribute
1287 ),
1288 parse_hints=parsed_annotations,
1289 )
1291 def _parse_alt_form(
1292 self,
1293 alt_form,
1294 default_target_attribute: str | None,
1295 ) -> AttributeDescription:
1296 td, anno, is_optional = _parse_type(
1297 "source_format alternative form",
1298 alt_form,
1299 forbid_optional=True,
1300 parsing_typed_dict_attribute=False,
1301 )
1302 type_validator = self._as_type_validator(
1303 "source_format alternative form",
1304 td,
1305 True,
1306 )
1307 parsed_annotations = DetectedDebputyParseHint.parse_annotations(
1308 anno,
1309 " The alternative for source_format.",
1310 None,
1311 False,
1312 default_target_attribute=default_target_attribute,
1313 allow_target_attribute_annotation=True,
1314 allow_source_attribute_annotations=False,
1315 )
1316 return AttributeDescription(
1317 target_attribute=parsed_annotations.target_attribute,
1318 attribute_type=td,
1319 type_validator=type_validator,
1320 annotations=anno,
1321 is_optional=is_optional,
1322 conflicting_attributes=parsed_annotations.conflict_with_source_attributes,
1323 conditional_required=parsed_annotations.conditional_required,
1324 source_attribute_name="Alt form of the source_format",
1325 )
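# Note on _union_narrowing (below): for an input of the shape Union[X, List[X]] and a target of List[Y] (or a
# Union containing those shapes), it returns a mapper that first normalizes the value into a list and then, when
# X is the registered source type for Y, converts each element via the registered TypeMapping. Unsupported
# combinations return None, letting _type_normalize try its other mappings (or raise).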
1327 def _union_narrowing(
1328 self,
1329 input_type: Any,
1330 target_type: Any,
1331 parsing_typed_dict_attribute: bool,
1332 ) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any] | None:
1333 _, input_orig, input_args = unpack_type(
1334 input_type, parsing_typed_dict_attribute
1335 )
1336 _, target_orig, target_args = unpack_type(
1337 target_type, parsing_typed_dict_attribute
1338 )
1340 if input_orig not in (Union, UnionType) or not input_args: 1340 ↛ 1341 (line 1340 didn't jump to line 1341 because the condition on line 1340 was never true)
1341 raise ValueError("input_type must be a Union[...] with non-empty args")
1343 # Currently, we only support Union[X, List[X]] -> List[Y] narrowing or Union[X, List[X]] -> Union[Y, List[Y]]
1344 # - Where X = Y or there is a simple standard transformation from X to Y.
1346 if target_orig not in (Union, UnionType, list) or not target_args:
1347 # Not supported
1348 return None
1350 if target_orig in (Union, UnionType) and set(input_args) == set(target_args): 1350 ↛ 1352 (line 1350 didn't jump to line 1352 because the condition on line 1350 was never true)
1351 # Not needed (identity mapping)
1352 return None
1354 if target_orig == list and not any(get_origin(a) == list for a in input_args): 1354 ↛ 1356 (line 1354 didn't jump to line 1356 because the condition on line 1354 was never true)
1355 # Not supported
1356 return None
1358 target_arg = target_args[0]
1359 simplified_type = self._strip_mapped_types(
1360 target_arg, parsing_typed_dict_attribute
1361 )
1362 acceptable_types = {
1363 target_arg,
1364 list[target_arg], # type: ignore
1365 List[target_arg], # type: ignore
1366 simplified_type,
1367 list[simplified_type], # type: ignore
1368 List[simplified_type], # type: ignore
1369 }
1370 target_format = (
1371 target_arg,
1372 list[target_arg], # type: ignore
1373 List[target_arg], # type: ignore
1374 )
1375 in_target_format = 0
1376 in_simple_format = 0
1377 for input_arg in input_args:
1378 if input_arg not in acceptable_types: 1378 ↛ 1380 (line 1378 didn't jump to line 1380 because the condition on line 1378 was never true)
1379 # Not supported
1380 return None
1381 if input_arg in target_format:
1382 in_target_format += 1
1383 else:
1384 in_simple_format += 1
1386 assert in_simple_format or in_target_format
1388 if in_target_format and not in_simple_format:
1389 # Union[X, List[X]] -> List[X]
1390 return normalize_into_list
1391 mapped = self._registered_types[target_arg]
1392 if not in_target_format and in_simple_format: 1392 ↛ 1407 (line 1392 didn't jump to line 1407 because the condition on line 1392 was always true)
1393 # Union[X, List[X]] -> List[Y]
1395 def _mapper_x_list_y(
1396 x: Any | list[Any],
1397 ap: AttributePath,
1398 pc: Optional["ParserContextData"],
1399 ) -> list[Any]:
1400 in_list_form: list[Any] = normalize_into_list(x, ap, pc)
1402 return [mapped.mapper(x, ap, pc) for x in in_list_form]
1404 return _mapper_x_list_y
1406 # Union[Y, List[X]] -> List[Y]
1407 if not isinstance(target_arg, type):
1408 raise ValueError(
1409 f"Cannot narrow {input_type} -> {target_type}: The automatic conversion does"
1410 f" not support mixed types. Please use either {simplified_type} or {target_arg}"
1411 f" in the source content (but both a mix of both)"
1412 )
1414 def _mapper_mixed_list_y(
1415 x: Any | list[Any],
1416 ap: AttributePath,
1417 pc: Optional["ParserContextData"],
1418 ) -> list[Any]:
1419 in_list_form: list[Any] = normalize_into_list(x, ap, pc)
1421 return [
1422 x if isinstance(x, target_arg) else mapped.mapper(x, ap, pc)
1423 for x in in_list_form
1424 ]
1426 return _mapper_mixed_list_y
1428 def _type_normalize(
1429 self,
1430 attribute: str,
1431 input_type: Any,
1432 target_type: Any,
1433 parsing_typed_dict_attribute: bool,
1434 ) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any] | None:
1435 if input_type == target_type:
1436 return None
1437 _, input_orig, input_args = unpack_type(
1438 input_type, parsing_typed_dict_attribute
1439 )
1440 _, target_orig, target_args = unpack_type(
1441 target_type,
1442 parsing_typed_dict_attribute,
1443 )
1444 if input_orig in (Union, UnionType):
1445 result = self._union_narrowing(
1446 input_type, target_type, parsing_typed_dict_attribute
1447 )
1448 if result:
1449 return result
1450 elif target_orig == list and target_args[0] == input_type:
1451 return wrap_into_list
1453 mapped = self._registered_types.get(target_type)
1454 if mapped is not None and input_type == mapped.source_type:
1455 # Source -> Target
1456 return mapped.mapper
1457 if target_orig == list and target_args: 1457 ↛ 1475 (line 1457 didn't jump to line 1475 because the condition on line 1457 was always true)
1458 mapped = self._registered_types.get(target_args[0])
1459 if mapped is not None: 1459 ↛ 1475 (line 1459 didn't jump to line 1475 because the condition on line 1459 was always true)
1460 # mypy is dense and forgot `mapped` cannot be optional in the comprehensions.
1461 mapped_type: TypeMapping = mapped
1462 if input_type == mapped.source_type: 1462 ↛ 1464 (line 1462 didn't jump to line 1464 because the condition on line 1462 was never true)
1463 # Source -> List[Target]
1464 return lambda x, ap, pc: [mapped_type.mapper(x, ap, pc)]
1465 if ( 1465 ↛ 1475 (line 1465 didn't jump to line 1475 because the condition on line 1465 was always true)
1466 input_orig == list
1467 and input_args
1468 and input_args[0] == mapped_type.source_type
1469 ):
1470 # List[Source] -> List[Target]
1471 return lambda xs, ap, pc: [
1472 mapped_type.mapper(x, ap, pc) for x in xs
1473 ]
1475 raise ValueError(
1476 f'Unsupported type normalization for "{attribute}": Cannot automatically map/narrow'
1477 f" {input_type} to {target_type}"
1478 )
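# Note on _strip_mapped_types (below): registered target types are replaced by their declared source types,
# recursing through list[...] and Union[...], so validation is performed against the type the manifest author
# actually writes before the TypeMapping converts it.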
1480 def _strip_mapped_types(
1481 self, orig_td: Any, parsing_typed_dict_attribute: bool
1482 ) -> Any:
1483 m = self._registered_types.get(orig_td)
1484 if m is not None:
1485 return m.source_type
1486 _, v, args = unpack_type(orig_td, parsing_typed_dict_attribute)
1487 if v == list:
1488 arg = args[0]
1489 m = self._registered_types.get(arg)
1490 if m:
1491 return list[m.source_type] # type: ignore
1492 if v in (Union, UnionType):
1493 stripped_args = tuple(
1494 self._strip_mapped_types(x, parsing_typed_dict_attribute) for x in args
1495 )
1496 if stripped_args != args:
1497 return Union[stripped_args]
1498 return orig_td
1501def _sort_key(attr: StandardParserAttributeDocumentation) -> Any:
1502 key = next(iter(attr.attributes))
1503 return attr.sort_category, key
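# Note on _apply_std_docs (below): it walks the source TypedDict's __orig_bases__ (transitively) and pulls in
# any standard attribute documentation registered for those bases, skipping attributes the plugin already
# documented itself; plugin-provided docs are kept first in the resulting sequence.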
1506def _apply_std_docs(
1507 std_doc_table: (
1508 Mapping[type[Any], Sequence[StandardParserAttributeDocumentation]] | None
1509 ),
1510 source_format_typed_dict: type[Any],
1511 attribute_docs: Sequence[ParserAttributeDocumentation] | None,
1512) -> Sequence[ParserAttributeDocumentation] | None:
1513 if std_doc_table is None or not std_doc_table: 1513 ↛ 1516 (line 1513 didn't jump to line 1516 because the condition on line 1513 was always true)
1514 return attribute_docs
1516 has_docs_for = set()
1517 if attribute_docs:
1518 for attribute_doc in attribute_docs:
1519 has_docs_for.update(attribute_doc.attributes)
1521 base_seen = set()
1522 std_docs_used = []
1524 remaining_bases = set(getattr(source_format_typed_dict, "__orig_bases__", []))
1525 base_seen.update(remaining_bases)
1526 while remaining_bases:
1527 base = remaining_bases.pop()
1528 new_bases_to_check = {
1529 x for x in getattr(base, "__orig_bases__", []) if x not in base_seen
1530 }
1531 remaining_bases.update(new_bases_to_check)
1532 base_seen.update(new_bases_to_check)
1533 std_docs = std_doc_table.get(base)
1534 if std_docs:
1535 for std_doc in std_docs:
1536 if any(a in has_docs_for for a in std_doc.attributes):
1537 # If there is any overlap, do not add the docs
1538 continue
1539 has_docs_for.update(std_doc.attributes)
1540 std_docs_used.append(std_doc)
1542 if not std_docs_used:
1543 return attribute_docs
1544 docs = sorted(std_docs_used, key=_sort_key)
1545 if attribute_docs:
1546 # Plugin provided attributes first
1547 c = list(attribute_docs)
1548 c.extend(docs)
1549 docs = c
1550 return tuple(docs)
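The standard-documentation merge above only finds entries for base classes it can discover through __orig_bases__. A small, hypothetical illustration of that introspection; the class names are made up, and on Python versions before 3.12 TypedDict subclasses may not expose __orig_bases__ at all, which is exactly why the lookup above uses getattr with a default.

from typing import TypedDict

class FileSystemMode(TypedDict):  # stand-in for a shared base source format
    mode: str

class InstallExamplesSourceFormat(FileSystemMode):  # stand-in for a rule's source format
    sources: list[str]

bases = set(getattr(InstallExamplesSourceFormat, "__orig_bases__", ()))
# On Python 3.12+ this prints a set containing FileSystemMode, so any standard
# attribute documentation registered for that base would be pulled in.
print(bases)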
1553def _verify_and_auto_correct_inline_reference_documentation(
1554 parsed_content: type[TD],
1555 source_typed_dict: type[Any],
1556 source_content_attributes: Mapping[str, AttributeDescription],
1557 inline_reference_documentation: ParserDocumentation | None,
1558 has_alt_form: bool,
1559 automatic_docs: (
1560 Mapping[type[Any], Sequence[StandardParserAttributeDocumentation]] | None
1561 ) = None,
1562) -> ParserDocumentation | None:
1563 orig_attribute_docs = (
1564 inline_reference_documentation.attribute_doc
1565 if inline_reference_documentation
1566 else None
1567 )
1568 attribute_docs = _apply_std_docs(
1569 automatic_docs,
1570 source_typed_dict,
1571 orig_attribute_docs,
1572 )
1573 if inline_reference_documentation is None and attribute_docs is None:
1574 return None
1575 changes = {}
1576 if attribute_docs:
1577 seen = set()
1578 had_any_custom_docs = False
1579 for attr_doc in attribute_docs:
1580 if not isinstance(attr_doc, StandardParserAttributeDocumentation):
1581 had_any_custom_docs = True
1582 for attr_name in attr_doc.attributes:
1583 attr = source_content_attributes.get(attr_name)
1584 if attr is None: 1584 ↛ 1585: line 1584 didn't jump to line 1585 because the condition on line 1584 was never true
1585 raise ValueError(
1586 f"The inline_reference_documentation for the source format of {parsed_content.__qualname__}"
1587 f' references an attribute "{attr_name}", which does not exist in the source format.'
1588 )
1589 if attr_name in seen: 1589 ↛ 1590: line 1589 didn't jump to line 1590 because the condition on line 1589 was never true
1590 raise ValueError(
1591 f"The inline_reference_documentation for the source format of {parsed_content.__qualname__}"
1592 f' has documentation for "{attr_name}" twice, which is not supported.'
1593 f" Please document it at most once"
1594 )
1595 seen.add(attr_name)
1596 undocumented = source_content_attributes.keys() - seen
1597 if undocumented: 1597 ↛ 1598: line 1597 didn't jump to line 1598 because the condition on line 1597 was never true
1598 if had_any_custom_docs:
1599 undocumented_attrs = ", ".join(undocumented)
1600 raise ValueError(
1601 f"The following attributes were not documented for the source format of"
1602 f" {parsed_content.__qualname__}. If this is deliberate, then please"
1603 ' declare each of them as undocumented (via undocumented_attr("foo")):'
1604 f" {undocumented_attrs}"
1605 )
1606 combined_docs = list(attribute_docs)
1607 combined_docs.extend(undocumented_attr(a) for a in sorted(undocumented))
1608 attribute_docs = combined_docs
1610 if attribute_docs and orig_attribute_docs != attribute_docs: 1610 ↛ 1611: line 1610 didn't jump to line 1611 because the condition on line 1610 was never true
1611 assert attribute_docs is not None
1612 changes["attribute_doc"] = tuple(attribute_docs)
1614 if ( 1614 ↛ 1619: line 1614 didn't jump to line 1619 because the condition on line 1614 was never true
1615 inline_reference_documentation is not None
1616 and inline_reference_documentation.alt_parser_description
1617 and not has_alt_form
1618 ):
1619 raise ValueError(
1620 "The inline_reference_documentation had documentation for an non-mapping format,"
1621 " but the source format does not have a non-mapping format."
1622 )
1623 if changes: 1623 ↛ 1624: line 1623 didn't jump to line 1624 because the condition on line 1623 was never true
1624 if inline_reference_documentation is None:
1625 inline_reference_documentation = reference_documentation()
1626 return inline_reference_documentation.replace(**changes)
1627 return inline_reference_documentation
1630def _check_conflicts(
1631 input_content_attributes: dict[str, AttributeDescription],
1632 required_attributes: frozenset[str],
1633 all_attributes: frozenset[str],
1634) -> None:
1635 for attr_name, attr in input_content_attributes.items():
1636 if attr_name in required_attributes and attr.conflicting_attributes: 1636 ↛ 1637: line 1636 didn't jump to line 1637 because the condition on line 1636 was never true
1637 c = ", ".join(repr(a) for a in attr.conflicting_attributes)
1638 raise ValueError(
1639 f'The attribute "{attr_name}" is required and conflicts with the attributes: {c}.'
1640 " This makes it impossible to use these attributes. Either remove the attributes"
1641 f' (along with the conflicts for them), adjust the conflicts or make "{attr_name}"'
1642 " optional (NotRequired)"
1643 )
1644 else:
1645 required_conflicts = attr.conflicting_attributes & required_attributes
1646 if required_conflicts: 1646 ↛ 1647: line 1646 didn't jump to line 1647 because the condition on line 1646 was never true
1647 c = ", ".join(repr(a) for a in required_conflicts)
1648 raise ValueError(
1649 f'The attribute "{attr_name}" conflicts with the following *required* attributes: {c}.'
1650 f' This makes it impossible to use the "{attr_name}" attribute. Either remove it,'
1651 f" adjust the conflicts or make the listed attributes optional (NotRequired)"
1652 )
1653 unknown_attributes = attr.conflicting_attributes - all_attributes
1654 if unknown_attributes: 1654 ↛ 1655: line 1654 didn't jump to line 1655 because the condition on line 1654 was never true
1655 c = ", ".join(repr(a) for a in unknown_attributes)
1656 raise ValueError(
1657 f'The attribute "{attr_name}" declares a conflict with the following unknown attributes: {c}.'
1658 f" None of these attributes were declared in the input."
1659 )
1662def _check_attributes(
1663 content: type[TypedDict],
1664 input_content: type[TypedDict],
1665 input_content_attributes: dict[str, AttributeDescription],
1666 sources: Mapping[str, Collection[str]],
1667) -> None:
1668 target_required_keys = content.__required_keys__
1669 input_required_keys = input_content.__required_keys__
1670 all_input_keys = input_required_keys | input_content.__optional_keys__
1672 for input_name in all_input_keys:
1673 attr = input_content_attributes[input_name]
1674 target_name = attr.target_attribute
1675 source_names = sources[target_name]
1676 input_is_required = input_name in input_required_keys
1677 target_is_required = target_name in target_required_keys
1679 assert source_names
1681 if input_is_required and len(source_names) > 1: 1681 ↛ 1682: line 1681 didn't jump to line 1682 because the condition on line 1681 was never true
1682 raise ValueError(
1683 f'The source attribute "{input_name}" is required, but it maps to "{target_name}",'
1684 f' which has multiple sources "{source_names}". If "{input_name}" should be required,'
1685 f' then there is no need for additional sources for "{target_name}". Alternatively,'
1686 f' "{input_name}" might be missing a NotRequired type'
1687 f' (example: "{input_name}: NotRequired[<OriginalTypeHere>]")'
1688 )
1689 if not input_is_required and target_is_required and len(source_names) == 1: 1689 ↛ 1690: line 1689 didn't jump to line 1690 because the condition on line 1689 was never true
1690 raise ValueError(
1691 f'The source attribute "{input_name}" is not marked as required and maps to'
1692 f' "{target_name}", which is marked as required. As there are no other attributes'
1693 f' mapping to "{target_name}", "{input_name}" must be required as well'
1694 f' ("{input_name}: Required[<Type>]"). Alternatively, "{target_name}" should be optional'
1695 f' ("{target_name}: NotRequired[<Type>]") or an "MappingHint.aliasOf" might be missing.'
1696 )
1699def _validation_type_error(path: AttributePath, message: str) -> None:
1700 raise ManifestParseException(
1701 f'The attribute "{path.path}" did not have a valid structure/type: {message}'
1702 )
1705def _is_two_arg_x_list_x(t_args: tuple[Any, ...]) -> bool:
1706 if len(t_args) != 2:
1707 return False
1708 lhs, rhs = t_args
1709 if get_origin(lhs) == list:
1710 if get_origin(rhs) == list: 1710 ↛ 1713: line 1710 didn't jump to line 1713 because the condition on line 1710 was never true
1711 # It could still match X, List[X] - but we do not allow this case for now as the caller
1712 # does not support it.
1713 return False
1714 l_args = get_args(lhs)
1715 return bool(l_args and l_args[0] == rhs)
1716 if get_origin(rhs) == list:
1717 r_args = get_args(rhs)
1718 return bool(r_args and r_args[0] == lhs)
1719 return False
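For reference, this is the typing introspection the X-or-list-of-X check above operates on (illustrative types only):

from typing import Union, get_args, get_origin

# The argument tuple produced for the "X | list[X]" shorthand.
assert get_args(Union[str, list[str]]) == (str, list[str])
# One member must be list[<the other member>] for the shorthand to qualify.
assert get_origin(list[str]) is list and get_args(list[str]) == (str,)
# Union[str, list[int]] or Union[list[str], list[int]] would therefore not qualify.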
1722def _extract_typed_dict(
1723 base_type,
1724 default_target_attribute: str | None,
1725) -> tuple[type[TypedDict] | None, Any]:
1726 if is_typeddict(base_type):
1727 return base_type, None
1728 _, origin, args = unpack_type(base_type, False)
1729 if origin != Union:
1730 if isinstance(base_type, type) and issubclass(base_type, (dict, Mapping)): 1730 ↛ 1731: line 1730 didn't jump to line 1731 because the condition on line 1730 was never true
1731 raise ValueError(
1732 "The source_format cannot be nor contain a (non-TypedDict) dict"
1733 )
1734 return None, base_type
1735 typed_dicts = [x for x in args if is_typeddict(x)]
1736 if len(typed_dicts) > 1: 1736 ↛ 1737: line 1736 didn't jump to line 1737 because the condition on line 1736 was never true
1737 raise ValueError(
1738 "When source_format is a Union, it must contain at most one TypedDict"
1739 )
1740 typed_dict = typed_dicts[0] if typed_dicts else None
1742 if any(x is None or x is _NONE_TYPE for x in args): 1742 ↛ 1743: line 1742 didn't jump to line 1743 because the condition on line 1742 was never true
1743 raise ValueError(
1744 "The source_format cannot be nor contain Optional[X] or Union[X, None]"
1745 )
1747 if any( 1747 ↛ 1752: line 1747 didn't jump to line 1752 because the condition on line 1747 was never true
1748 isinstance(x, type) and issubclass(x, (dict, Mapping))
1749 for x in args
1750 if x is not typed_dict
1751 ):
1752 raise ValueError(
1753 "The source_format cannot be nor contain a (non-TypedDict) dict"
1754 )
1755 remaining = [x for x in args if x is not typed_dict]
1756 has_target_attribute = False
1757 anno = None
1758 if len(remaining) == 1: 1758 ↛ 1759: line 1758 didn't jump to line 1759 because the condition on line 1758 was never true
1759 base_type, anno, _ = _parse_type(
1760 "source_format alternative form",
1761 remaining[0],
1762 forbid_optional=True,
1763 parsing_typed_dict_attribute=False,
1764 )
1765 has_target_attribute = bool(anno) and any(
1766 isinstance(x, TargetAttribute) for x in anno
1767 )
1768 target_type = base_type
1769 else:
1770 target_type = Union[tuple(remaining)]
1772 if default_target_attribute is None and not has_target_attribute: 1772 ↛ 1773: line 1772 didn't jump to line 1773 because the condition on line 1772 was never true
1773 raise ValueError(
1774 'The alternative format must be Union[TypedDict,Annotated[X, DebputyParseHint.target_attribute("...")]]'
1775 " OR the parsed_content format must have exactly one attribute that is required."
1776 )
1777 if anno: 1777 ↛ 1778: line 1777 didn't jump to line 1778 because the condition on line 1777 was never true
1778 final_anno = [target_type]
1779 final_anno.extend(anno)
1780 return typed_dict, Annotated[tuple(final_anno)]
1781 return typed_dict, target_type
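When source_format is a Union, the extraction above separates the (at most one) TypedDict member from the alternative non-mapping forms. A hypothetical example of that split using standard typing introspection; the SymlinkSourceFormat name and the alternative forms are made up.

from typing import TypedDict, Union, get_args, is_typeddict

class SymlinkSourceFormat(TypedDict):  # illustrative mapping form
    link_name: str
    link_target: str

source_format = Union[SymlinkSourceFormat, str, list[str]]
args = get_args(source_format)
typed_dicts = [a for a in args if is_typeddict(a)]
alternatives = [a for a in args if not is_typeddict(a)]
assert typed_dicts == [SymlinkSourceFormat]
assert alternatives == [str, list[str]]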
1784def _dispatch_parse_generator(
1785 dispatch_type: type[DebputyDispatchableType],
1786) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]:
1787 def _dispatch_parse(
1788 value: Any,
1789 attribute_path: AttributePath,
1790 parser_context: Optional["ParserContextData"],
1791 ):
1792 assert parser_context is not None
1793 dispatching_parser = parser_context.dispatch_parser_table_for(dispatch_type)
1794 return dispatching_parser.parse_input(
1795 value, attribute_path, parser_context=parser_context
1796 )
1798 return _dispatch_parse
1801def _dispatch_parser(
1802 dispatch_type: type[DebputyDispatchableType],
1803) -> AttributeTypeHandler:
1804 return AttributeTypeHandler(
1805 dispatch_type.__name__,
1806 lambda *a: None,
1807 mapper=_dispatch_parse_generator(dispatch_type),
1808 )
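The generated mapper above defers parsing to whatever table the parser context has registered for the dispatchable type, so the concrete parser is resolved at parse time rather than when the parser is declared. A registry-free sketch of the same late-binding pattern; the table, the TransformationRule class and the payload are stand-ins, not debputy's ParserContextData API.

from typing import Any, Callable

class TransformationRule:  # stand-in for a DebputyDispatchableType subclass
    pass

DISPATCH_TABLE: dict[type, Callable[[Any], Any]] = {}

def dispatch_parse_generator(dispatch_type: type) -> Callable[[Any], Any]:
    def _dispatch_parse(value: Any) -> Any:
        # Looked up at call time, mirroring parser_context.dispatch_parser_table_for(...)
        return DISPATCH_TABLE[dispatch_type](value)
    return _dispatch_parse

parse = dispatch_parse_generator(TransformationRule)
DISPATCH_TABLE[TransformationRule] = lambda value: ("parsed", value)  # registered later
assert parse({"remove": "usr/share/foo"}) == ("parsed", {"remove": "usr/share/foo"})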
1811def _parse_type(
1812 attribute: str,
1813 orig_td: Any,
1814 forbid_optional: bool = True,
1815 parsing_typed_dict_attribute: bool = True,
1816) -> tuple[Any, tuple[Any, ...], bool]:
1817 td, v, args = unpack_type(orig_td, parsing_typed_dict_attribute)
1818 md: tuple[Any, ...] = tuple()
1819 optional = False
1820 if v is not None:
1821 if v == Annotated:
1822 anno = get_args(td)
1823 md = anno[1:]
1824 td, v, args = unpack_type(anno[0], parsing_typed_dict_attribute)
1826 if td is _NONE_TYPE: 1826 ↛ 1827: line 1826 didn't jump to line 1827 because the condition on line 1826 was never true
1827 raise ValueError(
1828 f'The attribute "{attribute}" resolved to type "None". "Nil" / "None" fields are not allowed in the'
1829 " debputy manifest, so this attribute does not make sense in its current form."
1830 )
1831 if forbid_optional and v == Union and any(a is _NONE_TYPE for a in args): 1831 ↛ 1832: line 1831 didn't jump to line 1832 because the condition on line 1831 was never true
1832 raise ValueError(
1833 f'Detected use of Optional in "{attribute}", which is not allowed here.'
1834 " Please use NotRequired for optional fields"
1835 )
1837 return td, md, optional
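The Annotated branch above peels the parse-hint metadata off the underlying type before validation. A small, hypothetical illustration of what that unwrapping looks like with plain typing calls; the hint string stands in for a real DebputyParseHint annotation.

from typing import Annotated, TypedDict, get_args, get_type_hints

class ExampleSourceFormat(TypedDict):
    sources: Annotated[list[str], "example-parse-hint"]

hints = get_type_hints(ExampleSourceFormat, include_extras=True)
base, *metadata = get_args(hints["sources"])
assert base == list[str]
assert metadata == ["example-parse-hint"]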
1840def _normalize_attribute_name(attribute: str) -> str:
1841 if attribute.endswith("_"):
1842 attribute = attribute[:-1]
1843 return attribute.replace("_", "-")
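This normalization is what turns Python-friendly TypedDict keys into manifest attribute names. A standalone copy of the rule with two examples (the attribute names are illustrative):

def normalize_attribute_name(attribute: str) -> str:
    if attribute.endswith("_"):
        attribute = attribute[:-1]  # drop the trailing underscore used to dodge Python keywords
    return attribute.replace("_", "-")  # remaining underscores become dashes

assert normalize_attribute_name("dest_dir") == "dest-dir"
assert normalize_attribute_name("as_") == "as"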
1846@dataclasses.dataclass
1847class DetectedDebputyParseHint:
1848 target_attribute: str
1849 source_manifest_attribute: str | None
1850 conflict_with_source_attributes: frozenset[str]
1851 conditional_required: ConditionalRequired | None
1852 applicable_as_path_hint: bool
1854 @classmethod
1855 def parse_annotations(
1856 cls,
1857 anno: tuple[Any, ...],
1858 error_context: str,
1859 default_attribute_name: str | None,
1860 is_required: bool,
1861 default_target_attribute: str | None = None,
1862 allow_target_attribute_annotation: bool = False,
1863 allow_source_attribute_annotations: bool = False,
1864 ) -> "DetectedDebputyParseHint":
1865 target_attr_anno = find_annotation(anno, TargetAttribute)
1866 if target_attr_anno:
1867 if not allow_target_attribute_annotation: 1867 ↛ 1868: line 1867 didn't jump to line 1868 because the condition on line 1867 was never true
1868 raise ValueError(
1869 f"The DebputyParseHint.target_attribute annotation is not allowed in this context.{error_context}"
1870 )
1871 target_attribute = target_attr_anno.attribute
1872 elif default_target_attribute is not None:
1873 target_attribute = default_target_attribute
1874 elif default_attribute_name is not None: 1874 ↛ 1877: line 1874 didn't jump to line 1877 because the condition on line 1874 was always true
1875 target_attribute = default_attribute_name
1876 else:
1877 if default_attribute_name is None:
1878 raise ValueError(
1879 "allow_target_attribute_annotation must be True OR "
1880 "default_attribute_name/default_target_attribute must be not None"
1881 )
1882 raise ValueError(
1883 f"Missing DebputyParseHint.target_attribute annotation.{error_context}"
1884 )
1885 source_attribute_anno = find_annotation(anno, ManifestAttribute)
1886 _source_attribute_allowed(
1887 allow_source_attribute_annotations, error_context, source_attribute_anno
1888 )
1889 if source_attribute_anno:
1890 source_attribute_name = source_attribute_anno.attribute
1891 elif default_attribute_name is not None:
1892 source_attribute_name = _normalize_attribute_name(default_attribute_name)
1893 else:
1894 source_attribute_name = None
1895 mutual_exclusive_with_anno = find_annotation(anno, ConflictWithSourceAttribute)
1896 if mutual_exclusive_with_anno:
1897 _source_attribute_allowed(
1898 allow_source_attribute_annotations,
1899 error_context,
1900 mutual_exclusive_with_anno,
1901 )
1902 conflicting_attributes = mutual_exclusive_with_anno.conflicting_attributes
1903 else:
1904 conflicting_attributes = frozenset()
1905 conditional_required = find_annotation(anno, ConditionalRequired)
1907 if conditional_required and is_required: 1907 ↛ 1908: line 1907 didn't jump to line 1908 because the condition on line 1907 was never true
1908 if default_attribute_name is None:
1909 raise ValueError(
1910 "is_required cannot be True without default_attribute_name being not None"
1911 )
1912 raise ValueError(
1913 f'The attribute "{default_attribute_name}" is Required while also being conditionally required.'
1914 ' Please make the attribute "NotRequired" or remove the conditional requirement.'
1915 )
1917 not_path_hint_anno = find_annotation(anno, NotPathHint)
1918 applicable_as_path_hint = not_path_hint_anno is None
1920 return DetectedDebputyParseHint(
1921 target_attribute=target_attribute,
1922 source_manifest_attribute=source_attribute_name,
1923 conflict_with_source_attributes=conflicting_attributes,
1924 conditional_required=conditional_required,
1925 applicable_as_path_hint=applicable_as_path_hint,
1926 )
1929def _source_attribute_allowed(
1930 source_attribute_allowed: bool,
1931 error_context: str,
1932 annotation: DebputyParseHint | None,
1933) -> None:
1934 if source_attribute_allowed or annotation is None: 1934 ↛ 1936: line 1934 didn't jump to line 1936 because the condition on line 1934 was always true
1935 return
1936 raise ValueError(
1937 f'The annotation "{annotation}" cannot be used here. {error_context}'
1938 )