Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support arrow_struct_to_iceberg_struct #731

Merged
merged 11 commits into from
Feb 27, 2025

Conversation

ZENOTME
Copy link
Contributor

@ZENOTME ZENOTME commented Nov 28, 2024

This PR introduces the function to convert arrow struct to iceberg struct. This function is needed when we add fanout partition writer: In this writer, we need to compute the partition value using record batch and convert them into struct value finally and set into data file.

@ZENOTME ZENOTME force-pushed the arrow_value_convert branch 2 times, most recently from 1ffc802 to 0830aae Compare November 28, 2024 12:00
@ZENOTME
Copy link
Contributor Author

ZENOTME commented Nov 28, 2024

cc @liurenjie1024 @Fokko @Xuanwo

@liurenjie1024
Copy link
Contributor

Hi, @ZENOTME Thanks for this pr! I'm thinking that instead of array transformation, should we consider transforming arrow record batch to/from array of iceberg datum? It maybe also worthy to have a visitor pattern.

@ZENOTME ZENOTME force-pushed the arrow_value_convert branch from 0830aae to cc09bff Compare December 4, 2024 07:40
@ZENOTME
Copy link
Contributor Author

ZENOTME commented Dec 4, 2024

Hi, @ZENOTME Thanks for this pr! I'm thinking that instead of array transformation, should we consider transforming arrow record batch to/from array of iceberg datum?

For now, this function is mainly used in partition writer. And we store partition value as Struct so we need to transform it to literal now.

It maybe also worthy to have a visitor pattern.

Good point. The benefit of visitor patterns is to make it more convenient to convert between different types, e.g. datum or literal. I try to use visitor patterns for this PR so that we can add more type conversions in the future. But I'm not sure whether it's a good design. Feel free to let me know if there are some other API designs.

Copy link
Contributor

@liurenjie1024 liurenjie1024 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ZENOTME for this pr, I have some suggestions about it.

&self,
array: &NullArray,
arrow_type: &DataType,
iceberg_type: &PrimitiveType,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm hesitating to add an iceberg_type here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also don't think we need an arrow_type, arrow array has data type associated with it: https://docs.rs/arrow/latest/arrow/array/trait.Array.html#tymethod.data_type

use crate::spec::{Literal, PrimitiveType, Struct, StructType, Type};
use crate::{Error, ErrorKind, Result};

trait ArrowArrayVistor {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need have clear documentation about what kind of visitor it is. Is it an post order visitor?

Copy link
Contributor

@Fokko Fokko Dec 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Iceberg, everything is post-order, except assigning field-IDs. I think in this case post-order makes sense, since you would first convert the fields of the struct, and then move up to nested-structs, lists, maps, structs, all the way up to the schema struct.

@Fokko Fokko changed the title feat: support arrow_struct_to_iceberg_struct feat: support arrow_struct_to_iceberg_struct Dec 6, 2024

let mut columns = Vec::with_capacity(array.columns().len());

for ((array, arrow_type), iceberg_field) in array
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looping back @liurenjie1024 question here:

We need have clear documentation about what kind of visitor it is.

First of all, thanks for converting this into a visitor, I think that's the right approach here. One question regarding the lookup. It looks like we're expecting the Arrow data to be in the same position of the Iceberg schema. Instead, what would be more Iceberg-esque, is performing the lookup by field-ID.

For example:

schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
)

table {
  1: city: optional string
  2: lat: optional double
  3: long: optional double
}

table = catalog.create_table("default.locations", schema)

with table.update_schema() as update:
    update.move_after("lat", "long")


table {
  1: city: optional string
  3: lat: optional double
  2: long: optional double
}

Now the fields are switched. Data that's already written needs to be projected, and new data can be read as is. Both of the files will be read by this visitor without raising an error, but for the old files, the data the fields will be switched.

This way, we could also do smart things, like fill in nullable fields:

        let struct_array = StructArray::new_null();
        let iceberg_struct_type = StructType::new(vec![Arc::new(NestedField::optional(
            0,
            "bool_field",
            Type::Primitive(PrimitiveType::Boolean),
        ))]);
        let result = arrow_struct_to_literal(&struct_array, iceberg_struct_type).unwrap();
        assert_eq!(result, vec![None; 3]);

And in a step after that, #737. You can find the Python equivalent here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use field-ID require user store the file id metadata for date type in struct type. I'm not sure whether is this always set up. How about providing two visit function? One would assume the schema of arrow will be same as iceberg schema, it will return error if convert fail. And the other one will look up the column using the field id.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And the other one will look up the column using the field id.

