Skip to content

Commit 0886f6e

Browse files
authored
Merge pull request #26 from CoLearn-Dev/instant-server
- instant server; wait_user_init; protocol_attach; switch_to_generated_user; bugfix
2 parents e55f753 + b7d887f commit 0886f6e

13 files changed

+296
-27
lines changed

.github/workflows/check.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
on: [push]
1+
on:
2+
push:
3+
branches:
4+
- main
5+
pull_request:
26

37
name: check
48

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "colink"
3-
version = "0.2.0"
3+
version = "0.2.1"
44
edition = "2021"
55
description = "CoLink Rust SDK"
66
license = "MIT"
@@ -13,7 +13,6 @@ repository = "https://github.com/CoLearn-Dev/colink-sdk-rust-dev"
1313
async-trait = "0.1"
1414
base64 = "0.13.0"
1515
chrono = "0.4"
16-
ctrlc = { version = "3.2", features = ["termination"] }
1716
futures-lite = "1.12"
1817
lapin = "2.1"
1918
prost = "0.10"
@@ -34,9 +33,10 @@ prost-build = "0.10"
3433
tonic-build = "0.7"
3534

3635
[features]
37-
default = ["extensions", "remote_storage", "variable_transfer", "registry", "policy_module"]
36+
default = ["extensions", "remote_storage", "variable_transfer", "registry", "policy_module", "instant_server"]
3837
extensions = []
3938
remote_storage = ["extensions"]
4039
variable_transfer = ["extensions", "remote_storage"]
4140
registry = []
4241
policy_module = []
42+
instant_server = []

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ CoLink SDK helps both application adnd protocol developers access the functional
99
Add this to your Cargo.toml:
1010
```toml
1111
[dependencies]
12-
colink = "0.2.0"
12+
colink = "0.2.1"
1313
```
1414

1515
## Getting Started
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
#![allow(unused_variables)]
2+
use colink::{extensions::instant_server::InstantServer, CoLink, Participant, ProtocolEntry};
3+
4+
struct Initiator;
5+
#[colink::async_trait]
6+
impl ProtocolEntry for Initiator {
7+
async fn start(
8+
&self,
9+
cl: CoLink,
10+
param: Vec<u8>,
11+
participants: Vec<Participant>,
12+
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
13+
println!("initiator");
14+
Ok(())
15+
}
16+
}
17+
18+
struct Receiver;
19+
#[colink::async_trait]
20+
impl ProtocolEntry for Receiver {
21+
async fn start(
22+
&self,
23+
cl: CoLink,
24+
param: Vec<u8>,
25+
participants: Vec<Participant>,
26+
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
27+
println!("{}", String::from_utf8_lossy(&param));
28+
cl.create_entry(&format!("tasks:{}:output", cl.get_task_id()?), &param)
29+
.await?;
30+
Ok(())
31+
}
32+
}
33+
34+
#[tokio::main]
35+
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
36+
let is0 = InstantServer::new();
37+
let is1 = InstantServer::new();
38+
let cl0 = is0.get_colink().switch_to_generated_user().await?;
39+
let cl1 = is0.get_colink().switch_to_generated_user().await?;
40+
colink::protocol_attach!(
41+
cl0,
42+
("greetings:initiator", Initiator),
43+
("greetings:receiver", Receiver)
44+
);
45+
colink::protocol_attach!(
46+
cl1,
47+
("greetings:initiator", Initiator),
48+
("greetings:receiver", Receiver)
49+
);
50+
let participants = vec![
51+
Participant {
52+
user_id: cl0.get_user_id()?,
53+
role: "initiator".to_string(),
54+
},
55+
Participant {
56+
user_id: cl1.get_user_id()?,
57+
role: "receiver".to_string(),
58+
},
59+
];
60+
let task_id = cl0
61+
.run_task("greetings", "test".as_bytes(), &participants, true)
62+
.await?;
63+
let res = cl1
64+
.read_or_wait(&format!("tasks:{}:output", task_id))
65+
.await?;
66+
println!("{}", String::from_utf8_lossy(&res));
67+
Ok(())
68+
}

