Skip to content

Commit 08a52c5

Browse files
authored
MINOR: Refactor GroupCoordinatorConfig (#19092)
We defined multiple `ConfigDef`s in `GroupCoordinatorConfig` in then we merge them in a few places because we always use them together. Having multiple `ConfigDef`s does not seem necessary to me. This patch changes it to have just one. Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 1bfa4cd commit 08a52c5

File tree

5 files changed

+19
-41
lines changed

5 files changed

+19
-41
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java

+11-16
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,8 @@ public class GroupCoordinatorConfig {
238238
public static final int SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT = 15000;
239239
public static final String SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC = "The maximum heartbeat interval for share group members.";
240240

241-
public static final ConfigDef GROUP_COORDINATOR_CONFIG_DEF = new ConfigDef()
241+
public static final ConfigDef CONFIG_DEF = new ConfigDef()
242+
// Group coordinator configs
242243
.define(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST, GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT,
243244
ConfigDef.ValidList.in(Group.GroupType.documentValidValues()), MEDIUM, GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC)
244245
.define(GROUP_COORDINATOR_NUM_THREADS_CONFIG, INT, GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), HIGH, GROUP_COORDINATOR_NUM_THREADS_DOC)
@@ -250,20 +251,20 @@ public class GroupCoordinatorConfig {
250251
.define(OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG, INT, OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH, OFFSETS_TOPIC_SEGMENT_BYTES_DOC)
251252
.define(OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int) OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH, OFFSETS_TOPIC_COMPRESSION_CODEC_DOC)
252253
// Internal configuration used by integration and system tests.
253-
.defineInternal(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, BOOLEAN, NEW_GROUP_COORDINATOR_ENABLE_DEFAULT, null, MEDIUM, NEW_GROUP_COORDINATOR_ENABLE_DOC);
254+
.defineInternal(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, BOOLEAN, NEW_GROUP_COORDINATOR_ENABLE_DEFAULT, null, MEDIUM, NEW_GROUP_COORDINATOR_ENABLE_DOC)
254255

255-
public static final ConfigDef OFFSET_MANAGEMENT_CONFIG_DEF = new ConfigDef()
256+
// Offset configs
256257
.define(OFFSET_METADATA_MAX_SIZE_CONFIG, INT, OFFSET_METADATA_MAX_SIZE_DEFAULT, HIGH, OFFSET_METADATA_MAX_SIZE_DOC)
257258
.define(OFFSETS_RETENTION_MINUTES_CONFIG, INT, OFFSETS_RETENTION_MINUTES_DEFAULT, atLeast(1), HIGH, OFFSETS_RETENTION_MINUTES_DOC)
258-
.define(OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG, LONG, OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), HIGH, OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC);
259+
.define(OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG, LONG, OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), HIGH, OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC)
259260

