diff --git a/include/fluent-bit/flb_input.h b/include/fluent-bit/flb_input.h index 7708fda1947..e15c4a58cba 100644 --- a/include/fluent-bit/flb_input.h +++ b/include/fluent-bit/flb_input.h @@ -671,7 +671,7 @@ static FLB_INLINE void flb_input_return(struct flb_coro *coro) { val = FLB_BITS_U64_SET(FLB_ENGINE_IN_CORO, ins->id); n = flb_pipe_w(ins->ch_events[1], (void *) &val, sizeof(val)); if (n == -1) { - flb_errno(); + flb_pipe_error(); } flb_input_coro_prepare_destroy(input_coro); diff --git a/include/fluent-bit/flb_log.h b/include/fluent-bit/flb_log.h index 96cbf68326e..32647824e9e 100644 --- a/include/fluent-bit/flb_log.h +++ b/include/fluent-bit/flb_log.h @@ -232,11 +232,20 @@ static inline int flb_log_suppress_check(int log_suppress_interval, const char * int flb_log_worker_init(struct flb_worker *worker); int flb_log_worker_destroy(struct flb_worker *worker); int flb_errno_print(int errnum, const char *file, int line); +#ifdef WIN32 +int flb_wsa_get_last_error_print(int errnum, const char *file, int line); +#endif #ifdef __FLB_FILENAME__ #define flb_errno() flb_errno_print(errno, __FLB_FILENAME__, __LINE__) +#ifdef WIN32 +#define flb_wsa_get_last_error() flb_wsa_get_last_error_print(WSAGetLastError(), __FLB_FILENAME__, __LINE__) +#endif #else #define flb_errno() flb_errno_print(errno, __FILE__, __LINE__) +#ifdef WIN32 +#define flb_wsa_get_last_error() flb_wsa_get_last_error_print(WSAGetLastError(), __FILE__, __LINE__) +#endif #endif #endif diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h index 46409a27292..83c84cc1c4f 100644 --- a/include/fluent-bit/flb_output.h +++ b/include/fluent-bit/flb_output.h @@ -1199,7 +1199,7 @@ static inline void flb_output_return(int ret, struct flb_coro *co) { /* Notify the event loop about our return status */ n = flb_pipe_w(pipe_fd, (void *) &val, sizeof(val)); if (n == -1) { - flb_errno(); + flb_pipe_error(); } /* diff --git a/include/fluent-bit/flb_pipe.h b/include/fluent-bit/flb_pipe.h index 0c46ddcb670..a39aa852f9a 100644 --- a/include/fluent-bit/flb_pipe.h +++ b/include/fluent-bit/flb_pipe.h @@ -28,12 +28,14 @@ #define flb_sockfd_t evutil_socket_t #define flb_pipe_w(fd, buf, len) send(fd, buf, len, 0) #define flb_pipe_r(fd, buf, len) recv(fd, buf, len, 0) +#define flb_pipe_error() flb_wsa_get_last_error() #define FLB_PIPE_WOULDBLOCK() (WSAGetLastError() == WSAEWOULDBLOCK) #else #define flb_pipefd_t int #define flb_sockfd_t int #define flb_pipe_w(fd, buf, len) write(fd, buf, len) #define flb_pipe_r(fd, buf, len) read(fd, buf, len) +#define flb_pipe_error() flb_errno() #define FLB_PIPE_WOULDBLOCK() (errno == EAGAIN || errno == EWOULDBLOCK) #endif @@ -43,5 +45,6 @@ int flb_pipe_close(flb_pipefd_t fd); int flb_pipe_set_nonblocking(flb_pipefd_t fd); ssize_t flb_pipe_read_all(int fd, void *buf, size_t count); ssize_t flb_pipe_write_all(int fd, const void *buf, size_t count); +void flb_pipe_log_last_error(); #endif diff --git a/include/fluent-bit/flb_scheduler.h b/include/fluent-bit/flb_scheduler.h index 891d35195f3..11b526d350f 100644 --- a/include/fluent-bit/flb_scheduler.h +++ b/include/fluent-bit/flb_scheduler.h @@ -234,7 +234,7 @@ static FLB_INLINE void flb_sched_timer_cb_coro_return() val = FLB_BITS_U64_SET(FLB_SCHED_TIMER_CORO_RETURN, stc->id); n = flb_pipe_w(sched->ch_events[1], &val, sizeof(val)); if (n == -1) { - flb_errno(); + flb_pipe_error(); } flb_coro_yield(coro, FLB_TRUE); diff --git a/plugins/in_exec/in_exec.c b/plugins/in_exec/in_exec.c index f59549fcb0e..4b37b87bb1c 100644 --- a/plugins/in_exec/in_exec.c +++ b/plugins/in_exec/in_exec.c @@ -54,7 +54,7 @@ static int in_exec_collect(struct flb_input_instance *ins, if (ctx->oneshot == FLB_TRUE) { ret = flb_pipe_r(ctx->ch_manager[0], &val, sizeof(val)); if (ret == -1) { - flb_errno(); + flb_pipe_error(); return -1; } } @@ -256,7 +256,7 @@ static int in_exec_config_read(struct flb_exec *ctx, flb_plg_error(in, "unable to load configuration"); return -1; } - + /* filepath setting */ if (ctx->cmd == NULL) { flb_plg_error(in, "no input 'command' was given"); @@ -418,7 +418,7 @@ static int in_exec_prerun(struct flb_input_instance *ins, /* Kick the oneshot execution */ ret = flb_pipe_w(ctx->ch_manager[1], &val, sizeof(val)); if (ret == -1) { - flb_errno(); + flb_pipe_error(); return -1; } return 0; diff --git a/plugins/in_exec_wasi/in_exec_wasi.c b/plugins/in_exec_wasi/in_exec_wasi.c index 4da6e85b26a..d10f763e176 100644 --- a/plugins/in_exec_wasi/in_exec_wasi.c +++ b/plugins/in_exec_wasi/in_exec_wasi.c @@ -69,7 +69,7 @@ static int in_exec_wasi_collect(struct flb_input_instance *ins, ret = flb_pipe_r(ctx->ch_manager[0], &val, sizeof(val)); if (ret == -1) { fclose(stdoutp); - flb_errno(); + flb_pipe_error(); return -1; } } @@ -404,7 +404,7 @@ static int in_exec_wasi_prerun(struct flb_input_instance *ins, /* Kick the oneshot execution */ ret = flb_pipe_w(ctx->ch_manager[1], &val, sizeof(val)); if (ret == -1) { - flb_errno(); + flb_pipe_error(); return -1; } return 0; diff --git a/plugins/in_lib/in_lib.c b/plugins/in_lib/in_lib.c index d1065a8c75c..71eafe82616 100644 --- a/plugins/in_lib/in_lib.c +++ b/plugins/in_lib/in_lib.c @@ -69,6 +69,7 @@ static int in_lib_collect(struct flb_input_instance *ins, flb_plg_trace(ctx->ins, "in_lib read() = %i", bytes); if (bytes == -1) { perror("read"); + flb_pipe_error(); if (errno == -EPIPE) { return -1; } diff --git a/plugins/in_tail/tail.c b/plugins/in_tail/tail.c index c27d7588325..6fac8d6032c 100644 --- a/plugins/in_tail/tail.c +++ b/plugins/in_tail/tail.c @@ -50,7 +50,7 @@ static inline int consume_byte(flb_pipefd_t fd) /* We need to consume the byte */ ret = flb_pipe_r(fd, (char *) &val, sizeof(val)); if (ret <= 0) { - flb_errno(); + flb_pipe_error(); return -1; } diff --git a/plugins/in_tail/tail_signal.h b/plugins/in_tail/tail_signal.h index 8d7472f8e8a..8dc3165dbfb 100644 --- a/plugins/in_tail/tail_signal.h +++ b/plugins/in_tail/tail_signal.h @@ -45,7 +45,7 @@ static inline int tail_signal_manager(struct flb_tail_config *ctx) /* Insert a dummy event into the channel manager */ n = flb_pipe_w(ctx->ch_manager[1], (const char *) &val, sizeof(val)); if (n == -1) { - flb_errno(); + flb_pipe_error(); return -1; } else { @@ -68,7 +68,7 @@ static inline int tail_signal_pending(struct flb_tail_config *ctx) * notification is already pending, it's safe to ignore. */ if (n == -1 && !FLB_PIPE_WOULDBLOCK()) { - flb_errno(); + flb_pipe_error(); return -1; } @@ -87,7 +87,7 @@ static inline int tail_consume_pending(struct flb_tail_config *ctx) do { ret = flb_pipe_r(ctx->ch_pending[0], (char *) &val, sizeof(val)); if (ret <= 0 && !FLB_PIPE_WOULDBLOCK()) { - flb_errno(); + flb_pipe_error(); return -1; } } while (!FLB_PIPE_WOULDBLOCK()); diff --git a/src/aws/flb_aws_credentials_process.c b/src/aws/flb_aws_credentials_process.c index 44c024ca776..4eaa30fe850 100644 --- a/src/aws/flb_aws_credentials_process.c +++ b/src/aws/flb_aws_credentials_process.c @@ -369,7 +369,7 @@ static int read_until_block(char* name, flb_pipefd_t fd, struct readbuf* buf) if (FLB_PIPE_WOULDBLOCK()) { return 1; } - flb_errno(); + flb_pipe_error(); return -1; } else if (result == 0) { /* EOF */ @@ -481,7 +481,7 @@ static void exec_process_child(struct process* p) { while ((dup2(p->stdin_stream, STDIN_FILENO) < 0)) { if (errno != EINTR) { - return; + return; } } while ((dup2(p->stdout_stream[1], STDOUT_FILENO) < 0)) { @@ -491,7 +491,7 @@ static void exec_process_child(struct process* p) } while ((dup2(p->stderr_stream, STDERR_FILENO) < 0)) { if (errno != EINTR) { - return; + return; } } @@ -558,7 +558,7 @@ static int read_from_process(struct process* p, struct readbuf* buf) return -1; } - flb_time_set(&timeout, + flb_time_set(&timeout, (time_t) (CREDENTIAL_PROCESS_TIMEOUT_MS / MS_PER_SEC), ((long) (CREDENTIAL_PROCESS_TIMEOUT_MS % MS_PER_SEC)) * NS_PER_MS); diff --git a/src/flb_engine.c b/src/flb_engine.c index bedc28477c5..cc153370f20 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -190,7 +190,7 @@ static inline int handle_input_event(flb_pipefd_t fd, uint64_t ts, bytes = flb_pipe_r(fd, &val, sizeof(val)); if (bytes == -1) { - flb_errno(); + flb_pipe_error(); return -1; } @@ -484,7 +484,7 @@ static inline int handle_output_events(flb_pipefd_t fd, bytes = flb_pipe_r(fd, &values, sizeof(values)); if (bytes == -1) { - flb_errno(); + flb_pipe_error(); return -1; } @@ -525,7 +525,7 @@ static inline int flb_engine_manager(flb_pipefd_t fd, struct flb_config *config) /* read the event */ bytes = flb_pipe_r(fd, &val, sizeof(val)); if (bytes == -1) { - flb_errno(); + flb_pipe_error(); return -1; } @@ -1049,7 +1049,7 @@ int flb_engine_start(struct flb_config *config) /* Read the coroutine reference */ ret = flb_pipe_r(event->fd, &output_flush, sizeof(struct flb_output_flush *)); if (ret <= 0 || output_flush == 0) { - flb_errno(); + flb_pipe_error(); continue; } diff --git a/src/flb_input_thread.c b/src/flb_input_thread.c index 52966121759..053a56e2b03 100644 --- a/src/flb_input_thread.c +++ b/src/flb_input_thread.c @@ -105,7 +105,7 @@ static inline int handle_input_thread_event(flb_pipefd_t fd, struct flb_config * bytes = flb_pipe_r(fd, &val, sizeof(val)); if (bytes == -1) { - flb_errno(); + flb_pipe_error(); return -1; } @@ -426,7 +426,7 @@ static void input_thread(void *data) /* Read the coroutine reference */ ret = flb_pipe_r(event->fd, &output_flush, sizeof(struct flb_output_flush *)); if (ret <= 0 || output_flush == 0) { - flb_errno(); + flb_pipe_error(); continue; } @@ -518,7 +518,7 @@ int flb_input_thread_instance_pause(struct flb_input_instance *ins) ret = flb_pipe_w(thi->ch_parent_events[1], &val, sizeof(val)); if (ret <= 0) { - flb_errno(); + flb_pipe_error(); return -1; } @@ -543,7 +543,7 @@ int flb_input_thread_instance_resume(struct flb_input_instance *ins) ret = flb_pipe_w(thi->ch_parent_events[1], &val, sizeof(val)); if (ret <= 0) { - flb_errno(); + flb_pipe_error(); return -1; } @@ -565,7 +565,7 @@ int flb_input_thread_instance_exit(struct flb_input_instance *ins) ret = flb_pipe_w(thi->ch_parent_events[1], &val, sizeof(val)); if (ret <= 0) { - flb_errno(); + flb_pipe_error(); return -1; } @@ -731,7 +731,7 @@ int flb_input_thread_collectors_signal_start(struct flb_input_instance *ins) ret = flb_pipe_w(thi->ch_parent_events[1], &val, sizeof(uint64_t)); if (ret <= 0) { - flb_errno(); + flb_pipe_error(); return -1; } @@ -749,7 +749,7 @@ int flb_input_thread_collectors_signal_wait(struct flb_input_instance *ins) thi = ins->thi; bytes = flb_pipe_r(thi->ch_parent_events[0], &val, sizeof(uint64_t)); if (bytes <= 0) { - flb_errno(); + flb_pipe_error(); return -1; } diff --git a/src/flb_lib.c b/src/flb_lib.c index 0e4cde0dbaf..30d4d99dd40 100644 --- a/src/flb_lib.c +++ b/src/flb_lib.c @@ -793,7 +793,7 @@ int flb_lib_push(flb_ctx_t *ctx, int ffd, const void *data, size_t len) else { ret = flb_pipe_w(i_ins->channel[1], data, len); if (ret == -1) { - flb_errno(); + flb_pipe_error(); return -1; } } diff --git a/src/flb_log.c b/src/flb_log.c index c5c73d40ad0..851793216d4 100644 --- a/src/flb_log.c +++ b/src/flb_log.c @@ -33,6 +33,11 @@ #include #include +#ifdef WIN32 +#include +#include +#endif + #ifdef FLB_HAVE_AWS_ERROR_REPORTER #include @@ -57,7 +62,7 @@ static inline int64_t flb_log_consume_signal(struct flb_log *context) sizeof(signal_value)); if (result <= 0) { - flb_errno(); + flb_pipe_error(); return -1; } @@ -75,7 +80,7 @@ static inline int flb_log_enqueue_signal(struct flb_log *context, sizeof(signal_value)); if (result <= 0) { - flb_errno(); + flb_pipe_error(); result = 1; } @@ -121,8 +126,6 @@ static inline int log_read(flb_pipefd_t fd, struct flb_log *log) bytes = flb_pipe_read_all(fd, &msg, sizeof(struct log_message)); if (bytes <= 0) { - flb_errno(); - return -1; } if (msg.size > sizeof(msg.msg)) { @@ -742,8 +745,20 @@ int flb_errno_print(int errnum, const char *file, int line) strerror_r(errnum, buf, sizeof(buf) - 1); flb_error("[%s:%i errno=%i] %s", file, line, errnum, buf); - return 0; + return errnum; +} + +#ifdef WIN32 +int flb_wsa_get_last_error_print(int errnum, const char *file, int line) +{ + char buf[256]; + FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, errnum, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + buf, sizeof(buf), NULL); + flb_error("[%s:%i WSAGetLastError=%i] %s", file, line, errnum, buf); + return errnum; } +#endif int flb_log_destroy(struct flb_log *log, struct flb_config *config) { diff --git a/src/flb_notification.c b/src/flb_notification.c index b6e575c4974..6021f7d2326 100644 --- a/src/flb_notification.c +++ b/src/flb_notification.c @@ -275,7 +275,7 @@ int flb_notification_enqueue(int plugin_type, sizeof(void *)); if (result == -1) { - flb_errno(); + flb_pipe_error(); return -1; } @@ -291,7 +291,7 @@ int flb_notification_receive(flb_pipefd_t channel, result = flb_pipe_r(channel, notification, sizeof(struct flb_notification *)); if (result <= 0) { - flb_errno(); + flb_pipe_error(); return -1;; } diff --git a/src/flb_output.c b/src/flb_output.c index 288fc9dbb1f..cb308b4670c 100644 --- a/src/flb_output.c +++ b/src/flb_output.c @@ -348,7 +348,7 @@ int flb_output_task_flush(struct flb_task *task, ret = flb_pipe_w(config->ch_self_events[1], &out_flush, sizeof(struct flb_output_flush*)); if (ret == -1) { - flb_errno(); + flb_pipe_error(); flb_output_flush_destroy(out_flush); flb_task_users_dec(task, FLB_FALSE); diff --git a/src/flb_output_thread.c b/src/flb_output_thread.c index 7e094551b62..bb146665b0c 100644 --- a/src/flb_output_thread.c +++ b/src/flb_output_thread.c @@ -71,7 +71,7 @@ static inline int handle_output_event(struct flb_config *config, bytes = flb_pipe_r(fd, &val, sizeof(val)); if (bytes == -1) { - flb_errno(); + flb_pipe_error(); return -1; } @@ -97,7 +97,7 @@ static inline int handle_output_event(struct flb_config *config, */ ret = flb_pipe_w(ch_parent, &val, sizeof(val)); if (ret == -1) { - flb_errno(); + flb_pipe_error(); return -1; } @@ -289,7 +289,7 @@ static void output_thread(void *data) /* Read the task reference */ n = flb_pipe_r(event->fd, &task, sizeof(struct flb_task *)); if (n <= 0) { - flb_errno(); + flb_pipe_error(); continue; } /* @@ -450,7 +450,7 @@ int flb_output_thread_pool_flush(struct flb_task *task, n = flb_pipe_w(th_ins->ch_parent_events[1], &task, sizeof(struct flb_task*)); if (n == -1) { - flb_errno(); + flb_pipe_error(); return -1; } @@ -631,7 +631,7 @@ void flb_output_thread_pool_destroy(struct flb_output_instance *ins) th_ins = th->params.data; n = flb_pipe_w(th_ins->ch_parent_events[1], &stop, sizeof(stop)); if (n < 0) { - flb_errno(); + flb_pipe_error(); flb_plg_error(th_ins->ins, "could not signal worker thread"); flb_free(th_ins); continue; diff --git a/src/flb_pipe.c b/src/flb_pipe.c index 4175a24dfc6..b1203852909 100644 --- a/src/flb_pipe.c +++ b/src/flb_pipe.c @@ -96,7 +96,7 @@ void flb_pipe_destroy(flb_pipefd_t pipefd[2]) int flb_pipe_close(flb_pipefd_t fd) { - /* + /* * when chunk file is destroyed, the fd for file will be -1, we should avoid * deleting chunk file with fd -1 */ @@ -140,7 +140,7 @@ ssize_t flb_pipe_read_all(int fd, void *buf, size_t count) } else if (bytes == 0) { /* Broken pipe ? */ - flb_errno(); + flb_pipe_error(); return -1; } total += bytes; @@ -172,7 +172,7 @@ ssize_t flb_pipe_write_all(int fd, const void *buf, size_t count) } else if (bytes == 0) { /* Broken pipe ? */ - flb_errno(); + flb_pipe_error(); return -1; } total += bytes; @@ -180,4 +180,4 @@ ssize_t flb_pipe_write_all(int fd, const void *buf, size_t count) } while (total < count); return total; -} +} \ No newline at end of file diff --git a/src/flb_utils.c b/src/flb_utils.c index 55b5bf6cae8..1ad48bee872 100644 --- a/src/flb_utils.c +++ b/src/flb_utils.c @@ -496,7 +496,7 @@ int flb_utils_timer_consume(flb_pipefd_t fd) ret = flb_pipe_r(fd, &val, sizeof(val)); if (ret == -1) { - flb_errno(); + flb_pipe_error(); return -1; } @@ -517,7 +517,7 @@ int flb_utils_pipe_byte_consume(flb_pipefd_t fd) ret = flb_pipe_r(fd, &val, sizeof(val)); if (ret == -1) { - flb_errno(); + flb_pipe_error(); return -1; } diff --git a/x.conf b/x.conf new file mode 100644 index 00000000000..09fe4092f7f --- /dev/null +++ b/x.conf @@ -0,0 +1,6 @@ +[INPUT] + Name dummy + +[OUTPUT] + Name es + Match *