diff --git a/plugins/in_emitter/emitter.c b/plugins/in_emitter/emitter.c index 064415e807a..a77c93fd996 100644 --- a/plugins/in_emitter/emitter.c +++ b/plugins/in_emitter/emitter.c @@ -106,7 +106,7 @@ int static do_in_emitter_add_record(struct em_chunk *ec, ec->tag, flb_sds_len(ec->tag), ec->mp_sbuf.data, ec->mp_sbuf.size); - if (ret == -1) { + if (ret < 0) { flb_plg_error(ctx->ins, "error registering chunk with tag: %s", ec->tag); /* Release the echunk */ em_chunk_destroy(ec); @@ -237,6 +237,9 @@ static int in_emitter_ingest_ring_buffer(struct flb_input_instance *in, flb_sds_destroy(ec.tag); msgpack_sbuffer_destroy(&ec.mp_sbuf); } + if (ret < 0) { + return -1; + } return ret; } diff --git a/plugins/in_http/http_prot.c b/plugins/in_http/http_prot.c index 4e2aa6a761c..0ebe49e84d9 100644 --- a/plugins/in_http/http_prot.c +++ b/plugins/in_http/http_prot.c @@ -219,25 +219,25 @@ static int process_pack_record(struct flb_http *ctx, struct flb_time *tm, ret = flb_log_event_encoder_begin_record(&ctx->log_encoder); if (ret != FLB_EVENT_ENCODER_SUCCESS) { - return -1; + return ret; } ret = flb_log_event_encoder_set_timestamp(&ctx->log_encoder, tm); if (ret != FLB_EVENT_ENCODER_SUCCESS) { - return -1; + return ret; } ret = flb_log_event_encoder_set_body_from_msgpack_object( &ctx->log_encoder, record); if (ret != FLB_EVENT_ENCODER_SUCCESS) { - return -1; + return ret; } ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); if (ret != FLB_EVENT_ENCODER_SUCCESS) { - return -1; + return ret; } if (tag) { @@ -254,11 +254,7 @@ static int process_pack_record(struct flb_http *ctx, struct flb_time *tm, ctx->log_encoder.output_length); } - if (ret != FLB_EVENT_ENCODER_SUCCESS) { - return -1; - } - - return 0; + return ret; } int process_pack(struct flb_http *ctx, flb_sds_t tag, char *buf, size_t size) @@ -510,6 +506,32 @@ static ssize_t parse_payload_urlencoded(struct flb_http *ctx, flb_sds_t tag, return ret; } +/// ERROR/HTTP: Missing Content TYpe +#define EHNOTYPE 5001 +/// ERROR/HTTP: Invalid Content Type +#define EHINVTYPE 5002 +/// ERROR/HTTP: Invalid Content Type +#define EHNOBODY 5003 + +static int send_response_error(struct flb_http *ctx, struct http_conn *conn, int error) +{ + switch (error) { + case -ENOMEM: + // 500 ?? + return send_response(conn, 429, "error: no memory available\n"); + case -ENOSPC: + return send_response(conn, 429, "error: no storage available\n"); + case -EIO: + return send_response(conn, 500, "error: I/O failure\n"); + case -EAGAIN: + return send_response(conn, 429, "error: too many requests\n"); + case -1: + return send_response(conn, 400, "error: invalid request\n"); + } + flb_plg_debug(ctx->ins, "unknown error: %d", errno); + return send_response(conn, 400, "error: unknown error\n"); +} + static int process_payload(struct flb_http *ctx, struct http_conn *conn, flb_sds_t tag, struct mk_http_session *session, @@ -585,8 +607,9 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn, request->data.len = original_data_size; } + if (ret != 0) { - send_response(conn, 400, "error: invalid payload\n"); + send_response_error(ctx, conn, ret); return -1; } @@ -797,6 +820,31 @@ static int send_response_ng(struct flb_http_response *response, return 0; } +static int send_response_error_ng(struct flb_http *ctx, struct flb_http_response *response, int error) +{ + switch (error) { + case -ENOMEM: + // 500 ?? + return send_response_ng(response, 429, "error: no memory available\n"); + case -ENOSPC: + return send_response_ng(response, 429, "error: no storage available\n"); + case -EIO: + return send_response_ng(response, 500, "error: I/O failure\n"); + case -EAGAIN: + return send_response_ng(response, 429, "error: too many requests\n"); + case -EHNOTYPE: + return send_response_ng(response, 400, "error: header 'Content-Type' is not set\n"); + case -EHINVTYPE: + return send_response_ng(response, 400, "error: invalid 'Content-Type'\n"); + case -EHNOBODY: + return send_response_ng(response, 400, "eror: no payload found\n"); + case -1: + return send_response_ng(response, 400, "error: invalid request\n"); + } + flb_plg_debug(ctx->ins, "unknown error: %d", errno); + return send_response_ng(response, 400, "error: unknown error\n"); +} + static int process_pack_ng(struct flb_http *ctx, flb_sds_t tag, char *buf, size_t size) { int ret; @@ -891,7 +939,7 @@ static int process_pack_ng(struct flb_http *ctx, flb_sds_t tag, char *buf, size_ log_event_error: flb_plg_error(ctx->ins, "Error encoding record : %d", ret); msgpack_unpacked_destroy(&result); - return -1; + return ret; } static ssize_t parse_payload_json_ng(flb_sds_t tag, @@ -946,8 +994,7 @@ static int process_payload_ng(flb_sds_t tag, struct flb_http *ctx; if (request->content_type == NULL) { - send_response_ng(response, 400, "error: header 'Content-Type' is not set\n"); - return -1; + return -EHNOTYPE; } if (strcasecmp(request->content_type, "application/json") == 0) { @@ -959,14 +1006,12 @@ static int process_payload_ng(flb_sds_t tag, } if (type == -1) { - send_response_ng(response, 400, "error: invalid 'Content-Type'\n"); - return -1; + return -EHINVTYPE; } if (request->body == NULL || cfl_sds_len(request->body) == 0) { - send_response_ng(response, 400, "error: no payload found\n"); - return -1; + return -EHNOBODY; } if (type == HTTP_CONTENT_JSON) { @@ -1042,7 +1087,7 @@ int http_prot_handle_ng(struct flb_http_request *request, send_response_ng(response, ctx->successful_response_code, NULL); } else { - send_response_ng(response, 400, "error: unable to process records\n"); + send_response_error_ng(ctx, response, ret); } return ret; diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index e2527624f72..7bf7f7048c0 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -837,8 +837,41 @@ static int input_chunk_write_header(struct cio_chunk *chunk, int event_type, return 0; } -struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, int event_type, - const char *tag, int tag_len) +static inline void errnum_set(int *errnum, int error) +{ + if (errnum) { + *errnum = error; + } +} + +static inline void errnum_set_from_errno(int *errnum) +{ + if (errno) { + errnum_set(errnum, errno); + } +} + +static inline void errnum_set_cio(int *errnum, int cio_err) +{ + switch (cio_err) { + case CIO_OK: + errnum_set(errnum, 0); + break; + case CIO_CORRUPTED: + errnum_set(errnum, EIO); + break; + case CIO_RETRY: + errnum_set(errnum, EAGAIN); + break; + default: + case CIO_ERROR: + errnum_set(errnum, EINVAL); + break; + } +} + +static struct flb_input_chunk *input_chunk_create(struct flb_input_instance *in, int event_type, + const char *tag, int tag_len, int *errnum) { int ret; int err; @@ -860,6 +893,7 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in if (!chunk) { flb_error("[input chunk] could not create chunk file: %s:%s", storage->stream->name, name); + errnum_set_cio(errnum, err); return NULL; } /* @@ -870,6 +904,7 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in if (ret == CIO_FALSE) { ret = cio_chunk_up_force(chunk); if (ret == -1) { + errnum_set(errnum, EIO); cio_chunk_close(chunk, CIO_TRUE); return NULL; } @@ -879,6 +914,7 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in /* Write chunk header */ ret = input_chunk_write_header(chunk, event_type, (char *) tag, tag_len); if (ret == -1) { + errnum_set(errnum, EIO); cio_chunk_close(chunk, CIO_TRUE); return NULL; } @@ -887,6 +923,7 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in ic = flb_calloc(1, sizeof(struct flb_input_chunk)); if (!ic) { flb_errno(); + errnum_set_from_errno(errnum); cio_chunk_close(chunk, CIO_TRUE); return NULL; } @@ -938,6 +975,12 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in return ic; } +struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, int event_type, + const char *tag, int tag_len) +{ + return input_chunk_create(in, event_type, tag, tag_len, NULL); +} + int flb_input_chunk_destroy_corrupted(struct flb_input_chunk *ic, const char *tag_buf, int tag_len, int del) @@ -1109,7 +1152,8 @@ int flb_input_chunk_destroy(struct flb_input_chunk *ic, int del) static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in, int event_type, const char *tag, int tag_len, - size_t chunk_size, int *set_down) + size_t chunk_size, int *set_down, + int *errnum) { int id = -1; int ret; @@ -1174,7 +1218,7 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in, /* No chunk was found, we need to create a new one */ if (!ic) { - ic = flb_input_chunk_create(in, event_type, (char *) tag, tag_len); + ic = input_chunk_create(in, event_type, (char *) tag, tag_len, errnum); new_chunk = FLB_TRUE; if (!ic) { return NULL; @@ -1198,6 +1242,8 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in, if (new_chunk || flb_routes_mask_is_empty(ic->routes_mask) == FLB_TRUE) { flb_input_chunk_destroy(ic, FLB_TRUE); } + /* Set the error no ENOSPC so the caller knows we have hit a storage limit. */ + errnum_set(errnum, ENOSPC); return NULL; } @@ -1466,6 +1512,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in, const void *buf, size_t buf_size) { int ret, total_records_start; + int err = 0; int set_down = FLB_FALSE; int min; int new_chunk = FLB_FALSE; @@ -1513,7 +1560,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in, if (ret != 0) { /* we could not allocate the required space, just return */ - return -1; + return -ENOMEM; } } } @@ -1522,7 +1569,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in, if (flb_input_buf_paused(in) == FLB_TRUE) { flb_debug("[input chunk] %s is paused, cannot append records", in->name); - return -1; + return -EAGAIN; } if (buf_size == 0) { @@ -1549,10 +1596,17 @@ static int input_chunk_append_raw(struct flb_input_instance *in, * Get a target input chunk, can be one with remaining space available * or a new one. */ - ic = input_chunk_get(in, event_type, tag, tag_len, buf_size, &set_down); + ic = input_chunk_get(in, event_type, tag, tag_len, buf_size, &set_down, &err); if (!ic) { flb_error("[input chunk] no available chunk"); - return -1; + if (err != 0) { + return -err; + } + /* fallback on returning errno if it is set. */ + else if (errno != 0) { + return -errno; + } + return -EIO; } /* newly created chunk */ @@ -1564,9 +1618,17 @@ static int input_chunk_append_raw(struct flb_input_instance *in, ret = flb_input_chunk_is_up(ic); if (ret == FLB_FALSE) { ret = cio_chunk_up_force(ic->chunk); - if (ret == -1) { + if (ret <= CIO_ERROR) { flb_error("[input chunk] cannot retrieve temporary chunk"); - return -1; + switch (ret) { + case CIO_CORRUPTED: + return -EIO; + case CIO_RETRY: + return -EAGAIN; + case CIO_ERROR: + return -ENOMEM; + } + return -EINVAL; } set_down = FLB_TRUE; } @@ -1638,6 +1700,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in, ret = flb_input_chunk_write(ic, final_data_buffer, final_data_size); + err = errno; } else { ret = 0; @@ -1661,8 +1724,10 @@ static int input_chunk_append_raw(struct flb_input_instance *in, flb_error("[input chunk] error writing data from %s instance", in->name); cio_chunk_tx_rollback(ic->chunk); - - return -1; + if (err) { + return -err; + } + return -EIO; } /* get the chunks content size */ diff --git a/tests/runtime/in_http.c b/tests/runtime/in_http.c index 66ddaea5230..2960a688675 100644 --- a/tests/runtime/in_http.c +++ b/tests/runtime/in_http.c @@ -99,6 +99,13 @@ static int cb_check_result_json(void *record, size_t size, void *data) return 0; } +/* Callback to check expected results */ +static int cb_check_result_none(void *record, size_t size, void *data) +{ + flb_free(record); + return 0; +} + struct http_client_ctx* http_client_ctx_create() { struct http_client_ctx *ret_ctx = NULL; @@ -588,6 +595,470 @@ void flb_test_http_failure_400_bad_disk_write() test_ctx_destroy(ctx); } +void flb_test_http_failure_400_empty_body() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c; + int ret; + size_t b_sent; + + char *buf = ""; + + clear_output_num(); + + cb_data.cb = cb_check_result_json; + cb_data.data = ""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + flb_time_msleep(5000); + + ctx->httpc = http_client_ctx_create(); + TEST_CHECK(ctx->httpc != NULL); + + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/", buf, strlen(buf), + "127.0.0.1", 9880, NULL, 0); + ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, strlen(FLB_HTTP_HEADER_CONTENT_TYPE), + JSON_CONTENT_TYPE, strlen(JSON_CONTENT_TYPE)); + TEST_CHECK(ret == 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("http_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(c->resp.status == 400)) { + TEST_MSG("http response code error. expect: %d, got: %d\n", 400, c->resp.status); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + +void flb_test_http_failure_429_paused() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c; + struct mk_list *cur; + struct flb_input_instance *i_ins; + int ret; + size_t b_sent; + + char *buf = "{\"foo\": \"bar\"}"; + + clear_output_num(); + + cb_data.cb = cb_check_result_json; + cb_data.data = ""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + // how can we pause the input buffers? + mk_list_foreach(cur, &ctx->flb->config->inputs) { + i_ins = mk_list_entry(cur, struct flb_input_instance, _head); + i_ins->mem_buf_status = FLB_INPUT_PAUSED; + i_ins->storage_buf_status = FLB_INPUT_PAUSED; + } + + flb_time_msleep(5000); + + ctx->httpc = http_client_ctx_create(); + TEST_CHECK(ctx->httpc != NULL); + + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/", buf, strlen(buf), + "127.0.0.1", 9880, NULL, 0); + ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, strlen(FLB_HTTP_HEADER_CONTENT_TYPE), + JSON_CONTENT_TYPE, strlen(JSON_CONTENT_TYPE)); + TEST_CHECK(ret == 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("http_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(c->resp.status == 429)) { + TEST_MSG("http response code error. expect: %d, got: %d\n", 429, c->resp.status); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + +void flb_test_http_failure_429_enospc() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c = NULL; + int ret; + int try; + size_t b_sent; + + char *buf = "{\"foo\": \"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\"}"; + + clear_output_num(); + + cb_data.cb = cb_check_result_none; + cb_data.data = "\"test\":\"msg\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_service_set(ctx->flb, + "storage.path", "/tmp/http-input-test-429-enospc", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "storage.type", "filesystem", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + "storage.total_limit_size", "1K", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + flb_time_msleep(5000); + + ctx->httpc = http_client_ctx_create(); + TEST_CHECK(ctx->httpc != NULL); + + for (try = 0; try < 16384; try++) { + if (c != NULL) { + flb_http_client_destroy(c); + } + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/", buf, strlen(buf), + "127.0.0.1", 9880, NULL, 0); + ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, strlen(FLB_HTTP_HEADER_CONTENT_TYPE), + JSON_CONTENT_TYPE, strlen(JSON_CONTENT_TYPE)); + TEST_CHECK(ret == 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("http_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(b_sent > 0)){ + TEST_MSG("b_sent size error. b_sent = %lu\n", b_sent); + } + if (c->resp.status == 429) { + if (memcmp(c->resp.payload, + "error: no storage available\n", + strlen("error: no storage available\n")) == 0) { + break; + } + } + + if (!(try % 128)) { + flb_time_msleep(1); + } + } + + if (TEST_CHECK((c->resp.payload != NULL && c->resp.status != 0))) { + if (!TEST_CHECK(c->resp.status == 429)) { + TEST_MSG("http response code error. expect: %d, got: %d\n", 429, c->resp.status); + } + + if (!TEST_CHECK(memcmp(c->resp.payload, + "error: no storage available\n", + strlen("error: no storage available\n")) == 0)) { + TEST_MSG("http response code error. expect: %d, got: %d\n", 429, c->resp.status); + } + } + else { + TEST_MSG("no response to check"); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + +void flb_test_http_failure_400_notype() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c; + int ret; + size_t b_sent; + + char *buf = "{\"foo\": \"bar\"}"; + + clear_output_num(); + + cb_data.cb = cb_check_result_json; + cb_data.data = ""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + flb_time_msleep(5000); + + ctx->httpc = http_client_ctx_create(); + TEST_CHECK(ctx->httpc != NULL); + + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/", buf, strlen(buf), + "127.0.0.1", 9880, NULL, 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("http_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(c->resp.status == 400)) { + TEST_MSG("http response code error. expect: %d, got: %d\n", 400, c->resp.status); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + +void flb_test_http_failure_400_invtype() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c; + int ret; + size_t b_sent; + + char *buf = "{\"foo\": \"bar\"}"; + + clear_output_num(); + + cb_data.cb = cb_check_result_json; + cb_data.data = ""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + flb_time_msleep(5000); + + ctx->httpc = http_client_ctx_create(); + TEST_CHECK(ctx->httpc != NULL); + + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/", buf, strlen(buf), + "127.0.0.1", 9880, NULL, 0); + ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, strlen(FLB_HTTP_HEADER_CONTENT_TYPE), + "application/x-unknown", strlen("application/x-unknown")); + TEST_CHECK(ret == 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("http_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(c->resp.status == 400)) { + TEST_MSG("http response code error. expect: %d, got: %d\n", 400, c->resp.status); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + +void flb_test_http_failure_429_enomem() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c = NULL; + int ret; + int try; + size_t b_sent; + + char *buf = "{\"foo\": \"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\"}"; + + clear_output_num(); + + cb_data.cb = cb_check_result_none; + cb_data.data = "\"test\":\"msg\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + /* + flb_service_set(ctx->flb, + "Flush", "5", + NULL); + */ + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "storage.type", "memrb", + "mem_buf_limit", "10", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + flb_time_msleep(5000); + + ctx->httpc = http_client_ctx_create(); + TEST_CHECK(ctx->httpc != NULL); + + for (try = 0; try < 16384; try++) { + if (c != NULL) { + flb_http_client_destroy(c); + } + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/", buf, strlen(buf), + "127.0.0.1", 9880, NULL, 0); + ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, strlen(FLB_HTTP_HEADER_CONTENT_TYPE), + JSON_CONTENT_TYPE, strlen(JSON_CONTENT_TYPE)); + TEST_CHECK(ret == 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("http_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(b_sent > 0)){ + TEST_MSG("b_sent size error. b_sent = %lu\n", b_sent); + } + if (c->resp.status == 429) { + if (memcmp(c->resp.payload, + "error: no memory available\n", + strlen("error: no memory available\n")) == 0) { + break; + } + } + + if (!(try % 128)) { + flb_time_msleep(1); + } + } + + if (TEST_CHECK((c->resp.payload != NULL && c->resp.status != 0))) { + if (!TEST_CHECK(c->resp.status == 429)) { + TEST_MSG("http response code error. expect: %d, got: %d\n", 429, c->resp.status); + } + + if (!TEST_CHECK(memcmp(c->resp.payload, + "error: no memory available\n", + strlen("error: no memory available\n")) == 0)) { + TEST_MSG("http response code error. expect: %d, got: %d\n", 429, c->resp.status); + } + } + else { + TEST_MSG("no response to check"); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + void test_http_tag_key(char *input) { struct flb_lib_out_cb cb_data; @@ -677,6 +1148,12 @@ TEST_LIST = { {"successful_response_code_204", flb_test_http_successful_response_code_204}, {"failure_response_code_400_bad_json", flb_test_http_failure_400_bad_json}, {"failure_response_code_400_bad_disk_write", flb_test_http_failure_400_bad_disk_write}, + {"failure_response_code_400_empty_body", flb_test_http_failure_400_empty_body}, + {"failure_response_code_429_paused", flb_test_http_failure_429_paused}, + {"failure_response_code_429_enomem", flb_test_http_failure_429_enomem}, + {"failure_response_code_429_enospc", flb_test_http_failure_429_enospc}, + {"failure_response_code_400_notype", flb_test_http_failure_400_notype}, + {"failure_response_code_400_invtype", flb_test_http_failure_400_invtype}, {"tag_key_with_map_input", flb_test_http_tag_key_with_map_input}, {"tag_key_with_array_input", flb_test_http_tag_key_with_array_input}, {NULL, NULL}