From 106411f55cf7a4941ebfdc039691ce8251b2e2db Mon Sep 17 00:00:00 2001 From: Abhinav Dixit Date: Wed, 20 Nov 2024 11:09:02 +0530 Subject: [PATCH 01/15] removed partition_max_bytes from delayed share fetch --- .../kafka/common/requests/FetchRequest.java | 15 +++++++++++++- .../kafka/server/share/DelayedShareFetch.java | 20 ++++++++++++------- 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java index 5d1fc9a996ed6..88a6df3bd269b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java @@ -59,9 +59,18 @@ public static final class PartitionData { public final Uuid topicId; public final long fetchOffset; public final long logStartOffset; - public final int maxBytes; public final Optional currentLeaderEpoch; public final Optional lastFetchedEpoch; + public int maxBytes; + + public PartitionData( + Uuid topicId, + long fetchOffset, + long logStartOffset, + Optional currentLeaderEpoch + ) { + this(topicId, fetchOffset, logStartOffset, 0, currentLeaderEpoch, Optional.empty()); + } public PartitionData( Uuid topicId, @@ -89,6 +98,10 @@ public PartitionData( this.lastFetchedEpoch = lastFetchedEpoch; } + public void updateMaxBytes(int maxBytes) { + this.maxBytes = maxBytes; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index 9bab9818c0706..7ca88bee826ac 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -41,6 +41,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import 
java.util.concurrent.locks.Lock; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -125,7 +126,7 @@ private void completeShareFetchRequest(LinkedHashMap responseData; if (partitionsAlreadyFetched.isEmpty()) - responseData = readFromLog(topicPartitionData); + responseData = readFromLog(topicPartitionData, shareFetch.fetchParams().maxBytes / topicPartitionData.size()); else // There shouldn't be a case when we have a partitionsAlreadyFetched value here and this variable is getting // updated in a different tryComplete thread. @@ -207,7 +208,6 @@ LinkedHashMap acquirablePartitions LinkedHashMap topicPartitionData = new LinkedHashMap<>(); sharePartitions.forEach((topicIdPartition, sharePartition) -> { - int partitionMaxBytes = shareFetch.partitionMaxBytes().getOrDefault(topicIdPartition, 0); // Add the share partition to the list of partitions to be fetched only if we can // acquire the fetch lock on it. if (sharePartition.maybeAcquireFetchLock()) { @@ -220,7 +220,6 @@ LinkedHashMap acquirablePartitions topicIdPartition.topicId(), sharePartition.nextFetchOffset(), 0, - partitionMaxBytes, Optional.empty() ) ); @@ -251,7 +250,7 @@ private LinkedHashMap maybeReadFromLog(LinkedHa return new LinkedHashMap<>(); } // We fetch data from replica manager corresponding to the topic partitions that have missing fetch offset metadata. - return readFromLog(partitionsNotMatchingFetchOffsetMetadata); + return readFromLog(partitionsNotMatchingFetchOffsetMetadata, shareFetch.fetchParams().maxBytes / topicPartitionData.size()); } private void maybeUpdateFetchOffsetMetadata( @@ -312,7 +311,7 @@ private boolean isMinBytesSatisfied(LinkedHashMap readFromLog(LinkedHashMap topicPartitionData) { + private LinkedHashMap readFromLog(LinkedHashMap topicPartitionData, int partitionMaxBytes) { // Filter if there already exists any erroneous topic partition. 
Set partitionsToFetch = shareFetch.filterErroneousTopicPartitions(topicPartitionData.keySet()); if (partitionsToFetch.isEmpty()) { return new LinkedHashMap<>(); } + topicPartitionData.values().forEach(partitionData -> partitionData.updateMaxBytes(partitionMaxBytes)); Seq> responseLogResult = replicaManager.readFromLog( shareFetch.fetchParams(), @@ -393,15 +393,21 @@ private void handleFetchException( LinkedHashMap combineLogReadResponse(LinkedHashMap topicPartitionData, LinkedHashMap existingFetchedData) { LinkedHashMap missingLogReadTopicPartitions = new LinkedHashMap<>(); + AtomicInteger totalPartitionMaxBytesUsed = new AtomicInteger(); topicPartitionData.forEach((topicIdPartition, partitionData) -> { if (!existingFetchedData.containsKey(topicIdPartition)) { missingLogReadTopicPartitions.put(topicIdPartition, partitionData); + } else { + totalPartitionMaxBytesUsed.addAndGet(partitionData.maxBytes); } }); if (missingLogReadTopicPartitions.isEmpty()) { return existingFetchedData; } - LinkedHashMap missingTopicPartitionsLogReadResponse = readFromLog(missingLogReadTopicPartitions); + LinkedHashMap missingTopicPartitionsLogReadResponse = readFromLog( + missingLogReadTopicPartitions, + (shareFetch.fetchParams().maxBytes - totalPartitionMaxBytesUsed.get()) / missingLogReadTopicPartitions.size() + ); missingTopicPartitionsLogReadResponse.putAll(existingFetchedData); return missingTopicPartitionsLogReadResponse; } From e87da28c006ff5b0289bf32c53cbff5579d2f109 Mon Sep 17 00:00:00 2001 From: Abhinav Dixit Date: Tue, 7 Jan 2025 14:27:11 +0530 Subject: [PATCH 02/15] Addressed Apoorv's comment and corrected combineLogReadResponse logic --- .../kafka/common/requests/FetchRequest.java | 15 +--------- .../kafka/server/share/DelayedShareFetch.java | 30 +++++++++++++++---- .../server/share/DelayedShareFetchTest.java | 9 +++++- 3 files changed, 33 insertions(+), 21 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java index 88a6df3bd269b..5d1fc9a996ed6 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java @@ -59,18 +59,9 @@ public static final class PartitionData { public final Uuid topicId; public final long fetchOffset; public final long logStartOffset; + public final int maxBytes; public final Optional currentLeaderEpoch; public final Optional lastFetchedEpoch; - public int maxBytes; - - public PartitionData( - Uuid topicId, - long fetchOffset, - long logStartOffset, - Optional currentLeaderEpoch - ) { - this(topicId, fetchOffset, logStartOffset, 0, currentLeaderEpoch, Optional.empty()); - } public PartitionData( Uuid topicId, @@ -98,10 +89,6 @@ public PartitionData( this.lastFetchedEpoch = lastFetchedEpoch; } - public void updateMaxBytes(int maxBytes) { - this.maxBytes = maxBytes; - } - @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index 7ca88bee826ac..190fb8a55fdcb 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -41,7 +41,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -214,12 +213,15 @@ LinkedHashMap acquirablePartitions try { // If the share partition is already at capacity, we should not attempt to fetch. if (sharePartition.canAcquireRecords()) { + // We do not know the total partitions that can be acquired at this stage, hence we set maxBytes + // to 0 for now and will update it before doing the replica manager fetch. 
topicPartitionData.put( topicIdPartition, new FetchRequest.PartitionData( topicIdPartition.topicId(), sharePartition.nextFetchOffset(), 0, + 0, Optional.empty() ) ); @@ -340,7 +342,19 @@ private LinkedHashMap readFromLog(LinkedHashMap if (partitionsToFetch.isEmpty()) { return new LinkedHashMap<>(); } - topicPartitionData.values().forEach(partitionData -> partitionData.updateMaxBytes(partitionMaxBytes)); + + // Update the maxBytes for every fetchable topic partition. + for (Map.Entry entry : topicPartitionData.entrySet()) { + FetchRequest.PartitionData partitionData = entry.getValue(); + FetchRequest.PartitionData updatedPartitionData = new FetchRequest.PartitionData( + partitionData.topicId, + partitionData.fetchOffset, + partitionData.logStartOffset, + partitionMaxBytes, + partitionData.currentLeaderEpoch + ); + entry.setValue(updatedPartitionData); + } Seq> responseLogResult = replicaManager.readFromLog( shareFetch.fetchParams(), @@ -393,20 +407,24 @@ private void handleFetchException( LinkedHashMap combineLogReadResponse(LinkedHashMap topicPartitionData, LinkedHashMap existingFetchedData) { LinkedHashMap missingLogReadTopicPartitions = new LinkedHashMap<>(); - AtomicInteger totalPartitionMaxBytesUsed = new AtomicInteger(); topicPartitionData.forEach((topicIdPartition, partitionData) -> { if (!existingFetchedData.containsKey(topicIdPartition)) { missingLogReadTopicPartitions.put(topicIdPartition, partitionData); - } else { - totalPartitionMaxBytesUsed.addAndGet(partitionData.maxBytes); } }); if (missingLogReadTopicPartitions.isEmpty()) { return existingFetchedData; } + + // Computing the total bytes that has already been fetched for the existing fetched data. 
+ int totalPartitionMaxBytesUsed = 0; + for (LogReadResult logReadResult : existingFetchedData.values()) { + totalPartitionMaxBytesUsed += logReadResult.info().records.sizeInBytes(); + } + LinkedHashMap missingTopicPartitionsLogReadResponse = readFromLog( missingLogReadTopicPartitions, - (shareFetch.fetchParams().maxBytes - totalPartitionMaxBytesUsed.get()) / missingLogReadTopicPartitions.size() + (shareFetch.fetchParams().maxBytes - totalPartitionMaxBytesUsed) / missingLogReadTopicPartitions.size() ); missingTopicPartitionsLogReadResponse.putAll(existingFetchedData); return missingTopicPartitionsLogReadResponse; diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java index 11d3e26eaf39a..6e15a6a0c728b 100644 --- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java +++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.ShareFetchResponseData; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.record.Records; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.server.purgatory.DelayedOperationKey; import org.apache.kafka.server.purgatory.DelayedOperationPurgatory; @@ -39,6 +40,7 @@ import org.apache.kafka.server.util.timer.SystemTimer; import org.apache.kafka.server.util.timer.SystemTimerReaper; import org.apache.kafka.server.util.timer.Timer; +import org.apache.kafka.storage.internals.log.FetchDataInfo; import org.apache.kafka.storage.internals.log.LogOffsetMetadata; import org.apache.kafka.storage.internals.log.LogOffsetSnapshot; @@ -565,7 +567,12 @@ public void testCombineLogReadResponse() { // Case 1 - logReadResponse contains tp0. 
LinkedHashMap logReadResponse = new LinkedHashMap<>(); - logReadResponse.put(tp0, mock(LogReadResult.class)); + LogReadResult logReadResult = mock(LogReadResult.class); + Records records = mock(Records.class); + when(records.sizeInBytes()).thenReturn(2); + FetchDataInfo fetchDataInfo = new FetchDataInfo(mock(LogOffsetMetadata.class), records); + when(logReadResult.info()).thenReturn(fetchDataInfo); + logReadResponse.put(tp0, logReadResult); doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); LinkedHashMap combinedLogReadResponse = delayedShareFetch.combineLogReadResponse(topicPartitionData, logReadResponse); From 1b6fb4a68915413e91e460da5cace5ce3c9b97c1 Mon Sep 17 00:00:00 2001 From: Abhinav Dixit Date: Tue, 7 Jan 2025 14:37:33 +0530 Subject: [PATCH 03/15] Trigger build From 6671e913dcdb44beb83df97d016221ad9f650656 Mon Sep 17 00:00:00 2001 From: Abhinav Dixit Date: Tue, 7 Jan 2025 15:31:30 +0530 Subject: [PATCH 04/15] Fixed failing tests --- .../java/kafka/test/api/ShareConsumerTest.java | 2 +- .../server/ShareFetchAcknowledgeRequestTest.scala | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java b/core/src/test/java/kafka/test/api/ShareConsumerTest.java index 2107cdd7718dc..6551f80b5aaf2 100644 --- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java +++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java @@ -902,7 +902,7 @@ public void testFetchRecordLargerThanMaxPartitionFetchBytes(String persister) th shareConsumer.subscribe(Collections.singleton(tp.topic())); ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); + assertEquals(2, records.count()); } } diff --git a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala index 73f9fce42e6b1..b38ec285c403c 100644 --- a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala @@ -1310,7 +1310,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo ), ) ) - def testShareFetchBrokerRespectsPartitionsSizeLimit(): Unit = { + def testShareFetchBrokerDoesNotRespectPartitionsSizeLimit(): Unit = { val groupId: String = "group" val memberId = Uuid.randomUuid() @@ -1350,10 +1350,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setPartitionIndex(partition) .setErrorCode(Errors.NONE.code()) .setAcknowledgeErrorCode(Errors.NONE.code()) - .setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(0), Collections.singletonList(11), Collections.singletonList(1))) + .setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(0), Collections.singletonList(12), Collections.singletonList(1))) // The first 10 records will be consumed as it is. For the last 3 records, each of size MAX_PARTITION_BYTES/3, - only 2 of then will be consumed (offsets 10 and 11) because the inclusion of the third last record will exceed - the max partition bytes limit + all 3 of them will be consumed (offsets 10, 11 and 12) because even though the inclusion of the third last record will exceed + the max partition bytes limit. We should only consider the request level maxBytes as the hard limit. 
val partitionData = shareFetchResponseData.responses().get(0).partitions().get(0) compareFetchResponsePartitions(expectedPartitionData, partitionData) @@ -1412,15 +1412,15 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo // mocking the behaviour of multiple share consumers from the same share group val metadata1: ShareRequestMetadata = new ShareRequestMetadata(memberId1, ShareRequestMetadata.INITIAL_EPOCH) val acknowledgementsMap1: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty - val shareFetchRequest1 = createShareFetchRequest(groupId, metadata1, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap1) + val shareFetchRequest1 = createShareFetchRequest(groupId, metadata1, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap1, minBytes = 100, maxBytes = 1500) val metadata2: ShareRequestMetadata = new ShareRequestMetadata(memberId2, ShareRequestMetadata.INITIAL_EPOCH) val acknowledgementsMap2: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty - val shareFetchRequest2 = createShareFetchRequest(groupId, metadata2, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap2) + val shareFetchRequest2 = createShareFetchRequest(groupId, metadata2, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap2, minBytes = 100, maxBytes = 1500) val metadata3: ShareRequestMetadata = new ShareRequestMetadata(memberId3, ShareRequestMetadata.INITIAL_EPOCH) val acknowledgementsMap3: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty - val shareFetchRequest3 = createShareFetchRequest(groupId, metadata3, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap3) + val shareFetchRequest3 = createShareFetchRequest(groupId, metadata3, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap3, minBytes = 100, maxBytes = 1500) val shareFetchResponse1 = connectAndReceive[ShareFetchResponse](shareFetchRequest1) val 
shareFetchResponse2 = connectAndReceive[ShareFetchResponse](shareFetchRequest2) From f929a7628f78868c33fc5425c1928d6042eba6b5 Mon Sep 17 00:00:00 2001 From: Abhinav Dixit Date: Wed, 8 Jan 2025 13:22:09 +0530 Subject: [PATCH 05/15] Introduced a strategy class to decide the division of partition max bytes to make it scalable --- .../kafka/server/share/DelayedShareFetch.java | 23 +++-- .../PartitionMaxBytesDivisionStrategy.java | 75 +++++++++++++++++ ...PartitionMaxBytesDivisionStrategyTest.java | 84 +++++++++++++++++++ 3 files changed, 176 insertions(+), 6 deletions(-) create mode 100644 share/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesDivisionStrategy.java create mode 100644 share/src/test/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesDivisionStrategyTest.java diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index 190fb8a55fdcb..604bca2cad1c8 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -27,6 +27,7 @@ import org.apache.kafka.server.purgatory.DelayedOperation; import org.apache.kafka.server.share.SharePartitionKey; import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey; +import org.apache.kafka.server.share.fetch.PartitionMaxBytesDivisionStrategy; import org.apache.kafka.server.share.fetch.ShareFetch; import org.apache.kafka.server.storage.log.FetchIsolation; import org.apache.kafka.server.storage.log.FetchPartitionData; @@ -60,6 +61,7 @@ public class DelayedShareFetch extends DelayedOperation { private final ShareFetch shareFetch; private final ReplicaManager replicaManager; private final BiConsumer exceptionHandler; + private final PartitionMaxBytesDivisionStrategy partitionMaxBytesDivisionStrategy; // The topic partitions that need to be completed for the share fetch request are given by sharePartitions. 
// sharePartitions is a subset of shareFetchData. The order of insertion/deletion of entries in sharePartitions is important. private final LinkedHashMap sharePartitions; @@ -78,6 +80,7 @@ public class DelayedShareFetch extends DelayedOperation { this.partitionsAlreadyFetched = new LinkedHashMap<>(); this.exceptionHandler = exceptionHandler; this.sharePartitions = sharePartitions; + partitionMaxBytesDivisionStrategy = new PartitionMaxBytesDivisionStrategy(PartitionMaxBytesDivisionStrategy.StrategyType.EQUAL_DIVISION); } @Override @@ -125,7 +128,9 @@ private void completeShareFetchRequest(LinkedHashMap responseData; if (partitionsAlreadyFetched.isEmpty()) - responseData = readFromLog(topicPartitionData, shareFetch.fetchParams().maxBytes / topicPartitionData.size()); + responseData = readFromLog( + topicPartitionData, + partitionMaxBytesDivisionStrategy.partitionMaxBytes(shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size())); else // There shouldn't be a case when we have a partitionsAlreadyFetched value here and this variable is getting // updated in a different tryComplete thread. @@ -252,7 +257,12 @@ private LinkedHashMap maybeReadFromLog(LinkedHa return new LinkedHashMap<>(); } // We fetch data from replica manager corresponding to the topic partitions that have missing fetch offset metadata. - return readFromLog(partitionsNotMatchingFetchOffsetMetadata, shareFetch.fetchParams().maxBytes / topicPartitionData.size()); + // Although we are fetching partition max bytes for partitionsNotMatchingFetchOffsetMetadata, + // we will take acquired partitions size = topicPartitionData.size() because we do not want to let the + // leftover partitions to starve which will be fetched later. 
+ return readFromLog( + partitionsNotMatchingFetchOffsetMetadata, + partitionMaxBytesDivisionStrategy.partitionMaxBytes(shareFetch.fetchParams().maxBytes, partitionsNotMatchingFetchOffsetMetadata.keySet(), topicPartitionData.size())); } private void maybeUpdateFetchOffsetMetadata( @@ -336,7 +346,9 @@ else if (isolationType == FetchIsolation.HIGH_WATERMARK) } - private LinkedHashMap readFromLog(LinkedHashMap topicPartitionData, int partitionMaxBytes) { + private LinkedHashMap readFromLog( + LinkedHashMap topicPartitionData, + LinkedHashMap partitionMaxBytes) { // Filter if there already exists any erroneous topic partition. Set partitionsToFetch = shareFetch.filterErroneousTopicPartitions(topicPartitionData.keySet()); if (partitionsToFetch.isEmpty()) { @@ -350,7 +362,7 @@ private LinkedHashMap readFromLog(LinkedHashMap partitionData.topicId, partitionData.fetchOffset, partitionData.logStartOffset, - partitionMaxBytes, + partitionMaxBytes.get(entry.getKey()), partitionData.currentLeaderEpoch ); entry.setValue(updatedPartitionData); @@ -424,8 +436,7 @@ LinkedHashMap combineLogReadResponse(LinkedHash LinkedHashMap missingTopicPartitionsLogReadResponse = readFromLog( missingLogReadTopicPartitions, - (shareFetch.fetchParams().maxBytes - totalPartitionMaxBytesUsed) / missingLogReadTopicPartitions.size() - ); + partitionMaxBytesDivisionStrategy.partitionMaxBytes(shareFetch.fetchParams().maxBytes - totalPartitionMaxBytesUsed, missingLogReadTopicPartitions.keySet(), missingLogReadTopicPartitions.size())); missingTopicPartitionsLogReadResponse.putAll(existingFetchedData); return missingTopicPartitionsLogReadResponse; } diff --git a/share/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesDivisionStrategy.java b/share/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesDivisionStrategy.java new file mode 100644 index 0000000000000..57ed5d4be0c5c --- /dev/null +++ 
b/share/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesDivisionStrategy.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.share.fetch; + +import org.apache.kafka.common.TopicIdPartition; + +import java.util.LinkedHashMap; +import java.util.Locale; +import java.util.Set; + +public class PartitionMaxBytesDivisionStrategy { + + public enum StrategyType { + EQUAL_DIVISION; + + @Override + public String toString() { + return super.toString().toLowerCase(Locale.ROOT); + } + } + private final StrategyType type; + + public PartitionMaxBytesDivisionStrategy(StrategyType type) { + if (type == null) { + throw new IllegalArgumentException("Partition bytes division strategy is null"); + } + this.type = type; + } + + /** + * Returns the partition max bytes for a given partition based on the strategy type. + * + * @param requestMaxBytes - The total max bytes available for the share fetch request + * @param partitions - The topic partitions in the order for which we compute the partition max bytes. + * @param acquiredPartitionsSize - The total partitions that have been acquired. 
+ * @return the partition max bytes for the topic partitions + */ + public LinkedHashMap partitionMaxBytes(int requestMaxBytes, Set partitions, int acquiredPartitionsSize) { + if (partitions == null || partitions.isEmpty()) { + throw new IllegalArgumentException("Partitions to generate max bytes is null or empty"); + } + if (requestMaxBytes <= 0) { + throw new IllegalArgumentException("Requested max bytes must be greater than 0"); + } + if (acquiredPartitionsSize <= 0) { + throw new IllegalArgumentException("Acquired partitions size must be greater than 0"); + } + + LinkedHashMap partitionMaxBytes = new LinkedHashMap<>(); + if (type == StrategyType.EQUAL_DIVISION) { + partitions.forEach(partition -> partitionMaxBytes.put( + partition, requestMaxBytes / acquiredPartitionsSize)); + } + return partitionMaxBytes; + } + + // Visible for testing + protected StrategyType type() { + return type; + } +} diff --git a/share/src/test/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesDivisionStrategyTest.java b/share/src/test/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesDivisionStrategyTest.java new file mode 100644 index 0000000000000..b7ab9282e3472 --- /dev/null +++ b/share/src/test/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesDivisionStrategyTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.share.fetch; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.server.share.fetch.PartitionMaxBytesDivisionStrategy.StrategyType; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class PartitionMaxBytesDivisionStrategyTest { + + @Test + public void testConstructor() { + assertThrows(IllegalArgumentException.class, () -> new PartitionMaxBytesDivisionStrategy(null)); + assertEquals(StrategyType.EQUAL_DIVISION, new PartitionMaxBytesDivisionStrategy(StrategyType.EQUAL_DIVISION).type()); + } + + @Test + public void testPartitionMaxBytesThrowsException() { + PartitionMaxBytesDivisionStrategy partitionMaxBytesDivisionStrategy = new PartitionMaxBytesDivisionStrategy(StrategyType.EQUAL_DIVISION); + TopicIdPartition topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic1", 0)); + TopicIdPartition topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic1", 1)); + TopicIdPartition topicIdPartition3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic2", 0)); + Set partitions = new LinkedHashSet<>(); + partitions.add(topicIdPartition1); + partitions.add(topicIdPartition2); + 
partitions.add(topicIdPartition3); + + // acquired partitions size is 0. + assertThrows(IllegalArgumentException.class, () -> partitionMaxBytesDivisionStrategy.partitionMaxBytes( + 100, partitions, 0)); + // empty partitions set. + assertThrows(IllegalArgumentException.class, () -> partitionMaxBytesDivisionStrategy.partitionMaxBytes( + 100, Collections.EMPTY_SET, 20)); + // request max bytes is 0. + assertThrows(IllegalArgumentException.class, () -> partitionMaxBytesDivisionStrategy.partitionMaxBytes( + 0, partitions, 20)); + } + + @Test + public void testPartitionMaxBytesEqualDivision() { + PartitionMaxBytesDivisionStrategy partitionMaxBytesDivisionStrategy = new PartitionMaxBytesDivisionStrategy(StrategyType.EQUAL_DIVISION); + TopicIdPartition topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic1", 0)); + TopicIdPartition topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic1", 1)); + TopicIdPartition topicIdPartition3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic2", 0)); + Set partitions = new LinkedHashSet<>(); + partitions.add(topicIdPartition1); + partitions.add(topicIdPartition2); + partitions.add(topicIdPartition3); + + LinkedHashMap result = partitionMaxBytesDivisionStrategy.partitionMaxBytes( + 100, partitions, 3); + assertEquals(result.values().stream().toList(), List.of(33, 33, 33)); + + result = partitionMaxBytesDivisionStrategy.partitionMaxBytes( + 100, partitions, 5); + assertEquals(result.values().stream().toList(), List.of(20, 20, 20)); + } +} From 2d44b948fffdc6399442477fdcaeac173f1325a6 Mon Sep 17 00:00:00 2001 From: Abhinav Dixit Date: Thu, 9 Jan 2025 01:52:31 +0530 Subject: [PATCH 06/15] Addressed Apoorv's round 2 comment --- .../kafka/server/share/DelayedShareFetch.java | 12 +++---- ...gy.java => PartitionMaxBytesStrategy.java} | 34 ++++++++----------- ...ava => PartitionMaxBytesStrategyTest.java} | 27 ++++++++------- 3 files changed, 34 insertions(+), 39 
deletions(-) rename share/src/main/java/org/apache/kafka/server/share/fetch/{PartitionMaxBytesDivisionStrategy.java => PartitionMaxBytesStrategy.java} (72%) rename share/src/test/java/org/apache/kafka/server/share/fetch/{PartitionMaxBytesDivisionStrategyTest.java => PartitionMaxBytesStrategyTest.java} (75%) diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index 604bca2cad1c8..f47b8923e6a55 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -27,7 +27,7 @@ import org.apache.kafka.server.purgatory.DelayedOperation; import org.apache.kafka.server.share.SharePartitionKey; import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey; -import org.apache.kafka.server.share.fetch.PartitionMaxBytesDivisionStrategy; +import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy; import org.apache.kafka.server.share.fetch.ShareFetch; import org.apache.kafka.server.storage.log.FetchIsolation; import org.apache.kafka.server.storage.log.FetchPartitionData; @@ -61,7 +61,7 @@ public class DelayedShareFetch extends DelayedOperation { private final ShareFetch shareFetch; private final ReplicaManager replicaManager; private final BiConsumer exceptionHandler; - private final PartitionMaxBytesDivisionStrategy partitionMaxBytesDivisionStrategy; + private final PartitionMaxBytesStrategy partitionMaxBytesStrategy; // The topic partitions that need to be completed for the share fetch request are given by sharePartitions. // sharePartitions is a subset of shareFetchData. The order of insertion/deletion of entries in sharePartitions is important. 
private final LinkedHashMap sharePartitions; @@ -80,7 +80,7 @@ public class DelayedShareFetch extends DelayedOperation { this.partitionsAlreadyFetched = new LinkedHashMap<>(); this.exceptionHandler = exceptionHandler; this.sharePartitions = sharePartitions; - partitionMaxBytesDivisionStrategy = new PartitionMaxBytesDivisionStrategy(PartitionMaxBytesDivisionStrategy.StrategyType.EQUAL_DIVISION); + this.partitionMaxBytesStrategy = PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM); } @Override @@ -130,7 +130,7 @@ private void completeShareFetchRequest(LinkedHashMap maybeReadFromLog(LinkedHa // leftover partitions to starve which will be fetched later. return readFromLog( partitionsNotMatchingFetchOffsetMetadata, - partitionMaxBytesDivisionStrategy.partitionMaxBytes(shareFetch.fetchParams().maxBytes, partitionsNotMatchingFetchOffsetMetadata.keySet(), topicPartitionData.size())); + partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, partitionsNotMatchingFetchOffsetMetadata.keySet(), topicPartitionData.size())); } private void maybeUpdateFetchOffsetMetadata( @@ -436,7 +436,7 @@ LinkedHashMap combineLogReadResponse(LinkedHash LinkedHashMap missingTopicPartitionsLogReadResponse = readFromLog( missingLogReadTopicPartitions, - partitionMaxBytesDivisionStrategy.partitionMaxBytes(shareFetch.fetchParams().maxBytes - totalPartitionMaxBytesUsed, missingLogReadTopicPartitions.keySet(), missingLogReadTopicPartitions.size())); + partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes - totalPartitionMaxBytesUsed, missingLogReadTopicPartitions.keySet(), missingLogReadTopicPartitions.size())); missingTopicPartitionsLogReadResponse.putAll(existingFetchedData); return missingTopicPartitionsLogReadResponse; } diff --git a/share/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesDivisionStrategy.java b/share/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategy.java similarity index 72% 
rename from share/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesDivisionStrategy.java rename to share/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategy.java index 57ed5d4be0c5c..76f7ac799ac2a 100644 --- a/share/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesDivisionStrategy.java +++ b/share/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategy.java @@ -22,34 +22,36 @@ import java.util.Locale; import java.util.Set; -public class PartitionMaxBytesDivisionStrategy { +public interface PartitionMaxBytesStrategy { - public enum StrategyType { - EQUAL_DIVISION; + enum StrategyType { + UNIFORM; @Override public String toString() { return super.toString().toLowerCase(Locale.ROOT); } } - private final StrategyType type; - public PartitionMaxBytesDivisionStrategy(StrategyType type) { - if (type == null) { - throw new IllegalArgumentException("Partition bytes division strategy is null"); - } - this.type = type; + LinkedHashMap maxBytes(int requestMaxBytes, Set partitions, int acquiredPartitionsSize); + + static PartitionMaxBytesStrategy type(StrategyType type) { + if (type == null) + throw new IllegalArgumentException("Strategy type cannot be null"); + return switch (type) { + case UNIFORM -> PartitionMaxBytesStrategy::uniformPartitionMaxBytes; + }; } /** - * Returns the partition max bytes for a given partition based on the strategy type. + * Returns the partition max bytes for a given partition based on the uniform strategy type. * * @param requestMaxBytes - The total max bytes available for the share fetch request * @param partitions - The topic partitions in the order for which we compute the partition max bytes. * @param acquiredPartitionsSize - The total partitions that have been acquired. 
* @return the partition max bytes for the topic partitions */ - public LinkedHashMap partitionMaxBytes(int requestMaxBytes, Set partitions, int acquiredPartitionsSize) { + static LinkedHashMap uniformPartitionMaxBytes(int requestMaxBytes, Set partitions, int acquiredPartitionsSize) { if (partitions == null || partitions.isEmpty()) { throw new IllegalArgumentException("Partitions to generate max bytes is null or empty"); } @@ -61,15 +63,7 @@ public LinkedHashMap partitionMaxBytes(int requestMax } LinkedHashMap partitionMaxBytes = new LinkedHashMap<>(); - if (type == StrategyType.EQUAL_DIVISION) { - partitions.forEach(partition -> partitionMaxBytes.put( - partition, requestMaxBytes / acquiredPartitionsSize)); - } + partitions.forEach(partition -> partitionMaxBytes.put(partition, requestMaxBytes / acquiredPartitionsSize)); return partitionMaxBytes; } - - // Visible for testing - protected StrategyType type() { - return type; - } } diff --git a/share/src/test/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesDivisionStrategyTest.java b/share/src/test/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategyTest.java similarity index 75% rename from share/src/test/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesDivisionStrategyTest.java rename to share/src/test/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategyTest.java index b7ab9282e3472..62574dec1cae6 100644 --- a/share/src/test/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesDivisionStrategyTest.java +++ b/share/src/test/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategyTest.java @@ -19,7 +19,7 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; -import org.apache.kafka.server.share.fetch.PartitionMaxBytesDivisionStrategy.StrategyType; +import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy.StrategyType; import org.junit.jupiter.api.Test; @@ -29,20 
+29,21 @@ import java.util.List; import java.util.Set; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -public class PartitionMaxBytesDivisionStrategyTest { +public class PartitionMaxBytesStrategyTest { @Test public void testConstructor() { - assertThrows(IllegalArgumentException.class, () -> new PartitionMaxBytesDivisionStrategy(null)); - assertEquals(StrategyType.EQUAL_DIVISION, new PartitionMaxBytesDivisionStrategy(StrategyType.EQUAL_DIVISION).type()); + assertThrows(IllegalArgumentException.class, () -> PartitionMaxBytesStrategy.type(null)); + assertDoesNotThrow(() -> PartitionMaxBytesStrategy.type(StrategyType.UNIFORM)); } @Test - public void testPartitionMaxBytesThrowsException() { - PartitionMaxBytesDivisionStrategy partitionMaxBytesDivisionStrategy = new PartitionMaxBytesDivisionStrategy(StrategyType.EQUAL_DIVISION); + public void testMaxBytesThrowsException() { + PartitionMaxBytesStrategy partitionMaxBytesStrategy = PartitionMaxBytesStrategy.type(StrategyType.UNIFORM); TopicIdPartition topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic1", 0)); TopicIdPartition topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic1", 1)); TopicIdPartition topicIdPartition3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic2", 0)); @@ -52,19 +53,19 @@ public void testPartitionMaxBytesThrowsException() { partitions.add(topicIdPartition3); // acquired partitions size is 0. - assertThrows(IllegalArgumentException.class, () -> partitionMaxBytesDivisionStrategy.partitionMaxBytes( + assertThrows(IllegalArgumentException.class, () -> partitionMaxBytesStrategy.maxBytes( 100, partitions, 0)); // empty partitions set. 
- assertThrows(IllegalArgumentException.class, () -> partitionMaxBytesDivisionStrategy.partitionMaxBytes( + assertThrows(IllegalArgumentException.class, () -> partitionMaxBytesStrategy.maxBytes( 100, Collections.EMPTY_SET, 20)); // request max bytes is 0. - assertThrows(IllegalArgumentException.class, () -> partitionMaxBytesDivisionStrategy.partitionMaxBytes( + assertThrows(IllegalArgumentException.class, () -> partitionMaxBytesStrategy.maxBytes( 0, partitions, 20)); } @Test - public void testPartitionMaxBytesEqualDivision() { - PartitionMaxBytesDivisionStrategy partitionMaxBytesDivisionStrategy = new PartitionMaxBytesDivisionStrategy(StrategyType.EQUAL_DIVISION); + public void testUniformStrategy() { + PartitionMaxBytesStrategy partitionMaxBytesStrategy = PartitionMaxBytesStrategy.type(StrategyType.UNIFORM); TopicIdPartition topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic1", 0)); TopicIdPartition topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic1", 1)); TopicIdPartition topicIdPartition3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic2", 0)); @@ -73,11 +74,11 @@ public void testPartitionMaxBytesEqualDivision() { partitions.add(topicIdPartition2); partitions.add(topicIdPartition3); - LinkedHashMap result = partitionMaxBytesDivisionStrategy.partitionMaxBytes( + LinkedHashMap result = partitionMaxBytesStrategy.maxBytes( 100, partitions, 3); assertEquals(result.values().stream().toList(), List.of(33, 33, 33)); - result = partitionMaxBytesDivisionStrategy.partitionMaxBytes( + result = partitionMaxBytesStrategy.maxBytes( 100, partitions, 5); assertEquals(result.values().stream().toList(), List.of(20, 20, 20)); } From 5a9440afa3890807e1ea098a5539bcfb2d5b7183 Mon Sep 17 00:00:00 2001 From: Abhinav Dixit Date: Thu, 9 Jan 2025 12:43:23 +0530 Subject: [PATCH 07/15] Minor refactor --- .../share/fetch/PartitionMaxBytesStrategy.java | 16 ++++++++++------ 
.../fetch/PartitionMaxBytesStrategyTest.java | 12 +++++++----- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/share/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategy.java b/share/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategy.java index 76f7ac799ac2a..75320635a37c2 100644 --- a/share/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategy.java +++ b/share/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategy.java @@ -51,19 +51,23 @@ static PartitionMaxBytesStrategy type(StrategyType type) { * @param acquiredPartitionsSize - The total partitions that have been acquired. * @return the partition max bytes for the topic partitions */ - static LinkedHashMap uniformPartitionMaxBytes(int requestMaxBytes, Set partitions, int acquiredPartitionsSize) { + private static LinkedHashMap uniformPartitionMaxBytes(int requestMaxBytes, Set partitions, int acquiredPartitionsSize) { + checkValidArguments(requestMaxBytes, partitions, acquiredPartitionsSize); + LinkedHashMap partitionMaxBytes = new LinkedHashMap<>(); + partitions.forEach(partition -> partitionMaxBytes.put(partition, requestMaxBytes / acquiredPartitionsSize)); + return partitionMaxBytes; + } + + // Visible for testing. 
+ static void checkValidArguments(int requestMaxBytes, Set partitions, int acquiredPartitionsSize) { if (partitions == null || partitions.isEmpty()) { throw new IllegalArgumentException("Partitions to generate max bytes is null or empty"); } if (requestMaxBytes <= 0) { - throw new IllegalArgumentException("Requested max bytes must be greater than 0"); + throw new IllegalArgumentException("Request max bytes must be greater than 0"); } if (acquiredPartitionsSize <= 0) { throw new IllegalArgumentException("Acquired partitions size must be greater than 0"); } - - LinkedHashMap partitionMaxBytes = new LinkedHashMap<>(); - partitions.forEach(partition -> partitionMaxBytes.put(partition, requestMaxBytes / acquiredPartitionsSize)); - return partitionMaxBytes; } } diff --git a/share/src/test/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategyTest.java b/share/src/test/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategyTest.java index 62574dec1cae6..7906b180c51b7 100644 --- a/share/src/test/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategyTest.java +++ b/share/src/test/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategyTest.java @@ -42,8 +42,7 @@ public void testConstructor() { } @Test - public void testMaxBytesThrowsException() { - PartitionMaxBytesStrategy partitionMaxBytesStrategy = PartitionMaxBytesStrategy.type(StrategyType.UNIFORM); + public void testCheckValidArguments() { TopicIdPartition topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic1", 0)); TopicIdPartition topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic1", 1)); TopicIdPartition topicIdPartition3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic2", 0)); @@ -53,14 +52,17 @@ public void testMaxBytesThrowsException() { partitions.add(topicIdPartition3); // acquired partitions size is 0. 
- assertThrows(IllegalArgumentException.class, () -> partitionMaxBytesStrategy.maxBytes( + assertThrows(IllegalArgumentException.class, () -> PartitionMaxBytesStrategy.checkValidArguments( 100, partitions, 0)); // empty partitions set. - assertThrows(IllegalArgumentException.class, () -> partitionMaxBytesStrategy.maxBytes( + assertThrows(IllegalArgumentException.class, () -> PartitionMaxBytesStrategy.checkValidArguments( 100, Collections.EMPTY_SET, 20)); // request max bytes is 0. - assertThrows(IllegalArgumentException.class, () -> partitionMaxBytesStrategy.maxBytes( + assertThrows(IllegalArgumentException.class, () -> PartitionMaxBytesStrategy.checkValidArguments( 0, partitions, 20)); + + // Valid arguments. + assertDoesNotThrow(() -> PartitionMaxBytesStrategy.checkValidArguments(100, partitions, 20)); } @Test From eaecc1495947bb74bae48ac43f7a6864fdaf9256 Mon Sep 17 00:00:00 2001 From: Abhinav Dixit Date: Thu, 9 Jan 2025 18:31:22 +0530 Subject: [PATCH 08/15] Addressed Apoorv's comment round 3-part 1 --- .../kafka/server/share/DelayedShareFetch.java | 7 ++++--- .../fetch/PartitionMaxBytesStrategy.java | 20 +++++++++++-------- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index f47b8923e6a55..aebd1491c141c 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -172,7 +172,7 @@ public boolean tryComplete() { // those topic partitions. 
LinkedHashMap replicaManagerReadResponse = maybeReadFromLog(topicPartitionData); maybeUpdateFetchOffsetMetadata(topicPartitionData, replicaManagerReadResponse); - if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData)) { + if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData, partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size()))) { partitionsAcquired = topicPartitionData; partitionsAlreadyFetched = replicaManagerReadResponse; boolean completedByMe = forceComplete(); @@ -284,7 +284,8 @@ private void maybeUpdateFetchOffsetMetadata( } // minByes estimation currently assumes the common case where all fetched data is acquirable. - private boolean isMinBytesSatisfied(LinkedHashMap topicPartitionData) { + private boolean isMinBytesSatisfied(LinkedHashMap topicPartitionData, + LinkedHashMap partitionMaxBytes) { long accumulatedSize = 0; for (Map.Entry entry : topicPartitionData.entrySet()) { TopicIdPartition topicIdPartition = entry.getKey(); @@ -323,7 +324,7 @@ private boolean isMinBytesSatisfied(LinkedHashMap maxBytes(int requestMaxBytes, Set partitions, int acquiredPartitionsSize); static PartitionMaxBytesStrategy type(StrategyType type) { @@ -43,14 +54,7 @@ static PartitionMaxBytesStrategy type(StrategyType type) { }; } - /** - * Returns the partition max bytes for a given partition based on the uniform strategy type. - * - * @param requestMaxBytes - The total max bytes available for the share fetch request - * @param partitions - The topic partitions in the order for which we compute the partition max bytes. - * @param acquiredPartitionsSize - The total partitions that have been acquired. 
- * @return the partition max bytes for the topic partitions - */ + private static LinkedHashMap uniformPartitionMaxBytes(int requestMaxBytes, Set partitions, int acquiredPartitionsSize) { checkValidArguments(requestMaxBytes, partitions, acquiredPartitionsSize); LinkedHashMap partitionMaxBytes = new LinkedHashMap<>(); From 695737e903b6d17ff608d212e1e0f98bf9480115 Mon Sep 17 00:00:00 2001 From: Abhinav Dixit Date: Thu, 9 Jan 2025 21:45:11 +0530 Subject: [PATCH 09/15] Addressed Apoorv's comment round 3-part 2 --- .../kafka/server/share/DelayedShareFetch.java | 79 +++++++++---------- .../server/share/DelayedShareFetchTest.java | 6 +- 2 files changed, 39 insertions(+), 46 deletions(-) diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index aebd1491c141c..2a90a35b26ce5 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -65,7 +65,7 @@ public class DelayedShareFetch extends DelayedOperation { // The topic partitions that need to be completed for the share fetch request are given by sharePartitions. // sharePartitions is a subset of shareFetchData. The order of insertion/deletion of entries in sharePartitions is important. private final LinkedHashMap sharePartitions; - private LinkedHashMap partitionsAcquired; + private LinkedHashMap partitionsAcquired; private LinkedHashMap partitionsAlreadyFetched; DelayedShareFetch( @@ -102,7 +102,7 @@ public void onComplete() { partitionsAcquired.keySet()); try { - LinkedHashMap topicPartitionData; + LinkedHashMap topicPartitionData; // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch. 
if (partitionsAcquired.isEmpty()) topicPartitionData = acquirablePartitions(); @@ -124,7 +124,7 @@ public void onComplete() { } } - private void completeShareFetchRequest(LinkedHashMap topicPartitionData) { + private void completeShareFetchRequest(LinkedHashMap topicPartitionData) { try { LinkedHashMap responseData; if (partitionsAlreadyFetched.isEmpty()) @@ -163,7 +163,7 @@ private void completeShareFetchRequest(LinkedHashMap topicPartitionData = acquirablePartitions(); + LinkedHashMap topicPartitionData = acquirablePartitions(); try { if (!topicPartitionData.isEmpty()) { @@ -207,9 +207,9 @@ public boolean tryComplete() { * Prepare fetch request structure for partitions in the share fetch request for which we can acquire records. */ // Visible for testing - LinkedHashMap acquirablePartitions() { + LinkedHashMap acquirablePartitions() { // Initialize the topic partitions for which the fetch should be attempted. - LinkedHashMap topicPartitionData = new LinkedHashMap<>(); + LinkedHashMap topicPartitionData = new LinkedHashMap<>(); sharePartitions.forEach((topicIdPartition, sharePartition) -> { // Add the share partition to the list of partitions to be fetched only if we can @@ -220,16 +220,7 @@ LinkedHashMap acquirablePartitions if (sharePartition.canAcquireRecords()) { // We do not know the total partitions that can be acquired at this stage, hence we set maxBytes // to 0 for now and will update it before doing the replica manager fetch. 
- topicPartitionData.put( - topicIdPartition, - new FetchRequest.PartitionData( - topicIdPartition.topicId(), - sharePartition.nextFetchOffset(), - 0, - 0, - Optional.empty() - ) - ); + topicPartitionData.put(topicIdPartition, sharePartition.nextFetchOffset()); } else { sharePartition.releaseFetchLock(); log.trace("Record lock partition limit exceeded for SharePartition {}, " + @@ -245,12 +236,12 @@ LinkedHashMap acquirablePartitions return topicPartitionData; } - private LinkedHashMap maybeReadFromLog(LinkedHashMap topicPartitionData) { - LinkedHashMap partitionsNotMatchingFetchOffsetMetadata = new LinkedHashMap<>(); - topicPartitionData.forEach((topicIdPartition, partitionData) -> { + private LinkedHashMap maybeReadFromLog(LinkedHashMap topicPartitionData) { + LinkedHashMap partitionsNotMatchingFetchOffsetMetadata = new LinkedHashMap<>(); + topicPartitionData.forEach((topicIdPartition, fetchOffset) -> { SharePartition sharePartition = sharePartitions.get(topicIdPartition); - if (sharePartition.fetchOffsetMetadata(partitionData.fetchOffset).isEmpty()) { - partitionsNotMatchingFetchOffsetMetadata.put(topicIdPartition, partitionData); + if (sharePartition.fetchOffsetMetadata(fetchOffset).isEmpty()) { + partitionsNotMatchingFetchOffsetMetadata.put(topicIdPartition, fetchOffset); } }); if (partitionsNotMatchingFetchOffsetMetadata.isEmpty()) { @@ -266,7 +257,7 @@ private LinkedHashMap maybeReadFromLog(LinkedHa } private void maybeUpdateFetchOffsetMetadata( - LinkedHashMap topicPartitionData, + LinkedHashMap topicPartitionData, LinkedHashMap replicaManagerReadResponseData) { for (Map.Entry entry : replicaManagerReadResponseData.entrySet()) { TopicIdPartition topicIdPartition = entry.getKey(); @@ -278,18 +269,18 @@ private void maybeUpdateFetchOffsetMetadata( continue; } sharePartition.updateFetchOffsetMetadata( - topicPartitionData.get(topicIdPartition).fetchOffset, + topicPartitionData.get(topicIdPartition), replicaManagerLogReadResult.info().fetchOffsetMetadata); } } 
// minByes estimation currently assumes the common case where all fetched data is acquirable. - private boolean isMinBytesSatisfied(LinkedHashMap topicPartitionData, + private boolean isMinBytesSatisfied(LinkedHashMap topicPartitionData, LinkedHashMap partitionMaxBytes) { long accumulatedSize = 0; - for (Map.Entry entry : topicPartitionData.entrySet()) { + for (Map.Entry entry : topicPartitionData.entrySet()) { TopicIdPartition topicIdPartition = entry.getKey(); - FetchRequest.PartitionData partitionData = entry.getValue(); + long fetchOffset = entry.getValue(); LogOffsetMetadata endOffsetMetadata; try { @@ -306,7 +297,7 @@ private boolean isMinBytesSatisfied(LinkedHashMap optionalFetchOffsetMetadata = sharePartition.fetchOffsetMetadata(partitionData.fetchOffset); + Optional optionalFetchOffsetMetadata = sharePartition.fetchOffsetMetadata(fetchOffset); if (optionalFetchOffsetMetadata.isEmpty() || optionalFetchOffsetMetadata.get() == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) continue; LogOffsetMetadata fetchOffsetMetadata = optionalFetchOffsetMetadata.get(); @@ -348,25 +339,27 @@ else if (isolationType == FetchIsolation.HIGH_WATERMARK) } private LinkedHashMap readFromLog( - LinkedHashMap topicPartitionData, + LinkedHashMap topicPartitionFetchOffsets, LinkedHashMap partitionMaxBytes) { // Filter if there already exists any erroneous topic partition. - Set partitionsToFetch = shareFetch.filterErroneousTopicPartitions(topicPartitionData.keySet()); + Set partitionsToFetch = shareFetch.filterErroneousTopicPartitions(topicPartitionFetchOffsets.keySet()); if (partitionsToFetch.isEmpty()) { return new LinkedHashMap<>(); } + LinkedHashMap topicPartitionData = new LinkedHashMap<>(); + // Update the maxBytes for every fetchable topic partition. 
- for (Map.Entry entry : topicPartitionData.entrySet()) { - FetchRequest.PartitionData partitionData = entry.getValue(); - FetchRequest.PartitionData updatedPartitionData = new FetchRequest.PartitionData( - partitionData.topicId, - partitionData.fetchOffset, - partitionData.logStartOffset, - partitionMaxBytes.get(entry.getKey()), - partitionData.currentLeaderEpoch - ); - entry.setValue(updatedPartitionData); + for (Map.Entry entry : topicPartitionFetchOffsets.entrySet()) { + TopicIdPartition topicIdPartition = entry.getKey(); + long fetchOffset = entry.getValue(); + topicPartitionData.put(topicIdPartition, new FetchRequest.PartitionData( + topicIdPartition.topicId(), + fetchOffset, + 0, + partitionMaxBytes.get(topicIdPartition), + Optional.empty() + )); } Seq> responseLogResult = replicaManager.readFromLog( @@ -417,12 +410,12 @@ private void handleFetchException( } // Visible for testing. - LinkedHashMap combineLogReadResponse(LinkedHashMap topicPartitionData, + LinkedHashMap combineLogReadResponse(LinkedHashMap topicPartitionData, LinkedHashMap existingFetchedData) { - LinkedHashMap missingLogReadTopicPartitions = new LinkedHashMap<>(); - topicPartitionData.forEach((topicIdPartition, partitionData) -> { + LinkedHashMap missingLogReadTopicPartitions = new LinkedHashMap<>(); + topicPartitionData.forEach((topicIdPartition, fetchOffset) -> { if (!existingFetchedData.containsKey(topicIdPartition)) { - missingLogReadTopicPartitions.put(topicIdPartition, partitionData); + missingLogReadTopicPartitions.put(topicIdPartition, fetchOffset); } }); if (missingLogReadTopicPartitions.isEmpty()) { diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java index 6e15a6a0c728b..2406c7584d155 100644 --- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java +++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java @@ -561,9 +561,9 @@ public void testCombineLogReadResponse() 
{ .withSharePartitions(sharePartitions) .build(); - LinkedHashMap topicPartitionData = new LinkedHashMap<>(); - topicPartitionData.put(tp0, mock(FetchRequest.PartitionData.class)); - topicPartitionData.put(tp1, mock(FetchRequest.PartitionData.class)); + LinkedHashMap topicPartitionData = new LinkedHashMap<>(); + topicPartitionData.put(tp0, 0L); + topicPartitionData.put(tp1, 0L); // Case 1 - logReadResponse contains tp0. LinkedHashMap logReadResponse = new LinkedHashMap<>(); From 1916c4e13d0acfeb78e8be65a10493063ca6290b Mon Sep 17 00:00:00 2001 From: Abhinav Dixit Date: Thu, 9 Jan 2025 21:53:11 +0530 Subject: [PATCH 10/15] Minor refactor --- core/src/main/java/kafka/server/share/DelayedShareFetch.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index 2a90a35b26ce5..fba34de2888dd 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -218,8 +218,6 @@ LinkedHashMap acquirablePartitions() { try { // If the share partition is already at capacity, we should not attempt to fetch. if (sharePartition.canAcquireRecords()) { - // We do not know the total partitions that can be acquired at this stage, hence we set maxBytes - // to 0 for now and will update it before doing the replica manager fetch. topicPartitionData.put(topicIdPartition, sharePartition.nextFetchOffset()); } else { sharePartition.releaseFetchLock(); @@ -349,7 +347,6 @@ private LinkedHashMap readFromLog( LinkedHashMap topicPartitionData = new LinkedHashMap<>(); - // Update the maxBytes for every fetchable topic partition. 
for (Map.Entry entry : topicPartitionFetchOffsets.entrySet()) { TopicIdPartition topicIdPartition = entry.getKey(); long fetchOffset = entry.getValue(); From 9cd1af1b5a86933b4639bf57dddfced17b4e4e23 Mon Sep 17 00:00:00 2001 From: adixitconfluent Date: Sun, 12 Jan 2025 01:01:15 +0530 Subject: [PATCH 11/15] Addressed Apoorv's comments - round 4 --- .../kafka/server/share/DelayedShareFetch.java | 18 +++++++----------- .../fetch/PartitionMaxBytesStrategyTest.java | 3 +++ 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index fba34de2888dd..cca61045bca71 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -254,9 +254,8 @@ private LinkedHashMap maybeReadFromLog(LinkedHa partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, partitionsNotMatchingFetchOffsetMetadata.keySet(), topicPartitionData.size())); } - private void maybeUpdateFetchOffsetMetadata( - LinkedHashMap topicPartitionData, - LinkedHashMap replicaManagerReadResponseData) { + private void maybeUpdateFetchOffsetMetadata(LinkedHashMap topicPartitionData, + LinkedHashMap replicaManagerReadResponseData) { for (Map.Entry entry : replicaManagerReadResponseData.entrySet()) { TopicIdPartition topicIdPartition = entry.getKey(); SharePartition sharePartition = sharePartitions.get(topicIdPartition); @@ -336,9 +335,8 @@ else if (isolationType == FetchIsolation.HIGH_WATERMARK) } - private LinkedHashMap readFromLog( - LinkedHashMap topicPartitionFetchOffsets, - LinkedHashMap partitionMaxBytes) { + private LinkedHashMap readFromLog(LinkedHashMap topicPartitionFetchOffsets, + LinkedHashMap partitionMaxBytes) { // Filter if there already exists any erroneous topic partition. 
Set partitionsToFetch = shareFetch.filterErroneousTopicPartitions(topicPartitionFetchOffsets.keySet()); if (partitionsToFetch.isEmpty()) { @@ -347,9 +345,7 @@ private LinkedHashMap readFromLog( LinkedHashMap topicPartitionData = new LinkedHashMap<>(); - for (Map.Entry entry : topicPartitionFetchOffsets.entrySet()) { - TopicIdPartition topicIdPartition = entry.getKey(); - long fetchOffset = entry.getValue(); + topicPartitionFetchOffsets.forEach((topicIdPartition, fetchOffset) -> { topicPartitionData.put(topicIdPartition, new FetchRequest.PartitionData( topicIdPartition.topicId(), fetchOffset, @@ -357,7 +353,7 @@ private LinkedHashMap readFromLog( partitionMaxBytes.get(topicIdPartition), Optional.empty() )); - } + }); Seq> responseLogResult = replicaManager.readFromLog( shareFetch.fetchParams(), @@ -408,7 +404,7 @@ private void handleFetchException( // Visible for testing. LinkedHashMap combineLogReadResponse(LinkedHashMap topicPartitionData, - LinkedHashMap existingFetchedData) { + LinkedHashMap existingFetchedData) { LinkedHashMap missingLogReadTopicPartitions = new LinkedHashMap<>(); topicPartitionData.forEach((topicIdPartition, fetchOffset) -> { if (!existingFetchedData.containsKey(topicIdPartition)) { diff --git a/share/src/test/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategyTest.java b/share/src/test/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategyTest.java index 7906b180c51b7..3c6c7ad220766 100644 --- a/share/src/test/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategyTest.java +++ b/share/src/test/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategyTest.java @@ -57,6 +57,9 @@ public void testCheckValidArguments() { // empty partitions set. assertThrows(IllegalArgumentException.class, () -> PartitionMaxBytesStrategy.checkValidArguments( 100, Collections.EMPTY_SET, 20)); + // partitions is null. 
+ assertThrows(IllegalArgumentException.class, () -> PartitionMaxBytesStrategy.checkValidArguments( + 100, null, 20)); // request max bytes is 0. assertThrows(IllegalArgumentException.class, () -> PartitionMaxBytesStrategy.checkValidArguments( 0, partitions, 20)); From 4abf7df7f0aaa7e563b8de205a53c3409fe2ca1e Mon Sep 17 00:00:00 2001 From: adixitconfluent Date: Sun, 12 Jan 2025 01:30:10 +0530 Subject: [PATCH 12/15] Addressed Apoorv's comments round 4 - part 2 --- .../main/java/kafka/server/share/DelayedShareFetch.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index cca61045bca71..bb667d15f2503 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -415,15 +415,9 @@ LinkedHashMap combineLogReadResponse(LinkedHash return existingFetchedData; } - // Computing the total bytes that has already been fetched for the existing fetched data. 
- int totalPartitionMaxBytesUsed = 0; - for (LogReadResult logReadResult : existingFetchedData.values()) { - totalPartitionMaxBytesUsed += logReadResult.info().records.sizeInBytes(); - } - LinkedHashMap missingTopicPartitionsLogReadResponse = readFromLog( missingLogReadTopicPartitions, - partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes - totalPartitionMaxBytesUsed, missingLogReadTopicPartitions.keySet(), missingLogReadTopicPartitions.size())); + partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, missingLogReadTopicPartitions.keySet(), topicPartitionData.size())); missingTopicPartitionsLogReadResponse.putAll(existingFetchedData); return missingTopicPartitionsLogReadResponse; } From d3da4e20b7e3de8a5ea6bf25170b79c0fd682def Mon Sep 17 00:00:00 2001 From: adixitconfluent Date: Mon, 13 Jan 2025 14:00:36 +0530 Subject: [PATCH 13/15] Addressed Apoorv's comments around adding new test --- .../kafka/server/share/DelayedShareFetch.java | 20 +++++-- .../server/share/DelayedShareFetchTest.java | 55 ++++++++++++++++++- .../share/SharePartitionManagerTest.java | 3 + 3 files changed, 71 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index bb667d15f2503..1422e08524a77 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -73,6 +73,15 @@ public class DelayedShareFetch extends DelayedOperation { ReplicaManager replicaManager, BiConsumer exceptionHandler, LinkedHashMap sharePartitions) { + this(shareFetch, replicaManager, exceptionHandler, sharePartitions, PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM)); + } + + DelayedShareFetch( + ShareFetch shareFetch, + ReplicaManager replicaManager, + BiConsumer exceptionHandler, + LinkedHashMap sharePartitions, + PartitionMaxBytesStrategy partitionMaxBytesStrategy) { 
super(shareFetch.fetchParams().maxWaitMs, Optional.empty()); this.shareFetch = shareFetch; this.replicaManager = replicaManager; @@ -80,7 +89,7 @@ public class DelayedShareFetch extends DelayedOperation { this.partitionsAlreadyFetched = new LinkedHashMap<>(); this.exceptionHandler = exceptionHandler; this.sharePartitions = sharePartitions; - this.partitionMaxBytesStrategy = PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM); + this.partitionMaxBytesStrategy = partitionMaxBytesStrategy; } @Override @@ -345,15 +354,14 @@ private LinkedHashMap readFromLog(LinkedHashMap LinkedHashMap topicPartitionData = new LinkedHashMap<>(); - topicPartitionFetchOffsets.forEach((topicIdPartition, fetchOffset) -> { - topicPartitionData.put(topicIdPartition, new FetchRequest.PartitionData( + topicPartitionFetchOffsets.forEach((topicIdPartition, fetchOffset) -> topicPartitionData.put(topicIdPartition, + new FetchRequest.PartitionData( topicIdPartition.topicId(), fetchOffset, 0, partitionMaxBytes.get(topicIdPartition), - Optional.empty() - )); - }); + Optional.empty()) + )); Seq> responseLogResult = replicaManager.readFromLog( shareFetch.fetchParams(), diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java index 2406c7584d155..9b65db63f6056 100644 --- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java +++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.server.purgatory.DelayedOperationPurgatory; import org.apache.kafka.server.share.SharePartitionKey; import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey; +import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy; import org.apache.kafka.server.share.fetch.ShareAcquiredRecords; import org.apache.kafka.server.share.fetch.ShareFetch; import org.apache.kafka.server.storage.log.FetchIsolation; @@ -184,6 +185,7 @@ public void 
testTryCompleteWhenMinBytesNotSatisfiedOnFirstFetch() { .withSharePartitions(sharePartitions) .withReplicaManager(replicaManager) .withExceptionHandler(exceptionHandler) + .withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM)) .build()); assertFalse(delayedShareFetch.isCompleted()); @@ -288,6 +290,7 @@ public void testDelayedShareFetchTryCompleteReturnsTrue() { .withShareFetchData(shareFetch) .withSharePartitions(sharePartitions) .withReplicaManager(replicaManager) + .withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM)) .build()); assertFalse(delayedShareFetch.isCompleted()); @@ -330,6 +333,7 @@ public void testEmptyFutureReturnedByDelayedShareFetchOnComplete() { .withShareFetchData(shareFetch) .withReplicaManager(replicaManager) .withSharePartitions(sharePartitions) + .withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM)) .build()); assertFalse(delayedShareFetch.isCompleted()); delayedShareFetch.forceComplete(); @@ -377,6 +381,7 @@ public void testReplicaManagerFetchShouldHappenOnComplete() { .withShareFetchData(shareFetch) .withReplicaManager(replicaManager) .withSharePartitions(sharePartitions) + .withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM)) .build()); assertFalse(delayedShareFetch.isCompleted()); delayedShareFetch.forceComplete(); @@ -509,6 +514,7 @@ public void testForceCompleteTriggersDelayedActionsQueue() { .withShareFetchData(shareFetch2) .withReplicaManager(replicaManager) .withSharePartitions(sharePartitions2) + .withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM)) .build()); // sp1 can be acquired now @@ -559,6 +565,7 @@ public void testCombineLogReadResponse() { .withShareFetchData(shareFetch) .withReplicaManager(replicaManager) .withSharePartitions(sharePartitions) + 
.withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM)) .build(); LinkedHashMap topicPartitionData = new LinkedHashMap<>(); @@ -626,6 +633,7 @@ public void testExceptionInMinBytesCalculation() { .withSharePartitions(sharePartitions) .withReplicaManager(replicaManager) .withExceptionHandler(exceptionHandler) + .withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM)) .build()); // Try complete should return false as the share partition has errored out. @@ -714,6 +722,44 @@ public void testLocksReleasedAcquireException() { delayedShareFetch.lock().unlock(); } + @Test + public void testTryCompleteWhenPartitionMaxBytesStrategyThrowsException() { + String groupId = "grp"; + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + SharePartition sp0 = mock(SharePartition.class); + Map partitionMaxBytes = new HashMap<>(); + partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES); + + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(true); + LinkedHashMap sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + + ShareFetch shareFetch = new ShareFetch( + new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, + 2, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(), + new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS); + + // partitionMaxBytesStrategy.maxBytes() function throws an exception + PartitionMaxBytesStrategy partitionMaxBytesStrategy = mock(PartitionMaxBytesStrategy.class); + when(partitionMaxBytesStrategy.maxBytes(anyInt(), any(), anyInt())).thenThrow(new IllegalArgumentException("Exception thrown")); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + 
.withSharePartitions(sharePartitions) + .withExceptionHandler(mockExceptionHandler()) + .withPartitionMaxBytesStrategy(partitionMaxBytesStrategy) + .build()); + + assertFalse(delayedShareFetch.isCompleted()); + assertTrue(delayedShareFetch.tryComplete()); + assertTrue(delayedShareFetch.isCompleted()); + // releasePartitionLocks is called twice - first time from tryComplete and second time from onComplete + Mockito.verify(delayedShareFetch, times(2)).releasePartitionLocks(any()); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + static void mockTopicIdPartitionToReturnDataEqualToMinBytes(ReplicaManager replicaManager, TopicIdPartition topicIdPartition, int minBytes) { LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1, minBytes); LogOffsetSnapshot endOffsetSnapshot = new LogOffsetSnapshot(1, mock(LogOffsetMetadata.class), @@ -743,6 +789,7 @@ static class DelayedShareFetchBuilder { private ReplicaManager replicaManager = mock(ReplicaManager.class); private BiConsumer exceptionHandler = mockExceptionHandler(); private LinkedHashMap sharePartitions = mock(LinkedHashMap.class); + private PartitionMaxBytesStrategy partitionMaxBytesStrategy = mock(PartitionMaxBytesStrategy.class); DelayedShareFetchBuilder withShareFetchData(ShareFetch shareFetch) { this.shareFetch = shareFetch; @@ -764,6 +811,11 @@ DelayedShareFetchBuilder withSharePartitions(LinkedHashMap Date: Mon, 13 Jan 2025 16:22:46 +0530 Subject: [PATCH 14/15] Addressed Apoorv's comments around adding new test - part 2 --- .../server/share/DelayedShareFetchTest.java | 288 +++++++++++++++++- 1 file changed, 287 insertions(+), 1 deletion(-) diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java index 9b65db63f6056..54b8cd66c34a8 100644 --- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java +++ 
b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java @@ -18,6 +18,7 @@ import kafka.cluster.Partition; import kafka.server.LogReadResult; +import kafka.server.QuotaFactory; import kafka.server.ReplicaManager; import kafka.server.ReplicaQuota; @@ -49,16 +50,21 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import scala.Tuple2; +import scala.jdk.javaapi.CollectionConverters; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; +import java.util.stream.Collectors; import static kafka.server.share.SharePartitionManagerTest.DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL; import static kafka.server.share.SharePartitionManagerTest.PARTITION_MAX_BYTES; @@ -734,11 +740,12 @@ public void testTryCompleteWhenPartitionMaxBytesStrategyThrowsException() { when(sp0.canAcquireRecords()).thenReturn(true); LinkedHashMap sharePartitions = new LinkedHashMap<>(); sharePartitions.put(tp0, sp0); + CompletableFuture> future = new CompletableFuture<>(); ShareFetch shareFetch = new ShareFetch( new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, 2, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(), - new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS); + future, partitionMaxBytes, MAX_FETCH_RECORDS); // partitionMaxBytesStrategy.maxBytes() function throws an exception PartitionMaxBytesStrategy partitionMaxBytesStrategy = mock(PartitionMaxBytesStrategy.class); @@ -758,6 +765,285 @@ public void testTryCompleteWhenPartitionMaxBytesStrategyThrowsException() { Mockito.verify(delayedShareFetch, times(2)).releasePartitionLocks(any()); 
assertTrue(delayedShareFetch.lock().tryLock()); delayedShareFetch.lock().unlock(); + + assertTrue(future.isDone()); + assertFalse(future.isCompletedExceptionally()); + Map partitionDataMap = future.join(); + assertEquals(1, partitionDataMap.size()); + assertTrue(partitionDataMap.containsKey(tp0)); + assertEquals("Exception thrown", partitionDataMap.get(tp0).errorMessage()); + } + + @Test + public void testPartitionMaxBytesFromUniformStrategyWhenAllPartitionsAreAcquirable() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + String groupId = "grp"; + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2)); + TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 3)); + TopicIdPartition tp4 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 4)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); + SharePartition sp3 = mock(SharePartition.class); + SharePartition sp4 = mock(SharePartition.class); + + Map partitionMaxBytes = new HashMap<>(); + partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp2, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp3, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp4, PARTITION_MAX_BYTES); + + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp2.maybeAcquireFetchLock()).thenReturn(true); + when(sp3.maybeAcquireFetchLock()).thenReturn(true); + when(sp4.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(true); + 
when(sp2.canAcquireRecords()).thenReturn(true); + when(sp3.canAcquireRecords()).thenReturn(true); + when(sp4.canAcquireRecords()).thenReturn(true); + + LinkedHashMap sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + sharePartitions.put(tp2, sp2); + sharePartitions.put(tp3, sp3); + sharePartitions.put(tp4, sp4); + + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), + new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS); + + when(sp0.acquire(anyString(), anyInt(), any(FetchPartitionData.class))).thenReturn( + ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + when(sp1.acquire(anyString(), anyInt(), any(FetchPartitionData.class))).thenReturn( + ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + when(sp2.acquire(anyString(), anyInt(), any(FetchPartitionData.class))).thenReturn( + ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + when(sp3.acquire(anyString(), anyInt(), any(FetchPartitionData.class))).thenReturn( + ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + when(sp4.acquire(anyString(), anyInt(), any(FetchPartitionData.class))).thenReturn( + ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + + // All 5 partitions are acquirable. 
+ doAnswer(invocation -> buildLogReadResult(sharePartitions.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0))); + when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0))); + when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0))); + when(sp3.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0))); + when(sp4.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0))); + + mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp0, 1); + mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp1, 1); + mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp2, 1); + mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp3, 1); + mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp4, 1); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM)) + .build()); + + assertTrue(delayedShareFetch.tryComplete()); + assertTrue(delayedShareFetch.isCompleted()); + + // Since all partitions are acquirable, maxbytes per partition = requestMaxBytes(i.e. 1024*1024) / acquiredTopicPartitions(i.e. 
5) + int expectedPartitionMaxBytes = 1024 * 1024 / 5; + LinkedHashMap expectedReadPartitionInfo = new LinkedHashMap<>(); + sharePartitions.keySet().forEach(topicIdPartition -> expectedReadPartitionInfo.put(topicIdPartition, + new FetchRequest.PartitionData( + topicIdPartition.topicId(), + 0, + 0, + expectedPartitionMaxBytes, + Optional.empty() + ))); + + Mockito.verify(replicaManager, times(1)).readFromLog( + shareFetch.fetchParams(), + CollectionConverters.asScala( + sharePartitions.keySet().stream().map(topicIdPartition -> + new Tuple2<>(topicIdPartition, expectedReadPartitionInfo.get(topicIdPartition))).collect(Collectors.toList()) + ), + QuotaFactory.UNBOUNDED_QUOTA, + true); + } + + @Test + public void testPartitionMaxBytesFromUniformStrategyWhenFewPartitionsAreAcquirable() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + String groupId = "grp"; + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2)); + TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 3)); + TopicIdPartition tp4 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 4)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); + SharePartition sp3 = mock(SharePartition.class); + SharePartition sp4 = mock(SharePartition.class); + + Map partitionMaxBytes = new HashMap<>(); + partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp2, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp3, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp4, PARTITION_MAX_BYTES); + + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + 
when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp2.maybeAcquireFetchLock()).thenReturn(false); + when(sp3.maybeAcquireFetchLock()).thenReturn(true); + when(sp4.maybeAcquireFetchLock()).thenReturn(false); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(true); + when(sp2.canAcquireRecords()).thenReturn(false); + when(sp3.canAcquireRecords()).thenReturn(false); + when(sp4.canAcquireRecords()).thenReturn(false); + + LinkedHashMap sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + sharePartitions.put(tp2, sp2); + sharePartitions.put(tp3, sp3); + sharePartitions.put(tp4, sp4); + + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), + new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS); + + when(sp0.acquire(anyString(), anyInt(), any(FetchPartitionData.class))).thenReturn( + ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + when(sp1.acquire(anyString(), anyInt(), any(FetchPartitionData.class))).thenReturn( + ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + + // Only 2 out of 5 partitions are acquirable. 
+ Set acquirableTopicPartitions = new LinkedHashSet<>(); + acquirableTopicPartitions.add(tp0); + acquirableTopicPartitions.add(tp1); + doAnswer(invocation -> buildLogReadResult(acquirableTopicPartitions)).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0))); + when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0))); + + mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp0, 1); + mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp1, 1); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM)) + .build()); + + assertTrue(delayedShareFetch.tryComplete()); + assertTrue(delayedShareFetch.isCompleted()); + + // Since only 2 partitions are acquirable, maxbytes per partition = requestMaxBytes(i.e. 1024*1024) / acquiredTopicPartitions(i.e. 
2) + int expectedPartitionMaxBytes = 1024 * 1024 / 2; + LinkedHashMap expectedReadPartitionInfo = new LinkedHashMap<>(); + acquirableTopicPartitions.forEach(topicIdPartition -> expectedReadPartitionInfo.put(topicIdPartition, + new FetchRequest.PartitionData( + topicIdPartition.topicId(), + 0, + 0, + expectedPartitionMaxBytes, + Optional.empty() + ))); + + Mockito.verify(replicaManager, times(1)).readFromLog( + shareFetch.fetchParams(), + CollectionConverters.asScala( + acquirableTopicPartitions.stream().map(topicIdPartition -> + new Tuple2<>(topicIdPartition, expectedReadPartitionInfo.get(topicIdPartition))).collect(Collectors.toList()) + ), + QuotaFactory.UNBOUNDED_QUOTA, + true); + } + + @Test + public void testPartitionMaxBytesFromUniformStrategyInCombineLogReadResponse() { + String groupId = "grp"; + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2)); + Map partitionMaxBytes = new HashMap<>(); + partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp2, PARTITION_MAX_BYTES); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); + + LinkedHashMap sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + sharePartitions.put(tp2, sp2); + + ShareFetch shareFetch = new ShareFetch( + new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, + 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(), + new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS); 
+ + DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withReplicaManager(replicaManager) + .withSharePartitions(sharePartitions) + .withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM)) + .build(); + + LinkedHashMap topicPartitionData = new LinkedHashMap<>(); + topicPartitionData.put(tp0, 0L); + topicPartitionData.put(tp1, 0L); + topicPartitionData.put(tp2, 0L); + + // Existing fetched data already contains tp0. + LinkedHashMap logReadResponse = new LinkedHashMap<>(); + LogReadResult logReadResult = mock(LogReadResult.class); + Records records = mock(Records.class); + when(records.sizeInBytes()).thenReturn(2); + FetchDataInfo fetchDataInfo = new FetchDataInfo(mock(LogOffsetMetadata.class), records); + when(logReadResult.info()).thenReturn(fetchDataInfo); + logReadResponse.put(tp0, logReadResult); + + Set fetchableTopicPartitions = new LinkedHashSet<>(); + fetchableTopicPartitions.add(tp1); + fetchableTopicPartitions.add(tp2); + // We will be doing replica manager fetch only for tp1 and tp2. + doAnswer(invocation -> buildLogReadResult(fetchableTopicPartitions)).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + LinkedHashMap combinedLogReadResponse = delayedShareFetch.combineLogReadResponse(topicPartitionData, logReadResponse); + + assertEquals(topicPartitionData.keySet(), combinedLogReadResponse.keySet()); + // Since only 2 partitions are fetchable but the third one has already been fetched, maxbytes per partition = requestMaxBytes(i.e. 1024*1024) / acquiredTopicPartitions(i.e. 
3) + int expectedPartitionMaxBytes = 1024 * 1024 / 3; + LinkedHashMap expectedReadPartitionInfo = new LinkedHashMap<>(); + fetchableTopicPartitions.forEach(topicIdPartition -> expectedReadPartitionInfo.put(topicIdPartition, + new FetchRequest.PartitionData( + topicIdPartition.topicId(), + 0, + 0, + expectedPartitionMaxBytes, + Optional.empty() + ))); + + Mockito.verify(replicaManager, times(1)).readFromLog( + shareFetch.fetchParams(), + CollectionConverters.asScala( + fetchableTopicPartitions.stream().map(topicIdPartition -> + new Tuple2<>(topicIdPartition, expectedReadPartitionInfo.get(topicIdPartition))).collect(Collectors.toList()) + ), + QuotaFactory.UNBOUNDED_QUOTA, + true); } static void mockTopicIdPartitionToReturnDataEqualToMinBytes(ReplicaManager replicaManager, TopicIdPartition topicIdPartition, int minBytes) { From dab84c3cfa14fd7bdb35c9657c204d7fef933987 Mon Sep 17 00:00:00 2001 From: adixitconfluent Date: Mon, 13 Jan 2025 16:40:20 +0530 Subject: [PATCH 15/15] Fix spotless build issue --- .../test/java/kafka/server/share/DelayedShareFetchTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java index 54b8cd66c34a8..533ffc9569352 100644 --- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java +++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java @@ -50,8 +50,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; -import scala.Tuple2; -import scala.jdk.javaapi.CollectionConverters; import java.util.ArrayList; import java.util.Collections; @@ -66,6 +64,9 @@ import java.util.function.BiConsumer; import java.util.stream.Collectors; +import scala.Tuple2; +import scala.jdk.javaapi.CollectionConverters; + import static kafka.server.share.SharePartitionManagerTest.DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL; import static 
kafka.server.share.SharePartitionManagerTest.PARTITION_MAX_BYTES; import static kafka.server.share.SharePartitionManagerTest.buildLogReadResult;