@@ -106,10 +106,25 @@ async def get(
106106 )
107107 return prototype .buffer .from_bytes (await resp .bytes_async ()) # type: ignore[arg-type]
108108 elif isinstance (byte_range , SuffixByteRequest ):
109- resp = await obs .get_async (
110- self .store , key , options = {"range" : {"suffix" : byte_range .suffix }}
111- )
112- return prototype .buffer .from_bytes (await resp .bytes_async ()) # type: ignore[arg-type]
109+ # some object stores (Azure) don't support suffix requests. In this
110+ # case, our workaround is to first get the length of the object and then
111+ # manually request the byte range at the end.
112+ try :
113+ resp = await obs .get_async (
114+ self .store , key , options = {"range" : {"suffix" : byte_range .suffix }}
115+ )
116+ return prototype .buffer .from_bytes (await resp .bytes_async ()) # type: ignore[arg-type]
117+ except obs .exceptions .NotSupportedError :
118+ head_resp = await obs .head_async (self .store , key )
119+ file_size = head_resp ["size" ]
120+ suffix_len = byte_range .suffix
121+ buffer = await obs .get_range_async (
122+ self .store ,
123+ key ,
124+ start = file_size - suffix_len ,
125+ length = suffix_len ,
126+ )
127+ return prototype .buffer .from_bytes (buffer ) # type: ignore[arg-type]
113128 else :
114129 raise ValueError (f"Unexpected byte_range, got { byte_range } " )
115130 except _ALLOWED_EXCEPTIONS :
@@ -265,10 +280,29 @@ class _OtherRequest(TypedDict):
265280 path : str
266281 """The path to request from."""
267282
268- range : OffsetRange | SuffixRange | None
283+ range : OffsetRange | None
284+ # Note: suffix requests are handled separately because some object stores (Azure)
285+ # don't support them
269286 """The range request type."""
270287
271288
289+ class _SuffixRequest (TypedDict ):
290+ """Offset or suffix range requests.
291+
292+ These requests cannot be concurrent on the Rust side, and each need their own call
293+ to `obstore.get_async`, passing in the `range` parameter.
294+ """
295+
296+ original_request_index : int
297+ """The positional index in the original key_ranges input"""
298+
299+ path : str
300+ """The path to request from."""
301+
302+ range : SuffixRange
303+ """The suffix range."""
304+
305+
272306class _Response (TypedDict ):
273307 """A response buffer associated with the original index that it should be restored to."""
274308
@@ -317,7 +351,7 @@ async def _make_other_request(
317351 prototype : BufferPrototype ,
318352 semaphore : asyncio .Semaphore ,
319353) -> list [_Response ]:
320- """Make suffix or offset requests.
354+ """Make offset or full-file requests.
321355
322356 We return a `list[_Response]` for symmetry with `_make_bounded_requests` so that all
323357 futures can be gathered together.
@@ -339,6 +373,46 @@ async def _make_other_request(
339373 ]
340374
341375
async def _make_suffix_request(
    store: _UpstreamObjectStore,
    request: _SuffixRequest,
    prototype: BufferPrototype,
    semaphore: asyncio.Semaphore,
) -> list[_Response]:
    """Make a suffix request, i.e. fetch the trailing bytes of one object.

    This is separated out from `_make_other_request` because some object stores
    (Azure) don't support suffix requests. In this case, our workaround is to first
    get the length of the object and then manually request the byte range at the end.

    Parameters
    ----------
    store
        The obstore store to read from.
    request
        The path, the suffix range, and the index of this request in the original
        input.
    prototype
        Buffer prototype used to wrap the fetched bytes.
    semaphore
        Held for the duration of the store calls made by this request.

    Returns
    -------
    list[_Response]
        A one-element list, for symmetry with `_make_bounded_requests` so that all
        futures can be gathered together.
    """
    import obstore as obs

    path = request["path"]

    async with semaphore:
        try:
            resp = await obs.get_async(store, path, options={"range": request["range"]})
            buffer = await resp.bytes_async()
        except obs.exceptions.NotSupportedError:
            # The store cannot serve suffix ranges natively: look up the object
            # size and translate the suffix into an explicit (start, length) range.
            head_resp = await obs.head_async(store, path)
            file_size = head_resp["size"]
            # A suffix longer than the object means "the whole object". Without
            # this clamp `start` would be negative, which is not a valid offset.
            suffix_len = min(request["range"]["suffix"], file_size)
            buffer = await obs.get_range_async(
                store,
                path,
                start=file_size - suffix_len,
                length=suffix_len,
            )

    return [
        {
            "original_request_index": request["original_request_index"],
            "buffer": prototype.buffer.from_bytes(buffer),  # type: ignore[arg-type]
        }
    ]
414+
415+
342416async def _get_partial_values (
343417 store : _UpstreamObjectStore ,
344418 prototype : BufferPrototype ,
@@ -358,6 +432,7 @@ async def _get_partial_values(
358432 key_ranges = list (key_ranges )
359433 per_file_bounded_requests : dict [str , list [_BoundedRequest ]] = defaultdict (list )
360434 other_requests : list [_OtherRequest ] = []
435+ suffix_requests : list [_SuffixRequest ] = []
361436
362437 for idx , (path , byte_range ) in enumerate (key_ranges ):
363438 if byte_range is None :
@@ -381,7 +456,7 @@ async def _get_partial_values(
381456 }
382457 )
383458 elif isinstance (byte_range , SuffixByteRequest ):
384- other_requests .append (
459+ suffix_requests .append (
385460 {
386461 "original_request_index" : idx ,
387462 "path" : path ,
@@ -402,6 +477,9 @@ async def _get_partial_values(
402477 for request in other_requests :
403478 futs .append (_make_other_request (store , request , prototype , semaphore = semaphore )) # noqa: PERF401
404479
480+ for suffix_request in suffix_requests :
481+ futs .append (_make_suffix_request (store , suffix_request , prototype , semaphore = semaphore )) # noqa: PERF401
482+
405483 buffers : list [Buffer | None ] = [None ] * len (key_ranges )
406484
407485 for responses in await asyncio .gather (* futs ):
0 commit comments