Coverage for src/debputy/manifest_parser/declarative_parser.py: 72%

800 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-10-12 15:06 +0000

1import collections 

2import dataclasses 

3import typing 

4from types import UnionType 

5from typing import ( 

6 Any, 

7 Tuple, 

8 TypedDict, 

9 Dict, 

10 get_type_hints, 

11 Annotated, 

12 get_args, 

13 get_origin, 

14 TypeVar, 

15 Generic, 

16 FrozenSet, 

17 Optional, 

18 cast, 

19 Type, 

20 Union, 

21 List, 

22 NotRequired, 

23 Literal, 

24 TYPE_CHECKING, 

25) 

26from collections.abc import Callable, Mapping, Collection, Iterable, Sequence, Container 

27 

28 

29from debputy.manifest_parser.base_types import FileSystemMatchRule 

30from debputy.manifest_parser.exceptions import ( 

31 ManifestParseException, 

32) 

33from debputy.manifest_parser.mapper_code import ( 

34 normalize_into_list, 

35 wrap_into_list, 

36 map_each_element, 

37) 

38from debputy.manifest_parser.parse_hints import ( 

39 ConditionalRequired, 

40 DebputyParseHint, 

41 TargetAttribute, 

42 ManifestAttribute, 

43 ConflictWithSourceAttribute, 

44 NotPathHint, 

45) 

46from debputy.manifest_parser.parser_data import ParserContextData 

47from debputy.manifest_parser.tagging_types import ( 

48 DebputyParsedContent, 

49 DebputyDispatchableType, 

50 TypeMapping, 

51) 

52from debputy.manifest_parser.util import ( 

53 AttributePath, 

54 unpack_type, 

55 find_annotation, 

56 check_integration_mode, 

57) 

58from debputy.plugin.api.impl_types import ( 

59 DeclarativeInputParser, 

60 TD, 

61 ListWrappedDeclarativeInputParser, 

62 DispatchingObjectParser, 

63 DispatchingTableParser, 

64 TTP, 

65 TP, 

66 InPackageContextParser, 

67) 

68from debputy.plugin.api.spec import ( 

69 ParserDocumentation, 

70 DebputyIntegrationMode, 

71 StandardParserAttributeDocumentation, 

72 undocumented_attr, 

73 ParserAttributeDocumentation, 

74 reference_documentation, 

75) 

76from debputy.util import _info, _warn, assume_not_none 

77 

78 

79if TYPE_CHECKING: 

80 from debputy.lsp.diagnostics import LintSeverity 

81 

82 

try:
    from Levenshtein import distance
except ImportError:
    # Without the Levenshtein module there is no typo detection; the flag
    # ensures the installation hint is logged a single time only.
    _WARN_ONCE = False

    def _detect_possible_typo(
        _key: str,
        _value: object,
        _manifest_attributes: Mapping[str, "AttributeDescription"],
        _path: "AttributePath",
    ) -> None:
        """Fallback used when Levenshtein is unavailable.

        Ignores all arguments and logs a one-time hint on how to enable typo
        detection.
        """
        global _WARN_ONCE
        if _WARN_ONCE:
            return
        _WARN_ONCE = True
        _info(
            "Install python3-levenshtein to have debputy try to detect typos in the manifest."
        )

else:

    def _detect_possible_typo(
        key: str,
        value: object,
        manifest_attributes: Mapping[str, "AttributeDescription"],
        path: "AttributePath",
    ) -> None:
        """Warn when an unknown key looks like a misspelling of a known attribute.

        A known attribute is a candidate when its name differs from `key` by at
        most 2 in length and by at most 2 in edit distance.  Candidates are
        ranked by how well `value` fits the attribute:

          * 2 - the value passes the attribute's type validation
          * 1 - validation failed, but the base type matches
          * 0 - neither

        Only the candidates sharing the best rank are reported (via `_warn`).

        :param key: The unknown key found in the manifest.
        :param value: The value provided for that key.
        :param manifest_attributes: The known attributes, indexed by name.
        :param path: The attribute path of the mapping containing `key`.
        """
        key_length = len(key)
        key_path = path[key]
        candidates: list[str] = []
        best_strength = 0
        for candidate, attr in manifest_attributes.items():
            # Cheap length filter first; the edit distance is only computed
            # for names of comparable length.
            if abs(key_length - len(candidate)) > 2:
                continue
            if distance(key, candidate) > 2:
                continue
            validator = attr.type_validator
            try:
                validator.ensure_type(value, key_path)
            except ManifestParseException:
                strength = 1 if validator.base_type_match(value) else 0
            else:
                strength = 2

            if strength > best_strength:
                # A strictly better fit invalidates everything seen so far.
                best_strength = strength
                candidates = [candidate]
            elif strength == best_strength:
                candidates.append(candidate)

        if not candidates:
            return
        ref = f'at "{path.path}"' if path else "at the manifest root level"
        if len(candidates) == 1:
            possible_match = repr(candidates[0])
            _warn(
                f'Possible typo: The key "{key}" {ref} should probably have been {possible_match}'
            )
            return
        possible_matches = ", ".join(repr(a) for a in sorted(candidates))
        _warn(
            f'Possible typo: The key "{key}" {ref} should probably have been one of {possible_matches}'
        )

150 

151 

# Module-level type variables.
SF = TypeVar("SF")
T = TypeVar("T")
S = TypeVar("S")

# The type of `None`, kept as a named constant.
_NONE_TYPE = type(None)

# These must be able to appear in an "isinstance" check and must be builtin types.
# Each type maps to its human-readable name.
BASIC_SIMPLE_TYPES: dict[type, str] = {
    str: "string",
    int: "integer",
    bool: "boolean",
}

166 

167 

class AttributeTypeHandler:
    """Couples a type description with a validator and an optional value mapper.

    The handler knows how to describe the accepted type, how to validate a raw
    value against it, and (when a mapper is set) how to transform a validated
    value into its final form.
    """

    __slots__ = ("_description", "_ensure_type", "base_type", "mapper")

    def __init__(
        self,
        description: str,
        ensure_type: Callable[[Any, AttributePath], None],
        *,
        base_type: type[Any] | None = None,
        mapper: None | (
            Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]
        ) = None,
    ) -> None:
        """Store the description, validator, base type and mapper.

        :param description: Human-readable description of the accepted type.
        :param ensure_type: Callback validating a value at a given path.
        :param base_type: Optional type used by `base_type_match`.
        :param mapper: Optional callback transforming a value (see `map_type`).
        """
        self._description = description
        self._ensure_type = ensure_type
        self.base_type = base_type
        self.mapper = mapper

    def describe_type(self) -> str:
        """Return the human-readable description of the accepted type."""
        return self._description

    def ensure_type(self, obj: object, path: AttributePath) -> None:
        """Delegate validation of `obj` at `path` to the validation callback."""
        self._ensure_type(obj, path)

    def base_type_match(self, obj: object) -> bool:
        """Return True when a base type is set and `obj` is an instance of it."""
        expected = self.base_type
        if expected is None:
            return False
        return isinstance(obj, expected)

    def map_type(
        self,
        value: Any,
        path: AttributePath,
        parser_context: Optional["ParserContextData"],
    ) -> Any:
        """Return `value` run through the mapper, or unchanged without a mapper."""
        transform = self.mapper
        if transform is None:
            return value
        return transform(value, path, parser_context)

    def combine_mapper(
        self,
        mapper: None | (
            Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]
        ),
    ) -> "AttributeTypeHandler":
        """Return a handler that additionally applies `mapper`.

        When `mapper` is None, this handler is returned as-is.  Otherwise a new
        handler is created with the same description, validator and base type;
        its mapper applies the existing mapper (if any) first and `mapper` on
        the result.
        """
        if mapper is None:
            return self
        inner = self.mapper
        if inner is None:
            _combined_mapper = mapper
        else:

            def _combined_mapper(
                value: Any,
                path: AttributePath,
                parser_context: Optional["ParserContextData"],
            ) -> Any:
                mapped = inner(value, path, parser_context)
                return mapper(mapped, path, parser_context)

        return AttributeTypeHandler(
            self._description,
            self._ensure_type,
            base_type=self.base_type,
            mapper=_combined_mapper,
        )

234 

235 

@dataclasses.dataclass(slots=True)
class AttributeDescription:
    """Description of one attribute as used by the declarative parsers."""

    # Name of the attribute on the source side; used as the key when
    # computing per-attribute conflicts.
    source_attribute_name: str
    # Key under which the (mapped) value is stored in the parsed result.
    target_attribute: str
    # The declared type of the attribute.
    attribute_type: Any
    # Validates raw values and maps them to their final form.
    type_validator: AttributeTypeHandler
    # NOTE(review): presumably the `Annotated[...]` metadata found on the
    # attribute - confirm against the code creating these descriptions.
    annotations: tuple[Any, ...]
    # Names of attributes that cannot be used together with this one.
    conflicting_attributes: frozenset[str]
    # When set, the attribute becomes required if the condition applies.
    conditional_required: Optional["ConditionalRequired"]
    # Parse hints detected for the attribute, if any.
    parse_hints: Optional["DetectedDebputyParseHint"] = None
    # NOTE(review): presumably whether an explicit `None` is accepted - confirm.
    is_optional: bool = False

247 

248 

def _extract_path_hint(v: Any, attribute_path: AttributePath) -> bool:
    """Record a path hint on `attribute_path` derived from `v`, if possible.

    An existing hint is never overwritten.  Otherwise, the hint is `v` itself
    when it is a string, or the first element of `v` when it is a non-empty
    list whose first element is a string.

    :return: True if `attribute_path` has a path hint after the call.
    """
    if attribute_path.path_hint is not None:
        return True
    if isinstance(v, str):
        hint = v
    elif isinstance(v, list) and len(v) > 0 and isinstance(v[0], str):
        hint = v[0]
    else:
        return False
    attribute_path.path_hint = hint
    return True

259 

260 

@dataclasses.dataclass(slots=True, frozen=True)
class DeclarativeNonMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]):
    """Parser for input that is provided as a single non-mapping value.

    The value is validated and mapped via `alt_form_parser` and stored under
    that description's target attribute in the result.
    """

    alt_form_parser: AttributeDescription
    inline_reference_documentation: ParserDocumentation | None = None
    expected_debputy_integration_mode: Container[DebputyIntegrationMode] | None = None

    def parse_input(
        self,
        value: object,
        path: AttributePath,
        *,
        parser_context: Optional["ParserContextData"] = None,
    ) -> TD:
        """Validate and map `value` into a single-attribute result.

        :param value: The raw value from the manifest.
        :param path: The attribute path of the value; its path hint and alias
          mapping are updated as a side effect.
        :param parser_context: Optional context passed on to the mapper.
        :raises ManifestParseException: If `value` is None.  The type validator
          is expected to raise it as well for values of the wrong type.
        :return: A mapping with the target attribute as its only key.
        """
        check_integration_mode(
            path,
            parser_context,
            self.expected_debputy_integration_mode,
        )
        doc_url = self.reference_documentation_url
        description = self.alt_form_parser
        validator = description.type_validator

        if value is None:
            form_note = f" The value must have type: {validator.describe_type()}"
            if doc_url is None:
                doc_ref = ""
            else:
                doc_ref = f" Please see {doc_url} for the documentation."
            raise ManifestParseException(
                f"The attribute {path.path} was missing a value. {form_note}{doc_ref}"
            )

        _extract_path_hint(value, path)
        validator.ensure_type(value, path)
        target = description.target_attribute
        mapped = validator.map_type(value, path, parser_context)
        # The value has no key of its own in the manifest, hence the empty alias.
        path.alias_mapping = {
            target: ("", None),
        }
        return cast("TD", {target: mapped})

