Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,6 @@ public class NettyConnector extends AbstractConnector {

private boolean httpEnabled;

private long httpMaxClientIdleTime;

private long httpClientIdleScanPeriod;

private boolean httpRequiresSessionId;

// if true, after the connection, the connector will send
Expand Down Expand Up @@ -368,8 +364,6 @@ public NettyConnector(final Map<String, Object> configuration,
httpEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_ENABLED_PROP_NAME, TransportConstants.DEFAULT_HTTP_ENABLED, configuration);
servletPath = ConfigurationHelper.getStringProperty(TransportConstants.SERVLET_PATH, TransportConstants.DEFAULT_SERVLET_PATH, configuration);
if (httpEnabled) {
httpMaxClientIdleTime = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_CLIENT_IDLE_PROP_NAME, TransportConstants.DEFAULT_HTTP_CLIENT_IDLE_TIME, configuration);
httpClientIdleScanPeriod = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, TransportConstants.DEFAULT_HTTP_CLIENT_SCAN_PERIOD, configuration);
httpRequiresSessionId = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_REQUIRES_SESSION_ID, TransportConstants.DEFAULT_HTTP_REQUIRES_SESSION_ID, configuration);
httpHeaders = new HashMap<>();
for (Map.Entry<String, Object> header : configuration.entrySet()) {
Expand All @@ -378,8 +372,6 @@ public NettyConnector(final Map<String, Object> configuration,
}
}
} else {
httpMaxClientIdleTime = 0;
httpClientIdleScanPeriod = -1;
httpRequiresSessionId = false;
httpHeaders = Collections.emptyMap();
}
Expand Down Expand Up @@ -1120,14 +1112,6 @@ public boolean awaitHandshake() {

public class HttpHandler extends ChannelDuplexHandler {

private Channel channel;

private long lastSendTime = 0;

private boolean waitingGet = false;

private HttpIdleTimer task;

private final String url;

private final FutureLatch handShakeFuture = new FutureLatch();
Expand All @@ -1148,26 +1132,6 @@ public Map<String, String> getHeaders() {
return headers;
}

@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
channel = ctx.channel();
if (httpClientIdleScanPeriod > 0) {
task = new HttpIdleTimer();
java.util.concurrent.Future<?> future = scheduledThreadPool.scheduleAtFixedRate(task, httpClientIdleScanPeriod, httpClientIdleScanPeriod, TimeUnit.MILLISECONDS);
task.setFuture(future);
}
}

@Override
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
if (task != null) {
task.close();
}

super.channelInactive(ctx);
}

@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
FullHttpResponse response = (FullHttpResponse) msg;
Expand All @@ -1183,12 +1147,12 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) throw
active = true;
handShakeFuture.run();
}
waitingGet = false;
ctx.fireChannelRead(response.content());
}

