|
15 | 15 | from __future__ import annotations |
16 | 16 | import asyncio |
17 | 17 | import google_crc32c |
| 18 | +import grpc |
18 | 19 | from google.api_core import exceptions |
19 | 20 | from google.api_core.retry_async import AsyncRetry |
| 21 | +from google.cloud._storage_v2.types.storage import BidiReadObjectRedirectedError |
| 22 | +from google.rpc import status_pb2 |
| 23 | +from google.protobuf.any_pb2 import Any as AnyProto |
20 | 24 |
|
21 | 25 | from typing import List, Optional, Tuple, Any, Dict |
22 | 26 |
|
|
40 | 44 |
|
41 | 45 |
|
42 | 46 | _MAX_READ_RANGES_PER_BIDI_READ_REQUEST = 100 |
43 | | - |
| 47 | +_BIDI_READ_REDIRECTED_TYPE_URL = "type.googleapis.com/google.storage.v2.BidiReadObjectRedirectedError" |
| 48 | + |
| 49 | + |
| 50 | +def _is_read_retryable(exc): |
| 51 | + """Predicate to determine if a read operation should be retried.""" |
| 52 | + print(f"--- Checking if retryable: {type(exc)}: {exc}") |
| 53 | + if isinstance(exc, (exceptions.ServiceUnavailable, exceptions.DeadlineExceeded, exceptions.TooManyRequests)): |
| 54 | + return True |
| 55 | + |
| 56 | + grpc_error = None |
| 57 | + if isinstance(exc, exceptions.GoogleAPICallError) and exc.errors: |
| 58 | + if isinstance(exc.errors[0], grpc.aio.AioRpcError): |
| 59 | + grpc_error = exc.errors[0] |
| 60 | + |
| 61 | + if grpc_error: |
| 62 | + print(f"--- Wrapped grpc.aio.AioRpcError code: {grpc_error.code()}") |
| 63 | + if grpc_error.code() in ( |
| 64 | + grpc.StatusCode.UNAVAILABLE, |
| 65 | + grpc.StatusCode.INTERNAL, |
| 66 | + grpc.StatusCode.DEADLINE_EXCEEDED, |
| 67 | + grpc.StatusCode.RESOURCE_EXHAUSTED, |
| 68 | + ): |
| 69 | + return True |
| 70 | + if grpc_error.code() == grpc.StatusCode.ABORTED: |
| 71 | + trailers = grpc_error.trailing_metadata() |
| 72 | + if not trailers: |
| 73 | + print("--- No trailers") |
| 74 | + return False |
| 75 | + |
| 76 | + status_details_bin = None |
| 77 | + # *** CORRECTED TRAILER ACCESS *** |
| 78 | + for key, value in trailers: |
| 79 | + if key == 'grpc-status-details-bin': |
| 80 | + status_details_bin = value |
| 81 | + break |
| 82 | + |
| 83 | + if status_details_bin: |
| 84 | + status_proto = status_pb2.Status() |
| 85 | + try: |
| 86 | + status_proto.ParseFromString(status_details_bin) |
| 87 | + for detail in status_proto.details: |
| 88 | + if detail.type_url == _BIDI_READ_REDIRECTED_TYPE_URL: |
| 89 | + print("--- Found BidiReadObjectRedirectedError, is retryable") |
| 90 | + return True |
| 91 | + print("--- BidiReadObjectRedirectedError type URL not found in details") |
| 92 | + except Exception as e: |
| 93 | + print(f"--- Error parsing status_details_bin: {e}") |
| 94 | + return False |
| 95 | + else: |
| 96 | + print("--- No grpc-status-details-bin in trailers") |
| 97 | + return False |
44 | 98 |
|
45 | 99 | class AsyncMultiRangeDownloader: |
46 | 100 | """Provides an interface for downloading multiple ranges of a GCS ``Object`` |
@@ -158,43 +212,95 @@ def __init__( |
158 | 212 | self.read_handle = read_handle |
159 | 213 | self.read_obj_str: Optional[_AsyncReadObjectStream] = None |
160 | 214 | self._is_stream_open: bool = False |
| 215 | + self._routing_token: Optional[str] = None |
| 216 | + |
| 217 | + async def _on_open_error(self, exc): |
| 218 | + """Extracts routing token and read handle on redirect error during open.""" |
| 219 | + print(f"--- _on_open_error called with {type(exc)}: {exc}") |
| 220 | + grpc_error = None |
| 221 | + if isinstance(exc, exceptions.GoogleAPICallError) and exc.errors: |
| 222 | + if isinstance(exc.errors[0], grpc.aio.AioRpcError): |
| 223 | + grpc_error = exc.errors[0] |
| 224 | + |
| 225 | + if grpc_error and grpc_error.code() == grpc.StatusCode.ABORTED: |
| 226 | + trailers = grpc_error.trailing_metadata() |
| 227 | + if not trailers: return |
| 228 | + |
| 229 | + status_details_bin = None |
| 230 | + # *** CORRECTED TRAILER ACCESS *** |
| 231 | + for key, value in trailers: |
| 232 | + if key == 'grpc-status-details-bin': |
| 233 | + status_details_bin = value |
| 234 | + break |
| 235 | + |
| 236 | + if status_details_bin: |
| 237 | + status_proto = status_pb2.Status() |
| 238 | + try: |
| 239 | + status_proto.ParseFromString(status_details_bin) |
| 240 | + for detail in status_proto.details: |
| 241 | + if detail.type_url == _BIDI_READ_REDIRECTED_TYPE_URL: |
| 242 | + redirect_proto = BidiReadObjectRedirectedError() |
| 243 | + detail.Unpack(redirect_proto) |
| 244 | + if redirect_proto.routing_token: |
| 245 | + self._routing_token = redirect_proto.routing_token |
| 246 | + if redirect_proto.read_handle and redirect_proto.read_handle.handle: |
| 247 | + self.read_handle = redirect_proto.read_handle.handle |
| 248 | + print(f"--- BidiReadObjectRedirectedError caught in open, new token: {self._routing_token}, handle: {self.read_handle}") |
| 249 | + break |
| 250 | + except Exception as e: |
| 251 | + print(f"--- Error unpacking redirect in _on_open_error: {e}") |
| 252 | + |
| 253 | + if self.read_obj_str and self.read_obj_str._is_open: |
| 254 | + try: |
| 255 | + await self.read_obj_str.close() |
| 256 | + except Exception: |
| 257 | + pass |
| 258 | + self._is_stream_open = False |
161 | 259 |
|
162 | 260 | async def open(self, retry_policy: Optional[AsyncRetry] = None) -> None: |
163 | | - """Opens the bidi-gRPC connection to read from the object. |
164 | | -
|
165 | | - This method initializes and opens an `_AsyncReadObjectStream` (bidi-gRPC stream) to |
166 | | - for downloading ranges of data from GCS ``Object``. |
167 | | -
|
168 | | - "Opening" constitutes fetching object metadata such as generation number |
169 | | - and read handle and sets them as attributes if not already set. |
170 | | - """ |
| 261 | + """Opens the bidi-gRPC connection to read from the object.""" |
171 | 262 | if self._is_stream_open: |
172 | 263 | raise ValueError("Underlying bidi-gRPC stream is already open") |
173 | 264 |
|
174 | 265 | if retry_policy is None: |
175 | | - # Default policy: retry generic transient errors |
176 | | - retry_policy = AsyncRetry( |
177 | | - predicate=lambda e: isinstance(e, (exceptions.ServiceUnavailable, exceptions.DeadlineExceeded)) |
178 | | - ) |
| 266 | + retry_policy = AsyncRetry(predicate=_is_read_retryable, on_error=self._on_open_error) |
| 267 | + else: |
| 268 | + original_on_error = retry_policy._on_error |
| 269 | + async def combined_on_error(exc): |
| 270 | + await self._on_open_error(exc) |
| 271 | + if original_on_error: |
| 272 | + await original_on_error(exc) |
| 273 | + retry_policy = retry_policy.with_predicate(_is_read_retryable).with_on_error(combined_on_error) |
179 | 274 |
|
180 | 275 | async def _do_open(): |
| 276 | + print("--- Attempting _do_open") |
| 277 | + if self._is_stream_open: |
| 278 | + self._is_stream_open = False |
| 279 | + |
181 | 280 | self.read_obj_str = _AsyncReadObjectStream( |
182 | 281 | client=self.client, |
183 | 282 | bucket_name=self.bucket_name, |
184 | 283 | object_name=self.object_name, |
185 | 284 | generation_number=self.generation_number, |
186 | 285 | read_handle=self.read_handle, |
187 | 286 | ) |
188 | | - await self.read_obj_str.open() |
| 287 | + |
| 288 | + metadata = [] |
| 289 | + if self._routing_token: |
| 290 | + metadata.append(("x-goog-request-params", f"routing_token={self._routing_token}")) |
| 291 | + print(f"--- Using routing_token for open: {self._routing_token}") |
| 292 | + self._routing_token = None |
| 293 | + |
| 294 | + await self.read_obj_str.open(metadata=metadata if metadata else None) |
189 | 295 |
|
190 | 296 | if self.read_obj_str.generation_number: |
191 | 297 | self.generation_number = self.read_obj_str.generation_number |
192 | 298 | if self.read_obj_str.read_handle: |
193 | 299 | self.read_handle = self.read_obj_str.read_handle |
194 | 300 |
|
195 | 301 | self._is_stream_open = True |
| 302 | + print("--- Stream opened successfully") |
196 | 303 |
|
197 | | - # Execute open with retry policy |
198 | 304 | await retry_policy(_do_open)() |
199 | 305 |
|
200 | 306 | async def download_ranges( |
@@ -259,9 +365,7 @@ async def download_ranges( |
259 | 365 | lock = asyncio.Lock() |
260 | 366 |
|
261 | 367 | if retry_policy is None: |
262 | | - retry_policy = AsyncRetry( |
263 | | - predicate=lambda e: isinstance(e, (exceptions.ServiceUnavailable, exceptions.DeadlineExceeded)) |
264 | | - ) |
| 368 | + retry_policy = AsyncRetry(predicate=_is_read_retryable) |
265 | 369 |
|
266 | 370 | # Initialize Global State for Retry Strategy |
267 | 371 | download_states = {} |
|
0 commit comments