Skip to content

Add streams docs #1091

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Apr 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added solutions/images/logs-streams-advanced.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added solutions/images/logs-streams-dashboard.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added solutions/images/logs-streams-failures.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added solutions/images/logs-streams-field-stats.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added solutions/images/logs-streams-grok.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added solutions/images/logs-streams-overview.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added solutions/images/logs-streams-parsed.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added solutions/images/logs-streams-patterns.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added solutions/images/logs-streams-retention.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
10 changes: 10 additions & 0 deletions solutions/observability/logs/streams/management/advanced.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
applies_to:
serverless: preview
---
# Configure advanced settings [streams-advanced-settings]

The **Advanced** tab on the **Manage stream** page shows the lower-level details of your stream. While Streams simplifies many configurations, it doesn't currently support modifying all pipelines and templates. From the **Advanced** tab, you can manually interact with the index or component templates, or modify any of the other ingest pipelines that are being used.
This UI is intended for more advanced users.

![Screenshot of the Advanced tab](<../../../../images/logs-streams-advanced.png>)
154 changes: 154 additions & 0 deletions solutions/observability/logs/streams/management/extract.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
---
applies_to:
serverless: preview
---
# Extract fields [streams-extract-fields]

Unstructured log messages need to be parsed into meaningful fields so you can filter and analyze them quickly. Common fields to extract include timestamp and the log level, but you can also extract information like IP addresses, usernames, or ports.

Use the **Extract field** tab on the **Manage stream** page to process your data. The UI simulates your changes and provides an immediate preview that's tested end-to-end.

The UI also shows indexing problems, such as mapping conflicts, so you can address them before applying changes.

:::{note}
Applied changes aren't retroactive and only affect *future data ingested*.
:::

## Add a processor [streams-add-processors]

Streams uses {{es}} ingest pipelines to process your data. Ingest pipelines are made up of processors that transform your data.

To add a processor:

1. Select **Add processor** to open a list of supported processors.
1. Select a processor from the list:
- [Date](./extract/date.md)
- [Dissect](./extract/dissect.md)
- [Grok](./extract/grok.md)
- [Key-Value (KV)](./extract/key-value.md)
- GeoIP
- Rename
- Set
- URL Decode
1. Select **Add Processor** to save the processor.

:::{note}
Editing processors with JSON is planned for a future release. More processors may be added over time.
:::

### Add conditions to processors [streams-add-processor-conditions]

You can provide a condition for each processor under **Optional fields**. Conditions are boolean expressions that are evaluated for each document. Provide a field, a value, and a comparator.
Processors support these comparators:
- equals
- not equals
- less than
- less than or equals
- greater than
- greater than or equals
- contains
- starts with
- ends with
- exists
- not exists

### Preview changes [streams-preview-changes]

Under **Processors for field extraction**, when you set pipeline processors to modify your documents, **Data preview** shows you a preview of the results with additional filtering options depending on the outcome of the simulation.

When you add or edit processors, the **Data preview** updates automatically.

:::{note}
To avoid unexpected results, focus on adding processors rather than removing or reordering existing processors.
:::

**Data preview** loads 100 documents from your existing data and runs your changes using them.
For any newly added processors, this simulation is reliable. You can save individual processors during the preview, and even reorder them.
Selecting 'Save changes' applies your changes to the data stream.

If you edit the stream again, note the following:
- Adding more processors to the end of the list will work as expected.
- Changing existing processors or re-ordering them may cause unexpected results. Because the pipeline may have already processed the documents used for sampling, the UI cannot accurately simulate changes to existing data.
- Adding a new processor and moving it before an existing processor may cause unexpected results. The UI only simulates the new processor, not the existing ones, so the simulation may not accurately reflect changes to existing data.

![Screenshot of the Grok processor UI](<../../../../images/logs-streams-grok.png>)

### Ignore failures [streams-ignore-failures]

Turn on **Ignore failure** to ignore the processor if it fails. This is useful if you want to continue processing the document even if the processor fails.

### Ignore missing fields [streams-ignore-missing-fields]