301 

302 

@dataclasses.dataclass(slots=True)
class DeclarativeMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]):
    """Parser for input that is a mapping, optionally with a non-mapping form.

    Mapping input is checked against the known attributes (unknown, missing,
    conditionally required, "at least one of", mutually exclusive and
    conflicting keys) before each value is validated and mapped.  Non-mapping
    input is handled via `alt_form_parser` when one is available.
    """

    # Keys that must always be present in mapping input.
    input_time_required_parameters: frozenset[str]
    # Every key accepted in mapping input.
    all_parameters: frozenset[str]
    # Attribute descriptions indexed by the key used in the input mapping.
    manifest_attributes: Mapping[str, "AttributeDescription"]
    # Attribute descriptions used to resolve `conflicting_attributes` names.
    source_attributes: Mapping[str, "AttributeDescription"]
    # For each key set, at least one of its keys must be present.
    at_least_one_of: frozenset[frozenset[str]]
    # Description of the non-mapping form; None if only mappings are accepted.
    alt_form_parser: AttributeDescription | None
    # For each group, at most one of its keys may be present.
    mutually_exclusive_attributes: frozenset[frozenset[str]] = frozenset()
    # Lazily computed by `_per_attribute_conflicts`.
    _per_attribute_conflicts_cache: Mapping[str, frozenset[str]] | None = None
    inline_reference_documentation: ParserDocumentation | None = None
    # Keys tried (in order) as the source of the path hint.
    path_hint_source_attributes: Sequence[str] = tuple()
    expected_debputy_integration_mode: Container[DebputyIntegrationMode] | None = None

    def _parse_alt_form(
        self,
        value: object,
        path: AttributePath,
        *,
        parser_context: Optional["ParserContextData"] = None,
    ) -> TD:
        """Parse non-mapping input via `alt_form_parser`.

        :raises ManifestParseException: If no non-mapping form is accepted.
        :return: A mapping with the alt form's target attribute as its only key.
        """
        alt_form_parser = self.alt_form_parser
        if alt_form_parser is None:
            raise ManifestParseException(
                f"The attribute {path.path} must be a mapping.{self._doc_url_error_suffix()}"
            )
        _extract_path_hint(value, path)
        alt_form_parser.type_validator.ensure_type(value, path)
        assert (
            value is not None
        ), "The alternative form was None, but the parser should have rejected None earlier."
        attribute = alt_form_parser.target_attribute
        # The value has no key of its own in the manifest, hence the empty alias.
        alias_mapping = {
            attribute: ("", None),
        }
        v = alt_form_parser.type_validator.map_type(value, path, parser_context)
        path.alias_mapping = alias_mapping
        return cast("TD", {attribute: v})

    def _validate_expected_keys(
        self,
        value: dict[Any, Any],
        path: AttributePath,
        *,
        parser_context: Optional["ParserContextData"] = None,
    ) -> None:
        """Check the keys of `value` against the rules of this parser.

        :raises ManifestParseException: On unknown keys, missing required keys,
          missing conditionally required keys, an unsatisfied "at least one of"
          set, or multiple keys from a mutually exclusive group.
        """
        unknown_keys = value.keys() - self.all_parameters
        doc_ref = self._doc_url_error_suffix()
        if unknown_keys:
            # Unknown keys are not guaranteed to be strings, so they are
            # ordered by their repr to get a stable order for the warnings
            # and the error message.
            ordered_unknown = sorted(unknown_keys, key=repr)
            for k in ordered_unknown:
                if isinstance(k, str):
                    _detect_possible_typo(k, value[k], self.manifest_attributes, path)
            unknown = ", ".join(repr(k) for k in ordered_unknown)
            unused_keys = self.all_parameters - value.keys()
            if unused_keys:
                usable = ", ".join(sorted(unused_keys))
                raise ManifestParseException(
                    f"Unknown keys {unknown} at {path.path_container_lc}."
                    f" Keys that could be used here are: {usable}.{doc_ref}"
                )
            raise ManifestParseException(
                f"Unknown keys {unknown} at {path.path_container_lc}."
                f" Please remove them.{doc_ref}"
            )
        missing_keys = self.input_time_required_parameters - value.keys()
        if missing_keys:
            required = ", ".join(repr(k) for k in sorted(missing_keys))
            raise ManifestParseException(
                f"The following keys were required but not present at {path.path_container_lc}: {required}{doc_ref}"
            )
        for maybe_required in self.all_parameters - value.keys():
            attr = self.manifest_attributes[maybe_required]
            # Evaluating the condition requires a parser context.
            assert attr.conditional_required is None or parser_context is not None
            if (
                attr.conditional_required is not None
                and attr.conditional_required.condition_applies(
                    assume_not_none(parser_context)
                )
            ):
                reason = attr.conditional_required.reason
                raise ManifestParseException(
                    f'Missing the *conditionally* required attribute "{maybe_required}" at {path.path_container_lc}. {reason}{doc_ref}'
                )
        for keyset in self.at_least_one_of:
            matched_keys = value.keys() & keyset
            if not matched_keys:
                conditionally_required = ", ".join(repr(k) for k in sorted(keyset))
                raise ManifestParseException(
                    f"At least one of the following keys must be present at {path.path_container_lc}:"
                    f" {conditionally_required}{doc_ref}"
                )
        for group in self.mutually_exclusive_attributes:
            matched = value.keys() & group
            if len(matched) > 1:
                ck = ", ".join(repr(k) for k in sorted(matched))
                raise ManifestParseException(
                    f"Could not parse {path.path_container_lc}: The following attributes are"
                    f" mutually exclusive: {ck}{doc_ref}"
                )

    def _parse_typed_dict_form(
        self,
        value: dict[Any, Any],
        path: AttributePath,
        *,
        parser_context: Optional["ParserContextData"] = None,
    ) -> TD:
        """Parse mapping input into the target form.

        Keys are validated first.  Each value is then type checked, mapped and
        stored under the target attribute of its key.  Keys with a `None`
        value are left out of the result.  Renamed keys are recorded in
        `path.alias_mapping`.

        :raises ManifestParseException: If the keys are invalid or a key is
          used together with an attribute it conflicts with.  The type
          validators are expected to raise it as well for invalid values.
        """
        self._validate_expected_keys(value, path, parser_context=parser_context)
        result = {}
        per_attribute_conflicts = self._per_attribute_conflicts()
        alias_mapping = {}
        # The first attribute that provides a usable path hint wins.
        for hint_key in self.path_hint_source_attributes:
            v = value.get(hint_key)
            if v is not None and _extract_path_hint(v, path):
                break
        for k, v in value.items():
            attr = self.manifest_attributes[k]
            matched = value.keys() & per_attribute_conflicts[k]
            if matched:
                ck = ", ".join(repr(conflict) for conflict in sorted(matched))
                raise ManifestParseException(
                    f'The attribute "{k}" at {path.path} cannot be used with the following'
                    f" attributes: {ck}{self._doc_url_error_suffix()}"
                )
            nk = attr.target_attribute
            key_path = path[k]
            attr.type_validator.ensure_type(v, key_path)
            if v is None:
                continue
            if k != nk:
                alias_mapping[nk] = k, None
            v = attr.type_validator.map_type(v, key_path, parser_context)
            result[nk] = v
        if alias_mapping:
            path.alias_mapping = alias_mapping
        return cast("TD", result)

    def _doc_url_error_suffix(self, *, see_url_version: bool = False) -> str:
        """Return a documentation reference to append to an error message.

        :param see_url_version: Use the "Please see ..." sentence rather than
          the parenthesized form.
        :return: The suffix (with a leading space), or an empty string when
          there is no documentation URL.
        """
        doc_url = self.reference_documentation_url
        if doc_url is not None:
            if see_url_version:
                return f" Please see {doc_url} for the documentation."
            return f" (Documentation: {doc_url})"
        return ""

    def parse_input(
        self,
        value: object,
        path: AttributePath,
        *,
        parser_context: Optional["ParserContextData"] = None,
    ) -> TD:
        """Parse `value`, dispatching on whether it is a mapping.

        :param value: The raw value from the manifest.
        :param path: The attribute path of the value.
        :param parser_context: Optional context passed on to mappers and
          conditional requirement checks.
        :raises ManifestParseException: If `value` is None or fails validation.
        :return: The parsed content in its target form.
        """
        check_integration_mode(
            path,
            parser_context,
            self.expected_debputy_integration_mode,
        )
        if value is None:
            form_note = " The attribute must be a mapping."
            if self.alt_form_parser is not None:
                form_note = (
                    " The attribute can be a mapping or a non-mapping format"
                    ' (usually, "non-mapping format" means a string or a list of strings).'
                )
            doc_ref = self._doc_url_error_suffix(see_url_version=True)
            raise ManifestParseException(
                f"The attribute {path.path} was missing a value. {form_note}{doc_ref}"
            )

        if not isinstance(value, dict):
            return self._parse_alt_form(value, path, parser_context=parser_context)
        return self._parse_typed_dict_form(value, path, parser_context=parser_context)

    def _per_attribute_conflicts(self) -> Mapping[str, frozenset[str]]:
        """Return the conflicting source attribute names per source attribute.

        The mapping is computed on first use and cached on the instance.
        """
        conflicts = self._per_attribute_conflicts_cache
        if conflicts is not None:
            return conflicts
        attrs = self.source_attributes
        conflicts = {
            a.source_attribute_name: frozenset(
                attrs[ca].source_attribute_name for ca in a.conflicting_attributes
            )
            for a in attrs.values()
        }
        self._per_attribute_conflicts_cache = conflicts
        return self._per_attribute_conflicts_cache

486 

487 

def _is_path_attribute_candidate(
    source_attribute: AttributeDescription, target_attribute: AttributeDescription
) -> bool:
    """Decide whether an attribute can serve as the source of a path hint.

    The source attribute must not have parse hints that opt out of being a
    path hint, and the target type (or the element type, when the target is a
    list) must be a subclass of `FileSystemMatchRule`.
    """
    hints = source_attribute.parse_hints
    if hints and not hints.applicable_as_path_hint:
        return False
    candidate_type = target_attribute.attribute_type
    _, origin, args = unpack_type(candidate_type, False)
    if origin == list:
        # For lists, the element type decides.
        candidate_type = args[0]
    if not isinstance(candidate_type, type):
        return False
    return issubclass(candidate_type, FileSystemMatchRule)

