Skip to content

Commit 6d28c9f

Browse files
committed
Break out an abstract channel factory
1 parent cae5fea commit 6d28c9f

File tree

6 files changed

+250
-208
lines changed

6 files changed

+250
-208
lines changed
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package com.eatthepath.pushy.apns;
2+
3+
import com.eatthepath.pushy.apns.proxy.ProxyHandlerFactory;
4+
import io.netty.bootstrap.Bootstrap;
5+
import io.netty.channel.*;
6+
import io.netty.channel.socket.SocketChannel;
7+
import io.netty.handler.ssl.SslContext;
8+
import io.netty.handler.ssl.SslHandler;
9+
import io.netty.resolver.AddressResolverGroup;
10+
import io.netty.resolver.NoopAddressResolverGroup;
11+
import io.netty.util.AttributeKey;
12+
import io.netty.util.ReferenceCounted;
13+
import io.netty.util.concurrent.Future;
14+
import io.netty.util.concurrent.Promise;
15+
import io.netty.util.concurrent.PromiseNotifier;
16+
17+
import javax.net.ssl.SSLEngine;
18+
import javax.net.ssl.SSLParameters;
19+
import java.io.Closeable;
20+
import java.net.InetSocketAddress;
21+
import java.net.SocketAddress;
22+
import java.time.Duration;
23+
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.atomic.AtomicBoolean;
25+
import java.util.concurrent.atomic.AtomicLong;
26+
27+
/**
28+
* An APNs channel factory creates new channels connected to an APNs server. Channels constructed by this factory are
29+
* intended for use in an {@link ApnsChannelPool}.
30+
*/
31+
abstract class AbstractApnsChannelFactory implements PooledObjectFactory<Channel>, Closeable {
32+
33+
private final SslContext sslContext;
34+
private final AtomicBoolean hasReleasedSslContext = new AtomicBoolean(false);
35+
36+
private final AddressResolverGroup<? extends SocketAddress> addressResolverGroup;
37+
38+
private final Bootstrap bootstrapTemplate;
39+
40+
private final AtomicLong currentDelaySeconds = new AtomicLong(0);
41+
42+
private static final long MIN_CONNECT_DELAY_SECONDS = 1;
43+
private static final long MAX_CONNECT_DELAY_SECONDS = 60;
44+
45+
static final AttributeKey<Promise<Channel>> CHANNEL_READY_PROMISE_ATTRIBUTE_KEY =
46+
AttributeKey.valueOf(ApnsNotificationChannelFactory.class, "channelReadyPromise");
47+
48+
AbstractApnsChannelFactory(final InetSocketAddress serverAddress,
49+
final SslContext sslContext,
50+
final ProxyHandlerFactory proxyHandlerFactory,
51+
final boolean hostnameVerificationEnabled,
52+
final Duration connectionTimeout,
53+
final ApnsClientResources clientResources) {
54+
55+
this.sslContext = sslContext;
56+
57+
if (this.sslContext instanceof ReferenceCounted) {
58+
((ReferenceCounted) this.sslContext).retain();
59+
}
60+
61+
this.addressResolverGroup = proxyHandlerFactory != null
62+
? NoopAddressResolverGroup.INSTANCE
63+
: clientResources.getRoundRobinDnsAddressResolverGroup();
64+
65+
this.bootstrapTemplate = new Bootstrap();
66+
this.bootstrapTemplate.group(clientResources.getEventLoopGroup());
67+
this.bootstrapTemplate.option(ChannelOption.TCP_NODELAY, true);
68+
this.bootstrapTemplate.remoteAddress(serverAddress);
69+
this.bootstrapTemplate.resolver(this.addressResolverGroup);
70+
71+
if (connectionTimeout != null) {
72+
this.bootstrapTemplate.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) connectionTimeout.toMillis());
73+
}
74+
75+
this.bootstrapTemplate.handler(new ChannelInitializer<SocketChannel>() {
76+
77+
@Override
78+
protected void initChannel(final SocketChannel channel) {
79+
final String authority = serverAddress.getHostName();
80+
final SslHandler sslHandler = sslContext.newHandler(channel.alloc(), authority, serverAddress.getPort());
81+
82+
if (hostnameVerificationEnabled) {
83+
final SSLEngine sslEngine = sslHandler.engine();
84+
final SSLParameters sslParameters = sslEngine.getSSLParameters();
85+
sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
86+
sslEngine.setSSLParameters(sslParameters);
87+
}
88+
89+
constructPipeline(sslHandler, channel.pipeline());
90+
}
91+
});
92+
}
93+
94+
protected abstract void constructPipeline(final SslHandler sslHandler, final ChannelPipeline pipeline);
95+
96+
/**
97+
* Creates and connects a new channel. The initial connection attempt may be delayed to accommodate exponential
98+
* back-off requirements.
99+
*
100+
* @param channelReadyPromise the promise to be notified when a channel has been created and connected to the APNs
101+
* server
102+
*
103+
* @return a future that will be notified once a channel has been created and connected to the APNs server
104+
*/
105+
@Override
106+
public Future<Channel> create(final Promise<Channel> channelReadyPromise) {
107+
final long delay = this.currentDelaySeconds.get();
108+
109+
channelReadyPromise.addListener(future -> {
110+
final long updatedDelay = future.isSuccess() ? 0 :
111+
Math.max(Math.min(delay * 2, MAX_CONNECT_DELAY_SECONDS), MIN_CONNECT_DELAY_SECONDS);
112+
113+
this.currentDelaySeconds.compareAndSet(delay, updatedDelay);
114+
});
115+
116+
117+
this.bootstrapTemplate.config().group().schedule(() -> {
118+
final Bootstrap bootstrap = this.bootstrapTemplate.clone()
119+
.channelFactory(new AugmentingReflectiveChannelFactory<>(
120+
ClientChannelClassUtil.getSocketChannelClass(this.bootstrapTemplate.config().group()),
121+
CHANNEL_READY_PROMISE_ATTRIBUTE_KEY, channelReadyPromise));
122+
123+
final ChannelFuture connectFuture = bootstrap.connect();
124+
125+
connectFuture.addListener(future -> {
126+
if (!future.isSuccess()) {
127+
channelReadyPromise.tryFailure(future.cause());
128+
}
129+
});
130+
}, delay, TimeUnit.SECONDS);
131+
132+
return channelReadyPromise;
133+
}
134+
135+
/**
136+
* Destroys a channel by closing it.
137+
*
138+
* @param channel the channel to destroy
139+
* @param promise the promise to notify when the channel has been destroyed
140+
*
141+
* @return a future that will be notified when the channel has been destroyed
142+
*/
143+
@Override
144+
public Future<Void> destroy(final Channel channel, final Promise<Void> promise) {
145+
channel.close().addListener(new PromiseNotifier<>(promise));
146+
return promise;
147+
}
148+
149+
@Override
150+
public void close() {
151+
try {
152+
this.addressResolverGroup.close();
153+
} finally {
154+
if (this.sslContext instanceof ReferenceCounted) {
155+
if (this.hasReleasedSslContext.compareAndSet(false, true)) {
156+
((ReferenceCounted) this.sslContext).release();
157+
}
158+
}
159+
}
160+
}
161+
}

