Coverage for src/debputy/path_matcher.py: 72%

280 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-10-12 15:06 +0000

1import fnmatch 

2import glob 

3import itertools 

4import os 

5import re 

6from enum import Enum 

7from typing import ( 

8 Optional, 

9 TypeVar, 

10 Union, 

11 Tuple, 

12) 

13from collections.abc import Callable, Iterable, Sequence 

14 

15from debputy.intermediate_manifest import PathType 

16from debputy.plugin.api import VirtualPath 

17from debputy.substitution import Substitution, NULL_SUBSTITUTION 

18from debputy.types import VP 

19from debputy.util import _normalize_path, _error, escape_shell 

20 

# Generic type variable declared for this module.
MR = TypeVar("MR")
# Matches a single glob construct: one "*" or "?" wildcard, or a bracket
# expression such as "[abc]". A "]" directly after the opening "[" is accepted
# as a member of the set (e.g. "[]a]").
_GLOB_PARTS = re.compile(r"[*?]|\[]?[^]]+]")

23 

24 

def _lookup_path(fs_root: VP, path: str) -> VP | None:
    """Look up a normalized, ``./``-prefixed path relative to the root directory.

    :param fs_root: The root directory; its name must be "." and it must have
      no parent directory.
    :param path: The path to look up. It must start with "./".
    :return: Whatever ``fs_root.lookup`` returns for the path without its "./"
      prefix.
    :raises ValueError: If `path` lacks the "./" prefix or `fs_root` is not
      the root directory.
    """
    if not path.startswith("./"):
        raise ValueError("Directory must be normalized (and not the root directory)")
    is_root_dir = fs_root.name == "." and fs_root.parent_dir is None
    if not is_root_dir:
        raise ValueError("Provided fs_root must be the root directory")
    # TODO: Strictly speaking, this is unsound. (E.g., FSRootDir does not return FSRootDir on a lookup)
    relative_path = path.removeprefix("./")
    return fs_root.lookup(relative_path)

32 

33 

34def _compile_basename_glob( 

35 basename_glob: str, 

36) -> tuple[str | None, Callable[[str], bool]]: 

37 remainder = None 

38 if not glob.has_magic(basename_glob): 38 ↛ 39line 38 didn't jump to line 39 because the condition on line 38 was never true

39 return escape_shell(basename_glob), lambda x: x == basename_glob 

40 

41 if basename_glob.startswith("*"): 

42 if basename_glob.endswith("*"): 

43 remainder = basename_glob[1:-1] 

44 possible_quick_match = lambda x: remainder in x 

45 escaped_pattern = "*" + escape_shell(remainder) + "*" 

46 else: 

47 remainder = basename_glob[1:] 

48 possible_quick_match = lambda x: x.endswith(remainder) 

49 escaped_pattern = "*" + escape_shell(remainder) 

50 else: 

51 remainder = basename_glob[:-1] 

52 possible_quick_match = lambda x: x.startswith(remainder) 

53 escaped_pattern = escape_shell(remainder) + "*" 

54 

55 if not glob.has_magic(remainder): 

56 return escaped_pattern, possible_quick_match 

57 slow_pattern = re.compile(fnmatch.translate(basename_glob)) 

58 return None, lambda x: bool(slow_pattern.match(x)) 

59 

60 

def _apply_match(
    fs_path: VP,
    match_part: Callable[[str], bool] | str,
) -> Iterable[VP]:
    """Yield the paths beneath `fs_path` selected by one pattern segment.

    :param fs_path: The path to search in.
    :param match_part: Either a string, which is passed to ``fs_path.lookup``,
      or a predicate that is applied to the name of each entry of
      ``fs_path.iterdir``.
    :return: The lookup result (when truthy) for a string segment; otherwise
      every entry whose name satisfies the predicate.
    """
    if not isinstance(match_part, str):
        for child in fs_path.iterdir:
            if match_part(child.name):
                yield child
        return

    found = fs_path.lookup(match_part)
    if found:
        yield found

71 

72 

