From 809d0db8d6078ee6bbc9de4a66b2eb365c20f43d Mon Sep 17 00:00:00 2001 From: m1a2st Date: Wed, 15 Jan 2025 00:40:32 +0800 Subject: [PATCH 1/4] remove all field and refactor constructor --- .../org/apache/kafka/clients/NetworkClient.java | 1 - .../apache/kafka/clients/NodeApiVersions.java | 16 +++------------- .../apache/kafka/clients/ApiVersionsTest.java | 2 -- .../kafka/clients/NodeApiVersionsTest.java | 1 - .../clients/producer/KafkaProducerTest.java | 2 -- .../internals/TransactionManagerTest.java | 6 ------ 6 files changed, 3 insertions(+), 25 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index ce5c9b25a0133..1100479021fc5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -1047,7 +1047,6 @@ private void handleApiVersionsResponse(List responses, NodeApiVersions nodeVersionInfo = new NodeApiVersions( apiVersionsResponse.data().apiKeys(), apiVersionsResponse.data().supportedFeatures(), - apiVersionsResponse.data().zkMigrationReady(), apiVersionsResponse.data().finalizedFeatures(), apiVersionsResponse.data().finalizedFeaturesEpoch()); apiVersions.update(node, nodeVersionInfo); diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java index 43bd9125a1493..b2eaa481f557e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java +++ b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java @@ -49,8 +49,6 @@ public class NodeApiVersions { private final Map supportedFeatures; - private final boolean zkMigrationEnabled; - private final Map finalizedFeatures; private final long finalizedFeaturesEpoch; @@ -83,7 +81,7 @@ public static NodeApiVersions create(Collection overrides) { } if (!exists) apiVersions.add(ApiVersionsResponse.toApiVersion(apiKey)); } - return new NodeApiVersions(apiVersions, Collections.emptyList(), false, Collections.emptyList(), -1); + return new NodeApiVersions(apiVersions, Collections.emptyList(), Collections.emptyList(), -1); } @@ -104,16 +102,14 @@ public static NodeApiVersions create(short apiKey, short minVersion, short maxVe public NodeApiVersions( Collection nodeApiVersions, - Collection nodeSupportedFeatures, - boolean zkMigrationEnabled + Collection nodeSupportedFeatures ) { - this(nodeApiVersions, nodeSupportedFeatures, zkMigrationEnabled, Collections.emptyList(), -1); + this(nodeApiVersions, nodeSupportedFeatures, Collections.emptyList(), -1); } public NodeApiVersions( Collection nodeApiVersions, Collection nodeSupportedFeatures, - boolean zkMigrationEnabled, Collection nodeFinalizedFeatures, long finalizedFeaturesEpoch ) { @@ -133,8 +129,6 @@ public NodeApiVersions( new SupportedVersionRange(supportedFeature.minVersion(), supportedFeature.maxVersion())); } this.supportedFeatures = Collections.unmodifiableMap(supportedFeaturesBuilder); - this.zkMigrationEnabled = zkMigrationEnabled; - this.finalizedFeaturesEpoch = finalizedFeaturesEpoch; this.finalizedFeatures = new HashMap<>(); for (ApiVersionsResponseData.FinalizedFeatureKey finalizedFeature : nodeFinalizedFeatures) { @@ -264,10 +258,6 @@ public Map supportedFeatures() { return supportedFeatures; } - public boolean zkMigrationEnabled() { - return zkMigrationEnabled; - } - public Map finalizedFeatures() { return finalizedFeatures; } diff --git a/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java index fc2e72c390972..65be3c2b16620 100644 --- a/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java @@ -37,7 +37,6 @@ public void testFinalizedFeaturesUpdate() { .setName("transaction.version") .setMaxVersion((short) 2) .setMinVersion((short) 0)), - false, Arrays.asList(new ApiVersionsResponseData.FinalizedFeatureKey() .setName("transaction.version") .setMaxVersionLevel((short) 2) @@ -53,7 +52,6 @@ public void testFinalizedFeaturesUpdate() { .setName("transaction.version") .setMaxVersion((short) 2) .setMinVersion((short) 0)), - false, Arrays.asList(new ApiVersionsResponseData.FinalizedFeatureKey() .setName("transaction.version") .setMaxVersionLevel((short) 1) diff --git a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java index 6ef4ac7333330..89f35e5adc961 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java @@ -180,7 +180,6 @@ public void testFeatures() { .setName("transaction.version") .setMaxVersion((short) 2) .setMinVersion((short) 0)), - false, Arrays.asList(new ApiVersionsResponseData.FinalizedFeatureKey() .setName("transaction.version") .setMaxVersionLevel((short) 2) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 2bd111da16c35..8f354863d7732 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -1634,7 +1634,6 @@ public void testSendTxnOffsetsWithGroupIdTransactionV2() { .setName("transaction.version") .setMaxVersion((short) 2) .setMinVersion((short) 0)), - false, Arrays.asList(new ApiVersionsResponseData.FinalizedFeatureKey() .setName("transaction.version") .setMaxVersionLevel((short) 2) @@ -1701,7 +1700,6 @@ public void testTransactionV2Produce() throws Exception { .setName("transaction.version") .setMaxVersion((short) 2) .setMinVersion((short) 0)), - false, Arrays.asList(new ApiVersionsResponseData.FinalizedFeatureKey() .setName("transaction.version") .setMaxVersionLevel((short) 2) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index e1dccd4bb760a..579c8d7fbb251 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -182,7 +182,6 @@ private void initializeTransactionManager(Optional transactionalId, bool .setName("transaction.version") .setMaxVersion(transactionV2Enabled ? (short) 2 : (short) 1) .setMinVersion((short) 0)), - false, Arrays.asList(new ApiVersionsResponseData.FinalizedFeatureKey() .setName("transaction.version") .setMaxVersionLevel(transactionV2Enabled ? (short) 2 : (short) 1) @@ -930,7 +929,6 @@ public void testTransactionManagerEnablesV2() { .setName("transaction.version") .setMaxVersion((short) 2) .setMinVersion((short) 0)), - false, Arrays.asList(new ApiVersionsResponseData.FinalizedFeatureKey() .setName("transaction.version") .setMaxVersionLevel((short) 2) @@ -1035,7 +1033,6 @@ public void testTransactionManagerDisablesV2() { .setName("transaction.version") .setMaxVersion((short) 1) .setMinVersion((short) 0)), - false, Arrays.asList(new ApiVersionsResponseData.FinalizedFeatureKey() .setName("transaction.version") .setMaxVersionLevel((short) 1) @@ -2972,7 +2969,6 @@ public void testTransitionToFatalErrorWhenRetriedBatchIsExpired() throws Interru .setMinVersion((short) 0) .setMaxVersion((short) 7)), Collections.emptyList(), - false, Collections.emptyList(), 0)); @@ -3267,7 +3263,6 @@ public void testAbortTransactionAndReuseSequenceNumberOnError() throws Interrupt .setMinVersion((short) 0) .setMaxVersion((short) 7)), Collections.emptyList(), - false, Collections.emptyList(), 0)); @@ -3328,7 +3323,6 @@ public void testAbortTransactionAndResetSequenceNumberOnUnknownProducerId() thro .setMinVersion((short) 0) .setMaxVersion((short) 4)), Collections.emptyList(), - false, Collections.emptyList(), 0)); From a13d0f9f4610399f130752429074107468eef3ad Mon Sep 17 00:00:00 2001 From: m1a2st Date: Wed, 15 Jan 2025 00:44:18 +0800 Subject: [PATCH 2/4] fix the test --- .../org/apache/kafka/clients/NodeApiVersionsTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java index 89f35e5adc961..ad1f614509fc9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java @@ -44,7 +44,7 @@ public class NodeApiVersionsTest { @Test public void testUnsupportedVersionsToString() { - NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList(), false); + NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList()); StringBuilder bld = new StringBuilder(); String prefix = "("; for (ApiKeys apiKey : ApiKeys.clientApis()) { @@ -73,7 +73,7 @@ public void testVersionsToString() { .setMaxVersion((short) 10001)); } else versionList.add(ApiVersionsResponse.toApiVersion(apiKey)); } - NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList(), false); + NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList()); StringBuilder bld = new StringBuilder(); String prefix = "("; for (ApiKeys apiKey : ApiKeys.values()) { @@ -130,7 +130,7 @@ public void testLatestUsableVersionOutOfRangeHigh() { @Test public void testUsableVersionCalculationNoKnownVersions() { - NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList(), false); + NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList()); assertThrows(UnsupportedVersionException.class, () -> versions.latestUsableVersion(ApiKeys.FETCH)); } @@ -152,7 +152,7 @@ public void testUsableVersionLatestVersions(ApiMessageType.ListenerType scope) { .setApiKey((short) 100) .setMinVersion((short) 0) .setMaxVersion((short) 1)); - NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList(), false); + NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList()); for (ApiKeys apiKey: ApiKeys.apisForListener(scope)) { assertEquals(apiKey.latestVersion(), versions.latestUsableVersion(apiKey)); } @@ -162,7 +162,7 @@ public void testUsableVersionLatestVersions(ApiMessageType.ListenerType scope) { @EnumSource(ApiMessageType.ListenerType.class) public void testConstructionFromApiVersionsResponse(ApiMessageType.ListenerType scope) { ApiVersionsResponse apiVersionsResponse = TestUtils.defaultApiVersionsResponse(scope); - NodeApiVersions versions = new NodeApiVersions(apiVersionsResponse.data().apiKeys(), Collections.emptyList(), false); + NodeApiVersions versions = new NodeApiVersions(apiVersionsResponse.data().apiKeys(), Collections.emptyList()); for (ApiVersion apiVersionKey : apiVersionsResponse.data().apiKeys()) { ApiVersion apiVersion = versions.apiVersion(ApiKeys.forId(apiVersionKey.apiKey())); From f0d31d7747a7301f05c77a93123b678cda760874 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Wed, 15 Jan 2025 00:48:07 +0800 Subject: [PATCH 3/4] fix the test --- .../java/org/apache/kafka/tools/BrokerApiVersionsCommand.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/BrokerApiVersionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/BrokerApiVersionsCommand.java index bf6a4e7adcfed..13c30721a57f7 100644 --- a/tools/src/main/java/org/apache/kafka/tools/BrokerApiVersionsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/BrokerApiVersionsCommand.java @@ -249,7 +249,7 @@ protected KafkaFuture getNodeApiVersions(Node node) { if (error.exception() != null) { future.completeExceptionally(error.exception()); } else { - future.complete(new NodeApiVersions(response.data().apiKeys(), response.data().supportedFeatures(), response.data().zkMigrationReady())); + future.complete(new NodeApiVersions(response.data().apiKeys(), response.data().supportedFeatures())); } } catch (Exception e) { future.completeExceptionally(e); From 75ade6629ccf7094840362ce3e0aa7819c035837 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Wed, 15 Jan 2025 01:42:03 +0800 Subject: [PATCH 4/4] fix the test --- .../org/apache/kafka/tools/BrokerApiVersionsCommandTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java index aac6a3f48ffa9..3f20bb4e50201 100644 --- a/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java @@ -60,8 +60,7 @@ public void testBrokerApiVersionsCommandOutput(ClusterInstance clusterInstance) NodeApiVersions nodeApiVersions = new NodeApiVersions( ApiVersionsResponse.collectApis(ApiKeys.clientApis(), true), - Collections.emptyList(), - false); + Collections.emptyList()); Iterator apiKeysIter = ApiKeys.clientApis().iterator(); while (apiKeysIter.hasNext()) { ApiKeys apiKey = apiKeysIter.next();