Skip to content

Commit

Permalink
m/n/k8s: add nftables network policy controller
Browse files Browse the repository at this point in the history
This integrates my K8s network policy controller. In its current form it
does not provide many guarantees, as the custom CNI plugin is not in place
yet, but it mostly works. There is also still a DNS hole, as host-local
services are not properly policed yet.

It has a basic smoke test using the connectivity testing helper as well
as some metrics to make sure it is integrated properly and to be able to
monitor its performance.

Change-Id: Ia2f54b9975361270678ce742ae5e32df25e515c5
Reviewed-on: https://review.monogon.dev/c/monogon/+/3740
Tested-by: Jenkins CI
Reviewed-by: Jan Schär <[email protected]>
  • Loading branch information
lorenz committed Feb 11, 2025
1 parent e8beaed commit 52700ae
Show file tree
Hide file tree
Showing 10 changed files with 239 additions and 2 deletions.
1 change: 1 addition & 0 deletions build/bazel/go.MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ use_repo(
"io_k8s_kubernetes",
"io_k8s_pod_security_admission",
"io_k8s_utils",
"org_dolansoft_git_dolansoft_k8s_nft_npc",
"org_go4_netipx",
"org_golang_google_api",
"org_golang_google_genproto_googleapis_api",
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ replace github.com/prometheus/procfs => github.com/prometheus/procfs v0.14.0
require (
4d63.com/gocheckcompilerdirectives v1.2.1
cloud.google.com/go/storage v1.38.0
git.dolansoft.org/dolansoft/k8s-nft-npc v0.0.0-20250205205926-b7f770fa8391
github.com/adrg/xdg v0.4.0
github.com/bazelbuild/rules_go v0.52.0
github.com/cavaliergopher/cpio v1.0.1
Expand Down Expand Up @@ -146,7 +147,7 @@ require (
k8s.io/client-go v0.32.0
k8s.io/component-base v0.32.0
k8s.io/klog/v2 v2.130.1
k8s.io/kubectl v0.0.0
k8s.io/kubectl v0.32.0
k8s.io/kubelet v0.32.0
k8s.io/kubernetes v1.32.0
k8s.io/pod-security-admission v0.0.0
Expand Down Expand Up @@ -280,6 +281,7 @@ require (
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/hodgesds/perf-utils v0.7.0 // indirect
github.com/igrmk/treemap/v2 v2.0.1 // indirect
github.com/illumos/go-kstat v0.0.0-20210513183136-173c9b0a9973 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/intel/goresctrl v0.8.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1356,6 +1356,8 @@ dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s=
dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8=
git.dolansoft.org/dolansoft/k8s-nft-npc v0.0.0-20250205205926-b7f770fa8391 h1:VcDYYx80mOeRWBwBr2Hs1grbz1E1Tmf0yrJEZuF2L6U=
git.dolansoft.org/dolansoft/k8s-nft-npc v0.0.0-20250205205926-b7f770fa8391/go.mod h1:JVUzK3P8vcS9HGrEDu4Ye+Ll4g3hxJr/DDYkpiuNZik=
git.sr.ht/~sbinet/gg v0.3.1/go.mod h1:KGYtlADtqsqANL9ueOFkWymvzUvLMQllU5Ixo+8v3pc=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20210715213245-6c3934b029d8/go.mod h1:CzsSbkDixRphAF5hS6wbMKq0eI6ccJRb7/A0M6JBnwg=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6 h1:He8afgbRMd7mFxO99hRNu+6tazq8nFF9lIwo9JFroBk=
Expand Down Expand Up @@ -2535,6 +2537,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=
github.com/ianlancetaylor/demangle v0.0.0-20240312041847-bd984b5ce465/go.mod h1:gx7rwoVhcfuVKG5uya9Hs3Sxj7EIvldVofAWIUtGouw=
github.com/igrmk/treemap/v2 v2.0.1 h1:Jhy4z3yhATvYZMWCmxsnHO5NnNZBdueSzvxh6353l+0=
github.com/igrmk/treemap/v2 v2.0.1/go.mod h1:PkTPvx+8OHS8/41jnnyVY+oVsfkaOUZGcr+sfonosd4=
github.com/illumos/go-kstat v0.0.0-20210513183136-173c9b0a9973 h1:hk4LPqXIY/c9XzRbe7dA6qQxaT6Axcbny0L/G5a4owQ=
github.com/illumos/go-kstat v0.0.0-20210513183136-173c9b0a9973/go.mod h1:PoK3ejP3LJkGTzKqRlpvCIFas3ncU02v8zzWDW+g0FY=
github.com/imdario/mergo v0.3.4/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
Expand Down
1 change: 1 addition & 0 deletions metropolis/node/kubernetes/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ go_library(
"//metropolis/node/kubernetes/clusternet",
"//metropolis/node/kubernetes/metricsprovider",
"//metropolis/node/kubernetes/metricsproxy",
"//metropolis/node/kubernetes/networkpolicy",
"//metropolis/node/kubernetes/nfproxy",
"//metropolis/node/kubernetes/pki",
"//metropolis/node/kubernetes/plugins/kvmdevice",
Expand Down
23 changes: 23 additions & 0 deletions metropolis/node/kubernetes/networkpolicy/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

# Go library for the Kubernetes network policy controller runnable. It is
# built from networkpolicy.go and wraps the external nftctrl package from
# k8s-nft-npc together with the client-go informer, cache, record and
# workqueue packages listed in deps.
go_library(
name = "networkpolicy",
srcs = ["networkpolicy.go"],
importpath = "source.monogon.dev/metropolis/node/kubernetes/networkpolicy",
visibility = ["//visibility:public"],
deps = [
"//go/logging",
"//metropolis/node",
"//osbase/supervisor",
"@io_k8s_api//core/v1:core",
"@io_k8s_client_go//informers",
"@io_k8s_client_go//kubernetes",
"@io_k8s_client_go//kubernetes/typed/core/v1:core",
"@io_k8s_client_go//tools/cache",
"@io_k8s_client_go//tools/cache/synctrack",
"@io_k8s_client_go//tools/record",
"@io_k8s_client_go//util/workqueue",
"@io_k8s_kubectl//pkg/scheme",
"@org_dolansoft_git_dolansoft_k8s_nft_npc//nftctrl",
],
)
141 changes: 141 additions & 0 deletions metropolis/node/kubernetes/networkpolicy/networkpolicy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright The Monogon Project Authors.
// SPDX-License-Identifier: Apache-2.0

package networkpolicy

import (
"context"
"fmt"

"git.dolansoft.org/dolansoft/k8s-nft-npc/nftctrl"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/cache/synctrack"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubectl/pkg/scheme"

"source.monogon.dev/go/logging"
"source.monogon.dev/metropolis/node"
"source.monogon.dev/osbase/supervisor"
)

// Service is the network policy controller runnable. Its Run method watches
// Namespaces, Pods and NetworkPolicies and hands every change to nftctrl.
type Service struct {
// Kubernetes is the client used to build the informers and to record
// events.
Kubernetes kubernetes.Interface
}

// workItem identifies a single object which has to be (re)applied by the
// processing loop in Run.
type workItem struct {
// typ selects the kind of object; Run handles "ns", "pod" and "nwp".
typ string
// name is the object's namespace and name. For namespaces, Run only uses
// the Name part.
name cache.ObjectName
}

// updateEnqueuer is an informer event handler which turns every add, update
// and delete notification into a workItem on q.
type updateEnqueuer struct {
// typ is stamped onto every workItem created by this handler.
typ string
// q is the queue receiving the work items.
q workqueue.TypedInterface[workItem]
// hasProcessed is told about every item which is part of the informer's
// initial list (see OnAdd).
hasProcessed *synctrack.AsyncTracker[workItem]
// l logs objects whose name cannot be derived.
l logging.Leveled
}

// OnAdd queues a work item for an object reported as added by the informer.
// Objects which are part of the informer's initial list are additionally
// registered with the hasProcessed tracker before being queued. Objects whose
// name cannot be derived are logged and dropped.
func (c *updateEnqueuer) OnAdd(obj interface{}, isInInitialList bool) {
	objName, err := cache.ObjectToName(obj)
	if err != nil {
		c.l.Warningf("OnAdd name for type %q cannot be derived: %v", c.typ, err)
		return
	}

	added := workItem{
		typ:  c.typ,
		name: objName,
	}
	// Register with the tracker first so the item is already known as
	// started by the time it can be taken off the queue.
	if isInInitialList {
		c.hasProcessed.Start(added)
	}
	c.q.Add(added)
}

// OnUpdate queues a work item for an object reported as changed by the
// informer. Only newObj is inspected, as the work item carries nothing but
// the type and name. Objects whose name cannot be derived are logged and
// dropped.
func (c *updateEnqueuer) OnUpdate(oldObj, newObj interface{}) {
	objName, err := cache.ObjectToName(newObj)
	if err == nil {
		c.q.Add(workItem{typ: c.typ, name: objName})
		return
	}
	c.l.Warningf("OnUpdate name for type %q cannot be derived: %v", c.typ, err)
}

// OnDelete queues a work item for an object reported as removed by the
// informer. The name is resolved with the deletion-handling variant of the
// name helper. Objects whose name cannot be derived are logged and dropped.
func (c *updateEnqueuer) OnDelete(obj interface{}) {
	objName, err := cache.DeletionHandlingObjectToName(obj)
	if err != nil {
		c.l.Warningf("OnDelete name for type %q cannot be derived: %v", c.typ, err)
		return
	}

	removed := workItem{
		typ:  c.typ,
		name: objName,
	}
	c.q.Add(removed)
}

// Run starts the network policy controller. It blocks until ctx is canceled
// or until setting up the informers or flushing to nftables fails.
//
// Namespaces, Pods and NetworkPolicies are watched through shared informers.
// Every change is funneled into a single work queue and applied item by item
// to the nftctrl controller. Nothing is flushed until all objects from the
// informers' initial lists have been applied. After that first flush the
// runnable signals healthy and flushes after every processed item.
func (c *Service) Run(ctx context.Context) error {
	eventBroadcaster := record.NewBroadcaster()
	// Deferred first so it runs last: the recording goroutines are stopped
	// only after everything which might still emit events has been closed.
	defer eventBroadcaster.Shutdown()
	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: c.Kubernetes.CoreV1().Events("")})
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "npc"})

	nft := nftctrl.New(recorder, node.LinkGroupK8sPod)
	defer nft.Close()
	l := supervisor.Logger(ctx)

	// runCtx is canceled when ctx is canceled and also whenever Run returns,
	// so that the queue and informers are torn down on error returns as well.
	runCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	informerFactory := informers.NewSharedInformerFactory(c.Kubernetes, 0)
	q := workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[workItem]{
		Name: "networkpolicy",
	})

	// Shutting down the queue unblocks q.Get() in the processing loop below.
	go func() {
		<-runCtx.Done()
		q.ShutDown()
		informerFactory.Shutdown()
	}()

	var hasProcessed synctrack.AsyncTracker[workItem]

	nsInformer := informerFactory.Core().V1().Namespaces()
	nsHandler, err := nsInformer.Informer().AddEventHandler(&updateEnqueuer{q: q, typ: "ns", hasProcessed: &hasProcessed, l: l})
	if err != nil {
		return fmt.Errorf("failed to add namespace event handler: %w", err)
	}
	podInformer := informerFactory.Core().V1().Pods()
	podHandler, err := podInformer.Informer().AddEventHandler(&updateEnqueuer{q: q, typ: "pod", hasProcessed: &hasProcessed, l: l})
	if err != nil {
		return fmt.Errorf("failed to add pod event handler: %w", err)
	}
	nwpInformer := informerFactory.Networking().V1().NetworkPolicies()
	nwpHandler, err := nwpInformer.Informer().AddEventHandler(&updateEnqueuer{q: q, typ: "nwp", hasProcessed: &hasProcessed, l: l})
	if err != nil {
		return fmt.Errorf("failed to add network policy event handler: %w", err)
	}
	// The tracker only reports synced once all three handlers have delivered
	// their initial lists and every item started in OnAdd has been finished.
	hasProcessed.UpstreamHasSynced = func() bool {
		return nsHandler.HasSynced() && podHandler.HasSynced() && nwpHandler.HasSynced()
	}
	informerFactory.Start(runCtx.Done())

	hasSynced := false
	for {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		i, shut := q.Get()
		if shut {
			return ctx.Err()
		}
		// Lookup errors are deliberately ignored: the work item only carries a
		// name, and an object missing from the lister cache is handed over as
		// nil. NOTE(review): presumably nftctrl treats nil as "object removed"
		// - confirm against nftctrl.
		switch i.typ {
		case "pod":
			pod, _ := podInformer.Lister().Pods(i.name.Namespace).Get(i.name.Name)
			nft.SetPod(i.name, pod)
		case "nwp":
			nwp, _ := nwpInformer.Lister().NetworkPolicies(i.name.Namespace).Get(i.name.Name)
			nft.SetNetworkPolicy(i.name, nwp)
		case "ns":
			ns, _ := nsInformer.Lister().Get(i.name.Name)
			nft.SetNamespace(i.name.Name, ns)
		}
		hasProcessed.Finished(i)
		if hasSynced {
			if err := nft.Flush(); err != nil {
				return fmt.Errorf("failed to flush after update of %s %v: %w", i.typ, i.name, err)
			}
		} else if hasProcessed.HasSynced() {
			// First point at which the complete initial state has been
			// applied; only now are rules flushed and health signaled.
			if err := nft.Flush(); err != nil {
				return fmt.Errorf("initial flush failed: %w", err)
			}
			l.Info("Initial sync completed")
			supervisor.Signal(ctx, supervisor.SignalHealthy)
			hasSynced = true
		}
		q.Done(i)
	}
}
12 changes: 11 additions & 1 deletion metropolis/node/kubernetes/reconciler/resources_rbac.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,17 @@ func (r resourceClusterRoles) Expected() []meta.Object {
},
{
APIGroups: []string{""},
Resources: []string{"services", "nodes", "namespaces"},
Resources: []string{"services", "nodes", "namespaces", "pods"},
Verbs: []string{"get", "list", "watch"},
},
{
APIGroups: []string{""},
Resources: []string{"events"},
Verbs: []string{"get", "list", "watch", "create", "update", "patch"},
},
{
APIGroups: []string{"networking.k8s.io"},
Resources: []string{"networkpolicies"},
Verbs: []string{"get", "list", "watch"},
},
},
Expand Down
6 changes: 6 additions & 0 deletions metropolis/node/kubernetes/service_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"source.monogon.dev/metropolis/node/core/network"
"source.monogon.dev/metropolis/node/kubernetes/clusternet"
"source.monogon.dev/metropolis/node/kubernetes/metricsprovider"
"source.monogon.dev/metropolis/node/kubernetes/networkpolicy"
"source.monogon.dev/metropolis/node/kubernetes/nfproxy"
kpki "source.monogon.dev/metropolis/node/kubernetes/pki"
"source.monogon.dev/metropolis/node/kubernetes/plugins/kvmdevice"
Expand Down Expand Up @@ -209,6 +210,10 @@ func (s *Worker) Run(ctx context.Context) error {
ClientSet: clients["netserv"].client,
}

