Coverage for src/debputy/manifest_parser/declarative_parser.py: 72%
800 statements
coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
1import collections
2import dataclasses
3import typing
4from types import UnionType
5from typing import (
6 Any,
7 Tuple,
8 TypedDict,
9 Dict,
10 get_type_hints,
11 Annotated,
12 get_args,
13 get_origin,
14 TypeVar,
15 Generic,
16 FrozenSet,
17 Optional,
18 cast,
19 Type,
20 Union,
21 List,
22 NotRequired,
23 Literal,
24 TYPE_CHECKING,
25)
26from collections.abc import Callable, Mapping, Collection, Iterable, Sequence, Container
29from debputy.manifest_parser.base_types import FileSystemMatchRule
30from debputy.manifest_parser.exceptions import (
31 ManifestParseException,
32)
33from debputy.manifest_parser.mapper_code import (
34 normalize_into_list,
35 wrap_into_list,
36 map_each_element,
37)
38from debputy.manifest_parser.parse_hints import (
39 ConditionalRequired,
40 DebputyParseHint,
41 TargetAttribute,
42 ManifestAttribute,
43 ConflictWithSourceAttribute,
44 NotPathHint,
45)
46from debputy.manifest_parser.parser_data import ParserContextData
47from debputy.manifest_parser.tagging_types import (
48 DebputyParsedContent,
49 DebputyDispatchableType,
50 TypeMapping,
51)
52from debputy.manifest_parser.util import (
53 AttributePath,
54 unpack_type,
55 find_annotation,
56 check_integration_mode,
57)
58from debputy.plugin.api.impl_types import (
59 DeclarativeInputParser,
60 TD,
61 ListWrappedDeclarativeInputParser,
62 DispatchingObjectParser,
63 DispatchingTableParser,
64 TTP,
65 TP,
66 InPackageContextParser,
67)
68from debputy.plugin.api.spec import (
69 ParserDocumentation,
70 DebputyIntegrationMode,
71 StandardParserAttributeDocumentation,
72 undocumented_attr,
73 ParserAttributeDocumentation,
74 reference_documentation,
75)
76from debputy.util import _info, _warn, assume_not_none
79if TYPE_CHECKING:
80 from debputy.lsp.diagnostics import LintSeverity
83try:
84 from Levenshtein import distance
85except ImportError:
86 _WARN_ONCE = False
88 def _detect_possible_typo(
89 _key: str,
90 _value: object,
91 _manifest_attributes: Mapping[str, "AttributeDescription"],
92 _path: "AttributePath",
93 ) -> None:
94 global _WARN_ONCE
95 if not _WARN_ONCE:
96 _WARN_ONCE = True
97 _info(
98 "Install python3-levenshtein to have debputy try to detect typos in the manifest."
99 )
101else:
103 def _detect_possible_typo(
104 key: str,
105 value: object,
106 manifest_attributes: Mapping[str, "AttributeDescription"],
107 path: "AttributePath",
108 ) -> None:
109 k_len = len(key)
110 key_path = path[key]
111 matches: list[str] = []
112 current_match_strength = 0
113 for acceptable_key, attr in manifest_attributes.items():
114 if abs(k_len - len(acceptable_key)) > 2:
115 continue
116 d = distance(key, acceptable_key)
117 if d > 2:
118 continue
119 try:
120 attr.type_validator.ensure_type(value, key_path)
121 except ManifestParseException:
122 if attr.type_validator.base_type_match(value):
123 match_strength = 1
124 else:
125 match_strength = 0
126 else:
127 match_strength = 2
129 if match_strength < current_match_strength:
130 continue
131 if match_strength > current_match_strength:
132 current_match_strength = match_strength
133 matches.clear()
134 matches.append(acceptable_key)
136 if not matches:
137 return
138 ref = f'at "{path.path}"' if path else "at the manifest root level"
139 if len(matches) == 1:
140 possible_match = repr(matches[0])
141 _warn(
142 f'Possible typo: The key "{key}" {ref} should probably have been {possible_match}'
143 )
144 else:
145 matches.sort()
146 possible_matches = ", ".join(repr(a) for a in matches)
147 _warn(
148 f'Possible typo: The key "{key}" {ref} should probably have been one of {possible_matches}'
149 )
152SF = TypeVar("SF")
153T = TypeVar("T")
154S = TypeVar("S")
157_NONE_TYPE = type(None)
160# These must be able to appear in an "isinstance" check and must be builtin types.
161BASIC_SIMPLE_TYPES = {
162 str: "string",
163 int: "integer",
164 bool: "boolean",
165}
168class AttributeTypeHandler:
169 __slots__ = ("_description", "_ensure_type", "base_type", "mapper")
171 def __init__(
172 self,
173 description: str,
174 ensure_type: Callable[[Any, AttributePath], None],
175 *,
176 base_type: type[Any] | None = None,
177 mapper: None | (
178 Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]
179 ) = None,
180 ) -> None:
181 self._description = description
182 self._ensure_type = ensure_type
183 self.base_type = base_type
184 self.mapper = mapper
186 def describe_type(self) -> str:
187 return self._description
189 def ensure_type(self, obj: object, path: AttributePath) -> None:
190 self._ensure_type(obj, path)
192 def base_type_match(self, obj: object) -> bool:
193 base_type = self.base_type
194 return base_type is not None and isinstance(obj, base_type)
196 def map_type(
197 self,
198 value: Any,
199 path: AttributePath,
200 parser_context: Optional["ParserContextData"],
201 ) -> Any:
202 mapper = self.mapper
203 if mapper is not None:
204 return mapper(value, path, parser_context)
205 return value
207 def combine_mapper(
208 self,
209 mapper: None | (
210 Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]
211 ),
212 ) -> "AttributeTypeHandler":
213 if mapper is None:
214 return self
215 if self.mapper is not None:
216 m = self.mapper
218 def _combined_mapper(
219 value: Any,
220 path: AttributePath,
221 parser_context: Optional["ParserContextData"],
222 ) -> Any:
223 return mapper(m(value, path, parser_context), path, parser_context)
225 else:
226 _combined_mapper = mapper
228 return AttributeTypeHandler(
229 self._description,
230 self._ensure_type,
231 base_type=self.base_type,
232 mapper=_combined_mapper,
233 )
236@dataclasses.dataclass(slots=True)
237class AttributeDescription:
238 source_attribute_name: str
239 target_attribute: str
240 attribute_type: Any
241 type_validator: AttributeTypeHandler
242 annotations: tuple[Any, ...]
243 conflicting_attributes: frozenset[str]
244 conditional_required: Optional["ConditionalRequired"]
245 parse_hints: Optional["DetectedDebputyParseHint"] = None
246 is_optional: bool = False
249def _extract_path_hint(v: Any, attribute_path: AttributePath) -> bool:
250 if attribute_path.path_hint is not None:  # coverage: 250 ↛ 251 (condition never true)
251 return True
252 if isinstance(v, str):
253 attribute_path.path_hint = v
254 return True
255 elif isinstance(v, list) and len(v) > 0 and isinstance(v[0], str):
256 attribute_path.path_hint = v[0]
257 return True
258 return False
261@dataclasses.dataclass(slots=True, frozen=True)
262class DeclarativeNonMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]):
263 alt_form_parser: AttributeDescription
264 inline_reference_documentation: ParserDocumentation | None = None
265 expected_debputy_integration_mode: Container[DebputyIntegrationMode] | None = None
267 def parse_input(
268 self,
269 value: object,
270 path: AttributePath,
271 *,
272 parser_context: Optional["ParserContextData"] = None,
273 ) -> TD:
274 check_integration_mode(
275 path,
276 parser_context,
277 self.expected_debputy_integration_mode,
278 )
279 if self.reference_documentation_url is not None:
280 doc_ref = f" (Documentation: {self.reference_documentation_url})"
281 else:
282 doc_ref = ""
284 alt_form_parser = self.alt_form_parser
285 if value is None:  # coverage: 285 ↛ 286 (condition never true)
286 form_note = f" The value must have type: {alt_form_parser.type_validator.describe_type()}"
287 if self.reference_documentation_url is not None:
288 doc_ref = f" Please see {self.reference_documentation_url} for the documentation."
289 raise ManifestParseException(
290 f"The attribute {path.path} was missing a value. {form_note}{doc_ref}"
291 )
292 _extract_path_hint(value, path)
293 alt_form_parser.type_validator.ensure_type(value, path)
294 attribute = alt_form_parser.target_attribute
295 alias_mapping = {
296 attribute: ("", None),
297 }
298 v = alt_form_parser.type_validator.map_type(value, path, parser_context)
299 path.alias_mapping = alias_mapping
300 return cast("TD", {attribute: v})
303@dataclasses.dataclass(slots=True)
304class DeclarativeMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]):
305 input_time_required_parameters: frozenset[str]
306 all_parameters: frozenset[str]
307 manifest_attributes: Mapping[str, "AttributeDescription"]
308 source_attributes: Mapping[str, "AttributeDescription"]
309 at_least_one_of: frozenset[frozenset[str]]
310 alt_form_parser: AttributeDescription | None
311 mutually_exclusive_attributes: frozenset[frozenset[str]] = frozenset()
312 _per_attribute_conflicts_cache: Mapping[str, frozenset[str]] | None = None
313 inline_reference_documentation: ParserDocumentation | None = None
314 path_hint_source_attributes: Sequence[str] = tuple()
315 expected_debputy_integration_mode: Container[DebputyIntegrationMode] | None = None
317 def _parse_alt_form(
318 self,
319 value: object,
320 path: AttributePath,
321 *,
322 parser_context: Optional["ParserContextData"] = None,
323 ) -> TD:
324 alt_form_parser = self.alt_form_parser
325 if alt_form_parser is None:  # coverage: 325 ↛ 326 (condition never true)
326 raise ManifestParseException(
327 f"The attribute {path.path} must be a mapping.{self._doc_url_error_suffix()}"
328 )
329 _extract_path_hint(value, path)
330 alt_form_parser.type_validator.ensure_type(value, path)
331 assert (
332 value is not None
333 ), "The alternative form was None, but the parser should have rejected None earlier."
334 attribute = alt_form_parser.target_attribute
335 alias_mapping = {
336 attribute: ("", None),
337 }
338 v = alt_form_parser.type_validator.map_type(value, path, parser_context)
339 path.alias_mapping = alias_mapping
340 return cast("TD", {attribute: v})
342 def _validate_expected_keys(
343 self,
344 value: dict[Any, Any],
345 path: AttributePath,
346 *,
347 parser_context: Optional["ParserContextData"] = None,
348 ) -> None:
349 unknown_keys = value.keys() - self.all_parameters
350 doc_ref = self._doc_url_error_suffix()
351 if unknown_keys:  # coverage: 351 ↛ 352 (condition never true)
352 for k in unknown_keys:
353 if isinstance(k, str):
354 _detect_possible_typo(k, value[k], self.manifest_attributes, path)
355 unused_keys = self.all_parameters - value.keys()
356 if unused_keys:
357 k = ", ".join(unused_keys)
358 raise ManifestParseException(
359 f'Unknown keys "{unknown_keys}" at {path.path_container_lc}. Keys that could be used here are: {k}.{doc_ref}'
360 )
361 raise ManifestParseException(
362 f'Unknown keys "{unknown_keys}" at {path.path_container_lc}. Please remove them.{doc_ref}'
363 )
364 missing_keys = self.input_time_required_parameters - value.keys()
365 if missing_keys:
366 required = ", ".join(repr(k) for k in sorted(missing_keys))
367 raise ManifestParseException(
368 f"The following keys were required but not present at {path.path_container_lc}: {required}{doc_ref}"
369 )
370 for maybe_required in self.all_parameters - value.keys():
371 attr = self.manifest_attributes[maybe_required]
372 assert attr.conditional_required is None or parser_context is not None
373 if (  # coverage: 373 ↛ 379 (condition never true)
374 attr.conditional_required is not None
375 and attr.conditional_required.condition_applies(
376 assume_not_none(parser_context)
377 )
378 ):
379 reason = attr.conditional_required.reason
380 raise ManifestParseException(
381 f'Missing the *conditionally* required attribute "{maybe_required}" at {path.path_container_lc}. {reason}{doc_ref}'
382 )
383 for keyset in self.at_least_one_of:
384 matched_keys = value.keys() & keyset
385 if not matched_keys:  # coverage: 385 ↛ 386 (condition never true)
386 conditionally_required = ", ".join(repr(k) for k in sorted(keyset))
387 raise ManifestParseException(
388 f"At least one of the following keys must be present at {path.path_container_lc}:"
389 f" {conditionally_required}{doc_ref}"
390 )
391 for group in self.mutually_exclusive_attributes:
392 matched = value.keys() & group
393 if len(matched) > 1:  # coverage: 393 ↛ 394 (condition never true)
394 ck = ", ".join(repr(k) for k in sorted(matched))
395 raise ManifestParseException(
396 f"Could not parse {path.path_container_lc}: The following attributes are"
397 f" mutually exclusive: {ck}{doc_ref}"
398 )
400 def _parse_typed_dict_form(
401 self,
402 value: dict[Any, Any],
403 path: AttributePath,
404 *,
405 parser_context: Optional["ParserContextData"] = None,
406 ) -> TD:
407 self._validate_expected_keys(value, path, parser_context=parser_context)
408 result = {}
409 per_attribute_conflicts = self._per_attribute_conflicts()
410 alias_mapping = {}
411 for path_hint_source_attributes in self.path_hint_source_attributes:
412 v = value.get(path_hint_source_attributes)
413 if v is not None and _extract_path_hint(v, path):
414 break
415 for k, v in value.items():
416 attr = self.manifest_attributes[k]
417 matched = value.keys() & per_attribute_conflicts[k]
418 if matched:  # coverage: 418 ↛ 419 (condition never true)
419 ck = ", ".join(repr(k) for k in sorted(matched))
420 raise ManifestParseException(
421 f'The attribute "{k}" at {path.path} cannot be used with the following'
422 f" attributes: {ck}{self._doc_url_error_suffix()}"
423 )
424 nk = attr.target_attribute
425 key_path = path[k]
426 attr.type_validator.ensure_type(v, key_path)
427 if v is None:  # coverage: 427 ↛ 428 (condition never true)
428 continue
429 if k != nk:
430 alias_mapping[nk] = k, None
431 v = attr.type_validator.map_type(v, key_path, parser_context)
432 result[nk] = v
433 if alias_mapping:
434 path.alias_mapping = alias_mapping
435 return cast("TD", result)
437 def _doc_url_error_suffix(self, *, see_url_version: bool = False) -> str:
438 doc_url = self.reference_documentation_url
439 if doc_url is not None:
440 if see_url_version:  # coverage: 440 ↛ 441 (condition never true)
441 return f" Please see {doc_url} for the documentation."
442 return f" (Documentation: {doc_url})"
443 return ""
445 def parse_input(
446 self,
447 value: object,
448 path: AttributePath,
449 *,
450 parser_context: Optional["ParserContextData"] = None,
451 ) -> TD:
452 check_integration_mode(
453 path,
454 parser_context,
455 self.expected_debputy_integration_mode,
456 )
457 if value is None:  # coverage: 457 ↛ 458 (condition never true)
458 form_note = " The attribute must be a mapping."
459 if self.alt_form_parser is not None:
460 form_note = (
461 " The attribute can be a mapping or a non-mapping format"
462 ' (usually, "non-mapping format" means a string or a list of strings).'
463 )
464 doc_ref = self._doc_url_error_suffix(see_url_version=True)
465 raise ManifestParseException(
466 f"The attribute {path.path} was missing a value. {form_note}{doc_ref}"
467 )
469 if not isinstance(value, dict):
470 return self._parse_alt_form(value, path, parser_context=parser_context)
471 return self._parse_typed_dict_form(value, path, parser_context=parser_context)
473 def _per_attribute_conflicts(self) -> Mapping[str, frozenset[str]]:
474 conflicts = self._per_attribute_conflicts_cache
475 if conflicts is not None:
476 return conflicts
477 attrs = self.source_attributes
478 conflicts = {
479 a.source_attribute_name: frozenset(
480 attrs[ca].source_attribute_name for ca in a.conflicting_attributes
481 )
482 for a in attrs.values()
483 }
484 self._per_attribute_conflicts_cache = conflicts
485 return self._per_attribute_conflicts_cache
488def _is_path_attribute_candidate(
489 source_attribute: AttributeDescription, target_attribute: AttributeDescription
490) -> bool:
491 if (
492 source_attribute.parse_hints
493 and not source_attribute.parse_hints.applicable_as_path_hint
494 ):
495 return False
496 target_type = target_attribute.attribute_type
497 _, origin, args = unpack_type(target_type, False)
498 match_type = target_type
499 if origin == list:
500 match_type = args[0]
501 return isinstance(match_type, type) and issubclass(match_type, FileSystemMatchRule)
504if typing.is_typeddict(DebputyParsedContent):  # coverage: 504 ↛ 508 (condition always true)
505 is_typeddict = typing.is_typeddict
506else:
508 def is_typeddict(t: Any) -> bool:
509 if typing.is_typeddict(t):
510 return True
511 return isinstance(t, type) and issubclass(t, DebputyParsedContent)
514class ParserGenerator:
515 def __init__(self) -> None:
516 self._registered_types: dict[Any, TypeMapping[Any, Any]] = {}
517 self._object_parsers: dict[str, DispatchingObjectParser] = {}
518 self._table_parsers: dict[
519 type[DebputyDispatchableType], DispatchingTableParser[Any]
520 ] = {}
521 self._in_package_context_parser: dict[str, Any] = {}
523 def register_mapped_type(self, mapped_type: TypeMapping[Any, Any]) -> None:
524 existing = self._registered_types.get(mapped_type.target_type)
525 if existing is not None:  # coverage: 525 ↛ 526 (condition never true)
526 raise ValueError(f"The type {existing} is already registered")
527 self._registered_types[mapped_type.target_type] = mapped_type
529 def get_mapped_type_from_target_type(
530 self,
531 mapped_type: type[T],
532 ) -> TypeMapping[Any, T] | None:
533 return self._registered_types.get(mapped_type)
535 def discard_mapped_type(self, mapped_type: type[T]) -> None:
536 del self._registered_types[mapped_type]
538 def add_table_parser(self, rt: type[DebputyDispatchableType], path: str) -> None:
539 assert rt not in self._table_parsers
540 self._table_parsers[rt] = DispatchingTableParser(rt, path)
542 def add_object_parser(
543 self,
544 path: str,
545 *,
546 parser_documentation: ParserDocumentation | None = None,
547 expected_debputy_integration_mode: None | (
548 Container[DebputyIntegrationMode]
549 ) = None,
550 unknown_keys_diagnostic_severity: Optional["LintSeverity"] = "error",
551 allow_unknown_keys: bool = False,
552 ) -> DispatchingObjectParser:
553 assert path not in self._in_package_context_parser
554 assert path not in self._object_parsers
555 object_parser = DispatchingObjectParser(
556 path,
557 parser_documentation=parser_documentation,
558 expected_debputy_integration_mode=expected_debputy_integration_mode,
559 unknown_keys_diagnostic_severity=unknown_keys_diagnostic_severity,
560 allow_unknown_keys=allow_unknown_keys,
561 )
562 self._object_parsers[path] = object_parser
563 return object_parser
565 def add_in_package_context_parser(
566 self,
567 path: str,
568 delegate: DeclarativeInputParser[Any],
569 ) -> None:
570 assert path not in self._in_package_context_parser
571 assert path not in self._object_parsers
572 self._in_package_context_parser[path] = InPackageContextParser(path, delegate)
574 @property
575 def dispatchable_table_parsers(
576 self,
577 ) -> Mapping[type[DebputyDispatchableType], DispatchingTableParser[Any]]:
578 return self._table_parsers
580 @property
581 def dispatchable_object_parsers(self) -> Mapping[str, DispatchingObjectParser]:
582 return self._object_parsers
584 def dispatch_parser_table_for(
585 self, rule_type: TTP
586 ) -> DispatchingTableParser[TP] | None:
587 return cast(
588 "Optional[DispatchingTableParser[TP]]", self._table_parsers.get(rule_type)
589 )
591 def generate_parser(
592 self,
593 parsed_content: type[TD],
594 *,
595 source_content: SF | None = None,
596 allow_optional: bool = False,
597 inline_reference_documentation: ParserDocumentation | None = None,
598 expected_debputy_integration_mode: None | (
599 Container[DebputyIntegrationMode]
600 ) = None,
601 automatic_docs: None | (
602 Mapping[type[Any], Sequence[StandardParserAttributeDocumentation]]
603 ) = None,
604 ) -> DeclarativeInputParser[TD]:
605 """Derive a parser from a TypedDict
607 Generates a parser for a segment of the manifest (think the `install-docs` snippet) from a TypedDict
608 or two that are used as a description.
610 In its most simple use-case, the caller provides a TypedDict of the expected attributes along with
611 their types. As an example:
613 >>> class InstallDocsRule(DebputyParsedContent):
614 ... sources: List[str]
615 ... into: List[str]
616 >>> pg = ParserGenerator()
617 >>> simple_parser = pg.generate_parser(InstallDocsRule)
619 This will create a parser that would be able to interpret something like:
621 ```yaml
622 install-docs:
623 sources: ["docs/*"]
624 into: ["my-pkg"]
625 ```
627 While this is sufficient for programmers, it is a bit rigid for the packager writing the manifest. Therefore,
628 you can also provide a TypedDict describing the input, enabling more flexibility:
630 >>> class InstallDocsRule(DebputyParsedContent):
631 ... sources: List[str]
632 ... into: List[str]
633 >>> class InputDocsRuleInputFormat(TypedDict):
634 ... source: NotRequired[Annotated[str, DebputyParseHint.target_attribute("sources")]]
635 ... sources: NotRequired[List[str]]
636 ... into: Union[str, List[str]]
637 >>> pg = ParserGenerator()
638 >>> flexible_parser = pg.generate_parser(
639 ... InstallDocsRule,
640 ... source_content=InputDocsRuleInputFormat,
641 ... )
643 In this case, the `sources` field can either come from a single `source` in the manifest (which must be a string)
644 or `sources` (which must be a list of strings). The parser also ensures that only one of `source` or `sources`
645 is used to ensure the input is not ambiguous. For the `into` parameter, the parser will accept it being a str
646 or a list of strings. Regardless of how the input was provided, the parser will normalize the input so that
647 both `sources` and `into` in the result are lists of strings. As an example, this parser can accept
648 not only the previous input but also the following input:
650 ```yaml
651 install-docs:
652 source: "docs/*"
653 into: "my-pkg"
654 ```
656 The `source` and `into` attributes are then normalized to lists as if the user had written them as lists
657 with a single string in them. As noted above, the name of the `source` attribute will also be normalized
658 while parsing.
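As a hedged sketch (the real call also needs an `AttributePath` and, optionally, a parser context, which
are omitted here), either snippet above would be normalized by `flexible_parser` into the same parsed content:

>>> # flexible_parser.parse_input(<snippet>, <attribute_path>) would return (roughly):
>>> # {"sources": ["docs/*"], "into": ["my-pkg"]}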
660 In the cases where only one field is required by the user, it can sometimes make sense to allow a non-dict
661 as part of the input. Example:
663 >>> class DiscardRule(DebputyParsedContent):
664 ... paths: List[str]
665 >>> class DiscardRuleInputDictFormat(TypedDict):
666 ... path: NotRequired[Annotated[str, DebputyParseHint.target_attribute("paths")]]
667 ... paths: NotRequired[List[str]]
668 >>> # This format relies on DiscardRule having exactly one Required attribute
669 >>> DiscardRuleInputWithAltFormat = Union[
670 ... DiscardRuleInputDictFormat,
671 ... str,
672 ... List[str],
673 ... ]
674 >>> pg = ParserGenerator()
675 >>> flexible_parser = pg.generate_parser(
676 ... DiscardRule,
677 ... source_content=DiscardRuleInputWithAltFormat,
678 ... )
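As a hedged illustration of this alternative form (again omitting the `AttributePath` plumbing), the parser
generated above would accept a plain string, a list of strings, or the mapping form, and normalize all of
them to the same result:

>>> # flexible_parser.parse_input("tmp/*", ...)              -> {"paths": ["tmp/*"]}  (roughly)
>>> # flexible_parser.parse_input(["tmp/*"], ...)            -> {"paths": ["tmp/*"]}
>>> # flexible_parser.parse_input({"paths": ["tmp/*"]}, ...) -> {"paths": ["tmp/*"]}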
681 Supported types:
682 * `List` - must have a fixed type argument (such as `List[str]`)
683 * `str`
684 * `int`
685 * `BinaryPackage` - When provided (or required), the user must provide a package name listed
686 in the debian/control file. The code receives the BinaryPackage instance
687 matching that input.
688 * `FileSystemMode` - When provided (or required), the user must provide a file system mode in any
689 format that `debputy` provides (such as `0644` or `a=rw,go=rw`).
690 * `FileSystemOwner` - When provided (or required), the user must provide a file system owner that is
691 available statically on all Debian systems (must be in `base-passwd`).
692 The user has multiple options for how to specify it (either via name or id).
693 * `FileSystemGroup` - When provided (or required), the user must provide a file system group that is
694 available statically on all Debian systems (must be in `base-passwd`).
695 The user has multiple options for how to specify it (either via name or id).
696 * `ManifestCondition` - When provided (or required), the user must specify a conditional rule to apply.
697 Usually, it is better to extend `DebputyParsedContentStandardConditional`, which
698 provides the `debputy` default `when` parameter for conditionals.
700 Supported special type-like parameters:
702 * `Required` / `NotRequired` to mark a field as `Required` or `NotRequired`. Must be provided at the
703 outermost level. Cannot vary between `parsed_content` and `source_content`.
704 * `Annotated`. Accepted at the outermost level (inside Required/NotRequired) but ignored at the moment.
705 * `Union`. Must be at the outermost level (inside `Annotated` and/or `Required`/`NotRequired` if these are present).
706 Automapping (see below) is restricted to two members in the Union.
708 Notable non-supported types:
709 * `Mapping` and all variants thereof (such as `dict`). In the future, nested `TypedDict`s may be allowed.
710 * `Optional` (or `Union[..., None]`): Use `NotRequired` for optional fields.
712 Automatic mapping rules from `source_content` to `parsed_content`:
713 - `Union[T, List[T]]` can be narrowed automatically to `List[T]`. Transformation is basically:
714 `lambda value: value if isinstance(value, list) else [value]`
715 - `T` can be mapped automatically to `List[T]`, the transformation being: `lambda value: [value]`
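As a runnable sketch of the first rule (this is exactly the transformation quoted above, applied by hand):

>>> narrow = lambda value: value if isinstance(value, list) else [value]
>>> narrow("docs/*")
['docs/*']
>>> narrow(["docs/*"])
['docs/*']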
717 Additionally, types can be annotated (`Annotated[str, ...]`) with `DebputyParseHint`s. Check its classmethod
718 for concrete features that may be useful to you.
720 :param parsed_content: A DebputyParsedContent / TypedDict describing the desired model of the input once parsed.
721 (DebputyParsedContent is a TypedDict subclass that works around some inadequate type checkers).
722 It can also be a `List[DebputyParsedContent]`. In that case, `source_content` must be a
723 `List[TypedDict[...]]`.
724 :param source_content: Optionally, a TypedDict describing the input allowed by the user. This can be useful
725 to describe more variations than in `parsed_content` that the parser will normalize for you. If omitted,
726 the parsed_content is also considered the source_content (which affects what annotations are allowed in it).
727 Note you should never pass the parsed_content as source_content directly.
728 :param allow_optional: In rare cases, you want to support explicitly provided vs. optional. In this case, you
729 should set this to True. Though, in 99.9% of all cases, you want `NotRequired` rather than `Optional` (and
730 can keep this False).
731 :param inline_reference_documentation: Optionally, programmatic reference documentation for the generated parser.
732 :param expected_debputy_integration_mode: If provided, this declares the integration modes where the
733 result of the parser can be used. This is primarily useful for "fail-fast" on incorrect usage.
734 When the restriction is not satisfiable, the generated parser will trigger a parse error immediately
735 (resulting in a "compile time" failure rather than a "runtime" failure).
736 :return: An input parser capable of reading input matching the TypedDict(s) used as reference.
737 """
738 orig_parsed_content = parsed_content
739 if source_content is parsed_content:  # coverage: 739 ↛ 740 (condition never true)
740 raise ValueError(
741 "Do not provide source_content if it is the same as parsed_content"
742 )
743 is_list_wrapped = False
744 if get_origin(orig_parsed_content) == list:
745 parsed_content = get_args(orig_parsed_content)[0]
746 is_list_wrapped = True
748 if isinstance(parsed_content, type) and issubclass(
749 parsed_content, DebputyDispatchableType
750 ):
751 parser = self.dispatch_parser_table_for(parsed_content)
752 if parser is None:  # coverage: 752 ↛ 753 (condition never true)
753 raise ValueError(
754 f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}."
755 f" The class {parsed_content.__qualname__} is not a pre-registered type."
756 )
757 # FIXME: Only the list wrapped version has documentation.
758 if is_list_wrapped:  # coverage: 758 ↛ 764 (condition always true)
759 parser = ListWrappedDeclarativeInputParser(
760 parser,
761 inline_reference_documentation=inline_reference_documentation,
762 expected_debputy_integration_mode=expected_debputy_integration_mode,
763 )
764 return parser
766 if not is_typeddict(parsed_content):  # coverage: 766 ↛ 767 (condition never true)
767 raise ValueError(
768 f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}."
769 ' Only "TypedDict"-based types and a subset of "DebputyDispatchableType" are supported.'
770 )
771 if is_list_wrapped and source_content is not None:
772 if get_origin(source_content) != list:  # coverage: 772 ↛ 773 (condition never true)
773 raise ValueError(
774 "If the parsed_content is a List type, then source_format must be a List type as well."
775 )
776 source_content = get_args(source_content)[0]
778 target_attributes = self._parse_types(
779 parsed_content,
780 allow_source_attribute_annotations=source_content is None,
781 forbid_optional=not allow_optional,
782 )
783 required_target_parameters = frozenset(parsed_content.__required_keys__)
784 parsed_alt_form = None
785 non_mapping_source_only = False
787 if source_content is not None:
788 default_target_attribute = None
789 if len(required_target_parameters) == 1:
790 default_target_attribute = next(iter(required_target_parameters))
792 source_typed_dict, alt_source_forms = _extract_typed_dict(
793 source_content,
794 default_target_attribute,
795 )
796 if alt_source_forms:
797 parsed_alt_form = self._parse_alt_form(
798 alt_source_forms,
799 default_target_attribute,
800 )
801 if source_typed_dict is not None:
802 source_content_attributes = self._parse_types(
803 source_typed_dict,
804 allow_target_attribute_annotation=True,
805 allow_source_attribute_annotations=True,
806 forbid_optional=not allow_optional,
807 )
808 source_content_parameter = "source_content"
809 source_and_parsed_differs = True
810 else:
811 source_typed_dict = parsed_content
812 source_content_attributes = target_attributes
813 source_content_parameter = "parsed_content"
814 source_and_parsed_differs = True
815 non_mapping_source_only = True
816 else:
817 source_typed_dict = parsed_content
818 source_content_attributes = target_attributes
819 source_content_parameter = "parsed_content"
820 source_and_parsed_differs = False
822 sources = collections.defaultdict(set)
823 seen_targets = set()
824 seen_source_names: dict[str, str] = {}
825 source_attributes: dict[str, AttributeDescription] = {}
826 path_hint_source_attributes = []
828 for k in source_content_attributes:
829 ia = source_content_attributes[k]
831 ta = (
832 target_attributes.get(ia.target_attribute)
833 if source_and_parsed_differs
834 else ia
835 )
836 if ta is None:  # coverage: 836 ↛ 838 (condition never true)
837 # Error message would be wrong if this assertion is false.
838 assert source_and_parsed_differs
839 raise ValueError(
840 f'The attribute "{k}" from the "source_content" parameter should have mapped'
841 f' to "{ia.target_attribute}", but that parameter does not exist in "parsed_content"'
842 )
843 if _is_path_attribute_candidate(ia, ta):
844 path_hint_source_attributes.append(ia.source_attribute_name)
845 existing_source_name = seen_source_names.get(ia.source_attribute_name)
846 if existing_source_name:  # coverage: 846 ↛ 847 (condition never true)
847 raise ValueError(
848 f'The attribute "{k}" and "{existing_source_name}" both share the source name'
849 f' "{ia.source_attribute_name}". Please change the {source_content_parameter} parameter,'
850 f' so only one attribute use "{ia.source_attribute_name}".'
851 )
852 seen_source_names[ia.source_attribute_name] = k
853 seen_targets.add(ta.target_attribute)
854 sources[ia.target_attribute].add(k)
855 if source_and_parsed_differs:
856 bridge_mapper = self._type_normalize(
857 k, ia.attribute_type, ta.attribute_type, False
858 )
859 ia.type_validator = ia.type_validator.combine_mapper(bridge_mapper)
860 source_attributes[k] = ia
862 def _as_attr_names(td_name: Iterable[str]) -> frozenset[str]:
863 return frozenset(
864 source_content_attributes[a].source_attribute_name for a in td_name
865 )
867 _check_attributes(
868 parsed_content,
869 source_typed_dict,
870 source_content_attributes,
871 sources,
872 )
874 at_least_one_of = frozenset(
875 _as_attr_names(g)
876 for k, g in sources.items()
877 if len(g) > 1 and k in required_target_parameters
878 )
880 if source_and_parsed_differs and seen_targets != target_attributes.keys():  # coverage: 880 ↛ 881 (condition never true)
881 missing = ", ".join(
882 repr(k) for k in (target_attributes.keys() - seen_targets)
883 )
884 raise ValueError(
885 'The following attributes in "parsed_content" did not have a source field in "source_content":'
886 f" {missing}"
887 )
888 all_mutually_exclusive_fields = frozenset(
889 _as_attr_names(g) for g in sources.values() if len(g) > 1
890 )
892 all_parameters = (
893 source_typed_dict.__required_keys__ | source_typed_dict.__optional_keys__
894 )
895 _check_conflicts(
896 source_content_attributes,
897 source_typed_dict.__required_keys__,
898 all_parameters,
899 )
901 manifest_attributes = {
902 a.source_attribute_name: a for a in source_content_attributes.values()
903 }
905 if parsed_alt_form is not None:
906 target_attribute = parsed_alt_form.target_attribute
907 if (  # coverage: 907 ↛ 912 (condition never true)
908 target_attribute not in required_target_parameters
909 and required_target_parameters
910 or len(required_target_parameters) > 1
911 ):
912 raise NotImplementedError(
913 "When using alternative source formats (Union[TypedDict, ...]), then the"
914 " target must have at most one require parameter"
915 )
916 bridge_mapper = self._type_normalize(
917 target_attribute,
918 parsed_alt_form.attribute_type,
919 target_attributes[target_attribute].attribute_type,
920 False,
921 )
922 parsed_alt_form.type_validator = (
923 parsed_alt_form.type_validator.combine_mapper(bridge_mapper)
924 )
926 inline_reference_documentation = (
927 _verify_and_auto_correct_inline_reference_documentation(
928 parsed_content,
929 source_typed_dict,
930 source_content_attributes,
931 inline_reference_documentation,
932 parsed_alt_form is not None,
933 automatic_docs,
934 )
935 )
936 if non_mapping_source_only:
937 parser = DeclarativeNonMappingInputParser(
938 assume_not_none(parsed_alt_form),
939 inline_reference_documentation=inline_reference_documentation,
940 expected_debputy_integration_mode=expected_debputy_integration_mode,
941 )
942 else:
943 parser = DeclarativeMappingInputParser(
944 _as_attr_names(source_typed_dict.__required_keys__),
945 _as_attr_names(all_parameters),
946 manifest_attributes,
947 source_attributes,
948 mutually_exclusive_attributes=all_mutually_exclusive_fields,
949 alt_form_parser=parsed_alt_form,
950 at_least_one_of=at_least_one_of,
951 inline_reference_documentation=inline_reference_documentation,
952 path_hint_source_attributes=tuple(path_hint_source_attributes),
953 expected_debputy_integration_mode=expected_debputy_integration_mode,
954 )
955 if is_list_wrapped:
956 parser = ListWrappedDeclarativeInputParser(
957 parser,
958 expected_debputy_integration_mode=expected_debputy_integration_mode,
959 )
960 return parser
962 def _as_type_validator(
963 self,
964 attribute: str,
965 provided_type: Any,
966 parsing_typed_dict_attribute: bool,
967 ) -> AttributeTypeHandler:
968 assert not isinstance(provided_type, tuple)
970 if isinstance(provided_type, type) and issubclass(
971 provided_type, DebputyDispatchableType
972 ):
973 return _dispatch_parser(provided_type)
975 unmapped_type = self._strip_mapped_types(
976 provided_type,
977 parsing_typed_dict_attribute,
978 )
979 type_normalizer = self._type_normalize(
980 attribute,
981 unmapped_type,
982 provided_type,
983 parsing_typed_dict_attribute,
984 )
985 t_unmapped, t_unmapped_orig, t_unmapped_args = unpack_type(
986 unmapped_type,
987 parsing_typed_dict_attribute,
988 )
989 _, t_provided_orig, t_provided_args = unpack_type(
990 provided_type,
991 parsing_typed_dict_attribute,
992 )
994 if (  # coverage: 994 ↛ 1000 (condition never true)
995 t_unmapped_orig == Union
996 and t_unmapped_args
997 and len(t_unmapped_args) == 2
998 and any(v is _NONE_TYPE for v in t_unmapped_args)
999 ):
1000 _, _, args = unpack_type(provided_type, parsing_typed_dict_attribute)
1001 actual_type = [a for a in args if a is not _NONE_TYPE][0]
1002 validator = self._as_type_validator(
1003 attribute, actual_type, parsing_typed_dict_attribute
1004 )
1006 def _validator(v: Any, path: AttributePath) -> None:
1007 if v is None:
1008 return
1009 validator.ensure_type(v, path)
1011 return AttributeTypeHandler(
1012 validator.describe_type(),
1013 _validator,
1014 base_type=validator.base_type,
1015 mapper=type_normalizer,
1016 )
1018 if unmapped_type in BASIC_SIMPLE_TYPES:
1019 type_name = BASIC_SIMPLE_TYPES[unmapped_type]
1021 type_mapping = self._registered_types.get(provided_type)
1022 if type_mapping is not None:
1023 simple_type = f" ({type_name})"
1024 type_name = type_mapping.target_type.__name__
1025 else:
1026 simple_type = ""
1028 def _validator(v: Any, path: AttributePath) -> None:
1029 if not isinstance(v, unmapped_type):
1030 _validation_type_error(
1031 path, f"The attribute must be a {type_name}{simple_type}"
1032 )
1034 return AttributeTypeHandler(
1035 type_name,
1036 _validator,
1037 base_type=unmapped_type,
1038 mapper=type_normalizer,
1039 )
1040 if t_unmapped_orig == list:
1041 if not t_unmapped_args:  # coverage: 1041 ↛ 1042 (condition never true)
1042 raise ValueError(
1043 f'The attribute "{attribute}" is List but does not have Generics (Must use List[X])'
1044 )
1046 generic_type = t_unmapped_args[0]
1047 key_mapper = self._as_type_validator(
1048 attribute,
1049 generic_type,
1050 parsing_typed_dict_attribute,
1051 )
1053 def _validator(v: Any, path: AttributePath) -> None:
1054 if not isinstance(v, list):  # coverage: 1054 ↛ 1055 (condition never true)
1055 _validation_type_error(path, "The attribute must be a list")
1056 for i, v in enumerate(v):
1057 key_mapper.ensure_type(v, path[i])
1059 list_mapper = (
1060 map_each_element(key_mapper.mapper)
1061 if key_mapper.mapper is not None
1062 else None
1063 )
1065 return AttributeTypeHandler(
1066 f"List of {key_mapper.describe_type()}",
1067 _validator,
1068 base_type=list,
1069 mapper=type_normalizer,
1070 ).combine_mapper(list_mapper)
1071 if is_typeddict(provided_type):
1072 subparser = self.generate_parser(cast("Type[TD]", provided_type))
1073 return AttributeTypeHandler(
1074 description=f"{provided_type.__name__} (Typed Mapping)",
1075 ensure_type=lambda v, ap: None,
1076 base_type=dict,
1077 mapper=lambda v, ap, cv: subparser.parse_input(
1078 v, ap, parser_context=cv
1079 ),
1080 )
1081 if t_unmapped_orig == dict:
1082 if not t_unmapped_args or len(t_unmapped_args) != 2:  # coverage: 1082 ↛ 1083 (condition never true)
1083 raise ValueError(
1084 f'The attribute "{attribute}" is Dict but does not have Generics (Must use Dict[str, Y])'
1085 )
1086 if t_unmapped_args[0] != str:  # coverage: 1086 ↛ 1087 (condition never true)
1087 raise ValueError(
1088 f'The attribute "{attribute}" is Dict and has a non-str type as key.'
1089 " Currently, only `str` is supported (Dict[str, Y])"
1090 )
1091 key_mapper = self._as_type_validator(
1092 attribute,
1093 t_unmapped_args[0],
1094 parsing_typed_dict_attribute,
1095 )
1096 value_mapper = self._as_type_validator(
1097 attribute,
1098 t_unmapped_args[1],
1099 parsing_typed_dict_attribute,
1100 )
1102 if key_mapper.base_type is None:  # coverage: 1102 ↛ 1103 (condition never true)
1103 raise ValueError(
1104 f'The attribute "{attribute}" is Dict and the key did not have a trivial base type. Key types'
1105 f" without trivial base types (such as `str`) are not supported at the moment."
1106 )
1108 if value_mapper.mapper is not None:  # coverage: 1108 ↛ 1109 (condition never true)
1109 raise ValueError(
1110 f'The attribute "{attribute}" is Dict and the value requires mapping.'
1111 " Currently, this is not supported. Consider a simpler type (such as Dict[str, str] or Dict[str, Any])."
1112 " Better typing may come later"
1113 )
1115 def _validator(uv: Any, path: AttributePath) -> None:
1116 if not isinstance(uv, dict):  # coverage: 1116 ↛ 1117 (condition never true)
1117 _validation_type_error(path, "The attribute must be a mapping")
1118 key_name = "the first key in the mapping"
1119 for i, (k, v) in enumerate(uv.items()):
1120 if not key_mapper.base_type_match(k):  # coverage: 1120 ↛ 1121 (condition never true)
1121 kp = path.copy_with_path_hint(key_name)
1122 _validation_type_error(
1123 kp,
1124 f'The key number {i + 1} in attribute "{kp}" must be a {key_mapper.describe_type()}',
1125 )
1126 key_name = f"the key after {k}"
1127 value_mapper.ensure_type(v, path[k])
1129 return AttributeTypeHandler(
1130 f"Mapping of {value_mapper.describe_type()}",
1131 _validator,
1132 base_type=dict,
1133 mapper=type_normalizer,
1134 ).combine_mapper(key_mapper.mapper)
1135 if t_unmapped_orig in (Union, UnionType):
1136 if _is_two_arg_x_list_x(t_provided_args):
1137 # Force the order to be "X, List[X]" as it simplifies the code
1138 x_list_x = (
1139 t_provided_args
1140 if get_origin(t_provided_args[1]) == list
1141 else (t_provided_args[1], t_provided_args[0])
1142 )
1144 # X, List[X] could match if X was List[Y]. However, our code below assumes
1145 # that X is a non-list. The `_is_two_arg_x_list_x` returns False for this
1146 # case to avoid this assert and fall into the "generic case".
1147 assert get_origin(x_list_x[0]) != list
1148 x_subtype_checker = self._as_type_validator(
1149 attribute,
1150 x_list_x[0],
1151 parsing_typed_dict_attribute,
1152 )
1153 list_x_subtype_checker = self._as_type_validator(
1154 attribute,
1155 x_list_x[1],
1156 parsing_typed_dict_attribute,
1157 )
1158 type_description = x_subtype_checker.describe_type()
1159 type_description = f"{type_description} or a list of {type_description}"
1161 def _validator(v: Any, path: AttributePath) -> None:
1162 if isinstance(v, list):
1163 list_x_subtype_checker.ensure_type(v, path)
1164 else:
1165 x_subtype_checker.ensure_type(v, path)
1167 return AttributeTypeHandler(
1168 type_description,
1169 _validator,
1170 mapper=type_normalizer,
1171 )
1172 else:
1173 subtype_checker = [
1174 self._as_type_validator(attribute, a, parsing_typed_dict_attribute)
1175 for a in t_unmapped_args
1176 ]
1177 type_description = "one-of: " + ", ".join(
1178 f"{sc.describe_type()}" for sc in subtype_checker
1179 )
1180 mapper = subtype_checker[0].mapper
1181 if any(mapper != sc.mapper for sc in subtype_checker):  # coverage: 1181 ↛ 1182 (condition never true)
1182 raise ValueError(
1183 f'Cannot handle the union "{provided_type}" as the target types need different'
1184 " type normalization/mapping logic. Unions are generally limited to Union[X, List[X]]"
1185 " where X is a non-collection type."
1186 )
1188 def _validator(v: Any, path: AttributePath) -> None:
1189 partial_matches = []
1190 for sc in subtype_checker:  # coverage: 1190 ↛ 1198 (loop never ran to completion)
1191 try:
1192 sc.ensure_type(v, path)
1193 return
1194 except ManifestParseException as e:
1195 if sc.base_type_match(v):  # coverage: 1195 ↛ 1196 (condition never true)
1196 partial_matches.append((sc, e))
1198 if len(partial_matches) == 1:
1199 raise partial_matches[0][1]
1200 _validation_type_error(
1201 path, f"Could not match against: {type_description}"
1202 )
1204 return AttributeTypeHandler(
1205 type_description,
1206 _validator,
1207 mapper=type_normalizer,
1208 )
1209 if t_unmapped_orig == Literal:
1210 # Quote string values as `x`; repr would give 'x'
1211 pretty = ", ".join(
1212 f"`{v}`" if isinstance(v, str) else str(v) for v in t_unmapped_args
1213 )
1215 def _validator(v: Any, path: AttributePath) -> None:
1216 if v not in t_unmapped_args:
1217 value_hint = ""
1218 if isinstance(v, str):  # coverage: 1218 ↛ 1220 (condition always true)
1219 value_hint = f"({v}) "
1220 _validation_type_error(
1221 path,
1222 f"Value {value_hint}must be one of the following literal values: {pretty}",
1223 )
1225 return AttributeTypeHandler(
1226 f"One of the following literal values: {pretty}",
1227 _validator,
1228 )
1230 if provided_type == Any:  # coverage: 1230 ↛ 1235 (condition always true)
1231 return AttributeTypeHandler(
1232 "any (unvalidated)",
1233 lambda *a: None,
1234 )
1235 raise ValueError(
1236 f'The attribute "{attribute}" had/contained a type {provided_type}, which is not supported'
1237 )
1239 def _parse_types(
1240 self,
1241 spec: type[TypedDict],
1242 allow_target_attribute_annotation: bool = False,
1243 allow_source_attribute_annotations: bool = False,
1244 forbid_optional: bool = True,
1245 ) -> dict[str, AttributeDescription]:
1246 annotations = get_type_hints(spec, include_extras=True)
1247 return {
1248 k: self._attribute_description(
1249 k,
1250 t,
1251 k in spec.__required_keys__,
1252 allow_target_attribute_annotation=allow_target_attribute_annotation,
1253 allow_source_attribute_annotations=allow_source_attribute_annotations,
1254 forbid_optional=forbid_optional,
1255 )
1256 for k, t in annotations.items()
1257 }
1259 def _attribute_description(
1260 self,
1261 attribute: str,
1262 orig_td: Any,
1263 is_required: bool,
1264 forbid_optional: bool = True,
1265 allow_target_attribute_annotation: bool = False,
1266 allow_source_attribute_annotations: bool = False,
1267 ) -> AttributeDescription:
1268 td, anno, is_optional = _parse_type(
1269 attribute, orig_td, forbid_optional=forbid_optional
1270 )
1271 type_validator = self._as_type_validator(attribute, td, True)
1272 parsed_annotations = DetectedDebputyParseHint.parse_annotations(
1273 anno,
1274 f' Seen with attribute "{attribute}".',
1275 attribute,
1276 is_required,
1277 allow_target_attribute_annotation=allow_target_attribute_annotation,
1278 allow_source_attribute_annotations=allow_source_attribute_annotations,
1279 )
1280 return AttributeDescription(
1281 target_attribute=parsed_annotations.target_attribute,
1282 attribute_type=td,
1283 type_validator=type_validator,
1284 annotations=anno,
1285 is_optional=is_optional,
1286 conflicting_attributes=parsed_annotations.conflict_with_source_attributes,
1287 conditional_required=parsed_annotations.conditional_required,
1288 source_attribute_name=assume_not_none(
1289 parsed_annotations.source_manifest_attribute
1290 ),
1291 parse_hints=parsed_annotations,
1292 )
1294 def _parse_alt_form(
1295 self,
1296 alt_form,
1297 default_target_attribute: str | None,
1298 ) -> AttributeDescription:
1299 td, anno, is_optional = _parse_type(
1300 "source_format alternative form",
1301 alt_form,
1302 forbid_optional=True,
1303 parsing_typed_dict_attribute=False,
1304 )
1305 type_validator = self._as_type_validator(
1306 "source_format alternative form",
1307 td,
1308 True,
1309 )
1310 parsed_annotations = DetectedDebputyParseHint.parse_annotations(
1311 anno,
1312 " The alternative for source_format.",
1313 None,
1314 False,
1315 default_target_attribute=default_target_attribute,
1316 allow_target_attribute_annotation=True,
1317 allow_source_attribute_annotations=False,
1318 )
1319 return AttributeDescription(
1320 target_attribute=parsed_annotations.target_attribute,
1321 attribute_type=td,
1322 type_validator=type_validator,
1323 annotations=anno,
1324 is_optional=is_optional,
1325 conflicting_attributes=parsed_annotations.conflict_with_source_attributes,
1326 conditional_required=parsed_annotations.conditional_required,
1327 source_attribute_name="Alt form of the source_format",
1328 )
1330 def _union_narrowing(
1331 self,
1332 input_type: Any,
1333 target_type: Any,
1334 parsing_typed_dict_attribute: bool,
1335 ) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any] | None:
1336 _, input_orig, input_args = unpack_type(
1337 input_type, parsing_typed_dict_attribute
1338 )
1339 _, target_orig, target_args = unpack_type(
1340 target_type, parsing_typed_dict_attribute
1341 )
1343 if input_orig not in (Union, UnionType) or not input_args:  # coverage: 1343 ↛ 1344 (condition never true)
1344 raise ValueError("input_type must be a Union[...] with non-empty args")
1346 # Currently, we only support Union[X, List[X]] -> List[Y] narrowing or Union[X, List[X]] -> Union[Y, List[Y]]
1347 # - Where X = Y or there is a simple standard transformation from X to Y.
1349 if target_orig not in (Union, UnionType, list) or not target_args:
1350 # Not supported
1351 return None
1353 if target_orig in (Union, UnionType) and set(input_args) == set(target_args):  # coverage: 1353 ↛ 1355 (condition never true)
1354 # Not needed (identity mapping)
1355 return None
1357 if target_orig == list and not any(get_origin(a) == list for a in input_args):  # coverage: 1357 ↛ 1359 (condition never true)
1358 # Not supported
1359 return None
1361 target_arg = target_args[0]
1362 simplified_type = self._strip_mapped_types(
1363 target_arg, parsing_typed_dict_attribute
1364 )
1365 acceptable_types = {
1366 target_arg,
1367 list[target_arg], # type: ignore
1368 List[target_arg], # type: ignore
1369 simplified_type,
1370 list[simplified_type], # type: ignore
1371 List[simplified_type], # type: ignore
1372 }
1373 target_format = (
1374 target_arg,
1375 list[target_arg], # type: ignore
1376 List[target_arg], # type: ignore
1377 )
1378 in_target_format = 0
1379 in_simple_format = 0
1380 for input_arg in input_args:
1381 if input_arg not in acceptable_types:  # coverage: 1381 ↛ 1383 (condition never true)
1382 # Not supported
1383 return None
1384 if input_arg in target_format:
1385 in_target_format += 1
1386 else:
1387 in_simple_format += 1
1389 assert in_simple_format or in_target_format
1391 if in_target_format and not in_simple_format:
1392 # Union[X, List[X]] -> List[X]
1393 return normalize_into_list
1394 mapped = self._registered_types[target_arg]
1395 if not in_target_format and in_simple_format:  # coverage: 1395 ↛ 1410 (condition always true)
1396 # Union[X, List[X]] -> List[Y]
1398 def _mapper_x_list_y(
1399 x: Any | list[Any],
1400 ap: AttributePath,
1401 pc: Optional["ParserContextData"],
1402 ) -> list[Any]:
1403 in_list_form: list[Any] = normalize_into_list(x, ap, pc)
1405 return [mapped.mapper(x, ap, pc) for x in in_list_form]
1407 return _mapper_x_list_y
1409 # Union[Y, List[X]] -> List[Y]
1410 if not isinstance(target_arg, type):
1411 raise ValueError(
1412 f"Cannot narrow {input_type} -> {target_type}: The automatic conversion does"
1413 f" not support mixed types. Please use either {simplified_type} or {target_arg}"
1414 f" in the source content (but both a mix of both)"
1415 )
1417 def _mapper_mixed_list_y(
1418 x: Any | list[Any],
1419 ap: AttributePath,
1420 pc: Optional["ParserContextData"],
1421 ) -> list[Any]:
1422 in_list_form: list[Any] = normalize_into_list(x, ap, pc)
1424 return [
1425 x if isinstance(x, target_arg) else mapped.mapper(x, ap, pc)
1426 for x in in_list_form
1427 ]
1429 return _mapper_mixed_list_y
1431 def _type_normalize(
1432 self,
1433 attribute: str,
1434 input_type: Any,
1435 target_type: Any,
1436 parsing_typed_dict_attribute: bool,
1437 ) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any] | None:
1438 if input_type == target_type:
1439 return None
1440 _, input_orig, input_args = unpack_type(
1441 input_type, parsing_typed_dict_attribute
1442 )
1443 _, target_orig, target_args = unpack_type(
1444 target_type,
1445 parsing_typed_dict_attribute,
1446 )
1447 if input_orig in (Union, UnionType):
1448 result = self._union_narrowing(
1449 input_type, target_type, parsing_typed_dict_attribute
1450 )
1451 if result:
1452 return result
1453 elif target_orig == list and target_args[0] == input_type:
1454 return wrap_into_list
1456 mapped = self._registered_types.get(target_type)
1457 if mapped is not None and input_type == mapped.source_type:
1458 # Source -> Target
1459 return mapped.mapper
1460 if target_orig == list and target_args:  # coverage: 1460 ↛ 1478 (condition always true)
1461 mapped = self._registered_types.get(target_args[0])
1462 if mapped is not None:  # coverage: 1462 ↛ 1478 (condition always true)
1463 # mypy is dense and forgot `mapped` cannot be optional in the comprehensions.
1464 mapped_type: TypeMapping = mapped
1465 if input_type == mapped.source_type:  # coverage: 1465 ↛ 1467 (condition never true)
1466 # Source -> List[Target]
1467 return lambda x, ap, pc: [mapped_type.mapper(x, ap, pc)]
1468 if (  # coverage: 1468 ↛ 1478 (condition always true)
1469 input_orig == list
1470 and input_args
1471 and input_args[0] == mapped_type.source_type
1472 ):
1473 # List[Source] -> List[Target]
1474 return lambda xs, ap, pc: [
1475 mapped_type.mapper(x, ap, pc) for x in xs
1476 ]
1478 raise ValueError(
1479 f'Unsupported type normalization for "{attribute}": Cannot automatically map/narrow'
1480 f" {input_type} to {target_type}"
1481 )
1483 def _strip_mapped_types(
1484 self, orig_td: Any, parsing_typed_dict_attribute: bool
1485 ) -> Any:
1486 m = self._registered_types.get(orig_td)
1487 if m is not None:
1488 return m.source_type
1489 _, v, args = unpack_type(orig_td, parsing_typed_dict_attribute)
1490 if v == list:
1491 arg = args[0]
1492 m = self._registered_types.get(arg)
1493 if m:
1494 return list[m.source_type] # type: ignore
1495 if v in (Union, UnionType):
1496 stripped_args = tuple(
1497 self._strip_mapped_types(x, parsing_typed_dict_attribute) for x in args
1498 )
1499 if stripped_args != args:
1500 return Union[stripped_args]
1501 return orig_td
1504def _sort_key(attr: StandardParserAttributeDocumentation) -> Any:
1505 key = next(iter(attr.attributes))
1506 return attr.sort_category, key
1509def _apply_std_docs(
1510 std_doc_table: (
1511 Mapping[type[Any], Sequence[StandardParserAttributeDocumentation]] | None
1512 ),
1513 source_format_typed_dict: type[Any],
1514 attribute_docs: Sequence[ParserAttributeDocumentation] | None,
1515) -> Sequence[ParserAttributeDocumentation] | None:
1516 if std_doc_table is None or not std_doc_table:  # coverage: 1516 ↛ 1519 (condition always true)
1517 return attribute_docs
1519 has_docs_for = set()
1520 if attribute_docs:
1521 for attribute_doc in attribute_docs:
1522 has_docs_for.update(attribute_doc.attributes)
1524 base_seen = set()
1525 std_docs_used = []
1527 remaining_bases = set(getattr(source_format_typed_dict, "__orig_bases__", []))
1528 base_seen.update(remaining_bases)
1529 while remaining_bases:
1530 base = remaining_bases.pop()
1531 new_bases_to_check = {
1532 x for x in getattr(base, "__orig_bases__", []) if x not in base_seen
1533 }
1534 remaining_bases.update(new_bases_to_check)
1535 base_seen.update(new_bases_to_check)
1536 std_docs = std_doc_table.get(base)
1537 if std_docs:
1538 for std_doc in std_docs:
1539 if any(a in has_docs_for for a in std_doc.attributes):
1540 # If there is any overlap, do not add the docs
1541 continue
1542 has_docs_for.update(std_doc.attributes)
1543 std_docs_used.append(std_doc)
1545 if not std_docs_used:
1546 return attribute_docs
1547 docs = sorted(std_docs_used, key=_sort_key)
1548 if attribute_docs:
1549 # Plugin provided attributes first
1550 c = list(attribute_docs)
1551 c.extend(docs)
1552 docs = c
1553 return tuple(docs)
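# Illustrative sketch (hypothetical class names): _apply_std_docs() walks the
# original bases of the source TypedDict so documentation registered for a
# shared base is inherited automatically:
#
#     class CommonFormat(TypedDict):           # std docs registered for this base
#         when: NotRequired[str]
#
#     class MyRuleSourceFormat(CommonFormat):  # a plugin's own source format
#         sources: list[str]
#
# The standard docs for "when" are appended after the plugin-provided docs,
# unless the plugin already documented "when" itself (overlapping standard
# docs are skipped).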
1556def _verify_and_auto_correct_inline_reference_documentation(
1557 parsed_content: type[TD],
1558 source_typed_dict: type[Any],
1559 source_content_attributes: Mapping[str, AttributeDescription],
1560 inline_reference_documentation: ParserDocumentation | None,
1561 has_alt_form: bool,
1562 automatic_docs: (
1563 Mapping[type[Any], Sequence[StandardParserAttributeDocumentation]] | None
1564 ) = None,
1565) -> ParserDocumentation | None:
1566 orig_attribute_docs = (
1567 inline_reference_documentation.attribute_doc
1568 if inline_reference_documentation
1569 else None
1570 )
1571 attribute_docs = _apply_std_docs(
1572 automatic_docs,
1573 source_typed_dict,
1574 orig_attribute_docs,
1575 )
1576 if inline_reference_documentation is None and attribute_docs is None:
1577 return None
1578 changes = {}
1579 if attribute_docs:
1580 seen = set()
1581 had_any_custom_docs = False
1582 for attr_doc in attribute_docs:
1583 if not isinstance(attr_doc, StandardParserAttributeDocumentation):
1584 had_any_custom_docs = True
1585 for attr_name in attr_doc.attributes:
1586 attr = source_content_attributes.get(attr_name)
1587 if attr is None:  # 1587 ↛ 1588: condition was never true
1588 raise ValueError(
1589 f"The inline_reference_documentation for the source format of {parsed_content.__qualname__}"
1590 f' references an attribute "{attr_name}", which does not exist in the source format.'
1591 )
1592 if attr_name in seen:  # 1592 ↛ 1593: condition was never true
1593 raise ValueError(
1594 f"The inline_reference_documentation for the source format of {parsed_content.__qualname__}"
1595 f' has documentation for "{attr_name}" twice, which is not supported.'
1596 f" Please document it at most once"
1597 )
1598 seen.add(attr_name)
1599 undocumented = source_content_attributes.keys() - seen
1600 if undocumented:  # 1600 ↛ 1601: condition was never true
1601 if had_any_custom_docs:
1602 undocumented_attrs = ", ".join(undocumented)
1603 raise ValueError(
1604 f"The following attributes were not documented for the source format of"
1605 f" {parsed_content.__qualname__}. If this is deliberate, then please"
1606 ' declare each of them as undocumented (via undocumented_attr("foo")):'
1607 f" {undocumented_attrs}"
1608 )
1609 combined_docs = list(attribute_docs)
1610 combined_docs.extend(undocumented_attr(a) for a in sorted(undocumented))
1611 attribute_docs = combined_docs
1613 if attribute_docs and orig_attribute_docs != attribute_docs:  # 1613 ↛ 1614: condition was never true
1614 assert attribute_docs is not None
1615 changes["attribute_doc"] = tuple(attribute_docs)
1617 if (  # 1617 ↛ 1622: condition was never true
1618 inline_reference_documentation is not None
1619 and inline_reference_documentation.alt_parser_description
1620 and not has_alt_form
1621 ):
1622 raise ValueError(
1623 "The inline_reference_documentation had documentation for an non-mapping format,"
1624 " but the source format does not have a non-mapping format."
1625 )
1626 if changes:  # 1626 ↛ 1627: condition was never true
1627 if inline_reference_documentation is None:
1628 inline_reference_documentation = reference_documentation()
1629 return inline_reference_documentation.replace(**changes)
1630 return inline_reference_documentation
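# Summary (not part of the original module): a plugin gets a hard error for
# documenting an unknown or duplicated attribute, and for leaving attributes
# undocumented once it has provided custom documentation of its own.  When
# only standard (inherited) docs are present, missing attributes are instead
# auto-filled via undocumented_attr(), keeping the reference docs complete.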
1633def _check_conflicts(
1634 input_content_attributes: dict[str, AttributeDescription],
1635 required_attributes: frozenset[str],
1636 all_attributes: frozenset[str],
1637) -> None:
1638 for attr_name, attr in input_content_attributes.items():
1639 if attr_name in required_attributes and attr.conflicting_attributes:  # 1639 ↛ 1640: condition was never true
1640 c = ", ".join(repr(a) for a in attr.conflicting_attributes)
1641 raise ValueError(
1642 f'The attribute "{attr_name}" is required and conflicts with the attributes: {c}.'
1643 " This makes it impossible to use these attributes. Either remove the attributes"
1644 f' (along with the conflicts for them), adjust the conflicts or make "{attr_name}"'
1645 " optional (NotRequired)"
1646 )
1647 else:
1648 required_conflicts = attr.conflicting_attributes & required_attributes
1649 if required_conflicts:  # 1649 ↛ 1650: condition was never true
1650 c = ", ".join(repr(a) for a in required_conflicts)
1651 raise ValueError(
1652 f'The attribute "{attr_name}" conflicts with the following *required* attributes: {c}.'
1653 f' This makes it impossible to use the "{attr_name}" attribute. Either remove it,'
1654 f" adjust the conflicts or make the listed attributes optional (NotRequired)"
1655 )
1656 unknown_attributes = attr.conflicting_attributes - all_attributes
1657 if unknown_attributes:  # 1657 ↛ 1658: condition was never true
1658 c = ", ".join(repr(a) for a in unknown_attributes)
1659 raise ValueError(
1660 f'The attribute "{attr_name}" declares a conflict with the following unknown attributes: {c}.'
1661 f" None of these attributes were declared in the input."
1662 )
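# Illustrative sketch (hypothetical attributes): _check_conflicts() rejects
# source formats that can never be satisfied, e.g. a *required* attribute that
# carries a ConflictWithSourceAttribute hint against an optional sibling:
#
#     class SourceFormat(TypedDict):
#         source: str                       # required, declares conflict with "sources"
#         sources: NotRequired[list[str]]
#
# With that declaration "sources" could never legally be used, so the parser
# generator raises a ValueError when the parser is built rather than emitting
# confusing errors at manifest parse time.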
1665def _check_attributes(
1666 content: type[TypedDict],
1667 input_content: type[TypedDict],
1668 input_content_attributes: dict[str, AttributeDescription],
1669 sources: Mapping[str, Collection[str]],
1670) -> None:
1671 target_required_keys = content.__required_keys__
1672 input_required_keys = input_content.__required_keys__
1673 all_input_keys = input_required_keys | input_content.__optional_keys__
1675 for input_name in all_input_keys:
1676 attr = input_content_attributes[input_name]
1677 target_name = attr.target_attribute
1678 source_names = sources[target_name]
1679 input_is_required = input_name in input_required_keys
1680 target_is_required = target_name in target_required_keys
1682 assert source_names
1684 if input_is_required and len(source_names) > 1:  # 1684 ↛ 1685: condition was never true
1685 raise ValueError(
1686 f'The source attribute "{input_name}" is required, but it maps to "{target_name}",'
1687 f' which has multiple sources "{source_names}". If "{input_name}" should be required,'
1688 f' then there is no need for additional sources for "{target_name}". Alternatively,'
1689 f' "{input_name}" might be missing a NotRequired type'
1690 f' (example: "{input_name}: NotRequired[<OriginalTypeHere>]")'
1691 )
1692 if not input_is_required and target_is_required and len(source_names) == 1:  # 1692 ↛ 1693: condition was never true
1693 raise ValueError(
1694 f'The source attribute "{input_name}" is not marked as required and maps to'
1695 f' "{target_name}", which is marked as required. As there are no other attributes'
1696 f' mapping to "{target_name}", then "{input_name}" must be required as well'
1697 f' ("{input_name}: Required[<Type>]"). Alternatively, "{target_name}" should be optional'
1698 f' ("{target_name}: NotRequired[<Type>]") or an "MappingHint.aliasOf" might be missing.'
1699 )
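# Illustrative sketch (hypothetical attributes): _check_attributes() catches
# required/optional mismatches between the source and target TypedDicts:
#
#     class TargetFormat(TypedDict):
#         sources: list[str]                # required in the target
#
#     class SourceFormat(TypedDict):
#         sources: NotRequired[list[str]]   # but optional in the (sole) source
#
# Nothing else can populate "sources", so the generator raises; the fix is to
# make the source attribute required or the target attribute NotRequired.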
1702def _validation_type_error(path: AttributePath, message: str) -> None:
1703 raise ManifestParseException(
1704 f'The attribute "{path.path}" did not have a valid structure/type: {message}'
1705 )
1708def _is_two_arg_x_list_x(t_args: tuple[Any, ...]) -> bool:
1709 if len(t_args) != 2:
1710 return False
1711 lhs, rhs = t_args
1712 if get_origin(lhs) == list:
1713 if get_origin(rhs) == list:  # 1713 ↛ 1716: condition was never true
1714 # It could still match X, List[X] - but we do not allow this case for now as the caller
1715 # does not support it.
1716 return False
1717 l_args = get_args(lhs)
1718 return bool(l_args and l_args[0] == rhs)
1719 if get_origin(rhs) == list:
1720 r_args = get_args(rhs)
1721 return bool(r_args and r_args[0] == lhs)
1722 return False
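# Illustrative doctest (not part of the original module): the helper
# recognizes the "X or list of X" source-format idiom:
#
#     >>> _is_two_arg_x_list_x(get_args(Union[str, list[str]]))
#     True
#     >>> _is_two_arg_x_list_x(get_args(Union[list[str], str]))
#     True
#     >>> _is_two_arg_x_list_x(get_args(Union[str, list[int]]))
#     False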
1725def _extract_typed_dict(
1726 base_type,
1727 default_target_attribute: str | None,
1728) -> tuple[type[TypedDict] | None, Any]:
1729 if is_typeddict(base_type):
1730 return base_type, None
1731 _, origin, args = unpack_type(base_type, False)
1732 if origin != Union:
1733 if isinstance(base_type, type) and issubclass(base_type, (dict, Mapping)):  # 1733 ↛ 1734: condition was never true
1734 raise ValueError(
1735 "The source_format cannot be nor contain a (non-TypedDict) dict"
1736 )
1737 return None, base_type
1738 typed_dicts = [x for x in args if is_typeddict(x)]
1739 if len(typed_dicts) > 1:  # 1739 ↛ 1740: condition was never true
1740 raise ValueError(
1741 "When source_format is a Union, it must contain at most one TypedDict"
1742 )
1743 typed_dict = typed_dicts[0] if typed_dicts else None
1745 if any(x is None or x is _NONE_TYPE for x in args):  # 1745 ↛ 1746: condition was never true
1746 raise ValueError(
1747 "The source_format cannot be nor contain Optional[X] or Union[X, None]"
1748 )
1750 if any(  # 1750 ↛ 1755: condition was never true
1751 isinstance(x, type) and issubclass(x, (dict, Mapping))
1752 for x in args
1753 if x is not typed_dict
1754 ):
1755 raise ValueError(
1756 "The source_format cannot be nor contain a (non-TypedDict) dict"
1757 )
1758 remaining = [x for x in args if x is not typed_dict]
1759 has_target_attribute = False
1760 anno = None
1761 if len(remaining) == 1:  # 1761 ↛ 1762: condition was never true
1762 base_type, anno, _ = _parse_type(
1763 "source_format alternative form",
1764 remaining[0],
1765 forbid_optional=True,
1766 parsing_typed_dict_attribute=False,
1767 )
1768 has_target_attribute = bool(anno) and any(
1769 isinstance(x, TargetAttribute) for x in anno
1770 )
1771 target_type = base_type
1772 else:
1773 target_type = Union[tuple(remaining)]
1775 if default_target_attribute is None and not has_target_attribute:  # 1775 ↛ 1776: condition was never true
1776 raise ValueError(
1777 'The alternative format must be Union[TypedDict, Annotated[X, DebputyParseHint.target_attribute("...")]]'
1778 " OR the parsed_content format must have exactly one attribute that is required."
1779 )
1780 if anno:  # 1780 ↛ 1781: condition was never true
1781 final_anno = [target_type]
1782 final_anno.extend(anno)
1783 return typed_dict, Annotated[tuple(final_anno)]
1784 return typed_dict, target_type
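# Illustrative sketch (hypothetical TypedDict name): _extract_typed_dict()
# splits a source_format into the mapping form (the TypedDict) and an optional
# alternative form, e.g.:
#
#     SourceFormat = Union[
#         InstallRuleSourceFormat,        # the TypedDict (mapping form)
#         Annotated[
#             str,
#             DebputyParseHint.target_attribute("sources"),
#         ],                              # shorthand scalar form
#     ]
#
# Plain (non-TypedDict) dicts, Optional/None members and Unions with more than
# one TypedDict are rejected by the ValueErrors above.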
1787def _dispatch_parse_generator(
1788 dispatch_type: type[DebputyDispatchableType],
1789) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]:
1790 def _dispatch_parse(
1791 value: Any,
1792 attribute_path: AttributePath,
1793 parser_context: Optional["ParserContextData"],
1794 ):
1795 assert parser_context is not None
1796 dispatching_parser = parser_context.dispatch_parser_table_for(dispatch_type)
1797 return dispatching_parser.parse_input(
1798 value, attribute_path, parser_context=parser_context
1799 )
1801 return _dispatch_parse
1804def _dispatch_parser(
1805 dispatch_type: type[DebputyDispatchableType],
1806) -> AttributeTypeHandler:
1807 return AttributeTypeHandler(
1808 dispatch_type.__name__,
1809 lambda *a: None,
1810 mapper=_dispatch_parse_generator(dispatch_type),
1811 )
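# Note (not part of the original module): for attributes typed as a
# DebputyDispatchableType subclass, no up-front type validation is done (the
# validator is a no-op lambda); the mapper instead defers to the dispatching
# parser table provided by the ParserContextData at parse time, so plugins can
# register sub-parsers for the type after this parser has been generated.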
1814def _parse_type(
1815 attribute: str,
1816 orig_td: Any,
1817 forbid_optional: bool = True,
1818 parsing_typed_dict_attribute: bool = True,
1819) -> tuple[Any, tuple[Any, ...], bool]:
1820 td, v, args = unpack_type(orig_td, parsing_typed_dict_attribute)
1821 md: tuple[Any, ...] = tuple()
1822 optional = False
1823 if v is not None:
1824 if v == Annotated:
1825 anno = get_args(td)
1826 md = anno[1:]
1827 td, v, args = unpack_type(anno[0], parsing_typed_dict_attribute)
1829 if td is _NONE_TYPE:  # 1829 ↛ 1830: condition was never true
1830 raise ValueError(
1831 f'The attribute "{attribute}" resolved to type "None". "Nil" / "None" fields are not allowed in the'
1832 " debputy manifest, so this attribute does not make sense in its current form."
1833 )
1834 if forbid_optional and v == Union and any(a is _NONE_TYPE for a in args):  # 1834 ↛ 1835: condition was never true
1835 raise ValueError(
1836 f'Detected use of Optional in "{attribute}", which is not allowed here.'
1837 " Please use NotRequired for optional fields"
1838 )
1840 return td, md, optional
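# Note (not part of the original module): _parse_type() unwraps Annotated[...]
# metadata and enforces two manifest rules: an attribute may not resolve to
# None, and Optional[X] / X | None is rejected.  Optional manifest attributes
# must be expressed with NotRequired instead:
#
#     sources: NotRequired[list[str]]   # accepted
#     sources: Optional[list[str]]      # rejected by _parse_type()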
1843def _normalize_attribute_name(attribute: str) -> str:
1844 if attribute.endswith("_"):
1845 attribute = attribute[:-1]
1846 return attribute.replace("_", "-")
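# Illustrative doctest (not part of the original module): TypedDict keys use
# Python naming, manifest keys use dashes; a trailing underscore (used to
# dodge Python keywords) is dropped before the rewrite:
#
#     >>> _normalize_attribute_name("sources")
#     'sources'
#     >>> _normalize_attribute_name("as_")
#     'as'
#     >>> _normalize_attribute_name("install_as_")
#     'install-as'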
1849@dataclasses.dataclass
1850class DetectedDebputyParseHint:
1851 target_attribute: str
1852 source_manifest_attribute: str | None
1853 conflict_with_source_attributes: frozenset[str]
1854 conditional_required: ConditionalRequired | None
1855 applicable_as_path_hint: bool
1857 @classmethod
1858 def parse_annotations(
1859 cls,
1860 anno: tuple[Any, ...],
1861 error_context: str,
1862 default_attribute_name: str | None,
1863 is_required: bool,
1864 default_target_attribute: str | None = None,
1865 allow_target_attribute_annotation: bool = False,
1866 allow_source_attribute_annotations: bool = False,
1867 ) -> "DetectedDebputyParseHint":
1868 target_attr_anno = find_annotation(anno, TargetAttribute)
1869 if target_attr_anno:
1870 if not allow_target_attribute_annotation:  # 1870 ↛ 1871: condition was never true
1871 raise ValueError(
1872 f"The DebputyParseHint.target_attribute annotation is not allowed in this context.{error_context}"
1873 )
1874 target_attribute = target_attr_anno.attribute
1875 elif default_target_attribute is not None:
1876 target_attribute = default_target_attribute
1877 elif default_attribute_name is not None:  # 1877 ↛ 1880: condition was always true
1878 target_attribute = default_attribute_name
1879 else:
1880 if default_attribute_name is None:
1881 raise ValueError(
1882 "allow_target_attribute_annotation must be True OR "
1883 "default_attribute_name/default_target_attribute must be not None"
1884 )
1885 raise ValueError(
1886 f"Missing DebputyParseHint.target_attribute annotation.{error_context}"
1887 )
1888 source_attribute_anno = find_annotation(anno, ManifestAttribute)
1889 _source_attribute_allowed(
1890 allow_source_attribute_annotations, error_context, source_attribute_anno
1891 )
1892 if source_attribute_anno:
1893 source_attribute_name = source_attribute_anno.attribute
1894 elif default_attribute_name is not None:
1895 source_attribute_name = _normalize_attribute_name(default_attribute_name)
1896 else:
1897 source_attribute_name = None
1898 mutual_exclusive_with_anno = find_annotation(anno, ConflictWithSourceAttribute)
1899 if mutual_exclusive_with_anno:
1900 _source_attribute_allowed(
1901 allow_source_attribute_annotations,
1902 error_context,
1903 mutual_exclusive_with_anno,
1904 )
1905 conflicting_attributes = mutual_exclusive_with_anno.conflicting_attributes
1906 else:
1907 conflicting_attributes = frozenset()
1908 conditional_required = find_annotation(anno, ConditionalRequired)
1910 if conditional_required and is_required:  # 1910 ↛ 1911: condition was never true
1911 if default_attribute_name is None:
1912 raise ValueError(
1913 f"is_required cannot be True without default_attribute_name being not None"
1914 )
1915 raise ValueError(
1916 f'The attribute "{default_attribute_name}" is Required while also being conditionally required.'
1917 ' Please make the attribute "NotRequired" or remove the conditional requirement.'
1918 )
1920 not_path_hint_anno = find_annotation(anno, NotPathHint)
1921 applicable_as_path_hint = not_path_hint_anno is None
1923 return DetectedDebputyParseHint(
1924 target_attribute=target_attribute,
1925 source_manifest_attribute=source_attribute_name,
1926 conflict_with_source_attributes=conflicting_attributes,
1927 conditional_required=conditional_required,
1928 applicable_as_path_hint=applicable_as_path_hint,
1929 )
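# Note (not part of the original module): resolution order for the target
# attribute in parse_annotations():
#
#     1. an explicit DebputyParseHint.target_attribute(...) annotation
#        (only where allow_target_attribute_annotation is True),
#     2. default_target_attribute (e.g. derived from the alternative form),
#     3. the attribute's own name.
#
# The manifest-facing name similarly prefers an explicit ManifestAttribute
# hint and otherwise falls back to _normalize_attribute_name() of the
# attribute name.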
1932def _source_attribute_allowed(
1933 source_attribute_allowed: bool,
1934 error_context: str,
1935 annotation: DebputyParseHint | None,
1936) -> None:
1937 if source_attribute_allowed or annotation is None:  # 1937 ↛ 1939: condition was always true
1938 return
1939 raise ValueError(
1940 f'The annotation "{annotation}" cannot be used here. {error_context}'
1941 )