Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

input_chunk: restrict appending chunks greater than size limit #9995

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ struct flb_config {
int storage_del_bad_chunks; /* delete irrecoverable chunks */
char *storage_bl_mem_limit; /* storage backlog memory limit */
struct flb_storage_metrics *storage_metrics_ctx; /* storage metrics context */
int storage_trim_files; /* enable/disable file trimming */
int storage_trim_files; /* enable/disable file trimming */
size_t storage_chunk_max_size; /* The max size for a chunk in bytes */

/* Embedded SQL Database support (SQLite3) */
#ifdef FLB_HAVE_SQLDB
Expand Down Expand Up @@ -358,15 +359,16 @@ enum conf_type {
#define FLB_CONF_DNS_PREFER_IPV6 "dns.prefer_ipv6"

/* Storage / Chunk I/O */
#define FLB_CONF_STORAGE_PATH "storage.path"
#define FLB_CONF_STORAGE_SYNC "storage.sync"
#define FLB_CONF_STORAGE_METRICS "storage.metrics"
#define FLB_CONF_STORAGE_CHECKSUM "storage.checksum"
#define FLB_CONF_STORAGE_BL_MEM_LIMIT "storage.backlog.mem_limit"
#define FLB_CONF_STORAGE_MAX_CHUNKS_UP "storage.max_chunks_up"
#define FLB_CONF_STORAGE_PATH "storage.path"
#define FLB_CONF_STORAGE_SYNC "storage.sync"
#define FLB_CONF_STORAGE_METRICS "storage.metrics"
#define FLB_CONF_STORAGE_CHECKSUM "storage.checksum"
#define FLB_CONF_STORAGE_BL_MEM_LIMIT "storage.backlog.mem_limit"
#define FLB_CONF_STORAGE_MAX_CHUNKS_UP "storage.max_chunks_up"
#define FLB_CONF_STORAGE_DELETE_IRRECOVERABLE_CHUNKS \
"storage.delete_irrecoverable_chunks"
#define FLB_CONF_STORAGE_TRIM_FILES "storage.trim_files"
#define FLB_CONF_STORAGE_TRIM_FILES "storage.trim_files"
#define FLB_CONF_STORAGE_CHUNK_MAX_SIZE "storage.chunk_max_size"

/* Coroutines */
#define FLB_CONF_STR_CORO_STACK_SIZE "Coro_Stack_Size"
Expand Down
4 changes: 4 additions & 0 deletions src/flb_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ struct flb_service_config service_configs[] = {
{FLB_CONF_STORAGE_TRIM_FILES,
FLB_CONF_TYPE_BOOL,
offsetof(struct flb_config, storage_trim_files)},
{FLB_CONF_STORAGE_CHUNK_MAX_SIZE,
FLB_CONF_TYPE_INT,
offsetof(struct flb_config, storage_chunk_max_size)},

/* Coroutines */
{FLB_CONF_STR_CORO_STACK_SIZE,
Expand Down Expand Up @@ -278,6 +281,7 @@ struct flb_config *flb_config_init()
config->storage_path = NULL;
config->storage_input_plugin = NULL;
config->storage_metrics = FLB_TRUE;
config->storage_chunk_max_size = FLB_INPUT_CHUNK_FS_MAX_SIZE;

config->sched_cap = FLB_SCHED_CAP;
config->sched_base = FLB_SCHED_BASE;
Expand Down
16 changes: 11 additions & 5 deletions src/flb_input_chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -1142,7 +1142,13 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in,
}

if (id >= 0) {
if (ic->busy == FLB_TRUE || cio_chunk_is_locked(ic->chunk)) {
/*
* If the chunk is busy, locked, or does not have room for the new
* data, force the creation of a new chunk.
*/
if (ic->busy == FLB_TRUE || cio_chunk_is_locked(ic->chunk) ||
(flb_input_chunk_get_real_size(ic) + chunk_size) > \
in->config->storage_chunk_max_size) {
ic = NULL;
}
else if (cio_chunk_is_up(ic->chunk) == CIO_FALSE) {
Expand Down Expand Up @@ -1679,8 +1685,8 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
real_diff = 0;
}

/* Lock buffers where size > 2MB */
if (content_size > FLB_INPUT_CHUNK_FS_MAX_SIZE) {
/* Lock buffers where size > chunk max size */
if (content_size > in->config->storage_chunk_max_size) {
cio_chunk_lock(ic->chunk);
}

Expand Down Expand Up @@ -1747,8 +1753,8 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
content_size = cio_chunk_get_content_size(ic->chunk);

/* Do we have less than 1% available ? */
min = (FLB_INPUT_CHUNK_FS_MAX_SIZE * 0.01);
if (FLB_INPUT_CHUNK_FS_MAX_SIZE - content_size < min) {
min = (in->config->storage_chunk_max_size * 0.01);
if (in->config->storage_chunk_max_size - content_size < min) {
cio_chunk_down(ic->chunk);
}
}
Expand Down
49 changes: 47 additions & 2 deletions src/flb_input_log.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <fluent-bit/flb_input_log.h>
#include <fluent-bit/flb_input_plugin.h>
#include <fluent-bit/flb_processor.h>
#include <fluent-bit/flb_log_event_decoder.h>

static int input_log_append(struct flb_input_instance *ins,
size_t processor_starting_stage,
Expand All @@ -34,6 +35,11 @@ static int input_log_append(struct flb_input_instance *ins,
int processor_is_active;
void *out_buf = (void *) buf;
size_t out_size = buf_size;
size_t in_buf_start, in_buf_size, in_records;
size_t record_size, cursor, cursor_last;
struct flb_log_event_decoder decoder;
struct flb_log_event log_event;
msgpack_unpacked result;

processor_is_active = flb_processor_is_active(ins->processor);
if (processor_is_active) {
Expand Down Expand Up @@ -68,9 +74,48 @@ static int input_log_append(struct flb_input_instance *ins,
}
}

ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records,
tag, tag_len, out_buf, out_size);
if (out_size < ins->config->storage_chunk_max_size) {
ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records,
tag, tag_len, out_buf, out_size);
} else {
in_buf_start = 0;
in_buf_size = 0;
in_records = 0;
cursor = 0;
cursor_last = 0;
msgpack_unpacked_init(&result);
while (msgpack_unpack_next(&result, out_buf, out_size, &cursor)) {
record_size = cursor - cursor_last;

/* If the current decoded record won't fit, append what we have. */
if (in_records > 0 &&
in_buf_size + record_size > ins->config->storage_chunk_max_size) {
/* Send the events we have so far. */
ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, in_records,
tag, tag_len,
out_buf + in_buf_start, in_buf_size);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Egg on my face, I never realized that arithmetic on void pointers is illegal in compilers other than GCC; GCC handles void a special way, making sizeof(void) = 1. I'll need to think of another way to do this to allow for other compilers to work.


/* Set the next buffer to start where the last one ended. */
in_buf_start += in_buf_size;

/* Reset size and record count. */
in_buf_size = 0;
in_records = 0;
}

/* Add record to current tracker. */
in_records += 1;
in_buf_size += record_size;
cursor_last = cursor;
}

/* Send along any remaining events. */
if (in_records > 0) {
ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, in_records,
tag, tag_len,
out_buf + in_buf_start, in_buf_size);
}
}

if (processor_is_active && buf != out_buf) {
flb_free(out_buf);
Expand Down
Loading