From 7a750625f178acca59372e657a920033c59eb730 Mon Sep 17 00:00:00 2001 From: Shizuo Fujita Date: Mon, 20 Apr 2026 10:27:08 +0900 Subject: [PATCH] test_in_monitor_agent: ensure to shutdown plugin instances (#5323) **Which issue(s) this PR fixes**: Fixes # **What this PR does / why we need it**: Fix thread and resource leaks in `test_in_monitor_agent.rb` by ensuring proper shutdown sequences for plugin instances. Currently, the test cases for `in_monitor_agent` start plugin instances but do not properly shut them down at the end. Because this plugin utilizes helpers (such as `http_server` and `timer`), bypassing the proper `stop`/`shutdown` lifecycle leaves their background threads running indefinitely. As a result, executing the test file leaves many orphaned threads, which could lead to test hangs or resource exhaustion in CI environments. ``` $ ruby -Ilib:test -e "at_exit { puts '--- Thread count at exit: ' + Thread.list.size.to_s; pp Thread.list }; require './test/plugin/test_in_monitor_agent.rb'" ... ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ 24 tests, 42 assertions, 0 failures, 0 errors, 0 pendings, 0 omissions, 0 notifications 100% passed ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ 5.09 tests/s, 8.90 assertions/s --- Thread count at exit: 34 [#, #, #, #, #, #, #, #, #, #, #, #, #, #, #, #, #, #, #, #, #, #, #, #, #, #, #, #, #, #, #, #, #, #] ``` **Docs Changes**: N/A **Release Note**: N/A Signed-off-by: Shizuo Fujita Signed-off-by: github-actions[bot] --- test/plugin/test_in_monitor_agent.rb | 33 ++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/test/plugin/test_in_monitor_agent.rb b/test/plugin/test_in_monitor_agent.rb index dd4c9dfc5e..86d00b0e79 100644 --- a/test/plugin/test_in_monitor_agent.rb +++ b/test/plugin/test_in_monitor_agent.rb @@ -409,7 +409,6 @@ def test_enable_input_metrics(with_config) tag monitor emit_interval 1 ") - d.instance.start d.end_if do d.events.size >= 5 end @@ -539,6 +538,8 @@ def get(uri, header = {}) test_filter = response.split("\n")[3] assert_equal(expected_test_in_response, test_in) assert_equal(expected_test_filter_response, test_filter) + + d.instance_shutdown end data(:include_config_and_retry_yes => [true, true, "include_config yes", "include_retry yes"], @@ -593,6 +594,8 @@ def get(uri, header = {}) null_response = response["plugins"][5] assert_equal(expected_test_in_response, test_in_response) assert_fuzzy_equal(expected_null_response, null_response) + + d.instance_shutdown end test "/api/plugins.json/not_found" do @@ -607,6 +610,8 @@ def get(uri, header = {}) assert_equal('404', resp.code) body = JSON.parse(resp.body) assert_equal(body['message'], 'Not found') + + d.instance_shutdown end data(:with_config_and_retry_yes => [true, true, "?with_config=yes&with_retry"], @@ -659,6 +664,8 @@ def get(uri, header = {}) null_response = response["plugins"][5] assert_equal(expected_test_in_response, test_in_response) assert_fuzzy_include(expected_null_response, null_response) + + d.instance_shutdown end test "/api/plugins.json with 'with_ivars'. response contains specified instance variables of each plugin" do @@ -708,6 +715,8 @@ def get(uri, header = {}) null_response = response["plugins"][5] assert_equal(expected_test_in_response, test_in_response) assert_fuzzy_equal(expected_null_response, null_response) + + d.instance_shutdown end test "/api/config" do @@ -721,6 +730,8 @@ def get(uri, header = {}) expected_response_regex = %r{pid:\d+\tppid:\d+\tversion:#{Fluent::VERSION}\tconfig_path:#{@filepath}\tpid_file:\tplugin_dirs:/etc/fluent/plugin\tlog_path:} assert_match(expected_response_regex, get("http://127.0.0.1:#{@port}/api/config").body) + + d.instance_shutdown end test "/api/config.json" do @@ -737,6 +748,8 @@ def get(uri, header = {}) assert_equal(["/etc/fluent/plugin"], res["plugin_dirs"]) assert_nil(res["log_path"]) assert_equal(Fluent::VERSION, res["version"]) + + d.instance_shutdown end test "/api/config.json?debug=1" do @@ -750,6 +763,8 @@ def get(uri, header = {}) # To check pretty print assert_true !get("http://127.0.0.1:#{@port}/api/config.json").body.include?("\n") assert_true get("http://127.0.0.1:#{@port}/api/config.json?debug=1").body.include?("\n") + + d.instance_shutdown end test "/api/config.json/not_found" do @@ -764,6 +779,8 @@ def get(uri, header = {}) assert_equal('404', resp.code) body = JSON.parse(resp.body) assert_equal(body['message'], 'Not found') + + d.instance_shutdown end end @@ -855,6 +872,11 @@ def write(chunk) # it's very hard to check exact retry count (because retries are called by output flush thread scheduling) assert{ response_retry_count >= 1 && response_retry["steps"] >= 0 } assert{ response_retry_count == response_retry["steps"] + 1 } + + output.before_shutdown + output.shutdown + output.after_shutdown + d.instance_shutdown end end @@ -868,12 +890,15 @@ def write(chunk) ") d.instance.start assert_equal("200", get("http://127.0.0.1:#{port}/api/plugins").code) + + d.instance_shutdown end test "worker_id = 2 on multi worker environment" do port = unused_port(protocol: :tcp) + driver = nil Fluent::SystemConfig.overwrite_system_config('workers' => 4) do - d = Fluent::Test::Driver::Input.new(Fluent::Plugin::MonitorAgentInput) + driver = d = Fluent::Test::Driver::Input.new(Fluent::Plugin::MonitorAgentInput) d.instance.instance_eval{ @_fluentd_worker_id = 2 } d.configure(" @type monitor_agent @@ -883,6 +908,8 @@ def write(chunk) d.instance.start end assert_equal("200", get("http://127.0.0.1:#{port}/api/plugins").code) + + driver.instance_shutdown end end @@ -937,6 +964,8 @@ def filter(tag, time, record) assert_equal("200", get("http://127.0.0.1:#{port}/api/plugins.json").code) assert{ d.logs.none?{|log| log.include?("NoMethodError") } } assert_equal(false, d.instance.instance_variable_get(:@first_warn)) + + d.instance_shutdown end end end