502 

503 

# Use the standard check when it recognizes DebputyParsedContent as a
# TypedDict; otherwise, also accept subclasses of DebputyParsedContent.
if typing.is_typeddict(DebputyParsedContent):
    is_typeddict = typing.is_typeddict
else:

    def is_typeddict(t: Any) -> bool:
        """Return True for TypedDicts and for subclasses of DebputyParsedContent."""
        return typing.is_typeddict(t) or (
            isinstance(t, type) and issubclass(t, DebputyParsedContent)
        )

512 

513 

514class ParserGenerator: 

515 def __init__(self) -> None: 

516 self._registered_types: dict[Any, TypeMapping[Any, Any]] = {} 

517 self._object_parsers: dict[str, DispatchingObjectParser] = {} 

518 self._table_parsers: dict[ 

519 type[DebputyDispatchableType], DispatchingTableParser[Any] 

520 ] = {} 

521 self._in_package_context_parser: dict[str, Any] = {} 

522 

523 def register_mapped_type(self, mapped_type: TypeMapping[Any, Any]) -> None: 

524 existing = self._registered_types.get(mapped_type.target_type) 

525 if existing is not None: 525 ↛ 526line 525 didn't jump to line 526 because the condition on line 525 was never true

526 raise ValueError(f"The type {existing} is already registered") 

527 self._registered_types[mapped_type.target_type] = mapped_type 

528 

529 def get_mapped_type_from_target_type( 

530 self, 

531 mapped_type: type[T], 

532 ) -> TypeMapping[Any, T] | None: 

533 return self._registered_types.get(mapped_type) 

534 

535 def discard_mapped_type(self, mapped_type: type[T]) -> None: 

536 del self._registered_types[mapped_type] 

537 

538 def add_table_parser(self, rt: type[DebputyDispatchableType], path: str) -> None: 

539 assert rt not in self._table_parsers 

540 self._table_parsers[rt] = DispatchingTableParser(rt, path) 

541 

542 def add_object_parser( 

543 self, 

544 path: str, 

545 *, 

546 parser_documentation: ParserDocumentation | None = None, 

547 expected_debputy_integration_mode: None | ( 

548 Container[DebputyIntegrationMode] 

549 ) = None, 

550 unknown_keys_diagnostic_severity: Optional["LintSeverity"] = "error", 

551 allow_unknown_keys: bool = False, 

552 ) -> DispatchingObjectParser: 

553 assert path not in self._in_package_context_parser 

554 assert path not in self._object_parsers 

555 object_parser = DispatchingObjectParser( 

556 path, 

557 parser_documentation=parser_documentation, 

558 expected_debputy_integration_mode=expected_debputy_integration_mode, 

559 unknown_keys_diagnostic_severity=unknown_keys_diagnostic_severity, 

560 allow_unknown_keys=allow_unknown_keys, 

561 ) 

562 self._object_parsers[path] = object_parser 

563 return object_parser 

564 

565 def add_in_package_context_parser( 

566 self, 

567 path: str, 

568 delegate: DeclarativeInputParser[Any], 

569 ) -> None: 

570 assert path not in self._in_package_context_parser 

571 assert path not in self._object_parsers 

572 self._in_package_context_parser[path] = InPackageContextParser(path, delegate) 

573 

574 @property 

575 def dispatchable_table_parsers( 

576 self, 

577 ) -> Mapping[type[DebputyDispatchableType], DispatchingTableParser[Any]]: 

578 return self._table_parsers 

579 

580 @property 

581 def dispatchable_object_parsers(self) -> Mapping[str, DispatchingObjectParser]: 

582 return self._object_parsers 

583 

584 def dispatch_parser_table_for( 

585 self, rule_type: TTP 

586 ) -> DispatchingTableParser[TP] | None: 

587 return cast( 

588 "Optional[DispatchingTableParser[TP]]", self._table_parsers.get(rule_type) 

589 ) 

590 

591 def generate_parser( 

592 self, 

593 parsed_content: type[TD], 

594 *, 

595 source_content: SF | None = None, 

596 allow_optional: bool = False, 

597 inline_reference_documentation: ParserDocumentation | None = None, 

598 expected_debputy_integration_mode: None | ( 

599 Container[DebputyIntegrationMode] 

600 ) = None, 

601 automatic_docs: None | ( 

602 Mapping[type[Any], Sequence[StandardParserAttributeDocumentation]] 

603 ) = None, 

604 ) -> DeclarativeInputParser[TD]: 

605 """Derive a parser from a TypedDict 

606 

607 Generates a parser for a segment of the manifest (think the `install-docs` snippet) from a TypedDict 

608 or two that are used as a description. 

609 

610 In its most simple use-case, the caller provides a TypedDict of the expected attributed along with 

611 their types. As an example: 

612 

613 >>> class InstallDocsRule(DebputyParsedContent): 

614 ... sources: List[str] 

615 ... into: List[str] 

616 >>> pg = ParserGenerator() 

617 >>> simple_parser = pg.generate_parser(InstallDocsRule) 

618 

619 This will create a parser that would be able to interpret something like: 

620 

621 ```yaml 

622 install-docs: 

623 sources: ["docs/*"] 

624 into: ["my-pkg"] 

625 ``` 

626 

627 While this is sufficient for programmers, it is a bit rigid for the packager writing the manifest. Therefore, 

628 you can also provide a TypedDict describing the input, enabling more flexibility: 

629 

630 >>> class InstallDocsRule(DebputyParsedContent): 

631 ... sources: List[str] 

632 ... into: List[str] 

633 >>> class InputDocsRuleInputFormat(TypedDict): 

634 ... source: NotRequired[Annotated[str, DebputyParseHint.target_attribute("sources")]] 

635 ... sources: NotRequired[List[str]] 

636 ... into: Union[str, List[str]] 

637 >>> pg = ParserGenerator() 

638 >>> flexible_parser = pg.generate_parser( 

639 ... InstallDocsRule, 

640 ... source_content=InputDocsRuleInputFormat, 

641 ... ) 

642 

643 In this case, the `sources` field can either come from a single `source` in the manifest (which must be a string) 

644 or `sources` (which must be a list of strings). The parser also ensures that only one of `source` or `sources` 

645 is used to ensure the input is not ambiguous. For the `into` parameter, the parser will accept it being a str 

646 or a list of strings. Regardless of how the input was provided, the parser will normalize the input so that 

647 both `sources` and `into` in the result is a list of strings. As an example, this parser can accept 

648 both the previous input but also the following input: 

649 

650 ```yaml 

651 install-docs: 

652 source: "docs/*" 

653 into: "my-pkg" 

654 ``` 

655 

656 The `source` and `into` attributes are then normalized to lists as if the user had written them as lists 

657 with a single string in them. As noted above, the name of the `source` attribute will also be normalized 

658 while parsing. 

659 

660 In the cases where only one field is required by the user, it can sometimes make sense to allow a non-dict 

661 as part of the input. Example: 

662 

663 >>> class DiscardRule(DebputyParsedContent): 

664 ... paths: List[str] 

665 >>> class DiscardRuleInputDictFormat(TypedDict): 

666 ... path: NotRequired[Annotated[str, DebputyParseHint.target_attribute("paths")]] 

667 ... paths: NotRequired[List[str]] 

668 >>> # This format relies on DiscardRule having exactly one Required attribute 

669 >>> DiscardRuleInputWithAltFormat = Union[ 

670 ... DiscardRuleInputDictFormat, 

671 ... str, 

672 ... List[str], 

673 ... ] 

674 >>> pg = ParserGenerator() 

675 >>> flexible_parser = pg.generate_parser( 

676 ... DiscardRule, 

677 ... source_content=DiscardRuleInputWithAltFormat, 

678 ... ) 

679 

680 

681 Supported types: 

682 * `List` - must have a fixed type argument (such as `List[str]`) 

683 * `str` 

684 * `int` 

685 * `BinaryPackage` - When provided (or required), the user must provide a package name listed 

686 in the debian/control file. The code receives the BinaryPackage instance 

687 matching that input. 

688 * `FileSystemMode` - When provided (or required), the user must provide a file system mode in any 

689 format that `debputy' provides (such as `0644` or `a=rw,go=rw`). 

690 * `FileSystemOwner` - When provided (or required), the user must a file system owner that is 

691 available statically on all Debian systems (must be in `base-passwd`). 

692 The user has multiple options for how to specify it (either via name or id). 

693 * `FileSystemGroup` - When provided (or required), the user must a file system group that is 

694 available statically on all Debian systems (must be in `base-passwd`). 

695 The user has multiple options for how to specify it (either via name or id). 

696 * `ManifestCondition` - When provided (or required), the user must specify a conditional rule to apply. 

697 Usually, it is better to extend `DebputyParsedContentStandardConditional`, which 

698 provides the `debputy' default `when` parameter for conditionals. 

699 

700 Supported special type-like parameters: 

701 

702 * `Required` / `NotRequired` to mark a field as `Required` or `NotRequired`. Must be provided at the 

703 outermost level. Cannot vary between `parsed_content` and `source_content`. 

704 * `Annotated`. Accepted at the outermost level (inside Required/NotRequired) but ignored at the moment. 

705 * `Union`. Must be the outermost level (inside `Annotated` or/and `Required`/`NotRequired` if these are present). 

706 Automapping (see below) is restricted to two members in the Union. 

707 

708 Notable non-supported types: 

709 * `Mapping` and all variants therefore (such as `dict`). In the future, nested `TypedDict`s may be allowed. 

710 * `Optional` (or `Union[..., None]`): Use `NotRequired` for optional fields. 

711 

712 Automatic mapping rules from `source_content` to `parsed_content`: 

713 - `Union[T, List[T]]` can be narrowed automatically to `List[T]`. Transformation is basically: 

714 `lambda value: value if isinstance(value, list) else [value]` 

715 - `T` can be mapped automatically to `List[T]`, Transformation being: `lambda value: [value]` 

716 

717 Additionally, types can be annotated (`Annotated[str, ...]`) with `DebputyParseHint`s. Check its classmethod 

718 for concrete features that may be useful to you. 

719 

720 :param parsed_content: A DebputyParsedContent / TypedDict describing the desired model of the input once parsed. 

721 (DebputyParsedContent is a TypedDict subclass that work around some inadequate type checkers). 

722 It can also be a `List[DebputyParsedContent]`. In that case, `source_content` must be a 

723 `List[TypedDict[...]]`. 

724 :param source_content: Optionally, a TypedDict describing the input allowed by the user. This can be useful 

725 to describe more variations than in `parsed_content` that the parser will normalize for you. If omitted, 

726 the parsed_content is also considered the source_content (which affects what annotations are allowed in it). 

727 Note you should never pass the parsed_content as source_content directly. 

728 :param allow_optional: In rare cases, you want to support explicitly provided vs. optional. In this case, you 

729 should set this to True. Though, in 99.9% of all cases, you want `NotRequired` rather than `Optional` (and 

730 can keep this False). 

731 :param inline_reference_documentation: Optionally, programmatic documentation 

732 :param expected_debputy_integration_mode: If provided, this declares the integration modes where the 

733 result of the parser can be used. This is primarily useful for "fail-fast" on incorrect usage. 

734 When the restriction is not satisfiable, the generated parser will trigger a parse error immediately 

735 (resulting in a "compile time" failure rather than a "runtime" failure). 

736 :return: An input parser capable of reading input matching the TypedDict(s) used as reference. 

737 """ 

