Coverage for src/debputy/path_matcher.py: 72%

280 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2026-02-14 10:41 +0000

1import fnmatch 

2import glob 

3import itertools 

4import os 

5import re 

6from enum import Enum 

7from typing import ( 

8 Optional, 

9 TypeVar, 

10 Union, 

11 Tuple, 

12) 

13from collections.abc import Callable, Iterable, Sequence 

14 

15from debputy.intermediate_manifest import PathType 

16from debputy.plugin.api import VirtualPath 

17from debputy.substitution import Substitution, NULL_SUBSTITUTION 

18from debputy.types import VP 

19from debputy.util import _normalize_path, _error, escape_shell 

20 

# Generic type variable. No definition visible in this module references it.
MR = TypeVar("MR")
# Matches one glob construct: "*", "?", or a bracket expression such as "[abc]"
# (a "]" directly after the opening "[" is accepted as part of the expression).
_GLOB_PARTS = re.compile(r"[*?]|\[]?[^]]+]")

23 

24 

25def _lookup_path(fs_root: VP, path: str) -> VP | None: 

26 if not path.startswith("./"): 26 ↛ 27line 26 didn't jump to line 27 because the condition on line 26 was never true

27 raise ValueError("Directory must be normalized (and not the root directory)") 

28 if not fs_root.is_root_dir(): 28 ↛ 29line 28 didn't jump to line 29 because the condition on line 28 was never true

29 raise ValueError("Provided fs_root must be the root directory") 

30 # TODO: Strictly speaking, this is unsound. (E.g., FSRootDir does not return FSRootDir on a lookup) 

31 return fs_root.lookup(path[2:]) 

32 

33 

34def _compile_basename_glob( 

35 basename_glob: str, 

36) -> tuple[str | None, Callable[[str], bool]]: 

37 remainder = None 

38 if not glob.has_magic(basename_glob): 38 ↛ 39line 38 didn't jump to line 39 because the condition on line 38 was never true

39 return escape_shell(basename_glob), lambda x: x == basename_glob 

40 

41 if basename_glob.startswith("*"): 

42 if basename_glob.endswith("*"): 

43 remainder = basename_glob[1:-1] 

44 possible_quick_match = lambda x: remainder in x 

45 escaped_pattern = "*" + escape_shell(remainder) + "*" 

46 else: 

47 remainder = basename_glob[1:] 

48 possible_quick_match = lambda x: x.endswith(remainder) 

49 escaped_pattern = "*" + escape_shell(remainder) 

50 else: 

51 remainder = basename_glob[:-1] 

52 possible_quick_match = lambda x: x.startswith(remainder) 

53 escaped_pattern = escape_shell(remainder) + "*" 

54 

55 if not glob.has_magic(remainder): 

56 return escaped_pattern, possible_quick_match 

57 slow_pattern = re.compile(fnmatch.translate(basename_glob)) 

58 return None, lambda x: bool(slow_pattern.match(x)) 

59 

60 

61def _apply_match( 

62 fs_path: VP, 

63 match_part: Callable[[str], bool] | str, 

64) -> Iterable[VP]: 

65 if isinstance(match_part, str): 

66 m = fs_path.lookup(match_part) 

67 if m: 

68 yield m 

69 else: 

70 yield from (p for p in fs_path.iterdir() if match_part(p.name)) 

71 

72 

class MatchRuleType(Enum):
    """Identifies which matching strategy a ``MatchRule`` implements."""

    # A single path without any globbing (used by ExactFileSystemPath).
    EXACT_MATCH = "exact"
    # A glob applied to the basename of paths (used by BasenameGlobMatch).
    BASENAME_GLOB = "basename-glob"
    # The direct entries of one directory (used by DirectoryBasedMatch).
    DIRECT_CHILDREN_OF_DIR = "direct-children-of-dir"
    # Every path found via ``all_paths()`` of one directory (used by DirectoryBasedMatch).
    ANYTHING_BENEATH_DIR = "anything-beneath-dir"
    # A multi-segment glob matched segment by segment (used by GenericGlobImplementation).
    GENERIC_GLOB = "generic-glob"
    # Matches every path (used by the MATCH_ANYTHING singleton).
    MATCH_ANYTHING = "match-anything"

