Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-327: Support force topic loading for unrecoverable errors #21759

Merged
merged 1 commit into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class ManagedLedgerConfig {
private long retentionTimeMs = 0;
private long retentionSizeInMB = 0;
private boolean autoSkipNonRecoverableData;
private boolean ledgerForceRecovery;
private boolean lazyCursorRecovery = false;
private long metadataOperationsTimeoutSeconds = 60;
private long readEntryTimeoutSeconds = 120;
Expand Down Expand Up @@ -465,6 +466,17 @@ public void setAutoSkipNonRecoverableData(boolean skipNonRecoverableData) {
this.autoSkipNonRecoverableData = skipNonRecoverableData;
}

/**
 * Tells whether forced ledger recovery is enabled in this configuration.
 *
 * <p>When the flag is set, managed-ledger failures hit during recovery are skipped so
 * that recovery is forced to complete.
 *
 * @return {@code true} if forced ledger recovery is enabled
 */
public boolean isLedgerForceRecovery() {
    return this.ledgerForceRecovery;
}

/**
 * Enables or disables forced ledger recovery for this configuration.
 *
 * @param ledgerForceRecovery {@code true} to skip managed-ledger failures and force recovery
 */
public void setLedgerForceRecovery(boolean ledgerForceRecovery) {
this.ledgerForceRecovery = ledgerForceRecovery;
}

/**
* @return max unacked message ranges that will be persisted and recovered.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ public class ManagedCursorImpl implements ManagedCursor {

// Whether the current cursorLedger is read-only or writable
private boolean isCursorLedgerReadOnly = true;
private boolean ledgerForceRecovery;

// Stat of the cursor z-node
// NOTE: Don't update cursorLedgerStat alone,
Expand Down Expand Up @@ -332,6 +333,7 @@ public interface VoidCallback {
markDeleteLimiter = null;
}
this.mbean = new ManagedCursorMXBeanImpl(this);
this.ledgerForceRecovery = getConfig().isLedgerForceRecovery();
}

private void updateCursorLedgerStat(ManagedCursorInfo cursorInfo, Stat stat) {
Expand Down Expand Up @@ -547,7 +549,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
if (log.isInfoEnabled()) {
log.info("[{}] Opened ledger {} for cursor {}. rc={}", ledger.getName(), ledgerId, name, rc);
}
if (isBkErrorNotRecoverable(rc)) {
if (isBkErrorNotRecoverable(rc) || ledgerForceRecovery) {
log.error("[{}] Error opening metadata ledger {} for cursor {}: {}", ledger.getName(), ledgerId, name,
BKException.getMessage(rc));
// Rewind to oldest entry available
Expand Down Expand Up @@ -575,7 +577,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
if (log.isDebugEnabled()) {
log.debug("[{}} readComplete rc={} entryId={}", ledger.getName(), rc1, lh1.getLastAddConfirmed());
}
if (isBkErrorNotRecoverable(rc1)) {
if (isBkErrorNotRecoverable(rc1) || ledgerForceRecovery) {
log.error("[{}] Error reading from metadata ledger {} for cursor {}: {}", ledger.getName(),
ledgerId, name, BKException.getMessage(rc1));
// Rewind to oldest entry available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,14 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Cleanup;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
Expand All @@ -98,6 +101,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.IntRange;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -4538,7 +4542,6 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
ledger.close();
}


@Test
public void testReadEntriesWithSkipDeletedEntries() throws Exception {
@Cleanup
Expand Down Expand Up @@ -4795,5 +4798,58 @@ public void operationFailed(ManagedLedgerException exception) {
assertEquals(cursor.getReadPosition(), markDeletedPosition.getNext());
}

/**
 * Verifies that a cursor still recovers when opening its metadata ledger fails and
 * {@code ledgerForceRecovery} is enabled on the managed-ledger config.
 *
 * <p>The mock BookKeeper is told to answer opens of ledger {@code -1} with
 * {@code BookieHandleNotAvailableException}. The test passes only if
 * {@code recoverFromLedger} reports completion rather than failure.
 */
@Test
void testForceCursorRecovery() throws Exception {
    TestPulsarMockBookKeeper bk = new TestPulsarMockBookKeeper(executor);
    factory = new ManagedLedgerFactoryImpl(metadataStore, bk);
    ManagedLedgerConfig config = new ManagedLedgerConfig();
    config.setLedgerForceRecovery(true);
    ManagedLedger ledger = factory.open("my_test_ledger", config);
    ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
    ledger.addEntry("entry-1".getBytes(Encoding));

    // Inject an open failure for the ledger id the cursor is asked to recover from.
    long invalidLedger = -1L;
    bk.setErrorCodeMap(invalidLedger, BKException.Code.BookieHandleNotAvailableException);
    ManagedCursorInfo info = ManagedCursorInfo.newBuilder().setCursorsLedgerId(invalidLedger).build();

    CountDownLatch latch = new CountDownLatch(1);
    MutableBoolean recovered = new MutableBoolean(false);
    VoidCallback callback = new VoidCallback() {
        @Override
        public void operationComplete() {
            recovered.setValue(true);
            latch.countDown();
        }

        @Override
        public void operationFailed(ManagedLedgerException exception) {
            recovered.setValue(false);
            latch.countDown();
        }
    };
    c1.recoverFromLedger(info, callback);

    // Bound the wait so that a callback which never fires fails the test instead of hanging the run.
    assertTrue(latch.await(30, java.util.concurrent.TimeUnit.SECONDS));
    assertTrue(recovered.booleanValue());
}

class TestPulsarMockBookKeeper extends PulsarMockBookKeeper {
Map<Long, Integer> ledgerErrors = new HashMap<>();

public TestPulsarMockBookKeeper(OrderedExecutor orderedExecutor) throws Exception {
super(orderedExecutor);
}

public void setErrorCodeMap(long ledgerId, int rc) {
ledgerErrors.put(ledgerId, rc);
}

public void asyncOpenLedger(final long lId, final DigestType digestType, final byte[] passwd,
final OpenCallback cb, final Object ctx) {
if (ledgerErrors.containsKey(lId)) {
cb.openComplete(ledgerErrors.get(lId), null, ctx);
}
super.asyncOpenLedger(lId, digestType, passwd, cb, ctx);
}
}

private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2249,6 +2249,18 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
+ " It helps when data-ledgers gets corrupted at bookkeeper and managed-cursor is stuck at that ledger."
)
private boolean autoSkipNonRecoverableData = false;
@FieldContext(
dynamic = true,
category = CATEGORY_STORAGE_ML,
doc = "Skip managed ledger failure to forcefully recover managed ledger."
)
private boolean managedLedgerForceRecovery = false;
@FieldContext(
dynamic = true,
category = CATEGORY_STORAGE_ML,
doc = "Skip schema ledger failure to forcefully recover topic successfully."
)
private boolean schemaLedgerForceRecovery = false;
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "operation timeout while updating managed-ledger metadata."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1970,6 +1970,7 @@ private CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull T
.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
managedLedgerConfig.setLedgerForceRecovery(serviceConfig.isManagedLedgerForceRecovery());
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
managedLedgerConfig.setInactiveLedgerRollOverTime(
serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ private CompletableFuture<SchemaStorageFormat.SchemaEntry> readSchemaEntry(

return openLedger(position.getLedgerId())
.thenCompose((ledger) ->
Functions.getLedgerEntry(ledger, position.getEntryId())
Functions.getLedgerEntry(ledger, position.getEntryId(), config.isSchemaLedgerForceRecovery())
.thenCompose(entry -> closeLedger(ledger)
.thenApply(ignore -> entry)
)
Expand Down Expand Up @@ -560,7 +560,8 @@ private CompletableFuture<Long> addEntry(LedgerHandle ledgerHandle, SchemaStorag
ledgerHandle.asyncAddEntry(entry.toByteArray(),
(rc, handle, entryId, ctx) -> {
if (rc != BKException.Code.OK) {
future.completeExceptionally(bkException("Failed to add entry", rc, ledgerHandle.getId(), -1));
future.completeExceptionally(bkException("Failed to add entry", rc, ledgerHandle.getId(), -1,
config.isSchemaLedgerForceRecovery()));
} else {
future.complete(entryId);
}
Expand All @@ -582,7 +583,8 @@ private CompletableFuture<LedgerHandle> createLedger(String schemaId) {
LedgerPassword,
(rc, handle, ctx) -> {
if (rc != BKException.Code.OK) {
future.completeExceptionally(bkException("Failed to create ledger", rc, -1, -1));
future.completeExceptionally(bkException("Failed to create ledger", rc, -1, -1,
config.isSchemaLedgerForceRecovery()));
} else {
future.complete(handle);
}
Expand All @@ -603,7 +605,8 @@ private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) {
LedgerPassword,
(rc, handle, ctx) -> {
if (rc != BKException.Code.OK) {
future.completeExceptionally(bkException("Failed to open ledger", rc, ledgerId, -1));
future.completeExceptionally(bkException("Failed to open ledger", rc, ledgerId, -1,
config.isSchemaLedgerForceRecovery()));
} else {
future.complete(handle);
}
Expand All @@ -617,7 +620,8 @@ private CompletableFuture<Void> closeLedger(LedgerHandle ledgerHandle) {
CompletableFuture<Void> future = new CompletableFuture<>();
ledgerHandle.asyncClose((rc, handle, ctx) -> {
if (rc != BKException.Code.OK) {
future.completeExceptionally(bkException("Failed to close ledger", rc, ledgerHandle.getId(), -1));
future.completeExceptionally(bkException("Failed to close ledger", rc, ledgerHandle.getId(), -1,
config.isSchemaLedgerForceRecovery()));
} else {
future.complete(null);
}
Expand Down Expand Up @@ -648,12 +652,14 @@ public CompletableFuture<List<Long>> getStoreLedgerIdsBySchemaId(String schemaId
}

interface Functions {
static CompletableFuture<LedgerEntry> getLedgerEntry(LedgerHandle ledger, long entry) {
static CompletableFuture<LedgerEntry> getLedgerEntry(LedgerHandle ledger, long entry,
boolean forceRecovery) {
final CompletableFuture<LedgerEntry> future = new CompletableFuture<>();
ledger.asyncReadEntries(entry, entry,
(rc, handle, entries, ctx) -> {
if (rc != BKException.Code.OK) {
future.completeExceptionally(bkException("Failed to read entry", rc, ledger.getId(), entry));
future.completeExceptionally(bkException("Failed to read entry", rc, ledger.getId(), entry,
forceRecovery));
} else {
future.complete(entries.nextElement());
}
Expand Down Expand Up @@ -700,7 +706,8 @@ static class LocatorEntry {
}
}

public static Exception bkException(String operation, int rc, long ledgerId, long entryId) {
public static Exception bkException(String operation, int rc, long ledgerId, long entryId,
boolean forceRecovery) {
String message = org.apache.bookkeeper.client.api.BKException.getMessage(rc)
+ " - ledger=" + ledgerId + " - operation=" + operation;

Expand All @@ -709,7 +716,10 @@ public static Exception bkException(String operation, int rc, long ledgerId, lon
}
boolean recoverable = rc != BKException.Code.NoSuchLedgerExistsException
&& rc != BKException.Code.NoSuchEntryException
&& rc != BKException.Code.NoSuchLedgerExistsOnMetadataServerException;
&& rc != BKException.Code.NoSuchLedgerExistsOnMetadataServerException
// if force-recovery is enabled, mark the exception as non-recoverable so that
// schema loading skips it and recovers immediately
&& !forceRecovery;
rdhabalia marked this conversation as resolved.
Show resolved Hide resolved
return new SchemaException(recoverable, message);
}

Expand All @@ -732,4 +742,4 @@ public static <T> CompletableFuture<T> ignoreUnrecoverableBKException(Completabl
throw t instanceof CompletionException ? (CompletionException) t : new CompletionException(t);
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import org.apache.bookkeeper.client.api.BKException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.testng.annotations.Test;
Expand All @@ -29,23 +30,29 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

@Test(groups = "broker")
public class BookkeeperSchemaStorageTest {

@Test
public void testBkException() {
Exception ex = bkException("test", BKException.Code.ReadException, 1, -1);
Exception ex = bkException("test", BKException.Code.ReadException, 1, -1, false);
assertEquals("Error while reading ledger - ledger=1 - operation=test", ex.getMessage());
ex = bkException("test", BKException.Code.ReadException, 1, 0);
ex = bkException("test", BKException.Code.ReadException, 1, 0, false);
assertEquals("Error while reading ledger - ledger=1 - operation=test - entry=0",
ex.getMessage());
ex = bkException("test", BKException.Code.QuorumException, 1, -1);
ex = bkException("test", BKException.Code.QuorumException, 1, -1, false);
assertEquals("Invalid quorum size on ensemble size - ledger=1 - operation=test",
ex.getMessage());
ex = bkException("test", BKException.Code.QuorumException, 1, 0);
ex = bkException("test", BKException.Code.QuorumException, 1, 0, false);
assertEquals("Invalid quorum size on ensemble size - ledger=1 - operation=test - entry=0",
ex.getMessage());
SchemaException sc = (SchemaException) bkException("test", BKException.Code.BookieHandleNotAvailableException, 1, 0, false);
assertTrue(sc.isRecoverable());
sc = (SchemaException) bkException("test", BKException.Code.BookieHandleNotAvailableException, 1, 0, true);
assertFalse(sc.isRecoverable());
}

@Test
Expand Down
Loading