From 50f8713c42c470dab0760d8d25e2b8c0799968d8 Mon Sep 17 00:00:00 2001 From: kares Date: Wed, 27 Apr 2022 17:07:02 +0200 Subject: [PATCH 1/7] Refactor: no need for local vars to be atomic --- lib/logstash/outputs/http.rb | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/lib/logstash/outputs/http.rb b/lib/logstash/outputs/http.rb index 3f9bac1..7940751 100644 --- a/lib/logstash/outputs/http.rb +++ b/lib/logstash/outputs/http.rb @@ -163,9 +163,6 @@ def log_error_response(response, url, event) end def send_events(events) - successes = java.util.concurrent.atomic.AtomicInteger.new(0) - failures = java.util.concurrent.atomic.AtomicInteger.new(0) - retries = java.util.concurrent.atomic.AtomicInteger.new(0) event_count = @is_batch ? 1 : events.size pending = Queue.new @@ -175,6 +172,8 @@ def send_events(events) events.each {|e| pending << [e, 0]} end + successes, failures = 0, 0 + while popped = pending.pop break if popped == :done @@ -189,23 +188,21 @@ def send_events(events) case action when :success - successes.incrementAndGet + successes += 1 when :retry - retries.incrementAndGet - - next_attempt = attempt+1 + next_attempt = attempt + 1 sleep_for = sleep_for_attempt(next_attempt) @logger.info("Retrying http request, will sleep for #{sleep_for} seconds") timer_task = RetryTimerTask.new(pending, event, next_attempt) - @timer.schedule(timer_task, sleep_for*1000) + @timer.schedule(timer_task, sleep_for * 1000) when :failure - failures.incrementAndGet + failures += 1 else raise "Unknown action #{action}" end if action == :success || action == :failure - if successes.get+failures.get == event_count + if successes + failures == event_count pending << :done end end @@ -215,7 +212,7 @@ def send_events(events) :class => e.class.name, :message => e.message, :backtrace => e.backtrace) - failures.incrementAndGet + failures += 1 raise e end end From c411df46a31ab62060c5a65c86115622af204391 Mon Sep 17 00:00:00 2001 From: kares Date: Wed, 27 Apr 2022 17:10:29 +0200 Subject: [PATCH 2/7] some duplication but simplify readability --- lib/logstash/outputs/http.rb | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/lib/logstash/outputs/http.rb b/lib/logstash/outputs/http.rb index 7940751..d5037c3 100644 --- a/lib/logstash/outputs/http.rb +++ b/lib/logstash/outputs/http.rb @@ -189,6 +189,8 @@ def send_events(events) case action when :success successes += 1 + + pending << :done if successes + failures == event_count when :retry next_attempt = attempt + 1 sleep_for = sleep_for_attempt(next_attempt) @@ -197,15 +199,11 @@ def send_events(events) @timer.schedule(timer_task, sleep_for * 1000) when :failure failures += 1 + + pending << :done if successes + failures == event_count else raise "Unknown action #{action}" end - - if action == :success || action == :failure - if successes + failures == event_count - pending << :done - end - end rescue => e # This should never happen unless there's a flat out bug in the code @logger.error("Error sending HTTP Request", From 776f8ea337ede90c83a30fd0e33ffe9d987ecf4d Mon Sep 17 00:00:00 2001 From: kares Date: Wed, 27 Apr 2022 17:25:59 +0200 Subject: [PATCH 3/7] Feat: simple metric to tract successes/failures --- lib/logstash/outputs/http.rb | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/logstash/outputs/http.rb b/lib/logstash/outputs/http.rb index d5037c3..599fb88 100644 --- a/lib/logstash/outputs/http.rb +++ b/lib/logstash/outputs/http.rb @@ -124,6 +124,8 @@ def register # Run named Timer as daemon thread @timer = java.util.Timer.new("HTTP Output #{self.params['id']}", true) + + @request_metrics = metric.namespace(:requests) end # def register def multi_receive(events) @@ -189,9 +191,12 @@ def send_events(events) case action when :success successes += 1 + @request_metrics.increment(:successes) pending << :done if successes + failures == event_count when :retry + @request_metrics.increment(:retryable_failures) + next_attempt = attempt + 1 sleep_for = sleep_for_attempt(next_attempt) @logger.info("Retrying http request, will sleep for #{sleep_for} seconds") @@ -199,6 +204,7 @@ def send_events(events) @timer.schedule(timer_task, sleep_for * 1000) when :failure failures += 1 + @request_metrics.increment(:failures) pending << :done if successes + failures == event_count else @@ -210,7 +216,6 @@ def send_events(events) :class => e.class.name, :message => e.message, :backtrace => e.backtrace) - failures += 1 raise e end end From bae9d566d70ad4929bbddf119329a1b1cf3a07ca Mon Sep 17 00:00:00 2001 From: kares Date: Wed, 27 Apr 2022 17:46:36 +0200 Subject: [PATCH 4/7] remove confusing comment --- lib/logstash/outputs/http.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/logstash/outputs/http.rb b/lib/logstash/outputs/http.rb index 599fb88..5f46b17 100644 --- a/lib/logstash/outputs/http.rb +++ b/lib/logstash/outputs/http.rb @@ -251,7 +251,6 @@ def send_event(event, attempt) body = gzip(body) end - # Create an async request response = client.send(@http_method, url, :body => body, :headers => headers).call if !response_success?(response) From 429d99ffeb1ed524e1b8316173a112fc30f9c409 Mon Sep 17 00:00:00 2001 From: kares Date: Thu, 28 Apr 2022 11:01:07 +0200 Subject: [PATCH 5/7] re-format code a bit --- lib/logstash/outputs/http.rb | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/lib/logstash/outputs/http.rb b/lib/logstash/outputs/http.rb index 5f46b17..5c12d45 100644 --- a/lib/logstash/outputs/http.rb +++ b/lib/logstash/outputs/http.rb @@ -190,8 +190,8 @@ def send_events(events) case action when :success - successes += 1 @request_metrics.increment(:successes) + successes += 1 pending << :done if successes + failures == event_count when :retry @@ -203,8 +203,8 @@ def send_events(events) timer_task = RetryTimerTask.new(pending, event, next_attempt) @timer.schedule(timer_task, sleep_for * 1000) when :failure - failures += 1 @request_metrics.increment(:failures) + failures += 1 pending << :done if successes + failures == event_count else @@ -212,18 +212,12 @@ def send_events(events) end rescue => e # This should never happen unless there's a flat out bug in the code - @logger.error("Error sending HTTP Request", - :class => e.class.name, - :message => e.message, - :backtrace => e.backtrace) + @logger.error("Error sending HTTP Request", :message => e.message, :class => e.class, :backtrace => e.backtrace) raise e end end rescue => e - @logger.error("Error in http output loop", - :class => e.class.name, - :message => e.message, - :backtrace => e.backtrace) + @logger.error("Error in http output loop", :message => e.message, :class => e.class, :backtrace => e.backtrace) raise e end From b74412a008e94f04e0288e9fe8cae8b0a43c56aa Mon Sep 17 00:00:00 2001 From: kares Date: Thu, 28 Apr 2022 11:54:48 +0200 Subject: [PATCH 6/7] Refactor: move metric closer to http client --- lib/logstash/outputs/http.rb | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/logstash/outputs/http.rb b/lib/logstash/outputs/http.rb index 5c12d45..bb985f2 100644 --- a/lib/logstash/outputs/http.rb +++ b/lib/logstash/outputs/http.rb @@ -190,20 +190,16 @@ def send_events(events) case action when :success - @request_metrics.increment(:successes) successes += 1 pending << :done if successes + failures == event_count when :retry - @request_metrics.increment(:retryable_failures) - next_attempt = attempt + 1 sleep_for = sleep_for_attempt(next_attempt) @logger.info("Retrying http request, will sleep for #{sleep_for} seconds") timer_task = RetryTimerTask.new(pending, event, next_attempt) @timer.schedule(timer_task, sleep_for * 1000) when :failure - @request_metrics.increment(:failures) failures += 1 pending << :done if successes + failures == event_count @@ -250,12 +246,15 @@ def send_event(event, attempt) if !response_success?(response) if retryable_response?(response) log_retryable_response(response) + @request_metrics.increment(:retryable_failures) return :retry, event, attempt else log_error_response(response, url, event) + @request_metrics.increment(:failures) return :failure, event, attempt end else + @request_metrics.increment(:successes) return :success, event, attempt end @@ -279,8 +278,10 @@ def send_event(event, attempt) log_failure("Could not fetch URL", log_params) if will_retry + @request_metrics.increment(:retryable_failures) return :retry, event, attempt else + @request_metrics.increment(:failures) return :failure, event, attempt end end From ae37643a2229ea581430fc65226b1215c5f467eb Mon Sep 17 00:00:00 2001 From: kares Date: Thu, 28 Apr 2022 11:55:08 +0200 Subject: [PATCH 7/7] debug when sending request --- lib/logstash/outputs/http.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/logstash/outputs/http.rb b/lib/logstash/outputs/http.rb index bb985f2..897fc21 100644 --- a/lib/logstash/outputs/http.rb +++ b/lib/logstash/outputs/http.rb @@ -241,6 +241,8 @@ def send_event(event, attempt) body = gzip(body) end + @logger.debug? && @logger.debug("Sending request", url: url, headers: headers, body_length: body.length, attempt: attempt) + response = client.send(@http_method, url, :body => body, :headers => headers).call if !response_success?(response)