From 9987999e8a0e9428cd74220d409008ca55b911ac Mon Sep 17 00:00:00 2001 From: Damien Mathieu <42@dmathieu.com> Date: Thu, 13 Feb 2025 09:59:40 +0100 Subject: [PATCH] [exporter/elasticsearch] Support for profiles export (#37567) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This adds support for exporting profiles within the elasticsearch exporter. --------- Co-authored-by: Florian Lehner Co-authored-by: Carson Ip Co-authored-by: Tim Rühsen Co-authored-by: Andrzej Stencel --- .chloggen/elasticsearch-profiles.yaml | 27 + exporter/elasticsearchexporter/README.md | 26 +- exporter/elasticsearchexporter/attribute.go | 15 +- exporter/elasticsearchexporter/bulkindexer.go | 33 +- .../elasticsearchexporter/bulkindexer_test.go | 67 +- exporter/elasticsearchexporter/exporter.go | 179 ++++- exporter/elasticsearchexporter/factory.go | 34 +- exporter/elasticsearchexporter/go.mod | 19 +- exporter/elasticsearchexporter/go.sum | 32 +- .../integrationtest/go.mod | 9 +- .../integrationtest/go.sum | 23 +- .../internal/metadata/generated_status.go | 7 +- .../serializer/otelserializer/profile.go | 85 +++ .../serializer/otelserializer/profile_test.go | 126 ++++ .../serializeprofiles/benchmark_test.go | 82 ++ .../serializeprofiles/encode.go | 74 ++ .../otelserializer/serializeprofiles/model.go | 132 ++++ .../serializeprofiles/transform.go | 443 +++++++++++ .../serializeprofiles/transform_test.go | 698 ++++++++++++++++++ .../serializeprofiles/unixtime.go | 33 + .../serializeprofiles/unixtime_test.go | 43 ++ exporter/elasticsearchexporter/metadata.yaml | 2 +- exporter/elasticsearchexporter/model.go | 11 + 23 files changed, 2097 insertions(+), 103 deletions(-) create mode 100644 .chloggen/elasticsearch-profiles.yaml create mode 100644 exporter/elasticsearchexporter/internal/serializer/otelserializer/profile.go create mode 100644 exporter/elasticsearchexporter/internal/serializer/otelserializer/profile_test.go create mode 100644 
exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles/benchmark_test.go create mode 100644 exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles/encode.go create mode 100644 exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles/model.go create mode 100644 exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles/transform.go create mode 100644 exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles/transform_test.go create mode 100644 exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles/unixtime.go create mode 100644 exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles/unixtime_test.go diff --git a/.chloggen/elasticsearch-profiles.yaml b/.chloggen/elasticsearch-profiles.yaml new file mode 100644 index 000000000000..cd56b0b3c4a2 --- /dev/null +++ b/.chloggen/elasticsearch-profiles.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add profiles support to elasticsearch exporter + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [37567] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. 
+subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index a2cfb83edb3b..8ef99c7201d3 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -3,7 +3,7 @@ | Status | | | ------------- |-----------| -| Stability | [development]: metrics | +| Stability | [development]: metrics, profiles | | | [beta]: traces, logs | | Distributions | [contrib] | | Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aexporter%2Felasticsearch%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aexporter%2Felasticsearch) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aexporter%2Felasticsearch%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aexporter%2Felasticsearch) | @@ -14,7 +14,7 @@ [contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib -This exporter supports sending logs, metrics and traces to [Elasticsearch](https://www.elastic.co/elasticsearch). +This exporter supports sending logs, metrics, traces and profiles to [Elasticsearch](https://www.elastic.co/elasticsearch). 
The Exporter is API-compatible with Elasticsearch 7.17.x and 8.x. Certain features of the exporter, such as the `otel` mapping mode, may require newer versions of Elasticsearch. Limited effort will @@ -246,6 +246,26 @@ The metric types supported are: - Exponential histogram (Delta temporality only) - Summary +## Exporting profiles + +Profiles support is currently in development, and should not be used in +production. Profiles only support the OTel mapping mode. + +Example: + +```yaml +exporters: + elasticsearch: + endpoint: https://elastic.example.com:9200 + mapping: + mode: otel +``` + +> [!IMPORTANT] +> For the Elasticsearch Exporter to be able to export Profiles data, Universal Profiling needs to be installed in the database. +> See [the Universal Profiling getting started documentation](https://www.elastic.co/guide/en/observability/current/profiling-get-started.html) +> You will need to use the Elasticsearch endpoint, with an [Elasticsearch API key](https://www.elastic.co/guide/en/kibana/current/api-keys.html). + [confighttp]: https://github.com/open-telemetry/opentelemetry-collector/tree/main/config/confighttp/README.md#http-configuration-settings [configtls]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md#tls-configuration-settings [configauth]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configauth/README.md#authentication-configuration @@ -355,7 +375,7 @@ In case the record contains `timestamp`, this value is used. Otherwise, the `obs ## Setting a document id dynamically -The `logs_dynamic_id` setting allows users to set the document ID dynamically based on a log record attribute. +The `logs_dynamic_id` setting allows users to set the document ID dynamically based on a log record attribute. Besides the ability to control the document ID, this setting also works as a deduplication mechanism, as Elasticsearch will refuse to index a document with the same ID. 
The log record attribute `elasticsearch.document_id` can be set explicitly by a processor based on the log record. diff --git a/exporter/elasticsearchexporter/attribute.go b/exporter/elasticsearchexporter/attribute.go index d9232286585c..c83b21eb8de2 100644 --- a/exporter/elasticsearchexporter/attribute.go +++ b/exporter/elasticsearchexporter/attribute.go @@ -7,13 +7,14 @@ import "go.opentelemetry.io/collector/pdata/pcommon" // dynamic index attribute key constants const ( - indexPrefix = "elasticsearch.index.prefix" - indexSuffix = "elasticsearch.index.suffix" - defaultDataStreamDataset = "generic" - defaultDataStreamNamespace = "default" - defaultDataStreamTypeLogs = "logs" - defaultDataStreamTypeMetrics = "metrics" - defaultDataStreamTypeTraces = "traces" + indexPrefix = "elasticsearch.index.prefix" + indexSuffix = "elasticsearch.index.suffix" + defaultDataStreamDataset = "generic" + defaultDataStreamNamespace = "default" + defaultDataStreamTypeLogs = "logs" + defaultDataStreamTypeMetrics = "metrics" + defaultDataStreamTypeTraces = "traces" + defaultDataStreamTypeProfiles = "profiles" ) func getFromAttributes(name string, defaultValue string, attributeMaps ...pcommon.Map) (string, bool) { diff --git a/exporter/elasticsearchexporter/bulkindexer.go b/exporter/elasticsearchexporter/bulkindexer.go index c75827f340f5..973c9f7b3042 100644 --- a/exporter/elasticsearchexporter/bulkindexer.go +++ b/exporter/elasticsearchexporter/bulkindexer.go @@ -31,7 +31,7 @@ type bulkIndexer interface { type bulkIndexerSession interface { // Add adds a document to the bulk indexing session. - Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string) error + Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string, action string) error // End must be called on the session object once it is no longer // needed, in order to release any associated resources. 
@@ -55,14 +55,14 @@ type bulkIndexerSession interface { const defaultMaxRetries = 2 -func newBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config) (bulkIndexer, error) { +func newBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config, requireDataStream bool) (bulkIndexer, error) { if config.Batcher.Enabled != nil { - return newSyncBulkIndexer(logger, client, config), nil + return newSyncBulkIndexer(logger, client, config, requireDataStream), nil } - return newAsyncBulkIndexer(logger, client, config) + return newAsyncBulkIndexer(logger, client, config, requireDataStream) } -func bulkIndexerConfig(client esapi.Transport, config *Config) docappender.BulkIndexerConfig { +func bulkIndexerConfig(client esapi.Transport, config *Config, requireDataStream bool) docappender.BulkIndexerConfig { var maxDocRetries int if config.Retry.Enabled { maxDocRetries = defaultMaxRetries @@ -79,14 +79,14 @@ func bulkIndexerConfig(client esapi.Transport, config *Config) docappender.BulkI MaxDocumentRetries: maxDocRetries, Pipeline: config.Pipeline, RetryOnDocumentStatus: config.Retry.RetryOnStatus, - RequireDataStream: config.MappingMode() == MappingOTel, + RequireDataStream: requireDataStream, CompressionLevel: compressionLevel, } } -func newSyncBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config) *syncBulkIndexer { +func newSyncBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config, requireDataStream bool) *syncBulkIndexer { return &syncBulkIndexer{ - config: bulkIndexerConfig(client, config), + config: bulkIndexerConfig(client, config, requireDataStream), flushTimeout: config.Timeout, flushBytes: config.Flush.Bytes, retryConfig: config.Retry, @@ -126,8 +126,14 @@ type syncBulkIndexerSession struct { } // Add adds an item to the sync bulk indexer session. 
-func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string) error { - doc := docappender.BulkIndexerItem{Index: index, Body: document, DocumentID: docID, DynamicTemplates: dynamicTemplates} +func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string, action string) error { + doc := docappender.BulkIndexerItem{ + Index: index, + Body: document, + DocumentID: docID, + DynamicTemplates: dynamicTemplates, + Action: action, + } err := s.bi.Add(doc) if err != nil { return err @@ -176,7 +182,7 @@ func (s *syncBulkIndexerSession) Flush(ctx context.Context) error { } } -func newAsyncBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config) (*asyncBulkIndexer, error) { +func newAsyncBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config, requireDataStream bool) (*asyncBulkIndexer, error) { numWorkers := config.NumWorkers if numWorkers == 0 { numWorkers = runtime.NumCPU() @@ -190,7 +196,7 @@ func newAsyncBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Con pool.wg.Add(numWorkers) for i := 0; i < numWorkers; i++ { - bi, err := docappender.NewBulkIndexer(bulkIndexerConfig(client, config)) + bi, err := docappender.NewBulkIndexer(bulkIndexerConfig(client, config, requireDataStream)) if err != nil { return nil, err } @@ -249,12 +255,13 @@ func (a *asyncBulkIndexer) Close(ctx context.Context) error { // Add adds an item to the async bulk indexer session. // // Adding an item after a call to Close() will panic. 
-func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string) error { +func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string, action string) error { item := docappender.BulkIndexerItem{ Index: index, Body: document, DocumentID: docID, DynamicTemplates: dynamicTemplates, + Action: action, } select { case <-ctx.Done(): diff --git a/exporter/elasticsearchexporter/bulkindexer_test.go b/exporter/elasticsearchexporter/bulkindexer_test.go index a0f4912a03c3..1ca17772e9a2 100644 --- a/exporter/elasticsearchexporter/bulkindexer_test.go +++ b/exporter/elasticsearchexporter/bulkindexer_test.go @@ -13,6 +13,7 @@ import ( "testing" "time" + "github.com/elastic/go-docappender/v2" "github.com/elastic/go-elasticsearch/v8" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -97,12 +98,12 @@ func TestAsyncBulkIndexer_flush(t *testing.T) { }}) require.NoError(t, err) - bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, &tt.config) + bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, &tt.config, false) require.NoError(t, err) session, err := bulkIndexer.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil)) + assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate)) // should flush time.Sleep(100 * time.Millisecond) assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load()) @@ -111,56 +112,6 @@ func TestAsyncBulkIndexer_flush(t *testing.T) { } } -func TestAsyncBulkIndexer_requireDataStream(t *testing.T) { - tests := []struct { - name string - config Config - wantRequireDataStream bool - }{ - { - name: "ecs", - config: Config{ - NumWorkers: 1, - Mapping: MappingsSettings{Mode: 
MappingECS.String()}, - Flush: FlushSettings{Interval: time.Hour, Bytes: 1e+8}, - }, - wantRequireDataStream: false, - }, - { - name: "otel", - config: Config{ - NumWorkers: 1, - Mapping: MappingsSettings{Mode: MappingOTel.String()}, - Flush: FlushSettings{Interval: time.Hour, Bytes: 1e+8}, - }, - wantRequireDataStream: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - requireDataStreamCh := make(chan bool, 1) - client, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{ - RoundTripFunc: func(r *http.Request) (*http.Response, error) { - if r.URL.Path == "/_bulk" { - requireDataStreamCh <- r.URL.Query().Get("require_data_stream") == "true" - } - return &http.Response{ - Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, - Body: io.NopCloser(strings.NewReader(successResp)), - }, nil - }, - }}) - require.NoError(t, err) - - runBulkIndexerOnce(t, &tt.config, client) - - assert.Equal(t, tt.wantRequireDataStream, <-requireDataStreamCh) - }) - } -} - func TestAsyncBulkIndexer_flush_error(t *testing.T) { tests := []struct { name string @@ -222,14 +173,14 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) { require.NoError(t, err) core, observed := observer.New(zap.NewAtomicLevelAt(zapcore.DebugLevel)) - bulkIndexer, err := newAsyncBulkIndexer(zap.New(core), client, &cfg) + bulkIndexer, err := newAsyncBulkIndexer(zap.New(core), client, &cfg, false) require.NoError(t, err) defer bulkIndexer.Close(context.Background()) session, err := bulkIndexer.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil)) + assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate)) // should flush time.Sleep(100 * time.Millisecond) assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load()) @@ -303,12 +254,12 @@ 
func TestAsyncBulkIndexer_logRoundTrip(t *testing.T) { } func runBulkIndexerOnce(t *testing.T, config *Config, client *elasticsearch.Client) *asyncBulkIndexer { - bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, config) + bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, config, false) require.NoError(t, err) session, err := bulkIndexer.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil)) + assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate)) assert.NoError(t, bulkIndexer.Close(context.Background())) return bulkIndexer @@ -331,11 +282,11 @@ func TestSyncBulkIndexer_flushBytes(t *testing.T) { }}) require.NoError(t, err) - bi := newSyncBulkIndexer(zap.NewNop(), client, &cfg) + bi := newSyncBulkIndexer(zap.NewNop(), client, &cfg, false) session, err := bi.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil)) + assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate)) assert.Equal(t, int64(1), reqCnt.Load()) // flush due to flush::bytes assert.NoError(t, bi.Close(context.Background())) } diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index 56ec493e7086..11e2357907a6 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -4,6 +4,7 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" import ( + "bytes" "context" "errors" "fmt" @@ -11,17 +12,20 @@ import ( "sync" "time" + "github.com/elastic/go-docappender/v2" "go.opentelemetry.io/collector/component" 
"go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pprofile" "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/datapoints" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/elasticsearch" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/pool" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/serializer/otelserializer" ) type elasticsearchExporter struct { @@ -38,6 +42,16 @@ type elasticsearchExporter struct { wg sync.WaitGroup // active sessions bulkIndexer bulkIndexer + // Profiles requires multiple bulk indexers depending on the data type + // Bulk indexer for profiling-events-* + biEvents bulkIndexer + // Bulk indexer for profiling-stacktraces + biStackTraces bulkIndexer + // Bulk indexer for profiling-stackframes + biStackFrames bulkIndexer + // Bulk indexer for profiling-executables + biExecutables bulkIndexer + bufferPool *pool.BufferPool } @@ -81,11 +95,32 @@ func (e *elasticsearchExporter) Start(ctx context.Context, host component.Host) if err != nil { return err } - bulkIndexer, err := newBulkIndexer(e.Logger, client, e.config) + bulkIndexer, err := newBulkIndexer(e.Logger, client, e.config, e.config.MappingMode() == MappingOTel) if err != nil { return err } e.bulkIndexer = bulkIndexer + biEvents, err := newBulkIndexer(e.Logger, client, e.config, true) + if err != nil { + return err + } + e.biEvents = biEvents + biStackTraces, err := newBulkIndexer(e.Logger, client, e.config, false) + if err != nil { + return err + } + e.biStackTraces = biStackTraces + biStackFrames, err := newBulkIndexer(e.Logger, client, e.config, false) + if err != nil 
{ + return err + } + e.biStackFrames = biStackFrames + biExecutables, err := newBulkIndexer(e.Logger, client, e.config, false) + if err != nil { + return err + } + e.biExecutables = biExecutables + return nil } @@ -95,6 +130,26 @@ func (e *elasticsearchExporter) Shutdown(ctx context.Context) error { return err } } + if e.biEvents != nil { + if err := e.biEvents.Close(ctx); err != nil { + return err + } + } + if e.biStackTraces != nil { + if err := e.biStackTraces.Close(ctx); err != nil { + return err + } + } + if e.biStackFrames != nil { + if err := e.biStackFrames.Close(ctx); err != nil { + return err + } + } + if e.biExecutables != nil { + if err := e.biExecutables.Close(ctx); err != nil { + return err + } + } doneCh := make(chan struct{}) go func() { @@ -186,7 +241,7 @@ func (e *elasticsearchExporter) pushLogRecord( } // not recycling after Add returns an error as we don't know if it's already recycled - return bulkIndexerSession.Add(ctx, fIndex.Index, docID, buf, nil) + return bulkIndexerSession.Add(ctx, fIndex.Index, docID, buf, nil, docappender.ActionCreate) } type dataPointsGroup struct { @@ -322,7 +377,7 @@ func (e *elasticsearchExporter) pushMetricsData( errs = append(errs, err) continue } - if err := session.Add(ctx, fIndex.Index, "", buf, dynamicTemplates); err != nil { + if err := session.Add(ctx, fIndex.Index, "", buf, dynamicTemplates, docappender.ActionCreate); err != nil { // not recycling after Add returns an error as we don't know if it's already recycled if cerr := ctx.Err(); cerr != nil { return cerr @@ -443,7 +498,7 @@ func (e *elasticsearchExporter) pushTraceRecord( return fmt.Errorf("failed to encode trace record: %w", err) } // not recycling after Add returns an error as we don't know if it's already recycled - return bulkIndexerSession.Add(ctx, fIndex.Index, "", buf, nil) + return bulkIndexerSession.Add(ctx, fIndex.Index, "", buf, nil, docappender.ActionCreate) } func (e *elasticsearchExporter) pushSpanEvent( @@ -475,7 +530,7 @@ func (e 
*elasticsearchExporter) pushSpanEvent( return nil } // not recycling after Add returns an error as we don't know if it's already recycled - return bulkIndexerSession.Add(ctx, fIndex.Index, "", buf, nil) + return bulkIndexerSession.Add(ctx, fIndex.Index, "", buf, nil, docappender.ActionCreate) } func (e *elasticsearchExporter) extractDocumentIDAttribute(m pcommon.Map) string { @@ -489,3 +544,117 @@ func (e *elasticsearchExporter) extractDocumentIDAttribute(m pcommon.Map) string } return v.AsString() } + +func (e *elasticsearchExporter) pushProfilesData(ctx context.Context, pd pprofile.Profiles) error { + e.wg.Add(1) + defer e.wg.Done() + + defaultSession, err := e.bulkIndexer.StartSession(ctx) + if err != nil { + return err + } + defer defaultSession.End() + eventsSession, err := e.biEvents.StartSession(ctx) + if err != nil { + return err + } + defer eventsSession.End() + stackTracesSession, err := e.biStackTraces.StartSession(ctx) + if err != nil { + return err + } + defer stackTracesSession.End() + stackFramesSession, err := e.biStackFrames.StartSession(ctx) + if err != nil { + return err + } + defer stackFramesSession.End() + executablesSession, err := e.biExecutables.StartSession(ctx) + if err != nil { + return err + } + defer executablesSession.End() + + var errs []error + rps := pd.ResourceProfiles() + for i := 0; i < rps.Len(); i++ { + rp := rps.At(i) + resource := rp.Resource() + sps := rp.ScopeProfiles() + for j := 0; j < sps.Len(); j++ { + sp := sps.At(j) + scope := sp.Scope() + p := sp.Profiles() + for k := 0; k < p.Len(); k++ { + if err := e.pushProfileRecord(ctx, resource, p.At(k), scope, defaultSession, eventsSession, stackTracesSession, stackFramesSession, executablesSession); err != nil { + if cerr := ctx.Err(); cerr != nil { + return cerr + } + + if errors.Is(err, ErrInvalidTypeForBodyMapMode) { + e.Logger.Warn("dropping profile record", zap.Error(err)) + continue + } + + errs = append(errs, err) + } + } + } + } + + if err := 
defaultSession.Flush(ctx); err != nil { + if cerr := ctx.Err(); cerr != nil { + return cerr + } + errs = append(errs, err) + } + if err := eventsSession.Flush(ctx); err != nil { + if cerr := ctx.Err(); cerr != nil { + return cerr + } + errs = append(errs, err) + } + if err := stackTracesSession.Flush(ctx); err != nil { + if cerr := ctx.Err(); cerr != nil { + return cerr + } + errs = append(errs, err) + } + if err := stackFramesSession.Flush(ctx); err != nil { + if cerr := ctx.Err(); cerr != nil { + return cerr + } + errs = append(errs, err) + } + if err := executablesSession.Flush(ctx); err != nil { + if cerr := ctx.Err(); cerr != nil { + return cerr + } + errs = append(errs, err) + } + + return errors.Join(errs...) +} + +func (e *elasticsearchExporter) pushProfileRecord( + ctx context.Context, + resource pcommon.Resource, + record pprofile.Profile, + scope pcommon.InstrumentationScope, + defaultSession, eventsSession, stackTracesSession, stackFramesSession, executablesSession bulkIndexerSession, +) error { + return e.model.encodeProfile(resource, scope, record, func(buf *bytes.Buffer, docID, index string) error { + switch index { + case otelserializer.StackTraceIndex: + return stackTracesSession.Add(ctx, index, docID, buf, nil, docappender.ActionCreate) + case otelserializer.StackFrameIndex: + return stackFramesSession.Add(ctx, index, docID, buf, nil, docappender.ActionCreate) + case otelserializer.AllEventsIndex: + return eventsSession.Add(ctx, index, docID, buf, nil, docappender.ActionCreate) + case otelserializer.ExecutablesIndex: + return executablesSession.Add(ctx, index, docID, buf, nil, docappender.ActionUpdate) + default: + return defaultSession.Add(ctx, index, docID, buf, nil, docappender.ActionCreate) + } + }) +} diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index c72ecbfc0fd1..267cb1219c91 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -17,6 
+17,8 @@ import ( "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterbatcher" "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.opentelemetry.io/collector/exporter/exporterhelper/xexporterhelper" + "go.opentelemetry.io/collector/exporter/xexporter" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/metadata" ) @@ -30,12 +32,13 @@ const ( // NewFactory creates a factory for Elastic exporter. func NewFactory() exporter.Factory { - return exporter.NewFactory( + return xexporter.NewFactory( metadata.Type, createDefaultConfig, - exporter.WithLogs(createLogsExporter, metadata.LogsStability), - exporter.WithMetrics(createMetricsExporter, metadata.MetricsStability), - exporter.WithTraces(createTracesExporter, metadata.TracesStability), + xexporter.WithLogs(createLogsExporter, metadata.LogsStability), + xexporter.WithMetrics(createMetricsExporter, metadata.MetricsStability), + xexporter.WithTraces(createTracesExporter, metadata.TracesStability), + xexporter.WithProfiles(createProfilesExporter, metadata.ProfilesStability), ) } @@ -163,6 +166,29 @@ func createTracesExporter(ctx context.Context, ) } +// createProfilesExporter creates a new exporter for profiles. +// +// Profiles are directly indexed into Elasticsearch. 
+func createProfilesExporter( + ctx context.Context, + set exporter.Settings, + cfg component.Config, +) (xexporter.Profiles, error) { + cf := cfg.(*Config) + + handleDeprecatedConfig(cf, set.Logger) + + exporter := newExporter(cf, set, "", false) + + return xexporterhelper.NewProfilesExporter( + ctx, + set, + cfg, + exporter.pushProfilesData, + exporterhelperOptions(cf, exporter.Start, exporter.Shutdown)..., + ) +} + func exporterhelperOptions( cfg *Config, start component.StartFunc, diff --git a/exporter/elasticsearchexporter/go.mod b/exporter/elasticsearchexporter/go.mod index 9217b96a6df3..06bcd9d1a1eb 100644 --- a/exporter/elasticsearchexporter/go.mod +++ b/exporter/elasticsearchexporter/go.mod @@ -23,18 +23,25 @@ require ( go.opentelemetry.io/collector/confmap/xconfmap v0.0.0-20250210155359-76f44e1e21d1 go.opentelemetry.io/collector/consumer v1.25.1-0.20250210123122-44b3eeda354c go.opentelemetry.io/collector/exporter v0.119.1-0.20250210123122-44b3eeda354c + go.opentelemetry.io/collector/exporter/exporterhelper/xexporterhelper v0.119.1-0.20250210123122-44b3eeda354c go.opentelemetry.io/collector/exporter/exportertest v0.119.1-0.20250210123122-44b3eeda354c + go.opentelemetry.io/collector/exporter/xexporter v0.119.1-0.20250210123122-44b3eeda354c go.opentelemetry.io/collector/extension/auth/authtest v0.119.1-0.20250210123122-44b3eeda354c go.opentelemetry.io/collector/pdata v1.25.1-0.20250210123122-44b3eeda354c + go.opentelemetry.io/collector/pdata/pprofile v0.119.1-0.20250210123122-44b3eeda354c go.opentelemetry.io/collector/semconv v0.119.1-0.20250210123122-44b3eeda354c + go.opentelemetry.io/ebpf-profiler v0.0.0-20250212075250-7bf12d3f962f + go.opentelemetry.io/otel v1.34.0 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 ) require ( github.com/armon/go-radix v1.0.0 // indirect + github.com/cilium/ebpf v0.16.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/elastic/elastic-transport-go/v8 v8.6.1 // indirect + github.com/elastic/go-freelru 
v0.16.0 // indirect github.com/elastic/go-sysinfo v1.15.0 // indirect github.com/elastic/go-windows v1.0.2 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect @@ -47,9 +54,11 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/go-version v1.7.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect github.com/knadh/koanf/v2 v2.1.2 // indirect + github.com/minio/sha256-simd v1.0.1 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect @@ -59,8 +68,10 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/rs/cors v1.11.1 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect + github.com/zeebo/xxh3 v1.0.2 // indirect go.elastic.co/apm/module/apmzap/v2 v2.6.3 // indirect go.elastic.co/apm/v2 v2.6.3 // indirect go.elastic.co/fastjson v1.4.0 // indirect @@ -69,30 +80,30 @@ require ( go.opentelemetry.io/collector/config/configretry v1.25.1-0.20250210123122-44b3eeda354c // indirect go.opentelemetry.io/collector/config/configtls v1.25.1-0.20250210123122-44b3eeda354c // indirect go.opentelemetry.io/collector/consumer/consumererror v0.119.1-0.20250210123122-44b3eeda354c // indirect + go.opentelemetry.io/collector/consumer/consumererror/xconsumererror v0.119.1-0.20250210123122-44b3eeda354c // indirect go.opentelemetry.io/collector/consumer/consumertest v0.119.1-0.20250210123122-44b3eeda354c // indirect go.opentelemetry.io/collector/consumer/xconsumer v0.119.1-0.20250210123122-44b3eeda354c // indirect - go.opentelemetry.io/collector/exporter/xexporter 
v0.119.1-0.20250210123122-44b3eeda354c // indirect go.opentelemetry.io/collector/extension v0.119.1-0.20250210123122-44b3eeda354c // indirect go.opentelemetry.io/collector/extension/auth v0.119.1-0.20250210123122-44b3eeda354c // indirect go.opentelemetry.io/collector/extension/xextension v0.119.1-0.20250210123122-44b3eeda354c // indirect go.opentelemetry.io/collector/featuregate v1.25.1-0.20250210123122-44b3eeda354c // indirect - go.opentelemetry.io/collector/pdata/pprofile v0.119.1-0.20250210123122-44b3eeda354c // indirect go.opentelemetry.io/collector/pipeline v0.119.1-0.20250210123122-44b3eeda354c // indirect + go.opentelemetry.io/collector/pipeline/xpipeline v0.119.1-0.20250210123122-44b3eeda354c // indirect go.opentelemetry.io/collector/receiver v0.119.1-0.20250210123122-44b3eeda354c // indirect go.opentelemetry.io/collector/receiver/receivertest v0.119.1-0.20250210123122-44b3eeda354c // indirect go.opentelemetry.io/collector/receiver/xreceiver v0.119.1-0.20250210123122-44b3eeda354c // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 // indirect - go.opentelemetry.io/otel v1.34.0 // indirect go.opentelemetry.io/otel/metric v1.34.0 // indirect go.opentelemetry.io/otel/sdk v1.34.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.34.0 // indirect go.opentelemetry.io/otel/trace v1.34.0 // indirect go.uber.org/multierr v1.11.0 // indirect + golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect golang.org/x/net v0.34.0 // indirect golang.org/x/sync v0.11.0 // indirect golang.org/x/sys v0.30.0 // indirect golang.org/x/text v0.22.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250102185135-69823020774d // indirect google.golang.org/grpc v1.70.0 // indirect google.golang.org/protobuf v1.36.5 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/exporter/elasticsearchexporter/go.sum 
b/exporter/elasticsearchexporter/go.sum index f96ddea1977f..1785f9061602 100644 --- a/exporter/elasticsearchexporter/go.sum +++ b/exporter/elasticsearchexporter/go.sum @@ -2,6 +2,8 @@ github.com/armon/go-radix v1.0.0 h1:F4z6KzEeeQIMeLFa97iZU6vupzoecKdU5TX24SNppXI= github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cilium/ebpf v0.16.0 h1:+BiEnHL6Z7lXnlGUsXQPPAE7+kenAd4ES8MQ5min0Ok= +github.com/cilium/ebpf v0.16.0/go.mod h1:L7u2Blt2jMM/vLAVgjxluxtBKlz3/GWjB0dMOEngfwE= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -11,6 +13,8 @@ github.com/elastic/go-docappender/v2 v2.5.0 h1:fNsmkTrV82gTd6BTA5bc0qLriPdZOhf5h github.com/elastic/go-docappender/v2 v2.5.0/go.mod h1:ymIoSDEiGm8fG/Vdfk4ytsX+qG20cLFtQkX6fZ7NfTs= github.com/elastic/go-elasticsearch/v8 v8.17.1 h1:bOXChDoCMB4TIwwGqKd031U8OXssmWLT3UrAr9EGs3Q= github.com/elastic/go-elasticsearch/v8 v8.17.1/go.mod h1:MVJCtL+gJJ7x5jFeUmA20O7rvipX8GcQmo5iBcmaJn4= +github.com/elastic/go-freelru v0.16.0 h1:gG2HJ1WXN2tNl5/p40JS/l59HjvjRhjyAa+oFTRArYs= +github.com/elastic/go-freelru v0.16.0/go.mod h1:bSdWT4M0lW79K8QbX6XY2heQYSCqD7THoYf82pT/H3I= github.com/elastic/go-structform v0.0.12 h1:HXpzlAKyej8T7LobqKDThUw7BMhwV6Db24VwxNtgxCs= github.com/elastic/go-structform v0.0.12/go.mod h1:CZWf9aIRYY5SuKSmOhtXScE5uQiLZNqAFnwKR4OrIM4= github.com/elastic/go-sysinfo v1.15.0 h1:54pRFlAYUlVNQ2HbXzLVZlV+fxS7Eax49stzg95M4Xw= @@ -26,6 +30,8 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= 
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-quicktest/qt v1.101.0 h1:O1K29Txy5P2OK0dGo59b7b0LR6wKfIhttaAhHUyn7eI= +github.com/go-quicktest/qt v1.101.0/go.mod h1:14Bz/f7NwaXPtdYEgzsx46kqSxVwTbzVZsDC26tQJow= github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss= github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= @@ -48,6 +54,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= +github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= github.com/knadh/koanf/maps v0.1.1/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= github.com/knadh/koanf/providers/confmap v0.1.0 h1:gOkxhHkemwG4LezxxN8DMOFopOPghxRVp7JbIvdvqzU= @@ -62,6 +70,8 @@ github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2t github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is= github.com/lestrrat-go/strftime v1.1.0 h1:gMESpZy44/4pXLO/m+sL0yBd1W6LjgjrrD4a68Gapyg= github.com/lestrrat-go/strftime v1.1.0/go.mod h1:uzeIB52CeUJenCo1syghlugshMysrqUT51HlxphXVeI= +github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM= +github.com/minio/sha256-simd v1.0.1/go.mod 
h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= @@ -83,6 +93,8 @@ github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA= github.com/rs/cors v1.11.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -96,6 +108,10 @@ github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= +github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= +github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.elastic.co/apm/module/apmelasticsearch/v2 v2.6.3 h1:5CuemBg1oZnXI6jz+jkLqM95Np1XHasdy0CCelX62Ec= go.elastic.co/apm/module/apmelasticsearch/v2 v2.6.3/go.mod 
h1:MVK2TIrvMGQaL7bnO4dbnJ+jpDNqmkj+BukDRBYTI60= go.elastic.co/apm/module/apmhttp/v2 v2.6.3 h1:jwFovJZkccySKCyy2oE9ZKvUry/a4gdQHF/MtZUQVtE= @@ -134,12 +150,16 @@ go.opentelemetry.io/collector/consumer v1.25.1-0.20250210123122-44b3eeda354c h1: go.opentelemetry.io/collector/consumer v1.25.1-0.20250210123122-44b3eeda354c/go.mod h1:wzomc+0FCV4FrgNP4SUF/7P1ux2iE7h4SAxVXk0+rv4= go.opentelemetry.io/collector/consumer/consumererror v0.119.1-0.20250210123122-44b3eeda354c h1:SSjo+YYRBgsE3fw426HrEku6WNoyP6MMXRycRZRxApE= go.opentelemetry.io/collector/consumer/consumererror v0.119.1-0.20250210123122-44b3eeda354c/go.mod h1:EXh9Sl/YNz3xSgzDdptiUr2UszNPONdHc3RzdfeKQDE= +go.opentelemetry.io/collector/consumer/consumererror/xconsumererror v0.119.1-0.20250210123122-44b3eeda354c h1:mLUTJ6LHjPJ7R9IRMyl/P/uluBGz4+wyX7gsd9cq+sQ= +go.opentelemetry.io/collector/consumer/consumererror/xconsumererror v0.119.1-0.20250210123122-44b3eeda354c/go.mod h1:PU682ehBugZlpb5wx6TrKNx9bCTRmV77x0/+oaXvLQk= go.opentelemetry.io/collector/consumer/consumertest v0.119.1-0.20250210123122-44b3eeda354c h1:ErvHovgDePqXULwMtxVQcFGIPm1KqCrz+R8XK7752wI= go.opentelemetry.io/collector/consumer/consumertest v0.119.1-0.20250210123122-44b3eeda354c/go.mod h1:ur4t9L7UlnyqzgtWkoGC9dcJI0UR4u/5r2yz44ivblE= go.opentelemetry.io/collector/consumer/xconsumer v0.119.1-0.20250210123122-44b3eeda354c h1:AS9BA4G5hPZ8NwZLqV07y4N7Cd0/PT6dWaosKnsVHSc= go.opentelemetry.io/collector/consumer/xconsumer v0.119.1-0.20250210123122-44b3eeda354c/go.mod h1:wwqVjueP0SKyuFKtDyqkzs0nAjQhHbSITEr+7oWV1Nc= go.opentelemetry.io/collector/exporter v0.119.1-0.20250210123122-44b3eeda354c h1:Tdw39Eci0Q4eJDpaYOjSUYa/4wH8ODlLcngnn3K1fV4= go.opentelemetry.io/collector/exporter v0.119.1-0.20250210123122-44b3eeda354c/go.mod h1:8TJKNP27koPbJWHvv9T2tC6WfXEu692hyaLOpOhdn2U= +go.opentelemetry.io/collector/exporter/exporterhelper/xexporterhelper v0.119.1-0.20250210123122-44b3eeda354c h1:FG5MsWKE7gS4A7bAZwhoMHMs66Rm+PiWe4U9owAT2C0= 
+go.opentelemetry.io/collector/exporter/exporterhelper/xexporterhelper v0.119.1-0.20250210123122-44b3eeda354c/go.mod h1:7qd9JihRnhkya5e/oIEVSNjiWGgxsMEvLsBt2APw1kY= go.opentelemetry.io/collector/exporter/exportertest v0.119.1-0.20250210123122-44b3eeda354c h1:uTsuBIs2+dvsIBQfrY2ZYY9QrIIUUv6OveP68b2D8DU= go.opentelemetry.io/collector/exporter/exportertest v0.119.1-0.20250210123122-44b3eeda354c/go.mod h1:bKwNYiqxhrTcmKeU6NXXAimPbdVgcw4wmjWWT0PAgas= go.opentelemetry.io/collector/exporter/xexporter v0.119.1-0.20250210123122-44b3eeda354c h1:7fSki69pVj0tqVbmfZu2ToaU4K+uWQGzMAOPqwoSlqc= @@ -164,6 +184,8 @@ go.opentelemetry.io/collector/pdata/testdata v0.119.0 h1:a3OiuLYx7CaEQQ8LxMhPIM8 go.opentelemetry.io/collector/pdata/testdata v0.119.0/go.mod h1:stCgL1fKOVp93mI4ocHy/xBMhkW3TXr8VetH4X86q8g= go.opentelemetry.io/collector/pipeline v0.119.1-0.20250210123122-44b3eeda354c h1:7wjodUC7ykxg900reVF1svAIv4XNzTAnRyd75dTNTt0= go.opentelemetry.io/collector/pipeline v0.119.1-0.20250210123122-44b3eeda354c/go.mod h1:qE3DmoB05AW0C3lmPvdxZqd/H4po84NPzd5MrqgtL74= +go.opentelemetry.io/collector/pipeline/xpipeline v0.119.1-0.20250210123122-44b3eeda354c h1:TW5nWxB/wxXV5euDOzMktLxWhei7enp/5BhtCeUhkn0= +go.opentelemetry.io/collector/pipeline/xpipeline v0.119.1-0.20250210123122-44b3eeda354c/go.mod h1:m2acg7G5Rb8Srm5aK4rD7mUDQhd1r/qXq1DJxdhMAcQ= go.opentelemetry.io/collector/receiver v0.119.1-0.20250210123122-44b3eeda354c h1:x3WbeEtQbh3dL7FIwfn/JT2bhJcQ/i8l6oUIDcjIcSk= go.opentelemetry.io/collector/receiver v0.119.1-0.20250210123122-44b3eeda354c/go.mod h1:QCV0M2OClUeBwTB4i8BaMCas70Jdtftimr82Qsw88G0= go.opentelemetry.io/collector/receiver/receivertest v0.119.1-0.20250210123122-44b3eeda354c h1:MHZ5hsbGPm+z5cAZG/2UVi/c4Vq0GE12PBsNC9qmViw= @@ -174,6 +196,8 @@ go.opentelemetry.io/collector/semconv v0.119.1-0.20250210123122-44b3eeda354c h1: go.opentelemetry.io/collector/semconv v0.119.1-0.20250210123122-44b3eeda354c/go.mod h1:N6XE8Q0JKgBN2fAhkUQtqK9LT7rEGR6+Wu/Rtbal1iI= 
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 h1:CV7UdSGJt/Ao6Gp4CXckLxVRRsRgDHoI8XjbL3PDl8s= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0/go.mod h1:FRmFuRJfag1IZ2dPkHnEoSFVgTVPUd2qf5Vi69hLb8I= +go.opentelemetry.io/ebpf-profiler v0.0.0-20250212075250-7bf12d3f962f h1:DqRQ7JaRjf3TwWwfwHIvsBB/aLUs+kgrX+MrAIllALI= +go.opentelemetry.io/ebpf-profiler v0.0.0-20250212075250-7bf12d3f962f/go.mod h1:hfAVBjRN6FZjSgZUBsNzvRDJWlS46R5Y0SGVr4Jl86s= go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= @@ -193,6 +217,8 @@ go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk= +golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -209,6 +235,8 @@ golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys 
v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -223,8 +251,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a h1:hgh8P4EuoxpsuKMXX/To36nOFD7vixReXgn8lPGnt+o= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250102185135-69823020774d h1:xJJRGY7TJcvIlpSrN3K6LAWgNFUILlO+OMAqtg9aqnw= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250102185135-69823020774d/go.mod h1:3ENsm/5D1mzDyhpzeRi1NR784I0BcofWBoSc5QqqMK4= google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ= google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw= google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= diff --git a/exporter/elasticsearchexporter/integrationtest/go.mod b/exporter/elasticsearchexporter/integrationtest/go.mod index 694ed72281a1..b70d018282b8 100644 --- a/exporter/elasticsearchexporter/integrationtest/go.mod +++ b/exporter/elasticsearchexporter/integrationtest/go.mod @@ 
-43,10 +43,12 @@ require ( github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/cilium/ebpf v0.16.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/ebitengine/purego v0.8.2 // indirect github.com/elastic/elastic-transport-go/v8 v8.6.1 // indirect github.com/elastic/go-elasticsearch/v8 v8.17.1 // indirect + github.com/elastic/go-freelru v0.16.0 // indirect github.com/elastic/go-grok v0.3.1 // indirect github.com/elastic/go-structform v0.0.12 // indirect github.com/elastic/go-sysinfo v1.15.0 // indirect @@ -79,6 +81,7 @@ require ( github.com/jpillora/backoff v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.11 // indirect + github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect github.com/knadh/koanf/v2 v2.1.2 // indirect @@ -88,6 +91,7 @@ require ( github.com/lightstep/go-expohisto v1.0.0 // indirect github.com/lufia/plan9stats v0.0.0-20240226150601-1dcf7310316a // indirect github.com/magefile/mage v1.15.0 // indirect + github.com/minio/sha256-simd v1.0.1 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect @@ -123,6 +127,7 @@ require ( github.com/prometheus/common v0.62.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/rs/cors v1.11.1 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect github.com/soheilhy/cmux v0.1.5 // indirect github.com/spf13/cobra v1.8.1 // indirect github.com/spf13/pflag v1.0.6 // indirect @@ -131,6 +136,7 @@ require ( github.com/ua-parser/uap-go v0.0.0-20240611065828-3a4781585db6 // indirect github.com/valyala/fastjson v1.6.4 // indirect 
github.com/yusufpapurcu/wmi v1.2.4 // indirect + github.com/zeebo/xxh3 v1.0.2 // indirect go.elastic.co/apm/module/apmelasticsearch/v2 v2.6.3 // indirect go.elastic.co/apm/module/apmhttp/v2 v2.6.3 // indirect go.elastic.co/apm/module/apmzap/v2 v2.6.3 // indirect @@ -188,6 +194,7 @@ require ( go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 // indirect go.opentelemetry.io/contrib/propagators/b3 v1.34.0 // indirect go.opentelemetry.io/contrib/zpages v0.59.0 // indirect + go.opentelemetry.io/ebpf-profiler v0.0.0-20250212075250-7bf12d3f962f // indirect go.opentelemetry.io/otel v1.34.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.10.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.10.0 // indirect @@ -208,7 +215,7 @@ require ( go.opentelemetry.io/otel/trace v1.34.0 // indirect go.opentelemetry.io/proto/otlp v1.5.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect + golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect golang.org/x/net v0.35.0 // indirect golang.org/x/sys v0.30.0 // indirect golang.org/x/text v0.22.0 // indirect diff --git a/exporter/elasticsearchexporter/integrationtest/go.sum b/exporter/elasticsearchexporter/integrationtest/go.sum index d45d750a60e3..44e840e85ed1 100644 --- a/exporter/elasticsearchexporter/integrationtest/go.sum +++ b/exporter/elasticsearchexporter/integrationtest/go.sum @@ -25,6 +25,8 @@ github.com/census-instrumentation/opencensus-proto v0.4.1 h1:iKLQ0xPNFxR/2hzXZMr github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cilium/ebpf v0.16.0 h1:+BiEnHL6Z7lXnlGUsXQPPAE7+kenAd4ES8MQ5min0Ok= +github.com/cilium/ebpf v0.16.0/go.mod 
h1:L7u2Blt2jMM/vLAVgjxluxtBKlz3/GWjB0dMOEngfwE= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= @@ -40,6 +42,8 @@ github.com/elastic/go-docappender/v2 v2.5.0 h1:fNsmkTrV82gTd6BTA5bc0qLriPdZOhf5h github.com/elastic/go-docappender/v2 v2.5.0/go.mod h1:ymIoSDEiGm8fG/Vdfk4ytsX+qG20cLFtQkX6fZ7NfTs= github.com/elastic/go-elasticsearch/v8 v8.17.1 h1:bOXChDoCMB4TIwwGqKd031U8OXssmWLT3UrAr9EGs3Q= github.com/elastic/go-elasticsearch/v8 v8.17.1/go.mod h1:MVJCtL+gJJ7x5jFeUmA20O7rvipX8GcQmo5iBcmaJn4= +github.com/elastic/go-freelru v0.16.0 h1:gG2HJ1WXN2tNl5/p40JS/l59HjvjRhjyAa+oFTRArYs= +github.com/elastic/go-freelru v0.16.0/go.mod h1:bSdWT4M0lW79K8QbX6XY2heQYSCqD7THoYf82pT/H3I= github.com/elastic/go-grok v0.3.1 h1:WEhUxe2KrwycMnlvMimJXvzRa7DoByJB4PVUIE1ZD/U= github.com/elastic/go-grok v0.3.1/go.mod h1:n38ls8ZgOboZRgKcjMY8eFeZFMmcL9n2lP0iHhIDk64= github.com/elastic/go-structform v0.0.12 h1:HXpzlAKyej8T7LobqKDThUw7BMhwV6Db24VwxNtgxCs= @@ -68,6 +72,8 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= +github.com/go-quicktest/qt v1.101.0 h1:O1K29Txy5P2OK0dGo59b7b0LR6wKfIhttaAhHUyn7eI= +github.com/go-quicktest/qt v1.101.0/go.mod h1:14Bz/f7NwaXPtdYEgzsx46kqSxVwTbzVZsDC26tQJow= github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss= github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= @@ -146,6 +152,8 @@ 
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= +github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= github.com/knadh/koanf/maps v0.1.1/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= github.com/knadh/koanf/providers/confmap v0.1.0 h1:gOkxhHkemwG4LezxxN8DMOFopOPghxRVp7JbIvdvqzU= @@ -174,6 +182,8 @@ github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= +github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM= +github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/mapstructure v1.5.1-0.20231216201459-8508981c8b6c h1:cqn374mizHuIWj+OSJCajGr/phAmuMug9qIX3l9CflE= @@ -222,6 +232,8 @@ github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6g github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= github.com/shirou/gopsutil/v4 v4.25.1 h1:QSWkTc+fu9LTAWfkZwZ6j8MSUk4A2LV7rbH0ZqmLjXs= github.com/shirou/gopsutil/v4 v4.25.1/go.mod 
h1:RoUCUpndaJFtT+2zsZzzmhvbfGoDCJ7nFXKJf8GqJbI= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js= github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0= github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= @@ -270,6 +282,10 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= +github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= +github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.elastic.co/apm/module/apmelasticsearch/v2 v2.6.3 h1:5CuemBg1oZnXI6jz+jkLqM95Np1XHasdy0CCelX62Ec= go.elastic.co/apm/module/apmelasticsearch/v2 v2.6.3/go.mod h1:MVK2TIrvMGQaL7bnO4dbnJ+jpDNqmkj+BukDRBYTI60= go.elastic.co/apm/module/apmhttp/v2 v2.6.3 h1:jwFovJZkccySKCyy2oE9ZKvUry/a4gdQHF/MtZUQVtE= @@ -420,6 +436,8 @@ go.opentelemetry.io/contrib/propagators/b3 v1.34.0 h1:9pQdCEvV/6RWQmag94D6rhU+A4 go.opentelemetry.io/contrib/propagators/b3 v1.34.0/go.mod h1:FwM71WS8i1/mAK4n48t0KU6qUS/OZRBgDrHZv3RlJ+w= go.opentelemetry.io/contrib/zpages v0.59.0 h1:t0H5zUy8fifIhRuVwm2FrA/D70Kk10SSpAEvvbaNscw= go.opentelemetry.io/contrib/zpages v0.59.0/go.mod h1:9wo+yUPvHnBQEzoHJ8R3nA/Q5rkef7HjtLlSFI0Tgrc= +go.opentelemetry.io/ebpf-profiler v0.0.0-20250212075250-7bf12d3f962f h1:DqRQ7JaRjf3TwWwfwHIvsBB/aLUs+kgrX+MrAIllALI= 
+go.opentelemetry.io/ebpf-profiler v0.0.0-20250212075250-7bf12d3f962f/go.mod h1:hfAVBjRN6FZjSgZUBsNzvRDJWlS46R5Y0SGVr4Jl86s= go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.10.0 h1:5dTKu4I5Dn4P2hxyW3l3jTaZx9ACgg0ECos1eAVrheY= @@ -473,8 +491,8 @@ golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDf golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM= -golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc= +golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk= +golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -528,6 +546,7 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys 
v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/exporter/elasticsearchexporter/internal/metadata/generated_status.go b/exporter/elasticsearchexporter/internal/metadata/generated_status.go index 777092615160..89c60990e031 100644 --- a/exporter/elasticsearchexporter/internal/metadata/generated_status.go +++ b/exporter/elasticsearchexporter/internal/metadata/generated_status.go @@ -12,7 +12,8 @@ var ( ) const ( - MetricsStability = component.StabilityLevelDevelopment - TracesStability = component.StabilityLevelBeta - LogsStability = component.StabilityLevelBeta + MetricsStability = component.StabilityLevelDevelopment + ProfilesStability = component.StabilityLevelDevelopment + TracesStability = component.StabilityLevelBeta + LogsStability = component.StabilityLevelBeta ) diff --git a/exporter/elasticsearchexporter/internal/serializer/otelserializer/profile.go b/exporter/elasticsearchexporter/internal/serializer/otelserializer/profile.go new file mode 100644 index 000000000000..bb1a75e2858b --- /dev/null +++ b/exporter/elasticsearchexporter/internal/serializer/otelserializer/profile.go @@ -0,0 +1,85 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otelserializer // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/serializer/otelserializer" + +import ( + "bytes" + "encoding/json" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pprofile" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles" +) + +const ( + AllEventsIndex = 
"profiling-events-all" + StackTraceIndex = "profiling-stacktraces" + StackFrameIndex = "profiling-stackframes" + ExecutablesIndex = "profiling-executables" +) + +// SerializeProfile serializes a profile and calls the `pushData` callback for each generated document. +func SerializeProfile(resource pcommon.Resource, scope pcommon.InstrumentationScope, profile pprofile.Profile, pushData func(*bytes.Buffer, string, string) error) error { + data, err := serializeprofiles.Transform(resource, scope, profile) + if err != nil { + return err + } + + for _, payload := range data { + if payload.StackTraceEvent.StackTraceID != "" { + c, err := toJSON(payload.StackTraceEvent) + if err != nil { + return err + } + err = pushData(c, "", AllEventsIndex) + if err != nil { + return err + } + } + + if payload.StackTrace.DocID != "" { + c, err := toJSON(payload.StackTrace) + if err != nil { + return err + } + err = pushData(c, payload.StackTrace.DocID, StackTraceIndex) + if err != nil { + return err + } + } + + for _, stackFrame := range payload.StackFrames { + c, err := toJSON(stackFrame) + if err != nil { + return err + } + err = pushData(c, stackFrame.DocID, StackFrameIndex) + if err != nil { + return err + } + } + + for _, executable := range payload.Executables { + c, err := toJSON(executable) + if err != nil { + return err + } + err = pushData(c, executable.DocID, ExecutablesIndex) + if err != nil { + return err + } + } + } + return nil +} + +func toJSON(d any) (*bytes.Buffer, error) { + c, err := json.Marshal(d) + if err != nil { + return nil, err + } + + return bytes.NewBuffer(c), nil +} diff --git a/exporter/elasticsearchexporter/internal/serializer/otelserializer/profile_test.go b/exporter/elasticsearchexporter/internal/serializer/otelserializer/profile_test.go new file mode 100644 index 000000000000..279693ff623d --- /dev/null +++ b/exporter/elasticsearchexporter/internal/serializer/otelserializer/profile_test.go @@ -0,0 +1,126 @@ +// Copyright The OpenTelemetry Authors +// 
package otelserializer

import (
	"bytes"
	"encoding/json"
	"fmt"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pprofile"

	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles"
)

// TestSerializeProfile builds a minimal on-CPU profile via profileCustomizer,
// runs SerializeProfile over it and compares every pushed document (decoded
// back from JSON) against the expected document maps.
func TestSerializeProfile(t *testing.T) {
	tests := []struct {
		name              string
		profileCustomizer func(resource pcommon.Resource, scope pcommon.InstrumentationScope, record pprofile.Profile)
		wantErr           bool
		expected          []map[string]any
	}{
		{
			name: "with a simple sample",
			profileCustomizer: func(_ pcommon.Resource, _ pcommon.InstrumentationScope, profile pprofile.Profile) {
				// Sample/period types must be ["samples","count"] / ["cpu","nanoseconds"]
				// to pass checkProfileType in the serializer.
				profile.StringTable().Append("samples", "count", "cpu", "nanoseconds")
				st := profile.SampleType().AppendEmpty()
				st.SetTypeStrindex(0)
				st.SetUnitStrindex(1)
				pt := profile.PeriodType()
				pt.SetTypeStrindex(2)
				pt.SetUnitStrindex(3)

				// Attribute 0: build ID (hex) used by the mapping below.
				a := profile.AttributeTable().AppendEmpty()
				a.SetKey("process.executable.build_id.htlhash")
				a.Value().SetStr("600DCAFE4A110000F2BF38C493F5FB92")
				// Attribute 1: frame type attached to the location.
				a = profile.AttributeTable().AppendEmpty()
				a.SetKey("profile.frame.type")
				a.Value().SetStr("native")
				// Attribute 2: host.id, referenced at profile level.
				a = profile.AttributeTable().AppendEmpty()
				a.SetKey("host.id")
				a.Value().SetStr("localhost")

				profile.AttributeIndices().Append(2)

				sample := profile.Sample().AppendEmpty()
				sample.TimestampsUnixNano().Append(0)
				sample.SetLocationsLength(1)

				m := profile.MappingTable().AppendEmpty()
				m.AttributeIndices().Append(0)

				l := profile.LocationTable().AppendEmpty()
				l.SetMappingIndex(0)
				l.SetAddress(111)
				l.AttributeIndices().Append(1)
			},
			wantErr: false,
			// Expected documents, in push order: stacktrace doc, executable
			// scripted-upsert doc, stacktrace event doc.
			expected: []map[string]any{
				{
					"Stacktrace.frame.ids":   "YA3K_koRAADyvzjEk_X7kgAAAAAAAABv",
					"Stacktrace.frame.types": "AQM",
					"ecs.version":            "1.12.0",
				},
				{
					"script": map[string]any{
						"params": map[string]any{
							"buildid":    "YA3K_koRAADyvzjEk_X7kg",
							"ecsversion": "1.12.0",
							"filename":   "samples",
							"timestamp":  json.Number(fmt.Sprintf("%d", serializeprofiles.GetStartOfWeekFromTime(time.Now()))),
						},
						"source": serializeprofiles.ExeMetadataUpsertScript,
					},
					"scripted_upsert": true,
					"upsert":          map[string]any{},
				},
				{
					"@timestamp":          "1970-01-01T00:00:00Z",
					"Stacktrace.count":    json.Number("1"),
					"Stacktrace.id":       "02VzuClbpt_P3xxwox83Ng",
					"ecs.version":         "1.12.0",
					"host.id":             "localhost",
					"process.thread.name": "",
				},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			profiles := pprofile.NewProfiles()
			resource := profiles.ResourceProfiles().AppendEmpty()
			scope := resource.ScopeProfiles().AppendEmpty()
			profile := scope.Profiles().AppendEmpty()
			tt.profileCustomizer(resource.Resource(), scope.Scope(), profile)
			profiles.MarkReadOnly()

			// Collect every document body pushed by the serializer.
			buf := []*bytes.Buffer{}
			err := SerializeProfile(resource.Resource(), scope.Scope(), profile, func(b *bytes.Buffer, _ string, _ string) error {
				buf = append(buf, b)
				return nil
			})
			if !tt.wantErr {
				require.NoError(t, err)
			}

			var results []map[string]any
			for _, v := range buf {
				var d map[string]any
				decoder := json.NewDecoder(v)
				// UseNumber keeps integers as json.Number instead of float64,
				// matching the expected values above.
				decoder.UseNumber()
				err := decoder.Decode(&d)

				require.NoError(t, err)
				results = append(results, d)
			}

			assert.Equal(t, tt.expected, results)
		})
	}
}
import (
	"testing"

	"go.opentelemetry.io/collector/pdata/pprofile"
)

// BenchmarkTransform measures Transform over a small two-frame profile built
// once per sub-benchmark and reused for every iteration.
func BenchmarkTransform(b *testing.B) {
	for _, bb := range []struct {
		name                  string
		buildResourceProfiles func() pprofile.ResourceProfiles
	}{
		{
			name: "with a basic recorded sample",
			buildResourceProfiles: func() pprofile.ResourceProfiles {
				rp := pprofile.NewResourceProfiles()

				sp := rp.ScopeProfiles().AppendEmpty()
				p := sp.Profiles().AppendEmpty()

				// Attribute 0: frame type; attributes 1 and 2: build IDs for
				// the two mappings below.
				a := p.AttributeTable().AppendEmpty()
				a.SetKey("profile.frame.type")
				a.Value().SetStr("native")
				a = p.AttributeTable().AppendEmpty()
				a.SetKey("process.executable.build_id.htlhash")
				a.Value().SetStr(buildIDEncoded)
				a = p.AttributeTable().AppendEmpty()
				a.SetKey("process.executable.build_id.htlhash")
				a.Value().SetStr(buildID2Encoded)

				// Sample/period types must be accepted by checkProfileType.
				p.StringTable().Append("firefox", "libc.so", "samples", "count", "cpu", "nanoseconds")
				st := p.SampleType().AppendEmpty()
				st.SetTypeStrindex(2)
				st.SetUnitStrindex(3)
				pt := p.PeriodType()
				pt.SetTypeStrindex(4)
				pt.SetUnitStrindex(5)

				m := p.MappingTable().AppendEmpty()
				m.AttributeIndices().Append(1)
				m.SetFilenameStrindex(0)
				m = p.MappingTable().AppendEmpty()
				m.AttributeIndices().Append(2)
				m.SetFilenameStrindex(1)

				l := p.LocationTable().AppendEmpty()
				l.SetAddress(address)
				l.AttributeIndices().Append(0)
				l.SetMappingIndex(0)
				l = p.LocationTable().AppendEmpty()
				l.SetAddress(address2)
				l.AttributeIndices().Append(0)
				l.SetMappingIndex(1)

				s := p.Sample().AppendEmpty()
				s.TimestampsUnixNano().Append(42)
				s.Value().Append(1)
				s.SetLocationsLength(2)
				s.SetLocationsStartIndex(0)

				return rp
			},
		},
	} {
		b.Run(bb.name, func(b *testing.B) {
			rp := bb.buildResourceProfiles()
			sp := rp.ScopeProfiles().At(0)
			p := sp.Profiles().At(0)

			b.ReportAllocs()
			b.ResetTimer()

			for i := 0; i < b.N; i++ {
				_, _ = Transform(rp.Resource(), sp.Scope(), p)
			}
		})
	}
}
a/exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles/encode.go b/exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles/encode.go new file mode 100644 index 000000000000..8c75d49151a6 --- /dev/null +++ b/exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles/encode.go @@ -0,0 +1,74 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package serializeprofiles // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles" + +import ( + "bytes" + "encoding/base64" + + "go.opentelemetry.io/ebpf-profiler/libpf" +) + +// runLengthEncodeFrameTypesReverseTo is a specialized run-length encoder for +// a reversed array of FrameType elements. +// +// The output is a binary stream of 1-byte length and the binary representation of the +// object. +// E.g. an uint8 array like ['a', 'a', 'x', 'x', 'x', 'x', 'x'] is converted into +// the byte array [5, 'x', 2, 'a']. +// +// This function has been optimized to do zero heap allocations if dst has enough capacity. +func runLengthEncodeFrameTypesReverseTo(dst *bytes.Buffer, values []libpf.FrameType) { + if len(values) == 0 { + return + } + + l := 1 + cur := values[len(values)-1] + + write := func() { + dst.WriteByte(byte(l)) + dst.WriteByte(byte(cur)) + } + + for i := len(values) - 2; i >= 0; i-- { + next := values[i] + + if next == cur && l < 255 { + l++ + continue + } + + write() + l = 1 + cur = next + } + + write() +} + +// encodeFrameTypesTo applies run-length encoding to the frame types in reverse order +// and writes the results as base64url encoded string into dst. +// +// This function has been optimized to do zero heap allocations if dst has enough capacity. 
+func encodeFrameTypesTo(dst *bytes.Buffer, frameTypes []libpf.FrameType) { + // Up to 255 consecutive identical frame types are converted into 2 bytes (binary). + // Switching between frame types does not happen often, so 128 is more than enough + // for the base64 representation, even to cover most corner cases. + // The fallback will do a heap allocation for the rare cases that need more than 128 bytes. + buf := bytes.NewBuffer(make([]byte, 0, 128)) + runLengthEncodeFrameTypesReverseTo(buf, frameTypes) + + var tmp []byte + tmplen := base64.RawURLEncoding.EncodedLen(buf.Len()) + if tmplen <= 128 { + // Enforce stack allocation by using a fixed size. + tmp = make([]byte, 128) + } else { + // Fall back to heap allocation. + tmp = make([]byte, tmplen) + } + base64.RawURLEncoding.Encode(tmp, buf.Bytes()) + dst.Write(tmp[:tmplen]) +} diff --git a/exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles/model.go b/exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles/model.go new file mode 100644 index 000000000000..ced9dea2caa5 --- /dev/null +++ b/exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles/model.go @@ -0,0 +1,132 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package serializeprofiles // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles" + +import ( + "go.opentelemetry.io/ebpf-profiler/libpf" +) + +// EcsVersionString is the value for the `ecs.version` metrics field. +// It is relatively arbitrary and currently has no consumer. +// APM server is using 1.12.0. We stick with it as well. +const EcsVersionString = "1.12.0" + +// EcsVersion is a struct to hold the `ecs.version` metrics field. +// Used as a helper in ES index struct types. 
// EcsVersion is a struct to hold the `ecs.version` metrics field.
// Used as a helper embedded in the ES index struct types below.
type EcsVersion struct {
	V string `json:"ecs.version"`
}

// StackPayload represents a single [StackTraceEvent], with a [StackTrace], a
// list of [StackFrame]s and a list of [ExeMetadata] that have been serialized,
// and need to be ingested into ES.
type StackPayload struct {
	StackTraceEvent StackTraceEvent
	StackTrace      StackTrace
	StackFrames     []StackFrame
	Executables     []ExeMetadata

	// UnsymbolizedLeafFrames collects leaf frames that have no symbols yet;
	// they are candidates for later symbolization.
	UnsymbolizedLeafFrames []libpf.FrameID
}

// StackTraceEvent represents a stacktrace event serializable into ES.
// The json field names need to be case-sensitively equal to the fields defined
// in the schema mapping.
type StackTraceEvent struct {
	EcsVersion
	TimeStamp    unixTime64 `json:"@timestamp"`
	HostID       string     `json:"host.id"`
	StackTraceID string     `json:"Stacktrace.id"` // 128-bit hash in binary form

	// Event-specific metadata
	PodName          string `json:"orchestrator.resource.name,omitempty"`
	ContainerID      string `json:"container.id,omitempty"`
	ContainerName    string `json:"container.name,omitempty"`
	K8sNamespaceName string `json:"k8s.namespace.name,omitempty"`
	ThreadName       string `json:"process.thread.name"`
	Count            uint16 `json:"Stacktrace.count"`
}

// StackTrace represents a stacktrace serializable into the stacktraces index.
// DocID should be the base64-encoded Stacktrace ID.
type StackTrace struct {
	EcsVersion
	DocID    string `json:"-"` // document ID only; never serialized
	FrameIDs string `json:"Stacktrace.frame.ids"`
	Types    string `json:"Stacktrace.frame.types"`
}
// StackFrame represents a stacktrace serializable into the stackframes index.
// DocID should be the base64-encoded FileID+Address (24 bytes).
// To simplify the unmarshalling for readers, we use arrays here, even though
// host agent doesn't send inline information yet. The symbolizer already
// stores arrays, which requires the reader to handle both formats if we don't
// use arrays here.
type StackFrame struct {
	EcsVersion
	DocID          string   `json:"-"`
	FileName       []string `json:"Stackframe.file.name,omitempty"`
	FunctionName   []string `json:"Stackframe.function.name,omitempty"`
	LineNumber     []int32  `json:"Stackframe.line.number,omitempty"`
	FunctionOffset []int32  `json:"Stackframe.function.offset,omitempty"`
}

// ExeMetadataUpsertScript is a script written in Painless that will both
// create a new document (if DocID does not exist), and update the timestamp of
// an existing document. Named parameters are used to improve performance
// re: script compilation (since the script does not change across executions,
// it can be compiled once and cached).
const ExeMetadataUpsertScript = `
if (ctx.op == 'create') {
	ctx._source['@timestamp'] = params.timestamp;
	ctx._source['Executable.build.id'] = params.buildid;
	ctx._source['Executable.file.name'] = params.filename;
	ctx._source['ecs.version'] = params.ecsversion;
} else {
	if (ctx._source['@timestamp'] == params.timestamp) {
		ctx.op = 'noop'
	} else {
		ctx._source['@timestamp'] = params.timestamp
	}
}
`

// ExeMetadataScript pairs the upsert script source with its named parameters.
type ExeMetadataScript struct {
	Source string            `json:"source"`
	Params ExeMetadataParams `json:"params"`
}

// ExeMetadataParams are the named parameters consumed by
// ExeMetadataUpsertScript.
type ExeMetadataParams struct {
	LastSeen   uint32 `json:"timestamp"`
	BuildID    string `json:"buildid"`
	FileName   string `json:"filename"`
	EcsVersion string `json:"ecsversion"`
}

// ExeMetadata represents executable metadata serializable into the
// executables index. DocID should be the base64-encoded FileID.
type ExeMetadata struct {
	DocID string `json:"-"`
	// ScriptedUpsert needs to be 'true' for the script to execute regardless of the
	// document existing or not.
	ScriptedUpsert bool              `json:"scripted_upsert"`
	Script         ExeMetadataScript `json:"script"`
	// This needs to exist for document creation to succeed (if document does not exist),
	// but can be empty as the script implements both document creation and updating.
	Upsert struct{} `json:"upsert"`
}

// NewExeMetadata builds an ExeMetadata scripted-upsert document for the given
// executable identity and last-seen timestamp.
func NewExeMetadata(docID string, lastSeen uint32, buildID, fileName string) ExeMetadata {
	return ExeMetadata{
		DocID:          docID,
		ScriptedUpsert: true,
		Script: ExeMetadataScript{
			Source: ExeMetadataUpsertScript,
			Params: ExeMetadataParams{
				LastSeen:   lastSeen,
				BuildID:    buildID,
				FileName:   fileName,
				EcsVersion: EcsVersionString,
			},
		},
	}
}

// Transform transforms a [pprofile.Profile] into our own
// representation, for ingestion into Elasticsearch
func Transform(resource pcommon.Resource, scope pcommon.InstrumentationScope, profile pprofile.Profile) ([]StackPayload, error) {
	var data []StackPayload

	// Reject profiles that are not on-CPU samples/count data before doing
	// any per-sample work.
	if err := checkProfileType(profile); err != nil {
		return data, err
	}

	payloads, err := stackPayloads(resource, scope, profile)
	if err != nil {
		return nil, err
	}
	data = append(data, payloads...)

	return data, nil
}

// checkProfileType acts as safeguard to make sure only known profiles are
// accepted. Different kinds of profiles are currently not supported
// and mixing profiles will make profiling information unusable.
func checkProfileType(profile pprofile.Profile) error {
	sampleType := profile.SampleType()
	if sampleType.Len() != 1 {
		return fmt.Errorf("expected 1 sample type but got %d", sampleType.Len())
	}

	sType := getString(profile, int(sampleType.At(0).TypeStrindex()))
	sUnit := getString(profile, int(sampleType.At(0).UnitStrindex()))

	// Make sure only on-CPU profiling data is accepted at the moment.
	// This needs to match with
	//nolint:lll
	// https://github.com/open-telemetry/opentelemetry-ebpf-profiler/blob/a720d06a401cb23249c5066dc69e96384af99cf3/reporter/otlp_reporter.go#L531
	if !strings.EqualFold(sType, "samples") || !strings.EqualFold(sUnit, "count") {
		return fmt.Errorf("expected sampling type of [[\"samples\",\"count\"]] "+
			"but got [[\"%s\", \"%s\"]]", sType, sUnit)
	}

	periodType := profile.PeriodType()
	pType := getString(profile, int(periodType.TypeStrindex()))
	pUnit := getString(profile, int(periodType.UnitStrindex()))

	// Make sure only on-CPU profiling data is accepted at the moment.
	// This needs to match with
	//nolint:lll
	// https://github.com/open-telemetry/opentelemetry-ebpf-profiler/blob/a720d06a401cb23249c5066dc69e96384af99cf3/reporter/otlp_reporter.go#L536
	if !strings.EqualFold(pType, "cpu") || !strings.EqualFold(pUnit, "nanoseconds") {
		return fmt.Errorf("expected period type [\"cpu\",\"nanoseconds\"] but got "+
			"[\"%s\", \"%s\"]", pType, pUnit)
	}

	return nil
}
+func stackPayloads(resource pcommon.Resource, scope pcommon.InstrumentationScope, profile pprofile.Profile) ([]StackPayload, error) { + unsymbolizedLeafFrames := make([]libpf.FrameID, 0, profile.Sample().Len()) + stackPayload := make([]StackPayload, 0, profile.Sample().Len()) + + hostMetadata := newHostMetadata(resource, scope, profile) + + for i := 0; i < profile.Sample().Len(); i++ { + sample := profile.Sample().At(i) + + frames, frameTypes, leafFrame, err := stackFrames(profile, sample) + if err != nil { + return nil, fmt.Errorf("failed to create stackframes: %w", err) + } + if len(frames) == 0 { + continue + } + + traceID, err := stackTraceID(frames) + if err != nil { + return nil, fmt.Errorf("failed to create stacktrace ID: %w", err) + } + + event := stackTraceEvent(traceID, profile, sample, hostMetadata) + + // Set the stacktrace and stackframes to the payload. + // The docs only need to be written once. + stackPayload = append(stackPayload, StackPayload{ + StackTrace: stackTrace(traceID, frames, frameTypes), + StackFrames: symbolizedFrames(frames), + }) + + if !isFrameSymbolized(frames[len(frames)-1]) && leafFrame != nil { + unsymbolizedLeafFrames = append(unsymbolizedLeafFrames, *leafFrame) + } + + // Add one event per timestamp and its count value. 
+ for j := 0; j < sample.TimestampsUnixNano().Len(); j++ { + t := sample.TimestampsUnixNano().At(j) + event.TimeStamp = newUnixTime64(t) + + if j < sample.Value().Len() { + event.Count = uint16(sample.Value().At(j)) + } else { + event.Count = 1 // restore default + } + if event.Count > 0 { + stackPayload = append(stackPayload, StackPayload{ + StackTraceEvent: event, + }) + } + } + } + + if len(stackPayload) > 0 { + if profile.MappingTable().Len() > 0 { + exeMetadata, err := executables(profile, profile.MappingTable()) + if err != nil { + return nil, err + } + + stackPayload[0].Executables = exeMetadata + } + stackPayload[0].UnsymbolizedLeafFrames = unsymbolizedLeafFrames + } + + return stackPayload, nil +} + +// symbolizedFrames returns a slice of StackFrames that have symbols. +func symbolizedFrames(frames []StackFrame) []StackFrame { + framesWithSymbols := make([]StackFrame, 0, len(frames)) + for i := range frames { + if isFrameSymbolized(frames[i]) { + framesWithSymbols = append(framesWithSymbols, frames[i]) + } + } + return framesWithSymbols +} + +func isFrameSymbolized(frame StackFrame) bool { + return len(frame.FileName) > 0 || len(frame.FunctionName) > 0 +} + +func stackTraceEvent(traceID string, profile pprofile.Profile, sample pprofile.Sample, hostMetadata map[string]string) StackTraceEvent { + event := StackTraceEvent{ + EcsVersion: EcsVersion{V: EcsVersionString}, + HostID: hostMetadata[string(semconv.HostIDKey)], + StackTraceID: traceID, + Count: 1, // TODO: Check whether count can be dropped with nanosecond timestamps + } + + // Store event-specific attributes. 
+ for i := 0; i < sample.AttributeIndices().Len(); i++ { + if profile.AttributeTable().Len() < i { + continue + } + attr := profile.AttributeTable().At(i) + + switch attribute.Key(attr.Key()) { + case semconv.HostIDKey: + event.HostID = attr.Value().AsString() + case semconv.ContainerIDKey: + event.ContainerID = attr.Value().AsString() + case semconv.K8SPodNameKey: + event.PodName = attr.Value().AsString() + case semconv.ContainerNameKey: + event.ContainerName = attr.Value().AsString() + case semconv.ThreadNameKey: + event.ThreadName = attr.Value().AsString() + } + } + + return event +} + +func stackTrace(stackTraceID string, frames []StackFrame, frameTypes []libpf.FrameType) StackTrace { + frameIDs := make([]string, 0, len(frames)) + for _, f := range frames { + frameIDs = append(frameIDs, f.DocID) + } + + // Up to 255 consecutive identical frame types are converted into 2 bytes (binary). + // We expect mostly consecutive frame types in a trace. Even if the encoding + // takes more than 32 bytes in single cases, the probability that the average base64 length + // per trace is below 32 bytes is very high. + // We expect resizing of buf to happen very rarely. 
+ buf := bytes.NewBuffer(make([]byte, 0, 32)) + encodeFrameTypesTo(buf, frameTypes) + + return StackTrace{ + EcsVersion: EcsVersion{V: EcsVersionString}, + DocID: stackTraceID, + FrameIDs: strings.Join(frameIDs, ""), + Types: buf.String(), + } +} + +func stackFrames(profile pprofile.Profile, sample pprofile.Sample) ([]StackFrame, []libpf.FrameType, *libpf.FrameID, error) { + frames := make([]StackFrame, 0, sample.LocationsLength()) + + locations := getLocations(profile, sample) + totalFrames := 0 + for _, location := range locations { + totalFrames += location.Line().Len() + } + frameTypes := make([]libpf.FrameType, 0, totalFrames) + + var leafFrameID *libpf.FrameID + + for locationIdx, location := range locations { + if location.MappingIndex() >= int32(profile.MappingTable().Len()) { + continue + } + + frameTypeStr, err := getStringFromAttribute(profile, location, "profile.frame.type") + if err != nil { + return nil, nil, nil, err + } + frameTypes = append(frameTypes, libpf.FrameTypeFromString(frameTypeStr)) + + functionNames := make([]string, 0, location.Line().Len()) + fileNames := make([]string, 0, location.Line().Len()) + lineNumbers := make([]int32, 0, location.Line().Len()) + + for i := 0; i < location.Line().Len(); i++ { + line := location.Line().At(i) + + if line.FunctionIndex() < int32(profile.FunctionTable().Len()) { + functionNames = append(functionNames, getString(profile, int(profile.FunctionTable().At(int(line.FunctionIndex())).NameStrindex()))) + fileNames = append(fileNames, getString(profile, int(profile.FunctionTable().At(int(line.FunctionIndex())).FilenameStrindex()))) + } + lineNumbers = append(lineNumbers, int32(line.Line())) + } + + frameID, err := getFrameID(profile, location) + if err != nil { + return nil, nil, nil, err + } + + if locationIdx == 0 { + leafFrameID = frameID + } + + frames = append([]StackFrame{ + { + EcsVersion: EcsVersion{V: EcsVersionString}, + DocID: frameID.String(), + FileName: fileNames, + FunctionName: functionNames, 
+ LineNumber: lineNumbers, + }, + }, frames...) + } + + return frames, frameTypes, leafFrameID, nil +} + +func getFrameID(profile pprofile.Profile, location pprofile.Location) (*libpf.FrameID, error) { + // The MappingIndex is known to be valid. + mapping := profile.MappingTable().At(int(location.MappingIndex())) + buildID, err := getBuildID(profile, mapping) + if err != nil { + return nil, err + } + + var addressOrLineno uint64 + if location.Address() > 0 { + addressOrLineno = location.Address() + } else if location.Line().Len() > 0 { + addressOrLineno = uint64(location.Line().At(location.Line().Len() - 1).Line()) + } + + frameID := libpf.NewFrameID(buildID, libpf.AddressOrLineno(addressOrLineno)) + return &frameID, nil +} + +type attributable interface { + AttributeIndices() pcommon.Int32Slice +} + +// getStringFromAttribute returns a string from one of attrIndices from the attribute table +// of the profile if the attribute key matches the expected attrKey. +func getStringFromAttribute(profile pprofile.Profile, record attributable, attrKey string) (string, error) { + lenAttrTable := profile.AttributeTable().Len() + + for i := 0; i < record.AttributeIndices().Len(); i++ { + idx := int(record.AttributeIndices().At(i)) + + if idx >= lenAttrTable { + return "", fmt.Errorf("requested attribute index (%d) "+ + "exceeds size of attribute table (%d)", idx, lenAttrTable) + } + if profile.AttributeTable().At(idx).Key() == attrKey { + return profile.AttributeTable().At(idx).Value().AsString(), nil + } + } + + return "", fmt.Errorf("failed to get '%s' from indices %v", attrKey, record.AttributeIndices().AsRaw()) +} + +// getBuildID returns the Build ID for the given mapping. It checks for both +// old-style Build ID (stored with the mapping) and Build ID as attribute. +func getBuildID(profile pprofile.Profile, mapping pprofile.Mapping) (libpf.FileID, error) { + // Fetch build ID from profiles.attribute_table. 
+ buildIDStr, err := getStringFromAttribute(profile, mapping, "process.executable.build_id.htlhash") + if err != nil { + return libpf.FileID{}, err + } + return libpf.FileIDFromString(buildIDStr) +} + +func executables(profile pprofile.Profile, mappings pprofile.MappingSlice) ([]ExeMetadata, error) { + metadata := make([]ExeMetadata, 0, mappings.Len()) + lastSeen := GetStartOfWeekFromTime(time.Now()) + + for i := 0; i < mappings.Len(); i++ { + mapping := mappings.At(i) + + filename := profile.StringTable().At(int(mapping.FilenameStrindex())) + if filename == "" { + // This is true for interpreted languages like Python. + continue + } + + buildID, err := getBuildID(profile, mapping) + if err != nil { + return nil, err + } + + if buildID.IsZero() { + // No build ID was specified or could be fetched. + continue + } + + docID := buildID.Base64() + executable := NewExeMetadata(docID, lastSeen, docID, filename) + metadata = append(metadata, executable) + } + + return metadata, nil +} + +// stackTraceID creates a unique trace ID from the stack frames. +// For the OTEL profiling protocol, we have all required information in one wire message. +// But for the Elastic gRPC protocol, trace events and stack traces are sent separately, so +// that the host agent still needs to generate the stack trace IDs. +// +// The following code generates the same trace ID as the host agent. +// For ES 9.0.0, we could use a faster hash algorithm, e.g. xxh3, and hash strings instead +// of hashing binary data. 
+func stackTraceID(frames []StackFrame) (string, error) { + var buf [24]byte + h := fnv.New128a() + for i := len(frames) - 1; i >= 0; i-- { // reverse ordered frames, done in stackFrames() + frameID, err := libpf.NewFrameIDFromString(frames[i].DocID) + if err != nil { + return "", fmt.Errorf("failed to create frameID from string: %w", err) + } + _, _ = h.Write(frameID.FileID().Bytes()) + // Using FormatUint() or putting AppendUint() into a function leads + // to escaping to heap (allocation). + _, _ = h.Write(strconv.AppendUint(buf[:0], uint64(frameID.AddressOrLine()), 10)) + } + // make instead of nil avoids a heap allocation + traceHash, err := libpf.TraceHashFromBytes(h.Sum(make([]byte, 0, 16))) + if err != nil { + return "", err + } + + return traceHash.Base64(), nil +} + +func getLocations(profile pprofile.Profile, sample pprofile.Sample) []pprofile.Location { + if sample.LocationsLength() > 0 { + locations := make([]pprofile.Location, 0, sample.LocationsLength()) + + for i := int(sample.LocationsStartIndex()); i < int(sample.LocationsLength()); i++ { + if i < profile.LocationTable().Len() { + locations = append(locations, profile.LocationTable().At(i)) + } + } + return locations + } + + locations := make([]pprofile.Location, 0, sample.LocationsLength()) + lastIndex := int(sample.LocationsStartIndex() + sample.LocationsLength()) + for i := int(sample.LocationsStartIndex()); i < lastIndex; i++ { + if i < profile.LocationTable().Len() { + locations = append(locations, profile.LocationTable().At(i)) + } + } + return locations +} + +func getString(profile pprofile.Profile, index int) string { + if index < profile.StringTable().Len() { + return profile.StringTable().At(index) + } + return "" +} + +func GetStartOfWeekFromTime(t time.Time) uint32 { + return uint32(t.Truncate(time.Hour * 24 * 7).Unix()) +} + +func newHostMetadata(resource pcommon.Resource, scope pcommon.InstrumentationScope, profile pprofile.Profile) map[string]string { + attrs := 
make(map[string]string, 128) + + addEventHostData(attrs, resource.Attributes()) + addEventHostData(attrs, scope.Attributes()) + addEventHostData(attrs, pprofile.FromAttributeIndices(profile.AttributeTable(), profile)) + + if len(attrs) == 0 { + return nil + } + + return attrs +} + +func addEventHostData(data map[string]string, attrs pcommon.Map) { + attrs.Range(func(k string, v pcommon.Value) bool { + data[k] = v.AsString() + return true + }) +} diff --git a/exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles/transform_test.go b/exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles/transform_test.go new file mode 100644 index 000000000000..466e327c0012 --- /dev/null +++ b/exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles/transform_test.go @@ -0,0 +1,698 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package serializeprofiles + +import ( + "bytes" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pprofile" + "go.opentelemetry.io/ebpf-profiler/libpf" + semconv "go.opentelemetry.io/otel/semconv/v1.22.0" +) + +var ( + stacktraceIDBase64 = stacktraceIDFormat(0xcafebeef, 0xd00d1eaf) + + buildID, buildIDEncoded, buildIDBase64 = formatFileIDFormat(0x0011223344556677, + 0x8899aabbccddeeff) + buildID2, buildID2Encoded, buildID2Base64 = formatFileIDFormat(0x0112233445566778, + 0x899aabbccddeeffe) + buildID3, buildID3Encoded, _ = formatFileIDFormat(0x1122334455667788, + 0x99aabbccddeeffee) + + frameIDBase64 = libpf.NewFrameID(buildID, address).String() + frameID2Base64 = libpf.NewFrameID(buildID2, address2).String() + frameID3Base64 = libpf.NewFrameID(buildID3, address3).String() +) + +const ( + address = 111 + address2 = 222 + address3 = 333 +) + +func stacktraceIDFormat(hi, lo uint64) string { + // Base64() is used in the host agent to encode 
// …stacktraceID.
	return libpf.NewFileID(hi, lo).Base64()
}

// formatFileIDFormat builds a libpf.FileID from its two 64-bit halves and
// returns it together with its hex (StringNoQuotes) and base64 encodings.
func formatFileIDFormat(hi, lo uint64) (fileID libpf.FileID, fileIDHex, fileIDBase64 string) {
	// StringNoQuotes() is used in the host agent to encode stacktraceID and buildID.
	// We should possibly switch to Base64 encoding.
	fileID = libpf.NewFileID(hi, lo)
	fileIDHex = fileID.StringNoQuotes()
	fileIDBase64 = fileID.Base64()
	return
}

// TestTransform exercises the public Transform entry point end-to-end,
// from a hand-built pprofile.ResourceProfiles to the expected []StackPayload.
func TestTransform(t *testing.T) {
	wantedTraceID := mkStackTraceID(t, []libpf.FrameID{
		libpf.NewFrameID(buildID, address),
		libpf.NewFrameID(buildID2, address2),
	})
	for _, tt := range []struct {
		name                  string
		buildResourceProfiles func() pprofile.ResourceProfiles

		wantPayload []StackPayload
		wantErr     error
	}{
		{
			// A sample with no timestamps/values/locations produces no payload.
			name: "with an empty sample",
			buildResourceProfiles: func() pprofile.ResourceProfiles {
				rp := pprofile.NewResourceProfiles()

				sp := rp.ScopeProfiles().AppendEmpty()
				p := sp.Profiles().AppendEmpty()

				p.StringTable().Append("samples", "count", "cpu", "nanoseconds")
				st := p.SampleType().AppendEmpty()
				st.SetTypeStrindex(0)
				st.SetUnitStrindex(1)
				pt := p.PeriodType()
				pt.SetTypeStrindex(2)
				pt.SetUnitStrindex(3)

				p.Sample().AppendEmpty()

				return rp
			},

			wantPayload: nil,
			wantErr:     nil,
		},
		{
			// Only the ["samples","count"] sampling type is accepted.
			name: "with an invalid profiling type",
			buildResourceProfiles: func() pprofile.ResourceProfiles {
				rp := pprofile.NewResourceProfiles()

				sp := rp.ScopeProfiles().AppendEmpty()
				p := sp.Profiles().AppendEmpty()

				p.StringTable().Append("off-CPU", "events")
				st := p.SampleType().AppendEmpty()
				st.SetTypeStrindex(0)
				st.SetUnitStrindex(1)

				p.Sample().AppendEmpty()

				return rp
			},

			wantPayload: nil,
			wantErr:     errors.New("expected sampling type of [[\"samples\",\"count\"]] but got [[\"off-CPU\", \"events\"]]"),
		},
		{
			name: "with no sample value and no line number on location",
			buildResourceProfiles: func() pprofile.ResourceProfiles {
				rp := pprofile.NewResourceProfiles()

				sp := rp.ScopeProfiles().AppendEmpty()
				p := sp.Profiles().AppendEmpty()

				p.StringTable().Append("samples", "count", "cpu", "nanoseconds")
				st := p.SampleType().AppendEmpty()
				st.SetTypeStrindex(0)
				st.SetUnitStrindex(1)
				pt := p.PeriodType()
				pt.SetTypeStrindex(2)
				pt.SetUnitStrindex(3)

				s := p.Sample().AppendEmpty()
				s.TimestampsUnixNano().Append(42)

				l := p.LocationTable().AppendEmpty()
				l.SetAddress(111)

				return rp
			},

			wantPayload: nil,
			wantErr:     nil,
		},
		{
			name: "with a single indexed sample",
			buildResourceProfiles: func() pprofile.ResourceProfiles {
				rp := pprofile.NewResourceProfiles()

				sp := rp.ScopeProfiles().AppendEmpty()
				p := sp.Profiles().AppendEmpty()

				// Attribute 0: frame type; attributes 1,2: per-mapping build IDs.
				a := p.AttributeTable().AppendEmpty()
				a.SetKey("profile.frame.type")
				a.Value().SetStr("native")
				a = p.AttributeTable().AppendEmpty()
				a.SetKey("process.executable.build_id.htlhash")
				a.Value().SetStr(buildIDEncoded)
				a = p.AttributeTable().AppendEmpty()
				a.SetKey("process.executable.build_id.htlhash")
				a.Value().SetStr(buildID2Encoded)

				p.StringTable().Append("firefox", "libc.so", "samples", "count", "cpu", "nanoseconds")
				st := p.SampleType().AppendEmpty()
				st.SetTypeStrindex(2)
				st.SetUnitStrindex(3)
				pt := p.PeriodType()
				pt.SetTypeStrindex(4)
				pt.SetUnitStrindex(5)

				m := p.MappingTable().AppendEmpty()
				m.AttributeIndices().Append(1)
				m.SetFilenameStrindex(0)
				m = p.MappingTable().AppendEmpty()
				m.AttributeIndices().Append(2)
				m.SetFilenameStrindex(1)

				l := p.LocationTable().AppendEmpty()
				l.SetAddress(address)
				l.AttributeIndices().Append(0)
				l.SetMappingIndex(0)
				l = p.LocationTable().AppendEmpty()
				l.SetAddress(address2)
				l.AttributeIndices().Append(0)
				l.SetMappingIndex(1)

				s := p.Sample().AppendEmpty()
				s.TimestampsUnixNano().Append(42)
				s.Value().Append(1)
				s.SetLocationsLength(2)
				s.SetLocationsStartIndex(0)

				return rp
			},

			wantPayload: []StackPayload{
				{
					StackTrace: StackTrace{
						EcsVersion: EcsVersion{V: EcsVersionString},
						DocID:      wantedTraceID,
						// NOTE(review): frame IDs are concatenated in reverse
						// location order (frameID2 first) while Types keep
						// location order — confirm against transform.go.
						FrameIDs: frameID2Base64 + frameIDBase64,
						Types: frameTypesToString([]libpf.FrameType{
							libpf.NativeFrame,
							libpf.NativeFrame,
						}),
					},
					StackFrames: []StackFrame{},
					Executables: []ExeMetadata{
						NewExeMetadata(
							buildIDBase64,
							GetStartOfWeekFromTime(time.Now()),
							buildIDBase64,
							"firefox",
						),
						NewExeMetadata(
							buildID2Base64,
							GetStartOfWeekFromTime(time.Now()),
							buildID2Base64,
							"libc.so",
						),
					},
					UnsymbolizedLeafFrames: []libpf.FrameID{
						libpf.NewFrameID(buildID, address),
					},
				},
				{
					StackTraceEvent: StackTraceEvent{
						EcsVersion:   EcsVersion{V: EcsVersionString},
						TimeStamp:    42000000000,
						StackTraceID: wantedTraceID,
						Count:        1,
					},
				},
			},
			wantErr: nil,
		},
	} {
		t.Run(tt.name, func(t *testing.T) {
			rp := tt.buildResourceProfiles()
			sp := rp.ScopeProfiles().At(0)

			payload, err := Transform(rp.Resource(), sp.Scope(), sp.Profiles().At(0))
			require.Equal(t, tt.wantErr, err)
			assert.Equal(t, tt.wantPayload, payload)
		})
	}
}

// TestStackPayloads covers the internal stackPayloads helper, including
// sample de-duplication via the Count field.
func TestStackPayloads(t *testing.T) {
	wantedTraceID := mkStackTraceID(t, []libpf.FrameID{
		libpf.NewFrameID(buildID, address),
		libpf.NewFrameID(buildID2, address2),
	})
	for _, tt := range []struct {
		name                  string
		buildResourceProfiles func() pprofile.ResourceProfiles

		wantPayload []StackPayload
		wantErr     error
	}{
		{ //nolint:dupl
			name: "with a single indexed sample",
			buildResourceProfiles: func() pprofile.ResourceProfiles {
				rp := pprofile.NewResourceProfiles()

				sp := rp.ScopeProfiles().AppendEmpty()
				p := sp.Profiles().AppendEmpty()
				p.StringTable().Append(stacktraceIDBase64, "firefox", "libc.so")

				a := p.AttributeTable().AppendEmpty()
				a.SetKey("profile.frame.type")
				a.Value().SetStr("native")
				a = p.AttributeTable().AppendEmpty()
				a.SetKey("process.executable.build_id.htlhash")
				a.Value().SetStr(buildIDEncoded)
				a = p.AttributeTable().AppendEmpty()
				a.SetKey("process.executable.build_id.htlhash")
				a.Value().SetStr(buildID2Encoded)
				a = p.AttributeTable().AppendEmpty()
				a.SetKey("profile.frame.type")
				a.Value().SetStr("native")

				l := p.LocationTable().AppendEmpty()
				l.SetMappingIndex(0)
				l.SetAddress(address)
				l.AttributeIndices().Append(3)
				l = p.LocationTable().AppendEmpty()
				l.SetMappingIndex(1)
				l.SetAddress(address2)
				l.AttributeIndices().Append(3)

				m := p.MappingTable().AppendEmpty()
				m.AttributeIndices().Append(1)
				m.SetFilenameStrindex(1)
				m = p.MappingTable().AppendEmpty()
				m.AttributeIndices().Append(2)
				m.SetFilenameStrindex(2)

				s := p.Sample().AppendEmpty()
				s.TimestampsUnixNano().Append(1)
				s.Value().Append(1)
				s.SetLocationsLength(2)
				s.SetLocationsStartIndex(0)

				return rp
			},

			wantPayload: []StackPayload{
				{
					StackTrace: StackTrace{
						EcsVersion: EcsVersion{V: EcsVersionString},
						DocID:      wantedTraceID,
						FrameIDs:   frameID2Base64 + frameIDBase64,
						Types: frameTypesToString([]libpf.FrameType{
							libpf.FrameType(3),
							libpf.FrameType(3),
						}),
					},
					StackFrames: []StackFrame{},
					Executables: []ExeMetadata{
						NewExeMetadata(
							buildIDBase64,
							GetStartOfWeekFromTime(time.Now()),
							buildIDBase64,
							"firefox",
						),
						NewExeMetadata(
							buildID2Base64,
							GetStartOfWeekFromTime(time.Now()),
							buildID2Base64,
							"libc.so",
						),
					},
					UnsymbolizedLeafFrames: []libpf.FrameID{
						libpf.NewFrameID(buildID, address),
					},
				},
				{
					StackTraceEvent: StackTraceEvent{
						EcsVersion:   EcsVersion{V: EcsVersionString},
						TimeStamp:    1000000000,
						StackTraceID: wantedTraceID,
						Count:        1,
					},
				},
			},
		},
		{
			// A sample value of 2 must surface as Count: 2 on the event.
			name: "with a duplicated sample",
			buildResourceProfiles: func() pprofile.ResourceProfiles {
				rp := pprofile.NewResourceProfiles()

				sp := rp.ScopeProfiles().AppendEmpty()
				p := sp.Profiles().AppendEmpty()

				p.StringTable().Append(stacktraceIDBase64, "firefox", "libc.so")

				a := p.AttributeTable().AppendEmpty()
				a.SetKey("process.executable.build_id.htlhash")
				a.Value().SetStr(buildIDEncoded)
				a = p.AttributeTable().AppendEmpty()
				a.SetKey("process.executable.build_id.htlhash")
				a.Value().SetStr(buildID2Encoded)
				a = p.AttributeTable().AppendEmpty()
				a.SetKey("profile.frame.type")
				a.Value().SetStr("native")

				l := p.LocationTable().AppendEmpty()
				l.SetMappingIndex(0)
				l.SetAddress(address)
				l.AttributeIndices().Append(2)
				l = p.LocationTable().AppendEmpty()
				l.SetMappingIndex(1)
				l.SetAddress(address2)
				l.AttributeIndices().Append(2)

				m := p.MappingTable().AppendEmpty()
				m.AttributeIndices().Append(0)
				m.SetFilenameStrindex(1)
				m = p.MappingTable().AppendEmpty()
				m.AttributeIndices().Append(1)
				m.SetFilenameStrindex(2)

				s := p.Sample().AppendEmpty()
				s.TimestampsUnixNano().Append(1)
				s.Value().Append(2)
				s.SetLocationsLength(2)
				s.SetLocationsStartIndex(0)

				return rp
			},

			wantPayload: []StackPayload{
				{
					StackTrace: StackTrace{
						EcsVersion: EcsVersion{V: EcsVersionString},
						DocID:      wantedTraceID,
						FrameIDs:   frameID2Base64 + frameIDBase64,
						Types: frameTypesToString([]libpf.FrameType{
							libpf.FrameType(3),
							libpf.FrameType(3),
						}),
					},
					StackFrames: []StackFrame{},
					Executables: []ExeMetadata{
						NewExeMetadata(
							buildIDBase64,
							GetStartOfWeekFromTime(time.Now()),
							buildIDBase64,
							"firefox",
						),
						NewExeMetadata(
							buildID2Base64,
							GetStartOfWeekFromTime(time.Now()),
							buildID2Base64,
							"libc.so",
						),
					},
					UnsymbolizedLeafFrames: []libpf.FrameID{
						libpf.NewFrameID(buildID, address),
					},
				},
				{
					StackTraceEvent: StackTraceEvent{
						EcsVersion:   EcsVersion{V: EcsVersionString},
						TimeStamp:    1000000000,
						StackTraceID: wantedTraceID,
						Count:        2,
					},
				},
			},
		},
	} {
		t.Run(tt.name, func(t *testing.T) {
			rp := tt.buildResourceProfiles()
			sp := rp.ScopeProfiles().At(0)

			payloads, err := stackPayloads(rp.Resource(), sp.Scope(), sp.Profiles().At(0))
			require.Equal(t, tt.wantErr, err)
			assert.Equal(t, tt.wantPayload, payloads)
		})
	}
}
// TestStackTraceEvent checks that stackTraceEvent copies resource-, sample-
// and profile-level attributes into the per-sample event document.
func TestStackTraceEvent(t *testing.T) {
	for _, tt := range []struct {
		name                  string
		timestamp             uint64
		buildResourceProfiles func() pprofile.ResourceProfiles

		wantEvent StackTraceEvent
	}{
		{
			name: "sets host specific data",
			buildResourceProfiles: func() pprofile.ResourceProfiles {
				rp := pprofile.NewResourceProfiles()
				_ = rp.Resource().Attributes().FromRaw(map[string]any{
					string(semconv.ServiceVersionKey): "1.2.0",
				})

				sp := rp.ScopeProfiles().AppendEmpty()
				p := sp.Profiles().AppendEmpty()
				p.StringTable().Append(stacktraceIDBase64)

				p.Sample().AppendEmpty()

				return rp
			},

			wantEvent: StackTraceEvent{
				EcsVersion:   EcsVersion{V: EcsVersionString},
				StackTraceID: stacktraceIDBase64,
				Count:        1,
			},
		},
		{
			// A seconds-scale input is expanded to nanoseconds by newUnixTime64.
			name:      "sets the timestamp",
			timestamp: 1000000000,
			buildResourceProfiles: func() pprofile.ResourceProfiles {
				rp := pprofile.NewResourceProfiles()
				sp := rp.ScopeProfiles().AppendEmpty()
				p := sp.Profiles().AppendEmpty()
				p.StringTable().Append(stacktraceIDBase64)

				p.Sample().AppendEmpty()

				return rp
			},

			wantEvent: StackTraceEvent{
				EcsVersion:   EcsVersion{V: EcsVersionString},
				TimeStamp:    1000000000000000000,
				StackTraceID: stacktraceIDBase64,
				Count:        1,
			},
		},
		{
			name: "sets the stack trace ID",
			buildResourceProfiles: func() pprofile.ResourceProfiles {
				rp := pprofile.NewResourceProfiles()
				sp := rp.ScopeProfiles().AppendEmpty()
				p := sp.Profiles().AppendEmpty()
				p.StringTable().Append(stacktraceIDBase64)

				p.Sample().AppendEmpty()

				return rp
			},

			wantEvent: StackTraceEvent{
				EcsVersion:   EcsVersion{V: EcsVersionString},
				StackTraceID: stacktraceIDBase64,
				Count:        1,
			},
		},
		{
			// Pod, container and thread names come from sample attributes.
			name: "sets event specific data",
			buildResourceProfiles: func() pprofile.ResourceProfiles {
				rp := pprofile.NewResourceProfiles()
				sp := rp.ScopeProfiles().AppendEmpty()
				p := sp.Profiles().AppendEmpty()
				p.StringTable().Append(stacktraceIDBase64)

				a := p.AttributeTable().AppendEmpty()
				a.SetKey(string(semconv.K8SPodNameKey))
				a.Value().SetStr("my_pod")
				a = p.AttributeTable().AppendEmpty()
				a.SetKey(string(semconv.ContainerNameKey))
				a.Value().SetStr("my_container")
				a = p.AttributeTable().AppendEmpty()
				a.SetKey(string(semconv.ThreadNameKey))
				a.Value().SetStr("my_thread")
				a = p.AttributeTable().AppendEmpty()
				a.SetKey(string(semconv.ServiceNameKey))
				a.Value().SetStr("my_service")

				s := p.Sample().AppendEmpty()
				s.AttributeIndices().Append(0, 1, 2, 3)

				return rp
			},

			wantEvent: StackTraceEvent{
				EcsVersion:    EcsVersion{V: EcsVersionString},
				PodName:       "my_pod",
				ContainerName: "my_container",
				ThreadName:    "my_thread",
				StackTraceID:  stacktraceIDBase64,
				Count:         1,
			},
		},
	} {
		t.Run(tt.name, func(t *testing.T) {
			rp := tt.buildResourceProfiles()
			p := rp.ScopeProfiles().At(0).Profiles().At(0)
			s := p.Sample().At(0)

			event := stackTraceEvent(stacktraceIDBase64, p, s, map[string]string{})
			// The timestamp is normalized separately so every case can share
			// the same comparison below.
			event.TimeStamp = newUnixTime64(tt.timestamp)

			assert.Equal(t, tt.wantEvent, event)
		})
	}
}

// TestStackTrace verifies stackTrace assembly from mixed frame types
// (kernel, dotnet, native) across three locations.
func TestStackTrace(t *testing.T) {
	for _, tt := range []struct {
		name         string
		buildProfile func() pprofile.Profile

		wantTrace StackTrace
	}{
		{
			name: "creates a stack trace",
			buildProfile: func() pprofile.Profile {
				p := pprofile.NewProfile()

				// Attributes 0-2: frame types; 3-5: build IDs (one per mapping).
				a := p.AttributeTable().AppendEmpty()
				a.SetKey("profile.frame.type")
				a.Value().SetStr("kernel")
				a = p.AttributeTable().AppendEmpty()
				a.SetKey("profile.frame.type")
				a.Value().SetStr("dotnet")
				a = p.AttributeTable().AppendEmpty()
				a.SetKey("profile.frame.type")
				a.Value().SetStr("native")
				a = p.AttributeTable().AppendEmpty()
				a.SetKey("process.executable.build_id.htlhash")
				a.Value().SetStr(buildIDEncoded)
				a = p.AttributeTable().AppendEmpty()
				a.SetKey("process.executable.build_id.htlhash")
				a.Value().SetStr(buildID2Encoded)
				a = p.AttributeTable().AppendEmpty()
				a.SetKey("process.executable.build_id.htlhash")
				a.Value().SetStr(buildID3Encoded)

				p.StringTable().Append(
					stacktraceIDBase64,
					"kernel",
					"native",
					"dotnet",
				)

				l := p.LocationTable().AppendEmpty()
				l.SetMappingIndex(0)
				l.SetAddress(address)
				l.AttributeIndices().Append(0)
				l = p.LocationTable().AppendEmpty()
				l.SetMappingIndex(1)
				l.SetAddress(address2)
				l.AttributeIndices().Append(1)
				l = p.LocationTable().AppendEmpty()
				l.SetMappingIndex(2)
				l.SetAddress(address3)
				l.AttributeIndices().Append(2)

				// Line records are attached to the last (native) location only.
				li := l.Line().AppendEmpty()
				li.SetLine(1)
				li = l.Line().AppendEmpty()
				li.SetLine(3)

				m := p.MappingTable().AppendEmpty()
				m.AttributeIndices().Append(3)
				m = p.MappingTable().AppendEmpty()
				m.AttributeIndices().Append(4)
				m = p.MappingTable().AppendEmpty()
				m.AttributeIndices().Append(5)

				s := p.Sample().AppendEmpty()
				s.SetLocationsLength(3)

				return p
			},

			wantTrace: StackTrace{
				EcsVersion: EcsVersion{V: EcsVersionString},
				// NOTE(review): FrameIDs are in reverse location order while
				// Types keep location order — confirm against stackTrace().
				FrameIDs: frameID3Base64 + frameID2Base64 + frameIDBase64,
				Types: frameTypesToString([]libpf.FrameType{
					libpf.KernelFrame,
					libpf.DotnetFrame,
					libpf.NativeFrame,
				}),
			},
		},
	} {
		t.Run(tt.name, func(t *testing.T) {
			p := tt.buildProfile()
			s := p.Sample().At(0)

			frames, frameTypes, _, err := stackFrames(p, s)
			require.NoError(t, err)

			stacktrace := stackTrace("", frames, frameTypes)
			assert.Equal(t, tt.wantTrace, stacktrace)

			// Invariant: one frame type per frame.
			assert.Equal(t, len(frames), len(frameTypes))
		})
	}
}

// frameTypesToString converts a slice of FrameType to a RLE encoded string as stored in ES.
//
// Decode such strings with e.g. 'echo -n Ago | basenc --base64url -d | od -t x1'.
// Output "02 0a" means 02 frames with type 0a (10).
// In libpf/frametype.go you find DotnetFrame with value 10.
func frameTypesToString(frameTypes []libpf.FrameType) string {
	buf := bytes.NewBuffer(make([]byte, 0, 32))
	encodeFrameTypesTo(buf, frameTypes)
	return buf.String()
}

// mkStackTraceID builds a minimal profile around the given frame IDs and
// returns the trace ID that stackTraceID derives from the resulting frames.
func mkStackTraceID(t *testing.T, frameIDs []libpf.FrameID) string {
	p := pprofile.NewProfile()
	s := p.Sample().AppendEmpty()
	s.SetLocationsLength(int32(len(frameIDs)))

	// Attribute 0 is the shared frame type; per-frame build IDs follow at 1..n.
	a := p.AttributeTable().AppendEmpty()
	a.SetKey("profile.frame.type")
	a.Value().SetStr("native")

	for i, frameID := range frameIDs {
		p.StringTable().Append(frameID.FileID().StringNoQuotes())

		a := p.AttributeTable().AppendEmpty()
		a.SetKey("process.executable.build_id.htlhash")
		a.Value().SetStr(frameID.FileID().StringNoQuotes())

		m := p.MappingTable().AppendEmpty()
		m.AttributeIndices().Append(int32(i + 1))

		l := p.LocationTable().AppendEmpty()
		l.SetMappingIndex(int32(i))
		l.SetAddress(uint64(frameID.AddressOrLine()))
		l.AttributeIndices().Append(0)
	}

	frames, _, _, err := stackFrames(p, s)
	require.NoError(t, err)

	traceID, err := stackTraceID(frames)
	require.NoError(t, err)

	return traceID
}

// ---- new file: internal/serializer/otelserializer/serializeprofiles/unixtime.go ----

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package serializeprofiles // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles"

import (
	"encoding/json"
	"fmt"
	"math"
	"time"
)

// unixTime64 represents nanoseconds since epoch.
type unixTime64 uint64

// newUnixTime64 creates a unixTime64 from either seconds or nanoseconds since the epoch.
func newUnixTime64(t uint64) unixTime64 {
	// Heuristic: values that fit in 32 bits are treated as seconds and scaled
	// to nanoseconds; anything larger is assumed to already be nanoseconds.
	if t <= math.MaxUint32 {
		return unixTime64(t) * 1e9
	}
	return unixTime64(t)
}

// MarshalJSON renders the timestamp as a quoted RFC3339Nano string.
func (t unixTime64) MarshalJSON() ([]byte, error) {
	// Nanoseconds, ES does not support 'epoch_nanoseconds' so
	// we have to pass it a value formatted as 'strict_date_optional_time_nanos'.
	out := []byte(fmt.Sprintf("%q",
		time.Unix(0, int64(t)).UTC().Format(time.RFC3339Nano)))
	return out, nil
}

// Compile-time interface checks
var _ json.Marshaler = (*unixTime64)(nil)

// ---- new file: internal/serializer/otelserializer/serializeprofiles/unixtime_test.go ----

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package serializeprofiles

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// TestUnixTime64_MarshalJSON pins the JSON rendering for the zero value and
// for both the seconds-scale and nanoseconds-scale inputs of newUnixTime64.
func TestUnixTime64_MarshalJSON(t *testing.T) {
	tests := []struct {
		name string
		time unixTime64
		want string
	}{
		{
			name: "zero",
			time: newUnixTime64(0),
			want: `"1970-01-01T00:00:00Z"`,
		},
		{
			name: "32bit value (assure there is no regression)",
			time: newUnixTime64(1710349106),
			want: `"2024-03-13T16:58:26Z"`,
		},
		{
			name: "64bit value",
			time: newUnixTime64(1710349106864964685),
			want: `"2024-03-13T16:58:26.864964685Z"`,
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			b, err := test.time.MarshalJSON()
			require.NoError(t, err)
			assert.Equal(t, test.want, string(b))
		})
	}
}

// ---- patch metadata for metadata.yaml continues on the next line ----
b/exporter/elasticsearchexporter/metadata.yaml @@ -4,7 +4,7 @@ status: class: exporter stability: beta: [traces, logs] - development: [metrics] + development: [metrics, profiles] distributions: [contrib] codeowners: active: [JaredTan95, carsonip, lahsivjar] diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index 03021e75f29d..5467a23f6bd5 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -17,6 +17,7 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pprofile" "go.opentelemetry.io/collector/pdata/ptrace" semconv "go.opentelemetry.io/collector/semconv/v1.22.0" @@ -83,6 +84,7 @@ type mappingModel interface { hashDataPoint(datapoints.DataPoint) uint32 encodeDocument(objmodel.Document, *bytes.Buffer) error encodeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []datapoints.DataPoint, validationErrors *[]error, idx elasticsearch.Index, buf *bytes.Buffer) (map[string]string, error) + encodeProfile(pcommon.Resource, pcommon.InstrumentationScope, pprofile.Profile, func(*bytes.Buffer, string, string) error) error } // encodeModel tries to keep the event as close to the original open telemetry semantics as is. 
@@ -294,6 +296,15 @@ func (m *encodeModel) encodeSpanEvent(resource pcommon.Resource, resourceSchemaU otelserializer.SerializeSpanEvent(resource, resourceSchemaURL, scope, scopeSchemaURL, span, spanEvent, idx, buf) } +func (m *encodeModel) encodeProfile(resource pcommon.Resource, scope pcommon.InstrumentationScope, record pprofile.Profile, pushData func(*bytes.Buffer, string, string) error) error { + switch m.mode { + case MappingOTel: + return otelserializer.SerializeProfile(resource, scope, record, pushData) + default: + return errors.New("profiles can only be encoded in OTel mode") + } +} + func (m *encodeModel) encodeAttributes(document *objmodel.Document, attributes pcommon.Map, idx elasticsearch.Index) { key := "Attributes" if m.mode == MappingRaw {