diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb index e0b70e36..3603e6a2 100644 --- a/lib/logstash/outputs/elasticsearch/http_client.rb +++ b/lib/logstash/outputs/elasticsearch/http_client.rb @@ -182,22 +182,20 @@ def join_bulk_responses(bulk_responses) def bulk_send(body_stream, batch_actions) params = compression_level? ? {:headers => {"Content-Encoding" => "gzip"}} : {} - response = @pool.post(@bulk_path, params, body_stream.string) - - @bulk_response_metrics.increment(response.code.to_s) - - case response.code - when 200 # OK - LogStash::Json.load(response.body) - when 413 # Payload Too Large + begin + response = @pool.post(@bulk_path, params, body_stream.string) + @bulk_response_metrics.increment(response.code.to_s) + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + @bulk_response_metrics.increment(e.response_code.to_s) + raise e unless e.response_code == 413 + # special handling for 413, treat it as a document level issue logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size) - emulate_batch_error_response(batch_actions, response.code, 'payload_too_large') - else - url = ::LogStash::Util::SafeURI.new(response.final_url) - raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new( - response.code, url, body_stream.to_s, response.body - ) + emulate_batch_error_response(batch_actions, 413, 'payload_too_large') + rescue => e # it may be a network issue instead, re-raise + raise e end + + LogStash::Json.load(response.body) end def emulate_batch_error_response(actions, http_code, reason) @@ -411,6 +409,9 @@ def host_to_url(h) def exists?(path, use_get=false) response = use_get ? @pool.get(path) : @pool.head(path) response.code >= 200 && response.code <= 299 + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + return true if e.code == 404 + raise e end def template_exists?(template_endpoint, name) @@ -420,7 +421,10 @@ def template_exists?(template_endpoint, name) def template_put(template_endpoint, name, template) path = "#{template_endpoint}/#{name}" logger.info("Installing Elasticsearch template", name: name) - @pool.put(path, nil, LogStash::Json.dump(template)) + response = @pool.put(path, nil, LogStash::Json.dump(template)) + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + return response if e.code == 404 + raise e end # ILM methods @@ -432,17 +436,15 @@ def rollover_alias_exists?(name) # Create a new rollover alias def rollover_alias_put(alias_name, alias_definition) - begin - @pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition)) - logger.info("Created rollover alias", name: alias_name) - # If the rollover alias already exists, ignore the error that comes back from Elasticsearch - rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e - if e.response_code == 400 - logger.info("Rollover alias already exists, skipping", name: alias_name) - return - end - raise e + @pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition)) + logger.info("Created rollover alias", name: alias_name) + # If the rollover alias already exists, ignore the error that comes back from Elasticsearch + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + if e.response_code == 400 + logger.info("Rollover alias already exists, skipping", name: alias_name) + return end + raise e end def get_xpack_info diff --git a/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb b/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb index c9e49ec7..11f85b53 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb @@ -76,11 +76,8 @@ def perform_request(url, method, path, params={}, body=nil) raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError.new(e, request_uri_as_string) end - # 404s are excluded because they are valid codes in the case of - # template installation. We might need a better story around this later - # but for our current purposes this is correct code = resp.code - if code < 200 || code > 299 && code != 404 + if code < 200 || code > 299 # assume anything not 2xx is an error that the layer above needs to interpret raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(code, request_uri, body, resp.body) end diff --git a/lib/logstash/outputs/elasticsearch/http_client/pool.rb b/lib/logstash/outputs/elasticsearch/http_client/pool.rb index 68715066..1ef9d0f9 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/pool.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/pool.rb @@ -253,13 +253,11 @@ def get_license(url) def health_check_request(url) logger.debug("Running health check to see if an Elasticsearch connection is working", :healthcheck_url => url.sanitized.to_s, :path => @healthcheck_path) - begin - response = perform_request_to_url(url, :head, @healthcheck_path) - return response, nil - rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e - logger.warn("Health check failed", code: e.response_code, url: e.url, message: e.message) - return nil, e - end + response = perform_request_to_url(url, :head, @healthcheck_path) + return response, nil + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + logger.warn("Health check failed", code: e.response_code, url: e.url, message: e.message) + return nil, e end def healthcheck!(register_phase = true) @@ -312,13 +310,11 @@ def healthcheck!(register_phase = true) end def get_root_path(url, params={}) - begin - resp = perform_request_to_url(url, :get, ROOT_URI_PATH, params) - return resp, nil - rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e - logger.warn("Elasticsearch main endpoint returns #{e.response_code}", message: e.message, body: e.response_body) - return nil, e - end + resp = perform_request_to_url(url, :get, ROOT_URI_PATH, params) + return resp, nil + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + logger.warn("Elasticsearch main endpoint returns #{e.response_code}", message: e.message, body: e.response_body) + return nil, e end def test_serverless_connection(url, root_response)