From 52700ae56c5d541e711fbd5f27373b3dc200f8dc Mon Sep 17 00:00:00 2001 From: Lorenz Brun Date: Tue, 28 Jan 2025 15:07:08 +0100 Subject: [PATCH] m/n/k8s: add nftables network policy controller MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This integrates my K8s network policy controller. In its current form it does not have many guarantees as the custom CNI plugin is not yet in there but it mostly works. Also there is still a DNS hole as host-local services are not properly policed yet. It has a basic smoke test using the connectivity testing helper as well as some metrics to make sure it is integrated properly and to be able to monitor its performance. Change-Id: Ia2f54b9975361270678ce742ae5e32df25e515c5 Reviewed-on: https://review.monogon.dev/c/monogon/+/3740 Tested-by: Jenkins CI Reviewed-by: Jan Schär --- build/bazel/go.MODULE.bazel | 1 + go.mod | 4 +- go.sum | 4 + metropolis/node/kubernetes/BUILD.bazel | 1 + .../node/kubernetes/networkpolicy/BUILD.bazel | 23 +++ .../kubernetes/networkpolicy/networkpolicy.go | 141 ++++++++++++++++++ .../kubernetes/reconciler/resources_rbac.go | 12 +- metropolis/node/kubernetes/service_worker.go | 6 + .../test/e2e/suites/kubernetes/BUILD.bazel | 2 + .../test/e2e/suites/kubernetes/run_test.go | 47 ++++++ 10 files changed, 239 insertions(+), 2 deletions(-) create mode 100644 metropolis/node/kubernetes/networkpolicy/BUILD.bazel create mode 100644 metropolis/node/kubernetes/networkpolicy/networkpolicy.go diff --git a/build/bazel/go.MODULE.bazel b/build/bazel/go.MODULE.bazel index 2e5402dc..20e7dc41 100644 --- a/build/bazel/go.MODULE.bazel +++ b/build/bazel/go.MODULE.bazel @@ -88,6 +88,7 @@ use_repo( "io_k8s_kubernetes", "io_k8s_pod_security_admission", "io_k8s_utils", + "org_dolansoft_git_dolansoft_k8s_nft_npc", "org_go4_netipx", "org_golang_google_api", "org_golang_google_genproto_googleapis_api", diff --git a/go.mod b/go.mod index 481bc0d3..ebfbf6ad 100644 --- a/go.mod +++ b/go.mod @@ -58,6 +58,7 @@ replace github.com/prometheus/procfs => github.com/prometheus/procfs v0.14.0 require ( 4d63.com/gocheckcompilerdirectives v1.2.1 cloud.google.com/go/storage v1.38.0 + git.dolansoft.org/dolansoft/k8s-nft-npc v0.0.0-20250205205926-b7f770fa8391 github.com/adrg/xdg v0.4.0 github.com/bazelbuild/rules_go v0.52.0 github.com/cavaliergopher/cpio v1.0.1 @@ -146,7 +147,7 @@ require ( k8s.io/client-go v0.32.0 k8s.io/component-base v0.32.0 k8s.io/klog/v2 v2.130.1 - k8s.io/kubectl v0.0.0 + k8s.io/kubectl v0.32.0 k8s.io/kubelet v0.32.0 k8s.io/kubernetes v1.32.0 k8s.io/pod-security-admission v0.0.0 @@ -280,6 +281,7 @@ require ( github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/hodgesds/perf-utils v0.7.0 // indirect + github.com/igrmk/treemap/v2 v2.0.1 // indirect github.com/illumos/go-kstat v0.0.0-20210513183136-173c9b0a9973 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/intel/goresctrl v0.8.0 // indirect diff --git a/go.sum b/go.sum index c838a8d9..8e150755 100644 --- a/go.sum +++ b/go.sum @@ -1356,6 +1356,8 @@ dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s= dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8= +git.dolansoft.org/dolansoft/k8s-nft-npc v0.0.0-20250205205926-b7f770fa8391 h1:VcDYYx80mOeRWBwBr2Hs1grbz1E1Tmf0yrJEZuF2L6U= +git.dolansoft.org/dolansoft/k8s-nft-npc v0.0.0-20250205205926-b7f770fa8391/go.mod h1:JVUzK3P8vcS9HGrEDu4Ye+Ll4g3hxJr/DDYkpiuNZik= git.sr.ht/~sbinet/gg v0.3.1/go.mod h1:KGYtlADtqsqANL9ueOFkWymvzUvLMQllU5Ixo+8v3pc= github.com/AdaLogics/go-fuzz-headers v0.0.0-20210715213245-6c3934b029d8/go.mod h1:CzsSbkDixRphAF5hS6wbMKq0eI6ccJRb7/A0M6JBnwg= github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6 h1:He8afgbRMd7mFxO99hRNu+6tazq8nFF9lIwo9JFroBk= @@ -2535,6 +2537,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= github.com/ianlancetaylor/demangle v0.0.0-20240312041847-bd984b5ce465/go.mod h1:gx7rwoVhcfuVKG5uya9Hs3Sxj7EIvldVofAWIUtGouw= +github.com/igrmk/treemap/v2 v2.0.1 h1:Jhy4z3yhATvYZMWCmxsnHO5NnNZBdueSzvxh6353l+0= +github.com/igrmk/treemap/v2 v2.0.1/go.mod h1:PkTPvx+8OHS8/41jnnyVY+oVsfkaOUZGcr+sfonosd4= github.com/illumos/go-kstat v0.0.0-20210513183136-173c9b0a9973 h1:hk4LPqXIY/c9XzRbe7dA6qQxaT6Axcbny0L/G5a4owQ= github.com/illumos/go-kstat v0.0.0-20210513183136-173c9b0a9973/go.mod h1:PoK3ejP3LJkGTzKqRlpvCIFas3ncU02v8zzWDW+g0FY= github.com/imdario/mergo v0.3.4/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= diff --git a/metropolis/node/kubernetes/BUILD.bazel b/metropolis/node/kubernetes/BUILD.bazel index 636295cd..8b8c1c5e 100644 --- a/metropolis/node/kubernetes/BUILD.bazel +++ b/metropolis/node/kubernetes/BUILD.bazel @@ -33,6 +33,7 @@ go_library( "//metropolis/node/kubernetes/clusternet", "//metropolis/node/kubernetes/metricsprovider", "//metropolis/node/kubernetes/metricsproxy", + "//metropolis/node/kubernetes/networkpolicy", "//metropolis/node/kubernetes/nfproxy", "//metropolis/node/kubernetes/pki", "//metropolis/node/kubernetes/plugins/kvmdevice", diff --git a/metropolis/node/kubernetes/networkpolicy/BUILD.bazel b/metropolis/node/kubernetes/networkpolicy/BUILD.bazel new file mode 100644 index 00000000..d3d3b760 --- /dev/null +++ b/metropolis/node/kubernetes/networkpolicy/BUILD.bazel @@ -0,0 +1,23 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "networkpolicy", + srcs = ["networkpolicy.go"], + importpath = "source.monogon.dev/metropolis/node/kubernetes/networkpolicy", + visibility = ["//visibility:public"], + deps = [ + "//go/logging", + "//metropolis/node", + "//osbase/supervisor", + "@io_k8s_api//core/v1:core", + "@io_k8s_client_go//informers", + "@io_k8s_client_go//kubernetes", + "@io_k8s_client_go//kubernetes/typed/core/v1:core", + "@io_k8s_client_go//tools/cache", + "@io_k8s_client_go//tools/cache/synctrack", + "@io_k8s_client_go//tools/record", + "@io_k8s_client_go//util/workqueue", + "@io_k8s_kubectl//pkg/scheme", + "@org_dolansoft_git_dolansoft_k8s_nft_npc//nftctrl", + ], +) diff --git a/metropolis/node/kubernetes/networkpolicy/networkpolicy.go b/metropolis/node/kubernetes/networkpolicy/networkpolicy.go new file mode 100644 index 00000000..618ff031 --- /dev/null +++ b/metropolis/node/kubernetes/networkpolicy/networkpolicy.go @@ -0,0 +1,141 @@ +// Copyright The Monogon Project Authors. +// SPDX-License-Identifier: Apache-2.0 + +package networkpolicy + +import ( + "context" + "fmt" + + "git.dolansoft.org/dolansoft/k8s-nft-npc/nftctrl" + v1 "k8s.io/api/core/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/cache/synctrack" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + "k8s.io/kubectl/pkg/scheme" + + "source.monogon.dev/go/logging" + "source.monogon.dev/metropolis/node" + "source.monogon.dev/osbase/supervisor" +) + +type Service struct { + Kubernetes kubernetes.Interface +} + +type workItem struct { + typ string + name cache.ObjectName +} + +type updateEnqueuer struct { + typ string + q workqueue.TypedInterface[workItem] + hasProcessed *synctrack.AsyncTracker[workItem] + l logging.Leveled +} + +func (c *updateEnqueuer) OnAdd(obj interface{}, isInInitialList bool) { + name, err := cache.ObjectToName(obj) + if err != nil { + c.l.Warningf("OnAdd name for type %q cannot be derived: %v", c.typ, err) + return + } + item := workItem{typ: c.typ, name: name} + if isInInitialList { + c.hasProcessed.Start(item) + } + c.q.Add(item) +} + +func (c *updateEnqueuer) OnUpdate(oldObj, newObj interface{}) { + name, err := cache.ObjectToName(newObj) + if err != nil { + c.l.Warningf("OnUpdate name for type %q cannot be derived: %v", c.typ, err) + return + } + c.q.Add(workItem{typ: c.typ, name: name}) +} + +func (c *updateEnqueuer) OnDelete(obj interface{}) { + name, err := cache.DeletionHandlingObjectToName(obj) + if err != nil { + c.l.Warningf("OnDelete name for type %q cannot be derived: %v", c.typ, err) + return + } + c.q.Add(workItem{typ: c.typ, name: name}) +} + +func (c *Service) Run(ctx context.Context) error { + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: c.Kubernetes.CoreV1().Events("")}) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "npc"}) + + nft := nftctrl.New(recorder, node.LinkGroupK8sPod) + defer nft.Close() + l := supervisor.Logger(ctx) + + informerFactory := informers.NewSharedInformerFactory(c.Kubernetes, 0) + q := workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[workItem]{ + Name: "networkpolicy", + }) + + var hasProcessed synctrack.AsyncTracker[workItem] + + nsInformer := informerFactory.Core().V1().Namespaces() + nsHandler, _ := nsInformer.Informer().AddEventHandler(&updateEnqueuer{q: q, typ: "ns", hasProcessed: &hasProcessed, l: l}) + podInformer := informerFactory.Core().V1().Pods() + podHandler, _ := podInformer.Informer().AddEventHandler(&updateEnqueuer{q: q, typ: "pod", hasProcessed: &hasProcessed, l: l}) + nwpInformer := informerFactory.Networking().V1().NetworkPolicies() + nwpHandler, _ := nwpInformer.Informer().AddEventHandler(&updateEnqueuer{q: q, typ: "nwp", hasProcessed: &hasProcessed, l: l}) + hasProcessed.UpstreamHasSynced = func() bool { + return nsHandler.HasSynced() && podHandler.HasSynced() && nwpHandler.HasSynced() + } + informerFactory.Start(ctx.Done()) + + go func() { + <-ctx.Done() + q.ShutDown() + informerFactory.Shutdown() + }() + + hasSynced := false + for { + if ctx.Err() != nil { + return ctx.Err() + } + i, shut := q.Get() + if shut { + return ctx.Err() + } + switch i.typ { + case "pod": + pod, _ := podInformer.Lister().Pods(i.name.Namespace).Get(i.name.Name) + nft.SetPod(i.name, pod) + case "nwp": + nwp, _ := nwpInformer.Lister().NetworkPolicies(i.name.Namespace).Get(i.name.Name) + nft.SetNetworkPolicy(i.name, nwp) + case "ns": + ns, _ := nsInformer.Lister().Get(i.name.Name) + nft.SetNamespace(i.name.Name, ns) + } + hasProcessed.Finished(i) + if hasSynced { + if err := nft.Flush(); err != nil { + return fmt.Errorf("failed to flush after update of %s %v: %w", i.typ, i.name, err) + } + } else if hasProcessed.HasSynced() { + if err := nft.Flush(); err != nil { + return fmt.Errorf("initial flush failed: %w", err) + } + l.Info("Initial sync completed") + supervisor.Signal(ctx, supervisor.SignalHealthy) + hasSynced = true + } + q.Done(i) + } +} diff --git a/metropolis/node/kubernetes/reconciler/resources_rbac.go b/metropolis/node/kubernetes/reconciler/resources_rbac.go index 60d1ba4d..9ce99422 100644 --- a/metropolis/node/kubernetes/reconciler/resources_rbac.go +++ b/metropolis/node/kubernetes/reconciler/resources_rbac.go @@ -94,7 +94,17 @@ func (r resourceClusterRoles) Expected() []meta.Object { }, { APIGroups: []string{""}, - Resources: []string{"services", "nodes", "namespaces"}, + Resources: []string{"services", "nodes", "namespaces", "pods"}, + Verbs: []string{"get", "list", "watch"}, + }, + { + APIGroups: []string{""}, + Resources: []string{"events"}, + Verbs: []string{"get", "list", "watch", "create", "update", "patch"}, + }, + { + APIGroups: []string{"networking.k8s.io"}, + Resources: []string{"networkpolicies"}, Verbs: []string{"get", "list", "watch"}, }, }, diff --git a/metropolis/node/kubernetes/service_worker.go b/metropolis/node/kubernetes/service_worker.go index 6f6633bd..8ca85563 100644 --- a/metropolis/node/kubernetes/service_worker.go +++ b/metropolis/node/kubernetes/service_worker.go @@ -23,6 +23,7 @@ import ( "source.monogon.dev/metropolis/node/core/network" "source.monogon.dev/metropolis/node/kubernetes/clusternet" "source.monogon.dev/metropolis/node/kubernetes/metricsprovider" + "source.monogon.dev/metropolis/node/kubernetes/networkpolicy" "source.monogon.dev/metropolis/node/kubernetes/nfproxy" kpki "source.monogon.dev/metropolis/node/kubernetes/pki" "source.monogon.dev/metropolis/node/kubernetes/plugins/kvmdevice" @@ -209,6 +210,10 @@ func (s *Worker) Run(ctx context.Context) error { ClientSet: clients["netserv"].client, } + npc := networkpolicy.Service{ + Kubernetes: clients["netserv"].client, + } + var dnsIPRanges []netip.Prefix for _, ipNet := range []net.IPNet{s.c.ServiceIPRange, s.c.ClusterNet} { ipPrefix, err := netip.ParsePrefix(ipNet.String()) @@ -250,6 +255,7 @@ func (s *Worker) Run(ctx context.Context) error { {"nfproxy", nfproxy.Run}, {"dns-service", dnsService.Run}, {"dns-listener", runDNSListener}, + {"npc", npc.Run}, {"kvmdeviceplugin", kvmDevicePlugin.Run}, {"kubelet", kubelet.Run}, } { diff --git a/metropolis/test/e2e/suites/kubernetes/BUILD.bazel b/metropolis/test/e2e/suites/kubernetes/BUILD.bazel index ac449eb4..99731a32 100644 --- a/metropolis/test/e2e/suites/kubernetes/BUILD.bazel +++ b/metropolis/test/e2e/suites/kubernetes/BUILD.bazel @@ -43,8 +43,10 @@ go_test( "//metropolis/test/util", "@io_bazel_rules_go//go/runfiles", "@io_k8s_api//core/v1:core", + "@io_k8s_api//networking/v1:networking", "@io_k8s_apimachinery//pkg/api/errors", "@io_k8s_apimachinery//pkg/apis/meta/v1:meta", + "@io_k8s_apimachinery//pkg/util/intstr", "@io_k8s_kubernetes//pkg/api/v1/pod", "@io_k8s_utils//ptr", "@org_golang_google_protobuf//types/known/fieldmaskpb", diff --git a/metropolis/test/e2e/suites/kubernetes/run_test.go b/metropolis/test/e2e/suites/kubernetes/run_test.go index 939c6630..ee39c1cb 100644 --- a/metropolis/test/e2e/suites/kubernetes/run_test.go +++ b/metropolis/test/e2e/suites/kubernetes/run_test.go @@ -22,8 +22,10 @@ import ( "github.com/bazelbuild/rules_go/go/runfiles" "google.golang.org/protobuf/types/known/fieldmaskpb" corev1 "k8s.io/api/core/v1" + nwkv1 "k8s.io/api/networking/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" podv1 "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/utils/ptr" @@ -392,6 +394,51 @@ func TestE2EKubernetes(t *testing.T) { }) ct.TestPodConnectivity(t, 0, 1, 1234, connectivity.ExpectedSuccess) }) + t.Run("Network Policy Smoke Test", func(t *testing.T) { + ct := connectivity.SetupTest(t, &connectivity.TestSpec{ + Name: "npc-smoke", + ClientSet: clientSet, + RESTConfig: restConfig, + NumPods: 2, + ExtraPodConfig: func(i int, pod *corev1.Pod) { + // Spread pods out over nodes to test inter-node network + pod.Labels = make(map[string]string) + pod.Labels["name"] = "npc-smoke" + pod.Spec.TopologySpreadConstraints = []corev1.TopologySpreadConstraint{{ + MaxSkew: 1, + TopologyKey: "kubernetes.io/hostname", + WhenUnsatisfiable: corev1.DoNotSchedule, + LabelSelector: metav1.SetAsLabelSelector(pod.Labels), + }} + }, + }) + // Test connectivity before applying network policy + ct.TestPodConnectivity(t, 0, 1, 1234, connectivity.ExpectedSuccess) + ct.TestPodConnectivity(t, 0, 1, 1235, connectivity.ExpectedSuccess) + nwp := &nwkv1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "npc-smoke", + }, + Spec: nwkv1.NetworkPolicySpec{ + PodSelector: metav1.LabelSelector{MatchLabels: map[string]string{"name": "npc-smoke"}}, + Ingress: []nwkv1.NetworkPolicyIngressRule{{ + Ports: []nwkv1.NetworkPolicyPort{{ + Protocol: ptr.To(corev1.ProtocolTCP), + Port: &intstr.IntOrString{Type: intstr.Int, IntVal: 1234}, + }}, + From: []nwkv1.NetworkPolicyPeer{{ + PodSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"name": "npc-smoke"}}, + }}, + }}, + }, + } + if _, err := clientSet.NetworkingV1().NetworkPolicies("default").Create(context.Background(), nwp, metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } + // Check if policy is in effect + ct.TestPodConnectivityEventual(t, 0, 1, 1235, connectivity.ExpectedReject, 30*time.Second) + ct.TestPodConnectivity(t, 0, 1, 1234, connectivity.ExpectedSuccess) + }) for _, runtimeClass := range []string{"runc", "gvisor"} { statefulSetName := fmt.Sprintf("test-statefulset-%s", runtimeClass) util.TestEventual(t, fmt.Sprintf("StatefulSet with %s tests", runtimeClass), ctx, smallTestTimeout, func(ctx context.Context) error {