From a844a9bac32583911ed842229b3e6a8070c05fb6 Mon Sep 17 00:00:00 2001 From: atollk Date: Sat, 24 Jul 2021 11:22:13 +0200 Subject: [PATCH 1/2] Added filter_glob and exclude_glob to fs.walk.Walker. These extend the class by an option to include/exclude resources by their entire path, not just its last component. To do so, fs.wildcard had to undergo a rework to remove the dependency on the `re` module. Unit tests were added for all new/changed code. --- CHANGELOG.md | 9 ++ fs/base.py | 61 ++++++++- fs/errors.py | 17 +++ fs/glob.py | 179 ++++++++++++++++++++++++- fs/walk.py | 62 +++++++-- fs/wildcard.py | 14 +- tests/test_glob.py | 7 +- tests/test_walk.py | 319 ++++++++++++++++++++++++++++++--------------- 8 files changed, 536 insertions(+), 132 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 267c942b..2c370c9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,15 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## Unreleased +### Added + +- To `fs.walk.Walker`, added parameters `filter_glob` and `exclude_glob`. + Closes [#459](https://github.com/PyFilesystem/pyfilesystem2/issues/459). + +### Fixed +- Elaborated documentation of `filter_dirs` and `exclude_dirs` in `fs.walk.Walker`. + Closes [#371](https://github.com/PyFilesystem/pyfilesystem2/issues/371). + ## [2.4.16] - 2022-05-02 diff --git a/fs/base.py b/fs/base.py index 07a16756..dfbc6106 100644 --- a/fs/base.py +++ b/fs/base.py @@ -21,7 +21,7 @@ from contextlib import closing from functools import partial, wraps -from . import copy, errors, fsencode, iotools, tools, walk, wildcard +from . import copy, errors, fsencode, iotools, tools, walk, wildcard, glob from .copy import copy_modified_time from .glob import BoundGlobber from .mode import validate_open_mode @@ -1653,8 +1653,8 @@ def check(self): if self.isclosed(): raise errors.FilesystemClosed() - def match(self, patterns, name): - # type: (Optional[Iterable[Text]], Text) -> bool + def match(self, patterns, name, accept_prefix=False): + # type: (Optional[Iterable[Text]], Text, bool) -> bool """Check if a name matches any of a list of wildcards. If a filesystem is case *insensitive* (such as Windows) then @@ -1696,6 +1696,61 @@ def match(self, patterns, name): matcher = wildcard.get_matcher(patterns, case_sensitive) return matcher(name) + def match_glob(self, patterns, path, accept_prefix=False): + # type: (Optional[Iterable[Text]], Text, bool) -> bool + """Check if a path matches any of a list of glob patterns. + + If a filesystem is case *insensitive* (such as Windows) then + this method will perform a case insensitive match (i.e. ``*.py`` + will match the same names as ``*.PY``). Otherwise the match will + be case sensitive (``*.py`` and ``*.PY`` will match different + names). + + Arguments: + patterns (list, optional): A list of patterns, e.g. + ``['*.py']``, or `None` to match everything. + path (str): A resource path, starting with "/". + accept_prefix (bool): If ``True``, the path is + not required to match the wildcards themselves + but only need to be a prefix of a string that does. + + Returns: + bool: `True` if ``path`` matches any of the patterns. + + Raises: + TypeError: If ``patterns`` is a single string instead of + a list (or `None`). + ValueError: If ``path`` is not a string starting with "/". 
+ + Example: + >>> my_fs.match_glob(['*.py'], '/__init__.py') + True + >>> my_fs.match_glob(['*.jpg', '*.png'], '/foo.gif') + False + >>> my_fs.match_glob(['dir/file.txt'], '/dir/', accept_prefix=True) + True + >>> my_fs.match_glob(['dir/file.txt'], '/dir/gile.txt', accept_prefix=True) + False + + Note: + If ``patterns`` is `None` (or ``['*']``), then this + method will always return `True`. + + """ + if patterns is None: + return True + if not path or path[0] != "/": + raise ValueError("%s needs to be a string starting with /" % path) + if isinstance(patterns, six.text_type): + raise TypeError("patterns must be a list or sequence") + case_sensitive = not typing.cast( + bool, self.getmeta().get("case_insensitive", False) + ) + matcher = glob.get_matcher( + patterns, case_sensitive, accept_prefix=accept_prefix + ) + return matcher(path) + def tree(self, **kwargs): # type: (**Any) -> None """Render a tree view of the filesystem to stdout or a file. diff --git a/fs/errors.py b/fs/errors.py index 400bac76..32c795d9 100644 --- a/fs/errors.py +++ b/fs/errors.py @@ -41,6 +41,7 @@ "OperationFailed", "OperationTimeout", "PathError", + "PatternError", "PermissionDenied", "RemoteConnectionError", "RemoveRootError", @@ -346,3 +347,19 @@ class UnsupportedHash(ValueError): not supported by hashlib. """ + + +class PatternError(ValueError): + """A string pattern with invalid syntax was given.""" + + default_message = "pattern '{pattern}' is invalid at position {position}" + + def __init__(self, pattern, position, exc=None, msg=None): # noqa: D107 + # type: (Text, int, Optional[Exception], Optional[Text]) -> None + self.pattern = pattern + self.position = position + self.exc = exc + super(ValueError, self).__init__() + + def __reduce__(self): + return type(self), (self.path, self.position, self.exc, self._msg) diff --git a/fs/glob.py b/fs/glob.py index cd6473d2..82eb0228 100644 --- a/fs/glob.py +++ b/fs/glob.py @@ -4,22 +4,31 @@ from __future__ import unicode_literals import typing +from functools import partial import re from collections import namedtuple -from . import wildcard from ._repr import make_repr from .lrucache import LRUCache from .path import iteratepath + GlobMatch = namedtuple("GlobMatch", ["path", "info"]) Counts = namedtuple("Counts", ["files", "directories", "data"]) LineCounts = namedtuple("LineCounts", ["lines", "non_blank"]) if typing.TYPE_CHECKING: - from typing import Iterator, List, Optional, Pattern, Text, Tuple - + from typing import ( + Iterator, + List, + Optional, + Pattern, + Text, + Tuple, + Iterable, + Callable, + ) from .base import FS @@ -28,17 +37,87 @@ ) # type: LRUCache[Tuple[Text, bool], Tuple[int, bool, Pattern]] +def _split_pattern_by_rec(pattern): + # type: (Text) -> List[Text] + """Split a glob pattern at its directory seperators (/). + + Takes into account escaped cases like [/]. + """ + indices = [-1] + bracket_open = False + for i, c in enumerate(pattern): + if c == "/" and not bracket_open: + indices.append(i) + elif c == "[": + bracket_open = True + elif c == "]": + bracket_open = False + + indices.append(len(pattern)) + return [pattern[i + 1 : j] for i, j in zip(indices[:-1], indices[1:])] + + +def _translate(pattern, case_sensitive=True): + # type: (Text, bool) -> Text + """Translate a wildcard pattern to a regular expression. + + There is no way to quote meta-characters. + Arguments: + pattern (str): A wildcard pattern. + case_sensitive (bool): Set to `False` to use a case + insensitive regex (default `True`). 
+ + Returns: + str: A regex equivalent to the given pattern. + + """ + if not case_sensitive: + pattern = pattern.lower() + i, n = 0, len(pattern) + res = [] + while i < n: + c = pattern[i] + i = i + 1 + if c == "*": + res.append("[^/]*") + elif c == "?": + res.append("[^/]") + elif c == "[": + j = i + if j < n and pattern[j] == "!": + j = j + 1 + if j < n and pattern[j] == "]": + j = j + 1 + while j < n and pattern[j] != "]": + j = j + 1 + if j >= n: + res.append("\\[") + else: + stuff = pattern[i:j].replace("\\", "\\\\") + i = j + 1 + if stuff[0] == "!": + stuff = "^" + stuff[1:] + elif stuff[0] == "^": + stuff = "\\" + stuff + res.append("[%s]" % stuff) + else: + res.append(re.escape(c)) + return "".join(res) + + def _translate_glob(pattern, case_sensitive=True): levels = 0 recursive = False re_patterns = [""] for component in iteratepath(pattern): - if component == "**": - re_patterns.append(".*/?") + if "**" in component: recursive = True + split = component.split("**") + split_re = [_translate(s, case_sensitive=case_sensitive) for s in split] + re_patterns.append("/?" + ".*/?".join(split_re)) else: re_patterns.append( - "/" + wildcard._translate(component, case_sensitive=case_sensitive) + "/" + _translate(component, case_sensitive=case_sensitive) ) levels += 1 re_glob = "(?ms)^" + "".join(re_patterns) + ("/$" if pattern.endswith("/") else "$") @@ -72,6 +151,8 @@ def match(pattern, path): except KeyError: levels, recursive, re_pattern = _translate_glob(pattern, case_sensitive=True) _PATTERN_CACHE[(pattern, True)] = (levels, recursive, re_pattern) + if path and path[0] != "/": + path = "/" + path return bool(re_pattern.match(path)) @@ -92,9 +173,95 @@ def imatch(pattern, path): except KeyError: levels, recursive, re_pattern = _translate_glob(pattern, case_sensitive=True) _PATTERN_CACHE[(pattern, False)] = (levels, recursive, re_pattern) + if path and path[0] != "/": + path = "/" + path return bool(re_pattern.match(path)) +def match_any(patterns, path): + # type: (Iterable[Text], Text) -> bool + """Test if a path matches any of a list of patterns. + + Will return `True` if ``patterns`` is an empty list. + + Arguments: + patterns (list): A list of wildcard pattern, e.g ``["*.py", + "*.pyc"]`` + name (str): A filename. + + Returns: + bool: `True` if the path matches at least one of the patterns. + + """ + if not patterns: + return True + return any(match(pattern, path) for pattern in patterns) + + +def imatch_any(patterns, path): + # type: (Iterable[Text], Text) -> bool + """Test if a path matches any of a list of patterns (case insensitive). + + Will return `True` if ``patterns`` is an empty list. + + Arguments: + patterns (list): A list of wildcard pattern, e.g ``["*.py", + "*.pyc"]`` + name (str): A filename. + + Returns: + bool: `True` if the path matches at least one of the patterns. + + """ + if not patterns: + return True + return any(imatch(pattern, path) for pattern in patterns) + + +def get_matcher(patterns, case_sensitive, accept_prefix=False): + # type: (Iterable[Text], bool, bool) -> Callable[[Text], bool] + """Get a callable that matches paths against the given patterns. + + Arguments: + patterns (list): A list of wildcard pattern. e.g. ``["*.py", + "*.pyc"]`` + case_sensitive (bool): If ``True``, then the callable will be case + sensitive, otherwise it will be case insensitive. + accept_prefix (bool): If ``True``, the name is + not required to match the wildcards themselves + but only need to be a prefix of a string that does. 
+ + Returns: + callable: a matcher that will return `True` if the paths given as + an argument matches any of the given patterns. + + Example: + >>> from fs import wildcard + >>> is_python = wildcard.get_matcher(['*.py'], True) + >>> is_python('__init__.py') + True + >>> is_python('foo.txt') + False + + """ + if not patterns: + return lambda name: True + + if accept_prefix: + new_patterns = [] + for pattern in patterns: + split = _split_pattern_by_rec(pattern) + for i in range(1, len(split)): + new_pattern = "/".join(split[:i]) + new_patterns.append(new_pattern) + new_patterns.append(new_pattern + "/") + new_patterns.append(pattern) + patterns = new_patterns + + matcher = match_any if case_sensitive else imatch_any + return partial(matcher, patterns) + + class Globber(object): """A generator of glob results.""" diff --git a/fs/walk.py b/fs/walk.py index 31dc6b0e..1724dfcc 100644 --- a/fs/walk.py +++ b/fs/walk.py @@ -60,6 +60,8 @@ def __init__( filter_dirs=None, # type: Optional[List[Text]] exclude_dirs=None, # type: Optional[List[Text]] max_depth=None, # type: Optional[int] + filter_glob=None, # type: Optional[List[Text]] + exclude_glob=None, # type: Optional[List[Text]] ): # type: (...) -> None """Create a new `Walker` instance. @@ -83,11 +85,22 @@ def __init__( any of these patterns will be removed from the walk. filter_dirs (list, optional): A list of patterns that will be used to match directories paths. The walk will only open directories - that match at least one of these patterns. + that match at least one of these patterns. Directories will + only be returned if the final component matches one of the + patterns. exclude_dirs (list, optional): A list of patterns that will be used to filter out directories from the walk. e.g. - ``['*.svn', '*.git']``. + ``['*.svn', '*.git']``. Directories matching any of these + patterns will be removed from the walk. max_depth (int, optional): Maximum directory depth to walk. + filter_glob (list, optional): If supplied, this parameter + should be a list of path patterns e.g. ``["foo/**/*.py"]``. + Resources will only be returned if their global path or + an extension of it matches one of the patterns. + exclude_glob (list, optional): If supplied, this parameter + should be a list of path patterns e.g. ``["foo/**/*.py"]``. + Resources will not be returned if their global path or + an extension of it matches one of the patterns. 
""" if search not in ("breadth", "depth"): @@ -107,6 +120,8 @@ def __init__( self.exclude = exclude self.filter_dirs = filter_dirs self.exclude_dirs = exclude_dirs + self.filter_glob = filter_glob + self.exclude_glob = exclude_glob self.max_depth = max_depth super(Walker, self).__init__() @@ -178,6 +193,8 @@ def __repr__(self): filter_dirs=(self.filter_dirs, None), exclude_dirs=(self.exclude_dirs, None), max_depth=(self.max_depth, None), + filter_glob=(self.filter_glob, None), + exclude_glob=(self.exclude_glob, None), ) def _iter_walk( @@ -196,10 +213,19 @@ def _iter_walk( def _check_open_dir(self, fs, path, info): # type: (FS, Text, Info) -> bool """Check if a directory should be considered in the walk.""" + full_path = ("" if path == "/" else path) + "/" + info.name if self.exclude_dirs is not None and fs.match(self.exclude_dirs, info.name): return False + if self.exclude_glob is not None and fs.match_glob( + self.exclude_glob, full_path + ): + return False if self.filter_dirs is not None and not fs.match(self.filter_dirs, info.name): return False + if self.filter_glob is not None and not fs.match_glob( + self.filter_glob, full_path, accept_prefix=True + ): + return False return self.check_open_dir(fs, path, info) def check_open_dir(self, fs, path, info): @@ -245,6 +271,26 @@ def check_scan_dir(self, fs, path, info): """ return True + def _check_file(self, fs, dir_path, info): + # type: (FS, Text, Info) -> bool + """Check if a filename should be included.""" + # Weird check required for backwards compatibility, + # when _check_file did not exist. + if Walker._check_file == type(self)._check_file: + if self.exclude is not None and fs.match(self.exclude, info.name): + return False + if self.exclude_glob is not None and fs.match_glob( + self.exclude_glob, dir_path + "/" + info.name + ): + return False + if self.filter is not None and not fs.match(self.filter, info.name): + return False + if self.filter_glob is not None and not fs.match_glob( + self.filter_glob, dir_path + "/" + info.name, accept_prefix=True + ): + return False + return self.check_file(fs, info) + def check_file(self, fs, info): # type: (FS, Info) -> bool """Check if a filename should be included. @@ -259,9 +305,7 @@ def check_file(self, fs, info): bool: `True` if the file should be included. 
""" - if self.exclude is not None and fs.match(self.exclude, info.name): - return False - return fs.match(self.filter, info.name) + return True def _scan( self, @@ -418,7 +462,7 @@ def _walk_breadth( _calculate_depth = self._calculate_depth _check_open_dir = self._check_open_dir _check_scan_dir = self._check_scan_dir - _check_file = self.check_file + _check_file = self._check_file depth = _calculate_depth(path) @@ -432,7 +476,7 @@ def _walk_breadth( if _check_scan_dir(fs, dir_path, info, _depth): push(_combine(dir_path, info.name)) else: - if _check_file(fs, info): + if _check_file(fs, dir_path, info): yield dir_path, info # Found a file yield dir_path, None # End of directory @@ -451,7 +495,7 @@ def _walk_depth( _calculate_depth = self._calculate_depth _check_open_dir = self._check_open_dir _check_scan_dir = self._check_scan_dir - _check_file = self.check_file + _check_file = self._check_file depth = _calculate_depth(path) stack = [ @@ -483,7 +527,7 @@ def _walk_depth( else: yield dir_path, info else: - if _check_file(fs, info): + if _check_file(fs, dir_path, info): yield dir_path, info diff --git a/fs/wildcard.py b/fs/wildcard.py index 7a34d6b7..cc6c0530 100644 --- a/fs/wildcard.py +++ b/fs/wildcard.py @@ -147,14 +147,14 @@ def _translate(pattern, case_sensitive=True): if not case_sensitive: pattern = pattern.lower() i, n = 0, len(pattern) - res = "" + res = [] while i < n: c = pattern[i] i = i + 1 if c == "*": - res = res + "[^/]*" + res.append("[^/]*") elif c == "?": - res = res + "." + res.append(".") elif c == "[": j = i if j < n and pattern[j] == "!": @@ -164,7 +164,7 @@ def _translate(pattern, case_sensitive=True): while j < n and pattern[j] != "]": j = j + 1 if j >= n: - res = res + "\\[" + res.append("\\[") else: stuff = pattern[i:j].replace("\\", "\\\\") i = j + 1 @@ -172,7 +172,7 @@ def _translate(pattern, case_sensitive=True): stuff = "^" + stuff[1:] elif stuff[0] == "^": stuff = "\\" + stuff - res = "%s[%s]" % (res, stuff) + res.append("[%s]" % stuff) else: - res = res + re.escape(c) - return res + res.append(re.escape(c)) + return "".join(res) diff --git a/tests/test_glob.py b/tests/test_glob.py index 51eb7779..6147bf49 100644 --- a/tests/test_glob.py +++ b/tests/test_glob.py @@ -21,6 +21,7 @@ def test_match(self): tests = [ ("*.?y", "/test.py", True), ("*.py", "/test.py", True), + ("*.py", "__init__.py", True), ("*.py", "/test.pc", False), ("*.py", "/foo/test.py", False), ("foo/*.py", "/foo/test.py", True), @@ -28,21 +29,23 @@ def test_match(self): ("?oo/*.py", "/foo/test.py", True), ("*/*.py", "/foo/test.py", True), ("foo/*.py", "/bar/foo/test.py", False), + ("/foo/**", "/foo/test.py", True), ("**/foo/*.py", "/bar/foo/test.py", True), ("foo/**/bar/*.py", "/foo/bar/test.py", True), ("foo/**/bar/*.py", "/foo/baz/egg/bar/test.py", True), ("foo/**/bar/*.py", "/foo/baz/egg/bar/egg/test.py", False), ("**", "/test.py", True), + ("/**", "/test.py", True), ("**", "/test", True), ("**", "/test/", True), ("**/", "/test/", True), ("**/", "/test.py", False), ] for pattern, path, expected in tests: - self.assertEqual(glob.match(pattern, path), expected) + self.assertEqual(glob.match(pattern, path), expected, msg=(pattern, path)) # Run a second time to test cache for pattern, path, expected in tests: - self.assertEqual(glob.match(pattern, path), expected) + self.assertEqual(glob.match(pattern, path), expected, msg=(pattern, path)) def test_count_1dir(self): globber = glob.BoundGlobber(self.fs) diff --git a/tests/test_walk.py b/tests/test_walk.py index 5e05d054..e0146f2d 100644 --- 
a/tests/test_walk.py +++ b/tests/test_walk.py @@ -21,9 +21,28 @@ def test_create(self): walk.Walker(ignore_errors=True, on_error=lambda path, error: True) walk.Walker(ignore_errors=True) + def test_on_error_invalid(self): + with self.assertRaises(TypeError): + walk.Walker(on_error="nope") + -class TestWalk(unittest.TestCase): +class TestBoundWalkerBase(unittest.TestCase): def setUp(self): + """ + Sets up the following file system with empty files: + + / + -foo1/ + - -top1.txt + - -top2.txt + -foo2/ + - -bar1/ + - -bar2/ + - - -bar3/ + - - - -test.txt + - -top3.bin + -foo3/ + """ self.fs = MemoryFS() self.fs.makedir("foo1") @@ -37,21 +56,50 @@ def setUp(self): self.fs.create("foo2/bar2/bar3/test.txt") self.fs.create("foo2/top3.bin") - def test_invalid(self): - with self.assertRaises(ValueError): - self.fs.walk(search="random") +class TestBoundWalker(TestBoundWalkerBase): def test_repr(self): repr(self.fs.walk) - def test_walk(self): + def test_readonly_wrapper_uses_same_walker(self): + class CustomWalker(walk.Walker): + @classmethod + def bind(cls, fs): + return walk.BoundWalker(fs, walker_class=CustomWalker) + + class CustomizedMemoryFS(MemoryFS): + walker_class = CustomWalker + + base_fs = CustomizedMemoryFS() + base_walker = base_fs.walk + self.assertEqual(base_walker.walker_class, CustomWalker) + + readonly_fs = read_only(CustomizedMemoryFS()) + readonly_walker = readonly_fs.walk + self.assertEqual(readonly_walker.walker_class, CustomWalker) + + +class TestWalk(TestBoundWalkerBase): + def _walk_step_names(self, *args, **kwargs): + """Performs a walk with the given arguments and returns a list of steps. + + Each step is a triple of the path, list of directory names, and list of file names. + """ _walk = [] - for step in self.fs.walk(): + for step in self.fs.walk(*args, **kwargs): self.assertIsInstance(step, walk.Step) path, dirs, files = step _walk.append( (path, [info.name for info in dirs], [info.name for info in files]) ) + return _walk + + def test_invalid_search(self): + with self.assertRaises(ValueError): + self.fs.walk(search="random") + + def test_walk_simple(self): + _walk = self._walk_step_names() expected = [ ("/", ["foo1", "foo2", "foo3"], []), ("/foo1", ["bar1"], ["top1.txt", "top2.txt"]), @@ -63,14 +111,34 @@ def test_walk(self): ] self.assertEqual(_walk, expected) + def test_walk_filter(self): + _walk = self._walk_step_names(filter=["top*.txt"]) + expected = [ + ("/", ["foo1", "foo2", "foo3"], []), + ("/foo1", ["bar1"], ["top1.txt", "top2.txt"]), + ("/foo2", ["bar2"], []), + ("/foo3", [], []), + ("/foo1/bar1", [], []), + ("/foo2/bar2", ["bar3"], []), + ("/foo2/bar2/bar3", [], []), + ] + self.assertEqual(_walk, expected) + + def test_walk_exclude(self): + _walk = self._walk_step_names(exclude=["top*"]) + expected = [ + ("/", ["foo1", "foo2", "foo3"], []), + ("/foo1", ["bar1"], []), + ("/foo2", ["bar2"], []), + ("/foo3", [], []), + ("/foo1/bar1", [], []), + ("/foo2/bar2", ["bar3"], []), + ("/foo2/bar2/bar3", [], ["test.txt"]), + ] + self.assertEqual(_walk, expected) + def test_walk_filter_dirs(self): - _walk = [] - for step in self.fs.walk(filter_dirs=["foo*"]): - self.assertIsInstance(step, walk.Step) - path, dirs, files = step - _walk.append( - (path, [info.name for info in dirs], [info.name for info in files]) - ) + _walk = self._walk_step_names(filter_dirs=["foo*"]) expected = [ ("/", ["foo1", "foo2", "foo3"], []), ("/foo1", [], ["top1.txt", "top2.txt"]), @@ -79,14 +147,65 @@ def test_walk_filter_dirs(self): ] self.assertEqual(_walk, expected) + def 
test_walk_filter_glob_1(self): + _walk = self._walk_step_names(filter_glob=["/foo*/bar*/"]) + expected = [ + ("/", ["foo1", "foo2", "foo3"], []), + ("/foo1", ["bar1"], []), + ("/foo2", ["bar2"], []), + ("/foo3", [], []), + ("/foo1/bar1", [], []), + ("/foo2/bar2", [], []), + ] + self.assertEqual(_walk, expected) + + def test_walk_filter_glob_2(self): + _walk = self._walk_step_names(filter_glob=["/foo*/bar**"]) + expected = [ + ("/", ["foo1", "foo2", "foo3"], []), + ("/foo1", ["bar1"], []), + ("/foo2", ["bar2"], []), + ("/foo3", [], []), + ("/foo1/bar1", [], []), + ("/foo2/bar2", ["bar3"], []), + ("/foo2/bar2/bar3", [], ["test.txt"]), + ] + self.assertEqual(_walk, expected) + + def test_walk_filter_mix(self): + _walk = self._walk_step_names(filter_glob=["/foo2/bar**"], filter=["top1.txt"]) + expected = [ + ("/", ["foo2"], []), + ("/foo2", ["bar2"], []), + ("/foo2/bar2", ["bar3"], []), + ("/foo2/bar2/bar3", [], []), + ] + self.assertEqual(_walk, expected) + + def test_walk_exclude_dirs(self): + _walk = self._walk_step_names(exclude_dirs=["bar*", "foo2"]) + expected = [ + ("/", ["foo1", "foo3"], []), + ("/foo1", [], ["top1.txt", "top2.txt"]), + ("/foo3", [], []), + ] + self.assertEqual(_walk, expected) + + def test_walk_exclude_glob(self): + _walk = self._walk_step_names(exclude_glob=["**/top*", "test.txt"]) + expected = [ + ("/", ["foo1", "foo2", "foo3"], []), + ("/foo1", ["bar1"], []), + ("/foo2", ["bar2"], []), + ("/foo3", [], []), + ("/foo1/bar1", [], []), + ("/foo2/bar2", ["bar3"], []), + ("/foo2/bar2/bar3", [], ["test.txt"]), + ] + self.assertEqual(_walk, expected) + def test_walk_depth(self): - _walk = [] - for step in self.fs.walk(search="depth"): - self.assertIsInstance(step, walk.Step) - path, dirs, files = step - _walk.append( - (path, [info.name for info in dirs], [info.name for info in files]) - ) + _walk = self._walk_step_names(search="depth") expected = [ ("/foo1/bar1", [], []), ("/foo1", ["bar1"], ["top1.txt", "top2.txt"]), @@ -98,14 +217,8 @@ def test_walk_depth(self): ] self.assertEqual(_walk, expected) - def test_walk_directory(self): - _walk = [] - for step in self.fs.walk("foo2"): - self.assertIsInstance(step, walk.Step) - path, dirs, files = step - _walk.append( - (path, [info.name for info in dirs], [info.name for info in files]) - ) + def test_walk_path(self): + _walk = self._walk_step_names("foo2") expected = [ ("/foo2", ["bar2"], ["top3.bin"]), ("/foo2/bar2", ["bar3"], []), @@ -113,34 +226,22 @@ def test_walk_directory(self): ] self.assertEqual(_walk, expected) - def test_walk_levels_1(self): - results = list(self.fs.walk(max_depth=1)) - self.assertEqual(len(results), 1) - dirs = sorted(info.name for info in results[0].dirs) - self.assertEqual(dirs, ["foo1", "foo2", "foo3"]) - files = sorted(info.name for info in results[0].files) - self.assertEqual(files, []) + def test_walk_max_depth_1_breadth(self): + _walk = self._walk_step_names(max_depth=1, search="breadth") + expected = [ + ("/", ["foo1", "foo2", "foo3"], []), + ] + self.assertEqual(_walk, expected) - def test_walk_levels_1_depth(self): - results = list(self.fs.walk(max_depth=1, search="depth")) - self.assertEqual(len(results), 1) - dirs = sorted(info.name for info in results[0].dirs) - self.assertEqual(dirs, ["foo1", "foo2", "foo3"]) - files = sorted(info.name for info in results[0].files) - self.assertEqual(files, []) + def test_walk_max_depth_1_depth(self): + _walk = self._walk_step_names(max_depth=1, search="depth") + expected = [ + ("/", ["foo1", "foo2", "foo3"], []), + ] + self.assertEqual(_walk, 
expected) - def test_walk_levels_2(self): - _walk = [] - for step in self.fs.walk(max_depth=2): - self.assertIsInstance(step, walk.Step) - path, dirs, files = step - _walk.append( - ( - path, - sorted(info.name for info in dirs), - sorted(info.name for info in files), - ) - ) + def test_walk_max_depth_2(self): + _walk = self._walk_step_names(max_depth=2) expected = [ ("/", ["foo1", "foo2", "foo3"], []), ("/foo1", ["bar1"], ["top1.txt", "top2.txt"]), @@ -149,6 +250,30 @@ def test_walk_levels_2(self): ] self.assertEqual(_walk, expected) + +class TestDirs(TestBoundWalkerBase): + def test_walk_dirs(self): + dirs = list(self.fs.walk.dirs()) + self.assertEqual( + dirs, + ["/foo1", "/foo2", "/foo3", "/foo1/bar1", "/foo2/bar2", "/foo2/bar2/bar3"], + ) + + dirs = list(self.fs.walk.dirs(search="depth")) + self.assertEqual( + dirs, + ["/foo1/bar1", "/foo1", "/foo2/bar2/bar3", "/foo2/bar2", "/foo2", "/foo3"], + ) + + dirs = list(self.fs.walk.dirs(search="depth", exclude_dirs=["foo2"])) + self.assertEqual(dirs, ["/foo1/bar1", "/foo1", "/foo3"]) + + def test_foo(self): + dirs = list(self.fs.walk.dirs(search="depth", exclude_dirs=["foo2"])) + self.assertEqual(dirs, ["/foo1/bar1", "/foo1", "/foo3"]) + + +class TestFiles(TestBoundWalkerBase): def test_walk_files(self): files = list(self.fs.walk.files()) @@ -173,22 +298,6 @@ def test_walk_files(self): ], ) - def test_walk_dirs(self): - dirs = list(self.fs.walk.dirs()) - self.assertEqual( - dirs, - ["/foo1", "/foo2", "/foo3", "/foo1/bar1", "/foo2/bar2", "/foo2/bar2/bar3"], - ) - - dirs = list(self.fs.walk.dirs(search="depth")) - self.assertEqual( - dirs, - ["/foo1/bar1", "/foo1", "/foo2/bar2/bar3", "/foo2/bar2", "/foo2", "/foo3"], - ) - - dirs = list(self.fs.walk.dirs(search="depth", exclude_dirs=["foo2"])) - self.assertEqual(dirs, ["/foo1/bar1", "/foo1", "/foo3"]) - def test_walk_files_filter(self): files = list(self.fs.walk.files(filter=["*.txt"])) @@ -209,6 +318,16 @@ def test_walk_files_filter(self): self.assertEqual(files, []) + def test_walk_files_filter_glob(self): + files = list(self.fs.walk.files(filter_glob=["/foo2/**"])) + self.assertEqual( + files, + [ + "/foo2/top3.bin", + "/foo2/bar2/bar3/test.txt", + ], + ) + def test_walk_files_exclude(self): # Test exclude argument works files = list(self.fs.walk.files(exclude=["*.txt"])) @@ -222,25 +341,6 @@ def test_walk_files_exclude(self): files = list(self.fs.walk.files(exclude=["*"])) self.assertEqual(files, []) - def test_walk_info(self): - walk = [] - for path, info in self.fs.walk.info(): - walk.append((path, info.is_dir, info.name)) - - expected = [ - ("/foo1", True, "foo1"), - ("/foo2", True, "foo2"), - ("/foo3", True, "foo3"), - ("/foo1/top1.txt", False, "top1.txt"), - ("/foo1/top2.txt", False, "top2.txt"), - ("/foo1/bar1", True, "bar1"), - ("/foo2/bar2", True, "bar2"), - ("/foo2/top3.bin", False, "top3.bin"), - ("/foo2/bar2/bar3", True, "bar3"), - ("/foo2/bar2/bar3/test.txt", False, "test.txt"), - ] - self.assertEqual(walk, expected) - def test_broken(self): original_scandir = self.fs.scandir @@ -257,10 +357,6 @@ def broken_scandir(path, namespaces=None): with self.assertRaises(FSError): list(self.fs.walk.files(on_error=lambda path, error: False)) - def test_on_error_invalid(self): - with self.assertRaises(TypeError): - walk.Walker(on_error="nope") - def test_subdir_uses_same_walker(self): class CustomWalker(walk.Walker): @classmethod @@ -284,19 +380,32 @@ class CustomizedMemoryFS(MemoryFS): self.assertEqual(sub_walker.walker_class, CustomWalker) six.assertCountEqual(self, ["/c", "/d"], 
sub_walker.files()) - def test_readonly_wrapper_uses_same_walker(self): + def test_check_file_overwrite(self): class CustomWalker(walk.Walker): - @classmethod - def bind(cls, fs): - return walk.BoundWalker(fs, walker_class=CustomWalker) + def check_file(self, fs, info): + return False - class CustomizedMemoryFS(MemoryFS): - walker_class = CustomWalker + walker = CustomWalker() + files = list(walker.files(self.fs)) + self.assertEqual(files, []) - base_fs = CustomizedMemoryFS() - base_walker = base_fs.walk - self.assertEqual(base_walker.walker_class, CustomWalker) - readonly_fs = read_only(CustomizedMemoryFS()) - readonly_walker = readonly_fs.walk - self.assertEqual(readonly_walker.walker_class, CustomWalker) +class TestInfo(TestBoundWalkerBase): + def test_walk_info(self): + walk = [] + for path, info in self.fs.walk.info(): + walk.append((path, info.is_dir, info.name)) + + expected = [ + ("/foo1", True, "foo1"), + ("/foo2", True, "foo2"), + ("/foo3", True, "foo3"), + ("/foo1/top1.txt", False, "top1.txt"), + ("/foo1/top2.txt", False, "top2.txt"), + ("/foo1/bar1", True, "bar1"), + ("/foo2/bar2", True, "bar2"), + ("/foo2/top3.bin", False, "top3.bin"), + ("/foo2/bar2/bar3", True, "bar3"), + ("/foo2/bar2/bar3/test.txt", False, "test.txt"), + ] + self.assertEqual(walk, expected) From d85d9dbddf826d856242fd2c8cb3724c68d18f93 Mon Sep 17 00:00:00 2001 From: atollk Date: Sun, 5 Sep 2021 13:00:30 +0200 Subject: [PATCH 2/2] Changes from code review. --- CHANGELOG.md | 2 +- fs/base.py | 8 ++-- fs/glob.py | 95 +++++++++++++++++++++++++--------------------- fs/walk.py | 8 ++-- tests/test_glob.py | 61 ++++++++++++++++++++++++++--- 5 files changed, 116 insertions(+), 58 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c370c9d..837f4947 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Added -- To `fs.walk.Walker`, added parameters `filter_glob` and `exclude_glob`. +- Added `filter_glob` and `exclude_glob` parameters to `fs.walk.Walker`. Closes [#459](https://github.com/PyFilesystem/pyfilesystem2/issues/459). ### Fixed diff --git a/fs/base.py b/fs/base.py index dfbc6106..56e5cf99 100644 --- a/fs/base.py +++ b/fs/base.py @@ -1653,8 +1653,8 @@ def check(self): if self.isclosed(): raise errors.FilesystemClosed() - def match(self, patterns, name, accept_prefix=False): - # type: (Optional[Iterable[Text]], Text, bool) -> bool + def match(self, patterns, name): + # type: (Optional[Iterable[Text]], Text) -> bool """Check if a name matches any of a list of wildcards. If a filesystem is case *insensitive* (such as Windows) then @@ -1711,7 +1711,7 @@ def match_glob(self, patterns, path, accept_prefix=False): ``['*.py']``, or `None` to match everything. path (str): A resource path, starting with "/". accept_prefix (bool): If ``True``, the path is - not required to match the wildcards themselves + not required to match the patterns themselves but only need to be a prefix of a string that does. 
Returns: @@ -1729,6 +1729,8 @@ def match_glob(self, patterns, path, accept_prefix=False): False >>> my_fs.match_glob(['dir/file.txt'], '/dir/', accept_prefix=True) True + >>> my_fs.match_glob(['dir/file.txt'], '/dir/', accept_prefix=False) + False >>> my_fs.match_glob(['dir/file.txt'], '/dir/gile.txt', accept_prefix=True) False diff --git a/fs/glob.py b/fs/glob.py index 82eb0228..4e783652 100644 --- a/fs/glob.py +++ b/fs/glob.py @@ -34,10 +34,10 @@ _PATTERN_CACHE = LRUCache( 1000 -) # type: LRUCache[Tuple[Text, bool], Tuple[int, bool, Pattern]] +) # type: LRUCache[Tuple[Text, bool], Tuple[Optional[int], Pattern]] -def _split_pattern_by_rec(pattern): +def _split_pattern_by_sep(pattern): # type: (Text) -> List[Text] """Split a glob pattern at its directory seperators (/). @@ -57,28 +57,27 @@ def _split_pattern_by_rec(pattern): return [pattern[i + 1 : j] for i, j in zip(indices[:-1], indices[1:])] -def _translate(pattern, case_sensitive=True): - # type: (Text, bool) -> Text - """Translate a wildcard pattern to a regular expression. +def _translate(pattern): + # type: (Text) -> Text + """Translate a glob pattern without '**' to a regular expression. There is no way to quote meta-characters. + Arguments: - pattern (str): A wildcard pattern. - case_sensitive (bool): Set to `False` to use a case - insensitive regex (default `True`). + pattern (str): A glob pattern. Returns: str: A regex equivalent to the given pattern. """ - if not case_sensitive: - pattern = pattern.lower() i, n = 0, len(pattern) res = [] while i < n: c = pattern[i] i = i + 1 if c == "*": + if i < n and pattern[i] == "*": + raise ValueError("glob._translate does not support '**' patterns.") res.append("[^/]*") elif c == "?": res.append("[^/]") @@ -96,7 +95,7 @@ def _translate(pattern, case_sensitive=True): stuff = pattern[i:j].replace("\\", "\\\\") i = j + 1 if stuff[0] == "!": - stuff = "^" + stuff[1:] + stuff = "^/" + stuff[1:] elif stuff[0] == "^": stuff = "\\" + stuff res.append("[%s]" % stuff) @@ -105,27 +104,35 @@ def _translate(pattern, case_sensitive=True): return "".join(res) -def _translate_glob(pattern, case_sensitive=True): - levels = 0 +def _translate_glob(pattern): + # type: (Text) -> Tuple[Optional[int], Text] + """Translate a glob pattern to a regular expression. + + There is no way to quote meta-characters. + + Arguments: + pattern (str): A glob pattern. + + Returns: + Tuple[Optional[int], Text]: The first component describes the levels + of depth this glob pattern goes to; basically the number of "/" in + the pattern. If there is a "**" in the glob pattern, the depth is + basically unbounded, and this component is `None` instead. + The second component is the regular expression. + + """ recursive = False re_patterns = [""] for component in iteratepath(pattern): if "**" in component: recursive = True split = component.split("**") - split_re = [_translate(s, case_sensitive=case_sensitive) for s in split] + split_re = [_translate(s) for s in split] re_patterns.append("/?" 
+ ".*/?".join(split_re)) else: - re_patterns.append( - "/" + _translate(component, case_sensitive=case_sensitive) - ) - levels += 1 + re_patterns.append("/" + _translate(component)) re_glob = "(?ms)^" + "".join(re_patterns) + ("/$" if pattern.endswith("/") else "$") - return ( - levels, - recursive, - re.compile(re_glob, 0 if case_sensitive else re.IGNORECASE), - ) + return pattern.count("/") + 1 if not recursive else None, re_glob def match(pattern, path): @@ -147,10 +154,11 @@ def match(pattern, path): """ try: - levels, recursive, re_pattern = _PATTERN_CACHE[(pattern, True)] + levels, re_pattern = _PATTERN_CACHE[(pattern, True)] except KeyError: - levels, recursive, re_pattern = _translate_glob(pattern, case_sensitive=True) - _PATTERN_CACHE[(pattern, True)] = (levels, recursive, re_pattern) + levels, re_str = _translate_glob(pattern) + re_pattern = re.compile(re_str) + _PATTERN_CACHE[(pattern, True)] = (levels, re_pattern) if path and path[0] != "/": path = "/" + path return bool(re_pattern.match(path)) @@ -169,10 +177,11 @@ def imatch(pattern, path): """ try: - levels, recursive, re_pattern = _PATTERN_CACHE[(pattern, False)] + levels, re_pattern = _PATTERN_CACHE[(pattern, False)] except KeyError: - levels, recursive, re_pattern = _translate_glob(pattern, case_sensitive=True) - _PATTERN_CACHE[(pattern, False)] = (levels, recursive, re_pattern) + levels, re_str = _translate_glob(pattern) + re_pattern = re.compile(re_str, re.IGNORECASE) + _PATTERN_CACHE[(pattern, False)] = (levels, re_pattern) if path and path[0] != "/": path = "/" + path return bool(re_pattern.match(path)) @@ -187,7 +196,7 @@ def match_any(patterns, path): Arguments: patterns (list): A list of wildcard pattern, e.g ``["*.py", "*.pyc"]`` - name (str): A filename. + path (str): A resource path. Returns: bool: `True` if the path matches at least one of the patterns. @@ -207,7 +216,7 @@ def imatch_any(patterns, path): Arguments: patterns (list): A list of wildcard pattern, e.g ``["*.py", "*.pyc"]`` - name (str): A filename. + path (str): A resource path. Returns: bool: `True` if the path matches at least one of the patterns. @@ -228,16 +237,17 @@ def get_matcher(patterns, case_sensitive, accept_prefix=False): case_sensitive (bool): If ``True``, then the callable will be case sensitive, otherwise it will be case insensitive. accept_prefix (bool): If ``True``, the name is - not required to match the wildcards themselves + not required to match the patterns themselves but only need to be a prefix of a string that does. Returns: callable: a matcher that will return `True` if the paths given as - an argument matches any of the given patterns. + an argument matches any of the given patterns, or if no patterns + exist. 
Example: - >>> from fs import wildcard - >>> is_python = wildcard.get_matcher(['*.py'], True) + >>> from fs import glob + >>> is_python = glob.get_matcher(['*.py'], True) >>> is_python('__init__.py') True >>> is_python('foo.txt') @@ -245,12 +255,12 @@ def get_matcher(patterns, case_sensitive, accept_prefix=False): """ if not patterns: - return lambda name: True + return lambda path: True if accept_prefix: new_patterns = [] for pattern in patterns: - split = _split_pattern_by_rec(pattern) + split = _split_pattern_by_sep(pattern) for i in range(1, len(split)): new_pattern = "/".join(split[:i]) new_patterns.append(new_pattern) @@ -310,18 +320,15 @@ def __repr__(self): def _make_iter(self, search="breadth", namespaces=None): # type: (str, List[str]) -> Iterator[GlobMatch] try: - levels, recursive, re_pattern = _PATTERN_CACHE[ - (self.pattern, self.case_sensitive) - ] + levels, re_pattern = _PATTERN_CACHE[(self.pattern, self.case_sensitive)] except KeyError: - levels, recursive, re_pattern = _translate_glob( - self.pattern, case_sensitive=self.case_sensitive - ) + levels, re_str = _translate_glob(self.pattern) + re_pattern = re.compile(re_str, 0 if self.case_sensitive else re.IGNORECASE) for path, info in self.fs.walk.info( path=self.path, namespaces=namespaces or self.namespaces, - max_depth=None if recursive else levels, + max_depth=levels, search=search, exclude_dirs=self.exclude_dirs, ): diff --git a/fs/walk.py b/fs/walk.py index 1724dfcc..b743e6f2 100644 --- a/fs/walk.py +++ b/fs/walk.py @@ -84,13 +84,13 @@ def __init__( a list of filename patterns, e.g. ``["~*"]``. Files matching any of these patterns will be removed from the walk. filter_dirs (list, optional): A list of patterns that will be used - to match directories paths. The walk will only open directories + to match directories names. The walk will only open directories that match at least one of these patterns. Directories will only be returned if the final component matches one of the patterns. exclude_dirs (list, optional): A list of patterns that will be used to filter out directories from the walk. e.g. - ``['*.svn', '*.git']``. Directories matching any of these + ``['*.svn', '*.git']``. Directory names matching any of these patterns will be removed from the walk. max_depth (int, optional): Maximum directory depth to walk. filter_glob (list, optional): If supplied, this parameter @@ -98,7 +98,7 @@ def __init__( Resources will only be returned if their global path or an extension of it matches one of the patterns. exclude_glob (list, optional): If supplied, this parameter - should be a list of path patterns e.g. ``["foo/**/*.py"]``. + should be a list of path patterns e.g. ``["foo/**/*.pyc"]``. Resources will not be returned if their global path or an extension of it matches one of the patterns. 
@@ -213,7 +213,7 @@ def _iter_walk( def _check_open_dir(self, fs, path, info): # type: (FS, Text, Info) -> bool """Check if a directory should be considered in the walk.""" - full_path = ("" if path == "/" else path) + "/" + info.name + full_path = combine(path, info.name) if self.exclude_dirs is not None and fs.match(self.exclude_dirs, info.name): return False if self.exclude_glob is not None and fs.match_glob( diff --git a/tests/test_glob.py b/tests/test_glob.py index 6147bf49..9a5d8827 100644 --- a/tests/test_glob.py +++ b/tests/test_glob.py @@ -1,7 +1,10 @@ from __future__ import unicode_literals +import re import unittest +from parameterized import parameterized + from fs import glob, open_fs @@ -17,8 +20,8 @@ def setUp(self): fs.makedirs("a/b/c/").writetext("foo.py", "import fs") repr(fs.glob) - def test_match(self): - tests = [ + @parameterized.expand( + [ ("*.?y", "/test.py", True), ("*.py", "/test.py", True), ("*.py", "__init__.py", True), @@ -41,11 +44,11 @@ def test_match(self): ("**/", "/test/", True), ("**/", "/test.py", False), ] - for pattern, path, expected in tests: - self.assertEqual(glob.match(pattern, path), expected, msg=(pattern, path)) + ) + def test_match(self, pattern, path, expected): + self.assertEqual(glob.match(pattern, path), expected, msg=(pattern, path)) # Run a second time to test cache - for pattern, path, expected in tests: - self.assertEqual(glob.match(pattern, path), expected, msg=(pattern, path)) + self.assertEqual(glob.match(pattern, path), expected, msg=(pattern, path)) def test_count_1dir(self): globber = glob.BoundGlobber(self.fs) @@ -99,3 +102,49 @@ def test_remove_all(self): globber = glob.BoundGlobber(self.fs) globber("**").remove() self.assertEqual(sorted(self.fs.listdir("/")), []) + + translate_test_cases = [ + ("foo.py", ["foo.py"], ["Foo.py", "foo_py", "foo", ".py"]), + ("foo?py", ["foo.py", "fooapy"], ["foo/py", "foopy", "fopy"]), + ("bar/foo.py", ["bar/foo.py"], []), + ("bar?foo.py", ["barafoo.py"], ["bar/foo.py"]), + ("???.py", ["foo.py", "bar.py", "FOO.py"], [".py", "foo.PY"]), + ("bar/*.py", ["bar/.py", "bar/foo.py"], ["bar/foo"]), + ("bar/foo*.py", ["bar/foo.py", "bar/foobaz.py"], ["bar/foo", "bar/.py"]), + ("*/[bar]/foo.py", ["/b/foo.py", "x/a/foo.py", "/r/foo.py"], ["b/foo.py", "/bar/foo.py"]), + ("[!bar]/foo.py", ["x/foo.py"], ["//foo.py"]), + ("[.py", ["[.py"], [".py", "."]), + ] + + @parameterized.expand(translate_test_cases) + def test_translate(self, glob_pattern, expected_matches, expected_not_matches): + translated = glob._translate(glob_pattern) + for m in expected_matches: + self.assertTrue(re.match(translated, m)) + for m in expected_not_matches: + self.assertFalse(re.match(translated, m)) + + @parameterized.expand(translate_test_cases) + def test_translate_glob_simple(self, glob_pattern, expected_matches, expected_not_matches): + levels, translated = glob._translate_glob(glob_pattern) + self.assertEqual(levels, glob_pattern.count("/") + 1) + for m in expected_matches: + self.assertTrue(re.match(translated, "/" + m)) + for m in expected_not_matches: + self.assertFalse(re.match(translated, m)) + self.assertFalse(re.match(translated, "/" + m)) + + @parameterized.expand( + [ + ("foo/**/bar", ["/foo/bar", "/foo/baz/bar", "/foo/baz/qux/bar"], ["/foo"]), + ("**/*/bar", ["/foo/bar", "/foo/bar"], ["/bar", "/bar"]), + ("/**/foo/**/bar", ["/baz/foo/qux/bar", "/foo/bar"], ["/bar"]), + ] + ) + def test_translate_glob(self, glob_pattern, expected_matches, expected_not_matches): + levels, translated = glob._translate_glob(glob_pattern) + 
self.assertIsNone(levels) + for m in expected_matches: + self.assertTrue(re.match(translated, m)) + for m in expected_not_matches: + self.assertFalse(re.match(translated, m))
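
For reviewers, a minimal usage sketch of the new `filter_glob` / `exclude_glob` walker parameters, built on the same tree as `TestBoundWalkerBase` above. The first expected output is taken directly from `test_walk_files_filter_glob`; the second call uses illustrative patterns not present in the test suite. Unlike `filter` / `exclude`, which only see the final path component, these patterns are matched against the resource's full path.

```python
from fs.memoryfs import MemoryFS

# Build the tree used by TestBoundWalkerBase: /foo1, /foo2/bar2/bar3, /foo3, ...
mem_fs = MemoryFS()
mem_fs.makedirs("foo1/bar1")
mem_fs.makedirs("foo2/bar2/bar3")
mem_fs.makedir("foo3")
mem_fs.create("foo1/top1.txt")
mem_fs.create("foo1/top2.txt")
mem_fs.create("foo2/bar2/bar3/test.txt")
mem_fs.create("foo2/top3.bin")

# filter_glob matches the whole path, so everything outside /foo2 is skipped.
print(list(mem_fs.walk.files(filter_glob=["/foo2/**"])))
# ['/foo2/top3.bin', '/foo2/bar2/bar3/test.txt']

# exclude_glob also prunes by full path (illustrative patterns).
print(list(mem_fs.walk.files(exclude_glob=["**/top*", "**/*.bin"])))
# ['/foo2/bar2/bar3/test.txt']
```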
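
The `accept_prefix` flag is what lets `_check_open_dir` keep descending into directories that merely prefix a `filter_glob` pattern. A small sketch using the `fs.glob.get_matcher` helper added in this patch; the annotated results are what the implementation above should produce:

```python
from fs import glob

pattern = ["foo2/**/*.txt"]

# With accept_prefix=True, "/foo2/" is accepted because it is a prefix of a
# path that could still match the pattern, so the walker keeps descending.
descend = glob.get_matcher(pattern, True, accept_prefix=True)
print(descend("/foo2/"))                    # True (prefix of a possible match)
print(descend("/foo2/bar2/bar3/test.txt"))  # True (complete match)

# With accept_prefix=False (the default), only complete matches count,
# which is how exclude_glob patterns are evaluated.
final = glob.get_matcher(pattern, True)
print(final("/foo2/"))                      # False
print(final("/foo2/bar2/bar3/test.txt"))    # True
```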
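
A quick illustration of the `(levels, regex)` pair returned by the reworked `_translate_glob` from the second commit, which `Globber._make_iter` uses to cap `max_depth`. The helper is internal, so this is only meant to clarify the `test_translate_glob*` cases above:

```python
from fs import glob

# A pattern without "**" has a bounded depth: number of "/" separators + 1.
levels, regex_str = glob._translate_glob("foo/*/bar.py")
print(levels)  # 3 -> Globber can walk with max_depth=3

# A recursive pattern cannot bound the depth, so levels is None.
levels, regex_str = glob._translate_glob("foo/**/bar.py")
print(levels)  # None -> Globber walks with no depth limit
```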