Coverage for src/debputy/manifest_parser/declarative_parser.py: 72%
797 statements
« prev ^ index » next coverage.py v7.8.2, created at 2026-04-19 20:37 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2026-04-19 20:37 +0000
1import collections
2import dataclasses
3import typing
4from types import UnionType
5from typing import (
6 Any,
7 TypedDict,
8 get_type_hints,
9 Annotated,
10 get_args,
11 get_origin,
12 TypeVar,
13 Generic,
14 Optional,
15 cast,
16 Type,
17 Union,
18 List,
19 NotRequired,
20 Literal,
21 TYPE_CHECKING,
22)
23from collections.abc import Callable, Mapping, Collection, Iterable, Sequence, Container
26from debputy.manifest_parser.base_types import FileSystemMatchRule
27from debputy.manifest_parser.exceptions import (
28 ManifestParseException,
29)
30from debputy.manifest_parser.mapper_code import (
31 normalize_into_list,
32 wrap_into_list,
33 map_each_element,
34)
35from debputy.manifest_parser.parse_hints import (
36 ConditionalRequired,
37 DebputyParseHint,
38 TargetAttribute,
39 ManifestAttribute,
40 ConflictWithSourceAttribute,
41 NotPathHint,
42)
43from debputy.manifest_parser.parser_data import ParserContextData
44from debputy.manifest_parser.tagging_types import (
45 DebputyParsedContent,
46 DebputyDispatchableType,
47 TypeMapping,
48)
49from debputy.manifest_parser.util import (
50 AttributePath,
51 unpack_type,
52 find_annotation,
53 check_integration_mode,
54)
55from debputy.plugin.api.impl_types import (
56 DeclarativeInputParser,
57 TD,
58 ListWrappedDeclarativeInputParser,
59 DispatchingObjectParser,
60 DispatchingTableParser,
61 TTP,
62 TP,
63 InPackageContextParser,
64)
65from debputy.plugin.api.spec import (
66 ParserDocumentation,
67 DebputyIntegrationMode,
68 StandardParserAttributeDocumentation,
69 undocumented_attr,
70 ParserAttributeDocumentation,
71 reference_documentation,
72)
73from debputy.util import _info, _warn, assume_not_none
75if TYPE_CHECKING:
76 from debputy.lsp.diagnostics import LintSeverity
# Optional dependency: python3-levenshtein enables typo detection for
# manifest keys.  _WARN_ONCE is a tri-state flag:
#   None  -> Levenshtein is available; typo detection is active.
#   False -> Levenshtein is missing and the user has not been told yet.
#   True  -> Levenshtein is missing and the install hint was already printed.
try:
    from Levenshtein import distance

    _WARN_ONCE: bool | None = None
except ImportError:
    _WARN_ONCE = False
def _detect_possible_typo(
    key: str,
    value: object,
    manifest_attributes: Mapping[str, "AttributeDescription"],
    path: "AttributePath",
) -> None:
    """Warn when an unknown manifest key looks like a typo of a known key.

    Compares ``key`` against every accepted attribute name using the
    Levenshtein edit distance (requires the optional python3-levenshtein
    dependency; otherwise a one-time install hint is emitted instead).
    Candidates are ranked by how well ``value`` fits the candidate
    attribute's declared type, and only the strongest rank is reported.

    :param key: The unrecognized key from the manifest.
    :param value: The value provided for the unrecognized key.
    :param manifest_attributes: All attributes valid in this context.
    :param path: Path of the mapping containing ``key`` (for the warning text).
    """
    global _WARN_ONCE
    # Use an identity-style check ("is False") rather than "== False"; the
    # flag is a bool/None tri-state sentinel (see its definition).
    if _WARN_ONCE is False:
        _WARN_ONCE = True
        _info(
            "Install python3-levenshtein to have debputy try to detect typos in the manifest."
        )
    elif _WARN_ONCE is None:
        # Levenshtein is available; look for close matches.
        k_len = len(key)
        key_path = path[key]
        matches: list[str] = []
        current_match_strength = 0
        for acceptable_key, attr in manifest_attributes.items():
            # Cheap length filter before computing the edit distance.
            if abs(k_len - len(acceptable_key)) > 2:
                continue
            d = distance(key, acceptable_key)
            if d > 2:
                continue
            # Rank the candidate: 2 = value type-checks fully,
            # 1 = only the base type matches, 0 = type mismatch.
            try:
                attr.type_validator.ensure_type(value, key_path)
            except ManifestParseException:
                if attr.type_validator.base_type_match(value):
                    match_strength = 1
                else:
                    match_strength = 0
            else:
                match_strength = 2

            # Keep only candidates of the strongest rank seen so far.
            if match_strength < current_match_strength:
                continue
            if match_strength > current_match_strength:
                current_match_strength = match_strength
                matches.clear()
            matches.append(acceptable_key)

        if not matches:
            return
        ref = f'at "{path.path}"' if path else "at the manifest root level"
        if len(matches) == 1:
            possible_match = repr(matches[0])
            _warn(
                f'Possible typo: The key "{key}" {ref} should probably have been {possible_match}'
            )
        else:
            matches.sort()
            possible_matches = ", ".join(repr(a) for a in matches)
            _warn(
                f'Possible typo: The key "{key}" {ref} should probably have been one of {possible_matches}'
            )
# Type variables used by the generic parser machinery below:
#   SF - the "source format" TypedDict describing the manifest input.
#   T/S - general-purpose type parameters for mapping helpers.
SF = TypeVar("SF")
T = TypeVar("T")
S = TypeVar("S")


# Cached NoneType; used when unpacking Optional/Union annotations.
_NONE_TYPE = type(None)


# These must be able to appear in an "isinstance" check and must be builtin types.
BASIC_SIMPLE_TYPES = {
    str: "string",
    int: "integer",
    bool: "boolean",
}
class AttributeTypeHandler:
    """Type validation and value mapping for a single manifest attribute.

    Bundles a human-readable type description, a type check, an optional
    base type (used for "near match" detection in typo hints) and an
    optional mapper that converts the raw value into its parsed form.
    """

    __slots__ = ("_description", "_ensure_type", "base_type", "mapper")

    def __init__(
        self,
        description: str,
        ensure_type: Callable[[Any, AttributePath], None],
        *,
        base_type: type[Any] | None = None,
        mapper: None | (
            Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]
        ) = None,
    ) -> None:
        self._description = description
        self._ensure_type = ensure_type
        self.base_type = base_type
        self.mapper = mapper

    def describe_type(self) -> str:
        """Human-readable description of the accepted type."""
        return self._description

    def ensure_type(self, obj: object, path: AttributePath) -> None:
        """Delegate to the underlying type check (raises on mismatch)."""
        self._ensure_type(obj, path)

    def base_type_match(self, obj: object) -> bool:
        """Whether *obj* is at least an instance of the base type (if known)."""
        expected = self.base_type
        if expected is None:
            return False
        return isinstance(obj, expected)

    def map_type(
        self,
        value: Any,
        path: AttributePath,
        parser_context: Optional["ParserContextData"],
    ) -> Any:
        """Convert *value* via the mapper; pass it through when unset."""
        if self.mapper is None:
            return value
        return self.mapper(value, path, parser_context)

    def combine_mapper(
        self,
        mapper: None | (
            Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]
        ),
    ) -> "AttributeTypeHandler":
        """Return a handler applying this handler's mapper and then *mapper*.

        Returns ``self`` unchanged when *mapper* is None.
        """
        if mapper is None:
            return self
        inner = self.mapper
        _combined: Callable[
            [Any, AttributePath, Optional["ParserContextData"]], Any
        ]
        if inner is None:
            _combined = mapper
        else:

            def _combined(
                value: Any,
                path: AttributePath,
                parser_context: Optional["ParserContextData"],
            ) -> Any:
                # Existing mapper runs first, then the newly appended one.
                return mapper(inner(value, path, parser_context), path, parser_context)

        return AttributeTypeHandler(
            self._description,
            self._ensure_type,
            base_type=self.base_type,
            mapper=_combined,
        )
@dataclasses.dataclass(slots=True)
class AttributeDescription:
    """Metadata for a single attribute of a parsed manifest rule."""

    # Name of the attribute as written in the manifest.
    source_attribute_name: str
    # Name of the attribute in the parsed (target) representation.
    target_attribute: str
    # Declared (Python) type of the target attribute.
    attribute_type: Any
    # Validator/mapper applied to the raw manifest value.
    type_validator: AttributeTypeHandler
    # Extra metadata attached via typing.Annotated (parse hints etc.).
    annotations: tuple[Any, ...]
    # Source names of attributes that cannot be combined with this one.
    conflicting_attributes: frozenset[str]
    # Condition under which the attribute becomes required (None: never).
    conditional_required: Optional["ConditionalRequired"]
    # Parse hints extracted from the annotations, if any.
    parse_hints: Optional["DetectedDebputyParseHint"] = None
    # Whether the target attribute value may be None (Optional support).
    is_optional: bool = False
def _extract_path_hint(v: Any, attribute_path: AttributePath) -> bool:
    """Record a path hint on *attribute_path* from the value *v*.

    A string value (or the first string of a non-empty list) is used as the
    hint.  An already-present hint is kept.

    :return: True if a hint is set on *attribute_path* after the call.
    """
    if attribute_path.path_hint is not None:
        # Keep the first hint; do not overwrite.
        return True
    hint: str | None = None
    if isinstance(v, str):
        hint = v
    elif isinstance(v, list) and v and isinstance(v[0], str):
        hint = v[0]
    if hint is None:
        return False
    attribute_path.path_hint = hint
    return True
@dataclasses.dataclass(slots=True, frozen=True)
class DeclarativeNonMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]):
    """Parser for rules that only accept a non-mapping (string/list) form.

    The entire value is fed to a single attribute described by
    ``alt_form_parser``.
    """

    # Describes the single target attribute the value maps onto.
    alt_form_parser: AttributeDescription
    inline_reference_documentation: ParserDocumentation | None = None
    expected_debputy_integration_mode: Container[DebputyIntegrationMode] | None = None

    def parse_input(
        self,
        value: object,
        path: AttributePath,
        *,
        parser_context: Optional["ParserContextData"] = None,
    ) -> TD:
        """Validate *value* and wrap it as the single target attribute.

        :raises ManifestParseException: if the value is missing or mistyped.
        """
        check_integration_mode(
            path,
            parser_context,
            self.expected_debputy_integration_mode,
        )
        doc_url = self.reference_documentation_url
        doc_ref = f" (Documentation: {doc_url})" if doc_url is not None else ""

        form_parser = self.alt_form_parser
        if value is None:
            form_note = f" The value must have type: {form_parser.type_validator.describe_type()}"
            if doc_url is not None:
                doc_ref = f" Please see {doc_url} for the documentation."
            raise ManifestParseException(
                f"The attribute {path.path} was missing a value. {form_note}{doc_ref}"
            )
        _extract_path_hint(value, path)
        validator = form_parser.type_validator
        validator.ensure_type(value, path)
        target = form_parser.target_attribute
        mapped_value = validator.map_type(value, path, parser_context)
        # The sole attribute aliases the whole rule value.
        path.alias_mapping = {target: ("", None)}
        return cast("TD", {target: mapped_value})
