diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index 1cfc6301ff8..db71930e990 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -38,6 +38,20 @@ #define HEALTH_CHECK_PERIOD 60 #define FLB_CONFIG_DEFAULT_TAG "fluent_bit" +#define FLB_CONFIG_DEFAULT_TASK_MAP_SIZE 2048 +#define FLB_CONFIG_DEFAULT_TASK_MAP_SIZE_LIMIT 16384 +#define FLB_CONFIG_DEFAULT_TASK_MAP_SIZE_GROWTH_SiZE 256 + +/* The reason behind FLB_CONFIG_DEFAULT_TASK_MAP_SIZE_LIMIT being set to 16384 + * is that this is largest unsigned number expressable with 14 bits which is + * a limit imposed by the messaging mechanism used. + * + * As for FLB_CONFIG_DEFAULT_TASK_MAP_SIZE, 2048 was chosen to retain its + * original value and FLB_CONFIG_DEFAULT_TASK_MAP_SIZE_GROWTH_SiZE is set + * to a multiple of 8 because entries in the task map are just task + * pointers. + */ + /* Main struct to hold the configuration of the runtime service */ struct flb_config { struct mk_event ch_event; @@ -288,7 +302,8 @@ struct flb_config { unsigned int sched_cap; unsigned int sched_base; - struct flb_task_map tasks_map[2048]; + struct flb_task_map *task_map; + size_t task_map_size; int dry_run; }; @@ -302,6 +317,8 @@ int flb_config_set_property(struct flb_config *config, const char *k, const char *v); int flb_config_set_program_name(struct flb_config *config, char *name); int flb_config_load_config_format(struct flb_config *config, struct flb_cf *cf); +int flb_config_task_map_resize(struct flb_config *config, size_t new_size); +int flb_config_task_map_grow(struct flb_config *config); int set_log_level_from_env(struct flb_config *config); #ifdef FLB_HAVE_STATIC_CONF diff --git a/include/fluent-bit/flb_task.h b/include/fluent-bit/flb_task.h index 719d245ec44..79bda81a207 100644 --- a/include/fluent-bit/flb_task.h +++ b/include/fluent-bit/flb_task.h @@ -127,6 +127,8 @@ int flb_task_running_count(struct flb_config *config); int flb_task_running_print(struct flb_config *config); int flb_task_map_get_task_id(struct flb_config *config); +struct flb_task *task_alloc(struct flb_config *config); + struct flb_task *flb_task_create(uint64_t ref_id, const char *buf, size_t size, diff --git a/src/flb_config.c b/src/flb_config.c index 32dc34b7e83..d1e7541aac3 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -333,7 +333,14 @@ struct flb_config *flb_config_init() mk_list_init(&config->cmetrics); mk_list_init(&config->cf_parsers_list); - memset(&config->tasks_map, '\0', sizeof(config->tasks_map)); + /* Task map */ + ret = flb_config_task_map_resize(config, FLB_CONFIG_DEFAULT_TASK_MAP_SIZE); + + if (ret != 0) { + flb_error("[config] task map resize failed"); + flb_config_exit(config); + return NULL; + } /* Initialize multiline-parser list. We need this here, because from now * on we use flb_config_exit to cleanup the config, which requires @@ -547,6 +554,9 @@ void flb_config_exit(struct flb_config *config) flb_cf_destroy(cf); } + /* release task map */ + flb_config_task_map_resize(config, 0); + flb_free(config); } @@ -970,3 +980,57 @@ int flb_config_load_config_format(struct flb_config *config, struct flb_cf *cf) return 0; } + +int flb_config_task_map_resize(struct flb_config *config, size_t new_size) +{ + struct flb_task_map *new_task_map; + + if (new_size == config->task_map_size) { + return 0; + } + + if (new_size == 0) { + if (config->task_map != NULL) { + flb_free(config->task_map); + + config->task_map = NULL; + config->task_map_size = 0; + } + + return 0; + } + + if (config->task_map == NULL) { + new_task_map = flb_calloc(new_size, sizeof(struct flb_task_map)); + } + else { + new_task_map = flb_realloc(config->task_map, new_size * sizeof(struct flb_task_map)); + } + + if (new_task_map == NULL) { + flb_errno(); + + return -1; + } + + if (new_size > config->task_map_size) { + memset(&new_task_map[config->task_map_size], + 0, + (new_size - config->task_map_size) * sizeof(struct flb_task_map)); + } + + config->task_map = new_task_map; + config->task_map_size = new_size; + + return 0; +} + +int flb_config_task_map_grow(struct flb_config *config) +{ + if (config->task_map_size >= FLB_CONFIG_DEFAULT_TASK_MAP_SIZE_LIMIT) { + return -1; + } + + return flb_config_task_map_resize(config, + config->task_map_size + FLB_CONFIG_DEFAULT_TASK_MAP_SIZE_GROWTH_SiZE); +} diff --git a/src/flb_engine.c b/src/flb_engine.c index bedc28477c5..9f27de159ba 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -273,7 +273,7 @@ static inline int handle_output_event(uint64_t ts, task_id, out_id, trace_st); #endif - task = config->tasks_map[task_id].task; + task = config->task_map[task_id].task; ins = flb_output_get_instance(config, out_id); if (flb_output_is_threaded(ins) == FLB_FALSE) { flb_output_flush_finished(config, out_id); diff --git a/src/flb_task.c b/src/flb_task.c index e29bb6cdcfd..c7ffa96cc8f 100644 --- a/src/flb_task.c +++ b/src/flb_task.c @@ -33,7 +33,7 @@ /* * Every task created must have an unique ID, this function lookup the - * lowest number available in the tasks_map. + * lowest number available in the task_map. * * This 'id' is used by the task interface to communicate with the engine event * loop about some action. @@ -41,28 +41,34 @@ static inline int map_get_task_id(struct flb_config *config) { + int result; int i; - int map_size = (sizeof(config->tasks_map) / sizeof(struct flb_task_map)); - for (i = 0; i < map_size; i++) { - if (config->tasks_map[i].task == NULL) { + for (i = 0; i < config->task_map_size ; i++) { + if (config->task_map[i].task == NULL) { return i; } } + result = flb_config_task_map_grow(config); + + if (result == 0) { + return i; + } + return -1; } static inline void map_set_task_id(int id, struct flb_task *task, struct flb_config *config) { - config->tasks_map[id].task = task; + config->task_map[id].task = task; } static inline void map_free_task_id(int id, struct flb_config *config) { - config->tasks_map[id].task = NULL; + config->task_map[id].task = NULL; } void flb_task_retry_destroy(struct flb_task_retry *retry) @@ -232,7 +238,7 @@ int flb_task_retry_clean(struct flb_task *task, struct flb_output_instance *ins) } /* Allocate an initialize a basic Task structure */ -static struct flb_task *task_alloc(struct flb_config *config) +struct flb_task *task_alloc(struct flb_config *config) { int task_id; struct flb_task *task; @@ -250,6 +256,7 @@ static struct flb_task *task_alloc(struct flb_config *config) flb_free(task); return NULL; } + map_set_task_id(task_id, task, config); flb_trace("[task %p] created (id=%i)", task, task_id); @@ -484,22 +491,30 @@ void flb_task_destroy(struct flb_task *task, int del) } /* Unlink and release task */ - mk_list_del(&task->_head); + if (!mk_list_entry_is_orphan(&task->_head)) { + mk_list_del(&task->_head); + } /* destroy chunk */ - flb_input_chunk_destroy(task->ic, del); + if (task->ic != NULL) { + flb_input_chunk_destroy(task->ic, del); + } /* Remove 'retries' */ mk_list_foreach_safe(head, tmp, &task->retries) { retry = mk_list_entry(head, struct flb_task_retry, _head); + flb_task_retry_destroy(retry); } - flb_input_chunk_set_limits(task->i_ins); + if (task->i_ins != NULL) { + flb_input_chunk_set_limits(task->i_ins); + } - if (task->event_chunk) { + if (task->event_chunk != NULL) { flb_event_chunk_destroy(task->event_chunk); } + flb_free(task); } diff --git a/tests/internal/CMakeLists.txt b/tests/internal/CMakeLists.txt index 1a77c6b93b8..c379b62966c 100644 --- a/tests/internal/CMakeLists.txt +++ b/tests/internal/CMakeLists.txt @@ -45,7 +45,8 @@ set(UNIT_TESTS_FILES uri.c msgpack_append_message.c conditionals.c - endianness + endianness.c + task_map.c ) # Config format diff --git a/tests/internal/task_map.c b/tests/internal/task_map.c new file mode 100644 index 00000000000..f1ba353128a --- /dev/null +++ b/tests/internal/task_map.c @@ -0,0 +1,94 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include +#include +#include +#include +#include + +#include "flb_tests_internal.h" + +#define TASK_COUNT_LIMIT (FLB_CONFIG_DEFAULT_TASK_MAP_SIZE_LIMIT + 1) + +struct test_ctx { + struct flb_config *config; +}; + +struct test_ctx* test_ctx_create() +{ + struct test_ctx *ret_ctx = NULL; + + ret_ctx = flb_calloc(1, sizeof(struct test_ctx)); + if (!TEST_CHECK(ret_ctx != NULL)) { + flb_errno(); + TEST_MSG("flb_malloc(test_ctx) failed"); + return NULL; + } + + ret_ctx->config = flb_config_init(); + if(!TEST_CHECK(ret_ctx->config != NULL)) { + TEST_MSG("flb_config_init failed"); + flb_free(ret_ctx); + return NULL; + } + + return ret_ctx; +} + +int test_ctx_destroy(struct test_ctx* ctx) +{ + if (!TEST_CHECK(ctx != NULL)) { + return -1; + } + + if (ctx->config) { + flb_config_exit(ctx->config); + } + + flb_free(ctx); + return 0; +} + +void test_task_map_limit() +{ + struct test_ctx *ctx; + ssize_t index; + struct flb_task *tasks[TASK_COUNT_LIMIT]; + int failure_detected; + + ctx = test_ctx_create(); + + if (!TEST_CHECK(ctx != NULL)) { + return; + } + + failure_detected = FLB_FALSE; + + for (index = 0 ; index < TASK_COUNT_LIMIT ; index++) { + tasks[index] = task_alloc(ctx->config); + + if (tasks[index] == NULL) { + failure_detected = FLB_TRUE; + + break; + } + } + + if (TEST_CHECK(failure_detected == FLB_TRUE)) { + TEST_CHECK(index == FLB_CONFIG_DEFAULT_TASK_MAP_SIZE_LIMIT); + } + + while (index >= 0) { + if (tasks[index] != NULL) { + flb_task_destroy(tasks[index], FLB_TRUE); + } + index--; + } + + test_ctx_destroy(ctx); +} + +TEST_LIST = { + { "task_map_limit" , test_task_map_limit}, + { 0 } +};