Skip to content
This repository was archived by the owner on Oct 25, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,17 @@ If you do not provide a configuration file, the exporter creates one from the pr

#### Rule configuration

| Key | Description |
|--------------------|---------------------------------------------------------------------------------------------------------------|
| rules.clusters | List of Kafka clusters to fetch metrics for |
| rules.connectors | List of connectors to fetch metrics for |
| rules.ksqls | List of ksqlDB applications to fetch metrics for |
| rules.labels | Labels to exposed to Prometheus and group by in the query |
| rules.topics | Optional list of topics to filter the metrics |
| rules.metrics | List of metrics to gather |
| Key | Description |
|---------------------------------------|---------------------------------------------------------------------------------------------------------------|
| rules.clusters | List of Kafka clusters to fetch metrics for |
| rules.connectors | List of connectors to fetch metrics for |
| rules.ksqls | List of ksqlDB applications to fetch metrics for |
| rules.labels | Labels to exposed to Prometheus and group by in the query |
| rules.topics | Optional list of topics to filter the metrics |
| rules.excludedTopics | Optional list of topics to exclude from the query |
| rules.topicMetricsLocalFilterRegex | Optional regex to filter the result of the query locally. Anything matching the regex will NOT be included. |
| rules.metrics | List of metrics to gather |


### Examples of configuration files

Expand Down
4 changes: 2 additions & 2 deletions cmd/internal/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type CCloudCollectorMetric struct {
global bool
}

