Skip to content

Commit

Permalink
[improve][broker] PIP-327: Support force topic loading for unrecovera…
Browse files Browse the repository at this point in the history
…ble errors
  • Loading branch information
rdhabalia committed Oct 3, 2024
1 parent eee9283 commit 97ccefa
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class ManagedLedgerConfig {
private long retentionTimeMs = 0;
private long retentionSizeInMB = 0;
private boolean autoSkipNonRecoverableData;
private boolean ledgerForceRecovery;
private boolean lazyCursorRecovery = false;
private long metadataOperationsTimeoutSeconds = 60;
private long readEntryTimeoutSeconds = 120;
Expand Down Expand Up @@ -465,6 +466,17 @@ public void setAutoSkipNonRecoverableData(boolean skipNonRecoverableData) {
this.autoSkipNonRecoverableData = skipNonRecoverableData;
}

/**
* Tells whether managed-ledger recovery should be forced through instead of failing on ledger errors.
*
* <p>The flag is {@code false} unless {@link #setLedgerForceRecovery(boolean)} has been called.
* {@code ManagedCursorImpl} copies it at construction time and checks it in {@code recoverFromLedger}
* alongside {@code isBkErrorNotRecoverable(rc)}: when either is true the cursor takes the
* "rewind to oldest entry available" path rather than failing recovery.
*
* <p>NOTE(review): as written, that check does not look at the return code when this flag is set -
* confirm that rewinding on a successful open/read is intended.
*
* @return {@code true} if force recovery is enabled
*/
public boolean isLedgerForceRecovery() {
return ledgerForceRecovery;
}

/**
* Enables or disables force recovery; see {@link #isLedgerForceRecovery()} for how the flag is used.
*
* @param ledgerForceRecovery {@code true} to skip ledger failures during recovery and recover forcefully
*/
public void setLedgerForceRecovery(boolean ledgerForceRecovery) {
this.ledgerForceRecovery = ledgerForceRecovery;
}

/**
* @return max unacked message ranges that will be persisted and recovered.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ public class ManagedCursorImpl implements ManagedCursor {

// Whether the current cursorLedger is read-only or writable
private boolean isCursorLedgerReadOnly = true;
private boolean ledgerForceRecovery;

// Stat of the cursor z-node
// NOTE: Don't update cursorLedgerStat alone,
Expand Down Expand Up @@ -332,6 +333,7 @@ public interface VoidCallback {
markDeleteLimiter = null;
}
this.mbean = new ManagedCursorMXBeanImpl(this);
this.ledgerForceRecovery = getConfig().isLedgerForceRecovery();
}

private void updateCursorLedgerStat(ManagedCursorInfo cursorInfo, Stat stat) {
Expand Down Expand Up @@ -547,7 +549,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
if (log.isInfoEnabled()) {
log.info("[{}] Opened ledger {} for cursor {}. rc={}", ledger.getName(), ledgerId, name, rc);
}
if (isBkErrorNotRecoverable(rc)) {
if (isBkErrorNotRecoverable(rc) || ledgerForceRecovery) {
log.error("[{}] Error opening metadata ledger {} for cursor {}: {}", ledger.getName(), ledgerId, name,
BKException.getMessage(rc));
// Rewind to oldest entry available
Expand Down Expand Up @@ -575,7 +577,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
if (log.isDebugEnabled()) {
log.debug("[{}} readComplete rc={} entryId={}", ledger.getName(), rc1, lh1.getLastAddConfirmed());
}
if (isBkErrorNotRecoverable(rc1)) {
if (isBkErrorNotRecoverable(rc1) || ledgerForceRecovery) {
log.error("[{}] Error reading from metadata ledger {} for cursor {}: {}", ledger.getName(),
ledgerId, name, BKException.getMessage(rc1));
// Rewind to oldest entry available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,14 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Cleanup;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
Expand All @@ -98,6 +101,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.IntRange;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -4538,7 +4542,6 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
ledger.close();
}


@Test
public void testReadEntriesWithSkipDeletedEntries() throws Exception {
@Cleanup
Expand Down Expand Up @@ -4795,5 +4798,58 @@ public void operationFailed(ManagedLedgerException exception) {
assertEquals(cursor.getReadPosition(), markDeletedPosition.getNext());
}

/**
 * Verifies that cursor recovery completes successfully when opening the cursor ledger fails
 * and {@code ledgerForceRecovery} is enabled on the managed-ledger config.
 */
@Test
void testForceCursorRecovery() throws Exception {
    // Mock BookKeeper client that lets the test inject a failure for a chosen ledger id.
    TestPulsarMockBookKeeper bk = new TestPulsarMockBookKeeper(executor);
    factory = new ManagedLedgerFactoryImpl(metadataStore, bk);
    ManagedLedgerConfig config = new ManagedLedgerConfig();
    config.setLedgerForceRecovery(true);
    ManagedLedger ledger = factory.open("my_test_ledger", config);
    ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
    ledger.addEntry("entry-1".getBytes(Encoding));

    // Point the cursor info at a ledger id whose open call is rigged to fail.
    long invalidLedger = -1L;
    bk.setErrorCodeMap(invalidLedger, BKException.Code.BookieHandleNotAvailableException);
    ManagedCursorInfo info = ManagedCursorInfo.newBuilder().setCursorsLedgerId(invalidLedger).build();

    CountDownLatch latch = new CountDownLatch(1);
    MutableBoolean recovered = new MutableBoolean(false);
    VoidCallback callback = new VoidCallback() {
        @Override
        public void operationComplete() {
            recovered.setValue(true);
            latch.countDown();
        }

        @Override
        public void operationFailed(ManagedLedgerException exception) {
            recovered.setValue(false);
            latch.countDown();
        }
    };
    c1.recoverFromLedger(info, callback);

    // Bound the wait so a callback that never fires fails the test instead of hanging the suite.
    assertTrue(latch.await(30, java.util.concurrent.TimeUnit.SECONDS));
    assertTrue(recovered.booleanValue());
}

class TestPulsarMockBookKeeper extends PulsarMockBookKeeper {
Map<Long, Integer> ledgerErrors = new HashMap<>();

public TestPulsarMockBookKeeper(OrderedExecutor orderedExecutor) throws Exception {
super(orderedExecutor);
}

public void setErrorCodeMap(long ledgerId, int rc) {
ledgerErrors.put(ledgerId, rc);
}

public void asyncOpenLedger(final long lId, final DigestType digestType, final byte[] passwd,
final OpenCallback cb, final Object ctx) {
if (ledgerErrors.containsKey(lId)) {
cb.openComplete(ledgerErrors.get(lId), null, ctx);
}
super.asyncOpenLedger(lId, digestType, passwd, cb, ctx);
}
}

private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2249,6 +2249,18 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
+ " It helps when data-ledgers gets corrupted at bookkeeper and managed-cursor is stuck at that ledger."
)
private boolean autoSkipNonRecoverableData = false;
// Broker-level switch for managed-ledger force recovery. Its value is copied into
// ManagedLedgerConfig#setLedgerForceRecovery when the broker builds a topic's managed-ledger config.
// Declared dynamic; defaults to false.
@FieldContext(
dynamic = true,
category = CATEGORY_STORAGE_ML,
doc = "Skip managed ledger failure to forcefully recover managed ledger."
)
private boolean managedLedgerForceRecovery = false;
// Broker-level switch for schema-ledger force recovery. The BookKeeper schema storage passes it to
// bkException(...), where a true value makes the resulting SchemaException non-recoverable.
// Declared dynamic; defaults to false.
@FieldContext(
dynamic = true,
category = CATEGORY_STORAGE_ML,
doc = "Skip schema ledger failure to forcefully recover topic successfully."
)
private boolean schemaLedgerForceRecovery = false;
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "operation timeout while updating managed-ledger metadata."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1970,6 +1970,7 @@ private CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull T
.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
managedLedgerConfig.setLedgerForceRecovery(serviceConfig.isManagedLedgerForceRecovery());
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
managedLedgerConfig.setInactiveLedgerRollOverTime(
serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ private CompletableFuture<SchemaStorageFormat.SchemaEntry> readSchemaEntry(

return openLedger(position.getLedgerId())
.thenCompose((ledger) ->
Functions.getLedgerEntry(ledger, position.getEntryId())
Functions.getLedgerEntry(ledger, position.getEntryId(), config.isSchemaLedgerForceRecovery())
.thenCompose(entry -> closeLedger(ledger)
.thenApply(ignore -> entry)
)
Expand Down Expand Up @@ -560,7 +560,8 @@ private CompletableFuture<Long> addEntry(LedgerHandle ledgerHandle, SchemaStorag
ledgerHandle.asyncAddEntry(entry.toByteArray(),
(rc, handle, entryId, ctx) -> {
if (rc != BKException.Code.OK) {
future.completeExceptionally(bkException("Failed to add entry", rc, ledgerHandle.getId(), -1));
future.completeExceptionally(bkException("Failed to add entry", rc, ledgerHandle.getId(), -1,
config.isSchemaLedgerForceRecovery()));
} else {
future.complete(entryId);
}
Expand All @@ -582,7 +583,8 @@ private CompletableFuture<LedgerHandle> createLedger(String schemaId) {
LedgerPassword,
(rc, handle, ctx) -> {
if (rc != BKException.Code.OK) {
future.completeExceptionally(bkException("Failed to create ledger", rc, -1, -1));
future.completeExceptionally(bkException("Failed to create ledger", rc, -1, -1,
config.isSchemaLedgerForceRecovery()));
} else {
future.complete(handle);
}
Expand All @@ -603,7 +605,8 @@ private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) {
LedgerPassword,
(rc, handle, ctx) -> {
if (rc != BKException.Code.OK) {
future.completeExceptionally(bkException("Failed to open ledger", rc, ledgerId, -1));
future.completeExceptionally(bkException("Failed to open ledger", rc, ledgerId, -1,
config.isSchemaLedgerForceRecovery()));
} else {
future.complete(handle);
}
Expand All @@ -617,7 +620,8 @@ private CompletableFuture<Void> closeLedger(LedgerHandle ledgerHandle) {
CompletableFuture<Void> future = new CompletableFuture<>();
ledgerHandle.asyncClose((rc, handle, ctx) -> {
if (rc != BKException.Code.OK) {
future.completeExceptionally(bkException("Failed to close ledger", rc, ledgerHandle.getId(), -1));
future.completeExceptionally(bkException("Failed to close ledger", rc, ledgerHandle.getId(), -1,
config.isSchemaLedgerForceRecovery()));
} else {
future.complete(null);
}
Expand Down Expand Up @@ -648,12 +652,14 @@ public CompletableFuture<List<Long>> getStoreLedgerIdsBySchemaId(String schemaId
}

interface Functions {
static CompletableFuture<LedgerEntry> getLedgerEntry(LedgerHandle ledger, long entry) {
static CompletableFuture<LedgerEntry> getLedgerEntry(LedgerHandle ledger, long entry,
boolean forceRecovery) {
final CompletableFuture<LedgerEntry> future = new CompletableFuture<>();
ledger.asyncReadEntries(entry, entry,
(rc, handle, entries, ctx) -> {
if (rc != BKException.Code.OK) {
future.completeExceptionally(bkException("Failed to read entry", rc, ledger.getId(), entry));
future.completeExceptionally(bkException("Failed to read entry", rc, ledger.getId(), entry,
forceRecovery));
} else {
future.complete(entries.nextElement());
}
Expand Down Expand Up @@ -700,7 +706,8 @@ static class LocatorEntry {
}
}

public static Exception bkException(String operation, int rc, long ledgerId, long entryId) {
public static Exception bkException(String operation, int rc, long ledgerId, long entryId,
boolean forceRecovery) {
String message = org.apache.bookkeeper.client.api.BKException.getMessage(rc)
+ " - ledger=" + ledgerId + " - operation=" + operation;

Expand All @@ -709,7 +716,10 @@ public static Exception bkException(String operation, int rc, long ledgerId, lon
}
boolean recoverable = rc != BKException.Code.NoSuchLedgerExistsException
&& rc != BKException.Code.NoSuchEntryException
&& rc != BKException.Code.NoSuchLedgerExistsOnMetadataServerException;
&& rc != BKException.Code.NoSuchLedgerExistsOnMetadataServerException
// if force-recovery is enabled, mark the exception as non-recoverable
// so that the schema skips this exception and recovers immediately
&& !forceRecovery;
return new SchemaException(recoverable, message);
}

Expand All @@ -732,4 +742,4 @@ public static <T> CompletableFuture<T> ignoreUnrecoverableBKException(Completabl
throw t instanceof CompletionException ? (CompletionException) t : new CompletionException(t);
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import org.apache.bookkeeper.client.api.BKException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.testng.annotations.Test;
Expand All @@ -29,23 +30,29 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

@Test(groups = "broker")
public class BookkeeperSchemaStorageTest {

// Checks the message built by bkException for ledger-level failures (entryId = -1, no entry suffix)
// and entry-level failures (entryId = 0, " - entry=0" suffix), and that the forceRecovery argument
// controls whether the returned SchemaException is reported as recoverable.
@Test
public void testBkException() {
Exception ex = bkException("test", BKException.Code.ReadException, 1, -1);
Exception ex = bkException("test", BKException.Code.ReadException, 1, -1, false);
assertEquals("Error while reading ledger - ledger=1 - operation=test", ex.getMessage());
ex = bkException("test", BKException.Code.ReadException, 1, 0);
ex = bkException("test", BKException.Code.ReadException, 1, 0, false);
assertEquals("Error while reading ledger - ledger=1 - operation=test - entry=0",
ex.getMessage());
ex = bkException("test", BKException.Code.QuorumException, 1, -1);
ex = bkException("test", BKException.Code.QuorumException, 1, -1, false);
assertEquals("Invalid quorum size on ensemble size - ledger=1 - operation=test",
ex.getMessage());
ex = bkException("test", BKException.Code.QuorumException, 1, 0);
ex = bkException("test", BKException.Code.QuorumException, 1, 0, false);
assertEquals("Invalid quorum size on ensemble size - ledger=1 - operation=test - entry=0",
ex.getMessage());
// Same return code, forceRecovery off: the exception stays recoverable.
SchemaException sc = (SchemaException) bkException("test", BKException.Code.BookieHandleNotAvailableException, 1, 0, false);
assertTrue(sc.isRecoverable());
// Same return code, forceRecovery on: the exception is marked non-recoverable.
sc = (SchemaException) bkException("test", BKException.Code.BookieHandleNotAvailableException, 1, 0, true);
assertFalse(sc.isRecoverable());
}

@Test
Expand Down

0 comments on commit 97ccefa

Please sign in to comment.