diff --git a/include/fluent-bit/flb_ring_buffer.h b/include/fluent-bit/flb_ring_buffer.h index bcb5a3e5557..26965de0386 100644 --- a/include/fluent-bit/flb_ring_buffer.h +++ b/include/fluent-bit/flb_ring_buffer.h @@ -40,5 +40,7 @@ int flb_ring_buffer_add_event_loop(struct flb_ring_buffer *rb, void *evl, uint8_ int flb_ring_buffer_write(struct flb_ring_buffer *rb, void *ptr, size_t size); int flb_ring_buffer_read(struct flb_ring_buffer *rb, void *ptr, size_t size); +int flb_ring_buffer_peek(struct flb_ring_buffer *rb, int skip_count, void *ptr, size_t size); +int flb_ring_buffer_skip(struct flb_ring_buffer *rb, size_t size); #endif diff --git a/plugins/filter_rewrite_tag/rewrite_tag.c b/plugins/filter_rewrite_tag/rewrite_tag.c index c8bfe029350..83a91ba3ce1 100644 --- a/plugins/filter_rewrite_tag/rewrite_tag.c +++ b/plugins/filter_rewrite_tag/rewrite_tag.c @@ -39,6 +39,7 @@ static int emitter_create(struct flb_rewrite_tag *ctx) { int ret; struct flb_input_instance *ins; + char ring_buffer_size[42]; ret = flb_input_name_exists(ctx->emitter_name, ctx->config); if (ret == FLB_TRUE) { @@ -73,6 +74,16 @@ static int emitter_create(struct flb_rewrite_tag *ctx) flb_plg_error(ctx->ins, "cannot set storage.type"); } + /* Set ring_buffer_size */ + if (ctx->emitter_ring_buffer_size > 0) { + snprintf(ring_buffer_size, sizeof(ring_buffer_size)-1, "%zd", + ctx->emitter_ring_buffer_size); + ret = flb_input_set_property(ins, "ring_buffer_size", ring_buffer_size); + if (ret == -1) { + flb_plg_error(ins, "cannot set ring buffer size"); + } + } + /* Initialize emitter plugin */ ret = flb_input_instance_init(ins, ctx->config); if (ret == -1) { @@ -602,6 +613,11 @@ static struct flb_config_map config_map[] = { FLB_FALSE, FLB_TRUE, offsetof(struct flb_rewrite_tag, emitter_storage_type), NULL }, + { + FLB_CONFIG_MAP_SIZE, "emitter_ring_buffer_size", "0", + FLB_FALSE, FLB_TRUE, offsetof(struct flb_rewrite_tag, emitter_ring_buffer_size), + "set the emitter ring buffer size, must be set to > 0 to enable it" + }, { FLB_CONFIG_MAP_SIZE, "emitter_mem_buf_limit", FLB_RTAG_MEM_BUF_LIMIT_DEFAULT, FLB_FALSE, FLB_TRUE, offsetof(struct flb_rewrite_tag, emitter_mem_buf_limit), diff --git a/plugins/filter_rewrite_tag/rewrite_tag.h b/plugins/filter_rewrite_tag/rewrite_tag.h index d73b49f12eb..1de6c77c9fc 100644 --- a/plugins/filter_rewrite_tag/rewrite_tag.h +++ b/plugins/filter_rewrite_tag/rewrite_tag.h @@ -43,12 +43,12 @@ struct flb_rewrite_tag { flb_sds_t emitter_name; /* emitter input plugin name */ flb_sds_t emitter_storage_type; /* emitter storage type */ size_t emitter_mem_buf_limit; /* Emitter buffer limit */ + size_t emitter_ring_buffer_size; /* ring buffer size of the emitter */ struct mk_list rules; /* processed rules */ struct mk_list *cm_rules; /* config_map rules (only strings) */ struct flb_input_instance *ins_emitter; /* emitter input plugin instance */ struct flb_filter_instance *ins; /* self-filter instance */ struct flb_config *config; /* Fluent Bit context */ - #ifdef FLB_HAVE_METRICS struct cmt_counter *cmt_emitted; #endif diff --git a/plugins/in_emitter/emitter.c b/plugins/in_emitter/emitter.c index 064415e807a..52edef54896 100644 --- a/plugins/in_emitter/emitter.c +++ b/plugins/in_emitter/emitter.c @@ -228,12 +228,16 @@ static int in_emitter_ingest_ring_buffer(struct flb_input_instance *in, (void) in; - while ((ret = flb_ring_buffer_read(ctx->msgs, (void *)&ec, + while ((ret = flb_ring_buffer_peek(ctx->msgs, 0, (void *)&ec, sizeof(struct em_chunk))) == 0) { ret = flb_input_log_append(in, ec.tag, flb_sds_len(ec.tag), ec.mp_sbuf.data, ec.mp_sbuf.size); + if (ret < 0) { + return ret; + } + flb_ring_buffer_skip(ctx->msgs, sizeof(struct em_chunk)); flb_sds_destroy(ec.tag); msgpack_sbuffer_destroy(&ec.mp_sbuf); } diff --git a/src/flb_ring_buffer.c b/src/flb_ring_buffer.c index 66e5fa47564..fcd99e29522 100644 --- a/src/flb_ring_buffer.c +++ b/src/flb_ring_buffer.c @@ -202,4 +202,26 @@ int flb_ring_buffer_read(struct flb_ring_buffer *rb, void *ptr, size_t size) return 0; } +int flb_ring_buffer_peek(struct flb_ring_buffer *rb, int skip_count, void *ptr, size_t size) +{ + size_t ret; + + ret = lwrb_peek(rb->ctx, skip_count, ptr, size); + if (ret == 0) { + return -1; + } + + return 0; +} +int flb_ring_buffer_skip(struct flb_ring_buffer *rb, size_t size) +{ + size_t ret; + + ret = lwrb_skip(rb->ctx, size); + if (ret == 0) { + return -1; + } + + return 0; +} diff --git a/tests/internal/ring_buffer.c b/tests/internal/ring_buffer.c index 0c0e20741e4..4824ad6d0ff 100644 --- a/tests/internal/ring_buffer.c +++ b/tests/internal/ring_buffer.c @@ -161,6 +161,56 @@ static void test_smart_flush() flb_bucket_queue_destroy(bktq); mk_event_loop_destroy(evl); } + +void test_peek_seek() +{ + int i; + int ret; + int elements; + struct check *c; + struct check *tmp; + struct flb_ring_buffer *rb; + + elements = sizeof(checks) / sizeof(struct check); + + rb = flb_ring_buffer_create(sizeof(struct check *) * elements); + TEST_CHECK(rb != NULL); + if (!rb) { + exit(EXIT_FAILURE); + } + + for (i = 0; i < elements; i++) { + c = &checks[i]; + ret = flb_ring_buffer_write(rb, (void *) &c, sizeof(c)); + TEST_CHECK(ret == 0); + } + + /* try to write another record, it must fail */ + tmp = c; + ret = flb_ring_buffer_write(rb, (void *) &tmp, sizeof(tmp)); + TEST_CHECK(ret == -1); + + c = NULL; + + /* consume one entry */ + ret = flb_ring_buffer_peek(rb, 0, (void *) &c, sizeof(c)); + TEST_CHECK(ret == 0); + + /* the consumed entry must be equal to the first one */ + c = &checks[0]; + TEST_CHECK(strcmp(c->buf_a, "a1") == 0 && strcmp(c->buf_b, "a2") ==0); + + /* consume one entry */ + ret = flb_ring_buffer_peek(rb, 0, (void *) &c, sizeof(c)); + TEST_CHECK(ret == 0); + + /* the consumed entry must be equal to the first one */ + c = &checks[0]; + TEST_CHECK(strcmp(c->buf_a, "a1") == 0 && strcmp(c->buf_b, "a2") ==0); + + flb_ring_buffer_destroy(rb); +} + TEST_LIST = { { "basic", test_basic}, { "smart_flush", test_smart_flush}, diff --git a/tests/runtime/filter_rewrite_tag.c b/tests/runtime/filter_rewrite_tag.c index 08cf37511a8..701113d4533 100644 --- a/tests/runtime/filter_rewrite_tag.c +++ b/tests/runtime/filter_rewrite_tag.c @@ -180,6 +180,61 @@ static void flb_test_matched() filter_test_destroy(ctx); } +/* + * Original tag: rewrite + * Rewritten tag: updated + */ +static void flb_test_ring_buffer() +{ + struct flb_lib_out_cb cb_data; + struct filter_test *ctx; + int ret; + int not_used = 0; + int bytes; + int got; + char *p = "[0, {\"key\":\"rewrite\"}]"; + + /* Prepare output callback with expected result */ + cb_data.cb = cb_count_msgpack; + cb_data.data = ¬_used; + + /* Create test context */ + ctx = filter_test_create((void *) &cb_data); + if (!ctx) { + exit(EXIT_FAILURE); + } + clear_output_num(); + /* Configure filter */ + ret = flb_filter_set(ctx->flb, ctx->f_ffd, + "emitter_ring_buffer_size", "128", + "Rule", "$key ^(rewrite)$ updated false", + NULL); + TEST_CHECK(ret == 0); + + /* Configure output */ + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "Match", "updated", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* ingest record */ + bytes = flb_lib_push(ctx->flb, ctx->i_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + + flb_time_msleep(1500); /* waiting flush */ + got = get_output_num(); + + if (!TEST_CHECK(got != 0)) { + TEST_MSG("expect: %d got: %d", 1, got); + } + + filter_test_destroy(ctx); +} + /* * Original tag: rewrite * Rewritten tag: updated @@ -556,6 +611,7 @@ TEST_LIST = { {"matched", flb_test_matched}, {"not_matched", flb_test_not_matched}, {"keep_true", flb_test_keep_true}, + {"ring_buffer", flb_test_ring_buffer}, {"heavy_input_pause_emitter", flb_test_heavy_input_pause_emitter}, {"issue_4518", flb_test_issue_4518}, {"issue_4793", flb_test_issue_4793},