Coverage for src/debputy/manifest_parser/declarative_parser.py: 72%

797 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2026-04-19 20:37 +0000

1import collections 

2import dataclasses 

3import typing 

4from types import UnionType 

5from typing import ( 

6 Any, 

7 TypedDict, 

8 get_type_hints, 

9 Annotated, 

10 get_args, 

11 get_origin, 

12 TypeVar, 

13 Generic, 

14 Optional, 

15 cast, 

16 Type, 

17 Union, 

18 List, 

19 NotRequired, 

20 Literal, 

21 TYPE_CHECKING, 

22) 

23from collections.abc import Callable, Mapping, Collection, Iterable, Sequence, Container 

24 

25 

26from debputy.manifest_parser.base_types import FileSystemMatchRule 

27from debputy.manifest_parser.exceptions import ( 

28 ManifestParseException, 

29) 

30from debputy.manifest_parser.mapper_code import ( 

31 normalize_into_list, 

32 wrap_into_list, 

33 map_each_element, 

34) 

35from debputy.manifest_parser.parse_hints import ( 

36 ConditionalRequired, 

37 DebputyParseHint, 

38 TargetAttribute, 

39 ManifestAttribute, 

40 ConflictWithSourceAttribute, 

41 NotPathHint, 

42) 

43from debputy.manifest_parser.parser_data import ParserContextData 

44from debputy.manifest_parser.tagging_types import ( 

45 DebputyParsedContent, 

46 DebputyDispatchableType, 

47 TypeMapping, 

48) 

49from debputy.manifest_parser.util import ( 

50 AttributePath, 

51 unpack_type, 

52 find_annotation, 

53 check_integration_mode, 

54) 

55from debputy.plugin.api.impl_types import ( 

56 DeclarativeInputParser, 

57 TD, 

58 ListWrappedDeclarativeInputParser, 

59 DispatchingObjectParser, 

60 DispatchingTableParser, 

61 TTP, 

62 TP, 

63 InPackageContextParser, 

64) 

65from debputy.plugin.api.spec import ( 

66 ParserDocumentation, 

67 DebputyIntegrationMode, 

68 StandardParserAttributeDocumentation, 

69 undocumented_attr, 

70 ParserAttributeDocumentation, 

71 reference_documentation, 

72) 

73from debputy.util import _info, _warn, assume_not_none 

74 

75if TYPE_CHECKING: 

76 from debputy.lsp.diagnostics import LintSeverity 

77 

78 

79try: 

80 from Levenshtein import distance 

81 

82 _WARN_ONCE: bool | None = None 

83except ImportError: 

84 _WARN_ONCE = False 

85 

86 

87def _detect_possible_typo( 

88 key: str, 

89 value: object, 

90 manifest_attributes: Mapping[str, "AttributeDescription"], 

91 path: "AttributePath", 

92) -> None: 

93 global _WARN_ONCE 

94 if _WARN_ONCE == False: 

95 _WARN_ONCE = True 

96 _info( 

97 "Install python3-levenshtein to have debputy try to detect typos in the manifest." 

98 ) 

99 elif _WARN_ONCE is None: 

100 k_len = len(key) 

101 key_path = path[key] 

102 matches: list[str] = [] 

103 current_match_strength = 0 

104 for acceptable_key, attr in manifest_attributes.items(): 

105 if abs(k_len - len(acceptable_key)) > 2: 

106 continue 

107 d = distance(key, acceptable_key) 

108 if d > 2: 

109 continue 

110 try: 

111 attr.type_validator.ensure_type(value, key_path) 

112 except ManifestParseException: 

113 if attr.type_validator.base_type_match(value): 

114 match_strength = 1 

115 else: 

116 match_strength = 0 

117 else: 

118 match_strength = 2 

119 

120 if match_strength < current_match_strength: 

121 continue 

122 if match_strength > current_match_strength: 

123 current_match_strength = match_strength 

124 matches.clear() 

125 matches.append(acceptable_key) 

126 

127 if not matches: 

128 return 

129 ref = f'at "{path.path}"' if path else "at the manifest root level" 

130 if len(matches) == 1: 

131 possible_match = repr(matches[0]) 

132 _warn( 

133 f'Possible typo: The key "{key}" {ref} should probably have been {possible_match}' 

134 ) 

135 else: 

136 matches.sort() 

137 possible_matches = ", ".join(repr(a) for a in matches) 

138 _warn( 

139 f'Possible typo: The key "{key}" {ref} should probably have been one of {possible_matches}' 

140 ) 

141 

142 

143SF = TypeVar("SF") 

144T = TypeVar("T") 

145S = TypeVar("S") 

146 

147 

148_NONE_TYPE = type(None) 

149 

150 

151# These must be able to appear in an "isinstance" check and must be builtin types. 

152BASIC_SIMPLE_TYPES = { 

153 str: "string", 

154 int: "integer", 

155 bool: "boolean", 

156} 

157 

158 

159class AttributeTypeHandler: 

160 __slots__ = ("_description", "_ensure_type", "base_type", "mapper") 

161 

162 def __init__( 

163 self, 

164 description: str, 

165 ensure_type: Callable[[Any, AttributePath], None], 

166 *, 

167 base_type: type[Any] | None = None, 

168 mapper: None | ( 

169 Callable[[Any, AttributePath, Optional["ParserContextData"]], Any] 

170 ) = None, 

171 ) -> None: 

172 self._description = description 

173 self._ensure_type = ensure_type 

174 self.base_type = base_type 

175 self.mapper = mapper 

176 

177 def describe_type(self) -> str: 

178 return self._description 

179 

180 def ensure_type(self, obj: object, path: AttributePath) -> None: 

181 self._ensure_type(obj, path) 

182 

183 def base_type_match(self, obj: object) -> bool: 

184 base_type = self.base_type 

185 return base_type is not None and isinstance(obj, base_type) 

186 

187 def map_type( 

188 self, 

189 value: Any, 

190 path: AttributePath, 

191 parser_context: Optional["ParserContextData"], 

192 ) -> Any: 

193 mapper = self.mapper 

194 if mapper is not None: 

195 return mapper(value, path, parser_context) 

196 return value 

197 

198 def combine_mapper( 

199 self, 

200 mapper: None | ( 

201 Callable[[Any, AttributePath, Optional["ParserContextData"]], Any] 

202 ), 

203 ) -> "AttributeTypeHandler": 

204 if mapper is None: 

205 return self 

206 _combined_mapper: Callable[ 

207 [Any, AttributePath, Optional["ParserContextData"]], Any 

208 ] 

209 if self.mapper is not None: 

210 m = self.mapper 

211 

212 def _combined_mapper( 

213 value: Any, 

214 path: AttributePath, 

215 parser_context: Optional["ParserContextData"], 

216 ) -> Any: 

217 return mapper(m(value, path, parser_context), path, parser_context) 

218 

219 else: 

220 _combined_mapper = mapper 

221 

222 return AttributeTypeHandler( 

223 self._description, 

224 self._ensure_type, 

225 base_type=self.base_type, 

226 mapper=_combined_mapper, 

227 ) 

228 

229 

230@dataclasses.dataclass(slots=True) 

231class AttributeDescription: 

232 source_attribute_name: str 

233 target_attribute: str 

234 attribute_type: Any 

235 type_validator: AttributeTypeHandler 

236 annotations: tuple[Any, ...] 

237 conflicting_attributes: frozenset[str] 

238 conditional_required: Optional["ConditionalRequired"] 

239 parse_hints: Optional["DetectedDebputyParseHint"] = None 

240 is_optional: bool = False 

241 

242 

243def _extract_path_hint(v: Any, attribute_path: AttributePath) -> bool: 

244 if attribute_path.path_hint is not None: 244 ↛ 245line 244 didn't jump to line 245 because the condition on line 244 was never true

245 return True 

246 if isinstance(v, str): 

247 attribute_path.path_hint = v 

248 return True 

249 elif isinstance(v, list) and len(v) > 0 and isinstance(v[0], str): 

250 attribute_path.path_hint = v[0] 

251 return True 

252 return False 

253 

254 

255@dataclasses.dataclass(slots=True, frozen=True) 

256class DeclarativeNonMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]): 

257 alt_form_parser: AttributeDescription 

258 inline_reference_documentation: ParserDocumentation | None = None 

259 expected_debputy_integration_mode: Container[DebputyIntegrationMode] | None = None 

260 

261 def parse_input( 

262 self, 

263 value: object, 

264 path: AttributePath, 

265 *, 

266 parser_context: Optional["ParserContextData"] = None, 

267 ) -> TD: 

268 check_integration_mode( 

269 path, 

270 parser_context, 

271 self.expected_debputy_integration_mode, 

272 ) 

273 if self.reference_documentation_url is not None: 

274 doc_ref = f" (Documentation: {self.reference_documentation_url})" 

275 else: 

276 doc_ref = "" 

277 

278 alt_form_parser = self.alt_form_parser 

279 if value is None: 279 ↛ 280line 279 didn't jump to line 280 because the condition on line 279 was never true

280 form_note = f" The value must have type: {alt_form_parser.type_validator.describe_type()}" 

281 if self.reference_documentation_url is not None: 

282 doc_ref = f" Please see {self.reference_documentation_url} for the documentation." 

283 raise ManifestParseException( 

284 f"The attribute {path.path} was missing a value. {form_note}{doc_ref}" 

285 ) 

286 _extract_path_hint(value, path) 

287 alt_form_parser.type_validator.ensure_type(value, path) 

288 attribute = alt_form_parser.target_attribute 

289 alias_mapping = { 

290 attribute: ("", None), 

291 } 

292 v = alt_form_parser.type_validator.map_type(value, path, parser_context) 

293 path.alias_mapping = alias_mapping 

294 return cast("TD", {attribute: v}) 

295 

296 

297@dataclasses.dataclass(slots=True) 

298class DeclarativeMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]): 

299 input_time_required_parameters: frozenset[str] 

300 all_parameters: frozenset[str] 

301 manifest_attributes: Mapping[str, "AttributeDescription"] 

302 source_attributes: Mapping[str, "AttributeDescription"] 

303 at_least_one_of: frozenset[frozenset[str]] 

304 alt_form_parser: AttributeDescription | None 

305 mutually_exclusive_attributes: frozenset[frozenset[str]] = frozenset() 

306 _per_attribute_conflicts_cache: Mapping[str, frozenset[str]] | None = None 

307 inline_reference_documentation: ParserDocumentation | None = None 

308 path_hint_source_attributes: Sequence[str] = tuple() 

309 expected_debputy_integration_mode: Container[DebputyIntegrationMode] | None = None 

