Coverage for src/debputy/manifest_parser/declarative_parser.py: 72%
798 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-09-07 09:27 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-09-07 09:27 +0000
1import collections
2import dataclasses
3import typing
4from typing import (
5 Any,
6 Callable,
7 Tuple,
8 TypedDict,
9 Dict,
10 get_type_hints,
11 Annotated,
12 get_args,
13 get_origin,
14 TypeVar,
15 Generic,
16 FrozenSet,
17 Mapping,
18 Optional,
19 cast,
20 Type,
21 Union,
22 List,
23 Collection,
24 NotRequired,
25 Iterable,
26 Literal,
27 Sequence,
28 Container,
29 TYPE_CHECKING,
30)
33from debputy.manifest_parser.base_types import FileSystemMatchRule
34from debputy.manifest_parser.exceptions import (
35 ManifestParseException,
36)
37from debputy.manifest_parser.mapper_code import (
38 normalize_into_list,
39 wrap_into_list,
40 map_each_element,
41)
42from debputy.manifest_parser.parse_hints import (
43 ConditionalRequired,
44 DebputyParseHint,
45 TargetAttribute,
46 ManifestAttribute,
47 ConflictWithSourceAttribute,
48 NotPathHint,
49)
50from debputy.manifest_parser.parser_data import ParserContextData
51from debputy.manifest_parser.tagging_types import (
52 DebputyParsedContent,
53 DebputyDispatchableType,
54 TypeMapping,
55)
56from debputy.manifest_parser.util import (
57 AttributePath,
58 unpack_type,
59 find_annotation,
60 check_integration_mode,
61)
62from debputy.plugin.api.impl_types import (
63 DeclarativeInputParser,
64 TD,
65 ListWrappedDeclarativeInputParser,
66 DispatchingObjectParser,
67 DispatchingTableParser,
68 TTP,
69 TP,
70 InPackageContextParser,
71)
72from debputy.plugin.api.spec import (
73 ParserDocumentation,
74 DebputyIntegrationMode,
75 StandardParserAttributeDocumentation,
76 undocumented_attr,
77 ParserAttributeDocumentation,
78 reference_documentation,
79)
80from debputy.util import _info, _warn, assume_not_none
83if TYPE_CHECKING:
84 from debputy.lsp.diagnostics import LintSeverity
try:
    from Levenshtein import distance
except ImportError:
    # Tracks whether the "install python3-levenshtein" hint was already emitted.
    _WARN_ONCE = False

    def _detect_possible_typo(
        _key: str,
        _value: object,
        _manifest_attributes: Mapping[str, "AttributeDescription"],
        _path: "AttributePath",
    ) -> None:
        """Fallback used when Levenshtein is unavailable.

        Performs no typo detection; it only emits a one-time informational
        message suggesting that the optional dependency is installed.
        """
        global _WARN_ONCE
        if _WARN_ONCE:
            return
        _WARN_ONCE = True
        _info(
            "Install python3-levenshtein to have debputy try to detect typos in the manifest."
        )

else:

    def _detect_possible_typo(
        key: str,
        value: object,
        manifest_attributes: Mapping[str, "AttributeDescription"],
        path: "AttributePath",
    ) -> None:
        """Warn when `key` looks like a misspelling of a known attribute.

        A known attribute is a candidate when its length differs from `key`
        by at most 2 and its edit distance to `key` is at most 2.  Candidates
        are ranked by how well `value` fits the attribute's type:

        * 2 - the value passes the attribute's type validation
        * 1 - validation fails, but the value matches the base type
        * 0 - neither

        Only the best-ranked candidates are reported (via `_warn`).

        :param key: The unknown key found in the manifest.
        :param value: The value associated with `key`.
        :param manifest_attributes: The known attributes, indexed by name.
        :param path: The attribute path of the mapping containing `key`.
        """
        key_length = len(key)
        key_path = path[key]
        best_strength = 0
        candidates: List[str] = []
        for candidate, description in manifest_attributes.items():
            if abs(key_length - len(candidate)) > 2:
                continue
            if distance(key, candidate) > 2:
                continue
            validator = description.type_validator
            try:
                validator.ensure_type(value, key_path)
            except ManifestParseException:
                strength = 1 if validator.base_type_match(value) else 0
            else:
                strength = 2

            if strength > best_strength:
                # A strictly better fit supersedes everything seen so far.
                best_strength = strength
                candidates = [candidate]
            elif strength == best_strength:
                candidates.append(candidate)

        if not candidates:
            return
        if path:
            ref = f'at "{path.path}"'
        else:
            ref = "at the manifest root level"
        if len(candidates) > 1:
            candidates.sort()
            possible_matches = ", ".join(repr(a) for a in candidates)
            _warn(
                f'Possible typo: The key "{key}" {ref} should probably have been one of {possible_matches}'
            )
        else:
            possible_match = repr(candidates[0])
            _warn(
                f'Possible typo: The key "{key}" {ref} should probably have been {possible_match}'
            )
# Generic type variables used in this module.  `SF` is the type parameter used
# for the "source content" of the declarative parsers (see the `source_content`
# parameter of `ParserGenerator.generate_parser`).
SF = TypeVar("SF")
T = TypeVar("T")
S = TypeVar("S")


# The type of `None`, kept as a named constant.
_NONE_TYPE = type(None)


# These must be able to appear in an "isinstance" check and must be builtin types.
# The mapping value is a short human-readable name for the type.
BASIC_SIMPLE_TYPES = {
    str: "string",
    int: "integer",
    bool: "boolean",
}
class AttributeTypeHandler:
    """Bundles the validation and mapping logic for one attribute type.

    The handler carries a human-readable description of the type, a callable
    that validates a raw value, an optional base type (for a cheap
    `isinstance` check) and an optional mapper that converts the raw value.
    """

    __slots__ = ("_description", "_ensure_type", "base_type", "mapper")

    def __init__(
        self,
        description: str,
        ensure_type: Callable[[Any, AttributePath], None],
        *,
        base_type: Optional[Type[Any]] = None,
        mapper: Optional[
            Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]
        ] = None,
    ) -> None:
        """Create a handler.

        :param description: Human-readable description of the accepted type.
        :param ensure_type: Callable invoked with the value and its path to
          validate the value.
        :param base_type: Optional type used by `base_type_match`.
        :param mapper: Optional callable that transforms the value.
        """
        self.mapper = mapper
        self.base_type = base_type
        self._ensure_type = ensure_type
        self._description = description

    def describe_type(self) -> str:
        """Return the human-readable description of the type."""
        return self._description

    def ensure_type(self, obj: object, path: AttributePath) -> None:
        """Validate `obj` by delegating to the configured validation callable."""
        self._ensure_type(obj, path)

    def base_type_match(self, obj: object) -> bool:
        """Return True if a base type is configured and `obj` is an instance of it."""
        expected = self.base_type
        if expected is None:
            return False
        return isinstance(obj, expected)

    def map_type(
        self,
        value: Any,
        path: AttributePath,
        parser_context: Optional["ParserContextData"],
    ) -> Any:
        """Apply the mapper to `value`; without a mapper, `value` is returned as-is."""
        transform = self.mapper
        return value if transform is None else transform(value, path, parser_context)

    def combine_mapper(
        self,
        mapper: Optional[
            Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]
        ],
    ) -> "AttributeTypeHandler":
        """Return a handler that additionally applies `mapper` after the current mapper.

        :param mapper: The mapper to append.  When None, this handler is
          returned unchanged.
        :return: This handler (when `mapper` is None) or a new handler with
          the same description, validation and base type but the chained mapper.
        """
        if mapper is None:
            return self

        first_stage = self.mapper
        if first_stage is None:
            chained = mapper
        else:

            def chained(
                value: Any,
                path: AttributePath,
                parser_context: Optional["ParserContextData"],
            ) -> Any:
                intermediate = first_stage(value, path, parser_context)
                return mapper(intermediate, path, parser_context)

        return AttributeTypeHandler(
            self._description,
            self._ensure_type,
            base_type=self.base_type,
            mapper=chained,
        )
@dataclasses.dataclass(slots=True)
class AttributeDescription:
    """Describes one attribute known to a declarative parser."""

    # Name of the attribute as used in the input; the parsers index their
    # `manifest_attributes` mapping by this name.
    source_attribute_name: str
    # Key under which the (mapped) value is stored in the parsed result.
    target_attribute: str
    # The declared type of the attribute.
    attribute_type: Any
    # Validates (`ensure_type`) and converts (`map_type`) the raw value.
    type_validator: AttributeTypeHandler
    # NOTE(review): presumably the `Annotated[...]` metadata of the attribute - confirm.
    annotations: Tuple[Any, ...]
    # Attributes that cannot be used together with this one; the entries are
    # used as keys into the parser's `source_attributes` mapping.
    conflicting_attributes: FrozenSet[str]
    # When set and its condition applies, the attribute must be present.
    conditional_required: Optional["ConditionalRequired"]
    # Parse hints detected for the attribute (for example whether it is
    # applicable as a path hint), if any.
    parse_hints: Optional["DetectedDebputyParseHint"] = None
    # NOTE(review): presumably True when the declared type was `Optional[...]` - confirm.
    is_optional: bool = False
def _extract_path_hint(v: Any, attribute_path: AttributePath) -> bool:
    """Record a path hint on `attribute_path` derived from `v`, if possible.

    A hint that is already set is left alone.  Otherwise the hint is `v`
    itself when it is a string, or the first element of `v` when it is a
    non-empty list starting with a string.

    :return: True if `attribute_path` has a path hint after the call.
    """
    if attribute_path.path_hint is not None:
        return True
    if isinstance(v, str):
        hint = v
    elif isinstance(v, list) and len(v) > 0 and isinstance(v[0], str):
        hint = v[0]
    else:
        return False
    attribute_path.path_hint = hint
    return True
