-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18404: Remove partitionMaxBytes usage from DelayedShareFetch #17870
KAFKA-18404: Remove partitionMaxBytes usage from DelayedShareFetch #17870
Conversation
78f38d4
to
106411f
Compare
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
Outdated
Show resolved
Hide resolved
…ytes to make it scalable
Test failures are unrelated to my changes. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for updating the PR. Can you please help me understand how we will extend this code in future with minimal changes?
import java.util.Locale; | ||
import java.util.Set; | ||
|
||
public class PartitionMaxBytesDivisionStrategy { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is this class or way extensible in future to add more startegies? Shouldn't you need the interface with enum which shall provide you the right implementing strategy class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@apoorvmittal10, you can add a strategy to enum StrategyType
and add the corresponding code to partitionMaxBytes
function for handling the new strategy type. If you do not like this way I can create an interface
public interface PartitionMaxBytesDivisionStrategy {
public LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes(int requestMaxBytes, Set<TopicIdPartition> partitions, int acquiredPartitionsSize);
}
Then use subclasses to implement this function. Wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it's good as you would require to chnage couple of additional lines in DelayedShareFetch. Rather we should write over a single class/interface which should provide the implementation based on StrategyType.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think about below implementation and usage?
this.partitionMaxBytesStrategy = PartitionMaxBytesStrategy.type(StrategyType.UNIFORM);
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.share.fetch;
import org.apache.kafka.common.TopicIdPartition;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Set;
public interface PartitionMaxBytesStrategy {
enum StrategyType {
UNIFORM;
@Override
public String toString() {
return super.toString().toLowerCase(Locale.ROOT);
}
}
LinkedHashMap<TopicIdPartition, Integer> maxBytes(int requestMaxBytes, Set<TopicIdPartition> partitions, int acquiredPartitionsSize);
static PartitionMaxBytesStrategy type(StrategyType type) {
return switch (type) {
case UNIFORM -> PartitionMaxBytesStrategy::uniformPartitionMaxBytes;
};
}
/**
* Returns the partition max bytes for a given partition based on the strategy type.
*
* @param requestMaxBytes - The total max bytes available for the share fetch request
* @param partitions - The topic partitions in the order for which we compute the partition max bytes.
* @param acquiredPartitionsSize - The total partitions that have been acquired.
* @return the partition max bytes for the topic partitions
*/
static LinkedHashMap<TopicIdPartition, Integer> uniformPartitionMaxBytes(int requestMaxBytes, Set<TopicIdPartition> partitions, int acquiredPartitionsSize) {
if (partitions == null || partitions.isEmpty()) {
throw new IllegalArgumentException("Partitions to generate max bytes is null or empty");
}
if (requestMaxBytes <= 0) {
throw new IllegalArgumentException("Requested max bytes must be greater than 0");
}
if (acquiredPartitionsSize <= 0) {
throw new IllegalArgumentException("Acquired partitions size must be greater than 0");
}
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = new LinkedHashMap<>();
partitions.forEach(partition -> partitionMaxBytes.put(
partition, requestMaxBytes / acquiredPartitionsSize));
return partitionMaxBytes;
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the changes, some comments and questions.
share/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategy.java
Show resolved
Hide resolved
share/src/main/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategy.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM, some comments to address.
share/src/test/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategyTest.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the changes. I have a comment around tests, I think we should add tests for the functionality in DelayedShareFetch.
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks for the PR and fixes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
…pache#17870) Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield <[email protected]>
100, partitions, 0)); | ||
// empty partitions set. | ||
assertThrows(IllegalArgumentException.class, () -> PartitionMaxBytesStrategy.checkValidArguments( | ||
100, Collections.EMPTY_SET, 20)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I apologize for the delayed review. Could you please use Set.of()
to address the following warnings?
> Task :share:compileTestJava
Note: /home/jenkins/kafka/share/src/test/java/org/apache/kafka/server/share/fetch/PartitionMaxBytesStrategyTest.java uses unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@chia7712 I missed it in review, thanks for pointing out. @adixitconfluent is busy so I just raised this minor PR: #18541
I cherry-picked this to 4.0 as it made another cherry-pick simpler (#18524). |
@AndrewJSchofield Do you think it's sensible to include the config for limiting bytes in single share fetch as the partition max bytes is now dynamic, in your doc? I am just thinking in terms of clients if using default value in 4.0 then might see bigger chunk of data flowing to client. |
…17870) Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield <[email protected]>
…Fetch (apache#17870)" This reverts commit b021b51.
…Fetch (#17870)" (#18643) This reverts commit b021b51. Reviewers: Ismael Juma <[email protected]>, Andrew Schofield <[email protected]>
…pache#17870) Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield <[email protected]>
…pache#17870) Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield <[email protected]>
…pache#17870) Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield <[email protected]>
…quests (#19148) This PR aims to remove the usage of partition max bytes from share fetch requests. Partition Max Bytes is being defined by `PartitionMaxBytesStrategy` which was added to the broker as part of PR #17870 Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal <[email protected]>
About
This PR aims to remove
partitionMaxBytes
from share fetch requests(SFR). In the current flow, we cannot fetch more than 1 MB for each topic partition in the share fetch request by default although the overall SFR max bytes is 50MB. This PR has removed this limit by using strategies from a strategy classPartitionMaxBytesStrategy
.PartitionMaxBytesStrategy
contains anUNIFORM
strategy that divides overall SFR max bytes equally among the acquired topic partitions. For example - If SFR max bytes = 50MB and there are 5 acquired topic partitions, then total data that can be fetched for each topic partition from replica manager can be upto 10MB each.Testing
Added unit tests for
PartitionMaxBytesStrategy
and tested with already present unit tests on broker and integration tests for share consumers