-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsumer_add_page.go
304 lines (262 loc) · 8.42 KB
/
consumer_add_page.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
package main
import (
"encoding/json"
"strconv"
"time"
"github.com/gdamore/tcell/v2"
"github.com/nats-io/nats.go"
"github.com/rivo/tview"
"github.com/solidpulse/natsdash/ds"
"github.com/solidpulse/natsdash/logger"
"gopkg.in/yaml.v2"
)
// ConsumerAddPage is the screen on which a JetStream consumer configuration is
// written as YAML and then either added to a stream or, in edit mode, applied
// to an existing consumer.
type ConsumerAddPage struct {
	// Flex is the embedded root layout that stacks the header, the editor and
	// the footer in rows.
	*tview.Flex

	// Data is the shared application state; its current context provides the
	// NATS connection used when saving.
	Data *ds.Data
	// app is the owning tview application, used to move focus between pages.
	app *tview.Application

	// txtArea is the YAML editor holding the consumer configuration.
	txtArea *tview.TextArea
	// footerTxt is the status line written by notify.
	footerTxt *tview.TextView

	// streamName is the stream the consumer belongs to.
	streamName string
	// consumerName is the consumer being edited; it is read in edit mode.
	consumerName string
	// isEdit selects between updating an existing consumer (true) and adding
	// a new one (false).
	isEdit bool
}
// NewConsumerAddPage builds the consumer add/edit page: a one-line key hint
// header, a bordered YAML text area that receives focus, and a bordered footer
// for status messages. Keyboard handling is installed before the page is
// returned.
func NewConsumerAddPage(app *tview.Application, data *ds.Data) *ConsumerAddPage {
	// Header: a single padded text view listing the key bindings.
	hint := createTextView("[ESC] Back [Alt+Enter] Save", tcell.ColorWhite)
	hint.SetBorderPadding(1, 1, 1, 1)
	header := tview.NewFlex().SetDirection(tview.FlexColumn)
	header.AddItem(hint, 0, 1, false)

	// Editor for the YAML configuration.
	editor := tview.NewTextArea()
	editor.SetBorder(true)

	// Footer used for notifications.
	footer := createTextView("", tcell.ColorWhite)
	footer.SetBorder(true)

	page := &ConsumerAddPage{
		Flex:      tview.NewFlex().SetDirection(tview.FlexRow),
		Data:      data,
		app:       app,
		txtArea:   editor,
		footerTxt: footer,
	}

	// Fixed three-row header and footer; the editor takes the remaining
	// space and is the only focusable item.
	page.AddItem(header, 3, 0, false)
	page.AddItem(editor, 0, 1, true)
	page.AddItem(footer, 3, 0, false)

	page.setupInputCapture()
	return page
}
// redraw refreshes the editor for the current mode.
//
// In add mode the title is set to "Add Consumer" and the editor is filled with
// a commented YAML template. In edit mode the title names the consumer, its
// current configuration is fetched through ctx's connection, the ack_wait and
// idle_heartbeat values are rewritten as duration strings, and the result is
// shown as YAML. If any step of the edit path fails, the error is reported via
// notify and the editor text is left unchanged.
func (cp *ConsumerAddPage) redraw(ctx *ds.Context) {
	// Template shown when creating a new consumer.
	const newConsumerTemplate = `# Name of the consumer (required)
# Will be set automatically from previous screen
name: ""
# Durable name for the consumer (optional)
# Makes this a durable consumer that survives restarts
durable_name: "NEW"
# Pull mode configuration (optional)
# true: pull-based / false: push-based
pull: true
# Subject filter for the consumer (required)
# Examples: "ORDERS.*", "ORDERS.>", "ORDERS.*.received"
filter_subject: "ORDERS.received"
# Delivery policy configuration (optional)
# Only one of these should be true
deliver_all: true # Deliver all available messages
deliver_last: false # Deliver only the last message
# Other options (set all false if using deliver_all):
# deliver_new: false # Only new messages
# deliver_by_start_sequence: false # Start from specific sequence
# deliver_by_start_time: false # Start from specific time
# Acknowledgment policy (required)
# Options: "none", "all", "explicit"
ack_policy: "explicit"
# Acknowledgment wait time (optional)
# How long to wait for ack before redelivery
# Format: "30s", "1m", "1h"
ack_wait: "30s"
# Replay policy (required)
# Options: "instant", "original"
replay_policy: "instant"
# Maximum delivery attempts (optional)
# How many times to attempt delivery before giving up
max_deliver: 20
# Sampling rate percentage (optional)
# 1-100, where 100 means all messages
sample_freq: 100
# Other available options:
# max_ack_pending: 1000 # Maximum pending acks
# max_waiting: 512 # Maximum waiting pulls
# max_batch: 100 # Maximum batch size for pull
# idle_heartbeat: "30s" # Idle heartbeat interval
# flow_control: false # Enable flow control
# headers_only: false # Deliver only headers`

	if !cp.isEdit {
		cp.txtArea.SetTitle("Add Consumer")
		cp.txtArea.SetText(newConsumerTemplate, false)
		return
	}
	cp.txtArea.SetTitle("Edit Consumer: " + cp.consumerName)

	// All failures below are reported the same way.
	report := func(prefix string, err error) {
		cp.notify(prefix+err.Error(), 3*time.Second, "error")
	}

	js, err := ctx.Conn.JetStream()
	if err != nil {
		report("Failed to get JetStream context: ", err)
		return
	}

	info, err := js.ConsumerInfo(cp.streamName, cp.consumerName)
	if err != nil {
		report("Failed to get consumer info: ", err)
		return
	}

	// Round-trip through JSON to obtain a generic map keyed by the JSON
	// field names of the configuration.
	encoded, err := json.Marshal(info.Config)
	if err != nil {
		report("Failed to convert config: ", err)
		return
	}
	var fields map[string]interface{}
	if err := json.Unmarshal(encoded, &fields); err != nil {
		report("Failed to process config: ", err)
		return
	}

	// Numeric durations are shown in time.Duration's string form (e.g. "30s").
	for _, key := range []string{"ack_wait", "idle_heartbeat"} {
		if nanos, isNumber := fields[key].(float64); isNumber {
			fields[key] = time.Duration(nanos).String()
		}
	}

	rendered, err := yaml.Marshal(fields)
	if err != nil {
		report("Failed to convert config to YAML: ", err)
		return
	}
	cp.txtArea.SetText(string(rendered), false)
}
// goBack switches to the consumer list page, hands it this page's stream name,
// redraws it against the current context and gives it keyboard focus.
//
// The front page is checked with a comma-ok type assertion: if it is not a
// *ConsumerListPage the refresh is skipped and the condition is logged, rather
// than panicking inside the UI event handler.
func (cp *ConsumerAddPage) goBack() {
	pages.SwitchToPage("consumerListPage")

	_, front := pages.GetFrontPage()
	listPage, ok := front.(*ConsumerListPage)
	if !ok {
		logger.Info("consumer list page is not the front page; skipping refresh")
		return
	}

	listPage.streamName = cp.streamName
	listPage.redraw(&cp.Data.CurrCtx)
	// Move focus explicitly so the list receives key events after the switch.
	cp.app.SetFocus(listPage)
}
// notify shows message in the footer using the colour associated with
// logLevel, logs it, and clears the footer again after duration has elapsed.
//
// The delayed reset fires on a timer goroutine, so it is handed to the
// application's event loop through QueueUpdateDraw. That keeps widget access
// off the background goroutine and makes the cleared footer appear on screen
// immediately, not only at the next unrelated redraw.
func (cp *ConsumerAddPage) notify(message string, duration time.Duration, logLevel string) {
	cp.footerTxt.SetText(message)
	cp.footerTxt.SetTextColor(getLogLevelColor(logLevel))
	logger.Info(message)

	time.AfterFunc(duration, func() {
		cp.app.QueueUpdateDraw(func() {
			cp.footerTxt.SetText("")
			cp.footerTxt.SetTextColor(tcell.ColorWhite)
		})
	})
}
// setupInputCapture installs the editor's key bindings: ESC switches to the
// consumer list page and Alt+Enter saves the consumer. Both keys are consumed;
// every other key is passed through to the text area.
//
// NOTE(review): ESC switches pages directly and does not call goBack, so the
// list page is not redrawn on this path — confirm that is intended.
func (cp *ConsumerAddPage) setupInputCapture() {
	handleKey := func(event *tcell.EventKey) *tcell.EventKey {
		switch {
		case event.Key() == tcell.KeyEsc:
			pages.SwitchToPage("consumerListPage")
			return nil
		case event.Key() == tcell.KeyEnter && event.Modifiers() == tcell.ModAlt:
			cp.saveConsumer()
			return nil
		}
		return event
	}
	cp.txtArea.SetInputCapture(handleKey)
}
// saveConsumer parses the YAML in the editor, converts it into a
// nats.ConsumerConfig by way of JSON, and adds the consumer to the stream or,
// in edit mode, updates the existing one. Every failure is reported through
// notify and aborts the save; on success a confirmation is shown and the page
// returns to the consumer list.
func (cp *ConsumerAddPage) saveConsumer() {
	// All failures below are reported the same way.
	report := func(prefix string, err error) {
		cp.notify(prefix+err.Error(), 3*time.Second, "error")
	}

	js, err := cp.Data.CurrCtx.Conn.JetStream()
	if err != nil {
		report("Failed to get JetStream context: ", err)
		return
	}

	// YAML -> generic map with string keys.
	var parsed map[interface{}]interface{}
	if err := yaml.Unmarshal([]byte(cp.txtArea.GetText()), &parsed); err != nil {
		report("Invalid YAML configuration: ", err)
		return
	}
	fields := convertToStringMap(parsed)

	// Durations written as strings ("30s") are replaced by nanosecond counts
	// before the JSON round-trip.
	for _, key := range []string{"ack_wait", "idle_heartbeat"} {
		text, isString := fields[key].(string)
		if !isString {
			continue
		}
		parsedDuration, err := time.ParseDuration(text)
		if err != nil {
			report("Invalid "+key+" duration: ", err)
			return
		}
		fields[key] = parsedDuration.Nanoseconds()
	}

	// An integer sample_freq is passed on as its decimal string
	// (presumably the form the NATS config field expects — verify).
	if freq, isInt := fields["sample_freq"].(int); isInt {
		fields["sample_freq"] = strconv.Itoa(freq)
	}

	// Generic map -> JSON -> ConsumerConfig, so the struct's JSON tags drive
	// the field mapping.
	encoded, err := json.Marshal(fields)
	if err != nil {
		report("Failed to process configuration: ", err)
		return
	}
	var config nats.ConsumerConfig
	if err := json.Unmarshal(encoded, &config); err != nil {
		report("Invalid configuration: ", err)
		return
	}

	var saved *nats.ConsumerInfo
	switch {
	case cp.isEdit:
		// The name of the consumer being edited always wins over the YAML.
		config.Name = cp.consumerName
		saved, err = js.UpdateConsumer(cp.streamName, &config)
	default:
		saved, err = js.AddConsumer(cp.streamName, &config)
	}
	if err != nil {
		report("Failed to save consumer: ", err)
		return
	}

	cp.notify("Consumer "+saved.Name+" saved successfully", 3*time.Second, "info")
	cp.goBack()
}
// convertToStringMap converts a YAML-decoded map, whose keys are interface{}
// values, into a map[string]interface{} suitable for encoding/json. Nested
// maps and slices are converted recursively; all other values are copied
// unchanged.
//
// Keys do not have to be strings: integer, float and boolean keys are
// rendered in their decimal/text form and a nil key becomes "null". A key of
// any other type has no sensible JSON object key and its entry is skipped.
// This keeps hand-typed YAML such as `1: x` from panicking the caller.
// A nil input yields an empty, non-nil map.
func convertToStringMap(m map[interface{}]interface{}) map[string]interface{} {
	res := make(map[string]interface{}, len(m))
	for k, v := range m {
		var key string
		switch kt := k.(type) {
		case string:
			key = kt
		case int:
			key = strconv.Itoa(kt)
		case int64:
			key = strconv.FormatInt(kt, 10)
		case uint64:
			key = strconv.FormatUint(kt, 10)
		case float64:
			key = strconv.FormatFloat(kt, 'g', -1, 64)
		case bool:
			key = strconv.FormatBool(kt)
		case nil:
			key = "null"
		default:
			// Not representable as a JSON object key.
			continue
		}

		switch vt := v.(type) {
		case map[interface{}]interface{}:
			res[key] = convertToStringMap(vt)
		case []interface{}:
			res[key] = convertToStringSlice(vt)
		default:
			res[key] = v
		}
	}
	return res
}

// convertToStringSlice returns a copy of s in which every nested YAML map has
// been converted with convertToStringMap and every nested slice has been
// converted recursively. Other elements are copied unchanged, and the result
// always has the same length as s.
func convertToStringSlice(s []interface{}) []interface{} {
	res := make([]interface{}, len(s))
	for i, v := range s {
		switch vt := v.(type) {
		case map[interface{}]interface{}:
			res[i] = convertToStringMap(vt)
		case []interface{}:
			res[i] = convertToStringSlice(vt)
		default:
			res[i] = v
		}
	}
	return res
}