@dataclasses.dataclass(slots=True, frozen=True)
class DeclarativeNonMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]):
    """Parser for input that is only accepted in a non-mapping form.

    The value is validated and mapped with the type handler of
    `alt_form_parser`, and the result is a single-key mapping storing the
    mapped value under the target attribute.
    """

    # Description of the (only) accepted form of the value.
    alt_form_parser: AttributeDescription
    inline_reference_documentation: Optional[ParserDocumentation] = None
    # Passed to `check_integration_mode` on every `parse_input` call.
    expected_debputy_integration_mode: Optional[Container[DebputyIntegrationMode]] = (
        None
    )

    def parse_input(
        self,
        value: object,
        path: AttributePath,
        *,
        parser_context: Optional["ParserContextData"] = None,
    ) -> TD:
        """Validate and map `value` into a single-attribute result.

        :param value: The raw value from the manifest.
        :param path: The attribute path of the value.  Its path hint (when
          derivable) and alias mapping are updated as a side effect.
        :param parser_context: Optional context handed to the integration
          mode check and to the type mapper.
        :return: A mapping with the mapped value under the target attribute.
        :raises ManifestParseException: If `value` is None (this method), or
          when the validation/mapping callables reject the value.
        """
        check_integration_mode(
            path,
            parser_context,
            self.expected_debputy_integration_mode,
        )
        doc_url = self.reference_documentation_url
        description = self.alt_form_parser
        validator = description.type_validator

        if value is None:
            form_note = f" The value must have type: {validator.describe_type()}"
            if doc_url is None:
                doc_ref = ""
            else:
                doc_ref = f" Please see {doc_url} for the documentation."
            raise ManifestParseException(
                f"The attribute {path.path} was missing a value. {form_note}{doc_ref}"
            )

        _extract_path_hint(value, path)
        validator.ensure_type(value, path)
        target = description.target_attribute
        mapped = validator.map_type(value, path, parser_context)
        # The alias mapping is only recorded once the value was mapped successfully.
        path.alias_mapping = {
            target: ("", None),
        }
        return cast("TD", {target: mapped})
@dataclasses.dataclass(slots=True)
class DeclarativeMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]):
    """Parser for input that is a mapping, optionally with a non-mapping alternative form.

    Mapping input is checked against the known keys (unknown, missing,
    conditionally required, "at least one of" and mutually exclusive keys)
    and each value is validated and mapped with the type handler of its
    attribute.  Non-mapping input is delegated to `alt_form_parser` when one
    is available.
    """

    # Keys that must always be present in the input mapping.
    input_time_required_parameters: FrozenSet[str]
    # Every key accepted in the input mapping.
    all_parameters: FrozenSet[str]
    # Attribute descriptions indexed by the key used in the input mapping.
    manifest_attributes: Mapping[str, "AttributeDescription"]
    # Attribute descriptions of the source format; the entries of
    # `AttributeDescription.conflicting_attributes` are keys into this mapping.
    source_attributes: Mapping[str, "AttributeDescription"]
    # Groups of keys where at least one key of each group must be present.
    at_least_one_of: FrozenSet[FrozenSet[str]]
    # Description of the non-mapping form; None when only mappings are accepted.
    alt_form_parser: Optional[AttributeDescription]
    # Groups of keys where at most one key of each group may be present.
    mutually_exclusive_attributes: FrozenSet[FrozenSet[str]] = frozenset()
    # Lazily populated by `_per_attribute_conflicts`.
    _per_attribute_conflicts_cache: Optional[Mapping[str, FrozenSet[str]]] = None
    inline_reference_documentation: Optional[ParserDocumentation] = None
    # Keys probed (in order) for a value that can serve as the path hint.
    path_hint_source_attributes: Sequence[str] = tuple()
    # Passed to `check_integration_mode` on every `parse_input` call.
    expected_debputy_integration_mode: Optional[Container[DebputyIntegrationMode]] = (
        None
    )

    def _parse_alt_form(
        self,
        value: object,
        path: AttributePath,
        *,
        parser_context: Optional["ParserContextData"] = None,
    ) -> TD:
        """Parse a non-mapping `value` using the alternative form.

        :raises ManifestParseException: If the parser has no alternative form
          (that is, the value had to be a mapping).
        """
        alt_form_parser = self.alt_form_parser
        if alt_form_parser is None:
            raise ManifestParseException(
                f"The attribute {path.path} must be a mapping.{self._doc_url_error_suffix()}"
            )
        _extract_path_hint(value, path)
        alt_form_parser.type_validator.ensure_type(value, path)
        assert (
            value is not None
        ), "The alternative form was None, but the parser should have rejected None earlier."
        attribute = alt_form_parser.target_attribute
        alias_mapping = {
            attribute: ("", None),
        }
        v = alt_form_parser.type_validator.map_type(value, path, parser_context)
        path.alias_mapping = alias_mapping
        return cast("TD", {attribute: v})

    def _validate_expected_keys(
        self,
        value: Dict[Any, Any],
        path: AttributePath,
        *,
        parser_context: Optional["ParserContextData"] = None,
    ) -> None:
        """Verify that the keys of `value` satisfy the key constraints.

        The checks are (in order): unknown keys, missing required keys,
        missing conditionally required keys, "at least one of" groups and
        mutually exclusive groups.

        :raises ManifestParseException: On the first violated constraint.
        """
        unknown_keys = value.keys() - self.all_parameters
        doc_ref = self._doc_url_error_suffix()
        if unknown_keys:
            for k in unknown_keys:
                if isinstance(k, str):
                    _detect_possible_typo(k, value[k], self.manifest_attributes, path)
            # The keys come straight from the input and are not guaranteed to
            # be strings (nor mutually comparable), so order them by their repr
            # to get a deterministic message.
            unknown = ", ".join(sorted(repr(k) for k in unknown_keys))
            unused_keys = self.all_parameters - value.keys()
            if unused_keys:
                usable = ", ".join(sorted(unused_keys))
                raise ManifestParseException(
                    f"Unknown keys {unknown} at {path.path_container_lc}. Keys that could be used here are: {usable}.{doc_ref}"
                )
            raise ManifestParseException(
                f"Unknown keys {unknown} at {path.path_container_lc}. Please remove them.{doc_ref}"
            )
        missing_keys = self.input_time_required_parameters - value.keys()
        if missing_keys:
            required = ", ".join(repr(k) for k in sorted(missing_keys))
            raise ManifestParseException(
                f"The following keys were required but not present at {path.path_container_lc}: {required}{doc_ref}"
            )
        for maybe_required in self.all_parameters - value.keys():
            attr = self.manifest_attributes[maybe_required]
            assert attr.conditional_required is None or parser_context is not None
            if (
                attr.conditional_required is not None
                and attr.conditional_required.condition_applies(
                    assume_not_none(parser_context)
                )
            ):
                reason = attr.conditional_required.reason
                raise ManifestParseException(
                    f'Missing the *conditionally* required attribute "{maybe_required}" at {path.path_container_lc}. {reason}{doc_ref}'
                )
        for keyset in self.at_least_one_of:
            matched_keys = value.keys() & keyset
            if not matched_keys:
                conditionally_required = ", ".join(repr(k) for k in sorted(keyset))
                raise ManifestParseException(
                    f"At least one of the following keys must be present at {path.path_container_lc}:"
                    f" {conditionally_required}{doc_ref}"
                )
        for group in self.mutually_exclusive_attributes:
            matched = value.keys() & group
            if len(matched) > 1:
                ck = ", ".join(repr(k) for k in sorted(matched))
                raise ManifestParseException(
                    f"Could not parse {path.path_container_lc}: The following attributes are"
                    f" mutually exclusive: {ck}{doc_ref}"
                )

    def _parse_typed_dict_form(
        self,
        value: Dict[Any, Any],
        path: AttributePath,
        *,
        parser_context: Optional["ParserContextData"] = None,
    ) -> TD:
        """Parse a mapping `value` into the target format.

        Keys are validated first.  Each value is then type checked and
        mapped, and stored under the target attribute of its key.  Values
        that are None after the type check are left out of the result.

        :raises ManifestParseException: If the keys are invalid or an
          attribute is combined with an attribute it conflicts with.
        """
        self._validate_expected_keys(value, path, parser_context=parser_context)
        result = {}
        per_attribute_conflicts = self._per_attribute_conflicts()
        alias_mapping = {}
        # The first candidate attribute that provides a usable value wins.
        for hint_attribute in self.path_hint_source_attributes:
            v = value.get(hint_attribute)
            if v is not None and _extract_path_hint(v, path):
                break
        for k, v in value.items():
            attr = self.manifest_attributes[k]
            matched = value.keys() & per_attribute_conflicts[k]
            if matched:
                ck = ", ".join(repr(c) for c in sorted(matched))
                raise ManifestParseException(
                    f'The attribute "{k}" at {path.path} cannot be used with the following'
                    f" attributes: {ck}{self._doc_url_error_suffix()}"
                )
            nk = attr.target_attribute
            key_path = path[k]
            attr.type_validator.ensure_type(v, key_path)
            if v is None:
                continue
            if k != nk:
                # Remember the name used in the input for the renamed attribute.
                alias_mapping[nk] = k, None
            v = attr.type_validator.map_type(v, key_path, parser_context)
            result[nk] = v
        if alias_mapping:
            path.alias_mapping = alias_mapping
        return cast("TD", result)

    def _doc_url_error_suffix(self, *, see_url_version: bool = False) -> str:
        """Return a suffix for error messages that points to the documentation.

        :param see_url_version: Use the "Please see ..." sentence rather than
          the parenthesized form.
        :return: The suffix (with a leading space), or the empty string when
          no documentation URL is available.
        """
        doc_url = self.reference_documentation_url
        if doc_url is not None:
            if see_url_version:
                return f" Please see {doc_url} for the documentation."
            return f" (Documentation: {doc_url})"
        return ""

    def parse_input(
        self,
        value: object,
        path: AttributePath,
        *,
        parser_context: Optional["ParserContextData"] = None,
    ) -> TD:
        """Parse `value`, which is either a mapping or the alternative form.

        :param value: The raw value from the manifest.
        :param path: The attribute path of the value.  Its path hint and
          alias mapping may be updated as a side effect.
        :param parser_context: Optional context handed to the integration
          mode check, the conditional requirement checks and the type mappers.
        :return: The parsed content.
        :raises ManifestParseException: If `value` is None or is rejected by
          one of the validation steps.
        """
        check_integration_mode(
            path,
            parser_context,
            self.expected_debputy_integration_mode,
        )
        if value is None:
            form_note = " The attribute must be a mapping."
            if self.alt_form_parser is not None:
                form_note = (
                    " The attribute can be a mapping or a non-mapping format"
                    ' (usually, "non-mapping format" means a string or a list of strings).'
                )
            doc_ref = self._doc_url_error_suffix(see_url_version=True)
            raise ManifestParseException(
                f"The attribute {path.path} was missing a value. {form_note}{doc_ref}"
            )

        if not isinstance(value, dict):
            return self._parse_alt_form(value, path, parser_context=parser_context)
        return self._parse_typed_dict_form(value, path, parser_context=parser_context)

    def _per_attribute_conflicts(self) -> Mapping[str, FrozenSet[str]]:
        """Return, per source attribute name, the source names it conflicts with.

        The table is computed on first use and cached on the instance.
        """
        conflicts = self._per_attribute_conflicts_cache
        if conflicts is not None:
            return conflicts
        attrs = self.source_attributes
        conflicts = {
            a.source_attribute_name: frozenset(
                attrs[ca].source_attribute_name for ca in a.conflicting_attributes
            )
            for a in attrs.values()
        }
        self._per_attribute_conflicts_cache = conflicts
        return self._per_attribute_conflicts_cache
