From 4cfb9df8f0edec9e9d56ab000796b4e7b63703db Mon Sep 17 00:00:00 2001 From: Kyle Harrington Date: Fri, 31 May 2024 09:18:47 -0400 Subject: [PATCH 1/7] Add _pipe_file and test --- sshfs/spec.py | 16 ++++++++++++++++ tests/test_sshfs.py | 10 ++++++++++ 2 files changed, 26 insertions(+) diff --git a/sshfs/spec.py b/sshfs/spec.py index 8056dd4..f387a47 100644 --- a/sshfs/spec.py +++ b/sshfs/spec.py @@ -331,3 +331,19 @@ async def _cat_file(self, path, **kwargs): async with self._pool.get() as channel: async with channel.open(path, "rb") as f: return await f.read() + + @wrap_exceptions + async def _pipe_file(self, path, data, chunksize=50 * 2**20, **kwargs): + """Asynchronously writes the given data to a remote file in chunks.""" + await self._makedirs(self._parent(path), exist_ok=True) + + async with self._pool.get() as channel: + async with channel.open(path, 'wb') as f: + for i in range(0, len(data), chunksize): + chunk = data[i:i + chunksize] + await f.write(chunk) + await f.flush() + + self.invalidate_cache(path) + + pipe_file = sync_wrapper(_pipe_file) diff --git a/tests/test_sshfs.py b/tests/test_sshfs.py index c30bdfc..3dfb048 100644 --- a/tests/test_sshfs.py +++ b/tests/test_sshfs.py @@ -361,3 +361,13 @@ def test_cat_file_sync(fs, remote_dir): assert ( read_content == test_content ), "The content read from the file does not match the content written." + + +def test_pipe_file(fs, remote_dir): + test_data = b"Test data for pipe_file" * (2**20) # 1 MB of test data + test_file_path = remote_dir + "/test_pipe_file.txt" + + fs.pipe_file(test_file_path, test_data) + + with fs.open(test_file_path, "rb") as f: + assert f.read() == test_data, "The data read from the file does not match the data written." From f734c9427520cf353c96ec334707ac447e579cf3 Mon Sep 17 00:00:00 2001 From: Kyle Harrington Date: Fri, 31 May 2024 09:24:43 -0400 Subject: [PATCH 2/7] Remove sync_wrapper --- sshfs/spec.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sshfs/spec.py b/sshfs/spec.py index f387a47..64de368 100644 --- a/sshfs/spec.py +++ b/sshfs/spec.py @@ -336,7 +336,7 @@ async def _cat_file(self, path, **kwargs): async def _pipe_file(self, path, data, chunksize=50 * 2**20, **kwargs): """Asynchronously writes the given data to a remote file in chunks.""" await self._makedirs(self._parent(path), exist_ok=True) - + async with self._pool.get() as channel: async with channel.open(path, 'wb') as f: for i in range(0, len(data), chunksize): @@ -346,4 +346,3 @@ async def _pipe_file(self, path, data, chunksize=50 * 2**20, **kwargs): self.invalidate_cache(path) - pipe_file = sync_wrapper(_pipe_file) From 6b3abf7a2249c6bf8c5f4934b7db6d9b3f2914cb Mon Sep 17 00:00:00 2001 From: Kyle Harrington Date: Fri, 31 May 2024 09:27:41 -0400 Subject: [PATCH 3/7] Fix formatting --- tests/test_sshfs.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/test_sshfs.py b/tests/test_sshfs.py index 3dfb048..a6867f6 100644 --- a/tests/test_sshfs.py +++ b/tests/test_sshfs.py @@ -87,7 +87,9 @@ def test_fsspec_registration(ssh_server): def test_fsspec_url_parsing(ssh_server, remote_dir, user="user"): for ep in pkg_resources.iter_entry_points(group="fsspec.specs"): - url = f"{ep.name}://{user}@{ssh_server.host}:{ssh_server.port}/{remote_dir}/file" + url = ( + f"{ep.name}://{user}@{ssh_server.host}:{ssh_server.port}/{remote_dir}/file" + ) with fsspec.open(url, "w", client_keys=[USERS[user]]) as file: # Check the underlying file system. file_fs = file.buffer.fs @@ -296,7 +298,6 @@ def read_random_file(name): return stream.read() with futures.ThreadPoolExecutor() as executor: - write_futures, _ = futures.wait( [executor.submit(create_random_file) for _ in range(64)], return_when=futures.ALL_COMPLETED, @@ -370,4 +371,6 @@ def test_pipe_file(fs, remote_dir): fs.pipe_file(test_file_path, test_data) with fs.open(test_file_path, "rb") as f: - assert f.read() == test_data, "The data read from the file does not match the data written." + assert ( + f.read() == test_data + ), "The data read from the file does not match the data written." From 502e338d8e3b5e5be5fb4b18c2786ca9af3b05b6 Mon Sep 17 00:00:00 2001 From: Kyle Harrington Date: Fri, 31 May 2024 09:29:38 -0400 Subject: [PATCH 4/7] Fix formatting --- sshfs/spec.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sshfs/spec.py b/sshfs/spec.py index 64de368..7f05cf9 100644 --- a/sshfs/spec.py +++ b/sshfs/spec.py @@ -338,9 +338,9 @@ async def _pipe_file(self, path, data, chunksize=50 * 2**20, **kwargs): await self._makedirs(self._parent(path), exist_ok=True) async with self._pool.get() as channel: - async with channel.open(path, 'wb') as f: + async with channel.open(path, "wb") as f: for i in range(0, len(data), chunksize): - chunk = data[i:i + chunksize] + chunk = data[i : i + chunksize] await f.write(chunk) await f.flush() From 226e9761163e9221b7bb159188e3601d64ae7c0f Mon Sep 17 00:00:00 2001 From: Kyle Harrington Date: Fri, 31 May 2024 09:31:07 -0400 Subject: [PATCH 5/7] Fix formatting --- tests/test_sshfs.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/test_sshfs.py b/tests/test_sshfs.py index a6867f6..a9e858a 100644 --- a/tests/test_sshfs.py +++ b/tests/test_sshfs.py @@ -87,9 +87,7 @@ def test_fsspec_registration(ssh_server): def test_fsspec_url_parsing(ssh_server, remote_dir, user="user"): for ep in pkg_resources.iter_entry_points(group="fsspec.specs"): - url = ( - f"{ep.name}://{user}@{ssh_server.host}:{ssh_server.port}/{remote_dir}/file" - ) + url = f"{ep.name}://{user}@{ssh_server.host}:{ssh_server.port}/{remote_dir}/file" with fsspec.open(url, "w", client_keys=[USERS[user]]) as file: # Check the underlying file system. file_fs = file.buffer.fs From e8c524a3ce679d90373bf717a6df2d0b1ce0f79a Mon Sep 17 00:00:00 2001 From: Kyle Harrington Date: Fri, 31 May 2024 09:33:12 -0400 Subject: [PATCH 6/7] Fix formatting --- sshfs/spec.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sshfs/spec.py b/sshfs/spec.py index 7f05cf9..e37590d 100644 --- a/sshfs/spec.py +++ b/sshfs/spec.py @@ -345,4 +345,3 @@ async def _pipe_file(self, path, data, chunksize=50 * 2**20, **kwargs): await f.flush() self.invalidate_cache(path) - From b75caeb4dd283b506ee0a1cef063966250230dcf Mon Sep 17 00:00:00 2001 From: Kyle Harrington Date: Mon, 3 Jun 2024 10:01:56 -0400 Subject: [PATCH 7/7] Remove flush call, flush is implicit in SFTP --- sshfs/spec.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sshfs/spec.py b/sshfs/spec.py index e37590d..6dba100 100644 --- a/sshfs/spec.py +++ b/sshfs/spec.py @@ -342,6 +342,5 @@ async def _pipe_file(self, path, data, chunksize=50 * 2**20, **kwargs): for i in range(0, len(data), chunksize): chunk = data[i : i + chunksize] await f.write(chunk) - await f.flush() self.invalidate_cache(path)