Skip to content

Commit 4f8ce75

Browse files
dharanadandygrovekazuyukitanimuraparthchandraviirya
authored
feat: add support for array_contains expression (#1163)
* feat: add support for array_contains expression * test: add unit test for array_contains function * Removes unnecessary case expression for handling null values * chore: Move more expressions from core crate to spark-expr crate (#1152) * move aggregate expressions to spark-expr crate * move more expressions * move benchmark * normalize_nan * bitwise not * comet scalar funcs * update bench imports * remove dead code (#1155) * fix: Spark 4.0-preview1 SPARK-47120 (#1156) ## Which issue does this PR close? Part of #372 and #551 ## Rationale for this change To be ready for Spark 4.0 ## What changes are included in this PR? This PR fixes the new test SPARK-47120 added in Spark 4.0 ## How are these changes tested? tests enabled * chore: Move string kernels and expressions to spark-expr crate (#1164) * Move string kernels and expressions to spark-expr crate * remove unused hash kernel * remove unused dependencies * chore: Move remaining expressions to spark-expr crate + some minor refactoring (#1165) * move CheckOverflow to spark-expr crate * move NegativeExpr to spark-expr crate * move UnboundColumn to spark-expr crate * move ExpandExec from execution::datafusion::operators to execution::operators * refactoring to remove datafusion subpackage * update imports in benches * fix * fix * chore: Add ignored tests for reading complex types from Parquet (#1167) * Add ignored tests for reading structs from Parquet * add basic map test * add tests for Map and Array * feat: Add Spark-compatible implementation of SchemaAdapterFactory (#1169) * Add Spark-compatible SchemaAdapterFactory implementation * remove prototype code * fix * refactor * implement more cast logic * implement more cast logic * add basic test * improve test * cleanup * fmt * add support for casting unsigned int to signed int * clippy * address feedback * fix test * fix: Document enabling comet explain plan usage in Spark (4.0) (#1176) * test: enabling Spark tests with offHeap requirement (#1177) ## Which issue does this PR close? ## Rationale for this change After #1062 We have not running Spark tests for native execution ## What changes are included in this PR? Removed the off heap requirement for testing ## How are these changes tested? Bringing back Spark tests for native execution * feat: Improve shuffle metrics (second attempt) (#1175) * improve shuffle metrics * docs * more metrics * refactor * address feedback * fix: stddev_pop should not directly return 0.0 when count is 1.0 (#1184) * add test * fix * fix * fix * feat: Make native shuffle compression configurable and respect `spark.shuffle.compress` (#1185) * Make shuffle compression codec and level configurable * remove lz4 references * docs * update comment * clippy * fix benches * clippy * clippy * disable test for miri * remove lz4 reference from proto * minor: move shuffle classes from common to spark (#1193) * minor: refactor decodeBatches to make private in broadcast exchange (#1195) * minor: refactor prepare_output so that it does not require an ExecutionContext (#1194) * fix: fix missing explanation for then branch in case when (#1200) * minor: remove unused source files (#1202) * chore: Upgrade to DataFusion 44.0.0-rc2 (#1154) * move aggregate expressions to spark-expr crate * move more expressions * move benchmark * normalize_nan * bitwise not * comet scalar funcs * update bench imports * save * save * save * remove unused imports * clippy * implement more hashers * implement Hash and PartialEq * implement Hash and PartialEq * implement Hash and PartialEq * benches * fix ScalarUDFImpl.return_type failure * exclude test from miri * ignore correct test * ignore another test * remove miri checks * use return_type_from_exprs * Revert "use return_type_from_exprs" This reverts commit febc1f1. * use DF main branch * hacky workaround for regression in ScalarUDFImpl.return_type * fix repo url * pin to revision * bump to latest rev * bump to latest DF rev * bump DF to rev 9f530dd * add Cargo.lock * bump DF version * no default features * Revert "remove miri checks" This reverts commit 4638fe3. * Update pin to DataFusion e99e02b9b9093ceb0c13a2dd32a2a89beba47930 * update pin * Update Cargo.toml Bump to 44.0.0-rc2 * update cargo lock * revert miri change --------- Co-authored-by: Andrew Lamb <[email protected]> * update UT Signed-off-by: Dharan Aditya <[email protected]> * fix typo in UT Signed-off-by: Dharan Aditya <[email protected]> --------- Signed-off-by: Dharan Aditya <[email protected]> Co-authored-by: Andy Grove <[email protected]> Co-authored-by: KAZUYUKI TANIMURA <[email protected]> Co-authored-by: Parth Chandra <[email protected]> Co-authored-by: Liang-Chi Hsieh <[email protected]> Co-authored-by: Raz Luvaton <[email protected]> Co-authored-by: Andrew Lamb <[email protected]>
1 parent 5d2c909 commit 4f8ce75

File tree

4 files changed

+34
-0
lines changed

4 files changed

+34
-0
lines changed

native/core/src/execution/planner.rs

+15
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ use datafusion_expr::{
9898
AggregateUDF, ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits,
9999
WindowFunctionDefinition,
100100
};
101+
use datafusion_functions_nested::array_has::ArrayHas;
101102
use datafusion_physical_expr::expressions::{Literal, StatsType};
102103
use datafusion_physical_expr::window::WindowExpr;
103104
use datafusion_physical_expr::LexOrdering;
@@ -719,6 +720,20 @@ impl PhysicalPlanner {
719720
expr.legacy_negative_index,
720721
)))
721722
}
723+
ExprStruct::ArrayContains(expr) => {
724+
let src_array_expr =
725+
self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?;
726+
let key_expr =
727+
self.create_expr(expr.right.as_ref().unwrap(), Arc::clone(&input_schema))?;
728+
let args = vec![Arc::clone(&src_array_expr), key_expr];
729+
let array_has_expr = Arc::new(ScalarFunctionExpr::new(
730+
"array_has",
731+
Arc::new(ScalarUDF::new_from_impl(ArrayHas::new())),
732+
args,
733+
DataType::Boolean,
734+
));
735+
Ok(array_has_expr)
736+
}
722737
expr => Err(ExecutionError::GeneralError(format!(
723738
"Not implemented: {:?}",
724739
expr

native/proto/src/proto/expr.proto

+1
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ message Expr {
8484
GetArrayStructFields get_array_struct_fields = 57;
8585
BinaryExpr array_append = 58;
8686
ArrayInsert array_insert = 59;
87+
BinaryExpr array_contains = 60;
8788
}
8889
}
8990

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

+6
Original file line numberDiff line numberDiff line change
@@ -2266,6 +2266,12 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
22662266
withInfo(expr, "unsupported arguments for GetArrayStructFields", child)
22672267
None
22682268
}
2269+
case expr if expr.prettyName == "array_contains" =>
2270+
createBinaryExpr(
2271+
expr.children(0),
2272+
expr.children(1),
2273+
inputs,
2274+
(builder, binaryExpr) => builder.setArrayContains(binaryExpr))
22692275
case _ if expr.prettyName == "array_append" =>
22702276
createBinaryExpr(
22712277
expr.children(0),

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

+12
Original file line numberDiff line numberDiff line change
@@ -2517,4 +2517,16 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
25172517
checkSparkAnswer(df.select("arrUnsupportedArgs"))
25182518
}
25192519
}
2520+
2521+
test("array_contains") {
2522+
withTempDir { dir =>
2523+
val path = new Path(dir.toURI.toString, "test.parquet")
2524+
makeParquetFileAllTypes(path, dictionaryEnabled = false, n = 10000)
2525+
spark.read.parquet(path.toString).createOrReplaceTempView("t1");
2526+
checkSparkAnswerAndOperator(
2527+
spark.sql("SELECT array_contains(array(_2, _3, _4), _2) FROM t1"))
2528+
checkSparkAnswerAndOperator(
2529+
spark.sql("SELECT array_contains((CASE WHEN _2 =_3 THEN array(_4) END), _4) FROM t1"));
2530+
}
2531+
}
25202532
}

0 commit comments

Comments
 (0)