Coverage for src/debputy/path_matcher.py: 72%

279 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-01-27 13:59 +0000

1import fnmatch 

2import glob 

3import itertools 

4import os 

5import re 

6from enum import Enum 

7from typing import ( 

8 Callable, 

9 Optional, 

10 TypeVar, 

11 Iterable, 

12 Union, 

13 Sequence, 

14 Tuple, 

15) 

16 

17from debputy.intermediate_manifest import PathType 

18from debputy.plugin.api import VirtualPath 

19from debputy.substitution import Substitution, NULL_SUBSTITUTION 

20from debputy.types import VP 

21from debputy.util import _normalize_path, _error, escape_shell 

22 

# Generic type variable. NOTE(review): not referenced by any definition visible
# in this part of the module - confirm whether it is still needed.
MR = TypeVar("MR")

# Matches one glob "magic" token: a lone `*` or `?`, or a bracket expression
# (an opening `[`, an optional literal `]` right after it, one or more
# characters that are not `]`, and the closing `]`).
_GLOB_PARTS = re.compile(r"[*?]|\[]?[^]]+]")

25 

26 

def _lookup_path(fs_root: VP, path: str) -> Optional[VP]:
    """Resolve a normalized, "./"-prefixed path starting from the root directory.

    :param fs_root: The root directory; its name must be "." and it must not
      have a parent directory.
    :param path: The path to resolve. It must start with "./".
    :return: The result of `fs_root.lookup` for the path without its "./" prefix.
    :raises ValueError: If `path` does not start with "./" or if `fs_root` is
      not the root directory.
    """
    has_relative_prefix = path.startswith("./")
    if not has_relative_prefix:
        raise ValueError("Directory must be normalized (and not the root directory)")

    is_root_dir = fs_root.name == "." and fs_root.parent_dir is None
    if not is_root_dir:
        raise ValueError("Provided fs_root must be the root directory")

    # TODO: Strictly speaking, this is unsound. (E.g., FSRootDir does not return FSRootDir on a lookup)
    path_below_root = path[2:]
    return fs_root.lookup(path_below_root)

34 

35 

36def _compile_basename_glob( 

37 basename_glob: str, 

38) -> Tuple[Optional[str], Callable[[str], bool]]: 

39 remainder = None 

40 if not glob.has_magic(basename_glob): 40 ↛ 41line 40 didn't jump to line 41 because the condition on line 40 was never true

41 return escape_shell(basename_glob), lambda x: x == basename_glob 

42 

43 if basename_glob.startswith("*"): 

44 if basename_glob.endswith("*"): 

45 remainder = basename_glob[1:-1] 

46 possible_quick_match = lambda x: remainder in x 

47 escaped_pattern = "*" + escape_shell(remainder) + "*" 

48 else: 

49 remainder = basename_glob[1:] 

50 possible_quick_match = lambda x: x.endswith(remainder) 

51 escaped_pattern = "*" + escape_shell(remainder) 

52 else: 

53 remainder = basename_glob[:-1] 

54 possible_quick_match = lambda x: x.startswith(remainder) 

55 escaped_pattern = escape_shell(remainder) + "*" 

56 

57 if not glob.has_magic(remainder): 

58 return escaped_pattern, possible_quick_match 

59 slow_pattern = re.compile(fnmatch.translate(basename_glob)) 

60 return None, lambda x: bool(slow_pattern.match(x)) 60 ↛ exitline 60 didn't run the lambda on line 60

61 

62 

def _apply_match(
    fs_path: VP,
    match_part: Union[Callable[[str], bool], str],
) -> Iterable[VP]:
    """Yield the entries of `fs_path` selected by one segment of a glob.

    :param fs_path: The path whose entries are examined.
    :param match_part: Either a literal name, which is resolved with
      `fs_path.lookup`, or a predicate that is applied to the name of every
      entry in `fs_path.iterdir`.
    :return: An iterable of the selected paths (possibly empty).
    """
    if not isinstance(match_part, str):
        for child in fs_path.iterdir:
            if match_part(child.name):
                yield child
        return

    found = fs_path.lookup(match_part)
    if found:
        yield found