class MatchRuleType(Enum):
    """The kinds of match rules implemented in this module."""

    # A single path without any glob characters.
    EXACT_MATCH = "exact"
    # A glob applied to the basename of paths.
    BASENAME_GLOB = "basename-glob"
    # The entries directly inside a given directory.
    DIRECT_CHILDREN_OF_DIR = "direct-children-of-dir"
    # Everything below a given directory.
    ANYTHING_BENEATH_DIR = "anything-beneath-dir"
    # A glob with glob characters in more than just the basename.
    GENERIC_GLOB = "generic-glob"
    # Matches every path.
    MATCH_ANYTHING = "match-anything"

80 

81 

class MatchRule:
    """Base class for rules that select paths from a virtual file system.

    Subclasses implement `finditer` and the pattern/description helpers. The
    class method factories `from_path_or_glob` and
    `recursive_beneath_directory` pick the most specific rule implementation
    for a given pattern.
    """

    __slots__ = ("_rule_type",)

    def __init__(self, rule_type: MatchRuleType) -> None:
        self._rule_type = rule_type

    @property
    def rule_type(self) -> MatchRuleType:
        """The kind of rule this instance is."""
        return self._rule_type

    def finditer(
        self,
        fs_root: VP,
        *,
        ignore_paths: Callable[[VP], bool] | None = None,
    ) -> Iterable[VP]:
        """Yield the paths beneath `fs_root` that this rule matches.

        :param fs_root: The root directory to search from.
        :param ignore_paths: Optional predicate; paths for which it returns
          True are left out of the result.
        """
        # TODO: Strictly speaking, this is unsound. (E.g., FSRootDir does not return FSRootDir on a lookup)
        raise NotImplementedError

    def _full_pattern(self) -> str:
        """Return the pattern of this rule as a string (subclass hook)."""
        raise NotImplementedError

    @property
    def path_type(self) -> PathType | None:
        """The path type the rule is restricted to (None means unrestricted)."""
        return None

    def describe_match_short(self) -> str:
        """Return a short description of the rule (the pattern by default)."""
        return self._full_pattern()

    def describe_match_exact(self) -> str:
        """Return a description that also states how the pattern is applied."""
        raise NotImplementedError

    def shell_escape_pattern(self) -> str:
        """Return the rule as a shell-escaped pattern.

        :raises TypeError: If the rule cannot be expressed that way (the
          default for rules that do not override this method).
        """
        raise TypeError("Pattern not suitable or not supported for shell escape")

    @classmethod
    def recursive_beneath_directory(
        cls,
        directory: str,
        definition_source: str,
        path_type: PathType | None = None,
        substitution: Substitution = NULL_SUBSTITUTION,
    ) -> "MatchRule":
        """Create a rule matching everything beneath `directory`.

        :param directory: The directory to match beneath; it must not contain
          glob characters. "." and "/" yield the match-anything rule.
        :param definition_source: Where the pattern was defined; passed on to
          the substitution.
        :param path_type: Optional restriction on the type of matched paths.
        :param substitution: Substitution applied to the normalized directory.
        """
        if directory in (".", "/"):
            return MATCH_ANYTHING
        assert not glob.has_magic(directory)
        return DirectoryBasedMatch(
            MatchRuleType.ANYTHING_BENEATH_DIR,
            substitution.substitute(_normalize_path(directory), definition_source),
            path_type=path_type,
        )

    @classmethod
    def from_path_or_glob(
        cls,
        path_or_glob: str,
        definition_source: str,
        path_type: PathType | None = None,
        substitution: Substitution = NULL_SUBSTITUTION,
    ) -> "MatchRule":
        """Create the most specific rule for a path or glob pattern.

        :param path_or_glob: The path or glob pattern to compile.
        :param definition_source: Where the pattern was defined; used in error
          messages and passed on to the substitution.
        :param path_type: Optional restriction on the type of matched paths.
        :param substitution: Substitution applied to the pattern pieces.
        :raises ValueError: If the pattern uses "**" in a way other than
          "**/<basename-glob>".
        """
        # TODO: Handle '{a,b,c}' patterns too
        # FIXME: Better error handling!
        normalized_no_prefix = _normalize_path(path_or_glob, with_prefix=False)
        if path_or_glob in ("*", "**/*", ".", "/"):
            assert path_type is None
            return MATCH_ANYTHING

        # We do not support {a,b} at the moment. This check is not perfect, but it should catch the most obvious
        # unsupported usage.
        if (
            "{" in path_or_glob
            and ("," in path_or_glob or ".." in path_or_glob)
            and re.search(r"[{][^},.]*(?:,|[.][.])[^},.]*[}]", path_or_glob)
        ):
            m = re.search(r"(.*)[{]([^},.]*(?:,|[.][.])[^},.]*[}])", path_or_glob)
            assert m is not None
            replacement = m.group(1) + "{{OPEN_CURLY_BRACE}}" + m.group(2)
            # The literal braces in the examples must be doubled, because the
            # fragment is an f-string.
            # NOTE(review): execution continues below if _error() returns;
            # presumably it aborts - confirm.
            _error(
                f'The pattern "{path_or_glob}" (defined in {definition_source}) looks like it contains a'
                f' brace expansion (such as "{{a,b}}" or "{{a..b}}"). Brace expansions are not supported.'
                " If you wanted to match the literal path with a brace in it, please use a substitution to insert"
                f' the opening brace. As an example: "{replacement}"'
            )

        normalized_with_prefix = "./" + normalized_no_prefix
        # TODO: Check for escapes here  "foo[?]/bar" can be written as an exact match for foo?/bar
        # - similar holds for "foo[?]/*" being a directory match (etc.).
        if not glob.has_magic(normalized_with_prefix):
            assert path_type is None
            return ExactFileSystemPath(
                substitution.substitute(normalized_with_prefix, definition_source)
            )

        directory = os.path.dirname(normalized_with_prefix)
        basename = os.path.basename(normalized_with_prefix)

        if ("**" in directory and directory != "./**") or "**" in basename:
            raise ValueError(
                f'Cannot process pattern "{path_or_glob}" from {definition_source}: The double-star'
                ' glob ("**") is not supported in general.  Only "**/<basename-glob>" supported.'
            )

        if basename == "*" and not glob.has_magic(directory):
            return DirectoryBasedMatch(
                MatchRuleType.DIRECT_CHILDREN_OF_DIR,
                substitution.substitute(directory, definition_source),
                path_type=path_type,
            )
        elif directory == "./**" or not glob.has_magic(directory):
            basename_glob = substitution.substitute(
                basename, definition_source, escape_glob_characters=True
            )
            if directory in (".", "./**"):
                return BasenameGlobMatch(
                    basename_glob,
                    path_type=path_type,
                    recursive_match=True,
                )
            return BasenameGlobMatch(
                basename_glob,
                only_when_in_directory=substitution.substitute(
                    directory, definition_source
                ),
                path_type=path_type,
                recursive_match=False,
            )

        return GenericGlobImplementation(normalized_with_prefix, path_type=path_type)

