1919 */
2020
2121use std:: io:: { self , Read , Write } ;
22- use std:: time:: Duration ;
23-
24- use mio:: net:: TcpStream ;
25- use mio:: { Events , Interest , Poll } ;
22+ use std:: net:: TcpStream ;
23+ use std:: time:: { Duration , Instant } ;
2624
2725use crate :: BoxResult ;
2826
2927/// how long to wait for keepalive events
3028/// the communications channels typically exchange data every second, so 2s is reasonable to avoid excess noise
31- pub const KEEPALIVE_DURATION : Duration = Duration :: from_secs ( 2 ) ;
29+ #[ cfg( unix) ]
30+ pub const KEEPALIVE_DURATION : Duration = Duration :: from_secs ( 3 ) ;
3231
3332/// how long to block on polling operations
3433const POLL_TIMEOUT : Duration = Duration :: from_millis ( 50 ) ;
3534
35+ /// how long to allow for send-operations to complete
36+ const SEND_TIMEOUT : Duration = Duration :: from_secs ( 5 ) ;
37+
3638/// sends JSON data over a client-server communications stream
3739pub fn send ( stream : & mut TcpStream , message : & serde_json:: Value ) -> BoxResult < ( ) > {
40+ stream. set_write_timeout ( Some ( POLL_TIMEOUT ) ) ?;
41+
3842 let serialised_message = serde_json:: to_vec ( message) ?;
3943
4044 log:: debug!(
@@ -46,61 +50,72 @@ pub fn send(stream: &mut TcpStream, message: &serde_json::Value) -> BoxResult<()
4650 let mut output_buffer = vec ! [ 0_u8 ; serialised_message. len( ) + 2 ] ;
4751 output_buffer[ ..2 ] . copy_from_slice ( & ( serialised_message. len ( ) as u16 ) . to_be_bytes ( ) ) ;
4852 output_buffer[ 2 ..] . copy_from_slice ( serialised_message. as_slice ( ) ) ;
49- Ok ( stream. write_all ( & output_buffer) ?)
53+
54+ let start = Instant :: now ( ) ;
55+ let mut total_bytes_written: usize = 0 ;
56+
57+ while start. elapsed ( ) < SEND_TIMEOUT {
58+ match stream. write ( & output_buffer[ total_bytes_written..] ) {
59+ Ok ( bytes_written) => {
60+ total_bytes_written += bytes_written;
61+ if total_bytes_written == output_buffer. len ( ) {
62+ return Ok ( ( ) ) ;
63+ }
64+ }
65+ Err ( ref e) if e. kind ( ) == std:: io:: ErrorKind :: WouldBlock || e. kind ( ) == std:: io:: ErrorKind :: TimedOut => {
66+ // unable to write at the moment; keep trying until the full timeout is reached
67+ continue ;
68+ }
69+ Err ( e) => {
70+ return Err ( Box :: new ( e) ) ;
71+ }
72+ }
73+ }
74+ Err ( Box :: new ( simple_error:: simple_error!(
75+ "timed out while attempting to send status-message to {}" ,
76+ stream. peer_addr( ) ?
77+ ) ) )
5078}
5179
5280/// receives the length-count of a pending message over a client-server communications stream
5381fn receive_length ( stream : & mut TcpStream , alive_check : fn ( ) -> bool , results_handler : & mut dyn FnMut ( ) -> BoxResult < ( ) > ) -> BoxResult < u16 > {
54- let mio_token = crate :: get_global_token ( ) ;
55- let mut poll = Poll :: new ( ) ?;
56- poll. registry ( ) . register ( stream, mio_token, Interest :: READABLE ) ?;
57- let mut events = Events :: with_capacity ( 1 ) ; //only interacting with one stream
82+ stream. set_read_timeout ( Some ( POLL_TIMEOUT ) ) . expect ( "unable to set TCP read-timeout" ) ;
5883
5984 let mut length_bytes_read = 0 ;
6085 let mut length_spec: [ u8 ; 2 ] = [ 0 ; 2 ] ;
61- let result: BoxResult < u16 > = ' exiting: loop {
62- if !alive_check ( ) {
63- break ' exiting Ok ( 0 ) ;
64- }
86+ while alive_check ( ) {
6587 //waiting to find out how long the next message is
6688 results_handler ( ) ?; //send any outstanding results between cycles
67- poll. poll ( & mut events, Some ( POLL_TIMEOUT ) ) ?;
68- for event in events. iter ( ) {
69- event. token ( ) ;
70- loop {
71- let size = match stream. read ( & mut length_spec[ length_bytes_read..] ) {
72- Ok ( size) => size,
73- Err ( ref e) if e. kind ( ) == io:: ErrorKind :: WouldBlock => {
74- //nothing left to process
75- break ;
76- }
77- Err ( e) => {
78- break ' exiting Err ( Box :: new ( e) ) ;
79- }
80- } ;
81-
82- if size == 0 {
83- if alive_check ( ) {
84- break ' exiting Err ( Box :: new ( simple_error:: simple_error!( "connection lost" ) ) ) ;
85- } else {
86- //shutting down; a disconnect is expected
87- break ' exiting Err ( Box :: new ( simple_error:: simple_error!( "local shutdown requested" ) ) ) ;
88- }
89- }
9089
91- length_bytes_read += size;
92- if length_bytes_read == 2 {
93- let length = u16:: from_be_bytes ( length_spec) ;
94- log:: debug!( "received length-spec of {} from {}" , length, stream. peer_addr( ) ?) ;
95- break ' exiting Ok ( length) ;
96- } else {
97- log:: debug!( "received partial length-spec from {}" , stream. peer_addr( ) ?) ;
98- }
90+ let size = match stream. read ( & mut length_spec[ length_bytes_read..] ) {
91+ Ok ( size) => size,
92+ Err ( ref e) if e. kind ( ) == io:: ErrorKind :: WouldBlock || e. kind ( ) == std:: io:: ErrorKind :: TimedOut => {
93+ // nothing available to process
94+ continue ;
95+ }
96+ Err ( e) => {
97+ return Err ( Box :: new ( e) ) ;
98+ }
99+ } ;
100+ if size == 0 {
101+ if alive_check ( ) {
102+ return Err ( Box :: new ( simple_error:: simple_error!( "connection lost" ) ) ) ;
103+ } else {
104+ // shutting down; a disconnect is expected
105+ return Err ( Box :: new ( simple_error:: simple_error!( "local shutdown requested" ) ) ) ;
99106 }
100107 }
101- } ;
102- poll. registry ( ) . deregister ( stream) ?;
103- result
108+
109+ length_bytes_read += size;
110+ if length_bytes_read == 2 {
111+ let length = u16:: from_be_bytes ( length_spec) ;
112+ log:: debug!( "received length-spec of {} from {}" , length, stream. peer_addr( ) ?) ;
113+ return Ok ( length) ;
114+ } else {
115+ log:: debug!( "received partial length-spec from {}" , stream. peer_addr( ) ?) ;
116+ }
117+ }
118+ Err ( Box :: new ( simple_error:: simple_error!( "system shutting down" ) ) )
104119}
105120
106121/// receives the data-value of a pending message over a client-server communications stream
@@ -110,62 +125,49 @@ fn receive_payload(
110125 results_handler : & mut dyn FnMut ( ) -> BoxResult < ( ) > ,
111126 length : u16 ,
112127) -> BoxResult < serde_json:: Value > {
113- let mio_token = crate :: get_global_token ( ) ;
114- let mut poll = Poll :: new ( ) ?;
115- poll. registry ( ) . register ( stream, mio_token, Interest :: READABLE ) ?;
116- let mut events = Events :: with_capacity ( 1 ) ; //only interacting with one stream
128+ stream. set_read_timeout ( Some ( POLL_TIMEOUT ) ) . expect ( "unable to set TCP read-timeout" ) ;
117129
118130 let mut bytes_read = 0 ;
119131 let mut buffer = vec ! [ 0_u8 ; length. into( ) ] ;
120- let result: BoxResult < serde_json:: Value > = ' exiting: loop {
121- if !alive_check ( ) {
122- break ' exiting Ok ( serde_json:: from_slice ( & buffer[ 0 ..0 ] ) ?) ;
123- }
132+ while alive_check ( ) {
124133 //waiting to receive the payload
125134 results_handler ( ) ?; //send any outstanding results between cycles
126- poll. poll ( & mut events, Some ( POLL_TIMEOUT ) ) ?;
127- for event in events. iter ( ) {
128- event. token ( ) ;
129- loop {
130- let size = match stream. read ( & mut buffer[ bytes_read..] ) {
131- Ok ( size) => size,
132- Err ( ref e) if e. kind ( ) == io:: ErrorKind :: WouldBlock => {
133- // nothing left to process
134- break ;
135- }
136- Err ( e) => {
137- break ' exiting Err ( Box :: new ( e) ) ;
138- }
139- } ;
140-
141- if size == 0 {
142- if alive_check ( ) {
143- break ' exiting Err ( Box :: new ( simple_error:: simple_error!( "connection lost" ) ) ) ;
144- } else {
145- // shutting down; a disconnect is expected
146- break ' exiting Err ( Box :: new ( simple_error:: simple_error!( "local shutdown requested" ) ) ) ;
147- }
148- }
149135
150- bytes_read += size;
151- if bytes_read == length as usize {
152- match serde_json:: from_slice ( & buffer) {
153- Ok ( v) => {
154- log:: debug!( "received {:?} from {}" , v, stream. peer_addr( ) ?) ;
155- break ' exiting Ok ( v) ;
156- }
157- Err ( e) => {
158- break ' exiting Err ( Box :: new ( e) ) ;
159- }
160- }
161- } else {
162- log:: debug!( "received partial payload from {}" , stream. peer_addr( ) ?) ;
136+ let size = match stream. read ( & mut buffer[ bytes_read..] ) {
137+ Ok ( size) => size,
138+ Err ( ref e) if e. kind ( ) == io:: ErrorKind :: WouldBlock || e. kind ( ) == std:: io:: ErrorKind :: TimedOut => {
139+ // nothing available to process
140+ continue ;
141+ }
142+ Err ( e) => {
143+ return Err ( Box :: new ( e) ) ;
144+ }
145+ } ;
146+ if size == 0 {
147+ if alive_check ( ) {
148+ return Err ( Box :: new ( simple_error:: simple_error!( "connection lost" ) ) ) ;
149+ } else {
150+ //shutting down; a disconnect is expected
151+ return Err ( Box :: new ( simple_error:: simple_error!( "local shutdown requested" ) ) ) ;
152+ }
153+ }
154+
155+ bytes_read += size;
156+ if bytes_read == length as usize {
157+ match serde_json:: from_slice ( & buffer) {
158+ Ok ( v) => {
159+ log:: debug!( "received {:?} from {}" , v, stream. peer_addr( ) ?) ;
160+ return Ok ( v) ;
161+ }
162+ Err ( e) => {
163+ return Err ( Box :: new ( e) ) ;
163164 }
164165 }
166+ } else {
167+ log:: debug!( "received partial payload from {}" , stream. peer_addr( ) ?) ;
165168 }
166- } ;
167- poll. registry ( ) . deregister ( stream) ?;
168- result
169+ }
170+ Err ( Box :: new ( simple_error:: simple_error!( "system shutting down" ) ) )
169171}
170172
171173/// handles the full process of retrieving a message from a client-server communications stream
0 commit comments