diff --git a/lib/logstash/filters/elapsed.rb b/lib/logstash/filters/elapsed.rb index 44afdc4..4852a10 100644 --- a/lib/logstash/filters/elapsed.rb +++ b/lib/logstash/filters/elapsed.rb @@ -22,8 +22,10 @@ # start_tag => "start event tag" # end_tag => "end event tag" # unique_id_field => "id field name" +# unique_id_fields => ["id field name", "second id field name"] # timeout => seconds # new_event_on_match => true/false +# embed_inbetween_messages => true/false # } # } # @@ -39,12 +41,12 @@ # [source,ruby] # filter { # grok { -# match => ["message", "%{TIMESTAMP_ISO8601} START id: (?.*)"] +# match => ["message", "%{TIMESTAMP_ISO8601} START FROM (?%{IP:ip}) id: (?.*)"] # add_tag => [ "taskStarted" ] # } # # grok { -# match => ["message", "%{TIMESTAMP_ISO8601} END id: (?.*)"] +# match => ["message", "%{TIMESTAMP_ISO8601} END FROM (?%{IP:ip}) id: (?.*)"] # add_tag => [ "taskTerminated"] # } # @@ -52,6 +54,7 @@ # start_tag => "taskStarted" # end_tag => "taskTerminated" # unique_id_field => "task_id" +# unique_id_fields => ["task_id", "task_id_ip"] # } # } # @@ -89,7 +92,7 @@ class LogStash::Filters::Elapsed < LogStash::Filters::Base ELAPSED_TAG = "elapsed" EXPIRED_ERROR_TAG = PREFIX + "expired_error" - END_WITHOUT_START_TAG = PREFIX + "end_without_start" + END_WITHOUT_START_TAG = PREFIX + "end_wtihout_start" MATCH_TAG = PREFIX + "match" config_name "elapsed" @@ -103,7 +106,11 @@ class LogStash::Filters::Elapsed < LogStash::Filters::Base # The name of the field containing the task ID. # This value must uniquely identify the task in the system, otherwise # it's impossible to match the couple of events. - config :unique_id_field, :validate => :string, :required => true + config :unique_id_field, :validate => :string, :required => false + + # The names of the fields containing the task IDs + # Just for the case one key is not unique or key is constructed in runtime + config :unique_id_fields, :validate => :array, :required => false # The amount of seconds after an "end event" can be considered lost. # The corresponding "start event" is discarded and an "expired event" @@ -115,12 +122,20 @@ class LogStash::Filters::Elapsed < LogStash::Filters::Base # to the "end event"; if it's set to `true` a new "match event" is created. config :new_event_on_match, :validate => :boolean, :required => false, :default => false + # As useful information can be present between start_tag and end_tag, there + # is the option to collect all messages which where read after start_tag and push them + # into ['between'] tag + # Default is false + config :embed_inbetween_messages, :validate => :boolean, :required => false, :default => false + + public def register @mutex = Mutex.new # This is the state of the filter. The keys are the "unique_id_field", # the values are couples of values: @start_events = {} + @between_events = [] @logger.info("Elapsed, timeout: #{@timeout} seconds") end @@ -132,8 +147,26 @@ def start_events def filter(event) return unless filter?(event) + + unique_id = "" + + if @unique_id_fields != nil && @unique_id_fields.size > 0 + @unique_id_field = "" + @unique_id_fields.each{ |el| + if event[el].nil? + if @embed_inbetween_messages + ev2 = LogStash::Filters::Elapsed::Element.new(event) + @between_events.push(ev2.event) + end + return + end + unique_id = unique_id + event[el] + @unique_id_field = @unique_id_field + el + } + else + unique_id = event[@unique_id_field] + end - unique_id = event[@unique_id_field] return if unique_id.nil? if(start_event?(event)) @@ -157,6 +190,10 @@ def filter(event) elapsed = event["@timestamp"] - start_event["@timestamp"] if(@new_event_on_match) elapsed_event = new_elapsed_event(elapsed, unique_id, start_event["@timestamp"]) + if @embed_inbetween_messages && @unique_id_fields != nil && @unique_id_fields.size > 0 + elapsed_event["between"] = @between_events.join(' ') + @between_events = [] + end filter_matched(elapsed_event) yield elapsed_event if block_given? else