Skip to content

Commit cbd72cc

Browse files
authored
KAFKA-14121: AlterPartitionReassignments API should allow callers to specify the option of preserving the replication factor (#18983)
Reviewers: Christo Lolov <[email protected]>, Chia-Ping Tsai <[email protected]>, TengYao Chi <[email protected]>
1 parent c3a9b0f commit cbd72cc

File tree

14 files changed

+270
-25
lines changed

14 files changed

+270
-25
lines changed

clients/src/main/java/org/apache/kafka/clients/admin/Admin.java

+7
Original file line numberDiff line numberDiff line change
@@ -1083,6 +1083,13 @@ default AlterPartitionReassignmentsResult alterPartitionReassignments(
10831083
* if the request timed out before the controller could record the new assignments.</li>
10841084
* <li>{@link org.apache.kafka.common.errors.InvalidReplicaAssignmentException}
10851085
* If the specified assignment was not valid.</li>
1086+
* <li>{@link org.apache.kafka.common.errors.InvalidReplicationFactorException}
1087+
* If the replication factor was changed in an invalid way.
1088+
* Only thrown when {@link AlterPartitionReassignmentsOptions#allowReplicationFactorChange()} is set to false and
1089+
* the request is attempting to alter reassignments (not cancel them).</li>
1090+
* <li>{@link org.apache.kafka.common.errors.UnsupportedVersionException}
1091+
* If {@link AlterPartitionReassignmentsOptions#allowReplicationFactorChange()} was set to a non-default value
1092+
* and the server does not support the option (e.g. due to an old Kafka version).</li>
10861093
* <li>{@link org.apache.kafka.common.errors.NoReassignmentInProgressException}
10871094
* If there was an attempt to cancel a reassignment for a partition which was not being reassigned.</li>
10881095
* </ul>

clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsOptions.java

+21
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,25 @@
2323
* Options for {@link AdminClient#alterPartitionReassignments(Map, AlterPartitionReassignmentsOptions)}
2424
*/
2525
public class AlterPartitionReassignmentsOptions extends AbstractOptions<AlterPartitionReassignmentsOptions> {
26+
27+
private boolean allowReplicationFactorChange = true;
28+
29+
/**
30+
* Set the option indicating if the alter partition reassignments call should be
31+
* allowed to alter the replication factor of a partition.
32+
* In cases where it is not allowed, any replication factor change will result in an exception thrown by the API.
33+
*/
34+
public AlterPartitionReassignmentsOptions allowReplicationFactorChange(boolean allow) {
35+
this.allowReplicationFactorChange = allow;
36+
return this;
37+
}
38+
39+
/**
40+
* A boolean indicating if the alter partition reassignments should be
41+
* allowed to alter the replication factor of a partition.
42+
* In cases where it is not allowed, any replication factor change will result in an exception thrown by the API.
43+
*/
44+
public boolean allowReplicationFactorChange() {
45+
return this.allowReplicationFactorChange;
46+
}
2647
}

clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

+1
Original file line numberDiff line numberDiff line change
@@ -3944,6 +3944,7 @@ public AlterPartitionReassignmentsRequest.Builder createRequest(int timeoutMs) {
39443944
data.topics().add(reassignableTopic);
39453945
}
39463946
data.setTimeoutMs(timeoutMs);
3947+
data.setAllowReplicationFactorChange(options.allowReplicationFactorChange());
39473948
return new AlterPartitionReassignmentsRequest.Builder(data);
39483949
}
39493950

clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsRequest.java

+6
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.kafka.common.requests;
1919

20+
import org.apache.kafka.common.errors.UnsupportedVersionException;
2021
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
2122
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic;
2223
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
@@ -42,6 +43,11 @@ public Builder(AlterPartitionReassignmentsRequestData data) {
4243

4344
@Override
4445
public AlterPartitionReassignmentsRequest build(short version) {
46+
if (!data.allowReplicationFactorChange() && version < 1) {
47+
throw new UnsupportedVersionException("The broker does not support the AllowReplicationFactorChange " +
48+
"option for the AlterPartitionReassignments API. Consider re-sending the request without the " +
49+
"option or updating the server version");
50+
}
4551
return new AlterPartitionReassignmentsRequest(data, version);
4652
}
4753

clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json

+4-1
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818
"type": "request",
1919
"listeners": ["broker", "controller"],
2020
"name": "AlterPartitionReassignmentsRequest",
21-
"validVersions": "0",
21+
// Version 1 adds the ability to allow/disallow changing the replication factor as part of the request.
22+
"validVersions": "0-1",
2223
"flexibleVersions": "0+",
2324
"fields": [
2425
{ "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
2526
"about": "The time in ms to wait for the request to complete." },
27+
{ "name": "AllowReplicationFactorChange", "type": "bool", "versions": "1+", "default": "true",
28+
"about": "The option indicating whether changing the replication factor of any given partition as part of this request is a valid move." },
2629
{ "name": "Topics", "type": "[]ReassignableTopic", "versions": "0+",
2730
"about": "The topics to reassign.", "fields": [
2831
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",

clients/src/main/resources/common/message/AlterPartitionReassignmentsResponse.json

+4-1
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,14 @@
1717
"apiKey": 45,
1818
"type": "response",
1919
"name": "AlterPartitionReassignmentsResponse",
20-
"validVersions": "0",
20+
// Version 1 adds the ability to allow/disallow changing the replication factor as part of the request.
21+
"validVersions": "0-1",
2122
"flexibleVersions": "0+",
2223
"fields": [
2324
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
2425
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
26+
{ "name": "AllowReplicationFactorChange", "type": "bool", "versions": "1+", "default": "true", "ignorable": true,
27+
"about": "The option indicating whether changing the replication factor of any given partition as part of the request was allowed." },
2528
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
2629
"about": "The top-level error code, or 0 if there was no error." },
2730
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",

core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala

+17-2
Original file line numberDiff line numberDiff line change
@@ -3452,13 +3452,28 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
34523452
val tp1 = new TopicPartition(topic, 0)
34533453
val tp2 = new TopicPartition(topic, 1)
34543454
val tp3 = new TopicPartition(topic, 2)
3455-
createTopic(topic, numPartitions = 4)
3456-
3455+
createTopic(topic, numPartitions = 4, replicationFactor = 2)
34573456

34583457
val validAssignment = Optional.of(new NewPartitionReassignment(
34593458
(0 until brokerCount).map(_.asInstanceOf[Integer]).asJava
34603459
))
34613460

3461+
val alterOptions = new AlterPartitionReassignmentsOptions
3462+
alterOptions.allowReplicationFactorChange(false)
3463+
val alterReplicaNumberTo1 = Optional.of(new NewPartitionReassignment(List(1.asInstanceOf[Integer]).asJava))
3464+
val alterReplicaNumberTo2 = Optional.of(new NewPartitionReassignment((0 until brokerCount - 1).map(_.asInstanceOf[Integer]).asJava))
3465+
val alterReplicaNumberTo3 = Optional.of(new NewPartitionReassignment((0 until brokerCount).map(_.asInstanceOf[Integer]).asJava))
3466+
val alterReplicaResults = client.alterPartitionReassignments(Map(
3467+
tp1 -> alterReplicaNumberTo1,
3468+
tp2 -> alterReplicaNumberTo2,
3469+
tp3 -> alterReplicaNumberTo3,
3470+
).asJava, alterOptions).values()
3471+
assertDoesNotThrow(() => alterReplicaResults.get(tp2).get())
3472+
assertEquals("The replication factor is changed from 2 to 1",
3473+
assertFutureThrows(classOf[InvalidReplicationFactorException], alterReplicaResults.get(tp1)).getMessage)
3474+
assertEquals("The replication factor is changed from 2 to 3",
3475+
assertFutureThrows(classOf[InvalidReplicationFactorException], alterReplicaResults.get(tp3)).getMessage)
3476+
34623477
val nonExistentTp1 = new TopicPartition("topicA", 0)
34633478
val nonExistentTp2 = new TopicPartition(topic, 4)
34643479
val nonExistentPartitionsResult = client.alterPartitionReassignments(Map(

metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java

+37-5
Original file line numberDiff line numberDiff line change
@@ -2058,16 +2058,18 @@ void generateLeaderAndIsrUpdates(String context,
20582058
ControllerResult<AlterPartitionReassignmentsResponseData>
20592059
alterPartitionReassignments(AlterPartitionReassignmentsRequestData request) {
20602060
List<ApiMessageAndVersion> records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
2061+
boolean allowRFChange = request.allowReplicationFactorChange();
20612062
AlterPartitionReassignmentsResponseData result =
2062-
new AlterPartitionReassignmentsResponseData().setErrorMessage(null);
2063+
new AlterPartitionReassignmentsResponseData().setErrorMessage(null)
2064+
.setAllowReplicationFactorChange(allowRFChange);
20632065
int successfulAlterations = 0, totalAlterations = 0;
20642066
for (ReassignableTopic topic : request.topics()) {
20652067
ReassignableTopicResponse topicResponse = new ReassignableTopicResponse().
20662068
setName(topic.name());
20672069
for (ReassignablePartition partition : topic.partitions()) {
20682070
ApiError error = ApiError.NONE;
20692071
try {
2070-
alterPartitionReassignment(topic.name(), partition, records);
2072+
alterPartitionReassignment(topic.name(), partition, records, allowRFChange);
20712073
successfulAlterations++;
20722074
} catch (Throwable e) {
20732075
log.info("Unable to alter partition reassignment for " +
@@ -2090,7 +2092,8 @@ void generateLeaderAndIsrUpdates(String context,
20902092

20912093
void alterPartitionReassignment(String topicName,
20922094
ReassignablePartition target,
2093-
List<ApiMessageAndVersion> records) {
2095+
List<ApiMessageAndVersion> records,
2096+
boolean allowRFChange) {
20942097
Uuid topicId = topicsByName.get(topicName);
20952098
if (topicId == null) {
20962099
throw new UnknownTopicOrPartitionException("Unable to find a topic " +
@@ -2111,7 +2114,7 @@ void alterPartitionReassignment(String topicName,
21112114
if (target.replicas() == null) {
21122115
record = cancelPartitionReassignment(topicName, tp, part);
21132116
} else {
2114-
record = changePartitionReassignment(tp, part, target);
2117+
record = changePartitionReassignment(tp, part, target, allowRFChange);
21152118
}
21162119
record.ifPresent(records::add);
21172120
}
@@ -2175,18 +2178,23 @@ Optional<ApiMessageAndVersion> cancelPartitionReassignment(String topicName,
21752178
* @param tp The topic id and partition id.
21762179
* @param part The existing partition info.
21772180
* @param target The target partition info.
2181+
* @param allowRFChange Whether the partition's replication factor is allowed to change (see KIP-860).
21782182
*
21792183
* @return The ChangePartitionRecord for the new partition assignment,
21802184
* or empty if no change is needed.
21812185
*/
21822186
Optional<ApiMessageAndVersion> changePartitionReassignment(TopicIdPartition tp,
21832187
PartitionRegistration part,
2184-
ReassignablePartition target) {
2188+
ReassignablePartition target,
2189+
boolean allowRFChange) {
21852190
// Check that the requested partition assignment is valid.
21862191
PartitionAssignment currentAssignment = new PartitionAssignment(Replicas.toList(part.replicas), part::directory);
21872192
PartitionAssignment targetAssignment = new PartitionAssignment(target.replicas(), clusterDescriber);
21882193

21892194
validateManualPartitionAssignment(targetAssignment, OptionalInt.empty());
2195+
if (!allowRFChange) {
2196+
validatePartitionReplicationFactorUnchanged(part, target);
2197+
}
21902198

21912199
List<Integer> currentReplicas = Replicas.toList(part.replicas);
21922200
PartitionReassignmentReplicas reassignment =
@@ -2406,6 +2414,30 @@ private void updatePartitionInfo(
24062414
newPartInfo.elr);
24072415
}
24082416

2417+
private void validatePartitionReplicationFactorUnchanged(PartitionRegistration part,
2418+
ReassignablePartition target) {
2419+
int currentReassignmentSetSize;
2420+
if (isReassignmentInProgress(part)) {
2421+
Set<Integer> set = new HashSet<>();
2422+
for (int r : part.replicas) {
2423+
set.add(r);
2424+
}
2425+
for (int r : part.addingReplicas) {
2426+
set.add(r);
2427+
}
2428+
for (int r : part.removingReplicas) {
2429+
set.remove(r);
2430+
}
2431+
currentReassignmentSetSize = set.size();
2432+
} else {
2433+
currentReassignmentSetSize = part.replicas.length;
2434+
}
2435+
if (currentReassignmentSetSize != target.replicas().size()) {
2436+
throw new InvalidReplicationFactorException("The replication factor is changed from " +
2437+
currentReassignmentSetSize + " to " + target.replicas().size());
2438+
}
2439+
}
2440+
24092441
private static final class IneligibleReplica {
24102442
private final int replicaId;
24112443
private final String reason;

0 commit comments

Comments
 (0)