Skip to content

Commit c8aa29c

Browse files
authored
NET-1778: scale test changes (#920)
* fix UpdateHost call issue * add unzip and change encrypt for peerUpdate message * add old decrypt func back for compatibility * change broker keepalive * fix mq server restart connection issue
1 parent ce7366c commit c8aa29c

File tree

3 files changed

+99
-11
lines changed

3 files changed

+99
-11
lines changed

auth/auth.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ func isTokenExpired(tokenString string) bool {
3737
return true
3838
}
3939

40+
func CleanJwtToken() {
41+
jwtToken = ""
42+
}
43+
4044
// Authenticate authenticates with netmaker api to permit subsequent interactions with the api
4145
func Authenticate(server *config.Server, host *config.Config) (string, error) {
4246
if jwtToken != "" && !isTokenExpired(jwtToken) {

functions/daemon.go

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
package functions
22

33
import (
4+
"bytes"
5+
"compress/gzip"
46
"context"
7+
"crypto/aes"
8+
"crypto/cipher"
59
"errors"
610
"fmt"
11+
"io"
712
"net"
813
"os"
914
"os/signal"
@@ -341,7 +346,7 @@ func setupMQTT(server *config.Server) error {
341346
opts.SetAutoReconnect(true)
342347
opts.SetConnectRetry(true)
343348
opts.SetConnectRetryInterval(time.Second << 2)
344-
opts.SetKeepAlive(time.Second * 10)
349+
opts.SetKeepAlive(time.Second * 15)
345350
opts.SetWriteTimeout(time.Minute)
346351
opts.SetCleanSession(true)
347352
opts.SetOnConnectHandler(func(client mqtt.Client) {
@@ -485,7 +490,26 @@ func setDNSSubscriptions(client mqtt.Client, node *config.Node) {
485490
slog.Info("subscribed to DNS sync for node", "node", node.ID, "network", node.Network)
486491
}
487492

488-
// should only ever use node client configs
493+
func unzipPayload(data []byte) (resData []byte, err error) {
494+
b := bytes.NewBuffer(data)
495+
496+
var r io.Reader
497+
r, err = gzip.NewReader(b)
498+
if err != nil {
499+
return
500+
}
501+
502+
var resB bytes.Buffer
503+
_, err = resB.ReadFrom(r)
504+
if err != nil {
505+
return
506+
}
507+
508+
resData = resB.Bytes()
509+
510+
return
511+
}
512+
489513
func decryptMsg(serverName string, msg []byte) ([]byte, error) {
490514
if len(msg) <= 24 { // make sure message is of appropriate length
491515
return nil, fmt.Errorf("received invalid message from broker %v", msg)
@@ -508,6 +532,32 @@ func decryptMsg(serverName string, msg []byte) ([]byte, error) {
508532
return DeChunk(msg, serverPubKey, diskKey)
509533
}
510534

535+
func decryptAESGCM(key, ciphertext []byte) ([]byte, error) {
536+
// Create AES block cipher
537+
block, err := aes.NewCipher(key)
538+
if err != nil {
539+
return nil, err
540+
}
541+
542+
// Create GCM (Galois/Counter Mode) cipher
543+
aesGCM, err := cipher.NewGCM(block)
544+
if err != nil {
545+
return nil, err
546+
}
547+
548+
// Separate nonce and ciphertext
549+
nonceSize := aesGCM.NonceSize()
550+
nonce, ciphertext := ciphertext[:nonceSize], ciphertext[nonceSize:]
551+
552+
// Decrypt the data
553+
plaintext, err := aesGCM.Open(nil, nonce, ciphertext, nil)
554+
if err != nil {
555+
return nil, err
556+
}
557+
558+
return plaintext, nil
559+
}
560+
511561
func read(network, which string) string {
512562
val, isok := messageCache.Load(fmt.Sprintf("%s%s", network, which))
513563
if isok {

functions/mqhandlers.go

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/devilcove/httpclient"
1515
mqtt "github.com/eclipse/paho.mqtt.golang"
16+
"github.com/gravitl/netclient/auth"
1617
"github.com/gravitl/netclient/cache"
1718
"github.com/gravitl/netclient/config"
1819
"github.com/gravitl/netclient/daemon"
@@ -44,11 +45,22 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
4445
slog.Info("processing node update for network", "network", network)
4546
node := config.GetNode(network)
4647
server := config.Servers[node.Server]
47-
data, err := decryptMsg(server.Name, msg.Payload())
48+
data, err := decryptAESGCM(config.Netclient().TrafficKeyPublic[0:32], msg.Payload())
4849
if err != nil {
49-
slog.Error("error decrypting message", "error", err)
50-
return
50+
slog.Warn("error decrypting message", "warn", err)
51+
data, err = decryptMsg(server.Name, msg.Payload())
52+
if err != nil {
53+
slog.Error("error decrypting message", "error", err)
54+
return
55+
}
56+
} else {
57+
data, err = unzipPayload(data)
58+
if err != nil {
59+
slog.Error("error unzipping message", "error", err)
60+
return
61+
}
5162
}
63+
5264
serverNode := models.Node{}
5365
if err = json.Unmarshal([]byte(data), &serverNode); err != nil {
5466
slog.Error("error unmarshalling node update data", "error", err)
@@ -148,11 +160,22 @@ func HostPeerUpdate(client mqtt.Client, msg mqtt.Message) {
148160
return
149161
}
150162
slog.Info("processing peer update for server", "server", serverName)
151-
data, err := decryptMsg(serverName, msg.Payload())
163+
data, err := decryptAESGCM(config.Netclient().TrafficKeyPublic[0:32], msg.Payload())
152164
if err != nil {
153-
slog.Error("error decrypting message", "error", err)
154-
return
165+
slog.Warn("error decrypting message", "warn", err)
166+
data, err = decryptMsg(server.Name, msg.Payload())
167+
if err != nil {
168+
slog.Error("error decrypting message", "error", err)
169+
return
170+
}
171+
} else {
172+
data, err = unzipPayload(data)
173+
if err != nil {
174+
slog.Error("error unzipping message", "error", err)
175+
return
176+
}
155177
}
178+
156179
err = json.Unmarshal([]byte(data), &peerUpdate)
157180
if err != nil {
158181
slog.Error("error unmarshalling peer data", "error", err)
@@ -281,10 +304,20 @@ func HostUpdate(client mqtt.Client, msg mqtt.Message) {
281304
if len(msg.Payload()) == 0 {
282305
return
283306
}
284-
data, err := decryptMsg(serverName, msg.Payload())
307+
data, err := decryptAESGCM(config.Netclient().TrafficKeyPublic[0:32], msg.Payload())
285308
if err != nil {
286-
slog.Error("error decrypting message", "error", err)
287-
return
309+
slog.Warn("error decrypting message", "warn", err)
310+
data, err = decryptMsg(server.Name, msg.Payload())
311+
if err != nil {
312+
slog.Error("error decrypting message", "error", err)
313+
return
314+
}
315+
} else {
316+
data, err = unzipPayload(data)
317+
if err != nil {
318+
slog.Error("error unzipping message", "error", err)
319+
return
320+
}
288321
}
289322
err = json.Unmarshal([]byte(data), &hostUpdate)
290323
if err != nil {
@@ -606,6 +639,7 @@ func mqFallback(ctx context.Context, wg *sync.WaitGroup) {
606639
}
607640
// Call netclient http config pull
608641
slog.Info("### mqfallback routine execute")
642+
auth.CleanJwtToken()
609643
response, resetInterface, replacePeers, err := Pull(false)
610644
if err != nil {
611645
slog.Error("pull failed", "error", err)

0 commit comments

Comments
 (0)