Coverage for src/debputy/manifest_parser/declarative_parser.py: 72%

800 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2026-01-16 17:20 +0000

1import collections 

2import dataclasses 

3import typing 

4from types import UnionType 

5from typing import ( 

6 Any, 

7 TypedDict, 

8 get_type_hints, 

9 Annotated, 

10 get_args, 

11 get_origin, 

12 TypeVar, 

13 Generic, 

14 Optional, 

15 cast, 

16 Type, 

17 Union, 

18 List, 

19 NotRequired, 

20 Literal, 

21 TYPE_CHECKING, 

22) 

23from collections.abc import Callable, Mapping, Collection, Iterable, Sequence, Container 

24 

25 

26from debputy.manifest_parser.base_types import FileSystemMatchRule 

27from debputy.manifest_parser.exceptions import ( 

28 ManifestParseException, 

29) 

30from debputy.manifest_parser.mapper_code import ( 

31 normalize_into_list, 

32 wrap_into_list, 

33 map_each_element, 

34) 

35from debputy.manifest_parser.parse_hints import ( 

36 ConditionalRequired, 

37 DebputyParseHint, 

38 TargetAttribute, 

39 ManifestAttribute, 

40 ConflictWithSourceAttribute, 

41 NotPathHint, 

42) 

43from debputy.manifest_parser.parser_data import ParserContextData 

44from debputy.manifest_parser.tagging_types import ( 

45 DebputyParsedContent, 

46 DebputyDispatchableType, 

47 TypeMapping, 

48) 

49from debputy.manifest_parser.util import ( 

50 AttributePath, 

51 unpack_type, 

52 find_annotation, 

53 check_integration_mode, 

54) 

55from debputy.plugin.api.impl_types import ( 

56 DeclarativeInputParser, 

57 TD, 

58 ListWrappedDeclarativeInputParser, 

59 DispatchingObjectParser, 

60 DispatchingTableParser, 

61 TTP, 

62 TP, 

63 InPackageContextParser, 

64) 

65from debputy.plugin.api.spec import ( 

66 ParserDocumentation, 

67 DebputyIntegrationMode, 

68 StandardParserAttributeDocumentation, 

69 undocumented_attr, 

70 ParserAttributeDocumentation, 

71 reference_documentation, 

72) 

73from debputy.util import _info, _warn, assume_not_none 

74 

75 

76if TYPE_CHECKING: 

77 from debputy.lsp.diagnostics import LintSeverity 

78 

79 

try:
    from Levenshtein import distance
except ImportError:
    # Flipped to True once the installation hint below has been emitted.
    _WARN_ONCE = False

    def _detect_possible_typo(
        _key: str,
        _value: object,
        _manifest_attributes: Mapping[str, "AttributeDescription"],
        _path: "AttributePath",
    ) -> None:
        """Fallback used when the `Levenshtein` module cannot be imported.

        No typo detection is performed and all arguments are ignored.  The first
        call emits an informational hint about the missing module; every later
        call does nothing.
        """
        global _WARN_ONCE
        if _WARN_ONCE:
            return
        _WARN_ONCE = True
        _info(
            "Install python3-levenshtein to have debputy try to detect typos in the manifest."
        )

else:

    def _detect_possible_typo(
        key: str,
        value: object,
        manifest_attributes: Mapping[str, "AttributeDescription"],
        path: "AttributePath",
    ) -> None:
        """Warn when `key` looks like a misspelling of a known attribute name.

        A known attribute is a candidate when its name differs from `key` by at
        most 2 in length and by at most 2 in Levenshtein distance.  Each
        candidate is ranked by how well `value` fits the attribute:

        * 2 - the value passes the attribute's type validation,
        * 1 - validation fails, but the base type of the value matches,
        * 0 - neither of the above.

        Only the candidates sharing the highest rank seen are reported (this
        includes rank 0 when nothing ranked higher).  Nothing is reported when
        there are no candidates at all.

        :param key: The unrecognized key.
        :param value: The value provided for `key`.
        :param manifest_attributes: Known attributes by name.
        :param path: Path of the container holding `key`.
        """
        key_length = len(key)
        key_path = path[key]
        best_strength = 0
        candidates: list[str] = []
        for candidate, description in manifest_attributes.items():
            # Cheap length pre-filter before computing the edit distance.
            if abs(key_length - len(candidate)) > 2 or distance(key, candidate) > 2:
                continue
            validator = description.type_validator
            try:
                validator.ensure_type(value, key_path)
            except ManifestParseException:
                strength = 1 if validator.base_type_match(value) else 0
            else:
                strength = 2

            if strength < best_strength:
                continue
            if strength > best_strength:
                # A better class of match supersedes everything collected so far.
                best_strength = strength
                candidates.clear()
            candidates.append(candidate)

        if not candidates:
            return
        ref = f'at "{path.path}"' if path else "at the manifest root level"
        if len(candidates) == 1:
            suggestion = repr(candidates[0])
            _warn(
                f'Possible typo: The key "{key}" {ref} should probably have been {suggestion}'
            )
            return
        candidates.sort()
        suggestions = ", ".join(repr(name) for name in candidates)
        _warn(
            f'Possible typo: The key "{key}" {ref} should probably have been one of {suggestions}'
        )

147 

148 

# Type variables for the generic signatures in this module.
SF = TypeVar("SF")
T = TypeVar("T")
S = TypeVar("S")

# The type of the `None` singleton.
_NONE_TYPE = type(None)

# These must be able to appear in an "isinstance" check and must be builtin types.
# Maps each such type to a short human-readable name.
BASIC_SIMPLE_TYPES = {str: "string", int: "integer", bool: "boolean"}

163 

164 

class AttributeTypeHandler:
    """Pairs a type description with a validator and an optional value mapper.

    The validator is a callable receiving the value and its attribute path; the
    optional mapper converts a value (given value, path and parser context) into
    another value.  `base_type`, when set, is used for a plain `isinstance`
    check via `base_type_match`.
    """

    __slots__ = ("_description", "_ensure_type", "base_type", "mapper")

    def __init__(
        self,
        description: str,
        ensure_type: Callable[[Any, AttributePath], None],
        *,
        base_type: type[Any] | None = None,
        mapper: None | (
            Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]
        ) = None,
    ) -> None:
        """Store the description, validator, optional base type and optional mapper."""
        self.mapper = mapper
        self.base_type = base_type
        self._ensure_type = ensure_type
        self._description = description

    def describe_type(self) -> str:
        """Return the description given at construction time."""
        return self._description

    def ensure_type(self, obj: object, path: AttributePath) -> None:
        """Run the validator on `obj`; any exception it raises propagates."""
        self._ensure_type(obj, path)

    def base_type_match(self, obj: object) -> bool:
        """Return True when a base type is set and `obj` is an instance of it."""
        if self.base_type is None:
            return False
        return isinstance(obj, self.base_type)

    def map_type(
        self,
        value: Any,
        path: AttributePath,
        parser_context: Optional["ParserContextData"],
    ) -> Any:
        """Return `value` passed through the mapper, or unchanged without a mapper."""
        if self.mapper is None:
            return value
        return self.mapper(value, path, parser_context)

    def combine_mapper(
        self,
        mapper: None | (
            Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]
        ),
    ) -> "AttributeTypeHandler":
        """Return a handler that additionally applies `mapper` after the current mapper.

        With `mapper=None` the handler itself is returned.  Otherwise a new
        handler is created that shares the description, validator and base type;
        its mapper first applies the existing mapper (if any) and then `mapper`.
        """
        if mapper is None:
            return self

        first = self.mapper
        if first is None:
            chained = mapper
        else:

            def chained(
                value: Any,
                path: AttributePath,
                parser_context: Optional["ParserContextData"],
            ) -> Any:
                intermediate = first(value, path, parser_context)
                return mapper(intermediate, path, parser_context)

        return AttributeTypeHandler(
            self._description,
            self._ensure_type,
            base_type=self.base_type,
            mapper=chained,
        )

231 

232 

233@dataclasses.dataclass(slots=True) 

234class AttributeDescription: 

235 source_attribute_name: str 

236 target_attribute: str 

237 attribute_type: Any 

238 type_validator: AttributeTypeHandler 

239 annotations: tuple[Any, ...] 

240 conflicting_attributes: frozenset[str] 

241 conditional_required: Optional["ConditionalRequired"] 

242 parse_hints: Optional["DetectedDebputyParseHint"] = None 

243 is_optional: bool = False 

244 

245 

246def _extract_path_hint(v: Any, attribute_path: AttributePath) -> bool: 

247 if attribute_path.path_hint is not None: 247 ↛ 248line 247 didn't jump to line 248 because the condition on line 247 was never true

248 return True 

249 if isinstance(v, str): 

250 attribute_path.path_hint = v 

251 return True 

252 elif isinstance(v, list) and len(v) > 0 and isinstance(v[0], str): 

253 attribute_path.path_hint = v[0] 

254 return True 

255 return False 

256 

257 

@dataclasses.dataclass(slots=True, frozen=True)
class DeclarativeNonMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]):
    """Parser that feeds the input value through a single attribute description.

    The value is validated and mapped by the type handler of `alt_form_parser`
    and returned as a one-key dict keyed by that description's target attribute.
    """

    alt_form_parser: AttributeDescription
    inline_reference_documentation: ParserDocumentation | None = None
    expected_debputy_integration_mode: Container[DebputyIntegrationMode] | None = None

    def parse_input(
        self,
        value: object,
        path: AttributePath,
        *,
        parser_context: Optional["ParserContextData"] = None,
    ) -> TD:
        """Validate and map `value` into a one-key dict.

        :param value: The raw input value; must not be None.
        :param path: The attribute path of the value.  Its path hint may be set
          from the value and its `alias_mapping` is replaced on success.
        :param parser_context: Passed on to the integration mode check and to
          the value mapper.
        :return: `{target_attribute: mapped_value}` cast to `TD`.
        :raises ManifestParseException: If `value` is None.  Exceptions from
          the integration mode check and the type validator propagate as well.
        """
        check_integration_mode(
            path,
            parser_context,
            self.expected_debputy_integration_mode,
        )

        alt_form_parser = self.alt_form_parser
        type_validator = alt_form_parser.type_validator
        if value is None:
            # The documentation reference is only needed for this error, so it
            # is only computed here.
            form_note = f" The value must have type: {type_validator.describe_type()}"
            doc_url = self.reference_documentation_url
            if doc_url is not None:
                doc_ref = f" Please see {doc_url} for the documentation."
            else:
                doc_ref = ""
            raise ManifestParseException(
                f"The attribute {path.path} was missing a value. {form_note}{doc_ref}"
            )

        _extract_path_hint(value, path)
        type_validator.ensure_type(value, path)
        attribute = alt_form_parser.target_attribute
        v = type_validator.map_type(value, path, parser_context)
        # The value was provided without a key; map the target attribute to "".
        path.alias_mapping = {
            attribute: ("", None),
        }
        return cast("TD", {attribute: v})

298 

299 

