Skip to content

Commit 4e6a2cc

Browse files
authored
Merge pull request #45 from CoLearn-Dev/redis-stream
MQ alternative: redis stream
2 parents 81f9743 + 3c47332 commit 4e6a2cc

File tree

8 files changed

+176
-92
lines changed

8 files changed

+176
-92
lines changed

.github/workflows/check.yml

+25-6
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,25 @@ jobs:
1212
defaults:
1313
run:
1414
shell: bash
15+
strategy:
16+
matrix:
17+
mq: [standalone, rabbitmq, redis]
18+
include:
19+
- mq: rabbitmq
20+
docker_image: "rabbitmq:3.8-management"
21+
mq_uri: "amqp://guest:guest@localhost"
22+
mq_api: "http://guest:guest@localhost:15672/api"
23+
- mq: redis
24+
docker_image: "redis"
25+
mq_uri: "redis://localhost:16379"
1526
services:
16-
rabbitmq:
17-
image: rabbitmq:3.8-management
27+
mq:
28+
image: ${{ matrix.docker_image }}
1829
ports:
19-
- 5672:5672
20-
- 15672:15672
21-
redis:
30+
- 5672:5672 # rabbitmq
31+
- 15672:15672 # rabbitmq
32+
- 16379:6379 # redis
33+
redis: # for storage macro
2234
image: redis
2335
ports:
2436
- 6379:6379
@@ -44,4 +56,11 @@ jobs:
4456
run: bash download-server.sh
4557
working-directory: tests
4658
- name: Run tests
47-
run: cargo test
59+
if: ${{ matrix.mq != 'standalone' }}
60+
env:
61+
COLINK_SERVER_MQ_URI: ${{ matrix.mq_uri }}
62+
COLINK_SERVER_MQ_API: ${{ matrix.mq_api }}
63+
run: cargo test test_main # remove test_main after updating protocols(policy module, remote storage)
64+
- name: Run tests (standalone)
65+
if: ${{ matrix.mq == 'standalone' }}
66+
run: cargo test test_main # remove test_main after updating protocols(policy module, remote storage)

Cargo.toml

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "colink"
3-
version = "0.2.11"
3+
version = "0.3.0"
44
edition = "2021"
55
description = "CoLink Rust SDK"
66
license = "MIT"
@@ -23,7 +23,7 @@ lapin = "2.1"
2323
prost = "0.10"
2424
rand = { version = "0.8", features = ["std_rng"] }
2525
rcgen = { version = "0.10", optional = true }
26-
redis = { version = "0.22", features = ["tokio-comp"], optional = true }
26+
redis = { version = "0.22", features = ["tokio-comp"] }
2727
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls-native-roots"], optional = true }
2828
secp256k1 = { version = "0.25", features = ["rand-std"] }
2929
serde = { version = "1.0", features = ["derive"] }
@@ -34,6 +34,7 @@ tokio-rustls = { version = "0.23", optional = true }
3434
tonic = { version = "0.7", features = ["tls", "tls-roots"] }
3535
tracing = "0.1"
3636
tracing-subscriber = "0.2"
37+
url = "2.2"
3738
uuid = { version = "0.8", features = ["v4"] }
3839

