Skip to content

Commit 9b2e250

Browse files
committed
feat(data): support list[str] URI columns in download() expression
The row-level download() expression only accepted a scalar str URI per row. Rows that carry multiple files (e.g. a video row with N S3 frame paths) had to hand-roll a per-row ThreadPoolExecutor. Accept a list<string> column (also large_list / fixed_size_list of (large_)string): flatten every row's URIs into one flat list, run them through the existing concurrent downloader in a single pool, then re-nest into a list<binary> column preserving per-row length and order (empty list -> [], null cell -> null, failed download -> None in place). Additive: the scalar str path is unchanged -- every list branch is gated behind is_uri_list_column, which is false for scalar columns. Both the obstore and PyArrow-threaded download paths and the partition actor are made list-aware, and the range-split hidden-size-column optimization is deferred for list columns. Signed-off-by: Aydin Abiar <aydin@anyscale.com>
1 parent 2c7ca74 commit 9b2e250

6 files changed

Lines changed: 290 additions & 17 deletions

File tree

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
"""Helpers for download columns whose cells are lists of URIs.
2+
3+
A scalar download column holds one URI string per row; a list column (e.g. the
4+
frame paths of one video) holds many. These helpers flatten a ``list<string>``
5+
column into a single flat URI list so it runs through the same concurrent
6+
downloader as the scalar path, then re-nest the downloaded bytes back into a
7+
``list<binary>`` column with the original per-row shape and order.
8+
"""
9+
from typing import List, Optional, Tuple
10+
11+
import pyarrow as pa
12+
13+
14+
def is_uri_list_column(arrow_type: "pa.DataType") -> bool:
15+
"""Return whether ``arrow_type`` is a list of strings (a multi-URI column).
16+
17+
Matches ``list`` / ``large_list`` / ``fixed_size_list`` of ``string`` or
18+
``large_string``. Scalar string columns return ``False`` and stay on the
19+
unchanged single-URI-per-row download path.
20+
"""
21+
if not (
22+
pa.types.is_list(arrow_type)
23+
or pa.types.is_large_list(arrow_type)
24+
or pa.types.is_fixed_size_list(arrow_type)
25+
):
26+
return False
27+
value_type = arrow_type.value_type
28+
return pa.types.is_string(value_type) or pa.types.is_large_string(value_type)
29+
30+
31+
def first_inner_uri(column: "pa.ChunkedArray") -> Optional[str]:
32+
"""Return the first non-null inner URI in a list<string> column, or ``None``.
33+
34+
Used only to pick the download path (obstore vs PyArrow) from a URI's
35+
scheme, mirroring the scalar path's "look at the first URI" behavior. Scans
36+
the offset-free child values, so it never indexes an empty or null cell.
37+
"""
38+
for chunk in column.iterchunks():
39+
for value in chunk.values:
40+
if value.is_valid:
41+
return value.as_py()
42+
return None
43+
44+
45+
def flatten_uri_list(
46+
column: "pa.ChunkedArray",
47+
) -> Tuple[List[Optional[str]], List[Optional[int]]]:
48+
"""Flatten a list<string> URI column into a flat URI list + per-row lengths.
49+
50+
Returns ``(flat_uris, row_lengths)``: ``flat_uris`` concatenates every row's
51+
URIs in order (null inner elements are kept as ``None`` so positions stay
52+
aligned with the downloaded bytes); ``row_lengths[i]`` is row ``i``'s URI
53+
count, or ``None`` for a null cell. Pair with :func:`renest_downloaded_bytes`.
54+
"""
55+
flat_uris: List[Optional[str]] = []
56+
row_lengths: List[Optional[int]] = []
57+
for uris in column.to_pylist():
58+
if uris is None:
59+
row_lengths.append(None)
60+
else:
61+
row_lengths.append(len(uris))
62+
flat_uris.extend(uris)
63+
return flat_uris, row_lengths
64+
65+
66+
def renest_downloaded_bytes(
67+
flat_bytes: List[Optional[bytes]], row_lengths: List[Optional[int]]
68+
) -> "pa.Array":
69+
"""Re-nest flat downloaded bytes into a ``list<binary>`` column.
70+
71+
Inverse of :func:`flatten_uri_list`: slices ``flat_bytes`` back into one
72+
inner list per row using ``row_lengths`` (``None`` -> null cell, ``0`` ->
73+
empty list), preserving per-row length and order. Failed downloads stay
74+
``None`` in place, matching the scalar path. Always returns ``list<binary>``
75+
(even for all-empty or all-null blocks) so output blocks concatenate without
76+
a ``list<null>`` type clash.
77+
"""
78+
nested: List[Optional[List[Optional[bytes]]]] = []
79+
pos = 0
80+
for length in row_lengths:
81+
if length is None:
82+
nested.append(None)
83+
continue
84+
nested.append(flat_bytes[pos : pos + length])
85+
pos += length
86+
return pa.array(nested, type=pa.list_(pa.binary()))

