Skip to content

Commit

Permalink
[improve][broker] PIP-327: Support force topic loading for unrecovera…
Browse files Browse the repository at this point in the history
…ble errors
  • Loading branch information
rdhabalia committed Dec 19, 2023
1 parent f970534 commit eaaee76
Show file tree
Hide file tree
Showing 7 changed files with 256 additions and 235 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class ManagedLedgerConfig {
private long retentionTimeMs = 0;
private long retentionSizeInMB = 0;
private boolean autoSkipNonRecoverableData;
private boolean ledgerForceRecovery;
private boolean lazyCursorRecovery = false;
private long metadataOperationsTimeoutSeconds = 60;
private long readEntryTimeoutSeconds = 120;
Expand Down Expand Up @@ -464,6 +465,17 @@ public void setAutoSkipNonRecoverableData(boolean skipNonRecoverableData) {
this.autoSkipNonRecoverableData = skipNonRecoverableData;
}

/**
* Skip managed ledger failure to recover managed ledger forcefully.
*/
public boolean isLedgerForceRecovery() {
return ledgerForceRecovery;
}

public void setLedgerForceRecovery(boolean ledgerForceRecovery) {
this.ledgerForceRecovery = ledgerForceRecovery;
}

/**
* @return max unacked message ranges that will be persisted and recovered.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public class ManagedCursorImpl implements ManagedCursor {

// Wether the current cursorLedger is read-only or writable
private boolean isCursorLedgerReadOnly = true;
private boolean ledgerForceRecovery;

// Stat of the cursor z-node
// NOTE: Don't update cursorLedgerStat alone,
Expand Down Expand Up @@ -328,6 +329,7 @@ public interface VoidCallback {
markDeleteLimiter = null;
}
this.mbean = new ManagedCursorMXBeanImpl(this);
this.ledgerForceRecovery = config.isLedgerForceRecovery();
}

private void updateCursorLedgerStat(ManagedCursorInfo cursorInfo, Stat stat) {
Expand Down Expand Up @@ -531,7 +533,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
if (log.isInfoEnabled()) {
log.info("[{}] Opened ledger {} for cursor {}. rc={}", ledger.getName(), ledgerId, name, rc);
}
if (isBkErrorNotRecoverable(rc)) {
if (isBkErrorNotRecoverable(rc) || ledgerForceRecovery) {
log.error("[{}] Error opening metadata ledger {} for cursor {}: {}", ledger.getName(), ledgerId, name,
BKException.getMessage(rc));
// Rewind to oldest entry available
Expand Down Expand Up @@ -559,7 +561,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
if (log.isDebugEnabled()) {
log.debug("[{}} readComplete rc={} entryId={}", ledger.getName(), rc1, lh1.getLastAddConfirmed());
}
if (isBkErrorNotRecoverable(rc1)) {
if (isBkErrorNotRecoverable(rc1) || ledgerForceRecovery) {
log.error("[{}] Error reading from metadata ledger {} for cursor {}: {}", ledger.getName(),
ledgerId, name, BKException.getMessage(rc1));
// Rewind to oldest entry available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -68,10 +69,13 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Cleanup;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
Expand All @@ -94,6 +98,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.IntRange;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -4484,5 +4489,58 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
ledger.close();
}

@Test
void testForceCursorRecovery() throws Exception {
ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
TestPulsarMockBookKeeper bk = new TestPulsarMockBookKeeper(executor);
factory = new ManagedLedgerFactoryImpl(metadataStore, bk);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setLedgerForceRecovery(true);
ManagedLedger ledger = factory.open("my_test_ledger", config);
ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
ledger.addEntry("entry-1".getBytes(Encoding));
long invalidLedger = -1L;
bk.setErrorCodeMap(invalidLedger, BKException.Code.BookieHandleNotAvailableException);
ManagedCursorInfo info = ManagedCursorInfo.newBuilder().setCursorsLedgerId(invalidLedger).build();
CountDownLatch latch = new CountDownLatch(1);
MutableBoolean recovered = new MutableBoolean(false);
VoidCallback callback = new VoidCallback() {
@Override
public void operationComplete() {
recovered.setValue(true);
latch.countDown();
}

@Override
public void operationFailed(ManagedLedgerException exception) {
recovered.setValue(false);
latch.countDown();
}
};
c1.recoverFromLedger(info, callback);
latch.await();
assertTrue(recovered.booleanValue());
}

class TestPulsarMockBookKeeper extends PulsarMockBookKeeper {
Map<Long, Integer> ledgerErrors = new HashMap<>();

public TestPulsarMockBookKeeper(OrderedExecutor orderedExecutor) throws Exception {
super(orderedExecutor);
}

public void setErrorCodeMap(long ledgerId, int rc) {
ledgerErrors.put(ledgerId, rc);
}

public void asyncOpenLedger(final long lId, final DigestType digestType, final byte[] passwd,
final OpenCallback cb, final Object ctx) {
if (ledgerErrors.containsKey(lId)) {
cb.openComplete(ledgerErrors.get(lId), null, ctx);
}
super.asyncOpenLedger(lId, digestType, passwd, cb, ctx);
}
}

private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2103,6 +2103,12 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
+ " It helps when data-ledgers gets corrupted at bookkeeper and managed-cursor is stuck at that ledger."
)
private boolean autoSkipNonRecoverableData = false;
@FieldContext(
dynamic = true,
category = CATEGORY_STORAGE_ML,
doc = "Skip managed ledger failure to forcefully recover managed ledger."
)
private boolean managedLedgerForceRecovery = false;
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "operation timeout while updating managed-ledger metadata."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1858,6 +1858,7 @@ private CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull T
.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
managedLedgerConfig.setLedgerForceRecovery(serviceConfig.isManagedLedgerForceRecovery());
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
managedLedgerConfig.setInactiveLedgerRollOverTime(
serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), TimeUnit.SECONDS);
Expand Down
Loading

0 comments on commit eaaee76

Please sign in to comment.