@dataclasses.dataclass(slots=True)
class DeclarativeMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]):
    """Parser for manifest rules given as a mapping (with optional alt form).

    Validates key-level constraints (unknown/required/conditionally
    required/mutually exclusive keys), maps manifest attribute names to
    target attribute names and normalizes values via the per-attribute
    type validators.  A non-mapping value is delegated to the alternative
    form parser when one is configured.
    """

    # Keys the user must always provide in the manifest.
    input_time_required_parameters: frozenset[str]
    # All accepted keys (required plus optional).
    all_parameters: frozenset[str]
    # Attribute descriptions indexed by the name used in the manifest.
    manifest_attributes: Mapping[str, "AttributeDescription"]
    # Attribute descriptions indexed by the source (TypedDict) name.
    source_attributes: Mapping[str, "AttributeDescription"]
    # Groups of keys of which at least one member must be present.
    at_least_one_of: frozenset[frozenset[str]]
    # Parser for the non-mapping form (None when only mappings are allowed).
    alt_form_parser: AttributeDescription | None
    # Groups of keys that must not be combined.
    mutually_exclusive_attributes: frozenset[frozenset[str]] = frozenset()
    # Lazily built cache for _per_attribute_conflicts().
    _per_attribute_conflicts_cache: Mapping[str, frozenset[str]] | None = None
    inline_reference_documentation: ParserDocumentation | None = None
    # Source attribute names that may contribute a path hint for diagnostics.
    path_hint_source_attributes: Sequence[str] = tuple()
    expected_debputy_integration_mode: Container[DebputyIntegrationMode] | None = None

    def _parse_alt_form(
        self,
        value: object,
        path: AttributePath,
        *,
        parser_context: Optional["ParserContextData"] = None,
    ) -> TD:
        """Parse the non-mapping (alternative) form of the rule.

        :raises ManifestParseException: if no alternative form is defined or
          the value does not match its declared type.
        """
        alt_form_parser = self.alt_form_parser
        if alt_form_parser is None:
            raise ManifestParseException(
                f"The attribute {path.path} must be a mapping.{self._doc_url_error_suffix()}"
            )
        _extract_path_hint(value, path)
        alt_form_parser.type_validator.ensure_type(value, path)
        assert (
            value is not None
        ), "The alternative form was None, but the parser should have rejected None earlier."
        attribute = alt_form_parser.target_attribute
        # The whole value maps onto a single target attribute.
        alias_mapping = {
            attribute: ("", None),
        }
        v = alt_form_parser.type_validator.map_type(value, path, parser_context)
        path.alias_mapping = alias_mapping
        return cast("TD", {attribute: v})

    def _validate_expected_keys(
        self,
        value: dict[Any, Any],
        path: AttributePath,
        *,
        parser_context: Optional["ParserContextData"] = None,
    ) -> None:
        """Check key-level constraints of the mapping form.

        Validates that there are no unknown keys, that all (conditionally)
        required keys are present, that every "at least one of" group is
        satisfied, and that no mutually exclusive keys are combined.

        :raises ManifestParseException: on any violation.
        """
        unknown_keys = value.keys() - self.all_parameters
        doc_ref = self._doc_url_error_suffix()
        if unknown_keys:
            for k in unknown_keys:
                if isinstance(k, str):
                    # Best-effort typo hint (no-op without python3-levenshtein).
                    _detect_possible_typo(k, value[k], self.manifest_attributes, path)
            unused_keys = self.all_parameters - value.keys()
            if unused_keys:
                allowed = ", ".join(unused_keys)
                raise ManifestParseException(
                    f'Unknown keys "{unknown_keys}" at {path.path_container_lc}. Keys that could be used here are: {allowed}.{doc_ref}'
                )
            raise ManifestParseException(
                f'Unknown keys "{unknown_keys}" at {path.path_container_lc}. Please remove them.{doc_ref}'
            )
        missing_keys = self.input_time_required_parameters - value.keys()
        if missing_keys:
            required = ", ".join(repr(k) for k in sorted(missing_keys))
            raise ManifestParseException(
                f"The following keys were required but not present at {path.path_container_lc}: {required}{doc_ref}"
            )
        for maybe_required in self.all_parameters - value.keys():
            attr = self.manifest_attributes[maybe_required]
            # Conditionally required attributes need a parser context to
            # evaluate their condition.
            assert attr.conditional_required is None or parser_context is not None
            if (
                attr.conditional_required is not None
                and attr.conditional_required.condition_applies(
                    assume_not_none(parser_context)
                )
            ):
                reason = attr.conditional_required.reason
                raise ManifestParseException(
                    f'Missing the *conditionally* required attribute "{maybe_required}" at {path.path_container_lc}. {reason}{doc_ref}'
                )
        for keyset in self.at_least_one_of:
            matched_keys = value.keys() & keyset
            if not matched_keys:
                conditionally_required = ", ".join(repr(k) for k in sorted(keyset))
                raise ManifestParseException(
                    f"At least one of the following keys must be present at {path.path_container_lc}:"
                    f" {conditionally_required}{doc_ref}"
                )
        for group in self.mutually_exclusive_attributes:
            matched = value.keys() & group
            if len(matched) > 1:
                ck = ", ".join(repr(k) for k in sorted(matched))
                raise ManifestParseException(
                    f"Could not parse {path.path_container_lc}: The following attributes are"
                    f" mutually exclusive: {ck}{doc_ref}"
                )

    def _parse_typed_dict_form(
        self,
        value: dict[Any, Any],
        path: AttributePath,
        *,
        parser_context: Optional["ParserContextData"] = None,
    ) -> TD:
        """Parse the mapping form into the target TypedDict."""
        self._validate_expected_keys(value, path, parser_context=parser_context)
        result = {}
        per_attribute_conflicts = self._per_attribute_conflicts()
        alias_mapping = {}
        # Use the first attribute that can provide a path hint (improves
        # error messages emitted further down the line).
        for source_attr_name in self.path_hint_source_attributes:
            v = value.get(source_attr_name)
            if v is not None and _extract_path_hint(v, path):
                break
        for k, v in value.items():
            attr = self.manifest_attributes[k]
            matched = value.keys() & per_attribute_conflicts[k]
            if matched:
                ck = ", ".join(repr(m) for m in sorted(matched))
                raise ManifestParseException(
                    f'The attribute "{k}" at {path.path} cannot be used with the following'
                    f" attributes: {ck}{self._doc_url_error_suffix()}"
                )
            nk = attr.target_attribute
            key_path = path[k]
            attr.type_validator.ensure_type(v, key_path)
            if v is None:
                # An explicit null is treated as "attribute not provided".
                continue
            if k != nk:
                # Remember the manifest spelling for error reporting.
                alias_mapping[nk] = k, None
            v = attr.type_validator.map_type(v, key_path, parser_context)
            result[nk] = v
        if alias_mapping:
            path.alias_mapping = alias_mapping
        return cast("TD", result)

    def _doc_url_error_suffix(self, *, see_url_version: bool = False) -> str:
        """Suffix pointing at the reference docs (empty string when unset)."""
        doc_url = self.reference_documentation_url
        if doc_url is not None:
            if see_url_version:
                return f" Please see {doc_url} for the documentation."
            return f" (Documentation: {doc_url})"
        return ""

    def parse_input(
        self,
        value: object,
        path: AttributePath,
        *,
        parser_context: Optional["ParserContextData"] = None,
    ) -> TD:
        """Parse *value* (mapping or alternative form) into the target dict.

        :param value: The raw manifest value found at *path*.
        :param path: Attribute path used for error reporting.
        :param parser_context: Context for conditional requirements etc.
        :raises ManifestParseException: when the input is invalid.
        """
        check_integration_mode(
            path,
            parser_context,
            self.expected_debputy_integration_mode,
        )
        if value is None:
            form_note = " The attribute must be a mapping."
            if self.alt_form_parser is not None:
                form_note = (
                    " The attribute can be a mapping or a non-mapping format"
                    ' (usually, "non-mapping format" means a string or a list of strings).'
                )
            doc_ref = self._doc_url_error_suffix(see_url_version=True)
            raise ManifestParseException(
                f"The attribute {path.path} was missing a value. {form_note}{doc_ref}"
            )
        if not isinstance(value, dict):
            return self._parse_alt_form(value, path, parser_context=parser_context)
        return self._parse_typed_dict_form(value, path, parser_context=parser_context)

    def _per_attribute_conflicts(self) -> Mapping[str, frozenset[str]]:
        """Conflicting source attribute names per attribute (computed once)."""
        conflicts = self._per_attribute_conflicts_cache
        if conflicts is not None:
            return conflicts
        attrs = self.source_attributes
        conflicts = {
            a.source_attribute_name: frozenset(
                attrs[ca].source_attribute_name for ca in a.conflicting_attributes
            )
            for a in attrs.values()
        }
        self._per_attribute_conflicts_cache = conflicts
        # Return the local rather than re-reading the Optional-typed field.
        return conflicts
def _is_path_attribute_candidate(
    source_attribute: AttributeDescription, target_attribute: AttributeDescription
) -> bool:
    """Whether this attribute pair can supply a path hint for diagnostics.

    True when the target type (or its list element type) is a
    FileSystemMatchRule subclass and the parse hints do not forbid it.
    """
    hints = source_attribute.parse_hints
    if hints and not hints.applicable_as_path_hint:
        return False
    target_type = target_attribute.attribute_type
    _, origin, args = unpack_type(target_type, False)
    # For list types, the element type decides.
    candidate = args[0] if origin == list else target_type
    return isinstance(candidate, type) and issubclass(candidate, FileSystemMatchRule)