def _is_path_attribute_candidate(
    source_attribute: AttributeDescription, target_attribute: AttributeDescription
) -> bool:
    """Decide whether the source attribute may be used as a path hint.

    The attribute qualifies when its parse hints do not rule it out and the
    type of the target attribute (or its element type, when the target is a
    list) is a subclass of `FileSystemMatchRule`.
    """
    hints = source_attribute.parse_hints
    if hints and not hints.applicable_as_path_hint:
        return False
    candidate_type = target_attribute.attribute_type
    _, origin, args = unpack_type(candidate_type, False)
    if origin == list:
        # For lists, the element type decides.
        candidate_type = args[0]
    if not isinstance(candidate_type, type):
        return False
    return issubclass(candidate_type, FileSystemMatchRule)
if not typing.is_typeddict(DebputyParsedContent):

    def is_typeddict(t: Any) -> bool:
        """Like `typing.is_typeddict`, but also accepts subclasses of `DebputyParsedContent`."""
        return typing.is_typeddict(t) or (
            isinstance(t, type) and issubclass(t, DebputyParsedContent)
        )

else:
    # `DebputyParsedContent` is itself recognized as a TypedDict, so the
    # standard check can be used directly.
    is_typeddict = typing.is_typeddict
522class ParserGenerator:
523 def __init__(self) -> None:
524 self._registered_types: Dict[Any, TypeMapping[Any, Any]] = {}
525 self._object_parsers: Dict[str, DispatchingObjectParser] = {}
526 self._table_parsers: Dict[
527 Type[DebputyDispatchableType], DispatchingTableParser[Any]
528 ] = {}
529 self._in_package_context_parser: Dict[str, Any] = {}
531 def register_mapped_type(self, mapped_type: TypeMapping[Any, Any]) -> None:
532 existing = self._registered_types.get(mapped_type.target_type)
533 if existing is not None: 533 ↛ 534line 533 didn't jump to line 534 because the condition on line 533 was never true
534 raise ValueError(f"The type {existing} is already registered")
535 self._registered_types[mapped_type.target_type] = mapped_type
537 def get_mapped_type_from_target_type(
538 self,
539 mapped_type: Type[T],
540 ) -> Optional[TypeMapping[Any, T]]:
541 return self._registered_types.get(mapped_type)
543 def discard_mapped_type(self, mapped_type: Type[T]) -> None:
544 del self._registered_types[mapped_type]
546 def add_table_parser(self, rt: Type[DebputyDispatchableType], path: str) -> None:
547 assert rt not in self._table_parsers
548 self._table_parsers[rt] = DispatchingTableParser(rt, path)
550 def add_object_parser(
551 self,
552 path: str,
553 *,
554 parser_documentation: Optional[ParserDocumentation] = None,
555 expected_debputy_integration_mode: Optional[
556 Container[DebputyIntegrationMode]
557 ] = None,
558 unknown_keys_diagnostic_severity: Optional["LintSeverity"] = "error",
559 allow_unknown_keys: bool = False,
560 ) -> DispatchingObjectParser:
561 assert path not in self._in_package_context_parser
562 assert path not in self._object_parsers
563 object_parser = DispatchingObjectParser(
564 path,
565 parser_documentation=parser_documentation,
566 expected_debputy_integration_mode=expected_debputy_integration_mode,
567 unknown_keys_diagnostic_severity=unknown_keys_diagnostic_severity,
568 allow_unknown_keys=allow_unknown_keys,
569 )
570 self._object_parsers[path] = object_parser
571 return object_parser
573 def add_in_package_context_parser(
574 self,
575 path: str,
576 delegate: DeclarativeInputParser[Any],
577 ) -> None:
578 assert path not in self._in_package_context_parser
579 assert path not in self._object_parsers
580 self._in_package_context_parser[path] = InPackageContextParser(path, delegate)
    @property
    def dispatchable_table_parsers(
        self,
    ) -> Mapping[Type[DebputyDispatchableType], DispatchingTableParser[Any]]:
        """The registered table parsers, indexed by the type they handle.

        This is the internal mapping itself, not a copy.
        """
        return self._table_parsers
    @property
    def dispatchable_object_parsers(self) -> Mapping[str, DispatchingObjectParser]:
        """The registered object parsers, indexed by path.

        This is the internal mapping itself, not a copy.
        """
        return self._object_parsers
