diff --git a/fs/mountfs.py b/fs/mountfs.py index 5e590637..39bd578f 100644 --- a/fs/mountfs.py +++ b/fs/mountfs.py @@ -13,8 +13,10 @@ from .base import FS from .memoryfs import MemoryFS from .path import abspath +from .path import dirname from .path import forcedir from .path import normpath +from .path import recursepath from .mode import validate_open_mode from .mode import validate_openbin_mode @@ -23,10 +25,10 @@ Any, BinaryIO, Collection, + Dict, Iterator, IO, List, - MutableSequence, Optional, Text, Tuple, @@ -67,7 +69,7 @@ def __init__(self, auto_close=True): super(MountFS, self).__init__() self.auto_close = auto_close self.default_fs = MemoryFS() # type: FS - self.mounts = [] # type: MutableSequence[Tuple[Text, FS]] + self.mounts = {} # type: Dict[Text, FS] def __repr__(self): # type: () -> str @@ -85,19 +87,26 @@ def _delegate(self, path): path (str): A path. Returns: - (FS, str): a tuple of ``(, )`` for a mounted filesystem, - or ``(None, None)`` if no filesystem is mounted on the + (FS, str): a tuple of ``(, )`` for a mounted filesystem. + Raises ``ResourceNotFound`` if no filesystem is mounted on the given ``path``. """ _path = forcedir(abspath(normpath(path))) is_mounted = _path.startswith - for mount_path, fs in self.mounts: + if _path == "/": + return self.default_fs, path + + for mount_path in self.mounts: if is_mounted(mount_path): - return fs, _path[len(mount_path) :].rstrip("/") + return self.mounts[mount_path], _path[len(mount_path) :].rstrip("/") + + for mount_path in self.mounts: + if mount_path.startswith(_path): + return self.default_fs, path - return self.default_fs, path + raise errors.ResourceNotFound(path) def mount(self, path, fs): # type: (Text, Union[FS, Text]) -> None @@ -120,28 +129,53 @@ def mount(self, path, fs): raise ValueError("Unable to mount self") _path = forcedir(abspath(normpath(path))) - for mount_path, _ in self.mounts: + if _path == "/": + raise MountError("mount point can't be empty") + + for mount_path in self.mounts: if _path.startswith(mount_path): raise MountError("mount point overlaps existing mount") - self.mounts.append((_path, fs)) + self.mounts[_path] = fs self.default_fs.makedirs(_path, recreate=True) + def unmount(self, path): + # type: (Text) -> None + """Unmounts a previously-mounted FS. + + Arguments: + path (str): A mount-path previously used with ``mount``. + + """ + _path = forcedir(abspath(normpath(path))) + if _path not in self.mounts: + raise ValueError("not a current mount point") + + if self.auto_close: + self.mounts[_path].close() + del self.mounts[_path] + self.default_fs.removedir(_path) + for parent in recursepath(dirname(_path.rstrip("/")), reverse=True): + if parent == "/" or not self.default_fs.isempty(parent): + break + else: + self.default_fs.removedir(parent) + def close(self): # type: () -> None # Explicitly closes children if requested if self.auto_close: - for _path, fs in self.mounts: + for fs in self.mounts.values(): fs.close() - del self.mounts[:] + self.mounts.clear() self.default_fs.close() super(MountFS, self).close() def desc(self, path): # type: (Text) -> Text + fs, delegate_path = self._delegate(path) if not self.exists(path): raise errors.ResourceNotFound(path) - fs, delegate_path = self._delegate(path) if fs is self.default_fs: fs = self return "{path} on {fs}".format(fs=fs, path=delegate_path) diff --git a/tests/test_mountfs.py b/tests/test_mountfs.py index 1ffa82d2..69942183 100644 --- a/tests/test_mountfs.py +++ b/tests/test_mountfs.py @@ -2,6 +2,7 @@ import unittest +from fs.errors import ResourceNotFound from fs.mountfs import MountError, MountFS from fs.memoryfs import MemoryFS from fs.tempfs import TempFS @@ -11,16 +12,6 @@ class TestMountFS(FSTestCases, unittest.TestCase): """Test OSFS implementation.""" - def make_fs(self): - fs = MountFS() - mem_fs = MemoryFS() - fs.mount("/", mem_fs) - return fs - - -class TestMountFS2(FSTestCases, unittest.TestCase): - """Test OSFS implementation.""" - def make_fs(self): fs = MountFS() mem_fs = MemoryFS() @@ -35,6 +26,11 @@ def test_bad_mount(self): mount_fs.mount("foo", 5) with self.assertRaises(TypeError): mount_fs.mount("foo", b"bar") + m1 = MemoryFS() + with self.assertRaises(MountError): + mount_fs.mount("", m1) + with self.assertRaises(MountError): + mount_fs.mount("/", m1) def test_listdir(self): mount_fs = MountFS() @@ -42,14 +38,32 @@ def test_listdir(self): m1 = MemoryFS() m3 = MemoryFS() m4 = TempFS() + m5 = MemoryFS() mount_fs.mount("/m1", m1) mount_fs.mount("/m2", "temp://") mount_fs.mount("/m3", m3) with self.assertRaises(MountError): mount_fs.mount("/m3/foo", m4) self.assertEqual(sorted(mount_fs.listdir("/")), ["m1", "m2", "m3"]) + mount_fs.makedir("/m2/foo") + self.assertEqual(sorted(mount_fs.listdir("/m2")), ["foo"]) m3.makedir("foo") self.assertEqual(sorted(mount_fs.listdir("/m3")), ["foo"]) + mount_fs.mount("/subdir/m4", m4) + self.assertEqual(sorted(mount_fs.listdir("/")), ["m1", "m2", "m3", "subdir"]) + self.assertEqual(mount_fs.listdir("/subdir"), ["m4"]) + self.assertEqual(mount_fs.listdir("/subdir/m4"), []) + mount_fs.mount("/subdir/m5", m5) + self.assertEqual(sorted(mount_fs.listdir("/subdir")), ["m4", "m5"]) + self.assertEqual(mount_fs.listdir("/subdir/m5"), []) + mount_fs.makedir("/subdir/m4/foo") + mount_fs.makedir("/subdir/m5/bar") + self.assertEqual(mount_fs.listdir("/subdir/m4"), ["foo"]) + self.assertEqual(mount_fs.listdir("/subdir/m5"), ["bar"]) + self.assertEqual(m4.listdir("/"), ["foo"]) + self.assertEqual(m5.listdir("/"), ["bar"]) + m5.removedir("/bar") + self.assertEqual(mount_fs.listdir("/subdir/m5"), []) def test_auto_close(self): """Test MountFS auto close is working""" @@ -85,8 +99,60 @@ def test_empty(self): def test_mount_self(self): mount_fs = MountFS() with self.assertRaises(ValueError): - mount_fs.mount("/", mount_fs) + mount_fs.mount("/m1", mount_fs) def test_desc(self): mount_fs = MountFS() mount_fs.desc("/") + + def test_makedirs(self): + mount_fs = MountFS() + with self.assertRaises(ResourceNotFound): + mount_fs.makedir("empty") + m1 = MemoryFS() + m2 = MemoryFS() + with self.assertRaises(ResourceNotFound): + mount_fs.makedirs("/m1/foo/bar", recreate=True) + mount_fs.mount("/m1", m1) + mount_fs.makedirs("/m1/foo/bar", recreate=True) + self.assertEqual(m1.listdir("foo"), ["bar"]) + with self.assertRaises(ResourceNotFound): + mount_fs.makedirs("/subdir/m2/bar/foo", recreate=True) + mount_fs.mount("/subdir/m2", m2) + mount_fs.makedirs("/subdir/m2/bar/foo", recreate=True) + self.assertEqual(m2.listdir("bar"), ["foo"]) + with self.assertRaises(ResourceNotFound): + mount_fs.makedir("/subdir/m3", recreate=True) + + def test_unmount(self): + mount_fs = MountFS() + m1 = MemoryFS() + m2 = MemoryFS() + m3 = MemoryFS() + m4 = MemoryFS() + mount_fs.mount("/m1", m1) + with self.assertRaises(ValueError): + mount_fs.unmount("/m2") + mount_fs.mount("/m2", m2) + self.assertEqual(sorted(mount_fs.listdir("/")), ["m1", "m2"]) + mount_fs.unmount("/m1") + with self.assertRaises(ResourceNotFound): + mount_fs.listdir("/m1") + self.assertEqual(mount_fs.listdir("/"), ["m2"]) + with self.assertRaises(ValueError): + mount_fs.unmount("/m1") + mount_fs.mount("/subdir/m3", m3) + with self.assertRaises(ValueError): + mount_fs.unmount("/subdir") + mount_fs.mount("/subdir/m4", m4) + self.assertEqual(sorted(mount_fs.listdir("/")), ["m2", "subdir"]) + mount_fs.makedir("/subdir/m4/foo") + with self.assertRaises(ValueError): + mount_fs.unmount("/subdir/m4/foo") + mount_fs.unmount("/subdir/m4") + self.assertEqual(sorted(mount_fs.listdir("/")), ["m2", "subdir"]) + self.assertEqual(mount_fs.listdir("/subdir"), ["m3"]) + mount_fs.unmount("/subdir/m3") + self.assertEqual(mount_fs.listdir("/"), ["m2"]) + with self.assertRaises(ResourceNotFound): + mount_fs.listdir("/subdir")