80 

81 

class MatchRule:
    """Base class for rules that select paths from a virtual file system.

    Subclasses implement ``finditer`` (the actual search), ``_full_pattern``
    and ``describe_match_exact``. The class methods
    ``recursive_beneath_directory`` and ``from_path_or_glob`` pick the
    concrete subclass for a given directory or pattern.
    """

    __slots__ = ("_rule_type",)

    def __init__(self, rule_type: MatchRuleType) -> None:
        self._rule_type = rule_type

    @property
    def rule_type(self) -> MatchRuleType:
        """The kind of match this rule performs."""
        return self._rule_type

    def finditer(
        self,
        fs_root: VP,
        *,
        ignore_paths: Callable[[VP], bool] | None = None,
    ) -> Iterable[VP]:
        """Yield the paths beneath ``fs_root`` that this rule matches.

        :param fs_root: The root directory to search from.
        :param ignore_paths: Optional predicate; paths for which it returns
          true are left out of the result.
        :raises NotImplementedError: Always; subclasses must override this.
        """
        # TODO: Strictly speaking, this is unsound. (E.g., FSRootDir does not return FSRootDir on a lookup)
        raise NotImplementedError

    def _full_pattern(self) -> str:
        """Return the pattern of this rule as a string (subclass hook)."""
        raise NotImplementedError

    @property
    def path_type(self) -> PathType | None:
        """The path type the rule is limited to; ``None`` in this base class."""
        return None

    def describe_match_short(self) -> str:
        """Return a short description of the rule (by default, its full pattern)."""
        return self._full_pattern()

    def describe_match_exact(self) -> str:
        """Return a description that also states how the rule matches (subclass hook)."""
        raise NotImplementedError

    def shell_escape_pattern(self) -> str:
        """Return the rule as a shell-escaped pattern.

        :raises TypeError: In this base implementation; subclasses override
          it when they can express themselves as a shell pattern.
        """
        raise TypeError("Pattern not suitable or not supported for shell escape")

    @classmethod
    def recursive_beneath_directory(
        cls,
        directory: str,
        definition_source: str,
        path_type: PathType | None = None,
        substitution: Substitution = NULL_SUBSTITUTION,
    ) -> "MatchRule":
        """Create a rule matching everything beneath ``directory``.

        :param directory: The directory to match beneath; it must not
          contain glob characters. ``.`` and ``/`` give ``MATCH_ANYTHING``.
        :param definition_source: Passed on to ``substitution.substitute``.
        :param path_type: Optional path type to limit the match to.
        :param substitution: Applied to the normalized directory.
        :return: ``MATCH_ANYTHING`` or an ``ANYTHING_BENEATH_DIR`` rule.
        """
        if directory in (".", "/"):
            return MATCH_ANYTHING
        assert not glob.has_magic(directory)
        return DirectoryBasedMatch(
            MatchRuleType.ANYTHING_BENEATH_DIR,
            substitution.substitute(_normalize_path(directory), definition_source),
            path_type=path_type,
        )

    @classmethod
    def from_path_or_glob(
        cls,
        path_or_glob: str,
        definition_source: str,
        path_type: PathType | None = None,
        substitution: Substitution = NULL_SUBSTITUTION,
    ) -> "MatchRule":
        """Create the most specific rule that can handle ``path_or_glob``.

        The pattern is classified in this order: match-anything, exact
        path, direct children of a directory, basename glob (recursive or
        limited to one directory), and finally a generic glob.

        :param path_or_glob: The path or glob pattern to create a rule for.
        :param definition_source: Where the pattern was defined; used in
          error messages and passed on to ``substitution.substitute``.
        :param path_type: Optional path type to limit the match to. It must
          be ``None`` for match-anything and exact-path patterns (asserted).
        :param substitution: Applied to the parts of the pattern that end up
          in the returned rule (not applied on the generic glob path).
        :return: The rule for the pattern.
        :raises ValueError: If ``**`` is used in any form other than a
          leading ``**/`` directly before the basename glob.
        """
        # TODO: Handle '{a,b,c}' patterns too
        # FIXME: Better error handling!
        normalized_no_prefix = _normalize_path(path_or_glob, with_prefix=False)
        if path_or_glob in ("*", "**/*", ".", "/"):
            assert path_type is None
            return MATCH_ANYTHING

        # We do not support {a,b} at the moment. This check is not perfect, but it should catch the most obvious
        # unsupported usage.
        if (
            "{" in path_or_glob
            and ("," in path_or_glob or ".." in path_or_glob)
            and re.search(r"[{][^},.]*(?:,|[.][.])[^},.]*[}]", path_or_glob)
        ):
            m = re.search(r"(.*)[{]([^},.]*(?:,|[.][.])[^},.]*[}])", path_or_glob)
            assert m is not None
            # Suggest the pattern with the opening brace replaced by a substitution variable.
            replacement = m.group(1) + "{{OPEN_CURLY_BRACE}}" + m.group(2)
            _error(
                f'The pattern "{path_or_glob}" (defined in {definition_source}) looks like it contains a'
                f' brace expansion (such as "{{a,b}}" or "{{a..b}}"). Brace expansions are not supported.'
                " If you wanted to match the literal path with a brace in it, please use a substitution to insert"
                f' the opening brace. As an example: "{replacement}"'
            )

        normalized_with_prefix = "./" + normalized_no_prefix
        # TODO: Check for escapes here "foo[?]/bar" can be written as an exact match for foo?/bar
        # - similar holds for "foo[?]/*" being a directory match (etc.).
        if not glob.has_magic(normalized_with_prefix):
            assert path_type is None
            return ExactFileSystemPath(
                substitution.substitute(normalized_with_prefix, definition_source)
            )

        directory = os.path.dirname(normalized_with_prefix)
        basename = os.path.basename(normalized_with_prefix)

        if ("**" in directory and directory != "./**") or "**" in basename:
            raise ValueError(
                f'Cannot process pattern "{path_or_glob}" from {definition_source}: The double-star'
                ' glob ("**") is not supported in general. Only "**/<basename-glob>" supported.'
            )

        if basename == "*" and not glob.has_magic(directory):
            return DirectoryBasedMatch(
                MatchRuleType.DIRECT_CHILDREN_OF_DIR,
                substitution.substitute(directory, definition_source),
                path_type=path_type,
            )
        elif directory == "./**" or not glob.has_magic(directory):
            basename_glob = substitution.substitute(
                basename, definition_source, escape_glob_characters=True
            )
            # A bare basename glob ("foo*") and "**/foo*" both match at any depth.
            if directory in (".", "./**"):
                return BasenameGlobMatch(
                    basename_glob,
                    path_type=path_type,
                    recursive_match=True,
                )
            return BasenameGlobMatch(
                basename_glob,
                only_when_in_directory=substitution.substitute(
                    directory, definition_source
                ),
                path_type=path_type,
                recursive_match=False,
            )

        # The directory part contains glob characters as well.
        return GenericGlobImplementation(normalized_with_prefix, path_type=path_type)

