Skip to content

Commit 8a40935

Browse files
author
Michael Christiansen
committed
Added a synchronous write loop for connections.
1 parent 6677851 commit 8a40935

File tree

5 files changed

+64
-8
lines changed

5 files changed

+64
-8
lines changed

.gitignore

+6
Original file line numberDiff line numberDiff line change
@@ -124,3 +124,9 @@ projects/Unit*/TestResult.xml
124124
# Vim
125125
.sw?
126126
.*.sw?
127+
128+
129+
#################
130+
## JetBrains Rider
131+
#################
132+
.idea/

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

+8-1
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,13 @@ public TimeSpan ContinuationTimeout
274274
/// </summary>
275275
public bool TopologyRecoveryEnabled { get; set; } = true;
276276

277+
/// <summary>
278+
/// Force writes to the socket to run on a dedicated thread instead of the thread pool. This may prevent
279+
/// timeouts if a large number of blocking requests are going out simultaneously. Will become obsolete
280+
/// once requests become asynchronous. Defaults to false.
281+
/// </summary>
282+
public bool EnableSynchronousWriteLoop { get; set; } = false;
283+
277284
/// <summary>
278285
/// Filter to include/exclude entities from topology recovery.
279286
/// Default filter includes all entities in topology recovery.
@@ -640,7 +647,7 @@ public IConnection CreateConnection(IEndpointResolver endpointResolver, string c
640647
internal IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint)
641648
{
642649
IFrameHandler fh = Protocols.DefaultProtocol.CreateFrameHandler(endpoint, _memoryPool, SocketFactory,
643-
RequestedConnectionTimeout, SocketReadTimeout, SocketWriteTimeout);
650+
RequestedConnectionTimeout, SocketReadTimeout, SocketWriteTimeout, EnableSynchronousWriteLoop);
644651
return ConfigureFrameHandler(fh);
645652
}
646653

projects/RabbitMQ.Client/client/impl/IProtocolExtensions.cs

+3-2
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,10 @@ public static IFrameHandler CreateFrameHandler(
4545
Func<AddressFamily, ITcpClient> socketFactory,
4646
TimeSpan connectionTimeout,
4747
TimeSpan readTimeout,
48-
TimeSpan writeTimeout)
48+
TimeSpan writeTimeout,
49+
bool enableSynchronousWriteLoop)
4950
{
50-
return new SocketFrameHandler(endpoint, socketFactory, connectionTimeout, readTimeout, writeTimeout)
51+
return new SocketFrameHandler(endpoint, socketFactory, connectionTimeout, readTimeout, writeTimeout, enableSynchronousWriteLoop)
5152
{
5253
MemoryPool = pool
5354
};

projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs

+33-5
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ public static async Task TimeoutAfter(this Task task, TimeSpan timeout)
6363
class SocketFrameHandler : IFrameHandler
6464
{
6565
private readonly AmqpTcpEndpoint _endpoint;
66+
67+
6668
// Socket poll timeout in ms. If the socket does not
6769
// become writeable in this amount of time, we throw
6870
// an exception.
@@ -78,19 +80,19 @@ class SocketFrameHandler : IFrameHandler
7880
private readonly byte[] _frameHeaderBuffer;
7981
private bool _closed;
8082
private ArrayPool<byte> _pool = ArrayPool<byte>.Shared;
83+
private readonly bool _enableSynchronousWriteLoop;
8184

8285
public SocketFrameHandler(AmqpTcpEndpoint endpoint,
8386
Func<AddressFamily, ITcpClient> socketFactory,
84-
TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout)
87+
TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout, bool enableSynchronousWriteLoop)
8588
{
8689
_endpoint = endpoint;
90+
_enableSynchronousWriteLoop = enableSynchronousWriteLoop;
8791
_frameHeaderBuffer = new byte[6];
8892
var channel = Channel.CreateUnbounded<ReadOnlyMemory<byte>>(
8993
new UnboundedChannelOptions
9094
{
91-
AllowSynchronousContinuations = false,
92-
SingleReader = true,
93-
SingleWriter = false
95+
AllowSynchronousContinuations = false, SingleReader = true, SingleWriter = false
9496
});
9597

9698
_channelReader = channel.Reader;
@@ -134,7 +136,15 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint,
134136
_writer = new BufferedStream(netstream, _socket.Client.SendBufferSize);
135137

136138
WriteTimeout = writeTimeout;
137-
_writerTask = Task.Run(WriteLoop, CancellationToken.None);
139+
if (_enableSynchronousWriteLoop)
140+
{
141+
TaskCreationOptions tco = TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach;
142+
_writerTask = Task.Factory.StartNew(SynchronousWriteLoop, CancellationToken.None, tco, TaskScheduler.Default);
143+
}
144+
else
145+
{
146+
_writerTask = Task.Run(WriteLoop, CancellationToken.None);
147+
}
138148
}
139149

140150
public AmqpTcpEndpoint Endpoint
@@ -281,6 +291,24 @@ private async Task WriteLoop()
281291
}
282292
}
283293

294+
private void SynchronousWriteLoop()
295+
{
296+
while (_channelReader.WaitToReadAsync().AsTask().Result)
297+
{
298+
_socket.Client.Poll(_writeableStateTimeoutMicroSeconds, SelectMode.SelectWrite);
299+
while (_channelReader.TryRead(out var memory))
300+
{
301+
if (MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment) &&
302+
segment.Array != null)
303+
{
304+
_writer.Write(segment.Array, segment.Offset, segment.Count);
305+
MemoryPool.Return(segment.Array);
306+
}
307+
}
308+
_writer.Flush();
309+
}
310+
}
311+
284312
private static bool ShouldTryIPv6(AmqpTcpEndpoint endpoint)
285313
{
286314
return Socket.OSSupportsIPv6 && endpoint.AddressFamily != AddressFamily.InterNetwork;

projects/Unit/TestConnectionFactory.cs

+14
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,20 @@ public void TestCreateConnectionAmqpTcpEndpointListAndClientProvidedName()
161161
}
162162
}
163163

164+
[Test]
165+
public void TestCreateConnectionWithSynchronousWriteLoop()
166+
{
167+
var cf = new ConnectionFactory
168+
{
169+
AutomaticRecoveryEnabled = true,
170+
HostName = "localhost",
171+
EnableSynchronousWriteLoop = true
172+
};
173+
using (IConnection conn = cf.CreateConnection()){
174+
Assert.AreEqual(5672, conn.Endpoint.Port);
175+
}
176+
}
177+
164178
[Test]
165179
public void TestCreateConnectionUsesDefaultPort()
166180
{

0 commit comments

Comments
 (0)