Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-19001: Use streams group-level configurations in heartbeat #19219

Open
wants to merge 16 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ public CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
* @param context The request context.
* @param request The actual StreamsGroupHeartbeat request.
*
* @return A Result containing the StreamsGroupHeartbeat response, a list of internal topics to be created and
* @return A result containing the StreamsGroupHeartbeat response, a list of internal topics to be created and
* a list of records to update the state machine.
*/
public CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public CurrentAssignmentBuilder withCurrentWarmupTaskProcessIds(BiFunction<Strin
* @param ownedAssignment A collection of active, standby and warm-up tasks
* @return This object.
*/
protected CurrentAssignmentBuilder withOwnedAssignment(TasksTuple ownedAssignment) {
public CurrentAssignmentBuilder withOwnedAssignment(TasksTuple ownedAssignment) {
this.ownedTasks = Optional.ofNullable(ownedAssignment);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,29 +330,61 @@ public String staticMemberId(String groupInstanceId) {
}

/**
* Gets or creates a new member but without adding it to the group. Adding a member is done via the
Gets an existing member, or throws an exception if the member does not exist.
*
* @param memberId The member ID.
* @throws UnknownMemberIdException If the member is not found.
* @return A StreamsGroupMember.
*/
public StreamsGroupMember getMemberOrThrow(
    String memberId
) throws UnknownMemberIdException {
    // Guard clause: fail fast when the member is unknown to this group.
    final StreamsGroupMember existingMember = members.get(memberId);
    if (existingMember == null) {
        throw new UnknownMemberIdException(
            String.format("Member %s is not a member of group %s.", memberId, groupId)
        );
    }
    return existingMember;
}

/**
* Gets or creates a new member, but keeps its fields uninitialized. This is used on the replay path.
* The member is not added to the group; adding a member is done via the
* {@link StreamsGroup#updateMember(StreamsGroupMember)} method.
*
* @param memberId The member ID.
* @param createIfNotExists Booleans indicating whether the member must be created if it does not exist.
* @return A StreamsGroupMember.
*/
public StreamsGroupMember getOrMaybeCreateMember(
String memberId,
boolean createIfNotExists
public StreamsGroupMember getOrCreateUninitializedMember(
String memberId
) throws UnknownMemberIdException {
StreamsGroupMember member = members.get(memberId);
if (member != null) {
return member;
}

if (!createIfNotExists) {
throw new UnknownMemberIdException(
String.format("Member %s is not a member of group %s.", memberId, groupId)
);
return new StreamsGroupMember.Builder(memberId).build();
}

/**
* Gets or creates a new member, setting default values on the fields. This is used on the replay-path.
* The member is not added to the group, adding a member is done via the
* {@link StreamsGroup#updateMember(StreamsGroupMember)} method.
*
* @param memberId The member ID.
* @return A StreamsGroupMember.
*/
public StreamsGroupMember getOrCreateDefaultMember(
String memberId
) throws UnknownMemberIdException {
StreamsGroupMember member = members.get(memberId);
if (member != null) {
return member;
}

return new StreamsGroupMember.Builder(memberId).build();
return StreamsGroupMember.Builder.withDefaults(memberId).build();
}

/**
Expand All @@ -363,7 +395,7 @@ public StreamsGroupMember getOrMaybeCreateMember(
*/
public StreamsGroupMember staticMember(String instanceId) {
String existingMemberId = staticMemberId(instanceId);
return existingMemberId == null ? null : getOrMaybeCreateMember(existingMemberId, false);
return existingMemberId == null ? null : getMemberOrThrow(existingMemberId);
}

/**
Expand Down Expand Up @@ -656,7 +688,7 @@ public void validateOffsetCommit(
memberId.equals(JoinGroupRequest.UNKNOWN_MEMBER_ID) && groupInstanceId == null)
return;

final StreamsGroupMember member = getOrMaybeCreateMember(memberId, false);
final StreamsGroupMember member = getMemberOrThrow(memberId);

// If the commit is not transactional and the member uses the new streams protocol (KIP-1071),
// the member should be using the OffsetCommit API version >= 9.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,21 @@ private static Map<String, Set<Integer>> assignmentFromTaskIds(
taskIds -> Set.copyOf(taskIds.partitions())));
}

/**
 * Creates a builder pre-populated with default values for every field.
 *
 * Sentinels used: -1 for the rebalance timeout and topology epoch (meaning "not provided"),
 * an empty string for the process ID, epoch 0, STABLE state, empty task tuples, and null
 * for the optional instance ID, rack ID and user endpoint.
 *
 * @param memberId The member ID.
 * @return A builder with all fields set to their defaults.
 */
public static Builder withDefaults(String memberId) {
    return new Builder(memberId)
        .setRebalanceTimeoutMs(-1)
        .setTopologyEpoch(-1)
        .setInstanceId(null)
        .setRackId(null)
        .setProcessId("")
        .setClientTags(Collections.emptyMap())
        .setState(MemberState.STABLE)
        .setMemberEpoch(0)
        .setAssignedTasks(TasksTuple.EMPTY)
        .setTasksPendingRevocation(TasksTuple.EMPTY)
        .setUserEndpoint(null);
}

public StreamsGroupMember build() {
return new StreamsGroupMember(
memberId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.coordinator.group.streams;

import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
Expand Down Expand Up @@ -81,4 +82,17 @@ public static StreamsTopology fromRecord(StreamsGroupTopologyValue record) {
record.subtopologies().stream().collect(Collectors.toMap(Subtopology::subtopologyId, x -> x))
);
}

/**
 * Creates an instance of StreamsTopology from a StreamsGroupHeartbeatRequestData request.
 *
 * @param topology The topology supplied in the request.
 * @return The instance of StreamsTopology created from the request.
 */
public static StreamsTopology fromHeartbeatRequest(StreamsGroupHeartbeatRequestData.Topology topology) {
    // Convert through the record representation so request-based and record-based
    // construction stay consistent with each other.
    final StreamsGroupTopologyValue recordValue = StreamsCoordinatorRecordHelpers.convertToStreamsGroupTopologyRecord(topology);
    // Use the imported Subtopology short name, consistent with fromRecord above.
    final Map<String, Subtopology> subtopologyMap = recordValue.subtopologies().stream()
        .collect(Collectors.toMap(Subtopology::subtopologyId, x -> x));
    return new StreamsTopology(topology.epoch(), subtopologyMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@
*/
package org.apache.kafka.coordinator.group.streams;

import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -30,11 +34,11 @@
* An immutable tuple containing active, standby and warm-up tasks.
*
* @param activeTasks Active tasks.
* The key of the map is the subtopology ID and the value is the set of partition IDs.
* The key of the map is the subtopology ID, and the value is the set of partition IDs.
* @param standbyTasks Standby tasks.
* The key of the map is the subtopology ID and the value is the set of partition IDs.
* The key of the map is the subtopology ID, and the value is the set of partition IDs.
* @param warmupTasks Warm-up tasks.
* The key of the map is the subtopology ID and the value is the set of partition IDs.
* The key of the map is the subtopology ID, and the value is the set of partition IDs.
*/
public record TasksTuple(Map<String, Set<Integer>> activeTasks,
Map<String, Set<Integer>> standbyTasks,
Expand Down Expand Up @@ -88,7 +92,7 @@ private static Map<String, Set<Integer>> merge(final Map<String, Set<Integer>> t
/**
* Checks if this task tuple contains any of the tasks in another task tuple.
*
* @param other The other task tuple.
* @param other Another task tuple.
* @return true if there is at least one active, standby or warm-up task that is present in both tuples.
*/
public boolean containsAny(TasksTuple other) {
Expand Down Expand Up @@ -130,4 +134,63 @@ public static TasksTuple fromTargetAssignmentRecord(StreamsGroupTargetAssignment
)
);
}

/**
 * Returns a human-readable representation of this tuple, listing the active,
 * standby and warm-up task assignments.
 *
 * @return The string representation of this tuple.
 */
@Override
public String toString() {
    return "(active=" + taskAssignmentToString(activeTasks) +
        ", standby=" + taskAssignmentToString(standbyTasks) +
        ", warmup=" + taskAssignmentToString(warmupTasks) +
        ')';
}

/**
 * Creates a task tuple from the owned tasks reported in a StreamsGroupHeartbeat request.
 *
 * @param ownedActiveTasks  The owned active tasks from the request.
 * @param ownedStandbyTasks The owned standby tasks from the request.
 * @param ownedWarmupTasks  The owned warm-up tasks from the request.
 * @return A tuple keyed by subtopology ID, with the partition IDs of each task as values.
 */
public static TasksTuple fromHeartbeatRequest(final List<StreamsGroupHeartbeatRequestData.TaskIds> ownedActiveTasks,
                                              final List<StreamsGroupHeartbeatRequestData.TaskIds> ownedStandbyTasks,
                                              final List<StreamsGroupHeartbeatRequestData.TaskIds> ownedWarmupTasks) {
    return new TasksTuple(
        taskIdsToMap(ownedActiveTasks),
        taskIdsToMap(ownedStandbyTasks),
        taskIdsToMap(ownedWarmupTasks)
    );
}

/**
 * Converts a list of request-level task IDs into a map from subtopology ID to partition IDs.
 */
private static Map<String, Set<Integer>> taskIdsToMap(final List<StreamsGroupHeartbeatRequestData.TaskIds> taskIds) {
    return taskIds.stream()
        .collect(Collectors.toMap(
            StreamsGroupHeartbeatRequestData.TaskIds::subtopologyId,
            taskId -> new HashSet<>(taskId.partitions())
        ));
}

/**
 * Renders an assignment map as a bracketed, comma-separated list of
 * "subtopologyId-partition" tokens, preserving the map's iteration order.
 *
 * Example:
 * [subtopologyID1-0, subtopologyID1-1, subtopologyID2-0, subtopologyID2-1]
 *
 * @return The provided assignment as a String.
 */
private static String taskAssignmentToString(
    Map<String, Set<Integer>> assignment
) {
    return assignment.entrySet().stream()
        .flatMap(subtopologyEntry -> subtopologyEntry.getValue().stream()
            .map(partition -> subtopologyEntry.getKey() + "-" + partition))
        .collect(Collectors.joining(", ", "[", "]"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
/**
* The task assignment for a Streams group member.
*
* @param activeTasks The target tasks assigned to this member keyed by subtopologyId.
* @param activeTasks The active tasks assigned to this member keyed by subtopologyId.
* @param standbyTasks The standby tasks assigned to this member keyed by subtopologyId.
* @param warmupTasks The warm-up tasks assigned to this member keyed by subtopologyId.
*/
public record MemberAssignment(Map<String, Set<Integer>> activeTasks,
Map<String, Set<Integer>> standbyTasks,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ private static Map<String, Integer> decidePartitionCounts(final LogContext logCo
enforceCopartitioning(
topology,
copartitionGroupsBySubtopology,
log,
decidedPartitionCountsForInternalTopics,
copartitionedTopicsEnforcer
);
Expand All @@ -168,7 +167,6 @@ private static Map<String, Integer> decidePartitionCounts(final LogContext logCo

private static void enforceCopartitioning(final StreamsTopology topology,
final Map<String, Collection<Set<String>>> copartitionGroupsBySubtopology,
final Logger log,
final Map<String, Integer> decidedPartitionCountsForInternalTopics,
final CopartitionedTopicsEnforcer copartitionedTopicsEnforcer) {
final Set<String> fixedRepartitionTopics =
Expand All @@ -180,17 +178,13 @@ private static void enforceCopartitioning(final StreamsTopology topology,
x.repartitionSourceTopics().stream().filter(y -> y.partitions() == 0)
).map(StreamsGroupTopologyValue.TopicInfo::name).collect(Collectors.toSet());

if (fixedRepartitionTopics.isEmpty() && flexibleRepartitionTopics.isEmpty()) {
log.info("Skipping the repartition topic validation since there are no repartition topics.");
} else {
// ensure the co-partitioning topics within the group have the same number of partitions,
// and enforce the number of partitions for those repartition topics to be the same if they
// are co-partitioned as well.
for (Collection<Set<String>> copartitionGroups : copartitionGroupsBySubtopology.values()) {
for (Set<String> copartitionGroup : copartitionGroups) {
decidedPartitionCountsForInternalTopics.putAll(
copartitionedTopicsEnforcer.enforce(copartitionGroup, fixedRepartitionTopics, flexibleRepartitionTopics));
}
// ensure the co-partitioning topics within the group have the same number of partitions,
// and enforce the number of partitions for those repartition topics to be the same if they
// are co-partitioned as well.
for (Collection<Set<String>> copartitionGroups : copartitionGroupsBySubtopology.values()) {
for (Set<String> copartitionGroup : copartitionGroups) {
decidedPartitionCountsForInternalTopics.putAll(
copartitionedTopicsEnforcer.enforce(copartitionGroup, fixedRepartitionTopics, flexibleRepartitionTopics));
}
}
}
Expand Down
Loading