Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Flag | Environment Variable | Type | Default | Required | Description
---- | -------------------- | ---- | ------- | -------- | -----------
`log-level` | `LOG_LEVEL` | `string` | `info` | no | The level of log detail.
`bind-address` | `BIND_ADDRESS` | `string` | `:9797` | no | The address for binding listener.
`node-taint` | `NODE_TAINT` | `string` | | yes | The startup taint to put on node.
`daemonset-annotation` | `DAEMONSET_ANNOTATION` | `string` | | yes | The annotation of required daemonset.
`node-taint` | `NODE_TAINT` | `string` | | yes | The startup taint to remove from node.
`daemonset-annotation` | `DAEMONSET_ANNOTATION` | `string` | | yes | The annotations of required daemonset, eg. key1:val1,key2:val2,key3.
`daemonset-label` | `DAEMONSET_LABEL` | `string` | | yes | The labels of required daemonset, eg. key1:val1,key2:val2,key3.

3 changes: 2 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package config
type Ops struct {
LogLevel string `long:"log-level" env:"LOG_LEVEL" description:"Log level" default:"info"`
NodeTaint string `long:"node-taint" env:"NODE_TAINT" description:"The node taints that's going to remove."`
DaemonSetAnnotation string `long:"daemonset-annotation" env:"DAEMONSET_ANNOTATION" description:"The annotation of the daemonset to watch"`
DaemonSetAnnotation string `long:"daemonset-annotation" env:"DAEMONSET_ANNOTATION" description:"The annotation of the daemonset to watch, key:value pairs, comma delemited"`
BindAddr string `long:"bind-address" short:"p" env:"BIND_ADDRESS" default:":9797" description:"address for binding metrics listener"`
DaemonSetLabel string `long:"daemonset-label" env:"DAEMONSET_LABEL" description:"The label of the daemonset to watch, key:value pairs, comma delemited"`
}
147 changes: 135 additions & 12 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"nodetaint/config"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
Expand Down Expand Up @@ -43,8 +44,22 @@ var (
notReadyTaint = &core_v1.Taint{
Effect: core_v1.TaintEffectNoSchedule,
}

annotationsMap = make(map[string]string)
labelsMap = make(map[string]string)
annotationsKeys = make([]string, 0)
labelsKeys = make([]string, 0)
)

func filter(ss []string, testFnc func(string) bool) string {
for _, str := range ss {
if testFnc(str) {
return str
}
}
return ""
}

