Coverage for src/debputy/manifest_parser/util.py: 89%

247 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-10-12 15:06 +0000

1import dataclasses 

2from typing import ( 

3 Union, 

4 Optional, 

5 List, 

6 Tuple, 

7 get_origin, 

8 get_args, 

9 Any, 

10 Type, 

11 TypeVar, 

12 TYPE_CHECKING, 

13 Literal, 

14 FrozenSet, 

15 cast, 

16) 

17from collections.abc import Iterator, Mapping, Iterable, Container 

18 

19from debputy.yaml.compat import CommentedBase 

20 

21from debputy.manifest_parser.exceptions import ManifestParseException 

22 

23if TYPE_CHECKING: 

24 from debputy.manifest_parser.parser_data import ParserContextData 

25 from debputy.plugin.api.spec import DebputyIntegrationMode, PackageTypeSelector 

26 

27 

28MP = TypeVar("MP", bound="DebputyParseHint") 

29StrOrInt = Union[str, int] 

30AttributePathAliasMapping = Mapping[ 

31 StrOrInt, tuple[StrOrInt, Optional["AttributePathAliasMapping"]] 

32] 

33LineReportKind = Literal["key", "value", "container"] 

34 

35 

36_PACKAGE_TYPE_DEB_ONLY = frozenset(["deb"]) 

37_ALL_PACKAGE_TYPES = frozenset(["deb", "udeb"]) 

38 

39 

40def resolve_package_type_selectors( 

41 package_type: "PackageTypeSelector", 

42) -> frozenset[str]: 

43 if package_type is _ALL_PACKAGE_TYPES or package_type is _PACKAGE_TYPE_DEB_ONLY: 

44 return cast("FrozenSet[str]", package_type) 

45 if isinstance(package_type, str): 

46 return ( 

47 _PACKAGE_TYPE_DEB_ONLY 

48 if package_type == "deb" 

49 else frozenset([package_type]) 

50 ) 

51 else: 

52 return frozenset(package_type) 

53 

54 

55class AttributePath: 

56 __slots__ = ("parent", "container", "name", "alias_mapping", "path_hint") 

57 

58 def __init__( 

59 self, 

60 parent: Optional["AttributePath"], 

61 key: str | int | None, 

62 *, 

63 container: Any | None = None, 

64 alias_mapping: AttributePathAliasMapping | None = None, 

65 ) -> None: 

66 self.parent = parent 

67 self.container = container 

68 self.name = key 

69 self.path_hint: str | None = None 

70 self.alias_mapping = alias_mapping 

71 

72 @classmethod 

73 def root_path(cls, container: Any | None) -> "AttributePath": 

74 return AttributePath(None, None, container=container) 

75 

76 @classmethod 

77 def builtin_path(cls) -> "AttributePath": 

78 return AttributePath(None, "$builtin$") 

79 

80 @classmethod 

81 def test_path(cls) -> "AttributePath": 

82 return AttributePath(None, "$test$") 

83 

84 def __bool__(self) -> bool: 

85 return self.name is not None or self.parent is not None 

86 

87 def copy_with_path_hint(self, path_hint: str) -> "AttributePath": 

88 p = self.__class__(self.parent, self.name, alias_mapping=self.alias_mapping) 

89 p.path_hint = path_hint 

90 return p 

91 

92 def path_segments(self) -> Iterable[str | int]: 

93 segments = list(self._iter_path()) 

94 segments.reverse() 

95 yield from (s.name for s in segments) 

96 

97 def _resolve_path(self, report_kind: LineReportKind) -> str: 

98 parent = self.parent 

99 key = self.name 

100 if report_kind == "container": 

101 key = parent.name if parent else None 

102 parent = parent.parent if parent else None 

103 container = parent.container if parent is not None else None 

104 

105 if isinstance(container, CommentedBase): 

106 lc = container.lc 

107 try: 

108 if isinstance(key, str): 

109 if report_kind == "key": 

110 lc_data = lc.key(key) 

111 else: 

112 lc_data = lc.value(key) 

113 else: 

114 lc_data = lc.item(key) 

115 except (AttributeError, RuntimeError, LookupError, TypeError): 

116 lc_data = None 