738 orig_parsed_content = parsed_content 

739 if source_content is parsed_content: 739 ↛ 740line 739 didn't jump to line 740 because the condition on line 739 was never true

740 raise ValueError( 

741 "Do not provide source_content if it is the same as parsed_content" 

742 ) 

743 is_list_wrapped = False 

744 if get_origin(orig_parsed_content) == list: 

745 parsed_content = get_args(orig_parsed_content)[0] 

746 is_list_wrapped = True 

747 

748 if isinstance(parsed_content, type) and issubclass( 

749 parsed_content, DebputyDispatchableType 

750 ): 

751 parser = self.dispatch_parser_table_for(parsed_content) 

752 if parser is None: 752 ↛ 753line 752 didn't jump to line 753 because the condition on line 752 was never true

753 raise ValueError( 

754 f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}." 

755 f" The class {parsed_content.__qualname__} is not a pre-registered type." 

756 ) 

757 # FIXME: Only the list wrapped version has documentation. 

758 if is_list_wrapped: 758 ↛ 764line 758 didn't jump to line 764 because the condition on line 758 was always true

759 parser = ListWrappedDeclarativeInputParser( 

760 parser, 

761 inline_reference_documentation=inline_reference_documentation, 

762 expected_debputy_integration_mode=expected_debputy_integration_mode, 

763 ) 

764 return parser 

765 

766 if not is_typeddict(parsed_content): 766 ↛ 767line 766 didn't jump to line 767 because the condition on line 766 was never true

767 raise ValueError( 

768 f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}." 

769 ' Only "TypedDict"-based types and a subset of "DebputyDispatchableType" are supported.' 

770 ) 

771 if is_list_wrapped and source_content is not None: 

772 if get_origin(source_content) != list: 772 ↛ 773line 772 didn't jump to line 773 because the condition on line 772 was never true

773 raise ValueError( 

774 "If the parsed_content is a List type, then source_format must be a List type as well." 

775 ) 

776 source_content = get_args(source_content)[0] 

777 

778 target_attributes = self._parse_types( 

779 parsed_content, 

780 allow_source_attribute_annotations=source_content is None, 

781 forbid_optional=not allow_optional, 

782 ) 

783 required_target_parameters = frozenset(parsed_content.__required_keys__) 

784 parsed_alt_form = None 

785 non_mapping_source_only = False 

786 

787 if source_content is not None: 

788 default_target_attribute = None 

789 if len(required_target_parameters) == 1: 

790 default_target_attribute = next(iter(required_target_parameters)) 

791 

792 source_typed_dict, alt_source_forms = _extract_typed_dict( 

793 source_content, 

794 default_target_attribute, 

795 ) 

796 if alt_source_forms: 

797 parsed_alt_form = self._parse_alt_form( 

798 alt_source_forms, 

799 default_target_attribute, 

800 ) 

801 if source_typed_dict is not None: 

802 source_content_attributes = self._parse_types( 

803 source_typed_dict, 

804 allow_target_attribute_annotation=True, 

805 allow_source_attribute_annotations=True, 

806 forbid_optional=not allow_optional, 

807 ) 

808 source_content_parameter = "source_content" 

809 source_and_parsed_differs = True 

810 else: 

811 source_typed_dict = parsed_content 

812 source_content_attributes = target_attributes 

813 source_content_parameter = "parsed_content" 

814 source_and_parsed_differs = True 

815 non_mapping_source_only = True 

816 else: 

817 source_typed_dict = parsed_content 

818 source_content_attributes = target_attributes 

819 source_content_parameter = "parsed_content" 

820 source_and_parsed_differs = False 

821 

822 sources = collections.defaultdict(set) 

823 seen_targets = set() 

824 seen_source_names: dict[str, str] = {} 

825 source_attributes: dict[str, AttributeDescription] = {} 

826 path_hint_source_attributes = [] 

827 

828 for k in source_content_attributes: 

829 ia = source_content_attributes[k] 

830 

831 ta = ( 

832 target_attributes.get(ia.target_attribute) 

833 if source_and_parsed_differs 

834 else ia 

835 ) 

836 if ta is None: 836 ↛ 838line 836 didn't jump to line 838 because the condition on line 836 was never true

837 # Error message would be wrong if this assertion is false. 

838 assert source_and_parsed_differs 

839 raise ValueError( 

840 f'The attribute "{k}" from the "source_content" parameter should have mapped' 

841 f' to "{ia.target_attribute}", but that parameter does not exist in "parsed_content"' 

842 ) 

843 if _is_path_attribute_candidate(ia, ta): 

844 path_hint_source_attributes.append(ia.source_attribute_name) 

845 existing_source_name = seen_source_names.get(ia.source_attribute_name) 

846 if existing_source_name: 846 ↛ 847line 846 didn't jump to line 847 because the condition on line 846 was never true

847 raise ValueError( 

848 f'The attribute "{k}" and "{existing_source_name}" both share the source name' 

849 f' "{ia.source_attribute_name}". Please change the {source_content_parameter} parameter,' 

850 f' so only one attribute use "{ia.source_attribute_name}".' 

851 ) 

852 seen_source_names[ia.source_attribute_name] = k 

853 seen_targets.add(ta.target_attribute) 

854 sources[ia.target_attribute].add(k) 

855 if source_and_parsed_differs: 

856 bridge_mapper = self._type_normalize( 

857 k, ia.attribute_type, ta.attribute_type, False 

858 ) 

859 ia.type_validator = ia.type_validator.combine_mapper(bridge_mapper) 

860 source_attributes[k] = ia 

861 

862 def _as_attr_names(td_name: Iterable[str]) -> frozenset[str]: 

863 return frozenset( 

864 source_content_attributes[a].source_attribute_name for a in td_name 

865 ) 

866 

867 _check_attributes( 

868 parsed_content, 

869 source_typed_dict, 

870 source_content_attributes, 

871 sources, 

872 ) 

873 

874 at_least_one_of = frozenset( 

875 _as_attr_names(g) 

876 for k, g in sources.items() 

877 if len(g) > 1 and k in required_target_parameters 

878 ) 

879 

880 if source_and_parsed_differs and seen_targets != target_attributes.keys(): 880 ↛ 881line 880 didn't jump to line 881 because the condition on line 880 was never true

881 missing = ", ".join( 

882 repr(k) for k in (target_attributes.keys() - seen_targets) 

883 ) 

884 raise ValueError( 

885 'The following attributes in "parsed_content" did not have a source field in "source_content":' 

886 f" {missing}" 

887 ) 

888 all_mutually_exclusive_fields = frozenset( 

889 _as_attr_names(g) for g in sources.values() if len(g) > 1 

890 ) 

891 

892 all_parameters = ( 

893 source_typed_dict.__required_keys__ | source_typed_dict.__optional_keys__ 

894 ) 

895 _check_conflicts( 

896 source_content_attributes, 

897 source_typed_dict.__required_keys__, 

898 all_parameters, 

899 ) 

900 

901 manifest_attributes = { 

902 a.source_attribute_name: a for a in source_content_attributes.values() 

903 } 

904 

905 if parsed_alt_form is not None: 

906 target_attribute = parsed_alt_form.target_attribute 

907 if ( 907 ↛ 912line 907 didn't jump to line 912 because the condition on line 907 was never true

908 target_attribute not in required_target_parameters 

909 and required_target_parameters 

910 or len(required_target_parameters) > 1 

911 ): 

912 raise NotImplementedError( 

913 "When using alternative source formats (Union[TypedDict, ...]), then the" 

914 " target must have at most one require parameter" 

915 ) 

916 bridge_mapper = self._type_normalize( 

917 target_attribute, 

918 parsed_alt_form.attribute_type, 

919 target_attributes[target_attribute].attribute_type, 

920 False, 

921 ) 

922 parsed_alt_form.type_validator = ( 

923 parsed_alt_form.type_validator.combine_mapper(bridge_mapper) 

924 ) 

925 

926 inline_reference_documentation = ( 

927 _verify_and_auto_correct_inline_reference_documentation( 

928 parsed_content, 

929 source_typed_dict, 

930 source_content_attributes, 

931 inline_reference_documentation, 

932 parsed_alt_form is not None, 

933 automatic_docs, 

934 ) 

935 ) 

936 if non_mapping_source_only: 

937 parser = DeclarativeNonMappingInputParser( 

938 assume_not_none(parsed_alt_form), 

939 inline_reference_documentation=inline_reference_documentation, 

940 expected_debputy_integration_mode=expected_debputy_integration_mode, 

941 ) 

942 else: 

943 parser = DeclarativeMappingInputParser( 

944 _as_attr_names(source_typed_dict.__required_keys__), 

945 _as_attr_names(all_parameters), 

946 manifest_attributes, 

947 source_attributes, 

948 mutually_exclusive_attributes=all_mutually_exclusive_fields, 

949 alt_form_parser=parsed_alt_form, 

950 at_least_one_of=at_least_one_of, 

951 inline_reference_documentation=inline_reference_documentation, 

952 path_hint_source_attributes=tuple(path_hint_source_attributes), 

953 expected_debputy_integration_mode=expected_debputy_integration_mode, 

954 ) 

955 if is_list_wrapped: 

956 parser = ListWrappedDeclarativeInputParser( 

957 parser, 

958 expected_debputy_integration_mode=expected_debputy_integration_mode, 

959 ) 

960 return parser 

961 