@Override
public void write(final ChannelHandlerContext ctx, final Object msg, ChannelPromise promise) throws Exception {
Object toSend = msg;
if (msg instanceof ByteBuf buf) {
if (httpRequiresSessionId && !active) {
if (handshaking) {
Expand All @@ -1208,48 +1172,9 @@ public void write(final ChannelHandlerContext ctx, final Object msg, ChannelProm
httpRequest.headers().add(HttpHeaderNames.COOKIE, cookie);
}
httpRequest.headers().add(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(buf.readableBytes()));
ctx.write(httpRequest, promise);
lastSendTime = System.currentTimeMillis();
} else {
ctx.write(msg, promise);
lastSendTime = System.currentTimeMillis();
}
}

private class HttpIdleTimer implements Runnable {

private boolean closed = false;

private java.util.concurrent.Future<?> future;

@Override
public synchronized void run() {
if (closed) {
return;
}

if (!waitingGet && System.currentTimeMillis() > lastSendTime + httpMaxClientIdleTime) {
FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
httpRequest.headers().add(HttpHeaderNames.HOST, String.format("%s:%d", host, port));
for (Map.Entry<String, String> header : headers.entrySet()) {
httpRequest.headers().add(header.getKey(), header.getValue());
}
waitingGet = true;
channel.writeAndFlush(httpRequest);
}
}

public synchronized void setFuture(final java.util.concurrent.Future<?> future) {
this.future = future;
}

public void close() {
if (future != null) {
future.cancel(false);
}

closed = true;
toSend = httpRequest;
}
ctx.write(toSend, promise);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,18 @@ public class TransportConstants {

public static final String HTTP_ENABLED_PROP_NAME = "httpEnabled";

@Deprecated(forRemoval = true)
public static final String HTTP_CLIENT_IDLE_PROP_NAME = "httpClientIdleTime";

@Deprecated(forRemoval = true)
public static final String HTTP_CLIENT_IDLE_SCAN_PERIOD = "httpClientIdleScanPeriod";

public static final String NETTY_HTTP_HEADER_PREFIX = "nettyHttpHeader.";

@Deprecated(forRemoval = true)
public static final String HTTP_RESPONSE_TIME_PROP_NAME = "httpResponseTime";

@Deprecated(forRemoval = true)
public static final String HTTP_SERVER_SCAN_PERIOD_PROP_NAME = "httpServerScanPeriod";

public static final String HTTP_REQUIRES_SESSION_ID = "httpRequiresSessionId";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -43,7 +42,6 @@
import org.apache.activemq.artemis.core.remoting.impl.netty.ConnectionCreator;
import org.apache.activemq.artemis.core.remoting.impl.netty.HAProxyMessageEnforcer;
import org.apache.activemq.artemis.core.remoting.impl.netty.HttpAcceptorHandler;
import org.apache.activemq.artemis.core.remoting.impl.netty.HttpKeepAliveRunnable;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettySNIHostnameHandler;
Expand All @@ -68,8 +66,6 @@ public class ProtocolHandler {

private ScheduledExecutorService scheduledThreadPool;

private HttpKeepAliveRunnable httpKeepAliveRunnable;

private final List<String> websocketSubprotocolIds;

public ProtocolHandler(Map<String, ProtocolManager> protocolMap,
Expand All @@ -95,16 +91,6 @@ public ChannelHandler getProtocolDecoder() {
return new ProtocolDecoder(true, false);
}

public HttpKeepAliveRunnable getHttpKeepAliveRunnable() {
return httpKeepAliveRunnable;
}

public void close() {
if (httpKeepAliveRunnable != null) {
httpKeepAliveRunnable.close();
}
}

public ProtocolManager getProtocol(String name) {
return this.protocolMap.get(name);
}
Expand Down Expand Up @@ -288,16 +274,8 @@ private void switchToHttp(ChannelHandlerContext ctx) {
p.addLast("http-decoder", new HttpRequestDecoder());
p.addLast("http-aggregator", new HttpObjectAggregator(Integer.MAX_VALUE));
p.addLast("http-encoder", new HttpResponseEncoder());
//create it lazily if and when we need it
if (httpKeepAliveRunnable == null) {
long httpServerScanPeriod = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, TransportConstants.DEFAULT_HTTP_SERVER_SCAN_PERIOD, nettyAcceptor.getConfiguration());
httpKeepAliveRunnable = new HttpKeepAliveRunnable();
Future<?> future = scheduledThreadPool.scheduleAtFixedRate(httpKeepAliveRunnable, httpServerScanPeriod, httpServerScanPeriod, TimeUnit.MILLISECONDS);
httpKeepAliveRunnable.setFuture(future);
}
long httpResponseTime = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_RESPONSE_TIME_PROP_NAME, TransportConstants.DEFAULT_HTTP_RESPONSE_TIME, nettyAcceptor.getConfiguration());
HttpAcceptorHandler httpHandler = new HttpAcceptorHandler(httpKeepAliveRunnable, httpResponseTime, ctx.channel());
ctx.pipeline().addLast(HTTP_HANDLER, httpHandler);
HttpAcceptorHandler httpHandler = new HttpAcceptorHandler(ctx.channel());
p.addLast(HTTP_HANDLER, httpHandler);
p.addLast(new ProtocolDecoder(false, true));
p.remove(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.concurrent.TimeUnit;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -42,39 +41,26 @@
*/
public class HttpAcceptorHandler extends ChannelDuplexHandler {

private final BlockingQueue<ResponseHolder> responses = new LinkedBlockingQueue<>();
private final BlockingQueue<FullHttpResponse> responses = new LinkedBlockingQueue<>();

private final BlockingQueue<Runnable> delayedResponses = new LinkedBlockingQueue<>();

private final ExecutorService executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, delayedResponses);

private final HttpKeepAliveRunnable httpKeepAliveTask;

private final long responseTime;

private Channel channel;

public HttpAcceptorHandler(final HttpKeepAliveRunnable httpKeepAliveTask, final long responseTime, Channel channel) {
public HttpAcceptorHandler(Channel channel) {
super();
this.responseTime = responseTime;
this.httpKeepAliveTask = httpKeepAliveTask;
this.channel = channel;
httpKeepAliveTask.registerKeepAliveHandler(this);
}

@Override
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
httpKeepAliveTask.unregisterKeepAliveHandler(this);
shutdown();
channel = null;
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
httpKeepAliveTask.unregisterKeepAliveHandler(this);
}

@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
FullHttpRequest request = (FullHttpRequest) msg;
Expand All @@ -83,7 +69,7 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) throw
if (method.equals(HttpMethod.POST)) {
ctx.fireChannelRead(ReferenceCountUtil.retain(((FullHttpRequest) msg).content()));
// add a new response
responses.put(new ResponseHolder(System.currentTimeMillis() + responseTime, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)));
responses.put(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK));
ReferenceCountUtil.release(msg);
return;
}
Expand All @@ -100,21 +86,6 @@ public void write(final ChannelHandlerContext ctx, final Object msg, ChannelProm
}
}

public void keepAlive(final long time) {
// send some responses to catch up thus avoiding any timeout.
int lateResponses = 0;
for (ResponseHolder response : responses) {
if (response.timeReceived < time) {
lateResponses++;
} else {
break;
}
}
for (int i = 0; i < lateResponses; i++) {
executor.execute(new ResponseRunner());
}
}

/**
* this is prompted to delivery when a response is available in the response queue.
*/
Expand All @@ -132,32 +103,26 @@ final class ResponseRunner implements Runnable {
this.promise = promise;
}

ResponseRunner() {
bogusResponse = true;
buffer = Unpooled.buffer(0);
promise = channel.newPromise();
}

@Override
public void run() {
ResponseHolder responseHolder = null;
FullHttpResponse response = null;
do {
try {
responseHolder = responses.take();
response = responses.take();
} catch (InterruptedException e) {
if (executor.isShutdown())
return;
// otherwise ignore, we'll just try again
}
}
while (responseHolder == null);
while (response == null);
if (!bogusResponse) {
piggyBackResponses(responseHolder.response.content());
piggyBackResponses(response.content());
} else {
responseHolder.response.content().writeBytes(buffer);
response.content().writeBytes(buffer);
}
responseHolder.response.headers().set(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(responseHolder.response.content().readableBytes()));
channel.writeAndFlush(responseHolder.response, promise);
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(response.content().readableBytes()));
channel.writeAndFlush(response, promise);

buffer.release();

Expand Down Expand Up @@ -199,20 +164,4 @@ public void shutdown() {
}
responses.clear();
}

/**
* a holder class so we know what time the request first arrived
*/
private static final class ResponseHolder {

final FullHttpResponse response;

final long timeReceived;

private ResponseHolder(final long timeReceived, final FullHttpResponse response) {
this.timeReceived = timeReceived;
this.response = response;
}
}

}
Loading