Skip to content

Commit 407a15c

Browse files
committed
comments
1 parent 5a2bde6 commit 407a15c

File tree

2 files changed

+12
-21
lines changed

2 files changed

+12
-21
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -2258,7 +2258,7 @@ private StreamsTopology maybeUpdateTopology(final String groupId,
22582258
records.add(newStreamsGroupTopologyRecord(groupId, recordValue));
22592259
return streamsTopologyFromRequest;
22602260
} else if (group.topology().get().topologyEpoch() > topology.epoch()) {
2261-
log.info("[GroupId {}][MemberId {}] Member joined with stake topology epoch {}", groupId, memberId, topology.epoch());
2261+
log.info("[GroupId {}][MemberId {}] Member joined with stale topology epoch {}", groupId, memberId, topology.epoch());
22622262
return group.topology().get();
22632263
} else if (!group.topology().get().equals(streamsTopologyFromRequest)) {
22642264
throw new InvalidRequestException("Topology updates are not supported yet.");

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java

+11-20
Original file line numberDiff line numberDiff line change
@@ -15723,7 +15723,6 @@ public void testMemberJoinsEmptyStreamsGroup() {
1572315723
public void testStreamsGroupMemberJoiningWithMissingSourceTopic() {
1572415724
String groupId = "fooup";
1572515725
String memberId = Uuid.randomUuid().toString();
15726-
1572715726
String subtopology1 = "subtopology1";
1572815727
String fooTopicName = "foo";
1572915728
Uuid fooTopicId = Uuid.randomUuid();
@@ -15785,9 +15784,11 @@ public void testStreamsGroupMemberJoiningWithMissingSourceTopic() {
1578515784
List<CoordinatorRecord> expectedRecords = List.of(
1578615785
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
1578715786
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology),
15788-
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, Map.of(
15789-
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6)
15790-
)),
15787+
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
15788+
Map.of(
15789+
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6)
15790+
)
15791+
),
1579115792
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1),
1579215793
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
1579315794
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1),
@@ -15801,7 +15802,6 @@ public void testStreamsGroupMemberJoiningWithMissingSourceTopic() {
1580115802
public void testStreamsGroupMemberJoiningWithMissingInternalTopic() {
1580215803
String groupId = "fooup";
1580315804
String memberId = Uuid.randomUuid().toString();
15804-
1580515805
String subtopology1 = "subtopology1";
1580615806
String fooTopicName = "foo";
1580715807
Uuid fooTopicId = Uuid.randomUuid();
@@ -15836,7 +15836,12 @@ public void testStreamsGroupMemberJoiningWithMissingInternalTopic() {
1583615836
.setWarmupTasks(List.of()));
1583715837

1583815838
assertEquals(
15839-
Map.of(barTopicName, new CreatableTopic().setName(barTopicName).setNumPartitions(6).setReplicationFactor((short) -1)),
15839+
Map.of(barTopicName,
15840+
new CreatableTopic()
15841+
.setName(barTopicName)
15842+
.setNumPartitions(6)
15843+
.setReplicationFactor((short) -1)
15844+
),
1584015845
result.response().creatableTopics()
1584115846
);
1584215847
assertResponseEquals(
@@ -15881,7 +15886,6 @@ public void testStreamsGroupMemberJoiningWithMissingInternalTopic() {
1588115886
public void testStreamsGroupMemberJoiningWithIncorrectlyPartitionedTopic() {
1588215887
String groupId = "fooup";
1588315888
String memberId = Uuid.randomUuid().toString();
15884-
1588515889
String subtopology1 = "subtopology1";
1588615890
String fooTopicName = "foo";
1588715891
Uuid fooTopicId = Uuid.randomUuid();
@@ -15964,7 +15968,6 @@ public void testStreamsGroupMemberJoiningWithIncorrectlyPartitionedTopic() {
1596415968
public void testStreamsGroupMemberJoiningWithStaleTopology() {
1596515969
String groupId = "fooup";
1596615970
String memberId = Uuid.randomUuid().toString();
15967-
1596815971
String subtopology1 = "subtopology1";
1596915972
String fooTopicName = "foo";
1597015973
Uuid fooTopicId = Uuid.randomUuid();
@@ -16059,7 +16062,6 @@ public void testStreamsGroupMemberJoiningWithStaleTopology() {
1605916062
public void testStreamsUpdatingMemberMetadataTriggersNewTargetAssignment() {
1606016063
String groupId = "fooup";
1606116064
String memberId = Uuid.randomUuid().toString();
16062-
1606316065
String subtopology1 = "subtopology1";
1606416066
String fooTopicName = "foo";
1606516067
Uuid fooTopicId = Uuid.randomUuid();
@@ -16258,7 +16260,6 @@ public void testStreamsNewJoiningMemberTriggersNewTargetAssignment() {
1625816260
String memberId1 = Uuid.randomUuid().toString();
1625916261
String memberId2 = Uuid.randomUuid().toString();
1626016262
String memberId3 = Uuid.randomUuid().toString();
16261-
1626216263
String subtopology1 = "subtopology1";
1626316264
String fooTopicName = "foo";
1626416265
Uuid fooTopicId = Uuid.randomUuid();
@@ -16354,7 +16355,6 @@ public void testStreamsLeavingMemberBumpsGroupEpoch() {
1635416355
String groupId = "fooup";
1635516356
String memberId1 = Uuid.randomUuid().toString();
1635616357
String memberId2 = Uuid.randomUuid().toString();
16357-
1635816358
String subtopology1 = "subtopology1";
1635916359
String fooTopicName = "foo";
1636016360
Uuid fooTopicId = Uuid.randomUuid();
@@ -16425,7 +16425,6 @@ public void testStreamsLeavingMemberBumpsGroupEpoch() {
1642516425
public void testStreamsGroupHeartbeatPartialResponseWhenNothingChanges() {
1642616426
String groupId = "fooup";
1642716427
String memberId = Uuid.randomUuid().toString();
16428-
1642916428
String subtopology1 = "subtopology1";
1643016429
String fooTopicName = "foo";
1643116430
Uuid fooTopicId = Uuid.randomUuid();
@@ -16495,7 +16494,6 @@ public void testStreamsReconciliationProcess() {
1649516494
String memberId1 = Uuid.randomUuid().toString();
1649616495
String memberId2 = Uuid.randomUuid().toString();
1649716496
String memberId3 = Uuid.randomUuid().toString();
16498-
1649916497
String subtopology1 = "subtopology1";
1650016498
String fooTopicName = "foo";
1650116499
Uuid fooTopicId = Uuid.randomUuid();
@@ -17003,7 +17001,6 @@ public void testStreamsStreamsGroupStates() {
1700317001
public void testStreamsTaskAssignorExceptionOnRegularHeartbeat() {
1700417002
String groupId = "fooup";
1700517003
String memberId1 = Uuid.randomUuid().toString();
17006-
1700717004
String subtopology1 = "subtopology1";
1700817005
String fooTopicName = "foo";
1700917006
Uuid fooTopicId = Uuid.randomUuid();
@@ -17045,7 +17042,6 @@ public void testStreamsTaskAssignorExceptionOnRegularHeartbeat() {
1704517042
public void testStreamsPartitionMetadataRefreshedAfterGroupIsLoaded() {
1704617043
String groupId = "fooup";
1704717044
String memberId = Uuid.randomUuid().toString();
17048-
1704917045
String subtopology1 = "subtopology1";
1705017046
String fooTopicName = "foo";
1705117047
Uuid fooTopicId = Uuid.randomUuid();
@@ -17142,7 +17138,6 @@ public void testStreamsPartitionMetadataRefreshedAfterGroupIsLoaded() {
1714217138
public void testStreamsPartitionMetadataRefreshedAgainAfterWriteFailure() {
1714317139
String groupId = "fooup";
1714417140
String memberId = Uuid.randomUuid().toString();
17145-
1714617141
String subtopology1 = "subtopology1";
1714717142
String fooTopicName = "foo";
1714817143
Uuid fooTopicId = Uuid.randomUuid();
@@ -17259,7 +17254,6 @@ public void testStreamsPartitionMetadataRefreshedAgainAfterWriteFailure() {
1725917254
public void testStreamsSessionTimeoutLifecycle() {
1726017255
String groupId = "fooup";
1726117256
String memberId = Uuid.randomUuid().toString();
17262-
1726317257
String subtopology1 = "subtopology1";
1726417258
String fooTopicName = "foo";
1726517259
Uuid fooTopicId = Uuid.randomUuid();
@@ -17336,7 +17330,6 @@ public void testStreamsSessionTimeoutLifecycle() {
1733617330
public void testStreamsSessionTimeoutExpiration() {
1733717331
String groupId = "fooup";
1733817332
String memberId = Uuid.randomUuid().toString();
17339-
1734017333
String subtopology1 = "subtopology1";
1734117334
String fooTopicName = "foo";
1734217335
Uuid fooTopicId = Uuid.randomUuid();
@@ -17402,7 +17395,6 @@ public void testStreamsRebalanceTimeoutLifecycle() {
1740217395
String groupId = "fooup";
1740317396
String memberId1 = Uuid.randomUuid().toString();
1740417397
String memberId2 = Uuid.randomUuid().toString();
17405-
1740617398
String subtopology1 = "subtopology1";
1740717399
String fooTopicName = "foo";
1740817400
Uuid fooTopicId = Uuid.randomUuid();
@@ -17558,7 +17550,6 @@ public void testStreamsRebalanceTimeoutExpiration() {
1755817550
String groupId = "fooup";
1755917551
String memberId1 = Uuid.randomUuid().toString();
1756017552
String memberId2 = Uuid.randomUuid().toString();
17561-
1756217553
String subtopology1 = "subtopology1";
1756317554
String fooTopicName = "foo";
1756417555
Uuid fooTopicId = Uuid.randomUuid();

0 commit comments

Comments
 (0)