498def is_typeddict(t: Any) -> bool:
499 return typing.is_typeddict(t) or (
500 # Logically, not is_typeddict(t) and is subclass(DebputyParsedContent)
501 # implies not is_typeddict(DebputyParsedContent)
502 # except that subclass *fails* for typeddicts.
503 not typing.is_typeddict(DebputyParsedContent)
504 and isinstance(t, type)
505 and issubclass(t, DebputyParsedContent)
506 )
509class ParserGenerator:
    def __init__(self) -> None:
        """Initialize an empty parser-generator registry."""
        # Maps a target type to the TypeMapping that produces it.
        self._registered_types: dict[Any, TypeMapping[Any, Any]] = {}
        # Object parsers indexed by their manifest path.
        self._object_parsers: dict[str, DispatchingObjectParser] = {}
        # Table parsers indexed by their dispatchable rule type.
        self._table_parsers: dict[
            type[DebputyDispatchableType], DispatchingTableParser[Any]
        ] = {}
        # In-package context parsers indexed by their manifest path.
        self._in_package_context_parser: dict[str, Any] = {}
518 def register_mapped_type(self, mapped_type: TypeMapping[Any, Any]) -> None:
519 existing = self._registered_types.get(mapped_type.target_type)
520 if existing is not None: 520 ↛ 521line 520 didn't jump to line 521 because the condition on line 520 was never true
521 raise ValueError(f"The type {existing} is already registered")
522 self._registered_types[mapped_type.target_type] = mapped_type
524 def get_mapped_type_from_target_type(
525 self,
526 mapped_type: type[T],
527 ) -> TypeMapping[Any, T] | None:
528 return self._registered_types.get(mapped_type)
530 def discard_mapped_type(self, mapped_type: type[T]) -> None:
531 del self._registered_types[mapped_type]
    def add_table_parser(self, rt: type[DebputyDispatchableType], path: str) -> None:
        """Register a dispatching table parser for rule type *rt* at *path*.

        :param rt: The dispatchable rule type; must not already be registered.
        :param path: The manifest path the parser is rooted at.
        """
        # Internal invariant: one table parser per rule type.
        assert rt not in self._table_parsers
        self._table_parsers[rt] = DispatchingTableParser(rt, path)
537 def add_object_parser(
538 self,
539 path: str,
540 *,
541 parser_documentation: ParserDocumentation | None = None,
542 expected_debputy_integration_mode: None | (
543 Container[DebputyIntegrationMode]
544 ) = None,
545 unknown_keys_diagnostic_severity: Optional["LintSeverity"] = "error",
546 allow_unknown_keys: bool = False,
547 ) -> DispatchingObjectParser:
548 assert path not in self._in_package_context_parser
549 assert path not in self._object_parsers
550 object_parser = DispatchingObjectParser(
551 path,
552 parser_documentation=parser_documentation,
553 expected_debputy_integration_mode=expected_debputy_integration_mode,
554 unknown_keys_diagnostic_severity=unknown_keys_diagnostic_severity,
555 allow_unknown_keys=allow_unknown_keys,
556 )
557 self._object_parsers[path] = object_parser
558 return object_parser
    def add_in_package_context_parser(
        self,
        path: str,
        delegate: DeclarativeInputParser[Any],
    ) -> None:
        """Register *delegate* to handle *path* inside a package context.

        :param path: The manifest path to claim; must not already be taken
          by any object or in-package-context parser.
        :param delegate: Parser invoked for the content beneath *path*.
        """
        # Internal invariant: a path is claimed by at most one parser.
        assert path not in self._in_package_context_parser
        assert path not in self._object_parsers
        self._in_package_context_parser[path] = InPackageContextParser(path, delegate)
    @property
    def dispatchable_table_parsers(
        self,
    ) -> Mapping[type[DebputyDispatchableType], DispatchingTableParser[Any]]:
        """Registered table parsers, keyed by their dispatchable rule type."""
        return self._table_parsers
    @property
    def dispatchable_object_parsers(self) -> Mapping[str, DispatchingObjectParser]:
        """Registered object parsers, keyed by their manifest path."""
        return self._object_parsers
