Skip to content

Commit

Permalink
chore(cspi conditions): add cspi condition to represent the pool oper…
Browse files Browse the repository at this point in the history
…ations (#31)

This PR adds the conditions to represent pool operations in status part
- Failure in Pool Import is represent in PoolLost condition
- PoolExpansion process is represented in PoolExpansion condition
- DiskReplacement process is represented in disk replacement process
Signed-off-by: mittachaitu <[email protected]>
  • Loading branch information
sai chaithanya authored Apr 29, 2020
1 parent 3d04441 commit e675b34
Show file tree
Hide file tree
Showing 12 changed files with 654 additions and 94 deletions.
60 changes: 31 additions & 29 deletions examples/cspc/cspi.yaml
Original file line number Diff line number Diff line change
@@ -1,30 +1,32 @@
apiVersion: cstor.openebs.io/v1
kind: CStorPoolInstance
metadata:
name: cspi-stripe
namespace: openebs
spec:
pools:
- nodeSelector:
kubernetes.io/hostname: "gke-cstor-demo-default-pool-3385ab41-5swq"
dataRaidGroups:
- blockDevices:
- blockDeviceName: "sparse-1e3a8da94af49e16d937d867777699b0"
poolConfig:
defaultRaidGroupType: "stripe"

- nodeSelector:
kubernetes.io/hostname: "gke-cstor-demo-default-pool-3385ab41-j90d"
dataRaidGroups:
- blockDevices:
- blockDeviceName: "sparse-8935fde3557f1d04dd8c01a635f3c51f"
poolConfig:
defaultRaidGroupType: "stripe"

- nodeSelector:
kubernetes.io/hostname: "gke-cstor-demo-default-pool-3385ab41-sr33"
dataRaidGroups:
- blockDevices:
- blockDeviceName: "sparse-a0f9a34f5d9133078b4a6b7f341133ea"
poolConfig:
defaultRaidGroupType: "stripe"
kind: CStorPoolInstance
metadata:
labels:
kubernetes.io/hostname: 127.0.0.1
openebs.io/cas-type: cstor
openebs.io/cstor-pool-cluster: cstor-sparse-pool
openebs.io/version: 1.9.0
name: cstor-mirror-pool
namespace: openebs
spec:
dataRaidGroups:
- blockDevices:
- blockDeviceName: sparse-5a92ced3e2ee21eac7b930f670b5eab5
- blockDeviceName: sparse-37a7de580322f43a13338bf2467343f5
- blockDevices:
- blockDeviceName: sparse-72971f3b2e173c1b79db9a43e4cb841f
- blockDeviceName: sparse-a205e38ff5ec89c223654fdf1361f182
hostName: 127.0.0.1
nodeSelector:
kubernetes.io/hostname: 127.0.0.1
poolConfig:
auxResources: null
compression: ""
dataRaidGroupType: mirror
priorityClassName: ""
resources: null
roThresholdLimit: null
thickProvision: false
tolerations: null
writeCacheGroupType: ""
writeCacheRaidGroups: null
30 changes: 0 additions & 30 deletions examples/cspc/cspi1.yaml

This file was deleted.

10 changes: 9 additions & 1 deletion pkg/controllers/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package common

import (
"reflect"
"strings"
"sync"
"time"

Expand All @@ -26,6 +27,7 @@ import (
"github.com/openebs/api/pkg/util"
"github.com/openebs/cstor-operators/pkg/pool"
"github.com/openebs/cstor-operators/pkg/volumereplica"
zcmd "github.com/openebs/cstor-operators/pkg/zcmd"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog"
)
Expand Down Expand Up @@ -245,7 +247,13 @@ func CheckIfPresent(arrStr []string, searchStr string) bool {
// volumereplica can be created only if pool is present.
func CheckForCStorPool() {
for {
poolname, err := pool.GetPoolName()
ret, err := zcmd.NewPoolGetProperty().
WithScriptedMode(true).
WithField("value").
WithProperty("name").
WithPool("name").
Execute()
poolname := strings.Split(string(ret), "\n")
if reflect.DeepEqual(poolname, []string{}) {
klog.Warningf("CStorPool not found. Retrying after %v, err: %v", PoolNameHandlerInterval, err)
time.Sleep(PoolNameHandlerInterval)
Expand Down
5 changes: 3 additions & 2 deletions pkg/controllers/cspc-controller/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import (
"github.com/openebs/cstor-operators/pkg/controllers/cspc-controller/util"
"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/klog"
"time"

"k8s.io/klog"
)

func (c *Controller) UpdateStatusEventually(cspc *cstor.CStorPoolCluster) error {
Expand Down
53 changes: 49 additions & 4 deletions pkg/controllers/cspi-controller/handler.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2018 The OpenEBS Authors.
Copyright 2020 The OpenEBS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,7 @@ import (
"github.com/openebs/api/pkg/apis/types"
"github.com/openebs/api/pkg/util"
"github.com/openebs/cstor-operators/pkg/controllers/common"
cspiutil "github.com/openebs/cstor-operators/pkg/controllers/cspi-controller/util"
zpool "github.com/openebs/cstor-operators/pkg/pool/operations"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -137,6 +138,15 @@ func (c *CStorPoolInstanceController) reconcile(key string) error {

}
common.SyncResources.Mux.Unlock()
// This case is possible incase of ephemeral disks
if !cspi.IsEmptyStatus() && !cspi.IsPendingStatus() {
// Set Pool Lost condition to true
condition := cspiutil.NewCSPICondition(
cstor.CSPIPoolLost,
corev1.ConditionTrue,
"PoolLost", "failed to import"+zpool.PoolName()+"pool")
cspi, _ = c.UpdateStatusConditionEventually(cspi, *condition)
}
return nil
}

Expand Down Expand Up @@ -189,11 +199,11 @@ func (c *CStorPoolInstanceController) update(cspi *cstor.CStorPoolInstance) (*cs
WithKubeClientSet(c.kubeclientset).
WithOpenEBSClient(c.clientset).
WithRecorder(c.recorder)
cspi, err := oc.Update(cspi)
ncspi, err := oc.Update(cspi)
if err != nil {
return cspi, errors.Errorf("Failed to update pool due to %s", err.Error())
return ncspi, errors.Errorf("Failed to update pool due to %s", err.Error())
}
return c.updateStatus(cspi)
return c.updateStatus(ncspi)
}

func (c *CStorPoolInstanceController) updateStatus(cspi *cstor.CStorPoolInstance) (*cstor.CStorPoolInstance, error) {
Expand Down Expand Up @@ -222,6 +232,10 @@ func (c *CStorPoolInstanceController) updateStatus(cspi *cstor.CStorPoolInstance
return cspi, errors.Errorf("Failed to sync due to %s", err.Error())
}
c.updateROMode(&status, *cspi)
// addDiskUnavailableCondition will add DiskUnavailable condition on cspi status
c.addDiskUnavailableCondition(cspi)
// Point to existing conditions
status.Conditions = cspi.Status.Conditions

if IsStatusChange(cspi.Status, status) {
cspi.Status = status
Expand All @@ -230,6 +244,7 @@ func (c *CStorPoolInstanceController) updateStatus(cspi *cstor.CStorPoolInstance
CStorPoolInstances(cspi.Namespace).
Update(cspi)
if err != nil {
klog.Errorf("Error %v", err)
return cspi, errors.Errorf("Failed to updateStatus due to '%s'", err.Error())
}
return cspiGot, nil
Expand Down Expand Up @@ -368,3 +383,33 @@ func (c *CStorPoolInstanceController) sync(cspi *cstor.CStorPoolInstance) {
fmt.Sprintf("Failed to set compression %s to the pool %s : %s", compressionType, poolName, err.Error()))
}
}

func (c *CStorPoolInstanceController) addDiskUnavailableCondition(cspi *cstor.CStorPoolInstance) {
diskUnavailableCondition := cspiutil.GetCSPICondition(cspi.Status, cstor.CSPIDiskUnavailable)
oc := zpool.NewOperationsConfig().
WithKubeClientSet(c.kubeclientset).
WithOpenEBSClient(c.clientset).
WithRecorder(c.recorder)
unAvailableDisks, err := oc.GetUnavailableDiskList(cspi)
if err != nil {
klog.Errorf("failed to get unavailable disks error: %v", err)
return
}
if len(unAvailableDisks) > 0 {
newCondition := cspiutil.NewCSPICondition(
cstor.CSPIDiskUnavailable,
corev1.ConditionTrue,
"DisksAreUnavailable",
fmt.Sprintf("Following disks %v are unavailable/faulted", unAvailableDisks))
cspiutil.SetCSPICondition(&cspi.Status, *newCondition)
} else {
if diskUnavailableCondition != nil {
newCondition := cspiutil.NewCSPICondition(
cstor.CSPIDiskUnavailable,
corev1.ConditionFalse,
"DisksAreAvailable",
"")
cspiutil.SetCSPICondition(&cspi.Status, *newCondition)
}
}
}
86 changes: 86 additions & 0 deletions pkg/controllers/cspi-controller/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
Copyright 2020 The OpenEBS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cspicontroller

import (
"time"

cstor "github.com/openebs/api/pkg/apis/cstor/v1"
cspiutil "github.com/openebs/cstor-operators/pkg/controllers/cspi-controller/util"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog"
)

//TODO: Update the code to use patch instead of Update call

// UpdateStatusConditionEventually updates the CSPI in etcd with provided
// condition. Below function retries for three times to update the CSPI with
// provided conditions
func (c *CStorPoolInstanceController) UpdateStatusConditionEventually(
cspi *cstor.CStorPoolInstance,
condition cstor.CStorPoolInstanceCondition) (*cstor.CStorPoolInstance, error) {
maxRetry := 3
cspiCopy := cspi.DeepCopy()
updatedCSPI, err := c.UpdateStatusCondition(cspiCopy, condition)
if err != nil {
klog.Errorf(
"failed to update CSPI %s status with condition %s will retry %d times at 2s interval: {%s}",
cspi.Name, condition.Type, maxRetry, err.Error())

for maxRetry > 0 {
newCSPI, err := c.clientset.
CstorV1().
CStorPoolInstances(cspi.Namespace).
Get(cspi.Name, metav1.GetOptions{})
if err != nil {
// This is possible due to etcd unavailability so do not retry more here
return cspi, errors.Wrapf(err, "failed to update cspi status")
}
updatedCSPI, err = c.UpdateStatusCondition(newCSPI, condition)
if err != nil {
maxRetry = maxRetry - 1
klog.Errorf(
"failed to update CSPI %s status with condition %s will retry %d times at 2s interval: {%s}",
cspi.Name, condition.Type, maxRetry, err.Error())
time.Sleep(2 * time.Second)
continue
}
return updatedCSPI, nil
}
// When retries are completed and still failed to update in etcd
// then it will return original object
return cspi, err
}
return updatedCSPI, nil
}

func (c *CStorPoolInstanceController) UpdateStatusCondition(
cspi *cstor.CStorPoolInstance,
condition cstor.CStorPoolInstanceCondition) (*cstor.CStorPoolInstance, error) {
cspiutil.SetCSPICondition(&cspi.Status, condition)
updatedCSPI, err := c.clientset.
CstorV1().
CStorPoolInstances(cspi.Namespace).
Update(cspi)
if err != nil {
// cspi object has already updated with the conditions so returning
// same object may or maynot make sense
return nil, errors.Wrapf(err, "failed to update cspi conditions")
}
return updatedCSPI, nil
}
Loading

0 comments on commit e675b34

Please sign in to comment.