Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18517: Enable ConsumerBounceTest to run for new async consumer #18532

Draft
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 30 additions & 15 deletions core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.util.ShutdownableThread
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Disabled, Test, TestInfo}
import org.junit.jupiter.api.{AfterEach, Disabled, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource

Expand Down Expand Up @@ -59,7 +59,12 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG -> "1",
GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG -> "10", // set small enough session timeout
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG -> "0",

// Tests will run for CONSUMER and CLASSIC group protocol, so set the group max size property
// required for each.
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG -> maxGroupSize.toString,
GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG -> maxGroupSize.toString,

ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG -> "false",
ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG -> "true",
ReplicationConfigs.UNCLEAN_LEADER_ELECTION_INTERVAL_MS_CONFIG -> "50",
Expand Down Expand Up @@ -94,7 +99,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testConsumptionWithBrokerFailures(quorum: String, groupProtocol: String): Unit = consumeWithBrokerFailures(10)

/*
Expand Down Expand Up @@ -139,7 +144,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testSeekAndCommitWithBrokerFailures(quorum: String, groupProtocol: String): Unit = seekAndCommitWithBrokerFailures(5)

def seekAndCommitWithBrokerFailures(numIters: Int): Unit = {
Expand Down Expand Up @@ -183,7 +188,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testSubscribeWhenTopicUnavailable(quorum: String, groupProtocol: String): Unit = {
val numRecords = 1000
val newtopic = "newtopic"
Expand Down Expand Up @@ -243,7 +248,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {

checkCloseGoodPath(numRecords, "group1")
checkCloseWithCoordinatorFailure(numRecords, "group2", "group3")
checkCloseWithClusterFailure(numRecords, "group4", "group5")
checkCloseWithClusterFailure(numRecords, "group4", "group5", groupProtocol)
}

/**
Expand Down Expand Up @@ -297,12 +302,15 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
* there is no coordinator, but close should timeout and return. If close is invoked with a very
* large timeout, close should timeout after request timeout.
*/
private def checkCloseWithClusterFailure(numRecords: Int, group1: String, group2: String): Unit = {
private def checkCloseWithClusterFailure(numRecords: Int, group1: String, group2: String,
groupProtocol: String): Unit = {
val consumer1 = createConsumerAndReceive(group1, manualAssign = false, numRecords)

val requestTimeout = 6000
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5000")
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5000")
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
}
this.consumerConfig.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout.toString)
val consumer2 = createConsumerAndReceive(group2, manualAssign = true, numRecords)

Expand All @@ -319,17 +327,20 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
* the group should be forced to rebalance when it becomes hosted on a Coordinator with the new config.
* Then, 1 consumer should be left out of the group.
*/
@Test
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
@Disabled // TODO: To be re-enabled once we can make it less flaky (KAFKA-13421)
def testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(): Unit = {
def testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(quorum: String, groupProtocol: String): Unit = {
val group = "group-max-size-test"
val topic = "group-max-size-test"
val maxGroupSize = 2
val consumerCount = maxGroupSize + 1
val partitionCount = consumerCount * 2

this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
}
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
val partitions = createTopicPartitions(topic, numPartitions = partitionCount, replicationFactor = brokerCount)

Expand Down Expand Up @@ -361,12 +372,14 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
* When we have the consumer group max size configured to X, the X+1th consumer trying to join should receive a fatal exception
*/
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize(quorum: String, groupProtocol: String): Unit = {
val group = "fatal-exception-test"
val topic = "fatal-exception-test"
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
}
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

val partitions = createTopicPartitions(topic, numPartitions = maxGroupSize, replicationFactor = brokerCount)
Expand Down Expand Up @@ -400,12 +413,14 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
* close should terminate immediately without sending leave group.
*/
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testCloseDuringRebalance(quorum: String, groupProtocol: String): Unit = {
val topic = "closetest"
createTopic(topic, 10, brokerCount)
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
}
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
checkCloseDuringRebalance("group1", topic, executor, brokersAvailableDuringClose = true)
}
Expand Down
Loading