Coverage for src/debputy/path_matcher.py: 72%
280 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
1import fnmatch
2import glob
3import itertools
4import os
5import re
6from enum import Enum
7from typing import (
8 Optional,
9 TypeVar,
10 Union,
11 Tuple,
12)
13from collections.abc import Callable, Iterable, Sequence
15from debputy.intermediate_manifest import PathType
16from debputy.plugin.api import VirtualPath
17from debputy.substitution import Substitution, NULL_SUBSTITUTION
18from debputy.types import VP
19from debputy.util import _normalize_path, _error, escape_shell
# Generic type variable. It is not referenced in the part of the module shown here.
MR = TypeVar("MR")
# Matches one glob token: "*", "?" or a bracket expression such as "[abc]" or "[]abc]".
# NOTE(review): no use of this pattern is visible in this part of the file - confirm it is still needed.
_GLOB_PARTS = re.compile(r"[*?]|\[]?[^]]+]")
25def _lookup_path(fs_root: VP, path: str) -> VP | None:
26 if not path.startswith("./"): 26 ↛ 27line 26 didn't jump to line 27 because the condition on line 26 was never true
27 raise ValueError("Directory must be normalized (and not the root directory)")
28 if fs_root.name != "." or fs_root.parent_dir is not None: 28 ↛ 29line 28 didn't jump to line 29 because the condition on line 28 was never true
29 raise ValueError("Provided fs_root must be the root directory")
30 # TODO: Strictly speaking, this is unsound. (E.g., FSRootDir does not return FSRootDir on a lookup)
31 return fs_root.lookup(path[2:])
34def _compile_basename_glob(
35 basename_glob: str,
36) -> tuple[str | None, Callable[[str], bool]]:
37 remainder = None
38 if not glob.has_magic(basename_glob): 38 ↛ 39line 38 didn't jump to line 39 because the condition on line 38 was never true
39 return escape_shell(basename_glob), lambda x: x == basename_glob
41 if basename_glob.startswith("*"):
42 if basename_glob.endswith("*"):
43 remainder = basename_glob[1:-1]
44 possible_quick_match = lambda x: remainder in x
45 escaped_pattern = "*" + escape_shell(remainder) + "*"
46 else:
47 remainder = basename_glob[1:]
48 possible_quick_match = lambda x: x.endswith(remainder)
49 escaped_pattern = "*" + escape_shell(remainder)
50 else:
51 remainder = basename_glob[:-1]
52 possible_quick_match = lambda x: x.startswith(remainder)
53 escaped_pattern = escape_shell(remainder) + "*"
55 if not glob.has_magic(remainder):
56 return escaped_pattern, possible_quick_match
57 slow_pattern = re.compile(fnmatch.translate(basename_glob))
58 return None, lambda x: bool(slow_pattern.match(x))
61def _apply_match(
62 fs_path: VP,
63 match_part: Callable[[str], bool] | str,
64) -> Iterable[VP]:
65 if isinstance(match_part, str):
66 m = fs_path.lookup(match_part)
67 if m:
68 yield m
69 else:
70 yield from (p for p in fs_path.iterdir if match_part(p.name))
class MatchRuleType(Enum):
    """The kinds of match rules; each ``MatchRule`` exposes one via ``rule_type``."""

    # A single path without glob characters (used by ExactFileSystemPath).
    EXACT_MATCH = "exact"
    # A glob applied to the basename of paths (used by BasenameGlobMatch).
    BASENAME_GLOB = "basename-glob"
    # The entries directly inside one directory (used by DirectoryBasedMatch).
    DIRECT_CHILDREN_OF_DIR = "direct-children-of-dir"
    # Everything below one directory (used by DirectoryBasedMatch).
    ANYTHING_BENEATH_DIR = "anything-beneath-dir"
    # A glob with glob characters in its directory part (used by GenericGlobImplementation).
    GENERIC_GLOB = "generic-glob"
    # Matches every path (used by the MATCH_ANYTHING rule).
    MATCH_ANYTHING = "match-anything"
