Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lh fix individual read deduplication and limiting #195

Open
wants to merge 83 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
83 commits
Select commit Hold shift + click to select a range
163707f
Add validation to getManagedLedgerMaxReadsInFlightSizeInMB
lhotari Oct 21, 2024
f046777
Don't divide by 0 if it can be avoided
lhotari Oct 21, 2024
0f9c5e8
Estimate entry size for InflightReadsLimiter by keeping stats
lhotari Oct 21, 2024
8c06ef3
Limit replay messages by bytes size
lhotari Oct 21, 2024
4b94fab
Test limiting by bytes size
lhotari Oct 21, 2024
9ad9b95
Test
lhotari Oct 21, 2024
40ea8e8
Avoid exception which commonly happens
lhotari Oct 22, 2024
b0f4220
Add debug logging to InflightReadsLimiter
lhotari Oct 22, 2024
49c614a
Clear the cache in the test
lhotari Oct 22, 2024
c253f4f
Add logging
lhotari Jan 23, 2025
170131c
Fix read limits for individual reads
lhotari Oct 22, 2024
9e930b9
Revert "Test"
lhotari Oct 22, 2024
09fdbf3
Reduce timeout
lhotari Jan 23, 2025
1948950
Apply cap for delayed messages too
lhotari Oct 23, 2024
7dd5d4c
Improve asyncReplayEntries
lhotari Oct 23, 2024
0580094
Refactor to have a single replay method
lhotari Oct 23, 2024
795f266
Optimize filtering deleted messages
lhotari Jan 23, 2025
8829edd
deprecate synchronous replayEntries method
lhotari Oct 23, 2024
e969e46
move asyncReplayEntries without sortEntries as a default method
lhotari Oct 23, 2024
0ccc8fa
Perform replay reads in ranges
lhotari Oct 23, 2024
9da2126
Make test to fail
lhotari Jan 23, 2025
eaefca5
Fix disabled timeout in InflightReadsLimiter
lhotari Oct 23, 2024
7c9d365
Add queuing to InflightReadsLimiter when the limit is reached
lhotari Nov 1, 2024
602a026
revisit
lhotari Nov 1, 2024
aa26373
Ensure that cached entry readerIndex is not tied to the original one
lhotari Nov 1, 2024
7d5a239
Adjust InflightReadsLimiterTest
lhotari Nov 4, 2024
87ac319
Add timeout executor to InflightReadsLimiter
lhotari Nov 1, 2024
8aad6c8
Polish
lhotari Nov 4, 2024
204f2b7
Fix test
lhotari Nov 4, 2024
8932caa
Instantiate ArrayList directly
lhotari Nov 4, 2024
1a664b3
Support caching replayed entries
lhotari Nov 4, 2024
7002102
Start addressing the removal issue with a removal queue
lhotari Nov 4, 2024
c8d371a
Move EntryWrapper to upper level
lhotari Nov 4, 2024
45f6eb3
Move RangeCache to cache package
lhotari Nov 4, 2024
9e21d72
Start adding removal queue
lhotari Nov 4, 2024
cbcf417
Move towards adding removal queue changes
lhotari Nov 4, 2024
841ca6b
More moves towards removal queue solution
lhotari Nov 4, 2024
25c32eb
Handle removing by size in scheduled task so that blocking would be rare
lhotari Nov 4, 2024
afc3ed6
Remove unused code
lhotari Nov 5, 2024
a83f1d9
Update RangeCacheTest with removal queue
lhotari Nov 5, 2024
6d3d4eb
Fix imports
lhotari Nov 5, 2024
c93b93f
Add license headers
lhotari Nov 5, 2024
af806bd
Handle all cache evictions by the same thread
lhotari Nov 5, 2024
4789d7c
Move RangeCacheRemovalCounters to top level
lhotari Nov 8, 2024
61db0a0
Use removal queue in RangeCacheTest
lhotari Nov 8, 2024
e582d2b
Handle eviction
lhotari Nov 10, 2024
67f01ed
Handle adding atomically to removal queue and cache
lhotari Nov 12, 2024
70982a5
Improve javadoc
lhotari Nov 12, 2024
b459760
Use unbounded queue for removal queue since cache size is bounded by …
lhotari Nov 12, 2024
bde197f
Refactor eviction when cache size exceeds evictionTriggerThreshold
lhotari Nov 12, 2024
72d3c6f
Start adding cacheEvictionByExpectedReadCount
lhotari Nov 12, 2024
c80e1d5
Add CachedEntry
lhotari Nov 12, 2024
4fafdd9
Activate cursor when consumers disconnect and connect
lhotari Nov 12, 2024
a70d158
disable checkCursorsToCacheEntries when eviction by read count is ena…
lhotari Nov 12, 2024
4eb1ad6
Always keep cursors with connected consumers in "active" state when e…
lhotari Nov 12, 2024
6593953
Add a way to find cursors before current cursor
lhotari Nov 12, 2024
346be3c
Pass predicate for deciding whether to cache or not
lhotari Nov 12, 2024
3290e52
Reduce coupling to EntryImpl
lhotari Nov 12, 2024
4265481
Extract abstract base class to be used for Entry implementations
lhotari Nov 12, 2024
2b157e9
Introduce CachedEntry for eviction by read count implementation
lhotari Nov 12, 2024
c4aa461
Remove generics type parameters from RangeCache since there's no need…
lhotari Nov 13, 2024
ae67445
Add TODO about the solution for handling skipping of the entries
lhotari Nov 13, 2024
77c4229
Add note of replacing the use of peek
lhotari Nov 13, 2024
d85a6b1
update design
lhotari Nov 13, 2024
7d49148
Evaluate entry size when it gets added
lhotari Nov 14, 2024
3ab4485
Fix issue in getting the entry
lhotari Nov 14, 2024
ba3e1b8
Fix RangeCacheTest
lhotari Nov 14, 2024
d0f971a
Remove generics
lhotari Nov 14, 2024
245a441
Add simple stash
lhotari Nov 14, 2024
a63bd65
Add note
lhotari Nov 14, 2024
43dbcff
Fix test that expects old defaults
lhotari Nov 14, 2024
8133704
Adapt test to new behavior
lhotari Nov 14, 2024
03f7939
Improve removal queue
lhotari Nov 14, 2024
6e9e8bd
Fix EntryCacheManagerTest
lhotari Nov 14, 2024
47d24bd
Support configuring ManagedLedgerConfig defaults for tests
lhotari Nov 14, 2024
e65db4e
Fix ManagedLedgerTests
lhotari Nov 15, 2024
ebefc07
Use waitForPendingCacheEvictions in EntryCacheManagerTest
lhotari Nov 15, 2024
61378b6
Use Double.NaN instead of Double.POSITIVE_INFINITY
lhotari Nov 15, 2024
2010fae
Start adding expected read count solution
lhotari Nov 18, 2024
1820673
Improve ManagedCursorContainerTest
lhotari Nov 18, 2024
cbdb691
Implement getNumberOfCursorsAtSamePositionOrBefore
lhotari Nov 18, 2024
a8ea5e7
Implement EntryReadCountHandler
lhotari Nov 19, 2024
b13f910
Implement decreasing read count on release
lhotari Nov 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger;

