Coverage for src/debputy/manifest_parser/util.py: 89%

246 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-01-27 13:59 +0000

1import dataclasses 

2from typing import ( 

3 Iterator, 

4 Union, 

5 Optional, 

6 List, 

7 Tuple, 

8 Mapping, 

9 get_origin, 

10 get_args, 

11 Any, 

12 Type, 

13 TypeVar, 

14 TYPE_CHECKING, 

15 Iterable, 

16 Container, 

17 Literal, 

18 FrozenSet, 

19 cast, 

20) 

21 

22from debputy.yaml.compat import CommentedBase 

23 

24from debputy.manifest_parser.exceptions import ManifestParseException 

25 

26if TYPE_CHECKING: 

27 from debputy.manifest_parser.parser_data import ParserContextData 

28 from debputy.plugin.api.spec import DebputyIntegrationMode, PackageTypeSelector 

29 

30 

# Type variable for annotation classes; `find_annotation` takes a `Type[MP]`
# and returns an instance of that same class (or None).
MP = TypeVar("MP", bound="DebputyParseHint")
# One segment of an attribute path: a mapping key (str) or a sequence index (int).
StrOrInt = Union[str, int]
# Rewrite table consulted by `AttributePath.__getitem__`: maps a requested
# key/index to the key/index to use instead, plus the (optional) rewrite table
# that applies to the resulting child path.
AttributePathAliasMapping = Mapping[
    StrOrInt, Tuple[StrOrInt, Optional["AttributePathAliasMapping"]]
]
# Which position `AttributePath._resolve_path` reports line/column data for.
LineReportKind = Literal["key", "value", "container"]


# Shared frozenset instances. `resolve_package_type_selectors` recognises them
# by identity and hands them back without building a new set.
_PACKAGE_TYPE_DEB_ONLY = frozenset(["deb"])
_ALL_PACKAGE_TYPES = frozenset(["deb", "udeb"])

41 

42 

def resolve_package_type_selectors(
    package_type: "PackageTypeSelector",
) -> FrozenSet[str]:
    """Normalise a package type selector into a frozenset of package type names.

    :param package_type: Either a single package type name or an iterable of
      package type names.
    :return: The selected package types. The shared module-level frozensets
      are returned as-is when they are passed in, and the shared "deb only"
      set is reused for the plain string ``"deb"``.
    """
    # Fast path: the caller handed us one of the shared constants.
    for shared in (_ALL_PACKAGE_TYPES, _PACKAGE_TYPE_DEB_ONLY):
        if package_type is shared:
            return cast("FrozenSet[str]", package_type)

    if not isinstance(package_type, str):
        return frozenset(package_type)

    if package_type == "deb":
        return _PACKAGE_TYPE_DEB_ONLY
    return frozenset([package_type])

56 

57 

