KAFKA-18404: Remove partitionMaxBytes usage from DelayedShareFetch #17870

Merged
Changes from 15 commits
108 changes: 66 additions & 42 deletions core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -27,6 +27,7 @@
import org.apache.kafka.server.purgatory.DelayedOperation;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchPartitionData;
@@ -60,24 +61,35 @@ public class DelayedShareFetch extends DelayedOperation {
private final ShareFetch shareFetch;
private final ReplicaManager replicaManager;
private final BiConsumer<SharePartitionKey, Throwable> exceptionHandler;
private final PartitionMaxBytesStrategy partitionMaxBytesStrategy;
// The topic partitions that need to be completed for the share fetch request are given by sharePartitions.
// sharePartitions is a subset of shareFetchData. The order of insertion/deletion of entries in sharePartitions is important.
private final LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions;
private LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> partitionsAcquired;
private LinkedHashMap<TopicIdPartition, Long> partitionsAcquired;
private LinkedHashMap<TopicIdPartition, LogReadResult> partitionsAlreadyFetched;

DelayedShareFetch(
ShareFetch shareFetch,
ReplicaManager replicaManager,
BiConsumer<SharePartitionKey, Throwable> exceptionHandler,
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions) {
this(shareFetch, replicaManager, exceptionHandler, sharePartitions, PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM));
}

DelayedShareFetch(
ShareFetch shareFetch,
ReplicaManager replicaManager,
BiConsumer<SharePartitionKey, Throwable> exceptionHandler,
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions,
PartitionMaxBytesStrategy partitionMaxBytesStrategy) {
super(shareFetch.fetchParams().maxWaitMs, Optional.empty());
this.shareFetch = shareFetch;
this.replicaManager = replicaManager;
this.partitionsAcquired = new LinkedHashMap<>();
this.partitionsAlreadyFetched = new LinkedHashMap<>();
this.exceptionHandler = exceptionHandler;
this.sharePartitions = sharePartitions;
this.partitionMaxBytesStrategy = partitionMaxBytesStrategy;
}
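Note: the shorter constructor above defaults to a UNIFORM PartitionMaxBytesStrategy, replacing the static per-partition limits previously carried in FetchRequest.PartitionData. A minimal sketch of what a uniform strategy plausibly computes, assuming it splits the request-level max bytes evenly across the acquired partitions (the helper class, generics, and rounding are illustrative, not Kafka's actual implementation):

import java.util.LinkedHashMap;
import java.util.Set;

// Illustrative stand-in for PartitionMaxBytesStrategy.StrategyType.UNIFORM.
final class UniformMaxBytesSketch {
    // requestMaxBytes: the share fetch request's total byte budget.
    // partitions: the partitions being read in this particular attempt.
    // acquiredPartitionsSize: the divisor, i.e. how many partitions were
    // acquired overall, so partitions read later keep their share of the budget.
    static <P> LinkedHashMap<P, Integer> maxBytes(int requestMaxBytes,
                                                  Set<P> partitions,
                                                  int acquiredPartitionsSize) {
        LinkedHashMap<P, Integer> result = new LinkedHashMap<>();
        int perPartition = requestMaxBytes / Math.max(acquiredPartitionsSize, 1);
        partitions.forEach(p -> result.put(p, perPartition));
        return result;
    }
}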

@Override
@@ -99,7 +111,7 @@ public void onComplete() {
partitionsAcquired.keySet());

try {
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
LinkedHashMap<TopicIdPartition, Long> topicPartitionData;
// tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
if (partitionsAcquired.isEmpty())
topicPartitionData = acquirablePartitions();
@@ -121,11 +133,13 @@
}
}

private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, Long> topicPartitionData) {
try {
LinkedHashMap<TopicIdPartition, LogReadResult> responseData;
if (partitionsAlreadyFetched.isEmpty())
responseData = readFromLog(topicPartitionData);
responseData = readFromLog(
topicPartitionData,
partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size()));
else
// There shouldn't be a case when we have a partitionsAlreadyFetched value here and this variable is getting
// updated in a different tryComplete thread.
@@ -158,7 +172,7 @@ private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, FetchRequ
*/
@Override
public boolean tryComplete() {
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();
LinkedHashMap<TopicIdPartition, Long> topicPartitionData = acquirablePartitions();

try {
if (!topicPartitionData.isEmpty()) {
@@ -167,7 +181,7 @@ public boolean tryComplete() {
// those topic partitions.
LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
maybeUpdateFetchOffsetMetadata(topicPartitionData, replicaManagerReadResponse);
if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData)) {
if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData, partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size()))) {
partitionsAcquired = topicPartitionData;
partitionsAlreadyFetched = replicaManagerReadResponse;
boolean completedByMe = forceComplete();
@@ -202,28 +216,18 @@ public boolean tryComplete() {
* Prepare fetch request structure for partitions in the share fetch request for which we can acquire records.
*/
// Visible for testing
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
LinkedHashMap<TopicIdPartition, Long> acquirablePartitions() {
// Initialize the topic partitions for which the fetch should be attempted.
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();
LinkedHashMap<TopicIdPartition, Long> topicPartitionData = new LinkedHashMap<>();

sharePartitions.forEach((topicIdPartition, sharePartition) -> {
int partitionMaxBytes = shareFetch.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
// Add the share partition to the list of partitions to be fetched only if we can
// acquire the fetch lock on it.
if (sharePartition.maybeAcquireFetchLock()) {
try {
// If the share partition is already at capacity, we should not attempt to fetch.
if (sharePartition.canAcquireRecords()) {
topicPartitionData.put(
topicIdPartition,
new FetchRequest.PartitionData(
topicIdPartition.topicId(),
sharePartition.nextFetchOffset(),
0,
partitionMaxBytes,
Optional.empty()
)
);
topicPartitionData.put(topicIdPartition, sharePartition.nextFetchOffset());
} else {
sharePartition.releaseFetchLock();
log.trace("Record lock partition limit exceeded for SharePartition {}, " +
@@ -239,24 +243,28 @@ LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions
return topicPartitionData;
}

private LinkedHashMap<TopicIdPartition, LogReadResult> maybeReadFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> partitionsNotMatchingFetchOffsetMetadata = new LinkedHashMap<>();
topicPartitionData.forEach((topicIdPartition, partitionData) -> {
private LinkedHashMap<TopicIdPartition, LogReadResult> maybeReadFromLog(LinkedHashMap<TopicIdPartition, Long> topicPartitionData) {
LinkedHashMap<TopicIdPartition, Long> partitionsNotMatchingFetchOffsetMetadata = new LinkedHashMap<>();
topicPartitionData.forEach((topicIdPartition, fetchOffset) -> {
SharePartition sharePartition = sharePartitions.get(topicIdPartition);
if (sharePartition.fetchOffsetMetadata(partitionData.fetchOffset).isEmpty()) {
partitionsNotMatchingFetchOffsetMetadata.put(topicIdPartition, partitionData);
if (sharePartition.fetchOffsetMetadata(fetchOffset).isEmpty()) {
partitionsNotMatchingFetchOffsetMetadata.put(topicIdPartition, fetchOffset);
}
});
if (partitionsNotMatchingFetchOffsetMetadata.isEmpty()) {
return new LinkedHashMap<>();
}
// We fetch data from replica manager corresponding to the topic partitions that have missing fetch offset metadata.
return readFromLog(partitionsNotMatchingFetchOffsetMetadata);
// Although we compute partition max bytes only for partitionsNotMatchingFetchOffsetMetadata,
// we use acquired partitions size = topicPartitionData.size() so that the leftover
// partitions, which will be fetched later, do not starve.
return readFromLog(
partitionsNotMatchingFetchOffsetMetadata,
partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, partitionsNotMatchingFetchOffsetMetadata.keySet(), topicPartitionData.size()));
}
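Note: illustrative arithmetic for the divisor choice above, assuming the UNIFORM strategy divides the budget evenly:

// Hypothetical numbers, not taken from this PR.
int requestMaxBytes = 1_048_576;  // shareFetch.fetchParams().maxBytes
int acquiredPartitions = 5;       // topicPartitionData.size()
int missingOffsetMetadata = 2;    // partitionsNotMatchingFetchOffsetMetadata.size()
// Each of the 2 re-read partitions is budgeted 1_048_576 / 5 = 209_715 bytes,
// not 1_048_576 / 2 = 524_288; the remaining 3 partitions keep their share
// for the later fetch.
int perPartition = requestMaxBytes / acquiredPartitions;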

private void maybeUpdateFetchOffsetMetadata(
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
private void maybeUpdateFetchOffsetMetadata(LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponseData.entrySet()) {
TopicIdPartition topicIdPartition = entry.getKey();
SharePartition sharePartition = sharePartitions.get(topicIdPartition);
@@ -267,17 +275,18 @@ private void maybeUpdateFetchOffsetMetadata(
continue;
}
sharePartition.updateFetchOffsetMetadata(
topicPartitionData.get(topicIdPartition).fetchOffset,
topicPartitionData.get(topicIdPartition),
replicaManagerLogReadResult.info().fetchOffsetMetadata);
}
}

// minBytes estimation currently assumes the common case where all fetched data is acquirable.
private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes) {
long accumulatedSize = 0;
for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
for (Map.Entry<TopicIdPartition, Long> entry : topicPartitionData.entrySet()) {
TopicIdPartition topicIdPartition = entry.getKey();
FetchRequest.PartitionData partitionData = entry.getValue();
long fetchOffset = entry.getValue();

LogOffsetMetadata endOffsetMetadata;
try {
@@ -294,7 +303,7 @@ private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, FetchRequest

SharePartition sharePartition = sharePartitions.get(topicIdPartition);

Optional<LogOffsetMetadata> optionalFetchOffsetMetadata = sharePartition.fetchOffsetMetadata(partitionData.fetchOffset);
Optional<LogOffsetMetadata> optionalFetchOffsetMetadata = sharePartition.fetchOffsetMetadata(fetchOffset);
if (optionalFetchOffsetMetadata.isEmpty() || optionalFetchOffsetMetadata.get() == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
continue;
LogOffsetMetadata fetchOffsetMetadata = optionalFetchOffsetMetadata.get();
@@ -312,7 +321,7 @@ private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, FetchRequest
return true;
} else if (fetchOffsetMetadata.onSameSegment(endOffsetMetadata)) {
// we take the partition fetch size as upper bound when accumulating the bytes.
long bytesAvailable = Math.min(endOffsetMetadata.positionDiff(fetchOffsetMetadata), partitionData.maxBytes);
long bytesAvailable = Math.min(endOffsetMetadata.positionDiff(fetchOffsetMetadata), partitionMaxBytes.get(topicIdPartition));
accumulatedSize += bytesAvailable;
}
}
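Note: with the new signature, each partition's contribution to the minBytes check is capped by its computed budget rather than by a caller-supplied FetchRequest.PartitionData.maxBytes. A self-contained sketch of the accumulation pattern (class and method names are illustrative, not the actual Kafka code):

import java.util.LinkedHashMap;
import java.util.Map;

final class MinBytesSketch {
    // available: readable bytes per partition (e.g. the positionDiff between
    // the end offset metadata and the fetch offset metadata).
    // budgets: per-partition max bytes produced by the strategy.
    static <P> boolean minBytesSatisfied(LinkedHashMap<P, Long> available,
                                         Map<P, Integer> budgets,
                                         int minBytes) {
        long accumulated = 0;
        for (Map.Entry<P, Long> e : available.entrySet()) {
            // Each partition contributes at most its own budget.
            accumulated += Math.min(e.getValue(), budgets.get(e.getKey()));
            if (accumulated >= minBytes) {
                return true;
            }
        }
        return false;
    }
}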
@@ -335,13 +344,25 @@ else if (isolationType == FetchIsolation.HIGH_WATERMARK)

}

private LinkedHashMap<TopicIdPartition, LogReadResult> readFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
private LinkedHashMap<TopicIdPartition, LogReadResult> readFromLog(LinkedHashMap<TopicIdPartition, Long> topicPartitionFetchOffsets,
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes) {
// Filter if there already exists any erroneous topic partition.
Set<TopicIdPartition> partitionsToFetch = shareFetch.filterErroneousTopicPartitions(topicPartitionData.keySet());
Set<TopicIdPartition> partitionsToFetch = shareFetch.filterErroneousTopicPartitions(topicPartitionFetchOffsets.keySet());
if (partitionsToFetch.isEmpty()) {
return new LinkedHashMap<>();
}

LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();

topicPartitionFetchOffsets.forEach((topicIdPartition, fetchOffset) -> topicPartitionData.put(topicIdPartition,
new FetchRequest.PartitionData(
topicIdPartition.topicId(),
fetchOffset,
0,
partitionMaxBytes.get(topicIdPartition),
Optional.empty())
));

Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
shareFetch.fetchParams(),
CollectionConverters.asScala(
@@ -390,18 +411,21 @@ private void handleFetchException(
}

// Visible for testing.
LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
LinkedHashMap<TopicIdPartition, LogReadResult> existingFetchedData) {
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> missingLogReadTopicPartitions = new LinkedHashMap<>();
topicPartitionData.forEach((topicIdPartition, partitionData) -> {
LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
LinkedHashMap<TopicIdPartition, LogReadResult> existingFetchedData) {
LinkedHashMap<TopicIdPartition, Long> missingLogReadTopicPartitions = new LinkedHashMap<>();
topicPartitionData.forEach((topicIdPartition, fetchOffset) -> {
if (!existingFetchedData.containsKey(topicIdPartition)) {
missingLogReadTopicPartitions.put(topicIdPartition, partitionData);
missingLogReadTopicPartitions.put(topicIdPartition, fetchOffset);
}
});
if (missingLogReadTopicPartitions.isEmpty()) {
return existingFetchedData;
}
LinkedHashMap<TopicIdPartition, LogReadResult> missingTopicPartitionsLogReadResponse = readFromLog(missingLogReadTopicPartitions);

LinkedHashMap<TopicIdPartition, LogReadResult> missingTopicPartitionsLogReadResponse = readFromLog(
missingLogReadTopicPartitions,
partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, missingLogReadTopicPartitions.keySet(), topicPartitionData.size()));
missingTopicPartitionsLogReadResponse.putAll(existingFetchedData);
return missingTopicPartitionsLogReadResponse;
}
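Note: combineLogReadResponse follows the same pattern as maybeReadFromLog: partitions missing from the cached response are re-read with budgets computed against the full acquired size, then merged with the cached results. A generic sketch of the merge step (illustrative helper, not Kafka's code):

import java.util.LinkedHashMap;

final class CombineSketch {
    // fresh: newly read results for the partitions that were missing.
    // cached: results already fetched earlier in tryComplete.
    static <K, V> LinkedHashMap<K, V> combine(LinkedHashMap<K, V> fresh,
                                              LinkedHashMap<K, V> cached) {
        LinkedHashMap<K, V> out = new LinkedHashMap<>(fresh);
        out.putAll(cached); // cached entries win, mirroring putAll in the diff
        return out;
    }
}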