diff --git a/CHANGELOG.md b/CHANGELOG.md index 07e0415..b720416 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 7.0.1 + - Name netty threads with plugin id and their purpose [229](https://github.com/logstash-plugins/logstash-input-tcp/pull/229) + ## 7.0.0 - SSL settings that were marked deprecated in version `6.4.0` are now marked obsolete, and will prevent the plugin from starting. - These settings are: diff --git a/lib/logstash/inputs/tcp.rb b/lib/logstash/inputs/tcp.rb index 2d41bfa..fafcd81 100644 --- a/lib/logstash/inputs/tcp.rb +++ b/lib/logstash/inputs/tcp.rb @@ -177,7 +177,7 @@ def register validate_ssl_config! if server? - @loop = InputLoop.new(@host, @port, DecoderImpl.new(@codec, self), @tcp_keep_alive, java_ssl_context) + @loop = InputLoop.new(@id, @host, @port, DecoderImpl.new(@codec, self), @tcp_keep_alive, java_ssl_context) end end diff --git a/logstash-input-tcp.gemspec b/logstash-input-tcp.gemspec index 1b8407c..76936a7 100644 --- a/logstash-input-tcp.gemspec +++ b/logstash-input-tcp.gemspec @@ -6,7 +6,7 @@ Gem::Specification.new do |s| s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" s.authors = ["Elastic"] s.email = 'info@elastic.co' - s.homepage = "http://www.elastic.co/guide/en/logstash/current/index.html" + s.homepage = "https://elastic.co/logstash" s.platform = "java" s.require_paths = ["lib", "vendor/jar-dependencies"] diff --git a/src/main/java/org/logstash/tcp/InputLoop.java b/src/main/java/org/logstash/tcp/InputLoop.java index 3c7a89d..86342d1 100644 --- a/src/main/java/org/logstash/tcp/InputLoop.java +++ b/src/main/java/org/logstash/tcp/InputLoop.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.net.InetSocketAddress; +import static org.logstash.tcp.util.DaemonThreadFactory.daemonThreadFactory; + /** * Plain TCP Server Implementation. */ @@ -66,13 +68,13 @@ public final class InputLoop implements Runnable, Closeable { * @param decoder {@link Decoder} provided by Jruby * @param keepAlive set to true to instruct the socket to issue TCP keep alive */ - public InputLoop(final String host, final int port, final Decoder decoder, final boolean keepAlive, + public InputLoop(final String id, final String host, final int port, final Decoder decoder, final boolean keepAlive, final SslContext sslContext) { this.sslContext = sslContext; this.host = host; this.port = port; - worker = new NioEventLoopGroup(); - boss = new NioEventLoopGroup(1); + boss = new NioEventLoopGroup(1, daemonThreadFactory(id + "-bossGroup")); + worker = new NioEventLoopGroup(daemonThreadFactory(id + "-workGroup")); serverBootstrap = new ServerBootstrap().group(boss, worker) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) @@ -152,7 +154,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E } /** - * Listeners that flushes the the JRuby supplied {@link Decoder} when the socket is closed. + * Listeners that flushes the JRuby supplied {@link Decoder} when the socket is closed. */ private static final class FlushOnCloseListener implements GenericFutureListener> { @@ -199,7 +201,7 @@ private static final class DecoderAdapter extends ChannelInboundHandlerAdapter { this.decoder = decoder; } - // 6.07 updated to pass in the full netty ChannelHandlerContext instead of the remoteaddress field + // 6.07 updated to pass in the full netty ChannelHandlerContext instead of the remote address field // corresponding interface updated @Override public void channelRead(final ChannelHandlerContext ctx, final Object msg) { diff --git a/src/main/java/org/logstash/tcp/util/DaemonThreadFactory.java b/src/main/java/org/logstash/tcp/util/DaemonThreadFactory.java new file mode 100644 index 0000000..be7bfca --- /dev/null +++ b/src/main/java/org/logstash/tcp/util/DaemonThreadFactory.java @@ -0,0 +1,30 @@ +package org.logstash.tcp.util; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class DaemonThreadFactory implements ThreadFactory { + + final ThreadGroup group; + final AtomicInteger threadNumber = new AtomicInteger(1); + final String namePrefix; + + DaemonThreadFactory(String namePrefix) { + this.namePrefix = namePrefix; + group = Thread.currentThread().getThreadGroup(); + } + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, + namePrefix + "[T#" + threadNumber.getAndIncrement() + "]", + 0); + t.setDaemon(true); + return t; + } + + public static ThreadFactory daemonThreadFactory(String namePrefix) { + return new DaemonThreadFactory(namePrefix); + } + +} \ No newline at end of file diff --git a/version b/version index 66ce77b..9fe9ff9 100644 --- a/version +++ b/version @@ -1 +1 @@ -7.0.0 +7.0.1