Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,11 @@ def __init__(
write_handle=self.write_handle,
)
self._is_stream_open: bool = False
# `offset` is the latest size of the object without staleness.
self.offset: Optional[int] = None
# `persisted_size` is the total_bytes persisted in the GCS server.
# Please note: `offset` and `persisted_size` are the same when the stream
# is opened.
self.persisted_size: Optional[int] = None

async def state_lookup(self) -> int:
Expand Down Expand Up @@ -152,17 +156,17 @@ async def open(self) -> None:
if self.generation is None:
self.generation = self.write_obj_stream.generation_number
self.write_handle = self.write_obj_stream.write_handle

# Update self.persisted_size
_ = await self.state_lookup()
self.persisted_size = self.write_obj_stream.persisted_size

async def append(self, data: bytes) -> None:
"""Appends data to the Appendable object.

This method sends the provided data to the GCS server in chunks. It
maintains an internal threshold `_MAX_BUFFER_SIZE_BYTES` and will
automatically flush the data to make it visible to readers when that
threshold has reached.
Calling `self.append` will append bytes at the end of the current size,
i.e. `self.offset` bytes relative to the beginning of the object.

This method sends the provided `data` to the GCS server in chunks
and persists data in GCS at every `_MAX_BUFFER_SIZE_BYTES` bytes by
calling `self.simple_flush`.

:type data: bytes
:param data: The bytes to append to the object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ async def download_ranges(

:type read_ranges: List[Tuple[int, int, "BytesIO"]]
:param read_ranges: A list of tuples, where each tuple represents a
byte range (start_byte, bytes_to_read, writeable_buffer). Buffer has
combination of byte_range and writeable buffer in format -
(`start_byte`, `bytes_to_read`, `writeable_buffer`). Buffer has
to be provided by the user, and user has to make sure appropriate
memory is available in the application to avoid out-of-memory crash.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ async def open(self) -> None:
object=self.object_name,
generation=self.generation_number,
),
state_lookup=True,
)

self.socket_like_rpc = AsyncBidiRpc(
Expand All @@ -136,11 +135,17 @@ async def open(self) -> None:
raise ValueError(
"Failed to obtain object generation after opening the stream"
)
self.generation_number = response.resource.generation

if not response.write_handle:
raise ValueError("Failed to obtain write_handle after opening the stream")

if not response.resource.size:
# Appending to a 0 byte appendable object.
self.persisted_size = 0
else:
self.persisted_size = response.resource.size

self.generation_number = response.resource.generation
self.write_handle = response.write_handle

async def close(self) -> None:
Expand Down
41 changes: 31 additions & 10 deletions tests/unit/asyncio/test_async_appendable_object_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,10 @@ async def test_open_appendable_object_writer(mock_write_object_stream, mock_clie
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
mock_stream = mock_write_object_stream.return_value
mock_stream.open = mock.AsyncMock()
mock_stream.send = mock.AsyncMock()
mock_stream.recv = mock.AsyncMock()

mock_state_response = mock.MagicMock()
mock_state_response.persisted_size = 1024
mock_stream.recv.return_value = mock_state_response

mock_stream.generation_number = GENERATION
mock_stream.write_handle = WRITE_HANDLE
mock_stream.persisted_size = 0

# Act
await writer.open()
Expand All @@ -151,11 +146,37 @@ async def test_open_appendable_object_writer(mock_write_object_stream, mock_clie
assert writer._is_stream_open
assert writer.generation == GENERATION
assert writer.write_handle == WRITE_HANDLE
assert writer.persisted_size == 0

expected_request = _storage_v2.BidiWriteObjectRequest(state_lookup=True)
mock_stream.send.assert_awaited_once_with(expected_request)
mock_stream.recv.assert_awaited_once()
assert writer.persisted_size == 1024

@pytest.mark.asyncio
@mock.patch(
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
)
async def test_open_appendable_object_writer_existing_object(
mock_write_object_stream, mock_client
):
"""Test the open method."""
# Scenario: the writer is constructed with an explicit `generation`, and the
# (mocked) write stream reports a `persisted_size` of PERSISTED_SIZE.
# Arrange
writer = AsyncAppendableObjectWriter(
mock_client, BUCKET, OBJECT, generation=GENERATION
)
# The patched stream class returns this mock whenever it is instantiated.
mock_stream = mock_write_object_stream.return_value
mock_stream.open = mock.AsyncMock()

# Attributes the assertions below expect to see mirrored on the writer
# after `open()` completes.
mock_stream.generation_number = GENERATION
mock_stream.write_handle = WRITE_HANDLE
mock_stream.persisted_size = PERSISTED_SIZE

# Act
await writer.open()

# Assert
# The underlying stream must be opened exactly once, and the writer must
# expose the generation, write handle and persisted size set on the stream.
mock_stream.open.assert_awaited_once()
assert writer._is_stream_open
assert writer.generation == GENERATION
assert writer.write_handle == WRITE_HANDLE
assert writer.persisted_size == PERSISTED_SIZE


@pytest.mark.asyncio
Expand Down
6 changes: 6 additions & 0 deletions tests/unit/asyncio/test_async_write_object_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ async def instantiate_write_obj_stream(mock_client, mock_cls_async_bidi_rpc, ope
mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse)
mock_response.resource = mock.MagicMock(spec=_storage_v2.Object)
mock_response.resource.generation = GENERATION
mock_response.resource.size = 0
mock_response.write_handle = WRITE_HANDLE
socket_like_rpc.recv = AsyncMock(return_value=mock_response)

Expand Down Expand Up @@ -129,6 +130,7 @@ async def test_open_for_new_object(mock_async_bidi_rpc, mock_client):
mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse)
mock_response.resource = mock.MagicMock(spec=_storage_v2.Object)
mock_response.resource.generation = GENERATION
mock_response.resource.size = 0
mock_response.write_handle = WRITE_HANDLE
socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response)

Expand All @@ -143,6 +145,7 @@ async def test_open_for_new_object(mock_async_bidi_rpc, mock_client):
socket_like_rpc.recv.assert_called_once()
assert stream.generation_number == GENERATION
assert stream.write_handle == WRITE_HANDLE
assert stream.persisted_size == 0


@pytest.mark.asyncio
Expand All @@ -158,6 +161,7 @@ async def test_open_for_existing_object(mock_async_bidi_rpc, mock_client):

mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse)
mock_response.resource = mock.MagicMock(spec=_storage_v2.Object)
mock_response.resource.size = 1024
mock_response.resource.generation = GENERATION
mock_response.write_handle = WRITE_HANDLE
socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response)
Expand All @@ -175,6 +179,7 @@ async def test_open_for_existing_object(mock_async_bidi_rpc, mock_client):
socket_like_rpc.recv.assert_called_once()
assert stream.generation_number == GENERATION
assert stream.write_handle == WRITE_HANDLE
assert stream.persisted_size == 1024


@pytest.mark.asyncio
Expand All @@ -191,6 +196,7 @@ async def test_open_when_already_open_raises_error(mock_async_bidi_rpc, mock_cli
mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse)
mock_response.resource = mock.MagicMock(spec=_storage_v2.Object)
mock_response.resource.generation = GENERATION
mock_response.resource.size = 0
mock_response.write_handle = WRITE_HANDLE
socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response)

Expand Down