210 

211 

def _match_file_type(path_type: PathType, path: VirtualPath) -> bool:
    """Tell whether ``path`` is of the kind named by ``path_type``.

    :param path_type: One of ``PathType.FILE``, ``PathType.DIRECTORY`` or
      ``PathType.SYMLINK``; any other value trips an assertion.
    :param path: The path to inspect.
    :return: ``True`` when the attribute of ``path`` that corresponds to
      ``path_type`` (``is_file``, ``is_dir`` or ``is_symlink``) is truthy.
    """
    if path_type == PathType.FILE:
        is_wanted_kind = path.is_file
    elif path_type == PathType.DIRECTORY:
        is_wanted_kind = path.is_dir
    elif path_type == PathType.SYMLINK:
        is_wanted_kind = path.is_symlink
    else:
        # Only the three path types above are supported here.
        assert path_type in (PathType.FILE, PathType.DIRECTORY, PathType.SYMLINK)
        is_wanted_kind = False
    return True if is_wanted_kind else False

221 

222 

class MatchAnything(MatchRule):
    """Rule that matches every path of the file system."""

    def __init__(self) -> None:
        super().__init__(MatchRuleType.MATCH_ANYTHING)

    def _full_pattern(self) -> str:
        return "**/*"

    def finditer(
        self, fs_root: VP, *, ignore_paths: Callable[[VP], bool] | None = None
    ) -> Iterable[VP]:
        """Yield every path returned by ``fs_root.all_paths()``.

        :param fs_root: The root directory to search from.
        :param ignore_paths: Optional predicate; paths for which it returns
          true are left out of the result.
        :return: All paths, minus the ignored ones when a predicate is given.
        """
        if ignore_paths is None:
            yield from fs_root.all_paths()
        else:
            # The two cases are mutually exclusive: yielding the unfiltered
            # paths after the filtered ones would both duplicate entries and
            # bring back the paths that were meant to be ignored.
            yield from (p for p in fs_root.all_paths() if not ignore_paths(p))

    def describe_match_exact(self) -> str:
        return "**/* (Match anything)"