class MatchRule:
    """Base class of all path match rules.

    A rule has a ``rule_type`` and enumerates the paths it matches via ``finditer``.
    The base class cannot match anything itself: ``finditer``, ``_full_pattern`` and
    ``describe_match_exact`` raise ``NotImplementedError`` until a subclass overrides
    them. Use ``from_path_or_glob`` or ``recursive_beneath_directory`` to obtain a
    rule for a given pattern.
    """

    __slots__ = ("_rule_type",)

    def __init__(self, rule_type: MatchRuleType) -> None:
        self._rule_type = rule_type

    @property
    def rule_type(self) -> MatchRuleType:
        """The kind of rule this instance was created as."""
        return self._rule_type

    def finditer(
        self,
        fs_root: VP,
        *,
        ignore_paths: Callable[[VP], bool] | None = None,
    ) -> Iterable[VP]:
        """Yield the paths matched by this rule (to be implemented by subclasses).

        :param fs_root: The root directory to search from.
        :param ignore_paths: Optional predicate; paths for which it returns a truthy
          value are to be left out of the result.
        """
        # TODO: Strictly speaking, this is unsound. (E.g., FSRootDir does not return FSRootDir on a lookup)
        raise NotImplementedError

    def _full_pattern(self) -> str:
        """Return the pattern of this rule as a string (to be implemented by subclasses)."""
        raise NotImplementedError

    @property
    def path_type(self) -> PathType | None:
        """The path type the rule is restricted to; the base implementation has none."""
        return None

    def describe_match_short(self) -> str:
        """Return a short description of the rule; by default the full pattern."""
        return self._full_pattern()

    def describe_match_exact(self) -> str:
        """Return a detailed description of the rule (to be implemented by subclasses)."""
        raise NotImplementedError

    def shell_escape_pattern(self) -> str:
        """Return the pattern in shell-escaped form.

        :raises TypeError: Always in the base implementation; subclasses override
          this when they can provide such a pattern.
        """
        raise TypeError("Pattern not suitable or not supported for shell escape")

    @classmethod
    def recursive_beneath_directory(
        cls,
        directory: str,
        definition_source: str,
        path_type: PathType | None = None,
        substitution: Substitution = NULL_SUBSTITUTION,
    ) -> "MatchRule":
        """Create a rule that matches everything beneath ``directory``.

        :param directory: The directory; it must not contain glob characters.
          "." and "/" select the ``MATCH_ANYTHING`` rule.
        :param definition_source: Where the pattern was defined; passed on to the
          substitution.
        :param path_type: Optional path type to restrict the matches to.
        :param substitution: Applied to the normalized directory.
        :return: ``MATCH_ANYTHING`` or a ``DirectoryBasedMatch`` of type
          ``ANYTHING_BENEATH_DIR``.
        """
        if directory in (".", "/"):
            return MATCH_ANYTHING
        assert not glob.has_magic(directory)
        return DirectoryBasedMatch(
            MatchRuleType.ANYTHING_BENEATH_DIR,
            substitution.substitute(_normalize_path(directory), definition_source),
            path_type=path_type,
        )

    @classmethod
    def from_path_or_glob(
        cls,
        path_or_glob: str,
        definition_source: str,
        path_type: PathType | None = None,
        substitution: Substitution = NULL_SUBSTITUTION,
    ) -> "MatchRule":
        """Create the most specific rule that can handle ``path_or_glob``.

        The selection works as follows:

        * "*", "**/*", "." and "/" give ``MATCH_ANYTHING``.
        * A pattern without glob characters gives an ``ExactFileSystemPath``.
        * "<literal-dir>/*" gives a ``DirectoryBasedMatch`` for the direct children.
        * "**/<basename-glob>" and "<literal-dir>/<basename-glob>" give a
          ``BasenameGlobMatch``.
        * Anything else gives a ``GenericGlobImplementation``.

        :param path_or_glob: The path or glob pattern.
        :param definition_source: Where the pattern was defined; used in error
          messages and passed on to the substitution.
        :param path_type: Optional path type to restrict the matches to. It must be
          ``None`` for the "match anything" and exact path cases.
        :param substitution: Applied to the literal parts of the pattern.
        :raises ValueError: If "**" is used anywhere but as "**/<basename-glob>".
        """
        # TODO: Handle '{a,b,c}' patterns too
        # FIXME: Better error handling!
        normalized_no_prefix = _normalize_path(path_or_glob, with_prefix=False)
        if path_or_glob in ("*", "**/*", ".", "/"):
            assert path_type is None
            return MATCH_ANYTHING

        # We do not support {a,b} at the moment. This check is not perfect, but it should catch the most obvious
        # unsupported usage.
        if (
            "{" in path_or_glob
            and ("," in path_or_glob or ".." in path_or_glob)
            and re.search(r"[{][^},.]*(?:,|[.][.])[^},.]*[}]", path_or_glob)
        ):
            m = re.search(r"(.*)[{]([^},.]*(?:,|[.][.])[^},.]*[}])", path_or_glob)
            assert m is not None
            replacement = m.group(1) + "{{OPEN_CURLY_BRACE}}" + m.group(2)
            # The example braces live in a plain (non-f) string literal: inside an
            # f-string they would be parsed as replacement fields.
            _error(
                f'The pattern "{path_or_glob}" (defined in {definition_source}) looks like it contains a'
                ' brace expansion (such as "{a,b}" or "{a..b}"). Brace expansions are not supported.'
                " If you wanted to match the literal path with a brace in it, please use a substitution to insert"
                f' the opening brace. As an example: "{replacement}"'
            )

        normalized_with_prefix = "./" + normalized_no_prefix
        # TODO: Check for escapes here  "foo[?]/bar" can be written as an exact match for foo?/bar
        #       - similar holds for "foo[?]/*" being a directory match (etc.).
        if not glob.has_magic(normalized_with_prefix):
            assert path_type is None
            return ExactFileSystemPath(
                substitution.substitute(normalized_with_prefix, definition_source)
            )

        directory = os.path.dirname(normalized_with_prefix)
        basename = os.path.basename(normalized_with_prefix)

        if ("**" in directory and directory != "./**") or "**" in basename:
            raise ValueError(
                f'Cannot process pattern "{path_or_glob}" from {definition_source}: The double-star'
                ' glob ("**") is not supported in general.  Only "**/<basename-glob>" supported.'
            )

        if basename == "*" and not glob.has_magic(directory):
            return DirectoryBasedMatch(
                MatchRuleType.DIRECT_CHILDREN_OF_DIR,
                substitution.substitute(directory, definition_source),
                path_type=path_type,
            )
        elif directory == "./**" or not glob.has_magic(directory):
            basename_glob = substitution.substitute(
                basename, definition_source, escape_glob_characters=True
            )
            if directory in (".", "./**"):
                return BasenameGlobMatch(
                    basename_glob,
                    path_type=path_type,
                    recursive_match=True,
                )
            return BasenameGlobMatch(
                basename_glob,
                only_when_in_directory=substitution.substitute(
                    directory, definition_source
                ),
                path_type=path_type,
                recursive_match=False,
            )

        return GenericGlobImplementation(normalized_with_prefix, path_type=path_type)
