From 79393293e5aaa1439d19918d489a23585c4a31db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Wed, 15 Jan 2025 12:05:35 +0000 Subject: [PATCH 1/7] make requests return exception only on 2xx codes --- .../outputs/elasticsearch/http_client.rb | 53 ++++++++++--------- .../http_client/manticore_adapter.rb | 5 +- .../outputs/elasticsearch/http_client/pool.rb | 24 ++++----- 3 files changed, 38 insertions(+), 44 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb index e0b70e36..33d7345c 100644 --- a/lib/logstash/outputs/elasticsearch/http_client.rb +++ b/lib/logstash/outputs/elasticsearch/http_client.rb @@ -182,21 +182,18 @@ def join_bulk_responses(bulk_responses) def bulk_send(body_stream, batch_actions) params = compression_level? ? {:headers => {"Content-Encoding" => "gzip"}} : {} - response = @pool.post(@bulk_path, params, body_stream.string) - - @bulk_response_metrics.increment(response.code.to_s) - - case response.code - when 200 # OK - LogStash::Json.load(response.body) - when 413 # Payload Too Large - logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size) - emulate_batch_error_response(batch_actions, response.code, 'payload_too_large') - else - url = ::LogStash::Util::SafeURI.new(response.final_url) - raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new( - response.code, url, body_stream.to_s, response.body - ) + begin + response = @pool.post(@bulk_path, params, body_stream.string) + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + if response.code == 413 # special handling for 413, treat it as a document level issue + logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size) + emulate_batch_error_response(batch_actions, response.code, 'payload_too_large') + else + raise e + end + ensure + code = e ? e.code : response.code + @bulk_response_metrics.increment(code.to_s) end end @@ -411,6 +408,9 @@ def host_to_url(h) def exists?(path, use_get=false) response = use_get ? @pool.get(path) : @pool.head(path) response.code >= 200 && response.code <= 299 + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + return true if e.code == 404 + raise e end def template_exists?(template_endpoint, name) @@ -420,7 +420,10 @@ def template_exists?(template_endpoint, name) def template_put(template_endpoint, name, template) path = "#{template_endpoint}/#{name}" logger.info("Installing Elasticsearch template", name: name) - @pool.put(path, nil, LogStash::Json.dump(template)) + response = @pool.put(path, nil, LogStash::Json.dump(template)) + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + return response if e.code == 404 + raise e end # ILM methods @@ -432,17 +435,15 @@ def rollover_alias_exists?(name) # Create a new rollover alias def rollover_alias_put(alias_name, alias_definition) - begin - @pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition)) - logger.info("Created rollover alias", name: alias_name) - # If the rollover alias already exists, ignore the error that comes back from Elasticsearch - rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e - if e.response_code == 400 - logger.info("Rollover alias already exists, skipping", name: alias_name) - return - end - raise e + @pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition)) + logger.info("Created rollover alias", name: alias_name) + # If the rollover alias already exists, ignore the error that comes back from Elasticsearch + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + if e.response_code == 400 + logger.info("Rollover alias already exists, skipping", name: alias_name) + return end + raise e end def get_xpack_info diff --git a/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb b/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb index c9e49ec7..b9a183ef 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb @@ -76,11 +76,8 @@ def perform_request(url, method, path, params={}, body=nil) raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError.new(e, request_uri_as_string) end - # 404s are excluded because they are valid codes in the case of - # template installation. We might need a better story around this later - # but for our current purposes this is correct code = resp.code - if code < 200 || code > 299 && code != 404 + if code < 200 || code > 299 #&& code != 404 raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(code, request_uri, body, resp.body) end diff --git a/lib/logstash/outputs/elasticsearch/http_client/pool.rb b/lib/logstash/outputs/elasticsearch/http_client/pool.rb index 68715066..1ef9d0f9 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/pool.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/pool.rb @@ -253,13 +253,11 @@ def get_license(url) def health_check_request(url) logger.debug("Running health check to see if an Elasticsearch connection is working", :healthcheck_url => url.sanitized.to_s, :path => @healthcheck_path) - begin - response = perform_request_to_url(url, :head, @healthcheck_path) - return response, nil - rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e - logger.warn("Health check failed", code: e.response_code, url: e.url, message: e.message) - return nil, e - end + response = perform_request_to_url(url, :head, @healthcheck_path) + return response, nil + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + logger.warn("Health check failed", code: e.response_code, url: e.url, message: e.message) + return nil, e end def healthcheck!(register_phase = true) @@ -312,13 +310,11 @@ def healthcheck!(register_phase = true) end def get_root_path(url, params={}) - begin - resp = perform_request_to_url(url, :get, ROOT_URI_PATH, params) - return resp, nil - rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e - logger.warn("Elasticsearch main endpoint returns #{e.response_code}", message: e.message, body: e.response_body) - return nil, e - end + resp = perform_request_to_url(url, :get, ROOT_URI_PATH, params) + return resp, nil + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + logger.warn("Elasticsearch main endpoint returns #{e.response_code}", message: e.message, body: e.response_body) + return nil, e end def test_serverless_connection(url, root_response) From 678db4061798960d34df48d74b6b5d03a21e5dd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Wed, 15 Jan 2025 15:54:40 +0000 Subject: [PATCH 2/7] minor fixes --- lib/logstash/outputs/elasticsearch/http_client.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb index 33d7345c..41e1d147 100644 --- a/lib/logstash/outputs/elasticsearch/http_client.rb +++ b/lib/logstash/outputs/elasticsearch/http_client.rb @@ -185,14 +185,14 @@ def bulk_send(body_stream, batch_actions) begin response = @pool.post(@bulk_path, params, body_stream.string) rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e - if response.code == 413 # special handling for 413, treat it as a document level issue + if e.response_code == 413 # special handling for 413, treat it as a document level issue logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size) - emulate_batch_error_response(batch_actions, response.code, 'payload_too_large') + emulate_batch_error_response(batch_actions, 413, 'payload_too_large') else raise e end ensure - code = e ? e.code : response.code + code = e ? e.response_code : response.code @bulk_response_metrics.increment(code.to_s) end end From 7b3febe0aa9adf33fe3c2345bd7b1d17b0d1ac7b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Wed, 15 Jan 2025 15:56:59 +0000 Subject: [PATCH 3/7] minor fixes --- .../outputs/elasticsearch/http_client.rb | 23 ++++++++----------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb index 41e1d147..28da1f15 100644 --- a/lib/logstash/outputs/elasticsearch/http_client.rb +++ b/lib/logstash/outputs/elasticsearch/http_client.rb @@ -181,20 +181,15 @@ def join_bulk_responses(bulk_responses) def bulk_send(body_stream, batch_actions) params = compression_level? ? {:headers => {"Content-Encoding" => "gzip"}} : {} - - begin - response = @pool.post(@bulk_path, params, body_stream.string) - rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e - if e.response_code == 413 # special handling for 413, treat it as a document level issue - logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size) - emulate_batch_error_response(batch_actions, 413, 'payload_too_large') - else - raise e - end - ensure - code = e ? e.response_code : response.code - @bulk_response_metrics.increment(code.to_s) - end + response = @pool.post(@bulk_path, params, body_stream.string) + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + raise e unless e.response_code == 413 + # special handling for 413, treat it as a document level issue + logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size) + emulate_batch_error_response(batch_actions, 413, 'payload_too_large') + ensure + code = e ? e.response_code : response.code + @bulk_response_metrics.increment(code.to_s) end def emulate_batch_error_response(actions, http_code, reason) From f24b348b0f38e56eb3f0e2714e4e6180828719b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Wed, 15 Jan 2025 16:00:55 +0000 Subject: [PATCH 4/7] minor fixes --- lib/logstash/outputs/elasticsearch/http_client.rb | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb index 28da1f15..42254574 100644 --- a/lib/logstash/outputs/elasticsearch/http_client.rb +++ b/lib/logstash/outputs/elasticsearch/http_client.rb @@ -182,14 +182,16 @@ def join_bulk_responses(bulk_responses) def bulk_send(body_stream, batch_actions) params = compression_level? ? {:headers => {"Content-Encoding" => "gzip"}} : {} response = @pool.post(@bulk_path, params, body_stream.string) + @bulk_response_metrics.increment(response.code.to_s) + LogStash::Json.load(response.body) rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + @bulk_response_metrics.increment(e.response_code.to_s) raise e unless e.response_code == 413 # special handling for 413, treat it as a document level issue logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size) emulate_batch_error_response(batch_actions, 413, 'payload_too_large') - ensure - code = e ? e.response_code : response.code - @bulk_response_metrics.increment(code.to_s) + else # it may be a network issue instead, re-raise + raise e end def emulate_batch_error_response(actions, http_code, reason) From deb0860cb4c49e22fb0eed198c458f96fd5b3652 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Wed, 15 Jan 2025 16:06:12 +0000 Subject: [PATCH 5/7] minor fixes --- .../outputs/elasticsearch/http_client.rb | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb index 42254574..6f80f40c 100644 --- a/lib/logstash/outputs/elasticsearch/http_client.rb +++ b/lib/logstash/outputs/elasticsearch/http_client.rb @@ -181,17 +181,22 @@ def join_bulk_responses(bulk_responses) def bulk_send(body_stream, batch_actions) params = compression_level? ? {:headers => {"Content-Encoding" => "gzip"}} : {} - response = @pool.post(@bulk_path, params, body_stream.string) + + begin + response = @pool.post(@bulk_path, params, body_stream.string) + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + @bulk_response_metrics.increment(e.response_code.to_s) + raise e unless e.response_code == 413 + # special handling for 413, treat it as a document level issue + logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size) + emulate_batch_error_response(batch_actions, 413, 'payload_too_large') + rescue => e # it may be a network issue instead, re-raise + raise e + end + @bulk_response_metrics.increment(response.code.to_s) + LogStash::Json.load(response.body) - rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e - @bulk_response_metrics.increment(e.response_code.to_s) - raise e unless e.response_code == 413 - # special handling for 413, treat it as a document level issue - logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size) - emulate_batch_error_response(batch_actions, 413, 'payload_too_large') - else # it may be a network issue instead, re-raise - raise e end def emulate_batch_error_response(actions, http_code, reason) From 50315b9e7f093a622a87200a7287cccb50330462 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Wed, 15 Jan 2025 16:12:43 +0000 Subject: [PATCH 6/7] minor fixes --- lib/logstash/outputs/elasticsearch/http_client.rb | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb index 6f80f40c..3603e6a2 100644 --- a/lib/logstash/outputs/elasticsearch/http_client.rb +++ b/lib/logstash/outputs/elasticsearch/http_client.rb @@ -184,6 +184,7 @@ def bulk_send(body_stream, batch_actions) begin response = @pool.post(@bulk_path, params, body_stream.string) + @bulk_response_metrics.increment(response.code.to_s) rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e @bulk_response_metrics.increment(e.response_code.to_s) raise e unless e.response_code == 413 @@ -194,8 +195,6 @@ def bulk_send(body_stream, batch_actions) raise e end - @bulk_response_metrics.increment(response.code.to_s) - LogStash::Json.load(response.body) end From a3551be6228762467694dbf9c46f412cb29e5ce9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Wed, 15 Jan 2025 16:14:29 +0000 Subject: [PATCH 7/7] minor fixes --- .../outputs/elasticsearch/http_client/manticore_adapter.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb b/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb index b9a183ef..11f85b53 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb @@ -77,7 +77,7 @@ def perform_request(url, method, path, params={}, body=nil) end code = resp.code - if code < 200 || code > 299 #&& code != 404 + if code < 200 || code > 299 # assume anything not 2xx is an error that the layer above needs to interpret raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(code, request_uri, body, resp.body) end