-
Notifications
You must be signed in to change notification settings - Fork 3
Add output streams and serializers #29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Add output streams and serializers #29
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Introduces a new streaming-based serialization architecture replacing vector-returning functions with pluggable output_stream abstractions (memory, file, chunked) and adds higher-level serializer / chunk_serializer utilities. Refactors flatbuffer construction logic into flatbuffer_utils, moves type mapping out of utils, and adds size‑estimation helpers for preallocation.
- Added output_stream interface with memory, file, and chunked implementations plus new (chunk_)serializer classes
- Refactored flatbuffer and body serialization into modular helpers; functions now write directly to streams
- Added size estimation utilities (calculate_*_message_size / calculate_total_serialized_size) and extensive new tests
Reviewed Changes
Copilot reviewed 29 out of 29 changed files in this pull request and generated 20 comments.
Show a summary per file
File | Description |
---|---|
src/utils.cpp | Removes flatbuffer type logic; retains parsing & alignment helpers |
src/flatbuffer_utils.cpp / include/sparrow_ipc/flatbuffer_utils.hpp | New central flatbuffer construction and buffer/node utilities |
src/serialize_utils.cpp / include/sparrow_ipc/serialize_utils.hpp | Stream-oriented serialization helpers and size calculators |
src/serialize.cpp / include/sparrow_ipc/serialize.hpp | Stream-based schema & record batch serialization entry points |
src/serializer.cpp / include/sparrow_ipc/serializer.hpp | Adds serializer class for continuous IPC stream writing |
src/chunk_memory_serializer.cpp / include/sparrow_ipc/chunk_memory_serializer.hpp | Adds chunked (per-message vector) serialization |
include/sparrow_ipc/output_stream.hpp | Defines abstract output_stream interface |
include/sparrow_ipc/memory_output_stream.hpp | In-memory implementation |
include/sparrow_ipc/file_output_stream.hpp / src/file_output_stream.cpp | File-backed implementation |
include/sparrow_ipc/chunk_memory_output_stream.hpp | Chunked multi-vector output stream (name misspelled) |
tests/* | Updated & expanded tests for new streaming APIs and utilities |
CMakeLists.txt / tests/CMakeLists.txt | Adds new sources & headers to build system |
include/sparrow_ipc/utils.hpp | API changes: align_to_8 signature and new parse_format exposure |
Other headers | Minor adjustments (e.g., added ) |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
include/sparrow_ipc/serializer.hpp
Outdated
template <std::ranges::input_range R> | ||
requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch> | ||
serializer(const R& record_batches, output_stream& stream) | ||
: m_pstream(&stream) | ||
, m_dtypes(get_column_dtypes(record_batches[0])) | ||
{ | ||
if (record_batches.empty()) | ||
{ | ||
throw std::invalid_argument("Record batches collection is empty"); | ||
} |
Copilot
AI
Oct 3, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The multi-batch constructor writes the first record batch twice: once via append(record_batches) (which iterates all elements) after already serializing record_batches[0]. This duplicates the first batch in the output. Either start the append loop from the second element or exclude record_batches[0] from append.
Copilot uses AI. Check for mistakes.
}; | ||
m_pstream->reserve(reserve_function); | ||
serialize_schema_message(record_batches[0], *m_pstream); | ||
append(record_batches); |
Copilot
AI
Oct 3, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The multi-batch constructor writes the first record batch twice: once via append(record_batches) (which iterates all elements) after already serializing record_batches[0]. This duplicates the first batch in the output. Either start the append loop from the second element or exclude record_batches[0] from append.
append(record_batches); | |
if (std::ranges::distance(record_batches) > 1) { | |
append(std::ranges::subrange(std::next(record_batches.begin()), record_batches.end())); | |
} |
Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No we first serialize the schema, then we serialize the arrays
template <std::ranges::input_range R> | ||
requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch> | ||
chunk_serializer( | ||
const R& record_batches, | ||
chuncked_memory_output_stream<std::vector<std::vector<uint8_t>>>& stream | ||
) | ||
: m_pstream(&stream) | ||
{ | ||
if (record_batches.empty()) | ||
{ | ||
throw std::invalid_argument("Record batches collection is empty"); | ||
} | ||
m_dtypes = get_column_dtypes(record_batches[0]); |
Copilot
AI
Oct 3, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same duplication issue: the first record batch is serialized twice (once explicitly and once through append(record_batches)). Adjust append to skip the first element or iterate from the second batch here.
Copilot uses AI. Check for mistakes.
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #29 +/- ##
=======================================
Coverage ? 78.46%
=======================================
Files ? 32
Lines ? 1291
Branches ? 0
=======================================
Hits ? 1013
Misses ? 278
Partials ? 0
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Co-authored-by: Copilot <[email protected]>
…m/Alex-PLACET/sparrow-ipc into add_output_stream_and_serializers
Co-authored-by: Copilot <[email protected]>
14952d0
to
75c9827
Compare
CHECK_EQ(utils::align_to_8(15), 16); | ||
CHECK_EQ(utils::align_to_8(16), 16); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved to another test file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved from another test file
…m/Alex-PLACET/sparrow-ipc into add_output_stream_and_serializers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The implementation is neat, I have just a few remarks regarding the architecture and API.
API
We want to be able to write something like:
std::ofstream out("my_file");
auto serializer ser(out);
ser << my_schema << my_array << my_list_of_batches << end_of_record;
Serializer
This means that:
- The serializer must accept any kind of streams, including the standard ones (see next section for more detail)
- The serializer should store an internal state (Waiting for a schema before accepting array ror record batches for instance, etc) so that its constructor does not need to accept a record_batch
The serializer methods names should reflect the stream methods names (i.e. append is actually write); also it should contain all the logic specific to sparrow, like adding padding.
Streams
The hierarchy of streams is actually a hierarchy of stream adaptor. I think this hierarchy can be removed, and some concepts can be provided instead, to help the implementation of the serializer:
template <class T>
concept output_stream = requires(T& t, const char* str)
{
t.write(str, size_t(0));
t.flush();
};
template <class T>
concept reservable_output_stream = output_stream<T> && requires(T& t)
{
t.reserve(size_t(0));
};
If you need a layer between the streams and the serializer to adapt the signatures (because streams accept const char*
while you serialize into std::span<uint8_t>
, a stream_adapter
class can be used to avoid repeating the cast everywhere. This layer should not contain additional logic like add_padding, which should live in the serializer.
Markers
The markers (for indicating the end of a record_batch list for instance) should follow the same pattern as std:endl for instance: a function accepting and returning a serializer.
No description provided.