[elasticsearchexporter] [chore] hasher interface
Extract a dataPointHasher interface from the mappingModel
interface, so we can make the latter purely about encoding,
and separate concerns.
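
For illustration, here is a minimal sketch of the resulting split. It uses hypothetical stand-in types (point, pointHasher, ecsHasher, and groupPoints below are simplified placeholders, not the exporter's real datapoints.DataPoint or hasher implementations): grouping only needs something that can hash a point's identity, while encoding can live entirely elsewhere.

package main

import (
	"fmt"
	"hash/fnv"
)

// point is a hypothetical stand-in for datapoints.DataPoint.
type point struct {
	timestampNanos uint64
	attrs          string // flattened attributes, purely for illustration
}

// pointHasher mirrors the role of the extracted dataPointHasher interface:
// it only knows how to turn a point's identity into a grouping key.
type pointHasher interface {
	hash(p point) uint32
}

// ecsHasher is a simplified analogue of an ECS-mode hasher: it hashes the
// timestamp and attributes, so points with the same identity share a key.
type ecsHasher struct{}

func (ecsHasher) hash(p point) uint32 {
	h := fnv.New32a()
	fmt.Fprintf(h, "%d|%s", p.timestampNanos, p.attrs)
	return h.Sum32()
}

// groupPoints buckets points by hash, roughly the way pushMetricsData groups
// data points into documents, without involving the encoding model at all.
func groupPoints(hasher pointHasher, points []point) map[uint32][]point {
	groups := make(map[uint32][]point)
	for _, p := range points {
		key := hasher.hash(p)
		groups[key] = append(groups[key], p)
	}
	return groups
}

func main() {
	pts := []point{
		{timestampNanos: 1, attrs: "host=a"},
		{timestampNanos: 1, attrs: "host=a"},
		{timestampNanos: 2, attrs: "host=b"},
	}
	groups := groupPoints(ecsHasher{}, pts)
	fmt.Println(len(groups)) // 2: the two identical points fall into one group
}

In the actual change, the exporter picks the hasher from the configured mapping mode via newDataPointHasher, leaving mappingModel focused on encoding documents.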
axw committed Feb 7, 2025
1 parent 499df64 commit f78a07b
Showing 4 changed files with 134 additions and 111 deletions.
5 changes: 4 additions & 1 deletion exporter/elasticsearchexporter/exporter.go
@@ -205,6 +205,9 @@ func (e *elasticsearchExporter) pushMetricsData(
ctx context.Context,
metrics pmetric.Metrics,
) error {
mappingMode := e.config.MappingMode()
hasher := newDataPointHasher(mappingMode)

e.wg.Add(1)
defer e.wg.Done()

@@ -239,7 +242,7 @@ func (e *elasticsearchExporter) pushMetricsData(
groupedDataPoints = make(map[uint32]*dataPointsGroup)
groupedDataPointsByIndex[fIndex] = groupedDataPoints
}
dpHash := e.model.hashDataPoint(dp)
dpHash := hasher.hashDataPoint(dp)
dpGroup, ok := groupedDataPoints[dpHash]
if !ok {
groupedDataPoints[dpHash] = &dataPointsGroup{
128 changes: 128 additions & 0 deletions exporter/elasticsearchexporter/metric_grouping.go
@@ -0,0 +1,128 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"

import (
"encoding/binary"
"hash"
"hash/fnv"
"math"
"slices"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/datapoints"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/elasticsearch"
"go.opentelemetry.io/collector/pdata/pcommon"
)

// dataPointHasher is an interface for hashing data points by their identity,
// for grouping into a single document.
type dataPointHasher interface {
hashDataPoint(datapoints.DataPoint) uint32
}

func newDataPointHasher(mode MappingMode) dataPointHasher {
switch mode {
case MappingOTel:
return otelDataPointHasher{}
default:
// Defaults to ECS for backward compatibility
return ecsDataPointHasher{}
}
}

// TODO use https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/internal/exp/metrics/identity

type ecsDataPointHasher struct{}
type otelDataPointHasher struct{}

func (h ecsDataPointHasher) hashDataPoint(dp datapoints.DataPoint) uint32 {
hasher := fnv.New32a()

timestampBuf := make([]byte, 8)
binary.LittleEndian.PutUint64(timestampBuf, uint64(dp.Timestamp()))
hasher.Write(timestampBuf)

mapHashExcludeReservedAttrs(hasher, dp.Attributes())

return hasher.Sum32()
}

func (h otelDataPointHasher) hashDataPoint(dp datapoints.DataPoint) uint32 {
hasher := fnv.New32a()

timestampBuf := make([]byte, 8)
binary.LittleEndian.PutUint64(timestampBuf, uint64(dp.Timestamp()))
hasher.Write(timestampBuf)

binary.LittleEndian.PutUint64(timestampBuf, uint64(dp.StartTimestamp()))
hasher.Write(timestampBuf)

hasher.Write([]byte(dp.Metric().Unit()))

mapHashExcludeReservedAttrs(hasher, dp.Attributes(), elasticsearch.MappingHintsAttrKey)

return hasher.Sum32()
}

// mapHashExcludeReservedAttrs is mapHash, but ignores some reserved attributes:
// the index is already taken into account during routing, and data stream attributes do not need to be considered in hashing.
func mapHashExcludeReservedAttrs(hasher hash.Hash, m pcommon.Map, extra ...string) {
m.Range(func(k string, v pcommon.Value) bool {
switch k {
case elasticsearch.DataStreamType, elasticsearch.DataStreamDataset, elasticsearch.DataStreamNamespace:
return true
}
if slices.Contains(extra, k) {
return true
}
hasher.Write([]byte(k))
valueHash(hasher, v)

return true
})
}

func mapHash(hasher hash.Hash, m pcommon.Map) {
m.Range(func(k string, v pcommon.Value) bool {
hasher.Write([]byte(k))
valueHash(hasher, v)

return true
})
}

func valueHash(h hash.Hash, v pcommon.Value) {
switch v.Type() {
case pcommon.ValueTypeEmpty:
h.Write([]byte{0})
case pcommon.ValueTypeStr:
h.Write([]byte(v.Str()))
case pcommon.ValueTypeBool:
if v.Bool() {
h.Write([]byte{1})
} else {
h.Write([]byte{0})
}
case pcommon.ValueTypeDouble:
buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, math.Float64bits(v.Double()))
h.Write(buf)
case pcommon.ValueTypeInt:
buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, uint64(v.Int()))
h.Write(buf)
case pcommon.ValueTypeBytes:
h.Write(v.Bytes().AsRaw())
case pcommon.ValueTypeMap:
mapHash(h, v.Map())
case pcommon.ValueTypeSlice:
sliceHash(h, v.Slice())
}
}

func sliceHash(h hash.Hash, s pcommon.Slice) {
for i := 0; i < s.Len(); i++ {
valueHash(h, s.At(i))
}
}
109 changes: 0 additions & 109 deletions exporter/elasticsearchexporter/model.go
@@ -5,14 +5,9 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry

import (
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"hash"
"hash/fnv"
"math"
"slices"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
@@ -80,7 +75,6 @@ type mappingModel interface {
encodeLog(pcommon.Resource, string, plog.LogRecord, pcommon.InstrumentationScope, string, elasticsearch.Index, *bytes.Buffer) error
encodeSpan(pcommon.Resource, string, ptrace.Span, pcommon.InstrumentationScope, string, elasticsearch.Index, *bytes.Buffer) error
encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string, idx elasticsearch.Index, buf *bytes.Buffer)
hashDataPoint(datapoints.DataPoint) uint32
encodeDocument(objmodel.Document, *bytes.Buffer) error
encodeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []datapoints.DataPoint, validationErrors *[]error, idx elasticsearch.Index, buf *bytes.Buffer) (map[string]string, error)
}
@@ -203,17 +197,6 @@ func (m *encodeModel) encodeDocument(document objmodel.Document, buf *bytes.Buff
return nil
}

// upsertMetricDataPointValue upserts a datapoint value to documents which is already hashed by resource and index
func (m *encodeModel) hashDataPoint(dp datapoints.DataPoint) uint32 {
switch m.mode {
case MappingOTel:
return metricOTelHash(dp, dp.Metric().Unit())
default:
// Defaults to ECS for backward compatibility
return metricECSHash(dp.Timestamp(), dp.Attributes())
}
}

func (m *encodeModel) encodeDataPointsECSMode(resource pcommon.Resource, dataPoints []datapoints.DataPoint, validationErrors *[]error, idx elasticsearch.Index, buf *bytes.Buffer) (map[string]string, error) {
dp0 := dataPoints[0]
var document objmodel.Document
@@ -457,95 +440,3 @@ func encodeLogTimestampECSMode(document *objmodel.Document, record plog.LogRecor

document.AddTimestamp("@timestamp", record.ObservedTimestamp())
}

// TODO use https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/internal/exp/metrics/identity
func metricECSHash(timestamp pcommon.Timestamp, attributes pcommon.Map) uint32 {
hasher := fnv.New32a()

timestampBuf := make([]byte, 8)
binary.LittleEndian.PutUint64(timestampBuf, uint64(timestamp))
hasher.Write(timestampBuf)

mapHashExcludeReservedAttrs(hasher, attributes)

return hasher.Sum32()
}

func metricOTelHash(dp datapoints.DataPoint, unit string) uint32 {
hasher := fnv.New32a()

timestampBuf := make([]byte, 8)
binary.LittleEndian.PutUint64(timestampBuf, uint64(dp.Timestamp()))
hasher.Write(timestampBuf)

binary.LittleEndian.PutUint64(timestampBuf, uint64(dp.StartTimestamp()))
hasher.Write(timestampBuf)

hasher.Write([]byte(unit))

mapHashExcludeReservedAttrs(hasher, dp.Attributes(), elasticsearch.MappingHintsAttrKey)

return hasher.Sum32()
}

// mapHashExcludeReservedAttrs is mapHash but ignoring some reserved attributes.
// e.g. index is already considered during routing and DS attributes do not need to be considered in hashing
func mapHashExcludeReservedAttrs(hasher hash.Hash, m pcommon.Map, extra ...string) {
m.Range(func(k string, v pcommon.Value) bool {
switch k {
case elasticsearch.DataStreamType, elasticsearch.DataStreamDataset, elasticsearch.DataStreamNamespace:
return true
}
if slices.Contains(extra, k) {
return true
}
hasher.Write([]byte(k))
valueHash(hasher, v)

return true
})
}

func mapHash(hasher hash.Hash, m pcommon.Map) {
m.Range(func(k string, v pcommon.Value) bool {
hasher.Write([]byte(k))
valueHash(hasher, v)

return true
})
}

func valueHash(h hash.Hash, v pcommon.Value) {
switch v.Type() {
case pcommon.ValueTypeEmpty:
h.Write([]byte{0})
case pcommon.ValueTypeStr:
h.Write([]byte(v.Str()))
case pcommon.ValueTypeBool:
if v.Bool() {
h.Write([]byte{1})
} else {
h.Write([]byte{0})
}
case pcommon.ValueTypeDouble:
buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, math.Float64bits(v.Double()))
h.Write(buf)
case pcommon.ValueTypeInt:
buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, uint64(v.Int()))
h.Write(buf)
case pcommon.ValueTypeBytes:
h.Write(v.Bytes().AsRaw())
case pcommon.ValueTypeMap:
mapHash(h, v.Map())
case pcommon.ValueTypeSlice:
sliceHash(h, v.Slice())
}
}

func sliceHash(h hash.Hash, s pcommon.Slice) {
for i := 0; i < s.Len(); i++ {
valueHash(h, s.At(i))
}
}
3 changes: 2 additions & 1 deletion exporter/elasticsearchexporter/model_test.go
@@ -104,6 +104,7 @@ func TestEncodeMetric(t *testing.T) {
dedot: true,
mode: MappingECS,
}
hasher := newDataPointHasher(model.mode)

groupedDataPoints := make(map[uint32][]datapoints.DataPoint)

@@ -114,7 +115,7 @@
dps := m.Sum().DataPoints()
for i := 0; i < dps.Len(); i++ {
dp := datapoints.NewNumber(m, dps.At(i))
dpHash := model.hashDataPoint(dp)
dpHash := hasher.hashDataPoint(dp)
dataPoints, ok := groupedDataPoints[dpHash]
if !ok {
groupedDataPoints[dpHash] = []datapoints.DataPoint{dp}