pushy/src/main/java/com/eatthepath/pushy/apns/ApnsChannelFactory.java

Lines changed: 0 additions & 205 deletions
This file was deleted.

pushy/src/main/java/com/eatthepath/pushy/apns/ApnsChannelPool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import java.util.Set;
3737

3838
/**
39-
* <p>A pool of channels connected to an APNs server. Channel pools use a {@link ApnsChannelFactory} to create
39+
* <p>A pool of channels connected to an APNs server. Channel pools use a {@link ApnsNotificationChannelFactory} to create
4040
* connections (up to a given maximum capacity) on demand.</p>
4141
*
4242
* <p>Callers acquire channels from the pool via the {@link ApnsChannelPool#acquire()} method, and must return them to

pushy/src/main/java/com/eatthepath/pushy/apns/ApnsClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public void handleConnectionCreationFailed() {
129129
this.metricsListener = clientConfiguration.getMetricsListener()
130130
.orElseGet(NoopApnsClientMetricsListener::new);
131131

132-
final ApnsChannelFactory channelFactory = new ApnsChannelFactory(clientConfiguration, this.clientResources);
132+
final ApnsNotificationChannelFactory channelFactory = new ApnsNotificationChannelFactory(clientConfiguration, this.clientResources);
133133

134134
final ApnsChannelPoolMetricsListener channelPoolMetricsListener = new ApnsChannelPoolMetricsListener() {
135135

pushy/src/main/java/com/eatthepath/pushy/apns/ApnsClientHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,6 @@ public void exceptionCaught(final ChannelHandlerContext context, final Throwable
490490
}
491491

492492
private Promise<Channel> getChannelReadyPromise(final Channel channel) {
493-
return channel.attr(ApnsChannelFactory.CHANNEL_READY_PROMISE_ATTRIBUTE_KEY).get();
493+
return channel.attr(ApnsNotificationChannelFactory.CHANNEL_READY_PROMISE_ATTRIBUTE_KEY).get();
494494
}
495495
}

0 commit comments

Comments
 (0)