out_es/out_opensearch: sanitize bulk action metadata#11967
Conversation
📝 WalkthroughWalkthroughAdds ChangesBulk action line injection sanitization
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 952b125a93
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| else { | ||
| index_len = flb_sds_snprintf(&j_index, | ||
| flb_sds_alloc(j_index), | ||
| ES_BULK_INDEX_FMT_ID, | ||
| ctx->es_action, | ||
| es_index, ctx->type, id_key_str); | ||
| if (ctx->suppress_type_name) { | ||
| index_len = flb_sds_snprintf(&j_index, | ||
| flb_sds_alloc(j_index), | ||
| ES_BULK_INDEX_FMT_WITHOUT_TYPE, | ||
| ctx->es_action, | ||
| es_index); |
There was a problem hiding this comment.
Preserve required _id for update/upsert
When Id_Key resolves to a value rejected by the new safety check and Write_Operation is update or upsert, this fallback emits the same update action without an _id. The config path explicitly requires Id_Key or Generate_Id for update/upsert (plugins/out_es/es_conf.c:320-322), so real Elasticsearch bulk requests reject these items and they can be retried/dropped instead of being handled safely; this path should fail/skip the record rather than formatting an id-less update action.
Useful? React with 👍 / 👎.
| else { | ||
| index_len = flb_sds_snprintf(&j_index, | ||
| flb_sds_alloc(j_index), | ||
| OS_BULK_INDEX_FMT_ID, | ||
| ctx->action, | ||
| index, ctx->type, id_key_str); | ||
| if (ctx->suppress_type_name) { | ||
| index_len = flb_sds_snprintf(&j_index, | ||
| flb_sds_alloc(j_index), | ||
| OS_BULK_INDEX_FMT_NO_TYPE, | ||
| ctx->action, | ||
| index); |
There was a problem hiding this comment.
Preserve required _id for update/upsert
When id_key resolves to a value rejected by the new safety check and Write_Operation is update or upsert, this fallback emits the same update action without an _id. The config path explicitly requires id_key or generate_id for update/upsert (plugins/out_opensearch/os_conf.c:180-184), so real OpenSearch bulk requests reject these items and they can be retried/dropped instead of being handled safely; this path should fail/skip the record rather than formatting an id-less update action.
Useful? React with 👍 / 👎.
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
952b125 to
c95aec4
Compare
There was a problem hiding this comment.
Actionable comments posted: 3
🧹 Nitpick comments (2)
plugins/out_opensearch/opensearch.c (1)
419-443: ⚡ Quick winMove the record-accessor prefix variable to the function declarations.
flb_sds_t vis declared inside theifblock; this file’s C style requires variables at the start of functions.♻️ Proposed style fix
flb_sds_t out_buf; flb_sds_t id_key_str = NULL; + flb_sds_t ra_prefix_value; @@ index_custom_len = 0; if (ctx->logstash_prefix_key) { - flb_sds_t v = flb_ra_translate(ctx->ra_prefix_key, - (char *) tag, tag_len, - map, NULL); - if (v) { - len = flb_sds_len(v); - if (os_action_line_value_is_safe(v, len) == FLB_TRUE) { + ra_prefix_value = flb_ra_translate(ctx->ra_prefix_key, + (char *) tag, tag_len, + map, NULL); + if (ra_prefix_value) { + len = flb_sds_len(ra_prefix_value); + if (os_action_line_value_is_safe(ra_prefix_value, len) == FLB_TRUE) { if (len > 128) { len = 128; - memcpy(logstash_index, v, 128); + memcpy(logstash_index, ra_prefix_value, 128); } else { - memcpy(logstash_index, v, len); + memcpy(logstash_index, ra_prefix_value, len); } index_custom_len = len; } - flb_sds_destroy(v); + flb_sds_destroy(ra_prefix_value); } }As per coding guidelines, “Declare variables at the start of functions, not mid-block.”
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@plugins/out_opensearch/opensearch.c` around lines 419 - 443, Move the variable declaration of flb_sds_t v from inside the if (ctx->logstash_prefix_key) conditional block to the function declarations section at the start of the function where other variables are declared. Keep the assignment of v using flb_ra_translate() inside the conditional block where it currently is, only removing the declaration part from there.Source: Coding guidelines
plugins/out_es/es.c (1)
431-455: ⚡ Quick winMove the record-accessor prefix variable to the function declarations.
flb_sds_t vis declared inside theifblock; this file’s C style requires variables at the start of functions.♻️ Proposed style fix
flb_sds_t tmp_buf; flb_sds_t id_key_str = NULL; + flb_sds_t ra_prefix_value; @@ es_index_custom_len = 0; if (ctx->logstash_prefix_key) { - flb_sds_t v = flb_ra_translate(ctx->ra_prefix_key, - (char *) tag, tag_len, - map, NULL); - if (v) { - len = flb_sds_len(v); - if (es_action_line_value_is_safe(v, len) == FLB_TRUE) { + ra_prefix_value = flb_ra_translate(ctx->ra_prefix_key, + (char *) tag, tag_len, + map, NULL); + if (ra_prefix_value) { + len = flb_sds_len(ra_prefix_value); + if (es_action_line_value_is_safe(ra_prefix_value, len) == FLB_TRUE) { if (len > 128) { len = 128; - memcpy(logstash_index, v, 128); + memcpy(logstash_index, ra_prefix_value, 128); } else { - memcpy(logstash_index, v, len); + memcpy(logstash_index, ra_prefix_value, len); } es_index_custom_len = len; } - flb_sds_destroy(v); + flb_sds_destroy(ra_prefix_value); } }As per coding guidelines, “Declare variables at the start of functions, not mid-block.”
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@plugins/out_es/es.c` around lines 431 - 455, The variable `flb_sds_t v` is declared inside the `if (ctx->logstash_prefix_key)` conditional block, but C style guidelines for this file require all variable declarations at the start of the function. Move the `flb_sds_t v` declaration to the beginning of the function with other variable declarations (before the logstash_index initialization), initialize it to NULL, and then assign the result of the `flb_ra_translate` call to it within the conditional block as it currently is.Source: Coding guidelines
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@plugins/out_es/es.c`:
- Around line 598-623: The code currently discards any previously generated
`_id` (from `Generate_ID`) when rejecting an unsafe optional `Id_Key` value in
the id_key_required == FLB_TRUE condition block. Instead of immediately
continuing to skip the record, preserve the generated ID by allowing the bulk
header construction logic (similar to what occurs in the else if (id_key_str)
block with suppress_type_name) to execute using the generated `_id` value.
Modify the control flow so that when id_key_required is true but Id_Key is
unsafe/missing, the code still constructs the bulk index header with the
previously generated `_id` rather than abandoning it.
In `@plugins/out_opensearch/opensearch.c`:
- Around line 629-654: When rejecting an unsafe Id_Key value in the conditional
block that checks id_key_required and id_key_str, ensure that any previously
generated safe _id from the Generate_ID feature is preserved in the index
header. Modify the else if (id_key_str) block that formats the index header
using OS_BULK_INDEX_FMT_NO_TYPE or OS_BULK_INDEX_FMT to check whether a
generated _id already exists, and if so, avoid overwriting the header in a way
that removes the _id field. This prevents breaking update/upsert requests and
maintains deduplication for index/create operations that rely on the generated
ID.
In
`@tests/integration/scenarios/out_es/tests/test_out_es_ndjson_action_line_001.py`:
- Around line 164-175: The test function
test_record_accessor_values_do_not_forge_bulk_action_lines currently only
validates the first request in requests_seen by checking requests_seen[0], but
it should validate all captured requests. Modify the assertion logic after
service.stop() to iterate through all items in the requests_seen list, verify
each request's path starts with "/_bulk", and call _assert_no_forged_delete on
each request body. This ensures that forged delete actions are detected in any
of the captured requests, not just the first one.
---
Nitpick comments:
In `@plugins/out_es/es.c`:
- Around line 431-455: The variable `flb_sds_t v` is declared inside the `if
(ctx->logstash_prefix_key)` conditional block, but C style guidelines for this
file require all variable declarations at the start of the function. Move the
`flb_sds_t v` declaration to the beginning of the function with other variable
declarations (before the logstash_index initialization), initialize it to NULL,
and then assign the result of the `flb_ra_translate` call to it within the
conditional block as it currently is.
In `@plugins/out_opensearch/opensearch.c`:
- Around line 419-443: Move the variable declaration of flb_sds_t v from inside
the if (ctx->logstash_prefix_key) conditional block to the function declarations
section at the start of the function where other variables are declared. Keep
the assignment of v using flb_ra_translate() inside the conditional block where
it currently is, only removing the declaration part from there.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 17c7b3f7-33f0-480b-8823-02c703d3db65
📒 Files selected for processing (10)
plugins/out_es/es.cplugins/out_opensearch/opensearch.ctests/integration/scenarios/out_es/config/out_es_id_key_ndjson.yamltests/integration/scenarios/out_es/config/out_es_id_key_update_ndjson.yamltests/integration/scenarios/out_es/config/out_es_logstash_prefix_key_ndjson.yamltests/integration/scenarios/out_es/config/out_opensearch_id_key_ndjson.yamltests/integration/scenarios/out_es/config/out_opensearch_id_key_update_ndjson.yamltests/integration/scenarios/out_es/config/out_opensearch_index_record_accessor_ndjson.yamltests/integration/scenarios/out_es/config/out_opensearch_logstash_prefix_key_ndjson.yamltests/integration/scenarios/out_es/tests/test_out_es_ndjson_action_line_001.py
✅ Files skipped from review due to trivial changes (1)
- tests/integration/scenarios/out_es/config/out_es_id_key_ndjson.yaml
🚧 Files skipped from review as they are similar to previous changes (4)
- tests/integration/scenarios/out_es/config/out_opensearch_id_key_ndjson.yaml
- tests/integration/scenarios/out_es/config/out_opensearch_index_record_accessor_ndjson.yaml
- tests/integration/scenarios/out_es/config/out_opensearch_logstash_prefix_key_ndjson.yaml
- tests/integration/scenarios/out_es/config/out_es_logstash_prefix_key_ndjson.yaml
| else if (id_key_required == FLB_TRUE) { | ||
| flb_plg_warn(ctx->ins, | ||
| "skipping record with missing or unsafe Id_Key value"); | ||
| if (id_key_str) { | ||
| flb_sds_destroy(id_key_str); | ||
| id_key_str = NULL; | ||
| } | ||
| msgpack_sbuffer_destroy(&tmp_sbuf); | ||
| continue; | ||
| } | ||
| else if (id_key_str) { | ||
| if (ctx->suppress_type_name) { | ||
| index_len = flb_sds_snprintf(&j_index, | ||
| flb_sds_alloc(j_index), | ||
| ES_BULK_INDEX_FMT_WITHOUT_TYPE, | ||
| ctx->es_action, | ||
| es_index); | ||
| } | ||
| else { | ||
| index_len = flb_sds_snprintf(&j_index, | ||
| flb_sds_alloc(j_index), | ||
| ES_BULK_INDEX_FMT, | ||
| ctx->es_action, | ||
| es_index, ctx->type); | ||
| } | ||
| } |
There was a problem hiding this comment.
Preserve generated _id when rejecting an unsafe optional Id_Key.
If Generate_ID is enabled, lines 551-570 already compose a safe generated _id. This unsafe Id_Key fallback then rewrites the header without _id, which can break update/upsert requests and removes generated-ID deduplication for index/create operations.
🐛 Proposed fix
- else if (id_key_str) {
+ else if (id_key_str && ctx->generate_id == FLB_FALSE) {
if (ctx->suppress_type_name) {
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| else if (id_key_required == FLB_TRUE) { | |
| flb_plg_warn(ctx->ins, | |
| "skipping record with missing or unsafe Id_Key value"); | |
| if (id_key_str) { | |
| flb_sds_destroy(id_key_str); | |
| id_key_str = NULL; | |
| } | |
| msgpack_sbuffer_destroy(&tmp_sbuf); | |
| continue; | |
| } | |
| else if (id_key_str) { | |
| if (ctx->suppress_type_name) { | |
| index_len = flb_sds_snprintf(&j_index, | |
| flb_sds_alloc(j_index), | |
| ES_BULK_INDEX_FMT_WITHOUT_TYPE, | |
| ctx->es_action, | |
| es_index); | |
| } | |
| else { | |
| index_len = flb_sds_snprintf(&j_index, | |
| flb_sds_alloc(j_index), | |
| ES_BULK_INDEX_FMT, | |
| ctx->es_action, | |
| es_index, ctx->type); | |
| } | |
| } | |
| else if (id_key_required == FLB_TRUE) { | |
| flb_plg_warn(ctx->ins, | |
| "skipping record with missing or unsafe Id_Key value"); | |
| if (id_key_str) { | |
| flb_sds_destroy(id_key_str); | |
| id_key_str = NULL; | |
| } | |
| msgpack_sbuffer_destroy(&tmp_sbuf); | |
| continue; | |
| } | |
| else if (id_key_str && ctx->generate_id == FLB_FALSE) { | |
| if (ctx->suppress_type_name) { | |
| index_len = flb_sds_snprintf(&j_index, | |
| flb_sds_alloc(j_index), | |
| ES_BULK_INDEX_FMT_WITHOUT_TYPE, | |
| ctx->es_action, | |
| es_index); | |
| } | |
| else { | |
| index_len = flb_sds_snprintf(&j_index, | |
| flb_sds_alloc(j_index), | |
| ES_BULK_INDEX_FMT, | |
| ctx->es_action, | |
| es_index, ctx->type); | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@plugins/out_es/es.c` around lines 598 - 623, The code currently discards any
previously generated `_id` (from `Generate_ID`) when rejecting an unsafe
optional `Id_Key` value in the id_key_required == FLB_TRUE condition block.
Instead of immediately continuing to skip the record, preserve the generated ID
by allowing the bulk header construction logic (similar to what occurs in the
else if (id_key_str) block with suppress_type_name) to execute using the
generated `_id` value. Modify the control flow so that when id_key_required is
true but Id_Key is unsafe/missing, the code still constructs the bulk index
header with the previously generated `_id` rather than abandoning it.
| else if (id_key_required == FLB_TRUE) { | ||
| flb_plg_warn(ctx->ins, | ||
| "skipping record with missing or unsafe Id_Key value"); | ||
| if (id_key_str) { | ||
| flb_sds_destroy(id_key_str); | ||
| id_key_str = NULL; | ||
| } | ||
| msgpack_sbuffer_destroy(&tmp_sbuf); | ||
| continue; | ||
| } | ||
| else if (id_key_str) { | ||
| if (ctx->suppress_type_name) { | ||
| index_len = flb_sds_snprintf(&j_index, | ||
| flb_sds_alloc(j_index), | ||
| OS_BULK_INDEX_FMT_NO_TYPE, | ||
| ctx->action, | ||
| index); | ||
| } | ||
| else { | ||
| index_len = flb_sds_snprintf(&j_index, | ||
| flb_sds_alloc(j_index), | ||
| OS_BULK_INDEX_FMT, | ||
| ctx->action, | ||
| index, ctx->type); | ||
| } | ||
| } |
There was a problem hiding this comment.
Preserve generated _id when rejecting an unsafe optional Id_Key.
If Generate_ID is enabled, lines 578-601 already compose a safe generated _id. This unsafe Id_Key fallback then rewrites the header without _id, which can break update/upsert requests and removes generated-ID deduplication for index/create operations.
🐛 Proposed fix
- else if (id_key_str) {
+ else if (id_key_str && ctx->generate_id == FLB_FALSE) {
if (ctx->suppress_type_name) {
index_len = flb_sds_snprintf(&j_index,
flb_sds_alloc(j_index),🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@plugins/out_opensearch/opensearch.c` around lines 629 - 654, When rejecting
an unsafe Id_Key value in the conditional block that checks id_key_required and
id_key_str, ensure that any previously generated safe _id from the Generate_ID
feature is preserved in the index header. Modify the else if (id_key_str) block
that formats the index header using OS_BULK_INDEX_FMT_NO_TYPE or
OS_BULK_INDEX_FMT to check whether a generated _id already exists, and if so,
avoid overwriting the header in a way that removes the _id field. This prevents
breaking update/upsert requests and maintains deduplication for index/create
operations that rely on the generated ID.
| def test_record_accessor_values_do_not_forge_bulk_action_lines(config_file): | ||
| service = Service(config_file) | ||
|
|
||
| try: | ||
| service.start() | ||
| requests_seen = service.wait_for_requests(1) | ||
| finally: | ||
| service.stop() | ||
|
|
||
| bulk_body = requests_seen[0]["body"] | ||
| assert requests_seen[0]["path"].startswith("/_bulk") | ||
| _assert_no_forged_delete(bulk_body) |
There was a problem hiding this comment.
Validate forged-delete absence across all captured bulk requests, not only the first one.
Line 173 only checks requests_seen[0], so a forged action in a later request would be missed. Aggregate and assert across all captured request bodies.
Suggested fix
def test_record_accessor_values_do_not_forge_bulk_action_lines(config_file):
@@
- bulk_body = requests_seen[0]["body"]
- assert requests_seen[0]["path"].startswith("/_bulk")
- _assert_no_forged_delete(bulk_body)
+ assert all(request["path"].startswith("/_bulk") for request in requests_seen)
+ actions = _bulk_actions(requests_seen)
+ deletes = [
+ action["delete"]
+ for action in actions
+ if "delete" in action and action["delete"].get("_id") == FORGED_DELETE_ID
+ ]
+ assert deletes == []🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@tests/integration/scenarios/out_es/tests/test_out_es_ndjson_action_line_001.py`
around lines 164 - 175, The test function
test_record_accessor_values_do_not_forge_bulk_action_lines currently only
validates the first request in requests_seen by checking requests_seen[0], but
it should validate all captured requests. Modify the assertion logic after
service.stop() to iterate through all items in the requests_seen list, verify
each request's path starts with "/_bulk", and call _assert_no_forged_delete on
each request body. This ensures that forged delete actions are detected in any
of the captured requests, not just the first one.
Summary
_bulkaction lines.out_esLogstash prefix and ID key paths, plusout_opensearchLogstash prefix, record-accessor index, and ID key paths._bulkrequests and verifies attacker-controlled record fields cannot forge adeleteaction line.Root Cause
out_esandout_opensearchformatted record-accessor output directly into NDJSON action-line JSON string slots using%s. Values containing quote/newline bytes could break out of_indexor_idand create additional bulk actions in the request body.Validation
cmake -DFLB_DEV=on ../frombuild/: passedmake -j 8: passedtests/integration/.venv/bin/python -m pytest tests/integration/scenarios/out_es/tests/test_out_es_ndjson_action_line_001.py -q: passed, 5 passedVALGRIND=1 VALGRIND_STRICT=1 tests/integration/.venv/bin/python -m pytest tests/integration/scenarios/out_es/tests/test_out_es_ndjson_action_line_001.py -q: passed, 5 passedtests/integration/.venv/bin/python .github/scripts/commit_prefix_check.py: passedGITHUB_EVENT_NAME=pull_request GITHUB_BASE_REF=master tests/integration/.venv/bin/python .github/scripts/commit_prefix_check.py: passedSummary by CodeRabbit
Bug Fixes & Tests
Bug Fixes
id_key-driven update/upsert, unsafe/missing IDs can cause the affected record to be skipped (with warnings) or omit_idas appropriate.Tests
id_key,logstash_prefix_key, and bulk action line behavior across multiple requests.