diff --git a/src/client.rs b/src/client.rs index 12d7a7ae..c88aa32e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -130,7 +130,7 @@ impl Client { accounts::server_time(self) } - /// Subscribes to [PositionUpdate](accounts::PositionUpdate)s for all accessible accounts. + /// Subscribes to [PositionUpdate]s for all accessible accounts. /// All positions sent initially, and then only updates as positions change. /// /// # Examples @@ -152,7 +152,7 @@ impl Client { accounts::positions(self) } - /// Subscribes to [PositionUpdateMulti](accounts::PositionUpdateMulti) updates for account and/or model. + /// Subscribes to [PositionUpdateMulti] updates for account and/or model. /// Initially all positions are returned, and then updates are returned for any position changes in real time. /// /// # Arguments @@ -1015,7 +1015,7 @@ impl Client { /// Requests data histogram of specified contract. /// /// # Arguments - /// * `contract` - [Contract] to retrieve [HistogramData](historical::HistogramData) for. + /// * `contract` - [Contract] to retrieve [Histogram Entries](historical::HistogramEntry) for. /// * `use_rth` - Data from regular trading hours (true), or all available hours (false). /// * `duration` - Duration of interval to retrieve. /// @@ -1194,6 +1194,13 @@ impl Client { /// # Examples /// /// ```no_run + /// use ibapi::Client; + /// + /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed"); + /// let exchanges = client.market_depth_exchanges().expect("error requesting market depth exchanges"); + /// for exchange in &exchanges { + /// println!("{exchange:?}"); + /// } /// ``` pub fn market_depth_exchanges(&self) -> Result, Error> { realtime::market_depth_exchanges(self) @@ -1228,6 +1235,35 @@ impl Client { /// # Examples /// /// ```no_run + /// use ibapi::{contracts::Contract, market_data::realtime::TickTypes, Client}; + /// + /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed"); + /// + /// let contract = Contract::stock("AAPL"); + /// + /// // https://www.interactivebrokers.com/campus/ibkr-api-page/twsapi-doc/#available-tick-types + /// let generic_ticks = &["233", "293"]; + /// let snapshot = false; + /// let regulatory_snapshot = false; + /// + /// let subscription = client + /// .market_data(&contract, generic_ticks, snapshot, regulatory_snapshot) + /// .expect("error requesting market data"); + /// + /// for tick in &subscription { + /// match tick { + /// TickTypes::Price(tick_price) => println!("{:?}", tick_price), + /// TickTypes::Size(tick_size) => println!("{:?}", tick_size), + /// TickTypes::PriceSize(tick_price_size) => println!("{:?}", tick_price_size), + /// TickTypes::Generic(tick_generic) => println!("{:?}", tick_generic), + /// TickTypes::String(tick_string) => println!("{:?}", tick_string), + /// TickTypes::EFP(tick_efp) => println!("{:?}", tick_efp), + /// TickTypes::OptionComputation(option_computation) => println!("{:?}", option_computation), + /// TickTypes::RequestParameters(tick_request_parameters) => println!("{:?}", tick_request_parameters), + /// TickTypes::SnapshotEnd => subscription.cancel(), + /// TickTypes::Notice(notice) => println!("{:?}", notice), + /// } + /// } /// ``` pub fn market_data( &self, @@ -1595,7 +1631,7 @@ impl Debug for Client { /// // Process each bar here (e.g., print or use in calculations) /// println!("Received bar: {bar:?}"); /// } -/// // The subscription is automatically cancelled when it goes out of scope +/// // The subscription goes out of scope and is automatically cancelled. /// ``` /// /// Subscriptions can be explicitly canceled using the [cancel](Subscription::cancel) method. @@ -1667,7 +1703,42 @@ impl<'a, T: DataStream> Subscription<'a, T> { } } - /// Blocks until the item become available. + /// Polls the subscription for the next item and blocks until the next item is available. + /// + /// This method will wait indefinitely until either: + /// - A new item becomes available + /// - The subscription encounters an error + /// - The subscription is canceled + /// + /// # Example + /// + /// ```no_run + /// use ibapi::contracts::Contract; + /// use ibapi::market_data::realtime::{BarSize, WhatToShow}; + /// use ibapi::Client; + /// + /// let connection_url = "127.0.0.1:4002"; + /// let client = Client::connect(connection_url, 100).expect("connection to TWS failed!"); + /// + /// // Request real-time bars data for AAPL with 5-second intervals + /// let contract = Contract::stock("AAPL"); + /// let subscription = client + /// .realtime_bars(&contract, BarSize::Sec5, WhatToShow::Trades, false) + /// .expect("realtime bars request failed!"); + /// + /// // Process bars blocking until the next bar is available + /// while let Some(bar) = subscription.next() { + /// println!("Received bar: {bar:?}"); + /// } + /// + /// // When the loop exits, check if it was due to an error + /// if let Some(err) = subscription.error() { + /// eprintln!("subscription error: {err}"); + /// } + /// ``` + /// # Returns + /// * `Some(T)` - The next available item from the subscription + /// * `None` - If the subscription has ended or encountered an error pub fn next(&self) -> Option { self.clear_error(); @@ -1708,32 +1779,108 @@ impl<'a, T: DataStream> Subscription<'a, T> { } } - /// To request the next bar in a non-blocking manner. + /// Polls the subscription for the next item, returns immediately if no data is available. + /// + /// Unlike [next](Subscription::next) which blocks waiting for data, this method provides + /// non-blocking access to the subscription data. It returns immediately with: + /// - The next item if one is available + /// - None if no data is currently available + /// - None if an error occurred + /// + /// # Example + /// + /// ```no_run + /// use ibapi::contracts::Contract; + /// use ibapi::market_data::realtime::{BarSize, WhatToShow}; + /// use ibapi::Client; + /// + /// let connection_url = "127.0.0.1:4002"; + /// let client = Client::connect(connection_url, 100).expect("connection to TWS failed!"); + /// + /// // Request real-time bars data for AAPL with 5-second intervals + /// let contract = Contract::stock("AAPL"); + /// let subscription = client + /// .realtime_bars(&contract, BarSize::Sec5, WhatToShow::Trades, false) + /// .expect("realtime bars request failed!"); + /// + /// loop { + /// // Process all currently available data + /// while let Some(bar) = subscription.try_next() { + /// println!("Received bar: {bar:?}"); + /// } + /// + /// // Check if we stopped due to an error + /// if let Some(err) = subscription.error() { + /// eprintln!("subscription error: {err}"); + /// break; + /// } + /// + /// // No data currently available, perform other work + /// // The subscription remains active and can be checked again + /// } /// - /// ```text - /// //loop { - /// // Check if the next bar is available without waiting - /// //if let Some(bar) = subscription.try_next() { - /// // Process the available bar (e.g., use it in calculations) - /// //} - /// // Perform other work before checking for the next bar - /// //} /// ``` + /// # Returns + /// * `Some(T)` - The next available item from the subscription + /// * `None` - If no data is immediately available or if an error occurred pub fn try_next(&self) -> Option { self.process_response(self.subscription.try_next()) } - /// To request the next bar in a non-blocking manner. + /// Polls the subscription for the next item, waiting up to the specified timeout duration. + /// + /// This method provides a middle ground between [try_next](Subscription::try_next) and + /// [next](Subscription::next): + /// - Unlike try_next, it will wait for data to arrive + /// - Unlike next, it will only wait for a specified duration + /// - Returns None if no data arrives within the timeout period + /// + /// # Example + /// + /// ```no_run + /// use ibapi::contracts::Contract; + /// use ibapi::market_data::realtime::{BarSize, WhatToShow}; + /// use ibapi::Client; + /// use std::time::Duration; + /// + /// let connection_url = "127.0.0.1:4002"; + /// let client = Client::connect(connection_url, 100).expect("connection to TWS failed!"); + /// + /// // Request real-time bars data for AAPL with 5-second intervals + /// let contract = Contract::stock("AAPL"); + /// let subscription = client + /// .realtime_bars(&contract, BarSize::Sec5, WhatToShow::Trades, false) + /// .expect("realtime bars request failed!"); + /// + /// loop { + /// // Wait up to 1 second for each new piece of data + /// while let Some(bar) = subscription.next_timeout(Duration::from_secs(1)) { + /// println!("Received bar: {bar:?}"); + /// } + /// + /// // Check if we stopped due to an error + /// if let Some(err) = subscription.error() { + /// eprintln!("subscription error: {err}"); + /// break; + /// } + /// + /// // Either timeout occurred or no more data available + /// // Perform other work before checking again + /// } /// - /// ```text - /// //loop { - /// // Check if the next bar is available without waiting - /// // if let Some(bar) = subscription.next_timeout() { - /// // Process the available bar (e.g., use it in calculations) - /// // } - /// // Perform other work before checking for the next bar - /// //} /// ``` + /// + /// # Arguments + /// * `timeout` - Maximum duration to wait for the next item + /// + /// # Returns + /// * `Some(T)` - The next available item from the subscription + /// * `None` - If no data arrives within the timeout period or if an error occurred + /// + /// # See also + /// - [Subscription::next] - For blocking access without timeout + /// - [Subscription::try_next] - For immediate non-blocking access + /// - [Subscription::error] - For checking error status pub fn next_timeout(&self, timeout: Duration) -> Option { self.process_response(self.subscription.next_timeout(timeout)) } @@ -1772,18 +1919,137 @@ impl<'a, T: DataStream> Subscription<'a, T> { } } + /// Creates an iterator from the [Subscription] that blocks until the next item is available. + /// + /// The iterator does not consume the [Subscription], allowing you to explicitly cancel the subscription at any time using the [cancel](Subscription::cancel) method. + /// + /// # Example + /// + /// ```no_run + /// use ibapi::contracts::Contract; + /// use ibapi::market_data::realtime::{BarSize, WhatToShow}; + /// use ibapi::Client; + /// + /// let connection_url = "127.0.0.1:4002"; + /// let client = Client::connect(connection_url, 100).expect("connection to TWS failed!"); + /// + /// // Request real-time bars data for AAPL with 5-second intervals + /// let contract = Contract::stock("AAPL"); + /// let subscription = client + /// .realtime_bars(&contract, BarSize::Sec5, WhatToShow::Trades, false) + /// .expect("realtime bars request failed!"); + /// + /// // Create an iterator that blocks until the next item is available. + /// for bar in subscription.iter() { + /// // Process each bar here (e.g., print or use in calculations) + /// println!("Received bar: {bar:?}"); + /// } + /// // The subscription is still in scope and can be explicitly canceled. + /// ``` + /// # Returns + /// A [SubscriptionIter] that yields items as they become available, blocking if necessary. pub fn iter(&self) -> SubscriptionIter { SubscriptionIter { subscription: self } } + /// Creates an iterator from the [Subscription] that returns the next bar if available without waiting. + /// + /// The iterator does not consume the [Subscription], allowing you to explicitly cancel the subscription at any time using the [cancel](Subscription::cancel) method. + /// + /// # Example + /// + /// ```no_run + /// use ibapi::contracts::Contract; + /// use ibapi::market_data::realtime::{BarSize, WhatToShow}; + /// use ibapi::Client; + /// use std::thread; + /// use std::time::Duration; + /// + /// let connection_url = "127.0.0.1:4002"; + /// let client = Client::connect(connection_url, 100).expect("connection to TWS failed!"); + /// + /// // Request real-time bars data for AAPL with 5-second intervals + /// let contract = Contract::stock("AAPL"); + /// let subscription = client + /// .realtime_bars(&contract, BarSize::Sec5, WhatToShow::Trades, false) + /// .expect("realtime bars request failed!"); + /// + /// // Process data in a non-blocking way. + /// loop { + /// // Create an iterator that returns the next bar without waiting. + /// for bar in subscription.try_iter() { + /// // Process all available data here. + /// } + /// + /// // Perform other work between checking for data. + /// // The subscription remains active and can be cancelled when needed. + /// + /// // Optional: Add a small delay to prevent excessive CPU usage + /// thread::sleep(Duration::from_secs(1)); + /// } + /// ``` + /// # Returns + /// A [SubscriptionTryIter] that yields items if they are available, without waiting. pub fn try_iter(&self) -> SubscriptionTryIter { SubscriptionTryIter { subscription: self } } + /// Creates an iterator from the [Subscription] that waits until specified timeout for available data. + /// + /// Similar to [try_iter](Subscription::try_iter), this iterator does not consume the [Subscription], + /// allowing you to explicitly cancel the subscription at any time using the [cancel](Subscription::cancel) method. + /// However, unlike try_iter which returns immediately, this iterator will wait up to the specified timeout + /// duration before yielding data or returning. + /// + /// # Example + /// + /// ```no_run + /// use ibapi::contracts::Contract; + /// use ibapi::market_data::realtime::{BarSize, WhatToShow}; + /// use ibapi::Client; + /// use std::thread; + /// use std::time::Duration; + /// + /// let connection_url = "127.0.0.1:4002"; + /// let client = Client::connect(connection_url, 100).expect("connection to TWS failed!"); + /// + /// // Request real-time bars data for AAPL with 5-second intervals + /// let contract = Contract::stock("AAPL"); + /// let subscription = client + /// .realtime_bars(&contract, BarSize::Sec5, WhatToShow::Trades, false) + /// .expect("realtime bars request failed!"); + /// + /// // Process data with a 1-second timeout between checks + /// loop { + /// // Iterator will wait up to 1 second for new data before continuing + /// for bar in subscription.timeout_iter(Duration::from_secs(1)) { + /// // Process all available data here. + /// } + /// + /// // If no data arrives within timeout, loop continues here + /// // Perform other work between checking for data. + /// // The subscription remains active and can be cancelled when needed. + /// } + /// ``` + /// + /// # Arguments + /// * `timeout` - Maximum duration to wait for data before continuing iteration + /// + /// # Returns + /// A [SubscriptionTimeoutIter] that waits for the specified timeout duration for available data. pub fn timeout_iter(&self, timeout: Duration) -> SubscriptionTimeoutIter { SubscriptionTimeoutIter { subscription: self, timeout } } + /// Returns any error that caused the [Subscription] to stop receiving data. + /// + /// A [Subscription] may stop yielding items either because there is no more data available + /// or because it encountered an error condition (e.g., network disconnection). + /// This method allows checking if an error occurred and retrieving the error details. + /// + /// # Returns + /// * `Some(Error)` - If an error occurred that stopped the subscription + /// * `None` - If no error has occurred (subscription may still be active or completed normally) pub fn error(&self) -> Option { let error = self.error.lock().unwrap(); error.clone() @@ -1801,6 +2067,14 @@ impl<'a, T: DataStream> Drop for Subscription<'a, T> { } } +/// Internal trait for types that can be streamed from TWS/Gateway responses. +/// +/// Implementors must provide: +/// - A decode method to convert response messages into the target type +/// - Optionally, a cancel message generator for cleaning up subscriptions +/// +/// This trait is used internally by the Subscription system to handle +/// different types of streaming data from IB. pub(crate) trait DataStream { const RESPONSE_MESSAGE_IDS: &[IncomingMessages] = &[]; @@ -1810,7 +2084,7 @@ pub(crate) trait DataStream { } } -/// Blocking iterator. Blocks until next item available. +/// An iterator that yields items as they become available, blocking if necessary. #[allow(private_bounds)] pub struct SubscriptionIter<'a, T: DataStream> { subscription: &'a Subscription<'a, T>, @@ -1854,7 +2128,7 @@ impl<'a, T: DataStream + 'a> IntoIterator for Subscription<'a, T> { } } -/// Non-Blocking iterator. Returns immediately if not available. +/// An iterator that yields items if they are available, without waiting. #[allow(private_bounds)] pub struct SubscriptionTryIter<'a, T: DataStream> { subscription: &'a Subscription<'a, T>, @@ -1868,7 +2142,7 @@ impl<'a, T: DataStream> Iterator for SubscriptionTryIter<'a, T> { } } -/// Blocks and waits for timeout +/// An iterator that waits for the specified timeout duration for available data. #[allow(private_bounds)] pub struct SubscriptionTimeoutIter<'a, T: DataStream> { subscription: &'a Subscription<'a, T>,