Turn on **Ignore missing fields** to ignore the processor if the field is not present. This is useful if you want to continue processing the document even if the field is not present.

## Detect and handle failures [streams-detect-failures]

Documents fail processing for different reasons. Streams helps you to easily find and handle failures before deploying changes.

The following example shows not all messages matched the provided Grok pattern:

![Screenshot showing some failed documents](<../../../../images/logs-streams-parsed.png>)

You can filter your documents by selecting **Parsed** or **Failed** at the top of the table. Select **Failed** to see the documents that failed:

![Screenshot showing the documents UI with Failed selected](<../../../../images/logs-streams-failures.png>)

Failures are displayed at the bottom of the process editor:

![Screenshot showing failure notifications](<../../../../images/logs-streams-processor-failures.png>)

These failures may be something you should address, but in some cases they also act as more of a warning.

### Mapping Conflicts

As part of processing, Streams also checks for mapping conflicts by simulating the change end to end. If a mapping conflict is detected, Streams marks the processor as failed and displays a failure message:

![Screenshot showing mapping conflict notifications](<../../../../images/logs-streams-mapping-conflicts.png>)

## Processor statistics and detected fields [streams-stats-and-detected-fields]

Once saved, the processor also gives you a quick look at how successful the processing was for this step and which fields were added.

![Screenshot showing field stats](<../../../../images/logs-streams-field-stats.png>)

## Advanced: How and where do these changes get applied to the underlying datastream? [streams-applied-changes]

When you save processors, Streams modifies the "best matching" ingest pipeline for the data stream. In short, Streams either chooses the best matching pipeline ending in `@custom` that is already part of your data stream, or it adds one for you.

Streams identifies the appropriate @custom pipeline (for example, `logs-myintegration@custom` or `logs@custom`).
It checks the default_pipeline that is set on the datastream.

You can view the default pipeline at **Manage stream** → **Advanced** under **Ingest pipeline**.
In this default pipeline, we locate the last processor that calls a pipeline ending in `@custom`. For integrations, this would result in a pipeline name like `logs-myintegration@custom`. Without an integration, the only `@custom` pipeline available may be `logs@custom`.

- If no default pipeline is detected, Streams adds a default pipeline to the data stream by updating the index templates.
- If a default pipeline is detected, but it does not contain a custom pipeline, Streams adds the pipeline processor directly to the pipeline.

Streams then adds a pipeline processor to the end of that `@custom` pipeline. This processor definition directs matching documents to a dedicated pipeline managed by Streams called `<data_stream_name>@stream.processing`:

```json
// Example processor added to the relevant @custom pipeline
{
"pipeline": {
"name": "<data_stream_name>@stream.processing", // for example, [email protected]
"if": "ctx._index == '<data_stream_name>'",
"ignore_missing_pipeline": true,
"description": "Call the stream's managed pipeline - do not change this manually but instead use the Streams UI or API"
}
}
```

