diff --git a/plugins/in_emitter/emitter.c b/plugins/in_emitter/emitter.c index 064415e807a..9f4518de28b 100644 --- a/plugins/in_emitter/emitter.c +++ b/plugins/in_emitter/emitter.c @@ -106,7 +106,7 @@ int static do_in_emitter_add_record(struct em_chunk *ec, ec->tag, flb_sds_len(ec->tag), ec->mp_sbuf.data, ec->mp_sbuf.size); - if (ret == -1) { + if (ret < 0) { flb_plg_error(ctx->ins, "error registering chunk with tag: %s", ec->tag); /* Release the echunk */ em_chunk_destroy(ec); diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index be5cc0467cc..796d732ae1c 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -837,8 +837,41 @@ static int input_chunk_write_header(struct cio_chunk *chunk, int event_type, return 0; } -struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, int event_type, - const char *tag, int tag_len) +static inline void errnum_set(int *errnum, int error) +{ + if (errnum) { + *errnum = error; + } +} + +static inline void errnum_set_from_errno(int *errnum) +{ + if (errno) { + errnum_set(errnum, errno); + } +} + +static inline void errnum_set_cio(int *errnum, int cio_err) +{ + switch (cio_err) { + case CIO_OK: + errnum_set(errnum, 0); + break; + case CIO_CORRUPTED: + errnum_set(errnum, EIO); + break; + case CIO_RETRY: + errnum_set(errnum, EAGAIN); + break; + default: + case CIO_ERROR: + errnum_set(errnum, EINVAL); + break; + } +} + +static struct flb_input_chunk *input_chunk_create(struct flb_input_instance *in, int event_type, + const char *tag, int tag_len, int *errnum) { int ret; int err; @@ -860,6 +893,7 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in if (!chunk) { flb_error("[input chunk] could not create chunk file: %s:%s", storage->stream->name, name); + errnum_set_cio(errnum, err); return NULL; } /* @@ -870,6 +904,7 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in if (ret == CIO_FALSE) { ret = cio_chunk_up_force(chunk); if (ret == -1) { + errnum_set(errnum, EIO); cio_chunk_close(chunk, CIO_TRUE); return NULL; } @@ -879,6 +914,7 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in /* Write chunk header */ ret = input_chunk_write_header(chunk, event_type, (char *) tag, tag_len); if (ret == -1) { + errnum_set(errnum, EIO); cio_chunk_close(chunk, CIO_TRUE); return NULL; } @@ -887,6 +923,7 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in ic = flb_calloc(1, sizeof(struct flb_input_chunk)); if (!ic) { flb_errno(); + errnum_set_from_errno(errnum); cio_chunk_close(chunk, CIO_TRUE); return NULL; } @@ -938,6 +975,12 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in return ic; } +struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, int event_type, + const char *tag, int tag_len) +{ + return input_chunk_create(in, event_type, tag, tag_len, NULL); +} + int flb_input_chunk_destroy_corrupted(struct flb_input_chunk *ic, const char *tag_buf, int tag_len, int del) @@ -1109,7 +1152,8 @@ int flb_input_chunk_destroy(struct flb_input_chunk *ic, int del) static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in, int event_type, const char *tag, int tag_len, - size_t chunk_size, int *set_down) + size_t chunk_size, int *set_down, + int *errnum) { int id = -1; int ret; @@ -1174,7 +1218,7 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in, /* No chunk was found, we need to create a new one */ if (!ic) { - ic = flb_input_chunk_create(in, event_type, (char *) tag, tag_len); + ic = input_chunk_create(in, event_type, (char *) tag, tag_len, errnum); new_chunk = FLB_TRUE; if (!ic) { return NULL; @@ -1198,6 +1242,8 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in, if (new_chunk || flb_routes_mask_is_empty(ic->routes_mask) == FLB_TRUE) { flb_input_chunk_destroy(ic, FLB_TRUE); } + /* Set the error no ENOSPC so the caller knows we have hit a storage limit. */ + errnum_set(errnum, ENOSPC); return NULL; } @@ -1466,6 +1512,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in, const void *buf, size_t buf_size) { int ret, total_records_start; + int err = 0; int set_down = FLB_FALSE; int min; int new_chunk = FLB_FALSE; @@ -1513,7 +1560,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in, if (ret != 0) { /* we could not allocate the required space, just return */ - return -1; + return -ENOMEM; } } } @@ -1522,7 +1569,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in, if (flb_input_buf_paused(in) == FLB_TRUE) { flb_debug("[input chunk] %s is paused, cannot append records", flb_input_name(in)); - return -1; + return -EAGAIN; } if (buf_size == 0) { @@ -1549,10 +1596,17 @@ static int input_chunk_append_raw(struct flb_input_instance *in, * Get a target input chunk, can be one with remaining space available * or a new one. */ - ic = input_chunk_get(in, event_type, tag, tag_len, buf_size, &set_down); + ic = input_chunk_get(in, event_type, tag, tag_len, buf_size, &set_down, &err); if (!ic) { flb_error("[input chunk] no available chunk"); - return -1; + if (err != 0) { + return -err; + } + /* fallback on returning errno if it is set. */ + else if (errno != 0) { + return -errno; + } + return -EIO; } /* newly created chunk */ @@ -1564,9 +1618,17 @@ static int input_chunk_append_raw(struct flb_input_instance *in, ret = flb_input_chunk_is_up(ic); if (ret == FLB_FALSE) { ret = cio_chunk_up_force(ic->chunk); - if (ret == -1) { + if (ret <= CIO_ERROR) { flb_error("[input chunk] cannot retrieve temporary chunk"); - return -1; + switch (ret) { + case CIO_CORRUPTED: + return -EIO; + case CIO_RETRY: + return -EAGAIN; + case CIO_ERROR: + return -ENOMEM; + } + return -EINVAL; } set_down = FLB_TRUE; } @@ -1638,6 +1700,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in, ret = flb_input_chunk_write(ic, final_data_buffer, final_data_size); + err = errno; } else { ret = 0; @@ -1661,8 +1724,10 @@ static int input_chunk_append_raw(struct flb_input_instance *in, flb_error("[input chunk] error writing data from %s instance", flb_input_name(in)); cio_chunk_tx_rollback(ic->chunk); - - return -1; + if (err) { + return -err; + } + return -EIO; } /* get the chunks content size */