Skip to content

Commit 0b01c6e

Browse files
chore(csharp/src/Drivers/Databricks): Revert previous "memory utilization of cloud downloads" change (#3655)
Reverts #3652 as it seems to have caused a regression.
1 parent eae39dd commit 0b01c6e

File tree

6 files changed

+40
-38
lines changed

6 files changed

+40
-38
lines changed

csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
using System.Net.Http;
2323
using System.Threading;
2424
using System.Threading.Tasks;
25+
using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
2526
using Apache.Arrow.Adbc.Tracing;
27+
using K4os.Compression.LZ4.Streams;
2628

2729
namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
2830
{
@@ -481,7 +483,7 @@ await this.TraceActivityAsync(async activity =>
481483
}
482484

483485
// Process the downloaded file data
484-
Memory<byte> data;
486+
MemoryStream dataStream;
485487
long actualSize = fileData.Length;
486488

487489
// If the data is LZ4 compressed, decompress it
@@ -496,24 +498,26 @@ await this.TraceActivityAsync(async activity =>
496498
fileData,
497499
cancellationToken).ConfigureAwait(false);
498500

499-
data = new Memory<byte>(buffer, 0, length);
501+
// Create the dataStream from the decompressed buffer
502+
dataStream = new MemoryStream(buffer, 0, length, writable: false, publiclyVisible: true);
503+
dataStream.Position = 0;
500504
decompressStopwatch.Stop();
501505

502506
// Calculate throughput metrics
503-
double compressionRatio = (double)length / actualSize;
507+
double compressionRatio = (double)dataStream.Length / actualSize;
504508

505509
activity?.AddEvent("cloudfetch.decompression_complete", [
506510
new("offset", downloadResult.Link.StartRowOffset),
507511
new("sanitized_url", sanitizedUrl),
508512
new("decompression_time_ms", decompressStopwatch.ElapsedMilliseconds),
509513
new("compressed_size_bytes", actualSize),
510514
new("compressed_size_kb", actualSize / 1024.0),
511-
new("decompressed_size_bytes", length),
512-
new("decompressed_size_kb", length / 1024.0),
515+
new("decompressed_size_bytes", dataStream.Length),
516+
new("decompressed_size_kb", dataStream.Length / 1024.0),
513517
new("compression_ratio", compressionRatio)
514518
]);
515519

516-
actualSize = length;
520+
actualSize = dataStream.Length;
517521
}
518522
catch (Exception ex)
519523
{
@@ -532,7 +536,7 @@ await this.TraceActivityAsync(async activity =>
532536
}
533537
else
534538
{
535-
data = fileData;
539+
dataStream = new MemoryStream(fileData);
536540
}
537541

538542
// Stop the stopwatch and log download completion
@@ -548,7 +552,7 @@ await this.TraceActivityAsync(async activity =>
548552
]);
549553

550554
// Set the download as completed with the original size
551-
downloadResult.SetCompleted(data, size);
555+
downloadResult.SetCompleted(dataStream, size);
552556
}, activityName: "DownloadFile");
553557
}
554558

csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public CloudFetchReader(
145145
// Create a new reader for the downloaded file
146146
try
147147
{
148-
this.currentReader = new ArrowStreamReader(this.currentDownloadResult.Data);
148+
this.currentReader = new ArrowStreamReader(this.currentDownloadResult.DataStream);
149149
continue;
150150
}
151151
catch (Exception ex)

csharp/src/Drivers/Databricks/Reader/CloudFetch/DownloadResult.cs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717

1818
using System;
19+
using System.IO;
1920
using System.Threading.Tasks;
2021
using Apache.Hive.Service.Rpc.Thrift;
2122

@@ -28,7 +29,7 @@ internal sealed class DownloadResult : IDownloadResult
2829
{
2930
private readonly TaskCompletionSource<bool> _downloadCompletionSource;
3031
private readonly ICloudFetchMemoryBufferManager _memoryManager;
31-
private ReadOnlyMemory<byte> _data;
32+
private Stream? _dataStream;
3233
private bool _isDisposed;
3334
private long _size;
3435

@@ -49,7 +50,7 @@ public DownloadResult(TSparkArrowResultLink link, ICloudFetchMemoryBufferManager
4950
public TSparkArrowResultLink Link { get; private set; }
5051

5152
/// <inheritdoc />
52-
public ReadOnlyMemory<byte> Data
53+
public Stream DataStream
5354
{
5455
get
5556
{
@@ -58,7 +59,7 @@ public ReadOnlyMemory<byte> Data
5859
{
5960
throw new InvalidOperationException("Download has not completed yet.");
6061
}
61-
return _data;
62+
return _dataStream!;
6263
}
6364
}
6465

@@ -102,14 +103,10 @@ public void UpdateWithRefreshedLink(TSparkArrowResultLink refreshedLink)
102103
}
103104

104105
/// <inheritdoc />
105-
public void SetCompleted(ReadOnlyMemory<byte> data, long size)
106+
public void SetCompleted(Stream dataStream, long size)
106107
{
107108
ThrowIfDisposed();
108-
if (data.Length == 0)
109-
{
110-
throw new ArgumentException("Data cannot be empty.", nameof(data));
111-
}
112-
_data = data;
109+
_dataStream = dataStream ?? throw new ArgumentNullException(nameof(dataStream));
113110
_downloadCompletionSource.TrySetResult(true);
114111
_size = size;
115112
}
@@ -129,9 +126,10 @@ public void Dispose()
129126
return;
130127
}
131128

132-
if (_data.Length > 0)
129+
if (_dataStream != null)
133130
{
134-
_data = default;
131+
_dataStream.Dispose();
132+
_dataStream = null;
135133

136134
// Release memory back to the manager
137135
if (_size > 0)

csharp/src/Drivers/Databricks/Reader/CloudFetch/EndOfResultsGuard.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717

1818
using System;
19+
using System.IO;
1920
using System.Threading.Tasks;
2021
using Apache.Hive.Service.Rpc.Thrift;
2122

@@ -42,7 +43,7 @@ private EndOfResultsGuard()
4243
public TSparkArrowResultLink Link => throw new NotSupportedException("EndOfResultsGuard does not have a link.");
4344

4445
/// <inheritdoc />
45-
public ReadOnlyMemory<byte> Data => throw new NotSupportedException("EndOfResultsGuard does not have data.");
46+
public Stream DataStream => throw new NotSupportedException("EndOfResultsGuard does not have a data stream.");
4647

4748
/// <inheritdoc />
4849
public long Size => 0;
@@ -57,7 +58,7 @@ private EndOfResultsGuard()
5758
public int RefreshAttempts => 0;
5859

5960
/// <inheritdoc />
60-
public void SetCompleted(ReadOnlyMemory<byte> data, long size) => throw new NotSupportedException("EndOfResultsGuard cannot be completed.");
61+
public void SetCompleted(Stream dataStream, long size) => throw new NotSupportedException("EndOfResultsGuard cannot be completed.");
6162

6263
/// <inheritdoc />
6364
public void SetFailed(Exception exception) => throw new NotSupportedException("EndOfResultsGuard cannot fail.");

csharp/src/Drivers/Databricks/Reader/CloudFetch/ICloudFetchInterfaces.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717

1818
using System;
19+
using System.IO;
1920
using System.Threading;
2021
using System.Threading.Tasks;
2122
using Apache.Hive.Service.Rpc.Thrift;
@@ -33,9 +34,9 @@ internal interface IDownloadResult : IDisposable
3334
TSparkArrowResultLink Link { get; }
3435

3536
/// <summary>
36-
/// Gets the memory containing the downloaded data.
37+
/// Gets the stream containing the downloaded data.
3738
/// </summary>
38-
ReadOnlyMemory<byte> Data { get; }
39+
Stream DataStream { get; }
3940

4041
/// <summary>
4142
/// Gets the size of the downloaded data in bytes.
@@ -60,9 +61,9 @@ internal interface IDownloadResult : IDisposable
6061
/// <summary>
6162
/// Sets the download as completed with the provided data stream.
6263
/// </summary>
63-
/// <param name="data">The downloaded data.</param>
64+
/// <param name="dataStream">The stream containing the downloaded data.</param>
6465
/// <param name="size">The size of the downloaded data in bytes.</param>
65-
void SetCompleted(ReadOnlyMemory<byte> data, long size);
66+
void SetCompleted(Stream dataStream, long size);
6667

6768
/// <summary>
6869
/// Sets the download as failed with the specified exception.

csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
using System.Threading.Tasks;
2626
using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
2727
using Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch;
28-
using Apache.Arrow.Adbc.Tracing;
2928
using Apache.Hive.Service.Rpc.Thrift;
3029
using Moq;
3130
using Moq.Protected;
@@ -47,7 +46,6 @@ public CloudFetchDownloaderTest()
4746
_resultQueue = new BlockingCollection<IDownloadResult>(new ConcurrentQueue<IDownloadResult>(), 10);
4847
_mockMemoryManager = new Mock<ICloudFetchMemoryBufferManager>();
4948
_mockStatement = new Mock<IHiveServer2Statement>();
50-
_mockStatement.SetupGet(x => x.Trace).Returns(new ActivityTrace());
5149
_mockResultFetcher = new Mock<ICloudFetchResultFetcher>();
5250

5351
// Set up memory manager defaults
@@ -138,13 +136,13 @@ public async Task DownloadFileAsync_ProcessesFile_AndAddsToResultQueue()
138136
mockDownloadResult.Setup(r => r.RefreshAttempts).Returns(0);
139137
mockDownloadResult.Setup(r => r.IsExpiredOrExpiringSoon(It.IsAny<int>())).Returns(false);
140138

141-
// Capture the date and size passed to SetCompleted
142-
ReadOnlyMemory<byte> capturedData = default;
139+
// Capture the stream and size passed to SetCompleted
140+
Stream? capturedStream = null;
143141
long capturedSize = 0;
144-
mockDownloadResult.Setup(r => r.SetCompleted(It.IsAny<ReadOnlyMemory<byte>>(), It.IsAny<long>()))
145-
.Callback<ReadOnlyMemory<byte>, long>((data, size) =>
142+
mockDownloadResult.Setup(r => r.SetCompleted(It.IsAny<Stream>(), It.IsAny<long>()))
143+
.Callback<Stream, long>((stream, size) =>
146144
{
147-
capturedData = data;
145+
capturedStream = stream;
148146
capturedSize = size;
149147
});
150148

@@ -178,11 +176,11 @@ public async Task DownloadFileAsync_ProcessesFile_AndAddsToResultQueue()
178176
Assert.Same(mockDownloadResult.Object, result);
179177

180178
// Verify SetCompleted was called
181-
mockDownloadResult.Verify(r => r.SetCompleted(It.IsAny<ReadOnlyMemory<byte>>(), It.IsAny<long>()), Times.Once);
179+
mockDownloadResult.Verify(r => r.SetCompleted(It.IsAny<Stream>(), It.IsAny<long>()), Times.Once);
182180

183181
// Verify the content of the stream
184-
Assert.NotEqual(0, capturedData.Length);
185-
using (var reader = new StreamReader(new MemoryStream(capturedData.ToArray())))
182+
Assert.NotNull(capturedStream);
183+
using (var reader = new StreamReader(capturedStream))
186184
{
187185
string content = reader.ReadToEnd();
188186
Assert.Equal(testContent, content);
@@ -486,8 +484,8 @@ public async Task GetNextDownloadedFileAsync_RespectsMaxParallelDownloads()
486484
mockDownloadResult.Setup(r => r.Size).Returns(100);
487485
mockDownloadResult.Setup(r => r.RefreshAttempts).Returns(0);
488486
mockDownloadResult.Setup(r => r.IsExpiredOrExpiringSoon(It.IsAny<int>())).Returns(false);
489-
mockDownloadResult.Setup(r => r.SetCompleted(It.IsAny<ReadOnlyMemory<byte>>(), It.IsAny<long>()))
490-
.Callback<ReadOnlyMemory<byte>, long>((_, _) => { });
487+
mockDownloadResult.Setup(r => r.SetCompleted(It.IsAny<Stream>(), It.IsAny<long>()))
488+
.Callback<Stream, long>((_, _) => { });
491489
downloadResults[i] = mockDownloadResult.Object;
492490
}
493491

0 commit comments

Comments
 (0)