Skip to content

Commit ad031b9

Browse files
KAFKA-18635: reenable the unclean shutdown detection (apache#18277)
We need to re-enable the unclean shutdown detection when in ELR mode, which was inadvertently removed during the development process. Reviewers: David Mao <[email protected]>, Jun Rao <[email protected]>
1 parent 7719b5f commit ad031b9

File tree

8 files changed

+613
-69
lines changed

8 files changed

+613
-69
lines changed

core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java

+446
Large diffs are not rendered by default.

metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java

+18-13
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ static class Builder {
9090
private long sessionTimeoutNs = DEFAULT_SESSION_TIMEOUT_NS;
9191
private ReplicaPlacer replicaPlacer = null;
9292
private FeatureControlManager featureControl = null;
93-
private BrokerUncleanShutdownHandler brokerUncleanShutdownHandler = null;
93+
private BrokerShutdownHandler brokerShutdownHandler = null;
9494
private String interBrokerListenerName = "PLAINTEXT";
9595

9696
Builder setLogContext(LogContext logContext) {
@@ -128,8 +128,8 @@ Builder setFeatureControlManager(FeatureControlManager featureControl) {
128128
return this;
129129
}
130130

131-
Builder setBrokerUncleanShutdownHandler(BrokerUncleanShutdownHandler brokerUncleanShutdownHandler) {
132-
this.brokerUncleanShutdownHandler = brokerUncleanShutdownHandler;
131+
Builder setBrokerShutdownHandler(BrokerShutdownHandler brokerShutdownHandler) {
132+
this.brokerShutdownHandler = brokerShutdownHandler;
133133
return this;
134134
}
135135

@@ -154,8 +154,8 @@ ClusterControlManager build() {
154154
if (featureControl == null) {
155155
throw new RuntimeException("You must specify FeatureControlManager");
156156
}
157-
if (brokerUncleanShutdownHandler == null) {
158-
throw new RuntimeException("You must specify BrokerUncleanShutdownHandler");
157+
if (brokerShutdownHandler == null) {
158+
throw new RuntimeException("You must specify BrokerShutdownHandler");
159159
}
160160
return new ClusterControlManager(logContext,
161161
clusterId,
@@ -164,7 +164,7 @@ ClusterControlManager build() {
164164
sessionTimeoutNs,
165165
replicaPlacer,
166166
featureControl,
167-
brokerUncleanShutdownHandler,
167+
brokerShutdownHandler,
168168
interBrokerListenerName
169169
);
170170
}
@@ -252,7 +252,7 @@ boolean check() {
252252
*/
253253
private final FeatureControlManager featureControl;
254254

255-
private final BrokerUncleanShutdownHandler brokerUncleanShutdownHandler;
255+
private final BrokerShutdownHandler brokerShutdownHandler;
256256

257257
/**
258258
* The statically configured inter-broker listener name.
@@ -277,7 +277,7 @@ private ClusterControlManager(
277277
long sessionTimeoutNs,
278278
ReplicaPlacer replicaPlacer,
279279
FeatureControlManager featureControl,
280-
BrokerUncleanShutdownHandler brokerUncleanShutdownHandler,
280+
BrokerShutdownHandler brokerShutdownHandler,
281281
String interBrokerListenerName
282282
) {
283283
this.logContext = logContext;
@@ -293,7 +293,7 @@ private ClusterControlManager(
293293
this.featureControl = featureControl;
294294
this.controllerRegistrations = new TimelineHashMap<>(snapshotRegistry, 0);
295295
this.directoryToBroker = new TimelineHashMap<>(snapshotRegistry, 0);
296-
this.brokerUncleanShutdownHandler = brokerUncleanShutdownHandler;
296+
this.brokerShutdownHandler = brokerShutdownHandler;
297297
this.interBrokerListenerName = interBrokerListenerName;
298298
}
299299

@@ -335,7 +335,8 @@ Map<Integer, BrokerRegistration> brokerRegistrations() {
335335
public ControllerResult<BrokerRegistrationReply> registerBroker(
336336
BrokerRegistrationRequestData request,
337337
long newBrokerEpoch,
338-
FinalizedControllerFeatures finalizedFeatures
338+
FinalizedControllerFeatures finalizedFeatures,
339+
boolean cleanShutdownDetectionEnabled
339340
) {
340341
if (heartbeatManager == null) {
341342
throw new RuntimeException("ClusterControlManager is not active.");
@@ -348,8 +349,10 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
348349
List<ApiMessageAndVersion> records = new ArrayList<>();
349350
BrokerRegistration existing = brokerRegistrations.get(brokerId);
350351
Uuid prevIncarnationId = null;
352+
long storedBrokerEpoch = -2; // BrokerRegistration.previousBrokerEpoch default value is -1
351353
if (existing != null) {
352354
prevIncarnationId = existing.incarnationId();
355+
storedBrokerEpoch = existing.epoch();
353356
if (heartbeatManager.hasValidSession(brokerId, existing.epoch())) {
354357
if (!request.incarnationId().equals(prevIncarnationId)) {
355358
throw new DuplicateBrokerRegistrationException("Another broker is " +
@@ -424,7 +427,9 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
424427

425428
if (!request.incarnationId().equals(prevIncarnationId)) {
426429
int prevNumRecords = records.size();
427-
brokerUncleanShutdownHandler.addRecordsForShutdown(request.brokerId(), records);
430+
boolean isCleanShutdown = cleanShutdownDetectionEnabled ?
431+
storedBrokerEpoch == request.previousBrokerEpoch() : false;
432+
brokerShutdownHandler.addRecordsForShutdown(request.brokerId(), isCleanShutdown, records);
428433
int numRecordsAdded = records.size() - prevNumRecords;
429434
if (existing == null) {
430435
log.info("No previous registration found for broker {}. New incarnation ID is " +
@@ -847,7 +852,7 @@ public Entry<Integer, Map<String, VersionRange>> next() {
847852
}
848853

849854
@FunctionalInterface
850-
interface BrokerUncleanShutdownHandler {
851-
void addRecordsForShutdown(int brokerId, List<ApiMessageAndVersion> records);
855+
interface BrokerShutdownHandler {
856+
void addRecordsForShutdown(int brokerId, boolean isCleanShutdown, List<ApiMessageAndVersion> records);
852857
}
853858
}

metadata/src/main/java/org/apache/kafka/controller/QuorumController.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -1555,7 +1555,7 @@ private QuorumController(
15551555
setSessionTimeoutNs(sessionTimeoutNs).
15561556
setReplicaPlacer(replicaPlacer).
15571557
setFeatureControlManager(featureControl).
1558-
setBrokerUncleanShutdownHandler(this::handleUncleanBrokerShutdown).
1558+
setBrokerShutdownHandler(this::handleBrokerShutdown).
15591559
setInterBrokerListenerName(interBrokerListenerName).
15601560
build();
15611561
this.configurationControl = new ConfigurationControlManager.Builder().
@@ -2025,7 +2025,8 @@ public CompletableFuture<BrokerRegistrationReply> registerBroker(
20252025
return appendWriteEvent("registerBroker", context.deadlineNs(),
20262026
() -> clusterControl.
20272027
registerBroker(request, offsetControl.nextWriteOffset(),
2028-
new FinalizedControllerFeatures(controllerFeatures, Long.MAX_VALUE)),
2028+
new FinalizedControllerFeatures(controllerFeatures, Long.MAX_VALUE),
2029+
context.requestHeader().requestApiVersion() >= 3),
20292030
EnumSet.noneOf(ControllerOperationFlag.class));
20302031
}
20312032

@@ -2203,7 +2204,7 @@ QuorumControllerMetrics controllerMetrics() {
22032204
return controllerMetrics;
22042205
}
22052206

2206-
void handleUncleanBrokerShutdown(int brokerId, List<ApiMessageAndVersion> records) {
2207-
replicationControl.handleBrokerUncleanShutdown(brokerId, records);
2207+
void handleBrokerShutdown(int brokerId, boolean isCleanShutdown, List<ApiMessageAndVersion> records) {
2208+
replicationControl.handleBrokerShutdown(brokerId, isCleanShutdown, records);
22082209
}
22092210
}

metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java

+8-7
Original file line numberDiff line numberDiff line change
@@ -1461,21 +1461,22 @@ void handleBrokerInControlledShutdown(int brokerId, long brokerEpoch, List<ApiMe
14611461
}
14621462

14631463
/**
1464-
* Create partition change records to remove replicas from any ISR or ELR for brokers doing unclean shutdown.
1464+
* Create partition change records to remove replicas from any ISR or ELR for brokers when the shutdown is detected.
14651465
*
1466-
* @param brokerId The broker id.
1467-
* @param records The record list to append to.
1466+
* @param brokerId The broker id to be shut down.
1467+
* @param isCleanShutdown Whether the broker has a clean shutdown.
1468+
* @param records The record list to append to.
14681469
*/
1469-
void handleBrokerUncleanShutdown(int brokerId, List<ApiMessageAndVersion> records) {
1470-
if (featureControl.metadataVersion().isElrSupported()) {
1470+
void handleBrokerShutdown(int brokerId, boolean isCleanShutdown, List<ApiMessageAndVersion> records) {
1471+
if (featureControl.metadataVersion().isElrSupported() && !isCleanShutdown) {
14711472
// ELR is enabled, generate unclean shutdown partition change records
14721473
generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown", NO_LEADER, NO_LEADER, brokerId, records,
14731474
brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
14741475
generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown", NO_LEADER, NO_LEADER, brokerId, records,
14751476
brokersToElrs.partitionsWithBrokerInElr(brokerId));
14761477
} else {
1477-
// ELR is not enabled, handle the unclean shutdown as if the broker was fenced
1478-
generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown", brokerId, NO_LEADER, NO_LEADER, records,
1478+
// ELR is not enabled or if it is a clean shutdown, handle the shutdown as if the broker was fenced
1479+
generateLeaderAndIsrUpdates("handleBrokerShutdown", brokerId, NO_LEADER, NO_LEADER, records,
14791480
brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
14801481
}
14811482
}

0 commit comments

Comments
 (0)