Skip to content

Commit

Permalink
feat(cv): add cstor volume manager/controller using new APIs (#6)
Browse files Browse the repository at this point in the history
* feat(cv): add cstor volume manager/controller using new APIs

Signed-off-by: prateekpandey14 <[email protected]>
  • Loading branch information
prateekpandey14 authored Mar 9, 2020
1 parent 302ed2f commit 1a1ba04
Show file tree
Hide file tree
Showing 113 changed files with 3,873 additions and 14,875 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Compiled binaries and deployment files
/bin/

# Output of the go coverage tool, specifically when used with LiteIDE
*.out
61 changes: 61 additions & 0 deletions cmd/volume-manager/app/commands.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
Copyright 2020 The OpenEBS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package app

import (
"fmt"

"github.com/openebs/api/pkg/util"
"github.com/spf13/cobra"
"k8s.io/klog"
)

var (
cmdName = "cstor-volume-mgmt"
usage = fmt.Sprintf("%s", cmdName)
)

// CmdSnaphotOptions holds the options for snapshot
// create command
type CmdSnaphotOptions struct {
volName string
snapName string
}

// NewCStorVolumeMgmt creates a new CStorVolume CRD watcher and grpc command.
func NewCStorVolumeMgmt() (*cobra.Command, error) {
// Create a new command.
cmd := &cobra.Command{
Use: usage,
Short: "CStor Volume Watcher and GRPC server",
Long: `interfaces between observing the CStorVolume
objects and volume controller creation and GRPC server`,
Run: func(cmd *cobra.Command, args []string) {
util.CheckErr(Run(cmd), util.Fatal)
},
}
cmd.AddCommand(
NewCmdStart(),
)
return cmd, nil
}

// Run is to run cstor-volume-mgmt command without any arguments
func Run(cmd *cobra.Command) error {
klog.Infof("cstor-volume-mgmt for CStorVolume objects and grpc server")
return nil
}
55 changes: 55 additions & 0 deletions cmd/volume-manager/app/commands_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright © 2020 The OpenEBS Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"strconv"
"testing"

"github.com/spf13/cobra"
)

// TestNewCStorVolumeMgmt is to test cstor-volume-mgmt command.
func TestNewCStorVolumeMgmt(t *testing.T) {
cases := []struct {
use string
}{
{"start"},
}
cmd, err := NewCStorVolumeMgmt()
if err != nil {
t.Errorf("Unable to Instantiate cstor-volume-mgmt")
}
cmds := cmd.Commands()
if len(cmds) != len(cases) {
t.Errorf("ExpectedCommands: %d ActualCommands: '%d'", len(cases), len(cmds))
}
for i, c := range cases {
t.Run(strconv.Itoa(i), func(t *testing.T) {
if c.use != cmds[i].Use {
t.Errorf("ExpectedCommand: '%s' ActualCommand: '%s'", c.use, cmds[i].Use)
}
})
}
}

// TestRun is to test running cstor-volume-mgmt without sub-commands.
func TestRun(t *testing.T) {
var cmd *cobra.Command
err := Run(cmd)
if err != nil {
t.Errorf("Expected: '%s' Actual: '%s'", "nil", err)
}
}
71 changes: 71 additions & 0 deletions cmd/volume-manager/app/start.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
Copyright 2020 The OpenEBS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package app

import (
goflag "flag"
"sync"

mgmtcontroller "github.com/openebs/cstor-operators/pkg/controllers/volume-mgmt"
"github.com/openebs/cstor-operators/pkg/controllers/volume-mgmt/volume"
serverclient "github.com/openebs/cstor-operators/pkg/volume-rpc/client"
targetserver "github.com/openebs/cstor-operators/pkg/volume-rpc/targetserver"
"github.com/spf13/cobra"
)

// CmdStartOptions has flags for starting CStorVolume watcher.
type CmdStartOptions struct {
kubeconfig string
port string
}

// NewCmdStart starts gRPC server and watcher for CStorVolume.
func NewCmdStart() *cobra.Command {
options := CmdStartOptions{}
cmd := &cobra.Command{
Use: "start",
Short: "starts CStorVolume gRPC and watcher",
Long: `The grpc server would be serving snapshot requests whereas
the watcher would be watching for add, updat, delete events`,
Run: func(cmd *cobra.Command, args []string) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
targetserver.StartTargetServer(options.kubeconfig)
wg.Done()
}()
wg.Add(1)
go func() {
serverclient.StartServer(volume.UnixSockVar, options.port)
wg.Done()
}()
wg.Add(1)
go func() {
mgmtcontroller.StartControllers(options.kubeconfig)
wg.Done()
}()
wg.Wait()
},
}
goflag.CommandLine.Parse([]string{})
cmd.Flags().StringVar(&options.kubeconfig, "kubeconfig", "",
`kubeconfig needs to be specified if out of cluster`)
cmd.Flags().StringVarP(&options.port, "port", "p", options.port,
"port on which the server should listen on")

return cmd
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2019 The OpenEBS Authors
Copyright 2020 The OpenEBS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -14,23 +14,33 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package openebsio
package main

import (
"k8s.io/apimachinery/pkg/runtime"
)
"os"

// GroupName assigned to openebs.io
const (
GroupName = "openebs.io"
"github.com/openebs/cstor-operators/cmd/volume-manager/app"
cstorlogger "github.com/openebs/cstor-operators/pkg/log"
)