+1. I think we should allow user to choose.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After this PR, we will have the interface SchemaWithPartnerVisitor. And I think we can unify and refactor our RecordBatchProjector and RecordBatchTransformer(I think they are doing similar thing) using this interface to support:

  1. do smart things, like fill in nullable fields
  2. project reorder schema
  3. project nested field in struct type

Copy link
Contributor

@liurenjie1024 liurenjie1024 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ZENOTME 's effort. I saw that both java/python have schema with partner visitor:

  1. SchemaWithPartnerVisitor
  2. https://github.com/apache/iceberg-python/blob/bfc0d9a62176803094da0867ee793808f105d352/pyiceberg/io/pyarrow.py#L1726

I'm thinking that we should adopt similar design?

Comment on lines 37 to 42
fn int16(&self, array: &Int16Array, iceberg_type: &PrimitiveType) -> Result<Vec<Self::T>>;
fn int32(&self, array: &Int32Array, iceberg_type: &PrimitiveType) -> Result<Vec<Self::T>>;
fn int64(&self, array: &Int64Array, iceberg_type: &PrimitiveType) -> Result<Vec<Self::T>>;
fn float(&self, array: &Float32Array, iceberg_type: &PrimitiveType) -> Result<Vec<Self::T>>;
fn double(&self, array: &Float64Array, iceberg_type: &PrimitiveType) -> Result<Vec<Self::T>>;
fn decimal(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider condense these into to one method? Listing all primitive types seems too cumbersome to me.


/// A post order arrow array visitor.
/// # TODO
/// - Add support for ListArray, MapArray
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can understand that we don't want to support ListArray, MapArray for now, but we should throw unsupport feature in implementation. As an interface, we should still take into account those types.

@ZENOTME
Copy link
Contributor Author

ZENOTME commented Dec 16, 2024

Thanks @ZENOTME 's effort. I saw that both java/python have schema with partner visitor:

  1. SchemaWithPartnerVisitor
  2. https://github.com/apache/iceberg-python/blob/bfc0d9a62176803094da0867ee793808f105d352/pyiceberg/io/pyarrow.py#L1726

I'm thinking that we should adopt similar design?

I tried to adopt the SchemaWithPartner design, but I find the interface is not so general. https://github.com/apache/iceberg-python/blob/bfc0d9a62176803094da0867ee793808f105d352/pyiceberg/io/pyarrow.py#L1726 is not for array convert I think. To make it suitable for arrow array convert, I make some change to it:

  1. Introduce ListIterator and MapIterator, the control flow of list and map is not so suitable for arrow array convert.
  2. array in arrow rust can be NullArray, so we need some preorder access before diving into the subcolumn. So I introduce visit_type_before to solve it.

I think we can still implement e.g. arrow array project and cast using the changed interface, but I'm not sure whether the change is reasonable. There are some errors in this PR, I will fix them if we reach a consensus on this design.


/// Called before every type, if this function return `Some`, the following visiting will be skipped.
/// This function used to implement early return.
fn visit_type_before(&mut self, _ty: &Type, _partner: &P) -> Result<Option<Self::T>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we should add this part. I think this is an optimization to prune some fields, right? Instead adding this, why we don't prune unnecessary fields before visiting?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we don't prune unnecessary fields before visiting?

I think this prune will also incur visits recursively. Let me explain why we need this prune. In arrow, the struct array maybe represent as a nullarray which means that all child field are null. So in this case, we need to recognize this and stop to visit child column. But in our visitor pattern, we can't control this, the control path is in the visit_function rather than vistor. So we need to use this function to notify the visit function that stop visit child column if this function return Some.

results: Vec<Self::T>,
) -> Result<Self::T>;
/// Called after list fields visited.
fn list(&mut self, list: &ListType, partner: &P, value: Vec<Self::T>) -> Result<Self::T>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the value of a Vec?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the visit fucnton, this function will be called after visit all child column. So the Vec means the result return by all child columns, and then this function may combine this result.

&mut self,
map: &MapType,
partner: &P,
key_value: Vec<Self::T>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question as list.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar like struct, In the visit fucnton, this function will be called after visit all row. So the Vec means the result return by all row, and then this function may combine this result.

