diff --git a/examples/breakout.rs b/examples/breakout.rs index 09193616..ad329f5e 100644 --- a/examples/breakout.rs +++ b/examples/breakout.rs @@ -15,7 +15,7 @@ fn main() { let mut channel = BreakoutChannel::new(30); - for bar in bars { + for bar in bars.iter() { channel.add_bar(&bar); // Ensure enough bars and no open positions. diff --git a/examples/pnl.rs b/examples/pnl.rs index 74381dc8..423d6d7b 100644 --- a/examples/pnl.rs +++ b/examples/pnl.rs @@ -15,7 +15,7 @@ fn main() { let client = Client::connect(&gateway_url, 919).expect("connection failed"); - let mut subscription = client.pnl(&account, None).expect("pnl request failed"); + let subscription = client.pnl(&account, None).expect("pnl request failed"); // Get next item non-blocking if let Some(pnl) = subscription.try_next() { diff --git a/examples/pnl_single.rs b/examples/pnl_single.rs index 343def47..003dd33b 100644 --- a/examples/pnl_single.rs +++ b/examples/pnl_single.rs @@ -18,7 +18,7 @@ fn main() { let client = Client::connect(&gateway_url, 919).expect("connection failed"); - let mut subscription = client.pnl_single(&account, contract_id, None).expect("pnl single request failed"); + let subscription = client.pnl_single(&account, contract_id, None).expect("pnl single request failed"); // Get next item non-blocking if let Some(pnl) = subscription.try_next() { diff --git a/examples/positions.rs b/examples/positions.rs index 9b2eb2cc..341a4ce8 100644 --- a/examples/positions.rs +++ b/examples/positions.rs @@ -3,7 +3,7 @@ use ibapi::{accounts::PositionUpdate, Client}; fn main() { let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed"); - let mut positions = client.positions().expect("request failed"); + let positions = client.positions().expect("request failed"); while let Some(position_update) = positions.next() { match position_update { PositionUpdate::Position(position) => { diff --git a/examples/positions_multi.rs b/examples/positions_multi.rs index efc69110..7dbb96ad 100644 --- a/examples/positions_multi.rs +++ b/examples/positions_multi.rs @@ -8,7 +8,7 @@ pub fn main() { let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed"); let subscription = client.positions_multi(Some(&account), None).expect("error requesting positions by model"); - for position in subscription { + for position in subscription.iter() { println!("{position:?}") } } diff --git a/examples/readme_realtime_data_1.rs b/examples/readme_realtime_data_1.rs index 299e01c2..514d0ce9 100644 --- a/examples/readme_realtime_data_1.rs +++ b/examples/readme_realtime_data_1.rs @@ -8,7 +8,7 @@ fn main() { // Request real-time bars data for AAPL with 5-second intervals let contract = Contract::stock("AAPL"); - let mut subscription = client + let subscription = client .realtime_bars(&contract, BarSize::Sec5, WhatToShow::Trades, false) .expect("realtime bars request failed!"); diff --git a/examples/readme_realtime_data_2.rs b/examples/readme_realtime_data_2.rs index a3069cc7..8f6ee8b5 100644 --- a/examples/readme_realtime_data_2.rs +++ b/examples/readme_realtime_data_2.rs @@ -10,10 +10,10 @@ fn main() { let contract_aapl = Contract::stock("AAPL"); let contract_nvda = Contract::stock("NVDA"); - let mut subscription_aapl = client + let subscription_aapl = client .realtime_bars(&contract_aapl, BarSize::Sec5, WhatToShow::Trades, false) .expect("realtime bars request failed!"); - let mut subscription_nvda = client + let subscription_nvda = client .realtime_bars(&contract_nvda, BarSize::Sec5, WhatToShow::Trades, false) .expect("realtime bars request failed!"); diff --git a/examples/stream_bars.rs b/examples/stream_bars.rs index 5d72a97d..4baf59d3 100644 --- a/examples/stream_bars.rs +++ b/examples/stream_bars.rs @@ -34,7 +34,7 @@ fn main() -> anyhow::Result<()> { let bars = client.realtime_bars(&contract, BarSize::Sec5, WhatToShow::Trades, false)?; - for (i, bar) in bars.enumerate().take(60) { + for (i, bar) in bars.iter().enumerate().take(60) { println!("bar: {i:?} {bar:?}"); } diff --git a/examples/tick_by_tick.rs b/examples/tick_by_tick.rs index 97f775dc..44521e65 100644 --- a/examples/tick_by_tick.rs +++ b/examples/tick_by_tick.rs @@ -102,7 +102,7 @@ fn stream_mid_point(client: &mut Client, _symbol: &str) -> anyhow::Result<()> { let contract = contract_es(); let ticks = client.tick_by_tick_midpoint(&contract, 0, false)?; - for (i, tick) in ticks.enumerate().take(60) { + for (i, tick) in ticks.iter().enumerate().take(60) { println!("tick: {i:?} {tick:?}"); } diff --git a/src/accounts.rs b/src/accounts.rs index 21115528..92b137bc 100644 --- a/src/accounts.rs +++ b/src/accounts.rs @@ -108,7 +108,7 @@ impl Subscribable for PositionUpdate { } fn cancel_message(_server_version: i32, _request_id: Option) -> Result { - Ok(encoders::encode_cancel_positions()?) + encoders::encode_cancel_positions() } } @@ -129,15 +129,15 @@ impl From for PositionUpdateMulti { #[derive(Debug, Clone, Default)] pub struct PositionMulti { /// The account holding the position. - account: String, + pub account: String, /// The model code holding the position. - model_code: String, + pub model_code: String, /// The position's Contract - contract: Contract, + pub contract: Contract, /// The number of positions held. - position: f64, + pub position: f64, /// The average cost of the position. - average_cost: f64, + pub average_cost: f64, } impl Subscribable for PositionUpdateMulti { @@ -153,7 +153,7 @@ impl Subscribable for PositionUpdateMulti { fn cancel_message(_server_version: i32, request_id: Option) -> Result { let request_id = request_id.expect("Request ID required to encode cancel positions multi"); - Ok(encoders::encode_cancel_positions_multi(request_id)?) + encoders::encode_cancel_positions_multi(request_id) } } @@ -207,11 +207,10 @@ pub(crate) fn positions_multi<'a>( pub(crate) fn family_codes(client: &Client) -> Result, Error> { client.check_server_version(server_versions::REQ_FAMILY_CODES, "It does not support family codes requests.")?; - let message = encoders::encode_request_family_codes()?; - - let mut messages = client.send_shared_request(OutgoingMessages::RequestFamilyCodes, message)?; + let request = encoders::encode_request_family_codes()?; + let subscription = client.send_shared_request(OutgoingMessages::RequestFamilyCodes, request)?; - if let Some(mut message) = messages.next() { + if let Some(mut message) = subscription.next() { decoders::decode_family_codes(&mut message) } else { Ok(Vec::default()) diff --git a/src/client.rs b/src/client.rs index 920ffd8c..2458013a 100644 --- a/src/client.rs +++ b/src/client.rs @@ -11,7 +11,7 @@ use time::macros::format_description; use time::OffsetDateTime; use time_tz::{timezones, OffsetResult, PrimitiveDateTimeExt, Tz}; -use crate::accounts::{FamilyCode, PnL, PnLSingle, PositionMulti, PositionUpdate, PositionUpdateMulti}; +use crate::accounts::{FamilyCode, PnL, PnLSingle, PositionUpdate, PositionUpdateMulti}; use crate::contracts::Contract; use crate::errors::Error; use crate::market_data::historical; @@ -222,7 +222,7 @@ impl Client { /// /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed"); /// let subscription = client.positions().expect("error requesting positions"); - /// for position_response in subscription { + /// for position_response in subscription.iter() { /// match position_response { /// PositionUpdate::Position(position) => println!("{position:?}"), /// PositionUpdate::PositionEnd => println!("initial set of positions received"), @@ -249,7 +249,7 @@ impl Client { /// /// let account = "U1234567"; /// let subscription = client.positions_multi(Some(account), None).expect("error requesting positions by model"); - /// for position in subscription { + /// for position in subscription.iter() { /// println!("{position:?}") /// } /// ``` @@ -270,8 +270,8 @@ impl Client { /// /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed"); /// let account = "account id"; - /// let responses = client.pnl(account, None).expect("error requesting pnl"); - /// for pnl in responses { + /// let subscription = client.pnl(account, None).expect("error requesting pnl"); + /// for pnl in subscription.iter() { /// println!("{pnl:?}") /// } /// ``` @@ -296,8 +296,8 @@ impl Client { /// let account = ""; /// let contract_id = 1001; /// - /// let responses = client.pnl_single(account, contract_id, None).expect("error requesting pnl"); - /// for pnl in responses { + /// let subscription = client.pnl_single(account, contract_id, None).expect("error requesting pnl"); + /// for pnl in &subscription { /// println!("{pnl:?}") /// } /// ``` @@ -895,9 +895,9 @@ impl Client { /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed"); /// /// let contract = Contract::stock("TSLA"); - /// let bars = client.realtime_bars(&contract, BarSize::Sec5, WhatToShow::Trades, false).expect("request failed"); + /// let subscription = client.realtime_bars(&contract, BarSize::Sec5, WhatToShow::Trades, false).expect("request failed"); /// - /// for (i, bar) in bars.enumerate().take(60) { + /// for (i, bar) in subscription.iter().enumerate().take(60) { /// println!("bar[{i}]: {bar:?}"); /// } /// ``` @@ -1031,7 +1031,11 @@ impl Debug for Client { } } +/// Server sends data until not required /// Supports the handling of responses from TWS. +/// Cancelled with dropped if not already cancelled. +/// +#[allow(private_bounds)] pub struct Subscription<'a, T: Subscribable> { pub(crate) client: &'a Client, pub(crate) request_id: Option, @@ -1039,10 +1043,35 @@ pub struct Subscription<'a, T: Subscribable> { pub(crate) phantom: PhantomData, } +#[allow(private_bounds)] impl<'a, T: Subscribable> Subscription<'a, T> { + /// Blocks until the item become available. + pub fn next(&self) -> Option { + loop { + if let Some(mut message) = self.responses.next() { + if T::RESPONSE_MESSAGE_IDS.contains(&message.message_type()) { + match T::decode(self.client.server_version(), &mut message) { + Ok(val) => return Some(val), + Err(err) => { + error!("error decoding execution data: {err}"); + } + } + } else if message.message_type() == IncomingMessages::Error { + let error_message = message.peek_string(4); + error!("{error_message}"); + return None; + } else { + error!("subscription iterator unexpected message: {message:?}"); + } + } else { + return None; + } + } + } + /// To request the next bar in a non-blocking manner. /// - /// ``` + /// ```text /// //loop { /// // Check if the next bar is available without waiting /// //if let Some(bar) = subscription.try_next() { @@ -1051,18 +1080,20 @@ impl<'a, T: Subscribable> Subscription<'a, T> { /// // Perform other work before checking for the next bar /// //} /// ``` - pub fn try_next(&mut self) -> Option { + pub fn try_next(&self) -> Option { if let Some(mut message) = self.responses.try_next() { - if T::RESPONSE_MESSAGE_IDS.contains(&message.message_type()) { - match T::decode(self.client.server_version(), &mut message) { - Ok(val) => return Some(val), - Err(err) => { - error!("error decoding execution data: {err}"); - return None; - } + if message.message_type() == IncomingMessages::Error { + error!("{}", message.peek_string(4)); + return None; + } + + match T::decode(self.client.server_version(), &mut message) { + Ok(val) => Some(val), + Err(err) => { + error!("error decoding message: {err}"); + None } } - None } else { None } @@ -1070,7 +1101,7 @@ impl<'a, T: Subscribable> Subscription<'a, T> { /// To request the next bar in a non-blocking manner. /// - /// ``` + /// ```text /// //loop { /// // Check if the next bar is available without waiting /// // if let Some(bar) = subscription.next_timeout() { @@ -1079,30 +1110,44 @@ impl<'a, T: Subscribable> Subscription<'a, T> { /// // Perform other work before checking for the next bar /// //} /// ``` - pub fn next_timeout(&mut self, timeout: Duration) -> Option { + pub fn next_timeout(&self, timeout: Duration) -> Option { if let Some(mut message) = self.responses.next_timeout(timeout) { - if T::RESPONSE_MESSAGE_IDS.contains(&message.message_type()) { - match T::decode(self.client.server_version(), &mut message) { - Ok(val) => return Some(val), - Err(err) => { - error!("error decoding execution data: {err}"); - return None; - } + if message.message_type() == IncomingMessages::Error { + error!("{}", message.peek_string(4)); + return None; + } + + match T::decode(self.client.server_version(), &mut message) { + Ok(val) => Some(val), + Err(err) => { + error!("error decoding message: {err}"); + None } } - None } else { None } } /// Cancel the subscription - pub fn cancel(&mut self) -> Result<(), Error> { + pub fn cancel(&self) -> Result<(), Error> { if let Ok(message) = T::cancel_message(self.client.server_version(), self.request_id) { self.client.send_message(message)?; } Ok(()) } + + pub fn iter(&self) -> SubscriptionIter { + SubscriptionIter { subscription: self } + } + + pub fn try_iter(&self) -> SubscriptionTryIter { + SubscriptionTryIter { subscription: self } + } + + pub fn timeout_iter(&self, timeout: Duration) -> SubscriptionTimeoutIter { + SubscriptionTimeoutIter { subscription: self, timeout } + } } impl<'a, T: Subscribable> Drop for Subscription<'a, T> { @@ -1123,34 +1168,59 @@ pub(crate) trait Subscribable { } } -impl<'a, T: Subscribable> Iterator for Subscription<'a, T> { +/// Blocking iterator. Blocks until next item available. +#[allow(private_bounds)] +pub struct SubscriptionIter<'a, T: Subscribable> { + subscription: &'a Subscription<'a, T>, +} + +impl<'a, T: Subscribable> Iterator for SubscriptionIter<'a, T> { type Item = T; - // Returns the next [Position]. Waits up to x seconds for next [OrderDataResult]. fn next(&mut self) -> Option { - loop { - if let Some(mut message) = self.responses.next() { - if T::RESPONSE_MESSAGE_IDS.contains(&message.message_type()) { - match T::decode(self.client.server_version(), &mut message) { - Ok(val) => return Some(val), - Err(err) => { - error!("error decoding execution data: {err}"); - } - } - } else if message.message_type() == IncomingMessages::Error { - let error_message = message.peek_string(4); - error!("{error_message}"); - return None; - } else { - error!("subscription iterator unexpected message: {message:?}"); - } - } else { - return None; - } - } + self.subscription.next() + } +} + +impl<'a, T: Subscribable> IntoIterator for &'a Subscription<'a, T> { + type Item = T; + type IntoIter = SubscriptionIter<'a, T>; + + fn into_iter(self) -> Self::IntoIter { + self.iter() + } +} + +/// Non-Blocking iterator. Returns immediately if not available. +#[allow(private_bounds)] +pub struct SubscriptionTryIter<'a, T: Subscribable> { + subscription: &'a Subscription<'a, T>, +} + +impl<'a, T: Subscribable> Iterator for SubscriptionTryIter<'a, T> { + type Item = T; + + fn next(&mut self) -> Option { + self.subscription.try_next() + } +} + +/// Blocks and waits for timeout +#[allow(private_bounds)] +pub struct SubscriptionTimeoutIter<'a, T: Subscribable> { + subscription: &'a Subscription<'a, T>, + timeout: Duration, +} + +impl<'a, T: Subscribable> Iterator for SubscriptionTimeoutIter<'a, T> { + type Item = T; + + fn next(&mut self) -> Option { + self.subscription.next_timeout(self.timeout) } } +/// Marker trait for shared channels pub trait SharesChannel {} // Parses following format: 20230405 22:20:39 PST diff --git a/src/contracts.rs b/src/contracts.rs index deb322ae..c46cfae8 100644 --- a/src/contracts.rs +++ b/src/contracts.rs @@ -403,7 +403,7 @@ pub(crate) fn contract_details(client: &Client, contract: &Contract) -> Result = Vec::default(); // TODO create iterator - for mut message in responses { + while let Some(mut message) = responses.next() { match message.message_type() { IncomingMessages::ContractData => { let decoded = decoders::contract_details(client.server_version(), &mut message)?; @@ -474,10 +474,9 @@ pub(crate) fn matching_symbols(client: &Client, pattern: &str) -> Result { return decoders::contract_descriptions(client.server_version(), &mut message); @@ -517,10 +516,9 @@ pub(crate) fn market_rule(client: &Client, market_rule_id: i32) -> Result Ok(decoders::market_rule(&mut message)?), None => Err(Error::Simple("no market rule found".into())), } diff --git a/src/lib.rs b/src/lib.rs index aa2fad80..37783cab 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -31,7 +31,7 @@ //! //! let mut channel = BreakoutChannel::new(30); //! -//! for bar in bars { +//! for bar in bars.iter() { //! channel.add_bar(&bar); //! //! // Ensure enough bars and no open positions. @@ -121,6 +121,8 @@ pub(crate) mod news; pub mod orders; mod server_versions; + +#[cfg(test)] pub(crate) mod stubs; #[doc(inline)] diff --git a/src/market_data/historical.rs b/src/market_data/historical.rs index e12ea22f..6bfe88f3 100644 --- a/src/market_data/historical.rs +++ b/src/market_data/historical.rs @@ -302,9 +302,9 @@ pub(crate) fn head_timestamp(client: &Client, contract: &Contract, what_to_show: let request_id = client.next_request_id(); let request = encoders::encode_request_head_timestamp(request_id, contract, what_to_show, use_rth)?; - let mut messages = client.send_request(request_id, request)?; + let subscription = client.send_request(request_id, request)?; - if let Some(mut message) = messages.next() { + if let Some(mut message) = subscription.next() { decoders::decode_head_timestamp(&mut message) } else { Err(Error::Simple("did not receive head timestamp message".into())) @@ -357,9 +357,9 @@ pub(crate) fn historical_data( Vec::::default(), )?; - let mut messages = client.send_request(request_id, request)?; + let subscription = client.send_request(request_id, request)?; - if let Some(mut message) = messages.next() { + if let Some(mut message) = subscription.next() { let time_zone = if let Some(tz) = client.time_zone { tz } else { @@ -408,9 +408,9 @@ pub(crate) fn historical_schedule( Vec::::default(), )?; - let mut messages = client.send_request(request_id, request)?; + let subscription = client.send_request(request_id, request)?; - if let Some(mut message) = messages.next() { + if let Some(mut message) = subscription.next() { match message.message_type() { IncomingMessages::HistoricalSchedule => decoders::decode_historical_schedule(&mut message), IncomingMessages::Error => Err(Error::Simple(message.peek_string(4))), diff --git a/src/messages.rs b/src/messages.rs index 1fb0aff8..8e2a8063 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -522,10 +522,6 @@ impl ResponseMessage { self.i += 1; } - pub fn reset(&mut self) { - self.i = 0; - } - pub fn encode(&self) -> String { let mut data = self.fields.join("\0"); data.push('\0'); diff --git a/src/orders.rs b/src/orders.rs index d6a5eafc..cbc41956 100644 --- a/src/orders.rs +++ b/src/orders.rs @@ -1371,9 +1371,9 @@ pub(crate) fn global_cancel(client: &Client) -> Result<(), Error> { pub(crate) fn next_valid_order_id(client: &Client) -> Result { let message = encoders::encode_next_valid_order_id()?; - let mut messages = client.send_shared_request(OutgoingMessages::RequestIds, message)?; + let subscription = client.send_shared_request(OutgoingMessages::RequestIds, message)?; - if let Some(message) = messages.next() { + if let Some(message) = subscription.next() { let order_id_index = 2; let next_order_id = message.peek_int(order_id_index)?; diff --git a/src/transport.rs b/src/transport.rs index 3ccaf9bb..4180b607 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -4,7 +4,6 @@ use std::collections::HashMap; use std::io::{prelude::*, Cursor}; -use std::iter::Iterator; use std::net::TcpStream; use std::sync::Mutex; use std::sync::{Arc, RwLock}; @@ -44,7 +43,8 @@ pub(crate) trait MessageBus: Send + Sync { // Starts a dedicated thread to process responses from TWS. fn process_messages(&mut self, server_version: i32) -> Result<(), Error>; - // Testing interface. Tracks requests sent when Bus is stubbed. + // Testing interface. Tracks requests sent messages when Bus is stubbed. + #[cfg(test)] fn request_messages(&self) -> Vec { vec![] } @@ -554,67 +554,76 @@ impl SenderHash>, // for client to receive incoming messages - shared_receiver: Option>>, - signaler: Option>, // for client to signal termination - request_id: Option, // initiating request_id - order_id: Option, // initiating order_id - timeout: Option, // How long to wait for next message + receiver: Option>, // requests with request ids receive responses via this channel + shared_receiver: Option>>, // this channel is for responses that share channel based on message type + signaler: Option>, // for client to signal termination + request_id: Option, // initiating request_id + order_id: Option, // initiating order_id } impl InternalSubscription { - pub(crate) fn new( - messages: Receiver, - signals: Sender, - request_id: Option, - order_id: Option, - timeout: Option, - ) -> Self { - InternalSubscription { - receiver: Some(messages), - shared_receiver: None, - signaler: Some(signals), - request_id, - order_id, - timeout, + // Blocks until next message become available. + pub(crate) fn next(&self) -> Option { + if let Some(receiver) = &self.receiver { + Self::receive(receiver) + } else if let Some(receiver) = &self.shared_receiver { + Self::receive(receiver) + } else { + None } } - pub(crate) fn try_next(&mut self) -> Option { + // Returns message if available or immediately returns None. + pub(crate) fn try_next(&self) -> Option { if let Some(receiver) = &self.receiver { - match receiver.try_recv() { - Ok(message) => Some(message), - Err(err) => { - debug!("try_next: {err}"); - None - } - } + Self::try_receive(receiver) } else if let Some(receiver) = &self.shared_receiver { - match receiver.try_recv() { - Ok(message) => Some(message), - Err(err) => { - debug!("try_next: {err}"); - None - } - } + Self::try_receive(receiver) } else { None } } - pub(crate) fn next_timeout(&mut self, timeout: Duration) -> Option { + // Waits for next message until specified timeout. + pub(crate) fn next_timeout(&self, timeout: Duration) -> Option { if let Some(receiver) = &self.receiver { - match receiver.recv_timeout(timeout) { - Ok(message) => Some(message), - Err(err) => { - info!("timeout receiving message: {err}"); - None - } - } + Self::timeout_receive(receiver, timeout) + } else if let Some(receiver) = &self.shared_receiver { + Self::timeout_receive(receiver, timeout) } else { None } } + + fn receive(receiver: &Receiver) -> Option { + match receiver.recv() { + Ok(message) => Some(message), + Err(err) => { + error!("error receiving message: {err}"); + None + } + } + } + + fn try_receive(receiver: &Receiver) -> Option { + match receiver.try_recv() { + Ok(message) => Some(message), + Err(err) => { + error!("error receiving message: {err}"); + None + } + } + } + + fn timeout_receive(receiver: &Receiver, timeout: Duration) -> Option { + match receiver.recv_timeout(timeout) { + Ok(message) => Some(message), + Err(err) => { + error!("error receiving message: {err}"); + None + } + } + } } impl Drop for InternalSubscription { @@ -629,39 +638,6 @@ impl Drop for InternalSubscription { } } -impl Iterator for InternalSubscription { - type Item = ResponseMessage; - fn next(&mut self) -> Option { - if let (Some(timeout), Some(receiver)) = (self.timeout, &self.receiver) { - match receiver.recv_timeout(timeout) { - Ok(message) => Some(message), - Err(err) => { - info!("timeout receiving message: {err}"); - None - } - } - } else if let Some(receiver) = &self.receiver { - match receiver.recv() { - Ok(message) => Some(message), - Err(err) => { - error!("error receiving message: {err}"); - None - } - } - } else if let Some(receiver) = &self.shared_receiver { - match receiver.recv() { - Ok(message) => Some(message), - Err(err) => { - error!("error receiving message: {err}"); - None - } - } - } else { - None - } - } -} - pub(crate) struct SubscriptionBuilder { receiver: Option>, shared_receiver: Option>>, @@ -708,26 +684,24 @@ impl SubscriptionBuilder { pub(crate) fn build(self) -> InternalSubscription { if let (Some(receiver), Some(signaler)) = (self.receiver, self.signaler) { - return InternalSubscription { + InternalSubscription { receiver: Some(receiver), shared_receiver: None, signaler: Some(signaler), request_id: self.request_id, order_id: self.order_id, - timeout: None, - }; + } } else if let Some(receiver) = self.shared_receiver { - return InternalSubscription { + InternalSubscription { receiver: None, shared_receiver: Some(receiver), signaler: None, request_id: self.request_id, order_id: self.order_id, - timeout: None, - }; + } + } else { + panic!("bad configuration"); } - - panic!("bad configuration"); } }