117 else: 

118 lc_data = None 

119 

120 segments = list(self._iter_path()) 

121 segments.reverse() 

122 parts: list[str] = [] 

123 path_hint = None 

124 

125 for s in segments: 

126 k = s.name 

127 s_path_hint = s.path_hint 

128 if s_path_hint is not None: 

129 path_hint = s_path_hint 

130 if isinstance(k, int): 

131 parts.append(f"[{k}]") 

132 elif k is not None: 132 ↛ 125line 132 didn't jump to line 125 because the condition on line 132 was always true

133 if parts: 

134 parts.append(".") 

135 parts.append(k) 

136 

137 if lc_data is not None: 

138 line_pos, col = lc_data 

139 # Translate 0-based (index) to 1-based (line number) 

140 line_pos += 1 

141 parts.append(f" [Line {line_pos} column {col}]") 

142 

143 elif path_hint: 143 ↛ 144line 143 didn't jump to line 144 because the condition on line 143 was never true

144 parts.append(f" <Search for: {path_hint}>") 

145 if not parts: 145 ↛ 146line 145 didn't jump to line 146 because the condition on line 145 was never true

146 return "document root" 

147 return "".join(parts) 

148 

149 @property 

150 def path_container_lc(self) -> str: 

151 return self._resolve_path("container") 

152 

153 @property 

154 def path_key_lc(self) -> str: 

155 return self._resolve_path("key") 

156 

157 @property 

158 def path(self) -> str: 

159 return self._resolve_path("value") 

160 

161 def __str__(self) -> str: 

162 return self.path 

163 

164 def __getitem__(self, item: str | int) -> "AttributePath": 

165 alias_mapping = None 

166 if self.alias_mapping: 

167 match = self.alias_mapping.get(item) 

168 if match: 

169 item, alias_mapping = match 

170 if item == "": 

171 # Support `sources[0]` mapping to `source` by `sources -> source` and `0 -> ""`. 

172 return AttributePath( 

173 self.parent, 

174 self.name, 

175 alias_mapping=alias_mapping, 

176 container=self.container, 

177 ) 

178 container = self.container 

179 if container is not None: 

180 try: 

181 child_container = self.container[item] 

182 except (AttributeError, RuntimeError, LookupError, TypeError): 

183 child_container = None 

184 else: 

185 child_container = None 

186 return AttributePath( 

187 self, 

188 item, 

189 alias_mapping=alias_mapping, 

190 container=child_container, 

191 ) 

192 

193 def _iter_path(self) -> Iterator["AttributePath"]: 

194 current = self 

195 yield current 

196 while True: 

197 parent = current.parent 

198 if not parent: 

199 break 

200 current = parent 

201 yield current 

202 

203 

204def check_integration_mode( 

205 path: AttributePath, 

206 parser_context: Optional["ParserContextData"] = None, 

207 expected_debputy_integration_mode: ( 

208 Container["DebputyIntegrationMode"] | None 

209 ) = None, 

210) -> None: 

211 if expected_debputy_integration_mode is None: 

212 return 

213 if parser_context is None: 

214 raise AssertionError( 

215 f"Cannot use integration mode restriction when parsing {path.path} since it is not parsed in the manifest context" 

216 ) 

217 if parser_context.debputy_integration_mode not in expected_debputy_integration_mode: 217 ↛ 218line 217 didn't jump to line 218 because the condition on line 217 was never true

218 raise ManifestParseException( 

219 f"The attribute {path.path} cannot be used as it is not allowed for" 

220 f" the current debputy integration mode ({parser_context.debputy_integration_mode})." 

221 f" Please remove the manifest definition or change the integration mode" 

222 ) 

223 

224 

225@dataclasses.dataclass(slots=True, frozen=True) 

226class _SymbolicModeSegment: 

227 base_mode: int 

228 base_mask: int 

229 cap_x_mode: int 

230 cap_x_mask: int 

231 

232 def apply(self, current_mode: int, is_dir: bool) -> int: 

233 if current_mode & 0o111 or is_dir: 

234 chosen_mode = self.cap_x_mode 

235 mode_mask = self.cap_x_mask 

236 else: 

237 chosen_mode = self.base_mode 

