Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_http: improved http status codes #9920

Open
wants to merge 4 commits into
base: pwhelan-ret-values-input-chunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion plugins/in_emitter/emitter.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ int static do_in_emitter_add_record(struct em_chunk *ec,
ec->tag, flb_sds_len(ec->tag),
ec->mp_sbuf.data,
ec->mp_sbuf.size);
if (ret == -1) {
if (ret < 0) {
flb_plg_error(ctx->ins, "error registering chunk with tag: %s", ec->tag);
/* Release the echunk */
em_chunk_destroy(ec);
Expand Down Expand Up @@ -237,6 +237,9 @@ static int in_emitter_ingest_ring_buffer(struct flb_input_instance *in,
flb_sds_destroy(ec.tag);
msgpack_sbuffer_destroy(&ec.mp_sbuf);
}
if (ret < 0) {
return -1;
}
return ret;
}

Expand Down
81 changes: 63 additions & 18 deletions plugins/in_http/http_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -219,25 +219,25 @@ static int process_pack_record(struct flb_http *ctx, struct flb_time *tm,

ret = flb_log_event_encoder_begin_record(&ctx->log_encoder);
if (ret != FLB_EVENT_ENCODER_SUCCESS) {
return -1;
return ret;
}

ret = flb_log_event_encoder_set_timestamp(&ctx->log_encoder, tm);
if (ret != FLB_EVENT_ENCODER_SUCCESS) {
return -1;
return ret;
}

ret = flb_log_event_encoder_set_body_from_msgpack_object(
&ctx->log_encoder,
record);
if (ret != FLB_EVENT_ENCODER_SUCCESS) {
return -1;
return ret;
}

ret = flb_log_event_encoder_commit_record(&ctx->log_encoder);

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
return -1;
return ret;
}

if (tag) {
Expand All @@ -254,11 +254,7 @@ static int process_pack_record(struct flb_http *ctx, struct flb_time *tm,
ctx->log_encoder.output_length);
}

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
return -1;
}

return 0;
return ret;
}

int process_pack(struct flb_http *ctx, flb_sds_t tag, char *buf, size_t size)
Expand Down Expand Up @@ -510,6 +506,32 @@ static ssize_t parse_payload_urlencoded(struct flb_http *ctx, flb_sds_t tag,
return ret;
}

/// ERROR/HTTP: Missing Content TYpe
#define EHNOTYPE 5001
/// ERROR/HTTP: Invalid Content Type
#define EHINVTYPE 5002
/// ERROR/HTTP: Invalid Content Type
#define EHNOBODY 5003

static int send_response_error(struct flb_http *ctx, struct http_conn *conn, int error)
{
switch (error) {
case -ENOMEM:
// 500 ??
return send_response(conn, 429, "error: no memory available\n");
case -ENOSPC:
return send_response(conn, 429, "error: no storage available\n");
case -EIO:
return send_response(conn, 500, "error: I/O failure\n");
case -EAGAIN:
return send_response(conn, 429, "error: too many requests\n");
case -1:
return send_response(conn, 400, "error: invalid request\n");
}
flb_plg_debug(ctx->ins, "unknown error: %d", errno);
return send_response(conn, 400, "error: unknown error\n");
}

static int process_payload(struct flb_http *ctx, struct http_conn *conn,
flb_sds_t tag,
struct mk_http_session *session,
Expand Down Expand Up @@ -585,8 +607,9 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn,
request->data.len = original_data_size;
}


if (ret != 0) {
send_response(conn, 400, "error: invalid payload\n");
send_response_error(ctx, conn, ret);
return -1;
}

Expand Down Expand Up @@ -797,6 +820,31 @@ static int send_response_ng(struct flb_http_response *response,
return 0;
}