592 def dispatch_parser_table_for(
593 self, rule_type: TTP
594 ) -> Optional[DispatchingTableParser[TP]]:
595 return cast(
596 "Optional[DispatchingTableParser[TP]]", self._table_parsers.get(rule_type)
597 )
599 def generate_parser(
600 self,
601 parsed_content: Type[TD],
602 *,
603 source_content: Optional[SF] = None,
604 allow_optional: bool = False,
605 inline_reference_documentation: Optional[ParserDocumentation] = None,
606 expected_debputy_integration_mode: Optional[
607 Container[DebputyIntegrationMode]
608 ] = None,
609 automatic_docs: Optional[
610 Mapping[Type[Any], Sequence[StandardParserAttributeDocumentation]]
611 ] = None,
612 ) -> DeclarativeInputParser[TD]:
613 """Derive a parser from a TypedDict
615 Generates a parser for a segment of the manifest (think the `install-docs` snippet) from a TypedDict
616 or two that are used as a description.
618 In its most simple use-case, the caller provides a TypedDict of the expected attributed along with
619 their types. As an example:
621 >>> class InstallDocsRule(DebputyParsedContent):
622 ... sources: List[str]
623 ... into: List[str]
624 >>> pg = ParserGenerator()
625 >>> simple_parser = pg.generate_parser(InstallDocsRule)
627 This will create a parser that would be able to interpret something like:
629 ```yaml
630 install-docs:
631 sources: ["docs/*"]
632 into: ["my-pkg"]
633 ```
635 While this is sufficient for programmers, it is a bit rigid for the packager writing the manifest. Therefore,
636 you can also provide a TypedDict describing the input, enabling more flexibility:
638 >>> class InstallDocsRule(DebputyParsedContent):
639 ... sources: List[str]
640 ... into: List[str]
641 >>> class InputDocsRuleInputFormat(TypedDict):
642 ... source: NotRequired[Annotated[str, DebputyParseHint.target_attribute("sources")]]
643 ... sources: NotRequired[List[str]]
644 ... into: Union[str, List[str]]
645 >>> pg = ParserGenerator()
646 >>> flexible_parser = pg.generate_parser(
647 ... InstallDocsRule,
648 ... source_content=InputDocsRuleInputFormat,
649 ... )
651 In this case, the `sources` field can either come from a single `source` in the manifest (which must be a string)
652 or `sources` (which must be a list of strings). The parser also ensures that only one of `source` or `sources`
653 is used to ensure the input is not ambiguous. For the `into` parameter, the parser will accept it being a str
654 or a list of strings. Regardless of how the input was provided, the parser will normalize the input so that
655 both `sources` and `into` in the result is a list of strings. As an example, this parser can accept
656 both the previous input but also the following input:
658 ```yaml
659 install-docs:
660 source: "docs/*"
661 into: "my-pkg"
662 ```
664 The `source` and `into` attributes are then normalized to lists as if the user had written them as lists
665 with a single string in them. As noted above, the name of the `source` attribute will also be normalized
666 while parsing.
668 In the cases where only one field is required by the user, it can sometimes make sense to allow a non-dict
669 as part of the input. Example:
671 >>> class DiscardRule(DebputyParsedContent):
672 ... paths: List[str]
673 >>> class DiscardRuleInputDictFormat(TypedDict):
674 ... path: NotRequired[Annotated[str, DebputyParseHint.target_attribute("paths")]]
675 ... paths: NotRequired[List[str]]
676 >>> # This format relies on DiscardRule having exactly one Required attribute
677 >>> DiscardRuleInputWithAltFormat = Union[
678 ... DiscardRuleInputDictFormat,
679 ... str,
680 ... List[str],
681 ... ]
682 >>> pg = ParserGenerator()
683 >>> flexible_parser = pg.generate_parser(
684 ... DiscardRule,
685 ... source_content=DiscardRuleInputWithAltFormat,
686 ... )
689 Supported types:
690 * `List` - must have a fixed type argument (such as `List[str]`)
691 * `str`
692 * `int`
693 * `BinaryPackage` - When provided (or required), the user must provide a package name listed
694 in the debian/control file. The code receives the BinaryPackage instance
695 matching that input.
696 * `FileSystemMode` - When provided (or required), the user must provide a file system mode in any
697 format that `debputy' provides (such as `0644` or `a=rw,go=rw`).
698 * `FileSystemOwner` - When provided (or required), the user must a file system owner that is
699 available statically on all Debian systems (must be in `base-passwd`).
700 The user has multiple options for how to specify it (either via name or id).
701 * `FileSystemGroup` - When provided (or required), the user must a file system group that is
702 available statically on all Debian systems (must be in `base-passwd`).
703 The user has multiple options for how to specify it (either via name or id).
704 * `ManifestCondition` - When provided (or required), the user must specify a conditional rule to apply.
705 Usually, it is better to extend `DebputyParsedContentStandardConditional`, which
706 provides the `debputy' default `when` parameter for conditionals.
708 Supported special type-like parameters:
710 * `Required` / `NotRequired` to mark a field as `Required` or `NotRequired`. Must be provided at the
711 outermost level. Cannot vary between `parsed_content` and `source_content`.
712 * `Annotated`. Accepted at the outermost level (inside Required/NotRequired) but ignored at the moment.
713 * `Union`. Must be the outermost level (inside `Annotated` or/and `Required`/`NotRequired` if these are present).
714 Automapping (see below) is restricted to two members in the Union.
716 Notable non-supported types:
717 * `Mapping` and all variants therefore (such as `dict`). In the future, nested `TypedDict`s may be allowed.
718 * `Optional` (or `Union[..., None]`): Use `NotRequired` for optional fields.
720 Automatic mapping rules from `source_content` to `parsed_content`:
721 - `Union[T, List[T]]` can be narrowed automatically to `List[T]`. Transformation is basically:
722 `lambda value: value if isinstance(value, list) else [value]`
723 - `T` can be mapped automatically to `List[T]`, Transformation being: `lambda value: [value]`
725 Additionally, types can be annotated (`Annotated[str, ...]`) with `DebputyParseHint`s. Check its classmethod
726 for concrete features that may be useful to you.
728 :param parsed_content: A DebputyParsedContent / TypedDict describing the desired model of the input once parsed.
729 (DebputyParsedContent is a TypedDict subclass that work around some inadequate type checkers).
730 It can also be a `List[DebputyParsedContent]`. In that case, `source_content` must be a
731 `List[TypedDict[...]]`.
732 :param source_content: Optionally, a TypedDict describing the input allowed by the user. This can be useful
733 to describe more variations than in `parsed_content` that the parser will normalize for you. If omitted,
734 the parsed_content is also considered the source_content (which affects what annotations are allowed in it).
735 Note you should never pass the parsed_content as source_content directly.
736 :param allow_optional: In rare cases, you want to support explicitly provided vs. optional. In this case, you
737 should set this to True. Though, in 99.9% of all cases, you want `NotRequired` rather than `Optional` (and
738 can keep this False).
739 :param inline_reference_documentation: Optionally, programmatic documentation
740 :param expected_debputy_integration_mode: If provided, this declares the integration modes where the
741 result of the parser can be used. This is primarily useful for "fail-fast" on incorrect usage.
742 When the restriction is not satisfiable, the generated parser will trigger a parse error immediately
743 (resulting in a "compile time" failure rather than a "runtime" failure).
744 :return: An input parser capable of reading input matching the TypedDict(s) used as reference.
745 """
746 orig_parsed_content = parsed_content
747 if source_content is parsed_content: 747 ↛ 748line 747 didn't jump to line 748 because the condition on line 747 was never true
748 raise ValueError(
749 "Do not provide source_content if it is the same as parsed_content"
750 )
751 is_list_wrapped = False
752 if get_origin(orig_parsed_content) == list:
753 parsed_content = get_args(orig_parsed_content)[0]
754 is_list_wrapped = True
756 if isinstance(parsed_content, type) and issubclass(
757 parsed_content, DebputyDispatchableType
758 ):
759 parser = self.dispatch_parser_table_for(parsed_content)
760 if parser is None: 760 ↛ 761line 760 didn't jump to line 761 because the condition on line 760 was never true
761 raise ValueError(
762 f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}."
763 f" The class {parsed_content.__qualname__} is not a pre-registered type."
764 )
765 # FIXME: Only the list wrapped version has documentation.
766 if is_list_wrapped: 766 ↛ 772line 766 didn't jump to line 772 because the condition on line 766 was always true
767 parser = ListWrappedDeclarativeInputParser(
768 parser,
769 inline_reference_documentation=inline_reference_documentation,
770 expected_debputy_integration_mode=expected_debputy_integration_mode,
771 )
772 return parser
774 if not is_typeddict(parsed_content): 774 ↛ 775line 774 didn't jump to line 775 because the condition on line 774 was never true
775 raise ValueError(
776 f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}."
777 ' Only "TypedDict"-based types and a subset of "DebputyDispatchableType" are supported.'
778 )
779 if is_list_wrapped and source_content is not None:
780 if get_origin(source_content) != list: 780 ↛ 781line 780 didn't jump to line 781 because the condition on line 780 was never true
781 raise ValueError(
782 "If the parsed_content is a List type, then source_format must be a List type as well."
783 )
784 source_content = get_args(source_content)[0]
786 target_attributes = self._parse_types(
787 parsed_content,
788 allow_source_attribute_annotations=source_content is None,
789 forbid_optional=not allow_optional,
790 )
791 required_target_parameters = frozenset(parsed_content.__required_keys__)
792 parsed_alt_form = None
793 non_mapping_source_only = False
795 if source_content is not None:
796 default_target_attribute = None
797 if len(required_target_parameters) == 1:
798 default_target_attribute = next(iter(required_target_parameters))
800 source_typed_dict, alt_source_forms = _extract_typed_dict(
801 source_content,
802 default_target_attribute,
803 )
804 if alt_source_forms:
805 parsed_alt_form = self._parse_alt_form(
806 alt_source_forms,
807 default_target_attribute,
808 )
809 if source_typed_dict is not None:
810 source_content_attributes = self._parse_types(
811 source_typed_dict,
812 allow_target_attribute_annotation=True,
813 allow_source_attribute_annotations=True,
814 forbid_optional=not allow_optional,
815 )
816 source_content_parameter = "source_content"
817 source_and_parsed_differs = True
818 else:
819 source_typed_dict = parsed_content
820 source_content_attributes = target_attributes
821 source_content_parameter = "parsed_content"
822 source_and_parsed_differs = True
823 non_mapping_source_only = True
824 else:
825 source_typed_dict = parsed_content
826 source_content_attributes = target_attributes
827 source_content_parameter = "parsed_content"
828 source_and_parsed_differs = False
830 sources = collections.defaultdict(set)
831 seen_targets = set()
832 seen_source_names: Dict[str, str] = {}
833 source_attributes: Dict[str, AttributeDescription] = {}
834 path_hint_source_attributes = []
836 for k in source_content_attributes:
837 ia = source_content_attributes[k]
839 ta = (
840 target_attributes.get(ia.target_attribute)
841 if source_and_parsed_differs
842 else ia
843 )
844 if ta is None: 844 ↛ 846line 844 didn't jump to line 846 because the condition on line 844 was never true
845 # Error message would be wrong if this assertion is false.
846 assert source_and_parsed_differs
847 raise ValueError(
848 f'The attribute "{k}" from the "source_content" parameter should have mapped'
849 f' to "{ia.target_attribute}", but that parameter does not exist in "parsed_content"'
850 )
851 if _is_path_attribute_candidate(ia, ta):
852 path_hint_source_attributes.append(ia.source_attribute_name)
853 existing_source_name = seen_source_names.get(ia.source_attribute_name)
854 if existing_source_name: 854 ↛ 855line 854 didn't jump to line 855 because the condition on line 854 was never true
855 raise ValueError(
856 f'The attribute "{k}" and "{existing_source_name}" both share the source name'
857 f' "{ia.source_attribute_name}". Please change the {source_content_parameter} parameter,'
858 f' so only one attribute use "{ia.source_attribute_name}".'
859 )
860 seen_source_names[ia.source_attribute_name] = k
861 seen_targets.add(ta.target_attribute)
862 sources[ia.target_attribute].add(k)
863 if source_and_parsed_differs:
864 bridge_mapper = self._type_normalize(
865 k, ia.attribute_type, ta.attribute_type, False
866 )
867 ia.type_validator = ia.type_validator.combine_mapper(bridge_mapper)
868 source_attributes[k] = ia
870 def _as_attr_names(td_name: Iterable[str]) -> FrozenSet[str]:
871 return frozenset(
872 source_content_attributes[a].source_attribute_name for a in td_name
873 )
875 _check_attributes(
876 parsed_content,
877 source_typed_dict,
878 source_content_attributes,
879 sources,
880 )
882 at_least_one_of = frozenset(
883 _as_attr_names(g)
884 for k, g in sources.items()
885 if len(g) > 1 and k in required_target_parameters
886 )
888 if source_and_parsed_differs and seen_targets != target_attributes.keys(): 888 ↛ 889line 888 didn't jump to line 889 because the condition on line 888 was never true
889 missing = ", ".join(
890 repr(k) for k in (target_attributes.keys() - seen_targets)
891 )
892 raise ValueError(
893 'The following attributes in "parsed_content" did not have a source field in "source_content":'
894 f" {missing}"
895 )
896 all_mutually_exclusive_fields = frozenset(
897 _as_attr_names(g) for g in sources.values() if len(g) > 1
898 )
900 all_parameters = (
901 source_typed_dict.__required_keys__ | source_typed_dict.__optional_keys__
902 )
903 _check_conflicts(
904 source_content_attributes,
905 source_typed_dict.__required_keys__,
906 all_parameters,
907 )
909 manifest_attributes = {
910 a.source_attribute_name: a for a in source_content_attributes.values()
911 }
913 if parsed_alt_form is not None:
914 target_attribute = parsed_alt_form.target_attribute
915 if ( 915 ↛ 920line 915 didn't jump to line 920 because the condition on line 915 was never true
916 target_attribute not in required_target_parameters
917 and required_target_parameters
918 or len(required_target_parameters) > 1
919 ):
920 raise NotImplementedError(
921 "When using alternative source formats (Union[TypedDict, ...]), then the"
922 " target must have at most one require parameter"
923 )
924 bridge_mapper = self._type_normalize(
925 target_attribute,
926 parsed_alt_form.attribute_type,
927 target_attributes[target_attribute].attribute_type,
928 False,
929 )
930 parsed_alt_form.type_validator = (
931 parsed_alt_form.type_validator.combine_mapper(bridge_mapper)
932 )
934 inline_reference_documentation = (
935 _verify_and_auto_correct_inline_reference_documentation(
936 parsed_content,
937 source_typed_dict,
938 source_content_attributes,
939 inline_reference_documentation,
940 parsed_alt_form is not None,
941 automatic_docs,
942 )
943 )
944 if non_mapping_source_only:
945 parser = DeclarativeNonMappingInputParser(
946 assume_not_none(parsed_alt_form),
947 inline_reference_documentation=inline_reference_documentation,
948 expected_debputy_integration_mode=expected_debputy_integration_mode,
949 )
950 else:
951 parser = DeclarativeMappingInputParser(
952 _as_attr_names(source_typed_dict.__required_keys__),
953 _as_attr_names(all_parameters),
954 manifest_attributes,
955 source_attributes,
956 mutually_exclusive_attributes=all_mutually_exclusive_fields,
957 alt_form_parser=parsed_alt_form,
958 at_least_one_of=at_least_one_of,
959 inline_reference_documentation=inline_reference_documentation,
960 path_hint_source_attributes=tuple(path_hint_source_attributes),
961 expected_debputy_integration_mode=expected_debputy_integration_mode,
962 )
963 if is_list_wrapped:
964 parser = ListWrappedDeclarativeInputParser(
965 parser,
966 expected_debputy_integration_mode=expected_debputy_integration_mode,
967 )
968 return parser
def _as_type_validator(
    self,
    attribute: str,
    provided_type: Any,
    parsing_typed_dict_attribute: bool,
) -> AttributeTypeHandler:
    """Build the `AttributeTypeHandler` used to validate (and map) one attribute.

    The handler is selected by inspecting `provided_type` after registered
    (mapped) types have been replaced by their source types.  The cases are
    tried in this order: dispatchable types, `Optional[X]`, basic simple types,
    `List[X]`, TypedDicts, `Dict[str, X]`, `Union[...]`, `Literal[...]` and
    finally `Any`.

    :param attribute: Name of the attribute; only used in error messages and
      passed along to recursive calls.
    :param provided_type: The declared type of the attribute (must not be a tuple).
    :param parsing_typed_dict_attribute: Passed verbatim to `unpack_type` and to
      the recursive calls.
    :return: A handler that validates values of the given type.
    :raises ValueError: If the type (or a nested type) is not supported.
    """
    assert not isinstance(provided_type, tuple)

    if isinstance(provided_type, type) and issubclass(
        provided_type, DebputyDispatchableType
    ):
        return _dispatch_parser(provided_type)

    unmapped_type = self._strip_mapped_types(
        provided_type,
        parsing_typed_dict_attribute,
    )
    type_normalizer = self._type_normalize(
        attribute,
        unmapped_type,
        provided_type,
        parsing_typed_dict_attribute,
    )
    _, t_unmapped_orig, t_unmapped_args = unpack_type(
        unmapped_type,
        parsing_typed_dict_attribute,
    )
    _, _, t_provided_args = unpack_type(
        provided_type,
        parsing_typed_dict_attribute,
    )

    if (
        t_unmapped_orig == Union
        and t_unmapped_args
        and len(t_unmapped_args) == 2
        and any(v is _NONE_TYPE for v in t_unmapped_args)
    ):
        # Optional[X]: validate as X, but accept None as well.
        actual_type = [a for a in t_provided_args if a is not _NONE_TYPE][0]
        inner_handler = self._as_type_validator(
            attribute, actual_type, parsing_typed_dict_attribute
        )

        def _validate_optional(v: Any, path: AttributePath) -> None:
            if v is None:
                return
            inner_handler.ensure_type(v, path)

        return AttributeTypeHandler(
            inner_handler.describe_type(),
            _validate_optional,
            base_type=inner_handler.base_type,
            mapper=type_normalizer,
        )

    if unmapped_type in BASIC_SIMPLE_TYPES:
        type_name = BASIC_SIMPLE_TYPES[unmapped_type]

        type_mapping = self._registered_types.get(provided_type)
        if type_mapping is not None:
            # Describe the value by its mapped type and mention the simple
            # type it is written as in parentheses.
            simple_type = f" ({type_name})"
            type_name = type_mapping.target_type.__name__
        else:
            simple_type = ""

        def _validate_simple(v: Any, path: AttributePath) -> None:
            if not isinstance(v, unmapped_type):
                _validation_type_error(
                    path, f"The attribute must be a {type_name}{simple_type}"
                )

        return AttributeTypeHandler(
            type_name,
            _validate_simple,
            base_type=unmapped_type,
            mapper=type_normalizer,
        )

    if t_unmapped_orig == list:
        if not t_unmapped_args:
            raise ValueError(
                f'The attribute "{attribute}" is List but does not have Generics (Must use List[X])'
            )

        element_type = t_unmapped_args[0]
        element_handler = self._as_type_validator(
            attribute,
            element_type,
            parsing_typed_dict_attribute,
        )

        def _validate_list(v: Any, path: AttributePath) -> None:
            if not isinstance(v, list):
                _validation_type_error(path, "The attribute must be a list")
            # Use a distinct loop variable; do not shadow the list being validated.
            for i, element in enumerate(v):
                element_handler.ensure_type(element, path[i])

        list_mapper = (
            map_each_element(element_handler.mapper)
            if element_handler.mapper is not None
            else None
        )

        return AttributeTypeHandler(
            f"List of {element_handler.describe_type()}",
            _validate_list,
            base_type=list,
            mapper=type_normalizer,
        ).combine_mapper(list_mapper)

    if is_typeddict(provided_type):
        # Nested TypedDict: validation is a no-op here; the mapper hands the
        # value to a parser generated for the nested type.
        subparser = self.generate_parser(cast("Type[TD]", provided_type))
        return AttributeTypeHandler(
            description=f"{provided_type.__name__} (Typed Mapping)",
            ensure_type=lambda v, ap: None,
            base_type=dict,
            mapper=lambda v, ap, cv: subparser.parse_input(
                v, ap, parser_context=cv
            ),
        )

    if t_unmapped_orig == dict:
        if not t_unmapped_args or len(t_unmapped_args) != 2:
            raise ValueError(
                f'The attribute "{attribute}" is Dict but does not have Generics (Must use Dict[str, Y])'
            )
        if t_unmapped_args[0] != str:
            raise ValueError(
                f'The attribute "{attribute}" is Dict and has a non-str type as key.'
                " Currently, only `str` is supported (Dict[str, Y])"
            )
        key_mapper = self._as_type_validator(
            attribute,
            t_unmapped_args[0],
            parsing_typed_dict_attribute,
        )
        value_mapper = self._as_type_validator(
            attribute,
            t_unmapped_args[1],
            parsing_typed_dict_attribute,
        )

        if key_mapper.base_type is None:
            raise ValueError(
                f'The attribute "{attribute}" is Dict and the key did not have a trivial base type. Key types'
                f" without trivial base types (such as `str`) are not supported at the moment."
            )

        if value_mapper.mapper is not None:
            raise ValueError(
                f'The attribute "{attribute}" is Dict and the value requires mapping.'
                " Currently, this is not supported. Consider a simpler type (such as Dict[str, str] or Dict[str, Any])."
                " Better typing may come later"
            )

        def _validate_mapping(uv: Any, path: AttributePath) -> None:
            if not isinstance(uv, dict):
                _validation_type_error(path, "The attribute must be a mapping")
            # Human-readable position of the key used as path hint when the
            # key itself has the wrong type.
            key_name = "the first key in the mapping"
            for i, (k, v) in enumerate(uv.items()):
                if not key_mapper.base_type_match(k):
                    kp = path.copy_with_path_hint(key_name)
                    _validation_type_error(
                        kp,
                        f'The key number {i + 1} in attribute "{kp}" must be a {key_mapper.describe_type()}',
                    )
                key_name = f"the key after {k}"
                value_mapper.ensure_type(v, path[k])

        return AttributeTypeHandler(
            f"Mapping of {value_mapper.describe_type()}",
            _validate_mapping,
            base_type=dict,
            mapper=type_normalizer,
        ).combine_mapper(key_mapper.mapper)

    if t_unmapped_orig == Union:
        if _is_two_arg_x_list_x(t_provided_args):
            # Force the order to be "X, List[X]" as it simplifies the code
            x_list_x = (
                t_provided_args
                if get_origin(t_provided_args[1]) == list
                else (t_provided_args[1], t_provided_args[0])
            )

            # X, List[X] could match if X was List[Y]. However, our code below assumes
            # that X is a non-list. The `_is_two_arg_x_list_x` returns False for this
            # case to avoid this assert and fall into the "generic case".
            assert get_origin(x_list_x[0]) != list
            x_subtype_checker = self._as_type_validator(
                attribute,
                x_list_x[0],
                parsing_typed_dict_attribute,
            )
            list_x_subtype_checker = self._as_type_validator(
                attribute,
                x_list_x[1],
                parsing_typed_dict_attribute,
            )
            type_description = x_subtype_checker.describe_type()
            type_description = f"{type_description} or a list of {type_description}"

            def _validate_x_or_list_x(v: Any, path: AttributePath) -> None:
                if isinstance(v, list):
                    list_x_subtype_checker.ensure_type(v, path)
                else:
                    x_subtype_checker.ensure_type(v, path)

            return AttributeTypeHandler(
                type_description,
                _validate_x_or_list_x,
                mapper=type_normalizer,
            )
        else:
            subtype_checker = [
                self._as_type_validator(attribute, a, parsing_typed_dict_attribute)
                for a in t_unmapped_args
            ]
            type_description = "one-of: " + ", ".join(
                f"{sc.describe_type()}" for sc in subtype_checker
            )
            # All alternatives must share one mapper, since the handler below
            # cannot know which alternative a value matched.
            mapper = subtype_checker[0].mapper
            if any(mapper != sc.mapper for sc in subtype_checker):
                raise ValueError(
                    f'Cannot handle the union "{provided_type}" as the target types need different'
                    " type normalization/mapping logic. Unions are generally limited to Union[X, List[X]]"
                    " where X is a non-collection type."
                )

            def _validate_union(v: Any, path: AttributePath) -> None:
                partial_matches = []
                for sc in subtype_checker:
                    try:
                        sc.ensure_type(v, path)
                        return
                    except ManifestParseException as e:
                        if sc.base_type_match(v):
                            partial_matches.append((sc, e))

                # With exactly one alternative matching on base type, its
                # error is more helpful than the generic one.
                if len(partial_matches) == 1:
                    raise partial_matches[0][1]
                _validation_type_error(
                    path, f"Could not match against: {type_description}"
                )

            return AttributeTypeHandler(
                type_description,
                _validate_union,
                mapper=type_normalizer,
            )

    if t_unmapped_orig == Literal:
        # We want "x" for string values; repr provides 'x'
        pretty = ", ".join(
            f"`{v}`" if isinstance(v, str) else str(v) for v in t_unmapped_args
        )

        def _validate_literal(v: Any, path: AttributePath) -> None:
            if v not in t_unmapped_args:
                value_hint = ""
                if isinstance(v, str):
                    value_hint = f"({v}) "
                _validation_type_error(
                    path,
                    f"Value {value_hint}must be one of the following literal values: {pretty}",
                )

        return AttributeTypeHandler(
            f"One of the following literal values: {pretty}",
            _validate_literal,
        )

    if provided_type == Any:
        return AttributeTypeHandler(
            "any (unvalidated)",
            lambda *a: None,
        )
    raise ValueError(
        f'The attribute "{attribute}" had/contained a type {provided_type}, which is not supported'
    )
