From e6899695cacd65e18ef12fa6f73520c8b43bc748 Mon Sep 17 00:00:00 2001
From: Irina Truong
Date: Fri, 21 Jul 2023 14:03:07 -0700
Subject: [PATCH 1/5] Simple deltalake benchmark.

---
 .github/workflows/tests.yml        | 47 +++++++++++++++---------------
 ci/environment.yml                 |  4 ++-
 tests/benchmarks/test_deltalake.py | 34 +++++++++++++++++++++
 3 files changed, 61 insertions(+), 24 deletions(-)
 create mode 100644 tests/benchmarks/test_deltalake.py

diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 9b76cdbff5..7c952c2c56 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -32,29 +32,30 @@ jobs:
       matrix:
         os: [ubuntu-latest]
         python-version: ["3.9"]
-        pytest_args: [tests]
-        include:
-          # Run stability tests on the lowest and highest versions of Python only
-          # These are temporarily redundant with the current global python-version
-          # - pytest_args: tests/stability
-          #   python-version: "3.9"
-          #   os: ubuntu-latest
-          # - pytest_args: tests/stability
-          #   python-version: "3.9"
-          #   os: ubuntu-latest
-          - pytest_args: tests/stability
-            python-version: "3.11"
-            os: ubuntu-latest
-          - pytest_args: tests/stability
-            python-version: "3.11"
-            os: ubuntu-latest
-          # Run stability tests on Python Windows and MacOS (latest py39 only)
-          - pytest_args: tests/stability
-            python-version: "3.9"
-            os: windows-latest
-          - pytest_args: tests/stability
-            python-version: "3.9"
-            os: macos-latest
+        # pytest_args: [tests]
+        pytest_args: [tests/benchmarks/test_deltalake.py]
+#        include:
+#          # Run stability tests on the lowest and highest versions of Python only
+#          # These are temporarily redundant with the current global python-version
+#          # - pytest_args: tests/stability
+#          #   python-version: "3.9"
+#          #   os: ubuntu-latest
+#          # - pytest_args: tests/stability
+#          #   python-version: "3.9"
+#          #   os: ubuntu-latest
+#          - pytest_args: tests/stability
+#            python-version: "3.11"
+#            os: ubuntu-latest
+#          - pytest_args: tests/stability
+#            python-version: "3.11"
+#            os: ubuntu-latest
+#          # Run stability tests on Python Windows and MacOS (latest py39 only)
+#          - pytest_args: tests/stability
+#            python-version: "3.9"
+#            os: windows-latest
+#          - pytest_args: tests/stability
+#            python-version: "3.9"
+#            os: macos-latest
 
     steps:
       - name: Checkout
diff --git a/ci/environment.yml b/ci/environment.yml
index aadf8a1189..1e2dea97d7 100644
--- a/ci/environment.yml
+++ b/ci/environment.yml
@@ -41,4 +41,6 @@ dependencies:
   - gilknocker ==0.4.1
   - openssl >1.1.0g
   - pyopenssl ==22.1.0 # Pinned by snowflake-connector-python
-  - cryptography ==38.0.4 # Pinned by snowflake-connector-python
\ No newline at end of file
+  - cryptography ==38.0.4 # Pinned by snowflake-connector-python
+  - pip:
+      - git+https://github.com/dask-contrib/dask-deltatable.git # TODO: link to release version
\ No newline at end of file
diff --git a/tests/benchmarks/test_deltalake.py b/tests/benchmarks/test_deltalake.py
new file mode 100644
index 0000000000..2d7cf48dc4
--- /dev/null
+++ b/tests/benchmarks/test_deltalake.py
@@ -0,0 +1,34 @@
+import dask.dataframe as dd
+import dask_deltatable as ddt
+import pytest
+
+
+@pytest.fixture(params=["read_deltalake", "read_parquet"])
+def ddf(request, small_client):
+    uri = "s3://coiled-datasets/delta/ds20f_100M/"
+    if request.param == "read_deltalake":
+        yield ddt.read_deltalake(uri)
+    else:
+        yield dd.read_parquet(f"{uri}*.parquet", engine="pyarrow")
+
+
+def test_column_agg(ddf):
+    ddf["float1"].agg(["sum", "mean"]).compute()
+
+
+def test_group_agg(ddf):
+    ddf = ddf[["int1", "int2", "int3"]]
+    (
+        ddf.groupby(["int2", "int3"], dropna=False, observed=True)
+        .agg({"int1": ["sum", "mean"]})
+        .compute()
+    )
+
+
+def test_group_median(ddf, shuffle_method):
+    ddf = ddf[["int1", "int2", "int3"]]
+    (
+        ddf.groupby(["int2", "int3"], dropna=False, observed=True)
+        .agg({"int1": ["median", "std"]}, shuffle=shuffle_method)
+        .compute()
+    )
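
Note on the setup above: the ddf fixture parametrizes every benchmark over two readers, so each test runs once against dask-deltatable's Delta-aware reader and once against a plain parquet read of the same files. A minimal standalone sketch of that comparison outside the pytest harness (the bucket URI and the float1 column come straight from the patch; small_client is a cluster fixture from the repo's conftest and is omitted here):

    import dask.dataframe as dd
    import dask_deltatable as ddt

    uri = "s3://coiled-datasets/delta/ds20f_100M/"

    # Delta-aware read: the file list comes from the table's transaction log
    delta_df = ddt.read_deltalake(uri)

    # Plain read: globs every parquet file, ignoring the Delta log entirely
    plain_df = dd.read_parquet(f"{uri}*.parquet", engine="pyarrow")

    # The same aggregation the first benchmark runs, against either frame
    delta_df["float1"].agg(["sum", "mean"]).compute()
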
"int3"], dropna=False, observed=True) + .agg({"int1": ["sum", "mean"]}) + .compute() + ) + + +def test_group_median(ddf, shuffle_method): + ddf = ddf[["int1", "int2", "int3"]] + ( + ddf.groupby(["int2", "int3"], dropna=False, observed=True) + .agg({"int1": ["median", "std"]}, shuffle=shuffle_method) + .compute() + ) From 48ba50264fe46aa6120cf2288303ad3f99a093d5 Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Fri, 21 Jul 2023 14:18:23 -0700 Subject: [PATCH 2/5] Aws region. --- tests/benchmarks/test_deltalake.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/benchmarks/test_deltalake.py b/tests/benchmarks/test_deltalake.py index 2d7cf48dc4..668e697f8e 100644 --- a/tests/benchmarks/test_deltalake.py +++ b/tests/benchmarks/test_deltalake.py @@ -7,7 +7,7 @@ def ddf(request, small_client): uri = "s3://coiled-datasets/delta/ds20f_100M/" if request.param == "read_deltalake": - yield ddt.read_deltalake(uri) + yield ddt.read_deltalake(uri, delta_storage_options={"AWS_REGION": "us-east-2"}) else: yield dd.read_parquet(f"{uri}*.parquet", engine="pyarrow") From 67bae9aae33a38cd18d96b42d9e5cb0d302cc904 Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Fri, 21 Jul 2023 16:33:55 -0700 Subject: [PATCH 3/5] Use h2o delta datasets. --- tests/benchmarks/test_deltalake.py | 128 +++++++++++++++++++++++++---- 1 file changed, 114 insertions(+), 14 deletions(-) diff --git a/tests/benchmarks/test_deltalake.py b/tests/benchmarks/test_deltalake.py index 668e697f8e..4c78cf78f6 100644 --- a/tests/benchmarks/test_deltalake.py +++ b/tests/benchmarks/test_deltalake.py @@ -1,34 +1,134 @@ +import os + import dask.dataframe as dd import dask_deltatable as ddt +import pandas as pd import pytest +DATASETS = { + "0.5 GB": "s3://coiled-datasets/h2o-delta/N_1e7_K_1e2/", + "5 GB": "s3://coiled-datasets/h2o-delta/N_1e8_K_1e2/", +} + +enabled_datasets = os.getenv("H2O_DELTA_DATASETS") +if enabled_datasets is not None: + enabled_datasets = {k.strip() for k in enabled_datasets.split(",")} + if unknown_datasets := enabled_datasets - DATASETS.keys(): + raise ValueError("Unknown h2o-delta dataset(s): ", unknown_datasets) +else: + enabled_datasets = { + "0.5 GB", + "5 GB", + } + + +@pytest.fixture(params=list(DATASETS)) +def uri(request): + if request.param not in enabled_datasets: + raise pytest.skip( + "Disabled by default config or H2O_DELTA_DATASETS env variable" + ) + return DATASETS[request.param] + @pytest.fixture(params=["read_deltalake", "read_parquet"]) -def ddf(request, small_client): - uri = "s3://coiled-datasets/delta/ds20f_100M/" +def ddf(request, small_client, uri): if request.param == "read_deltalake": - yield ddt.read_deltalake(uri, delta_storage_options={"AWS_REGION": "us-east-2"}) + yield ddt.read_deltalake( + uri, delta_storage_options={"AWS_REGION": "us-east-2", "anon": "true"} + ) else: - yield dd.read_parquet(f"{uri}*.parquet", engine="pyarrow") + yield dd.read_parquet( + f"{uri}*.parquet", engine="pyarrow", storage_options={"anon": "true"} + ) + + +def test_q1(ddf): + ddf = ddf[["id1", "v1"]] + ddf.groupby("id1", dropna=False, observed=True).agg({"v1": "sum"}).compute() + + +def test_q2(ddf): + ddf = ddf[["id1", "id2", "v1"]] + ( + ddf.groupby(["id1", "id2"], dropna=False, observed=True) + .agg({"v1": "sum"}) + .compute() + ) + + +def test_q3(ddf): + ddf = ddf[["id3", "v1", "v3"]] + ( + ddf.groupby("id3", dropna=False, observed=True) + .agg({"v1": "sum", "v3": "mean"}) + .compute() + ) -def test_column_agg(ddf): - ddf["float1"].agg(["sum", "mean"]).compute() +def test_q4(ddf): + ddf = 
ddf[["id4", "v1", "v2", "v3"]] + ( + ddf.groupby("id4", dropna=False, observed=True) + .agg({"v1": "mean", "v2": "mean", "v3": "mean"}) + .compute() + ) + + +def test_q5(ddf): + ddf = ddf[["id6", "v1", "v2", "v3"]] + ( + ddf.groupby("id6", dropna=False, observed=True) + .agg( + {"v1": "sum", "v2": "sum", "v3": "sum"}, + ) + .compute() + ) + + +def test_q6(ddf, shuffle_method): + # Median aggregation uses an explicitly-set shuffle + ddf = ddf[["id4", "id5", "v3"]] + ( + ddf.groupby(["id4", "id5"], dropna=False, observed=True) + .agg({"v3": ["median", "std"]}, shuffle=shuffle_method) + .compute() # requires shuffle arg to be set explicitly + ) + + +def test_q7(ddf): + ddf = ddf[["id3", "v1", "v2"]] + ( + ddf.groupby("id3", dropna=False, observed=True) + .agg({"v1": "max", "v2": "min"}) + .assign(range_v1_v2=lambda x: x["v1"] - x["v2"])[["range_v1_v2"]] + .compute() + ) -def test_group_agg(ddf): - ddf = ddf[["int1", "int2", "int3"]] +def test_q8(ddf, configure_shuffling): + # .groupby(...).apply(...) uses a shuffle to transfer data before applying the function + ddf = ddf[["id6", "v1", "v2", "v3"]] ( - ddf.groupby(["int2", "int3"], dropna=False, observed=True) - .agg({"int1": ["sum", "mean"]}) + ddf[~ddf["v3"].isna()][["id6", "v3"]] + .groupby("id6", dropna=False, observed=True) + .apply( + lambda x: x.nlargest(2, columns="v3"), + meta={"id6": "Int64", "v3": "float64"}, + )[["v3"]] .compute() ) -def test_group_median(ddf, shuffle_method): - ddf = ddf[["int1", "int2", "int3"]] +def test_q9(ddf, configure_shuffling): + # .groupby(...).apply(...) uses a shuffle to transfer data before applying the function + ddf = ddf[["id2", "id4", "v1", "v2"]] ( - ddf.groupby(["int2", "int3"], dropna=False, observed=True) - .agg({"int1": ["median", "std"]}, shuffle=shuffle_method) + ddf[["id2", "id4", "v1", "v2"]] + .groupby(["id2", "id4"], dropna=False, observed=True) + .apply( + lambda x: pd.Series({"r2": x.corr(numeric_only=True)["v1"]["v2"] ** 2}), + meta={"r2": "float64"}, + ) .compute() ) From 080e06fbfae45e5ba7595f4930f76042aaa1bc14 Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Fri, 21 Jul 2023 17:48:07 -0700 Subject: [PATCH 4/5] Auth doesn't work; do we have a profile? 
From 080e06fbfae45e5ba7595f4930f76042aaa1bc14 Mon Sep 17 00:00:00 2001
From: Irina Truong
Date: Fri, 21 Jul 2023 17:48:07 -0700
Subject: [PATCH 4/5] Auth doesn't work; do we have a profile?

---
 tests/benchmarks/test_deltalake.py | 176 ++++++++++++++---------------
 1 file changed, 87 insertions(+), 89 deletions(-)

diff --git a/tests/benchmarks/test_deltalake.py b/tests/benchmarks/test_deltalake.py
index 4c78cf78f6..54f318b961 100644
--- a/tests/benchmarks/test_deltalake.py
+++ b/tests/benchmarks/test_deltalake.py
@@ -2,7 +2,6 @@
 
 import dask.dataframe as dd
 import dask_deltatable as ddt
-import pandas as pd
 import pytest
 
 DATASETS = {
@@ -34,12 +33,11 @@ def uri(request):
 
 @pytest.fixture(params=["read_deltalake", "read_parquet"])
 def ddf(request, small_client, uri):
     if request.param == "read_deltalake":
-        yield ddt.read_deltalake(
-            uri, delta_storage_options={"AWS_REGION": "us-east-2", "anon": "true"}
-        )
+        delta_storage_options = {"AWS_REGION": "us-east-2", "AWS_PROFILE": "default"}
+        yield ddt.read_deltalake(uri, delta_storage_options=delta_storage_options)
     else:
         yield dd.read_parquet(
-            f"{uri}*.parquet", engine="pyarrow", storage_options={"anon": "true"}
+            f"{uri}*/*.parquet", engine="pyarrow", storage_options={"anon": "true"}
         )
 
@@ -48,87 +46,87 @@ def test_q1(ddf):
     ddf.groupby("id1", dropna=False, observed=True).agg({"v1": "sum"}).compute()
 
 
-def test_q2(ddf):
-    ddf = ddf[["id1", "id2", "v1"]]
-    (
-        ddf.groupby(["id1", "id2"], dropna=False, observed=True)
-        .agg({"v1": "sum"})
-        .compute()
-    )
-
-
-def test_q3(ddf):
-    ddf = ddf[["id3", "v1", "v3"]]
-    (
-        ddf.groupby("id3", dropna=False, observed=True)
-        .agg({"v1": "sum", "v3": "mean"})
-        .compute()
-    )
-
-
-def test_q4(ddf):
-    ddf = ddf[["id4", "v1", "v2", "v3"]]
-    (
-        ddf.groupby("id4", dropna=False, observed=True)
-        .agg({"v1": "mean", "v2": "mean", "v3": "mean"})
-        .compute()
-    )
-
-
-def test_q5(ddf):
-    ddf = ddf[["id6", "v1", "v2", "v3"]]
-    (
-        ddf.groupby("id6", dropna=False, observed=True)
-        .agg(
-            {"v1": "sum", "v2": "sum", "v3": "sum"},
-        )
-        .compute()
-    )
-
-
-def test_q6(ddf, shuffle_method):
-    # Median aggregation uses an explicitly-set shuffle
-    ddf = ddf[["id4", "id5", "v3"]]
-    (
-        ddf.groupby(["id4", "id5"], dropna=False, observed=True)
-        .agg({"v3": ["median", "std"]}, shuffle=shuffle_method)
-        .compute()  # requires shuffle arg to be set explicitly
-    )
-
-
-def test_q7(ddf):
-    ddf = ddf[["id3", "v1", "v2"]]
-    (
-        ddf.groupby("id3", dropna=False, observed=True)
-        .agg({"v1": "max", "v2": "min"})
-        .assign(range_v1_v2=lambda x: x["v1"] - x["v2"])[["range_v1_v2"]]
-        .compute()
-    )
-
-
-def test_q8(ddf, configure_shuffling):
-    # .groupby(...).apply(...) uses a shuffle to transfer data before applying the function
-    ddf = ddf[["id6", "v1", "v2", "v3"]]
-    (
-        ddf[~ddf["v3"].isna()][["id6", "v3"]]
-        .groupby("id6", dropna=False, observed=True)
-        .apply(
-            lambda x: x.nlargest(2, columns="v3"),
-            meta={"id6": "Int64", "v3": "float64"},
-        )[["v3"]]
-        .compute()
-    )
-
-
-def test_q9(ddf, configure_shuffling):
-    # .groupby(...).apply(...) uses a shuffle to transfer data before applying the function
-    ddf = ddf[["id2", "id4", "v1", "v2"]]
-    (
-        ddf[["id2", "id4", "v1", "v2"]]
-        .groupby(["id2", "id4"], dropna=False, observed=True)
-        .apply(
-            lambda x: pd.Series({"r2": x.corr(numeric_only=True)["v1"]["v2"] ** 2}),
-            meta={"r2": "float64"},
-        )
-        .compute()
-    )
+# def test_q2(ddf):
+#     ddf = ddf[["id1", "id2", "v1"]]
+#     (
+#         ddf.groupby(["id1", "id2"], dropna=False, observed=True)
+#         .agg({"v1": "sum"})
+#         .compute()
+#     )
+#
+#
+# def test_q3(ddf):
+#     ddf = ddf[["id3", "v1", "v3"]]
+#     (
+#         ddf.groupby("id3", dropna=False, observed=True)
+#         .agg({"v1": "sum", "v3": "mean"})
+#         .compute()
+#     )
+#
+#
+# def test_q4(ddf):
+#     ddf = ddf[["id4", "v1", "v2", "v3"]]
+#     (
+#         ddf.groupby("id4", dropna=False, observed=True)
+#         .agg({"v1": "mean", "v2": "mean", "v3": "mean"})
+#         .compute()
+#     )
+#
+#
+# def test_q5(ddf):
+#     ddf = ddf[["id6", "v1", "v2", "v3"]]
+#     (
+#         ddf.groupby("id6", dropna=False, observed=True)
+#         .agg(
+#             {"v1": "sum", "v2": "sum", "v3": "sum"},
+#         )
+#         .compute()
+#     )
+#
+#
+# def test_q6(ddf, shuffle_method):
+#     # Median aggregation uses an explicitly-set shuffle
+#     ddf = ddf[["id4", "id5", "v3"]]
+#     (
+#         ddf.groupby(["id4", "id5"], dropna=False, observed=True)
+#         .agg({"v3": ["median", "std"]}, shuffle=shuffle_method)
+#         .compute()  # requires shuffle arg to be set explicitly
+#     )
+#
+#
+# def test_q7(ddf):
+#     ddf = ddf[["id3", "v1", "v2"]]
+#     (
+#         ddf.groupby("id3", dropna=False, observed=True)
+#         .agg({"v1": "max", "v2": "min"})
+#         .assign(range_v1_v2=lambda x: x["v1"] - x["v2"])[["range_v1_v2"]]
+#         .compute()
+#     )
+#
+#
+# def test_q8(ddf, configure_shuffling):
+#     # .groupby(...).apply(...) uses a shuffle to transfer data before applying the function
+#     ddf = ddf[["id6", "v1", "v2", "v3"]]
+#     (
+#         ddf[~ddf["v3"].isna()][["id6", "v3"]]
+#         .groupby("id6", dropna=False, observed=True)
+#         .apply(
+#             lambda x: x.nlargest(2, columns="v3"),
+#             meta={"id6": "Int64", "v3": "float64"},
+#         )[["v3"]]
+#         .compute()
+#     )
+#
+#
+# def test_q9(ddf, configure_shuffling):
+#     # .groupby(...).apply(...) uses a shuffle to transfer data before applying the function
+#     ddf = ddf[["id2", "id4", "v1", "v2"]]
+#     (
+#         ddf[["id2", "id4", "v1", "v2"]]
+#         .groupby(["id2", "id4"], dropna=False, observed=True)
+#         .apply(
+#             lambda x: pd.Series({"r2": x.corr(numeric_only=True)["v1"]["v2"] ** 2}),
+#             meta={"r2": "float64"},
+#         )
+#         .compute()
+#     )
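
Two debugging changes ride along with the auth rework above: the parquet glob becomes f"{uri}*/*.parquet", suggesting the h2o-delta parquet files sit one directory level below the table root, and every query except q1 is commented out to shorten the feedback loop. What the Delta branch of the fixture attempts at this point, as a standalone sketch (assumes a local AWS profile named "default" with access to the bucket):

    import dask_deltatable as ddt

    uri = "s3://coiled-datasets/h2o-delta/N_1e7_K_1e2/"
    opts = {"AWS_REGION": "us-east-2", "AWS_PROFILE": "default"}
    ddf = ddt.read_deltalake(uri, delta_storage_options=opts)
    print(ddf.head())
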
From 76650f75bba3cea0c0fc77ae9a4ea8c1edb0c2fc Mon Sep 17 00:00:00 2001
From: Irina Truong
Date: Fri, 21 Jul 2023 18:22:48 -0700
Subject: [PATCH 5/5] Perhaps with access key / secret key.

---
 tests/benchmarks/test_deltalake.py | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/tests/benchmarks/test_deltalake.py b/tests/benchmarks/test_deltalake.py
index 54f318b961..d6d726cc1d 100644
--- a/tests/benchmarks/test_deltalake.py
+++ b/tests/benchmarks/test_deltalake.py
@@ -33,7 +33,11 @@ def uri(request):
 @pytest.fixture(params=["read_deltalake", "read_parquet"])
 def ddf(request, small_client, uri):
     if request.param == "read_deltalake":
-        delta_storage_options = {"AWS_REGION": "us-east-2", "AWS_PROFILE": "default"}
+        delta_storage_options = {
+            "AWS_REGION": "us-east-2",
+            "AWS_ACCESS_KEY_ID": os.environ["AWS_ACCESS_KEY_ID"],
+            "AWS_SECRET_ACCESS_KEY": os.environ["AWS_SECRET_ACCESS_KEY"],
+        }
         yield ddt.read_deltalake(uri, delta_storage_options=delta_storage_options)
     else:
         yield dd.read_parquet(
@@ -42,8 +46,8 @@
 
 
 def test_q1(ddf):
-    ddf = ddf[["id1", "v1"]]
-    ddf.groupby("id1", dropna=False, observed=True).agg({"v1": "sum"}).compute()
+    ddf = ddf[["id1", "v2"]]
+    ddf.groupby("id1", dropna=False, observed=True).agg({"v2": "sum"}).compute()
 
 
 # def test_q2(ddf):
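
For reference, after the full series the ddf fixture reads as below (assembled from the hunks above; AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are expected in the environment, e.g. injected as CI secrets). Note that this last patch also switches test_q1 from v1 to v2, so it no longer matches the classic h2o q1, which sums v1 by id1 — possibly a leftover from debugging.

    @pytest.fixture(params=["read_deltalake", "read_parquet"])
    def ddf(request, small_client, uri):
        if request.param == "read_deltalake":
            delta_storage_options = {
                "AWS_REGION": "us-east-2",
                "AWS_ACCESS_KEY_ID": os.environ["AWS_ACCESS_KEY_ID"],
                "AWS_SECRET_ACCESS_KEY": os.environ["AWS_SECRET_ACCESS_KEY"],
            }
            yield ddt.read_deltalake(uri, delta_storage_options=delta_storage_options)
        else:
            yield dd.read_parquet(
                f"{uri}*/*.parquet", engine="pyarrow", storage_options={"anon": "true"}
            )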