Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fluent-plugin-statsd.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Gem::Specification.new do |gem|
gem.executables = `git ls-files -- bin/*`.split("\n").map{ |f| File.basename(f) }
gem.require_paths = ['lib']

gem.add_dependency "fluentd", ">= 0.10.8"
gem.add_dependency "fluentd", [">= 0.14.15", "< 2"]

gem.add_development_dependency "rake", ">= 0.9.2"
gem.add_development_dependency "test-unit", ">= 3.2.0"
Expand Down
23 changes: 20 additions & 3 deletions lib/fluent/plugin/out_statsd.rb
Original file line number Diff line number Diff line change
@@ -1,21 +1,30 @@
require 'statsd-ruby'
require 'fluent/output'
require 'fluent/plugin/output'

module Fluent
class StatsdOutput < BufferedOutput
module Fluent::Plugin
class StatsdOutput < Output
Fluent::Plugin.register_output('statsd', self)

DEFAULT_BUFFER_TYPE = "memory"

helpers :compat_parameters

config_param :flush_interval, :time, :default => 1
config_param :host, :string, :default => 'localhost'
config_param :port, :string, :default => '8125'

config_section :buffer do
config_set_default :@type, DEFAULT_BUFFER_TYPE
end

attr_reader :statsd

def initialize
super
end

def configure(conf)
compat_parameters_convert(conf, :buffer)
super
@statsd = Statsd.new(host, port)
end
Expand All @@ -32,6 +41,14 @@ def format(tag, time, record)
record.to_msgpack
end

def multi_workers_ready?
true
end

def formatted_to_msgpack_binary?
true
end

def write(chunk)
chunk.msgpack_each {|record|
if statsd_type = record['statsd_type']
Expand Down
22 changes: 13 additions & 9 deletions test/plugin/out_statsd.rb
Original file line number Diff line number Diff line change
@@ -1,33 +1,37 @@
require 'fluent/plugin/out_statsd'
require 'fluent/test'
require 'fluent/test/helpers'
require 'fluent/test/driver/output'
require 'statsd-ruby'

class StatsdOutputTest < Test::Unit::TestCase
include Fluent::Test::Helpers

def setup
super
Fluent::Test.setup
@now = Time.now
@now = event_time
end

def treedown
def teardown
end

CONFIG = %[
type statsd
]

def create_driver(conf = CONFIG)
Fluent::Test::BufferedOutputTestDriver.new(Fluent::StatsdOutput) {
Fluent::Test::Driver::Output.new(Fluent::Plugin::StatsdOutput) {
}.configure(conf)
end

def test_write
d = create_driver
time = Time.at(@now.to_i).utc
d.emit({ :stastd_type => 'timing', :statsd_key => 'test.statsd.t', :statsd_timing => 100 }, time)
d.emit({ :stastd_type => 'guage', :statsd_key => 'test.statsd.g', :statsd_gauge => 102 }, time)
d.emit({ :stastd_type => 'increment', :statsd_key => 'test.statsd.i'}, time)

d.run
time = @now
d.run(default_tag: 'test') do
d.feed(time, { :stastd_type => 'timing', :statsd_key => 'test.statsd.t', :statsd_timing => 100 })
d.feed(time, { :stastd_type => 'guage', :statsd_key => 'test.statsd.g', :statsd_gauge => 102 })
d.feed(time, { :stastd_type => 'increment', :statsd_key => 'test.statsd.i'})
end
end
end