diff --git a/lib/logstash/inputs/tcp.rb b/lib/logstash/inputs/tcp.rb index 29764a0..1be029f 100644 --- a/lib/logstash/inputs/tcp.rb +++ b/lib/logstash/inputs/tcp.rb @@ -23,7 +23,7 @@ # #### Accepting log4j2 logs # # Log4j2 can send JSON over a socket, and we can use that combined with our tcp -# input to accept the logs. +# input to accept the logs. # # First, we need to configure your application to send logs in JSON over a # socket. The following log4j2.xml accomplishes this task. @@ -179,13 +179,13 @@ def close end def decode_buffer(client_ip_address, client_address, client_port, codec, proxy_address, - proxy_port, tbuf, socket) + proxy_port, tbuf, socket, cert_subject = "") codec.decode(tbuf) do |event| if @proxy_protocol event.set(PROXY_HOST_FIELD, proxy_address) unless event.get(PROXY_HOST_FIELD) event.set(PROXY_PORT_FIELD, proxy_port) unless event.get(PROXY_PORT_FIELD) end - enqueue_decorated(event, client_ip_address, client_address, client_port, socket) + enqueue_decorated(event, client_ip_address, client_address, client_port, socket, cert_subject) end end @@ -257,15 +257,15 @@ def handle_socket(socket) flush_codec(codec, client_ip_address, client_address, client_port, socket) end - def enqueue_decorated(event, client_ip_address, client_address, client_port, socket) + def enqueue_decorated(event, client_ip_address, client_address, client_port, socket, cert_subject = "") event.set(HOST_FIELD, client_address) unless event.get(HOST_FIELD) event.set(HOST_IP_FIELD, client_ip_address) unless event.get(HOST_IP_FIELD) event.set(PORT_FIELD, client_port) unless event.get(PORT_FIELD) - event.set(SSLSUBJECT_FIELD, socket.peer_cert.subject.to_s) if socket && @ssl_enable && @ssl_verify && event.get(SSLSUBJECT_FIELD).nil? + event.set(SSLSUBJECT_FIELD, cert_subject) if !cert_subject.empty? && @ssl_enable && @ssl_verify && event.get(SSLSUBJECT_FIELD).nil? decorate(event) @output_queue << event end - + def server? @mode == "server" end diff --git a/lib/logstash/inputs/tcp/decoder_impl.rb b/lib/logstash/inputs/tcp/decoder_impl.rb index dc52cad..789ce20 100644 --- a/lib/logstash/inputs/tcp/decoder_impl.rb +++ b/lib/logstash/inputs/tcp/decoder_impl.rb @@ -13,7 +13,7 @@ def initialize(codec, tcp) @first_read = true end - def decode(channel_addr, data) + def decode(channel_addr, data, cert_subject = "") bytes = Java::byte[data.readableBytes].new data.getBytes(0, bytes) data.release @@ -22,7 +22,7 @@ def decode(channel_addr, data) tbuf = init_first_read(channel_addr, tbuf) end @tcp.decode_buffer(@ip_address, @address, @port, @codec, - @proxy_address, @proxy_port, tbuf, nil) + @proxy_address, @proxy_port, tbuf, nil, cert_subject) end def copy diff --git a/src/main/java/org/logstash/tcp/Decoder.java b/src/main/java/org/logstash/tcp/Decoder.java index 2d7e4c8..6d44eb9 100644 --- a/src/main/java/org/logstash/tcp/Decoder.java +++ b/src/main/java/org/logstash/tcp/Decoder.java @@ -12,8 +12,9 @@ public interface Decoder { * Decode data coming from specific {@link SocketAddress} session. * @param key {@link SocketAddress} * @param message Data {@link ByteBuf} for this address + * @param peerSslCertSubject String The subject of the peer's SSL certificate */ - void decode(SocketAddress key, ByteBuf message); + void decode(SocketAddress key, ByteBuf message, String peerSslCertSubject); /** * Creates a copy of this decoder, that has all internal meta data cleared. diff --git a/src/main/java/org/logstash/tcp/InputLoop.java b/src/main/java/org/logstash/tcp/InputLoop.java index 292b634..818c49b 100644 --- a/src/main/java/org/logstash/tcp/InputLoop.java +++ b/src/main/java/org/logstash/tcp/InputLoop.java @@ -12,6 +12,8 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslHandler; +import javax.net.ssl.SSLPeerUnverifiedException; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import org.apache.logging.log4j.Logger; @@ -201,7 +203,17 @@ private static final class DecoderAdapter extends ChannelInboundHandlerAdapter { @Override public void channelRead(final ChannelHandlerContext ctx, final Object msg) { - decoder.decode(ctx.channel().remoteAddress(), (ByteBuf) msg); + String sslPeerCertSubject = ""; + SslHandler sslhandler = (SslHandler) ctx.channel().pipeline().get(SslHandler.class); + if (sslhandler != null) { + try { + sslPeerCertSubject = sslhandler.engine().getSession().getPeerCertificateChain()[0].getSubjectDN().getName(); + } catch(SSLPeerUnverifiedException e) { + } catch(Exception e) { + logger.error("Error when getting peer SSL certificate: " + e); + } + } + decoder.decode(ctx.channel().remoteAddress(), (ByteBuf) msg, sslPeerCertSubject); } @Override