@dataclasses.dataclass(slots=True)
class DeclarativeMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]):
    """Parser for input given as a mapping, with an optional non-mapping form.

    Mapping input is checked against the known attributes (unknown keys,
    required keys, conditionally required keys, "at least one of" groups,
    mutually exclusive groups and per-attribute conflicts) before each value is
    validated and mapped.  Non-mapping input is handed to `alt_form_parser`
    when one is configured and rejected otherwise.
    """

    # Keys that must always be present in mapping input.
    input_time_required_parameters: frozenset[str]
    # Every key accepted in mapping input.
    all_parameters: frozenset[str]
    # Attribute descriptions keyed by the name used in the input mapping.
    manifest_attributes: Mapping[str, "AttributeDescription"]
    # Attribute descriptions used to resolve `conflicting_attributes` names.
    source_attributes: Mapping[str, "AttributeDescription"]
    # Each inner set must have at least one member present in the input.
    at_least_one_of: frozenset[frozenset[str]]
    # Handler for non-mapping input; None when only mappings are accepted.
    alt_form_parser: AttributeDescription | None
    # Each inner set may have at most one member present in the input.
    mutually_exclusive_attributes: frozenset[frozenset[str]] = frozenset()
    # Lazily computed result of `_per_attribute_conflicts`.
    _per_attribute_conflicts_cache: Mapping[str, frozenset[str]] | None = None
    inline_reference_documentation: ParserDocumentation | None = None
    # Input keys tried, in order, as the source of the path hint.
    path_hint_source_attributes: Sequence[str] = tuple()
    expected_debputy_integration_mode: Container[DebputyIntegrationMode] | None = None

    def _parse_alt_form(
        self,
        value: object,
        path: AttributePath,
        *,
        parser_context: Optional["ParserContextData"] = None,
    ) -> TD:
        """Parse non-mapping input via `alt_form_parser`.

        :return: `{target_attribute: mapped_value}` cast to `TD`.
        :raises ManifestParseException: If no alternative form is configured.
          Exceptions from the type validator propagate as well.
        """
        alt_form_parser = self.alt_form_parser
        if alt_form_parser is None:
            raise ManifestParseException(
                f"The attribute {path.path} must be a mapping.{self._doc_url_error_suffix()}"
            )
        _extract_path_hint(value, path)
        alt_form_parser.type_validator.ensure_type(value, path)
        assert (
            value is not None
        ), "The alternative form was None, but the parser should have rejected None earlier."
        attribute = alt_form_parser.target_attribute
        v = alt_form_parser.type_validator.map_type(value, path, parser_context)
        # The value was provided without a key; map the target attribute to "".
        path.alias_mapping = {
            attribute: ("", None),
        }
        return cast("TD", {attribute: v})

    def _validate_expected_keys(
        self,
        value: dict[Any, Any],
        path: AttributePath,
        *,
        parser_context: Optional["ParserContextData"] = None,
    ) -> None:
        """Check the key set of `value` against the parser's key rules.

        The checks run in this order: unknown keys, required keys,
        conditionally required keys, "at least one of" groups and mutually
        exclusive groups.  The first violation raises.

        :raises ManifestParseException: On the first violated rule.
        """
        unknown_keys = value.keys() - self.all_parameters
        doc_ref = self._doc_url_error_suffix()
        if unknown_keys:
            # Sets have no stable order; sort (via str(), as keys need not be
            # strings) so warnings and the error message are reproducible.
            ordered_unknown = sorted(unknown_keys, key=str)
            for k in ordered_unknown:
                if isinstance(k, str):
                    _detect_possible_typo(k, value[k], self.manifest_attributes, path)
            unknown = ", ".join(repr(k) for k in ordered_unknown)
            unused_keys = self.all_parameters - value.keys()
            if unused_keys:
                usable = ", ".join(sorted(unused_keys))
                raise ManifestParseException(
                    f"Unknown keys {unknown} at {path.path_container_lc}."
                    f" Keys that could be used here are: {usable}.{doc_ref}"
                )
            raise ManifestParseException(
                f"Unknown keys {unknown} at {path.path_container_lc}. Please remove them.{doc_ref}"
            )
        missing_keys = self.input_time_required_parameters - value.keys()
        if missing_keys:
            required = ", ".join(repr(k) for k in sorted(missing_keys))
            raise ManifestParseException(
                f"The following keys were required but not present at {path.path_container_lc}: {required}{doc_ref}"
            )
        for maybe_required in self.all_parameters - value.keys():
            attr = self.manifest_attributes[maybe_required]
            # Evaluating a condition requires a parser context.
            assert attr.conditional_required is None or parser_context is not None
            if (
                attr.conditional_required is not None
                and attr.conditional_required.condition_applies(
                    assume_not_none(parser_context)
                )
            ):
                reason = attr.conditional_required.reason
                raise ManifestParseException(
                    f'Missing the *conditionally* required attribute "{maybe_required}" at {path.path_container_lc}. {reason}{doc_ref}'
                )
        for keyset in self.at_least_one_of:
            matched_keys = value.keys() & keyset
            if not matched_keys:
                conditionally_required = ", ".join(repr(k) for k in sorted(keyset))
                raise ManifestParseException(
                    f"At least one of the following keys must be present at {path.path_container_lc}:"
                    f" {conditionally_required}{doc_ref}"
                )
        for group in self.mutually_exclusive_attributes:
            matched = value.keys() & group
            if len(matched) > 1:
                ck = ", ".join(repr(k) for k in sorted(matched))
                raise ManifestParseException(
                    f"Could not parse {path.path_container_lc}: The following attributes are"
                    f" mutually exclusive: {ck}{doc_ref}"
                )

    def _parse_typed_dict_form(
        self,
        value: dict[Any, Any],
        path: AttributePath,
        *,
        parser_context: Optional["ParserContextData"] = None,
    ) -> TD:
        """Parse mapping input into a dict keyed by target attribute names.

        Keys are validated first.  Each value is then validated and mapped by
        its attribute's type handler and stored under the attribute's target
        name; entries whose value is None are left out of the result.  When an
        input key differs from its target name, the pair is recorded in the
        alias mapping of `path`.

        :raises ManifestParseException: On invalid keys or conflicting
          attributes.  Exceptions from the type validators propagate as well.
        """
        self._validate_expected_keys(value, path, parser_context=parser_context)
        result = {}
        per_attribute_conflicts = self._per_attribute_conflicts()
        alias_mapping = {}
        # The first attribute yielding a usable hint wins.
        for hint_attribute in self.path_hint_source_attributes:
            v = value.get(hint_attribute)
            if v is not None and _extract_path_hint(v, path):
                break
        for k, v in value.items():
            attr = self.manifest_attributes[k]
            matched = value.keys() & per_attribute_conflicts[k]
            if matched:
                ck = ", ".join(repr(c) for c in sorted(matched))
                raise ManifestParseException(
                    f'The attribute "{k}" at {path.path} cannot be used with the following'
                    f" attributes: {ck}{self._doc_url_error_suffix()}"
                )
            nk = attr.target_attribute
            key_path = path[k]
            attr.type_validator.ensure_type(v, key_path)
            if v is None:
                continue
            if k != nk:
                alias_mapping[nk] = k, None
            v = attr.type_validator.map_type(v, key_path, parser_context)
            result[nk] = v
        if alias_mapping:
            path.alias_mapping = alias_mapping
        return cast("TD", result)

    def _doc_url_error_suffix(self, *, see_url_version: bool = False) -> str:
        """Return the documentation reference to append to an error message.

        :param see_url_version: Selects the "Please see ..." sentence instead
          of the parenthesized form.
        :return: The suffix (with a leading space), or "" when there is no
          reference documentation URL.
        """
        doc_url = self.reference_documentation_url
        if doc_url is not None:
            if see_url_version:
                return f" Please see {doc_url} for the documentation."
            return f" (Documentation: {doc_url})"
        return ""

    def parse_input(
        self,
        value: object,
        path: AttributePath,
        *,
        parser_context: Optional["ParserContextData"] = None,
    ) -> TD:
        """Parse `value`, dispatching on whether it is a dict.

        :param value: The raw input; must not be None.
        :param path: The attribute path of the value.
        :param parser_context: Passed on to the integration mode check and to
          the form-specific parsers.
        :return: The parsed content, cast to `TD`.
        :raises ManifestParseException: If `value` is None or fails parsing.
        """
        check_integration_mode(
            path,
            parser_context,
            self.expected_debputy_integration_mode,
        )
        if value is None:
            form_note = " The attribute must be a mapping."
            if self.alt_form_parser is not None:
                form_note = (
                    " The attribute can be a mapping or a non-mapping format"
                    ' (usually, "non-mapping format" means a string or a list of strings).'
                )
            doc_ref = self._doc_url_error_suffix(see_url_version=True)
            raise ManifestParseException(
                f"The attribute {path.path} was missing a value. {form_note}{doc_ref}"
            )

        if not isinstance(value, dict):
            return self._parse_alt_form(value, path, parser_context=parser_context)
        return self._parse_typed_dict_form(value, path, parser_context=parser_context)

    def _per_attribute_conflicts(self) -> Mapping[str, frozenset[str]]:
        """Return, per source attribute name, the source names it conflicts with.

        The mapping is computed from `source_attributes` on first use and
        cached on the instance afterwards.
        """
        conflicts = self._per_attribute_conflicts_cache
        if conflicts is not None:
            return conflicts
        attrs = self.source_attributes
        conflicts = {
            a.source_attribute_name: frozenset(
                attrs[ca].source_attribute_name for ca in a.conflicting_attributes
            )
            for a in attrs.values()
        }
        self._per_attribute_conflicts_cache = conflicts
        return self._per_attribute_conflicts_cache

483 

484 

485def _is_path_attribute_candidate( 

486 source_attribute: AttributeDescription, target_attribute: AttributeDescription 

487) -> bool: 

488 if ( 

489 source_attribute.parse_hints 

490 and not source_attribute.parse_hints.applicable_as_path_hint 

491 ): 

492 return False 

493 target_type = target_attribute.attribute_type 

494 _, origin, args = unpack_type(target_type, False) 

495 match_type = target_type 

496 if origin == list: 

497 match_type = args[0] 

498 return isinstance(match_type, type) and issubclass(match_type, FileSystemMatchRule) 

499 

500 

# Use the stock check when it recognizes DebputyParsedContent; otherwise fall
# back to a variant that also accepts subclasses of DebputyParsedContent.
if typing.is_typeddict(DebputyParsedContent):
    is_typeddict = typing.is_typeddict
else:

    def is_typeddict(t: Any) -> bool:
        """Return True for TypedDicts and for subclasses of DebputyParsedContent."""
        return typing.is_typeddict(t) or (
            isinstance(t, type) and issubclass(t, DebputyParsedContent)
        )

509 

510 

511class ParserGenerator: 

512 def __init__(self) -> None: 

513 self._registered_types: dict[Any, TypeMapping[Any, Any]] = {} 

514 self._object_parsers: dict[str, DispatchingObjectParser] = {} 

515 self._table_parsers: dict[ 

516 type[DebputyDispatchableType], DispatchingTableParser[Any] 

517 ] = {} 

