
Commit 32b9338

parthchandra, NoeB, viirya, rluvaton-flarion, and andygrove authored
chore: Comet parquet exec merge from main(20250114) (#1293)
* feat: support array_append (#1072) * feat: support array_append * formatted code * rewrite array_append plan to match spark behaviour and fixed bug in QueryPlan serde * remove unwrap * Fix for Spark 3.3 * refactor array_append binary expression serde code * Disabled array_append test for spark 4.0+ * chore: Simplify CometShuffleMemoryAllocator to use Spark unified memory allocator (#1063) * docs: Update benchmarking.md (#1085) * feat: Require offHeap memory to be enabled (always use unified memory) (#1062) * Require offHeap memory * remove unused import * use off heap memory in stability tests * reorder imports * test: Restore one test in CometExecSuite by adding COMET_SHUFFLE_MODE config (#1087) * Add changelog for 0.4.0 (#1089) * chore: Prepare for 0.5.0 development (#1090) * Update version number for build * update docs * build: Skip installation of spark-integration and fuzz testing modules (#1091) * Add hint for finding the GPG key to use when publishing to maven (#1093) * docs: Update documentation for 0.4.0 release (#1096) * update TPC-H results * update Maven links * update benchmarking guide and add TPC-DS results * include q72 * fix: Unsigned type related bugs (#1095) ## Which issue does this PR close? Closes #1067 ## Rationale for this change Bug fix. A few expressions were failing some unsigned type related tests ## What changes are included in this PR? - For `u8`/`u16`, switched to use `generate_cast_to_signed!` in order to copy full i16/i32 width instead of padding zeros in the higher bits - `u64` becomes `Decimal(20, 0)` but there was a bug in `round()` (`>` vs `>=`) ## How are these changes tested? Put back tests for unsigned types * chore: Include first ScanExec batch in metrics (#1105) * include first batch in ScanExec metrics * record row count metric * fix regression * chore: Improve CometScan metrics (#1100) * Add native metrics for plan creation * make messages consistent * Include get_next_batch cost in metrics * formatting * fix double count of rows * chore: Add custom metric for native shuffle fetching batches from JVM (#1108) * feat: support array_insert (#1073) * Part of the implementation of array_insert * Missing methods * Working version * Reformat code * Fix code-style * Add comments about spark's implementation. 
* Implement negative indices + fix tests for spark < 3.4 * Fix code-style * Fix scalastyle * Fix tests for spark < 3.4 * Fixes & tests - added test for the negative index - added test for the legacy spark mode * Use assume(isSpark34Plus) in tests * Test else-branch & improve coverage * Update native/spark-expr/src/list.rs Co-authored-by: Andy Grove <[email protected]> * Fix fallback test In one case there is a zero in index and test fails due to spark error * Adjust the behaviour for the NULL case to Spark * Move the logic of type checking to the method * Fix code-style --------- Co-authored-by: Andy Grove <[email protected]> * feat: enable decimal to decimal cast of different precision and scale (#1086) * enable decimal to decimal cast of different precision and scale * add more test cases for negative scale and higher precision * add check for compatibility for decimal to decimal * fix code style * Update spark/src/main/scala/org/apache/comet/expressions/CometCast.scala Co-authored-by: Andy Grove <[email protected]> * fix the nit in comment --------- Co-authored-by: himadripal <[email protected]> Co-authored-by: Andy Grove <[email protected]> * docs: fix readme FGPA/FPGA typo (#1117) * fix: Use RDD partition index (#1112) * fix: Use RDD partition index * fix * fix * fix * fix: Various metrics bug fixes and improvements (#1111) * fix: Don't create CometScanExec for subclasses of ParquetFileFormat (#1129) * Use exact class comparison for parquet scan * Add test * Add comment * fix: Fix metrics regressions (#1132) * fix metrics issues * clippy * update tests * docs: Add more technical detail and new diagram to Comet plugin overview (#1119) * Add more technical detail and new diagram to Comet plugin overview * update diagram * add info on Arrow IPC * update diagram * update diagram * update docs * address feedback * Stop passing Java config map into native createPlan (#1101) * feat: Improve ScanExec native metrics (#1133) * save * remove shuffle jvm metric and update tuning guide * docs * add source for all ScanExecs * address feedback * address feedback * chore: Remove unused StringView struct (#1143) * Remove unused StringView struct * remove more dead code * docs: Add some documentation explaining how shuffle works (#1148) * add some notes on shuffle * reads * improve docs * test: enable more Spark 4.0 tests (#1145) ## Which issue does this PR close? Part of #372 and #551 ## Rationale for this change To be ready for Spark 4.0 ## What changes are included in this PR? This PR enables more Spark 4.0 tests that were fixed by recent changes ## How are these changes tested? tests enabled * chore: Refactor cast to use SparkCastOptions param (#1146) * Refactor cast to use SparkCastOptions param * update tests * update benches * update benches * update benches * Enable more scenarios in CometExecBenchmark. (#1151) * chore: Move more expressions from core crate to spark-expr crate (#1152) * move aggregate expressions to spark-expr crate * move more expressions * move benchmark * normalize_nan * bitwise not * comet scalar funcs * update bench imports * remove dead code (#1155) * fix: Spark 4.0-preview1 SPARK-47120 (#1156) ## Which issue does this PR close? Part of #372 and #551 ## Rationale for this change To be ready for Spark 4.0 ## What changes are included in this PR? This PR fixes the new test SPARK-47120 added in Spark 4.0 ## How are these changes tested? 
tests enabled * chore: Move string kernels and expressions to spark-expr crate (#1164) * Move string kernels and expressions to spark-expr crate * remove unused hash kernel * remove unused dependencies * chore: Move remaining expressions to spark-expr crate + some minor refactoring (#1165) * move CheckOverflow to spark-expr crate * move NegativeExpr to spark-expr crate * move UnboundColumn to spark-expr crate * move ExpandExec from execution::datafusion::operators to execution::operators * refactoring to remove datafusion subpackage * update imports in benches * fix * fix * chore: Add ignored tests for reading complex types from Parquet (#1167) * Add ignored tests for reading structs from Parquet * add basic map test * add tests for Map and Array * feat: Add Spark-compatible implementation of SchemaAdapterFactory (#1169) * Add Spark-compatible SchemaAdapterFactory implementation * remove prototype code * fix * refactor * implement more cast logic * implement more cast logic * add basic test * improve test * cleanup * fmt * add support for casting unsigned int to signed int * clippy * address feedback * fix test * fix: Document enabling comet explain plan usage in Spark (4.0) (#1176) * test: enabling Spark tests with offHeap requirement (#1177) ## Which issue does this PR close? ## Rationale for this change After #1062 We have not running Spark tests for native execution ## What changes are included in this PR? Removed the off heap requirement for testing ## How are these changes tested? Bringing back Spark tests for native execution * feat: Improve shuffle metrics (second attempt) (#1175) * improve shuffle metrics * docs * more metrics * refactor * address feedback * fix: stddev_pop should not directly return 0.0 when count is 1.0 (#1184) * add test * fix * fix * fix * feat: Make native shuffle compression configurable and respect `spark.shuffle.compress` (#1185) * Make shuffle compression codec and level configurable * remove lz4 references * docs * update comment * clippy * fix benches * clippy * clippy * disable test for miri * remove lz4 reference from proto * minor: move shuffle classes from common to spark (#1193) * minor: refactor decodeBatches to make private in broadcast exchange (#1195) * minor: refactor prepare_output so that it does not require an ExecutionContext (#1194) * fix: fix missing explanation for then branch in case when (#1200) * minor: remove unused source files (#1202) * chore: Upgrade to DataFusion 44.0.0-rc2 (#1154) * move aggregate expressions to spark-expr crate * move more expressions * move benchmark * normalize_nan * bitwise not * comet scalar funcs * update bench imports * save * save * save * remove unused imports * clippy * implement more hashers * implement Hash and PartialEq * implement Hash and PartialEq * implement Hash and PartialEq * benches * fix ScalarUDFImpl.return_type failure * exclude test from miri * ignore correct test * ignore another test * remove miri checks * use return_type_from_exprs * Revert "use return_type_from_exprs" This reverts commit febc1f1. * use DF main branch * hacky workaround for regression in ScalarUDFImpl.return_type * fix repo url * pin to revision * bump to latest rev * bump to latest DF rev * bump DF to rev 9f530dd * add Cargo.lock * bump DF version * no default features * Revert "remove miri checks" This reverts commit 4638fe3. 
* Update pin to DataFusion e99e02b9b9093ceb0c13a2dd32a2a89beba47930 * update pin * Update Cargo.toml Bump to 44.0.0-rc2 * update cargo lock * revert miri change --------- Co-authored-by: Andrew Lamb <[email protected]> * feat: add support for array_contains expression (#1163) * feat: add support for array_contains expression * test: add unit test for array_contains function * Removes unnecessary case expression for handling null values * chore: Move more expressions from core crate to spark-expr crate (#1152) * move aggregate expressions to spark-expr crate * move more expressions * move benchmark * normalize_nan * bitwise not * comet scalar funcs * update bench imports * remove dead code (#1155) * fix: Spark 4.0-preview1 SPARK-47120 (#1156) ## Which issue does this PR close? Part of #372 and #551 ## Rationale for this change To be ready for Spark 4.0 ## What changes are included in this PR? This PR fixes the new test SPARK-47120 added in Spark 4.0 ## How are these changes tested? tests enabled * chore: Move string kernels and expressions to spark-expr crate (#1164) * Move string kernels and expressions to spark-expr crate * remove unused hash kernel * remove unused dependencies * chore: Move remaining expressions to spark-expr crate + some minor refactoring (#1165) * move CheckOverflow to spark-expr crate * move NegativeExpr to spark-expr crate * move UnboundColumn to spark-expr crate * move ExpandExec from execution::datafusion::operators to execution::operators * refactoring to remove datafusion subpackage * update imports in benches * fix * fix * chore: Add ignored tests for reading complex types from Parquet (#1167) * Add ignored tests for reading structs from Parquet * add basic map test * add tests for Map and Array * feat: Add Spark-compatible implementation of SchemaAdapterFactory (#1169) * Add Spark-compatible SchemaAdapterFactory implementation * remove prototype code * fix * refactor * implement more cast logic * implement more cast logic * add basic test * improve test * cleanup * fmt * add support for casting unsigned int to signed int * clippy * address feedback * fix test * fix: Document enabling comet explain plan usage in Spark (4.0) (#1176) * test: enabling Spark tests with offHeap requirement (#1177) ## Which issue does this PR close? ## Rationale for this change After #1062 We have not running Spark tests for native execution ## What changes are included in this PR? Removed the off heap requirement for testing ## How are these changes tested? 
Bringing back Spark tests for native execution * feat: Improve shuffle metrics (second attempt) (#1175) * improve shuffle metrics * docs * more metrics * refactor * address feedback * fix: stddev_pop should not directly return 0.0 when count is 1.0 (#1184) * add test * fix * fix * fix * feat: Make native shuffle compression configurable and respect `spark.shuffle.compress` (#1185) * Make shuffle compression codec and level configurable * remove lz4 references * docs * update comment * clippy * fix benches * clippy * clippy * disable test for miri * remove lz4 reference from proto * minor: move shuffle classes from common to spark (#1193) * minor: refactor decodeBatches to make private in broadcast exchange (#1195) * minor: refactor prepare_output so that it does not require an ExecutionContext (#1194) * fix: fix missing explanation for then branch in case when (#1200) * minor: remove unused source files (#1202) * chore: Upgrade to DataFusion 44.0.0-rc2 (#1154) * move aggregate expressions to spark-expr crate * move more expressions * move benchmark * normalize_nan * bitwise not * comet scalar funcs * update bench imports * save * save * save * remove unused imports * clippy * implement more hashers * implement Hash and PartialEq * implement Hash and PartialEq * implement Hash and PartialEq * benches * fix ScalarUDFImpl.return_type failure * exclude test from miri * ignore correct test * ignore another test * remove miri checks * use return_type_from_exprs * Revert "use return_type_from_exprs" This reverts commit febc1f1. * use DF main branch * hacky workaround for regression in ScalarUDFImpl.return_type * fix repo url * pin to revision * bump to latest rev * bump to latest DF rev * bump DF to rev 9f530dd * add Cargo.lock * bump DF version * no default features * Revert "remove miri checks" This reverts commit 4638fe3. * Update pin to DataFusion e99e02b9b9093ceb0c13a2dd32a2a89beba47930 * update pin * Update Cargo.toml Bump to 44.0.0-rc2 * update cargo lock * revert miri change --------- Co-authored-by: Andrew Lamb <[email protected]> * update UT Signed-off-by: Dharan Aditya <[email protected]> * fix typo in UT Signed-off-by: Dharan Aditya <[email protected]> --------- Signed-off-by: Dharan Aditya <[email protected]> Co-authored-by: Andy Grove <[email protected]> Co-authored-by: KAZUYUKI TANIMURA <[email protected]> Co-authored-by: Parth Chandra <[email protected]> Co-authored-by: Liang-Chi Hsieh <[email protected]> Co-authored-by: Raz Luvaton <[email protected]> Co-authored-by: Andrew Lamb <[email protected]> * feat: Add a `spark.comet.exec.memoryPool` configuration for experimenting with various datafusion memory pool setups. (#1021) * feat: Reenable tests for filtered SMJ anti join (#1211) * feat: reenable filtered SMJ Anti join tests * feat: reenable filtered SMJ Anti join tests * feat: reenable filtered SMJ Anti join tests * feat: reenable filtered SMJ Anti join tests * Add CoalesceBatchesExec around SMJ with join filter * adding `CoalesceBatches` * adding `CoalesceBatches` * adding `CoalesceBatches` * feat: reenable filtered SMJ Anti join tests * feat: reenable filtered SMJ Anti join tests --------- Co-authored-by: Andy Grove <[email protected]> * chore: Add safety check to CometBuffer (#1050) * chore: Add safety check to CometBuffer * Add CometColumnarToRowExec * fix * fix * more * Update plan stability results * fix * fix * fix * Revert "fix" This reverts commit 9bad173. * Revert "Revert "fix"" This reverts commit d527ad1. 
* fix BucketedReadWithoutHiveSupportSuite * fix SparkPlanSuite * remove unreachable code (#1213) * test: Enable Comet by default except some tests in SparkSessionExtensionSuite (#1201) ## Which issue does this PR close? Part of #1197 ## Rationale for this change Since `loadCometExtension` in the diffs were not using `isCometEnabled`, `SparkSessionExtensionSuite` was not using Comet. Once enabled, some test failures discovered ## What changes are included in this PR? `loadCometExtension` now uses `isCometEnabled` that enables Comet by default Temporary ignore the failing tests in SparkSessionExtensionSuite ## How are these changes tested? existing tests * extract struct expressions to folders based on spark grouping (#1216) * chore: extract static invoke expressions to folders based on spark grouping (#1217) * extract static invoke expressions to folders based on spark grouping * Update native/spark-expr/src/static_invoke/mod.rs Co-authored-by: Andy Grove <[email protected]> --------- Co-authored-by: Andy Grove <[email protected]> * chore: Follow-on PR to fully enable onheap memory usage (#1210) * Make datafusion's native memory pool configurable * save * fix * Update memory calculation and add draft documentation * ready for review * ready for review * address feedback * Update docs/source/user-guide/tuning.md Co-authored-by: Liang-Chi Hsieh <[email protected]> * Update docs/source/user-guide/tuning.md Co-authored-by: Kristin Cowalcijk <[email protected]> * Update docs/source/user-guide/tuning.md Co-authored-by: Liang-Chi Hsieh <[email protected]> * Update docs/source/user-guide/tuning.md Co-authored-by: Liang-Chi Hsieh <[email protected]> * remove unused config --------- Co-authored-by: Kristin Cowalcijk <[email protected]> Co-authored-by: Liang-Chi Hsieh <[email protected]> * feat: Move shuffle block decompression and decoding to native code and add LZ4 & Snappy support (#1192) * Implement native decoding and decompression * revert some variable renaming for smaller diff * fix oom issues? 
* make NativeBatchDecoderIterator more consistent with ArrowReaderIterator * fix oom and prep for review * format * Add LZ4 support * clippy, new benchmark * rename metrics, clean up lz4 code * update test * Add support for snappy * format * change default back to lz4 * make metrics more accurate * format * clippy * use faster unsafe version of lz4_flex * Make compression codec configurable for columnar shuffle * clippy * fix bench * fmt * address feedback * address feedback * address feedback * minor code simplification * cargo fmt * overflow check * rename compression level config * address feedback * address feedback * rename constant * chore: extract agg_funcs expressions to folders based on spark grouping (#1224) * extract agg_funcs expressions to folders based on spark grouping * fix rebase * extract datetime_funcs expressions to folders based on spark grouping (#1222) Co-authored-by: Andy Grove <[email protected]> * chore: use datafusion from crates.io (#1232) * chore: extract strings file to `strings_func` like in spark grouping (#1215) * chore: extract predicate_functions expressions to folders based on spark grouping (#1218) * extract predicate_functions expressions to folders based on spark grouping * code review changes --------- Co-authored-by: Andy Grove <[email protected]> * build(deps): bump protobuf version to 3.21.12 (#1234) * extract json_funcs expressions to folders based on spark grouping (#1220) Co-authored-by: Andy Grove <[email protected]> * test: Enable shuffle by default in Spark tests (#1240) ## Which issue does this PR close? ## Rationale for this change Because `isCometShuffleEnabled` is false by default, some tests were not reached ## What changes are included in this PR? Removed `isCometShuffleEnabled` and updated spark test diff ## How are these changes tested? 
existing test * chore: extract hash_funcs expressions to folders based on spark grouping (#1221) * extract hash_funcs expressions to folders based on spark grouping * extract hash_funcs expressions to folders based on spark grouping --------- Co-authored-by: Andy Grove <[email protected]> * fix: Fall back to Spark for unsupported partition or sort expressions in window aggregates (#1253) * perf: Improve query planning to more reliably fall back to columnar shuffle when native shuffle is not supported (#1209) * fix regression (#1259) * feat: add support for array_remove expression (#1179) * wip: array remove * added comet expression test * updated test cases * fixed array_remove function for null values * removed commented code * remove unnecessary code * updated the test for 'array_remove' * added test for array_remove in case the input array is null * wip: case array is empty * removed test case for empty array * fix: Fall back to Spark for distinct aggregates (#1262) * fall back to Spark for distinct aggregates * update expected plans for 3.4 * update expected plans for 3.5 * force build * add comment * feat: Implement custom RecordBatch serde for shuffle for improved performance (#1190) * Implement faster encoder for shuffle blocks * make code more concise * enable fast encoding for columnar shuffle * update benches * test all int types * test float * remaining types * add Snappy and Zstd(6) back to benchmark * fix regression * Update native/core/src/execution/shuffle/codec.rs Co-authored-by: Liang-Chi Hsieh <[email protected]> * address feedback * support nullable flag --------- Co-authored-by: Liang-Chi Hsieh <[email protected]> * docs: Update TPC-H benchmark results (#1257) * fix: disable initCap by default (#1276) * fix: disable initCap by default * Update spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala Co-authored-by: Andy Grove <[email protected]> * address review comments --------- Co-authored-by: Andy Grove <[email protected]> * chore: Add changelog for 0.5.0 (#1278) * Add changelog * revert accidental change * move 2 items to performance section * update TPC-DS results for 0.5.0 (#1277) * fix: cast timestamp to decimal is unsupported (#1281) * fix: cast timestamp to decimal is unsupported * fix style * revert test name and mark as ignore * add comment * Fix build after merge * Fix tests after merge * Fix plans after merge * fix partition id in execute plan after merge (from Andy Grove) --------- Signed-off-by: Dharan Aditya <[email protected]> Co-authored-by: NoeB <[email protected]> Co-authored-by: Liang-Chi Hsieh <[email protected]> Co-authored-by: Raz Luvaton <[email protected]> Co-authored-by: Andy Grove <[email protected]> Co-authored-by: KAZUYUKI TANIMURA <[email protected]> Co-authored-by: Sem <[email protected]> Co-authored-by: Himadri Pal <[email protected]> Co-authored-by: himadripal <[email protected]> Co-authored-by: gstvg <[email protected]> Co-authored-by: Adam Binford <[email protected]> Co-authored-by: Matt Butrovich <[email protected]> Co-authored-by: Raz Luvaton <[email protected]> Co-authored-by: Andrew Lamb <[email protected]> Co-authored-by: Dharan Aditya <[email protected]> Co-authored-by: Kristin Cowalcijk <[email protected]> Co-authored-by: Oleks V <[email protected]> Co-authored-by: Zhen Wang <[email protected]> Co-authored-by: Jagdish Parihar <[email protected]>
1 parent 6cafe28 commit 32b9338

937 files changed (+43214, -32931 lines)


README.md (+6, -13)

@@ -46,30 +46,23 @@ The following chart shows the time it takes to run the 22 TPC-H queries against
 using a single executor with 8 cores. See the [Comet Benchmarking Guide](https://datafusion.apache.org/comet/contributor-guide/benchmarking.html)
 for details of the environment used for these benchmarks.

-When using Comet, the overall run time is reduced from 615 seconds to 364 seconds, a 1.7x speedup, with query 1
-running 9x faster than Spark.
+When using Comet, the overall run time is reduced from 640 seconds to 331 seconds, very close to a 2x speedup.

-Running the same queries with DataFusion standalone (without Spark) using the same number of cores results in a 3.6x
-speedup compared to Spark.
+![](docs/source/_static/images/benchmark-results/0.5.0/tpch_allqueries.png)

-Comet is not yet achieving full DataFusion speeds in all cases, but with future work we aim to provide a 2x-4x speedup
-for a broader set of queries.
+Here is a breakdown showing relative performance of Spark and Comet for each TPC-H query.

-![](docs/source/_static/images/benchmark-results/0.4.0/tpch_allqueries.png)
-
-Here is a breakdown showing relative performance of Spark, Comet, and DataFusion for each TPC-H query.
-
-![](docs/source/_static/images/benchmark-results/0.4.0/tpch_queries_compare.png)
+![](docs/source/_static/images/benchmark-results/0.5.0/tpch_queries_compare.png)

 The following charts shows how much Comet currently accelerates each query from the benchmark.

 ### Relative speedup

-![](docs/source/_static/images/benchmark-results/0.4.0/tpch_queries_speedup_rel.png)
+![](docs/source/_static/images/benchmark-results/0.5.0/tpch_queries_speedup_rel.png)

 ### Absolute speedup

-![](docs/source/_static/images/benchmark-results/0.4.0/tpch_queries_speedup_abs.png)
+![](docs/source/_static/images/benchmark-results/0.5.0/tpch_queries_speedup_abs.png)

 These benchmarks can be reproduced in any environment using the documentation in the
 [Comet Benchmarking Guide](https://datafusion.apache.org/comet/contributor-guide/benchmarking.html). We encourage
common/src/main/java/org/apache/comet/parquet/ColumnReader.java (-22)

@@ -172,28 +172,6 @@ public void close() {

   /** Returns a decoded {@link CometDecodedVector Comet vector}. */
   public CometDecodedVector loadVector() {
-    // Only re-use Comet vector iff:
-    // 1. if we're not using dictionary encoding, since with dictionary encoding, the native
-    //    side may fallback to plain encoding and the underlying memory address for the vector
-    //    will change as result.
-    // 2. if the column type is of fixed width, in other words, string/binary are not supported
-    //    since the native side may resize the vector and therefore change memory address.
-    // 3. if the last loaded vector contains null values: if values of last vector are all not
-    //    null, Arrow C data API will skip loading the native validity buffer, therefore we
-    //    should not re-use the vector in that case.
-    // 4. if the last loaded vector doesn't contain any null value, but the current vector also
-    //    are all not null, which means we can also re-use the loaded vector.
-    // 5. if the new number of value is the same or smaller
-    if ((hadNull || currentNumNulls == 0)
-        && currentVector != null
-        && dictionary == null
-        && currentVector.isFixedLength()
-        && currentVector.numValues() >= currentNumValues) {
-      currentVector.setNumNulls(currentNumNulls);
-      currentVector.setNumValues(currentNumValues);
-      return currentVector;
-    }
-
     LOG.debug("Reloading vector");

     // Close the previous vector first to release struct memory allocated to import Arrow array &
common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java (+2, -2)

@@ -53,13 +53,13 @@ public ConstantColumnReader(

   public ConstantColumnReader(
       DataType type, ColumnDescriptor descriptor, Object value, boolean useDecimal128) {
-    super(type, descriptor, useDecimal128);
+    super(type, descriptor, useDecimal128, true);
     this.value = value;
   }

   ConstantColumnReader(
       DataType type, ColumnDescriptor descriptor, int batchSize, boolean useDecimal128) {
-    super(type, descriptor, useDecimal128);
+    super(type, descriptor, useDecimal128, true);
     this.batchSize = batchSize;
     initNative();
   }
common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java (+7, -2)

@@ -40,9 +40,14 @@ public class MetadataColumnReader extends AbstractColumnReader {
   private ArrowArray array = null;
   private ArrowSchema schema = null;

-  public MetadataColumnReader(DataType type, ColumnDescriptor descriptor, boolean useDecimal128) {
+  private boolean isConstant;
+
+  public MetadataColumnReader(
+      DataType type, ColumnDescriptor descriptor, boolean useDecimal128, boolean isConstant) {
     // TODO: should we handle legacy dates & timestamps for metadata columns?
     super(type, descriptor, useDecimal128, false);
+
+    this.isConstant = isConstant;
   }

   @Override
@@ -62,7 +67,7 @@ public void readBatch(int total) {

       Native.currentBatch(nativeHandle, arrayAddr, schemaAddr);
       FieldVector fieldVector = Data.importVector(allocator, array, schema, null);
-      vector = new CometPlainVector(fieldVector, useDecimal128);
+      vector = new CometPlainVector(fieldVector, useDecimal128, false, isConstant);
     }

     vector.setNumValues(total);
common/src/main/java/org/apache/comet/parquet/RowIndexColumnReader.java (+1, -1)

@@ -33,7 +33,7 @@ public class RowIndexColumnReader extends MetadataColumnReader {
   private long offset;

   public RowIndexColumnReader(StructField field, int batchSize, long[] indices) {
-    super(field.dataType(), TypeUtil.convertToParquet(field), false);
+    super(field.dataType(), TypeUtil.convertToParquet(field), false, false);
     this.indices = indices;
     setBatchSize(batchSize);
   }
common/src/main/java/org/apache/comet/vector/CometPlainVector.java (+16)

@@ -38,11 +38,18 @@ public class CometPlainVector extends CometDecodedVector {
   private byte booleanByteCache;
   private int booleanByteCacheIndex = -1;

+  private boolean isReused;
+
   public CometPlainVector(ValueVector vector, boolean useDecimal128) {
     this(vector, useDecimal128, false);
   }

   public CometPlainVector(ValueVector vector, boolean useDecimal128, boolean isUuid) {
+    this(vector, useDecimal128, isUuid, false);
+  }
+
+  public CometPlainVector(
+      ValueVector vector, boolean useDecimal128, boolean isUuid, boolean isReused) {
     super(vector, vector.getField(), useDecimal128, isUuid);
     // NullType doesn't have data buffer.
     if (vector instanceof NullVector) {
@@ -52,6 +59,15 @@ public CometPlainVector(ValueVector vector, boolean useDecimal128, boolean isUui
     }

     isBaseFixedWidthVector = valueVector instanceof BaseFixedWidthVector;
+    this.isReused = isReused;
+  }
+
+  public boolean isReused() {
+    return isReused;
+  }
+
+  public void setReused(boolean isReused) {
+    this.isReused = isReused;
   }

   @Override
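For clarity, here is a minimal Scala sketch of the constructor and accessors added above. This is hypothetical usage, not code from this commit: fieldVector stands in for an Arrow FieldVector imported elsewhere, and the flag values are illustrative only.

// Minimal sketch (assumes `fieldVector: org.apache.arrow.vector.FieldVector` already exists)
val vector = new org.apache.comet.vector.CometPlainVector(
  fieldVector, /* useDecimal128 = */ true, /* isUuid = */ false, /* isReused = */ false)

// A reader that re-imports into the same underlying buffers can mark the vector as reused,
// so downstream code knows the memory was not freshly allocated for this batch.
vector.setReused(true)
assert(vector.isReused)

The same flag threads through the readers changed above: ConstantColumnReader and RowIndexColumnReader now tell MetadataColumnReader whether the column is constant, and MetadataColumnReader forwards that to the CometPlainVector it builds in readBatch.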
common/src/main/scala/org/apache/comet/CometConf.scala (+41, -15)

@@ -210,6 +210,8 @@ object CometConf extends ShimCometConf {
     createExecEnabledConfig("window", defaultValue = true)
   val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] =
     createExecEnabledConfig("takeOrderedAndProject", defaultValue = true)
+  val COMET_EXEC_INITCAP_ENABLED: ConfigEntry[Boolean] =
+    createExecEnabledConfig("initCap", defaultValue = false)

   val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled")
@@ -275,6 +277,13 @@ object CometConf extends ShimCometConf {
       .checkValues(Set("native", "jvm", "auto"))
       .createWithDefault("auto")

+  val COMET_SHUFFLE_FALLBACK_TO_COLUMNAR: ConfigEntry[Boolean] =
+    conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.fallbackToColumnar")
+      .doc("Whether to try falling back to columnar shuffle when native shuffle is not supported")
+      .internal()
+      .booleanConf
+      .createWithDefault(false)
+
   val COMET_EXEC_BROADCAST_FORCE_ENABLED: ConfigEntry[Boolean] =
     conf(s"$COMET_EXEC_CONFIG_PREFIX.broadcast.enabled")
       .doc(
@@ -293,12 +302,29 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(false)

-  val COMET_EXEC_SHUFFLE_CODEC: ConfigEntry[String] = conf(
-    s"$COMET_EXEC_CONFIG_PREFIX.shuffle.codec")
-    .doc(
-      "The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported.")
-    .stringConf
-    .createWithDefault("zstd")
+  val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] =
+    conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
+      .doc(
+        "The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and " +
+          "snappy are supported. Compression can be disabled by setting " +
+          "spark.shuffle.compress=false.")
+      .stringConf
+      .checkValues(Set("zstd", "lz4", "snappy"))
+      .createWithDefault("lz4")
+
+  val COMET_EXEC_SHUFFLE_COMPRESSION_ZSTD_LEVEL: ConfigEntry[Int] =
+    conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.zstd.level")
+      .doc("The compression level to use when compressing shuffle files with zstd.")
+      .intConf
+      .createWithDefault(1)
+
+  val COMET_SHUFFLE_ENABLE_FAST_ENCODING: ConfigEntry[Boolean] =
+    conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.enableFastEncoding")
+      .doc("Whether to enable Comet's faster proprietary encoding for shuffle blocks " +
+        "rather than using Arrow IPC.")
+      .internal()
+      .booleanConf
+      .createWithDefault(true)

   val COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.columnar.shuffle.async.enabled")
@@ -465,21 +491,21 @@ object CometConf extends ShimCometConf {
       .intConf
       .createWithDefault(8192)

-  val COMET_EXEC_MEMORY_FRACTION: ConfigEntry[Double] = conf("spark.comet.exec.memoryFraction")
-    .doc(
-      "The fraction of memory from Comet memory overhead that the native memory " +
-        "manager can use for execution. The purpose of this config is to set aside memory for " +
-        "untracked data structures, as well as imprecise size estimation during memory " +
-        "acquisition.")
-    .doubleConf
-    .createWithDefault(0.7)
-
   val COMET_PARQUET_ENABLE_DIRECT_BUFFER: ConfigEntry[Boolean] =
     conf("spark.comet.parquet.enable.directBuffer")
      .doc("Whether to use Java direct byte buffer when reading Parquet.")
      .booleanConf
      .createWithDefault(false)

+  val COMET_EXEC_MEMORY_POOL_TYPE: ConfigEntry[String] = conf("spark.comet.exec.memoryPool")
+    .doc(
+      "The type of memory pool to be used for Comet native execution. " +
+        "Available memory pool types are 'greedy', 'fair_spill', 'greedy_task_shared', " +
+        "'fair_spill_task_shared', 'greedy_global' and 'fair_spill_global', By default, " +
+        "this config is 'greedy_task_shared'.")
+    .stringConf
+    .createWithDefault("greedy_task_shared")
+
   val COMET_SCAN_PREFETCH_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.scan.preFetch.enabled")
       .doc("Whether to enable pre-fetching feature of CometScan.")
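For reference, the new entries are ordinary Spark configuration keys. Below is a minimal Scala sketch of setting them when building a session; the values are illustrative only, and it assumes a Comet-enabled Spark build with COMET_EXEC_CONFIG_PREFIX resolving to spark.comet.exec.

import org.apache.spark.sql.SparkSession

// Illustrative settings for the options added above; the shipped defaults are
// lz4 compression, zstd level 1, fast encoding enabled, and the greedy_task_shared pool.
val spark = SparkSession
  .builder()
  .appName("comet-shuffle-config-sketch")
  .config("spark.comet.exec.shuffle.compression.codec", "zstd")
  .config("spark.comet.exec.shuffle.compression.zstd.level", "3")
  .config("spark.comet.exec.memoryPool", "greedy_task_shared")
  .config("spark.shuffle.compress", "true") // set to false to disable shuffle compression entirely
  .getOrCreate()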
common/src/main/scala/org/apache/comet/vector/NativeUtil.scala (-2)

@@ -163,8 +163,6 @@ class NativeUtil {
       case numRows =>
         val cometVectors = importVector(arrays, schemas)
         Some(new ColumnarBatch(cometVectors.toArray, numRows.toInt))
-      case flag =>
-        throw new IllegalStateException(s"Invalid native flag: $flag")
     }
common/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/IpcInputStreamIterator.scala (-127)

This file was deleted.
