diff --git a/wallet-connect/src/client/core.rs b/wallet-connect/src/client/core.rs index 1ac67445..65b233c8 100644 --- a/wallet-connect/src/client/core.rs +++ b/wallet-connect/src/client/core.rs @@ -16,6 +16,7 @@ use ethers::prelude::{Address, JsonRpcClient}; use rand::Rng; use serde::{de::DeserializeOwned, Serialize}; use std::sync::{atomic::AtomicBool, Arc}; +use std::time::Duration; use thiserror::Error; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::{oneshot, Mutex}; @@ -30,11 +31,14 @@ pub struct Context { pub session: Mutex, /// indicates whether the session is being established pub session_pending: AtomicBool, + /// record the time of the request and have a regular cleanup + pub pending_requests_timeout: Duration, + /// limit pending requests size + pub pending_requests_limit: usize, /// the map of the requests that were sent to the wallet /// and the client app is awaiting a response. /// When the response is received, the request is removed /// and the response is sent to the receiver via the one-shot channel. - /// TODO: limit size or record the time of the request and have a regular cleanup? pub pending_requests: DashMap>, } @@ -49,6 +53,8 @@ impl SharedContext { Self(Arc::new(Context { session: Mutex::new(session), session_pending: AtomicBool::new(false), + pending_requests_timeout: Duration::from_millis(60000), + pending_requests_limit: 2, pending_requests: DashMap::new(), })) } diff --git a/wallet-connect/src/client/socket.rs b/wallet-connect/src/client/socket.rs index 6fc02ffc..8e1db520 100644 --- a/wallet-connect/src/client/socket.rs +++ b/wallet-connect/src/client/socket.rs @@ -8,6 +8,7 @@ use futures::{future, SinkExt, TryStreamExt}; pub use native::*; use serde::{de::DeserializeOwned, Serialize}; use thiserror::Error; +use tokio::time::timeout; use tokio::{ sync::{ mpsc::{unbounded_channel, UnboundedSender}, @@ -31,7 +32,6 @@ use eyre::{eyre, Context}; #[derive(Debug)] pub struct Socket { /// queue for messages to be sent to the bridge server - /// TODO: make it bounded? sender: UnboundedSender<(Option, Vec)>, /// the handle of the task that writes on the websocket connection _write_handle: JoinHandle<()>, @@ -117,7 +117,12 @@ impl Socket { context: &SharedContext, ) -> eyre::Result { let (tx, rx) = oneshot::channel(); - context.0.pending_requests.insert(id, tx); + if context.0.pending_requests.len() >= context.0.pending_requests_limit { + return Err(eyre!("Reached the limit ({}) pending requests, please clear all pending requests before making new requests", context.0.pending_requests.len())); + } else { + context.0.pending_requests.insert(id, tx); + } + let session = context.0.session.lock().await; let topic = session .info @@ -133,14 +138,44 @@ impl Socket { }; drop(session); self.send_socket_msg(context, id, message)?; - let response = rx.await?; - let code = response["code"].as_i64(); - if let Some(value) = code { - if -32000 == value { - return Err(eyre!("{}", serde_json::to_string(&response)?)); + // Wrap the future with a `Timeout` set to expire in `pending_requests_timeout` Duration. + match timeout(context.0.pending_requests_timeout, rx).await { + Ok(resp) => { + let response = resp?; + let code = response["code"].as_i64(); + if let Some(value) = code { + if -32000 == value { + return Err(eyre!("{}", serde_json::to_string(&response)?)); + } + } + serde_json::from_value(response).wrap_err("failed to parse response") + } + Err(_) => { + if let Some((_id, _sender)) = context.0.pending_requests.remove(&id) { + Err(eyre!( + "{}", + serde_json::json!({ + "code": -32000, + "payload": { + "reason": "Request is dropped because of timeout", + "timeout": context.0.pending_requests_timeout.as_millis() as u64, + } + }) + )) + // TODO Reject the request? + } else { + Err(eyre!( + "{}", + serde_json::json!({ + "code": -32000, + "payload": { + "reason": "Request is dropped because of not exists", + } + }) + )) + } } } - serde_json::from_value(response).wrap_err("failed to parse response") } /// attempts to create a session with the external wallet,