From 4954c950954d8356b7a547f90d5530535dc4490b Mon Sep 17 00:00:00 2001 From: Phillip Adair Stewart Whelan <phillip.whelan@chronosphere.io> Date: Wed, 5 Feb 2025 17:33:36 -0300 Subject: [PATCH 1/4] input_chunk: return errors as negative values with errors based on errno. Not all errors will be the same es the errno values and this code does not set the actuall errno variable. Signed-off-by: Phillip Adair Stewart Whelan <phillip.whelan@chronosphere.io> --- src/flb_input_chunk.c | 89 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 77 insertions(+), 12 deletions(-) diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index e2527624f72..7bf7f7048c0 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -837,8 +837,41 @@ static int input_chunk_write_header(struct cio_chunk *chunk, int event_type, return 0; } -struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, int event_type, - const char *tag, int tag_len) +static inline void errnum_set(int *errnum, int error) +{ + if (errnum) { + *errnum = error; + } +} + +static inline void errnum_set_from_errno(int *errnum) +{ + if (errno) { + errnum_set(errnum, errno); + } +} + +static inline void errnum_set_cio(int *errnum, int cio_err) +{ + switch (cio_err) { + case CIO_OK: + errnum_set(errnum, 0); + break; + case CIO_CORRUPTED: + errnum_set(errnum, EIO); + break; + case CIO_RETRY: + errnum_set(errnum, EAGAIN); + break; + default: + case CIO_ERROR: + errnum_set(errnum, EINVAL); + break; + } +} + +static struct flb_input_chunk *input_chunk_create(struct flb_input_instance *in, int event_type, + const char *tag, int tag_len, int *errnum) { int ret; int err; @@ -860,6 +893,7 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in if (!chunk) { flb_error("[input chunk] could not create chunk file: %s:%s", storage->stream->name, name); + errnum_set_cio(errnum, err); return NULL; } /* @@ -870,6 +904,7 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in if (ret == CIO_FALSE) { ret = cio_chunk_up_force(chunk); if (ret == -1) { + errnum_set(errnum, EIO); cio_chunk_close(chunk, CIO_TRUE); return NULL; } @@ -879,6 +914,7 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in /* Write chunk header */ ret = input_chunk_write_header(chunk, event_type, (char *) tag, tag_len); if (ret == -1) { + errnum_set(errnum, EIO); cio_chunk_close(chunk, CIO_TRUE); return NULL; } @@ -887,6 +923,7 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in ic = flb_calloc(1, sizeof(struct flb_input_chunk)); if (!ic) { flb_errno(); + errnum_set_from_errno(errnum); cio_chunk_close(chunk, CIO_TRUE); return NULL; } @@ -938,6 +975,12 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in return ic; } +struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, int event_type, + const char *tag, int tag_len) +{ + return input_chunk_create(in, event_type, tag, tag_len, NULL); +} + int flb_input_chunk_destroy_corrupted(struct flb_input_chunk *ic, const char *tag_buf, int tag_len, int del) @@ -1109,7 +1152,8 @@ int flb_input_chunk_destroy(struct flb_input_chunk *ic, int del) static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in, int event_type, const char *tag, int tag_len, - size_t chunk_size, int *set_down) + size_t chunk_size, int *set_down, + int *errnum) { int id = -1; int ret; @@ -1174,7 +1218,7 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in, /* No chunk was found, we need to create a new one */ if (!ic) { - ic = flb_input_chunk_create(in, event_type, (char *) tag, tag_len); + ic = input_chunk_create(in, event_type, (char *) tag, tag_len, errnum); new_chunk = FLB_TRUE; if (!ic) { return NULL; @@ -1198,6 +1242,8 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in, if (new_chunk || flb_routes_mask_is_empty(ic->routes_mask) == FLB_TRUE) { flb_input_chunk_destroy(ic, FLB_TRUE); } + /* Set the error no ENOSPC so the caller knows we have hit a storage limit. */ + errnum_set(errnum, ENOSPC); return NULL; } @@ -1466,6 +1512,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in, const void *buf, size_t buf_size) { int ret, total_records_start; + int err = 0; int set_down = FLB_FALSE; int min; int new_chunk = FLB_FALSE; @@ -1513,7 +1560,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in, if (ret != 0) { /* we could not allocate the required space, just return */ - return -1; + return -ENOMEM; } } } @@ -1522,7 +1569,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in, if (flb_input_buf_paused(in) == FLB_TRUE) { flb_debug("[input chunk] %s is paused, cannot append records", in->name); - return -1; + return -EAGAIN; } if (buf_size == 0) { @@ -1549,10 +1596,17 @@ static int input_chunk_append_raw(struct flb_input_instance *in, * Get a target input chunk, can be one with remaining space available * or a new one. */ - ic = input_chunk_get(in, event_type, tag, tag_len, buf_size, &set_down); + ic = input_chunk_get(in, event_type, tag, tag_len, buf_size, &set_down, &err); if (!ic) { flb_error("[input chunk] no available chunk"); - return -1; + if (err != 0) { + return -err; + } + /* fallback on returning errno if it is set. */ + else if (errno != 0) { + return -errno; + } + return -EIO; } /* newly created chunk */ @@ -1564,9 +1618,17 @@ static int input_chunk_append_raw(struct flb_input_instance *in, ret = flb_input_chunk_is_up(ic); if (ret == FLB_FALSE) { ret = cio_chunk_up_force(ic->chunk); - if (ret == -1) { + if (ret <= CIO_ERROR) { flb_error("[input chunk] cannot retrieve temporary chunk"); - return -1; + switch (ret) { + case CIO_CORRUPTED: + return -EIO; + case CIO_RETRY: + return -EAGAIN; + case CIO_ERROR: + return -ENOMEM; + } + return -EINVAL; } set_down = FLB_TRUE; } @@ -1638,6 +1700,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in, ret = flb_input_chunk_write(ic, final_data_buffer, final_data_size); + err = errno; } else { ret = 0; @@ -1661,8 +1724,10 @@ static int input_chunk_append_raw(struct flb_input_instance *in, flb_error("[input chunk] error writing data from %s instance", in->name); cio_chunk_tx_rollback(ic->chunk); - - return -1; + if (err) { + return -err; + } + return -EIO; } /* get the chunks content size */ From abf0dc21f37737b1f926ef3f3dbb3ef44bd7082d Mon Sep 17 00:00:00 2001 From: Phillip Adair Stewart Whelan <phillip.whelan@chronosphere.io> Date: Wed, 5 Feb 2025 17:42:37 -0300 Subject: [PATCH 2/4] in_emitter: fix do_in_emitter_add_record after refactor of input chunk return codes. Signed-off-by: Phillip Adair Stewart Whelan <phillip.whelan@chronosphere.io> --- plugins/in_emitter/emitter.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/plugins/in_emitter/emitter.c b/plugins/in_emitter/emitter.c index 064415e807a..a77c93fd996 100644 --- a/plugins/in_emitter/emitter.c +++ b/plugins/in_emitter/emitter.c @@ -106,7 +106,7 @@ int static do_in_emitter_add_record(struct em_chunk *ec, ec->tag, flb_sds_len(ec->tag), ec->mp_sbuf.data, ec->mp_sbuf.size); - if (ret == -1) { + if (ret < 0) { flb_plg_error(ctx->ins, "error registering chunk with tag: %s", ec->tag); /* Release the echunk */ em_chunk_destroy(ec); @@ -237,6 +237,9 @@ static int in_emitter_ingest_ring_buffer(struct flb_input_instance *in, flb_sds_destroy(ec.tag); msgpack_sbuffer_destroy(&ec.mp_sbuf); } + if (ret < 0) { + return -1; + } return ret; } From 0eeff10c478178a5388b901573cbec70c0ede9b0 Mon Sep 17 00:00:00 2001 From: Phillip Adair Stewart Whelan <phillip.whelan@chronosphere.io> Date: Wed, 5 Feb 2025 17:54:39 -0300 Subject: [PATCH 3/4] in_http: use return value from input chunks to return specific relevant http response codes. Signed-off-by: Phillip Adair Stewart Whelan <phillip.whelan@chronosphere.io> --- plugins/in_http/http_prot.c | 81 ++++++++++++++++++++++++++++--------- 1 file changed, 63 insertions(+), 18 deletions(-) diff --git a/plugins/in_http/http_prot.c b/plugins/in_http/http_prot.c index 4e2aa6a761c..0ebe49e84d9 100644 --- a/plugins/in_http/http_prot.c +++ b/plugins/in_http/http_prot.c @@ -219,25 +219,25 @@ static int process_pack_record(struct flb_http *ctx, struct flb_time *tm, ret = flb_log_event_encoder_begin_record(&ctx->log_encoder); if (ret != FLB_EVENT_ENCODER_SUCCESS) { - return -1; + return ret; } ret = flb_log_event_encoder_set_timestamp(&ctx->log_encoder, tm); if (ret != FLB_EVENT_ENCODER_SUCCESS) { - return -1; + return ret; } ret = flb_log_event_encoder_set_body_from_msgpack_object( &ctx->log_encoder, record); if (ret != FLB_EVENT_ENCODER_SUCCESS) { - return -1; + return ret; } ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); if (ret != FLB_EVENT_ENCODER_SUCCESS) { - return -1; + return ret; } if (tag) { @@ -254,11 +254,7 @@ static int process_pack_record(struct flb_http *ctx, struct flb_time *tm, ctx->log_encoder.output_length); } - if (ret != FLB_EVENT_ENCODER_SUCCESS) { - return -1; - } - - return 0; + return ret; } int process_pack(struct flb_http *ctx, flb_sds_t tag, char *buf, size_t size) @@ -510,6 +506,32 @@ static ssize_t parse_payload_urlencoded(struct flb_http *ctx, flb_sds_t tag, return ret; } +/// ERROR/HTTP: Missing Content TYpe +#define EHNOTYPE 5001 +/// ERROR/HTTP: Invalid Content Type +#define EHINVTYPE 5002 +/// ERROR/HTTP: Invalid Content Type +#define EHNOBODY 5003 + +static int send_response_error(struct flb_http *ctx, struct http_conn *conn, int error) +{ + switch (error) { + case -ENOMEM: + // 500 ?? + return send_response(conn, 429, "error: no memory available\n"); + case -ENOSPC: + return send_response(conn, 429, "error: no storage available\n"); + case -EIO: + return send_response(conn, 500, "error: I/O failure\n"); + case -EAGAIN: + return send_response(conn, 429, "error: too many requests\n"); + case -1: + return send_response(conn, 400, "error: invalid request\n"); + } + flb_plg_debug(ctx->ins, "unknown error: %d", errno); + return send_response(conn, 400, "error: unknown error\n"); +} + static int process_payload(struct flb_http *ctx, struct http_conn *conn, flb_sds_t tag, struct mk_http_session *session, @@ -585,8 +607,9 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn, request->data.len = original_data_size; } + if (ret != 0) { - send_response(conn, 400, "error: invalid payload\n"); + send_response_error(ctx, conn, ret); return -1; } @@ -797,6 +820,31 @@ static int send_response_ng(struct flb_http_response *response, return 0; } +static int send_response_error_ng(struct flb_http *ctx, struct flb_http_response *response, int error) +{ + switch (error) { + case -ENOMEM: + // 500 ?? + return send_response_ng(response, 429, "error: no memory available\n"); + case -ENOSPC: + return send_response_ng(response, 429, "error: no storage available\n"); + case -EIO: + return send_response_ng(response, 500, "error: I/O failure\n"); + case -EAGAIN: + return send_response_ng(response, 429, "error: too many requests\n"); + case -EHNOTYPE: + return send_response_ng(response, 400, "error: header 'Content-Type' is not set\n"); + case -EHINVTYPE: + return send_response_ng(response, 400, "error: invalid 'Content-Type'\n"); + case -EHNOBODY: + return send_response_ng(response, 400, "eror: no payload found\n"); + case -1: + return send_response_ng(response, 400, "error: invalid request\n"); + } + flb_plg_debug(ctx->ins, "unknown error: %d", errno); + return send_response_ng(response, 400, "error: unknown error\n"); +} + static int process_pack_ng(struct flb_http *ctx, flb_sds_t tag, char *buf, size_t size) { int ret; @@ -891,7 +939,7 @@ static int process_pack_ng(struct flb_http *ctx, flb_sds_t tag, char *buf, size_ log_event_error: flb_plg_error(ctx->ins, "Error encoding record : %d", ret); msgpack_unpacked_destroy(&result); - return -1; + return ret; } static ssize_t parse_payload_json_ng(flb_sds_t tag, @@ -946,8 +994,7 @@ static int process_payload_ng(flb_sds_t tag, struct flb_http *ctx; if (request->content_type == NULL) { - send_response_ng(response, 400, "error: header 'Content-Type' is not set\n"); - return -1; + return -EHNOTYPE; } if (strcasecmp(request->content_type, "application/json") == 0) { @@ -959,14 +1006,12 @@ static int process_payload_ng(flb_sds_t tag, } if (type == -1) { - send_response_ng(response, 400, "error: invalid 'Content-Type'\n"); - return -1; + return -EHINVTYPE; } if (request->body == NULL || cfl_sds_len(request->body) == 0) { - send_response_ng(response, 400, "error: no payload found\n"); - return -1; + return -EHNOBODY; } if (type == HTTP_CONTENT_JSON) { @@ -1042,7 +1087,7 @@ int http_prot_handle_ng(struct flb_http_request *request, send_response_ng(response, ctx->successful_response_code, NULL); } else { - send_response_ng(response, 400, "error: unable to process records\n"); + send_response_error_ng(ctx, response, ret); } return ret; From 98fbe28725048e9e1fa2d99398fc2ebffd607105 Mon Sep 17 00:00:00 2001 From: Phillip Adair Stewart Whelan <phillip.whelan@chronosphere.io> Date: Wed, 5 Feb 2025 17:55:27 -0300 Subject: [PATCH 4/4] tests: in_http: test http status response codes for new http status responses. Signed-off-by: Phillip Adair Stewart Whelan <phillip.whelan@chronosphere.io> --- tests/runtime/in_http.c | 477 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 477 insertions(+) diff --git a/tests/runtime/in_http.c b/tests/runtime/in_http.c index 66ddaea5230..2960a688675 100644 --- a/tests/runtime/in_http.c +++ b/tests/runtime/in_http.c @@ -99,6 +99,13 @@ static int cb_check_result_json(void *record, size_t size, void *data) return 0; } +/* Callback to check expected results */ +static int cb_check_result_none(void *record, size_t size, void *data) +{ + flb_free(record); + return 0; +} + struct http_client_ctx* http_client_ctx_create() { struct http_client_ctx *ret_ctx = NULL; @@ -588,6 +595,470 @@ void flb_test_http_failure_400_bad_disk_write() test_ctx_destroy(ctx); } +void flb_test_http_failure_400_empty_body() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c; + int ret; + size_t b_sent; + + char *buf = ""; + + clear_output_num(); + + cb_data.cb = cb_check_result_json; + cb_data.data = ""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + flb_time_msleep(5000); + + ctx->httpc = http_client_ctx_create(); + TEST_CHECK(ctx->httpc != NULL); + + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/", buf, strlen(buf), + "127.0.0.1", 9880, NULL, 0); + ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, strlen(FLB_HTTP_HEADER_CONTENT_TYPE), + JSON_CONTENT_TYPE, strlen(JSON_CONTENT_TYPE)); + TEST_CHECK(ret == 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("http_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(c->resp.status == 400)) { + TEST_MSG("http response code error. expect: %d, got: %d\n", 400, c->resp.status); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + +void flb_test_http_failure_429_paused() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c; + struct mk_list *cur; + struct flb_input_instance *i_ins; + int ret; + size_t b_sent; + + char *buf = "{\"foo\": \"bar\"}"; + + clear_output_num(); + + cb_data.cb = cb_check_result_json; + cb_data.data = ""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + // how can we pause the input buffers? + mk_list_foreach(cur, &ctx->flb->config->inputs) { + i_ins = mk_list_entry(cur, struct flb_input_instance, _head); + i_ins->mem_buf_status = FLB_INPUT_PAUSED; + i_ins->storage_buf_status = FLB_INPUT_PAUSED; + } + + flb_time_msleep(5000); + + ctx->httpc = http_client_ctx_create(); + TEST_CHECK(ctx->httpc != NULL); + + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/", buf, strlen(buf), + "127.0.0.1", 9880, NULL, 0); + ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, strlen(FLB_HTTP_HEADER_CONTENT_TYPE), + JSON_CONTENT_TYPE, strlen(JSON_CONTENT_TYPE)); + TEST_CHECK(ret == 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("http_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(c->resp.status == 429)) { + TEST_MSG("http response code error. expect: %d, got: %d\n", 429, c->resp.status); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + +void flb_test_http_failure_429_enospc() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c = NULL; + int ret; + int try; + size_t b_sent; + + char *buf = "{\"foo\": \"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\"}"; + + clear_output_num(); + + cb_data.cb = cb_check_result_none; + cb_data.data = "\"test\":\"msg\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_service_set(ctx->flb, + "storage.path", "/tmp/http-input-test-429-enospc", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "storage.type", "filesystem", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + "storage.total_limit_size", "1K", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + flb_time_msleep(5000); + + ctx->httpc = http_client_ctx_create(); + TEST_CHECK(ctx->httpc != NULL); + + for (try = 0; try < 16384; try++) { + if (c != NULL) { + flb_http_client_destroy(c); + } + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/", buf, strlen(buf), + "127.0.0.1", 9880, NULL, 0); + ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, strlen(FLB_HTTP_HEADER_CONTENT_TYPE), + JSON_CONTENT_TYPE, strlen(JSON_CONTENT_TYPE)); + TEST_CHECK(ret == 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("http_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(b_sent > 0)){ + TEST_MSG("b_sent size error. b_sent = %lu\n", b_sent); + } + if (c->resp.status == 429) { + if (memcmp(c->resp.payload, + "error: no storage available\n", + strlen("error: no storage available\n")) == 0) { + break; + } + } + + if (!(try % 128)) { + flb_time_msleep(1); + } + } + + if (TEST_CHECK((c->resp.payload != NULL && c->resp.status != 0))) { + if (!TEST_CHECK(c->resp.status == 429)) { + TEST_MSG("http response code error. expect: %d, got: %d\n", 429, c->resp.status); + } + + if (!TEST_CHECK(memcmp(c->resp.payload, + "error: no storage available\n", + strlen("error: no storage available\n")) == 0)) { + TEST_MSG("http response code error. expect: %d, got: %d\n", 429, c->resp.status); + } + } + else { + TEST_MSG("no response to check"); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + +void flb_test_http_failure_400_notype() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c; + int ret; + size_t b_sent; + + char *buf = "{\"foo\": \"bar\"}"; + + clear_output_num(); + + cb_data.cb = cb_check_result_json; + cb_data.data = ""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + flb_time_msleep(5000); + + ctx->httpc = http_client_ctx_create(); + TEST_CHECK(ctx->httpc != NULL); + + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/", buf, strlen(buf), + "127.0.0.1", 9880, NULL, 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("http_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(c->resp.status == 400)) { + TEST_MSG("http response code error. expect: %d, got: %d\n", 400, c->resp.status); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + +void flb_test_http_failure_400_invtype() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c; + int ret; + size_t b_sent; + + char *buf = "{\"foo\": \"bar\"}"; + + clear_output_num(); + + cb_data.cb = cb_check_result_json; + cb_data.data = ""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + flb_time_msleep(5000); + + ctx->httpc = http_client_ctx_create(); + TEST_CHECK(ctx->httpc != NULL); + + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/", buf, strlen(buf), + "127.0.0.1", 9880, NULL, 0); + ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, strlen(FLB_HTTP_HEADER_CONTENT_TYPE), + "application/x-unknown", strlen("application/x-unknown")); + TEST_CHECK(ret == 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("http_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(c->resp.status == 400)) { + TEST_MSG("http response code error. expect: %d, got: %d\n", 400, c->resp.status); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + +void flb_test_http_failure_429_enomem() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c = NULL; + int ret; + int try; + size_t b_sent; + + char *buf = "{\"foo\": \"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\"}"; + + clear_output_num(); + + cb_data.cb = cb_check_result_none; + cb_data.data = "\"test\":\"msg\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + /* + flb_service_set(ctx->flb, + "Flush", "5", + NULL); + */ + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "storage.type", "memrb", + "mem_buf_limit", "10", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + flb_time_msleep(5000); + + ctx->httpc = http_client_ctx_create(); + TEST_CHECK(ctx->httpc != NULL); + + for (try = 0; try < 16384; try++) { + if (c != NULL) { + flb_http_client_destroy(c); + } + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/", buf, strlen(buf), + "127.0.0.1", 9880, NULL, 0); + ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, strlen(FLB_HTTP_HEADER_CONTENT_TYPE), + JSON_CONTENT_TYPE, strlen(JSON_CONTENT_TYPE)); + TEST_CHECK(ret == 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("http_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(b_sent > 0)){ + TEST_MSG("b_sent size error. b_sent = %lu\n", b_sent); + } + if (c->resp.status == 429) { + if (memcmp(c->resp.payload, + "error: no memory available\n", + strlen("error: no memory available\n")) == 0) { + break; + } + } + + if (!(try % 128)) { + flb_time_msleep(1); + } + } + + if (TEST_CHECK((c->resp.payload != NULL && c->resp.status != 0))) { + if (!TEST_CHECK(c->resp.status == 429)) { + TEST_MSG("http response code error. expect: %d, got: %d\n", 429, c->resp.status); + } + + if (!TEST_CHECK(memcmp(c->resp.payload, + "error: no memory available\n", + strlen("error: no memory available\n")) == 0)) { + TEST_MSG("http response code error. expect: %d, got: %d\n", 429, c->resp.status); + } + } + else { + TEST_MSG("no response to check"); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + void test_http_tag_key(char *input) { struct flb_lib_out_cb cb_data; @@ -677,6 +1148,12 @@ TEST_LIST = { {"successful_response_code_204", flb_test_http_successful_response_code_204}, {"failure_response_code_400_bad_json", flb_test_http_failure_400_bad_json}, {"failure_response_code_400_bad_disk_write", flb_test_http_failure_400_bad_disk_write}, + {"failure_response_code_400_empty_body", flb_test_http_failure_400_empty_body}, + {"failure_response_code_429_paused", flb_test_http_failure_429_paused}, + {"failure_response_code_429_enomem", flb_test_http_failure_429_enomem}, + {"failure_response_code_429_enospc", flb_test_http_failure_429_enospc}, + {"failure_response_code_400_notype", flb_test_http_failure_400_notype}, + {"failure_response_code_400_invtype", flb_test_http_failure_400_invtype}, {"tag_key_with_map_input", flb_test_http_tag_key_with_map_input}, {"tag_key_with_array_input", flb_test_http_tag_key_with_array_input}, {NULL, NULL}