73 

74 

class MatchRuleType(Enum):
    """Identifies which matching strategy a `MatchRule` instance implements."""

    # Set by `ExactFileSystemPath`.
    EXACT_MATCH = "exact"
    # Set by `BasenameGlobMatch`.
    BASENAME_GLOB = "basename-glob"
    # Set by `DirectoryBasedMatch` when only the entries directly in the directory are matched.
    DIRECT_CHILDREN_OF_DIR = "direct-children-of-dir"
    # Set by `DirectoryBasedMatch` when everything beneath the directory is matched.
    ANYTHING_BENEATH_DIR = "anything-beneath-dir"
    # Set by `GenericGlobImplementation`.
    GENERIC_GLOB = "generic-glob"
    # Set by the `MATCH_ANYTHING` singleton.
    MATCH_ANYTHING = "match-anything"

82 

83 

class MatchRule:
    """Base class for rules that select paths from a virtual file system.

    Subclasses provide `finditer`, `_full_pattern` and `describe_match_exact`.
    Rules are normally created through the `from_path_or_glob` and
    `recursive_beneath_directory` factory methods, which pick the most
    specific subclass for the given input.
    """

    __slots__ = ("_rule_type",)

    def __init__(self, rule_type: MatchRuleType) -> None:
        self._rule_type = rule_type

    @property
    def rule_type(self) -> MatchRuleType:
        """The `MatchRuleType` this rule was created with."""
        return self._rule_type

    def finditer(
        self,
        fs_root: VP,
        *,
        ignore_paths: Optional[Callable[[VP], bool]] = None,
    ) -> Iterable[VP]:
        """Yield the paths selected by this rule.

        :param fs_root: Root of the file system to search.
        :param ignore_paths: Optional predicate; paths for which it returns a
          true value are meant to be left out of the result.
        :raises NotImplementedError: Always; subclasses must override this.
        """
        # TODO: Strictly speaking, this is unsound. (E.g., FSRootDir does not return FSRootDir on a lookup)
        raise NotImplementedError

    def _full_pattern(self) -> str:
        """Return the pattern of this rule as text (to be overridden by subclasses)."""
        raise NotImplementedError

    @property
    def path_type(self) -> Optional[PathType]:
        """The path type the rule is restricted to; `None` in this base class."""
        return None

    def describe_match_short(self) -> str:
        """Return a short description of the rule; by default the full pattern."""
        return self._full_pattern()

    def describe_match_exact(self) -> str:
        """Return a detailed description of the rule (to be overridden by subclasses)."""
        raise NotImplementedError

    def shell_escape_pattern(self) -> str:
        """Return the rule as a shell-escaped pattern.

        :raises TypeError: Always in this base class; subclasses override the
          method when they can express themselves as a shell pattern.
        """
        raise TypeError("Pattern not suitable or not supported for shell escape")

    @classmethod
    def recursive_beneath_directory(
        cls,
        directory: str,
        definition_source: str,
        path_type: Optional[PathType] = None,
        substitution: Substitution = NULL_SUBSTITUTION,
    ) -> "MatchRule":
        """Create a rule matching everything beneath `directory`.

        :param directory: The directory to match beneath. It must not contain
          glob characters. "." and "/" yield the `MATCH_ANYTHING` rule.
        :param definition_source: Passed to `substitution.substitute` along
          with the normalized directory.
        :param path_type: Optional restriction on the type of the matched paths.
        :param substitution: Substitution applied to the normalized directory.
        :return: The `MATCH_ANYTHING` rule or a `DirectoryBasedMatch`.
        """
        if directory in (".", "/"):
            return MATCH_ANYTHING
        assert not glob.has_magic(directory)
        return DirectoryBasedMatch(
            MatchRuleType.ANYTHING_BENEATH_DIR,
            substitution.substitute(_normalize_path(directory), definition_source),
            path_type=path_type,
        )

    @classmethod
    def from_path_or_glob(
        cls,
        path_or_glob: str,
        definition_source: str,
        path_type: Optional[PathType] = None,
        substitution: Substitution = NULL_SUBSTITUTION,
    ) -> "MatchRule":
        """Create the most specific rule for a path or a glob pattern.

        The input is classified in this order:

        1. "*", "**/*", "." and "/" give the `MATCH_ANYTHING` rule.
        2. Something that looks like a brace expansion is reported via `_error`.
        3. A pattern without glob characters gives an `ExactFileSystemPath`.
        4. "<dir>/*" with a glob-free directory gives a `DirectoryBasedMatch`
           for the direct children of the directory.
        5. A basename glob with a glob-free directory, or prefixed by "**/",
           gives a `BasenameGlobMatch`.
        6. Anything else gives a `GenericGlobImplementation`.

        :param path_or_glob: The path or glob pattern to create a rule for.
        :param definition_source: Where the pattern was defined; used in error
          messages and passed to `substitution.substitute`.
        :param path_type: Optional restriction on the type of the matched
          paths. Must be `None` for cases 1 and 3.
        :param substitution: Substitution applied to the pattern parts.
        :return: The rule for the pattern.
        :raises ValueError: If "**" is used anywhere except as the "**/" prefix
          of a basename glob.
        """
        # TODO: Handle '{a,b,c}' patterns too
        # FIXME: Better error handling!
        normalized_no_prefix = _normalize_path(path_or_glob, with_prefix=False)
        if path_or_glob in ("*", "**/*", ".", "/"):
            assert path_type is None
            return MATCH_ANYTHING

        # We do not support {a,b} at the moment. This check is not perfect, but it should catch the most obvious
        # unsupported usage.
        if (
            "{" in path_or_glob
            and ("," in path_or_glob or ".." in path_or_glob)
            and re.search(r"[{][^},.]*(?:,|[.][.])[^},.]*[}]", path_or_glob)
        ):
            m = re.search(r"(.*)[{]([^},.]*(?:,|[.][.])[^},.]*[}])", path_or_glob)
            assert m is not None
            # Not an f-string: the doubled braces are meant literally here.
            replacement = m.group(1) + "{{OPEN_CURLY_BRACE}}" + m.group(2)
            # The example braces are doubled so the f-string emits them
            # literally instead of parsing them as replacement fields.
            _error(
                f'The pattern "{path_or_glob}" (defined in {definition_source}) looks like it contains a'
                f' brace expansion (such as "{{a,b}}" or "{{a..b}}"). Brace expansions are not supported.'
                " If you wanted to match the literal path with a brace in it, please use a substitution to insert"
                f' the opening brace. As an example: "{replacement}"'
            )

        normalized_with_prefix = "./" + normalized_no_prefix
        # TODO: Check for escapes here "foo[?]/bar" can be written as an exact match for foo?/bar
        # - similar holds for "foo[?]/*" being a directory match (etc.).
        if not glob.has_magic(normalized_with_prefix):
            assert path_type is None
            return ExactFileSystemPath(
                substitution.substitute(normalized_with_prefix, definition_source)
            )

        directory = os.path.dirname(normalized_with_prefix)
        basename = os.path.basename(normalized_with_prefix)

        if ("**" in directory and directory != "./**") or "**" in basename:
            raise ValueError(
                f'Cannot process pattern "{path_or_glob}" from {definition_source}: The double-star'
                ' glob ("**") is not supported in general. Only "**/<basename-glob>" supported.'
            )

        if basename == "*" and not glob.has_magic(directory):
            return DirectoryBasedMatch(
                MatchRuleType.DIRECT_CHILDREN_OF_DIR,
                substitution.substitute(directory, definition_source),
                path_type=path_type,
            )
        elif directory == "./**" or not glob.has_magic(directory):
            basename_glob = substitution.substitute(
                basename, definition_source, escape_glob_characters=True
            )
            # A bare basename glob such as "*.txt" has "." as its directory
            # and is matched recursively, exactly like "**/*.txt".
            if directory in (".", "./**"):
                return BasenameGlobMatch(
                    basename_glob,
                    path_type=path_type,
                    recursive_match=True,
                )
            return BasenameGlobMatch(
                basename_glob,
                only_when_in_directory=substitution.substitute(
                    directory, definition_source
                ),
                path_type=path_type,
                recursive_match=False,
            )

        return GenericGlobImplementation(normalized_with_prefix, path_type=path_type)

