From f595e1abe1703729f4e1c1a8a8631f7f3697d1af Mon Sep 17 00:00:00 2001
From: Andrew January
Date: Tue, 23 Aug 2022 16:16:40 +0100
Subject: [PATCH] Add readinto to StreamingBody

This adds a readinto implementation to StreamingBody, following the
contract outlined in the Python io package.

The primary motivation for this is that it allows consumers to wrap the
StreamingBody in an io.BufferedReader. This provides a nice performance
boost for parsing streaming binary data formats, where you want to be
able to read small numbers of bytes in your parsing algorithm, but
benefit from reading large chunks of data from upstream.

A read into a zero-length buffer always returns 0, so it is not treated
as end of stream and does not trigger content length verification.
---
Review note: readinto was revised after the original commit (docstring,
empty-buffer guard, extra regression test). The hunk line counts and the
diffstat below are updated accordingly; the blob ids on the "index" lines
still refer to the original revision.

 botocore/response.py        | 22 ++++++++++++++++++++++
 tests/unit/test_response.py | 28 ++++++++++++++++++++++++++++
 2 files changed, 50 insertions(+)

diff --git a/botocore/response.py b/botocore/response.py
index ba3fac9bab..e8ddf685e6 100644
--- a/botocore/response.py
+++ b/botocore/response.py
@@ -113,6 +113,28 @@ def read(self, amt=None):
     def readlines(self):
         return self._raw_stream.readlines()
 
+    def readinto(self, b):
+        """Read bytes into a pre-allocated, writable bytes-like object.
+
+        Returns the number of bytes written into ``b``. Once the stream is
+        exhausted this is ``0``, and the amount read so far is verified
+        against the expected content length.
+        """
+        try:
+            amount_read = self._raw_stream.readinto(b)
+        except URLLib3ReadTimeoutError as e:
+            # TODO: the url will be None as urllib3 isn't setting it yet
+            raise ReadTimeoutError(endpoint_url=e.url, error=e)
+        except URLLib3ProtocolError as e:
+            raise ResponseStreamingError(error=e)
+        self._amount_read += amount_read
+        if amount_read == 0 and len(b) > 0:
+            # A zero-byte read into a non-empty buffer means the stream is
+            # exhausted, so verify the content length. An empty buffer
+            # always reads 0 bytes and says nothing about end of stream.
+            self._verify_content_length()
+        return amount_read
+
     def __iter__(self):
         """Return an iterator to yield 1k chunks from the raw stream."""
         return self.iter_chunks(self._DEFAULT_CHUNK_SIZE)
diff --git a/tests/unit/test_response.py b/tests/unit/test_response.py
index e1c25ceac8..8aff955c11 100644
--- a/tests/unit/test_response.py
+++ b/tests/unit/test_response.py
@@ -111,6 +111,34 @@ def test_streaming_body_readlines(self):
         chunks = [b'1234567890\n', b'1234567890\n', b'12345']
         self.assertEqual(stream.readlines(), chunks)
 
+    def test_streaming_body_readinto(self):
+        body = BytesIO(b'123456789')
+        stream = response.StreamingBody(body, content_length=9)
+        chunk = bytearray(b'\x00\x00\x00\x00\x00')
+        self.assertEqual(5, stream.readinto(chunk))
+        self.assertEqual(chunk, bytearray(b'\x31\x32\x33\x34\x35'))
+        self.assertEqual(4, stream.readinto(chunk))
+        self.assertEqual(chunk, bytearray(b'\x36\x37\x38\x39\x35'))
+
+    def test_streaming_body_readinto_with_invalid_length(self):
+        body = BytesIO(b'12')
+        stream = response.StreamingBody(body, content_length=9)
+        chunk = bytearray(b'\xDE\xAD\xBE\xEF')
+        self.assertEqual(2, stream.readinto(chunk))
+        self.assertEqual(chunk, bytearray(b'\x31\x32\xBE\xEF'))
+        with self.assertRaises(IncompleteReadError):
+            stream.readinto(chunk)
+
+    def test_streaming_body_readinto_with_empty_buffer(self):
+        body = BytesIO(b'12')
+        stream = response.StreamingBody(body, content_length=2)
+        # A zero-length buffer reads nothing; that must not be mistaken
+        # for a truncated body.
+        self.assertEqual(0, stream.readinto(bytearray()))
+        chunk = bytearray(b'\x00\x00')
+        self.assertEqual(2, stream.readinto(chunk))
+        self.assertEqual(chunk, bytearray(b'12'))
+
     def test_streaming_body_tell(self):
         body = BytesIO(b'1234567890')
         stream = response.StreamingBody(body, content_length=10)