def _parse_types(
    self,
    spec: Type[TypedDict],
    allow_target_attribute_annotation: bool = False,
    allow_source_attribute_annotations: bool = False,
    forbid_optional: bool = True,
) -> Dict[str, AttributeDescription]:
    """Describe every annotated attribute of the TypedDict `spec`.

    :param spec: The TypedDict class whose type hints are resolved (with extras).
    :param allow_target_attribute_annotation: Forwarded to `_attribute_description`.
    :param allow_source_attribute_annotations: Forwarded to `_attribute_description`.
    :param forbid_optional: Forwarded to `_attribute_description`.
    :return: Mapping of attribute name to its description, in annotation order.
    """
    descriptions: Dict[str, AttributeDescription] = {}
    type_hints = get_type_hints(spec, include_extras=True)
    for attr_name, attr_type in type_hints.items():
        # An attribute counts as required when the TypedDict lists it as such.
        is_required = attr_name in spec.__required_keys__
        descriptions[attr_name] = self._attribute_description(
            attr_name,
            attr_type,
            is_required,
            allow_target_attribute_annotation=allow_target_attribute_annotation,
            allow_source_attribute_annotations=allow_source_attribute_annotations,
            forbid_optional=forbid_optional,
        )
    return descriptions
def _attribute_description(
    self,
    attribute: str,
    orig_td: Any,
    is_required: bool,
    forbid_optional: bool = True,
    allow_target_attribute_annotation: bool = False,
    allow_source_attribute_annotations: bool = False,
) -> AttributeDescription:
    """Create the `AttributeDescription` for one TypedDict attribute.

    :param attribute: Name of the attribute as declared in the TypedDict.
    :param orig_td: The declared (possibly annotated) type of the attribute.
    :param is_required: Whether the TypedDict lists the attribute as required.
    :param forbid_optional: Forwarded to `_parse_type`.
    :param allow_target_attribute_annotation: Forwarded to the parse-hint parser.
    :param allow_source_attribute_annotations: Forwarded to the parse-hint parser.
    :return: The description combining type, validator and parse hints.
    """
    td, anno, is_optional = _parse_type(
        attribute,
        orig_td,
        forbid_optional=forbid_optional,
    )
    # Attributes of a TypedDict are always validated in "typed dict attribute" mode.
    handler = self._as_type_validator(attribute, td, True)
    hints = DetectedDebputyParseHint.parse_annotations(
        anno,
        f' Seen with attribute "{attribute}".',
        attribute,
        is_required,
        allow_target_attribute_annotation=allow_target_attribute_annotation,
        allow_source_attribute_annotations=allow_source_attribute_annotations,
    )
    target_attribute = hints.target_attribute
    conflicts = hints.conflict_with_source_attributes
    conditional_required = hints.conditional_required
    manifest_name = assume_not_none(hints.source_manifest_attribute)
    return AttributeDescription(
        target_attribute=target_attribute,
        attribute_type=td,
        type_validator=handler,
        annotations=anno,
        is_optional=is_optional,
        conflicting_attributes=conflicts,
        conditional_required=conditional_required,
        source_attribute_name=manifest_name,
        parse_hints=hints,
    )
