
Partition writes not creating expected directory hierarchy on S3 (MinIO) #891

Open

hackintoshrao opened this issue Jan 13, 2025 · 6 comments · May be fixed by #893
Comments

@hackintoshrao
Hey folks,

I created this PR to validate the issue: #890. Please let me know whether the assumptions in the integration test are correct.

Summary

I created this integration test to confirm that partitioned data is written into a partition-specific directory (e.g., id=100/). However, after committing the partitioned DataFile to the table, no objects appear under the expected id=100/ prefix in MinIO. Instead, the data file is written directly under another path (e.g., data/test-00000.parquet) without partition directories.

Here is the error from the test:

No objects found under prefix demo/iceberg/rust/t1/data/id=100/ - partition layout may not be correct.

Based on the table metadata, it looks like the data file is located at s3://icebergdata/demo/iceberg/rust/t1/data/test-00000.parquet, but no partition subdirectory (id=100/) is used.
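
To make the mismatch concrete, here is the expected versus observed layout (the exact file name under the partition prefix is illustrative):

Expected: s3://icebergdata/demo/iceberg/rust/t1/data/id=100/test-00000.parquet
Observed: s3://icebergdata/demo/iceberg/rust/t1/data/test-00000.parquet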


Steps to Reproduce

  • Pull the PR with the integration test
  • Run the integration test: RUST_BACKTRACE=1 cargo test --test append_partition_data_file_test -- --nocapture (a manual check of the bucket layout is sketched after these steps)
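
If you have an mc alias configured for the MinIO instance (assumed here to be named minio, with the same bucket and warehouse path as above), you can also inspect the expected prefix directly:

mc ls minio/icebergdata/demo/iceberg/rust/t1/data/id=100/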

Once the issue is confirmed, I'll work on a quick fix.

ZENOTME (Contributor) commented Jan 14, 2025

Thanks @hackintoshrao for pointing this out. I don't think a partition-specific directory is strictly required for partitioned writes 🤔, but it does remind me to think about how we could support partition-specific directories. The current design makes this hard: the new file is created by the parquet writer builder, but in a case like the following, the builder has no way to know which partition-specific directory the file should be created under. Only the data file writer knows which partition each file corresponds to.

let parquet_writer_builder = ParquetWriterBuilder::new(
        WriterProperties::default(),
        table.metadata().current_schema().clone(),
        table.file_io().clone(),
        location_generator.clone(),
        file_name_generator.clone(),
    );

According to https://github.com/apache/iceberg/blob/3247964c88dbc95e1bbf4da8c39f9a5d9cf950fa/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java#L41, maybe we should redesign the FileWriterBuilder interface. I will try to send a draft PR for this.
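
A rough sketch of what such a redesign could look like, loosely mirroring the Java FileAppenderFactory linked above (names and signatures here are hypothetical, not the actual iceberg-rust API):

// Hypothetical: a builder that receives the partition value, so it can place
// the file under a partition-specific directory such as data/id=100/.
pub trait PartitionAwareWriterBuilder {
    type Writer;
    type Error;
    /// `partition_path` is a hive-style segment, e.g. "id=100".
    fn build_for_partition(&self, partition_path: &str) -> Result<Self::Writer, Self::Error>;
}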

@hackintoshrao (Author)

Thank you, @ZENOTME, for the response.

I work at a startup building a distributed query engine for large-scale Iceberg tables (>1PB). The partition-specific hierarchical directory is crucial from a query engine's perspective!

I'm building a high-performance stream writer with Iceberg-Rust as the core library. I'll soon need to justify the ROI of picking Iceberg-Rust over the Java library, and this feature is a deal breaker. I'm happy to contribute if you can point me in the right direction.

ZENOTME (Contributor) commented Jan 15, 2025

> Thank you, @ZENOTME, for the response.
>
> I work at a startup building a distributed query engine for large-scale Iceberg tables (>1PB). The partition-specific hierarchical directory is crucial from a query engine's perspective!
>
> I'm building a high-performance stream writer with Iceberg-Rust as the core library. I'll soon need to justify the ROI of picking Iceberg-Rust over the Java library, and this feature is a deal breaker. I'm happy to contribute if you can point me in the right direction.

Hi, I've recently been thinking about a design to support this, and I will send it as a draft PR as a starting point so we can discuss it quickly. However, it involves interface design, so I'm not sure it will reach consensus and be merged upstream soon. In the meantime, the parquet writer already supports a custom LocationGenerator here: https://github.com/ZENOTME/iceberg-rust/blob/cde35ab0eefffae88c521d4e897ba86ee754861c/crates/iceberg/src/writer/file_writer/parquet_writer.rs#L66, so you could try writing a custom LocationGenerator that produces paths under a partition subdirectory, e.g.

struct PartitionLocationGenerator {
    table_location: String,
    partition_value: String,
}

impl PartitionLocationGenerator {
    pub fn new(table_location: String, partition_value: String) -> Self {
        Self { table_location, partition_value }
    }

    pub fn generate(&self) -> String {
        format!("{}/{}/", self.table_location, self.partition_value)
    }
}

However, this requires creating a new parquet writer builder with a new LocationGenerator for every new partition value, and that's why we may need some redesign here.
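
For illustration, a sketch of that per-partition workaround, reusing the names from the snippets above (table, file_name_generator, and the exact ParquetWriterBuilder::new signature are taken from this thread, so treat this as pseudocode rather than a compilable example):

// One builder per partition value, each with its own location generator.
for id in [100, 200] {
    let location_generator = PartitionLocationGenerator::new(
        table_location.clone(),   // e.g. "s3://icebergdata/demo/iceberg/rust/t1/data"
        format!("id={id}"),       // hive-style partition segment
    );
    let parquet_writer_builder = ParquetWriterBuilder::new(
        WriterProperties::default(),
        table.metadata().current_schema().clone(),
        table.file_io().clone(),
        location_generator,
        file_name_generator.clone(),
    );
    // ... build a data file writer for this partition from the builder ...
}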

ZENOTME (Contributor) commented Jan 15, 2025

> I work at a startup building a distributed query engine for large-scale Iceberg tables (>1PB). The partition-specific hierarchical directory is crucial from a query engine's perspective!

Personally, I'm confused about why a partition-specific hierarchical directory is crucial for Iceberg 🤔. Iceberg stores the data file paths in its metadata files, so where does it need the partition-specific hierarchical directory?

@hackintoshrao (Author)

@ZENOTME: It boils down to getting the best out of the object storage system underneath (PS: I was an early engineer at MinIO).

S3/MinIO distributes objects across a large fleet of storage nodes, but storing many objects under the same prefix can cause a single storage partition to become overloaded. Having each partition value create a distinct prefix (for example, table/data/id=100/) spreads PUT and GET traffic more evenly, leading to better parallelization and fewer “Slow Down” (503) errors if your workload suddenly ramps up in request rate.

This helps squeeze the best performance out of the system, especially during highly concurrent reads/writes: while ingesting data, while the query engine is scanning a table, or even when running concurrent compactions of multiple partitions.
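
Concretely, with hive-style partition directories each partition gets its own key prefix (illustrative paths):

s3://icebergdata/demo/iceberg/rust/t1/data/id=100/part-00000.parquet
s3://icebergdata/demo/iceberg/rust/t1/data/id=200/part-00000.parquet

whereas today every file lands under the single data/ prefix, concentrating all request traffic on one prefix.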

If merging upstream takes time, I can still test it internally. Based on your suggestions above, I'll look into tweaking the code, and I'll also wait for your draft PR.

ZENOTME (Contributor) commented Jan 16, 2025

> @ZENOTME: It boils down to getting the best out of the object storage system underneath (PS: I was an early engineer at MinIO).
>
> S3/MinIO distributes objects across a large fleet of storage nodes, but storing many objects under the same prefix can cause a single storage partition to become overloaded. Having each partition value create a distinct prefix (for example, table/data/id=100/) spreads PUT and GET traffic more evenly, leading to better parallelization and fewer “Slow Down” (503) errors if your workload suddenly ramps up in request rate.
>
> This helps squeeze the best performance out of the system, especially during highly concurrent reads/writes: while ingesting data, while the query engine is scanning a table, or even when running concurrent compactions of multiple partitions.
>
> If merging upstream takes time, I can still test it internally. Based on your suggestions above, I'll look into tweaking the code, and I'll also wait for your draft PR.

Thanks @hackintoshrao! I learned a lot from this reply. I have drafted PR #893 to refine the writer interface to support this. Feel free to leave any suggestions. Also cc @liurenjie1024 @Xuanwo @Fokko @sdd
