Coverage for src/debputy/manifest_parser/util.py: 89%

234 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2026-02-14 10:41 +0000

1import dataclasses 

2import typing 

3from typing import ( 

4 Optional, 

5 get_origin, 

6 get_args, 

7 Any, 

8 TypeVar, 

9 TYPE_CHECKING, 

10 Literal, 

11) 

12from collections.abc import Iterator, Mapping, Iterable, Container 

13 

14from debputy.yaml.compat import CommentedBase 

15 

16from debputy.manifest_parser.exceptions import ManifestParseException 

17 

18if TYPE_CHECKING: 

19 from debputy.manifest_parser.parser_data import ParserContextData 

20 from debputy.manifest_parser.parse_hints import DebputyParseHint 

21 from debputy.plugin.api.spec import DebputyIntegrationMode 

22 

23 

# Type variable used for annotation lookups; bound by forward reference to
# DebputyParseHint, which is only imported under TYPE_CHECKING.
MP = TypeVar("MP", bound="DebputyParseHint")
# A single path segment: either a string key or an integer index.
StrOrInt = str | int
# Recursive alias table: maps a segment to a pair of (replacement segment,
# alias table to use for the next level, or None when there is none).
AttributePathAliasMapping = Mapping[
    StrOrInt, tuple[StrOrInt, Optional["AttributePathAliasMapping"]]
]
# Selects which source position is looked up when rendering a path: the
# position of the key, of the value, or of the enclosing container.
LineReportKind = Literal["key", "value", "container"]

30 

31 

32class AttributePath: 

33 __slots__ = ("parent", "container", "name", "alias_mapping", "path_hint") 

34 

35 # Summary: if parent is defined, name/key is too. 

36 

37 @typing.overload 

38 def __init__( 38 ↛ exitline 38 didn't return from function '__init__' because

39 self, 

40 parent: Optional["AttributePath"], 

41 key: str | int, 

42 *, 

43 container: Any | None = None, 

44 alias_mapping: AttributePathAliasMapping | None = None, 

45 ) -> None: ... 

46 

47 @typing.overload 

48 def __init__( 48 ↛ exitline 48 didn't return from function '__init__' because

49 self, 

50 parent: None, 

51 key: None, 

52 *, 

53 container: Any | None = None, 

54 alias_mapping: AttributePathAliasMapping | None = None, 

55 ) -> None: ... 

56 

57 def __init__( 

58 self, 

59 parent, 

60 key, 

61 *, 

62 container=None, 

63 alias_mapping=None, 

64 ) -> None: 

65 self.parent = parent 

66 self.container = container 

67 self.name = key 

68 self.path_hint: str | None = None 

69 self.alias_mapping = alias_mapping 

70 

71 @classmethod 

72 def root_path(cls, container: Any | None) -> "AttributePath": 

73 return AttributePath(None, None, container=container) 

74 

75 @classmethod 

76 def builtin_path(cls) -> "AttributePath": 

77 return AttributePath(None, "$builtin$") 

78 

79 @classmethod 

80 def test_path(cls) -> "AttributePath": 

81 return AttributePath(None, "$test$") 

82 

83 def copy_with_path_hint(self, path_hint: str) -> "AttributePath": 

84 p = self.__class__(self.parent, self.name, alias_mapping=self.alias_mapping) 

85 p.path_hint = path_hint 

86 return p 

87 

88 def path_segments(self) -> Iterable[str | int]: 

89 for name, _path_hint in self._iter_path(): 

90 yield name 

91 

92 def _resolve_path(self, report_kind: LineReportKind) -> str: 

93 parent = self.parent 

94 key = self.name 

95 if report_kind == "container": 

96 named_parent = parent is not None and parent.parent is not None 

97 key = parent.name if named_parent else None 

98 parent = parent.parent if named_parent else None 

99 container = parent.container if parent is not None else None 

100 

101 if isinstance(container, CommentedBase): 

102 lc = container.lc 

103 try: 

104 if isinstance(key, str): 

105 if report_kind == "key": 

106 lc_data = lc.key(key) 

107 else: 

108 lc_data = lc.value(key) 

109 else: 

110 lc_data = lc.item(key) 

