add dynamic document pipeline support for logs #37860

Open · wants to merge 2 commits into base: main
27 changes: 27 additions & 0 deletions .chloggen/elasticsearchexporter_logs_pipeline_per_event.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add config `logs_dynamic_pipeline` to dynamically set the Elasticsearch ingest pipeline of log records using the log record attribute `elasticsearch.document_pipeline`.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37419]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
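
The changelog entry above describes opt-in behaviour: nothing changes unless `logs_dynamic_pipeline` is enabled in the exporter configuration. A minimal sketch of enabling it (the endpoint value is a placeholder, not taken from this PR):

```yaml
exporters:
  elasticsearch:
    endpoints: ["https://elasticsearch:9200"]  # placeholder endpoint
    logs_dynamic_pipeline:
      # When enabled, the `elasticsearch.document_pipeline` log record attribute
      # selects the ingest pipeline for that record.
      enabled: true
```
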
6 changes: 5 additions & 1 deletion exporter/elasticsearchexporter/README.md
@@ -147,7 +147,10 @@ This can be customised through the following settings:
- `date_format`(default=`%Y.%m.%d`): Time format (based on strftime) to generate the second part of the Index name.

- `logs_dynamic_id` (optional): Dynamically determines the document ID to be used in Elasticsearch based on a log record attribute.
- `enabled`(default=false): Enable/Disable dynamic ID for log records. If `elasticsearch.document_id` exists and is not an empty string in the log record attributes, it will be used as the document ID. Otherwise, the document ID will be generated by Elasticsearch. The attribute `elasticsearch.document_id` is removed from the final document. See [Setting a document id dynamically](#setting-a-document-id-dynamically).
- `enabled`(default=false): Enable/Disable dynamic ID for log records. If `elasticsearch.document_id` exists and is not an empty string in the log record attributes, it will be used as the document ID. Otherwise, the document ID will be generated by Elasticsearch. The attribute `elasticsearch.document_id` is removed from the final document when the `otel` mapping mode is used. See [Setting a document id dynamically](#setting-a-document-id-dynamically).

- `logs_dynamic_pipeline` (optional): Dynamically determines the ingest pipeline to be used in Elasticsearch based on a log record attribute.
- `enabled`(default=false): Enable/Disable dynamic pipeline for log records. If `elasticsearch.document_pipeline` exists and is not an empty string in the log record attributes, it will be used as the Elasticsearch ingest pipeline. The attribute `elasticsearch.document_pipeline` is removed from the final document when the `otel` mapping mode is used.
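
For illustration only, a log record could acquire the `elasticsearch.document_pipeline` attribute upstream of this exporter, for example with the attributes processor; the processor name and pipeline value below are hypothetical and not part of this change:

```yaml
processors:
  attributes/es-pipeline:
    actions:
      - key: elasticsearch.document_pipeline
        value: logs-enrich   # hypothetical ingest pipeline name
        action: insert       # set the attribute only if it is not already present
```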

### Elasticsearch document mapping

@@ -188,6 +191,7 @@ Documents may be optionally passed through an [Elasticsearch Ingest pipeline] pr
This can be configured through the following settings:

- `pipeline` (optional): ID of an [Elasticsearch Ingest pipeline] used for processing documents published by the exporter.
- If `elasticsearch.document_pipeline` exists and is not an empty string in the log record attributes, then that pipeline will be used for that log record.
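
As a hedged sketch of how the exporter-wide `pipeline` setting and the per-record attribute could be combined (all names below are placeholders): records carrying `elasticsearch.document_pipeline` use that pipeline for that record, per the bullet above, while the rest are processed by the configured default.

```yaml
exporters:
  elasticsearch:
    endpoints: ["https://elasticsearch:9200"]  # placeholder endpoint
    pipeline: logs-default                     # default ingest pipeline for exported documents
    logs_dynamic_pipeline:
      enabled: true                            # per-record override via the log record attribute
```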

### Elasticsearch bulk indexing

8 changes: 5 additions & 3 deletions exporter/elasticsearchexporter/bulkindexer.go
@@ -31,7 +31,7 @@ type bulkIndexer interface {

type bulkIndexerSession interface {
// Add adds a document to the bulk indexing session.
Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string, action string) error
Add(ctx context.Context, index string, docID string, pipeline string, document io.WriterTo, dynamicTemplates map[string]string, action string) error

// End must be called on the session object once it is no longer
// needed, in order to release any associated resources.
@@ -126,13 +126,14 @@ type syncBulkIndexerSession struct {
}

// Add adds an item to the sync bulk indexer session.
func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string, action string) error {
func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, docID string, pipeline string, document io.WriterTo, dynamicTemplates map[string]string, action string) error {
	doc := docappender.BulkIndexerItem{
		Index:            index,
		Body:             document,
		DocumentID:       docID,
		DynamicTemplates: dynamicTemplates,
		Action:           action,
		Pipeline:         pipeline,
	}
	err := s.bi.Add(doc)
	if err != nil {
@@ -255,13 +256,14 @@ func (a *asyncBulkIndexer) Close(ctx context.Context) error {
// Add adds an item to the async bulk indexer session.
//
// Adding an item after a call to Close() will panic.
func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string, action string) error {
func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, docID string, pipeline string, document io.WriterTo, dynamicTemplates map[string]string, action string) error {
	item := docappender.BulkIndexerItem{
		Index:            index,
		Body:             document,
		DocumentID:       docID,
		DynamicTemplates: dynamicTemplates,
		Action:           action,
		Pipeline:         pipeline,
	}
	select {
	case <-ctx.Done():
8 changes: 4 additions & 4 deletions exporter/elasticsearchexporter/bulkindexer_test.go
@@ -103,7 +103,7 @@ func TestAsyncBulkIndexer_flush(t *testing.T) {
session, err := bulkIndexer.StartSession(context.Background())
require.NoError(t, err)

assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate))
assert.NoError(t, session.Add(context.Background(), "foo", "", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate))
// should flush
time.Sleep(100 * time.Millisecond)
assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load())
@@ -180,7 +180,7 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) {
session, err := bulkIndexer.StartSession(context.Background())
require.NoError(t, err)

assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate))
assert.NoError(t, session.Add(context.Background(), "foo", "", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate))
// should flush
time.Sleep(100 * time.Millisecond)
assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load())
@@ -259,7 +259,7 @@ func runBulkIndexerOnce(t *testing.T, config *Config, client *elasticsearch.Clie
session, err := bulkIndexer.StartSession(context.Background())
require.NoError(t, err)

assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate))
assert.NoError(t, session.Add(context.Background(), "foo", "", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate))
assert.NoError(t, bulkIndexer.Close(context.Background()))

return bulkIndexer
@@ -286,7 +286,7 @@ func TestSyncBulkIndexer_flushBytes(t *testing.T) {
session, err := bi.StartSession(context.Background())
require.NoError(t, err)

assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate))
assert.NoError(t, session.Add(context.Background(), "foo", "", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate))
assert.Equal(t, int64(1), reqCnt.Load()) // flush due to flush::bytes
assert.NoError(t, bi.Close(context.Background()))
}
7 changes: 7 additions & 0 deletions exporter/elasticsearchexporter/config.go
@@ -56,6 +56,9 @@ type Config struct {
	// LogsDynamicID configures whether log record attribute `elasticsearch.document_id` is set as the document ID in ES.
	LogsDynamicID DynamicIDSettings `mapstructure:"logs_dynamic_id"`

	// LogsDynamicPipeline configures whether log record attribute `elasticsearch.document_pipeline` is set as the document ingest pipeline for ES.
	LogsDynamicPipeline DynamicPipelineSettings `mapstructure:"logs_dynamic_pipeline"`

	// Pipeline configures the ingest node pipeline name that should be used to process the
	// events.
	//
@@ -119,6 +122,10 @@ type DynamicIDSettings struct {
	Enabled bool `mapstructure:"enabled"`
}

type DynamicPipelineSettings struct {
	Enabled bool `mapstructure:"enabled"`
}

// AuthenticationSettings defines user authentication related settings.
type AuthenticationSettings struct {
	// User is used to configure HTTP Basic Authentication.
9 changes: 9 additions & 0 deletions exporter/elasticsearchexporter/config_test.go
@@ -77,6 +77,9 @@ func TestConfig(t *testing.T) {
LogsDynamicID: DynamicIDSettings{
Enabled: false,
},
LogsDynamicPipeline: DynamicPipelineSettings{
Enabled: false,
},
Pipeline: "mypipeline",
ClientConfig: withDefaultHTTPClientConfig(func(cfg *confighttp.ClientConfig) {
cfg.Timeout = 2 * time.Minute
@@ -150,6 +153,9 @@ func TestConfig(t *testing.T) {
LogsDynamicID: DynamicIDSettings{
Enabled: false,
},
LogsDynamicPipeline: DynamicPipelineSettings{
Enabled: false,
},
Pipeline: "mypipeline",
ClientConfig: withDefaultHTTPClientConfig(func(cfg *confighttp.ClientConfig) {
cfg.Timeout = 2 * time.Minute
@@ -223,6 +229,9 @@
LogsDynamicID: DynamicIDSettings{
Enabled: false,
},
LogsDynamicPipeline: DynamicPipelineSettings{
Enabled: false,
},
Pipeline: "mypipeline",
ClientConfig: withDefaultHTTPClientConfig(func(cfg *confighttp.ClientConfig) {
cfg.Timeout = 2 * time.Minute
56 changes: 35 additions & 21 deletions exporter/elasticsearchexporter/exporter.go
@@ -222,13 +222,14 @@ func (e *elasticsearchExporter) pushLogRecord(

	buf := e.bufferPool.NewPooledBuffer()
	docID := e.extractDocumentIDAttribute(record.Attributes())
	pipeline := e.extractDocumentPipelineAttribute(record.Attributes())
	if err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL, index, buf.Buffer); err != nil {
		buf.Recycle()
		return fmt.Errorf("failed to encode log event: %w", err)
	}

	// not recycling after Add returns an error as we don't know if it's already recycled
	return bulkIndexerSession.Add(ctx, index.Index, docID, buf, nil, docappender.ActionCreate)
	return bulkIndexerSession.Add(ctx, index.Index, docID, pipeline, buf, nil, docappender.ActionCreate)
}

type dataPointsGroup struct {
@@ -371,7 +372,7 @@ func (e *elasticsearchExporter) pushMetricsData(
errs = append(errs, err)
continue
}
if err := session.Add(ctx, index.Index, "", buf, dynamicTemplates, docappender.ActionCreate); err != nil {
if err := session.Add(ctx, index.Index, "", "", buf, dynamicTemplates, docappender.ActionCreate); err != nil {
// not recycling after Add returns an error as we don't know if it's already recycled
if cerr := ctx.Err(); cerr != nil {
return cerr
@@ -467,7 +468,7 @@ func (e *elasticsearchExporter) pushTraceRecord(
return fmt.Errorf("failed to encode trace record: %w", err)
}
// not recycling after Add returns an error as we don't know if it's already recycled
return bulkIndexerSession.Add(ctx, index.Index, "", buf, nil, docappender.ActionCreate)
return bulkIndexerSession.Add(ctx, index.Index, "", "", buf, nil, docappender.ActionCreate)
}

func (e *elasticsearchExporter) pushSpanEvent(
@@ -493,19 +494,7 @@ func (e *elasticsearchExporter) pushSpanEvent(
return nil
}
// not recycling after Add returns an error as we don't know if it's already recycled
return bulkIndexerSession.Add(ctx, index.Index, "", buf, nil, docappender.ActionCreate)
}

func (e *elasticsearchExporter) extractDocumentIDAttribute(m pcommon.Map) string {
if !e.config.LogsDynamicID.Enabled {
return ""
}

v, ok := m.Get(elasticsearch.DocumentIDAttributeName)
if !ok {
return ""
}
return v.AsString()
return bulkIndexerSession.Add(ctx, index.Index, "", "", buf, nil, docappender.ActionCreate)
}

func (e *elasticsearchExporter) pushProfilesData(ctx context.Context, pd pprofile.Profiles) error {
@@ -609,15 +598,40 @@ func (e *elasticsearchExporter) pushProfileRecord(
	return e.model.encodeProfile(resource, scope, record, func(buf *bytes.Buffer, docID, index string) error {
		switch index {
		case otelserializer.StackTraceIndex:
			return stackTracesSession.Add(ctx, index, docID, buf, nil, docappender.ActionCreate)
			return stackTracesSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
		case otelserializer.StackFrameIndex:
			return stackFramesSession.Add(ctx, index, docID, buf, nil, docappender.ActionCreate)
			return stackFramesSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
		case otelserializer.AllEventsIndex:
			return eventsSession.Add(ctx, index, docID, buf, nil, docappender.ActionCreate)
			return eventsSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
		case otelserializer.ExecutablesIndex:
			return executablesSession.Add(ctx, index, docID, buf, nil, docappender.ActionUpdate)
			return executablesSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionUpdate)
		default:
			return defaultSession.Add(ctx, index, docID, buf, nil, docappender.ActionCreate)
			return defaultSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
		}
	})
}

func (e *elasticsearchExporter) extractDocumentIDAttribute(m pcommon.Map) string {
	if !e.config.LogsDynamicID.Enabled {
		return ""
	}

	v, ok := m.Get(elasticsearch.DocumentIDAttributeName)
	if !ok {
		return ""
	}
	return v.AsString()
}

func (e *elasticsearchExporter) extractDocumentPipelineAttribute(m pcommon.Map) string {
	// If dynamic pipelines are disabled, return an empty string so the
	// exporter-level Pipeline setting (if any) applies instead.
	if !e.config.LogsDynamicPipeline.Enabled {
		return ""
	}

	v, ok := m.Get(elasticsearch.DocumentPipelineAttributeName)
	if !ok {
		return ""
	}
	return v.AsString()
}