310 

311 def _parse_alt_form( 

312 self, 

313 value: object, 

314 path: AttributePath, 

315 *, 

316 parser_context: Optional["ParserContextData"] = None, 

317 ) -> TD: 

318 alt_form_parser = self.alt_form_parser 

319 if alt_form_parser is None: 319 ↛ 320line 319 didn't jump to line 320 because the condition on line 319 was never true

320 raise ManifestParseException( 

321 f"The attribute {path.path} must be a mapping.{self._doc_url_error_suffix()}" 

322 ) 

323 _extract_path_hint(value, path) 

324 alt_form_parser.type_validator.ensure_type(value, path) 

325 assert ( 

326 value is not None 

327 ), "The alternative form was None, but the parser should have rejected None earlier." 

328 attribute = alt_form_parser.target_attribute 

329 alias_mapping = { 

330 attribute: ("", None), 

331 } 

332 v = alt_form_parser.type_validator.map_type(value, path, parser_context) 

333 path.alias_mapping = alias_mapping 

334 return cast("TD", {attribute: v}) 

335 

336 def _validate_expected_keys( 

337 self, 

338 value: dict[Any, Any], 

339 path: AttributePath, 

340 *, 

341 parser_context: Optional["ParserContextData"] = None, 

342 ) -> None: 

343 unknown_keys = value.keys() - self.all_parameters 

344 doc_ref = self._doc_url_error_suffix() 

345 if unknown_keys: 345 ↛ 346line 345 didn't jump to line 346 because the condition on line 345 was never true

346 for k in unknown_keys: 

347 if isinstance(k, str): 

348 _detect_possible_typo(k, value[k], self.manifest_attributes, path) 

349 unused_keys = self.all_parameters - value.keys() 

350 if unused_keys: 

351 k = ", ".join(unused_keys) 

352 raise ManifestParseException( 

353 f'Unknown keys "{unknown_keys}" at {path.path_container_lc}". Keys that could be used here are: {k}.{doc_ref}' 

354 ) 

355 raise ManifestParseException( 

356 f'Unknown keys "{unknown_keys}" at {path.path_container_lc}". Please remove them.{doc_ref}' 

357 ) 

358 missing_keys = self.input_time_required_parameters - value.keys() 

359 if missing_keys: 

360 required = ", ".join(repr(k) for k in sorted(missing_keys)) 

361 raise ManifestParseException( 

362 f"The following keys were required but not present at {path.path_container_lc}: {required}{doc_ref}" 

363 ) 

364 for maybe_required in self.all_parameters - value.keys(): 

365 attr = self.manifest_attributes[maybe_required] 

366 assert attr.conditional_required is None or parser_context is not None 

367 if ( 367 ↛ 373line 367 didn't jump to line 373 because the condition on line 367 was never true

368 attr.conditional_required is not None 

369 and attr.conditional_required.condition_applies( 

370 assume_not_none(parser_context) 

371 ) 

372 ): 

373 reason = attr.conditional_required.reason 

374 raise ManifestParseException( 

375 f'Missing the *conditionally* required attribute "{maybe_required}" at {path.path_container_lc}. {reason}{doc_ref}' 

376 ) 

377 for keyset in self.at_least_one_of: 

378 matched_keys = value.keys() & keyset 

379 if not matched_keys: 379 ↛ 380line 379 didn't jump to line 380 because the condition on line 379 was never true

380 conditionally_required = ", ".join(repr(k) for k in sorted(keyset)) 

381 raise ManifestParseException( 

382 f"At least one of the following keys must be present at {path.path_container_lc}:" 

383 f" {conditionally_required}{doc_ref}" 

384 ) 

385 for group in self.mutually_exclusive_attributes: 

386 matched = value.keys() & group 

387 if len(matched) > 1: 387 ↛ 388line 387 didn't jump to line 388 because the condition on line 387 was never true

388 ck = ", ".join(repr(k) for k in sorted(matched)) 

389 raise ManifestParseException( 

390 f"Could not parse {path.path_container_lc}: The following attributes are" 

391 f" mutually exclusive: {ck}{doc_ref}" 

392 ) 

393 

394 def _parse_typed_dict_form( 

395 self, 

396 value: dict[Any, Any], 

397 path: AttributePath, 

398 *, 

399 parser_context: Optional["ParserContextData"] = None, 

400 ) -> TD: 

401 self._validate_expected_keys(value, path, parser_context=parser_context) 

402 result = {} 

403 per_attribute_conflicts = self._per_attribute_conflicts() 

404 alias_mapping = {} 

405 for path_hint_source_attributes in self.path_hint_source_attributes: 

406 v = value.get(path_hint_source_attributes) 

407 if v is not None and _extract_path_hint(v, path): 

408 break 

409 for k, v in value.items(): 

410 attr = self.manifest_attributes[k] 

411 matched = value.keys() & per_attribute_conflicts[k] 

412 if matched: 412 ↛ 413line 412 didn't jump to line 413 because the condition on line 412 was never true

413 ck = ", ".join(repr(k) for k in sorted(matched)) 

414 raise ManifestParseException( 

415 f'The attribute "{k}" at {path.path} cannot be used with the following' 

416 f" attributes: {ck}{self._doc_url_error_suffix()}" 

417 ) 

418 nk = attr.target_attribute 

419 key_path = path[k] 

420 attr.type_validator.ensure_type(v, key_path) 

421 if v is None: 421 ↛ 422line 421 didn't jump to line 422 because the condition on line 421 was never true

422 continue 

423 if k != nk: 

424 alias_mapping[nk] = k, None 

425 v = attr.type_validator.map_type(v, key_path, parser_context) 

426 result[nk] = v 

427 if alias_mapping: 

428 path.alias_mapping = alias_mapping 

429 return cast("TD", result) 

430 

431 def _doc_url_error_suffix(self, *, see_url_version: bool = False) -> str: 

432 doc_url = self.reference_documentation_url 

433 if doc_url is not None: 

434 if see_url_version: 434 ↛ 435line 434 didn't jump to line 435 because the condition on line 434 was never true

435 return f" Please see {doc_url} for the documentation." 

436 return f" (Documentation: {doc_url})" 

437 return "" 

438 

439 def parse_input( 

440 self, 

441 value: object, 

442 path: AttributePath, 

443 *, 

444 parser_context: Optional["ParserContextData"] = None, 

445 ) -> TD: 

446 check_integration_mode( 

447 path, 

448 parser_context, 

449 self.expected_debputy_integration_mode, 

450 ) 

451 if value is None: 451 ↛ 452line 451 didn't jump to line 452 because the condition on line 451 was never true

452 form_note = " The attribute must be a mapping." 

453 if self.alt_form_parser is not None: 

454 form_note = ( 

455 " The attribute can be a mapping or a non-mapping format" 

456 ' (usually, "non-mapping format" means a string or a list of strings).' 

457 ) 

458 doc_ref = self._doc_url_error_suffix(see_url_version=True) 

459 raise ManifestParseException( 

460 f"The attribute {path.path} was missing a value. {form_note}{doc_ref}" 

461 ) 

462 

463 if not isinstance(value, dict): 

464 return self._parse_alt_form(value, path, parser_context=parser_context) 

465 return self._parse_typed_dict_form(value, path, parser_context=parser_context) 

466 

467 def _per_attribute_conflicts(self) -> Mapping[str, frozenset[str]]: 

468 conflicts = self._per_attribute_conflicts_cache 

469 if conflicts is not None: 

470 return conflicts 

471 attrs = self.source_attributes 

472 conflicts = { 

473 a.source_attribute_name: frozenset( 

474 attrs[ca].source_attribute_name for ca in a.conflicting_attributes 

475 ) 

476 for a in attrs.values() 

477 } 

478 self._per_attribute_conflicts_cache = conflicts 

479 return self._per_attribute_conflicts_cache 

480 

481 

482def _is_path_attribute_candidate( 

483 source_attribute: AttributeDescription, target_attribute: AttributeDescription 

484) -> bool: 

485 if ( 

486 source_attribute.parse_hints 

487 and not source_attribute.parse_hints.applicable_as_path_hint 

488 ): 

489 return False 

490 target_type = target_attribute.attribute_type 

491 _, origin, args = unpack_type(target_type, False) 

492 match_type = target_type 

493 if origin == list: 

494 match_type = args[0] 

495 return isinstance(match_type, type) and issubclass(match_type, FileSystemMatchRule) 

496 

497 

498def is_typeddict(t: Any) -> bool: 

499 return typing.is_typeddict(t) or ( 

500 # Logically, not is_typeddict(t) and is subclass(DebputyParsedContent) 

501 # implies not is_typeddict(DebputyParsedContent) 

502 # except that subclass *fails* for typeddicts. 

503 not typing.is_typeddict(DebputyParsedContent) 

504 and isinstance(t, type) 

505 and issubclass(t, DebputyParsedContent) 

506 ) 

507 

508 

509class ParserGenerator: 

510 def __init__(self) -> None: 

511 self._registered_types: dict[Any, TypeMapping[Any, Any]] = {} 

512 self._object_parsers: dict[str, DispatchingObjectParser] = {} 

513 self._table_parsers: dict[ 

514 type[DebputyDispatchableType], DispatchingTableParser[Any] 

515 ] = {} 

516 self._in_package_context_parser: dict[str, Any] = {} 

517 

518 def register_mapped_type(self, mapped_type: TypeMapping[Any, Any]) -> None: 

519 existing = self._registered_types.get(mapped_type.target_type) 

520 if existing is not None: 520 ↛ 521line 520 didn't jump to line 521 because the condition on line 520 was never true

521 raise ValueError(f"The type {existing} is already registered") 

522 self._registered_types[mapped_type.target_type] = mapped_type 

523 

524 def get_mapped_type_from_target_type( 

525 self, 

526 mapped_type: type[T], 

527 ) -> TypeMapping[Any, T] | None: 

528 return self._registered_types.get(mapped_type) 

529 

530 def discard_mapped_type(self, mapped_type: type[T]) -> None: 

531 del self._registered_types[mapped_type] 

532 

533 def add_table_parser(self, rt: type[DebputyDispatchableType], path: str) -> None: 

534 assert rt not in self._table_parsers 

535 self._table_parsers[rt] = DispatchingTableParser(rt, path) 

536 

537 def add_object_parser( 

538 self, 

539 path: str, 

540 *, 

541 parser_documentation: ParserDocumentation | None = None, 

542 expected_debputy_integration_mode: None | ( 

543 Container[DebputyIntegrationMode] 

544 ) = None, 

545 unknown_keys_diagnostic_severity: Optional["LintSeverity"] = "error", 

546 allow_unknown_keys: bool = False, 

547 ) -> DispatchingObjectParser: 

