Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/fiber-lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ rocksdb = { package = "ckb-rocksdb", version = "=0.21.1", features = [
tentacle = { version = "0.7", default-features = false, features = [
"upnp",
"parking_lot",
"unstable",
"openssl-vendored",
"tokio-runtime",
"tokio-timer",
Expand Down Expand Up @@ -112,6 +113,7 @@ jsonrpsee = { version = "0.25.1", features = [
"wasm-client",
] }
tentacle = { version = "0.7", default-features = false, features = [
"unstable",
"wasm-timer",
] }
tokio = { version = "1", features = ["io-util", "macros", "rt", "sync"] }
Expand Down
187 changes: 114 additions & 73 deletions crates/fiber-lib/src/fiber/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
cmp::max,
collections::{HashMap, HashSet},
marker::PhantomData,
sync::Arc,
sync::{Arc, Mutex, Once},
time::Duration,
};

Expand All @@ -14,20 +14,19 @@ use ckb_types::{
packed::OutPoint,
prelude::Unpack,
};
use futures::StreamExt as _;
use ractor::{
call, call_t, concurrency::JoinHandle, Actor, ActorCell, ActorProcessingErr, ActorRef,
ActorRuntime, MessagingErr, OutputPort, RpcReplyPort, SupervisionEvent,
};
use strum::AsRefStr;
use tentacle::{
async_trait as tasync_trait,
builder::MetaBuilder,
bytes::Bytes,
context::{ProtocolContext, ProtocolContextMutRef, SessionContext},
service::{ProtocolHandle, ProtocolMeta, ServiceAsyncControl, SessionType},
traits::ServiceProtocol,
context::SessionContext,
service::{ProtocolMeta, ServiceAsyncControl, SessionType},
traits::ProtocolSpawn,
utils::{is_reachable, multiaddr_to_socketaddr},
SessionId,
SessionId, SubstreamReadPart,
};
use tokio::sync::oneshot;
use tokio_util::codec::length_delimited;
Expand All @@ -38,7 +37,7 @@ use crate::fiber::network::DEFAULT_CHAIN_ACTOR_TIMEOUT;
use crate::{
ckb::{client::CkbChainClient, CkbChainMessage, GetTxResponse},
fiber::network::MAX_SERVICE_PROTOCOAL_DATA_SIZE,
now_timestamp_as_millis_u64, unwrap_or_return,
now_timestamp_as_millis_u64,
utils::actor::ActorHandleLogGuard,
Error,
};
Expand Down Expand Up @@ -2044,7 +2043,30 @@ async fn send_message_to_session(

pub(crate) struct GossipProtocolHandle {
actor: ActorRef<GossipActorMessage>,
sender: Option<oneshot::Sender<ServiceAsyncControl>>,
sender: Arc<OneTimeSender<ServiceAsyncControl>>,
}

struct OneTimeSender<T> {
sender: Mutex<Option<oneshot::Sender<T>>>,
once: Once,
}

impl<T> OneTimeSender<T> {
fn new(sender: oneshot::Sender<T>) -> Self {
Self {
sender: Mutex::new(Some(sender)),
once: Once::new(),
}
}

fn send_once(&self, value: T) {
self.once.call_once(move || {
let sender = self.sender.lock().expect("lock one-time sender").take();
if let Some(sender) = sender {
let _ = sender.send(value);
}
});
}
}

async fn get_message_cursor<S: GossipMessageStore>(
Expand Down Expand Up @@ -2451,7 +2473,7 @@ impl GossipProtocolHandle {
) -> Self {
Self {
actor,
sender: Some(sender),
sender: Arc::new(OneTimeSender::new(sender)),
}
}

Expand All @@ -2469,10 +2491,7 @@ impl GossipProtocolHandle {
.new_codec(),
)
})
.service_handle(move || {
let handle = Box::new(self);
ProtocolHandle::Callback(handle)
})
.protocol_spawn(self)
.build()
}
}
Expand Down Expand Up @@ -2848,80 +2867,102 @@ where
}
}

