Skip to content

Commit fef5bde

Browse files
authored
Merge pull request #319 from alex268/master
Runtime loader of grpc-netty or grpc-netty-shaded libraries
2 parents 9227ada + f79ec33 commit fef5bde

File tree

17 files changed

+217
-179
lines changed

17 files changed

+217
-179
lines changed

Diff for: core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java

+12-7
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,13 @@
1515
import com.google.common.net.HostAndPort;
1616
import com.google.common.util.concurrent.MoreExecutors;
1717
import io.grpc.ManagedChannel;
18-
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
1918

2019
import tech.ydb.auth.AuthRpcProvider;
2120
import tech.ydb.auth.NopAuthProvider;
2221
import tech.ydb.core.impl.YdbSchedulerFactory;
2322
import tech.ydb.core.impl.YdbTransportImpl;
2423
import tech.ydb.core.impl.auth.GrpcAuthRpc;
25-
import tech.ydb.core.impl.pool.DefaultChannelFactory;
24+
import tech.ydb.core.impl.pool.ChannelFactoryLoader;
2625
import tech.ydb.core.impl.pool.ManagedChannelFactory;
2726
import tech.ydb.core.utils.Version;
2827

@@ -69,7 +68,7 @@ public enum InitMode {
6968

7069
private byte[] cert = null;
7170
private boolean useTLS = false;
72-
private ManagedChannelFactory.Builder channelFactoryBuilder = DefaultChannelFactory::build;
71+
private ManagedChannelFactory.Builder channelFactoryBuilder = null;
7372
private Supplier<ScheduledExecutorService> schedulerFactory = YdbSchedulerFactory::createScheduler;
7473
private String localDc;
7574
private BalancingSettings balancingSettings;
@@ -177,6 +176,10 @@ public boolean useDefaultGrpcResolver() {
177176
}
178177

179178
public ManagedChannelFactory getManagedChannelFactory() {
179+
if (channelFactoryBuilder == null) {
180+
channelFactoryBuilder = ChannelFactoryLoader.load();
181+
}
182+
180183
return channelFactoryBuilder.buildFactory(this);
181184
}
182185

@@ -193,18 +196,20 @@ public GrpcTransportBuilder withChannelFactoryBuilder(ManagedChannelFactory.Buil
193196
}
194197

195198
/**
196-
* Set a custom initialization of {@link NettyChannelBuilder} <br>
199+
* Set a custom initialization of {@link io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder} <br>
197200
* This method is deprecated. Use
198201
* {@link GrpcTransportBuilder#withChannelFactoryBuilder(tech.ydb.core.impl.pool.ManagedChannelFactory.Builder)}
199202
* instead
200203
*
201-
* @param channelInitializer custom NettyChannelBuilder initializator
204+
* @param ci custom NettyChannelBuilder initializator
202205
* @return this
203206
* @deprecated
204207
*/
205208
@Deprecated
206-
public GrpcTransportBuilder withChannelInitializer(Consumer<NettyChannelBuilder> channelInitializer) {
207-
this.channelFactoryBuilder = gtb -> DefaultChannelFactory.build(gtb, channelInitializer);
209+
public GrpcTransportBuilder withChannelInitializer(
210+
Consumer<io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder> ci
211+
) {
212+
this.channelFactoryBuilder = tech.ydb.core.impl.pool.ShadedNettyChannelFactory.withInterceptor(ci);
208213
return this;
209214
}
210215

Diff for: core/src/main/java/tech/ydb/core/impl/YdbDiscovery.java

+24-20
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import tech.ydb.core.grpc.GrpcTransport;
2222
import tech.ydb.core.impl.pool.EndpointRecord;
2323
import tech.ydb.core.operation.OperationBinder;
24-
import tech.ydb.core.utils.Async;
24+
import tech.ydb.core.utils.FutureTools;
2525
import tech.ydb.proto.discovery.DiscoveryProtos;
2626
import tech.ydb.proto.discovery.v1.DiscoveryServiceGrpc;
2727

@@ -140,26 +140,30 @@ private void tick() {
140140

141141
private void runDiscovery() {
142142
lastUpdateTime = handler.instant();
143-
final GrpcTransport transport = handler.createDiscoveryTransport();
144143
try {
145-
logger.debug("execute list endpoints on {} with timeout {}", transport, discoveryTimeout);
146-
DiscoveryProtos.ListEndpointsRequest request = DiscoveryProtos.ListEndpointsRequest.newBuilder()
147-
.setDatabase(discoveryDatabase)
148-
.build();
149-
150-
GrpcRequestSettings grpcSettings = GrpcRequestSettings.newBuilder()
151-
.withDeadline(discoveryTimeout)
152-
.build();
153-
154-
transport.unaryCall(DiscoveryServiceGrpc.getListEndpointsMethod(), grpcSettings, request)
155-
.whenComplete((res, ex) -> transport.close()) // close transport for any result
156-
.thenApply(OperationBinder.bindSync(
157-
DiscoveryProtos.ListEndpointsResponse::getOperation,
158-
DiscoveryProtos.ListEndpointsResult.class
159-
))
160-
.whenComplete(this::handleDiscoveryResult);
144+
final GrpcTransport transport = handler.createDiscoveryTransport();
145+
try {
146+
logger.debug("execute list endpoints on {} with timeout {}", transport, discoveryTimeout);
147+
DiscoveryProtos.ListEndpointsRequest request = DiscoveryProtos.ListEndpointsRequest.newBuilder()
148+
.setDatabase(discoveryDatabase)
149+
.build();
150+
151+
GrpcRequestSettings grpcSettings = GrpcRequestSettings.newBuilder()
152+
.withDeadline(discoveryTimeout)
153+
.build();
154+
155+
transport.unaryCall(DiscoveryServiceGrpc.getListEndpointsMethod(), grpcSettings, request)
156+
.whenComplete((res, ex) -> transport.close()) // close transport for any result
157+
.thenApply(OperationBinder.bindSync(
158+
DiscoveryProtos.ListEndpointsResponse::getOperation,
159+
DiscoveryProtos.ListEndpointsResult.class
160+
))
161+
.whenComplete(this::handleDiscoveryResult);
162+
} catch (Throwable th) {
163+
transport.close();
164+
throw th;
165+
}
161166
} catch (Throwable th) {
162-
transport.close();
163167
handleDiscoveryResult(null, th);
164168
}
165169
}
@@ -183,7 +187,7 @@ private void handleOk(String selfLocation, List<EndpointRecord> endpoints) {
183187

184188
private void handleDiscoveryResult(Result<DiscoveryProtos.ListEndpointsResult> response, Throwable th) {
185189
if (th != null) {
186-
Throwable cause = Async.unwrapCompletionException(th);
190+
Throwable cause = FutureTools.unwrapCompletionException(th);
187191
logger.warn("couldn't perform discovery with exception", cause);
188192
handleThrowable(cause);
189193
return;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package tech.ydb.core.impl.pool;
2+
3+
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
7+
8+
/**
9+
*
10+
* @author Aleksandr Gorshenin
11+
*/
12+
public class ChannelFactoryLoader {
13+
private static final Logger logger = LoggerFactory.getLogger(ChannelFactoryLoader.class);
14+
15+
private ChannelFactoryLoader() { }
16+
17+
public static ManagedChannelFactory.Builder load() {
18+
return FactoryLoader.factory;
19+
}
20+
21+
private static class FactoryLoader {
22+
private static final String SHADED_DEPS = "io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder";
23+
private static final String NETTY_DEPS = "io.grpc.netty.NettyChannelBuilder";
24+
25+
private static ManagedChannelFactory.Builder factory;
26+
27+
static {
28+
boolean ok = tryLoad(SHADED_DEPS, ShadedNettyChannelFactory.build())
29+
|| tryLoad(NETTY_DEPS, NettyChannelFactory.build());
30+
if (!ok) {
31+
throw new IllegalStateException("Cannot load any ManagedChannelFactory!! "
32+
+ "Classpath must contain grpc-netty or grpc-netty-shaded");
33+
}
34+
}
35+
36+
private static boolean tryLoad(String name, ManagedChannelFactory.Builder f) {
37+
try {
38+
Class.forName(name);
39+
logger.info("class {} is found, use {}", name, f);
40+
factory = f;
41+
return true;
42+
} catch (ClassNotFoundException ex) {
43+
logger.info("class {} is not found", name);
44+
return false;
45+
}
46+
}
47+
}
48+
}

Diff for: core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java

+11-6
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,17 @@ public class GrpcChannel {
2525
private final ReadyWatcher readyWatcher;
2626

2727
public GrpcChannel(EndpointRecord endpoint, ManagedChannelFactory factory) {
28-
logger.debug("Creating grpc channel with {}", endpoint);
29-
this.endpoint = endpoint;
30-
this.channel = factory.newManagedChannel(endpoint.getHost(), endpoint.getPort());
31-
this.connectTimeoutMs = factory.getConnectTimeoutMs();
32-
this.readyWatcher = new ReadyWatcher();
33-
this.readyWatcher.checkState();
28+
try {
29+
logger.debug("Creating grpc channel with {}", endpoint);
30+
this.endpoint = endpoint;
31+
this.channel = factory.newManagedChannel(endpoint.getHost(), endpoint.getPort());
32+
this.connectTimeoutMs = factory.getConnectTimeoutMs();
33+
this.readyWatcher = new ReadyWatcher();
34+
this.readyWatcher.checkState();
35+
} catch (Throwable th) {
36+
logger.error("cannot create channel", th);
37+
throw new RuntimeException("cannot create channel", th);
38+
}
3439
}
3540

3641
public EndpointRecord getEndpoint() {

Diff for: core/src/main/java/tech/ydb/core/impl/pool/NettyChannelFactory.java

+27-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.io.ByteArrayInputStream;
44
import java.util.concurrent.TimeUnit;
5+
import java.util.function.Consumer;
56

67
import javax.net.ssl.SSLException;
78

@@ -39,7 +40,7 @@ public class NettyChannelFactory implements ManagedChannelFactory {
3940
private final boolean useDefaultGrpcResolver;
4041
private final Long grpcKeepAliveTimeMillis;
4142

42-
public NettyChannelFactory(GrpcTransportBuilder builder) {
43+
private NettyChannelFactory(GrpcTransportBuilder builder) {
4344
this.database = builder.getDatabase();
4445
this.version = builder.getVersionString();
4546
this.useTLS = builder.getUseTls();
@@ -120,4 +121,29 @@ private SslContext createSslContext() {
120121
throw new RuntimeException("cannot create ssl context", e);
121122
}
122123
}
124+
125+
public static ManagedChannelFactory.Builder build() {
126+
return new Builder() {
127+
@Override
128+
public ManagedChannelFactory buildFactory(GrpcTransportBuilder builder) {
129+
return new NettyChannelFactory(builder);
130+
}
131+
132+
@Override
133+
public String toString() {
134+
return "NettyChannelFactory";
135+
}
136+
};
137+
}
138+
139+
public static ManagedChannelFactory.Builder withInterceptor(Consumer<NettyChannelBuilder> ci) {
140+
return builder -> new NettyChannelFactory(builder) {
141+
@Override
142+
protected void configure(NettyChannelBuilder channelBuilder) {
143+
if (ci != null) {
144+
ci.accept(channelBuilder);
145+
}
146+
}
147+
};
148+
}
123149
}

Diff for: core/src/main/java/tech/ydb/core/impl/pool/DefaultChannelFactory.java renamed to core/src/main/java/tech/ydb/core/impl/pool/ShadedNettyChannelFactory.java

+16-6
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
* @author Nikolay Perfilov
2828
* @author Aleksandr Gorshenin
2929
*/
30-
public class DefaultChannelFactory implements ManagedChannelFactory {
30+
public class ShadedNettyChannelFactory implements ManagedChannelFactory {
3131
static final int INBOUND_MESSAGE_SIZE = 64 << 20; // 64 MiB
3232
static final String DEFAULT_BALANCER_POLICY = "round_robin";
3333

@@ -40,7 +40,7 @@ public class DefaultChannelFactory implements ManagedChannelFactory {
4040
private final boolean useDefaultGrpcResolver;
4141
private final Long grpcKeepAliveTimeMillis;
4242

43-
private DefaultChannelFactory(GrpcTransportBuilder builder) {
43+
public ShadedNettyChannelFactory(GrpcTransportBuilder builder) {
4444
this.database = builder.getDatabase();
4545
this.version = builder.getVersionString();
4646
this.useTLS = builder.getUseTls();
@@ -122,12 +122,22 @@ private SslContext createSslContext() {
122122
}
123123
}
124124

125-
public static ManagedChannelFactory build(GrpcTransportBuilder builder) {
126-
return new DefaultChannelFactory(builder);
125+
public static ManagedChannelFactory.Builder build() {
126+
return new Builder() {
127+
@Override
128+
public ManagedChannelFactory buildFactory(GrpcTransportBuilder builder) {
129+
return new ShadedNettyChannelFactory(builder);
130+
}
131+
132+
@Override
133+
public String toString() {
134+
return "ShadedNettyChannelFactory";
135+
}
136+
};
127137
}
128138

129-
public static ManagedChannelFactory build(GrpcTransportBuilder builder, Consumer<NettyChannelBuilder> ci) {
130-
return new DefaultChannelFactory(builder) {
139+
public static ManagedChannelFactory.Builder withInterceptor(Consumer<NettyChannelBuilder> ci) {
140+
return builder -> new ShadedNettyChannelFactory(builder) {
131141
@Override
132142
protected void configure(NettyChannelBuilder channelBuilder) {
133143
if (ci != null) {

Diff for: core/src/main/java/tech/ydb/core/utils/Async.java

-81
This file was deleted.

0 commit comments

Comments
 (0)