Skip to content

Commit 688bd3e

Browse files
tedyyanchia7712
authored andcommitted
KAFKA-18399 Remove ZooKeeper from KafkaApis (11/N): CREATE_ACLS and DELETE_ACLS (#18540)
Reviewers: Chia-Ping Tsai <[email protected]>
1 parent f82bbd4 commit 688bd3e

File tree

2 files changed

+0
-38
lines changed

2 files changed

+0
-38
lines changed

core/src/main/scala/kafka/server/KafkaApis.scala

-10
Original file line numberDiff line numberDiff line change
@@ -2054,16 +2054,6 @@ class KafkaApis(val requestChannel: RequestChannel,
20542054
aclApis.handleDescribeAcls(request)
20552055
}
20562056

2057-
def handleCreateAcls(request: RequestChannel.Request): Unit = {
2058-
metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
2059-
aclApis.handleCreateAcls(request)
2060-
}
2061-
2062-
def handleDeleteAcls(request: RequestChannel.Request): Unit = {
2063-
metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
2064-
aclApis.handleDeleteAcls(request)
2065-
}
2066-
20672057
def handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit = {
20682058
val offsetForLeaderEpoch = request.body[OffsetsForLeaderEpochRequest]
20692059
val topics = offsetForLeaderEpoch.data.topics.asScala.toSeq

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

-28
Original file line numberDiff line numberDiff line change
@@ -9773,34 +9773,6 @@ class KafkaApisTest extends Logging {
97739773
assertEquals("Ongoing", transactionState.transactionState)
97749774
}
97759775

9776-
private def createMockRequest(): RequestChannel.Request = {
9777-
val request: RequestChannel.Request = mock(classOf[RequestChannel.Request])
9778-
val requestHeader: RequestHeader = mock(classOf[RequestHeader])
9779-
when(request.header).thenReturn(requestHeader)
9780-
when(requestHeader.apiKey()).thenReturn(ApiKeys.values().head)
9781-
request
9782-
}
9783-
9784-
private def verifyShouldAlwaysForwardErrorMessage(handler: RequestChannel.Request => Unit): Unit = {
9785-
val request = createMockRequest()
9786-
val e = assertThrows(classOf[UnsupportedVersionException], () => handler(request))
9787-
assertEquals(KafkaApis.shouldAlwaysForward(request).getMessage, e.getMessage)
9788-
}
9789-
9790-
@Test
9791-
def testRaftShouldAlwaysForwardCreateAcls(): Unit = {
9792-
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
9793-
kafkaApis = createKafkaApis(raftSupport = true)
9794-
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreateAcls)
9795-
}
9796-
9797-
@Test
9798-
def testRaftShouldAlwaysForwardDeleteAcls(): Unit = {
9799-
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
9800-
kafkaApis = createKafkaApis(raftSupport = true)
9801-
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleDeleteAcls)
9802-
}
9803-
98049776
@Test
98059777
def testEmptyLegacyAlterConfigsRequestWithKRaft(): Unit = {
98069778
val request = buildRequest(new AlterConfigsRequest(new AlterConfigsRequestData(), 1.toShort))

0 commit comments

Comments
 (0)