diff --git a/lib/logstash/outputs/redis.rb b/lib/logstash/outputs/redis.rb index 2f53289..8b521f1 100644 --- a/lib/logstash/outputs/redis.rb +++ b/lib/logstash/outputs/redis.rb @@ -18,6 +18,8 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base config_name "redis" + default :codec, 'json' + # Name is used for logging in case there are multiple instances. # TODO: delete config :name, :validate => :string, :default => 'default', @@ -137,48 +139,39 @@ def register @host_idx = 0 @congestion_check_times = Hash.new { |h,k| h[k] = Time.now.to_i - @congestion_interval } - end # def register - - def receive(event) - return unless output?(event) - - if @batch and @data_type == 'list' # Don't use batched method for pubsub. - # Stud::Buffer - buffer_receive(event.to_json, event.sprintf(@key)) - return - end - key = event.sprintf(@key) - # TODO(sissel): We really should not drop an event, but historically - # we have dropped events that fail to be converted to json. - # TODO(sissel): Find a way to continue passing events through even - # if they fail to convert properly. - begin - payload = event.to_json - rescue Encoding::UndefinedConversionError, ArgumentError - puts "FAILUREENCODING" - @logger.error("Failed to convert event to JSON. Invalid UTF-8, maybe?", - :event => event.inspect) - return - end + @codec.on_event do |event, data| - begin - @redis ||= connect - if @data_type == 'list' - congestion_check(key) - @redis.rpush(key, payload) + if @batch and @data_type == 'list' # Don't use batched method for pubsub. + # Stud::Buffer + buffer_receive(data, event.sprintf(@key)) else - @redis.publish(key, payload) + key = event.sprintf(@key) + + begin + @redis ||= connect + if @data_type == 'list' + congestion_check(key) + @redis.rpush(key, data) + else + @redis.publish(key, data) + end + rescue => e + @logger.warn("Failed to send event to Redis", :event => event, + :identity => identity, :exception => e, + :backtrace => e.backtrace) + sleep @reconnect_interval + @redis = nil + retry + end end - rescue => e - @logger.warn("Failed to send event to Redis", :event => event, - :identity => identity, :exception => e, - :backtrace => e.backtrace) - sleep @reconnect_interval - @redis = nil - retry end - end # def receive + end # def register + + def receive(event) + return unless output?(event) + @codec.encode(event) + end def congestion_check(key) return if @congestion_threshold == 0 diff --git a/logstash-output-redis.gemspec b/logstash-output-redis.gemspec index 92ad2c1..07a20e2 100644 --- a/logstash-output-redis.gemspec +++ b/logstash-output-redis.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-output-redis' - s.version = '1.0.0' + s.version = '1.0.1' s.licenses = ['Apache License (2.0)'] s.summary = "This output will send events to a Redis queue using RPUSH" s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"