Skip to content

Commit 3a6cbf1

Browse files
Michael Christiansenlukebakken
Michael Christiansen
authored andcommitted
Added a synchronous write loop for connections.
1 parent 6677851 commit 3a6cbf1

File tree

6 files changed

+73
-10
lines changed

6 files changed

+73
-10
lines changed

.gitignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,3 +124,9 @@ projects/Unit*/TestResult.xml
124124
# Vim
125125
.sw?
126126
.*.sw?
127+
128+
129+
#################
130+
## JetBrains Rider
131+
#################
132+
.idea/

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,13 @@ public TimeSpan ContinuationTimeout
274274
/// </summary>
275275
public bool TopologyRecoveryEnabled { get; set; } = true;
276276

277+
/// <summary>
278+
/// Force writes to the socket to run on a dedicated thread instead of the thread pool. This may prevent
279+
/// timeouts if a large number of blocking requests are going out simultaneously. Will become obsolete
280+
/// once requests become asynchronous. Defaults to false.
281+
/// </summary>
282+
public bool EnableSynchronousWriteLoop { get; set; } = false;
283+
277284
/// <summary>
278285
/// Filter to include/exclude entities from topology recovery.
279286
/// Default filter includes all entities in topology recovery.
@@ -640,7 +647,7 @@ public IConnection CreateConnection(IEndpointResolver endpointResolver, string c
640647
internal IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint)
641648
{
642649
IFrameHandler fh = Protocols.DefaultProtocol.CreateFrameHandler(endpoint, _memoryPool, SocketFactory,
643-
RequestedConnectionTimeout, SocketReadTimeout, SocketWriteTimeout);
650+
RequestedConnectionTimeout, SocketReadTimeout, SocketWriteTimeout, EnableSynchronousWriteLoop);
644651
return ConfigureFrameHandler(fh);
645652
}
646653

projects/RabbitMQ.Client/client/impl/IProtocolExtensions.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,10 @@ public static IFrameHandler CreateFrameHandler(
4545
Func<AddressFamily, ITcpClient> socketFactory,
4646
TimeSpan connectionTimeout,
4747
TimeSpan readTimeout,
48-
TimeSpan writeTimeout)
48+
TimeSpan writeTimeout,
49+
bool enableSynchronousWriteLoop)
4950
{
50-
return new SocketFrameHandler(endpoint, socketFactory, connectionTimeout, readTimeout, writeTimeout)
51+
return new SocketFrameHandler(endpoint, socketFactory, connectionTimeout, readTimeout, writeTimeout, enableSynchronousWriteLoop)
5152
{
5253
MemoryPool = pool
5354
};

projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,14 @@ class SocketFrameHandler : IFrameHandler
7878
private readonly byte[] _frameHeaderBuffer;
7979
private bool _closed;
8080
private ArrayPool<byte> _pool = ArrayPool<byte>.Shared;
81+
private readonly bool _enableSynchronousWriteLoop;
8182

8283
public SocketFrameHandler(AmqpTcpEndpoint endpoint,
8384
Func<AddressFamily, ITcpClient> socketFactory,
84-
TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout)
85+
TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout, bool enableSynchronousWriteLoop)
8586
{
8687
_endpoint = endpoint;
88+
_enableSynchronousWriteLoop = enableSynchronousWriteLoop;
8789
_frameHeaderBuffer = new byte[6];
8890
var channel = Channel.CreateUnbounded<ReadOnlyMemory<byte>>(
8991
new UnboundedChannelOptions
@@ -134,7 +136,15 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint,
134136
_writer = new BufferedStream(netstream, _socket.Client.SendBufferSize);
135137

136138
WriteTimeout = writeTimeout;
137-
_writerTask = Task.Run(WriteLoop, CancellationToken.None);
139+
if (_enableSynchronousWriteLoop)
140+
{
141+
TaskCreationOptions tco = TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach;
142+
_writerTask = Task.Factory.StartNew(SynchronousWriteLoop, CancellationToken.None, tco, TaskScheduler.Default);
143+
}
144+
else
145+
{
146+
_writerTask = Task.Run(WriteLoop, CancellationToken.None);
147+
}
138148
}
139149

140150
public AmqpTcpEndpoint Endpoint
@@ -270,17 +280,41 @@ private async Task WriteLoop()
270280
while (await _channelReader.WaitToReadAsync().ConfigureAwait(false))
271281
{
272282
_socket.Client.Poll(_writeableStateTimeoutMicroSeconds, SelectMode.SelectWrite);
273-
while (_channelReader.TryRead(out var memory))
283+
while (_channelReader.TryRead(out ReadOnlyMemory<byte> memory))
274284
{
275-
MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment);
276-
await _writer.WriteAsync(segment.Array, segment.Offset, segment.Count).ConfigureAwait(false);
277-
MemoryPool.Return(segment.Array);
285+
if (MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment))
286+
{
287+
if (segment.Array != null)
288+
{
289+
await _writer.WriteAsync(segment.Array, segment.Offset, segment.Count).ConfigureAwait(false);
290+
MemoryPool.Return(segment.Array);
291+
}
292+
}
278293
}
279-
280294
await _writer.FlushAsync().ConfigureAwait(false);
281295
}
282296
}
283297

