Skip to content

Commit

Permalink
Support reading and writing data with the Arrow format (#354)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamreeve authored Sep 11, 2023
1 parent 323b6f1 commit 4fdc979
Show file tree
Hide file tree
Showing 27 changed files with 2,735 additions and 32 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ Supported platforms:
## Quickstart

The following examples show how to write and then read a Parquet file with three columns representing a timeseries of object-value pairs.
These use the low-level API, which is the recommended API and closely maps to the API of Apache Parquet C++.
These use the low-level API, which is the recommended API for working with native .NET types and closely maps to the API of Apache Parquet C++.
For reading and writing data in the [Apache Arrow](https://arrow.apache.org/) format, an [Arrow-based API](docs/Arrow.md) is also provided.

Writing a Parquet File:

Expand Down Expand Up @@ -87,6 +88,7 @@ For more detailed information on how to use ParquetSharp, see the following docu
* [Writing parquet files](docs/Writing.md)
* [Reading parquet files](docs/Reading.md)
* [Working with nested data](docs/Nested.md)
* [Reading and writing Arrow data](docs/Arrow.md) — how to read and write data using the [Apache Arrow format](https://arrow.apache.org/)
* [Row-oriented API](docs/RowOriented.md) — a higher level API that abstracts away the column-oriented nature of Parquet files
* [Custom types](docs/TypeFactories.md) — how to override the mapping between .NET and Parquet types
* [Use from PowerShell](docs/PowerShell.md)
Expand Down
5 changes: 5 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ add_library(ParquetSharpNative SHARED
TypedStatistics.cpp
WriterProperties.cpp
WriterPropertiesBuilder.cpp
arrow/ArrowReaderProperties.cpp
arrow/ArrowWriterProperties.cpp
arrow/ArrowWriterPropertiesBuilder.cpp
arrow/FileReader.cpp
arrow/FileWriter.cpp
)

generate_export_header(ParquetSharpNative
Expand Down
8 changes: 8 additions & 0 deletions cpp/Enums.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@ namespace
static_assert(LogicalType::TimeUnit::MILLIS == 1);
static_assert(LogicalType::TimeUnit::MICROS == 2);
static_assert(LogicalType::TimeUnit::NANOS == 3);

static_assert(ArrowWriterProperties::EngineVersion::V1 == 0);
static_assert(ArrowWriterProperties::EngineVersion::V2 == 1);

static_assert(::arrow::TimeUnit::type::SECOND == 0);
static_assert(::arrow::TimeUnit::type::MILLI == 1);
static_assert(::arrow::TimeUnit::type::MICRO == 2);
static_assert(::arrow::TimeUnit::type::NANO == 3);
}

}
69 changes: 69 additions & 0 deletions cpp/arrow/ArrowReaderProperties.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#include <parquet/properties.h>

#include "cpp/ParquetSharpExport.h"
#include "../ExceptionInfo.h"

using namespace parquet;

extern "C"
{
// Heap-allocates a copy of parquet's default_arrow_reader_properties() and
// stores the new pointer in *properties. The object is allocated with `new`,
// so it has to be deleted again (ArrowReaderProperties_Free does that).
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowReaderProperties_GetDefault(ArrowReaderProperties** properties)
{
    TRYCATCH
    (
        auto* const created = new ArrowReaderProperties(default_arrow_reader_properties());
        *properties = created;
    )
}

// Deletes the ArrowReaderProperties object that `properties` points to.
// Passing a null pointer is harmless because `delete` on null is a no-op.
PARQUETSHARP_EXPORT void ArrowReaderProperties_Free(ArrowReaderProperties* properties)
{
    delete properties;
}

// Copies properties->use_threads() into *use_threads.
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowReaderProperties_GetUseThreads(ArrowReaderProperties* properties, bool* use_threads)
{
    TRYCATCH
    (
        const auto current = properties->use_threads();
        *use_threads = current;
    )
}

// Forwards `use_threads` to properties->set_use_threads().
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowReaderProperties_SetUseThreads(ArrowReaderProperties* properties, bool use_threads)
{
    TRYCATCH
    (
        properties->set_use_threads(use_threads);
    )
}

// Copies properties->batch_size() into *batch_size.
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowReaderProperties_GetBatchSize(ArrowReaderProperties* properties, int64_t* batch_size)
{
    TRYCATCH
    (
        const auto current = properties->batch_size();
        *batch_size = current;
    )
}

// Forwards `batch_size` to properties->set_batch_size().
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowReaderProperties_SetBatchSize(ArrowReaderProperties* properties, int64_t batch_size)
{
    TRYCATCH
    (
        properties->set_batch_size(batch_size);
    )
}

// Copies properties->read_dictionary(column_index) into *read_dictionary.
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowReaderProperties_GetReadDictionary(ArrowReaderProperties* properties, int column_index, bool* read_dictionary)
{
    TRYCATCH
    (
        const auto current = properties->read_dictionary(column_index);
        *read_dictionary = current;
    )
}

// Forwards the (column_index, read_dictionary) pair to
// properties->set_read_dictionary().
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowReaderProperties_SetReadDictionary(ArrowReaderProperties* properties, int column_index, bool read_dictionary)
{
    TRYCATCH
    (
        properties->set_read_dictionary(column_index, read_dictionary);
    )
}

// Copies properties->pre_buffer() into *pre_buffer.
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowReaderProperties_GetPreBuffer(ArrowReaderProperties* properties, bool* pre_buffer)
{
    TRYCATCH
    (
        const auto current = properties->pre_buffer();
        *pre_buffer = current;
    )
}

// Forwards `pre_buffer` to properties->set_pre_buffer().
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowReaderProperties_SetPreBuffer(ArrowReaderProperties* properties, bool pre_buffer)
{
    TRYCATCH
    (
        properties->set_pre_buffer(pre_buffer);
    )
}

// Copies properties->coerce_int96_timestamp_unit() into *unit.
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowReaderProperties_GetCoerceInt96TimestampUnit(ArrowReaderProperties* properties, ::arrow::TimeUnit::type* unit)
{
    TRYCATCH
    (
        const auto current = properties->coerce_int96_timestamp_unit();
        *unit = current;
    )
}

// Forwards `unit` to properties->set_coerce_int96_timestamp_unit().
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowReaderProperties_SetCoerceInt96TimestampUnit(ArrowReaderProperties* properties, ::arrow::TimeUnit::type unit)
{
    TRYCATCH
    (
        properties->set_coerce_int96_timestamp_unit(unit);
    )
}
}
54 changes: 54 additions & 0 deletions cpp/arrow/ArrowWriterProperties.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#include <parquet/properties.h>

#include "cpp/ParquetSharpExport.h"
#include "../ExceptionInfo.h"

using namespace parquet;

extern "C"
{
// Heap-allocates a shared_ptr holding the result of
// default_arrow_writer_properties() and stores that handle in *properties.
// The handle is allocated with `new`, so it has to be deleted again
// (ArrowWriterProperties_Free does that).
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowWriterProperties_GetDefault(std::shared_ptr<ArrowWriterProperties>** properties)
{
    TRYCATCH
    (
        auto* const handle = new std::shared_ptr<ArrowWriterProperties>(default_arrow_writer_properties());
        *properties = handle;
    )
}

// Copies coerce_timestamps_enabled() of the wrapped ArrowWriterProperties
// into *enabled.
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowWriterProperties_CoerceTimestampsEnabled(std::shared_ptr<ArrowWriterProperties>* properties, bool* enabled)
{
    TRYCATCH
    (
        const auto& props = **properties;
        *enabled = props.coerce_timestamps_enabled();
    )
}

// Copies coerce_timestamps_unit() of the wrapped ArrowWriterProperties
// into *unit.
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowWriterProperties_CoerceTimestampsUnit(std::shared_ptr<ArrowWriterProperties>* properties, ::arrow::TimeUnit::type* unit)
{
    TRYCATCH
    (
        const auto& props = **properties;
        *unit = props.coerce_timestamps_unit();
    )
}

// Copies truncated_timestamps_allowed() of the wrapped ArrowWriterProperties
// into *allowed.
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowWriterProperties_TruncatedTimestampsAllowed(std::shared_ptr<ArrowWriterProperties>* properties, bool* allowed)
{
    TRYCATCH
    (
        const auto& props = **properties;
        *allowed = props.truncated_timestamps_allowed();
    )
}

// Copies store_schema() of the wrapped ArrowWriterProperties into *storeSchema.
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowWriterProperties_StoreSchema(std::shared_ptr<ArrowWriterProperties>* properties, bool* storeSchema)
{
    TRYCATCH
    (
        const auto& props = **properties;
        *storeSchema = props.store_schema();
    )
}

// Copies compliant_nested_types() of the wrapped ArrowWriterProperties
// into *compliantNestedTypes.
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowWriterProperties_CompliantNestedTypes(std::shared_ptr<ArrowWriterProperties>* properties, bool* compliantNestedTypes)
{
    TRYCATCH
    (
        const auto& props = **properties;
        *compliantNestedTypes = props.compliant_nested_types();
    )
}

// Copies engine_version() of the wrapped ArrowWriterProperties
// into *engineVersion.
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowWriterProperties_EngineVersion(std::shared_ptr<ArrowWriterProperties>* properties, ArrowWriterProperties::EngineVersion* engineVersion)
{
    TRYCATCH
    (
        const auto& props = **properties;
        *engineVersion = props.engine_version();
    )
}

// Copies use_threads() of the wrapped ArrowWriterProperties into *useThreads.
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowWriterProperties_UseThreads(std::shared_ptr<ArrowWriterProperties>* properties, bool* useThreads)
{
    TRYCATCH
    (
        const auto& props = **properties;
        *useThreads = props.use_threads();
    )
}

// Deletes the heap-allocated shared_ptr handle. This drops one reference to
// the ArrowWriterProperties it holds; the properties object itself goes away
// only when no other shared_ptr still refers to it.
// Passing a null pointer is harmless because `delete` on null is a no-op.
PARQUETSHARP_EXPORT void ArrowWriterProperties_Free(std::shared_ptr<ArrowWriterProperties>* properties)
{
    delete properties;
}
}
65 changes: 65 additions & 0 deletions cpp/arrow/ArrowWriterPropertiesBuilder.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@

#include "cpp/ParquetSharpExport.h"
#include "../ExceptionInfo.h"

#include <parquet/properties.h>

using namespace parquet;

extern "C"
{
// Heap-allocates a default-constructed ArrowWriterProperties::Builder and
// stores the new pointer in *builder. The object is allocated with `new`, so
// it has to be deleted again (ArrowWriterPropertiesBuilder_Free does that).
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowWriterPropertiesBuilder_Create(ArrowWriterProperties::Builder** builder)
{
    TRYCATCH
    (
        auto* const created = new ArrowWriterProperties::Builder();
        *builder = created;
    )
}

// Deletes the ArrowWriterProperties::Builder that `builder` points to.
// Passing a null pointer is harmless because `delete` on null is a no-op.
PARQUETSHARP_EXPORT void ArrowWriterPropertiesBuilder_Free(ArrowWriterProperties::Builder* builder)
{
    delete builder;
}

// Calls builder->build() and stores the result in a heap-allocated shared_ptr
// handle written to *writerProperties. The handle is allocated with `new`, so
// it has to be deleted again by whoever receives it.
// The shared_ptr element type is spelled out rather than deduced, matching
// the out-parameter type.
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowWriterPropertiesBuilder_Build(ArrowWriterProperties::Builder* builder, std::shared_ptr<ArrowWriterProperties>** writerProperties)
{
    TRYCATCH
    (
        auto* const handle = new std::shared_ptr<ArrowWriterProperties>(builder->build());
        *writerProperties = handle;
    )
}

// Forwards `unit` to builder->coerce_timestamps(); the call's return value
// is discarded.
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowWriterPropertiesBuilder_CoerceTimestamps(ArrowWriterProperties::Builder* builder, ::arrow::TimeUnit::type unit)
{
    TRYCATCH
    (
        builder->coerce_timestamps(unit);
    )
}

// Calls builder->allow_truncated_timestamps(); the call's return value is
// discarded.
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowWriterPropertiesBuilder_AllowTruncatedTimestamps(ArrowWriterProperties::Builder* builder)
{
    TRYCATCH
    (
        builder->allow_truncated_timestamps();
    )
}

// Calls builder->disallow_truncated_timestamps(); the call's return value is
// discarded.
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowWriterPropertiesBuilder_DisallowTruncatedTimestamps(ArrowWriterProperties::Builder* builder)
{
    TRYCATCH
    (
        builder->disallow_truncated_timestamps();
    )
}

// Calls builder->store_schema(); the call's return value is discarded.
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowWriterPropertiesBuilder_StoreSchema(ArrowWriterProperties::Builder* builder)
{
    TRYCATCH
    (
        builder->store_schema();
    )
}

// Calls builder->enable_compliant_nested_types(); the call's return value is
// discarded.
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowWriterPropertiesBuilder_EnableCompliantNestedTypes(ArrowWriterProperties::Builder* builder)
{
    TRYCATCH
    (
        builder->enable_compliant_nested_types();
    )
}

// Calls builder->disable_compliant_nested_types(); the call's return value is
// discarded.
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowWriterPropertiesBuilder_DisableCompliantNestedTypes(ArrowWriterProperties::Builder* builder)
{
    TRYCATCH
    (
        builder->disable_compliant_nested_types();
    )
}

// Forwards `engineVersion` to builder->set_engine_version(); the call's
// return value is discarded.
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowWriterPropertiesBuilder_EngineVersion(ArrowWriterProperties::Builder* builder, ArrowWriterProperties::EngineVersion engineVersion)
{
    TRYCATCH
    (
        builder->set_engine_version(engineVersion);
    )
}

// Forwards `useThreads` to builder->set_use_threads(); the call's return
// value is discarded.
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* ArrowWriterPropertiesBuilder_UseThreads(ArrowWriterProperties::Builder* builder, bool useThreads)
{
    TRYCATCH
    (
        builder->set_use_threads(useThreads);
    )
}
}
111 changes: 111 additions & 0 deletions cpp/arrow/FileReader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
#include <numeric>
#include <arrow/io/file.h>
#include <arrow/c/abi.h>
#include <arrow/c/bridge.h>
#include <arrow/record_batch.h>
#include <parquet/arrow/reader.h>

#include "cpp/ParquetSharpExport.h"
#include "../ExceptionInfo.h"

using namespace parquet::arrow;

extern "C"
{
// Opens the file at `path` with arrow::io::ReadableFile and builds a
// parquet::arrow::FileReader on top of it.
//
//   path                    - file system path handed to ReadableFile::Open.
//                             NOTE(review): not checked for null here.
//   reader_properties       - Parquet reader properties. May be null, in which
//                             case parquet::default_reader_properties() is used.
//   arrow_reader_properties - Arrow reader properties. May be null, in which
//                             case the builder keeps its own defaults.
//   reader                  - receives the newly created FileReader; the
//                             pointer is released from its unique_ptr, so it
//                             has to be deleted again (FileReader_Free does that).
//
// Failures reported by Arrow/Parquet are turned into exceptions by the
// PARQUET_* macros; the ExceptionInfo* return value is supplied by the
// TRYCATCH macro, which is defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* FileReader_OpenPath(
    const char* const path,
    const parquet::ReaderProperties* reader_properties,
    const parquet::ArrowReaderProperties* arrow_reader_properties,
    FileReader** reader)
{
    TRYCATCH
    (
        arrow::MemoryPool* pool = arrow::default_memory_pool();
        std::shared_ptr<arrow::io::ReadableFile> input_file;
        PARQUET_ASSIGN_OR_THROW(input_file, arrow::io::ReadableFile::Open(path, pool));

        FileReaderBuilder builder;
        // A null reader_properties used to be dereferenced unconditionally,
        // which crashes the process instead of reporting an error. Treat null
        // the same way as a null arrow_reader_properties: use the defaults.
        if (reader_properties != nullptr) {
            PARQUET_THROW_NOT_OK(builder.Open(input_file, *reader_properties));
        } else {
            PARQUET_THROW_NOT_OK(builder.Open(input_file, parquet::default_reader_properties()));
        }
        if (arrow_reader_properties != nullptr) {
            builder.properties(*arrow_reader_properties);
        }

        std::unique_ptr<FileReader> reader_ptr;
        PARQUET_THROW_NOT_OK(builder.Build(&reader_ptr));
        // Ownership moves to the caller through the raw out-pointer.
        *reader = reader_ptr.release();
    )
}

// Builds a parquet::arrow::FileReader on top of an already opened
// RandomAccessFile.
//
//   readable_file_interface - pointer to the shared_ptr holding the input
//                             file; the shared_ptr is copied into the builder.
//                             NOTE(review): not checked for null here.
//   reader_properties       - Parquet reader properties. May be null, in which
//                             case parquet::default_reader_properties() is used.
//   arrow_reader_properties - Arrow reader properties. May be null, in which
//                             case the builder keeps its own defaults.
//   reader                  - receives the newly created FileReader; the
//                             pointer is released from its unique_ptr, so it
//                             has to be deleted again (FileReader_Free does that).
//
// Failures reported by Arrow/Parquet are turned into exceptions by the
// PARQUET_* macros; the ExceptionInfo* return value is supplied by the
// TRYCATCH macro, which is defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* FileReader_OpenFile(
    std::shared_ptr<::arrow::io::RandomAccessFile>* readable_file_interface,
    const parquet::ReaderProperties* reader_properties,
    const parquet::ArrowReaderProperties* arrow_reader_properties,
    FileReader** reader)
{
    TRYCATCH
    (
        FileReaderBuilder builder;
        // A null reader_properties used to be dereferenced unconditionally,
        // which crashes the process instead of reporting an error. Treat null
        // the same way as a null arrow_reader_properties: use the defaults.
        if (reader_properties != nullptr) {
            PARQUET_THROW_NOT_OK(builder.Open(*readable_file_interface, *reader_properties));
        } else {
            PARQUET_THROW_NOT_OK(builder.Open(*readable_file_interface, parquet::default_reader_properties()));
        }
        if (arrow_reader_properties != nullptr) {
            builder.properties(*arrow_reader_properties);
        }

        std::unique_ptr<FileReader> reader_ptr;
        PARQUET_THROW_NOT_OK(builder.Build(&reader_ptr));
        // Ownership moves to the caller through the raw out-pointer.
        *reader = reader_ptr.release();
    )
}

// Fetches the Arrow schema from `reader` and exports it into the ArrowSchema
// struct at `schema_out` via arrow::ExportSchema.
// A non-OK status from either step is thrown by PARQUET_THROW_NOT_OK; the
// ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* FileReader_GetSchema(FileReader* reader, struct ArrowSchema* schema_out)
{
    TRYCATCH
    (
        std::shared_ptr<arrow::Schema> arrow_schema;
        PARQUET_THROW_NOT_OK(reader->GetSchema(&arrow_schema));

        const arrow::Schema& schema_ref = *arrow_schema;
        PARQUET_THROW_NOT_OK(arrow::ExportSchema(schema_ref, schema_out));
    )
}

// Copies reader->num_row_groups() into *num_row_groups.
// The ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* FileReader_NumRowGroups(FileReader* reader, int* num_row_groups)
{
    TRYCATCH
    (
        const auto count = reader->num_row_groups();
        *num_row_groups = count;
    )
}

// Creates a RecordBatchReader over a selection of row groups and columns and
// exports it into the ArrowArrayStream struct at `stream_out`.
//
//   row_groups / row_groups_count - indices of the row groups to read. A null
//                                   `row_groups` selects indices
//                                   0 .. reader->num_row_groups() - 1.
//   columns / columns_count       - indices of the columns to read. A null
//                                   `columns` calls the GetRecordBatchReader
//                                   overload that takes no column list.
//
// A non-OK status from Arrow/Parquet is thrown by PARQUET_THROW_NOT_OK; the
// ExceptionInfo* return value is supplied by the TRYCATCH macro, which is
// defined outside this file.
PARQUETSHARP_EXPORT ExceptionInfo* FileReader_GetRecordBatchReader(
    FileReader* reader,
    const int32_t* row_groups,
    int32_t row_groups_count,
    const int32_t* columns,
    int32_t columns_count,
    struct ArrowArrayStream* stream_out)
{
    TRYCATCH
    (
        std::shared_ptr<arrow::RecordBatchReader> record_batches;

        // Row group selection: copy the given indices or enumerate all of them.
        std::vector<int> selected_row_groups;
        if (row_groups != nullptr)
        {
            selected_row_groups.resize(row_groups_count);
            std::copy(row_groups, row_groups + row_groups_count, selected_row_groups.begin());
        }
        else
        {
            selected_row_groups.resize(reader->num_row_groups());
            std::iota(selected_row_groups.begin(), selected_row_groups.end(), 0);
        }

        // Column selection: only the explicit case needs its own vector.
        if (columns != nullptr)
        {
            std::vector<int> selected_columns(columns_count);
            std::copy(columns, columns + columns_count, selected_columns.begin());
            PARQUET_THROW_NOT_OK(reader->GetRecordBatchReader(selected_row_groups, selected_columns, &record_batches));
        }
        else
        {
            PARQUET_THROW_NOT_OK(reader->GetRecordBatchReader(selected_row_groups, &record_batches));
        }

        PARQUET_THROW_NOT_OK(arrow::ExportRecordBatchReader(record_batches, stream_out));
    )
}

// Deletes the FileReader that `reader` points to.
// Passing a null pointer is harmless because `delete` on null is a no-op.
PARQUETSHARP_EXPORT void FileReader_Free(FileReader* reader)
{
    delete reader;
}
}
Loading

0 comments on commit 4fdc979

Please sign in to comment.