212 

213 

def _match_file_type(path_type: PathType, path: VirtualPath) -> bool:
    """Tell whether `path` is of the kind requested by `path_type`.

    :param path_type: One of `PathType.FILE`, `PathType.DIRECTORY` or
      `PathType.SYMLINK`; any other value trips the assertion below when no
      match was found.
    :param path: The path to inspect.
    :return: `True` when the path is of the requested kind, otherwise `False`.
    """
    is_requested_kind = (
        (path_type == PathType.FILE and path.is_file)
        or (path_type == PathType.DIRECTORY and path.is_dir)
        or (path_type == PathType.SYMLINK and path.is_symlink)
    )
    if is_requested_kind:
        return True
    assert path_type in (PathType.FILE, PathType.DIRECTORY, PathType.SYMLINK)
    return False

223 

224 

class MatchAnything(MatchRule):
    """Rule that selects every path returned by `fs_root.all_paths()`."""

    def __init__(self) -> None:
        super().__init__(MatchRuleType.MATCH_ANYTHING)

    def _full_pattern(self) -> str:
        return "**/*"

    def finditer(
        self, fs_root: VP, *, ignore_paths: Optional[Callable[[VP], bool]] = None
    ) -> Iterable[VP]:
        """Yield all paths beneath `fs_root`, minus the ignored ones.

        :param fs_root: Root of the file system to search.
        :param ignore_paths: Optional predicate; paths for which it returns a
          true value are left out.
        :return: An iterable of the selected paths.
        """
        if ignore_paths is None:
            yield from fs_root.all_paths()
        else:
            # The two cases are mutually exclusive: yielding the unfiltered
            # paths after the filtered ones would emit every path again,
            # including the ones that were supposed to be ignored.
            yield from (p for p in fs_root.all_paths() if not ignore_paths(p))

    def describe_match_exact(self) -> str:
        return "**/* (Match anything)"