#[tasync_trait]
impl ServiceProtocol for GossipProtocolHandle {
async fn init(&mut self, context: &mut ProtocolContext) {
let sender = self
.sender
.take()
.expect("service control sender set and init called once");
if sender.send(context.control().clone()).is_err() {
panic!("Failed to send service control");
}
}

async fn connected(&mut self, context: ProtocolContextMutRef<'_>, version: &str) {
impl ProtocolSpawn for GossipProtocolHandle {
fn spawn(
&self,
context: Arc<SessionContext>,
control: &ServiceAsyncControl,
mut read_part: SubstreamReadPart,
) {
self.sender.send_once(control.clone());
trace!(
"proto id [{}] open on session [{}], address: [{}], type: [{:?}], version: {}",
context.proto_id,
context.session.id,
context.session.address,
context.session.ty,
version
read_part.protocol_id(),
context.id,
context.address,
context.ty,
"spawn"
);

if let Some(remote_pubkey) = context.session.remote_pubkey.clone() {
if let Some(remote_pubkey) = context.remote_pubkey.clone() {
let _ = self.actor.send_message(GossipActorMessage::PeerConnected(
remote_pubkey.into(),
context.session.clone(),
context.as_ref().clone(),
));
} else {
warn!("Peer connected without remote pubkey {:?}", context.session);
warn!("Peer connected without remote pubkey {:?}", context);
}
}

async fn disconnected(&mut self, context: ProtocolContextMutRef<'_>) {
trace!(
"proto id [{}] close on session [{}], address: [{}], type: [{:?}]",
context.proto_id,
context.session.id,
&context.session.address,
&context.session.ty
);
let actor = self.actor.clone();
tokio::spawn(async move {
while let Some(frame) = read_part.next().await {
let data = match frame {
Ok(data) => data,
Err(err) => {
warn!("Failed to read gossip protocol stream data: {:?}", err);
break;
}
};
let message = match GossipMessage::from_molecule_slice(&data) {
Ok(message) => message,
Err(err) => {
warn!("Failed to parse gossip protocol message: {:?}", err);
continue;
}
};
match context.remote_pubkey.as_ref() {
Some(pubkey) => {
let _ = actor.send_message(GossipActorMessage::GossipMessageReceived(
GossipMessageWithTarget {
target: pubkey.clone().into(),
message,
},
));
}
None => {
unreachable!("Received message without remote pubkey");
}
}
}

match context.session.remote_pubkey.as_ref() {
Some(remote_pubkey) => {
let _ = self
.actor
.send_message(GossipActorMessage::PeerDisconnected(
trace!(
"proto id [{}] close on session [{}], address: [{}], type: [{:?}]",
read_part.protocol_id(),
context.id,
&context.address,
&context.ty
);

match context.remote_pubkey.as_ref() {
Some(remote_pubkey) => {
let _ = actor.send_message(GossipActorMessage::PeerDisconnected(
remote_pubkey.clone().into(),
context.session.clone(),
context.as_ref().clone(),
));
}
None => {
unreachable!("Received message without remote pubkey");
}
}
None => {
unreachable!("Received message without remote pubkey");
}
}
});
}
}

async fn received(&mut self, context: ProtocolContextMutRef<'_>, data: Bytes) {
let message = unwrap_or_return!(GossipMessage::from_molecule_slice(&data), "parse message");
match context.session.remote_pubkey.as_ref() {
Some(pubkey) => {
let _ = self
.actor
.send_message(GossipActorMessage::GossipMessageReceived(
GossipMessageWithTarget {
target: pubkey.clone().into(),
message,
},
));
}
None => {
unreachable!("Received message without remote pubkey");
}
}
}
#[cfg(test)]
mod tests {
use tokio::sync::oneshot;

use super::OneTimeSender;

async fn notify(&mut self, _context: &mut ProtocolContext, _token: u64) {}
#[tokio::test]
async fn test_one_time_sender_only_sends_once() {
let (tx, rx) = oneshot::channel();
let sender = OneTimeSender::new(tx);

sender.send_once(1_u8);
sender.send_once(2_u8);

let value = rx.await.expect("receiver should get first value");
assert_eq!(value, 1);
}
}
Loading
Loading