def _as_type_validator(
    self,
    attribute: str,
    provided_type: Any,
    parsing_typed_dict_attribute: bool,
) -> AttributeTypeHandler:
    """Create the `AttributeTypeHandler` that validates values of `provided_type`.

    The type is examined in the order below; the first matching case wins:

     1. Subclasses of `DebputyDispatchableType` are delegated to `_dispatch_parser`.
     2. A two-member union containing `None` validates the non-`None` member
        and accepts `None` as-is.
     3. Types listed in `BASIC_SIMPLE_TYPES` are checked with `isinstance`.
     4. `List[X]` validates every element with the handler for `X`.
     5. `TypedDict` types are parsed by a sub-parser from `generate_parser`.
     6. `Dict[str, Y]` checks the keys by base type and the values with the
        handler for `Y`.
     7. Any other union: `X | List[X]` is special-cased, otherwise each member
        is tried in turn.
     8. `Literal[...]` checks for membership of the literal values.
     9. `Any` is accepted without validation.

    :param attribute: Name of the attribute. Used in error messages and passed
      on to recursive calls and to `_type_normalize`.
    :param provided_type: The declared type to build a handler for.
    :param parsing_typed_dict_attribute: Passed through to `unpack_type`,
      `_strip_mapped_types`, `_type_normalize` and recursive calls.
    :return: The handler for the type.
    :raises ValueError: If the type (or a type nested in it) is not supported.
    """
    assert not isinstance(provided_type, tuple)

    if isinstance(provided_type, type) and issubclass(
        provided_type, DebputyDispatchableType
    ):
        return _dispatch_parser(provided_type)

    unmapped_type = self._strip_mapped_types(
        provided_type,
        parsing_typed_dict_attribute,
    )
    type_normalizer = self._type_normalize(
        attribute,
        unmapped_type,
        provided_type,
        parsing_typed_dict_attribute,
    )
    _, t_unmapped_orig, t_unmapped_args = unpack_type(
        unmapped_type,
        parsing_typed_dict_attribute,
    )
    _, _, t_provided_args = unpack_type(
        provided_type,
        parsing_typed_dict_attribute,
    )

    # Optional handling. Both spellings of a union (`Union[X, None]` and
    # `X | None`) are accepted here, matching the generic union branch below.
    if (
        t_unmapped_orig in (Union, UnionType)
        and t_unmapped_args
        and len(t_unmapped_args) == 2
        and any(v is _NONE_TYPE for v in t_unmapped_args)
    ):
        actual_type = [a for a in t_provided_args if a is not _NONE_TYPE][0]
        validator = self._as_type_validator(
            attribute, actual_type, parsing_typed_dict_attribute
        )

        def _validator(v: Any, path: AttributePath) -> None:
            if v is None:
                return
            validator.ensure_type(v, path)

        return AttributeTypeHandler(
            validator.describe_type(),
            _validator,
            base_type=validator.base_type,
            mapper=type_normalizer,
        )

    if unmapped_type in BASIC_SIMPLE_TYPES:
        type_name = BASIC_SIMPLE_TYPES[unmapped_type]

        type_mapping = self._registered_types.get(provided_type)
        if type_mapping is not None:
            # Describe the value by its mapped target type and keep the
            # simple type as a parenthesised hint.
            simple_type = f" ({type_name})"
            type_name = type_mapping.target_type.__name__
        else:
            simple_type = ""

        def _validator(v: Any, path: AttributePath) -> None:
            if not isinstance(v, unmapped_type):
                _validation_type_error(
                    path, f"The attribute must be a {type_name}{simple_type}"
                )

        return AttributeTypeHandler(
            type_name,
            _validator,
            base_type=unmapped_type,
            mapper=type_normalizer,
        )
    if t_unmapped_orig == list:
        if not t_unmapped_args:
            raise ValueError(
                f'The attribute "{attribute}" is List but does not have Generics (Must use List[X])'
            )

        generic_type = t_unmapped_args[0]
        key_mapper = self._as_type_validator(
            attribute,
            generic_type,
            parsing_typed_dict_attribute,
        )

        def _validator(v: Any, path: AttributePath) -> None:
            if not isinstance(v, list):
                _validation_type_error(path, "The attribute must be a list")
            for i, element in enumerate(v):
                key_mapper.ensure_type(element, path[i])

        list_mapper = (
            map_each_element(key_mapper.mapper)
            if key_mapper.mapper is not None
            else None
        )

        return AttributeTypeHandler(
            f"List of {key_mapper.describe_type()}",
            _validator,
            base_type=list,
            mapper=type_normalizer,
        ).combine_mapper(list_mapper)
    if is_typeddict(provided_type):
        subparser = self.generate_parser(cast("Type[TD]", provided_type))
        # No separate type check here; the value is handed to the sub-parser
        # by the mapper.
        return AttributeTypeHandler(
            description=f"{provided_type.__name__} (Typed Mapping)",
            ensure_type=lambda v, ap: None,
            base_type=dict,
            mapper=lambda v, ap, cv: subparser.parse_input(
                v, ap, parser_context=cv
            ),
        )
    if t_unmapped_orig == dict:
        if not t_unmapped_args or len(t_unmapped_args) != 2:
            raise ValueError(
                f'The attribute "{attribute}" is Dict but does not have Generics (Must use Dict[str, Y])'
            )
        if t_unmapped_args[0] != str:
            raise ValueError(
                f'The attribute "{attribute}" is Dict and has a non-str type as key.'
                " Currently, only `str` is supported (Dict[str, Y])"
            )
        key_mapper = self._as_type_validator(
            attribute,
            t_unmapped_args[0],
            parsing_typed_dict_attribute,
        )
        value_mapper = self._as_type_validator(
            attribute,
            t_unmapped_args[1],
            parsing_typed_dict_attribute,
        )

        if key_mapper.base_type is None:
            raise ValueError(
                f'The attribute "{attribute}" is Dict and the key did not have a trivial base type. Key types'
                f" without trivial base types (such as `str`) are not supported at the moment."
            )

        if value_mapper.mapper is not None:
            raise ValueError(
                f'The attribute "{attribute}" is Dict and the value requires mapping.'
                " Currently, this is not supported. Consider a simpler type (such as Dict[str, str] or Dict[str, Any])."
                " Better typing may come later"
            )

        def _validator(uv: Any, path: AttributePath) -> None:
            if not isinstance(uv, dict):
                _validation_type_error(path, "The attribute must be a mapping")
            # Path hint used to point at a key that has the wrong type.
            key_name = "the first key in the mapping"
            for i, (k, v) in enumerate(uv.items()):
                if not key_mapper.base_type_match(k):
                    kp = path.copy_with_path_hint(key_name)
                    _validation_type_error(
                        kp,
                        f'The key number {i + 1} in attribute "{kp}" must be a {key_mapper.describe_type()}',
                    )
                key_name = f"the key after {k}"
                value_mapper.ensure_type(v, path[k])

        return AttributeTypeHandler(
            f"Mapping of {value_mapper.describe_type()}",
            _validator,
            base_type=dict,
            mapper=type_normalizer,
        ).combine_mapper(key_mapper.mapper)
    if t_unmapped_orig in (Union, UnionType):
        if _is_two_arg_x_list_x(t_provided_args):
            # Force the order to be "X, List[X]" as it simplifies the code
            x_list_x = (
                t_provided_args
                if get_origin(t_provided_args[1]) == list
                else (t_provided_args[1], t_provided_args[0])
            )

            # X, List[X] could match if X was List[Y]. However, our code below assumes
            # that X is a non-list. The `_is_two_arg_x_list_x` returns False for this
            # case to avoid this assert and fall into the "generic case".
            assert get_origin(x_list_x[0]) != list
            x_subtype_checker = self._as_type_validator(
                attribute,
                x_list_x[0],
                parsing_typed_dict_attribute,
            )
            list_x_subtype_checker = self._as_type_validator(
                attribute,
                x_list_x[1],
                parsing_typed_dict_attribute,
            )
            type_description = x_subtype_checker.describe_type()
            type_description = f"{type_description} or a list of {type_description}"

            def _validator(v: Any, path: AttributePath) -> None:
                if isinstance(v, list):
                    list_x_subtype_checker.ensure_type(v, path)
                else:
                    x_subtype_checker.ensure_type(v, path)

            return AttributeTypeHandler(
                type_description,
                _validator,
                mapper=type_normalizer,
            )
        else:
            subtype_checker = [
                self._as_type_validator(attribute, a, parsing_typed_dict_attribute)
                for a in t_unmapped_args
            ]
            type_description = "one-of: " + ", ".join(
                f"{sc.describe_type()}" for sc in subtype_checker
            )
            # All members must share the same mapper, since only one
            # normalizer is attached to the resulting handler.
            mapper = subtype_checker[0].mapper
            if any(mapper != sc.mapper for sc in subtype_checker):
                raise ValueError(
                    f'Cannot handle the union "{provided_type}" as the target types need different'
                    " type normalization/mapping logic. Unions are generally limited to Union[X, List[X]]"
                    " where X is a non-collection type."
                )

            def _validator(v: Any, path: AttributePath) -> None:
                partial_matches = []
                for sc in subtype_checker:
                    try:
                        sc.ensure_type(v, path)
                        return
                    except ManifestParseException as e:
                        if sc.base_type_match(v):
                            partial_matches.append((sc, e))

                # If exactly one member matched on base type, its error is
                # re-raised instead of the generic message.
                if len(partial_matches) == 1:
                    raise partial_matches[0][1]
                _validation_type_error(
                    path, f"Could not match against: {type_description}"
                )

            return AttributeTypeHandler(
                type_description,
                _validator,
                mapper=type_normalizer,
            )
    if t_unmapped_orig == Literal:
        # String values are shown in backticks; other values via `str`.
        pretty = ", ".join(
            f"`{v}`" if isinstance(v, str) else str(v) for v in t_unmapped_args
        )

        def _validator(v: Any, path: AttributePath) -> None:
            if v not in t_unmapped_args:
                value_hint = ""
                if isinstance(v, str):
                    value_hint = f"({v}) "
                _validation_type_error(
                    path,
                    f"Value {value_hint}must be one of the following literal values: {pretty}",
                )

        return AttributeTypeHandler(
            f"One of the following literal values: {pretty}",
            _validator,
        )

    if provided_type == Any:
        return AttributeTypeHandler(
            "any (unvalidated)",
            lambda *a: None,
        )
    raise ValueError(
        f'The attribute "{attribute}" had/contained a type {provided_type}, which is not supported'
    )

1238 

def _parse_types(
    self,
    spec: type[TypedDict],
    allow_target_attribute_annotation: bool = False,
    allow_source_attribute_annotations: bool = False,
    forbid_optional: bool = True,
) -> dict[str, AttributeDescription]:
    """Describe every attribute declared by the TypedDict `spec`.

    :param spec: The TypedDict whose type hints (including `Annotated` extras)
      are read.
    :param allow_target_attribute_annotation: Forwarded to
      `_attribute_description`.
    :param allow_source_attribute_annotations: Forwarded to
      `_attribute_description`.
    :param forbid_optional: Forwarded to `_attribute_description`.
    :return: Mapping from attribute name to its `AttributeDescription`.
    """
    hints = get_type_hints(spec, include_extras=True)
    descriptions: dict[str, AttributeDescription] = {}
    for name, hint in hints.items():
        # An attribute is required when the TypedDict lists it as a required key.
        descriptions[name] = self._attribute_description(
            name,
            hint,
            name in spec.__required_keys__,
            allow_target_attribute_annotation=allow_target_attribute_annotation,
            allow_source_attribute_annotations=allow_source_attribute_annotations,
            forbid_optional=forbid_optional,
        )
    return descriptions

1258 

