Skip to content

Commit 5a0bc44

Browse files
authored
Merge pull request #61 from CoLearn-Dev/storage_macro_rdbc
RDBC support for storage macro extension
2 parents 634c6f8 + 08a8f49 commit 5a0bc44

File tree

6 files changed

+165
-5
lines changed

6 files changed

+165
-5
lines changed

.github/workflows/check.yml

+5
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ jobs:
3030
run: |
3131
brew install docker
3232
colima start
33+
- name: Start container (MySQL)
34+
run: |
35+
docker run --name mysql -e MYSQL_ROOT_PASSWORD=password -e MYSQL_DATABASE=test_db -p 3306:3306 -d mysql:8.0
36+
MYSQL_HOST="127.0.0.1"
37+
echo "MYSQL_DATABASE_URL=mysql://root:password@${MYSQL_HOST}:3306/test_db" >> $GITHUB_ENV
3338
- name: Start container (mq)
3439
if: ${{ matrix.mq != 'standalone' }}
3540
run: docker run -d -p 5672:5672 -p 15672:15672 -p 16379:6379 ${{ matrix.docker_image }}

Cargo.toml

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "colink"
3-
version = "0.3.7"
3+
version = "0.3.8"
44
edition = "2021"
55
description = "CoLink Rust SDK"
66
license = "MIT"
@@ -23,6 +23,7 @@ lapin = "2.1"
2323
prost = "0.10"
2424
rand = { version = "0.8", features = ["std_rng"] }
2525
rcgen = { version = "0.10", optional = true }
26+
rdbc2 = { version = "0.2.2", optional = true }
2627
redis = { version = "0.22", features = ["tokio-comp"] }
2728
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls-native-roots"], optional = true }
2829
secp256k1 = { version = "0.25", features = ["rand-std"] }
@@ -49,4 +50,4 @@ variable_transfer = ["extensions", "remote_storage", "hyper", "jsonwebtoken", "r
4950
registry = []
5051
policy_module = []
5152
instant_server = ["reqwest"]
52-
storage_macro = ["async-recursion"]
53+
storage_macro = ["async-recursion", "rdbc2"]

src/extensions/instant_server.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ impl InstantServer {
8181
.join(instant_server_id.clone());
8282
std::fs::create_dir_all(&working_dir).unwrap();
8383
let mut user_init_config_file =
84-
std::fs::File::create(&Path::new(&working_dir).join("user_init_config.toml")).unwrap();
84+
std::fs::File::create(Path::new(&working_dir).join("user_init_config.toml")).unwrap();
8585
user_init_config_file
8686
.write_all(user_init_config.as_bytes())
8787
.unwrap();

src/extensions/storage_macro.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1+
use crate::StorageEntry;
2+
13
mod append;
24
mod chunk;
5+
mod dbc;
36
mod fs;
47
mod redis;
5-
use crate::StorageEntry;
68

79
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
810

@@ -64,6 +66,7 @@ impl crate::application::CoLink {
6466
match macro_type.as_str() {
6567
"chunk" => self._read_entry_chunk(&string_before).await,
6668
"redis" => self._read_entry_redis(&string_before, &string_after).await,
69+
"dbc" => self._read_entry_dbc(&string_before, &string_after).await,
6770
"fs" => self._read_entry_fs(&string_before, &string_after).await,
6871
_ => Err(format!(
6972
"invalid storage macro, found {} in key name {}",
@@ -136,7 +139,7 @@ impl crate::application::CoLink {
136139
"invalid storage macro, found {} in prefix {}",
137140
macro_type, key_name_prefix
138141
)
139-
.into())
142+
.into());
140143
}
141144
};
142145
let mut res: Vec<StorageEntry> = Vec::new();

src/extensions/storage_macro/dbc.rs

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
use async_recursion::async_recursion;
2+
use rdbc2;
3+
4+
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
5+
6+
impl crate::application::CoLink {
7+
#[async_recursion]
8+
async fn _search_and_generate_query(
9+
&self,
10+
string_before_dbc: &str,
11+
string_after_dbc: &str,
12+
) -> Result<(String, Vec<String>), Error> {
13+
let split_key_path: Vec<&str> = string_after_dbc.split(':').collect();
14+
for i in (0..split_key_path.len()).rev() {
15+
let current_key_path =
16+
format!("{}:{}", string_before_dbc, split_key_path[0..=i].join(":"));
17+
let payload = self.read_entry(current_key_path.as_str()).await;
18+
if let Ok(payload) = payload {
19+
let query_string = String::from_utf8(payload)?;
20+
let count = query_string.matches('?').count();
21+
if count != split_key_path.len() - (i + 1) {
22+
return Err("Number of parameters does not match specified query string")?;
23+
}
24+
let params = split_key_path[(i + 1)..]
25+
.iter()
26+
.map(|x| x.to_string())
27+
.collect::<Vec<String>>();
28+
return Ok((query_string, params));
29+
}
30+
}
31+
Err("no query string found.")?
32+
}
33+
34+
#[async_recursion]
35+
pub(crate) async fn _read_entry_dbc(
36+
&self,
37+
string_before_dbc: &str,
38+
string_after_dbc: &str,
39+
) -> Result<Vec<u8>, Error> {
40+
let url_key = format!("{}:url", string_before_dbc);
41+
let url = self.read_entry(url_key.as_str()).await?;
42+
let url_string = String::from_utf8(url)?;
43+
let (query_string, params) = self
44+
._search_and_generate_query(string_before_dbc, string_after_dbc)
45+
.await?;
46+
let params: Vec<&str> = params.iter().map(AsRef::as_ref).collect();
47+
let mut database = rdbc2::dbc::Database::new(url_string.as_str())?;
48+
let result = database.execute_query_with_params(query_string.as_str(), &params)?;
49+
let seralized_result = serde_json::to_vec(&result)?;
50+
Ok(seralized_result)
51+
}
52+
}

tests/test_storage_macro_dbc.rs

+99
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
use common::*;
2+
mod common;
3+
4+
fn _get_mysql_connection_url() -> String {
5+
if std::env::var("MYSQL_DATABASE_URL").is_ok() {
6+
std::env::var("MYSQL_DATABASE_URL").unwrap()
7+
} else {
8+
panic!("Please set the environment variable MYSQL_DATABASE_URL to run this test.")
9+
}
10+
}
11+
12+
#[tokio::test]
13+
async fn test_storage_macro_dbc_mysql(
14+
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
15+
let (_ir, _is, cl) = set_up_test_env_single_user().await?;
16+
17+
cl.create_entry(
18+
"storage_macro_test:db:url",
19+
_get_mysql_connection_url().as_bytes(),
20+
)
21+
.await?;
22+
23+
cl.create_entry(
24+
"storage_macro_test:db:create_db",
25+
b"CREATE DATABASE IF NOT EXISTS test_db" as &[u8],
26+
)
27+
.await?;
28+
29+
// Create a table and insert dummy data
30+
cl.create_entry(
31+
"storage_macro_test:db:create_table",
32+
b"CREATE TABLE IF NOT EXISTS users (name VARCHAR(255), age INT)" as &[u8],
33+
)
34+
.await?;
35+
36+
cl.create_entry(
37+
"storage_macro_test:db:insert_data",
38+
b"INSERT INTO users VALUES ('Alice', 20)" as &[u8],
39+
)
40+
.await?;
41+
42+
cl.create_entry(
43+
"storage_macro_test:db:query_users",
44+
b"SELECT * FROM users WHERE name = ? AND age = ?" as &[u8],
45+
)
46+
.await?;
47+
48+
cl.create_entry(
49+
"storage_macro_test:db:cleanup",
50+
b"DROP TABLE IF EXISTS users" as &[u8],
51+
)
52+
.await?;
53+
54+
cl.read_entry("storage_macro_test:db:$dbc:create_db")
55+
.await?;
56+
cl.read_entry("storage_macro_test:db:$dbc:create_table")
57+
.await?;
58+
cl.read_entry("storage_macro_test:db:$dbc:insert_data")
59+
.await?;
60+
let query_result = cl
61+
.read_entry("storage_macro_test:db:$dbc:query_users:'Alice':'20'")
62+
.await?;
63+
64+
let stringified = String::from_utf8(query_result.clone())?;
65+
println!("{}", stringified);
66+
assert_eq!(
67+
stringified,
68+
r#"{"rows":[{"values":[{"Bytes":"QWxpY2U"},{"Int":20}],"columns":[{"name":"name","column_type":"VARCHAR"},{"name":"age","column_type":"INT"}]}],"affected_row_count":0}"#
69+
);
70+
71+
let deserialized: rdbc2::dbc::QueryResult = serde_json::from_slice(&query_result)?;
72+
assert_eq!(deserialized.rows.len(), 1);
73+
74+
// Test query string parsing order
75+
cl.create_entry(
76+
"storage_macro_test:db:query_users2",
77+
b"SELECT * FROM users WHERE name = ? AND age = ?" as &[u8],
78+
)
79+
.await?;
80+
81+
cl.create_entry(
82+
"storage_macro_test:db:query_users2:additional",
83+
b"SELECT * FROM users WHERE name = ?" as &[u8],
84+
)
85+
.await?;
86+
87+
let result = cl
88+
.read_entry("storage_macro_test:db:$dbc:query_users2:additional:'Alice'")
89+
.await?;
90+
91+
assert_eq!(
92+
String::from_utf8(result)?,
93+
r#"{"rows":[{"values":[{"Bytes":"QWxpY2U"},{"Int":20}],"columns":[{"name":"name","column_type":"VARCHAR"},{"name":"age","column_type":"INT"}]}],"affected_row_count":0}"#
94+
);
95+
96+
cl.read_entry("storage_macro_test:db:$dbc:cleanup").await?;
97+
98+
Ok(())
99+
}

0 commit comments

Comments
 (0)