import io.netty.util.ReferenceCounted;

public interface CachedEntry extends Entry, ReferenceCounted {
boolean matchesKey(Position key);
boolean canEvict();
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ public interface Entry {
/**
* @return the data
*/
@Deprecated
byte[] getData();

@Deprecated
byte[] getDataAndRelease();

/**
Expand Down Expand Up @@ -66,4 +68,8 @@ public interface Entry {
* of data reached to 0).
*/
boolean release();

default EntryReadCountHandler getReadCountHandler() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger;

public interface EntryReadCountHandler {
int getExpectedReadCount();
boolean incrementExpectedReadCount();
void markRead();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
package org.apache.bookkeeper.mledger;

import com.google.common.collect.Range;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
Expand Down Expand Up @@ -718,6 +721,7 @@ default void asyncFindNewestMatching(FindPositionConstraint constraint, Predicat
* @throws InterruptedException
* @throws ManagedLedgerException
*/
@Deprecated
List<Entry> replayEntries(Set<? extends Position> positions)
throws InterruptedException, ManagedLedgerException;

Expand All @@ -733,8 +737,10 @@ List<Entry> replayEntries(Set<? extends Position> positions)
* @return skipped positions
* set of positions which are already deleted/acknowledged and skipped while replaying them
*/
Set<? extends Position> asyncReplayEntries(
Set<? extends Position> positions, ReadEntriesCallback callback, Object ctx);
default Set<? extends Position> asyncReplayEntries(final Set<? extends Position> positions,
ReadEntriesCallback callback, Object ctx) {
return asyncReplayEntries(positions, callback, ctx, false);
}

/**
* Read the specified set of positions from ManagedLedger.
Expand All @@ -753,6 +759,28 @@ Set<? extends Position> asyncReplayEntries(
Set<? extends Position> asyncReplayEntries(
Set<? extends Position> positions, ReadEntriesCallback callback, Object ctx, boolean sortEntries);

/**
* Read the specified set of positions from ManagedLedger in ranges.
* This method is used to read entries in ranges to avoid reading all entries at once.
*
* @param positions
* set of positions to read
* @param callback
* callback object returning the result of each range
* @param ctx
* opaque context
* @param invokeCallbacksInOrder
* when true, the callback will be invoked in order of the positions, otherwise the callback will be
* invoked in the order of the completion of the range read.
* @return skipped positions
* set of positions which are already deleted/acknowledged and skipped while replaying them
*/
default Set<? extends Position> asyncReplayEntriesInRanges(SortedSet<? extends Position> positions,
ManagedCursorReplayReadEntriesCallback callback,
Object ctx, boolean invokeCallbacksInOrder) {
throw new UnsupportedOperationException("Not implemented");
}

/**
* Close the cursor and releases the associated resources.
*
Expand Down Expand Up @@ -918,6 +946,24 @@ default ManagedCursorAttributes getManagedCursorAttributes() {

boolean isMessageDeleted(Position position);

/**
* Returns the deleted messages from the given positions.
* Implementation classes can override this method to provide a more efficient way to filter deleted messages.
*
* @param positions the positions to filter
* @return the set of deleted positions
*/
default Set<Position> filterDeletedMessages(Collection<? extends Position> positions) {
Set<Position> deletedPositions = new HashSet<>();
// prefer for loop to avoid creating stream related instances
for (Position position : positions) {
if (isMessageDeleted(position)) {
deletedPositions.add(position);
}
}
return deletedPositions;
}

ManagedCursor duplicateNonDurableCursor(String nonDurableCursorName) throws ManagedLedgerException;

long[] getBatchPositionAckSet(Position position);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger;

import java.util.List;

public interface ManagedCursorReplayReadEntriesCallback {
void readEntriesComplete(ManagedCursorReplayReadRange range, boolean isLast, List<Entry> entries, Object ctx);

void readEntriesFailed(ManagedCursorReplayReadRange range, boolean isLast, ManagedLedgerException exception,
Object ctx);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger;

public interface ManagedCursorReplayReadRange extends Comparable<ManagedCursorReplayReadRange> {
int rangeIndex();

int totalRanges();

Position startPosition();

Position lastPosition();

default int size() {
if (startPosition().getLedgerId() != lastPosition().getLedgerId()) {
throw new IllegalStateException("Cannot calculate size for range spanning multiple ledgers");
}
return (int) (lastPosition().getEntryId() - startPosition().getEntryId() + 1);
}

@Override
default int compareTo(ManagedCursorReplayReadRange o) {
return Integer.compare(rangeIndex(), o.rangeIndex());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ public class ManagedLedgerConfig {
@Getter
@Setter
private boolean cacheEvictionByMarkDeletedPosition = false;
@Getter
@Setter
private boolean cacheEvictionByExpectedReadCount = true;
private int minimumBacklogCursorsForCaching = 0;
private int minimumBacklogEntriesForCaching = 1000;
private int maxBacklogBetweenCursorsForCaching = 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,18 @@ public class ManagedLedgerFactoryConfig {
*/
private long managedLedgerMaxReadsInFlightSize = 0;

/**
* Maximum time to wait for acquiring permits for max reads in flight when managedLedgerMaxReadsInFlightSizeInMB is
* set (>0) and the limit is reached.
*/
private long managedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis = 60000;

/**
* Maximum number of reads that can be queued for acquiring permits for max reads in flight when
* managedLedgerMaxReadsInFlightSizeInMB is set (>0) and the limit is reached.
*/
private int managedLedgerMaxReadsInFlightPermitsAcquireQueueSize = 10000;

/**
* Whether trace managed ledger task execution time.
*/
Expand Down
Loading
Loading