Skip to content

Commit 08c9854

Browse files
authored
Enable ComponentSuppliers to run queries using Dart (#17787)
Enables Calcite*Test-s and quidem tests to run queries with Dart. needed some minor tweaks: changed to use interfaces at some places renamed DartWorkerClient to DartWorkerClientImpl and made DartWorkerClient an interface reused existing parts of the MSQ test system to run the query
1 parent daaa726 commit 08c9854

21 files changed

+936
-264
lines changed

extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
import com.fasterxml.jackson.databind.ObjectMapper;
2323
import com.google.inject.Injector;
24-
import org.apache.druid.client.BrokerServerView;
24+
import org.apache.druid.client.TimelineServerView;
2525
import org.apache.druid.error.DruidException;
2626
import org.apache.druid.indexing.common.TaskLockType;
2727
import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -84,7 +84,7 @@ public class DartControllerContext implements ControllerContext
8484
private final ObjectMapper jsonMapper;
8585
private final DruidNode selfNode;
8686
private final DartWorkerClient workerClient;
87-
private final BrokerServerView serverView;
87+
private final TimelineServerView serverView;
8888
private final MemoryIntrospector memoryIntrospector;
8989
private final ServiceMetricEvent.Builder metricBuilder;
9090
private final ServiceEmitter emitter;
@@ -95,7 +95,7 @@ public DartControllerContext(
9595
final DruidNode selfNode,
9696
final DartWorkerClient workerClient,
9797
final MemoryIntrospector memoryIntrospector,
98-
final BrokerServerView serverView,
98+
final TimelineServerView serverView,
9999
final ServiceEmitter emitter
100100
)
101101
{

extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java

+12-12
Original file line numberDiff line numberDiff line change
@@ -22,28 +22,28 @@
2222
import com.fasterxml.jackson.databind.ObjectMapper;
2323
import com.google.inject.Inject;
2424
import com.google.inject.Injector;
25-
import org.apache.druid.client.BrokerServerView;
25+
import org.apache.druid.client.TimelineServerView;
2626
import org.apache.druid.guice.annotations.EscalatedGlobal;
2727
import org.apache.druid.guice.annotations.Json;
2828
import org.apache.druid.guice.annotations.Self;
2929
import org.apache.druid.guice.annotations.Smile;
3030
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
31-
import org.apache.druid.msq.dart.worker.DartWorkerClient;
31+
import org.apache.druid.msq.dart.worker.DartWorkerClientImpl;
3232
import org.apache.druid.msq.exec.ControllerContext;
3333
import org.apache.druid.msq.exec.MemoryIntrospector;
3434
import org.apache.druid.rpc.ServiceClientFactory;
3535
import org.apache.druid.server.DruidNode;
3636

3737
public class DartControllerContextFactoryImpl implements DartControllerContextFactory
3838
{
39-
private final Injector injector;
40-
private final ObjectMapper jsonMapper;
41-
private final ObjectMapper smileMapper;
42-
private final DruidNode selfNode;
43-
private final ServiceClientFactory serviceClientFactory;
44-
private final BrokerServerView serverView;
45-
private final MemoryIntrospector memoryIntrospector;
46-
private final ServiceEmitter emitter;
39+
protected final Injector injector;
40+
protected final ObjectMapper jsonMapper;
41+
protected final ObjectMapper smileMapper;
42+
protected final DruidNode selfNode;
43+
protected final ServiceClientFactory serviceClientFactory;
44+
protected final TimelineServerView serverView;
45+
protected final MemoryIntrospector memoryIntrospector;
46+
protected final ServiceEmitter emitter;
4747

4848
@Inject
4949
public DartControllerContextFactoryImpl(
@@ -53,7 +53,7 @@ public DartControllerContextFactoryImpl(
5353
@Self final DruidNode selfNode,
5454
@EscalatedGlobal final ServiceClientFactory serviceClientFactory,
5555
final MemoryIntrospector memoryIntrospector,
56-
final BrokerServerView serverView,
56+
final TimelineServerView serverView,
5757
final ServiceEmitter emitter
5858
)
5959
{
@@ -74,7 +74,7 @@ public ControllerContext newContext(final String queryId)
7474
injector,
7575
jsonMapper,
7676
selfNode,
77-
new DartWorkerClient(queryId, serviceClientFactory, smileMapper, selfNode.getHostAndPortToUse()),
77+
new DartWorkerClientImpl(queryId, serviceClientFactory, smileMapper, selfNode.getHostAndPortToUse()),
7878
memoryIntrospector,
7979
serverView,
8080
emitter

extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerClient.java

+3-174
Original file line numberDiff line numberDiff line change
@@ -19,192 +19,21 @@
1919

2020
package org.apache.druid.msq.dart.worker;
2121

22-
import com.fasterxml.jackson.databind.ObjectMapper;
23-
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
2422
import com.google.common.util.concurrent.ListenableFuture;
25-
import com.google.errorprone.annotations.concurrent.GuardedBy;
26-
import it.unimi.dsi.fastutil.Pair;
27-
import org.apache.druid.error.DruidException;
28-
import org.apache.druid.java.util.common.logger.Logger;
29-
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
3023
import org.apache.druid.msq.dart.controller.DartWorkerManager;
31-
import org.apache.druid.msq.dart.controller.sql.DartSqlEngine;
32-
import org.apache.druid.msq.dart.worker.http.DartWorkerResource;
3324
import org.apache.druid.msq.exec.WorkerClient;
34-
import org.apache.druid.msq.rpc.BaseWorkerClientImpl;
35-
import org.apache.druid.rpc.FixedServiceLocator;
36-
import org.apache.druid.rpc.IgnoreHttpResponseHandler;
37-
import org.apache.druid.rpc.RequestBuilder;
38-
import org.apache.druid.rpc.ServiceClient;
39-
import org.apache.druid.rpc.ServiceClientFactory;
40-
import org.apache.druid.rpc.ServiceLocation;
41-
import org.apache.druid.rpc.ServiceRetryPolicy;
42-
import org.apache.druid.utils.CloseableUtils;
43-
import org.jboss.netty.handler.codec.http.HttpMethod;
4425

45-
import javax.annotation.Nullable;
46-
import java.io.Closeable;
47-
import java.net.URI;
48-
import java.util.HashMap;
49-
import java.util.Map;
50-
51-
/**
52-
* Dart implementation of {@link WorkerClient}. Uses the same {@link BaseWorkerClientImpl} as the task-based engine.
53-
* Each instance of this class is scoped to a single query.
54-
*/
55-
public class DartWorkerClient extends BaseWorkerClientImpl
26+
public interface DartWorkerClient extends WorkerClient
5627
{
57-
private static final Logger log = new Logger(DartWorkerClient.class);
58-
59-
private final String queryId;
60-
private final ServiceClientFactory clientFactory;
61-
private final ServiceRetryPolicy retryPolicy;
62-
63-
@Nullable
64-
private final String controllerHost;
65-
66-
@GuardedBy("clientMap")
67-
private final Map<String, Pair<ServiceClient, Closeable>> clientMap = new HashMap<>();
68-
69-
/**
70-
* Create a worker client.
71-
*
72-
* @param queryId dart query ID. see {@link DartSqlEngine#CTX_DART_QUERY_ID}
73-
* @param clientFactory service client factor
74-
* @param smileMapper Smile object mapper
75-
* @param controllerHost Controller host (see {@link DartWorkerResource#HEADER_CONTROLLER_HOST}) if this is a
76-
* controller-to-worker client. Null if this is a worker-to-worker client.
77-
*/
78-
public DartWorkerClient(
79-
final String queryId,
80-
final ServiceClientFactory clientFactory,
81-
final ObjectMapper smileMapper,
82-
@Nullable final String controllerHost
83-
)
84-
{
85-
super(smileMapper, SmileMediaTypes.APPLICATION_JACKSON_SMILE);
86-
this.queryId = queryId;
87-
this.clientFactory = clientFactory;
88-
this.controllerHost = controllerHost;
89-
90-
if (controllerHost == null) {
91-
// worker -> worker client. Retry HTTP 503 in case worker A starts up before worker B, and needs to
92-
// contact it immediately.
93-
this.retryPolicy = new DartWorkerRetryPolicy(true);
94-
} else {
95-
// controller -> worker client. Do not retry any HTTP error codes. If we retry HTTP 503 for controller -> worker,
96-
// we can get stuck trying to contact workers that have exited.
97-
this.retryPolicy = new DartWorkerRetryPolicy(false);
98-
}
99-
}
100-
101-
@Override
102-
protected ServiceClient getClient(final String workerIdString)
103-
{
104-
final WorkerId workerId = WorkerId.fromString(workerIdString);
105-
if (!queryId.equals(workerId.getQueryId())) {
106-
throw DruidException.defensive("Unexpected queryId[%s]. Expected queryId[%s]", workerId.getQueryId(), queryId);
107-
}
108-
109-
synchronized (clientMap) {
110-
return clientMap.computeIfAbsent(workerId.getHostAndPort(), ignored -> makeNewClient(workerId)).left();
111-
}
112-
}
113-
11428
/**
11529
* Close a single worker's clients. Used when that worker fails, so we stop trying to contact it.
11630
*
11731
* @param workerHost worker host:port
11832
*/
119-
public void closeClient(final String workerHost)
120-
{
121-
synchronized (clientMap) {
122-
final Pair<ServiceClient, Closeable> clientPair = clientMap.remove(workerHost);
123-
if (clientPair != null) {
124-
CloseableUtils.closeAndWrapExceptions(clientPair.right());
125-
}
126-
}
127-
}
128-
129-
/**
130-
* Close all outstanding clients.
131-
*/
132-
@Override
133-
public void close()
134-
{
135-
synchronized (clientMap) {
136-
for (Map.Entry<String, Pair<ServiceClient, Closeable>> entry : clientMap.entrySet()) {
137-
CloseableUtils.closeAndSuppressExceptions(
138-
entry.getValue().right(),
139-
e -> log.warn(e, "Failed to close client[%s]", entry.getKey())
140-
);
141-
}
142-
143-
clientMap.clear();
144-
}
145-
}
33+
void closeClient(String hostAndPort);
14634

14735
/**
14836
* Stops a worker. Dart-only API, used by the {@link DartWorkerManager}.
14937
*/
150-
public ListenableFuture<?> stopWorker(String workerId)
151-
{
152-
return getClient(workerId).asyncRequest(
153-
new RequestBuilder(HttpMethod.POST, "/stop"),
154-
IgnoreHttpResponseHandler.INSTANCE
155-
);
156-
}
157-
158-
/**
159-
* Create a new client. Called by {@link #getClient(String)} if a new one is needed.
160-
*/
161-
private Pair<ServiceClient, Closeable> makeNewClient(final WorkerId workerId)
162-
{
163-
final URI uri = workerId.toUri();
164-
final FixedServiceLocator locator = new FixedServiceLocator(ServiceLocation.fromUri(uri));
165-
final ServiceClient baseClient =
166-
clientFactory.makeClient(workerId.toString(), locator, retryPolicy);
167-
final ServiceClient client;
168-
169-
if (controllerHost != null) {
170-
client = new ControllerDecoratedClient(baseClient, controllerHost);
171-
} else {
172-
client = baseClient;
173-
}
174-
175-
return Pair.of(client, locator);
176-
}
177-
178-
/**
179-
* Service client that adds the {@link DartWorkerResource#HEADER_CONTROLLER_HOST} header.
180-
*/
181-
private static class ControllerDecoratedClient implements ServiceClient
182-
{
183-
private final ServiceClient delegate;
184-
private final String controllerHost;
185-
186-
ControllerDecoratedClient(final ServiceClient delegate, final String controllerHost)
187-
{
188-
this.delegate = delegate;
189-
this.controllerHost = controllerHost;
190-
}
191-
192-
@Override
193-
public <IntermediateType, FinalType> ListenableFuture<FinalType> asyncRequest(
194-
final RequestBuilder requestBuilder,
195-
final HttpResponseHandler<IntermediateType, FinalType> handler
196-
)
197-
{
198-
return delegate.asyncRequest(
199-
requestBuilder.header(DartWorkerResource.HEADER_CONTROLLER_HOST, controllerHost),
200-
handler
201-
);
202-
}
203-
204-
@Override
205-
public ServiceClient withRetryPolicy(final ServiceRetryPolicy retryPolicy)
206-
{
207-
return new ControllerDecoratedClient(delegate.withRetryPolicy(retryPolicy), controllerHost);
208-
}
209-
}
38+
ListenableFuture<?> stopWorker(String workerId);
21039
}

0 commit comments

Comments
 (0)