1+ use arrayvec:: ArrayVec ;
12use std:: error:: Error ;
23use std:: { pin:: Pin , task:: Poll } ;
34
@@ -34,7 +35,8 @@ pin_project! {
3435 pub client_name: String ,
3536 pub exchange_name: u8 ,
3637 pub snapshot_enabled: bool ,
37- pub websocket_depth_buffer: Vec <DepthUpdate >,
38+ // todo: update webssocket_depth_buffer take the array vecs size by pragma or something similar -- it should be configurable
39+ pub websocket_depth_buffer: ArrayVec <DepthUpdate , 1000 >,
3840 pub pull_retry_count: u8 ,
3941 pub http_snapshot_uri: String ,
4042 pub buffer_websocket_depths: bool ,
@@ -46,8 +48,9 @@ pin_project! {
4648
4749 pub snapshot_sync: Option <stateSync<( ) >>,
4850
51+ // todo: update webssocket_depth_buffer take the array vecs size by pragma or something similar -- it should be configurable
4952 #[ pin]
50- buffer : Vec <DepthUpdate >,
53+ depth_update_buffer : ArrayVec <DepthUpdate , 1000 >,
5154 #[ pin]
5255 ws_connection_orderbook: Option <WebSocketStream <MaybeTlsStream <TcpStream >>>,
5356 #[ pin]
@@ -75,9 +78,7 @@ impl ExchangeStream {
7578 client_name : exchange_config. client_name . clone ( ) ,
7679 exchange_name : exchange_config. exchange_name ,
7780 snapshot_sync : Some ( snapshot_sync) ,
78- websocket_depth_buffer : Vec :: with_capacity (
79- exchange_config. ws_presequenced_depth_buffer ,
80- ) ,
81+ websocket_depth_buffer : ArrayVec :: new ( ) ,
8182 buffer_websocket_depths : false ,
8283 snapshot_enabled : exchange_config. snapshot_enabled ,
8384 pull_retry_count : 5 ,
@@ -88,7 +89,7 @@ impl ExchangeStream {
8889 watched_pair : exchange_config. watched_pair . clone ( ) ,
8990 stream_count : 0 ,
9091 ws_connection_orderbook : None :: < WebSocketStream < MaybeTlsStream < TcpStream > > > ,
91- buffer : Vec :: with_capacity ( exchange_config . buffer_size ) ,
92+ depth_update_buffer : ArrayVec :: new ( ) ,
9293 ws_connection_orderbook_reader : None ,
9394 depths_producer : orders_producer,
9495 http_client,
@@ -133,7 +134,7 @@ impl ExchangeStream {
133134 match pull_result {
134135 Ok ( mut depths) => {
135136 while let Some ( depth) = depths. next ( ) {
136- self . buffer . push ( depth) ;
137+ self . depth_update_buffer . push ( depth) ;
137138 // we must keep processing snapshot depths and depths from the websocket
138139 // but this time the websocket depths are stored in their own buffer
139140 // to be sequenced aftr snapshot depths are processed
@@ -155,7 +156,7 @@ impl ExchangeStream {
155156
156157 pub async fn push_buffered_ws_depths ( & mut self ) {
157158 while let Some ( websocket_depth) = self . websocket_depth_buffer . pop ( ) {
158- self . buffer . push ( websocket_depth) ;
159+ self . depth_update_buffer . push ( websocket_depth) ;
159160 }
160161 self . buffer_websocket_depths = false ;
161162 }
@@ -421,12 +422,12 @@ impl Stream for ExchangeStream {
421422 let woven_depths = interleave ( depths. 0 , depths. 1 ) ;
422423 if * this. buffer_websocket_depths {
423424 for depth in woven_depths {
424- this. buffer . push ( depth) ;
425+ this. depth_update_buffer . push ( depth) ;
425426 }
426427 continue ;
427428 } else {
428429 for depth in woven_depths {
429- this. buffer . push ( depth) ;
430+ this. depth_update_buffer . push ( depth) ;
430431 }
431432 }
432433 }
@@ -436,12 +437,12 @@ impl Stream for ExchangeStream {
436437 let woven_depths = interleave ( depths. 0 , depths. 1 ) ;
437438 if * this. buffer_websocket_depths {
438439 for depth in woven_depths {
439- this. buffer . push ( depth) ;
440+ this. depth_update_buffer . push ( depth) ;
440441 }
441442 continue ;
442443 }
443444 for depth in woven_depths {
444- this. buffer . push ( depth) ;
445+ this. depth_update_buffer . push ( depth) ;
445446 }
446447 } else {
447448 warn ! ( "failed to deserialize the object." ) ;
0 commit comments