static int send_response_error_ng(struct flb_http *ctx, struct flb_http_response *response, int error)
{
switch (error) {
case -ENOMEM:
// 500 ??
return send_response_ng(response, 429, "error: no memory available\n");
case -ENOSPC:
return send_response_ng(response, 429, "error: no storage available\n");
case -EIO:
return send_response_ng(response, 500, "error: I/O failure\n");
case -EAGAIN:
return send_response_ng(response, 429, "error: too many requests\n");
case -EHNOTYPE:
return send_response_ng(response, 400, "error: header 'Content-Type' is not set\n");
case -EHINVTYPE:
return send_response_ng(response, 400, "error: invalid 'Content-Type'\n");
case -EHNOBODY:
return send_response_ng(response, 400, "eror: no payload found\n");
case -1:
return send_response_ng(response, 400, "error: invalid request\n");
}
flb_plg_debug(ctx->ins, "unknown error: %d", errno);
return send_response_ng(response, 400, "error: unknown error\n");
}

static int process_pack_ng(struct flb_http *ctx, flb_sds_t tag, char *buf, size_t size)
{
int ret;
Expand Down Expand Up @@ -891,7 +939,7 @@ static int process_pack_ng(struct flb_http *ctx, flb_sds_t tag, char *buf, size_
log_event_error:
flb_plg_error(ctx->ins, "Error encoding record : %d", ret);
msgpack_unpacked_destroy(&result);
return -1;
return ret;
}

static ssize_t parse_payload_json_ng(flb_sds_t tag,
Expand Down Expand Up @@ -946,8 +994,7 @@ static int process_payload_ng(flb_sds_t tag,
struct flb_http *ctx;

if (request->content_type == NULL) {
send_response_ng(response, 400, "error: header 'Content-Type' is not set\n");
return -1;
return -EHNOTYPE;
}

if (strcasecmp(request->content_type, "application/json") == 0) {
Expand All @@ -959,14 +1006,12 @@ static int process_payload_ng(flb_sds_t tag,
}

if (type == -1) {
send_response_ng(response, 400, "error: invalid 'Content-Type'\n");
return -1;
return -EHINVTYPE;
}

if (request->body == NULL ||
cfl_sds_len(request->body) == 0) {
send_response_ng(response, 400, "error: no payload found\n");
return -1;
return -EHNOBODY;
}

if (type == HTTP_CONTENT_JSON) {
Expand Down Expand Up @@ -1042,7 +1087,7 @@ int http_prot_handle_ng(struct flb_http_request *request,
send_response_ng(response, ctx->successful_response_code, NULL);
}
else {
send_response_ng(response, 400, "error: unable to process records\n");
send_response_error_ng(ctx, response, ret);
}

return ret;
Expand Down
89 changes: 77 additions & 12 deletions src/flb_input_chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -837,8 +837,41 @@ static int input_chunk_write_header(struct cio_chunk *chunk, int event_type,
return 0;
}

struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, int event_type,
const char *tag, int tag_len)
static inline void errnum_set(int *errnum, int error)
{
if (errnum) {
*errnum = error;
}
}

static inline void errnum_set_from_errno(int *errnum)
{
if (errno) {
errnum_set(errnum, errno);
}
}

static inline void errnum_set_cio(int *errnum, int cio_err)
{
switch (cio_err) {
case CIO_OK:
errnum_set(errnum, 0);
break;
case CIO_CORRUPTED:
errnum_set(errnum, EIO);
break;
case CIO_RETRY:
errnum_set(errnum, EAGAIN);
break;
default:
case CIO_ERROR:
errnum_set(errnum, EINVAL);
break;
}
}

static struct flb_input_chunk *input_chunk_create(struct flb_input_instance *in, int event_type,
const char *tag, int tag_len, int *errnum)
{
int ret;
int err;
Expand All @@ -860,6 +893,7 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in
if (!chunk) {
flb_error("[input chunk] could not create chunk file: %s:%s",
storage->stream->name, name);
errnum_set_cio(errnum, err);
return NULL;
}
/*
Expand All @@ -870,6 +904,7 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in
if (ret == CIO_FALSE) {
ret = cio_chunk_up_force(chunk);
if (ret == -1) {
errnum_set(errnum, EIO);
cio_chunk_close(chunk, CIO_TRUE);
return NULL;
}
Expand All @@ -879,6 +914,7 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in
/* Write chunk header */
ret = input_chunk_write_header(chunk, event_type, (char *) tag, tag_len);
if (ret == -1) {
errnum_set(errnum, EIO);
cio_chunk_close(chunk, CIO_TRUE);
return NULL;
}
Expand All @@ -887,6 +923,7 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in
ic = flb_calloc(1, sizeof(struct flb_input_chunk));
if (!ic) {
flb_errno();
errnum_set_from_errno(errnum);
cio_chunk_close(chunk, CIO_TRUE);
return NULL;
}
Expand Down Expand Up @@ -938,6 +975,12 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in
return ic;
}

struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, int event_type,
const char *tag, int tag_len)
{
return input_chunk_create(in, event_type, tag, tag_len, NULL);
}

int flb_input_chunk_destroy_corrupted(struct flb_input_chunk *ic,
const char *tag_buf, int tag_len,
int del)
Expand Down Expand Up @@ -1109,7 +1152,8 @@ int flb_input_chunk_destroy(struct flb_input_chunk *ic, int del)
static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in,
int event_type,
const char *tag, int tag_len,
size_t chunk_size, int *set_down)
size_t chunk_size, int *set_down,
int *errnum)
{
int id = -1;
int ret;
Expand Down Expand Up @@ -1174,7 +1218,7 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in,

/* No chunk was found, we need to create a new one */
if (!ic) {
ic = flb_input_chunk_create(in, event_type, (char *) tag, tag_len);
ic = input_chunk_create(in, event_type, (char *) tag, tag_len, errnum);
new_chunk = FLB_TRUE;
if (!ic) {
return NULL;
Expand All @@ -1198,6 +1242,8 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in,
if (new_chunk || flb_routes_mask_is_empty(ic->routes_mask) == FLB_TRUE) {
flb_input_chunk_destroy(ic, FLB_TRUE);
}
/* Set the error no ENOSPC so the caller knows we have hit a storage limit. */
errnum_set(errnum, ENOSPC);
return NULL;
}

Expand Down Expand Up @@ -1466,6 +1512,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
const void *buf, size_t buf_size)
{
int ret, total_records_start;
int err = 0;
int set_down = FLB_FALSE;
int min;
int new_chunk = FLB_FALSE;
Expand Down Expand Up @@ -1513,7 +1560,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in,

if (ret != 0) {
/* we could not allocate the required space, just return */
return -1;
return -ENOMEM;
}
}
}
Expand All @@ -1522,7 +1569,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
if (flb_input_buf_paused(in) == FLB_TRUE) {
flb_debug("[input chunk] %s is paused, cannot append records",
in->name);
return -1;
return -EAGAIN;
}

if (buf_size == 0) {
Expand All @@ -1549,10 +1596,17 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
* Get a target input chunk, can be one with remaining space available
* or a new one.
*/
ic = input_chunk_get(in, event_type, tag, tag_len, buf_size, &set_down);
ic = input_chunk_get(in, event_type, tag, tag_len, buf_size, &set_down, &err);
if (!ic) {
flb_error("[input chunk] no available chunk");
return -1;
if (err != 0) {
return -err;
}
/* fallback on returning errno if it is set. */
else if (errno != 0) {
return -errno;
}
return -EIO;
}

/* newly created chunk */
Expand All @@ -1564,9 +1618,17 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
ret = flb_input_chunk_is_up(ic);
if (ret == FLB_FALSE) {
ret = cio_chunk_up_force(ic->chunk);
if (ret == -1) {
if (ret <= CIO_ERROR) {
flb_error("[input chunk] cannot retrieve temporary chunk");
return -1;
switch (ret) {
case CIO_CORRUPTED:
return -EIO;
case CIO_RETRY:
return -EAGAIN;
case CIO_ERROR:
return -ENOMEM;
}
return -EINVAL;
}
set_down = FLB_TRUE;
}
Expand Down Expand Up @@ -1638,6 +1700,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
ret = flb_input_chunk_write(ic,
final_data_buffer,
final_data_size);
err = errno;
}
else {
ret = 0;
Expand All @@ -1661,8 +1724,10 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
flb_error("[input chunk] error writing data from %s instance",
in->name);
cio_chunk_tx_rollback(ic->chunk);

return -1;
if (err) {
return -err;
}
return -EIO;
}

/* get the chunks content size */
Expand Down
Loading