Type::List(list) => {
let mut results = Vec::new();
let mut list_element_partner_iter = accessor.list_element_partner(partner)?;
if let Some(list_element_partner) = list_element_partner_iter.next() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem quite in efficient. It should be the actual visitor's reponsibility to do visit partner. If I understand correctly, this is because you have trouble when dealing with list array of arrow? In this case, you can get inner child array by using array_data()[0].

Copy link
Contributor Author

@ZENOTME ZENOTME Dec 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem quite in efficient. It should be the actual visitor's reponsibility to do visit partner.

For now, our visit pattern is that the control logic is in visit function. The visitor only take care of how to access data. If we let the actual visitor's reponsibility to do visit partner, which means that it need to know access which partner, e.g it need to know some index I think. 🤔 And the interface should looks like:

fn before_list_element(&mut self, _field: &NestedFieldRef, _partner: &P, element_idx: usize) -> Result<()> {
       Ok(())
   }

Is that what you means?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @liurenjie1024 is there other suggestion to design this interface?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also cc @Xuanwo @sdd @Fokko

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for late reply, I'll take a look at this pr recently.

Copy link
Contributor

@liurenjie1024 liurenjie1024 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ZENOTME for this pr. I have some concerns about the processing of list/map

@ZENOTME ZENOTME mentioned this pull request Jan 11, 2025
2 tasks
@ZENOTME ZENOTME force-pushed the arrow_value_convert branch from b85a9ad to 024c2b6 Compare February 10, 2025 17:59
@ZENOTME
Copy link
Contributor Author

ZENOTME commented Feb 10, 2025

Thanks @ZENOTME for this pr. I have some concerns about the processing of list/map

Hi @liurenjie1024, I find a way to avoid the iterator design. And for now, our SchemaWithPartnerVisitor interface is the same as iceberg-python and iceberg-java. And I fulfilled the test. PTAL.

Copy link
Contributor

@liurenjie1024 liurenjie1024 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ZENOTME for this great pr, generally LGTM! Left some suggestions to improve.

}

/// Visiting a type in post order.
pub fn visit_type_with_partner<P, V: SchemaWithPartnerVisitor<P>, A: PartnerAccessor<P>>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we mark this method as private?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems it's possible external usage to visit schema:

impl SchemaVisitor for HiveSchemaBuilder {
.

Comment on lines 1568 to 1573
pub fn timestamp_nano(value: i64) -> Self {
Self::Primitive(PrimitiveLiteral::Long(value))
}

/// Creates a timestamp with timezone from unix epoch in nanoseconds.
pub fn timestamptz_nano(value: i64) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should no longer constructing directly from Literal but using Datum

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about another PR to refine them together?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, but please mark these new added methods as crate private.

};
use crate::{Error, ErrorKind, Result};

struct ArrowArrayConverter;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
struct ArrowArrayConverter;
struct ArrowArrayToIcebergStructConverter;

/// A post order schema visitor with partner.
///
/// For order of methods called, please refer to [`visit_schema_with_partner`].
pub trait SchemaWithPartnerVisitor<P> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Maybe it's time to split schema module to multi file module, but we can do it later.

Comment on lines 1222 to 1223
field_id: i32,
field_name: &str,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about just passing the ref to NestedField?


fn list(
&mut self,
list: &crate::spec::ListType,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
list: &crate::spec::ListType,
list: &ListType,


fn map(
&mut self,
_map: &crate::spec::MapType,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
_map: &crate::spec::MapType,
_map: &MapType,

@ZENOTME ZENOTME force-pushed the arrow_value_convert branch from 77170a3 to df3dd40 Compare February 24, 2025 10:36
Copy link
Contributor

@liurenjie1024 liurenjie1024 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ZENOTME , I think we are close to merge, just some minor point.

}

/// Visiting a type in post order.
pub fn visit_type_with_partner<P, V: SchemaWithPartnerVisitor<P>, A: PartnerAccessor<P>>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think user only need pub access to visit_schema_with_partner and visit_struct_with_partner, there is no need to access this?

Comment on lines 1568 to 1573
pub fn timestamp_nano(value: i64) -> Self {
Self::Primitive(PrimitiveLiteral::Long(value))
}

/// Creates a timestamp with timezone from unix epoch in nanoseconds.
pub fn timestamptz_nano(value: i64) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, but please mark these new added methods as crate private.

Copy link
Contributor

@liurenjie1024 liurenjie1024 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ZENOTME for this great pr!

@liurenjie1024 liurenjie1024 merged commit ba71eea into apache:main Feb 27, 2025
17 checks passed
@ZENOTME ZENOTME deleted the arrow_value_convert branch February 27, 2025 03:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants