@@ -548,8 +548,12 @@ pub struct CoLinkInfo {
548
548
pub version : String ,
549
549
}
550
550
551
+ pub enum CoLinkMQType {
552
+ RabbitMQ ,
553
+ RedisStream ,
554
+ }
551
555
pub struct CoLinkSubscriber {
552
- mq_type : i32 , // 0 for rabbitmq, 1 for redis stream
556
+ mq_type : CoLinkMQType ,
553
557
queue_name : String ,
554
558
rabbitmq_consumer : Option < lapin:: Consumer > ,
555
559
redis_connection : Option < redis:: aio:: Connection > ,
@@ -562,7 +566,7 @@ impl CoLinkSubscriber {
562
566
let client = redis:: Client :: open ( mq_uri) ?;
563
567
let con = client. get_async_connection ( ) . await ?;
564
568
Ok ( Self {
565
- mq_type : 1 ,
569
+ mq_type : CoLinkMQType :: RedisStream ,
566
570
queue_name : queue_name. to_string ( ) ,
567
571
rabbitmq_consumer : None ,
568
572
redis_connection : Some ( con) ,
@@ -579,7 +583,7 @@ impl CoLinkSubscriber {
579
583
)
580
584
. await ?;
581
585
Ok ( Self {
582
- mq_type : 0 ,
586
+ mq_type : CoLinkMQType :: RabbitMQ ,
583
587
queue_name : queue_name. to_string ( ) ,
584
588
rabbitmq_consumer : Some ( consumer) ,
585
589
redis_connection : None ,
@@ -588,44 +592,46 @@ impl CoLinkSubscriber {
588
592
}
589
593
590
594
pub async fn get_next ( & mut self ) -> Result < Vec < u8 > , Error > {
591
- if self . mq_type == 0 {
592
- let delivery = self
593
- . rabbitmq_consumer
594
- . as_mut ( )
595
- . unwrap ( )
596
- . next ( )
597
- . await
598
- . expect ( "error in consumer" ) ;
599
- let delivery = delivery. expect ( "error in consumer" ) ;
600
- delivery. ack ( BasicAckOptions :: default ( ) ) . await ?;
601
- Ok ( delivery. data )
602
- } else if self . mq_type == 1 {
603
- let opts = StreamReadOptions :: default ( )
604
- . group ( & self . queue_name , uuid:: Uuid :: new_v4 ( ) . to_string ( ) )
605
- . block ( 0 )
606
- . count ( 1 ) ;
607
- let res: StreamReadReply = self
608
- . redis_connection
609
- . as_mut ( )
610
- . unwrap ( )
611
- . xread_options ( & [ & self . queue_name ] , & [ ">" ] , & opts)
612
- . await ?;
613
- let id = & res. keys [ 0 ] . ids [ 0 ] . id ;
614
- let data: Vec < u8 > =
615
- FromRedisValue :: from_redis_value ( res. keys [ 0 ] . ids [ 0 ] . map . get ( "payload" ) . unwrap ( ) ) ?;
616
- self . redis_connection
617
- . as_mut ( )
618
- . unwrap ( )
619
- . xack ( & self . queue_name , & self . queue_name , & [ id] )
620
- . await ?;
621
- self . redis_connection
622
- . as_mut ( )
623
- . unwrap ( )
624
- . xdel ( & self . queue_name , & [ id] )
625
- . await ?;
626
- Ok ( data)
627
- } else {
628
- Err ( "mq type is not supported." ) ?
595
+ match self . mq_type {
596
+ CoLinkMQType :: RabbitMQ => {
597
+ let delivery = self
598
+ . rabbitmq_consumer
599
+ . as_mut ( )
600
+ . unwrap ( )
601
+ . next ( )
602
+ . await
603
+ . expect ( "error in consumer" ) ;
604
+ let delivery = delivery. expect ( "error in consumer" ) ;
605
+ delivery. ack ( BasicAckOptions :: default ( ) ) . await ?;
606
+ Ok ( delivery. data )
607
+ }
608
+ CoLinkMQType :: RedisStream => {
609
+ let opts = StreamReadOptions :: default ( )
610
+ . group ( & self . queue_name , uuid:: Uuid :: new_v4 ( ) . to_string ( ) )
611
+ . block ( 0 )
612
+ . count ( 1 ) ;
613
+ let res: StreamReadReply = self
614
+ . redis_connection
615
+ . as_mut ( )
616
+ . unwrap ( )
617
+ . xread_options ( & [ & self . queue_name ] , & [ ">" ] , & opts)
618
+ . await ?;
619
+ let id = & res. keys [ 0 ] . ids [ 0 ] . id ;
620
+ let data: Vec < u8 > = FromRedisValue :: from_redis_value (
621
+ res. keys [ 0 ] . ids [ 0 ] . map . get ( "payload" ) . unwrap ( ) ,
622
+ ) ?;
623
+ self . redis_connection
624
+ . as_mut ( )
625
+ . unwrap ( )
626
+ . xack ( & self . queue_name , & self . queue_name , & [ id] )
627
+ . await ?;
628
+ self . redis_connection
629
+ . as_mut ( )
630
+ . unwrap ( )
631
+ . xdel ( & self . queue_name , & [ id] )
632
+ . await ?;
633
+ Ok ( data)
634
+ }
629
635
}
630
636
}
631
637
}
0 commit comments