From 0a3d31ecf721afcaf971216f0c223ca65f722312 Mon Sep 17 00:00:00 2001 From: Wil Boayue Date: Sat, 9 Nov 2024 15:38:44 -0800 Subject: [PATCH] implement into iterator --- src/market_data/historical.rs | 108 ++++++++++++++++------------------ 1 file changed, 52 insertions(+), 56 deletions(-) diff --git a/src/market_data/historical.rs b/src/market_data/historical.rs index bf4ac277..d0b23579 100644 --- a/src/market_data/historical.rs +++ b/src/market_data/historical.rs @@ -3,7 +3,7 @@ use std::fmt::{Debug, Display}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Mutex; -use log::{debug, error, warn}; +use log::{debug, warn}; use serde::{Deserialize, Serialize}; use time::{Date, OffsetDateTime}; @@ -500,35 +500,32 @@ pub(crate) fn histogram_data(client: &Client, contract: &Contract, use_rth: bool } pub trait TickDecoder { + const MESSAGE_TYPE: IncomingMessages; fn decode(message: &mut ResponseMessage) -> Result<(Vec, bool), Error>; - fn message_type() -> IncomingMessages; } impl TickDecoder for TickBidAsk { + const MESSAGE_TYPE: IncomingMessages = IncomingMessages::HistoricalTickBidAsk; + fn decode(message: &mut ResponseMessage) -> Result<(Vec, bool), Error> { decoders::decode_historical_ticks_bid_ask(message) } - fn message_type() -> IncomingMessages { - IncomingMessages::HistoricalTickBidAsk - } } impl TickDecoder for TickLast { + const MESSAGE_TYPE: IncomingMessages = IncomingMessages::HistoricalTickLast; + fn decode(message: &mut ResponseMessage) -> Result<(Vec, bool), Error> { decoders::decode_historical_ticks_last(message) } - fn message_type() -> IncomingMessages { - IncomingMessages::HistoricalTickLast - } } impl TickDecoder for TickMidpoint { + const MESSAGE_TYPE: IncomingMessages = IncomingMessages::HistoricalTick; + fn decode(message: &mut ResponseMessage) -> Result<(Vec, bool), Error> { decoders::decode_historical_ticks_mid_point(message) } - fn message_type() -> IncomingMessages { - IncomingMessages::HistoricalTick - } } pub struct TickSubscription> { @@ -548,6 +545,21 @@ impl> TickSubscription { } } + pub fn iter(&self) -> TickSubscriptionIter { + TickSubscriptionIter { subscription: self } + } + + pub fn try_iter(&self) -> TickSubscriptionTryIter { + TickSubscriptionTryIter { subscription: self } + } + + pub fn timeout_iter(&self, duration: std::time::Duration) -> TickSubscriptionTimeoutIter { + TickSubscriptionTimeoutIter { + subscription: self, + timeout: duration, + } + } + pub fn next(&self) -> Option { self.clear_error(); @@ -560,7 +572,7 @@ impl> TickSubscription { } match self.messages.next() { - Some(Ok(message)) if message.message_type() == T::message_type() => { + Some(Ok(message)) if message.message_type() == T::MESSAGE_TYPE => { self.fill_buffer(message); self.next() } @@ -588,7 +600,7 @@ impl> TickSubscription { } match self.messages.try_next() { - Some(Ok(message)) if message.message_type() == T::message_type() => { + Some(Ok(message)) if message.message_type() == T::MESSAGE_TYPE => { self.fill_buffer(message); self.try_next() } @@ -616,7 +628,7 @@ impl> TickSubscription { } match self.messages.next_timeout(duration) { - Some(Ok(message)) if message.message_type() == T::message_type() => { + Some(Ok(message)) if message.message_type() == T::MESSAGE_TYPE => { self.fill_buffer(message); self.next_timeout(duration) } @@ -657,59 +669,34 @@ impl> TickSubscription { } } -impl + Debug> Iterator for TickSubscription { +/// An iterator that yields items as they become available, blocking if necessary. +pub struct TickSubscriptionIter<'a, T: TickDecoder> { + subscription: &'a TickSubscription, +} + +impl<'a, T: TickDecoder> Iterator for TickSubscriptionIter<'a, T> { type Item = T; fn next(&mut self) -> Option { - { - let mut buffer = self.buffer.lock().unwrap(); - if !buffer.is_empty() { - return buffer.pop_front(); - } - } + self.subscription.next() + } +} - if self.done.load(Ordering::Relaxed) { - return None; - } +impl<'a, T: TickDecoder> IntoIterator for &'a TickSubscription { + type Item = T; + type IntoIter = TickSubscriptionIter<'a, T>; - loop { - match self.messages.next() { - Some(Ok(mut message)) => { - if message.message_type() == Self::Item::message_type() { - let mut buffer = self.buffer.lock().unwrap(); - - let (ticks, done) = Self::Item::decode(&mut message).unwrap(); - - buffer.append(&mut ticks.into()); - self.done.store(done, Ordering::Relaxed); - - if buffer.is_empty() && self.done.load(Ordering::Relaxed) { - return None; - } - - if !buffer.is_empty() { - return buffer.pop_front(); - } - } else if message.message_type() == IncomingMessages::Error { - error!("error reading ticks: {:?}", message.peek_string(4)); - return None; - } else { - error!("unexpected message: {:?}", message) - } - } - // TODO enumerate - _ => return None, - } - } + fn into_iter(self) -> Self::IntoIter { + self.iter() } } /// An iterator that yields items as they become available, blocking if necessary. -pub struct TickSubscriptionIter<'a, T: TickDecoder> { - subscription: &'a TickSubscription, +pub struct TickSubscriptionOwnedIter> { + subscription: TickSubscription, } -impl<'a, T: TickDecoder> Iterator for TickSubscriptionIter<'a, T> { +impl> Iterator for TickSubscriptionOwnedIter { type Item = T; fn next(&mut self) -> Option { @@ -717,6 +704,15 @@ impl<'a, T: TickDecoder> Iterator for TickSubscriptionIter<'a, T> { } } +impl> IntoIterator for TickSubscription { + type Item = T; + type IntoIter = TickSubscriptionOwnedIter; + + fn into_iter(self) -> Self::IntoIter { + TickSubscriptionOwnedIter { subscription: self } + } +} + /// An iterator that yields items if they are available, without waiting. pub struct TickSubscriptionTryIter<'a, T: TickDecoder> { subscription: &'a TickSubscription,