Skip to content

Commit bad42bd

Browse files
committed
minor change
1 parent a4a7296 commit bad42bd

File tree

4 files changed

+140
-14
lines changed

4 files changed

+140
-14
lines changed

google/cloud/storage/_experimental/asyncio/retry/base_strategy.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ def update_state_from_response(self, response: Any, state: Any) -> None:
6363
pass
6464

6565
@abc.abstractmethod
66-
async def recover_state_on_failure(self, error: Exception, state: Any) -> None:
66+
def recover_state_on_failure(self, error: Exception, state: Any) -> None:
6767
"""Prepares the state for the next retry attempt after a failure.
6868
6969
This method is called when a retriable gRPC error occurs. It is

google/cloud/storage/_experimental/asyncio/retry/bidi_stream_retry_manager.py

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ def __init__(
3333
stream_opener: Callable[[Iterable[Any], Any], AsyncIterator[Any]],
3434
):
3535
"""Initializes the retry manager.
36-
3736
Args:
3837
strategy: The strategy for managing the state of a specific
3938
bidi operation (e.g., reads or writes).
@@ -45,28 +44,35 @@ def __init__(
4544
async def execute(self, initial_state: Any, retry_policy: "AsyncRetry"):
4645
"""
4746
Executes the bidi operation with the configured retry policy.
48-
4947
Args:
5048
initial_state: An object containing all state for the operation.
5149
retry_policy: The `google.api_core.retry_async.AsyncRetry` object to
5250
govern the retry behavior for this specific operation.
5351
"""
5452
state = initial_state
5553

54+
def on_error(e: Exception):
55+
"""The single point of recovery logic."""
56+
self._strategy.recover_state_on_failure(e, state)
57+
5658
async def attempt():
59+
"""The core operation to be retried."""
5760
requests = self._strategy.generate_requests(state)
5861
stream = self._stream_opener(requests, state)
59-
try:
60-
async for response in stream:
61-
self._strategy.update_state_from_response(response, state)
62-
return # Successful completion of the stream.
63-
except Exception as e:
64-
if retry_policy._predicate(e):
65-
await self._strategy.recover_state_on_failure(e, state)
66-
raise e
62+
async for response in stream:
63+
self._strategy.update_state_from_response(response, state)
64+
65+
# Correctly create a new retry instance with the on_error handler.
66+
retry_with_error_handler = type(retry_policy)(
67+
predicate=retry_policy._predicate,
68+
initial=retry_policy._initial,
69+
maximum=retry_policy._maximum,
70+
multiplier=retry_policy._multiplier,
71+
deadline=retry_policy._deadline,
72+
on_error=on_error,
73+
)
6774

68-
# Wrap the attempt function with the retry policy.
69-
wrapped_attempt = retry_policy(attempt)
75+
wrapped_attempt = retry_with_error_handler(attempt)
7076

7177
# Execute the operation with retry.
7278
await wrapped_attempt()

google/cloud/storage/_experimental/asyncio/retry/reads_resumption_strategy.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ def update_state_from_response(
9090
response, f"Byte count mismatch for read_id {read_id}"
9191
)
9292

93-
async def recover_state_on_failure(self, error: Exception, state: Any) -> None:
93+
def recover_state_on_failure(self, error: Exception, state: Any) -> None:
9494
"""Handles BidiReadObjectRedirectedError for reads."""
9595
# This would parse the gRPC error details, extract the routing_token,
9696
# and store it on the shared state object.

poc_bidi_retry_final.py

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
# poc_bidi_retry_final.py
2+
3+
import asyncio
4+
from unittest import mock
5+
from google.api_core import exceptions
6+
from google.api_core.retry_async import AsyncRetry
7+
8+
# Assuming the retry components are in these locations
9+
# In a real scenario, these would be imported from the library
10+
from google.cloud.storage._experimental.asyncio.retry.bidi_stream_retry_manager import (
11+
_BidiStreamRetryManager,
12+
)
13+
from google.cloud.storage._experimental.asyncio.retry.base_strategy import (
14+
_BaseResumptionStrategy,
15+
)
16+
17+
18+
class ReadResumptionStrategy(_BaseResumptionStrategy):
19+
"""
20+
A concrete implementation of the strategy for bidi reads.
21+
This is a simplified version for the POC.
22+
"""
23+
24+
def __init__(self):
25+
self.state = {"offset": 0, "remaining_bytes": float("inf")}
26+
27+
def generate_requests(self, state):
28+
print(f"[Strategy] Generating request with state: {state}")
29+
# In a real scenario, this yields ReadObjectRequest protos
30+
yield {"read_offset": state["offset"]}
31+
32+
def handle_response(self, response):
33+
# In a real scenario, this is a ReadObjectResponse proto
34+
chunk = response.get("chunk", b"")
35+
self.state["offset"] += len(chunk)
36+
print(f"[Strategy] Handled response, new state: {self.state}")
37+
return response
38+
39+
async def recover_state_on_failure(self, error, state):
40+
print(f"[Strategy] Recovering state from error: {error}. Current state: {state}")
41+
# For reads, the offset is already updated, so we just return the current state
42+
return self.state
43+
44+
45+
# --- Simulation Setup ---
46+
47+
# A mock stream that fails once mid-stream
48+
ATTEMPT_COUNT = 0
49+
STREAM_CONTENT = [
50+
[{"chunk": b"part_one"}, {"chunk": b"part_two"}, exceptions.ServiceUnavailable("Network error")],
51+
[{"chunk": b"part_three"}, {"chunk": b"part_four"}],
52+
]
53+
54+
55+
async def mock_stream_opener(requests, state):
56+
"""
57+
A mock stream opener that simulates a failing and then succeeding stream.
58+
"""
59+
global ATTEMPT_COUNT
60+
print(f"\n--- Stream Attempt {ATTEMPT_COUNT + 1} ---")
61+
# Consume the request iterator (in a real scenario, this sends requests to gRPC)
62+
_ = [req for req in requests]
63+
print(f"Mock stream opened with state: {state}")
64+
65+
content_for_this_attempt = STREAM_CONTENT[ATTEMPT_COUNT]
66+
ATTEMPT_COUNT += 1
67+
68+
for item in content_for_this_attempt:
69+
await asyncio.sleep(0.01) # Simulate network latency
70+
if isinstance(item, Exception):
71+
print(f"!!! Stream yielding an error: {item} !!!")
72+
raise item
73+
else:
74+
print(f"Stream yielding chunk of size: {len(item.get('chunk', b''))}")
75+
yield item
76+
77+
78+
async def main():
79+
"""
80+
Main function to run the POC.
81+
"""
82+
print("--- Starting Bidi Read Retry POC ---")
83+
84+
# 1. Define a retry policy
85+
retry_policy = AsyncRetry(
86+
predicate=lambda e: isinstance(e, exceptions.ServiceUnavailable),
87+
deadline=30.0,
88+
initial=0.1, # Start with a short wait
89+
)
90+
91+
# 2. Instantiate the strategy and retry manager
92+
strategy = ReadResumptionStrategy()
93+
retry_manager = _BidiStreamRetryManager(
94+
strategy=strategy, stream_opener=mock_stream_opener
95+
)
96+
97+
# 3. Execute the operation
98+
print("\nExecuting the retry manager...")
99+
final_stream_iterator = await retry_manager.execute(
100+
initial_state={"offset": 0}, retry_policy=retry_policy
101+
)
102+
103+
# 4. Consume the final, successful stream
104+
all_content = b""
105+
print("\n--- Consuming Final Stream ---")
106+
async for response in final_stream_iterator:
107+
chunk = response.get("chunk", b"")
108+
all_content += chunk
109+
print(f"Received chunk: {chunk.decode()}. Total size: {len(all_content)}")
110+
111+
print("\n--- POC Finished ---")
112+
print(f"Final downloaded content: {all_content.decode()}")
113+
print(f"Total attempts made: {ATTEMPT_COUNT}")
114+
assert all_content == b"part_onepart_twopart_threepart_four"
115+
assert ATTEMPT_COUNT == 2
116+
print("\nAssertion passed: Content correctly assembled across retries.")
117+
118+
119+
if __name__ == "__main__":
120+
asyncio.run(main())

0 commit comments

Comments
 (0)