Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ set(SPARROW_IPC_HEADERS
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/arrow_interface/arrow_array/private_data.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/arrow_interface/arrow_schema.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/arrow_interface/arrow_schema/private_data.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/chunk_memory_output_stream.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/chunk_memory_serializer.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/config/config.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/config/sparrow_ipc_version.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_variable_size_binary_array.hpp
Expand All @@ -109,24 +111,34 @@ set(SPARROW_IPC_HEADERS
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_variable_size_binary_array.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/encapsulated_message.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/file_output_stream.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/flatbuffer_utils.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/magic_values.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/memory_output_stream.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/metadata.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/output_stream.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/serialize_utils.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/serialize.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/serializer.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/utils.hpp
)

set(SPARROW_IPC_SRC
${SPARROW_IPC_SOURCE_DIR}/serialize_utils.cpp
${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_array.cpp
${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_array/private_data.cpp
${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_schema.cpp
${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_schema/private_data.cpp
${SPARROW_IPC_SOURCE_DIR}/chunk_memory_serializer.cpp
${SPARROW_IPC_SOURCE_DIR}/deserialize_fixedsizebinary_array.cpp
${SPARROW_IPC_SOURCE_DIR}/deserialize_utils.cpp
${SPARROW_IPC_SOURCE_DIR}/deserialize.cpp
${SPARROW_IPC_SOURCE_DIR}/encapsulated_message.cpp
${SPARROW_IPC_SOURCE_DIR}/file_output_stream.cpp
${SPARROW_IPC_SOURCE_DIR}/flatbuffer_utils.cpp
${SPARROW_IPC_SOURCE_DIR}/metadata.cpp
${SPARROW_IPC_SOURCE_DIR}/serialize_utils.cpp
${SPARROW_IPC_SOURCE_DIR}/serialize.cpp
${SPARROW_IPC_SOURCE_DIR}/serializer.cpp
${SPARROW_IPC_SOURCE_DIR}/utils.cpp
)

Expand Down
81 changes: 81 additions & 0 deletions include/sparrow_ipc/chunk_memory_output_stream.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#pragma once

#include <cstdint>
#include <ranges>

#include "sparrow_ipc/output_stream.hpp"

namespace sparrow_ipc
{
template <typename R>
requires std::ranges::random_access_range<R>
&& std::ranges::random_access_range<std::ranges::range_value_t<R>>
&& std::same_as<typename std::ranges::range_value_t<R>::value_type, uint8_t>
class chuncked_memory_output_stream final : public output_stream
{
public:

explicit chuncked_memory_output_stream(R& chunks)
Copy link

Copilot AI Oct 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in class name 'chuncked_memory_output_stream' – should be 'chunked_memory_output_stream' to match file name and conventional spelling.

Suggested change
class chuncked_memory_output_stream final : public output_stream
{
public:
explicit chuncked_memory_output_stream(R& chunks)
class chunked_memory_output_stream final : public output_stream
{
public:
explicit chunked_memory_output_stream(R& chunks)

Copilot uses AI. Check for mistakes.

: m_chunks(&chunks) {};

std::size_t write(std::span<const std::uint8_t> span) override
{
m_chunks->emplace_back(span.begin(), span.end());
return span.size();
}

std::size_t write(std::vector<uint8_t>&& buffer)
{
m_chunks->emplace_back(std::move(buffer));
return m_chunks->back().size();
}

std::size_t write(uint8_t value, std::size_t count) override
{
m_chunks->emplace_back(count, value);
return count;
}

void reserve(std::size_t size) override
{
m_chunks->reserve(size);
}

void reserve(const std::function<std::size_t()>& calculate_reserve_size) override
{
m_chunks->reserve(calculate_reserve_size());
}

size_t size() const override
{
return std::accumulate(
m_chunks->begin(),
m_chunks->end(),
0,
[](size_t acc, const auto& chunk)
{
return acc + chunk.size();
}
);
}

void flush() override
{
// Implementation for flushing memory
}

void close() override
{
// Implementation for closing the stream
}

bool is_open() const override
{
return true;
}

private:

R* m_chunks;
};
}
73 changes: 73 additions & 0 deletions include/sparrow_ipc/chunk_memory_serializer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#pragma once

#include <sparrow/record_batch.hpp>

#include "sparrow_ipc/chunk_memory_output_stream.hpp"
#include "sparrow_ipc/memory_output_stream.hpp"
#include "sparrow_ipc/serialize.hpp"
#include "sparrow_ipc/serialize_utils.hpp"

namespace sparrow_ipc
{
class chunk_serializer
{
public:

chunk_serializer(
const sparrow::record_batch& rb,
chuncked_memory_output_stream<std::vector<std::vector<uint8_t>>>& stream
);

template <std::ranges::input_range R>
requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
chunk_serializer(
Copy link

Copilot AI Oct 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same range constraint issue: constructor uses record_batches[0] and record_batches.size(); tighten constraints to std::ranges::random_access_range + sized_range or adapt to first/advance.

Copilot uses AI. Check for mistakes.

const R& record_batches,
chuncked_memory_output_stream<std::vector<std::vector<uint8_t>>>& stream
)
: m_pstream(&stream)
{
if (record_batches.empty())
{
throw std::invalid_argument("Record batches collection is empty");
}
m_dtypes = get_column_dtypes(record_batches[0]);
Copy link

Copilot AI Oct 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same duplication issue: the first record batch is serialized twice (once explicitly and once through append(record_batches)). Adjust append to skip the first element or iterate from the second batch here.

Copilot uses AI. Check for mistakes.


m_pstream->reserve(record_batches.size() + 1);
std::vector<uint8_t> buffer;
memory_output_stream schema_stream(buffer);
serialize_schema_message(record_batches[0], schema_stream);
m_pstream->write(std::move(buffer));
append(record_batches);
}

void append(const sparrow::record_batch& rb);

template <std::ranges::input_range R>
requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
void append(const R& record_batches)
{
if (m_ended)
{
throw std::runtime_error("Cannot append to a serializer that has been ended");
}

m_pstream->reserve(m_pstream->size() + record_batches.size());

for (const auto& rb : record_batches)
{
std::vector<uint8_t> buffer;
memory_output_stream stream(buffer);
serialize_record_batch(rb, stream);
m_pstream->write(std::move(buffer));
}
}

void end();

private:

std::vector<sparrow::data_type> m_dtypes;
chuncked_memory_output_stream<std::vector<std::vector<uint8_t>>>* m_pstream;
bool m_ended{false};
};
}
1 change: 1 addition & 0 deletions include/sparrow_ipc/encapsulated_message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <cstdint>
#include <variant>
#include <span>

#include "Message_generated.h"

Expand Down
36 changes: 36 additions & 0 deletions include/sparrow_ipc/file_output_stream.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#include <fstream>
#include <functional>

#include "sparrow_ipc/output_stream.hpp"


namespace sparrow_ipc
{
class SPARROW_IPC_API file_output_stream final : public output_stream
{
public:

explicit file_output_stream(std::ofstream& file);

std::size_t write(std::span<const std::uint8_t> span) override;

std::size_t write(uint8_t value, std::size_t count = 1) override;

size_t size() const override;

void reserve(std::size_t size) override;

void reserve(const std::function<std::size_t()>& calculate_reserve_size) override;

void flush() override;

void close() override;

bool is_open() const override;

private:

std::ofstream& m_file;
size_t m_written_bytes = 0;
};
}
Loading
Loading