548 assert path not in self._in_package_context_parser 

549 assert path not in self._object_parsers 

550 object_parser = DispatchingObjectParser( 

551 path, 

552 parser_documentation=parser_documentation, 

553 expected_debputy_integration_mode=expected_debputy_integration_mode, 

554 unknown_keys_diagnostic_severity=unknown_keys_diagnostic_severity, 

555 allow_unknown_keys=allow_unknown_keys, 

556 ) 

557 self._object_parsers[path] = object_parser 

558 return object_parser 

559 

560 def add_in_package_context_parser( 

561 self, 

562 path: str, 

563 delegate: DeclarativeInputParser[Any], 

564 ) -> None: 

565 assert path not in self._in_package_context_parser 

566 assert path not in self._object_parsers 

567 self._in_package_context_parser[path] = InPackageContextParser(path, delegate) 

568 

569 @property 

570 def dispatchable_table_parsers( 

571 self, 

572 ) -> Mapping[type[DebputyDispatchableType], DispatchingTableParser[Any]]: 

573 return self._table_parsers 

574 

575 @property 

576 def dispatchable_object_parsers(self) -> Mapping[str, DispatchingObjectParser]: 

577 return self._object_parsers 

578 

579 def dispatch_parser_table_for( 

580 self, rule_type: TTP 

581 ) -> DispatchingTableParser[TP] | None: 

582 return cast( 

583 "Optional[DispatchingTableParser[TP]]", self._table_parsers.get(rule_type) 

584 ) 

585 

586 def generate_parser( 

587 self, 

588 parsed_content: type[TD], 

589 *, 

590 source_content: SF | None = None, 

591 allow_optional: bool = False, 

592 inline_reference_documentation: ParserDocumentation | None = None, 

593 expected_debputy_integration_mode: None | ( 

594 Container[DebputyIntegrationMode] 

595 ) = None, 

596 automatic_docs: None | ( 

597 Mapping[type[Any], Sequence[StandardParserAttributeDocumentation]] 

598 ) = None, 

599 ) -> DeclarativeInputParser[TD]: 

600 """Derive a parser from a TypedDict 

601 

602 Generates a parser for a segment of the manifest (think the `install-docs` snippet) from a TypedDict 

603 or two that are used as a description. 

604 

605 In its most simple use-case, the caller provides a TypedDict of the expected attributed along with 

606 their types. As an example: 

607 

608 >>> class InstallDocsRule(DebputyParsedContent): 

609 ... sources: List[str] 

610 ... into: List[str] 

611 >>> pg = ParserGenerator() 

612 >>> simple_parser = pg.generate_parser(InstallDocsRule) 

613 

614 This will create a parser that would be able to interpret something like: 

615 

616 ```yaml 

617 install-docs: 

618 sources: ["docs/*"] 

619 into: ["my-pkg"] 

620 ``` 

621 

622 While this is sufficient for programmers, it is a bit rigid for the packager writing the manifest. Therefore, 

623 you can also provide a TypedDict describing the input, enabling more flexibility: 

624 

625 >>> class InstallDocsRule(DebputyParsedContent): 

626 ... sources: List[str] 

627 ... into: List[str] 

628 >>> class InputDocsRuleInputFormat(TypedDict): 

629 ... source: NotRequired[Annotated[str, DebputyParseHint.target_attribute("sources")]] 

630 ... sources: NotRequired[List[str]] 

631 ... into: Union[str, List[str]] 

632 >>> pg = ParserGenerator() 

633 >>> flexible_parser = pg.generate_parser( 

634 ... InstallDocsRule, 

635 ... source_content=InputDocsRuleInputFormat, 

636 ... ) 

637 

638 In this case, the `sources` field can either come from a single `source` in the manifest (which must be a string) 

639 or `sources` (which must be a list of strings). The parser also ensures that only one of `source` or `sources` 

640 is used to ensure the input is not ambiguous. For the `into` parameter, the parser will accept it being a str 

641 or a list of strings. Regardless of how the input was provided, the parser will normalize the input so that 

642 both `sources` and `into` in the result is a list of strings. As an example, this parser can accept 

643 both the previous input but also the following input: 

644 

645 ```yaml 

646 install-docs: 

647 source: "docs/*" 

648 into: "my-pkg" 

649 ``` 

650 

651 The `source` and `into` attributes are then normalized to lists as if the user had written them as lists 

652 with a single string in them. As noted above, the name of the `source` attribute will also be normalized 

653 while parsing. 

654 

655 In the cases where only one field is required by the user, it can sometimes make sense to allow a non-dict 

656 as part of the input. Example: 

657 

658 >>> class DiscardRule(DebputyParsedContent): 

659 ... paths: List[str] 

660 >>> class DiscardRuleInputDictFormat(TypedDict): 

661 ... path: NotRequired[Annotated[str, DebputyParseHint.target_attribute("paths")]] 

662 ... paths: NotRequired[List[str]] 

663 >>> # This format relies on DiscardRule having exactly one Required attribute 

664 >>> DiscardRuleInputWithAltFormat = Union[ 

665 ... DiscardRuleInputDictFormat, 

666 ... str, 

667 ... List[str], 

668 ... ] 

669 >>> pg = ParserGenerator() 

670 >>> flexible_parser = pg.generate_parser( 

671 ... DiscardRule, 

672 ... source_content=DiscardRuleInputWithAltFormat, 

673 ... ) 

674 

675 

676 Supported types: 

677 * `List` - must have a fixed type argument (such as `List[str]`) 

678 * `str` 

679 * `int` 

680 * `BinaryPackage` - When provided (or required), the user must provide a package name listed 

681 in the debian/control file. The code receives the BinaryPackage instance 

682 matching that input. 

683 * `FileSystemMode` - When provided (or required), the user must provide a file system mode in any 

684 format that `debputy' provides (such as `0644` or `a=rw,go=rw`). 

685 * `FileSystemOwner` - When provided (or required), the user must a file system owner that is 

686 available statically on all Debian systems (must be in `base-passwd`). 

687 The user has multiple options for how to specify it (either via name or id). 

688 * `FileSystemGroup` - When provided (or required), the user must a file system group that is 

689 available statically on all Debian systems (must be in `base-passwd`). 

690 The user has multiple options for how to specify it (either via name or id). 

691 * `ManifestCondition` - When provided (or required), the user must specify a conditional rule to apply. 

692 Usually, it is better to extend `DebputyParsedContentStandardConditional`, which 

693 provides the `debputy' default `when` parameter for conditionals. 

694 

695 Supported special type-like parameters: 

696 

697 * `Required` / `NotRequired` to mark a field as `Required` or `NotRequired`. Must be provided at the 

698 outermost level. Cannot vary between `parsed_content` and `source_content`. 

699 * `Annotated`. Accepted at the outermost level (inside Required/NotRequired) but ignored at the moment. 

700 * `Union`. Must be the outermost level (inside `Annotated` or/and `Required`/`NotRequired` if these are present). 

701 Automapping (see below) is restricted to two members in the Union. 

702 

703 Notable non-supported types: 

704 * `Mapping` and all variants therefore (such as `dict`). In the future, nested `TypedDict`s may be allowed. 

705 * `Optional` (or `Union[..., None]`): Use `NotRequired` for optional fields. 

706 

707 Automatic mapping rules from `source_content` to `parsed_content`: 

708 - `Union[T, List[T]]` can be narrowed automatically to `List[T]`. Transformation is basically: 

709 `lambda value: value if isinstance(value, list) else [value]` 

710 - `T` can be mapped automatically to `List[T]`, Transformation being: `lambda value: [value]` 

711 

712 Additionally, types can be annotated (`Annotated[str, ...]`) with `DebputyParseHint`s. Check its classmethod 

713 for concrete features that may be useful to you. 

714 

715 :param parsed_content: A DebputyParsedContent / TypedDict describing the desired model of the input once parsed. 

716 (DebputyParsedContent is a TypedDict subclass that work around some inadequate type checkers). 

717 It can also be a `List[DebputyParsedContent]`. In that case, `source_content` must be a 

718 `List[TypedDict[...]]`. 

719 :param source_content: Optionally, a TypedDict describing the input allowed by the user. This can be useful 

720 to describe more variations than in `parsed_content` that the parser will normalize for you. If omitted, 

721 the parsed_content is also considered the source_content (which affects what annotations are allowed in it). 

722 Note you should never pass the parsed_content as source_content directly. 

723 :param allow_optional: In rare cases, you want to support explicitly provided vs. optional. In this case, you 

724 should set this to True. Though, in 99.9% of all cases, you want `NotRequired` rather than `Optional` (and 

725 can keep this False). 

726 :param inline_reference_documentation: Optionally, programmatic documentation 

727 :param expected_debputy_integration_mode: If provided, this declares the integration modes where the 

728 result of the parser can be used. This is primarily useful for "fail-fast" on incorrect usage. 

729 When the restriction is not satisfiable, the generated parser will trigger a parse error immediately 

730 (resulting in a "compile time" failure rather than a "runtime" failure). 

731 :return: An input parser capable of reading input matching the TypedDict(s) used as reference. 

732 """ 

733 orig_parsed_content = parsed_content 

734 if source_content is parsed_content: 734 ↛ 735line 734 didn't jump to line 735 because the condition on line 734 was never true

735 raise ValueError( 

736 "Do not provide source_content if it is the same as parsed_content" 

737 ) 

738 is_list_wrapped = False 

739 if get_origin(orig_parsed_content) == list: 

740 parsed_content = get_args(orig_parsed_content)[0] 

741 is_list_wrapped = True 

742 

743 if isinstance(parsed_content, type) and issubclass( 

744 parsed_content, DebputyDispatchableType 

745 ): 

746 parser = self.dispatch_parser_table_for(parsed_content) 

747 if parser is None: 747 ↛ 748line 747 didn't jump to line 748 because the condition on line 747 was never true

748 raise ValueError( 

749 f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}." 

750 f" The class {parsed_content.__qualname__} is not a pre-registered type." 

751 ) 

752 # FIXME: Only the list wrapped version has documentation. 

753 if is_list_wrapped: 753 ↛ 759line 753 didn't jump to line 759 because the condition on line 753 was always true

754 parser = ListWrappedDeclarativeInputParser( 

755 parser, 

756 inline_reference_documentation=inline_reference_documentation, 

757 expected_debputy_integration_mode=expected_debputy_integration_mode, 

758 ) 

759 return parser 

