Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions include/fluent-bit/flb_input_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,35 @@ int flb_input_chunk_write_at(void *data, off_t offset,
int flb_input_chunk_append_obj(struct flb_input_instance *in,
const char *tag, int tag_len,
msgpack_object data);
/*
* Skip input-level metrics (records/bytes total) on this append. Used for
* internal route copies so conditional routing does not inflate the totals
* by the route count.
*/
#define FLB_INPUT_CHUNK_SKIP_INPUT_METRICS (1 << 1)

int flb_input_chunk_append_raw(struct flb_input_instance *in,
int event_type,
size_t records,
const char *tag, size_t tag_len,
const void *buf, size_t buf_size);
int flb_input_chunk_append_raw_flags(struct flb_input_instance *in,
int event_type,
int flags,
size_t records,
const char *tag, size_t tag_len,
const void *buf, size_t buf_size);
int flb_input_chunk_append_raw_local(struct flb_input_instance *in,
int event_type,
size_t records,
const char *tag, size_t tag_len,
const void *buf, size_t buf_size);
int flb_input_chunk_append_raw_local_flags(struct flb_input_instance *in,
int event_type,
int flags,
size_t records,
const char *tag, size_t tag_len,
const void *buf, size_t buf_size);
int flb_input_chunk_ring_buffer_enqueue(struct flb_input_instance *in,
int event_type,
size_t records,
Expand Down
35 changes: 30 additions & 5 deletions src/flb_input_chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -2651,6 +2651,7 @@ static int memrb_input_chunk_release_space(struct flb_input_instance *ins,
/* Append a RAW MessagPack buffer to the input instance */
static int input_chunk_append_raw(struct flb_input_instance *in,
int event_type,
int flags,
size_t n_records,
const char *tag, size_t tag_len,
const void *buf, size_t buf_size)
Expand Down Expand Up @@ -2793,7 +2794,8 @@ static int input_chunk_append_raw(struct flb_input_instance *in,

/* Update 'input' metrics */
#ifdef FLB_HAVE_METRICS
if (ic->total_records > 0) {
/* skip for internal route copies; the batch is counted once elsewhere */
if (ic->total_records > 0 && !(flags & FLB_INPUT_CHUNK_SKIP_INPUT_METRICS)) {
/* timestamp */
ts = cfl_time_now();

Expand Down Expand Up @@ -3136,7 +3138,8 @@ void flb_input_chunk_ring_buffer_collector(struct flb_config *ctx, void *data)
cr->buf_data, cr->buf_size);
}
else {
input_chunk_append_raw(cr->ins, cr->event_type, cr->records,
input_chunk_append_raw(cr->ins, cr->event_type, cr->flags,
cr->records,
cr->tag, tag_len,
cr->buf_data, cr->buf_size);
}
Expand All @@ -3154,6 +3157,17 @@ int flb_input_chunk_append_raw(struct flb_input_instance *in,
size_t records,
const char *tag, size_t tag_len,
const void *buf, size_t buf_size)
{
return flb_input_chunk_append_raw_flags(in, event_type, 0, records,
tag, tag_len, buf, buf_size);
}

int flb_input_chunk_append_raw_flags(struct flb_input_instance *in,
int event_type,
int flags,
size_t records,
const char *tag, size_t tag_len,
const void *buf, size_t buf_size)
{
int ret;

Expand All @@ -3162,12 +3176,12 @@ int flb_input_chunk_append_raw(struct flb_input_instance *in,
* add the data reference to the ring buffer.
*/
if (flb_input_is_threaded(in)) {
ret = append_to_ring_buffer(in, event_type, 0, records,
ret = append_to_ring_buffer(in, event_type, flags, records,
tag, tag_len,
buf, buf_size);
}
else {
ret = input_chunk_append_raw(in, event_type, records,
ret = input_chunk_append_raw(in, event_type, flags, records,
tag, tag_len, buf, buf_size);
}

Expand All @@ -3180,7 +3194,18 @@ int flb_input_chunk_append_raw_local(struct flb_input_instance *in,
const char *tag, size_t tag_len,
const void *buf, size_t buf_size)
{
return input_chunk_append_raw(in, event_type, records,
return input_chunk_append_raw(in, event_type, 0, records,
tag, tag_len, buf, buf_size);
}

int flb_input_chunk_append_raw_local_flags(struct flb_input_instance *in,
int event_type,
int flags,
size_t records,
const char *tag, size_t tag_len,
const void *buf, size_t buf_size)
{
return input_chunk_append_raw(in, event_type, flags, records,
tag, tag_len, buf, buf_size);
}

Expand Down
94 changes: 78 additions & 16 deletions src/flb_input_log.c
Original file line number Diff line number Diff line change
Expand Up @@ -1135,7 +1135,8 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins,
size_t tag_len,
const void *buf,
size_t buf_size,
int local_append)
int local_append,
int *any_appended)
{
int ret;
int appended;
Expand Down Expand Up @@ -1397,18 +1398,21 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins,
continue;
}

/* internal route copy: skip metrics, the batch is counted once */
if (local_append == FLB_TRUE) {
ret = flb_input_chunk_append_raw_local(ins,
ret = flb_input_chunk_append_raw_local_flags(ins,
FLB_INPUT_LOGS,
FLB_INPUT_CHUNK_SKIP_INPUT_METRICS,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Avoid dropping metrics when the base append is skipped

In conditional routing under mem_buf_limit or storage pause, one of these routed appends can successfully create a route-specific chunk and then pause the input via flb_input_chunk_protect; because the route copy is marked FLB_INPUT_CHUNK_SKIP_INPUT_METRICS, the only planned input-counter update is the later unconditional append. That append runs after this loop and returns before counting when flb_input_buf_paused() is true, so chunks already accepted for conditional routes are delivered without ever being reflected in fluentbit_input_records_total / _bytes_total.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codex review

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Count route copies if the base append can fail

When conditional/per-record routing is active and a route-copy append succeeds but the later unconditional append fails (for example, the route copy pushes the input over mem_buf_limit/storage limits and flb_input_chunk_protect() pauses it before the base append runs), this flag suppresses the only input metrics for data that remains queued for that route. The function returns an error, but the per-route chunk has already been appended and had its direct outputs applied, so fluentbit_input_records_total / bytes can under-report delivered records on that failure path; count the original ingestion before splitting or account/undo the successful route copies when the base append fails.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codex review

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Codex Review: Didn't find any major issues. More of your lovely PRs please.

Reviewed commit: a06ca3ca09

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

payload->total_records,
payload->tag,
flb_sds_len(payload->tag),
payload->data,
payload->size);
}
else {
ret = flb_input_chunk_append_raw(ins,
ret = flb_input_chunk_append_raw_flags(ins,
FLB_INPUT_LOGS,
FLB_INPUT_CHUNK_SKIP_INPUT_METRICS,
payload->total_records,
payload->tag,
flb_sds_len(payload->tag),
Expand Down Expand Up @@ -1445,6 +1449,14 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins,
return -1;
}

/*
* Report acceptance so the caller counts the batch once, even if a
* later route copy in this loop fails.
*/
if (any_appended != NULL) {
*any_appended = FLB_TRUE;
}

appended++;
}

Expand Down Expand Up @@ -1473,6 +1485,9 @@ static int input_log_append_processed_internal(struct flb_input_instance *ins,
int ret;
int conditional_result;
int conditional_handled = FLB_FALSE;
int conditional_active = FLB_FALSE;
int base_flags = 0;
int accepted = FLB_FALSE;
size_t dummy = 0;
const char *base_tag = tag;
size_t base_tag_len = tag_len;
Expand All @@ -1492,29 +1507,76 @@ static int input_log_append_processed_internal(struct flb_input_instance *ins,
base_tag_len = strlen(base_tag);
}

conditional_result = split_and_append_route_payloads(ins, records, tag, tag_len,
buf, buf_size,
local_append);
if (conditional_result < 0) {
return -1;
/*
* With conditional routing the batch is fanned out into per-route copies
* plus a base append, all skipping input metrics. It is accounted once
* below, after at least one append is accepted, so a fully rejected batch
* is not counted. The non-routed path keeps counting in
* flb_input_chunk_append_raw().
*/
conditional_active = (cfl_list_size(&ins->routes_direct) > 0 &&
input_has_conditional_routes(ins) == FLB_TRUE);
if (conditional_active == FLB_TRUE) {
base_flags = FLB_INPUT_CHUNK_SKIP_INPUT_METRICS;
}

conditional_result = split_and_append_route_payloads(ins, records, tag, tag_len,
buf, buf_size,
local_append, &accepted);
if (conditional_result > 0) {
conditional_handled = FLB_TRUE;
}

/*
* Always call flb_input_chunk_append_raw to ensure non-conditional routes
* receive data even when conditional routes exist. The conditional routing
* should be additive, not exclusive.
* Base append carries the full buffer for non-conditional routes (routing
* is additive). It is skipped on split failure.
*/
if (local_append == FLB_TRUE) {
ret = flb_input_chunk_append_raw_local(ins, FLB_INPUT_LOGS, records,
tag, tag_len, buf, buf_size);
if (conditional_result >= 0) {
if (local_append == FLB_TRUE) {
ret = flb_input_chunk_append_raw_local_flags(ins, FLB_INPUT_LOGS,
base_flags, records,
tag, tag_len, buf, buf_size);
}
else {
ret = flb_input_chunk_append_raw_flags(ins, FLB_INPUT_LOGS,
base_flags, records,
tag, tag_len, buf, buf_size);
}
if (ret == 0) {
accepted = FLB_TRUE;
}
}
else {
ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records,
tag, tag_len, buf, buf_size);
ret = -1;
}

#ifdef FLB_HAVE_METRICS
/* count the batch once, now that at least one append was accepted */
if (conditional_active == FLB_TRUE && accepted == FLB_TRUE && buf_size > 0) {
size_t effective_records = records;

/* mirror the zero-record recovery in input_chunk_append_raw() */
if (effective_records == 0) {
effective_records = flb_mp_count_log_records(buf, buf_size);
}

if (effective_records > 0) {
uint64_t ts = cfl_time_now();
char *name = (char *) flb_input_name(ins);

cmt_counter_add(ins->cmt_records, ts, effective_records, 1,
(char *[]) {name});
cmt_counter_add(ins->cmt_bytes, ts, buf_size, 1,
(char *[]) {name});

flb_metrics_sum(FLB_METRIC_N_RECORDS, effective_records, ins->metrics);
flb_metrics_sum(FLB_METRIC_N_BYTES, buf_size, ins->metrics);
}
}
#endif

if (conditional_result < 0) {
return -1;
}

if (ret == 0 && conditional_handled == FLB_TRUE && base_tag) {
Expand Down
Loading