diff --git a/Makefile b/Makefile index b4b06af..e7149fd 100644 --- a/Makefile +++ b/Makefile @@ -19,6 +19,10 @@ BUILDENVVAR=CGO_ENABLED=0 all: build chmod +x bin/load-watcher +.PHONY: test +test: + go test ./... + .PHONY: build build: $(COMMONENVVAR) $(BUILDENVVAR) go build -o bin/load-watcher main.go diff --git a/pkg/watcher/internal/metricsprovider/k8s.go b/pkg/watcher/internal/metricsprovider/k8s.go index 59ab0ee..dac963a 100644 --- a/pkg/watcher/internal/metricsprovider/k8s.go +++ b/pkg/watcher/internal/metricsprovider/k8s.go @@ -75,16 +75,16 @@ func NewMetricsServerClient() (watcher.MetricsProviderClient, error) { if err != nil { return nil, err } - return metricsServerClient{ + return &metricsServerClient{ metricsClientSet: metricsClientSet, coreClientSet: clientSet}, nil } -func (m metricsServerClient) Name() string { +func (m *metricsServerClient) Name() string { return watcher.K8sClientName } -func (m metricsServerClient) FetchHostMetrics(host string, window *watcher.Window) ([]watcher.Metric, error) { +func (m *metricsServerClient) FetchHostMetrics(host string, window *watcher.Window) ([]watcher.Metric, error) { var metrics = []watcher.Metric{} nodeMetrics, err := m.metricsClientSet.MetricsV1beta1().NodeMetricses().Get(context.TODO(), host, metav1.GetOptions{}) @@ -112,7 +112,7 @@ func (m metricsServerClient) FetchHostMetrics(host string, window *watcher.Windo return metrics, nil } -func (m metricsServerClient) FetchAllHostsMetrics(window *watcher.Window) (map[string][]watcher.Metric, error) { +func (m *metricsServerClient) FetchAllHostsMetrics(window *watcher.Window) (map[string][]watcher.Metric, error) { metrics := make(map[string][]watcher.Metric) nodeMetricsList, err := m.metricsClientSet.MetricsV1beta1().NodeMetricses().List(context.TODO(), metav1.ListOptions{}) @@ -156,7 +156,7 @@ func (m metricsServerClient) FetchAllHostsMetrics(window *watcher.Window) (map[s return metrics, nil } -func (m metricsServerClient) Health() (int, error) { +func (m *metricsServerClient) Health() (int, error) { var status int m.metricsClientSet.RESTClient().Verb("HEAD").Do(context.Background()).StatusCode(&status) if status != http.StatusOK { diff --git a/pkg/watcher/internal/metricsprovider/prometheus.go b/pkg/watcher/internal/metricsprovider/prometheus.go index a2b7406..7db1c3f 100644 --- a/pkg/watcher/internal/metricsprovider/prometheus.go +++ b/pkg/watcher/internal/metricsprovider/prometheus.go @@ -146,14 +146,14 @@ func NewPromClient(opts watcher.MetricsProviderOpts) (watcher.MetricsProviderCli return nil, err } - return promClient{client}, err + return &promClient{client}, err } -func (s promClient) Name() string { +func (s *promClient) Name() string { return watcher.PromClientName } -func (s promClient) FetchHostMetrics(host string, window *watcher.Window) ([]watcher.Metric, error) { +func (s *promClient) FetchHostMetrics(host string, window *watcher.Window) ([]watcher.Metric, error) { var metricList []watcher.Metric var anyerr error @@ -177,7 +177,7 @@ func (s promClient) FetchHostMetrics(host string, window *watcher.Window) ([]wat } // FetchAllHostsMetrics Fetch all host metrics with different operators (avg_over_time, stddev_over_time) and different resource types (CPU, Memory) -func (s promClient) FetchAllHostsMetrics(window *watcher.Window) (map[string][]watcher.Metric, error) { +func (s *promClient) FetchAllHostsMetrics(window *watcher.Window) (map[string][]watcher.Metric, error) { hostMetrics := make(map[string][]watcher.Metric) var anyerr error @@ -203,7 +203,7 @@ func (s promClient) FetchAllHostsMetrics(window *watcher.Window) (map[string][]w return hostMetrics, anyerr } -func (s promClient) Health() (int, error) { +func (s *promClient) Health() (int, error) { req, err := http.NewRequest("HEAD", DefaultPromAddress, nil) if err != nil { return -1, err @@ -218,7 +218,7 @@ func (s promClient) Health() (int, error) { return 0, nil } -func (s promClient) buildPromQuery(host string, metric string, method string, rollup string) string { +func (s *promClient) buildPromQuery(host string, metric string, method string, rollup string) string { var promQuery string if host == allHosts { @@ -247,7 +247,7 @@ func (s promClient) getPromResults(promQuery string) (model.Value, error) { return results, nil } -func (s promClient) promResults2MetricMap(promresults model.Value, metric string, method string, rollup string) map[string][]watcher.Metric { +func (s *promClient) promResults2MetricMap(promresults model.Value, metric string, method string, rollup string) map[string][]watcher.Metric { var metricType string var operator string diff --git a/pkg/watcher/internal/metricsprovider/signalfx.go b/pkg/watcher/internal/metricsprovider/signalfx.go index c67a680..26590b9 100644 --- a/pkg/watcher/internal/metricsprovider/signalfx.go +++ b/pkg/watcher/internal/metricsprovider/signalfx.go @@ -37,9 +37,9 @@ const ( DefaultSignalFxAddress = "https://api.signalfx.com" signalFxMetricsAPI = "/v1/timeserieswindow" signalFxMetdataAPI = "/v2/metrictimeseries" - signalFxHostFilter = "host:" - signalFxClusterFilter = "cluster:" + signalFxHostFilter = "SIGNALFX_HOST_FILTER" signalFxHostNameSuffixKey = "SIGNALFX_HOST_NAME_SUFFIX" + signalFxClusterFilter = "SIGNALFX_ClUSTER_FILTER" signalFxClusterName = "SIGNALFX_CLUSTER_NAME" // SignalFX Query Params oneMinuteResolutionMs = 60000 @@ -56,7 +56,9 @@ type signalFxClient struct { client http.Client authToken string signalFxAddress string + hostKey string hostNameSuffix string + clusterKey string clusterName string } @@ -67,8 +69,27 @@ func NewSignalFxClient(opts watcher.MetricsProviderOpts) (watcher.MetricsProvide tlsConfig := &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: opts.InsecureSkipVerify}, // TODO(aqadeer): Figure out a secure way to let users add SSL certs } - hostNameSuffix, _ := os.LookupEnv(signalFxHostNameSuffixKey) - clusterName, _ := os.LookupEnv(signalFxClusterName) + + hostKey, ok := os.LookupEnv(signalFxHostFilter) + if !ok { + hostKey = "host" + } + + hostNameSuffix, ok := os.LookupEnv(signalFxHostNameSuffixKey) + if !ok { + hostNameSuffix = "" + } + + clusterKey, ok := os.LookupEnv(signalFxClusterFilter) + if !ok { + clusterKey = "cluster" + } + + clusterName, ok := os.LookupEnv(signalFxClusterName) + if !ok { + clusterName = "*" + } + var signalFxAddress, signalFxAuthToken = DefaultSignalFxAddress, "" if opts.Address != "" { signalFxAddress = opts.Address @@ -79,24 +100,26 @@ func NewSignalFxClient(opts watcher.MetricsProviderOpts) (watcher.MetricsProvide if signalFxAuthToken == "" { log.Fatalf("No auth token found to connect with SignalFx server") } - return signalFxClient{client: http.Client{ + return &signalFxClient{client: http.Client{ Timeout: httpClientTimeout, Transport: tlsConfig}, authToken: signalFxAuthToken, signalFxAddress: signalFxAddress, + hostKey: hostKey, hostNameSuffix: hostNameSuffix, + clusterKey: clusterKey, clusterName: clusterName}, nil } -func (s signalFxClient) Name() string { +func (s *signalFxClient) Name() string { return watcher.SignalFxClientName } -func (s signalFxClient) FetchHostMetrics(host string, window *watcher.Window) ([]watcher.Metric, error) { +func (s *signalFxClient) FetchHostMetrics(host string, window *watcher.Window) ([]watcher.Metric, error) { log.Debugf("fetching metrics for host %v", host) var metrics []watcher.Metric - hostFilter := signalFxHostFilter + host + s.hostNameSuffix - clusterFilter := signalFxClusterFilter + s.clusterName + hostFilter := s.hostKey + ":" + host + s.hostNameSuffix + clusterFilter := s.clusterKey + ":" + s.clusterName for _, metric := range []string{cpuUtilizationMetric, memoryUtilizationMetric} { uri, err := s.buildMetricURL(hostFilter, clusterFilter, metric, window) if err != nil { @@ -131,9 +154,9 @@ func (s signalFxClient) FetchHostMetrics(host string, window *watcher.Window) ([ return metrics, nil } -func (s signalFxClient) FetchAllHostsMetrics(window *watcher.Window) (map[string][]watcher.Metric, error) { - hostFilter := signalFxHostFilter + "*" + s.hostNameSuffix - clusterFilter := signalFxClusterFilter + s.clusterName +func (s *signalFxClient) FetchAllHostsMetrics(window *watcher.Window) (map[string][]watcher.Metric, error) { + hostFilter := s.hostKey + ":*" + s.hostNameSuffix + clusterFilter := s.clusterKey + ":" + s.clusterName metrics := make(map[string][]watcher.Metric) for _, metric := range []string{cpuUtilizationMetric, memoryUtilizationMetric} { uri, err := s.buildMetricURL(hostFilter, clusterFilter, metric, window) @@ -173,7 +196,7 @@ func (s signalFxClient) FetchAllHostsMetrics(window *watcher.Window) (map[string if err != nil { return metrics, fmt.Errorf("received error in decoding metadata payload: %v", err) } - mappedMetrics, err := getMetricsFromPayloads(metricPayload, metadataPayload) + mappedMetrics, err := getMetricsFromPayloads(metricPayload, metadataPayload, s.hostKey) if err != nil { return metrics, fmt.Errorf("received error in getting metrics from payload: %v", err) } @@ -185,11 +208,11 @@ func (s signalFxClient) FetchAllHostsMetrics(window *watcher.Window) (map[string return metrics, nil } -func (s signalFxClient) Health() (int, error) { +func (s *signalFxClient) Health() (int, error) { return Ping(s.client, s.signalFxAddress) } -func (s signalFxClient) requestWithAuthToken(uri string) *http.Request { +func (s *signalFxClient) requestWithAuthToken(uri string) *http.Request { req, _ := http.NewRequest(http.MethodGet, uri, nil) req.Header.Set("X-SF-Token", s.authToken) req.Header.Set("Content-Type", "application/json") @@ -224,7 +247,7 @@ func addMetadata(metric *watcher.Metric, metricType string) { } } -func (s signalFxClient) buildMetricURL(hostFilter string, clusterFilter string, metric string, window *watcher.Window) (uri *url.URL, err error) { +func (s *signalFxClient) buildMetricURL(hostFilter string, clusterFilter string, metric string, window *watcher.Window) (uri *url.URL, err error) { uri, err = url.Parse(s.signalFxAddress + signalFxMetricsAPI) if err != nil { return nil, err @@ -245,7 +268,7 @@ func (s signalFxClient) buildMetricURL(hostFilter string, clusterFilter string, return } -func (s signalFxClient) buildMetadataURL(host string, clusterFilter string, metric string) (uri *url.URL, err error) { +func (s *signalFxClient) buildMetadataURL(host string, clusterFilter string, metric string) (uri *url.URL, err error) { uri, err = url.Parse(s.signalFxAddress + signalFxMetdataAPI) if err != nil { return nil, err @@ -346,7 +369,7 @@ Sample metaData payload: "created": 1614534848000, "creator": null, "dimensions": { - "host": "test.dev.com", + "[hostKey]": "test.dev.com", "sf_metric": null }, "id": "EvVH6P7BgAA", @@ -359,7 +382,7 @@ Sample metaData payload: ] } */ -func getMetricsFromPayloads(metricData interface{}, metadata interface{}) (map[string]watcher.Metric, error) { +func getMetricsFromPayloads(metricData interface{}, metadata interface{}, hostKey string) (map[string]watcher.Metric, error) { keyHostMap := make(map[string]string) hostMetricMap := make(map[string]watcher.Metric) if _, ok := metadata.(map[string]interface{}); !ok { @@ -396,7 +419,7 @@ func getMetricsFromPayloads(metricData interface{}, metadata interface{}) (map[s log.Errorf("type conversion failed, found %T", dimensions) continue } - host := dimensions.(map[string]interface{})["host"] + host := dimensions.(map[string]interface{})[hostKey] if host == nil { log.Errorf("no host found in %v", dimensions) continue