Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions lib/logstash/filters/elapsed.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
require "logstash/namespace"
require 'thread'
require 'socket'
require 'time'


# The elapsed filter tracks a pair of start/end events and uses their
Expand All @@ -15,6 +16,10 @@
# The filter has been developed to track the execution time of processes and
# other long tasks.
#
# The filter has been modified to allow specification of a timestamp to use
# in determining the time interval between two events, instead of relying
# on the timestamp logstash creates upon receiving and processing the event.
#
# The configuration looks like this:
# [source,ruby]
# filter {
Expand Down Expand Up @@ -85,6 +90,7 @@ class LogStash::Filters::Elapsed < LogStash::Filters::Base
PREFIX = "elapsed_"
ELAPSED_FIELD = PREFIX + "time"
TIMESTAMP_START_EVENT_FIELD = PREFIX + "timestamp_start"
TIMESTAMP_SINCE_EPOCH_START_EVENT_FIELD = PREFIX + "timestamp_start_ts"
HOST_FIELD = "host"

ELAPSED_TAG = "elapsed"
Expand All @@ -97,9 +103,15 @@ class LogStash::Filters::Elapsed < LogStash::Filters::Base
# The name of the tag identifying the "start event"
config :start_tag, :validate => :string, :required => true

# The name of the tag identifying the "start timestamp"
config :start_timestamp, :validate => :string, :required => true, :default => '@timestamp'

# The name of the tag identifying the "end event"
config :end_tag, :validate => :string, :required => true

# The name of the tag identifying the "end timestamp"
config :end_timestamp, :validate => :string, :required => true, :default => '@timestamp'

# The name of the field containing the task ID.
# This value must uniquely identify the task in the system, otherwise
# it's impossible to match the couple of events.
Expand Down Expand Up @@ -154,13 +166,13 @@ def filter(event)
if(@start_events.has_key?(unique_id))
start_event = @start_events.delete(unique_id).event
@mutex.unlock
elapsed = event["@timestamp"] - start_event["@timestamp"]
elapsed = Time.parse(event[@end_timestamp]) - Time.parse(start_event[@start_timestamp])
if(@new_event_on_match)
elapsed_event = new_elapsed_event(elapsed, unique_id, start_event["@timestamp"])
elapsed_event = new_elapsed_event(elapsed, unique_id, start_event[@start_timestamp])
filter_matched(elapsed_event)
yield elapsed_event if block_given?
else
return add_elapsed_info(event, elapsed, unique_id, start_event["@timestamp"])
return add_elapsed_info(event, elapsed, unique_id, start_event[@start_timestamp])
end
else
@mutex.unlock
Expand Down Expand Up @@ -214,7 +226,8 @@ def create_expired_events_from(expired_elements)
error_event[HOST_FIELD] = Socket.gethostname
error_event[@unique_id_field] = element.event[@unique_id_field]
error_event[ELAPSED_FIELD] = element.age
error_event[TIMESTAMP_START_EVENT_FIELD] = element.event["@timestamp"]
error_event[TIMESTAMP_START_EVENT_FIELD] = Time.parse(element.event[@start_timestamp])
error_event[TIMESTAMP_SINCE_EPOCH_START_EVENT_FIELD] = DateTime.parse(element.event[@start_timestamp]).strftime('%Q').to_i

events << error_event
filter_matched(error_event)
Expand All @@ -224,11 +237,11 @@ def create_expired_events_from(expired_elements)
end

def start_event?(event)
return (event["tags"] != nil && event["tags"].include?(@start_tag))
return (event["tags"] != nil && event["tags"].include?(@start_tag) && event[@start_timestamp] != nil)
end

def end_event?(event)
return (event["tags"] != nil && event["tags"].include?(@end_tag))
return (event["tags"] != nil && event["tags"].include?(@end_tag) && event[@end_timestamp] != nil)
end

def new_elapsed_event(elapsed_time, unique_id, timestamp_start_event)
Expand All @@ -243,7 +256,8 @@ def add_elapsed_info(event, elapsed_time, unique_id, timestamp_start_event)

event[ELAPSED_FIELD] = elapsed_time
event[@unique_id_field] = unique_id
event[TIMESTAMP_START_EVENT_FIELD] = timestamp_start_event
event[TIMESTAMP_START_EVENT_FIELD] = Time.parse(timestamp_start_event)
event[TIMESTAMP_SINCE_EPOCH_START_EVENT_FIELD] = DateTime.parse(timestamp_start_event).strftime('%Q').to_i

return event
end
Expand Down
33 changes: 19 additions & 14 deletions spec/filters/elapsed_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
require "logstash/filters/elapsed"
require "logstash/event"
require "socket"
require 'time'

