Skip to content

Naming netty threads. #229

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 7.0.1
- Name netty threads with plugin id and their purpose [229](https://github.com/logstash-plugins/logstash-input-tcp/pull/229)

## 7.0.0
- SSL settings that were marked deprecated in version `6.4.0` are now marked obsolete, and will prevent the plugin from starting.
- These settings are:
Expand Down
2 changes: 1 addition & 1 deletion lib/logstash/inputs/tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def register
validate_ssl_config!

if server?
@loop = InputLoop.new(@host, @port, DecoderImpl.new(@codec, self), @tcp_keep_alive, java_ssl_context)
@loop = InputLoop.new(@id, @host, @port, DecoderImpl.new(@codec, self), @tcp_keep_alive, java_ssl_context)
end
end

Expand Down
2 changes: 1 addition & 1 deletion logstash-input-tcp.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Gem::Specification.new do |s|
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
s.authors = ["Elastic"]
s.email = '[email protected]'
s.homepage = "http://www.elastic.co/guide/en/logstash/current/index.html"
s.homepage = "https://elastic.co/logstash"
s.platform = "java"
s.require_paths = ["lib", "vendor/jar-dependencies"]

Expand Down
12 changes: 7 additions & 5 deletions src/main/java/org/logstash/tcp/InputLoop.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.io.IOException;
import java.net.InetSocketAddress;

import static org.logstash.tcp.util.DaemonThreadFactory.daemonThreadFactory;

/**
* Plain TCP Server Implementation.
*/
Expand Down Expand Up @@ -66,13 +68,13 @@ public final class InputLoop implements Runnable, Closeable {
* @param decoder {@link Decoder} provided by Jruby
* @param keepAlive set to true to instruct the socket to issue TCP keep alive
*/
public InputLoop(final String host, final int port, final Decoder decoder, final boolean keepAlive,
public InputLoop(final String id, final String host, final int port, final Decoder decoder, final boolean keepAlive,
final SslContext sslContext) {
this.sslContext = sslContext;
this.host = host;
this.port = port;
worker = new NioEventLoopGroup();
boss = new NioEventLoopGroup(1);
boss = new NioEventLoopGroup(1, daemonThreadFactory(id + "-bossGroup"));
worker = new NioEventLoopGroup(daemonThreadFactory(id + "-workGroup"));
serverBootstrap = new ServerBootstrap().group(boss, worker)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
Expand Down Expand Up @@ -152,7 +154,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
}

/**
* Listeners that flushes the the JRuby supplied {@link Decoder} when the socket is closed.
* Listeners that flushes the JRuby supplied {@link Decoder} when the socket is closed.
*/
private static final class FlushOnCloseListener implements GenericFutureListener<Future<Void>> {

Expand Down Expand Up @@ -199,7 +201,7 @@ private static final class DecoderAdapter extends ChannelInboundHandlerAdapter {
this.decoder = decoder;
}

// 6.07 updated to pass in the full netty ChannelHandlerContext instead of the remoteaddress field
// 6.07 updated to pass in the full netty ChannelHandlerContext instead of the remote address field
// corresponding interface updated
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
Expand Down
30 changes: 30 additions & 0 deletions src/main/java/org/logstash/tcp/util/DaemonThreadFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.logstash.tcp.util;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class DaemonThreadFactory implements ThreadFactory {

final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;

DaemonThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
group = Thread.currentThread().getThreadGroup();
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + "[T#" + threadNumber.getAndIncrement() + "]",
0);
t.setDaemon(true);
return t;
}

public static ThreadFactory daemonThreadFactory(String namePrefix) {
return new DaemonThreadFactory(namePrefix);
}

}
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
7.0.0
7.0.1