210 

211 

def _match_file_type(path_type: PathType, path: VirtualPath) -> bool:
    """Return whether `path` is of the requested `path_type`.

    :param path_type: One of PathType.FILE, PathType.DIRECTORY or
      PathType.SYMLINK.
    :param path: The path to test.
    :return: True when the path's ``is_file`` / ``is_dir`` / ``is_symlink``
      flag corresponding to `path_type` is set; False otherwise.
    """
    if path_type == PathType.FILE:
        is_requested_type = path.is_file
    elif path_type == PathType.DIRECTORY:
        is_requested_type = path.is_dir
    elif path_type == PathType.SYMLINK:
        is_requested_type = path.is_symlink
    else:
        is_requested_type = False

    if is_requested_type:
        return True
    # Only the three path types above are supported by this helper.
    assert path_type in (PathType.FILE, PathType.DIRECTORY, PathType.SYMLINK)
    return False

221 

222 

class MatchAnything(MatchRule):
    """Rule that matches every path returned by ``fs_root.all_paths()``."""

    def __init__(self) -> None:
        super().__init__(MatchRuleType.MATCH_ANYTHING)

    def _full_pattern(self) -> str:
        return "**/*"

    def finditer(
        self, fs_root: VP, *, ignore_paths: Callable[[VP], bool] | None = None
    ) -> Iterable[VP]:
        """Yield all paths beneath `fs_root`, minus those rejected by `ignore_paths`.

        :param fs_root: The root directory to search from.
        :param ignore_paths: Optional predicate; paths for which it returns
          True are left out of the result.
        """
        if ignore_paths is None:
            yield from fs_root.all_paths()
            return
        # Each path is yielded at most once: the filtered iteration replaces
        # the unfiltered one rather than preceding it.
        yield from (p for p in fs_root.all_paths() if not ignore_paths(p))

    def describe_match_exact(self) -> str:
        return "**/* (Match anything)"


