From 636fe32b62c3f75c52e158bc23450c25e89a049e Mon Sep 17 00:00:00 2001 From: Matt Walters Date: Mon, 18 May 2015 16:50:42 -0400 Subject: [PATCH 1/5] reworking to allow specifying and supplying existing timestamp on record, instead of relying on timestamp created when log is processed. --- lib/logstash/filters/elapsed.rb | 21 ++++++++++++++------- spec/filters/elapsed_spec.rb | 29 +++++++++++++++++------------ 2 files changed, 31 insertions(+), 19 deletions(-) diff --git a/lib/logstash/filters/elapsed.rb b/lib/logstash/filters/elapsed.rb index 44afdc4..2c14f63 100644 --- a/lib/logstash/filters/elapsed.rb +++ b/lib/logstash/filters/elapsed.rb @@ -7,6 +7,7 @@ require "logstash/namespace" require 'thread' require 'socket' +require 'time' # The elapsed filter tracks a pair of start/end events and uses their @@ -97,9 +98,15 @@ class LogStash::Filters::Elapsed < LogStash::Filters::Base # The name of the tag identifying the "start event" config :start_tag, :validate => :string, :required => true + # The name of the field containing the "start timestamp" + config :start_timestamp, :validate => :string, :required => true, :default => '@timestamp' + # The name of the tag identifying the "end event" config :end_tag, :validate => :string, :required => true + # The name of the field containing the "end timestamp" + config :end_timestamp, :validate => :string, :required => true, :default => '@timestamp' + # The name of the field containing the task ID. # This value must uniquely identify the task in the system, otherwise # it's impossible to match the couple of events. 
@@ -154,13 +161,13 @@ def filter(event) if(@start_events.has_key?(unique_id)) start_event = @start_events.delete(unique_id).event @mutex.unlock - elapsed = event["@timestamp"] - start_event["@timestamp"] + elapsed = Time.parse(event[@end_timestamp]) - Time.parse(start_event[@start_timestamp]) if(@new_event_on_match) - elapsed_event = new_elapsed_event(elapsed, unique_id, start_event["@timestamp"]) + elapsed_event = new_elapsed_event(elapsed, unique_id, start_event[@start_timestamp]) filter_matched(elapsed_event) yield elapsed_event if block_given? else - return add_elapsed_info(event, elapsed, unique_id, start_event["@timestamp"]) + return add_elapsed_info(event, elapsed, unique_id, start_event[@start_timestamp]) end else @mutex.unlock @@ -214,7 +221,7 @@ def create_expired_events_from(expired_elements) error_event[HOST_FIELD] = Socket.gethostname error_event[@unique_id_field] = element.event[@unique_id_field] error_event[ELAPSED_FIELD] = element.age - error_event[TIMESTAMP_START_EVENT_FIELD] = element.event["@timestamp"] + error_event[TIMESTAMP_START_EVENT_FIELD] = Time.parse(element.event[@start_timestamp]) events << error_event filter_matched(error_event) @@ -224,11 +231,11 @@ def create_expired_events_from(expired_elements) end def start_event?(event) - return (event["tags"] != nil && event["tags"].include?(@start_tag)) + return (event["tags"] != nil && event["tags"].include?(@start_tag) && event[@start_timestamp] != nil) end def end_event?(event) - return (event["tags"] != nil && event["tags"].include?(@end_tag)) + return (event["tags"] != nil && event["tags"].include?(@end_tag) && event[@end_timestamp] != nil) end def new_elapsed_event(elapsed_time, unique_id, timestamp_start_event) @@ -243,7 +250,7 @@ def add_elapsed_info(event, elapsed_time, unique_id, timestamp_start_event) event[ELAPSED_FIELD] = elapsed_time event[@unique_id_field] = unique_id - event[TIMESTAMP_START_EVENT_FIELD] = timestamp_start_event + event[TIMESTAMP_START_EVENT_FIELD] = 
Time.parse(timestamp_start_event) return event end diff --git a/spec/filters/elapsed_spec.rb b/spec/filters/elapsed_spec.rb index f65b12a..b67cea0 100644 --- a/spec/filters/elapsed_spec.rb +++ b/spec/filters/elapsed_spec.rb @@ -3,10 +3,13 @@ require "logstash/filters/elapsed" require "logstash/event" require "socket" +require 'time' describe LogStash::Filters::Elapsed do START_TAG = "startTag" + START_TIMESTAMP = "start_timestamp" END_TAG = "endTag" + END_TIMESTAMP = "end_timestamp" ID_FIELD = "uniqueIdField" def event(data) @@ -17,12 +20,14 @@ def event(data) def start_event(data) data["tags"] ||= [] data["tags"] << START_TAG + data[START_TIMESTAMP] ||= "20150518-18:33:22.345" event(data) end def end_event(data = {}) data["tags"] ||= [] data["tags"] << END_TAG + data[END_TIMESTAMP] ||= "20150518-18:33:22.347" event(data) end @@ -31,7 +36,7 @@ def end_event(data = {}) end def setup_filter(config = {}) - @config = {"start_tag" => START_TAG, "end_tag" => END_TAG, "unique_id_field" => ID_FIELD} + @config = {"start_tag" => START_TAG, "end_tag" => END_TAG, "unique_id_field" => ID_FIELD, "start_timestamp" => START_TIMESTAMP, "end_timestamp" => END_TIMESTAMP} @config.merge!(config) @filter = LogStash::Filters::Elapsed.new(@config) @filter.register @@ -57,7 +62,7 @@ def setup_filter(config = {}) describe "receiving an event with a valid start tag" do describe "but without an unique id field" do it "does not record it" do - @filter.filter(event("tags" => ["tag1", START_TAG])) + @filter.filter(event("tags" => ["tag1", START_TAG], START_TIMESTAMP => "20150518-18:33:22.345")) insist { @filter.start_events.size } == 0 end end @@ -75,7 +80,7 @@ def setup_filter(config = {}) describe "receiving two 'start events' for the same id field" do it "keeps the first one and does not save the second one" do - args = {"tags" => [START_TAG], ID_FIELD => "id123"} + args = {"tags" => [START_TAG], ID_FIELD => "id123", START_TIMESTAMP => "20150518-18:33:22.345"} first_event = event(args) 
second_event = event(args) @@ -100,7 +105,7 @@ def setup_filter(config = {}) describe "and with an id" do describe "but without a previous 'start event'" do - it "adds a tag 'elapsed.end_witout_start' to the 'end event'" do + it "adds a tag 'elapsed.end_without_start' to the 'end event'" do end_event = end_event(ID_FIELD => "id_123") @filter.filter(end_event) @@ -158,11 +163,11 @@ def setup_filter(config = {}) end it "contains an 'elapsed.time field' with the elapsed time" do - insist { @match_event["elapsed.time"] } == 10 + insist { @match_event["elapsed.time"] } == 0.002 end it "contains an 'elapsed.timestamp_start field' with the timestamp of the 'start event'" do - insist { @match_event["elapsed.timestamp_start"] } == @start_event["@timestamp"] + insist { @match_event["elapsed.timestamp_start"] } == Time.parse(@start_event[START_TIMESTAMP]) end it "contains an 'id field'" do @@ -178,8 +183,8 @@ def setup_filter(config = {}) @start_event = start_event(ID_FIELD => @id_value) @filter.filter(@start_event) - end_timestamp = @start_event["@timestamp"] + 10 - end_event = end_event(ID_FIELD => @id_value, "@timestamp" => end_timestamp) + # end_timestamp = @start_event["@timestamp"] + 10 + end_event = end_event(ID_FIELD => @id_value) @filter.filter(end_event) do |new_event| @match_event = new_event end @@ -196,8 +201,8 @@ def setup_filter(config = {}) context "if 'new_event_on_match' is set to 'false'" do before(:each) do - end_timestamp = @start_event["@timestamp"] + 10 - end_event = end_event(ID_FIELD => @id_value, "@timestamp" => end_timestamp) + # end_timestamp = @start_event["@timestamp"] + 10 + end_event = end_event(ID_FIELD => @id_value) @filter.filter(end_event) @match_event = end_event @@ -284,8 +289,8 @@ def ages() end it "creates a new event containing an 'elapsed.timestamp_start field' with the timestamp of the expired 'start event'" do - insist { @expired_events[0]["elapsed.timestamp_start"] } == @start_event_2["@timestamp"] - insist { 
@expired_events[1]["elapsed.timestamp_start"] } == @start_event_3["@timestamp"] + insist { @expired_events[0]["elapsed.timestamp_start"] } == Time.parse(@start_event_2[START_TIMESTAMP]) + insist { @expired_events[1]["elapsed.timestamp_start"] } == Time.parse(@start_event_3[START_TIMESTAMP]) end it "creates a new event containing a 'host field' for each expired 'start event'" do From 6d09cd11427da556b5afac00747cc8a22406ad13 Mon Sep 17 00:00:00 2001 From: Matt Walters Date: Mon, 18 May 2015 17:12:08 -0400 Subject: [PATCH 2/5] adding comment --- lib/logstash/filters/elapsed.rb | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/logstash/filters/elapsed.rb b/lib/logstash/filters/elapsed.rb index 2c14f63..fdcec46 100644 --- a/lib/logstash/filters/elapsed.rb +++ b/lib/logstash/filters/elapsed.rb @@ -16,6 +16,10 @@ # The filter has been developed to track the execution time of processes and # other long tasks. # +# The filter has been modified to allow specification of a timestamp to use +# in determining the time interval between two events, instead of relying +# on the timestamp logstash creates upon receiving and processing the event. +# # The configuration looks like this: # [source,ruby] # filter { From 2d59ee741daa1494a047c039beb94f403a9f88df Mon Sep 17 00:00:00 2001 From: Matt Walters Date: Thu, 21 May 2015 17:18:24 -0400 Subject: [PATCH 3/5] adding an actual timestamp, instead of a string that happens to be named timestamp... --- lib/logstash/filters/elapsed.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/logstash/filters/elapsed.rb b/lib/logstash/filters/elapsed.rb index fdcec46..0f705bf 100644 --- a/lib/logstash/filters/elapsed.rb +++ b/lib/logstash/filters/elapsed.rb @@ -90,6 +90,7 @@ class LogStash::Filters::Elapsed < LogStash::Filters::Base PREFIX = "elapsed." 
ELAPSED_FIELD = PREFIX + "time" TIMESTAMP_START_EVENT_FIELD = PREFIX + "timestamp_start" + TIMESTAMP_SINCE_EPOCH_START_EVENT_FIELD = PREFIX + "timestamp_start_ts" HOST_FIELD = "host" ELAPSED_TAG = "elapsed" @@ -226,6 +227,7 @@ def create_expired_events_from(expired_elements) error_event[@unique_id_field] = element.event[@unique_id_field] error_event[ELAPSED_FIELD] = element.age error_event[TIMESTAMP_START_EVENT_FIELD] = Time.parse(element.event[@start_timestamp]) + error_event[TIMESTAMP_SINCE_EPOCH_START_EVENT_FIELD] = Time.parse(element.event[@start_timestamp]).to_f events << error_event filter_matched(error_event) From 29fdac15f6d5ed345b5318c7bcf1fe166325d6b9 Mon Sep 17 00:00:00 2001 From: Matt Walters Date: Fri, 22 May 2015 00:22:49 -0400 Subject: [PATCH 4/5] adding real unix timestamp to completed elapsed event --- lib/logstash/filters/elapsed.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/logstash/filters/elapsed.rb b/lib/logstash/filters/elapsed.rb index 0f705bf..83f938e 100644 --- a/lib/logstash/filters/elapsed.rb +++ b/lib/logstash/filters/elapsed.rb @@ -257,6 +257,7 @@ def add_elapsed_info(event, elapsed_time, unique_id, timestamp_start_event) event[ELAPSED_FIELD] = elapsed_time event[@unique_id_field] = unique_id event[TIMESTAMP_START_EVENT_FIELD] = Time.parse(timestamp_start_event) + event[TIMESTAMP_SINCE_EPOCH_START_EVENT_FIELD] = Time.parse(timestamp_start_event).to_f return event end From ff3cdf317e35c92f8e3501ce6ebb73c2777fa5b1 Mon Sep 17 00:00:00 2001 From: Matt Walters Date: Fri, 22 May 2015 10:23:55 -0400 Subject: [PATCH 5/5] better integer value for timestamp --- lib/logstash/filters/elapsed.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/logstash/filters/elapsed.rb b/lib/logstash/filters/elapsed.rb index 83f938e..f72c336 100644 --- a/lib/logstash/filters/elapsed.rb +++ b/lib/logstash/filters/elapsed.rb @@ -227,7 +227,7 @@ def create_expired_events_from(expired_elements) error_event[@unique_id_field] = 
element.event[@unique_id_field] error_event[ELAPSED_FIELD] = element.age error_event[TIMESTAMP_START_EVENT_FIELD] = Time.parse(element.event[@start_timestamp]) - error_event[TIMESTAMP_SINCE_EPOCH_START_EVENT_FIELD] = Time.parse(element.event[@start_timestamp]).to_f + error_event[TIMESTAMP_SINCE_EPOCH_START_EVENT_FIELD] = DateTime.parse(element.event[@start_timestamp]).strftime('%Q').to_i events << error_event filter_matched(error_event) @@ -257,7 +257,7 @@ def add_elapsed_info(event, elapsed_time, unique_id, timestamp_start_event) event[ELAPSED_FIELD] = elapsed_time event[@unique_id_field] = unique_id event[TIMESTAMP_START_EVENT_FIELD] = Time.parse(timestamp_start_event) - event[TIMESTAMP_SINCE_EPOCH_START_EVENT_FIELD] = Time.parse(timestamp_start_event).to_f + event[TIMESTAMP_SINCE_EPOCH_START_EVENT_FIELD] = DateTime.parse(timestamp_start_event).strftime('%Q').to_i return event end