Skip to content

Commit

Permalink
refactor subscription use
Browse files Browse the repository at this point in the history
  • Loading branch information
Wil Boayue committed Nov 9, 2024
1 parent 9c445ef commit 9577a84
Showing 1 changed file with 21 additions and 17 deletions.
38 changes: 21 additions & 17 deletions src/market_data/historical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,13 +302,14 @@ pub(crate) fn head_timestamp(client: &Client, contract: &Contract, what_to_show:

let request_id = client.next_request_id();
let request = encoders::encode_request_head_timestamp(request_id, contract, what_to_show, use_rth)?;

let subscription = client.send_request(request_id, request)?;

if let Some(Ok(mut message)) = subscription.next() {
decoders::decode_head_timestamp(&mut message)
} else {
Err(Error::Simple("did not receive head timestamp message".into()))
match subscription.next() {
Some(Ok(mut message)) if message.message_type() == IncomingMessages::HeadTimestamp => Ok(decoders::decode_head_timestamp(&mut message)?),
Some(Ok(message)) => Err(Error::UnexpectedResponse(message)),
Some(Err(Error::ConnectionReset)) => head_timestamp(client, contract, what_to_show, use_rth),
Some(Err(e)) => Err(e),
None => Err(Error::UnexpectedEndOfStream),
}
}

Expand Down Expand Up @@ -352,20 +353,23 @@ pub(crate) fn historical_data(

let subscription = client.send_request(request_id, request)?;

if let Some(Ok(mut message)) = subscription.next() {
let time_zone = if let Some(tz) = client.time_zone {
tz
} else {
warn!("server timezone unknown. assuming UTC, but that may be incorrect!");
time_tz::timezones::db::UTC
};
match message.message_type() {
IncomingMessages::HistoricalData => decoders::decode_historical_data(client.server_version, time_zone, &mut message),
IncomingMessages::Error => Err(Error::Simple(message.peek_string(4))),
_ => Err(Error::Simple(format!("unexpected message: {:?}", message.message_type()))),
match subscription.next() {
Some(Ok(mut message)) if message.message_type() == IncomingMessages::HistoricalData => {
Ok(decoders::decode_historical_data(client.server_version, time_zone(client), &mut message)?)
}
Some(Ok(message)) => Err(Error::UnexpectedResponse(message)),
Some(Err(Error::ConnectionReset)) => historical_data(client, contract, end_date, duration, bar_size, what_to_show, use_rth),
Some(Err(e)) => Err(e),
None => Err(Error::UnexpectedEndOfStream),
}
}

fn time_zone(client: &Client) -> &time_tz::Tz {
if let Some(tz) = client.time_zone {
tz
} else {
Err(Error::Simple("did not receive historical data response".into()))
warn!("server timezone unknown. assuming UTC, but that may be incorrect!");
time_tz::timezones::db::UTC
}
}

Expand Down

0 comments on commit 9577a84

Please sign in to comment.