# The shared instance of the match-anything rule.
MATCH_ANYTHING: MatchRule = MatchAnything()

# Drop the class name from the module namespace; MATCH_ANYTHING is the only
# instance created here.
del MatchAnything

244 

245 

class ExactFileSystemPath(MatchRule):
    """Rule that matches one specific path (no globbing involved)."""

    __slots__ = ("_path",)

    def __init__(self, path: str) -> None:
        super().__init__(MatchRuleType.EXACT_MATCH)
        self._path = path

    def _full_pattern(self) -> str:
        return self._path

    def finditer(
        self, fs_root: VP, *, ignore_paths: Callable[[VP], bool] | None = None
    ) -> Iterable[VP]:
        """Yield the path if it exists beneath `fs_root` and is not ignored."""
        found = _lookup_path(fs_root, self._path)
        if found is None:
            return
        if ignore_paths is not None and ignore_paths(found):
            return
        yield found

    def describe_match_exact(self) -> str:
        return f"{self._path} (the exact path / no globbing)"

    @property
    def path(self) -> str:
        """The path this rule matches."""
        return self._path

    def shell_escape_pattern(self) -> str:
        """Return the shell-escaped path with its leading dot(s) removed."""
        without_leading_dots = self._path.lstrip(".")
        return escape_shell(without_leading_dots)

272 

273 

class DirectoryBasedMatch(MatchRule):
    """Rule matching the content of one directory.

    Depending on the rule type, the rule matches either the direct children of
    the directory or everything beneath it, optionally restricted to one path
    type.
    """

    __slots__ = "_directory", "_path_type"

    def __init__(
        self,
        rule_type: MatchRuleType,
        directory: str,
        path_type: PathType | None = None,
    ) -> None:
        """
        :param rule_type: MatchRuleType.DIRECT_CHILDREN_OF_DIR or
          MatchRuleType.ANYTHING_BENEATH_DIR.
        :param directory: The directory to match in (without a trailing "/").
        :param path_type: Optional restriction on the type of matched paths.
        """
        super().__init__(rule_type)
        self._directory = directory
        self._path_type = path_type
        assert rule_type in (
            MatchRuleType.DIRECT_CHILDREN_OF_DIR,
            MatchRuleType.ANYTHING_BENEATH_DIR,
        )
        assert not self._directory.endswith("/")

    def _full_pattern(self) -> str:
        return self._directory

    def finditer(
        self,
        fs_root: VP,
        *,
        ignore_paths: Callable[[VP], bool] | None = None,
    ) -> Iterable[VP]:
        """Yield the matching paths from the directory beneath `fs_root`.

        Nothing is yielded when the directory does not exist or is not a
        directory.
        """
        match_dir = _lookup_path(fs_root, self._directory)
        if match_dir is None or not match_dir.is_dir:
            return
        if self._rule_type == MatchRuleType.ANYTHING_BENEATH_DIR:
            path_iter = match_dir.all_paths()
        else:
            path_iter = match_dir.iterdir
        # The ignore filter is applied before the path type filter.
        if ignore_paths is not None:
            path_iter = (p for p in path_iter if not ignore_paths(p))
        if self._path_type is None:
            yield from path_iter
        else:
            yield from (m for m in path_iter if _match_file_type(self._path_type, m))

    def describe_match_short(self) -> str:
        path_type_match = (
            ""
            if self._path_type is None
            else f" <only for path type {self._path_type.manifest_key}>"
        )
        if self._rule_type == MatchRuleType.ANYTHING_BENEATH_DIR:
            return f"{self._directory}/**/*{path_type_match}"
        return f"{self._directory}/*{path_type_match}"

    def describe_match_exact(self) -> str:
        # Both variants build on describe_match_short() so that a path type
        # restriction is part of the description in either case.
        if self._rule_type == MatchRuleType.ANYTHING_BENEATH_DIR:
            return f"{self.describe_match_short()} (anything below the directory)"
        return f"{self.describe_match_short()} (anything directly in the directory)"

    @property
    def path_type(self) -> PathType | None:
        """The path type the rule is restricted to (None means unrestricted)."""
        return self._path_type

    @property
    def directory(self) -> str:
        """The directory this rule matches in."""
        return self._directory

    def shell_escape_pattern(self) -> str:
        """Return the rule as a shell-escaped pattern.

        Only the direct-children variant is supported; the recursive variant
        defers to the base class, which raises TypeError.
        """
        if self._rule_type == MatchRuleType.ANYTHING_BENEATH_DIR:
            return super().shell_escape_pattern()
        return escape_shell(self._directory.lstrip(".")) + "/*"