def _match_file_type(path_type: PathType, path: VirtualPath) -> bool:
    """Tell whether ``path`` is of the kind described by ``path_type``.

    :param path_type: One of ``PathType.FILE``, ``PathType.DIRECTORY`` or
      ``PathType.SYMLINK``; any other value trips an assertion.
    :param path: The path to inspect (via ``is_file``, ``is_dir`` or ``is_symlink``).
    :return: ``True`` if the path has the requested type, otherwise ``False``.
    """
    if path_type == PathType.FILE:
        return bool(path.is_file)
    if path_type == PathType.DIRECTORY:
        return bool(path.is_dir)
    if path_type == PathType.SYMLINK:
        return bool(path.is_symlink)
    # Only reached for a path type that this function does not know about.
    assert path_type in (PathType.FILE, PathType.DIRECTORY, PathType.SYMLINK)
    return False
class MatchAnything(MatchRule):
    """Rule that matches every path returned by ``fs_root.all_paths()``."""

    def __init__(self) -> None:
        super().__init__(MatchRuleType.MATCH_ANYTHING)

    def _full_pattern(self) -> str:
        return "**/*"

    def finditer(
        self, fs_root: VP, *, ignore_paths: Callable[[VP], bool] | None = None
    ) -> Iterable[VP]:
        """Yield every path of ``fs_root.all_paths()`` that is not ignored.

        :param fs_root: The root directory to enumerate.
        :param ignore_paths: Optional predicate; paths for which it returns a truthy
          value are skipped.
        """
        if ignore_paths is None:
            yield from fs_root.all_paths()
        else:
            # The two cases are mutually exclusive. Falling through to the unfiltered
            # case would yield every path a second time, ignored paths included.
            yield from (p for p in fs_root.all_paths() if not ignore_paths(p))

    def describe_match_exact(self) -> str:
        return "**/* (Match anything)"