def _parse_alt_form(
    self,
    alt_form,
    default_target_attribute: Optional[str],
) -> AttributeDescription:
    """Create the `AttributeDescription` for the non-mapping (alternative) source form.

    :param alt_form: The declared type of the alternative form.
    :param default_target_attribute: Forwarded to the parse-hint parser as the
      default target attribute.
    :return: The description of the alternative form.  Its source attribute
      name is a fixed placeholder text rather than a manifest attribute.
    """
    label = "source_format alternative form"
    td, anno, is_optional = _parse_type(
        label,
        alt_form,
        forbid_optional=True,
        parsing_typed_dict_attribute=False,
    )
    handler = self._as_type_validator(label, td, True)
    hints = DetectedDebputyParseHint.parse_annotations(
        anno,
        " The alternative for source_format.",
        None,
        False,
        default_target_attribute=default_target_attribute,
        allow_target_attribute_annotation=True,
        allow_source_attribute_annotations=False,
    )
    target_attribute = hints.target_attribute
    conflicts = hints.conflict_with_source_attributes
    conditional_required = hints.conditional_required
    return AttributeDescription(
        target_attribute=target_attribute,
        attribute_type=td,
        type_validator=handler,
        annotations=anno,
        is_optional=is_optional,
        conflicting_attributes=conflicts,
        conditional_required=conditional_required,
        source_attribute_name="Alt form of the source_format",
    )
def _union_narrowing(
    self,
    input_type: Any,
    target_type: Any,
    parsing_typed_dict_attribute: bool,
) -> Optional[Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]]:
    """Find a mapper that narrows a `Union[...]` input type into `target_type`.

    :param input_type: The source type; must be a `Union[...]` with arguments.
    :param target_type: The type the value should be converted into.
    :param parsing_typed_dict_attribute: Passed verbatim to `unpack_type` and
      `_strip_mapped_types`.
    :return: A mapper performing the narrowing, or `None` when no narrowing is
      needed or the combination is not supported by this method.
    :raises ValueError: If `input_type` is not a Union, or if the input mixes
      mapped and unmapped forms while the target argument is not a class.
    """
    _, input_orig, input_args = unpack_type(
        input_type, parsing_typed_dict_attribute
    )
    _, target_orig, target_args = unpack_type(
        target_type, parsing_typed_dict_attribute
    )

    if input_orig != Union or not input_args:
        raise ValueError("input_type must be a Union[...] with non-empty args")

    # Currently, we only support Union[X, List[X]] -> List[Y] narrowing or
    # Union[X, List[X]] -> Union[Y, List[Y]]
    #  - Where X = Y or there is a simple standard transformation from X to Y.

    if target_orig not in (Union, list) or not target_args:
        # Not supported
        return None

    if target_orig == Union and set(input_args) == set(target_args):
        # Not needed (identity mapping)
        return None

    if target_orig == list and not any(get_origin(a) == list for a in input_args):
        # Not supported
        return None

    target_arg = target_args[0]
    simplified_type = self._strip_mapped_types(
        target_arg, parsing_typed_dict_attribute
    )
    acceptable_types = {
        target_arg,
        List[target_arg],  # type: ignore
        simplified_type,
        List[simplified_type],  # type: ignore
    }
    target_format = (
        target_arg,
        List[target_arg],  # type: ignore
    )
    # Count how many input alternatives are already in the target format and
    # how many are in the simplified (unmapped) format.
    in_target_format = 0
    in_simple_format = 0
    for input_arg in input_args:
        if input_arg not in acceptable_types:
            # Not supported
            return None
        if input_arg in target_format:
            in_target_format += 1
        else:
            in_simple_format += 1

    assert in_simple_format or in_target_format

    if in_target_format and not in_simple_format:
        # Union[X, List[X]] -> List[X]
        return normalize_into_list

    mapped = self._registered_types.get(target_arg)
    if mapped is None:
        # The target argument is only indirectly mapped (its simplified form
        # differs, but it has no registered mapping itself).  Report it as
        # "not supported" rather than failing with a bare KeyError.
        return None

    if not in_target_format and in_simple_format:
        # Union[X, List[X]] -> List[Y]

        def _mapper_x_list_y(
            x: Union[Any, List[Any]],
            ap: AttributePath,
            pc: Optional["ParserContextData"],
        ) -> List[Any]:
            in_list_form: List[Any] = normalize_into_list(x, ap, pc)

            return [mapped.mapper(item, ap, pc) for item in in_list_form]

        return _mapper_x_list_y

    # Union[Y, List[X]] -> List[Y]
    if not isinstance(target_arg, type):
        # The mixed mapper below needs `isinstance(..., target_arg)`.
        raise ValueError(
            f"Cannot narrow {input_type} -> {target_type}: The automatic conversion does"
            f" not support mixed types. Please use either {simplified_type} or {target_arg}"
            f" in the source content (but not a mix of both)"
        )

    def _mapper_mixed_list_y(
        x: Union[Any, List[Any]],
        ap: AttributePath,
        pc: Optional["ParserContextData"],
    ) -> List[Any]:
        in_list_form: List[Any] = normalize_into_list(x, ap, pc)

        # Values already of the target type are kept; the rest are mapped.
        return [
            item if isinstance(item, target_arg) else mapped.mapper(item, ap, pc)
            for item in in_list_form
        ]

    return _mapper_mixed_list_y
def _type_normalize(
    self,
    attribute: str,
    input_type: Any,
    target_type: Any,
    parsing_typed_dict_attribute: bool,
) -> Optional[Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]]:
    """Find the mapper that converts a value of `input_type` into `target_type`.

    :param attribute: Name of the attribute; only used in the error message.
    :param input_type: The type the value has in the source.
    :param target_type: The type the value must have after mapping.
    :param parsing_typed_dict_attribute: Passed verbatim to `unpack_type` and
      `_union_narrowing`.
    :return: `None` when the types are equal (no mapping needed); otherwise a
      callable taking the value, its attribute path and the parser context.
    :raises ValueError: If no automatic conversion between the types is known.
    """
    if input_type == target_type:
        return None
    _, input_orig, input_args = unpack_type(
        input_type, parsing_typed_dict_attribute
    )
    _, target_orig, target_args = unpack_type(
        target_type,
        parsing_typed_dict_attribute,
    )
    if input_orig == Union:
        result = self._union_narrowing(
            input_type, target_type, parsing_typed_dict_attribute
        )
        if result:
            return result
    elif target_orig == list and target_args and target_args[0] == input_type:
        # X -> List[X].  `target_args` is checked first so a list target
        # without generics reaches the ValueError below, not an IndexError.
        return wrap_into_list

    mapped = self._registered_types.get(target_type)
    if mapped is not None and input_type == mapped.source_type:
        # Source -> Target
        return mapped.mapper
    if target_orig == list and target_args:
        mapped = self._registered_types.get(target_args[0])
        if mapped is not None:
            # mypy is dense and forgot `mapped` cannot be optional in the comprehensions.
            mapped_type: TypeMapping = mapped
            if input_type == mapped.source_type:
                # Source -> List[Target]
                return lambda x, ap, pc: [mapped_type.mapper(x, ap, pc)]
            if (
                input_orig == list
                and input_args
                and input_args[0] == mapped_type.source_type
            ):
                # List[Source] -> List[Target]
                return lambda xs, ap, pc: [
                    mapped_type.mapper(x, ap, pc) for x in xs
                ]

    raise ValueError(
        f'Unsupported type normalization for "{attribute}": Cannot automatically map/narrow'
        f" {input_type} to {target_type}"
    )
def _strip_mapped_types(
    self, orig_td: Any, parsing_typed_dict_attribute: bool
) -> Any:
    """Replace registered (mapped) types in `orig_td` by their source types.

    Handles a directly registered type, `List[Registered]` and (recursively)
    the arguments of a `Union[...]`.

    :param orig_td: The declared type.
    :param parsing_typed_dict_attribute: Passed verbatim to `unpack_type`.
    :return: The type with mapped types replaced, or `orig_td` itself when
      nothing needed replacing.
    """
    m = self._registered_types.get(orig_td)
    if m is not None:
        return m.source_type
    _, v, args = unpack_type(orig_td, parsing_typed_dict_attribute)
    # Guard on `args`: a list without generics has no element type to strip.
    # Returning it unchanged presumably lets the caller report the missing
    # generics instead of failing here with an IndexError.
    if v == list and args:
        arg = args[0]
        m = self._registered_types.get(arg)
        if m:
            return List[m.source_type]  # type: ignore
    if v == Union:
        stripped_args = tuple(
            self._strip_mapped_types(x, parsing_typed_dict_attribute) for x in args
        )
        if stripped_args != args:
            return Union[stripped_args]
    return orig_td
def _sort_key(attr: StandardParserAttributeDocumentation) -> Any:
    """Return the sort key for a standard attribute doc.

    The key is the sort category followed by the first attribute name yielded
    when iterating `attr.attributes`.
    """
    first_attribute = next(iter(attr.attributes))
    return (attr.sort_category, first_attribute)
