@@ -141,7 +141,7 @@ def register
141
141
break if @closed . value
142
142
Thread . start ( server_socket . accept ) do |client_socket |
143
143
# monkeypatch a 'peer' method onto the socket.
144
- client_socket . instance_eval { class << self ; include ::LogStash ::Util ::SocketPeer end }
144
+ client_socket . extend ( ::LogStash ::Util ::SocketPeer )
145
145
@logger . debug ( "Accepted connection" , :client => client_socket . peer ,
146
146
:server => "#{ @host } :#{ @port } " )
147
147
client = Client . new ( client_socket , self )
@@ -164,14 +164,22 @@ def register
164
164
@codec . on_event do |event , payload |
165
165
begin
166
166
client_socket = connect unless client_socket
167
- r , w , e = IO . select ( [ client_socket ] , [ client_socket ] , [ client_socket ] , nil )
168
- # don't expect any reads, but a readable socket might
169
- # mean the remote end closed, so read it and throw it away.
170
- # we'll get an EOFError if it happens.
171
- client_socket . sysread ( 16384 ) if r . any?
167
+
168
+ writable_io = nil
169
+ while writable_io . nil? || writable_io . any? == false
170
+ readable_io , writable_io , _ = IO . select ( [ client_socket ] , [ client_socket ] )
171
+
172
+ # don't expect any reads, but a readable socket might
173
+ # mean the remote end closed, so read it and throw it away.
174
+ # we'll get an EOFError if it happens.
175
+ readable_io . each { |readable | readable . sysread ( 16384 ) }
176
+ end
172
177
173
178
# Now send the payload
174
- client_socket . syswrite ( payload ) if w . any?
179
+ while payload && payload . bytesize > 0
180
+ written_bytes_size = client_socket . syswrite ( payload )
181
+ payload = payload . byteslice ( written_bytes_size ..-1 )
182
+ end
175
183
rescue => e
176
184
log_warn "client socket failed:" , e , host : @host , port : @port , socket : client_socket &.to_s
177
185
client_socket . close rescue nil
@@ -210,7 +218,7 @@ def connect
210
218
raise
211
219
end
212
220
end
213
- client_socket . instance_eval { class << self ; include ::LogStash ::Util ::SocketPeer end }
221
+ client_socket . extend ( ::LogStash ::Util ::SocketPeer )
214
222
@logger . debug ( "Opened connection" , :client => "#{ client_socket . peer } " )
215
223
return client_socket
216
224
rescue StandardError => e
0 commit comments