diff --git a/s3fs/core.py b/s3fs/core.py
index d8837222..e1e5b4f3 100644
--- a/s3fs/core.py
+++ b/s3fs/core.py
@@ -48,7 +48,6 @@ def setup_logging(level=None):
-
     setup_logger(logger=logger, level=(level or os.environ["S3FS_LOGGING_LEVEL"]))
@@ -165,8 +164,9 @@ def _coalesce_version_id(*args):
         version_ids.remove(None)
         if len(version_ids) > 1:
             raise ValueError(
-                "Cannot coalesce version_ids where more than one are defined,"
-                " {}".format(version_ids)
+                "Cannot coalesce version_ids where more than one are defined, {}".format(
+                    version_ids
+                )
             )
         elif len(version_ids) == 0:
             return None
@@ -710,8 +710,7 @@ def _open(
         kw.update(kwargs)
         if not self.version_aware and version_id:
             raise ValueError(
-                "version_id cannot be specified if the filesystem "
-                "is not version aware"
+                "version_id cannot be specified if the filesystem is not version aware"
             )
 
         if cache_type is None:
@@ -1186,7 +1185,6 @@ async def _pipe_file(
             self.invalidate_cache(path)
             return out
         else:
-
             mpu = await self._call_s3(
                 "create_multipart_upload", Bucket=bucket, Key=key, **kwargs
             )
@@ -1266,7 +1264,6 @@ async def _put_file(
                 )
                 callback.relative_update(size)
         else:
-
             mpu = await self._call_s3(
                 "create_multipart_upload", Bucket=bucket, Key=key, **kwargs
            )
@@ -2161,7 +2158,7 @@ async def _invalidate_region_cache(self):
     async def open_async(self, path, mode="rb", **kwargs):
         if "b" not in mode or kwargs.get("compression"):
             raise ValueError
-        return S3AsyncStreamedFile(self, path, mode)
+        return S3AsyncStreamedFile(self, path, mode, **kwargs)
 
 
 class S3File(AbstractBufferedFile):
@@ -2515,19 +2512,28 @@ def _abort_mpu(self):
 
 
 class S3AsyncStreamedFile(AbstractAsyncStreamedFile):
-    def __init__(self, fs, path, mode):
+    def __init__(self, fs, path, mode, **kwargs):
         self.fs = fs
         self.path = path
         self.mode = mode
         self.r = None
-        self.loc = 0
-        self.size = None
+        self.loc = kwargs.get("loc", 0)
+        self.size = kwargs.get("size", None)
 
     async def read(self, length=-1):
         if self.r is None:
-            bucket, key, gen = self.fs.split_path(self.path)
-            r = await self.fs._call_s3("get_object", Bucket=bucket, Key=key)
-            self.size = int(r["ResponseMetadata"]["HTTPHeaders"]["content-length"])
+            bucket, key, _ = self.fs.split_path(self.path)
+            if self.size is None:
+                r = await self.fs._call_s3("get_object", Bucket=bucket, Key=key)
+                self.size = int(r["ResponseMetadata"]["HTTPHeaders"]["content-length"])
+            else:
+                r = await self.fs._call_s3(
+                    "get_object",
+                    Bucket=bucket,
+                    Key=key,
+                    Range="bytes=%i-%i" % (self.loc, self.loc + self.size - 1),
+                )
+
             self.r = r["Body"]
         out = await self.r.read(length)
         self.loc += len(out)
diff --git a/s3fs/tests/test_s3fs.py b/s3fs/tests/test_s3fs.py
index b8a22b27..ee0f7fc4 100644
--- a/s3fs/tests/test_s3fs.py
+++ b/s3fs/tests/test_s3fs.py
@@ -2772,6 +2772,34 @@ async def read_stream():
     assert b"".join(out) == data
 
 
+def test_async_stream_partial(s3_base):
+    fn = test_bucket_name + "/target"
+    data = b"hello world" * 1000
+    out = []
+
+    async def read_stream():
+        fs = S3FileSystem(
+            anon=False,
+            client_kwargs={"endpoint_url": endpoint_uri},
+            skip_instance_cache=True,
+        )
+        await fs._mkdir(test_bucket_name)
+        await fs._pipe(fn, data)
+        f = await fs.open_async(fn, mode="rb", loc=0, size=len(data) // 2)
+
+        while True:
+            got = await f.read(1000)
+            assert f.size == len(data) // 2
+            assert f.tell()
+            if not got:
+                break
+            out.append(got)
+
+    asyncio.run(read_stream())
+    assert len(b"".join(out)) == len(data) // 2
+    assert b"".join(out) == data[: len(data) // 2]
+
+
 def test_rm_invalidates_cache(s3):
     # Issue 761: rm_file does not invalidate cache
     fn = test_bucket_name + "/2014-01-01.csv"
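Usage note (not part of the patch): with this change, `open_async` forwards `loc`/`size` through to `S3AsyncStreamedFile`, so a byte range can be streamed without fetching the whole object. The sketch below follows the same pattern as `test_async_stream_partial`; the bucket/key path and the offsets are placeholder values, not taken from the diff.

# Sketch only: "mybucket/target" and the offsets are placeholders; assumes
# credentials and an existing object, mirroring test_async_stream_partial.
import asyncio

from s3fs import S3FileSystem


async def read_slice():
    fs = S3FileSystem(anon=False, skip_instance_cache=True)
    # loc/size are forwarded by open_async to S3AsyncStreamedFile; the first
    # read() then issues a ranged GET ("bytes=loc-(loc + size - 1)"), i.e.
    # `size` bytes starting at byte offset `loc`.
    f = await fs.open_async("mybucket/target", mode="rb", loc=1024, size=4096)
    chunks = []
    while True:
        got = await f.read(1000)
        if not got:
            break
        chunks.append(got)
    return b"".join(chunks)  # 4096 bytes starting at offset 1024


asyncio.run(read_slice())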