-
Notifications
You must be signed in to change notification settings - Fork 60
/
Copy pathwebsocket.go
219 lines (169 loc) · 7.83 KB
/
websocket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
package config
import (
"fmt"
"time"
)
const (
// DefaultMaxBufferSize is the default maximum number of messages that the provider
// will buffer at any given time.
DefaultMaxBufferSize = 1024
// DefaultReconnectionTimeout is the default timeout for the provider to attempt
// to reconnect to the websocket endpoint.
DefaultReconnectionTimeout = 10 * time.Second
// DefaultPostConnectionTimeout is the default timeout for the provider to wait
// after a connection is established before sending messages.
DefaultPostConnectionTimeout = 1 * time.Second
// DefaultReadBufferSize is the default I/O read buffer size. If a buffer size of
// 0 is specified, then a default buffer size is used i.e. the buffers allocated
// by the HTTP server.
DefaultReadBufferSize = 0
// DefaultWriteBufferSize is the default I/O write buffer size. If a buffer size of
// 0 is specified, then a default buffer size is used i.e. the buffers allocated
// by the HTTP server.
DefaultWriteBufferSize = 0
// DefaultHandshakeTimeout is the default duration for the handshake to complete.
DefaultHandshakeTimeout = 10 * time.Second
// DefaultEnableCompression is the default value for whether the client should
// attempt to negotiate per message compression (RFC 7692).
DefaultEnableCompression = false
// DefaultReadTimeout is the default read deadline on the underlying network
// connection.
DefaultReadTimeout = 10 * time.Second
// DefaultWriteTimeout is the default write deadline on the underlying network
// connection.
DefaultWriteTimeout = 5 * time.Second
// DefaultPingInterval is the default interval at which the provider should send
// ping messages to the server.
DefaultPingInterval = 0 * time.Second
// DefaultWriteInterval is the default interval at which the provider should send
// write messages to the server.
DefaultWriteInterval = 100 * time.Millisecond
// DefaultMaxReadErrorCount is the default maximum number of read errors that
// the provider will tolerate before closing the connection and attempting to
// reconnect. This default value utilized by the gorilla/websocket package is
// 1000, but we set it to a lower value to allow the provider to reconnect
// faster.
DefaultMaxReadErrorCount = 100
// DefaultMaxSubscriptionsPerConnection is the default maximum subscriptions
// a provider can handle per-connection. When this value is 0, one connection
// will handle all subscriptions.
DefaultMaxSubscriptionsPerConnection = 0
// DefaultMaxSubscriptionsPerBatch is the default maximum number of subscriptions
// that can be assigned to a single batch/write/message.
DefaultMaxSubscriptionsPerBatch = 1
)
// WebSocketConfig defines a config for a websocket based data provider.
type WebSocketConfig struct {
// Enabled indicates if the provider is enabled.
Enabled bool `json:"enabled"`
// MaxBufferSize is the maximum number of messages that the provider will buffer
// at any given time. If the provider receives more messages than this, it will
// block receiving messages until the buffer is cleared.
MaxBufferSize int `json:"maxBufferSize"`
// ReconnectionTimeout is the timeout for the provider to attempt to reconnect
// to the websocket endpoint.
ReconnectionTimeout time.Duration `json:"reconnectionTimeout"`
// PostConnectionTimeout is the timeout for the provider to wait after a connection
// is established before sending messages.
PostConnectionTimeout time.Duration `json:"postConnectionTimeout"`
// Endpoints are the websocket endpoints for the provider. At least one endpoint
// must be specified.
Endpoints []Endpoint `json:"endpoints"`
// Name is the name of the provider that corresponds to this config.
Name string `json:"name"`
// ReadBufferSize specifies the I/O read buffer size. if a buffer size of 0 is
// specified, then a default buffer size is used.
ReadBufferSize int `json:"readBufferSize"`
// WriteBufferSize specifies the I/O write buffer size. if a buffer size of 0 is
// specified, then a default buffer size is used.
WriteBufferSize int `json:"writeBufferSize"`
// HandshakeTimeout specifies the duration for the handshake to complete.
HandshakeTimeout time.Duration `json:"handshakeTimeout"`
// EnableCompression specifies if the client should attempt to negotiate per
// message compression (RFC 7692). Setting this value to true does not guarantee
// that compression will be supported. Note that enabling compression may
EnableCompression bool `json:"enableCompression"`
// ReadTimeout sets the read deadline on the underlying network connection.
// After a read has timed out, the websocket connection state is corrupt and
// all future reads will return an error. A zero value for t means reads will
// not time out.
ReadTimeout time.Duration `json:"readTimeout"`
// WriteTimeout sets the write deadline on the underlying network
// connection. After a write has timed out, the websocket state is corrupt and
// all future writes will return an error. A zero value for t means writes will
// not time out.
WriteTimeout time.Duration `json:"writeTimeout"`
// PingInterval is the interval to ping the server. Note that a ping interval
// of 0 disables pings.
PingInterval time.Duration `json:"pingInterval"`
// WriteInterval is the interval at which the provider should wait before sending
// consecutive write messages to the server.
WriteInterval time.Duration `json:"writeInterval"`
// MaxReadErrorCount is the maximum number of read errors that the provider
// will tolerate before closing the connection and attempting to reconnect.
MaxReadErrorCount int `json:"maxReadErrorCount"`
// MaxSubscriptionsPerConnection is the maximum amount of subscriptions that
// can be assigned to a single connection for this provider. The null value (0),
// indicates that there is no limit per connection.
MaxSubscriptionsPerConnection int `json:"maxSubscriptionsPerConnection"`
// MaxSubscriptionsPerBatch is the maximum number of subscription messages that the
// provider will send in a single batch/write.
MaxSubscriptionsPerBatch int `json:"maxSubscriptionsPerBatch"`
}
// ValidateBasic performs basic validation of the websocket config.
func (c *WebSocketConfig) ValidateBasic() error {
if !c.Enabled {
return nil
}
if c.MaxBufferSize < 1 {
return fmt.Errorf("websocket max buffer size must be greater than 0")
}
if c.ReconnectionTimeout <= 0 {
return fmt.Errorf("websocket reconnection timeout must be greater than 0")
}
if c.PostConnectionTimeout < 0 {
return fmt.Errorf("websocket post connection timeout must be greater than 0")
}
if len(c.Endpoints) == 0 {
return fmt.Errorf("websocket endpoints cannot be empty")
}
for i, e := range c.Endpoints {
if err := e.ValidateBasic(); err != nil {
return fmt.Errorf("endpoint %d: %w", i, err)
}
}
if len(c.Name) == 0 {
return fmt.Errorf("websocket name cannot be empty")
}
if c.ReadBufferSize < 0 {
return fmt.Errorf("websocket read buffer size cannot be negative")
}
if c.WriteBufferSize < 0 {
return fmt.Errorf("websocket write buffer size cannot be negative")
}
if c.HandshakeTimeout <= 0 {
return fmt.Errorf("websocket handshake timeout must be greater than 0")
}
if c.ReadTimeout <= 0 {
return fmt.Errorf("websocket read timeout must be greater than 0")
}
if c.WriteTimeout <= 0 {
return fmt.Errorf("websocket write timeout must be greater than 0")
}
if c.PingInterval < 0 {
return fmt.Errorf("websocket ping interval cannot be negative")
}
if c.WriteInterval < 0 {
return fmt.Errorf("websocket write interval must be greater than 0")
}
if c.MaxReadErrorCount < 0 {
return fmt.Errorf("websocket max read error count cannot be negative")
}
if c.MaxSubscriptionsPerConnection < 0 {
return fmt.Errorf("websocket max subscriptions per connection cannot be negative")
}
if c.MaxSubscriptionsPerBatch < 1 {
return fmt.Errorf("websocket max subscriptions per batch must be greater than 0")
}
return nil
}