@@ -51,31 +51,36 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base
51
51
# SSL key passphrase
52
52
config :ssl_key_passphrase , :validate => :password , :default => nil
53
53
54
+ ##
55
+ # @param socket [Socket]
56
+ # @param logger_context [#log_warn&#log_error]
54
57
class Client
55
- public
56
- def initialize ( socket , logger )
58
+ def initialize ( socket , logger_context )
57
59
@socket = socket
58
- @logger = logger
60
+ @logger_context = logger_context
59
61
@queue = Queue . new
60
62
end
61
63
62
- public
63
64
def run
64
65
loop do
65
66
begin
66
67
@socket . write ( @queue . pop )
67
68
rescue => e
68
- @logger . warn ( "tcp output exception" , :socket => @socket ,
69
- :exception => e )
69
+ @logger_context . log_warn ( "tcp output exception: socket write failed" , e , :socket => @socket &.to_s )
70
70
break
71
71
end
72
72
end
73
73
end # def run
74
74
75
- public
76
75
def write ( msg )
77
76
@queue . push ( msg )
78
77
end # def write
78
+
79
+ def close
80
+ @socket . close
81
+ rescue => e
82
+ @logger_context . log_warn 'socket close failed:' , e , socket : @socket &.to_s )
83
+ end
79
84
end # class Client
80
85
81
86
private
@@ -113,6 +118,8 @@ def register
113
118
if @ssl_enable
114
119
setup_ssl
115
120
end # @ssl_enable
121
+ @closed = Concurrent ::AtomicBoolean . new ( false )
122
+ @thread_no = Concurrent ::AtomicFixnum . new ( 0 )
116
123
117
124
if server?
118
125
@logger . info ( "Starting tcp output listener" , :address => "#{ @host } :#{ @port } " )
@@ -129,25 +136,28 @@ def register
129
136
@client_threads = [ ]
130
137
131
138
@accept_thread = Thread . new ( @server_socket ) do |server_socket |
139
+ LogStash ::Util . set_thread_name ( "[#{ pipeline_id } ]|output|tcp|server_accept" )
132
140
loop do
141
+ break if @closed . value
133
142
Thread . start ( server_socket . accept ) do |client_socket |
134
143
# monkeypatch a 'peer' method onto the socket.
135
144
client_socket . instance_eval { class << self ; include ::LogStash ::Util ::SocketPeer end }
136
145
@logger . debug ( "Accepted connection" , :client => client_socket . peer ,
137
146
:server => "#{ @host } :#{ @port } " )
138
- client = Client . new ( client_socket , @logger )
147
+ client = Client . new ( client_socket , self )
139
148
Thread . current [ :client ] = client
149
+ LogStash ::Util . set_thread_name ( "[#{ pipeline_id } ]|output|tcp|client_socket-#{ @thread_no . increment } " )
140
150
@client_threads << Thread . current
141
- client . run
151
+ client . run unless @closed . value
142
152
end
143
153
end
144
154
end
145
155
146
156
@codec . on_event do |event , payload |
157
+ @client_threads . select! ( &:alive? )
147
158
@client_threads . each do |client_thread |
148
159
client_thread [ :client ] . write ( payload )
149
160
end
150
- @client_threads . reject! { |t | !t . alive? }
151
161
end
152
162
else
153
163
client_socket = nil
@@ -163,8 +173,7 @@ def register
163
173
# Now send the payload
164
174
client_socket . syswrite ( payload ) if w . any?
165
175
rescue => e
166
- @logger . warn ( "tcp output exception" , :host => @host , :port => @port ,
167
- :exception => e , :backtrace => e . backtrace )
176
+ log_warn "client socket failed:" , e , host : @host , port : @port , socket : client_socket &.to_s
168
177
client_socket . close rescue nil
169
178
client_socket = nil
170
179
sleep @reconnect_interval
@@ -174,6 +183,18 @@ def register
174
183
end
175
184
end # def register
176
185
186
+ # @overload Base#close
187
+ def close
188
+ @closed . make_true
189
+ @server_socket . close rescue nil if @server_socket
190
+
191
+ return unless @client_threads
192
+ @client_threads . each do |thread |
193
+ client = thread [ :client ]
194
+ client . close rescue nil if client
195
+ end
196
+ end
197
+
177
198
private
178
199
def connect
179
200
begin
@@ -183,7 +204,7 @@ def connect
183
204
begin
184
205
client_socket . connect
185
206
rescue OpenSSL ::SSL ::SSLError => ssle
186
- @logger . error ( "SSL Error" , :exception => ssle , : backtrace => ssle . backtrace )
207
+ log_error 'connect ssl failure:' , ssle , backtrace : false
187
208
# NOTE(mrichar1): Hack to prevent hammering peer
188
209
sleep ( 5 )
189
210
raise
@@ -193,7 +214,7 @@ def connect
193
214
@logger . debug ( "Opened connection" , :client => "#{ client_socket . peer } " )
194
215
return client_socket
195
216
rescue StandardError => e
196
- @logger . error ( "Failed to connect: #{ e . message } " , :exception => e . class , :backtrace => e . backtrace )
217
+ log_error 'failed to connect:' , e
197
218
sleep @reconnect_interval
198
219
retry
199
220
end
@@ -208,4 +229,20 @@ def server?
208
229
def receive ( event )
209
230
@codec . encode ( event )
210
231
end # def receive
232
+
233
+ def pipeline_id
234
+ execution_context . pipeline_id || 'main'
235
+ end
236
+
237
+ def log_warn ( msg , e , backtrace : @logger . debug? , **details )
238
+ details = details . merge message : e . message , exception : e . class
239
+ details [ :backtrace ] = e . backtrace if backtrace
240
+ @logger . warn ( msg , details )
241
+ end
242
+
243
+ def log_error ( msg , e , backtrace : @logger . info? , **details )
244
+ details = details . merge message : e . message , exception : e . class
245
+ details [ :backtrace ] = e . backtrace if backtrace
246
+ @logger . error ( msg , details )
247
+ end
211
248
end # class LogStash::Outputs::Tcp
0 commit comments