From fbf0a3ad751346f7cb6deff1d9067e3a8ece72f4 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Fri, 7 Feb 2025 12:29:29 +0800 Subject: [PATCH] [elasticsearchexporter] [chore] refactor routing Create documentRouter interface which encapsulates all logic related to document routing. --- .../data_stream_router.go | 203 ++++++++++++------ .../data_stream_router_test.go | 22 +- exporter/elasticsearchexporter/exporter.go | 133 +++++------- exporter/elasticsearchexporter/model_test.go | 6 +- 4 files changed, 213 insertions(+), 151 deletions(-) diff --git a/exporter/elasticsearchexporter/data_stream_router.go b/exporter/elasticsearchexporter/data_stream_router.go index 584048a2c2e0..64ee4b588cbd 100644 --- a/exporter/elasticsearchexporter/data_stream_router.go +++ b/exporter/elasticsearchexporter/data_stream_router.go @@ -7,6 +7,7 @@ import ( "fmt" "regexp" "strings" + "time" "unicode" "go.opentelemetry.io/collector/pdata/pcommon" @@ -41,74 +42,154 @@ func sanitizeDataStreamField(field, disallowed, appendSuffix string) string { return field } -func routeWithDefaults(defaultDSType string) func( - pcommon.Map, - pcommon.Map, - pcommon.Map, - string, - bool, - string, -) elasticsearch.Index { - return func( - recordAttr pcommon.Map, - scopeAttr pcommon.Map, - resourceAttr pcommon.Map, - fIndex string, - otel bool, - scopeName string, - ) elasticsearch.Index { - // Order: - // 1. read data_stream.* from attributes - // 2. read elasticsearch.index.* from attributes - // 3. receiver-based routing - // 4. use default hardcoded data_stream.* - dataset, datasetExists := getFromAttributes(elasticsearch.DataStreamDataset, defaultDataStreamDataset, recordAttr, scopeAttr, resourceAttr) - namespace, namespaceExists := getFromAttributes(elasticsearch.DataStreamNamespace, defaultDataStreamNamespace, recordAttr, scopeAttr, resourceAttr) - dataStreamMode := datasetExists || namespaceExists - if !dataStreamMode { - prefix, prefixExists := getFromAttributes(indexPrefix, "", resourceAttr, scopeAttr, recordAttr) - suffix, suffixExists := getFromAttributes(indexSuffix, "", resourceAttr, scopeAttr, recordAttr) - if prefixExists || suffixExists { - return elasticsearch.Index{Index: fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)} - } - } +// documentRouter is an interface for routing records to the appropriate +// index or data stream. The router may mutate record attributes. +type documentRouter interface { + routeLogRecord(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) + routeDataPoint(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) + routeSpan(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) + routeSpanEvent(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) +} - // Receiver-based routing - // For example, hostmetricsreceiver (or hostmetricsreceiver.otel in the OTel output mode) - // for the scope name - // github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper - if submatch := receiverRegex.FindStringSubmatch(scopeName); len(submatch) > 0 { - receiverName := submatch[1] - dataset = receiverName +func newDocumentRouter(mode MappingMode, dynamicIndex bool, defaultIndex string, cfg *Config) documentRouter { + var router documentRouter + if dynamicIndex { + router = dynamicDocumentRouter{ + index: elasticsearch.Index{Index: defaultIndex}, + otel: mode == MappingOTel, } - - // For dataset, the naming convention for datastream is expected to be "logs-[dataset].otel-[namespace]". - // This is in order to match the built-in logs-*.otel-* index template. - var datasetSuffix string - if otel { - datasetSuffix += ".otel" + } else { + router = staticDocumentRouter{ + index: elasticsearch.Index{Index: defaultIndex}, } + } + if cfg.LogstashFormat.Enabled { + router = logstashDocumentRouter{inner: router, logstashFormat: cfg.LogstashFormat} + } + return router +} + +type staticDocumentRouter struct { + index elasticsearch.Index +} + +func (r staticDocumentRouter) routeLogRecord(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) { + return r.route(resource, scope, recordAttrs) +} + +func (r staticDocumentRouter) routeDataPoint(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) { + return r.route(resource, scope, recordAttrs) +} + +func (r staticDocumentRouter) routeSpan(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) { + return r.route(resource, scope, recordAttrs) +} + +func (r staticDocumentRouter) routeSpanEvent(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) { + return r.route(resource, scope, recordAttrs) +} + +func (r staticDocumentRouter) route(_ pcommon.Resource, _ pcommon.InstrumentationScope, _ pcommon.Map) (elasticsearch.Index, error) { + return r.index, nil +} + +type dynamicDocumentRouter struct { + index elasticsearch.Index + otel bool +} - dataset = sanitizeDataStreamField(dataset, disallowedDatasetRunes, datasetSuffix) - namespace = sanitizeDataStreamField(namespace, disallowedNamespaceRunes, "") - return elasticsearch.NewDataStreamIndex(defaultDSType, dataset, namespace) +func (r dynamicDocumentRouter) routeLogRecord(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) { + return routeRecord(resource, scope, recordAttrs, r.index.Index, r.otel, defaultDataStreamTypeLogs), nil +} + +func (r dynamicDocumentRouter) routeDataPoint(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) { + return routeRecord(resource, scope, recordAttrs, r.index.Index, r.otel, defaultDataStreamTypeMetrics), nil +} + +func (r dynamicDocumentRouter) routeSpan(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) { + return routeRecord(resource, scope, recordAttrs, r.index.Index, r.otel, defaultDataStreamTypeTraces), nil +} + +func (r dynamicDocumentRouter) routeSpanEvent(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) { + return routeRecord(resource, scope, recordAttrs, r.index.Index, r.otel, defaultDataStreamTypeLogs), nil +} + +type logstashDocumentRouter struct { + inner documentRouter + logstashFormat LogstashFormatSettings +} + +func (r logstashDocumentRouter) routeLogRecord(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) { + return r.route(r.inner.routeLogRecord(resource, scope, recordAttrs)) +} + +func (r logstashDocumentRouter) routeDataPoint(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) { + return r.route(r.inner.routeDataPoint(resource, scope, recordAttrs)) +} + +func (r logstashDocumentRouter) routeSpan(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) { + return r.route(r.inner.routeSpan(resource, scope, recordAttrs)) +} + +func (r logstashDocumentRouter) routeSpanEvent(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) { + return r.route(r.inner.routeSpanEvent(resource, scope, recordAttrs)) +} + +func (r logstashDocumentRouter) route(index elasticsearch.Index, err error) (elasticsearch.Index, error) { + if err != nil { + return elasticsearch.Index{}, err } + formattedIndex, err := generateIndexWithLogstashFormat(index.Index, &r.logstashFormat, time.Now()) + if err != nil { + return elasticsearch.Index{}, err + } + return elasticsearch.Index{Index: formattedIndex}, nil } -var ( - // routeLogRecord returns the name of the index to send the log record to according to data stream routing related attributes. - // This function may mutate record attributes. - routeLogRecord = routeWithDefaults(defaultDataStreamTypeLogs) +func routeRecord( + resource pcommon.Resource, + scope pcommon.InstrumentationScope, + recordAttr pcommon.Map, + index string, + otel bool, + defaultDSType string, +) elasticsearch.Index { + resourceAttr := resource.Attributes() + scopeAttr := scope.Attributes() + + // Order: + // 1. read data_stream.* from attributes + // 2. read elasticsearch.index.* from attributes + // 3. receiver-based routing + // 4. use default hardcoded data_stream.* + dataset, datasetExists := getFromAttributes(elasticsearch.DataStreamDataset, defaultDataStreamDataset, recordAttr, scopeAttr, resourceAttr) + namespace, namespaceExists := getFromAttributes(elasticsearch.DataStreamNamespace, defaultDataStreamNamespace, recordAttr, scopeAttr, resourceAttr) + dataStreamMode := datasetExists || namespaceExists + if !dataStreamMode { + prefix, prefixExists := getFromAttributes(indexPrefix, "", resourceAttr, scopeAttr, recordAttr) + suffix, suffixExists := getFromAttributes(indexSuffix, "", resourceAttr, scopeAttr, recordAttr) + if prefixExists || suffixExists { + return elasticsearch.Index{Index: fmt.Sprintf("%s%s%s", prefix, index, suffix)} + } + } - // routeDataPoint returns the name of the index to send the data point to according to data stream routing related attributes. - // This function may mutate record attributes. - routeDataPoint = routeWithDefaults(defaultDataStreamTypeMetrics) + // Receiver-based routing + // For example, hostmetricsreceiver (or hostmetricsreceiver.otel in the OTel output mode) + // for the scope name + // github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper + if submatch := receiverRegex.FindStringSubmatch(scope.Name()); len(submatch) > 0 { + receiverName := submatch[1] + dataset = receiverName + } - // routeSpan returns the name of the index to send the span to according to data stream routing related attributes. - // This function may mutate record attributes. - routeSpan = routeWithDefaults(defaultDataStreamTypeTraces) + // For dataset, the naming convention for datastream is expected to be "logs-[dataset].otel-[namespace]". + // This is in order to match the built-in logs-*.otel-* index template. + var datasetSuffix string + if otel { + datasetSuffix += ".otel" + } - // routeSpanEvent returns the name of the index to send the span event to according to data stream routing related attributes. - // This function may mutate record attributes. - routeSpanEvent = routeWithDefaults(defaultDataStreamTypeLogs) -) + dataset = sanitizeDataStreamField(dataset, disallowedDatasetRunes, datasetSuffix) + namespace = sanitizeDataStreamField(namespace, disallowedNamespaceRunes, "") + return elasticsearch.NewDataStreamIndex(defaultDSType, dataset, namespace) +} diff --git a/exporter/elasticsearchexporter/data_stream_router_test.go b/exporter/elasticsearchexporter/data_stream_router_test.go index 20993d230d7c..13d2bf0ec26b 100644 --- a/exporter/elasticsearchexporter/data_stream_router_test.go +++ b/exporter/elasticsearchexporter/data_stream_router_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/elasticsearch" @@ -70,7 +71,12 @@ func TestRouteLogRecord(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - ds := routeLogRecord(pcommon.NewMap(), pcommon.NewMap(), pcommon.NewMap(), "", tc.otel, tc.scopeName) + router := dynamicDocumentRouter{otel: tc.otel} + scope := pcommon.NewInstrumentationScope() + scope.SetName(tc.scopeName) + + ds, err := router.routeLogRecord(pcommon.NewResource(), scope, pcommon.NewMap()) + require.NoError(t, err) assert.Equal(t, tc.want, ds) }) } @@ -81,7 +87,12 @@ func TestRouteDataPoint(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - ds := routeDataPoint(pcommon.NewMap(), pcommon.NewMap(), pcommon.NewMap(), "", tc.otel, tc.scopeName) + router := dynamicDocumentRouter{otel: tc.otel} + scope := pcommon.NewInstrumentationScope() + scope.SetName(tc.scopeName) + + ds, err := router.routeDataPoint(pcommon.NewResource(), scope, pcommon.NewMap()) + require.NoError(t, err) assert.Equal(t, tc.want, ds) }) } @@ -92,7 +103,12 @@ func TestRouteSpan(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - ds := routeSpan(pcommon.NewMap(), pcommon.NewMap(), pcommon.NewMap(), "", tc.otel, tc.scopeName) + router := dynamicDocumentRouter{otel: tc.otel} + scope := pcommon.NewInstrumentationScope() + scope.SetName(tc.scopeName) + + ds, err := router.routeSpan(pcommon.NewResource(), scope, pcommon.NewMap()) + require.NoError(t, err) assert.Equal(t, tc.want, ds) }) } diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index 56ec493e7086..e35c60087b3f 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -9,7 +9,6 @@ import ( "fmt" "runtime" "sync" - "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter" @@ -28,12 +27,10 @@ type elasticsearchExporter struct { component.TelemetrySettings userAgent string - config *Config - index string - logstashFormat LogstashFormatSettings - dynamicIndex bool - model mappingModel - otel bool + config *Config + index string + dynamicIndex bool + model mappingModel wg sync.WaitGroup // active sessions bulkIndexer bulkIndexer @@ -52,8 +49,6 @@ func newExporter( mode: cfg.MappingMode(), } - otel := model.mode == MappingOTel - userAgent := fmt.Sprintf( "%s/%s (%s/%s)", set.BuildInfo.Description, @@ -66,13 +61,11 @@ func newExporter( TelemetrySettings: set.TelemetrySettings, userAgent: userAgent, - config: cfg, - index: index, - dynamicIndex: dynamicIndex, - model: model, - logstashFormat: cfg.LogstashFormat, - otel: otel, - bufferPool: pool.NewBufferPool(), + config: cfg, + index: index, + dynamicIndex: dynamicIndex, + model: model, + bufferPool: pool.NewBufferPool(), } } @@ -110,6 +103,9 @@ func (e *elasticsearchExporter) Shutdown(ctx context.Context) error { } func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { + mappingMode := e.config.MappingMode() + router := newDocumentRouter(mappingMode, e.dynamicIndex, e.index, e.config) + e.wg.Add(1) defer e.wg.Done() @@ -130,7 +126,7 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs) scope := ill.Scope() logs := ill.LogRecords() for k := 0; k < logs.Len(); k++ { - if err := e.pushLogRecord(ctx, resource, rl.SchemaUrl(), logs.At(k), scope, ill.SchemaUrl(), session); err != nil { + if err := e.pushLogRecord(ctx, router, resource, rl.SchemaUrl(), logs.At(k), scope, ill.SchemaUrl(), session); err != nil { if cerr := ctx.Err(); cerr != nil { return cerr } @@ -157,6 +153,7 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs) func (e *elasticsearchExporter) pushLogRecord( ctx context.Context, + router documentRouter, resource pcommon.Resource, resourceSchemaURL string, record plog.LogRecord, @@ -164,29 +161,20 @@ func (e *elasticsearchExporter) pushLogRecord( scopeSchemaURL string, bulkIndexerSession bulkIndexerSession, ) error { - fIndex := elasticsearch.Index{Index: e.index} - if e.dynamicIndex { - fIndex = routeLogRecord(record.Attributes(), scope.Attributes(), resource.Attributes(), e.index, e.otel, scope.Name()) - } - - if e.logstashFormat.Enabled { - formattedIndex, err := generateIndexWithLogstashFormat(fIndex.Index, &e.logstashFormat, time.Now()) - if err != nil { - return err - } - fIndex = elasticsearch.Index{Index: formattedIndex} + index, err := router.routeLogRecord(resource, scope, record.Attributes()) + if err != nil { + return err } buf := e.bufferPool.NewPooledBuffer() docID := e.extractDocumentIDAttribute(record.Attributes()) - err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL, fIndex, buf.Buffer) - if err != nil { + if err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL, index, buf.Buffer); err != nil { buf.Recycle() return fmt.Errorf("failed to encode log event: %w", err) } // not recycling after Add returns an error as we don't know if it's already recycled - return bulkIndexerSession.Add(ctx, fIndex.Index, docID, buf, nil) + return bulkIndexerSession.Add(ctx, index.Index, docID, buf, nil) } type dataPointsGroup struct { @@ -205,6 +193,9 @@ func (e *elasticsearchExporter) pushMetricsData( ctx context.Context, metrics pmetric.Metrics, ) error { + mappingMode := e.config.MappingMode() + router := newDocumentRouter(mappingMode, e.dynamicIndex, e.index, e.config) + e.wg.Add(1) defer e.wg.Done() @@ -230,14 +221,14 @@ func (e *elasticsearchExporter) pushMetricsData( metric := scopeMetrics.Metrics().At(k) upsertDataPoint := func(dp datapoints.DataPoint) error { - fIndex, err := e.getMetricDataPointIndex(resource, scope, dp) + index, err := router.routeDataPoint(resource, scope, dp.Attributes()) if err != nil { return err } - groupedDataPoints, ok := groupedDataPointsByIndex[fIndex] + groupedDataPoints, ok := groupedDataPointsByIndex[index] if !ok { groupedDataPoints = make(map[uint32]*dataPointsGroup) - groupedDataPointsByIndex[fIndex] = groupedDataPoints + groupedDataPointsByIndex[index] = groupedDataPoints } dpHash := e.model.hashDataPoint(dp) dpGroup, ok := groupedDataPoints[dpHash] @@ -312,17 +303,20 @@ func (e *elasticsearchExporter) pushMetricsData( } } - for fIndex, groupedDataPoints := range groupedDataPointsByIndex { + for index, groupedDataPoints := range groupedDataPointsByIndex { for _, dpGroup := range groupedDataPoints { buf := e.bufferPool.NewPooledBuffer() dynamicTemplates, err := e.model.encodeMetrics( - dpGroup.resource, dpGroup.resourceSchemaURL, dpGroup.scope, dpGroup.scopeSchemaURL, dpGroup.dataPoints, &validationErrs, fIndex, buf.Buffer) + dpGroup.resource, dpGroup.resourceSchemaURL, + dpGroup.scope, dpGroup.scopeSchemaURL, + dpGroup.dataPoints, &validationErrs, index, buf.Buffer, + ) if err != nil { buf.Recycle() errs = append(errs, err) continue } - if err := session.Add(ctx, fIndex.Index, "", buf, dynamicTemplates); err != nil { + if err := session.Add(ctx, index.Index, "", buf, dynamicTemplates); err != nil { // not recycling after Add returns an error as we don't know if it's already recycled if cerr := ctx.Err(); cerr != nil { return cerr @@ -344,30 +338,13 @@ func (e *elasticsearchExporter) pushMetricsData( return errors.Join(errs...) } -func (e *elasticsearchExporter) getMetricDataPointIndex( - resource pcommon.Resource, - scope pcommon.InstrumentationScope, - dataPoint datapoints.DataPoint, -) (elasticsearch.Index, error) { - fIndex := elasticsearch.Index{Index: e.index} - if e.dynamicIndex { - fIndex = routeDataPoint(dataPoint.Attributes(), scope.Attributes(), resource.Attributes(), e.index, e.otel, scope.Name()) - } - - if e.logstashFormat.Enabled { - formattedIndex, err := generateIndexWithLogstashFormat(fIndex.Index, &e.logstashFormat, time.Now()) - if err != nil { - return elasticsearch.Index{}, err - } - fIndex = elasticsearch.Index{Index: formattedIndex} - } - return fIndex, nil -} - func (e *elasticsearchExporter) pushTraceData( ctx context.Context, td ptrace.Traces, ) error { + mappingMode := e.config.MappingMode() + router := newDocumentRouter(mappingMode, e.dynamicIndex, e.index, e.config) + e.wg.Add(1) defer e.wg.Done() @@ -389,7 +366,7 @@ func (e *elasticsearchExporter) pushTraceData( spans := scopeSpan.Spans() for k := 0; k < spans.Len(); k++ { span := spans.At(k) - if err := e.pushTraceRecord(ctx, resource, il.SchemaUrl(), span, scope, scopeSpan.SchemaUrl(), session); err != nil { + if err := e.pushTraceRecord(ctx, router, resource, il.SchemaUrl(), span, scope, scopeSpan.SchemaUrl(), session); err != nil { if cerr := ctx.Err(); cerr != nil { return cerr } @@ -397,7 +374,7 @@ func (e *elasticsearchExporter) pushTraceData( } for ii := 0; ii < span.Events().Len(); ii++ { spanEvent := span.Events().At(ii) - if err := e.pushSpanEvent(ctx, resource, il.SchemaUrl(), span, spanEvent, scope, scopeSpan.SchemaUrl(), session); err != nil { + if err := e.pushSpanEvent(ctx, router, resource, il.SchemaUrl(), span, spanEvent, scope, scopeSpan.SchemaUrl(), session); err != nil { errs = append(errs, err) } } @@ -416,6 +393,7 @@ func (e *elasticsearchExporter) pushTraceData( func (e *elasticsearchExporter) pushTraceRecord( ctx context.Context, + router documentRouter, resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, @@ -423,31 +401,23 @@ func (e *elasticsearchExporter) pushTraceRecord( scopeSchemaURL string, bulkIndexerSession bulkIndexerSession, ) error { - fIndex := elasticsearch.Index{Index: e.index} - if e.dynamicIndex { - fIndex = routeSpan(span.Attributes(), scope.Attributes(), resource.Attributes(), e.index, e.otel, span.Name()) - } - - if e.logstashFormat.Enabled { - formattedIndex, err := generateIndexWithLogstashFormat(fIndex.Index, &e.logstashFormat, time.Now()) - if err != nil { - return err - } - fIndex = elasticsearch.Index{Index: formattedIndex} + index, err := router.routeSpan(resource, scope, span.Attributes()) + if err != nil { + return err } buf := e.bufferPool.NewPooledBuffer() - err := e.model.encodeSpan(resource, resourceSchemaURL, span, scope, scopeSchemaURL, fIndex, buf.Buffer) - if err != nil { + if err := e.model.encodeSpan(resource, resourceSchemaURL, span, scope, scopeSchemaURL, index, buf.Buffer); err != nil { buf.Recycle() return fmt.Errorf("failed to encode trace record: %w", err) } // not recycling after Add returns an error as we don't know if it's already recycled - return bulkIndexerSession.Add(ctx, fIndex.Index, "", buf, nil) + return bulkIndexerSession.Add(ctx, index.Index, "", buf, nil) } func (e *elasticsearchExporter) pushSpanEvent( ctx context.Context, + router documentRouter, resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, @@ -456,26 +426,19 @@ func (e *elasticsearchExporter) pushSpanEvent( scopeSchemaURL string, bulkIndexerSession bulkIndexerSession, ) error { - fIndex := elasticsearch.Index{Index: e.index} - if e.dynamicIndex { - fIndex = routeSpanEvent(spanEvent.Attributes(), scope.Attributes(), resource.Attributes(), e.index, e.otel, scope.Name()) + index, err := router.routeSpanEvent(resource, scope, spanEvent.Attributes()) + if err != nil { + return err } - if e.logstashFormat.Enabled { - formattedIndex, err := generateIndexWithLogstashFormat(fIndex.Index, &e.logstashFormat, time.Now()) - if err != nil { - return err - } - fIndex = elasticsearch.Index{Index: formattedIndex} - } buf := e.bufferPool.NewPooledBuffer() - e.model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL, fIndex, buf.Buffer) + e.model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL, index, buf.Buffer) if buf.Buffer.Len() == 0 { buf.Recycle() return nil } // not recycling after Add returns an error as we don't know if it's already recycled - return bulkIndexerSession.Add(ctx, fIndex.Index, "", buf, nil) + return bulkIndexerSession.Add(ctx, index.Index, "", buf, nil) } func (e *elasticsearchExporter) extractDocumentIDAttribute(m pcommon.Map) string { diff --git a/exporter/elasticsearchexporter/model_test.go b/exporter/elasticsearchexporter/model_test.go index 55f2347ab5eb..4869e35e00d3 100644 --- a/exporter/elasticsearchexporter/model_test.go +++ b/exporter/elasticsearchexporter/model_test.go @@ -1132,11 +1132,13 @@ func TestEncodeLogOtelMode(t *testing.T) { for _, tc := range tests { record, scope, resource := createTestOTelLogRecord(t, tc.rec) + router := newDocumentRouter(MappingOTel, true, "", &Config{}) - idx := routeLogRecord(record.Attributes(), scope.Attributes(), resource.Attributes(), "", true, scope.Name()) + idx, err := router.routeLogRecord(resource, scope, record.Attributes()) + require.NoError(t, err) var buf bytes.Buffer - err := m.encodeLog(resource, tc.rec.Resource.SchemaURL, record, scope, tc.rec.Scope.SchemaURL, idx, &buf) + err = m.encodeLog(resource, tc.rec.Resource.SchemaURL, record, scope, tc.rec.Scope.SchemaURL, idx, &buf) require.NoError(t, err) want := tc.rec