From 12a86f483490a2182bc420de88e73c2c66766057 Mon Sep 17 00:00:00 2001 From: Wil Boayue Date: Sun, 13 Oct 2024 00:08:53 -0700 Subject: [PATCH 01/12] add structs for account updates --- examples/managed_accounts.rs | 2 +- src/accounts.rs | 51 ++++++++++++++++++++++++++++++++++++ src/accounts/tests.rs | 6 ++--- src/testdata/responses.rs | 2 +- src/transport.rs | 9 +++---- 5 files changed, 59 insertions(+), 11 deletions(-) diff --git a/examples/managed_accounts.rs b/examples/managed_accounts.rs index e56c26bc..08f6cc7f 100644 --- a/examples/managed_accounts.rs +++ b/examples/managed_accounts.rs @@ -5,4 +5,4 @@ fn main() { let accounts = client.managed_accounts().expect("error requesting managed accounts"); println!("managed accounts: {accounts:?}") -} \ No newline at end of file +} diff --git a/src/accounts.rs b/src/accounts.rs index c4d70197..d4ead9bd 100644 --- a/src/accounts.rs +++ b/src/accounts.rs @@ -272,6 +272,57 @@ pub struct FamilyCode { pub family_code: String, } +/// Account's information, portfolio and last update time +#[allow(clippy::large_enum_variant)] +pub enum AccountUpdates { + /// Receives the subscribed account's information. + Value(AccountValue), + /// Receives the subscribed account's portfolio. + Portfolio(AccountPortfolio), + /// Receives the last time on which the account was updated. + Time(AccountTime), + /// Notifies when all the account’s information has finished. + End, +} + +/// A value of subscribed account's information. +pub struct AccountValue { + /// The value being updated. + pub key: String, + /// Current value + pub value: String, + /// The currency inn which the value is expressed. + pub currency: String, + /// The account identifier. + pub account: String, +} + +/// Subscribed account's portfolio. +pub struct AccountPortfolio { + /// The Contract for which a position is held. + pub contract: Contract, + /// The number of positions held. + pub position: f64, + /// The instrument's unitary price + pub market_price: f64, + /// Total market value of the instrument. + pub market_value: f64, + /// Average cost of the overall position. + pub average_cost: f64, + /// Daily unrealized profit and loss on the position. + pub unrealized_pnl: f64, + /// Daily realized profit and loss on the position. + pub realized_pnl: f64, + /// Account identifier for the update. + pub account: String, +} + +/// Last time at which the account was updated. +pub struct AccountTime { + /// The last update system time. + pub timestamp: String, +} + // Subscribes to position updates for all accessible accounts. // All positions sent initially, and then only updates as positions change. pub(crate) fn positions(client: &Client) -> Result, Error> { diff --git a/src/accounts/tests.rs b/src/accounts/tests.rs index 01b029c3..70c267d9 100644 --- a/src/accounts/tests.rs +++ b/src/accounts/tests.rs @@ -1,7 +1,7 @@ use std::sync::{Arc, Mutex, RwLock}; -use crate::{accounts::AccountSummaryTags, server_versions, stubs::MessageBusStub, Client}; use crate::testdata::responses; +use crate::{accounts::AccountSummaryTags, server_versions, stubs::MessageBusStub, Client}; #[test] fn test_pnl() { @@ -117,9 +117,7 @@ fn test_account_summary() { fn test_managed_accounts() { let message_bus = Arc::new(Mutex::new(MessageBusStub { request_messages: RwLock::new(vec![]), - response_messages: vec![ - responses::MANAGED_ACCOUNT.into(), - ], + response_messages: vec![responses::MANAGED_ACCOUNT.into()], })); let client = Client::stubbed(message_bus, server_versions::SIZE_RULES); diff --git a/src/testdata/responses.rs b/src/testdata/responses.rs index 067c3511..bb3e6afa 100644 --- a/src/testdata/responses.rs +++ b/src/testdata/responses.rs @@ -1,2 +1,2 @@ pub const POSITION: &str = "61\03\0DU1234567\076792991\0TSLA\0STK\0\00.0\0\0\0NASDAQ\0USD\0TSLA\0NMS\0500\0196.77\0"; -pub const MANAGED_ACCOUNT: &str = "15|1|DU1234567,DU7654321|"; \ No newline at end of file +pub const MANAGED_ACCOUNT: &str = "15|1|DU1234567,DU7654321|"; diff --git a/src/transport.rs b/src/transport.rs index 224dce7e..14311485 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -104,10 +104,9 @@ impl SharedChannels { // Get receiver for specified message type. Panics if receiver not found. pub fn get_receiver(&self, message_type: OutgoingMessages) -> Arc> { - let receiver = self - .receivers - .get(&message_type) - .unwrap_or_else(|| panic!("unsupported request message {message_type:?}. check mapping in SharedChannels::new() located in transport.rs")); + let receiver = self.receivers.get(&message_type).unwrap_or_else(|| { + panic!("unsupported request message {message_type:?}. check mapping in SharedChannels::new() located in transport.rs") + }); Arc::clone(receiver) } @@ -321,7 +320,7 @@ fn dispatch_message( } else { process_response(requests, orders, shared_channels, message); } - }, + } IncomingMessages::OrderStatus | IncomingMessages::OpenOrder | IncomingMessages::OpenOrderEnd From 1709958f80c2f0909f22c33de3139ecdfc827377 Mon Sep 17 00:00:00 2001 From: Wil Boayue Date: Sun, 13 Oct 2024 00:24:26 -0700 Subject: [PATCH 02/12] cleanup tests --- src/transport/recorder/tests.rs | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/transport/recorder/tests.rs b/src/transport/recorder/tests.rs index 038826b5..76cfe18e 100644 --- a/src/transport/recorder/tests.rs +++ b/src/transport/recorder/tests.rs @@ -1,8 +1,15 @@ use super::*; use std::env; +use std::sync::Mutex; + +struct EnvMutex {} + +static ENV_MUTEX: Mutex = Mutex::new(EnvMutex {}); #[test] fn env_var_enables_recorder() { + let _guard = ENV_MUTEX.lock().unwrap(); + let key = String::from("IBAPI_RECORDING_DIR"); let dir = String::from("/tmp/records"); @@ -10,21 +17,22 @@ fn env_var_enables_recorder() { let recorder = MessageRecorder::new(); - // TODO - refactor - // assert_eq!(true, recorder.enabled); - // assert!(&recorder.recording_dir.starts_with(&dir), "{} != {}", &recorder.recording_dir, &dir) + assert_eq!(true, recorder.enabled); + assert!(&recorder.recording_dir.starts_with(&dir), "{} != {}", &recorder.recording_dir, &dir) } #[test] fn recorder_is_disabled() { + let _guard = ENV_MUTEX.lock().unwrap(); + let key = String::from("IBAPI_RECORDING_DIR"); env::set_var(&key, &""); - let _recorder = MessageRecorder::new(); + let recorder = MessageRecorder::new(); - // assert_eq!(false, recorder.enabled); - // assert_eq!("", &recorder.recording_dir); + assert_eq!(false, recorder.enabled); + assert_eq!("", &recorder.recording_dir); } #[test] From 572c118f38ef99f602b8dc4cfdffbc588d8d6cc7 Mon Sep 17 00:00:00 2001 From: Wil Boayue Date: Sun, 13 Oct 2024 00:26:11 -0700 Subject: [PATCH 03/12] added doc --- src/transport/recorder.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/transport/recorder.rs b/src/transport/recorder.rs index f56f20cf..a8bb6f29 100644 --- a/src/transport/recorder.rs +++ b/src/transport/recorder.rs @@ -1,3 +1,11 @@ +//! The MessageRecorder is used to log interactions between the client and +//! the TWS server. +//! The record is enabled by setting the environment variable IBAPI_RECORDING_DIR +//! IBAPI_RECORDING_DIR is set to the path to store logs +//! e.g. set to /tmp/logs +//! /tmp/logs/0001-request.msg +//! /tmp/logs/0002-response.msg + use std::env; use std::fs; use std::sync::atomic::{AtomicUsize, Ordering}; From 19b79094795405ebf1911345dc7cb8628b33a556 Mon Sep 17 00:00:00 2001 From: Wilfred Boayue Date: Tue, 15 Oct 2024 20:19:03 -0700 Subject: [PATCH 04/12] checkpoint --- src/accounts.rs | 31 ++++++++++++++++++++++++++++++- src/client.rs | 32 +++++++++++++++++++++++++++++++- 2 files changed, 61 insertions(+), 2 deletions(-) diff --git a/src/accounts.rs b/src/accounts.rs index 90cdb725..f2205b89 100644 --- a/src/accounts.rs +++ b/src/accounts.rs @@ -284,6 +284,24 @@ pub enum AccountUpdates { End, } +impl Subscribable for AccountUpdates { + const RESPONSE_MESSAGE_IDS: &[IncomingMessages] = &[IncomingMessages::PositionMulti, IncomingMessages::PositionMultiEnd]; + + fn decode(_server_version: i32, message: &mut ResponseMessage) -> Result { + // match message.message_type() { + // IncomingMessages::PositionMulti => Ok(PositionUpdateMulti::Position(decoders::decode_position_multi(message)?)), + // IncomingMessages::PositionMultiEnd => Ok(PositionUpdateMulti::PositionEnd), + // message => Err(Error::Simple(format!("unexpected message: {message:?}"))), + // } + Err(Error::NotImplemented) + } + + fn cancel_message(_server_version: i32, request_id: Option) -> Result { + let request_id = request_id.expect("Request ID required to encode cancel positions multi"); + encoders::encode_cancel_positions_multi(request_id) + } +} + /// A value of subscribed account's information. pub struct AccountValue { /// The value being updated. @@ -310,7 +328,7 @@ pub struct AccountPortfolio { pub average_cost: f64, /// Daily unrealized profit and loss on the position. pub unrealized_pnl: f64, - /// Daily realized profit and loss on the position. + /// Daily realized profit and loss on the position. pub realized_pnl: f64, /// Account identifier for the update. pub account: String, @@ -412,6 +430,17 @@ pub fn account_summary<'a>(client: &'a Client, group: &str, tags: &[&str]) -> Re Ok(Subscription::new(client, subscription)) } +pub fn account_updates<'a>(client: &'a Client, account: &str) -> Result, Error> { + client.check_server_version(server_versions::ACCOUNT_SUMMARY, "It does not support account summary requests.")?; + + // let request_id = client.next_request_id(); + // let request = encoders::encode_request_account_summary(request_id, group, tags)?; + // let subscription = client.send_request(request_id, request)?; + + // Ok(Subscription::new(client, subscription)) + Err(Error::NotImplemented) +} + pub fn managed_accounts(client: &Client) -> Result, Error> { let request = encoders::encode_request_managed_accounts()?; let subscription = client.send_shared_request(OutgoingMessages::RequestManagedAccounts, request)?; diff --git a/src/client.rs b/src/client.rs index d693bc61..86ae0c24 100644 --- a/src/client.rs +++ b/src/client.rs @@ -8,7 +8,7 @@ use log::{debug, error}; use time::OffsetDateTime; use time_tz::Tz; -use crate::accounts::{AccountSummaries, FamilyCode, PnL, PnLSingle, PositionUpdate, PositionUpdateMulti}; +use crate::accounts::{AccountSummaries, AccountUpdates, FamilyCode, PnL, PnLSingle, PositionUpdate, PositionUpdateMulti}; use crate::contracts::Contract; use crate::errors::Error; use crate::market_data::historical; @@ -229,6 +229,36 @@ impl Client { accounts::account_summary(self, group, tags) } + /// Subscribes to a specific account’s information and portfolio. + /// + /// All account values and positions will be returned initially, and then there will only be updates when there is a change in a position, or to an account value every 3 minutes if it has changed. Only one account can be subscribed at a time. + /// + /// # Arguments + /// * `account` - The account id (i.e. U1234567) for which the information is requested. + /// + /// # Examples + /// + /// ```no_run + /// use ibapi::Client; + /// + /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed"); + /// + /// let account = "U1234567"; + /// + /// let subscription = client.account_updates(account).expect("error requesting account updates"); + /// for update in &subscription { + /// println!("{update:?}") + /// + /// // stop after full initial update + /// if let AccountUpdates::End == update { + /// subscription.cancel(); + /// } + /// } + /// ``` + pub fn account_updates<'a>(&'a self, account: &str) -> Result, Error> { + accounts::account_updates(self, account) + } + /// Requests the accounts to which the logged user has access to. /// /// # Examples From f3ad3b2613447309979cdb00dd5778439fd177cc Mon Sep 17 00:00:00 2001 From: Wilfred Boayue Date: Tue, 15 Oct 2024 20:22:53 -0700 Subject: [PATCH 05/12] checkpoint --- src/accounts.rs | 4 ++++ src/client.rs | 5 +++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/accounts.rs b/src/accounts.rs index f2205b89..d7f5080a 100644 --- a/src/accounts.rs +++ b/src/accounts.rs @@ -273,6 +273,7 @@ pub struct FamilyCode { /// Account's information, portfolio and last update time #[allow(clippy::large_enum_variant)] +#[derive(Debug)] pub enum AccountUpdates { /// Receives the subscribed account's information. Value(AccountValue), @@ -303,6 +304,7 @@ impl Subscribable for AccountUpdates { } /// A value of subscribed account's information. +#[derive(Debug)] pub struct AccountValue { /// The value being updated. pub key: String, @@ -315,6 +317,7 @@ pub struct AccountValue { } /// Subscribed account's portfolio. +#[derive(Debug)] pub struct AccountPortfolio { /// The Contract for which a position is held. pub contract: Contract, @@ -335,6 +338,7 @@ pub struct AccountPortfolio { } /// Last time at which the account was updated. +#[derive(Debug)] pub struct AccountTime { /// The last update system time. pub timestamp: String, diff --git a/src/client.rs b/src/client.rs index 86ae0c24..46915e41 100644 --- a/src/client.rs +++ b/src/client.rs @@ -240,6 +240,7 @@ impl Client { /// /// ```no_run /// use ibapi::Client; + /// use ibapi::accounts::AccountUpdates; /// /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed"); /// @@ -247,10 +248,10 @@ impl Client { /// /// let subscription = client.account_updates(account).expect("error requesting account updates"); /// for update in &subscription { - /// println!("{update:?}") + /// println!("{update:?}"); /// /// // stop after full initial update - /// if let AccountUpdates::End == update { + /// if let AccountUpdates::End = update { /// subscription.cancel(); /// } /// } From 8527ee0d6dc5387df7ed7dec6d4341bc3c8957a8 Mon Sep 17 00:00:00 2001 From: Wil Boayue Date: Wed, 16 Oct 2024 10:59:17 -0700 Subject: [PATCH 06/12] implemented account updates --- examples/account_updates.rs | 20 +++++++++ src/accounts.rs | 61 +++++++++++++++------------- src/accounts/decoders.rs | 81 ++++++++++++++++++++++++++++++++++++- src/accounts/encoders.rs | 30 ++++++++++++++ src/client.rs | 4 +- src/messages.rs | 4 +- src/transport.rs | 9 +++++ 7 files changed, 174 insertions(+), 35 deletions(-) create mode 100644 examples/account_updates.rs diff --git a/examples/account_updates.rs b/examples/account_updates.rs new file mode 100644 index 00000000..60c07791 --- /dev/null +++ b/examples/account_updates.rs @@ -0,0 +1,20 @@ +use ibapi::accounts::AccountUpdates; +use ibapi::Client; + +fn main() { + env_logger::init(); + + let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed"); + + let account = "DU1234567"; + + let subscription = client.account_updates(account).expect("error requesting account updates"); + for update in &subscription { + println!("{update:?}"); + + // stop after full initial update + if let AccountUpdates::End = update { + subscription.cancel(); + } + } +} diff --git a/src/accounts.rs b/src/accounts.rs index d7f5080a..3a52d9ac 100644 --- a/src/accounts.rs +++ b/src/accounts.rs @@ -276,35 +276,42 @@ pub struct FamilyCode { #[derive(Debug)] pub enum AccountUpdates { /// Receives the subscribed account's information. - Value(AccountValue), + AccountValue(AccountValue), /// Receives the subscribed account's portfolio. - Portfolio(AccountPortfolio), + PortfolioValue(AccountPortfolioValue), /// Receives the last time on which the account was updated. - Time(AccountTime), + UpdateTime(AccountUpdateTime), /// Notifies when all the account’s information has finished. End, } impl Subscribable for AccountUpdates { - const RESPONSE_MESSAGE_IDS: &[IncomingMessages] = &[IncomingMessages::PositionMulti, IncomingMessages::PositionMultiEnd]; - - fn decode(_server_version: i32, message: &mut ResponseMessage) -> Result { - // match message.message_type() { - // IncomingMessages::PositionMulti => Ok(PositionUpdateMulti::Position(decoders::decode_position_multi(message)?)), - // IncomingMessages::PositionMultiEnd => Ok(PositionUpdateMulti::PositionEnd), - // message => Err(Error::Simple(format!("unexpected message: {message:?}"))), - // } - Err(Error::NotImplemented) + const RESPONSE_MESSAGE_IDS: &[IncomingMessages] = &[ + IncomingMessages::AccountValue, + IncomingMessages::PortfolioValue, + IncomingMessages::AccountUpdateTime, + IncomingMessages::AccountDownloadEnd, + ]; + fn decode(server_version: i32, message: &mut ResponseMessage) -> Result { + match message.message_type() { + IncomingMessages::AccountValue => Ok(AccountUpdates::AccountValue(decoders::decode_account_value(message)?)), + IncomingMessages::PortfolioValue => Ok(AccountUpdates::PortfolioValue(decoders::decode_account_portfolio_value( + server_version, + message, + )?)), + IncomingMessages::AccountUpdateTime => Ok(AccountUpdates::UpdateTime(decoders::decode_account_update_time(message)?)), + IncomingMessages::AccountDownloadEnd => Ok(AccountUpdates::End), + message => Err(Error::Simple(format!("unexpected message: {message:?}"))), + } } - fn cancel_message(_server_version: i32, request_id: Option) -> Result { - let request_id = request_id.expect("Request ID required to encode cancel positions multi"); - encoders::encode_cancel_positions_multi(request_id) + fn cancel_message(server_version: i32, _request_id: Option) -> Result { + encoders::encode_cancel_account_updates(server_version) } } /// A value of subscribed account's information. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct AccountValue { /// The value being updated. pub key: String, @@ -313,12 +320,12 @@ pub struct AccountValue { /// The currency inn which the value is expressed. pub currency: String, /// The account identifier. - pub account: String, + pub account: Option, } /// Subscribed account's portfolio. -#[derive(Debug)] -pub struct AccountPortfolio { +#[derive(Debug, Default)] +pub struct AccountPortfolioValue { /// The Contract for which a position is held. pub contract: Contract, /// The number of positions held. @@ -334,12 +341,12 @@ pub struct AccountPortfolio { /// Daily realized profit and loss on the position. pub realized_pnl: f64, /// Account identifier for the update. - pub account: String, + pub account: Option, } /// Last time at which the account was updated. -#[derive(Debug)] -pub struct AccountTime { +#[derive(Debug, Default)] +pub struct AccountUpdateTime { /// The last update system time. pub timestamp: String, } @@ -435,14 +442,10 @@ pub fn account_summary<'a>(client: &'a Client, group: &str, tags: &[&str]) -> Re } pub fn account_updates<'a>(client: &'a Client, account: &str) -> Result, Error> { - client.check_server_version(server_versions::ACCOUNT_SUMMARY, "It does not support account summary requests.")?; + let request = encoders::encode_request_account_updates(client.server_version(), account)?; + let subscription = client.send_shared_request(OutgoingMessages::RequestAccountData, request)?; - // let request_id = client.next_request_id(); - // let request = encoders::encode_request_account_summary(request_id, group, tags)?; - // let subscription = client.send_request(request_id, request)?; - - // Ok(Subscription::new(client, subscription)) - Err(Error::NotImplemented) + Ok(Subscription::new(client, subscription)) } pub fn managed_accounts(client: &Client) -> Result, Error> { diff --git a/src/accounts/decoders.rs b/src/accounts/decoders.rs index add4da19..c6cbe23e 100644 --- a/src/accounts/decoders.rs +++ b/src/accounts/decoders.rs @@ -1,8 +1,8 @@ -use crate::contracts::SecurityType; +use crate::contracts::{Contract, SecurityType}; use crate::messages::ResponseMessage; use crate::{server_versions, Error}; -use super::{AccountSummary, FamilyCode, PnL, PnLSingle, Position, PositionMulti}; +use super::{AccountPortfolioValue, AccountSummary, AccountUpdateTime, AccountValue, FamilyCode, PnL, PnLSingle, Position, PositionMulti}; pub(crate) fn decode_position(message: &mut ResponseMessage) -> Result { message.skip(); // message type @@ -144,5 +144,82 @@ pub(crate) fn decode_account_summary(_server_version: i32, message: &mut Respons }) } +pub(crate) fn decode_account_value(message: &mut ResponseMessage) -> Result { + message.skip(); // message type + + let message_version = message.next_int()?; + + let mut account_value = AccountValue { + key: message.next_string()?, + value: message.next_string()?, + currency: message.next_string()?, + ..Default::default() + }; + + if message_version >= 2 { + account_value.account = Some(message.next_string()?); + } + + Ok(account_value) +} + +pub(crate) fn decode_account_portfolio_value(server_version: i32, message: &mut ResponseMessage) -> Result { + message.skip(); // message type + + let message_version = message.next_int()?; + + let mut contract = Contract::default(); + if message_version >= 6 { + contract.contract_id = message.next_int()?; + } + contract.symbol = message.next_string()?; + contract.security_type = SecurityType::from(&message.next_string()?); + contract.last_trade_date_or_contract_month = message.next_string()?; + contract.strike = message.next_double()?; + contract.right = message.next_string()?; + if message_version >= 7 { + contract.multiplier = message.next_string()?; + contract.primary_exchange = message.next_string()?; + } + contract.currency = message.next_string()?; + if message_version >= 2 { + contract.local_symbol = message.next_string()?; + } + if message_version >= 8 { + contract.trading_class = message.next_string()?; + } + + let mut portfolio_value = AccountPortfolioValue { + contract, + ..Default::default() + }; + + portfolio_value.position = message.next_double()?; + portfolio_value.market_price = message.next_double()?; + portfolio_value.market_value = message.next_double()?; + if message_version >= 3 { + portfolio_value.average_cost = message.next_double()?; + portfolio_value.unrealized_pnl = message.next_double()?; + portfolio_value.realized_pnl = message.next_double()?; + } + if message_version >= 4 { + portfolio_value.account = Some(message.next_string()?); + } + if message_version == 6 && server_version == 39 { + portfolio_value.contract.primary_exchange = message.next_string()? + } + + Ok(portfolio_value) +} + +pub(crate) fn decode_account_update_time(message: &mut ResponseMessage) -> Result { + message.skip(); // message type + message.skip(); // version + + Ok(AccountUpdateTime { + timestamp: message.next_string()?, + }) +} + #[cfg(test)] mod tests; diff --git a/src/accounts/encoders.rs b/src/accounts/encoders.rs index 17a41403..2ebe427a 100644 --- a/src/accounts/encoders.rs +++ b/src/accounts/encoders.rs @@ -96,6 +96,36 @@ pub(crate) fn encode_request_managed_accounts() -> Result Ok(message) } +pub(crate) fn encode_request_account_updates(server_version: i32, account: &str) -> Result { + const VERSION: i32 = 2; + + let mut message = RequestMessage::new(); + + message.push_field(&OutgoingMessages::RequestAccountData); + message.push_field(&VERSION); + message.push_field(&true); // subscribe + if server_version > 9 { + message.push_field(&account); + } + + Ok(message) +} + +pub(crate) fn encode_cancel_account_updates(server_version: i32) -> Result { + const VERSION: i32 = 2; + + let mut message = RequestMessage::new(); + + message.push_field(&OutgoingMessages::RequestAccountData); + message.push_field(&VERSION); + message.push_field(&false); // subscribe + if server_version > 9 { + message.push_field(&""); + } + + Ok(message) +} + fn encode_simple(message_type: OutgoingMessages, version: i32) -> Result { let mut message = RequestMessage::new(); diff --git a/src/client.rs b/src/client.rs index 46915e41..c94f3092 100644 --- a/src/client.rs +++ b/src/client.rs @@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicI32, Ordering}; use std::sync::Arc; use std::time::Duration; -use log::{debug, error}; +use log::{debug, error, info}; use time::OffsetDateTime; use time_tz::Tz; @@ -1064,7 +1064,7 @@ impl<'a, T: Subscribable> Subscription<'a, T> { error!("{error_message}"); return None; } else { - error!("subscription iterator unexpected message: {message:?}"); + info!("subscription iterator unexpected message: {message:?}"); } } Some(Response::Cancelled) => { diff --git a/src/messages.rs b/src/messages.rs index 0693c5a3..be288f6e 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -1,7 +1,7 @@ use std::ops::Index; use std::str::{self, FromStr}; -use log::error; +use log::debug; use time::OffsetDateTime; use crate::{Error, ToField}; @@ -218,7 +218,7 @@ pub fn request_id_index(kind: IncomingMessages) -> Option { | IncomingMessages::AccountSummary | IncomingMessages::AccountSummaryEnd => Some(2), _ => { - error!("could not determine request id index for {kind:?}"); + debug!("could not determine request id index for {kind:?}"); None } } diff --git a/src/transport.rs b/src/transport.rs index ee3519f0..33c2d765 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -103,6 +103,15 @@ impl SharedChannels { &[IncomingMessages::OpenOrder, IncomingMessages::OpenOrderEnd], ); instance.register(OutgoingMessages::RequestManagedAccounts, &[IncomingMessages::ManagedAccounts]); + instance.register( + OutgoingMessages::RequestAccountData, + &[ + IncomingMessages::AccountValue, + IncomingMessages::PortfolioValue, + IncomingMessages::AccountDownloadEnd, + IncomingMessages::AccountUpdateTime, + ], + ); instance } From 3dc3074aae0963bf81d9a65dc3a8cab38a72b70e Mon Sep 17 00:00:00 2001 From: Wilfred Boayue Date: Wed, 16 Oct 2024 17:32:37 -0700 Subject: [PATCH 07/12] implements account updates multi --- src/accounts.rs | 56 +++++++++++++++++++++++++++++++++++++++- src/accounts/decoders.rs | 20 +++++++++++++- src/accounts/encoders.rs | 19 ++++++++++++++ src/client.rs | 39 +++++++++++++++++++++++++++- 4 files changed, 131 insertions(+), 3 deletions(-) diff --git a/src/accounts.rs b/src/accounts.rs index 3a52d9ac..e5cc923e 100644 --- a/src/accounts.rs +++ b/src/accounts.rs @@ -317,7 +317,7 @@ pub struct AccountValue { pub key: String, /// Current value pub value: String, - /// The currency inn which the value is expressed. + /// The currency in which the value is expressed. pub currency: String, /// The account identifier. pub account: Option, @@ -351,6 +351,46 @@ pub struct AccountUpdateTime { pub timestamp: String, } +/// Account's information, portfolio and last update time +#[allow(clippy::large_enum_variant)] +#[derive(Debug)] +pub enum AccountUpdateMulti { + /// Receives the subscribed account's information. + AccountMultiValue(AccountMultiValue), + /// Notifies when all the account’s information has finished. + End, +} + +// Provides the account updates. +#[derive(Debug, Default)] +pub struct AccountMultiValue { + /// he account with updates. + pub account: String, + /// The model code with updates. + pub model_code: String, + /// The name of parameter. + pub key: String, + /// The value of parameter. + pub value: String, + /// The currency of parameter. + pub currency: String, +} + +impl Subscribable for AccountUpdateMulti { + const RESPONSE_MESSAGE_IDS: &[IncomingMessages] = &[IncomingMessages::AccountUpdateMulti, IncomingMessages::AccountUpdateMultiEnd]; + fn decode(_server_version: i32, message: &mut ResponseMessage) -> Result { + match message.message_type() { + IncomingMessages::AccountUpdateMulti => Ok(AccountUpdateMulti::AccountMultiValue(decoders::decode_account_multi_value(message)?)), + IncomingMessages::AccountUpdateMultiEnd => Ok(AccountUpdateMulti::End), + message => Err(Error::Simple(format!("unexpected message: {message:?}"))), + } + } + + fn cancel_message(server_version: i32, _request_id: Option) -> Result { + encoders::encode_cancel_account_updates(server_version) + } +} + // Subscribes to position updates for all accessible accounts. // All positions sent initially, and then only updates as positions change. pub(crate) fn positions(client: &Client) -> Result, Error> { @@ -448,6 +488,20 @@ pub fn account_updates<'a>(client: &'a Client, account: &str) -> Result( + client: &'a Client, + account: Option<&str>, + model_code: Option<&str>, +) -> Result, Error> { + client.check_server_version(server_versions::MODELS_SUPPORT, "It does not support account updates multi requests.")?; + + let request_id = client.next_request_id(); + let request = encoders::encode_request_account_updates_multi(request_id, account, model_code)?; + let subscription = client.send_request(request_id, request)?; + + Ok(Subscription::new(client, subscription)) +} + pub fn managed_accounts(client: &Client) -> Result, Error> { let request = encoders::encode_request_managed_accounts()?; let subscription = client.send_shared_request(OutgoingMessages::RequestManagedAccounts, request)?; diff --git a/src/accounts/decoders.rs b/src/accounts/decoders.rs index c6cbe23e..0cfb1c26 100644 --- a/src/accounts/decoders.rs +++ b/src/accounts/decoders.rs @@ -2,7 +2,9 @@ use crate::contracts::{Contract, SecurityType}; use crate::messages::ResponseMessage; use crate::{server_versions, Error}; -use super::{AccountPortfolioValue, AccountSummary, AccountUpdateTime, AccountValue, FamilyCode, PnL, PnLSingle, Position, PositionMulti}; +use super::{ + AccountMultiValue, AccountPortfolioValue, AccountSummary, AccountUpdateTime, AccountValue, FamilyCode, PnL, PnLSingle, Position, PositionMulti, +}; pub(crate) fn decode_position(message: &mut ResponseMessage) -> Result { message.skip(); // message type @@ -221,5 +223,21 @@ pub(crate) fn decode_account_update_time(message: &mut ResponseMessage) -> Resul }) } +pub(crate) fn decode_account_multi_value(message: &mut ResponseMessage) -> Result { + message.skip(); // message type + message.skip(); // message version + message.skip(); // request id + + let value = AccountMultiValue { + account: message.next_string()?, + model_code: message.next_string()?, + key: message.next_string()?, + value: message.next_string()?, + currency: message.next_string()?, + }; + + Ok(value) +} + #[cfg(test)] mod tests; diff --git a/src/accounts/encoders.rs b/src/accounts/encoders.rs index 2ebe427a..8fd8d840 100644 --- a/src/accounts/encoders.rs +++ b/src/accounts/encoders.rs @@ -111,6 +111,25 @@ pub(crate) fn encode_request_account_updates(server_version: i32, account: &str) Ok(message) } +pub(crate) fn encode_request_account_updates_multi( + request_id: i32, + account: Option<&str>, + model_code: Option<&str>, +) -> Result { + const VERSION: i32 = 1; + + let mut message = RequestMessage::new(); + + message.push_field(&OutgoingMessages::RequestAccountUpdatesMulti); + message.push_field(&VERSION); + message.push_field(&request_id); + message.push_field(&account); + message.push_field(&model_code); + message.push_field(&true); // subscribe + + Ok(message) +} + pub(crate) fn encode_cancel_account_updates(server_version: i32) -> Result { const VERSION: i32 = 2; diff --git a/src/client.rs b/src/client.rs index c94f3092..9589d6ec 100644 --- a/src/client.rs +++ b/src/client.rs @@ -8,7 +8,7 @@ use log::{debug, error, info}; use time::OffsetDateTime; use time_tz::Tz; -use crate::accounts::{AccountSummaries, AccountUpdates, FamilyCode, PnL, PnLSingle, PositionUpdate, PositionUpdateMulti}; +use crate::accounts::{AccountSummaries, AccountUpdateMulti, AccountUpdates, FamilyCode, PnL, PnLSingle, PositionUpdate, PositionUpdateMulti}; use crate::contracts::Contract; use crate::errors::Error; use crate::market_data::historical; @@ -260,6 +260,43 @@ impl Client { accounts::account_updates(self, account) } + /// Requests account updates for account and/or model. + /// + /// All account values and positions will be returned initially, and then there will only be updates when there is a change in a position, or to an account value every 3 minutes if it has changed. Only one account can be subscribed at a time. + /// + /// # Arguments + /// * `account` - Account values can be requested for a particular account. + /// * `model_code` - Account values can also be requested for a model. + /// * `ledger_and_nlv` - Returns light-weight request; only currency positions as opposed to account values and currency positions. + /// + /// # Examples + /// + /// ```no_run + /// use ibapi::Client; + /// use ibapi::accounts::AccountUpdateMulti; + /// + /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed"); + /// + /// let account = Some("U1234567"); + /// + /// let subscription = client.account_updates_multi(account, None).expect("error requesting account updates multi"); + /// for update in &subscription { + /// println!("{update:?}"); + /// + /// // stop after full initial update + /// if let AccountUpdateMulti::End = update { + /// subscription.cancel(); + /// } + /// } + /// ``` + pub fn account_updates_multi<'a>( + &'a self, + account: Option<&str>, + model_code: Option<&str>, + ) -> Result, Error> { + accounts::account_updates_multi(self, account, model_code) + } + /// Requests the accounts to which the logged user has access to. /// /// # Examples From 73eac9897b49452895b6c49e8a793d2a5c1d097d Mon Sep 17 00:00:00 2001 From: Wil Boayue Date: Wed, 16 Oct 2024 19:49:46 -0700 Subject: [PATCH 08/12] refactor channel mapping --- src/messages.rs | 1 + src/messages/shared_channel_configuration.rs | 47 ++++++++++++++++++++ src/transport.rs | 31 ++----------- 3 files changed, 52 insertions(+), 27 deletions(-) create mode 100644 src/messages/shared_channel_configuration.rs diff --git a/src/messages.rs b/src/messages.rs index be288f6e..f4d3f127 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -534,5 +534,6 @@ impl ResponseMessage { } } +pub(crate) mod shared_channel_configuration; #[cfg(test)] mod tests; diff --git a/src/messages/shared_channel_configuration.rs b/src/messages/shared_channel_configuration.rs new file mode 100644 index 00000000..2ff963c6 --- /dev/null +++ b/src/messages/shared_channel_configuration.rs @@ -0,0 +1,47 @@ +use super::{IncomingMessages, OutgoingMessages}; + +pub struct ChannelMapping<'a> { + pub request: OutgoingMessages, + pub responses: &'a [IncomingMessages], +} + +// For shared channels configures mapping of request message id to response message ids. +pub(crate) const CHANNEL_MAPPINGS: &[ChannelMapping] = &[ + ChannelMapping { + request: OutgoingMessages::RequestIds, + responses: &[IncomingMessages::NextValidId], + }, + ChannelMapping { + request: OutgoingMessages::RequestFamilyCodes, + responses: &[IncomingMessages::FamilyCodes], + }, + ChannelMapping { + request: OutgoingMessages::RequestMarketRule, + responses: &[IncomingMessages::MarketRule], + }, + ChannelMapping { + request: OutgoingMessages::RequestPositions, + responses: &[IncomingMessages::Position, IncomingMessages::PositionEnd], + }, + ChannelMapping { + request: OutgoingMessages::RequestPositionsMulti, + responses: &[IncomingMessages::PositionMulti, IncomingMessages::PositionMultiEnd], + }, + ChannelMapping { + request: OutgoingMessages::RequestOpenOrders, + responses: &[IncomingMessages::OpenOrder, IncomingMessages::OpenOrderEnd], + }, + ChannelMapping { + request: OutgoingMessages::RequestManagedAccounts, + responses: &[IncomingMessages::ManagedAccounts], + }, + ChannelMapping { + request: OutgoingMessages::RequestAccountData, + responses: &[ + IncomingMessages::AccountValue, + IncomingMessages::PortfolioValue, + IncomingMessages::AccountDownloadEnd, + IncomingMessages::AccountUpdateTime, + ], + }, +]; diff --git a/src/transport.rs b/src/transport.rs index 33c2d765..9743aa87 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -17,8 +17,7 @@ use time::macros::format_description; use time::OffsetDateTime; use time_tz::{timezones, OffsetResult, PrimitiveDateTimeExt, Tz}; -use crate::messages::{IncomingMessages, OutgoingMessages}; -use crate::messages::{RequestMessage, ResponseMessage}; +use crate::messages::{shared_channel_configuration, IncomingMessages, OutgoingMessages, RequestMessage, ResponseMessage}; use crate::{server_versions, Error}; use recorder::MessageRecorder; @@ -87,31 +86,9 @@ impl SharedChannels { }; // Register request/response pairs. - instance.register(OutgoingMessages::RequestIds, &[IncomingMessages::NextValidId]); - instance.register(OutgoingMessages::RequestFamilyCodes, &[IncomingMessages::FamilyCodes]); - instance.register(OutgoingMessages::RequestMarketRule, &[IncomingMessages::MarketRule]); - instance.register( - OutgoingMessages::RequestPositions, - &[IncomingMessages::Position, IncomingMessages::PositionEnd], - ); - instance.register( - OutgoingMessages::RequestPositionsMulti, - &[IncomingMessages::PositionMulti, IncomingMessages::PositionMultiEnd], - ); - instance.register( - OutgoingMessages::RequestOpenOrders, - &[IncomingMessages::OpenOrder, IncomingMessages::OpenOrderEnd], - ); - instance.register(OutgoingMessages::RequestManagedAccounts, &[IncomingMessages::ManagedAccounts]); - instance.register( - OutgoingMessages::RequestAccountData, - &[ - IncomingMessages::AccountValue, - IncomingMessages::PortfolioValue, - IncomingMessages::AccountDownloadEnd, - IncomingMessages::AccountUpdateTime, - ], - ); + for mapping in shared_channel_configuration::CHANNEL_MAPPINGS { + instance.register(mapping.request, mapping.responses); + } instance } From 7dde8be1320744d219bcbba64683f5ed2f6c916e Mon Sep 17 00:00:00 2001 From: Wil Boayue Date: Wed, 16 Oct 2024 21:12:00 -0700 Subject: [PATCH 09/12] cleanup --- examples/account_updates.rs | 4 ++-- examples/account_updates_multi.rs | 22 ++++++++++++++++++++++ src/accounts.rs | 21 ++++++++++++--------- src/accounts/encoders.rs | 15 +++++++++++++++ src/client.rs | 8 ++++---- src/messages.rs | 4 +++- 6 files changed, 58 insertions(+), 16 deletions(-) create mode 100644 examples/account_updates_multi.rs diff --git a/examples/account_updates.rs b/examples/account_updates.rs index 60c07791..f45616d4 100644 --- a/examples/account_updates.rs +++ b/examples/account_updates.rs @@ -1,4 +1,4 @@ -use ibapi::accounts::AccountUpdates; +use ibapi::accounts::AccountUpdate; use ibapi::Client; fn main() { @@ -13,7 +13,7 @@ fn main() { println!("{update:?}"); // stop after full initial update - if let AccountUpdates::End = update { + if let AccountUpdate::End = update { subscription.cancel(); } } diff --git a/examples/account_updates_multi.rs b/examples/account_updates_multi.rs new file mode 100644 index 00000000..a8ef2739 --- /dev/null +++ b/examples/account_updates_multi.rs @@ -0,0 +1,22 @@ +use ibapi::accounts::AccountUpdateMulti; +use ibapi::Client; + +fn main() { + env_logger::init(); + + let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed"); + + let account = Some("DU1234567"); + + let subscription = client + .account_updates_multi(account, None) + .expect("error requesting account updates multi"); + for update in &subscription { + println!("{update:?}"); + + // stop after full initial update + if let AccountUpdateMulti::End = update { + subscription.cancel(); + } + } +} diff --git a/src/accounts.rs b/src/accounts.rs index e5cc923e..c70d3107 100644 --- a/src/accounts.rs +++ b/src/accounts.rs @@ -274,7 +274,7 @@ pub struct FamilyCode { /// Account's information, portfolio and last update time #[allow(clippy::large_enum_variant)] #[derive(Debug)] -pub enum AccountUpdates { +pub enum AccountUpdate { /// Receives the subscribed account's information. AccountValue(AccountValue), /// Receives the subscribed account's portfolio. @@ -285,22 +285,23 @@ pub enum AccountUpdates { End, } -impl Subscribable for AccountUpdates { +impl Subscribable for AccountUpdate { const RESPONSE_MESSAGE_IDS: &[IncomingMessages] = &[ IncomingMessages::AccountValue, IncomingMessages::PortfolioValue, IncomingMessages::AccountUpdateTime, IncomingMessages::AccountDownloadEnd, ]; + fn decode(server_version: i32, message: &mut ResponseMessage) -> Result { match message.message_type() { - IncomingMessages::AccountValue => Ok(AccountUpdates::AccountValue(decoders::decode_account_value(message)?)), - IncomingMessages::PortfolioValue => Ok(AccountUpdates::PortfolioValue(decoders::decode_account_portfolio_value( + IncomingMessages::AccountValue => Ok(AccountUpdate::AccountValue(decoders::decode_account_value(message)?)), + IncomingMessages::PortfolioValue => Ok(AccountUpdate::PortfolioValue(decoders::decode_account_portfolio_value( server_version, message, )?)), - IncomingMessages::AccountUpdateTime => Ok(AccountUpdates::UpdateTime(decoders::decode_account_update_time(message)?)), - IncomingMessages::AccountDownloadEnd => Ok(AccountUpdates::End), + IncomingMessages::AccountUpdateTime => Ok(AccountUpdate::UpdateTime(decoders::decode_account_update_time(message)?)), + IncomingMessages::AccountDownloadEnd => Ok(AccountUpdate::End), message => Err(Error::Simple(format!("unexpected message: {message:?}"))), } } @@ -378,6 +379,7 @@ pub struct AccountMultiValue { impl Subscribable for AccountUpdateMulti { const RESPONSE_MESSAGE_IDS: &[IncomingMessages] = &[IncomingMessages::AccountUpdateMulti, IncomingMessages::AccountUpdateMultiEnd]; + fn decode(_server_version: i32, message: &mut ResponseMessage) -> Result { match message.message_type() { IncomingMessages::AccountUpdateMulti => Ok(AccountUpdateMulti::AccountMultiValue(decoders::decode_account_multi_value(message)?)), @@ -386,8 +388,9 @@ impl Subscribable for AccountUpdateMulti { } } - fn cancel_message(server_version: i32, _request_id: Option) -> Result { - encoders::encode_cancel_account_updates(server_version) + fn cancel_message(server_version: i32, request_id: Option) -> Result { + let request_id = request_id.expect("Request ID required to encode cancel account updates multi"); + encoders::encode_cancel_account_updates_multi(server_version, request_id) } } @@ -481,7 +484,7 @@ pub fn account_summary<'a>(client: &'a Client, group: &str, tags: &[&str]) -> Re Ok(Subscription::new(client, subscription)) } -pub fn account_updates<'a>(client: &'a Client, account: &str) -> Result, Error> { +pub fn account_updates<'a>(client: &'a Client, account: &str) -> Result, Error> { let request = encoders::encode_request_account_updates(client.server_version(), account)?; let subscription = client.send_shared_request(OutgoingMessages::RequestAccountData, request)?; diff --git a/src/accounts/encoders.rs b/src/accounts/encoders.rs index 8fd8d840..1a3f5356 100644 --- a/src/accounts/encoders.rs +++ b/src/accounts/encoders.rs @@ -145,6 +145,21 @@ pub(crate) fn encode_cancel_account_updates(server_version: i32) -> Result Result { + const VERSION: i32 = 1; + + let mut message = RequestMessage::new(); + + message.push_field(&OutgoingMessages::RequestAccountUpdatesMulti); + message.push_field(&VERSION); + message.push_field(&request_id); + message.push_field(&""); // account + message.push_field(&""); // model code + message.push_field(&false); // subscribe + + Ok(message) +} + fn encode_simple(message_type: OutgoingMessages, version: i32) -> Result { let mut message = RequestMessage::new(); diff --git a/src/client.rs b/src/client.rs index 9589d6ec..0a900b08 100644 --- a/src/client.rs +++ b/src/client.rs @@ -8,7 +8,7 @@ use log::{debug, error, info}; use time::OffsetDateTime; use time_tz::Tz; -use crate::accounts::{AccountSummaries, AccountUpdateMulti, AccountUpdates, FamilyCode, PnL, PnLSingle, PositionUpdate, PositionUpdateMulti}; +use crate::accounts::{AccountSummaries, AccountUpdate, AccountUpdateMulti, FamilyCode, PnL, PnLSingle, PositionUpdate, PositionUpdateMulti}; use crate::contracts::Contract; use crate::errors::Error; use crate::market_data::historical; @@ -240,7 +240,7 @@ impl Client { /// /// ```no_run /// use ibapi::Client; - /// use ibapi::accounts::AccountUpdates; + /// use ibapi::accounts::AccountUpdate; /// /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed"); /// @@ -251,12 +251,12 @@ impl Client { /// println!("{update:?}"); /// /// // stop after full initial update - /// if let AccountUpdates::End = update { + /// if let AccountUpdate::End = update { /// subscription.cancel(); /// } /// } /// ``` - pub fn account_updates<'a>(&'a self, account: &str) -> Result, Error> { + pub fn account_updates<'a>(&'a self, account: &str) -> Result, Error> { accounts::account_updates(self, account) } diff --git a/src/messages.rs b/src/messages.rs index f4d3f127..5ce3c057 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -216,7 +216,9 @@ pub fn request_id_index(kind: IncomingMessages) -> Option { | IncomingMessages::Error | IncomingMessages::ExecutionDataEnd | IncomingMessages::AccountSummary - | IncomingMessages::AccountSummaryEnd => Some(2), + | IncomingMessages::AccountSummaryEnd + | IncomingMessages::AccountUpdateMulti + | IncomingMessages::AccountUpdateMultiEnd => Some(2), _ => { debug!("could not determine request id index for {kind:?}"); None From 1cd89c5ba603f8a074d63c49466bc8a0865bec63 Mon Sep 17 00:00:00 2001 From: Wil Boayue Date: Wed, 16 Oct 2024 22:25:45 -0700 Subject: [PATCH 10/12] fixed shudown --- src/accounts/encoders.rs | 5 +---- src/stubs.rs | 2 ++ src/transport.rs | 39 ++++++++++++++++++++++++++------------- 3 files changed, 29 insertions(+), 17 deletions(-) diff --git a/src/accounts/encoders.rs b/src/accounts/encoders.rs index 1a3f5356..18ac8a3c 100644 --- a/src/accounts/encoders.rs +++ b/src/accounts/encoders.rs @@ -150,12 +150,9 @@ pub(crate) fn encode_cancel_account_updates_multi(_server_version: i32, request_ let mut message = RequestMessage::new(); - message.push_field(&OutgoingMessages::RequestAccountUpdatesMulti); + message.push_field(&OutgoingMessages::CancelAccountUpdatesMulti); message.push_field(&VERSION); message.push_field(&request_id); - message.push_field(&""); // account - message.push_field(&""); // model code - message.push_field(&false); // subscribe Ok(message) } diff --git a/src/stubs.rs b/src/stubs.rs index f8ebb1e0..c7a27fcb 100644 --- a/src/stubs.rs +++ b/src/stubs.rs @@ -46,6 +46,8 @@ impl MessageBus for MessageBusStub { Ok(()) } + fn ensure_shutdown(&self) {} + // fn process_messages(&mut self, _server_version: i32) -> Result<(), Error> { // Ok(()) // } diff --git a/src/transport.rs b/src/transport.rs index 9743aa87..b272d0a0 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -45,7 +45,7 @@ pub(crate) trait MessageBus: Send + Sync { fn cancel_order_subscription(&self, request_id: i32, packet: &RequestMessage) -> Result<(), Error>; - fn ensure_shutdown(&self) {} + fn ensure_shutdown(&self); // Testing interface. Tracks requests sent messages when Bus is stubbed. #[cfg(test)] @@ -172,6 +172,8 @@ impl TcpMessageBus { } fn request_shutdown(&self) { + debug!("shutdown requested"); + self.requests.notify_all(&Response::Disconnected); self.orders.notify_all(&Response::Disconnected); @@ -218,6 +220,13 @@ impl TcpMessageBus { backoff.reset(); retry_attempt = 0; } + Err(Error::Io(e)) if e.kind() == ErrorKind::WouldBlock => { + if message_bus.is_shutting_down() { + debug!("dispatcher thread exiting"); + return; + } + thread::sleep(Duration::from_millis(1)); + } Err(Error::Io(e)) if RECONNECT_ERRORS.contains(&e.kind()) => { error!("error reading packet: {:?}", e); // reset hashes @@ -248,10 +257,6 @@ impl TcpMessageBus { return; } }; - - if message_bus.is_shutting_down() { - return; - } } }) } @@ -381,20 +386,23 @@ impl TcpMessageBus { fn start_cleanup_thread(self: &Arc) -> JoinHandle<()> { let message_bus = Arc::clone(self); - thread::spawn(move || loop { + thread::spawn(move || { let signal_recv = message_bus.signals_recv.clone(); - for signal in &signal_recv { - match signal { - Signal::Request(request_id) => { - message_bus.clean_request(request_id); - } - Signal::Order(order_id) => { - message_bus.clean_order(order_id); + loop { + if let Ok(signal) = signal_recv.recv_timeout(Duration::from_secs(1)) { + match signal { + Signal::Request(request_id) => { + message_bus.clean_request(request_id); + } + Signal::Order(order_id) => { + message_bus.clean_order(order_id); + } } } if message_bus.is_shutting_down() { + debug!("cleanup thread exiting"); return; } } @@ -510,6 +518,7 @@ impl MessageBus for TcpMessageBus { } fn ensure_shutdown(&self) { + self.request_shutdown(); self.join(); } } @@ -809,6 +818,8 @@ impl Connection { let reader = TcpStream::connect(connection_url)?; let writer = reader.try_clone()?; + reader.set_read_timeout(Some(Duration::from_secs(1)))?; + let connection = Self { client_id, connection_url: connection_url.into(), @@ -845,6 +856,8 @@ impl Connection { let mut writer = self.writer.lock()?; *reader = stream.try_clone()?; + reader.set_read_timeout(Some(Duration::from_secs(1)))?; + *writer = stream; } From 05ef656c67636a1fe1f1b789ddb24ec44db42eb0 Mon Sep 17 00:00:00 2001 From: Wil Boayue Date: Wed, 16 Oct 2024 23:14:48 -0700 Subject: [PATCH 11/12] cleanup cancel --- examples/account_summary.rs | 2 +- examples/readme_realtime_data_1.rs | 2 +- examples/readme_realtime_data_2.rs | 4 ++-- src/accounts/decoders/tests.rs | 15 ++++++++++++- src/accounts/tests.rs | 35 ++++++++++++++++++++++++++++++ src/client.rs | 21 ++++++++++-------- src/messages.rs | 7 ++++++ src/testdata/responses.rs | 5 +++++ 8 files changed, 77 insertions(+), 14 deletions(-) diff --git a/examples/account_summary.rs b/examples/account_summary.rs index fd6f1106..7f09ceec 100644 --- a/examples/account_summary.rs +++ b/examples/account_summary.rs @@ -15,7 +15,7 @@ fn main() { for update in &subscription { match update { AccountSummaries::Summary(summary) => println!("{summary:?}"), - AccountSummaries::End => subscription.cancel().expect("cancel failed"), + AccountSummaries::End => subscription.cancel(), } } } diff --git a/examples/readme_realtime_data_1.rs b/examples/readme_realtime_data_1.rs index db6800f3..c3538c97 100644 --- a/examples/readme_realtime_data_1.rs +++ b/examples/readme_realtime_data_1.rs @@ -19,6 +19,6 @@ fn main() { println!("bar: {bar:?}"); // when your algorithm is done, cancel subscription - subscription.cancel().expect("cancel failed"); + subscription.cancel(); } } diff --git a/examples/readme_realtime_data_2.rs b/examples/readme_realtime_data_2.rs index 8f6ee8b5..006bf5e6 100644 --- a/examples/readme_realtime_data_2.rs +++ b/examples/readme_realtime_data_2.rs @@ -22,7 +22,7 @@ fn main() { println!("NVDA {}, AAPL {}", bar_nvda.close, bar_aapl.close); // when your algorithm is done, cancel subscription - subscription_aapl.cancel().expect("cancel failed"); - subscription_nvda.cancel().expect("cancel failed"); + subscription_aapl.cancel(); + subscription_nvda.cancel(); } } diff --git a/src/accounts/decoders/tests.rs b/src/accounts/decoders/tests.rs index 9b640832..880fdfb9 100644 --- a/src/accounts/decoders/tests.rs +++ b/src/accounts/decoders/tests.rs @@ -1,4 +1,4 @@ -use crate::{accounts::AccountSummaryTags, server_versions}; +use crate::{accounts::AccountSummaryTags, server_versions, testdata::responses}; #[test] fn test_decode_positions() { @@ -119,3 +119,16 @@ fn test_decode_account_summary() { assert_eq!(account_summary.value, "FA", "account_summary.value"); assert_eq!(account_summary.currency, "", "account_summary.currency"); } + +#[test] +fn test_decode_account_multi_value() { + let mut message = super::ResponseMessage::from_simple(responses::ACCOUNT_UPDATE_MULTI_CURRENCY); + + let value = super::decode_account_multi_value(&mut message).expect("error decoding account multi value"); + + assert_eq!(value.account, "DU1234567", "value.account"); + assert_eq!(value.model_code, "", "value.model_code"); + assert_eq!(value.key, "Currency", "value.key"); + assert_eq!(value.value, "USD", "value.value"); + assert_eq!(value.currency, "USD", "value.currency"); +} diff --git a/src/accounts/tests.rs b/src/accounts/tests.rs index b9d1a21c..c0e066c3 100644 --- a/src/accounts/tests.rs +++ b/src/accounts/tests.rs @@ -1,5 +1,6 @@ use std::sync::{Arc, RwLock}; +use crate::accounts::AccountUpdateMulti; use crate::testdata::responses; use crate::{accounts::AccountSummaryTags, server_versions, stubs::MessageBusStub, Client}; @@ -126,3 +127,37 @@ fn test_managed_accounts() { assert_eq!(accounts, &["DU1234567", "DU7654321"]); } + +#[test] +fn test_account_updates_multi() { + let message_bus = Arc::new(MessageBusStub { + request_messages: RwLock::new(vec![]), + response_messages: vec![ + responses::ACCOUNT_UPDATE_MULTI_CASH_BALANCE.into(), + responses::ACCOUNT_UPDATE_MULTI_CURRENCY.into(), + responses::ACCOUNT_UPDATE_MULTI_END.into(), + ], + }); + + let client = Client::stubbed(message_bus, server_versions::SIZE_RULES); + + let account = Some("DU1234567"); + let subscription = client.account_updates_multi(account, None).expect("request managed accounts failed"); + + let update = subscription.next().unwrap(); + match update { + AccountUpdateMulti::AccountMultiValue(value) => { + assert_eq!(value.key, "CashBalance"); + } + AccountUpdateMulti::End => { + panic!("value expected") + } + } + + subscription.cancel(); + + let request_messages = client.message_bus.request_messages(); + + assert_eq!(request_messages[0].encode_simple(), "76|1|9000|DU1234567||1|"); + assert_eq!(request_messages[1].encode_simple(), "77|1|9000|"); +} diff --git a/src/client.rs b/src/client.rs index 0a900b08..e21439b1 100644 --- a/src/client.rs +++ b/src/client.rs @@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicI32, Ordering}; use std::sync::Arc; use std::time::Duration; -use log::{debug, error, info}; +use log::{debug, error, info, warn}; use time::OffsetDateTime; use time_tz::Tz; @@ -1180,26 +1180,31 @@ impl<'a, T: Subscribable> Subscription<'a, T> { } /// Cancel the subscription - pub fn cancel(&self) -> Result<(), Error> { + pub fn cancel(&self) { if let Some(request_id) = self.request_id { if let Ok(message) = T::cancel_message(self.client.server_version(), self.request_id) { - self.client.message_bus.cancel_subscription(request_id, &message)?; + if let Err(e) = self.client.message_bus.cancel_subscription(request_id, &message) { + warn!("error cancelling subscription: {e}") + } self.subscription.cancel(); } } else if let Some(order_id) = self.order_id { if let Ok(message) = T::cancel_message(self.client.server_version(), self.request_id) { - self.client.message_bus.cancel_order_subscription(order_id, &message)?; + if let Err(e) = self.client.message_bus.cancel_order_subscription(order_id, &message) { + warn!("error cancelling order subscription: {e}") + } self.subscription.cancel(); } } else if let Some(message_type) = self.message_type { if let Ok(message) = T::cancel_message(self.client.server_version(), self.request_id) { - self.client.message_bus.cancel_shared_subscription(message_type, &message)?; + if let Err(e) = self.client.message_bus.cancel_shared_subscription(message_type, &message) { + warn!("error cancelling shared subscription: {e}") + } self.subscription.cancel(); } } else { debug!("Could not determine cancel method") } - Ok(()) } pub fn iter(&self) -> SubscriptionIter { @@ -1217,9 +1222,7 @@ impl<'a, T: Subscribable> Subscription<'a, T> { impl<'a, T: Subscribable> Drop for Subscription<'a, T> { fn drop(&mut self) { - if let Err(err) = self.cancel() { - error!("error cancelling subscription: {err}"); - } + self.cancel(); } } diff --git a/src/messages.rs b/src/messages.rs index 5ce3c057..8d313a03 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -519,12 +519,19 @@ impl ResponseMessage { } pub fn from(fields: &str) -> ResponseMessage { + let fields = fields.replace("|", "\0"); ResponseMessage { i: 0, fields: fields.split('\x00').map(|x| x.to_string()).collect(), } } + #[cfg(test)] + pub fn from_simple(fields: &str) -> ResponseMessage { + let fields = fields.replace("|", "\0"); + Self::from(&fields) + } + pub fn skip(&mut self) { self.i += 1; } diff --git a/src/testdata/responses.rs b/src/testdata/responses.rs index bb3e6afa..d3cdd09f 100644 --- a/src/testdata/responses.rs +++ b/src/testdata/responses.rs @@ -1,2 +1,7 @@ pub const POSITION: &str = "61\03\0DU1234567\076792991\0TSLA\0STK\0\00.0\0\0\0NASDAQ\0USD\0TSLA\0NMS\0500\0196.77\0"; pub const MANAGED_ACCOUNT: &str = "15|1|DU1234567,DU7654321|"; + +pub const ACCOUNT_UPDATE_MULTI_CASH_BALANCE: &str = "73|1|9000|DU1234567||CashBalance|94629.71|USD||"; +pub const ACCOUNT_UPDATE_MULTI_CURRENCY: &str = "73|1|9000|DU1234567||Currency|USD|USD||"; +pub const ACCOUNT_UPDATE_MULTI_STOCK_MARKET_VALUE: &str = "73|1|9000|DU1234567||StockMarketValue|0.00|BASE||"; +pub const ACCOUNT_UPDATE_MULTI_END: &str = "74|1|9000||"; From fc59cf568f83271585588b9efaac5486dc99e56d Mon Sep 17 00:00:00 2001 From: Wil Boayue Date: Wed, 16 Oct 2024 23:21:12 -0700 Subject: [PATCH 12/12] prevent double cancel --- src/client.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/client.rs b/src/client.rs index e21439b1..b9d6ffff 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,6 +1,6 @@ use std::fmt::Debug; use std::marker::PhantomData; -use std::sync::atomic::{AtomicI32, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicI32, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -1047,6 +1047,7 @@ pub struct Subscription<'a, T: Subscribable> { pub(crate) message_type: Option, pub(crate) subscription: InternalSubscription, pub(crate) phantom: PhantomData, + cancelled: AtomicBool, } #[allow(private_bounds)] @@ -1060,6 +1061,7 @@ impl<'a, T: Subscribable> Subscription<'a, T> { message_type: None, subscription, phantom: PhantomData, + cancelled: AtomicBool::new(false), } } else if let Some(order_id) = subscription.order_id { Subscription { @@ -1069,6 +1071,7 @@ impl<'a, T: Subscribable> Subscription<'a, T> { message_type: None, subscription, phantom: PhantomData, + cancelled: AtomicBool::new(false), } } else if let Some(message_type) = subscription.message_type { Subscription { @@ -1078,6 +1081,7 @@ impl<'a, T: Subscribable> Subscription<'a, T> { message_type: Some(message_type), subscription, phantom: PhantomData, + cancelled: AtomicBool::new(false), } } else { panic!("unsupported internal subscription: {:?}", subscription) @@ -1181,6 +1185,12 @@ impl<'a, T: Subscribable> Subscription<'a, T> { /// Cancel the subscription pub fn cancel(&self) { + if self.cancelled.load(Ordering::Relaxed) { + return; + } + + self.cancelled.store(true, Ordering::Relaxed); + if let Some(request_id) = self.request_id { if let Ok(message) = T::cancel_message(self.client.server_version(), self.request_id) { if let Err(e) = self.client.message_bus.cancel_subscription(request_id, &message) {