class AttributePath:
    """Location of a value inside a parsed document, kept as a parent-linked chain.

    Every instance represents one path segment (a mapping key or a sequence
    index) and points at the segment enclosing it. ``container`` is the object
    found *at* this path (when known); the container of the parent segment is
    what gets consulted for line/column information about this segment.
    """

    __slots__ = ("parent", "container", "name", "alias_mapping", "path_hint")

    def __init__(
        self,
        parent: Optional["AttributePath"],
        key: Optional[Union[str, int]],
        *,
        container: Optional[Any] = None,
        alias_mapping: Optional[AttributePathAliasMapping] = None,
    ) -> None:
        """
        :param parent: The enclosing path segment, or ``None`` when there is none.
        :param key: Mapping key or sequence index of this segment (``None`` for
          the document root).
        :param container: The object located at this path, if available.
        :param alias_mapping: Optional rewrite table consulted by ``__getitem__``.
        """
        self.parent = parent
        self.container = container
        self.name = key
        self.path_hint: Optional[str] = None
        self.alias_mapping = alias_mapping

    @classmethod
    def root_path(cls, container: Optional[Any]) -> "AttributePath":
        """Create the (nameless, parentless) root path wrapping ``container``."""
        return AttributePath(None, None, container=container)

    @classmethod
    def builtin_path(cls) -> "AttributePath":
        """Create a parentless path named ``$builtin$``."""
        return AttributePath(None, "$builtin$")

    @classmethod
    def test_path(cls) -> "AttributePath":
        """Create a parentless path named ``$test$``."""
        return AttributePath(None, "$test$")

    def __bool__(self) -> bool:
        # Only the nameless, parentless root is falsy. `_iter_path` relies on
        # this to stop before yielding the root segment.
        return self.name is not None or self.parent is not None

    def copy_with_path_hint(self, path_hint: str) -> "AttributePath":
        """Return a copy of this segment that carries ``path_hint``.

        The hint is rendered as ``<Search for: ...>`` by the path properties
        when no line/column information can be resolved.

        :param path_hint: The text to suggest searching for.
        :return: A new path with the same parent, name, container and alias
          mapping as this one.
        """
        # The container must be carried over: without it, children created from
        # the copy via `__getitem__` get no container and can no longer report
        # line/column positions.
        p = self.__class__(
            self.parent,
            self.name,
            container=self.container,
            alias_mapping=self.alias_mapping,
        )
        p.path_hint = path_hint
        return p

    def path_segments(self) -> Iterable[Union[str, int]]:
        """Yield the segment names from the outermost segment down to this one."""
        segments = list(self._iter_path())
        segments.reverse()
        yield from (s.name for s in segments)

    def _resolve_path(self, report_kind: LineReportKind) -> str:
        """Render the path as text, with a line/column suffix when available.

        :param report_kind: Which position to report: the key of this segment,
          its value, or (``"container"``) the value of the parent segment.
        :return: A string such as ``a.b[0] [Line 3 column 4]``, or
          ``document root`` when there is nothing to render.
        """
        parent = self.parent
        key = self.name
        if report_kind == "container":
            # Report on the enclosing entry instead: step one level up.
            key = parent.name if parent else None
            parent = parent.parent if parent else None
        # Identity check on purpose: the root path is falsy but its container
        # is exactly what is needed to position top-level keys.
        container = parent.container if parent is not None else None

        if isinstance(container, CommentedBase):
            # NOTE(review): `lc` looks like ruamel.yaml's line/column accessor
            # (key/value/item lookups) - confirm against debputy.yaml.compat.
            lc = container.lc
            try:
                if isinstance(key, str):
                    if report_kind == "key":
                        lc_data = lc.key(key)
                    else:
                        lc_data = lc.value(key)
                else:
                    lc_data = lc.item(key)
            except (AttributeError, RuntimeError, LookupError, TypeError):
                # Position data is best-effort; fall back to the plain path.
                lc_data = None
        else:
            lc_data = None

        segments = list(self._iter_path())
        segments.reverse()
        parts: List[str] = []
        path_hint = None

        for s in segments:
            k = s.name
            s_path_hint = s.path_hint
            if s_path_hint is not None:
                # The innermost hint wins.
                path_hint = s_path_hint
            if isinstance(k, int):
                parts.append(f"[{k}]")
            elif k is not None:
                if parts:
                    parts.append(".")
                parts.append(k)

        if lc_data is not None:
            line_pos, col = lc_data
            # Translate 0-based (index) to 1-based (line number)
            line_pos += 1
            parts.append(f" [Line {line_pos} column {col}]")

        elif path_hint:
            parts.append(f" <Search for: {path_hint}>")
        if not parts:
            return "document root"
        return "".join(parts)

    @property
    def path_container_lc(self) -> str:
        """The rendered path, positioned at the value of the parent segment."""
        return self._resolve_path("container")

    @property
    def path_key_lc(self) -> str:
        """The rendered path, positioned at the key of this segment."""
        return self._resolve_path("key")

    @property
    def path(self) -> str:
        """The rendered path, positioned at the value of this segment."""
        return self._resolve_path("value")

    def __str__(self) -> str:
        return self.path

    def __getitem__(self, item: Union[str, int]) -> "AttributePath":
        """Return the child path for ``item``, applying the alias mapping if any.

        :param item: Mapping key or sequence index to descend into.
        :return: The child path. When the alias mapping rewrites ``item`` to
          the empty string, a path at the *same* position as this one is
          returned instead (only its alias mapping differs).
        """
        alias_mapping = None
        if self.alias_mapping:
            match = self.alias_mapping.get(item)
            if match:
                item, alias_mapping = match
                if item == "":
                    # Support `sources[0]` mapping to `source` by `sources -> source` and `0 -> ""`.
                    return AttributePath(
                        self.parent,
                        self.name,
                        alias_mapping=alias_mapping,
                        container=self.container,
                    )
        container = self.container
        if container is not None:
            try:
                child_container = container[item]
            except (AttributeError, RuntimeError, LookupError, TypeError):
                # The container does not have (or cannot be indexed by) `item`;
                # the path is still valid, it just has no known container.
                child_container = None
        else:
            child_container = None
        return AttributePath(
            self,
            item,
            alias_mapping=alias_mapping,
            container=child_container,
        )

    def _iter_path(self) -> Iterator["AttributePath"]:
        """Yield this segment and then each ancestor, innermost first.

        Iteration stops at the first falsy parent, so the nameless root is
        never yielded as an ancestor.
        """
        current = self
        yield current
        while True:
            parent = current.parent
            if not parent:
                break
            current = parent
            yield current