// CCloudCollector is a custom prometheu collector to collect data from
// CCloudCollector is a custom prometheus collector to collect data from
// Confluent Cloud Metrics API
type CCloudCollector struct {
metrics map[string]CCloudCollectorMetric
Expand Down Expand Up @@ -58,7 +58,7 @@ func (cc CCloudCollector) Collect(ch chan<- prometheus.Metric) {
}

// NewCCloudCollector creates a new instance of the collector
// During the creation, we invoke the descriptor endpoint to fetcha all
// During the creation, we invoke the descriptor endpoint to fetch all
// existing metrics and their labels
func NewCCloudCollector() CCloudCollector {

Expand Down
20 changes: 19 additions & 1 deletion cmd/internal/collector/collector_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (cc KafkaCCloudCollector) Collect(ch chan<- prometheus.Metric, wg *sync.Wai
// CollectMetricsForRule collects all metrics for a specific rule
func (cc KafkaCCloudCollector) CollectMetricsForRule(wg *sync.WaitGroup, ch chan<- prometheus.Metric, rule Rule, ccmetric CCloudCollectorMetric) {
defer wg.Done()
query := BuildQuery(ccmetric.metric, rule.Clusters, rule.GroupByLabels, rule.Topics, cc.resource)
query := BuildQuery(ccmetric.metric, rule.Clusters, rule.GroupByLabels, rule.Topics, rule.ExcludeTopics, cc.resource)
log.WithFields(log.Fields{"query": query}).Traceln("The following query has been created")
optimizedQuery, additionalLabels := OptimizeQuery(query)
log.WithFields(log.Fields{"optimizedQuery": optimizedQuery, "additionalLabels": additionalLabels}).Traceln("Query has been optimized")
Expand All @@ -76,6 +76,14 @@ func (cc KafkaCCloudCollector) CollectMetricsForRule(wg *sync.WaitGroup, ch chan

func (cc KafkaCCloudCollector) handleResponse(response QueryResponse, ccmetric CCloudCollectorMetric, ch chan<- prometheus.Metric, rule Rule, additionalLabels map[string]string) {
desc := ccmetric.desc


if len(response.Data) == 1000 {
log.Warn("The query returned the maximum amount of data points allowed (1000), " +
"you might be missing some data. Try further filtering your ccloudexporter.")
}

METRICSLOOP:
for _, dataPoint := range response.Data {
// Some data points might need to be ignored if it is the global query
topic, topicPresent := dataPoint["metric.topic"].(string)
Expand All @@ -93,6 +101,16 @@ func (cc KafkaCCloudCollector) handleResponse(response QueryResponse, ccmetric C
continue
}


if topicPresent {
for _, currentRegex := range RegexList {
if currentRegex.MatchString(topic) {
continue METRICSLOOP
}
}
}


value, ok := dataPoint["value"].(float64)
if !ok {
log.WithField("datapoint", dataPoint["value"]).Errorln("Can not convert result to float")
Expand Down
91 changes: 91 additions & 0 deletions cmd/internal/collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,97 @@ func TestHandleResponse(t *testing.T) {

collector.handleResponse(response, metric, pchan, rule, make(map[string]string))

if len(pchan) != 2 {
t.Errorf("Invalid number of metrics returned, expected 2 got %d", len(pchan))
t.Fail()
return
}

}

func TestHandleResponseForRegexFiltering(t *testing.T) {
metric := CCloudCollectorMetric{
labels: []string{"topic", "kafka_id"},
metric: MetricDescription{Name: "metric"},
desc: prometheus.NewDesc("metric", "help", []string{"topic", "kafka_id"}, nil),
}

var rule = Rule{
id: 0,
Topics: []string{"topic"},
Clusters: []string{"cluster"},
Metrics: []string{"metric", "metric2"},
GroupByLabels: []string{"topic", "kafka_id"},
ExcludeTopicsRegex: []string{"excludedTopic*", "excludedThing*"},
}

// Compile the Regex
Context.Granularity = "PT1M"
Context.Rules = []Rule{rule}
validateConfiguration()

collector := KafkaCCloudCollector{
metrics: map[string]CCloudCollectorMetric{
"metric": metric,
},
resource: ResourceDescription{
Type: "kafka",
Labels: []MetricLabel{
{
Key: "kafka.id",
},
},
},
}

responseString := `
{
"data": [
{
"resource.kafka.id": "cluster",
"metric.topic": "topic",
"timestamp": "2020-06-03T13:37:00Z",
"value": 1.0
},
{
"resource.kafka.id": "cluster",
"metric.topic": "topic2",
"timestamp": "2020-06-03T13:37:00Z",
"value": 1.0
},
{
"resource.kafka.id": "cluster",
"metric.topic": "excludedTopicA",
"timestamp": "2020-06-03T13:37:00Z",
"value": 1.0
},
{
"resource.kafka.id": "cluster",
"metric.topic": "excludedThingB",
"timestamp": "2020-06-03T13:37:00Z",
"value": 1.0
}
]
}`

responseBytes, err := ioutil.ReadAll(strings.NewReader(responseString))
if err != nil {
t.Errorf(err.Error())
t.Fail()
return
}

response := QueryResponse{}
err = json.Unmarshal(responseBytes, &response)
if err != nil {
t.Errorf(err.Error())
t.Fail()
}

pchan := make(chan prometheus.Metric, 10)

collector.handleResponse(response, metric, pchan, rule, make(map[string]string))

if len(pchan) != 2 {
t.Errorf("Invalid number of metrics returned, expected 2 got %d", len(pchan))
t.Fail()
Expand Down
11 changes: 10 additions & 1 deletion cmd/internal/collector/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type ExporterContext struct {
// should collect for a specific set of topics or clusters
type Rule struct {
Topics []string `mapstructure:"topics"`
ExcludeTopics []string `mapstructure:"ExcludeTopics"`
ExcludeTopicsRegex []string `mapstructure:"topicMetricsLocalFilterRegex"`
Clusters []string `mapstructure:"clusters"`
Connectors []string `mapstructure:"connectors"`
Ksql []string `mapstructure:"ksqls"`
Expand All @@ -45,7 +47,7 @@ type TopicClusterMetric struct {
// Version is the git short SHA1 hash provided at build time
var Version = "homecooked"

// Context is the global variable defining the context for the expoter
// Context is the global variable defining the context for the exporter
var Context = ExporterContext{}

// DefaultGroupingLabels is the default value for groupBy.labels
Expand Down Expand Up @@ -74,6 +76,13 @@ var DefaultMetrics = []string{
"io.confluent.kafka.ksql/streaming_unit_count",
}

var NonTopicFilterMetrics = map[string]struct{}{
"io.confluent.kafka.server/active_connection_count" : {},
"io.confluent.kafka.server/request_count" : {},
"io.confluent.kafka.server/partition_count" : {},
"io.confluent.kafka.server/successful_authentication_count" : {},
}

// GetMapOfMetrics returns the whitelist of metrics in a map
// where the key is the metric and the value is true if it is comming from an override
func (context ExporterContext) GetMapOfMetrics(prefix string) map[string]bool {
Expand Down
17 changes: 17 additions & 0 deletions cmd/internal/collector/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ package collector
import (
"flag"
"os"
"regexp"
"strings"

log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
)

var supportedGranularity = []string{"PT1M", "PT5M", "PT15M", "PT30M", "PT1H"}
var RegexList = make([]*regexp.Regexp, 0)

// ParseOption parses options provided by the CLI and the configuration file
// This function will panic if the options are invalid
Expand Down Expand Up @@ -130,6 +132,21 @@ func validateConfiguration() {
if len(rule.GroupByLabels) == 0 {
log.Fatalln("Labels is required while defining a rule")
}

// Fail if a filter both includes and excludes a topic
if len(rule.Topics) > 0 && len(rule.ExcludeTopics) > 0 {
for _, inTopic := range rule.Topics {
if !contains(rule.ExcludeTopics, inTopic) {
log.Fatalf("You cannot both include and exclude topic: %s", inTopic)
}
}
}

for _, currentRegex := range rule.ExcludeTopicsRegex {
didCompile := regexp.MustCompile(currentRegex)
RegexList = append(RegexList, didCompile)
}

}
}

Expand Down
60 changes: 44 additions & 16 deletions cmd/internal/collector/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ import log "github.com/sirupsen/logrus"
// This is the JSON structure for the endpoint
// https://api.telemetry.confluent.cloud/v1/metrics/cloud/descriptors
type Query struct {
Aggreations []Aggregation `json:"aggregations"`
Filter FilterHeader `json:"filter"`
Granularity string `json:"granularity"`
GroupBy []string `json:"group_by"`
Intervals []string `json:"intervals"`
Limit int `json:"limit"`
Aggregations []Aggregation `json:"aggregations"`
Filter FilterHeader `json:"filter"`
Granularity string `json:"granularity"`
GroupBy []string `json:"group_by"`
Intervals []string `json:"intervals"`
Limit int `json:"limit"`
}

// Aggregation for a Confluent Cloud API metric
Expand All @@ -44,10 +44,11 @@ type FilterHeader struct {

// Filter structure
type Filter struct {
Field string `json:"field,omitempty"`
Op string `json:"op"`
Value string `json:"value,omitempty"`
Filters []Filter `json:"filters,omitempty"`
Field string `json:"field,omitempty"`
Op string `json:"op"`
Value string `json:"value,omitempty"`
Filters []Filter `json:"filters,omitempty"`
UnaryFilter *Filter `json:"filter,omitempty"`
}

// QueryResponse from the cloud endpoint
Expand All @@ -67,12 +68,12 @@ type QueryResponse struct {
// }

var (
queryURI = "/v2/metrics/cloud/query"
queryURI = "v2/metrics/cloud/query"
)

// BuildQuery creates a new Query for a metric for a specific cluster and time interval
// This function will return the main global query, override queries will not be generated
func BuildQuery(metric MetricDescription, clusters []string, groupByLabels []string, topicFiltering []string, resource ResourceDescription) Query {
func BuildQuery(metric MetricDescription, clusters []string, groupByLabels []string, topicFiltering []string, excludeTopics []string, resource ResourceDescription) Query {
timeFrom := time.Now().Add(time.Duration(-Context.Delay) * time.Second) // the last minute might contains data that is not yet finalized
timeFrom = timeFrom.Add(time.Duration(-timeFrom.Second()) * time.Second) // the seconds need to be stripped to have an effective delay

Expand Down Expand Up @@ -112,6 +113,33 @@ func BuildQuery(metric MetricDescription, clusters []string, groupByLabels []str
})
}

// Exclude topics filter, this isn't necessary if a list of topics is already defined
// A list of topics provided is by definition filtering other non-wanted topics
if len(topicFiltering) == 0 && len(excludeTopics) != 0 {
_ , contains := NonTopicFilterMetrics[metric.Name]
if !contains {
excludeTopicFilters := []Filter{}
for _, exTopic := range excludeTopics {
excludeFilter := Filter{
Field: "metric.topic",
Op: "EQ",
Value: exTopic,
}
wrapperNotFilter := Filter{
Op: "NOT",
UnaryFilter: &excludeFilter,
}
excludeTopicFilters = append(excludeTopicFilters, wrapperNotFilter)
}
if len(excludeTopicFilters) > 0 {
filters = append(filters, Filter{
Op: "AND",
Filters: excludeTopicFilters,
})
}
}
}

filterHeader := FilterHeader{
Op: "AND",
Filters: filters,
Expand All @@ -133,7 +161,7 @@ func BuildQuery(metric MetricDescription, clusters []string, groupByLabels []str
}

return Query{
Aggreations: []Aggregation{aggregation},
Aggregations: []Aggregation{aggregation},
Filter: filterHeader,
Granularity: Context.Granularity,
GroupBy: groupBy,
Expand Down Expand Up @@ -180,7 +208,7 @@ func BuildConnectorsQuery(metric MetricDescription, connectors []string, resourc
}

return Query{
Aggreations: []Aggregation{aggregation},
Aggregations: []Aggregation{aggregation},
Filter: filterHeader,
Granularity: Context.Granularity,
GroupBy: groupBy,
Expand Down Expand Up @@ -225,9 +253,9 @@ func BuildKsqlQuery(metric MetricDescription, ksqlAppIds []string, resource Reso
for i, rsrcLabel := range resource.Labels {
groupBy[i] = "resource." + rsrcLabel.Key
}

return Query{
Aggreations: []Aggregation{aggregation},
Aggregations: []Aggregation{aggregation},
Filter: filterHeader,
Granularity: Context.Granularity,
GroupBy: groupBy,
Expand Down
Loading