Skip to content

fix image references and increase leniency of the pod anti affinity rule #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/cloudstack-csi-driver/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ RUN apk add --no-cache \
# blkid, mount and umount are required by k8s.io/mount-utils \
blkid \
mount \
umount
umount \
# Provides udevadm for device management
udev

COPY ./bin/cloudstack-csi-driver /cloudstack-csi-driver
ENTRYPOINT ["/cloudstack-csi-driver"]
20 changes: 11 additions & 9 deletions deploy/k8s/controller-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@ spec:
serviceAccountName: cloudstack-csi-controller
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: "app.kubernetes.io/name"
operator: In
values:
- cloudstack-csi-controller
topologyKey: "kubernetes.io/hostname"
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchExpressions:
- key: "app.kubernetes.io/name"
operator: In
values:
- cloudstack-csi-controller
topologyKey: "kubernetes.io/hostname"
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
Expand All @@ -59,7 +61,7 @@ spec:

containers:
- name: cloudstack-csi-controller
image: cloudstack-csi-driver
image: ghcr.io/shapeblue/cloudstack-csi-driver:master
imagePullPolicy: Always
args:
- "controller"
Expand Down
8 changes: 7 additions & 1 deletion deploy/k8s/node-daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ spec:

containers:
- name: cloudstack-csi-node
image: cloudstack-csi-driver
image: ghcr.io/shapeblue/cloudstack-csi-driver:master
imagePullPolicy: IfNotPresent
args:
- "node"
Expand Down Expand Up @@ -64,6 +64,8 @@ spec:
mountPath: /dev
- name: cloud-init-dir
mountPath: /run/cloud-init/
- name: sys-dir
mountPath: /sys
# Comment the above 2 lines and uncomment the next 2 lines for Ignition support
# - name: ignition-dir
# mountPath: /run/metadata
Expand Down Expand Up @@ -177,6 +179,10 @@ spec:
hostPath:
path: /run/cloud-init/
type: Directory
- name: sys-dir
hostPath:
path: /sys
type: Directory
# Comment the above 4 lines and uncomment the next 4 lines for Ignition support
# - name: ignition-dir
# hostPath:
Expand Down
166 changes: 163 additions & 3 deletions pkg/mount/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,22 +80,25 @@ func (m *mounter) GetBlockSizeBytes(devicePath string) (int64, error) {

func (m *mounter) GetDevicePath(ctx context.Context, volumeID string) (string, error) {
backoff := wait.Backoff{
Duration: 1 * time.Second,
Factor: 1.1,
Steps: 15,
Duration: 2 * time.Second,
Factor: 1.5,
Steps: 20,
}

var devicePath string
err := wait.ExponentialBackoffWithContext(ctx, backoff, func(context.Context) (bool, error) {
path, err := m.getDevicePathBySerialID(volumeID)
fmt.Println("path", path)
if err != nil {
fmt.Println("err", err)
return false, err
}
if path != "" {
devicePath = path

return true, nil
}
fmt.Println("probeVolume")
m.probeVolume(ctx)

return false, nil
Expand All @@ -111,22 +114,131 @@ func (m *mounter) GetDevicePath(ctx context.Context, volumeID string) (string, e
}

func (m *mounter) getDevicePathBySerialID(volumeID string) (string, error) {
// First try XenServer device paths
xenDevicePath, err := m.getDevicePathForXenServer(volumeID)
if err != nil {
fmt.Printf("Failed to get VMware device path: %v\n", err)
}
if xenDevicePath != "" {
return xenDevicePath, nil
}

// Try VMware device paths
vmwareDevicePath, err := m.getDevicePathForVMware(volumeID)
if err != nil {
fmt.Printf("Failed to get VMware device path: %v\n", err)
}
if vmwareDevicePath != "" {
return vmwareDevicePath, nil
}
// Fall back to standard device paths (for KVM)
sourcePathPrefixes := []string{"virtio-", "scsi-", "scsi-0QEMU_QEMU_HARDDISK_"}
serial := diskUUIDToSerial(volumeID)
fmt.Printf("Searching for device with serial: %s\n", serial)
for _, prefix := range sourcePathPrefixes {
source := filepath.Join(diskIDPath, prefix+serial)
fmt.Printf("Checking path: %s\n", source)
_, err := os.Stat(source)
if err == nil {
return source, nil
}
if !os.IsNotExist(err) {
fmt.Printf("Not found: %s\n", err.Error())
return "", err
}
}

return "", nil
}

func (m *mounter) getDevicePathForXenServer(volumeID string) (string, error) {
for i := 'b'; i <= 'z'; i++ {
devicePath := fmt.Sprintf("/dev/xvd%c", i)
fmt.Printf("Checking XenServer device path: %s\n", devicePath)

if _, err := os.Stat(devicePath); err == nil {
isBlock, err := m.IsBlockDevice(devicePath)
if err == nil && isBlock {
if m.verifyXenServerDevice(devicePath, volumeID) {
fmt.Printf("Found and verified XenServer device: %s\n", devicePath)
return devicePath, nil
}
}
}
}
return "", fmt.Errorf("device not found for volume %s", volumeID)
}

func (m *mounter) verifyXenServerDevice(devicePath string, volumeID string) bool {
size, err := m.GetBlockSizeBytes(devicePath)
if err != nil {
fmt.Printf("Failed to get device size: %v\n", err)
return false
}
fmt.Printf("Device size: %d bytes\n", size)

mounted, err := m.isDeviceMounted(devicePath)
if err != nil {
fmt.Printf("Failed to check if device is mounted: %v\n", err)
return false
}
if mounted {
fmt.Printf("Device is already mounted: %s\n", devicePath)
return false
}

props, err := m.getDeviceProperties(devicePath)
if err != nil {
fmt.Printf("Failed to get device properties: %v\n", err)
return false
}
fmt.Printf("Device properties: %v\n", props)

return true
}

func (m *mounter) isDeviceMounted(devicePath string) (bool, error) {
output, err := m.Exec.Command("grep", devicePath, "/proc/mounts").Output()
if err != nil {
if strings.Contains(err.Error(), "exit status 1") {
return false, nil
}
return false, err
}
return len(output) > 0, nil
}

func (m *mounter) isDeviceInUse(devicePath string) (bool, error) {
output, err := m.Exec.Command("lsof", devicePath).Output()
if err != nil {
if strings.Contains(err.Error(), "exit status 1") {
return false, nil
}
return false, err
}
return len(output) > 0, nil
}

func (m *mounter) getDeviceProperties(devicePath string) (map[string]string, error) {
output, err := m.Exec.Command("udevadm", "info", "--query=property", devicePath).Output()
if err != nil {
return nil, err
}

props := make(map[string]string)
for _, line := range strings.Split(string(output), "\n") {
if line == "" {
continue
}
parts := strings.Split(line, "=")
if len(parts) == 2 {
props[parts[0]] = parts[1]
}
}

return props, nil
}

func (m *mounter) probeVolume(ctx context.Context) {
logger := klog.FromContext(ctx)
logger.V(2).Info("Scanning SCSI host")
Expand Down Expand Up @@ -279,3 +391,51 @@ func (m *mounter) Unpublish(path string) error {
func (m *mounter) Unstage(path string) error {
return mount.CleanupMountPoint(path, m, true)
}

func (m *mounter) getDevicePathForVMware(volumeID string) (string, error) {
// Loop through /dev/sdb to /dev/sdz (/dev/sda -> the root disk)
for i := 'b'; i <= 'z'; i++ {
devicePath := fmt.Sprintf("/dev/sd%c", i)
fmt.Printf("Checking VMware device path: %s\n", devicePath)

if _, err := os.Stat(devicePath); err == nil {
isBlock, err := m.IsBlockDevice(devicePath)
if err == nil && isBlock {
// Use the same verification as for XenServer
if m.verifyVMwareDevice(devicePath, volumeID) {
fmt.Printf("Found and verified VMware device: %s\n", devicePath)
return devicePath, nil
}
}
}
}
return "", fmt.Errorf("device not found for volume %s", volumeID)
}

func (m *mounter) verifyVMwareDevice(devicePath string, volumeID string) bool {
size, err := m.GetBlockSizeBytes(devicePath)
if err != nil {
fmt.Printf("Failed to get device size: %v\n", err)
return false
}
fmt.Printf("Device size: %d bytes\n", size)

mounted, err := m.isDeviceMounted(devicePath)
if err != nil {
fmt.Printf("Failed to check if device is mounted: %v\n", err)
return false
}
if mounted {
fmt.Printf("Device is already mounted: %s\n", devicePath)
return false
}

props, err := m.getDeviceProperties(devicePath)
if err != nil {
fmt.Printf("Failed to get device properties: %v\n", err)
return false
}
fmt.Printf("Device properties: %v\n", props)

return true
}
Loading