Skip to content

Commit

Permalink
188 fix delayed realtime stream (#189)
Browse files Browse the repository at this point in the history
* checkpoint

* improve validation

* implement clone, partialeq
  • Loading branch information
wboayue authored Dec 7, 2024
1 parent ce1680e commit 60d9d1f
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 4 deletions.
4 changes: 0 additions & 4 deletions src/market_data/realtime/decoders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ use super::{
mod tests;

pub(super) fn decode_realtime_bar(message: &mut ResponseMessage) -> Result<Bar, Error> {
if message.len() < 11 {
return Err(Error::Simple("Invalid message length".into()));
}

message.skip(); // message type
message.skip(); // message version
message.skip(); // message request id
Expand Down
40 changes: 40 additions & 0 deletions src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,10 @@ impl ResponseMessage {
}

pub fn peek_int(&self, i: usize) -> Result<i32, Error> {
if self.i >= self.fields.len() {
return Err(Error::Simple("expected int and found end of message".into()));
}

let field = &self.fields[i];
match field.parse() {
Ok(val) => Ok(val),
Expand All @@ -451,6 +455,10 @@ impl ResponseMessage {
}

pub fn next_int(&mut self) -> Result<i32, Error> {
if self.i >= self.fields.len() {
return Err(Error::Simple("expected int and found end of message".into()));
}

let field = &self.fields[self.i];
self.i += 1;

Expand All @@ -461,6 +469,10 @@ impl ResponseMessage {
}

pub fn next_optional_int(&mut self) -> Result<Option<i32>, Error> {
if self.i >= self.fields.len() {
return Err(Error::Simple("expected optional int and found end of message".into()));
}

let field = &self.fields[self.i];
self.i += 1;

Expand All @@ -475,13 +487,21 @@ impl ResponseMessage {
}

pub fn next_bool(&mut self) -> Result<bool, Error> {
if self.i >= self.fields.len() {
return Err(Error::Simple("expected bool and found end of message".into()));
}

let field = &self.fields[self.i];
self.i += 1;

Ok(field == "1")
}

pub fn next_long(&mut self) -> Result<i64, Error> {
if self.i >= self.fields.len() {
return Err(Error::Simple("expected long and found end of message".into()));
}

let field = &self.fields[self.i];
self.i += 1;

Expand All @@ -492,6 +512,10 @@ impl ResponseMessage {
}

pub fn next_optional_long(&mut self) -> Result<Option<i64>, Error> {
if self.i >= self.fields.len() {
return Err(Error::Simple("expected optional long and found end of message".into()));
}

let field = &self.fields[self.i];
self.i += 1;

Expand All @@ -506,6 +530,10 @@ impl ResponseMessage {
}

pub fn next_date_time(&mut self) -> Result<OffsetDateTime, Error> {
if self.i >= self.fields.len() {
return Err(Error::Simple("expected datetime and found end of message".into()));
}

let field = &self.fields[self.i];
self.i += 1;

Expand All @@ -522,12 +550,20 @@ impl ResponseMessage {
}

pub fn next_string(&mut self) -> Result<String, Error> {
if self.i >= self.fields.len() {
return Err(Error::Simple("expected string and found end of message".into()));
}

let field = &self.fields[self.i];
self.i += 1;
Ok(String::from(field))
}

pub fn next_double(&mut self) -> Result<f64, Error> {
if self.i >= self.fields.len() {
return Err(Error::Simple("expected double and found end of message".into()));
}

let field = &self.fields[self.i];
self.i += 1;

Expand All @@ -542,6 +578,10 @@ impl ResponseMessage {
}

pub fn next_optional_double(&mut self) -> Result<Option<f64>, Error> {
if self.i >= self.fields.len() {
return Err(Error::Simple("expected optional double and found end of message".into()));
}

let field = &self.fields[self.i];
self.i += 1;

Expand Down
2 changes: 2 additions & 0 deletions src/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub(super) fn scanner_parameters(client: &Client) -> Result<String, Error> {
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
/// Scanner subscription parameters.
pub struct ScannerSubscription {
/// The number of rows to be returned for the query
pub number_of_rows: i32,
Expand Down

0 comments on commit 60d9d1f

Please sign in to comment.