Skip to content

Problem: walletconnnect requests would not be cleared if no response (fix #302) #303

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Dec 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion wallet-connect/src/client/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use ethers::prelude::{Address, JsonRpcClient};
use rand::Rng;
use serde::{de::DeserializeOwned, Serialize};
use std::sync::{atomic::AtomicBool, Arc};
use std::time::Duration;
use thiserror::Error;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::{oneshot, Mutex};
Expand All @@ -30,11 +31,14 @@ pub struct Context {
pub session: Mutex<Session>,
/// indicates whether the session is being established
pub session_pending: AtomicBool,
/// record the time of the request and have a regular cleanup
pub pending_requests_timeout: Duration,
/// limit pending requests size
pub pending_requests_limit: usize,
/// the map of the requests that were sent to the wallet
/// and the client app is awaiting a response.
/// When the response is received, the request is removed
/// and the response is sent to the receiver via the one-shot channel.
/// TODO: limit size or record the time of the request and have a regular cleanup?
pub pending_requests: DashMap<u64, oneshot::Sender<serde_json::Value>>,
}

Expand All @@ -49,6 +53,8 @@ impl SharedContext {
Self(Arc::new(Context {
session: Mutex::new(session),
session_pending: AtomicBool::new(false),
pending_requests_timeout: Duration::from_millis(60000),
pending_requests_limit: 2,
pending_requests: DashMap::new(),
}))
}
Expand Down
51 changes: 43 additions & 8 deletions wallet-connect/src/client/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use futures::{future, SinkExt, TryStreamExt};
pub use native::*;
use serde::{de::DeserializeOwned, Serialize};
use thiserror::Error;
use tokio::time::timeout;
use tokio::{
sync::{
mpsc::{unbounded_channel, UnboundedSender},
Expand All @@ -31,7 +32,6 @@ use eyre::{eyre, Context};
#[derive(Debug)]
pub struct Socket {
/// queue for messages to be sent to the bridge server
/// TODO: make it bounded?
sender: UnboundedSender<(Option<u64>, Vec<u8>)>,
/// the handle of the task that writes on the websocket connection
_write_handle: JoinHandle<()>,
Expand Down Expand Up @@ -117,7 +117,12 @@ impl Socket {
context: &SharedContext,
) -> eyre::Result<R> {
let (tx, rx) = oneshot::channel();
context.0.pending_requests.insert(id, tx);
if context.0.pending_requests.len() >= context.0.pending_requests_limit {
return Err(eyre!("Reached the limit ({}) pending requests, please clear all pending requests before making new requests", context.0.pending_requests.len()));
} else {
context.0.pending_requests.insert(id, tx);
}

let session = context.0.session.lock().await;
let topic = session
.info
Expand All @@ -133,14 +138,44 @@ impl Socket {
};
drop(session);
self.send_socket_msg(context, id, message)?;
let response = rx.await?;
let code = response["code"].as_i64();
if let Some(value) = code {
if -32000 == value {
return Err(eyre!("{}", serde_json::to_string(&response)?));
// Wrap the future with a `Timeout` set to expire in `pending_requests_timeout` Duration.
match timeout(context.0.pending_requests_timeout, rx).await {
Ok(resp) => {
let response = resp?;
let code = response["code"].as_i64();
if let Some(value) = code {
if -32000 == value {
return Err(eyre!("{}", serde_json::to_string(&response)?));
}
}
serde_json::from_value(response).wrap_err("failed to parse response")
}
Err(_) => {
if let Some((_id, _sender)) = context.0.pending_requests.remove(&id) {
Err(eyre!(
"{}",
serde_json::json!({
"code": -32000,
"payload": {
"reason": "Request is dropped because of timeout",
"timeout": context.0.pending_requests_timeout.as_millis() as u64,
}
})
))
// TODO Reject the request?
} else {
Err(eyre!(
"{}",
serde_json::json!({
"code": -32000,
"payload": {
"reason": "Request is dropped because of not exists",
}
})
))
}
}
}
serde_json::from_value(response).wrap_err("failed to parse response")
}

/// attempts to create a session with the external wallet,
Expand Down