111 except (AttributeError, RuntimeError, LookupError, TypeError): 

112 lc_data = None 

113 else: 

114 lc_data = None 

115 

116 parts: list[str] = [] 

117 path_hint: str | None = None 

118 

119 for k, s_path_hint in self._iter_path(): 

120 if s_path_hint is not None: 

121 path_hint = s_path_hint 

122 if isinstance(k, int): 

123 parts.append(f"[{k}]") 

124 else: 

125 if parts: 

126 parts.append(".") 

127 parts.append(k) 

128 

129 if lc_data is not None: 

130 line_pos, col = lc_data 

131 # Translate 0-based (index) to 1-based (line number) 

132 line_pos += 1 

133 parts.append(f" [Line {line_pos} column {col}]") 

134 

135 elif path_hint: 135 ↛ 136line 135 didn't jump to line 136 because the condition on line 135 was never true

136 parts.append(f" <Search for: {path_hint}>") 

137 if not parts: 137 ↛ 138line 137 didn't jump to line 138 because the condition on line 137 was never true

138 return "document root" 

139 return "".join(parts) 

140 

141 @property 

142 def path_container_lc(self) -> str: 

143 return self._resolve_path("container") 

144 

145 @property 

146 def path_key_lc(self) -> str: 

147 return self._resolve_path("key") 

148 

149 @property 

150 def path(self) -> str: 

151 return self._resolve_path("value") 

152 

153 def __str__(self) -> str: 

154 return self.path 

155 

156 def __getitem__(self, item: str | int) -> "AttributePath": 

157 alias_mapping = None 

158 if self.alias_mapping: 

159 match = self.alias_mapping.get(item) 

160 if match: 

161 item, alias_mapping = match 

162 if item == "": 

163 # Support `sources[0]` mapping to `source` by `sources -> source` and `0 -> ""`. 

164 return AttributePath( 

165 self.parent, 

166 self.name, 

167 alias_mapping=alias_mapping, 

168 container=self.container, 

169 ) 

170 container = self.container 

171 if container is not None: 

172 try: 

173 child_container = self.container[item] 

174 except (AttributeError, RuntimeError, LookupError, TypeError): 

175 child_container = None 

176 else: 

177 child_container = None 

178 return AttributePath( 

179 self, 

180 item, 

181 alias_mapping=alias_mapping, 

182 container=child_container, 

183 ) 

184 

185 def _iter_path(self) -> Iterator[tuple[str | int, str | None]]: 

186 "Parents, from the (excluded) root to (included) self." 

187 if self.name is not None: 

188 parent = self.parent 

189 if parent is not None: 

190 yield from parent._iter_path() 

191 yield self.name, self.path_hint 

192 

193 

def check_integration_mode(
    path: AttributePath,
    parser_context: Optional["ParserContextData"] = None,
    expected_debputy_integration_mode: (
        Container["DebputyIntegrationMode"] | None
    ) = None,
) -> None:
    """Reject an attribute that is not allowed in the active integration mode.

    :param path: Path of the attribute being parsed; used in error messages.
    :param parser_context: Context providing ``debputy_integration_mode``.
    :param expected_debputy_integration_mode: The permitted modes, or ``None``
      to skip the check entirely.
    :raises AssertionError: If a restriction is given without a parser context.
    :raises ManifestParseException: If the active mode is not permitted.
    """
    allowed_modes = expected_debputy_integration_mode
    if allowed_modes is None:
        # No restriction requested.
        return
    if parser_context is None:
        raise AssertionError(
            f"Cannot use integration mode restriction when parsing {path.path} since it is not parsed in the manifest context"
        )
    active_mode = parser_context.debputy_integration_mode
    if active_mode in allowed_modes:
        return
    raise ManifestParseException(
        f"The attribute {path.path} cannot be used as it is not allowed for"
        f" the current debputy integration mode ({active_mode})."
        f" Please remove the manifest definition or change the integration mode"
    )

213 

214 

215@dataclasses.dataclass(slots=True, frozen=True) 

216class _SymbolicModeSegment: 

217 base_mode: int 

218 base_mask: int 

219 cap_x_mode: int 

220 cap_x_mask: int 

221 

