diff --git a/examples/account_summary.rs b/examples/account_summary.rs index fd6f1106..7f09ceec 100644 --- a/examples/account_summary.rs +++ b/examples/account_summary.rs @@ -15,7 +15,7 @@ fn main() { for update in &subscription { match update { AccountSummaries::Summary(summary) => println!("{summary:?}"), - AccountSummaries::End => subscription.cancel().expect("cancel failed"), + AccountSummaries::End => subscription.cancel(), } } } diff --git a/examples/readme_realtime_data_1.rs b/examples/readme_realtime_data_1.rs index db6800f3..c3538c97 100644 --- a/examples/readme_realtime_data_1.rs +++ b/examples/readme_realtime_data_1.rs @@ -19,6 +19,6 @@ fn main() { println!("bar: {bar:?}"); // when your algorithm is done, cancel subscription - subscription.cancel().expect("cancel failed"); + subscription.cancel(); } } diff --git a/examples/readme_realtime_data_2.rs b/examples/readme_realtime_data_2.rs index 8f6ee8b5..006bf5e6 100644 --- a/examples/readme_realtime_data_2.rs +++ b/examples/readme_realtime_data_2.rs @@ -22,7 +22,7 @@ fn main() { println!("NVDA {}, AAPL {}", bar_nvda.close, bar_aapl.close); // when your algorithm is done, cancel subscription - subscription_aapl.cancel().expect("cancel failed"); - subscription_nvda.cancel().expect("cancel failed"); + subscription_aapl.cancel(); + subscription_nvda.cancel(); } } diff --git a/src/accounts/decoders/tests.rs b/src/accounts/decoders/tests.rs index 9b640832..880fdfb9 100644 --- a/src/accounts/decoders/tests.rs +++ b/src/accounts/decoders/tests.rs @@ -1,4 +1,4 @@ -use crate::{accounts::AccountSummaryTags, server_versions}; +use crate::{accounts::AccountSummaryTags, server_versions, testdata::responses}; #[test] fn test_decode_positions() { @@ -119,3 +119,16 @@ fn test_decode_account_summary() { assert_eq!(account_summary.value, "FA", "account_summary.value"); assert_eq!(account_summary.currency, "", "account_summary.currency"); } + +#[test] +fn test_decode_account_multi_value() { + let mut message = super::ResponseMessage::from_simple(responses::ACCOUNT_UPDATE_MULTI_CURRENCY); + + let value = super::decode_account_multi_value(&mut message).expect("error decoding account multi value"); + + assert_eq!(value.account, "DU1234567", "value.account"); + assert_eq!(value.model_code, "", "value.model_code"); + assert_eq!(value.key, "Currency", "value.key"); + assert_eq!(value.value, "USD", "value.value"); + assert_eq!(value.currency, "USD", "value.currency"); +} diff --git a/src/accounts/tests.rs b/src/accounts/tests.rs index b9d1a21c..c0e066c3 100644 --- a/src/accounts/tests.rs +++ b/src/accounts/tests.rs @@ -1,5 +1,6 @@ use std::sync::{Arc, RwLock}; +use crate::accounts::AccountUpdateMulti; use crate::testdata::responses; use crate::{accounts::AccountSummaryTags, server_versions, stubs::MessageBusStub, Client}; @@ -126,3 +127,37 @@ fn test_managed_accounts() { assert_eq!(accounts, &["DU1234567", "DU7654321"]); } + +#[test] +fn test_account_updates_multi() { + let message_bus = Arc::new(MessageBusStub { + request_messages: RwLock::new(vec![]), + response_messages: vec![ + responses::ACCOUNT_UPDATE_MULTI_CASH_BALANCE.into(), + responses::ACCOUNT_UPDATE_MULTI_CURRENCY.into(), + responses::ACCOUNT_UPDATE_MULTI_END.into(), + ], + }); + + let client = Client::stubbed(message_bus, server_versions::SIZE_RULES); + + let account = Some("DU1234567"); + let subscription = client.account_updates_multi(account, None).expect("request managed accounts failed"); + + let update = subscription.next().unwrap(); + match update { + AccountUpdateMulti::AccountMultiValue(value) => { + assert_eq!(value.key, "CashBalance"); + } + AccountUpdateMulti::End => { + panic!("value expected") + } + } + + subscription.cancel(); + + let request_messages = client.message_bus.request_messages(); + + assert_eq!(request_messages[0].encode_simple(), "76|1|9000|DU1234567||1|"); + assert_eq!(request_messages[1].encode_simple(), "77|1|9000|"); +} diff --git a/src/client.rs b/src/client.rs index 0a900b08..e21439b1 100644 --- a/src/client.rs +++ b/src/client.rs @@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicI32, Ordering}; use std::sync::Arc; use std::time::Duration; -use log::{debug, error, info}; +use log::{debug, error, info, warn}; use time::OffsetDateTime; use time_tz::Tz; @@ -1180,26 +1180,31 @@ impl<'a, T: Subscribable> Subscription<'a, T> { } /// Cancel the subscription - pub fn cancel(&self) -> Result<(), Error> { + pub fn cancel(&self) { if let Some(request_id) = self.request_id { if let Ok(message) = T::cancel_message(self.client.server_version(), self.request_id) { - self.client.message_bus.cancel_subscription(request_id, &message)?; + if let Err(e) = self.client.message_bus.cancel_subscription(request_id, &message) { + warn!("error cancelling subscription: {e}") + } self.subscription.cancel(); } } else if let Some(order_id) = self.order_id { if let Ok(message) = T::cancel_message(self.client.server_version(), self.request_id) { - self.client.message_bus.cancel_order_subscription(order_id, &message)?; + if let Err(e) = self.client.message_bus.cancel_order_subscription(order_id, &message) { + warn!("error cancelling order subscription: {e}") + } self.subscription.cancel(); } } else if let Some(message_type) = self.message_type { if let Ok(message) = T::cancel_message(self.client.server_version(), self.request_id) { - self.client.message_bus.cancel_shared_subscription(message_type, &message)?; + if let Err(e) = self.client.message_bus.cancel_shared_subscription(message_type, &message) { + warn!("error cancelling shared subscription: {e}") + } self.subscription.cancel(); } } else { debug!("Could not determine cancel method") } - Ok(()) } pub fn iter(&self) -> SubscriptionIter { @@ -1217,9 +1222,7 @@ impl<'a, T: Subscribable> Subscription<'a, T> { impl<'a, T: Subscribable> Drop for Subscription<'a, T> { fn drop(&mut self) { - if let Err(err) = self.cancel() { - error!("error cancelling subscription: {err}"); - } + self.cancel(); } } diff --git a/src/messages.rs b/src/messages.rs index 5ce3c057..8d313a03 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -519,12 +519,19 @@ impl ResponseMessage { } pub fn from(fields: &str) -> ResponseMessage { + let fields = fields.replace("|", "\0"); ResponseMessage { i: 0, fields: fields.split('\x00').map(|x| x.to_string()).collect(), } } + #[cfg(test)] + pub fn from_simple(fields: &str) -> ResponseMessage { + let fields = fields.replace("|", "\0"); + Self::from(&fields) + } + pub fn skip(&mut self) { self.i += 1; } diff --git a/src/testdata/responses.rs b/src/testdata/responses.rs index bb3e6afa..d3cdd09f 100644 --- a/src/testdata/responses.rs +++ b/src/testdata/responses.rs @@ -1,2 +1,7 @@ pub const POSITION: &str = "61\03\0DU1234567\076792991\0TSLA\0STK\0\00.0\0\0\0NASDAQ\0USD\0TSLA\0NMS\0500\0196.77\0"; pub const MANAGED_ACCOUNT: &str = "15|1|DU1234567,DU7654321|"; + +pub const ACCOUNT_UPDATE_MULTI_CASH_BALANCE: &str = "73|1|9000|DU1234567||CashBalance|94629.71|USD||"; +pub const ACCOUNT_UPDATE_MULTI_CURRENCY: &str = "73|1|9000|DU1234567||Currency|USD|USD||"; +pub const ACCOUNT_UPDATE_MULTI_STOCK_MARKET_VALUE: &str = "73|1|9000|DU1234567||StockMarketValue|0.00|BASE||"; +pub const ACCOUNT_UPDATE_MULTI_END: &str = "74|1|9000||";