518 self._in_package_context_parser: dict[str, Any] = {} 

519 

def register_mapped_type(self, mapped_type: TypeMapping[Any, Any]) -> None:
    """Register `mapped_type` under its target type.

    :raises ValueError: If a mapping is already registered for that target type.
    """
    target = mapped_type.target_type
    registry = self._registered_types
    existing = registry.get(target)
    if existing is not None:
        raise ValueError(f"The type {existing} is already registered")
    registry[target] = mapped_type

525 

def get_mapped_type_from_target_type(
    self,
    mapped_type: type[T],
) -> TypeMapping[Any, T] | None:
    """Return the mapping registered for the target type, or None if there is none."""
    registry = self._registered_types
    return registry.get(mapped_type)

531 

def discard_mapped_type(self, mapped_type: type[T]) -> None:
    """Remove the mapping registered for the target type.

    :raises KeyError: If no mapping is registered for `mapped_type`.
    """
    self._registered_types.pop(mapped_type)

534 

def add_table_parser(self, rt: type[DebputyDispatchableType], path: str) -> None:
    """Create and register a `DispatchingTableParser` for `rt`.

    `rt` must not have a table parser yet (enforced with an assertion).
    """
    table_parsers = self._table_parsers
    assert rt not in table_parsers
    table_parsers[rt] = DispatchingTableParser(rt, path)

538 

def add_object_parser(
    self,
    path: str,
    *,
    parser_documentation: ParserDocumentation | None = None,
    expected_debputy_integration_mode: None | (
        Container[DebputyIntegrationMode]
    ) = None,
    unknown_keys_diagnostic_severity: Optional["LintSeverity"] = "error",
    allow_unknown_keys: bool = False,
) -> DispatchingObjectParser:
    """Create, register and return a `DispatchingObjectParser` for `path`.

    `path` must not be in use by an object parser or an in-package-context
    parser (enforced with assertions).  The keyword arguments are handed to
    the `DispatchingObjectParser` constructor unchanged.

    :return: The newly created parser.
    """
    assert path not in self._in_package_context_parser
    object_parsers = self._object_parsers
    assert path not in object_parsers
    new_parser = DispatchingObjectParser(
        path,
        parser_documentation=parser_documentation,
        expected_debputy_integration_mode=expected_debputy_integration_mode,
        unknown_keys_diagnostic_severity=unknown_keys_diagnostic_severity,
        allow_unknown_keys=allow_unknown_keys,
    )
    object_parsers[path] = new_parser
    return new_parser

561 

def add_in_package_context_parser(
    self,
    path: str,
    delegate: DeclarativeInputParser[Any],
) -> None:
    """Register an `InPackageContextParser` wrapping `delegate` for `path`.

    `path` must not be in use by an in-package-context parser or an object
    parser (enforced with assertions).
    """
    context_parsers = self._in_package_context_parser
    assert path not in context_parsers
    assert path not in self._object_parsers
    context_parsers[path] = InPackageContextParser(path, delegate)

570 

@property
def dispatchable_table_parsers(
    self,
) -> Mapping[type[DebputyDispatchableType], DispatchingTableParser[Any]]:
    """The registered table parsers keyed by dispatchable type (not a copy)."""
    table_parsers = self._table_parsers
    return table_parsers

576 

@property
def dispatchable_object_parsers(self) -> Mapping[str, DispatchingObjectParser]:
    """The registered object parsers keyed by path (not a copy)."""
    object_parsers = self._object_parsers
    return object_parsers

580 

def dispatch_parser_table_for(
    self, rule_type: TTP
) -> DispatchingTableParser[TP] | None:
    """Return the table parser registered for `rule_type`, or None if there is none."""
    table_parser = self._table_parsers.get(rule_type)
    # The cast only narrows the static type; the value is returned as is.
    return cast("Optional[DispatchingTableParser[TP]]", table_parser)

587 

588 def generate_parser( 

589 self, 

590 parsed_content: type[TD], 

591 *, 

592 source_content: SF | None = None, 

593 allow_optional: bool = False, 

594 inline_reference_documentation: ParserDocumentation | None = None, 

595 expected_debputy_integration_mode: None | ( 

596 Container[DebputyIntegrationMode] 

597 ) = None, 

598 automatic_docs: None | ( 

599 Mapping[type[Any], Sequence[StandardParserAttributeDocumentation]] 

600 ) = None, 

601 ) -> DeclarativeInputParser[TD]: 

602 """Derive a parser from a TypedDict 

603 

604 Generates a parser for a segment of the manifest (think the `install-docs` snippet) from a TypedDict 

605 or two that are used as a description. 

606 

607 In its most simple use-case, the caller provides a TypedDict of the expected attributed along with 

608 their types. As an example: 

609 

610 >>> class InstallDocsRule(DebputyParsedContent): 

611 ... sources: List[str] 

612 ... into: List[str] 

613 >>> pg = ParserGenerator() 

614 >>> simple_parser = pg.generate_parser(InstallDocsRule) 

615 

616 This will create a parser that would be able to interpret something like: 

617 

618 ```yaml 

619 install-docs: 

620 sources: ["docs/*"] 

621 into: ["my-pkg"] 

622 ``` 

623 

624 While this is sufficient for programmers, it is a bit rigid for the packager writing the manifest. Therefore, 

625 you can also provide a TypedDict describing the input, enabling more flexibility: 

626 

627 >>> class InstallDocsRule(DebputyParsedContent): 

628 ... sources: List[str] 

629 ... into: List[str] 

630 >>> class InputDocsRuleInputFormat(TypedDict): 

631 ... source: NotRequired[Annotated[str, DebputyParseHint.target_attribute("sources")]] 

632 ... sources: NotRequired[List[str]] 

633 ... into: Union[str, List[str]] 

634 >>> pg = ParserGenerator() 

635 >>> flexible_parser = pg.generate_parser( 

636 ... InstallDocsRule, 

637 ... source_content=InputDocsRuleInputFormat, 

638 ... ) 

639 

640 In this case, the `sources` field can either come from a single `source` in the manifest (which must be a string) 

641 or `sources` (which must be a list of strings). The parser also ensures that only one of `source` or `sources` 

642 is used to ensure the input is not ambiguous. For the `into` parameter, the parser will accept it being a str 

643 or a list of strings. Regardless of how the input was provided, the parser will normalize the input so that 

644 both `sources` and `into` in the result is a list of strings. As an example, this parser can accept 

645 both the previous input but also the following input: 

646 

647 ```yaml 

648 install-docs: 

649 source: "docs/*" 

650 into: "my-pkg" 

651 ``` 

652 

653 The `source` and `into` attributes are then normalized to lists as if the user had written them as lists 

654 with a single string in them. As noted above, the name of the `source` attribute will also be normalized 

655 while parsing. 

656 

657 In the cases where only one field is required by the user, it can sometimes make sense to allow a non-dict 

658 as part of the input. Example: 

659 

660 >>> class DiscardRule(DebputyParsedContent): 

661 ... paths: List[str] 

662 >>> class DiscardRuleInputDictFormat(TypedDict): 

663 ... path: NotRequired[Annotated[str, DebputyParseHint.target_attribute("paths")]] 

664 ... paths: NotRequired[List[str]] 

665 >>> # This format relies on DiscardRule having exactly one Required attribute 

666 >>> DiscardRuleInputWithAltFormat = Union[ 

667 ... DiscardRuleInputDictFormat, 

668 ... str, 

669 ... List[str], 

670 ... ] 

671 >>> pg = ParserGenerator() 

672 >>> flexible_parser = pg.generate_parser( 

673 ... DiscardRule, 

674 ... source_content=DiscardRuleInputWithAltFormat, 

675 ... ) 

676 

677 

678 Supported types: 

679 * `List` - must have a fixed type argument (such as `List[str]`) 

680 * `str` 

681 * `int` 

682 * `BinaryPackage` - When provided (or required), the user must provide a package name listed 

683 in the debian/control file. The code receives the BinaryPackage instance 

684 matching that input. 

685 * `FileSystemMode` - When provided (or required), the user must provide a file system mode in any 

686 format that `debputy' provides (such as `0644` or `a=rw,go=rw`). 

687 * `FileSystemOwner` - When provided (or required), the user must a file system owner that is 

688 available statically on all Debian systems (must be in `base-passwd`). 

689 The user has multiple options for how to specify it (either via name or id). 

690 * `FileSystemGroup` - When provided (or required), the user must a file system group that is 

691 available statically on all Debian systems (must be in `base-passwd`). 

692 The user has multiple options for how to specify it (either via name or id). 

693 * `ManifestCondition` - When provided (or required), the user must specify a conditional rule to apply. 

694 Usually, it is better to extend `DebputyParsedContentStandardConditional`, which 

695 provides the `debputy' default `when` parameter for conditionals. 

696 

697 Supported special type-like parameters: 

698 

699 * `Required` / `NotRequired` to mark a field as `Required` or `NotRequired`. Must be provided at the 

700 outermost level. Cannot vary between `parsed_content` and `source_content`. 

701 * `Annotated`. Accepted at the outermost level (inside Required/NotRequired) but ignored at the moment. 

702 * `Union`. Must be the outermost level (inside `Annotated` or/and `Required`/`NotRequired` if these are present). 

703 Automapping (see below) is restricted to two members in the Union. 

704 

705 Notable non-supported types: 

706 * `Mapping` and all variants therefore (such as `dict`). In the future, nested `TypedDict`s may be allowed. 

707 * `Optional` (or `Union[..., None]`): Use `NotRequired` for optional fields. 

708 

709 Automatic mapping rules from `source_content` to `parsed_content`: 

710 - `Union[T, List[T]]` can be narrowed automatically to `List[T]`. Transformation is basically: 

711 `lambda value: value if isinstance(value, list) else [value]` 

712 - `T` can be mapped automatically to `List[T]`, Transformation being: `lambda value: [value]` 

713 

714 Additionally, types can be annotated (`Annotated[str, ...]`) with `DebputyParseHint`s. Check its classmethod 

715 for concrete features that may be useful to you. 

716 

717 :param parsed_content: A DebputyParsedContent / TypedDict describing the desired model of the input once parsed. 

718 (DebputyParsedContent is a TypedDict subclass that work around some inadequate type checkers). 

719 It can also be a `List[DebputyParsedContent]`. In that case, `source_content` must be a 

720 `List[TypedDict[...]]`. 

721 :param source_content: Optionally, a TypedDict describing the input allowed by the user. This can be useful 

722 to describe more variations than in `parsed_content` that the parser will normalize for you. If omitted, 

723 the parsed_content is also considered the source_content (which affects what annotations are allowed in it). 

724 Note you should never pass the parsed_content as source_content directly. 

725 :param allow_optional: In rare cases, you want to support explicitly provided vs. optional. In this case, you 

726 should set this to True. Though, in 99.9% of all cases, you want `NotRequired` rather than `Optional` (and 

727 can keep this False). 

728 :param inline_reference_documentation: Optionally, programmatic documentation 

729 :param expected_debputy_integration_mode: If provided, this declares the integration modes where the 

730 result of the parser can be used. This is primarily useful for "fail-fast" on incorrect usage. 

731 When the restriction is not satisfiable, the generated parser will trigger a parse error immediately 

732 (resulting in a "compile time" failure rather than a "runtime" failure). 

733 :return: An input parser capable of reading input matching the TypedDict(s) used as reference. 

734 """ 

