diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index dfa6118c27cbc3..4696f2397b10ed 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -40,6 +40,7 @@ import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.IOException; import java.time.Clock; @@ -47,6 +48,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.Enumeration; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -75,6 +77,7 @@ import lombok.Getter; import lombok.NoArgsConstructor; import lombok.ToString; +import org.apache.bookkeeper.client.AsyncCallback; import org.apache.bookkeeper.client.AsyncCallback.CloseCallback; import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; import org.apache.bookkeeper.client.BKException; @@ -244,6 +247,8 @@ public class ManagedCursorImpl implements ManagedCursor { // active state cache in ManagedCursor. It should be in sync with the state in activeCursors in ManagedLedger. 
private volatile boolean isActive = false; + protected int maxPositionChunkSize = 1024 * 1024; + static class MarkDeleteEntry { final PositionImpl newPosition; final MarkDeleteCallback callback; @@ -581,71 +586,9 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac // Read the last entry in the ledger long lastEntryInLedger = lh.getLastAddConfirmed(); - - if (lastEntryInLedger < 0) { - log.warn("[{}] Error reading from metadata ledger {} for cursor {}: No entries in ledger", - ledger.getName(), ledgerId, name); - // Rewind to last cursor snapshot available - initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback); - return; - } - - lh.asyncReadEntries(lastEntryInLedger, lastEntryInLedger, (rc1, lh1, seq, ctx1) -> { - if (log.isDebugEnabled()) { - log.debug("[{}} readComplete rc={} entryId={}", ledger.getName(), rc1, lh1.getLastAddConfirmed()); - } - if (isBkErrorNotRecoverable(rc1)) { - log.error("[{}] Error reading from metadata ledger {} for cursor {}: {}", ledger.getName(), - ledgerId, name, BKException.getMessage(rc1)); - // Rewind to oldest entry available - initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback); - return; - } else if (rc1 != BKException.Code.OK) { - log.warn("[{}] Error reading from metadata ledger {} for cursor {}: {}", ledger.getName(), - ledgerId, name, BKException.getMessage(rc1)); - - callback.operationFailed(createManagedLedgerException(rc1)); - return; - } - - LedgerEntry entry = seq.nextElement(); - mbean.addReadCursorLedgerSize(entry.getLength()); - PositionInfo positionInfo; - try { - byte[] data = entry.getEntry(); - data = decompressDataIfNeeded(data, lh); - positionInfo = PositionInfo.parseFrom(data); - } catch (InvalidProtocolBufferException e) { - callback.operationFailed(new ManagedLedgerException(e)); - return; - } - log.info("[{}] Cursor {} recovered to position {}", ledger.getName(), name, positionInfo); - - Map 
recoveredProperties = Collections.emptyMap(); - if (positionInfo.getPropertiesCount() > 0) { - // Recover properties map - recoveredProperties = new HashMap<>(); - for (int i = 0; i < positionInfo.getPropertiesCount(); i++) { - LongProperty property = positionInfo.getProperties(i); - recoveredProperties.put(property.getName(), property.getValue()); - } - } - - log.info("[{}] Cursor {} recovered with recoveredProperties {}, individualDeletedMessagesCount {}", - ledger.getName(), name, recoveredProperties, positionInfo.getIndividualDeletedMessagesCount()); - - PositionImpl position = new PositionImpl(positionInfo); - if (positionInfo.getIndividualDeletedMessagesCount() > 0) { - recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList()); - } - if (getConfig().isDeletionAtBatchIndexLevelEnabled() - && positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) { - recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList()); - } - recoveredCursor(position, recoveredProperties, cursorProperties, lh); - callback.operationComplete(); - }, null); + recoverFromLedgerByEntryId(info, callback, lh, lastEntryInLedger); }; + try { bookkeeper.asyncOpenLedger(ledgerId, digestType, getConfig().getPassword(), openCallback, null); @@ -656,6 +599,101 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac } } + private void recoverFromLedgerByEntryId(ManagedCursorInfo info, + VoidCallback callback, + LedgerHandle lh, + long entryId) { + long ledgerId = lh.getId(); + + if (entryId < 0) { + log.warn("[{}] Error reading from metadata ledger {} for cursor {}: No valid entries in ledger", + ledger.getName(), ledgerId, name); + // Rewind to last cursor snapshot available + initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback); + return; + } + + lh.asyncReadEntries(entryId, entryId, (rc1, lh1, seq, ctx1) -> { + if (log.isDebugEnabled()) { + log.debug("[{}} readComplete rc={} entryId={}", 
ledger.getName(), rc1, lh1.getLastAddConfirmed()); + } + if (isBkErrorNotRecoverable(rc1)) { + log.error("[{}] Error reading from metadata ledger {} for cursor {}: {}", ledger.getName(), + ledgerId, name, BKException.getMessage(rc1)); + // Rewind to oldest entry available + initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback); + return; + } else if (rc1 != BKException.Code.OK) { + log.warn("[{}] Error reading from metadata ledger {} for cursor {}: {}", ledger.getName(), + ledgerId, name, BKException.getMessage(rc1)); + + callback.operationFailed(createManagedLedgerException(rc1)); + return; + } + + LedgerEntry entry = seq.nextElement(); + byte[] data = entry.getEntry(); + try { + ChunkSequenceFooter chunkSequenceFooter = parseChunkSequenceFooter(data); + if (chunkSequenceFooter.numParts > 0) { + readChunkSequence(callback, lh, entryId, chunkSequenceFooter); + } else { + Throwable res = tryCompleteCursorRecovery(lh, data); + if (res == null) { + callback.operationComplete(); + } else { + log.warn("[{}] Error recovering from metadata ledger {} entry {} for cursor {}. " + + "Will try recovery from previous entry.", + ledger.getName(), ledgerId, entryId, name, res); + //try recovery from previous entry + recoverFromLedgerByEntryId(info, callback, lh, entryId - 1); + } + } + } catch (IOException error) { + log.error("Cannot parse footer", error); + log.warn("[{}] Error recovering from metadata ledger {} entry {} for cursor {}, cannot parse footer. 
" + + "Will try recovery from previous entry.", + ledger.getName(), ledgerId, entryId, name, error); + recoverFromLedgerByEntryId(info, callback, lh, entryId - 1); + } + }, null); + } +
+    /**
+     * Reads the {@code numParts} entries that immediately precede a chunk-sequence footer,
+     * concatenates their payloads and attempts cursor recovery from the reassembled bytes.
+     *
+     * <p>The callback is completed exactly once: with a failure when the read returns a non-OK
+     * code, when the reassembled size differs from the footer's {@code length}, or when
+     * {@code tryCompleteCursorRecovery} reports an error; with success otherwise.
+     *
+     * @param callback notified of the outcome of the recovery attempt
+     * @param lh handle of the cursor ledger to read the chunks from
+     * @param footerPosition entry id of the footer; the chunks occupy the entries right before it
+     * @param chunkSequenceFooter parsed footer carrying the chunk count and the total byte length
+     */
+    private void readChunkSequence(VoidCallback callback, LedgerHandle lh,
+                                   long footerPosition, ChunkSequenceFooter chunkSequenceFooter) {
+        long startPos = footerPosition - chunkSequenceFooter.numParts;
+        long endPos = footerPosition - 1;
+        log.info("readChunkSequence from pos {}, num parts {}, startPos {}, endPos {}",
+                footerPosition, chunkSequenceFooter.numParts, startPos, endPos);
+        lh.asyncReadEntries(startPos, endPos, new AsyncCallback.ReadCallback() {
+            @Override
+            public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> entries, Object ctx) {
+                // Do not consume the entries unless the read succeeded; fail the callback instead.
+                if (rc != BKException.Code.OK) {
+                    log.warn("[{}] Error reading chunks {}-{} from metadata ledger {} for cursor {}: {}",
+                            ledger.getName(), startPos, endPos, lh.getId(), name, BKException.getMessage(rc));
+                    callback.operationFailed(createManagedLedgerException(rc));
+                    return;
+                }
+                ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+                entries.asIterator().forEachRemaining(entry -> {
+                    log.info("pos {} len {} bytes ", entry.getEntryId(), entry.getLength());
+                    byte[] chunk = entry.getEntry();
+                    // write(byte[], int, int) declares no IOException, unlike write(byte[]).
+                    buffer.write(chunk, 0, chunk.length);
+                });
+                byte[] result = buffer.toByteArray();
+                log.info("Read {} chunks, total of {} bytes, expected {} bytes", chunkSequenceFooter.numParts,
+                        result.length, chunkSequenceFooter.length);
+                if (result.length != chunkSequenceFooter.length) {
+                    callback.operationFailed(ManagedLedgerException.getManagedLedgerException(new IOException(
+                            "Expected " + chunkSequenceFooter.length + " bytes but read " + result.length + " bytes")));
+                    // Stop here so the callback is not completed a second time below.
+                    return;
+                }
+                Throwable res = tryCompleteCursorRecovery(lh, result);
+                if (res == null) {
+                    callback.operationComplete();
+                } else {
+                    callback.operationFailed(new ManagedLedgerException(res));
+                }
+            }
+        }, null);
+    }
+
 @AllArgsConstructor @NoArgsConstructor @Getter @@ -675,7 +713,7 @@ private ChunkSequenceFooter parseChunkSequenceFooter(byte[] data) throws IOExcep return ObjectMapperFactory.getMapper().getObjectMapper().readValue(data, ChunkSequenceFooter.class); } - private void completeCursorRecovery(VoidCallback 
callback, LedgerHandle lh, byte[] data) { + private Throwable tryCompleteCursorRecovery(LedgerHandle lh, byte[] data) { mbean.addReadCursorLedgerSize(data.length); try { @@ -683,8 +721,7 @@ private void completeCursorRecovery(VoidCallback callback, LedgerHandle lh, byt } catch (Throwable e) { log.error("[{}] Failed to decompress position info from ledger {} for cursor {}: {}", ledger.getName(), lh.getId(), name, e); - callback.operationFailed(new ManagedLedgerException(e)); - return; + return e; } PositionInfo positionInfo; @@ -693,8 +730,7 @@ private void completeCursorRecovery(VoidCallback callback, LedgerHandle lh, byt } catch (InvalidProtocolBufferException e) { log.error("[{}] Failed to parse position info from ledger {} for cursor {}: {}", ledger.getName(), lh.getId(), name, e); - callback.operationFailed(new ManagedLedgerException(e)); - return; + return e; } Map recoveredProperties = Collections.emptyMap(); @@ -716,7 +752,7 @@ private void completeCursorRecovery(VoidCallback callback, LedgerHandle lh, byt recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList()); } recoveredCursor(position, recoveredProperties, cursorProperties, lh); - callback.operationComplete(); + return null; } private void recoverIndividualDeletedMessages(List individualDeletedMessagesList) { @@ -3282,6 +3318,7 @@ private void buildBatchEntryDeletionIndexInfoList( } void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, final VoidCallback callback) { + checkArgument(maxPositionChunkSize > 0, "maxPositionChunkSize must be greater than zero"); long now = System.nanoTime(); PositionImpl position = mdEntry.newPosition; @@ -3302,10 +3339,9 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin long endCompress = System.nanoTime(); - int maxSize = 1024 * 1024; int offset = 0; final int len = data.readableBytes(); - int numParts = 1 + (len / maxSize); + int numParts = 1 + (len / maxPositionChunkSize); if 
(log.isDebugEnabled()) { log.debug("[{}] Cursor {} Appending to ledger={} position={} data size {} bytes, numParts {}", @@ -3328,7 +3364,7 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin int part = 0; while (part != numParts) { int remaining = len - offset; - int currentLen = Math.min(maxSize, remaining); + int currentLen = Math.min(maxPositionChunkSize, remaining); boolean isLast = part == numParts - 1; if (log.isDebugEnabled()) { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 767d3c5e2e08b4..92f2927f4c9d59 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -75,6 +75,7 @@ import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.LedgerEntry; +import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; @@ -98,6 +99,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.IntRange; import org.apache.pulsar.common.util.FutureUtil; @@ -3618,6 +3620,82 @@ public void operationFailed(ManagedLedgerException exception) { assertEquals(c.getReadPosition(), readPositionBeforeRecover); assertEquals(c.getNumberOfEntries(), 2L); } + + @Test(timeOut = 20000) + public void testRecoverCursorCorruptLastEntry() 
throws Exception { + ManagedLedger ml = factory.open("testRecoverCursorCorruptLastEntry"); + ManagedCursorImpl c = (ManagedCursorImpl) ml.openCursor("sub", CommandSubscribe.InitialPosition.Latest); + // force chunking + c.maxPositionChunkSize = 2; + + // A new cursor starts out with these values. The rest of the test assumes this, so we assert it here. + assertEquals(c.getMarkDeletedPosition().getEntryId(), -1); + assertEquals(c.getReadPosition().getEntryId(), 0); + assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1); + + c.resetCursor(PositionImpl.LATEST); + + // A reset cursor starts out with these values. The rest of the test assumes this, so we assert it here. + assertEquals(c.getMarkDeletedPosition().getEntryId(), -1); + assertEquals(c.getReadPosition().getEntryId(), 0); + assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1); + + // Trigger the lastConfirmedEntry to move forward + ml.addEntry(new byte[1]); + ml.addEntry(new byte[1]); + ml.addEntry(new byte[1]); + ml.addEntry(new byte[1]); + + c.resetCursor(PositionImpl.LATEST); + //corrupt last entry + LedgerHandle cursorLedger = (LedgerHandle)FieldUtils.readDeclaredField(c, "cursorLedger", true); + // can't parse json + cursorLedger.addEntry("{{".getBytes()); + // can't parse PositionInfo protobuf + cursorLedger.addEntry("aa".getBytes()); + + assertEquals(c.getMarkDeletedPosition().getEntryId(), 3); + assertEquals(c.getReadPosition().getEntryId(), 4); + assertEquals(ml.getLastConfirmedEntry().getEntryId(), 3); + + // Publish messages to move the lastConfirmedEntry field forward + ml.addEntry(new byte[1]); + ml.addEntry(new byte[1]); + + final Position markDeleteBeforeRecover = c.getMarkDeletedPosition(); + final Position readPositionBeforeRecover = c.getReadPosition(); + + ManagedCursorInfo info = ManagedCursorInfo.newBuilder() + .setCursorsLedgerId(c.getCursorLedger()) + .setMarkDeleteLedgerId(markDeleteBeforeRecover.getLedgerId()) + .setMarkDeleteEntryId(markDeleteBeforeRecover.getEntryId()) + 
.setLastActive(0L) + .build(); + + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean failed = new AtomicBoolean(false); + c.recoverFromLedger(info, new VoidCallback() { + @Override + public void operationComplete() { + latch.countDown(); + } + + @Override + public void operationFailed(ManagedLedgerException exception) { + failed.set(true); + latch.countDown(); + } + }); + + latch.await(); + if (failed.get()) { + fail("Cursor recovery should not fail"); + } + assertEquals(c.getMarkDeletedPosition(), markDeleteBeforeRecover); + assertEquals(c.getReadPosition(), readPositionBeforeRecover); + assertEquals(c.getNumberOfEntries(), 2L); + } + @Test void testAlwaysInactive() throws Exception { ManagedLedger ml = factory.open("testAlwaysInactive"); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 4f521f1e99e91d..89728b435dcc84 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -3256,7 +3256,7 @@ public void testManagedLedgerWithAddEntryTimeOut() throws Exception { class MockLedgerHandle extends PulsarMockLedgerHandle { public MockLedgerHandle(PulsarMockBookKeeper bk, long id, DigestType digest, byte[] passwd) throws GeneralSecurityException { - super(bk, id, digest, passwd); + super(bk, id, digest, passwd, null); } @Override