From a49c133c5c15b3afb4630b4ac1effe6cf7048339 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 18 Mar 2025 22:50:20 -0500 Subject: [PATCH] made committable data members into `readonly struct`s These objects get allocated frequently in the committing pipeline and are usually rooted as part of a larger data structure. --- .../Messages/CommittedMarker.cs | 20 ++++++++-- .../Messages/GroupTopicPartitionOffset.cs | 37 +++++++------------ .../Stages/Consumers/MessageBuilders.cs | 2 +- 3 files changed, 30 insertions(+), 29 deletions(-) diff --git a/src/Akka.Streams.Kafka/Messages/CommittedMarker.cs b/src/Akka.Streams.Kafka/Messages/CommittedMarker.cs index 96087366..e4a541c8 100644 --- a/src/Akka.Streams.Kafka/Messages/CommittedMarker.cs +++ b/src/Akka.Streams.Kafka/Messages/CommittedMarker.cs @@ -24,7 +24,7 @@ internal interface ICommittedMarker /// /// Used by /// - internal sealed record PartitionOffsetCommittedMarker : GroupTopicPartitionOffset + internal sealed record PartitionOffsetCommittedMarker { /// /// Committed marker @@ -32,15 +32,27 @@ internal sealed record PartitionOffsetCommittedMarker : GroupTopicPartitionOffse public ICommittedMarker CommittedMarker { get; } public PartitionOffsetCommittedMarker(string groupId, string topic, int partition, Offset offset, ICommittedMarker committedMarker) - : base(groupId, topic, partition, offset) + : this(new GroupTopicPartitionOffset(groupId, topic, partition, offset), committedMarker) { CommittedMarker = committedMarker; } - public PartitionOffsetCommittedMarker(GroupTopicPartition groupTopicPartition, Offset offset, ICommittedMarker committedMarker) - : base(groupTopicPartition, offset) + public PartitionOffsetCommittedMarker(GroupTopicPartitionOffset groupTopicPartition, ICommittedMarker committedMarker) { CommittedMarker = committedMarker; + GroupTopicPartitionOffset = groupTopicPartition; } + + public GroupTopicPartitionOffset GroupTopicPartitionOffset { get; } + + public string GroupId => GroupTopicPartitionOffset.GroupId; + + public string Topic => GroupTopicPartitionOffset.Topic; + + public int Partition => GroupTopicPartitionOffset.Partition; + + public Offset Offset => GroupTopicPartitionOffset.Offset; + + public GroupTopicPartition GroupTopicPartition => GroupTopicPartitionOffset.GroupTopicPartition; } } \ No newline at end of file diff --git a/src/Akka.Streams.Kafka/Messages/GroupTopicPartitionOffset.cs b/src/Akka.Streams.Kafka/Messages/GroupTopicPartitionOffset.cs index 91d4b8ea..8e5c3771 100644 --- a/src/Akka.Streams.Kafka/Messages/GroupTopicPartitionOffset.cs +++ b/src/Akka.Streams.Kafka/Messages/GroupTopicPartitionOffset.cs @@ -6,57 +6,46 @@ namespace Akka.Streams.Kafka.Messages /// /// Offset position for a groupId, topic, partition. /// - public record GroupTopicPartitionOffset + public readonly record struct GroupTopicPartitionOffset(GroupTopicPartition GroupTopicPartition, Offset Offset) { - /// - /// GroupTopicPartitionOffset - /// public GroupTopicPartitionOffset(string groupId, string topic, int partition, Offset offset) - { - GroupId = groupId; - Topic = topic; - Partition = partition; - Offset = offset; - } - - /// - /// GroupTopicPartitionOffset - /// - public GroupTopicPartitionOffset(GroupTopicPartition groupTopicPartition, Offset offset) - : this(groupTopicPartition.GroupId, groupTopicPartition.Topic, groupTopicPartition.Partition, offset) + : this(new GroupTopicPartition(groupId, topic, partition), offset) { } /// /// Consumer's group Id /// - public string GroupId { get; } + public string GroupId => GroupTopicPartition.GroupId; + /// /// Topic /// - public string Topic { get; } + public string Topic => GroupTopicPartition.Topic; + /// /// Partition /// - public int Partition { get; } + public int Partition => GroupTopicPartition.Partition; + /// /// Kafka partition offset value /// - public Offset Offset { get; } - + public Offset Offset { get; } = Offset; + /// /// Group topic partition info /// - public GroupTopicPartition GroupTopicPartition => new(GroupId, Topic, Partition); + public GroupTopicPartition GroupTopicPartition { get; } = GroupTopicPartition; } /// /// Group, topic and partition info /// - public sealed record GroupTopicPartition(string GroupId, string Topic, int Partition) + public readonly record struct GroupTopicPartition(string GroupId, string Topic, int Partition) { public TopicPartition TopicPartition { get; } = new(Topic, Partition); } - public sealed record OffsetAndMetadata(Offset Offset, string Metadata); + public readonly record struct OffsetAndMetadata(Offset Offset, string Metadata); } \ No newline at end of file diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/MessageBuilders.cs b/src/Akka.Streams.Kafka/Stages/Consumers/MessageBuilders.cs index 7a613a5b..13f4981a 100644 --- a/src/Akka.Streams.Kafka/Stages/Consumers/MessageBuilders.cs +++ b/src/Akka.Streams.Kafka/Stages/Consumers/MessageBuilders.cs @@ -172,7 +172,7 @@ public TransactionalMessage CreateMessage(ConsumeResult record) record.Offset, _transactionalMessageBuilderStage.CommittedMarker); - return new TransactionalMessage(record, offset); + return new TransactionalMessage(record, offset.GroupTopicPartitionOffset); } } } \ No newline at end of file