-
Notifications
You must be signed in to change notification settings - Fork 60
/
Copy pathws_query_handler.go
369 lines (315 loc) · 13 KB
/
ws_query_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
package handlers
import (
"context"
"fmt"
"time"
"go.uber.org/zap"
"github.com/skip-mev/connect/v2/oracle/config"
"github.com/skip-mev/connect/v2/providers/base/websocket/errors"
"github.com/skip-mev/connect/v2/providers/base/websocket/metrics"
providertypes "github.com/skip-mev/connect/v2/providers/types"
)
// WebSocketQueryHandler is an interface that encapsulates querying a websocket
// data provider for info. The handler must respect the context timeout and close
// the connection if the context is cancelled. All responses must be sent to the
// response channel. These are processed asynchronously by the provider.
//
// K and V are the key and value type parameters of the providertypes.GetResponse
// values that are written to the response channel.
//
//go:generate mockery --name WebSocketQueryHandler --output ./mocks/ --case underscore
type WebSocketQueryHandler[K providertypes.ResponseKey, V providertypes.ResponseValue] interface {
// Start should initialize the websocket connection and start listening for
// the data (i.e. ids). All websocket responses should be sent to the response
// channel. The response channel is send-only from the handler's point of view;
// the handler never receives from it. A non-nil error reports that the handler
// could not start or stopped abnormally.
Start(ctx context.Context, ids []K, responseCh chan<- providertypes.GetResponse[K, V]) error
// Copy is used to create a copy of the query handler. This is useful for creating
// multiple connections to the same data provider.
Copy() WebSocketQueryHandler[K, V]
}
// WebSocketQueryHandlerImpl is the default websocket implementation of the
// WebSocketQueryHandler interface. This is used to establish a connection to the data
// provider and subscribe to events for a given set of IDs. It runs in a separate go
// routine and will send all responses to the response channel as they are received.
type WebSocketQueryHandlerImpl[K providertypes.ResponseKey, V providertypes.ResponseValue] struct {
// logger receives all diagnostics emitted by the handler. The constructor
// tags it with the configured handler name.
logger *zap.Logger
// metrics records connection status, data handler status and latency
// observations, all keyed by the configured handler name.
metrics metrics.WebSocketMetrics
// config supplies the handler name and the timing/limit settings read by the
// handler (post-connection timeout, write interval, ping interval and max
// read error count).
config config.WebSocketConfig
// The connection handler is used to manage the connection to the data provider. This
// establishes the connection and sends/receives messages to/from the data provider.
connHandler WebSocketConnHandler
// The data handler is used to handle messages received from the data provider. This
// is used to parse the messages and send responses to the provider.
dataHandler WebSocketDataHandler[K, V]
// ids is the set of IDs that the provider will fetch data for. It is assigned
// on every call to Start and is not carried over by Copy.
ids []K
}
// NewWebSocketQueryHandler builds the default WebSocketQueryHandler
// implementation.
//
// The config is validated first. After that the constructor rejects, in this
// order, a config that is not enabled and a nil logger, data handler, connection
// handler or metrics implementation; the first violated precondition determines
// the returned error. On success the handler logs through a child logger tagged
// with the configured name.
func NewWebSocketQueryHandler[K providertypes.ResponseKey, V providertypes.ResponseValue](
	logger *zap.Logger,
	config config.WebSocketConfig,
	dataHandler WebSocketDataHandler[K, V],
	connHandler WebSocketConnHandler,
	m metrics.WebSocketMetrics,
) (WebSocketQueryHandler[K, V], error) {
	if err := config.ValidateBasic(); err != nil {
		return nil, fmt.Errorf("invalid config: %w", err)
	}

	// Cases are evaluated top to bottom, so the order of the checks is fixed.
	switch {
	case !config.Enabled:
		return nil, fmt.Errorf("websocket is not enabled")
	case logger == nil:
		return nil, fmt.Errorf("logger is nil")
	case dataHandler == nil:
		return nil, fmt.Errorf("websocket data handler is nil")
	case connHandler == nil:
		return nil, fmt.Errorf("connection handler is nil")
	case m == nil:
		return nil, fmt.Errorf("websocket metrics is nil")
	}

	namedLogger := logger.With(zap.String("web_socket_data_handler", config.Name))
	handler := &WebSocketQueryHandlerImpl[K, V]{
		logger:      namedLogger,
		metrics:     m,
		config:      config,
		connHandler: connHandler,
		dataHandler: dataHandler,
	}
	return handler, nil
}
// Start is used to start the connection to the data provider and start listening for
// the data (i.e. ids). All websocket responses are sent to the response channel.
// Start will:
//  1. Create the initial set of events that the channel will subscribe to using the
//     data handler.
//  2. Start the connection to the data provider using the connection handler.
//  3. Send the initial payload to the data provider.
//  4. Optionally start the heartbeat loop (only when the ping interval is positive).
//  5. Receive messages from the data provider until the context is cancelled or
//     reading fails.
//
// Start returns an error when responseCh is nil, when the connection cannot be
// started, or when receiving stops. It returns nil without connecting when ids is
// empty. A panic raised while running is recovered and logged, in which case the
// returned error is nil. The connection status is always recorded as unhealthy
// when Start exits.
func (h *WebSocketQueryHandlerImpl[K, V]) Start(
	ctx context.Context,
	ids []K,
	responseCh chan<- providertypes.GetResponse[K, V],
) error {
	defer func() {
		if err := recover(); err != nil {
			h.logger.Error("panic occurred", zap.Any("err", err))
		}
		h.metrics.AddWebSocketConnectionStatus(h.config.Name, metrics.Unhealthy)
	}()

	if responseCh == nil {
		h.logger.Debug("response channel is nil")
		return fmt.Errorf("response channel is nil")
	}

	h.ids = ids
	if len(h.ids) == 0 {
		h.logger.Debug("no ids to query; exiting")
		return nil
	}

	// The derived context lets the deferred cancel stop the heartbeat goroutine
	// as soon as Start returns, whatever the reason.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Initialize the connection to the data provider and subscribe to the events
	// for the corresponding IDs.
	if err := h.start(ctx); err != nil {
		failure := providertypes.NewGetResponseWithErr[K, V](
			ids,
			providertypes.NewErrorWithCode(
				err,
				providertypes.ErrorWebsocketStartFail,
			),
		)

		// Report the failure unless the context is already finished. An
		// unconditional send could block forever on a channel nobody is draining
		// any more; recv guards its sends in the same way.
		select {
		case responseCh <- failure:
		case <-ctx.Done():
			h.logger.Debug("context finished; dropping start failure response")
		}

		return fmt.Errorf("failed to start connection: %w", err)
	}

	if h.config.PingInterval > 0 {
		go h.heartBeat(ctx)
	}

	// Start receiving messages from the data provider.
	h.metrics.AddWebSocketConnectionStatus(h.config.Name, metrics.Healthy)
	return h.recv(ctx, responseCh)
}
// start dials the data provider and sends the initial subscription payload(s)
// for h.ids.
//
// After a successful dial it waits for the configured post-connection timeout,
// asks the data handler for the subscription messages and writes them one by one,
// pausing for the write interval between consecutive messages.
//
// Once the connection has been dialed, every failure path closes it again before
// returning, so a failed start does not leave an open connection behind. If the
// context finishes while start is waiting, the connection is closed and the
// context's error is returned (or the close error, should closing fail), so the
// caller never treats a cancelled start as a successful one.
func (h *WebSocketQueryHandlerImpl[K, V]) start(ctx context.Context) error {
	// Start the connection.
	h.logger.Debug("creating connection to data provider")
	if err := h.connHandler.Dial(); err != nil {
		h.logger.Debug("failed to create connection with data provider", zap.Error(err))
		h.metrics.AddWebSocketConnectionStatus(h.config.Name, metrics.DialErr)
		return errors.ErrDialWithErr(err)
	}
	// The dial outcome is recorded right away so it is not lost if the wait
	// below is cut short by cancellation.
	h.metrics.AddWebSocketConnectionStatus(h.config.Name, metrics.DialSuccess)

	// abandon closes the dialed connection and reports why start gave up.
	abandon := func() error {
		if err := h.close(); err != nil {
			return err
		}
		return ctx.Err()
	}

	// Wait for the connection timeout before sending the initial payload(s),
	// but stop waiting as soon as the context is finished.
	select {
	case <-ctx.Done():
		h.logger.Debug("context finished; aborting before initial payload")
		return abandon()
	case <-time.After(h.config.PostConnectionTimeout):
	}

	// Create the initial set of events that the channel will subscribe to.
	messages, err := h.dataHandler.CreateMessages(h.ids)
	if err != nil {
		h.logger.Debug("failed to create subscription messages", zap.Error(err))
		h.metrics.AddWebSocketDataHandlerStatus(h.config.Name, metrics.CreateMessageErr)
		// Best-effort cleanup: close logs and records its own outcome, and the
		// create-message failure is the error worth reporting.
		_ = h.close()
		return errors.ErrCreateMessageWithErr(err)
	}
	h.metrics.AddWebSocketDataHandlerStatus(h.config.Name, metrics.CreateMessageSuccess)

	h.logger.Debug("connection created; sending initial payload(s)")
	for index, message := range messages {
		h.logger.Debug("sending payload", zap.String("payload", string(message)))

		// Send the initial payload to the data provider.
		if err := h.connHandler.Write(message); err != nil {
			h.logger.Debug("failed to write message to websocket connection handler", zap.Error(err))
			h.metrics.AddWebSocketConnectionStatus(h.config.Name, metrics.WriteErr)
			// Best-effort cleanup: close logs and records its own outcome, and
			// the write failure is the error worth reporting.
			_ = h.close()
			return errors.ErrWriteWithErr(err)
		}
		h.metrics.AddWebSocketConnectionStatus(h.config.Name, metrics.WriteSuccess)

		// Wait for the write interval before sending the next message. No wait
		// is needed after the last one.
		if index == len(messages)-1 {
			continue
		}
		select {
		case <-ctx.Done():
			h.logger.Debug("context finished; stopping initial payload")
			return abandon()
		case <-time.After(h.config.WriteInterval):
			h.logger.Debug("finished waiting for write interval")
		}
	}

	h.logger.Debug("initial payload sent; websocket connection successfully started")
	return nil
}
// heartBeat writes the data handler's heartbeat messages to the data provider
// once per ping interval until ctx is finished. Failures to build or write a
// heartbeat are recorded in metrics and logged; they never stop the loop.
func (h *WebSocketQueryHandlerImpl[K, V]) heartBeat(ctx context.Context) {
	interval := h.config.PingInterval
	pinger := time.NewTicker(interval)
	defer pinger.Stop()

	h.logger.Debug("starting heartbeat loop", zap.Duration("ping_interval", interval))

	for {
		// Block until the next tick, or leave once the context is finished.
		select {
		case <-pinger.C:
		case <-ctx.Done():
			h.logger.Debug("context finished; stopping heartbeat")
			return
		}

		h.logger.Debug("creating heartbeat messages")
		heartbeats, err := h.dataHandler.HeartBeatMessages()
		if err != nil {
			h.metrics.AddWebSocketDataHandlerStatus(h.config.Name, metrics.HeartBeatErr)
			h.logger.Debug("failed to create heartbeat messages", zap.Error(err))
			continue
		}
		h.metrics.AddWebSocketDataHandlerStatus(h.config.Name, metrics.HeartBeatSuccess)
		h.logger.Debug("sending heartbeat messages to data provider", zap.Int("num_msgs", len(heartbeats)))

		// A failed write does not abort the round; the remaining heartbeats
		// are still attempted.
		for _, payload := range heartbeats {
			writeErr := h.connHandler.Write(payload)
			if writeErr != nil {
				h.metrics.AddWebSocketConnectionStatus(h.config.Name, metrics.WriteErr)
				h.logger.Debug("failed to write heartbeat message", zap.String("message", string(payload)), zap.Error(writeErr))
				continue
			}
			h.metrics.AddWebSocketConnectionStatus(h.config.Name, metrics.WriteSuccess)
			h.logger.Debug("heartbeat message sent", zap.String("message", string(payload)))
		}
	}
}
// recv is the receive loop of the handler. Each iteration reads one message from
// the connection handler, lets the data handler turn it into a response, forwards
// that response to responseCh and writes any update messages the data handler
// produced back to the data provider.
//
// The loop ends when:
//   - ctx is finished: the connection is closed and ctx.Err() is returned;
//   - the number of read errors reaches the configured maximum: the connection is
//     closed and the last read error is returned.
//
// In both cases a failure to close the connection takes precedence and is
// returned as produced by close, which already wraps it as a close error. A
// panic raised inside the loop is recovered and logged, in which case the
// returned error is nil.
func (h *WebSocketQueryHandlerImpl[K, V]) recv(ctx context.Context, responseCh chan<- providertypes.GetResponse[K, V]) error {
	defer func() {
		if err := recover(); err != nil {
			h.logger.Error("panic occurred", zap.Any("err", err))
		}
	}()

	h.logger.Debug("starting recv", zap.Int("buffer_size", cap(responseCh)))

	// Track the number of read errors. Once this reaches the max read error
	// count, the connection is closed and recv returns. Read errors can occur if
	// the data provider closes the connection or if the connection is
	// interrupted. The counter is cumulative: successful reads do not reset it.
	readErrCount := 0

	for {
		// Start of the iteration; used for the latency observation at the end.
		now := time.Now().UTC()

		// Case 1: The context is cancelled. Close the connection and return.
		select {
		case <-ctx.Done():
			h.logger.Debug("context finished")
			if err := h.close(); err != nil {
				return err
			}
			return ctx.Err()
		default:
		}

		// Case 2: The context is not cancelled. Wait for a message from the
		// data provider.
		message, err := h.connHandler.Read()
		if err != nil {
			h.logger.Error(
				"failed to read message from websocket handler",
				zap.String("message", string(message)),
				zap.Error(err),
			)
			h.metrics.AddWebSocketConnectionStatus(h.config.Name, metrics.ReadErr)

			// Keep reading until the read error count reaches the max read
			// error count; then close the connection and return.
			readErrCount++
			if readErrCount < h.config.MaxReadErrorCount {
				continue
			}

			h.logger.Error("max read errors reached", zap.Error(err))
			if closeErr := h.close(); closeErr != nil {
				return closeErr
			}
			return errors.ErrReadWithErr(err)
		}

		h.logger.Debug("message received; attempting to handle message", zap.String("message", string(message)))
		h.metrics.AddWebSocketConnectionStatus(h.config.Name, metrics.ReadSuccess)

		// Handle the message. A message that cannot be handled is skipped.
		response, updateMessage, err := h.dataHandler.HandleMessage(message)
		if err != nil {
			h.logger.Debug("failed to handle websocket message", zap.Error(err))
			h.metrics.AddWebSocketDataHandlerStatus(h.config.Name, metrics.HandleMessageErr)
			continue
		}

		// Immediately send the response to the response channel. Even if this is
		// empty, it will be handled by the provider. Note that if the context has
		// been cancelled, we should not send the response to the channel.
		// Otherwise, we risk sending a response to a closed channel.
		select {
		case <-ctx.Done():
			h.logger.Debug("context finished")
			// close already wraps the failure as a close error, so it is
			// returned as-is, exactly like the other close call sites here.
			if err := h.close(); err != nil {
				return err
			}
			return ctx.Err()
		case responseCh <- response:
			h.logger.Debug("handled message successfully; sent response to response channel", zap.String("response", response.String()))
			h.metrics.AddWebSocketDataHandlerStatus(h.config.Name, metrics.HandleMessageSuccess)
		}

		// Send any update messages produced by the data handler to the data
		// provider. A failed write is logged and does not stop the loop.
		for _, msg := range updateMessage {
			h.logger.Debug("sending update message to data provider", zap.String("update_message", string(msg)))
			h.metrics.AddWebSocketDataHandlerStatus(h.config.Name, metrics.CreateMessageSuccess)

			if err := h.connHandler.Write(msg); err != nil {
				h.logger.Error("failed to write update message", zap.Error(err))
				h.metrics.AddWebSocketConnectionStatus(h.config.Name, metrics.WriteErr)
				continue
			}
			h.logger.Debug("update message sent")
			h.metrics.AddWebSocketConnectionStatus(h.config.Name, metrics.WriteSuccess)
		}

		// Record how long the whole iteration took (read, handling, forwarding
		// and update writes). Iterations that end early via continue are not
		// observed.
		h.metrics.ObserveWebSocketLatency(h.config.Name, time.Since(now))
	}
}
// close shuts down the connection to the data provider through the connection
// handler and records the outcome in the connection status metrics. A failure is
// logged and returned wrapped as a close error; success returns nil.
func (h *WebSocketQueryHandlerImpl[K, V]) close() error {
	h.logger.Debug("closing connection to websocket handler")

	closeErr := h.connHandler.Close()
	if closeErr == nil {
		h.logger.Debug("connection closed")
		h.metrics.AddWebSocketConnectionStatus(h.config.Name, metrics.CloseSuccess)
		return nil
	}

	h.logger.Debug("failed to close connection", zap.Error(closeErr))
	h.metrics.AddWebSocketConnectionStatus(h.config.Name, metrics.CloseErr)
	return errors.ErrCloseWithErr(closeErr)
}
// Copy returns a new query handler for opening another connection to the same
// data provider. The logger, config and metrics are shared with the receiver,
// while the data handler and connection handler are duplicated through their own
// Copy methods. The ids of the receiver are not carried over.
func (h *WebSocketQueryHandlerImpl[K, V]) Copy() WebSocketQueryHandler[K, V] {
	clone := new(WebSocketQueryHandlerImpl[K, V])

	// Shared between the original and the copy.
	clone.logger = h.logger
	clone.config = h.config
	clone.metrics = h.metrics

	// Duplicated so that each handler owns its own data and connection handler.
	clone.dataHandler = h.dataHandler.Copy()
	clone.connHandler = h.connHandler.Copy()

	return clone
}