Skip to content

Commit

Permalink
Historic data refactor (#168)
Browse files Browse the repository at this point in the history
  • Loading branch information
wboayue authored Nov 10, 2024
1 parent c7b689a commit bd3aa78
Show file tree
Hide file tree
Showing 5 changed files with 277 additions and 151 deletions.
18 changes: 9 additions & 9 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,7 @@ impl Client {
number_of_ticks: i32,
use_rth: bool,
ignore_size: bool,
) -> Result<impl Iterator<Item = historical::TickBidAsk>, Error> {
) -> Result<historical::TickSubscription<historical::TickBidAsk>, Error> {
historical::historical_ticks_bid_ask(self, contract, start, end, number_of_ticks, use_rth, ignore_size)
}

Expand Down Expand Up @@ -968,7 +968,7 @@ impl Client {
end: Option<OffsetDateTime>,
number_of_ticks: i32,
use_rth: bool,
) -> Result<impl Iterator<Item = historical::TickMidpoint>, Error> {
) -> Result<historical::TickSubscription<historical::TickMidpoint>, Error> {
historical::historical_ticks_mid_point(self, contract, start, end, number_of_ticks, use_rth)
}

Expand Down Expand Up @@ -1008,7 +1008,7 @@ impl Client {
end: Option<OffsetDateTime>,
number_of_ticks: i32,
use_rth: bool,
) -> Result<impl Iterator<Item = historical::TickLast>, Error> {
) -> Result<historical::TickSubscription<historical::TickLast>, Error> {
historical::historical_ticks_trade(self, contract, start, end, number_of_ticks, use_rth)
}

Expand Down Expand Up @@ -1650,7 +1650,7 @@ pub struct Subscription<'a, T: DataStream<T>> {
cancelled: AtomicBool,
subscription: InternalSubscription,
response_context: ResponseContext,
error: Arc<Mutex<Option<Error>>>,
error: Mutex<Option<Error>>,
}

// Extra metadata that might be need
Expand All @@ -1672,7 +1672,7 @@ impl<'a, T: DataStream<T>> Subscription<'a, T> {
response_context: context,
phantom: PhantomData,
cancelled: AtomicBool::new(false),
error: Arc::new(Mutex::new(None)),
error: Mutex::new(None),
}
} else if let Some(order_id) = subscription.order_id {
Subscription {
Expand All @@ -1684,7 +1684,7 @@ impl<'a, T: DataStream<T>> Subscription<'a, T> {
response_context: context,
phantom: PhantomData,
cancelled: AtomicBool::new(false),
error: Arc::new(Mutex::new(None)),
error: Mutex::new(None),
}
} else if let Some(message_type) = subscription.message_type {
Subscription {
Expand All @@ -1696,7 +1696,7 @@ impl<'a, T: DataStream<T>> Subscription<'a, T> {
response_context: context,
phantom: PhantomData,
cancelled: AtomicBool::new(false),
error: Arc::new(Mutex::new(None)),
error: Mutex::new(None),
}
} else {
panic!("unsupported internal subscription: {:?}", subscription)
Expand Down Expand Up @@ -1740,8 +1740,6 @@ impl<'a, T: DataStream<T>> Subscription<'a, T> {
/// * `Some(T)` - The next available item from the subscription
/// * `None` - If the subscription has ended or encountered an error
pub fn next(&self) -> Option<T> {
self.clear_error();

match self.process_response(self.subscription.next()) {
Some(val) => Some(val),
None => match self.error() {
Expand All @@ -1755,6 +1753,8 @@ impl<'a, T: DataStream<T>> Subscription<'a, T> {
}

fn process_response(&self, response: Option<Result<ResponseMessage, Error>>) -> Option<T> {
self.clear_error();

match response {
Some(Ok(message)) => self.process_message(message),
Some(Err(e)) => {
Expand Down
Loading

0 comments on commit bd3aa78

Please sign in to comment.