239 

240 

# The one shared instance of the match-anything rule.
MATCH_ANYTHING: MatchRule = MatchAnything()

# Drop the class name from the module namespace so that the instance above
# is the only way to reach the rule from here on.
del MatchAnything

244 

245 

class ExactFileSystemPath(MatchRule):
    """Rule that matches one specific path (no globbing involved)."""

    __slots__ = "_path"

    def __init__(self, path: str) -> None:
        super().__init__(MatchRuleType.EXACT_MATCH)
        self._path = path

    @property
    def path(self) -> str:
        """The path this rule matches."""
        return self._path

    def _full_pattern(self) -> str:
        return self._path

    def finditer(
        self, fs_root: VP, *, ignore_paths: Callable[[VP], bool] | None = None
    ) -> Iterable[VP]:
        """Yield the path if it exists beneath ``fs_root`` and is not ignored."""
        found = _lookup_path(fs_root, self._path)
        if found is None:
            return
        if ignore_paths is not None and ignore_paths(found):
            return
        yield found

    def describe_match_exact(self) -> str:
        return f"{self._path} (the exact path / no globbing)"

    def shell_escape_pattern(self) -> str:
        """Return the shell-escaped path with its leading dots stripped."""
        without_leading_dots = self._path.lstrip(".")
        return escape_shell(without_leading_dots)

272 

273 

class DirectoryBasedMatch(MatchRule):
    """Rule that matches the content of one directory.

    Depending on the rule type, it covers either the direct entries of the
    directory (``iterdir()``) or everything returned by its ``all_paths()``.
    An optional path type narrows the match further.
    """

    __slots__ = "_directory", "_path_type"

    def __init__(
        self,
        rule_type: MatchRuleType,
        directory: str,
        path_type: PathType | None = None,
    ) -> None:
        super().__init__(rule_type)
        self._directory = directory
        self._path_type = path_type
        supported_rule_types = (
            MatchRuleType.DIRECT_CHILDREN_OF_DIR,
            MatchRuleType.ANYTHING_BENEATH_DIR,
        )
        assert rule_type in supported_rule_types
        assert not self._directory.endswith("/")

    @property
    def path_type(self) -> PathType | None:
        """The path type the match is limited to, if any."""
        return self._path_type

    @property
    def directory(self) -> str:
        """The directory whose content is matched."""
        return self._directory

    def _full_pattern(self) -> str:
        return self._directory

    def finditer(
        self,
        fs_root: VP,
        *,
        ignore_paths: Callable[[VP], bool] | None = None,
    ) -> Iterable[VP]:
        """Yield the matching entries of the directory.

        Nothing is yielded when the directory does not exist or is not a
        directory. Ignored paths are filtered out before the path type
        is checked.
        """
        base_dir = _lookup_path(fs_root, self._directory)
        if base_dir is None or not base_dir.is_dir:
            return
        recursive = self._rule_type == MatchRuleType.ANYTHING_BENEATH_DIR
        candidates = base_dir.all_paths() if recursive else base_dir.iterdir()
        wanted_type = self._path_type
        for candidate in candidates:
            if ignore_paths is not None and ignore_paths(candidate):
                continue
            if wanted_type is not None and not _match_file_type(
                wanted_type, candidate
            ):
                continue
            yield candidate

    def describe_match_short(self) -> str:
        if self._path_type is None:
            type_suffix = ""
        else:
            type_suffix = f" <only for path type {self._path_type.manifest_key}>"
        wildcard = (
            "/**/*"
            if self._rule_type == MatchRuleType.ANYTHING_BENEATH_DIR
            else "/*"
        )
        return f"{self._directory}{wildcard}{type_suffix}"

    def describe_match_exact(self) -> str:
        if self._rule_type != MatchRuleType.ANYTHING_BENEATH_DIR:
            return f"{self.describe_match_short()} (anything directly in the directory)"
        return f"{self._directory}/**/* (anything below the directory)"

    def shell_escape_pattern(self) -> str:
        """Return ``<escaped directory>/*``; recursive rules defer to the base class."""
        if self._rule_type != MatchRuleType.ANYTHING_BENEATH_DIR:
            return escape_shell(self._directory.lstrip(".")) + "/*"
        return super().shell_escape_pattern()