# Shared instance of the "match anything" rule; the factory methods of MatchRule return it.
MATCH_ANYTHING: MatchRule = MatchAnything()

# Remove the class name from the module namespace; MATCH_ANYTHING is to be used instead.
del MatchAnything
class ExactFileSystemPath(MatchRule):
    """Rule that matches one concrete path; no globbing is involved."""

    __slots__ = "_path"

    def __init__(self, path: str) -> None:
        super().__init__(MatchRuleType.EXACT_MATCH)
        self._path = path

    @property
    def path(self) -> str:
        """The path this rule matches."""
        return self._path

    def _full_pattern(self) -> str:
        return self._path

    def describe_match_exact(self) -> str:
        return f"{self._path} (the exact path / no globbing)"

    def shell_escape_pattern(self) -> str:
        """Return the path, without its leading dots, passed through ``escape_shell``."""
        without_leading_dots = self._path.lstrip(".")
        return escape_shell(without_leading_dots)

    def finditer(
        self, fs_root: VP, *, ignore_paths: Callable[[VP], bool] | None = None
    ) -> Iterable[VP]:
        """Yield the path if it exists beneath ``fs_root`` and is not ignored.

        :param fs_root: The root directory to look the path up in.
        :param ignore_paths: Optional predicate; the path is skipped if it returns
          a truthy value for it.
        """
        found = _lookup_path(fs_root, self._path)
        if found is None:
            return
        if ignore_paths is not None and ignore_paths(found):
            return
        yield found
class DirectoryBasedMatch(MatchRule):
    """Rule that matches the content of one directory.

    Depending on the rule type, the match covers either only the entries directly
    inside the directory or everything beneath it. The matches can be restricted
    to one path type.
    """

    __slots__ = "_directory", "_path_type"

    def __init__(
        self,
        rule_type: MatchRuleType,
        directory: str,
        path_type: PathType | None = None,
    ) -> None:
        """
        :param rule_type: ``DIRECT_CHILDREN_OF_DIR`` or ``ANYTHING_BENEATH_DIR``.
        :param directory: The directory to match in; must not end with "/".
        :param path_type: Optional path type to restrict the matches to.
        """
        super().__init__(rule_type)
        self._directory = directory
        self._path_type = path_type
        assert rule_type in (
            MatchRuleType.DIRECT_CHILDREN_OF_DIR,
            MatchRuleType.ANYTHING_BENEATH_DIR,
        )
        assert not self._directory.endswith("/")

    @property
    def path_type(self) -> PathType | None:
        """The path type the matches are restricted to (if any)."""
        return self._path_type

    @property
    def directory(self) -> str:
        """The directory this rule matches in."""
        return self._directory

    def _full_pattern(self) -> str:
        return self._directory

    def finditer(
        self,
        fs_root: VP,
        *,
        ignore_paths: Callable[[VP], bool] | None = None,
    ) -> Iterable[VP]:
        """Yield the matching entries of the directory.

        Nothing is yielded if the directory does not exist or is not a directory.

        :param fs_root: The root directory to look the directory up in.
        :param ignore_paths: Optional predicate; paths for which it returns a truthy
          value are skipped.
        """
        base_dir = _lookup_path(fs_root, self._directory)
        if base_dir is None or not base_dir.is_dir:
            return
        beneath = self._rule_type == MatchRuleType.ANYTHING_BENEATH_DIR
        candidates = base_dir.all_paths() if beneath else base_dir.iterdir
        for candidate in candidates:
            if ignore_paths is not None and ignore_paths(candidate):
                continue
            if self._path_type is not None and not _match_file_type(
                self._path_type, candidate
            ):
                continue
            yield candidate

    def describe_match_short(self) -> str:
        type_note = ""
        if self._path_type is not None:
            type_note = f" <only for path type {self._path_type.manifest_key}>"
        if self._rule_type == MatchRuleType.ANYTHING_BENEATH_DIR:
            wildcard = "/**/*"
        else:
            wildcard = "/*"
        return f"{self._directory}{wildcard}{type_note}"

    def describe_match_exact(self) -> str:
        if self._rule_type != MatchRuleType.ANYTHING_BENEATH_DIR:
            return f"{self.describe_match_short()} (anything directly in the directory)"
        return f"{self._directory}/**/* (anything below the directory)"

    def shell_escape_pattern(self) -> str:
        """Return "<escaped directory>/*" for the direct-children variant.

        The anything-beneath variant defers to the base class implementation.
        """
        if self._rule_type == MatchRuleType.ANYTHING_BENEATH_DIR:
            return super().shell_escape_pattern()
        escaped_directory = escape_shell(self._directory.lstrip("."))
        return escaped_directory + "/*"