298+
private void SynchronousWriteLoop()
299+
{
300+
while (_channelReader.WaitToReadAsync().AsTask().Result)
301+
{
302+
_socket.Client.Poll(_writeableStateTimeoutMicroSeconds, SelectMode.SelectWrite);
303+
while (_channelReader.TryRead(out ReadOnlyMemory<byte> memory))
304+
{
305+
if (MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment))
306+
{
307+
if (segment.Array != null)
308+
{
309+
_writer.Write(segment.Array, segment.Offset, segment.Count);
310+
MemoryPool.Return(segment.Array);
311+
}
312+
}
313+
}
314+
_writer.Flush();
315+
}
316+
}
317+
284318
private static bool ShouldTryIPv6(AmqpTcpEndpoint endpoint)
285319
{
286320
return Socket.OSSupportsIPv6 && endpoint.AddressFamily != AddressFamily.InterNetwork;

projects/Unit/APIApproval.Approve.verified.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ namespace RabbitMQ.Client
101101
public RabbitMQ.Client.ICredentialsProvider CredentialsProvider { get; set; }
102102
public RabbitMQ.Client.ICredentialsRefresher CredentialsRefresher { get; set; }
103103
public bool DispatchConsumersAsync { get; set; }
104+
public bool EnableSynchronousWriteLoop { get; set; }
104105
public RabbitMQ.Client.AmqpTcpEndpoint Endpoint { get; set; }
105106
public System.Func<System.Collections.Generic.IEnumerable<RabbitMQ.Client.AmqpTcpEndpoint>, RabbitMQ.Client.IEndpointResolver> EndpointResolverFactory { get; set; }
106107
public System.TimeSpan HandshakeContinuationTimeout { get; set; }

projects/Unit/TestConnectionFactory.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,20 @@ public void TestCreateConnectionAmqpTcpEndpointListAndClientProvidedName()
161161
}
162162
}
163163

164+
[Test]
165+
public void TestCreateConnectionWithSynchronousWriteLoop()
166+
{
167+
var cf = new ConnectionFactory
168+
{
169+
AutomaticRecoveryEnabled = true,
170+
HostName = "localhost",
171+
EnableSynchronousWriteLoop = true
172+
};
173+
using (IConnection conn = cf.CreateConnection()){
174+
Assert.AreEqual(5672, conn.Endpoint.Port);
175+
}
176+
}
177+
164178
[Test]
165179
public void TestCreateConnectionUsesDefaultPort()
166180
{

0 commit comments

Comments
 (0)