def _apply_std_docs(
    std_doc_table: Optional[
        Mapping[Type[Any], Sequence[StandardParserAttributeDocumentation]]
    ],
    source_format_typed_dict: Type[Any],
    attribute_docs: Optional[Sequence[ParserAttributeDocumentation]],
) -> Optional[Sequence[ParserAttributeDocumentation]]:
    """Extend `attribute_docs` with standard docs inherited via base classes.

    The `__orig_bases__` of `source_format_typed_dict` are walked transitively.
    For every base found in `std_doc_table`, each standard doc is added unless
    one of its attributes is already documented.

    :param std_doc_table: Standard docs keyed by base type; may be `None`/empty.
    :param source_format_typed_dict: The type whose bases are inspected.
    :param attribute_docs: The explicitly provided docs, if any.
    :return: `attribute_docs` unchanged when nothing was added; otherwise a
      tuple with the provided docs first, followed by the sorted standard docs.
    """
    if not std_doc_table:
        return attribute_docs

    documented = set()
    for provided_doc in attribute_docs or ():
        documented.update(provided_doc.attributes)

    # Transitive walk over the bases; `visited` prevents re-queueing a base.
    pending = set(getattr(source_format_typed_dict, "__orig_bases__", []))
    visited = set(pending)
    selected = []
    while pending:
        base = pending.pop()
        unvisited = {
            parent
            for parent in getattr(base, "__orig_bases__", [])
            if parent not in visited
        }
        pending |= unvisited
        visited |= unvisited
        for std_doc in std_doc_table.get(base) or ():
            if not documented.isdisjoint(std_doc.attributes):
                # If there is any overlap, do not add the docs
                continue
            documented.update(std_doc.attributes)
            selected.append(std_doc)

    if not selected:
        return attribute_docs

    ordered = sorted(selected, key=_sort_key)
    if attribute_docs:
        # Plugin provided attributes first
        return tuple([*attribute_docs, *ordered])
    return tuple(ordered)
def _verify_and_auto_correct_inline_reference_documentation(
    parsed_content: Type[TD],
    source_typed_dict: Type[Any],
    source_content_attributes: Mapping[str, AttributeDescription],
    inline_reference_documentation: Optional[ParserDocumentation],
    has_alt_form: bool,
    automatic_docs: Optional[
        Mapping[Type[Any], Sequence[StandardParserAttributeDocumentation]]
    ] = None,
) -> Optional[ParserDocumentation]:
    """Validate the inline reference documentation and fill in missing parts.

    Standard docs from `automatic_docs` are merged in first.  Every documented
    attribute must exist in the source format and be documented only once.
    Undocumented attributes are an error when any non-standard doc was given;
    otherwise they are added as explicitly undocumented.

    :param parsed_content: The target type; only its name is used in errors.
    :param source_typed_dict: The source format type, used to look up standard docs.
    :param source_content_attributes: The attributes of the source format.
    :param inline_reference_documentation: The provided documentation, if any.
    :param has_alt_form: Whether the source format has a non-mapping form.
    :param automatic_docs: Table of standard docs passed to `_apply_std_docs`.
    :return: The (possibly updated) documentation, or `None` if there is none.
    :raises ValueError: On unknown, duplicated or missing attribute docs, or
      when an alternative form is documented but does not exist.
    """
    orig_attribute_docs = (
        inline_reference_documentation.attribute_doc
        if inline_reference_documentation
        else None
    )
    attribute_docs = _apply_std_docs(
        automatic_docs,
        source_typed_dict,
        orig_attribute_docs,
    )
    if inline_reference_documentation is None and attribute_docs is None:
        return None
    changes = {}
    if attribute_docs:
        seen = set()
        had_any_custom_docs = False
        for attr_doc in attribute_docs:
            if not isinstance(attr_doc, StandardParserAttributeDocumentation):
                had_any_custom_docs = True
            for attr_name in attr_doc.attributes:
                attr = source_content_attributes.get(attr_name)
                if attr is None:
                    raise ValueError(
                        f"The inline_reference_documentation for the source format of {parsed_content.__qualname__}"
                        f' references an attribute "{attr_name}", which does not exist in the source format.'
                    )
                if attr_name in seen:
                    raise ValueError(
                        f"The inline_reference_documentation for the source format of {parsed_content.__qualname__}"
                        f' has documentation for "{attr_name}" twice, which is not supported.'
                        f" Please document it at most once"
                    )
                seen.add(attr_name)
        undocumented = source_content_attributes.keys() - seen
        if undocumented:
            if had_any_custom_docs:
                # Sorted so the error message is deterministic.
                undocumented_attrs = ", ".join(sorted(undocumented))
                raise ValueError(
                    f"The following attributes were not documented for the source format of"
                    f" {parsed_content.__qualname__}. If this is deliberate, then please"
                    ' declare each of them as undocumented (via undocumented_attr("foo")):'
                    f" {undocumented_attrs}"
                )
            # Only standard docs were used: mark the rest as undocumented.
            combined_docs = list(attribute_docs)
            combined_docs.extend(undocumented_attr(a) for a in sorted(undocumented))
            attribute_docs = combined_docs

    if attribute_docs and orig_attribute_docs != attribute_docs:
        assert attribute_docs is not None
        changes["attribute_doc"] = tuple(attribute_docs)

    if (
        inline_reference_documentation is not None
        and inline_reference_documentation.alt_parser_description
        and not has_alt_form
    ):
        raise ValueError(
            "The inline_reference_documentation had documentation for an non-mapping format,"
            " but the source format does not have a non-mapping format."
        )
    if changes:
        if inline_reference_documentation is None:
            inline_reference_documentation = reference_documentation()
        return inline_reference_documentation.replace(**changes)
    return inline_reference_documentation
def _check_conflicts(
    input_content_attributes: Dict[str, AttributeDescription],
    required_attributes: FrozenSet[str],
    all_attributes: FrozenSet[str],
) -> None:
    """Reject conflict declarations that can never be satisfied.

    :param input_content_attributes: The attributes of the input format.
    :param required_attributes: Names of the required input attributes.
    :param all_attributes: Names of all (required and optional) input attributes.
    :raises ValueError: If a required attribute declares conflicts, if an
      attribute conflicts with a required attribute, or if an attribute
      declares a conflict with an attribute that does not exist.
    """
    for attr_name, attr in input_content_attributes.items():
        conflicts = attr.conflicting_attributes

        if conflicts and attr_name in required_attributes:
            c = ", ".join(repr(a) for a in conflicts)
            raise ValueError(
                f'The attribute "{attr_name}" is required and conflicts with the attributes: {c}.'
                " This makes it impossible to use these attributes. Either remove the attributes"
                f' (along with the conflicts for them), adjust the conflicts or make "{attr_name}"'
                " optional (NotRequired)"
            )

        conflicts_with_required = conflicts & required_attributes
        if conflicts_with_required:
            c = ", ".join(repr(a) for a in conflicts_with_required)
            raise ValueError(
                f'The attribute "{attr_name}" conflicts with the following *required* attributes: {c}.'
                f' This makes it impossible to use the "{attr_name}" attribute. Either remove it,'
                f" adjust the conflicts or make the listed attributes optional (NotRequired)"
            )

        undeclared = conflicts - all_attributes
        if undeclared:
            c = ", ".join(repr(a) for a in undeclared)
            raise ValueError(
                f'The attribute "{attr_name}" declares a conflict with the following unknown attributes: {c}.'
                f" None of these attributes were declared in the input."
            )
def _check_attributes(
    content: Type[TypedDict],
    input_content: Type[TypedDict],
    input_content_attributes: Dict[str, AttributeDescription],
    sources: Mapping[str, Collection[str]],
) -> None:
    """Check that required-ness of source attributes fits their targets.

    :param content: The target TypedDict.
    :param input_content: The source TypedDict.
    :param input_content_attributes: Descriptions of the source attributes.
    :param sources: For each target attribute, the source attributes mapping to it.
    :raises ValueError: If a required source attribute shares its target with
      other sources, or if the only source of a required target is optional.
    """
    required_targets = content.__required_keys__
    required_inputs = input_content.__required_keys__

    for input_name in required_inputs | input_content.__optional_keys__:
        target_name = input_content_attributes[input_name].target_attribute
        source_names = sources[target_name]

        assert source_names
        source_count = len(source_names)

        if input_name in required_inputs:
            if source_count > 1:
                raise ValueError(
                    f'The source attribute "{input_name}" is required, but it maps to "{target_name}",'
                    f' which has multiple sources "{source_names}". If "{input_name}" should be required,'
                    f' then there is no need for additional sources for "{target_name}". Alternatively,'
                    f' "{input_name}" might be missing a NotRequired type'
                    f' (example: "{input_name}: NotRequired[<OriginalTypeHere>]")'
                )
        elif source_count == 1 and target_name in required_targets:
            raise ValueError(
                f'The source attribute "{input_name}" is not marked as required and maps to'
                f' "{target_name}", which is marked as required. As there are no other attributes'
                f' mapping to "{target_name}", then "{input_name}" must be required as well'
                f' ("{input_name}: Required[<Type>]"). Alternatively, "{target_name}" should be optional'
                f' ("{target_name}: NotRequired[<Type>]") or an "MappingHint.aliasOf" might be missing.'
            )
def _validation_type_error(path: AttributePath, message: str) -> None:
    """Report that the value at `path` has an invalid structure or type.

    This function never returns normally.

    :param path: Location of the offending attribute; its `path` is included
      in the error message.
    :param message: Explanation of what is wrong with the value.
    :raises ManifestParseException: Always.
    """
    error_text = (
        f'The attribute "{path.path}" did not have a valid structure/type: {message}'
    )
    raise ManifestParseException(error_text)
def _is_two_arg_x_list_x(t_args: Tuple[Any, ...]) -> bool:
    """Tell whether `t_args` is exactly the pair `(X, List[X])` in either order.

    :param t_args: Type arguments to inspect (for example those of a Union).
    :return: True only when there are two arguments, exactly one of them is a
      list type, and the first type argument of that list equals the other one.
    """
    if len(t_args) != 2:
        return False
    first, second = t_args
    first_is_list = get_origin(first) == list
    second_is_list = get_origin(second) == list
    if first_is_list == second_is_list:
        # Neither is a list, or both are. The latter could still match
        # X, List[X] - but we do not allow this case for now as the caller
        # does not support it.
        return False
    if first_is_list:
        list_type, element_type = first, second
    else:
        list_type, element_type = second, first
    list_args = get_args(list_type)
    # A bare (unparameterized) list has no type arguments and never matches.
    return bool(list_args and list_args[0] == element_type)
