Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

task_map: added task map scaling #10044

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,20 @@
#define HEALTH_CHECK_PERIOD 60
#define FLB_CONFIG_DEFAULT_TAG "fluent_bit"

#define FLB_CONFIG_DEFAULT_TASK_MAP_SIZE 2048
#define FLB_CONFIG_DEFAULT_TASK_MAP_SIZE_LIMIT 16384
#define FLB_CONFIG_DEFAULT_TASK_MAP_SIZE_GROWTH_SiZE 256

/* The reason behind FLB_CONFIG_DEFAULT_TASK_MAP_SIZE_LIMIT being set to 16384
* is that this is largest unsigned number expressable with 14 bits which is
* a limit imposed by the messaging mechanism used.
*
* As for FLB_CONFIG_DEFAULT_TASK_MAP_SIZE, 2048 was chosen to retain its
* original value and FLB_CONFIG_DEFAULT_TASK_MAP_SIZE_GROWTH_SiZE is set
* to a multiple of 8 because entries in the task map are just task
* pointers.
*/

/* Main struct to hold the configuration of the runtime service */
struct flb_config {
struct mk_event ch_event;
Expand Down Expand Up @@ -288,7 +302,8 @@ struct flb_config {
unsigned int sched_cap;
unsigned int sched_base;

struct flb_task_map tasks_map[2048];
struct flb_task_map *task_map;
size_t task_map_size;

int dry_run;
};
Expand All @@ -302,6 +317,8 @@ int flb_config_set_property(struct flb_config *config,
const char *k, const char *v);
int flb_config_set_program_name(struct flb_config *config, char *name);
int flb_config_load_config_format(struct flb_config *config, struct flb_cf *cf);
int flb_config_task_map_resize(struct flb_config *config, size_t new_size);
int flb_config_task_map_grow(struct flb_config *config);

int set_log_level_from_env(struct flb_config *config);
#ifdef FLB_HAVE_STATIC_CONF
Expand Down
2 changes: 2 additions & 0 deletions include/fluent-bit/flb_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ int flb_task_running_count(struct flb_config *config);
int flb_task_running_print(struct flb_config *config);
int flb_task_map_get_task_id(struct flb_config *config);

struct flb_task *task_alloc(struct flb_config *config);

struct flb_task *flb_task_create(uint64_t ref_id,
const char *buf,
size_t size,
Expand Down
66 changes: 65 additions & 1 deletion src/flb_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,14 @@ struct flb_config *flb_config_init()
mk_list_init(&config->cmetrics);
mk_list_init(&config->cf_parsers_list);

memset(&config->tasks_map, '\0', sizeof(config->tasks_map));
/* Task map */
ret = flb_config_task_map_resize(config, FLB_CONFIG_DEFAULT_TASK_MAP_SIZE);

if (ret != 0) {
flb_error("[config] task map resize failed");
flb_config_exit(config);
return NULL;
}

/* Initialize multiline-parser list. We need this here, because from now
* on we use flb_config_exit to cleanup the config, which requires
Expand Down Expand Up @@ -547,6 +554,9 @@ void flb_config_exit(struct flb_config *config)
flb_cf_destroy(cf);
}

/* release task map */
flb_config_task_map_resize(config, 0);

flb_free(config);
}

Expand Down Expand Up @@ -970,3 +980,57 @@ int flb_config_load_config_format(struct flb_config *config, struct flb_cf *cf)

return 0;
}

int flb_config_task_map_resize(struct flb_config *config, size_t new_size)
{
struct flb_task_map *new_task_map;

if (new_size == config->task_map_size) {
return 0;
}

if (new_size == 0) {
if (config->task_map != NULL) {
flb_free(config->task_map);

config->task_map = NULL;
config->task_map_size = 0;
}

return 0;
}

if (config->task_map == NULL) {
new_task_map = flb_calloc(new_size, sizeof(struct flb_task_map));
}
else {
new_task_map = flb_realloc(config->task_map, new_size * sizeof(struct flb_task_map));
}

if (new_task_map == NULL) {
flb_errno();

return -1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing flb_errno();

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in commit 0c3b656

}

if (new_size > config->task_map_size) {
memset(&new_task_map[config->task_map_size],
0,
(new_size - config->task_map_size) * sizeof(struct flb_task_map));
}

config->task_map = new_task_map;
config->task_map_size = new_size;

return 0;
}

int flb_config_task_map_grow(struct flb_config *config)
{
if (config->task_map_size >= FLB_CONFIG_DEFAULT_TASK_MAP_SIZE_LIMIT) {
return -1;
}

return flb_config_task_map_resize(config,
config->task_map_size + FLB_CONFIG_DEFAULT_TASK_MAP_SIZE_GROWTH_SiZE);
}
2 changes: 1 addition & 1 deletion src/flb_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ static inline int handle_output_event(uint64_t ts,
task_id, out_id, trace_st);
#endif

