diff --git a/include/fluent-bit/flb_lib.h b/include/fluent-bit/flb_lib.h index 88ff8cb0af3..ecb51f3dc24 100644 --- a/include/fluent-bit/flb_lib.h +++ b/include/fluent-bit/flb_lib.h @@ -52,8 +52,14 @@ FLB_EXPORT void flb_init_env(); FLB_EXPORT flb_ctx_t *flb_create(); FLB_EXPORT void flb_destroy(flb_ctx_t *ctx); FLB_EXPORT int flb_input(flb_ctx_t *ctx, const char *input, void *data); +FLB_EXPORT int flb_input_processor_unit(flb_ctx_t *ctx, const char *event_type, + const char *processor_unit_name, int ffd, + ...); FLB_EXPORT int flb_input_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc); FLB_EXPORT int flb_output(flb_ctx_t *ctx, const char *output, struct flb_lib_out_cb *cb); +FLB_EXPORT int flb_output_processor_unit(flb_ctx_t *ctx, const char *event_type, + const char *processor_unit_name, int ffd, + ...); FLB_EXPORT int flb_output_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc); FLB_EXPORT int flb_filter(flb_ctx_t *ctx, const char *filter, void *data); FLB_EXPORT int flb_input_set(flb_ctx_t *ctx, int ffd, ...); diff --git a/include/fluent-bit/flb_processor.h b/include/fluent-bit/flb_processor.h index 5781d41e47e..ac428c7e656 100644 --- a/include/fluent-bit/flb_processor.h +++ b/include/fluent-bit/flb_processor.h @@ -221,7 +221,7 @@ int flb_processor_run(struct flb_processor *proc, struct flb_processor_unit *flb_processor_unit_create(struct flb_processor *proc, int event_type, - char *unit_name); + const char *unit_name); void flb_processor_unit_destroy(struct flb_processor_unit *pu); int flb_processor_unit_set_property(struct flb_processor_unit *pu, const char *k, struct cfl_variant *v); diff --git a/src/flb_lib.c b/src/flb_lib.c index 0e4cde0dbaf..c746acdf43b 100644 --- a/src/flb_lib.c +++ b/src/flb_lib.c @@ -34,6 +34,7 @@ #include #include #include +#include #include #include @@ -329,6 +330,72 @@ int flb_input_set(flb_ctx_t *ctx, int ffd, ...) return 0; } +static int flb_processor_event_type_get(const char *event_type) +{ + if (strcasecmp(event_type, "logs") == 0) { + return FLB_PROCESSOR_LOGS; + } + else if (strcasecmp(event_type, "metrics") == 0) { + return FLB_PROCESSOR_METRICS; + } + else if (strcasecmp(event_type, "traces") == 0) { + return FLB_PROCESSOR_TRACES; + } + else if (strcasecmp(event_type, "profiles") == 0) { + return FLB_PROCESSOR_PROFILES; + } + else { + return -1; + } +} + +/* Create a single processor unit for the input processor */ +int flb_input_processor_unit(flb_ctx_t *ctx, const char *event_type, + const char *processor_unit_name, int ffd, ...) +{ + int ret; + int type; + struct flb_input_instance *i_ins; + struct flb_processor *proc; + struct flb_processor_unit * pu; + char *key; + char *value; + va_list va; + + i_ins = in_instance_get(ctx, ffd); + if (!i_ins) { + return -1; + } + proc = i_ins->processor; + if (!proc) { + return -1; + } + type = flb_processor_event_type_get(event_type); + if (type == -1) { + return -1; + } + pu = flb_processor_unit_create(proc, type, processor_unit_name); + va_start(va, ffd); + while ((key = va_arg(va, char *))) { + value = va_arg(va, char *); + if (!value) { + /* Wrong parameter */ + va_end(va); + return -1; + } + struct cfl_variant cfl_value = { + .type = CFL_VARIANT_STRING, + .data.as_string = value, + }; + ret = flb_processor_unit_set_property(pu, key, &cfl_value); + if (ret != 0) { + va_end(va); + return -1; + } + } + return 0; +} + int flb_input_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc) { struct flb_input_instance *i_ins; @@ -545,6 +612,53 @@ int flb_output_set(flb_ctx_t *ctx, int ffd, ...) return 0; } +/* Create a single processor unit for the input processor */ +int flb_output_processor_unit(flb_ctx_t *ctx, const char *event_type, + const char *processor_unit_name, int ffd, ...) +{ + int ret; + int type; + struct flb_output_instance *o_ins; + struct flb_processor *proc; + struct flb_processor_unit * pu; + char *key; + char *value; + va_list va; + + o_ins = out_instance_get(ctx, ffd); + if (!o_ins) { + return -1; + } + proc = o_ins->processor; + if (!proc) { + return -1; + } + type = flb_processor_event_type_get(event_type); + if (type == -1) { + return -1; + } + pu = flb_processor_unit_create(proc, type, processor_unit_name); + va_start(va, ffd); + while ((key = va_arg(va, char *))) { + value = va_arg(va, char *); + if (!value) { + /* Wrong parameter */ + va_end(va); + return -1; + } + struct cfl_variant cfl_value = { + .type = CFL_VARIANT_STRING, + .data.as_string = value, + }; + ret = flb_processor_unit_set_property(pu, key, &cfl_value); + if (ret != 0) { + va_end(va); + return -1; + } + } + return 0; +} + int flb_output_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc) { struct flb_output_instance *o_ins; diff --git a/src/flb_processor.c b/src/flb_processor.c index 9322e297b1e..ae0b465794f 100644 --- a/src/flb_processor.c +++ b/src/flb_processor.c @@ -136,7 +136,7 @@ struct flb_processor *flb_processor_create(struct flb_config *config, struct flb_processor_unit *flb_processor_unit_create(struct flb_processor *proc, int event_type, - char *unit_name) + const char *unit_name) { int result; struct mk_list *head;