[exporter/elasticsearch] Support for profiles export (open-telemetry#37567)

This adds support for exporting profiles within the elasticsearch
exporter.

---------

Co-authored-by: Florian Lehner <[email protected]>
Co-authored-by: Carson Ip <[email protected]>
Co-authored-by: Tim Rühsen <[email protected]>
Co-authored-by: Andrzej Stencel <[email protected]>
5 people authored Feb 13, 2025
1 parent 19a5f29 commit 9987999
Showing 23 changed files with 2,097 additions and 103 deletions.
27 changes: 27 additions & 0 deletions .chloggen/elasticsearch-profiles.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add profiles support to elasticsearch exporter

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37567]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
26 changes: 23 additions & 3 deletions exporter/elasticsearchexporter/README.md
@@ -3,7 +3,7 @@
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [development]: metrics |
| Stability | [development]: metrics, profiles |
| | [beta]: traces, logs |
| Distributions | [contrib] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aexporter%2Felasticsearch%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aexporter%2Felasticsearch) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aexporter%2Felasticsearch%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aexporter%2Felasticsearch) |
@@ -14,7 +14,7 @@
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
<!-- end autogenerated section -->

This exporter supports sending logs, metrics and traces to [Elasticsearch](https://www.elastic.co/elasticsearch).
This exporter supports sending logs, metrics, traces and profiles to [Elasticsearch](https://www.elastic.co/elasticsearch).

The Exporter is API-compatible with Elasticsearch 7.17.x and 8.x. Certain features of the exporter,
such as the `otel` mapping mode, may require newer versions of Elasticsearch. Limited effort will
@@ -246,6 +246,26 @@ The metric types supported are:
- Exponential histogram (Delta temporality only)
- Summary

## Exporting profiles

Profiles support is currently in development and should not be used in
production. Profiles are supported only in the OTel mapping mode.

Example:

```yaml
exporters:
elasticsearch:
endpoint: https://elastic.example.com:9200
mapping:
mode: otel
```

> [!IMPORTANT]
> For the Elasticsearch exporter to be able to export profiles data, Universal Profiling needs to be installed in the Elasticsearch cluster.
> See [the Universal Profiling getting started documentation](https://www.elastic.co/guide/en/observability/current/profiling-get-started.html).
> You will need to use the Elasticsearch endpoint, with an [Elasticsearch API key](https://www.elastic.co/guide/en/kibana/current/api-keys.html).
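Putting the pieces together, here is a minimal end-to-end configuration sketch. The OTLP receiver, the `api_key` placeholder value, and the feature-gate note below are illustrative assumptions, not part of this change:

```yaml
receivers:
  otlp:
    protocols:
      grpc:

exporters:
  elasticsearch:
    endpoint: https://elastic.example.com:9200
    api_key: "<elasticsearch-api-key>" # hypothetical placeholder, supply your own
    mapping:
      mode: otel

service:
  pipelines:
    profiles:
      receivers: [otlp]
      exporters: [elasticsearch]
```

Profiles pipelines in the collector are themselves experimental and may need to be enabled explicitly (for example via a feature gate such as `service.profilesSupport`); consult the collector documentation for the current requirements.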

[confighttp]: https://github.com/open-telemetry/opentelemetry-collector/tree/main/config/confighttp/README.md#http-configuration-settings
[configtls]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md#tls-configuration-settings
[configauth]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configauth/README.md#authentication-configuration
@@ -355,7 +375,7 @@ In case the record contains `timestamp`, this value is used. Otherwise, the `obs

## Setting a document id dynamically

The `logs_dynamic_id` setting allows users to set the document ID dynamically based on a log record attribute.
Besides the ability to control the document ID, this setting also works as a deduplication mechanism, as Elasticsearch will refuse to index a document with the same ID.

The log record attribute `elasticsearch.document_id` can be set explicitly by a processor based on the log record.
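As a hedged illustration of the passage above, the following sketch populates `elasticsearch.document_id` with a transform processor and enables the dynamic ID. The `event.id` source attribute is hypothetical, and the exact `logs_dynamic_id` shape should be verified against the full configuration reference:

```yaml
processors:
  transform:
    log_statements:
      - context: log
        statements:
          # Derive the Elasticsearch document ID from a (hypothetical) event ID attribute.
          - set(attributes["elasticsearch.document_id"], attributes["event.id"])

exporters:
  elasticsearch:
    endpoint: https://elastic.example.com:9200
    logs_dynamic_id:
      enabled: true
```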
15 changes: 8 additions & 7 deletions exporter/elasticsearchexporter/attribute.go
@@ -7,13 +7,14 @@ import "go.opentelemetry.io/collector/pdata/pcommon"

// dynamic index attribute key constants
const (
indexPrefix = "elasticsearch.index.prefix"
indexSuffix = "elasticsearch.index.suffix"
defaultDataStreamDataset = "generic"
defaultDataStreamNamespace = "default"
defaultDataStreamTypeLogs = "logs"
defaultDataStreamTypeMetrics = "metrics"
defaultDataStreamTypeTraces = "traces"
indexPrefix = "elasticsearch.index.prefix"
indexSuffix = "elasticsearch.index.suffix"
defaultDataStreamDataset = "generic"
defaultDataStreamNamespace = "default"
defaultDataStreamTypeLogs = "logs"
defaultDataStreamTypeMetrics = "metrics"
defaultDataStreamTypeTraces = "traces"
defaultDataStreamTypeProfiles = "profiles"
)

func getFromAttributes(name string, defaultValue string, attributeMaps ...pcommon.Map) (string, bool) {
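For orientation: Elasticsearch data stream names follow the `type-dataset-namespace` convention, so the new `defaultDataStreamTypeProfiles` constant combined with the defaults above would yield `profiles-generic-default`. A minimal sketch of that convention (the helper is illustrative, not the exporter's actual routing code):

```go
package main

import "fmt"

// dataStreamName shows the Elasticsearch type-dataset-namespace naming
// convention that the constants above feed into.
func dataStreamName(typ, dataset, namespace string) string {
	return fmt.Sprintf("%s-%s-%s", typ, dataset, namespace)
}

func main() {
	// With the new profiles type and the default dataset/namespace:
	fmt.Println(dataStreamName("profiles", "generic", "default")) // profiles-generic-default
}
```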
33 changes: 20 additions & 13 deletions exporter/elasticsearchexporter/bulkindexer.go
@@ -31,7 +31,7 @@ type bulkIndexer interface {

type bulkIndexerSession interface {
// Add adds a document to the bulk indexing session.
Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string) error
Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string, action string) error

// End must be called on the session object once it is no longer
// needed, in order to release any associated resources.
@@ -55,14 +55,14 @@

const defaultMaxRetries = 2

func newBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config) (bulkIndexer, error) {
func newBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config, requireDataStream bool) (bulkIndexer, error) {
if config.Batcher.Enabled != nil {
return newSyncBulkIndexer(logger, client, config), nil
return newSyncBulkIndexer(logger, client, config, requireDataStream), nil
}
return newAsyncBulkIndexer(logger, client, config)
return newAsyncBulkIndexer(logger, client, config, requireDataStream)
}

func bulkIndexerConfig(client esapi.Transport, config *Config) docappender.BulkIndexerConfig {
func bulkIndexerConfig(client esapi.Transport, config *Config, requireDataStream bool) docappender.BulkIndexerConfig {
var maxDocRetries int
if config.Retry.Enabled {
maxDocRetries = defaultMaxRetries
@@ -79,14 +79,14 @@ func bulkIndexerConfig(client esapi.Transport, config *Config) docappender.BulkI
MaxDocumentRetries: maxDocRetries,
Pipeline: config.Pipeline,
RetryOnDocumentStatus: config.Retry.RetryOnStatus,
RequireDataStream: config.MappingMode() == MappingOTel,
RequireDataStream: requireDataStream,
CompressionLevel: compressionLevel,
}
}

func newSyncBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config) *syncBulkIndexer {
func newSyncBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config, requireDataStream bool) *syncBulkIndexer {
return &syncBulkIndexer{
config: bulkIndexerConfig(client, config),
config: bulkIndexerConfig(client, config, requireDataStream),
flushTimeout: config.Timeout,
flushBytes: config.Flush.Bytes,
retryConfig: config.Retry,
@@ -126,8 +126,14 @@ type syncBulkIndexerSession struct {
}

// Add adds an item to the sync bulk indexer session.
func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string) error {
doc := docappender.BulkIndexerItem{Index: index, Body: document, DocumentID: docID, DynamicTemplates: dynamicTemplates}
func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string, action string) error {
doc := docappender.BulkIndexerItem{
Index: index,
Body: document,
DocumentID: docID,
DynamicTemplates: dynamicTemplates,
Action: action,
}
err := s.bi.Add(doc)
if err != nil {
return err
@@ -176,7 +182,7 @@ func (s *syncBulkIndexerSession) Flush(ctx context.Context) error {
}
}

func newAsyncBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config) (*asyncBulkIndexer, error) {
func newAsyncBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config, requireDataStream bool) (*asyncBulkIndexer, error) {
numWorkers := config.NumWorkers
if numWorkers == 0 {
numWorkers = runtime.NumCPU()
@@ -190,7 +196,7 @@ func newAsyncBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Con
pool.wg.Add(numWorkers)

for i := 0; i < numWorkers; i++ {
bi, err := docappender.NewBulkIndexer(bulkIndexerConfig(client, config))
bi, err := docappender.NewBulkIndexer(bulkIndexerConfig(client, config, requireDataStream))
if err != nil {
return nil, err
}
@@ -249,12 +255,13 @@ func (a *asyncBulkIndexer) Close(ctx context.Context) error {
// Add adds an item to the async bulk indexer session.
//
// Adding an item after a call to Close() will panic.
func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string) error {
func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string, action string) error {
item := docappender.BulkIndexerItem{
Index: index,
Body: document,
DocumentID: docID,
DynamicTemplates: dynamicTemplates,
Action: action,
}
select {
case <-ctx.Done():
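The visible change in this file is twofold: `bulkIndexerSession.Add` now takes an explicit bulk `action` per document, and `requireDataStream` is passed into the constructors rather than derived from the mapping mode, since profiles documents need different bulk actions and index targets than the other signals. Below is a self-contained sketch of a caller driving the extended interface; the interface is re-declared here only so the sketch compiles on its own, and the index name and document body are invented:

```go
package esbulksketch

import (
	"context"
	"io"
	"strings"

	"github.com/elastic/go-docappender/v2"
)

// session mirrors (part of) the unexported bulkIndexerSession interface
// from bulkindexer.go, re-declared to keep this sketch self-contained.
type session interface {
	Add(ctx context.Context, index, docID string, document io.WriterTo, dynamicTemplates map[string]string, action string) error
}

// addProfileEvent demonstrates the new explicit per-document action parameter.
func addProfileEvent(ctx context.Context, s session) error {
	body := strings.NewReader(`{"foo": "bar"}`) // *strings.Reader implements io.WriterTo
	// docappender.ActionCreate maps to the bulk API "create" action; other
	// document types can now pass a different action through the same session.
	return s.Add(ctx, "profiling-events", "", body, nil, docappender.ActionCreate)
}
```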
67 changes: 9 additions & 58 deletions exporter/elasticsearchexporter/bulkindexer_test.go
@@ -13,6 +13,7 @@ import (
"testing"
"time"

"github.com/elastic/go-docappender/v2"
"github.com/elastic/go-elasticsearch/v8"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -97,12 +98,12 @@ func TestAsyncBulkIndexer_flush(t *testing.T) {
}})
require.NoError(t, err)

bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, &tt.config)
bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, &tt.config, false)
require.NoError(t, err)
session, err := bulkIndexer.StartSession(context.Background())
require.NoError(t, err)

assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil))
assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate))
// should flush
time.Sleep(100 * time.Millisecond)
assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load())
@@ -111,56 +112,6 @@
}
}

func TestAsyncBulkIndexer_requireDataStream(t *testing.T) {
tests := []struct {
name string
config Config
wantRequireDataStream bool
}{
{
name: "ecs",
config: Config{
NumWorkers: 1,
Mapping: MappingsSettings{Mode: MappingECS.String()},
Flush: FlushSettings{Interval: time.Hour, Bytes: 1e+8},
},
wantRequireDataStream: false,
},
{
name: "otel",
config: Config{
NumWorkers: 1,
Mapping: MappingsSettings{Mode: MappingOTel.String()},
Flush: FlushSettings{Interval: time.Hour, Bytes: 1e+8},
},
wantRequireDataStream: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
requireDataStreamCh := make(chan bool, 1)
client, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{
RoundTripFunc: func(r *http.Request) (*http.Response, error) {
if r.URL.Path == "/_bulk" {
requireDataStreamCh <- r.URL.Query().Get("require_data_stream") == "true"
}
return &http.Response{
Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}},
Body: io.NopCloser(strings.NewReader(successResp)),
}, nil
},
}})
require.NoError(t, err)

runBulkIndexerOnce(t, &tt.config, client)

assert.Equal(t, tt.wantRequireDataStream, <-requireDataStreamCh)
})
}
}

func TestAsyncBulkIndexer_flush_error(t *testing.T) {
tests := []struct {
name string
@@ -222,14 +173,14 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) {
require.NoError(t, err)
core, observed := observer.New(zap.NewAtomicLevelAt(zapcore.DebugLevel))

bulkIndexer, err := newAsyncBulkIndexer(zap.New(core), client, &cfg)
bulkIndexer, err := newAsyncBulkIndexer(zap.New(core), client, &cfg, false)
require.NoError(t, err)
defer bulkIndexer.Close(context.Background())

session, err := bulkIndexer.StartSession(context.Background())
require.NoError(t, err)

assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil))
assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate))
// should flush
time.Sleep(100 * time.Millisecond)
assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load())
@@ -303,12 +254,12 @@ func TestAsyncBulkIndexer_logRoundTrip(t *testing.T) {
}

func runBulkIndexerOnce(t *testing.T, config *Config, client *elasticsearch.Client) *asyncBulkIndexer {
bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, config)
bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, config, false)
require.NoError(t, err)
session, err := bulkIndexer.StartSession(context.Background())
require.NoError(t, err)

assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil))
assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate))
assert.NoError(t, bulkIndexer.Close(context.Background()))

return bulkIndexer
@@ -331,11 +282,11 @@ func TestSyncBulkIndexer_flushBytes(t *testing.T) {
}})
require.NoError(t, err)

bi := newSyncBulkIndexer(zap.NewNop(), client, &cfg)
bi := newSyncBulkIndexer(zap.NewNop(), client, &cfg, false)
session, err := bi.StartSession(context.Background())
require.NoError(t, err)

assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil))
assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate))
assert.Equal(t, int64(1), reqCnt.Load()) // flush due to flush::bytes
assert.NoError(t, bi.Close(context.Background()))
}
