From ad790f3ead92b2e97e5a34db5242899c45d3ff7a Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Wed, 15 Jan 2025 21:07:23 +0800 Subject: [PATCH 01/10] feat: add file_io and local fs impl add reader and writer interfaces Signed-off-by: Junwang Zhao --- src/iceberg/file_io.h | 110 +++++++++++++++++++++++++++++++ src/iceberg/io/CMakeLists.txt | 61 +++++++++++++++++ src/iceberg/io/fs_file_io.cc | 105 +++++++++++++++++++++++++++++ src/iceberg/io/fs_file_io.h | 94 ++++++++++++++++++++++++++ src/iceberg/io/fs_file_reader.cc | 81 +++++++++++++++++++++++ src/iceberg/io/fs_file_reader.h | 65 ++++++++++++++++++ src/iceberg/io/fs_file_writer.cc | 89 +++++++++++++++++++++++++ src/iceberg/io/fs_file_writer.h | 83 +++++++++++++++++++++++ src/iceberg/reader.h | 68 +++++++++++++++++++ src/iceberg/writer.h | 82 +++++++++++++++++++++++ test/io/CMakeLists.txt | 24 +++++++ test/io/fs_file_io_test.cc | 45 +++++++++++++ 12 files changed, 907 insertions(+) create mode 100644 src/iceberg/file_io.h create mode 100644 src/iceberg/io/CMakeLists.txt create mode 100644 src/iceberg/io/fs_file_io.cc create mode 100644 src/iceberg/io/fs_file_io.h create mode 100644 src/iceberg/io/fs_file_reader.cc create mode 100644 src/iceberg/io/fs_file_reader.h create mode 100644 src/iceberg/io/fs_file_writer.cc create mode 100644 src/iceberg/io/fs_file_writer.h create mode 100644 src/iceberg/reader.h create mode 100644 src/iceberg/writer.h create mode 100644 test/io/CMakeLists.txt create mode 100644 test/io/fs_file_io_test.cc diff --git a/src/iceberg/file_io.h b/src/iceberg/file_io.h new file mode 100644 index 00000000..08600bc2 --- /dev/null +++ b/src/iceberg/file_io.h @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/reader.h" +#include "iceberg/writer.h" + +namespace iceberg { + +/// \brief An interface used to read input files using Reader and AsyncReader +class ICEBERG_EXPORT InputFile { + public: + explicit InputFile(std::string location) : location_(std::move(location)) {} + + virtual ~InputFile() = default; + + /// \brief Checks whether the file exists. + virtual bool exists() const = 0; + /// \brief Returns the total length of the file, in bytes. + virtual int64_t getLength() const = 0; + + /// \brief Get a Reader instance to read bytes from the file. + virtual std::unique_ptr newReader() = 0; + + /// \brief Get the file location + const std::string& location() const { return location_; } + + protected: + std::string location_; +}; + +/// \brief An interface used to write output files using Writer and AsyncWriter +class ICEBERG_EXPORT OutputFile { + public: + explicit OutputFile(std::string location) : location_(std::move(location)) {} + + virtual ~OutputFile() = default; + + /// \brief Create the file. + /// + /// If the file exists, or an error will be thrown. + virtual void create() = 0; + + /// \brief Get a Writer instance to write bytes to the file. + virtual std::unique_ptr newWriter() = 0; + + /// \brief Get the file location + const std::string& location() const { return location_; } + + /// \brief Return an InputFile for the location of this OutputFile. + virtual std::shared_ptr toInputFile() const = 0; + + protected: + std::string location_; +}; + +/// \brief Pluggable module for reading, writing, and deleting files. +/// +/// Both table metadata files and data files can be written and read by this module. +class ICEBERG_EXPORT FileIO { + public: + explicit FileIO(std::string name) : name_(std::move(name)) {} + + virtual ~FileIO() = default; + + /// \brief Get an InputFile + /// + /// Get a InputFile instance to read bytes from the file at the given location. + virtual std::shared_ptr newInputFile(const std::string& location) = 0; + + /// \brief Get an OutputFile + /// + /// Get a OutputFile instance to write bytes to the file at the given location. + virtual std::shared_ptr newOutputFile(const std::string& location) = 0; + + /// \brief Delete file + /// + /// Delete the file at the given location. + virtual void DeleteFile(const std::string& location) = 0; + void DeleteFile(const InputFile& ifile) { return DeleteFile(ifile.location()); } + void DeleteFile(const OutputFile& ofile) { return DeleteFile(ofile.location()); } + + const std::string& name() const { return name_; } + + protected: + std::string name_; +}; + +} // namespace iceberg diff --git a/src/iceberg/io/CMakeLists.txt b/src/iceberg/io/CMakeLists.txt new file mode 100644 index 00000000..bce36223 --- /dev/null +++ b/src/iceberg/io/CMakeLists.txt @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +if(NOT ICEBERG_IO) + return() +endif() + +set(ICEBERG_IO_SOURCES fs_file_io.cc fs_file_reader.cc fs_file_writer.cc) +set(ICEBERG_IO_INCLUDES "${ICEBERG_INCLUDES}") + +# Libraries to link with exported libiceberg_io.{so,a}. +set(ICEBERG_IO_STATIC_BUILD_INTERFACE_LIBS) +set(ICEBERG_IO_SHARED_BUILD_INTERFACE_LIBS) +set(ICEBERG_IO_STATIC_INSTALL_INTERFACE_LIBS) +set(ICEBERG_IO_SHARED_INSTALL_INTERFACE_LIBS) + +list(APPEND ICEBERG_IO_STATIC_BUILD_INTERFACE_LIBS + "$,iceberg_static,iceberg_shared>") +list(APPEND ICEBERG_IO_SHARED_BUILD_INTERFACE_LIBS + "$,iceberg_shared,iceberg_static>") +list(APPEND + ICEBERG_IO_STATIC_INSTALL_INTERFACE_LIBS + "$,Iceberg::iceberg_static,Iceberg::iceberg_shared>" +) +list(APPEND + ICEBERG_IO_SHARED_INSTALL_INTERFACE_LIBS + "$,Iceberg::iceberg_shared,Iceberg::iceberg_static>" +) + +add_iceberg_lib(iceberg_io + SOURCES + ${ICEBERG_IO_SOURCES} + PRIVATE_INCLUDES + ${ICEBERG_IO_INCLUDES} + SHARED_LINK_LIBS + ${ICEBERG_IO_SHARED_BUILD_INTERFACE_LIBS} + STATIC_LINK_LIBS + ${ICEBERG_IO_STATIC_BUILD_INTERFACE_LIBS} + STATIC_INSTALL_INTERFACE_LIBS + ${ICEBERG_IO_STATIC_INSTALL_INTERFACE_LIBS} + SHARED_INSTALL_INTERFACE_LIBS + ${ICEBERG_IO_SHARED_INSTALL_INTERFACE_LIBS}) + +iceberg_install_all_headers(iceberg/io) + +install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_io_export.h + DESTINATION ${ICEBERG_INSTALL_INCLUDEDIR}/iceberg/io) diff --git a/src/iceberg/io/fs_file_io.cc b/src/iceberg/io/fs_file_io.cc new file mode 100644 index 00000000..15d508ba --- /dev/null +++ b/src/iceberg/io/fs_file_io.cc @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/io/fs_file_io.h" + +#include + +#include +#include +#include + +#include + +#include "iceberg/exception.h" +#include "iceberg/io/fs_file_reader.h" +#include "iceberg/io/fs_file_writer.h" + +namespace iceberg::io { + +bool FsInputFile::exists() const { return std::filesystem::exists(location()); } + +int64_t FsInputFile::getLength() const { + struct stat stat_buffer; + if (stat(location().c_str(), &stat_buffer) != 0) { + throw IcebergError(std::format( + "Failed to get file length. File does not exist or is inaccessible: {}", + location_)); + } + return stat_buffer.st_size; +} + +std::unique_ptr FsInputFile::newReader() { + return std::make_unique(location_); +} + +void FsOutputFile::create() { + // Check if the file already exists + std::ifstream existing_file(location_); + bool file_exists = existing_file.good(); + existing_file.close(); + + if (file_exists) { + throw IcebergError(std::format("File already exists: {}", location_)); + } + + // Create or overwrite the file by opening it in truncating mode + std::ofstream new_file(location_, std::ios::binary | std::ios::out | std::ios::trunc); + if (!new_file.is_open()) { + throw IcebergError(std::format("Failed to create or overwrite file: {}", location_)); + } + new_file.close(); +} + +std::unique_ptr FsOutputFile::newWriter() { + return std::make_unique(location_); +} + +std::shared_ptr FsFileIO::newInputFile(const std::string& location) { + // Check if the file exists + if (!fileExists(location)) { + throw IcebergError(std::format("InputFile does not exist: {}", location)); + } + + // Create and return an FsInputFile instance + return std::make_shared(location); +} + +std::shared_ptr FsFileIO::newOutputFile(const std::string& location) { + return std::make_shared(location); +} + +void FsFileIO::DeleteFile(const std::string& location) { + // Check if the file exists + if (!fileExists(location)) { + throw IcebergError(std::format("InputFile does not exist: {}", location)); + } + std::error_code ec; + if (std::filesystem::remove(location, ec) == false) { + throw IcebergError( + std::format("Failed to delete file: {}, error code: {}", location, ec.message())); + } +} + +bool FsFileIO::fileExists(const std::string& location) { + std::ifstream file(location); + return file.good(); +} + +} // namespace iceberg::io diff --git a/src/iceberg/io/fs_file_io.h b/src/iceberg/io/fs_file_io.h new file mode 100644 index 00000000..35440cdc --- /dev/null +++ b/src/iceberg/io/fs_file_io.h @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include "iceberg/file_io.h" +#include "iceberg/io/iceberg_io_export.h" + +namespace iceberg::io { + +class ICEBERG_IO_EXPORT FsInputFile : public InputFile { + public: + explicit FsInputFile(std::string location) : InputFile(std::move(location)) {} + ~FsInputFile() override = default; + + /// \brief Checks whether the file exists. + bool exists() const override; + + /// \brief Returns the total length of the file, in bytes. + int64_t getLength() const override; + + /// \brief Get a Reader instance to read bytes from the file. + std::unique_ptr newReader() override; +}; + +class ICEBERG_IO_EXPORT FsOutputFile : public OutputFile { + public: + explicit FsOutputFile(std::string location) : OutputFile(std::move(location)) {} + ~FsOutputFile() override = default; + + /// \brief Create the file, optionally overwriting if it exists. + /// + /// If the file already exists, an exception is thrown. + void create() override; + + /// \brief Get a Writer instance to write bytes to the file. + /// + /// Returns a unique pointer to a `FsFileWriter`. + std::unique_ptr newWriter() override; + + /// \brief Return an InputFile for the location of this OutputFile. + /// + /// Creates an FsInputFile for reading the file pointed by this OutputFile's location. + std::shared_ptr toInputFile() const override { + return std::make_shared(location_); + } +}; + +/// \brief A concrete implementation of FileIO for file system. +class ICEBERG_IO_EXPORT FsFileIO : public FileIO { + public: + explicit FsFileIO(const std::string& name) : FileIO(name) {} + + ~FsFileIO() override = default; + + /// \brief Get an InputFile + /// + /// Returns a shared pointer to an FsInputFile instance, representing the file at + /// `location`. + std::shared_ptr newInputFile(const std::string& location) override; + + /// \brief Get an OutputFile + /// + /// Returns a shared pointer to an FsOutputFile instance, representing the file at + /// `location`. + std::shared_ptr newOutputFile(const std::string& location) override; + + /// \brief Delete file + /// + /// Deletes the file at the given location using file system operations. + void DeleteFile(const std::string& location) override; + + private: + /// \brief Check if a file exists + bool fileExists(const std::string& location); +}; + +} // namespace iceberg::io diff --git a/src/iceberg/io/fs_file_reader.cc b/src/iceberg/io/fs_file_reader.cc new file mode 100644 index 00000000..16337c4e --- /dev/null +++ b/src/iceberg/io/fs_file_reader.cc @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/io/fs_file_reader.h" + +#include + +#include "iceberg/exception.h" + +namespace iceberg::io { + +FsFileReader::FsFileReader(std::string file_path) : file_path_(std::move(file_path)) { + // Open the file in binary mode + input_file_.open(file_path_, std::ios::binary | std::ios::in); + if (!input_file_.is_open()) { + throw IcebergError(std::format("Failed to open file: {}", file_path_)); + } + + // Calculate the file size + input_file_.seekg(0, std::ios::end); + file_size_ = input_file_.tellg(); + input_file_.seekg(0, std::ios::beg); + + if (file_size_ < 0) { + throw IcebergError(std::format("Failed to determine file size: {}", file_path_)); + } +} + +FsFileReader::~FsFileReader() { + if (input_file_.is_open()) { + input_file_.close(); + } +} + +int64_t FsFileReader::read(int64_t offset, int64_t length, void* buffer) { + if (!input_file_.is_open()) { + throw IcebergError("File is not open for reading"); + } + + if (offset < 0 || offset + length > file_size_) { + throw IcebergError( + std::format("Invalid read range: [{}, {})", offset, offset + length)); + } + + // Seek to the starting position + input_file_.seekg(offset, std::ios::beg); + if (input_file_.fail()) { + throw IcebergError(std::format("Failed to seek to offset: {}", offset)); + } + + // Read the data into the buffer + input_file_.read(static_cast(buffer), length); + auto bytes_read = static_cast(input_file_.gcount()); + + return bytes_read; // Return actual bytes read +} + +std::future FsFileReader::readAsync(int64_t offset, int64_t length, + void* buffer) { + return std::async(std::launch::async, [this, offset, length, buffer]() -> int64_t { + return this->read(offset, length, buffer); + }); +} + +} // namespace iceberg::io diff --git a/src/iceberg/io/fs_file_reader.h b/src/iceberg/io/fs_file_reader.h new file mode 100644 index 00000000..8ea0d2ec --- /dev/null +++ b/src/iceberg/io/fs_file_reader.h @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include + +#include "iceberg/io/iceberg_io_export.h" +#include "iceberg/reader.h" + +namespace iceberg::io { + +/// \brief A concrete implementation of Reader for file system-based reading. +class ICEBERG_IO_EXPORT FsFileReader : public Reader { + public: + explicit FsFileReader(std::string file_path); + + ~FsFileReader() override; + + // Disable copy constructor and assignment operator + FsFileReader(const FsFileReader&) = delete; + FsFileReader& operator=(const FsFileReader&) = delete; + + /// \brief Get the size of the file. + int64_t getSize() const override { return file_size_; } + + /// \brief Read a range of bytes from the file synchronously. + /// + /// \param offset The starting offset of the read operation. + /// \param length The number of bytes to read. + /// \param buffer The buffer address to write the bytes to. + /// \return The actual number of bytes read. + int64_t read(int64_t offset, int64_t length, void* buffer) override; + + /// \brief Read a range of bytes from the file asynchronously. + /// + /// \param offset The starting offset of the read operation. + /// \param length The number of bytes to read. + /// \param buffer The buffer address to write the bytes to. + /// \return A future resolving to the actual number of bytes read. + std::future readAsync(int64_t offset, int64_t length, void* buffer) override; + + private: + std::string file_path_; // Path to the file being read. + mutable std::ifstream input_file_; // File stream for reading. + int64_t file_size_{0}; // Size of the file in bytes. +}; + +} // namespace iceberg::io diff --git a/src/iceberg/io/fs_file_writer.cc b/src/iceberg/io/fs_file_writer.cc new file mode 100644 index 00000000..854d0150 --- /dev/null +++ b/src/iceberg/io/fs_file_writer.cc @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/io/fs_file_writer.h" + +#include "iceberg/exception.h" + +namespace iceberg::io { + +FsFileWriter::FsFileWriter(std::string file_path) : file_path_(std::move(file_path)) { + // Open the file in binary write mode, truncating any existing file + output_file_.open(file_path_, std::ios::binary | std::ios::out | std::ios::trunc); + if (!output_file_.is_open()) { + throw IcebergError(std::format("Failed to open file for writing: {}", file_path_)); + } + // Calculate the file size after opening the file + output_file_.seekp(0, std::ios::end); + file_size_ = output_file_.tellp(); +} + +FsFileWriter::~FsFileWriter() { + if (output_file_.is_open()) { + output_file_.close(); + } +} + +int64_t FsFileWriter::write(int64_t offset, const void* buffer, int64_t length) { + if (!output_file_.is_open()) { + throw IcebergError(std::format("File is not open for writing: {}", file_path_)); + } + + if (offset < 0) { + throw IcebergError( + std::format("Invalid write range. Offset must be non-negative: {}", offset)); + } + + // Seek the position to write + output_file_.seekp(offset, std::ios::beg); + if (output_file_.fail()) { + throw IcebergError(std::format("Failed to seek to offset: {}", offset)); + } + + // Write data to the file + output_file_.write(static_cast(buffer), length); + if (output_file_.fail()) { + throw IcebergError("Failed to write data to file."); + } + + // Update the file size based on the last written position + file_size_ = std::max(file_size_, offset + length); + + return length; // Return number of bytes successfully written +} + +std::future FsFileWriter::writeAsync(int64_t offset, const void* buffer, + int64_t length) { + return std::async(std::launch::async, [this, offset, buffer, length]() { + return this->write(offset, buffer, length); + }); +} + +void FsFileWriter::flush() { + if (!output_file_.is_open()) { + throw IcebergError(std::format("File is not open for flushing: {}", file_path_)); + } + + output_file_.flush(); + if (output_file_.fail()) { + throw IcebergError("Failed to flush data to file."); + } +} + +} // namespace iceberg::io diff --git a/src/iceberg/io/fs_file_writer.h b/src/iceberg/io/fs_file_writer.h new file mode 100644 index 00000000..0f68903d --- /dev/null +++ b/src/iceberg/io/fs_file_writer.h @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "iceberg/io/iceberg_io_export.h" +#include "iceberg/writer.h" + +namespace iceberg::io { + +/// \brief A concrete implementation of Writer for file system files. +class ICEBERG_IO_EXPORT FsFileWriter : public Writer { + public: + explicit FsFileWriter(std::string file_path); + + ~FsFileWriter() override; + + // Disable copy constructor and assignment operator + FsFileWriter(const FsFileWriter&) = delete; + FsFileWriter& operator=(const FsFileWriter&) = delete; + + /// \brief Get the size of the file currently allocated. + /// + /// This implementation assumes size equals the last written offset + length of the + /// written bytes. + int64_t getSize() const override { return file_size_; } + + /// \brief Write data to the file synchronously. + /// + /// \param offset The starting offset of the write operation. + /// \param buffer The buffer address containing the bytes to write. + /// \param length The number of bytes to write. + /// \return The actual number of bytes written. + int64_t write(int64_t offset, const void* buffer, int64_t length) override; + + /// \brief Write data to the file asynchronously. + /// + /// \param offset The starting offset of the write operation. + /// \param buffer The buffer address containing the bytes to write. + /// \return A future resolving to the actual number of bytes written. + std::future writeAsync(int64_t offset, const void* buffer, + int64_t length) override; + + /// \brief Flush the written data to the file synchronously. + /// + /// This ensures all buffered data is committed to disk. + void flush() override; + + /// \brief Flush the written data to the file asynchronously. + /// + /// This ensures all buffered data is committed to disk. + std::future flushAsync() override { + return std::async(std::launch::async, [this]() { this->flush(); }); + } + + private: + std::string file_path_; // Path of the file being written to + mutable std::ofstream output_file_; // File stream for writing + int64_t file_size_; // Current file size after last write +}; + +} // namespace iceberg::io diff --git a/src/iceberg/reader.h b/src/iceberg/reader.h new file mode 100644 index 00000000..a85e56ba --- /dev/null +++ b/src/iceberg/reader.h @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include "iceberg/exception.h" +#include "iceberg/iceberg_export.h" + +namespace iceberg { + +/// \brief Interface for reading bytes from a file. +class ICEBERG_EXPORT Reader { + public: + Reader() = default; + virtual ~Reader() = default; + + /// \brief Get the size of the file. + virtual int64_t getSize() const { throw IcebergError("getSize() not implemented"); } + + /// \brief Get the size of the file (asynchronous). + /// + /// \return A future resolving to the file size in bytes. + virtual std::future getSizeAsync() const { + return std::async(std::launch::deferred, [this] { return getSize(); }); + } + + /// \brief Read a range of bytes from the file. + /// + /// \param offset The starting offset of the read operation. + /// \param length The number of bytes to read. + /// \param buffer The buffer address to write the bytes to. + /// \return The actual number of bytes read. + virtual int64_t read(int64_t offset, int64_t length, void* buffer) { + throw IcebergError("read() not implemented"); + } + + /// \brief Read a range of bytes from the file (asynchronous). + /// + /// \param offset The starting offset of the read operation. + /// \param length The number of bytes to read. + /// \param buffer The buffer address to write the bytes to. + /// \return A future resolving to the actual number of bytes read. + virtual std::future readAsync(int64_t offset, int64_t length, void* buffer) { + return std::async(std::launch::deferred, [this, offset, length, buffer] { + return read(offset, length, buffer); + }); + } +}; + +} // namespace iceberg diff --git a/src/iceberg/writer.h b/src/iceberg/writer.h new file mode 100644 index 00000000..d52627fa --- /dev/null +++ b/src/iceberg/writer.h @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include "iceberg/exception.h" +#include "iceberg/iceberg_export.h" + +namespace iceberg { + +/// \brief Interface for writing bytes to a file. +class ICEBERG_EXPORT Writer { + public: + virtual ~Writer() = default; + + /// \brief Get the size of the file currently allocated. + /// + /// Default implementation throws a runtime exception if not overridden. + virtual int64_t getSize() const { throw IcebergError("getSize() not implemented"); } + + /// \brief Get the size of the file (asynchronous). + /// + /// Default implementation wraps `getSize()` in a future. + virtual std::future getSizeAsync() const { + return std::async(std::launch::deferred, [this] { return getSize(); }); + } + + /// \brief Write data to the file. + /// + /// \param offset The starting offset of the write operation. + /// \param buffer The buffer address containing the bytes to write. + /// \param length The number of bytes to write. + /// \return The actual number of bytes written. + virtual int64_t write(int64_t offset, const void* buffer, int64_t length) { + throw IcebergError("write() not implemented"); + } + + /// \brief Write data to the file (asynchronous). + /// + /// \param offset The starting offset of the write operation. + /// \param buffer The buffer address containing the bytes to write. + /// \param length The number of bytes to write. + /// \return A future resolving to the actual number of bytes written. + virtual std::future writeAsync(int64_t offset, const void* buffer, + int64_t length) { + return std::async(std::launch::deferred, [this, offset, buffer, length] { + return write(offset, buffer, length); + }); + } + + /// \brief Flush buffered data to the file. + /// + /// Default implementation does nothing if not overridden. + virtual void flush() {} + + /// \brief Flush buffered data to the file (asynchronous). + /// + /// Default implementation wraps `flush()` in a future. + virtual std::future flushAsync() { + return std::async(std::launch::deferred, [this] { flush(); }); + } +}; + +} // namespace iceberg diff --git a/test/io/CMakeLists.txt b/test/io/CMakeLists.txt new file mode 100644 index 00000000..07d19fe9 --- /dev/null +++ b/test/io/CMakeLists.txt @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +if(ICEBERG_IO) + add_executable(fs_file_io_test) + target_sources(fs_file_io_test PRIVATE fs_file_io_test.cc) + target_link_libraries(fs_file_io_test PRIVATE iceberg_io_static GTest::gtest_main) + target_include_directories(fs_file_io_test PRIVATE "${ICEBERG_INCLUDES}") + add_test(NAME fs_file_io_test COMMAND fs_file_io_test) +endif() diff --git a/test/io/fs_file_io_test.cc b/test/io/fs_file_io_test.cc new file mode 100644 index 00000000..883f76fa --- /dev/null +++ b/test/io/fs_file_io_test.cc @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/io/fs_file_io.h" + +#include + +#include + +class FsFileIOTest : public testing::Test { + protected: + void SetUp() override { fs = std::make_shared("fs file io"); } + + std::shared_ptr fs; + std::filesystem::path tmpfile = std::filesystem::temp_directory_path() / "123.txt"; +}; + +TEST_F(FsFileIOTest, newOutputFile) { + auto out = fs->newOutputFile(tmpfile.string()); + out->create(); + ASSERT_EQ(out->location(), tmpfile.string()); +} + +TEST_F(FsFileIOTest, newInputFile) { + auto in = fs->newInputFile(tmpfile.string()); + ASSERT_TRUE(in->exists()); + ASSERT_EQ(in->location(), tmpfile.string()); + fs->DeleteFile(in->location()); +} From 2e6d8d996aa34314457287b6e098daf8262c7fb4 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Sun, 23 Mar 2025 11:24:07 +0800 Subject: [PATCH 02/10] fix cpp linter Signed-off-by: Junwang Zhao --- src/iceberg/reader.h | 1 - 1 file changed, 1 deletion(-) diff --git a/src/iceberg/reader.h b/src/iceberg/reader.h index a85e56ba..47e4d8ff 100644 --- a/src/iceberg/reader.h +++ b/src/iceberg/reader.h @@ -29,7 +29,6 @@ namespace iceberg { /// \brief Interface for reading bytes from a file. class ICEBERG_EXPORT Reader { public: - Reader() = default; virtual ~Reader() = default; /// \brief Get the size of the file. From 1f8ed57eb7cd6edf530abe4dec74e9af288844a4 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Thu, 27 Mar 2025 09:45:34 +0800 Subject: [PATCH 03/10] FileIO only deal with metadata files --- .../IcebergThirdpartyToolchain.cmake | 2 +- src/iceberg/CMakeLists.txt | 3 +- src/iceberg/arrow/CMakeLists.txt | 2 + .../arrow/arrow_error_transform_internal.h | 60 ++++++++++ {test => src/iceberg/arrow}/io/CMakeLists.txt | 8 +- src/iceberg/arrow/io/local_file_io.cc | 76 +++++++++++++ src/iceberg/arrow/io/local_file_io.h | 57 ++++++++++ src/iceberg/error.h | 6 + src/iceberg/file_io.h | 103 ++++++----------- src/iceberg/io/CMakeLists.txt | 61 ---------- src/iceberg/io/fs_file_io.cc | 105 ------------------ src/iceberg/io/fs_file_io.h | 94 ---------------- src/iceberg/io/fs_file_reader.cc | 81 -------------- src/iceberg/io/fs_file_reader.h | 65 ----------- src/iceberg/io/fs_file_writer.cc | 89 --------------- src/iceberg/io/fs_file_writer.h | 83 -------------- src/iceberg/reader.h | 67 ----------- src/iceberg/writer.h | 82 -------------- test/CMakeLists.txt | 9 +- test/io/fs_file_io_test.cc | 45 -------- test/local_file_io_test.cc | 58 ++++++++++ 21 files changed, 301 insertions(+), 855 deletions(-) create mode 100644 src/iceberg/arrow/arrow_error_transform_internal.h rename {test => src/iceberg/arrow}/io/CMakeLists.txt (70%) create mode 100644 src/iceberg/arrow/io/local_file_io.cc create mode 100644 src/iceberg/arrow/io/local_file_io.h delete mode 100644 src/iceberg/io/CMakeLists.txt delete mode 100644 src/iceberg/io/fs_file_io.cc delete mode 100644 src/iceberg/io/fs_file_io.h delete mode 100644 src/iceberg/io/fs_file_reader.cc delete mode 100644 src/iceberg/io/fs_file_reader.h delete mode 100644 src/iceberg/io/fs_file_writer.cc delete mode 100644 src/iceberg/io/fs_file_writer.h delete mode 100644 src/iceberg/reader.h delete mode 100644 src/iceberg/writer.h delete mode 100644 test/io/fs_file_io_test.cc create mode 100644 test/local_file_io_test.cc diff --git a/cmake_modules/IcebergThirdpartyToolchain.cmake b/cmake_modules/IcebergThirdpartyToolchain.cmake index e361f1e2..783d99c4 100644 --- a/cmake_modules/IcebergThirdpartyToolchain.cmake +++ b/cmake_modules/IcebergThirdpartyToolchain.cmake @@ -70,7 +70,7 @@ function(resolve_arrow_dependency) ON CACHE BOOL "" FORCE) set(ARROW_FILESYSTEM - OFF + ON CACHE BOOL "" FORCE) set(ARROW_SIMD_LEVEL "NONE" diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 81765224..f14831f1 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -57,7 +57,8 @@ install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_export.h DESTINATION ${ICEBERG_INSTALL_INCLUDEDIR}/iceberg) if(ICEBERG_BUILD_BUNDLE) - set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc avro/demo_avro.cc) + set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc arrow/io/local_file_io.cc + avro/demo_avro.cc) # Libraries to link with exported libiceberg_bundle.{so,a}. set(ICEBERG_BUNDLE_STATIC_BUILD_INTERFACE_LIBS) diff --git a/src/iceberg/arrow/CMakeLists.txt b/src/iceberg/arrow/CMakeLists.txt index 3416d5e9..3f3746f2 100644 --- a/src/iceberg/arrow/CMakeLists.txt +++ b/src/iceberg/arrow/CMakeLists.txt @@ -15,4 +15,6 @@ # specific language governing permissions and limitations # under the License. +add_subdirectory(io) + iceberg_install_all_headers(iceberg/arrow) diff --git a/src/iceberg/arrow/arrow_error_transform_internal.h b/src/iceberg/arrow/arrow_error_transform_internal.h new file mode 100644 index 00000000..fe228cd6 --- /dev/null +++ b/src/iceberg/arrow/arrow_error_transform_internal.h @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "iceberg/error.h" +#include "iceberg/expected.h" + +namespace iceberg::arrow::internal { + +inline ErrorKind ToErrorKind(const ::arrow::Status& status) { + switch (status.code()) { + case ::arrow::StatusCode::IOError: + return ErrorKind::kIOError; + default: + return ErrorKind::kUnknownError; + } +} + +#define ICEBERG_INTERNAL_ASSIGN_OR_RETURN_IMPL(result_name, lhs, rexpr, error_transform) \ + auto&& result_name = (rexpr); \ + if (!result_name.ok()) { \ + return unexpected( \ + Error(error_transform(result_name.status()), result_name.status().ToString())); \ + } \ + lhs = std::move(result_name).ValueOrDie(); + +#define ICEBERG_INTERNAL_ASSIGN_OR_RETURN(lhs, rexpr) \ + ICEBERG_INTERNAL_ASSIGN_OR_RETURN_IMPL( \ + ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), lhs, rexpr, \ + internal::ToErrorKind) + +#define ICEBERG_INTERNAL_RETURN_NOT_OK(expr) \ + do { \ + auto&& _status = (expr); \ + if (!_status.ok()) { \ + return unexpected(Error(internal::ToErrorKind(_status), _status.ToString())); \ + } \ + } while (0) + +} // namespace iceberg::arrow::internal diff --git a/test/io/CMakeLists.txt b/src/iceberg/arrow/io/CMakeLists.txt similarity index 70% rename from test/io/CMakeLists.txt rename to src/iceberg/arrow/io/CMakeLists.txt index 07d19fe9..b0c0059e 100644 --- a/test/io/CMakeLists.txt +++ b/src/iceberg/arrow/io/CMakeLists.txt @@ -15,10 +15,4 @@ # specific language governing permissions and limitations # under the License. -if(ICEBERG_IO) - add_executable(fs_file_io_test) - target_sources(fs_file_io_test PRIVATE fs_file_io_test.cc) - target_link_libraries(fs_file_io_test PRIVATE iceberg_io_static GTest::gtest_main) - target_include_directories(fs_file_io_test PRIVATE "${ICEBERG_INCLUDES}") - add_test(NAME fs_file_io_test COMMAND fs_file_io_test) -endif() +iceberg_install_all_headers(iceberg/arrow/io) diff --git a/src/iceberg/arrow/io/local_file_io.cc b/src/iceberg/arrow/io/local_file_io.cc new file mode 100644 index 00000000..97621632 --- /dev/null +++ b/src/iceberg/arrow/io/local_file_io.cc @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/arrow/io/local_file_io.h" + +#include + +#include + +#include "iceberg/arrow/arrow_error_transform_internal.h" + +namespace iceberg::arrow::io { + +/// \brief Read the content of the file at the given location. +expected LocalFileIO::ReadFile(const std::string& file_location, + std::optional length) { + // We don't support reading a file with a specific length. + if (length.has_value()) { + return unexpected(Error(ErrorKind::kInvalidArgument, "Length is not supported")); + } + std::string content; + ICEBERG_INTERNAL_ASSIGN_OR_RETURN(auto file, local_fs_->OpenInputFile(file_location)); + ICEBERG_INTERNAL_ASSIGN_OR_RETURN(auto file_size, file->GetSize()); + + content.resize(file_size); + ICEBERG_INTERNAL_ASSIGN_OR_RETURN( + auto read_length, + file->ReadAt(0, file_size, reinterpret_cast(&content[0]))); + + return content; +} + +/// \brief Write the given content to the file at the given location. +expected LocalFileIO::WriteFile(const std::string& file_location, + std::string_view content, bool overwrite) { + if (!overwrite && FileExists(file_location)) { + return unexpected(Error(ErrorKind::kAlreadyExists, "")); + } + ICEBERG_INTERNAL_ASSIGN_OR_RETURN(auto file, + local_fs_->OpenOutputStream(file_location)); + ICEBERG_INTERNAL_RETURN_NOT_OK(file->Write(content.data(), content.size())); + return {}; +} + +/// \brief Delete a file at the given location. +expected LocalFileIO::DeleteFile(const std::string& file_location) { + if (!FileExists(file_location)) { + return unexpected(Error(ErrorKind::kNoSuchFile, + std::format("File {} does not exist", file_location))); + } + ICEBERG_INTERNAL_RETURN_NOT_OK(local_fs_->DeleteFile(file_location)); + return {}; +} + +bool LocalFileIO::FileExists(const std::string& location) { + // ::arrow::fs::LocalFileSystem does not have a exists method. + return std::filesystem::exists(location); +} + +} // namespace iceberg::arrow::io diff --git a/src/iceberg/arrow/io/local_file_io.h b/src/iceberg/arrow/io/local_file_io.h new file mode 100644 index 00000000..0645db87 --- /dev/null +++ b/src/iceberg/arrow/io/local_file_io.h @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include + +#include "iceberg/file_io.h" +#include "iceberg/iceberg_bundle_export.h" + +namespace iceberg::arrow::io { + +/// \brief A concrete implementation of FileIO for file system. +class ICEBERG_BUNDLE_EXPORT LocalFileIO : public FileIO { + public: + explicit LocalFileIO(std::shared_ptr<::arrow::fs::LocalFileSystem>& local_fs) + : local_fs_(local_fs) {} + + ~LocalFileIO() override = default; + + /// \brief Read the content of the file at the given location. + expected ReadFile(const std::string& file_location, + std::optional length) override; + + /// \brief Write the given content to the file at the given location. + expected WriteFile(const std::string& file_location, + std::string_view content, bool overwrite) override; + + /// \brief Delete a file at the given location. + expected DeleteFile(const std::string& file_location) override; + + private: + /// \brief Check if a file exists + bool FileExists(const std::string& location); + + std::shared_ptr<::arrow::fs::LocalFileSystem>& local_fs_; +}; + +} // namespace iceberg::arrow::io diff --git a/src/iceberg/error.h b/src/iceberg/error.h index 066ef87f..0acd0b9d 100644 --- a/src/iceberg/error.h +++ b/src/iceberg/error.h @@ -30,16 +30,22 @@ namespace iceberg { enum class ErrorKind { kNoSuchNamespace, kAlreadyExists, + kNoSuchFile, kNoSuchTable, kCommitStateUnknown, kInvalidSchema, kInvalidArgument, + kIOError, + kUnknownError, }; /// \brief Error with a kind and a message. struct ICEBERG_EXPORT [[nodiscard]] Error { ErrorKind kind; std::string message; + + explicit Error(ErrorKind kind, std::string message) + : kind(kind), message(std::move(message)) {}; }; } // namespace iceberg diff --git a/src/iceberg/file_io.h b/src/iceberg/file_io.h index 08600bc2..94375507 100644 --- a/src/iceberg/file_io.h +++ b/src/iceberg/file_io.h @@ -19,92 +19,51 @@ #pragma once -#include +#include #include +#include +#include "iceberg/error.h" +#include "iceberg/expected.h" #include "iceberg/iceberg_export.h" -#include "iceberg/reader.h" -#include "iceberg/writer.h" namespace iceberg { -/// \brief An interface used to read input files using Reader and AsyncReader -class ICEBERG_EXPORT InputFile { - public: - explicit InputFile(std::string location) : location_(std::move(location)) {} - - virtual ~InputFile() = default; - - /// \brief Checks whether the file exists. - virtual bool exists() const = 0; - /// \brief Returns the total length of the file, in bytes. - virtual int64_t getLength() const = 0; - - /// \brief Get a Reader instance to read bytes from the file. - virtual std::unique_ptr newReader() = 0; - - /// \brief Get the file location - const std::string& location() const { return location_; } - - protected: - std::string location_; -}; - -/// \brief An interface used to write output files using Writer and AsyncWriter -class ICEBERG_EXPORT OutputFile { - public: - explicit OutputFile(std::string location) : location_(std::move(location)) {} - - virtual ~OutputFile() = default; - - /// \brief Create the file. - /// - /// If the file exists, or an error will be thrown. - virtual void create() = 0; - - /// \brief Get a Writer instance to write bytes to the file. - virtual std::unique_ptr newWriter() = 0; - - /// \brief Get the file location - const std::string& location() const { return location_; } - - /// \brief Return an InputFile for the location of this OutputFile. - virtual std::shared_ptr toInputFile() const = 0; - - protected: - std::string location_; -}; - -/// \brief Pluggable module for reading, writing, and deleting files. +/// \brief Pluggable module for reading, writing, and deleting metadata files. /// -/// Both table metadata files and data files can be written and read by this module. +/// This module only handle metadata files, not data files. The metadata files +/// are typically small and are used to store schema, partition information, +/// and other metadata about the table. class ICEBERG_EXPORT FileIO { public: - explicit FileIO(std::string name) : name_(std::move(name)) {} - + FileIO() = default; virtual ~FileIO() = default; - /// \brief Get an InputFile + /// \brief Read the content of the file at the given location. /// - /// Get a InputFile instance to read bytes from the file at the given location. - virtual std::shared_ptr newInputFile(const std::string& location) = 0; - - /// \brief Get an OutputFile + /// \param file_location The location of the file to read. + /// \param length The number of bytes to read. Some object storage need to specify + /// the length to read, e.g. S3 `GetObject` has a Range parameter. + /// \return The content of the file if the read succeeded, an error code if the read + /// failed. + virtual expected ReadFile(const std::string& file_location, + std::optional length) = 0; + + /// \brief Write the given content to the file at the given location. /// - /// Get a OutputFile instance to write bytes to the file at the given location. - virtual std::shared_ptr newOutputFile(const std::string& location) = 0; - - /// \brief Delete file + /// \param file_location The location of the file to write. + /// \param content The content to write to the file. + /// \param overwrite If true, overwrite the file if it exists. If false, fail if the + /// file exists. + /// \return void if the write succeeded, an error code if the write failed. + virtual expected WriteFile(const std::string& file_location, + std::string_view content, bool overwrite) = 0; + + /// \brief Delete a file at the given location. /// - /// Delete the file at the given location. - virtual void DeleteFile(const std::string& location) = 0; - void DeleteFile(const InputFile& ifile) { return DeleteFile(ifile.location()); } - void DeleteFile(const OutputFile& ofile) { return DeleteFile(ofile.location()); } - - const std::string& name() const { return name_; } - - protected: - std::string name_; + /// \param file_location The location of the file to delete. + /// \return void if the delete succeeded, an error code if the delete failed. + virtual expected DeleteFile(const std::string& file_location) = 0; }; } // namespace iceberg diff --git a/src/iceberg/io/CMakeLists.txt b/src/iceberg/io/CMakeLists.txt deleted file mode 100644 index bce36223..00000000 --- a/src/iceberg/io/CMakeLists.txt +++ /dev/null @@ -1,61 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -if(NOT ICEBERG_IO) - return() -endif() - -set(ICEBERG_IO_SOURCES fs_file_io.cc fs_file_reader.cc fs_file_writer.cc) -set(ICEBERG_IO_INCLUDES "${ICEBERG_INCLUDES}") - -# Libraries to link with exported libiceberg_io.{so,a}. -set(ICEBERG_IO_STATIC_BUILD_INTERFACE_LIBS) -set(ICEBERG_IO_SHARED_BUILD_INTERFACE_LIBS) -set(ICEBERG_IO_STATIC_INSTALL_INTERFACE_LIBS) -set(ICEBERG_IO_SHARED_INSTALL_INTERFACE_LIBS) - -list(APPEND ICEBERG_IO_STATIC_BUILD_INTERFACE_LIBS - "$,iceberg_static,iceberg_shared>") -list(APPEND ICEBERG_IO_SHARED_BUILD_INTERFACE_LIBS - "$,iceberg_shared,iceberg_static>") -list(APPEND - ICEBERG_IO_STATIC_INSTALL_INTERFACE_LIBS - "$,Iceberg::iceberg_static,Iceberg::iceberg_shared>" -) -list(APPEND - ICEBERG_IO_SHARED_INSTALL_INTERFACE_LIBS - "$,Iceberg::iceberg_shared,Iceberg::iceberg_static>" -) - -add_iceberg_lib(iceberg_io - SOURCES - ${ICEBERG_IO_SOURCES} - PRIVATE_INCLUDES - ${ICEBERG_IO_INCLUDES} - SHARED_LINK_LIBS - ${ICEBERG_IO_SHARED_BUILD_INTERFACE_LIBS} - STATIC_LINK_LIBS - ${ICEBERG_IO_STATIC_BUILD_INTERFACE_LIBS} - STATIC_INSTALL_INTERFACE_LIBS - ${ICEBERG_IO_STATIC_INSTALL_INTERFACE_LIBS} - SHARED_INSTALL_INTERFACE_LIBS - ${ICEBERG_IO_SHARED_INSTALL_INTERFACE_LIBS}) - -iceberg_install_all_headers(iceberg/io) - -install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_io_export.h - DESTINATION ${ICEBERG_INSTALL_INCLUDEDIR}/iceberg/io) diff --git a/src/iceberg/io/fs_file_io.cc b/src/iceberg/io/fs_file_io.cc deleted file mode 100644 index 15d508ba..00000000 --- a/src/iceberg/io/fs_file_io.cc +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "iceberg/io/fs_file_io.h" - -#include - -#include -#include -#include - -#include - -#include "iceberg/exception.h" -#include "iceberg/io/fs_file_reader.h" -#include "iceberg/io/fs_file_writer.h" - -namespace iceberg::io { - -bool FsInputFile::exists() const { return std::filesystem::exists(location()); } - -int64_t FsInputFile::getLength() const { - struct stat stat_buffer; - if (stat(location().c_str(), &stat_buffer) != 0) { - throw IcebergError(std::format( - "Failed to get file length. File does not exist or is inaccessible: {}", - location_)); - } - return stat_buffer.st_size; -} - -std::unique_ptr FsInputFile::newReader() { - return std::make_unique(location_); -} - -void FsOutputFile::create() { - // Check if the file already exists - std::ifstream existing_file(location_); - bool file_exists = existing_file.good(); - existing_file.close(); - - if (file_exists) { - throw IcebergError(std::format("File already exists: {}", location_)); - } - - // Create or overwrite the file by opening it in truncating mode - std::ofstream new_file(location_, std::ios::binary | std::ios::out | std::ios::trunc); - if (!new_file.is_open()) { - throw IcebergError(std::format("Failed to create or overwrite file: {}", location_)); - } - new_file.close(); -} - -std::unique_ptr FsOutputFile::newWriter() { - return std::make_unique(location_); -} - -std::shared_ptr FsFileIO::newInputFile(const std::string& location) { - // Check if the file exists - if (!fileExists(location)) { - throw IcebergError(std::format("InputFile does not exist: {}", location)); - } - - // Create and return an FsInputFile instance - return std::make_shared(location); -} - -std::shared_ptr FsFileIO::newOutputFile(const std::string& location) { - return std::make_shared(location); -} - -void FsFileIO::DeleteFile(const std::string& location) { - // Check if the file exists - if (!fileExists(location)) { - throw IcebergError(std::format("InputFile does not exist: {}", location)); - } - std::error_code ec; - if (std::filesystem::remove(location, ec) == false) { - throw IcebergError( - std::format("Failed to delete file: {}, error code: {}", location, ec.message())); - } -} - -bool FsFileIO::fileExists(const std::string& location) { - std::ifstream file(location); - return file.good(); -} - -} // namespace iceberg::io diff --git a/src/iceberg/io/fs_file_io.h b/src/iceberg/io/fs_file_io.h deleted file mode 100644 index 35440cdc..00000000 --- a/src/iceberg/io/fs_file_io.h +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#pragma once - -#include "iceberg/file_io.h" -#include "iceberg/io/iceberg_io_export.h" - -namespace iceberg::io { - -class ICEBERG_IO_EXPORT FsInputFile : public InputFile { - public: - explicit FsInputFile(std::string location) : InputFile(std::move(location)) {} - ~FsInputFile() override = default; - - /// \brief Checks whether the file exists. - bool exists() const override; - - /// \brief Returns the total length of the file, in bytes. - int64_t getLength() const override; - - /// \brief Get a Reader instance to read bytes from the file. - std::unique_ptr newReader() override; -}; - -class ICEBERG_IO_EXPORT FsOutputFile : public OutputFile { - public: - explicit FsOutputFile(std::string location) : OutputFile(std::move(location)) {} - ~FsOutputFile() override = default; - - /// \brief Create the file, optionally overwriting if it exists. - /// - /// If the file already exists, an exception is thrown. - void create() override; - - /// \brief Get a Writer instance to write bytes to the file. - /// - /// Returns a unique pointer to a `FsFileWriter`. - std::unique_ptr newWriter() override; - - /// \brief Return an InputFile for the location of this OutputFile. - /// - /// Creates an FsInputFile for reading the file pointed by this OutputFile's location. - std::shared_ptr toInputFile() const override { - return std::make_shared(location_); - } -}; - -/// \brief A concrete implementation of FileIO for file system. -class ICEBERG_IO_EXPORT FsFileIO : public FileIO { - public: - explicit FsFileIO(const std::string& name) : FileIO(name) {} - - ~FsFileIO() override = default; - - /// \brief Get an InputFile - /// - /// Returns a shared pointer to an FsInputFile instance, representing the file at - /// `location`. - std::shared_ptr newInputFile(const std::string& location) override; - - /// \brief Get an OutputFile - /// - /// Returns a shared pointer to an FsOutputFile instance, representing the file at - /// `location`. - std::shared_ptr newOutputFile(const std::string& location) override; - - /// \brief Delete file - /// - /// Deletes the file at the given location using file system operations. - void DeleteFile(const std::string& location) override; - - private: - /// \brief Check if a file exists - bool fileExists(const std::string& location); -}; - -} // namespace iceberg::io diff --git a/src/iceberg/io/fs_file_reader.cc b/src/iceberg/io/fs_file_reader.cc deleted file mode 100644 index 16337c4e..00000000 --- a/src/iceberg/io/fs_file_reader.cc +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "iceberg/io/fs_file_reader.h" - -#include - -#include "iceberg/exception.h" - -namespace iceberg::io { - -FsFileReader::FsFileReader(std::string file_path) : file_path_(std::move(file_path)) { - // Open the file in binary mode - input_file_.open(file_path_, std::ios::binary | std::ios::in); - if (!input_file_.is_open()) { - throw IcebergError(std::format("Failed to open file: {}", file_path_)); - } - - // Calculate the file size - input_file_.seekg(0, std::ios::end); - file_size_ = input_file_.tellg(); - input_file_.seekg(0, std::ios::beg); - - if (file_size_ < 0) { - throw IcebergError(std::format("Failed to determine file size: {}", file_path_)); - } -} - -FsFileReader::~FsFileReader() { - if (input_file_.is_open()) { - input_file_.close(); - } -} - -int64_t FsFileReader::read(int64_t offset, int64_t length, void* buffer) { - if (!input_file_.is_open()) { - throw IcebergError("File is not open for reading"); - } - - if (offset < 0 || offset + length > file_size_) { - throw IcebergError( - std::format("Invalid read range: [{}, {})", offset, offset + length)); - } - - // Seek to the starting position - input_file_.seekg(offset, std::ios::beg); - if (input_file_.fail()) { - throw IcebergError(std::format("Failed to seek to offset: {}", offset)); - } - - // Read the data into the buffer - input_file_.read(static_cast(buffer), length); - auto bytes_read = static_cast(input_file_.gcount()); - - return bytes_read; // Return actual bytes read -} - -std::future FsFileReader::readAsync(int64_t offset, int64_t length, - void* buffer) { - return std::async(std::launch::async, [this, offset, length, buffer]() -> int64_t { - return this->read(offset, length, buffer); - }); -} - -} // namespace iceberg::io diff --git a/src/iceberg/io/fs_file_reader.h b/src/iceberg/io/fs_file_reader.h deleted file mode 100644 index 8ea0d2ec..00000000 --- a/src/iceberg/io/fs_file_reader.h +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include -#include -#include - -#include "iceberg/io/iceberg_io_export.h" -#include "iceberg/reader.h" - -namespace iceberg::io { - -/// \brief A concrete implementation of Reader for file system-based reading. -class ICEBERG_IO_EXPORT FsFileReader : public Reader { - public: - explicit FsFileReader(std::string file_path); - - ~FsFileReader() override; - - // Disable copy constructor and assignment operator - FsFileReader(const FsFileReader&) = delete; - FsFileReader& operator=(const FsFileReader&) = delete; - - /// \brief Get the size of the file. - int64_t getSize() const override { return file_size_; } - - /// \brief Read a range of bytes from the file synchronously. - /// - /// \param offset The starting offset of the read operation. - /// \param length The number of bytes to read. - /// \param buffer The buffer address to write the bytes to. - /// \return The actual number of bytes read. - int64_t read(int64_t offset, int64_t length, void* buffer) override; - - /// \brief Read a range of bytes from the file asynchronously. - /// - /// \param offset The starting offset of the read operation. - /// \param length The number of bytes to read. - /// \param buffer The buffer address to write the bytes to. - /// \return A future resolving to the actual number of bytes read. - std::future readAsync(int64_t offset, int64_t length, void* buffer) override; - - private: - std::string file_path_; // Path to the file being read. - mutable std::ifstream input_file_; // File stream for reading. - int64_t file_size_{0}; // Size of the file in bytes. -}; - -} // namespace iceberg::io diff --git a/src/iceberg/io/fs_file_writer.cc b/src/iceberg/io/fs_file_writer.cc deleted file mode 100644 index 854d0150..00000000 --- a/src/iceberg/io/fs_file_writer.cc +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "iceberg/io/fs_file_writer.h" - -#include "iceberg/exception.h" - -namespace iceberg::io { - -FsFileWriter::FsFileWriter(std::string file_path) : file_path_(std::move(file_path)) { - // Open the file in binary write mode, truncating any existing file - output_file_.open(file_path_, std::ios::binary | std::ios::out | std::ios::trunc); - if (!output_file_.is_open()) { - throw IcebergError(std::format("Failed to open file for writing: {}", file_path_)); - } - // Calculate the file size after opening the file - output_file_.seekp(0, std::ios::end); - file_size_ = output_file_.tellp(); -} - -FsFileWriter::~FsFileWriter() { - if (output_file_.is_open()) { - output_file_.close(); - } -} - -int64_t FsFileWriter::write(int64_t offset, const void* buffer, int64_t length) { - if (!output_file_.is_open()) { - throw IcebergError(std::format("File is not open for writing: {}", file_path_)); - } - - if (offset < 0) { - throw IcebergError( - std::format("Invalid write range. Offset must be non-negative: {}", offset)); - } - - // Seek the position to write - output_file_.seekp(offset, std::ios::beg); - if (output_file_.fail()) { - throw IcebergError(std::format("Failed to seek to offset: {}", offset)); - } - - // Write data to the file - output_file_.write(static_cast(buffer), length); - if (output_file_.fail()) { - throw IcebergError("Failed to write data to file."); - } - - // Update the file size based on the last written position - file_size_ = std::max(file_size_, offset + length); - - return length; // Return number of bytes successfully written -} - -std::future FsFileWriter::writeAsync(int64_t offset, const void* buffer, - int64_t length) { - return std::async(std::launch::async, [this, offset, buffer, length]() { - return this->write(offset, buffer, length); - }); -} - -void FsFileWriter::flush() { - if (!output_file_.is_open()) { - throw IcebergError(std::format("File is not open for flushing: {}", file_path_)); - } - - output_file_.flush(); - if (output_file_.fail()) { - throw IcebergError("Failed to flush data to file."); - } -} - -} // namespace iceberg::io diff --git a/src/iceberg/io/fs_file_writer.h b/src/iceberg/io/fs_file_writer.h deleted file mode 100644 index 0f68903d..00000000 --- a/src/iceberg/io/fs_file_writer.h +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#pragma once - -#include -#include -#include -#include - -#include "iceberg/io/iceberg_io_export.h" -#include "iceberg/writer.h" - -namespace iceberg::io { - -/// \brief A concrete implementation of Writer for file system files. -class ICEBERG_IO_EXPORT FsFileWriter : public Writer { - public: - explicit FsFileWriter(std::string file_path); - - ~FsFileWriter() override; - - // Disable copy constructor and assignment operator - FsFileWriter(const FsFileWriter&) = delete; - FsFileWriter& operator=(const FsFileWriter&) = delete; - - /// \brief Get the size of the file currently allocated. - /// - /// This implementation assumes size equals the last written offset + length of the - /// written bytes. - int64_t getSize() const override { return file_size_; } - - /// \brief Write data to the file synchronously. - /// - /// \param offset The starting offset of the write operation. - /// \param buffer The buffer address containing the bytes to write. - /// \param length The number of bytes to write. - /// \return The actual number of bytes written. - int64_t write(int64_t offset, const void* buffer, int64_t length) override; - - /// \brief Write data to the file asynchronously. - /// - /// \param offset The starting offset of the write operation. - /// \param buffer The buffer address containing the bytes to write. - /// \return A future resolving to the actual number of bytes written. - std::future writeAsync(int64_t offset, const void* buffer, - int64_t length) override; - - /// \brief Flush the written data to the file synchronously. - /// - /// This ensures all buffered data is committed to disk. - void flush() override; - - /// \brief Flush the written data to the file asynchronously. - /// - /// This ensures all buffered data is committed to disk. - std::future flushAsync() override { - return std::async(std::launch::async, [this]() { this->flush(); }); - } - - private: - std::string file_path_; // Path of the file being written to - mutable std::ofstream output_file_; // File stream for writing - int64_t file_size_; // Current file size after last write -}; - -} // namespace iceberg::io diff --git a/src/iceberg/reader.h b/src/iceberg/reader.h deleted file mode 100644 index 47e4d8ff..00000000 --- a/src/iceberg/reader.h +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#pragma once - -#include - -#include "iceberg/exception.h" -#include "iceberg/iceberg_export.h" - -namespace iceberg { - -/// \brief Interface for reading bytes from a file. -class ICEBERG_EXPORT Reader { - public: - virtual ~Reader() = default; - - /// \brief Get the size of the file. - virtual int64_t getSize() const { throw IcebergError("getSize() not implemented"); } - - /// \brief Get the size of the file (asynchronous). - /// - /// \return A future resolving to the file size in bytes. - virtual std::future getSizeAsync() const { - return std::async(std::launch::deferred, [this] { return getSize(); }); - } - - /// \brief Read a range of bytes from the file. - /// - /// \param offset The starting offset of the read operation. - /// \param length The number of bytes to read. - /// \param buffer The buffer address to write the bytes to. - /// \return The actual number of bytes read. - virtual int64_t read(int64_t offset, int64_t length, void* buffer) { - throw IcebergError("read() not implemented"); - } - - /// \brief Read a range of bytes from the file (asynchronous). - /// - /// \param offset The starting offset of the read operation. - /// \param length The number of bytes to read. - /// \param buffer The buffer address to write the bytes to. - /// \return A future resolving to the actual number of bytes read. - virtual std::future readAsync(int64_t offset, int64_t length, void* buffer) { - return std::async(std::launch::deferred, [this, offset, length, buffer] { - return read(offset, length, buffer); - }); - } -}; - -} // namespace iceberg diff --git a/src/iceberg/writer.h b/src/iceberg/writer.h deleted file mode 100644 index d52627fa..00000000 --- a/src/iceberg/writer.h +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#pragma once - -#include - -#include "iceberg/exception.h" -#include "iceberg/iceberg_export.h" - -namespace iceberg { - -/// \brief Interface for writing bytes to a file. -class ICEBERG_EXPORT Writer { - public: - virtual ~Writer() = default; - - /// \brief Get the size of the file currently allocated. - /// - /// Default implementation throws a runtime exception if not overridden. - virtual int64_t getSize() const { throw IcebergError("getSize() not implemented"); } - - /// \brief Get the size of the file (asynchronous). - /// - /// Default implementation wraps `getSize()` in a future. - virtual std::future getSizeAsync() const { - return std::async(std::launch::deferred, [this] { return getSize(); }); - } - - /// \brief Write data to the file. - /// - /// \param offset The starting offset of the write operation. - /// \param buffer The buffer address containing the bytes to write. - /// \param length The number of bytes to write. - /// \return The actual number of bytes written. - virtual int64_t write(int64_t offset, const void* buffer, int64_t length) { - throw IcebergError("write() not implemented"); - } - - /// \brief Write data to the file (asynchronous). - /// - /// \param offset The starting offset of the write operation. - /// \param buffer The buffer address containing the bytes to write. - /// \param length The number of bytes to write. - /// \return A future resolving to the actual number of bytes written. - virtual std::future writeAsync(int64_t offset, const void* buffer, - int64_t length) { - return std::async(std::launch::deferred, [this, offset, buffer, length] { - return write(offset, buffer, length); - }); - } - - /// \brief Flush buffered data to the file. - /// - /// Default implementation does nothing if not overridden. - virtual void flush() {} - - /// \brief Flush buffered data to the file (asynchronous). - /// - /// Default implementation wraps `flush()` in a future. - virtual std::future flushAsync() { - return std::async(std::launch::deferred, [this] { flush(); }); - } -}; - -} // namespace iceberg diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 76006e0f..a6d8964d 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -42,7 +42,12 @@ if(ICEBERG_BUILD_BUNDLE) add_executable(arrow_test) target_sources(arrow_test PRIVATE arrow_test.cc) - target_link_libraries(arrow_test PRIVATE iceberg_bundle_static Arrow::arrow_static - GTest::gtest_main GTest::gmock) + target_link_libraries(arrow_test PRIVATE iceberg_bundle_static GTest::gtest_main) add_test(NAME arrow_test COMMAND arrow_test) + + add_executable(local_file_io_test) + target_sources(local_file_io_test PRIVATE local_file_io_test.cc) + target_link_libraries(local_file_io_test PRIVATE iceberg_bundle_static + GTest::gtest_main) + add_test(NAME local_file_io_test COMMAND local_file_io_test) endif() diff --git a/test/io/fs_file_io_test.cc b/test/io/fs_file_io_test.cc deleted file mode 100644 index 883f76fa..00000000 --- a/test/io/fs_file_io_test.cc +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "iceberg/io/fs_file_io.h" - -#include - -#include - -class FsFileIOTest : public testing::Test { - protected: - void SetUp() override { fs = std::make_shared("fs file io"); } - - std::shared_ptr fs; - std::filesystem::path tmpfile = std::filesystem::temp_directory_path() / "123.txt"; -}; - -TEST_F(FsFileIOTest, newOutputFile) { - auto out = fs->newOutputFile(tmpfile.string()); - out->create(); - ASSERT_EQ(out->location(), tmpfile.string()); -} - -TEST_F(FsFileIOTest, newInputFile) { - auto in = fs->newInputFile(tmpfile.string()); - ASSERT_TRUE(in->exists()); - ASSERT_EQ(in->location(), tmpfile.string()); - fs->DeleteFile(in->location()); -} diff --git a/test/local_file_io_test.cc b/test/local_file_io_test.cc new file mode 100644 index 00000000..9b869d4a --- /dev/null +++ b/test/local_file_io_test.cc @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/arrow/io/local_file_io.h" + +#include + +#include + +class LocalFileIOTest : public testing::Test { + protected: + void SetUp() override { + local_fs_ = std::make_shared(); + file_io_ = std::make_shared(local_fs_); + } + + std::shared_ptr local_fs_; + std::shared_ptr file_io_; + std::filesystem::path tmpfile = std::filesystem::temp_directory_path() / "123.txt"; +}; + +TEST_F(LocalFileIOTest, readWriteFile) { + auto read_res = file_io_->ReadFile(tmpfile.string(), 1024); + EXPECT_EQ(read_res.error().kind, iceberg::ErrorKind::kInvalidArgument); + + read_res = file_io_->ReadFile(tmpfile.string(), std::nullopt); + EXPECT_EQ(read_res.error().kind, iceberg::ErrorKind::kIOError); + + auto write_res = file_io_->WriteFile(tmpfile.string(), "hello world", false); + EXPECT_TRUE(write_res.has_value()); + + read_res = file_io_->ReadFile(tmpfile.string(), std::nullopt); + EXPECT_EQ(read_res.value(), "hello world"); +} + +TEST_F(LocalFileIOTest, deleteFile) { + auto del_res = file_io_->DeleteFile(tmpfile.string()); + EXPECT_TRUE(del_res.has_value()); + + del_res = file_io_->DeleteFile(tmpfile.string()); + EXPECT_EQ(del_res.error().kind, iceberg::ErrorKind::kNoSuchFile); +} From cd573fac72d7008363c47ca79f5d55131dd9f910 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Fri, 28 Mar 2025 15:31:59 +0800 Subject: [PATCH 04/10] fix windows linking error Signed-off-by: Junwang Zhao --- .../arrow/arrow_error_transform_internal.h | 7 ++++--- src/iceberg/arrow/io/local_file_io.cc | 3 ++- src/iceberg/demo.cc | 1 + src/iceberg/error.h | 4 +--- src/iceberg/file_io.h | 16 ++++++++++++++-- test/local_file_io_test.cc | 4 ++-- 6 files changed, 24 insertions(+), 11 deletions(-) diff --git a/src/iceberg/arrow/arrow_error_transform_internal.h b/src/iceberg/arrow/arrow_error_transform_internal.h index fe228cd6..e8687376 100644 --- a/src/iceberg/arrow/arrow_error_transform_internal.h +++ b/src/iceberg/arrow/arrow_error_transform_internal.h @@ -39,8 +39,8 @@ inline ErrorKind ToErrorKind(const ::arrow::Status& status) { #define ICEBERG_INTERNAL_ASSIGN_OR_RETURN_IMPL(result_name, lhs, rexpr, error_transform) \ auto&& result_name = (rexpr); \ if (!result_name.ok()) { \ - return unexpected( \ - Error(error_transform(result_name.status()), result_name.status().ToString())); \ + return unexpected{{.kind = error_transform(result_name.status()), \ + .message = result_name.status().ToString()}}; \ } \ lhs = std::move(result_name).ValueOrDie(); @@ -53,7 +53,8 @@ inline ErrorKind ToErrorKind(const ::arrow::Status& status) { do { \ auto&& _status = (expr); \ if (!_status.ok()) { \ - return unexpected(Error(internal::ToErrorKind(_status), _status.ToString())); \ + return unexpected{ \ + {.kind = internal::ToErrorKind(_status), .message = _status.ToString()}}; \ } \ } while (0) diff --git a/src/iceberg/arrow/io/local_file_io.cc b/src/iceberg/arrow/io/local_file_io.cc index 97621632..002264fe 100644 --- a/src/iceberg/arrow/io/local_file_io.cc +++ b/src/iceberg/arrow/io/local_file_io.cc @@ -50,7 +50,8 @@ expected LocalFileIO::ReadFile(const std::string& file_locat expected LocalFileIO::WriteFile(const std::string& file_location, std::string_view content, bool overwrite) { if (!overwrite && FileExists(file_location)) { - return unexpected(Error(ErrorKind::kAlreadyExists, "")); + return unexpected( + Error(ErrorKind::kAlreadyExists, std::format("File {} exists", file_location))); } ICEBERG_INTERNAL_ASSIGN_OR_RETURN(auto file, local_fs_->OpenOutputStream(file_location)); diff --git a/src/iceberg/demo.cc b/src/iceberg/demo.cc index a9c0a031..aa1835b2 100644 --- a/src/iceberg/demo.cc +++ b/src/iceberg/demo.cc @@ -21,6 +21,7 @@ #include "iceberg/avro.h" // include to export symbols #include "iceberg/catalog.h" +#include "iceberg/file_io.h" #include "iceberg/location_provider.h" #include "iceberg/table.h" #include "iceberg/transaction.h" diff --git a/src/iceberg/error.h b/src/iceberg/error.h index 0acd0b9d..04c8857a 100644 --- a/src/iceberg/error.h +++ b/src/iceberg/error.h @@ -36,6 +36,7 @@ enum class ErrorKind { kInvalidSchema, kInvalidArgument, kIOError, + kNotImplemented, kUnknownError, }; @@ -43,9 +44,6 @@ enum class ErrorKind { struct ICEBERG_EXPORT [[nodiscard]] Error { ErrorKind kind; std::string message; - - explicit Error(ErrorKind kind, std::string message) - : kind(kind), message(std::move(message)) {}; }; } // namespace iceberg diff --git a/src/iceberg/file_io.h b/src/iceberg/file_io.h index 94375507..ed9a2902 100644 --- a/src/iceberg/file_io.h +++ b/src/iceberg/file_io.h @@ -47,7 +47,13 @@ class ICEBERG_EXPORT FileIO { /// \return The content of the file if the read succeeded, an error code if the read /// failed. virtual expected ReadFile(const std::string& file_location, - std::optional length) = 0; + std::optional length) { + // The following line is to avoid Windows linker error LNK2019. + // If this function is defined as pure virtual function, the `unexpected` will + // not be instantiated and exported in libiceberg. + return unexpected{ + {.kind = ErrorKind::kNotImplemented, .message = "ReadFile not implemented"}}; + } /// \brief Write the given content to the file at the given location. /// @@ -57,7 +63,13 @@ class ICEBERG_EXPORT FileIO { /// file exists. /// \return void if the write succeeded, an error code if the write failed. virtual expected WriteFile(const std::string& file_location, - std::string_view content, bool overwrite) = 0; + std::string_view content, bool overwrite) { + // The following line is to avoid Windows linker error LNK2019. + // If this function is defined as pure virtual function, the `unexpected` will + // not be instantiated and exported in libiceberg. + return unexpected{ + {.kind = ErrorKind::kNotImplemented, .message = "WriteFile not implemented"}}; + } /// \brief Delete a file at the given location. /// diff --git a/test/local_file_io_test.cc b/test/local_file_io_test.cc index 9b869d4a..6d1057fd 100644 --- a/test/local_file_io_test.cc +++ b/test/local_file_io_test.cc @@ -35,7 +35,7 @@ class LocalFileIOTest : public testing::Test { std::filesystem::path tmpfile = std::filesystem::temp_directory_path() / "123.txt"; }; -TEST_F(LocalFileIOTest, readWriteFile) { +TEST_F(LocalFileIOTest, ReadWriteFile) { auto read_res = file_io_->ReadFile(tmpfile.string(), 1024); EXPECT_EQ(read_res.error().kind, iceberg::ErrorKind::kInvalidArgument); @@ -49,7 +49,7 @@ TEST_F(LocalFileIOTest, readWriteFile) { EXPECT_EQ(read_res.value(), "hello world"); } -TEST_F(LocalFileIOTest, deleteFile) { +TEST_F(LocalFileIOTest, DeleteFile) { auto del_res = file_io_->DeleteFile(tmpfile.string()); EXPECT_TRUE(del_res.has_value()); From fe5e23aa8a5b906b25f0cbb05b99e8cd3c5179fb Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Mon, 31 Mar 2025 20:24:05 +0800 Subject: [PATCH 05/10] fix review comments Signed-off-by: Junwang Zhao --- src/iceberg/CMakeLists.txt | 2 +- .../arrow/arrow_error_transform_internal.h | 4 ++ .../{local_file_io.cc => arrow_fs_file_io.cc} | 41 +++++++++---------- .../{local_file_io.h => arrow_fs_file_io.h} | 17 ++++---- src/iceberg/error.h | 1 - src/iceberg/file_io.h | 2 +- test/CMakeLists.txt | 10 ++--- ...le_io_test.cc => arrow_fs_file_io_test.cc} | 7 ++-- 8 files changed, 41 insertions(+), 43 deletions(-) rename src/iceberg/arrow/io/{local_file_io.cc => arrow_fs_file_io.cc} (61%) rename src/iceberg/arrow/io/{local_file_io.h => arrow_fs_file_io.h} (77%) rename test/{local_file_io_test.cc => arrow_fs_file_io_test.cc} (89%) diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index f14831f1..bf264640 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -57,7 +57,7 @@ install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_export.h DESTINATION ${ICEBERG_INSTALL_INCLUDEDIR}/iceberg) if(ICEBERG_BUILD_BUNDLE) - set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc arrow/io/local_file_io.cc + set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc arrow/io/arrow_fs_file_io.cc avro/demo_avro.cc) # Libraries to link with exported libiceberg_bundle.{so,a}. diff --git a/src/iceberg/arrow/arrow_error_transform_internal.h b/src/iceberg/arrow/arrow_error_transform_internal.h index e8687376..4992b5e2 100644 --- a/src/iceberg/arrow/arrow_error_transform_internal.h +++ b/src/iceberg/arrow/arrow_error_transform_internal.h @@ -27,6 +27,8 @@ namespace iceberg::arrow::internal { +namespace { + inline ErrorKind ToErrorKind(const ::arrow::Status& status) { switch (status.code()) { case ::arrow::StatusCode::IOError: @@ -44,6 +46,8 @@ inline ErrorKind ToErrorKind(const ::arrow::Status& status) { } \ lhs = std::move(result_name).ValueOrDie(); +} // namespace + #define ICEBERG_INTERNAL_ASSIGN_OR_RETURN(lhs, rexpr) \ ICEBERG_INTERNAL_ASSIGN_OR_RETURN_IMPL( \ ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), lhs, rexpr, \ diff --git a/src/iceberg/arrow/io/local_file_io.cc b/src/iceberg/arrow/io/arrow_fs_file_io.cc similarity index 61% rename from src/iceberg/arrow/io/local_file_io.cc rename to src/iceberg/arrow/io/arrow_fs_file_io.cc index 002264fe..6f5cd5ac 100644 --- a/src/iceberg/arrow/io/local_file_io.cc +++ b/src/iceberg/arrow/io/arrow_fs_file_io.cc @@ -17,7 +17,7 @@ * under the License. */ -#include "iceberg/arrow/io/local_file_io.h" +#include "iceberg/arrow/io/arrow_fs_file_io.h" #include @@ -28,14 +28,14 @@ namespace iceberg::arrow::io { /// \brief Read the content of the file at the given location. -expected LocalFileIO::ReadFile(const std::string& file_location, - std::optional length) { +expected ArrowFileSystemFileIO::ReadFile( + const std::string& file_location, std::optional length) { // We don't support reading a file with a specific length. if (length.has_value()) { return unexpected(Error(ErrorKind::kInvalidArgument, "Length is not supported")); } std::string content; - ICEBERG_INTERNAL_ASSIGN_OR_RETURN(auto file, local_fs_->OpenInputFile(file_location)); + ICEBERG_INTERNAL_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenInputFile(file_location)); ICEBERG_INTERNAL_ASSIGN_OR_RETURN(auto file_size, file->GetSize()); content.resize(file_size); @@ -47,31 +47,28 @@ expected LocalFileIO::ReadFile(const std::string& file_locat } /// \brief Write the given content to the file at the given location. -expected LocalFileIO::WriteFile(const std::string& file_location, - std::string_view content, bool overwrite) { - if (!overwrite && FileExists(file_location)) { - return unexpected( - Error(ErrorKind::kAlreadyExists, std::format("File {} exists", file_location))); - } +expected ArrowFileSystemFileIO::WriteFile(const std::string& file_location, + std::string_view content, + bool overwrite) { + // auto file_info = arrow_fs_->GetFileInfo(file_location); + // if (file_info.status().ok() && !overwrite) { + // return unexpected( + // Error(ErrorKind::kAlreadyExists, std::format("File {} exists", + // file_location))); + // } ICEBERG_INTERNAL_ASSIGN_OR_RETURN(auto file, - local_fs_->OpenOutputStream(file_location)); + arrow_fs_->OpenOutputStream(file_location)); ICEBERG_INTERNAL_RETURN_NOT_OK(file->Write(content.data(), content.size())); + ICEBERG_INTERNAL_RETURN_NOT_OK(file->Flush()); + ICEBERG_INTERNAL_RETURN_NOT_OK(file->Close()); return {}; } /// \brief Delete a file at the given location. -expected LocalFileIO::DeleteFile(const std::string& file_location) { - if (!FileExists(file_location)) { - return unexpected(Error(ErrorKind::kNoSuchFile, - std::format("File {} does not exist", file_location))); - } - ICEBERG_INTERNAL_RETURN_NOT_OK(local_fs_->DeleteFile(file_location)); +expected ArrowFileSystemFileIO::DeleteFile( + const std::string& file_location) { + ICEBERG_INTERNAL_RETURN_NOT_OK(arrow_fs_->DeleteFile(file_location)); return {}; } -bool LocalFileIO::FileExists(const std::string& location) { - // ::arrow::fs::LocalFileSystem does not have a exists method. - return std::filesystem::exists(location); -} - } // namespace iceberg::arrow::io diff --git a/src/iceberg/arrow/io/local_file_io.h b/src/iceberg/arrow/io/arrow_fs_file_io.h similarity index 77% rename from src/iceberg/arrow/io/local_file_io.h rename to src/iceberg/arrow/io/arrow_fs_file_io.h index 0645db87..b84c412b 100644 --- a/src/iceberg/arrow/io/local_file_io.h +++ b/src/iceberg/arrow/io/arrow_fs_file_io.h @@ -21,20 +21,20 @@ #include -#include +#include #include "iceberg/file_io.h" #include "iceberg/iceberg_bundle_export.h" namespace iceberg::arrow::io { -/// \brief A concrete implementation of FileIO for file system. -class ICEBERG_BUNDLE_EXPORT LocalFileIO : public FileIO { +/// \brief A concrete implementation of FileIO for Arrow file system. +class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO { public: - explicit LocalFileIO(std::shared_ptr<::arrow::fs::LocalFileSystem>& local_fs) - : local_fs_(local_fs) {} + explicit ArrowFileSystemFileIO(std::shared_ptr<::arrow::fs::FileSystem> arrow_fs) + : arrow_fs_(std::move(arrow_fs)) {} - ~LocalFileIO() override = default; + ~ArrowFileSystemFileIO() override = default; /// \brief Read the content of the file at the given location. expected ReadFile(const std::string& file_location, @@ -48,10 +48,7 @@ class ICEBERG_BUNDLE_EXPORT LocalFileIO : public FileIO { expected DeleteFile(const std::string& file_location) override; private: - /// \brief Check if a file exists - bool FileExists(const std::string& location); - - std::shared_ptr<::arrow::fs::LocalFileSystem>& local_fs_; + std::shared_ptr<::arrow::fs::FileSystem> arrow_fs_; }; } // namespace iceberg::arrow::io diff --git a/src/iceberg/error.h b/src/iceberg/error.h index 04c8857a..dbf2ba01 100644 --- a/src/iceberg/error.h +++ b/src/iceberg/error.h @@ -30,7 +30,6 @@ namespace iceberg { enum class ErrorKind { kNoSuchNamespace, kAlreadyExists, - kNoSuchFile, kNoSuchTable, kCommitStateUnknown, kInvalidSchema, diff --git a/src/iceberg/file_io.h b/src/iceberg/file_io.h index ed9a2902..8bde2d1b 100644 --- a/src/iceberg/file_io.h +++ b/src/iceberg/file_io.h @@ -29,7 +29,7 @@ namespace iceberg { -/// \brief Pluggable module for reading, writing, and deleting metadata files. +/// \brief Pluggable module for reading, writing, and deleting files. /// /// This module only handle metadata files, not data files. The metadata files /// are typically small and are used to store schema, partition information, diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index a6d8964d..f42d773e 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -45,9 +45,9 @@ if(ICEBERG_BUILD_BUNDLE) target_link_libraries(arrow_test PRIVATE iceberg_bundle_static GTest::gtest_main) add_test(NAME arrow_test COMMAND arrow_test) - add_executable(local_file_io_test) - target_sources(local_file_io_test PRIVATE local_file_io_test.cc) - target_link_libraries(local_file_io_test PRIVATE iceberg_bundle_static - GTest::gtest_main) - add_test(NAME local_file_io_test COMMAND local_file_io_test) + add_executable(arrow_fs_file_io_test) + target_sources(arrow_fs_file_io_test PRIVATE arrow_fs_file_io_test.cc) + target_link_libraries(arrow_fs_file_io_test PRIVATE iceberg_bundle_static + GTest::gtest_main) + add_test(NAME arrow_fs_file_io_test COMMAND arrow_fs_file_io_test) endif() diff --git a/test/local_file_io_test.cc b/test/arrow_fs_file_io_test.cc similarity index 89% rename from test/local_file_io_test.cc rename to test/arrow_fs_file_io_test.cc index 6d1057fd..0e4f85d2 100644 --- a/test/local_file_io_test.cc +++ b/test/arrow_fs_file_io_test.cc @@ -17,17 +17,18 @@ * under the License. */ -#include "iceberg/arrow/io/local_file_io.h" +#include "iceberg/arrow/io/arrow_fs_file_io.h" #include +#include #include class LocalFileIOTest : public testing::Test { protected: void SetUp() override { local_fs_ = std::make_shared(); - file_io_ = std::make_shared(local_fs_); + file_io_ = std::make_shared(local_fs_); } std::shared_ptr local_fs_; @@ -54,5 +55,5 @@ TEST_F(LocalFileIOTest, DeleteFile) { EXPECT_TRUE(del_res.has_value()); del_res = file_io_->DeleteFile(tmpfile.string()); - EXPECT_EQ(del_res.error().kind, iceberg::ErrorKind::kNoSuchFile); + EXPECT_EQ(del_res.error().kind, iceberg::ErrorKind::kIOError); } From b6a8047a62a8fbeda5052f3863323e17a1053082 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Mon, 31 Mar 2025 20:51:42 +0800 Subject: [PATCH 06/10] cpp-linter complains about unnamed namespaces in header files Signed-off-by: Junwang Zhao --- src/iceberg/arrow/arrow_error_transform_internal.h | 4 ---- src/iceberg/arrow/io/arrow_fs_file_io.cc | 10 ++++------ test/CMakeLists.txt | 3 ++- 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/src/iceberg/arrow/arrow_error_transform_internal.h b/src/iceberg/arrow/arrow_error_transform_internal.h index 4992b5e2..e8687376 100644 --- a/src/iceberg/arrow/arrow_error_transform_internal.h +++ b/src/iceberg/arrow/arrow_error_transform_internal.h @@ -27,8 +27,6 @@ namespace iceberg::arrow::internal { -namespace { - inline ErrorKind ToErrorKind(const ::arrow::Status& status) { switch (status.code()) { case ::arrow::StatusCode::IOError: @@ -46,8 +44,6 @@ inline ErrorKind ToErrorKind(const ::arrow::Status& status) { } \ lhs = std::move(result_name).ValueOrDie(); -} // namespace - #define ICEBERG_INTERNAL_ASSIGN_OR_RETURN(lhs, rexpr) \ ICEBERG_INTERNAL_ASSIGN_OR_RETURN_IMPL( \ ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), lhs, rexpr, \ diff --git a/src/iceberg/arrow/io/arrow_fs_file_io.cc b/src/iceberg/arrow/io/arrow_fs_file_io.cc index 6f5cd5ac..2e4a646a 100644 --- a/src/iceberg/arrow/io/arrow_fs_file_io.cc +++ b/src/iceberg/arrow/io/arrow_fs_file_io.cc @@ -50,12 +50,10 @@ expected ArrowFileSystemFileIO::ReadFile( expected ArrowFileSystemFileIO::WriteFile(const std::string& file_location, std::string_view content, bool overwrite) { - // auto file_info = arrow_fs_->GetFileInfo(file_location); - // if (file_info.status().ok() && !overwrite) { - // return unexpected( - // Error(ErrorKind::kAlreadyExists, std::format("File {} exists", - // file_location))); - // } + auto exists = arrow_fs_->OpenInputFile(file_location).ok(); + if (!overwrite && exists) { + return unexpected(Error(ErrorKind::kAlreadyExists, "File already exists")); + } ICEBERG_INTERNAL_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenOutputStream(file_location)); ICEBERG_INTERNAL_RETURN_NOT_OK(file->Write(content.data(), content.size())); diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index f42d773e..172ca3ba 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -42,7 +42,8 @@ if(ICEBERG_BUILD_BUNDLE) add_executable(arrow_test) target_sources(arrow_test PRIVATE arrow_test.cc) - target_link_libraries(arrow_test PRIVATE iceberg_bundle_static GTest::gtest_main) + target_link_libraries(arrow_test PRIVATE iceberg_bundle_static GTest::gtest_main + GTest::gmock) add_test(NAME arrow_test COMMAND arrow_test) add_executable(arrow_fs_file_io_test) From b590aac912d595993b7752ad8c409b55ea1c0b39 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Tue, 1 Apr 2025 11:10:37 +0800 Subject: [PATCH 07/10] adapt to custom matchers for expected Signed-off-by: Junwang Zhao --- src/iceberg/file_io.h | 9 +++++---- test/CMakeLists.txt | 2 +- test/arrow_fs_file_io_test.cc | 24 +++++++++++++++++------- 3 files changed, 23 insertions(+), 12 deletions(-) diff --git a/src/iceberg/file_io.h b/src/iceberg/file_io.h index 8bde2d1b..2fdd8781 100644 --- a/src/iceberg/file_io.h +++ b/src/iceberg/file_io.h @@ -49,8 +49,9 @@ class ICEBERG_EXPORT FileIO { virtual expected ReadFile(const std::string& file_location, std::optional length) { // The following line is to avoid Windows linker error LNK2019. - // If this function is defined as pure virtual function, the `unexpected` will - // not be instantiated and exported in libiceberg. + // If this function is defined as pure virtual function, the `expected` and `unexpected` will not be instantiated and exported in + // libiceberg. return unexpected{ {.kind = ErrorKind::kNotImplemented, .message = "ReadFile not implemented"}}; } @@ -65,8 +66,8 @@ class ICEBERG_EXPORT FileIO { virtual expected WriteFile(const std::string& file_location, std::string_view content, bool overwrite) { // The following line is to avoid Windows linker error LNK2019. - // If this function is defined as pure virtual function, the `unexpected` will - // not be instantiated and exported in libiceberg. + // If this function is defined as pure virtual function, the `expected` + // will not be instantiated and exported in libiceberg. return unexpected{ {.kind = ErrorKind::kNotImplemented, .message = "WriteFile not implemented"}}; } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 172ca3ba..73874c9d 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -49,6 +49,6 @@ if(ICEBERG_BUILD_BUNDLE) add_executable(arrow_fs_file_io_test) target_sources(arrow_fs_file_io_test PRIVATE arrow_fs_file_io_test.cc) target_link_libraries(arrow_fs_file_io_test PRIVATE iceberg_bundle_static - GTest::gtest_main) + GTest::gtest_main GTest::gmock) add_test(NAME arrow_fs_file_io_test COMMAND arrow_fs_file_io_test) endif() diff --git a/test/arrow_fs_file_io_test.cc b/test/arrow_fs_file_io_test.cc index 0e4f85d2..2c17afdd 100644 --- a/test/arrow_fs_file_io_test.cc +++ b/test/arrow_fs_file_io_test.cc @@ -24,36 +24,46 @@ #include #include +#include "matchers.h" + +namespace iceberg { + class LocalFileIOTest : public testing::Test { protected: void SetUp() override { - local_fs_ = std::make_shared(); + local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>(); file_io_ = std::make_shared(local_fs_); } - std::shared_ptr local_fs_; + std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_; std::shared_ptr file_io_; std::filesystem::path tmpfile = std::filesystem::temp_directory_path() / "123.txt"; }; TEST_F(LocalFileIOTest, ReadWriteFile) { auto read_res = file_io_->ReadFile(tmpfile.string(), 1024); - EXPECT_EQ(read_res.error().kind, iceberg::ErrorKind::kInvalidArgument); + EXPECT_THAT(read_res, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(read_res, HasErrorMessage("Length is not supported")); read_res = file_io_->ReadFile(tmpfile.string(), std::nullopt); - EXPECT_EQ(read_res.error().kind, iceberg::ErrorKind::kIOError); + EXPECT_THAT(read_res, IsError(ErrorKind::kIOError)); + EXPECT_THAT(read_res, HasErrorMessage("No such file or directory")); auto write_res = file_io_->WriteFile(tmpfile.string(), "hello world", false); - EXPECT_TRUE(write_res.has_value()); + EXPECT_THAT(write_res, IsOk()); read_res = file_io_->ReadFile(tmpfile.string(), std::nullopt); + EXPECT_THAT(read_res, IsOk()); EXPECT_EQ(read_res.value(), "hello world"); } TEST_F(LocalFileIOTest, DeleteFile) { auto del_res = file_io_->DeleteFile(tmpfile.string()); - EXPECT_TRUE(del_res.has_value()); + EXPECT_THAT(del_res, IsOk()); del_res = file_io_->DeleteFile(tmpfile.string()); - EXPECT_EQ(del_res.error().kind, iceberg::ErrorKind::kIOError); + EXPECT_THAT(del_res, IsError(ErrorKind::kIOError)); + EXPECT_THAT(del_res, HasErrorMessage("No such file or directory")); } + +} // namespace iceberg From 11e1da0c2d5ef48c908747acb87abd789984add6 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Tue, 1 Apr 2025 12:31:36 +0800 Subject: [PATCH 08/10] fix more review comments Signed-off-by: Junwang Zhao --- src/iceberg/CMakeLists.txt | 2 +- src/iceberg/arrow/CMakeLists.txt | 2 - .../arrow/arrow_error_transform_internal.h | 18 +++---- .../arrow/{io => }/arrow_fs_file_io.cc | 47 +++++++++---------- src/iceberg/arrow/{io => }/arrow_fs_file_io.h | 6 +-- src/iceberg/arrow/io/CMakeLists.txt | 18 ------- src/iceberg/file_io.h | 19 ++++---- test/CMakeLists.txt | 8 +--- test/arrow_fs_file_io_test.cc | 14 ++---- 9 files changed, 52 insertions(+), 82 deletions(-) rename src/iceberg/arrow/{io => }/arrow_fs_file_io.cc (57%) rename src/iceberg/arrow/{io => }/arrow_fs_file_io.h (92%) delete mode 100644 src/iceberg/arrow/io/CMakeLists.txt diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index bf264640..959e15db 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -57,7 +57,7 @@ install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_export.h DESTINATION ${ICEBERG_INSTALL_INCLUDEDIR}/iceberg) if(ICEBERG_BUILD_BUNDLE) - set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc arrow/io/arrow_fs_file_io.cc + set(ICEBERG_BUNDLE_SOURCES arrow/demo_arrow.cc arrow/arrow_fs_file_io.cc avro/demo_avro.cc) # Libraries to link with exported libiceberg_bundle.{so,a}. diff --git a/src/iceberg/arrow/CMakeLists.txt b/src/iceberg/arrow/CMakeLists.txt index 3f3746f2..3416d5e9 100644 --- a/src/iceberg/arrow/CMakeLists.txt +++ b/src/iceberg/arrow/CMakeLists.txt @@ -15,6 +15,4 @@ # specific language governing permissions and limitations # under the License. -add_subdirectory(io) - iceberg_install_all_headers(iceberg/arrow) diff --git a/src/iceberg/arrow/arrow_error_transform_internal.h b/src/iceberg/arrow/arrow_error_transform_internal.h index e8687376..c5fb9094 100644 --- a/src/iceberg/arrow/arrow_error_transform_internal.h +++ b/src/iceberg/arrow/arrow_error_transform_internal.h @@ -36,20 +36,20 @@ inline ErrorKind ToErrorKind(const ::arrow::Status& status) { } } -#define ICEBERG_INTERNAL_ASSIGN_OR_RETURN_IMPL(result_name, lhs, rexpr, error_transform) \ - auto&& result_name = (rexpr); \ - if (!result_name.ok()) { \ - return unexpected{{.kind = error_transform(result_name.status()), \ - .message = result_name.status().ToString()}}; \ - } \ +#define ICEBERG_ARROW_ASSIGN_OR_RETURN_IMPL(result_name, lhs, rexpr, error_transform) \ + auto&& result_name = (rexpr); \ + if (!result_name.ok()) { \ + return unexpected{{.kind = error_transform(result_name.status()), \ + .message = result_name.status().ToString()}}; \ + } \ lhs = std::move(result_name).ValueOrDie(); -#define ICEBERG_INTERNAL_ASSIGN_OR_RETURN(lhs, rexpr) \ - ICEBERG_INTERNAL_ASSIGN_OR_RETURN_IMPL( \ +#define ICEBERG_ARROW_ASSIGN_OR_RETURN(lhs, rexpr) \ + ICEBERG_ARROW_ASSIGN_OR_RETURN_IMPL( \ ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), lhs, rexpr, \ internal::ToErrorKind) -#define ICEBERG_INTERNAL_RETURN_NOT_OK(expr) \ +#define ICEBERG_ARROW_RETURN_NOT_OK(expr) \ do { \ auto&& _status = (expr); \ if (!_status.ok()) { \ diff --git a/src/iceberg/arrow/io/arrow_fs_file_io.cc b/src/iceberg/arrow/arrow_fs_file_io.cc similarity index 57% rename from src/iceberg/arrow/io/arrow_fs_file_io.cc rename to src/iceberg/arrow/arrow_fs_file_io.cc index 2e4a646a..270ecb78 100644 --- a/src/iceberg/arrow/io/arrow_fs_file_io.cc +++ b/src/iceberg/arrow/arrow_fs_file_io.cc @@ -17,56 +17,55 @@ * under the License. */ -#include "iceberg/arrow/io/arrow_fs_file_io.h" - -#include +#include "iceberg/arrow/arrow_fs_file_io.h" #include #include "iceberg/arrow/arrow_error_transform_internal.h" -namespace iceberg::arrow::io { +namespace iceberg::arrow { /// \brief Read the content of the file at the given location. expected ArrowFileSystemFileIO::ReadFile( const std::string& file_location, std::optional length) { - // We don't support reading a file with a specific length. + ::arrow::fs::FileInfo file_info(file_location); if (length.has_value()) { - return unexpected(Error(ErrorKind::kInvalidArgument, "Length is not supported")); + file_info.set_size(length.value()); } std::string content; - ICEBERG_INTERNAL_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenInputFile(file_location)); - ICEBERG_INTERNAL_ASSIGN_OR_RETURN(auto file_size, file->GetSize()); + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenInputFile(file_info)); + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file_size, file->GetSize()); content.resize(file_size); - ICEBERG_INTERNAL_ASSIGN_OR_RETURN( - auto read_length, - file->ReadAt(0, file_size, reinterpret_cast(&content[0]))); + size_t remain = file_size; + size_t offset = 0; + while (remain > 0) { + size_t read_length = std::min(remain, static_cast(1024 * 1024)); + ICEBERG_ARROW_ASSIGN_OR_RETURN( + auto read_bytes, + file->Read(read_length, reinterpret_cast(&content[offset]))); + remain -= read_bytes; + offset += read_bytes; + } return content; } /// \brief Write the given content to the file at the given location. expected ArrowFileSystemFileIO::WriteFile(const std::string& file_location, - std::string_view content, - bool overwrite) { - auto exists = arrow_fs_->OpenInputFile(file_location).ok(); - if (!overwrite && exists) { - return unexpected(Error(ErrorKind::kAlreadyExists, "File already exists")); - } - ICEBERG_INTERNAL_ASSIGN_OR_RETURN(auto file, - arrow_fs_->OpenOutputStream(file_location)); - ICEBERG_INTERNAL_RETURN_NOT_OK(file->Write(content.data(), content.size())); - ICEBERG_INTERNAL_RETURN_NOT_OK(file->Flush()); - ICEBERG_INTERNAL_RETURN_NOT_OK(file->Close()); + std::string_view content) { + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenOutputStream(file_location)); + ICEBERG_ARROW_RETURN_NOT_OK(file->Write(content.data(), content.size())); + ICEBERG_ARROW_RETURN_NOT_OK(file->Flush()); + ICEBERG_ARROW_RETURN_NOT_OK(file->Close()); return {}; } /// \brief Delete a file at the given location. expected ArrowFileSystemFileIO::DeleteFile( const std::string& file_location) { - ICEBERG_INTERNAL_RETURN_NOT_OK(arrow_fs_->DeleteFile(file_location)); + ICEBERG_ARROW_RETURN_NOT_OK(arrow_fs_->DeleteFile(file_location)); return {}; } -} // namespace iceberg::arrow::io +} // namespace iceberg::arrow diff --git a/src/iceberg/arrow/io/arrow_fs_file_io.h b/src/iceberg/arrow/arrow_fs_file_io.h similarity index 92% rename from src/iceberg/arrow/io/arrow_fs_file_io.h rename to src/iceberg/arrow/arrow_fs_file_io.h index b84c412b..e79e75fe 100644 --- a/src/iceberg/arrow/io/arrow_fs_file_io.h +++ b/src/iceberg/arrow/arrow_fs_file_io.h @@ -26,7 +26,7 @@ #include "iceberg/file_io.h" #include "iceberg/iceberg_bundle_export.h" -namespace iceberg::arrow::io { +namespace iceberg::arrow { /// \brief A concrete implementation of FileIO for Arrow file system. class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO { @@ -42,7 +42,7 @@ class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO { /// \brief Write the given content to the file at the given location. expected WriteFile(const std::string& file_location, - std::string_view content, bool overwrite) override; + std::string_view content) override; /// \brief Delete a file at the given location. expected DeleteFile(const std::string& file_location) override; @@ -51,4 +51,4 @@ class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO { std::shared_ptr<::arrow::fs::FileSystem> arrow_fs_; }; -} // namespace iceberg::arrow::io +} // namespace iceberg::arrow diff --git a/src/iceberg/arrow/io/CMakeLists.txt b/src/iceberg/arrow/io/CMakeLists.txt deleted file mode 100644 index b0c0059e..00000000 --- a/src/iceberg/arrow/io/CMakeLists.txt +++ /dev/null @@ -1,18 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -iceberg_install_all_headers(iceberg/arrow/io) diff --git a/src/iceberg/file_io.h b/src/iceberg/file_io.h index 2fdd8781..03922f65 100644 --- a/src/iceberg/file_io.h +++ b/src/iceberg/file_io.h @@ -34,6 +34,10 @@ namespace iceberg { /// This module only handle metadata files, not data files. The metadata files /// are typically small and are used to store schema, partition information, /// and other metadata about the table. +/// +/// Note that these functions are not atomic. For example, if a write fails, +/// the file may be partially written. Implementations should be careful to +/// avoid corrupting metadata files. class ICEBERG_EXPORT FileIO { public: FileIO() = default; @@ -48,10 +52,7 @@ class ICEBERG_EXPORT FileIO { /// failed. virtual expected ReadFile(const std::string& file_location, std::optional length) { - // The following line is to avoid Windows linker error LNK2019. - // If this function is defined as pure virtual function, the `expected` and `unexpected` will not be instantiated and exported in - // libiceberg. + // We provide a default implementation to avoid Windows linker error LNK2019. return unexpected{ {.kind = ErrorKind::kNotImplemented, .message = "ReadFile not implemented"}}; } @@ -64,10 +65,7 @@ class ICEBERG_EXPORT FileIO { /// file exists. /// \return void if the write succeeded, an error code if the write failed. virtual expected WriteFile(const std::string& file_location, - std::string_view content, bool overwrite) { - // The following line is to avoid Windows linker error LNK2019. - // If this function is defined as pure virtual function, the `expected` - // will not be instantiated and exported in libiceberg. + std::string_view content) { return unexpected{ {.kind = ErrorKind::kNotImplemented, .message = "WriteFile not implemented"}}; } @@ -76,7 +74,10 @@ class ICEBERG_EXPORT FileIO { /// /// \param file_location The location of the file to delete. /// \return void if the delete succeeded, an error code if the delete failed. - virtual expected DeleteFile(const std::string& file_location) = 0; + virtual expected DeleteFile(const std::string& file_location) { + return unexpected{ + {.kind = ErrorKind::kNotImplemented, .message = "DeleteFile not implemented"}}; + } }; } // namespace iceberg diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 73874c9d..aa8805fe 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -41,14 +41,8 @@ if(ICEBERG_BUILD_BUNDLE) add_test(NAME avro_test COMMAND avro_test) add_executable(arrow_test) - target_sources(arrow_test PRIVATE arrow_test.cc) + target_sources(arrow_test PRIVATE arrow_test.cc arrow_fs_file_io_test.cc) target_link_libraries(arrow_test PRIVATE iceberg_bundle_static GTest::gtest_main GTest::gmock) add_test(NAME arrow_test COMMAND arrow_test) - - add_executable(arrow_fs_file_io_test) - target_sources(arrow_fs_file_io_test PRIVATE arrow_fs_file_io_test.cc) - target_link_libraries(arrow_fs_file_io_test PRIVATE iceberg_bundle_static - GTest::gtest_main GTest::gmock) - add_test(NAME arrow_fs_file_io_test COMMAND arrow_fs_file_io_test) endif() diff --git a/test/arrow_fs_file_io_test.cc b/test/arrow_fs_file_io_test.cc index 2c17afdd..a87bc649 100644 --- a/test/arrow_fs_file_io_test.cc +++ b/test/arrow_fs_file_io_test.cc @@ -17,7 +17,7 @@ * under the License. */ -#include "iceberg/arrow/io/arrow_fs_file_io.h" +#include "iceberg/arrow/arrow_fs_file_io.h" #include @@ -32,7 +32,7 @@ class LocalFileIOTest : public testing::Test { protected: void SetUp() override { local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>(); - file_io_ = std::make_shared(local_fs_); + file_io_ = std::make_shared(local_fs_); } std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_; @@ -41,20 +41,16 @@ class LocalFileIOTest : public testing::Test { }; TEST_F(LocalFileIOTest, ReadWriteFile) { - auto read_res = file_io_->ReadFile(tmpfile.string(), 1024); - EXPECT_THAT(read_res, IsError(ErrorKind::kInvalidArgument)); - EXPECT_THAT(read_res, HasErrorMessage("Length is not supported")); - - read_res = file_io_->ReadFile(tmpfile.string(), std::nullopt); + auto read_res = file_io_->ReadFile(tmpfile.string(), std::nullopt); EXPECT_THAT(read_res, IsError(ErrorKind::kIOError)); EXPECT_THAT(read_res, HasErrorMessage("No such file or directory")); - auto write_res = file_io_->WriteFile(tmpfile.string(), "hello world", false); + auto write_res = file_io_->WriteFile(tmpfile.string(), "hello world"); EXPECT_THAT(write_res, IsOk()); read_res = file_io_->ReadFile(tmpfile.string(), std::nullopt); EXPECT_THAT(read_res, IsOk()); - EXPECT_EQ(read_res.value(), "hello world"); + EXPECT_THAT(read_res, HasValue(::testing::Eq("hello world"))); } TEST_F(LocalFileIOTest, DeleteFile) { From 496bdadd478a530813016e9099b53d4ea7eb5883 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Tue, 1 Apr 2025 14:30:28 +0800 Subject: [PATCH 09/10] fix windows ci Signed-off-by: Junwang Zhao --- test/arrow_fs_file_io_test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/arrow_fs_file_io_test.cc b/test/arrow_fs_file_io_test.cc index a87bc649..193c13df 100644 --- a/test/arrow_fs_file_io_test.cc +++ b/test/arrow_fs_file_io_test.cc @@ -43,7 +43,7 @@ class LocalFileIOTest : public testing::Test { TEST_F(LocalFileIOTest, ReadWriteFile) { auto read_res = file_io_->ReadFile(tmpfile.string(), std::nullopt); EXPECT_THAT(read_res, IsError(ErrorKind::kIOError)); - EXPECT_THAT(read_res, HasErrorMessage("No such file or directory")); + EXPECT_THAT(read_res, HasErrorMessage("Failed to open local file")); auto write_res = file_io_->WriteFile(tmpfile.string(), "hello world"); EXPECT_THAT(write_res, IsOk()); @@ -59,7 +59,7 @@ TEST_F(LocalFileIOTest, DeleteFile) { del_res = file_io_->DeleteFile(tmpfile.string()); EXPECT_THAT(del_res, IsError(ErrorKind::kIOError)); - EXPECT_THAT(del_res, HasErrorMessage("No such file or directory")); + EXPECT_THAT(del_res, HasErrorMessage("Cannot delete file")); } } // namespace iceberg From b8cf9a923e8a1bb84e3a75b54850350b272cbd2c Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Tue, 1 Apr 2025 16:15:19 +0800 Subject: [PATCH 10/10] remove useless internal namespace Signed-off-by: Junwang Zhao --- .../arrow/arrow_error_transform_internal.h | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/src/iceberg/arrow/arrow_error_transform_internal.h b/src/iceberg/arrow/arrow_error_transform_internal.h index c5fb9094..588bab54 100644 --- a/src/iceberg/arrow/arrow_error_transform_internal.h +++ b/src/iceberg/arrow/arrow_error_transform_internal.h @@ -25,7 +25,7 @@ #include "iceberg/error.h" #include "iceberg/expected.h" -namespace iceberg::arrow::internal { +namespace iceberg::arrow { inline ErrorKind ToErrorKind(const ::arrow::Status& status) { switch (status.code()) { @@ -44,18 +44,17 @@ inline ErrorKind ToErrorKind(const ::arrow::Status& status) { } \ lhs = std::move(result_name).ValueOrDie(); -#define ICEBERG_ARROW_ASSIGN_OR_RETURN(lhs, rexpr) \ - ICEBERG_ARROW_ASSIGN_OR_RETURN_IMPL( \ - ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), lhs, rexpr, \ - internal::ToErrorKind) - -#define ICEBERG_ARROW_RETURN_NOT_OK(expr) \ - do { \ - auto&& _status = (expr); \ - if (!_status.ok()) { \ - return unexpected{ \ - {.kind = internal::ToErrorKind(_status), .message = _status.ToString()}}; \ - } \ +#define ICEBERG_ARROW_ASSIGN_OR_RETURN(lhs, rexpr) \ + ICEBERG_ARROW_ASSIGN_OR_RETURN_IMPL( \ + ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), lhs, rexpr, ToErrorKind) + +#define ICEBERG_ARROW_RETURN_NOT_OK(expr) \ + do { \ + auto&& _status = (expr); \ + if (!_status.ok()) { \ + return unexpected{ \ + {.kind = ToErrorKind(_status), .message = _status.ToString()}}; \ + } \ } while (0) -} // namespace iceberg::arrow::internal +} // namespace iceberg::arrow