Skip to content

Commit

Permalink
fixed shudown
Browse files Browse the repository at this point in the history
  • Loading branch information
Wil Boayue committed Oct 17, 2024
1 parent 7dde8be commit 1cd89c5
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 17 deletions.
5 changes: 1 addition & 4 deletions src/accounts/encoders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,9 @@ pub(crate) fn encode_cancel_account_updates_multi(_server_version: i32, request_

let mut message = RequestMessage::new();

message.push_field(&OutgoingMessages::RequestAccountUpdatesMulti);
message.push_field(&OutgoingMessages::CancelAccountUpdatesMulti);
message.push_field(&VERSION);
message.push_field(&request_id);
message.push_field(&""); // account
message.push_field(&""); // model code
message.push_field(&false); // subscribe

Ok(message)
}
Expand Down
2 changes: 2 additions & 0 deletions src/stubs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ impl MessageBus for MessageBusStub {
Ok(())
}

fn ensure_shutdown(&self) {}

// fn process_messages(&mut self, _server_version: i32) -> Result<(), Error> {
// Ok(())
// }
Expand Down
39 changes: 26 additions & 13 deletions src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub(crate) trait MessageBus: Send + Sync {

fn cancel_order_subscription(&self, request_id: i32, packet: &RequestMessage) -> Result<(), Error>;

fn ensure_shutdown(&self) {}
fn ensure_shutdown(&self);

// Testing interface. Tracks requests sent messages when Bus is stubbed.
#[cfg(test)]
Expand Down Expand Up @@ -172,6 +172,8 @@ impl TcpMessageBus {
}

fn request_shutdown(&self) {
debug!("shutdown requested");

self.requests.notify_all(&Response::Disconnected);
self.orders.notify_all(&Response::Disconnected);

Expand Down Expand Up @@ -218,6 +220,13 @@ impl TcpMessageBus {
backoff.reset();
retry_attempt = 0;
}
Err(Error::Io(e)) if e.kind() == ErrorKind::WouldBlock => {
if message_bus.is_shutting_down() {
debug!("dispatcher thread exiting");
return;
}
thread::sleep(Duration::from_millis(1));
}
Err(Error::Io(e)) if RECONNECT_ERRORS.contains(&e.kind()) => {
error!("error reading packet: {:?}", e);
// reset hashes
Expand Down Expand Up @@ -248,10 +257,6 @@ impl TcpMessageBus {
return;
}
};

if message_bus.is_shutting_down() {
return;
}
}
})
}
Expand Down Expand Up @@ -381,20 +386,23 @@ impl TcpMessageBus {
fn start_cleanup_thread(self: &Arc<Self>) -> JoinHandle<()> {
let message_bus = Arc::clone(self);

thread::spawn(move || loop {
thread::spawn(move || {
let signal_recv = message_bus.signals_recv.clone();

for signal in &signal_recv {
match signal {
Signal::Request(request_id) => {
message_bus.clean_request(request_id);
}
Signal::Order(order_id) => {
message_bus.clean_order(order_id);
loop {
if let Ok(signal) = signal_recv.recv_timeout(Duration::from_secs(1)) {
match signal {
Signal::Request(request_id) => {
message_bus.clean_request(request_id);
}
Signal::Order(order_id) => {
message_bus.clean_order(order_id);
}
}
}

if message_bus.is_shutting_down() {
debug!("cleanup thread exiting");
return;
}
}
Expand Down Expand Up @@ -510,6 +518,7 @@ impl MessageBus for TcpMessageBus {
}

fn ensure_shutdown(&self) {
self.request_shutdown();
self.join();
}
}
Expand Down Expand Up @@ -809,6 +818,8 @@ impl Connection {
let reader = TcpStream::connect(connection_url)?;
let writer = reader.try_clone()?;

reader.set_read_timeout(Some(Duration::from_secs(1)))?;

let connection = Self {
client_id,
connection_url: connection_url.into(),
Expand Down Expand Up @@ -845,6 +856,8 @@ impl Connection {
let mut writer = self.writer.lock()?;

*reader = stream.try_clone()?;
reader.set_read_timeout(Some(Duration::from_secs(1)))?;

*writer = stream;
}

Expand Down

0 comments on commit 1cd89c5

Please sign in to comment.