Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 90 additions & 14 deletions plugins/out_es/es.c
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,21 @@ static flb_sds_t es_get_id_value(struct flb_elasticsearch *ctx,
return tmp_str;
}

static int es_action_line_value_is_safe(const char *value, size_t len)
{
size_t i;
unsigned char c;

for (i = 0; i < len; i++) {
c = (unsigned char) value[i];
if (c == '\n' || c == '\r' || c == '"' || c == '\\' || c < 0x20) {
return FLB_FALSE;
}
}

return FLB_TRUE;
}

static int compose_index_header(struct flb_elasticsearch *ctx,
int es_index_custom_len,
char *logstash_index, size_t logstash_index_size,
Expand Down Expand Up @@ -293,6 +308,10 @@ static int elasticsearch_format(struct flb_config *config,
int len;
int map_size;
int index_len = 0;
int write_op_update = FLB_FALSE;
int write_op_upsert = FLB_FALSE;
int id_key_required = FLB_FALSE;
int id_key_safe;
size_t s = 0;
size_t off = 0;
size_t off_prev = 0;
Expand Down Expand Up @@ -345,10 +364,16 @@ static int elasticsearch_format(struct flb_config *config,
return -1;
}

/* Copy logstash prefix if logstash format is enabled */
if (ctx->logstash_format == FLB_TRUE) {
strncpy(logstash_index, ctx->logstash_prefix, sizeof(logstash_index));
logstash_index[sizeof(logstash_index) - 1] = '\0';
if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPDATE) == 0) {
write_op_update = FLB_TRUE;
}
else if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPSERT) == 0) {
write_op_upsert = FLB_TRUE;
}

if (ctx->ra_id_key && ctx->generate_id == FLB_FALSE &&
(write_op_update == FLB_TRUE || write_op_upsert == FLB_TRUE)) {
id_key_required = FLB_TRUE;
}

/*
Expand Down Expand Up @@ -403,21 +428,29 @@ static int elasticsearch_format(struct flb_config *config,
map = *log_event.body;
map_size = map.via.map.size;

/* Copy logstash prefix for the per-record fallback path. */
if (ctx->logstash_format == FLB_TRUE) {
strncpy(logstash_index, ctx->logstash_prefix, sizeof(logstash_index));
logstash_index[sizeof(logstash_index) - 1] = '\0';
}

