diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkOperator.java index f87edd1a3e169..9eff2161e29c9 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkOperator.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.util.Objects; -import java.util.function.Function; import java.util.function.Supplier; /** @@ -32,17 +31,14 @@ public class ExchangeSinkOperator extends SinkOperator { private final ExchangeSink sink; - private final Function transformer; private int pagesReceived; private long rowsReceived; - public record ExchangeSinkOperatorFactory(Supplier exchangeSinks, Function transformer) - implements - SinkOperatorFactory { + public record ExchangeSinkOperatorFactory(Supplier exchangeSinks) implements SinkOperatorFactory { @Override public SinkOperator get(DriverContext driverContext) { - return new ExchangeSinkOperator(exchangeSinks.get(), transformer); + return new ExchangeSinkOperator(exchangeSinks.get()); } @Override @@ -51,9 +47,8 @@ public String describe() { } } - public ExchangeSinkOperator(ExchangeSink sink, Function transformer) { + public ExchangeSinkOperator(ExchangeSink sink) { this.sink = sink; - this.transformer = transformer; } @Override @@ -84,7 +79,7 @@ public boolean needsInput() { protected void doAddInput(Page page) { pagesReceived++; rowsReceived += page.getPositionCount(); - sink.addPage(transformer.apply(page)); + sink.addPage(page); } @Override diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java index 7fe90a3fa4ee5..c538cf41ee1fd 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java @@ -45,7 +45,6 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; import java.util.function.LongSupplier; import static org.hamcrest.Matchers.either; @@ -328,7 +327,7 @@ public void testEarlyTermination() { final int maxAllowedRows = between(1, 100); final AtomicInteger processedRows = new AtomicInteger(0); var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), positions, System::currentTimeMillis); - var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity()); + var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {})); final var delayOperator = new EvalOperator(driverContext.blockFactory(), new EvalOperator.ExpressionEvaluator() { @Override public Block eval(Page page) { @@ -365,7 +364,7 @@ public void testResumeOnEarlyFinish() throws Exception { var sourceHandler = new ExchangeSourceHandler(between(1, 5), threadPool.executor("esql")); var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), between(1, 5), System::currentTimeMillis); var sourceOperator = new ExchangeSourceOperator(sourceHandler.createExchangeSource()); - var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity()); + var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {})); Driver driver = TestDriverFactory.create(driverContext, sourceOperator, List.of(), sinkOperator); PlainActionFuture future = new PlainActionFuture<>(); Driver.start(threadPool.getThreadContext(), threadPool.executor("esql"), driver, between(1, 1000), future); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java index bc3024ac4f45d..7d1471a0f3ebf 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java @@ -39,7 +39,6 @@ import java.util.Iterator; import java.util.List; import java.util.Set; -import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -242,7 +241,7 @@ List createDriversForInput(List input, List results, boolean simpleWithMode(AggregatorMode.INTERMEDIATE).get(driver1Context), intermediateOperatorItr.next() ), - new ExchangeSinkOperator(sinkExchanger.createExchangeSink(() -> {}), Function.identity()) + new ExchangeSinkOperator(sinkExchanger.createExchangeSink(() -> {})) ) ); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java index 92cf99b5b3a87..9da7ae8d4bbd5 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java @@ -65,7 +65,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -305,7 +304,7 @@ Set runConcurrentTest( "sink-" + i, dc, seqNoGenerator.get(dc), - new ExchangeSinkOperator(exchangeSink.get(), Function.identity()) + new ExchangeSinkOperator(exchangeSink.get()) ); drivers.add(d); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ExchangeSinkExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ExchangeSinkExec.java index e342f17363bc8..940471484c3e7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ExchangeSinkExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ExchangeSinkExec.java @@ -27,6 +27,7 @@ public class ExchangeSinkExec extends UnaryExec { ); private final List output; + // TODO: remove this flag private final boolean intermediateAgg; public ExchangeSinkExec(Source source, List output, boolean intermediateAgg, PhysicalPlan child) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index 3e1c8b2585012..d82417c6acfff 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -385,14 +385,8 @@ private PhysicalOperation planExchange(ExchangeExec exchangeExec, LocalExecution private PhysicalOperation planExchangeSink(ExchangeSinkExec exchangeSink, LocalExecutionPlannerContext context) { Objects.requireNonNull(exchangeSinkSupplier, "ExchangeSinkHandler wasn't provided"); var child = exchangeSink.child(); - PhysicalOperation source = plan(child, context); - - Function transformer = exchangeSink.isIntermediateAgg() - ? Function.identity() - : alignPageToAttributes(exchangeSink.output(), source.layout); - - return source.withSink(new ExchangeSinkOperatorFactory(exchangeSinkSupplier, transformer), source.layout); + return source.withSink(new ExchangeSinkOperatorFactory(exchangeSinkSupplier), source.layout); } private PhysicalOperation planExchangeSource(ExchangeSourceExec exchangeSource, LocalExecutionPlannerContext context) {