760 

761 if not is_typeddict(parsed_content): 761 ↛ 762line 761 didn't jump to line 762 because the condition on line 761 was never true

762 raise ValueError( 

763 f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}." 

764 ' Only "TypedDict"-based types and a subset of "DebputyDispatchableType" are supported.' 

765 ) 

766 if is_list_wrapped and source_content is not None: 

767 if get_origin(source_content) != list: 767 ↛ 768line 767 didn't jump to line 768 because the condition on line 767 was never true

768 raise ValueError( 

769 "If the parsed_content is a List type, then source_format must be a List type as well." 

770 ) 

771 source_content = get_args(source_content)[0] 

772 

773 target_attributes = self._parse_types( 

774 parsed_content, 

775 allow_source_attribute_annotations=source_content is None, 

776 forbid_optional=not allow_optional, 

777 ) 

778 required_target_parameters = frozenset(parsed_content.__required_keys__) 

779 parsed_alt_form = None 

780 non_mapping_source_only = False 

781 

782 if source_content is not None: 

783 default_target_attribute = None 

784 if len(required_target_parameters) == 1: 

785 default_target_attribute = next(iter(required_target_parameters)) 

786 

787 source_typed_dict, alt_source_forms = _extract_typed_dict( 

788 source_content, 

789 default_target_attribute, 

790 ) 

791 if alt_source_forms: 

792 parsed_alt_form = self._parse_alt_form( 

793 alt_source_forms, 

794 default_target_attribute, 

795 ) 

796 if source_typed_dict is not None: 

797 source_content_attributes = self._parse_types( 

798 source_typed_dict, 

799 allow_target_attribute_annotation=True, 

800 allow_source_attribute_annotations=True, 

801 forbid_optional=not allow_optional, 

802 ) 

803 source_content_parameter = "source_content" 

804 source_and_parsed_differs = True 

805 else: 

806 source_typed_dict = parsed_content 

807 source_content_attributes = target_attributes 

808 source_content_parameter = "parsed_content" 

809 source_and_parsed_differs = True 

810 non_mapping_source_only = True 

811 else: 

812 source_typed_dict = parsed_content 

813 source_content_attributes = target_attributes 

814 source_content_parameter = "parsed_content" 

815 source_and_parsed_differs = False 

816 

817 sources = collections.defaultdict(set) 

818 seen_targets = set() 

819 seen_source_names: dict[str, str] = {} 

820 source_attributes: dict[str, AttributeDescription] = {} 

821 path_hint_source_attributes = [] 

822 

823 for k in source_content_attributes: 

824 ia = source_content_attributes[k] 

825 

826 ta = ( 

827 target_attributes.get(ia.target_attribute) 

828 if source_and_parsed_differs 

829 else ia 

830 ) 

831 if ta is None: 831 ↛ 833line 831 didn't jump to line 833 because the condition on line 831 was never true

832 # Error message would be wrong if this assertion is false. 

833 assert source_and_parsed_differs 

834 raise ValueError( 

835 f'The attribute "{k}" from the "source_content" parameter should have mapped' 

836 f' to "{ia.target_attribute}", but that parameter does not exist in "parsed_content"' 

837 ) 

838 if _is_path_attribute_candidate(ia, ta): 

839 path_hint_source_attributes.append(ia.source_attribute_name) 

840 existing_source_name = seen_source_names.get(ia.source_attribute_name) 

841 if existing_source_name: 841 ↛ 842line 841 didn't jump to line 842 because the condition on line 841 was never true

842 raise ValueError( 

843 f'The attribute "{k}" and "{existing_source_name}" both share the source name' 

844 f' "{ia.source_attribute_name}". Please change the {source_content_parameter} parameter,' 

845 f' so only one attribute use "{ia.source_attribute_name}".' 

846 ) 

847 seen_source_names[ia.source_attribute_name] = k 

848 seen_targets.add(ta.target_attribute) 

849 sources[ia.target_attribute].add(k) 

850 if source_and_parsed_differs: 

851 bridge_mapper = self._type_normalize( 

852 k, ia.attribute_type, ta.attribute_type, False 

853 ) 

854 ia.type_validator = ia.type_validator.combine_mapper(bridge_mapper) 

855 source_attributes[k] = ia 

856 

857 def _as_attr_names(td_name: Iterable[str]) -> frozenset[str]: 

858 return frozenset( 

859 source_content_attributes[a].source_attribute_name for a in td_name 

860 ) 

861 

862 _check_attributes( 

863 parsed_content, 

864 source_typed_dict, 

865 source_content_attributes, 

866 sources, 

867 ) 

868 

869 at_least_one_of = frozenset( 

870 _as_attr_names(g) 

871 for k, g in sources.items() 

872 if len(g) > 1 and k in required_target_parameters 

873 ) 

874 

875 if source_and_parsed_differs and seen_targets != target_attributes.keys(): 875 ↛ 876line 875 didn't jump to line 876 because the condition on line 875 was never true

876 missing = ", ".join( 

877 repr(k) for k in (target_attributes.keys() - seen_targets) 

878 ) 

879 raise ValueError( 

880 'The following attributes in "parsed_content" did not have a source field in "source_content":' 

881 f" {missing}" 

882 ) 

883 all_mutually_exclusive_fields = frozenset( 

884 _as_attr_names(g) for g in sources.values() if len(g) > 1 

885 ) 

886 

887 all_parameters = ( 

888 source_typed_dict.__required_keys__ | source_typed_dict.__optional_keys__ 

889 ) 

890 _check_conflicts( 

891 source_content_attributes, 

892 source_typed_dict.__required_keys__, 

893 all_parameters, 

894 ) 

895 

896 manifest_attributes = { 

897 a.source_attribute_name: a for a in source_content_attributes.values() 

898 } 

899 

900 if parsed_alt_form is not None: 

901 target_attribute = parsed_alt_form.target_attribute 

902 if ( 902 ↛ 907line 902 didn't jump to line 907 because the condition on line 902 was never true

903 target_attribute not in required_target_parameters 

904 and required_target_parameters 

905 or len(required_target_parameters) > 1 

906 ): 

907 raise NotImplementedError( 

908 "When using alternative source formats (Union[TypedDict, ...]), then the" 

909 " target must have at most one require parameter" 

910 ) 

911 bridge_mapper = self._type_normalize( 

912 target_attribute, 

913 parsed_alt_form.attribute_type, 

914 target_attributes[target_attribute].attribute_type, 

915 False, 

916 ) 

917 parsed_alt_form.type_validator = ( 

918 parsed_alt_form.type_validator.combine_mapper(bridge_mapper) 

919 ) 

920 

921 inline_reference_documentation = ( 

922 _verify_and_auto_correct_inline_reference_documentation( 

923 parsed_content, 

924 source_typed_dict, 

925 source_content_attributes, 

926 inline_reference_documentation, 

927 parsed_alt_form is not None, 

928 automatic_docs, 

929 ) 

930 ) 

931 if non_mapping_source_only: 

932 parser = DeclarativeNonMappingInputParser( 

933 assume_not_none(parsed_alt_form), 

934 inline_reference_documentation=inline_reference_documentation, 

935 expected_debputy_integration_mode=expected_debputy_integration_mode, 

936 ) 

937 else: 

938 parser = DeclarativeMappingInputParser( 

939 _as_attr_names(source_typed_dict.__required_keys__), 

940 _as_attr_names(all_parameters), 

941 manifest_attributes, 

942 source_attributes, 

943 mutually_exclusive_attributes=all_mutually_exclusive_fields, 

944 alt_form_parser=parsed_alt_form, 

945 at_least_one_of=at_least_one_of, 

946 inline_reference_documentation=inline_reference_documentation, 

947 path_hint_source_attributes=tuple(path_hint_source_attributes), 

948 expected_debputy_integration_mode=expected_debputy_integration_mode, 

949 ) 

950 if is_list_wrapped: 

951 parser = ListWrappedDeclarativeInputParser( 

952 parser, 

953 expected_debputy_integration_mode=expected_debputy_integration_mode, 

954 ) 

955 return parser 

956 

957 def _as_type_validator( 

958 self, 

959 attribute: str, 

960 provided_type: Any, 

961 parsing_typed_dict_attribute: bool, 

962 ) -> AttributeTypeHandler: 

963 assert not isinstance(provided_type, tuple) 

964 

965 if isinstance(provided_type, type) and issubclass( 

966 provided_type, DebputyDispatchableType 

967 ): 

968 return _dispatch_parser(provided_type) 

969 

970 unmapped_type = self._strip_mapped_types( 

971 provided_type, 

972 parsing_typed_dict_attribute, 

973 ) 

974 type_normalizer = self._type_normalize( 

975 attribute, 

976 unmapped_type, 

977 provided_type, 

978 parsing_typed_dict_attribute, 

979 ) 

980 t_unmapped, t_unmapped_orig, t_unmapped_args = unpack_type( 

981 unmapped_type, 

982 parsing_typed_dict_attribute, 

983 ) 

984 _, t_provided_orig, t_provided_args = unpack_type( 

985 provided_type, 

986 parsing_typed_dict_attribute, 

987 ) 

988 

989 if ( 989 ↛ 995line 989 didn't jump to line 995 because the condition on line 989 was never true

990 t_unmapped_orig == Union 

991 and t_unmapped_args 

992 and len(t_unmapped_args) == 2 

993 and any(v is _NONE_TYPE for v in t_unmapped_args) 

994 ): 

995 _, _, args = unpack_type(provided_type, parsing_typed_dict_attribute) 

996 actual_type = [a for a in args if a is not _NONE_TYPE][0] 

997 validator = self._as_type_validator( 

998 attribute, actual_type, parsing_typed_dict_attribute 

999 ) 

1000 

1001 def _validator(v: Any, path: AttributePath) -> None: 

1002 if v is None: 

1003 return 

1004 validator.ensure_type(v, path) 

1005 

1006 return AttributeTypeHandler( 

1007 validator.describe_type(), 

1008 _validator, 

1009 base_type=validator.base_type, 

1010 mapper=type_normalizer, 

1011 ) 

1012 

1013 if unmapped_type in BASIC_SIMPLE_TYPES: 

1014 type_name = BASIC_SIMPLE_TYPES[unmapped_type] 

1015 

1016 type_mapping = self._registered_types.get(provided_type) 

1017 if type_mapping is not None: 

1018 simple_type = f" ({type_name})" 

1019 type_name = type_mapping.target_type.__name__ 

1020 else: 

1021 simple_type = "" 

1022 

1023 def _validator(v: Any, path: AttributePath) -> None: 

