Skip to content

Commit ea5c5c9

Browse files
committed
fix server-mode shutdown bug, add baseline specs
1 parent 4428941 commit ea5c5c9

File tree

2 files changed

+125
-14
lines changed

2 files changed

+125
-14
lines changed

lib/logstash/outputs/tcp.rb

+19-13
Original file line numberDiff line numberDiff line change
@@ -168,14 +168,15 @@ def register
168168
end
169169
end
170170
else
171-
client_socket = nil
171+
@client_socket = nil
172172
@codec.on_event do |event, payload|
173173
begin
174-
client_socket = connect unless client_socket
174+
# not threadsafe; this is why we require `concurrency: single`
175+
@client_socket = connect unless @client_socket
175176

176177
writable_io = nil
177178
while writable_io.nil? || writable_io.any? == false
178-
readable_io, writable_io, _ = IO.select([client_socket],[client_socket])
179+
readable_io, writable_io, _ = IO.select([@client_socket],[@client_socket])
179180

180181
# don't expect any reads, but a readable socket might
181182
# mean the remote end closed, so read it and throw it away.
@@ -186,15 +187,15 @@ def register
186187
# Now send the payload
187188
@logger.trace("transmitting #{payload.bytesize} bytes", socket: @client_socket&.peer) if @logger.trace? && payload && !payload.empty?
188189
while payload && payload.bytesize > 0
189-
written_bytes_size = client_socket.syswrite(payload)
190+
written_bytes_size = @client_socket.syswrite(payload)
190191
payload = payload.byteslice(written_bytes_size..-1)
191192
@logger.trace(">transmitted #{written_bytes_size} bytes; #{payload.bytesize} bytes remain", socket: @client_socket&.peer) if @logger.trace?
192193
sleep 0.1 unless payload.empty?
193194
end
194195
rescue => e
195-
log_warn "client socket failed:", e, host: @host, port: @port, socket: client_socket&.peer
196-
client_socket.close rescue nil
197-
client_socket = nil
196+
log_warn "client socket failed:", e, host: @host, port: @port, socket: @client_socket&.peer
197+
@client_socket.close rescue nil
198+
@client_socket = nil
198199
sleep @reconnect_interval
199200
retry
200201
end
@@ -204,13 +205,18 @@ def register
204205

205206
# @overload Base#close
206207
def close
207-
@closed.make_true
208-
@server_socket.close rescue nil if @server_socket
208+
if server?
209+
# server-mode clean-up
210+
@closed.make_true
211+
@server_socket.shutdown rescue nil if @server_socket
209212

210-
return unless @client_threads
211-
@client_threads.each do |thread|
212-
client = thread[:client]
213-
client.close rescue nil if client
213+
@client_threads&.each do |thread|
214+
client = thread[:client]
215+
client.close rescue nil if client
216+
end
217+
else
218+
# client-mode clean-up
219+
@client_socket&.close
214220
end
215221
end
216222

spec/outputs/tcp_spec.rb

+106-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
require "flores/pki"
44

55
describe LogStash::Outputs::Tcp do
6-
subject { described_class.new(config) }
6+
subject(:instance) { described_class.new(config) }
77
let(:config) { {
88
"host" => "localhost",
99
"port" => 2000 + rand(3000),
@@ -73,4 +73,109 @@
7373
end
7474
end
7575
end
76+
77+
context 'client mode' do
78+
context 'transmitting data' do
79+
let!(:io) { StringIO.new } # somewhere for our server to stash the data it receives
80+
81+
let(:server_host) { 'localhost' }
82+
let(:server_port) { server.addr[1] } # get actual since we bind to port 0
83+
84+
let!(:server) { TCPServer.new(server_host, 0) }
85+
86+
let(:config) do
87+
{ 'host' => server_host, 'port' => server_port, 'mode' => 'client' }
88+
end
89+
90+
let(:event) { LogStash::Event.new({"hello" => "world"})}
91+
92+
subject(:instance) { described_class.new(config) }
93+
94+
before(:each) do
95+
# accepts ONE connection
96+
@server_socket_thread = Thread.start do
97+
client = server.accept
98+
io.write(client.read)
99+
end
100+
instance.register
101+
end
102+
103+
after(:each) do
104+
@server_socket_thread&.join
105+
end
106+
107+
it 'encodes and transmits data' do
108+
instance.receive(event)
109+
instance.close # release the connection
110+
sleep 1
111+
expect(io.string).to include('"hello"','"world"')
112+
end
113+
114+
context 'when payload is very large' do
115+
let(:one_hundred_megabyte_message) { "a" * 1024 * 1024 * 100 }
116+
let(:event) { LogStash::Event.new("message" => one_hundred_megabyte_message) }
117+
118+
119+
it 'encodes and transmits data' do
120+
instance.receive(event)
121+
instance.close # release the connection
122+
sleep 1
123+
expect(io.string).to include('"message"',%Q("#{one_hundred_megabyte_message}"))
124+
end
125+
end
126+
end
127+
end
128+
129+
context 'server mode' do
130+
context 'transmitting data' do
131+
let(:server_host) { 'localhost' }
132+
let(:server_port) { Random.rand(1024...5000) }
133+
134+
let(:config) do
135+
{ 'host' => server_host, 'port' => server_port, 'mode' => 'server' }
136+
end
137+
138+
subject(:instance) { described_class.new(config) }
139+
140+
before(:each) { instance.register } # start listener
141+
after(:each) { instance.close }
142+
143+
let(:event) { LogStash::Event.new({"hello" => "world"})}
144+
145+
context 'when one client is connected' do
146+
let(:io) { StringIO.new }
147+
let(:client_socket) { TCPSocket.new(server_host, server_port) }
148+
149+
before(:each) do
150+
@client_socket_thread = Thread.start { io.write client_socket.read }
151+
sleep 1 # wait for it to actually connect
152+
end
153+
154+
it 'encodes and transmits data' do
155+
instance.receive(event)
156+
157+
sleep 1 # wait for the event to get sent...
158+
instance.close # release the connection
159+
160+
@client_socket_thread.join(30) || fail('client failed to join')
161+
expect(io.string).to include('"hello"','"world"')
162+
end
163+
164+
context 'when payload is very large' do
165+
let(:one_hundred_megabyte_message) { "a" * 1024 * 1024 * 100 }
166+
let(:event) { LogStash::Event.new("message" => one_hundred_megabyte_message) }
167+
168+
it 'encodes and transmits data' do
169+
instance.receive(event)
170+
171+
sleep 1 # wait for the event to get sent...
172+
instance.close # release the connection
173+
174+
@client_socket_thread.join(30) || fail('client failed to join')
175+
expect(io.string).to include('"message"',%Q("#{one_hundred_megabyte_message}"))
176+
end
177+
end
178+
end
179+
end
180+
end
76181
end

0 commit comments

Comments
 (0)