class BasenameGlobMatch(MatchRule):
    """Rule that matches paths by applying a glob to their basename.

    The search covers either the entries directly inside a directory or all paths
    beneath the search root, and can be restricted to one path type.
    """

    __slots__ = (
        "_basename_glob",
        "_directory",
        "_matcher",
        "_path_type",
        "_recursive_match",
        "_escaped_basename_pattern",
    )

    def __init__(
        self,
        basename_glob: str,
        only_when_in_directory: str | None = None,
        path_type: PathType | None = None,
        recursive_match: bool | None = None,  # TODO: Can this just be = False (?)
    ) -> None:
        """
        :param basename_glob: The glob for the basename; must contain neither "/"
          nor "**".
        :param only_when_in_directory: Directory to search in (must not end with
          "/"); ``None`` searches from the root.
        :param path_type: Optional path type to restrict the matches to.
        :param recursive_match: Whether to search all paths beneath the search root
          rather than only its direct entries. Forced to ``True`` when no directory
          is given.
        """
        super().__init__(MatchRuleType.BASENAME_GLOB)
        self._basename_glob = basename_glob
        self._directory = only_when_in_directory
        self._path_type = path_type
        # Without a directory to anchor the match, the search is always recursive.
        forced_recursive = only_when_in_directory is None and not recursive_match
        self._recursive_match = True if forced_recursive else recursive_match
        assert only_when_in_directory is None or not only_when_in_directory.endswith(
            "/"
        )
        assert "/" not in basename_glob  # Not a basename if it contains /
        assert "**" not in basename_glob  # Also not a (true) basename if it has **
        escaped_pattern, matcher = _compile_basename_glob(basename_glob)
        self._escaped_basename_pattern = escaped_pattern
        self._matcher = matcher

    @property
    def path_type(self) -> PathType | None:
        """The path type the matches are restricted to (if any)."""
        return self._path_type

    @property
    def directory(self) -> str | None:
        """The directory the search is limited to (if any)."""
        return self._directory

    def _full_pattern(self) -> str:
        if self._directory is None:
            return self._basename_glob
        recursion_marker = "**/" if self._recursive_match else ""
        return f"{self._directory}/{recursion_marker}{self._basename_glob}"

    def finditer(
        self,
        fs_root: VP,
        *,
        ignore_paths: Callable[[VP], bool] | None = None,
    ) -> Iterable[VP]:
        """Yield the paths whose basename matches the glob.

        Nothing is yielded if a directory was given but it does not exist or is not
        a directory.

        :param fs_root: The root directory to search from.
        :param ignore_paths: Optional predicate; paths for which it returns a truthy
          value are skipped.
        """
        if self._directory is None:
            search_root = fs_root
        else:
            search_root = _lookup_path(fs_root, self._directory)
            if search_root is None or not search_root.is_dir:
                return
        if self._recursive_match:
            candidates = search_root.all_paths()
        else:
            candidates = search_root.iterdir
        for candidate in candidates:
            if ignore_paths is not None and ignore_paths(candidate):
                continue
            if not self._matcher(candidate.name):
                continue
            if self._path_type is not None and not _match_file_type(
                self._path_type, candidate
            ):
                continue
            yield candidate

    def describe_match_short(self) -> str:
        description = self._full_pattern()
        if self._path_type is not None:
            description += f" <only for path type {self._path_type.manifest_key}>"
        return description

    def describe_match_exact(self) -> str:
        if self._directory is None:
            return f"{self.describe_match_short()} (basename match)"
        return f"{self.describe_match_short()} (glob / directly in the directory)"

    # NOTE: A class that defines __eq__ without __hash__ gets __hash__ set to None,
    # so instances of this class are not hashable.
    def __eq__(self, other: object) -> bool:
        if not isinstance(other, BasenameGlobMatch):
            return NotImplemented
        mine = (
            self._basename_glob,
            self._directory,
            self._path_type,
            self._recursive_match,
        )
        theirs = (
            other._basename_glob,
            other._directory,
            other._path_type,
            other._recursive_match,
        )
        return mine == theirs

    def shell_escape_pattern(self) -> str:
        """Return "<escaped directory>/<escaped basename pattern>".

        Defers to the base class implementation when there is no directory or no
        escaped form of the basename glob.
        """
        if self._directory is None or self._escaped_basename_pattern is None:
            return super().shell_escape_pattern()
        escaped_directory = escape_shell(self._directory.lstrip("."))
        return escaped_directory + "/" + self._escaped_basename_pattern
