From 1397fafb04429a4ce2509758622630b0aa0a775c Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Mon, 23 Sep 2024 15:50:40 -0700 Subject: [PATCH] removed usage of byte[] where possible --- .../mledger/impl/ManagedCursorImpl.java | 84 +++++++++++-------- .../mledger/impl/ManagedCursorTest.java | 6 +- 2 files changed, 50 insertions(+), 40 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 870a1ea224791..82d56f81ce3ae 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -39,10 +39,8 @@ import io.netty.buffer.ByteBufUtil; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.time.Clock; import java.util.ArrayDeque; import java.util.ArrayList; @@ -636,7 +634,7 @@ private void recoverFromLedgerByEntryId(ManagedCursorInfo info, } LedgerEntry entry = seq.nextElement(); - byte[] data = entry.getEntry(); + ByteBuf data = entry.getEntryBuffer(); try { ChunkSequenceFooter chunkSequenceFooter = parseChunkSequenceFooter(data); if (chunkSequenceFooter.numParts > 0) { @@ -672,23 +670,28 @@ private void readChunkSequence(VoidCallback callback, LedgerHandle lh, lh.asyncReadEntries(startPos, endPos, new AsyncCallback.ReadCallback() { @Override public void readComplete(int rc, LedgerHandle lh, Enumeration entries, Object ctx) { - ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + CompositeByteBuf buffer = PulsarByteBufAllocator.DEFAULT.compositeBuffer(); + + AtomicInteger readableBytes = new AtomicInteger(0); entries.asIterator().forEachRemaining(entry -> { - log.info("pos {} len {} bytes ", entry.getEntryId(), entry.getLength()); - try { - buffer.write(entry.getEntry()); - } catch (IOException err) { - throw new RuntimeException(err); + if (log.isInfoEnabled()) { + log.debug("pos {} len {} bytes ", entry.getEntryId(), entry.getLength()); } + ByteBuf part = entry.getEntryBuffer(); + buffer.addComponent(part); + readableBytes.addAndGet(part.readableBytes()); }); - byte[] result = buffer.toByteArray(); + buffer.writerIndex(readableBytes.get()) + .readerIndex(0); + log.info("Read {} chunks, total of {} bytes, expected {} bytes", chunkSequenceFooter.numParts, - result.length, chunkSequenceFooter.length); - if (result.length != chunkSequenceFooter.length) { + buffer.readableBytes(), chunkSequenceFooter.length); + if (buffer.readableBytes() != chunkSequenceFooter.length) { callback.operationFailed(ManagedLedgerException.getManagedLedgerException(new IOException( - "Expected " + chunkSequenceFooter.length + " bytes but read " + result.length + " bytes"))); + "Expected " + chunkSequenceFooter.length + " bytes but read " + + buffer.readableBytes() + " bytes"))); } - Throwable res = tryCompleteCursorRecovery(lh, result); + Throwable res = tryCompleteCursorRecovery(lh, buffer); if (res == null) { callback.operationComplete(); } else { @@ -709,20 +712,28 @@ public static final class ChunkSequenceFooter { private int length; } - private ChunkSequenceFooter parseChunkSequenceFooter(byte[] data) throws IOException { - if (data.length == 0 || data[0] != '{') { + private ChunkSequenceFooter parseChunkSequenceFooter(ByteBuf data) throws IOException { + // getChar() doesn't move the reader index + if (data.readableBytes() == 0 || data.getByte(0) != '{') { // this is not JSON return ChunkSequenceFooter.NOT_CHUNKED; } - return ObjectMapperFactory.getMapper().getObjectMapper().readValue(data, ChunkSequenceFooter.class); + + try { + return ObjectMapperFactory.getMapper().getObjectMapper() + .readValue(data.toString(StandardCharsets.UTF_8), ChunkSequenceFooter.class); + } catch (JsonProcessingException e) { + return ChunkSequenceFooter.NOT_CHUNKED; + } } - private Throwable tryCompleteCursorRecovery(LedgerHandle lh, byte[] data) { - mbean.addReadCursorLedgerSize(data.length); + private Throwable tryCompleteCursorRecovery(LedgerHandle lh, ByteBuf data) { + mbean.addReadCursorLedgerSize(data.readableBytes()); try { data = decompressDataIfNeeded(data, lh); } catch (Throwable e) { + data.release(); log.error("[{}] Failed to decompress position info from ledger {} for cursor {}: {}", ledger.getName(), lh.getId(), name, e); return e; @@ -730,11 +741,13 @@ private Throwable tryCompleteCursorRecovery(LedgerHandle lh, byte[] data) { PositionInfo positionInfo; try { - positionInfo = PositionInfo.parseFrom(data); + positionInfo = PositionInfo.parseFrom(data.nioBuffer()); } catch (InvalidProtocolBufferException e) { log.error("[{}] Failed to parse position info from ledger {} for cursor {}: {}", ledger.getName(), lh.getId(), name, e); return e; + } finally { + data.release(); } Map recoveredProperties = Collections.emptyMap(); @@ -3492,42 +3505,39 @@ private ByteBuf compressDataIfNeeded(ByteBuf data, LedgerHandle lh) { result.readerIndex(0) .writerIndex(4 + compressedSize); - int ratio = (int) (compressedSize * 100.0 / uncompressedSize); - log.info("[{}] Cursor {} Compressed data size {} bytes (with {}, original size {} bytes, ratio {}%)", - ledger.getName(), name, compressedSize, pulsarCursorInfoCompressionString, uncompressedSize, ratio); + if (log.isInfoEnabled()) { + int ratio = (int) (compressedSize * 100.0 / uncompressedSize); + log.info("[{}] Cursor {} Compressed data size {} bytes (with {}, original size {} bytes, ratio {}%)", + ledger.getName(), name, compressedSize, pulsarCursorInfoCompressionString, + uncompressedSize, ratio); + } return result; } finally { data.release(); } } - static byte[] decompressDataIfNeeded(byte[] data, LedgerHandle lh) { + static ByteBuf decompressDataIfNeeded(ByteBuf data, LedgerHandle lh) { byte[] pulsarCursorInfoCompression = lh.getCustomMetadata().get(METADATA_PROPERTY_CURSOR_COMPRESSION_TYPE); if (pulsarCursorInfoCompression != null) { String pulsarCursorInfoCompressionString = new String(pulsarCursorInfoCompression); if (log.isDebugEnabled()) { log.debug("Ledger {} compression {} decompressing {} bytes, full {}", - lh.getId(), pulsarCursorInfoCompressionString, data.length, + lh.getId(), pulsarCursorInfoCompressionString, data.readableBytes(), ByteBufUtil.prettyHexDump(Unpooled.wrappedBuffer(data))); } - ByteArrayInputStream input = new ByteArrayInputStream(data); - DataInputStream dataInputStream = new DataInputStream(input); try { - int uncompressedSize = dataInputStream.readInt(); - byte[] compressedData = dataInputStream.readAllBytes(); + // this moves readerIndex + int uncompressedSize = data.readInt(); CompressionCodec compressionCodec = CompressionCodecProvider.getCompressionCodec( CompressionType.valueOf(pulsarCursorInfoCompressionString)); - ByteBuf decode = compressionCodec.decode(Unpooled.wrappedBuffer(compressedData), uncompressedSize); - try { - return ByteBufUtil.getBytes(decode); - } finally { - decode.release(); - } + ByteBuf decode = compressionCodec.decode(data, uncompressedSize); + return decode; } catch (IOException | MalformedInputException error) { log.error("Cannot decompress cursor position using {}. Payload is {}", pulsarCursorInfoCompressionString, - ByteBufUtil.prettyHexDump(Unpooled.wrappedBuffer(data)), error); + ByteBufUtil.prettyHexDump(data), error); throw new RuntimeException(error); } } else { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index a46193e93292c..96c6a3e2cd42f 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -149,7 +149,7 @@ public void testCloseCursor() throws Exception { ledger.addEntry(new byte[]{3}); ledger.addEntry(new byte[]{4}); ledger.addEntry(new byte[]{5}); - // Persistent cursor info to ledger. + // Persist cursor info to ledger. c1.delete(PositionFactory.create(c1.getReadPosition().getLedgerId(), c1.getReadPosition().getEntryId())); Awaitility.await().until(() ->c1.getStats().getPersistLedgerSucceed() > 0); // Make cursor ledger can not work. @@ -3276,9 +3276,9 @@ public void operationFailed(MetaStoreException e) { try { LedgerEntry entry = seq.nextElement(); PositionInfo positionInfo; - byte[] data = entry.getEntry(); + ByteBuf data = entry.getEntryBuffer(); data = ManagedCursorImpl.decompressDataIfNeeded(data, lh); - positionInfo = PositionInfo.parseFrom(data); + positionInfo = PositionInfo.parseFrom(data.nioBuffer()); individualDeletedMessagesCount.set(positionInfo.getIndividualDeletedMessagesCount()); } catch (Exception e) { }