205 

206 

def check_integration_mode(
    path: AttributePath,
    parser_context: Optional["ParserContextData"] = None,
    expected_debputy_integration_mode: Optional[
        Container["DebputyIntegrationMode"]
    ] = None,
) -> None:
    """Verify that an attribute may be used in the current integration mode.

    :param path: The attribute path being parsed (used in error messages).
    :param parser_context: The parser context providing the active
      integration mode. Required when a restriction is given.
    :param expected_debputy_integration_mode: The integration modes in which
      the attribute is allowed, or ``None`` for "no restriction".
    :raises AssertionError: If a restriction is given without a parser context.
    :raises ManifestParseException: If the active integration mode is not
      among the allowed ones.
    """
    allowed_modes = expected_debputy_integration_mode
    if allowed_modes is None:
        # Unrestricted attribute; nothing to verify.
        return
    if parser_context is None:
        raise AssertionError(
            f"Cannot use integration mode restriction when parsing {path.path} since it is not parsed in the manifest context"
        )

    active_mode = parser_context.debputy_integration_mode
    if active_mode in allowed_modes:
        return
    raise ManifestParseException(
        f"The attribute {path.path} cannot be used as it is not allowed for"
        f" the current debputy integration mode ({active_mode})."
        f" Please remove the manifest definition or change the integration mode"
    )

226 

227 

228@dataclasses.dataclass(slots=True, frozen=True) 

229class _SymbolicModeSegment: 

230 base_mode: int 

231 base_mask: int 

232 cap_x_mode: int 

233 cap_x_mask: int 

234 

235 def apply(self, current_mode: int, is_dir: bool) -> int: 

236 if current_mode & 0o111 or is_dir: 

237 chosen_mode = self.cap_x_mode 

238 mode_mask = self.cap_x_mask 

239 else: 

240 chosen_mode = self.base_mode 

241 mode_mask = self.base_mask 

242 # set ("="): mode mask clears relevant segment and current_mode are the desired bits 

243 # add ("+"): mode mask keeps everything and current_mode are the desired bits 

244 # remove ("-"): mode mask clears relevant bits and current_mode are 0 

245 return (current_mode & mode_mask) | chosen_mode 

246 

247 

