Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 55 additions & 5 deletions src/flb_plugin_proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_plugin_proxy.h>
#include <fluent-bit/flb_input_log.h>
#include <fluent-bit/flb_input_metric.h>
#include <fluent-bit/flb_input_trace.h>
#include <fluent-bit/flb_input_event.h>
#include <fluent-bit/flb_custom.h>
#include <cmetrics/cmt_decode_msgpack.h>
#include <ctraces/ctr_decode_msgpack.h>

/* Proxies */
#include "proxy/go/go.h"
Expand Down Expand Up @@ -75,8 +80,12 @@ static int flb_proxy_input_cb_collect(struct flb_input_instance *ins,
struct flb_config *config, void *in_context)
{
int ret = FLB_OK;
int event_type;
size_t len = 0;
size_t offset = 0;
void *data = NULL;
struct cmt *cmt = NULL;
struct ctrace *ctr = NULL;
struct flb_plugin_input_proxy_context *ctx = (struct flb_plugin_input_proxy_context *) in_context;

#ifdef FLB_HAVE_PROXY_GO
Expand All @@ -85,7 +94,7 @@ static int flb_proxy_input_cb_collect(struct flb_input_instance *ins,
ret = proxy_go_input_collect(ctx, &data, &len);

if (len == 0) {
flb_trace("[GO] No logs are ingested");
flb_trace("[GO] No data ingested");
return -1;
}

Expand All @@ -94,17 +103,58 @@ static int flb_proxy_input_cb_collect(struct flb_input_instance *ins,
return -1;
}

flb_input_log_append(ins, NULL, 0, data, len);
event_type = ctx->proxy->def->event_type;

if (event_type == FLB_INPUT_METRICS) {
ret = cmt_decode_msgpack_create(&cmt, (char *) data, len, &offset);
if (ret != CMT_DECODE_MSGPACK_SUCCESS) {
flb_error("[proxy] failed to decode metrics msgpack (error %d)", ret);
proxy_go_input_cleanup(ctx, data);
return -1;
}
ret = flb_input_metrics_append(ins, NULL, 0, cmt);
if (ret != 0) {
flb_error("[proxy] could not append metrics, ret=%d", ret);
cmt_decode_msgpack_destroy(cmt);
proxy_go_input_cleanup(ctx, data);
return -1;
}
cmt_decode_msgpack_destroy(cmt);
}
else if (event_type == FLB_INPUT_TRACES) {
Comment on lines +108 to +124

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Map proxy input event types from output constants

For a proxy plugin that follows the existing proxy event_type convention used by outputs, metrics are registered as FLB_OUTPUT_METRICS (value 2) and traces as FLB_OUTPUT_TRACES (value 4); this new input path compares the same def->event_type against FLB_INPUT_METRICS/FLB_INPUT_TRACES (1/2). In that scenario a metrics-producing Go input is decoded as traces and a traces-producing input falls through to the log append path, so the feature fails for SDKs exposing the already-established proxy output constants unless this registration value is translated by plugin type.

Useful? React with 👍 / 👎.

ret = ctr_decode_msgpack_create(&ctr, (char *) data, len, &offset);
if (ret != CTR_DECODE_MSGPACK_SUCCESS) {
flb_error("[proxy] failed to decode traces msgpack (error %d)", ret);
proxy_go_input_cleanup(ctx, data);
return -1;
}
ret = flb_input_trace_append(ins, NULL, 0, ctr);
if (ret != 0) {
flb_error("[proxy] could not append traces, ret=%d", ret);
ctr_decode_msgpack_destroy(ctr);
proxy_go_input_cleanup(ctx, data);
return -1;
}
/* flb_input_trace_append takes ownership of ctr and destroys it on success */
}
else if (event_type == FLB_INPUT_LOGS) {
ret = flb_input_log_append(ins, NULL, 0, data, len);
}
else {
flb_error("[proxy] unsupported event_type %d for input plugin '%s'",
event_type, ins->name);
proxy_go_input_cleanup(ctx, data);
return -1;
}

ret = proxy_go_input_cleanup(ctx, data);
if (ret == -1) {
if (proxy_go_input_cleanup(ctx, data) == -1) {
flb_errno();
return -1;
}
}
#endif

return 0;
return ret;
}

static int flb_proxy_input_cb_init(struct flb_input_instance *ins,
Expand Down
Loading