Coverage for src/debputy/manifest_parser/declarative_parser.py: 72%
797 statements
« prev ^ index » next coverage.py v7.8.2, created at 2026-02-14 10:41 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2026-02-14 10:41 +0000
1import collections
2import dataclasses
3import typing
4from types import UnionType
5from typing import (
6 Any,
7 TypedDict,
8 get_type_hints,
9 Annotated,
10 get_args,
11 get_origin,
12 TypeVar,
13 Generic,
14 Optional,
15 cast,
16 Type,
17 Union,
18 List,
19 NotRequired,
20 Literal,
21 TYPE_CHECKING,
22)
23from collections.abc import Callable, Mapping, Collection, Iterable, Sequence, Container
26from debputy.manifest_parser.base_types import FileSystemMatchRule
27from debputy.manifest_parser.exceptions import (
28 ManifestParseException,
29)
30from debputy.manifest_parser.mapper_code import (
31 normalize_into_list,
32 wrap_into_list,
33 map_each_element,
34)
35from debputy.manifest_parser.parse_hints import (
36 ConditionalRequired,
37 DebputyParseHint,
38 TargetAttribute,
39 ManifestAttribute,
40 ConflictWithSourceAttribute,
41 NotPathHint,
42)
43from debputy.manifest_parser.parser_data import ParserContextData
44from debputy.manifest_parser.tagging_types import (
45 DebputyParsedContent,
46 DebputyDispatchableType,
47 TypeMapping,
48)
49from debputy.manifest_parser.util import (
50 AttributePath,
51 unpack_type,
52 find_annotation,
53 check_integration_mode,
54)
55from debputy.plugin.api.impl_types import (
56 DeclarativeInputParser,
57 TD,
58 ListWrappedDeclarativeInputParser,
59 DispatchingObjectParser,
60 DispatchingTableParser,
61 TTP,
62 TP,
63 InPackageContextParser,
64)
65from debputy.plugin.api.spec import (
66 ParserDocumentation,
67 DebputyIntegrationMode,
68 StandardParserAttributeDocumentation,
69 undocumented_attr,
70 ParserAttributeDocumentation,
71 reference_documentation,
72)
73from debputy.util import _info, _warn, assume_not_none
76if TYPE_CHECKING:
77 from debputy.lsp.diagnostics import LintSeverity
80try:
81 from Levenshtein import distance
83 _WARN_ONCE: bool | None = None
84except ImportError:
85 _WARN_ONCE = False
88def _detect_possible_typo(
89 key: str,
90 value: object,
91 manifest_attributes: Mapping[str, "AttributeDescription"],
92 path: "AttributePath",
93) -> None:
94 global _WARN_ONCE
95 if _WARN_ONCE == False:
96 _WARN_ONCE = True
97 _info(
98 "Install python3-levenshtein to have debputy try to detect typos in the manifest."
99 )
100 elif _WARN_ONCE is None:
101 k_len = len(key)
102 key_path = path[key]
103 matches: list[str] = []
104 current_match_strength = 0
105 for acceptable_key, attr in manifest_attributes.items():
106 if abs(k_len - len(acceptable_key)) > 2:
107 continue
108 d = distance(key, acceptable_key)
109 if d > 2:
110 continue
111 try:
112 attr.type_validator.ensure_type(value, key_path)
113 except ManifestParseException:
114 if attr.type_validator.base_type_match(value):
115 match_strength = 1
116 else:
117 match_strength = 0
118 else:
119 match_strength = 2
121 if match_strength < current_match_strength:
122 continue
123 if match_strength > current_match_strength:
124 current_match_strength = match_strength
125 matches.clear()
126 matches.append(acceptable_key)
128 if not matches:
129 return
130 ref = f'at "{path.path}"' if path else "at the manifest root level"
131 if len(matches) == 1:
132 possible_match = repr(matches[0])
133 _warn(
134 f'Possible typo: The key "{key}" {ref} should probably have been {possible_match}'
135 )
136 else:
137 matches.sort()
138 possible_matches = ", ".join(repr(a) for a in matches)
139 _warn(
140 f'Possible typo: The key "{key}" {ref} should probably have been one of {possible_matches}'
141 )
144SF = TypeVar("SF")
145T = TypeVar("T")
146S = TypeVar("S")
149_NONE_TYPE = type(None)
152# These must be able to appear in an "isinstance" check and must be builtin types.
153BASIC_SIMPLE_TYPES = {
154 str: "string",
155 int: "integer",
156 bool: "boolean",
157}
160class AttributeTypeHandler:
161 __slots__ = ("_description", "_ensure_type", "base_type", "mapper")
163 def __init__(
164 self,
165 description: str,
166 ensure_type: Callable[[Any, AttributePath], None],
167 *,
168 base_type: type[Any] | None = None,
169 mapper: None | (
170 Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]
171 ) = None,
172 ) -> None:
173 self._description = description
174 self._ensure_type = ensure_type
175 self.base_type = base_type
176 self.mapper = mapper
178 def describe_type(self) -> str:
179 return self._description
181 def ensure_type(self, obj: object, path: AttributePath) -> None:
182 self._ensure_type(obj, path)
184 def base_type_match(self, obj: object) -> bool:
185 base_type = self.base_type
186 return base_type is not None and isinstance(obj, base_type)
188 def map_type(
189 self,
190 value: Any,
191 path: AttributePath,
192 parser_context: Optional["ParserContextData"],
193 ) -> Any:
194 mapper = self.mapper
195 if mapper is not None:
196 return mapper(value, path, parser_context)
197 return value
199 def combine_mapper(
200 self,
201 mapper: None | (
202 Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]
203 ),
204 ) -> "AttributeTypeHandler":
205 if mapper is None:
206 return self
207 _combined_mapper: Callable[
208 [Any, AttributePath, Optional["ParserContextData"]], Any
209 ]
210 if self.mapper is not None:
211 m = self.mapper
213 def _combined_mapper(
214 value: Any,
215 path: AttributePath,
216 parser_context: Optional["ParserContextData"],
217 ) -> Any:
218 return mapper(m(value, path, parser_context), path, parser_context)
220 else:
221 _combined_mapper = mapper
223 return AttributeTypeHandler(
224 self._description,
225 self._ensure_type,
226 base_type=self.base_type,
227 mapper=_combined_mapper,
228 )
231@dataclasses.dataclass(slots=True)
232class AttributeDescription:
233 source_attribute_name: str
234 target_attribute: str
235 attribute_type: Any
236 type_validator: AttributeTypeHandler
237 annotations: tuple[Any, ...]
238 conflicting_attributes: frozenset[str]
239 conditional_required: Optional["ConditionalRequired"]
240 parse_hints: Optional["DetectedDebputyParseHint"] = None
241 is_optional: bool = False
244def _extract_path_hint(v: Any, attribute_path: AttributePath) -> bool:
245 if attribute_path.path_hint is not None: 245 ↛ 246line 245 didn't jump to line 246 because the condition on line 245 was never true
246 return True
247 if isinstance(v, str):
248 attribute_path.path_hint = v
249 return True
250 elif isinstance(v, list) and len(v) > 0 and isinstance(v[0], str):
251 attribute_path.path_hint = v[0]
252 return True
253 return False
256@dataclasses.dataclass(slots=True, frozen=True)
257class DeclarativeNonMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]):
258 alt_form_parser: AttributeDescription
259 inline_reference_documentation: ParserDocumentation | None = None
260 expected_debputy_integration_mode: Container[DebputyIntegrationMode] | None = None
262 def parse_input(
263 self,
264 value: object,
265 path: AttributePath,
266 *,
267 parser_context: Optional["ParserContextData"] = None,
268 ) -> TD:
269 check_integration_mode(
270 path,
271 parser_context,
272 self.expected_debputy_integration_mode,
273 )
274 if self.reference_documentation_url is not None:
275 doc_ref = f" (Documentation: {self.reference_documentation_url})"
276 else:
277 doc_ref = ""
279 alt_form_parser = self.alt_form_parser
280 if value is None: 280 ↛ 281line 280 didn't jump to line 281 because the condition on line 280 was never true
281 form_note = f" The value must have type: {alt_form_parser.type_validator.describe_type()}"
282 if self.reference_documentation_url is not None:
283 doc_ref = f" Please see {self.reference_documentation_url} for the documentation."
284 raise ManifestParseException(
285 f"The attribute {path.path} was missing a value. {form_note}{doc_ref}"
286 )
287 _extract_path_hint(value, path)
288 alt_form_parser.type_validator.ensure_type(value, path)
289 attribute = alt_form_parser.target_attribute
290 alias_mapping = {
291 attribute: ("", None),
292 }
293 v = alt_form_parser.type_validator.map_type(value, path, parser_context)
294 path.alias_mapping = alias_mapping
295 return cast("TD", {attribute: v})
298@dataclasses.dataclass(slots=True)
299class DeclarativeMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]):
300 input_time_required_parameters: frozenset[str]
301 all_parameters: frozenset[str]
302 manifest_attributes: Mapping[str, "AttributeDescription"]
303 source_attributes: Mapping[str, "AttributeDescription"]
304 at_least_one_of: frozenset[frozenset[str]]
305 alt_form_parser: AttributeDescription | None
306 mutually_exclusive_attributes: frozenset[frozenset[str]] = frozenset()
307 _per_attribute_conflicts_cache: Mapping[str, frozenset[str]] | None = None
308 inline_reference_documentation: ParserDocumentation | None = None
309 path_hint_source_attributes: Sequence[str] = tuple()
310 expected_debputy_integration_mode: Container[DebputyIntegrationMode] | None = None
312 def _parse_alt_form(
313 self,
314 value: object,
315 path: AttributePath,
316 *,
317 parser_context: Optional["ParserContextData"] = None,
318 ) -> TD:
319 alt_form_parser = self.alt_form_parser
320 if alt_form_parser is None: 320 ↛ 321line 320 didn't jump to line 321 because the condition on line 320 was never true
321 raise ManifestParseException(
322 f"The attribute {path.path} must be a mapping.{self._doc_url_error_suffix()}"
323 )
324 _extract_path_hint(value, path)
325 alt_form_parser.type_validator.ensure_type(value, path)
326 assert (
327 value is not None
328 ), "The alternative form was None, but the parser should have rejected None earlier."
329 attribute = alt_form_parser.target_attribute
330 alias_mapping = {
331 attribute: ("", None),
332 }
333 v = alt_form_parser.type_validator.map_type(value, path, parser_context)
334 path.alias_mapping = alias_mapping
335 return cast("TD", {attribute: v})
337 def _validate_expected_keys(
338 self,
339 value: dict[Any, Any],
340 path: AttributePath,
341 *,
342 parser_context: Optional["ParserContextData"] = None,
343 ) -> None:
344 unknown_keys = value.keys() - self.all_parameters
345 doc_ref = self._doc_url_error_suffix()
346 if unknown_keys: 346 ↛ 347line 346 didn't jump to line 347 because the condition on line 346 was never true
347 for k in unknown_keys:
348 if isinstance(k, str):
349 _detect_possible_typo(k, value[k], self.manifest_attributes, path)
350 unused_keys = self.all_parameters - value.keys()
351 if unused_keys:
352 k = ", ".join(unused_keys)
353 raise ManifestParseException(
354 f'Unknown keys "{unknown_keys}" at {path.path_container_lc}". Keys that could be used here are: {k}.{doc_ref}'
355 )
356 raise ManifestParseException(
357 f'Unknown keys "{unknown_keys}" at {path.path_container_lc}". Please remove them.{doc_ref}'
358 )
359 missing_keys = self.input_time_required_parameters - value.keys()
360 if missing_keys:
361 required = ", ".join(repr(k) for k in sorted(missing_keys))
362 raise ManifestParseException(
363 f"The following keys were required but not present at {path.path_container_lc}: {required}{doc_ref}"
364 )
365 for maybe_required in self.all_parameters - value.keys():
366 attr = self.manifest_attributes[maybe_required]
367 assert attr.conditional_required is None or parser_context is not None
368 if ( 368 ↛ 374line 368 didn't jump to line 374 because the condition on line 368 was never true
369 attr.conditional_required is not None
370 and attr.conditional_required.condition_applies(
371 assume_not_none(parser_context)
372 )
373 ):
374 reason = attr.conditional_required.reason
375 raise ManifestParseException(
376 f'Missing the *conditionally* required attribute "{maybe_required}" at {path.path_container_lc}. {reason}{doc_ref}'
377 )
378 for keyset in self.at_least_one_of:
379 matched_keys = value.keys() & keyset
380 if not matched_keys: 380 ↛ 381line 380 didn't jump to line 381 because the condition on line 380 was never true
381 conditionally_required = ", ".join(repr(k) for k in sorted(keyset))
382 raise ManifestParseException(
383 f"At least one of the following keys must be present at {path.path_container_lc}:"
384 f" {conditionally_required}{doc_ref}"
385 )
386 for group in self.mutually_exclusive_attributes:
387 matched = value.keys() & group
388 if len(matched) > 1: 388 ↛ 389line 388 didn't jump to line 389 because the condition on line 388 was never true
389 ck = ", ".join(repr(k) for k in sorted(matched))
390 raise ManifestParseException(
391 f"Could not parse {path.path_container_lc}: The following attributes are"
392 f" mutually exclusive: {ck}{doc_ref}"
393 )
395 def _parse_typed_dict_form(
396 self,
397 value: dict[Any, Any],
398 path: AttributePath,
399 *,
400 parser_context: Optional["ParserContextData"] = None,
401 ) -> TD:
402 self._validate_expected_keys(value, path, parser_context=parser_context)
403 result = {}
404 per_attribute_conflicts = self._per_attribute_conflicts()
405 alias_mapping = {}
406 for path_hint_source_attributes in self.path_hint_source_attributes:
407 v = value.get(path_hint_source_attributes)
408 if v is not None and _extract_path_hint(v, path):
409 break
410 for k, v in value.items():
411 attr = self.manifest_attributes[k]
412 matched = value.keys() & per_attribute_conflicts[k]
413 if matched: 413 ↛ 414line 413 didn't jump to line 414 because the condition on line 413 was never true
414 ck = ", ".join(repr(k) for k in sorted(matched))
415 raise ManifestParseException(
416 f'The attribute "{k}" at {path.path} cannot be used with the following'
417 f" attributes: {ck}{self._doc_url_error_suffix()}"
418 )
419 nk = attr.target_attribute
420 key_path = path[k]
421 attr.type_validator.ensure_type(v, key_path)
422 if v is None: 422 ↛ 423line 422 didn't jump to line 423 because the condition on line 422 was never true
423 continue
424 if k != nk:
425 alias_mapping[nk] = k, None
426 v = attr.type_validator.map_type(v, key_path, parser_context)
427 result[nk] = v
428 if alias_mapping:
429 path.alias_mapping = alias_mapping
430 return cast("TD", result)
432 def _doc_url_error_suffix(self, *, see_url_version: bool = False) -> str:
433 doc_url = self.reference_documentation_url
434 if doc_url is not None:
435 if see_url_version: 435 ↛ 436line 435 didn't jump to line 436 because the condition on line 435 was never true
436 return f" Please see {doc_url} for the documentation."
437 return f" (Documentation: {doc_url})"
438 return ""
440 def parse_input(
441 self,
442 value: object,
443 path: AttributePath,
444 *,
445 parser_context: Optional["ParserContextData"] = None,
446 ) -> TD:
447 check_integration_mode(
448 path,
449 parser_context,
450 self.expected_debputy_integration_mode,
451 )
452 if value is None: 452 ↛ 453line 452 didn't jump to line 453 because the condition on line 452 was never true
453 form_note = " The attribute must be a mapping."
454 if self.alt_form_parser is not None:
455 form_note = (
456 " The attribute can be a mapping or a non-mapping format"
457 ' (usually, "non-mapping format" means a string or a list of strings).'
458 )
459 doc_ref = self._doc_url_error_suffix(see_url_version=True)
460 raise ManifestParseException(
461 f"The attribute {path.path} was missing a value. {form_note}{doc_ref}"
462 )
464 if not isinstance(value, dict):
465 return self._parse_alt_form(value, path, parser_context=parser_context)
466 return self._parse_typed_dict_form(value, path, parser_context=parser_context)
468 def _per_attribute_conflicts(self) -> Mapping[str, frozenset[str]]:
469 conflicts = self._per_attribute_conflicts_cache
470 if conflicts is not None:
471 return conflicts
472 attrs = self.source_attributes
473 conflicts = {
474 a.source_attribute_name: frozenset(
475 attrs[ca].source_attribute_name for ca in a.conflicting_attributes
476 )
477 for a in attrs.values()
478 }
479 self._per_attribute_conflicts_cache = conflicts
480 return self._per_attribute_conflicts_cache
483def _is_path_attribute_candidate(
484 source_attribute: AttributeDescription, target_attribute: AttributeDescription
485) -> bool:
486 if (
487 source_attribute.parse_hints
488 and not source_attribute.parse_hints.applicable_as_path_hint
489 ):
490 return False
491 target_type = target_attribute.attribute_type
492 _, origin, args = unpack_type(target_type, False)
493 match_type = target_type
494 if origin == list:
495 match_type = args[0]
496 return isinstance(match_type, type) and issubclass(match_type, FileSystemMatchRule)
499def is_typeddict(t: Any) -> bool:
500 return typing.is_typeddict(t) or (
501 # Logically, not is_typeddict(t) and is subclass(DebputyParsedContent)
502 # implies not is_typeddict(DebputyParsedContent)
503 # except that subclass *fails* for typeddicts.
504 not typing.is_typeddict(DebputyParsedContent)
505 and isinstance(t, type)
506 and issubclass(t, DebputyParsedContent)
507 )
510class ParserGenerator:
511 def __init__(self) -> None:
512 self._registered_types: dict[Any, TypeMapping[Any, Any]] = {}
513 self._object_parsers: dict[str, DispatchingObjectParser] = {}
514 self._table_parsers: dict[
515 type[DebputyDispatchableType], DispatchingTableParser[Any]
516 ] = {}
517 self._in_package_context_parser: dict[str, Any] = {}
519 def register_mapped_type(self, mapped_type: TypeMapping[Any, Any]) -> None:
520 existing = self._registered_types.get(mapped_type.target_type)
521 if existing is not None: 521 ↛ 522line 521 didn't jump to line 522 because the condition on line 521 was never true
522 raise ValueError(f"The type {existing} is already registered")
523 self._registered_types[mapped_type.target_type] = mapped_type
525 def get_mapped_type_from_target_type(
526 self,
527 mapped_type: type[T],
528 ) -> TypeMapping[Any, T] | None:
529 return self._registered_types.get(mapped_type)
531 def discard_mapped_type(self, mapped_type: type[T]) -> None:
532 del self._registered_types[mapped_type]
534 def add_table_parser(self, rt: type[DebputyDispatchableType], path: str) -> None:
535 assert rt not in self._table_parsers
536 self._table_parsers[rt] = DispatchingTableParser(rt, path)
538 def add_object_parser(
539 self,
540 path: str,
541 *,
542 parser_documentation: ParserDocumentation | None = None,
543 expected_debputy_integration_mode: None | (
544 Container[DebputyIntegrationMode]
545 ) = None,
546 unknown_keys_diagnostic_severity: Optional["LintSeverity"] = "error",
547 allow_unknown_keys: bool = False,
548 ) -> DispatchingObjectParser:
549 assert path not in self._in_package_context_parser
550 assert path not in self._object_parsers
551 object_parser = DispatchingObjectParser(
552 path,
553 parser_documentation=parser_documentation,
554 expected_debputy_integration_mode=expected_debputy_integration_mode,
555 unknown_keys_diagnostic_severity=unknown_keys_diagnostic_severity,
556 allow_unknown_keys=allow_unknown_keys,
557 )
558 self._object_parsers[path] = object_parser
559 return object_parser
561 def add_in_package_context_parser(
562 self,
563 path: str,
564 delegate: DeclarativeInputParser[Any],
565 ) -> None:
566 assert path not in self._in_package_context_parser
567 assert path not in self._object_parsers
568 self._in_package_context_parser[path] = InPackageContextParser(path, delegate)
570 @property
571 def dispatchable_table_parsers(
572 self,
573 ) -> Mapping[type[DebputyDispatchableType], DispatchingTableParser[Any]]:
574 return self._table_parsers
576 @property
577 def dispatchable_object_parsers(self) -> Mapping[str, DispatchingObjectParser]:
578 return self._object_parsers
580 def dispatch_parser_table_for(
581 self, rule_type: TTP
582 ) -> DispatchingTableParser[TP] | None:
583 return cast(
584 "Optional[DispatchingTableParser[TP]]", self._table_parsers.get(rule_type)
585 )
587 def generate_parser(
588 self,
589 parsed_content: type[TD],
590 *,
591 source_content: SF | None = None,
592 allow_optional: bool = False,
593 inline_reference_documentation: ParserDocumentation | None = None,
594 expected_debputy_integration_mode: None | (
595 Container[DebputyIntegrationMode]
596 ) = None,
597 automatic_docs: None | (
598 Mapping[type[Any], Sequence[StandardParserAttributeDocumentation]]
599 ) = None,
600 ) -> DeclarativeInputParser[TD]:
601 """Derive a parser from a TypedDict
603 Generates a parser for a segment of the manifest (think the `install-docs` snippet) from a TypedDict
604 or two that are used as a description.
606 In its most simple use-case, the caller provides a TypedDict of the expected attributed along with
607 their types. As an example:
609 >>> class InstallDocsRule(DebputyParsedContent):
610 ... sources: List[str]
611 ... into: List[str]
612 >>> pg = ParserGenerator()
613 >>> simple_parser = pg.generate_parser(InstallDocsRule)
615 This will create a parser that would be able to interpret something like:
617 ```yaml
618 install-docs:
619 sources: ["docs/*"]
620 into: ["my-pkg"]
621 ```
623 While this is sufficient for programmers, it is a bit rigid for the packager writing the manifest. Therefore,
624 you can also provide a TypedDict describing the input, enabling more flexibility:
626 >>> class InstallDocsRule(DebputyParsedContent):
627 ... sources: List[str]
628 ... into: List[str]
629 >>> class InputDocsRuleInputFormat(TypedDict):
630 ... source: NotRequired[Annotated[str, DebputyParseHint.target_attribute("sources")]]
631 ... sources: NotRequired[List[str]]
632 ... into: Union[str, List[str]]
633 >>> pg = ParserGenerator()
634 >>> flexible_parser = pg.generate_parser(
635 ... InstallDocsRule,
636 ... source_content=InputDocsRuleInputFormat,
637 ... )
639 In this case, the `sources` field can either come from a single `source` in the manifest (which must be a string)
640 or `sources` (which must be a list of strings). The parser also ensures that only one of `source` or `sources`
641 is used to ensure the input is not ambiguous. For the `into` parameter, the parser will accept it being a str
642 or a list of strings. Regardless of how the input was provided, the parser will normalize the input so that
643 both `sources` and `into` in the result is a list of strings. As an example, this parser can accept
644 both the previous input but also the following input:
646 ```yaml
647 install-docs:
648 source: "docs/*"
649 into: "my-pkg"
650 ```
652 The `source` and `into` attributes are then normalized to lists as if the user had written them as lists
653 with a single string in them. As noted above, the name of the `source` attribute will also be normalized
654 while parsing.
656 In the cases where only one field is required by the user, it can sometimes make sense to allow a non-dict
657 as part of the input. Example:
659 >>> class DiscardRule(DebputyParsedContent):
660 ... paths: List[str]
661 >>> class DiscardRuleInputDictFormat(TypedDict):
662 ... path: NotRequired[Annotated[str, DebputyParseHint.target_attribute("paths")]]
663 ... paths: NotRequired[List[str]]
664 >>> # This format relies on DiscardRule having exactly one Required attribute
665 >>> DiscardRuleInputWithAltFormat = Union[
666 ... DiscardRuleInputDictFormat,
667 ... str,
668 ... List[str],
669 ... ]
670 >>> pg = ParserGenerator()
671 >>> flexible_parser = pg.generate_parser(
672 ... DiscardRule,
673 ... source_content=DiscardRuleInputWithAltFormat,
674 ... )
677 Supported types:
678 * `List` - must have a fixed type argument (such as `List[str]`)
679 * `str`
680 * `int`
681 * `BinaryPackage` - When provided (or required), the user must provide a package name listed
682 in the debian/control file. The code receives the BinaryPackage instance
683 matching that input.
684 * `FileSystemMode` - When provided (or required), the user must provide a file system mode in any
685 format that `debputy' provides (such as `0644` or `a=rw,go=rw`).
686 * `FileSystemOwner` - When provided (or required), the user must a file system owner that is
687 available statically on all Debian systems (must be in `base-passwd`).
688 The user has multiple options for how to specify it (either via name or id).
689 * `FileSystemGroup` - When provided (or required), the user must a file system group that is
690 available statically on all Debian systems (must be in `base-passwd`).
691 The user has multiple options for how to specify it (either via name or id).
692 * `ManifestCondition` - When provided (or required), the user must specify a conditional rule to apply.
693 Usually, it is better to extend `DebputyParsedContentStandardConditional`, which
694 provides the `debputy' default `when` parameter for conditionals.
696 Supported special type-like parameters:
698 * `Required` / `NotRequired` to mark a field as `Required` or `NotRequired`. Must be provided at the
699 outermost level. Cannot vary between `parsed_content` and `source_content`.
700 * `Annotated`. Accepted at the outermost level (inside Required/NotRequired) but ignored at the moment.
701 * `Union`. Must be the outermost level (inside `Annotated` or/and `Required`/`NotRequired` if these are present).
702 Automapping (see below) is restricted to two members in the Union.
704 Notable non-supported types:
705 * `Mapping` and all variants therefore (such as `dict`). In the future, nested `TypedDict`s may be allowed.
706 * `Optional` (or `Union[..., None]`): Use `NotRequired` for optional fields.
708 Automatic mapping rules from `source_content` to `parsed_content`:
709 - `Union[T, List[T]]` can be narrowed automatically to `List[T]`. Transformation is basically:
710 `lambda value: value if isinstance(value, list) else [value]`
711 - `T` can be mapped automatically to `List[T]`, Transformation being: `lambda value: [value]`
713 Additionally, types can be annotated (`Annotated[str, ...]`) with `DebputyParseHint`s. Check its classmethod
714 for concrete features that may be useful to you.
716 :param parsed_content: A DebputyParsedContent / TypedDict describing the desired model of the input once parsed.
717 (DebputyParsedContent is a TypedDict subclass that work around some inadequate type checkers).
718 It can also be a `List[DebputyParsedContent]`. In that case, `source_content` must be a
719 `List[TypedDict[...]]`.
720 :param source_content: Optionally, a TypedDict describing the input allowed by the user. This can be useful
721 to describe more variations than in `parsed_content` that the parser will normalize for you. If omitted,
722 the parsed_content is also considered the source_content (which affects what annotations are allowed in it).
723 Note you should never pass the parsed_content as source_content directly.
724 :param allow_optional: In rare cases, you want to support explicitly provided vs. optional. In this case, you
725 should set this to True. Though, in 99.9% of all cases, you want `NotRequired` rather than `Optional` (and
726 can keep this False).
727 :param inline_reference_documentation: Optionally, programmatic documentation
728 :param expected_debputy_integration_mode: If provided, this declares the integration modes where the
729 result of the parser can be used. This is primarily useful for "fail-fast" on incorrect usage.
730 When the restriction is not satisfiable, the generated parser will trigger a parse error immediately
731 (resulting in a "compile time" failure rather than a "runtime" failure).
732 :return: An input parser capable of reading input matching the TypedDict(s) used as reference.
733 """
734 orig_parsed_content = parsed_content
735 if source_content is parsed_content: 735 ↛ 736line 735 didn't jump to line 736 because the condition on line 735 was never true
736 raise ValueError(
737 "Do not provide source_content if it is the same as parsed_content"
738 )
739 is_list_wrapped = False
740 if get_origin(orig_parsed_content) == list:
741 parsed_content = get_args(orig_parsed_content)[0]
742 is_list_wrapped = True
744 if isinstance(parsed_content, type) and issubclass(
745 parsed_content, DebputyDispatchableType
746 ):
747 parser = self.dispatch_parser_table_for(parsed_content)
748 if parser is None: 748 ↛ 749line 748 didn't jump to line 749 because the condition on line 748 was never true
749 raise ValueError(
750 f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}."
751 f" The class {parsed_content.__qualname__} is not a pre-registered type."
752 )
753 # FIXME: Only the list wrapped version has documentation.
754 if is_list_wrapped: 754 ↛ 760line 754 didn't jump to line 760 because the condition on line 754 was always true
755 parser = ListWrappedDeclarativeInputParser(
756 parser,
757 inline_reference_documentation=inline_reference_documentation,
758 expected_debputy_integration_mode=expected_debputy_integration_mode,
759 )
760 return parser
762 if not is_typeddict(parsed_content): 762 ↛ 763line 762 didn't jump to line 763 because the condition on line 762 was never true
763 raise ValueError(
764 f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}."
765 ' Only "TypedDict"-based types and a subset of "DebputyDispatchableType" are supported.'
766 )
767 if is_list_wrapped and source_content is not None:
768 if get_origin(source_content) != list: 768 ↛ 769line 768 didn't jump to line 769 because the condition on line 768 was never true
769 raise ValueError(
770 "If the parsed_content is a List type, then source_format must be a List type as well."
771 )
772 source_content = get_args(source_content)[0]
774 target_attributes = self._parse_types(
775 parsed_content,
776 allow_source_attribute_annotations=source_content is None,
777 forbid_optional=not allow_optional,
778 )
779 required_target_parameters = frozenset(parsed_content.__required_keys__)
780 parsed_alt_form = None
781 non_mapping_source_only = False
783 if source_content is not None:
784 default_target_attribute = None
785 if len(required_target_parameters) == 1:
786 default_target_attribute = next(iter(required_target_parameters))
788 source_typed_dict, alt_source_forms = _extract_typed_dict(
789 source_content,
790 default_target_attribute,
791 )
792 if alt_source_forms:
793 parsed_alt_form = self._parse_alt_form(
794 alt_source_forms,
795 default_target_attribute,
796 )
797 if source_typed_dict is not None:
798 source_content_attributes = self._parse_types(
799 source_typed_dict,
800 allow_target_attribute_annotation=True,
801 allow_source_attribute_annotations=True,
802 forbid_optional=not allow_optional,
803 )
804 source_content_parameter = "source_content"
805 source_and_parsed_differs = True
806 else:
807 source_typed_dict = parsed_content
808 source_content_attributes = target_attributes
809 source_content_parameter = "parsed_content"
810 source_and_parsed_differs = True
811 non_mapping_source_only = True
812 else:
813 source_typed_dict = parsed_content
814 source_content_attributes = target_attributes
815 source_content_parameter = "parsed_content"
816 source_and_parsed_differs = False
818 sources = collections.defaultdict(set)
819 seen_targets = set()
820 seen_source_names: dict[str, str] = {}
821 source_attributes: dict[str, AttributeDescription] = {}
822 path_hint_source_attributes = []
824 for k in source_content_attributes:
825 ia = source_content_attributes[k]
827 ta = (
828 target_attributes.get(ia.target_attribute)
829 if source_and_parsed_differs
830 else ia
831 )
832 if ta is None: 832 ↛ 834line 832 didn't jump to line 834 because the condition on line 832 was never true
833 # Error message would be wrong if this assertion is false.
834 assert source_and_parsed_differs
835 raise ValueError(
836 f'The attribute "{k}" from the "source_content" parameter should have mapped'
837 f' to "{ia.target_attribute}", but that parameter does not exist in "parsed_content"'
838 )
839 if _is_path_attribute_candidate(ia, ta):
840 path_hint_source_attributes.append(ia.source_attribute_name)
841 existing_source_name = seen_source_names.get(ia.source_attribute_name)
842 if existing_source_name: 842 ↛ 843line 842 didn't jump to line 843 because the condition on line 842 was never true
843 raise ValueError(
844 f'The attribute "{k}" and "{existing_source_name}" both share the source name'
845 f' "{ia.source_attribute_name}". Please change the {source_content_parameter} parameter,'
846 f' so only one attribute use "{ia.source_attribute_name}".'
847 )
848 seen_source_names[ia.source_attribute_name] = k
849 seen_targets.add(ta.target_attribute)
850 sources[ia.target_attribute].add(k)
851 if source_and_parsed_differs:
852 bridge_mapper = self._type_normalize(
853 k, ia.attribute_type, ta.attribute_type, False
854 )
855 ia.type_validator = ia.type_validator.combine_mapper(bridge_mapper)
856 source_attributes[k] = ia
858 def _as_attr_names(td_name: Iterable[str]) -> frozenset[str]:
859 return frozenset(
860 source_content_attributes[a].source_attribute_name for a in td_name
861 )
863 _check_attributes(
864 parsed_content,
865 source_typed_dict,
866 source_content_attributes,
867 sources,
868 )
870 at_least_one_of = frozenset(
871 _as_attr_names(g)
872 for k, g in sources.items()
873 if len(g) > 1 and k in required_target_parameters
874 )
876 if source_and_parsed_differs and seen_targets != target_attributes.keys(): 876 ↛ 877line 876 didn't jump to line 877 because the condition on line 876 was never true
877 missing = ", ".join(
878 repr(k) for k in (target_attributes.keys() - seen_targets)
879 )
880 raise ValueError(
881 'The following attributes in "parsed_content" did not have a source field in "source_content":'
882 f" {missing}"
883 )
884 all_mutually_exclusive_fields = frozenset(
885 _as_attr_names(g) for g in sources.values() if len(g) > 1
886 )
888 all_parameters = (
889 source_typed_dict.__required_keys__ | source_typed_dict.__optional_keys__
890 )
891 _check_conflicts(
892 source_content_attributes,
893 source_typed_dict.__required_keys__,
894 all_parameters,
895 )
897 manifest_attributes = {
898 a.source_attribute_name: a for a in source_content_attributes.values()
899 }
901 if parsed_alt_form is not None:
902 target_attribute = parsed_alt_form.target_attribute
903 if ( 903 ↛ 908line 903 didn't jump to line 908 because the condition on line 903 was never true
904 target_attribute not in required_target_parameters
905 and required_target_parameters
906 or len(required_target_parameters) > 1
907 ):
908 raise NotImplementedError(
909 "When using alternative source formats (Union[TypedDict, ...]), then the"
910 " target must have at most one require parameter"
911 )
912 bridge_mapper = self._type_normalize(
913 target_attribute,
914 parsed_alt_form.attribute_type,
915 target_attributes[target_attribute].attribute_type,
916 False,
917 )
918 parsed_alt_form.type_validator = (
919 parsed_alt_form.type_validator.combine_mapper(bridge_mapper)
920 )
922 inline_reference_documentation = (
923 _verify_and_auto_correct_inline_reference_documentation(
924 parsed_content,
925 source_typed_dict,
926 source_content_attributes,
927 inline_reference_documentation,
928 parsed_alt_form is not None,
929 automatic_docs,
930 )
931 )
932 if non_mapping_source_only:
933 parser = DeclarativeNonMappingInputParser(
934 assume_not_none(parsed_alt_form),
935 inline_reference_documentation=inline_reference_documentation,
936 expected_debputy_integration_mode=expected_debputy_integration_mode,
937 )
938 else:
939 parser = DeclarativeMappingInputParser(
940 _as_attr_names(source_typed_dict.__required_keys__),
941 _as_attr_names(all_parameters),
942 manifest_attributes,
943 source_attributes,
944 mutually_exclusive_attributes=all_mutually_exclusive_fields,
945 alt_form_parser=parsed_alt_form,
946 at_least_one_of=at_least_one_of,
947 inline_reference_documentation=inline_reference_documentation,
948 path_hint_source_attributes=tuple(path_hint_source_attributes),
949 expected_debputy_integration_mode=expected_debputy_integration_mode,
950 )
951 if is_list_wrapped:
952 parser = ListWrappedDeclarativeInputParser(
953 parser,
954 expected_debputy_integration_mode=expected_debputy_integration_mode,
955 )
956 return parser
958 def _as_type_validator(
959 self,
960 attribute: str,
961 provided_type: Any,
962 parsing_typed_dict_attribute: bool,
963 ) -> AttributeTypeHandler:
964 assert not isinstance(provided_type, tuple)
966 if isinstance(provided_type, type) and issubclass(
967 provided_type, DebputyDispatchableType
968 ):
969 return _dispatch_parser(provided_type)
971 unmapped_type = self._strip_mapped_types(
972 provided_type,
973 parsing_typed_dict_attribute,
974 )
975 type_normalizer = self._type_normalize(
976 attribute,
977 unmapped_type,
978 provided_type,
979 parsing_typed_dict_attribute,
980 )
981 t_unmapped, t_unmapped_orig, t_unmapped_args = unpack_type(
982 unmapped_type,
983 parsing_typed_dict_attribute,
984 )
985 _, t_provided_orig, t_provided_args = unpack_type(
986 provided_type,
987 parsing_typed_dict_attribute,
988 )
990 if ( 990 ↛ 996line 990 didn't jump to line 996 because the condition on line 990 was never true
991 t_unmapped_orig == Union
992 and t_unmapped_args
993 and len(t_unmapped_args) == 2
994 and any(v is _NONE_TYPE for v in t_unmapped_args)
995 ):
996 _, _, args = unpack_type(provided_type, parsing_typed_dict_attribute)
997 actual_type = [a for a in args if a is not _NONE_TYPE][0]
998 validator = self._as_type_validator(
999 attribute, actual_type, parsing_typed_dict_attribute
1000 )
1002 def _validator(v: Any, path: AttributePath) -> None:
1003 if v is None:
1004 return
1005 validator.ensure_type(v, path)
1007 return AttributeTypeHandler(
1008 validator.describe_type(),
1009 _validator,
1010 base_type=validator.base_type,
1011 mapper=type_normalizer,
1012 )
1014 if unmapped_type in BASIC_SIMPLE_TYPES:
1015 type_name = BASIC_SIMPLE_TYPES[unmapped_type]
1017 type_mapping = self._registered_types.get(provided_type)
1018 if type_mapping is not None:
1019 simple_type = f" ({type_name})"
1020 type_name = type_mapping.target_type.__name__
1021 else:
1022 simple_type = ""
1024 def _validator(v: Any, path: AttributePath) -> None:
1025 if not isinstance(v, unmapped_type):
1026 _validation_type_error(
1027 path, f"The attribute must be a {type_name}{simple_type}"
1028 )
1030 return AttributeTypeHandler(
1031 type_name,
1032 _validator,
1033 base_type=unmapped_type,
1034 mapper=type_normalizer,
1035 )
1036 if t_unmapped_orig == list:
1037 if not t_unmapped_args: 1037 ↛ 1038line 1037 didn't jump to line 1038 because the condition on line 1037 was never true
1038 raise ValueError(
1039 f'The attribute "{attribute}" is List but does not have Generics (Must use List[X])'
1040 )
1042 genetic_type = t_unmapped_args[0]
1043 key_mapper = self._as_type_validator(
1044 attribute,
1045 genetic_type,
1046 parsing_typed_dict_attribute,
1047 )
1049 def _validator(v: Any, path: AttributePath) -> None:
1050 if not isinstance(v, list): 1050 ↛ 1051line 1050 didn't jump to line 1051 because the condition on line 1050 was never true
1051 _validation_type_error(path, "The attribute must be a list")
1052 for i, list_item in enumerate(v):
1053 key_mapper.ensure_type(list_item, path[i])
1055 list_mapper = (
1056 map_each_element(key_mapper.mapper)
1057 if key_mapper.mapper is not None
1058 else None
1059 )
1061 return AttributeTypeHandler(
1062 f"List of {key_mapper.describe_type()}",
1063 _validator,
1064 base_type=list,
1065 mapper=type_normalizer,
1066 ).combine_mapper(list_mapper)
1067 if is_typeddict(provided_type):
1068 subparser = self.generate_parser(cast("Type[TD]", provided_type))
1069 return AttributeTypeHandler(
1070 description=f"{provided_type.__name__} (Typed Mapping)",
1071 ensure_type=lambda v, ap: None,
1072 base_type=dict,
1073 mapper=lambda v, ap, cv: subparser.parse_input(
1074 v, ap, parser_context=cv
1075 ),
1076 )
1077 if t_unmapped_orig == dict:
1078 if not t_unmapped_args or len(t_unmapped_args) != 2: 1078 ↛ 1079line 1078 didn't jump to line 1079 because the condition on line 1078 was never true
1079 raise ValueError(
1080 f'The attribute "{attribute}" is Dict but does not have Generics (Must use Dict[str, Y])'
1081 )
1082 if t_unmapped_args[0] != str: 1082 ↛ 1083line 1082 didn't jump to line 1083 because the condition on line 1082 was never true
1083 raise ValueError(
1084 f'The attribute "{attribute}" is Dict and has a non-str type as key.'
1085 " Currently, only `str` is supported (Dict[str, Y])"
1086 )
1087 key_mapper = self._as_type_validator(
1088 attribute,
1089 t_unmapped_args[0],
1090 parsing_typed_dict_attribute,
1091 )
1092 value_mapper = self._as_type_validator(
1093 attribute,
1094 t_unmapped_args[1],
1095 parsing_typed_dict_attribute,
1096 )
1098 if key_mapper.base_type is None: 1098 ↛ 1099line 1098 didn't jump to line 1099 because the condition on line 1098 was never true
1099 raise ValueError(
1100 f'The attribute "{attribute}" is Dict and the key did not have a trivial base type. Key types'
1101 f" without trivial base types (such as `str`) are not supported at the moment."
1102 )
1104 if value_mapper.mapper is not None: 1104 ↛ 1105line 1104 didn't jump to line 1105 because the condition on line 1104 was never true
1105 raise ValueError(
1106 f'The attribute "{attribute}" is Dict and the value requires mapping.'
1107 " Currently, this is not supported. Consider a simpler type (such as Dict[str, str] or Dict[str, Any])."
1108 " Better typing may come later"
1109 )
1111 def _validator(v: Any, path: AttributePath) -> None:
1112 if not isinstance(v, dict): 1112 ↛ 1113line 1112 didn't jump to line 1113 because the condition on line 1112 was never true
1113 _validation_type_error(path, "The attribute must be a mapping")
1114 key_name = "the first key in the mapping"
1115 for i, (k, value) in enumerate(v.items()):
1116 if not key_mapper.base_type_match(k): 1116 ↛ 1117line 1116 didn't jump to line 1117 because the condition on line 1116 was never true
1117 kp = path.copy_with_path_hint(key_name)
1118 _validation_type_error(
1119 kp,
1120 f'The key number {i + 1} in attribute "{kp}" must be a {key_mapper.describe_type()}',
1121 )
1122 key_name = f"the key after {k}"
1123 value_mapper.ensure_type(value, path[k])
1125 return AttributeTypeHandler(
1126 f"Mapping of {value_mapper.describe_type()}",
1127 _validator,
1128 base_type=dict,
1129 mapper=type_normalizer,
1130 ).combine_mapper(key_mapper.mapper)
1131 if t_unmapped_orig in (Union, UnionType):
1132 if _is_two_arg_x_list_x(t_provided_args):
1133 # Force the order to be "X, List[X]" as it simplifies the code
1134 x_list_x = (
1135 t_provided_args
1136 if get_origin(t_provided_args[1]) == list
1137 else (t_provided_args[1], t_provided_args[0])
1138 )
1140 # X, List[X] could match if X was List[Y]. However, our code below assumes
1141 # that X is a non-list. The `_is_two_arg_x_list_x` returns False for this
1142 # case to avoid this assert and fall into the "generic case".
1143 assert get_origin(x_list_x[0]) != list
1144 x_subtype_checker = self._as_type_validator(
1145 attribute,
1146 x_list_x[0],
1147 parsing_typed_dict_attribute,
1148 )
1149 list_x_subtype_checker = self._as_type_validator(
1150 attribute,
1151 x_list_x[1],
1152 parsing_typed_dict_attribute,
1153 )
1154 type_description = x_subtype_checker.describe_type()
1155 type_description = f"{type_description} or a list of {type_description}"
1157 def _validator(v: Any, path: AttributePath) -> None:
1158 if isinstance(v, list):
1159 list_x_subtype_checker.ensure_type(v, path)
1160 else:
1161 x_subtype_checker.ensure_type(v, path)
1163 return AttributeTypeHandler(
1164 type_description,
1165 _validator,
1166 mapper=type_normalizer,
1167 )
1168 else:
1169 subtype_checker = [
1170 self._as_type_validator(attribute, a, parsing_typed_dict_attribute)
1171 for a in t_unmapped_args
1172 ]
1173 type_description = "one-of: " + ", ".join(
1174 f"{sc.describe_type()}" for sc in subtype_checker
1175 )
1176 mapper = subtype_checker[0].mapper
1177 if any(mapper != sc.mapper for sc in subtype_checker): 1177 ↛ 1178line 1177 didn't jump to line 1178 because the condition on line 1177 was never true
1178 raise ValueError(
1179 f'Cannot handle the union "{provided_type}" as the target types need different'
1180 " type normalization/mapping logic. Unions are generally limited to Union[X, List[X]]"
1181 " where X is a non-collection type."
1182 )
1184 def _validator(v: Any, path: AttributePath) -> None:
1185 partial_matches = []
1186 for sc in subtype_checker: 1186 ↛ 1194line 1186 didn't jump to line 1194 because the loop on line 1186 didn't complete
1187 try:
1188 sc.ensure_type(v, path)
1189 return
1190 except ManifestParseException as e:
1191 if sc.base_type_match(v): 1191 ↛ 1192line 1191 didn't jump to line 1192 because the condition on line 1191 was never true
1192 partial_matches.append((sc, e))
1194 if len(partial_matches) == 1:
1195 raise partial_matches[0][1]
1196 _validation_type_error(
1197 path, f"Could not match against: {type_description}"
1198 )
1200 return AttributeTypeHandler(
1201 type_description,
1202 _validator,
1203 mapper=type_normalizer,
1204 )
1205 if t_unmapped_orig == Literal:
1206 # We want "x" for string values; repr provides 'x'
1207 pretty = ", ".join(
1208 f"`{v}`" if isinstance(v, str) else str(v) for v in t_unmapped_args
1209 )
1211 def _validator(v: Any, path: AttributePath) -> None:
1212 if v not in t_unmapped_args:
1213 value_hint = ""
1214 if isinstance(v, str): 1214 ↛ 1216line 1214 didn't jump to line 1216 because the condition on line 1214 was always true
1215 value_hint = f"({v}) "
1216 _validation_type_error(
1217 path,
1218 f"Value {value_hint}must be one of the following literal values: {pretty}",
1219 )
1221 return AttributeTypeHandler(
1222 f"One of the following literal values: {pretty}",
1223 _validator,
1224 )
1226 if provided_type == Any: 1226 ↛ 1231line 1226 didn't jump to line 1231 because the condition on line 1226 was always true
1227 return AttributeTypeHandler(
1228 "any (unvalidated)",
1229 lambda *a: None,
1230 )
1231 raise ValueError(
1232 f'The attribute "{attribute}" had/contained a type {provided_type}, which is not supported'
1233 )
1235 def _parse_types(
1236 self,
1237 spec: type[TypedDict],
1238 allow_target_attribute_annotation: bool = False,
1239 allow_source_attribute_annotations: bool = False,
1240 forbid_optional: bool = True,
1241 ) -> dict[str, AttributeDescription]:
1242 annotations = get_type_hints(spec, include_extras=True)
1243 return {
1244 k: self._attribute_description(
1245 k,
1246 t,
1247 k in spec.__required_keys__,
1248 allow_target_attribute_annotation=allow_target_attribute_annotation,
1249 allow_source_attribute_annotations=allow_source_attribute_annotations,
1250 forbid_optional=forbid_optional,
1251 )
1252 for k, t in annotations.items()
1253 }
1255 def _attribute_description(
1256 self,
1257 attribute: str,
1258 orig_td: Any,
1259 is_required: bool,
1260 forbid_optional: bool = True,
1261 allow_target_attribute_annotation: bool = False,
1262 allow_source_attribute_annotations: bool = False,
1263 ) -> AttributeDescription:
1264 td, anno, is_optional = _parse_type(
1265 attribute, orig_td, forbid_optional=forbid_optional
1266 )
1267 type_validator = self._as_type_validator(attribute, td, True)
1268 parsed_annotations = DetectedDebputyParseHint.parse_annotations(
1269 anno,
1270 f' Seen with attribute "{attribute}".',
1271 attribute,
1272 is_required,
1273 allow_target_attribute_annotation=allow_target_attribute_annotation,
1274 allow_source_attribute_annotations=allow_source_attribute_annotations,
1275 )
1276 return AttributeDescription(
1277 target_attribute=parsed_annotations.target_attribute,
1278 attribute_type=td,
1279 type_validator=type_validator,
1280 annotations=anno,
1281 is_optional=is_optional,
1282 conflicting_attributes=parsed_annotations.conflict_with_source_attributes,
1283 conditional_required=parsed_annotations.conditional_required,
1284 source_attribute_name=assume_not_none(
1285 parsed_annotations.source_manifest_attribute
1286 ),
1287 parse_hints=parsed_annotations,
1288 )
1290 def _parse_alt_form(
1291 self,
1292 alt_form,
1293 default_target_attribute: str | None,
1294 ) -> AttributeDescription:
1295 td, anno, is_optional = _parse_type(
1296 "source_format alternative form",
1297 alt_form,
1298 forbid_optional=True,
1299 parsing_typed_dict_attribute=False,
1300 )
1301 type_validator = self._as_type_validator(
1302 "source_format alternative form",
1303 td,
1304 True,
1305 )
1306 parsed_annotations = DetectedDebputyParseHint.parse_annotations(
1307 anno,
1308 " The alternative for source_format.",
1309 None,
1310 False,
1311 default_target_attribute=default_target_attribute,
1312 allow_target_attribute_annotation=True,
1313 allow_source_attribute_annotations=False,
1314 )
1315 return AttributeDescription(
1316 target_attribute=parsed_annotations.target_attribute,
1317 attribute_type=td,
1318 type_validator=type_validator,
1319 annotations=anno,
1320 is_optional=is_optional,
1321 conflicting_attributes=parsed_annotations.conflict_with_source_attributes,
1322 conditional_required=parsed_annotations.conditional_required,
1323 source_attribute_name="Alt form of the source_format",
1324 )
1326 def _union_narrowing(
1327 self,
1328 input_type: Any,
1329 target_type: Any,
1330 parsing_typed_dict_attribute: bool,
1331 ) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any] | None:
1332 _, input_orig, input_args = unpack_type(
1333 input_type, parsing_typed_dict_attribute
1334 )
1335 _, target_orig, target_args = unpack_type(
1336 target_type, parsing_typed_dict_attribute
1337 )
1339 if input_orig not in (Union, UnionType) or not input_args: 1339 ↛ 1340line 1339 didn't jump to line 1340 because the condition on line 1339 was never true
1340 raise ValueError("input_type must be a Union[...] with non-empty args")
1342 # Currently, we only support Union[X, List[X]] -> List[Y] narrowing or Union[X, List[X]] -> Union[Y, Union[Y]]
1343 # - Where X = Y or there is a simple standard transformation from X to Y.
1345 if target_orig not in (Union, UnionType, list) or not target_args:
1346 # Not supported
1347 return None
1349 if target_orig in (Union, UnionType) and set(input_args) == set(target_args): 1349 ↛ 1351line 1349 didn't jump to line 1351 because the condition on line 1349 was never true
1350 # Not needed (identity mapping)
1351 return None
1353 if target_orig == list and not any(get_origin(a) == list for a in input_args): 1353 ↛ 1355line 1353 didn't jump to line 1355 because the condition on line 1353 was never true
1354 # Not supported
1355 return None
1357 target_arg = target_args[0]
1358 simplified_type = self._strip_mapped_types(
1359 target_arg, parsing_typed_dict_attribute
1360 )
1361 acceptable_types = {
1362 target_arg,
1363 list[target_arg], # type: ignore
1364 List[target_arg], # type: ignore
1365 simplified_type,
1366 list[simplified_type], # type: ignore
1367 List[simplified_type], # type: ignore
1368 }
1369 target_format = (
1370 target_arg,
1371 list[target_arg], # type: ignore
1372 List[target_arg], # type: ignore
1373 )
1374 in_target_format = 0
1375 in_simple_format = 0
1376 for input_arg in input_args:
1377 if input_arg not in acceptable_types: 1377 ↛ 1379line 1377 didn't jump to line 1379 because the condition on line 1377 was never true
1378 # Not supported
1379 return None
1380 if input_arg in target_format:
1381 in_target_format += 1
1382 else:
1383 in_simple_format += 1
1385 assert in_simple_format or in_target_format
1387 if in_target_format and not in_simple_format:
1388 # Union[X, List[X]] -> List[X]
1389 return normalize_into_list
1390 mapped = self._registered_types[target_arg]
1391 if not in_target_format and in_simple_format: 1391 ↛ 1406line 1391 didn't jump to line 1406 because the condition on line 1391 was always true
1392 # Union[X, List[X]] -> List[Y]
1394 def _mapper_x_list_y(
1395 x: Any | list[Any],
1396 ap: AttributePath,
1397 pc: Optional["ParserContextData"],
1398 ) -> list[Any]:
1399 in_list_form: list[Any] = normalize_into_list(x, ap, pc)
1401 return [mapped.mapper(x, ap, pc) for x in in_list_form]
1403 return _mapper_x_list_y
1405 # Union[Y, List[X]] -> List[Y]
1406 if not isinstance(target_arg, type):
1407 raise ValueError(
1408 f"Cannot narrow {input_type} -> {target_type}: The automatic conversion does"
1409 f" not support mixed types. Please use either {simplified_type} or {target_arg}"
1410 f" in the source content (but both a mix of both)"
1411 )
1413 def _mapper_mixed_list_y(
1414 x: Any | list[Any],
1415 ap: AttributePath,
1416 pc: Optional["ParserContextData"],
1417 ) -> list[Any]:
1418 in_list_form: list[Any] = normalize_into_list(x, ap, pc)
1420 return [
1421 x if isinstance(x, target_arg) else mapped.mapper(x, ap, pc)
1422 for x in in_list_form
1423 ]
1425 return _mapper_mixed_list_y
1427 def _type_normalize(
1428 self,
1429 attribute: str,
1430 input_type: Any,
1431 target_type: Any,
1432 parsing_typed_dict_attribute: bool,
1433 ) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any] | None:
1434 if input_type == target_type:
1435 return None
1436 _, input_orig, input_args = unpack_type(
1437 input_type, parsing_typed_dict_attribute
1438 )
1439 _, target_orig, target_args = unpack_type(
1440 target_type,
1441 parsing_typed_dict_attribute,
1442 )
1443 if input_orig in (Union, UnionType):
1444 result = self._union_narrowing(
1445 input_type, target_type, parsing_typed_dict_attribute
1446 )
1447 if result:
1448 return result
1449 elif target_orig == list and target_args[0] == input_type:
1450 return wrap_into_list
1452 mapped = self._registered_types.get(target_type)
1453 if mapped is not None and input_type == mapped.source_type:
1454 # Source -> Target
1455 return mapped.mapper
1456 if target_orig == list and target_args: 1456 ↛ 1474line 1456 didn't jump to line 1474 because the condition on line 1456 was always true
1457 mapped = self._registered_types.get(target_args[0])
1458 if mapped is not None: 1458 ↛ 1474line 1458 didn't jump to line 1474 because the condition on line 1458 was always true
1459 # mypy is dense and forgot `mapped` cannot be optional in the comprehensions.
1460 mapped_type: TypeMapping = mapped
1461 if input_type == mapped.source_type: 1461 ↛ 1463line 1461 didn't jump to line 1463 because the condition on line 1461 was never true
1462 # Source -> List[Target]
1463 return lambda x, ap, pc: [mapped_type.mapper(x, ap, pc)]
1464 if ( 1464 ↛ 1474line 1464 didn't jump to line 1474 because the condition on line 1464 was always true
1465 input_orig == list
1466 and input_args
1467 and input_args[0] == mapped_type.source_type
1468 ):
1469 # List[Source] -> List[Target]
1470 return lambda xs, ap, pc: [
1471 mapped_type.mapper(x, ap, pc) for x in xs
1472 ]
1474 raise ValueError(
1475 f'Unsupported type normalization for "{attribute}": Cannot automatically map/narrow'
1476 f" {input_type} to {target_type}"
1477 )
1479 def _strip_mapped_types(
1480 self, orig_td: Any, parsing_typed_dict_attribute: bool
1481 ) -> Any:
1482 m = self._registered_types.get(orig_td)
1483 if m is not None:
1484 return m.source_type
1485 _, v, args = unpack_type(orig_td, parsing_typed_dict_attribute)
1486 if v == list:
1487 arg = args[0]
1488 m = self._registered_types.get(arg)
1489 if m:
1490 return list[m.source_type] # type: ignore
1491 if v in (Union, UnionType):
1492 stripped_args = tuple(
1493 self._strip_mapped_types(x, parsing_typed_dict_attribute) for x in args
1494 )
1495 if stripped_args != args:
1496 return Union[stripped_args]
1497 return orig_td
1500def _sort_key(attr: StandardParserAttributeDocumentation) -> Any:
1501 key = next(iter(attr.attributes))
1502 return attr.sort_category, key
1505def _apply_std_docs(
1506 std_doc_table: (
1507 Mapping[type[Any], Sequence[StandardParserAttributeDocumentation]] | None
1508 ),
1509 source_format_typed_dict: type[Any],
1510 attribute_docs: Sequence[ParserAttributeDocumentation] | None,
1511) -> Sequence[ParserAttributeDocumentation] | None:
1512 if std_doc_table is None or not std_doc_table: 1512 ↛ 1515line 1512 didn't jump to line 1515 because the condition on line 1512 was always true
1513 return attribute_docs
1515 has_docs_for = set()
1516 if attribute_docs:
1517 for attribute_doc in attribute_docs:
1518 has_docs_for.update(attribute_doc.attributes)
1520 base_seen = set()
1521 std_docs_used = []
1523 remaining_bases = set(getattr(source_format_typed_dict, "__orig_bases__", []))
1524 base_seen.update(remaining_bases)
1525 while remaining_bases:
1526 base = remaining_bases.pop()
1527 new_bases_to_check = {
1528 x for x in getattr(base, "__orig_bases__", []) if x not in base_seen
1529 }
1530 remaining_bases.update(new_bases_to_check)
1531 base_seen.update(new_bases_to_check)
1532 std_docs = std_doc_table.get(base)
1533 if std_docs:
1534 for std_doc in std_docs:
1535 if any(a in has_docs_for for a in std_doc.attributes):
1536 # If there is any overlap, do not add the docs
1537 continue
1538 has_docs_for.update(std_doc.attributes)
1539 std_docs_used.append(std_doc)
1541 if not std_docs_used:
1542 return attribute_docs
1543 docs = sorted(std_docs_used, key=_sort_key)
1544 if attribute_docs:
1545 # Plugin provided attributes first
1546 c = list(attribute_docs)
1547 c.extend(docs)
1548 docs = c
1549 return tuple(docs)
1552def _verify_and_auto_correct_inline_reference_documentation(
1553 parsed_content: type[TD],
1554 source_typed_dict: type[Any],
1555 source_content_attributes: Mapping[str, AttributeDescription],
1556 inline_reference_documentation: ParserDocumentation | None,
1557 has_alt_form: bool,
1558 automatic_docs: (
1559 Mapping[type[Any], Sequence[StandardParserAttributeDocumentation]] | None
1560 ) = None,
1561) -> ParserDocumentation | None:
1562 orig_attribute_docs = (
1563 inline_reference_documentation.attribute_doc
1564 if inline_reference_documentation
1565 else None
1566 )
1567 attribute_docs = _apply_std_docs(
1568 automatic_docs,
1569 source_typed_dict,
1570 orig_attribute_docs,
1571 )
1572 if inline_reference_documentation is None and attribute_docs is None:
1573 return None
1574 changes = {}
1575 if attribute_docs:
1576 seen = set()
1577 had_any_custom_docs = False
1578 for attr_doc in attribute_docs:
1579 if not isinstance(attr_doc, StandardParserAttributeDocumentation):
1580 had_any_custom_docs = True
1581 for attr_name in attr_doc.attributes:
1582 attr = source_content_attributes.get(attr_name)
1583 if attr is None: 1583 ↛ 1584line 1583 didn't jump to line 1584 because the condition on line 1583 was never true
1584 raise ValueError(
1585 f"The inline_reference_documentation for the source format of {parsed_content.__qualname__}"
1586 f' references an attribute "{attr_name}", which does not exist in the source format.'
1587 )
1588 if attr_name in seen: 1588 ↛ 1589line 1588 didn't jump to line 1589 because the condition on line 1588 was never true
1589 raise ValueError(
1590 f"The inline_reference_documentation for the source format of {parsed_content.__qualname__}"
1591 f' has documentation for "{attr_name}" twice, which is not supported.'
1592 f" Please document it at most once"
1593 )
1594 seen.add(attr_name)
1595 undocumented = source_content_attributes.keys() - seen
1596 if undocumented: 1596 ↛ 1597line 1596 didn't jump to line 1597 because the condition on line 1596 was never true
1597 if had_any_custom_docs:
1598 undocumented_attrs = ", ".join(undocumented)
1599 raise ValueError(
1600 f"The following attributes were not documented for the source format of"
1601 f" {parsed_content.__qualname__}. If this is deliberate, then please"
1602 ' declare each them as undocumented (via undocumented_attr("foo")):'
1603 f" {undocumented_attrs}"
1604 )
1605 combined_docs = list(attribute_docs)
1606 combined_docs.extend(undocumented_attr(a) for a in sorted(undocumented))
1607 attribute_docs = combined_docs
1609 if attribute_docs and orig_attribute_docs != attribute_docs: 1609 ↛ 1610line 1609 didn't jump to line 1610 because the condition on line 1609 was never true
1610 assert attribute_docs is not None
1611 changes["attribute_doc"] = tuple(attribute_docs)
1613 if ( 1613 ↛ 1618line 1613 didn't jump to line 1618 because the condition on line 1613 was never true
1614 inline_reference_documentation is not None
1615 and inline_reference_documentation.alt_parser_description
1616 and not has_alt_form
1617 ):
1618 raise ValueError(
1619 "The inline_reference_documentation had documentation for an non-mapping format,"
1620 " but the source format does not have a non-mapping format."
1621 )
1622 if changes: 1622 ↛ 1623line 1622 didn't jump to line 1623 because the condition on line 1622 was never true
1623 if inline_reference_documentation is None:
1624 inline_reference_documentation = reference_documentation()
1625 return inline_reference_documentation.replace(**changes)
1626 return inline_reference_documentation
1629def _check_conflicts(
1630 input_content_attributes: dict[str, AttributeDescription],
1631 required_attributes: frozenset[str],
1632 all_attributes: frozenset[str],
1633) -> None:
1634 for attr_name, attr in input_content_attributes.items():
1635 if attr_name in required_attributes and attr.conflicting_attributes: 1635 ↛ 1636line 1635 didn't jump to line 1636 because the condition on line 1635 was never true
1636 c = ", ".join(repr(a) for a in attr.conflicting_attributes)
1637 raise ValueError(
1638 f'The attribute "{attr_name}" is required and conflicts with the attributes: {c}.'
1639 " This makes it impossible to use these attributes. Either remove the attributes"
1640 f' (along with the conflicts for them), adjust the conflicts or make "{attr_name}"'
1641 " optional (NotRequired)"
1642 )
1643 else:
1644 required_conflicts = attr.conflicting_attributes & required_attributes
1645 if required_conflicts: 1645 ↛ 1646line 1645 didn't jump to line 1646 because the condition on line 1645 was never true
1646 c = ", ".join(repr(a) for a in required_conflicts)
1647 raise ValueError(
1648 f'The attribute "{attr_name}" conflicts with the following *required* attributes: {c}.'
1649 f' This makes it impossible to use the "{attr_name}" attribute. Either remove it,'
1650 f" adjust the conflicts or make the listed attributes optional (NotRequired)"
1651 )
1652 unknown_attributes = attr.conflicting_attributes - all_attributes
1653 if unknown_attributes: 1653 ↛ 1654line 1653 didn't jump to line 1654 because the condition on line 1653 was never true
1654 c = ", ".join(repr(a) for a in unknown_attributes)
1655 raise ValueError(
1656 f'The attribute "{attr_name}" declares a conflict with the following unknown attributes: {c}.'
1657 f" None of these attributes were declared in the input."
1658 )
1661def _check_attributes(
1662 content: type[TypedDict],
1663 input_content: type[TypedDict],
1664 input_content_attributes: dict[str, AttributeDescription],
1665 sources: Mapping[str, Collection[str]],
1666) -> None:
1667 target_required_keys = content.__required_keys__
1668 input_required_keys = input_content.__required_keys__
1669 all_input_keys = input_required_keys | input_content.__optional_keys__
1671 for input_name in all_input_keys:
1672 attr = input_content_attributes[input_name]
1673 target_name = attr.target_attribute
1674 source_names = sources[target_name]
1675 input_is_required = input_name in input_required_keys
1676 target_is_required = target_name in target_required_keys
1678 assert source_names
1680 if input_is_required and len(source_names) > 1: 1680 ↛ 1681line 1680 didn't jump to line 1681 because the condition on line 1680 was never true
1681 raise ValueError(
1682 f'The source attribute "{input_name}" is required, but it maps to "{target_name}",'
1683 f' which has multiple sources "{source_names}". If "{input_name}" should be required,'
1684 f' then there is no need for additional sources for "{target_name}". Alternatively,'
1685 f' "{input_name}" might be missing a NotRequired type'
1686 f' (example: "{input_name}: NotRequired[<OriginalTypeHere>]")'
1687 )
1688 if not input_is_required and target_is_required and len(source_names) == 1: 1688 ↛ 1689line 1688 didn't jump to line 1689 because the condition on line 1688 was never true
1689 raise ValueError(
1690 f'The source attribute "{input_name}" is not marked as required and maps to'
1691 f' "{target_name}", which is marked as required. As there are no other attributes'
1692 f' mapping to "{target_name}", then "{input_name}" must be required as well'
1693 f' ("{input_name}: Required[<Type>]"). Alternatively, "{target_name}" should be optional'
1694 f' ("{target_name}: NotRequired[<Type>]") or an "MappingHint.aliasOf" might be missing.'
1695 )
1698def _validation_type_error(path: AttributePath, message: str) -> None:
1699 raise ManifestParseException(
1700 f'The attribute "{path.path}" did not have a valid structure/type: {message}'
1701 )
1704def _is_two_arg_x_list_x(t_args: tuple[Any, ...]) -> bool:
1705 if len(t_args) != 2:
1706 return False
1707 lhs, rhs = t_args
1708 if get_origin(lhs) == list:
1709 if get_origin(rhs) == list: 1709 ↛ 1712line 1709 didn't jump to line 1712 because the condition on line 1709 was never true
1710 # It could still match X, List[X] - but we do not allow this case for now as the caller
1711 # does not support it.
1712 return False
1713 l_args = get_args(lhs)
1714 return bool(l_args and l_args[0] == rhs)
1715 if get_origin(rhs) == list:
1716 r_args = get_args(rhs)
1717 return bool(r_args and r_args[0] == lhs)
1718 return False
1721def _extract_typed_dict(
1722 base_type,
1723 default_target_attribute: str | None,
1724) -> tuple[type[TypedDict] | None, Any]:
1725 if is_typeddict(base_type):
1726 return base_type, None
1727 _, origin, args = unpack_type(base_type, False)
1728 if origin != Union:
1729 if isinstance(base_type, type) and issubclass(base_type, (dict, Mapping)): 1729 ↛ 1730line 1729 didn't jump to line 1730 because the condition on line 1729 was never true
1730 raise ValueError(
1731 "The source_format cannot be nor contain a (non-TypedDict) dict"
1732 )
1733 return None, base_type
1734 typed_dicts = [x for x in args if is_typeddict(x)]
1735 if len(typed_dicts) > 1: 1735 ↛ 1736line 1735 didn't jump to line 1736 because the condition on line 1735 was never true
1736 raise ValueError(
1737 "When source_format is a Union, it must contain at most one TypedDict"
1738 )
1739 typed_dict = typed_dicts[0] if typed_dicts else None
1741 if any(x is None or x is _NONE_TYPE for x in args): 1741 ↛ 1742line 1741 didn't jump to line 1742 because the condition on line 1741 was never true
1742 raise ValueError(
1743 "The source_format cannot be nor contain Optional[X] or Union[X, None]"
1744 )
1746 if any( 1746 ↛ 1751line 1746 didn't jump to line 1751 because the condition on line 1746 was never true
1747 isinstance(x, type) and issubclass(x, (dict, Mapping))
1748 for x in args
1749 if x is not typed_dict
1750 ):
1751 raise ValueError(
1752 "The source_format cannot be nor contain a (non-TypedDict) dict"
1753 )
1754 remaining = [x for x in args if x is not typed_dict]
1755 has_target_attribute = False
1756 anno = None
1757 if len(remaining) == 1: 1757 ↛ 1758line 1757 didn't jump to line 1758 because the condition on line 1757 was never true
1758 base_type, anno, _ = _parse_type(
1759 "source_format alternative form",
1760 remaining[0],
1761 forbid_optional=True,
1762 parsing_typed_dict_attribute=False,
1763 )
1764 has_target_attribute = bool(anno) and any(
1765 isinstance(x, TargetAttribute) for x in anno
1766 )
1767 target_type = base_type
1768 else:
1769 target_type = Union[tuple(remaining)]
1771 if default_target_attribute is None and not has_target_attribute: 1771 ↛ 1772line 1771 didn't jump to line 1772 because the condition on line 1771 was never true
1772 raise ValueError(
1773 'The alternative format must be Union[TypedDict,Annotated[X, DebputyParseHint.target_attribute("...")]]'
1774 " OR the parsed_content format must have exactly one attribute that is required."
1775 )
1776 if anno: 1776 ↛ 1777line 1776 didn't jump to line 1777 because the condition on line 1776 was never true
1777 final_anno = [target_type]
1778 final_anno.extend(anno)
1779 return typed_dict, Annotated[tuple(final_anno)]
1780 return typed_dict, target_type
1783def _dispatch_parse_generator(
1784 dispatch_type: type[DebputyDispatchableType],
1785) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]:
1786 def _dispatch_parse(
1787 value: Any,
1788 attribute_path: AttributePath,
1789 parser_context: Optional["ParserContextData"],
1790 ):
1791 assert parser_context is not None
1792 dispatching_parser = parser_context.dispatch_parser_table_for(dispatch_type)
1793 return dispatching_parser.parse_input(
1794 value, attribute_path, parser_context=parser_context
1795 )
1797 return _dispatch_parse
1800def _dispatch_parser(
1801 dispatch_type: type[DebputyDispatchableType],
1802) -> AttributeTypeHandler:
1803 return AttributeTypeHandler(
1804 dispatch_type.__name__,
1805 lambda *a: None,
1806 mapper=_dispatch_parse_generator(dispatch_type),
1807 )
1810def _parse_type(
1811 attribute: str,
1812 orig_td: Any,
1813 forbid_optional: bool = True,
1814 parsing_typed_dict_attribute: bool = True,
1815) -> tuple[Any, tuple[Any, ...], bool]:
1816 td, v, args = unpack_type(orig_td, parsing_typed_dict_attribute)
1817 md: tuple[Any, ...] = tuple()
1818 optional = False
1819 if v is not None:
1820 if v == Annotated:
1821 anno = get_args(td)
1822 md = anno[1:]
1823 td, v, args = unpack_type(anno[0], parsing_typed_dict_attribute)
1825 if td is _NONE_TYPE: 1825 ↛ 1826line 1825 didn't jump to line 1826 because the condition on line 1825 was never true
1826 raise ValueError(
1827 f'The attribute "{attribute}" resolved to type "None". "Nil" / "None" fields are not allowed in the'
1828 " debputy manifest, so this attribute does not make sense in its current form."
1829 )
1830 if forbid_optional and v == Union and any(a is _NONE_TYPE for a in args): 1830 ↛ 1831line 1830 didn't jump to line 1831 because the condition on line 1830 was never true
1831 raise ValueError(
1832 f'Detected use of Optional in "{attribute}", which is not allowed here.'
1833 " Please use NotRequired for optional fields"
1834 )
1836 return td, md, optional
1839def _normalize_attribute_name(attribute: str) -> str:
1840 if attribute.endswith("_"):
1841 attribute = attribute[:-1]
1842 return attribute.replace("_", "-")
1845@dataclasses.dataclass
1846class DetectedDebputyParseHint:
1847 target_attribute: str
1848 source_manifest_attribute: str | None
1849 conflict_with_source_attributes: frozenset[str]
1850 conditional_required: ConditionalRequired | None
1851 applicable_as_path_hint: bool
1853 @classmethod
1854 def parse_annotations(
1855 cls,
1856 anno: tuple[Any, ...],
1857 error_context: str,
1858 default_attribute_name: str | None,
1859 is_required: bool,
1860 default_target_attribute: str | None = None,
1861 allow_target_attribute_annotation: bool = False,
1862 allow_source_attribute_annotations: bool = False,
1863 ) -> "DetectedDebputyParseHint":
1864 target_attr_anno = find_annotation(anno, TargetAttribute)
1865 if target_attr_anno:
1866 if not allow_target_attribute_annotation: 1866 ↛ 1867line 1866 didn't jump to line 1867 because the condition on line 1866 was never true
1867 raise ValueError(
1868 f"The DebputyParseHint.target_attribute annotation is not allowed in this context.{error_context}"
1869 )
1870 target_attribute = target_attr_anno.attribute
1871 elif default_target_attribute is not None:
1872 target_attribute = default_target_attribute
1873 elif default_attribute_name is not None: 1873 ↛ 1876line 1873 didn't jump to line 1876 because the condition on line 1873 was always true
1874 target_attribute = default_attribute_name
1875 else:
1876 if default_attribute_name is None:
1877 raise ValueError(
1878 "allow_target_attribute_annotation must be True OR "
1879 "default_attribute_name/default_target_attribute must be not None"
1880 )
1881 raise ValueError(
1882 f"Missing DebputyParseHint.target_attribute annotation.{error_context}"
1883 )
1884 source_attribute_anno = find_annotation(anno, ManifestAttribute)
1885 _source_attribute_allowed(
1886 allow_source_attribute_annotations, error_context, source_attribute_anno
1887 )
1888 if source_attribute_anno:
1889 source_attribute_name = source_attribute_anno.attribute
1890 elif default_attribute_name is not None:
1891 source_attribute_name = _normalize_attribute_name(default_attribute_name)
1892 else:
1893 source_attribute_name = None
1894 mutual_exclusive_with_anno = find_annotation(anno, ConflictWithSourceAttribute)
1895 if mutual_exclusive_with_anno:
1896 _source_attribute_allowed(
1897 allow_source_attribute_annotations,
1898 error_context,
1899 mutual_exclusive_with_anno,
1900 )
1901 conflicting_attributes = mutual_exclusive_with_anno.conflicting_attributes
1902 else:
1903 conflicting_attributes = frozenset()
1904 conditional_required = find_annotation(anno, ConditionalRequired)
1906 if conditional_required and is_required: 1906 ↛ 1907line 1906 didn't jump to line 1907 because the condition on line 1906 was never true
1907 if default_attribute_name is None:
1908 raise ValueError(
1909 "is_required cannot be True without default_attribute_name being not None"
1910 )
1911 raise ValueError(
1912 f'The attribute "{default_attribute_name}" is Required while also being conditionally required.'
1913 ' Please make the attribute "NotRequired" or remove the conditional requirement.'
1914 )
1916 not_path_hint_anno = find_annotation(anno, NotPathHint)
1917 applicable_as_path_hint = not_path_hint_anno is None
1919 return DetectedDebputyParseHint(
1920 target_attribute=target_attribute,
1921 source_manifest_attribute=source_attribute_name,
1922 conflict_with_source_attributes=conflicting_attributes,
1923 conditional_required=conditional_required,
1924 applicable_as_path_hint=applicable_as_path_hint,
1925 )
1928def _source_attribute_allowed(
1929 source_attribute_allowed: bool,
1930 error_context: str,
1931 annotation: DebputyParseHint | None,
1932) -> None:
1933 if source_attribute_allowed or annotation is None: 1933 ↛ 1935line 1933 didn't jump to line 1935 because the condition on line 1933 was always true
1934 return
1935 raise ValueError(
1936 f'The annotation "{annotation}" cannot be used here. {error_context}'
1937 )