579 def dispatch_parser_table_for(
580 self, rule_type: TTP
581 ) -> DispatchingTableParser[TP] | None:
582 return cast(
583 "Optional[DispatchingTableParser[TP]]", self._table_parsers.get(rule_type)
584 )
586 def generate_parser(
587 self,
588 parsed_content: type[TD],
589 *,
590 source_content: SF | None = None,
591 allow_optional: bool = False,
592 inline_reference_documentation: ParserDocumentation | None = None,
593 expected_debputy_integration_mode: None | (
594 Container[DebputyIntegrationMode]
595 ) = None,
596 automatic_docs: None | (
597 Mapping[type[Any], Sequence[StandardParserAttributeDocumentation]]
598 ) = None,
599 ) -> DeclarativeInputParser[TD]:
600 """Derive a parser from a TypedDict
602 Generates a parser for a segment of the manifest (think the `install-docs` snippet) from a TypedDict
603 or two that are used as a description.
605 In its most simple use-case, the caller provides a TypedDict of the expected attributed along with
606 their types. As an example:
608 >>> class InstallDocsRule(DebputyParsedContent):
609 ... sources: List[str]
610 ... into: List[str]
611 >>> pg = ParserGenerator()
612 >>> simple_parser = pg.generate_parser(InstallDocsRule)
614 This will create a parser that would be able to interpret something like:
616 ```yaml
617 install-docs:
618 sources: ["docs/*"]
619 into: ["my-pkg"]
620 ```
622 While this is sufficient for programmers, it is a bit rigid for the packager writing the manifest. Therefore,
623 you can also provide a TypedDict describing the input, enabling more flexibility:
625 >>> class InstallDocsRule(DebputyParsedContent):
626 ... sources: List[str]
627 ... into: List[str]
628 >>> class InputDocsRuleInputFormat(TypedDict):
629 ... source: NotRequired[Annotated[str, DebputyParseHint.target_attribute("sources")]]
630 ... sources: NotRequired[List[str]]
631 ... into: Union[str, List[str]]
632 >>> pg = ParserGenerator()
633 >>> flexible_parser = pg.generate_parser(
634 ... InstallDocsRule,
635 ... source_content=InputDocsRuleInputFormat,
636 ... )
638 In this case, the `sources` field can either come from a single `source` in the manifest (which must be a string)
639 or `sources` (which must be a list of strings). The parser also ensures that only one of `source` or `sources`
640 is used to ensure the input is not ambiguous. For the `into` parameter, the parser will accept it being a str
641 or a list of strings. Regardless of how the input was provided, the parser will normalize the input so that
642 both `sources` and `into` in the result is a list of strings. As an example, this parser can accept
643 both the previous input but also the following input:
645 ```yaml
646 install-docs:
647 source: "docs/*"
648 into: "my-pkg"
649 ```
651 The `source` and `into` attributes are then normalized to lists as if the user had written them as lists
652 with a single string in them. As noted above, the name of the `source` attribute will also be normalized
653 while parsing.
655 In the cases where only one field is required by the user, it can sometimes make sense to allow a non-dict
656 as part of the input. Example:
658 >>> class DiscardRule(DebputyParsedContent):
659 ... paths: List[str]
660 >>> class DiscardRuleInputDictFormat(TypedDict):
661 ... path: NotRequired[Annotated[str, DebputyParseHint.target_attribute("paths")]]
662 ... paths: NotRequired[List[str]]
663 >>> # This format relies on DiscardRule having exactly one Required attribute
664 >>> DiscardRuleInputWithAltFormat = Union[
665 ... DiscardRuleInputDictFormat,
666 ... str,
667 ... List[str],
668 ... ]
669 >>> pg = ParserGenerator()
670 >>> flexible_parser = pg.generate_parser(
671 ... DiscardRule,
672 ... source_content=DiscardRuleInputWithAltFormat,
673 ... )
676 Supported types:
677 * `List` - must have a fixed type argument (such as `List[str]`)
678 * `str`
679 * `int`
680 * `BinaryPackage` - When provided (or required), the user must provide a package name listed
681 in the debian/control file. The code receives the BinaryPackage instance
682 matching that input.
683 * `FileSystemMode` - When provided (or required), the user must provide a file system mode in any
684 format that `debputy' provides (such as `0644` or `a=rw,go=rw`).
685 * `FileSystemOwner` - When provided (or required), the user must a file system owner that is
686 available statically on all Debian systems (must be in `base-passwd`).
687 The user has multiple options for how to specify it (either via name or id).
688 * `FileSystemGroup` - When provided (or required), the user must a file system group that is
689 available statically on all Debian systems (must be in `base-passwd`).
690 The user has multiple options for how to specify it (either via name or id).
691 * `ManifestCondition` - When provided (or required), the user must specify a conditional rule to apply.
692 Usually, it is better to extend `DebputyParsedContentStandardConditional`, which
693 provides the `debputy' default `when` parameter for conditionals.
695 Supported special type-like parameters:
697 * `Required` / `NotRequired` to mark a field as `Required` or `NotRequired`. Must be provided at the
698 outermost level. Cannot vary between `parsed_content` and `source_content`.
699 * `Annotated`. Accepted at the outermost level (inside Required/NotRequired) but ignored at the moment.
700 * `Union`. Must be the outermost level (inside `Annotated` or/and `Required`/`NotRequired` if these are present).
701 Automapping (see below) is restricted to two members in the Union.
703 Notable non-supported types:
704 * `Mapping` and all variants therefore (such as `dict`). In the future, nested `TypedDict`s may be allowed.
705 * `Optional` (or `Union[..., None]`): Use `NotRequired` for optional fields.
707 Automatic mapping rules from `source_content` to `parsed_content`:
708 - `Union[T, List[T]]` can be narrowed automatically to `List[T]`. Transformation is basically:
709 `lambda value: value if isinstance(value, list) else [value]`
710 - `T` can be mapped automatically to `List[T]`, Transformation being: `lambda value: [value]`
712 Additionally, types can be annotated (`Annotated[str, ...]`) with `DebputyParseHint`s. Check its classmethod
713 for concrete features that may be useful to you.
715 :param parsed_content: A DebputyParsedContent / TypedDict describing the desired model of the input once parsed.
716 (DebputyParsedContent is a TypedDict subclass that work around some inadequate type checkers).
717 It can also be a `List[DebputyParsedContent]`. In that case, `source_content` must be a
718 `List[TypedDict[...]]`.
719 :param source_content: Optionally, a TypedDict describing the input allowed by the user. This can be useful
720 to describe more variations than in `parsed_content` that the parser will normalize for you. If omitted,
721 the parsed_content is also considered the source_content (which affects what annotations are allowed in it).
722 Note you should never pass the parsed_content as source_content directly.
723 :param allow_optional: In rare cases, you want to support explicitly provided vs. optional. In this case, you
724 should set this to True. Though, in 99.9% of all cases, you want `NotRequired` rather than `Optional` (and
725 can keep this False).
726 :param inline_reference_documentation: Optionally, programmatic documentation
727 :param expected_debputy_integration_mode: If provided, this declares the integration modes where the
728 result of the parser can be used. This is primarily useful for "fail-fast" on incorrect usage.
729 When the restriction is not satisfiable, the generated parser will trigger a parse error immediately
730 (resulting in a "compile time" failure rather than a "runtime" failure).
731 :return: An input parser capable of reading input matching the TypedDict(s) used as reference.
732 """
733 orig_parsed_content = parsed_content
734 if source_content is parsed_content: 734 ↛ 735line 734 didn't jump to line 735 because the condition on line 734 was never true
735 raise ValueError(
736 "Do not provide source_content if it is the same as parsed_content"
737 )
738 is_list_wrapped = False
739 if get_origin(orig_parsed_content) == list:
740 parsed_content = get_args(orig_parsed_content)[0]
741 is_list_wrapped = True
743 if isinstance(parsed_content, type) and issubclass(
744 parsed_content, DebputyDispatchableType
745 ):
746 parser = self.dispatch_parser_table_for(parsed_content)
747 if parser is None: 747 ↛ 748line 747 didn't jump to line 748 because the condition on line 747 was never true
748 raise ValueError(
749 f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}."
750 f" The class {parsed_content.__qualname__} is not a pre-registered type."
751 )
752 # FIXME: Only the list wrapped version has documentation.
753 if is_list_wrapped: 753 ↛ 759line 753 didn't jump to line 759 because the condition on line 753 was always true
754 parser = ListWrappedDeclarativeInputParser(
755 parser,
756 inline_reference_documentation=inline_reference_documentation,
757 expected_debputy_integration_mode=expected_debputy_integration_mode,
758 )
759 return parser
761 if not is_typeddict(parsed_content): 761 ↛ 762line 761 didn't jump to line 762 because the condition on line 761 was never true
762 raise ValueError(
763 f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}."
764 ' Only "TypedDict"-based types and a subset of "DebputyDispatchableType" are supported.'
765 )
766 if is_list_wrapped and source_content is not None:
767 if get_origin(source_content) != list: 767 ↛ 768line 767 didn't jump to line 768 because the condition on line 767 was never true
768 raise ValueError(
769 "If the parsed_content is a List type, then source_format must be a List type as well."
770 )
771 source_content = get_args(source_content)[0]
773 target_attributes = self._parse_types(
774 parsed_content,
775 allow_source_attribute_annotations=source_content is None,
776 forbid_optional=not allow_optional,
777 )
778 required_target_parameters = frozenset(parsed_content.__required_keys__)
779 parsed_alt_form = None
780 non_mapping_source_only = False
782 if source_content is not None:
783 default_target_attribute = None
784 if len(required_target_parameters) == 1:
785 default_target_attribute = next(iter(required_target_parameters))
787 source_typed_dict, alt_source_forms = _extract_typed_dict(
788 source_content,
789 default_target_attribute,
790 )
791 if alt_source_forms:
792 parsed_alt_form = self._parse_alt_form(
793 alt_source_forms,
794 default_target_attribute,
795 )
796 if source_typed_dict is not None:
797 source_content_attributes = self._parse_types(
798 source_typed_dict,
799 allow_target_attribute_annotation=True,
800 allow_source_attribute_annotations=True,
801 forbid_optional=not allow_optional,
802 )
803 source_content_parameter = "source_content"
804 source_and_parsed_differs = True
805 else:
806 source_typed_dict = parsed_content
807 source_content_attributes = target_attributes
808 source_content_parameter = "parsed_content"
809 source_and_parsed_differs = True
810 non_mapping_source_only = True
811 else:
812 source_typed_dict = parsed_content
813 source_content_attributes = target_attributes
814 source_content_parameter = "parsed_content"
815 source_and_parsed_differs = False
817 sources = collections.defaultdict(set)
818 seen_targets = set()
819 seen_source_names: dict[str, str] = {}
820 source_attributes: dict[str, AttributeDescription] = {}
821 path_hint_source_attributes = []
823 for k in source_content_attributes:
824 ia = source_content_attributes[k]
826 ta = (
827 target_attributes.get(ia.target_attribute)
828 if source_and_parsed_differs
829 else ia
830 )
831 if ta is None: 831 ↛ 833line 831 didn't jump to line 833 because the condition on line 831 was never true
832 # Error message would be wrong if this assertion is false.
833 assert source_and_parsed_differs
834 raise ValueError(
835 f'The attribute "{k}" from the "source_content" parameter should have mapped'
836 f' to "{ia.target_attribute}", but that parameter does not exist in "parsed_content"'
837 )
838 if _is_path_attribute_candidate(ia, ta):
839 path_hint_source_attributes.append(ia.source_attribute_name)
840 existing_source_name = seen_source_names.get(ia.source_attribute_name)
841 if existing_source_name: 841 ↛ 842line 841 didn't jump to line 842 because the condition on line 841 was never true
842 raise ValueError(
843 f'The attribute "{k}" and "{existing_source_name}" both share the source name'
844 f' "{ia.source_attribute_name}". Please change the {source_content_parameter} parameter,'
845 f' so only one attribute use "{ia.source_attribute_name}".'
846 )
847 seen_source_names[ia.source_attribute_name] = k
848 seen_targets.add(ta.target_attribute)
849 sources[ia.target_attribute].add(k)
850 if source_and_parsed_differs:
851 bridge_mapper = self._type_normalize(
852 k, ia.attribute_type, ta.attribute_type, False
853 )
854 ia.type_validator = ia.type_validator.combine_mapper(bridge_mapper)
855 source_attributes[k] = ia
857 def _as_attr_names(td_name: Iterable[str]) -> frozenset[str]:
858 return frozenset(
859 source_content_attributes[a].source_attribute_name for a in td_name
860 )
862 _check_attributes(
863 parsed_content,
864 source_typed_dict,
865 source_content_attributes,
866 sources,
867 )
869 at_least_one_of = frozenset(
870 _as_attr_names(g)
871 for k, g in sources.items()
872 if len(g) > 1 and k in required_target_parameters
873 )
875 if source_and_parsed_differs and seen_targets != target_attributes.keys(): 875 ↛ 876line 875 didn't jump to line 876 because the condition on line 875 was never true
876 missing = ", ".join(
877 repr(k) for k in (target_attributes.keys() - seen_targets)
878 )
879 raise ValueError(
880 'The following attributes in "parsed_content" did not have a source field in "source_content":'
881 f" {missing}"
882 )
883 all_mutually_exclusive_fields = frozenset(
884 _as_attr_names(g) for g in sources.values() if len(g) > 1
885 )
887 all_parameters = (
888 source_typed_dict.__required_keys__ | source_typed_dict.__optional_keys__
889 )
890 _check_conflicts(
891 source_content_attributes,
892 source_typed_dict.__required_keys__,
893 all_parameters,
894 )
896 manifest_attributes = {
897 a.source_attribute_name: a for a in source_content_attributes.values()
898 }
900 if parsed_alt_form is not None:
901 target_attribute = parsed_alt_form.target_attribute
902 if ( 902 ↛ 907line 902 didn't jump to line 907 because the condition on line 902 was never true
903 target_attribute not in required_target_parameters
904 and required_target_parameters
905 or len(required_target_parameters) > 1
906 ):
907 raise NotImplementedError(
908 "When using alternative source formats (Union[TypedDict, ...]), then the"
909 " target must have at most one require parameter"
910 )
911 bridge_mapper = self._type_normalize(
912 target_attribute,
913 parsed_alt_form.attribute_type,
914 target_attributes[target_attribute].attribute_type,
915 False,
916 )
917 parsed_alt_form.type_validator = (
918 parsed_alt_form.type_validator.combine_mapper(bridge_mapper)
919 )
921 inline_reference_documentation = (
922 _verify_and_auto_correct_inline_reference_documentation(
923 parsed_content,
924 source_typed_dict,
925 source_content_attributes,
926 inline_reference_documentation,
927 parsed_alt_form is not None,
928 automatic_docs,
929 )
930 )
931 if non_mapping_source_only:
932 parser = DeclarativeNonMappingInputParser(
933 assume_not_none(parsed_alt_form),
934 inline_reference_documentation=inline_reference_documentation,
935 expected_debputy_integration_mode=expected_debputy_integration_mode,
936 )
937 else:
938 parser = DeclarativeMappingInputParser(
939 _as_attr_names(source_typed_dict.__required_keys__),
940 _as_attr_names(all_parameters),
941 manifest_attributes,
942 source_attributes,
943 mutually_exclusive_attributes=all_mutually_exclusive_fields,
944 alt_form_parser=parsed_alt_form,
945 at_least_one_of=at_least_one_of,
946 inline_reference_documentation=inline_reference_documentation,
947 path_hint_source_attributes=tuple(path_hint_source_attributes),
948 expected_debputy_integration_mode=expected_debputy_integration_mode,
949 )
950 if is_list_wrapped:
951 parser = ListWrappedDeclarativeInputParser(
952 parser,
953 expected_debputy_integration_mode=expected_debputy_integration_mode,
954 )
955 return parser
    def _as_type_validator(
        self,
        attribute: str,
        provided_type: Any,
        parsing_typed_dict_attribute: bool,
    ) -> AttributeTypeHandler:
        """Build the runtime validator (plus mapper) for a declared attribute type.

        Recursively decomposes *provided_type* — the annotation of *attribute* —
        into an ``AttributeTypeHandler`` that can validate a raw manifest value
        and, where a registered ``TypeMapping`` applies, map it into the declared
        target type.

        :param attribute: Attribute name; used only for error messages.
        :param provided_type: The declared (possibly mapped) type annotation.
        :param parsing_typed_dict_attribute: Whether the annotation came from a
          TypedDict attribute (forwarded to ``unpack_type``, which treats
          ``NotRequired``/``Optional`` unwrapping differently in that case).
        :return: Handler with a description, an ``ensure_type`` validator, an
          optional ``base_type`` and an optional value mapper.
        :raises ValueError: If the type construct is not supported.
        """
        assert not isinstance(provided_type, tuple)

        # Dispatchable types have their own registered parser table; delegate.
        if isinstance(provided_type, type) and issubclass(
            provided_type, DebputyDispatchableType
        ):
            return _dispatch_parser(provided_type)

        # "unmapped" is the source-side view: registered TypeMappings stripped,
        # i.e. the type the raw manifest value must have *before* mapping.
        unmapped_type = self._strip_mapped_types(
            provided_type,
            parsing_typed_dict_attribute,
        )
        type_normalizer = self._type_normalize(
            attribute,
            unmapped_type,
            provided_type,
            parsing_typed_dict_attribute,
        )
        t_unmapped, t_unmapped_orig, t_unmapped_args = unpack_type(
            unmapped_type,
            parsing_typed_dict_attribute,
        )
        _, t_provided_orig, t_provided_args = unpack_type(
            provided_type,
            parsing_typed_dict_attribute,
        )

        # Optional[X] (exactly Union[X, None]): validate as X but accept None.
        if (
            t_unmapped_orig == Union
            and t_unmapped_args
            and len(t_unmapped_args) == 2
            and any(v is _NONE_TYPE for v in t_unmapped_args)
        ):
            _, _, args = unpack_type(provided_type, parsing_typed_dict_attribute)
            actual_type = [a for a in args if a is not _NONE_TYPE][0]
            validator = self._as_type_validator(
                attribute, actual_type, parsing_typed_dict_attribute
            )

            def _validator(v: Any, path: AttributePath) -> None:
                if v is None:
                    return
                validator.ensure_type(v, path)

            return AttributeTypeHandler(
                validator.describe_type(),
                _validator,
                base_type=validator.base_type,
                mapper=type_normalizer,
            )

        # Simple scalars (str/int/bool/...) with a human-readable name.
        if unmapped_type in BASIC_SIMPLE_TYPES:
            type_name = BASIC_SIMPLE_TYPES[unmapped_type]

            type_mapping = self._registered_types.get(provided_type)
            if type_mapping is not None:
                # A registered mapping applies: describe the *target* type and
                # mention the raw manifest (source) type in parentheses.
                simple_type = f" ({type_name})"
                type_name = type_mapping.target_type.__name__
            else:
                simple_type = ""

            def _validator(v: Any, path: AttributePath) -> None:
                if not isinstance(v, unmapped_type):
                    _validation_type_error(
                        path, f"The attribute must be a {type_name}{simple_type}"
                    )

            return AttributeTypeHandler(
                type_name,
                _validator,
                base_type=unmapped_type,
                mapper=type_normalizer,
            )
        # List[X]: validate each element via the handler for X.
        if t_unmapped_orig == list:
            if not t_unmapped_args:
                raise ValueError(
                    f'The attribute "{attribute}" is List but does not have Generics (Must use List[X])'
                )

            # NOTE(review): "genetic" looks like a typo for "generic"; kept as-is
            # since this is a local name only.
            genetic_type = t_unmapped_args[0]
            key_mapper = self._as_type_validator(
                attribute,
                genetic_type,
                parsing_typed_dict_attribute,
            )

            def _validator(v: Any, path: AttributePath) -> None:
                if not isinstance(v, list):
                    _validation_type_error(path, "The attribute must be a list")
                for i, list_item in enumerate(v):
                    key_mapper.ensure_type(list_item, path[i])

            # Element-wise mapper; only needed when the element type itself maps.
            list_mapper = (
                map_each_element(key_mapper.mapper)
                if key_mapper.mapper is not None
                else None
            )

            return AttributeTypeHandler(
                f"List of {key_mapper.describe_type()}",
                _validator,
                base_type=list,
                mapper=type_normalizer,
            ).combine_mapper(list_mapper)
        # Nested TypedDict: handled by a dedicated sub-parser.  Validation happens
        # inside the sub-parser's parse_input, hence the no-op ensure_type here.
        if is_typeddict(provided_type):
            subparser = self.generate_parser(cast("Type[TD]", provided_type))
            return AttributeTypeHandler(
                description=f"{provided_type.__name__} (Typed Mapping)",
                ensure_type=lambda v, ap: None,
                base_type=dict,
                mapper=lambda v, ap, cv: subparser.parse_input(
                    v, ap, parser_context=cv
                ),
            )
        # Dict[str, Y]: only `str` keys and mapper-free values are supported.
        if t_unmapped_orig == dict:
            if not t_unmapped_args or len(t_unmapped_args) != 2:
                raise ValueError(
                    f'The attribute "{attribute}" is Dict but does not have Generics (Must use Dict[str, Y])'
                )
            if t_unmapped_args[0] != str:
                raise ValueError(
                    f'The attribute "{attribute}" is Dict and has a non-str type as key.'
                    " Currently, only `str` is supported (Dict[str, Y])"
                )
            key_mapper = self._as_type_validator(
                attribute,
                t_unmapped_args[0],
                parsing_typed_dict_attribute,
            )
            value_mapper = self._as_type_validator(
                attribute,
                t_unmapped_args[1],
                parsing_typed_dict_attribute,
            )

            if key_mapper.base_type is None:
                raise ValueError(
                    f'The attribute "{attribute}" is Dict and the key did not have a trivial base type. Key types'
                    f" without trivial base types (such as `str`) are not supported at the moment."
                )

            if value_mapper.mapper is not None:
                raise ValueError(
                    f'The attribute "{attribute}" is Dict and the value requires mapping.'
                    " Currently, this is not supported. Consider a simpler type (such as Dict[str, str] or Dict[str, Any])."
                    " Better typing may come later"
                )

            def _validator(v: Any, path: AttributePath) -> None:
                if not isinstance(v, dict):
                    _validation_type_error(path, "The attribute must be a mapping")
                # Track a hint so a bad key can be located even though the key
                # itself cannot be used in the path (it is the invalid part).
                key_name = "the first key in the mapping"
                for i, (k, value) in enumerate(v.items()):
                    if not key_mapper.base_type_match(k):
                        kp = path.copy_with_path_hint(key_name)
                        _validation_type_error(
                            kp,
                            f'The key number {i + 1} in attribute "{kp}" must be a {key_mapper.describe_type()}',
                        )
                    key_name = f"the key after {k}"
                    value_mapper.ensure_type(value, path[k])

            return AttributeTypeHandler(
                f"Mapping of {value_mapper.describe_type()}",
                _validator,
                base_type=dict,
                mapper=type_normalizer,
            ).combine_mapper(key_mapper.mapper)
        # Unions: either the special-cased Union[X, List[X]] shape or a generic
        # "first checker that accepts wins" union (restricted to a shared mapper).
        if t_unmapped_orig in (Union, UnionType):
            if _is_two_arg_x_list_x(t_provided_args):
                # Force the order to be "X, List[X]" as it simplifies the code
                x_list_x = (
                    t_provided_args
                    if get_origin(t_provided_args[1]) == list
                    else (t_provided_args[1], t_provided_args[0])
                )

                # X, List[X] could match if X was List[Y]. However, our code below assumes
                # that X is a non-list. The `_is_two_arg_x_list_x` returns False for this
                # case to avoid this assert and fall into the "generic case".
                assert get_origin(x_list_x[0]) != list
                x_subtype_checker = self._as_type_validator(
                    attribute,
                    x_list_x[0],
                    parsing_typed_dict_attribute,
                )
                list_x_subtype_checker = self._as_type_validator(
                    attribute,
                    x_list_x[1],
                    parsing_typed_dict_attribute,
                )
                type_description = x_subtype_checker.describe_type()
                type_description = f"{type_description} or a list of {type_description}"

                def _validator(v: Any, path: AttributePath) -> None:
                    if isinstance(v, list):
                        list_x_subtype_checker.ensure_type(v, path)
                    else:
                        x_subtype_checker.ensure_type(v, path)

                return AttributeTypeHandler(
                    type_description,
                    _validator,
                    mapper=type_normalizer,
                )
            else:
                subtype_checker = [
                    self._as_type_validator(attribute, a, parsing_typed_dict_attribute)
                    for a in t_unmapped_args
                ]
                type_description = "one-of: " + ", ".join(
                    f"{sc.describe_type()}" for sc in subtype_checker
                )
                # All members must agree on the mapper; otherwise we cannot know
                # which normalization to apply after validation succeeds.
                mapper = subtype_checker[0].mapper
                if any(mapper != sc.mapper for sc in subtype_checker):
                    raise ValueError(
                        f'Cannot handle the union "{provided_type}" as the target types need different'
                        " type normalization/mapping logic. Unions are generally limited to Union[X, List[X]]"
                        " where X is a non-collection type."
                    )

                def _validator(v: Any, path: AttributePath) -> None:
                    # A "partial match" is a member whose base type matched but whose
                    # detailed validation failed; a single partial match gives a far
                    # better error than the generic one below.
                    partial_matches = []
                    for sc in subtype_checker:
                        try:
                            sc.ensure_type(v, path)
                            return
                        except ManifestParseException as e:
                            if sc.base_type_match(v):
                                partial_matches.append((sc, e))

                    if len(partial_matches) == 1:
                        raise partial_matches[0][1]
                    _validation_type_error(
                        path, f"Could not match against: {type_description}"
                    )

                return AttributeTypeHandler(
                    type_description,
                    _validator,
                    mapper=type_normalizer,
                )
        if t_unmapped_orig == Literal:
            # We want "x" for string values; repr provides 'x'
            pretty = ", ".join(
                f"`{v}`" if isinstance(v, str) else str(v) for v in t_unmapped_args
            )

            def _validator(v: Any, path: AttributePath) -> None:
                if v not in t_unmapped_args:
                    value_hint = ""
                    if isinstance(v, str):
                        value_hint = f"({v}) "
                    _validation_type_error(
                        path,
                        f"Value {value_hint}must be one of the following literal values: {pretty}",
                    )

            return AttributeTypeHandler(
                f"One of the following literal values: {pretty}",
                _validator,
            )

        # `Any` is explicitly unvalidated (accept everything, no mapper).
        if provided_type == Any:
            return AttributeTypeHandler(
                "any (unvalidated)",
                lambda *a: None,
            )
        raise ValueError(
            f'The attribute "{attribute}" had/contained a type {provided_type}, which is not supported'
        )
