From 7fd582a587b6ffbe79629b944f300057884b216c Mon Sep 17 00:00:00 2001 From: hn <72974271+Hen1ng@users.noreply.github.com> Date: Fri, 24 Jan 2025 22:33:57 +0800 Subject: [PATCH] feat(java): Chunk by chunk predictive map serialization protocol (#1722) ## What does this PR do? Implement chunk based map serialization in #925. This pr doesn't provide JIT support, it will be implemented in later PR. ## Related issues ## Does this PR introduce any user-facing change? - [ ] Does this PR introduce any public API change? - [ ] Does this PR introduce any binary protocol compatibility change? ## Benchmark --------- Co-authored-by: hening Co-authored-by: chaokunyang --- .../src/main/java/org/apache/fury/Fury.java | 9 + .../collection/AbstractMapSerializer.java | 1571 ++++++++++++++++- .../fury/serializer/collection/MapFlags.java | 49 + .../serializer/collection/MapSerializer.java | 6 +- .../collection/MapSerializersTest.java | 83 + 5 files changed, 1716 insertions(+), 2 deletions(-) create mode 100644 java/fury-core/src/main/java/org/apache/fury/serializer/collection/MapFlags.java diff --git a/java/fury-core/src/main/java/org/apache/fury/Fury.java b/java/fury-core/src/main/java/org/apache/fury/Fury.java index 7b364042df..89e23fc23e 100644 --- a/java/fury-core/src/main/java/org/apache/fury/Fury.java +++ b/java/fury-core/src/main/java/org/apache/fury/Fury.java @@ -935,6 +935,15 @@ public Object readNullable(MemoryBuffer buffer) { } } + public Object readNullable(MemoryBuffer buffer, ClassInfoHolder classInfoHolder) { + byte headFlag = buffer.readByte(); + if (headFlag == Fury.NULL_FLAG) { + return null; + } else { + return readNonRef(buffer, classInfoHolder); + } + } + /** Class should be read already. 
*/ public Object readData(MemoryBuffer buffer, ClassInfo classInfo) { depth++; diff --git a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/AbstractMapSerializer.java b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/AbstractMapSerializer.java index 0a925dfe89..9807ab3894 100644 --- a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/AbstractMapSerializer.java +++ b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/AbstractMapSerializer.java @@ -39,12 +39,16 @@ import org.apache.fury.type.GenericType; import org.apache.fury.type.Generics; import org.apache.fury.type.TypeUtils; +import org.apache.fury.util.Preconditions; /** Serializer for all map-like objects. */ @SuppressWarnings({"unchecked", "rawtypes"}) public abstract class AbstractMapSerializer extends Serializer { + private static final int MAX_CHUNK_SIZE = 127; + private static final byte MARK_HAS_WRITE_CLASS_INFO = -1; protected MethodHandle constructor; protected final boolean supportCodegenHook; + protected boolean useChunkSerialize; private Serializer keySerializer; private Serializer valueSerializer; protected final ClassInfoHolder keyClassInfoWriteCache; @@ -110,7 +114,11 @@ public void setValueSerializer(Serializer valueSerializer) { @Override public void write(MemoryBuffer buffer, T value) { Map map = onMapWrite(buffer, value); - writeElements(fury, buffer, map); + if (useChunkSerialize) { + chunkWriteElements(fury, buffer, map); + } else { + writeElements(fury, buffer, map); + } } @Override @@ -154,6 +162,375 @@ protected final void writeElements(Fury fury, MemoryBuffer buffer, Map map) { } } + protected final void chunkWriteElements(Fury fury, MemoryBuffer buffer, Map map) { + Serializer keySerializer = this.keySerializer; + Serializer valueSerializer = this.valueSerializer; + // clear the elemSerializer to avoid conflict if the nested + // serialization has collection field. + // TODO use generics for compatible serializer. 
+ this.keySerializer = null; + this.valueSerializer = null; + if (keySerializer != null && valueSerializer != null) { + javaChunkWriteWithKVSerializers(buffer, map, keySerializer, valueSerializer); + } else if (keySerializer != null) { + javaChunkWriteWithKeySerializers(map, buffer, keySerializer); + } else if (valueSerializer != null) { + javaChunkWriteWithValueSerializers(map, buffer, valueSerializer); + } else { + genericJavaChunkWrite(fury, buffer, map); + } + } + + private void javaChunkWriteWithKeySerializers( + Map map, MemoryBuffer buffer, Serializer keySerializer) { + boolean prevKeyIsNull = false; + int header = 0; + int chunkSize = 0; + int startOffset = -1; + boolean valueIsDifferentType = false; + Class valueClass = null; + boolean reset = false; + for (Object object : map.entrySet()) { + Map.Entry entry = (Map.Entry) object; + Object key = entry.getKey(); + final Object value = entry.getValue(); + if (key == null) { + prevKeyIsNull = true; + } + if (!valueIsDifferentType) { + if (value != null) { + if (valueClass == null) { + valueClass = value.getClass(); + } + valueIsDifferentType = valueClass != value.getClass(); + if (valueIsDifferentType) { + reset = true; + } + } + } + if (needReset(key, chunkSize, prevKeyIsNull, value, header, reset)) { + writeHeader(buffer, chunkSize, header, startOffset); + prevKeyIsNull = false; + header = 0; + chunkSize = 0; + startOffset = -1; + valueClass = value == null ? 
null : value.getClass(); + reset = false; + } + startOffset = preserveByte(buffer, startOffset); + boolean trackingKeyRef = keySerializer.needToWriteRef(); + boolean trackingValueRef = fury.trackingRef(); + header = + updateKVHeader( + key, trackingKeyRef, value, trackingValueRef, header, false, valueIsDifferentType); + writeFinalKey(key, buffer, keySerializer, trackingKeyRef); + writeCommonValue( + header, + trackingValueRef, + valueIsDifferentType, + startOffset, + value, + buffer, + fury.getClassResolver(), + fury.getRefResolver()); + chunkSize++; + } + writeHeader(buffer, chunkSize, header, startOffset); + } + + /** + * Marks one of the 2 reserved chunk-header bytes to record that class info has been written, so + * no extra flag variable is needed; the reserved bytes are overwritten when the chunk finishes. + * + * @param buffer buffer to write. + * @param offset offset to mark. + */ + private void markHasWriteClassInfo(MemoryBuffer buffer, int offset) { + int writeIndex = buffer.writerIndex(); + buffer.writerIndex(offset); + buffer.writeByte(MARK_HAS_WRITE_CLASS_INFO); + buffer.writerIndex(writeIndex); + } + + private void writeCommonKey( + boolean trackingKeyRef, + boolean keyIsDifferentType, + int startOffset, + Object key, + MemoryBuffer buffer, + ClassResolver classResolver, + RefResolver refResolver) { + if (!trackingKeyRef) { + if (key == null) { + buffer.writeByte(Fury.NULL_FLAG); + } else { + if (!keyIsDifferentType) { + Serializer keyWriteSerializer = + getKeyWriteSerializer(startOffset, key, buffer, classResolver); + keyWriteSerializer.write(buffer, key); + } else { + fury.writeNonRef( + buffer, key, classResolver.getClassInfo(key.getClass(), keyClassInfoWriteCache)); + } + } + } else { + if (key == null) { + buffer.writeByte(Fury.NULL_FLAG); + } else { + if (!keyIsDifferentType) { + Serializer keyWriteSerializer = + getKeyWriteSerializer(startOffset, key, buffer, classResolver); + writeNoNullRef(keyWriteSerializer, key, buffer, refResolver); + } else { + if 
(!refResolver.writeNullFlag(buffer, key)) { + fury.writeRef( + buffer, key, classResolver.getClassInfo(key.getClass(), keyClassInfoWriteCache)); + } + } + } + } + } + + private Serializer getKeyWriteSerializer( + int startOffset, Object key, MemoryBuffer buffer, ClassResolver classResolver) { + ClassInfo classInfo = classResolver.getClassInfo(key.getClass(), keyClassInfoWriteCache); + if (buffer.getByte(startOffset) != MARK_HAS_WRITE_CLASS_INFO) { + classResolver.writeClass(buffer, classInfo); + markHasWriteClassInfo(buffer, startOffset); + } + return classInfo.getSerializer(); + } + + private void writeCommonValue( + int header, + boolean trackingValueRef, + boolean valueIsDifferentType, + int startOffset, + Object value, + MemoryBuffer buffer, + ClassResolver classResolver, + RefResolver refResolver) { + if (!trackingValueRef) { + if (value == null) { + buffer.writeByte(Fury.NULL_FLAG); + } else { + if (!valueIsDifferentType) { + if (valueHasNull(header)) { + buffer.writeByte(Fury.NOT_NULL_VALUE_FLAG); + } + Serializer valueWriteSerializer = + getValueWriteSerializer(startOffset, value, buffer, classResolver); + valueWriteSerializer.write(buffer, value); + } else { + fury.writeNullable( + buffer, + value, + classResolver.getClassInfo(value.getClass(), valueClassInfoWriteCache)); + } + } + } else { + if (value == null) { + buffer.writeByte(Fury.NULL_FLAG); + } else { + if (!valueIsDifferentType) { + Serializer valueWriteSerializer = + getValueWriteSerializer(startOffset, value, buffer, classResolver); + if (!valueHasNull(header)) { + writeNoNullRef(valueWriteSerializer, value, buffer, refResolver); + } else { + fury.writeRef(buffer, value, valueWriteSerializer); + } + } else { + if (!refResolver.writeNullFlag(buffer, value)) { + fury.writeRef( + buffer, + value, + classResolver.getClassInfo(value.getClass(), valueClassInfoWriteCache)); + } + } + } + } + } + + private Serializer getValueWriteSerializer( + int startOffset, Object value, MemoryBuffer buffer, 
ClassResolver classResolver) { + ClassInfo classInfo = classResolver.getClassInfo(value.getClass(), valueClassInfoWriteCache); + if (buffer.getByte(startOffset + 1) != MARK_HAS_WRITE_CLASS_INFO) { + classResolver.writeClass(buffer, classInfo); + markHasWriteClassInfo(buffer, startOffset + 1); + } + return classInfo.getSerializer(); + } + + private void javaChunkWriteWithValueSerializers( + Map map, MemoryBuffer buffer, Serializer valueSerializer) { + boolean prevKeyIsNull = false; + int header = 0; + int chunkSize = 0; + int startOffset = -1; + boolean keyIsDifferentType = false; + Class keyClass = null; + boolean reset = false; + for (Object object : map.entrySet()) { + Map.Entry entry = (Map.Entry) object; + Object key = entry.getKey(); + final Object value = entry.getValue(); + if (key == null) { + prevKeyIsNull = true; + } + if (!keyIsDifferentType) { + if (key != null) { + if (keyClass == null) { + keyClass = key.getClass(); + } + keyIsDifferentType = keyClass != key.getClass(); + if (keyIsDifferentType) { + reset = true; + } + } + } + if (needReset(key, chunkSize, prevKeyIsNull, value, header, reset)) { + writeHeader(buffer, chunkSize, header, startOffset); + prevKeyIsNull = false; + header = 0; + chunkSize = 0; + startOffset = -1; + keyClass = key == null ? 
null : key.getClass(); + } + startOffset = preserveByte(buffer, startOffset); + boolean trackingKeyRef = fury.trackingRef(); + boolean trackingValueRef = valueSerializer.needToWriteRef(); + header = + updateKVHeader( + key, trackingKeyRef, value, trackingValueRef, header, keyIsDifferentType, false); + writeCommonKey( + trackingKeyRef, + keyIsDifferentType, + startOffset, + key, + buffer, + fury.getClassResolver(), + fury.getRefResolver()); + writeFinalValue(value, buffer, valueSerializer, trackingValueRef, header); + chunkSize++; + } + writeHeader(buffer, chunkSize, header, startOffset); + } + + private int preserveByte(MemoryBuffer buffer, int startOffset) { + if (startOffset == -1) { + int writerIndex = buffer.writerIndex(); + // preserve two byte for header and chunk size + buffer.writerIndex(writerIndex + 2); + return writerIndex; + } + return startOffset; + } + + private void javaChunkWriteWithKVSerializers( + MemoryBuffer buffer, Map map, Serializer keySerializer, Serializer valueSerializer) { + boolean prevKeyIsNull = false; + int header = 0; + int chunkSize = 0; + int startOffset = -1; + for (Object object : map.entrySet()) { + Map.Entry entry = (Map.Entry) object; + Object key = entry.getKey(); + Object value = entry.getValue(); + if (key == null) { + prevKeyIsNull = true; + } + if (needReset(key, chunkSize, prevKeyIsNull, value, header, false)) { + // update header at the beginning of the chunk when we reset chunk + writeHeader(buffer, chunkSize, header, startOffset); + header = 0; + chunkSize = 0; + startOffset = -1; + prevKeyIsNull = false; + } + startOffset = preserveByte(buffer, startOffset); + boolean trackingKeyRef = keySerializer.needToWriteRef(); + boolean trackingValueRef = valueSerializer.needToWriteRef(); + header = updateKVHeader(key, trackingKeyRef, value, trackingValueRef, header, false, false); + writeFinalKey(key, buffer, keySerializer, trackingKeyRef); + writeFinalValue(value, buffer, valueSerializer, trackingValueRef, header); + 
chunkSize++; + } + // update header at the beginning of the chunk when we finish the iteration + writeHeader(buffer, chunkSize, header, startOffset); + } + + private void writeFinalKey( + Object key, MemoryBuffer buffer, Serializer keySerializer, boolean trackingKeyRef) { + if (!trackingKeyRef) { + // map key has one null at most, use one chunk to write + if (key == null) { + buffer.writeByte(Fury.NULL_FLAG); + } else { + keySerializer.write(buffer, key); + } + } else { + RefResolver refResolver = fury.getRefResolver(); + if (!refResolver.writeRefOrNull(buffer, key)) { + keySerializer.write(buffer, key); + } + } + } + + private void writeFinalValue( + Object value, + MemoryBuffer buffer, + Serializer valueSerializer, + boolean trackingValueRef, + int header) { + if (!trackingValueRef) { + if (value == null) { + buffer.writeByte(Fury.NULL_FLAG); + } else { + if (valueHasNull(header)) { + buffer.writeByte(Fury.NOT_NULL_VALUE_FLAG); + valueSerializer.write(buffer, value); + } else { + valueSerializer.write(buffer, value); + } + } + } else { + RefResolver refResolver = fury.getRefResolver(); + if (!refResolver.writeRefOrNull(buffer, value)) { + valueSerializer.write(buffer, value); + } + } + } + + private int updateKVHeader( + Object key, + boolean trackingKeyRef, + Object value, + boolean trackingValueRef, + int header, + boolean keyIsDifferentType, + boolean valueIsDifferentType) { + if (trackingKeyRef) { + header |= MapFlags.TRACKING_KEY_REF; + } + if (key == null) { + header |= MapFlags.KEY_HAS_NULL; + } + if (trackingValueRef) { + header |= MapFlags.TRACKING_VALUE_REF; + } + if (value == null) { + header |= MapFlags.VALUE_HAS_NULL; + } + if (keyIsDifferentType) { + header |= MapFlags.KEY_NOT_SAME_TYPE; + } + if (valueIsDifferentType) { + header |= MapFlags.VALUE_NOT_SAME_TYPE; + } + return header; + } + private void javaWriteWithKVSerializers( Fury fury, MemoryBuffer buffer, @@ -216,6 +593,54 @@ private void genericJavaWrite(Fury fury, MemoryBuffer buffer, Map 
map) { } } + private void genericJavaChunkWrite(Fury fury, MemoryBuffer buffer, Map map) { + Generics generics = fury.getGenerics(); + GenericType genericType = generics.nextGenericType(); + if (genericType == null) { + generalJavaChunkWrite(fury, buffer, map); + } else { + GenericType keyGenericType = genericType.getTypeParameter0(); + GenericType valueGenericType = genericType.getTypeParameter1(); + // type parameters count for `Map field` will be 0; + // type parameters count for `SubMap field` which SubMap is + // `SubMap implements Map` will be 1; + if (genericType.getTypeParametersCount() < 2) { + Tuple2 kvGenericType = getKVGenericType(genericType); + if (keyGenericType == objType && valueGenericType == objType) { + generalJavaChunkWrite(fury, buffer, map); + return; + } + keyGenericType = kvGenericType.f0; + valueGenericType = kvGenericType.f1; + } + // Can't avoid push generics repeatedly in loop by stack depth, because push two + // generic type changed generics stack top, which is depth index, update stack top + // and depth will have some cost too. + // Stack depth to avoid push generics repeatedly in loop. + // Note push two generic type changed generics stack top, which is depth index, + // stack top should be updated when using for serialization k/v. + // int depth = fury.getDepth(); + // // depth + 1 to leave a slot for value generics, otherwise value generics will + // // be overwritten by nested key generics. 
+ // fury.setDepth(depth + 1); + // generics.pushGenericType(keyGenericType); + // fury.setDepth(depth); + // generics.pushGenericType(valueGenericType); + boolean keyGenericTypeFinal = keyGenericType.isMonomorphic(); + boolean valueGenericTypeFinal = valueGenericType.isMonomorphic(); + if (keyGenericTypeFinal && valueGenericTypeFinal) { + javaKVTypesFinalChunkWrite(fury, buffer, map, keyGenericType, valueGenericType, generics); + } else if (keyGenericTypeFinal) { + javaKeyTypeFinalChunkWrite(fury, buffer, map, keyGenericType, valueGenericType, generics); + } else if (valueGenericTypeFinal) { + javaValueTypeFinalChunkWrite(fury, buffer, map, keyGenericType, valueGenericType, generics); + } else { + javaKVTypesNonFinalChunkWrite( + fury, buffer, map, keyGenericType, valueGenericType, generics); + } + } + } + private void javaKVTypesFinalWrite( Fury fury, MemoryBuffer buffer, @@ -236,6 +661,51 @@ private void javaKVTypesFinalWrite( } } + /** + * KV final write does not need type prediction, since keys and values have the same type unless null. 
+ */ + private void javaKVTypesFinalChunkWrite( + Fury fury, + MemoryBuffer buffer, + Map map, + GenericType keyGenericType, + GenericType valueGenericType, + Generics generics) { + boolean prevKeyIsNull = false; + int header = 0; + int chunkSize = 0; + int startOffset = -1; + Serializer keySerializer = keyGenericType.getSerializer(fury.getClassResolver()); + Serializer valueSerializer = valueGenericType.getSerializer(fury.getClassResolver()); + for (Object object : map.entrySet()) { + Map.Entry entry = (Map.Entry) object; + Object key = entry.getKey(); + Object value = entry.getValue(); + if (key == null) { + prevKeyIsNull = true; + } + if (needReset(key, chunkSize, prevKeyIsNull, value, header, false)) { + writeHeader(buffer, chunkSize, header, startOffset); + header = 0; + chunkSize = 0; + startOffset = -1; + prevKeyIsNull = false; + } + startOffset = preserveByte(buffer, startOffset); + boolean trackingKeyRef = keySerializer.needToWriteRef(); + boolean trackingValueRef = valueSerializer.needToWriteRef(); + header = updateKVHeader(key, trackingKeyRef, value, trackingValueRef, header, false, false); + generics.pushGenericType(keyGenericType); + writeFinalKey(key, buffer, keySerializer, trackingKeyRef); + generics.popGenericType(); + generics.pushGenericType(valueGenericType); + writeFinalValue(value, buffer, valueSerializer, trackingValueRef, header); + generics.popGenericType(); + chunkSize++; + } + writeHeader(buffer, chunkSize, header, startOffset); + } + private void javaKeyTypeFinalWrite( Fury fury, MemoryBuffer buffer, @@ -265,6 +735,279 @@ private void javaKeyTypeFinalWrite( } } + private void javaKeyTypeFinalChunkWrite( + Fury fury, + MemoryBuffer buffer, + Map map, + GenericType keyGenericType, + GenericType valueGenericType, + Generics generics) { + Serializer keySerializer = keyGenericType.getSerializer(fury.getClassResolver()); + boolean trackingValueRef = fury.getClassResolver().needToWriteRef(valueGenericType.getCls()); + boolean prevKeyIsNull = 
false; + int header = 0; + int chunkSize = 0; + int startOffset = -1; + boolean valueIsDifferentType = false; + Class valueClass = null; + boolean reset = false; + for (Object object : map.entrySet()) { + Map.Entry entry = (Map.Entry) object; + Object key = entry.getKey(); + Object value = entry.getValue(); + if (key == null) { + prevKeyIsNull = true; + } + if (!valueIsDifferentType) { + if (value != null) { + if (valueClass == null) { + valueClass = value.getClass(); + } + valueIsDifferentType = valueClass != value.getClass(); + } + if (valueIsDifferentType) { + reset = true; + } + } + if (needReset(key, chunkSize, prevKeyIsNull, value, header, reset)) { + writeHeader(buffer, chunkSize, header, startOffset); + prevKeyIsNull = false; + header = 0; + chunkSize = 0; + startOffset = -1; + valueClass = value == null ? null : value.getClass(); + reset = false; + } + startOffset = preserveByte(buffer, startOffset); + generics.pushGenericType(keyGenericType); + boolean trackingKeyRef = keySerializer.needToWriteRef(); + header = + updateKVHeader( + key, trackingKeyRef, value, trackingValueRef, header, false, valueIsDifferentType); + writeFinalKey(key, buffer, keySerializer, trackingKeyRef); + generics.popGenericType(); + generics.pushGenericType(valueGenericType); + writeCommonValue( + header, + trackingValueRef, + valueIsDifferentType, + startOffset, + value, + buffer, + fury.getClassResolver(), + fury.getRefResolver()); + generics.popGenericType(); + chunkSize++; + } + writeHeader(buffer, chunkSize, header, startOffset); + } + + private void javaValueTypeFinalChunkWrite( + Fury fury, + MemoryBuffer buffer, + Map map, + GenericType keyGenericType, + GenericType valueGenericType, + Generics generics) { + int header = 0; + int chunkSize = 0; + boolean prevKeyIsNull = false; + boolean keyIsDifferentType = false; + int startOffset = -1; + Class keyClass = null; + boolean reset = false; + Serializer valueSerializer = valueGenericType.getSerializer(fury.getClassResolver()); + 
boolean trackingKeyRef = fury.getClassResolver().needToWriteRef(keyGenericType.getCls()); + boolean trackingValueRef = valueSerializer.needToWriteRef(); + for (Object object : map.entrySet()) { + Map.Entry entry = (Map.Entry) object; + Object key = entry.getKey(); + Object value = entry.getValue(); + if (key == null) { + prevKeyIsNull = true; + } + if (!keyIsDifferentType) { + if (key != null) { + if (keyClass == null) { + keyClass = key.getClass(); + } + keyIsDifferentType = keyClass != key.getClass(); + if (keyIsDifferentType) { + reset = true; + } + } + } + if (needReset(key, chunkSize, prevKeyIsNull, value, header, reset)) { + writeHeader(buffer, chunkSize, header, startOffset); + header = 0; + chunkSize = 0; + prevKeyIsNull = false; + startOffset = -1; + keyClass = key == null ? null : key.getClass(); + reset = false; + } + header = + updateKVHeader( + key, trackingKeyRef, value, trackingValueRef, header, keyIsDifferentType, false); + startOffset = preserveByte(buffer, startOffset); + generics.pushGenericType(keyGenericType); + writeCommonKey( + trackingKeyRef, + keyIsDifferentType, + startOffset, + key, + buffer, + fury.getClassResolver(), + fury.getRefResolver()); + generics.popGenericType(); + generics.pushGenericType(valueGenericType); + writeFinalValue(value, buffer, valueSerializer, trackingValueRef, header); + generics.popGenericType(); + chunkSize++; + } + writeHeader(buffer, chunkSize, header, startOffset); + } + + private void javaKVTypesNonFinalChunkWrite( + Fury fury, + MemoryBuffer buffer, + Map map, + GenericType keyGenericType, + GenericType valueGenericType, + Generics generics) { + ClassResolver classResolver = fury.getClassResolver(); + RefResolver refResolver = fury.getRefResolver(); + int header = 0; + int startOffset = -1; + int chunkSize = 0; + Class keyClass = null; + Class valueClass = null; + boolean keyIsDifferentType = false; + boolean valueIsDifferentType = false; + boolean prevKeyIsNull = false; + boolean markChunkWriteFinish = 
false; + boolean reset = false; + boolean needMarkFinish = false; + boolean trackingKeyRef = fury.getClassResolver().needToWriteRef(keyGenericType.getCls()); + boolean trackingValueRef = fury.getClassResolver().needToWriteRef(valueGenericType.getCls()); + for (Object object : map.entrySet()) { + Map.Entry entry = (Map.Entry) object; + Object key = entry.getKey(); + Object value = entry.getValue(); + if (!markChunkWriteFinish) { + if (key == null) { + prevKeyIsNull = true; + } + if (!keyIsDifferentType) { + if (key != null) { + if (keyClass == null) { + keyClass = key.getClass(); + } + keyIsDifferentType = keyClass != key.getClass(); + } + if (keyIsDifferentType) { + reset = true; + } + } + if (!valueIsDifferentType) { + if (value != null) { + if (valueClass == null) { + valueClass = value.getClass(); + } + valueIsDifferentType = valueClass != value.getClass(); + } + if (valueIsDifferentType) { + reset = true; + } + } + if (keyIsDifferentType && valueIsDifferentType) { + needMarkFinish = true; + } + if (needMarkFinish) { + writeHeader(buffer, chunkSize, header, startOffset); + // set chunk size = 0 + buffer.writeByte(0); + markChunkWriteFinish = true; + } else { + if (needReset(key, chunkSize, prevKeyIsNull, value, header, reset)) { + writeHeader(buffer, chunkSize, header, startOffset); + header = 0; + chunkSize = 0; + prevKeyIsNull = false; + keyClass = key == null ? null : key.getClass(); + valueClass = value == null ? 
null : value.getClass(); + reset = false; + startOffset = -1; + } + } + } + if (markChunkWriteFinish) { + generics.pushGenericType(keyGenericType); + writeJavaRefOptimized( + fury, classResolver, refResolver, trackingKeyRef, buffer, key, keyClassInfoWriteCache); + generics.popGenericType(); + generics.pushGenericType(valueGenericType); + writeJavaRefOptimized( + fury, + classResolver, + refResolver, + trackingValueRef, + buffer, + value, + valueClassInfoWriteCache); + generics.popGenericType(); + } else { + startOffset = preserveByte(buffer, startOffset); + header = + updateKVHeader( + key, + trackingKeyRef, + value, + trackingValueRef, + header, + keyIsDifferentType, + valueIsDifferentType); + generics.pushGenericType(keyGenericType); + writeCommonKey( + trackingKeyRef, + keyIsDifferentType, + startOffset, + key, + buffer, + fury.getClassResolver(), + fury.getRefResolver()); + generics.popGenericType(); + generics.pushGenericType(valueGenericType); + writeCommonValue( + header, + trackingValueRef, + valueIsDifferentType, + startOffset, + value, + buffer, + fury.getClassResolver(), + fury.getRefResolver()); + generics.popGenericType(); + chunkSize++; + } + } + writeHeader(buffer, chunkSize, header, startOffset); + } + + private boolean needReset( + Object key, + int chunkSize, + boolean prevKeyIsNull, + Object value, + int header, + boolean needReset) { + return (key == null && chunkSize > 0) + || (prevKeyIsNull && key != null) + || (value == null && chunkSize > 0 && !valueHasNull(header)) + || (chunkSize >= MAX_CHUNK_SIZE) + || needReset; + } + private void javaValueTypeFinalWrite( Fury fury, MemoryBuffer buffer, @@ -342,6 +1085,130 @@ private void generalJavaWrite(Fury fury, MemoryBuffer buffer, Map map) { } } + protected void generalJavaChunkWrite(Fury fury, MemoryBuffer buffer, Map map) { + int header = 0; + int startOffset = -1; + int chunkSize = 0; + Class keyClass = null; + Class valueClass = null; + boolean keyIsDifferentType = false; + boolean 
valueIsDifferentType = false; + boolean prevKeyIsNull = false; + boolean markChunkWriteFinish = false; + boolean reset = false; + boolean needMarkFinish = false; + ClassResolver classResolver = fury.getClassResolver(); + RefResolver refResolver = fury.getRefResolver(); + for (Object object : map.entrySet()) { + Map.Entry entry = (Map.Entry) object; + Object key = entry.getKey(); + Object value = entry.getValue(); + if (!markChunkWriteFinish) { + if (key == null) { + prevKeyIsNull = true; + } + if (!keyIsDifferentType) { + if (key != null) { + if (keyClass == null) { + keyClass = key.getClass(); + } + keyIsDifferentType = keyClass != key.getClass(); + } + if (keyIsDifferentType) { + reset = true; + } + } + if (!valueIsDifferentType) { + if (value != null) { + if (valueClass == null) { + valueClass = value.getClass(); + } + valueIsDifferentType = valueClass != value.getClass(); + } + if (valueIsDifferentType) { + reset = true; + } + } + if (valueIsDifferentType && keyIsDifferentType) { + needMarkFinish = true; + } + if (needMarkFinish) { + writeHeader(buffer, chunkSize, header, startOffset); + // set chunk size = 0 + buffer.writeByte(0); + markChunkWriteFinish = true; + } else { + if (needReset(key, chunkSize, prevKeyIsNull, value, header, reset)) { + writeHeader(buffer, chunkSize, header, startOffset); + header = 0; + chunkSize = 0; + startOffset = -1; + prevKeyIsNull = false; + keyClass = key == null ? null : key.getClass(); + valueClass = value == null ? 
null : value.getClass(); + reset = false; + } + } + } + if (!markChunkWriteFinish) { + startOffset = preserveByte(buffer, startOffset); + boolean trackingRef = fury.trackingRef(); + header = + updateKVHeader( + key, + trackingRef, + value, + trackingRef, + header, + keyIsDifferentType, + valueIsDifferentType); + writeCommonKey( + trackingRef, keyIsDifferentType, startOffset, key, buffer, classResolver, refResolver); + writeCommonValue( + header, + trackingRef, + valueIsDifferentType, + startOffset, + value, + buffer, + classResolver, + refResolver); + chunkSize++; + } else { + writeJavaRefOptimized( + fury, classResolver, refResolver, buffer, entry.getKey(), keyClassInfoWriteCache); + writeJavaRefOptimized( + fury, classResolver, refResolver, buffer, entry.getValue(), valueClassInfoWriteCache); + } + } + writeHeader(buffer, chunkSize, header, startOffset); + } + + private void writeNoNullRef( + Serializer serializer, Object o, MemoryBuffer buffer, RefResolver refResolver) { + if (serializer.needToWriteRef()) { + if (!refResolver.writeRefOrNull(buffer, o)) { + serializer.write(buffer, o); + } + } else { + serializer.write(buffer, o); + } + } + + private boolean valueHasNull(int header) { + return (header & MapFlags.VALUE_HAS_NULL) == MapFlags.VALUE_HAS_NULL; + } + + public void writeHeader(MemoryBuffer memoryBuffer, int chunkSize, int header, int startOffset) { + if (chunkSize > 0) { + int currentWriteIndex = memoryBuffer.writerIndex(); + memoryBuffer.writerIndex(startOffset); + memoryBuffer.writeByte(chunkSize); + memoryBuffer.writeByte(header); + memoryBuffer.writerIndex(currentWriteIndex); + } + } + public static void xwriteElements(Fury fury, MemoryBuffer buffer, Map value) { Generics generics = fury.getGenerics(); GenericType genericType = generics.nextGenericType(); @@ -496,6 +1363,26 @@ protected void copyEntry(Map originMap, Object[] elements) { } } + @SuppressWarnings("unchecked") + protected final void chunkReadElements(MemoryBuffer buffer, int size, Map 
map) { + Serializer keySerializer = this.keySerializer; + Serializer valueSerializer = this.valueSerializer; + // clear the elemSerializer to avoid conflict if the nested + // serialization has collection field. + // TODO use generics for compatible serializer. + this.keySerializer = null; + this.valueSerializer = null; + if (keySerializer != null && valueSerializer != null) { + javaChunkReadWithKVSerializers(buffer, map, size, keySerializer, valueSerializer); + } else if (keySerializer != null) { + javaChunkReadWithKeySerializer(buffer, map, size, keySerializer); + } else if (valueSerializer != null) { + javaChunkReadWithValueSerializer(buffer, map, size, valueSerializer); + } else { + genericJavaChunkRead(fury, buffer, map, size); + } + } + @SuppressWarnings("unchecked") protected final void readElements(MemoryBuffer buffer, int size, Map map) { Serializer keySerializer = this.keySerializer; @@ -560,6 +1447,253 @@ private void genericJavaRead(Fury fury, MemoryBuffer buffer, Map map, int size) } } + private void javaChunkReadWithKeySerializer( + MemoryBuffer buffer, Map map, int size, Serializer keySerializer) { + final ClassResolver classResolver = fury.getClassResolver(); + while (size > 0) { + byte chunkSize = buffer.readByte(); + byte header = buffer.readByte(); + Preconditions.checkArgument( + chunkSize >= 0, + "chunkSize < 0, which means serialization protocol is not same with deserialization protocol"); + Serializer valueReadSerializer = null; + for (byte i = 0; i < chunkSize; i++) { + Object key; + Object value; + key = readFinalKey(buffer, header, keySerializer); + if (!fury.trackingRef()) { + if (!valueIsDifferentType(header)) { + if (valueHasNull(header)) { + byte flag = buffer.readByte(); + if (flag == Fury.NOT_NULL_VALUE_FLAG) { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver.readClassInfo(buffer, valueClassInfoReadCache).getSerializer(); + } + value = valueReadSerializer.read(buffer); + } else { + value = null; + } + } 
else { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver.readClassInfo(buffer, valueClassInfoReadCache).getSerializer(); + } + value = valueReadSerializer.read(buffer); + } + } else { + value = fury.readNullable(buffer, valueClassInfoReadCache); + } + + } else { + if (!valueIsDifferentType(header)) { + if (valueHasNull(header)) { + byte flag = buffer.readByte(); + if (flag == Fury.NOT_NULL_VALUE_FLAG) { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver.readClassInfo(buffer, valueClassInfoReadCache).getSerializer(); + } + value = fury.readRef(buffer, valueReadSerializer); + } else { + value = null; + } + } else { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver.readClassInfo(buffer, valueClassInfoReadCache).getSerializer(); + } + value = readNoNullRef(valueReadSerializer, buffer); + } + } else { + value = fury.readRef(buffer, valueClassInfoReadCache); + } + } + map.put(key, value); + size--; + } + } + } + + private void javaChunkReadWithValueSerializer( + MemoryBuffer buffer, Map map, int size, Serializer valueSerializer) { + while (size > 0) { + byte chunkSize = buffer.readByte(); + byte header = buffer.readByte(); + Preconditions.checkArgument( + chunkSize >= 0, + "chunkSize < 0, which means serialization protocol is not same with deserialization protocol"); + Serializer keyReadSerializer = null; + for (byte i = 0; i < chunkSize; i++) { + Object key; + Object value; + if (!fury.trackingRef()) { + if (keyHasNull(header)) { + byte nullFlag = buffer.readByte(); + Preconditions.checkArgument(nullFlag == Fury.NULL_FLAG, "unexpected error"); + key = null; + } else { + if (!keyIsDifferentType(header)) { + if (keyReadSerializer == null) { + keyReadSerializer = + fury.getClassResolver() + .readClassInfo(buffer, keyClassInfoReadCache) + .getSerializer(); + } + key = keyReadSerializer.read(buffer); + } else { + key = fury.readNonRef(buffer, keyClassInfoReadCache); + } + } + } else { + if 
(keyHasNull(header)) { + byte nullFlag = buffer.readByte(); + Preconditions.checkArgument(nullFlag == Fury.NULL_FLAG, "unexpected error"); + key = null; + } else { + if (!keyIsDifferentType(header)) { + if (keyReadSerializer == null) { + keyReadSerializer = + fury.getClassResolver() + .readClassInfo(buffer, keyClassInfoReadCache) + .getSerializer(); + } + key = readNoNullRef(keyReadSerializer, buffer); + } else { + key = fury.readRef(buffer, keyClassInfoReadCache); + } + } + } + value = readFinalValue(buffer, header, valueSerializer); + map.put(key, value); + size--; + } + } + } + + private void javaChunkReadWithKVSerializers( + MemoryBuffer buffer, + Map map, + int size, + Serializer keySerializer, + Serializer valueSerializer) { + while (size > 0) { + byte chunkSize = buffer.readByte(); + byte header = buffer.readByte(); + Preconditions.checkArgument( + chunkSize >= 0, + "chunkSize < 0, which means serialization protocol is not same with deserialization protocol"); + for (byte i = 0; i < chunkSize; i++) { + Object key; + Object value; + key = readFinalKey(buffer, header, keySerializer); + value = readFinalValue(buffer, header, valueSerializer); + map.put(key, value); + size--; + } + } + } + + public Object readFinalKey(MemoryBuffer buffer, int header, Serializer keySerializer) { + boolean trackingKeyRef = keySerializer.needToWriteRef(); + if (!trackingKeyRef) { + if (keyHasNull(header)) { + byte nullFlag = buffer.readByte(); + Preconditions.checkArgument(nullFlag == Fury.NULL_FLAG, "unexpected NULL_FLAG"); + return null; + } else { + return keySerializer.read(buffer); + } + } else { + return fury.readRef(buffer, keySerializer); + } + } + + public Object readFinalValue(MemoryBuffer buffer, int header, Serializer valueSerializer) { + boolean trackingValueRef = valueSerializer.needToWriteRef(); + if (!trackingValueRef) { + if (valueHasNull(header)) { + byte flag = buffer.readByte(); + if (flag == Fury.NOT_NULL_VALUE_FLAG) { + return valueSerializer.read(buffer); + } 
else { + return null; + } + } else { + return valueSerializer.read(buffer); + } + } else { + return fury.readRef(buffer, valueSerializer); + } + } + + private void genericJavaChunkRead(Fury fury, MemoryBuffer buffer, Map map, int size) { + Generics generics = fury.getGenerics(); + GenericType genericType = generics.nextGenericType(); + if (genericType == null) { + generalJavaChunkRead(fury, buffer, map, size); + } else { + GenericType keyGenericType = genericType.getTypeParameter0(); + GenericType valueGenericType = genericType.getTypeParameter1(); + if (genericType.getTypeParametersCount() < 2) { + Tuple2 kvGenericType = getKVGenericType(genericType); + if (keyGenericType == objType && valueGenericType == objType) { + generalJavaChunkRead(fury, buffer, map, size); + return; + } + keyGenericType = kvGenericType.f0; + valueGenericType = kvGenericType.f1; + } + boolean keyGenericTypeFinal = keyGenericType.isMonomorphic(); + boolean valueGenericTypeFinal = valueGenericType.isMonomorphic(); + if (keyGenericTypeFinal && valueGenericTypeFinal) { + javaKVTypesFinalChunkRead( + fury, buffer, map, keyGenericType, valueGenericType, generics, size); + } else if (keyGenericTypeFinal) { + javaKeyTypeFinalChunkRead( + fury, buffer, map, keyGenericType, valueGenericType, generics, size); + } else if (valueGenericTypeFinal) { + javaValueTypeFinalChunkRead( + fury, buffer, map, keyGenericType, valueGenericType, generics, size); + } else { + javaKVTypesNonFinalChunkRead( + fury, buffer, map, keyGenericType, valueGenericType, generics, size); + } + generics.popGenericType(); + } + } + + private void javaKVTypesFinalChunkRead( + Fury fury, + MemoryBuffer buffer, + Map map, + GenericType keyGenericType, + GenericType valueGenericType, + Generics generics, + int size) { + Serializer keySerializer = keyGenericType.getSerializer(fury.getClassResolver()); + Serializer valueSerializer = valueGenericType.getSerializer(fury.getClassResolver()); + while (size > 0) { + byte chunkSize = 
buffer.readByte(); + byte header = buffer.readByte(); + Preconditions.checkArgument( + chunkSize >= 0, + "chunkSize < 0, which means serialization protocol is not same with deserialization protocol"); + for (byte i = 0; i < chunkSize; i++) { + Object key; + Object value; + generics.pushGenericType(keyGenericType); + key = readFinalKey(buffer, header, keySerializer); + generics.popGenericType(); + generics.pushGenericType(valueGenericType); + value = readFinalValue(buffer, header, valueSerializer); + generics.popGenericType(); + map.put(key, value); + size--; + } + } + } + private void javaKVTypesFinalRead( Fury fury, MemoryBuffer buffer, @@ -581,6 +1715,86 @@ private void javaKVTypesFinalRead( } } + private void javaKeyTypeFinalChunkRead( + Fury fury, + MemoryBuffer buffer, + Map map, + GenericType keyGenericType, + GenericType valueGenericType, + Generics generics, + int size) { + ClassResolver classResolver = fury.getClassResolver(); + boolean trackingValueRef = classResolver.needToWriteRef(valueGenericType.getCls()); + Serializer keySerializer = keyGenericType.getSerializer(fury.getClassResolver()); + while (size > 0) { + byte chunkSize = buffer.readByte(); + Preconditions.checkArgument( + chunkSize >= 0, + "chunkSize < 0, which means serialization protocol is not same with deserialization protocol"); + byte header = buffer.readByte(); + Serializer valueReadSerializer = null; + while (chunkSize > 0) { + generics.pushGenericType(keyGenericType); + Object key = readFinalKey(buffer, header, keySerializer); + generics.popGenericType(); + generics.pushGenericType(valueGenericType); + Object value; + if (!trackingValueRef) { + if (!valueIsDifferentType(header)) { + if (valueHasNull(header)) { + byte flag = buffer.readByte(); + if (flag == Fury.NOT_NULL_VALUE_FLAG) { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver.readClassInfo(buffer, valueClassInfoReadCache).getSerializer(); + } + value = valueReadSerializer.read(buffer); + } else { + 
value = null; + } + } else { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver.readClassInfo(buffer, valueClassInfoReadCache).getSerializer(); + } + value = valueReadSerializer.read(buffer); + } + } else { + value = fury.readNullable(buffer, valueClassInfoReadCache); + } + + } else { + if (!valueIsDifferentType(header)) { + if (valueHasNull(header)) { + byte flag = buffer.readByte(); + if (flag == Fury.NOT_NULL_VALUE_FLAG) { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver.readClassInfo(buffer, valueClassInfoReadCache).getSerializer(); + } + value = fury.readRef(buffer, valueReadSerializer); + } else { + value = null; + } + } else { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver.readClassInfo(buffer, valueClassInfoReadCache).getSerializer(); + } + value = readNoNullRef(valueReadSerializer, buffer); + } + } else { + value = fury.readRef(buffer, valueClassInfoReadCache); + } + } + generics.popGenericType(); + chunkSize--; + size--; + map.put(key, value); + } + } + } + private void javaKeyTypeFinalRead( Fury fury, MemoryBuffer buffer, @@ -605,6 +1819,74 @@ private void javaKeyTypeFinalRead( } } + private void javaValueTypeFinalChunkRead( + Fury fury, + MemoryBuffer buffer, + Map map, + GenericType keyGenericType, + GenericType valueGenericType, + Generics generics, + int size) { + boolean trackingKeyRef = fury.getClassResolver().needToWriteRef(keyGenericType.getCls()); + Serializer valueSerializer = valueGenericType.getSerializer(fury.getClassResolver()); + while (size > 0) { + byte chunkSize = buffer.readByte(); + Preconditions.checkArgument( + chunkSize >= 0, + "chunkSize < 0, which means serialization protocol is not same with deserialization protocol"); + byte header = buffer.readByte(); + Serializer keyReadSerializer = null; + while (chunkSize > 0) { + generics.pushGenericType(keyGenericType); + Object key; + if (!trackingKeyRef) { + if (keyHasNull(header)) { + byte nullFlag = 
buffer.readByte(); + Preconditions.checkArgument(nullFlag == Fury.NULL_FLAG, "unexpected error"); + key = null; + } else { + if (!keyIsDifferentType(header)) { + if (keyReadSerializer == null) { + keyReadSerializer = + fury.getClassResolver() + .readClassInfo(buffer, keyClassInfoReadCache) + .getSerializer(); + } + key = keyReadSerializer.read(buffer); + } else { + key = fury.readNonRef(buffer, keyClassInfoReadCache); + } + } + } else { + if (keyHasNull(header)) { + byte nullFlag = buffer.readByte(); + Preconditions.checkArgument(nullFlag == Fury.NULL_FLAG, "unexpected error"); + key = null; + } else { + if (!keyIsDifferentType(header)) { + if (keyReadSerializer == null) { + keyReadSerializer = + fury.getClassResolver() + .readClassInfo(buffer, keyClassInfoReadCache) + .getSerializer(); + } + key = readNoNullRef(keyReadSerializer, buffer); + } else { + key = fury.readRef(buffer, keyClassInfoReadCache); + } + } + } + generics.popGenericType(); + generics.pushGenericType(valueGenericType); + Object value = readFinalValue(buffer, header, valueSerializer); + generics.popGenericType(); + chunkSize--; + size--; + map.put(key, value); + } + } + } + private void javaValueTypeFinalRead( Fury fury, MemoryBuffer buffer, @@ -628,6 +1910,142 @@ private void javaValueTypeFinalRead( } } + private void javaKVTypesNonFinalChunkRead( + Fury fury, + MemoryBuffer buffer, + Map map, + GenericType keyGenericType, + GenericType valueGenericType, + Generics generics, + int size) { + ClassResolver classResolver = fury.getClassResolver(); + RefResolver refResolver = fury.getRefResolver(); + boolean trackingKeyRef = classResolver.needToWriteRef(keyGenericType.getCls()); + boolean trackingValueRef = classResolver.needToWriteRef(valueGenericType.getCls()); + while (size > 0) { + byte chunkSize = buffer.readByte(); + Preconditions.checkArgument( + chunkSize >= 0, + "chunkSize < 0, which means serialization protocol is not same with deserialization protocol"); + if (chunkSize == 0) { + while 
(size > 0) { + generics.pushGenericType(keyGenericType); + Object key = + readJavaRefOptimized( + fury, refResolver, trackingKeyRef, buffer, keyClassInfoReadCache); + generics.popGenericType(); + generics.pushGenericType(valueGenericType); + Object value = + readJavaRefOptimized( + fury, refResolver, trackingValueRef, buffer, valueClassInfoReadCache); + generics.popGenericType(); + map.put(key, value); + size--; + } + } else { + byte header = buffer.readByte(); + Serializer keyReadSerializer = null; + Serializer valueReadSerializer = null; + while (chunkSize > 0) { + generics.pushGenericType(keyGenericType); + Object key; + if (!trackingKeyRef) { + if (keyHasNull(header)) { + byte nullFlag = buffer.readByte(); + Preconditions.checkArgument(nullFlag == Fury.NULL_FLAG, "unexpected error"); + key = null; + } else { + if (!keyIsDifferentType(header)) { + if (keyReadSerializer == null) { + keyReadSerializer = + classResolver.readClassInfo(buffer, keyClassInfoReadCache).getSerializer(); + } + key = keyReadSerializer.read(buffer); + } else { + key = fury.readNonRef(buffer, keyClassInfoReadCache); + } + } + } else { + if (keyHasNull(header)) { + byte nullFlag = buffer.readByte(); + Preconditions.checkArgument(nullFlag == Fury.NULL_FLAG, "unexpected error"); + key = null; + } else { + if (!keyIsDifferentType(header)) { + if (keyReadSerializer == null) { + keyReadSerializer = + classResolver.readClassInfo(buffer, keyClassInfoReadCache).getSerializer(); + } + key = readNoNullRef(keyReadSerializer, buffer); + } else { + key = fury.readRef(buffer, keyClassInfoReadCache); + } + } + } + generics.popGenericType(); + generics.pushGenericType(valueGenericType); + Object value; + if (!trackingValueRef) { + if (!valueIsDifferentType(header)) { + if (valueHasNull(header)) { + byte flag = buffer.readByte(); + if (flag == Fury.NOT_NULL_VALUE_FLAG) { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver + .readClassInfo(buffer, valueClassInfoReadCache) + 
.getSerializer(); + } + value = valueReadSerializer.read(buffer); + } else { + value = null; + } + } else { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver.readClassInfo(buffer, valueClassInfoReadCache).getSerializer(); + } + value = valueReadSerializer.read(buffer); + } + } else { + value = fury.readNullable(buffer, valueClassInfoReadCache); + } + + } else { + if (!valueIsDifferentType(header)) { + if (valueHasNull(header)) { + byte flag = buffer.readByte(); + if (flag == Fury.NOT_NULL_VALUE_FLAG) { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver + .readClassInfo(buffer, valueClassInfoReadCache) + .getSerializer(); + } + value = fury.readRef(buffer, valueReadSerializer); + } else { + value = null; + } + } else { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver.readClassInfo(buffer, valueClassInfoReadCache).getSerializer(); + } + value = readNoNullRef(valueReadSerializer, buffer); + } + } else { + value = fury.readRef(buffer, valueClassInfoReadCache); + } + } + generics.popGenericType(); + chunkSize--; + size--; + map.put(key, value); + } + } + } + } + private void javaKVTypesNonFinalRead( Fury fury, MemoryBuffer buffer, @@ -654,6 +2072,149 @@ private void javaKVTypesNonFinalRead( } } + private void generalJavaChunkRead(Fury fury, MemoryBuffer buffer, Map map, int size) { + ClassResolver classResolver = fury.getClassResolver(); + boolean trackingRef = fury.trackingRef(); + while (size > 0) { + byte chunkSize = buffer.readByte(); + Preconditions.checkArgument( + chunkSize >= 0, + "chunkSize < 0, which means serialization protocol is not same with deserialization protocol"); + if (chunkSize == 0) { + while (size > 0) { + Object key = fury.readRef(buffer, keyClassInfoReadCache); + Object value = fury.readRef(buffer, keyClassInfoReadCache); + map.put(key, value); + size--; + } + } else { + byte header = buffer.readByte(); + Serializer keyReadSerializer = null; + Serializer 
valueReadSerializer = null; + while (chunkSize > 0) { + Object key; + if (!trackingRef) { + if (keyHasNull(header)) { + byte nullFlag = buffer.readByte(); + Preconditions.checkArgument(nullFlag == Fury.NULL_FLAG, "unexpected error"); + key = null; + } else { + if (!keyIsDifferentType(header)) { + if (keyReadSerializer == null) { + keyReadSerializer = + classResolver.readClassInfo(buffer, keyClassInfoReadCache).getSerializer(); + } + key = keyReadSerializer.read(buffer); + } else { + key = fury.readNonRef(buffer, keyClassInfoReadCache); + } + } + } else { + if (keyHasNull(header)) { + byte nullFlag = buffer.readByte(); + Preconditions.checkArgument(nullFlag == Fury.NULL_FLAG, "unexpected error"); + key = null; + } else { + if (!keyIsDifferentType(header)) { + if (keyReadSerializer == null) { + keyReadSerializer = + classResolver.readClassInfo(buffer, keyClassInfoReadCache).getSerializer(); + } + key = readNoNullRef(keyReadSerializer, buffer); + } else { + key = fury.readRef(buffer, keyClassInfoReadCache); + } + } + } + Object value; + if (!trackingRef) { + if (!valueIsDifferentType(header)) { + if (valueHasNull(header)) { + byte flag = buffer.readByte(); + if (flag == Fury.NOT_NULL_VALUE_FLAG) { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver + .readClassInfo(buffer, valueClassInfoReadCache) + .getSerializer(); + } + value = valueReadSerializer.read(buffer); + } else { + value = null; + } + } else { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver.readClassInfo(buffer, valueClassInfoReadCache).getSerializer(); + } + value = valueReadSerializer.read(buffer); + } + } else { + value = fury.readNullable(buffer, valueClassInfoReadCache); + } + + } else { + if (!valueIsDifferentType(header)) { + if (valueHasNull(header)) { + byte flag = buffer.readByte(); + if (flag == Fury.NOT_NULL_VALUE_FLAG) { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver + .readClassInfo(buffer, 
valueClassInfoReadCache) + .getSerializer(); + } + value = fury.readRef(buffer, valueReadSerializer); + } else { + value = null; + } + } else { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver.readClassInfo(buffer, valueClassInfoReadCache).getSerializer(); + } + value = readNoNullRef(valueReadSerializer, buffer); + } + } else { + value = fury.readRef(buffer, valueClassInfoReadCache); + } + } + chunkSize--; + size--; + map.put(key, value); + } + } + } + } + + private boolean keyHasNull(int header) { + return (header & MapFlags.KEY_HAS_NULL) == MapFlags.KEY_HAS_NULL; + } + + private boolean keyIsDifferentType(int header) { + return (header & MapFlags.KEY_NOT_SAME_TYPE) == MapFlags.KEY_NOT_SAME_TYPE; + } + + private boolean valueIsDifferentType(int header) { + return (header & MapFlags.VALUE_NOT_SAME_TYPE) == MapFlags.VALUE_NOT_SAME_TYPE; + } + + private Object readNoNullRef(Serializer serializer, MemoryBuffer memoryBuffer) { + if (serializer.needToWriteRef()) { + final RefResolver refResolver = fury.getRefResolver(); + int nextReadRefId = refResolver.tryPreserveRefId(memoryBuffer); + if (nextReadRefId >= Fury.NOT_NULL_VALUE_FLAG) { + Object obj = serializer.read(memoryBuffer); + refResolver.setReadObject(nextReadRefId, obj); + return obj; + } else { + return refResolver.getReadObject(); + } + } else { + return serializer.read(memoryBuffer); + } + } + private void generalJavaRead(Fury fury, MemoryBuffer buffer, Map map, int size) { for (int i = 0; i < size; i++) { Object key = fury.readRef(buffer, keyClassInfoReadCache); @@ -734,6 +2295,14 @@ public final boolean supportCodegenHook() { return supportCodegenHook; } + public boolean isUseChunkSerialize() { + return useChunkSerialize; + } + + public void setUseChunkSerialize(boolean useChunkSerialize) { + this.useChunkSerialize = useChunkSerialize; + } + /** * Write data except size and elements. 
/**
 * Bit masks that are tested against the per-chunk header byte of a chunk-serialized map.
 *
 * <p>Each flag occupies its own bit, so several flags can be combined in one header with
 * bitwise OR and tested with bitwise AND. The fields are {@code final} so that they are true
 * constants and cannot be reassigned at runtime.
 */
public class MapFlags {

  /** Whether track key ref. */
  public static final int TRACKING_KEY_REF = 0b1;

  /** Whether key has null. */
  public static final int KEY_HAS_NULL = 0b10;

  // Disabled: "whether key is not declare type".
  // public static int KEY_NOT_DECL_TYPE = 0b100;

  /** Whether keys type are different. */
  public static final int KEY_NOT_SAME_TYPE = 0b100;

  /** Whether track value ref. */
  public static final int TRACKING_VALUE_REF = 0b1000;

  /** Whether value has null. */
  public static final int VALUE_HAS_NULL = 0b10000;

  // Disabled: "whether value is not declare type".
  // public static int VALUE_NOT_DECL_TYPE = 0b1000000;

  /** Whether values type are different. */
  public static final int VALUE_NOT_SAME_TYPE = 0b100000;
}
Map differentKeyAndValueTypeMap = createDifferentKeyAndValueTypeMap(); + final Serializer serializer = + fury.getSerializer(differentKeyAndValueTypeMap.getClass()); + MapSerializers.HashMapSerializer mapSerializer = (MapSerializers.HashMapSerializer) serializer; + mapSerializer.setUseChunkSerialize(true); + final byte[] serialize = fury.serialize(differentKeyAndValueTypeMap); + final Object deserialize = fury.deserialize(serialize); + assertEquals(deserialize, differentKeyAndValueTypeMap); + } + + @Test(dataProvider = "referenceTrackingConfig") + public void testMapFieldsChunkSerializer(boolean referenceTrackingConfig) { + Fury fury = + Fury.builder() + .withRefTracking(referenceTrackingConfig) + .requireClassRegistration(false) + .build(); + final MapFields mapFieldsObject = createMapFieldsObject(); + // hashmap + final Serializer serializer = fury.getSerializer(HashMap.class); + MapSerializers.HashMapSerializer mapSerializer = (MapSerializers.HashMapSerializer) serializer; + mapSerializer.setUseChunkSerialize(true); + + // LinkedHashMap + final Serializer serializer1 = fury.getSerializer(LinkedHashMap.class); + MapSerializers.LinkedHashMapSerializer linkedHashMapSerializer = + (MapSerializers.LinkedHashMapSerializer) serializer1; + linkedHashMapSerializer.setUseChunkSerialize(true); + + // TreeMap + final Serializer serializer2 = fury.getSerializer(TreeMap.class); + MapSerializers.SortedMapSerializer sortedMapSerializer = + (MapSerializers.SortedMapSerializer) serializer2; + sortedMapSerializer.setUseChunkSerialize(true); + + // ConcurrentHashMap + final Serializer serializer3 = fury.getSerializer(ConcurrentHashMap.class); + MapSerializers.ConcurrentHashMapSerializer concurrentHashMapSerializer = + (MapSerializers.ConcurrentHashMapSerializer) serializer3; + concurrentHashMapSerializer.setUseChunkSerialize(true); + + // ConcurrentSkipListMap + final Serializer serializer4 = + fury.getSerializer(ConcurrentSkipListMap.class); + 
MapSerializers.ConcurrentSkipListMapSerializer concurrentSkipListMapSerializer = + (MapSerializers.ConcurrentSkipListMapSerializer) serializer4; + concurrentSkipListMapSerializer.setUseChunkSerialize(true); + + final Serializer serializer5 = fury.getSerializer(EnumMap.class); + MapSerializers.EnumMapSerializer enumMapSerializer = + (MapSerializers.EnumMapSerializer) serializer5; + enumMapSerializer.setUseChunkSerialize(true); + + final byte[] serialize = fury.serialize(mapFieldsObject); + final Object deserialize = fury.deserialize(serialize); + assertEquals(deserialize, mapFieldsObject); + } + + private static Map createDifferentKeyAndValueTypeMap() { + Map map = new HashMap<>(); + map.put(null, "1"); + map.put(2, "1"); + map.put(4, "1"); + map.put(6, "1"); + map.put(7, "1"); + map.put(10, "1"); + map.put(12, "null"); + map.put(19, "null"); + map.put(11, null); + map.put(20, null); + map.put(21, 9); + map.put(22, 99); + map.put(291, 900); + map.put("292", 900); + map.put("293", 900); + map.put("23", 900); + return map; + } }