From 56dd0db8443921e4b7942d4a7d4e2948900ff5e8 Mon Sep 17 00:00:00 2001 From: Phillip Adair Stewart Whelan Date: Wed, 5 Mar 2025 13:36:16 -0800 Subject: [PATCH 1/7] ring_buffer: add functions flb_ring_buffer_peek and flb_ring_buffer_seek. Signed-off-by: Phillip Adair Stewart Whelan --- include/fluent-bit/flb_ring_buffer.h | 2 ++ src/flb_ring_buffer.c | 22 ++++++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/include/fluent-bit/flb_ring_buffer.h b/include/fluent-bit/flb_ring_buffer.h index bcb5a3e5557..26965de0386 100644 --- a/include/fluent-bit/flb_ring_buffer.h +++ b/include/fluent-bit/flb_ring_buffer.h @@ -40,5 +40,7 @@ int flb_ring_buffer_add_event_loop(struct flb_ring_buffer *rb, void *evl, uint8_ int flb_ring_buffer_write(struct flb_ring_buffer *rb, void *ptr, size_t size); int flb_ring_buffer_read(struct flb_ring_buffer *rb, void *ptr, size_t size); +int flb_ring_buffer_peek(struct flb_ring_buffer *rb, int skip_count, void *ptr, size_t size); +int flb_ring_buffer_skip(struct flb_ring_buffer *rb, size_t size); #endif diff --git a/src/flb_ring_buffer.c b/src/flb_ring_buffer.c index 66e5fa47564..fcd99e29522 100644 --- a/src/flb_ring_buffer.c +++ b/src/flb_ring_buffer.c @@ -202,4 +202,26 @@ int flb_ring_buffer_read(struct flb_ring_buffer *rb, void *ptr, size_t size) return 0; } +int flb_ring_buffer_peek(struct flb_ring_buffer *rb, int skip_count, void *ptr, size_t size) +{ + size_t ret; + + ret = lwrb_peek(rb->ctx, skip_count, ptr, size); + if (ret == 0) { + return -1; + } + + return 0; +} +int flb_ring_buffer_skip(struct flb_ring_buffer *rb, size_t size) +{ + size_t ret; + + ret = lwrb_skip(rb->ctx, size); + if (ret == 0) { + return -1; + } + + return 0; +} From f423eb44faee32e860724280f1ca90ba6d3e012d Mon Sep 17 00:00:00 2001 From: Phillip Adair Stewart Whelan Date: Tue, 18 Mar 2025 10:35:52 -0300 Subject: [PATCH 2/7] filter_rewrite: add configuration to enable ring_buffer on emitter. Signed-off-by: Phillip Adair Stewart Whelan --- plugins/filter_rewrite_tag/rewrite_tag.c | 23 +++++++++++++++++++++++ plugins/filter_rewrite_tag/rewrite_tag.h | 2 +- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/plugins/filter_rewrite_tag/rewrite_tag.c b/plugins/filter_rewrite_tag/rewrite_tag.c index c8bfe029350..ae99a30ef13 100644 --- a/plugins/filter_rewrite_tag/rewrite_tag.c +++ b/plugins/filter_rewrite_tag/rewrite_tag.c @@ -39,6 +39,7 @@ static int emitter_create(struct flb_rewrite_tag *ctx) { int ret; struct flb_input_instance *ins; + char ring_buffer_size[8]; ret = flb_input_name_exists(ctx->emitter_name, ctx->config); if (ret == FLB_TRUE) { @@ -73,6 +74,23 @@ static int emitter_create(struct flb_rewrite_tag *ctx) flb_plg_error(ctx->ins, "cannot set storage.type"); } + /* Set ring_buffer_size */ + if (ctx->emitter_ring_buffer_size > 0) { + ret = snprintf(ring_buffer_size, sizeof(ring_buffer_size)-1, "%zd", + ctx->emitter_ring_buffer_size); + if (ret > sizeof(ring_buffer_size)-1) { + flb_plg_error(ctx->ins, "ring_buffer_size exceeds maximum size"); + flb_input_instance_exit(ins, ctx->config); + flb_input_instance_destroy(ins); + return -1; + } + ring_buffer_size[ret] = '\0'; + ret = flb_input_set_property(ins, "ring_buffer_size", ring_buffer_size); + if (ret == -1) { + flb_plg_error(ins, "cannot set ring buffer size"); + } + } + /* Initialize emitter plugin */ ret = flb_input_instance_init(ins, ctx->config); if (ret == -1) { @@ -602,6 +620,11 @@ static struct flb_config_map config_map[] = { FLB_FALSE, FLB_TRUE, offsetof(struct flb_rewrite_tag, emitter_storage_type), NULL }, + { + FLB_CONFIG_MAP_SIZE, "emitter_ring_buffer_size", "0", + FLB_FALSE, FLB_TRUE, offsetof(struct flb_rewrite_tag, emitter_ring_buffer_size), + "set the emitter ring buffer size, must be set to > 0 to enable it" + }, { FLB_CONFIG_MAP_SIZE, "emitter_mem_buf_limit", FLB_RTAG_MEM_BUF_LIMIT_DEFAULT, FLB_FALSE, FLB_TRUE, offsetof(struct flb_rewrite_tag, emitter_mem_buf_limit), diff --git a/plugins/filter_rewrite_tag/rewrite_tag.h b/plugins/filter_rewrite_tag/rewrite_tag.h index d73b49f12eb..1de6c77c9fc 100644 --- a/plugins/filter_rewrite_tag/rewrite_tag.h +++ b/plugins/filter_rewrite_tag/rewrite_tag.h @@ -43,12 +43,12 @@ struct flb_rewrite_tag { flb_sds_t emitter_name; /* emitter input plugin name */ flb_sds_t emitter_storage_type; /* emitter storage type */ size_t emitter_mem_buf_limit; /* Emitter buffer limit */ + size_t emitter_ring_buffer_size; /* ring buffer size of the emitter */ struct mk_list rules; /* processed rules */ struct mk_list *cm_rules; /* config_map rules (only strings) */ struct flb_input_instance *ins_emitter; /* emitter input plugin instance */ struct flb_filter_instance *ins; /* self-filter instance */ struct flb_config *config; /* Fluent Bit context */ - #ifdef FLB_HAVE_METRICS struct cmt_counter *cmt_emitted; #endif From 6a36d9330743e4075b93bde2e1e92d957787459b Mon Sep 17 00:00:00 2001 From: Phillip Adair Stewart Whelan Date: Tue, 18 Mar 2025 10:37:31 -0300 Subject: [PATCH 3/7] tests: filter_rewrite_tag: add test for emitter ring buffer. Signed-off-by: Phillip Adair Stewart Whelan --- tests/runtime/filter_rewrite_tag.c | 56 ++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/tests/runtime/filter_rewrite_tag.c b/tests/runtime/filter_rewrite_tag.c index 08cf37511a8..701113d4533 100644 --- a/tests/runtime/filter_rewrite_tag.c +++ b/tests/runtime/filter_rewrite_tag.c @@ -180,6 +180,61 @@ static void flb_test_matched() filter_test_destroy(ctx); } +/* + * Original tag: rewrite + * Rewritten tag: updated + */ +static void flb_test_ring_buffer() +{ + struct flb_lib_out_cb cb_data; + struct filter_test *ctx; + int ret; + int not_used = 0; + int bytes; + int got; + char *p = "[0, {\"key\":\"rewrite\"}]"; + + /* Prepare output callback with expected result */ + cb_data.cb = cb_count_msgpack; + cb_data.data = ¬_used; + + /* Create test context */ + ctx = filter_test_create((void *) &cb_data); + if (!ctx) { + exit(EXIT_FAILURE); + } + clear_output_num(); + /* Configure filter */ + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "emitter_ring_buffer_size", "128", + "Rule", "$key ^(rewrite)$ updated false", + NULL); + TEST_CHECK(ret == 0); + + /* Configure output */ + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "Match", "updated", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* ingest record */ + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + + flb_time_msleep(1500); /* waiting flush */ + got = get_output_num(); + + if (!TEST_CHECK(got != 0)) { + TEST_MSG("expect: %d got: %d", 1, got); + } + + filter_test_destroy(ctx); +} + /* * Original tag: rewrite * Rewritten tag: updated @@ -556,6 +611,7 @@ TEST_LIST = { {"matched", flb_test_matched}, {"not_matched", flb_test_not_matched}, {"keep_true", flb_test_keep_true}, + {"ring_buffer", flb_test_ring_buffer}, {"heavy_input_pause_emitter", flb_test_heavy_input_pause_emitter}, {"issue_4518", flb_test_issue_4518}, {"issue_4793", flb_test_issue_4793}, From 6e9ab508ab270d34ce54a9e7e468e27e0025ddb4 Mon Sep 17 00:00:00 2001 From: Phillip Adair Stewart Whelan Date: Tue, 18 Mar 2025 10:38:08 -0300 Subject: [PATCH 4/7] tests: internal: add peek/seek test for ring_buffer. Signed-off-by: Phillip Adair Stewart Whelan --- tests/internal/ring_buffer.c | 50 ++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/tests/internal/ring_buffer.c b/tests/internal/ring_buffer.c index 0c0e20741e4..f854de5506d 100644 --- a/tests/internal/ring_buffer.c +++ b/tests/internal/ring_buffer.c @@ -161,6 +161,56 @@ static void test_smart_flush() flb_bucket_queue_destroy(bktq); mk_event_loop_destroy(evl); } + +void test_peek_seek() +{ + int i; + int ret; + int elements; + struct check *c; + struct check *tmp; + struct flb_ring_buffer *rb; + + elements = sizeof(checks) / sizeof(struct check); + + rb = flb_ring_buffer_create(sizeof(struct check *) * elements); + TEST_CHECK(rb != NULL); + if (!rb) { + exit(EXIT_FAILURE); + } + + for (i = 0; i < elements; i++) { + c = &checks[i]; + ret = flb_ring_buffer_write(rb, (void *) &c, sizeof(c)); + TEST_CHECK(ret == 0); + } + + /* try to write another record, it must fail */ + tmp = c; + ret = flb_ring_buffer_write(rb, (void *) &tmp, sizeof(tmp)); + TEST_CHECK(ret == -1); + + c = NULL; + + /* consume one entry */ + ret = flb_ring_buffer_peek(rb, (void *) &c, sizeof(c)); + TEST_CHECK(ret == 0); + + /* the consumed entry must be equal to the first one */ + c = &checks[0]; + TEST_CHECK(strcmp(c->buf_a, "a1") == 0 && strcmp(c->buf_b, "a2") ==0); + + /* consume one entry */ + ret = flb_ring_buffer_peek(rb, (void *) &c, sizeof(c)); + TEST_CHECK(ret == 0); + + /* the consumed entry must be equal to the first one */ + c = &checks[0]; + TEST_CHECK(strcmp(c->buf_a, "a1") == 0 && strcmp(c->buf_b, "a2") ==0); + + flb_ring_buffer_destroy(rb); +} + TEST_LIST = { { "basic", test_basic}, { "smart_flush", test_smart_flush}, From ac690405c7dde57b325985f08d822831729744ee Mon Sep 17 00:00:00 2001 From: Phillip Adair Stewart Whelan Date: Tue, 18 Mar 2025 10:38:53 -0300 Subject: [PATCH 5/7] in_emitter: use peek/seek to leave faulty bufferes queued. Signed-off-by: Phillip Adair Stewart Whelan --- plugins/in_emitter/emitter.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/plugins/in_emitter/emitter.c b/plugins/in_emitter/emitter.c index 064415e807a..52edef54896 100644 --- a/plugins/in_emitter/emitter.c +++ b/plugins/in_emitter/emitter.c @@ -228,12 +228,16 @@ static int in_emitter_ingest_ring_buffer(struct flb_input_instance *in, (void) in; - while ((ret = flb_ring_buffer_read(ctx->msgs, (void *)&ec, + while ((ret = flb_ring_buffer_peek(ctx->msgs, 0, (void *)&ec, sizeof(struct em_chunk))) == 0) { ret = flb_input_log_append(in, ec.tag, flb_sds_len(ec.tag), ec.mp_sbuf.data, ec.mp_sbuf.size); + if (ret < 0) { + return ret; + } + flb_ring_buffer_skip(ctx->msgs, sizeof(struct em_chunk)); flb_sds_destroy(ec.tag); msgpack_sbuffer_destroy(&ec.mp_sbuf); } From 47affd8dcd5bb6605d1122ca749c66fe82efb505 Mon Sep 17 00:00:00 2001 From: Phillip Adair Stewart Whelan Date: Tue, 18 Mar 2025 11:00:14 -0300 Subject: [PATCH 6/7] test: internal: ring_buffer: update calls to flb_ring_buffer_peek to add missing skip_count argument. Signed-off-by: Phillip Adair Stewart Whelan --- tests/internal/ring_buffer.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/internal/ring_buffer.c b/tests/internal/ring_buffer.c index f854de5506d..4824ad6d0ff 100644 --- a/tests/internal/ring_buffer.c +++ b/tests/internal/ring_buffer.c @@ -193,7 +193,7 @@ void test_peek_seek() c = NULL; /* consume one entry */ - ret = flb_ring_buffer_peek(rb, (void *) &c, sizeof(c)); + ret = flb_ring_buffer_peek(rb, 0, (void *) &c, sizeof(c)); TEST_CHECK(ret == 0); /* the consumed entry must be equal to the first one */ @@ -201,7 +201,7 @@ void test_peek_seek() TEST_CHECK(strcmp(c->buf_a, "a1") == 0 && strcmp(c->buf_b, "a2") ==0); /* consume one entry */ - ret = flb_ring_buffer_peek(rb, (void *) &c, sizeof(c)); + ret = flb_ring_buffer_peek(rb, 0, (void *) &c, sizeof(c)); TEST_CHECK(ret == 0); /* the consumed entry must be equal to the first one */ From 01deb22866160c95493c1139bf05fc1070a0ad28 Mon Sep 17 00:00:00 2001 From: Phillip Adair Stewart Whelan Date: Tue, 18 Mar 2025 11:30:15 -0300 Subject: [PATCH 7/7] in_emitter: increase size of ring_buffer_size to accomodate larger numbers and skip the size check. Signed-off-by: Phillip Adair Stewart Whelan --- plugins/filter_rewrite_tag/rewrite_tag.c | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/plugins/filter_rewrite_tag/rewrite_tag.c b/plugins/filter_rewrite_tag/rewrite_tag.c index ae99a30ef13..83a91ba3ce1 100644 --- a/plugins/filter_rewrite_tag/rewrite_tag.c +++ b/plugins/filter_rewrite_tag/rewrite_tag.c @@ -39,7 +39,7 @@ static int emitter_create(struct flb_rewrite_tag *ctx) { int ret; struct flb_input_instance *ins; - char ring_buffer_size[8]; + char ring_buffer_size[42]; ret = flb_input_name_exists(ctx->emitter_name, ctx->config); if (ret == FLB_TRUE) { @@ -76,15 +76,8 @@ static int emitter_create(struct flb_rewrite_tag *ctx) /* Set ring_buffer_size */ if (ctx->emitter_ring_buffer_size > 0) { - ret = snprintf(ring_buffer_size, sizeof(ring_buffer_size)-1, "%zd", - ctx->emitter_ring_buffer_size); - if (ret > sizeof(ring_buffer_size)-1) { - flb_plg_error(ctx->ins, "ring_buffer_size exceeds maximum size"); - flb_input_instance_exit(ins, ctx->config); - flb_input_instance_destroy(ins); - return -1; - } - ring_buffer_size[ret] = '\0'; + snprintf(ring_buffer_size, sizeof(ring_buffer_size)-1, "%zd", + ctx->emitter_ring_buffer_size); ret = flb_input_set_property(ins, "ring_buffer_size", ring_buffer_size); if (ret == -1) { flb_plg_error(ins, "cannot set ring buffer size");