1234 def _parse_types(
1235 self,
1236 spec: type[TypedDict],
1237 allow_target_attribute_annotation: bool = False,
1238 allow_source_attribute_annotations: bool = False,
1239 forbid_optional: bool = True,
1240 ) -> dict[str, AttributeDescription]:
1241 annotations = get_type_hints(spec, include_extras=True)
1242 return {
1243 k: self._attribute_description(
1244 k,
1245 t,
1246 k in spec.__required_keys__,
1247 allow_target_attribute_annotation=allow_target_attribute_annotation,
1248 allow_source_attribute_annotations=allow_source_attribute_annotations,
1249 forbid_optional=forbid_optional,
1250 )
1251 for k, t in annotations.items()
1252 }
1254 def _attribute_description(
1255 self,
1256 attribute: str,
1257 orig_td: Any,
1258 is_required: bool,
1259 forbid_optional: bool = True,
1260 allow_target_attribute_annotation: bool = False,
1261 allow_source_attribute_annotations: bool = False,
1262 ) -> AttributeDescription:
1263 td, anno, is_optional = _parse_type(
1264 attribute, orig_td, forbid_optional=forbid_optional
1265 )
1266 type_validator = self._as_type_validator(attribute, td, True)
1267 parsed_annotations = DetectedDebputyParseHint.parse_annotations(
1268 anno,
1269 f' Seen with attribute "{attribute}".',
1270 attribute,
1271 is_required,
1272 allow_target_attribute_annotation=allow_target_attribute_annotation,
1273 allow_source_attribute_annotations=allow_source_attribute_annotations,
1274 )
1275 return AttributeDescription(
1276 target_attribute=parsed_annotations.target_attribute,
1277 attribute_type=td,
1278 type_validator=type_validator,
1279 annotations=anno,
1280 is_optional=is_optional,
1281 conflicting_attributes=parsed_annotations.conflict_with_source_attributes,
1282 conditional_required=parsed_annotations.conditional_required,
1283 source_attribute_name=assume_not_none(
1284 parsed_annotations.source_manifest_attribute
1285 ),
1286 parse_hints=parsed_annotations,
1287 )
1289 def _parse_alt_form(
1290 self,
1291 alt_form,
1292 default_target_attribute: str | None,
1293 ) -> AttributeDescription:
1294 td, anno, is_optional = _parse_type(
1295 "source_format alternative form",
1296 alt_form,
1297 forbid_optional=True,
1298 parsing_typed_dict_attribute=False,
1299 )
1300 type_validator = self._as_type_validator(
1301 "source_format alternative form",
1302 td,
1303 True,
1304 )
1305 parsed_annotations = DetectedDebputyParseHint.parse_annotations(
1306 anno,
1307 " The alternative for source_format.",
1308 None,
1309 False,
1310 default_target_attribute=default_target_attribute,
1311 allow_target_attribute_annotation=True,
1312 allow_source_attribute_annotations=False,
1313 )
1314 return AttributeDescription(
1315 target_attribute=parsed_annotations.target_attribute,
1316 attribute_type=td,
1317 type_validator=type_validator,
1318 annotations=anno,
1319 is_optional=is_optional,
1320 conflicting_attributes=parsed_annotations.conflict_with_source_attributes,
1321 conditional_required=parsed_annotations.conditional_required,
1322 source_attribute_name="Alt form of the source_format",
1323 )
    def _union_narrowing(
        self,
        input_type: Any,
        target_type: Any,
        parsing_typed_dict_attribute: bool,
    ) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any] | None:
        """Derive a mapper that narrows a Union input type into the target type.

        :param input_type: The source-side type; must be a ``Union[...]``.
        :param target_type: The desired target type (a Union or a list type).
        :param parsing_typed_dict_attribute: Forwarded to ``unpack_type``.
        :return: A mapper callable, or ``None`` when no narrowing is supported
          or needed (callers treat ``None`` as "fall back / identity").
        :raises ValueError: If *input_type* is not a Union, or the narrowing would
          require mixing types that cannot be told apart at runtime.
        """
        _, input_orig, input_args = unpack_type(
            input_type, parsing_typed_dict_attribute
        )
        _, target_orig, target_args = unpack_type(
            target_type, parsing_typed_dict_attribute
        )

        if input_orig not in (Union, UnionType) or not input_args:
            raise ValueError("input_type must be a Union[...] with non-empty args")

        # Currently, we only support Union[X, List[X]] -> List[Y] narrowing or Union[X, List[X]] -> Union[Y, Union[Y]]
        # - Where X = Y or there is a simple standard transformation from X to Y.

        if target_orig not in (Union, UnionType, list) or not target_args:
            # Not supported
            return None

        if target_orig in (Union, UnionType) and set(input_args) == set(target_args):
            # Not needed (identity mapping)
            return None

        if target_orig == list and not any(get_origin(a) == list for a in input_args):
            # Not supported
            return None

        target_arg = target_args[0]
        simplified_type = self._strip_mapped_types(
            target_arg, parsing_typed_dict_attribute
        )
        # Every Union member must be the target type, its mapped source type, or a
        # list of either (both `list[...]` and `typing.List[...]` spellings).
        acceptable_types = {
            target_arg,
            list[target_arg],  # type: ignore
            List[target_arg],  # type: ignore
            simplified_type,
            list[simplified_type],  # type: ignore
            List[simplified_type],  # type: ignore
        }
        target_format = (
            target_arg,
            list[target_arg],  # type: ignore
            List[target_arg],  # type: ignore
        )
        in_target_format = 0
        in_simple_format = 0
        for input_arg in input_args:
            if input_arg not in acceptable_types:
                # Not supported
                return None
            if input_arg in target_format:
                in_target_format += 1
            else:
                in_simple_format += 1

        assert in_simple_format or in_target_format

        if in_target_format and not in_simple_format:
            # Union[X, List[X]] -> List[X]
            return normalize_into_list
        mapped = self._registered_types[target_arg]
        if not in_target_format and in_simple_format:
            # Union[X, List[X]] -> List[Y]: normalize to a list, then map each
            # element from source to target.

            def _mapper_x_list_y(
                x: Any | list[Any],
                ap: AttributePath,
                pc: Optional["ParserContextData"],
            ) -> list[Any]:
                in_list_form: list[Any] = normalize_into_list(x, ap, pc)

                return [mapped.mapper(x, ap, pc) for x in in_list_form]

            return _mapper_x_list_y

        # Union[Y, List[X]] -> List[Y]
        # Mixed members require an isinstance test per element, which only works
        # for a concrete class target.
        # NOTE(review): the error text "(but both a mix of both)" reads oddly;
        # likely meant "(but not a mix of both)" — left unchanged here.
        if not isinstance(target_arg, type):
            raise ValueError(
                f"Cannot narrow {input_type} -> {target_type}: The automatic conversion does"
                f" not support mixed types. Please use either {simplified_type} or {target_arg}"
                f" in the source content (but both a mix of both)"
            )

        def _mapper_mixed_list_y(
            x: Any | list[Any],
            ap: AttributePath,
            pc: Optional["ParserContextData"],
        ) -> list[Any]:
            in_list_form: list[Any] = normalize_into_list(x, ap, pc)

            return [
                x if isinstance(x, target_arg) else mapped.mapper(x, ap, pc)
                for x in in_list_form
            ]

        return _mapper_mixed_list_y
    def _type_normalize(
        self,
        attribute: str,
        input_type: Any,
        target_type: Any,
        parsing_typed_dict_attribute: bool,
    ) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any] | None:
        """Resolve the mapper that converts *input_type* values to *target_type*.

        Supported conversions (tried in order): identity (returns ``None``),
        Union narrowing, ``X -> List[X]`` wrapping, registered ``TypeMapping``
        ``Source -> Target``, ``Source -> List[Target]`` and
        ``List[Source] -> List[Target]``.

        :param attribute: Attribute name; used only in the error message.
        :return: The mapper, or ``None`` when input and target types are equal
          (no mapping needed).
        :raises ValueError: If no supported conversion exists.
        """
        if input_type == target_type:
            return None
        _, input_orig, input_args = unpack_type(
            input_type, parsing_typed_dict_attribute
        )
        _, target_orig, target_args = unpack_type(
            target_type,
            parsing_typed_dict_attribute,
        )
        if input_orig in (Union, UnionType):
            result = self._union_narrowing(
                input_type, target_type, parsing_typed_dict_attribute
            )
            # A falsy result means "no narrowing available"; fall through to the
            # registered-type mappings below.
            if result:
                return result
        elif target_orig == list and target_args[0] == input_type:
            # X -> List[X]
            return wrap_into_list

        mapped = self._registered_types.get(target_type)
        if mapped is not None and input_type == mapped.source_type:
            # Source -> Target
            return mapped.mapper
        if target_orig == list and target_args:
            mapped = self._registered_types.get(target_args[0])
            if mapped is not None:
                # mypy is dense and forgot `mapped` cannot be optional in the comprehensions.
                mapped_type: TypeMapping = mapped
                if input_type == mapped.source_type:
                    # Source -> List[Target]
                    return lambda x, ap, pc: [mapped_type.mapper(x, ap, pc)]
                if (
                    input_orig == list
                    and input_args
                    and input_args[0] == mapped_type.source_type
                ):
                    # List[Source] -> List[Target]
                    return lambda xs, ap, pc: [
                        mapped_type.mapper(x, ap, pc) for x in xs
                    ]
        raise ValueError(
            f'Unsupported type normalization for "{attribute}": Cannot automatically map/narrow'
            f" {input_type} to {target_type}"
        )
