Skip to content

Commit 3fcaec7

Browse files
authored
KAFKA-18399 Remove ZooKeeper from KafkaApis (10/N): ALTER_CONFIG and INCREMENETAL_ALTER_CONFIG (#18432)
Reviewers: Christo Lolov <[email protected]>, Ismael Juma <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 6b8cc5d commit 3fcaec7

File tree

2 files changed

+41
-241
lines changed

2 files changed

+41
-241
lines changed

core/src/main/scala/kafka/server/KafkaApis.scala

+3-95
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,13 @@ import kafka.server.share.SharePartitionManager
2626
import kafka.utils.Logging
2727
import org.apache.kafka.admin.AdminUtils
2828
import org.apache.kafka.clients.CommonClientConfigs
29-
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
30-
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry, EndpointType}
29+
import org.apache.kafka.clients.admin.EndpointType
3130
import org.apache.kafka.common.acl.AclOperation
3231
import org.apache.kafka.common.acl.AclOperation._
33-
import org.apache.kafka.common.config.ConfigResource
3432
import org.apache.kafka.common.errors._
3533
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, SHARE_GROUP_STATE_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
3634
import org.apache.kafka.common.internals.{FatalExitError, Topic}
3735
import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartitionsToTxnResult, AddPartitionsToTxnResultCollection}
38-
import org.apache.kafka.common.message.AlterConfigsResponseData.AlterConfigsResourceResponse
3936
import org.apache.kafka.common.message.DeleteRecordsResponseData.{DeleteRecordsPartitionResult, DeleteRecordsTopicResult}
4037
import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource
4138
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition
@@ -2120,55 +2117,11 @@ class KafkaApis(val requestChannel: RequestChannel,
21202117
}
21212118
if (remaining.resources().isEmpty) {
21222119
sendResponse(Some(new AlterConfigsResponseData()))
2123-
} else if ((!request.isForwarded) && metadataSupport.canForward()) {
2120+
} else {
21242121
metadataSupport.forwardingManager.get.forwardRequest(request,
21252122
new AlterConfigsRequest(remaining, request.header.apiVersion()),
21262123
response => sendResponse(response.map(_.data())))
2127-
} else {
2128-
sendResponse(Some(processLegacyAlterConfigsRequest(request, remaining)))
2129-
}
2130-
}
2131-
2132-
def processLegacyAlterConfigsRequest(
2133-
originalRequest: RequestChannel.Request,
2134-
data: AlterConfigsRequestData
2135-
): AlterConfigsResponseData = {
2136-
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(originalRequest))
2137-
val alterConfigsRequest = new AlterConfigsRequest(data, originalRequest.header.apiVersion())
2138-
val (authorizedResources, unauthorizedResources) = alterConfigsRequest.configs.asScala.toMap.partition { case (resource, _) =>
2139-
resource.`type` match {
2140-
case ConfigResource.Type.BROKER_LOGGER =>
2141-
throw new InvalidRequestException(s"AlterConfigs is deprecated and does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}")
2142-
case ConfigResource.Type.BROKER | ConfigResource.Type.CLIENT_METRICS =>
2143-
authHelper.authorize(originalRequest.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
2144-
case ConfigResource.Type.TOPIC =>
2145-
authHelper.authorize(originalRequest.context, ALTER_CONFIGS, TOPIC, resource.name)
2146-
case rt => throw new InvalidRequestException(s"Unexpected resource type $rt")
2147-
}
2148-
}
2149-
val authorizedResult = zkSupport.adminManager.alterConfigs(authorizedResources, alterConfigsRequest.validateOnly)
2150-
val unauthorizedResult = unauthorizedResources.keys.map { resource =>
2151-
resource -> configsAuthorizationApiError(resource)
2152-
}
2153-
val response = new AlterConfigsResponseData()
2154-
(authorizedResult ++ unauthorizedResult).foreach { case (resource, error) =>
2155-
response.responses().add(new AlterConfigsResourceResponse()
2156-
.setErrorCode(error.error.code)
2157-
.setErrorMessage(error.message)
2158-
.setResourceName(resource.name)
2159-
.setResourceType(resource.`type`.id))
2160-
}
2161-
response
2162-
}
2163-
2164-
private def configsAuthorizationApiError(resource: ConfigResource): ApiError = {
2165-
val error = resource.`type` match {
2166-
case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER => Errors.CLUSTER_AUTHORIZATION_FAILED
2167-
case ConfigResource.Type.TOPIC => Errors.TOPIC_AUTHORIZATION_FAILED
2168-
case ConfigResource.Type.GROUP => Errors.GROUP_AUTHORIZATION_FAILED
2169-
case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}")
21702124
}
2171-
new ApiError(error, null)
21722125
}
21732126

