Skip to content

Commit cc9ac37

Browse files
committed
Add null_array serialization
1 parent 478d903 commit cc9ac37

File tree

4 files changed

+295
-1
lines changed

4 files changed

+295
-1
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ set(SPARROW_IPC_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/src)
4444
set(SPARROW_IPC_HEADERS
4545
${SPARROW_IPC_INCLUDE_DIR}/config/config.hpp
4646
${SPARROW_IPC_INCLUDE_DIR}/serialize.hpp
47+
${SPARROW_IPC_INCLUDE_DIR}/serialize_null_array.hpp
4748
${SPARROW_IPC_INCLUDE_DIR}/utils.hpp
4849
)
4950

include/serialize_null_array.hpp

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
#pragma once
2+
3+
// TODO check whether all of the includes below are actually needed
4+
#include <cstdint>
5+
#include <cstring>
6+
#include <optional>
7+
#include <stdexcept>
8+
#include <string>
9+
#include <vector>
10+
11+
#include "sparrow.hpp"
12+
13+
// TODO check whether these two includes are actually needed
14+
#include "Message_generated.h"
15+
#include "Schema_generated.h"
16+
17+
#include "utils.hpp"
18+
19+
namespace sparrow_ipc
20+
{
21+
// TODO move to cpp if not templated
22+
// TODO ask for comments to be added and review them thoroughly
23+
24+
// This function serializes a sparrow::null_array into a byte vector compliant
25+
// with the Apache Arrow IPC Streaming Format. It mirrors the structure of
26+
// serialize_primitive_array but is optimized for null_array's properties.
27+
// A null_array is represented by metadata only (Schema, RecordBatch) and has no data buffers,
28+
// making its message body zero-length.
29+
std::vector<uint8_t> serialize_null_array(sparrow::null_array& arr)
30+
{
31+
// Use the Arrow C Data Interface to get a generic description of the array.
32+
// For a null_array, the ArrowArray struct will report n_buffers = 0.
33+
auto [arrow_arr_ptr, arrow_schema_ptr] = sparrow::get_arrow_structures(arr);
34+
auto& arrow_arr = *arrow_arr_ptr;
35+
auto& arrow_schema = *arrow_schema_ptr;
36+
37+
std::vector<uint8_t> final_buffer;
38+
39+
// I - Serialize the Schema message
40+
// This part is almost identical to how a primitive_array's schema is serialized.
41+
{
42+
flatbuffers::FlatBufferBuilder schema_builder;
43+
44+
flatbuffers::Offset<flatbuffers::String> fb_name_offset = 0;
45+
if (arrow_schema.name)
46+
{
47+
fb_name_offset = schema_builder.CreateString(arrow_schema.name);
48+
}
49+
50+
// For null_array, the format string is "n".
51+
auto [type_enum, type_offset] = utils::get_flatbuffer_type(schema_builder, arrow_schema.format);
52+
53+
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>>>
54+
fb_metadata_offset = 0;
55+
56+
if (arr.metadata())
57+
{
58+
sparrow::key_value_view metadata_view = *(arr.metadata());
59+
std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>> kv_offsets;
60+
// kv_offsets.reserve(metadata_view.size());
61+
// for (const auto& pair : metadata_view)
62+
// {
63+
// auto key_offset = schema_builder.CreateString(std::string(pair.first));
64+
// auto value_offset = schema_builder.CreateString(std::string(pair.second));
65+
// kv_offsets.push_back(
66+
// org::apache::arrow::flatbuf::CreateKeyValue(schema_builder, key_offset, value_offset));
67+
// }
68+
auto mv_it = metadata_view.cbegin();
69+
for (auto i = 0; i < metadata_view.size(); ++i, ++mv_it)
70+
{
71+
auto key_offset = schema_builder.CreateString(std::string((*mv_it).first));
72+
auto value_offset = schema_builder.CreateString(std::string((*mv_it).second));
73+
kv_offsets.push_back(
74+
org::apache::arrow::flatbuf::CreateKeyValue(schema_builder, key_offset, value_offset));
75+
}
76+
fb_metadata_offset = schema_builder.CreateVector(kv_offsets);
77+
}
78+
79+
auto fb_field = org::apache::arrow::flatbuf::CreateField(
80+
schema_builder,
81+
fb_name_offset,
82+
(arrow_schema.flags & static_cast<int64_t>(sparrow::ArrowFlag::NULLABLE)) != 0,
83+
type_enum,
84+
type_offset,
85+
0, // dictionary
86+
0, // children
87+
fb_metadata_offset);
88+
89+
std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::Field>> fields_vec = {fb_field};
90+
auto fb_fields = schema_builder.CreateVector(fields_vec);
91+
92+
auto schema_offset = org::apache::arrow::flatbuf::CreateSchema(schema_builder, org::apache::arrow::flatbuf::Endianness::Little, fb_fields);
93+
94+
auto schema_message_offset = org::apache::arrow::flatbuf::CreateMessage(
95+
schema_builder,
96+
org::apache::arrow::flatbuf::MetadataVersion::V5,
97+
org::apache::arrow::flatbuf::MessageHeader::Schema,
98+
schema_offset.Union(),
99+
0 // bodyLength
100+
);
101+
schema_builder.Finish(schema_message_offset);
102+
103+
uint32_t schema_len = schema_builder.GetSize();
104+
final_buffer.resize(sizeof(uint32_t) + schema_len);
105+
memcpy(final_buffer.data() + sizeof(uint32_t), schema_builder.GetBufferPointer(), schema_len);
106+
*(reinterpret_cast<uint32_t*>(final_buffer.data())) = schema_len;
107+
}
108+
109+
// II - Serialize the RecordBatch message
110+
{
111+
flatbuffers::FlatBufferBuilder batch_builder;
112+
113+
// The FieldNode describes the layout (length and null count).
114+
// For a null_array, length and null_count are always equal.
115+
org::apache::arrow::flatbuf::FieldNode field_node_struct(arrow_arr.length, arrow_arr.null_count);
116+
auto fb_nodes_vector = batch_builder.CreateVectorOfStructs(&field_node_struct, 1);
117+
118+
// A null_array has no buffers. The ArrowArray struct reports n_buffers = 0,
119+
// so we create an empty vector of buffers for the Flatbuffers message.
120+
auto fb_buffers_vector = batch_builder.CreateVectorOfStructs<org::apache::arrow::flatbuf::Buffer>({});
121+
122+
auto record_batch_offset = org::apache::arrow::flatbuf::CreateRecordBatch(batch_builder, arrow_arr.length, fb_nodes_vector, fb_buffers_vector);
123+
124+
// The bodyLength is 0 because there are no data buffers.
125+
auto batch_message_offset = org::apache::arrow::flatbuf::CreateMessage(
126+
batch_builder,
127+
org::apache::arrow::flatbuf::MetadataVersion::V5,
128+
org::apache::arrow::flatbuf::MessageHeader::RecordBatch,
129+
record_batch_offset.Union(),
130+
0 // bodyLength
131+
);
132+
batch_builder.Finish(batch_message_offset);
133+
134+
uint32_t batch_meta_len = batch_builder.GetSize();
135+
int64_t aligned_batch_meta_len = utils::align_to_8(batch_meta_len);
136+
137+
size_t current_size = final_buffer.size();
138+
// Resize for the RecordBatch metadata. There is no body to append.
139+
final_buffer.resize(current_size + sizeof(uint32_t) + aligned_batch_meta_len);
140+
uint8_t* dst = final_buffer.data() + current_size;
141+
142+
*(reinterpret_cast<uint32_t*>(dst)) = batch_meta_len;
143+
dst += sizeof(uint32_t);
144+
memcpy(dst, batch_builder.GetBufferPointer(), batch_meta_len);
145+
memset(dst + batch_meta_len, 0, aligned_batch_meta_len - batch_meta_len);
146+
}
147+
148+
return final_buffer;
149+
}
150+
151+
// This function deserializes a byte vector into a sparrow::null_array.
152+
// It reads the Schema and RecordBatch messages to extract the array's length,
153+
// name, and metadata, then constructs a null_array.
154+
sparrow::null_array deserialize_null_array(const std::vector<uint8_t>& buffer)
155+
{
156+
const uint8_t* buf_ptr = buffer.data();
157+
size_t current_offset = 0;
158+
159+
// I - Deserialize the Schema message
160+
uint32_t schema_meta_len = *(reinterpret_cast<const uint32_t*>(buf_ptr + current_offset));
161+
current_offset += sizeof(uint32_t);
162+
auto schema_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset);
163+
if (schema_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::Schema)
164+
{
165+
throw std::runtime_error("Expected Schema message at the start of the buffer.");
166+
}
167+
auto flatbuffer_schema = static_cast<const org::apache::arrow::flatbuf::Schema*>(schema_message->header());
168+
auto fields = flatbuffer_schema->fields();
169+
if (fields->size() != 1)
170+
{
171+
throw std::runtime_error("Expected schema with exactly one field for null_array.");
172+
}
173+
auto field = fields->Get(0);
174+
if (field->type_type() != org::apache::arrow::flatbuf::Type::Null)
175+
{
176+
throw std::runtime_error("Expected Null type in schema.");
177+
}
178+
179+
std::optional<std::string> name;
180+
if (auto fb_name = field->name())
181+
{
182+
name = std::string(fb_name->c_str(), fb_name->size());
183+
}
184+
185+
// std::optional<sparrow::key_value_metadata> metadata;
186+
std::optional<std::vector<sparrow::metadata_pair>> metadata;
187+
if (auto fb_metadata = field->custom_metadata())
188+
{
189+
if (fb_metadata->size() > 0)
190+
{
191+
metadata = std::vector<sparrow::metadata_pair>();
192+
metadata->reserve(fb_metadata->size());
193+
// sparrow::metadata_map map;
194+
for (const auto& kv : *fb_metadata)
195+
{
196+
// map.emplace(kv->key()->str(), kv->value()->str());
197+
metadata->emplace_back(kv->key()->str(), kv->value()->str());
198+
}
199+
// metadata = sparrow::key_value_metadata(map);
200+
}
201+
}
202+
203+
current_offset += schema_meta_len;
204+
205+
// II - Deserialize the RecordBatch message
206+
uint32_t batch_meta_len = *(reinterpret_cast<const uint32_t*>(buf_ptr + current_offset));
207+
current_offset += sizeof(uint32_t);
208+
auto batch_message = org::apache::arrow::flatbuf::GetMessage(buf_ptr + current_offset);
209+
if (batch_message->header_type() != org::apache::arrow::flatbuf::MessageHeader::RecordBatch)
210+
{
211+
throw std::runtime_error("Expected RecordBatch message, but got a different type.");
212+
}
213+
auto record_batch = static_cast<const org::apache::arrow::flatbuf::RecordBatch*>(batch_message->header());
214+
215+
// The body is empty, so we don't need to read any further.
216+
// Construct the null_array from the deserialized metadata.
217+
return sparrow::null_array(record_batch->length(), name, metadata);
218+
}
219+
}

