forked from segmentio/kafka-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtxnoffsetcommit.go
142 lines (118 loc) · 4.21 KB
/
txnoffsetcommit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package kafka
import (
"context"
"fmt"
"net"
"time"
"github.com/segmentio/kafka-go/protocol/txnoffsetcommit"
)
// TxnOffsetCommitRequest represents a request sent to a kafka broker to commit
// offsets for a partition within a transaction.
//
// Several numeric fields are declared as int for convenience but are narrowed
// to fixed-width integers by (*Client).TxnOffsetCommit before being sent; see
// the individual field comments for the width that is used.
type TxnOffsetCommitRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr
// The transactional id key.
TransactionalID string
// ID of the consumer group to publish the offsets for.
GroupID string
// The Producer ID (PID) for the current producer session;
// received from an InitProducerID request.
//
// Converted to int64 when the request is built.
ProducerID int
// The epoch associated with the current producer session for the given PID.
//
// Converted to int16 when the request is built; values that do not fit in
// an int16 are silently truncated by that conversion.
ProducerEpoch int
// GenerationID is the current generation for the group.
//
// Converted to int32 when the request is built; values that do not fit in
// an int32 are silently truncated by that conversion.
GenerationID int
// ID of the group member submitting the offsets.
MemberID string
// GroupInstanceID is a unique identifier for the consumer.
GroupInstanceID string
// Set of topic partitions to publish the offsets for, keyed by topic name.
//
// Note that offset commits need to be submitted to the broker acting as the
// group coordinator. This will be automatically resolved by the transport.
Topics map[string][]TxnOffsetCommit
}
// TxnOffsetCommit represents the commit of an offset to a partition within a
// transaction.
//
// The extra metadata is opaque to the kafka protocol, it is intended to hold
// information like an identifier for the process that committed the offset,
// or the time at which the commit was made.
type TxnOffsetCommit struct {
// ID of the partition the offset is committed for. Converted to int32 when
// the request is built by (*Client).TxnOffsetCommit.
Partition int
// Offset to commit for the partition.
Offset int64
// Opaque metadata sent along with the committed offset.
Metadata string
}
// TxnOffsetCommitResponse represents a response from a kafka broker to an offset
// commit request within a transaction.
type TxnOffsetCommitResponse struct {
// The amount of time that the broker throttled the request.
Throttle time.Duration
// Set of topic partitions that the kafka broker has accepted offset commits
// for, keyed by topic name. Each partition entry carries its own Error, so
// callers should inspect the entries rather than rely on the presence of a
// topic alone.
Topics map[string][]TxnOffsetCommitPartition
}
// TxnOffsetCommitPartition represents the state of a single partition in responses
// to committing offsets within a transaction.
type TxnOffsetCommitPartition struct {
// ID of the partition.
Partition int
// An error that may have occurred while attempting to publish consumer
// group offsets for this partition.
//
// The error is derived from the kafka error code returned by the kafka
// broker for the partition. Programs may use the standard errors.Is
// function to test the error against kafka error codes.
//
// NOTE(review): (*Client).TxnOffsetCommit passes an empty message when it
// builds this error, so presumably no broker-supplied error message is
// attached here; confirm against makeError.
Error error
}
// TxnOffsetCommit sends a txn offset commit request to a kafka broker and
// returns the response.
//
// The request is dispatched to req.Addr through the client's round trip. If the
// round trip fails, the error is returned wrapped with the method name and the
// response is nil. Errors reported for individual partitions are not returned
// here; they are exposed through the Error field of each
// TxnOffsetCommitPartition in the response.
func (c *Client) TxnOffsetCommit(
	ctx context.Context,
	req *TxnOffsetCommitRequest,
) (*TxnOffsetCommitResponse, error) {
	// Translate the topic map into the protocol-level topic list. Map
	// iteration order is unspecified, so topics are sent in no particular
	// order.
	reqTopics := make([]txnoffsetcommit.RequestTopic, 0, len(req.Topics))
	for name, commits := range req.Topics {
		reqParts := make([]txnoffsetcommit.RequestPartition, 0, len(commits))
		for _, commit := range commits {
			reqParts = append(reqParts, txnoffsetcommit.RequestPartition{
				Partition:         int32(commit.Partition),
				CommittedOffset:   commit.Offset,
				CommittedMetadata: commit.Metadata,
			})
		}
		reqTopics = append(reqTopics, txnoffsetcommit.RequestTopic{
			Name:       name,
			Partitions: reqParts,
		})
	}

	// The int fields of the request are narrowed to the fixed-width integer
	// types used by the protocol message.
	m, err := c.roundTrip(ctx, req.Addr, &txnoffsetcommit.Request{
		TransactionalID: req.TransactionalID,
		GroupID:         req.GroupID,
		ProducerID:      int64(req.ProducerID),
		ProducerEpoch:   int16(req.ProducerEpoch),
		GenerationID:    int32(req.GenerationID),
		MemberID:        req.MemberID,
		GroupInstanceID: req.GroupInstanceID,
		Topics:          reqTopics,
	})
	if err != nil {
		return nil, fmt.Errorf("kafka.(*Client).TxnOffsetCommit: %w", err)
	}

	resp := m.(*txnoffsetcommit.Response)
	out := &TxnOffsetCommitResponse{
		Throttle: makeDuration(resp.ThrottleTimeMs),
		Topics:   make(map[string][]TxnOffsetCommitPartition, len(resp.Topics)),
	}

	// Regroup the per-partition results by topic name, turning each partition
	// error code into an error value.
	for _, t := range resp.Topics {
		results := make([]TxnOffsetCommitPartition, len(t.Partitions))
		for i, p := range t.Partitions {
			results[i] = TxnOffsetCommitPartition{
				Partition: int(p.Partition),
				Error:     makeError(p.ErrorCode, ""),
			}
		}
		out.Topics[t.Name] = results
	}

	return out, nil
}