Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1125,7 +1125,7 @@ def reading_bytesize
end

class IOHandler
BYTES_TO_READ = 8192
BYTES_TO_READ = 64 * 1024
SHUTDOWN_TIMEOUT = 5

attr_accessor :shutdown_timeout
Expand Down
6 changes: 3 additions & 3 deletions test/plugin/in_tail/test_io_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def create_watcher
end

sub_test_case 'when limit is 5' do
test 'call receive_lines once when short line(less than 8192)' do
test 'call receive_lines once when short line(less than 65536)' do
text = "line\n" * 8
@file.write(text)
@file.close
Expand All @@ -120,8 +120,8 @@ def create_watcher
assert_equal 8, returned_lines[0].size
end

test 'call receive_lines some times when long line(more than 8192)' do
t = 'line' * (8192 / 8)
test 'call receive_lines some times when long line(more than 65536)' do
t = 'line' * (65536 / 8)
text = "#{t}\n" * 8
@file.write(text)
@file.close
Expand Down
40 changes: 20 additions & 20 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ def test_emit_with_read_lines_limit(data)
config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit }) + PARSE_SINGLE_LINE_CONFIG
end
d = create_driver(config)
msg = 'test' * 2000 # in_tail reads 8192 bytes at once.
msg = 'x' * 65000 # in_tail reads 65536 bytes at once.

d.run(expect_emits: num_events, timeout: 2) do
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f|
Expand All @@ -518,15 +518,15 @@ def test_emit_with_read_lines_limit(data)
end

sub_test_case "reads_bytes_per_second w/o throttled" do
data("flat 8192 bytes, 2 events" => [:flat, 100, 8192, 2],
"flat 8192 bytes, 2 events w/o stat watcher" => [:flat_without_stat, 100, 8192, 2],
"flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20],
"flat #{8192*10} bytes, 20 events w/o stat watcher" => [:flat_without_stat, 100, (8192 * 10), 20],
"parse #{8192*4} bytes, 8 events" => [:parse, 100, (8192 * 4), 8],
"parse #{8192*4} bytes, 8 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 4), 8],
"parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20],
"parse #{8192*10} bytes, 20 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 10), 20],
"flat 8k bytes with unit, 2 events" => [:flat, 100, "8k", 2])
data("flat 65536 bytes, 2 events" => [:flat, 100, 65536, 2],
"flat 65536 bytes, 2 events w/o stat watcher" => [:flat_without_stat, 100, 65536, 2],
"flat #{65536*10} bytes, 20 events" => [:flat, 100, (65536 * 10), 20],
"flat #{65536*10} bytes, 20 events w/o stat watcher" => [:flat_without_stat, 100, (65536 * 10), 20],
"parse #{65536*4} bytes, 8 events" => [:parse, 100, (65536 * 4), 8],
"parse #{65536*4} bytes, 8 events w/o stat watcher" => [:parse_without_stat, 100, (65536 * 4), 8],
"parse #{65536*10} bytes, 20 events" => [:parse, 100, (65536 * 10), 20],
"parse #{65536*10} bytes, 20 events w/o stat watcher" => [:parse_without_stat, 100, (65536 * 10), 20],
"flat 64k bytes with unit, 2 events" => [:flat, 100, "64k", 2])
def test_emit_with_read_bytes_limit_per_second(data)
config_style, limit, limit_bytes, num_events = data
case config_style
Expand All @@ -540,7 +540,7 @@ def test_emit_with_read_bytes_limit_per_second(data)
config = CONFIG_READ_FROM_HEAD + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
end

msg = 'test' * 2000 # in_tail reads 8192 bytes at once.
msg = 'x' * 65000 # in_tail reads 65536 bytes at once.
start_time = Fluent::Clock.now

d = create_driver(config)
Expand All @@ -553,8 +553,8 @@ def test_emit_with_read_bytes_limit_per_second(data)
end

assert_true(Fluent::Clock.now - start_time > 1)
assert_equal(Array.new(num_events) { {"message" => msg} },
d.events.collect { |event| event[2] })
assert_equal(num_events, d.events.size)
assert_true(d.events.all? { |event| {"message" => msg} == event[2]})
end

def test_read_bytes_limit_precede_read_lines_limit
Expand All @@ -564,7 +564,7 @@ def test_read_bytes_limit_precede_read_lines_limit
"read_lines_limit" => 1000,
"read_bytes_limit_per_second" => 8192
})
msg = 'abc'
msg = 'abc' * 8
start_time = Fluent::Clock.now
d = create_driver(config)
d.run(expect_emits: 2) do
Expand All @@ -576,14 +576,14 @@ def test_read_bytes_limit_precede_read_lines_limit
end

assert_true(Fluent::Clock.now - start_time > 1)
assert_equal(Array.new(4096) { {"message" => msg} },
d.events.collect { |event| event[2] })
assert_equal(5242, d.events.size)
Comment thread
kenhys marked this conversation as resolved.
Outdated
assert_true(d.events.all? { |event| {"message" => msg} == event[2]})
end
end

sub_test_case "reads_bytes_per_second w/ throttled already" do
data("flat 8192 bytes" => [:flat, 100, 8192],
"parse 8192 bytes" => [:parse, 100, 8192])
data("flat 65536 bytes" => [:flat, 100, 65536],
"parse 65536 bytes" => [:parse, 100, 65536])
def test_emit_with_read_bytes_limit_per_second(data)
config_style, limit, limit_bytes = data
case config_style
Expand All @@ -593,7 +593,7 @@ def test_emit_with_read_bytes_limit_per_second(data)
config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
end
d = create_driver(config)
msg = 'test' * 2000 # in_tail reads 8192 bytes at once.
msg = 'x' * 65000 # in_tail reads 65536 bytes at once.

mock.proxy(d.instance).io_handler(anything, anything) do |io_handler|
require 'fluent/config/types'
Expand Down Expand Up @@ -1238,7 +1238,7 @@ def test_encoding_for_regular_expression_parsing
assert_equal(true, events.length > 0)
assert_equal({"name" => "いうえ"}, events[0][2])
end

sub_test_case "multiline" do
data(flat: MULTILINE_CONFIG,
parse: PARSE_MULTILINE_CONFIG)
Expand Down
Loading