diff --git a/build/bazel/go.MODULE.bazel b/build/bazel/go.MODULE.bazel index 2e5402dc..20e7dc41 100644 --- a/build/bazel/go.MODULE.bazel +++ b/build/bazel/go.MODULE.bazel @@ -88,6 +88,7 @@ use_repo( "io_k8s_kubernetes", "io_k8s_pod_security_admission", "io_k8s_utils", + "org_dolansoft_git_dolansoft_k8s_nft_npc", "org_go4_netipx", "org_golang_google_api", "org_golang_google_genproto_googleapis_api", diff --git a/go.mod b/go.mod index 481bc0d3..ebfbf6ad 100644 --- a/go.mod +++ b/go.mod @@ -58,6 +58,7 @@ replace github.com/prometheus/procfs => github.com/prometheus/procfs v0.14.0 require ( 4d63.com/gocheckcompilerdirectives v1.2.1 cloud.google.com/go/storage v1.38.0 + git.dolansoft.org/dolansoft/k8s-nft-npc v0.0.0-20250205205926-b7f770fa8391 github.com/adrg/xdg v0.4.0 github.com/bazelbuild/rules_go v0.52.0 github.com/cavaliergopher/cpio v1.0.1 @@ -146,7 +147,7 @@ require ( k8s.io/client-go v0.32.0 k8s.io/component-base v0.32.0 k8s.io/klog/v2 v2.130.1 - k8s.io/kubectl v0.0.0 + k8s.io/kubectl v0.32.0 k8s.io/kubelet v0.32.0 k8s.io/kubernetes v1.32.0 k8s.io/pod-security-admission v0.0.0 @@ -280,6 +281,7 @@ require ( github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/hodgesds/perf-utils v0.7.0 // indirect + github.com/igrmk/treemap/v2 v2.0.1 // indirect github.com/illumos/go-kstat v0.0.0-20210513183136-173c9b0a9973 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/intel/goresctrl v0.8.0 // indirect diff --git a/go.sum b/go.sum index c838a8d9..8e150755 100644 --- a/go.sum +++ b/go.sum @@ -1356,6 +1356,8 @@ dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s= dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8= +git.dolansoft.org/dolansoft/k8s-nft-npc v0.0.0-20250205205926-b7f770fa8391 h1:VcDYYx80mOeRWBwBr2Hs1grbz1E1Tmf0yrJEZuF2L6U= +git.dolansoft.org/dolansoft/k8s-nft-npc v0.0.0-20250205205926-b7f770fa8391/go.mod h1:JVUzK3P8vcS9HGrEDu4Ye+Ll4g3hxJr/DDYkpiuNZik= git.sr.ht/~sbinet/gg v0.3.1/go.mod h1:KGYtlADtqsqANL9ueOFkWymvzUvLMQllU5Ixo+8v3pc= github.com/AdaLogics/go-fuzz-headers v0.0.0-20210715213245-6c3934b029d8/go.mod h1:CzsSbkDixRphAF5hS6wbMKq0eI6ccJRb7/A0M6JBnwg= github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6 h1:He8afgbRMd7mFxO99hRNu+6tazq8nFF9lIwo9JFroBk= @@ -2535,6 +2537,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= github.com/ianlancetaylor/demangle v0.0.0-20240312041847-bd984b5ce465/go.mod h1:gx7rwoVhcfuVKG5uya9Hs3Sxj7EIvldVofAWIUtGouw= +github.com/igrmk/treemap/v2 v2.0.1 h1:Jhy4z3yhATvYZMWCmxsnHO5NnNZBdueSzvxh6353l+0= +github.com/igrmk/treemap/v2 v2.0.1/go.mod h1:PkTPvx+8OHS8/41jnnyVY+oVsfkaOUZGcr+sfonosd4= github.com/illumos/go-kstat v0.0.0-20210513183136-173c9b0a9973 h1:hk4LPqXIY/c9XzRbe7dA6qQxaT6Axcbny0L/G5a4owQ= github.com/illumos/go-kstat v0.0.0-20210513183136-173c9b0a9973/go.mod h1:PoK3ejP3LJkGTzKqRlpvCIFas3ncU02v8zzWDW+g0FY= github.com/imdario/mergo v0.3.4/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= diff --git a/metropolis/node/kubernetes/BUILD.bazel b/metropolis/node/kubernetes/BUILD.bazel index 636295cd..8b8c1c5e 100644 --- a/metropolis/node/kubernetes/BUILD.bazel +++ b/metropolis/node/kubernetes/BUILD.bazel @@ -33,6 +33,7 @@ go_library( "//metropolis/node/kubernetes/clusternet", "//metropolis/node/kubernetes/metricsprovider", "//metropolis/node/kubernetes/metricsproxy", + "//metropolis/node/kubernetes/networkpolicy", "//metropolis/node/kubernetes/nfproxy", "//metropolis/node/kubernetes/pki", "//metropolis/node/kubernetes/plugins/kvmdevice", diff --git a/metropolis/node/kubernetes/networkpolicy/BUILD.bazel b/metropolis/node/kubernetes/networkpolicy/BUILD.bazel new file mode 100644 index 00000000..d3d3b760 --- /dev/null +++ b/metropolis/node/kubernetes/networkpolicy/BUILD.bazel @@ -0,0 +1,23 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "networkpolicy", + srcs = ["networkpolicy.go"], + importpath = "source.monogon.dev/metropolis/node/kubernetes/networkpolicy", + visibility = ["//visibility:public"], + deps = [ + "//go/logging", + "//metropolis/node", + "//osbase/supervisor", + "@io_k8s_api//core/v1:core", + "@io_k8s_client_go//informers", + "@io_k8s_client_go//kubernetes", + "@io_k8s_client_go//kubernetes/typed/core/v1:core", + "@io_k8s_client_go//tools/cache", + "@io_k8s_client_go//tools/cache/synctrack", + "@io_k8s_client_go//tools/record", + "@io_k8s_client_go//util/workqueue", + "@io_k8s_kubectl//pkg/scheme", + "@org_dolansoft_git_dolansoft_k8s_nft_npc//nftctrl", + ], +) diff --git a/metropolis/node/kubernetes/networkpolicy/networkpolicy.go b/metropolis/node/kubernetes/networkpolicy/networkpolicy.go new file mode 100644 index 00000000..618ff031 --- /dev/null +++ b/metropolis/node/kubernetes/networkpolicy/networkpolicy.go @@ -0,0 +1,141 @@ +// Copyright The Monogon Project Authors. +// SPDX-License-Identifier: Apache-2.0 + +package networkpolicy + +import ( + "context" + "fmt" + + "git.dolansoft.org/dolansoft/k8s-nft-npc/nftctrl" + v1 "k8s.io/api/core/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/cache/synctrack" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + "k8s.io/kubectl/pkg/scheme" + + "source.monogon.dev/go/logging" + "source.monogon.dev/metropolis/node" + "source.monogon.dev/osbase/supervisor" +) + +type Service struct { + Kubernetes kubernetes.Interface +} + +type workItem struct { + typ string + name cache.ObjectName +} + +type updateEnqueuer struct { + typ string + q workqueue.TypedInterface[workItem] + hasProcessed *synctrack.AsyncTracker[workItem] + l logging.Leveled +} + +func (c *updateEnqueuer) OnAdd(obj interface{}, isInInitialList bool) { + name, err := cache.ObjectToName(obj) + if err != nil { + c.l.Warningf("OnAdd name for type %q cannot be derived: %v", c.typ, err) + return + } + item := workItem{typ: c.typ, name: name} + if isInInitialList { + c.hasProcessed.Start(item) + } + c.q.Add(item) +} + +func (c *updateEnqueuer) OnUpdate(oldObj, newObj interface{}) { + name, err := cache.ObjectToName(newObj) + if err != nil { + c.l.Warningf("OnUpdate name for type %q cannot be derived: %v", c.typ, err) + return + } + c.q.Add(workItem{typ: c.typ, name: name}) +} + +func (c *updateEnqueuer) OnDelete(obj interface{}) { + name, err := cache.DeletionHandlingObjectToName(obj) + if err != nil { + c.l.Warningf("OnDelete name for type %q cannot be derived: %v", c.typ, err) + return + } + c.q.Add(workItem{typ: c.typ, name: name}) +} + +func (c *Service) Run(ctx context.Context) error { + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: c.Kubernetes.CoreV1().Events("")}) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "npc"}) + + nft := nftctrl.New(recorder, node.LinkGroupK8sPod) + defer nft.Close() + l := supervisor.Logger(ctx) + + informerFactory := informers.NewSharedInformerFactory(c.Kubernetes, 0) + q := workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[workItem]{ + Name: "networkpolicy", + }) + + var hasProcessed synctrack.AsyncTracker[workItem] + + nsInformer := informerFactory.Core().V1().Namespaces() + nsHandler, _ := nsInformer.Informer().AddEventHandler(&updateEnqueuer{q: q, typ: "ns", hasProcessed: &hasProcessed, l: l}) + podInformer := informerFactory.Core().V1().Pods() + podHandler, _ := podInformer.Informer().AddEventHandler(&updateEnqueuer{q: q, typ: "pod", hasProcessed: &hasProcessed, l: l}) + nwpInformer := informerFactory.Networking().V1().NetworkPolicies() + nwpHandler, _ := nwpInformer.Informer().AddEventHandler(&updateEnqueuer{q: q, typ: "nwp", hasProcessed: &hasProcessed, l: l}) + hasProcessed.UpstreamHasSynced = func() bool { + return nsHandler.HasSynced() && podHandler.HasSynced() && nwpHandler.HasSynced() + } + informerFactory.Start(ctx.Done()) + + go func() { + <-ctx.Done() + q.ShutDown() + informerFactory.Shutdown() + }() + + hasSynced := false + for { + if ctx.Err() != nil { + return ctx.Err() + } + i, shut := q.Get() + if shut { + return ctx.Err() + } + switch i.typ { + case "pod": + pod, _ := podInformer.Lister().Pods(i.name.Namespace).Get(i.name.Name) + nft.SetPod(i.name, pod) + case "nwp": + nwp, _ := nwpInformer.Lister().NetworkPolicies(i.name.Namespace).Get(i.name.Name) + nft.SetNetworkPolicy(i.name, nwp) + case "ns": + ns, _ := nsInformer.Lister().Get(i.name.Name) + nft.SetNamespace(i.name.Name, ns) + } + hasProcessed.Finished(i) + if hasSynced { + if err := nft.Flush(); err != nil { + return fmt.Errorf("failed to flush after update of %s %v: %w", i.typ, i.name, err) + } + } else if hasProcessed.HasSynced() { + if err := nft.Flush(); err != nil { + return fmt.Errorf("initial flush failed: %w", err) + } + l.Info("Initial sync completed") + supervisor.Signal(ctx, supervisor.SignalHealthy) + hasSynced = true + } + q.Done(i) + } +} diff --git a/metropolis/node/kubernetes/reconciler/resources_rbac.go b/metropolis/node/kubernetes/reconciler/resources_rbac.go index 60d1ba4d..9ce99422 100644 --- a/metropolis/node/kubernetes/reconciler/resources_rbac.go +++ b/metropolis/node/kubernetes/reconciler/resources_rbac.go @@ -94,7 +94,17 @@ func (r resourceClusterRoles) Expected() []meta.Object { }, { APIGroups: []string{""}, - Resources: []string{"services", "nodes", "namespaces"}, + Resources: []string{"services", "nodes", "namespaces", "pods"}, + Verbs: []string{"get", "list", "watch"}, + }, + { + APIGroups: []string{""}, + Resources: []string{"events"}, + Verbs: []string{"get", "list", "watch", "create", "update", "patch"}, + }, + { + APIGroups: []string{"networking.k8s.io"}, + Resources: []string{"networkpolicies"}, Verbs: []string{"get", "list", "watch"}, }, }, diff --git a/metropolis/node/kubernetes/service_worker.go b/metropolis/node/kubernetes/service_worker.go index 6f6633bd..8ca85563 100644 --- a/metropolis/node/kubernetes/service_worker.go +++ b/metropolis/node/kubernetes/service_worker.go @@ -23,6 +23,7 @@ import ( "source.monogon.dev/metropolis/node/core/network" "source.monogon.dev/metropolis/node/kubernetes/clusternet" "source.monogon.dev/metropolis/node/kubernetes/metricsprovider" + "source.monogon.dev/metropolis/node/kubernetes/networkpolicy" "source.monogon.dev/metropolis/node/kubernetes/nfproxy" kpki "source.monogon.dev/metropolis/node/kubernetes/pki" "source.monogon.dev/metropolis/node/kubernetes/plugins/kvmdevice" @@ -209,6 +210,10 @@ func (s *Worker) Run(ctx context.Context) error { ClientSet: clients["netserv"].client, } + npc := networkpolicy.Service{ + Kubernetes: clients["netserv"].client, + } + var dnsIPRanges []netip.Prefix for _, ipNet := range []net.IPNet{s.c.ServiceIPRange, s.c.ClusterNet} { ipPrefix, err := netip.ParsePrefix(ipNet.String()) @@ -250,6 +255,7 @@ func (s *Worker) Run(ctx context.Context) error { {"nfproxy", nfproxy.Run}, {"dns-service", dnsService.Run}, {"dns-listener", runDNSListener}, + {"npc", npc.Run}, {"kvmdeviceplugin", kvmDevicePlugin.Run}, {"kubelet", kubelet.Run}, } { diff --git a/metropolis/test/e2e/suites/kubernetes/BUILD.bazel b/metropolis/test/e2e/suites/kubernetes/BUILD.bazel index ac449eb4..99731a32 100644 --- a/metropolis/test/e2e/suites/kubernetes/BUILD.bazel +++ b/metropolis/test/e2e/suites/kubernetes/BUILD.bazel @@ -43,8 +43,10 @@ go_test( "//metropolis/test/util", "@io_bazel_rules_go//go/runfiles", "@io_k8s_api//core/v1:core", + "@io_k8s_api//networking/v1:networking", "@io_k8s_apimachinery//pkg/api/errors", "@io_k8s_apimachinery//pkg/apis/meta/v1:meta", + "@io_k8s_apimachinery//pkg/util/intstr", "@io_k8s_kubernetes//pkg/api/v1/pod", "@io_k8s_utils//ptr", "@org_golang_google_protobuf//types/known/fieldmaskpb", diff --git a/metropolis/test/e2e/suites/kubernetes/run_test.go b/metropolis/test/e2e/suites/kubernetes/run_test.go index 939c6630..ee39c1cb 100644 --- a/metropolis/test/e2e/suites/kubernetes/run_test.go +++ b/metropolis/test/e2e/suites/kubernetes/run_test.go @@ -22,8 +22,10 @@ import ( "github.com/bazelbuild/rules_go/go/runfiles" "google.golang.org/protobuf/types/known/fieldmaskpb" corev1 "k8s.io/api/core/v1" + nwkv1 "k8s.io/api/networking/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" podv1 "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/utils/ptr" @@ -392,6 +394,51 @@ func TestE2EKubernetes(t *testing.T) { }) ct.TestPodConnectivity(t, 0, 1, 1234, connectivity.ExpectedSuccess) }) + t.Run("Network Policy Smoke Test", func(t *testing.T) { + ct := connectivity.SetupTest(t, &connectivity.TestSpec{ + Name: "npc-smoke", + ClientSet: clientSet, + RESTConfig: restConfig, + NumPods: 2, + ExtraPodConfig: func(i int, pod *corev1.Pod) { + // Spread pods out over nodes to test inter-node network + pod.Labels = make(map[string]string) + pod.Labels["name"] = "npc-smoke" + pod.Spec.TopologySpreadConstraints = []corev1.TopologySpreadConstraint{{ + MaxSkew: 1, + TopologyKey: "kubernetes.io/hostname", + WhenUnsatisfiable: corev1.DoNotSchedule, + LabelSelector: metav1.SetAsLabelSelector(pod.Labels), + }} + }, + }) + // Test connectivity before applying network policy + ct.TestPodConnectivity(t, 0, 1, 1234, connectivity.ExpectedSuccess) + ct.TestPodConnectivity(t, 0, 1, 1235, connectivity.ExpectedSuccess) + nwp := &nwkv1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "npc-smoke", + }, + Spec: nwkv1.NetworkPolicySpec{ + PodSelector: metav1.LabelSelector{MatchLabels: map[string]string{"name": "npc-smoke"}}, + Ingress: []nwkv1.NetworkPolicyIngressRule{{ + Ports: []nwkv1.NetworkPolicyPort{{ + Protocol: ptr.To(corev1.ProtocolTCP), + Port: &intstr.IntOrString{Type: intstr.Int, IntVal: 1234}, + }}, + From: []nwkv1.NetworkPolicyPeer{{ + PodSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"name": "npc-smoke"}}, + }}, + }}, + }, + } + if _, err := clientSet.NetworkingV1().NetworkPolicies("default").Create(context.Background(), nwp, metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } + // Check if policy is in effect + ct.TestPodConnectivityEventual(t, 0, 1, 1235, connectivity.ExpectedReject, 30*time.Second) + ct.TestPodConnectivity(t, 0, 1, 1234, connectivity.ExpectedSuccess) + }) for _, runtimeClass := range []string{"runc", "gvisor"} { statefulSetName := fmt.Sprintf("test-statefulset-%s", runtimeClass) util.TestEventual(t, fmt.Sprintf("StatefulSet with %s tests", runtimeClass), ctx, smallTestTimeout, func(ctx context.Context) error {