Skip to content

Commit

Permalink
implemented account updates
Browse files Browse the repository at this point in the history
  • Loading branch information
Wil Boayue committed Oct 16, 2024
1 parent 23e9cee commit 8527ee0
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 35 deletions.
20 changes: 20 additions & 0 deletions examples/account_updates.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use ibapi::accounts::AccountUpdates;
use ibapi::Client;

fn main() {
env_logger::init();

let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed");

let account = "DU1234567";

let subscription = client.account_updates(account).expect("error requesting account updates");
for update in &subscription {
println!("{update:?}");

// stop after full initial update
if let AccountUpdates::End = update {
subscription.cancel();
}
}
}
61 changes: 32 additions & 29 deletions src/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,35 +276,42 @@ pub struct FamilyCode {
#[derive(Debug)]
pub enum AccountUpdates {
/// Receives the subscribed account's information.
Value(AccountValue),
AccountValue(AccountValue),
/// Receives the subscribed account's portfolio.
Portfolio(AccountPortfolio),
PortfolioValue(AccountPortfolioValue),
/// Receives the last time on which the account was updated.
Time(AccountTime),
UpdateTime(AccountUpdateTime),
/// Notifies when all the account’s information has finished.
End,
}

impl Subscribable<AccountUpdates> for AccountUpdates {
const RESPONSE_MESSAGE_IDS: &[IncomingMessages] = &[IncomingMessages::PositionMulti, IncomingMessages::PositionMultiEnd];

fn decode(_server_version: i32, message: &mut ResponseMessage) -> Result<Self, Error> {
// match message.message_type() {
// IncomingMessages::PositionMulti => Ok(PositionUpdateMulti::Position(decoders::decode_position_multi(message)?)),
// IncomingMessages::PositionMultiEnd => Ok(PositionUpdateMulti::PositionEnd),
// message => Err(Error::Simple(format!("unexpected message: {message:?}"))),
// }
Err(Error::NotImplemented)
const RESPONSE_MESSAGE_IDS: &[IncomingMessages] = &[
IncomingMessages::AccountValue,
IncomingMessages::PortfolioValue,
IncomingMessages::AccountUpdateTime,
IncomingMessages::AccountDownloadEnd,
];
fn decode(server_version: i32, message: &mut ResponseMessage) -> Result<Self, Error> {
match message.message_type() {
IncomingMessages::AccountValue => Ok(AccountUpdates::AccountValue(decoders::decode_account_value(message)?)),
IncomingMessages::PortfolioValue => Ok(AccountUpdates::PortfolioValue(decoders::decode_account_portfolio_value(
server_version,
message,
)?)),
IncomingMessages::AccountUpdateTime => Ok(AccountUpdates::UpdateTime(decoders::decode_account_update_time(message)?)),
IncomingMessages::AccountDownloadEnd => Ok(AccountUpdates::End),
message => Err(Error::Simple(format!("unexpected message: {message:?}"))),
}
}

fn cancel_message(_server_version: i32, request_id: Option<i32>) -> Result<RequestMessage, Error> {
let request_id = request_id.expect("Request ID required to encode cancel positions multi");
encoders::encode_cancel_positions_multi(request_id)
fn cancel_message(server_version: i32, _request_id: Option<i32>) -> Result<RequestMessage, Error> {
encoders::encode_cancel_account_updates(server_version)
}
}

/// A value of subscribed account's information.
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct AccountValue {
/// The value being updated.
pub key: String,
Expand All @@ -313,12 +320,12 @@ pub struct AccountValue {
/// The currency inn which the value is expressed.
pub currency: String,
/// The account identifier.
pub account: String,
pub account: Option<String>,
}

/// Subscribed account's portfolio.
#[derive(Debug)]
pub struct AccountPortfolio {
#[derive(Debug, Default)]
pub struct AccountPortfolioValue {
/// The Contract for which a position is held.
pub contract: Contract,
/// The number of positions held.
Expand All @@ -334,12 +341,12 @@ pub struct AccountPortfolio {
/// Daily realized profit and loss on the position.
pub realized_pnl: f64,
/// Account identifier for the update.
pub account: String,
pub account: Option<String>,
}

/// Last time at which the account was updated.
#[derive(Debug)]
pub struct AccountTime {
#[derive(Debug, Default)]
pub struct AccountUpdateTime {
/// The last update system time.
pub timestamp: String,
}
Expand Down Expand Up @@ -435,14 +442,10 @@ pub fn account_summary<'a>(client: &'a Client, group: &str, tags: &[&str]) -> Re
}

pub fn account_updates<'a>(client: &'a Client, account: &str) -> Result<Subscription<'a, AccountUpdates>, Error> {
client.check_server_version(server_versions::ACCOUNT_SUMMARY, "It does not support account summary requests.")?;
let request = encoders::encode_request_account_updates(client.server_version(), account)?;
let subscription = client.send_shared_request(OutgoingMessages::RequestAccountData, request)?;

// let request_id = client.next_request_id();
// let request = encoders::encode_request_account_summary(request_id, group, tags)?;
// let subscription = client.send_request(request_id, request)?;

// Ok(Subscription::new(client, subscription))
Err(Error::NotImplemented)
Ok(Subscription::new(client, subscription))
}

pub fn managed_accounts(client: &Client) -> Result<Vec<String>, Error> {
Expand Down
81 changes: 79 additions & 2 deletions src/accounts/decoders.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::contracts::SecurityType;
use crate::contracts::{Contract, SecurityType};
use crate::messages::ResponseMessage;
use crate::{server_versions, Error};

use super::{AccountSummary, FamilyCode, PnL, PnLSingle, Position, PositionMulti};
use super::{AccountPortfolioValue, AccountSummary, AccountUpdateTime, AccountValue, FamilyCode, PnL, PnLSingle, Position, PositionMulti};

pub(crate) fn decode_position(message: &mut ResponseMessage) -> Result<Position, Error> {
message.skip(); // message type
Expand Down Expand Up @@ -144,5 +144,82 @@ pub(crate) fn decode_account_summary(_server_version: i32, message: &mut Respons
})
}

pub(crate) fn decode_account_value(message: &mut ResponseMessage) -> Result<AccountValue, Error> {
message.skip(); // message type

let message_version = message.next_int()?;

let mut account_value = AccountValue {
key: message.next_string()?,
value: message.next_string()?,
currency: message.next_string()?,
..Default::default()
};

if message_version >= 2 {
account_value.account = Some(message.next_string()?);
}

Ok(account_value)
}

pub(crate) fn decode_account_portfolio_value(server_version: i32, message: &mut ResponseMessage) -> Result<AccountPortfolioValue, Error> {
message.skip(); // message type

let message_version = message.next_int()?;

let mut contract = Contract::default();
if message_version >= 6 {
contract.contract_id = message.next_int()?;
}
contract.symbol = message.next_string()?;
contract.security_type = SecurityType::from(&message.next_string()?);
contract.last_trade_date_or_contract_month = message.next_string()?;
contract.strike = message.next_double()?;
contract.right = message.next_string()?;
if message_version >= 7 {
contract.multiplier = message.next_string()?;
contract.primary_exchange = message.next_string()?;
}
contract.currency = message.next_string()?;
if message_version >= 2 {
contract.local_symbol = message.next_string()?;
}
if message_version >= 8 {
contract.trading_class = message.next_string()?;
}

let mut portfolio_value = AccountPortfolioValue {
contract,
..Default::default()
};

portfolio_value.position = message.next_double()?;
portfolio_value.market_price = message.next_double()?;
portfolio_value.market_value = message.next_double()?;
if message_version >= 3 {
portfolio_value.average_cost = message.next_double()?;
portfolio_value.unrealized_pnl = message.next_double()?;
portfolio_value.realized_pnl = message.next_double()?;
}
if message_version >= 4 {
portfolio_value.account = Some(message.next_string()?);
}
if message_version == 6 && server_version == 39 {
portfolio_value.contract.primary_exchange = message.next_string()?
}

Ok(portfolio_value)
}

pub(crate) fn decode_account_update_time(message: &mut ResponseMessage) -> Result<AccountUpdateTime, Error> {
message.skip(); // message type
message.skip(); // version

Ok(AccountUpdateTime {
timestamp: message.next_string()?,
})
}

#[cfg(test)]
mod tests;
30 changes: 30 additions & 0 deletions src/accounts/encoders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,36 @@ pub(crate) fn encode_request_managed_accounts() -> Result<RequestMessage, Error>
Ok(message)
}

pub(crate) fn encode_request_account_updates(server_version: i32, account: &str) -> Result<RequestMessage, Error> {
const VERSION: i32 = 2;

let mut message = RequestMessage::new();

message.push_field(&OutgoingMessages::RequestAccountData);
message.push_field(&VERSION);
message.push_field(&true); // subscribe
if server_version > 9 {
message.push_field(&account);
}

Ok(message)
}

pub(crate) fn encode_cancel_account_updates(server_version: i32) -> Result<RequestMessage, Error> {
const VERSION: i32 = 2;

let mut message = RequestMessage::new();

message.push_field(&OutgoingMessages::RequestAccountData);
message.push_field(&VERSION);
message.push_field(&false); // subscribe
if server_version > 9 {
message.push_field(&"");
}

Ok(message)
}

fn encode_simple(message_type: OutgoingMessages, version: i32) -> Result<RequestMessage, Error> {
let mut message = RequestMessage::new();

Expand Down
4 changes: 2 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use std::time::Duration;

use log::{debug, error};
use log::{debug, error, info};
use time::OffsetDateTime;
use time_tz::Tz;

Expand Down Expand Up @@ -1064,7 +1064,7 @@ impl<'a, T: Subscribable<T>> Subscription<'a, T> {
error!("{error_message}");
return None;
} else {
error!("subscription iterator unexpected message: {message:?}");
info!("subscription iterator unexpected message: {message:?}");
}
}
Some(Response::Cancelled) => {
Expand Down
4 changes: 2 additions & 2 deletions src/messages.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::ops::Index;
use std::str::{self, FromStr};

use log::error;
use log::debug;
use time::OffsetDateTime;

use crate::{Error, ToField};
Expand Down Expand Up @@ -218,7 +218,7 @@ pub fn request_id_index(kind: IncomingMessages) -> Option<usize> {
| IncomingMessages::AccountSummary
| IncomingMessages::AccountSummaryEnd => Some(2),
_ => {
error!("could not determine request id index for {kind:?}");
debug!("could not determine request id index for {kind:?}");
None
}
}
Expand Down
9 changes: 9 additions & 0 deletions src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,15 @@ impl SharedChannels {
&[IncomingMessages::OpenOrder, IncomingMessages::OpenOrderEnd],
);
instance.register(OutgoingMessages::RequestManagedAccounts, &[IncomingMessages::ManagedAccounts]);
instance.register(
OutgoingMessages::RequestAccountData,
&[
IncomingMessages::AccountValue,
IncomingMessages::PortfolioValue,
IncomingMessages::AccountDownloadEnd,
IncomingMessages::AccountUpdateTime,
],
);

instance
}
Expand Down

0 comments on commit 8527ee0

Please sign in to comment.