Skip to content

Commit

Permalink
fix(java): fix big buffer trunc (#1402)
Browse files Browse the repository at this point in the history
fix big buffer trunc introduced in #1397, which reset buffer before
copying data from it
  • Loading branch information
chaokunyang authored Mar 11, 2024
1 parent 27b6249 commit c4c65b5
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 13 deletions.
40 changes: 27 additions & 13 deletions java/fury-core/src/main/java/org/apache/fury/Fury.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,15 +193,19 @@ public byte[] serialize(Object obj) {
MemoryBuffer buf = getBuffer();
buf.writerIndex(0);
serialize(buf, obj, null);
return buf.getBytes(0, buf.writerIndex());
byte[] bytes = buf.getBytes(0, buf.writerIndex());
resetBuffer();
return bytes;
}

/** Return serialized <code>obj</code> as a byte array. */
public byte[] serialize(Object obj, BufferCallback callback) {
MemoryBuffer buf = getBuffer();
buf.writerIndex(0);
serialize(buf, obj, callback);
return buf.getBytes(0, buf.writerIndex());
byte[] bytes = buf.getBytes(0, buf.writerIndex());
resetBuffer();
return bytes;
}

@Override
Expand Down Expand Up @@ -262,12 +266,13 @@ public void serialize(OutputStream outputStream, Object obj, BufferCallback call
buf.writerIndex(0);
buf.writeInt(-1);
serialize(buf, obj, callback);

buf.putInt(0, buf.writerIndex() - 4);
try {
outputStream.write(buf.getBytes(0, buf.writerIndex()));
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
resetBuffer();
}
}

Expand Down Expand Up @@ -296,6 +301,13 @@ private MemoryBuffer getBuffer() {
return buf;
}

private void resetBuffer() {
MemoryBuffer buf = buffer;
if (buf != null && buf.size() > BUFFER_SIZE_LIMIT) {
buffer = MemoryBuffer.newHeapBuffer(BUFFER_SIZE_LIMIT);
}
}

private void write(MemoryBuffer buffer, Object obj) {
if (config.shareMetaContext()) {
int startOffset = buffer.writerIndex();
Expand Down Expand Up @@ -752,6 +764,8 @@ public Object deserialize(InputStream inputStream, Iterable<MemoryBuffer> outOfB
return deserialize(buf, outOfBandBuffers);
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
resetBuffer();
}
}

Expand Down Expand Up @@ -974,7 +988,9 @@ public byte[] serializeJavaObject(Object obj) {
MemoryBuffer buf = getBuffer();
buf.writerIndex(0);
serializeJavaObject(buf, obj);
return buf.getBytes(0, buf.writerIndex());
byte[] bytes = buf.getBytes(0, buf.writerIndex());
resetBuffer();
return bytes;
}

@Override
Expand Down Expand Up @@ -1058,7 +1074,9 @@ public byte[] serializeJavaObjectAndClass(Object obj) {
MemoryBuffer buf = getBuffer();
buf.writerIndex(0);
serializeJavaObjectAndClass(buf, obj);
return buf.getBytes(0, buf.writerIndex());
byte[] bytes = buf.getBytes(0, buf.writerIndex());
resetBuffer();
return bytes;
}

/**
Expand Down Expand Up @@ -1146,6 +1164,8 @@ private void serializeToStream(OutputStream outputStream, Consumer<MemoryBuffer>
outputStream.flush();
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
resetBuffer();
}
}
}
Expand All @@ -1172,6 +1192,8 @@ private Object deserializeFromStream(
return o;
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
resetBuffer();
}
}

Expand Down Expand Up @@ -1218,10 +1240,6 @@ public void resetWrite() {
nativeObjects.clear();
bufferCallback = null;
depth = 0;
MemoryBuffer buf = buffer;
if (buf != null && buf.size() > BUFFER_SIZE_LIMIT) {
buffer = MemoryBuffer.newHeapBuffer(BUFFER_SIZE_LIMIT);
}
}

public void resetRead() {
Expand All @@ -1232,10 +1250,6 @@ public void resetRead() {
nativeObjects.clear();
peerOutOfBandEnabled = false;
depth = 0;
MemoryBuffer buf = buffer;
if (buf != null && buf.size() > BUFFER_SIZE_LIMIT) {
buffer = MemoryBuffer.newHeapBuffer(BUFFER_SIZE_LIMIT);
}
}

private void checkDepthForSerialization() {
Expand Down
41 changes: 41 additions & 0 deletions java/fury-core/src/test/java/org/apache/fury/FuryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.fury.type.Descriptor;
import org.apache.fury.util.DateTimeUtils;
import org.apache.fury.util.Platform;
import org.apache.fury.util.ReflectionUtils;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -622,4 +623,44 @@ public void testPkgAccessLevelParentClass() {
table.put("r", "c", 100);
serDeCheckSerializer(fury, table, "Codec");
}

@Test
public void testBufferReset() {
Fury fury = Fury.builder().withRefTracking(true).requireClassRegistration(false).build();
byte[] bytes = fury.serialize(new byte[1000 * 1000]);
checkBuffer(fury);
assertEquals(fury.deserialize(bytes), new byte[1000 * 1000]);
bytes = fury.serializeJavaObject(new byte[1000 * 1000]);
checkBuffer(fury);
assertEquals(fury.deserializeJavaObject(bytes, byte[].class), new byte[1000 * 1000]);

bytes = fury.serializeJavaObjectAndClass(new byte[1000 * 1000]);
checkBuffer(fury);
assertEquals(fury.deserializeJavaObjectAndClass(bytes), new byte[1000 * 1000]);

ByteArrayOutputStream bas = new ByteArrayOutputStream();
fury.serialize(bas, new byte[1000 * 1000]);
checkBuffer(fury);
Object o = fury.deserialize(new ByteArrayInputStream(bas.toByteArray()));
assertEquals(o, new byte[1000 * 1000]);

bas.reset();
fury.serializeJavaObject(bas, new byte[1000 * 1000]);
checkBuffer(fury);
o = fury.deserializeJavaObject(new ByteArrayInputStream(bas.toByteArray()), byte[].class);
assertEquals(o, new byte[1000 * 1000]);

bas.reset();
fury.serializeJavaObjectAndClass(bas, new byte[1000 * 1000]);
checkBuffer(fury);
o = fury.deserializeJavaObjectAndClass(new ByteArrayInputStream(bas.toByteArray()));
assertEquals(o, new byte[1000 * 1000]);
}

private void checkBuffer(Fury fury) {
Object buf = ReflectionUtils.getObjectFieldValue(fury, "buffer");
MemoryBuffer buffer = (MemoryBuffer) buf;
assert buffer != null;
assertTrue(buffer.size() < 1000 * 1000);
}
}

0 comments on commit c4c65b5

Please sign in to comment.