241 

242 

# The shared instance of the match-anything rule; the `MatchRule` factory
# methods return this object.
MATCH_ANYTHING: MatchRule = MatchAnything()

# Drop the class name from the module namespace, leaving `MATCH_ANYTHING` as
# the module-level handle for the rule.
del MatchAnything

246 

247 

class ExactFileSystemPath(MatchRule):
    """Rule that selects a single path by its exact, "./"-prefixed name."""

    __slots__ = "_path"

    def __init__(self, path: str) -> None:
        super().__init__(MatchRuleType.EXACT_MATCH)
        self._path = path

    def _full_pattern(self) -> str:
        return self._path

    def finditer(
        self, fs_root: VP, *, ignore_paths: Optional[Callable[[VP], bool]] = None
    ) -> Iterable[VP]:
        """Yield the path if it exists beneath `fs_root` and is not ignored.

        :param fs_root: Root of the file system to search.
        :param ignore_paths: Optional predicate; the path is left out when it
          returns a true value for it.
        :return: An iterable with at most one path.
        """
        candidate = _lookup_path(fs_root, self._path)
        if candidate is None:
            return
        if ignore_paths is not None and ignore_paths(candidate):
            return
        yield candidate

    def describe_match_exact(self) -> str:
        return f"{self._path} (the exact path / no globbing)"

    @property
    def path(self) -> str:
        """The path this rule matches."""
        return self._path

    def shell_escape_pattern(self) -> str:
        """Return the path, stripped of its leading dots, in shell-escaped form."""
        without_leading_dots = self._path.lstrip(".")
        return escape_shell(without_leading_dots)

274 

275 

