Skip to content

Commit

Permalink
Add missing support for histograms
Browse files Browse the repository at this point in the history
Signed-off-by: Jirka Kremser <[email protected]>
  • Loading branch information
jkremser committed Oct 23, 2024
1 parent 4edcbf5 commit 2a83cc3
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 35 deletions.
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ ifeq ($(ARCH), x86_64)
endif
CGO ?=0
TARGET_OS ?=linux
LOCAL_ENDPOINT ?= host.k3d.internal

GO_BUILD_VARS= GO111MODULE=on CGO_ENABLED=$(CGO) GOOS=$(TARGET_OS) GOARCH=$(ARCH)

Expand Down Expand Up @@ -63,7 +62,7 @@ deploy-helm: ## Deploys helm chart with otel-collector and otel scaler.
@$(call say,Deploy helm chart to current k8s context)
cd helmchart/otel-add-on && \
helm dependency build && \
helm upgrade -i keda-otel .
helm upgrade -i kedify-otel .

.PHONY: logs
logs:
Expand Down
11 changes: 7 additions & 4 deletions dev.Makefile
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
include Makefile

LOCAL_ENDPOINT ?= host.k3d.internal
SO_NAME ?= otel-example

.PHONY: dev-k3d
dev-k3d: build-image ## Builds the container image for current arch, imports it to running k3d and restarts the scaler.
@$(call say,Doing the dev cycle)
k3d image import ghcr.io/kedify/otel-add-on:latest
helm upgrade --reuse-values \
keda-otel helmchart/otel-add-on \
kedify-otel helmchart/otel-add-on \
--set image.tag=latest \
--set image.pullPolicy=IfNotPresent \
--set settings.logs.logLvl=debug \
Expand All @@ -15,10 +18,10 @@ dev-k3d: build-image ## Builds the container image for current arch, imports it
dev-local: ## Prepare the SO and otel collector for local debug
@$(call say,Prepare the conditions for local debug)
helm upgrade --reuse-values \
keda-otel helmchart/otel-add-on \
kedify-otel helmchart/otel-add-on \
--set replicaCount=1 \
--set opentelemetry-collector.config.exporters.otlp.endpoint=$(LOCAL_ENDPOINT):4317
kubectl patch so otel-example --type=json -p '[{"op":"replace","path":"/spec/triggers/0/metadata/scalerAddress","value":"$(LOCAL_ENDPOINT):4318"}]'
kubectl patch so $(SO_NAME) --type=json -p '[{"op":"replace","path":"/spec/triggers/0/metadata/scalerAddress","value":"$(LOCAL_ENDPOINT):4318"}]'
@$(call say,Continue by running the scaler locally from your favorite IDE outside of K8s)
@echo "Make sure $(LOCAL_ENDPOINT):4317 and $(LOCAL_ENDPOINT):4318 are listening.."

Expand All @@ -29,5 +32,5 @@ undo-dev-local: ## Reverts the SO and otel collector for local debug
keda-otel helmchart/otel-add-on \
--set replicaCount=1 \
--set opentelemetry-collector.config.exporters.otlp.endpoint=keda-otel-scaler:4317
kubectl patch so otel-example --type=json -p '[{"op":"replace","path":"/spec/triggers/0/metadata/scalerAddress","value":"keda-otel-scaler:4318"}]'
kubectl patch so $(SO_NAME) --type=json -p '[{"op":"replace","path":"/spec/triggers/0/metadata/scalerAddress","value":"keda-otel-scaler:4318"}]'
kubectl scale deploy/otel-add-on-scaler --replicas=1
20 changes: 13 additions & 7 deletions examples/metric-pull/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ Note the following section in the helm chart values that configures the OTEL col
action: replace
target_label: __metrics_path__
regex: (.+)
- source_labels: [__address__, __meta_kubernetes_service_annotation_prometheus_io_port]
action: replace
target_label: __address__
regex: (.+)(?::\d+);(\d+)
replacement: $1:$2
...
```
We are adding one static target - the metrics from the OTEL collector itself, just for demo purposes, these
Expand All @@ -67,17 +72,18 @@ Create `ScaledObject`:
kubectl apply -f podinfo-so.yaml
```