def _extract_typed_dict(
    base_type,
    default_target_attribute: Optional[str],
) -> Tuple[Optional[Type[TypedDict]], Any]:
    """Split a source format into its TypedDict part and its alternative form.

    :param base_type: The source format. It is handled as a TypedDict, as a
      Union, or as some other single type.
    :param default_target_attribute: Target attribute to assume for the
      alternative form of a Union when it carries no `TargetAttribute`
      annotation; may be None.
    :return: A `(typed_dict, alternative)` pair. A lone TypedDict yields
      `(base_type, None)`, a non-Union type yields `(None, base_type)`, and a
      Union yields its TypedDict member (or None) together with the type built
      from the remaining members.
    :raises ValueError: If the format is or contains a non-TypedDict
      dict/Mapping class, if a Union contains more than one TypedDict or a
      None member, or if no target attribute can be determined for the
      alternative form.
    """

    def _is_mapping_class(candidate: Any) -> bool:
        return isinstance(candidate, type) and issubclass(candidate, (dict, Mapping))

    if is_typeddict(base_type):
        return base_type, None

    _, origin, union_members = unpack_type(base_type, False)
    if origin != Union:
        if _is_mapping_class(base_type):
            raise ValueError(
                "The source_format cannot be nor contain a (non-TypedDict) dict"
            )
        return None, base_type

    typed_dict = None
    for member in union_members:
        if not is_typeddict(member):
            continue
        if typed_dict is not None:
            raise ValueError(
                "When source_format is a Union, it must contain at most one TypedDict"
            )
        typed_dict = member

    for member in union_members:
        if member is None or member is _NONE_TYPE:
            raise ValueError(
                "The source_format cannot be nor contain Optional[X] or Union[X, None]"
            )

    # Everything in the Union except the (optional) TypedDict member.
    alternatives = [member for member in union_members if member is not typed_dict]
    if any(_is_mapping_class(member) for member in alternatives):
        raise ValueError(
            "The source_format cannot be nor contain a (non-TypedDict) dict"
        )

    annotations = None
    has_target_attribute = False
    if len(alternatives) == 1:
        # A single alternative may be Annotated; separate the type from its hints.
        target_type, annotations, _ = _parse_type(
            "source_format alternative form",
            alternatives[0],
            forbid_optional=True,
            parsing_typed_dict_attribute=False,
        )
        has_target_attribute = bool(annotations) and any(
            isinstance(hint, TargetAttribute) for hint in annotations
        )
    else:
        target_type = Union[tuple(alternatives)]

    if default_target_attribute is None and not has_target_attribute:
        raise ValueError(
            'The alternative format must be Union[TypedDict,Annotated[X, DebputyParseHint.target_attribute("...")]]'
            " OR the parsed_content format must have exactly one attribute that is required."
        )
    if not annotations:
        return typed_dict, target_type
    # Re-attach the hints that were split off from the single alternative.
    return typed_dict, Annotated[(target_type, *annotations)]
def _dispatch_parse_generator(
    dispatch_type: Type[DebputyDispatchableType],
) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]:
    """Create a mapper that parses values via the dispatch table of `dispatch_type`.

    :param dispatch_type: The dispatchable type whose parser table is used.
    :return: A callable taking `(value, attribute_path, parser_context)`. It
      looks up the dispatch table on the parser context on each call and
      returns the result of that table's `parse_input`.
    """

    def _dispatch_parse(
        value: Any,
        attribute_path: AttributePath,
        parser_context: Optional["ParserContextData"],
    ):
        # The dispatch table is obtained from the context, so a context is mandatory.
        assert parser_context is not None
        table = parser_context.dispatch_parser_table_for(dispatch_type)
        parsed = table.parse_input(
            value,
            attribute_path,
            parser_context=parser_context,
        )
        return parsed

    return _dispatch_parse
def _dispatch_parser(
    dispatch_type: Type[DebputyDispatchableType],
) -> AttributeTypeHandler:
    """Build the `AttributeTypeHandler` for attributes of a dispatchable type.

    The handler is named after `dispatch_type`, receives a callback that
    accepts any positional arguments and returns None, and uses the mapper
    produced by `_dispatch_parse_generator` for the same type.

    :param dispatch_type: The dispatchable type to create a handler for.
    :return: The configured handler.
    """

    def _accept_anything(*_args: Any) -> None:
        # NOTE(review): presumably the handler's validation hook; here it never rejects.
        return None

    return AttributeTypeHandler(
        dispatch_type.__name__,
        _accept_anything,
        mapper=_dispatch_parse_generator(dispatch_type),
    )
def _parse_type(
    attribute: str,
    orig_td: Any,
    forbid_optional: bool = True,
    parsing_typed_dict_attribute: bool = True,
) -> Tuple[Any, Tuple[Any, ...], bool]:
    """Unpack a declared type into its underlying type and `Annotated` metadata.

    :param attribute: Name of the attribute being parsed; only used in error messages.
    :param orig_td: The declared type to unpack.
    :param forbid_optional: When true, a Union with a None member is rejected.
    :param parsing_typed_dict_attribute: Passed through to `unpack_type`.
    :return: `(type, metadata, optional)`. `metadata` holds the extra
      arguments of an `Annotated` type, or is empty. `optional` is always
      False, as nothing in this function sets it.
    :raises ValueError: If the type resolves to the None type, or if it is a
      Union containing the None type while `forbid_optional` is true. Both
      checks only apply when `unpack_type` reports an origin for `orig_td`.
    """
    resolved, origin, type_args = unpack_type(orig_td, parsing_typed_dict_attribute)
    metadata: Tuple[Any, ...] = tuple()
    optional = False

    if origin is None:
        # No origin reported: nothing to unwrap and nothing to validate.
        return resolved, metadata, optional

    if origin == Annotated:
        # Keep the hints and continue with the type wrapped by Annotated.
        annotated_args = get_args(resolved)
        metadata = annotated_args[1:]
        resolved, origin, type_args = unpack_type(
            annotated_args[0], parsing_typed_dict_attribute
        )

    if resolved is _NONE_TYPE:
        raise ValueError(
            f'The attribute "{attribute}" resolved to type "None". "Nil" / "None" fields are not allowed in the'
            " debputy manifest, so this attribute does not make sense in its current form."
        )
    if forbid_optional and origin == Union:
        if any(member is _NONE_TYPE for member in type_args):
            raise ValueError(
                f'Detected use of Optional in "{attribute}", which is not allowed here.'
                " Please use NotRequired for optional fields"
            )

    return resolved, metadata, optional
def _normalize_attribute_name(attribute: str) -> str:
    """Convert an attribute name by dropping one trailing "_" and using dashes.

    :param attribute: The name to convert.
    :return: The name with a single trailing underscore (if any) removed and
      every remaining underscore replaced by a dash.
    """
    without_suffix = attribute.removesuffix("_")
    return "-".join(without_suffix.split("_"))
@dataclasses.dataclass
class DetectedDebputyParseHint:
    """The parse hints detected among the annotations of a single attribute."""

    # Name of the attribute in the target format.
    target_attribute: str
    # Name of the attribute in the manifest; None when there is neither a
    # ManifestAttribute annotation nor a default attribute name.
    source_manifest_attribute: Optional[str]
    # Attributes named by a ConflictWithSourceAttribute annotation; empty if absent.
    conflict_with_source_attributes: FrozenSet[str]
    # The ConditionalRequired annotation, if one was found.
    conditional_required: Optional[ConditionalRequired]
    # False only when a NotPathHint annotation was found.
    applicable_as_path_hint: bool

    @classmethod
    def parse_annotations(
        cls,
        anno: Tuple[Any, ...],
        error_context: str,
        default_attribute_name: Optional[str],
        is_required: bool,
        default_target_attribute: Optional[str] = None,
        allow_target_attribute_annotation: bool = False,
        allow_source_attribute_annotations: bool = False,
    ) -> "DetectedDebputyParseHint":
        """Extract the debputy parse hints from a tuple of annotations.

        :param anno: The annotations to search.
        :param error_context: Text appended to error messages.
        :param default_attribute_name: Attribute name used as the target
          attribute when nothing else determines it; its normalized form is
          the source attribute name unless a ManifestAttribute annotation is
          present.
        :param is_required: Whether the attribute is required; a required
          attribute may not also carry a ConditionalRequired annotation.
        :param default_target_attribute: Target attribute to use when there is
          no TargetAttribute annotation.
        :param allow_target_attribute_annotation: Whether a TargetAttribute
          annotation is permitted.
        :param allow_source_attribute_annotations: Whether ManifestAttribute
          and ConflictWithSourceAttribute annotations are permitted.
        :return: The detected hints.
        :raises ValueError: If an annotation is used where it is not allowed,
          if no target attribute can be determined, or if the attribute is
          both required and conditionally required.
        """
        # Target attribute: explicit annotation first, then the two defaults.
        target_attr_anno = find_annotation(anno, TargetAttribute)
        if target_attr_anno:
            if not allow_target_attribute_annotation:
                raise ValueError(
                    f"The DebputyParseHint.target_attribute annotation is not allowed in this context.{error_context}"
                )
            target_attribute = target_attr_anno.attribute
        elif default_target_attribute is not None:
            target_attribute = default_target_attribute
        elif default_attribute_name is not None:
            target_attribute = default_attribute_name
        else:
            # Reaching this point means both defaults are None, so there is
            # nothing to derive the target attribute from.
            raise ValueError(
                "allow_target_attribute_annotation must be True OR "
                "default_attribute_name/default_target_attribute must be not None"
            )

        source_attribute_anno = find_annotation(anno, ManifestAttribute)
        _source_attribute_allowed(
            allow_source_attribute_annotations, error_context, source_attribute_anno
        )
        if source_attribute_anno:
            source_attribute_name = source_attribute_anno.attribute
        elif default_attribute_name is not None:
            source_attribute_name = _normalize_attribute_name(default_attribute_name)
        else:
            source_attribute_name = None

        mutual_exclusive_with_anno = find_annotation(anno, ConflictWithSourceAttribute)
        if mutual_exclusive_with_anno:
            _source_attribute_allowed(
                allow_source_attribute_annotations,
                error_context,
                mutual_exclusive_with_anno,
            )
            conflicting_attributes = mutual_exclusive_with_anno.conflicting_attributes
        else:
            conflicting_attributes = frozenset()

        conditional_required = find_annotation(anno, ConditionalRequired)
        if conditional_required and is_required:
            if default_attribute_name is None:
                raise ValueError(
                    "is_required cannot be True without default_attribute_name being not None"
                )
            raise ValueError(
                f'The attribute "{default_attribute_name}" is Required while also being conditionally required.'
                ' Please make the attribute "NotRequired" or remove the conditional requirement.'
            )

        not_path_hint_anno = find_annotation(anno, NotPathHint)
        applicable_as_path_hint = not_path_hint_anno is None

        return cls(
            target_attribute=target_attribute,
            source_manifest_attribute=source_attribute_name,
            conflict_with_source_attributes=conflicting_attributes,
            conditional_required=conditional_required,
            applicable_as_path_hint=applicable_as_path_hint,
        )
def _source_attribute_allowed(
    source_attribute_allowed: bool,
    error_context: str,
    annotation: Optional[DebputyParseHint],
) -> None:
    """Reject `annotation` when source attribute annotations are not permitted.

    :param source_attribute_allowed: Whether such annotations may be used.
    :param error_context: Text appended to the error message.
    :param annotation: The annotation that was found, or None if there was none.
    :raises ValueError: If an annotation is present although it is not allowed.
    """
    if not source_attribute_allowed and annotation is not None:
        raise ValueError(
            f'The annotation "{annotation}" cannot be used here. {error_context}'
        )