class GenericGlobImplementation(MatchRule):
    """Rule for globs with glob characters in their directory part.

    The pattern is split on "/" and matched one path segment at a time: literal
    segments are looked up directly, the others are matched against the names of
    the directory entries.
    """

    __slots__ = "_glob_pattern", "_path_type", "_match_parts"

    def __init__(
        self,
        glob_pattern: str,
        path_type: PathType | None = None,
    ) -> None:
        """
        :param glob_pattern: The glob; a leading "./" is removed. It must contain a
          "/" and glob characters, but no "**".
        :param path_type: Optional path type to restrict the matches to.
        """
        super().__init__(MatchRuleType.GENERIC_GLOB)
        glob_pattern = glob_pattern.removeprefix("./")
        self._glob_pattern = glob_pattern
        self._path_type = path_type
        assert "**" not in glob_pattern  # No recursive globs
        assert glob.has_magic(
            glob_pattern
        )  # If it has no glob, then it could have been an exact match
        assert (
            "/" in glob_pattern
        )  # If it does not have a / then a BasenameGlob could have been used instead
        self._match_parts = self._compile_glob()

    @property
    def path_type(self) -> PathType | None:
        """The path type the matches are restricted to (if any)."""
        return self._path_type

    def _full_pattern(self) -> str:
        return self._glob_pattern

    def _compile_glob(self) -> Sequence[Callable[[str], bool] | str]:
        """Split the pattern on "/" into literal names and name matchers."""
        assert self._glob_pattern.strip("/") == self._glob_pattern
        compiled_parts: list[Callable[[str], bool] | str] = []
        for segment in self._glob_pattern.split("/"):
            if glob.has_magic(segment):
                _, matcher = _compile_basename_glob(segment)
                compiled_parts.append(matcher)
            else:
                compiled_parts.append(segment)
        return compiled_parts

    def finditer(
        self,
        fs_root: VP,
        *,
        ignore_paths: Callable[[VP], bool] | None = None,
    ) -> Iterable[VP]:
        """Yield the paths that match the glob.

        :param fs_root: The root directory to search from.
        :param ignore_paths: Optional predicate; matches for which it returns a
          truthy value are skipped.
        """
        current_layer = [fs_root]
        for part in self._match_parts:
            # Each layer has to be materialized before "part" moves on: a lazy
            # generator would only look "part" up when it is finally consumed
            # (closures bind late) and would then see a later segment.
            current_layer = [
                hit for node in current_layer for hit in _apply_match(node, part)
            ]
            if not current_layer:
                # While we have it as a list, we might as well have an "early exit".
                return

        for match in current_layer:
            if self._path_type is not None and not _match_file_type(
                self._path_type, match
            ):
                continue
            if ignore_paths is not None and ignore_paths(match):
                continue
            yield match

    def describe_match_short(self) -> str:
        description = self._full_pattern()
        if self._path_type is not None:
            description += f" <only for path type {self._path_type.manifest_key}>"
        return description

    def describe_match_exact(self) -> str:
        return f"{self.describe_match_short()} (glob)"

    # NOTE: A class that defines __eq__ without __hash__ gets __hash__ set to None,
    # so instances of this class are not hashable.
    def __eq__(self, other: object) -> bool:
        if not isinstance(other, GenericGlobImplementation):
            return NotImplemented
        if self._glob_pattern != other._glob_pattern:
            return False
        return self._path_type == other._path_type