Skip to content

Commit

Permalink
Split up LogicalBatchReader/Writer into separate files (#371)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: philjdf <[email protected]>
  • Loading branch information
philjdf and philjdf authored Sep 10, 2023
1 parent ea865a4 commit 323b6f1
Show file tree
Hide file tree
Showing 19 changed files with 1,259 additions and 1,204 deletions.
687 changes: 0 additions & 687 deletions csharp/LogicalBatchReader.cs

This file was deleted.

149 changes: 149 additions & 0 deletions csharp/LogicalBatchReader/ArrayReader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
using System;
using System.Collections.Generic;

namespace ParquetSharp.LogicalBatchReader
{
/// <summary>
/// Reads array values
/// </summary>
/// <typeparam name="TPhysical">The underlying physical type of leaf values in the column</typeparam>
/// <typeparam name="TLogical">The .NET logical type for the column leaf values</typeparam>
/// <typeparam name="TItem">The type of items contained in the array</typeparam>
internal sealed class ArrayReader<TPhysical, TLogical, TItem> : ILogicalBatchReader<TItem[]?>
where TPhysical : unmanaged
{
public ArrayReader(
ILogicalBatchReader<TItem> innerReader,
BufferedReader<TLogical, TPhysical> bufferedReader,
short definitionLevel,
short repetitionLevel,
bool innerNodeIsOptional)
{
_innerReader = innerReader;
_bufferedReader = bufferedReader;
_definitionLevel = definitionLevel;
_repetitionLevel = repetitionLevel;
_innerNodeIsOptional = innerNodeIsOptional;
}

public int ReadBatch(Span<TItem[]?> destination)
{
for (var i = 0; i < destination.Length; ++i)
{
if (_bufferedReader.IsEofDefinition)
{
return i;
}

var defn = _bufferedReader.GetCurrentDefinition();
if (defn.DefLevel > _definitionLevel)
{
if (typeof(TItem) == typeof(TLogical))
{
destination[i] = ReadLogicalTypeArray() as TItem[];
}
else
{
destination[i] = ReadInnerTypeArray();
}
}
else if (defn.DefLevel == _definitionLevel)
{
destination[i] = Array.Empty<TItem>();
_bufferedReader.NextDefinition();
}
else
{
destination[i] = null;
_bufferedReader.NextDefinition();
}
}

return destination.Length;
}

/// <summary>
/// Read an array of values using the inner logical batch reader
/// </summary>
private TItem[] ReadInnerTypeArray()
{
var values = new List<TItem>();
var value = new TItem[1];

var firstValue = true;
while (!_bufferedReader.IsEofDefinition)
{
var defn = _bufferedReader.GetCurrentDefinition();
if (!firstValue && defn.RepLevel <= _repetitionLevel)
{
break;
}

_innerReader.ReadBatch(value);
values.Add(value[0]);
firstValue = false;
}
return values.ToArray();
}

/// <summary>
/// Read an array of values directly from the buffered reader, for when the items in arrays
/// are the leaf level logical values.
/// </summary>
private TLogical[] ReadLogicalTypeArray()
{
var valueChunks = new List<TLogical[]>();
var innerDefLevel = (short) (_innerNodeIsOptional ? _definitionLevel + 2 : _definitionLevel + 1);
var innerRepLevel = (short) (_repetitionLevel + 1);

var atArrayStart = true;
while (!_bufferedReader.IsEofDefinition)
{
var reachedArrayEnd =
_bufferedReader.ReadValuesAtRepetitionLevel(innerRepLevel, innerDefLevel, atArrayStart,
out var valuesSpan);
if (reachedArrayEnd && atArrayStart)
{
return valuesSpan.ToArray();
}
atArrayStart = false;
valueChunks.Add(valuesSpan.ToArray());
if (reachedArrayEnd)
{
break;
}
}

if (valueChunks.Count == 1)
{
return valueChunks[0];
}

var totalSize = 0;
foreach (var chunk in valueChunks)
{
totalSize += chunk.Length;
}
var offset = 0;
var values = new TLogical[totalSize];
foreach (var chunk in valueChunks)
{
chunk.CopyTo(values, offset);
offset += chunk.Length;
}

return values;
}

public bool HasNext()
{
return !_bufferedReader.IsEofDefinition;
}

private readonly ILogicalBatchReader<TItem> _innerReader;
private readonly BufferedReader<TLogical, TPhysical> _bufferedReader;
private readonly short _definitionLevel;
private readonly short _repetitionLevel;
private readonly bool _innerNodeIsOptional;
}
}
40 changes: 40 additions & 0 deletions csharp/LogicalBatchReader/DirectReader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using System;

namespace ParquetSharp.LogicalBatchReader
{
/// <summary>
/// Uses a direct reader to read physical values as the logical value type.
/// This doesn't use a buffered reader so is only compatible with plain scalar columns.
/// </summary>
internal sealed class DirectReader<TLogical, TPhysical> : ILogicalBatchReader<TLogical>
where TPhysical : unmanaged
{
public DirectReader(
ColumnReader<TPhysical> physicalReader,
LogicalRead<TLogical, TPhysical>.DirectReader directReader)
{
_physicalReader = physicalReader;
_directReader = directReader;
}

public int ReadBatch(Span<TLogical> destination)
{
var totalRowsRead = 0;
while (totalRowsRead < destination.Length && _physicalReader.HasNext)
{
var toRead = destination.Length - totalRowsRead;
var rowsRead = checked((int) _directReader(_physicalReader, destination.Slice(totalRowsRead, toRead)));
totalRowsRead += rowsRead;
}
return totalRowsRead;
}

public bool HasNext()
{
return _physicalReader.HasNext;
}

private readonly ColumnReader<TPhysical> _physicalReader;
private readonly LogicalRead<TLogical, TPhysical>.DirectReader _directReader;
}
}
15 changes: 15 additions & 0 deletions csharp/LogicalBatchReader/ILogicalBatchReader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System;

namespace ParquetSharp.LogicalBatchReader
{
/// <summary>
/// Reads batches of data of an element type corresponding to a level within the type hierarchy of a column
/// </summary>
/// <typeparam name="TElement">The type of values that are read</typeparam>
internal interface ILogicalBatchReader<TElement>
{
int ReadBatch(Span<TElement> destination);

bool HasNext();
}
}
39 changes: 39 additions & 0 deletions csharp/LogicalBatchReader/LeafReader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using System;

namespace ParquetSharp.LogicalBatchReader
{
/// <summary>
/// Reads leaf level values within a compound structure.
/// </summary>
internal sealed class LeafReader<TLogical, TPhysical> : ILogicalBatchReader<TLogical>
where TPhysical : unmanaged
{
public LeafReader(
BufferedReader<TLogical, TPhysical> bufferedReader)
{
_bufferedReader = bufferedReader;
}

public int ReadBatch(Span<TLogical> destination)
{
for (var i = 0; i < destination.Length; ++i)
{
if (_bufferedReader.IsEofDefinition)
{
return i;
}
destination[i] = _bufferedReader.ReadValue();
_bufferedReader.NextDefinition();
}

return destination.Length;
}

public bool HasNext()
{
return !_bufferedReader.IsEofDefinition;
}

private readonly BufferedReader<TLogical, TPhysical> _bufferedReader;
}
}
Loading

0 comments on commit 323b6f1

Please sign in to comment.