Skip to content

Commit

Permalink
implement into iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
Wil Boayue committed Nov 9, 2024
1 parent dfdd6e3 commit 0a3d31e
Showing 1 changed file with 52 additions and 56 deletions.
108 changes: 52 additions & 56 deletions src/market_data/historical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::fmt::{Debug, Display};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

use log::{debug, error, warn};
use log::{debug, warn};
use serde::{Deserialize, Serialize};
use time::{Date, OffsetDateTime};

Expand Down Expand Up @@ -500,35 +500,32 @@ pub(crate) fn histogram_data(client: &Client, contract: &Contract, use_rth: bool
}

pub trait TickDecoder<T> {
const MESSAGE_TYPE: IncomingMessages;
fn decode(message: &mut ResponseMessage) -> Result<(Vec<T>, bool), Error>;
fn message_type() -> IncomingMessages;
}

impl TickDecoder<TickBidAsk> for TickBidAsk {
const MESSAGE_TYPE: IncomingMessages = IncomingMessages::HistoricalTickBidAsk;

fn decode(message: &mut ResponseMessage) -> Result<(Vec<TickBidAsk>, bool), Error> {
decoders::decode_historical_ticks_bid_ask(message)
}
fn message_type() -> IncomingMessages {
IncomingMessages::HistoricalTickBidAsk
}
}

impl TickDecoder<TickLast> for TickLast {
const MESSAGE_TYPE: IncomingMessages = IncomingMessages::HistoricalTickLast;

fn decode(message: &mut ResponseMessage) -> Result<(Vec<TickLast>, bool), Error> {
decoders::decode_historical_ticks_last(message)
}
fn message_type() -> IncomingMessages {
IncomingMessages::HistoricalTickLast
}
}

impl TickDecoder<TickMidpoint> for TickMidpoint {
const MESSAGE_TYPE: IncomingMessages = IncomingMessages::HistoricalTick;

fn decode(message: &mut ResponseMessage) -> Result<(Vec<TickMidpoint>, bool), Error> {
decoders::decode_historical_ticks_mid_point(message)
}
fn message_type() -> IncomingMessages {
IncomingMessages::HistoricalTick
}
}

pub struct TickSubscription<T: TickDecoder<T>> {
Expand All @@ -548,6 +545,21 @@ impl<T: TickDecoder<T>> TickSubscription<T> {
}
}

pub fn iter(&self) -> TickSubscriptionIter<T> {
TickSubscriptionIter { subscription: self }
}

pub fn try_iter(&self) -> TickSubscriptionTryIter<T> {
TickSubscriptionTryIter { subscription: self }
}

pub fn timeout_iter(&self, duration: std::time::Duration) -> TickSubscriptionTimeoutIter<T> {
TickSubscriptionTimeoutIter {
subscription: self,
timeout: duration,
}
}

pub fn next(&self) -> Option<T> {
self.clear_error();

Expand All @@ -560,7 +572,7 @@ impl<T: TickDecoder<T>> TickSubscription<T> {
}

match self.messages.next() {
Some(Ok(message)) if message.message_type() == T::message_type() => {
Some(Ok(message)) if message.message_type() == T::MESSAGE_TYPE => {
self.fill_buffer(message);
self.next()
}
Expand Down Expand Up @@ -588,7 +600,7 @@ impl<T: TickDecoder<T>> TickSubscription<T> {
}

match self.messages.try_next() {
Some(Ok(message)) if message.message_type() == T::message_type() => {
Some(Ok(message)) if message.message_type() == T::MESSAGE_TYPE => {
self.fill_buffer(message);
self.try_next()
}
Expand Down Expand Up @@ -616,7 +628,7 @@ impl<T: TickDecoder<T>> TickSubscription<T> {
}

match self.messages.next_timeout(duration) {
Some(Ok(message)) if message.message_type() == T::message_type() => {
Some(Ok(message)) if message.message_type() == T::MESSAGE_TYPE => {
self.fill_buffer(message);
self.next_timeout(duration)
}
Expand Down Expand Up @@ -657,66 +669,50 @@ impl<T: TickDecoder<T>> TickSubscription<T> {
}
}

impl<T: TickDecoder<T> + Debug> Iterator for TickSubscription<T> {
/// An iterator that yields items as they become available, blocking if necessary.
pub struct TickSubscriptionIter<'a, T: TickDecoder<T>> {
subscription: &'a TickSubscription<T>,
}

impl<'a, T: TickDecoder<T>> Iterator for TickSubscriptionIter<'a, T> {
type Item = T;

fn next(&mut self) -> Option<Self::Item> {
{
let mut buffer = self.buffer.lock().unwrap();
if !buffer.is_empty() {
return buffer.pop_front();
}
}
self.subscription.next()
}
}

if self.done.load(Ordering::Relaxed) {
return None;
}
impl<'a, T: TickDecoder<T>> IntoIterator for &'a TickSubscription<T> {
type Item = T;
type IntoIter = TickSubscriptionIter<'a, T>;

loop {
match self.messages.next() {
Some(Ok(mut message)) => {
if message.message_type() == Self::Item::message_type() {
let mut buffer = self.buffer.lock().unwrap();

let (ticks, done) = Self::Item::decode(&mut message).unwrap();

buffer.append(&mut ticks.into());
self.done.store(done, Ordering::Relaxed);

if buffer.is_empty() && self.done.load(Ordering::Relaxed) {
return None;
}

if !buffer.is_empty() {
return buffer.pop_front();
}
} else if message.message_type() == IncomingMessages::Error {
error!("error reading ticks: {:?}", message.peek_string(4));
return None;
} else {
error!("unexpected message: {:?}", message)
}
}
// TODO enumerate
_ => return None,
}
}
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}

/// An iterator that yields items as they become available, blocking if necessary.
pub struct TickSubscriptionIter<'a, T: TickDecoder<T>> {
subscription: &'a TickSubscription<T>,
pub struct TickSubscriptionOwnedIter<T: TickDecoder<T>> {
subscription: TickSubscription<T>,
}

impl<'a, T: TickDecoder<T>> Iterator for TickSubscriptionIter<'a, T> {
impl<T: TickDecoder<T>> Iterator for TickSubscriptionOwnedIter<T> {
type Item = T;

fn next(&mut self) -> Option<Self::Item> {
self.subscription.next()
}
}

impl<T: TickDecoder<T>> IntoIterator for TickSubscription<T> {
type Item = T;
type IntoIter = TickSubscriptionOwnedIter<T>;

fn into_iter(self) -> Self::IntoIter {
TickSubscriptionOwnedIter { subscription: self }
}
}

/// An iterator that yields items if they are available, without waiting.
pub struct TickSubscriptionTryIter<'a, T: TickDecoder<T>> {
subscription: &'a TickSubscription<T>,
Expand Down

0 comments on commit 0a3d31e

Please sign in to comment.