Skip to content

Commit f8e6d43

Browse files
committedJul 9, 2024·
Merge remote-tracking branch 'origin/kip-932' into AKCORE-253
2 parents 97c1fb7 + dfcf22c commit f8e6d43

File tree

2 files changed

+1
-7
lines changed

2 files changed

+1
-7
lines changed
 

‎clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java

-7
Original file line numberDiff line numberDiff line change
@@ -575,7 +575,6 @@ private void handleShareAcknowledgeFailure(Node fetchTarget,
575575
metadata.topicNames().get(topic.topicId()));
576576
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
577577
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forException(error));
578-
acknowledgeRequestState.inFlightAcknowledgements.remove(tip);
579578
}));
580579
} finally {
581580
log.debug("Removing pending request for node {} - failed", fetchTarget);
@@ -599,7 +598,6 @@ private void handleShareAcknowledgeCloseSuccess(Node fetchTarget,
599598
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
600599
}
601600
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forCode(partition.errorCode()));
602-
acknowledgeRequestState.inFlightAcknowledgements.remove(tip);
603601
}));
604602

605603
metricsManager.recordLatency(resp.requestLatencyMs());
@@ -625,7 +623,6 @@ private void handleShareAcknowledgeCloseFailure(Node fetchTarget,
625623
metadata.topicNames().get(topic.topicId()));
626624
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
627625
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forException(error));
628-
acknowledgeRequestState.inFlightAcknowledgements.remove(tip);
629626
}));
630627
} finally {
631628
log.debug("Removing pending request for node {} - failed", fetchTarget);
@@ -839,10 +836,6 @@ void retryRequest() {
839836
inFlightAcknowledgements.clear();
840837
}
841838

842-
boolean isIncompleteMapEmpty() {
843-
return incompleteAcknowledgements.isEmpty();
844-
}
845-
846839
boolean maybeExpire() {
847840
return numAttempts > 0 && isExpired();
848841
}

‎tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java

+1
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public static void main(String[] args) {
7373
Map<MetricName, ? extends Metric> metrics = null;
7474
if (options.printMetrics())
7575
metrics = shareConsumer.metrics();
76+
shareConsumer.commitAsync();
7677
shareConsumer.close();
7778

7879
// print final stats

0 commit comments

Comments
 (0)
Please sign in to comment.