Skip to content

Commit

Permalink
Implement EntryReadCountHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari committed Dec 2, 2024
1 parent 1326dc9 commit c0750c3
Show file tree
Hide file tree
Showing 23 changed files with 273 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.netty.util.ReferenceCounted;

public interface CachedEntry extends Entry, ReferenceCounted {
boolean addToExpectedReadCount(int delta);
boolean canEvict();
boolean matchesKey(Position key);
boolean canEvict();
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,8 @@ public interface Entry {
* of data reached to 0).
*/
boolean release();

default EntryReadCountHandler getReadCountHandler() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.bookkeeper.mledger;

public interface EntryReadCountHandler {
void setExpectedReadCount(int expectedReadCount);
int getExpectedReadCount();
boolean incrementExpectedReadCount();
void markRead();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -946,14 +946,4 @@ default Set<Position> filterDeletedMessages(Collection<? extends Position> posit
int applyMaxSizeCap(int maxEntries, long maxSizeBytes);

void updateReadStats(int readEntriesCount, long readEntriesSize);

/**
* Called when entry cannot be dispatched due to consumer being out of permits and when entry
* gets added to the replay queue. The implementation should extend the caching lifetime for the entry
* to increase the cache hit rate for replay reads
* @param entry entry that was added to replay and should be cached
*/
default void maybeCacheReplayedEntry(Entry entry) {
// no-op by default
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ abstract class AbstractEntryImpl<T extends AbstractEntryImpl<T>> extends Abstrac
int length;
private Position position;
private Runnable onDeallocate;
protected EntryReadCountHandlerImpl readCountHandler;

public AbstractEntryImpl(Recycler.Handle<T> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
Expand Down Expand Up @@ -147,6 +148,7 @@ protected final void deallocate() {
ledgerId = -1;
entryId = -1;
position = null;
readCountHandler = null;
beforeRecycle();
recyclerHandle.recycle(self());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.mledger.CachedEntry;
import org.apache.bookkeeper.mledger.Position;

Expand All @@ -32,14 +31,13 @@ protected CachedEntryImpl newObject(Handle<CachedEntryImpl> handle) {
}
};

private final AtomicInteger expectedReadCount = new AtomicInteger(1);

public static CachedEntryImpl create(Position position, ByteBuf data) {
public static CachedEntryImpl create(Position position, ByteBuf data, EntryReadCountHandlerImpl readCountHandler) {
CachedEntryImpl entry = RECYCLER.get();
entry.expectedReadCount.set(1);
entry.timestamp = System.nanoTime();
entry.ledgerId = position.getLedgerId();
entry.entryId = position.getEntryId();
entry.readCountHandler = readCountHandler;
entry.setDataBuffer(data.retainedDuplicate());
entry.setRefCnt(1);
return entry;
Expand All @@ -49,33 +47,16 @@ private CachedEntryImpl(Recycler.Handle<CachedEntryImpl> recyclerHandle) {
super(recyclerHandle);
}

@Override
public boolean addToExpectedReadCount(int delta) {
if (expectedReadCount.updateAndGet(v -> v >= 0 ? v + delta : -1) >= 0) {
return true;
}
return false;
}

@Override
public boolean canEvict() {
return expectedReadCount.get() < 1;
if (readCountHandler != null) {
return readCountHandler.getExpectedReadCount() < 1;
}
return true;
}

@Override
public boolean matchesKey(Position key) {
return key != null && entryId == key.getEntryId() && ledgerId == key.getLedgerId();
}

@Override
protected void refCountDecremented(int refCount, int decrement) {
if (refCount >= 1 && decrement == 1) {
expectedReadCount.decrementAndGet();
}
}

@Override
protected void beforeRecycle() {
expectedReadCount.set(-1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ protected EntryImpl newObject(Handle<EntryImpl> handle) {
}
};

public static EntryImpl create(LedgerEntry ledgerEntry) {
public static EntryImpl create(LedgerEntry ledgerEntry, int expectedReadCount) {
EntryImpl entry = RECYCLER.get();
entry.timestamp = System.nanoTime();
entry.ledgerId = ledgerEntry.getLedgerId();
entry.entryId = ledgerEntry.getEntryId();
entry.setDataBuffer(ledgerEntry.getEntryBuffer().retainedDuplicate());
entry.readCountHandler = EntryReadCountHandlerImpl.create(expectedReadCount);
entry.setRefCnt(1);
return entry;
}
Expand All @@ -56,11 +57,18 @@ public static EntryImpl create(long ledgerId, long entryId, byte[] data) {
}

public static EntryImpl create(long ledgerId, long entryId, ByteBuf data) {
return create(ledgerId, entryId, data, 0);
}

public static EntryImpl create(long ledgerId, long entryId, ByteBuf data, int expectedReadCount) {
EntryImpl entry = RECYCLER.get();
entry.timestamp = System.nanoTime();
entry.ledgerId = ledgerId;
entry.entryId = entryId;
entry.setDataBuffer(data.retainedDuplicate());
if (expectedReadCount > 0) {
entry.readCountHandler = EntryReadCountHandlerImpl.create(expectedReadCount);
}
entry.setRefCnt(1);
return entry;
}
Expand All @@ -80,6 +88,7 @@ public static EntryImpl create(Entry other) {
entry.timestamp = System.nanoTime();
entry.ledgerId = other.getLedgerId();
entry.entryId = other.getEntryId();
entry.readCountHandler = (EntryReadCountHandlerImpl) other.getReadCountHandler();
entry.setDataBuffer(other.getDataBuffer().retainedDuplicate());
entry.setRefCnt(1);
return entry;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.EntryReadCountHandler;

public class EntryReadCountHandlerImpl implements EntryReadCountHandler {
private static AtomicIntegerFieldUpdater<EntryReadCountHandlerImpl> expectedReadCountUpdater =
AtomicIntegerFieldUpdater.newUpdater(EntryReadCountHandlerImpl.class, "expectedReadCount");

private volatile int expectedReadCount;

private EntryReadCountHandlerImpl(int expectedReadCount) {
this.expectedReadCount = expectedReadCount;
}

public int getExpectedReadCount() {
return expectedReadCount;
}

@Override
public boolean incrementExpectedReadCount() {
expectedReadCountUpdater.incrementAndGet(this);
return true;
}

@Override
public void markRead() {
expectedReadCountUpdater.decrementAndGet(this);
}

public static EntryReadCountHandlerImpl create(int expectedReadCount) {
return new EntryReadCountHandlerImpl(expectedReadCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -395,4 +395,19 @@ public int getNumberOfCursorsAtSamePositionOrBefore(ManagedCursor cursor) {
rwLock.unlockRead(stamp);
}
}

public int size() {
long stamp = rwLock.tryOptimisticRead();
int size = cursors.size();
if (!rwLock.validate(stamp)) {
// Fallback to read lock
stamp = rwLock.readLock();
try {
size = cursors.size();
} finally {
rwLock.unlockRead(stamp);
}
}
return size;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3905,8 +3905,13 @@ public ManagedLedgerInternalStats.CursorStats getCursorStats() {
return cs;
}

@Override
public void maybeCacheReplayedEntry(Entry entry) {
ledger.entryCache.insert(entry);
public int getNumberOfCursorsAtSamePositionOrBefore() {
if (ledger.getConfig().isCacheEvictionByExpectedReadCount()) {
return ledger.getNumberOfCursorsAtSamePositionOrBefore(this);
} else if (isCacheReadEntry()) {
return 1;
} else {
return 0;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.IntSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -495,7 +496,7 @@ protected ManagedLedgerInterceptor.LastEntryHandle createLastEntryHandle(LedgerH
entries.getEntry(lh.getLastAddConfirmed());
if (ledgerEntry != null) {
promise.complete(
Optional.of(EntryImpl.create(ledgerEntry)));
Optional.of(EntryImpl.create(ledgerEntry, 0)));
} else {
promise.complete(Optional.empty());
}
Expand Down Expand Up @@ -2163,20 +2164,17 @@ protected void asyncReadEntry(ReadHandle ledger, Position position, ReadEntryCal

protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, OpReadEntry opReadEntry,
Object ctx) {
IntSupplier expectedReadCount = () -> opReadEntry.cursor.getNumberOfCursorsAtSamePositionOrBefore();
if (config.getReadEntryTimeoutSeconds() > 0) {
// set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled
long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
long createdTime = System.nanoTime();
ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry,
opReadEntry, readOpCount, createdTime, ctx);
lastReadCallback = readCallback;
// TODO: we should not use Integer.MAX_VALUE here, it's just a workaround for now
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry,
e -> opReadEntry.cursor.isCacheReadEntry() ? Integer.MAX_VALUE : 0, readCallback, readOpCount);
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, expectedReadCount, readCallback, readOpCount);
} else {
// TODO: we should not use Integer.MAX_VALUE here, it's just a workaround for now
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry,
e -> opReadEntry.cursor.isCacheReadEntry() ? Integer.MAX_VALUE : 0, opReadEntry, ctx);
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, expectedReadCount, opReadEntry, ctx);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,13 +240,16 @@ public void run() {

long ledgerId = ledger != null ? ledger.getId() : ((Position) ctx).getLedgerId();
// Don't insert to the entry cache for the ShadowManagedLedger
if (!(ml instanceof ShadowManagedLedgerImpl) && ml.hasActiveCursors()) {
// Avoid caching entries if no cursor has been created
EntryImpl entry = EntryImpl.create(ledgerId, entryId, data);
// EntryCache.insert: duplicates entry by allocating new entry and data. so, recycle entry after calling
// insert
ml.entryCache.insert(entry);
entry.release();
if (!(ml instanceof ShadowManagedLedgerImpl)) {
int expectedReadCount = ml.getActiveCursors().size();
if (expectedReadCount > 0) {
// Avoid caching entries if no cursor has been created
EntryImpl entry = EntryImpl.create(ledgerId, entryId, data, expectedReadCount);
// EntryCache.insert: duplicates entry by allocating new entry and data. so, recycle entry after calling
// insert
ml.entryCache.insert(entry);
entry.release();
}
}

Position lastEntry = PositionFactory.create(ledgerId, entryId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
package org.apache.bookkeeper.mledger.impl.cache;

import java.util.function.ToIntFunction;
import java.util.function.IntSupplier;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
Expand Down Expand Up @@ -89,7 +89,7 @@ public interface EntryCache {
* @param ctx
* the context object
*/
void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, ToIntFunction<Entry> expectedReadCount,
void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, IntSupplier expectedReadCount,
ReadEntriesCallback callback, Object ctx);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.ToIntFunction;
import java.util.function.IntSupplier;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
Expand Down Expand Up @@ -69,7 +69,7 @@ public void clear() {
}

@Override
public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, ToIntFunction<Entry> expectedReadCount,
public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, IntSupplier expectedReadCount,
final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry).thenAcceptAsync(
ledgerEntries -> {
Expand All @@ -78,7 +78,8 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, ToInt
try {
for (LedgerEntry e : ledgerEntries) {
// Insert the entries at the end of the list (they will be unsorted for now)
EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor);
EntryImpl entry =
RangeEntryCacheManagerImpl.create(e, interceptor, 0);
entries.add(entry);
totalSize += entry.getLength();
}
Expand Down Expand Up @@ -111,7 +112,8 @@ public void asyncReadEntry(ReadHandle lh, Position position, AsyncCallbacks.Read
Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
if (iterator.hasNext()) {
LedgerEntry ledgerEntry = iterator.next();
EntryImpl returnEntry = RangeEntryCacheManagerImpl.create(ledgerEntry, interceptor);
EntryImpl returnEntry =
RangeEntryCacheManagerImpl.create(ledgerEntry, interceptor, 0);

ml.getMbean().recordReadEntriesOpsCacheMisses(1, returnEntry.getLength());
ml.getFactory().getMbean().recordCacheMiss(1, returnEntry.getLength());
Expand Down
Loading

0 comments on commit c0750c3

Please sign in to comment.