From 5f21c8bb1de189bc4d354095f818c36305635c64 Mon Sep 17 00:00:00 2001 From: Yabin Ma Date: Tue, 10 Dec 2024 07:15:31 +0100 Subject: [PATCH] NET-1778: scale test code changes (#3203) * comment ACL call and add debug message * add cache for network nodes * fix load node to network cache issue * add peerUpdate call 1 min limit * add debug log for scale test * release maps * avoid default policy for node * 1 min limit for peerUpdate trigger * mq options * Revert "mq options" This reverts commit 10b93d01185478141a12fe7fc8f973d848d28e35. * set peerUpdate run in sequence * update for emqx 5.8.2 * remove batch peer update * change the sleep to 10 millisec to avoid timeout * add compress and change encrypt for peerUpdate message * add mem profiling and automaxprocs * add failover ctx mutex * ignore request to failover peer * remove code without called * remove debug logs * update emqx to v5.8.2 * change broker keepalive * add OLD_ACL_SUPPORT setting * add host version check for message encrypt * remove debug message * remove peerUpdate call control --------- Co-authored-by: abhishek9686 --- compose/docker-compose-emqx.yml | 3 +- controllers/acls.go | 2 +- controllers/server.go | 6 +++ docker/emqx.conf | 21 ++++++++ go.mod | 3 ++ go.sum | 14 ++++- logic/acls.go | 34 ++++++++---- logic/acls/nodeacls/retrieve.go | 4 ++ logic/extpeers.go | 8 +-- logic/nodes.go | 53 ++++++++++++++++++- logic/peers.go | 4 +- logic/proc.go | 13 +++++ main.go | 1 + main_ee.go | 5 +- mq/emqx_on_prem.go | 2 +- mq/migrate.go | 10 +++- mq/mq.go | 2 +- mq/publishers.go | 42 +++++---------- mq/util.go | 92 +++++++++++++++++++++++++++++++-- scripts/netmaker.default.env | 6 +-- servercfg/serverconf.go | 30 +++-------- 21 files changed, 273 insertions(+), 82 deletions(-) create mode 100644 docker/emqx.conf diff --git a/compose/docker-compose-emqx.yml b/compose/docker-compose-emqx.yml index 61dabf08e..94de8fad9 100644 --- a/compose/docker-compose-emqx.yml +++ b/compose/docker-compose-emqx.yml @@ -3,7 +3,7 @@ version: "3.4" services: mq: container_name: mq - image: emqx/emqx:5.0.9 + image: emqx/emqx:5.8.2 env_file: ./netmaker.env restart: unless-stopped environment: @@ -20,6 +20,7 @@ services: - emqx_data:/opt/emqx/data - emqx_etc:/opt/emqx/etc - emqx_logs:/opt/emqx/log + - ./emqx.conf:/opt/emqx/data/configs/cluster.hocon volumes: emqx_data: { } # storage for emqx data emqx_etc: { } # storage for emqx etc diff --git a/controllers/acls.go b/controllers/acls.go index 99d67876d..1bc2c61ca 100644 --- a/controllers/acls.go +++ b/controllers/acls.go @@ -69,7 +69,7 @@ func aclDebug(w http.ResponseWriter, r *http.Request) { logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest")) return } - allowed := logic.IsNodeAllowedToCommunicate(node, peer) + allowed := logic.IsNodeAllowedToCommunicate(node, peer, true) logic.ReturnSuccessResponseWithJson(w, r, allowed, "fetched all acls in the network ") } diff --git a/controllers/server.go b/controllers/server.go index 77b06fcc7..171bbdb32 100644 --- a/controllers/server.go +++ b/controllers/server.go @@ -48,6 +48,8 @@ func serverHandlers(r *mux.Router) { Methods(http.MethodGet) r.HandleFunc("/api/server/cpu_profile", logic.SecurityCheck(false, http.HandlerFunc(cpuProfile))). Methods(http.MethodPost) + r.HandleFunc("/api/server/mem_profile", logic.SecurityCheck(false, http.HandlerFunc(memProfile))). + Methods(http.MethodPost) } func cpuProfile(w http.ResponseWriter, r *http.Request) { @@ -62,6 +64,10 @@ func cpuProfile(w http.ResponseWriter, r *http.Request) { } } } +func memProfile(w http.ResponseWriter, r *http.Request) { + os.Remove("/root/data/mem.prof") + logic.StartMemProfiling() +} func getUsage(w http.ResponseWriter, _ *http.Request) { type usage struct { diff --git a/docker/emqx.conf b/docker/emqx.conf new file mode 100644 index 000000000..e0af69c2b --- /dev/null +++ b/docker/emqx.conf @@ -0,0 +1,21 @@ +authentication = [ + { + backend = "built_in_database" + mechanism = "password_based" + password_hash_algorithm { + name = "sha256", + salt_position = "suffix" + } + user_id_type = "username" + } +] +authorization { + deny_action = ignore + no_match = allow + sources = [ + { + type = built_in_database + enable = true + } + ] +} diff --git a/go.mod b/go.mod index cfb934a93..1d90afa8c 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/gravitl/netmaker go 1.23 require ( + github.com/blang/semver v3.5.1+incompatible github.com/eclipse/paho.mqtt.golang v1.4.3 github.com/go-playground/validator/v10 v10.23.0 github.com/golang-jwt/jwt/v4 v4.5.1 @@ -16,6 +17,7 @@ require ( github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e github.com/stretchr/testify v1.9.0 github.com/txn2/txeh v1.5.5 + go.uber.org/automaxprocs v1.6.0 golang.org/x/crypto v0.29.0 golang.org/x/net v0.27.0 // indirect golang.org/x/oauth2 v0.24.0 @@ -51,6 +53,7 @@ require ( github.com/gabriel-vasile/mimetype v1.4.3 // indirect github.com/go-jose/go-jose/v3 v3.0.3 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/kr/text v0.2.0 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/seancfoley/bintree v1.3.1 // indirect github.com/spf13/pflag v1.0.5 // indirect diff --git a/go.sum b/go.sum index b5815e621..7dece523a 100644 --- a/go.sum +++ b/go.sum @@ -2,11 +2,14 @@ cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2Qx cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdnnjpJbkM4JQ= +github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= github.com/c-robinson/iplib v1.0.8 h1:exDRViDyL9UBLcfmlxxkY5odWX5092nPsQIykHXhIn4= github.com/c-robinson/iplib v1.0.8/go.mod h1:i3LuuFL1hRT5gFpBRnEydzw8R6yhGkF4szNDIbF8pgo= github.com/coreos/go-oidc/v3 v3.9.0 h1:0J/ogVOd4y8P0f0xUh8l9t07xRP/d8tccvjHl2dcsSo= github.com/coreos/go-oidc/v3 v3.9.0/go.mod h1:rTKz2PYwftcrtoCzV5g5kvfJoWcm0Mk8AF8y1iAQro4= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -46,6 +49,10 @@ github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKe github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= @@ -64,6 +71,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posthog/posthog-go v1.2.24 h1:A+iG4saBJemo++VDlcWovbYf8KFFNUfrCoJtsc40RPA= github.com/posthog/posthog-go v1.2.24/go.mod h1:uYC2l1Yktc8E+9FAHJ9QZG4vQf/NHJPD800Hsm7DzoM= +github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= +github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= @@ -87,6 +96,8 @@ github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8 github.com/txn2/txeh v1.5.5 h1:UN4e/lCK5HGw/gGAi2GCVrNKg0GTCUWs7gs5riaZlz4= github.com/txn2/txeh v1.5.5/go.mod h1:qYzGG9kCzeVEI12geK4IlanHWY8X4uy/I3NcW7mk8g4= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= +go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= @@ -142,8 +153,9 @@ golang.zx2c4.com/wireguard/wgctrl v0.0.0-20221104135756-97bc4ad4a1cb h1:9aqVcYED golang.zx2c4.com/wireguard/wgctrl v0.0.0-20221104135756-97bc4ad4a1cb/go.mod h1:mQqgjkW8GQQcJQsbBvK890TKqUK1DfKWkuBGbOkuMHQ= gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc h1:2gGKlE2+asNV9m7xrywl36YYNnBG5ZQ0r/BOOxqPpmk= gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc/go.mod h1:m7x9LTH6d71AHyAX77c9yqWCCa3UKHcVEj9y7hAtKDk= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/mail.v2 v2.3.1 h1:WYFn/oANrAGP2C0dcV6/pbkPzv8yGzqTjPmTeO7qoXk= gopkg.in/mail.v2 v2.3.1/go.mod h1:htwXN1Qh09vZJ1NVKxQqHPBaCBbzKhp5GzuJEA4VJWw= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/logic/acls.go b/logic/acls.go index f48f0ee27..97b0f7415 100644 --- a/logic/acls.go +++ b/logic/acls.go @@ -347,14 +347,20 @@ func GetDefaultPolicy(netID models.NetworkID, ruleType models.AclPolicyType) (mo return acl, nil } // check if there are any custom all policies + srcMap := make(map[string]struct{}) + dstMap := make(map[string]struct{}) + defer func() { + srcMap = nil + dstMap = nil + }() policies, _ := ListAclsByNetwork(netID) for _, policy := range policies { if !policy.Enabled { continue } if policy.RuleType == ruleType { - dstMap := convAclTagToValueMap(policy.Dst) - srcMap := convAclTagToValueMap(policy.Src) + dstMap = convAclTagToValueMap(policy.Dst) + srcMap = convAclTagToValueMap(policy.Src) if _, ok := srcMap["*"]; ok { if _, ok := dstMap["*"]; ok { return policy, nil @@ -512,29 +518,37 @@ func IsUserAllowedToCommunicate(userName string, peer models.Node) bool { } // IsNodeAllowedToCommunicate - check node is allowed to communicate with the peer -func IsNodeAllowedToCommunicate(node, peer models.Node) bool { +func IsNodeAllowedToCommunicate(node, peer models.Node, checkDefaultPolicy bool) bool { if node.IsStatic { node = node.StaticNode.ConvertToStaticNode() } if peer.IsStatic { peer = peer.StaticNode.ConvertToStaticNode() } - // check default policy if all allowed return true - defaultPolicy, err := GetDefaultPolicy(models.NetworkID(node.Network), models.DevicePolicy) - if err == nil { - if defaultPolicy.Enabled { - return true + if checkDefaultPolicy { + // check default policy if all allowed return true + defaultPolicy, err := GetDefaultPolicy(models.NetworkID(node.Network), models.DevicePolicy) + if err == nil { + if defaultPolicy.Enabled { + return true + } } } // list device policies policies := listDevicePolicies(models.NetworkID(peer.Network)) + srcMap := make(map[string]struct{}) + dstMap := make(map[string]struct{}) + defer func() { + srcMap = nil + dstMap = nil + }() for _, policy := range policies { if !policy.Enabled { continue } - srcMap := convAclTagToValueMap(policy.Src) - dstMap := convAclTagToValueMap(policy.Dst) + srcMap = convAclTagToValueMap(policy.Src) + dstMap = convAclTagToValueMap(policy.Dst) // fmt.Printf("\n======> SRCMAP: %+v\n", srcMap) // fmt.Printf("\n======> DSTMAP: %+v\n", dstMap) // fmt.Printf("\n======> node Tags: %+v\n", node.Tags) diff --git a/logic/acls/nodeacls/retrieve.go b/logic/acls/nodeacls/retrieve.go index 4411c5b22..84895f44d 100644 --- a/logic/acls/nodeacls/retrieve.go +++ b/logic/acls/nodeacls/retrieve.go @@ -7,12 +7,16 @@ import ( "sync" "github.com/gravitl/netmaker/logic/acls" + "github.com/gravitl/netmaker/servercfg" ) var NodesAllowedACLMutex = &sync.Mutex{} // AreNodesAllowed - checks if nodes are allowed to communicate in their network ACL func AreNodesAllowed(networkID NetworkID, node1, node2 NodeID) bool { + if !servercfg.IsOldAclEnabled() { + return true + } NodesAllowedACLMutex.Lock() defer NodesAllowedACLMutex.Unlock() var currentNetworkACL, err = FetchAllACLs(networkID) diff --git a/logic/extpeers.go b/logic/extpeers.go index c03a0efa7..6cb352231 100644 --- a/logic/extpeers.go +++ b/logic/extpeers.go @@ -564,7 +564,7 @@ func GetFwRulesOnIngressGateway(node models.Node) (rules []models.FwRule) { if peer.StaticNode.ClientID == nodeI.StaticNode.ClientID || peer.IsUserNode { continue } - if IsNodeAllowedToCommunicate(nodeI, peer) { + if IsNodeAllowedToCommunicate(nodeI, peer, true) { if peer.IsStatic { if nodeI.StaticNode.Address != "" { rules = append(rules, models.FwRule{ @@ -650,7 +650,7 @@ func GetExtPeers(node, peer *models.Node) ([]wgtypes.PeerConfig, []models.IDandA continue } if extPeer.RemoteAccessClientID == "" { - if !IsNodeAllowedToCommunicate(extPeer.ConvertToStaticNode(), *peer) { + if !IsNodeAllowedToCommunicate(extPeer.ConvertToStaticNode(), *peer, true) { continue } } else { @@ -739,7 +739,7 @@ func getExtpeerEgressRanges(node models.Node) (ranges, ranges6 []net.IPNet) { if len(extPeer.ExtraAllowedIPs) == 0 { continue } - if !IsNodeAllowedToCommunicate(extPeer.ConvertToStaticNode(), node) { + if !IsNodeAllowedToCommunicate(extPeer.ConvertToStaticNode(), node, true) { continue } for _, allowedRange := range extPeer.ExtraAllowedIPs { @@ -766,7 +766,7 @@ func getExtpeersExtraRoutes(node models.Node) (egressRoutes []models.EgressNetwo if len(extPeer.ExtraAllowedIPs) == 0 { continue } - if !IsNodeAllowedToCommunicate(extPeer.ConvertToStaticNode(), node) { + if !IsNodeAllowedToCommunicate(extPeer.ConvertToStaticNode(), node, true) { continue } egressRoutes = append(egressRoutes, getExtPeerEgressRoute(node, extPeer)...) diff --git a/logic/nodes.go b/logic/nodes.go index 63bbe5dce..c3f19aec6 100644 --- a/logic/nodes.go +++ b/logic/nodes.go @@ -5,7 +5,9 @@ import ( "encoding/json" "errors" "fmt" + "maps" "net" + "slices" "sort" "sync" "time" @@ -24,8 +26,10 @@ import ( ) var ( - nodeCacheMutex = &sync.RWMutex{} - nodesCacheMap = make(map[string]models.Node) + nodeCacheMutex = &sync.RWMutex{} + nodeNetworkCacheMutex = &sync.RWMutex{} + nodesCacheMap = make(map[string]models.Node) + nodesNetworkCacheMap = make(map[string]map[string]models.Node) ) func getNodeFromCache(nodeID string) (node models.Node, ok bool) { @@ -48,12 +52,37 @@ func deleteNodeFromCache(nodeID string) { delete(nodesCacheMap, nodeID) nodeCacheMutex.Unlock() } +func deleteNodeFromNetworkCache(nodeID string, network string) { + nodeNetworkCacheMutex.Lock() + delete(nodesNetworkCacheMap[network], nodeID) + nodeNetworkCacheMutex.Unlock() +} + +func storeNodeInNetworkCache(node models.Node, network string) { + nodeNetworkCacheMutex.Lock() + if nodesNetworkCacheMap[network] == nil { + nodesNetworkCacheMap[network] = make(map[string]models.Node) + } + nodesNetworkCacheMap[network][node.ID.String()] = node + nodeNetworkCacheMutex.Unlock() +} func storeNodeInCache(node models.Node) { nodeCacheMutex.Lock() nodesCacheMap[node.ID.String()] = node nodeCacheMutex.Unlock() } +func loadNodesIntoNetworkCache(nMap map[string]models.Node) { + nodeNetworkCacheMutex.Lock() + for _, v := range nMap { + network := v.Network + if nodesNetworkCacheMap[network] == nil { + nodesNetworkCacheMap[network] = make(map[string]models.Node) + } + nodesNetworkCacheMap[network][v.ID.String()] = v + } + nodeNetworkCacheMutex.Unlock() +} func loadNodesIntoCache(nMap map[string]models.Node) { nodeCacheMutex.Lock() @@ -63,6 +92,7 @@ func loadNodesIntoCache(nMap map[string]models.Node) { func ClearNodeCache() { nodeCacheMutex.Lock() nodesCacheMap = make(map[string]models.Node) + nodesNetworkCacheMap = make(map[string]map[string]models.Node) nodeCacheMutex.Unlock() } @@ -77,6 +107,12 @@ const ( // GetNetworkNodes - gets the nodes of a network func GetNetworkNodes(network string) ([]models.Node, error) { + + if networkNodes, ok := nodesNetworkCacheMap[network]; ok { + nodeNetworkCacheMutex.Lock() + defer nodeNetworkCacheMutex.Unlock() + return slices.Collect(maps.Values(networkNodes)), nil + } allnodes, err := GetAllNodes() if err != nil { return []models.Node{}, err @@ -99,6 +135,12 @@ func GetHostNodes(host *models.Host) []models.Node { // GetNetworkNodesMemory - gets all nodes belonging to a network from list in memory func GetNetworkNodesMemory(allNodes []models.Node, network string) []models.Node { + + if networkNodes, ok := nodesNetworkCacheMap[network]; ok { + nodeNetworkCacheMutex.Lock() + defer nodeNetworkCacheMutex.Unlock() + return slices.Collect(maps.Values(networkNodes)) + } var nodes = []models.Node{} for i := range allNodes { node := allNodes[i] @@ -123,6 +165,7 @@ func UpdateNodeCheckin(node *models.Node) error { } if servercfg.CacheEnabled() { storeNodeInCache(*node) + storeNodeInNetworkCache(*node, node.Network) } return nil } @@ -140,6 +183,7 @@ func UpsertNode(newNode *models.Node) error { } if servercfg.CacheEnabled() { storeNodeInCache(*newNode) + storeNodeInNetworkCache(*newNode, newNode.Network) } return nil } @@ -179,6 +223,7 @@ func UpdateNode(currentNode *models.Node, newNode *models.Node) error { } if servercfg.CacheEnabled() { storeNodeInCache(*newNode) + storeNodeInNetworkCache(*newNode, newNode.Network) if _, ok := allocatedIpMap[newNode.Network]; ok { if newNode.Address.IP != nil && !newNode.Address.IP.Equal(currentNode.Address.IP) { AddIpToAllocatedIpMap(newNode.Network, newNode.Address.IP) @@ -298,6 +343,7 @@ func DeleteNodeByID(node *models.Node) error { } if servercfg.CacheEnabled() { deleteNodeFromCache(node.ID.String()) + deleteNodeFromNetworkCache(node.ID.String(), node.Network) } if servercfg.IsDNSMode() { SetDNS() @@ -360,6 +406,7 @@ func GetAllNodes() ([]models.Node, error) { nodesMap := make(map[string]models.Node) if servercfg.CacheEnabled() { defer loadNodesIntoCache(nodesMap) + defer loadNodesIntoNetworkCache(nodesMap) } collection, err := database.FetchRecords(database.NODES_TABLE_NAME) if err != nil { @@ -469,6 +516,7 @@ func GetNodeByID(uuid string) (models.Node, error) { } if servercfg.CacheEnabled() { storeNodeInCache(node) + storeNodeInNetworkCache(node, node.Network) } return node, nil } @@ -622,6 +670,7 @@ func createNode(node *models.Node) error { } if servercfg.CacheEnabled() { storeNodeInCache(*node) + storeNodeInNetworkCache(*node, node.Network) } if _, ok := allocatedIpMap[node.Network]; ok { if node.Address.IP != nil { diff --git a/logic/peers.go b/logic/peers.go index 4a00fdb75..e88f3f4d3 100644 --- a/logic/peers.go +++ b/logic/peers.go @@ -96,6 +96,8 @@ func GetPeerUpdateForHost(network string, host *models.Host, allNodes []models.N if !node.Connected || node.PendingDelete || node.Action == models.NODE_DELETE { continue } + // check default policy if all allowed return true + defaultPolicy, _ := GetDefaultPolicy(models.NetworkID(node.Network), models.DevicePolicy) if host.OS == models.OS_Types.IoT { hostPeerUpdate.NodeAddrs = append(hostPeerUpdate.NodeAddrs, node.PrimaryAddressIPNet()) if node.IsRelayed { @@ -259,7 +261,7 @@ func GetPeerUpdateForHost(network string, host *models.Host, allNodes []models.N !peer.PendingDelete && peer.Connected && nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID.String()), nodeacls.NodeID(peer.ID.String())) && - IsNodeAllowedToCommunicate(node, peer) && + (defaultPolicy.Enabled || IsNodeAllowedToCommunicate(node, peer, false)) && (deletedNode == nil || (deletedNode != nil && peer.ID.String() != deletedNode.ID.String())) { peerConfig.AllowedIPs = allowedips // only append allowed IPs if valid connection } diff --git a/logic/proc.go b/logic/proc.go index fec258d75..fadc69316 100644 --- a/logic/proc.go +++ b/logic/proc.go @@ -2,6 +2,7 @@ package logic import ( "os" + "runtime" "runtime/pprof" "github.com/gravitl/netmaker/logger" @@ -22,3 +23,15 @@ func StopCPUProfiling(f *os.File) { pprof.StopCPUProfile() f.Close() } + +func StartMemProfiling() { + f, err := os.OpenFile("/root/data/mem.prof", os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0755) + if err != nil { + logger.Log(0, "could not create Memory profile: ", err.Error()) + } + defer f.Close() + runtime.GC() // get up-to-date statistics + if err = pprof.WriteHeapProfile(f); err != nil { + logger.Log(0, "could not write memory profile: ", err.Error()) + } +} diff --git a/main.go b/main.go index 33eebb71c..566d61cf9 100644 --- a/main.go +++ b/main.go @@ -24,6 +24,7 @@ import ( "github.com/gravitl/netmaker/netclient/ncutils" "github.com/gravitl/netmaker/servercfg" "github.com/gravitl/netmaker/serverctl" + _ "go.uber.org/automaxprocs" "golang.org/x/exp/slog" ) diff --git a/main_ee.go b/main_ee.go index 90001d870..3beb9e189 100644 --- a/main_ee.go +++ b/main_ee.go @@ -3,7 +3,10 @@ package main -import "github.com/gravitl/netmaker/pro" +import ( + "github.com/gravitl/netmaker/pro" + _ "go.uber.org/automaxprocs" +) func init() { pro.InitPro() diff --git a/mq/emqx_on_prem.go b/mq/emqx_on_prem.go index b9cd690cc..ff8c9bf1f 100644 --- a/mq/emqx_on_prem.go +++ b/mq/emqx_on_prem.go @@ -261,7 +261,7 @@ func (e *EmqxOnPrem) CreateDefaultAllowRule() error { if err != nil { return err } - req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/all", bytes.NewReader(payload)) + req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/rules/all", bytes.NewReader(payload)) if err != nil { return err } diff --git a/mq/migrate.go b/mq/migrate.go index ec8b4afb0..5790e4721 100644 --- a/mq/migrate.go +++ b/mq/migrate.go @@ -88,7 +88,15 @@ func SendPullSYN() error { Host: host, } msg, _ := json.Marshal(hostUpdate) - encrypted, encryptErr := encryptMsg(&host, msg) + zipped, err := compressPayload(msg) + if err != nil { + return err + } + encrypted, encryptErr := encryptAESGCM(host.TrafficKeyPublic[0:32], zipped) + if encryptErr != nil { + return encryptErr + } + if encryptErr != nil { continue } diff --git a/mq/mq.go b/mq/mq.go index bb3a7d538..948a8d224 100644 --- a/mq/mq.go +++ b/mq/mq.go @@ -35,7 +35,7 @@ func setMqOptions(user, password string, opts *mqtt.ClientOptions) { opts.SetConnectRetry(true) opts.SetCleanSession(true) opts.SetConnectRetryInterval(time.Second * 1) - opts.SetKeepAlive(time.Second * 10) + opts.SetKeepAlive(time.Second * 15) opts.SetOrderMatters(false) opts.SetWriteTimeout(time.Minute) } diff --git a/mq/publishers.go b/mq/publishers.go index 3b47390a8..28ef1935d 100644 --- a/mq/publishers.go +++ b/mq/publishers.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/google/uuid" "github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/models" @@ -14,11 +15,9 @@ import ( "golang.org/x/exp/slog" ) -var batchSize = servercfg.GetPeerUpdateBatchSize() -var batchUpdate = servercfg.GetBatchPeerUpdate() - // PublishPeerUpdate --- determines and publishes a peer update to all the hosts func PublishPeerUpdate(replacePeers bool) error { + if !servercfg.IsMessageQueueBackend() { return nil } @@ -37,35 +36,20 @@ func PublishPeerUpdate(replacePeers bool) error { return err } - //if batch peer update disabled - if !batchUpdate { - for _, host := range hosts { - host := host - go func(host models.Host) { - if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, nil); err != nil { - logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error()) + for _, host := range hosts { + host := host + time.Sleep(5 * time.Millisecond) + go func(host models.Host) { + if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, nil); err != nil { + id := host.Name + if host.ID != uuid.Nil { + id = host.ID.String() } - }(host) - } - return nil + slog.Error("failed to publish peer update to host", id, ": ", err) + } + }(host) } - //if batch peer update enabled - batchHost := BatchItems(hosts, batchSize) - var wg sync.WaitGroup - for _, v := range batchHost { - hostLen := len(v) - wg.Add(hostLen) - for i := 0; i < hostLen; i++ { - host := hosts[i] - go func(host models.Host) { - if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, &wg); err != nil { - logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error()) - } - }(host) - } - wg.Wait() - } return nil } diff --git a/mq/util.go b/mq/util.go index a38cd7d75..7600ff8e4 100644 --- a/mq/util.go +++ b/mq/util.go @@ -1,12 +1,20 @@ package mq import ( + "bytes" + "compress/gzip" + "crypto/aes" + "crypto/cipher" + "crypto/rand" "errors" "fmt" + "io" "math" "strings" "time" + "unicode" + "github.com/blang/semver" "github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/models" "github.com/gravitl/netmaker/netclient/ncutils" @@ -66,6 +74,39 @@ func BatchItems[T any](items []T, batchSize int) [][]T { return batches } +func compressPayload(data []byte) ([]byte, error) { + var buf bytes.Buffer + zw := gzip.NewWriter(&buf) + if _, err := zw.Write(data); err != nil { + return nil, err + } + zw.Close() + return buf.Bytes(), nil +} +func encryptAESGCM(key, plaintext []byte) ([]byte, error) { + // Create AES block cipher + block, err := aes.NewCipher(key) + if err != nil { + return nil, err + } + + // Create GCM (Galois/Counter Mode) cipher + aesGCM, err := cipher.NewGCM(block) + if err != nil { + return nil, err + } + + // Create a random nonce + nonce := make([]byte, aesGCM.NonceSize()) + if _, err := io.ReadFull(rand.Reader, nonce); err != nil { + return nil, err + } + + // Encrypt the data + ciphertext := aesGCM.Seal(nonce, nonce, plaintext, nil) + return ciphertext, nil +} + func encryptMsg(host *models.Host, msg []byte) ([]byte, error) { if host.OS == models.OS_Types.IoT { return msg, nil @@ -96,10 +137,29 @@ func encryptMsg(host *models.Host, msg []byte) ([]byte, error) { func publish(host *models.Host, dest string, msg []byte) error { - encrypted, encryptErr := encryptMsg(host, msg) - if encryptErr != nil { - return encryptErr + var encrypted []byte + var encryptErr error + vlt, err := versionLessThan(host.Version, "v0.30.0") + if err != nil { + slog.Warn("error checking version less than", "error", err) + return err + } + if vlt { + encrypted, encryptErr = encryptMsg(host, msg) + if encryptErr != nil { + return encryptErr + } + } else { + zipped, err := compressPayload(msg) + if err != nil { + return err + } + encrypted, encryptErr = encryptAESGCM(host.TrafficKeyPublic[0:32], zipped) + if encryptErr != nil { + return encryptErr + } } + if mqclient == nil || !mqclient.IsConnectionOpen() { return errors.New("cannot publish ... mqclient not connected") } @@ -127,3 +187,29 @@ func GetID(topic string) (string, error) { //the last part of the topic will be the node.ID return parts[count-1], nil } + +// versionLessThan checks if v1 < v2 semantically +// dev is the latest version +func versionLessThan(v1, v2 string) (bool, error) { + if v1 == "dev" { + return false, nil + } + if v2 == "dev" { + return true, nil + } + semVer1 := strings.TrimFunc(v1, func(r rune) bool { + return !unicode.IsNumber(r) + }) + semVer2 := strings.TrimFunc(v2, func(r rune) bool { + return !unicode.IsNumber(r) + }) + sv1, err := semver.Parse(semVer1) + if err != nil { + return false, fmt.Errorf("failed to parse semver1 (%s): %w", semVer1, err) + } + sv2, err := semver.Parse(semVer2) + if err != nil { + return false, fmt.Errorf("failed to parse semver2 (%s): %w", semVer2, err) + } + return sv1.LT(sv2), nil +} diff --git a/scripts/netmaker.default.env b/scripts/netmaker.default.env index 1a9d14461..d534abacc 100644 --- a/scripts/netmaker.default.env +++ b/scripts/netmaker.default.env @@ -86,13 +86,11 @@ EMAIL_SENDER_ADDR= EMAIL_SENDER_USER= # sender smtp password EMAIL_SENDER_PASSWORD= -# if batch peer update enable or not -PEER_UPDATE_BATCH=true -# batch peer update size when PEER_UPDATE_BATCH is enabled -PEER_UPDATE_BATCH_SIZE=50 # default domain for internal DNS lookup DEFAULT_DOMAIN=netmaker.hosted # managed dns setting, set to true to resolve dns entries on netmaker network MANAGE_DNS=false +# set to true, old acl is supported, otherwise, old acl is disabled +OLD_ACL_SUPPORT=true # if STUN is set to true, hole punch is called STUN=true diff --git a/servercfg/serverconf.go b/servercfg/serverconf.go index 40d659e0e..21c56d512 100644 --- a/servercfg/serverconf.go +++ b/servercfg/serverconf.go @@ -668,6 +668,14 @@ func GetManageDNS() bool { return enabled } +func IsOldAclEnabled() bool { + enabled := true + if os.Getenv("OLD_ACL_SUPPORT") != "" { + enabled = os.Getenv("OLD_ACL_SUPPORT") == "true" + } + return enabled +} + // GetDefaultDomain - get the default domain func GetDefaultDomain() string { //default netmaker.hosted @@ -690,28 +698,6 @@ func validateDomain(domain string) bool { return exp.MatchString(domain) } -// GetBatchPeerUpdate - if batch peer update -func GetBatchPeerUpdate() bool { - enabled := true - if os.Getenv("PEER_UPDATE_BATCH") != "" { - enabled = os.Getenv("PEER_UPDATE_BATCH") == "true" - } - return enabled -} - -// GetPeerUpdateBatchSize - get the batch size for peer update -func GetPeerUpdateBatchSize() int { - //default 50 - batchSize := 50 - if os.Getenv("PEER_UPDATE_BATCH_SIZE") != "" { - b, e := strconv.Atoi(os.Getenv("PEER_UPDATE_BATCH_SIZE")) - if e == nil && b > 0 && b < 1000 { - batchSize = b - } - } - return batchSize -} - // GetEmqxRestEndpoint - returns the REST API Endpoint of EMQX func GetEmqxRestEndpoint() string { return os.Getenv("EMQX_REST_ENDPOINT")