Skip to content

Commit 266ba84

Browse files
authored
Merge pull request dotnet#33415 from stephentoub/compressionstreams
Add {Deflate\GZip\Brotli}Stream.DisposeAsync overrides
2 parents d30157d + 9782335 commit 266ba84

File tree

9 files changed

+218
-3
lines changed

9 files changed

+218
-3
lines changed

src/System.IO.Compression.Brotli/ref/System.IO.Compression.Brotli.cs

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public BrotliStream(System.IO.Stream stream, System.IO.Compression.CompressionMo
3636
public override long Length { get { throw null; } }
3737
public override long Position { get { throw null; } set { } }
3838
protected override void Dispose(bool disposing) { }
39+
public override System.Threading.Tasks.ValueTask DisposeAsync() { throw null; }
3940
public override void Flush() { }
4041
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback asyncCallback, object asyncState) { throw null; }
4142
public override int EndRead(IAsyncResult asyncResult) { throw null; }

src/System.IO.Compression.Brotli/src/System/IO/Compression/BrotliStream.cs

+26
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Diagnostics;
66
using System.Runtime.CompilerServices;
77
using System.Threading;
8+
using System.Threading.Tasks;
89

910
namespace System.IO.Compression
1011
{
@@ -74,6 +75,31 @@ protected override void Dispose(bool disposing)
7475
}
7576
}
7677

78+
public override async ValueTask DisposeAsync()
79+
{
80+
try
81+
{
82+
if (_stream != null)
83+
{
84+
if (_mode == CompressionMode.Compress)
85+
{
86+
await WriteAsyncMemoryCore(ReadOnlyMemory<byte>.Empty, CancellationToken.None, isFinalBlock: true).ConfigureAwait(false);
87+
}
88+
89+
if (!_leaveOpen)
90+
{
91+
await _stream.DisposeAsync().ConfigureAwait(false);
92+
}
93+
}
94+
}
95+
finally
96+
{
97+
_stream = null;
98+
_encoder.Dispose();
99+
_decoder.Dispose();
100+
}
101+
}
102+
77103
private static void ValidateParameters(byte[] array, int offset, int count)
78104
{
79105
if (array == null)

src/System.IO.Compression.Brotli/src/System/IO/Compression/enc/BrotliStream.Compress.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati
8181
WriteAsyncMemoryCore(buffer, cancellationToken));
8282
}
8383

84-
private async Task WriteAsyncMemoryCore(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
84+
private async Task WriteAsyncMemoryCore(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken, bool isFinalBlock = false)
8585
{
8686
AsyncOperationStarting();
8787
try
@@ -92,7 +92,7 @@ private async Task WriteAsyncMemoryCore(ReadOnlyMemory<byte> buffer, Cancellatio
9292
Memory<byte> output = new Memory<byte>(_buffer);
9393
int bytesConsumed = 0;
9494
int bytesWritten = 0;
95-
lastResult = _encoder.Compress(buffer, output, out bytesConsumed, out bytesWritten, isFinalBlock: false);
95+
lastResult = _encoder.Compress(buffer, output, out bytesConsumed, out bytesWritten, isFinalBlock);
9696
if (lastResult == OperationStatus.InvalidData)
9797
throw new InvalidOperationException(SR.BrotliStream_Compress_InvalidData);
9898
if (bytesConsumed > 0)

src/System.IO.Compression.Brotli/tests/CompressionStreamUnitTests.Brotli.cs

+21
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,27 @@ public void Precancellation()
3333
}
3434
}
3535

36+
[Theory]
37+
[InlineData(false)]
38+
[InlineData(true)]
39+
public async Task DisposeAsync_Flushes(bool leaveOpen)
40+
{
41+
var ms = new MemoryStream();
42+
var bs = new BrotliStream(ms, CompressionMode.Compress, leaveOpen);
43+
bs.WriteByte(1);
44+
Assert.Equal(0, ms.Position);
45+
await bs.DisposeAsync();
46+
Assert.InRange(ms.ToArray().Length, 1, int.MaxValue);
47+
if (leaveOpen)
48+
{
49+
Assert.InRange(ms.Position, 1, int.MaxValue);
50+
}
51+
else
52+
{
53+
Assert.Throws<ObjectDisposedException>(() => ms.Position);
54+
}
55+
}
56+
3657
[Fact]
3758
[OuterLoop("Test takes ~6 seconds to run")]
3859
public override void FlushAsync_DuringWriteAsync() { base.FlushAsync_DuringWriteAsync(); }

src/System.IO.Compression/ref/System.IO.Compression.cs

+2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public DeflateStream(System.IO.Stream stream, System.IO.Compression.CompressionM
3232
public override long Position { get { throw null; } set { } }
3333
public override System.Threading.Tasks.Task CopyToAsync(System.IO.Stream destination, int bufferSize, System.Threading.CancellationToken cancellationToken) { throw null; }
3434
protected override void Dispose(bool disposing) { }
35+
public override System.Threading.Tasks.ValueTask DisposeAsync() { throw null; }
3536
public override void Flush() { }
3637
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback asyncCallback, object asyncState) { throw null; }
3738
public override int EndRead(IAsyncResult asyncResult) { throw null; }
@@ -66,6 +67,7 @@ public GZipStream(System.IO.Stream stream, System.IO.Compression.CompressionMode
6667
public override void CopyTo(System.IO.Stream destination, int bufferSize) { }
6768
public override System.Threading.Tasks.Task CopyToAsync(System.IO.Stream destination, int bufferSize, System.Threading.CancellationToken cancellationToken) { throw null; }
6869
protected override void Dispose(bool disposing) { }
70+
public override System.Threading.Tasks.ValueTask DisposeAsync() { throw null; }
6971
public override void Flush() { }
7072
public override IAsyncResult BeginRead(byte[] array, int offset, int count, AsyncCallback asyncCallback, object asyncState) { throw null; }
7173
public override int EndRead(IAsyncResult asyncResult) { throw null; }

src/System.IO.Compression/src/System/IO/Compression/DeflateZLib/DeflateStream.cs

+99-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
using System.Buffers;
66
using System.Diagnostics;
7-
using System.Runtime.CompilerServices;
87
using System.Runtime.InteropServices;
98
using System.Threading;
109
using System.Threading.Tasks;
@@ -628,6 +627,53 @@ private void PurgeBuffers(bool disposing)
628627
}
629628
}
630629