342 

343 

class BasenameGlobMatch(MatchRule):
    """Rule matching paths by applying a glob to their basename.

    The search is either limited to the entries of one directory or performed
    recursively, and can optionally be restricted to one path type.
    """

    __slots__ = (
        "_basename_glob",
        "_directory",
        "_matcher",
        "_path_type",
        "_recursive_match",
        "_escaped_basename_pattern",
    )

    def __init__(
        self,
        basename_glob: str,
        only_when_in_directory: str | None = None,
        path_type: PathType | None = None,
        recursive_match: bool | None = None,  # TODO: Can this just be = False (?)
    ) -> None:
        """
        :param basename_glob: The glob for the basename; it must not contain
          "/" or "**".
        :param only_when_in_directory: Directory to search in (without a
          trailing "/"). None means searching from the root, which is always
          recursive.
        :param path_type: Optional restriction on the type of matched paths.
        :param recursive_match: Whether to search beneath the directory rather
          than only directly in it.
        """
        super().__init__(MatchRuleType.BASENAME_GLOB)
        self._basename_glob = basename_glob
        self._directory = only_when_in_directory
        self._path_type = path_type
        self._recursive_match = recursive_match
        if self._directory is None and not recursive_match:
            self._recursive_match = True
        assert self._directory is None or not self._directory.endswith("/")
        assert "/" not in basename_glob  # Not a basename if it contains /
        assert "**" not in basename_glob  # Also not a (true) basename if it has **
        self._escaped_basename_pattern, self._matcher = _compile_basename_glob(
            basename_glob
        )

    def _full_pattern(self) -> str:
        if self._directory is not None:
            maybe_recursive = "**/" if self._recursive_match else ""
            return f"{self._directory}/{maybe_recursive}{self._basename_glob}"
        return self._basename_glob

    def finditer(
        self,
        fs_root: VP,
        *,
        ignore_paths: Callable[[VP], bool] | None = None,
    ) -> Iterable[VP]:
        """Yield the paths whose basename matches the glob.

        Nothing is yielded when a directory was given and it does not exist or
        is not a directory.
        """
        search_root = fs_root
        if self._directory is not None:
            p = _lookup_path(fs_root, self._directory)
            if p is None or not p.is_dir:
                return
            search_root = p
        path_iter = (
            search_root.all_paths() if self._recursive_match else search_root.iterdir
        )
        # The ignore filter is applied first, then the basename matcher and
        # finally the path type check.
        if ignore_paths is not None:
            path_iter = (p for p in path_iter if not ignore_paths(p))
        if self._path_type is None:
            yield from (m for m in path_iter if self._matcher(m.name))
        else:
            yield from (
                m
                for m in path_iter
                if self._matcher(m.name) and _match_file_type(self._path_type, m)
            )

    def describe_match_short(self) -> str:
        path_type_match = (
            ""
            if self._path_type is None
            else f" <only for path type {self._path_type.manifest_key}>"
        )
        return f"{self._full_pattern()}{path_type_match}"

    def describe_match_exact(self) -> str:
        if self._directory is not None:
            return f"{self.describe_match_short()} (glob / directly in the directory)"
        return f"{self.describe_match_short()} (basename match)"

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, BasenameGlobMatch):
            return NotImplemented
        return (
            self._basename_glob == other._basename_glob
            and self._directory == other._directory
            and self._path_type == other._path_type
            and self._recursive_match == other._recursive_match
        )

    def __hash__(self) -> int:
        # Defining __eq__ alone would set __hash__ to None and make instances
        # unhashable. Hash exactly the fields that __eq__ compares.
        return hash(
            (
                self._basename_glob,
                self._directory,
                self._path_type,
                self._recursive_match,
            )
        )

    @property
    def path_type(self) -> PathType | None:
        """The path type the rule is restricted to (None means unrestricted)."""
        return self._path_type

    @property
    def directory(self) -> str | None:
        """The directory the search is limited to (None means the root)."""
        return self._directory

    def shell_escape_pattern(self) -> str:
        """Return the rule as a shell-escaped pattern.

        Defers to the base class (which raises TypeError) when there is no
        directory or the glob has no escaped form.
        """
        if self._directory is None or self._escaped_basename_pattern is None:
            return super().shell_escape_pattern()
        return (
            escape_shell(self._directory.lstrip("."))
            + f"/{self._escaped_basename_pattern}"
        )

