Skip to content
Draft
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fluentd.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ Gem::Specification.new do |gem|

gem.add_runtime_dependency("bundler")
gem.add_runtime_dependency("msgpack", [">= 1.3.1", "< 2.0.0"])
gem.add_runtime_dependency("yajl-ruby", ["~> 1.0"])
gem.add_runtime_dependency("cool.io", [">= 1.4.5", "< 2.0.0"])
gem.add_runtime_dependency("serverengine", [">= 2.3.2", "< 3.0.0"])
gem.add_runtime_dependency("http_parser.rb", [">= 0.5.1", "< 0.9.0"])
Expand All @@ -42,6 +41,7 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency("uri", '~> 1.0')
gem.add_runtime_dependency("net-http", '~> 0.8')
gem.add_runtime_dependency("async-http", "~> 0.86")
gem.add_runtime_dependency("json", '>= 2.20')

# gems that aren't default gems as of Ruby 3.4
gem.add_runtime_dependency("base64", ["~> 0.2"])
Expand Down
16 changes: 12 additions & 4 deletions lib/fluent/compat/exec_util.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

require 'msgpack'
require 'json'
require 'yajl'

require 'fluent/engine'
require 'fluent/plugin'
Expand Down Expand Up @@ -77,10 +76,19 @@ def each_line(line)
end

class JSONParser < Parser
BYTES_TO_READ = 8192

def call(io)
y = Yajl::Parser.new
y.on_parse_complete = @on_message
y.parse(io)
parser = JSON::ResumableParser.new({})
begin
while (chunk = io.readpartial(BYTES_TO_READ))
Comment thread
Watson1978 marked this conversation as resolved.
Outdated
parser << chunk
while parser.parse
@on_message.call(parser.value)
end
end
rescue EOFError
end
end
end

Expand Down
1 change: 0 additions & 1 deletion lib/fluent/config/literal_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
require 'stringio'

require 'json'
require 'yajl'
require 'socket'
require 'ripper'

Expand Down
1 change: 0 additions & 1 deletion lib/fluent/load.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
require 'stringio'
require 'fileutils'
require 'json'
require 'yajl'
require 'uri'
require 'msgpack'
require 'strptime'
Expand Down
16 changes: 9 additions & 7 deletions lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

require 'fluent/plugin/input'
require 'fluent/msgpack_factory'
require 'yajl'
require 'json'
require 'digest'
require 'securerandom'

Expand Down Expand Up @@ -248,13 +248,15 @@ def read_messages(conn, &block)
unless feeder
first = data[0]
if first == '{' || first == '[' # json
parser = Yajl::Parser.new
parser.on_parse_complete = ->(obj){
block.call(obj, bytes, serializer)
bytes = 0
}
parser = JSON::ResumableParser.new({})
serializer = :to_json.to_proc
feeder = ->(d){ parser << d }
feeder = ->(d){
parser << d
while parser.parse
block.call(parser.value, bytes, serializer)
bytes = 0
end
}
else # msgpack
parser = Fluent::MessagePackFactory.msgpack_unpacker
serializer = :to_msgpack.to_proc
Expand Down
8 changes: 5 additions & 3 deletions lib/fluent/plugin/in_unix.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
require 'fluent/msgpack_factory'

require 'cool.io'
require 'yajl'
require 'json'
require 'fileutils'
require 'socket'

Expand Down Expand Up @@ -158,8 +158,7 @@ def on_read(data)
first = data[0]
if first == '{'.freeze || first == '['.freeze
m = method(:on_read_json)
@parser = Yajl::Parser.new
@parser.on_parse_complete = @on_message
@parser = JSON::ResumableParser.new({})
else
m = method(:on_read_msgpack)
@parser = Fluent::MessagePackFactory.msgpack_unpacker
Expand All @@ -173,6 +172,9 @@ def on_read(data)

def on_read_json(data)
@parser << data
while @parser.parse
@on_message.call(@parser.value)
end
rescue => e
@log.error "unexpected error in json payload", error: e.to_s
@log.error_backtrace
Expand Down
27 changes: 15 additions & 12 deletions lib/fluent/plugin/parser_json.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
require 'fluent/time'
require 'fluent/oj_options'

require 'yajl'
require 'json'

module Fluent
Expand All @@ -28,12 +27,10 @@ class JSONParser < Parser

config_set_default :time_key, 'time'
desc 'Set JSON parser'
# NOTE: Contains yajl for backward compatibility
config_param :json_parser, :enum, list: [:oj, :yajl, :json], default: :oj

# The Yajl library defines a default buffer size of 8KiB when parsing
# from IO streams, so maintain this for backwards-compatibility.
# https://www.rubydoc.info/github/brianmario/yajl-ruby/Yajl%2FParser:parse
desc 'Set the buffer size that Yajl will use when parsing streaming input'
desc 'Set the buffer size that JSON parser will use when parsing streaming input'
config_param :stream_buffer_size, :integer, default: 8192