342 

343 

class BasenameGlobMatch(MatchRule):
    """Rule that matches paths by applying a glob to their basename.

    The search covers either the whole tree (recursive match) or only the
    direct entries of the search root. The search root is the given
    directory, or the file system root when no directory is given.
    """

    __slots__ = (
        "_basename_glob",
        "_directory",
        "_matcher",
        "_path_type",
        "_recursive_match",
        "_escaped_basename_pattern",
    )

    def __init__(
        self,
        basename_glob: str,
        only_when_in_directory: str | None = None,
        path_type: PathType | None = None,
        recursive_match: bool | None = None,  # TODO: Can this just be = False (?)
    ) -> None:
        super().__init__(MatchRuleType.BASENAME_GLOB)
        self._basename_glob = basename_glob
        self._directory = only_when_in_directory
        self._path_type = path_type
        # Without a directory to limit the match to, the match is always recursive.
        forced_recursive = only_when_in_directory is None and not recursive_match
        self._recursive_match = True if forced_recursive else recursive_match
        assert self._directory is None or not self._directory.endswith("/")
        assert "/" not in basename_glob  # Not a basename if it contains /
        assert "**" not in basename_glob  # Also not a (true) basename if it has **
        escaped_pattern, matcher = _compile_basename_glob(basename_glob)
        self._escaped_basename_pattern = escaped_pattern
        self._matcher = matcher

    @property
    def path_type(self) -> PathType | None:
        """The path type the match is limited to, if any."""
        return self._path_type

    @property
    def directory(self) -> str | None:
        """The directory the match is limited to, if any."""
        return self._directory

    def _full_pattern(self) -> str:
        if self._directory is None:
            return self._basename_glob
        recursion_marker = "**/" if self._recursive_match else ""
        return f"{self._directory}/{recursion_marker}{self._basename_glob}"

    def finditer(
        self,
        fs_root: VP,
        *,
        ignore_paths: Callable[[VP], bool] | None = None,
    ) -> Iterable[VP]:
        """Yield the paths whose basename matches the glob.

        Nothing is yielded when the rule is limited to a directory that does
        not exist or is not a directory. Each candidate is checked against
        ``ignore_paths`` first, then the glob, then the path type.
        """
        start_dir = fs_root
        if self._directory is not None:
            start_dir = _lookup_path(fs_root, self._directory)
            if start_dir is None or not start_dir.is_dir:
                return
        candidates = (
            start_dir.all_paths() if self._recursive_match else start_dir.iterdir()
        )
        wanted_type = self._path_type
        for candidate in candidates:
            if ignore_paths is not None and ignore_paths(candidate):
                continue
            if not self._matcher(candidate.name):
                continue
            if wanted_type is not None and not _match_file_type(
                wanted_type, candidate
            ):
                continue
            yield candidate

    def describe_match_short(self) -> str:
        if self._path_type is None:
            return self._full_pattern()
        type_suffix = f" <only for path type {self._path_type.manifest_key}>"
        return f"{self._full_pattern()}{type_suffix}"

    def describe_match_exact(self) -> str:
        if self._directory is None:
            return f"{self.describe_match_short()} (basename match)"
        return f"{self.describe_match_short()} (glob / directly in the directory)"

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, BasenameGlobMatch):
            return NotImplemented
        if self._basename_glob != other._basename_glob:
            return False
        if self._directory != other._directory:
            return False
        if self._path_type != other._path_type:
            return False
        return self._recursive_match == other._recursive_match

    def shell_escape_pattern(self) -> str:
        """Return ``<escaped directory>/<escaped glob>``.

        Defers to the base class (which raises) when the rule has no
        directory or the glob has no shell-escaped form.
        """
        if self._directory is not None and self._escaped_basename_pattern is not None:
            escaped_directory = escape_shell(self._directory.lstrip("."))
            return escaped_directory + f"/{self._escaped_basename_pattern}"
        return super().shell_escape_pattern()

