diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index 6b11081681ac2..cbc7a293b01fc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -43,19 +43,19 @@ import org.slf4j.Logger; import java.io.Closeable; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Queue; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -83,7 +83,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi private Uuid memberId; private boolean fetchMoreRecords = false; private final Map fetchAcknowledgementsMap; - private final Queue acknowledgeRequestStates; + private final Map> acknowledgeRequestStates; private final long retryBackoffMs; private final long retryBackoffMaxMs; private boolean closing = false; @@ -114,7 +114,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi this.retryBackoffMaxMs = retryBackoffMaxMs; this.sessionHandlers = new HashMap<>(); this.nodesWithPendingRequests = new HashSet<>(); - this.acknowledgeRequestStates = new LinkedList<>(); + this.acknowledgeRequestStates = new HashMap<>(); this.fetchAcknowledgementsMap = new HashMap<>(); this.closeFuture = new CompletableFuture<>(); } @@ -201,6 +201,7 @@ public void fetch(Map acknowledgementsMap) { fetchMoreRecords = true; } acknowledgementsMap.forEach((tip, acks) -> fetchAcknowledgementsMap.merge(tip, acks, Acknowledgements::merge)); + log.warn("Going to fetch with piggy back acks = {}", fetchAcknowledgementsMap); } /** @@ -212,37 +213,17 @@ public void fetch(Map acknowledgementsMap) { */ private PollResult processAcknowledgements(long currentTimeMs) { List unsentRequests = new ArrayList<>(); - Iterator iterator = acknowledgeRequestStates.iterator(); - while (iterator.hasNext()) { - AcknowledgeRequestState acknowledgeRequestState = iterator.next(); - if (acknowledgeRequestState.isProcessed()) { - iterator.remove(); - } else if (!acknowledgeRequestState.maybeExpire()) { - if (nodesWithPendingRequests.contains(acknowledgeRequestState.nodeId)) { - log.trace("Skipping acknowledge request because previous request to {} has not been processed", acknowledgeRequestState.nodeId); - } else { - if (acknowledgeRequestState.canSendRequest(currentTimeMs)) { - acknowledgeRequestState.onSendAttempt(currentTimeMs); - UnsentRequest request = acknowledgeRequestState.buildRequest(currentTimeMs); - if (request != null) { - unsentRequests.add(request); - } - } - } - } else { - // Fill in TimeoutException - for (TopicIdPartition tip : acknowledgeRequestState.acknowledgementsMap.keySet()) { - metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip)); - acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.REQUEST_TIMED_OUT); - } - iterator.remove(); - } + for (Map.Entry requestStates : acknowledgeRequestStates.values()) { + // For commitAsync + maybeBuildRequest(requestStates.getKey(), currentTimeMs).ifPresent(unsentRequests::add); + // For commitSync + maybeBuildRequest(requestStates.getValue(), currentTimeMs).ifPresent(unsentRequests::add); } PollResult pollResult = null; if (!unsentRequests.isEmpty()) { pollResult = new PollResult(unsentRequests); - } else if (!acknowledgeRequestStates.isEmpty()) { + } else if (areAnyAcknowledgementsLeft()) { // Return empty result until all the acknowledgement request states are processed pollResult = PollResult.EMPTY; } else if (closing) { @@ -251,10 +232,54 @@ private PollResult processAcknowledgements(long currentTimeMs) { } pollResult = PollResult.EMPTY; } - return pollResult; } + private Optional maybeBuildRequest(AcknowledgeRequestState acknowledgeRequestState, long currentTimeMs) { + if (acknowledgeRequestState == null || (acknowledgeRequestState.acknowledgementsToSend.isEmpty() && acknowledgeRequestState.incompleteAcknowledgements.isEmpty())) { + return Optional.empty(); + } else if (!acknowledgeRequestState.maybeExpire()) { + if (nodesWithPendingRequests.contains(acknowledgeRequestState.nodeId)) { + log.trace("Skipping acknowledge request because previous request to {} has not been processed", acknowledgeRequestState.nodeId); + } else { + if (acknowledgeRequestState.canSendRequest(currentTimeMs)) { + acknowledgeRequestState.onSendAttempt(currentTimeMs); + return Optional.of(acknowledgeRequestState.buildRequest(currentTimeMs)); + } + } + } else { + // Fill in TimeoutException + for (TopicIdPartition tip : acknowledgeRequestState.incompleteAcknowledgements.keySet()) { + metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getIncompleteAcknowledgementsCount(tip)); + acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.REQUEST_TIMED_OUT); + } + acknowledgeRequestState.incompleteAcknowledgements.clear(); + } + return Optional.empty(); + } + + private boolean areAnyAcknowledgementsLeft() { + AtomicBoolean b = new AtomicBoolean(false); + Iterator>> iterator = acknowledgeRequestStates.entrySet().iterator(); + //Iterator> iterator = acknowledgeRequestStates.iterator(); + while (iterator.hasNext()) { + Map.Entry> acknowledgeRequestStatePair = iterator.next(); + if (isRequestStateEmpty(acknowledgeRequestStatePair.getValue().getKey()) && isRequestStateEmpty(acknowledgeRequestStatePair.getValue().getValue())) { + b.set(true); + } else { + acknowledgeRequestStates.remove(acknowledgeRequestStatePair.getKey()); + } + } + return b.get(); + } + + private boolean isRequestStateEmpty(AcknowledgeRequestState acknowledgeRequestState) { + return acknowledgeRequestState == null + || !acknowledgeRequestState.acknowledgementsToSend.isEmpty() + || !acknowledgeRequestState.incompleteAcknowledgements.isEmpty() + || !acknowledgeRequestState.inFlightAcknowledgements.isEmpty(); + } + /** * Enqueue an AcknowledgeRequestState to be picked up on the next poll * @@ -287,7 +312,10 @@ public CompletableFuture> commitSync( resultCount.incrementAndGet(); } } - acknowledgeRequestStates.add(new AcknowledgeRequestState(logContext, + acknowledgeRequestStates.putIfAbsent(nodeId, new AbstractMap.SimpleEntry<>(null, null)); + + // There can only be one commitSync()/close() happening at a time. So per node, there will be one acknowledge request state. + acknowledgeRequestStates.put(nodeId, new AbstractMap.SimpleEntry<>(acknowledgeRequestStates.get(nodeId).getKey(), new AcknowledgeRequestState(logContext, ShareConsumeRequestManager.class.getSimpleName() + ":1", deadlineMs, retryBackoffMs, @@ -298,7 +326,7 @@ public CompletableFuture> commitSync( this::handleShareAcknowledgeSuccess, this::handleShareAcknowledgeFailure, resultHandler - )); + ))); } }); @@ -320,6 +348,9 @@ public void commitAsync(final Map acknowledg Node node = cluster.nodeById(nodeId); if (node != null) { Map acknowledgementsMapForNode = new HashMap<>(); + + acknowledgeRequestStates.putIfAbsent(nodeId, new AbstractMap.SimpleEntry<>(null, null)); + for (TopicIdPartition tip : sessionHandler.sessionPartitions()) { Acknowledgements acknowledgements = acknowledgementsMap.get(tip); if (acknowledgements != null) { @@ -328,20 +359,25 @@ public void commitAsync(final Map acknowledg metricsManager.recordAcknowledgementSent(acknowledgements.size()); log.debug("Added acknowledge request for partition {} to node {}", tip.topicPartition(), node); resultCount.incrementAndGet(); + if (acknowledgeRequestStates.get(nodeId).getKey() == null) { + acknowledgeRequestStates.replace(nodeId, new AbstractMap.SimpleEntry<>(new AcknowledgeRequestState(logContext, + ShareConsumeRequestManager.class.getSimpleName() + ":2", + Long.MAX_VALUE, + retryBackoffMs, + retryBackoffMaxMs, + sessionHandler, + nodeId, + acknowledgementsMapForNode, + this::handleShareAcknowledgeSuccess, + this::handleShareAcknowledgeFailure, + resultHandler + ), acknowledgeRequestStates.get(nodeId).getValue())); + } else { + acknowledgeRequestStates.get(nodeId).getKey().acknowledgementsToSend.putIfAbsent(tip, acknowledgements); + acknowledgeRequestStates.get(nodeId).getKey().acknowledgementsToSend.get(tip).merge(acknowledgements); + } } } - acknowledgeRequestStates.add(new AcknowledgeRequestState(logContext, - ShareConsumeRequestManager.class.getSimpleName() + ":2", - Long.MAX_VALUE, - retryBackoffMs, - retryBackoffMaxMs, - sessionHandler, - nodeId, - acknowledgementsMapForNode, - this::handleShareAcknowledgeSuccess, - this::handleShareAcknowledgeFailure, - resultHandler - )); } }); @@ -380,8 +416,12 @@ public CompletableFuture acknowledgeOnClose(final Map(null, null)); + // There can only be one commitSync()/close() happening at a time. So per node, there will be one acknowledge request state. + acknowledgeRequestStates.put(nodeId, new AbstractMap.SimpleEntry<>(acknowledgeRequestStates.get(nodeId).getKey(), + new AcknowledgeRequestState(logContext, + ShareConsumeRequestManager.class.getSimpleName() + ":1", deadlineMs, retryBackoffMs, retryBackoffMaxMs, @@ -392,7 +432,7 @@ public CompletableFuture acknowledgeOnClose(final Map topic.partitions().forEach(partition -> { TopicIdPartition tip = new TopicIdPartition(topic.topicId(), partition.partitionIndex(), metadata.topicNames().get(topic.topicId())); - metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip)); + metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip)); acknowledgeRequestState.handleAcknowledgeErrorCode(tip, response.error()); })); acknowledgeRequestState.processingComplete(); } } else { + acknowledgeRequestState.onSuccessfulAttempt(currentTimeMs); + response.data().responses().forEach(topic -> topic.partitions().forEach(partition -> { TopicIdPartition tip = new TopicIdPartition(topic.topicId(), partition.partitionIndex(), metadata.topicNames().get(topic.topicId())); if (partition.errorCode() != Errors.NONE.code()) { - metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip)); + metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip)); } acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forCode(partition.errorCode())); })); - + log.warn("SHARE-ACK-SUCCESS : {}", response.data()); acknowledgeRequestState.processingComplete(); } metricsManager.recordLatency(resp.requestLatencyMs()); } finally { - log.debug("Removing pending request for node {} - success", fetchTarget); + log.debug("Removing pending request for node {} - success {}", fetchTarget, resp); nodesWithPendingRequests.remove(fetchTarget.id()); } } @@ -560,7 +604,7 @@ private void handleShareAcknowledgeFailure(Node fetchTarget, TopicIdPartition tip = new TopicIdPartition(topic.topicId(), partition.partitionIndex(), metadata.topicNames().get(topic.topicId())); - metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip)); + metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip)); acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forException(error)); })); } finally { @@ -582,11 +626,13 @@ private void handleShareAcknowledgeCloseSuccess(Node fetchTarget, partition.partitionIndex(), metadata.topicNames().get(topic.topicId())); if (partition.errorCode() != Errors.NONE.code()) { - metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip)); + metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip)); } acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forCode(partition.errorCode())); })); + acknowledgeRequestState.onSuccessfulAttempt(currentTimeMs); + log.warn("SHARE-ACK-CLOSE-SUCCESS : {}", response); metricsManager.recordLatency(resp.requestLatencyMs()); acknowledgeRequestState.processingComplete(); } finally { @@ -608,7 +654,7 @@ private void handleShareAcknowledgeCloseFailure(Node fetchTarget, TopicIdPartition tip = new TopicIdPartition(topic.topicId(), partition.partitionIndex(), metadata.topicNames().get(topic.topicId())); - metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip)); + metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip)); acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forException(error)); })); } finally { @@ -661,7 +707,17 @@ class AcknowledgeRequestState extends TimedRequestState { /** * The map of acknowledgements to send */ - private final Map acknowledgementsMap; + private Map acknowledgementsToSend; + + /** + * + */ + private Map incompleteAcknowledgements; + + /** + * The in-flight acknowledgements + */ + private Map inFlightAcknowledgements; /** * The handler to call on a successful response from ShareAcknowledge. @@ -673,11 +729,6 @@ class AcknowledgeRequestState extends TimedRequestState { */ private final ResponseHandler errorHandler; - /** - * Whether the request has been processed and will not be retried. - */ - private boolean isProcessed = false; - /** * This handles completing a future when all results are known. */ @@ -718,11 +769,13 @@ class AcknowledgeRequestState extends TimedRequestState { super(logContext, owner, retryBackoffMs, retryBackoffMaxMs, deadlineTimer(time, deadlineMs)); this.sessionHandler = sessionHandler; this.nodeId = nodeId; - this.acknowledgementsMap = acknowledgementsMap; this.successHandler = successHandler; this.errorHandler = errorHandler; + this.acknowledgementsToSend = acknowledgementsMap; this.resultHandler = resultHandler; this.onClose = onClose; + this.inFlightAcknowledgements = new HashMap<>(); + this.incompleteAcknowledgements = new HashMap<>(); } UnsentRequest buildRequest(long currentTimeMs) { @@ -731,7 +784,11 @@ UnsentRequest buildRequest(long currentTimeMs) { sessionHandler.notifyClose(); } - for (Map.Entry entry : acknowledgementsMap.entrySet()) { + Map finalAcknowledgementsToSend = new HashMap<>( + incompleteAcknowledgements.isEmpty() ? acknowledgementsToSend : incompleteAcknowledgements); + + for (Map.Entry entry : finalAcknowledgementsToSend.entrySet()) { + log.warn("SHARE_ACKNOWLEDGE-- finalAcksToSend is : {}", acknowledgementsToSend); sessionHandler.addPartitionToFetch(entry.getKey(), entry.getValue()); } @@ -754,12 +811,27 @@ UnsentRequest buildRequest(long currentTimeMs) { handleSessionErrorCode(Errors.SHARE_SESSION_NOT_FOUND); return null; } else { + inFlightAcknowledgements.putAll(finalAcknowledgementsToSend); + if (incompleteAcknowledgements.isEmpty()) { + acknowledgementsToSend.clear(); + } else { + incompleteAcknowledgements.clear(); + } return new UnsentRequest(requestBuilder, Optional.of(nodeToSend)).whenComplete(responseHandler); } } - int getAcknowledgementsCount(TopicIdPartition tip) { - Acknowledgements acks = acknowledgementsMap.get(tip); + int getInFlightAcknowledgementsCount(TopicIdPartition tip) { + Acknowledgements acks = inFlightAcknowledgements.get(tip); + if (acks == null) { + return 0; + } else { + return acks.size(); + } + } + + int getIncompleteAcknowledgementsCount(TopicIdPartition tip) { + Acknowledgements acks = incompleteAcknowledgements.get(tip); if (acks == null) { return 0; } else { @@ -772,7 +844,12 @@ int getAcknowledgementsCount(TopicIdPartition tip) { * through a background event. */ void handleAcknowledgeErrorCode(TopicIdPartition tip, Errors acknowledgeErrorCode) { - Acknowledgements acks = acknowledgementsMap.remove(tip); + Acknowledgements acks; + if (acknowledgeErrorCode == Errors.REQUEST_TIMED_OUT) { + acks = incompleteAcknowledgements.get(tip); + } else { + acks = inFlightAcknowledgements.get(tip); + } if (acks != null) { acks.setAcknowledgeErrorCode(acknowledgeErrorCode); } @@ -785,13 +862,12 @@ void handleAcknowledgeErrorCode(TopicIdPartition tip, Errors acknowledgeErrorCod * being sent. */ void handleSessionErrorCode(Errors errorCode) { - acknowledgementsMap.forEach((tip, acks) -> { + inFlightAcknowledgements.forEach((tip, acks) -> { if (acks != null) { acks.setAcknowledgeErrorCode(errorCode); } resultHandler.complete(tip, acks); }); - acknowledgementsMap.clear(); processingComplete(); } @@ -800,17 +876,28 @@ ShareSessionHandler sessionHandler() { } void processingComplete() { - isProcessed = true; + log.warn("Clients : Acknowledgement Success : {}", inFlightAcknowledgements); + incompleteAcknowledgements.clear(); + inFlightAcknowledgements.clear(); resultHandler.completeIfEmpty(); } - boolean isProcessed() { - return isProcessed; + void retryRequest() { + incompleteAcknowledgements.putAll(inFlightAcknowledgements); + inFlightAcknowledgements.clear(); } boolean maybeExpire() { return numAttempts > 0 && isExpired(); } + + // For testing + @Override + public String toString() { + return "ACKS_TO_SEND : " + acknowledgementsToSend + + "INCOMPLETE_ACKS : " + incompleteAcknowledgements + + "IN_FLIGHT_ACKS : " + inFlightAcknowledgements; + } } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java index 9cd7d837fda27..125270125d9d4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java @@ -607,7 +607,7 @@ private ShareFetch pollForFetches(final Timer timer) { long pollTimeout = Math.min(applicationEventHandler.maximumTimeToWait(), timer.remainingMs()); Map acknowledgementsMap = currentFetch.takeAcknowledgedRecords(); - + //log.warn("TAKEACKS : While polling : {}", acknowledgementsMap); // If data is available already, return it immediately final ShareFetch fetch = collect(acknowledgementsMap); if (!fetch.isEmpty()) { @@ -702,6 +702,7 @@ public Map> commitSync(final Duration Timer requestTimer = time.timer(timeout.toMillis()); Map acknowledgementsMap = acknowledgementsToSend(); + //log.warn("TAKEACKS : COMMIT-SYNC : {}", acknowledgementsMap); if (acknowledgementsMap.isEmpty()) { return Collections.emptyMap(); } else { @@ -750,6 +751,7 @@ public void commitAsync() { acknowledgeBatchIfImplicitAcknowledgement(false); Map acknowledgementsMap = acknowledgementsToSend(); + //rn("TAKEACKS : COMMIT-ASYNC : {}", acknowledgementsMap); if (!acknowledgementsMap.isEmpty()) { ShareAcknowledgeAsyncEvent event = new ShareAcknowledgeAsyncEvent(acknowledgementsMap); applicationEventHandler.add(event); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java index 37895b891ac6c..0c3b30d314391 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java @@ -64,6 +64,7 @@ public int acknowledgeAll(AcknowledgeType type) { recordsAcknowledged++; } } + //System.out.println(recordsAcknowledged + "in SIFBatch : " + acknowledgedRecords); return recordsAcknowledged; } diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java b/core/src/test/java/kafka/test/api/ShareConsumerTest.java index 1e860dcdee32b..6b88f49cfb3b0 100644 --- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java +++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java @@ -109,7 +109,7 @@ public void createCluster() throws Exception { .setConfigProp("group.share.enable", "true") .setConfigProp("group.share.partition.max.record.locks", "10000") .setConfigProp("group.share.persister.class.name", "org.apache.kafka.server.group.share.DefaultStatePersister") - .setConfigProp("group.share.record.lock.duration.ms", "10000") + .setConfigProp("group.share.record.lock.duration.ms", "1000") .setConfigProp("offsets.topic.replication.factor", "1") .setConfigProp("share.coordinator.state.topic.min.isr", "1") .setConfigProp("share.coordinator.state.topic.replication.factor", "1") @@ -1094,6 +1094,7 @@ public void testMultipleConsumersInGroupFailureConcurrentConsumption() { CompletableFuture future = new CompletableFuture<>(); futuresSuccess.add(future); consumeMessages(totalMessagesConsumed, producerCount * messagesPerProducer, "group1", consumerNumber, 25, true, future); + System.out.println(totalMessagesConsumed.get() + " is total msgs consumed"); }); } producerExecutorService.shutdown(); @@ -1646,8 +1647,8 @@ private CompletableFuture produceMessages(int messageCount) { Future[] recordFutures = new Future[messageCount]; int messagesSent = 0; try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer())) { - ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); for (int i = 0; i < messageCount; i++) { + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, ("key" + i).getBytes(), ("value" + i).getBytes()); recordFutures[i] = producer.send(record); } for (int i = 0; i < messageCount; i++) { @@ -1710,6 +1711,7 @@ private void consumeMessages(AtomicInteger totalMessagesConsumed, } catch (Exception e) { fail("Consumer " + consumerNumber + " failed with exception: " + e); } finally { + shareConsumer.commitAsync(); shareConsumer.close(); future.complete(messagesConsumed); } diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties index f7fb7364a3c38..4b994d1166c0d 100644 --- a/core/src/test/resources/log4j.properties +++ b/core/src/test/resources/log4j.properties @@ -20,7 +20,9 @@ log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n log4j.logger.kafka=WARN log4j.logger.org.apache.kafka=WARN - +log4j.logger.org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager=WARN +log4j.logger.org.apache.kafka.clients.consumer.internals.ShareConsumerImpl=WARN +log4j.logger.org.apache.kafka.clients.consumer.internals.ShareSessionHandler=DEBUG # zkclient can be verbose, during debugging it is common to adjust it separately log4j.logger.org.apache.zookeeper=WARN