diff --git a/lib/logstash/filters/elapsed.rb b/lib/logstash/filters/elapsed.rb index a8b9321..eb114fe 100644 --- a/lib/logstash/filters/elapsed.rb +++ b/lib/logstash/filters/elapsed.rb @@ -7,6 +7,7 @@ require "logstash/namespace" require 'thread' require 'socket' +require 'time' # The elapsed filter tracks a pair of start/end events and uses their @@ -15,6 +16,10 @@ # The filter has been developed to track the execution time of processes and # other long tasks. # +# The filter has been modified to allow specification of a timestamp to use +# in determining the time interval between two events, instead of relying +# on the timestamp logstash creates upon receiving and processing the event. +# # The configuration looks like this: # [source,ruby] # filter { @@ -85,6 +90,7 @@ class LogStash::Filters::Elapsed < LogStash::Filters::Base PREFIX = "elapsed_" ELAPSED_FIELD = PREFIX + "time" TIMESTAMP_START_EVENT_FIELD = PREFIX + "timestamp_start" + TIMESTAMP_SINCE_EPOCH_START_EVENT_FIELD = PREFIX + "timestamp_start_ts" HOST_FIELD = "host" ELAPSED_TAG = "elapsed" @@ -97,9 +103,15 @@ class LogStash::Filters::Elapsed < LogStash::Filters::Base # The name of the tag identifying the "start event" config :start_tag, :validate => :string, :required => true + # The name of the tag identifying the "start timestamp" + config :start_timestamp, :validate => :string, :required => true, :default => '@timestamp' + # The name of the tag identifying the "end event" config :end_tag, :validate => :string, :required => true + # The name of the tag identifying the "end timestamp" + config :end_timestamp, :validate => :string, :required => true, :default => '@timestamp' + # The name of the field containing the task ID. # This value must uniquely identify the task in the system, otherwise # it's impossible to match the couple of events. @@ -154,13 +166,13 @@ def filter(event) if(@start_events.has_key?(unique_id)) start_event = @start_events.delete(unique_id).event @mutex.unlock - elapsed = event["@timestamp"] - start_event["@timestamp"] + elapsed = Time.parse(event[@end_timestamp]) - Time.parse(start_event[@start_timestamp]) if(@new_event_on_match) - elapsed_event = new_elapsed_event(elapsed, unique_id, start_event["@timestamp"]) + elapsed_event = new_elapsed_event(elapsed, unique_id, start_event[@start_timestamp]) filter_matched(elapsed_event) yield elapsed_event if block_given? else - return add_elapsed_info(event, elapsed, unique_id, start_event["@timestamp"]) + return add_elapsed_info(event, elapsed, unique_id, start_event[@start_timestamp]) end else @mutex.unlock @@ -214,7 +226,8 @@ def create_expired_events_from(expired_elements) error_event[HOST_FIELD] = Socket.gethostname error_event[@unique_id_field] = element.event[@unique_id_field] error_event[ELAPSED_FIELD] = element.age - error_event[TIMESTAMP_START_EVENT_FIELD] = element.event["@timestamp"] + error_event[TIMESTAMP_START_EVENT_FIELD] = Time.parse(element.event[@start_timestamp]) + error_event[TIMESTAMP_SINCE_EPOCH_START_EVENT_FIELD] = DateTime.parse(element.event[@start_timestamp]).strftime('%Q').to_i events << error_event filter_matched(error_event) @@ -224,11 +237,11 @@ def create_expired_events_from(expired_elements) end def start_event?(event) - return (event["tags"] != nil && event["tags"].include?(@start_tag)) + return (event["tags"] != nil && event["tags"].include?(@start_tag) && event[@start_timestamp] != nil) end def end_event?(event) - return (event["tags"] != nil && event["tags"].include?(@end_tag)) + return (event["tags"] != nil && event["tags"].include?(@end_tag) && event[@end_timestamp] != nil) end def new_elapsed_event(elapsed_time, unique_id, timestamp_start_event) @@ -243,7 +256,8 @@ def add_elapsed_info(event, elapsed_time, unique_id, timestamp_start_event) event[ELAPSED_FIELD] = elapsed_time event[@unique_id_field] = unique_id - event[TIMESTAMP_START_EVENT_FIELD] = timestamp_start_event + event[TIMESTAMP_START_EVENT_FIELD] = Time.parse(timestamp_start_event) + event[TIMESTAMP_SINCE_EPOCH_START_EVENT_FIELD] = DateTime.parse(timestamp_start_event).strftime('%Q').to_i return event end diff --git a/spec/filters/elapsed_spec.rb b/spec/filters/elapsed_spec.rb index 5c6bde8..22de33d 100644 --- a/spec/filters/elapsed_spec.rb +++ b/spec/filters/elapsed_spec.rb @@ -3,10 +3,13 @@ require "logstash/filters/elapsed" require "logstash/event" require "socket" +require 'time' describe LogStash::Filters::Elapsed do START_TAG = "startTag" + START_TIMESTAMP = "start_timestamp" END_TAG = "endTag" + END_TIMESTAMP = "end_timestamp" ID_FIELD = "uniqueIdField" def event(data) @@ -17,12 +20,14 @@ def event(data) def start_event(data) data["tags"] ||= [] data["tags"] << START_TAG + data[START_TIMESTAMP] ||= "20150518-18:33:22.345" event(data) end def end_event(data = {}) data["tags"] ||= [] data["tags"] << END_TAG + data[END_TIMESTAMP] ||= "20150518-18:33:22.347" event(data) end @@ -31,7 +36,7 @@ def end_event(data = {}) end def setup_filter(config = {}) - @config = {"start_tag" => START_TAG, "end_tag" => END_TAG, "unique_id_field" => ID_FIELD} + @config = {"start_tag" => START_TAG, "end_tag" => END_TAG, "unique_id_field" => ID_FIELD, "start_timestamp" => START_TIMESTAMP, "end_timestamp" => END_TIMESTAMP} @config.merge!(config) @filter = LogStash::Filters::Elapsed.new(@config) @filter.register @@ -57,7 +62,7 @@ def setup_filter(config = {}) describe "receiving an event with a valid start tag" do describe "but without an unique id field" do it "does not record it" do - @filter.filter(event("tags" => ["tag1", START_TAG])) + @filter.filter(event("tags" => ["tag1", START_TAG], START_TIMESTAMP => "20150518-18:33:22.345")) insist { @filter.start_events.size } == 0 end end @@ -75,7 +80,7 @@ def setup_filter(config = {}) describe "receiving two 'start events' for the same id field" do it "keeps the first one and does not save the second one" do - args = {"tags" => [START_TAG], ID_FIELD => "id123"} + args = {"tags" => [START_TAG], ID_FIELD => "id123", START_TIMESTAMP => "20150518-18:33:22.345"} first_event = event(args) second_event = event(args) @@ -157,12 +162,12 @@ def setup_filter(config = {}) insist { @match_event["tags"].include?("elapsed_match") } == true end - it "contains an 'elapsed_time field' with the elapsed time" do - insist { @match_event["elapsed_time"] } == 10 + it "contains an 'elapsed.time field' with the elapsed time" do + insist { @match_event["elapsed.time"] } == 0.002 end - it "contains an 'elapsed_timestamp_start field' with the timestamp of the 'start event'" do - insist { @match_event["elapsed_timestamp_start"] } == @start_event["@timestamp"] + it "contains an 'elapsed.timestamp_start field' with the timestamp of the 'start event'" do + insist { @match_event["elapsed.timestamp_start"] } == Time.parse(@start_event[START_TIMESTAMP]) end it "contains an 'id field'" do @@ -178,8 +183,8 @@ def setup_filter(config = {}) @start_event = start_event(ID_FIELD => @id_value) @filter.filter(@start_event) - end_timestamp = @start_event["@timestamp"] + 10 - end_event = end_event(ID_FIELD => @id_value, "@timestamp" => end_timestamp) + # end_timestamp = @start_event["@timestamp"] + 10 + end_event = end_event(ID_FIELD => @id_value) @filter.filter(end_event) do |new_event| @match_event = new_event end @@ -196,8 +201,8 @@ def setup_filter(config = {}) context "if 'new_event_on_match' is set to 'false'" do before(:each) do - end_timestamp = @start_event["@timestamp"] + 10 - end_event = end_event(ID_FIELD => @id_value, "@timestamp" => end_timestamp) + # end_timestamp = @start_event["@timestamp"] + 10 + end_event = end_event(ID_FIELD => @id_value) @filter.filter(end_event) @match_event = end_event @@ -283,9 +288,9 @@ def ages() insist { @expired_events[1]["elapsed_time"] } == 31 end - it "creates a new event containing an 'elapsed_timestamp_start field' with the timestamp of the expired 'start event'" do - insist { @expired_events[0]["elapsed_timestamp_start"] } == @start_event_2["@timestamp"] - insist { @expired_events[1]["elapsed_timestamp_start"] } == @start_event_3["@timestamp"] + it "creates a new event containing an 'elapsed.timestamp_start field' with the timestamp of the expired 'start event'" do + insist { @expired_events[0]["elapsed.timestamp_start"] } == Time.parse(@start_event_2[START_TIMESTAMP]) + insist { @expired_events[1]["elapsed.timestamp_start"] } == Time.parse(@start_event_3[START_TIMESTAMP]) end it "creates a new event containing a 'host field' for each expired 'start event'" do