1478 def _strip_mapped_types(
1479 self, orig_td: Any, parsing_typed_dict_attribute: bool
1480 ) -> Any:
1481 m = self._registered_types.get(orig_td)
1482 if m is not None:
1483 return m.source_type
1484 _, v, args = unpack_type(orig_td, parsing_typed_dict_attribute)
1485 if v == list:
1486 arg = args[0]
1487 m = self._registered_types.get(arg)
1488 if m:
1489 return list[m.source_type] # type: ignore
1490 if v in (Union, UnionType):
1491 stripped_args = tuple(
1492 self._strip_mapped_types(x, parsing_typed_dict_attribute) for x in args
1493 )
1494 if stripped_args != args:
1495 return Union[stripped_args]
1496 return orig_td
def _sort_key(attr: StandardParserAttributeDocumentation) -> Any:
    """Sort key for standard docs: sort category first, then first attribute name."""
    first_attribute = next(iter(attr.attributes))
    return attr.sort_category, first_attribute
def _apply_std_docs(
    std_doc_table: (
        Mapping[type[Any], Sequence[StandardParserAttributeDocumentation]] | None
    ),
    source_format_typed_dict: type[Any],
    attribute_docs: Sequence[ParserAttributeDocumentation] | None,
) -> Sequence[ParserAttributeDocumentation] | None:
    """Supplement plugin-provided attribute docs with standard (table) docs.

    Walks the ``__orig_bases__`` hierarchy of *source_format_typed_dict* and
    collects standard docs registered for any base, skipping entries that would
    overlap attributes already documented.  Plugin-provided docs always come
    first; standard docs are appended sorted by ``_sort_key``.

    :return: The combined documentation, or *attribute_docs* unchanged when the
      table is empty/None or nothing new applied.
    """
    if not std_doc_table:
        # Covers both None and an empty table.
        return attribute_docs

    documented_attributes = set()
    if attribute_docs:
        for existing_doc in attribute_docs:
            documented_attributes.update(existing_doc.attributes)

    visited_bases = set()
    selected_std_docs = []

    pending_bases = set(getattr(source_format_typed_dict, "__orig_bases__", []))
    visited_bases.update(pending_bases)
    while pending_bases:
        base = pending_bases.pop()
        unseen_bases = {
            b for b in getattr(base, "__orig_bases__", []) if b not in visited_bases
        }
        pending_bases.update(unseen_bases)
        visited_bases.update(unseen_bases)
        candidate_docs = std_doc_table.get(base)
        if not candidate_docs:
            continue
        for candidate in candidate_docs:
            if any(a in documented_attributes for a in candidate.attributes):
                # If there is any overlap, do not add the docs
                continue
            documented_attributes.update(candidate.attributes)
            selected_std_docs.append(candidate)

    if not selected_std_docs:
        return attribute_docs
    merged_docs = sorted(selected_std_docs, key=_sort_key)
    if attribute_docs:
        # Plugin provided attributes first
        merged_docs = list(attribute_docs) + merged_docs
    return tuple(merged_docs)
