1- use async_std:: io:: Read as AsyncRead ;
1+ use async_std:: io:: { BufRead as AsyncBufRead , Read as AsyncRead } ;
22use async_std:: prelude:: * ;
33use async_std:: task:: { ready, Context , Poll } ;
44
@@ -10,72 +10,71 @@ pin_project_lite::pin_project! {
1010 /// An SSE protocol encoder.
1111 #[ derive( Debug ) ]
1212 pub struct Encoder {
13- buf: Option <Vec <u8 >>,
13+ buf: Box <[ u8 ] >,
14+ cursor: usize ,
1415 #[ pin]
1516 receiver: async_channel:: Receiver <Vec <u8 >>,
16- cursor: usize ,
1717 }
1818}
1919
2020impl AsyncRead for Encoder {
2121 fn poll_read (
22- mut self : Pin < & mut Self > ,
22+ self : Pin < & mut Self > ,
2323 cx : & mut Context < ' _ > ,
2424 buf : & mut [ u8 ] ,
2525 ) -> Poll < io:: Result < usize > > {
26- // Request a new buffer if we don't have one yet.
27- if let None = self . buf {
28- self . buf = match ready ! ( Pin :: new( & mut self . receiver) . poll_next( cx) ) {
26+ let mut this = self . project ( ) ;
27+ // Request a new buffer if current one is exhausted.
28+ if this. buf . len ( ) <= * this. cursor {
29+ match ready ! ( this. receiver. as_mut( ) . poll_next( cx) ) {
2930 Some ( buf) => {
3031 log:: trace!( "> Received a new buffer with len {}" , buf. len( ) ) ;
31- Some ( buf)
32+ * this. buf = buf. into_boxed_slice ( ) ;
33+ * this. cursor = 0 ;
3234 }
3335 None => {
3436 log:: trace!( "> Encoder done reading" ) ;
3537 return Poll :: Ready ( Ok ( 0 ) ) ;
3638 }
3739 } ;
38- } ;
40+ }
3941
4042 // Write the current buffer to completion.
41- let local_buf = self . buf . as_mut ( ) . unwrap ( ) ;
42- let local_len = local_buf. len ( ) ;
43+ let local_buf = & this. buf [ * this. cursor ..] ;
4344 let max = buf. len ( ) . min ( local_buf. len ( ) ) ;
4445 buf[ ..max] . clone_from_slice ( & local_buf[ ..max] ) ;
45-
46- self . cursor += max;
47-
48- // Reset values if we're done reading.
49- if self . cursor == local_len {
50- self . buf = None ;
51- self . cursor = 0 ;
52- } ;
46+ * this. cursor += max;
5347
5448 // Return bytes read.
5549 Poll :: Ready ( Ok ( max) )
5650 }
5751}
5852
59- // impl AsyncBufRead for Encoder {
60- // fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
61- // match ready!(self.project().receiver.poll_next(cx)) {
62- // Some(buf) => match &self.buf {
63- // None => self.project().buf = &mut Some(buf),
64- // Some(local_buf) => local_buf.extend(buf),
65- // },
66- // None => {
67- // if let None = self.buf {
68- // self.project().buf = &mut Some(vec![]);
69- // };
70- // }
71- // };
72- // Poll::Ready(Ok(self.buf.as_ref().unwrap()))
73- // }
74-
75- // fn consume(self: Pin<&mut Self>, amt: usize) {
76- // Pin::new(self).cursor += amt;
77- // }
78- // }
53+ impl AsyncBufRead for Encoder {
54+ fn poll_fill_buf ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < io:: Result < & [ u8 ] > > {
55+ let mut this = self . project ( ) ;
56+ // Request a new buffer if current one is exhausted.
57+ if this. buf . len ( ) <= * this. cursor {
58+ match ready ! ( this. receiver. as_mut( ) . poll_next( cx) ) {
59+ Some ( buf) => {
60+ log:: trace!( "> Received a new buffer with len {}" , buf. len( ) ) ;
61+ * this. buf = buf. into_boxed_slice ( ) ;
62+ * this. cursor = 0 ;
63+ }
64+ None => {
65+ log:: trace!( "> Encoder done reading" ) ;
66+ return Poll :: Ready ( Ok ( & [ ] ) ) ;
67+ }
68+ } ;
69+ }
70+ Poll :: Ready ( Ok ( & this. buf [ * this. cursor ..] ) )
71+ }
72+
73+ fn consume ( self : Pin < & mut Self > , amt : usize ) {
74+ let this = self . project ( ) ;
75+ * this. cursor += amt;
76+ }
77+ }
7978
8079/// The sending side of the encoder.
8180#[ derive( Debug , Clone ) ]
@@ -86,7 +85,7 @@ pub fn encode() -> (Sender, Encoder) {
8685 let ( sender, receiver) = async_channel:: bounded ( 1 ) ;
8786 let encoder = Encoder {
8887 receiver,
89- buf : None ,
88+ buf : Box :: default ( ) ,
9089 cursor : 0 ,
9190 } ;
9291 ( Sender ( sender) , encoder)
0 commit comments