tests/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ find_package(doctest CONFIG REQUIRED)
44

55
set(test_target "test_sparrow_ipc_lib")
66

7-
add_executable(${test_target} main.cpp test_primitive_array.cpp test_utils.cpp)
7+
add_executable(${test_target} main.cpp test_utils.cpp test_primitive_array.cpp test_serialize_null_array.cpp)
88
target_link_libraries(${test_target}
99
PRIVATE
1010
sparrow-ipc

tests/test_serialize_null_array.cpp

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
#include "doctest/doctest.h"
2+
#include "sparrow.hpp"
3+
4+
#include "serialize_null_array.hpp"
5+
6+
namespace sparrow_ipc
7+
{
8+
namespace sp = sparrow;
9+
// TODO have a generic compare_metadata in tests/helpers.hpp, cpp
10+
// taking pa and na
11+
// Checks that two null_arrays carry the same key/value metadata.
//
// If `na1` has no metadata, `na2` is expected to have none either. Otherwise
// both must hold the same number of entries, and the entries are compared
// pairwise in iteration order.
//
// The preconditions for the element-wise loop use REQUIRE rather than CHECK:
// CHECK would record the failure and keep going, which would then dereference
// an empty optional or walk the second iterator past its end.
void compare_metadata(sp::null_array& na1, sp::null_array& na2)
{
    if (!na1.metadata().has_value())
    {
        CHECK(!na2.metadata().has_value());
        return;
    }

    REQUIRE(na2.metadata().has_value());
    sp::key_value_view kvs1_view = *(na1.metadata());
    sp::key_value_view kvs2_view = *(na2.metadata());

    REQUIRE_EQ(kvs1_view.size(), kvs2_view.size());

    // Index type is taken from size() to avoid a signed/unsigned comparison.
    auto entry_count = kvs1_view.size();
    auto kvs1_it = kvs1_view.cbegin();
    auto kvs2_it = kvs2_view.cbegin();
    for (decltype(entry_count) i = 0; i < entry_count; ++i, ++kvs1_it, ++kvs2_it)
    {
        CHECK_EQ(*kvs1_it, *kvs2_it);
    }
}
33+
34+
// Round-trips a named null_array with two metadata entries through
// serialize_null_array / deserialize_null_array and verifies that the size,
// name, metadata and Arrow-level properties survive.
TEST_CASE("Serialize and deserialize null_array")
{
    // Inputs for the array under test.
    const std::size_t element_count = 10;
    const std::string_view expected_name = "my_null_array";
    const std::vector<sp::metadata_pair> metadata_entries = {{"key1", "value1"}, {"key2", "value2"}};
    const std::optional<std::vector<sp::metadata_pair>> expected_metadata = metadata_entries;

    sp::null_array original(element_count, expected_name, expected_metadata);

    // Round trip.
    auto bytes = serialize_null_array(original);
    auto round_tripped = deserialize_null_array(bytes);

    // High-level array properties.
    CHECK_EQ(round_tripped.size(), original.size());

    REQUIRE(round_tripped.name().has_value());
    CHECK_EQ(round_tripped.name().value(), original.name().value());

    REQUIRE(round_tripped.metadata().has_value());
    compare_metadata(original, round_tripped);

    // Arrow-level properties: the result must really be a null array.
    const auto& proxy = sp::detail::array_access::get_arrow_proxy(round_tripped);
    const std::unordered_set<sp::ArrowFlag> expected_flags{sp::ArrowFlag::NULLABLE};

    CHECK_EQ(proxy.format(), "n");
    CHECK_EQ(proxy.n_children(), 0);
    CHECK_EQ(proxy.flags(), expected_flags);
    CHECK_EQ(proxy.name(), expected_name);
    CHECK_EQ(proxy.dictionary(), nullptr);
    CHECK_EQ(proxy.buffers().size(), 0);
}
63+
64+
// Round-trips a null_array built from a size only and verifies that the
// deserialized array has the same size and neither a name nor metadata.
TEST_CASE("Serialize and deserialize null_array with no name and no metadata")
{
    const std::size_t element_count = 100;
    sp::null_array original(element_count);

    auto bytes = serialize_null_array(original);
    auto round_tripped = deserialize_null_array(bytes);

    CHECK_EQ(round_tripped.size(), original.size());
    CHECK_FALSE(round_tripped.name().has_value());
    CHECK_FALSE(round_tripped.metadata().has_value());
}
74+
}

0 commit comments

Comments
 (0)