Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: handle incoming requests one by one #203

Merged
merged 3 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 87 additions & 23 deletions WalletConnectSharp.Core/Controllers/TypedMessageHandler.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
using System.Security.Cryptography;
using System.Text;
using Newtonsoft.Json;
using WalletConnectSharp.Common.Events;
using WalletConnectSharp.Common.Model.Errors;
using Newtonsoft.Json;
using WalletConnectSharp.Common.Logging;
using WalletConnectSharp.Common.Utils;
using WalletConnectSharp.Core.Interfaces;
using WalletConnectSharp.Core.Models;
Expand All @@ -15,12 +12,18 @@ namespace WalletConnectSharp.Core.Controllers
{
public class TypedMessageHandler : ITypedMessageHandler
{
private bool _initialized = false;
private Dictionary<string, DecodeOptions> _decodeOptionsMap = new Dictionary<string, DecodeOptions>();
private HashSet<string> _typeSafeCache = new HashSet<string>();
private bool _initialized;
private bool _isRequestQueueProcessing;

private readonly Dictionary<string, DecodeOptions> _decodeOptionsMap = new();

private readonly HashSet<string> _typeSafeCache = [];

private readonly Queue<(string method, MessageEvent messageEvent)> _requestQueue = new();
private readonly Dictionary<string, List<Func<MessageEvent, Task>>> _requestCallbacksMap = new();
private readonly Dictionary<string, List<Action<MessageEvent>>> _responseCallbacksMap = new();

public event EventHandler<DecodedMessageEvent> RawMessage;
private EventHandlerMap<MessageEvent> messageEventHandlerMap = new();

protected bool Disposed;

Expand Down Expand Up @@ -57,24 +60,26 @@ public Task Init()
{
if (!_initialized)
{
this.Core.Relayer.OnMessageReceived += RelayerMessageCallback;
Core.Relayer.OnMessageReceived += RelayMessageCallback;
}

_initialized = true;
return Task.CompletedTask;
}

async void RelayerMessageCallback(object sender, MessageEvent e)
private async void RelayMessageCallback(object sender, MessageEvent e)
{
var topic = e.Topic;
var message = e.Message;


var options = DecodeOptionForTopic(topic);

var payload = await this.Core.Crypto.Decode<JsonRpcPayload>(topic, message, options);
if (payload.IsRequest)
{
messageEventHandlerMap[$"request_{payload.Method}"](this, e);
_requestQueue.Enqueue((payload.Method, e));
await ProcessRequestQueue();
}
else if (payload.IsResponse)
{
Expand All @@ -83,6 +88,44 @@ async void RelayerMessageCallback(object sender, MessageEvent e)
}
}

private async Task ProcessRequestQueue()
{
if (_isRequestQueueProcessing)
{
return;
}

_isRequestQueueProcessing = true;

try
{
while (_requestQueue.Count > 0)
{
var (method, messageEvent) = _requestQueue.Dequeue();
if (!_requestCallbacksMap.TryGetValue(method, out var callbacks))
{
continue;
}

foreach (var callback in callbacks)
{
try
{
await callback(messageEvent);
}
catch (Exception e)
{
WCLogger.LogError(e);
}
}
}
}
finally
{
_isRequestQueueProcessing = false;
}
}

/// <summary>
/// Handle a specific request / response type and call the given callbacks for requests and responses. The
/// response callback is only triggered when it originates from the request of the same type.
Expand All @@ -97,7 +140,7 @@ public async Task<DisposeHandlerToken> HandleMessageType<T, TR>(Func<string, Jso
var method = RpcMethodAttribute.MethodForType<T>();
var rpcHistory = await this.Core.History.JsonRpcHistoryOfType<T, TR>();

async void RequestCallback(object sender, MessageEvent e)
async Task RequestCallback(MessageEvent e)
{
try
{
Expand Down Expand Up @@ -128,7 +171,7 @@ async void RequestCallback(object sender, MessageEvent e)
}
}

async void ResponseCallback(object sender, MessageEvent e)
async void ResponseCallback(MessageEvent e)
{
if (responseCallback == null || Disposed)
{
Expand Down Expand Up @@ -184,15 +227,36 @@ record = await rpcHistory.Get(topic, payload.Id);
var resMethod = record.Request.Method;

// Trigger the true response event, which will trigger ResponseCallback
messageEventHandlerMap[$"response_{resMethod}"](this,
new MessageEvent

if (_responseCallbacksMap.TryGetValue(resMethod, out var callbacks))
{
var callbacksCopy = callbacks.ToList();
foreach (var callback in callbacksCopy)
{
Topic = topic, Message = message
});
callback(new MessageEvent
{
Topic = topic, Message = message
});
}
}
}

if (!_requestCallbacksMap.TryGetValue(method, out var requestCallbacks))
{
requestCallbacks = [];
_requestCallbacksMap.Add(method, requestCallbacks);
}

requestCallbacks.Add(RequestCallback);


if (!_responseCallbacksMap.TryGetValue(method, out var responseCallbacks))
{
responseCallbacks = [];
_responseCallbacksMap.Add(method, responseCallbacks);
}

messageEventHandlerMap[$"request_{method}"] += RequestCallback;
messageEventHandlerMap[$"response_{method}"] += ResponseCallback;
responseCallbacks.Add(ResponseCallback);

// Handle response_raw in this context
// This will allow us to examine response_raw in every typed context registered
Expand All @@ -202,8 +266,8 @@ record = await rpcHistory.Get(topic, payload.Id);
{
this.RawMessage -= InspectResponseRaw;

messageEventHandlerMap[$"request_{method}"] -= RequestCallback;
messageEventHandlerMap[$"response_{method}"] -= ResponseCallback;
_requestCallbacksMap[method].Remove(RequestCallback);
_responseCallbacksMap[method].Remove(ResponseCallback);
});
}

Expand Down Expand Up @@ -429,7 +493,7 @@ protected virtual void Dispose(bool disposing)

if (disposing)
{
this.Core.Relayer.OnMessageReceived -= RelayerMessageCallback;
Core.Relayer.OnMessageReceived -= RelayMessageCallback;
}

Disposed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ protected virtual async void Teardown()

protected virtual Task ResponseCallback(string arg1, JsonRpcResponse<TR> arg2)
{
WCLogger.Log($"Got generic response for type {typeof(TR)}");
var rea = new ResponseEventArgs<TR>(arg2, arg1);
return ResponsePredicate != null && !ResponsePredicate(rea) ? Task.CompletedTask :
_onResponse != null ? _onResponse(rea) : Task.CompletedTask;
Expand Down
4 changes: 2 additions & 2 deletions WalletConnectSharp.Sign/Internals/EngineHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,8 @@ async Task IEnginePrivate.OnSessionEventRequest(string topic, JsonRpcRequest<Ses
var eventData = @params.Event;
var eventName = eventData.Name;

await PrivateThis.IsValidEmit(topic, eventData, @params.ChainId);

await IsValidSessionTopic(topic);
_customSessionEventsHandlerMap[eventName]?.Invoke(this, @params);

await MessageHandler.SendResult<SessionEvent<EventData<JToken>>, bool>(id, topic, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ protected override void Setup()

private Task WrappedRefOnOnResponse(ResponseEventArgs<TR> e)
{
WCLogger.Log($"Got response for type {typeof(TR)}");
return base.ResponseCallback(e.Topic, e.Response);
}

Expand Down
Loading