Skip to content

Commit e6b62cb

Browse files
committed
refactor: slow response and big response
1 parent a3dff8a commit e6b62cb

File tree

6 files changed

+36
-21
lines changed

6 files changed

+36
-21
lines changed

README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ Features:
2121

2222
- [x] Count commands and arguments
2323
- [ ] Count network traffic of command
24-
- [ ] Monitor Slow commands
25-
- [ ] Monitor big response
24+
- [x] Monitor Slow commands
25+
- [x] Monitor big response
2626

2727
## Installation
2828

@@ -51,6 +51,8 @@ password | **** | Prometheus metrics password
5151
s | empty | Separator of keys (for split). If it empty does not split keys.
5252
r | empty | Regex pattern of keys (for clean)
5353
max | 150 | Maximum lookup size of key. If value -1 unlimited lookup.
54+
slow-response-threshold | 500 | threshold for recording slow response. Millisecond
55+
big-response-threshold | 1500 | threshold for recording slow response. Bytes
5456

5557
### Grafana Dashboard
5658

durations.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ type Durations struct {
2020
list map[uint32]DurationItem
2121
}
2222

23+
func NewDurations() *Durations {
24+
return &Durations{list: make(map[uint32]DurationItem)}
25+
}
26+
2327
func (d *Durations) Set(k uint32, command, args string) {
2428
d.m.Lock()
2529
defer d.m.Unlock()

main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ func main() {
1919
keyseparator = flag.String("s", "", "Separator of keys (for split). If it empty does not split keys.")
2020
keycleanerregex = flag.String("r", "", "Regex pattern for cleaner in keys")
2121
maxkeysizenumber = flag.Int("max", 120, "Key size to be lookup")
22-
slowresponsethresold = flag.Duration("slow-response-threshold", time.Millisecond*500, "")
23-
bigresponsethreshold = flag.Int("big-response-threshold", 50000, "")
22+
slowresponsethresold = flag.Duration("slow-response-threshold", time.Millisecond*500, "threshold for recording slow response. Millisecond")
23+
bigresponsethreshold = flag.Int("big-response-threshold", 1500, "threshold for recording slow response. Bytes")
2424
)
2525

2626
flag.Parse()

monitor_packet.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,11 @@ import (
1111

1212
//StartMonitor monitor redis packet destination or source port
1313
func StartMonitor(devicename string, redisport uint16) error {
14-
bpffilter := fmt.Sprintf("port %d and tcp[((tcp[12:1] & 0xf0) >> 2):1] = 0x2A", redisport)
15-
handle, err := pcap.OpenLive(devicename, 1600, false, -1*time.Second)
14+
// + -> 0x2b, $ -> 0x24, * -> 0x2A
15+
bpffilter := fmt.Sprintf(`port %d and tcp[((tcp[12:1] & 0xf0) >> 2):1] = 0x2A
16+
|| tcp[((tcp[12:1] & 0xf0) >> 2):1] = 0x24
17+
|| tcp[((tcp[12:1] & 0xf0) >> 2):1] = 0x2b`, redisport)
18+
handle, err := pcap.OpenLive(devicename, 1800, false, -1*time.Second)
1619
if err != nil {
1720
return fmt.Errorf("error opening device %s: %v", devicename, err)
1821
}

monitor_resp.go

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ func monitorRespPackets(redisport uint, sep, cleaner string, maxkeysize int, slo
2727
cleanerxp = regexp.MustCompile(cleaner)
2828
}
2929

30+
durations = NewDurations()
3031
tcpchan = make(chan *layers.TCP, 100)
3132

3233
for {
@@ -35,23 +36,35 @@ func monitorRespPackets(redisport uint, sep, cleaner string, maxkeysize int, slo
3536
if packet.SrcPort == layers.TCPPort(redisport) { //response
3637
ditem, ok := durations.Get(packet.Seq)
3738
if !ok {
38-
break
39+
continue
3940
}
4041
if l := ditem.ToLatency(); l > slowresponsethresold {
41-
slowCommands.WithLabelValues(ditem.Command, ditem.Args).Observe(float64(l))
42+
if ditem.Args != "" {
43+
slowCommands.WithLabelValues(ditem.Command, ditem.Args).Observe(float64(l))
44+
}
4245
}
4346
if size := len(packet.Payload); size > bigresponsethreshold {
44-
bigCommands.WithLabelValues(ditem.Command, ditem.Args).Observe(float64(size))
47+
if ditem.Args != "" {
48+
bigCommands.WithLabelValues(ditem.Command, ditem.Args).Observe(float64(size))
49+
}
4550
}
51+
52+
log.Debug().Str("command", ditem.Command).Str("args", ditem.Args).Int("size", len(packet.Payload)).Msg("response")
4653
} else if packet.DstPort == layers.TCPPort(redisport) { //request
4754
rsp, err := parseRespPacket(packet.Payload, separator, cleanerxp, maxkeysize)
4855
if err != nil {
4956
log.Debug().Caller().Hex("payload", packet.Payload).Err(err).Msg("request parse error")
50-
break
57+
continue
58+
}
59+
if rsp.Args() != "" {
60+
durations.Set(packet.Ack, rsp.Command(), rsp.Args())
61+
commandCountDetail.WithLabelValues(rsp.Command(), rsp.Args()).Inc()
62+
}
63+
if rsp.Command() != "" {
64+
commandCount.WithLabelValues(rsp.Command()).Inc()
5165
}
52-
durations.Set(packet.Ack, rsp.Command(), rsp.Args())
53-
commandCount.WithLabelValues(rsp.Command()).Inc()
54-
commandCountDetail.WithLabelValues(rsp.Command(), rsp.Args()).Inc()
66+
67+
log.Debug().Str("command", rsp.Command()).Str("args", rsp.Args()).Float64("size", rsp.Size()).Msg("request")
5568
}
5669
}
5770
}
@@ -62,12 +75,5 @@ func parseRespPacket(payload []byte, sep []byte, cleaner *regexp.Regexp, maxkeys
6275
if err != nil {
6376
return rsp, err
6477
}
65-
66-
log.Debug().Hex("payload", payload).Msg("payload")
67-
log.Debug().Str("command", rsp.Command()).
68-
Str("args", rsp.Args()).
69-
Float64("size", rsp.Size()).
70-
Msg("received")
71-
7278
return rsp, err
7379
}

resp_parser.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (c *RespReader) parse(sep []byte, cleaner *regexp.Regexp, maxsize int) erro
6363
}
6464
}
6565

66-
if len(argsindex) >= 1 {
66+
if len(argsindex) >= 2 {
6767
first := pp[argsindex[1]]
6868
if len(first) > maxsize {
6969
if maxsize > 0 {

0 commit comments

Comments
 (0)