Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1125,7 +1125,7 @@ def reading_bytesize
end

class IOHandler
BYTES_TO_READ = 8192
BYTES_TO_READ = 64 * 1024
SHUTDOWN_TIMEOUT = 5

attr_accessor :shutdown_timeout
Expand Down
6 changes: 3 additions & 3 deletions test/plugin/in_tail/test_io_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def create_watcher
end

sub_test_case 'when limit is 5' do
test 'call receive_lines once when short line(less than 8192)' do
test 'call receive_lines once when short line(less than 65536)' do
text = "line\n" * 8
@file.write(text)
@file.close
Expand All @@ -120,8 +120,8 @@ def create_watcher
assert_equal 8, returned_lines[0].size
end

test 'call receive_lines some times when long line(more than 8192)' do
t = 'line' * (8192 / 8)
test 'call receive_lines some times when long line(more than 65536)' do
t = 'line' * (65536 / 8)
text = "#{t}\n" * 8
@file.write(text)
@file.close
Expand Down
48 changes: 28 additions & 20 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ def test_emit_with_read_lines_limit(data)
config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit }) + PARSE_SINGLE_LINE_CONFIG
end
d = create_driver(config)
msg = 'test' * 2000 # in_tail reads 8192 bytes at once.
msg = 'x' * 65000 # in_tail reads 65536 bytes at once.

d.run(expect_emits: num_events, timeout: 2) do
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f|
Expand All @@ -518,15 +518,15 @@ def test_emit_with_read_lines_limit(data)
end

sub_test_case "reads_bytes_per_second w/o throttled" do
data("flat 8192 bytes, 2 events" => [:flat, 100, 8192, 2],
"flat 8192 bytes, 2 events w/o stat watcher" => [:flat_without_stat, 100, 8192, 2],
"flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20],
"flat #{8192*10} bytes, 20 events w/o stat watcher" => [:flat_without_stat, 100, (8192 * 10), 20],
"parse #{8192*4} bytes, 8 events" => [:parse, 100, (8192 * 4), 8],
"parse #{8192*4} bytes, 8 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 4), 8],
"parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20],
"parse #{8192*10} bytes, 20 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 10), 20],
"flat 8k bytes with unit, 2 events" => [:flat, 100, "8k", 2])
data("flat 65536 bytes, 2 events" => [:flat, 100, 65536, 2],
"flat 65536 bytes, 2 events w/o stat watcher" => [:flat_without_stat, 100, 65536, 2],
"flat #{65536*10} bytes, 20 events" => [:flat, 100, (65536 * 10), 20],
"flat #{65536*10} bytes, 20 events w/o stat watcher" => [:flat_without_stat, 100, (65536 * 10), 20],
"parse #{65536*4} bytes, 8 events" => [:parse, 100, (65536 * 4), 8],
"parse #{65536*4} bytes, 8 events w/o stat watcher" => [:parse_without_stat, 100, (65536 * 4), 8],
"parse #{65536*10} bytes, 20 events" => [:parse, 100, (65536 * 10), 20],
"parse #{65536*10} bytes, 20 events w/o stat watcher" => [:parse_without_stat, 100, (65536 * 10), 20],
"flat 64k bytes with unit, 2 events" => [:flat, 100, "64k", 2])
def test_emit_with_read_bytes_limit_per_second(data)
config_style, limit, limit_bytes, num_events = data
case config_style
Expand All @@ -540,7 +540,7 @@ def test_emit_with_read_bytes_limit_per_second(data)
config = CONFIG_READ_FROM_HEAD + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
end

msg = 'test' * 2000 # in_tail reads 8192 bytes at once.
msg = 'x' * 65000 # in_tail reads 65536 bytes at once.
start_time = Fluent::Clock.now

d = create_driver(config)
Expand All @@ -553,8 +553,8 @@ def test_emit_with_read_bytes_limit_per_second(data)
end

assert_true(Fluent::Clock.now - start_time > 1)
assert_equal(Array.new(num_events) { {"message" => msg} },
d.events.collect { |event| event[2] })
assert_equal(num_events, d.events.size)
assert_true(d.events.all? { |event| {"message" => msg} == event[2]})
end

def test_read_bytes_limit_precede_read_lines_limit
Expand All @@ -564,7 +564,7 @@ def test_read_bytes_limit_precede_read_lines_limit
"read_lines_limit" => 1000,
"read_bytes_limit_per_second" => 8192
})
msg = 'abc'
msg = 'abc' * 8
start_time = Fluent::Clock.now
d = create_driver(config)
d.run(expect_emits: 2) do
Expand All @@ -576,14 +576,22 @@ def test_read_bytes_limit_precede_read_lines_limit
end

assert_true(Fluent::Clock.now - start_time > 1)
assert_equal(Array.new(4096) { {"message" => msg} },
d.events.collect { |event| event[2] })

# Line length: msg = 'abc' * 8 is 24 bytes, plus \n (1 byte) = 25 bytes/line.
# Buffer size: BYTES_TO_READ is now 65,536 bytes.
# In this test environment, before the shutdown gracefully halts the read loop, in_tail manages to execute exactly two readpartial calls.
# Total bytes read = 65,536 * 2 = 131,072 bytes.
# 131,072 bytes / 25 bytes per line = 5242 lines (with a remainder of 22 bytes).
expected_read_lines = (65536 * 2) / ('abc' * 8 + "\n").size
assert_equal(expected_read_lines, d.events.size)

assert_true(d.events.all? { |event| {"message" => msg} == event[2]})
end
end

sub_test_case "reads_bytes_per_second w/ throttled already" do
data("flat 8192 bytes" => [:flat, 100, 8192],
"parse 8192 bytes" => [:parse, 100, 8192])
data("flat 65536 bytes" => [:flat, 100, 65536],
"parse 65536 bytes" => [:parse, 100, 65536])
def test_emit_with_read_bytes_limit_per_second(data)
config_style, limit, limit_bytes = data
case config_style
Expand All @@ -593,7 +601,7 @@ def test_emit_with_read_bytes_limit_per_second(data)
config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
end
d = create_driver(config)
msg = 'test' * 2000 # in_tail reads 8192 bytes at once.
msg = 'x' * 65000 # in_tail reads 65536 bytes at once.

mock.proxy(d.instance).io_handler(anything, anything) do |io_handler|
require 'fluent/config/types'
Expand Down Expand Up @@ -1238,7 +1246,7 @@ def test_encoding_for_regular_expression_parsing
assert_equal(true, events.length > 0)
assert_equal({"name" => "いうえ"}, events[0][2])
end

sub_test_case "multiline" do
data(flat: MULTILINE_CONFIG,
parse: PARSE_MULTILINE_CONFIG)
Expand Down
Loading