Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# Copyright 2023 Authors of kcrow
# SPDX-License-Identifier: Apache-2.0

ARG BASE_IMAGE=docker.io/library/busybox:1.36.1
ARG BASE_IMAGE=docker.m.daocloud.io/library/alpine
FROM ${BASE_IMAGE} AS builder
RUN apk add --no-cache xfsprogs-extra xfsprogs

FROM ${BASE_IMAGE}

Expand All @@ -11,6 +13,8 @@ ARG GIT_COMMIT_TIME
ENV GIT_COMMIT_TIME=${GIT_COMMIT_TIME}
ARG VERSION
ENV VERSION=${VERSION}
COPY --from=builder /usr/sbin/xfs_quota /usr/sbin/xfs_quota
RUN apk add --no-cache xfsprogs libedit

COPY bin/* /usr/bin/
CMD ["/usr/bin/daemon daemon"]
16 changes: 16 additions & 0 deletions charts/kcrowdaemon/templates/daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ spec:
{{- end }}
priorityClassName: {{ default "system-node-critical" .Values.controller.priorityClassName }}
hostNetwork: true
hostPID: true
{{- if not .Values.controller.kubeconfig }}
serviceAccountName: {{ .Values.controller.name | trunc 63 | trimSuffix "-" }}
{{- end }}
Expand All @@ -50,6 +51,8 @@ spec:
{{- end }}
containers:
- name: {{ .Values.controller.name | trunc 63 | trimSuffix "-" }}
securityContext:
privileged: true
image: {{ include "kcrow.controller.image" . | quote }}
imagePullPolicy: {{ .Values.controller.image.pullPolicy }}
command:
Expand Down Expand Up @@ -101,6 +104,11 @@ spec:
- mountPath: /var/run/nri/nri.sock
name: socket
readOnly: true
- mountPath: /dev
name: host-dev
- mountPath: /var/lib/containerd
mountPropagation: HostToContainer
name: containerd-dir
volumes:
{{- if .Values.controller.kubeconfig }}
- name: kubeconfig
Expand All @@ -112,3 +120,11 @@ spec:
hostPath:
path: {{ .Values.controller.nriSock }}
type: Socket
- hostPath:
path: /var/lib/containerd
type: Directory
name: containerd-dir
- hostPath:
path: /dev
type: Directory
name: host-dev
4 changes: 3 additions & 1 deletion cmd/daemon/cmd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/grafana/pyroscope-go"
"github.com/kcrow-io/kcrow/pkg"
"github.com/kcrow-io/kcrow/pkg/cgroup"
"github.com/kcrow-io/kcrow/pkg/disk"
"github.com/kcrow-io/kcrow/pkg/k8s"
"github.com/kcrow-io/kcrow/pkg/ulimit"
"github.com/kcrow-io/kcrow/pkg/util"
Expand Down Expand Up @@ -140,10 +141,11 @@ func initControllerServiceManagers(ctrlctx *ControllerContext) {
// init manager
coci := cgroup.CgroupManager(noc, nsc, pom)
roci := ulimit.RlimitManager(noc, nsc, pom)
diskci := disk.DiskManager(nsc, pom)
voli := vmvol.New(ctrlctx.InnerCtx, volm, rmm, pom)

// registry manager
hub, err := pkg.New(ctrlctx.InnerCtx, ctrlctx.Cfg.NriSockPath, coci, roci, voli)
hub, err := pkg.New(ctrlctx.InnerCtx, ctrlctx.Cfg.NriSockPath, coci, roci, voli, diskci)

if err != nil {
panic(err)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module github.com/kcrow-io/kcrow

go 1.21

toolchain go1.24.1

require (
Expand Down
5 changes: 5 additions & 0 deletions pkg/cgroup/manage.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"reflect"
"sync"

"github.com/containerd/nri/pkg/api"
merr "github.com/kcrow-io/kcrow/pkg/errors"
"github.com/kcrow-io/kcrow/pkg/k8s"
"github.com/kcrow-io/kcrow/pkg/oci"
Expand Down Expand Up @@ -104,6 +105,10 @@ func (m *manager) Process(ctx context.Context, im *oci.Item) error {
return nil
}

func (m *manager) Start(ctx context.Context, pod *api.PodSandbox, container *api.Container) error {
return nil
}

// only support [kind].[suffix]
func (m *manager) NodeUpdate(ni *k8s.NodeItem) {
switch ni.Ev {
Expand Down
104 changes: 104 additions & 0 deletions pkg/disk/disk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package disk

import (
"bufio"
"fmt"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
)

func getOverlayPath(containerRootfs string) (uint64, string, error) {
f, err := os.Open(SystemMountInfoFile)
if err != nil {
return 0, "", fmt.Errorf("failed to open host_mountinfo: %v", err)
}
defer f.Close()

scanner := bufio.NewScanner(f)
for scanner.Scan() {
line := scanner.Text()
if !strings.Contains(line, containerRootfs) {
continue
}

fields := strings.Split(line, " - ")
if len(fields) < 2 {
continue
}

preFields := strings.Fields(fields[0])
mountPoint := preFields[4]

if mountPoint != containerRootfs {
continue
}

postFields := strings.Fields(fields[1])
if len(postFields) < 3 {
continue
}
options := postFields[2]
for _, opt := range strings.Split(options, ",") {
if strings.HasPrefix(opt, "upperdir=") {
upperDir := strings.TrimPrefix(opt, "upperdir=")
cleanPath := filepath.Clean(upperDir)
if strings.HasSuffix(cleanPath, "/fs") {
cleanPath = filepath.Dir(cleanPath)
}
snapshotId, err := strconv.ParseUint(filepath.Base(cleanPath), 10, 64)
if err != nil {
return 0, "", fmt.Errorf("failed to parse snapshot id from path [%s]: %v", upperDir, err)
}

return snapshotId, upperDir, nil
}
}
}

return 0, "", fmt.Errorf("overlay path not found in mountinfo for %s", containerRootfs)
}

func applyXFSQuota(projectID uint64, path string, limitMB int) error {
mountPoint := ContainerdRootPath
exec.Command("xfs_quota", "-x", "-c", fmt.Sprintf("project -C -p %s %d", path, projectID), mountPoint).Run()

setupCmd := exec.Command("xfs_quota", "-x", "-c", fmt.Sprintf("project -s -p %s %d", path, projectID), mountPoint)
if out, err := setupCmd.CombinedOutput(); err != nil {
return fmt.Errorf("setup project failed: %s, %v", string(out), err)
}

workpath := getWorkPath(path)

workCmd := exec.Command("xfs_quota", "-x", "-c", fmt.Sprintf("project -s -p %s %d", workpath, projectID), mountPoint)
if out, err := workCmd.CombinedOutput(); err != nil {
return fmt.Errorf("setup project failed: %s, %v", string(out), err)
}

limitCmd := exec.Command("xfs_quota", "-x", "-c", fmt.Sprintf("limit -p bhard=%dm %d", limitMB, projectID), mountPoint)
if out, err := limitCmd.CombinedOutput(); err != nil {
return fmt.Errorf("set limit failed: %s, %v", string(out), err)
}
return nil
}

func getWorkPath(fsPath string) string {
return strings.TrimSuffix(fsPath, "/fs") + "/work"
}

func checkContainerdRootPathQuotaEnabled() bool {
data, _ := os.ReadFile("/proc/mounts")
mountPoint := "/var/lib/containerd"

for _, line := range strings.Split(string(data), "\n") {
fields := strings.Fields(line)
if len(fields) >= 4 && fields[1] == mountPoint {
opts := "," + fields[3] + ","
// Due to the current support of only the xfs file system, only Prjquota is determined, and the ext4 file system is identified as pquota
return strings.Contains(opts, ",prjquota,")
}
}
return false
}
100 changes: 100 additions & 0 deletions pkg/disk/manage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package disk

import (
"context"
"path/filepath"
"strconv"
"sync"

"github.com/containerd/nri/pkg/api"
"github.com/kcrow-io/kcrow/pkg/k8s"
"github.com/kcrow-io/kcrow/pkg/oci"
"k8s.io/klog/v2"
)

const (
DiskAnnotation = "size.disk.kcorw.io"
ContainerdBasePath = "/run/containerd/io.containerd.runtime.v2.task/k8s.io/"
ContainerdRootPath = "/var/lib/containerd"
SystemMountInfoFile = "/proc/1/mountinfo"
)

type manager struct {
po *k8s.PodManage
mu sync.RWMutex
namespace map[string]string
}

func DiskManager(ns *k8s.NsManage, po *k8s.PodManage) oci.Oci {
if !checkContainerdRootPathQuotaEnabled() {
klog.Warning("The disk where /var/lib/containerd is located does not have prjquota enabled, skipping DiskManager init..... ")
return nil
}

m := &manager{
po: po,
namespace: make(map[string]string),
}
ns.Registe(m)
return m
}

func (m *manager) Name() string { return "disk" }

func (m *manager) Process(ctx context.Context, im *oci.Item) error {
return nil
}

func (m *manager) Start(ctx context.Context, pod *api.PodSandbox, container *api.Container) error {

limitStr, ok := pod.Annotations[DiskAnnotation]
if !ok {
limitStr, ok = m.namespace[pod.Namespace]
if !ok {
return nil
}
}

limitMB, err := strconv.Atoi(limitStr)
if err != nil || limitMB <= 0 {
klog.Errorf("Invalid quota limit for %s: %s", pod.Name, limitStr)
return nil
}

rootfsPath := filepath.Join(ContainerdBasePath, container.Id, "rootfs")

klog.V(2).Infof("Applying quota %d MB to container %s (ID: %s) at %s", limitMB, container.Name, container.Id, rootfsPath)

runPath := filepath.Join(ContainerdBasePath, container.Id, "rootfs")
//Obtain the snapshot ID of overlays as the ProjectID of xfs_quota
snapshotID, foundPath, err := getOverlayPath(runPath)
if err == nil && foundPath != "" {
rootfsPath = foundPath
} else {
klog.Errorf("Could not find physical path for container %s", container.Id)
return nil
}

klog.V(2).Infof("Target XFS Quota Path: %s, Quota ProjectID: %v", rootfsPath, snapshotID)

if err := applyXFSQuota(snapshotID, rootfsPath, limitMB); err != nil {
klog.Errorf("Failed to apply quota: %v", err)
}
return nil
}

func (m *manager) NamespaceUpdate(ni *k8s.NsItem) {
switch ni.Ev {
case k8s.AddEvent, k8s.UpdateEvent:
default:
return
}
val := ni.Ns.Annotations[DiskAnnotation]
m.mu.Lock()
defer m.mu.Unlock()
if val != "" {
m.namespace[ni.Ns.GetName()] = val
} else {
delete(m.namespace, ni.Ns.GetName())
}
}
17 changes: 15 additions & 2 deletions pkg/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ func New(ctx context.Context, nripath string, ocis ...oci.Oci) (*Hub, error) {
}

for _, oc := range ocis {
klog.Infof("registe controller '%v'", oc.Name())
hub.rcs = append(hub.rcs, oc)
if oc != nil {
klog.Infof("registe controller '%v'", oc.Name())
hub.rcs = append(hub.rcs, oc)
}
}
return hub, nil
}
Expand Down Expand Up @@ -91,6 +93,17 @@ func (h *Hub) CreateContainer(ctx context.Context, p *api.PodSandbox, ct *api.Co
return adjust, nil, err
}

func (h *Hub) StartContainer(ctx context.Context, p *api.PodSandbox, ct *api.Container) error {
for _, rc := range h.rcs {
// 调用每个插件的 Start 方法
if err := rc.Start(ctx, p, ct); err != nil {
klog.Errorf("controller %s start failed: %v", rc.Name(), err)
return err
}
}
return nil
}

func newStub(rc any, nripath string) (stub.Stub, error) {
return stub.New(rc,
stub.WithSocketPath(nripath),
Expand Down
2 changes: 1 addition & 1 deletion pkg/oci/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ type Item struct {

type Oci interface {
Name() string

Process(context.Context, *Item) error
Start(ctx context.Context, pod *api.PodSandbox, container *api.Container) error
}

func GetPodInfo(ct *api.Container) types.NamespacedName {
Expand Down
5 changes: 5 additions & 0 deletions pkg/ulimit/manage.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"sync"

"github.com/containerd/nri/pkg/api"
merr "github.com/kcrow-io/kcrow/pkg/errors"
"github.com/kcrow-io/kcrow/pkg/k8s"
"github.com/kcrow-io/kcrow/pkg/oci"
Expand Down Expand Up @@ -37,6 +38,10 @@ func (m *manager) Name() string {
return "ulimit"
}

func (m *manager) Start(ctx context.Context, pod *api.PodSandbox, container *api.Container) error {
return nil
}

func (m *manager) Process(ctx context.Context, im *oci.Item) error {
if im == nil || im.Adjust == nil {
return errors.Join(fmt.Errorf("process %s, but data invalid", m.Name()), &merr.InternalError{})
Expand Down
4 changes: 4 additions & 0 deletions pkg/vmvol/manage.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ func (m *manager) Process(ctx context.Context, im *oci.Item) error {
return nil
}

func (m *manager) Start(ctx context.Context, pod *api.PodSandbox, container *api.Container) error {
return nil
}

func kataPrefixPath(cnt *api.Container) string {
return fmt.Sprintf("/run/kata-containers/%s/rootfs", cnt.Id)
}
Loading