es_index_custom_len = 0;
if (ctx->logstash_prefix_key) {
flb_sds_t v = flb_ra_translate(ctx->ra_prefix_key,
(char *) tag, tag_len,
map, NULL);
if (v) {
len = flb_sds_len(v);
if (len > 128) {
len = 128;
memcpy(logstash_index, v, 128);
}
else {
memcpy(logstash_index, v, len);
if (es_action_line_value_is_safe(v, len) == FLB_TRUE) {
if (len > 128) {
len = 128;
memcpy(logstash_index, v, 128);
}
else {
memcpy(logstash_index, v, len);
}
es_index_custom_len = len;
}
es_index_custom_len = len;
flb_sds_destroy(v);
}
}
Expand Down Expand Up @@ -538,7 +571,15 @@ static int elasticsearch_format(struct flb_config *config,
}
if (ctx->ra_id_key) {
id_key_str = es_get_id_value(ctx ,&map);
if (id_key_str) {
id_key_safe = FLB_FALSE;

if (id_key_str &&
es_action_line_value_is_safe(id_key_str,
flb_sds_len(id_key_str)) == FLB_TRUE) {
id_key_safe = FLB_TRUE;
}

if (id_key_safe == FLB_TRUE) {
if (ctx->suppress_type_name) {
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
Expand All @@ -553,6 +594,35 @@ static int elasticsearch_format(struct flb_config *config,
ctx->es_action,
es_index, ctx->type, id_key_str);
}
}
else if (id_key_required == FLB_TRUE) {
flb_plg_warn(ctx->ins,
"skipping record with missing or unsafe Id_Key value");
if (id_key_str) {
flb_sds_destroy(id_key_str);
id_key_str = NULL;
}
msgpack_sbuffer_destroy(&tmp_sbuf);
continue;
}
else if (id_key_str) {
if (ctx->suppress_type_name) {
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT_WITHOUT_TYPE,
ctx->es_action,
es_index);
}
else {
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT,
ctx->es_action,
es_index, ctx->type);
}
}
Comment on lines +598 to +623

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Preserve generated _id when rejecting an unsafe optional Id_Key.

If Generate_ID is enabled, lines 551-570 already compose a safe generated _id. This unsafe Id_Key fallback then rewrites the header without _id, which can break update/upsert requests and removes generated-ID deduplication for index/create operations.

🐛 Proposed fix
-            else if (id_key_str) {
+            else if (id_key_str && ctx->generate_id == FLB_FALSE) {
                 if (ctx->suppress_type_name) {
                     index_len = flb_sds_snprintf(&j_index,
                                                  flb_sds_alloc(j_index),
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
else if (id_key_required == FLB_TRUE) {
flb_plg_warn(ctx->ins,
"skipping record with missing or unsafe Id_Key value");
if (id_key_str) {
flb_sds_destroy(id_key_str);
id_key_str = NULL;
}
msgpack_sbuffer_destroy(&tmp_sbuf);
continue;
}
else if (id_key_str) {
if (ctx->suppress_type_name) {
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT_WITHOUT_TYPE,
ctx->es_action,
es_index);
}
else {
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT,
ctx->es_action,
es_index, ctx->type);
}
}
else if (id_key_required == FLB_TRUE) {
flb_plg_warn(ctx->ins,
"skipping record with missing or unsafe Id_Key value");
if (id_key_str) {
flb_sds_destroy(id_key_str);
id_key_str = NULL;
}
msgpack_sbuffer_destroy(&tmp_sbuf);
continue;
}
else if (id_key_str && ctx->generate_id == FLB_FALSE) {
if (ctx->suppress_type_name) {
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT_WITHOUT_TYPE,
ctx->es_action,
es_index);
}
else {
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
ES_BULK_INDEX_FMT,
ctx->es_action,
es_index, ctx->type);
}
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@plugins/out_es/es.c` around lines 598 - 623, The code currently discards any
previously generated `_id` (from `Generate_ID`) when rejecting an unsafe
optional `Id_Key` value in the id_key_required == FLB_TRUE condition block.
Instead of immediately continuing to skip the record, preserve the generated ID
by allowing the bulk header construction logic (similar to what occurs in the
else if (id_key_str) block with suppress_type_name) to execute using the
generated `_id` value. Modify the control flow so that when id_key_required is
true but Id_Key is unsafe/missing, the code still constructs the bulk index
header with the previously generated `_id` rather than abandoning it.


if (id_key_str) {
flb_sds_destroy(id_key_str);
id_key_str = NULL;
}
Expand All @@ -570,13 +640,13 @@ static int elasticsearch_format(struct flb_config *config,
}

out_buf_len = flb_sds_len(out_buf);
if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPDATE) == 0) {
if (write_op_update == FLB_TRUE) {
tmp_buf = out_buf;
out_buf = flb_sds_create_len(NULL, out_buf_len = out_buf_len + sizeof(ES_BULK_UPDATE_OP_BODY) - 2);
out_buf_len = snprintf(out_buf, out_buf_len, ES_BULK_UPDATE_OP_BODY, tmp_buf);
flb_sds_destroy(tmp_buf);
}
else if (strcasecmp(ctx->write_operation, FLB_ES_WRITE_OP_UPSERT) == 0) {
else if (write_op_upsert == FLB_TRUE) {
tmp_buf = out_buf;
out_buf = flb_sds_create_len(NULL, out_buf_len = out_buf_len + sizeof(ES_BULK_UPSERT_OP_BODY) - 2);
out_buf_len = snprintf(out_buf, out_buf_len, ES_BULK_UPSERT_OP_BODY, tmp_buf);
Expand Down Expand Up @@ -845,6 +915,12 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk,
FLB_OUTPUT_RETURN(FLB_ERROR);
}

if (out_size == 0) {
flb_free(out_buf);
flb_upstream_conn_release(u_conn);
FLB_OUTPUT_RETURN(FLB_OK);
}

pack = (char *) out_buf;
pack_size = out_size;

Expand Down
112 changes: 92 additions & 20 deletions plugins/out_opensearch/opensearch.c
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,21 @@ static flb_sds_t os_get_id_value(struct flb_opensearch *ctx,
return tmp_str;
}

static int os_action_line_value_is_safe(const char *value, size_t len)
{
size_t i;
unsigned char c;

for (i = 0; i < len; i++) {
c = (unsigned char) value[i];
if (c == '\n' || c == '\r' || c == '"' || c == '\\' || c < 0x20) {
return FLB_FALSE;
}
}

return FLB_TRUE;
}

static int compose_index_header(struct flb_opensearch *ctx,
int index_custom_len,
char *logstash_index, size_t logstash_index_size,
Expand Down Expand Up @@ -283,6 +298,8 @@ static int opensearch_format(struct flb_config *config,
int index_len = 0;
int write_op_update = FLB_FALSE;
int write_op_upsert = FLB_FALSE;
int id_key_required = FLB_FALSE;
int id_key_safe;
flb_sds_t ra_index = NULL;
size_t s = 0;
char *index = NULL;
Expand Down Expand Up @@ -330,10 +347,16 @@ static int opensearch_format(struct flb_config *config,
return -1;
}

/* Copy logstash prefix if logstash format is enabled */
if (ctx->logstash_format == FLB_TRUE) {
strncpy(logstash_index, ctx->logstash_prefix, sizeof(logstash_index));
logstash_index[sizeof(logstash_index) - 1] = '\0';
if (strcasecmp(ctx->write_operation, FLB_OS_WRITE_OP_UPDATE) == 0) {
write_op_update = FLB_TRUE;
}
else if (strcasecmp(ctx->write_operation, FLB_OS_WRITE_OP_UPSERT) == 0) {
write_op_upsert = FLB_TRUE;
}

if (ctx->ra_id_key && ctx->generate_id == FLB_FALSE &&
(write_op_update == FLB_TRUE || write_op_upsert == FLB_TRUE)) {
id_key_required = FLB_TRUE;
}

/*
Expand Down Expand Up @@ -393,22 +416,30 @@ static int opensearch_format(struct flb_config *config,
map = *log_event.body;
map_size = map.via.map.size;

/* Copy logstash prefix for the per-record fallback path. */
if (ctx->logstash_format == FLB_TRUE) {
strncpy(logstash_index, ctx->logstash_prefix, sizeof(logstash_index));
logstash_index[sizeof(logstash_index) - 1] = '\0';
}

index_custom_len = 0;
if (ctx->logstash_prefix_key) {
flb_sds_t v = flb_ra_translate(ctx->ra_prefix_key,
(char *) tag, tag_len,
map, NULL);
if (v) {
len = flb_sds_len(v);
if (len > 128) {
len = 128;
memcpy(logstash_index, v, 128);
}
else {
memcpy(logstash_index, v, len);
}
if (os_action_line_value_is_safe(v, len) == FLB_TRUE) {
if (len > 128) {
len = 128;
memcpy(logstash_index, v, 128);
}
else {
memcpy(logstash_index, v, len);
}

index_custom_len = len;
index_custom_len = len;
}
flb_sds_destroy(v);
}
}
Expand Down Expand Up @@ -492,6 +523,11 @@ static int opensearch_format(struct flb_config *config,
if (!ra_index) {
flb_plg_warn(ctx->ins, "invalid index translation from record accessor pattern, default to static index");
}
else if (os_action_line_value_is_safe(ra_index,
flb_sds_len(ra_index)) == FLB_FALSE) {
flb_sds_destroy(ra_index);
ra_index = NULL;
}
else {
index = ra_index;
}
Expand Down Expand Up @@ -566,7 +602,15 @@ static int opensearch_format(struct flb_config *config,
}
if (ctx->ra_id_key) {
id_key_str = os_get_id_value(ctx ,&map);
if (id_key_str) {
id_key_safe = FLB_FALSE;

if (id_key_str &&
os_action_line_value_is_safe(id_key_str,
flb_sds_len(id_key_str)) == FLB_TRUE) {
id_key_safe = FLB_TRUE;
}

if (id_key_safe == FLB_TRUE) {
if (ctx->suppress_type_name) {
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
Expand All @@ -581,6 +625,35 @@ static int opensearch_format(struct flb_config *config,
ctx->action,
index, ctx->type, id_key_str);
}
}
else if (id_key_required == FLB_TRUE) {
flb_plg_warn(ctx->ins,
"skipping record with missing or unsafe Id_Key value");
if (id_key_str) {
flb_sds_destroy(id_key_str);
id_key_str = NULL;
}
msgpack_sbuffer_destroy(&tmp_sbuf);
continue;
}
else if (id_key_str) {
if (ctx->suppress_type_name) {
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
OS_BULK_INDEX_FMT_NO_TYPE,
ctx->action,
index);
}
else {
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),
OS_BULK_INDEX_FMT,
ctx->action,
index, ctx->type);
}
}
Comment on lines +629 to +654

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Preserve generated _id when rejecting an unsafe optional Id_Key.

If Generate_ID is enabled, lines 578-601 already compose a safe generated _id. This unsafe Id_Key fallback then rewrites the header without _id, which can break update/upsert requests and removes generated-ID deduplication for index/create operations.

🐛 Proposed fix
-            else if (id_key_str) {
+            else if (id_key_str && ctx->generate_id == FLB_FALSE) {
                 if (ctx->suppress_type_name) {
                     index_len = flb_sds_snprintf(&j_index,
                                                  flb_sds_alloc(j_index),
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@plugins/out_opensearch/opensearch.c` around lines 629 - 654, When rejecting
an unsafe Id_Key value in the conditional block that checks id_key_required and
id_key_str, ensure that any previously generated safe _id from the Generate_ID
feature is preserved in the index header. Modify the else if (id_key_str) block
that formats the index header using OS_BULK_INDEX_FMT_NO_TYPE or
OS_BULK_INDEX_FMT to check whether a generated _id already exists, and if so,
avoid overwriting the header in a way that removes the _id field. This prevents
breaking update/upsert requests and maintains deduplication for index/create
operations that rely on the generated ID.


if (id_key_str) {
flb_sds_destroy(id_key_str);
id_key_str = NULL;
}
Expand Down Expand Up @@ -613,13 +686,6 @@ static int opensearch_format(struct flb_config *config,
return -1;
}

if (strcasecmp(ctx->write_operation, FLB_OS_WRITE_OP_UPDATE) == 0) {
write_op_update = FLB_TRUE;
}
else if (strcasecmp(ctx->write_operation, FLB_OS_WRITE_OP_UPSERT) == 0) {
write_op_upsert = FLB_TRUE;
}

/* UPDATE | UPSERT */
if (write_op_update) {
flb_sds_cat_safe(&bulk,
Expand Down Expand Up @@ -913,6 +979,12 @@ static void cb_opensearch_flush(struct flb_event_chunk *event_chunk,
FLB_OUTPUT_RETURN(FLB_ERROR);
}

if (out_size == 0) {
flb_sds_destroy(out_buf);
flb_upstream_conn_release(u_conn);
FLB_OUTPUT_RETURN(FLB_OK);
}

pack = (char *) out_buf;
pack_size = out_size;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
service:
flush: 1
grace: 1
log_level: info
http_server: on
http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT}

pipeline:
inputs:
- name: dummy
tag: out_es_id_key_ndjson
dummy: '{"doc_id":"legit\"\n{\"delete\":{\"_index\":\"audit-logs\",\"_id\":\"critical-audit-record-12345\"}}\n{\"create\":{\"_index\":\"pad\",\"_id\":\"x","message":"id-key injection"}'
samples: 1

outputs:
- name: es
match: out_es_id_key_ndjson
host: 127.0.0.1
port: ${TEST_SUITE_HTTP_PORT}
index: fluent-bit
suppress_type_name: on
id_key: doc_id
retry_limit: 0
Loading
Loading