1024 if not isinstance(v, unmapped_type): 

1025 _validation_type_error( 

1026 path, f"The attribute must be a {type_name}{simple_type}" 

1027 ) 

1028 

1029 return AttributeTypeHandler( 

1030 type_name, 

1031 _validator, 

1032 base_type=unmapped_type, 

1033 mapper=type_normalizer, 

1034 ) 

1035 if t_unmapped_orig == list: 

1036 if not t_unmapped_args: 1036 ↛ 1037line 1036 didn't jump to line 1037 because the condition on line 1036 was never true

1037 raise ValueError( 

1038 f'The attribute "{attribute}" is List but does not have Generics (Must use List[X])' 

1039 ) 

1040 

1041 genetic_type = t_unmapped_args[0] 

1042 key_mapper = self._as_type_validator( 

1043 attribute, 

1044 genetic_type, 

1045 parsing_typed_dict_attribute, 

1046 ) 

1047 

1048 def _validator(v: Any, path: AttributePath) -> None: 

1049 if not isinstance(v, list): 1049 ↛ 1050line 1049 didn't jump to line 1050 because the condition on line 1049 was never true

1050 _validation_type_error(path, "The attribute must be a list") 

1051 for i, list_item in enumerate(v): 

1052 key_mapper.ensure_type(list_item, path[i]) 

1053 

1054 list_mapper = ( 

1055 map_each_element(key_mapper.mapper) 

1056 if key_mapper.mapper is not None 

1057 else None 

1058 ) 

1059 

1060 return AttributeTypeHandler( 

1061 f"List of {key_mapper.describe_type()}", 

1062 _validator, 

1063 base_type=list, 

1064 mapper=type_normalizer, 

1065 ).combine_mapper(list_mapper) 

1066 if is_typeddict(provided_type): 

1067 subparser = self.generate_parser(cast("Type[TD]", provided_type)) 

1068 return AttributeTypeHandler( 

1069 description=f"{provided_type.__name__} (Typed Mapping)", 

1070 ensure_type=lambda v, ap: None, 

1071 base_type=dict, 

1072 mapper=lambda v, ap, cv: subparser.parse_input( 

1073 v, ap, parser_context=cv 

1074 ), 

1075 ) 

1076 if t_unmapped_orig == dict: 

1077 if not t_unmapped_args or len(t_unmapped_args) != 2: 1077 ↛ 1078line 1077 didn't jump to line 1078 because the condition on line 1077 was never true

1078 raise ValueError( 

1079 f'The attribute "{attribute}" is Dict but does not have Generics (Must use Dict[str, Y])' 

1080 ) 

1081 if t_unmapped_args[0] != str: 1081 ↛ 1082line 1081 didn't jump to line 1082 because the condition on line 1081 was never true

1082 raise ValueError( 

1083 f'The attribute "{attribute}" is Dict and has a non-str type as key.' 

1084 " Currently, only `str` is supported (Dict[str, Y])" 

1085 ) 

1086 key_mapper = self._as_type_validator( 

1087 attribute, 

1088 t_unmapped_args[0], 

1089 parsing_typed_dict_attribute, 

1090 ) 

1091 value_mapper = self._as_type_validator( 

1092 attribute, 

1093 t_unmapped_args[1], 

1094 parsing_typed_dict_attribute, 

1095 ) 

1096 

1097 if key_mapper.base_type is None: 1097 ↛ 1098line 1097 didn't jump to line 1098 because the condition on line 1097 was never true

1098 raise ValueError( 

1099 f'The attribute "{attribute}" is Dict and the key did not have a trivial base type. Key types' 

1100 f" without trivial base types (such as `str`) are not supported at the moment." 

1101 ) 

1102 

1103 if value_mapper.mapper is not None: 1103 ↛ 1104line 1103 didn't jump to line 1104 because the condition on line 1103 was never true

1104 raise ValueError( 

1105 f'The attribute "{attribute}" is Dict and the value requires mapping.' 

1106 " Currently, this is not supported. Consider a simpler type (such as Dict[str, str] or Dict[str, Any])." 

1107 " Better typing may come later" 

1108 ) 

1109 

1110 def _validator(v: Any, path: AttributePath) -> None: 

1111 if not isinstance(v, dict): 1111 ↛ 1112line 1111 didn't jump to line 1112 because the condition on line 1111 was never true

1112 _validation_type_error(path, "The attribute must be a mapping") 

1113 key_name = "the first key in the mapping" 

1114 for i, (k, value) in enumerate(v.items()): 

1115 if not key_mapper.base_type_match(k): 1115 ↛ 1116line 1115 didn't jump to line 1116 because the condition on line 1115 was never true

1116 kp = path.copy_with_path_hint(key_name) 

1117 _validation_type_error( 

1118 kp, 

1119 f'The key number {i + 1} in attribute "{kp}" must be a {key_mapper.describe_type()}', 

1120 ) 

1121 key_name = f"the key after {k}" 

1122 value_mapper.ensure_type(value, path[k]) 

1123 

1124 return AttributeTypeHandler( 

1125 f"Mapping of {value_mapper.describe_type()}", 

1126 _validator, 

1127 base_type=dict, 

1128 mapper=type_normalizer, 

1129 ).combine_mapper(key_mapper.mapper) 

1130 if t_unmapped_orig in (Union, UnionType): 

1131 if _is_two_arg_x_list_x(t_provided_args): 

1132 # Force the order to be "X, List[X]" as it simplifies the code 

1133 x_list_x = ( 

1134 t_provided_args 

1135 if get_origin(t_provided_args[1]) == list 

1136 else (t_provided_args[1], t_provided_args[0]) 

1137 ) 

1138 

1139 # X, List[X] could match if X was List[Y]. However, our code below assumes 

1140 # that X is a non-list. The `_is_two_arg_x_list_x` returns False for this 

1141 # case to avoid this assert and fall into the "generic case". 

1142 assert get_origin(x_list_x[0]) != list 

1143 x_subtype_checker = self._as_type_validator( 

1144 attribute, 

1145 x_list_x[0], 

1146 parsing_typed_dict_attribute, 

1147 ) 

1148 list_x_subtype_checker = self._as_type_validator( 

1149 attribute, 

1150 x_list_x[1], 

1151 parsing_typed_dict_attribute, 

1152 ) 

1153 type_description = x_subtype_checker.describe_type() 

1154 type_description = f"{type_description} or a list of {type_description}" 

1155 

1156 def _validator(v: Any, path: AttributePath) -> None: 

1157 if isinstance(v, list): 

1158 list_x_subtype_checker.ensure_type(v, path) 

1159 else: 

1160 x_subtype_checker.ensure_type(v, path) 

1161 

1162 return AttributeTypeHandler( 

1163 type_description, 

1164 _validator, 

1165 mapper=type_normalizer, 

1166 ) 

1167 else: 

1168 subtype_checker = [ 

1169 self._as_type_validator(attribute, a, parsing_typed_dict_attribute) 

1170 for a in t_unmapped_args 

1171 ] 

1172 type_description = "one-of: " + ", ".join( 

1173 f"{sc.describe_type()}" for sc in subtype_checker 

1174 ) 

1175 mapper = subtype_checker[0].mapper 

1176 if any(mapper != sc.mapper for sc in subtype_checker): 1176 ↛ 1177line 1176 didn't jump to line 1177 because the condition on line 1176 was never true

1177 raise ValueError( 

1178 f'Cannot handle the union "{provided_type}" as the target types need different' 

1179 " type normalization/mapping logic. Unions are generally limited to Union[X, List[X]]" 

1180 " where X is a non-collection type." 

1181 ) 

1182 

1183 def _validator(v: Any, path: AttributePath) -> None: 

1184 partial_matches = [] 

1185 for sc in subtype_checker: 1185 ↛ 1193line 1185 didn't jump to line 1193 because the loop on line 1185 didn't complete

1186 try: 

1187 sc.ensure_type(v, path) 

1188 return 

1189 except ManifestParseException as e: 

1190 if sc.base_type_match(v): 1190 ↛ 1191line 1190 didn't jump to line 1191 because the condition on line 1190 was never true

1191 partial_matches.append((sc, e)) 

1192 

1193 if len(partial_matches) == 1: 

1194 raise partial_matches[0][1] 

1195 _validation_type_error( 

1196 path, f"Could not match against: {type_description}" 

1197 ) 

1198 

1199 return AttributeTypeHandler( 

1200 type_description, 

1201 _validator, 

1202 mapper=type_normalizer, 

1203 ) 

1204 if t_unmapped_orig == Literal: 

1205 # We want "x" for string values; repr provides 'x' 

1206 pretty = ", ".join( 

1207 f"`{v}`" if isinstance(v, str) else str(v) for v in t_unmapped_args 

1208 ) 

1209 

1210 def _validator(v: Any, path: AttributePath) -> None: 

1211 if v not in t_unmapped_args: 

1212 value_hint = "" 

1213 if isinstance(v, str): 1213 ↛ 1215line 1213 didn't jump to line 1215 because the condition on line 1213 was always true

1214 value_hint = f"({v}) " 

1215 _validation_type_error( 

1216 path, 

1217 f"Value {value_hint}must be one of the following literal values: {pretty}", 

1218 ) 

1219 

1220 return AttributeTypeHandler( 

1221 f"One of the following literal values: {pretty}", 

1222 _validator, 

1223 ) 

1224 

1225 if provided_type == Any: 1225 ↛ 1230line 1225 didn't jump to line 1230 because the condition on line 1225 was always true

1226 return AttributeTypeHandler( 

1227 "any (unvalidated)", 

1228 lambda *a: None, 

1229 ) 

1230 raise ValueError( 

1231 f'The attribute "{attribute}" had/contained a type {provided_type}, which is not supported' 

1232 ) 

1233 

1234 def _parse_types( 

1235 self, 

1236 spec: type[TypedDict], 

1237 allow_target_attribute_annotation: bool = False, 

1238 allow_source_attribute_annotations: bool = False, 

1239 forbid_optional: bool = True, 

1240 ) -> dict[str, AttributeDescription]: 

1241 annotations = get_type_hints(spec, include_extras=True) 

1242 return { 

1243 k: self._attribute_description( 

1244 k, 

1245 t, 

1246 k in spec.__required_keys__, 

1247 allow_target_attribute_annotation=allow_target_attribute_annotation, 

1248 allow_source_attribute_annotations=allow_source_attribute_annotations, 

1249 forbid_optional=forbid_optional, 

1250 ) 

1251 for k, t in annotations.items() 

1252 } 

1253 