func setupLogging(logLevel string) {
// Use log level
level, err := logrus.ParseLevel(logLevel)
Expand Down Expand Up @@ -73,7 +88,8 @@ func getClientset() (*kubernetes.Clientset, error) {
return clientset, nil
}

func checkDSStatus(node *core_v1.Node, opts config.Ops) (bool, error) {
func checkDSStatus(node *core_v1.Node) (bool, error) {

isHandling.Lock()
defer isHandling.Unlock()

Expand All @@ -88,9 +104,37 @@ func checkDSStatus(node *core_v1.Node, opts config.Ops) (bool, error) {
readyPods := make(map[string]int)
for _, obj := range (*podStore[node.Name]).List() {
if pod, ok := obj.(*core_v1.Pod); ok {
// only check the pods with specified annotation
if _, ok := pod.Annotations[opts.DaemonSetAnnotation]; !ok {
continue
// only check the pods with specified annotation or label
if filter(annotationsKeys, func(s string) bool {
_, ok := pod.Annotations[s]
return ok
}) == "" {
if filter(labelsKeys, func(s string) bool {
_, ok := pod.Labels[s]
return ok
}) == "" {
continue
}
}

if annot := filter(annotationsKeys, func(s string) bool {
_, ok := pod.Annotations[s]
return ok
}); annot != "" {
if annotationsMap[annot] != "" && annotationsMap[annot] != pod.Annotations[annot] {
continue
}
logrus.Debugf("Checking annotated pod name: %s annotation: %s on node: %s", pod.Name, annot, node.Name)
}

if lbl := filter(labelsKeys, func(s string) bool {
_, ok := pod.Labels[s]
return ok
}); lbl != "" {
if labelsMap[lbl] != "" && labelsMap[lbl] != pod.Labels[lbl] {
continue
}
logrus.Debugf("Checking labeled pod name: %s label: %s on node: %s", pod.Name, lbl, node.Name)
}

// only check the ready pods
Expand All @@ -111,6 +155,7 @@ func checkDSStatus(node *core_v1.Node, opts config.Ops) (bool, error) {
dsName := pod.OwnerReferences[0].Name
if _, ok := dsList[dsName]; ok {
readyPods[dsName] = 1
logrus.Debugf("Ready Pod name: %s", pod.Name)
}
}
}
Expand Down Expand Up @@ -159,12 +204,12 @@ func RemoveTaintOffNode(ctx context.Context, c *kubernetes.Clientset, node *core
if !updated {
return nil
}
return PatchNodeTaints(c, node.Name, oldNode, newNode, ctx)
return PatchNodeTaints(ctx, c, node.Name, oldNode, newNode)
})
}

// PatchNodeTaints patches node's taints.
func PatchNodeTaints(c *kubernetes.Clientset, nodeName string, oldNode *core_v1.Node, newNode *core_v1.Node, ctx context.Context) error {
func PatchNodeTaints(ctx context.Context, c *kubernetes.Clientset, nodeName string, oldNode *core_v1.Node, newNode *core_v1.Node) error {
oldData, err := json.Marshal(oldNode)
if err != nil {
return fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNode, nodeName, err)
Expand Down Expand Up @@ -221,6 +266,31 @@ func main() {

notReadyTaint.Key = opts.NodeTaint

annotationsList := strings.Split(opts.DaemonSetAnnotation, ",")
labelsList := strings.Split(opts.DaemonSetLabel, ",")
for _, annot := range annotationsList {
kv := strings.Split(annot, ":")
annotationsMap[strings.TrimSpace(kv[0])] = ""
if len(kv) == 2 {
annotationsMap[strings.TrimSpace(kv[0])] = strings.TrimSpace(kv[1])
}
}
for _, lbl := range labelsList {
kv := strings.Split(lbl, ":")
labelsMap[strings.TrimSpace(kv[0])] = ""
if len(kv) == 2 {
labelsMap[strings.TrimSpace(kv[0])] = strings.TrimSpace(kv[1])
}
}

for key := range annotationsMap {
annotationsKeys = append(annotationsKeys, key)
}

for key := range labelsMap {
labelsKeys = append(labelsKeys, key)
}

setupLogging(opts.LogLevel)

clientSet, err := getClientset()
Expand All @@ -241,9 +311,37 @@ func main() {
isHandling.Lock()
defer isHandling.Unlock()

if _, ok := ds.Spec.Template.Annotations[opts.DaemonSetAnnotation]; !ok {
delete(dsList, ds.Name)
return
if filter(annotationsKeys, func(s string) bool {
_, ok := ds.Spec.Template.Annotations[s]
return ok
}) == "" {
if filter(labelsKeys, func(s string) bool {
_, ok := ds.Spec.Template.Labels[s]
return ok
}) == "" {
delete(dsList, ds.Name)
return
}
}

if annot := filter(annotationsKeys, func(s string) bool {
_, ok := ds.Spec.Template.Annotations[s]
return ok
}); annot != "" {
if annotationsMap[annot] != "" && annotationsMap[annot] != ds.Spec.Template.Annotations[annot] {
delete(dsList, ds.Name)
return
}
}

if lbl := filter(labelsKeys, func(s string) bool {
_, ok := ds.Spec.Template.Labels[s]
return ok
}); lbl != "" {
if labelsMap[lbl] != "" && labelsMap[lbl] != ds.Spec.Template.Labels[lbl] {
delete(dsList, ds.Name)
return
}
}

switch ops {
Expand All @@ -255,6 +353,9 @@ func main() {
delete(dsList, ds.Name)
}
logrus.Debugf("Number of required daemonsets is %v", len(dsList))
for dsName := range dsList {
logrus.Debugf("Required daemonset name: %s", dsName)
}
}

podHandler := func(pod *core_v1.Pod, node *core_v1.Node, podStopChan chan struct{}) {
Expand All @@ -278,7 +379,7 @@ func main() {
}

// check if all daemonset pods are running
finished, err := checkDSStatus(node, *opts)
finished, err := checkDSStatus(node)
if err != nil {
logrus.Errorf("Failed to sync daemonsets: %v", err)
return
Expand Down Expand Up @@ -337,12 +438,34 @@ func main() {

for _, obj := range c.dsIndexer.List() {
if ds, ok := obj.(*v1.DaemonSet); ok {
if _, ok := ds.Spec.Template.Annotations[opts.DaemonSetAnnotation]; ok {
dsList[ds.Name] = ds
if annot := filter(annotationsKeys, func(s string) bool {
_, ok := ds.Spec.Template.Annotations[s]
return ok
}); annot != "" {
if annotationsMap[annot] == "" {
dsList[ds.Name] = ds
}
if annotationsMap[annot] != "" && annotationsMap[annot] == ds.Spec.Template.Annotations[annot] {
dsList[ds.Name] = ds
}
}
if lbl := filter(labelsKeys, func(s string) bool {
_, ok := ds.Spec.Template.Labels[s]
return ok
}); lbl != "" {
if labelsMap[lbl] == "" {
dsList[ds.Name] = ds
}
if labelsMap[lbl] != "" && labelsMap[lbl] == ds.Spec.Template.Labels[lbl] {
dsList[ds.Name] = ds
}
}
}
}
logrus.Infof("Number of required daemonsets is %v", len(dsList))
for dsName := range dsList {
logrus.Debugf("Required daemonset name: %s", dsName)
}

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "OK\n")
Expand Down