449 

450 

class GenericGlobImplementation(MatchRule):
    """Rule for multi-segment globs that no specialised rule can handle.

    The pattern is split on ``/`` and matched one segment at a time:
    literal segments become lookups, segments with glob characters become
    basename matchers. The recursive ``**`` glob is not supported here.
    """

    __slots__ = "_glob_pattern", "_path_type", "_match_parts"

    def __init__(
        self,
        glob_pattern: str,
        path_type: PathType | None = None,
    ) -> None:
        super().__init__(MatchRuleType.GENERIC_GLOB)
        glob_pattern = glob_pattern.removeprefix("./")
        self._glob_pattern = glob_pattern
        self._path_type = path_type
        # No recursive globs
        assert "**" not in glob_pattern
        # If it has no glob, then it could have been an exact match
        assert glob.has_magic(glob_pattern)
        # If it does not have a / then a BasenameGlob could have been used instead
        assert "/" in glob_pattern
        self._match_parts = self._compile_glob()

    @property
    def path_type(self) -> PathType | None:
        """The path type the match is limited to, if any."""
        return self._path_type

    def _full_pattern(self) -> str:
        return self._glob_pattern

    def _compile_glob(self) -> Sequence[Callable[[str], bool] | str]:
        """Turn the pattern into one matcher (or literal name) per segment."""
        assert self._glob_pattern.strip("/") == self._glob_pattern
        compiled: list[Callable[[str], bool] | str] = []
        for segment in self._glob_pattern.split("/"):
            if glob.has_magic(segment):
                compiled.append(_compile_basename_glob(segment)[1])
            else:
                compiled.append(segment)
        return compiled

    def finditer(
        self,
        fs_root: VP,
        *,
        ignore_paths: Callable[[VP], bool] | None = None,
    ) -> Iterable[VP]:
        """Yield the paths matching the full glob.

        The segments are applied layer by layer, starting at ``fs_root``.
        The path type is checked before ``ignore_paths`` on the final layer.
        """
        current_layer = [fs_root]
        for part in self._match_parts:
            # Each layer is built eagerly. A lazily evaluated layer would
            # look up the loop variable (and the previous layer) only when
            # consumed, at which point both may already have been rebound.
            current_layer = [
                hit for node in current_layer for hit in _apply_match(node, part)
            ]
            if not current_layer:
                # Nothing matched this segment, so nothing can match the rest.
                return

        wanted_type = self._path_type
        for candidate in current_layer:
            if wanted_type is not None and not _match_file_type(
                wanted_type, candidate
            ):
                continue
            if ignore_paths is not None and ignore_paths(candidate):
                continue
            yield candidate

    def describe_match_short(self) -> str:
        if self._path_type is None:
            return self._full_pattern()
        type_suffix = f" <only for path type {self._path_type.manifest_key}>"
        return f"{self._full_pattern()}{type_suffix}"

    def describe_match_exact(self) -> str:
        return f"{self.describe_match_short()} (glob)"

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, GenericGlobImplementation):
            return NotImplemented
        same_pattern = self._glob_pattern == other._glob_pattern
        return same_pattern and self._path_type == other._path_type