735 orig_parsed_content = parsed_content 

736 if source_content is parsed_content: 736 ↛ 737line 736 didn't jump to line 737 because the condition on line 736 was never true

737 raise ValueError( 

738 "Do not provide source_content if it is the same as parsed_content" 

739 ) 

740 is_list_wrapped = False 

741 if get_origin(orig_parsed_content) == list: 

742 parsed_content = get_args(orig_parsed_content)[0] 

743 is_list_wrapped = True 

744 

745 if isinstance(parsed_content, type) and issubclass( 

746 parsed_content, DebputyDispatchableType 

747 ): 

748 parser = self.dispatch_parser_table_for(parsed_content) 

749 if parser is None: 749 ↛ 750line 749 didn't jump to line 750 because the condition on line 749 was never true

750 raise ValueError( 

751 f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}." 

752 f" The class {parsed_content.__qualname__} is not a pre-registered type." 

753 ) 

754 # FIXME: Only the list wrapped version has documentation. 

755 if is_list_wrapped: 755 ↛ 761line 755 didn't jump to line 761 because the condition on line 755 was always true

756 parser = ListWrappedDeclarativeInputParser( 

757 parser, 

758 inline_reference_documentation=inline_reference_documentation, 

759 expected_debputy_integration_mode=expected_debputy_integration_mode, 

760 ) 

761 return parser 

762 

763 if not is_typeddict(parsed_content): 763 ↛ 764line 763 didn't jump to line 764 because the condition on line 763 was never true

764 raise ValueError( 

765 f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}." 

766 ' Only "TypedDict"-based types and a subset of "DebputyDispatchableType" are supported.' 

767 ) 

768 if is_list_wrapped and source_content is not None: 

769 if get_origin(source_content) != list: 769 ↛ 770line 769 didn't jump to line 770 because the condition on line 769 was never true

770 raise ValueError( 

771 "If the parsed_content is a List type, then source_format must be a List type as well." 

772 ) 

773 source_content = get_args(source_content)[0] 

774 

775 target_attributes = self._parse_types( 

776 parsed_content, 

777 allow_source_attribute_annotations=source_content is None, 

778 forbid_optional=not allow_optional, 

779 ) 

780 required_target_parameters = frozenset(parsed_content.__required_keys__) 

781 parsed_alt_form = None 

782 non_mapping_source_only = False 

783 

784 if source_content is not None: 

785 default_target_attribute = None 

786 if len(required_target_parameters) == 1: 

787 default_target_attribute = next(iter(required_target_parameters)) 

788 

789 source_typed_dict, alt_source_forms = _extract_typed_dict( 

790 source_content, 

791 default_target_attribute, 

792 ) 

793 if alt_source_forms: 

794 parsed_alt_form = self._parse_alt_form( 

795 alt_source_forms, 

796 default_target_attribute, 

797 ) 

798 if source_typed_dict is not None: 

799 source_content_attributes = self._parse_types( 

800 source_typed_dict, 

801 allow_target_attribute_annotation=True, 

802 allow_source_attribute_annotations=True, 

803 forbid_optional=not allow_optional, 

804 ) 

805 source_content_parameter = "source_content" 

806 source_and_parsed_differs = True 

807 else: 

808 source_typed_dict = parsed_content 

809 source_content_attributes = target_attributes 

810 source_content_parameter = "parsed_content" 

811 source_and_parsed_differs = True 

812 non_mapping_source_only = True 

813 else: 

814 source_typed_dict = parsed_content 

815 source_content_attributes = target_attributes 

816 source_content_parameter = "parsed_content" 

817 source_and_parsed_differs = False 

818 

819 sources = collections.defaultdict(set) 

820 seen_targets = set() 

821 seen_source_names: dict[str, str] = {} 

822 source_attributes: dict[str, AttributeDescription] = {} 

823 path_hint_source_attributes = [] 

824 

825 for k in source_content_attributes: 

826 ia = source_content_attributes[k] 

827 

828 ta = ( 

829 target_attributes.get(ia.target_attribute) 

830 if source_and_parsed_differs 

831 else ia 

832 ) 

833 if ta is None: 833 ↛ 835line 833 didn't jump to line 835 because the condition on line 833 was never true

834 # Error message would be wrong if this assertion is false. 

835 assert source_and_parsed_differs 

836 raise ValueError( 

837 f'The attribute "{k}" from the "source_content" parameter should have mapped' 

838 f' to "{ia.target_attribute}", but that parameter does not exist in "parsed_content"' 

839 ) 

840 if _is_path_attribute_candidate(ia, ta): 

841 path_hint_source_attributes.append(ia.source_attribute_name) 

842 existing_source_name = seen_source_names.get(ia.source_attribute_name) 

843 if existing_source_name: 843 ↛ 844line 843 didn't jump to line 844 because the condition on line 843 was never true

844 raise ValueError( 

845 f'The attribute "{k}" and "{existing_source_name}" both share the source name' 

846 f' "{ia.source_attribute_name}". Please change the {source_content_parameter} parameter,' 

847 f' so only one attribute use "{ia.source_attribute_name}".' 

848 ) 

849 seen_source_names[ia.source_attribute_name] = k 

850 seen_targets.add(ta.target_attribute) 

851 sources[ia.target_attribute].add(k) 

852 if source_and_parsed_differs: 

853 bridge_mapper = self._type_normalize( 

854 k, ia.attribute_type, ta.attribute_type, False 

855 ) 

856 ia.type_validator = ia.type_validator.combine_mapper(bridge_mapper) 

857 source_attributes[k] = ia 

858 

859 def _as_attr_names(td_name: Iterable[str]) -> frozenset[str]: 

860 return frozenset( 

861 source_content_attributes[a].source_attribute_name for a in td_name 

862 ) 

863 

864 _check_attributes( 

865 parsed_content, 

866 source_typed_dict, 

867 source_content_attributes, 

868 sources, 

869 ) 

870 

871 at_least_one_of = frozenset( 

872 _as_attr_names(g) 

873 for k, g in sources.items() 

874 if len(g) > 1 and k in required_target_parameters 

875 ) 

876 

877 if source_and_parsed_differs and seen_targets != target_attributes.keys(): 877 ↛ 878line 877 didn't jump to line 878 because the condition on line 877 was never true

878 missing = ", ".join( 

879 repr(k) for k in (target_attributes.keys() - seen_targets) 

880 ) 

881 raise ValueError( 

882 'The following attributes in "parsed_content" did not have a source field in "source_content":' 

883 f" {missing}" 

884 ) 

885 all_mutually_exclusive_fields = frozenset( 

886 _as_attr_names(g) for g in sources.values() if len(g) > 1 

887 ) 

888 

889 all_parameters = ( 

890 source_typed_dict.__required_keys__ | source_typed_dict.__optional_keys__ 

891 ) 

892 _check_conflicts( 

893 source_content_attributes, 

894 source_typed_dict.__required_keys__, 

895 all_parameters, 

896 ) 

897 

898 manifest_attributes = { 

899 a.source_attribute_name: a for a in source_content_attributes.values() 

900 } 

901 

902 if parsed_alt_form is not None: 

903 target_attribute = parsed_alt_form.target_attribute 

904 if ( 904 ↛ 909line 904 didn't jump to line 909 because the condition on line 904 was never true

905 target_attribute not in required_target_parameters 

906 and required_target_parameters 

907 or len(required_target_parameters) > 1 

908 ): 

909 raise NotImplementedError( 

910 "When using alternative source formats (Union[TypedDict, ...]), then the" 

911 " target must have at most one require parameter" 

912 ) 

913 bridge_mapper = self._type_normalize( 

914 target_attribute, 

915 parsed_alt_form.attribute_type, 

916 target_attributes[target_attribute].attribute_type, 

917 False, 

918 ) 

919 parsed_alt_form.type_validator = ( 

920 parsed_alt_form.type_validator.combine_mapper(bridge_mapper) 

921 ) 

922 

923 inline_reference_documentation = ( 

924 _verify_and_auto_correct_inline_reference_documentation( 

925 parsed_content, 

926 source_typed_dict, 

927 source_content_attributes, 

928 inline_reference_documentation, 

929 parsed_alt_form is not None, 

930 automatic_docs, 

931 ) 

932 ) 

933 if non_mapping_source_only: 

934 parser = DeclarativeNonMappingInputParser( 

935 assume_not_none(parsed_alt_form), 

936 inline_reference_documentation=inline_reference_documentation, 

937 expected_debputy_integration_mode=expected_debputy_integration_mode, 

938 ) 

939 else: 

940 parser = DeclarativeMappingInputParser( 

941 _as_attr_names(source_typed_dict.__required_keys__), 

942 _as_attr_names(all_parameters), 

943 manifest_attributes, 

944 source_attributes, 

945 mutually_exclusive_attributes=all_mutually_exclusive_fields, 

946 alt_form_parser=parsed_alt_form, 

947 at_least_one_of=at_least_one_of, 

948 inline_reference_documentation=inline_reference_documentation, 

949 path_hint_source_attributes=tuple(path_hint_source_attributes), 

950 expected_debputy_integration_mode=expected_debputy_integration_mode, 

951 ) 

952 if is_list_wrapped: 

953 parser = ListWrappedDeclarativeInputParser( 

954 parser, 

955 expected_debputy_integration_mode=expected_debputy_integration_mode, 

956 ) 

957 return parser 

958 

