Skip to content

Commit

Permalink
feat(java): support streaming encode/decode to/from buffer for row fo…
Browse files Browse the repository at this point in the history
…rmat (#2024)

## What does this PR do?

Support streaming encode/decode to/from a `MemoryBuffer` for the row format.

## Related issues

Closes #2019

## Does this PR introduce any user-facing change?

<!--
If any user-facing interface changes, please [open an
issue](https://github.com/apache/fury/issues/new/choose) describing the
need to do so and update the document if necessary.
-->

- [ ] Does this PR introduce any public API change?
- [ ] Does this PR introduce any binary protocol compatibility change?

## Benchmark

<!--
When the PR has an impact on performance (if you don't know whether the
PR will have an impact on performance, you can submit the PR first, and
if it will have impact on performance, the code reviewer will explain
it), be sure to attach a benchmark data here.
-->
  • Loading branch information
chaokunyang authored Jan 25, 2025
1 parent 7fd582a commit 2faede2
Show file tree
Hide file tree
Showing 14 changed files with 381 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public Expression buildEncodeExpression() {
expressions.add(array);

Expression.Reference fieldExpr = new Expression.Reference(FIELD_NAME, ARROW_FIELD_TYPE, false);
Expression listExpression = serializeForArray(array, arrayWriter, arrayToken, fieldExpr);
Expression listExpression =
serializeForArrayByWriter(array, arrayWriter, arrayToken, fieldExpr);

expressions.add(listExpression);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.fury.format.encoder;

import static org.apache.fury.type.TypeUtils.PRIMITIVE_INT_TYPE;
import static org.apache.fury.type.TypeUtils.getRawType;

import java.math.BigDecimal;
Expand Down Expand Up @@ -70,7 +71,6 @@
import org.apache.fury.util.StringUtils;

/** Base encoder builder for {@link Row}, {@link ArrayData} and {@link MapData}. */
@SuppressWarnings("UnstableApiUsage")
public abstract class BaseBinaryEncoderBuilder extends CodecBuilder {
protected static final String REFERENCES_NAME = "references";
protected static final TypeRef<Schema> SCHEMA_TYPE = TypeRef.of(Schema.class);
Expand Down Expand Up @@ -222,22 +222,14 @@ protected Expression serializeFor(
}
}

/**
* Returns an expression to write iterable <code>inputObject</code> of type <code>typeToken</code>
* as {@link BinaryArray} using given <code>writer</code>.
*/
protected Expression serializeForArray(
Expression inputObject, Expression writer, TypeRef<?> typeRef, Expression arrowField) {
return serializeForArray(inputObject, writer, typeRef, arrowField, false);
Reference arrayWriter = getOrCreateArrayWriter(typeRef, arrowField, writer);
return serializeForArrayByWriter(inputObject, arrayWriter, typeRef, arrowField);
}

protected Expression serializeForArray(
Expression inputObject,
Expression writer,
TypeRef<?> typeRef,
Expression arrowField,
boolean reuse) {
Reference arrayWriter = getOrCreateArrayWriter(typeRef, arrowField, writer, reuse);
protected Expression serializeForArrayByWriter(
Expression inputObject, Expression arrayWriter, TypeRef<?> typeRef, Expression arrowField) {
StaticInvoke arrayElementField =
new StaticInvoke(
DataTypes.class, "arrayElementField", "elemField", ARROW_FIELD_TYPE, false, arrowField);
Expand Down Expand Up @@ -285,21 +277,8 @@ protected Expression serializeForArray(
}
}

/**
* Get or create an ArrayWriter for given <code>type</code> and use <code>writer</code> as parent
* writer.
*/
protected Reference getOrCreateArrayWriter(
TypeRef<?> typeRef, Expression arrayDataType, Expression writer) {
return getOrCreateArrayWriter(typeRef, arrayDataType, writer, false);
}

protected Reference getOrCreateArrayWriter(
TypeRef<?> typeRef, Expression arrayDataType, Expression writer, boolean reuse) {
if (reuse) {
return (Reference) writer;
}

return arrayWriterMap.computeIfAbsent(
typeRef,
t -> {
Expand Down Expand Up @@ -344,39 +323,37 @@ protected Expression serializeForMap(
TypeRef<?> keySetType = supertype.resolveType(TypeUtils.KEY_SET_RETURN_TYPE);
TypeRef<?> valuesType = supertype.resolveType(TypeUtils.VALUES_RETURN_TYPE);

ListExpression expressions = new ListExpression();

Invoke offset = new Invoke(writer, "writerIndex", "writerIndex", TypeUtils.PRIMITIVE_INT_TYPE);
// preserve 8 bytes to write the key array numBytes later
Invoke preserve = new Invoke(writer, "writeDirectly", Literal.ofInt(-1));
expressions.add(offset, preserve);

Invoke keySet = new Invoke(inputObject, "keySet", keySetType);
Expression keySerializationExpr = serializeForArray(keySet, writer, keySetType, keyArrayField);
expressions.add(keySet, keySerializationExpr);

expressions.add(
new Expression.Invoke(
writer,
"writeDirectly",
offset,
Expression.Invoke.inlineInvoke(keySerializationExpr, "size", PRIMITIVE_INT_TYPE)));

Invoke values = new Invoke(inputObject, "values", valuesType);
Expression valueSerializationExpr =
serializeForArray(values, writer, valuesType, valueArrayField);
expressions.add(values, valueSerializationExpr);

Invoke offset = new Invoke(writer, "writerIndex", "writerIndex", TypeUtils.PRIMITIVE_INT_TYPE);
// preserve 8 bytes to write the key array numBytes later
Invoke preserve =
new Invoke(writer, "writeDirectly", new Literal(-1, TypeUtils.PRIMITIVE_INT_TYPE));
Invoke writeKeyArrayNumBytes =
new Invoke(
writer,
"writeDirectly",
offset,
new Invoke(keySerializationExpr, "size", TypeUtils.PRIMITIVE_INT_TYPE));
Arithmetic size =
ExpressionUtils.subtract(
new Invoke(writer, "writerIndex", "writerIndex", TypeUtils.PRIMITIVE_INT_TYPE), offset);
new Invoke(writer, "writerIndex", "writerIndex", PRIMITIVE_INT_TYPE), offset);
Invoke setOffsetAndSize = new Invoke(writer, "setOffsetAndSize", ordinal, offset, size);

ListExpression expression =
new ListExpression(
offset,
preserve,
keySerializationExpr,
writeKeyArrayNumBytes,
valueSerializationExpr,
setOffsetAndSize);
expressions.add(setOffsetAndSize);

return new If(
ExpressionUtils.eqNull(inputObject), new Invoke(writer, "setNullAt", ordinal), expression);
ExpressionUtils.eqNull(inputObject), new Invoke(writer, "setNullAt", ordinal), expressions);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.fury.format.encoder;

import org.apache.fury.memory.MemoryBuffer;

/**
* The encoding interface for encode/decode object to/from binary. The implementation class must
* have a constructor with signature {@code Object[] references}, so we can pass any params to
Expand All @@ -28,7 +30,11 @@
*/
public interface Encoder<T> {

  /**
   * Encodes {@code obj} to binary and returns the result as a byte array.
   *
   * @param obj the object to encode
   * @return the binary form of {@code obj}
   */
  byte[] encode(T obj);

  /**
   * Encodes {@code obj} to binary, writing the result into {@code buffer} instead of returning a
   * byte array.
   *
   * <p>NOTE(review): presumably the data is appended at the buffer's current writer position;
   * confirm against the implementing classes.
   *
   * @param buffer the buffer that receives the encoded data
   * @param obj the object to encode
   */
  void encode(MemoryBuffer buffer, T obj);

  /**
   * Decodes an object from the given byte array.
   *
   * @param bytes the binary data to decode
   * @return the decoded object
   */
  T decode(byte[] bytes);

  /**
   * Decodes an object from the given buffer.
   *
   * <p>NOTE(review): presumably reading starts at the buffer's current reader position; confirm
   * against the implementing classes.
   *
   * @param buffer the buffer holding the binary data to decode
   * @return the decoded object
   */
  T decode(MemoryBuffer buffer);
}
Loading

0 comments on commit 2faede2

Please sign in to comment.