Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP #124824

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft

WIP #124824

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import java.io.IOException;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;

/**
Expand All @@ -32,17 +31,14 @@
public class ExchangeSinkOperator extends SinkOperator {

private final ExchangeSink sink;
private final Function<Page, Page> transformer;
private int pagesReceived;
private long rowsReceived;

public record ExchangeSinkOperatorFactory(Supplier<ExchangeSink> exchangeSinks, Function<Page, Page> transformer)
implements
SinkOperatorFactory {
public record ExchangeSinkOperatorFactory(Supplier<ExchangeSink> exchangeSinks) implements SinkOperatorFactory {

@Override
public SinkOperator get(DriverContext driverContext) {
return new ExchangeSinkOperator(exchangeSinks.get(), transformer);
return new ExchangeSinkOperator(exchangeSinks.get());
}

@Override
Expand All @@ -51,9 +47,8 @@ public String describe() {
}
}

public ExchangeSinkOperator(ExchangeSink sink, Function<Page, Page> transformer) {
public ExchangeSinkOperator(ExchangeSink sink) {
this.sink = sink;
this.transformer = transformer;
}

@Override
Expand Down Expand Up @@ -84,7 +79,7 @@ public boolean needsInput() {
protected void doAddInput(Page page) {
pagesReceived++;
rowsReceived += page.getPositionCount();
sink.addPage(transformer.apply(page));
sink.addPage(page);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.LongSupplier;

import static org.hamcrest.Matchers.either;
Expand Down Expand Up @@ -328,7 +327,7 @@ public void testEarlyTermination() {
final int maxAllowedRows = between(1, 100);
final AtomicInteger processedRows = new AtomicInteger(0);
var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), positions, System::currentTimeMillis);
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity());
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}));
final var delayOperator = new EvalOperator(driverContext.blockFactory(), new EvalOperator.ExpressionEvaluator() {
@Override
public Block eval(Page page) {
Expand Down Expand Up @@ -365,7 +364,7 @@ public void testResumeOnEarlyFinish() throws Exception {
var sourceHandler = new ExchangeSourceHandler(between(1, 5), threadPool.executor("esql"));
var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), between(1, 5), System::currentTimeMillis);
var sourceOperator = new ExchangeSourceOperator(sourceHandler.createExchangeSource());
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity());
var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}));
Driver driver = TestDriverFactory.create(driverContext, sourceOperator, List.of(), sinkOperator);
PlainActionFuture<Void> future = new PlainActionFuture<>();
Driver.start(threadPool.getThreadContext(), threadPool.executor("esql"), driver, between(1, 1000), future);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand Down Expand Up @@ -242,7 +241,7 @@ List<Driver> createDriversForInput(List<Page> input, List<Page> results, boolean
simpleWithMode(AggregatorMode.INTERMEDIATE).get(driver1Context),
intermediateOperatorItr.next()
),
new ExchangeSinkOperator(sinkExchanger.createExchangeSink(() -> {}), Function.identity())
new ExchangeSinkOperator(sinkExchanger.createExchangeSink(() -> {}))
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -305,7 +304,7 @@ Set<Integer> runConcurrentTest(
"sink-" + i,
dc,
seqNoGenerator.get(dc),
new ExchangeSinkOperator(exchangeSink.get(), Function.identity())
new ExchangeSinkOperator(exchangeSink.get())
);
drivers.add(d);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class ExchangeSinkExec extends UnaryExec {
);

private final List<Attribute> output;
// TODO: remove this flag
private final boolean intermediateAgg;

public ExchangeSinkExec(Source source, List<Attribute> output, boolean intermediateAgg, PhysicalPlan child) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,14 +385,8 @@ private PhysicalOperation planExchange(ExchangeExec exchangeExec, LocalExecution
private PhysicalOperation planExchangeSink(ExchangeSinkExec exchangeSink, LocalExecutionPlannerContext context) {
Objects.requireNonNull(exchangeSinkSupplier, "ExchangeSinkHandler wasn't provided");
var child = exchangeSink.child();

PhysicalOperation source = plan(child, context);

Function<Page, Page> transformer = exchangeSink.isIntermediateAgg()
? Function.identity()
: alignPageToAttributes(exchangeSink.output(), source.layout);

return source.withSink(new ExchangeSinkOperatorFactory(exchangeSinkSupplier, transformer), source.layout);
return source.withSink(new ExchangeSinkOperatorFactory(exchangeSinkSupplier), source.layout);
}

private PhysicalOperation planExchangeSource(ExchangeSourceExec exchangeSource, LocalExecutionPlannerContext context) {
Expand Down