python/ray/data/_internal/planner/_obstore_download.py

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@
99
import pyarrow as pa
1010
import pyarrow.fs
1111

12+
from ray.data._internal.planner._download_list_utils import (
13+
first_inner_uri,
14+
flatten_uri_list,
15+
is_uri_list_column,
16+
renest_downloaded_bytes,
17+
)
1218
from ray.data._internal.util import (
1319
RetryingPyFileSystem,
1420
_iter_arrow_table_for_target_max_block_size,
@@ -380,17 +386,23 @@ def download_bytes_async(
380386
if not isinstance(block, pa.Table):
381387
block = BlockAccessor.for_block(block).to_arrow()
382388

383-
first_uris = block.column(uri_column_names[0]).to_pylist()
384-
if not first_uris:
385-
yield block
386-
return
389+
first_column = block.column(uri_column_names[0])
390+
if is_uri_list_column(first_column.type):
391+
# list<string> column: peek the first inner URI for scheme detection.
392+
first_uri = first_inner_uri(first_column)
393+
else:
394+
first_uris = first_column.to_pylist()
395+
if not first_uris:
396+
yield block
397+
return
398+
first_uri = first_uris[0]
387399

388400
# Fall back to PyArrow for URI schemes obstore doesn't handle.
389-
if not _is_obstore_supported_url(first_uris[0]):
401+
if not _is_obstore_supported_url(first_uri):
390402
logger.debug(
391403
"URI scheme not supported by obstore (first URI: %s); "
392404
"falling back to PyArrow threaded download.",
393-
first_uris[0],
405+
first_uri,
394406
)
395407
yield from _yield_threaded_download_bytes(
396408
block,
@@ -420,7 +432,30 @@ def download_bytes_async(
420432
for uri_column_name, output_bytes_column_name in zip(
421433
uri_column_names, output_bytes_column_names
422434
):
423-
uris = output_block.column(uri_column_name).to_pylist()
435+
column = output_block.column(uri_column_name)
436+
437+
if is_uri_list_column(column.type):
438+
# Flatten every row's URIs into one flat list, download it through
439+
# the same concurrent engine, then re-nest preserving per-row shape.
440+
# List columns carry no __ray_file_size__ column (see
441+
# AsyncPartitionActor), so no precomputed sizes are passed.
442+
flat_uris, row_lengths = flatten_uri_list(column)
443+
flat_bytes = (
444+
asyncio.run(
445+
_download_uris_with_obstore(
446+
flat_uris, uri_column_name, filesystem=filesystem
447+
)
448+
)
449+
if flat_uris
450+
else []
451+
)
452+
output_block = output_block.append_column(
453+
output_bytes_column_name,
454+
renest_downloaded_bytes(flat_bytes, row_lengths),
455+
)
456+
continue
457+
458+
uris = column.to_pylist()
424459

425460
if not uris:
426461
continue

python/ray/data/_internal/planner/download_partition_actor.py

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
import pyarrow.fs as pafs
99
from typing_extensions import override
1010

11+
from ray.data._internal.planner._download_list_utils import (
12+
flatten_uri_list,
13+
is_uri_list_column,
14+
)
1115
from ray.data._internal.planner._obstore_download import (
1216
_FILE_SIZE_COLUMN_PREFIX,
1317
RAY_DATA_OBSTORE_RANGE_THRESHOLD,
@@ -73,11 +77,37 @@ def _partition_and_yield(self, block: pa.Table) -> Iterator[pa.Table]:
7377
def _sampled_file_sizes_for_partition_estimate(
7478
self, block: pa.Table, uri_column_name: str
7579
) -> List[Optional[int]]:
76-
uris = block.column(uri_column_name).to_pylist()
80+
column = block.column(uri_column_name)
81+
if is_uri_list_column(column.type):
82+
return self._sampled_list_row_sizes(column)
83+
uris = column.to_pylist()
7784
sample_uris = uris[: self.INIT_SAMPLE_BATCH_SIZE]
7885
# ``_sample_sizes`` returns concrete ``int``s; widen for this API.
7986
return cast(List[Optional[int]], self._sample_sizes(sample_uris))
8087

88+
def _sampled_list_row_sizes(
89+
self, column: "pa.ChunkedArray"
90+
) -> List[Optional[int]]:
91+
"""Per-row size estimate for a list<string> URI column.
92+
93+
Samples the first ``INIT_SAMPLE_BATCH_SIZE`` rows and estimates each
94+
row's download size as the sum of its inner files' sizes (a null/empty
95+
cell contributes 0). Returns one estimate per sampled row, so it lines
96+
up with the scalar columns in ``_estimate_nrows_per_partition``.
97+
"""
98+
sample = column.slice(0, self.INIT_SAMPLE_BATCH_SIZE)
99+
flat_uris, row_lengths = flatten_uri_list(sample)
100+
flat_sizes = self._sample_sizes(flat_uris)
101+
row_sizes: List[Optional[int]] = []
102+
pos = 0
103+
for length in row_lengths:
104+
if length is None:
105+
row_sizes.append(0)
106+
continue
107+
row_sizes.append(sum(flat_sizes[pos : pos + length]))
108+
pos += length
109+
return row_sizes
110+
81111
def _estimate_nrows_per_partition(self, block: pa.Table) -> int:
82112
sampled_file_sizes_by_column = {}
83113
for uri_column_name in self._uri_column_names:
@@ -214,8 +244,13 @@ def __call__(self, block: pa.Table) -> Iterator[pa.Table]:
214244
self._validate_uri_columns(block)
215245

216246
if block.num_rows > 0 and RAY_DATA_OBSTORE_RANGE_THRESHOLD > 0:
217-
first_uri = block.column(self._uri_column_names[0])[0].as_py()
218-
if _is_obstore_supported_url(first_uri):
247+
first_column = block.column(self._uri_column_names[0])
248+
# Range-split size hints assume scalar string cells; skip the probe
249+
# for list URI columns (their sizes are sampled without a hidden
250+
# size column).
251+
if not is_uri_list_column(
252+
first_column.type
253+
) and _is_obstore_supported_url(first_column[0].as_py()):
219254
block = self._attach_file_sizes(block)
220255

221256
yield from self._partition_and_yield(block)
@@ -289,8 +324,14 @@ def _attach_file_sizes(self, block: pa.Table) -> pa.Table:
289324
download path falls back to HEAD via obstore.
290325
"""
291326
for uri_column_name in self._uri_column_names:
327+
column = block.column(uri_column_name)
328+
if is_uri_list_column(column.type):
329+
# Defer the range-split size-hint optimization for list URI
330+
# columns: the download path samples their sizes directly and
331+
# does not expect a hidden size column.
332+
continue
292333
size_col = f"{_FILE_SIZE_COLUMN_PREFIX}{uri_column_name}"
293-
uris = block.column(uri_column_name).to_pylist()
334+
uris = column.to_pylist()
294335
# Fetches all file sizes (not just a sample).
295336
sizes = self._sample_sizes(uris)
296337
block = block.append_column(size_col, pa.array(sizes, type=pa.int64()))

python/ray/data/_internal/planner/plan_download_op.py

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@
1515
)
1616
from ray.data._internal.logical.operators import Download
1717
from ray.data._internal.output_buffer import OutputBlockSizeOption
18+
from ray.data._internal.planner._download_list_utils import (
19+
flatten_uri_list,
20+
is_uri_list_column,
21+
renest_downloaded_bytes,
22+
)
1823
from ray.data._internal.planner._obstore_download import (
1924
OBSTORE_AVAILABLE,
2025
_log_fallback_warning,
@@ -192,10 +197,25 @@ def download_bytes_threaded(
192197
for uri_column_name, output_bytes_column_name in zip(
193198
uri_column_names, output_bytes_column_names
194199
):
195-
# Extract URIs from PyArrow table
196-
uris = output_block.column(uri_column_name).to_pylist()
200+
# Extract URIs from PyArrow table. For a list<string> column, flatten
201+
# every row's URIs into one flat list (tracked by row_lengths) so they
202+
# all run through the same concurrent pool, then re-nest below.
203+
column = output_block.column(uri_column_name)
204+
is_list = is_uri_list_column(column.type)
205+
if is_list:
206+
uris, row_lengths = flatten_uri_list(column)
207+
else:
208+
uris = column.to_pylist()
197209

198210
if len(uris) == 0:
211+
if is_list:
212+
# Rows exist but hold only empty/null lists: still append the
213+
# re-nested (empty/null) list<binary> column so the output schema
214+
# stays consistent with blocks that did download bytes.
215+
output_block = output_block.append_column(
216+
output_bytes_column_name,
217+
renest_downloaded_bytes([], row_lengths),
218+
)
199219
continue
200220

201221
def load_uri_bytes(uri_iterator):
@@ -253,11 +273,17 @@ def load_uri_bytes(uri_iterator):
253273
)
254274
)
255275

256-
# Add the new column to the PyArrow table
257-
output_block = output_block.add_column(
258-
len(output_block.column_names),
276+
# Add the new column to the PyArrow table. For a list column, re-nest
277+
# the flat bytes back into one inner list per row (preserving length and
278+
# order); failed downloads stay None in place.
279+
new_column = (
280+
renest_downloaded_bytes(uri_bytes, row_lengths)
281+
if is_list
282+
else pa.array(uri_bytes)
283+
)
284+
output_block = output_block.append_column(
259285
output_bytes_column_name,
260-
pa.array(uri_bytes),
286+
new_column,
261287
)
262288

263289
yield from _iter_arrow_table_for_target_max_block_size(

python/ray/data/expressions.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1487,6 +1487,11 @@ class DownloadExpr(Expr):
14871487

14881488
uri_column_name: str
14891489
filesystem: "pyarrow.fs.FileSystem" = None
1490+
# Nominal type only; unused on the download path. ``with_column`` lowers a
1491+
# ``DownloadExpr`` to a ``Download`` op (see ``Dataset.with_column``) whose
1492+
# ``infer_schema`` returns the input schema, so the output column's real type
1493+
# comes from the produced blocks: ``binary`` for a scalar URI column,
1494+
# ``list<binary>`` for a ``list<string>`` one.
14901495
data_type: DataType = field(default_factory=lambda: DataType.binary(), init=False)
14911496

14921497
def structurally_equals(self, other: Any) -> bool:

0 commit comments

Comments
 (0)