diff --git a/kafka-streams-partitioners/weighted-group-partitioner/src/main/java/org/hypertrace/core/kafkastreams/framework/partitioner/WeightedGroupProfile.java b/kafka-streams-partitioners/weighted-group-partitioner/src/main/java/org/hypertrace/core/kafkastreams/framework/partitioner/WeightedGroupProfile.java index cdc35e5..77b806c 100644 --- a/kafka-streams-partitioners/weighted-group-partitioner/src/main/java/org/hypertrace/core/kafkastreams/framework/partitioner/WeightedGroupProfile.java +++ b/kafka-streams-partitioners/weighted-group-partitioner/src/main/java/org/hypertrace/core/kafkastreams/framework/partitioner/WeightedGroupProfile.java @@ -41,11 +41,7 @@ public WeightedGroupProfile(PartitionerProfile profile) { groupConfig -> buildEntriesForEachMember(groupConfig, weightConsumedSoFar, totalWeight)) .collect(Collectors.toUnmodifiableMap(Entry::getKey, Entry::getValue)); - this.defaultGroup = - new WeightedGroup( - "[[default]]", - weightConsumedSoFar.get(), - weightConsumedSoFar.addAndGet(defaultGroupWeight / totalWeight)); + this.defaultGroup = new WeightedGroup("[[default]]", weightConsumedSoFar.get(), 1.0); log.info( "partitioner default group config - weight range: {}, range end: {}", diff --git a/kafka-streams-partitioners/weighted-group-partitioner/src/test/java/org/hypertrace/core/kafkastreams/framework/partitioner/WeightedGroupPartitionerTest.java b/kafka-streams-partitioners/weighted-group-partitioner/src/test/java/org/hypertrace/core/kafkastreams/framework/partitioner/WeightedGroupPartitionerTest.java index ffc3ca0..b0a6a44 100644 --- a/kafka-streams-partitioners/weighted-group-partitioner/src/test/java/org/hypertrace/core/kafkastreams/framework/partitioner/WeightedGroupPartitionerTest.java +++ b/kafka-streams-partitioners/weighted-group-partitioner/src/test/java/org/hypertrace/core/kafkastreams/framework/partitioner/WeightedGroupPartitionerTest.java @@ -228,6 +228,75 @@ public void testWithNonMultipleWeightRatio() { } } + @Test + public void testWithNonMultipleRoundingWeightRatio() { + + int numPartitions = 64; + int testCount = numPartitions * 2; + int lastPartitionSeen = -1; + + WeightedGroupPartitioner partitioner = + getPartitionerForTestWithNonMultipleRoundingWeightRatio(); + int partition; + + // Test case 1: tenant-1 belong to group-1 (partitions: [0 to 22]) + for (int i = 1; i <= testCount; i++) { + partition = partitioner.partition("test-topic", "tenant-1", "span-" + i, numPartitions); + if (partition > lastPartitionSeen) lastPartitionSeen = partition; + assertTrue( + partition >= 0 && partition <= 22, + "actual partition not in expected range. partition: " + partition); + } + + // Test case 2: tenant-2 belong to group-2 (partitions: [23 to 40]) + for (int i = 1; i <= testCount; i++) { + partition = partitioner.partition("test-topic", "tenant-2", "span-" + i, numPartitions); + if (partition > lastPartitionSeen) lastPartitionSeen = partition; + assertTrue( + partition >= 23 && partition <= 40, + "actual partition not in expected range. partition: " + partition); + } + + // Test case 3: tenant-3 belong to group-2 (partitions: [23 to 40]) + for (int i = 1; i <= testCount; i++) { + partition = partitioner.partition("test-topic", "tenant-3", "span-" + i, numPartitions); + if (partition > lastPartitionSeen) lastPartitionSeen = partition; + assertTrue( + partition >= 23 && partition <= 40, + "actual partition not in expected range. partition: " + partition); + } + + // Test case 4: groupKey=unknown should use default group [41 to 63] + for (int i = 1; i <= testCount; i++) { + partition = partitioner.partition("test-topic", "unknown", "span-" + i, numPartitions); + if (partition > lastPartitionSeen) lastPartitionSeen = partition; + assertTrue( + partition >= 41 && partition <= 63, + "actual partition not in expected range. partition: " + partition); + } + assertEquals(lastPartitionSeen, numPartitions - 1); + } + + private WeightedGroupPartitioner + getPartitionerForTestWithNonMultipleRoundingWeightRatio() { + PartitionerConfigServiceClient testClient = + (profileName) -> + new WeightedGroupProfile( + PartitionerProfile.newBuilder() + .addGroups(newPartitionerGroup("group1", new String[] {"tenant-1"}, 37)) + .addGroups( + newPartitionerGroup("group2", new String[] {"tenant-2", "tenant-3"}, 29)) + .addGroups(newPartitionerGroup("group3", new String[] {"tenant-4"}, 16)) + .setDefaultGroupWeight(20) + .setName(profileName) + .build()); + + WeightedGroupPartitioner partitioner = + new WeightedGroupPartitioner<>( + "spans", testClient, groupKeyExtractor, roundRobinPartitioner); + return partitioner; + } + private PartitionerConfigServiceClient getTestServiceClient() { return (profileName) -> new WeightedGroupProfile(