1254 def _attribute_description( 

1255 self, 

1256 attribute: str, 

1257 orig_td: Any, 

1258 is_required: bool, 

1259 forbid_optional: bool = True, 

1260 allow_target_attribute_annotation: bool = False, 

1261 allow_source_attribute_annotations: bool = False, 

1262 ) -> AttributeDescription: 

1263 td, anno, is_optional = _parse_type( 

1264 attribute, orig_td, forbid_optional=forbid_optional 

1265 ) 

1266 type_validator = self._as_type_validator(attribute, td, True) 

1267 parsed_annotations = DetectedDebputyParseHint.parse_annotations( 

1268 anno, 

1269 f' Seen with attribute "{attribute}".', 

1270 attribute, 

1271 is_required, 

1272 allow_target_attribute_annotation=allow_target_attribute_annotation, 

1273 allow_source_attribute_annotations=allow_source_attribute_annotations, 

1274 ) 

1275 return AttributeDescription( 

1276 target_attribute=parsed_annotations.target_attribute, 

1277 attribute_type=td, 

1278 type_validator=type_validator, 

1279 annotations=anno, 

1280 is_optional=is_optional, 

1281 conflicting_attributes=parsed_annotations.conflict_with_source_attributes, 

1282 conditional_required=parsed_annotations.conditional_required, 

1283 source_attribute_name=assume_not_none( 

1284 parsed_annotations.source_manifest_attribute 

1285 ), 

1286 parse_hints=parsed_annotations, 

1287 ) 

1288 

1289 def _parse_alt_form( 

1290 self, 

1291 alt_form, 

1292 default_target_attribute: str | None, 

1293 ) -> AttributeDescription: 

1294 td, anno, is_optional = _parse_type( 

1295 "source_format alternative form", 

1296 alt_form, 

1297 forbid_optional=True, 

1298 parsing_typed_dict_attribute=False, 

1299 ) 

1300 type_validator = self._as_type_validator( 

1301 "source_format alternative form", 

1302 td, 

1303 True, 

1304 ) 

1305 parsed_annotations = DetectedDebputyParseHint.parse_annotations( 

1306 anno, 

1307 " The alternative for source_format.", 

1308 None, 

1309 False, 

1310 default_target_attribute=default_target_attribute, 

1311 allow_target_attribute_annotation=True, 

1312 allow_source_attribute_annotations=False, 

1313 ) 

1314 return AttributeDescription( 

1315 target_attribute=parsed_annotations.target_attribute, 

1316 attribute_type=td, 

1317 type_validator=type_validator, 

1318 annotations=anno, 

1319 is_optional=is_optional, 

1320 conflicting_attributes=parsed_annotations.conflict_with_source_attributes, 

1321 conditional_required=parsed_annotations.conditional_required, 

1322 source_attribute_name="Alt form of the source_format", 

1323 ) 

1324 

1325 def _union_narrowing( 

1326 self, 

1327 input_type: Any, 

1328 target_type: Any, 

1329 parsing_typed_dict_attribute: bool, 

1330 ) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any] | None: 

1331 _, input_orig, input_args = unpack_type( 

1332 input_type, parsing_typed_dict_attribute 

1333 ) 

1334 _, target_orig, target_args = unpack_type( 

1335 target_type, parsing_typed_dict_attribute 

1336 ) 

1337 

1338 if input_orig not in (Union, UnionType) or not input_args: 1338 ↛ 1339line 1338 didn't jump to line 1339 because the condition on line 1338 was never true

1339 raise ValueError("input_type must be a Union[...] with non-empty args") 

1340 

1341 # Currently, we only support Union[X, List[X]] -> List[Y] narrowing or Union[X, List[X]] -> Union[Y, Union[Y]] 

1342 # - Where X = Y or there is a simple standard transformation from X to Y. 

1343 

1344 if target_orig not in (Union, UnionType, list) or not target_args: 

1345 # Not supported 

1346 return None 

1347 

1348 if target_orig in (Union, UnionType) and set(input_args) == set(target_args): 1348 ↛ 1350line 1348 didn't jump to line 1350 because the condition on line 1348 was never true

1349 # Not needed (identity mapping) 

1350 return None 

1351 

1352 if target_orig == list and not any(get_origin(a) == list for a in input_args): 1352 ↛ 1354line 1352 didn't jump to line 1354 because the condition on line 1352 was never true

1353 # Not supported 

1354 return None 

1355 

1356 target_arg = target_args[0] 

1357 simplified_type = self._strip_mapped_types( 

1358 target_arg, parsing_typed_dict_attribute 

1359 ) 

1360 acceptable_types = { 

1361 target_arg, 

1362 list[target_arg], # type: ignore 

1363 List[target_arg], # type: ignore 

1364 simplified_type, 

1365 list[simplified_type], # type: ignore 

1366 List[simplified_type], # type: ignore 

1367 } 

1368 target_format = ( 

1369 target_arg, 

1370 list[target_arg], # type: ignore 

1371 List[target_arg], # type: ignore 

1372 ) 

1373 in_target_format = 0 

1374 in_simple_format = 0 

1375 for input_arg in input_args: 

1376 if input_arg not in acceptable_types: 1376 ↛ 1378line 1376 didn't jump to line 1378 because the condition on line 1376 was never true

1377 # Not supported 

1378 return None 

1379 if input_arg in target_format: 

1380 in_target_format += 1 

1381 else: 

1382 in_simple_format += 1 

1383 

1384 assert in_simple_format or in_target_format 

1385 

1386 if in_target_format and not in_simple_format: 

1387 # Union[X, List[X]] -> List[X] 

1388 return normalize_into_list 

1389 mapped = self._registered_types[target_arg] 

1390 if not in_target_format and in_simple_format: 1390 ↛ 1405line 1390 didn't jump to line 1405 because the condition on line 1390 was always true

1391 # Union[X, List[X]] -> List[Y] 

1392 

1393 def _mapper_x_list_y( 

1394 x: Any | list[Any], 

1395 ap: AttributePath, 

1396 pc: Optional["ParserContextData"], 

1397 ) -> list[Any]: 

1398 in_list_form: list[Any] = normalize_into_list(x, ap, pc) 

1399 

1400 return [mapped.mapper(x, ap, pc) for x in in_list_form] 

1401 

1402 return _mapper_x_list_y 

1403 

1404 # Union[Y, List[X]] -> List[Y] 

1405 if not isinstance(target_arg, type): 

1406 raise ValueError( 

1407 f"Cannot narrow {input_type} -> {target_type}: The automatic conversion does" 

1408 f" not support mixed types. Please use either {simplified_type} or {target_arg}" 

1409 f" in the source content (but both a mix of both)" 

1410 ) 

1411 

1412 def _mapper_mixed_list_y( 

1413 x: Any | list[Any], 

1414 ap: AttributePath, 

1415 pc: Optional["ParserContextData"], 

1416 ) -> list[Any]: 

1417 in_list_form: list[Any] = normalize_into_list(x, ap, pc) 

1418 

1419 return [ 

1420 x if isinstance(x, target_arg) else mapped.mapper(x, ap, pc) 

1421 for x in in_list_form 

1422 ] 

1423 

1424 return _mapper_mixed_list_y 

1425 

1426 def _type_normalize( 

1427 self, 

1428 attribute: str, 

1429 input_type: Any, 

1430 target_type: Any, 

1431 parsing_typed_dict_attribute: bool, 

1432 ) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any] | None: 

1433 if input_type == target_type: 

1434 return None 

1435 _, input_orig, input_args = unpack_type( 

1436 input_type, parsing_typed_dict_attribute 

1437 ) 

1438 _, target_orig, target_args = unpack_type( 

1439 target_type, 

1440 parsing_typed_dict_attribute, 

1441 ) 

1442 if input_orig in (Union, UnionType): 

1443 result = self._union_narrowing( 

1444 input_type, target_type, parsing_typed_dict_attribute 

1445 ) 

1446 if result: 

1447 return result 

1448 elif target_orig == list and target_args[0] == input_type: 

1449 return wrap_into_list 

1450 

1451 mapped = self._registered_types.get(target_type) 

1452 if mapped is not None and input_type == mapped.source_type: 

1453 # Source -> Target 

1454 return mapped.mapper 

1455 if target_orig == list and target_args: 1455 ↛ 1473line 1455 didn't jump to line 1473 because the condition on line 1455 was always true

1456 mapped = self._registered_types.get(target_args[0]) 

1457 if mapped is not None: 1457 ↛ 1473line 1457 didn't jump to line 1473 because the condition on line 1457 was always true

1458 # mypy is dense and forgot `mapped` cannot be optional in the comprehensions. 

1459 mapped_type: TypeMapping = mapped 

1460 if input_type == mapped.source_type: 1460 ↛ 1462line 1460 didn't jump to line 1462 because the condition on line 1460 was never true

1461 # Source -> List[Target] 

1462 return lambda x, ap, pc: [mapped_type.mapper(x, ap, pc)] 

1463 if ( 1463 ↛ 1473line 1463 didn't jump to line 1473 because the condition on line 1463 was always true

1464 input_orig == list 

1465 and input_args 

1466 and input_args[0] == mapped_type.source_type 

1467 ): 

1468 # List[Source] -> List[Target] 

1469 return lambda xs, ap, pc: [ 

1470 mapped_type.mapper(x, ap, pc) for x in xs 

1471 ] 

1472 

1473 raise ValueError( 

1474 f'Unsupported type normalization for "{attribute}": Cannot automatically map/narrow' 

1475 f" {input_type} to {target_type}" 

1476 ) 

1477 

1478 def _strip_mapped_types( 

1479 self, orig_td: Any, parsing_typed_dict_attribute: bool 

1480 ) -> Any: 

1481 m = self._registered_types.get(orig_td) 

1482 if m is not None: 

1483 return m.source_type 

1484 _, v, args = unpack_type(orig_td, parsing_typed_dict_attribute) 

1485 if v == list: 

1486 arg = args[0] 

1487 m = self._registered_types.get(arg) 

1488 if m: 

1489 return list[m.source_type] # type: ignore 

1490 if v in (Union, UnionType): 

1491 stripped_args = tuple( 

1492 self._strip_mapped_types(x, parsing_typed_dict_attribute) for x in args 

1493 ) 

1494 if stripped_args != args: 

1495 return Union[stripped_args] 

1496 return orig_td 

1497 

1498 

1499def _sort_key(attr: StandardParserAttributeDocumentation) -> Any: 

1500 key = next(iter(attr.attributes)) 

1501 return attr.sort_category, key 

1502 

1503 