def _attribute_description(
    self,
    attribute: str,
    orig_td: Any,
    is_required: bool,
    forbid_optional: bool = True,
    allow_target_attribute_annotation: bool = False,
    allow_source_attribute_annotations: bool = False,
) -> AttributeDescription:
    """Build the `AttributeDescription` for one TypedDict attribute.

    :param attribute: Name of the attribute.
    :param orig_td: The declared type hint of the attribute, as given to
      `_parse_type`.
    :param is_required: Whether the attribute is required; forwarded to the
      annotation parser.
    :param forbid_optional: Forwarded to `_parse_type`.
    :param allow_target_attribute_annotation: Forwarded to the annotation parser.
    :param allow_source_attribute_annotations: Forwarded to the annotation parser.
    :return: The description combining type, validator and parse hints.
    """
    attribute_type, raw_annotations, optional = _parse_type(
        attribute,
        orig_td,
        forbid_optional=forbid_optional,
    )
    handler = self._as_type_validator(attribute, attribute_type, True)
    hints = DetectedDebputyParseHint.parse_annotations(
        raw_annotations,
        f' Seen with attribute "{attribute}".',
        attribute,
        is_required,
        allow_target_attribute_annotation=allow_target_attribute_annotation,
        allow_source_attribute_annotations=allow_source_attribute_annotations,
    )
    description_fields = dict(
        target_attribute=hints.target_attribute,
        attribute_type=attribute_type,
        type_validator=handler,
        annotations=raw_annotations,
        is_optional=optional,
        conflicting_attributes=hints.conflict_with_source_attributes,
        conditional_required=hints.conditional_required,
        source_attribute_name=assume_not_none(hints.source_manifest_attribute),
        parse_hints=hints,
    )
    return AttributeDescription(**description_fields)

1293 

def _parse_alt_form(
    self,
    alt_form,
    default_target_attribute: str | None,
) -> AttributeDescription:
    """Build the `AttributeDescription` for the alternative form of the source format.

    :param alt_form: The type describing the alternative form; it is handed to
      `_parse_type` with optional types forbidden.
    :param default_target_attribute: Forwarded to the annotation parser as the
      default target attribute.
    :return: The description of the alternative form.
    """
    label = "source_format alternative form"
    attribute_type, raw_annotations, optional = _parse_type(
        label,
        alt_form,
        forbid_optional=True,
        parsing_typed_dict_attribute=False,
    )
    handler = self._as_type_validator(label, attribute_type, True)
    # Target attribute annotations are allowed here; source attribute
    # annotations are not.
    hints = DetectedDebputyParseHint.parse_annotations(
        raw_annotations,
        " The alternative for source_format.",
        None,
        False,
        default_target_attribute=default_target_attribute,
        allow_target_attribute_annotation=True,
        allow_source_attribute_annotations=False,
    )
    description_fields = dict(
        target_attribute=hints.target_attribute,
        attribute_type=attribute_type,
        type_validator=handler,
        annotations=raw_annotations,
        is_optional=optional,
        conflicting_attributes=hints.conflict_with_source_attributes,
        conditional_required=hints.conditional_required,
        source_attribute_name="Alt form of the source_format",
    )
    return AttributeDescription(**description_fields)

1329 

def _union_narrowing(
    self,
    input_type: Any,
    target_type: Any,
    parsing_typed_dict_attribute: bool,
) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any] | None:
    """Create a mapper that narrows a union `input_type` into `target_type`.

    :param input_type: The source type; must be a union with at least one member.
    :param target_type: The type the value should be converted into.
    :param parsing_typed_dict_attribute: Passed through to `unpack_type` and
      `_strip_mapped_types`.
    :return: A mapper taking `(value, attribute_path, parser_context)`, or
      `None` when no narrowing is needed or the combination is not supported.
    :raises ValueError: If `input_type` is not a union with members, or if the
      input mixes mapped and unmapped forms and the target argument is not a
      class.
    """
    _, input_orig, input_args = unpack_type(
        input_type, parsing_typed_dict_attribute
    )
    _, target_orig, target_args = unpack_type(
        target_type, parsing_typed_dict_attribute
    )

    if input_orig not in (Union, UnionType) or not input_args:
        raise ValueError("input_type must be a Union[...] with non-empty args")

    # Currently, we only support Union[X, List[X]] -> List[Y] narrowing or Union[X, List[X]] -> Union[Y, List[Y]]
    # - Where X = Y or there is a simple standard transformation from X to Y.

    if target_orig not in (Union, UnionType, list) or not target_args:
        # Not supported
        return None

    if target_orig in (Union, UnionType) and set(input_args) == set(target_args):
        # Not needed (identity mapping)
        return None

    if target_orig == list and not any(get_origin(a) == list for a in input_args):
        # Not supported
        return None

    target_arg = target_args[0]
    simplified_type = self._strip_mapped_types(
        target_arg, parsing_typed_dict_attribute
    )
    # Every member of the input union must be the target argument or its
    # stripped (source) type, either bare or wrapped in a list.
    acceptable_types = {
        target_arg,
        list[target_arg],  # type: ignore
        List[target_arg],  # type: ignore
        simplified_type,
        list[simplified_type],  # type: ignore
        List[simplified_type],  # type: ignore
    }
    target_format = (
        target_arg,
        list[target_arg],  # type: ignore
        List[target_arg],  # type: ignore
    )
    in_target_format = 0
    in_simple_format = 0
    for input_arg in input_args:
        if input_arg not in acceptable_types:
            # Not supported
            return None
        if input_arg in target_format:
            in_target_format += 1
        else:
            in_simple_format += 1

    assert in_simple_format or in_target_format

    if in_target_format and not in_simple_format:
        # Union[X, List[X]] -> List[X]
        return normalize_into_list
    mapped = self._registered_types[target_arg]
    if not in_target_format and in_simple_format:
        # Union[X, List[X]] -> List[Y]

        def _mapper_x_list_y(
            x: Any | list[Any],
            ap: AttributePath,
            pc: Optional["ParserContextData"],
        ) -> list[Any]:
            in_list_form: list[Any] = normalize_into_list(x, ap, pc)

            return [mapped.mapper(x, ap, pc) for x in in_list_form]

        return _mapper_x_list_y

    # Union[Y, List[X]] -> List[Y]
    # The mixed mapper tells the two forms apart with `isinstance`, which
    # requires the target argument to be a class.
    if not isinstance(target_arg, type):
        raise ValueError(
            f"Cannot narrow {input_type} -> {target_type}: The automatic conversion does"
            f" not support mixed types. Please use either {simplified_type} or {target_arg}"
            f" in the source content (but not a mix of both)"
        )

    def _mapper_mixed_list_y(
        x: Any | list[Any],
        ap: AttributePath,
        pc: Optional["ParserContextData"],
    ) -> list[Any]:
        in_list_form: list[Any] = normalize_into_list(x, ap, pc)

        return [
            x if isinstance(x, target_arg) else mapped.mapper(x, ap, pc)
            for x in in_list_form
        ]

    return _mapper_mixed_list_y

1430 

def _type_normalize(
    self,
    attribute: str,
    input_type: Any,
    target_type: Any,
    parsing_typed_dict_attribute: bool,
) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any] | None:
    """Find the mapper that converts a value of `input_type` into `target_type`.

    :param attribute: Name of the attribute; only used in the error message.
    :param input_type: The type of the value as it appears in the source.
    :param target_type: The type the value must be converted into.
    :param parsing_typed_dict_attribute: Passed through to `unpack_type` and
      `_union_narrowing`.
    :return: `None` when the two types are equal (no conversion needed),
      otherwise a mapper taking `(value, attribute_path, parser_context)`.
    :raises ValueError: If no supported conversion exists between the types.
    """
    if input_type == target_type:
        return None
    _, input_orig, input_args = unpack_type(
        input_type, parsing_typed_dict_attribute
    )
    _, target_orig, target_args = unpack_type(
        target_type,
        parsing_typed_dict_attribute,
    )
    if input_orig in (Union, UnionType):
        result = self._union_narrowing(
            input_type, target_type, parsing_typed_dict_attribute
        )
        if result:
            return result
    elif target_orig == list and target_args and target_args[0] == input_type:
        # X -> List[X]. The `target_args` guard makes a list target without
        # type arguments fall through to the ValueError below instead of
        # failing with an IndexError.
        return wrap_into_list

    mapped = self._registered_types.get(target_type)
    if mapped is not None and input_type == mapped.source_type:
        # Source -> Target
        return mapped.mapper
    if target_orig == list and target_args:
        mapped = self._registered_types.get(target_args[0])
        if mapped is not None:
            # mypy is dense and forgot `mapped` cannot be optional in the comprehensions.
            mapped_type: TypeMapping = mapped
            if input_type == mapped.source_type:
                # Source -> List[Target]
                return lambda x, ap, pc: [mapped_type.mapper(x, ap, pc)]
            if (
                input_orig == list
                and input_args
                and input_args[0] == mapped_type.source_type
            ):
                # List[Source] -> List[Target]
                return lambda xs, ap, pc: [
                    mapped_type.mapper(x, ap, pc) for x in xs
                ]

    raise ValueError(
        f'Unsupported type normalization for "{attribute}": Cannot automatically map/narrow'
        f" {input_type} to {target_type}"
    )

1482 

1483 def _strip_mapped_types( 

1484 self, orig_td: Any, parsing_typed_dict_attribute: bool 

1485 ) -> Any: 

1486 m = self._registered_types.get(orig_td) 

1487 if m is not None: 

1488 return m.source_type 

1489 _, v, args = unpack_type(orig_td, parsing_typed_dict_attribute) 

1490 if v == list: 

1491 arg = args[0] 

1492 m = self._registered_types.get(arg) 

1493 if m: 

1494 return list[m.source_type] # type: ignore 

1495 if v in (Union, UnionType): 

1496 stripped_args = tuple( 

1497 self._strip_mapped_types(x, parsing_typed_dict_attribute) for x in args 

1498 ) 

1499 if stripped_args != args: 

1500 return Union[stripped_args] 

1501 return orig_td 

1502 

1503 

def _sort_key(attr: StandardParserAttributeDocumentation) -> Any:
    """Return the sort key for a standard attribute documentation entry.

    The key is the entry's sort category followed by the first attribute name
    yielded when iterating `attr.attributes`.
    """
    first_attribute = next(iter(attr.attributes))
    return (attr.sort_category, first_attribute)

1507 

1508 

