Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 42 additions & 5 deletions lib/logstash/filters/elapsed.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
# start_tag => "start event tag"
# end_tag => "end event tag"
# unique_id_field => "id field name"
# unique_id_fields => ["id field name", "second id field name"]
# timeout => seconds
# new_event_on_match => true/false
# embed_inbetween_messages => true/false
# }
# }
#
Expand All @@ -39,19 +41,20 @@
# [source,ruby]
# filter {
# grok {
# match => ["message", "%{TIMESTAMP_ISO8601} START id: (?<task_id>.*)"]
# match => ["message", "%{TIMESTAMP_ISO8601} START FROM (?<task_id_ip>%{IP:ip}) id: (?<task_id>.*)"]
# add_tag => [ "taskStarted" ]
# }
#
# grok {
# match => ["message", "%{TIMESTAMP_ISO8601} END id: (?<task_id>.*)"]
# match => ["message", "%{TIMESTAMP_ISO8601} END FROM (?<task_id_ip>%{IP:ip}) id: (?<task_id>.*)"]
# add_tag => [ "taskTerminated"]
# }
#
# elapsed {
# start_tag => "taskStarted"
# end_tag => "taskTerminated"
# unique_id_field => "task_id"
# unique_id_fields => ["task_id", "task_id_ip"]
# }
# }
#
Expand Down Expand Up @@ -89,7 +92,7 @@ class LogStash::Filters::Elapsed < LogStash::Filters::Base

ELAPSED_TAG = "elapsed"
EXPIRED_ERROR_TAG = PREFIX + "expired_error"
END_WITHOUT_START_TAG = PREFIX + "end_without_start"
END_WITHOUT_START_TAG = PREFIX + "end_wtihout_start"
MATCH_TAG = PREFIX + "match"

config_name "elapsed"
Expand All @@ -103,7 +106,11 @@ class LogStash::Filters::Elapsed < LogStash::Filters::Base
# The name of the field containing the task ID.
# This value must uniquely identify the task in the system, otherwise
# it's impossible to match the couple of events.
config :unique_id_field, :validate => :string, :required => true
config :unique_id_field, :validate => :string, :required => false

# The names of the fields containing the task IDs
# Just for the case one key is not unique or key is constructed in runtime
config :unique_id_fields, :validate => :array, :required => false

# The amount of seconds after an "end event" can be considered lost.
# The corresponding "start event" is discarded and an "expired event"
Expand All @@ -115,12 +122,20 @@ class LogStash::Filters::Elapsed < LogStash::Filters::Base
# to the "end event"; if it's set to `true` a new "match event" is created.
config :new_event_on_match, :validate => :boolean, :required => false, :default => false

# As useful information can be present between start_tag and end_tag, there
# is the option to collect all messages which where read after start_tag and push them
# into ['between'] tag
# Default is false
config :embed_inbetween_messages, :validate => :boolean, :required => false, :default => false


public
def register
@mutex = Mutex.new
# This is the state of the filter. The keys are the "unique_id_field",
# the values are couples of values: <start event, age>
@start_events = {}
@between_events = []

@logger.info("Elapsed, timeout: #{@timeout} seconds")
end
Expand All @@ -132,8 +147,26 @@ def start_events

def filter(event)
return unless filter?(event)

unique_id = ""

if @unique_id_fields != nil && @unique_id_fields.size > 0
@unique_id_field = ""
@unique_id_fields.each{ |el|
if event[el].nil?
if @embed_inbetween_messages
ev2 = LogStash::Filters::Elapsed::Element.new(event)
@between_events.push(ev2.event)
end
return
end
unique_id = unique_id + event[el]
@unique_id_field = @unique_id_field + el
}
else
unique_id = event[@unique_id_field]
end

unique_id = event[@unique_id_field]
return if unique_id.nil?

if(start_event?(event))
Expand All @@ -157,6 +190,10 @@ def filter(event)
elapsed = event["@timestamp"] - start_event["@timestamp"]
if(@new_event_on_match)
elapsed_event = new_elapsed_event(elapsed, unique_id, start_event["@timestamp"])
if @embed_inbetween_messages && @unique_id_fields != nil && @unique_id_fields.size > 0
elapsed_event["between"] = @between_events.join(' ')
@between_events = []
end
filter_matched(elapsed_event)
yield elapsed_event if block_given?
else
Expand Down