Streams then creates and manages the `<data_stream_name>@stream.processing` pipeline, adding the [processors](#streams-add-processors) you configured in the UI.

### User interaction with pipelines

Do not manually modify the `<data_stream_name>@stream.processing` pipeline created by Streams.
You can still add your own processors manually to the `@custom` pipeline if needed. Adding processors before the pipeline processor crated by Streams may cause unexpected behavior.

## Known limitations [streams-known-limitations]

- Streams does not support all processors. We are working on adding more processors in the future.
- Streams does not support all processor options. We are working on adding more options in the future.
- The data preview simulation may not accurately reflect the changes to the existing data when editing existing processors or re-ordering them.
- Dots in field names are not supported. You can use the dot expand processor in the `@custom` pipeline as a workaround. You need to manually add the dot processor.
- Providing any arbitrary JSON in the Streams UI is not supported. We are working on adding this in the future.
45 changes: 45 additions & 0 deletions solutions/observability/logs/streams/management/extract/date.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
---
applies_to:
serverless: preview
---

# Date processor [streams-date-processor]

The date processor parses date strings and uses them as the timestamp of the document.

This functionality uses the {{es}} date pipeline processor. Refer to [date processor](elasticsearch://reference/enrich-processor/date-processor.md) in the {{es}} docs for more information.

## Examples

The following list provides some common examples of date formats and how to parse them.

**Common formats**
```
2025-04-04T09:04:45+00:00 => ISO8601
1618886400 => UNIX
1618886400123 => UNIX_MS
4000000049c9f0ca => TAI64N
```

**Custom formats**
```
2023-10-15 => yyyy-MM-dd
15/10/2023 => dd/MM/yyyy
10-15-2023 => MM-dd-yyyy
2023-288 => yyyy-DDD
15 Oct 2023 => dd MMM yyyy
Sunday, October 15, 2023 => EEEE, MMMM dd, yyyy
2023-10-15T14:30:00Z => yyyy-MM-dd'T'HH:mm:ssX
2023-10-15 14:30:00 => yyyy-MM-dd HH:mm:ss
```


## Optional Fields [streams-date-optional-fields]
The following fields are optional for the date processor:

| Field | Description|
| ------- | --------------- |
| Target field | The field that will hold the parsed date. Defaults to `@timestamp`. |
| Timezone | The timezone to use when parsing the date. Supports template snippets. Defaults to `UTC`. |
| Locale | The locale to use when parsing the date, relevant when parsing month names or weekdays. Supports template snippets. Defaults to `ENGLISH`. |
| Output format | The format to use when writing the date to `target_field`. Must be a valid Java time pattern. Defaults to `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`. |
23 changes: 23 additions & 0 deletions solutions/observability/logs/streams/management/extract/dissect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
---
applies_to:
serverless: preview
---
# Dissect processor [streams-dissect-processor]

The dissect processor parses structured log messages and extracts fields from them. Unlike Grok, it does not use a set of predefined patterns to match the log messages. Instead, it uses a set of delimiters to split the log message into fields.
Dissect is much faster than Grok and can parse slightly more structured log messages.

This functionality uses the {{es}} dissect pipeline processor. Refer to [dissect processor](elasticsearch://reference/enrich-processor/dissect-processor.md) in the {{es}} docs for more information.

To parse a log message, simply name the field and list the delimiters you want to use. The dissect processor will then split the log message into fields based on the delimiters provided.

Example:

Log Message
```
2025-04-04T09:04:45+00:00 ERROR 160.200.87.105 127.79.135.127 21582
```
Dissect Pattern
```
%{timestamp} %{log.level} %{source.ip} %{destination.ip} %{destination.port}
```
46 changes: 46 additions & 0 deletions solutions/observability/logs/streams/management/extract/grok.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
---
applies_to:
serverless: preview
---
# Grok processor [streams-grok-processor]

The Grok processor parses unstructured log messages and extracts fields from them. It uses a set of predefined patterns to match the log messages and extract the fields. The Grok processor is very powerful and can parse a wide variety of log formats.

You can provide multiple patterns to the Grok processor. The Grok processor will try to match the log message against each pattern in the order they are provided. If a pattern matches, the fields will be extracted and the remaining patterns will not be used.
If a pattern does not match, the Grok processor will try the next pattern. If no patterns match, the Grok processor will fail and you can troubleshoot the issue. Refer to [generate patterns](#streams-grok-patterns) for more information.

Start with the most common patterns first and then add more specific patterns later. This reduces the number of runs the Grok processor has to do and improves the performance of the pipeline.

This functionality uses the {{es}} Grok pipeline processor. Refer to [Grok processor](elasticsearch://reference/enrich-processor/grok-processor.md) in the {{es}} docs for more information.

The Grok processor uses a set of predefined patterns to match the log messages and extract the fields.
You can also define your own pattern definitions by expanding the `Optional fields` section. This will allow you to define your own patterns and use them in the Grok processor.
The patterns are defined in the following format:

```
{
"MY_DATE": "%{YEAR}-%{MONTHNUM}-%{MONTHDAY}"
}
```
Where `MY_DATE` is the name of the pattern.
The above pattern can then be used in the processor
```
%{MY_DATE:date}
```

## Generate Patterns [streams-grok-patterns]
Requires an LLM Connector to be configured.
Instead of writing the Grok patterns by hand, you can use the **Generate Patterns** button to generate the patterns for you.

% TODO Elastic LLM?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the plan for this line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure on this one. @LucaWintergerst are we saying we want to describe configuring the LLM in a future iteration?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we'll have an out of the box LLM soon that we should talk about here in the future


![generated patterns](<../../../../../images/logs-streams-patterns.png>)

Patterns can be accepted by clicking the plus icon next to the pattern. This will add the pattern to the list of patterns to be used in the Grok processor.

### How does the pattern generation work? [streams-grok-pattern-generation]
Under the hood, the 100 samples on the right hand side are grouped into categories of similar messages. For each category, a Grok pattern is generated by sending a few samples to the LLM. Matching patterns are then shown in the UI.

:::{note}
This can incur additional costs, depending on the LLM connector you are using. Typically a single iteration uses between 1000 and 5000 tokens, depending on the number of identified categories and the length of the messages.
:::
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
---
navigation_title: KV processor
applies_to:
serverless: preview
---
# Key value processor [streams-kv-processor]

The key value (KV) processor allows you to extract key-value pairs from a field and assign them to a target field or the root of the document.

This functionality uses the {{es}} kv pipeline processor. Refer to [KV processor](elasticsearch://reference/enrich-processor/kv-processor.md) in the {{es}} docs for more information.

## Required fields [streams-kv-required-fields]

The KV processor requires the following fields:

| Field | Description|
| ------- | --------------- |
| Field | The field to be parsed.|
| Field split | Regex pattern used to delimit the key-value pairs. Typically a space character (" "). |
| Value split | Regex pattern used to delimit the key from the value. Typically an equals sign (=). |

## Optional fields [streams-kv-optional-fields]

The following fields are optional for the KV processor:

| Field | Description|
| ------- | --------------- |
| Target field | The field to assign the parsed key-value pairs to. If not specified, the key-value pairs are assigned to the root of the document. |
| Include keys | A list of extracted keys to include in the output. If not specified, all keys are included by default. Type and then hit "ENTER" to add keys. |
| Exclude keys | A list of extracted keys to exclude from the output. Type and then hit "ENTER" to add keys. |
| Prefix | A prefix to add to extracted keys. |
| Trim key | A string of characters to trim from extracted keys. |
| Trim value | A string of characters to trim from extracted values. |
| Strip brackets | Removes brackets ( (), <>, []) and quotes (', ") from extracted values.|
23 changes: 23 additions & 0 deletions solutions/observability/logs/streams/management/retention.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
---
applies_to:
serverless: preview
---

# Manage data retention [streams-data-retention]

Use the **Data retention** tab on the **Manage stream** page to set how long your stream retains data and to get insight into your stream's data ingestion and storage size.

![Screenshot of the data retention UI](<../../../../images/logs-streams-retention.png>)

The **Data retention** page is made up of the following components that can help you determine how long you want your stream to retain data:

- **Retention period**: The minimum number of days after which the data is deleted
- **Source**: The origin of the data retention policy.
- **Ingestion**: Estimated ingestion per day and month calculated based on the size of all data in the stream and divided by the age of the stream. This is an estimate, and the actual ingestion may vary.
- **Total doc count**: The total number of documents in the stream.
- **Ingestion Rate**: Estimated ingestion rate per time bucket. The bucket interval is dynamic and adjusts based on the selected time range. The ingestion rate is calculated based on the average document size in a stream, multiplied by the number of documents in the bucket. This is an estimate, and the actual ingestion rate may vary.

## Edit the data retention period [streams-update-data-retention]
Select `Edit data retention` to change how long data for your stream is retained. The **Retention period** is the minimum number of days after which the data is deleted.

To define a global default retention policy, refer to [project settings](../../../../../deploy-manage/deploy/elastic-cloud/project-settings.md).
Loading
Loading