
Conversation

@Pulkit0110
Member

Add bidi stream retry manager. It will be the main class responsible for retries.

@Pulkit0110 Pulkit0110 requested review from a team as code owners December 4, 2025 10:57
@product-auto-label product-auto-label bot added size: l Pull request size is large. api: storage Issues related to the googleapis/python-storage API. labels Dec 4, 2025
@gemini-code-assist
Contributor

Summary of Changes

Hello @Pulkit0110, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the resilience of asynchronous bidirectional streaming operations by introducing a dedicated retry manager. The new _BidiStreamRetryManager provides a structured way to handle transient errors, allowing streaming operations to automatically resume from their last known good state. This is achieved through a flexible strategy pattern, ensuring that complex streaming workflows can gracefully recover without manual intervention, thereby improving the overall reliability of the system.
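
To make the moving parts concrete, here is a rough sketch of the strategy contract the manager drives, inferred from the review excerpts further below (the method names appear there; the real _BaseResumptionStrategy in base_strategy.py may differ in detail):

import abc

class _BaseResumptionStrategy(abc.ABC):
    """Sketch of the pluggable resumption strategy; signatures inferred from the review."""

    @abc.abstractmethod
    def generate_requests(self, state):
        """Yield the request protos needed to (re)open the stream from `state`."""

    @abc.abstractmethod
    def update_state_from_response(self, response, state):
        """Record progress (e.g. bytes received) into the shared `state`."""

    @abc.abstractmethod
    async def recover_state_on_failure(self, error, state):
        """Adjust `state` after a retryable error so the next attempt resumes correctly."""

The manager itself is constructed with a strategy and a stream opener and is driven via await execute(initial_state=..., retry_policy=...), which consumes the stream internally (see the review discussion below).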

Highlights

  • New Bidi Stream Retry Manager: Introduced the _BidiStreamRetryManager class, designed to handle retries for asynchronous bidirectional streaming operations, ensuring robustness against transient failures.
  • Pluggable Resumption Strategy: The retry manager integrates with a _BaseResumptionStrategy to manage the state of the streaming operation, allowing for flexible recovery and re-establishment of streams after an error.
  • Demonstration and Testing: A Proof of Concept (POC) script (poc_bidi_retry_final.py) was added to demonstrate the retry manager's functionality, alongside comprehensive unit tests covering various retry scenarios.
  • Copyright Headers: Copyright headers for 2025 Google LLC were added to base_strategy.py and reads_resumption_strategy.py.
  • Noxfile Update: The noxfile.py was updated to set the DOCKER_API_VERSION environment variable during the conftest_retry session, likely for test environment compatibility.

@Pulkit0110 Pulkit0110 changed the title from "add bidi stream retry manager" to "feat(experimental): add bidi stream retry manager" Dec 4, 2025
@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a _BidiStreamRetryManager to handle retries for bidirectional streaming operations, which is a valuable addition for improving robustness. The overall approach is sound, but I've identified a couple of areas for improvement. The new proof-of-concept file (poc_bidi_retry_final.py) is critically flawed and does not align with the manager's implementation, which could cause significant confusion. Additionally, the retry manager's implementation can be refactored to improve code clarity and avoid using private members of its dependencies. My review includes a critical comment on the POC and a suggestion to refactor the manager.

Comment on lines 1 to 120
# poc_bidi_retry_final.py

import asyncio
from unittest import mock
from google.api_core import exceptions
from google.api_core.retry_async import AsyncRetry

# Assuming the retry components are in these locations
# In a real scenario, these would be imported from the library
from google.cloud.storage._experimental.asyncio.retry.bidi_stream_retry_manager import (
    _BidiStreamRetryManager,
)
from google.cloud.storage._experimental.asyncio.retry.base_strategy import (
    _BaseResumptionStrategy,
)


class ReadResumptionStrategy(_BaseResumptionStrategy):
    """
    A concrete implementation of the strategy for bidi reads.
    This is a simplified version for the POC.
    """

    def __init__(self):
        self.state = {"offset": 0, "remaining_bytes": float("inf")}

    def generate_requests(self, state):
        print(f"[Strategy] Generating request with state: {state}")
        # In a real scenario, this yields ReadObjectRequest protos
        yield {"read_offset": state["offset"]}

    def handle_response(self, response):
        # In a real scenario, this is a ReadObjectResponse proto
        chunk = response.get("chunk", b"")
        self.state["offset"] += len(chunk)
        print(f"[Strategy] Handled response, new state: {self.state}")
        return response

    async def recover_state_on_failure(self, error, state):
        print(f"[Strategy] Recovering state from error: {error}. Current state: {state}")
        # For reads, the offset is already updated, so we just return the current state
        return self.state


# --- Simulation Setup ---

# A mock stream that fails once mid-stream
ATTEMPT_COUNT = 0
STREAM_CONTENT = [
    [{"chunk": b"part_one"}, {"chunk": b"part_two"}, exceptions.ServiceUnavailable("Network error")],
    [{"chunk": b"part_three"}, {"chunk": b"part_four"}],
]


async def mock_stream_opener(requests, state):
    """
    A mock stream opener that simulates a failing and then succeeding stream.
    """
    global ATTEMPT_COUNT
    print(f"\n--- Stream Attempt {ATTEMPT_COUNT + 1} ---")
    # Consume the request iterator (in a real scenario, this sends requests to gRPC)
    _ = [req for req in requests]
    print(f"Mock stream opened with state: {state}")

    content_for_this_attempt = STREAM_CONTENT[ATTEMPT_COUNT]
    ATTEMPT_COUNT += 1

    for item in content_for_this_attempt:
        await asyncio.sleep(0.01)  # Simulate network latency
        if isinstance(item, Exception):
            print(f"!!! Stream yielding an error: {item} !!!")
            raise item
        else:
            print(f"Stream yielding chunk of size: {len(item.get('chunk', b''))}")
            yield item


async def main():
    """
    Main function to run the POC.
    """
    print("--- Starting Bidi Read Retry POC ---")

    # 1. Define a retry policy
    retry_policy = AsyncRetry(
        predicate=lambda e: isinstance(e, exceptions.ServiceUnavailable),
        deadline=30.0,
        initial=0.1,  # Start with a short wait
    )

    # 2. Instantiate the strategy and retry manager
    strategy = ReadResumptionStrategy()
    retry_manager = _BidiStreamRetryManager(
        strategy=strategy, stream_opener=mock_stream_opener
    )

    # 3. Execute the operation
    print("\nExecuting the retry manager...")
    final_stream_iterator = await retry_manager.execute(
        initial_state={"offset": 0}, retry_policy=retry_policy
    )

    # 4. Consume the final, successful stream
    all_content = b""
    print("\n--- Consuming Final Stream ---")
    async for response in final_stream_iterator:
        chunk = response.get("chunk", b"")
        all_content += chunk
        print(f"Received chunk: {chunk.decode()}. Total size: {len(all_content)}")

    print("\n--- POC Finished ---")
    print(f"Final downloaded content: {all_content.decode()}")
    print(f"Total attempts made: {ATTEMPT_COUNT}")
    assert all_content == b"part_onepart_twopart_threepart_four"
    assert ATTEMPT_COUNT == 2
    print("\nAssertion passed: Content correctly assembled across retries.")


if __name__ == "__main__":
    asyncio.run(main())

critical

This Proof of Concept file appears to be out of sync with the actual implementation of _BidiStreamRetryManager and its related components. It will fail at runtime and is misleading for developers trying to understand how to use the new retry manager.

Here are the specific issues:

  1. Incorrect execute return value handling: _BidiStreamRetryManager.execute() consumes the stream internally and returns None. The POC code incorrectly tries to iterate over this None value (line 106), which will cause a TypeError.
  2. Incorrect Strategy Implementation: The ReadResumptionStrategy in this POC does not correctly implement the _BaseResumptionStrategy abstract base class. It is missing the required update_state_from_response method and instead has a handle_response method (line 32), which will lead to a TypeError on instantiation.
  3. State Management Mismatch: The POC's strategy uses its own internal state (self.state), whereas the _BidiStreamRetryManager is designed to pass a state object to the strategy's methods for mutation.
  4. Doesn't demonstrate the intended pattern: The actual _ReadResumptionStrategy implementation (in reads_resumption_strategy.py) writes downloaded data into a user_buffer provided in the state. This POC should be updated to demonstrate this buffer-writing pattern, which is the intended use case for reads.

This file should be either removed or updated to be a working example that correctly demonstrates the functionality of the new retry manager.
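
For comparison, a POC strategy that follows the contract described above might look roughly like this; it mutates the state dict the manager passes in and writes into a caller-supplied user_buffer as the review describes, though the exact signatures in the library may differ:

import io

class FixedReadResumptionStrategy(_BaseResumptionStrategy):
    """POC-only sketch that implements the abstract methods the review lists."""

    def generate_requests(self, state):
        # Re-open the stream from wherever the previous attempt left off.
        yield {"read_offset": state["offset"]}

    def update_state_from_response(self, response, state):
        chunk = response.get("chunk", b"")
        state["user_buffer"].write(chunk)  # data lands in the caller-provided buffer
        state["offset"] += len(chunk)      # progress survives a retry

    async def recover_state_on_failure(self, error, state):
        # For reads the offset is already current; nothing extra to rewind.
        return state

# execute() consumes the stream itself and returns None, so the caller reads the buffer afterwards:
# state = {"offset": 0, "user_buffer": io.BytesIO()}
# manager = _BidiStreamRetryManager(strategy=FixedReadResumptionStrategy(),
#                                   stream_opener=mock_stream_opener)
# await manager.execute(initial_state=state, retry_policy=retry_policy)
# data = state["user_buffer"].getvalue()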

Comment on lines +49 to +63
state = initial_state

async def attempt():
    requests = self._strategy.generate_requests(state)
    stream = self._stream_opener(requests, state)
    try:
        async for response in stream:
            self._strategy.update_state_from_response(response, state)
        return
    except Exception as e:
        if retry_policy._predicate(e):
            await self._strategy.recover_state_on_failure(e, state)
        raise e

wrapped_attempt = retry_policy(attempt)

await wrapped_attempt()

medium

The implementation of the attempt function and its exception handling can be simplified by using the on_error callback provided by google.api_core.retry_async.AsyncRetry.

This would have two main benefits:

  1. It avoids accessing the private _predicate attribute of the retry_policy object.
  2. It simplifies the logic within the attempt function by separating the core stream processing from the error recovery logic.

Using on_error makes the code cleaner and more aligned with the intended API usage of AsyncRetry.

Suggested change
-state = initial_state
-
-async def attempt():
-    requests = self._strategy.generate_requests(state)
-    stream = self._stream_opener(requests, state)
-    try:
-        async for response in stream:
-            self._strategy.update_state_from_response(response, state)
-        return
-    except Exception as e:
-        if retry_policy._predicate(e):
-            await self._strategy.recover_state_on_failure(e, state)
-        raise e
-
-wrapped_attempt = retry_policy(attempt)
-
-await wrapped_attempt()
+state = initial_state
+
+async def on_error(exc):
+    await self._strategy.recover_state_on_failure(exc, state)
+
+async def attempt():
+    requests = self._strategy.generate_requests(state)
+    stream = self._stream_opener(requests, state)
+    async for response in stream:
+        self._strategy.update_state_from_response(response, state)
+
+wrapped_attempt = retry_policy(attempt, on_error=on_error)
+
+await wrapped_attempt()

@product-auto-label product-auto-label bot added size: m Pull request size is medium. and removed size: l Pull request size is large. labels Dec 4, 2025
chandra-siri and others added 4 commits December 4, 2025 23:38
…googleapis#1635)

Conformance tests were failing due to a Docker API version mismatch between the Kokoro VM and the Docker client.

This PR pins `DOCKER_API_VERSION` to 1.39.

RCA for the Kokoro failures: https://screenshot.googleplex.com/4zsxoQ8UxqWnTky

`Error response from daemon: client version 1.52 is too new. Maximum supported API version is 1.39`
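
For reference, a pin like this in noxfile.py could look roughly as follows (the session name comes from the PR summary above; the Python version and commands are illustrative placeholders):

import nox

@nox.session(python="3.12")  # Python version is illustrative
def conftest_retry(session):
    # Pin the Docker client API version so it matches what the Kokoro VM's daemon supports.
    session.env["DOCKER_API_VERSION"] = "1.39"
    # The install/run steps below are placeholders; the real session body lives in noxfile.py.
    session.install("pytest")
    session.run("pytest", "tests/conformance")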
Add gRPC packages under extra-dependencies

Since gRPC support in the Python SDK is still under the `_experimental` directory, the gRPC packages are kept under extra dependencies.

These should be moved into mandatory dependencies once gRPC is out of `_experimental`. See b/465352227

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
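
Purely for illustration, an optional extra of this kind is typically declared via `extras_require` in setup.py; the extra name and version pins below are placeholders rather than the ones this change actually adds:

from setuptools import setup

setup(
    name="google-cloud-storage",
    extras_require={
        # Hypothetical extra name and pins; see the repo's setup.py for the real list.
        "grpc": [
            "grpcio >= 1.60.0, < 2.0.0",
            "google-api-core[grpc] >= 2.19.0",
        ],
    },
)

# Users would then opt in with: pip install "google-cloud-storage[grpc]"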
…gleapis#1651)

chore: fix failing system tests due to a version upgrade of urllib3.

[2.6.0](https://urllib3.readthedocs.io/en/stable/changelog.html#id1) of urllib3 added security fixes for compressed data reads, which caused issues from googleapis#1642 to googleapis#1649.

This PR temporarily mitigates the failing system tests to unblock other PRs.

The actual fix will be tracked in b/466813444.
…s#1639)

Set up system tests for zonal buckets in Cloud Build.
@product-auto-label product-auto-label bot added size: l Pull request size is large. and removed size: m Pull request size is medium. labels Dec 8, 2025
…leapis#1637)

The Python SDK will use a hybrid approach for mTLS enablement:

If the GOOGLE_API_USE_CLIENT_CERTIFICATE environment variable is set (to true, false, or any other value), the SDK will respect that setting. This is necessary for test scenarios and for users who need to explicitly control mTLS behavior.

If the GOOGLE_API_USE_CLIENT_CERTIFICATE environment variable is not set, the SDK will automatically enable mTLS only if it detects Managed Workload Identity (MWID) or X.509 Workforce Identity Federation (WIF) certificate sources; otherwise, mTLS will remain disabled.
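
A sketch of this decision logic, purely for illustration; `_has_mwid_or_wif_cert_source()` is a hypothetical stand-in for the SDK's actual certificate-source detection:

import os

def _should_use_client_certificate():
    """Hybrid mTLS enablement: an explicit env var wins; otherwise auto-detect cert sources."""
    value = os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE")
    if value is not None:
        # Explicit setting is respected as-is (needed for tests and manual control);
        # assumption here: any value other than "true" keeps mTLS disabled.
        return value.lower() == "true"
    # Env var unset: enable mTLS only when MWID or X.509 WIF certificates are detected.
    return _has_mwid_or_wif_cert_source()  # hypothetical helper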

---------

Signed-off-by: Radhika Agrawal <[email protected]>
Co-authored-by: Chandra Shekhar Sirimala <[email protected]>
@product-auto-label product-auto-label bot added size: xl Pull request size is extra large. and removed size: l Pull request size is large. labels Dec 9, 2025
chandra-siri and others added 10 commits December 9, 2025 13:32
…upload (googleapis#1654)

feat: send entire object checksum in the final api call of resumable
upload

fixes b/461994245

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
feat: Support urllib3 >= 2.6.0

**Context**:
* This library implements custom decoders (`_GzipDecoder`, `_BrotliDecoder`) which inherit from `urllib3.response.ContentDecoder`.
* The interface of `urllib3.response.ContentDecoder` was changed in [2.6.0](https://urllib3.readthedocs.io/en/stable/changelog.html#id1) to fix a security vulnerability for highly compressed data reads (decompression bombs).

Hence we need to change our interfaces as well.

**Changes**
* Add a `max_length` param on the decompress method, with a default value of -1 (same as urllib3's decompress)
* Provide backwards compatibility (i.e., urllib3 <= 2.5.0), as sketched below
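
Below is a minimal sketch of the decoder shape this implies, using gzip as the example; the real `_GzipDecoder` subclasses `urllib3.response.ContentDecoder` and may differ in detail:

import zlib

class _GzipDecoderSketch:
    """Illustrative only; the library's `_GzipDecoder` inherits from urllib3's ContentDecoder."""

    def __init__(self):
        self._obj = zlib.decompressobj(zlib.MAX_WBITS | 16)  # gzip framing

    def decompress(self, data, max_length=-1):
        # urllib3 >= 2.6.0 passes max_length to cap output (decompression-bomb guard);
        # urllib3 <= 2.5.0 calls decompress(data) only, so the -1 default keeps it compatible.
        if max_length is None or max_length < 0:
            return self._obj.decompress(data)
        return self._obj.decompress(data, max_length)

    def flush(self):
        return self._obj.flush()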
🤖 I have created a release *beep* *boop*
---


## [3.7.0](googleapis/python-storage@v3.6.0...v3.7.0) (2025-12-09)


### Features

* Auto enable mTLS when supported certificates are detected ([googleapis#1637](googleapis#1637)) ([4e91c54](googleapis@4e91c54))
* Send entire object checksum in the final API call of resumable upload ([googleapis#1654](googleapis#1654)) ([ddce7e5](googleapis@ddce7e5))
* Support urllib3 >= 2.6.0 ([googleapis#1658](googleapis#1658)) ([57405e9](googleapis@57405e9))


### Bug Fixes

* **bucket:** Move blob fails when the new blob name contains characters that need to be URL encoded ([googleapis#1605](googleapis#1605)) ([ec470a2](googleapis@ec470a2))

---
This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).

---------

Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com>
Co-authored-by: Chandra Shekhar Sirimala <[email protected]>
chore: add gcs-fs as CODEOWNERS
fix: close write object stream always. 

Otherwise the Task will remain for a long time until GC kills it, and it will throw `"Task was destroyed but it is pending!"`
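
A sketch of the always-close pattern this fix implies; the stream and method names here are illustrative, not the library's actual API:

async def write_object(open_stream, chunks):
    stream = await open_stream()
    try:
        for chunk in chunks:
            await stream.send(chunk)
    finally:
        # Close on every exit path so no pending Task is left behind to trigger
        # "Task was destroyed but it is pending!" when the GC collects it.
        await stream.close()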