def _as_type_validator(
    self,
    attribute: str,
    provided_type: Any,
    parsing_typed_dict_attribute: bool,
) -> AttributeTypeHandler:
    """Build the `AttributeTypeHandler` that validates values of `provided_type`.

    The type is inspected in a fixed order and the first matching case wins:
    `DebputyDispatchableType` subclasses, two-member unions containing `None`,
    entries of `BASIC_SIMPLE_TYPES`, lists, TypedDicts, dicts, other unions,
    `Literal` and finally `Any`. Container and union types recurse into this
    method for their member types.

    :param attribute: Attribute name; used in error messages and forwarded to
      `_type_normalize` and to the recursive calls.
    :param provided_type: The declared type of the attribute. Must not be a tuple.
    :param parsing_typed_dict_attribute: Forwarded unchanged to `unpack_type`,
      `_strip_mapped_types`, `_type_normalize` and the recursive calls.
    :return: A handler bundling a description, a validation callable and (for
      most cases) the mapper computed by `_type_normalize`.
    :raises ValueError: If the type, or one of its member types, is not supported.
    """
    assert not isinstance(provided_type, tuple)

    # Dispatchable types have their own parser; nothing below applies to them.
    if isinstance(provided_type, type) and issubclass(
        provided_type, DebputyDispatchableType
    ):
        return _dispatch_parser(provided_type)

    # `unmapped_type` is `provided_type` with registered (mapped) types replaced
    # by their source types; validation is done against the unmapped form.
    unmapped_type = self._strip_mapped_types(
        provided_type,
        parsing_typed_dict_attribute,
    )
    # Mapper from the unmapped form to the provided form (None when they are equal).
    type_normalizer = self._type_normalize(
        attribute,
        unmapped_type,
        provided_type,
        parsing_typed_dict_attribute,
    )
    t_unmapped, t_unmapped_orig, t_unmapped_args = unpack_type(
        unmapped_type,
        parsing_typed_dict_attribute,
    )
    _, t_provided_orig, t_provided_args = unpack_type(
        provided_type,
        parsing_typed_dict_attribute,
    )

    # Optional-like case: a union of exactly two members where one is None.
    # NOTE(review): only `Union` is tested here while the generic union branch
    # further down tests `(Union, UnionType)` - confirm whether `X | None` is
    # meant to reach this branch.
    if (
        t_unmapped_orig == Union
        and t_unmapped_args
        and len(t_unmapped_args) == 2
        and any(v is _NONE_TYPE for v in t_unmapped_args)
    ):
        _, _, args = unpack_type(provided_type, parsing_typed_dict_attribute)
        actual_type = [a for a in args if a is not _NONE_TYPE][0]
        validator = self._as_type_validator(
            attribute, actual_type, parsing_typed_dict_attribute
        )

        def _validator(v: Any, path: AttributePath) -> None:
            # None is always accepted; anything else is checked as the non-None member.
            if v is None:
                return
            validator.ensure_type(v, path)

        return AttributeTypeHandler(
            validator.describe_type(),
            _validator,
            base_type=validator.base_type,
            mapper=type_normalizer,
        )

    if unmapped_type in BASIC_SIMPLE_TYPES:
        type_name = BASIC_SIMPLE_TYPES[unmapped_type]

        # For a registered type, describe it by the target type's name and keep
        # the simple type name as a parenthesised suffix in the error message.
        type_mapping = self._registered_types.get(provided_type)
        if type_mapping is not None:
            simple_type = f" ({type_name})"
            type_name = type_mapping.target_type.__name__
        else:
            simple_type = ""

        def _validator(v: Any, path: AttributePath) -> None:
            if not isinstance(v, unmapped_type):
                _validation_type_error(
                    path, f"The attribute must be a {type_name}{simple_type}"
                )

        return AttributeTypeHandler(
            type_name,
            _validator,
            base_type=unmapped_type,
            mapper=type_normalizer,
        )
    if t_unmapped_orig == list:
        if not t_unmapped_args:
            raise ValueError(
                f'The attribute "{attribute}" is List but does not have Generics (Must use List[X])'
            )

        # Handler for the element type of the list.
        genetic_type = t_unmapped_args[0]
        key_mapper = self._as_type_validator(
            attribute,
            genetic_type,
            parsing_typed_dict_attribute,
        )

        def _validator(v: Any, path: AttributePath) -> None:
            # The loop below relies on `_validation_type_error` not returning
            # for non-list input (presumably it raises - TODO confirm).
            if not isinstance(v, list):
                _validation_type_error(path, "The attribute must be a list")
            for i, v in enumerate(v):
                key_mapper.ensure_type(v, path[i])

        # Apply the element mapper (if any) to each element of the list.
        list_mapper = (
            map_each_element(key_mapper.mapper)
            if key_mapper.mapper is not None
            else None
        )

        return AttributeTypeHandler(
            f"List of {key_mapper.describe_type()}",
            _validator,
            base_type=list,
            mapper=type_normalizer,
        ).combine_mapper(list_mapper)
    if is_typeddict(provided_type):
        # Nested TypedDict: validation is a no-op here; the generated sub-parser
        # is invoked through the mapper instead.
        subparser = self.generate_parser(cast("Type[TD]", provided_type))
        return AttributeTypeHandler(
            description=f"{provided_type.__name__} (Typed Mapping)",
            ensure_type=lambda v, ap: None,
            base_type=dict,
            mapper=lambda v, ap, cv: subparser.parse_input(
                v, ap, parser_context=cv
            ),
        )
    if t_unmapped_orig == dict:
        if not t_unmapped_args or len(t_unmapped_args) != 2:
            raise ValueError(
                f'The attribute "{attribute}" is Dict but does not have Generics (Must use Dict[str, Y])'
            )
        if t_unmapped_args[0] != str:
            raise ValueError(
                f'The attribute "{attribute}" is Dict and has a non-str type as key.'
                " Currently, only `str` is supported (Dict[str, Y])"
            )
        key_mapper = self._as_type_validator(
            attribute,
            t_unmapped_args[0],
            parsing_typed_dict_attribute,
        )
        value_mapper = self._as_type_validator(
            attribute,
            t_unmapped_args[1],
            parsing_typed_dict_attribute,
        )

        if key_mapper.base_type is None:
            raise ValueError(
                f'The attribute "{attribute}" is Dict and the key did not have a trivial base type. Key types'
                f" without trivial base types (such as `str`) are not supported at the moment."
            )

        if value_mapper.mapper is not None:
            raise ValueError(
                f'The attribute "{attribute}" is Dict and the value requires mapping.'
                " Currently, this is not supported. Consider a simpler type (such as Dict[str, str] or Dict[str, Any])."
                " Better typing may come later"
            )

        def _validator(uv: Any, path: AttributePath) -> None:
            if not isinstance(uv, dict):
                _validation_type_error(path, "The attribute must be a mapping")
            # `key_name` is a human-readable locator for the key being checked;
            # it is passed to `copy_with_path_hint` when a key has the wrong type.
            key_name = "the first key in the mapping"
            for i, (k, v) in enumerate(uv.items()):
                if not key_mapper.base_type_match(k):
                    kp = path.copy_with_path_hint(key_name)
                    _validation_type_error(
                        kp,
                        f'The key number {i + 1} in attribute "{kp}" must be a {key_mapper.describe_type()}',
                    )
                key_name = f"the key after {k}"
                value_mapper.ensure_type(v, path[k])

        return AttributeTypeHandler(
            f"Mapping of {value_mapper.describe_type()}",
            _validator,
            base_type=dict,
            mapper=type_normalizer,
        ).combine_mapper(key_mapper.mapper)
    if t_unmapped_orig in (Union, UnionType):
        if _is_two_arg_x_list_x(t_provided_args):
            # Force the order to be "X, List[X]" as it simplifies the code
            x_list_x = (
                t_provided_args
                if get_origin(t_provided_args[1]) == list
                else (t_provided_args[1], t_provided_args[0])
            )

            # X, List[X] could match if X was List[Y]. However, our code below assumes
            # that X is a non-list. The `_is_two_arg_x_list_x` returns False for this
            # case to avoid this assert and fall into the "generic case".
            assert get_origin(x_list_x[0]) != list
            x_subtype_checker = self._as_type_validator(
                attribute,
                x_list_x[0],
                parsing_typed_dict_attribute,
            )
            list_x_subtype_checker = self._as_type_validator(
                attribute,
                x_list_x[1],
                parsing_typed_dict_attribute,
            )
            type_description = x_subtype_checker.describe_type()
            type_description = f"{type_description} or a list of {type_description}"

            def _validator(v: Any, path: AttributePath) -> None:
                # Pick the checker by the runtime shape of the value.
                if isinstance(v, list):
                    list_x_subtype_checker.ensure_type(v, path)
                else:
                    x_subtype_checker.ensure_type(v, path)

            return AttributeTypeHandler(
                type_description,
                _validator,
                mapper=type_normalizer,
            )
        else:
            # Generic union: one checker per member of the unmapped union.
            subtype_checker = [
                self._as_type_validator(attribute, a, parsing_typed_dict_attribute)
                for a in t_unmapped_args
            ]
            type_description = "one-of: " + ", ".join(
                f"{sc.describe_type()}" for sc in subtype_checker
            )
            # All members must share the same mapper, since the returned handler
            # carries a single mapper (`type_normalizer`).
            mapper = subtype_checker[0].mapper
            if any(mapper != sc.mapper for sc in subtype_checker):
                raise ValueError(
                    f'Cannot handle the union "{provided_type}" as the target types need different'
                    " type normalization/mapping logic. Unions are generally limited to Union[X, List[X]]"
                    " where X is a non-collection type."
                )

            def _validator(v: Any, path: AttributePath) -> None:
                # The first member that accepts the value wins. Failures are
                # remembered only for members whose base type matched, so that a
                # single "close" candidate can surface its more specific error.
                partial_matches = []
                for sc in subtype_checker:
                    try:
                        sc.ensure_type(v, path)
                        return
                    except ManifestParseException as e:
                        if sc.base_type_match(v):
                            partial_matches.append((sc, e))

                if len(partial_matches) == 1:
                    raise partial_matches[0][1]
                _validation_type_error(
                    path, f"Could not match against: {type_description}"
                )

            return AttributeTypeHandler(
                type_description,
                _validator,
                mapper=type_normalizer,
            )
    if t_unmapped_orig == Literal:
        # We want "x" for string values; repr provides 'x'
        pretty = ", ".join(
            f"`{v}`" if isinstance(v, str) else str(v) for v in t_unmapped_args
        )

        def _validator(v: Any, path: AttributePath) -> None:
            if v not in t_unmapped_args:
                # Only string values are echoed back in the error message.
                value_hint = ""
                if isinstance(v, str):
                    value_hint = f"({v}) "
                _validation_type_error(
                    path,
                    f"Value {value_hint}must be one of the following literal values: {pretty}",
                )

        return AttributeTypeHandler(
            f"One of the following literal values: {pretty}",
            _validator,
        )

    if provided_type == Any:
        # `Any` accepts everything; the validation callable is a no-op.
        return AttributeTypeHandler(
            "any (unvalidated)",
            lambda *a: None,
        )
    raise ValueError(
        f'The attribute "{attribute}" had/contained a type {provided_type}, which is not supported'
    )

1235 

1236 def _parse_types( 

1237 self, 

1238 spec: type[TypedDict], 

1239 allow_target_attribute_annotation: bool = False, 

1240 allow_source_attribute_annotations: bool = False, 

1241 forbid_optional: bool = True, 

1242 ) -> dict[str, AttributeDescription]: 

1243 annotations = get_type_hints(spec, include_extras=True) 

1244 return { 

1245 k: self._attribute_description( 

1246 k, 

1247 t, 

1248 k in spec.__required_keys__, 

1249 allow_target_attribute_annotation=allow_target_attribute_annotation, 

1250 allow_source_attribute_annotations=allow_source_attribute_annotations, 

1251 forbid_optional=forbid_optional, 

1252 ) 

1253 for k, t in annotations.items() 

1254 } 

1255 