Podinfo exposes some basic metrics and one of them is `http_request_duration_seconds` histogram. We can take the `http_request_duration_seconds_count`,
`Podinfo` exposes some basic metrics and one of them is `http_request_duration_seconds` histogram. We can take the `http_request_duration_seconds_count`,
which is a monotonic counter that increases with each request and turn it into the metric that will determine
how many replicas of pod we want.

how many replicas of pod we want. Scaler supports `rate` "function over time" similar to the
[one](https://prometheus.io/docs/prometheus/latest/querying/functions/#rate) from PromQL.

Create some traffic. Podinfo has an endpoint that responds after a delay, in this case it's two seconds.
We will be calling 20 requests per seconds from each worker (4 of them) - so 80 req/s.
Finally, create some traffic. Podinfo has an endpoint that responds after a delay, in this case it's two seconds.
```bash
hey -n 5000 -c 4 -q 20 -z 70s http://localhost:8080/delay/2
hey -n 5000 -z 120s http://localhost:8080/delay/2
```

Observe how the number of replicas of the Podinfo deployment reacts to the load.

```bash
watch kubectl get pods -A
```
31 changes: 25 additions & 6 deletions examples/metric-pull/collector-pull-values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ opentelemetry-collector:
enabled: true
clusterRole:
create: true
# additional rbac for upstream otel collector to be able to find services w/ prometheus.io/scrape annotation
# not needed, when only static targets are used
rules:
- apiGroups:
- ''
Expand All @@ -30,6 +32,7 @@ opentelemetry-collector:
static_configs:
- targets: ['0.0.0.0:8888']
- job_name: k8s
scrape_interval: 5s
kubernetes_sd_configs:
- role: service
relabel_configs:
Expand All @@ -40,24 +43,35 @@ opentelemetry-collector:
action: replace
target_label: __metrics_path__
regex: (.+)
- source_labels: [__address__, __meta_kubernetes_service_annotation_prometheus_io_port]
action: replace
target_label: __address__
regex: (.+)(?::\d+);(\d+)
replacement: $1:$2
zipkin: null
jaeger: null
otlp: null
exporters:
otlp:
# endpoint: keda-otel-scaler:4317
endpoint: 192.168.84.98:4317
compression: "none"
tls:
insecure: true
debug:
verbosity: detailed
processors:
filter/ottl:
error_mode: ignore
metrics:
# runtime/service_invocation/req_sent_total
metric:
- | # drop all other metrics that are not whitelisted here
name != "http_request_duration_seconds_count"
- resource.attributes["method"] == "GET"
- resource.attributes["path"] == "delay"
name != "http_request_duration_seconds"
# - resource.attributes["method"] == "GET"
# - resource.attributes["path"] == "delay"

service:
telemetry:
metrics:
address: 0.0.0.0:8888
pipelines:
traces: null
logs: null
Expand All @@ -69,3 +83,8 @@ opentelemetry-collector:
exporters:
- debug
- otlp
telemetry:
logs:
level: DEBUG
metrics:
address: 0.0.0.0:8888
3 changes: 2 additions & 1 deletion examples/metric-pull/podinfo-so.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ spec:
scalerAddress: "keda-otel-scaler.default.svc:4318"
metricQuery: "avg(http_request_duration_seconds_count{path=delay, method=GET, status=200})"
operationOverTime: "rate"
targetValue: "20"
targetValue: "10"
clampMax: "600"
minReplicaCount: 1
advanced:
horizontalPodAutoscalerConfig:
behavior:
Expand Down
2 changes: 1 addition & 1 deletion examples/metric-pull/podinfo-values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ service:
annotations:
prometheus.io/scrape: "true"
prometheus.io/path: "/metrics"

prometheus.io/port: "9898"
67 changes: 53 additions & 14 deletions receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"go.opentelemetry.io/collector/receiver/receiverhelper"
)

const countSuffix = "_count"