222 def apply(self, current_mode: int, is_dir: bool) -> int: 

223 if current_mode & 0o111 or is_dir: 

224 chosen_mode = self.cap_x_mode 

225 mode_mask = self.cap_x_mask 

226 else: 

227 chosen_mode = self.base_mode 

228 mode_mask = self.base_mask 

229 # set ("="): mode mask clears relevant segment and current_mode are the desired bits 

230 # add ("+"): mode mask keeps everything and current_mode are the desired bits 

231 # remove ("-"): mode mask clears relevant bits and current_mode are 0 

232 return (current_mode & mode_mask) | chosen_mode 

233 

234 

def _symbolic_mode_bit_inverse(v: int) -> int:
    """Return the complement of ``v`` restricted to the 12 bits of ``0o7777``."""
    # Python integers have no fixed width, so a plain ``~v`` would yield a
    # negative number; flipping within an explicit 12-bit window avoids that.
    all_mode_bits = 0o7777
    return all_mode_bits ^ (v & all_mode_bits)

239 

240 

def parse_symbolic_mode(
    symbolic_mode: str,
    attribute_path: AttributePath | None,
) -> Iterator[_SymbolicModeSegment]:
    """Parse a chmod-style symbolic mode (e.g. ``u+rw,go=rX``) into segments.

    Each comma-separated clause has the form ``[ugoa]*[+-=][rwxXst]*`` and
    produces one segment. A clause without subjects applies to user, group
    and other.

    This is a generator: parsing (and therefore any error) happens lazily
    while the result is being iterated.

    :param symbolic_mode: The symbolic mode string to parse.
    :param attribute_path: Where the mode was defined; only used to make
      error messages more helpful. May be ``None``.
    :return: An iterator of segments, one per clause, in input order.
    :raises ValueError: If a clause lacks a ``+``/``-``/``=`` modifier or
      contains a character that is not a supported permission letter. This
      includes copying another subject's permissions (``a=u``), which is not
      supported.
    """
    sticky_bit = 0o01000
    setuid_bit = 0o04000
    setgid_bit = 0o02000
    mode_group_flag = 0o7
    subject_mask_and_shift = {
        "u": (mode_group_flag << 6, 6),
        "g": (mode_group_flag << 3, 3),
        "o": (mode_group_flag << 0, 0),
    }
    # letter -> (bits for the base variant, bits for the "cap X" variant)
    bits = {
        "r": (0o4, 0o4),
        "w": (0o2, 0o2),
        "x": (0o1, 0o1),
        "X": (0o0, 0o1),
        "s": (0o0, 0o0),  # Special-cased below (it depends on the subject)
        "t": (0o0, 0o0),  # Special-cased below
    }
    modifiers = {
        "+",
        "-",
        "=",
    }
    in_path = f" in {attribute_path.path}" if attribute_path is not None else ""
    for orig_part in symbolic_mode.split(","):
        base_mode = 0
        cap_x_mode = 0
        part = orig_part
        subjects = set()
        while part and part[0] in ("u", "g", "o", "a"):
            subject = part[0]
            if subject == "a":
                subjects = {"u", "g", "o"}
            else:
                subjects.add(subject)
            part = part[1:]
        if not subjects:
            subjects = {"u", "g", "o"}

        if part and part[0] in modifiers:
            modifier = part[0]
        elif not part:
            raise ValueError(
                f'Invalid symbolic mode{in_path}: expected [+-=] to be present (from "{orig_part}")'
            )
        else:
            raise ValueError(
                f'Invalid symbolic mode{in_path}: Expected "{part[0]}" to be one of [+-=]'
                f' (from "{orig_part}")'
            )
        part = part[1:]
        s_bit_seen = False
        t_bit_seen = False
        while part and part[0] in bits:
            # Inspect the current letter only; comparing the whole remainder
            # would miss "s"/"t" whenever they are followed by more letters
            # (e.g. "u+sx").
            bit_char = part[0]
            if bit_char == "s":
                s_bit_seen = True
            elif bit_char == "t":
                t_bit_seen = True
            else:
                base_mode_bits, cap_x_mode_bits = bits[bit_char]
                base_mode |= base_mode_bits
                cap_x_mode |= cap_x_mode_bits
            part = part[1:]

        if part:
            # Anything left over is not a permission letter. This also covers
            # references to another subject's permissions ("a=u").
            raise ValueError(
                f'Invalid symbolic mode{in_path}: Could not parse "{part[0]}" from "{orig_part}"'
            )

        final_base_mode = 0
        final_cap_x_mode = 0
        segment_mask = 0
        for subject in subjects:
            mask, shift = subject_mask_and_shift[subject]
            segment_mask |= mask
            final_base_mode |= base_mode << shift
            final_cap_x_mode |= cap_x_mode << shift
        if modifier == "=":
            # "=" also resets the special bit that belongs to each subject.
            segment_mask |= setuid_bit if "u" in subjects else 0
            segment_mask |= setgid_bit if "g" in subjects else 0
            segment_mask |= sticky_bit if "o" in subjects else 0
        if s_bit_seen:
            if "u" in subjects:
                final_base_mode |= setuid_bit
                final_cap_x_mode |= setuid_bit
            if "g" in subjects:
                final_base_mode |= setgid_bit
                final_cap_x_mode |= setgid_bit
        if t_bit_seen:
            final_base_mode |= sticky_bit
            final_cap_x_mode |= sticky_bit
        if modifier == "+":
            # Keep every existing bit and switch the requested ones on.
            final_base_mask = ~0
            final_cap_x_mask = ~0
        elif modifier == "-":
            # Clear the requested bits and switch nothing on.
            final_base_mask = _symbolic_mode_bit_inverse(final_base_mode)
            final_cap_x_mask = _symbolic_mode_bit_inverse(final_cap_x_mode)
            final_base_mode = 0
            final_cap_x_mode = 0
        elif modifier == "=":
            # FIXME: Handle "unmentioned directory's setgid/setuid bits"
            inverted_mask = _symbolic_mode_bit_inverse(segment_mask)
            final_base_mask = inverted_mask
            final_cap_x_mask = inverted_mask
        else:
            raise AssertionError(
                f"Unknown modifier in symbolic mode: {modifier} - should not have happened"
            )
        yield _SymbolicModeSegment(
            base_mode=final_base_mode,
            base_mask=final_base_mask,
            cap_x_mode=final_cap_x_mode,
            cap_x_mask=final_cap_x_mask,
        )

