diff --git a/.gitignore b/.gitignore
index 3e8850c4b0..8d9fe0e6bc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -124,3 +124,9 @@ projects/Unit*/TestResult.xml
# Vim
.sw?
.*.sw?
+
+
+#################
+## JetBrains Rider
+#################
+.idea/
diff --git a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs
index ae8f7c1dad..0a6c7a6c15 100644
--- a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs
+++ b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs
@@ -274,6 +274,13 @@ public TimeSpan ContinuationTimeout
///
public bool TopologyRecoveryEnabled { get; set; } = true;
+ ///
+ /// Force writes to the socket to run on a dedicated thread instead of the thread pool. This may prevent
+ /// timeouts if a large number of blocking requests are going out simultaneously. Will become obsolete
+ /// once requests become asynchronous. Defaults to false.
+ ///
+ public bool EnableSynchronousWriteLoop { get; set; } = false;
+
///
/// Filter to include/exclude entities from topology recovery.
/// Default filter includes all entities in topology recovery.
@@ -642,7 +649,7 @@ public IConnection CreateConnection(IEndpointResolver endpointResolver, string c
internal IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint)
{
IFrameHandler fh = Protocols.DefaultProtocol.CreateFrameHandler(endpoint, _memoryPool, SocketFactory,
- RequestedConnectionTimeout, SocketReadTimeout, SocketWriteTimeout);
+ RequestedConnectionTimeout, SocketReadTimeout, SocketWriteTimeout, EnableSynchronousWriteLoop);
return ConfigureFrameHandler(fh);
}
diff --git a/projects/RabbitMQ.Client/client/impl/IProtocolExtensions.cs b/projects/RabbitMQ.Client/client/impl/IProtocolExtensions.cs
index e8c2acd125..8edce34190 100644
--- a/projects/RabbitMQ.Client/client/impl/IProtocolExtensions.cs
+++ b/projects/RabbitMQ.Client/client/impl/IProtocolExtensions.cs
@@ -45,9 +45,10 @@ public static IFrameHandler CreateFrameHandler(
Func socketFactory,
TimeSpan connectionTimeout,
TimeSpan readTimeout,
- TimeSpan writeTimeout)
+ TimeSpan writeTimeout,
+ bool enableSynchronousWriteLoop)
{
- return new SocketFrameHandler(endpoint, socketFactory, connectionTimeout, readTimeout, writeTimeout)
+ return new SocketFrameHandler(endpoint, socketFactory, connectionTimeout, readTimeout, writeTimeout, enableSynchronousWriteLoop)
{
MemoryPool = pool
};
diff --git a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
index 141e02aee0..9553bfec43 100644
--- a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
+++ b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
@@ -78,12 +78,14 @@ class SocketFrameHandler : IFrameHandler
private readonly byte[] _frameHeaderBuffer;
private bool _closed;
private ArrayPool _pool = ArrayPool.Shared;
+ private readonly bool _enableSynchronousWriteLoop;
public SocketFrameHandler(AmqpTcpEndpoint endpoint,
Func socketFactory,
- TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout)
+ TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout, bool enableSynchronousWriteLoop)
{
_endpoint = endpoint;
+ _enableSynchronousWriteLoop = enableSynchronousWriteLoop;
_frameHeaderBuffer = new byte[6];
var channel = Channel.CreateUnbounded>(
new UnboundedChannelOptions
@@ -134,7 +136,15 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint,
_writer = new BufferedStream(netstream, _socket.Client.SendBufferSize);
WriteTimeout = writeTimeout;
- _writerTask = Task.Run(WriteLoop, CancellationToken.None);
+ if (_enableSynchronousWriteLoop)
+ {
+ TaskCreationOptions tco = TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach;
+ _writerTask = Task.Factory.StartNew(SynchronousWriteLoop, CancellationToken.None, tco, TaskScheduler.Default);
+ }
+ else
+ {
+ _writerTask = Task.Run(WriteLoop, CancellationToken.None);
+ }
}
public AmqpTcpEndpoint Endpoint
@@ -270,17 +280,41 @@ private async Task WriteLoop()
while (await _channelReader.WaitToReadAsync().ConfigureAwait(false))
{
_socket.Client.Poll(_writeableStateTimeoutMicroSeconds, SelectMode.SelectWrite);
- while (_channelReader.TryRead(out var memory))
+ while (_channelReader.TryRead(out ReadOnlyMemory memory))
{
- MemoryMarshal.TryGetArray(memory, out ArraySegment segment);
- await _writer.WriteAsync(segment.Array, segment.Offset, segment.Count).ConfigureAwait(false);
- MemoryPool.Return(segment.Array);
+ if (MemoryMarshal.TryGetArray(memory, out ArraySegment segment))
+ {
+ if (segment.Array != null)
+ {
+ await _writer.WriteAsync(segment.Array, segment.Offset, segment.Count).ConfigureAwait(false);
+ MemoryPool.Return(segment.Array);
+ }
+ }
}
-
await _writer.FlushAsync().ConfigureAwait(false);
}
}
+ private void SynchronousWriteLoop()
+ {
+ while (_channelReader.WaitToReadAsync().AsTask().Result)
+ {
+ _socket.Client.Poll(_writeableStateTimeoutMicroSeconds, SelectMode.SelectWrite);
+ while (_channelReader.TryRead(out ReadOnlyMemory memory))
+ {
+ if (MemoryMarshal.TryGetArray(memory, out ArraySegment segment))
+ {
+ if (segment.Array != null)
+ {
+ _writer.Write(segment.Array, segment.Offset, segment.Count);
+ MemoryPool.Return(segment.Array);
+ }
+ }
+ }
+ _writer.Flush();
+ }
+ }
+
private static bool ShouldTryIPv6(AmqpTcpEndpoint endpoint)
{
return Socket.OSSupportsIPv6 && endpoint.AddressFamily != AddressFamily.InterNetwork;
diff --git a/projects/Unit/APIApproval.Approve.verified.txt b/projects/Unit/APIApproval.Approve.verified.txt
index d7bcdaab0c..78bb462a28 100644
--- a/projects/Unit/APIApproval.Approve.verified.txt
+++ b/projects/Unit/APIApproval.Approve.verified.txt
@@ -101,6 +101,7 @@ namespace RabbitMQ.Client
public RabbitMQ.Client.ICredentialsProvider CredentialsProvider { get; set; }
public RabbitMQ.Client.ICredentialsRefresher CredentialsRefresher { get; set; }
public bool DispatchConsumersAsync { get; set; }
+ public bool EnableSynchronousWriteLoop { get; set; }
public RabbitMQ.Client.AmqpTcpEndpoint Endpoint { get; set; }
public System.Func, RabbitMQ.Client.IEndpointResolver> EndpointResolverFactory { get; set; }
public System.TimeSpan HandshakeContinuationTimeout { get; set; }
diff --git a/projects/Unit/TestConnectionFactory.cs b/projects/Unit/TestConnectionFactory.cs
index c21b19dedc..f75ea9a880 100644
--- a/projects/Unit/TestConnectionFactory.cs
+++ b/projects/Unit/TestConnectionFactory.cs
@@ -196,6 +196,20 @@ public void TestCreateConnectionAmqpTcpEndpointListAndClientProvidedName()
}
}
+ [Test]
+ public void TestCreateConnectionWithSynchronousWriteLoop()
+ {
+ var cf = new ConnectionFactory
+ {
+ AutomaticRecoveryEnabled = true,
+ HostName = "localhost",
+ EnableSynchronousWriteLoop = true
+ };
+ using (IConnection conn = cf.CreateConnection()){
+ Assert.AreEqual(5672, conn.Endpoint.Port);
+ }
+ }
+
[Test]
public void TestCreateConnectionUsesDefaultPort()
{