npc := networkpolicy.Service{
Kubernetes: clients["netserv"].client,
}

var dnsIPRanges []netip.Prefix
for _, ipNet := range []net.IPNet{s.c.ServiceIPRange, s.c.ClusterNet} {
ipPrefix, err := netip.ParsePrefix(ipNet.String())
Expand Down Expand Up @@ -250,6 +255,7 @@ func (s *Worker) Run(ctx context.Context) error {
{"nfproxy", nfproxy.Run},
{"dns-service", dnsService.Run},
{"dns-listener", runDNSListener},
{"npc", npc.Run},
{"kvmdeviceplugin", kvmDevicePlugin.Run},
{"kubelet", kubelet.Run},
} {
Expand Down
2 changes: 2 additions & 0 deletions metropolis/test/e2e/suites/kubernetes/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ go_test(
"//metropolis/test/util",
"@io_bazel_rules_go//go/runfiles",
"@io_k8s_api//core/v1:core",
"@io_k8s_api//networking/v1:networking",
"@io_k8s_apimachinery//pkg/api/errors",
"@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
"@io_k8s_apimachinery//pkg/util/intstr",
"@io_k8s_kubernetes//pkg/api/v1/pod",
"@io_k8s_utils//ptr",
"@org_golang_google_protobuf//types/known/fieldmaskpb",
Expand Down
47 changes: 47 additions & 0 deletions metropolis/test/e2e/suites/kubernetes/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import (
"github.com/bazelbuild/rules_go/go/runfiles"
"google.golang.org/protobuf/types/known/fieldmaskpb"
corev1 "k8s.io/api/core/v1"
nwkv1 "k8s.io/api/networking/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
podv1 "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/utils/ptr"

Expand Down Expand Up @@ -392,6 +394,51 @@ func TestE2EKubernetes(t *testing.T) {
})
ct.TestPodConnectivity(t, 0, 1, 1234, connectivity.ExpectedSuccess)
})
// Smoke test for the network policy controller: two pods which can initially
// reach each other on ports 1234 and 1235 get a policy which only admits TCP
// port 1234, after which connections to port 1235 must be rejected.
t.Run("Network Policy Smoke Test", func(t *testing.T) {
ct := connectivity.SetupTest(t, &connectivity.TestSpec{
Name: "npc-smoke",
ClientSet: clientSet,
RESTConfig: restConfig,
NumPods: 2,
ExtraPodConfig: func(i int, pod *corev1.Pod) {
// Spread pods out over nodes to test inter-node network
pod.Labels = make(map[string]string)
// This label is what the network policy below selects on.
pod.Labels["name"] = "npc-smoke"
pod.Spec.TopologySpreadConstraints = []corev1.TopologySpreadConstraint{{
MaxSkew: 1,
TopologyKey: "kubernetes.io/hostname",
WhenUnsatisfiable: corev1.DoNotSchedule,
LabelSelector: metav1.SetAsLabelSelector(pod.Labels),
}}
},
})
// Test connectivity before applying network policy
ct.TestPodConnectivity(t, 0, 1, 1234, connectivity.ExpectedSuccess)
ct.TestPodConnectivity(t, 0, 1, 1235, connectivity.ExpectedSuccess)
// Policy selecting the test pods and only admitting ingress on TCP port
// 1234 from pods carrying the same label.
nwp := &nwkv1.NetworkPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: "npc-smoke",
},
Spec: nwkv1.NetworkPolicySpec{
PodSelector: metav1.LabelSelector{MatchLabels: map[string]string{"name": "npc-smoke"}},
Ingress: []nwkv1.NetworkPolicyIngressRule{{
Ports: []nwkv1.NetworkPolicyPort{{
Protocol: ptr.To(corev1.ProtocolTCP),
Port: &intstr.IntOrString{Type: intstr.Int, IntVal: 1234},
}},
From: []nwkv1.NetworkPolicyPeer{{
PodSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"name": "npc-smoke"}},
}},
}},
},
}
if _, err := clientSet.NetworkingV1().NetworkPolicies("default").Create(context.Background(), nwp, metav1.CreateOptions{}); err != nil {
t.Fatal(err)
}
// Check if policy is in effect
// The port not covered by the policy is checked with a 30 second eventual
// timeout; the allowed port must still succeed afterwards.
ct.TestPodConnectivityEventual(t, 0, 1, 1235, connectivity.ExpectedReject, 30*time.Second)
ct.TestPodConnectivity(t, 0, 1, 1234, connectivity.ExpectedSuccess)
})
for _, runtimeClass := range []string{"runc", "gvisor"} {
statefulSetName := fmt.Sprintf("test-statefulset-%s", runtimeClass)
util.TestEventual(t, fmt.Sprintf("StatefulSet with %s tests", runtimeClass), ctx, smallTestTimeout, func(ctx context.Context) error {
Expand Down

0 comments on commit 52700ae

Please sign in to comment.