KAFKA-18404: Remove partitionMaxBytes usage from DelayedShareFetch #17870

Merged

Changes from 6 commits
49 changes: 42 additions & 7 deletions core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -27,6 +27,7 @@
import org.apache.kafka.server.purgatory.DelayedOperation;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.PartitionMaxBytesDivisionStrategy;
import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchPartitionData;
@@ -60,6 +61,7 @@ public class DelayedShareFetch extends DelayedOperation {
private final ShareFetch shareFetch;
private final ReplicaManager replicaManager;
private final BiConsumer<SharePartitionKey, Throwable> exceptionHandler;
private final PartitionMaxBytesDivisionStrategy partitionMaxBytesDivisionStrategy;
// The topic partitions that need to be completed for the share fetch request are given by sharePartitions.
// sharePartitions is a subset of shareFetchData. The order of insertion/deletion of entries in sharePartitions is important.
private final LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions;
@@ -78,6 +80,7 @@ public class DelayedShareFetch extends DelayedOperation {
this.partitionsAlreadyFetched = new LinkedHashMap<>();
this.exceptionHandler = exceptionHandler;
this.sharePartitions = sharePartitions;
partitionMaxBytesDivisionStrategy = new PartitionMaxBytesDivisionStrategy(PartitionMaxBytesDivisionStrategy.StrategyType.EQUAL_DIVISION);
}

@Override
@@ -125,7 +128,9 @@ private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, FetchRequ
try {
LinkedHashMap<TopicIdPartition, LogReadResult> responseData;
if (partitionsAlreadyFetched.isEmpty())
responseData = readFromLog(topicPartitionData);
responseData = readFromLog(
topicPartitionData,
partitionMaxBytesDivisionStrategy.partitionMaxBytes(shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size()));
else
// There shouldn't be a case when we have a partitionsAlreadyFetched value here and this variable is getting
// updated in a different tryComplete thread.
@@ -207,20 +212,21 @@ LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();

sharePartitions.forEach((topicIdPartition, sharePartition) -> {
int partitionMaxBytes = shareFetch.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
// Add the share partition to the list of partitions to be fetched only if we can
// acquire the fetch lock on it.
if (sharePartition.maybeAcquireFetchLock()) {
try {
// If the share partition is already at capacity, we should not attempt to fetch.
if (sharePartition.canAcquireRecords()) {
// We do not know the total partitions that can be acquired at this stage, hence we set maxBytes
// to 0 for now and will update it before doing the replica manager fetch.
topicPartitionData.put(
topicIdPartition,
new FetchRequest.PartitionData(
topicIdPartition.topicId(),
sharePartition.nextFetchOffset(),
0,
partitionMaxBytes,
0,
Optional.empty()
)
);
@@ -251,7 +257,12 @@ private LinkedHashMap<TopicIdPartition, LogReadResult> maybeReadFromLog(LinkedHa
return new LinkedHashMap<>();
}
// We fetch data from replica manager corresponding to the topic partitions that have missing fetch offset metadata.
return readFromLog(partitionsNotMatchingFetchOffsetMetadata);
// Although we are fetching partition max bytes only for partitionsNotMatchingFetchOffsetMetadata,
// we still use acquired partitions size = topicPartitionData.size() because we do not want the
// leftover partitions, which will be fetched later, to starve.
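// For example, if the request maxBytes is 1 MiB and 4 partitions were acquired but only one has
// missing fetch offset metadata, that partition still receives 1 MiB / 4 = 256 KiB, leaving
// headroom for the remaining three partitions.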
return readFromLog(
partitionsNotMatchingFetchOffsetMetadata,
partitionMaxBytesDivisionStrategy.partitionMaxBytes(shareFetch.fetchParams().maxBytes, partitionsNotMatchingFetchOffsetMetadata.keySet(), topicPartitionData.size()));
}

private void maybeUpdateFetchOffsetMetadata(
@@ -312,7 +323,7 @@ private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, FetchRequest
return true;
} else if (fetchOffsetMetadata.onSameSegment(endOffsetMetadata)) {
// we take the per-partition share of the request max bytes as the upper bound when accumulating the bytes.
long bytesAvailable = Math.min(endOffsetMetadata.positionDiff(fetchOffsetMetadata), partitionData.maxBytes);
long bytesAvailable = Math.min(endOffsetMetadata.positionDiff(fetchOffsetMetadata), shareFetch.fetchParams().maxBytes / topicPartitionData.size());
accumulatedSize += bytesAvailable;
}
}
@@ -335,13 +346,28 @@ else if (isolationType == FetchIsolation.HIGH_WATERMARK)

}

private LinkedHashMap<TopicIdPartition, LogReadResult> readFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
private LinkedHashMap<TopicIdPartition, LogReadResult> readFromLog(
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes) {
// Filter if there already exists any erroneous topic partition.
Set<TopicIdPartition> partitionsToFetch = shareFetch.filterErroneousTopicPartitions(topicPartitionData.keySet());
if (partitionsToFetch.isEmpty()) {
return new LinkedHashMap<>();
}

// Update the maxBytes for every fetchable topic partition.
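// FetchRequest.PartitionData is immutable, so each entry is replaced with a new instance carrying the computed limit.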
for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
FetchRequest.PartitionData partitionData = entry.getValue();
FetchRequest.PartitionData updatedPartitionData = new FetchRequest.PartitionData(
partitionData.topicId,
partitionData.fetchOffset,
partitionData.logStartOffset,
partitionMaxBytes.get(entry.getKey()),
partitionData.currentLeaderEpoch
);
entry.setValue(updatedPartitionData);
}

Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
shareFetch.fetchParams(),
CollectionConverters.asScala(
@@ -401,7 +427,16 @@ LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHash
if (missingLogReadTopicPartitions.isEmpty()) {
return existingFetchedData;
}
LinkedHashMap<TopicIdPartition, LogReadResult> missingTopicPartitionsLogReadResponse = readFromLog(missingLogReadTopicPartitions);

// Compute the total bytes that have already been fetched for the existing fetched data.
int totalPartitionMaxBytesUsed = 0;
for (LogReadResult logReadResult : existingFetchedData.values()) {
totalPartitionMaxBytesUsed += logReadResult.info().records.sizeInBytes();
}
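// The remaining request budget (request maxBytes minus the bytes already fetched) is split
// across the missing partitions in the readFromLog call below.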

LinkedHashMap<TopicIdPartition, LogReadResult> missingTopicPartitionsLogReadResponse = readFromLog(
missingLogReadTopicPartitions,
partitionMaxBytesDivisionStrategy.partitionMaxBytes(shareFetch.fetchParams().maxBytes - totalPartitionMaxBytesUsed, missingLogReadTopicPartitions.keySet(), missingLogReadTopicPartitions.size()));
missingTopicPartitionsLogReadResponse.putAll(existingFetchedData);
return missingTopicPartitionsLogReadResponse;
}
@@ -26,6 +26,7 @@
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.server.purgatory.DelayedOperationKey;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
@@ -39,6 +40,7 @@
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogOffsetSnapshot;

@@ -565,7 +567,12 @@ public void testCombineLogReadResponse() {

// Case 1 - logReadResponse contains tp0.
LinkedHashMap<TopicIdPartition, LogReadResult> logReadResponse = new LinkedHashMap<>();
logReadResponse.put(tp0, mock(LogReadResult.class));
LogReadResult logReadResult = mock(LogReadResult.class);
Records records = mock(Records.class);
when(records.sizeInBytes()).thenReturn(2);
FetchDataInfo fetchDataInfo = new FetchDataInfo(mock(LogOffsetMetadata.class), records);
when(logReadResult.info()).thenReturn(fetchDataInfo);
logReadResponse.put(tp0, logReadResult);

doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
LinkedHashMap<TopicIdPartition, LogReadResult> combinedLogReadResponse = delayedShareFetch.combineLogReadResponse(topicPartitionData, logReadResponse);
2 changes: 1 addition & 1 deletion core/src/test/java/kafka/test/api/ShareConsumerTest.java
@@ -902,7 +902,7 @@ public void testFetchRecordLargerThanMaxPartitionFetchBytes(String persister) th
shareConsumer.subscribe(Collections.singleton(tp.topic()));

ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
assertEquals(2, records.count());
}
}

@@ -1310,7 +1310,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
),
)
)
def testShareFetchBrokerRespectsPartitionsSizeLimit(): Unit = {
def testShareFetchBrokerDoesNotRespectPartitionsSizeLimit(): Unit = {
val groupId: String = "group"
val memberId = Uuid.randomUuid()

@@ -1350,10 +1350,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setPartitionIndex(partition)
.setErrorCode(Errors.NONE.code())
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(0), Collections.singletonList(11), Collections.singletonList(1)))
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(0), Collections.singletonList(12), Collections.singletonList(1)))
// The first 10 records will be consumed as-is. For the last 3 records, each of size MAX_PARTITION_BYTES/3,
// only 2 of them will be consumed (offsets 10 and 11) because the inclusion of the third record would exceed
// the max partition bytes limit
// all 3 of them will be consumed (offsets 10, 11 and 12) because even though the inclusion of the third record
// exceeds the max partition bytes limit, only the request-level maxBytes is considered the hard limit.

val partitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
compareFetchResponsePartitions(expectedPartitionData, partitionData)
@@ -1412,15 +1412,15 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// mocking the behaviour of multiple share consumers from the same share group
val metadata1: ShareRequestMetadata = new ShareRequestMetadata(memberId1, ShareRequestMetadata.INITIAL_EPOCH)
val acknowledgementsMap1: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest1 = createShareFetchRequest(groupId, metadata1, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap1)
val shareFetchRequest1 = createShareFetchRequest(groupId, metadata1, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap1, minBytes = 100, maxBytes = 1500)

val metadata2: ShareRequestMetadata = new ShareRequestMetadata(memberId2, ShareRequestMetadata.INITIAL_EPOCH)
val acknowledgementsMap2: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest2 = createShareFetchRequest(groupId, metadata2, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap2)
val shareFetchRequest2 = createShareFetchRequest(groupId, metadata2, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap2, minBytes = 100, maxBytes = 1500)

val metadata3: ShareRequestMetadata = new ShareRequestMetadata(memberId3, ShareRequestMetadata.INITIAL_EPOCH)
val acknowledgementsMap3: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest3 = createShareFetchRequest(groupId, metadata3, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap3)
val shareFetchRequest3 = createShareFetchRequest(groupId, metadata3, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap3, minBytes = 100, maxBytes = 1500)

val shareFetchResponse1 = connectAndReceive[ShareFetchResponse](shareFetchRequest1)
val shareFetchResponse2 = connectAndReceive[ShareFetchResponse](shareFetchRequest2)
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.share.fetch;

import org.apache.kafka.common.TopicIdPartition;

import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Set;

public class PartitionMaxBytesDivisionStrategy {

Collaborator:
How is this class extensible in the future to add more strategies? Wouldn't you need an interface with an enum that provides the right implementing strategy class?

Contributor (author):
@apoorvmittal10, you can add a strategy to the StrategyType enum and add the corresponding handling to the partitionMaxBytes function. If you do not like this approach, I can create an interface:

public interface PartitionMaxBytesDivisionStrategy {
    LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes(int requestMaxBytes, Set<TopicIdPartition> partitions, int acquiredPartitionsSize);
}

Then use subclasses to implement this function. Wdyt?
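For illustration, a minimal sketch of what such a subclass could look like under this proposal (the class name EqualDivisionStrategy is hypothetical):

public class EqualDivisionStrategy implements PartitionMaxBytesDivisionStrategy {
    @Override
    public LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes(int requestMaxBytes, Set<TopicIdPartition> partitions, int acquiredPartitionsSize) {
        // Each acquired partition receives an equal share of the request-level byte budget.
        LinkedHashMap<TopicIdPartition, Integer> result = new LinkedHashMap<>();
        partitions.forEach(partition -> result.put(partition, requestMaxBytes / acquiredPartitionsSize));
        return result;
    }
}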

Collaborator:
I don't think that's good, as it would require changing a couple of additional lines in DelayedShareFetch. Rather, we should have a single class/interface which provides the implementation based on StrategyType.

Collaborator:
What do you think about the implementation and usage below?

this.partitionMaxBytesStrategy = PartitionMaxBytesStrategy.type(StrategyType.UNIFORM);
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.server.share.fetch;

import org.apache.kafka.common.TopicIdPartition;

import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Set;

public interface PartitionMaxBytesStrategy {

    enum StrategyType {
        UNIFORM;

        @Override
        public String toString() {
            return super.toString().toLowerCase(Locale.ROOT);
        }
    }

    LinkedHashMap<TopicIdPartition, Integer> maxBytes(int requestMaxBytes, Set<TopicIdPartition> partitions, int acquiredPartitionsSize);

    static PartitionMaxBytesStrategy type(StrategyType type) {
        return switch (type) {
            case UNIFORM -> PartitionMaxBytesStrategy::uniformPartitionMaxBytes;
        };
    }

    /**
     * Returns the partition max bytes for a given partition based on the strategy type.
     *
     * @param requestMaxBytes - The total max bytes available for the share fetch request
     * @param partitions - The topic partitions in the order for which we compute the partition max bytes.
     * @param acquiredPartitionsSize - The total partitions that have been acquired.
     * @return the partition max bytes for the topic partitions
     */
    static LinkedHashMap<TopicIdPartition, Integer> uniformPartitionMaxBytes(int requestMaxBytes, Set<TopicIdPartition> partitions, int acquiredPartitionsSize) {
        if (partitions == null || partitions.isEmpty()) {
            throw new IllegalArgumentException("Partitions to generate max bytes is null or empty");
        }
        if (requestMaxBytes <= 0) {
            throw new IllegalArgumentException("Requested max bytes must be greater than 0");
        }
        if (acquiredPartitionsSize <= 0) {
            throw new IllegalArgumentException("Acquired partitions size must be greater than 0");
        }

        LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = new LinkedHashMap<>();
        partitions.forEach(partition -> partitionMaxBytes.put(
            partition, requestMaxBytes / acquiredPartitionsSize));
        return partitionMaxBytes;
    }
}
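
A brief usage sketch of this proposal, assuming partitions is a LinkedHashSet holding three acquired TopicIdPartition entries:

PartitionMaxBytesStrategy strategy = PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM);
// A 3,000,000-byte request budget split uniformly across 3 acquired partitions maps each entry to 1,000,000 bytes.
LinkedHashMap<TopicIdPartition, Integer> limits = strategy.maxBytes(3_000_000, partitions, 3);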


    public enum StrategyType {
        EQUAL_DIVISION;

        @Override
        public String toString() {
            return super.toString().toLowerCase(Locale.ROOT);
        }
    }

    private final StrategyType type;

    public PartitionMaxBytesDivisionStrategy(StrategyType type) {
        if (type == null) {
            throw new IllegalArgumentException("Partition bytes division strategy is null");
        }
        this.type = type;
    }

    /**
     * Returns the partition max bytes for a given partition based on the strategy type.
     *
     * @param requestMaxBytes - The total max bytes available for the share fetch request
     * @param partitions - The topic partitions in the order for which we compute the partition max bytes.
     * @param acquiredPartitionsSize - The total partitions that have been acquired.
     * @return the partition max bytes for the topic partitions
     */
    public LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes(int requestMaxBytes, Set<TopicIdPartition> partitions, int acquiredPartitionsSize) {
        if (partitions == null || partitions.isEmpty()) {
            throw new IllegalArgumentException("Partitions to generate max bytes is null or empty");
        }
        if (requestMaxBytes <= 0) {
            throw new IllegalArgumentException("Requested max bytes must be greater than 0");
        }
        if (acquiredPartitionsSize <= 0) {
            throw new IllegalArgumentException("Acquired partitions size must be greater than 0");
        }

        LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = new LinkedHashMap<>();
        if (type == StrategyType.EQUAL_DIVISION) {
            partitions.forEach(partition -> partitionMaxBytes.put(
                partition, requestMaxBytes / acquiredPartitionsSize));
        }
        return partitionMaxBytes;
    }

    // Visible for testing
    protected StrategyType type() {
        return type;
    }
}
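
A minimal usage sketch of the class as committed, assuming partitions is a LinkedHashSet holding four acquired TopicIdPartition entries:

PartitionMaxBytesDivisionStrategy strategy = new PartitionMaxBytesDivisionStrategy(PartitionMaxBytesDivisionStrategy.StrategyType.EQUAL_DIVISION);
// EQUAL_DIVISION assigns each partition 1,000,000 / 4 = 250,000 bytes.
LinkedHashMap<TopicIdPartition, Integer> limits = strategy.partitionMaxBytes(1_000_000, partitions, partitions.size());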