630+
private async Task PurgeBuffersAsync()
631+
{
632+
// Same logic as PurgeBuffers, except with async counterparts.
633+
634+
if (_stream == null)
635+
return;
636+
637+
if (_mode != CompressionMode.Compress)
638+
return;
639+
640+
// Some deflaters (e.g. ZLib) write more than zero bytes for zero byte inputs.
641+
// This round-trips and we should be ok with this, but our legacy managed deflater
642+
// always wrote zero output for zero input and upstack code (e.g. ZipArchiveEntry)
643+
// took dependencies on it. Thus, make sure to only "flush" when we actually had
644+
// some input:
645+
if (_wroteBytes)
646+
{
647+
// Compress any bytes left
648+
await WriteDeflaterOutputAsync(default).ConfigureAwait(false);
649+
650+
// Pull out any bytes left inside deflater:
651+
bool finished;
652+
do
653+
{
654+
int compressedBytes;
655+
finished = _deflater.Finish(_buffer, out compressedBytes);
656+
657+
if (compressedBytes > 0)
658+
await _stream.WriteAsync(new ReadOnlyMemory<byte>(_buffer, 0, compressedBytes)).ConfigureAwait(false);
659+
} while (!finished);
660+
}
661+
else
662+
{
663+
// In case of zero length buffer, we still need to clean up the native created stream before
664+
// the object get disposed because eventually ZLibNative.ReleaseHandle will get called during
665+
// the dispose operation and although it frees the stream but it return error code because the
666+
// stream state was still marked as in use. The symptoms of this problem will not be seen except
667+
// if running any diagnostic tools which check for disposing safe handle objects
668+
bool finished;
669+
do
670+
{
671+
int compressedBytes;
672+
finished = _deflater.Finish(_buffer, out compressedBytes);
673+
} while (!finished);
674+
}
675+
}
676+
631677
protected override void Dispose(bool disposing)
632678
{
633679
try
@@ -674,6 +720,58 @@ protected override void Dispose(bool disposing)
674720
}
675721
}
676722