3940
[build-dependencies]
@@ -48,4 +49,4 @@ variable_transfer = ["extensions", "remote_storage", "hyper", "jsonwebtoken", "r
4849
registry = []
4950
policy_module = []
5051
instant_server = ["reqwest"]
51-
storage_macro = ["async-recursion", "redis"]
52+
storage_macro = ["async-recursion"]

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ CoLink SDK helps both application and protocol developers access the functionali
99
Add this to your Cargo.toml:
1010
```toml
1111
[dependencies]
12-
colink = "0.2.11"
12+
colink = "0.3.0"
1313
```
1414

1515
## Getting Started

src/application.rs

+82-19
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@ use futures_lite::stream::StreamExt;
44
use lapin::{
55
options::{BasicAckOptions, BasicConsumeOptions},
66
types::FieldTable,
7-
Connection, ConnectionProperties, Consumer,
7+
ConnectionProperties,
8+
};
9+
use redis::{
10+
streams::{StreamReadOptions, StreamReadReply},
11+
AsyncCommands, FromRedisValue,
812
};
913
use secp256k1::Secp256k1;
1014
use serde::{Deserialize, Serialize};
@@ -544,32 +548,91 @@ pub struct CoLinkInfo {
544548
pub version: String,
545549
}
546550

551+
pub enum CoLinkMQType {
552+
RabbitMQ,
553+
RedisStream,
554+
}
547555
pub struct CoLinkSubscriber {
548-
consumer: Consumer,
556+
mq_type: CoLinkMQType,
557+
queue_name: String,
558+
rabbitmq_consumer: Option<lapin::Consumer>,
559+
redis_connection: Option<redis::aio::Connection>,
549560
}
550561

551562
impl CoLinkSubscriber {
552563
pub async fn new(mq_uri: &str, queue_name: &str) -> Result<Self, Error> {
553-
let mq = Connection::connect(mq_uri, ConnectionProperties::default()).await?;
554-
let channel = mq.create_channel().await?;
555-
let consumer = channel
556-
.basic_consume(
557-
queue_name,
558-
"",
559-
BasicConsumeOptions::default(),
560-
FieldTable::default(),
561-
)
562-
.await?;
563-
Ok(Self { consumer })
564+
let uri_parsed = url::Url::parse(mq_uri)?;
565+
if uri_parsed.scheme().starts_with("redis") {
566+
let client = redis::Client::open(mq_uri)?;
567+
let con = client.get_async_connection().await?;
568+
Ok(Self {
569+
mq_type: CoLinkMQType::RedisStream,
570+
queue_name: queue_name.to_string(),
571+
rabbitmq_consumer: None,
572+
redis_connection: Some(con),
573+
})
574+
} else {
575+
let mq = lapin::Connection::connect(mq_uri, ConnectionProperties::default()).await?;
576+
let channel = mq.create_channel().await?;
577+
let consumer = channel
578+
.basic_consume(
579+
queue_name,
580+
"",
581+
BasicConsumeOptions::default(),
582+
FieldTable::default(),
583+
)
584+
.await?;
585+
Ok(Self {
586+
mq_type: CoLinkMQType::RabbitMQ,
587+
queue_name: queue_name.to_string(),
588+
rabbitmq_consumer: Some(consumer),
589+
redis_connection: None,
590+
})
591+
}
564592
}
565593

566594
pub async fn get_next(&mut self) -> Result<Vec<u8>, Error> {
567-
let delivery = self.consumer.next().await.expect("error in consumer");
568-
let delivery = delivery.expect("error in consumer");
569-
let data = String::from_utf8_lossy(&delivery.data);
570-
debug!("CoLinkSubscriber Received [{}]", data);
571-
delivery.ack(BasicAckOptions::default()).await?;
572-
Ok(delivery.data)
595+
match self.mq_type {
596+
CoLinkMQType::RabbitMQ => {
597+
let delivery = self
598+
.rabbitmq_consumer
599+
.as_mut()
600+
.unwrap()
601+
.next()
602+
.await
603+
.expect("error in consumer");
604+
let delivery = delivery.expect("error in consumer");
605+
delivery.ack(BasicAckOptions::default()).await?;
606+
Ok(delivery.data)
607+
}
608+
CoLinkMQType::RedisStream => {
609+
let opts = StreamReadOptions::default()
610+
.group(&self.queue_name, uuid::Uuid::new_v4().to_string())
611+
.block(0)
612+
.count(1);
613+
let res: StreamReadReply = self
614+
.redis_connection
615+
.as_mut()
616+
.unwrap()
617+
.xread_options(&[&self.queue_name], &[">"], &opts)
618+
.await?;
619+
let id = &res.keys[0].ids[0].id;
620+
let data: Vec<u8> = FromRedisValue::from_redis_value(
621+
res.keys[0].ids[0].map.get("payload").unwrap(),
622+
)?;
623+
self.redis_connection
624+
.as_mut()
625+
.unwrap()
626+
.xack(&self.queue_name, &self.queue_name, &[id])
627+
.await?;
628+
self.redis_connection
629+
.as_mut()
630+
.unwrap()
631+
.xdel(&self.queue_name, &[id])
632+
.await?;
633+
Ok(data)
634+
}
635+
}
573636
}
574637
}
575638

src/extensions/instant_server.rs

+48-28
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ impl InstantServer {
4848
.arg("bash -c \"$(curl -fsSL https://raw.githubusercontent.com/CoLearn-Dev/colinkctl/main/install_colink.sh)\"")
4949
.env("COLINK_INSTALL_SERVER_ONLY", "true")
5050
.env("COLINK_INSTALL_SILENT", "true")
51-
.env("COLINK_SERVER_VERSION", "v0.2.9")
51+
.env("COLINK_SERVER_VERSION", "v0.3.0")
5252
.status()
5353
.unwrap();
5454
}
@@ -61,48 +61,68 @@ impl InstantServer {
6161
.join("instant_servers")
6262
.join(instant_server_id.clone());
6363
std::fs::create_dir_all(&working_dir).unwrap();
64-
let mq_amqp = if std::env::var("COLINK_SERVER_MQ_AMQP").is_ok() {
65-
std::env::var("COLINK_SERVER_MQ_AMQP").unwrap()
64+
let mq_uri = if std::env::var("COLINK_SERVER_MQ_URI").is_ok() {
65+
Some(std::env::var("COLINK_SERVER_MQ_URI").unwrap())
6666
} else {
67-
"amqp://guest:guest@localhost:5672".to_string()
67+
None
6868
};
6969
let mq_api = if std::env::var("COLINK_SERVER_MQ_API").is_ok() {
70-
std::env::var("COLINK_SERVER_MQ_API").unwrap()
70+
Some(std::env::var("COLINK_SERVER_MQ_API").unwrap())
7171
} else {
72-
"http://guest:guest@localhost:15672/api".to_string()
72+
None
7373
};
74-
let (mq_amqp, mq_api) = std::thread::spawn(move || {
74+
let (mq_uri, mq_api) = std::thread::spawn(move || {
7575
tokio::runtime::Builder::new_multi_thread()
7676
.enable_all()
7777
.build()
7878
.unwrap()
7979
.block_on(async {
80-
let res = reqwest::get(&mq_api).await.unwrap();
81-
assert!(res.status() == hyper::StatusCode::OK);
82-
lapin::Connection::connect(&mq_amqp, lapin::ConnectionProperties::default())
83-
.await
84-
.unwrap();
80+
if mq_uri.is_some() {
81+
let mq_uri = mq_uri.clone().unwrap();
82+
if mq_uri.starts_with("amqp") {
83+
lapin::Connection::connect(
84+
&mq_uri,
85+
lapin::ConnectionProperties::default(),
86+
)
87+
.await
88+
.unwrap();
89+
if mq_api.is_some() {
90+
let res = reqwest::get(&mq_api.clone().unwrap()).await.unwrap();
91+
assert!(res.status() == hyper::StatusCode::OK);
92+
}
93+
} else if mq_uri.starts_with("redis") {
94+
let client = redis::Client::open(mq_uri).unwrap();
95+
let _con = client.get_async_connection().await.unwrap();
96+
} else {
97+
panic!("mq_uri({}) is not supported.", mq_uri);
98+
}
99+
}
85100
});
86-
(mq_amqp, mq_api)
101+
(mq_uri, mq_api)
87102
})
88103
.join()
89104
.unwrap();
105+
let mut args = vec![
106+
"--address".to_string(),
107+
"0.0.0.0".to_string(),
108+
"--port".to_string(),
109+
port.to_string(),
110+
"--mq-prefix".to_string(),
111+
format!("colink-instant-server-{}", port),
112+
"--core-uri".to_string(),
113+
format!("http://127.0.0.1:{}", port),
114+
"--inter-core-reverse-mode".to_string(),
115+
];
116+
if let Some(mq_uri) = mq_uri {
117+
args.push("--mq-uri".to_string());
118+
args.push(mq_uri);
119+
}
120+
if let Some(mq_api) = mq_api {
121+
args.push("--mq-api".to_string());
122+
args.push(mq_api);
123+
}
90124
let child = Command::new(program)
91-
.args([
92-
"--address",
93-
"0.0.0.0",
94-
"--port",
95-
&port.to_string(),
96-
"--mq-amqp",
97-
&mq_amqp,
98-
"--mq-api",
99-
&mq_api,
100-
"--mq-prefix",
101-
&format!("colink-instant-server-{}", port),
102-
"--core-uri",
103-
&format!("http://127.0.0.1:{}", port),
104-
"--inter-core-reverse-mode",
105-
])
125+
.args(&args)
106126
.env("COLINK_HOME", colink_home)
107127
.current_dir(working_dir.clone())
108128
.spawn()

src/protocol.rs

+7-28
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,14 @@
11
use crate::{application::*, utils::get_path_timestamp};
22
pub use async_trait::async_trait;
33
use clap::Parser;
4-
use futures_lite::stream::StreamExt;
5-
use lapin::{
6-
options::{BasicAckOptions, BasicConsumeOptions, BasicQosOptions},
7-
types::FieldTable,
8-
Connection, ConnectionProperties, Consumer,
9-
};
104
use prost::Message;
115
use rand::Rng;
126
use std::{
137
collections::{HashMap, HashSet},
148
sync::{Arc, Mutex},
159
thread,
1610
};
17-
use tracing::{debug, error};
11+
use tracing::error;
1812

1913
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
2014

@@ -50,12 +44,9 @@ impl CoLinkProtocol {
5044
}
5145

5246
pub async fn start(&self) -> Result<(), Error> {
53-
let mut consumer = self.get_mq_consumer().await?;
54-
while let Some(delivery) = consumer.next().await {
55-
let delivery = delivery.expect("error in consumer");
56-
let data = String::from_utf8_lossy(&delivery.data);
57-
debug!("Received [{}]", data);
58-
let message: SubscriptionMessage = prost::Message::decode(&*delivery.data).unwrap();
47+
let mut subscriber = self.get_subscriber().await?;
48+
while let Ok(data) = subscriber.get_next().await {
49+
let message: SubscriptionMessage = prost::Message::decode(&*data).unwrap();
5950
if message.change_type != "delete" {
6051
let task_id: Task = prost::Message::decode(&*message.payload).unwrap();
6152
let res = self
@@ -74,7 +65,6 @@ impl CoLinkProtocol {
7465
Err(e) => error!("Pull Task Error: {}.", e),
7566
}
7667
}
77-
delivery.ack(BasicAckOptions::default()).await.unwrap();
7868
}
7969

8070
Ok(())
@@ -115,7 +105,7 @@ impl CoLinkProtocol {
115105
Ok(())
116106
}
117107

118-
async fn get_mq_consumer(&self) -> Result<Consumer, Error> {
108+
async fn get_subscriber(&self) -> Result<CoLinkSubscriber, Error> {
119109
let operator_mq_key = format!("_internal:protocols:{}:operator_mq", self.protocol_and_role);
120110
let lock = self.cl.lock(&operator_mq_key).await?;
121111
let res = self
@@ -172,19 +162,8 @@ impl CoLinkProtocol {
172162
};
173163
self.cl.unlock(lock).await?;
174164

175-
let mq_addr = self.cl.request_info().await?.mq_uri;
176-
let mq = Connection::connect(&mq_addr, ConnectionProperties::default()).await?;
177-
let channel = mq.create_channel().await?;
178-
channel.basic_qos(1, BasicQosOptions::default()).await?;
179-
let consumer = channel
180-
.basic_consume(
181-
&queue_name,
182-
"",
183-
BasicConsumeOptions::default(),
184-
FieldTable::default(),
185-
)
186-
.await?;
187-
Ok(consumer)
165+
let subscriber = self.cl.new_subscriber(&queue_name).await?;
166+
Ok(subscriber)
188167
}
189168
}
190169

tests/download-server.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
set -e
33
rm -rf colink-server
44
mkdir colink-server && cd colink-server
5-
wget https://github.com/CoLearn-Dev/colink-server-dev/releases/download/v0.2.9/colink-server-linux-x86_64.tar.gz
5+
wget https://github.com/CoLearn-Dev/colink-server-dev/releases/download/v0.3.0/colink-server-linux-x86_64.tar.gz
66
tar -xzf colink-server-linux-x86_64.tar.gz
77
touch user_init_config.toml # create an empty user init config to prevent automatically starting protocols when importing users.
88
cd ..

0 commit comments

Comments
 (0)