diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 12df389969..3d32d6a057 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -1125,7 +1125,7 @@ def reading_bytesize end class IOHandler - BYTES_TO_READ = 8192 + BYTES_TO_READ = 64 * 1024 SHUTDOWN_TIMEOUT = 5 attr_accessor :shutdown_timeout diff --git a/test/plugin/in_tail/test_io_handler.rb b/test/plugin/in_tail/test_io_handler.rb index 5dd37dc4b8..65ae45b688 100644 --- a/test/plugin/in_tail/test_io_handler.rb +++ b/test/plugin/in_tail/test_io_handler.rb @@ -94,7 +94,7 @@ def create_watcher end sub_test_case 'when limit is 5' do - test 'call receive_lines once when short line(less than 8192)' do + test 'call receive_lines once when short line(less than 65536)' do text = "line\n" * 8 @file.write(text) @file.close @@ -120,8 +120,8 @@ def create_watcher assert_equal 8, returned_lines[0].size end - test 'call receive_lines some times when long line(more than 8192)' do - t = 'line' * (8192 / 8) + test 'call receive_lines some times when long line(more than 65536)' do + t = 'line' * (65536 / 8) text = "#{t}\n" * 8 @file.write(text) @file.close diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index d0d973e646..3163abd1f6 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -496,7 +496,7 @@ def test_emit_with_read_lines_limit(data) config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit }) + PARSE_SINGLE_LINE_CONFIG end d = create_driver(config) - msg = 'test' * 2000 # in_tail reads 8192 bytes at once. + msg = 'x' * 65000 # in_tail reads 65536 bytes at once. d.run(expect_emits: num_events, timeout: 2) do Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| @@ -518,15 +518,15 @@ def test_emit_with_read_lines_limit(data) end sub_test_case "reads_bytes_per_second w/o throttled" do - data("flat 8192 bytes, 2 events" => [:flat, 100, 8192, 2], - "flat 8192 bytes, 2 events w/o stat watcher" => [:flat_without_stat, 100, 8192, 2], - "flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20], - "flat #{8192*10} bytes, 20 events w/o stat watcher" => [:flat_without_stat, 100, (8192 * 10), 20], - "parse #{8192*4} bytes, 8 events" => [:parse, 100, (8192 * 4), 8], - "parse #{8192*4} bytes, 8 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 4), 8], - "parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20], - "parse #{8192*10} bytes, 20 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 10), 20], - "flat 8k bytes with unit, 2 events" => [:flat, 100, "8k", 2]) + data("flat 65536 bytes, 2 events" => [:flat, 100, 65536, 2], + "flat 65536 bytes, 2 events w/o stat watcher" => [:flat_without_stat, 100, 65536, 2], + "flat #{65536*10} bytes, 20 events" => [:flat, 100, (65536 * 10), 20], + "flat #{65536*10} bytes, 20 events w/o stat watcher" => [:flat_without_stat, 100, (65536 * 10), 20], + "parse #{65536*4} bytes, 8 events" => [:parse, 100, (65536 * 4), 8], + "parse #{65536*4} bytes, 8 events w/o stat watcher" => [:parse_without_stat, 100, (65536 * 4), 8], + "parse #{65536*10} bytes, 20 events" => [:parse, 100, (65536 * 10), 20], + "parse #{65536*10} bytes, 20 events w/o stat watcher" => [:parse_without_stat, 100, (65536 * 10), 20], + "flat 64k bytes with unit, 2 events" => [:flat, 100, "64k", 2]) def test_emit_with_read_bytes_limit_per_second(data) config_style, limit, limit_bytes, num_events = data case config_style @@ -540,7 +540,7 @@ def test_emit_with_read_bytes_limit_per_second(data) config = CONFIG_READ_FROM_HEAD + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG end - msg = 'test' * 2000 # in_tail reads 8192 bytes at once. + msg = 'x' * 65000 # in_tail reads 65536 bytes at once. start_time = Fluent::Clock.now d = create_driver(config) @@ -553,8 +553,8 @@ def test_emit_with_read_bytes_limit_per_second(data) end assert_true(Fluent::Clock.now - start_time > 1) - assert_equal(Array.new(num_events) { {"message" => msg} }, - d.events.collect { |event| event[2] }) + assert_equal(num_events, d.events.size) + assert_true(d.events.all? { |event| {"message" => msg} == event[2]}) end def test_read_bytes_limit_precede_read_lines_limit @@ -564,7 +564,7 @@ def test_read_bytes_limit_precede_read_lines_limit "read_lines_limit" => 1000, "read_bytes_limit_per_second" => 8192 }) - msg = 'abc' + msg = 'abc' * 8 start_time = Fluent::Clock.now d = create_driver(config) d.run(expect_emits: 2) do @@ -576,14 +576,22 @@ def test_read_bytes_limit_precede_read_lines_limit end assert_true(Fluent::Clock.now - start_time > 1) - assert_equal(Array.new(4096) { {"message" => msg} }, - d.events.collect { |event| event[2] }) + + # Line length: msg = 'abc' * 8 is 24 bytes, plus \n (1 byte) = 25 bytes/line. + # Buffer size: BYTES_TO_READ is now 65,536 bytes. + # In this test environment, before the shutdown gracefully halts the read loop, in_tail manages to execute exactly two readpartial calls. + # Total bytes read = 65,536 * 2 = 131,072 bytes. + # 131,072 bytes / 25 bytes per line = 5242 lines (with a remainder of 22 bytes). + expected_read_lines = (65536 * 2) / ('abc' * 8 + "\n").size + assert_equal(expected_read_lines, d.events.size) + + assert_true(d.events.all? { |event| {"message" => msg} == event[2]}) end end sub_test_case "reads_bytes_per_second w/ throttled already" do - data("flat 8192 bytes" => [:flat, 100, 8192], - "parse 8192 bytes" => [:parse, 100, 8192]) + data("flat 65536 bytes" => [:flat, 100, 65536], + "parse 65536 bytes" => [:parse, 100, 65536]) def test_emit_with_read_bytes_limit_per_second(data) config_style, limit, limit_bytes = data case config_style @@ -593,7 +601,7 @@ def test_emit_with_read_bytes_limit_per_second(data) config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG end d = create_driver(config) - msg = 'test' * 2000 # in_tail reads 8192 bytes at once. + msg = 'x' * 65000 # in_tail reads 65536 bytes at once. mock.proxy(d.instance).io_handler(anything, anything) do |io_handler| require 'fluent/config/types' @@ -1238,7 +1246,7 @@ def test_encoding_for_regular_expression_parsing assert_equal(true, events.length > 0) assert_equal({"name" => "いうえ"}, events[0][2]) end - + sub_test_case "multiline" do data(flat: MULTILINE_CONFIG, parse: PARSE_MULTILINE_CONFIG)