Skip to content

Commit

Permalink
cleanup cancel
Browse files Browse the repository at this point in the history
  • Loading branch information
Wil Boayue committed Oct 17, 2024
1 parent 1cd89c5 commit 05ef656
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 14 deletions.
2 changes: 1 addition & 1 deletion examples/account_summary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ fn main() {
for update in &subscription {
match update {
AccountSummaries::Summary(summary) => println!("{summary:?}"),
AccountSummaries::End => subscription.cancel().expect("cancel failed"),
AccountSummaries::End => subscription.cancel(),
}
}
}
2 changes: 1 addition & 1 deletion examples/readme_realtime_data_1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ fn main() {
println!("bar: {bar:?}");

// when your algorithm is done, cancel subscription
subscription.cancel().expect("cancel failed");
subscription.cancel();
}
}
4 changes: 2 additions & 2 deletions examples/readme_realtime_data_2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ fn main() {
println!("NVDA {}, AAPL {}", bar_nvda.close, bar_aapl.close);

// when your algorithm is done, cancel subscription
subscription_aapl.cancel().expect("cancel failed");
subscription_nvda.cancel().expect("cancel failed");
subscription_aapl.cancel();
subscription_nvda.cancel();
}
}
15 changes: 14 additions & 1 deletion src/accounts/decoders/tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{accounts::AccountSummaryTags, server_versions};
use crate::{accounts::AccountSummaryTags, server_versions, testdata::responses};

#[test]
fn test_decode_positions() {
Expand Down Expand Up @@ -119,3 +119,16 @@ fn test_decode_account_summary() {
assert_eq!(account_summary.value, "FA", "account_summary.value");
assert_eq!(account_summary.currency, "", "account_summary.currency");
}

#[test]
fn test_decode_account_multi_value() {
let mut message = super::ResponseMessage::from_simple(responses::ACCOUNT_UPDATE_MULTI_CURRENCY);

let value = super::decode_account_multi_value(&mut message).expect("error decoding account multi value");

assert_eq!(value.account, "DU1234567", "value.account");
assert_eq!(value.model_code, "", "value.model_code");
assert_eq!(value.key, "Currency", "value.key");
assert_eq!(value.value, "USD", "value.value");
assert_eq!(value.currency, "USD", "value.currency");
}
35 changes: 35 additions & 0 deletions src/accounts/tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::{Arc, RwLock};

use crate::accounts::AccountUpdateMulti;
use crate::testdata::responses;
use crate::{accounts::AccountSummaryTags, server_versions, stubs::MessageBusStub, Client};

Expand Down Expand Up @@ -126,3 +127,37 @@ fn test_managed_accounts() {

assert_eq!(accounts, &["DU1234567", "DU7654321"]);
}

#[test]
fn test_account_updates_multi() {
let message_bus = Arc::new(MessageBusStub {
request_messages: RwLock::new(vec![]),
response_messages: vec![
responses::ACCOUNT_UPDATE_MULTI_CASH_BALANCE.into(),
responses::ACCOUNT_UPDATE_MULTI_CURRENCY.into(),
responses::ACCOUNT_UPDATE_MULTI_END.into(),
],
});

let client = Client::stubbed(message_bus, server_versions::SIZE_RULES);

let account = Some("DU1234567");
let subscription = client.account_updates_multi(account, None).expect("request managed accounts failed");

let update = subscription.next().unwrap();
match update {
AccountUpdateMulti::AccountMultiValue(value) => {
assert_eq!(value.key, "CashBalance");
}
AccountUpdateMulti::End => {
panic!("value expected")
}
}

subscription.cancel();

let request_messages = client.message_bus.request_messages();

assert_eq!(request_messages[0].encode_simple(), "76|1|9000|DU1234567||1|");
assert_eq!(request_messages[1].encode_simple(), "77|1|9000|");
}
21 changes: 12 additions & 9 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use std::time::Duration;

use log::{debug, error, info};
use log::{debug, error, info, warn};
use time::OffsetDateTime;
use time_tz::Tz;

Expand Down Expand Up @@ -1180,26 +1180,31 @@ impl<'a, T: Subscribable<T>> Subscription<'a, T> {
}

/// Cancel the subscription
pub fn cancel(&self) -> Result<(), Error> {
pub fn cancel(&self) {
if let Some(request_id) = self.request_id {
if let Ok(message) = T::cancel_message(self.client.server_version(), self.request_id) {
self.client.message_bus.cancel_subscription(request_id, &message)?;
if let Err(e) = self.client.message_bus.cancel_subscription(request_id, &message) {
warn!("error cancelling subscription: {e}")
}
self.subscription.cancel();
}
} else if let Some(order_id) = self.order_id {
if let Ok(message) = T::cancel_message(self.client.server_version(), self.request_id) {
self.client.message_bus.cancel_order_subscription(order_id, &message)?;
if let Err(e) = self.client.message_bus.cancel_order_subscription(order_id, &message) {
warn!("error cancelling order subscription: {e}")
}
self.subscription.cancel();
}
} else if let Some(message_type) = self.message_type {
if let Ok(message) = T::cancel_message(self.client.server_version(), self.request_id) {
self.client.message_bus.cancel_shared_subscription(message_type, &message)?;
if let Err(e) = self.client.message_bus.cancel_shared_subscription(message_type, &message) {
warn!("error cancelling shared subscription: {e}")
}
self.subscription.cancel();
}
} else {
debug!("Could not determine cancel method")
}
Ok(())
}

pub fn iter(&self) -> SubscriptionIter<T> {
Expand All @@ -1217,9 +1222,7 @@ impl<'a, T: Subscribable<T>> Subscription<'a, T> {

impl<'a, T: Subscribable<T>> Drop for Subscription<'a, T> {
fn drop(&mut self) {
if let Err(err) = self.cancel() {
error!("error cancelling subscription: {err}");
}
self.cancel();
}
}

Expand Down
7 changes: 7 additions & 0 deletions src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,12 +519,19 @@ impl ResponseMessage {
}

pub fn from(fields: &str) -> ResponseMessage {
let fields = fields.replace("|", "\0");
ResponseMessage {
i: 0,
fields: fields.split('\x00').map(|x| x.to_string()).collect(),
}
}

#[cfg(test)]
pub fn from_simple(fields: &str) -> ResponseMessage {
let fields = fields.replace("|", "\0");
Self::from(&fields)
}

pub fn skip(&mut self) {
self.i += 1;
}
Expand Down
5 changes: 5 additions & 0 deletions src/testdata/responses.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
pub const POSITION: &str = "61\03\0DU1234567\076792991\0TSLA\0STK\0\00.0\0\0\0NASDAQ\0USD\0TSLA\0NMS\0500\0196.77\0";
pub const MANAGED_ACCOUNT: &str = "15|1|DU1234567,DU7654321|";

pub const ACCOUNT_UPDATE_MULTI_CASH_BALANCE: &str = "73|1|9000|DU1234567||CashBalance|94629.71|USD||";
pub const ACCOUNT_UPDATE_MULTI_CURRENCY: &str = "73|1|9000|DU1234567||Currency|USD|USD||";
pub const ACCOUNT_UPDATE_MULTI_STOCK_MARKET_VALUE: &str = "73|1|9000|DU1234567||StockMarketValue|0.00|BASE||";
pub const ACCOUNT_UPDATE_MULTI_END: &str = "74|1|9000||";

0 comments on commit 05ef656

Please sign in to comment.