Coverage for src/debputy/manifest_parser/declarative_parser.py: 72%

798 statements  

coverage.py v7.8.2, created at 2025-09-07 09:27 +0000

1import collections 

2import dataclasses 

3import typing 

4from typing import ( 

5 Any, 

6 Callable, 

7 Tuple, 

8 TypedDict, 

9 Dict, 

10 get_type_hints, 

11 Annotated, 

12 get_args, 

13 get_origin, 

14 TypeVar, 

15 Generic, 

16 FrozenSet, 

17 Mapping, 

18 Optional, 

19 cast, 

20 Type, 

21 Union, 

22 List, 

23 Collection, 

24 NotRequired, 

25 Iterable, 

26 Literal, 

27 Sequence, 

28 Container, 

29 TYPE_CHECKING, 

30) 

31 

32 

33from debputy.manifest_parser.base_types import FileSystemMatchRule 

34from debputy.manifest_parser.exceptions import ( 

35 ManifestParseException, 

36) 

37from debputy.manifest_parser.mapper_code import ( 

38 normalize_into_list, 

39 wrap_into_list, 

40 map_each_element, 

41) 

42from debputy.manifest_parser.parse_hints import ( 

43 ConditionalRequired, 

44 DebputyParseHint, 

45 TargetAttribute, 

46 ManifestAttribute, 

47 ConflictWithSourceAttribute, 

48 NotPathHint, 

49) 

50from debputy.manifest_parser.parser_data import ParserContextData 

51from debputy.manifest_parser.tagging_types import ( 

52 DebputyParsedContent, 

53 DebputyDispatchableType, 

54 TypeMapping, 

55) 

56from debputy.manifest_parser.util import ( 

57 AttributePath, 

58 unpack_type, 

59 find_annotation, 

60 check_integration_mode, 

61) 

62from debputy.plugin.api.impl_types import ( 

63 DeclarativeInputParser, 

64 TD, 

65 ListWrappedDeclarativeInputParser, 

66 DispatchingObjectParser, 

67 DispatchingTableParser, 

68 TTP, 

69 TP, 

70 InPackageContextParser, 

71) 

72from debputy.plugin.api.spec import ( 

73 ParserDocumentation, 

74 DebputyIntegrationMode, 

75 StandardParserAttributeDocumentation, 

76 undocumented_attr, 

77 ParserAttributeDocumentation, 

78 reference_documentation, 

79) 

80from debputy.util import _info, _warn, assume_not_none 

81 

82 

83if TYPE_CHECKING: 

84 from debputy.lsp.diagnostics import LintSeverity 

85 

86 

87try: 

88 from Levenshtein import distance 

89except ImportError: 

90 _WARN_ONCE = False 

91 

92 def _detect_possible_typo( 

93 _key: str, 

94 _value: object, 

95 _manifest_attributes: Mapping[str, "AttributeDescription"], 

96 _path: "AttributePath", 

97 ) -> None: 

98 global _WARN_ONCE 

99 if not _WARN_ONCE: 

100 _WARN_ONCE = True 

101 _info( 

102 "Install python3-levenshtein to have debputy try to detect typos in the manifest." 

103 ) 

104 

105else: 

106 

107 def _detect_possible_typo( 

108 key: str, 

109 value: object, 

110 manifest_attributes: Mapping[str, "AttributeDescription"], 

111 path: "AttributePath", 

112 ) -> None: 

113 k_len = len(key) 

114 key_path = path[key] 

115 matches: List[str] = [] 

116 current_match_strength = 0 

117 for acceptable_key, attr in manifest_attributes.items(): 

118 if abs(k_len - len(acceptable_key)) > 2: 

119 continue 

120 d = distance(key, acceptable_key) 

121 if d > 2: 

122 continue 

123 try: 

124 attr.type_validator.ensure_type(value, key_path) 

125 except ManifestParseException: 

126 if attr.type_validator.base_type_match(value): 

127 match_strength = 1 

128 else: 

129 match_strength = 0 

130 else: 

131 match_strength = 2 

132 

133 if match_strength < current_match_strength: 

134 continue 

135 if match_strength > current_match_strength: 

136 current_match_strength = match_strength 

137 matches.clear() 

138 matches.append(acceptable_key) 

139 

140 if not matches: 

141 return 

142 ref = f'at "{path.path}"' if path else "at the manifest root level" 

143 if len(matches) == 1: 

144 possible_match = repr(matches[0]) 

145 _warn( 

146 f'Possible typo: The key "{key}" {ref} should probably have been {possible_match}' 

147 ) 

148 else: 

149 matches.sort() 

150 possible_matches = ", ".join(repr(a) for a in matches) 

151 _warn( 

152 f'Possible typo: The key "{key}" {ref} should probably have been one of {possible_matches}' 

153 ) 

154 
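The fallback above only prints a hint to install python3-levenshtein; when the import succeeds, `_detect_possible_typo` keeps every known attribute whose edit distance to the unknown key is at most 2 and then prefers candidates whose value already passes (or nearly passes) type validation. Below is a minimal, standalone sketch of just the distance filter, assuming the optional Levenshtein package is installed (the helper name is illustrative, not part of debputy):

```python
# Sketch only: the distance-based candidate filter, without debputy's
# type-validation tie-breaking. Assumes python3-levenshtein is installed.
from typing import List
from Levenshtein import distance


def closest_known_keys(key: str, known_keys: List[str], max_distance: int = 2) -> List[str]:
    # Cheap length pre-filter first, then the real edit distance.
    return sorted(
        k
        for k in known_keys
        if abs(len(k) - len(key)) <= max_distance and distance(key, k) <= max_distance
    )


print(closest_known_keys("sorce", ["source", "sources", "into"]))  # ['source', 'sources']
```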

155 

156SF = TypeVar("SF") 

157T = TypeVar("T") 

158S = TypeVar("S") 

159 

160 

161_NONE_TYPE = type(None) 

162 

163 

164# These must be able to appear in an "isinstance" check and must be builtin types. 

165BASIC_SIMPLE_TYPES = { 

166 str: "string", 

167 int: "integer", 

168 bool: "boolean", 

169} 

170 

171 

172class AttributeTypeHandler: 

173 __slots__ = ("_description", "_ensure_type", "base_type", "mapper") 

174 

175 def __init__( 

176 self, 

177 description: str, 

178 ensure_type: Callable[[Any, AttributePath], None], 

179 *, 

180 base_type: Optional[Type[Any]] = None, 

181 mapper: Optional[ 

182 Callable[[Any, AttributePath, Optional["ParserContextData"]], Any] 

183 ] = None, 

184 ) -> None: 

185 self._description = description 

186 self._ensure_type = ensure_type 

187 self.base_type = base_type 

188 self.mapper = mapper 

189 

190 def describe_type(self) -> str: 

191 return self._description 

192 

193 def ensure_type(self, obj: object, path: AttributePath) -> None: 

194 self._ensure_type(obj, path) 

195 

196 def base_type_match(self, obj: object) -> bool: 

197 base_type = self.base_type 

198 return base_type is not None and isinstance(obj, base_type) 

199 

200 def map_type( 

201 self, 

202 value: Any, 

203 path: AttributePath, 

204 parser_context: Optional["ParserContextData"], 

205 ) -> Any: 

206 mapper = self.mapper 

207 if mapper is not None: 

208 return mapper(value, path, parser_context) 

209 return value 

210 

211 def combine_mapper( 

212 self, 

213 mapper: Optional[ 

214 Callable[[Any, AttributePath, Optional["ParserContextData"]], Any] 

215 ], 

216 ) -> "AttributeTypeHandler": 

217 if mapper is None: 

218 return self 

219 if self.mapper is not None: 

220 m = self.mapper 

221 

222 def _combined_mapper( 

223 value: Any, 

224 path: AttributePath, 

225 parser_context: Optional["ParserContextData"], 

226 ) -> Any: 

227 return mapper(m(value, path, parser_context), path, parser_context) 

228 

229 else: 

230 _combined_mapper = mapper 

231 

232 return AttributeTypeHandler( 

233 self._description, 

234 self._ensure_type, 

235 base_type=self.base_type, 

236 mapper=_combined_mapper, 

237 ) 

238 
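`AttributeTypeHandler` keeps validation (`ensure_type`) separate from normalization (`map_type`), and `combine_mapper` chains an extra normalization step after any existing one. Here is a small sketch of that chaining, with trivial stand-in callables and the `AttributePath` argument left abstract (the concrete values are purely illustrative):

```python
# Sketch: chaining mappers with combine_mapper(). The base handler strips
# whitespace; the combined handler additionally wraps the result in a list.
string_handler = AttributeTypeHandler(
    "string",
    lambda value, path: None,              # stand-in validator that always accepts
    base_type=str,
    mapper=lambda value, path, ctx: value.strip(),
)
listified = string_handler.combine_mapper(lambda value, path, ctx: [value])

# With some AttributePath `p`:
#   string_handler.map_type("  docs/*  ", p, None) == "docs/*"
#   listified.map_type("  docs/*  ", p, None) == ["docs/*"]
```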

239 

240@dataclasses.dataclass(slots=True) 

241class AttributeDescription: 

242 source_attribute_name: str 

243 target_attribute: str 

244 attribute_type: Any 

245 type_validator: AttributeTypeHandler 

246 annotations: Tuple[Any, ...] 

247 conflicting_attributes: FrozenSet[str] 

248 conditional_required: Optional["ConditionalRequired"] 

249 parse_hints: Optional["DetectedDebputyParseHint"] = None 

250 is_optional: bool = False 

251 

252 

253def _extract_path_hint(v: Any, attribute_path: AttributePath) -> bool: 

254 if attribute_path.path_hint is not None: 254 ↛ 255 (line 254 didn't jump to line 255 because the condition on line 254 was never true)

255 return True 

256 if isinstance(v, str): 

257 attribute_path.path_hint = v 

258 return True 

259 elif isinstance(v, list) and len(v) > 0 and isinstance(v[0], str): 

260 attribute_path.path_hint = v[0] 

261 return True 

262 return False 

263 

264 

265@dataclasses.dataclass(slots=True, frozen=True) 

266class DeclarativeNonMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]): 

267 alt_form_parser: AttributeDescription 

268 inline_reference_documentation: Optional[ParserDocumentation] = None 

269 expected_debputy_integration_mode: Optional[Container[DebputyIntegrationMode]] = ( 

270 None 

271 ) 

272 

273 def parse_input( 

274 self, 

275 value: object, 

276 path: AttributePath, 

277 *, 

278 parser_context: Optional["ParserContextData"] = None, 

279 ) -> TD: 

280 check_integration_mode( 

281 path, 

282 parser_context, 

283 self.expected_debputy_integration_mode, 

284 ) 

285 if self.reference_documentation_url is not None: 

286 doc_ref = f" (Documentation: {self.reference_documentation_url})" 

287 else: 

288 doc_ref = "" 

289 

290 alt_form_parser = self.alt_form_parser 

291 if value is None: 291 ↛ 292 (line 291 didn't jump to line 292 because the condition on line 291 was never true)

292 form_note = f" The value must have type: {alt_form_parser.type_validator.describe_type()}" 

293 if self.reference_documentation_url is not None: 

294 doc_ref = f" Please see {self.reference_documentation_url} for the documentation." 

295 raise ManifestParseException( 

296 f"The attribute {path.path} was missing a value. {form_note}{doc_ref}" 

297 ) 

298 _extract_path_hint(value, path) 

299 alt_form_parser.type_validator.ensure_type(value, path) 

300 attribute = alt_form_parser.target_attribute 

301 alias_mapping = { 

302 attribute: ("", None), 

303 } 

304 v = alt_form_parser.type_validator.map_type(value, path, parser_context) 

305 path.alias_mapping = alias_mapping 

306 return cast("TD", {attribute: v}) 

307 

308 

309@dataclasses.dataclass(slots=True) 

310class DeclarativeMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]): 

311 input_time_required_parameters: FrozenSet[str] 

312 all_parameters: FrozenSet[str] 

313 manifest_attributes: Mapping[str, "AttributeDescription"] 

314 source_attributes: Mapping[str, "AttributeDescription"] 

315 at_least_one_of: FrozenSet[FrozenSet[str]] 

316 alt_form_parser: Optional[AttributeDescription] 

317 mutually_exclusive_attributes: FrozenSet[FrozenSet[str]] = frozenset() 

318 _per_attribute_conflicts_cache: Optional[Mapping[str, FrozenSet[str]]] = None 

319 inline_reference_documentation: Optional[ParserDocumentation] = None 

320 path_hint_source_attributes: Sequence[str] = tuple() 

321 expected_debputy_integration_mode: Optional[Container[DebputyIntegrationMode]] = ( 

322 None 

323 ) 

324 

325 def _parse_alt_form( 

326 self, 

327 value: object, 

328 path: AttributePath, 

329 *, 

330 parser_context: Optional["ParserContextData"] = None, 

331 ) -> TD: 

332 alt_form_parser = self.alt_form_parser 

333 if alt_form_parser is None: 333 ↛ 334 (line 333 didn't jump to line 334 because the condition on line 333 was never true)

334 raise ManifestParseException( 

335 f"The attribute {path.path} must be a mapping.{self._doc_url_error_suffix()}" 

336 ) 

337 _extract_path_hint(value, path) 

338 alt_form_parser.type_validator.ensure_type(value, path) 

339 assert ( 

340 value is not None 

341 ), "The alternative form was None, but the parser should have rejected None earlier." 

342 attribute = alt_form_parser.target_attribute 

343 alias_mapping = { 

344 attribute: ("", None), 

345 } 

346 v = alt_form_parser.type_validator.map_type(value, path, parser_context) 

347 path.alias_mapping = alias_mapping 

348 return cast("TD", {attribute: v}) 

349 

350 def _validate_expected_keys( 

351 self, 

352 value: Dict[Any, Any], 

353 path: AttributePath, 

354 *, 

355 parser_context: Optional["ParserContextData"] = None, 

356 ) -> None: 

357 unknown_keys = value.keys() - self.all_parameters 

358 doc_ref = self._doc_url_error_suffix() 

359 if unknown_keys: 359 ↛ 360 (line 359 didn't jump to line 360 because the condition on line 359 was never true)

360 for k in unknown_keys: 

361 if isinstance(k, str): 

362 _detect_possible_typo(k, value[k], self.manifest_attributes, path) 

363 unused_keys = self.all_parameters - value.keys() 

364 if unused_keys: 

365 k = ", ".join(unused_keys) 

366 raise ManifestParseException( 

367 f'Unknown keys "{unknown_keys}" at {path.path_container_lc}. Keys that could be used here are: {k}.{doc_ref}'

368 ) 

369 raise ManifestParseException( 

370 f'Unknown keys "{unknown_keys}" at {path.path_container_lc}. Please remove them.{doc_ref}'

371 ) 

372 missing_keys = self.input_time_required_parameters - value.keys() 

373 if missing_keys: 

374 required = ", ".join(repr(k) for k in sorted(missing_keys)) 

375 raise ManifestParseException( 

376 f"The following keys were required but not present at {path.path_container_lc}: {required}{doc_ref}" 

377 ) 

378 for maybe_required in self.all_parameters - value.keys(): 

379 attr = self.manifest_attributes[maybe_required] 

380 assert attr.conditional_required is None or parser_context is not None 

381 if ( 381 ↛ 387 (line 381 didn't jump to line 387 because the condition on line 381 was never true)

382 attr.conditional_required is not None 

383 and attr.conditional_required.condition_applies( 

384 assume_not_none(parser_context) 

385 ) 

386 ): 

387 reason = attr.conditional_required.reason 

388 raise ManifestParseException( 

389 f'Missing the *conditionally* required attribute "{maybe_required}" at {path.path_container_lc}. {reason}{doc_ref}' 

390 ) 

391 for keyset in self.at_least_one_of: 

392 matched_keys = value.keys() & keyset 

393 if not matched_keys: 393 ↛ 394 (line 393 didn't jump to line 394 because the condition on line 393 was never true)

394 conditionally_required = ", ".join(repr(k) for k in sorted(keyset)) 

395 raise ManifestParseException( 

396 f"At least one of the following keys must be present at {path.path_container_lc}:" 

397 f" {conditionally_required}{doc_ref}" 

398 ) 

399 for group in self.mutually_exclusive_attributes: 

400 matched = value.keys() & group 

401 if len(matched) > 1: 401 ↛ 402 (line 401 didn't jump to line 402 because the condition on line 401 was never true)

402 ck = ", ".join(repr(k) for k in sorted(matched)) 

403 raise ManifestParseException( 

404 f"Could not parse {path.path_container_lc}: The following attributes are" 

405 f" mutually exclusive: {ck}{doc_ref}" 

406 ) 

407 

408 def _parse_typed_dict_form( 

409 self, 

410 value: Dict[Any, Any], 

411 path: AttributePath, 

412 *, 

413 parser_context: Optional["ParserContextData"] = None, 

414 ) -> TD: 

415 self._validate_expected_keys(value, path, parser_context=parser_context) 

416 result = {} 

417 per_attribute_conflicts = self._per_attribute_conflicts() 

418 alias_mapping = {} 

419 for path_hint_source_attributes in self.path_hint_source_attributes: 

420 v = value.get(path_hint_source_attributes) 

421 if v is not None and _extract_path_hint(v, path): 

422 break 

423 for k, v in value.items(): 

424 attr = self.manifest_attributes[k] 

425 matched = value.keys() & per_attribute_conflicts[k] 

426 if matched: 426 ↛ 427 (line 426 didn't jump to line 427 because the condition on line 426 was never true)

427 ck = ", ".join(repr(k) for k in sorted(matched)) 

428 raise ManifestParseException( 

429 f'The attribute "{k}" at {path.path} cannot be used with the following' 

430 f" attributes: {ck}{self._doc_url_error_suffix()}" 

431 ) 

432 nk = attr.target_attribute 

433 key_path = path[k] 

434 attr.type_validator.ensure_type(v, key_path) 

435 if v is None: 435 ↛ 436 (line 435 didn't jump to line 436 because the condition on line 435 was never true)

436 continue 

437 if k != nk: 

438 alias_mapping[nk] = k, None 

439 v = attr.type_validator.map_type(v, key_path, parser_context) 

440 result[nk] = v 

441 if alias_mapping: 

442 path.alias_mapping = alias_mapping 

443 return cast("TD", result) 

444 

445 def _doc_url_error_suffix(self, *, see_url_version: bool = False) -> str: 

446 doc_url = self.reference_documentation_url 

447 if doc_url is not None: 

448 if see_url_version: 448 ↛ 449 (line 448 didn't jump to line 449 because the condition on line 448 was never true)

449 return f" Please see {doc_url} for the documentation." 

450 return f" (Documentation: {doc_url})" 

451 return "" 

452 

453 def parse_input( 

454 self, 

455 value: object, 

456 path: AttributePath, 

457 *, 

458 parser_context: Optional["ParserContextData"] = None, 

459 ) -> TD: 

460 check_integration_mode( 

461 path, 

462 parser_context, 

463 self.expected_debputy_integration_mode, 

464 ) 

465 if value is None: 465 ↛ 466 (line 465 didn't jump to line 466 because the condition on line 465 was never true)

466 form_note = " The attribute must be a mapping." 

467 if self.alt_form_parser is not None: 

468 form_note = ( 

469 " The attribute can be a mapping or a non-mapping format" 

470 ' (usually, "non-mapping format" means a string or a list of strings).' 

471 ) 

472 doc_ref = self._doc_url_error_suffix(see_url_version=True) 

473 raise ManifestParseException( 

474 f"The attribute {path.path} was missing a value. {form_note}{doc_ref}" 

475 ) 

476 

477 if not isinstance(value, dict): 

478 return self._parse_alt_form(value, path, parser_context=parser_context) 

479 return self._parse_typed_dict_form(value, path, parser_context=parser_context) 

480 

481 def _per_attribute_conflicts(self) -> Mapping[str, FrozenSet[str]]: 

482 conflicts = self._per_attribute_conflicts_cache 

483 if conflicts is not None: 

484 return conflicts 

485 attrs = self.source_attributes 

486 conflicts = { 

487 a.source_attribute_name: frozenset( 

488 attrs[ca].source_attribute_name for ca in a.conflicting_attributes 

489 ) 

490 for a in attrs.values() 

491 } 

492 self._per_attribute_conflicts_cache = conflicts 

493 return self._per_attribute_conflicts_cache 

494 

495 

496def _is_path_attribute_candidate( 

497 source_attribute: AttributeDescription, target_attribute: AttributeDescription 

498) -> bool: 

499 if ( 

500 source_attribute.parse_hints 

501 and not source_attribute.parse_hints.applicable_as_path_hint 

502 ): 

503 return False 

504 target_type = target_attribute.attribute_type 

505 _, origin, args = unpack_type(target_type, False) 

506 match_type = target_type 

507 if origin == list: 

508 match_type = args[0] 

509 return isinstance(match_type, type) and issubclass(match_type, FileSystemMatchRule) 

510 
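`_is_path_attribute_candidate` above decides whether a source attribute may serve as a path hint in error messages: unless a parse hint opts out, the target type (or the element type of its `List[...]`) must be a `FileSystemMatchRule` subclass. The same core check, written directly against the standard `typing` helpers as a rough equivalent (the function name is made up for this sketch):

```python
# Sketch: the type check at the heart of _is_path_attribute_candidate,
# ignoring the parse-hint opt-out handled above.
from typing import Any, List, get_args, get_origin


def _looks_like_path_type(target_type: Any) -> bool:
    match_type = target_type
    if get_origin(target_type) is list:
        match_type = get_args(target_type)[0]
    return isinstance(match_type, type) and issubclass(match_type, FileSystemMatchRule)


# _looks_like_path_type(List[FileSystemMatchRule]) -> True
# _looks_like_path_type(List[str]) -> False
```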

511 

512if typing.is_typeddict(DebputyParsedContent): 512 ↛ 516 (line 512 didn't jump to line 516 because the condition on line 512 was always true)

513 is_typeddict = typing.is_typeddict 

514else: 

515 

516 def is_typeddict(t: Any) -> bool: 

517 if typing.is_typeddict(t): 

518 return True 

519 return isinstance(t, type) and issubclass(t, DebputyParsedContent) 

520 

521 

522class ParserGenerator: 

523 def __init__(self) -> None: 

524 self._registered_types: Dict[Any, TypeMapping[Any, Any]] = {} 

525 self._object_parsers: Dict[str, DispatchingObjectParser] = {} 

526 self._table_parsers: Dict[ 

527 Type[DebputyDispatchableType], DispatchingTableParser[Any] 

528 ] = {} 

529 self._in_package_context_parser: Dict[str, Any] = {} 

530 

531 def register_mapped_type(self, mapped_type: TypeMapping[Any, Any]) -> None: 

532 existing = self._registered_types.get(mapped_type.target_type) 

533 if existing is not None: 533 ↛ 534 (line 533 didn't jump to line 534 because the condition on line 533 was never true)

534 raise ValueError(f"The type {existing} is already registered") 

535 self._registered_types[mapped_type.target_type] = mapped_type 

536 

537 def get_mapped_type_from_target_type( 

538 self, 

539 mapped_type: Type[T], 

540 ) -> Optional[TypeMapping[Any, T]]: 

541 return self._registered_types.get(mapped_type) 

542 

543 def discard_mapped_type(self, mapped_type: Type[T]) -> None: 

544 del self._registered_types[mapped_type] 

545 

546 def add_table_parser(self, rt: Type[DebputyDispatchableType], path: str) -> None: 

547 assert rt not in self._table_parsers 

548 self._table_parsers[rt] = DispatchingTableParser(rt, path) 

549 

550 def add_object_parser( 

551 self, 

552 path: str, 

553 *, 

554 parser_documentation: Optional[ParserDocumentation] = None, 

555 expected_debputy_integration_mode: Optional[ 

556 Container[DebputyIntegrationMode] 

557 ] = None, 

558 unknown_keys_diagnostic_severity: Optional["LintSeverity"] = "error", 

559 allow_unknown_keys: bool = False, 

560 ) -> DispatchingObjectParser: 

561 assert path not in self._in_package_context_parser 

562 assert path not in self._object_parsers 

563 object_parser = DispatchingObjectParser( 

564 path, 

565 parser_documentation=parser_documentation, 

566 expected_debputy_integration_mode=expected_debputy_integration_mode, 

567 unknown_keys_diagnostic_severity=unknown_keys_diagnostic_severity, 

568 allow_unknown_keys=allow_unknown_keys, 

569 ) 

570 self._object_parsers[path] = object_parser 

571 return object_parser 

572 

573 def add_in_package_context_parser( 

574 self, 

575 path: str, 

576 delegate: DeclarativeInputParser[Any], 

577 ) -> None: 

578 assert path not in self._in_package_context_parser 

579 assert path not in self._object_parsers 

580 self._in_package_context_parser[path] = InPackageContextParser(path, delegate) 

581 

582 @property 

583 def dispatchable_table_parsers( 

584 self, 

585 ) -> Mapping[Type[DebputyDispatchableType], DispatchingTableParser[Any]]: 

586 return self._table_parsers 

587 

588 @property 

589 def dispatchable_object_parsers(self) -> Mapping[str, DispatchingObjectParser]: 

590 return self._object_parsers 

591 

592 def dispatch_parser_table_for( 

593 self, rule_type: TTP 

594 ) -> Optional[DispatchingTableParser[TP]]: 

595 return cast( 

596 "Optional[DispatchingTableParser[TP]]", self._table_parsers.get(rule_type) 

597 ) 

598 

599 def generate_parser( 

600 self, 

601 parsed_content: Type[TD], 

602 *, 

603 source_content: Optional[SF] = None, 

604 allow_optional: bool = False, 

605 inline_reference_documentation: Optional[ParserDocumentation] = None, 

606 expected_debputy_integration_mode: Optional[ 

607 Container[DebputyIntegrationMode] 

608 ] = None, 

609 automatic_docs: Optional[ 

610 Mapping[Type[Any], Sequence[StandardParserAttributeDocumentation]] 

611 ] = None, 

612 ) -> DeclarativeInputParser[TD]: 

613 """Derive a parser from a TypedDict 

614 

615 Generates a parser for a segment of the manifest (think the `install-docs` snippet) from a TypedDict 

616 or two that are used as a description. 

617 

618 In its simplest use-case, the caller provides a TypedDict of the expected attributes along with

619 their types. As an example: 

620 

621 >>> class InstallDocsRule(DebputyParsedContent): 

622 ... sources: List[str] 

623 ... into: List[str] 

624 >>> pg = ParserGenerator() 

625 >>> simple_parser = pg.generate_parser(InstallDocsRule) 

626 

627 This will create a parser that would be able to interpret something like: 

628 

629 ```yaml 

630 install-docs: 

631 sources: ["docs/*"] 

632 into: ["my-pkg"] 

633 ``` 

634 

635 While this is sufficient for programmers, it is a bit rigid for the packager writing the manifest. Therefore, 

636 you can also provide a TypedDict describing the input, enabling more flexibility: 

637 

638 >>> class InstallDocsRule(DebputyParsedContent): 

639 ... sources: List[str] 

640 ... into: List[str] 

641 >>> class InputDocsRuleInputFormat(TypedDict): 

642 ... source: NotRequired[Annotated[str, DebputyParseHint.target_attribute("sources")]] 

643 ... sources: NotRequired[List[str]] 

644 ... into: Union[str, List[str]] 

645 >>> pg = ParserGenerator() 

646 >>> flexible_parser = pg.generate_parser( 

647 ... InstallDocsRule, 

648 ... source_content=InputDocsRuleInputFormat, 

649 ... ) 

650 

651 In this case, the `sources` field can either come from a single `source` in the manifest (which must be a string) 

652 or `sources` (which must be a list of strings). The parser also ensures that only one of `source` or `sources` 

653 is used to ensure the input is not ambiguous. For the `into` parameter, the parser will accept it being a str 

654 or a list of strings. Regardless of how the input was provided, the parser will normalize the input so that 

655 both `sources` and `into` in the result are lists of strings. As an example, this parser can accept

656 both the previous input and the following input:

657 

658 ```yaml 

659 install-docs: 

660 source: "docs/*" 

661 into: "my-pkg" 

662 ``` 

663 

664 The `source` and `into` attributes are then normalized to lists as if the user had written them as lists 

665 with a single string in them. As noted above, the name of the `source` attribute will also be normalized 

666 while parsing. 

667 

668 In the cases where only one field is required by the user, it can sometimes make sense to allow a non-dict 

669 as part of the input. Example: 

670 

671 >>> class DiscardRule(DebputyParsedContent): 

672 ... paths: List[str] 

673 >>> class DiscardRuleInputDictFormat(TypedDict): 

674 ... path: NotRequired[Annotated[str, DebputyParseHint.target_attribute("paths")]] 

675 ... paths: NotRequired[List[str]] 

676 >>> # This format relies on DiscardRule having exactly one Required attribute 

677 >>> DiscardRuleInputWithAltFormat = Union[ 

678 ... DiscardRuleInputDictFormat, 

679 ... str, 

680 ... List[str], 

681 ... ] 

682 >>> pg = ParserGenerator() 

683 >>> flexible_parser = pg.generate_parser( 

684 ... DiscardRule, 

685 ... source_content=DiscardRuleInputWithAltFormat, 

686 ... ) 

687 

688 

689 Supported types: 

690 * `List` - must have a fixed type argument (such as `List[str]`) 

691 * `str` 

692 * `int` 

693 * `BinaryPackage` - When provided (or required), the user must provide a package name listed 

694 in the debian/control file. The code receives the BinaryPackage instance 

695 matching that input. 

696 * `FileSystemMode` - When provided (or required), the user must provide a file system mode in any 

697 format that `debputy' provides (such as `0644` or `a=rw,go=rw`). 

698 * `FileSystemOwner` - When provided (or required), the user must provide a file system owner that is

699 available statically on all Debian systems (must be in `base-passwd`). 

700 The user has multiple options for how to specify it (either via name or id). 

701 * `FileSystemGroup` - When provided (or required), the user must provide a file system group that is

702 available statically on all Debian systems (must be in `base-passwd`). 

703 The user has multiple options for how to specify it (either via name or id). 

704 * `ManifestCondition` - When provided (or required), the user must specify a conditional rule to apply. 

705 Usually, it is better to extend `DebputyParsedContentStandardConditional`, which 

706 provides the `debputy' default `when` parameter for conditionals. 

707 

708 Supported special type-like parameters: 

709 

710 * `Required` / `NotRequired` to mark a field as `Required` or `NotRequired`. Must be provided at the 

711 outermost level. Cannot vary between `parsed_content` and `source_content`. 

712 * `Annotated`. Accepted at the outermost level (inside Required/NotRequired) but ignored at the moment. 

713 * `Union`. Must be the outermost level (inside `Annotated` or/and `Required`/`NotRequired` if these are present). 

714 Automapping (see below) is restricted to two members in the Union. 

715 

716 Notable non-supported types: 

717 * `Mapping` and all variants thereof (such as `dict`). In the future, nested `TypedDict`s may be allowed.

718 * `Optional` (or `Union[..., None]`): Use `NotRequired` for optional fields. 

719 

720 Automatic mapping rules from `source_content` to `parsed_content`: 

721 - `Union[T, List[T]]` can be narrowed automatically to `List[T]`. Transformation is basically: 

722 `lambda value: value if isinstance(value, list) else [value]` 

723 - `T` can be mapped automatically to `List[T]`, the transformation being: `lambda value: [value]`

724 

725 Additionally, types can be annotated (`Annotated[str, ...]`) with `DebputyParseHint`s. Check its classmethod 

726 for concrete features that may be useful to you. 

727 

728 :param parsed_content: A DebputyParsedContent / TypedDict describing the desired model of the input once parsed. 

729 (DebputyParsedContent is a TypedDict subclass that works around some inadequate type checkers).

730 It can also be a `List[DebputyParsedContent]`. In that case, `source_content` must be a 

731 `List[TypedDict[...]]`. 

732 :param source_content: Optionally, a TypedDict describing the input allowed by the user. This can be useful 

733 to describe more variations than in `parsed_content` that the parser will normalize for you. If omitted, 

734 the parsed_content is also considered the source_content (which affects what annotations are allowed in it). 

735 Note you should never pass the parsed_content as source_content directly. 

736 :param allow_optional: In rare cases, you want to support explicitly provided vs. optional. In this case, you 

737 should set this to True. Though, in 99.9% of all cases, you want `NotRequired` rather than `Optional` (and 

738 can keep this False). 

739 :param inline_reference_documentation: Optionally, programmatic documentation 

740 :param expected_debputy_integration_mode: If provided, this declares the integration modes where the 

741 result of the parser can be used. This is primarily useful for "fail-fast" on incorrect usage. 

742 When the restriction is not satisfiable, the generated parser will trigger a parse error immediately 

743 (resulting in a "compile time" failure rather than a "runtime" failure). 

744 :return: An input parser capable of reading input matching the TypedDict(s) used as reference. 

745 """ 

746 orig_parsed_content = parsed_content 

747 if source_content is parsed_content: 747 ↛ 748 (line 747 didn't jump to line 748 because the condition on line 747 was never true)

748 raise ValueError( 

749 "Do not provide source_content if it is the same as parsed_content" 

750 ) 

751 is_list_wrapped = False 

752 if get_origin(orig_parsed_content) == list: 

753 parsed_content = get_args(orig_parsed_content)[0] 

754 is_list_wrapped = True 

755 

756 if isinstance(parsed_content, type) and issubclass( 

757 parsed_content, DebputyDispatchableType 

758 ): 

759 parser = self.dispatch_parser_table_for(parsed_content) 

760 if parser is None: 760 ↛ 761 (line 760 didn't jump to line 761 because the condition on line 760 was never true)

761 raise ValueError( 

762 f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}." 

763 f" The class {parsed_content.__qualname__} is not a pre-registered type." 

764 ) 

765 # FIXME: Only the list wrapped version has documentation. 

766 if is_list_wrapped: 766 ↛ 772 (line 766 didn't jump to line 772 because the condition on line 766 was always true)

767 parser = ListWrappedDeclarativeInputParser( 

768 parser, 

769 inline_reference_documentation=inline_reference_documentation, 

770 expected_debputy_integration_mode=expected_debputy_integration_mode, 

771 ) 

772 return parser 

773 

774 if not is_typeddict(parsed_content): 774 ↛ 775 (line 774 didn't jump to line 775 because the condition on line 774 was never true)

775 raise ValueError( 

776 f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}." 

777 ' Only "TypedDict"-based types and a subset of "DebputyDispatchableType" are supported.' 

778 ) 

779 if is_list_wrapped and source_content is not None: 

780 if get_origin(source_content) != list: 780 ↛ 781 (line 780 didn't jump to line 781 because the condition on line 780 was never true)

781 raise ValueError( 

782 "If the parsed_content is a List type, then source_format must be a List type as well." 

783 ) 

784 source_content = get_args(source_content)[0] 

785 

786 target_attributes = self._parse_types( 

787 parsed_content, 

788 allow_source_attribute_annotations=source_content is None, 

789 forbid_optional=not allow_optional, 

790 ) 

791 required_target_parameters = frozenset(parsed_content.__required_keys__) 

792 parsed_alt_form = None 

793 non_mapping_source_only = False 

794 

795 if source_content is not None: 

796 default_target_attribute = None 

797 if len(required_target_parameters) == 1: 

798 default_target_attribute = next(iter(required_target_parameters)) 

799 

800 source_typed_dict, alt_source_forms = _extract_typed_dict( 

801 source_content, 

802 default_target_attribute, 

803 ) 

804 if alt_source_forms: 

805 parsed_alt_form = self._parse_alt_form( 

806 alt_source_forms, 

807 default_target_attribute, 

808 ) 

809 if source_typed_dict is not None: 

810 source_content_attributes = self._parse_types( 

811 source_typed_dict, 

812 allow_target_attribute_annotation=True, 

813 allow_source_attribute_annotations=True, 

814 forbid_optional=not allow_optional, 

815 ) 

816 source_content_parameter = "source_content" 

817 source_and_parsed_differs = True 

818 else: 

819 source_typed_dict = parsed_content 

820 source_content_attributes = target_attributes 

821 source_content_parameter = "parsed_content" 

822 source_and_parsed_differs = True 

823 non_mapping_source_only = True 

824 else: 

825 source_typed_dict = parsed_content 

826 source_content_attributes = target_attributes 

827 source_content_parameter = "parsed_content" 

828 source_and_parsed_differs = False 

829 

830 sources = collections.defaultdict(set) 

831 seen_targets = set() 

832 seen_source_names: Dict[str, str] = {} 

833 source_attributes: Dict[str, AttributeDescription] = {} 

834 path_hint_source_attributes = [] 

835 

836 for k in source_content_attributes: 

837 ia = source_content_attributes[k] 

838 

839 ta = ( 

840 target_attributes.get(ia.target_attribute) 

841 if source_and_parsed_differs 

842 else ia 

843 ) 

844 if ta is None: 844 ↛ 846 (line 844 didn't jump to line 846 because the condition on line 844 was never true)

845 # Error message would be wrong if this assertion is false. 

846 assert source_and_parsed_differs 

847 raise ValueError( 

848 f'The attribute "{k}" from the "source_content" parameter should have mapped' 

849 f' to "{ia.target_attribute}", but that parameter does not exist in "parsed_content"' 

850 ) 

851 if _is_path_attribute_candidate(ia, ta): 

852 path_hint_source_attributes.append(ia.source_attribute_name) 

853 existing_source_name = seen_source_names.get(ia.source_attribute_name) 

854 if existing_source_name: 854 ↛ 855 (line 854 didn't jump to line 855 because the condition on line 854 was never true)

855 raise ValueError( 

856 f'The attribute "{k}" and "{existing_source_name}" both share the source name' 

857 f' "{ia.source_attribute_name}". Please change the {source_content_parameter} parameter,' 

858 f' so only one attribute use "{ia.source_attribute_name}".' 

859 ) 

860 seen_source_names[ia.source_attribute_name] = k 

861 seen_targets.add(ta.target_attribute) 

862 sources[ia.target_attribute].add(k) 

863 if source_and_parsed_differs: 

864 bridge_mapper = self._type_normalize( 

865 k, ia.attribute_type, ta.attribute_type, False 

866 ) 

867 ia.type_validator = ia.type_validator.combine_mapper(bridge_mapper) 

868 source_attributes[k] = ia 

869 

870 def _as_attr_names(td_name: Iterable[str]) -> FrozenSet[str]: 

871 return frozenset( 

872 source_content_attributes[a].source_attribute_name for a in td_name 

873 ) 

874 

875 _check_attributes( 

876 parsed_content, 

877 source_typed_dict, 

878 source_content_attributes, 

879 sources, 

880 ) 

881 

882 at_least_one_of = frozenset( 

883 _as_attr_names(g) 

884 for k, g in sources.items() 

885 if len(g) > 1 and k in required_target_parameters 

886 ) 

887 

888 if source_and_parsed_differs and seen_targets != target_attributes.keys(): 888 ↛ 889 (line 888 didn't jump to line 889 because the condition on line 888 was never true)

889 missing = ", ".join( 

890 repr(k) for k in (target_attributes.keys() - seen_targets) 

891 ) 

892 raise ValueError( 

893 'The following attributes in "parsed_content" did not have a source field in "source_content":' 

894 f" {missing}" 

895 ) 

896 all_mutually_exclusive_fields = frozenset( 

897 _as_attr_names(g) for g in sources.values() if len(g) > 1 

898 ) 

899 

900 all_parameters = ( 

901 source_typed_dict.__required_keys__ | source_typed_dict.__optional_keys__ 

902 ) 

903 _check_conflicts( 

904 source_content_attributes, 

905 source_typed_dict.__required_keys__, 

906 all_parameters, 

907 ) 

908 

909 manifest_attributes = { 

910 a.source_attribute_name: a for a in source_content_attributes.values() 

911 } 

912 

913 if parsed_alt_form is not None: 

914 target_attribute = parsed_alt_form.target_attribute 

915 if ( 915 ↛ 920 (line 915 didn't jump to line 920 because the condition on line 915 was never true)

916 target_attribute not in required_target_parameters 

917 and required_target_parameters 

918 or len(required_target_parameters) > 1 

919 ): 

920 raise NotImplementedError( 

921 "When using alternative source formats (Union[TypedDict, ...]), then the" 

922 " target must have at most one require parameter" 

923 ) 

924 bridge_mapper = self._type_normalize( 

925 target_attribute, 

926 parsed_alt_form.attribute_type, 

927 target_attributes[target_attribute].attribute_type, 

928 False, 

929 ) 

930 parsed_alt_form.type_validator = ( 

931 parsed_alt_form.type_validator.combine_mapper(bridge_mapper) 

932 ) 

933 

934 inline_reference_documentation = ( 

935 _verify_and_auto_correct_inline_reference_documentation( 

936 parsed_content, 

937 source_typed_dict, 

938 source_content_attributes, 

939 inline_reference_documentation, 

940 parsed_alt_form is not None, 

941 automatic_docs, 

942 ) 

943 ) 

944 if non_mapping_source_only: 

945 parser = DeclarativeNonMappingInputParser( 

946 assume_not_none(parsed_alt_form), 

947 inline_reference_documentation=inline_reference_documentation, 

948 expected_debputy_integration_mode=expected_debputy_integration_mode, 

949 ) 

950 else: 

951 parser = DeclarativeMappingInputParser( 

952 _as_attr_names(source_typed_dict.__required_keys__), 

953 _as_attr_names(all_parameters), 

954 manifest_attributes, 

955 source_attributes, 

956 mutually_exclusive_attributes=all_mutually_exclusive_fields, 

957 alt_form_parser=parsed_alt_form, 

958 at_least_one_of=at_least_one_of, 

959 inline_reference_documentation=inline_reference_documentation, 

960 path_hint_source_attributes=tuple(path_hint_source_attributes), 

961 expected_debputy_integration_mode=expected_debputy_integration_mode, 

962 ) 

963 if is_list_wrapped: 

964 parser = ListWrappedDeclarativeInputParser( 

965 parser, 

966 expected_debputy_integration_mode=expected_debputy_integration_mode, 

967 ) 

968 return parser 

969 
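Pulling the doctest examples from the docstring together, here is a compact sketch of generating and using one of these parsers; the manifest fragment and the expected result follow directly from the normalization rules described above, and constructing the `AttributePath` for the node is left out:

```python
# Sketch based on the doctest above: a flexible source format whose scalar
# "source"/"into" values are normalized into the lists required by the
# parsed content.
class InstallDocsRule(DebputyParsedContent):
    sources: List[str]
    into: List[str]


class InstallDocsInputFormat(TypedDict):
    source: NotRequired[Annotated[str, DebputyParseHint.target_attribute("sources")]]
    sources: NotRequired[List[str]]
    into: Union[str, List[str]]


pg = ParserGenerator()
install_docs_parser = pg.generate_parser(
    InstallDocsRule,
    source_content=InstallDocsInputFormat,
)
# Given the manifest value {"source": "docs/*", "into": "my-pkg"} and an
# AttributePath for that node, parse_input() returns:
#   {"sources": ["docs/*"], "into": ["my-pkg"]}
```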

970 def _as_type_validator( 

971 self, 

972 attribute: str, 

973 provided_type: Any, 

974 parsing_typed_dict_attribute: bool, 

975 ) -> AttributeTypeHandler: 

976 assert not isinstance(provided_type, tuple) 

977 

978 if isinstance(provided_type, type) and issubclass( 

979 provided_type, DebputyDispatchableType 

980 ): 

981 return _dispatch_parser(provided_type) 

982 

983 unmapped_type = self._strip_mapped_types( 

984 provided_type, 

985 parsing_typed_dict_attribute, 

986 ) 

987 type_normalizer = self._type_normalize( 

988 attribute, 

989 unmapped_type, 

990 provided_type, 

991 parsing_typed_dict_attribute, 

992 ) 

993 t_unmapped, t_unmapped_orig, t_unmapped_args = unpack_type( 

994 unmapped_type, 

995 parsing_typed_dict_attribute, 

996 ) 

997 _, t_provided_orig, t_provided_args = unpack_type( 

998 provided_type, 

999 parsing_typed_dict_attribute, 

1000 ) 

1001 

1002 if ( 1002 ↛ 1008 (line 1002 didn't jump to line 1008 because the condition on line 1002 was never true)

1003 t_unmapped_orig == Union 

1004 and t_unmapped_args 

1005 and len(t_unmapped_args) == 2 

1006 and any(v is _NONE_TYPE for v in t_unmapped_args) 

1007 ): 

1008 _, _, args = unpack_type(provided_type, parsing_typed_dict_attribute) 

1009 actual_type = [a for a in args if a is not _NONE_TYPE][0] 

1010 validator = self._as_type_validator( 

1011 attribute, actual_type, parsing_typed_dict_attribute 

1012 ) 

1013 

1014 def _validator(v: Any, path: AttributePath) -> None: 

1015 if v is None: 

1016 return 

1017 validator.ensure_type(v, path) 

1018 

1019 return AttributeTypeHandler( 

1020 validator.describe_type(), 

1021 _validator, 

1022 base_type=validator.base_type, 

1023 mapper=type_normalizer, 

1024 ) 

1025 

1026 if unmapped_type in BASIC_SIMPLE_TYPES: 

1027 type_name = BASIC_SIMPLE_TYPES[unmapped_type] 

1028 

1029 type_mapping = self._registered_types.get(provided_type) 

1030 if type_mapping is not None: 

1031 simple_type = f" ({type_name})" 

1032 type_name = type_mapping.target_type.__name__ 

1033 else: 

1034 simple_type = "" 

1035 

1036 def _validator(v: Any, path: AttributePath) -> None: 

1037 if not isinstance(v, unmapped_type): 

1038 _validation_type_error( 

1039 path, f"The attribute must be a {type_name}{simple_type}" 

1040 ) 

1041 

1042 return AttributeTypeHandler( 

1043 type_name, 

1044 _validator, 

1045 base_type=unmapped_type, 

1046 mapper=type_normalizer, 

1047 ) 

1048 if t_unmapped_orig == list: 

1049 if not t_unmapped_args: 1049 ↛ 1050 (line 1049 didn't jump to line 1050 because the condition on line 1049 was never true)

1050 raise ValueError( 

1051 f'The attribute "{attribute}" is List but does not have Generics (Must use List[X])' 

1052 ) 

1053 

1054 generic_type = t_unmapped_args[0]

1055 key_mapper = self._as_type_validator( 

1056 attribute, 

1057 generic_type,

1058 parsing_typed_dict_attribute, 

1059 ) 

1060 

1061 def _validator(v: Any, path: AttributePath) -> None: 

1062 if not isinstance(v, list): 1062 ↛ 1063 (line 1062 didn't jump to line 1063 because the condition on line 1062 was never true)

1063 _validation_type_error(path, "The attribute must be a list") 

1064 for i, v in enumerate(v): 

1065 key_mapper.ensure_type(v, path[i]) 

1066 

1067 list_mapper = ( 

1068 map_each_element(key_mapper.mapper) 

1069 if key_mapper.mapper is not None 

1070 else None 

1071 ) 

1072 

1073 return AttributeTypeHandler( 

1074 f"List of {key_mapper.describe_type()}", 

1075 _validator, 

1076 base_type=list, 

1077 mapper=type_normalizer, 

1078 ).combine_mapper(list_mapper) 

1079 if is_typeddict(provided_type): 

1080 subparser = self.generate_parser(cast("Type[TD]", provided_type)) 

1081 return AttributeTypeHandler( 

1082 description=f"{provided_type.__name__} (Typed Mapping)", 

1083 ensure_type=lambda v, ap: None, 

1084 base_type=dict, 

1085 mapper=lambda v, ap, cv: subparser.parse_input( 

1086 v, ap, parser_context=cv 

1087 ), 

1088 ) 

1089 if t_unmapped_orig == dict: 

1090 if not t_unmapped_args or len(t_unmapped_args) != 2: 1090 ↛ 1091 (line 1090 didn't jump to line 1091 because the condition on line 1090 was never true)

1091 raise ValueError( 

1092 f'The attribute "{attribute}" is Dict but does not have Generics (Must use Dict[str, Y])' 

1093 ) 

1094 if t_unmapped_args[0] != str: 1094 ↛ 1095 (line 1094 didn't jump to line 1095 because the condition on line 1094 was never true)

1095 raise ValueError( 

1096 f'The attribute "{attribute}" is Dict and has a non-str type as key.' 

1097 " Currently, only `str` is supported (Dict[str, Y])" 

1098 ) 

1099 key_mapper = self._as_type_validator( 

1100 attribute, 

1101 t_unmapped_args[0], 

1102 parsing_typed_dict_attribute, 

1103 ) 

1104 value_mapper = self._as_type_validator( 

1105 attribute, 

1106 t_unmapped_args[1], 

1107 parsing_typed_dict_attribute, 

1108 ) 

1109 

1110 if key_mapper.base_type is None: 1110 ↛ 1111 (line 1110 didn't jump to line 1111 because the condition on line 1110 was never true)

1111 raise ValueError( 

1112 f'The attribute "{attribute}" is Dict and the key did not have a trivial base type. Key types' 

1113 f" without trivial base types (such as `str`) are not supported at the moment." 

1114 ) 

1115 

1116 if value_mapper.mapper is not None: 1116 ↛ 1117 (line 1116 didn't jump to line 1117 because the condition on line 1116 was never true)

1117 raise ValueError( 

1118 f'The attribute "{attribute}" is Dict and the value requires mapping.' 

1119 " Currently, this is not supported. Consider a simpler type (such as Dict[str, str] or Dict[str, Any])." 

1120 " Better typing may come later" 

1121 ) 

1122 

1123 def _validator(uv: Any, path: AttributePath) -> None: 

1124 if not isinstance(uv, dict): 1124 ↛ 1125 (line 1124 didn't jump to line 1125 because the condition on line 1124 was never true)

1125 _validation_type_error(path, "The attribute must be a mapping") 

1126 key_name = "the first key in the mapping" 

1127 for i, (k, v) in enumerate(uv.items()): 

1128 if not key_mapper.base_type_match(k): 1128 ↛ 1129 (line 1128 didn't jump to line 1129 because the condition on line 1128 was never true)

1129 kp = path.copy_with_path_hint(key_name) 

1130 _validation_type_error( 

1131 kp, 

1132 f'The key number {i + 1} in attribute "{kp}" must be a {key_mapper.describe_type()}', 

1133 ) 

1134 key_name = f"the key after {k}" 

1135 value_mapper.ensure_type(v, path[k]) 

1136 

1137 return AttributeTypeHandler( 

1138 f"Mapping of {value_mapper.describe_type()}", 

1139 _validator, 

1140 base_type=dict, 

1141 mapper=type_normalizer, 

1142 ).combine_mapper(key_mapper.mapper) 

1143 if t_unmapped_orig == Union: 

1144 if _is_two_arg_x_list_x(t_provided_args): 

1145 # Force the order to be "X, List[X]" as it simplifies the code 

1146 x_list_x = ( 

1147 t_provided_args 

1148 if get_origin(t_provided_args[1]) == list 

1149 else (t_provided_args[1], t_provided_args[0]) 

1150 ) 

1151 

1152 # X, List[X] could match if X was List[Y]. However, our code below assumes 

1153 # that X is a non-list. The `_is_two_arg_x_list_x` returns False for this 

1154 # case to avoid this assert and fall into the "generic case". 

1155 assert get_origin(x_list_x[0]) != list 

1156 x_subtype_checker = self._as_type_validator( 

1157 attribute, 

1158 x_list_x[0], 

1159 parsing_typed_dict_attribute, 

1160 ) 

1161 list_x_subtype_checker = self._as_type_validator( 

1162 attribute, 

1163 x_list_x[1], 

1164 parsing_typed_dict_attribute, 

1165 ) 

1166 type_description = x_subtype_checker.describe_type() 

1167 type_description = f"{type_description} or a list of {type_description}" 

1168 

1169 def _validator(v: Any, path: AttributePath) -> None: 

1170 if isinstance(v, list): 

1171 list_x_subtype_checker.ensure_type(v, path) 

1172 else: 

1173 x_subtype_checker.ensure_type(v, path) 

1174 

1175 return AttributeTypeHandler( 

1176 type_description, 

1177 _validator, 

1178 mapper=type_normalizer, 

1179 ) 

1180 else: 

1181 subtype_checker = [ 

1182 self._as_type_validator(attribute, a, parsing_typed_dict_attribute) 

1183 for a in t_unmapped_args 

1184 ] 

1185 type_description = "one-of: " + ", ".join( 

1186 f"{sc.describe_type()}" for sc in subtype_checker 

1187 ) 

1188 mapper = subtype_checker[0].mapper 

1189 if any(mapper != sc.mapper for sc in subtype_checker): 1189 ↛ 1190 (line 1189 didn't jump to line 1190 because the condition on line 1189 was never true)

1190 raise ValueError( 

1191 f'Cannot handle the union "{provided_type}" as the target types need different' 

1192 " type normalization/mapping logic. Unions are generally limited to Union[X, List[X]]" 

1193 " where X is a non-collection type." 

1194 ) 

1195 

1196 def _validator(v: Any, path: AttributePath) -> None: 

1197 partial_matches = [] 

1198 for sc in subtype_checker: 1198 ↛ 1206 (line 1198 didn't jump to line 1206 because the loop on line 1198 didn't complete)

1199 try: 

1200 sc.ensure_type(v, path) 

1201 return 

1202 except ManifestParseException as e: 

1203 if sc.base_type_match(v): 1203 ↛ 1204 (line 1203 didn't jump to line 1204 because the condition on line 1203 was never true)

1204 partial_matches.append((sc, e)) 

1205 

1206 if len(partial_matches) == 1: 

1207 raise partial_matches[0][1] 

1208 _validation_type_error( 

1209 path, f"Could not match against: {type_description}" 

1210 ) 

1211 

1212 return AttributeTypeHandler( 

1213 type_description, 

1214 _validator, 

1215 mapper=type_normalizer, 

1216 ) 

1217 if t_unmapped_orig == Literal: 

1218 # We want "x" for string values; repr provides 'x' 

1219 pretty = ", ".join( 

1220 f"`{v}`" if isinstance(v, str) else str(v) for v in t_unmapped_args 

1221 ) 

1222 

1223 def _validator(v: Any, path: AttributePath) -> None: 

1224 if v not in t_unmapped_args: 

1225 value_hint = "" 

1226 if isinstance(v, str): 1226 ↛ 1228 (line 1226 didn't jump to line 1228 because the condition on line 1226 was always true)

1227 value_hint = f"({v}) " 

1228 _validation_type_error( 

1229 path, 

1230 f"Value {value_hint}must be one of the following literal values: {pretty}", 

1231 ) 

1232 

1233 return AttributeTypeHandler( 

1234 f"One of the following literal values: {pretty}", 

1235 _validator, 

1236 ) 

1237 

1238 if provided_type == Any: 1238 ↛ 1243 (line 1238 didn't jump to line 1243 because the condition on line 1238 was always true)

1239 return AttributeTypeHandler( 

1240 "any (unvalidated)", 

1241 lambda *a: None, 

1242 ) 

1243 raise ValueError( 

1244 f'The attribute "{attribute}" had/contained a type {provided_type}, which is not supported' 

1245 ) 

1246 

1247 def _parse_types( 

1248 self, 

1249 spec: Type[TypedDict], 

1250 allow_target_attribute_annotation: bool = False, 

1251 allow_source_attribute_annotations: bool = False, 

1252 forbid_optional: bool = True, 

1253 ) -> Dict[str, AttributeDescription]: 

1254 annotations = get_type_hints(spec, include_extras=True) 

1255 return { 

1256 k: self._attribute_description( 

1257 k, 

1258 t, 

1259 k in spec.__required_keys__, 

1260 allow_target_attribute_annotation=allow_target_attribute_annotation, 

1261 allow_source_attribute_annotations=allow_source_attribute_annotations, 

1262 forbid_optional=forbid_optional, 

1263 ) 

1264 for k, t in annotations.items() 

1265 } 

1266 

1267 def _attribute_description( 

1268 self, 

1269 attribute: str, 

1270 orig_td: Any, 

1271 is_required: bool, 

1272 forbid_optional: bool = True, 

1273 allow_target_attribute_annotation: bool = False, 

1274 allow_source_attribute_annotations: bool = False, 

1275 ) -> AttributeDescription: 

1276 td, anno, is_optional = _parse_type( 

1277 attribute, orig_td, forbid_optional=forbid_optional 

1278 ) 

1279 type_validator = self._as_type_validator(attribute, td, True) 

1280 parsed_annotations = DetectedDebputyParseHint.parse_annotations( 

1281 anno, 

1282 f' Seen with attribute "{attribute}".', 

1283 attribute, 

1284 is_required, 

1285 allow_target_attribute_annotation=allow_target_attribute_annotation, 

1286 allow_source_attribute_annotations=allow_source_attribute_annotations, 

1287 ) 

1288 return AttributeDescription( 

1289 target_attribute=parsed_annotations.target_attribute, 

1290 attribute_type=td, 

1291 type_validator=type_validator, 

1292 annotations=anno, 

1293 is_optional=is_optional, 

1294 conflicting_attributes=parsed_annotations.conflict_with_source_attributes, 

1295 conditional_required=parsed_annotations.conditional_required, 

1296 source_attribute_name=assume_not_none( 

1297 parsed_annotations.source_manifest_attribute 

1298 ), 

1299 parse_hints=parsed_annotations, 

1300 ) 

1301 

1302 def _parse_alt_form( 

1303 self, 

1304 alt_form, 

1305 default_target_attribute: Optional[str], 

1306 ) -> AttributeDescription: 

1307 td, anno, is_optional = _parse_type( 

1308 "source_format alternative form", 

1309 alt_form, 

1310 forbid_optional=True, 

1311 parsing_typed_dict_attribute=False, 

1312 ) 

1313 type_validator = self._as_type_validator( 

1314 "source_format alternative form", 

1315 td, 

1316 True, 

1317 ) 

1318 parsed_annotations = DetectedDebputyParseHint.parse_annotations( 

1319 anno, 

1320 " The alternative for source_format.", 

1321 None, 

1322 False, 

1323 default_target_attribute=default_target_attribute, 

1324 allow_target_attribute_annotation=True, 

1325 allow_source_attribute_annotations=False, 

1326 ) 

1327 return AttributeDescription( 

1328 target_attribute=parsed_annotations.target_attribute, 

1329 attribute_type=td, 

1330 type_validator=type_validator, 

1331 annotations=anno, 

1332 is_optional=is_optional, 

1333 conflicting_attributes=parsed_annotations.conflict_with_source_attributes, 

1334 conditional_required=parsed_annotations.conditional_required, 

1335 source_attribute_name="Alt form of the source_format", 

1336 ) 

1337 

1338 def _union_narrowing( 

1339 self, 

1340 input_type: Any, 

1341 target_type: Any, 

1342 parsing_typed_dict_attribute: bool, 

1343 ) -> Optional[Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]]: 

1344 _, input_orig, input_args = unpack_type( 

1345 input_type, parsing_typed_dict_attribute 

1346 ) 

1347 _, target_orig, target_args = unpack_type( 

1348 target_type, parsing_typed_dict_attribute 

1349 ) 

1350 

1351 if input_orig != Union or not input_args: 1351 ↛ 1352 (line 1351 didn't jump to line 1352 because the condition on line 1351 was never true)

1352 raise ValueError("input_type must be a Union[...] with non-empty args") 

1353 

1354 # Currently, we only support Union[X, List[X]] -> List[Y] narrowing or Union[X, List[X]] -> Union[Y, Union[Y]] 

1355 # - Where X = Y or there is a simple standard transformation from X to Y. 

1356 

1357 if target_orig not in (Union, list) or not target_args: 

1358 # Not supported 

1359 return None 

1360 

1361 if target_orig == Union and set(input_args) == set(target_args): 1361 ↛ 1363 (line 1361 didn't jump to line 1363 because the condition on line 1361 was never true)

1362 # Not needed (identity mapping) 

1363 return None 

1364 

1365 if target_orig == list and not any(get_origin(a) == list for a in input_args): 1365 ↛ 1367 (line 1365 didn't jump to line 1367 because the condition on line 1365 was never true)

1366 # Not supported 

1367 return None 

1368 

1369 target_arg = target_args[0] 

1370 simplified_type = self._strip_mapped_types( 

1371 target_arg, parsing_typed_dict_attribute 

1372 ) 

1373 acceptable_types = { 

1374 target_arg, 

1375 List[target_arg], # type: ignore 

1376 simplified_type, 

1377 List[simplified_type], # type: ignore 

1378 } 

1379 target_format = ( 

1380 target_arg, 

1381 List[target_arg], # type: ignore 

1382 ) 

1383 in_target_format = 0 

1384 in_simple_format = 0 

1385 for input_arg in input_args: 

1386 if input_arg not in acceptable_types: 1386 ↛ 1388 (line 1386 didn't jump to line 1388 because the condition on line 1386 was never true)

1387 # Not supported 

1388 return None 

1389 if input_arg in target_format: 

1390 in_target_format += 1 

1391 else: 

1392 in_simple_format += 1 

1393 

1394 assert in_simple_format or in_target_format 

1395 

1396 if in_target_format and not in_simple_format: 

1397 # Union[X, List[X]] -> List[X] 

1398 return normalize_into_list 

1399 mapped = self._registered_types[target_arg] 

1400 if not in_target_format and in_simple_format: 1400 ↛ 1415 (line 1400 didn't jump to line 1415 because the condition on line 1400 was always true)

1401 # Union[X, List[X]] -> List[Y] 

1402 

1403 def _mapper_x_list_y( 

1404 x: Union[Any, List[Any]], 

1405 ap: AttributePath, 

1406 pc: Optional["ParserContextData"], 

1407 ) -> List[Any]: 

1408 in_list_form: List[Any] = normalize_into_list(x, ap, pc) 

1409 

1410 return [mapped.mapper(x, ap, pc) for x in in_list_form] 

1411 

1412 return _mapper_x_list_y 

1413 

1414 # Union[Y, List[X]] -> List[Y] 

1415 if not isinstance(target_arg, type): 

1416 raise ValueError( 

1417 f"Cannot narrow {input_type} -> {target_type}: The automatic conversion does" 

1418 f" not support mixed types. Please use either {simplified_type} or {target_arg}" 

1419 f" in the source content (but both a mix of both)" 

1420 ) 

1421 

1422 def _mapper_mixed_list_y( 

1423 x: Union[Any, List[Any]], 

1424 ap: AttributePath, 

1425 pc: Optional["ParserContextData"], 

1426 ) -> List[Any]: 

1427 in_list_form: List[Any] = normalize_into_list(x, ap, pc) 

1428 

1429 return [ 

1430 x if isinstance(x, target_arg) else mapped.mapper(x, ap, pc) 

1431 for x in in_list_form 

1432 ] 

1433 

1434 return _mapper_mixed_list_y 

1435 
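The narrowings implemented above compose two simple steps: wrap a scalar into a list when needed, then map each element from its manifest-level source type to the registered target type. A schematic version of the `Union[X, List[X]] -> List[Y]` case follows, with plain stand-ins for `normalize_into_list` and a registered mapper (both stand-ins are illustrative, not the real helpers):

```python
# Sketch: the shape of the Union[X, List[X]] -> List[Y] narrowing, with a
# plain callable standing in for a registered TypeMapping's mapper.
from typing import Any, Callable, List, Union


def _into_list(value: Union[Any, List[Any]]) -> List[Any]:
    # Same rule as described in the generate_parser docstring.
    return value if isinstance(value, list) else [value]


def _narrow(value: Union[Any, List[Any]], element_mapper: Callable[[Any], Any]) -> List[Any]:
    return [element_mapper(v) for v in _into_list(value)]


print(_narrow("0644", int))             # [644]
print(_narrow(["0644", "0755"], int))   # [644, 755]
```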

1436 def _type_normalize( 

1437 self, 

1438 attribute: str, 

1439 input_type: Any, 

1440 target_type: Any, 

1441 parsing_typed_dict_attribute: bool, 

1442 ) -> Optional[Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]]: 

1443 if input_type == target_type: 

1444 return None 

1445 _, input_orig, input_args = unpack_type( 

1446 input_type, parsing_typed_dict_attribute 

1447 ) 

1448 _, target_orig, target_args = unpack_type( 

1449 target_type, 

1450 parsing_typed_dict_attribute, 

1451 ) 

1452 if input_orig == Union: 

1453 result = self._union_narrowing( 

1454 input_type, target_type, parsing_typed_dict_attribute 

1455 ) 

1456 if result: 

1457 return result 

1458 elif target_orig == list and target_args[0] == input_type: 

1459 return wrap_into_list 

1460 

1461 mapped = self._registered_types.get(target_type) 

1462 if mapped is not None and input_type == mapped.source_type: 

1463 # Source -> Target 

1464 return mapped.mapper 

1465 if target_orig == list and target_args: 1465 ↛ 1483 (line 1465 didn't jump to line 1483 because the condition on line 1465 was always true)

1466 mapped = self._registered_types.get(target_args[0]) 

1467 if mapped is not None: 1467 ↛ 1483line 1467 didn't jump to line 1483 because the condition on line 1467 was always true

1468 # Help mypy see that `mapped` cannot be None inside the comprehensions below. 

1469 mapped_type: TypeMapping = mapped 

1470 if input_type == mapped.source_type: 1470 ↛ 1472line 1470 didn't jump to line 1472 because the condition on line 1470 was never true

1471 # Source -> List[Target] 

1472 return lambda x, ap, pc: [mapped_type.mapper(x, ap, pc)] 

1473 if ( 1473 ↛ 1483line 1473 didn't jump to line 1483 because the condition on line 1473 was always true

1474 input_orig == list 

1475 and input_args 

1476 and input_args[0] == mapped_type.source_type 

1477 ): 

1478 # List[Source] -> List[Target] 

1479 return lambda xs, ap, pc: [ 

1480 mapped_type.mapper(x, ap, pc) for x in xs 

1481 ] 

1482 

1483 raise ValueError( 

1484 f'Unsupported type normalization for "{attribute}": Cannot automatically map/narrow' 

1485 f" {input_type} to {target_type}" 

1486 ) 
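# NOTE (illustrative summary of _type_normalize above): it returns None when the
# input and target types already match, otherwise a mapper callable, roughly:
#
#   X            -> List[X]      : wrap_into_list
#   Source       -> Target       : mapped.mapper (registered TypeMapping)
#   Source       -> List[Target] : one-element list around mapped.mapper
#   List[Source] -> List[Target] : mapped.mapper applied per element
#   Union[...]   -> ...          : delegated to _union_narrowing
#
# Anything else raises ValueError when the parser is generated, not when a
# manifest is parsed.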

1487 

1488 def _strip_mapped_types( 

1489 self, orig_td: Any, parsing_typed_dict_attribute: bool 

1490 ) -> Any: 

1491 m = self._registered_types.get(orig_td) 

1492 if m is not None: 

1493 return m.source_type 

1494 _, v, args = unpack_type(orig_td, parsing_typed_dict_attribute) 

1495 if v == list: 

1496 arg = args[0] 

1497 m = self._registered_types.get(arg) 

1498 if m: 

1499 return List[m.source_type] # type: ignore 

1500 if v == Union: 

1501 stripped_args = tuple( 

1502 self._strip_mapped_types(x, parsing_typed_dict_attribute) for x in args 

1503 ) 

1504 if stripped_args != args: 

1505 return Union[stripped_args] 

1506 return orig_td 
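# NOTE (illustrative example): assuming a TypeMapping is registered from str to
# some target type T, _strip_mapped_types rewrites
#   T                 -> str
#   List[T]           -> List[str]
#   Union[T, List[T]] -> Union[str, List[str]]
# and returns anything without a registered mapping unchanged.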

1507 

1508 

1509def _sort_key(attr: StandardParserAttributeDocumentation) -> Any: 

1510 key = next(iter(attr.attributes)) 

1511 return attr.sort_category, key 

1512 

1513 

1514def _apply_std_docs( 

1515 std_doc_table: Optional[ 

1516 Mapping[Type[Any], Sequence[StandardParserAttributeDocumentation]] 

1517 ], 

1518 source_format_typed_dict: Type[Any], 

1519 attribute_docs: Optional[Sequence[ParserAttributeDocumentation]], 

1520) -> Optional[Sequence[ParserAttributeDocumentation]]: 

1521 if std_doc_table is None or not std_doc_table: 1521 ↛ 1524line 1521 didn't jump to line 1524 because the condition on line 1521 was always true

1522 return attribute_docs 

1523 

1524 has_docs_for = set() 

1525 if attribute_docs: 

1526 for attribute_doc in attribute_docs: 

1527 has_docs_for.update(attribute_doc.attributes) 

1528 

1529 base_seen = set() 

1530 std_docs_used = [] 

1531 

1532 remaining_bases = set(getattr(source_format_typed_dict, "__orig_bases__", [])) 

1533 base_seen.update(remaining_bases) 

1534 while remaining_bases: 

1535 base = remaining_bases.pop() 

1536 new_bases_to_check = { 

1537 x for x in getattr(base, "__orig_bases__", []) if x not in base_seen 

1538 } 

1539 remaining_bases.update(new_bases_to_check) 

1540 base_seen.update(new_bases_to_check) 

1541 std_docs = std_doc_table.get(base) 

1542 if std_docs: 

1543 for std_doc in std_docs: 

1544 if any(a in has_docs_for for a in std_doc.attributes): 

1545 # If there is any overlap, do not add the docs 

1546 continue 

1547 has_docs_for.update(std_doc.attributes) 

1548 std_docs_used.append(std_doc) 

1549 

1550 if not std_docs_used: 

1551 return attribute_docs 

1552 docs = sorted(std_docs_used, key=_sort_key) 

1553 if attribute_docs: 

1554 # Plugin provided attributes first 

1555 c = list(attribute_docs) 

1556 c.extend(docs) 

1557 docs = c 

1558 return tuple(docs) 
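# NOTE (summary of _apply_std_docs above): plugin-provided attribute docs are
# kept first in their original order; standard docs inherited via the source
# TypedDict's __orig_bases__ are appended afterwards, sorted by
# (sort_category, first attribute name), and any standard doc that overlaps an
# already documented attribute is skipped rather than merged.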

1559 

1560 

1561def _verify_and_auto_correct_inline_reference_documentation( 

1562 parsed_content: Type[TD], 

1563 source_typed_dict: Type[Any], 

1564 source_content_attributes: Mapping[str, AttributeDescription], 

1565 inline_reference_documentation: Optional[ParserDocumentation], 

1566 has_alt_form: bool, 

1567 automatic_docs: Optional[ 

1568 Mapping[Type[Any], Sequence[StandardParserAttributeDocumentation]] 

1569 ] = None, 

1570) -> Optional[ParserDocumentation]: 

1571 orig_attribute_docs = ( 

1572 inline_reference_documentation.attribute_doc 

1573 if inline_reference_documentation 

1574 else None 

1575 ) 

1576 attribute_docs = _apply_std_docs( 

1577 automatic_docs, 

1578 source_typed_dict, 

1579 orig_attribute_docs, 

1580 ) 

1581 if inline_reference_documentation is None and attribute_docs is None: 

1582 return None 

1583 changes = {} 

1584 if attribute_docs: 

1585 seen = set() 

1586 had_any_custom_docs = False 

1587 for attr_doc in attribute_docs: 

1588 if not isinstance(attr_doc, StandardParserAttributeDocumentation): 

1589 had_any_custom_docs = True 

1590 for attr_name in attr_doc.attributes: 

1591 attr = source_content_attributes.get(attr_name) 

1592 if attr is None: 1592 ↛ 1593line 1592 didn't jump to line 1593 because the condition on line 1592 was never true

1593 raise ValueError( 

1594 f"The inline_reference_documentation for the source format of {parsed_content.__qualname__}" 

1595 f' references an attribute "{attr_name}", which does not exist in the source format.' 

1596 ) 

1597 if attr_name in seen: 1597 ↛ 1598line 1597 didn't jump to line 1598 because the condition on line 1597 was never true

1598 raise ValueError( 

1599 f"The inline_reference_documentation for the source format of {parsed_content.__qualname__}" 

1600 f' has documentation for "{attr_name}" twice, which is not supported.' 

1601 f" Please document it at most once" 

1602 ) 

1603 seen.add(attr_name) 

1604 undocumented = source_content_attributes.keys() - seen 

1605 if undocumented: 1605 ↛ 1606line 1605 didn't jump to line 1606 because the condition on line 1605 was never true

1606 if had_any_custom_docs: 

1607 undocumented_attrs = ", ".join(undocumented) 

1608 raise ValueError( 

1609 f"The following attributes were not documented for the source format of" 

1610 f" {parsed_content.__qualname__}. If this is deliberate, then please" 

1611 ' declare each of them as undocumented (via undocumented_attr("foo")):' 

1612 f" {undocumented_attrs}" 

1613 ) 

1614 combined_docs = list(attribute_docs) 

1615 combined_docs.extend(undocumented_attr(a) for a in sorted(undocumented)) 

1616 attribute_docs = combined_docs 

1617 

1618 if attribute_docs and orig_attribute_docs != attribute_docs: 1618 ↛ 1619line 1618 didn't jump to line 1619 because the condition on line 1618 was never true

1619 assert attribute_docs is not None 

1620 changes["attribute_doc"] = tuple(attribute_docs) 

1621 

1622 if ( 1622 ↛ 1627line 1622 didn't jump to line 1627 because the condition on line 1622 was never true

1623 inline_reference_documentation is not None 

1624 and inline_reference_documentation.alt_parser_description 

1625 and not has_alt_form 

1626 ): 

1627 raise ValueError( 

1628 "The inline_reference_documentation had documentation for an non-mapping format," 

1629 " but the source format does not have a non-mapping format." 

1630 ) 

1631 if changes: 1631 ↛ 1632line 1631 didn't jump to line 1632 because the condition on line 1631 was never true

1632 if inline_reference_documentation is None: 

1633 inline_reference_documentation = reference_documentation() 

1634 return inline_reference_documentation.replace(**changes) 

1635 return inline_reference_documentation 
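# NOTE (behavioural sketch of the function above): when only standard/automatic
# docs are in play, every source attribute that is still undocumented is
# auto-declared via undocumented_attr(...); as soon as any custom attribute doc
# is present, leftover undocumented attributes are treated as an error so a
# plugin author cannot silently forget one.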

1636 

1637 

1638def _check_conflicts( 

1639 input_content_attributes: Dict[str, AttributeDescription], 

1640 required_attributes: FrozenSet[str], 

1641 all_attributes: FrozenSet[str], 

1642) -> None: 

1643 for attr_name, attr in input_content_attributes.items(): 

1644 if attr_name in required_attributes and attr.conflicting_attributes: 1644 ↛ 1645line 1644 didn't jump to line 1645 because the condition on line 1644 was never true

1645 c = ", ".join(repr(a) for a in attr.conflicting_attributes) 

1646 raise ValueError( 

1647 f'The attribute "{attr_name}" is required and conflicts with the attributes: {c}.' 

1648 " This makes it impossible to use these attributes. Either remove the attributes" 

1649 f' (along with the conflicts for them), adjust the conflicts or make "{attr_name}"' 

1650 " optional (NotRequired)" 

1651 ) 

1652 else: 

1653 required_conflicts = attr.conflicting_attributes & required_attributes 

1654 if required_conflicts: 1654 ↛ 1655line 1654 didn't jump to line 1655 because the condition on line 1654 was never true

1655 c = ", ".join(repr(a) for a in required_conflicts) 

1656 raise ValueError( 

1657 f'The attribute "{attr_name}" conflicts with the following *required* attributes: {c}.' 

1658 f' This makes it impossible to use the "{attr_name}" attribute. Either remove it,' 

1659 f" adjust the conflicts or make the listed attributes optional (NotRequired)" 

1660 ) 

1661 unknown_attributes = attr.conflicting_attributes - all_attributes 

1662 if unknown_attributes: 1662 ↛ 1663line 1662 didn't jump to line 1663 because the condition on line 1662 was never true

1663 c = ", ".join(repr(a) for a in unknown_attributes) 

1664 raise ValueError( 

1665 f'The attribute "{attr_name}" declares a conflict with the following unknown attributes: {c}.' 

1666 f" None of these attributes were declared in the input." 

1667 ) 
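# NOTE (hypothetical example of what _check_conflicts rejects): a required
# attribute "sources" plus an optional attribute "source" that declares a
# conflict with "sources". Since "sources" is always present, "source" could
# never be used, so a ValueError is raised when the parser is generated instead
# of surfacing as a confusing manifest error later.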

1668 

1669 

1670def _check_attributes( 

1671 content: Type[TypedDict], 

1672 input_content: Type[TypedDict], 

1673 input_content_attributes: Dict[str, AttributeDescription], 

1674 sources: Mapping[str, Collection[str]], 

1675) -> None: 

1676 target_required_keys = content.__required_keys__ 

1677 input_required_keys = input_content.__required_keys__ 

1678 all_input_keys = input_required_keys | input_content.__optional_keys__ 

1679 

1680 for input_name in all_input_keys: 

1681 attr = input_content_attributes[input_name] 

1682 target_name = attr.target_attribute 

1683 source_names = sources[target_name] 

1684 input_is_required = input_name in input_required_keys 

1685 target_is_required = target_name in target_required_keys 

1686 

1687 assert source_names 

1688 

1689 if input_is_required and len(source_names) > 1: 1689 ↛ 1690line 1689 didn't jump to line 1690 because the condition on line 1689 was never true

1690 raise ValueError( 

1691 f'The source attribute "{input_name}" is required, but it maps to "{target_name}",' 

1692 f' which has multiple sources "{source_names}". If "{input_name}" should be required,' 

1693 f' then there is no need for additional sources for "{target_name}". Alternatively,' 

1694 f' "{input_name}" might be missing a NotRequired type' 

1695 f' (example: "{input_name}: NotRequired[<OriginalTypeHere>]")' 

1696 ) 

1697 if not input_is_required and target_is_required and len(source_names) == 1: 1697 ↛ 1698line 1697 didn't jump to line 1698 because the condition on line 1697 was never true

1698 raise ValueError( 

1699 f'The source attribute "{input_name}" is not marked as required and maps to' 

1700 f' "{target_name}", which is marked as required. As there are no other attributes' 

1701 f' mapping to "{target_name}", "{input_name}" must be required as well' 

1702 f' ("{input_name}: Required[<Type>]"). Alternatively, "{target_name}" should be optional' 

1703 f' ("{target_name}: NotRequired[<Type>]") or an "MappingHint.aliasOf" might be missing.' 

1704 ) 
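# NOTE (hypothetical example of the second check above): if the target format
# requires "documentation" and the only source attribute mapping to it is an
# optional "docs" alias, nothing guarantees "documentation" ever gets a value,
# so a ValueError is raised at parser-generation time.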

1705 

1706 

1707def _validation_type_error(path: AttributePath, message: str) -> None: 

1708 raise ManifestParseException( 

1709 f'The attribute "{path.path}" did not have a valid structure/type: {message}' 

1710 ) 

1711 

1712 

1713def _is_two_arg_x_list_x(t_args: Tuple[Any, ...]) -> bool: 

1714 if len(t_args) != 2: 

1715 return False 

1716 lhs, rhs = t_args 

1717 if get_origin(lhs) == list: 

1718 if get_origin(rhs) == list: 1718 ↛ 1721line 1718 didn't jump to line 1721 because the condition on line 1718 was never true

1719 # It could still match X, List[X] - but we do not allow this case for now as the caller 

1720 # does not support it. 

1721 return False 

1722 l_args = get_args(lhs) 

1723 return bool(l_args and l_args[0] == rhs) 

1724 if get_origin(rhs) == list: 

1725 r_args = get_args(rhs) 

1726 return bool(r_args and r_args[0] == lhs) 

1727 return False 
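# NOTE (illustrative results of the predicate above):
#   _is_two_arg_x_list_x(get_args(Union[str, List[str]]))  -> True
#   _is_two_arg_x_list_x(get_args(Union[List[str], str]))  -> True
#   _is_two_arg_x_list_x(get_args(Union[str, List[int]]))  -> False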

1728 

1729 

1730def _extract_typed_dict( 

1731 base_type, 

1732 default_target_attribute: Optional[str], 

1733) -> Tuple[Optional[Type[TypedDict]], Any]: 

1734 if is_typeddict(base_type): 

1735 return base_type, None 

1736 _, origin, args = unpack_type(base_type, False) 

1737 if origin != Union: 

1738 if isinstance(base_type, type) and issubclass(base_type, (dict, Mapping)): 1738 ↛ 1739line 1738 didn't jump to line 1739 because the condition on line 1738 was never true

1739 raise ValueError( 

1740 "The source_format cannot be nor contain a (non-TypedDict) dict" 

1741 ) 

1742 return None, base_type 

1743 typed_dicts = [x for x in args if is_typeddict(x)] 

1744 if len(typed_dicts) > 1: 1744 ↛ 1745line 1744 didn't jump to line 1745 because the condition on line 1744 was never true

1745 raise ValueError( 

1746 "When source_format is a Union, it must contain at most one TypedDict" 

1747 ) 

1748 typed_dict = typed_dicts[0] if typed_dicts else None 

1749 

1750 if any(x is None or x is _NONE_TYPE for x in args): 1750 ↛ 1751line 1750 didn't jump to line 1751 because the condition on line 1750 was never true

1751 raise ValueError( 

1752 "The source_format cannot be nor contain Optional[X] or Union[X, None]" 

1753 ) 

1754 

1755 if any( 1755 ↛ 1760line 1755 didn't jump to line 1760 because the condition on line 1755 was never true

1756 isinstance(x, type) and issubclass(x, (dict, Mapping)) 

1757 for x in args 

1758 if x is not typed_dict 

1759 ): 

1760 raise ValueError( 

1761 "The source_format cannot be nor contain a (non-TypedDict) dict" 

1762 ) 

1763 remaining = [x for x in args if x is not typed_dict] 

1764 has_target_attribute = False 

1765 anno = None 

1766 if len(remaining) == 1: 1766 ↛ 1767line 1766 didn't jump to line 1767 because the condition on line 1766 was never true

1767 base_type, anno, _ = _parse_type( 

1768 "source_format alternative form", 

1769 remaining[0], 

1770 forbid_optional=True, 

1771 parsing_typed_dict_attribute=False, 

1772 ) 

1773 has_target_attribute = bool(anno) and any( 

1774 isinstance(x, TargetAttribute) for x in anno 

1775 ) 

1776 target_type = base_type 

1777 else: 

1778 target_type = Union[tuple(remaining)] 

1779 

1780 if default_target_attribute is None and not has_target_attribute: 1780 ↛ 1781line 1780 didn't jump to line 1781 because the condition on line 1780 was never true

1781 raise ValueError( 

1782 'The alternative format must be Union[TypedDict, Annotated[X, DebputyParseHint.target_attribute("...")]]' 

1783 " OR the parsed_content format must have exactly one attribute that is required." 

1784 ) 

1785 if anno: 1785 ↛ 1786line 1785 didn't jump to line 1786 because the condition on line 1785 was never true

1786 final_anno = [target_type] 

1787 final_anno.extend(anno) 

1788 return typed_dict, Annotated[tuple(final_anno)] 

1789 return typed_dict, target_type 
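# NOTE (illustrative, with a hypothetical SomeTypedDict and target attribute):
#   _extract_typed_dict(SomeTypedDict, None)                             -> (SomeTypedDict, None)
#   _extract_typed_dict(Union[SomeTypedDict, str], "sources")            -> (SomeTypedDict, str)
#   _extract_typed_dict(Union[SomeTypedDict, Optional[str]], "sources")  raises ValueError
# i.e. the TypedDict (if any) is split off and the remainder becomes the
# alternative (non-mapping) form.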

1790 

1791 

1792def _dispatch_parse_generator( 

1793 dispatch_type: Type[DebputyDispatchableType], 

1794) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]: 

1795 def _dispatch_parse( 

1796 value: Any, 

1797 attribute_path: AttributePath, 

1798 parser_context: Optional["ParserContextData"], 

1799 ): 

1800 assert parser_context is not None 

1801 dispatching_parser = parser_context.dispatch_parser_table_for(dispatch_type) 

1802 return dispatching_parser.parse_input( 

1803 value, attribute_path, parser_context=parser_context 

1804 ) 

1805 

1806 return _dispatch_parse 
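# NOTE (summary): the generated closure defers the actual parsing to whichever
# dispatching parser the active ParserContextData has registered for
# `dispatch_type`, so the concrete rule is chosen at manifest parse time rather
# than when this parser is generated.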

1807 

1808 

1809def _dispatch_parser( 

1810 dispatch_type: Type[DebputyDispatchableType], 

1811) -> AttributeTypeHandler: 

1812 return AttributeTypeHandler( 

1813 dispatch_type.__name__, 

1814 lambda *a: None, 

1815 mapper=_dispatch_parse_generator(dispatch_type), 

1816 ) 

1817 

1818 

1819def _parse_type( 

1820 attribute: str, 

1821 orig_td: Any, 

1822 forbid_optional: bool = True, 

1823 parsing_typed_dict_attribute: bool = True, 

1824) -> Tuple[Any, Tuple[Any, ...], bool]: 

1825 td, v, args = unpack_type(orig_td, parsing_typed_dict_attribute) 

1826 md: Tuple[Any, ...] = tuple() 

1827 optional = False 

1828 if v is not None: 

1829 if v == Annotated: 

1830 anno = get_args(td) 

1831 md = anno[1:] 

1832 td, v, args = unpack_type(anno[0], parsing_typed_dict_attribute) 

1833 

1834 if td is _NONE_TYPE: 1834 ↛ 1835line 1834 didn't jump to line 1835 because the condition on line 1834 was never true

1835 raise ValueError( 

1836 f'The attribute "{attribute}" resolved to type "None". "Nil" / "None" fields are not allowed in the' 

1837 " debputy manifest, so this attribute does not make sense in its current form." 

1838 ) 

1839 if forbid_optional and v == Union and any(a is _NONE_TYPE for a in args): 1839 ↛ 1840line 1839 didn't jump to line 1840 because the condition on line 1839 was never true

1840 raise ValueError( 

1841 f'Detected use of Optional in "{attribute}", which is not allowed here.' 

1842 " Please use NotRequired for optional fields" 

1843 ) 

1844 

1845 return td, md, optional 
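# NOTE (illustrative behaviour of _parse_type, with a hypothetical SomeHint
# annotation; exact tuples depend on unpack_type):
#   _parse_type("attr", str)                        -> (str, (), False)
#   _parse_type("attr", Annotated[str, SomeHint()]) -> (str, (SomeHint(),), False)
#   _parse_type("attr", Optional[str])              raises ValueError
# Optional[...] is rejected deliberately; optional manifest attributes must use
# NotRequired on the TypedDict instead.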

1846 

1847 

1848def _normalize_attribute_name(attribute: str) -> str: 

1849 if attribute.endswith("_"): 

1850 attribute = attribute[:-1] 

1851 return attribute.replace("_", "-") 
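# NOTE (examples of the normalization above): a trailing underscore, typically
# used in the Python TypedDicts to dodge keywords/builtins, is stripped, and the
# remaining underscores become dashes for the YAML manifest:
#   _normalize_attribute_name("install_docs") -> "install-docs"
#   _normalize_attribute_name("as_")          -> "as"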

1852 

1853 

1854@dataclasses.dataclass 

1855class DetectedDebputyParseHint: 

1856 target_attribute: str 

1857 source_manifest_attribute: Optional[str] 

1858 conflict_with_source_attributes: FrozenSet[str] 

1859 conditional_required: Optional[ConditionalRequired] 

1860 applicable_as_path_hint: bool 

1861 

1862 @classmethod 

1863 def parse_annotations( 

1864 cls, 

1865 anno: Tuple[Any, ...], 

1866 error_context: str, 

1867 default_attribute_name: Optional[str], 

1868 is_required: bool, 

1869 default_target_attribute: Optional[str] = None, 

1870 allow_target_attribute_annotation: bool = False, 

1871 allow_source_attribute_annotations: bool = False, 

1872 ) -> "DetectedDebputyParseHint": 

1873 target_attr_anno = find_annotation(anno, TargetAttribute) 

1874 if target_attr_anno: 

1875 if not allow_target_attribute_annotation: 1875 ↛ 1876line 1875 didn't jump to line 1876 because the condition on line 1875 was never true

1876 raise ValueError( 

1877 f"The DebputyParseHint.target_attribute annotation is not allowed in this context.{error_context}" 

1878 ) 

1879 target_attribute = target_attr_anno.attribute 

1880 elif default_target_attribute is not None: 

1881 target_attribute = default_target_attribute 

1882 elif default_attribute_name is not None: 1882 ↛ 1885line 1882 didn't jump to line 1885 because the condition on line 1882 was always true

1883 target_attribute = default_attribute_name 

1884 else: 

1885 if default_attribute_name is None: 

1886 raise ValueError( 

1887 "allow_target_attribute_annotation must be True OR " 

1888 "default_attribute_name/default_target_attribute must be not None" 

1889 ) 

1890 raise ValueError( 

1891 f"Missing DebputyParseHint.target_attribute annotation.{error_context}" 

1892 ) 

1893 source_attribute_anno = find_annotation(anno, ManifestAttribute) 

1894 _source_attribute_allowed( 

1895 allow_source_attribute_annotations, error_context, source_attribute_anno 

1896 ) 

1897 if source_attribute_anno: 

1898 source_attribute_name = source_attribute_anno.attribute 

1899 elif default_attribute_name is not None: 

1900 source_attribute_name = _normalize_attribute_name(default_attribute_name) 

1901 else: 

1902 source_attribute_name = None 

1903 mutual_exclusive_with_anno = find_annotation(anno, ConflictWithSourceAttribute) 

1904 if mutual_exclusive_with_anno: 

1905 _source_attribute_allowed( 

1906 allow_source_attribute_annotations, 

1907 error_context, 

1908 mutual_exclusive_with_anno, 

1909 ) 

1910 conflicting_attributes = mutual_exclusive_with_anno.conflicting_attributes 

1911 else: 

1912 conflicting_attributes = frozenset() 

1913 conditional_required = find_annotation(anno, ConditionalRequired) 

1914 

1915 if conditional_required and is_required: 1915 ↛ 1916line 1915 didn't jump to line 1916 because the condition on line 1915 was never true

1916 if default_attribute_name is None: 

1917 raise ValueError( 

1918 f"is_required cannot be True without default_attribute_name being not None" 

1919 ) 

1920 raise ValueError( 

1921 f'The attribute "{default_attribute_name}" is Required while also being conditionally required.' 

1922 ' Please make the attribute "NotRequired" or remove the conditional requirement.' 

1923 ) 

1924 

1925 not_path_hint_anno = find_annotation(anno, NotPathHint) 

1926 applicable_as_path_hint = not_path_hint_anno is None 

1927 

1928 return DetectedDebputyParseHint( 

1929 target_attribute=target_attribute, 

1930 source_manifest_attribute=source_attribute_name, 

1931 conflict_with_source_attributes=conflicting_attributes, 

1932 conditional_required=conditional_required, 

1933 applicable_as_path_hint=applicable_as_path_hint, 

1934 ) 
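# NOTE (summary of the resolution order implemented above): an explicit
# DebputyParseHint.target_attribute annotation wins, then the caller-supplied
# default_target_attribute, then the attribute's own name; the manifest-facing
# name prefers an explicit ManifestAttribute annotation and otherwise falls back
# to _normalize_attribute_name(default_attribute_name). Conflicts, conditional
# requirements and the "not a path hint" marker are read from their respective
# annotations.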

1935 

1936 

1937def _source_attribute_allowed( 

1938 source_attribute_allowed: bool, 

1939 error_context: str, 

1940 annotation: Optional[DebputyParseHint], 

1941) -> None: 

1942 if source_attribute_allowed or annotation is None: 1942 ↛ 1944line 1942 didn't jump to line 1944 because the condition on line 1942 was always true

1943 return 

1944 raise ValueError( 

1945 f'The annotation "{annotation}" cannot be used here. {error_context}' 

1946 )