1504def _apply_std_docs( 

1505 std_doc_table: ( 

1506 Mapping[type[Any], Sequence[StandardParserAttributeDocumentation]] | None 

1507 ), 

1508 source_format_typed_dict: type[Any], 

1509 attribute_docs: Sequence[ParserAttributeDocumentation] | None, 

1510) -> Sequence[ParserAttributeDocumentation] | None: 

1511 if std_doc_table is None or not std_doc_table: 1511 ↛ 1514line 1511 didn't jump to line 1514 because the condition on line 1511 was always true

1512 return attribute_docs 

1513 

1514 has_docs_for = set() 

1515 if attribute_docs: 

1516 for attribute_doc in attribute_docs: 

1517 has_docs_for.update(attribute_doc.attributes) 

1518 

1519 base_seen = set() 

1520 std_docs_used = [] 

1521 

1522 remaining_bases = set(getattr(source_format_typed_dict, "__orig_bases__", [])) 

1523 base_seen.update(remaining_bases) 

1524 while remaining_bases: 

1525 base = remaining_bases.pop() 

1526 new_bases_to_check = { 

1527 x for x in getattr(base, "__orig_bases__", []) if x not in base_seen 

1528 } 

1529 remaining_bases.update(new_bases_to_check) 

1530 base_seen.update(new_bases_to_check) 

1531 std_docs = std_doc_table.get(base) 

1532 if std_docs: 

1533 for std_doc in std_docs: 

1534 if any(a in has_docs_for for a in std_doc.attributes): 

1535 # If there is any overlap, do not add the docs 

1536 continue 

1537 has_docs_for.update(std_doc.attributes) 

1538 std_docs_used.append(std_doc) 

1539 

1540 if not std_docs_used: 

1541 return attribute_docs 

1542 docs = sorted(std_docs_used, key=_sort_key) 

1543 if attribute_docs: 

1544 # Plugin provided attributes first 

1545 c = list(attribute_docs) 

1546 c.extend(docs) 

1547 docs = c 

1548 return tuple(docs) 

1549 

1550 

1551def _verify_and_auto_correct_inline_reference_documentation( 

1552 parsed_content: type[TD], 

1553 source_typed_dict: type[Any], 

1554 source_content_attributes: Mapping[str, AttributeDescription], 

1555 inline_reference_documentation: ParserDocumentation | None, 

1556 has_alt_form: bool, 

1557 automatic_docs: ( 

1558 Mapping[type[Any], Sequence[StandardParserAttributeDocumentation]] | None 

1559 ) = None, 

1560) -> ParserDocumentation | None: 

1561 orig_attribute_docs = ( 

1562 inline_reference_documentation.attribute_doc 

1563 if inline_reference_documentation 

1564 else None 

1565 ) 

1566 attribute_docs = _apply_std_docs( 

1567 automatic_docs, 

1568 source_typed_dict, 

1569 orig_attribute_docs, 

1570 ) 

1571 if inline_reference_documentation is None and attribute_docs is None: 

1572 return None 

1573 changes = {} 

1574 if attribute_docs: 

1575 seen = set() 

1576 had_any_custom_docs = False 

1577 for attr_doc in attribute_docs: 

1578 if not isinstance(attr_doc, StandardParserAttributeDocumentation): 

1579 had_any_custom_docs = True 

1580 for attr_name in attr_doc.attributes: 

1581 attr = source_content_attributes.get(attr_name) 

1582 if attr is None: 1582 ↛ 1583line 1582 didn't jump to line 1583 because the condition on line 1582 was never true

1583 raise ValueError( 

1584 f"The inline_reference_documentation for the source format of {parsed_content.__qualname__}" 

1585 f' references an attribute "{attr_name}", which does not exist in the source format.' 

1586 ) 

1587 if attr_name in seen: 1587 ↛ 1588line 1587 didn't jump to line 1588 because the condition on line 1587 was never true

1588 raise ValueError( 

1589 f"The inline_reference_documentation for the source format of {parsed_content.__qualname__}" 

1590 f' has documentation for "{attr_name}" twice, which is not supported.' 

1591 f" Please document it at most once" 

1592 ) 

1593 seen.add(attr_name) 

1594 undocumented = source_content_attributes.keys() - seen 

1595 if undocumented: 1595 ↛ 1596line 1595 didn't jump to line 1596 because the condition on line 1595 was never true

1596 if had_any_custom_docs: 

1597 undocumented_attrs = ", ".join(undocumented) 

1598 raise ValueError( 

1599 f"The following attributes were not documented for the source format of" 

1600 f" {parsed_content.__qualname__}. If this is deliberate, then please" 

1601 ' declare each them as undocumented (via undocumented_attr("foo")):' 

1602 f" {undocumented_attrs}" 

1603 ) 

1604 combined_docs = list(attribute_docs) 

1605 combined_docs.extend(undocumented_attr(a) for a in sorted(undocumented)) 

1606 attribute_docs = combined_docs 

1607 

1608 if attribute_docs and orig_attribute_docs != attribute_docs: 1608 ↛ 1609line 1608 didn't jump to line 1609 because the condition on line 1608 was never true

1609 assert attribute_docs is not None 

1610 changes["attribute_doc"] = tuple(attribute_docs) 

1611 

1612 if ( 1612 ↛ 1617line 1612 didn't jump to line 1617 because the condition on line 1612 was never true

1613 inline_reference_documentation is not None 

1614 and inline_reference_documentation.alt_parser_description 

1615 and not has_alt_form 

1616 ): 

1617 raise ValueError( 

1618 "The inline_reference_documentation had documentation for an non-mapping format," 

1619 " but the source format does not have a non-mapping format." 

1620 ) 

1621 if changes: 1621 ↛ 1622line 1621 didn't jump to line 1622 because the condition on line 1621 was never true

1622 if inline_reference_documentation is None: 

1623 inline_reference_documentation = reference_documentation() 

1624 return inline_reference_documentation.replace(**changes) 

1625 return inline_reference_documentation 

1626 

1627 

1628def _check_conflicts( 

1629 input_content_attributes: dict[str, AttributeDescription], 

1630 required_attributes: frozenset[str], 

1631 all_attributes: frozenset[str], 

1632) -> None: 

1633 for attr_name, attr in input_content_attributes.items(): 

1634 if attr_name in required_attributes and attr.conflicting_attributes: 1634 ↛ 1635line 1634 didn't jump to line 1635 because the condition on line 1634 was never true

1635 c = ", ".join(repr(a) for a in attr.conflicting_attributes) 

1636 raise ValueError( 

1637 f'The attribute "{attr_name}" is required and conflicts with the attributes: {c}.' 

1638 " This makes it impossible to use these attributes. Either remove the attributes" 

1639 f' (along with the conflicts for them), adjust the conflicts or make "{attr_name}"' 

1640 " optional (NotRequired)" 

1641 ) 

1642 else: 

1643 required_conflicts = attr.conflicting_attributes & required_attributes 

1644 if required_conflicts: 1644 ↛ 1645line 1644 didn't jump to line 1645 because the condition on line 1644 was never true

1645 c = ", ".join(repr(a) for a in required_conflicts) 

1646 raise ValueError( 

1647 f'The attribute "{attr_name}" conflicts with the following *required* attributes: {c}.' 

1648 f' This makes it impossible to use the "{attr_name}" attribute. Either remove it,' 

1649 f" adjust the conflicts or make the listed attributes optional (NotRequired)" 

1650 ) 

1651 unknown_attributes = attr.conflicting_attributes - all_attributes 

1652 if unknown_attributes: 1652 ↛ 1653line 1652 didn't jump to line 1653 because the condition on line 1652 was never true

1653 c = ", ".join(repr(a) for a in unknown_attributes) 

1654 raise ValueError( 

1655 f'The attribute "{attr_name}" declares a conflict with the following unknown attributes: {c}.' 

1656 f" None of these attributes were declared in the input." 

1657 ) 

1658 

1659 

1660def _check_attributes( 

1661 content: type[TypedDict], 

1662 input_content: type[TypedDict], 

1663 input_content_attributes: dict[str, AttributeDescription], 

1664 sources: Mapping[str, Collection[str]], 

1665) -> None: 

1666 target_required_keys = content.__required_keys__ 

1667 input_required_keys = input_content.__required_keys__ 

1668 all_input_keys = input_required_keys | input_content.__optional_keys__ 

1669 

1670 for input_name in all_input_keys: 

1671 attr = input_content_attributes[input_name] 

1672 target_name = attr.target_attribute 

1673 source_names = sources[target_name] 

1674 input_is_required = input_name in input_required_keys 

1675 target_is_required = target_name in target_required_keys 

1676 

1677 assert source_names 

1678 

1679 if input_is_required and len(source_names) > 1: 1679 ↛ 1680line 1679 didn't jump to line 1680 because the condition on line 1679 was never true

1680 raise ValueError( 

1681 f'The source attribute "{input_name}" is required, but it maps to "{target_name}",' 

1682 f' which has multiple sources "{source_names}". If "{input_name}" should be required,' 

1683 f' then there is no need for additional sources for "{target_name}". Alternatively,' 

1684 f' "{input_name}" might be missing a NotRequired type' 

1685 f' (example: "{input_name}: NotRequired[<OriginalTypeHere>]")' 

1686 ) 

1687 if not input_is_required and target_is_required and len(source_names) == 1: 1687 ↛ 1688line 1687 didn't jump to line 1688 because the condition on line 1687 was never true

1688 raise ValueError( 

1689 f'The source attribute "{input_name}" is not marked as required and maps to' 

1690 f' "{target_name}", which is marked as required. As there are no other attributes' 

1691 f' mapping to "{target_name}", then "{input_name}" must be required as well' 

1692 f' ("{input_name}: Required[<Type>]"). Alternatively, "{target_name}" should be optional' 

1693 f' ("{target_name}: NotRequired[<Type>]") or an "MappingHint.aliasOf" might be missing.' 

1694 ) 

1695 

1696 

1697def _validation_type_error(path: AttributePath, message: str) -> None: 

1698 raise ManifestParseException( 

1699 f'The attribute "{path.path}" did not have a valid structure/type: {message}' 

1700 ) 

1701 

1702 

1703def _is_two_arg_x_list_x(t_args: tuple[Any, ...]) -> bool: 

1704 if len(t_args) != 2: 

1705 return False 

1706 lhs, rhs = t_args 

1707 if get_origin(lhs) == list: 

1708 if get_origin(rhs) == list: 1708 ↛ 1711line 1708 didn't jump to line 1711 because the condition on line 1708 was never true

1709 # It could still match X, List[X] - but we do not allow this case for now as the caller 

1710 # does not support it. 

