Skip to content
This repository was archived by the owner on Jul 30, 2019. It is now read-only.

Commit 4a7a01e

Browse files
author
Laszlo Hordos
committed
OPENICF-454 - Add periodical message to sync request statuses between client-server
1 parent 70d44a5 commit 4a7a01e

File tree

6 files changed

+209
-77
lines changed

6 files changed

+209
-77
lines changed

FrameworkProtoBuf/protobuf/RPCMessages.proto

+2
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ message ControlRequest {
4040
REQUESTS = 2;
4141
}
4242
repeated InfoLevel infoLevel = 1;
43+
repeated int64 localRequestId = 2;
44+
repeated int64 remoteRequestId = 3;
4345
}
4446

4547
message ControlResponse {

FrameworkRPC/Rpc.cs

+24
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,18 @@ protected internal LocalRequest(long requestId, TH socket)
177177
_remoteConnectionContext.RemoteConnectionGroup.ReceiveRequest<TV, TE, LocalRequest<TV, TE, TG, TH, TP>>(this);
178178
}
179179

180+
/// <summary>
181+
/// Check if this object was {@ref Inconsistent}-ed and don't dispose.
182+
/// </summary>
183+
/// <returns> 'true' when object is still active or 'false' when this can be
184+
/// disposed. </returns>
185+
public abstract bool Check();
186+
187+
/// <summary>
188+
/// Signs that the object state is inconsistent.
189+
/// </summary>
190+
public abstract void Inconsistent();
191+
180192
protected internal abstract bool TryHandleResult(TV result);
181193

182194
protected internal abstract bool TryHandleError(TE error);
@@ -783,6 +795,18 @@ protected RemoteRequest(TP context, Int64 requestId,
783795
_cancellationToken = cancellationToken;
784796
}
785797

798+
/// <summary>
799+
/// Check if this object was {@ref Inconsistent}-ed and don't dispose.
800+
/// </summary>
801+
/// <returns> 'true' when object is still active or 'false' when this can be
802+
/// disposed. </returns>
803+
public abstract bool Check();
804+
805+
/// <summary>
806+
/// Signs that the object state is inconsistent.
807+
/// </summary>
808+
public abstract void Inconsistent();
809+
786810
public abstract void HandleIncomingMessage(TH sourceConnection, Object message);
787811

788812
protected internal abstract RemoteConnectionGroup<TG, TH, TP>.AsyncMessageQueueRecord CreateMessageElement(

FrameworkServer/Client.cs

+24-6
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public ClientRemoteConnectorInfoManager(RemoteWSFrameworkConnectionInfo info,
7575
{
7676
if (result.IsCompleted)
7777
{
78-
connections[(int) o] = result.Result;
78+
connections[(int)o] = result.Result;
7979
}
8080
else if (result.IsFaulted)
8181
{
@@ -119,7 +119,8 @@ private sealed class RemoteDelegatingAsyncConnectorInfoManager : DelegatingAsync
119119
{
120120
private readonly ClientRemoteConnectorInfoManager _parent;
121121

122-
public RemoteDelegatingAsyncConnectorInfoManager(ClientRemoteConnectorInfoManager parent) : base(true)
122+
public RemoteDelegatingAsyncConnectorInfoManager(ClientRemoteConnectorInfoManager parent)
123+
: base(true)
123124
{
124125
_parent = parent;
125126
}
@@ -169,13 +170,29 @@ private readonly ConcurrentDictionary<RemoteWSFrameworkConnectionInfo, ClientRem
169170
registry =
170171
new ConcurrentDictionary<RemoteWSFrameworkConnectionInfo, ClientRemoteConnectorInfoManager>();
171172

173+
private readonly Timer _timer;
174+
172175
internal RemoteConnectionInfoManagerFactory(
173176
IMessageListener<WebSocketConnectionGroup, WebSocketConnectionHolder, RemoteOperationContext>
174177
messageListener,
175178
ConnectionManagerConfig managerConfig)
176179
{
177180
this._messageListener = messageListener;
178181
this.managerConfig = managerConfig;
182+
183+
184+
_timer = new Timer(state =>
185+
{
186+
if (Running)
187+
{
188+
foreach (var connectionGroup in connectionGroups.Values)
189+
{
190+
Trace.TraceInformation("Check ConnectionGroup:{0} - operational={1}", connectionGroup.RemoteSessionId
191+
, connectionGroup.Operational);
192+
connectionGroup.CheckIsActive();
193+
}
194+
}
195+
}, null, TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(4));
179196
}
180197

181198
public IRemoteConnectorInfoManager Connect(RemoteWSFrameworkConnectionInfo info)
@@ -215,6 +232,7 @@ public IRemoteConnectorInfoManager Connect(RemoteWSFrameworkConnectionInfo info)
215232

216233
protected internal void doClose()
217234
{
235+
_timer.Dispose();
218236
foreach (var clientRemoteConnectorInfoManager in registry.Values)
219237
{
220238
clientRemoteConnectorInfoManager.Dispose();
@@ -356,7 +374,7 @@ public virtual long HeartbeatInterval
356374

357375
#endregion
358376

359-
#region sd
377+
#region WebSocketWrapper
360378

361379
public class WebSocketWrapper : WebSocketConnectionHolder
362380
{
@@ -524,15 +542,15 @@ private static void RunInTask(Action action)
524542
protected override async Task WriteMessageAsync(byte[] entry, WebSocketMessageType messageType)
525543
{
526544
var messageBuffer = entry;
527-
var messagesCount = (int) Math.Ceiling((double) messageBuffer.Length/SendChunkSize);
545+
var messagesCount = (int)Math.Ceiling((double)messageBuffer.Length / SendChunkSize);
528546

529547
for (var i = 0; i < messagesCount; i++)
530548
{
531-
var offset = (SendChunkSize*i);
549+
var offset = (SendChunkSize * i);
532550
var count = SendChunkSize;
533551
var lastMessage = ((i + 1) == messagesCount);
534552

535-
if ((count*(i + 1)) > messageBuffer.Length)
553+
if ((count * (i + 1)) > messageBuffer.Length)
536554
{
537555
count = messageBuffer.Length - offset;
538556
}

FrameworkServer/FrameworkServer.csproj

+1
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
</Reference>
6565
<Reference Include="System" />
6666
<Reference Include="Microsoft.CSharp" />
67+
<Reference Include="System.Collections.Concurrent" />
6768
</ItemGroup>
6869
<ItemGroup>
6970
<Compile Include="Async.cs" />

0 commit comments

Comments
 (0)