238 mode_mask = self.base_mask 

239 # set ("="): mode mask clears relevant segment and current_mode are the desired bits 

240 # add ("+"): mode mask keeps everything and current_mode are the desired bits 

241 # remove ("-"): mode mask clears relevant bits and current_mode are 0 

242 return (current_mode & mode_mask) | chosen_mode 

243 

244 

245def _symbolic_mode_bit_inverse(v: int) -> int: 

246 # The & part is necessary because otherwise python narrows the inversion to the minimum number of bits 

247 # required, which is not what we want. 

248 return ~v & 0o7777 

249 

250 

251def parse_symbolic_mode( 

252 symbolic_mode: str, 

253 attribute_path: AttributePath | None, 

254) -> Iterator[_SymbolicModeSegment]: 

255 sticky_bit = 0o01000 

256 setuid_bit = 0o04000 

257 setgid_bit = 0o02000 

258 mode_group_flag = 0o7 

259 subject_mask_and_shift = { 

260 "u": (mode_group_flag << 6, 6), 

261 "g": (mode_group_flag << 3, 3), 

262 "o": (mode_group_flag << 0, 0), 

263 } 

264 bits = { 

265 "r": (0o4, 0o4), 

266 "w": (0o2, 0o2), 

267 "x": (0o1, 0o1), 

268 "X": (0o0, 0o1), 

269 "s": (0o0, 0o0), # Special-cased below (it depends on the subject) 

270 "t": (0o0, 0o0), # Special-cased below 

271 } 

272 modifiers = { 

273 "+", 

274 "-", 

275 "=", 

276 } 

277 in_path = f" in {attribute_path.path}" if attribute_path is not None else "" 

278 for orig_part in symbolic_mode.split(","): 

279 base_mode = 0 

280 cap_x_mode = 0 

281 part = orig_part 

282 subjects = set() 

283 while part and part[0] in ("u", "g", "o", "a"): 

284 subject = part[0] 

285 if subject == "a": 

286 subjects = {"u", "g", "o"} 

287 else: 

288 subjects.add(subject) 

289 part = part[1:] 

290 if not subjects: 

291 subjects = {"u", "g", "o"} 

292 

293 if part and part[0] in modifiers: 293 ↛ 295line 293 didn't jump to line 295 because the condition on line 293 was always true

294 modifier = part[0] 

295 elif not part: 

296 raise ValueError( 

297 f'Invalid symbolic mode{in_path}: expected [+-=] to be present (from "{orig_part}")' 

298 ) 

299 else: 

300 raise ValueError( 

301 f'Invalid symbolic mode{in_path}: Expected "{part[0]}" to be one of [+-=]' 

302 f' (from "{orig_part}")' 

303 ) 

304 part = part[1:] 

305 s_bit_seen = False 

306 t_bit_seen = False 

307 while part and part[0] in bits: 

308 if part == "s": 

309 s_bit_seen = True 

310 elif part == "t": 310 ↛ 311line 310 didn't jump to line 311 because the condition on line 310 was never true

311 t_bit_seen = True 

312 elif part in ("u", "g", "o"): 312 ↛ 313line 312 didn't jump to line 313 because the condition on line 312 was never true

313 raise NotImplementedError( 

314 f"Cannot parse symbolic mode{in_path}: Sorry, we do not support referencing an" 

315 " existing subject's permissions (a=u) in symbolic modes." 

316 ) 

317 else: 

318 matched_bits = bits.get(part[0]) 

319 if matched_bits is None: 319 ↛ 320line 319 didn't jump to line 320 because the condition on line 319 was never true

320 valid_bits = "".join(bits) 

321 raise ValueError( 

322 f'Invalid symbolic mode{in_path}: Expected "{part[0]}" to be one of the letters' 

323 f' in "{valid_bits}" (from "{orig_part}")' 

324 ) 

325 base_mode_bits, cap_x_mode_bits = bits[part[0]] 

326 base_mode |= base_mode_bits 

327 cap_x_mode |= cap_x_mode_bits 

328 part = part[1:] 

329 

330 if part: 330 ↛ 331line 330 didn't jump to line 331 because the condition on line 330 was never true