def _symbolic_mode_bit_inverse(v: int) -> int:
    """Return the complement of ``v`` restricted to the 12 file mode bits."""
    # Python integers are unbounded, so a plain `~v` would be negative rather
    # than a 12-bit pattern. Flipping the low 12 bits of `v` within the 0o7777
    # window yields the same value as `~v & 0o7777`.
    all_mode_bits = 0o7777
    return all_mode_bits ^ (v & all_mode_bits)

252 

253 

def parse_symbolic_mode(
    symbolic_mode: str,
    attribute_path: Optional[AttributePath],
) -> Iterator[_SymbolicModeSegment]:
    """Parse a chmod-style symbolic mode such as ``u=rwX,go-w``.

    The mode is split on ``,`` and each clause must have the shape
    ``[ugoa]*`` + one of ``+``, ``-``, ``=`` + ``[rwxXst]*``. A clause without
    subjects applies to user, group and other.

    Copying another subject's permissions (e.g. ``a=u``) and multiple
    operators in one clause are not supported; both are rejected with a
    ``ValueError``.

    :param symbolic_mode: The symbolic mode to parse.
    :param attribute_path: Where the mode came from; only used to make error
      messages more helpful. May be ``None``.
    :return: An iterator with one segment per clause, in input order. Being a
      generator, parse errors surface while iterating.
    :raises ValueError: If a clause lacks an operator or contains characters
      that cannot be parsed.
    """
    sticky_bit = 0o01000
    setuid_bit = 0o04000
    setgid_bit = 0o02000
    mode_group_flag = 0o7
    subject_mask_and_shift = {
        "u": (mode_group_flag << 6, 6),
        "g": (mode_group_flag << 3, 3),
        "o": (mode_group_flag << 0, 0),
    }
    # letter -> (bits for the plain case, bits for the "directory or already
    # executable" case). Only "X" differs between the two.
    bits = {
        "r": (0o4, 0o4),
        "w": (0o2, 0o2),
        "x": (0o1, 0o1),
        "X": (0o0, 0o1),
        "s": (0o0, 0o0),  # Special-cased below (it depends on the subject)
        "t": (0o0, 0o0),  # Special-cased below
    }
    modifiers = {
        "+",
        "-",
        "=",
    }
    in_path = f" in {attribute_path.path}" if attribute_path is not None else ""
    for orig_part in symbolic_mode.split(","):
        base_mode = 0
        cap_x_mode = 0
        part = orig_part
        subjects = set()
        while part and part[0] in ("u", "g", "o", "a"):
            subject = part[0]
            if subject == "a":
                subjects = {"u", "g", "o"}
            else:
                subjects.add(subject)
            part = part[1:]
        if not subjects:
            subjects = {"u", "g", "o"}

        if not part:
            raise ValueError(
                f'Invalid symbolic mode{in_path}: expected [+-=] to be present (from "{orig_part}")'
            )
        if part[0] not in modifiers:
            raise ValueError(
                f'Invalid symbolic mode{in_path}: Expected "{part[0]}" to be one of [+-=]'
                f' (from "{orig_part}")'
            )
        modifier = part[0]
        part = part[1:]
        s_bit_seen = False
        t_bit_seen = False
        while part and part[0] in bits:
            # Inspect only the current letter. Comparing the whole remaining
            # text would miss "s"/"t" unless they were the final letter
            # (e.g. "u+sx" would silently lose the setuid bit).
            letter = part[0]
            if letter == "s":
                s_bit_seen = True
            elif letter == "t":
                t_bit_seen = True
            else:
                base_mode_bits, cap_x_mode_bits = bits[letter]
                base_mode |= base_mode_bits
                cap_x_mode |= cap_x_mode_bits
            part = part[1:]

        if part:
            # Anything left is not a permission letter. This includes "u", "g"
            # and "o" on the right-hand side (as in "a=u"), which we do not
            # support.
            raise ValueError(
                f'Invalid symbolic mode{in_path}: Could not parse "{part[0]}" from "{orig_part}"'
            )

        final_base_mode = 0
        final_cap_x_mode = 0
        segment_mask = 0
        for subject in subjects:
            mask, shift = subject_mask_and_shift[subject]
            segment_mask |= mask
            final_base_mode |= base_mode << shift
            final_cap_x_mode |= cap_x_mode << shift
        if modifier == "=":
            # "=" also resets the special bit that belongs to each subject.
            segment_mask |= setuid_bit if "u" in subjects else 0
            segment_mask |= setgid_bit if "g" in subjects else 0
            segment_mask |= sticky_bit if "o" in subjects else 0
        if s_bit_seen:
            if "u" in subjects:
                final_base_mode |= setuid_bit
                final_cap_x_mode |= setuid_bit
            if "g" in subjects:
                final_base_mode |= setgid_bit
                final_cap_x_mode |= setgid_bit
        if t_bit_seen:
            final_base_mode |= sticky_bit
            final_cap_x_mode |= sticky_bit
        if modifier == "+":
            # Keep every existing bit and OR the new ones in.
            final_base_mask = ~0
            final_cap_x_mask = ~0
        elif modifier == "-":
            # Clear exactly the listed bits and add nothing.
            final_base_mask = _symbolic_mode_bit_inverse(final_base_mode)
            final_cap_x_mask = _symbolic_mode_bit_inverse(final_cap_x_mode)
            final_base_mode = 0
            final_cap_x_mode = 0
        elif modifier == "=":
            # FIXME: Handle "unmentioned directory's setgid/setuid bits"
            inverted_mask = _symbolic_mode_bit_inverse(segment_mask)
            final_base_mask = inverted_mask
            final_cap_x_mask = inverted_mask
        else:
            raise AssertionError(
                f"Unknown modifier in symbolic mode: {modifier} - should not have happened"
            )
        yield _SymbolicModeSegment(
            base_mode=final_base_mode,
            base_mask=final_base_mask,
            cap_x_mode=final_cap_x_mode,
            cap_x_mask=final_cap_x_mask,
        )

