Skip to content

Commit

Permalink
CreateSnapshot and DeleteSnapshot impl on Supervisor (#1873)
Browse files Browse the repository at this point in the history
Signed-off-by: Deepak Kinni <[email protected]>
  • Loading branch information
Deepak Kinni authored Jul 22, 2022
1 parent be062d9 commit 4d49466
Show file tree
Hide file tree
Showing 9 changed files with 438 additions and 5 deletions.
6 changes: 6 additions & 0 deletions pkg/common/unittestcommon/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,3 +258,9 @@ func (c *FakeK8SOrchestrator) GetAllVolumes() []string {
func (c *FakeK8SOrchestrator) GetAllK8sVolumes() []string {
return nil
}

// AnnotateVolumeSnapshot annotates the volumesnapshot CR in k8s cluster
func (c *FakeK8SOrchestrator) AnnotateVolumeSnapshot(ctx context.Context, volumeSnapshotName string,
volumeSnapshotNamespace string, annotations map[string]string) (bool, error) {
return true, nil
}
3 changes: 3 additions & 0 deletions pkg/csi/service/common/commonco/coagnostic.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ type COCommonInterface interface {
GetAllVolumes() []string
// GetAllK8sVolumes returns list of volumes in a bound state, in the K8s cluster
GetAllK8sVolumes() []string
// AnnotateVolumeSnapshot annotates the volumesnapshot CR in k8s cluster with the snapshot-id and fcd-id
AnnotateVolumeSnapshot(ctx context.Context, volumeSnapshotName string,
volumeSnapshotNamespace string, annotations map[string]string) (bool, error)
}

// GetContainerOrchestratorInterface returns orchestrator object for a given
Expand Down
22 changes: 20 additions & 2 deletions pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"sync/atomic"
"time"

snapshotterClientSet "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"

cnstypes "github.com/vmware/govmomi/cns/types"
pbmtypes "github.com/vmware/govmomi/pbm/types"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -199,6 +201,7 @@ type K8sOrchestrator struct {
volumeNameToNodesMap *volumeNameToNodesMap // used when ListVolume FSS is enabled
volumeIDToNameMap *volumeIDToNameMap // used when ListVolume FSS is enabled
k8sClient clientset.Interface
snapshotterClient snapshotterClientSet.Interface
}

// K8sGuestInitParams lists the set of parameters required to run the init for
Expand Down Expand Up @@ -229,8 +232,9 @@ type K8sVanillaInitParams struct {
func Newk8sOrchestrator(ctx context.Context, controllerClusterFlavor cnstypes.CnsClusterFlavor,
params interface{}) (*K8sOrchestrator, error) {
var (
coInstanceErr error
k8sClient clientset.Interface
coInstanceErr error
k8sClient clientset.Interface
snapshotterClient snapshotterClientSet.Interface
)
if atomic.LoadUint32(&k8sOrchestratorInstanceInitialized) == 0 {
k8sOrchestratorInitMutex.Lock()
Expand All @@ -246,9 +250,17 @@ func Newk8sOrchestrator(ctx context.Context, controllerClusterFlavor cnstypes.Cn
return nil, coInstanceErr
}

// Create a snapshotter client
snapshotterClient, coInstanceErr = k8s.NewSnapshotterClient(ctx)
if coInstanceErr != nil {
log.Errorf("Creating Snapshotter client failed. Err: %v", coInstanceErr)
return nil, coInstanceErr
}

k8sOrchestratorInstance = &K8sOrchestrator{}
k8sOrchestratorInstance.clusterFlavor = controllerClusterFlavor
k8sOrchestratorInstance.k8sClient = k8sClient
k8sOrchestratorInstance.snapshotterClient = snapshotterClient
k8sOrchestratorInstance.informerManager = k8s.NewInformer(k8sClient)
coInstanceErr = initFSS(ctx, k8sClient, controllerClusterFlavor, params)
if coInstanceErr != nil {
Expand Down Expand Up @@ -1376,3 +1388,9 @@ func (c *K8sOrchestrator) GetAllVolumes() []string {
}
return volumeIDs
}

// AnnotateVolumeSnapshot annotates the volumesnapshot CR in k8s cluster
func (c *K8sOrchestrator) AnnotateVolumeSnapshot(ctx context.Context, volumeSnapshotName string,
volumeSnapshotNamespace string, annotations map[string]string) (bool, error) {
return c.updateVolumeSnapshotAnnotations(ctx, volumeSnapshotName, volumeSnapshotNamespace, annotations)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ package k8sorchestrator

import (
"context"
"encoding/json"
"strings"
"time"

k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -128,3 +133,60 @@ func isValidMigratedvSphereVolume(ctx context.Context, pvMetadata metav1.ObjectM
}
return false
}

// updateVolumeSnapshotAnnotations updates annotations passed as key-value pairs
// on VolumeSnapshot object
func (c *K8sOrchestrator) updateVolumeSnapshotAnnotations(ctx context.Context,
volumeSnapshotName string, volumeSnapshotNamespace string,
volumeSnapshotAnnotations map[string]string) (bool, error) {
log := logger.GetLogger(ctx)
retryCount := 0
interval := time.Second
limit := 5 * time.Minute
// TODO: make this configurable
// Attempt to update the annotation every second for 5minutes
annotateUpdateErr := wait.PollImmediate(interval, limit, func() (bool, error) {
retryCount++
// Retrieve the volume snapshot and verify that it exists
volumeSnapshot, err := c.snapshotterClient.SnapshotV1().VolumeSnapshots(volumeSnapshotNamespace).
Get(ctx, volumeSnapshotName, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
log.Errorf("attempt: %d, the volumesnapshot %s/%s requested to be annotated with %+v is not found",
retryCount, volumeSnapshotNamespace, volumeSnapshotName, volumeSnapshotAnnotations)
}
log.Errorf("attempt: %d, failed to annotate the volumesnapshot %s/%s due to error: %+v",
retryCount, volumeSnapshotNamespace, volumeSnapshotName, err)
return false, nil
}
patchAnnotation := common.MergeMaps(volumeSnapshot.Annotations, volumeSnapshotAnnotations)

patch := map[string]interface{}{
"metadata": map[string]interface{}{
"annotations": patchAnnotation,
},
}
patchBytes, err := json.Marshal(patch)
if err != nil {
log.Errorf("attempt: %d, fail to marshal patch: %+v", retryCount, err)
return false, nil
}
patchedVolumeSnapshot, err := c.snapshotterClient.SnapshotV1().VolumeSnapshots(volumeSnapshotNamespace).
Patch(ctx, volumeSnapshotName, k8stypes.MergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
log.Errorf("attempt: %d, failed to patch the volumesnapshot %s/%s with annotation %+v, error: %+v",
retryCount, volumeSnapshotNamespace, volumeSnapshotName, volumeSnapshotAnnotations, err)
return false, nil
}
log.Infof("attempt: %d, Successfully patched volumesnapshot %s/%s with latest annotations %+v",
retryCount, patchedVolumeSnapshot.Namespace, patchedVolumeSnapshot.Name,
patchedVolumeSnapshot.Annotations)
return true, nil
})
if annotateUpdateErr != nil {
log.Errorf("failed to patch the volumesnapshot %s/%s with annotation %+v, error: %+v",
volumeSnapshotNamespace, volumeSnapshotName, volumeSnapshotAnnotations, annotateUpdateErr)
return false, annotateUpdateErr
}
return true, nil
}
12 changes: 12 additions & 0 deletions pkg/csi/service/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,18 @@ const (
// PreferredDatastoresCategory points to the vSphere Category
// created to tag preferred datastores in a topology-aware environment.
PreferredDatastoresCategory = "cns.vmware.topology-preferred-datastores"

// VolumeSnapshotNameKey represents the volumesnapshot CR name within
// the request parameters
VolumeSnapshotNameKey = "csi.storage.k8s.io/volumesnapshot/name"

// VolumeSnapshotNamespaceKey represents the volumesnapshot CR namespace within
// the request parameters
VolumeSnapshotNamespaceKey = "csi.storage.k8s.io/volumesnapshot/namespace"

// VolumeSnapshotInfoKey represents the annotation key of the fcd-id + snapshot-id
// on the VolumeSnapshot CR
VolumeSnapshotInfoKey = "csi.vsphere.volume/snapshot"
)

// Supported container orchestrators.
Expand Down
13 changes: 13 additions & 0 deletions pkg/csi/service/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,3 +431,16 @@ func GetClusterComputeResourceMoIds(ctx context.Context) ([]string, error) {
}
return clusterComputeResourceMoIds, nil
}

// MergeMaps merges two maps to create a new one, the key-value pair from first
// are replaced with key-value pair of second
func MergeMaps(first map[string]string, second map[string]string) map[string]string {
merged := make(map[string]string)
for key, val := range first {
merged[key] = val
}
for key, val := range second {
merged[key] = val
}
return merged
}
164 changes: 161 additions & 3 deletions pkg/csi/service/wcp/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
"strings"
"time"

"github.com/vmware/govmomi/vim25/types"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/davecgh/go-spew/spew"
"github.com/fsnotify/fsnotify"
Expand Down Expand Up @@ -1309,8 +1312,136 @@ func (c *controller) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshot

ctx = logger.NewContextWithLogger(ctx)
log := logger.GetLogger(ctx)
log.Infof("CreateSnapshot: called with args %+v", *req)
return nil, status.Error(codes.Unimplemented, "")
log.Infof("WCP CreateSnapshot: called with args %+v", *req)
isBlockVolumeSnapshotWCPEnabled := commonco.ContainerOrchestratorUtility.IsFSSEnabled(ctx, common.BlockVolumeSnapshot)
if !isBlockVolumeSnapshotWCPEnabled {
return nil, logger.LogNewErrorCode(log, codes.Unimplemented, "createSnapshot")
}
volumeType := prometheus.PrometheusUnknownVolumeType
createSnapshotInternal := func() (*csi.CreateSnapshotResponse, error) {
// Validate CreateSnapshotRequest
if err := validateWCPCreateSnapshotRequest(ctx, req); err != nil {
return nil, logger.LogNewErrorCodef(log, codes.Internal,
"validation for CreateSnapshot Request: %+v has failed. Error: %v", *req, err)
}
volumeID := req.GetSourceVolumeId()

// Check if the source volume is migrated vSphere volume
if strings.Contains(volumeID, ".vmdk") {
return nil, logger.LogNewErrorCodef(log, codes.Unimplemented,
"cannot snapshot migrated vSphere volume. :%q", volumeID)
}
volumeType = prometheus.PrometheusBlockVolumeType
// Query capacity in MB and datastore url for block volume snapshot
volumeIds := []cnstypes.CnsVolumeId{{Id: volumeID}}
cnsVolumeDetailsMap, err := utils.QueryVolumeDetailsUtil(ctx, c.manager.VolumeManager, volumeIds)
if err != nil {
return nil, err
}
if _, ok := cnsVolumeDetailsMap[volumeID]; !ok {
return nil, logger.LogNewErrorCodef(log, codes.Internal,
"cns query volume did not return the volume: %s", volumeID)
}
snapshotSizeInMB := cnsVolumeDetailsMap[volumeID].SizeInMB
datastoreUrl := cnsVolumeDetailsMap[volumeID].DatastoreUrl
if cnsVolumeDetailsMap[volumeID].VolumeType != common.BlockVolumeType {
return nil, logger.LogNewErrorCodef(log, codes.FailedPrecondition,
"queried volume doesn't have the expected volume type. Expected VolumeType: %v. "+
"Queried VolumeType: %v", volumeType, cnsVolumeDetailsMap[volumeID].VolumeType)
}
// Check if snapshots number of this volume reaches the granular limit on VSAN/VVOL
maxSnapshotsPerBlockVolume := c.manager.CnsConfig.Snapshot.GlobalMaxSnapshotsPerBlockVolume
log.Infof("The limit of the maximum number of snapshots per block volume is "+
"set to the global maximum (%v) by default.", maxSnapshotsPerBlockVolume)
if c.manager.CnsConfig.Snapshot.GranularMaxSnapshotsPerBlockVolumeInVSAN > 0 ||
c.manager.CnsConfig.Snapshot.GranularMaxSnapshotsPerBlockVolumeInVVOL > 0 {

var isGranularMaxEnabled bool
if strings.Contains(datastoreUrl, strings.ToLower(string(types.HostFileSystemVolumeFileSystemTypeVsan))) {
if c.manager.CnsConfig.Snapshot.GranularMaxSnapshotsPerBlockVolumeInVSAN > 0 {
maxSnapshotsPerBlockVolume = c.manager.CnsConfig.Snapshot.GranularMaxSnapshotsPerBlockVolumeInVSAN
isGranularMaxEnabled = true

}
} else if strings.Contains(datastoreUrl, strings.ToLower(string(types.HostFileSystemVolumeFileSystemTypeVVOL))) {
if c.manager.CnsConfig.Snapshot.GranularMaxSnapshotsPerBlockVolumeInVVOL > 0 {
maxSnapshotsPerBlockVolume = c.manager.CnsConfig.Snapshot.GranularMaxSnapshotsPerBlockVolumeInVVOL
isGranularMaxEnabled = true
}
}

if isGranularMaxEnabled {
log.Infof("The limit of the maximum number of snapshots per block volume on datastore %q is "+
"overridden by the granular maximum (%v).", datastoreUrl, maxSnapshotsPerBlockVolume)
}
}

// Check if snapshots number of this volume reaches the limit
snapshotList, _, err := common.QueryVolumeSnapshotsByVolumeID(ctx, c.manager.VolumeManager, volumeID,
common.QuerySnapshotLimit)
if err != nil {
return nil, logger.LogNewErrorCodef(log, codes.Internal,
"failed to query snapshots of volume %s for the limit check. Error: %v", volumeID, err)
}

if len(snapshotList) >= maxSnapshotsPerBlockVolume {
return nil, logger.LogNewErrorCodef(log, codes.FailedPrecondition,
"the number of snapshots on the source volume %s reaches the configured maximum (%v)",
volumeID, c.manager.CnsConfig.Snapshot.GlobalMaxSnapshotsPerBlockVolume)
}

// the returned snapshotID below is a combination of CNS VolumeID and CNS SnapshotID concatenated by the "+"
// sign. That is, a string of "<UUID>+<UUID>". Because, all other CNS snapshot APIs still require both
// VolumeID and SnapshotID as the input, while corresponding snapshot APIs in upstream CSI require SnapshotID.
// So, we need to bridge the gap in vSphere CSI driver and return a combined SnapshotID to CSI Snapshotter.
snapshotID, snapshotCreateTimePtr, err := common.CreateSnapshotUtil(ctx, c.manager, volumeID, req.Name)
if err != nil {
return nil, logger.LogNewErrorCodef(log, codes.Internal,
"failed to create snapshot on volume %q: %v", volumeID, err)
}
snapshotCreateTimeInProto := timestamppb.New(*snapshotCreateTimePtr)

createSnapshotResponse := &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
SizeBytes: snapshotSizeInMB * common.MbInBytes,
SnapshotId: snapshotID,
SourceVolumeId: volumeID,
CreationTime: snapshotCreateTimeInProto,
ReadyToUse: true,
},
}

log.Infof("CreateSnapshot succeeded for snapshot %s "+
"on volume %s size %d Time proto %+v Timestamp %+v Response: %+v",
snapshotID, volumeID, snapshotSizeInMB*common.MbInBytes, snapshotCreateTimeInProto,
*snapshotCreateTimePtr, createSnapshotResponse)

volumeSnapshotName := req.Parameters[common.VolumeSnapshotNameKey]
volumeSnapshotNamespace := req.Parameters[common.VolumeSnapshotNamespaceKey]

log.Infof("Attempting to annotate volumesnapshot %s/%s with annotation %s:%s",
volumeSnapshotNamespace, volumeSnapshotName, common.VolumeSnapshotInfoKey, snapshotID)
annotated, err := commonco.ContainerOrchestratorUtility.AnnotateVolumeSnapshot(ctx, volumeSnapshotName,
volumeSnapshotNamespace, map[string]string{common.VolumeSnapshotInfoKey: snapshotID})
if err != nil || !annotated {
log.Warnf("The snapshot: %s was created successfully, but failed to annotate volumesnapshot %s/%s"+
"with annotation %s:%s. Error: %v", snapshotID, volumeSnapshotNamespace,
volumeSnapshotName, common.VolumeSnapshotInfoKey, snapshotID, err)
}
return createSnapshotResponse, nil
}

start := time.Now()
resp, err := createSnapshotInternal()
if err != nil {
prometheus.CsiControlOpsHistVec.WithLabelValues(volumeType, prometheus.PrometheusCreateSnapshotOpType,
prometheus.PrometheusFailStatus, "NotComputed").Observe(time.Since(start).Seconds())
} else {
log.Infof("Snapshot for volume %q created successfully.", req.GetSourceVolumeId())
prometheus.CsiControlOpsHistVec.WithLabelValues(volumeType, prometheus.PrometheusCreateSnapshotOpType,
prometheus.PrometheusPassStatus, "").Observe(time.Since(start).Seconds())
}
return resp, err
}

func (c *controller) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (
Expand All @@ -1319,7 +1450,34 @@ func (c *controller) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshot
ctx = logger.NewContextWithLogger(ctx)
log := logger.GetLogger(ctx)
log.Infof("DeleteSnapshot: called with args %+v", *req)
return nil, status.Error(codes.Unimplemented, "")
volumeType := prometheus.PrometheusBlockVolumeType
start := time.Now()
isBlockVolumeSnapshotWCPEnabled := commonco.ContainerOrchestratorUtility.IsFSSEnabled(ctx, common.BlockVolumeSnapshot)
if !isBlockVolumeSnapshotWCPEnabled {
return nil, logger.LogNewErrorCode(log, codes.Unimplemented, "deleteSnapshot")
}
deleteSnapshotInternal := func() (*csi.DeleteSnapshotResponse, error) {
csiSnapshotID := req.GetSnapshotId()
err := common.DeleteSnapshotUtil(ctx, c.manager, csiSnapshotID)
if err != nil {
return nil, logger.LogNewErrorCodef(log, codes.Internal,
"Failed to delete WCP snapshot %q. Error: %+v",
csiSnapshotID, err)
}

log.Infof("DeleteSnapshot: successfully deleted snapshot %q", csiSnapshotID)
return &csi.DeleteSnapshotResponse{}, nil
}
resp, err := deleteSnapshotInternal()
if err != nil {
prometheus.CsiControlOpsHistVec.WithLabelValues(volumeType, prometheus.PrometheusDeleteSnapshotOpType,
prometheus.PrometheusFailStatus, "NotComputed").Observe(time.Since(start).Seconds())
} else {
log.Infof("Snapshot %q deleted successfully.", req.SnapshotId)
prometheus.CsiControlOpsHistVec.WithLabelValues(volumeType, prometheus.PrometheusDeleteSnapshotOpType,
prometheus.PrometheusPassStatus, "").Observe(time.Since(start).Seconds())
}
return resp, err
}

func (c *controller) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (
Expand Down
18 changes: 18 additions & 0 deletions pkg/csi/service/wcp/controller_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,24 @@ func validateWCPControllerExpandVolumeRequest(ctx context.Context, req *csi.Cont
return nil
}

// validateWCPCreateSnapshotRequest is the helper function to
// validate CreateSnapshotRequest for CSI driver.
// Function returns error if validation fails otherwise returns nil.
func validateWCPCreateSnapshotRequest(ctx context.Context, req *csi.CreateSnapshotRequest) error {
log := logger.GetLogger(ctx)
volumeID := req.GetSourceVolumeId()
if len(volumeID) == 0 {
return logger.LogNewErrorCode(log, codes.InvalidArgument,
"CreateSnapshot Source Volume ID must be provided")
}

if len(req.Name) == 0 {
return logger.LogNewErrorCode(log, codes.InvalidArgument,
"Snapshot name must be provided")
}
return nil
}

// getK8sCloudOperatorClientConnection is a helper function that creates a
// clientConnection to k8sCloudOperator GRPC service running on syncer container.
func getK8sCloudOperatorClientConnection(ctx context.Context) (*grpc.ClientConn, error) {
Expand Down
Loading

0 comments on commit 4d49466

Please sign in to comment.