// SchemeBuilderAdditions may be used to add all
// resources defined in the project to a Scheme
var SchemeBuilderAdditions runtime.SchemeBuilder
func main() {
if err := run(); err != nil {
os.Exit(1)
}
os.Exit(0)
}

// Run cstor-volume-mgmt
func run() error {
// Init logging
cstorlogger.InitLogs()
defer cstorlogger.FlushLogs()

// Create & execute new command
cmd, err := app.NewCStorVolumeMgmt()
if err != nil {
return err
}

// AddToScheme adds the provided resource
// to Scheme
func AddToScheme(s *runtime.Scheme) error {
return SchemeBuilderAdditions.AddToScheme(s)
return cmd.Execute()
}
139 changes: 139 additions & 0 deletions pkg/controllers/volume-mgmt/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
Copyright 2020 The OpenEBS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package volumemgmt

import (
"os"

corev1 "k8s.io/api/core/v1"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"

apis "github.com/openebs/api/pkg/apis/cstor/v1"
clientset "github.com/openebs/api/pkg/client/clientset/versioned"
openebsScheme "github.com/openebs/api/pkg/client/clientset/versioned/scheme"
informers "github.com/openebs/api/pkg/client/informers/externalversions"
)

const volumeControllerName = "CStorVolume"

// CStorVolumeController is the controller implementation for CStorVolume resources.
type CStorVolumeController struct {
// kubeclientset is a standard kubernetes clientset
kubeclientset kubernetes.Interface

// clientset is a openebs custom resource package generated for custom API group.
clientset clientset.Interface

// cStorVolumeSynced is used for caches sync to get populated
cStorVolumeSynced cache.InformerSynced

// workqueue is a rate limited work queue. This is used to queue work to be
// processed instead of performing it as soon as a change happens. This
// means we can ensure we only process a fixed amount of resources at a
// time, and makes it easy to ensure we are never processing the same item
// simultaneously in two different workers.
workqueue workqueue.RateLimitingInterface
// recorder is an event recorder for recording Event resources to the
// Kubernetes API.
recorder record.EventRecorder
}

// NewCStorVolumeController returns a new instance of CStorVolume controller
func NewCStorVolumeController(
kubeclientset kubernetes.Interface,
clientset clientset.Interface,
kubeInformerFactory kubeinformers.SharedInformerFactory,
cStorInformerFactory informers.SharedInformerFactory) *CStorVolumeController {

// obtain references to shared index informers for the cStorVolume resources
cStorVolumeInformer := cStorInformerFactory.Cstor().V1().CStorVolumes()

openebsScheme.AddToScheme(scheme.Scheme)

// Create event broadcaster to receive events and send them to any EventSink, watcher, or log.
// Add NewCstorVolumeController types to the default Kubernetes Scheme so Events can be
// logged for CstorVolume Controller types.
klog.Info("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)

// StartEventWatcher starts sending events received from this EventBroadcaster to the given
// event handler function. The return value can be ignored or used to stop recording, if
// desired. Events("") denotes empty namespace
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: os.Getenv("POD_NAME"), Host: os.Getenv("NODE_NAME")})

controller := &CStorVolumeController{
kubeclientset: kubeclientset,
clientset: clientset,
cStorVolumeSynced: cStorVolumeInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "CStorVolume"),
recorder: recorder,
}

klog.Info("Setting up event handlers")

// Instantiating QueueLoad before entering workqueue.
q := QueueLoad{}

// Set up an event handler for when CstorVolume resources change.
cStorVolumeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if !IsValidCStorVolumeMgmt(obj.(*apis.CStorVolume)) {
return
}
q.Operation = QOpAdd
klog.Infof("Add event received for cstorvolume : %s", obj.(*apis.CStorVolume).Name)
controller.enqueueCStorVolume(obj.(*apis.CStorVolume), q)
},
UpdateFunc: func(old, new interface{}) {
newCStorVolume := new.(*apis.CStorVolume)
oldCStorVolume := old.(*apis.CStorVolume)
if !IsValidCStorVolumeMgmt(newCStorVolume) {
return
}
// Periodic resync will send update events for all known CStorVolume.
// Two different versions of the same CStorVolume will always have different RVs.
if newCStorVolume.ResourceVersion == oldCStorVolume.ResourceVersion {
q.Operation = QOpPeriodicSync
klog.V(4).Infof("cStorVolume periodic sync event : %v %v", newCStorVolume.ObjectMeta.Name, string(newCStorVolume.ObjectMeta.UID))
} else if IsOnlyStatusChange(oldCStorVolume, newCStorVolume) {
klog.V(4).Infof("Only cStorVolume status change: %v, %v", newCStorVolume.ObjectMeta.Name, string(newCStorVolume.ObjectMeta.UID))
return
} else if IsDestroyEvent(newCStorVolume) {
q.Operation = QOpDestroy
klog.Infof("cStorVolume Destroy event : %v, %v", newCStorVolume.ObjectMeta.Name, string(newCStorVolume.ObjectMeta.UID))
} else {
q.Operation = QOpModify
klog.Infof("cStorVolume Modify event : %v, %v", newCStorVolume.ObjectMeta.Name, string(newCStorVolume.ObjectMeta.UID))
}
controller.enqueueCStorVolume(newCStorVolume, q)
},
DeleteFunc: func(obj interface{}) {
klog.Infof("Delete event received for cstorvolume : %s", obj.(*apis.CStorVolume).Name)
},
})

return controller
}
Loading

0 comments on commit 1a1ba04

Please sign in to comment.