def _attribute_description(
    self,
    attribute: str,
    orig_td: Any,
    is_required: bool,
    forbid_optional: bool = True,
    allow_target_attribute_annotation: bool = False,
    allow_source_attribute_annotations: bool = False,
) -> AttributeDescription:
    """Create the `AttributeDescription` for one TypedDict attribute.

    :param attribute: Name of the attribute as declared in the TypedDict.
    :param orig_td: The declared type hint of the attribute.
    :param is_required: Whether the attribute is a required key.
    :param forbid_optional: Forwarded to `_parse_type`.
    :param allow_target_attribute_annotation: Forwarded to the parse-hint parser.
    :param allow_source_attribute_annotations: Forwarded to the parse-hint parser.
    :return: The description combining the parsed type, its validator and the
      detected parse hints.
    """
    attribute_type, raw_annotations, optional = _parse_type(
        attribute,
        orig_td,
        forbid_optional=forbid_optional,
    )
    # The validator is always built in "TypedDict attribute" mode here.
    handler = self._as_type_validator(attribute, attribute_type, True)
    hints = DetectedDebputyParseHint.parse_annotations(
        raw_annotations,
        f' Seen with attribute "{attribute}".',
        attribute,
        is_required,
        allow_target_attribute_annotation=allow_target_attribute_annotation,
        allow_source_attribute_annotations=allow_source_attribute_annotations,
    )
    return AttributeDescription(
        target_attribute=hints.target_attribute,
        attribute_type=attribute_type,
        type_validator=handler,
        annotations=raw_annotations,
        is_optional=optional,
        conflicting_attributes=hints.conflict_with_source_attributes,
        conditional_required=hints.conditional_required,
        source_attribute_name=assume_not_none(hints.source_manifest_attribute),
        parse_hints=hints,
    )

1290 

def _parse_alt_form(
    self,
    alt_form,
    default_target_attribute: str | None,
) -> AttributeDescription:
    """Create the `AttributeDescription` for the non-mapping (alternative) source form.

    :param alt_form: The type hint of the alternative source form.
    :param default_target_attribute: Forwarded to the parse-hint parser as the
      default target attribute.
    :return: A description whose source attribute name is a fixed label, since
      the alternative form has no manifest key of its own.
    """
    label = "source_format alternative form"
    attribute_type, raw_annotations, optional = _parse_type(
        label,
        alt_form,
        forbid_optional=True,
        parsing_typed_dict_attribute=False,
    )
    handler = self._as_type_validator(label, attribute_type, True)
    hints = DetectedDebputyParseHint.parse_annotations(
        raw_annotations,
        " The alternative for source_format.",
        None,
        False,
        default_target_attribute=default_target_attribute,
        allow_target_attribute_annotation=True,
        allow_source_attribute_annotations=False,
    )
    return AttributeDescription(
        target_attribute=hints.target_attribute,
        attribute_type=attribute_type,
        type_validator=handler,
        annotations=raw_annotations,
        is_optional=optional,
        conflicting_attributes=hints.conflict_with_source_attributes,
        conditional_required=hints.conditional_required,
        source_attribute_name="Alt form of the source_format",
    )

1326 

def _union_narrowing(
    self,
    input_type: Any,
    target_type: Any,
    parsing_typed_dict_attribute: bool,
) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any] | None:
    """Return a mapper that narrows a union-typed input into `target_type`.

    :param input_type: The source type; must unpack to a union with at least
      one member.
    :param target_type: The type the value should end up as.
    :param parsing_typed_dict_attribute: Forwarded to `unpack_type` and
      `_strip_mapped_types`.
    :return: A mapper callable, or `None` when no mapping is needed or the
      combination is not supported by this method.
    :raises ValueError: If `input_type` is not a union with members, or if the
      input mixes mapped and unmapped members and the target member is not a
      class (so `isinstance` cannot tell them apart).
    """
    _, input_orig, input_args = unpack_type(
        input_type, parsing_typed_dict_attribute
    )
    _, target_orig, target_args = unpack_type(
        target_type, parsing_typed_dict_attribute
    )

    if input_orig not in (Union, UnionType) or not input_args:
        raise ValueError("input_type must be a Union[...] with non-empty args")

    # Currently, we only support Union[X, List[X]] -> List[Y] narrowing or
    # Union[X, List[X]] -> Union[Y, List[Y]]
    # - Where X = Y or there is a simple standard transformation from X to Y.

    if target_orig not in (Union, UnionType, list) or not target_args:
        # Not supported
        return None

    if target_orig in (Union, UnionType) and set(input_args) == set(target_args):
        # Not needed (identity mapping)
        return None

    if target_orig == list and not any(get_origin(a) == list for a in input_args):
        # Not supported
        return None

    target_arg = target_args[0]
    simplified_type = self._strip_mapped_types(
        target_arg, parsing_typed_dict_attribute
    )
    # Every member of the input union must be the target member, its unmapped
    # (source) form, or a list of either.
    acceptable_types = {
        target_arg,
        list[target_arg],  # type: ignore
        List[target_arg],  # type: ignore
        simplified_type,
        list[simplified_type],  # type: ignore
        List[simplified_type],  # type: ignore
    }
    target_format = (
        target_arg,
        list[target_arg],  # type: ignore
        List[target_arg],  # type: ignore
    )
    in_target_format = 0
    in_simple_format = 0
    for input_arg in input_args:
        if input_arg not in acceptable_types:
            # Not supported
            return None
        if input_arg in target_format:
            in_target_format += 1
        else:
            in_simple_format += 1

    assert in_simple_format or in_target_format

    if in_target_format and not in_simple_format:
        # Union[X, List[X]] -> List[X]
        return normalize_into_list

    # From here on each element must be converted, which requires that the
    # target member itself is a registered type. If it is not (e.g. it is a
    # container of a registered type), report "not supported" so the caller
    # can raise its descriptive error instead of leaking a bare KeyError.
    mapped = self._registered_types.get(target_arg)
    if mapped is None:
        return None

    if not in_target_format and in_simple_format:
        # Union[X, List[X]] -> List[Y]

        def _mapper_x_list_y(
            x: Any | list[Any],
            ap: AttributePath,
            pc: Optional["ParserContextData"],
        ) -> list[Any]:
            in_list_form: list[Any] = normalize_into_list(x, ap, pc)

            return [mapped.mapper(x, ap, pc) for x in in_list_form]

        return _mapper_x_list_y

    # Union[Y, List[X]] -> List[Y]
    # The mixed mapper below uses isinstance() to skip already-converted
    # elements, which only works when the target member is a real class.
    if not isinstance(target_arg, type):
        raise ValueError(
            f"Cannot narrow {input_type} -> {target_type}: The automatic conversion does"
            f" not support mixed types. Please use either {simplified_type} or {target_arg}"
            f" in the source content (but not a mix of both)"
        )

    def _mapper_mixed_list_y(
        x: Any | list[Any],
        ap: AttributePath,
        pc: Optional["ParserContextData"],
    ) -> list[Any]:
        in_list_form: list[Any] = normalize_into_list(x, ap, pc)

        return [
            x if isinstance(x, target_arg) else mapped.mapper(x, ap, pc)
            for x in in_list_form
        ]

    return _mapper_mixed_list_y

1427 

1428 def _type_normalize( 

1429 self, 

1430 attribute: str, 

1431 input_type: Any, 

1432 target_type: Any, 

1433 parsing_typed_dict_attribute: bool, 

1434 ) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any] | None: 

1435 if input_type == target_type: 

1436 return None 

1437 _, input_orig, input_args = unpack_type( 

1438 input_type, parsing_typed_dict_attribute 

1439 ) 

1440 _, target_orig, target_args = unpack_type( 

1441 target_type, 

1442 parsing_typed_dict_attribute, 

1443 ) 

1444 if input_orig in (Union, UnionType): 

1445 result = self._union_narrowing( 

1446 input_type, target_type, parsing_typed_dict_attribute 

1447 ) 

1448 if result: 

1449 return result 

1450 elif target_orig == list and target_args[0] == input_type: 

1451 return wrap_into_list 

1452 

1453 mapped = self._registered_types.get(target_type) 

1454 if mapped is not None and input_type == mapped.source_type: 

1455 # Source -> Target 

1456 return mapped.mapper 

1457 if target_orig == list and target_args: 1457 ↛ 1475line 1457 didn't jump to line 1475 because the condition on line 1457 was always true

1458 mapped = self._registered_types.get(target_args[0]) 

1459 if mapped is not None: 1459 ↛ 1475line 1459 didn't jump to line 1475 because the condition on line 1459 was always true

1460 # mypy is dense and forgot `mapped` cannot be optional in the comprehensions. 

1461 mapped_type: TypeMapping = mapped 

1462 if input_type == mapped.source_type: 1462 ↛ 1464line 1462 didn't jump to line 1464 because the condition on line 1462 was never true

1463 # Source -> List[Target] 

1464 return lambda x, ap, pc: [mapped_type.mapper(x, ap, pc)] 

1465 if ( 1465 ↛ 1475line 1465 didn't jump to line 1475 because the condition on line 1465 was always true

1466 input_orig == list 

1467 and input_args 

1468 and input_args[0] == mapped_type.source_type 

1469 ): 

1470 # List[Source] -> List[Target] 

1471 return lambda xs, ap, pc: [ 

1472 mapped_type.mapper(x, ap, pc) for x in xs 

1473 ] 

1474 

1475 raise ValueError( 

1476 f'Unsupported type normalization for "{attribute}": Cannot automatically map/narrow' 

1477 f" {input_type} to {target_type}" 

1478 ) 

1479 

1480 def _strip_mapped_types( 

1481 self, orig_td: Any, parsing_typed_dict_attribute: bool 

1482 ) -> Any: 

1483 m = self._registered_types.get(orig_td) 

1484 if m is not None: 

1485 return m.source_type 

1486 _, v, args = unpack_type(orig_td, parsing_typed_dict_attribute) 

1487 if v == list: 

1488 arg = args[0] 

1489 m = self._registered_types.get(arg) 

1490 if m: 

1491 return list[m.source_type] # type: ignore 

1492 if v in (Union, UnionType): 

1493 stripped_args = tuple( 

1494 self._strip_mapped_types(x, parsing_typed_dict_attribute) for x in args 

1495 ) 

1496 if stripped_args != args: 

1497 return Union[stripped_args] 

1498 return orig_td 

1499 

1500 

1501def _sort_key(attr: StandardParserAttributeDocumentation) -> Any: 

1502 key = next(iter(attr.attributes)) 

1503 return attr.sort_category, key 

1504 

1505 

1506def _apply_std_docs( 

1507 std_doc_table: ( 

1508 Mapping[type[Any], Sequence[StandardParserAttributeDocumentation]] | None 

1509 ), 

1510 source_format_typed_dict: type[Any], 

1511 attribute_docs: Sequence[ParserAttributeDocumentation] | None, 

1512) -> Sequence[ParserAttributeDocumentation] | None: 

1513 if std_doc_table is None or not std_doc_table: 1513 ↛ 1516line 1513 didn't jump to line 1516 because the condition on line 1513 was always true

1514 return attribute_docs 

1515 

1516 has_docs_for = set() 

1517 if attribute_docs: 

1518 for attribute_doc in attribute_docs: 

1519 has_docs_for.update(attribute_doc.attributes) 

1520 

1521 base_seen = set() 

1522 std_docs_used = [] 

1523 

1524 remaining_bases = set(getattr(source_format_typed_dict, "__orig_bases__", [])) 

