
Enable failure store for log data streams #131261


Merged (24 commits, Jul 31, 2025)

Commits (24)
2872d81
Enabling failure store for log data streams
eyalkoren Jul 15, 2025
915a9c0
Merge remote-tracking branch 'upstream/main' into logs-enable-faliure…
eyalkoren Jul 15, 2025
7c53ce7
Update docs/changelog/131261.yaml
eyalkoren Jul 15, 2025
011f862
Adjusting tests
eyalkoren Jul 15, 2025
f8c34c1
Merge remote-tracking branch 'eyalkoren/logs-enable-faliure-store' in…
eyalkoren Jul 15, 2025
de13ae7
Merge remote-tracking branch 'upstream/main' into logs-enable-faliure…
eyalkoren Jul 16, 2025
cac9c82
Update docs/changelog/131261.yaml
eyalkoren Jul 16, 2025
2304df6
Merge remote-tracking branch 'eyalkoren/logs-enable-faliure-store' in…
eyalkoren Jul 16, 2025
71a4898
Bump StackTemplateRegistry#REGISTRY_VERSION
eyalkoren Jul 16, 2025
af5a001
Replace explicit indices with API conventions in yaml tests
eyalkoren Jul 16, 2025
a686b70
Merge remote-tracking branch 'upstream/main' into logs-enable-faliure…
eyalkoren Jul 16, 2025
a007922
Merge branch 'main' into logs-enable-faliure-store
eyalkoren Jul 24, 2025
c6818f5
Merge branch 'main' into logs-enable-faliure-store
eyalkoren Jul 24, 2025
6218349
Merge branch 'main' into logs-enable-faliure-store
eyalkoren Jul 24, 2025
781436d
Update docs/changelog/131261.yaml
eyalkoren Jul 24, 2025
0e3647d
Update 131261.yaml
eyalkoren Jul 27, 2025
789ca17
Fixing release notes body
eyalkoren Jul 27, 2025
1e2b56c
Merge branch 'main' into logs-enable-faliure-store
eyalkoren Jul 27, 2025
a7eb8d2
Update 131261.yaml
LucaWintergerst Jul 28, 2025
8a1b804
Merge remote-tracking branch 'upstream/main' into logs-enable-faliure…
eyalkoren Jul 29, 2025
10af65e
Using cluster feature instead of synthetic version for yaml test
eyalkoren Jul 29, 2025
7ec58e0
Merge branch 'main' into logs-enable-faliure-store
eyalkoren Jul 29, 2025
595c6b3
add proper docs links instead of static links
LucaWintergerst Jul 29, 2025
11f322d
Merge branch 'main' into logs-enable-faliure-store
eyalkoren Jul 29, 2025
13 changes: 13 additions & 0 deletions docs/changelog/131261.yaml
@@ -0,0 +1,13 @@
pr: 131261
summary: Enable Failure Store for new logs-*-* data streams
area: Data streams
type: feature
issues:
  - 131105
highlight:
  title: Enable Failure Store for new logs data streams
  body: |-
    The [Failure Store](docs-content://manage-data/data-store/data-streams/failure-store.md) is now enabled by default for new logs data streams matching the pattern `logs-*-*`. This means that such data streams will now store invalid documents in a
    dedicated failure index instead of rejecting them, allowing better visibility and control over data quality issues without losing data. This can be [enabled manually](docs-content://manage-data/data-store/data-streams/failure-store.md#set-up-failure-store-existing) for existing data streams.
    Note: With the failure store enabled, the HTTP response code that clients receive when indexing invalid documents will change from `400 Bad Request` to `201 Created`, with an additional response attribute `"failure_store" : "used"`.
  notable: true
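
For illustration only (the index name, document ID, and shard counts below are hypothetical), an index request that gets redirected to the failure store now returns a response roughly along these lines instead of a `400 Bad Request` error:

{
  "_index": ".fs-logs-app-default-2025.07.31-000001",
  "_id": "example-doc-id",
  "_version": 1,
  "result": "created",
  "failure_store": "used",
  "_shards": { "total": 2, "successful": 1, "failed": 0 },
  "_seq_no": 0,
  "_primary_term": 1
}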
@@ -16,6 +16,7 @@

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
@@ -740,6 +741,73 @@ public void testIgnoreDynamicBeyondLimit() throws Exception {
assertThat(ignored.stream().filter(i -> i.startsWith("field") == false).toList(), empty());
}

@SuppressWarnings("unchecked")
public void testFailureStoreWithInvalidFieldType() throws Exception {
String dataStreamName = "logs-app-with-failure-store";
createDataStream(client, dataStreamName);

indexDoc(client, dataStreamName, """
{
"@timestamp": "2023-11-30T12:00:00Z",
"message": "This is a valid message"
}
""");

// invalid document (message as an object instead of string)
indexDoc(client, dataStreamName, """
{
"@timestamp": "2023-11-30T12:01:00Z",
"message": {
"nested": "This should fail because message should be a string"
}
}
""");

refreshAllIndices();

Request dsInfoRequest = new Request("GET", "/_data_stream/" + dataStreamName);
Map<String, Object> dsInfoResponse = entityAsMap(client.performRequest(dsInfoRequest));
List<Map<String, Object>> dataStreams = (List<Map<String, Object>>) dsInfoResponse.get("data_streams");
Map<String, Object> dataStream = dataStreams.getFirst();
Map<String, Object> failureStoreInfo = (Map<String, Object>) dataStream.get("failure_store");
assertNotNull(failureStoreInfo);
assertThat(failureStoreInfo.get("enabled"), is(true));
List<Map<String, Object>> failureIndices = (List<Map<String, Object>>) failureStoreInfo.get("indices");

assertThat(failureIndices, not(empty()));
String failureIndex = (String) failureIndices.getFirst().get("index_name");
assertThat(failureIndex, matchesRegex("\\.fs-" + dataStreamName + "-.*"));

// query the failure store index
Request failureStoreQuery = new Request("GET", "/" + failureIndex + "/_search");
failureStoreQuery.setJsonEntity("""
{
"query": {
"match_all": {}
}
}
""");
Map<String, Object> failureStoreResponse = entityAsMap(client.performRequest(failureStoreQuery));
Map<String, Object> hits = (Map<String, Object>) failureStoreResponse.get("hits");
List<Map<String, Object>> hitsList = (List<Map<String, Object>>) hits.get("hits");

// Verify the failed document is in the failure store
assertThat(hitsList.size(), is(1));
Map<String, Object> failedDoc = (Map<String, Object>) hitsList.getFirst().get("_source");
Map<String, Object> document = (Map<String, Object>) failedDoc.get("document");
assertNotNull(document);
Map<String, Object> source = (Map<String, Object>) document.get("source");
assertNotNull(source);
Map<String, Object> message = (Map<String, Object>) source.get("message");
assertNotNull(message);
assertThat(message.get("nested"), equalTo("This should fail because message should be a string"));
Map<String, Object> error = (Map<String, Object>) failedDoc.get("error");
assertNotNull(error);
assertEquals("document_parsing_exception", error.get("type"));
String errorMessage = (String) error.get("message");
assertThat(errorMessage, containsString("failed to parse field [message] of type [match_only_text] in document with id"));
}

@Override
protected String indexTemplateName() {
return "logs";
@@ -28,13 +28,20 @@ public class DataStreamFeatures implements FeatureSpecification {

public static final NodeFeature LOGS_STREAM_FEATURE = new NodeFeature("logs_stream");

public static final NodeFeature FAILURE_STORE_IN_LOG_DATA_STREAMS = new NodeFeature("logs_data_streams.failure_store.enabled");

@Override
public Set<NodeFeature> getFeatures() {
return Set.of(DataStream.DATA_STREAM_FAILURE_STORE_FEATURE);
}

@Override
public Set<NodeFeature> getTestFeatures() {
return Set.of(DATA_STREAM_FAILURE_STORE_TSDB_FIX, DOWNSAMPLE_AGGREGATE_DEFAULT_METRIC_FIX, LOGS_STREAM_FEATURE);
return Set.of(
DATA_STREAM_FAILURE_STORE_TSDB_FIX,
DOWNSAMPLE_AGGREGATE_DEFAULT_METRIC_FIX,
LOGS_STREAM_FEATURE,
FAILURE_STORE_IN_LOG_DATA_STREAMS
);
}
}
@@ -0,0 +1,79 @@
---
setup:
  - requires:
      cluster_features: [ "logs_data_streams.failure_store.enabled" ]
      reason: "failure store became enabled by default for log data streams in 9.2.0"

  - do:
      indices.create_data_stream:
        name: logs-app-default
---
teardown:
  - do:
      indices.delete_data_stream:
        name: logs-app-default
        ignore: 404

---
"Test logs-*-* data streams have failure store enabled by default":
  # index a valid document (string message)
  - do:
      index:
        index: logs-app-default
        refresh: true
        body:
          '@timestamp': '2023-01-01T12:00:00Z'
          host:
            name: 'server-01'
          severity: 'INFO'
          message: "Application started successfully"
  - match: { result: created }

  - do:
      indices.get_data_stream:
        name: logs-app-default
  - match: { data_streams.0.name: logs-app-default }
  - length: { data_streams.0.indices: 1 }
  - match: { data_streams.0.failure_store.enabled: true }
  - length: { data_streams.0.failure_store.indices: 0 }

  # index an invalid document (object message, causing a mapping conflict)
  - do:
      index:
        index: logs-app-default
        refresh: true
        body:
          '@timestamp': '2023-01-01T12:01:00Z'
          host:
            name: 'server-02'
          severity: 'ERROR'
          message:
            struct:
              value: 42
  - match: { result: 'created' }
  - match: { failure_store: used }

  - do:
      indices.get_data_stream:
        name: logs-app-default
  - length: { data_streams.0.failure_store.indices: 1 }

  - do:
      search:
        index: logs-app-default::data
        body:
          query:
            match_all: {}
  - length: { hits.hits: 1 }
  - match: { hits.hits.0._source.severity: "INFO" }
  - match: { hits.hits.0._source.message: "Application started successfully" }

  - do:
      search:
        index: logs-app-default::failures
        body:
          query:
            match_all: {}
  - length: { hits.hits: 1 }
  - match: { hits.hits.0._source.document.source.message.struct.value: 42 }
  - match: { hits.hits.0._source.error.type: "document_parsing_exception" }
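
The test above reads back both sides of the data stream through the `::data` and `::failures` index selectors. As a minimal sketch outside the YAML test framework, the same selectors can be used in an ordinary search request (assuming the `logs-app-default` data stream from the test above exists):

GET logs-app-default::failures/_search
{
  "query": {
    "match_all": {}
  }
}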
@@ -70,20 +70,20 @@ public void testDSXpackUsage() throws Exception {
assertThat(failureStoreStats.get("effectively_enabled_count"), equalTo(0));
assertThat(failureStoreStats.get("failure_indices_count"), equalTo(0));
assertBusy(() -> {
Map<?, ?> logsTemplate = (Map<?, ?>) ((List<?>) getLocation("/_index_template/logs").get("index_templates")).get(0);
assertThat(logsTemplate, notNullValue());
assertThat(logsTemplate.get("name"), equalTo("logs"));
assertThat(((Map<?, ?>) logsTemplate.get("index_template")).get("data_stream"), notNullValue());
Map<?, ?> syntheticsTemplate = (Map<?, ?>) ((List<?>) getLocation("/_index_template/synthetics").get("index_templates")).get(0);
assertThat(syntheticsTemplate, notNullValue());
assertThat(syntheticsTemplate.get("name"), equalTo("synthetics"));
assertThat(((Map<?, ?>) syntheticsTemplate.get("index_template")).get("data_stream"), notNullValue());
});
putFailureStoreTemplate();

// Create a data stream
Request indexRequest = new Request("POST", "/logs-mysql-default/_doc");
Request indexRequest = new Request("POST", "/synthetics-myapp-default/_doc");
indexRequest.setJsonEntity("{\"@timestamp\": \"2020-01-01\"}");
client().performRequest(indexRequest);

// Roll over the data stream
Request rollover = new Request("POST", "/logs-mysql-default/_rollover");
Request rollover = new Request("POST", "/synthetics-myapp-default/_rollover");
client().performRequest(rollover);

// Create failure store data stream
@@ -105,10 +105,10 @@ public void testDSXpackUsage() throws Exception {
assertThat(failureStoreStats.get("effectively_enabled_count"), equalTo(1));
assertThat(failureStoreStats.get("failure_indices_count"), equalTo(1));

// Enable the failure store for logs-mysql-default using the cluster setting...
// Enable the failure store for synthetics-myapp-default using the cluster setting...
updateClusterSettings(
Settings.builder()
.put(DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), "logs-mysql-default")
.put(DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), "synthetics-myapp-default")
.build()
);
// ...and assert that it counts towards effectively_enabled_count but not explicitly_enabled_count:
@@ -14,6 +14,11 @@
},
"default_pipeline": "logs@default-pipeline"
}
},
"data_stream_options": {
"failure_store": {
"enabled": true
}
}
},
"_meta": {
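
The change above adds a `data_stream_options.failure_store.enabled` flag to the built-in logs template resource. As a hedged sketch of the same opt-in applied elsewhere, and assuming the `data_stream_options` block is accepted in user-defined component templates that a matching index template composes (the template name below is hypothetical), the failure store could be enabled for other data stream patterns like this:

PUT _component_template/my-app-logs@custom
{
  "template": {
    "data_stream_options": {
      "failure_store": {
        "enabled": true
      }
    }
  }
}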
@@ -37,7 +37,7 @@ public class StackTemplateRegistry extends IndexTemplateRegistry {

// The stack template registry version. This number must be incremented when we make changes
// to built-in templates.
public static final int REGISTRY_VERSION = 16;
public static final int REGISTRY_VERSION = 17;

public static final String TEMPLATE_VERSION_VARIABLE = "xpack.stack.template.version";
public static final Setting<Boolean> STACK_TEMPLATES_ENABLED = Setting.boolSetting(
@@ -276,14 +276,14 @@ setup:
data_stream.namespace: "namespace1"

- do:
catch: bad_request
index:
index: logs-dataset0-namespace1
body:
"@timestamp": "2020-01-01"
data_stream.type: "metrics"
data_stream.dataset: "dataset0"
data_stream.namespace: "namespace1"
- match: { failure_store: used }

- do:
catch: bad_request