Skip to content

Commit

Permalink
modify code
Browse files Browse the repository at this point in the history
  • Loading branch information
hening committed Aug 6, 2024
1 parent 55f8440 commit 3752339
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,23 +154,24 @@ protected final void chunkWriteElements(Fury fury, MemoryBuffer buffer, Map map)
this.keySerializer = null;
this.valueSerializer = null;
if (keySerializer != null && valueSerializer != null) {
javaWriteWithKVSerializers(fury, buffer, map, keySerializer, valueSerializer);
javaChunkWriteWithKVSerializers(fury, buffer, map, keySerializer, valueSerializer);
} else if (keySerializer != null) {
javaWriteWithKeySerializers(map, buffer, keySerializer);
javaChunkWriteWithKeySerializers(map, buffer, keySerializer);
} else if (valueSerializer != null) {
javaWriteWithValueSerializers(map, buffer, valueSerializer);
javaChunkWriteWithValueSerializers(map, buffer, valueSerializer);
} else {
genericJavaChunkWrite(fury, buffer, map);
}
}

private void javaWriteWithKeySerializers(Map map, MemoryBuffer buffer, Serializer keySerializer) {
private void javaChunkWriteWithKeySerializers(
Map map, MemoryBuffer buffer, Serializer keySerializer) {
MapChunkWriter mapChunkWriter = new MapChunkWriter(fury);
ClassResolver classResolver = fury.getClassResolver();
RefResolver refResolver = fury.getRefResolver();
for (Object object : map.entrySet()) {
Map.Entry entry = (Map.Entry) object;
mapChunkWriter = mapChunkWriter.next(entry.getKey(), entry.getValue(), buffer);
mapChunkWriter = mapChunkWriter.finalKeyNext(entry.getKey(), entry.getValue(), buffer);
mapChunkWriter.writeFinalKey(entry.getKey(), buffer, keySerializer);
Object value = entry.getValue();
mapChunkWriter.writeValue(
Expand All @@ -180,15 +181,15 @@ private void javaWriteWithKeySerializers(Map map, MemoryBuffer buffer, Serialize
mapChunkWriter.writeHeader(buffer);
}

private void javaWriteWithValueSerializers(
private void javaChunkWriteWithValueSerializers(
Map map, MemoryBuffer buffer, Serializer valueSerializer) {
MapChunkWriter mapChunkWriter = new MapChunkWriter(fury);
ClassResolver classResolver = fury.getClassResolver();
RefResolver refResolver = fury.getRefResolver();
for (Object object : map.entrySet()) {
Map.Entry entry = (Map.Entry) object;
Object key = entry.getKey();
mapChunkWriter = mapChunkWriter.next(entry.getKey(), entry.getValue(), buffer);
mapChunkWriter = mapChunkWriter.finalValueNext(entry.getKey(), entry.getValue(), buffer);
mapChunkWriter.writeKey(
key, buffer, classResolver, refResolver, fury.trackingRef(), keyClassInfoWriteCache);
mapChunkWriter.writeFinalValue(entry.getValue(), buffer, valueSerializer);
Expand All @@ -197,7 +198,7 @@ private void javaWriteWithValueSerializers(
mapChunkWriter.writeHeader(buffer);
}

private void javaWriteWithKVSerializers(
private void javaChunkWriteWithKVSerializers(
Fury fury,
MemoryBuffer buffer,
Map map,
Expand All @@ -208,14 +209,29 @@ private void javaWriteWithKVSerializers(
Map.Entry entry = (Map.Entry) object;
Object key = entry.getKey();
Object value = entry.getValue();
mapChunkWriter = mapChunkWriter.next(entry.getKey(), entry.getValue(), buffer);
mapChunkWriter = mapChunkWriter.finalKVNext(entry.getKey(), entry.getValue(), buffer);
mapChunkWriter.writeFinalKey(key, buffer, keySerializer);
mapChunkWriter.writeFinalValue(value, buffer, valueSerializer);
mapChunkWriter.increaseChunkSize();
}
mapChunkWriter.writeHeader(buffer);
}

private void javaWriteWithKVSerializers(
Fury fury,
MemoryBuffer buffer,
Map map,
Serializer keySerializer,
Serializer valueSerializer) {
for (Object object : map.entrySet()) {
Map.Entry entry = (Map.Entry) object;
Object key = entry.getKey();
Object value = entry.getValue();
fury.writeRef(buffer, key, keySerializer);
fury.writeRef(buffer, value, valueSerializer);
}
}

private void genericJavaWrite(Fury fury, MemoryBuffer buffer, Map map) {
Generics generics = fury.getGenerics();
GenericType genericType = generics.nextGenericType();
Expand Down Expand Up @@ -355,7 +371,7 @@ private void javaKVTypesFinalChunkWrite(
Map.Entry entry = (Map.Entry) object;
Object key = entry.getKey();
Object value = entry.getValue();
mapChunkWriter = mapChunkWriter.next(key, value, buffer);
mapChunkWriter = mapChunkWriter.finalKVNext(key, value, buffer);
generics.pushGenericType(keyGenericType);
mapChunkWriter.writeFinalKey(key, buffer, keySerializer);
generics.popGenericType();
Expand Down Expand Up @@ -412,7 +428,7 @@ private void javaKeyTypeFinalChunkWrite(
Map.Entry entry = (Map.Entry) object;
Object key = entry.getKey();
Object value = entry.getValue();
mapChunkWriter = mapChunkWriter.next(key, value, buffer);
mapChunkWriter = mapChunkWriter.finalKeyNext(key, value, buffer);
generics.pushGenericType(keyGenericType);
mapChunkWriter.writeFinalKey(key, buffer, keySerializer);
generics.popGenericType();
Expand Down Expand Up @@ -441,7 +457,7 @@ private void javaValueTypeFinalChunkWrite(
Map.Entry entry = (Map.Entry) object;
Object key = entry.getKey();
Object value = entry.getValue();
mapChunkWriter = mapChunkWriter.next(key, value, buffer);
mapChunkWriter = mapChunkWriter.finalValueNext(key, value, buffer);
generics.pushGenericType(keyGenericType);
mapChunkWriter.writeKey(
key, buffer, classResolver, refResolver, trackingKeyRef, keyClassInfoWriteCache);
Expand Down Expand Up @@ -885,8 +901,8 @@ private void javaKVTypesFinalChunkRead(
while (size > 0) {
byte chunkSize = buffer.readByte();
byte header = buffer.readByte();
mapChunkWriter.setKeySerializer(null);
mapChunkWriter.setValueSerializer(null);
mapChunkWriter.setKeyReadSerializer(null);
mapChunkWriter.setValueReadSerializer(null);
Preconditions.checkArgument(
chunkSize >= 0,
"chunkSize < 0, which means serialization protocol is not same with deserialization protocol");
Expand Down Expand Up @@ -944,8 +960,8 @@ private void javaKeyTypeFinalChunkRead(
chunkSize >= 0,
"chunkSize < 0, which means serialization protocol is not same with deserialization protocol");
byte header = buffer.readByte();
mapChunkWriter.setKeySerializer(null);
mapChunkWriter.setValueSerializer(null);
mapChunkWriter.setKeyReadSerializer(null);
mapChunkWriter.setValueReadSerializer(null);
while (chunkSize > 0) {
generics.pushGenericType(keyGenericType);
Object key = mapChunkWriter.readFinalKey(buffer, header, keySerializer);
Expand Down Expand Up @@ -1004,8 +1020,8 @@ private void javaValueTypeFinalChunkRead(
chunkSize >= 0,
"chunkSize < 0, which means serialization protocol is not same with deserialization protocol");
byte header = buffer.readByte();
mapChunkWriter.setKeySerializer(null);
mapChunkWriter.setValueSerializer(null);
mapChunkWriter.setKeyReadSerializer(null);
mapChunkWriter.setValueReadSerializer(null);
while (chunkSize > 0) {
generics.pushGenericType(keyGenericType);
Object key =
Expand Down Expand Up @@ -1080,8 +1096,8 @@ private void javaKVTypesNonFinalChunkRead(
}
} else {
byte header = buffer.readByte();
mapChunkWriter.setKeySerializer(null);
mapChunkWriter.setValueSerializer(null);
mapChunkWriter.setKeyReadSerializer(null);
mapChunkWriter.setValueReadSerializer(null);
while (chunkSize > 0) {
generics.pushGenericType(keyGenericType);
Object key =
Expand Down Expand Up @@ -1145,8 +1161,8 @@ private void generalJavaChunkRead(Fury fury, MemoryBuffer buffer, Map map, int s
}
} else {
byte header = buffer.readByte();
mapChunkWriter.setKeySerializer(null);
mapChunkWriter.setValueSerializer(null);
mapChunkWriter.setKeyReadSerializer(null);
mapChunkWriter.setValueReadSerializer(null);
while (chunkSize > 0) {
Object key =
mapChunkWriter.readKey(
Expand Down
Loading

0 comments on commit 3752339

Please sign in to comment.