class DirectoryBasedMatch(MatchRule):
    """Rule that selects the contents of one directory.

    Depending on the rule type, either only the entries directly in the
    directory are selected (`DIRECT_CHILDREN_OF_DIR`) or everything beneath
    it (`ANYTHING_BENEATH_DIR`). The selection can optionally be restricted to
    one path type.
    """

    __slots__ = "_directory", "_path_type"

    def __init__(
        self,
        rule_type: MatchRuleType,
        directory: str,
        path_type: Optional[PathType] = None,
    ) -> None:
        """
        :param rule_type: `MatchRuleType.DIRECT_CHILDREN_OF_DIR` or
          `MatchRuleType.ANYTHING_BENEATH_DIR`.
        :param directory: The directory, without a trailing slash.
        :param path_type: Optional restriction on the type of the matched paths.
        """
        super().__init__(rule_type)
        self._directory = directory
        self._path_type = path_type
        assert rule_type in (
            MatchRuleType.DIRECT_CHILDREN_OF_DIR,
            MatchRuleType.ANYTHING_BENEATH_DIR,
        )
        assert not self._directory.endswith("/")

    def _full_pattern(self) -> str:
        return self._directory

    def finditer(
        self,
        fs_root: VP,
        *,
        ignore_paths: Optional[Callable[[VP], bool]] = None,
    ) -> Iterable[VP]:
        """Yield the selected contents of the directory.

        Nothing is yielded when the directory does not exist or is not a
        directory.

        :param fs_root: Root of the file system to search.
        :param ignore_paths: Optional predicate; paths for which it returns a
          true value are left out.
        :return: An iterable of the selected paths.
        """
        p = _lookup_path(fs_root, self._directory)
        if p is None or not p.is_dir:
            return
        if self._rule_type == MatchRuleType.ANYTHING_BENEATH_DIR:
            path_iter = p.all_paths()
        else:
            path_iter = p.iterdir
        if ignore_paths is not None:
            path_iter = (p for p in path_iter if not ignore_paths(p))
        if self._path_type is None:
            yield from path_iter
        else:
            yield from (m for m in path_iter if _match_file_type(self._path_type, m))

    def describe_match_short(self) -> str:
        """Return the rule as a glob, followed by the path type restriction (if any)."""
        path_type_match = (
            ""
            if self._path_type is None
            else f" <only for path type {self._path_type.manifest_key}>"
        )
        if self._rule_type == MatchRuleType.ANYTHING_BENEATH_DIR:
            return f"{self._directory}/**/*{path_type_match}"
        return f"{self._directory}/*{path_type_match}"

    def describe_match_exact(self) -> str:
        """Return the short description followed by an explanation of its scope."""
        # Both variants build on describe_match_short() so that a path type
        # restriction shows up in the description of recursive rules as well.
        if self._rule_type == MatchRuleType.ANYTHING_BENEATH_DIR:
            return f"{self.describe_match_short()} (anything below the directory)"
        return f"{self.describe_match_short()} (anything directly in the directory)"

    @property
    def path_type(self) -> Optional[PathType]:
        """The path type the rule is restricted to, if any."""
        return self._path_type

    @property
    def directory(self) -> str:
        """The directory whose contents are matched."""
        return self._directory

    def shell_escape_pattern(self) -> str:
        """Return the rule as a shell-escaped pattern.

        :raises TypeError: For `ANYTHING_BENEATH_DIR` rules (via the base class).
        """
        if self._rule_type == MatchRuleType.ANYTHING_BENEATH_DIR:
            return super().shell_escape_pattern()
        return escape_shell(self._directory.lstrip(".")) + "/*"

344 

345 