task = config->tasks_map[task_id].task;
task = config->task_map[task_id].task;
ins = flb_output_get_instance(config, out_id);
if (flb_output_is_threaded(ins) == FLB_FALSE) {
flb_output_flush_finished(config, out_id);
Expand Down
37 changes: 26 additions & 11 deletions src/flb_task.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,36 +33,42 @@

/*
* Every task created must have an unique ID, this function lookup the
* lowest number available in the tasks_map.
* lowest number available in the task_map.
*
* This 'id' is used by the task interface to communicate with the engine event
* loop about some action.
*/

static inline int map_get_task_id(struct flb_config *config)
{
int result;
int i;
int map_size = (sizeof(config->tasks_map) / sizeof(struct flb_task_map));

for (i = 0; i < map_size; i++) {
if (config->tasks_map[i].task == NULL) {
for (i = 0; i < config->task_map_size ; i++) {
if (config->task_map[i].task == NULL) {
return i;
}
}

result = flb_config_task_map_grow(config);

if (result == 0) {
return i;
}

return -1;
}

static inline void map_set_task_id(int id, struct flb_task *task,
struct flb_config *config)
{
config->tasks_map[id].task = task;
config->task_map[id].task = task;

}

static inline void map_free_task_id(int id, struct flb_config *config)
{
config->tasks_map[id].task = NULL;
config->task_map[id].task = NULL;
}

void flb_task_retry_destroy(struct flb_task_retry *retry)
Expand Down Expand Up @@ -232,7 +238,7 @@ int flb_task_retry_clean(struct flb_task *task, struct flb_output_instance *ins)
}

/* Allocate an initialize a basic Task structure */
static struct flb_task *task_alloc(struct flb_config *config)
struct flb_task *task_alloc(struct flb_config *config)
{
int task_id;
struct flb_task *task;
Expand All @@ -250,6 +256,7 @@ static struct flb_task *task_alloc(struct flb_config *config)
flb_free(task);
return NULL;
}

map_set_task_id(task_id, task, config);

flb_trace("[task %p] created (id=%i)", task, task_id);
Expand Down Expand Up @@ -484,22 +491,30 @@ void flb_task_destroy(struct flb_task *task, int del)
}

/* Unlink and release task */
mk_list_del(&task->_head);
if (!mk_list_entry_is_orphan(&task->_head)) {
mk_list_del(&task->_head);
}

/* destroy chunk */
flb_input_chunk_destroy(task->ic, del);
if (task->ic != NULL) {
flb_input_chunk_destroy(task->ic, del);
}

/* Remove 'retries' */
mk_list_foreach_safe(head, tmp, &task->retries) {
retry = mk_list_entry(head, struct flb_task_retry, _head);

flb_task_retry_destroy(retry);
}

flb_input_chunk_set_limits(task->i_ins);
if (task->i_ins != NULL) {
flb_input_chunk_set_limits(task->i_ins);
}

if (task->event_chunk) {
if (task->event_chunk != NULL) {
flb_event_chunk_destroy(task->event_chunk);
}

flb_free(task);
}

Expand Down
3 changes: 2 additions & 1 deletion tests/internal/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ set(UNIT_TESTS_FILES
uri.c
msgpack_append_message.c
conditionals.c
endianness
endianness.c
task_map.c
)

# Config format
Expand Down
94 changes: 94 additions & 0 deletions tests/internal/task_map.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_error.h>
#include <fluent-bit/flb_socket.h>
#include <fluent-bit/flb_task.h>

#include "flb_tests_internal.h"

#define TASK_COUNT_LIMIT (FLB_CONFIG_DEFAULT_TASK_MAP_SIZE_LIMIT + 1)

struct test_ctx {
struct flb_config *config;
};

struct test_ctx* test_ctx_create()
{
struct test_ctx *ret_ctx = NULL;

ret_ctx = flb_calloc(1, sizeof(struct test_ctx));
if (!TEST_CHECK(ret_ctx != NULL)) {
flb_errno();
TEST_MSG("flb_malloc(test_ctx) failed");
return NULL;
}

ret_ctx->config = flb_config_init();
if(!TEST_CHECK(ret_ctx->config != NULL)) {
TEST_MSG("flb_config_init failed");
flb_free(ret_ctx);
return NULL;
}

return ret_ctx;
}

int test_ctx_destroy(struct test_ctx* ctx)
{
if (!TEST_CHECK(ctx != NULL)) {
return -1;
}

if (ctx->config) {
flb_config_exit(ctx->config);
}

flb_free(ctx);
return 0;
}

void test_task_map_limit()
{
struct test_ctx *ctx;
ssize_t index;
struct flb_task *tasks[TASK_COUNT_LIMIT];
int failure_detected;

ctx = test_ctx_create();

if (!TEST_CHECK(ctx != NULL)) {
return;
}

failure_detected = FLB_FALSE;

for (index = 0 ; index < TASK_COUNT_LIMIT ; index++) {
tasks[index] = task_alloc(ctx->config);

if (tasks[index] == NULL) {
failure_detected = FLB_TRUE;

break;
}
}

if (TEST_CHECK(failure_detected == FLB_TRUE)) {
TEST_CHECK(index == FLB_CONFIG_DEFAULT_TASK_MAP_SIZE_LIMIT);
}

while (index >= 0) {
if (tasks[index] != NULL) {
flb_task_destroy(tasks[index], FLB_TRUE);
}
index--;
}

test_ctx_destroy(ctx);
}

TEST_LIST = {
{ "task_map_limit" , test_task_map_limit},
{ 0 }
};
Loading