Skip to content

Commit

Permalink
[processor/transform] Add support for global condition's fully qualif…
Browse files Browse the repository at this point in the history
…ied paths and error mode overrides (#37676)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

This PR is part of
#29017,
and a split from
#36888.
It changes the `transformprocessor`, adding support for error mode
overrides per context statements group, and global `conditions` with
fully qualified paths for context-inferred configuration styles.

Change log:
- Structured context-inferred configuration styles (without `context:
<value>`) now supports and requires conditions with fully qualified
paths, for example:
  
    ```yaml
   log_statements:
      - statements:
          - set(log.attributes["pass"], "true")
        conditions:
          - log.attributes["pass"] == nil
    
  ```
If non qualified paths are used, it raises an error, for example `-
attributes["pass"] == nil` would generate the following error:
  
  ```
invalid config for "transform" processor unable to parse OTTL condition
"attributes[\"pass\"] == nil": missing context name for path
"attributes[pass]", possibly valid options are: "log.attributes[pass]",
"instrumentation_scope.attributes[pass]", "resource.attributes[pass]"
  ``` 
Existing non-context inferred configurations shouldn't be impacted by
this change.

- A new configuration `error_mode` key has been added to the context
statements group (`common.ContextStatements`),
When provided, it overrides the top-level error mode, offering more
granular control over error handling, for example:

     ```yaml
      error_mode: propagate
      log_statements:
        - context: log
          error_mode: ignore
          statements: 
            - set(attributes["attr1"], ParseJSON(1))
        - context: log
          statements:
            - set(attributes["executed"], true)
    ```

<!-- Issue number (e.g. #1234) or full URL to issue, if applicable. -->
#### Link to tracking issue

#29017

<!--Describe what testing was performed and which tests were added.-->
#### Testing
Unit tests

<!--Describe the documentation added.-->
#### Documentation
Changed the README to include the new configuration key `error_mode`

<!--Please delete paragraphs that you did not use before submitting.-->

---------

Co-authored-by: Evan Bradley <[email protected]>
  • Loading branch information
edmocosta and evan-bradley authored Feb 6, 2025
1 parent 66082ae commit 9280ca1
Show file tree
Hide file tree
Showing 12 changed files with 725 additions and 20 deletions.
31 changes: 31 additions & 0 deletions .chloggen/add-global-conditions-and-group-error-mode-support.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: processor/transformprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for global conditions and error mode overrides.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29017]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Global conditions are now available for context-inferred structured configurations, allowing the use of fully
qualified paths. Additionally, a new configuration key called `error_mode` has been added to the context statements group.
This key determines how the processor reacts to errors that occur while processing that specific group of statements.
When provided, it overrides the top-level error mode, offering more granular control over error handling.
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
22 changes: 22 additions & 0 deletions processor/transformprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ If a context doesn't meet any of the conditions, then the associated statement w
Each statement may have a Where clause that acts as an additional check for whether to execute the statement.

The transform processor also allows configuring an optional field, `error_mode`, which will determine how the processor reacts to errors that occur while processing a statement.
The top-level `error_mode` can be overridden at context statements level, offering more granular control over error handling.

| error_mode | description |
|------------|---------------------------------------------------------------------------------------------------------------------------------------------|
Expand All @@ -54,6 +55,7 @@ transform:
error_mode: ignore
<trace|metric|log>_statements:
- context: string
error_mode: propagate
conditions:
- string
- string
Expand All @@ -62,6 +64,7 @@ transform:
- string
- string
- context: string
error_mode: silent
statements:
- string
- string
Expand Down Expand Up @@ -646,6 +649,25 @@ transform:
- set(attributes["nested.attr3"], cache["nested"]["attr3"])
```

### Override context statements error mode

```yaml
transform:
# default error mode applied to all context statements
error_mode: propagate
log_statements:
- context: log
# overrides the default error mode for these statements
error_mode: ignore
statements:
- merge_maps(cache, ParseJSON(body), "upsert") where IsMatch(body, "^\\{")
- set(attributes["attr1"], cache["attr1"])
- context: log
statements:
- set(attributes["namespace"], attributes["k8s.namespace.name"])
```

### Get Severity of an Unstructured Log Body

Given the following unstructured log body
Expand Down
36 changes: 36 additions & 0 deletions processor/transformprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,42 @@ func TestLoadConfig(t *testing.T) {
},
},
},
{
id: component.NewIDWithName(metadata.Type, "context_statements_error_mode"),
expected: &Config{
ErrorMode: ottl.IgnoreError,
TraceStatements: []common.ContextStatements{
{
Statements: []string{`set(resource.attributes["name"], "propagate")`},
ErrorMode: ottl.PropagateError,
},
{
Statements: []string{`set(resource.attributes["name"], "ignore")`},
ErrorMode: "",
},
},
MetricStatements: []common.ContextStatements{
{
Statements: []string{`set(resource.attributes["name"], "silent")`},
ErrorMode: ottl.SilentError,
},
{
Statements: []string{`set(resource.attributes["name"], "ignore")`},
ErrorMode: "",
},
},
LogStatements: []common.ContextStatements{
{
Statements: []string{`set(resource.attributes["name"], "propagate")`},
ErrorMode: ottl.PropagateError,
},
{
Statements: []string{`set(resource.attributes["name"], "ignore")`},
ErrorMode: "",
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.id.Name(), func(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion processor/transformprocessor/internal/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ type ContextStatements struct {
Context ContextID `mapstructure:"context"`
Conditions []string `mapstructure:"conditions"`
Statements []string `mapstructure:"statements"`

// ErrorMode determines how the processor reacts to errors that occur while processing
// this group of statements. When provided, it overrides the default Config ErrorMode.
ErrorMode ottl.ErrorMode `mapstructure:"error_mode"`
// `SharedCache` is an experimental feature that may change or be removed in the future.
// When enabled, it allows the statements cache to be shared across all other groups that share the cache.
// This feature is not configurable via `mapstructure` and cannot be set in configuration files.
Expand Down
12 changes: 10 additions & 2 deletions processor/transformprocessor/internal/common/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,19 @@ func convertLogStatements(pc *ottl.ParserCollection[LogsConsumer], _ *ottl.Parse
if err != nil {
return nil, err
}
globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForLog, contextStatements.Conditions, pc.ErrorMode, pc.Settings, filterottl.StandardLogFuncs())
errorMode := pc.ErrorMode
if contextStatements.ErrorMode != "" {
errorMode = contextStatements.ErrorMode
}
var parserOptions []ottllog.Option
if contextStatements.Context == "" {
parserOptions = append(parserOptions, ottllog.EnablePathContextNames())
}
globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForLogWithOptions, contextStatements.Conditions, errorMode, pc.Settings, filterottl.StandardLogFuncs(), parserOptions)
if errGlobalBoolExpr != nil {
return nil, errGlobalBoolExpr
}
lStatements := ottllog.NewStatementSequence(parsedStatements, pc.Settings, ottllog.WithStatementSequenceErrorMode(pc.ErrorMode))
lStatements := ottllog.NewStatementSequence(parsedStatements, pc.Settings, ottllog.WithStatementSequenceErrorMode(errorMode))
return logStatements{lStatements, globalExpr}, nil
}

Expand Down
24 changes: 20 additions & 4 deletions processor/transformprocessor/internal/common/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,19 @@ func convertMetricStatements(pc *ottl.ParserCollection[MetricsConsumer], _ *ottl
if err != nil {
return nil, err
}
globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForMetric, contextStatements.Conditions, pc.ErrorMode, pc.Settings, filterottl.StandardMetricFuncs())
errorMode := pc.ErrorMode
if contextStatements.ErrorMode != "" {
errorMode = contextStatements.ErrorMode
}
var parserOptions []ottlmetric.Option
if contextStatements.Context == "" {
parserOptions = append(parserOptions, ottlmetric.EnablePathContextNames())
}
globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForMetricWithOptions, contextStatements.Conditions, errorMode, pc.Settings, filterottl.StandardMetricFuncs(), parserOptions)
if errGlobalBoolExpr != nil {
return nil, errGlobalBoolExpr
}
mStatements := ottlmetric.NewStatementSequence(parsedStatements, pc.Settings, ottlmetric.WithStatementSequenceErrorMode(pc.ErrorMode))
mStatements := ottlmetric.NewStatementSequence(parsedStatements, pc.Settings, ottlmetric.WithStatementSequenceErrorMode(errorMode))
return metricStatements{mStatements, globalExpr}, nil
}

Expand All @@ -229,11 +237,19 @@ func convertDataPointStatements(pc *ottl.ParserCollection[MetricsConsumer], _ *o
if err != nil {
return nil, err
}
globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForDataPoint, contextStatements.Conditions, pc.ErrorMode, pc.Settings, filterottl.StandardDataPointFuncs())
errorMode := pc.ErrorMode
if contextStatements.ErrorMode != "" {
errorMode = contextStatements.ErrorMode
}
var parserOptions []ottldatapoint.Option
if contextStatements.Context == "" {
parserOptions = append(parserOptions, ottldatapoint.EnablePathContextNames())
}
globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForDataPointWithOptions, contextStatements.Conditions, errorMode, pc.Settings, filterottl.StandardDataPointFuncs(), parserOptions)
if errGlobalBoolExpr != nil {
return nil, errGlobalBoolExpr
}
dpStatements := ottldatapoint.NewStatementSequence(parsedStatements, pc.Settings, ottldatapoint.WithStatementSequenceErrorMode(pc.ErrorMode))
dpStatements := ottldatapoint.NewStatementSequence(parsedStatements, pc.Settings, ottldatapoint.WithStatementSequenceErrorMode(errorMode))
return dataPointStatements{dpStatements, globalExpr}, nil
}

Expand Down
31 changes: 24 additions & 7 deletions processor/transformprocessor/internal/common/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,19 @@ func parseResourceContextStatements[R any](
if err != nil {
return *new(R), err
}
globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForResource, contextStatements.Conditions, pc.ErrorMode, pc.Settings, filterottl.StandardResourceFuncs())
errorMode := pc.ErrorMode
if contextStatements.ErrorMode != "" {
errorMode = contextStatements.ErrorMode
}
var parserOptions []ottlresource.Option
if contextStatements.Context == "" {
parserOptions = append(parserOptions, ottlresource.EnablePathContextNames())
}
globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForResourceWithOptions, contextStatements.Conditions, errorMode, pc.Settings, filterottl.StandardResourceFuncs(), parserOptions)
if errGlobalBoolExpr != nil {
return *new(R), errGlobalBoolExpr
}
rStatements := ottlresource.NewStatementSequence(parsedStatements, pc.Settings, ottlresource.WithStatementSequenceErrorMode(pc.ErrorMode))
rStatements := ottlresource.NewStatementSequence(parsedStatements, pc.Settings, ottlresource.WithStatementSequenceErrorMode(errorMode))
result := (baseContext)(resourceStatements{rStatements, globalExpr})
return result.(R), nil
}
Expand All @@ -220,24 +228,33 @@ func parseScopeContextStatements[R any](
if err != nil {
return *new(R), err
}
globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForScope, contextStatements.Conditions, pc.ErrorMode, pc.Settings, filterottl.StandardScopeFuncs())
errorMode := pc.ErrorMode
if contextStatements.ErrorMode != "" {
errorMode = contextStatements.ErrorMode
}
var parserOptions []ottlscope.Option
if contextStatements.Context == "" {
parserOptions = append(parserOptions, ottlscope.EnablePathContextNames())
}
globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForScopeWithOptions, contextStatements.Conditions, errorMode, pc.Settings, filterottl.StandardScopeFuncs(), parserOptions)
if errGlobalBoolExpr != nil {
return *new(R), errGlobalBoolExpr
}
sStatements := ottlscope.NewStatementSequence(parsedStatements, pc.Settings, ottlscope.WithStatementSequenceErrorMode(pc.ErrorMode))
sStatements := ottlscope.NewStatementSequence(parsedStatements, pc.Settings, ottlscope.WithStatementSequenceErrorMode(errorMode))
result := (baseContext)(scopeStatements{sStatements, globalExpr})
return result.(R), nil
}

func parseGlobalExpr[K any](
boolExprFunc func([]string, map[string]ottl.Factory[K], ottl.ErrorMode, component.TelemetrySettings) (*ottl.ConditionSequence[K], error),
func parseGlobalExpr[K any, O any](
boolExprFunc func([]string, map[string]ottl.Factory[K], ottl.ErrorMode, component.TelemetrySettings, []O) (*ottl.ConditionSequence[K], error),
conditions []string,
errorMode ottl.ErrorMode,
settings component.TelemetrySettings,
standardFuncs map[string]ottl.Factory[K],
parserOptions []O,
) (expr.BoolExpr[K], error) {
if len(conditions) > 0 {
return boolExprFunc(conditions, standardFuncs, errorMode, settings)
return boolExprFunc(conditions, standardFuncs, errorMode, settings, parserOptions)
}
// By default, set the global expression to always true unless conditions are specified.
return expr.AlwaysTrue[K](), nil
Expand Down
24 changes: 20 additions & 4 deletions processor/transformprocessor/internal/common/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,19 @@ func convertSpanStatements(pc *ottl.ParserCollection[TracesConsumer], _ *ottl.Pa
if err != nil {
return nil, err
}
globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForSpan, contextStatements.Conditions, pc.ErrorMode, pc.Settings, filterottl.StandardSpanFuncs())
errorMode := pc.ErrorMode
if contextStatements.ErrorMode != "" {
errorMode = contextStatements.ErrorMode
}
var parserOptions []ottlspan.Option
if contextStatements.Context == "" {
parserOptions = append(parserOptions, ottlspan.EnablePathContextNames())
}
globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForSpanWithOptions, contextStatements.Conditions, errorMode, pc.Settings, filterottl.StandardSpanFuncs(), parserOptions)
if errGlobalBoolExpr != nil {
return nil, errGlobalBoolExpr
}
sStatements := ottlspan.NewStatementSequence(parsedStatements, pc.Settings, ottlspan.WithStatementSequenceErrorMode(pc.ErrorMode))
sStatements := ottlspan.NewStatementSequence(parsedStatements, pc.Settings, ottlspan.WithStatementSequenceErrorMode(errorMode))
return traceStatements{sStatements, globalExpr}, nil
}

Expand All @@ -157,11 +165,19 @@ func convertSpanEventStatements(pc *ottl.ParserCollection[TracesConsumer], _ *ot
if err != nil {
return nil, err
}
globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForSpanEvent, contextStatements.Conditions, pc.ErrorMode, pc.Settings, filterottl.StandardSpanEventFuncs())
errorMode := pc.ErrorMode
if contextStatements.ErrorMode != "" {
errorMode = contextStatements.ErrorMode
}
var parserOptions []ottlspanevent.Option
if contextStatements.Context == "" {
parserOptions = append(parserOptions, ottlspanevent.EnablePathContextNames())
}
globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForSpanEventWithOptions, contextStatements.Conditions, errorMode, pc.Settings, filterottl.StandardSpanEventFuncs(), parserOptions)
if errGlobalBoolExpr != nil {
return nil, errGlobalBoolExpr
}
seStatements := ottlspanevent.NewStatementSequence(parsedStatements, pc.Settings, ottlspanevent.WithStatementSequenceErrorMode(pc.ErrorMode))
seStatements := ottlspanevent.NewStatementSequence(parsedStatements, pc.Settings, ottlspanevent.WithStatementSequenceErrorMode(errorMode))
return spanEventStatements{seStatements, globalExpr}, nil
}

Expand Down
Loading

0 comments on commit 9280ca1

Please sign in to comment.