diff --git a/src/Akka.Streams.Kafka/Messages/CommittedMarker.cs b/src/Akka.Streams.Kafka/Messages/CommittedMarker.cs index 96087366..e4a541c8 100644 --- a/src/Akka.Streams.Kafka/Messages/CommittedMarker.cs +++ b/src/Akka.Streams.Kafka/Messages/CommittedMarker.cs @@ -24,7 +24,7 @@ internal interface ICommittedMarker /// /// Used by /// - internal sealed record PartitionOffsetCommittedMarker : GroupTopicPartitionOffset + internal sealed record PartitionOffsetCommittedMarker { /// /// Committed marker @@ -32,15 +32,27 @@ internal sealed record PartitionOffsetCommittedMarker : GroupTopicPartitionOffse public ICommittedMarker CommittedMarker { get; } public PartitionOffsetCommittedMarker(string groupId, string topic, int partition, Offset offset, ICommittedMarker committedMarker) - : base(groupId, topic, partition, offset) + : this(new GroupTopicPartitionOffset(groupId, topic, partition, offset), committedMarker) { CommittedMarker = committedMarker; } - public PartitionOffsetCommittedMarker(GroupTopicPartition groupTopicPartition, Offset offset, ICommittedMarker committedMarker) - : base(groupTopicPartition, offset) + public PartitionOffsetCommittedMarker(GroupTopicPartitionOffset groupTopicPartition, ICommittedMarker committedMarker) { CommittedMarker = committedMarker; + GroupTopicPartitionOffset = groupTopicPartition; } + + public GroupTopicPartitionOffset GroupTopicPartitionOffset { get; } + + public string GroupId => GroupTopicPartitionOffset.GroupId; + + public string Topic => GroupTopicPartitionOffset.Topic; + + public int Partition => GroupTopicPartitionOffset.Partition; + + public Offset Offset => GroupTopicPartitionOffset.Offset; + + public GroupTopicPartition GroupTopicPartition => GroupTopicPartitionOffset.GroupTopicPartition; } } \ No newline at end of file diff --git a/src/Akka.Streams.Kafka/Messages/GroupTopicPartitionOffset.cs b/src/Akka.Streams.Kafka/Messages/GroupTopicPartitionOffset.cs index 91d4b8ea..8e5c3771 100644 --- a/src/Akka.Streams.Kafka/Messages/GroupTopicPartitionOffset.cs +++ b/src/Akka.Streams.Kafka/Messages/GroupTopicPartitionOffset.cs @@ -6,57 +6,46 @@ namespace Akka.Streams.Kafka.Messages /// /// Offset position for a groupId, topic, partition. /// - public record GroupTopicPartitionOffset + public readonly record struct GroupTopicPartitionOffset(GroupTopicPartition GroupTopicPartition, Offset Offset) { - /// - /// GroupTopicPartitionOffset - /// public GroupTopicPartitionOffset(string groupId, string topic, int partition, Offset offset) - { - GroupId = groupId; - Topic = topic; - Partition = partition; - Offset = offset; - } - - /// - /// GroupTopicPartitionOffset - /// - public GroupTopicPartitionOffset(GroupTopicPartition groupTopicPartition, Offset offset) - : this(groupTopicPartition.GroupId, groupTopicPartition.Topic, groupTopicPartition.Partition, offset) + : this(new GroupTopicPartition(groupId, topic, partition), offset) { } /// /// Consumer's group Id /// - public string GroupId { get; } + public string GroupId => GroupTopicPartition.GroupId; + /// /// Topic /// - public string Topic { get; } + public string Topic => GroupTopicPartition.Topic; + /// /// Partition /// - public int Partition { get; } + public int Partition => GroupTopicPartition.Partition; + /// /// Kafka partition offset value /// - public Offset Offset { get; } - + public Offset Offset { get; } = Offset; + /// /// Group topic partition info /// - public GroupTopicPartition GroupTopicPartition => new(GroupId, Topic, Partition); + public GroupTopicPartition GroupTopicPartition { get; } = GroupTopicPartition; } /// /// Group, topic and partition info /// - public sealed record GroupTopicPartition(string GroupId, string Topic, int Partition) + public readonly record struct GroupTopicPartition(string GroupId, string Topic, int Partition) { public TopicPartition TopicPartition { get; } = new(Topic, Partition); } - public sealed record OffsetAndMetadata(Offset Offset, string Metadata); + public readonly record struct OffsetAndMetadata(Offset Offset, string Metadata); } \ No newline at end of file diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/MessageBuilders.cs b/src/Akka.Streams.Kafka/Stages/Consumers/MessageBuilders.cs index 7a613a5b..13f4981a 100644 --- a/src/Akka.Streams.Kafka/Stages/Consumers/MessageBuilders.cs +++ b/src/Akka.Streams.Kafka/Stages/Consumers/MessageBuilders.cs @@ -172,7 +172,7 @@ public TransactionalMessage CreateMessage(ConsumeResult record) record.Offset, _transactionalMessageBuilderStage.CommittedMarker); - return new TransactionalMessage(record, offset); + return new TransactionalMessage(record, offset.GroupTopicPartitionOffset); } } } \ No newline at end of file