config_set_default :time_type, :float
Expand All @@ -54,8 +51,8 @@ def configure_json_parser(name)

log&.info "Oj is not installed, and failing back to JSON for json parser"
configure_json_parser(:json)
when :json then [JSON.method(:parse), JSON::ParserError]
when :yajl then [Yajl.method(:load), Yajl::ParseError]
when :yajl, :json # NOTE: Fallback yajl to json for backward compatibility
[JSON.method(:parse), JSON::ParserError]
else
raise "BUG: unknown json parser specified: #{name}"
end
Expand Down Expand Up @@ -94,11 +91,17 @@ def parser_type
end

def parse_io(io, &block)
y = Yajl::Parser.new
y.on_parse_complete = ->(record){
block.call(parse_time(record), record)
}
y.parse(io, @stream_buffer_size)
parser = JSON::ResumableParser.new({})
begin
while (chunk = io.readpartial(@stream_buffer_size))
parser << chunk
while parser.parse
record = parser.value
block.call(parse_time(record), record)
end
end
rescue EOFError
end
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion test/plugin/test_parser_json.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def setup
sub_test_case "configure_json_parser" do
data("oj", [:oj, [Oj.method(:load), Oj::ParseError]])
data("json", [:json, [JSON.method(:parse), JSON::ParserError]])
data("yajl", [:yajl, [Yajl.method(:load), Yajl::ParseError]])
data("yajl", [:yajl, [JSON.method(:parse), JSON::ParserError]])
def test_return_each_loader((input, expected_return))
result = @parser.instance.configure_json_parser(input)
assert_equal expected_return, result
Expand Down
8 changes: 1 addition & 7 deletions test/test_event_time.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
require_relative 'helper'
require 'timecop'
require 'oj'
require 'yajl'
require 'json'

class EventTimeTest < Test::Unit::TestCase
setup do
Expand Down Expand Up @@ -70,12 +70,6 @@ class EventTimeTest < Test::Unit::TestCase
assert_equal('["tag",100,{"key":"value"}]', Oj.dump(["tag", time, {"key" => "value"}], mode: :compat))
end

test 'Yajl.dump' do
time = Fluent::EventTime.new(100)
assert_equal('{"time":100}', Yajl.dump({'time' => time}))
assert_equal('["tag",100,{"key":"value"}]', Yajl.dump(["tag", time, {"key" => "value"}]))
end

test '.from_time' do
sec = 1000
usec = 2
Expand Down
12 changes: 6 additions & 6 deletions test/test_formatter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,28 +68,28 @@ def test_format
configure({})
formatted = @formatter.format(tag, @time, record)

assert_equal("#{time2str(@time)}\t#{tag}\t#{Yajl.dump(record)}#{@newline}", formatted)
assert_equal("#{time2str(@time)}\t#{tag}\t#{JSON.generate(record)}#{@newline}", formatted)
end

def test_format_without_time
configure('output_time' => 'false')
formatted = @formatter.format(tag, @time, record)

assert_equal("#{tag}\t#{Yajl.dump(record)}#{@newline}", formatted)
assert_equal("#{tag}\t#{JSON.generate(record)}#{@newline}", formatted)
end

def test_format_without_tag
configure('output_tag' => 'false')
formatted = @formatter.format(tag, @time, record)

assert_equal("#{time2str(@time)}\t#{Yajl.dump(record)}#{@newline}", formatted)
assert_equal("#{time2str(@time)}\t#{JSON.generate(record)}#{@newline}", formatted)
end

def test_format_without_time_and_tag
configure('output_tag' => 'false', 'output_time' => 'false')
formatted = @formatter.format('tag', @time, record)

assert_equal("#{Yajl.dump(record)}#{@newline}", formatted)
assert_equal("#{JSON.generate(record)}#{@newline}", formatted)
end

def test_format_without_time_and_tag_against_string_literal_configure
Expand All @@ -100,7 +100,7 @@ def test_format_without_time_and_tag_against_string_literal_configure
])
formatted = @formatter.format('tag', @time, record)

assert_equal("#{Yajl.dump(record)}#{@newline}", formatted)
assert_equal("#{JSON.generate(record)}#{@newline}", formatted)
end
end

Expand All @@ -122,7 +122,7 @@ def test_format(data)
@formatter.configure('json_parser' => data)
formatted = @formatter.format(tag, @time, record)

assert_equal("#{Yajl.dump(record)}#{@newline}", formatted)
assert_equal("#{JSON.generate(record)}#{@newline}", formatted)
end
end

Expand Down
Loading