723+
public override ValueTask DisposeAsync()
724+
{
725+
return GetType() == typeof(DeflateStream) ?
726+
DisposeAsyncCore() :
727+
base.DisposeAsync();
728+
}
729+
730+
private async ValueTask DisposeAsyncCore()
731+
{
732+
// Same logic as Dispose(true), except with async counterparts.
733+
try
734+
{
735+
await PurgeBuffersAsync().ConfigureAwait(false);
736+
}
737+
finally
738+
{
739+
// Close the underlying stream even if PurgeBuffers threw.
740+
// Stream.Close() may throw here (may or may not be due to the same error).
741+
// In this case, we still need to clean up internal resources, hence the inner finally blocks.
742+
Stream stream = _stream;
743+
_stream = null;
744+
try
745+
{
746+
if (!_leaveOpen && stream != null)
747+
await stream.DisposeAsync().ConfigureAwait(false);
748+
}
749+
finally
750+
{
751+
try
752+
{
753+
_deflater?.Dispose();
754+
_inflater?.Dispose();
755+
}
756+
finally
757+
{
758+
_deflater = null;
759+
_inflater = null;
760+
761+
byte[] buffer = _buffer;
762+
if (buffer != null)
763+
{
764+
_buffer = null;
765+
if (!AsyncOperationIsActive)
766+
{
767+
ArrayPool<byte>.Shared.Return(buffer);
768+
}
769+
}
770+
}
771+
}
772+
}
773+
}
774+
677775
public override IAsyncResult BeginWrite(byte[] array, int offset, int count, AsyncCallback asyncCallback, object asyncState) =>
678776
TaskToApm.Begin(WriteAsync(array, offset, count, CancellationToken.None), asyncCallback, asyncState);
679777

src/System.IO.Compression/src/System/IO/Compression/GZipStream.cs

+12
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,18 @@ protected override void Dispose(bool disposing)
150150
}
151151
}
152152

153+
public override ValueTask DisposeAsync()
154+
{
155+
if (GetType() != typeof(GZipStream))
156+
{
157+
return base.DisposeAsync();
158+
}
159+
160+
DeflateStream ds = _deflateStream;
161+
_deflateStream = null;
162+
return ds.DisposeAsync();
163+
}
164+
153165
public Stream BaseStream => _deflateStream?.BaseStream;
154166

155167
public override Task<int> ReadAsync(byte[] array, int offset, int count, CancellationToken cancellationToken)

src/System.IO.Compression/tests/CompressionStreamUnitTests.Deflate.cs

+25
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,31 @@ public void DerivedStream_ReadWriteSpan_UsesReadWriteArray()
7777
}
7878
}
7979

80+
[Theory]
81+
[InlineData(false, false)]
82+
[InlineData(false, true)]
83+
[InlineData(true, false)]
84+
[InlineData(true, false)]
85+
public async Task DisposeAsync_Flushes(bool derived, bool leaveOpen)
86+
{
87+
var ms = new MemoryStream();
88+
var ds = derived ?
89+
new DerivedDeflateStream(ms, CompressionMode.Compress, leaveOpen) :
90+
new DeflateStream(ms, CompressionMode.Compress, leaveOpen);
91+
ds.WriteByte(1);
92+
Assert.Equal(0, ms.Position);
93+
await ds.DisposeAsync();
94+
Assert.InRange(ms.ToArray().Length, 1, int.MaxValue);
95+
if (leaveOpen)
96+
{
97+
Assert.InRange(ms.Position, 1, int.MaxValue);
98+
}
99+
else
100+
{
101+
Assert.Throws<ObjectDisposedException>(() => ms.Position);
102+
}
103+
}
104+
80105
private sealed class DerivedDeflateStream : DeflateStream
81106
{
82107
public bool ReadArrayInvoked = false, WriteArrayInvoked = false;

src/System.IO.Compression/tests/CompressionStreamUnitTests.Gzip.cs

+30
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,36 @@ public void DerivedStream_ReadWriteSpan_UsesReadWriteArray()
193193
}
194194
}
195195

196+
[Theory]
197+
[InlineData(false, false)]
198+
[InlineData(false, true)]
199+
[InlineData(true, false)]
200+
[InlineData(true, false)]
201+
public async Task DisposeAsync_Flushes(bool derived, bool leaveOpen)
202+
{
203+
var ms = new MemoryStream();
204+
var gs = derived ?
205+
new DerivedGZipStream(ms, CompressionMode.Compress, leaveOpen) :
206+
new GZipStream(ms, CompressionMode.Compress, leaveOpen);
207+
gs.WriteByte(1);
208+
await gs.FlushAsync();
209+
210+
long pos = ms.Position;
211+
gs.WriteByte(1);
212+
Assert.Equal(pos, ms.Position);
213+
214+
await gs.DisposeAsync();
215+
Assert.InRange(ms.ToArray().Length, pos + 1, int.MaxValue);
216+
if (leaveOpen)
217+
{
218+
Assert.InRange(ms.Position, pos + 1, int.MaxValue);
219+
}
220+
else
221+
{
222+
Assert.Throws<ObjectDisposedException>(() => ms.Position);
223+
}
224+
}
225+
196226
private sealed class DerivedGZipStream : GZipStream
197227
{
198228
public bool ReadArrayInvoked = false, WriteArrayInvoked = false;

0 commit comments

Comments
 (0)