331 raise ValueError( 

332 f'Invalid symbolic mode{in_path}: Could not parse "{part[0]}" from "{orig_part}"' 

333 ) 

334 

335 final_base_mode = 0 

336 final_cap_x_mode = 0 

337 segment_mask = 0 

338 for subject in subjects: 

339 mask, shift = subject_mask_and_shift[subject] 

340 segment_mask |= mask 

341 final_base_mode |= base_mode << shift 

342 final_cap_x_mode |= cap_x_mode << shift 

343 if modifier == "=": 

344 segment_mask |= setuid_bit if "u" in subjects else 0 

345 segment_mask |= setgid_bit if "g" in subjects else 0 

346 segment_mask |= sticky_bit if "o" in subjects else 0 

347 if s_bit_seen: 

348 if "u" in subjects: 348 ↛ 351line 348 didn't jump to line 351 because the condition on line 348 was always true

349 final_base_mode |= setuid_bit 

350 final_cap_x_mode |= setuid_bit 

351 if "g" in subjects: 

352 final_base_mode |= setgid_bit 

353 final_cap_x_mode |= setgid_bit 

354 if t_bit_seen: 354 ↛ 355line 354 didn't jump to line 355 because the condition on line 354 was never true

355 final_base_mode |= sticky_bit 

356 final_cap_x_mode |= sticky_bit 

357 if modifier == "+": 

358 final_base_mask = ~0 

359 final_cap_x_mask = ~0 

360 elif modifier == "-": 

361 final_base_mask = _symbolic_mode_bit_inverse(final_base_mode) 

362 final_cap_x_mask = _symbolic_mode_bit_inverse(final_cap_x_mode) 

363 final_base_mode = 0 

364 final_cap_x_mode = 0 

365 elif modifier == "=": 

366 # FIXME: Handle "unmentioned directory's setgid/setuid bits" 

367 inverted_mask = _symbolic_mode_bit_inverse(segment_mask) 

368 final_base_mask = inverted_mask 

369 final_cap_x_mask = inverted_mask 

370 else: 

371 raise AssertionError( 

372 f"Unknown modifier in symbolic mode: {modifier} - should not have happened" 

373 ) 

374 yield _SymbolicModeSegment( 

375 base_mode=final_base_mode, 

376 base_mask=final_base_mask, 

377 cap_x_mode=final_cap_x_mode, 

378 cap_x_mask=final_cap_x_mask, 

379 ) 

380 

381 

382def unpack_type( 

383 orig_type: Any, 

384 parsing_typed_dict_attribute: bool, 

385) -> tuple[Any, Any | None, tuple[Any, ...]]: 

386 raw_type = orig_type 

387 origin = get_origin(raw_type) 

388 args = get_args(raw_type) 

389 if not parsing_typed_dict_attribute and repr(origin) in ( 389 ↛ 393line 389 didn't jump to line 393 because the condition on line 389 was never true

390 "typing.NotRequired", 

391 "typing.Required", 

392 ): 

393 raise ValueError( 

394 f"The Required/NotRequired attributes cannot be used outside typed dicts," 

395 f" the type that triggered the error: {orig_type}" 

396 ) 

397 

398 while repr(origin) in ("typing.NotRequired", "typing.Required"): 

399 if len(args) != 1: 399 ↛ 400line 399 didn't jump to line 400 because the condition on line 399 was never true

400 raise ValueError( 

401 f"The type {raw_type} should have exactly one type parameter" 

402 ) 

403 raw_type = args[0] 

404 origin = get_origin(raw_type) 

405 args = get_args(raw_type) 

406 

407 assert not isinstance(raw_type, tuple) 

408 

409 return raw_type, origin, args 

410 

411 

412def find_annotation( 

413 annotations: tuple[Any, ...], 

414 anno_class: type[MP], 

415) -> MP | None: 

416 m = None 

417 for anno in annotations: 

418 if isinstance(anno, anno_class): 

419 if m is not None: 419 ↛ 420line 419 didn't jump to line 420 because the condition on line 419 was never true

420 raise ValueError( 

421 f"The annotation {anno_class.__name__} was used more than once" 

422 ) 

423 m = anno 

424 return m