
Commit eae39dd

feat(csharp/src/Drivers/Databricks): Improve memory utilization of cloud downloads (#3652)
Improves memory utilization of cloud downloads by handing the downloaded and/or decompressed result data to the Arrow reader directly as memory buffers, rather than deserializing it through an intermediate stream. NOTE: I have not benchmarked this change.
1 parent fc8c21a commit eae39dd
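
To make that concrete, the sketch below (illustrative names, not code from this commit) shows the shape of the change: Apache.Arrow's ArrowStreamReader accepts a ReadOnlyMemory<byte> as well as a Stream, so bytes that are already fully resident in memory no longer need a MemoryStream wrapper before they are deserialized into record batches.

using System;
using System.IO;
using Apache.Arrow;
using Apache.Arrow.Ipc;

static class BeforeAfterSketch
{
    // Before: the downloaded (or decompressed) bytes were wrapped in a MemoryStream
    // and read through the stream-based ArrowStreamReader constructor.
    public static void ReadViaStream(byte[] arrowIpcBytes)
    {
        using var stream = new MemoryStream(arrowIpcBytes);
        using var reader = new ArrowStreamReader(stream);
        RecordBatch batch;
        while ((batch = reader.ReadNextRecordBatch()) != null)
        {
            batch.Dispose();
        }
    }

    // After: the same bytes are handed to the reader as ReadOnlyMemory<byte>,
    // skipping the intermediate Stream object entirely.
    public static void ReadViaMemory(ReadOnlyMemory<byte> arrowIpcBytes)
    {
        using var reader = new ArrowStreamReader(arrowIpcBytes);
        RecordBatch batch;
        while ((batch = reader.ReadNextRecordBatch()) != null)
        {
            batch.Dispose();
        }
    }
}

The driver's DownloadResult accordingly now carries a ReadOnlyMemory<byte> instead of a Stream, as the diffs below show.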

6 files changed (+38, -40 lines)


csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs

Lines changed: 8 additions & 12 deletions
@@ -22,9 +22,7 @@
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
-using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
using Apache.Arrow.Adbc.Tracing;
-using K4os.Compression.LZ4.Streams;

namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
{
@@ -483,7 +481,7 @@ await this.TraceActivityAsync(async activity =>
}

// Process the downloaded file data
-MemoryStream dataStream;
+Memory<byte> data;
long actualSize = fileData.Length;

// If the data is LZ4 compressed, decompress it
@@ -498,26 +496,24 @@ await this.TraceActivityAsync(async activity =>
fileData,
cancellationToken).ConfigureAwait(false);

-// Create the dataStream from the decompressed buffer
-dataStream = new MemoryStream(buffer, 0, length, writable: false, publiclyVisible: true);
-dataStream.Position = 0;
+data = new Memory<byte>(buffer, 0, length);
decompressStopwatch.Stop();

// Calculate throughput metrics
-double compressionRatio = (double)dataStream.Length / actualSize;
+double compressionRatio = (double)length / actualSize;

activity?.AddEvent("cloudfetch.decompression_complete", [
new("offset", downloadResult.Link.StartRowOffset),
new("sanitized_url", sanitizedUrl),
new("decompression_time_ms", decompressStopwatch.ElapsedMilliseconds),
new("compressed_size_bytes", actualSize),
new("compressed_size_kb", actualSize / 1024.0),
-new("decompressed_size_bytes", dataStream.Length),
-new("decompressed_size_kb", dataStream.Length / 1024.0),
+new("decompressed_size_bytes", length),
+new("decompressed_size_kb", length / 1024.0),
new("compression_ratio", compressionRatio)
]);

-actualSize = dataStream.Length;
+actualSize = length;
}
catch (Exception ex)
{
@@ -536,7 +532,7 @@ await this.TraceActivityAsync(async activity =>
}
else
{
-dataStream = new MemoryStream(fileData);
+data = fileData;
}

// Stop the stopwatch and log download completion
@@ -552,7 +548,7 @@ await this.TraceActivityAsync(async activity =>
]);

// Set the download as completed with the original size
-downloadResult.SetCompleted(dataStream, size);
+downloadResult.SetCompleted(data, size);
}, activityName: "DownloadFile");
}
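
Taken together, the LZ4 branch and the pass-through branch above reduce to roughly the following shape. This is a hedged sketch with simplified names; the decompression helper's real name does not appear in this hunk, so it is abstracted as a delegate.

using System;

static class DownloaderShapeSketch
{
    public static ReadOnlyMemory<byte> PrepareData(
        byte[] fileData,
        bool isLz4Compressed,
        Func<byte[], (byte[] Buffer, int Length)> decompress)
    {
        if (!isLz4Compressed)
        {
            // Uncompressed downloads are used as-is; byte[] converts implicitly to ReadOnlyMemory<byte>.
            return fileData;
        }

        // Decompress once into a buffer, then expose only the valid slice,
        // with no MemoryStream wrapper and no Position bookkeeping.
        (byte[] buffer, int length) = decompress(fileData);
        return new Memory<byte>(buffer, 0, length);
    }
}

With the stream gone, the logged compression ratio and decompressed-size metrics come straight from the decompressed length rather than from dataStream.Length.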

csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs

Lines changed: 1 addition & 1 deletion
@@ -145,7 +145,7 @@ public CloudFetchReader(
// Create a new reader for the downloaded file
try
{
-this.currentReader = new ArrowStreamReader(this.currentDownloadResult.DataStream);
+this.currentReader = new ArrowStreamReader(this.currentDownloadResult.Data);
continue;
}
catch (Exception ex)

csharp/src/Drivers/Databricks/Reader/CloudFetch/DownloadResult.cs

Lines changed: 11 additions & 9 deletions
@@ -16,7 +16,6 @@
*/

using System;
-using System.IO;
using System.Threading.Tasks;
using Apache.Hive.Service.Rpc.Thrift;

@@ -29,7 +28,7 @@ internal sealed class DownloadResult : IDownloadResult
{
private readonly TaskCompletionSource<bool> _downloadCompletionSource;
private readonly ICloudFetchMemoryBufferManager _memoryManager;
-private Stream? _dataStream;
+private ReadOnlyMemory<byte> _data;
private bool _isDisposed;
private long _size;

@@ -50,7 +49,7 @@ public DownloadResult(TSparkArrowResultLink link, ICloudFetchMemoryBufferManager
public TSparkArrowResultLink Link { get; private set; }

/// <inheritdoc />
-public Stream DataStream
+public ReadOnlyMemory<byte> Data
{
get
{
@@ -59,7 +58,7 @@ public Stream DataStream
{
throw new InvalidOperationException("Download has not completed yet.");
}
-return _dataStream!;
+return _data;
}
}

@@ -103,10 +102,14 @@ public void UpdateWithRefreshedLink(TSparkArrowResultLink refreshedLink)
}

/// <inheritdoc />
-public void SetCompleted(Stream dataStream, long size)
+public void SetCompleted(ReadOnlyMemory<byte> data, long size)
{
ThrowIfDisposed();
-_dataStream = dataStream ?? throw new ArgumentNullException(nameof(dataStream));
+if (data.Length == 0)
+{
+throw new ArgumentException("Data cannot be empty.", nameof(data));
+}
+_data = data;
_downloadCompletionSource.TrySetResult(true);
_size = size;
}
@@ -126,10 +129,9 @@ public void Dispose()
return;
}

-if (_dataStream != null)
+if (_data.Length > 0)
{
-_dataStream.Dispose();
-_dataStream = null;
+_data = default;

// Release memory back to the manager
if (_size > 0)
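
The DownloadResult change is essentially a switch from a nullable Stream field to a value-type ReadOnlyMemory<byte> field. A simplified sketch of the resulting pattern follows; it is not the full class, which also gates Data on download completion and returns the reserved bytes to its memory buffer manager on dispose.

using System;

sealed class DataHolderSketch : IDisposable
{
    private ReadOnlyMemory<byte> _data;

    // "No data yet" is represented by the empty default value rather than a null Stream.
    public ReadOnlyMemory<byte> Data =>
        _data.Length > 0 ? _data : throw new InvalidOperationException("Download has not completed yet.");

    public void SetCompleted(ReadOnlyMemory<byte> data)
    {
        if (data.Length == 0)
        {
            throw new ArgumentException("Data cannot be empty.", nameof(data));
        }
        _data = data;
    }

    // There is no stream to dispose; clearing the field releases the buffer reference.
    public void Dispose() => _data = default;
}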

csharp/src/Drivers/Databricks/Reader/CloudFetch/EndOfResultsGuard.cs

Lines changed: 2 additions & 3 deletions
@@ -16,7 +16,6 @@
*/

using System;
-using System.IO;
using System.Threading.Tasks;
using Apache.Hive.Service.Rpc.Thrift;

@@ -43,7 +42,7 @@ private EndOfResultsGuard()
public TSparkArrowResultLink Link => throw new NotSupportedException("EndOfResultsGuard does not have a link.");

/// <inheritdoc />
-public Stream DataStream => throw new NotSupportedException("EndOfResultsGuard does not have a data stream.");
+public ReadOnlyMemory<byte> Data => throw new NotSupportedException("EndOfResultsGuard does not have data.");

/// <inheritdoc />
public long Size => 0;
@@ -58,7 +57,7 @@ private EndOfResultsGuard()
public int RefreshAttempts => 0;

/// <inheritdoc />
-public void SetCompleted(Stream dataStream, long size) => throw new NotSupportedException("EndOfResultsGuard cannot be completed.");
+public void SetCompleted(ReadOnlyMemory<byte> data, long size) => throw new NotSupportedException("EndOfResultsGuard cannot be completed.");

/// <inheritdoc />
public void SetFailed(Exception exception) => throw new NotSupportedException("EndOfResultsGuard cannot fail.");

csharp/src/Drivers/Databricks/Reader/CloudFetch/ICloudFetchInterfaces.cs

Lines changed: 4 additions & 5 deletions
@@ -16,7 +16,6 @@
*/

using System;
-using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Apache.Hive.Service.Rpc.Thrift;
@@ -34,9 +33,9 @@ internal interface IDownloadResult : IDisposable
TSparkArrowResultLink Link { get; }

/// <summary>
-/// Gets the stream containing the downloaded data.
+/// Gets the memory containing the downloaded data.
/// </summary>
-Stream DataStream { get; }
+ReadOnlyMemory<byte> Data { get; }

/// <summary>
/// Gets the size of the downloaded data in bytes.
@@ -61,9 +60,9 @@ internal interface IDownloadResult : IDisposable
/// <summary>
/// Sets the download as completed with the provided data stream.
/// </summary>
-/// <param name="dataStream">The stream containing the downloaded data.</param>
+/// <param name="data">The downloaded data.</param>
/// <param name="size">The size of the downloaded data in bytes.</param>
-void SetCompleted(Stream dataStream, long size);
+void SetCompleted(ReadOnlyMemory<byte> data, long size);

/// <summary>
/// Sets the download as failed with the specified exception.

csharp/test/Drivers/Databricks/E2E/CloudFetch/CloudFetchDownloaderTest.cs

Lines changed: 12 additions & 10 deletions
@@ -25,6 +25,7 @@
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
using Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch;
+using Apache.Arrow.Adbc.Tracing;
using Apache.Hive.Service.Rpc.Thrift;
using Moq;
using Moq.Protected;
@@ -46,6 +47,7 @@ public CloudFetchDownloaderTest()
_resultQueue = new BlockingCollection<IDownloadResult>(new ConcurrentQueue<IDownloadResult>(), 10);
_mockMemoryManager = new Mock<ICloudFetchMemoryBufferManager>();
_mockStatement = new Mock<IHiveServer2Statement>();
+_mockStatement.SetupGet(x => x.Trace).Returns(new ActivityTrace());
_mockResultFetcher = new Mock<ICloudFetchResultFetcher>();

// Set up memory manager defaults
@@ -136,13 +138,13 @@ public async Task DownloadFileAsync_ProcessesFile_AndAddsToResultQueue()
mockDownloadResult.Setup(r => r.RefreshAttempts).Returns(0);
mockDownloadResult.Setup(r => r.IsExpiredOrExpiringSoon(It.IsAny<int>())).Returns(false);

-// Capture the stream and size passed to SetCompleted
-Stream? capturedStream = null;
+// Capture the data and size passed to SetCompleted
+ReadOnlyMemory<byte> capturedData = default;
long capturedSize = 0;
-mockDownloadResult.Setup(r => r.SetCompleted(It.IsAny<Stream>(), It.IsAny<long>()))
-.Callback<Stream, long>((stream, size) =>
+mockDownloadResult.Setup(r => r.SetCompleted(It.IsAny<ReadOnlyMemory<byte>>(), It.IsAny<long>()))
+.Callback<ReadOnlyMemory<byte>, long>((data, size) =>
{
-capturedStream = stream;
+capturedData = data;
capturedSize = size;
});

@@ -176,11 +178,11 @@ public async Task DownloadFileAsync_ProcessesFile_AndAddsToResultQueue()
Assert.Same(mockDownloadResult.Object, result);

// Verify SetCompleted was called
-mockDownloadResult.Verify(r => r.SetCompleted(It.IsAny<Stream>(), It.IsAny<long>()), Times.Once);
+mockDownloadResult.Verify(r => r.SetCompleted(It.IsAny<ReadOnlyMemory<byte>>(), It.IsAny<long>()), Times.Once);

// Verify the content of the stream
-Assert.NotNull(capturedStream);
-using (var reader = new StreamReader(capturedStream))
+Assert.NotEqual(0, capturedData.Length);
+using (var reader = new StreamReader(new MemoryStream(capturedData.ToArray())))
{
string content = reader.ReadToEnd();
Assert.Equal(testContent, content);
@@ -484,8 +486,8 @@ public async Task GetNextDownloadedFileAsync_RespectsMaxParallelDownloads()
mockDownloadResult.Setup(r => r.Size).Returns(100);
mockDownloadResult.Setup(r => r.RefreshAttempts).Returns(0);
mockDownloadResult.Setup(r => r.IsExpiredOrExpiringSoon(It.IsAny<int>())).Returns(false);
-mockDownloadResult.Setup(r => r.SetCompleted(It.IsAny<Stream>(), It.IsAny<long>()))
-.Callback<Stream, long>((_, _) => { });
+mockDownloadResult.Setup(r => r.SetCompleted(It.IsAny<ReadOnlyMemory<byte>>(), It.IsAny<long>()))
+.Callback<ReadOnlyMemory<byte>, long>((_, _) => { });
downloadResults[i] = mockDownloadResult.Object;
}
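
A possible further simplification in the test, assuming the mocked payload is UTF-8 text (which the existing StreamReader-based check already assumes) and the test target framework provides Encoding.GetString(ReadOnlySpan<byte>): the captured memory can be decoded directly, with no MemoryStream or StreamReader round trip. A self-contained sketch of that check:

using System;
using System.Text;

static class CapturedDataAssertSketch
{
    // Decode the captured ReadOnlyMemory<byte> in place and compare it to the expected text.
    public static bool Matches(ReadOnlyMemory<byte> capturedData, string expected)
        => Encoding.UTF8.GetString(capturedData.Span) == expected;
}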