def _verify_and_auto_correct_inline_reference_documentation(
    parsed_content: type[TD],
    source_typed_dict: type[Any],
    source_content_attributes: Mapping[str, AttributeDescription],
    inline_reference_documentation: ParserDocumentation | None,
    has_alt_form: bool,
    automatic_docs: (
        Mapping[type[Any], Sequence[StandardParserAttributeDocumentation]] | None
    ) = None,
) -> ParserDocumentation | None:
    """Validate the parser's inline documentation and auto-fill gaps.

    Merges standard docs (via ``_apply_std_docs``), verifies that every
    documented attribute actually exists and is documented at most once, marks
    left-over attributes as undocumented where permissible, and rejects
    alternative-form documentation when no alternative form exists.

    :param parsed_content: Target TypedDict; only used for error messages.
    :param source_typed_dict: Source format TypedDict (for standard doc lookup).
    :param source_content_attributes: Resolved source attributes to check against.
    :param inline_reference_documentation: Plugin-provided documentation, if any.
    :param has_alt_form: Whether the parser has a non-mapping source form.
    :param automatic_docs: Table of standard docs keyed by TypedDict base class.
    :return: The (possibly corrected) documentation, or ``None`` when there is
      nothing to document.
    :raises ValueError: On unknown/duplicate attribute docs, on undocumented
      attributes when custom docs are present, or on alt-form docs without an
      alt form.
    """
    orig_attribute_docs = (
        inline_reference_documentation.attribute_doc
        if inline_reference_documentation
        else None
    )
    attribute_docs = _apply_std_docs(
        automatic_docs,
        source_typed_dict,
        orig_attribute_docs,
    )
    if inline_reference_documentation is None and attribute_docs is None:
        return None
    # Accumulates replacement kwargs applied via `.replace(**changes)` at the end.
    changes = {}
    if attribute_docs:
        seen = set()
        had_any_custom_docs = False
        for attr_doc in attribute_docs:
            if not isinstance(attr_doc, StandardParserAttributeDocumentation):
                had_any_custom_docs = True
            for attr_name in attr_doc.attributes:
                attr = source_content_attributes.get(attr_name)
                if attr is None:
                    raise ValueError(
                        f"The inline_reference_documentation for the source format of {parsed_content.__qualname__}"
                        f' references an attribute "{attr_name}", which does not exist in the source format.'
                    )
                if attr_name in seen:
                    raise ValueError(
                        f"The inline_reference_documentation for the source format of {parsed_content.__qualname__}"
                        f' has documentation for "{attr_name}" twice, which is not supported.'
                        f" Please document it at most once"
                    )
                seen.add(attr_name)
        undocumented = source_content_attributes.keys() - seen
        if undocumented:
            if had_any_custom_docs:
                # With custom docs in play, omissions are likely mistakes; demand
                # an explicit undocumented_attr() instead of silently filling in.
                undocumented_attrs = ", ".join(undocumented)
                raise ValueError(
                    f"The following attributes were not documented for the source format of"
                    f" {parsed_content.__qualname__}. If this is deliberate, then please"
                    ' declare each them as undocumented (via undocumented_attr("foo")):'
                    f" {undocumented_attrs}"
                )
            combined_docs = list(attribute_docs)
            combined_docs.extend(undocumented_attr(a) for a in sorted(undocumented))
            attribute_docs = combined_docs

    if attribute_docs and orig_attribute_docs != attribute_docs:
        assert attribute_docs is not None
        changes["attribute_doc"] = tuple(attribute_docs)

    if (
        inline_reference_documentation is not None
        and inline_reference_documentation.alt_parser_description
        and not has_alt_form
    ):
        raise ValueError(
            "The inline_reference_documentation had documentation for an non-mapping format,"
            " but the source format does not have a non-mapping format."
        )
    if changes:
        if inline_reference_documentation is None:
            inline_reference_documentation = reference_documentation()
        return inline_reference_documentation.replace(**changes)
    return inline_reference_documentation
def _check_conflicts(
    input_content_attributes: dict[str, AttributeDescription],
    required_attributes: frozenset[str],
    all_attributes: frozenset[str],
) -> None:
    """Validate the declared attribute conflicts for consistency.

    Rejects conflicts that would make an attribute unusable (a required
    attribute with conflicts, or a conflict against a required attribute) and
    conflicts referencing attributes that were never declared.

    :raises ValueError: On any impossible or unknown conflict declaration.
    """
    for attr_name, attr in input_content_attributes.items():
        conflicts = attr.conflicting_attributes
        if attr_name in required_attributes and conflicts:
            # The conflicting attributes could never be used.
            conflict_names = ", ".join(repr(a) for a in conflicts)
            raise ValueError(
                f'The attribute "{attr_name}" is required and conflicts with the attributes: {conflict_names}.'
                " This makes it impossible to use these attributes. Either remove the attributes"
                f' (along with the conflicts for them), adjust the conflicts or make "{attr_name}"'
                " optional (NotRequired)"
            )
        required_conflicts = conflicts & required_attributes
        if required_conflicts:
            # This attribute could never be used.
            conflict_names = ", ".join(repr(a) for a in required_conflicts)
            raise ValueError(
                f'The attribute "{attr_name}" conflicts with the following *required* attributes: {conflict_names}.'
                f' This makes it impossible to use the "{attr_name}" attribute. Either remove it,'
                f" adjust the conflicts or make the listed attributes optional (NotRequired)"
            )
        unknown_attributes = conflicts - all_attributes
        if unknown_attributes:
            conflict_names = ", ".join(repr(a) for a in unknown_attributes)
            raise ValueError(
                f'The attribute "{attr_name}" declares a conflict with the following unknown attributes: {conflict_names}.'
                f" None of these attributes were declared in the input."
            )
def _check_attributes(
    content: type[TypedDict],
    input_content: type[TypedDict],
    input_content_attributes: dict[str, AttributeDescription],
    sources: Mapping[str, Collection[str]],
) -> None:
    """Cross-check required/optional consistency between source and target attributes.

    Ensures that a required source attribute is not one of several sources for
    the same target (the alternatives would be useless) and that a required
    target with a single source has that source marked required too.

    :param content: The target ("parsed_content") TypedDict.
    :param input_content: The source format TypedDict.
    :param input_content_attributes: Resolved source attribute descriptions.
    :param sources: For each target attribute, the source attributes feeding it.
    :raises ValueError: On an inconsistent required/optional declaration.
    """
    target_required_keys = content.__required_keys__
    input_required_keys = input_content.__required_keys__

    for input_name in input_required_keys | input_content.__optional_keys__:
        attr = input_content_attributes[input_name]
        target_name = attr.target_attribute
        source_names = sources[target_name]
        # Every target attribute must have at least one source by construction.
        assert source_names
        input_is_required = input_name in input_required_keys

        if input_is_required and len(source_names) > 1:
            raise ValueError(
                f'The source attribute "{input_name}" is required, but it maps to "{target_name}",'
                f' which has multiple sources "{source_names}". If "{input_name}" should be required,'
                f' then there is no need for additional sources for "{target_name}". Alternatively,'
                f' "{input_name}" might be missing a NotRequired type'
                f' (example: "{input_name}: NotRequired[<OriginalTypeHere>]")'
            )
        target_is_required = target_name in target_required_keys
        if not input_is_required and target_is_required and len(source_names) == 1:
            raise ValueError(
                f'The source attribute "{input_name}" is not marked as required and maps to'
                f' "{target_name}", which is marked as required. As there are no other attributes'
                f' mapping to "{target_name}", then "{input_name}" must be required as well'
                f' ("{input_name}: Required[<Type>]"). Alternatively, "{target_name}" should be optional'
                f' ("{target_name}: NotRequired[<Type>]") or an "MappingHint.aliasOf" might be missing.'
            )
def _validation_type_error(path: AttributePath, message: str) -> None:
    """Report a structural/type violation at *path* as a ManifestParseException."""
    full_message = (
        f'The attribute "{path.path}" did not have a valid structure/type: {message}'
    )
    raise ManifestParseException(full_message)
