Skip to content

Commit 864216c

Browse files
committed
pipe: add flb_pipe_error
On Windows, the `flb_pipe_r` and `flb_pipe_w` macros do not set errno on failure, meaning calling `flb_errno` in error scenarios is insufficient. This PR adds a new macro that will check the correct place, `WSAGetLastError`, and output a similar error message. On Linux this will still be `flb_errno`, meaning messages should work the same as they always did, but now on Windows we will get actual error messages. Signed-off-by: braydonk <[email protected]>
1 parent 38ddbef commit 864216c

20 files changed

+72
-45
lines changed

include/fluent-bit/flb_input.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -671,7 +671,7 @@ static FLB_INLINE void flb_input_return(struct flb_coro *coro) {
671671
val = FLB_BITS_U64_SET(FLB_ENGINE_IN_CORO, ins->id);
672672
n = flb_pipe_w(ins->ch_events[1], (void *) &val, sizeof(val));
673673
if (n == -1) {
674-
flb_errno();
674+
flb_pipe_error();
675675
}
676676

677677
flb_input_coro_prepare_destroy(input_coro);

include/fluent-bit/flb_log.h

+7
Original file line numberDiff line numberDiff line change
@@ -232,11 +232,18 @@ static inline int flb_log_suppress_check(int log_suppress_interval, const char *
232232
int flb_log_worker_init(struct flb_worker *worker);
233233
int flb_log_worker_destroy(struct flb_worker *worker);
234234
int flb_errno_print(int errnum, const char *file, int line);
235+
int flb_WSAGetLastError_print(int errnum, const char *file, int line);
235236

236237
#ifdef __FLB_FILENAME__
237238
#define flb_errno() flb_errno_print(errno, __FLB_FILENAME__, __LINE__)
239+
#ifdef WIN32
240+
#define flb_WSAGetLastError() flb_WSAGetLastError_print(WSAGetLastError(), __FLB_FILENAME__, __LINE__)
241+
#endif
238242
#else
239243
#define flb_errno() flb_errno_print(errno, __FILE__, __LINE__)
244+
#ifdef WIN32
245+
#define flb_WSAGetLastError() flb_WSAGetLastError_print(WSAGetLastError(), __FILE__, __LINE__)
246+
#endif
240247
#endif
241248

242249
#endif

include/fluent-bit/flb_output.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -1199,7 +1199,7 @@ static inline void flb_output_return(int ret, struct flb_coro *co) {
11991199
/* Notify the event loop about our return status */
12001200
n = flb_pipe_w(pipe_fd, (void *) &val, sizeof(val));
12011201
if (n == -1) {
1202-
flb_errno();
1202+
flb_pipe_error();
12031203
}
12041204

12051205
/*

include/fluent-bit/flb_pipe.h

+3
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,14 @@
2828
#define flb_sockfd_t evutil_socket_t
2929
#define flb_pipe_w(fd, buf, len) send(fd, buf, len, 0)
3030
#define flb_pipe_r(fd, buf, len) recv(fd, buf, len, 0)
31+
#define flb_pipe_error() flb_WSAGetLastError()
3132
#define FLB_PIPE_WOULDBLOCK() (WSAGetLastError() == WSAEWOULDBLOCK)
3233
#else
3334
#define flb_pipefd_t int
3435
#define flb_sockfd_t int
3536
#define flb_pipe_w(fd, buf, len) write(fd, buf, len)
3637
#define flb_pipe_r(fd, buf, len) read(fd, buf, len)
38+
#define flb_pipe_error() flb_errno()
3739
#define FLB_PIPE_WOULDBLOCK() (errno == EAGAIN || errno == EWOULDBLOCK)
3840
#endif
3941

@@ -43,5 +45,6 @@ int flb_pipe_close(flb_pipefd_t fd);
4345
int flb_pipe_set_nonblocking(flb_pipefd_t fd);
4446
ssize_t flb_pipe_read_all(int fd, void *buf, size_t count);
4547
ssize_t flb_pipe_write_all(int fd, const void *buf, size_t count);
48+
void flb_pipe_log_last_error();
4649

4750
#endif

include/fluent-bit/flb_scheduler.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ static FLB_INLINE void flb_sched_timer_cb_coro_return()
234234
val = FLB_BITS_U64_SET(FLB_SCHED_TIMER_CORO_RETURN, stc->id);
235235
n = flb_pipe_w(sched->ch_events[1], &val, sizeof(val));
236236
if (n == -1) {
237-
flb_errno();
237+
flb_pipe_error();
238238
}
239239

240240
flb_coro_yield(coro, FLB_TRUE);

plugins/in_exec/in_exec.c

+3-3
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ static int in_exec_collect(struct flb_input_instance *ins,
5454
if (ctx->oneshot == FLB_TRUE) {
5555
ret = flb_pipe_r(ctx->ch_manager[0], &val, sizeof(val));
5656
if (ret == -1) {
57-
flb_errno();
57+
flb_pipe_error();
5858
return -1;
5959
}
6060
}
@@ -256,7 +256,7 @@ static int in_exec_config_read(struct flb_exec *ctx,
256256
flb_plg_error(in, "unable to load configuration");
257257
return -1;
258258
}
259-
259+
260260
/* filepath setting */
261261
if (ctx->cmd == NULL) {
262262
flb_plg_error(in, "no input 'command' was given");
@@ -418,7 +418,7 @@ static int in_exec_prerun(struct flb_input_instance *ins,
418418
/* Kick the oneshot execution */
419419
ret = flb_pipe_w(ctx->ch_manager[1], &val, sizeof(val));
420420
if (ret == -1) {
421-
flb_errno();
421+
flb_pipe_error();
422422
return -1;
423423
}
424424
return 0;

plugins/in_exec_wasi/in_exec_wasi.c

+2-2
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ static int in_exec_wasi_collect(struct flb_input_instance *ins,
6969
ret = flb_pipe_r(ctx->ch_manager[0], &val, sizeof(val));
7070
if (ret == -1) {
7171
fclose(stdoutp);
72-
flb_errno();
72+
flb_pipe_error();
7373
return -1;
7474
}
7575
}
@@ -404,7 +404,7 @@ static int in_exec_wasi_prerun(struct flb_input_instance *ins,
404404
/* Kick the oneshot execution */
405405
ret = flb_pipe_w(ctx->ch_manager[1], &val, sizeof(val));
406406
if (ret == -1) {
407-
flb_errno();
407+
flb_pipe_error();
408408
return -1;
409409
}
410410
return 0;

plugins/in_lib/in_lib.c

+1
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ static int in_lib_collect(struct flb_input_instance *ins,
6969
flb_plg_trace(ctx->ins, "in_lib read() = %i", bytes);
7070
if (bytes == -1) {
7171
perror("read");
72+
flb_pipe_error();
7273
if (errno == -EPIPE) {
7374
return -1;
7475
}

plugins/in_tail/tail.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ static inline int consume_byte(flb_pipefd_t fd)
5050
/* We need to consume the byte */
5151
ret = flb_pipe_r(fd, (char *) &val, sizeof(val));
5252
if (ret <= 0) {
53-
flb_errno();
53+
flb_pipe_error();
5454
return -1;
5555
}
5656

plugins/in_tail/tail_signal.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ static inline int tail_signal_manager(struct flb_tail_config *ctx)
4545
/* Insert a dummy event into the channel manager */
4646
n = flb_pipe_w(ctx->ch_manager[1], (const char *) &val, sizeof(val));
4747
if (n == -1) {
48-
flb_errno();
48+
flb_pipe_error();
4949
return -1;
5050
}
5151
else {
@@ -68,7 +68,7 @@ static inline int tail_signal_pending(struct flb_tail_config *ctx)
6868
* notification is already pending, it's safe to ignore.
6969
*/
7070
if (n == -1 && !FLB_PIPE_WOULDBLOCK()) {
71-
flb_errno();
71+
flb_pipe_error();
7272
return -1;
7373
}
7474

@@ -87,7 +87,7 @@ static inline int tail_consume_pending(struct flb_tail_config *ctx)
8787
do {
8888
ret = flb_pipe_r(ctx->ch_pending[0], (char *) &val, sizeof(val));
8989
if (ret <= 0 && !FLB_PIPE_WOULDBLOCK()) {
90-
flb_errno();
90+
flb_pipe_error();
9191
return -1;
9292
}
9393
} while (!FLB_PIPE_WOULDBLOCK());

src/aws/flb_aws_credentials_process.c

+4-4
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ static int read_until_block(char* name, flb_pipefd_t fd, struct readbuf* buf)
369369
if (FLB_PIPE_WOULDBLOCK()) {
370370
return 1;
371371
}
372-
flb_errno();
372+
flb_pipe_error();
373373
return -1;
374374
}
375375
else if (result == 0) { /* EOF */
@@ -481,7 +481,7 @@ static void exec_process_child(struct process* p)
481481
{
482482
while ((dup2(p->stdin_stream, STDIN_FILENO) < 0)) {
483483
if (errno != EINTR) {
484-
return;
484+
return;
485485
}
486486
}
487487
while ((dup2(p->stdout_stream[1], STDOUT_FILENO) < 0)) {
@@ -491,7 +491,7 @@ static void exec_process_child(struct process* p)
491491
}
492492
while ((dup2(p->stderr_stream, STDERR_FILENO) < 0)) {
493493
if (errno != EINTR) {
494-
return;
494+
return;
495495
}
496496
}
497497

@@ -558,7 +558,7 @@ static int read_from_process(struct process* p, struct readbuf* buf)
558558
return -1;
559559
}
560560

561-
flb_time_set(&timeout,
561+
flb_time_set(&timeout,
562562
(time_t) (CREDENTIAL_PROCESS_TIMEOUT_MS / MS_PER_SEC),
563563
((long) (CREDENTIAL_PROCESS_TIMEOUT_MS % MS_PER_SEC)) * NS_PER_MS);
564564

src/flb_engine.c

+4-4
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ static inline int handle_input_event(flb_pipefd_t fd, uint64_t ts,
190190

191191
bytes = flb_pipe_r(fd, &val, sizeof(val));
192192
if (bytes == -1) {
193-
flb_errno();
193+
flb_pipe_error();
194194
return -1;
195195
}
196196

@@ -484,7 +484,7 @@ static inline int handle_output_events(flb_pipefd_t fd,
484484
bytes = flb_pipe_r(fd, &values, sizeof(values));
485485

486486
if (bytes == -1) {
487-
flb_errno();
487+
flb_pipe_error();
488488
return -1;
489489
}
490490

@@ -525,7 +525,7 @@ static inline int flb_engine_manager(flb_pipefd_t fd, struct flb_config *config)
525525
/* read the event */
526526
bytes = flb_pipe_r(fd, &val, sizeof(val));
527527
if (bytes == -1) {
528-
flb_errno();
528+
flb_pipe_error();
529529
return -1;
530530
}
531531

@@ -1049,7 +1049,7 @@ int flb_engine_start(struct flb_config *config)
10491049
/* Read the coroutine reference */
10501050
ret = flb_pipe_r(event->fd, &output_flush, sizeof(struct flb_output_flush *));
10511051
if (ret <= 0 || output_flush == 0) {
1052-
flb_errno();
1052+
flb_pipe_error();
10531053
continue;
10541054
}
10551055

src/flb_input_thread.c

+7-7
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ static inline int handle_input_thread_event(flb_pipefd_t fd, struct flb_config *
105105

106106
bytes = flb_pipe_r(fd, &val, sizeof(val));
107107
if (bytes == -1) {
108-
flb_errno();
108+
flb_pipe_error();
109109
return -1;
110110
}
111111

@@ -426,7 +426,7 @@ static void input_thread(void *data)
426426
/* Read the coroutine reference */
427427
ret = flb_pipe_r(event->fd, &output_flush, sizeof(struct flb_output_flush *));
428428
if (ret <= 0 || output_flush == 0) {
429-
flb_errno();
429+
flb_pipe_error();
430430
continue;
431431
}
432432

@@ -518,7 +518,7 @@ int flb_input_thread_instance_pause(struct flb_input_instance *ins)
518518

519519
ret = flb_pipe_w(thi->ch_parent_events[1], &val, sizeof(val));
520520
if (ret <= 0) {
521-
flb_errno();
521+
flb_pipe_error();
522522
return -1;
523523
}
524524

@@ -543,7 +543,7 @@ int flb_input_thread_instance_resume(struct flb_input_instance *ins)
543543

544544
ret = flb_pipe_w(thi->ch_parent_events[1], &val, sizeof(val));
545545
if (ret <= 0) {
546-
flb_errno();
546+
flb_pipe_error();
547547
return -1;
548548
}
549549

@@ -565,7 +565,7 @@ int flb_input_thread_instance_exit(struct flb_input_instance *ins)
565565

566566
ret = flb_pipe_w(thi->ch_parent_events[1], &val, sizeof(val));
567567
if (ret <= 0) {
568-
flb_errno();
568+
flb_pipe_error();
569569
return -1;
570570
}
571571

@@ -731,7 +731,7 @@ int flb_input_thread_collectors_signal_start(struct flb_input_instance *ins)
731731

732732
ret = flb_pipe_w(thi->ch_parent_events[1], &val, sizeof(uint64_t));
733733
if (ret <= 0) {
734-
flb_errno();
734+
flb_pipe_error();
735735
return -1;
736736
}
737737

@@ -749,7 +749,7 @@ int flb_input_thread_collectors_signal_wait(struct flb_input_instance *ins)
749749
thi = ins->thi;
750750
bytes = flb_pipe_r(thi->ch_parent_events[0], &val, sizeof(uint64_t));
751751
if (bytes <= 0) {
752-
flb_errno();
752+
flb_pipe_error();
753753
return -1;
754754
}
755755

src/flb_lib.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -793,7 +793,7 @@ int flb_lib_push(flb_ctx_t *ctx, int ffd, const void *data, size_t len)
793793
else {
794794
ret = flb_pipe_w(i_ins->channel[1], data, len);
795795
if (ret == -1) {
796-
flb_errno();
796+
flb_pipe_error();
797797
return -1;
798798
}
799799
}

src/flb_log.c

+19-3
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@
3333
#include <fluent-bit/flb_worker.h>
3434
#include <fluent-bit/flb_mem.h>
3535

36+
#ifdef WIN32
37+
#include <winsock.h>
38+
#include <winbase.h>
39+
#endif
40+
3641
#ifdef FLB_HAVE_AWS_ERROR_REPORTER
3742
#include <fluent-bit/aws/flb_aws_error_reporter.h>
3843

@@ -57,7 +62,7 @@ static inline int64_t flb_log_consume_signal(struct flb_log *context)
5762
sizeof(signal_value));
5863

5964
if (result <= 0) {
60-
flb_errno();
65+
flb_pipe_error();
6166

6267
return -1;
6368
}
@@ -75,7 +80,7 @@ static inline int flb_log_enqueue_signal(struct flb_log *context,
7580
sizeof(signal_value));
7681

7782
if (result <= 0) {
78-
flb_errno();
83+
flb_pipe_error();
7984

8085
result = 1;
8186
}
@@ -121,7 +126,7 @@ static inline int log_read(flb_pipefd_t fd, struct flb_log *log)
121126
bytes = flb_pipe_read_all(fd, &msg, sizeof(struct log_message));
122127

123128
if (bytes <= 0) {
124-
flb_errno();
129+
flb_pipe_error();
125130

126131
return -1;
127132
}
@@ -745,6 +750,17 @@ int flb_errno_print(int errnum, const char *file, int line)
745750
return 0;
746751
}
747752

753+
int flb_WSAGetLastError_print(int errnum, const char *file, int line)
754+
{
755+
#ifdef WIN32
756+
char buf[256];
757+
FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
758+
NULL, errnum, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
759+
buf, sizeof(buf), NULL);
760+
flb_error("[%s:%i WSAGetLastError=%i] %s", file, line, errnum, buf);
761+
#endif
762+
}
763+
748764
int flb_log_destroy(struct flb_log *log, struct flb_config *config)
749765
{
750766
/* Signal the child worker, stop working */

src/flb_notification.c

+2-2
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ int flb_notification_enqueue(int plugin_type,
275275
sizeof(void *));
276276

277277
if (result == -1) {
278-
flb_errno();
278+
flb_pipe_error();
279279

280280
return -1;
281281
}
@@ -291,7 +291,7 @@ int flb_notification_receive(flb_pipefd_t channel,
291291
result = flb_pipe_r(channel, notification, sizeof(struct flb_notification *));
292292

293293
if (result <= 0) {
294-
flb_errno();
294+
flb_pipe_error();
295295
return -1;;
296296
}
297297

src/flb_output.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ int flb_output_task_flush(struct flb_task *task,
348348
ret = flb_pipe_w(config->ch_self_events[1], &out_flush,
349349
sizeof(struct flb_output_flush*));
350350
if (ret == -1) {
351-
flb_errno();
351+
flb_pipe_error();
352352
flb_output_flush_destroy(out_flush);
353353
flb_task_users_dec(task, FLB_FALSE);
354354

0 commit comments

Comments
 (0)