class BasenameGlobMatch(MatchRule):
    """Rule that selects paths whose basename matches a glob.

    The search covers either the entries directly in one directory or, for a
    recursive match, everything beneath the search root.
    """

    __slots__ = (
        "_basename_glob",
        "_directory",
        "_matcher",
        "_path_type",
        "_recursive_match",
        "_escaped_basename_pattern",
    )

    def __init__(
        self,
        basename_glob: str,
        only_when_in_directory: Optional[str] = None,
        path_type: Optional[PathType] = None,
        recursive_match: Optional[bool] = None,  # TODO: Can this just be = False (?)
    ) -> None:
        """
        :param basename_glob: The glob for the basename. It must contain
          neither "/" nor "**".
        :param only_when_in_directory: Directory (without a trailing slash) to
          search in; `None` means searching from the root.
        :param path_type: Optional restriction on the type of the matched paths.
        :param recursive_match: Whether to search everything beneath the
          search root rather than just its direct entries. Forced to `True`
          when no directory is given.
        """
        super().__init__(MatchRuleType.BASENAME_GLOB)
        self._basename_glob = basename_glob
        self._directory = only_when_in_directory
        self._path_type = path_type
        self._recursive_match = recursive_match
        if self._directory is None and not recursive_match:
            self._recursive_match = True
        assert self._directory is None or not self._directory.endswith("/")
        assert "/" not in basename_glob  # Not a basename if it contains /
        assert "**" not in basename_glob  # Also not a (true) basename if it has **
        self._escaped_basename_pattern, self._matcher = _compile_basename_glob(
            basename_glob
        )

    def _full_pattern(self) -> str:
        if self._directory is not None:
            maybe_recursive = "**/" if self._recursive_match else ""
            return f"{self._directory}/{maybe_recursive}{self._basename_glob}"
        return self._basename_glob

    def finditer(
        self,
        fs_root: VP,
        *,
        ignore_paths: Optional[Callable[[VP], bool]] = None,
    ) -> Iterable[VP]:
        """Yield the paths whose basename matches the glob.

        Nothing is yielded when a directory was given and it does not exist
        or is not a directory.

        :param fs_root: Root of the file system to search.
        :param ignore_paths: Optional predicate; paths for which it returns a
          true value are left out.
        :return: An iterable of the selected paths.
        """
        search_root = fs_root
        if self._directory is not None:
            p = _lookup_path(fs_root, self._directory)
            if p is None or not p.is_dir:
                return
            search_root = p
        path_iter = (
            search_root.all_paths() if self._recursive_match else search_root.iterdir
        )
        if ignore_paths is not None:
            path_iter = (p for p in path_iter if not ignore_paths(p))
        if self._path_type is None:
            yield from (m for m in path_iter if self._matcher(m.name))
        else:
            yield from (
                m
                for m in path_iter
                if self._matcher(m.name) and _match_file_type(self._path_type, m)
            )

    def describe_match_short(self) -> str:
        """Return the full pattern, followed by the path type restriction (if any)."""
        path_type_match = (
            ""
            if self._path_type is None
            else f" <only for path type {self._path_type.manifest_key}>"
        )
        return f"{self._full_pattern()}{path_type_match}"

    def describe_match_exact(self) -> str:
        """Return the short description followed by an explanation of its scope."""
        if self._directory is not None:
            return f"{self.describe_match_short()} (glob / directly in the directory)"
        return f"{self.describe_match_short()} (basename match)"

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, BasenameGlobMatch):
            return NotImplemented
        return (
            self._basename_glob == other._basename_glob
            and self._directory == other._directory
            and self._path_type == other._path_type
            and self._recursive_match == other._recursive_match
        )

    def __hash__(self) -> int:
        # Defining __eq__ alone would make instances unhashable; hash the
        # same fields that __eq__ compares so equal rules hash equally.
        return hash(
            (
                self._basename_glob,
                self._directory,
                self._path_type,
                self._recursive_match,
            )
        )

    @property
    def path_type(self) -> Optional[PathType]:
        """The path type the rule is restricted to, if any."""
        return self._path_type

    @property
    def directory(self) -> Optional[str]:
        """The directory the search is limited to, if any."""
        return self._directory

    def shell_escape_pattern(self) -> str:
        """Return the rule as a shell-escaped pattern.

        :raises TypeError: When no directory was given or the basename glob
          has no shell-escaped form (via the base class).
        """
        if self._directory is None or self._escaped_basename_pattern is None:
            return super().shell_escape_pattern()
        return (
            escape_shell(self._directory.lstrip("."))
            + f"/{self._escaped_basename_pattern}"
        )

