diff --git a/README.md b/README.md index 628750b7..df12d9c5 100644 --- a/README.md +++ b/README.md @@ -17,11 +17,11 @@ Various types of Kubernetes resources can be Osiris-enabled using an annotation. Osiris-enabled pods are automatically instrumented with a __metrics-collecting proxy__ deployed as a sidecar container. -Osiris-enabled deployments (if _already_ scaled to a configurable minimum number -of replicas-- one by default) automatically have metrics from their pods -continuously scraped and analyzed by the __zeroscaler__ component. When the -aggregated metrics reveal that all of the deployment's pods are idling, the -zeroscaler scales the deployment to zero replicas. +Osiris-enabled deployments or statefulSets (if _already_ scaled to a configurable +minimum number of replicas-- one by default) automatically have metrics from +their pods continuously scraped and analyzed by the __zeroscaler__ component. +When the aggregated metrics reveal that all of the deployment's or statefulSet's pods are idling, +the zeroscaler scales the deployment or statefulSet to zero replicas. Under normal circumstances, scaling a deployment to zero replicas poses a problem: any services that select pods from that deployment (and only that @@ -178,14 +178,14 @@ spec: Most of Osiris configuration is done with Kubernetes annotations - as seen in the Usage section. -#### Deployment Annotations +#### Deployment & StatefulSet Annotations -The following table lists the supported annotations for Kubernetes `Deployments` and their default values. +The following table lists the supported annotations for Kubernetes `Deployments` and `StatefulSets`, and their default values. | Annotation | Description | Default | | ---------- | ----------- | ------- | -| `osiris.deislabs.io/enabled` | Enable the zeroscaler component to scrape and analyze metrics from the deployment's pods and scale the deployment to zero when idle. Allowed values: `y`, `yes`, `true`, `on`, `1`. | _no value_ (= disabled) | -| `osiris.deislabs.io/minReplicas` | The minimum number of replicas to set on the deployment when Osiris will scale up. If you set `2`, Osiris will scale the deployment from `0` to `2` replicas directly. Osiris won't collect metrics from deployments which have more than `minReplicas` replicas - to avoid useless collections of metrics. | `1` | +| `osiris.deislabs.io/enabled` | Enable the zeroscaler component to scrape and analyze metrics from the deployment's or statefulSet's pods and scale the deployment/statefulSet to zero when idle. Allowed values: `y`, `yes`, `true`, `on`, `1`. | _no value_ (= disabled) | +| `osiris.deislabs.io/minReplicas` | The minimum number of replicas to set on the deployment/statefulSet when Osiris scales it up. If you set `2`, Osiris will scale the deployment/statefulSet from `0` to `2` replicas directly. Osiris won't collect metrics from deployments/statefulSets which have more than `minReplicas` replicas - to avoid unnecessary metrics collection. | `1` | | `osiris.deislabs.io/metricsCheckInterval` | The interval in which Osiris would repeatedly track the pod http request metrics. The value is the number of seconds of the interval. Note that this value override the global value defined by the `zeroscaler.metricsCheckInterval` Helm value.
| _value of the `zeroscaler.metricsCheckInterval` Helm value_ | #### Pod Annotations @@ -205,6 +205,7 @@ The following table lists the supported annotations for Kubernetes `Services` an | ---------- | ----------- | ------- | | `osiris.deislabs.io/enabled` | Enable this service's endpoints to be managed by the Osiris endpoints controller. Allowed values: `y`, `yes`, `true`, `on`, `1`. | _no value_ (= disabled) | | `osiris.deislabs.io/deployment` | Name of the deployment which is behind this service. This is _required_ to map the service with its deployment. | _no value_ | +| `osiris.deislabs.io/statefulset` | Name of the statefulSet which is behind this service. Either this annotation or `osiris.deislabs.io/deployment` is _required_ to map the service with its backing workload. | _no value_ | | `osiris.deislabs.io/loadBalancerHostname` | Map requests coming from a specific hostname to this service. Note that if you have multiple hostnames, you can set them with different annotations, using `osiris.deislabs.io/loadBalancerHostname-1`, `osiris.deislabs.io/loadBalancerHostname-2`, ... | _no value_ | | `osiris.deislabs.io/ingressHostname` | Map requests coming from a specific hostname to this service. If you use an ingress in front of your service, this is required to create a link between the ingress and the service. Note that if you have multiple hostnames, you can set them with different annotations, using `osiris.deislabs.io/ingressHostname-1`, `osiris.deislabs.io/ingressHostname-2`, ... | _no value_ | | `osiris.deislabs.io/ingressDefaultPort` | Custom service port when the request comes from an ingress. Default behaviour if there are more than 1 port on the service, is to look for a port named `http`, and fallback to the port `80`. Set this if you have multiple ports and using a non-standard port with a non-standard name.
| _no value_ | diff --git a/chart/osiris/templates/cluster-role.yaml b/chart/osiris/templates/cluster-role.yaml index c88ed75a..c75cbcae 100644 --- a/chart/osiris/templates/cluster-role.yaml +++ b/chart/osiris/templates/cluster-role.yaml @@ -32,6 +32,7 @@ rules: - apps resources: - deployments + - statefulsets verbs: - get - list diff --git a/pkg/deployments/activator/activating.go b/pkg/deployments/activator/activating.go index 7d2ef320..29128af5 100644 --- a/pkg/deployments/activator/activating.go +++ b/pkg/deployments/activator/activating.go @@ -12,23 +12,23 @@ import ( func (a *activator) activateDeployment( app *app, -) (*deploymentActivation, error) { +) (*appActivation, error) { deploymentsClient := a.kubeClient.AppsV1().Deployments(app.namespace) deployment, err := deploymentsClient.Get( - app.deploymentName, + app.name, metav1.GetOptions{}, ) if err != nil { return nil, err } - da := &deploymentActivation{ + da := &appActivation{ readyAppPodIPs: map[string]struct{}{}, successCh: make(chan struct{}), timeoutCh: make(chan struct{}), } glog.Infof( "Activating deployment %s in namespace %s", - app.deploymentName, + app.name, app.namespace, ) go da.watchForCompletion( @@ -50,7 +50,54 @@ func (a *activator) activateDeployment( }} patchesBytes, _ := json.Marshal(patches) _, err = deploymentsClient.Patch( - app.deploymentName, + app.name, + k8s_types.JSONPatchType, + patchesBytes, + ) + return da, err +} + +func (a *activator) activateStatefulSet( + app *app, +) (*appActivation, error) { + statefulSetsClient := a.kubeClient.AppsV1().StatefulSets(app.namespace) + statefulSet, err := statefulSetsClient.Get( + app.name, + metav1.GetOptions{}, + ) + if err != nil { + return nil, err + } + da := &appActivation{ + readyAppPodIPs: map[string]struct{}{}, + successCh: make(chan struct{}), + timeoutCh: make(chan struct{}), + } + glog.Infof( + "Activating statefulSet %s in namespace %s", + app.name, + app.namespace, + ) + go da.watchForCompletion( + a.kubeClient, + app, + labels.Set(statefulSet.Spec.Selector.MatchLabels).AsSelector(), + ) + if statefulSet.Spec.Replicas == nil || *statefulSet.Spec.Replicas > 0 { + // We don't need to do this, as it turns out! Scaling is either already + // in progress-- perhaps initiated by another process-- or may even be + // completed already. Just return dr and allow the caller to move on to + // verifying / waiting for this activation to be complete. 
+ return da, nil + } + patches := []kubernetes.PatchOperation{{ + Op: "replace", + Path: "/spec/replicas", + Value: kubernetes.GetMinReplicas(statefulSet.Annotations, 1), + }} + patchesBytes, _ := json.Marshal(patches) + _, err = statefulSetsClient.Patch( + app.name, k8s_types.JSONPatchType, patchesBytes, ) diff --git a/pkg/deployments/activator/activator.go b/pkg/deployments/activator/activator.go index d6df8def..cc2c7d92 100644 --- a/pkg/deployments/activator/activator.go +++ b/pkg/deployments/activator/activator.go @@ -29,8 +29,8 @@ type activator struct { nodeAddresses map[string]struct{} appsByHost map[string]*app indicesLock sync.RWMutex - deploymentActivations map[string]*deploymentActivation - deploymentActivationsLock sync.Mutex + appActivations map[string]*appActivation + appActivationsLock sync.Mutex dynamicProxyListenAddrStr string dynamicProxy tcp.DynamicProxy httpClient *http.Client @@ -56,7 +56,7 @@ func NewActivator(kubeClient kubernetes.Interface) (Activator, error) { services: map[string]*corev1.Service{}, nodeAddresses: map[string]struct{}{}, appsByHost: map[string]*app{}, - deploymentActivations: map[string]*deploymentActivation{}, + appActivations: map[string]*appActivation{}, httpClient: &http.Client{ Timeout: time.Minute * 1, }, @@ -127,7 +127,7 @@ func (a *activator) syncService(obj interface{}) { a.indicesLock.Lock() defer a.indicesLock.Unlock() svc := obj.(*corev1.Service) - svcKey := getKey(svc.Namespace, svc.Name) + svcKey := getKey(svc.Namespace, "Service", svc.Name) if k8s.ResourceIsOsirisEnabled(svc.Annotations) { a.services[svcKey] = svc } else { @@ -140,7 +140,7 @@ func (a *activator) syncDeletedService(obj interface{}) { a.indicesLock.Lock() defer a.indicesLock.Unlock() svc := obj.(*corev1.Service) - svcKey := getKey(svc.Namespace, svc.Name) + svcKey := getKey(svc.Namespace, "Service", svc.Name) delete(a.services, svcKey) a.updateIndex() } diff --git a/pkg/deployments/activator/app.go b/pkg/deployments/activator/app.go index e26bf48a..16185532 100644 --- a/pkg/deployments/activator/app.go +++ b/pkg/deployments/activator/app.go @@ -1,9 +1,17 @@ package activator +type appKind string + +const ( + appKindDeployment appKind = "Deployment" + appKindStatefulSet appKind = "StatefulSet" +) + type app struct { - namespace string - serviceName string - deploymentName string - targetHost string - targetPort int + namespace string + serviceName string + name string + kind appKind + targetHost string + targetPort int } diff --git a/pkg/deployments/activator/deployment_activation.go b/pkg/deployments/activator/app_activation.go similarity index 65% rename from pkg/deployments/activator/deployment_activation.go rename to pkg/deployments/activator/app_activation.go index edb750c4..68385cc1 100644 --- a/pkg/deployments/activator/deployment_activation.go +++ b/pkg/deployments/activator/app_activation.go @@ -14,7 +14,7 @@ import ( "k8s.io/client-go/tools/cache" ) -type deploymentActivation struct { +type appActivation struct { readyAppPodIPs map[string]struct{} endpoints *corev1.Endpoints lock sync.Mutex @@ -22,14 +22,14 @@ type deploymentActivation struct { timeoutCh chan struct{} } -func (d *deploymentActivation) watchForCompletion( +func (a *appActivation) watchForCompletion( kubeClient kubernetes.Interface, app *app, appPodSelector labels.Selector, ) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Watch the pods managed by this deployment + // Watch the pods managed by this deployment/statefulSet podsInformer := k8s.PodsIndexInformer( 
kubeClient, app.namespace, @@ -37,11 +37,11 @@ func (d *deploymentActivation) watchForCompletion( appPodSelector, ) podsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: d.syncPod, + AddFunc: a.syncPod, UpdateFunc: func(_, newObj interface{}) { - d.syncPod(newObj) + a.syncPod(newObj) }, - DeleteFunc: d.syncPod, + DeleteFunc: a.syncPod, }) // Watch the corresponding endpoints resource for this service endpointsInformer := k8s.EndpointsIndexInformer( @@ -54,9 +54,9 @@ func (d *deploymentActivation) watchForCompletion( nil, ) endpointsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: d.syncEndpoints, + AddFunc: a.syncEndpoints, UpdateFunc: func(_, newObj interface{}) { - d.syncEndpoints(newObj) + a.syncEndpoints(newObj) }, }) go podsInformer.Run(ctx.Done()) @@ -65,23 +65,24 @@ func (d *deploymentActivation) watchForCompletion( defer timer.Stop() for { select { - case <-d.successCh: + case <-a.successCh: return case <-timer.C: glog.Errorf( - "Activation of deployment %s in namespace %s timed out", - app.deploymentName, + "Activation of %s %s in namespace %s timed out", + app.kind, + app.name, app.namespace, ) - close(d.timeoutCh) + close(a.timeoutCh) return } } } -func (d *deploymentActivation) syncPod(obj interface{}) { - d.lock.Lock() - defer d.lock.Unlock() +func (a *appActivation) syncPod(obj interface{}) { + a.lock.Lock() + defer a.lock.Unlock() pod := obj.(*corev1.Pod) var ready bool for _, condition := range pod.Status.Conditions { @@ -94,27 +95,27 @@ func (d *deploymentActivation) syncPod(obj interface{}) { } // Keep track of which pods are ready if ready { - d.readyAppPodIPs[pod.Status.PodIP] = struct{}{} + a.readyAppPodIPs[pod.Status.PodIP] = struct{}{} } else { - delete(d.readyAppPodIPs, pod.Status.PodIP) + delete(a.readyAppPodIPs, pod.Status.PodIP) } - d.checkActivationComplete() + a.checkActivationComplete() } -func (d *deploymentActivation) syncEndpoints(obj interface{}) { - d.lock.Lock() - defer d.lock.Unlock() - d.endpoints = obj.(*corev1.Endpoints) - d.checkActivationComplete() +func (a *appActivation) syncEndpoints(obj interface{}) { + a.lock.Lock() + defer a.lock.Unlock() + a.endpoints = obj.(*corev1.Endpoints) + a.checkActivationComplete() } -func (d *deploymentActivation) checkActivationComplete() { - if d.endpoints != nil { - for _, subset := range d.endpoints.Subsets { +func (a *appActivation) checkActivationComplete() { + if a.endpoints != nil { + for _, subset := range a.endpoints.Subsets { for _, address := range subset.Addresses { - if _, ok := d.readyAppPodIPs[address.IP]; ok { + if _, ok := a.readyAppPodIPs[address.IP]; ok { glog.Infof("App pod with ip %s is in service", address.IP) - close(d.successCh) + close(a.successCh) return } } diff --git a/pkg/deployments/activator/index.go b/pkg/deployments/activator/index.go index 3d22b9d7..73fe283a 100644 --- a/pkg/deployments/activator/index.go +++ b/pkg/deployments/activator/index.go @@ -13,159 +13,175 @@ var ( // updateIndex builds an index that maps all the possible ways a service can be // addressed to application info that encapsulates details like which deployment -// to activate and where to relay requests to after successful activation. The -// new index replaces any old/existing index. +// or statefulSet to activate and where to relay requests to after successful +// activation. The new index replaces any old/existing index. 
func (a *activator) updateIndex() { appsByHost := map[string]*app{} for _, svc := range a.services { + var ( + name string + kind appKind + ) if deploymentName, ok := svc.Annotations["osiris.deislabs.io/deployment"]; ok { - svcShortDNSName := fmt.Sprintf("%s.%s", svc.Name, svc.Namespace) - svcFullDNSName := fmt.Sprintf("%s.svc.cluster.local", svcShortDNSName) - // Determine the "default" ingress port. When a request arrives at the - // activator via an ingress conroller, the request's host header won't - // indicate a port. After activation is complete, the activator needs to - // forward the request to the service (which is now backed by application - // endpoints). It's important to know which service port to forward the - // request to. - var ingressDefaultPort string - // Start by seeing if a default port was explicitly specified. - if ingressDefaultPort, ok = - svc.Annotations["osiris.deislabs.io/ingressDefaultPort"]; !ok { - // If not specified, try to infer it. - // If there's only one port, that's it. - if len(svc.Spec.Ports) == 1 { - ingressDefaultPort = fmt.Sprintf("%d", svc.Spec.Ports[0].Port) - } else { - // Look for a port named "http". If found, that's it. While we're - // looping also look to see if the servie exposes port 80. If no port - // is named "http", we'll assume 80 (if exposed) is the default port. - var foundPort80 bool - for _, port := range svc.Spec.Ports { - if port.Name == "http" { - ingressDefaultPort = fmt.Sprintf("%d", port.Port) - break - } - if port.Port == 80 { - foundPort80 = true - } + name = deploymentName + kind = appKindDeployment + } else if statefulSetName, ok := + svc.Annotations["osiris.deislabs.io/statefulset"]; ok { + name = statefulSetName + kind = appKindStatefulSet + } + if len(name) == 0 { + continue + } + + svcShortDNSName := fmt.Sprintf("%s.%s", svc.Name, svc.Namespace) + svcFullDNSName := fmt.Sprintf("%s.svc.cluster.local", svcShortDNSName) + // Determine the "default" ingress port. When a request arrives at the + // activator via an ingress conroller, the request's host header won't + // indicate a port. After activation is complete, the activator needs to + // forward the request to the service (which is now backed by application + // endpoints). It's important to know which service port to forward the + // request to. + var ingressDefaultPort string + var ok bool + // Start by seeing if a default port was explicitly specified. + if ingressDefaultPort, ok = + svc.Annotations["osiris.deislabs.io/ingressDefaultPort"]; !ok { + // If not specified, try to infer it. + // If there's only one port, that's it. + if len(svc.Spec.Ports) == 1 { + ingressDefaultPort = fmt.Sprintf("%d", svc.Spec.Ports[0].Port) + } else { + // Look for a port named "http". If found, that's it. While we're + // looping also look to see if the servie exposes port 80. If no port + // is named "http", we'll assume 80 (if exposed) is the default port. + var foundPort80 bool + for _, port := range svc.Spec.Ports { + if port.Name == "http" { + ingressDefaultPort = fmt.Sprintf("%d", port.Port) + break } - if ingressDefaultPort == "" && foundPort80 { - ingressDefaultPort = "80" + if port.Port == 80 { + foundPort80 = true } } - } - // Determine the "default" TLS port. When a TLS-secured request arrives at - // the activator, the TLS SNI header won't indicate a port. After - // activation is complete, the activator needs to forward the request to - // the service (which is now backed by application endpoints). 
It's - // important to know which service port to forward the request to. - var tlsDefaultPort string - if tlsDefaultPort, ok = - svc.Annotations["osiris.deislabs.io/tlsPort"]; !ok { - // If not specified, try to infer it. - // If there's only one port, that's it. - if len(svc.Spec.Ports) == 1 { - tlsDefaultPort = fmt.Sprintf("%d", svc.Spec.Ports[0].Port) - } else { - // Look for a port named "https". If found, that's it. While we're - // looping also look to see if the servie exposes port 443. If no port - // is named "https", we'll assume 443 (if exposed) is the default - // port. - var foundPort443 bool - for _, port := range svc.Spec.Ports { - if port.Name == "https" { - tlsDefaultPort = fmt.Sprintf("%d", port.Port) - break - } - if port.Port == 443 { - foundPort443 = true - } - } - if tlsDefaultPort == "" && foundPort443 { - ingressDefaultPort = "443" - } + if ingressDefaultPort == "" && foundPort80 { + ingressDefaultPort = "80" } } - // For every port... - for _, port := range svc.Spec.Ports { - app := &app{ - namespace: svc.Namespace, - serviceName: svc.Name, - deploymentName: deploymentName, - targetHost: svc.Spec.ClusterIP, - targetPort: int(port.Port), - } - // If the port is 80, also index by hostname/IP sans port number... - if port.Port == 80 { - // kube-dns names - appsByHost[svcShortDNSName] = app - appsByHost[svcFullDNSName] = app - // cluster IP - appsByHost[svc.Spec.ClusterIP] = app - // external IPs - for _, loadBalancerIngress := range svc.Status.LoadBalancer.Ingress { - if loadBalancerIngress.IP != "" { - appsByHost[loadBalancerIngress.IP] = app - } - } - // Honor all annotations of the form - // ^osiris\.deislabs\.io/loadBalancerHostname(?:-\d+)?$ - for k, v := range svc.Annotations { - if loadBalancerHostnameAnnotationRegex.MatchString(k) { - appsByHost[v] = app - } + } + // Determine the "default" TLS port. When a TLS-secured request arrives at + // the activator, the TLS SNI header won't indicate a port. After + // activation is complete, the activator needs to forward the request to + // the service (which is now backed by application endpoints). It's + // important to know which service port to forward the request to. + var tlsDefaultPort string + if tlsDefaultPort, ok = + svc.Annotations["osiris.deislabs.io/tlsPort"]; !ok { + // If not specified, try to infer it. + // If there's only one port, that's it. + if len(svc.Spec.Ports) == 1 { + tlsDefaultPort = fmt.Sprintf("%d", svc.Spec.Ports[0].Port) + } else { + // Look for a port named "https". If found, that's it. While we're + // looping also look to see if the servie exposes port 443. If no port + // is named "https", we'll assume 443 (if exposed) is the default + // port. + var foundPort443 bool + for _, port := range svc.Spec.Ports { + if port.Name == "https" { + tlsDefaultPort = fmt.Sprintf("%d", port.Port) + break } - } - if fmt.Sprintf("%d", port.Port) == ingressDefaultPort { - // Honor all annotations of the form - // ^osiris\.deislabs\.io/ingressHostname(?:-\d+)?$ - for k, v := range svc.Annotations { - if ingressHostnameAnnotationRegex.MatchString(k) { - appsByHost[v] = app - } + if port.Port == 443 { + foundPort443 = true } } - if fmt.Sprintf("%d", port.Port) == tlsDefaultPort { - // Now index by hostname:tls. Note that there's no point in indexing - // by IP:tls because SNI server name will never be an IP. 
- // kube-dns names - appsByHost[fmt.Sprintf("%s:tls", svcShortDNSName)] = app - appsByHost[fmt.Sprintf("%s:tls", svcFullDNSName)] = app - // Honor all annotations of the form - // ^osiris\.deislabs\.io/loadBalancerHostname(?:-\d+)?$ - for k, v := range svc.Annotations { - if loadBalancerHostnameAnnotationRegex.MatchString(k) { - appsByHost[fmt.Sprintf("%s:tls", v)] = app - } - } + if tlsDefaultPort == "" && foundPort443 { + ingressDefaultPort = "443" } - // Now index by hostname/IP:port... + } + } + // For every port... + for _, port := range svc.Spec.Ports { + app := &app{ + namespace: svc.Namespace, + serviceName: svc.Name, + name: name, + kind: kind, + targetHost: svc.Spec.ClusterIP, + targetPort: int(port.Port), + } + // If the port is 80, also index by hostname/IP sans port number... + if port.Port == 80 { // kube-dns names - appsByHost[fmt.Sprintf("%s:%d", svcShortDNSName, port.Port)] = app - appsByHost[fmt.Sprintf("%s:%d", svcFullDNSName, port.Port)] = app + appsByHost[svcShortDNSName] = app + appsByHost[svcFullDNSName] = app // cluster IP - appsByHost[fmt.Sprintf("%s:%d", svc.Spec.ClusterIP, port.Port)] = app + appsByHost[svc.Spec.ClusterIP] = app // external IPs for _, loadBalancerIngress := range svc.Status.LoadBalancer.Ingress { if loadBalancerIngress.IP != "" { - appsByHost[fmt.Sprintf("%s:%d", loadBalancerIngress.IP, port.Port)] = app // nolint: lll + appsByHost[loadBalancerIngress.IP] = app } } - // Node hostname/IP:node-port - if port.NodePort != 0 { - for nodeAddress := range a.nodeAddresses { - appsByHost[fmt.Sprintf("%s:%d", nodeAddress, port.NodePort)] = app + // Honor all annotations of the form + // ^osiris\.deislabs\.io/loadBalancerHostname(?:-\d+)?$ + for k, v := range svc.Annotations { + if loadBalancerHostnameAnnotationRegex.MatchString(k) { + appsByHost[v] = app } } + } + if fmt.Sprintf("%d", port.Port) == ingressDefaultPort { + // Honor all annotations of the form + // ^osiris\.deislabs\.io/ingressHostname(?:-\d+)?$ + for k, v := range svc.Annotations { + if ingressHostnameAnnotationRegex.MatchString(k) { + appsByHost[v] = app + } + } + } + if fmt.Sprintf("%d", port.Port) == tlsDefaultPort { + // Now index by hostname:tls. Note that there's no point in indexing + // by IP:tls because SNI server name will never be an IP. + // kube-dns names + appsByHost[fmt.Sprintf("%s:tls", svcShortDNSName)] = app + appsByHost[fmt.Sprintf("%s:tls", svcFullDNSName)] = app // Honor all annotations of the form // ^osiris\.deislabs\.io/loadBalancerHostname(?:-\d+)?$ for k, v := range svc.Annotations { if loadBalancerHostnameAnnotationRegex.MatchString(k) { - appsByHost[fmt.Sprintf("%s:%d", v, port.Port)] = app + appsByHost[fmt.Sprintf("%s:tls", v)] = app } } } + // Now index by hostname/IP:port... 
+ // kube-dns names + appsByHost[fmt.Sprintf("%s:%d", svcShortDNSName, port.Port)] = app + appsByHost[fmt.Sprintf("%s:%d", svcFullDNSName, port.Port)] = app + // cluster IP + appsByHost[fmt.Sprintf("%s:%d", svc.Spec.ClusterIP, port.Port)] = app + // external IPs + for _, loadBalancerIngress := range svc.Status.LoadBalancer.Ingress { + if loadBalancerIngress.IP != "" { + appsByHost[fmt.Sprintf("%s:%d", loadBalancerIngress.IP, port.Port)] = app // nolint: lll + } + } + // Node hostname/IP:node-port + if port.NodePort != 0 { + for nodeAddress := range a.nodeAddresses { + appsByHost[fmt.Sprintf("%s:%d", nodeAddress, port.NodePort)] = app + } + } + // Honor all annotations of the form + // ^osiris\.deislabs\.io/loadBalancerHostname(?:-\d+)?$ + for k, v := range svc.Annotations { + if loadBalancerHostnameAnnotationRegex.MatchString(k) { + appsByHost[fmt.Sprintf("%s:%d", v, port.Port)] = app + } + } } } a.appsByHost = appsByHost diff --git a/pkg/deployments/activator/keys.go b/pkg/deployments/activator/keys.go index 1b059db4..0491593d 100644 --- a/pkg/deployments/activator/keys.go +++ b/pkg/deployments/activator/keys.go @@ -4,6 +4,6 @@ import ( "fmt" ) -func getKey(namespace, name string) string { - return fmt.Sprintf("%s:%s", namespace, name) +func getKey(namespace string, kind appKind, name string) string { + return fmt.Sprintf("%s:%s/%s", kind, namespace, name) } diff --git a/pkg/deployments/activator/request_handling.go b/pkg/deployments/activator/request_handling.go index 39887a3f..302511da 100644 --- a/pkg/deployments/activator/request_handling.go +++ b/pkg/deployments/activator/request_handling.go @@ -7,78 +7,102 @@ import ( ) func (a *activator) activateAndWait(hostname string) (string, int, error) { - glog.Infof("Request received for for host %s", hostname) + glog.Infof("Request received for host %s", hostname) a.indicesLock.RLock() app, ok := a.appsByHost[hostname] a.indicesLock.RUnlock() if !ok { - return "", 0, fmt.Errorf("No deployment found for host %s", hostname) + return "", 0, fmt.Errorf( + "No deployment or statefulSet found for host %s", + hostname, + ) } glog.Infof( - "Deployment %s in namespace %s may require activation", - app.deploymentName, + "%s %s in namespace %s may require activation", + app.kind, + app.name, app.namespace, ) - // Are we already activating the deployment in question? + // Are we already activating the deployment/statefulset in question? var err error - deploymentKey := getKey(app.namespace, app.deploymentName) - deploymentActivation, ok := a.deploymentActivations[deploymentKey] + appKey := getKey(app.namespace, app.kind, app.name) + appActivation, ok := a.appActivations[appKey] if ok { glog.Infof( - "Found activation in-progress for deployment %s in namespace %s", - app.deploymentName, + "Found activation in-progress for %s %s in namespace %s", + app.kind, + app.name, app.namespace, ) } else { func() { - a.deploymentActivationsLock.Lock() - defer a.deploymentActivationsLock.Unlock() - // Some other goroutine could have initiated activation of this deployment - // while we were waiting for the lock. Now that we have the lock, do we - // still need to do this? - deploymentActivation, ok = a.deploymentActivations[deploymentKey] + a.appActivationsLock.Lock() + defer a.appActivationsLock.Unlock() + // Some other goroutine could have initiated activation of this + // deployment/statefulSet while we were waiting for the lock. + // Now that we have the lock, do we still need to do this? 
+ appActivation, ok = a.appActivations[appKey] if ok { glog.Infof( - "Found activation in-progress for deployment %s in namespace %s", - app.deploymentName, + "Found activation in-progress for %s %s in namespace %s", + app.kind, + app.name, app.namespace, ) return } glog.Infof( - "Found NO activation in-progress for deployment %s in namespace %s", - app.deploymentName, + "Found NO activation in-progress for %s %s in namespace %s", + app.kind, + app.name, app.namespace, ) // Initiate activation (or discover that it may already have been started // by another activator process) - if deploymentActivation, err = a.activateDeployment(app); err != nil { + switch app.kind { + case appKindDeployment: + appActivation, err = a.activateDeployment(app) + case appKindStatefulSet: + appActivation, err = a.activateStatefulSet(app) + default: + glog.Errorf("unknown app kind %s", app.kind) + return + } + if err != nil { + glog.Errorf( + "%s activation for %s in namespace %s failed: %s", + app.kind, + app.name, + app.namespace, + err, + ) return } // Add it to the index of in-flight activation - a.deploymentActivations[deploymentKey] = deploymentActivation + a.appActivations[appKey] = appActivation // But remove it from that index when it's complete go func() { deleteActivation := func() { - a.deploymentActivationsLock.Lock() - defer a.deploymentActivationsLock.Unlock() - delete(a.deploymentActivations, deploymentKey) + a.appActivationsLock.Lock() + defer a.appActivationsLock.Unlock() + delete(a.appActivations, appKey) } select { - case <-deploymentActivation.successCh: + case <-appActivation.successCh: deleteActivation() - case <-deploymentActivation.timeoutCh: + case <-appActivation.timeoutCh: deleteActivation() } }() }() if err != nil { return "", 0, fmt.Errorf( - "Error activating deployment %s in namespace %s: %s", - app.deploymentName, + "Error activating %s %s in namespace %s: %s", + app.kind, + app.name, app.namespace, err, ) @@ -89,12 +113,13 @@ func (a *activator) activateAndWait(hostname string) (string, int, error) { // progress, we need to wait for that activation to be completed... or fail... // or time out. 
select { - case <-deploymentActivation.successCh: + case <-appActivation.successCh: return app.targetHost, app.targetPort, nil - case <-deploymentActivation.timeoutCh: + case <-appActivation.timeoutCh: return "", 0, fmt.Errorf( - "Timed out waiting for activation of deployment %s in namespace %s: %s", - app.deploymentName, + "Timed out waiting for activation of %s %s in namespace %s: %s", + app.kind, + app.name, app.namespace, err, ) diff --git a/pkg/deployments/zeroscaler/metrics_collector.go b/pkg/deployments/zeroscaler/metrics_collector.go index 56a19131..6415b58e 100644 --- a/pkg/deployments/zeroscaler/metrics_collector.go +++ b/pkg/deployments/zeroscaler/metrics_collector.go @@ -26,8 +26,9 @@ const ( type metricsCollector struct { kubeClient kubernetes.Interface - deploymentName string - deploymentNamespace string + appKind string + appName string + appNamespace string selector labels.Selector metricsCheckInterval time.Duration podsInformer cache.SharedIndexInformer @@ -40,20 +41,22 @@ type metricsCollector struct { func newMetricsCollector( kubeClient kubernetes.Interface, - deploymentName string, - deploymentNamespace string, + appKind string, + appName string, + appNamespace string, selector labels.Selector, metricsCheckInterval time.Duration, ) *metricsCollector { m := &metricsCollector{ kubeClient: kubeClient, - deploymentName: deploymentName, - deploymentNamespace: deploymentNamespace, + appKind: appKind, + appName: appName, + appNamespace: appNamespace, selector: selector, metricsCheckInterval: metricsCheckInterval, podsInformer: k8s.PodsIndexInformer( kubeClient, - deploymentNamespace, + appNamespace, nil, selector, ), @@ -82,15 +85,17 @@ func (m *metricsCollector) run(ctx context.Context) { go func() { <-ctx.Done() glog.Infof( - "Stopping metrics collection for deployment %s in namespace %s", - m.deploymentName, - m.deploymentNamespace, + "Stopping metrics collection for %s %s in namespace %s", + m.appKind, + m.appName, + m.appNamespace, ) }() glog.Infof( - "Starting metrics collection for deployment %s in namespace %s", - m.deploymentName, - m.deploymentNamespace, + "Starting metrics collection for %s %s in namespace %s", + m.appKind, + m.appName, + m.appNamespace, ) go m.podsInformer.Run(ctx.Done()) // When this exits, the cancel func will stop the informer @@ -143,7 +148,7 @@ func (m *metricsCollector) collectMetrics(ctx context.Context) { timer := time.NewTimer(3 * time.Second) defer timer.Stop() var timedOut bool - // Get metrics for all of the deployment's CURRENT pods. + // Get metrics for all of the deployment/statefulSet's CURRENT pods. var scrapeWG sync.WaitGroup for _, pod := range m.currentAppPods { podMetricsPort, ok := getMetricsPort(pod) @@ -176,10 +181,10 @@ func (m *metricsCollector) collectMetrics(ctx context.Context) { if periodStartTime == nil { return } - // Now iterate over stats for ALL of the deployment's pods-- this may - // include pods that died since the last check-- their stats should - // still count, but since we won't have stats for those, we'll have to - // err on the side of caution and assume activity in such cases. + // Now iterate over stats for ALL of the deployment/statefulSet's pods + // --this may include pods that died since the last check-- their stats + // should still count, but since we won't have stats for those, we'll + // have to err on the side of caution and assume activity in such cases. 
var foundActivity, assumedActivity bool for podName, ps := range m.allAppPodStats { if ps.podDeletedTime != nil && @@ -281,9 +286,10 @@ func (m *metricsCollector) scrape( func (m *metricsCollector) scaleToZero() { glog.Infof( - "Scale to zero starting for deployment %s in namespace %s", - m.deploymentName, - m.deploymentNamespace, + "Scale to zero starting for %s %s in namespace %s", + m.appKind, + m.appName, + m.appNamespace, ) patches := []k8s.PatchOperation{{ @@ -292,23 +298,38 @@ func (m *metricsCollector) scaleToZero() { Value: 0, }} patchesBytes, _ := json.Marshal(patches) - if _, err := m.kubeClient.AppsV1().Deployments(m.deploymentNamespace).Patch( - m.deploymentName, - k8s_types.JSONPatchType, - patchesBytes, - ); err != nil { + var err error + switch m.appKind { + case "Deployment": + _, err = m.kubeClient.AppsV1().Deployments(m.appNamespace).Patch( + m.appName, + k8s_types.JSONPatchType, + patchesBytes, + ) + case "StatefulSet": + _, err = m.kubeClient.AppsV1().StatefulSets(m.appNamespace).Patch( + m.appName, + k8s_types.JSONPatchType, + patchesBytes, + ) + default: + err = fmt.Errorf("unknown kind '%s'", m.appKind) + } + if err != nil { glog.Errorf( - "Error scaling deployment %s in namespace %s to zero: %s", - m.deploymentName, - m.deploymentNamespace, + "Error scaling %s %s in namespace %s to zero: %s", + m.appKind, + m.appName, + m.appNamespace, err, ) return } glog.Infof( - "Scaled deployment %s in namespace %s to zero", - m.deploymentName, - m.deploymentNamespace, + "Scaled %s %s in namespace %s to zero", + m.appKind, + m.appName, + m.appNamespace, ) } diff --git a/pkg/deployments/zeroscaler/zeroscaler.go b/pkg/deployments/zeroscaler/zeroscaler.go index ef42e991..da035df2 100644 --- a/pkg/deployments/zeroscaler/zeroscaler.go +++ b/pkg/deployments/zeroscaler/zeroscaler.go @@ -22,12 +22,13 @@ type Zeroscaler interface { } type zeroscaler struct { - cfg Config - kubeClient kubernetes.Interface - deploymentsInformer cache.SharedInformer - collectors map[string]*metricsCollector - collectorsLock sync.Mutex - ctx context.Context + cfg Config + kubeClient kubernetes.Interface + deploymentsInformer cache.SharedInformer + statefulSetsInformer cache.SharedInformer + collectors map[string]*metricsCollector + collectorsLock sync.Mutex + ctx context.Context } func NewZeroscaler(cfg Config, kubeClient kubernetes.Interface) Zeroscaler { @@ -40,6 +41,12 @@ func NewZeroscaler(cfg Config, kubeClient kubernetes.Interface) Zeroscaler { nil, nil, ), + statefulSetsInformer: k8s.StatefulSetsIndexInformer( + kubeClient, + metav1.NamespaceAll, + nil, + nil, + ), collectors: map[string]*metricsCollector{}, } z.deploymentsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -49,10 +56,18 @@ func NewZeroscaler(cfg Config, kubeClient kubernetes.Interface) Zeroscaler { }, DeleteFunc: z.syncDeletedDeployment, }) + z.statefulSetsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: z.syncStatefulSet, + UpdateFunc: func(_, newObj interface{}) { + z.syncStatefulSet(newObj) + }, + DeleteFunc: z.syncDeletedStatefulSet, + }) return z } -// Run causes the controller to collect metrics for Osiris-enabled deployments. +// Run causes the controller to collect metrics for Osiris-enabled +// deployments and statefulSets. 
func (z *zeroscaler) Run(ctx context.Context) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -66,6 +81,10 @@ func (z *zeroscaler) Run(ctx context.Context) { z.deploymentsInformer.Run(ctx.Done()) cancel() }() + go func() { + z.statefulSetsInformer.Run(ctx.Done()) + cancel() + }() healthz.RunServer(ctx, 5000) cancel() } @@ -88,7 +107,13 @@ func (z *zeroscaler) syncDeployment(obj interface{}) { deployment.Name, deployment.Namespace, ) - z.ensureMetricsCollection(deployment) + z.ensureMetricsCollection( + "Deployment", + deployment.Namespace, + deployment.Name, + deployment.Annotations, + deployment.Spec.Selector, + ) } else { glog.Infof( "Osiris-enabled deployment %s in namespace %s is running zero "+ @@ -97,7 +122,11 @@ func (z *zeroscaler) syncDeployment(obj interface{}) { deployment.Name, deployment.Namespace, ) - z.ensureNoMetricsCollection(deployment) + z.ensureNoMetricsCollection( + "Deployment", + deployment.Namespace, + deployment.Name, + ) } } else { glog.Infof( @@ -106,7 +135,65 @@ func (z *zeroscaler) syncDeployment(obj interface{}) { deployment.Name, deployment.Namespace, ) - z.ensureNoMetricsCollection(deployment) + z.ensureNoMetricsCollection( + "Deployment", + deployment.Namespace, + deployment.Name, + ) + } +} + +func (z *zeroscaler) syncStatefulSet(obj interface{}) { + statefulSet := obj.(*appsv1.StatefulSet) + if k8s.ResourceIsOsirisEnabled(statefulSet.Annotations) { + glog.Infof( + "Notified about new or updated Osiris-enabled statefulSet %s in "+ + "namespace %s", + statefulSet.Name, + statefulSet.Namespace, + ) + minReplicas := k8s.GetMinReplicas(statefulSet.Annotations, 1) + if *statefulSet.Spec.Replicas > 0 && + statefulSet.Status.ReadyReplicas <= minReplicas { + glog.Infof( + "Osiris-enabled statefulSet %s in namespace %s is running the minimun "+ + "number of replicas or fewer; ensuring metrics collection", + statefulSet.Name, + statefulSet.Namespace, + ) + z.ensureMetricsCollection( + "StatefulSet", + statefulSet.Namespace, + statefulSet.Name, + statefulSet.Annotations, + statefulSet.Spec.Selector, + ) + } else { + glog.Infof( + "Osiris-enabled statefulSet %s in namespace %s is running zero "+ + "replicas OR more than the minimum number of replicas; ensuring "+ + "NO metrics collection", + statefulSet.Name, + statefulSet.Namespace, + ) + z.ensureNoMetricsCollection( + "StatefulSet", + statefulSet.Namespace, + statefulSet.Name, + ) + } + } else { + glog.Infof( + "Notified about new or updated non-Osiris-enabled statefulSet %s in "+ + "namespace %s; ensuring NO metrics collection", + statefulSet.Name, + statefulSet.Namespace, + ) + z.ensureNoMetricsCollection( + "StatefulSet", + statefulSet.Namespace, + statefulSet.Name, + ) } } @@ -118,28 +205,49 @@ func (z *zeroscaler) syncDeletedDeployment(obj interface{}) { deployment.Name, deployment.Namespace, ) - z.ensureNoMetricsCollection(deployment) + z.ensureNoMetricsCollection( + "Deployment", + deployment.Namespace, + deployment.Name, + ) } -func (z *zeroscaler) ensureMetricsCollection(deployment *appsv1.Deployment) { +func (z *zeroscaler) syncDeletedStatefulSet(obj interface{}) { + statefulSet := obj.(*appsv1.StatefulSet) + glog.Infof( + "Notified about deleted statefulSet %s in namespace %s; ensuring NO "+ + "metrics collection", + statefulSet.Name, + statefulSet.Namespace, + ) + z.ensureNoMetricsCollection( + "StatefulSet", + statefulSet.Namespace, + statefulSet.Name, + ) +} + +func (z *zeroscaler) ensureMetricsCollection(kind, namespace, name string, + annotations map[string]string, labelSelector 
*metav1.LabelSelector) { z.collectorsLock.Lock() defer z.collectorsLock.Unlock() - key := getDeploymentKey(deployment) - selector := labels.SelectorFromSet(deployment.Spec.Selector.MatchLabels) + key := getKey(kind, namespace, name) + selector := labels.SelectorFromSet(labelSelector.MatchLabels) if collector, ok := z.collectors[key]; !ok || !reflect.DeepEqual(selector, collector.selector) { if ok { collector.stop() } metricsCheckInterval, err := k8s.GetMetricsCheckInterval( - deployment.Annotations, + annotations, ) if err != nil { glog.Warningf( "There was an error getting custom metrics check interval value "+ - "in deployment %s, falling back to the default value of %d "+ + "in %s %s, falling back to the default value of %d "+ "seconds; error: %s", - deployment.Name, + kind, + name, z.cfg.MetricsCheckInterval, err, ) @@ -147,25 +255,28 @@ func (z *zeroscaler) ensureMetricsCollection(deployment *appsv1.Deployment) { } if metricsCheckInterval <= 0 { glog.Warningf( - "Invalid custom metrics check interval value %d in deployment %s,"+ + "Invalid custom metrics check interval value %d in %s %s,"+ " falling back to the default value of %d seconds", metricsCheckInterval, - deployment.Name, + kind, + name, z.cfg.MetricsCheckInterval, ) metricsCheckInterval = z.cfg.MetricsCheckInterval } glog.Infof( - "Using new metrics collector for deployment %s in namespace %s "+ + "Using new metrics collector for %s %s in namespace %s "+ "with metrics check interval of %d seconds", - deployment.Name, - deployment.Namespace, + kind, + name, + namespace, metricsCheckInterval, ) collector := newMetricsCollector( z.kubeClient, - deployment.Name, - deployment.Namespace, + kind, + name, + namespace, selector, time.Duration(metricsCheckInterval)*time.Second, ) @@ -181,22 +292,23 @@ func (z *zeroscaler) ensureMetricsCollection(deployment *appsv1.Deployment) { return } glog.Infof( - "Using existing metrics collector for deployment %s in namespace %s", - deployment.Name, - deployment.Namespace, + "Using existing metrics collector for %s %s in namespace %s", + kind, + name, + namespace, ) } -func (z *zeroscaler) ensureNoMetricsCollection(deployment *appsv1.Deployment) { +func (z *zeroscaler) ensureNoMetricsCollection(kind, namespace, name string) { z.collectorsLock.Lock() defer z.collectorsLock.Unlock() - key := getDeploymentKey(deployment) + key := getKey(kind, namespace, name) if collector, ok := z.collectors[key]; ok { collector.stop() delete(z.collectors, key) } } -func getDeploymentKey(deployment *appsv1.Deployment) string { - return fmt.Sprintf("%s:%s", deployment.Namespace, deployment.Name) +func getKey(kind, namespace, name string) string { + return fmt.Sprintf("%s:%s/%s", kind, namespace, name) } diff --git a/pkg/endpoints/controller/endpoints_manager.go b/pkg/endpoints/controller/endpoints_manager.go index 7a06e3e5..b3add753 100644 --- a/pkg/endpoints/controller/endpoints_manager.go +++ b/pkg/endpoints/controller/endpoints_manager.go @@ -98,8 +98,8 @@ func (e *endpointsManager) run(ctx context.Context) { ) }() e.podsInformer.Run(ctx.Done()) - // force an initial sync of the endpoints for deployments that are initially - // scaled to 0, and for which we won't see Pod events. + // force an initial sync of the endpoints for deployments/statefulsets + // that are initially scaled to 0, and for which we won't see Pod events. 
e.syncEndpoints() } diff --git a/pkg/endpoints/hijacker/hijacker.go b/pkg/endpoints/hijacker/hijacker.go index 82ca35a7..a5089dfc 100644 --- a/pkg/endpoints/hijacker/hijacker.go +++ b/pkg/endpoints/hijacker/hijacker.go @@ -213,10 +213,13 @@ func (h *hijacker) handleRequest(w http.ResponseWriter, r *http.Request) { func validateService(svc *corev1.Service) error { if kubernetes.ResourceIsOsirisEnabled(svc.Annotations) { - if _, ok := svc.Annotations["osiris.deislabs.io/deployment"]; !ok { + _, deploymentPresent := svc.Annotations["osiris.deislabs.io/deployment"] + _, statefulSetPresent := svc.Annotations["osiris.deislabs.io/statefulset"] + if !deploymentPresent && !statefulSetPresent { return fmt.Errorf( `Osiris-enabled service %s in namespace %s is lacking the required `+ - `"osiris.deislabs.io/deployment" annotation`, + `"osiris.deislabs.io/deployment" or `+ + `"osiris.deislabs.io/statefulset" annotation`, svc.Name, svc.Namespace, ) diff --git a/pkg/kubernetes/informers.go b/pkg/kubernetes/informers.go index ab938195..2c23a21d 100644 --- a/pkg/kubernetes/informers.go +++ b/pkg/kubernetes/informers.go @@ -46,6 +46,40 @@ func DeploymentsIndexInformer( ) } +func StatefulSetsIndexInformer( + client kubernetes.Interface, + namespace string, + fieldSelector fields.Selector, + labelSelector labels.Selector, +) cache.SharedIndexInformer { + statefulSetsClient := client.AppsV1().StatefulSets(namespace) + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + if fieldSelector != nil { + options.FieldSelector = fieldSelector.String() + } + if labelSelector != nil { + options.LabelSelector = labelSelector.String() + } + return statefulSetsClient.List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + if fieldSelector != nil { + options.FieldSelector = fieldSelector.String() + } + if labelSelector != nil { + options.LabelSelector = labelSelector.String() + } + return statefulSetsClient.Watch(options) + }, + }, + &appsv1.StatefulSet{}, + 0, + cache.Indexers{}, + ) +} + func PodsIndexInformer( client kubernetes.Interface, namespace string,
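
To make the new wiring concrete for reviewers, here is a minimal, hypothetical manifest sketch (not part of this patch) showing how the annotations documented in the README changes above are expected to fit together for a StatefulSet-backed service: the StatefulSet carries the zeroscaler annotations, its pod template opts into the proxy sidecar, and the Service points at the StatefulSet via `osiris.deislabs.io/statefulset`. The `my-app` name, `nginx` image, namespace, and port are placeholders, not values taken from this change.

```yaml
# Hypothetical example (not part of this patch): an Osiris-enabled StatefulSet
# and the Service that fronts it, tied together with the new
# osiris.deislabs.io/statefulset annotation. All names, images, and ports are
# illustrative placeholders.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: my-app
  namespace: default
  annotations:
    osiris.deislabs.io/enabled: "true"             # zeroscaler may scale this to zero
    osiris.deislabs.io/minReplicas: "1"            # scale 0 -> 1 on activation
    osiris.deislabs.io/metricsCheckInterval: "60"  # optional per-app override (seconds)
spec:
  serviceName: my-app
  replicas: 1
  selector:
    matchLabels:
      app: my-app
  template:
    metadata:
      labels:
        app: my-app
      annotations:
        osiris.deislabs.io/enabled: "true"         # opt the pods into the metrics proxy (see Pod Annotations)
    spec:
      containers:
      - name: web
        image: nginx
        ports:
        - containerPort: 80
---
apiVersion: v1
kind: Service
metadata:
  name: my-app
  namespace: default
  annotations:
    osiris.deislabs.io/enabled: "true"
    # Exactly one of osiris.deislabs.io/deployment or
    # osiris.deislabs.io/statefulset is expected; here the service is backed
    # by the StatefulSet above.
    osiris.deislabs.io/statefulset: my-app
spec:
  selector:
    app: my-app
  ports:
  - name: http
    port: 80
```

As with the existing Deployment flow, the Service's selector has to match the StatefulSet's pods so the activator and endpoints controller can watch them come into service.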