1703def _is_two_arg_x_list_x(t_args: tuple[Any, ...]) -> bool:
1704 if len(t_args) != 2:
1705 return False
1706 lhs, rhs = t_args
1707 if get_origin(lhs) == list:
1708 if get_origin(rhs) == list: 1708 ↛ 1711line 1708 didn't jump to line 1711 because the condition on line 1708 was never true
1709 # It could still match X, List[X] - but we do not allow this case for now as the caller
1710 # does not support it.
1711 return False
1712 l_args = get_args(lhs)
1713 return bool(l_args and l_args[0] == rhs)
1714 if get_origin(rhs) == list:
1715 r_args = get_args(rhs)
1716 return bool(r_args and r_args[0] == lhs)
1717 return False
1720def _extract_typed_dict(
1721 base_type,
1722 default_target_attribute: str | None,
1723) -> tuple[type[TypedDict] | None, Any]:
1724 if is_typeddict(base_type):
1725 return base_type, None
1726 _, origin, args = unpack_type(base_type, False)
1727 if origin != Union:
1728 if isinstance(base_type, type) and issubclass(base_type, (dict, Mapping)): 1728 ↛ 1729line 1728 didn't jump to line 1729 because the condition on line 1728 was never true
1729 raise ValueError(
1730 "The source_format cannot be nor contain a (non-TypedDict) dict"
1731 )
1732 return None, base_type
1733 typed_dicts = [x for x in args if is_typeddict(x)]
1734 if len(typed_dicts) > 1: 1734 ↛ 1735line 1734 didn't jump to line 1735 because the condition on line 1734 was never true
1735 raise ValueError(
1736 "When source_format is a Union, it must contain at most one TypedDict"
1737 )
1738 typed_dict = typed_dicts[0] if typed_dicts else None
1740 if any(x is None or x is _NONE_TYPE for x in args): 1740 ↛ 1741line 1740 didn't jump to line 1741 because the condition on line 1740 was never true
1741 raise ValueError(
1742 "The source_format cannot be nor contain Optional[X] or Union[X, None]"
1743 )
1745 if any( 1745 ↛ 1750line 1745 didn't jump to line 1750 because the condition on line 1745 was never true
1746 isinstance(x, type) and issubclass(x, (dict, Mapping))
1747 for x in args
1748 if x is not typed_dict
1749 ):
1750 raise ValueError(
1751 "The source_format cannot be nor contain a (non-TypedDict) dict"
1752 )
1753 remaining = [x for x in args if x is not typed_dict]
1754 has_target_attribute = False
1755 anno = None
1756 if len(remaining) == 1: 1756 ↛ 1757line 1756 didn't jump to line 1757 because the condition on line 1756 was never true
1757 base_type, anno, _ = _parse_type(
1758 "source_format alternative form",
1759 remaining[0],
1760 forbid_optional=True,
1761 parsing_typed_dict_attribute=False,
1762 )
1763 has_target_attribute = bool(anno) and any(
1764 isinstance(x, TargetAttribute) for x in anno
1765 )
1766 target_type = base_type
1767 else:
1768 target_type = Union[tuple(remaining)]
1770 if default_target_attribute is None and not has_target_attribute: 1770 ↛ 1771line 1770 didn't jump to line 1771 because the condition on line 1770 was never true
1771 raise ValueError(
1772 'The alternative format must be Union[TypedDict,Annotated[X, DebputyParseHint.target_attribute("...")]]'
1773 " OR the parsed_content format must have exactly one attribute that is required."
1774 )
1775 if anno: 1775 ↛ 1776line 1775 didn't jump to line 1776 because the condition on line 1775 was never true
1776 final_anno = [target_type]
1777 final_anno.extend(anno)
1778 return typed_dict, Annotated[tuple(final_anno)]
1779 return typed_dict, target_type
def _dispatch_parse_generator(
    dispatch_type: type[DebputyDispatchableType],
) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]:
    """Build a mapper that defers parsing to the dispatch table for *dispatch_type*.

    The returned callable looks up the dispatching parser on the (mandatory)
    parser context and hands the raw value over to it.
    """

    def _dispatch_parse(
        value: Any,
        attribute_path: AttributePath,
        parser_context: Optional["ParserContextData"],
    ):
        # The dispatch table lives on the parser context; it must be present here.
        assert parser_context is not None
        table = parser_context.dispatch_parser_table_for(dispatch_type)
        return table.parse_input(
            value, attribute_path, parser_context=parser_context
        )

    return _dispatch_parse
def _dispatch_parser(
    dispatch_type: type[DebputyDispatchableType],
) -> AttributeTypeHandler:
    """Wrap *dispatch_type* in an AttributeTypeHandler that parses via dispatch."""

    def _no_validation(*_args) -> None:
        # Validation happens inside the dispatched parser; nothing to do here.
        return None

    return AttributeTypeHandler(
        dispatch_type.__name__,
        _no_validation,
        mapper=_dispatch_parse_generator(dispatch_type),
    )
def _parse_type(
    attribute: str,
    orig_td: Any,
    forbid_optional: bool = True,
    parsing_typed_dict_attribute: bool = True,
) -> tuple[Any, tuple[Any, ...], bool]:
    """Resolve *orig_td* into ``(base type, annotation metadata, optional flag)``.

    Unwraps one level of ``Annotated[...]`` (collecting its metadata) and
    rejects ``None`` types, plus ``Optional`` members when *forbid_optional*
    is set.
    """
    td, v, args = unpack_type(orig_td, parsing_typed_dict_attribute)
    metadata: tuple[Any, ...] = tuple()
    optional = False
    if v is not None and v == Annotated:
        # Peel off the Annotated wrapper; keep its metadata for the caller.
        inner = get_args(td)
        metadata = inner[1:]
        td, v, args = unpack_type(inner[0], parsing_typed_dict_attribute)

    if td is _NONE_TYPE:
        raise ValueError(
            f'The attribute "{attribute}" resolved to type "None". "Nil" / "None" fields are not allowed in the'
            " debputy manifest, so this attribute does not make sense in its current form."
        )
    if forbid_optional and v == Union and any(a is _NONE_TYPE for a in args):
        raise ValueError(
            f'Detected use of Optional in "{attribute}", which is not allowed here.'
            " Please use NotRequired for optional fields"
        )

    return td, metadata, optional
1838def _normalize_attribute_name(attribute: str) -> str:
1839 if attribute.endswith("_"):
1840 attribute = attribute[:-1]
1841 return attribute.replace("_", "-")
@dataclasses.dataclass
class DetectedDebputyParseHint:
    """Aggregated result of scanning an attribute's annotation metadata for parse hints."""

    # Name of the attribute in the parsed (target) content.
    target_attribute: str
    # Name of the attribute in the manifest (source), if any.
    source_manifest_attribute: str | None
    # Source attributes that must not be used together with this one.
    conflict_with_source_attributes: frozenset[str]
    # Conditional requirement hint, if one was declared.
    conditional_required: ConditionalRequired | None
    # Whether this attribute may be used as a path hint.
    applicable_as_path_hint: bool

    @classmethod
    def parse_annotations(
        cls,
        anno: tuple[Any, ...],
        error_context: str,
        default_attribute_name: str | None,
        is_required: bool,
        default_target_attribute: str | None = None,
        allow_target_attribute_annotation: bool = False,
        allow_source_attribute_annotations: bool = False,
    ) -> "DetectedDebputyParseHint":
        """Extract all supported DebputyParseHint annotations from *anno*.

        :param anno: Annotation metadata (the extras of an ``Annotated[...]``).
        :param error_context: Extra context appended to error messages.
        :param default_attribute_name: Fallback for the target attribute name and
          (after normalization) the source manifest attribute name.
        :param is_required: Whether the attribute is unconditionally required.
        :param default_target_attribute: Fallback target attribute name, checked
          before *default_attribute_name*.
        :param allow_target_attribute_annotation: Permit the
          ``DebputyParseHint.target_attribute`` annotation in this context.
        :param allow_source_attribute_annotations: Permit source-attribute
          related annotations in this context.
        :raises ValueError: On disallowed or inconsistent annotation combinations.
        """
        target_attr_anno = find_annotation(anno, TargetAttribute)
        if target_attr_anno:
            if not allow_target_attribute_annotation:
                raise ValueError(
                    f"The DebputyParseHint.target_attribute annotation is not allowed in this context.{error_context}"
                )
            target_attribute = target_attr_anno.attribute
        elif default_target_attribute is not None:
            target_attribute = default_target_attribute
        elif default_attribute_name is not None:
            target_attribute = default_attribute_name
        else:
            # Reaching here implies default_attribute_name is None (the elif
            # above handled the not-None case), so this is an API misuse by
            # the caller rather than a user-facing manifest error.
            raise ValueError(
                "allow_target_attribute_annotation must be True OR "
                "default_attribute_name/default_target_attribute must be not None"
            )
        source_attribute_anno = find_annotation(anno, ManifestAttribute)
        _source_attribute_allowed(
            allow_source_attribute_annotations, error_context, source_attribute_anno
        )
        if source_attribute_anno:
            source_attribute_name = source_attribute_anno.attribute
        elif default_attribute_name is not None:
            # Manifest attributes use dashes where Python names use underscores.
            source_attribute_name = _normalize_attribute_name(default_attribute_name)
        else:
            source_attribute_name = None
        mutual_exclusive_with_anno = find_annotation(anno, ConflictWithSourceAttribute)
        if mutual_exclusive_with_anno:
            _source_attribute_allowed(
                allow_source_attribute_annotations,
                error_context,
                mutual_exclusive_with_anno,
            )
            conflicting_attributes = mutual_exclusive_with_anno.conflicting_attributes
        else:
            conflicting_attributes = frozenset()
        conditional_required = find_annotation(anno, ConditionalRequired)

        # "Required" and "conditionally required" are mutually exclusive.
        if conditional_required and is_required:
            if default_attribute_name is None:
                raise ValueError(
                    "is_required cannot be True without default_attribute_name being not None"
                )
            raise ValueError(
                f'The attribute "{default_attribute_name}" is Required while also being conditionally required.'
                ' Please make the attribute "NotRequired" or remove the conditional requirement.'
            )

        not_path_hint_anno = find_annotation(anno, NotPathHint)
        applicable_as_path_hint = not_path_hint_anno is None

        return DetectedDebputyParseHint(
            target_attribute=target_attribute,
            source_manifest_attribute=source_attribute_name,
            conflict_with_source_attributes=conflicting_attributes,
            conditional_required=conditional_required,
            applicable_as_path_hint=applicable_as_path_hint,
        )
def _source_attribute_allowed(
    source_attribute_allowed: bool,
    error_context: str,
    annotation: DebputyParseHint | None,
) -> None:
    """Reject source-related annotations in contexts where they are not permitted."""
    if annotation is not None and not source_attribute_allowed:
        raise ValueError(
            f'The annotation "{annotation}" cannot be used here. {error_context}'
        )