Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pipe: add flb_pipe_error #10017

Merged
merged 5 commits into from
Mar 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/fluent-bit/flb_input.h
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ static FLB_INLINE void flb_input_return(struct flb_coro *coro) {
val = FLB_BITS_U64_SET(FLB_ENGINE_IN_CORO, ins->id);
n = flb_pipe_w(ins->ch_events[1], (void *) &val, sizeof(val));
if (n == -1) {
flb_errno();
flb_pipe_error();
}

flb_input_coro_prepare_destroy(input_coro);
Expand Down
9 changes: 9 additions & 0 deletions include/fluent-bit/flb_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,20 @@ static inline int flb_log_suppress_check(int log_suppress_interval, const char *
int flb_log_worker_init(struct flb_worker *worker);
int flb_log_worker_destroy(struct flb_worker *worker);
int flb_errno_print(int errnum, const char *file, int line);
#ifdef WIN32
int flb_wsa_get_last_error_print(int errnum, const char *file, int line);
#endif

#ifdef __FLB_FILENAME__
#define flb_errno() flb_errno_print(errno, __FLB_FILENAME__, __LINE__)
#ifdef WIN32
#define flb_wsa_get_last_error() flb_wsa_get_last_error_print(WSAGetLastError(), __FLB_FILENAME__, __LINE__)
#endif
#else
#define flb_errno() flb_errno_print(errno, __FILE__, __LINE__)
#ifdef WIN32
#define flb_wsa_get_last_error() flb_wsa_get_last_error_print(WSAGetLastError(), __FILE__, __LINE__)
#endif
#endif

#endif
2 changes: 1 addition & 1 deletion include/fluent-bit/flb_output.h
Original file line number Diff line number Diff line change
Expand Up @@ -1199,7 +1199,7 @@ static inline void flb_output_return(int ret, struct flb_coro *co) {
/* Notify the event loop about our return status */
n = flb_pipe_w(pipe_fd, (void *) &val, sizeof(val));
if (n == -1) {
flb_errno();
flb_pipe_error();
}

/*
Expand Down
3 changes: 3 additions & 0 deletions include/fluent-bit/flb_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@
#define flb_sockfd_t evutil_socket_t
#define flb_pipe_w(fd, buf, len) send(fd, buf, len, 0)
#define flb_pipe_r(fd, buf, len) recv(fd, buf, len, 0)
#define flb_pipe_error() flb_wsa_get_last_error()
#define FLB_PIPE_WOULDBLOCK() (WSAGetLastError() == WSAEWOULDBLOCK)
#else
#define flb_pipefd_t int
#define flb_sockfd_t int
#define flb_pipe_w(fd, buf, len) write(fd, buf, len)
#define flb_pipe_r(fd, buf, len) read(fd, buf, len)
#define flb_pipe_error() flb_errno()
#define FLB_PIPE_WOULDBLOCK() (errno == EAGAIN || errno == EWOULDBLOCK)
#endif

Expand All @@ -43,5 +45,6 @@ int flb_pipe_close(flb_pipefd_t fd);
int flb_pipe_set_nonblocking(flb_pipefd_t fd);
ssize_t flb_pipe_read_all(int fd, void *buf, size_t count);
ssize_t flb_pipe_write_all(int fd, const void *buf, size_t count);
void flb_pipe_log_last_error();

#endif
2 changes: 1 addition & 1 deletion include/fluent-bit/flb_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ static FLB_INLINE void flb_sched_timer_cb_coro_return()
val = FLB_BITS_U64_SET(FLB_SCHED_TIMER_CORO_RETURN, stc->id);
n = flb_pipe_w(sched->ch_events[1], &val, sizeof(val));
if (n == -1) {
flb_errno();
flb_pipe_error();
}

flb_coro_yield(coro, FLB_TRUE);
Expand Down
6 changes: 3 additions & 3 deletions plugins/in_exec/in_exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ static int in_exec_collect(struct flb_input_instance *ins,
if (ctx->oneshot == FLB_TRUE) {
ret = flb_pipe_r(ctx->ch_manager[0], &val, sizeof(val));
if (ret == -1) {
flb_errno();
flb_pipe_error();
return -1;
}
}
Expand Down Expand Up @@ -256,7 +256,7 @@ static int in_exec_config_read(struct flb_exec *ctx,
flb_plg_error(in, "unable to load configuration");
return -1;
}

/* filepath setting */
if (ctx->cmd == NULL) {
flb_plg_error(in, "no input 'command' was given");
Expand Down Expand Up @@ -418,7 +418,7 @@ static int in_exec_prerun(struct flb_input_instance *ins,
/* Kick the oneshot execution */
ret = flb_pipe_w(ctx->ch_manager[1], &val, sizeof(val));
if (ret == -1) {
flb_errno();
flb_pipe_error();
return -1;
}
return 0;
Expand Down
4 changes: 2 additions & 2 deletions plugins/in_exec_wasi/in_exec_wasi.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ static int in_exec_wasi_collect(struct flb_input_instance *ins,
ret = flb_pipe_r(ctx->ch_manager[0], &val, sizeof(val));
if (ret == -1) {
fclose(stdoutp);
flb_errno();
flb_pipe_error();
return -1;
}
}
Expand Down Expand Up @@ -404,7 +404,7 @@ static int in_exec_wasi_prerun(struct flb_input_instance *ins,
/* Kick the oneshot execution */
ret = flb_pipe_w(ctx->ch_manager[1], &val, sizeof(val));
if (ret == -1) {
flb_errno();
flb_pipe_error();
return -1;
}
return 0;
Expand Down
1 change: 1 addition & 0 deletions plugins/in_lib/in_lib.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ static int in_lib_collect(struct flb_input_instance *ins,
flb_plg_trace(ctx->ins, "in_lib read() = %i", bytes);
if (bytes == -1) {
perror("read");
flb_pipe_error();
if (errno == -EPIPE) {
return -1;
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/in_tail/tail.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ static inline int consume_byte(flb_pipefd_t fd)
/* We need to consume the byte */
ret = flb_pipe_r(fd, (char *) &val, sizeof(val));
if (ret <= 0) {
flb_errno();
flb_pipe_error();
return -1;
}

Expand Down
6 changes: 3 additions & 3 deletions plugins/in_tail/tail_signal.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ static inline int tail_signal_manager(struct flb_tail_config *ctx)
/* Insert a dummy event into the channel manager */
n = flb_pipe_w(ctx->ch_manager[1], (const char *) &val, sizeof(val));
if (n == -1) {
flb_errno();
flb_pipe_error();
return -1;
}
else {
Expand All @@ -68,7 +68,7 @@ static inline int tail_signal_pending(struct flb_tail_config *ctx)
* notification is already pending, it's safe to ignore.
*/
if (n == -1 && !FLB_PIPE_WOULDBLOCK()) {
flb_errno();
flb_pipe_error();
return -1;
}

Expand All @@ -87,7 +87,7 @@ static inline int tail_consume_pending(struct flb_tail_config *ctx)
do {
ret = flb_pipe_r(ctx->ch_pending[0], (char *) &val, sizeof(val));
if (ret <= 0 && !FLB_PIPE_WOULDBLOCK()) {
flb_errno();
flb_pipe_error();
return -1;
}
} while (!FLB_PIPE_WOULDBLOCK());
Expand Down
8 changes: 4 additions & 4 deletions src/aws/flb_aws_credentials_process.c
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ static int read_until_block(char* name, flb_pipefd_t fd, struct readbuf* buf)
if (FLB_PIPE_WOULDBLOCK()) {
return 1;
}
flb_errno();
flb_pipe_error();
return -1;
}
else if (result == 0) { /* EOF */
Expand Down Expand Up @@ -481,7 +481,7 @@ static void exec_process_child(struct process* p)
{
while ((dup2(p->stdin_stream, STDIN_FILENO) < 0)) {
if (errno != EINTR) {
return;
return;
}
}
while ((dup2(p->stdout_stream[1], STDOUT_FILENO) < 0)) {
Expand All @@ -491,7 +491,7 @@ static void exec_process_child(struct process* p)
}
while ((dup2(p->stderr_stream, STDERR_FILENO) < 0)) {
if (errno != EINTR) {
return;
return;
}
}

Expand Down Expand Up @@ -558,7 +558,7 @@ static int read_from_process(struct process* p, struct readbuf* buf)
return -1;
}

flb_time_set(&timeout,
flb_time_set(&timeout,
(time_t) (CREDENTIAL_PROCESS_TIMEOUT_MS / MS_PER_SEC),
((long) (CREDENTIAL_PROCESS_TIMEOUT_MS % MS_PER_SEC)) * NS_PER_MS);

Expand Down
8 changes: 4 additions & 4 deletions src/flb_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ static inline int handle_input_event(flb_pipefd_t fd, uint64_t ts,

bytes = flb_pipe_r(fd, &val, sizeof(val));
if (bytes == -1) {
flb_errno();
flb_pipe_error();
return -1;
}

Expand Down Expand Up @@ -484,7 +484,7 @@ static inline int handle_output_events(flb_pipefd_t fd,
bytes = flb_pipe_r(fd, &values, sizeof(values));

if (bytes == -1) {
flb_errno();
flb_pipe_error();
return -1;
}

Expand Down Expand Up @@ -525,7 +525,7 @@ static inline int flb_engine_manager(flb_pipefd_t fd, struct flb_config *config)
/* read the event */
bytes = flb_pipe_r(fd, &val, sizeof(val));
if (bytes == -1) {
flb_errno();
flb_pipe_error();
return -1;
}

Expand Down Expand Up @@ -1049,7 +1049,7 @@ int flb_engine_start(struct flb_config *config)
/* Read the coroutine reference */
ret = flb_pipe_r(event->fd, &output_flush, sizeof(struct flb_output_flush *));
if (ret <= 0 || output_flush == 0) {
flb_errno();
flb_pipe_error();
continue;
}

Expand Down
14 changes: 7 additions & 7 deletions src/flb_input_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ static inline int handle_input_thread_event(flb_pipefd_t fd, struct flb_config *

bytes = flb_pipe_r(fd, &val, sizeof(val));
if (bytes == -1) {
flb_errno();
flb_pipe_error();
return -1;
}

Expand Down Expand Up @@ -426,7 +426,7 @@ static void input_thread(void *data)
/* Read the coroutine reference */
ret = flb_pipe_r(event->fd, &output_flush, sizeof(struct flb_output_flush *));
if (ret <= 0 || output_flush == 0) {
flb_errno();
flb_pipe_error();
continue;
}

Expand Down Expand Up @@ -518,7 +518,7 @@ int flb_input_thread_instance_pause(struct flb_input_instance *ins)

ret = flb_pipe_w(thi->ch_parent_events[1], &val, sizeof(val));
if (ret <= 0) {
flb_errno();
flb_pipe_error();
return -1;
}

Expand All @@ -543,7 +543,7 @@ int flb_input_thread_instance_resume(struct flb_input_instance *ins)

ret = flb_pipe_w(thi->ch_parent_events[1], &val, sizeof(val));
if (ret <= 0) {
flb_errno();
flb_pipe_error();
return -1;
}

Expand All @@ -565,7 +565,7 @@ int flb_input_thread_instance_exit(struct flb_input_instance *ins)

ret = flb_pipe_w(thi->ch_parent_events[1], &val, sizeof(val));
if (ret <= 0) {
flb_errno();
flb_pipe_error();
return -1;
}

Expand Down Expand Up @@ -731,7 +731,7 @@ int flb_input_thread_collectors_signal_start(struct flb_input_instance *ins)

ret = flb_pipe_w(thi->ch_parent_events[1], &val, sizeof(uint64_t));
if (ret <= 0) {
flb_errno();
flb_pipe_error();
return -1;
}

Expand All @@ -749,7 +749,7 @@ int flb_input_thread_collectors_signal_wait(struct flb_input_instance *ins)
thi = ins->thi;
bytes = flb_pipe_r(thi->ch_parent_events[0], &val, sizeof(uint64_t));
if (bytes <= 0) {
flb_errno();
flb_pipe_error();
return -1;
}

Expand Down
2 changes: 1 addition & 1 deletion src/flb_lib.c
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ int flb_lib_push(flb_ctx_t *ctx, int ffd, const void *data, size_t len)
else {
ret = flb_pipe_w(i_ins->channel[1], data, len);
if (ret == -1) {
flb_errno();
flb_pipe_error();
return -1;
}
}
Expand Down
25 changes: 20 additions & 5 deletions src/flb_log.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
#include <fluent-bit/flb_worker.h>
#include <fluent-bit/flb_mem.h>

#ifdef WIN32
#include <winsock.h>
#include <winbase.h>
#endif

#ifdef FLB_HAVE_AWS_ERROR_REPORTER
#include <fluent-bit/aws/flb_aws_error_reporter.h>

Expand All @@ -57,7 +62,7 @@ static inline int64_t flb_log_consume_signal(struct flb_log *context)
sizeof(signal_value));

if (result <= 0) {
flb_errno();
flb_pipe_error();

return -1;
}
Expand All @@ -75,7 +80,7 @@ static inline int flb_log_enqueue_signal(struct flb_log *context,
sizeof(signal_value));

if (result <= 0) {
flb_errno();
flb_pipe_error();

result = 1;
}
Expand Down Expand Up @@ -121,8 +126,6 @@ static inline int log_read(flb_pipefd_t fd, struct flb_log *log)
bytes = flb_pipe_read_all(fd, &msg, sizeof(struct log_message));

if (bytes <= 0) {
flb_errno();

return -1;
}
if (msg.size > sizeof(msg.msg)) {
Expand Down Expand Up @@ -742,8 +745,20 @@ int flb_errno_print(int errnum, const char *file, int line)

strerror_r(errnum, buf, sizeof(buf) - 1);
flb_error("[%s:%i errno=%i] %s", file, line, errnum, buf);
return 0;
return errnum;
}

#ifdef WIN32
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to make the body of the function or the whole function conditional?
This function is only referenced by flb_WSAGetLastError if our target is Windows so I don't think making the whole thing conditional would be a problem.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep fair assessment. Done in 7a1429c

int flb_wsa_get_last_error_print(int errnum, const char *file, int line)
{
char buf[256];
FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
NULL, errnum, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
buf, sizeof(buf), NULL);
flb_error("[%s:%i WSAGetLastError=%i] %s", file, line, errnum, buf);
return errnum;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a return value or change the return type to void

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a return value to keep it lined up with flb_errno in 7a1429c

#endif

int flb_log_destroy(struct flb_log *log, struct flb_config *config)
{
Expand Down
4 changes: 2 additions & 2 deletions src/flb_notification.c
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ int flb_notification_enqueue(int plugin_type,
sizeof(void *));

if (result == -1) {
flb_errno();
flb_pipe_error();

return -1;
}
Expand All @@ -291,7 +291,7 @@ int flb_notification_receive(flb_pipefd_t channel,
result = flb_pipe_r(channel, notification, sizeof(struct flb_notification *));

if (result <= 0) {
flb_errno();
flb_pipe_error();
return -1;;
}

Expand Down
2 changes: 1 addition & 1 deletion src/flb_output.c
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ int flb_output_task_flush(struct flb_task *task,
ret = flb_pipe_w(config->ch_self_events[1], &out_flush,
sizeof(struct flb_output_flush*));
if (ret == -1) {
flb_errno();
flb_pipe_error();
flb_output_flush_destroy(out_flush);
flb_task_users_dec(task, FLB_FALSE);

Expand Down
Loading
Loading