Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

engine: add input grace period and check pending chunks on shutdown #9952

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ struct flb_config {
* shutdown when all remaining tasks are flushed
*/
int grace;
int grace_count; /* Count of grace shutdown tries */
flb_pipefd_t flush_fd; /* Timer FD associated to flush */
int convert_nan_to_null; /* convert null to nan ? */
int grace_count; /* Count of grace shutdown tries */
int grace_input; /* Shutdown grace to keep inputs ingesting */
flb_pipefd_t flush_fd; /* Timer FD associated to flush */
int convert_nan_to_null; /* Convert NaN to null ? */

int daemon; /* Run as a daemon ? */
flb_pipefd_t shutdown_fd; /* Shutdown FD, 5 seconds */
Expand Down
1 change: 1 addition & 0 deletions include/fluent-bit/flb_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ int flb_engine_exit_status(struct flb_config *config, int status);
int flb_engine_shutdown(struct flb_config *config);
int flb_engine_destroy_tasks(struct mk_list *tasks);
void flb_engine_reschedule_retries(struct flb_config *config);
void flb_engine_stop_ingestion(struct flb_config *config);

/* Engine event loop */
void flb_engine_evl_init();
Expand Down
2 changes: 2 additions & 0 deletions include/fluent-bit/flb_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,6 @@ struct flb_storage_metrics *flb_storage_metrics_create(struct flb_config *ctx);
/* cmetrics */
int flb_storage_metrics_update(struct flb_config *config, struct flb_storage_metrics *sm);

void flb_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_chunks);

#endif
1 change: 1 addition & 0 deletions src/flb_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ struct flb_config *flb_config_init()
config->verbose = 3;
config->grace = 5;
config->grace_count = 0;
config->grace_input = config->grace / 2;
config->exit_status_code = 0;

/* json */
Expand Down
59 changes: 54 additions & 5 deletions src/flb_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,9 @@ int sb_segregate_chunks(struct flb_config *config)
int flb_engine_start(struct flb_config *config)
{
int ret;
int tasks = 0;
int fs_chunks = 0;
int mem_chunks = 0;
uint64_t ts;
char tmp[16];
int rb_flush_flag;
Expand All @@ -695,6 +698,7 @@ int flb_engine_start(struct flb_config *config)
struct flb_sched *sched;
struct flb_net_dns dns_ctx;
struct flb_notification *notification;
int exiting = FLB_FALSE;

/* Initialize the networking layer */
flb_net_lib_init();
Expand Down Expand Up @@ -946,11 +950,15 @@ int flb_engine_start(struct flb_config *config)

ret = sb_segregate_chunks(config);

if (ret) {
if (ret < 0)
{
flb_error("[engine] could not segregate backlog chunks");
return -2;
}

config->grace_input = config->grace / 2;
flb_info("[engine] Shutdown Grace Period=%d, Shutdown Input Grace Period=%d", config->grace, config->grace_input);

while (1) {
rb_flush_flag = FLB_FALSE;

Expand Down Expand Up @@ -1019,19 +1027,49 @@ int flb_engine_start(struct flb_config *config)
* If grace period is set to -1, keep trying to shut down until all
* tasks and retries get flushed.
*/
ret = flb_task_running_count(config);
tasks = 0;
mem_chunks = 0;
fs_chunks = 0;
tasks = flb_task_running_count(config);
flb_chunk_count(config, &mem_chunks, &fs_chunks);
ret = tasks + mem_chunks + fs_chunks;
if (ret > 0 && (config->grace_count < config->grace || config->grace == -1)) {
if (config->grace_count == 1) {
flb_task_running_print(config);
ret = sb_segregate_chunks(config);
if (ret < 0) {
flb_error("[engine] could not segregate backlog chunks");
return -2;
}
}
if ((mem_chunks + fs_chunks) > 0) {
flb_info("[engine] pending chunk count: memory=%d, filesystem=%d; grace_timer=%d",
mem_chunks, fs_chunks, config->grace_count);
}

/* Create new tasks for pending chunks */
flb_engine_flush(config, NULL);
if (config->grace_count < config->grace_input) {
if (exiting == FLB_FALSE) {
flb_engine_exit(config);
exiting = FLB_TRUE;
}
} else {
if (config->is_ingestion_active == FLB_TRUE) {
flb_engine_stop_ingestion(config);
}
}
flb_engine_exit(config);
}
else {
if (ret > 0) {
if (tasks > 0) {
flb_task_running_print(config);
}
if ((mem_chunks + fs_chunks) > 0) {
flb_info("[engine] pending chunk count: memory=%d, filesystem=%d; grace_timer=%d",
mem_chunks, fs_chunks, config->grace_count);
}
flb_info("[engine] service has stopped (%i pending tasks)",
ret);
tasks);
ret = config->exit_status_code;
flb_engine_shutdown(config);
config = NULL;
Expand Down Expand Up @@ -1132,6 +1170,7 @@ int flb_engine_shutdown(struct flb_config *config)
struct flb_sched_timer_coro_cb_params *sched_params;

config->is_running = FLB_FALSE;
config->is_ingestion_active = FLB_FALSE;
flb_input_pause_all(config);

#ifdef FLB_HAVE_STREAM_PROCESSOR
Expand Down Expand Up @@ -1200,6 +1239,16 @@ int flb_engine_exit(struct flb_config *config)
return ret;
}

/*
 * Stop ingestion: mark the configuration as shutting down with ingestion
 * no longer active, log the action, and then call flb_input_pause_all()
 * on the given configuration.
 *
 * config: engine configuration whose flags are updated and whose inputs
 *         are handed to flb_input_pause_all().
 */
void flb_engine_stop_ingestion(struct flb_config *config)
{
    /* Update both state flags before touching the inputs */
    config->is_shutting_down = FLB_TRUE;
    config->is_ingestion_active = FLB_FALSE;

    flb_info("[engine] pausing all inputs..");
    flb_input_pause_all(config);
}

int flb_engine_exit_status(struct flb_config *config, int status)
{
config->exit_status_code = status;
Expand Down
10 changes: 10 additions & 0 deletions src/flb_storage.c
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,16 @@ int flb_storage_create(struct flb_config *ctx)
return 0;
}

void flb_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_chunks)
{
struct cio_stats storage_st;

cio_stats_get(ctx->cio, &storage_st);

*mem_chunks = storage_st.chunks_mem;
*fs_chunks = storage_st.chunks_fs;
}

void flb_storage_destroy(struct flb_config *ctx)
{
struct cio_ctx *cio;
Expand Down