@@ -3,7 +3,10 @@ use super::async_std;
3
3
use super :: ConnectionLike ;
4
4
use super :: { setup_connection, AsyncStream , RedisRuntime } ;
5
5
use crate :: cmd:: { cmd, Cmd } ;
6
- use crate :: connection:: { ConnectionAddr , ConnectionInfo , Msg , RedisConnectionInfo } ;
6
+ use crate :: connection:: {
7
+ resp2_is_pub_sub_state_cleared, resp3_is_pub_sub_state_cleared, ConnectionAddr , ConnectionInfo ,
8
+ Msg , RedisConnectionInfo ,
9
+ } ;
7
10
#[ cfg( any( feature = "tokio-comp" , feature = "async-std-comp" ) ) ]
8
11
use crate :: parser:: ValueCodec ;
9
12
use crate :: types:: { ErrorKind , FromRedisValue , RedisError , RedisFuture , RedisResult , Value } ;
@@ -36,6 +39,9 @@ pub struct Connection<C = Pin<Box<dyn AsyncStream + Send + Sync>>> {
36
39
// This flag is checked when attempting to send a command, and if it's raised, we attempt to
37
40
// exit the pubsub state before executing the new request.
38
41
pubsub : bool ,
42
+
43
+ // Flag indicating whether resp3 mode is enabled.
44
+ resp3 : bool ,
39
45
}
40
46
41
47
fn assert_sync < T : Sync > ( ) { }
@@ -53,13 +59,15 @@ impl<C> Connection<C> {
53
59
decoder,
54
60
db,
55
61
pubsub,
62
+ resp3,
56
63
} = self ;
57
64
Connection {
58
65
con : f ( con) ,
59
66
buf,
60
67
decoder,
61
68
db,
62
69
pubsub,
70
+ resp3,
63
71
}
64
72
}
65
73
}
77
85
decoder : combine:: stream:: Decoder :: new ( ) ,
78
86
db : connection_info. db ,
79
87
pubsub : false ,
88
+ resp3 : connection_info. use_resp3 ,
80
89
} ;
81
90
setup_connection ( connection_info, & mut rv) . await ?;
82
91
Ok ( rv)
@@ -143,17 +152,32 @@ where
143
152
// messages are received until the _subscription count_ in the responses reach zero.
144
153
let mut received_unsub = false ;
145
154
let mut received_punsub = false ;
146
- loop {
147
- let res: ( Vec < u8 > , ( ) , isize ) = from_redis_value ( & self . read_response ( ) . await ?) ?;
148
-
149
- match res. 0 . first ( ) {
150
- Some ( & b'u' ) => received_unsub = true ,
151
- Some ( & b'p' ) => received_punsub = true ,
152
- _ => ( ) ,
155
+ if self . resp3 {
156
+ while let Value :: Push { kind, data } = from_redis_value ( & self . read_response ( ) . await ?) ? {
157
+ if data. len ( ) >= 2 {
158
+ if let Value :: Int ( num) = data[ 1 ] {
159
+ if resp3_is_pub_sub_state_cleared (
160
+ & mut received_unsub,
161
+ & mut received_punsub,
162
+ & kind,
163
+ num as isize ,
164
+ ) {
165
+ break ;
166
+ }
167
+ }
168
+ }
153
169
}
154
-
155
- if received_unsub && received_punsub && res. 2 == 0 {
156
- break ;
170
+ } else {
171
+ loop {
172
+ let res: ( Vec < u8 > , ( ) , isize ) = from_redis_value ( & self . read_response ( ) . await ?) ?;
173
+ if resp2_is_pub_sub_state_cleared (
174
+ & mut received_unsub,
175
+ & mut received_punsub,
176
+ & res. 0 ,
177
+ res. 2 ,
178
+ ) {
179
+ break ;
180
+ }
157
181
}
158
182
}
159
183
@@ -199,7 +223,17 @@ where
199
223
self . buf . clear ( ) ;
200
224
cmd. write_packed_command ( & mut self . buf ) ;
201
225
self . con . write_all ( & self . buf ) . await ?;
202
- self . read_response ( ) . await
226
+ if cmd. is_no_response ( ) {
227
+ return Ok ( Value :: Nil ) ;
228
+ }
229
+ loop {
230
+ match self . read_response ( ) . await ? {
231
+ Value :: Push { .. } => {
232
+ //self.execute_push_message(kind, data) //TODO
233
+ }
234
+ val => return Ok ( val) ,
235
+ }
236
+ }
203
237
} )
204
238
. boxed ( )
205
239
}
@@ -231,18 +265,28 @@ where
231
265
}
232
266
233
267
let mut rv = Vec :: with_capacity ( count) ;
234
- for _ in 0 ..count {
268
+ let mut count = count;
269
+ let mut idx = 0 ;
270
+ while idx < count {
235
271
let response = self . read_response ( ) . await ;
236
272
match response {
237
273
Ok ( item) => {
238
- rv. push ( item) ;
274
+ // RESP3 can insert push data between command replies
275
+ if let Value :: Push { .. } = item {
276
+ // if that is the case we have to extend the loop and handle push data
277
+ count += 1 ;
278
+ // self.execute_push_message(kind, data); //TODO
279
+ } else {
280
+ rv. push ( item) ;
281
+ }
239
282
}
240
283
Err ( err) => {
241
284
if first_err. is_none ( ) {
242
285
first_err = Some ( err) ;
243
286
}
244
287
}
245
288
}
289
+ idx += 1 ;
246
290
}
247
291
248
292
if let Some ( err) = first_err {
@@ -275,31 +319,42 @@ where
275
319
276
320
/// Subscribes to a new channel.
277
321
pub async fn subscribe < T : ToRedisArgs > ( & mut self , channel : T ) -> RedisResult < ( ) > {
278
- cmd ( "SUBSCRIBE" ) . arg ( channel) . query_async ( & mut self . 0 ) . await
322
+ let mut cmd = cmd ( "SUBSCRIBE" ) ;
323
+ cmd. arg ( channel) ;
324
+ if self . 0 . resp3 {
325
+ cmd. set_no_response ( true ) ;
326
+ }
327
+ cmd. query_async ( & mut self . 0 ) . await
279
328
}
280
329
281
330
/// Subscribes to a new channel with a pattern.
282
331
pub async fn psubscribe < T : ToRedisArgs > ( & mut self , pchannel : T ) -> RedisResult < ( ) > {
283
- cmd ( "PSUBSCRIBE" )
284
- . arg ( pchannel)
285
- . query_async ( & mut self . 0 )
286
- . await
332
+ let mut cmd = cmd ( "PSUBSCRIBE" ) ;
333
+ cmd. arg ( pchannel) ;
334
+ if self . 0 . resp3 {
335
+ cmd. set_no_response ( true ) ;
336
+ }
337
+ cmd. query_async ( & mut self . 0 ) . await
287
338
}
288
339
289
340
/// Unsubscribes from a channel.
290
341
pub async fn unsubscribe < T : ToRedisArgs > ( & mut self , channel : T ) -> RedisResult < ( ) > {
291
- cmd ( "UNSUBSCRIBE" )
292
- . arg ( channel)
293
- . query_async ( & mut self . 0 )
294
- . await
342
+ let mut cmd = cmd ( "UNSUBSCRIBE" ) ;
343
+ cmd. arg ( channel) ;
344
+ if self . 0 . resp3 {
345
+ cmd. set_no_response ( true ) ;
346
+ }
347
+ cmd. query_async ( & mut self . 0 ) . await
295
348
}
296
349
297
350
/// Unsubscribes from a channel with a pattern.
298
351
pub async fn punsubscribe < T : ToRedisArgs > ( & mut self , pchannel : T ) -> RedisResult < ( ) > {
299
- cmd ( "PUNSUBSCRIBE" )
300
- . arg ( pchannel)
301
- . query_async ( & mut self . 0 )
302
- . await
352
+ let mut cmd = cmd ( "PUNSUBSCRIBE" ) ;
353
+ cmd. arg ( pchannel) ;
354
+ if self . 0 . resp3 {
355
+ cmd. set_no_response ( true ) ;
356
+ }
357
+ cmd. query_async ( & mut self . 0 ) . await
303
358
}
304
359
305
360
/// Returns [`Stream`] of [`Msg`]s from this [`PubSub`]s subscriptions.
0 commit comments