def _apply_std_docs(
    std_doc_table: (
        Mapping[type[Any], Sequence[StandardParserAttributeDocumentation]] | None
    ),
    source_format_typed_dict: type[Any],
    attribute_docs: Sequence[ParserAttributeDocumentation] | None,
) -> Sequence[ParserAttributeDocumentation] | None:
    """Add standard attribute documentation inherited from the bases of a source format.

    The `__orig_bases__` of `source_format_typed_dict` are walked transitively.
    For every base found in `std_doc_table`, each documentation entry is added
    unless one of its attributes is already documented.

    :param std_doc_table: Standard documentation keyed by base type. When it is
      `None` or empty, `attribute_docs` is returned unchanged.
    :param source_format_typed_dict: The type whose bases are inspected.
    :param attribute_docs: Documentation that was provided explicitly, if any.
    :return: `attribute_docs` unchanged when no standard documentation applies;
      otherwise a tuple with the provided documentation first, followed by the
      standard documentation sorted via `_sort_key`.
    """
    if not std_doc_table:
        return attribute_docs

    # Attributes that already have documentation (provided or standard).
    documented = set()
    for provided_doc in attribute_docs or ():
        documented.update(provided_doc.attributes)

    pending = set(getattr(source_format_typed_dict, "__orig_bases__", []))
    visited = set(pending)
    selected = []

    while pending:
        current = pending.pop()
        unseen = {
            parent
            for parent in getattr(current, "__orig_bases__", [])
            if parent not in visited
        }
        pending |= unseen
        visited |= unseen
        for candidate in std_doc_table.get(current) or ():
            # If there is any overlap, do not add the docs
            if documented.isdisjoint(candidate.attributes):
                documented.update(candidate.attributes)
                selected.append(candidate)

    if not selected:
        return attribute_docs
    ordered = sorted(selected, key=_sort_key)
    if attribute_docs:
        # Plugin provided attributes first
        return tuple([*attribute_docs, *ordered])
    return tuple(ordered)

1554 

1555 

def _verify_and_auto_correct_inline_reference_documentation(
    parsed_content: type[TD],
    source_typed_dict: type[Any],
    source_content_attributes: Mapping[str, AttributeDescription],
    inline_reference_documentation: ParserDocumentation | None,
    has_alt_form: bool,
    automatic_docs: (
        Mapping[type[Any], Sequence[StandardParserAttributeDocumentation]] | None
    ) = None,
) -> ParserDocumentation | None:
    """Validate the reference documentation of a source format and fill in the gaps.

    Standard documentation from `automatic_docs` is merged in via
    `_apply_std_docs`. Every documented attribute must exist in
    `source_content_attributes` and be documented at most once. Attributes
    without documentation are an error when any non-standard documentation was
    given; otherwise they are added as undocumented attributes.

    :param parsed_content: The parsed content type; its qualified name is used
      in error messages.
    :param source_typed_dict: The source format type, passed to `_apply_std_docs`.
    :param source_content_attributes: The attributes of the source format.
    :param inline_reference_documentation: The documentation to verify, if any.
    :param has_alt_form: Whether the source format has an alternative
      (non-mapping) form.
    :param automatic_docs: Standard documentation keyed by base type.
    :return: `None` when there is no documentation at all, otherwise the
      original or corrected documentation.
    :raises ValueError: If the documentation references an unknown attribute,
      documents an attribute twice, omits attributes while providing custom
      documentation, or describes an alternative form that does not exist.
    """
    orig_attribute_docs = (
        inline_reference_documentation.attribute_doc
        if inline_reference_documentation
        else None
    )
    attribute_docs = _apply_std_docs(
        automatic_docs,
        source_typed_dict,
        orig_attribute_docs,
    )
    if inline_reference_documentation is None and attribute_docs is None:
        return None
    changes = {}
    if attribute_docs:
        seen = set()
        had_any_custom_docs = False
        for attr_doc in attribute_docs:
            if not isinstance(attr_doc, StandardParserAttributeDocumentation):
                had_any_custom_docs = True
            for attr_name in attr_doc.attributes:
                attr = source_content_attributes.get(attr_name)
                if attr is None:
                    raise ValueError(
                        f"The inline_reference_documentation for the source format of {parsed_content.__qualname__}"
                        f' references an attribute "{attr_name}", which does not exist in the source format.'
                    )
                if attr_name in seen:
                    raise ValueError(
                        f"The inline_reference_documentation for the source format of {parsed_content.__qualname__}"
                        f' has documentation for "{attr_name}" twice, which is not supported.'
                        f" Please document it at most once"
                    )
                seen.add(attr_name)
        undocumented = source_content_attributes.keys() - seen
        if undocumented:
            if had_any_custom_docs:
                # Sorted so the message is deterministic (the names come from
                # an unordered set).
                undocumented_attrs = ", ".join(sorted(undocumented))
                raise ValueError(
                    f"The following attributes were not documented for the source format of"
                    f" {parsed_content.__qualname__}. If this is deliberate, then please"
                    ' declare each them as undocumented (via undocumented_attr("foo")):'
                    f" {undocumented_attrs}"
                )
            # Only standard documentation was given: mark the remaining
            # attributes as undocumented.
            combined_docs = list(attribute_docs)
            combined_docs.extend(undocumented_attr(a) for a in sorted(undocumented))
            attribute_docs = combined_docs

    if attribute_docs and orig_attribute_docs != attribute_docs:
        assert attribute_docs is not None
        changes["attribute_doc"] = tuple(attribute_docs)

    if (
        inline_reference_documentation is not None
        and inline_reference_documentation.alt_parser_description
        and not has_alt_form
    ):
        raise ValueError(
            "The inline_reference_documentation had documentation for an non-mapping format,"
            " but the source format does not have a non-mapping format."
        )
    if changes:
        if inline_reference_documentation is None:
            inline_reference_documentation = reference_documentation()
        return inline_reference_documentation.replace(**changes)
    return inline_reference_documentation

1631 

1632 

def _check_conflicts(
    input_content_attributes: dict[str, AttributeDescription],
    required_attributes: frozenset[str],
    all_attributes: frozenset[str],
) -> None:
    """Validate the conflict declarations of the source attributes.

    Each attribute's `conflicting_attributes` is checked against three rules:

     * A required attribute cannot declare any conflicts.
     * An attribute cannot conflict with a required attribute.
     * An attribute cannot conflict with an attribute that is not in `all_attributes`.

    :param input_content_attributes: The source attributes keyed by attribute name.
    :param required_attributes: Names of the attributes that are required.
    :param all_attributes: Names of all known attributes.
    :raises ValueError: If any of the rules above is violated.
    """
    for attr_name, attr in input_content_attributes.items():
        conflicts = attr.conflicting_attributes
        if attr_name in required_attributes and conflicts:
            # Sorted so the message is stable between runs (set iteration order is not).
            c = ", ".join(repr(a) for a in sorted(conflicts))
            raise ValueError(
                f'The attribute "{attr_name}" is required and conflicts with the attributes: {c}.'
                " This makes it impossible to use these attributes. Either remove the attributes"
                f' (along with the conflicts for them), adjust the conflicts or make "{attr_name}"'
                " optional (NotRequired)"
            )

        required_conflicts = conflicts & required_attributes
        if required_conflicts:
            c = ", ".join(repr(a) for a in sorted(required_conflicts))
            raise ValueError(
                f'The attribute "{attr_name}" conflicts with the following *required* attributes: {c}.'
                f' This makes it impossible to use the "{attr_name}" attribute. Either remove it,'
                " adjust the conflicts or make the listed attributes optional (NotRequired)"
            )

        unknown_attributes = conflicts - all_attributes
        if unknown_attributes:
            c = ", ".join(repr(a) for a in sorted(unknown_attributes))
            raise ValueError(
                f'The attribute "{attr_name}" declares a conflict with the following unknown attributes: {c}.'
                " None of these attributes were declared in the input."
            )

1663 

1664 

def _check_attributes(
    content: type[TypedDict],
    input_content: type[TypedDict],
    input_content_attributes: dict[str, AttributeDescription],
    sources: Mapping[str, Collection[str]],
) -> None:
    """Check that required/optional markers agree between source and target attributes.

    :param content: The target format; its `__required_keys__` is consulted.
    :param input_content: The source format; its `__required_keys__` and
      `__optional_keys__` define the attributes that are checked.
    :param input_content_attributes: The source attributes keyed by name. The
      `target_attribute` of each entry names the target attribute it maps to.
    :param sources: For each target attribute name, the names of the source
      attributes mapping to it.
    :raises ValueError: If a required source attribute shares its target with other
      sources, or if an optional source attribute is the only source of a required
      target attribute.
    """
    required_in_target = content.__required_keys__
    required_in_input = input_content.__required_keys__

    for input_name in required_in_input | input_content.__optional_keys__:
        target_name = input_content_attributes[input_name].target_attribute
        source_names = sources[target_name]

        assert source_names
        source_count = len(source_names)

        if input_name in required_in_input:
            if source_count > 1:
                raise ValueError(
                    f'The source attribute "{input_name}" is required, but it maps to "{target_name}",'
                    f' which has multiple sources "{source_names}". If "{input_name}" should be required,'
                    f' then there is no need for additional sources for "{target_name}". Alternatively,'
                    f' "{input_name}" might be missing a NotRequired type'
                    f' (example: "{input_name}: NotRequired[<OriginalTypeHere>]")'
                )
        elif target_name in required_in_target and source_count == 1:
            # An optional source that is the sole provider of a required target.
            raise ValueError(
                f'The source attribute "{input_name}" is not marked as required and maps to'
                f' "{target_name}", which is marked as required. As there are no other attributes'
                f' mapping to "{target_name}", then "{input_name}" must be required as well'
                f' ("{input_name}: Required[<Type>]"). Alternatively, "{target_name}" should be optional'
                f' ("{target_name}: NotRequired[<Type>]") or an "MappingHint.aliasOf" might be missing.'
            )

1700 

1701 

def _validation_type_error(path: AttributePath, message: str) -> None:
    """Report that the value at `path` had an invalid structure or type.

    This function never returns normally.

    :param path: The attribute path of the offending value (its `path` is put in the message).
    :param message: Details about what was wrong with the value.
    :raises ManifestParseException: Always.
    """
    error_text = (
        f'The attribute "{path.path}" did not have a valid structure/type: {message}'
    )
    raise ManifestParseException(error_text)

1706 

1707 

def _is_two_arg_x_list_x(t_args: tuple[Any, ...]) -> bool:
    """Tell whether `t_args` has the shape `(X, list[X])` or `(list[X], X)`.

    :param t_args: The type arguments to examine.
    :return: True if there are exactly two arguments, exactly one of them is a
      parameterized list and its element type equals the other argument.
    """
    if len(t_args) != 2:
        return False

    first, second = t_args
    first_is_list = get_origin(first) == list
    second_is_list = get_origin(second) == list
    if first_is_list == second_is_list:
        # Neither is a list, or both are. The latter could still match X, List[X],
        # but that case is not allowed for now as the caller does not support it.
        return False

    if first_is_list:
        list_type, element_type = first, second
    else:
        list_type, element_type = second, first
    list_args = get_args(list_type)
    return bool(list_args and list_args[0] == element_type)

1723 

1724 

