Coverage for src/debputy/manifest_parser/declarative_parser.py: 72%

797 statements  

coverage.py v7.8.2, created at 2026-02-14 10:41 +0000

1import collections 

2import dataclasses 

3import typing 

4from types import UnionType 

5from typing import ( 

6 Any, 

7 TypedDict, 

8 get_type_hints, 

9 Annotated, 

10 get_args, 

11 get_origin, 

12 TypeVar, 

13 Generic, 

14 Optional, 

15 cast, 

16 Type, 

17 Union, 

18 List, 

19 NotRequired, 

20 Literal, 

21 TYPE_CHECKING, 

22) 

23from collections.abc import Callable, Mapping, Collection, Iterable, Sequence, Container 

24 

25 

26from debputy.manifest_parser.base_types import FileSystemMatchRule 

27from debputy.manifest_parser.exceptions import ( 

28 ManifestParseException, 

29) 

30from debputy.manifest_parser.mapper_code import ( 

31 normalize_into_list, 

32 wrap_into_list, 

33 map_each_element, 

34) 

35from debputy.manifest_parser.parse_hints import ( 

36 ConditionalRequired, 

37 DebputyParseHint, 

38 TargetAttribute, 

39 ManifestAttribute, 

40 ConflictWithSourceAttribute, 

41 NotPathHint, 

42) 

43from debputy.manifest_parser.parser_data import ParserContextData 

44from debputy.manifest_parser.tagging_types import ( 

45 DebputyParsedContent, 

46 DebputyDispatchableType, 

47 TypeMapping, 

48) 

49from debputy.manifest_parser.util import ( 

50 AttributePath, 

51 unpack_type, 

52 find_annotation, 

53 check_integration_mode, 

54) 

55from debputy.plugin.api.impl_types import ( 

56 DeclarativeInputParser, 

57 TD, 

58 ListWrappedDeclarativeInputParser, 

59 DispatchingObjectParser, 

60 DispatchingTableParser, 

61 TTP, 

62 TP, 

63 InPackageContextParser, 

64) 

65from debputy.plugin.api.spec import ( 

66 ParserDocumentation, 

67 DebputyIntegrationMode, 

68 StandardParserAttributeDocumentation, 

69 undocumented_attr, 

70 ParserAttributeDocumentation, 

71 reference_documentation, 

72) 

73from debputy.util import _info, _warn, assume_not_none 

74 

75 

76if TYPE_CHECKING: 

77 from debputy.lsp.diagnostics import LintSeverity 

78 

79 
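# Optional typo-detection support: `_WARN_ONCE` is None when python3-levenshtein is available
# (typo detection enabled); it is False when the import fails, in which case the first lookup
# logs a hint to install the package and detection stays disabled.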

80try: 

81 from Levenshtein import distance 

82 

83 _WARN_ONCE: bool | None = None 

84except ImportError: 

85 _WARN_ONCE = False 

86 

87 

88def _detect_possible_typo( 

89 key: str, 

90 value: object, 

91 manifest_attributes: Mapping[str, "AttributeDescription"], 

92 path: "AttributePath", 

93) -> None: 

94 global _WARN_ONCE 

95 if _WARN_ONCE is False: 

96 _WARN_ONCE = True 

97 _info( 

98 "Install python3-levenshtein to have debputy try to detect typos in the manifest." 

99 ) 

100 elif _WARN_ONCE is None: 

101 k_len = len(key) 

102 key_path = path[key] 

103 matches: list[str] = [] 

104 current_match_strength = 0 

105 for acceptable_key, attr in manifest_attributes.items(): 

106 if abs(k_len - len(acceptable_key)) > 2: 

107 continue 

108 d = distance(key, acceptable_key) 

109 if d > 2: 

110 continue 

111 try: 

112 attr.type_validator.ensure_type(value, key_path) 

113 except ManifestParseException: 

114 if attr.type_validator.base_type_match(value): 

115 match_strength = 1 

116 else: 

117 match_strength = 0 

118 else: 

119 match_strength = 2 

120 

121 if match_strength < current_match_strength: 

122 continue 

123 if match_strength > current_match_strength: 

124 current_match_strength = match_strength 

125 matches.clear() 

126 matches.append(acceptable_key) 

127 

128 if not matches: 

129 return 

130 ref = f'at "{path.path}"' if path else "at the manifest root level" 

131 if len(matches) == 1: 

132 possible_match = repr(matches[0]) 

133 _warn( 

134 f'Possible typo: The key "{key}" {ref} should probably have been {possible_match}' 

135 ) 

136 else: 

137 matches.sort() 

138 possible_matches = ", ".join(repr(a) for a in matches) 

139 _warn( 

140 f'Possible typo: The key "{key}" {ref} should probably have been one of {possible_matches}' 

141 ) 

142 

143 

144SF = TypeVar("SF") 

145T = TypeVar("T") 

146S = TypeVar("S") 

147 

148 

149_NONE_TYPE = type(None) 

150 

151 

152# These must be able to appear in an "isinstance" check and must be builtin types. 

153BASIC_SIMPLE_TYPES = { 

154 str: "string", 

155 int: "integer", 

156 bool: "boolean", 

157} 

158 

159 
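# Handles a single attribute: validates the raw manifest value against the declared type and,
# when a mapper is present, converts it into the parsed representation.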

160class AttributeTypeHandler: 

161 __slots__ = ("_description", "_ensure_type", "base_type", "mapper") 

162 

163 def __init__( 

164 self, 

165 description: str, 

166 ensure_type: Callable[[Any, AttributePath], None], 

167 *, 

168 base_type: type[Any] | None = None, 

169 mapper: None | ( 

170 Callable[[Any, AttributePath, Optional["ParserContextData"]], Any] 

171 ) = None, 

172 ) -> None: 

173 self._description = description 

174 self._ensure_type = ensure_type 

175 self.base_type = base_type 

176 self.mapper = mapper 

177 

178 def describe_type(self) -> str: 

179 return self._description 

180 

181 def ensure_type(self, obj: object, path: AttributePath) -> None: 

182 self._ensure_type(obj, path) 

183 

184 def base_type_match(self, obj: object) -> bool: 

185 base_type = self.base_type 

186 return base_type is not None and isinstance(obj, base_type) 

187 

188 def map_type( 

189 self, 

190 value: Any, 

191 path: AttributePath, 

192 parser_context: Optional["ParserContextData"], 

193 ) -> Any: 

194 mapper = self.mapper 

195 if mapper is not None: 

196 return mapper(value, path, parser_context) 

197 return value 

198 

199 def combine_mapper( 

200 self, 

201 mapper: None | ( 

202 Callable[[Any, AttributePath, Optional["ParserContextData"]], Any] 

203 ), 

204 ) -> "AttributeTypeHandler": 

205 if mapper is None: 

206 return self 

207 _combined_mapper: Callable[ 

208 [Any, AttributePath, Optional["ParserContextData"]], Any 

209 ] 

210 if self.mapper is not None: 

211 m = self.mapper 

212 

213 def _combined_mapper( 

214 value: Any, 

215 path: AttributePath, 

216 parser_context: Optional["ParserContextData"], 

217 ) -> Any: 

218 return mapper(m(value, path, parser_context), path, parser_context) 

219 

220 else: 

221 _combined_mapper = mapper 

222 

223 return AttributeTypeHandler( 

224 self._description, 

225 self._ensure_type, 

226 base_type=self.base_type, 

227 mapper=_combined_mapper, 

228 ) 

229 

230 

231@dataclasses.dataclass(slots=True) 

232class AttributeDescription: 

233 source_attribute_name: str 

234 target_attribute: str 

235 attribute_type: Any 

236 type_validator: AttributeTypeHandler 

237 annotations: tuple[Any, ...] 

238 conflicting_attributes: frozenset[str] 

239 conditional_required: Optional["ConditionalRequired"] 

240 parse_hints: Optional["DetectedDebputyParseHint"] = None 

241 is_optional: bool = False 

242 

243 

244def _extract_path_hint(v: Any, attribute_path: AttributePath) -> bool: 

245 if attribute_path.path_hint is not None: 245 ↛ 246 (line 245 didn't jump to line 246 because the condition on line 245 was never true)

246 return True 

247 if isinstance(v, str): 

248 attribute_path.path_hint = v 

249 return True 

250 elif isinstance(v, list) and len(v) > 0 and isinstance(v[0], str): 

251 attribute_path.path_hint = v[0] 

252 return True 

253 return False 

254 

255 

256@dataclasses.dataclass(slots=True, frozen=True) 

257class DeclarativeNonMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]): 

258 alt_form_parser: AttributeDescription 

259 inline_reference_documentation: ParserDocumentation | None = None 

260 expected_debputy_integration_mode: Container[DebputyIntegrationMode] | None = None 

261 

262 def parse_input( 

263 self, 

264 value: object, 

265 path: AttributePath, 

266 *, 

267 parser_context: Optional["ParserContextData"] = None, 

268 ) -> TD: 

269 check_integration_mode( 

270 path, 

271 parser_context, 

272 self.expected_debputy_integration_mode, 

273 ) 

274 if self.reference_documentation_url is not None: 

275 doc_ref = f" (Documentation: {self.reference_documentation_url})" 

276 else: 

277 doc_ref = "" 

278 

279 alt_form_parser = self.alt_form_parser 

280 if value is None: 280 ↛ 281 (line 280 didn't jump to line 281 because the condition on line 280 was never true)

281 form_note = f" The value must have type: {alt_form_parser.type_validator.describe_type()}" 

282 if self.reference_documentation_url is not None: 

283 doc_ref = f" Please see {self.reference_documentation_url} for the documentation." 

284 raise ManifestParseException( 

285 f"The attribute {path.path} was missing a value. {form_note}{doc_ref}" 

286 ) 

287 _extract_path_hint(value, path) 

288 alt_form_parser.type_validator.ensure_type(value, path) 

289 attribute = alt_form_parser.target_attribute 

290 alias_mapping = { 

291 attribute: ("", None), 

292 } 

293 v = alt_form_parser.type_validator.map_type(value, path, parser_context) 

294 path.alias_mapping = alias_mapping 

295 return cast("TD", {attribute: v}) 

296 

297 

298@dataclasses.dataclass(slots=True) 

299class DeclarativeMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]): 

300 input_time_required_parameters: frozenset[str] 

301 all_parameters: frozenset[str] 

302 manifest_attributes: Mapping[str, "AttributeDescription"] 

303 source_attributes: Mapping[str, "AttributeDescription"] 

304 at_least_one_of: frozenset[frozenset[str]] 

305 alt_form_parser: AttributeDescription | None 

306 mutually_exclusive_attributes: frozenset[frozenset[str]] = frozenset() 

307 _per_attribute_conflicts_cache: Mapping[str, frozenset[str]] | None = None 

308 inline_reference_documentation: ParserDocumentation | None = None 

309 path_hint_source_attributes: Sequence[str] = tuple() 

310 expected_debputy_integration_mode: Container[DebputyIntegrationMode] | None = None 

311 

312 def _parse_alt_form( 

313 self, 

314 value: object, 

315 path: AttributePath, 

316 *, 

317 parser_context: Optional["ParserContextData"] = None, 

318 ) -> TD: 

319 alt_form_parser = self.alt_form_parser 

320 if alt_form_parser is None: 320 ↛ 321 (line 320 didn't jump to line 321 because the condition on line 320 was never true)

321 raise ManifestParseException( 

322 f"The attribute {path.path} must be a mapping.{self._doc_url_error_suffix()}" 

323 ) 

324 _extract_path_hint(value, path) 

325 alt_form_parser.type_validator.ensure_type(value, path) 

326 assert ( 

327 value is not None 

328 ), "The alternative form was None, but the parser should have rejected None earlier." 

329 attribute = alt_form_parser.target_attribute 

330 alias_mapping = { 

331 attribute: ("", None), 

332 } 

333 v = alt_form_parser.type_validator.map_type(value, path, parser_context) 

334 path.alias_mapping = alias_mapping 

335 return cast("TD", {attribute: v}) 

336 

337 def _validate_expected_keys( 

338 self, 

339 value: dict[Any, Any], 

340 path: AttributePath, 

341 *, 

342 parser_context: Optional["ParserContextData"] = None, 

343 ) -> None: 

344 unknown_keys = value.keys() - self.all_parameters 

345 doc_ref = self._doc_url_error_suffix() 

346 if unknown_keys: 346 ↛ 347 (line 346 didn't jump to line 347 because the condition on line 346 was never true)

347 for k in unknown_keys: 

348 if isinstance(k, str): 

349 _detect_possible_typo(k, value[k], self.manifest_attributes, path) 

350 unused_keys = self.all_parameters - value.keys() 

351 if unused_keys: 

352 k = ", ".join(unused_keys) 

353 raise ManifestParseException( 

354 f'Unknown keys "{unknown_keys}" at {path.path_container_lc}. Keys that could be used here are: {k}.{doc_ref}' 

355 ) 

356 raise ManifestParseException( 

357 f'Unknown keys "{unknown_keys}" at {path.path_container_lc}. Please remove them.{doc_ref}' 

358 ) 

359 missing_keys = self.input_time_required_parameters - value.keys() 

360 if missing_keys: 

361 required = ", ".join(repr(k) for k in sorted(missing_keys)) 

362 raise ManifestParseException( 

363 f"The following keys were required but not present at {path.path_container_lc}: {required}{doc_ref}" 

364 ) 

365 for maybe_required in self.all_parameters - value.keys(): 

366 attr = self.manifest_attributes[maybe_required] 

367 assert attr.conditional_required is None or parser_context is not None 

368 if ( 368 ↛ 374 (line 368 didn't jump to line 374 because the condition on line 368 was never true)

369 attr.conditional_required is not None 

370 and attr.conditional_required.condition_applies( 

371 assume_not_none(parser_context) 

372 ) 

373 ): 

374 reason = attr.conditional_required.reason 

375 raise ManifestParseException( 

376 f'Missing the *conditionally* required attribute "{maybe_required}" at {path.path_container_lc}. {reason}{doc_ref}' 

377 ) 

378 for keyset in self.at_least_one_of: 

379 matched_keys = value.keys() & keyset 

380 if not matched_keys: 380 ↛ 381 (line 380 didn't jump to line 381 because the condition on line 380 was never true)

381 conditionally_required = ", ".join(repr(k) for k in sorted(keyset)) 

382 raise ManifestParseException( 

383 f"At least one of the following keys must be present at {path.path_container_lc}:" 

384 f" {conditionally_required}{doc_ref}" 

385 ) 

386 for group in self.mutually_exclusive_attributes: 

387 matched = value.keys() & group 

388 if len(matched) > 1: 388 ↛ 389 (line 388 didn't jump to line 389 because the condition on line 388 was never true)

389 ck = ", ".join(repr(k) for k in sorted(matched)) 

390 raise ManifestParseException( 

391 f"Could not parse {path.path_container_lc}: The following attributes are" 

392 f" mutually exclusive: {ck}{doc_ref}" 

393 ) 

394 

395 def _parse_typed_dict_form( 

396 self, 

397 value: dict[Any, Any], 

398 path: AttributePath, 

399 *, 

400 parser_context: Optional["ParserContextData"] = None, 

401 ) -> TD: 

402 self._validate_expected_keys(value, path, parser_context=parser_context) 

403 result = {} 

404 per_attribute_conflicts = self._per_attribute_conflicts() 

405 alias_mapping = {} 

406 for path_hint_source_attributes in self.path_hint_source_attributes: 

407 v = value.get(path_hint_source_attributes) 

408 if v is not None and _extract_path_hint(v, path): 

409 break 

410 for k, v in value.items(): 

411 attr = self.manifest_attributes[k] 

412 matched = value.keys() & per_attribute_conflicts[k] 

413 if matched: 413 ↛ 414 (line 413 didn't jump to line 414 because the condition on line 413 was never true)

414 ck = ", ".join(repr(k) for k in sorted(matched)) 

415 raise ManifestParseException( 

416 f'The attribute "{k}" at {path.path} cannot be used with the following' 

417 f" attributes: {ck}{self._doc_url_error_suffix()}" 

418 ) 

419 nk = attr.target_attribute 

420 key_path = path[k] 

421 attr.type_validator.ensure_type(v, key_path) 

422 if v is None: 422 ↛ 423 (line 422 didn't jump to line 423 because the condition on line 422 was never true)

423 continue 

424 if k != nk: 

425 alias_mapping[nk] = k, None 

426 v = attr.type_validator.map_type(v, key_path, parser_context) 

427 result[nk] = v 

428 if alias_mapping: 

429 path.alias_mapping = alias_mapping 

430 return cast("TD", result) 

431 

432 def _doc_url_error_suffix(self, *, see_url_version: bool = False) -> str: 

433 doc_url = self.reference_documentation_url 

434 if doc_url is not None: 

435 if see_url_version: 435 ↛ 436 (line 435 didn't jump to line 436 because the condition on line 435 was never true)

436 return f" Please see {doc_url} for the documentation." 

437 return f" (Documentation: {doc_url})" 

438 return "" 

439 

440 def parse_input( 

441 self, 

442 value: object, 

443 path: AttributePath, 

444 *, 

445 parser_context: Optional["ParserContextData"] = None, 

446 ) -> TD: 

447 check_integration_mode( 

448 path, 

449 parser_context, 

450 self.expected_debputy_integration_mode, 

451 ) 

452 if value is None: 452 ↛ 453 (line 452 didn't jump to line 453 because the condition on line 452 was never true)

453 form_note = " The attribute must be a mapping." 

454 if self.alt_form_parser is not None: 

455 form_note = ( 

456 " The attribute can be a mapping or a non-mapping format" 

457 ' (usually, "non-mapping format" means a string or a list of strings).' 

458 ) 

459 doc_ref = self._doc_url_error_suffix(see_url_version=True) 

460 raise ManifestParseException( 

461 f"The attribute {path.path} was missing a value. {form_note}{doc_ref}" 

462 ) 

463 

464 if not isinstance(value, dict): 

465 return self._parse_alt_form(value, path, parser_context=parser_context) 

466 return self._parse_typed_dict_form(value, path, parser_context=parser_context) 

467 

468 def _per_attribute_conflicts(self) -> Mapping[str, frozenset[str]]: 

469 conflicts = self._per_attribute_conflicts_cache 

470 if conflicts is not None: 

471 return conflicts 

472 attrs = self.source_attributes 

473 conflicts = { 

474 a.source_attribute_name: frozenset( 

475 attrs[ca].source_attribute_name for ca in a.conflicting_attributes 

476 ) 

477 for a in attrs.values() 

478 } 

479 self._per_attribute_conflicts_cache = conflicts 

480 return self._per_attribute_conflicts_cache 

481 

482 

483def _is_path_attribute_candidate( 

484 source_attribute: AttributeDescription, target_attribute: AttributeDescription 

485) -> bool: 

486 if ( 

487 source_attribute.parse_hints 

488 and not source_attribute.parse_hints.applicable_as_path_hint 

489 ): 

490 return False 

491 target_type = target_attribute.attribute_type 

492 _, origin, args = unpack_type(target_type, False) 

493 match_type = target_type 

494 if origin == list: 

495 match_type = args[0] 

496 return isinstance(match_type, type) and issubclass(match_type, FileSystemMatchRule) 

497 

498 

499def is_typeddict(t: Any) -> bool: 

500 return typing.is_typeddict(t) or ( 

501 # Logically, not is_typeddict(t) and issubclass(t, DebputyParsedContent) 

502 # implies not is_typeddict(DebputyParsedContent) 

503 # except that issubclass *fails* for typeddicts. 

504 not typing.is_typeddict(DebputyParsedContent) 

505 and isinstance(t, type) 

506 and issubclass(t, DebputyParsedContent) 

507 ) 

508 

509 

510class ParserGenerator: 

511 def __init__(self) -> None: 

512 self._registered_types: dict[Any, TypeMapping[Any, Any]] = {} 

513 self._object_parsers: dict[str, DispatchingObjectParser] = {} 

514 self._table_parsers: dict[ 

515 type[DebputyDispatchableType], DispatchingTableParser[Any] 

516 ] = {} 

517 self._in_package_context_parser: dict[str, Any] = {} 

518 

519 def register_mapped_type(self, mapped_type: TypeMapping[Any, Any]) -> None: 

520 existing = self._registered_types.get(mapped_type.target_type) 

521 if existing is not None: 521 ↛ 522 (line 521 didn't jump to line 522 because the condition on line 521 was never true)

522 raise ValueError(f"The type {existing} is already registered") 

523 self._registered_types[mapped_type.target_type] = mapped_type 

524 

525 def get_mapped_type_from_target_type( 

526 self, 

527 mapped_type: type[T], 

528 ) -> TypeMapping[Any, T] | None: 

529 return self._registered_types.get(mapped_type) 

530 

531 def discard_mapped_type(self, mapped_type: type[T]) -> None: 

532 del self._registered_types[mapped_type] 

533 

534 def add_table_parser(self, rt: type[DebputyDispatchableType], path: str) -> None: 

535 assert rt not in self._table_parsers 

536 self._table_parsers[rt] = DispatchingTableParser(rt, path) 

537 

538 def add_object_parser( 

539 self, 

540 path: str, 

541 *, 

542 parser_documentation: ParserDocumentation | None = None, 

543 expected_debputy_integration_mode: None | ( 

544 Container[DebputyIntegrationMode] 

545 ) = None, 

546 unknown_keys_diagnostic_severity: Optional["LintSeverity"] = "error", 

547 allow_unknown_keys: bool = False, 

548 ) -> DispatchingObjectParser: 

549 assert path not in self._in_package_context_parser 

550 assert path not in self._object_parsers 

551 object_parser = DispatchingObjectParser( 

552 path, 

553 parser_documentation=parser_documentation, 

554 expected_debputy_integration_mode=expected_debputy_integration_mode, 

555 unknown_keys_diagnostic_severity=unknown_keys_diagnostic_severity, 

556 allow_unknown_keys=allow_unknown_keys, 

557 ) 

558 self._object_parsers[path] = object_parser 

559 return object_parser 

560 

561 def add_in_package_context_parser( 

562 self, 

563 path: str, 

564 delegate: DeclarativeInputParser[Any], 

565 ) -> None: 

566 assert path not in self._in_package_context_parser 

567 assert path not in self._object_parsers 

568 self._in_package_context_parser[path] = InPackageContextParser(path, delegate) 

569 

570 @property 

571 def dispatchable_table_parsers( 

572 self, 

573 ) -> Mapping[type[DebputyDispatchableType], DispatchingTableParser[Any]]: 

574 return self._table_parsers 

575 

576 @property 

577 def dispatchable_object_parsers(self) -> Mapping[str, DispatchingObjectParser]: 

578 return self._object_parsers 

579 

580 def dispatch_parser_table_for( 

581 self, rule_type: TTP 

582 ) -> DispatchingTableParser[TP] | None: 

583 return cast( 

584 "Optional[DispatchingTableParser[TP]]", self._table_parsers.get(rule_type) 

585 ) 

586 

587 def generate_parser( 

588 self, 

589 parsed_content: type[TD], 

590 *, 

591 source_content: SF | None = None, 

592 allow_optional: bool = False, 

593 inline_reference_documentation: ParserDocumentation | None = None, 

594 expected_debputy_integration_mode: None | ( 

595 Container[DebputyIntegrationMode] 

596 ) = None, 

597 automatic_docs: None | ( 

598 Mapping[type[Any], Sequence[StandardParserAttributeDocumentation]] 

599 ) = None, 

600 ) -> DeclarativeInputParser[TD]: 

601 """Derive a parser from a TypedDict 

602 

603 Generates a parser for a segment of the manifest (think the `install-docs` snippet) from a TypedDict 

604 or two that are used as a description. 

605 

606 In its simplest use-case, the caller provides a TypedDict of the expected attributes along with 

607 their types. As an example: 

608 

609 >>> class InstallDocsRule(DebputyParsedContent): 

610 ... sources: List[str] 

611 ... into: List[str] 

612 >>> pg = ParserGenerator() 

613 >>> simple_parser = pg.generate_parser(InstallDocsRule) 

614 

615 This will create a parser that would be able to interpret something like: 

616 

617 ```yaml 

618 install-docs: 

619 sources: ["docs/*"] 

620 into: ["my-pkg"] 

621 ``` 

622 

623 While this is sufficient for programmers, it is a bit rigid for the packager writing the manifest. Therefore, 

624 you can also provide a TypedDict describing the input, enabling more flexibility: 

625 

626 >>> class InstallDocsRule(DebputyParsedContent): 

627 ... sources: List[str] 

628 ... into: List[str] 

629 >>> class InputDocsRuleInputFormat(TypedDict): 

630 ... source: NotRequired[Annotated[str, DebputyParseHint.target_attribute("sources")]] 

631 ... sources: NotRequired[List[str]] 

632 ... into: Union[str, List[str]] 

633 >>> pg = ParserGenerator() 

634 >>> flexible_parser = pg.generate_parser( 

635 ... InstallDocsRule, 

636 ... source_content=InputDocsRuleInputFormat, 

637 ... ) 

638 

639 In this case, the `sources` field can either come from a single `source` in the manifest (which must be a string) 

640 or `sources` (which must be a list of strings). The parser also ensures that only one of `source` or `sources` 

641 is used, so the input is not ambiguous. For the `into` parameter, the parser will accept either a str 

642 or a list of strings. Regardless of how the input was provided, the parser will normalize the input so that 

643 both `sources` and `into` in the result are lists of strings. As an example, this parser can accept 

644 both the previous input and the following input: 

645 

646 ```yaml 

647 install-docs: 

648 source: "docs/*" 

649 into: "my-pkg" 

650 ``` 

651 

652 The `source` and `into` attributes are then normalized to lists as if the user had written them as lists 

653 with a single string in them. As noted above, the name of the `source` attribute will also be normalized 

654 while parsing. 

655 

656 In cases where only one field is required from the user, it can sometimes make sense to allow a non-dict 

657 as part of the input. Example: 

658 

659 >>> class DiscardRule(DebputyParsedContent): 

660 ... paths: List[str] 

661 >>> class DiscardRuleInputDictFormat(TypedDict): 

662 ... path: NotRequired[Annotated[str, DebputyParseHint.target_attribute("paths")]] 

663 ... paths: NotRequired[List[str]] 

664 >>> # This format relies on DiscardRule having exactly one Required attribute 

665 >>> DiscardRuleInputWithAltFormat = Union[ 

666 ... DiscardRuleInputDictFormat, 

667 ... str, 

668 ... List[str], 

669 ... ] 

670 >>> pg = ParserGenerator() 

671 >>> flexible_parser = pg.generate_parser( 

672 ... DiscardRule, 

673 ... source_content=DiscardRuleInputWithAltFormat, 

674 ... ) 

675 

676 

677 Supported types: 

678 * `List` - must have a fixed type argument (such as `List[str]`) 

679 * `str` 

680 * `int` 

681 * `BinaryPackage` - When provided (or required), the user must provide a package name listed 

682 in the debian/control file. The code receives the BinaryPackage instance 

683 matching that input. 

684 * `FileSystemMode` - When provided (or required), the user must provide a file system mode in any 

685 format that `debputy` provides (such as `0644` or `a=rw,go=rw`). 

686 * `FileSystemOwner` - When provided (or required), the user must provide a file system owner that is 

687 available statically on all Debian systems (must be in `base-passwd`). 

688 The user has multiple options for how to specify it (either via name or id). 

689 * `FileSystemGroup` - When provided (or required), the user must provide a file system group that is 

690 available statically on all Debian systems (must be in `base-passwd`). 

691 The user has multiple options for how to specify it (either via name or id). 

692 * `ManifestCondition` - When provided (or required), the user must specify a conditional rule to apply. 

693 Usually, it is better to extend `DebputyParsedContentStandardConditional`, which 

694 provides the `debputy` default `when` parameter for conditionals. 

695 

696 Supported special type-like parameters: 

697 

698 * `Required` / `NotRequired` to mark a field as `Required` or `NotRequired`. Must be provided at the 

699 outermost level. Cannot vary between `parsed_content` and `source_content`. 

700 * `Annotated`. Accepted at the outermost level (inside Required/NotRequired) but ignored at the moment. 

701 * `Union`. Must be at the outermost level (inside `Annotated` and/or `Required`/`NotRequired` if these are present). 

702 Automapping (see below) is restricted to two members in the Union. 

703 

704 Notable non-supported types: 

705 * `Mapping` and all variants thereof (such as `dict`). In the future, nested `TypedDict`s may be allowed. 

706 * `Optional` (or `Union[..., None]`): Use `NotRequired` for optional fields. 

707 

708 Automatic mapping rules from `source_content` to `parsed_content`: 

709 - `Union[T, List[T]]` can be narrowed automatically to `List[T]`. Transformation is basically: 

710 `lambda value: value if isinstance(value, list) else [value]` 

711 - `T` can be mapped automatically to `List[T]`, the transformation being: `lambda value: [value]` 

712 
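As a rough sketch (simplified: the real mappers also receive the attribute path and parser context), both rules behave like:

>>> def normalize_into_list_sketch(value):
...     # Wrap a single value in a list; pass lists through unchanged.
...     return value if isinstance(value, list) else [value]
>>> normalize_into_list_sketch("docs/*")
['docs/*']
>>> normalize_into_list_sketch(["docs/*", "README.md"])
['docs/*', 'README.md']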

713 Additionally, types can be annotated (`Annotated[str, ...]`) with `DebputyParseHint`s. Check its classmethods 

714 for concrete features that may be useful to you. 

715 

716 :param parsed_content: A DebputyParsedContent / TypedDict describing the desired model of the input once parsed. 

717 (DebputyParsedContent is a TypedDict subclass that works around some inadequate type checkers). 

718 It can also be a `List[DebputyParsedContent]`. In that case, `source_content` must be a 

719 `List[TypedDict[...]]`. 

720 :param source_content: Optionally, a TypedDict describing the input allowed by the user. This can be useful 

721 to describe more variations than in `parsed_content`, which the parser will normalize for you. If omitted, 

722 the parsed_content is also considered the source_content (which affects what annotations are allowed in it). 

723 Note you should never pass the parsed_content as source_content directly. 

724 :param allow_optional: In rare cases, you want to support explicitly provided vs. optional. In this case, you 

725 should set this to True. Though, in 99.9% of all cases, you want `NotRequired` rather than `Optional` (and 

726 can keep this False). 

727 :param inline_reference_documentation: Optionally, programmatic documentation 

728 :param expected_debputy_integration_mode: If provided, this declares the integration modes where the 

729 result of the parser can be used. This is primarily useful for "fail-fast" on incorrect usage. 

730 When the restriction is not satisfiable, the generated parser will trigger a parse error immediately 

731 (resulting in a "compile time" failure rather than a "runtime" failure). 

732 :return: An input parser capable of reading input matching the TypedDict(s) used as reference. 

733 """ 

734 orig_parsed_content = parsed_content 

735 if source_content is parsed_content: 735 ↛ 736 (line 735 didn't jump to line 736 because the condition on line 735 was never true)

736 raise ValueError( 

737 "Do not provide source_content if it is the same as parsed_content" 

738 ) 

739 is_list_wrapped = False 

740 if get_origin(orig_parsed_content) == list: 

741 parsed_content = get_args(orig_parsed_content)[0] 

742 is_list_wrapped = True 

743 

744 if isinstance(parsed_content, type) and issubclass( 

745 parsed_content, DebputyDispatchableType 

746 ): 

747 parser = self.dispatch_parser_table_for(parsed_content) 

748 if parser is None: 748 ↛ 749 (line 748 didn't jump to line 749 because the condition on line 748 was never true)

749 raise ValueError( 

750 f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}." 

751 f" The class {parsed_content.__qualname__} is not a pre-registered type." 

752 ) 

753 # FIXME: Only the list wrapped version has documentation. 

754 if is_list_wrapped: 754 ↛ 760 (line 754 didn't jump to line 760 because the condition on line 754 was always true)

755 parser = ListWrappedDeclarativeInputParser( 

756 parser, 

757 inline_reference_documentation=inline_reference_documentation, 

758 expected_debputy_integration_mode=expected_debputy_integration_mode, 

759 ) 

760 return parser 

761 

762 if not is_typeddict(parsed_content): 762 ↛ 763 (line 762 didn't jump to line 763 because the condition on line 762 was never true)

763 raise ValueError( 

764 f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}." 

765 ' Only "TypedDict"-based types and a subset of "DebputyDispatchableType" are supported.' 

766 ) 

767 if is_list_wrapped and source_content is not None: 

768 if get_origin(source_content) != list: 768 ↛ 769 (line 768 didn't jump to line 769 because the condition on line 768 was never true)

769 raise ValueError( 

770 "If the parsed_content is a List type, then source_format must be a List type as well." 

771 ) 

772 source_content = get_args(source_content)[0] 

773 

774 target_attributes = self._parse_types( 

775 parsed_content, 

776 allow_source_attribute_annotations=source_content is None, 

777 forbid_optional=not allow_optional, 

778 ) 

779 required_target_parameters = frozenset(parsed_content.__required_keys__) 

780 parsed_alt_form = None 

781 non_mapping_source_only = False 

782 

783 if source_content is not None: 

784 default_target_attribute = None 

785 if len(required_target_parameters) == 1: 

786 default_target_attribute = next(iter(required_target_parameters)) 

787 

788 source_typed_dict, alt_source_forms = _extract_typed_dict( 

789 source_content, 

790 default_target_attribute, 

791 ) 

792 if alt_source_forms: 

793 parsed_alt_form = self._parse_alt_form( 

794 alt_source_forms, 

795 default_target_attribute, 

796 ) 

797 if source_typed_dict is not None: 

798 source_content_attributes = self._parse_types( 

799 source_typed_dict, 

800 allow_target_attribute_annotation=True, 

801 allow_source_attribute_annotations=True, 

802 forbid_optional=not allow_optional, 

803 ) 

804 source_content_parameter = "source_content" 

805 source_and_parsed_differs = True 

806 else: 

807 source_typed_dict = parsed_content 

808 source_content_attributes = target_attributes 

809 source_content_parameter = "parsed_content" 

810 source_and_parsed_differs = True 

811 non_mapping_source_only = True 

812 else: 

813 source_typed_dict = parsed_content 

814 source_content_attributes = target_attributes 

815 source_content_parameter = "parsed_content" 

816 source_and_parsed_differs = False 

817 

818 sources = collections.defaultdict(set) 

819 seen_targets = set() 

820 seen_source_names: dict[str, str] = {} 

821 source_attributes: dict[str, AttributeDescription] = {} 

822 path_hint_source_attributes = [] 

823 

824 for k in source_content_attributes: 

825 ia = source_content_attributes[k] 

826 

827 ta = ( 

828 target_attributes.get(ia.target_attribute) 

829 if source_and_parsed_differs 

830 else ia 

831 ) 

832 if ta is None: 832 ↛ 834 (line 832 didn't jump to line 834 because the condition on line 832 was never true)

833 # Error message would be wrong if this assertion is false. 

834 assert source_and_parsed_differs 

835 raise ValueError( 

836 f'The attribute "{k}" from the "source_content" parameter should have mapped' 

837 f' to "{ia.target_attribute}", but that parameter does not exist in "parsed_content"' 

838 ) 

839 if _is_path_attribute_candidate(ia, ta): 

840 path_hint_source_attributes.append(ia.source_attribute_name) 

841 existing_source_name = seen_source_names.get(ia.source_attribute_name) 

842 if existing_source_name: 842 ↛ 843 (line 842 didn't jump to line 843 because the condition on line 842 was never true)

843 raise ValueError( 

844 f'The attribute "{k}" and "{existing_source_name}" both share the source name' 

845 f' "{ia.source_attribute_name}". Please change the {source_content_parameter} parameter,' 

846 f' so only one attribute use "{ia.source_attribute_name}".' 

847 ) 

848 seen_source_names[ia.source_attribute_name] = k 

849 seen_targets.add(ta.target_attribute) 

850 sources[ia.target_attribute].add(k) 

851 if source_and_parsed_differs: 

852 bridge_mapper = self._type_normalize( 

853 k, ia.attribute_type, ta.attribute_type, False 

854 ) 

855 ia.type_validator = ia.type_validator.combine_mapper(bridge_mapper) 

856 source_attributes[k] = ia 

857 

858 def _as_attr_names(td_name: Iterable[str]) -> frozenset[str]: 

859 return frozenset( 

860 source_content_attributes[a].source_attribute_name for a in td_name 

861 ) 

862 

863 _check_attributes( 

864 parsed_content, 

865 source_typed_dict, 

866 source_content_attributes, 

867 sources, 

868 ) 

869 

870 at_least_one_of = frozenset( 

871 _as_attr_names(g) 

872 for k, g in sources.items() 

873 if len(g) > 1 and k in required_target_parameters 

874 ) 

875 

876 if source_and_parsed_differs and seen_targets != target_attributes.keys(): 876 ↛ 877 (line 876 didn't jump to line 877 because the condition on line 876 was never true)

877 missing = ", ".join( 

878 repr(k) for k in (target_attributes.keys() - seen_targets) 

879 ) 

880 raise ValueError( 

881 'The following attributes in "parsed_content" did not have a source field in "source_content":' 

882 f" {missing}" 

883 ) 

884 all_mutually_exclusive_fields = frozenset( 

885 _as_attr_names(g) for g in sources.values() if len(g) > 1 

886 ) 

887 

888 all_parameters = ( 

889 source_typed_dict.__required_keys__ | source_typed_dict.__optional_keys__ 

890 ) 

891 _check_conflicts( 

892 source_content_attributes, 

893 source_typed_dict.__required_keys__, 

894 all_parameters, 

895 ) 

896 

897 manifest_attributes = { 

898 a.source_attribute_name: a for a in source_content_attributes.values() 

899 } 

900 

901 if parsed_alt_form is not None: 

902 target_attribute = parsed_alt_form.target_attribute 

903 if ( 903 ↛ 908 (line 903 didn't jump to line 908 because the condition on line 903 was never true)

904 target_attribute not in required_target_parameters 

905 and required_target_parameters 

906 or len(required_target_parameters) > 1 

907 ): 

908 raise NotImplementedError( 

909 "When using alternative source formats (Union[TypedDict, ...]), the" 

910 " target must have at most one required parameter" 

911 ) 

912 bridge_mapper = self._type_normalize( 

913 target_attribute, 

914 parsed_alt_form.attribute_type, 

915 target_attributes[target_attribute].attribute_type, 

916 False, 

917 ) 

918 parsed_alt_form.type_validator = ( 

919 parsed_alt_form.type_validator.combine_mapper(bridge_mapper) 

920 ) 

921 

922 inline_reference_documentation = ( 

923 _verify_and_auto_correct_inline_reference_documentation( 

924 parsed_content, 

925 source_typed_dict, 

926 source_content_attributes, 

927 inline_reference_documentation, 

928 parsed_alt_form is not None, 

929 automatic_docs, 

930 ) 

931 ) 

932 if non_mapping_source_only: 

933 parser = DeclarativeNonMappingInputParser( 

934 assume_not_none(parsed_alt_form), 

935 inline_reference_documentation=inline_reference_documentation, 

936 expected_debputy_integration_mode=expected_debputy_integration_mode, 

937 ) 

938 else: 

939 parser = DeclarativeMappingInputParser( 

940 _as_attr_names(source_typed_dict.__required_keys__), 

941 _as_attr_names(all_parameters), 

942 manifest_attributes, 

943 source_attributes, 

944 mutually_exclusive_attributes=all_mutually_exclusive_fields, 

945 alt_form_parser=parsed_alt_form, 

946 at_least_one_of=at_least_one_of, 

947 inline_reference_documentation=inline_reference_documentation, 

948 path_hint_source_attributes=tuple(path_hint_source_attributes), 

949 expected_debputy_integration_mode=expected_debputy_integration_mode, 

950 ) 

951 if is_list_wrapped: 

952 parser = ListWrappedDeclarativeInputParser( 

953 parser, 

954 expected_debputy_integration_mode=expected_debputy_integration_mode, 

955 ) 

956 return parser 

957 

958 def _as_type_validator( 

959 self, 

960 attribute: str, 

961 provided_type: Any, 

962 parsing_typed_dict_attribute: bool, 

963 ) -> AttributeTypeHandler: 

964 assert not isinstance(provided_type, tuple) 

965 

966 if isinstance(provided_type, type) and issubclass( 

967 provided_type, DebputyDispatchableType 

968 ): 

969 return _dispatch_parser(provided_type) 

970 

971 unmapped_type = self._strip_mapped_types( 

972 provided_type, 

973 parsing_typed_dict_attribute, 

974 ) 

975 type_normalizer = self._type_normalize( 

976 attribute, 

977 unmapped_type, 

978 provided_type, 

979 parsing_typed_dict_attribute, 

980 ) 

981 t_unmapped, t_unmapped_orig, t_unmapped_args = unpack_type( 

982 unmapped_type, 

983 parsing_typed_dict_attribute, 

984 ) 

985 _, t_provided_orig, t_provided_args = unpack_type( 

986 provided_type, 

987 parsing_typed_dict_attribute, 

988 ) 

989 

990 if ( 990 ↛ 996 (line 990 didn't jump to line 996 because the condition on line 990 was never true)

991 t_unmapped_orig == Union 

992 and t_unmapped_args 

993 and len(t_unmapped_args) == 2 

994 and any(v is _NONE_TYPE for v in t_unmapped_args) 

995 ): 

996 _, _, args = unpack_type(provided_type, parsing_typed_dict_attribute) 

997 actual_type = [a for a in args if a is not _NONE_TYPE][0] 

998 validator = self._as_type_validator( 

999 attribute, actual_type, parsing_typed_dict_attribute 

1000 ) 

1001 

1002 def _validator(v: Any, path: AttributePath) -> None: 

1003 if v is None: 

1004 return 

1005 validator.ensure_type(v, path) 

1006 

1007 return AttributeTypeHandler( 

1008 validator.describe_type(), 

1009 _validator, 

1010 base_type=validator.base_type, 

1011 mapper=type_normalizer, 

1012 ) 

1013 

1014 if unmapped_type in BASIC_SIMPLE_TYPES: 

1015 type_name = BASIC_SIMPLE_TYPES[unmapped_type] 

1016 

1017 type_mapping = self._registered_types.get(provided_type) 

1018 if type_mapping is not None: 

1019 simple_type = f" ({type_name})" 

1020 type_name = type_mapping.target_type.__name__ 

1021 else: 

1022 simple_type = "" 

1023 

1024 def _validator(v: Any, path: AttributePath) -> None: 

1025 if not isinstance(v, unmapped_type): 

1026 _validation_type_error( 

1027 path, f"The attribute must be a {type_name}{simple_type}" 

1028 ) 

1029 

1030 return AttributeTypeHandler( 

1031 type_name, 

1032 _validator, 

1033 base_type=unmapped_type, 

1034 mapper=type_normalizer, 

1035 ) 

1036 if t_unmapped_orig == list: 

1037 if not t_unmapped_args: 1037 ↛ 1038 (line 1037 didn't jump to line 1038 because the condition on line 1037 was never true)

1038 raise ValueError( 

1039 f'The attribute "{attribute}" is List but does not have Generics (Must use List[X])' 

1040 ) 

1041 

1042 generic_type = t_unmapped_args[0] 

1043 key_mapper = self._as_type_validator( 

1044 attribute, 

1045 generic_type, 

1046 parsing_typed_dict_attribute, 

1047 ) 

1048 

1049 def _validator(v: Any, path: AttributePath) -> None: 

1050 if not isinstance(v, list): 1050 ↛ 1051 (line 1050 didn't jump to line 1051 because the condition on line 1050 was never true)

1051 _validation_type_error(path, "The attribute must be a list") 

1052 for i, list_item in enumerate(v): 

1053 key_mapper.ensure_type(list_item, path[i]) 

1054 

1055 list_mapper = ( 

1056 map_each_element(key_mapper.mapper) 

1057 if key_mapper.mapper is not None 

1058 else None 

1059 ) 

1060 

1061 return AttributeTypeHandler( 

1062 f"List of {key_mapper.describe_type()}", 

1063 _validator, 

1064 base_type=list, 

1065 mapper=type_normalizer, 

1066 ).combine_mapper(list_mapper) 

1067 if is_typeddict(provided_type): 

1068 subparser = self.generate_parser(cast("Type[TD]", provided_type)) 

1069 return AttributeTypeHandler( 

1070 description=f"{provided_type.__name__} (Typed Mapping)", 

1071 ensure_type=lambda v, ap: None, 

1072 base_type=dict, 

1073 mapper=lambda v, ap, cv: subparser.parse_input( 

1074 v, ap, parser_context=cv 

1075 ), 

1076 ) 

1077 if t_unmapped_orig == dict: 

1078 if not t_unmapped_args or len(t_unmapped_args) != 2: 1078 ↛ 1079 (line 1078 didn't jump to line 1079 because the condition on line 1078 was never true)

1079 raise ValueError( 

1080 f'The attribute "{attribute}" is Dict but does not have Generics (Must use Dict[str, Y])' 

1081 ) 

1082 if t_unmapped_args[0] != str: 1082 ↛ 1083 (line 1082 didn't jump to line 1083 because the condition on line 1082 was never true)

1083 raise ValueError( 

1084 f'The attribute "{attribute}" is Dict and has a non-str type as key.' 

1085 " Currently, only `str` is supported (Dict[str, Y])" 

1086 ) 

1087 key_mapper = self._as_type_validator( 

1088 attribute, 

1089 t_unmapped_args[0], 

1090 parsing_typed_dict_attribute, 

1091 ) 

1092 value_mapper = self._as_type_validator( 

1093 attribute, 

1094 t_unmapped_args[1], 

1095 parsing_typed_dict_attribute, 

1096 ) 

1097 

1098 if key_mapper.base_type is None: 1098 ↛ 1099 (line 1098 didn't jump to line 1099 because the condition on line 1098 was never true)

1099 raise ValueError( 

1100 f'The attribute "{attribute}" is Dict and the key did not have a trivial base type. Key types' 

1101 f" without trivial base types (such as `str`) are not supported at the moment." 

1102 ) 

1103 

1104 if value_mapper.mapper is not None: 1104 ↛ 1105 (line 1104 didn't jump to line 1105 because the condition on line 1104 was never true)

1105 raise ValueError( 

1106 f'The attribute "{attribute}" is Dict and the value requires mapping.' 

1107 " Currently, this is not supported. Consider a simpler type (such as Dict[str, str] or Dict[str, Any])." 

1108 " Better typing may come later" 

1109 ) 

1110 

1111 def _validator(v: Any, path: AttributePath) -> None: 

1112 if not isinstance(v, dict): 1112 ↛ 1113 (line 1112 didn't jump to line 1113 because the condition on line 1112 was never true)

1113 _validation_type_error(path, "The attribute must be a mapping") 

1114 key_name = "the first key in the mapping" 

1115 for i, (k, value) in enumerate(v.items()): 

1116 if not key_mapper.base_type_match(k): 1116 ↛ 1117 (line 1116 didn't jump to line 1117 because the condition on line 1116 was never true)

1117 kp = path.copy_with_path_hint(key_name) 

1118 _validation_type_error( 

1119 kp, 

1120 f'The key number {i + 1} in attribute "{kp}" must be a {key_mapper.describe_type()}', 

1121 ) 

1122 key_name = f"the key after {k}" 

1123 value_mapper.ensure_type(value, path[k]) 

1124 

1125 return AttributeTypeHandler( 

1126 f"Mapping of {value_mapper.describe_type()}", 

1127 _validator, 

1128 base_type=dict, 

1129 mapper=type_normalizer, 

1130 ).combine_mapper(key_mapper.mapper) 

1131 if t_unmapped_orig in (Union, UnionType): 

1132 if _is_two_arg_x_list_x(t_provided_args): 

1133 # Force the order to be "X, List[X]" as it simplifies the code 

1134 x_list_x = ( 

1135 t_provided_args 

1136 if get_origin(t_provided_args[1]) == list 

1137 else (t_provided_args[1], t_provided_args[0]) 

1138 ) 

1139 

1140 # X, List[X] could match if X was List[Y]. However, our code below assumes 

1141 # that X is a non-list. The `_is_two_arg_x_list_x` returns False for this 

1142 # case to avoid this assert and fall into the "generic case". 

1143 assert get_origin(x_list_x[0]) != list 

1144 x_subtype_checker = self._as_type_validator( 

1145 attribute, 

1146 x_list_x[0], 

1147 parsing_typed_dict_attribute, 

1148 ) 

1149 list_x_subtype_checker = self._as_type_validator( 

1150 attribute, 

1151 x_list_x[1], 

1152 parsing_typed_dict_attribute, 

1153 ) 

1154 type_description = x_subtype_checker.describe_type() 

1155 type_description = f"{type_description} or a list of {type_description}" 

1156 

1157 def _validator(v: Any, path: AttributePath) -> None: 

1158 if isinstance(v, list): 

1159 list_x_subtype_checker.ensure_type(v, path) 

1160 else: 

1161 x_subtype_checker.ensure_type(v, path) 

1162 

1163 return AttributeTypeHandler( 

1164 type_description, 

1165 _validator, 

1166 mapper=type_normalizer, 

1167 ) 

1168 else: 

1169 subtype_checker = [ 

1170 self._as_type_validator(attribute, a, parsing_typed_dict_attribute) 

1171 for a in t_unmapped_args 

1172 ] 

1173 type_description = "one-of: " + ", ".join( 

1174 f"{sc.describe_type()}" for sc in subtype_checker 

1175 ) 

1176 mapper = subtype_checker[0].mapper 

1177 if any(mapper != sc.mapper for sc in subtype_checker): 1177 ↛ 1178 (line 1177 didn't jump to line 1178 because the condition on line 1177 was never true)

1178 raise ValueError( 

1179 f'Cannot handle the union "{provided_type}" as the target types need different' 

1180 " type normalization/mapping logic. Unions are generally limited to Union[X, List[X]]" 

1181 " where X is a non-collection type." 

1182 ) 

1183 

1184 def _validator(v: Any, path: AttributePath) -> None: 

1185 partial_matches = [] 

1186 for sc in subtype_checker: 1186 ↛ 1194 (line 1186 didn't jump to line 1194 because the loop on line 1186 didn't complete)

1187 try: 

1188 sc.ensure_type(v, path) 

1189 return 

1190 except ManifestParseException as e: 

1191 if sc.base_type_match(v): 1191 ↛ 1192 (line 1191 didn't jump to line 1192 because the condition on line 1191 was never true)

1192 partial_matches.append((sc, e)) 

1193 

1194 if len(partial_matches) == 1: 

1195 raise partial_matches[0][1] 

1196 _validation_type_error( 

1197 path, f"Could not match against: {type_description}" 

1198 ) 

1199 

1200 return AttributeTypeHandler( 

1201 type_description, 

1202 _validator, 

1203 mapper=type_normalizer, 

1204 ) 

1205 if t_unmapped_orig == Literal: 

1206 # We want `x` for string values; repr would give 'x' 

1207 pretty = ", ".join( 

1208 f"`{v}`" if isinstance(v, str) else str(v) for v in t_unmapped_args 

1209 ) 

1210 

1211 def _validator(v: Any, path: AttributePath) -> None: 

1212 if v not in t_unmapped_args: 

1213 value_hint = "" 

1214 if isinstance(v, str): 1214 ↛ 1216 (line 1214 didn't jump to line 1216 because the condition on line 1214 was always true)

1215 value_hint = f"({v}) " 

1216 _validation_type_error( 

1217 path, 

1218 f"Value {value_hint}must be one of the following literal values: {pretty}", 

1219 ) 

1220 

1221 return AttributeTypeHandler( 

1222 f"One of the following literal values: {pretty}", 

1223 _validator, 

1224 ) 

1225 

1226 if provided_type == Any: 1226 ↛ 1231 (line 1226 didn't jump to line 1231 because the condition on line 1226 was always true)

1227 return AttributeTypeHandler( 

1228 "any (unvalidated)", 

1229 lambda *a: None, 

1230 ) 

1231 raise ValueError( 

1232 f'The attribute "{attribute}" had/contained a type {provided_type}, which is not supported' 

1233 ) 

1234 

1235 def _parse_types( 

1236 self, 

1237 spec: type[TypedDict], 

1238 allow_target_attribute_annotation: bool = False, 

1239 allow_source_attribute_annotations: bool = False, 

1240 forbid_optional: bool = True, 

1241 ) -> dict[str, AttributeDescription]: 

1242 annotations = get_type_hints(spec, include_extras=True) 

1243 return { 

1244 k: self._attribute_description( 

1245 k, 

1246 t, 

1247 k in spec.__required_keys__, 

1248 allow_target_attribute_annotation=allow_target_attribute_annotation, 

1249 allow_source_attribute_annotations=allow_source_attribute_annotations, 

1250 forbid_optional=forbid_optional, 

1251 ) 

1252 for k, t in annotations.items() 

1253 } 

1254 

1255 def _attribute_description( 

1256 self, 

1257 attribute: str, 

1258 orig_td: Any, 

1259 is_required: bool, 

1260 forbid_optional: bool = True, 

1261 allow_target_attribute_annotation: bool = False, 

1262 allow_source_attribute_annotations: bool = False, 

1263 ) -> AttributeDescription: 

1264 td, anno, is_optional = _parse_type( 

1265 attribute, orig_td, forbid_optional=forbid_optional 

1266 ) 

1267 type_validator = self._as_type_validator(attribute, td, True) 

1268 parsed_annotations = DetectedDebputyParseHint.parse_annotations( 

1269 anno, 

1270 f' Seen with attribute "{attribute}".', 

1271 attribute, 

1272 is_required, 

1273 allow_target_attribute_annotation=allow_target_attribute_annotation, 

1274 allow_source_attribute_annotations=allow_source_attribute_annotations, 

1275 ) 

1276 return AttributeDescription( 

1277 target_attribute=parsed_annotations.target_attribute, 

1278 attribute_type=td, 

1279 type_validator=type_validator, 

1280 annotations=anno, 

1281 is_optional=is_optional, 

1282 conflicting_attributes=parsed_annotations.conflict_with_source_attributes, 

1283 conditional_required=parsed_annotations.conditional_required, 

1284 source_attribute_name=assume_not_none( 

1285 parsed_annotations.source_manifest_attribute 

1286 ), 

1287 parse_hints=parsed_annotations, 

1288 ) 

1289 

1290 def _parse_alt_form( 

1291 self, 

1292 alt_form, 

1293 default_target_attribute: str | None, 

1294 ) -> AttributeDescription: 

1295 td, anno, is_optional = _parse_type( 

1296 "source_format alternative form", 

1297 alt_form, 

1298 forbid_optional=True, 

1299 parsing_typed_dict_attribute=False, 

1300 ) 

1301 type_validator = self._as_type_validator( 

1302 "source_format alternative form", 

1303 td, 

1304 True, 

1305 ) 

1306 parsed_annotations = DetectedDebputyParseHint.parse_annotations( 

1307 anno, 

1308 " The alternative for source_format.", 

1309 None, 

1310 False, 

1311 default_target_attribute=default_target_attribute, 

1312 allow_target_attribute_annotation=True, 

1313 allow_source_attribute_annotations=False, 

1314 ) 

1315 return AttributeDescription( 

1316 target_attribute=parsed_annotations.target_attribute, 

1317 attribute_type=td, 

1318 type_validator=type_validator, 

1319 annotations=anno, 

1320 is_optional=is_optional, 

1321 conflicting_attributes=parsed_annotations.conflict_with_source_attributes, 

1322 conditional_required=parsed_annotations.conditional_required, 

1323 source_attribute_name="Alt form of the source_format", 

1324 ) 

1325 

1326 def _union_narrowing( 

1327 self, 

1328 input_type: Any, 

1329 target_type: Any, 

1330 parsing_typed_dict_attribute: bool, 

1331 ) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any] | None: 

1332 _, input_orig, input_args = unpack_type( 

1333 input_type, parsing_typed_dict_attribute 

1334 ) 

1335 _, target_orig, target_args = unpack_type( 

1336 target_type, parsing_typed_dict_attribute 

1337 ) 

1338 

1339 if input_orig not in (Union, UnionType) or not input_args: 1339 ↛ 1340 (line 1339 didn't jump to line 1340 because the condition on line 1339 was never true)

1340 raise ValueError("input_type must be a Union[...] with non-empty args") 

1341 

1342 # Currently, we only support Union[X, List[X]] -> List[Y] narrowing or Union[X, List[X]] -> Union[Y, List[Y]] 

1343 # - Where X = Y or there is a simple standard transformation from X to Y. 

1344 

1345 if target_orig not in (Union, UnionType, list) or not target_args: 

1346 # Not supported 

1347 return None 

1348 

1349 if target_orig in (Union, UnionType) and set(input_args) == set(target_args): 1349 ↛ 1351 (line 1349 didn't jump to line 1351 because the condition on line 1349 was never true)

1350 # Not needed (identity mapping) 

1351 return None 

1352 

1353 if target_orig == list and not any(get_origin(a) == list for a in input_args): 1353 ↛ 1355 (line 1353 didn't jump to line 1355 because the condition on line 1353 was never true)

1354 # Not supported 

1355 return None 

1356 

1357 target_arg = target_args[0] 

1358 simplified_type = self._strip_mapped_types( 

1359 target_arg, parsing_typed_dict_attribute 

1360 ) 

1361 acceptable_types = { 

1362 target_arg, 

1363 list[target_arg], # type: ignore 

1364 List[target_arg], # type: ignore 

1365 simplified_type, 

1366 list[simplified_type], # type: ignore 

1367 List[simplified_type], # type: ignore 

1368 } 

1369 target_format = ( 

1370 target_arg, 

1371 list[target_arg], # type: ignore 

1372 List[target_arg], # type: ignore 

1373 ) 

1374 in_target_format = 0 

1375 in_simple_format = 0 

1376 for input_arg in input_args: 

1377 if input_arg not in acceptable_types: 1377 ↛ 1379 (line 1377 didn't jump to line 1379 because the condition on line 1377 was never true)

1378 # Not supported 

1379 return None 

1380 if input_arg in target_format: 

1381 in_target_format += 1 

1382 else: 

1383 in_simple_format += 1 

1384 

1385 assert in_simple_format or in_target_format 

1386 

1387 if in_target_format and not in_simple_format: 

1388 # Union[X, List[X]] -> List[X] 

1389 return normalize_into_list 

1390 mapped = self._registered_types[target_arg] 

1391 if not in_target_format and in_simple_format: 1391 ↛ 1406 (line 1391 didn't jump to line 1406 because the condition on line 1391 was always true)

1392 # Union[X, List[X]] -> List[Y] 

1393 

1394 def _mapper_x_list_y( 

1395 x: Any | list[Any], 

1396 ap: AttributePath, 

1397 pc: Optional["ParserContextData"], 

1398 ) -> list[Any]: 

1399 in_list_form: list[Any] = normalize_into_list(x, ap, pc) 

1400 

1401 return [mapped.mapper(x, ap, pc) for x in in_list_form] 

1402 

1403 return _mapper_x_list_y 

1404 

1405 # Union[Y, List[X]] -> List[Y] 

1406 if not isinstance(target_arg, type): 

1407 raise ValueError( 

1408 f"Cannot narrow {input_type} -> {target_type}: The automatic conversion does" 

1409 f" not support mixed types. Please use either {simplified_type} or {target_arg}" 

1410 f" in the source content (but not a mix of both)" 

1411 ) 

1412 

1413 def _mapper_mixed_list_y( 

1414 x: Any | list[Any], 

1415 ap: AttributePath, 

1416 pc: Optional["ParserContextData"], 

1417 ) -> list[Any]: 

1418 in_list_form: list[Any] = normalize_into_list(x, ap, pc) 

1419 

1420 return [ 

1421 x if isinstance(x, target_arg) else mapped.mapper(x, ap, pc) 

1422 for x in in_list_form 

1423 ] 

1424 

1425 return _mapper_mixed_list_y 

1426 

1427 def _type_normalize( 

1428 self, 

1429 attribute: str, 

1430 input_type: Any, 

1431 target_type: Any, 

1432 parsing_typed_dict_attribute: bool, 

1433 ) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any] | None: 

1434 if input_type == target_type: 

1435 return None 

1436 _, input_orig, input_args = unpack_type( 

1437 input_type, parsing_typed_dict_attribute 

1438 ) 

1439 _, target_orig, target_args = unpack_type( 

1440 target_type, 

1441 parsing_typed_dict_attribute, 

1442 ) 

1443 if input_orig in (Union, UnionType): 

1444 result = self._union_narrowing( 

1445 input_type, target_type, parsing_typed_dict_attribute 

1446 ) 

1447 if result: 

1448 return result 

1449 elif target_orig == list and target_args[0] == input_type: 

1450 return wrap_into_list 

1451 

1452 mapped = self._registered_types.get(target_type) 

1453 if mapped is not None and input_type == mapped.source_type: 

1454 # Source -> Target 

1455 return mapped.mapper 

1456 if target_orig == list and target_args: 1456 ↛ 1474 (didn't jump to line 1474 because the condition was always true)

1457 mapped = self._registered_types.get(target_args[0]) 

1458 if mapped is not None: 1458 ↛ 1474 (didn't jump to line 1474 because the condition was always true)

1459 # Help mypy: it cannot see that `mapped` is not None inside the comprehensions below. 

1460 mapped_type: TypeMapping = mapped 

1461 if input_type == mapped.source_type: 1461 ↛ 1463 (didn't jump to line 1463 because the condition was never true)

1462 # Source -> List[Target] 

1463 return lambda x, ap, pc: [mapped_type.mapper(x, ap, pc)] 

1464 if ( 1464 ↛ 1474 (didn't jump to line 1474 because the condition was always true)

1465 input_orig == list 

1466 and input_args 

1467 and input_args[0] == mapped_type.source_type 

1468 ): 

1469 # List[Source] -> List[Target] 

1470 return lambda xs, ap, pc: [ 

1471 mapped_type.mapper(x, ap, pc) for x in xs 

1472 ] 

1473 

1474 raise ValueError( 

1475 f'Unsupported type normalization for "{attribute}": Cannot automatically map/narrow' 

1476 f" {input_type} to {target_type}" 

1477 ) 

1478 
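# Sketch of the non-union conversions _type_normalize handles, again assuming a
# hypothetical registered TypeMapping from `str` to `FileSystemMatchRule`:
#
#   str       -> list[str]                   wrap_into_list
#   str       -> FileSystemMatchRule         the mapping's own mapper
#   str       -> list[FileSystemMatchRule]   map the value, then wrap it in a list
#   list[str] -> list[FileSystemMatchRule]   map each element
#
# Anything else falls through to the ValueError above.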

1479 def _strip_mapped_types( 

1480 self, orig_td: Any, parsing_typed_dict_attribute: bool 

1481 ) -> Any: 

1482 m = self._registered_types.get(orig_td) 

1483 if m is not None: 

1484 return m.source_type 

1485 _, v, args = unpack_type(orig_td, parsing_typed_dict_attribute) 

1486 if v == list: 

1487 arg = args[0] 

1488 m = self._registered_types.get(arg) 

1489 if m: 

1490 return list[m.source_type] # type: ignore 

1491 if v in (Union, UnionType): 

1492 stripped_args = tuple( 

1493 self._strip_mapped_types(x, parsing_typed_dict_attribute) for x in args 

1494 ) 

1495 if stripped_args != args: 

1496 return Union[stripped_args] 

1497 return orig_td 

1498 
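# For reference, _strip_mapped_types rewrites a *target* type into the "source" shape a
# manifest author would write. With the same hypothetical str -> FileSystemMatchRule
# mapping:
#
#   FileSystemMatchRule             -> str
#   list[FileSystemMatchRule]       -> list[str]
#   Union[int, FileSystemMatchRule] -> Union[int, str]
#
# Types without a registered mapping are returned unchanged.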

1499 

1500def _sort_key(attr: StandardParserAttributeDocumentation) -> Any: 

1501 key = next(iter(attr.attributes)) 

1502 return attr.sort_category, key 

1503 

1504 

1505def _apply_std_docs( 

1506 std_doc_table: ( 

1507 Mapping[type[Any], Sequence[StandardParserAttributeDocumentation]] | None 

1508 ), 

1509 source_format_typed_dict: type[Any], 

1510 attribute_docs: Sequence[ParserAttributeDocumentation] | None, 

1511) -> Sequence[ParserAttributeDocumentation] | None: 

1512 if std_doc_table is None or not std_doc_table: 1512 ↛ 1515 (didn't jump to line 1515 because the condition was always true)

1513 return attribute_docs 

1514 

1515 has_docs_for = set() 

1516 if attribute_docs: 

1517 for attribute_doc in attribute_docs: 

1518 has_docs_for.update(attribute_doc.attributes) 

1519 

1520 base_seen = set() 

1521 std_docs_used = [] 

1522 

1523 remaining_bases = set(getattr(source_format_typed_dict, "__orig_bases__", [])) 

1524 base_seen.update(remaining_bases) 

1525 while remaining_bases: 

1526 base = remaining_bases.pop() 

1527 new_bases_to_check = { 

1528 x for x in getattr(base, "__orig_bases__", []) if x not in base_seen 

1529 } 

1530 remaining_bases.update(new_bases_to_check) 

1531 base_seen.update(new_bases_to_check) 

1532 std_docs = std_doc_table.get(base) 

1533 if std_docs: 

1534 for std_doc in std_docs: 

1535 if any(a in has_docs_for for a in std_doc.attributes): 

1536 # If there is any overlap, do not add the docs 

1537 continue 

1538 has_docs_for.update(std_doc.attributes) 

1539 std_docs_used.append(std_doc) 

1540 

1541 if not std_docs_used: 

1542 return attribute_docs 

1543 docs = sorted(std_docs_used, key=_sort_key) 

1544 if attribute_docs: 

1545 # Plugin provided attributes first 

1546 c = list(attribute_docs) 

1547 c.extend(docs) 

1548 docs = c 

1549 return tuple(docs) 

1550 
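# _apply_std_docs walks the source TypedDict's __orig_bases__ transitively and pulls in
# any "standard" attribute documentation registered for a base class, skipping attributes
# the plugin already documented itself. Plugin-provided docs are kept first; the inherited
# standard docs are appended in _sort_key order.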

1551 

1552def _verify_and_auto_correct_inline_reference_documentation( 

1553 parsed_content: type[TD], 

1554 source_typed_dict: type[Any], 

1555 source_content_attributes: Mapping[str, AttributeDescription], 

1556 inline_reference_documentation: ParserDocumentation | None, 

1557 has_alt_form: bool, 

1558 automatic_docs: ( 

1559 Mapping[type[Any], Sequence[StandardParserAttributeDocumentation]] | None 

1560 ) = None, 

1561) -> ParserDocumentation | None: 

1562 orig_attribute_docs = ( 

1563 inline_reference_documentation.attribute_doc 

1564 if inline_reference_documentation 

1565 else None 

1566 ) 

1567 attribute_docs = _apply_std_docs( 

1568 automatic_docs, 

1569 source_typed_dict, 

1570 orig_attribute_docs, 

1571 ) 

1572 if inline_reference_documentation is None and attribute_docs is None: 

1573 return None 

1574 changes = {} 

1575 if attribute_docs: 

1576 seen = set() 

1577 had_any_custom_docs = False 

1578 for attr_doc in attribute_docs: 

1579 if not isinstance(attr_doc, StandardParserAttributeDocumentation): 

1580 had_any_custom_docs = True 

1581 for attr_name in attr_doc.attributes: 

1582 attr = source_content_attributes.get(attr_name) 

1583 if attr is None: 1583 ↛ 1584 (didn't jump to line 1584 because the condition was never true)

1584 raise ValueError( 

1585 f"The inline_reference_documentation for the source format of {parsed_content.__qualname__}" 

1586 f' references an attribute "{attr_name}", which does not exist in the source format.' 

1587 ) 

1588 if attr_name in seen: 1588 ↛ 1589 (didn't jump to line 1589 because the condition was never true)

1589 raise ValueError( 

1590 f"The inline_reference_documentation for the source format of {parsed_content.__qualname__}" 

1591 f' has documentation for "{attr_name}" twice, which is not supported.' 

1592 f" Please document it at most once" 

1593 ) 

1594 seen.add(attr_name) 

1595 undocumented = source_content_attributes.keys() - seen 

1596 if undocumented: 1596 ↛ 1597 (didn't jump to line 1597 because the condition was never true)

1597 if had_any_custom_docs: 

1598 undocumented_attrs = ", ".join(undocumented) 

1599 raise ValueError( 

1600 f"The following attributes were not documented for the source format of" 

1601 f" {parsed_content.__qualname__}. If this is deliberate, then please" 

1602 ' declare each them as undocumented (via undocumented_attr("foo")):' 

1603 f" {undocumented_attrs}" 

1604 ) 

1605 combined_docs = list(attribute_docs) 

1606 combined_docs.extend(undocumented_attr(a) for a in sorted(undocumented)) 

1607 attribute_docs = combined_docs 

1608 

1609 if attribute_docs and orig_attribute_docs != attribute_docs: 1609 ↛ 1610 (didn't jump to line 1610 because the condition was never true)

1610 assert attribute_docs is not None 

1611 changes["attribute_doc"] = tuple(attribute_docs) 

1612 

1613 if ( 1613 ↛ 1618 (didn't jump to line 1618 because the condition was never true)

1614 inline_reference_documentation is not None 

1615 and inline_reference_documentation.alt_parser_description 

1616 and not has_alt_form 

1617 ): 

1618 raise ValueError( 

1619 "The inline_reference_documentation had documentation for an non-mapping format," 

1620 " but the source format does not have a non-mapping format." 

1621 ) 

1622 if changes: 1622 ↛ 1623 (didn't jump to line 1623 because the condition was never true)

1623 if inline_reference_documentation is None: 

1624 inline_reference_documentation = reference_documentation() 

1625 return inline_reference_documentation.replace(**changes) 

1626 return inline_reference_documentation 

1627 
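# In short, the function above merges the plugin's inline documentation with the standard
# attribute docs, rejects documentation for unknown or doubly-documented attributes,
# auto-declares any leftover attributes via undocumented_attr() when only standard docs
# were used, and refuses documentation for an alternative (non-mapping) form when the
# source format does not actually have one.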

1628 

1629def _check_conflicts( 

1630 input_content_attributes: dict[str, AttributeDescription], 

1631 required_attributes: frozenset[str], 

1632 all_attributes: frozenset[str], 

1633) -> None: 

1634 for attr_name, attr in input_content_attributes.items(): 

1635 if attr_name in required_attributes and attr.conflicting_attributes: 1635 ↛ 1636 (didn't jump to line 1636 because the condition was never true)

1636 c = ", ".join(repr(a) for a in attr.conflicting_attributes) 

1637 raise ValueError( 

1638 f'The attribute "{attr_name}" is required and conflicts with the attributes: {c}.' 

1639 " This makes it impossible to use these attributes. Either remove the attributes" 

1640 f' (along with the conflicts for them), adjust the conflicts or make "{attr_name}"' 

1641 " optional (NotRequired)" 

1642 ) 

1643 else: 

1644 required_conflicts = attr.conflicting_attributes & required_attributes 

1645 if required_conflicts: 1645 ↛ 1646 (didn't jump to line 1646 because the condition was never true)

1646 c = ", ".join(repr(a) for a in required_conflicts) 

1647 raise ValueError( 

1648 f'The attribute "{attr_name}" conflicts with the following *required* attributes: {c}.' 

1649 f' This makes it impossible to use the "{attr_name}" attribute. Either remove it,' 

1650 f" adjust the conflicts or make the listed attributes optional (NotRequired)" 

1651 ) 

1652 unknown_attributes = attr.conflicting_attributes - all_attributes 

1653 if unknown_attributes: 1653 ↛ 1654 (didn't jump to line 1654 because the condition was never true)

1654 c = ", ".join(repr(a) for a in unknown_attributes) 

1655 raise ValueError( 

1656 f'The attribute "{attr_name}" declares a conflict with the following unknown attributes: {c}.' 

1657 f" None of these attributes were declared in the input." 

1658 ) 

1659 

1660 

1661def _check_attributes( 

1662 content: type[TypedDict], 

1663 input_content: type[TypedDict], 

1664 input_content_attributes: dict[str, AttributeDescription], 

1665 sources: Mapping[str, Collection[str]], 

1666) -> None: 

1667 target_required_keys = content.__required_keys__ 

1668 input_required_keys = input_content.__required_keys__ 

1669 all_input_keys = input_required_keys | input_content.__optional_keys__ 

1670 

1671 for input_name in all_input_keys: 

1672 attr = input_content_attributes[input_name] 

1673 target_name = attr.target_attribute 

1674 source_names = sources[target_name] 

1675 input_is_required = input_name in input_required_keys 

1676 target_is_required = target_name in target_required_keys 

1677 

1678 assert source_names 

1679 

1680 if input_is_required and len(source_names) > 1: 1680 ↛ 1681 (didn't jump to line 1681 because the condition was never true)

1681 raise ValueError( 

1682 f'The source attribute "{input_name}" is required, but it maps to "{target_name}",' 

1683 f' which has multiple sources "{source_names}". If "{input_name}" should be required,' 

1684 f' then there is no need for additional sources for "{target_name}". Alternatively,' 

1685 f' "{input_name}" might be missing a NotRequired type' 

1686 f' (example: "{input_name}: NotRequired[<OriginalTypeHere>]")' 

1687 ) 

1688 if not input_is_required and target_is_required and len(source_names) == 1: 1688 ↛ 1689 (didn't jump to line 1689 because the condition was never true)

1689 raise ValueError( 

1690 f'The source attribute "{input_name}" is not marked as required and maps to' 

1691 f' "{target_name}", which is marked as required. As there are no other attributes' 

1692 f' mapping to "{target_name}", then "{input_name}" must be required as well' 

1693 f' ("{input_name}: Required[<Type>]"). Alternatively, "{target_name}" should be optional' 

1694 f' ("{target_name}: NotRequired[<Type>]") or a "MappingHint.aliasOf" might be missing.' 

1695 ) 

1696 
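# Example of the first check (attribute names are hypothetical): if both "source" and
# "sources" map to the target attribute "sources" and "source" is declared as required,
# the extra source attribute could never contribute anything, so registration is rejected.
# The second check is the mirror image: a lone optional source attribute cannot be the
# only way to fill a required target attribute.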

1697 

1698def _validation_type_error(path: AttributePath, message: str) -> None: 

1699 raise ManifestParseException( 

1700 f'The attribute "{path.path}" did not have a valid structure/type: {message}' 

1701 ) 

1702 

1703 

1704def _is_two_arg_x_list_x(t_args: tuple[Any, ...]) -> bool: 

1705 if len(t_args) != 2: 

1706 return False 

1707 lhs, rhs = t_args 

1708 if get_origin(lhs) == list: 

1709 if get_origin(rhs) == list: 1709 ↛ 1712 (didn't jump to line 1712 because the condition was never true)

1710 # It could still match X, List[X] - but we do not allow this case for now as the caller 

1711 # does not support it. 

1712 return False 

1713 l_args = get_args(lhs) 

1714 return bool(l_args and l_args[0] == rhs) 

1715 if get_origin(rhs) == list: 

1716 r_args = get_args(rhs) 

1717 return bool(r_args and r_args[0] == lhs) 

1718 return False 

1719 
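# Truth table for _is_two_arg_x_list_x (the unions are hypothetical):
#
#   (str, list[str])       -> True   X, List[X]
#   (list[int], int)       -> True   List[X], X
#   (str, list[int])       -> False  element type differs
#   (list[str], list[str]) -> False  two lists are deliberately rejected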

1720 

1721def _extract_typed_dict( 

1722 base_type, 

1723 default_target_attribute: str | None, 

1724) -> tuple[type[TypedDict] | None, Any]: 

1725 if is_typeddict(base_type): 

1726 return base_type, None 

1727 _, origin, args = unpack_type(base_type, False) 

1728 if origin != Union: 

1729 if isinstance(base_type, type) and issubclass(base_type, (dict, Mapping)): 1729 ↛ 1730 (didn't jump to line 1730 because the condition was never true)

1730 raise ValueError( 

1731 "The source_format cannot be nor contain a (non-TypedDict) dict" 

1732 ) 

1733 return None, base_type 

1734 typed_dicts = [x for x in args if is_typeddict(x)] 

1735 if len(typed_dicts) > 1: 1735 ↛ 1736 (didn't jump to line 1736 because the condition was never true)

1736 raise ValueError( 

1737 "When source_format is a Union, it must contain at most one TypedDict" 

1738 ) 

1739 typed_dict = typed_dicts[0] if typed_dicts else None 

1740 

1741 if any(x is None or x is _NONE_TYPE for x in args): 1741 ↛ 1742 (didn't jump to line 1742 because the condition was never true)

1742 raise ValueError( 

1743 "The source_format cannot be nor contain Optional[X] or Union[X, None]" 

1744 ) 

1745 

1746 if any( 1746 ↛ 1751 (didn't jump to line 1751 because the condition was never true)

1747 isinstance(x, type) and issubclass(x, (dict, Mapping)) 

1748 for x in args 

1749 if x is not typed_dict 

1750 ): 

1751 raise ValueError( 

1752 "The source_format cannot be nor contain a (non-TypedDict) dict" 

1753 ) 

1754 remaining = [x for x in args if x is not typed_dict] 

1755 has_target_attribute = False 

1756 anno = None 

1757 if len(remaining) == 1: 1757 ↛ 1758 (didn't jump to line 1758 because the condition was never true)

1758 base_type, anno, _ = _parse_type( 

1759 "source_format alternative form", 

1760 remaining[0], 

1761 forbid_optional=True, 

1762 parsing_typed_dict_attribute=False, 

1763 ) 

1764 has_target_attribute = bool(anno) and any( 

1765 isinstance(x, TargetAttribute) for x in anno 

1766 ) 

1767 target_type = base_type 

1768 else: 

1769 target_type = Union[tuple(remaining)] 

1770 

1771 if default_target_attribute is None and not has_target_attribute: 1771 ↛ 1772 (didn't jump to line 1772 because the condition was never true)

1772 raise ValueError( 

1773 'The alternative format must be Union[TypedDict,Annotated[X, DebputyParseHint.target_attribute("...")]]' 

1774 " OR the parsed_content format must have exactly one attribute that is required." 

1775 ) 

1776 if anno: 1776 ↛ 1777 (didn't jump to line 1777 because the condition was never true)

1777 final_anno = [target_type] 

1778 final_anno.extend(anno) 

1779 return typed_dict, Annotated[tuple(final_anno)] 

1780 return typed_dict, target_type 

1781 
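# A sketch of what _extract_typed_dict accepts; the TypedDict and attribute name below are
# made up. Given source_format = Union[InstallSourceDict, str] and a
# default_target_attribute of "sources", it returns (InstallSourceDict, str), i.e. the
# mapping form plus the alternative scalar form. Plain dicts, Optional members and unions
# containing more than one TypedDict are all rejected with a ValueError.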

1782 

1783def _dispatch_parse_generator( 

1784 dispatch_type: type[DebputyDispatchableType], 

1785) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]: 

1786 def _dispatch_parse( 

1787 value: Any, 

1788 attribute_path: AttributePath, 

1789 parser_context: Optional["ParserContextData"], 

1790 ): 

1791 assert parser_context is not None 

1792 dispatching_parser = parser_context.dispatch_parser_table_for(dispatch_type) 

1793 return dispatching_parser.parse_input( 

1794 value, attribute_path, parser_context=parser_context 

1795 ) 

1796 

1797 return _dispatch_parse 

1798 

1799 

1800def _dispatch_parser( 

1801 dispatch_type: type[DebputyDispatchableType], 

1802) -> AttributeTypeHandler: 

1803 return AttributeTypeHandler( 

1804 dispatch_type.__name__, 

1805 lambda *a: None, 

1806 mapper=_dispatch_parse_generator(dispatch_type), 

1807 ) 

1808 
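# _dispatch_parser builds an AttributeTypeHandler whose validation callable is a
# do-nothing lambda; the real work happens in the mapper, which looks up the dispatch
# table for the given DebputyDispatchableType subclass via the parser context and forwards
# the raw value to it, so the concrete parser is only resolved when the manifest is
# actually parsed.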

1809 

1810def _parse_type( 

1811 attribute: str, 

1812 orig_td: Any, 

1813 forbid_optional: bool = True, 

1814 parsing_typed_dict_attribute: bool = True, 

1815) -> tuple[Any, tuple[Any, ...], bool]: 

1816 td, v, args = unpack_type(orig_td, parsing_typed_dict_attribute) 

1817 md: tuple[Any, ...] = tuple() 

1818 optional = False 

1819 if v is not None: 

1820 if v == Annotated: 

1821 anno = get_args(td) 

1822 md = anno[1:] 

1823 td, v, args = unpack_type(anno[0], parsing_typed_dict_attribute) 

1824 

1825 if td is _NONE_TYPE: 1825 ↛ 1826 (didn't jump to line 1826 because the condition was never true)

1826 raise ValueError( 

1827 f'The attribute "{attribute}" resolved to type "None". "Nil" / "None" fields are not allowed in the' 

1828 " debputy manifest, so this attribute does not make sense in its current form." 

1829 ) 

1830 if forbid_optional and v == Union and any(a is _NONE_TYPE for a in args): 1830 ↛ 1831 (didn't jump to line 1831 because the condition was never true)

1831 raise ValueError( 

1832 f'Detected use of Optional in "{attribute}", which is not allowed here.' 

1833 " Please use NotRequired for optional fields" 

1834 ) 

1835 

1836 return td, md, optional 

1837 

1838 

1839def _normalize_attribute_name(attribute: str) -> str: 

1840 if attribute.endswith("_"): 

1841 attribute = attribute[:-1] 

1842 return attribute.replace("_", "-") 

1843 
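# Examples: "install_docs" becomes "install-docs", and a trailing underscore (typically
# used to dodge a Python keyword) is dropped first, so "for_" becomes "for".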

1844 

1845@dataclasses.dataclass 

1846class DetectedDebputyParseHint: 

1847 target_attribute: str 

1848 source_manifest_attribute: str | None 

1849 conflict_with_source_attributes: frozenset[str] 

1850 conditional_required: ConditionalRequired | None 

1851 applicable_as_path_hint: bool 

1852 

1853 @classmethod 

1854 def parse_annotations( 

1855 cls, 

1856 anno: tuple[Any, ...], 

1857 error_context: str, 

1858 default_attribute_name: str | None, 

1859 is_required: bool, 

1860 default_target_attribute: str | None = None, 

1861 allow_target_attribute_annotation: bool = False, 

1862 allow_source_attribute_annotations: bool = False, 

1863 ) -> "DetectedDebputyParseHint": 

1864 target_attr_anno = find_annotation(anno, TargetAttribute) 

1865 if target_attr_anno: 

1866 if not allow_target_attribute_annotation: 1866 ↛ 1867 (didn't jump to line 1867 because the condition was never true)

1867 raise ValueError( 

1868 f"The DebputyParseHint.target_attribute annotation is not allowed in this context.{error_context}" 

1869 ) 

1870 target_attribute = target_attr_anno.attribute 

1871 elif default_target_attribute is not None: 

1872 target_attribute = default_target_attribute 

1873 elif default_attribute_name is not None: 1873 ↛ 1876 (didn't jump to line 1876 because the condition was always true)

1874 target_attribute = default_attribute_name 

1875 else: 

1876 if default_attribute_name is None: 

1877 raise ValueError( 

1878 "allow_target_attribute_annotation must be True OR " 

1879 "default_attribute_name/default_target_attribute must be not None" 

1880 ) 

1881 raise ValueError( 

1882 f"Missing DebputyParseHint.target_attribute annotation.{error_context}" 

1883 ) 

1884 source_attribute_anno = find_annotation(anno, ManifestAttribute) 

1885 _source_attribute_allowed( 

1886 allow_source_attribute_annotations, error_context, source_attribute_anno 

1887 ) 

1888 if source_attribute_anno: 

1889 source_attribute_name = source_attribute_anno.attribute 

1890 elif default_attribute_name is not None: 

1891 source_attribute_name = _normalize_attribute_name(default_attribute_name) 

1892 else: 

1893 source_attribute_name = None 

1894 mutual_exclusive_with_anno = find_annotation(anno, ConflictWithSourceAttribute) 

1895 if mutual_exclusive_with_anno: 

1896 _source_attribute_allowed( 

1897 allow_source_attribute_annotations, 

1898 error_context, 

1899 mutual_exclusive_with_anno, 

1900 ) 

1901 conflicting_attributes = mutual_exclusive_with_anno.conflicting_attributes 

1902 else: 

1903 conflicting_attributes = frozenset() 

1904 conditional_required = find_annotation(anno, ConditionalRequired) 

1905 

1906 if conditional_required and is_required: 1906 ↛ 1907 (didn't jump to line 1907 because the condition was never true)

1907 if default_attribute_name is None: 

1908 raise ValueError( 

1909 "is_required cannot be True without default_attribute_name being not None" 

1910 ) 

1911 raise ValueError( 

1912 f'The attribute "{default_attribute_name}" is Required while also being conditionally required.' 

1913 ' Please make the attribute "NotRequired" or remove the conditional requirement.' 

1914 ) 

1915 

1916 not_path_hint_anno = find_annotation(anno, NotPathHint) 

1917 applicable_as_path_hint = not_path_hint_anno is None 

1918 

1919 return DetectedDebputyParseHint( 

1920 target_attribute=target_attribute, 

1921 source_manifest_attribute=source_attribute_name, 

1922 conflict_with_source_attributes=conflicting_attributes, 

1923 conditional_required=conditional_required, 

1924 applicable_as_path_hint=applicable_as_path_hint, 

1925 ) 

1926 
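# Resolution order used above: an explicit DebputyParseHint.target_attribute annotation
# wins, then the caller-provided default_target_attribute, then the attribute's own name.
# The manifest-facing name likewise comes from an explicit ManifestAttribute annotation
# and otherwise from the normalized attribute name (underscores become dashes, a trailing
# underscore is stripped).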

1927 

1928def _source_attribute_allowed( 

1929 source_attribute_allowed: bool, 

1930 error_context: str, 

1931 annotation: DebputyParseHint | None, 

1932) -> None: 

1933 if source_attribute_allowed or annotation is None: 1933 ↛ 1935 (didn't jump to line 1935 because the condition was always true)

1934 return 

1935 raise ValueError( 

1936 f'The annotation "{annotation}" cannot be used here. {error_context}' 

1937 )