21742127
def handleIncrementalAlterConfigsRequest(request: RequestChannel.Request): Unit = {
@@ -2177,15 +2130,6 @@ class KafkaApis(val requestChannel: RequestChannel,
21772130
(rType, rName) => authHelper.authorize(request.context, ALTER_CONFIGS, rType, rName))
21782131
val remaining = ConfigAdminManager.copyWithoutPreprocessed(original.data(), preprocessingResponses)
21792132

2180-
// Before deciding whether to forward or handle locally, a ZK broker needs to check if
2181-
// the active controller is ZK or KRaft. If the controller is KRaft, we need to forward.
2182-
// If the controller is ZK, we need to process the request locally.
2183-
val isKRaftController = metadataSupport match {
2184-
case ZkSupport(_, _, _, _, metadataCache, _) =>
2185-
metadataCache.getControllerId.exists(_.isInstanceOf[KRaftCachedControllerId])
2186-
case RaftSupport(_, _) => true
2187-
}
2188-
21892133
def sendResponse(secondPart: Option[ApiMessage]): Unit = {
21902134
secondPart match {
21912135
case Some(result: IncrementalAlterConfigsResponseData) =>
@@ -2198,49 +2142,13 @@ class KafkaApis(val requestChannel: RequestChannel,
21982142
}
21992143
}
22002144

2201-
// Forwarding has not happened yet, so handle both ZK and KRaft cases here
22022145
if (remaining.resources().isEmpty) {
22032146
sendResponse(Some(new IncrementalAlterConfigsResponseData()))
2204-
} else if ((!request.isForwarded) && metadataSupport.canForward() && isKRaftController) {
2147+
} else {
22052148
metadataSupport.forwardingManager.get.forwardRequest(request,
22062149
new IncrementalAlterConfigsRequest(remaining, request.header.apiVersion()),
22072150
response => sendResponse(response.map(_.data())))
2208-
} else {
2209-
sendResponse(Some(processIncrementalAlterConfigsRequest(request, remaining)))
2210-
}
2211-
}
2212-
2213-
def processIncrementalAlterConfigsRequest(
2214-
originalRequest: RequestChannel.Request,
2215-
data: IncrementalAlterConfigsRequestData
2216-
): IncrementalAlterConfigsResponseData = {
2217-
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(originalRequest))
2218-
val configs = data.resources.iterator.asScala.map { alterConfigResource =>
2219-
val configResource = new ConfigResource(ConfigResource.Type.forId(alterConfigResource.resourceType),
2220-
alterConfigResource.resourceName)
2221-
configResource -> alterConfigResource.configs.iterator.asScala.map {
2222-
alterConfig => new AlterConfigOp(new ConfigEntry(alterConfig.name, alterConfig.value),
2223-
OpType.forId(alterConfig.configOperation))
2224-
}.toBuffer
2225-
}.toMap
2226-
2227-
val (authorizedResources, unauthorizedResources) = configs.partition { case (resource, _) =>
2228-
resource.`type` match {
2229-
case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER | ConfigResource.Type.CLIENT_METRICS =>
2230-
authHelper.authorize(originalRequest.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
2231-
case ConfigResource.Type.TOPIC =>
2232-
authHelper.authorize(originalRequest.context, ALTER_CONFIGS, TOPIC, resource.name)
2233-
case ConfigResource.Type.GROUP =>
2234-
authHelper.authorize(originalRequest.context, ALTER_CONFIGS, GROUP, resource.name)
2235-
case rt => throw new InvalidRequestException(s"Unexpected resource type $rt")
2236-
}
2237-
}
2238-
2239-
val authorizedResult = zkSupport.adminManager.incrementalAlterConfigs(authorizedResources, data.validateOnly)
2240-
val unauthorizedResult = unauthorizedResources.keys.map { resource =>
2241-
resource -> configsAuthorizationApiError(resource)
22422151
}
2243-
new IncrementalAlterConfigsResponse(0, (authorizedResult ++ unauthorizedResult).asJava).data()
22442152
}
22452153

22462154
def handleDescribeConfigsRequest(request: RequestChannel.Request): Unit = {

0 commit comments

Comments
 (0)