449 

450 

class GenericGlobImplementation(MatchRule):
    """Rule for globs with glob characters in more than just the basename.

    The pattern is split on "/" and matched one path segment at a time,
    starting from the root directory.
    """

    __slots__ = "_glob_pattern", "_path_type", "_match_parts"

    def __init__(
        self,
        glob_pattern: str,
        path_type: PathType | None = None,
    ) -> None:
        """
        :param glob_pattern: The glob pattern; a leading "./" is removed. It
          must contain a "/" and glob characters, but no "**".
        :param path_type: Optional restriction on the type of matched paths.
        """
        super().__init__(MatchRuleType.GENERIC_GLOB)
        if glob_pattern.startswith("./"):
            glob_pattern = glob_pattern[2:]
        self._glob_pattern = glob_pattern
        self._path_type = path_type
        assert "**" not in glob_pattern  # No recursive globs
        assert glob.has_magic(
            glob_pattern
        )  # If it has no glob, then it could have been an exact match
        assert (
            "/" in glob_pattern
        )  # If it does not have a / then a BasenameGlob could have been used instead
        self._match_parts = self._compile_glob()

    def _full_pattern(self) -> str:
        return self._glob_pattern

    def finditer(
        self,
        fs_root: VP,
        *,
        ignore_paths: Callable[[VP], bool] | None = None,
    ) -> Iterable[VP]:
        """Yield the paths beneath `fs_root` that match the glob pattern.

        :param fs_root: The root directory to search from.
        :param ignore_paths: Optional predicate; matched paths for which it
          returns True are left out of the result.
        """
        search_history = [fs_root]
        for part in self._match_parts:
            next_layer = itertools.chain.from_iterable(
                _apply_match(m, part) for m in search_history
            )
            # next_layer is lazy and its generator expression only looks up
            # `part` when it is consumed. It must be materialized within this
            # iteration; otherwise every layer would be evaluated with
            # whatever `part` refers to at consumption time.
            search_history = list(next_layer)
            if not search_history:
                # While we have it as a list, we might as well have an "early exit".
                return

        path_type = self._path_type
        for candidate in search_history:
            # The path type check runs before the ignore predicate.
            if path_type is not None and not _match_file_type(path_type, candidate):
                continue
            if ignore_paths is not None and ignore_paths(candidate):
                continue
            yield candidate

    def describe_match_short(self) -> str:
        path_type_match = (
            ""
            if self._path_type is None
            else f" <only for path type {self._path_type.manifest_key}>"
        )
        return f"{self._full_pattern()}{path_type_match}"

    def describe_match_exact(self) -> str:
        return f"{self.describe_match_short()} (glob)"

    def _compile_glob(self) -> Sequence[Callable[[str], bool] | str]:
        """Split the pattern into per-segment matchers.

        Segments without glob characters are kept as strings; the others are
        turned into predicates on the basename.
        """
        assert self._glob_pattern.strip("/") == self._glob_pattern
        return [
            _compile_basename_glob(part)[1] if glob.has_magic(part) else part
            for part in self._glob_pattern.split("/")
        ]

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, GenericGlobImplementation):
            return NotImplemented
        return (
            self._glob_pattern == other._glob_pattern
            and self._path_type == other._path_type
        )

    def __hash__(self) -> int:
        # Defining __eq__ alone would set __hash__ to None and make instances
        # unhashable. Hash exactly the fields that __eq__ compares.
        return hash((self._glob_pattern, self._path_type))

    @property
    def path_type(self) -> PathType | None:
        """The path type the rule is restricted to (None means unrestricted)."""
        return self._path_type