@@ -78,12 +78,14 @@ class SocketFrameHandler : IFrameHandler
78
78
private readonly byte [ ] _frameHeaderBuffer ;
79
79
private bool _closed ;
80
80
private ArrayPool < byte > _pool = ArrayPool < byte > . Shared ;
81
+ private readonly bool _enableSynchronousWriteLoop ;
81
82
82
83
public SocketFrameHandler ( AmqpTcpEndpoint endpoint ,
83
84
Func < AddressFamily , ITcpClient > socketFactory ,
84
- TimeSpan connectionTimeout , TimeSpan readTimeout , TimeSpan writeTimeout )
85
+ TimeSpan connectionTimeout , TimeSpan readTimeout , TimeSpan writeTimeout , bool enableSynchronousWriteLoop )
85
86
{
86
87
_endpoint = endpoint ;
88
+ _enableSynchronousWriteLoop = enableSynchronousWriteLoop ;
87
89
_frameHeaderBuffer = new byte [ 6 ] ;
88
90
var channel = Channel . CreateUnbounded < ReadOnlyMemory < byte > > (
89
91
new UnboundedChannelOptions
@@ -134,7 +136,15 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint,
134
136
_writer = new BufferedStream ( netstream , _socket . Client . SendBufferSize ) ;
135
137
136
138
WriteTimeout = writeTimeout ;
137
- _writerTask = Task . Run ( WriteLoop , CancellationToken . None ) ;
139
+ if ( _enableSynchronousWriteLoop )
140
+ {
141
+ TaskCreationOptions tco = TaskCreationOptions . LongRunning | TaskCreationOptions . DenyChildAttach ;
142
+ _writerTask = Task . Factory . StartNew ( SynchronousWriteLoop , CancellationToken . None , tco , TaskScheduler . Default ) ;
143
+ }
144
+ else
145
+ {
146
+ _writerTask = Task . Run ( WriteLoop , CancellationToken . None ) ;
147
+ }
138
148
}
139
149
140
150
public AmqpTcpEndpoint Endpoint
@@ -270,17 +280,41 @@ private async Task WriteLoop()
270
280
while ( await _channelReader . WaitToReadAsync ( ) . ConfigureAwait ( false ) )
271
281
{
272
282
_socket . Client . Poll ( _writeableStateTimeoutMicroSeconds , SelectMode . SelectWrite ) ;
273
- while ( _channelReader . TryRead ( out var memory ) )
283
+ while ( _channelReader . TryRead ( out ReadOnlyMemory < byte > memory ) )
274
284
{
275
- MemoryMarshal . TryGetArray ( memory , out ArraySegment < byte > segment ) ;
276
- await _writer . WriteAsync ( segment . Array , segment . Offset , segment . Count ) . ConfigureAwait ( false ) ;
277
- MemoryPool . Return ( segment . Array ) ;
285
+ if ( MemoryMarshal . TryGetArray ( memory , out ArraySegment < byte > segment ) )
286
+ {
287
+ if ( segment . Array != null )
288
+ {
289
+ await _writer . WriteAsync ( segment . Array , segment . Offset , segment . Count ) . ConfigureAwait ( false ) ;
290
+ MemoryPool . Return ( segment . Array ) ;
291
+ }
292
+ }
278
293
}
279
-
280
294
await _writer . FlushAsync ( ) . ConfigureAwait ( false ) ;
281
295
}
282
296
}
283
297
298
+ private void SynchronousWriteLoop ( )
299
+ {
300
+ while ( _channelReader . WaitToReadAsync ( ) . AsTask ( ) . Result )
301
+ {
302
+ _socket . Client . Poll ( _writeableStateTimeoutMicroSeconds , SelectMode . SelectWrite ) ;
303
+ while ( _channelReader . TryRead ( out ReadOnlyMemory < byte > memory ) )
304
+ {
305
+ if ( MemoryMarshal . TryGetArray ( memory , out ArraySegment < byte > segment ) )
306
+ {
307
+ if ( segment . Array != null )
308
+ {
309
+ _writer . Write ( segment . Array , segment . Offset , segment . Count ) ;
310
+ MemoryPool . Return ( segment . Array ) ;
311
+ }
312
+ }
313
+ }
314
+ _writer . Flush ( ) ;
315
+ }
316
+ }
317
+
284
318
private static bool ShouldTryIPv6 ( AmqpTcpEndpoint endpoint )
285
319
{
286
320
return Socket . OSSupportsIPv6 && endpoint . AddressFamily != AddressFamily . InterNetwork ;
0 commit comments