260-
public static final ConfigDef CLASSIC_GROUP_CONFIG_DEF = new ConfigDef()
261+
// Classic group configs
261262
.define(GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
262263
.define(GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
263264
.define(GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, INT, GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, MEDIUM, GROUP_INITIAL_REBALANCE_DELAY_MS_DOC)
264-
.define(GROUP_MAX_SIZE_CONFIG, INT, GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, GROUP_MAX_SIZE_DOC);
265+
.define(GROUP_MAX_SIZE_CONFIG, INT, GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, GROUP_MAX_SIZE_DOC)
265266

266-
public static final ConfigDef CONSUMER_GROUP_CONFIG_DEF = new ConfigDef()
267+
// Consumer group configs
267268
.define(CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC)
268269
.define(CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
269270
.define(CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
@@ -272,9 +273,9 @@ public class GroupCoordinatorConfig {
272273
.define(CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
273274
.define(CONSUMER_GROUP_MAX_SIZE_CONFIG, INT, CONSUMER_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_SIZE_DOC)
274275
.define(CONSUMER_GROUP_ASSIGNORS_CONFIG, LIST, CONSUMER_GROUP_ASSIGNORS_DEFAULT, null, MEDIUM, CONSUMER_GROUP_ASSIGNORS_DOC)
275-
.define(CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING, CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT, ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(ConsumerGroupMigrationPolicy.class)), MEDIUM, CONSUMER_GROUP_MIGRATION_POLICY_DOC);
276+
.define(CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING, CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT, ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(ConsumerGroupMigrationPolicy.class)), MEDIUM, CONSUMER_GROUP_MIGRATION_POLICY_DOC)
276277

277-
public static final ConfigDef SHARE_GROUP_CONFIG_DEF = new ConfigDef()
278+
// Share group configs
278279
.define(SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_SESSION_TIMEOUT_MS_DOC)
279280
.define(SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
280281
.define(SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
@@ -406,13 +407,7 @@ public static GroupCoordinatorConfig fromProps(
406407
) {
407408
return new GroupCoordinatorConfig(
408409
new AbstractConfig(
409-
Utils.mergeConfigs(List.of(
410-
GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF,
411-
GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF,
412-
GroupCoordinatorConfig.CLASSIC_GROUP_CONFIG_DEF,
413-
GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF,
414-
GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF
415-
)),
410+
GroupCoordinatorConfig.CONFIG_DEF,
416411
props
417412
)
418413
);

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java

+5-11
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,8 @@
1919
import org.apache.kafka.common.Configurable;
2020
import org.apache.kafka.common.KafkaException;
2121
import org.apache.kafka.common.config.AbstractConfig;
22-
import org.apache.kafka.common.config.ConfigDef;
2322
import org.apache.kafka.common.config.ConfigException;
2423
import org.apache.kafka.common.record.CompressionType;
25-
import org.apache.kafka.common.utils.Utils;
2624
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
2725
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
2826
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
@@ -44,13 +42,6 @@
4442
import static org.junit.jupiter.api.Assertions.assertTrue;
4543

4644
public class GroupCoordinatorConfigTest {
47-
private static final List<ConfigDef> GROUP_COORDINATOR_CONFIG_DEFS = List.of(
48-
GroupCoordinatorConfig.CLASSIC_GROUP_CONFIG_DEF,
49-
GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF,
50-
GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF,
51-
GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF,
52-
GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF
53-
);
5445

5546
public static class CustomAssignor implements ConsumerGroupPartitionAssignor, Configurable {
5647
public Map<String, ?> configs;
@@ -318,7 +309,10 @@ public static GroupCoordinatorConfig createGroupCoordinatorConfig(
318309
}
319310

320311
public static GroupCoordinatorConfig createConfig(Map<String, Object> configs) {
321-
return new GroupCoordinatorConfig(
322-
new AbstractConfig(Utils.mergeConfigs(GROUP_COORDINATOR_CONFIG_DEFS), configs, false));
312+
return new GroupCoordinatorConfig(new AbstractConfig(
313+
GroupCoordinatorConfig.CONFIG_DEF,
314+
configs,
315+
false
316+
));
323317
}
324318
}

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java

+1-8
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252
import org.apache.kafka.common.security.auth.SecurityProtocol;
5353
import org.apache.kafka.common.utils.LogContext;
5454
import org.apache.kafka.common.utils.MockTime;
55-
import org.apache.kafka.common.utils.Utils;
5655
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
5756
import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
5857
import org.apache.kafka.coordinator.common.runtime.MockCoordinatorExecutor;
@@ -169,13 +168,7 @@ public static GroupCoordinatorConfig fromProps(
169168
) {
170169
return new GroupCoordinatorConfigContext(
171170
new AbstractConfig(
172-
Utils.mergeConfigs(List.of(
173-
GroupCoordinatorConfig.CLASSIC_GROUP_CONFIG_DEF,
174-
GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF,
175-
GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF,
176-
GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF,
177-
GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF
178-
)),
171+
GroupCoordinatorConfig.CONFIG_DEF,
179172
props
180173
)
181174
);

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,6 @@ public static ShareGroupConfig createShareGroupConfig(
145145

146146
private static ShareGroupConfig createConfig(Map<String, Object> configs) {
147147
return new ShareGroupConfig(
148-
new AbstractConfig(Utils.mergeConfigs(Arrays.asList(ShareGroupConfig.CONFIG_DEF, GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF)), configs, false));
148+
new AbstractConfig(Utils.mergeConfigs(Arrays.asList(ShareGroupConfig.CONFIG_DEF, GroupCoordinatorConfig.CONFIG_DEF)), configs, false));
149149
}
150150
}

server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java

+1-5
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,7 @@ public abstract class AbstractKafkaConfig extends AbstractConfig {
4949
KRaftConfigs.CONFIG_DEF,
5050
SocketServerConfigs.CONFIG_DEF,
5151
ReplicationConfigs.CONFIG_DEF,
52-
GroupCoordinatorConfig.CLASSIC_GROUP_CONFIG_DEF,
53-
GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF,
54-
GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF,
55-
GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF,
56-
GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF,
52+
GroupCoordinatorConfig.CONFIG_DEF,
5753
CleanerConfig.CONFIG_DEF,
5854
LogConfig.SERVER_CONFIG_DEF,
5955
ShareGroupConfig.CONFIG_DEF,

0 commit comments

Comments
 (0)