383 

384 

def unpack_type(
    orig_type: Any,
    parsing_typed_dict_attribute: bool,
) -> Tuple[Any, Optional[Any], Tuple[Any, ...]]:
    """Strip ``Required``/``NotRequired`` wrappers from a type.

    :param orig_type: The type to unpack.
    :param parsing_typed_dict_attribute: Whether the type is the type of a
      typed dict attribute (the only place the wrappers are accepted).
    :return: A tuple of the unwrapped type, its origin (as per
      ``typing.get_origin``) and its type arguments (as per
      ``typing.get_args``).
    :raises ValueError: If a wrapper is used outside a typed dict attribute or
      does not have exactly one type parameter.
    """
    requiredness_wrappers = ("typing.NotRequired", "typing.Required")

    current = orig_type
    origin, args = get_origin(current), get_args(current)

    if not parsing_typed_dict_attribute and repr(origin) in requiredness_wrappers:
        raise ValueError(
            f"The Required/NotRequired attributes cannot be used outside typed dicts,"
            f" the type that triggered the error: {orig_type}"
        )

    # Peel the wrappers off one layer at a time.
    while repr(origin) in requiredness_wrappers:
        if len(args) != 1:
            raise ValueError(
                f"The type {current} should have exactly one type parameter"
            )
        (current,) = args
        origin, args = get_origin(current), get_args(current)

    assert not isinstance(current, tuple)

    return current, origin, args

413 

414 

def find_annotation(
    annotations: Tuple[Any, ...],
    anno_class: Type[MP],
) -> Optional[MP]:
    """Find the single annotation that is an instance of ``anno_class``.

    :param annotations: The annotations to search.
    :param anno_class: The class of the annotation to look for.
    :return: The matching annotation, or ``None`` if there is none.
    :raises ValueError: If more than one annotation matches.
    """
    found: Optional[MP] = None
    for candidate in annotations:
        if not isinstance(candidate, anno_class):
            continue
        if found is not None:
            raise ValueError(
                f"The annotation {anno_class.__name__} was used more than once"
            )
        found = candidate
    return found