examples/user_wait_task.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>
2424
role: "query_from_registries".to_string(),
2525
}];
2626
let task_id = cl
27-
.run_task("registry", target_user.as_bytes(), &participants, false)
27+
.run_task("registry", &payload, &participants, false)
2828
.await?;
2929
println!(
3030
"Task {} has been created, waiting for it to finish...",

src/application.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,23 +45,19 @@ impl CoLink {
4545
}
4646
}
4747

48-
pub fn ca_certificate(self, ca_certificate: &str) -> Self {
48+
pub fn ca_certificate(mut self, ca_certificate: &str) -> Self {
4949
let ca_certificate = std::fs::read(ca_certificate).unwrap();
5050
let ca_certificate = Certificate::from_pem(ca_certificate);
51-
Self {
52-
ca_certificate: Some(ca_certificate),
53-
..self
54-
}
51+
self.ca_certificate = Some(ca_certificate);
52+
self
5553
}
5654

57-
pub fn identity(self, client_cert: &str, client_key: &str) -> Self {
55+
pub fn identity(mut self, client_cert: &str, client_key: &str) -> Self {
5856
let client_cert = std::fs::read(client_cert).unwrap();
5957
let client_key = std::fs::read(client_key).unwrap();
6058
let identity = Identity::from_pem(client_cert, client_key);
61-
Self {
62-
identity: Some(identity),
63-
..self
64-
}
59+
self.identity = Some(identity);
60+
self
6561
}
6662

6763
async fn _grpc_connect(&self, address: &str) -> Result<CoLinkClient<Channel>, Error> {

src/extensions.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#[cfg(feature = "extensions")]
22
mod get_participant_id;
3+
#[cfg(feature = "instant_server")]
4+
pub mod instant_server;
35
#[cfg(feature = "extensions")]
46
mod lock;
57
#[cfg(feature = "policy_module")]
@@ -10,7 +12,11 @@ mod read_or_wait;
1012
pub mod registry;
1113
#[cfg(feature = "remote_storage")]
1214
mod remote_storage;
15+
#[cfg(feature = "extensions")]
16+
mod switch_to_generated_user;
1317
#[cfg(feature = "variable_transfer")]
1418
mod variable_transfer;
1519
#[cfg(feature = "extensions")]
1620
mod wait_task;
21+
#[cfg(feature = "extensions")]
22+
mod wait_user_init;

src/extensions/instant_server.rs

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
use crate::{utils::get_colink_home, CoLink};
2+
use rand::Rng;
3+
use std::{
4+
path::Path,
5+
process::{Child, Command, Stdio},
6+
};
7+
8+
pub struct InstantServer {
9+
id: String,
10+
port: i32,
11+
host_token: String,
12+
process: Child,
13+
}
14+
15+
impl Drop for InstantServer {
16+
fn drop(&mut self) {
17+
Command::new("pkill")
18+
.arg("-9")
19+
.arg("-P")
20+
.arg(&self.process.id().to_string())
21+
.stdout(Stdio::null())
22+
.stderr(Stdio::null())
23+
.status()
24+
.unwrap();
25+
self.process.kill().unwrap();
26+
let colink_home = get_colink_home().unwrap();
27+
let working_dir = Path::new(&colink_home)
28+
.join("instant_servers")
29+
.join(self.id.clone());
30+
std::fs::remove_dir_all(&working_dir).unwrap();
31+
}
32+
}
33+
34+
impl Default for InstantServer {
35+
fn default() -> Self {
36+
Self::new()
37+
}
38+
}
39+
40+
impl InstantServer {
41+
pub fn new() -> Self {
42+
let colink_home = get_colink_home().unwrap();
43+
let program = Path::new(&colink_home).join("colink-server");
44+
if std::fs::metadata(program.clone()).is_err() {
45+
Command::new("bash")
46+
.arg("-c")
47+
.arg("bash -c \"$(curl -fsSL https://raw.githubusercontent.com/CoLearn-Dev/colinkctl/main/install_colink.sh)\"")
48+
.status()
49+
.unwrap();
50+
}
51+
let instant_server_id = uuid::Uuid::new_v4().to_string();
52+
let mut port = rand::thread_rng().gen_range(10000..20000);
53+
while std::net::TcpStream::connect(&format!("127.0.0.1:{}", port)).is_ok() {
54+
port = rand::thread_rng().gen_range(10000..20000);
55+
}
56+
let working_dir = Path::new(&colink_home)
57+
.join("instant_servers")
58+
.join(instant_server_id.clone());
59+
std::fs::create_dir_all(&working_dir).unwrap();
60+
let child = Command::new(program)
61+
.args([
62+
"--address",
63+
"0.0.0.0",
64+
"--port",
65+
&port.to_string(),
66+
"--mq-amqp",
67+
"amqp://guest:guest@localhost:5672",
68+
"--mq-api",
69+
"http://guest:guest@localhost:15672/api",
70+
"--mq-prefix",
71+
&format!("colink-instant-server-{}", port),
72+
"--core-uri",
73+
&format!("http://127.0.0.1:{}", port),
74+
"--inter-core-reverse-mode",
75+
])
76+
.env("COLINK_HOME", colink_home)
77+
.current_dir(working_dir.clone())
78+
.stdout(Stdio::null())
79+
.stderr(Stdio::null())
80+
.spawn()
81+
.unwrap();
82+
loop {
83+
if std::fs::metadata(working_dir.join("host_token.txt")).is_ok()
84+
&& std::net::TcpStream::connect(&format!("127.0.0.1:{}", port)).is_ok()
85+
{
86+
break;
87+
}
88+
std::thread::sleep(core::time::Duration::from_millis(10));
89+
}
90+
let host_token: String =
91+
String::from_utf8_lossy(&std::fs::read(working_dir.join("host_token.txt")).unwrap())
92+
.parse()
93+
.unwrap();
94+
Self {
95+
id: instant_server_id,
96+
port,
97+
host_token,
98+
process: child,
99+
}
100+
}
101+
102+
pub fn get_colink(&self) -> CoLink {
103+
CoLink::new(&format!("http://127.0.0.1:{}", self.port), &self.host_token)
104+
}
105+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
use crate::decode_jwt_without_validation;
2+
3+
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
4+
5+
impl crate::application::CoLink {
6+
async fn generate_user_and_import(&self) -> Result<String, Error> {
7+
let auth_content = decode_jwt_without_validation(&self.jwt)?;
8+
let expiration_timestamp = auth_content.exp;
9+
let (pk, sk) = crate::generate_user();
10+
let (_, core_pub_key, _) = self.request_info().await?;
11+
let (signature_timestamp, sig) =
12+
crate::prepare_import_user_signature(&pk, &sk, &core_pub_key, expiration_timestamp);
13+
self.import_user(&pk, signature_timestamp, expiration_timestamp, &sig)
14+
.await
15+
}
16+
17+
pub async fn switch_to_generated_user(&self) -> Result<Self, Error> {
18+
let cl = Self::new(&self.core_addr, &self.generate_user_and_import().await?);
19+
cl.wait_user_init().await?;
20+
Ok(cl)
21+
}
22+
}

src/extensions/wait_user_init.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
use crate::{colink_proto::*, utils::get_path_timestamp};
2+
use prost::Message;
3+
use tracing::debug;
4+
5+
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
6+
7+
impl crate::application::CoLink {
8+
pub async fn wait_user_init(&self) -> Result<(), Error> {
9+
let is_initialized_key = "_internal:_is_initialized";
10+
let start_timestamp = match self
11+
.read_entries(&[StorageEntry {
12+
key_name: is_initialized_key.to_string(),
13+
..Default::default()
14+
}])
15+
.await
16+
{
17+
Ok(res) => {
18+
if res[0].payload[0] == 1 {
19+
return Ok(());
20+
}
21+
get_path_timestamp(&res[0].key_path) + 1
22+
}
23+
Err(_) => 0,
24+
};
25+
let queue_name = self
26+
.subscribe(is_initialized_key, Some(start_timestamp))
27+
.await?;
28+
let mut subscriber = self.new_subscriber(&queue_name).await?;
29+
loop {
30+
let data = subscriber.get_next().await?;
31+
debug!("Received [{}]", String::from_utf8_lossy(&data));
32+
let message: SubscriptionMessage = Message::decode(&*data).unwrap();
33+
if message.change_type != "delete" && message.payload[0] == 1 {
34+
break;
35+
}
36+
}
37+
self.unsubscribe(&queue_name).await?;
38+
Ok(())
39+
}
40+
}

0 commit comments

Comments
 (0)