KAFKA-18404: Remove partitionMaxBytes usage from DelayedShareFetch #17870

Merged

Changes from 12 commits

104 changes: 65 additions & 39 deletions core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -27,6 +27,7 @@
import org.apache.kafka.server.purgatory.DelayedOperation;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchPartitionData;
@@ -60,10 +61,11 @@ public class DelayedShareFetch extends DelayedOperation {
private final ShareFetch shareFetch;
private final ReplicaManager replicaManager;
private final BiConsumer<SharePartitionKey, Throwable> exceptionHandler;
private final PartitionMaxBytesStrategy partitionMaxBytesStrategy;
// The topic partitions that need to be completed for the share fetch request are given by sharePartitions.
// sharePartitions is a subset of shareFetchData. The order of insertion/deletion of entries in sharePartitions is important.
private final LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions;
private LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> partitionsAcquired;
private LinkedHashMap<TopicIdPartition, Long> partitionsAcquired;
private LinkedHashMap<TopicIdPartition, LogReadResult> partitionsAlreadyFetched;

DelayedShareFetch(
@@ -78,6 +80,7 @@ public class DelayedShareFetch extends DelayedOperation {
this.partitionsAlreadyFetched = new LinkedHashMap<>();
this.exceptionHandler = exceptionHandler;
this.sharePartitions = sharePartitions;
this.partitionMaxBytesStrategy = PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM);
}

@Override
@@ -99,7 +102,7 @@ public void onComplete() {
partitionsAcquired.keySet());

try {
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
LinkedHashMap<TopicIdPartition, Long> topicPartitionData;
// tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
if (partitionsAcquired.isEmpty())
topicPartitionData = acquirablePartitions();
@@ -121,11 +124,13 @@ public void onComplete() {
}
}

private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, Long> topicPartitionData) {
try {
LinkedHashMap<TopicIdPartition, LogReadResult> responseData;
if (partitionsAlreadyFetched.isEmpty())
responseData = readFromLog(topicPartitionData);
responseData = readFromLog(
topicPartitionData,
partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size()));
else
// There shouldn't be a case where partitionsAlreadyFetched is populated here while this
// variable is being updated by a different tryComplete thread.
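
For intuition about the call above: PartitionMaxBytesStrategy.StrategyType.UNIFORM splits the request-level maxBytes evenly across the partitions being read, using the acquired-partition count as the divisor. The following is a minimal sketch of that division, assuming the (requestMaxBytes, partitions, acquiredPartitionsSize) shape used in the call above; it is an illustration, not the actual PartitionMaxBytesStrategy implementation.

import java.util.LinkedHashMap;
import java.util.Set;
import org.apache.kafka.common.TopicIdPartition;

final class UniformMaxBytesSketch {
    // Returns an equal share of the request budget for every partition passed in.
    // The divisor is the number of acquired partitions, which may be larger than
    // partitions.size() when only a subset is being read in this pass.
    static LinkedHashMap<TopicIdPartition, Integer> maxBytes(
            int requestMaxBytes, Set<TopicIdPartition> partitions, int acquiredPartitionsSize) {
        LinkedHashMap<TopicIdPartition, Integer> limits = new LinkedHashMap<>();
        int perPartition = requestMaxBytes / acquiredPartitionsSize;
        partitions.forEach(tp -> limits.put(tp, perPartition));
        return limits;
    }
}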
@@ -158,7 +163,7 @@ private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, FetchRequ
*/
@Override
public boolean tryComplete() {
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();
LinkedHashMap<TopicIdPartition, Long> topicPartitionData = acquirablePartitions();

try {
if (!topicPartitionData.isEmpty()) {
@@ -167,7 +172,7 @@ public boolean tryComplete() {
// those topic partitions.
LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
maybeUpdateFetchOffsetMetadata(topicPartitionData, replicaManagerReadResponse);
if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData)) {
if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData, partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size()))) {
partitionsAcquired = topicPartitionData;
partitionsAlreadyFetched = replicaManagerReadResponse;
boolean completedByMe = forceComplete();
@@ -202,28 +207,18 @@ public boolean tryComplete() {
* Prepare fetch request structure for partitions in the share fetch request for which we can acquire records.
*/
// Visible for testing
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
LinkedHashMap<TopicIdPartition, Long> acquirablePartitions() {
// Initialize the topic partitions for which the fetch should be attempted.
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();
LinkedHashMap<TopicIdPartition, Long> topicPartitionData = new LinkedHashMap<>();

sharePartitions.forEach((topicIdPartition, sharePartition) -> {
int partitionMaxBytes = shareFetch.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
// Add the share partition to the list of partitions to be fetched only if we can
// acquire the fetch lock on it.
if (sharePartition.maybeAcquireFetchLock()) {
try {
// If the share partition is already at capacity, we should not attempt to fetch.
if (sharePartition.canAcquireRecords()) {
topicPartitionData.put(
topicIdPartition,
new FetchRequest.PartitionData(
topicIdPartition.topicId(),
sharePartition.nextFetchOffset(),
0,
partitionMaxBytes,
Optional.empty()
)
);
topicPartitionData.put(topicIdPartition, sharePartition.nextFetchOffset());
} else {
sharePartition.releaseFetchLock();
log.trace("Record lock partition limit exceeded for SharePartition {}, " +
@@ -239,23 +234,28 @@ LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions
return topicPartitionData;
}

private LinkedHashMap<TopicIdPartition, LogReadResult> maybeReadFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> partitionsNotMatchingFetchOffsetMetadata = new LinkedHashMap<>();
topicPartitionData.forEach((topicIdPartition, partitionData) -> {
private LinkedHashMap<TopicIdPartition, LogReadResult> maybeReadFromLog(LinkedHashMap<TopicIdPartition, Long> topicPartitionData) {
LinkedHashMap<TopicIdPartition, Long> partitionsNotMatchingFetchOffsetMetadata = new LinkedHashMap<>();
topicPartitionData.forEach((topicIdPartition, fetchOffset) -> {
SharePartition sharePartition = sharePartitions.get(topicIdPartition);
if (sharePartition.fetchOffsetMetadata(partitionData.fetchOffset).isEmpty()) {
partitionsNotMatchingFetchOffsetMetadata.put(topicIdPartition, partitionData);
if (sharePartition.fetchOffsetMetadata(fetchOffset).isEmpty()) {
partitionsNotMatchingFetchOffsetMetadata.put(topicIdPartition, fetchOffset);
}
});
if (partitionsNotMatchingFetchOffsetMetadata.isEmpty()) {
return new LinkedHashMap<>();
}
// We fetch data from replica manager corresponding to the topic partitions that have missing fetch offset metadata.
return readFromLog(partitionsNotMatchingFetchOffsetMetadata);
// Although we only fetch partition max bytes for partitionsNotMatchingFetchOffsetMetadata,
// we use acquired partitions size = topicPartitionData.size() as the divisor so that the
// leftover partitions, which will be fetched later, do not starve.
return readFromLog(
partitionsNotMatchingFetchOffsetMetadata,
partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, partitionsNotMatchingFetchOffsetMetadata.keySet(), topicPartitionData.size()));
}
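
To make the starvation concern above concrete (illustrative numbers, not taken from the patch): with a request-level maxBytes of 1,000,000 bytes, 10 acquired partitions, and only 3 of them missing fetch offset metadata, passing topicPartitionData.size() keeps the divisor at 10, so each of the 3 partitions reads at most 100,000 bytes. Dividing by 3 instead would grant them about 333,333 bytes each, leaving little of the budget for the remaining 7 partitions that are fetched later.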

private void maybeUpdateFetchOffsetMetadata(
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponseData.entrySet()) {
TopicIdPartition topicIdPartition = entry.getKey();
@@ -267,17 +267,18 @@ private void maybeUpdateFetchOffsetMetadata(
continue;
}
sharePartition.updateFetchOffsetMetadata(
topicPartitionData.get(topicIdPartition).fetchOffset,
topicPartitionData.get(topicIdPartition),
replicaManagerLogReadResult.info().fetchOffsetMetadata);
}
}

// minBytes estimation currently assumes the common case where all fetched data is acquirable.
private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes) {
long accumulatedSize = 0;
for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
for (Map.Entry<TopicIdPartition, Long> entry : topicPartitionData.entrySet()) {
TopicIdPartition topicIdPartition = entry.getKey();
FetchRequest.PartitionData partitionData = entry.getValue();
long fetchOffset = entry.getValue();

LogOffsetMetadata endOffsetMetadata;
try {
@@ -294,7 +295,7 @@ private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, FetchRequest

SharePartition sharePartition = sharePartitions.get(topicIdPartition);

Optional<LogOffsetMetadata> optionalFetchOffsetMetadata = sharePartition.fetchOffsetMetadata(partitionData.fetchOffset);
Optional<LogOffsetMetadata> optionalFetchOffsetMetadata = sharePartition.fetchOffsetMetadata(fetchOffset);
if (optionalFetchOffsetMetadata.isEmpty() || optionalFetchOffsetMetadata.get() == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
continue;
LogOffsetMetadata fetchOffsetMetadata = optionalFetchOffsetMetadata.get();
@@ -312,7 +313,7 @@ private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, FetchRequest
return true;
} else if (fetchOffsetMetadata.onSameSegment(endOffsetMetadata)) {
// we take the partition max bytes as the upper bound when accumulating the bytes.
long bytesAvailable = Math.min(endOffsetMetadata.positionDiff(fetchOffsetMetadata), partitionData.maxBytes);
long bytesAvailable = Math.min(endOffsetMetadata.positionDiff(fetchOffsetMetadata), partitionMaxBytes.get(topicIdPartition));
accumulatedSize += bytesAvailable;
}
}
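
As a small worked example for the bound above (assumed numbers): with a 1,048,576-byte request budget split uniformly across 4 acquired partitions, each per-partition limit is 262,144 bytes, so even if positionDiff reports 2 MiB available on a partition's segment, that partition contributes at most 262,144 bytes toward the minBytes accumulation.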
@@ -335,13 +336,29 @@ else if (isolationType == FetchIsolation.HIGH_WATERMARK)

}

private LinkedHashMap<TopicIdPartition, LogReadResult> readFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
private LinkedHashMap<TopicIdPartition, LogReadResult> readFromLog(
LinkedHashMap<TopicIdPartition, Long> topicPartitionFetchOffsets,
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes) {
// Filter if there already exists any erroneous topic partition.
Set<TopicIdPartition> partitionsToFetch = shareFetch.filterErroneousTopicPartitions(topicPartitionData.keySet());
Set<TopicIdPartition> partitionsToFetch = shareFetch.filterErroneousTopicPartitions(topicPartitionFetchOffsets.keySet());
if (partitionsToFetch.isEmpty()) {
return new LinkedHashMap<>();
}

LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();

for (Map.Entry<TopicIdPartition, Long> entry : topicPartitionFetchOffsets.entrySet()) {
TopicIdPartition topicIdPartition = entry.getKey();
long fetchOffset = entry.getValue();
topicPartitionData.put(topicIdPartition, new FetchRequest.PartitionData(
topicIdPartition.topicId(),
fetchOffset,
0,
partitionMaxBytes.get(topicIdPartition),
Optional.empty()
));
}

Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
shareFetch.fetchParams(),
CollectionConverters.asScala(
@@ -390,18 +407,27 @@ private void handleFetchException(
}

// Visible for testing.
LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
LinkedHashMap<TopicIdPartition, LogReadResult> existingFetchedData) {
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> missingLogReadTopicPartitions = new LinkedHashMap<>();
topicPartitionData.forEach((topicIdPartition, partitionData) -> {
LinkedHashMap<TopicIdPartition, Long> missingLogReadTopicPartitions = new LinkedHashMap<>();
topicPartitionData.forEach((topicIdPartition, fetchOffset) -> {
if (!existingFetchedData.containsKey(topicIdPartition)) {
missingLogReadTopicPartitions.put(topicIdPartition, partitionData);
missingLogReadTopicPartitions.put(topicIdPartition, fetchOffset);
}
});
if (missingLogReadTopicPartitions.isEmpty()) {
return existingFetchedData;
}
LinkedHashMap<TopicIdPartition, LogReadResult> missingTopicPartitionsLogReadResponse = readFromLog(missingLogReadTopicPartitions);

// Compute the total bytes that have already been fetched for the existing fetched data.
int totalPartitionMaxBytesUsed = 0;
for (LogReadResult logReadResult : existingFetchedData.values()) {
totalPartitionMaxBytesUsed += logReadResult.info().records.sizeInBytes();
}

LinkedHashMap<TopicIdPartition, LogReadResult> missingTopicPartitionsLogReadResponse = readFromLog(
missingLogReadTopicPartitions,
partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes - totalPartitionMaxBytesUsed, missingLogReadTopicPartitions.keySet(), missingLogReadTopicPartitions.size()));
missingTopicPartitionsLogReadResponse.putAll(existingFetchedData);
return missingTopicPartitionsLogReadResponse;
}
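
The remaining-budget arithmetic in combineLogReadResponse can be sketched with assumed numbers (the variable names below are hypothetical, for illustration only):

// A 1,000,000-byte request budget, with two partitions already read in tryComplete
// whose records totalled 120,000 and 80,000 bytes (records.sizeInBytes()).
int requestMaxBytes = 1_000_000;
int totalPartitionMaxBytesUsed = 120_000 + 80_000;
int remainingBudget = requestMaxBytes - totalPartitionMaxBytesUsed; // 800,000 bytes left
int missingPartitions = 2;                                          // partitions not yet read
int perPartitionLimit = remainingBudget / missingPartitions;        // 400,000 bytes each, uniform split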
15 changes: 11 additions & 4 deletions core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -26,6 +26,7 @@
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.server.purgatory.DelayedOperationKey;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
@@ -39,6 +40,7 @@
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogOffsetSnapshot;

@@ -559,13 +561,18 @@ public void testCombineLogReadResponse() {
.withSharePartitions(sharePartitions)
.build();

LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();
topicPartitionData.put(tp0, mock(FetchRequest.PartitionData.class));
topicPartitionData.put(tp1, mock(FetchRequest.PartitionData.class));
LinkedHashMap<TopicIdPartition, Long> topicPartitionData = new LinkedHashMap<>();
topicPartitionData.put(tp0, 0L);
topicPartitionData.put(tp1, 0L);

// Case 1 - logReadResponse contains tp0.
LinkedHashMap<TopicIdPartition, LogReadResult> logReadResponse = new LinkedHashMap<>();
logReadResponse.put(tp0, mock(LogReadResult.class));
LogReadResult logReadResult = mock(LogReadResult.class);
Records records = mock(Records.class);
when(records.sizeInBytes()).thenReturn(2);
FetchDataInfo fetchDataInfo = new FetchDataInfo(mock(LogOffsetMetadata.class), records);
when(logReadResult.info()).thenReturn(fetchDataInfo);
logReadResponse.put(tp0, logReadResult);

doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
LinkedHashMap<TopicIdPartition, LogReadResult> combinedLogReadResponse = delayedShareFetch.combineLogReadResponse(topicPartitionData, logReadResponse);
2 changes: 1 addition & 1 deletion core/src/test/java/kafka/test/api/ShareConsumerTest.java
@@ -902,7 +902,7 @@ public void testFetchRecordLargerThanMaxPartitionFetchBytes(String persister) th
shareConsumer.subscribe(Collections.singleton(tp.topic()));

ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
assertEquals(2, records.count());
}
}

Loading