describe LogStash::Filters::Elapsed do
START_TAG = "startTag"
START_TIMESTAMP = "start_timestamp"
END_TAG = "endTag"
END_TIMESTAMP = "end_timestamp"
ID_FIELD = "uniqueIdField"

def event(data)
Expand All @@ -17,12 +20,14 @@ def event(data)
def start_event(data)
data["tags"] ||= []
data["tags"] << START_TAG
data[START_TIMESTAMP] ||= "20150518-18:33:22.345"
event(data)
end

def end_event(data = {})
data["tags"] ||= []
data["tags"] << END_TAG
data[END_TIMESTAMP] ||= "20150518-18:33:22.347"
event(data)
end

Expand All @@ -31,7 +36,7 @@ def end_event(data = {})
end

def setup_filter(config = {})
@config = {"start_tag" => START_TAG, "end_tag" => END_TAG, "unique_id_field" => ID_FIELD}
@config = {"start_tag" => START_TAG, "end_tag" => END_TAG, "unique_id_field" => ID_FIELD, "start_timestamp" => START_TIMESTAMP, "end_timestamp" => END_TIMESTAMP}
@config.merge!(config)
@filter = LogStash::Filters::Elapsed.new(@config)
@filter.register
Expand All @@ -57,7 +62,7 @@ def setup_filter(config = {})
describe "receiving an event with a valid start tag" do
describe "but without an unique id field" do
it "does not record it" do
@filter.filter(event("tags" => ["tag1", START_TAG]))
@filter.filter(event("tags" => ["tag1", START_TAG], START_TIMESTAMP => "20150518-18:33:22.345"))
insist { @filter.start_events.size } == 0
end
end
Expand All @@ -75,7 +80,7 @@ def setup_filter(config = {})

describe "receiving two 'start events' for the same id field" do
it "keeps the first one and does not save the second one" do
args = {"tags" => [START_TAG], ID_FIELD => "id123"}
args = {"tags" => [START_TAG], ID_FIELD => "id123", START_TIMESTAMP => "20150518-18:33:22.345"}
first_event = event(args)
second_event = event(args)

Expand Down Expand Up @@ -157,12 +162,12 @@ def setup_filter(config = {})
insist { @match_event["tags"].include?("elapsed_match") } == true
end

it "contains an 'elapsed_time field' with the elapsed time" do
insist { @match_event["elapsed_time"] } == 10
it "contains an 'elapsed.time field' with the elapsed time" do
insist { @match_event["elapsed.time"] } == 0.002
end

it "contains an 'elapsed_timestamp_start field' with the timestamp of the 'start event'" do
insist { @match_event["elapsed_timestamp_start"] } == @start_event["@timestamp"]
it "contains an 'elapsed.timestamp_start field' with the timestamp of the 'start event'" do
insist { @match_event["elapsed.timestamp_start"] } == Time.parse(@start_event[START_TIMESTAMP])
end

it "contains an 'id field'" do
Expand All @@ -178,8 +183,8 @@ def setup_filter(config = {})
@start_event = start_event(ID_FIELD => @id_value)
@filter.filter(@start_event)

end_timestamp = @start_event["@timestamp"] + 10
end_event = end_event(ID_FIELD => @id_value, "@timestamp" => end_timestamp)
# end_timestamp = @start_event["@timestamp"] + 10
end_event = end_event(ID_FIELD => @id_value)
@filter.filter(end_event) do |new_event|
@match_event = new_event
end
Expand All @@ -196,8 +201,8 @@ def setup_filter(config = {})

context "if 'new_event_on_match' is set to 'false'" do
before(:each) do
end_timestamp = @start_event["@timestamp"] + 10
end_event = end_event(ID_FIELD => @id_value, "@timestamp" => end_timestamp)
# end_timestamp = @start_event["@timestamp"] + 10
end_event = end_event(ID_FIELD => @id_value)
@filter.filter(end_event)

@match_event = end_event
Expand Down Expand Up @@ -283,9 +288,9 @@ def ages()
insist { @expired_events[1]["elapsed_time"] } == 31
end

it "creates a new event containing an 'elapsed_timestamp_start field' with the timestamp of the expired 'start event'" do
insist { @expired_events[0]["elapsed_timestamp_start"] } == @start_event_2["@timestamp"]
insist { @expired_events[1]["elapsed_timestamp_start"] } == @start_event_3["@timestamp"]
it "creates a new event containing an 'elapsed.timestamp_start field' with the timestamp of the expired 'start event'" do
insist { @expired_events[0]["elapsed.timestamp_start"] } == Time.parse(@start_event_2[START_TIMESTAMP])
insist { @expired_events[1]["elapsed.timestamp_start"] } == Time.parse(@start_event_3[START_TIMESTAMP])
end

it "creates a new event containing a 'host field' for each expired 'start event'" do
Expand Down