370 

371 

372def unpack_type( 

373 orig_type: Any, 

374 parsing_typed_dict_attribute: bool, 

375) -> tuple[Any, Any | None, tuple[Any, ...]]: 

376 raw_type = orig_type 

377 origin = get_origin(raw_type) 

378 args = get_args(raw_type) 

379 if not parsing_typed_dict_attribute and repr(origin) in ( 379 ↛ 383line 379 didn't jump to line 383 because the condition on line 379 was never true

380 "typing.NotRequired", 

381 "typing.Required", 

382 ): 

383 raise ValueError( 

384 f"The Required/NotRequired attributes cannot be used outside typed dicts," 

385 f" the type that triggered the error: {orig_type}" 

386 ) 

387 

388 while repr(origin) in ("typing.NotRequired", "typing.Required"): 

389 if len(args) != 1: 389 ↛ 390line 389 didn't jump to line 390 because the condition on line 389 was never true

390 raise ValueError( 

391 f"The type {raw_type} should have exactly one type parameter" 

392 ) 

393 raw_type = args[0] 

394 origin = get_origin(raw_type) 

395 args = get_args(raw_type) 

396 

397 assert not isinstance(raw_type, tuple) 

398 

399 return raw_type, origin, args 

400 

401 

def find_annotation(
    annotations: tuple[Any, ...],
    anno_class: type[MP],
) -> MP | None:
    """Return the single instance of ``anno_class`` among ``annotations``.

    :param annotations: The annotation values to search.
    :param anno_class: The class to look for (matched with ``isinstance``).
    :return: The matching annotation, or ``None`` when there is none.
    :raises ValueError: If more than one annotation matches.
    """
    found = None
    for candidate in annotations:
        if not isinstance(candidate, anno_class):
            continue
        if found is not None:
            raise ValueError(
                f"The annotation {anno_class.__name__} was used more than once"
            )
        found = candidate
    return found