1525 base_seen.update(remaining_bases) 

1526 while remaining_bases: 

1527 base = remaining_bases.pop() 

1528 new_bases_to_check = { 

1529 x for x in getattr(base, "__orig_bases__", []) if x not in base_seen 

1530 } 

1531 remaining_bases.update(new_bases_to_check) 

1532 base_seen.update(new_bases_to_check) 

1533 std_docs = std_doc_table.get(base) 

1534 if std_docs: 

1535 for std_doc in std_docs: 

1536 if any(a in has_docs_for for a in std_doc.attributes): 

1537 # If there is any overlap, do not add the docs 

1538 continue 

1539 has_docs_for.update(std_doc.attributes) 

1540 std_docs_used.append(std_doc) 

1541 

1542 if not std_docs_used: 

1543 return attribute_docs 

1544 docs = sorted(std_docs_used, key=_sort_key) 

1545 if attribute_docs: 

1546 # Plugin provided attributes first 

1547 c = list(attribute_docs) 

1548 c.extend(docs) 

1549 docs = c 

1550 return tuple(docs) 

1551 

1552 

def _verify_and_auto_correct_inline_reference_documentation(
    parsed_content: type[TD],
    source_typed_dict: type[Any],
    source_content_attributes: Mapping[str, AttributeDescription],
    inline_reference_documentation: ParserDocumentation | None,
    has_alt_form: bool,
    automatic_docs: (
        Mapping[type[Any], Sequence[StandardParserAttributeDocumentation]] | None
    ) = None,
) -> ParserDocumentation | None:
    """Validate the inline reference documentation and fill in what can be derived.

    Standard docs from `automatic_docs` are merged in via `_apply_std_docs`.
    Every documented attribute must exist in `source_content_attributes` and be
    documented at most once. Attributes left undocumented are an error when any
    non-standard (custom) doc entry is present; otherwise they are added as
    explicit "undocumented" entries.

    :param parsed_content: The target type; only its `__qualname__` is used, in
      error messages.
    :param source_typed_dict: The source format type, passed to `_apply_std_docs`.
    :param source_content_attributes: Known source attributes by name.
    :param inline_reference_documentation: The provided documentation, if any.
    :param has_alt_form: Whether the source format has a non-mapping form.
    :param automatic_docs: Optional table of standard docs keyed by base type.
    :return: The provided documentation, a copy with an updated `attribute_doc`,
      or `None` when there is no documentation at all.
    :raises ValueError: On unknown, duplicated or missing attribute docs, or when
      an alt-form description is given for a format without an alt form.
    """
    orig_attribute_docs = (
        inline_reference_documentation.attribute_doc
        if inline_reference_documentation
        else None
    )
    attribute_docs = _apply_std_docs(
        automatic_docs,
        source_typed_dict,
        orig_attribute_docs,
    )
    if inline_reference_documentation is None and attribute_docs is None:
        return None
    changes = {}
    if attribute_docs:
        seen = set()
        had_any_custom_docs = False
        for attr_doc in attribute_docs:
            if not isinstance(attr_doc, StandardParserAttributeDocumentation):
                had_any_custom_docs = True
            for attr_name in attr_doc.attributes:
                attr = source_content_attributes.get(attr_name)
                if attr is None:
                    raise ValueError(
                        f"The inline_reference_documentation for the source format of {parsed_content.__qualname__}"
                        f' references an attribute "{attr_name}", which does not exist in the source format.'
                    )
                if attr_name in seen:
                    raise ValueError(
                        f"The inline_reference_documentation for the source format of {parsed_content.__qualname__}"
                        f' has documentation for "{attr_name}" twice, which is not supported.'
                        f" Please document it at most once"
                    )
                seen.add(attr_name)
        # Sorted once so both the error message and the generated entries list
        # the attributes in a stable order.
        undocumented = sorted(source_content_attributes.keys() - seen)
        if undocumented:
            if had_any_custom_docs:
                undocumented_attrs = ", ".join(undocumented)
                raise ValueError(
                    f"The following attributes were not documented for the source format of"
                    f" {parsed_content.__qualname__}. If this is deliberate, then please"
                    ' declare each them as undocumented (via undocumented_attr("foo")):'
                    f" {undocumented_attrs}"
                )
            # Only standard docs were present: mark the rest as undocumented.
            combined_docs = list(attribute_docs)
            combined_docs.extend(undocumented_attr(a) for a in undocumented)
            attribute_docs = combined_docs

    if attribute_docs and orig_attribute_docs != attribute_docs:
        assert attribute_docs is not None
        changes["attribute_doc"] = tuple(attribute_docs)

    if (
        inline_reference_documentation is not None
        and inline_reference_documentation.alt_parser_description
        and not has_alt_form
    ):
        raise ValueError(
            "The inline_reference_documentation had documentation for an non-mapping format,"
            " but the source format does not have a non-mapping format."
        )
    if changes:
        # Docs were derived without any provided documentation: start from an
        # empty documentation object and apply the changes to it.
        if inline_reference_documentation is None:
            inline_reference_documentation = reference_documentation()
        return inline_reference_documentation.replace(**changes)
    return inline_reference_documentation

1628 

1629 

def _check_conflicts(
    input_content_attributes: dict[str, AttributeDescription],
    required_attributes: frozenset[str],
    all_attributes: frozenset[str],
) -> None:
    """Reject conflict declarations that can never be satisfied.

    For every attribute in ``input_content_attributes`` the following is
    verified, in this order:

    1. A required attribute must not declare any conflicts.
    2. An attribute must not conflict with a required attribute.
    3. Every declared conflict must name an attribute in ``all_attributes``.

    The attribute names embedded in the error messages are sorted, so the
    diagnostics are reproducible between runs (set iteration order depends on
    string hash randomisation).

    :param input_content_attributes: Attribute descriptions keyed by name;
      each description must expose a set-like ``conflicting_attributes``.
    :param required_attributes: Names of the attributes that are required.
    :param all_attributes: Names of every known attribute.
    :raises ValueError: On the first violation found.
    """
    for attr_name, attr in input_content_attributes.items():
        conflicts = attr.conflicting_attributes
        if attr_name in required_attributes and conflicts:
            c = ", ".join(repr(a) for a in sorted(conflicts))
            raise ValueError(
                f'The attribute "{attr_name}" is required and conflicts with the attributes: {c}.'
                " This makes it impossible to use these attributes. Either remove the attributes"
                f' (along with the conflicts for them), adjust the conflicts or make "{attr_name}"'
                " optional (NotRequired)"
            )

        required_conflicts = conflicts & required_attributes
        if required_conflicts:
            c = ", ".join(repr(a) for a in sorted(required_conflicts))
            raise ValueError(
                f'The attribute "{attr_name}" conflicts with the following *required* attributes: {c}.'
                f' This makes it impossible to use the "{attr_name}" attribute. Either remove it,'
                f" adjust the conflicts or make the listed attributes optional (NotRequired)"
            )

        unknown_attributes = conflicts - all_attributes
        if unknown_attributes:
            c = ", ".join(repr(a) for a in sorted(unknown_attributes))
            raise ValueError(
                f'The attribute "{attr_name}" declares a conflict with the following unknown attributes: {c}.'
                f" None of these attributes were declared in the input."
            )

1660 

1661 

def _check_attributes(
    content: type[TypedDict],
    input_content: type[TypedDict],
    input_content_attributes: dict[str, AttributeDescription],
    sources: Mapping[str, Collection[str]],
) -> None:
    """Verify that required-ness of source and target attributes line up.

    :param content: The target TypedDict (its ``__required_keys__`` is read).
    :param input_content: The source TypedDict (its ``__required_keys__`` and
      ``__optional_keys__`` are read).
    :param input_content_attributes: Attribute descriptions keyed by source
      attribute name; ``target_attribute`` is read from each.
    :param sources: For each target attribute, the source attributes that map
      to it.
    :raises ValueError: If a required source attribute shares its target with
      other sources, or if an optional source attribute is the only source of
      a required target attribute.
    """
    required_targets = content.__required_keys__
    required_inputs = input_content.__required_keys__

    for source_key in required_inputs | input_content.__optional_keys__:
        target_key = input_content_attributes[source_key].target_attribute
        providers = sources[target_key]

        # Every target reached from a source attribute must list at least that source.
        assert providers
        provider_count = len(providers)

        if source_key in required_inputs:
            if provider_count > 1:
                raise ValueError(
                    f'The source attribute "{source_key}" is required, but it maps to "{target_key}",'
                    f' which has multiple sources "{providers}". If "{source_key}" should be required,'
                    f' then there is no need for additional sources for "{target_key}". Alternatively,'
                    f' "{source_key}" might be missing a NotRequired type'
                    f' (example: "{source_key}: NotRequired[<OriginalTypeHere>]")'
                )
            continue

        if target_key in required_targets and provider_count == 1:
            raise ValueError(
                f'The source attribute "{source_key}" is not marked as required and maps to'
                f' "{target_key}", which is marked as required. As there are no other attributes'
                f' mapping to "{target_key}", then "{source_key}" must be required as well'
                f' ("{source_key}: Required[<Type>]"). Alternatively, "{target_key}" should be optional'
                f' ("{target_key}: NotRequired[<Type>]") or an "MappingHint.aliasOf" might be missing.'
            )

1697 

1698 

def _validation_type_error(path: AttributePath, message: str) -> None:
    """Always raise a ``ManifestParseException`` for a badly typed attribute.

    :param path: Location of the offending attribute; its ``path`` is quoted
      in the error.
    :param message: Explanation appended to the error text.
    :raises ManifestParseException: Unconditionally.
    """
    attribute_path = path.path
    raise ManifestParseException(
        f'The attribute "{attribute_path}" did not have a valid structure/type: {message}'
    )

1703 

1704 

1705def _is_two_arg_x_list_x(t_args: tuple[Any, ...]) -> bool: 

1706 if len(t_args) != 2: 

1707 return False 

1708 lhs, rhs = t_args 

1709 if get_origin(lhs) == list: 

1710 if get_origin(rhs) == list: 1710 ↛ 1713line 1710 didn't jump to line 1713 because the condition on line 1710 was never true

1711 # It could still match X, List[X] - but we do not allow this case for now as the caller 

1712 # does not support it. 

1713 return False 

1714 l_args = get_args(lhs) 

1715 return bool(l_args and l_args[0] == rhs) 

1716 if get_origin(rhs) == list: 

1717 r_args = get_args(rhs) 

1718 return bool(r_args and r_args[0] == lhs) 

1719 return False 

1720 

1721 

