From 7fff3f774782bbc32c1a8b98345ba0d9ac4f56aa Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Tue, 23 Apr 2024 10:45:06 -0400 Subject: [PATCH 1/2] Add bulk_mkdirs --- fsspec/asyn.py | 5 ++++- fsspec/spec.py | 3 +++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/fsspec/asyn.py b/fsspec/asyn.py index a040efc4b..80769b84d 100644 --- a/fsspec/asyn.py +++ b/fsspec/asyn.py @@ -577,7 +577,7 @@ async def _put( rdirs = [r for l, r in zip(lpaths, rpaths) if is_dir[l]] file_pairs = [(l, r) for l, r in zip(lpaths, rpaths) if not is_dir[l]] - await asyncio.gather(*[self._makedirs(d, exist_ok=True) for d in rdirs]) + await self._bulk_makedirs(rdirs) batch_size = batch_size or self.batch_size coros = [] @@ -590,6 +590,9 @@ async def _put( coros, batch_size=batch_size, callback=callback ) + async def _bulk_makedirs(self, dirs, **kw): + await asyncio.gather(*[self._makedirs(_, **kw) for _ in dirs]) + async def _get_file(self, rpath, lpath, **kwargs): raise NotImplementedError diff --git a/fsspec/spec.py b/fsspec/spec.py index 9a7e4e8b9..f830f7c2c 100644 --- a/fsspec/spec.py +++ b/fsspec/spec.py @@ -306,6 +306,9 @@ def makedirs(self, path, exist_ok=False): """ pass # not necessary to implement, may not have directories + def bulk_makedirs(self, path, **kw): + [self.makedirs(_, **kw) for _ in path] + def rmdir(self, path): """Remove a directory, if empty""" pass # not necessary to implement, may not have directories From 76a05f9603afe80940d4f08c38dc584d925b9044 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Tue, 23 Apr 2024 11:33:14 -0400 Subject: [PATCH 2/2] exist --- fsspec/asyn.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fsspec/asyn.py b/fsspec/asyn.py index 80769b84d..31a816836 100644 --- a/fsspec/asyn.py +++ b/fsspec/asyn.py @@ -577,7 +577,7 @@ async def _put( rdirs = [r for l, r in zip(lpaths, rpaths) if is_dir[l]] file_pairs = [(l, r) for l, r in zip(lpaths, rpaths) if not is_dir[l]] - await self._bulk_makedirs(rdirs) + await self._bulk_makedirs(rdirs, exist_ok=True) batch_size = batch_size or self.batch_size coros = []