Skip to content

Commit 4a1bec0

Browse files
authored
fix: issue of last partition not included for certain groups weightage (#104)
1 parent db1bc29 commit 4a1bec0

File tree

2 files changed

+70
-5
lines changed

2 files changed

+70
-5
lines changed

kafka-streams-partitioners/weighted-group-partitioner/src/main/java/org/hypertrace/core/kafkastreams/framework/partitioner/WeightedGroupProfile.java

+1-5
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,7 @@ public WeightedGroupProfile(PartitionerProfile profile) {
4141
groupConfig ->
4242
buildEntriesForEachMember(groupConfig, weightConsumedSoFar, totalWeight))
4343
.collect(Collectors.toUnmodifiableMap(Entry::getKey, Entry::getValue));
44-
this.defaultGroup =
45-
new WeightedGroup(
46-
"[[default]]",
47-
weightConsumedSoFar.get(),
48-
weightConsumedSoFar.addAndGet(defaultGroupWeight / totalWeight));
44+
this.defaultGroup = new WeightedGroup("[[default]]", weightConsumedSoFar.get(), 1.0);
4945

5046
log.info(
5147
"partitioner default group config - weight range: {}, range end: {}",

kafka-streams-partitioners/weighted-group-partitioner/src/test/java/org/hypertrace/core/kafkastreams/framework/partitioner/WeightedGroupPartitionerTest.java

+69
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,75 @@ public void testWithNonMultipleWeightRatio() {
228228
}
229229
}
230230

231+
@Test
232+
public void testWithNonMultipleRoundingWeightRatio() {
233+
234+
int numPartitions = 64;
235+
int testCount = numPartitions * 2;
236+
int lastPartitionSeen = -1;
237+
238+
WeightedGroupPartitioner<String, String> partitioner =
239+
getPartitionerForTestWithNonMultipleRoundingWeightRatio();
240+
int partition;
241+
242+
// Test case 1: tenant-1 belong to group-1 (partitions: [0 to 22])
243+
for (int i = 1; i <= testCount; i++) {
244+
partition = partitioner.partition("test-topic", "tenant-1", "span-" + i, numPartitions);
245+
if (partition > lastPartitionSeen) lastPartitionSeen = partition;
246+
assertTrue(
247+
partition >= 0 && partition <= 22,
248+
"actual partition not in expected range. partition: " + partition);
249+
}
250+
251+
// Test case 2: tenant-2 belong to group-2 (partitions: [23 to 40])
252+
for (int i = 1; i <= testCount; i++) {
253+
partition = partitioner.partition("test-topic", "tenant-2", "span-" + i, numPartitions);
254+
if (partition > lastPartitionSeen) lastPartitionSeen = partition;
255+
assertTrue(
256+
partition >= 23 && partition <= 40,
257+
"actual partition not in expected range. partition: " + partition);
258+
}
259+
260+
// Test case 3: tenant-3 belong to group-2 (partitions: [23 to 40])
261+
for (int i = 1; i <= testCount; i++) {
262+
partition = partitioner.partition("test-topic", "tenant-3", "span-" + i, numPartitions);
263+
if (partition > lastPartitionSeen) lastPartitionSeen = partition;
264+
assertTrue(
265+
partition >= 23 && partition <= 40,
266+
"actual partition not in expected range. partition: " + partition);
267+
}
268+
269+
// Test case 4: groupKey=unknown should use default group [41 to 63]
270+
for (int i = 1; i <= testCount; i++) {
271+
partition = partitioner.partition("test-topic", "unknown", "span-" + i, numPartitions);
272+
if (partition > lastPartitionSeen) lastPartitionSeen = partition;
273+
assertTrue(
274+
partition >= 41 && partition <= 63,
275+
"actual partition not in expected range. partition: " + partition);
276+
}
277+
assertEquals(lastPartitionSeen, numPartitions - 1);
278+
}
279+
280+
private WeightedGroupPartitioner<String, String>
281+
getPartitionerForTestWithNonMultipleRoundingWeightRatio() {
282+
PartitionerConfigServiceClient testClient =
283+
(profileName) ->
284+
new WeightedGroupProfile(
285+
PartitionerProfile.newBuilder()
286+
.addGroups(newPartitionerGroup("group1", new String[] {"tenant-1"}, 37))
287+
.addGroups(
288+
newPartitionerGroup("group2", new String[] {"tenant-2", "tenant-3"}, 29))
289+
.addGroups(newPartitionerGroup("group3", new String[] {"tenant-4"}, 16))
290+
.setDefaultGroupWeight(20)
291+
.setName(profileName)
292+
.build());
293+
294+
WeightedGroupPartitioner<String, String> partitioner =
295+
new WeightedGroupPartitioner<>(
296+
"spans", testClient, groupKeyExtractor, roundRobinPartitioner);
297+
return partitioner;
298+
}
299+
231300
private PartitionerConfigServiceClient getTestServiceClient() {
232301
return (profileName) ->
233302
new WeightedGroupProfile(

0 commit comments

Comments
 (0)