def _extract_typed_dict(
    base_type,
    default_target_attribute: str | None,
) -> tuple[type[TypedDict] | None, Any]:
    """Split a source format into its TypedDict part and its alternative part.

    :param base_type: The source format; a TypedDict, a Union or another type.
    :param default_target_attribute: Target attribute to fall back on when the
      alternative form carries no ``TargetAttribute`` annotation.
    :return: ``(base_type, None)`` when ``base_type`` is a TypedDict,
      ``(None, base_type)`` when it is not a Union, and otherwise the single
      TypedDict member of the Union (or ``None``) paired with the type of the
      remaining members.
    :raises ValueError: If the format is or contains a plain mapping, contains
      more than one TypedDict, contains ``None``, or offers no way of
      determining the target attribute of the alternative form.
    """
    if is_typeddict(base_type):
        return base_type, None

    _, origin, members = unpack_type(base_type, False)
    if origin != Union:
        is_plain_mapping = isinstance(base_type, type) and issubclass(
            base_type, (dict, Mapping)
        )
        if is_plain_mapping:
            raise ValueError(
                "The source_format cannot be nor contain a (non-TypedDict) dict"
            )
        return None, base_type

    typed_dict_members = [m for m in members if is_typeddict(m)]
    if len(typed_dict_members) > 1:
        raise ValueError(
            "When source_format is a Union, it must contain at most one TypedDict"
        )
    typed_dict = typed_dict_members[0] if typed_dict_members else None

    for member in members:
        if member is None or member is _NONE_TYPE:
            raise ValueError(
                "The source_format cannot be nor contain Optional[X] or Union[X, None]"
            )

    alternatives = [m for m in members if m is not typed_dict]
    for member in alternatives:
        if isinstance(member, type) and issubclass(member, (dict, Mapping)):
            raise ValueError(
                "The source_format cannot be nor contain a (non-TypedDict) dict"
            )

    anno = None
    has_target_attribute = False
    if len(alternatives) != 1:
        target_type = Union[tuple(alternatives)]
    else:
        # A lone alternative may be Annotated; peel the annotations off so the
        # TargetAttribute hint (if any) can be detected.
        target_type, anno, _ = _parse_type(
            "source_format alternative form",
            alternatives[0],
            forbid_optional=True,
            parsing_typed_dict_attribute=False,
        )
        has_target_attribute = bool(anno) and any(
            isinstance(hint, TargetAttribute) for hint in anno
        )

    if default_target_attribute is None and not has_target_attribute:
        raise ValueError(
            'The alternative format must be Union[TypedDict,Annotated[X, DebputyParseHint.target_attribute("...")]]'
            " OR the parsed_content format must have exactly one attribute that is required."
        )

    if not anno:
        return typed_dict, target_type
    # Re-attach the annotations that were stripped while parsing.
    return typed_dict, Annotated[(target_type, *anno)]

1782 

1783 

def _dispatch_parse_generator(
    dispatch_type: type[DebputyDispatchableType],
) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]:
    """Build a mapper that parses a value via the dispatch table of ``dispatch_type``.

    :param dispatch_type: The dispatchable type whose parser table is looked
      up on the parser context at parse time.
    :return: A callable taking ``(value, attribute_path, parser_context)`` and
      returning whatever the dispatching parser's ``parse_input`` returns.
    """

    def _dispatch_parse(
        value: Any,
        attribute_path: AttributePath,
        parser_context: Optional["ParserContextData"],
    ):
        # The table lookup goes through the context, so it cannot be omitted.
        assert parser_context is not None
        return parser_context.dispatch_parser_table_for(dispatch_type).parse_input(
            value,
            attribute_path,
            parser_context=parser_context,
        )

    return _dispatch_parse

1799 

1800 

def _dispatch_parser(
    dispatch_type: type[DebputyDispatchableType],
) -> AttributeTypeHandler:
    """Create the ``AttributeTypeHandler`` for a dispatchable type.

    The handler is described by the type's ``__name__`` and maps values with
    the dispatching mapper for ``dispatch_type``.

    :param dispatch_type: The dispatchable type to create a handler for.
    :return: The new handler.
    """
    description = dispatch_type.__name__
    value_mapper = _dispatch_parse_generator(dispatch_type)
    # The second argument is a no-op callable - presumably the up-front check,
    # left empty because the dispatching parser validates; confirm against
    # AttributeTypeHandler.
    return AttributeTypeHandler(description, lambda *a: None, mapper=value_mapper)

1809 

1810 

def _parse_type(
    attribute: str,
    orig_td: Any,
    forbid_optional: bool = True,
    parsing_typed_dict_attribute: bool = True,
) -> tuple[Any, tuple[Any, ...], bool]:
    """Unpack a type hint and separate it from its ``Annotated`` metadata.

    :param attribute: Name used for the attribute in error messages.
    :param orig_td: The type hint to analyse.
    :param forbid_optional: When true, a Union containing ``None`` is rejected.
    :param parsing_typed_dict_attribute: Passed through to ``unpack_type``.
    :return: ``(type, metadata, optional)`` where ``metadata`` holds the extra
      arguments of an ``Annotated`` hint (empty otherwise) and ``optional`` is
      always ``False``.
    :raises ValueError: If the hint resolves to ``None`` or uses a forbidden
      Optional.
    """
    resolved, origin, type_args = unpack_type(orig_td, parsing_typed_dict_attribute)
    metadata: tuple[Any, ...] = ()

    # Hints without an origin need neither unwrapping nor validation.
    if origin is None:
        return resolved, metadata, False

    if origin == Annotated:
        annotated_args = get_args(resolved)
        metadata = annotated_args[1:]
        resolved, origin, type_args = unpack_type(
            annotated_args[0], parsing_typed_dict_attribute
        )

    if resolved is _NONE_TYPE:
        raise ValueError(
            f'The attribute "{attribute}" resolved to type "None". "Nil" / "None" fields are not allowed in the'
            " debputy manifest, so this attribute does not make sense in its current form."
        )

    uses_optional = (
        forbid_optional
        and origin == Union
        and any(arg is _NONE_TYPE for arg in type_args)
    )
    if uses_optional:
        raise ValueError(
            f'Detected use of Optional in "{attribute}", which is not allowed here.'
            " Please use NotRequired for optional fields"
        )

    return resolved, metadata, False

1838 

1839 

1840def _normalize_attribute_name(attribute: str) -> str: 

1841 if attribute.endswith("_"): 

1842 attribute = attribute[:-1] 

1843 return attribute.replace("_", "-") 

1844 

1845 

@dataclasses.dataclass
class DetectedDebputyParseHint:
    """The parse hints detected among the annotations of a single attribute."""

    # Target attribute: taken from the TargetAttribute annotation, else from
    # default_target_attribute, else from default_attribute_name.
    target_attribute: str
    # Source attribute name: taken from the ManifestAttribute annotation, else
    # the normalized default_attribute_name, else None.
    source_manifest_attribute: str | None
    # Attributes named by a ConflictWithSourceAttribute annotation (empty when
    # there is no such annotation).
    conflict_with_source_attributes: frozenset[str]
    # The ConditionalRequired annotation, if one was present.
    conditional_required: ConditionalRequired | None
    # False only when a NotPathHint annotation was present.
    applicable_as_path_hint: bool

    @classmethod
    def parse_annotations(
        cls,
        anno: tuple[Any, ...],
        error_context: str,
        default_attribute_name: str | None,
        is_required: bool,
        default_target_attribute: str | None = None,
        allow_target_attribute_annotation: bool = False,
        allow_source_attribute_annotations: bool = False,
    ) -> "DetectedDebputyParseHint":
        """Extract the debputy parse hints from ``anno``.

        :param anno: The annotations to search.
        :param error_context: Text appended to error messages.
        :param default_attribute_name: Attribute name used when no annotation
          or ``default_target_attribute`` decides the name.
        :param is_required: Whether the attribute is required.
        :param default_target_attribute: Target attribute used when there is
          no ``TargetAttribute`` annotation.
        :param allow_target_attribute_annotation: Whether a
          ``TargetAttribute`` annotation may be used.
        :param allow_source_attribute_annotations: Whether
          ``ManifestAttribute`` / ``ConflictWithSourceAttribute`` annotations
          may be used.
        :return: The detected hints.
        :raises ValueError: If an annotation is used where it is not allowed,
          if the target attribute cannot be determined, or if the attribute
          is both required and conditionally required.
        """
        target_attr_anno = find_annotation(anno, TargetAttribute)
        if target_attr_anno:
            if not allow_target_attribute_annotation:
                raise ValueError(
                    f"The DebputyParseHint.target_attribute annotation is not allowed in this context.{error_context}"
                )
            target_attribute = target_attr_anno.attribute
        elif default_target_attribute is not None:
            target_attribute = default_target_attribute
        elif default_attribute_name is not None:
            target_attribute = default_attribute_name
        else:
            # Neither default is available here, so only an annotation could
            # have provided the name. If annotations are not even permitted,
            # the caller passed an unusable combination of arguments;
            # otherwise the annotation is simply missing from the input.
            if not allow_target_attribute_annotation:
                raise ValueError(
                    "allow_target_attribute_annotation must be True OR "
                    "default_attribute_name/default_target_attribute must be not None"
                )
            raise ValueError(
                f"Missing DebputyParseHint.target_attribute annotation.{error_context}"
            )

        source_attribute_anno = find_annotation(anno, ManifestAttribute)
        _source_attribute_allowed(
            allow_source_attribute_annotations, error_context, source_attribute_anno
        )
        if source_attribute_anno:
            source_attribute_name = source_attribute_anno.attribute
        elif default_attribute_name is not None:
            source_attribute_name = _normalize_attribute_name(default_attribute_name)
        else:
            source_attribute_name = None

        mutual_exclusive_with_anno = find_annotation(anno, ConflictWithSourceAttribute)
        if mutual_exclusive_with_anno:
            _source_attribute_allowed(
                allow_source_attribute_annotations,
                error_context,
                mutual_exclusive_with_anno,
            )
            conflicting_attributes = mutual_exclusive_with_anno.conflicting_attributes
        else:
            conflicting_attributes = frozenset()

        conditional_required = find_annotation(anno, ConditionalRequired)
        if conditional_required and is_required:
            if default_attribute_name is None:
                raise ValueError(
                    "is_required cannot be True without default_attribute_name being not None"
                )
            raise ValueError(
                f'The attribute "{default_attribute_name}" is Required while also being conditionally required.'
                ' Please make the attribute "NotRequired" or remove the conditional requirement.'
            )

        not_path_hint_anno = find_annotation(anno, NotPathHint)
        applicable_as_path_hint = not_path_hint_anno is None

        return DetectedDebputyParseHint(
            target_attribute=target_attribute,
            source_manifest_attribute=source_attribute_name,
            conflict_with_source_attributes=conflicting_attributes,
            conditional_required=conditional_required,
            applicable_as_path_hint=applicable_as_path_hint,
        )

1927 

1928 

def _source_attribute_allowed(
    source_attribute_allowed: bool,
    error_context: str,
    annotation: DebputyParseHint | None,
) -> None:
    """Reject a source-attribute annotation used where it is not permitted.

    :param source_attribute_allowed: Whether such annotations are allowed.
    :param error_context: Text appended to the error message.
    :param annotation: The annotation that was found, or ``None`` if absent.
    :raises ValueError: If ``annotation`` is present but not allowed.
    """
    used_where_forbidden = not source_attribute_allowed and annotation is not None
    if used_where_forbidden:
        raise ValueError(
            f'The annotation "{annotation}" cannot be used here. {error_context}'
        )