def _extract_typed_dict(
    base_type,
    default_target_attribute: str | None,
) -> tuple[type[TypedDict] | None, Any]:
    """Split a source format into its TypedDict form and its alternative form.

    :param base_type: The source format. Either a TypedDict, a Union or any other type.
    :param default_target_attribute: Target attribute to use for the alternative
      form when that form does not carry a `TargetAttribute` annotation.
    :return: A tuple of (TypedDict or None, alternative form or None).
    :raises ValueError: If the format contains a non-TypedDict mapping, more than
      one TypedDict, a `None` member, or if no target attribute can be determined
      for the alternative form.
    """
    if is_typeddict(base_type):
        return base_type, None

    _, origin, union_members = unpack_type(base_type, False)
    if origin != Union:
        if isinstance(base_type, type) and issubclass(base_type, (dict, Mapping)):
            raise ValueError(
                "The source_format cannot be nor contain a (non-TypedDict) dict"
            )
        return None, base_type

    mapping_forms = [member for member in union_members if is_typeddict(member)]
    if len(mapping_forms) > 1:
        raise ValueError(
            "When source_format is a Union, it must contain at most one TypedDict"
        )
    typed_dict = mapping_forms[0] if mapping_forms else None

    for member in union_members:
        if member is None or member is _NONE_TYPE:
            raise ValueError(
                "The source_format cannot be nor contain Optional[X] or Union[X, None]"
            )

    # Everything in the Union except the (single) TypedDict.
    alternatives = [member for member in union_members if member is not typed_dict]
    for member in alternatives:
        if isinstance(member, type) and issubclass(member, (dict, Mapping)):
            raise ValueError(
                "The source_format cannot be nor contain a (non-TypedDict) dict"
            )

    if len(alternatives) == 1:
        # Only a single alternative can carry annotations (such as the target attribute).
        target_type, anno, _ = _parse_type(
            "source_format alternative form",
            alternatives[0],
            forbid_optional=True,
            parsing_typed_dict_attribute=False,
        )
        has_target_attribute = bool(anno) and any(
            isinstance(hint, TargetAttribute) for hint in anno
        )
    else:
        anno = None
        has_target_attribute = False
        target_type = Union[tuple(alternatives)]

    if default_target_attribute is None and not has_target_attribute:
        raise ValueError(
            'The alternative format must be Union[TypedDict,Annotated[X, DebputyParseHint.target_attribute("...")]]'
            " OR the parsed_content format must have exactly one attribute that is required."
        )
    if anno:
        # Re-attach the annotations to the unpacked type.
        return typed_dict, Annotated[(target_type, *anno)]
    return typed_dict, target_type

1785 

1786 

def _dispatch_parse_generator(
    dispatch_type: type[DebputyDispatchableType],
) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]:
    """Create a parse function that delegates to the dispatch parser table of `dispatch_type`.

    :param dispatch_type: The type used to look up the parser table in the parser context.
    :return: A callable taking (value, attribute path, parser context), which
      returns the result of `parse_input` of the parser table.
    """

    def _dispatch_parse(
        value: Any,
        attribute_path: AttributePath,
        parser_context: Optional["ParserContextData"],
    ):
        # The parser table is looked up via the context, so the context is mandatory here.
        assert parser_context is not None
        return parser_context.dispatch_parser_table_for(dispatch_type).parse_input(
            value,
            attribute_path,
            parser_context=parser_context,
        )

    return _dispatch_parse

1802 

1803 

def _dispatch_parser(
    dispatch_type: type[DebputyDispatchableType],
) -> AttributeTypeHandler:
    """Create an `AttributeTypeHandler` for `dispatch_type`.

    The handler is named after the type and uses the function from
    `_dispatch_parse_generator` as its mapper.

    :param dispatch_type: The dispatchable type to create the handler for.
    :return: The new handler.
    """
    mapper = _dispatch_parse_generator(dispatch_type)
    return AttributeTypeHandler(
        dispatch_type.__name__,
        # Accepts any arguments and does nothing; presumably the type check hook,
        # leaving all validation to the mapper.
        lambda *_ignored: None,
        mapper=mapper,
    )

1812 

1813 

def _parse_type(
    attribute: str,
    orig_td: Any,
    forbid_optional: bool = True,
    parsing_typed_dict_attribute: bool = True,
) -> tuple[Any, tuple[Any, ...], bool]:
    """Unpack the type of an attribute and extract its `Annotated` metadata.

    :param attribute: Name of the attribute (only used in error messages).
    :param orig_td: The type to unpack.
    :param forbid_optional: If True, reject a Union that has `None` as a member.
    :param parsing_typed_dict_attribute: Passed on to `unpack_type`.
    :return: A tuple of (unpacked type, `Annotated` metadata, optional). The
      metadata is empty when the type is not `Annotated`. The last element is
      always False in this implementation.
    :raises ValueError: If the type resolved to `None` or uses a forbidden Optional.
    """
    resolved, origin, type_args = unpack_type(orig_td, parsing_typed_dict_attribute)
    metadata: tuple[Any, ...] = ()
    if origin is None:
        return resolved, metadata, False

    if origin == Annotated:
        annotated_args = get_args(resolved)
        metadata = annotated_args[1:]
        # Unpack again to see the type wrapped by the Annotated.
        resolved, origin, type_args = unpack_type(
            annotated_args[0],
            parsing_typed_dict_attribute,
        )

    # NOTE(review): the checks below are assumed to only apply to types that had
    # an origin (that is, to be nested in the original conditional) - confirm.
    if resolved is _NONE_TYPE:
        raise ValueError(
            f'The attribute "{attribute}" resolved to type "None". "Nil" / "None" fields are not allowed in the'
            " debputy manifest, so this attribute does not make sense in its current form."
        )
    if (
        forbid_optional
        and origin == Union
        and any(member is _NONE_TYPE for member in type_args)
    ):
        raise ValueError(
            f'Detected use of Optional in "{attribute}", which is not allowed here.'
            " Please use NotRequired for optional fields"
        )

    return resolved, metadata, False

1841 

1842 

def _normalize_attribute_name(attribute: str) -> str:
    """Derive the default manifest name from a Python attribute name.

    A single trailing underscore is dropped and all remaining underscores are
    replaced by dashes.

    :param attribute: The Python attribute name.
    :return: The normalized name.
    """
    return attribute.removesuffix("_").replace("_", "-")

1847 

1848 

@dataclasses.dataclass
class DetectedDebputyParseHint:
    """The parse hints detected in the annotations of a single attribute.

    Instances are created via `parse_annotations`.
    """

    # From the TargetAttribute annotation if present, otherwise the default target
    # attribute or the default attribute name.
    target_attribute: str
    # From the ManifestAttribute annotation if present, otherwise the normalized
    # default attribute name (or None when there is no default attribute name).
    source_manifest_attribute: str | None
    # From the ConflictWithSourceAttribute annotation; empty when there is none.
    conflict_with_source_attributes: frozenset[str]
    # The ConditionalRequired annotation, if any.
    conditional_required: ConditionalRequired | None
    # False if (and only if) a NotPathHint annotation was present.
    applicable_as_path_hint: bool

    @classmethod
    def parse_annotations(
        cls,
        anno: tuple[Any, ...],
        error_context: str,
        default_attribute_name: str | None,
        is_required: bool,
        default_target_attribute: str | None = None,
        allow_target_attribute_annotation: bool = False,
        allow_source_attribute_annotations: bool = False,
    ) -> "DetectedDebputyParseHint":
        """Detect the parse hints in a set of annotations.

        :param anno: The annotations to search for parse hints.
        :param error_context: Text appended to error messages.
        :param default_attribute_name: Fallback for the target attribute and (in
          normalized form) for the source attribute name.
        :param is_required: Whether the attribute is required.
        :param default_target_attribute: Fallback for the target attribute, which
          takes precedence over `default_attribute_name`.
        :param allow_target_attribute_annotation: Whether a `TargetAttribute`
          annotation is permitted.
        :param allow_source_attribute_annotations: Whether `ManifestAttribute` and
          `ConflictWithSourceAttribute` annotations are permitted.
        :return: The detected hints.
        :raises ValueError: If an annotation is used where it is not permitted, if
          the target attribute cannot be determined, or if a required attribute
          is also conditionally required.
        """
        target_attr_anno = find_annotation(anno, TargetAttribute)
        if target_attr_anno:
            if not allow_target_attribute_annotation:
                raise ValueError(
                    f"The DebputyParseHint.target_attribute annotation is not allowed in this context.{error_context}"
                )
            target_attribute = target_attr_anno.attribute
        elif default_target_attribute is not None:
            target_attribute = default_target_attribute
        elif default_attribute_name is not None:
            target_attribute = default_attribute_name
        else:
            # No annotation and no defaults. If the annotation was not even permitted,
            # then the caller used this method incorrectly. Otherwise, the annotation
            # is what is missing.
            if not allow_target_attribute_annotation:
                raise ValueError(
                    "allow_target_attribute_annotation must be True OR "
                    "default_attribute_name/default_target_attribute must be not None"
                )
            raise ValueError(
                f"Missing DebputyParseHint.target_attribute annotation.{error_context}"
            )

        source_attribute_anno = find_annotation(anno, ManifestAttribute)
        _source_attribute_allowed(
            allow_source_attribute_annotations, error_context, source_attribute_anno
        )
        if source_attribute_anno:
            source_attribute_name = source_attribute_anno.attribute
        elif default_attribute_name is not None:
            source_attribute_name = _normalize_attribute_name(default_attribute_name)
        else:
            source_attribute_name = None

        mutual_exclusive_with_anno = find_annotation(anno, ConflictWithSourceAttribute)
        if mutual_exclusive_with_anno:
            _source_attribute_allowed(
                allow_source_attribute_annotations,
                error_context,
                mutual_exclusive_with_anno,
            )
            conflicting_attributes = mutual_exclusive_with_anno.conflicting_attributes
        else:
            conflicting_attributes = frozenset()

        conditional_required = find_annotation(anno, ConditionalRequired)
        if conditional_required and is_required:
            if default_attribute_name is None:
                raise ValueError(
                    "is_required cannot be True without default_attribute_name being not None"
                )
            raise ValueError(
                f'The attribute "{default_attribute_name}" is Required while also being conditionally required.'
                ' Please make the attribute "NotRequired" or remove the conditional requirement.'
            )

        not_path_hint_anno = find_annotation(anno, NotPathHint)
        applicable_as_path_hint = not_path_hint_anno is None

        # Use `cls` rather than the class name, so subclasses get instances of themselves.
        return cls(
            target_attribute=target_attribute,
            source_manifest_attribute=source_attribute_name,
            conflict_with_source_attributes=conflicting_attributes,
            conditional_required=conditional_required,
            applicable_as_path_hint=applicable_as_path_hint,
        )

1930 

1931 

def _source_attribute_allowed(
    source_attribute_allowed: bool,
    error_context: str,
    annotation: DebputyParseHint | None,
) -> None:
    """Reject `annotation` if source attribute annotations are not allowed.

    :param source_attribute_allowed: Whether source attribute annotations are permitted.
    :param error_context: Text appended to the error message.
    :param annotation: The annotation that was found, or None if there was none.
    :raises ValueError: If an annotation was found while not being permitted.
    """
    if annotation is not None and not source_attribute_allowed:
        raise ValueError(
            f'The annotation "{annotation}" cannot be used here. {error_context}'
        )

1941 )