-
Notifications
You must be signed in to change notification settings - Fork 92
/
Copy pathclient.go
138 lines (116 loc) · 3.71 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package rawtopic
import (
"context"
"fmt"
"github.com/google/uuid"
"github.com/ydb-platform/ydb-go-genproto/Ydb_Topic_V1"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicreader"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicwriter"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawydb"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)
type Client struct {
service Ydb_Topic_V1.TopicServiceClient
}
func NewClient(service Ydb_Topic_V1.TopicServiceClient) Client {
return Client{service: service}
}
func (c *Client) AlterTopic(ctx context.Context, req *AlterTopicRequest) (res AlterTopicResult, err error) {
resp, err := c.service.AlterTopic(ctx, req.ToProto())
if err != nil {
return res, xerrors.WithStackTrace(fmt.Errorf("ydb: alter topic grpc failed: %w", err))
}
err = res.FromProto(resp)
return res, err
}
func (c *Client) CreateTopic(
ctx context.Context,
req *CreateTopicRequest,
) (res CreateTopicResult, err error) {
resp, err := c.service.CreateTopic(ctx, req.ToProto())
if err != nil {
return res, xerrors.WithStackTrace(fmt.Errorf("ydb: create topic grpc failed: %w", err))
}
err = res.FromProto(resp)
return res, err
}
func (c *Client) DescribeTopic(ctx context.Context, req DescribeTopicRequest) (res DescribeTopicResult, err error) {
resp, err := c.service.DescribeTopic(ctx, req.ToProto())
if err != nil {
return DescribeTopicResult{}, xerrors.WithStackTrace(
xerrors.Wrap(
fmt.Errorf("ydb: describe topic grpc failed: %w", err),
),
)
}
err = res.FromProto(resp)
return res, err
}
func (c *Client) DescribeConsumer(ctx context.Context, req DescribeConsumerRequest) (
res DescribeConsumerResult, err error,
) {
resp, err := c.service.DescribeConsumer(ctx, req.ToProto())
if err != nil {
return DescribeConsumerResult{}, xerrors.WithStackTrace(
xerrors.Wrap(
fmt.Errorf("ydb: describe topic consumer grpc failed: %w", err),
),
)
}
err = res.FromProto(resp)
return res, err
}
func (c *Client) DropTopic(
ctx context.Context,
req DropTopicRequest,
) (res DropTopicResult, err error) {
resp, err := c.service.DropTopic(ctx, req.ToProto())
if err != nil {
return res, xerrors.WithStackTrace(fmt.Errorf("ydb: drop topic grpc failed: %w", err))
}
err = res.FromProto(resp)
return res, err
}
func (c *Client) StreamRead(ctxStreamLifeTime context.Context) (rawtopicreader.StreamReader, error) {
protoResp, err := c.service.StreamRead(ctxStreamLifeTime)
if err != nil {
return rawtopicreader.StreamReader{}, xerrors.WithStackTrace(
xerrors.Wrap(
fmt.Errorf("ydb: failed start grpc topic stream read: %w", err),
),
)
}
return rawtopicreader.StreamReader{Stream: protoResp}, nil
}
func (c *Client) StreamWrite(
ctxStreamLifeTime context.Context,
tracer *trace.Topic,
logContext *context.Context,
) (*rawtopicwriter.StreamWriter, error) {
protoResp, err := c.service.StreamWrite(ctxStreamLifeTime)
if err != nil {
return nil, xerrors.WithStackTrace(
xerrors.Wrap(
fmt.Errorf("ydb: failed start grpc topic stream write: %w", err),
),
)
}
return &rawtopicwriter.StreamWriter{
Stream: protoResp,
Tracer: tracer,
InternalStreamID: uuid.New().String(),
LogContext: logContext,
}, nil
}
func (c *Client) UpdateOffsetsInTransaction(
ctx context.Context,
req *UpdateOffsetsInTransactionRequest,
) error {
protoResp, err := c.service.UpdateOffsetsInTransaction(ctx, req.ToProto())
if err != nil {
return xerrors.WithStackTrace(fmt.Errorf("ydb: update offsets in transaction failed: %w", err))
}
var operation rawydb.Operation
return operation.FromProtoWithStatusCheck(protoResp.GetOperation())
}