1711 return False 

1712 l_args = get_args(lhs) 

1713 return bool(l_args and l_args[0] == rhs) 

1714 if get_origin(rhs) == list: 

1715 r_args = get_args(rhs) 

1716 return bool(r_args and r_args[0] == lhs) 

1717 return False 

1718 

1719 

1720def _extract_typed_dict( 

1721 base_type, 

1722 default_target_attribute: str | None, 

1723) -> tuple[type[TypedDict] | None, Any]: 

1724 if is_typeddict(base_type): 

1725 return base_type, None 

1726 _, origin, args = unpack_type(base_type, False) 

1727 if origin != Union: 

1728 if isinstance(base_type, type) and issubclass(base_type, (dict, Mapping)): 1728 ↛ 1729line 1728 didn't jump to line 1729 because the condition on line 1728 was never true

1729 raise ValueError( 

1730 "The source_format cannot be nor contain a (non-TypedDict) dict" 

1731 ) 

1732 return None, base_type 

1733 typed_dicts = [x for x in args if is_typeddict(x)] 

1734 if len(typed_dicts) > 1: 1734 ↛ 1735line 1734 didn't jump to line 1735 because the condition on line 1734 was never true

1735 raise ValueError( 

1736 "When source_format is a Union, it must contain at most one TypedDict" 

1737 ) 

1738 typed_dict = typed_dicts[0] if typed_dicts else None 

1739 

1740 if any(x is None or x is _NONE_TYPE for x in args): 1740 ↛ 1741line 1740 didn't jump to line 1741 because the condition on line 1740 was never true

1741 raise ValueError( 

1742 "The source_format cannot be nor contain Optional[X] or Union[X, None]" 

1743 ) 

1744 

1745 if any( 1745 ↛ 1750line 1745 didn't jump to line 1750 because the condition on line 1745 was never true

1746 isinstance(x, type) and issubclass(x, (dict, Mapping)) 

1747 for x in args 

1748 if x is not typed_dict 

1749 ): 

1750 raise ValueError( 

1751 "The source_format cannot be nor contain a (non-TypedDict) dict" 

1752 ) 

1753 remaining = [x for x in args if x is not typed_dict] 

1754 has_target_attribute = False 

1755 anno = None 

1756 if len(remaining) == 1: 1756 ↛ 1757line 1756 didn't jump to line 1757 because the condition on line 1756 was never true

1757 base_type, anno, _ = _parse_type( 

1758 "source_format alternative form", 

1759 remaining[0], 

1760 forbid_optional=True, 

1761 parsing_typed_dict_attribute=False, 

1762 ) 

1763 has_target_attribute = bool(anno) and any( 

1764 isinstance(x, TargetAttribute) for x in anno 

1765 ) 

1766 target_type = base_type 

1767 else: 

1768 target_type = Union[tuple(remaining)] 

1769 

1770 if default_target_attribute is None and not has_target_attribute: 1770 ↛ 1771line 1770 didn't jump to line 1771 because the condition on line 1770 was never true

1771 raise ValueError( 

1772 'The alternative format must be Union[TypedDict,Annotated[X, DebputyParseHint.target_attribute("...")]]' 

1773 " OR the parsed_content format must have exactly one attribute that is required." 

1774 ) 

1775 if anno: 1775 ↛ 1776line 1775 didn't jump to line 1776 because the condition on line 1775 was never true

1776 final_anno = [target_type] 

1777 final_anno.extend(anno) 

1778 return typed_dict, Annotated[tuple(final_anno)] 

1779 return typed_dict, target_type 

1780 

1781 

1782def _dispatch_parse_generator( 

1783 dispatch_type: type[DebputyDispatchableType], 

1784) -> Callable[[Any, AttributePath, Optional["ParserContextData"]], Any]: 

1785 def _dispatch_parse( 

1786 value: Any, 

1787 attribute_path: AttributePath, 

1788 parser_context: Optional["ParserContextData"], 

1789 ): 

1790 assert parser_context is not None 

1791 dispatching_parser = parser_context.dispatch_parser_table_for(dispatch_type) 

1792 return dispatching_parser.parse_input( 

1793 value, attribute_path, parser_context=parser_context 

1794 ) 

1795 

1796 return _dispatch_parse 

1797 

1798 

1799def _dispatch_parser( 

1800 dispatch_type: type[DebputyDispatchableType], 

1801) -> AttributeTypeHandler: 

1802 return AttributeTypeHandler( 

1803 dispatch_type.__name__, 

1804 lambda *a: None, 

1805 mapper=_dispatch_parse_generator(dispatch_type), 

1806 ) 

1807 

1808 

1809def _parse_type( 

1810 attribute: str, 

1811 orig_td: Any, 

1812 forbid_optional: bool = True, 

1813 parsing_typed_dict_attribute: bool = True, 

1814) -> tuple[Any, tuple[Any, ...], bool]: 

1815 td, v, args = unpack_type(orig_td, parsing_typed_dict_attribute) 

1816 md: tuple[Any, ...] = tuple() 

1817 optional = False 

1818 if v is not None: 

1819 if v == Annotated: 

1820 anno = get_args(td) 

1821 md = anno[1:] 

1822 td, v, args = unpack_type(anno[0], parsing_typed_dict_attribute) 

1823 

1824 if td is _NONE_TYPE: 1824 ↛ 1825line 1824 didn't jump to line 1825 because the condition on line 1824 was never true

1825 raise ValueError( 

1826 f'The attribute "{attribute}" resolved to type "None". "Nil" / "None" fields are not allowed in the' 

1827 " debputy manifest, so this attribute does not make sense in its current form." 

1828 ) 

1829 if forbid_optional and v == Union and any(a is _NONE_TYPE for a in args): 1829 ↛ 1830line 1829 didn't jump to line 1830 because the condition on line 1829 was never true

1830 raise ValueError( 

1831 f'Detected use of Optional in "{attribute}", which is not allowed here.' 

1832 " Please use NotRequired for optional fields" 

1833 ) 

1834 

1835 return td, md, optional 

1836 

1837 

1838def _normalize_attribute_name(attribute: str) -> str: 

1839 if attribute.endswith("_"): 

1840 attribute = attribute[:-1] 

1841 return attribute.replace("_", "-") 

1842 

1843 

1844@dataclasses.dataclass 

1845class DetectedDebputyParseHint: 

1846 target_attribute: str 

1847 source_manifest_attribute: str | None 

1848 conflict_with_source_attributes: frozenset[str] 

1849 conditional_required: ConditionalRequired | None 

1850 applicable_as_path_hint: bool 

1851 

1852 @classmethod 

1853 def parse_annotations( 

1854 cls, 

1855 anno: tuple[Any, ...], 

1856 error_context: str, 

1857 default_attribute_name: str | None, 

1858 is_required: bool, 

1859 default_target_attribute: str | None = None, 

1860 allow_target_attribute_annotation: bool = False, 

1861 allow_source_attribute_annotations: bool = False, 

1862 ) -> "DetectedDebputyParseHint": 

1863 target_attr_anno = find_annotation(anno, TargetAttribute) 

1864 if target_attr_anno: 

1865 if not allow_target_attribute_annotation: 1865 ↛ 1866line 1865 didn't jump to line 1866 because the condition on line 1865 was never true

1866 raise ValueError( 

1867 f"The DebputyParseHint.target_attribute annotation is not allowed in this context.{error_context}" 

1868 ) 

1869 target_attribute = target_attr_anno.attribute 

1870 elif default_target_attribute is not None: 

1871 target_attribute = default_target_attribute 

1872 elif default_attribute_name is not None: 1872 ↛ 1875line 1872 didn't jump to line 1875 because the condition on line 1872 was always true

1873 target_attribute = default_attribute_name 

1874 else: 

1875 if default_attribute_name is None: 

1876 raise ValueError( 

1877 "allow_target_attribute_annotation must be True OR " 

1878 "default_attribute_name/default_target_attribute must be not None" 

1879 ) 

1880 raise ValueError( 

1881 f"Missing DebputyParseHint.target_attribute annotation.{error_context}" 

1882 ) 

1883 source_attribute_anno = find_annotation(anno, ManifestAttribute) 

1884 _source_attribute_allowed( 

1885 allow_source_attribute_annotations, error_context, source_attribute_anno 

1886 ) 

1887 if source_attribute_anno: 

1888 source_attribute_name = source_attribute_anno.attribute 

1889 elif default_attribute_name is not None: 

1890 source_attribute_name = _normalize_attribute_name(default_attribute_name) 

1891 else: 

1892 source_attribute_name = None 

1893 mutual_exclusive_with_anno = find_annotation(anno, ConflictWithSourceAttribute) 

1894 if mutual_exclusive_with_anno: 

1895 _source_attribute_allowed( 

1896 allow_source_attribute_annotations, 

1897 error_context, 

1898 mutual_exclusive_with_anno, 

1899 ) 

1900 conflicting_attributes = mutual_exclusive_with_anno.conflicting_attributes 

1901 else: 

1902 conflicting_attributes = frozenset() 

1903 conditional_required = find_annotation(anno, ConditionalRequired) 

1904 

1905 if conditional_required and is_required: 1905 ↛ 1906line 1905 didn't jump to line 1906 because the condition on line 1905 was never true

1906 if default_attribute_name is None: 

1907 raise ValueError( 

1908 "is_required cannot be True without default_attribute_name being not None" 

1909 ) 

1910 raise ValueError( 

1911 f'The attribute "{default_attribute_name}" is Required while also being conditionally required.' 

1912 ' Please make the attribute "NotRequired" or remove the conditional requirement.' 

1913 ) 

1914 

1915 not_path_hint_anno = find_annotation(anno, NotPathHint) 

1916 applicable_as_path_hint = not_path_hint_anno is None 

1917 

1918 return DetectedDebputyParseHint( 

1919 target_attribute=target_attribute, 

1920 source_manifest_attribute=source_attribute_name, 

1921 conflict_with_source_attributes=conflicting_attributes, 

1922 conditional_required=conditional_required, 

1923 applicable_as_path_hint=applicable_as_path_hint, 

1924 ) 

1925 

1926 

1927def _source_attribute_allowed( 

1928 source_attribute_allowed: bool, 

1929 error_context: str, 

1930 annotation: DebputyParseHint | None, 

1931) -> None: 

1932 if source_attribute_allowed or annotation is None: 1932 ↛ 1934line 1932 didn't jump to line 1934 because the condition on line 1932 was always true

1933 return 

1934 raise ValueError( 

1935 f'The annotation "{annotation}" cannot be used here. {error_context}' 

1936 )