451 

452 

class GenericGlobImplementation(MatchRule):
    """Rule for globs that need matching in more than the last path segment.

    The pattern is split on "/" and applied one segment at a time: literal
    segments are looked up directly, segments with glob characters are
    matched against the names of the entries of the directories found so far.
    """

    __slots__ = "_glob_pattern", "_path_type", "_match_parts"

    def __init__(
        self,
        glob_pattern: str,
        path_type: Optional[PathType] = None,
    ) -> None:
        """
        :param glob_pattern: The glob; a leading "./" is removed. It must
          contain a glob character and a "/", and must not contain "**".
        :param path_type: Optional restriction on the type of the matched paths.
        """
        super().__init__(MatchRuleType.GENERIC_GLOB)
        if glob_pattern.startswith("./"):
            glob_pattern = glob_pattern[2:]
        self._glob_pattern = glob_pattern
        self._path_type = path_type
        assert "**" not in glob_pattern  # No recursive globs
        assert glob.has_magic(
            glob_pattern
        )  # If it has no glob, then it could have been an exact match
        assert (
            "/" in glob_pattern
        )  # If it does not have a / then a BasenameGlob could have been used instead
        self._match_parts = self._compile_glob()

    def _full_pattern(self) -> str:
        return self._glob_pattern

    def finditer(
        self,
        fs_root: VP,
        *,
        ignore_paths: Optional[Callable[[VP], bool]] = None,
    ) -> Iterable[VP]:
        """Yield the paths matching the glob.

        :param fs_root: Root of the file system to search.
        :param ignore_paths: Optional predicate; it is applied to the final
          matches only, and matches for which it returns a true value are
          left out.
        :return: An iterable of the selected paths.
        """
        search_history = [fs_root]
        for part in self._match_parts:
            next_layer = itertools.chain.from_iterable(
                _apply_match(m, part) for m in search_history
            )
            # NOTE: The layer must be materialized before the loop moves on.
            # The generator expression above reads `part` lazily, so an
            # unconsumed layer would be evaluated with whatever value `part`
            # has when it is finally consumed (i.e., a later segment).
            search_history = list(next_layer)
            if not search_history:
                # While we have it as a list, we might as well have an "early exit".
                return

        matches: Iterable[VP] = search_history
        if self._path_type is not None:
            path_type = self._path_type
            matches = (m for m in matches if _match_file_type(path_type, m))
        if ignore_paths is not None:
            matches = (m for m in matches if not ignore_paths(m))
        yield from matches

    def describe_match_short(self) -> str:
        """Return the glob, followed by the path type restriction (if any)."""
        path_type_match = (
            ""
            if self._path_type is None
            else f" <only for path type {self._path_type.manifest_key}>"
        )
        return f"{self._full_pattern()}{path_type_match}"

    def describe_match_exact(self) -> str:
        return f"{self.describe_match_short()} (glob)"

    def _compile_glob(self) -> Sequence[Union[Callable[[str], bool], str]]:
        """Split the glob into per-segment matchers.

        :return: One item per "/"-separated segment: the segment itself when
          it has no glob characters, otherwise a predicate on the basename.
        """
        assert self._glob_pattern.strip("/") == self._glob_pattern
        return [
            _compile_basename_glob(part)[1] if glob.has_magic(part) else part
            for part in self._glob_pattern.split("/")
        ]

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, GenericGlobImplementation):
            return NotImplemented
        return (
            self._glob_pattern == other._glob_pattern
            and self._path_type == other._path_type
        )

    def __hash__(self) -> int:
        # Defining __eq__ alone would make instances unhashable; hash the
        # same fields that __eq__ compares so equal rules hash equally.
        return hash((self._glob_pattern, self._path_type))

    @property
    def path_type(self) -> Optional[PathType]:
        """The path type the rule is restricted to, if any."""
        return self._path_type