Skip to content

Commit 3c38d63

Browse files
authored
fix: replace deprecated ray parallelism arg with override_num_blocks (#2876)
1 parent 30276b2 commit 3c38d63

13 files changed

+45
-20
lines changed

awswrangler/distributed/ray/modin/s3/_read_orc.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ def _read_orc_distributed(
2626
schema: pa.schema | None,
2727
columns: list[str] | None,
2828
use_threads: bool | int,
29-
parallelism: int,
29+
override_num_blocks: int,
3030
version_ids: dict[str, str] | None,
3131
s3_client: "S3Client" | None,
3232
s3_additional_kwargs: dict[str, Any] | None,
@@ -43,7 +43,7 @@ def _read_orc_distributed(
4343
)
4444
ray_dataset = read_datasource(
4545
datasource,
46-
parallelism=parallelism,
46+
override_num_blocks=override_num_blocks,
4747
)
4848
to_pandas_kwargs = _data_types.pyarrow2pandas_defaults(
4949
use_threads=use_threads,

awswrangler/distributed/ray/modin/s3/_read_parquet.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def _read_parquet_distributed(
3434
columns: list[str] | None,
3535
coerce_int96_timestamp_unit: str | None,
3636
use_threads: bool | int,
37-
parallelism: int,
37+
override_num_blocks: int,
3838
version_ids: dict[str, str] | None,
3939
s3_client: "S3Client" | None,
4040
s3_additional_kwargs: dict[str, Any] | None,
@@ -60,7 +60,7 @@ def _read_parquet_distributed(
6060
"dataset_kwargs": dataset_kwargs,
6161
},
6262
),
63-
parallelism=parallelism,
63+
override_num_blocks=override_num_blocks,
6464
)
6565
return _to_modin(
6666
dataset=dataset,

awswrangler/distributed/ray/modin/s3/_read_text.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ def _read_text_distributed(
138138
s3_additional_kwargs: dict[str, str] | None,
139139
dataset: bool,
140140
ignore_index: bool,
141-
parallelism: int,
141+
override_num_blocks: int,
142142
version_ids: dict[str, str] | None,
143143
pandas_kwargs: dict[str, Any],
144144
) -> pd.DataFrame:
@@ -172,6 +172,6 @@ def _read_text_distributed(
172172
meta_provider=FastFileMetadataProvider(),
173173
**configuration,
174174
),
175-
parallelism=parallelism,
175+
override_num_blocks=override_num_blocks,
176176
)
177177
return _to_modin(dataset=ray_dataset, ignore_index=ignore_index)

awswrangler/s3/_read.py

+18
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from awswrangler.catalog._utils import _catalog_id
3030
from awswrangler.distributed.ray import ray_get
3131
from awswrangler.s3._list import _path2list, _prefix_cleanup
32+
from awswrangler.typing import RaySettings
3233

3334
if TYPE_CHECKING:
3435
from mypy_boto3_glue.type_defs import GetTableResponseTypeDef
@@ -377,3 +378,20 @@ def _get_paths_for_glue_table(
377378
)
378379

379380
return paths, path_root, res
381+
382+
383+
def _get_num_output_blocks(
    ray_args: RaySettings | None = None,
) -> int:
    """Resolve the number of output blocks requested through ``ray_args``.

    Two keys are consulted: the deprecated ``parallelism`` and its
    replacement ``override_num_blocks``. For backward compatibility an
    explicitly set ``parallelism`` still takes precedence, but a deprecation
    warning is logged whenever it is used.

    Parameters
    ----------
    ray_args
        Ray settings mapping, or ``None`` when the caller supplied none.

    Returns
    -------
    int
        The value of ``parallelism`` if it is set to something other than
        ``-1``; otherwise the value of ``override_num_blocks`` if present;
        otherwise ``-1``.
    """
    ray_args = ray_args or {}

    # ``None`` and ``-1`` both mean "not set": neither should trigger the
    # deprecation warning nor shadow ``override_num_blocks``.
    parallelism = ray_args.get("parallelism")
    if parallelism is not None and parallelism != -1:
        _logger.warning(
            "The argument ``parallelism`` is deprecated and will be removed in the next major release. "
            "Please specify ``override_num_blocks`` instead."
        )
        return parallelism

    override_num_blocks = ray_args.get("override_num_blocks")
    if override_num_blocks is not None:
        return override_num_blocks

    return -1

awswrangler/s3/_read_orc.py

+3-4
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
_apply_partition_filter,
2929
_check_version_id,
3030
_extract_partitions_dtypes_from_table_details,
31+
_get_num_output_blocks,
3132
_get_path_ignore_suffix,
3233
_get_path_root,
3334
_get_paths_for_glue_table,
@@ -137,7 +138,7 @@ def _read_orc(
137138
schema: pa.schema | None,
138139
columns: list[str] | None,
139140
use_threads: bool | int,
140-
parallelism: int,
141+
override_num_blocks: int,
141142
version_ids: dict[str, str] | None,
142143
s3_client: "S3Client" | None,
143144
s3_additional_kwargs: dict[str, Any] | None,
@@ -283,8 +284,6 @@ def read_orc(
283284
>>> df = wr.s3.read_orc(path, dataset=True, partition_filter=my_filter)
284285
285286
"""
286-
ray_args = ray_args if ray_args else {}
287-
288287
s3_client = _utils.client(service_name="s3", session=boto3_session)
289288
paths: list[str] = _path2list(
290289
path=path,
@@ -330,7 +329,7 @@ def read_orc(
330329
schema=schema,
331330
columns=columns,
332331
use_threads=use_threads,
333-
parallelism=ray_args.get("parallelism", -1),
332+
override_num_blocks=_get_num_output_blocks(ray_args),
334333
s3_client=s3_client,
335334
s3_additional_kwargs=s3_additional_kwargs,
336335
arrow_kwargs=arrow_kwargs,

awswrangler/s3/_read_parquet.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
_apply_partition_filter,
3535
_check_version_id,
3636
_extract_partitions_dtypes_from_table_details,
37+
_get_num_output_blocks,
3738
_get_path_ignore_suffix,
3839
_get_path_root,
3940
_get_paths_for_glue_table,
@@ -285,7 +286,7 @@ def _read_parquet(
285286
columns: list[str] | None,
286287
coerce_int96_timestamp_unit: str | None,
287288
use_threads: bool | int,
288-
parallelism: int,
289+
override_num_blocks: int,
289290
version_ids: dict[str, str] | None,
290291
s3_client: "S3Client" | None,
291292
s3_additional_kwargs: dict[str, Any] | None,
@@ -562,7 +563,7 @@ def read_parquet(
562563
columns=columns,
563564
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
564565
use_threads=use_threads,
565-
parallelism=ray_args.get("parallelism", -1),
566+
override_num_blocks=_get_num_output_blocks(ray_args),
566567
s3_client=s3_client,
567568
s3_additional_kwargs=s3_additional_kwargs,
568569
arrow_kwargs=arrow_kwargs,

awswrangler/s3/_read_parquet.pyi

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ def _read_parquet(
2828
columns: list[str] | None,
2929
coerce_int96_timestamp_unit: str | None,
3030
use_threads: bool | int,
31-
parallelism: int,
31+
override_num_blocks: int,
3232
version_ids: dict[str, str] | None,
3333
s3_client: "S3Client" | None,
3434
s3_additional_kwargs: dict[str, Any] | None,

awswrangler/s3/_read_text.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from awswrangler.s3._read import (
2020
_apply_partition_filter,
2121
_check_version_id,
22+
_get_num_output_blocks,
2223
_get_path_ignore_suffix,
2324
_get_path_root,
2425
_union,
@@ -52,7 +53,7 @@ def _read_text(
5253
s3_additional_kwargs: dict[str, str] | None,
5354
dataset: bool,
5455
ignore_index: bool,
55-
parallelism: int,
56+
override_num_blocks: int,
5657
version_ids: dict[str, str] | None,
5758
pandas_kwargs: dict[str, Any],
5859
) -> pd.DataFrame:
@@ -131,7 +132,6 @@ def _read_text_format(
131132
**args,
132133
)
133134

134-
ray_args = ray_args if ray_args else {}
135135
return _read_text(
136136
read_format,
137137
paths=paths,
@@ -141,7 +141,7 @@ def _read_text_format(
141141
s3_additional_kwargs=s3_additional_kwargs,
142142
dataset=dataset,
143143
ignore_index=ignore_index,
144-
parallelism=ray_args.get("parallelism", -1),
144+
override_num_blocks=_get_num_output_blocks(ray_args),
145145
version_ids=version_ids,
146146
pandas_kwargs=pandas_kwargs,
147147
)

awswrangler/s3/_read_text.pyi

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ def _read_text(
1919
s3_additional_kwargs: dict[str, str] | None,
2020
dataset: bool,
2121
ignore_index: bool,
22-
parallelism: int,
22+
override_num_blocks: int,
2323
version_ids: dict[str, str] | None,
2424
pandas_kwargs: dict[str, Any],
2525
) -> pd.DataFrame | Iterator[pd.DataFrame]: ...

awswrangler/typing.py

+7
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,13 @@ class RaySettings(TypedDict):
231231
Parallelism may be limited by the number of files of the dataset.
232232
Auto-detect by default.
233233
"""
234+
override_num_blocks: NotRequired[int]
235+
"""
236+
Override the number of output blocks from all read tasks.
237+
By default, the number of output blocks is dynamically decided based on
238+
input data size and available resources. You shouldn't manually set this
239+
value in most cases.
240+
"""
234241

235242

236243
class RayReadParquetSettings(RaySettings):

tests/glue_scripts/ray_read_small_parquet.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@
55
import awswrangler as wr
66

77
paths = wr.s3.list_objects(f"s3://{os.environ['data-gen-bucket']}/parquet/small/partitioned/")
8-
ray.data.read_parquet_bulk(paths=paths, parallelism=1000).to_modin()
8+
ray.data.read_parquet_bulk(paths=paths, override_num_blocks=1000).to_modin()

tests/glue_scripts/wrangler_read_small_parquet.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,5 @@
44

55
wr.s3.read_parquet(
66
path=f"s3://{os.environ['data-gen-bucket']}/parquet/small/partitioned/",
7-
ray_args={"parallelism": 1000, "bulk_read": True},
7+
ray_args={"override_num_blocks": 1000, "bulk_read": True},
88
)

tests/glue_scripts/wrangler_write_partitioned_parquet.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
df = wr.s3.read_parquet(
66
path=f"s3://{os.environ['data-gen-bucket']}/parquet/medium/partitioned/",
7-
ray_args={"parallelism": 1000},
7+
ray_args={"override_num_blocks": 1000},
88
)
99

1010
wr.s3.to_parquet(

0 commit comments

Comments
 (0)