// otlpReceiver is the type that exposes Trace and Metrics reception.
type otlpReceiver struct {
cfg *otlpreceiver.Config
Expand Down Expand Up @@ -165,27 +167,48 @@ func (r *Receiver) Export(ctx context.Context, req pmetricotlp.ExportRequest) (p
for k := 0; k < mLen; k++ {
r.p("- name: %+v\n", metrics.At(k).Name())
r.p(" type: %+v\n", metrics.At(k).Type())
metricName := metrics.At(k).Name()
var dataPoints pmetric.NumberDataPointSlice
switch metrics.At(k).Type() {
case pmetric.MetricTypeGauge:
dataPoints = metrics.At(k).Gauge().DataPoints()
r.storeDatapoints(metricName, dataPoints)
case pmetric.MetricTypeSum:
dataPoints = metrics.At(k).Sum().DataPoints()
r.storeDatapoints(metricName, dataPoints)
case pmetric.MetricTypeHistogram:
histograms := metrics.At(k).Histogram().DataPoints()
for m := 0; m < histograms.Len(); m++ {
histogram := histograms.At(m)
r.p(" - time: %+v\n", histogram.Timestamp())
r.p(" tags: %+v\n", histogram.Attributes().AsRaw())
r.p(" count: %+v\n", histogram.Count())
r.p(" sum: %+v\n", histogram.Sum())
r.metricMemStore.Put(types.NewMetricEntry{
Name: types.MetricName(metrics.At(k).Name() + countSuffix),
MeasurementValue: float64(histogram.Count()),
MeasurementTime: histogram.Timestamp(),
Labels: histogram.Attributes().AsRaw(),
})
}
case pmetric.MetricTypeExponentialHistogram:
exHistograms := metrics.At(k).ExponentialHistogram().DataPoints()
for m := 0; m < exHistograms.Len(); m++ {
exHistogram := exHistograms.At(m)
r.p(" - time: %+v\n", exHistogram.Timestamp())
r.p(" tags: %+v\n", exHistogram.Attributes().AsRaw())
r.p(" count: %+v\n", exHistogram.Count())
r.p(" sum: %+v\n", exHistogram.Sum())
r.metricMemStore.Put(types.NewMetricEntry{
Name: types.MetricName(metrics.At(k).Name() + countSuffix),
MeasurementValue: float64(exHistogram.Count()),
MeasurementTime: exHistogram.Timestamp(),
Labels: exHistogram.Attributes().AsRaw(),
})
}
default:
// ignore others
}
for l := 0; l < dataPoints.Len(); l++ {
datapoint := dataPoints.At(l)
r.p(" - time: %+v\n", datapoint.Timestamp())
r.p(" tags: %+v\n", datapoint.Attributes().AsRaw())
value := math.Max(datapoint.DoubleValue(), float64(datapoint.IntValue()))
r.p(" value: %+v\n", value)
r.metricMemStore.Put(types.NewMetricEntry{
Name: types.MetricName(metrics.At(k).Name()),
MeasurementValue: value,
MeasurementTime: datapoint.Timestamp(),
Labels: datapoint.Attributes().AsRaw(),
})
// ignore others (MetricTypeEmpty & MetricTypeSummary)
return pmetricotlp.NewExportResponse(), nil
}
}
}
Expand All @@ -208,6 +231,22 @@ func (r *Receiver) p(format string, a ...any) {
}
}

// storeDatapoints debug-prints every datapoint in the slice and records each
// one in the metric store under the given metric name.
func (r *Receiver) storeDatapoints(name string, dataPoints pmetric.NumberDataPointSlice) {
	metricName := types.MetricName(name)
	for i := 0; i < dataPoints.Len(); i++ {
		dp := dataPoints.At(i)
		ts := dp.Timestamp()
		r.p(" - time: %+v\n", ts)
		r.p(" tags: %+v\n", dp.Attributes().AsRaw())
		// a NumberDataPoint carries either a double or an int value; the unset
		// one reads as zero, so taking the max yields whichever is populated
		val := math.Max(dp.DoubleValue(), float64(dp.IntValue()))
		r.p(" value: %+v\n", val)
		r.metricMemStore.Put(types.NewMetricEntry{
			Name:             metricName,
			MeasurementValue: val,
			MeasurementTime:  ts,
			Labels:           dp.Attributes().AsRaw(),
		})
	}
}

func GetStatusFromError(err error) error {
s, ok := status.FromError(err)
if !ok {
Expand Down

0 comments on commit 2a83cc3

Please sign in to comment.