Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

input_chunk: restrict appending chunks greater than size limit #9995

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
@@ -230,7 +230,8 @@ struct flb_config {
int storage_del_bad_chunks; /* delete irrecoverable chunks */
char *storage_bl_mem_limit; /* storage backlog memory limit */
struct flb_storage_metrics *storage_metrics_ctx; /* storage metrics context */
int storage_trim_files; /* enable/disable file trimming */
int storage_trim_files; /* enable/disable file trimming */
size_t storage_chunk_max_size; /* The max size for a chunk in bytes */

/* Embedded SQL Database support (SQLite3) */
#ifdef FLB_HAVE_SQLDB
@@ -358,15 +359,16 @@ enum conf_type {
#define FLB_CONF_DNS_PREFER_IPV6 "dns.prefer_ipv6"

/* Storage / Chunk I/O */
#define FLB_CONF_STORAGE_PATH "storage.path"
#define FLB_CONF_STORAGE_SYNC "storage.sync"
#define FLB_CONF_STORAGE_METRICS "storage.metrics"
#define FLB_CONF_STORAGE_CHECKSUM "storage.checksum"
#define FLB_CONF_STORAGE_BL_MEM_LIMIT "storage.backlog.mem_limit"
#define FLB_CONF_STORAGE_MAX_CHUNKS_UP "storage.max_chunks_up"
#define FLB_CONF_STORAGE_PATH "storage.path"
#define FLB_CONF_STORAGE_SYNC "storage.sync"
#define FLB_CONF_STORAGE_METRICS "storage.metrics"
#define FLB_CONF_STORAGE_CHECKSUM "storage.checksum"
#define FLB_CONF_STORAGE_BL_MEM_LIMIT "storage.backlog.mem_limit"
#define FLB_CONF_STORAGE_MAX_CHUNKS_UP "storage.max_chunks_up"
#define FLB_CONF_STORAGE_DELETE_IRRECOVERABLE_CHUNKS \
"storage.delete_irrecoverable_chunks"
#define FLB_CONF_STORAGE_TRIM_FILES "storage.trim_files"
#define FLB_CONF_STORAGE_TRIM_FILES "storage.trim_files"
#define FLB_CONF_STORAGE_CHUNK_MAX_SIZE "storage.chunk_max_size"

/* Coroutines */
#define FLB_CONF_STR_CORO_STACK_SIZE "Coro_Stack_Size"
4 changes: 4 additions & 0 deletions src/flb_config.c
Original file line number Diff line number Diff line change
@@ -154,6 +154,9 @@ struct flb_service_config service_configs[] = {
{FLB_CONF_STORAGE_TRIM_FILES,
FLB_CONF_TYPE_BOOL,
offsetof(struct flb_config, storage_trim_files)},
{FLB_CONF_STORAGE_CHUNK_MAX_SIZE,
FLB_CONF_TYPE_INT,
offsetof(struct flb_config, storage_chunk_max_size)},

/* Coroutines */
{FLB_CONF_STR_CORO_STACK_SIZE,
@@ -278,6 +281,7 @@ struct flb_config *flb_config_init()
config->storage_path = NULL;
config->storage_input_plugin = NULL;
config->storage_metrics = FLB_TRUE;
config->storage_chunk_max_size = FLB_INPUT_CHUNK_FS_MAX_SIZE;

config->sched_cap = FLB_SCHED_CAP;
config->sched_base = FLB_SCHED_BASE;
16 changes: 11 additions & 5 deletions src/flb_input_chunk.c
Original file line number Diff line number Diff line change
@@ -1142,7 +1142,13 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in,
}

if (id >= 0) {
if (ic->busy == FLB_TRUE || cio_chunk_is_locked(ic->chunk)) {
/*
* If the chunk is busy, locked, or does not have room for the new
* data, force the creation of a new chunk.
*/
if (ic->busy == FLB_TRUE || cio_chunk_is_locked(ic->chunk) ||
(flb_input_chunk_get_real_size(ic) + chunk_size) > \
in->config->storage_chunk_max_size) {
ic = NULL;
}
else if (cio_chunk_is_up(ic->chunk) == CIO_FALSE) {
@@ -1679,8 +1685,8 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
real_diff = 0;
}

/* Lock buffers where size > 2MB */
if (content_size > FLB_INPUT_CHUNK_FS_MAX_SIZE) {
/* Lock buffers where size > chunk max size */
if (content_size > in->config->storage_chunk_max_size) {
cio_chunk_lock(ic->chunk);
}

@@ -1747,8 +1753,8 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
content_size = cio_chunk_get_content_size(ic->chunk);

/* Do we have less than 1% available ? */
min = (FLB_INPUT_CHUNK_FS_MAX_SIZE * 0.01);
if (FLB_INPUT_CHUNK_FS_MAX_SIZE - content_size < min) {
min = (in->config->storage_chunk_max_size * 0.01);
if (in->config->storage_chunk_max_size - content_size < min) {
cio_chunk_down(ic->chunk);
}
}
49 changes: 47 additions & 2 deletions src/flb_input_log.c
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
#include <fluent-bit/flb_input_log.h>
#include <fluent-bit/flb_input_plugin.h>
#include <fluent-bit/flb_processor.h>
#include <fluent-bit/flb_log_event_decoder.h>

static int input_log_append(struct flb_input_instance *ins,
size_t processor_starting_stage,
@@ -34,6 +35,11 @@ static int input_log_append(struct flb_input_instance *ins,
int processor_is_active;
void *out_buf = (void *) buf;
size_t out_size = buf_size;
size_t in_buf_start, in_buf_size, in_records;
size_t record_size, cursor, cursor_last;
struct flb_log_event_decoder decoder;
struct flb_log_event log_event;
msgpack_unpacked result;

processor_is_active = flb_processor_is_active(ins->processor);
if (processor_is_active) {
@@ -68,9 +74,48 @@ static int input_log_append(struct flb_input_instance *ins,
}
}

ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records,
tag, tag_len, out_buf, out_size);
if (out_size < ins->config->storage_chunk_max_size) {
ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records,
tag, tag_len, out_buf, out_size);
} else {
in_buf_start = 0;
in_buf_size = 0;
in_records = 0;
cursor = 0;
cursor_last = 0;
msgpack_unpacked_init(&result);
while (msgpack_unpack_next(&result, out_buf, out_size, &cursor)) {
record_size = cursor - cursor_last;

/* If the current decoded record won't fit, append what we have. */
if (in_records > 0 &&
in_buf_size + record_size > ins->config->storage_chunk_max_size) {
/* Send the events we have so far. */
ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, in_records,
tag, tag_len,
out_buf + in_buf_start, in_buf_size);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Egg on my face, I never realized that arithmetic on void pointers is illegal in compilers other than GCC; GCC handles void a special way, making sizeof(void) = 1. I'll need to think of another way to do this to allow for other compilers to work.


/* Set the next buffer to start where the last one ended. */
in_buf_start += in_buf_size;

/* Reset size and record count. */
in_buf_size = 0;
in_records = 0;
}

/* Add record to current tracker. */
in_records += 1;
in_buf_size += record_size;
cursor_last = cursor;
}

/* Send along any remaining events. */
if (in_records > 0) {
ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, in_records,
tag, tag_len,
out_buf + in_buf_start, in_buf_size);
}
}

if (processor_is_active && buf != out_buf) {
flb_free(out_buf);