Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18932: Removed usage of partition max bytes from share fetch requests #19148

Merged
merged 11 commits into from
Mar 12, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public ShareFetchRequest.Builder newShareFetchBuilder(String groupId, FetchConfi

return ShareFetchRequest.Builder.forConsumer(
groupId, nextMetadata, fetchConfig.maxWaitMs,
fetchConfig.minBytes, fetchConfig.maxBytes, fetchConfig.fetchSize, fetchConfig.maxPollRecords,
fetchConfig.minBytes, fetchConfig.maxBytes, fetchConfig.maxPollRecords,
added, removed, acknowledgementBatches);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class ShareFetchRequest extends AbstractRequest {

Expand All @@ -49,19 +47,17 @@ public Builder(ShareFetchRequestData data, boolean enableUnstableLastVersion) {
}

public static Builder forConsumer(String groupId, ShareRequestMetadata metadata,
int maxWait, int minBytes, int maxBytes, int fetchSize, int batchSize,
int maxWait, int minBytes, int maxBytes, int batchSize,
List<TopicIdPartition> send, List<TopicIdPartition> forget,
Map<TopicIdPartition, List<ShareFetchRequestData.AcknowledgementBatch>> acknowledgementsMap) {
ShareFetchRequestData data = new ShareFetchRequestData();
data.setGroupId(groupId);
int ackOnlyPartitionMaxBytes = fetchSize;
boolean isClosingShareSession = false;
if (metadata != null) {
data.setMemberId(metadata.memberId().toString());
data.setShareSessionEpoch(metadata.epoch());
if (metadata.isFinalEpoch()) {
isClosingShareSession = true;
ackOnlyPartitionMaxBytes = 0;
}
}
data.setMaxWaitMs(maxWait);
Expand All @@ -77,8 +73,7 @@ public static Builder forConsumer(String groupId, ShareRequestMetadata metadata,
for (TopicIdPartition tip : send) {
Map<Integer, ShareFetchRequestData.FetchPartition> partMap = fetchMap.computeIfAbsent(tip.topicId(), k -> new HashMap<>());
ShareFetchRequestData.FetchPartition fetchPartition = new ShareFetchRequestData.FetchPartition()
.setPartitionIndex(tip.partition())
.setPartitionMaxBytes(fetchSize);
.setPartitionIndex(tip.partition());
partMap.put(tip.partition(), fetchPartition);
}
}
Expand All @@ -91,8 +86,7 @@ public static Builder forConsumer(String groupId, ShareRequestMetadata metadata,
ShareFetchRequestData.FetchPartition fetchPartition = partMap.get(tip.partition());
if (fetchPartition == null) {
fetchPartition = new ShareFetchRequestData.FetchPartition()
.setPartitionIndex(tip.partition())
.setPartitionMaxBytes(ackOnlyPartitionMaxBytes);
.setPartitionIndex(tip.partition());
partMap.put(tip.partition(), fetchPartition);
}
fetchPartition.setAcknowledgementBatches(acknowledgeEntry.getValue());
Expand Down Expand Up @@ -151,7 +145,7 @@ public String toString() {
}

private final ShareFetchRequestData data;
private volatile LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData = null;
private volatile List<TopicIdPartition> shareFetchData = null;
private volatile List<TopicIdPartition> toForget = null;

public ShareFetchRequest(ShareFetchRequestData data, short version) {
Expand Down Expand Up @@ -179,41 +173,6 @@ public static ShareFetchRequest parse(ByteBuffer buffer, short version) {
);
}

/**
 * Holder for the per-partition data carried in a share fetch request: the topic's id and the
 * per-partition byte limit. NOTE(review): this class is being deleted by this PR (KAFKA-18932),
 * since PartitionMaxBytes is removed from the ShareFetch request schema.
 */
public static final class SharePartitionData {
// Id of the topic this partition belongs to.
public final Uuid topicId;
// Maximum number of bytes to fetch from this partition; 0 when only acknowledgement
// with no fetching is required.
public final int maxBytes;

public SharePartitionData(
Uuid topicId,
int maxBytes
) {
this.topicId = topicId;
this.maxBytes = maxBytes;
}

// Value equality over both fields; kept consistent with hashCode() below.
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShareFetchRequest.SharePartitionData that = (ShareFetchRequest.SharePartitionData) o;
return Objects.equals(topicId, that.topicId) &&
maxBytes == that.maxBytes;
}

@Override
public int hashCode() {
return Objects.hash(topicId, maxBytes);
}

@Override
public String toString() {
return "SharePartitionData(" +
"topicId=" + topicId +
", maxBytes=" + maxBytes +
')';
}
}

public int minBytes() {
return data.minBytes();
}
Expand All @@ -226,23 +185,18 @@ public int maxWait() {
return data.maxWaitMs();
}

public Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData(Map<Uuid, String> topicNames) {
public List<TopicIdPartition> shareFetchData(Map<Uuid, String> topicNames) {
if (shareFetchData == null) {
synchronized (this) {
if (shareFetchData == null) {
// Assigning the lazy-initialized `shareFetchData` in the last step
// to avoid other threads accessing a half-initialized object.
final LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchDataTmp = new LinkedHashMap<>();
final List<TopicIdPartition> shareFetchDataTmp = new ArrayList<>();
data.topics().forEach(shareFetchTopic -> {
String name = topicNames.get(shareFetchTopic.topicId());
shareFetchTopic.partitions().forEach(shareFetchPartition -> {
// Topic name may be null here if the topic name was unable to be resolved using the topicNames map.
shareFetchDataTmp.put(new TopicIdPartition(shareFetchTopic.topicId(), new TopicPartition(name, shareFetchPartition.partitionIndex())),
new ShareFetchRequest.SharePartitionData(
shareFetchTopic.topicId(),
shareFetchPartition.partitionMaxBytes()
)
);
shareFetchDataTmp.add(new TopicIdPartition(shareFetchTopic.topicId(), shareFetchPartition.partitionIndex(), name));
});
});
shareFetchData = shareFetchDataTmp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@
"about": "The partitions to fetch.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
"about": "TO BE REMOVED. The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },
{ "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
"about": "Record batches to acknowledge.", "fields": [
{ "name": "FirstOffset", "type": "int64", "versions": "0+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public void onComplete() {
return;
} else {
// Update metric to record acquired to requested partitions.
double requestTopicToAcquired = (double) topicPartitionData.size() / shareFetch.partitionMaxBytes().size();
double requestTopicToAcquired = (double) topicPartitionData.size() / shareFetch.topicIdPartitions().size();
shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (requestTopicToAcquired * 100));
}
log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
Expand Down
42 changes: 16 additions & 26 deletions core/src/main/java/kafka/server/share/SharePartitionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.requests.ShareRequestMetadata;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.Time;
Expand Down Expand Up @@ -248,7 +247,7 @@ private SharePartitionManager(
* @param memberId The member id, generated by the group-coordinator, this is used to identify the client.
* @param fetchParams The fetch parameters from the share fetch request.
* @param batchSize The number of records per acquired records batch.
* @param partitionMaxBytes The maximum number of bytes to fetch for each partition.
* @param topicIdPartitions The topic partitions to fetch for.
*
* @return A future that will be completed with the fetched messages.
*/
Expand All @@ -258,17 +257,17 @@ public CompletableFuture<Map<TopicIdPartition, PartitionData>> fetchMessages(
FetchParams fetchParams,
int sessionEpoch,
int batchSize,
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes
List<TopicIdPartition> topicIdPartitions
) {
log.trace("Fetch request for topicIdPartitions: {} with groupId: {} fetch params: {}",
partitionMaxBytes.keySet(), groupId, fetchParams);
topicIdPartitions, groupId, fetchParams);

LinkedHashMap<TopicIdPartition, Integer> topicIdPartitions = PartitionRotateStrategy
List<TopicIdPartition> rotatedTopicIdPartitions = PartitionRotateStrategy
.type(PartitionRotateStrategy.StrategyType.ROUND_ROBIN)
.rotate(partitionMaxBytes, new PartitionRotateMetadata(sessionEpoch));
.rotate(topicIdPartitions, new PartitionRotateMetadata(sessionEpoch));

CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new CompletableFuture<>();
processShareFetch(new ShareFetch(fetchParams, groupId, memberId, future, topicIdPartitions, batchSize, maxFetchRecords, brokerTopicStats));
processShareFetch(new ShareFetch(fetchParams, groupId, memberId, future, rotatedTopicIdPartitions, batchSize, maxFetchRecords, brokerTopicStats));

return future;
}
Expand Down Expand Up @@ -427,29 +426,20 @@ private CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.Par
/**
* The newContext method is used to create a new share fetch context for every share fetch request.
* @param groupId The group id in the share fetch request.
* @param shareFetchData The topic-partitions and their corresponding maxBytes data in the share fetch request.
* @param shareFetchData The topic-partitions in the share fetch request.
* @param toForget The topic-partitions to forget present in the share fetch request.
* @param reqMetadata The metadata in the share fetch request.
* @param isAcknowledgeDataPresent This tells whether the fetch request received includes piggybacked acknowledgements or not
* @return The new share fetch context object
*/
public ShareFetchContext newContext(String groupId, Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData,
public ShareFetchContext newContext(String groupId, List<TopicIdPartition> shareFetchData,
List<TopicIdPartition> toForget, ShareRequestMetadata reqMetadata, Boolean isAcknowledgeDataPresent) {
ShareFetchContext context;
// TopicPartition with maxBytes as 0 should not be added in the cachedPartitions
Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchDataWithMaxBytes = new HashMap<>();
shareFetchData.forEach((tp, sharePartitionData) -> {
if (sharePartitionData.maxBytes > 0) shareFetchDataWithMaxBytes.put(tp, sharePartitionData);
});
// If the request's epoch is FINAL_EPOCH or INITIAL_EPOCH, we should remove the existing sessions. Also, start a
// new session in case it is INITIAL_EPOCH. Hence, we need to treat them as special cases.
if (reqMetadata.isFull()) {
ShareSessionKey key = shareSessionKey(groupId, reqMetadata.memberId());
if (reqMetadata.epoch() == ShareRequestMetadata.FINAL_EPOCH) {
// If the epoch is FINAL_EPOCH, don't try to create a new session.
if (!shareFetchDataWithMaxBytes.isEmpty()) {
throw Errors.INVALID_REQUEST.exception();
}
if (cache.get(key) == null) {
log.error("Share session error for {}: no such share session found", key);
throw Errors.SHARE_SESSION_NOT_FOUND.exception();
Expand All @@ -464,20 +454,20 @@ public ShareFetchContext newContext(String groupId, Map<TopicIdPartition, ShareF
log.debug("Removed share session with key {}", key);
}
ImplicitLinkedHashCollection<CachedSharePartition> cachedSharePartitions = new
ImplicitLinkedHashCollection<>(shareFetchDataWithMaxBytes.size());
shareFetchDataWithMaxBytes.forEach((topicIdPartition, reqData) ->
cachedSharePartitions.mustAdd(new CachedSharePartition(topicIdPartition, reqData, false)));
ImplicitLinkedHashCollection<>(shareFetchData.size());
shareFetchData.forEach(topicIdPartition ->
cachedSharePartitions.mustAdd(new CachedSharePartition(topicIdPartition, false)));
ShareSessionKey responseShareSessionKey = cache.maybeCreateSession(groupId, reqMetadata.memberId(),
time.milliseconds(), cachedSharePartitions);
if (responseShareSessionKey == null) {
log.error("Could not create a share session for group {} member {}", groupId, reqMetadata.memberId());
throw Errors.SHARE_SESSION_NOT_FOUND.exception();
}

context = new ShareSessionContext(reqMetadata, shareFetchDataWithMaxBytes);
context = new ShareSessionContext(reqMetadata, shareFetchData);
log.debug("Created a new ShareSessionContext with key {} isSubsequent {} returning {}. A new share " +
"session will be started.", responseShareSessionKey, false,
partitionsToLogString(shareFetchDataWithMaxBytes.keySet()));
partitionsToLogString(shareFetchData));
}
} else {
// We update the already existing share session.
Expand All @@ -494,7 +484,7 @@ public ShareFetchContext newContext(String groupId, Map<TopicIdPartition, ShareF
throw Errors.INVALID_SHARE_SESSION_EPOCH.exception();
}
Map<ShareSession.ModifiedTopicIdPartitionType, List<TopicIdPartition>> modifiedTopicIdPartitions = shareSession.update(
shareFetchDataWithMaxBytes, toForget);
shareFetchData, toForget);
cache.touch(shareSession, time.milliseconds());
shareSession.epoch = ShareRequestMetadata.nextEpoch(shareSession.epoch);
log.debug("Created a new ShareSessionContext for session key {}, epoch {}: " +
Expand Down Expand Up @@ -586,7 +576,7 @@ private static String partitionsToLogString(Collection<TopicIdPartition> partiti

// Visible for testing.
void processShareFetch(ShareFetch shareFetch) {
if (shareFetch.partitionMaxBytes().isEmpty()) {
if (shareFetch.topicIdPartitions().isEmpty()) {
// If there are no partitions to fetch then complete the future with an empty map.
shareFetch.maybeComplete(Collections.emptyMap());
return;
Expand All @@ -596,7 +586,7 @@ void processShareFetch(ShareFetch shareFetch) {
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
// Track the topics for which we have received a share fetch request for metrics.
Set<String> topics = new HashSet<>();
for (TopicIdPartition topicIdPartition : shareFetch.partitionMaxBytes().keySet()) {
for (TopicIdPartition topicIdPartition : shareFetch.topicIdPartitions()) {
topics.add(topicIdPartition.topic());
SharePartitionKey sharePartitionKey = sharePartitionKey(
shareFetch.groupId(),
Expand Down
18 changes: 7 additions & 11 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2997,12 +2997,8 @@ class KafkaApis(val requestChannel: RequestChannel,
erroneousAndValidPartitionData.erroneous.forEach {
case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp
}
erroneousAndValidPartitionData.validTopicIdPartitions.forEach {
case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp
}
shareFetchData.forEach {
case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp
}
erroneousAndValidPartitionData.validTopicIdPartitions.forEach(tp => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp)
shareFetchData.forEach { tp => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp}

// Kafka share consumers need READ permission on each topic they are fetching.
val authorizedTopics = authHelper.filterByAuthorized(
Expand Down Expand Up @@ -3139,15 +3135,15 @@ class KafkaApis(val requestChannel: RequestChannel,
val erroneous = mutable.Map.empty[TopicIdPartition, ShareFetchResponseData.PartitionData]
erroneousAndValidPartitionData.erroneous.forEach { (topicIdPartition, partitionData) => erroneous.put(topicIdPartition, partitionData) }

val interestedWithMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer]
val interestedTopicPartitions = new util.ArrayList[TopicIdPartition]

erroneousAndValidPartitionData.validTopicIdPartitions.forEach { case (topicIdPartition, sharePartitionData) =>
erroneousAndValidPartitionData.validTopicIdPartitions.forEach { case topicIdPartition =>
if (!authorizedTopics.contains(topicIdPartition.topicPartition.topic))
erroneous += topicIdPartition -> ShareFetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED)
else if (!metadataCache.contains(topicIdPartition.topicPartition))
erroneous += topicIdPartition -> ShareFetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
else
interestedWithMaxBytes.put(topicIdPartition, sharePartitionData.maxBytes)
interestedTopicPartitions.add(topicIdPartition)
}

val shareFetchRequest = request.body[ShareFetchRequest]
Expand All @@ -3156,7 +3152,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val versionId = request.header.apiVersion
val groupId = shareFetchRequest.data.groupId

if (interestedWithMaxBytes.isEmpty) {
if (interestedTopicPartitions.isEmpty) {
CompletableFuture.completedFuture(erroneous)
} else {
// for share fetch from consumer, cap fetchMaxBytes to the maximum bytes that could be fetched without being
Expand Down Expand Up @@ -3194,7 +3190,7 @@ class KafkaApis(val requestChannel: RequestChannel,
params,
shareSessionEpoch,
shareFetchRequest.data.batchSize,
interestedWithMaxBytes
interestedTopicPartitions
).thenApply{ result =>
val combinedResult = mutable.Map.empty[TopicIdPartition, ShareFetchResponseData.PartitionData]
result.asScala.foreach { case (tp, data) =>
Expand Down
Loading