diff --git a/.github/workflows/container-image.yml b/.github/workflows/container-image.yml index 18985d65b8..61be0afb99 100644 --- a/.github/workflows/container-image.yml +++ b/.github/workflows/container-image.yml @@ -4,7 +4,6 @@ on: push: branches: - master - pull_request: {} release: types: - published diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index ac09815deb..386eb2a201 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -5,7 +5,6 @@ on: branches: - master - 'support/*' - pull_request: {} concurrency: group: linux-${{ github.event_name == 'push' && github.sha || github.ref }} diff --git a/lib/base/tlsstream.hpp b/lib/base/tlsstream.hpp index 9eed8d3b11..bc347a36ac 100644 --- a/lib/base/tlsstream.hpp +++ b/lib/base/tlsstream.hpp @@ -28,32 +28,31 @@ class SeenStream : public ARS { public: template - SeenStream(Args&&... args) : ARS(std::forward(args)...) + explicit SeenStream(Args&&... args) : ARS(std::forward(args)...), m_Seen(nullptr) { - m_Seen.store(nullptr); } template auto async_read_some(Args&&... args) -> decltype(((ARS*)nullptr)->async_read_some(std::forward(args)...)) { { - auto seen (m_Seen.load()); + auto* seen (m_Seen.load()); if (seen) { - *seen = Utility::GetTime(); + *seen = std::chrono::steady_clock::now(); } } return ((ARS*)this)->async_read_some(std::forward(args)...); } - inline void SetSeen(double* seen) + void SetSeen(std::chrono::steady_clock::time_point* seen) { m_Seen.store(seen); } private: - std::atomic m_Seen; + std::atomic m_Seen; }; struct UnbufferedAsioTlsStreamParams diff --git a/lib/remote/jsonrpcconnection-heartbeat.cpp b/lib/remote/jsonrpcconnection-heartbeat.cpp index 9b83c13632..4722bb3204 100644 --- a/lib/remote/jsonrpcconnection-heartbeat.cpp +++ b/lib/remote/jsonrpcconnection-heartbeat.cpp @@ -21,12 +21,17 @@ REGISTER_APIFUNCTION(Heartbeat, event, &JsonRpcConnection::HeartbeatAPIHandler); * cluster connection alive when there isn't much going on. */ +void JsonRpcConnection::SetHeartbeatInterval(std::chrono::milliseconds interval) +{ + m_HeartbeatInterval = interval; +} + void JsonRpcConnection::HandleAndWriteHeartbeats(boost::asio::yield_context yc) { boost::system::error_code ec; for (;;) { - m_HeartbeatTimer.expires_from_now(boost::posix_time::seconds(20)); + m_HeartbeatTimer.expires_after(m_HeartbeatInterval); m_HeartbeatTimer.async_wait(yc[ec]); if (m_ShuttingDown) { diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index 0dab1ed5f8..5fc0432b95 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -38,7 +38,7 @@ JsonRpcConnection::JsonRpcConnection(const WaitGroup::Ptr& waitGroup, const Stri JsonRpcConnection::JsonRpcConnection(const WaitGroup::Ptr& waitGroup, const String& identity, bool authenticated, const Shared::Ptr& stream, ConnectionRole role, boost::asio::io_context& io) : m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role), - m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()), m_IoStrand(io), + m_Timestamp(Utility::GetTime()), m_Seen(std::chrono::steady_clock::now()), m_IoStrand(io), m_OutgoingMessagesQueued(io), m_WriterDone(io), m_ShuttingDown(false), m_WaitGroup(waitGroup), m_CheckLivenessTimer(io), m_HeartbeatTimer(io) { @@ -81,7 +81,7 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) break; } - m_Seen = Utility::GetTime(); + m_Seen = std::chrono::steady_clock::now(); if (m_Endpoint) { m_Endpoint->AddMessageReceived(jsonString.GetLength()); } @@ -236,6 +236,11 @@ void JsonRpcConnection::SendRawMessage(const String& message) }); } +void JsonRpcConnection::SetLivenessTimeout(std::chrono::milliseconds timeout) +{ + m_LivenessTimeout = timeout; +} + void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message) { if (m_ShuttingDown) { @@ -411,31 +416,53 @@ void JsonRpcConnection::CheckLiveness(boost::asio::yield_context yc) * leaking the connection. Therefore close it after a timeout. */ - m_CheckLivenessTimer.expires_from_now(boost::posix_time::seconds(10)); + auto anonymousTimeout = m_LivenessTimeout / 6; + m_CheckLivenessTimer.expires_after(anonymousTimeout); m_CheckLivenessTimer.async_wait(yc[ec]); + if (ec) { + Log(LogCritical, "JsonRpcConnection") << "Error waiting for Liveness timer: " << ec.message(); + } if (m_ShuttingDown) { return; } - auto remote (m_Stream->lowest_layer().remote_endpoint()); + { + auto remote(m_Stream->lowest_layer().remote_endpoint()); - Log(LogInformation, "JsonRpcConnection") - << "Closing anonymous connection [" << remote.address() << "]:" << remote.port() << " after 10 seconds."; + auto msg = Log(LogInformation, "JsonRpcConnection"); + msg << "Closing anonymous connection [" << remote.address() << "]:" << remote.port() << " after "; + if (anonymousTimeout > 1s) { + msg << anonymousTimeout.count() / 1000 << " seconds."; + } else { + msg << anonymousTimeout.count() << " milliseconds"; + } + } Disconnect(); } else { for (;;) { - m_CheckLivenessTimer.expires_from_now(boost::posix_time::seconds(30)); + m_CheckLivenessTimer.expires_after(m_LivenessTimeout / 2); m_CheckLivenessTimer.async_wait(yc[ec]); + if (ec) { + Log(LogCritical, "JsonRpcConnection") << "Error waiting for Liveness timer: " << ec.message(); + } if (m_ShuttingDown) { break; } - if (m_Seen < Utility::GetTime() - 60 && (!m_Endpoint || !m_Endpoint->GetSyncing())) { - Log(LogInformation, "JsonRpcConnection") - << "No messages for identity '" << m_Identity << "' have been received in the last 60 seconds."; + if (m_Seen + m_LivenessTimeout < std::chrono::steady_clock::now() && + (!m_Endpoint || !m_Endpoint->GetSyncing())) { + { + auto msg = Log(LogInformation, "JsonRpcConnection"); + msg << "No messages for identity '" << m_Identity << "' have been received in the last "; + if (m_LivenessTimeout > 1s) { + msg << m_LivenessTimeout.count() / 1000 << " seconds."; + } else { + msg << m_LivenessTimeout.count() << " milliseconds"; + } + } Disconnect(); break; diff --git a/lib/remote/jsonrpcconnection.hpp b/lib/remote/jsonrpcconnection.hpp index df846527ae..84abf61c67 100644 --- a/lib/remote/jsonrpcconnection.hpp +++ b/lib/remote/jsonrpcconnection.hpp @@ -20,6 +20,8 @@ namespace icinga { +using namespace std::chrono_literals; + enum ClientRole { ClientInbound, @@ -61,6 +63,9 @@ class JsonRpcConnection final : public Object void SendMessage(const Dictionary::Ptr& request); void SendRawMessage(const String& request); + void SetLivenessTimeout(std::chrono::milliseconds timeout); + void SetHeartbeatInterval(std::chrono::milliseconds interval); + static Value HeartbeatAPIHandler(const intrusive_ptr& origin, const Dictionary::Ptr& params); static double GetWorkQueueRate(); @@ -74,14 +79,16 @@ class JsonRpcConnection final : public Object Shared::Ptr m_Stream; ConnectionRole m_Role; double m_Timestamp; - double m_Seen; + std::chrono::steady_clock::time_point m_Seen; boost::asio::io_context::strand m_IoStrand; std::vector m_OutgoingMessagesQueue; AsioEvent m_OutgoingMessagesQueued; AsioEvent m_WriterDone; Atomic m_ShuttingDown; WaitGroup::Ptr m_WaitGroup; - boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer; + std::chrono::milliseconds m_LivenessTimeout{60s}; + std::chrono::milliseconds m_HeartbeatInterval{20s}; + boost::asio::steady_timer m_CheckLivenessTimer, m_HeartbeatTimer; JsonRpcConnection(const WaitGroup::Ptr& waitgroup, const String& identity, bool authenticated, const Shared::Ptr& stream, ConnectionRole role, boost::asio::io_context& io); @@ -95,8 +102,6 @@ class JsonRpcConnection final : public Object void MessageHandler(const Dictionary::Ptr& message); - void CertificateRequestResponseHandler(const Dictionary::Ptr& message); - void SendMessageInternal(const Dictionary::Ptr& request); }; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index c317d4ebe4..4161c5603c 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -122,6 +122,7 @@ set(base_test_SOURCES remote-configpackageutility.cpp remote-httpserverconnection.cpp remote-httpmessage.cpp + remote-jsonrpcconnection.cpp remote-url.cpp ${base_OBJS} $ diff --git a/test/base-testloggerfixture.hpp b/test/base-testloggerfixture.hpp index 69c073b02a..1a424d50ba 100644 --- a/test/base-testloggerfixture.hpp +++ b/test/base-testloggerfixture.hpp @@ -10,6 +10,12 @@ #include #include +#define CHECK_LOG_MESSAGE(pattern, timeout) BOOST_CHECK(ExpectLogPattern(pattern, timeout)) +#define REQUIRE_LOG_MESSAGE(pattern, timeout) BOOST_REQUIRE(ExpectLogPattern(pattern, timeout)) + +#define CHECK_NO_LOG_MESSAGE(pattern, timeout) BOOST_CHECK(!ExpectLogPattern(pattern, timeout)) +#define REQUIRE_NO_LOG_MESSAGE(pattern, timeout) BOOST_REQUIRE(!ExpectLogPattern(pattern, timeout)) + namespace icinga { class TestLogger : public Logger @@ -52,6 +58,13 @@ class TestLogger : public Logger return ret; } + void Clear() + { + std::lock_guard lock(m_Mutex); + m_Expects.clear(); + m_LogEntries.clear(); + } + private: void ProcessLogEntry(const LogEntry& entry) override { @@ -88,8 +101,8 @@ struct TestLoggerFixture TestLoggerFixture() { testLogger->SetSeverity(testLogger->SeverityToString(LogDebug)); - testLogger->Activate(true); testLogger->SetActive(true); + testLogger->Activate(true); } ~TestLoggerFixture() diff --git a/test/remote-jsonrpcconnection.cpp b/test/remote-jsonrpcconnection.cpp new file mode 100644 index 0000000000..b3bee763ce --- /dev/null +++ b/test/remote-jsonrpcconnection.cpp @@ -0,0 +1,427 @@ +/* Icinga 2 | (c) 2025 Icinga GmbH | GPLv2+ */ + +#include + +#include "base/json.hpp" +#include "remote/apifunction.hpp" +#include "remote/jsonrpc.hpp" +#include "remote/jsonrpcconnection.hpp" +#include "test/base-testloggerfixture.hpp" +#include "test/base-tlsstream-fixture.hpp" +#include "test/icingaapplication-fixture.hpp" +#include "test/test-ctest.hpp" +#include "test/test-timedasserts.hpp" +#include + +using namespace icinga; + +class JsonRpcConnectionFixture : public TlsStreamFixture, public TestLoggerFixture +{ +public: + JsonRpcConnectionFixture() + { + ScriptGlobal::Set("NodeName", "server"); + ApiListener::Ptr listener = new ApiListener; + listener->OnConfigLoaded(); + } + + JsonRpcConnection::Ptr ConnectEndpoint( + const Shared::Ptr& stream, + const String& identity, + bool authenticated = true, + bool deferStart = false + ) + { + Zone::Ptr zone = new Zone; + zone->SetName(identity); + zone->Register(); + + Endpoint::Ptr endpoint = new Endpoint; + endpoint->SetName(identity); + endpoint->SetZoneName(identity); + endpoint->Register(); + + StoppableWaitGroup::Ptr wg = new StoppableWaitGroup; + JsonRpcConnection::Ptr conn = new JsonRpcConnection(wg, identity, authenticated, stream, RoleClient); + if (!deferStart) { + conn->Start(); + } + + endpoint->AddClient(conn); + m_Connections[conn] = std::move(wg); + + return conn; + } + + using ConnectionPair = std::pair; + + ConnectionPair ConnectEndpointPair( + const String& nameA, + const String& nameB, + bool authenticated = true, + bool deferStart = false + ) + { + auto aToB = ConnectEndpoint(client, nameB, authenticated, deferStart); + auto bToA = ConnectEndpoint(server, nameA, authenticated, deferStart); + return std::make_pair(std::move(aToB), std::move(bToA)); + } + + void JoinWaitgroup(const JsonRpcConnection::Ptr& conn) { m_Connections[conn]->Join(); } + +private: + std::map m_Connections; +}; + +class TestApiHandler +{ +public: + struct Message + { + MessageOrigin::Ptr origin; + Dictionary::Ptr params; + }; + + using TestFn = std::function; + + static void RegisterTestFn(std::string handle, TestFn fn) { m_TestFns[std::move(handle)] = std::move(fn); } + + static Value TestApiFunction(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) + { + auto it = m_TestFns.find(String(params->Get("test")).GetData()); + if (it != m_TestFns.end()) { + return it->second(Message{origin, params}); + } + return Empty; + } + +private: + static inline std::unordered_map m_TestFns; +}; + +REGISTER_APIFUNCTION(Test, test, &TestApiHandler::TestApiFunction); + +BOOST_FIXTURE_TEST_SUITE(remote_jsonrpcconnection, JsonRpcConnectionFixture, + *CTestProperties("FIXTURES_REQUIRED ssl_certs") + *boost::unit_test::label("cluster")) + +BOOST_AUTO_TEST_CASE(construction) +{ + auto test = ConnectEndpoint(server, "test"); + BOOST_REQUIRE_EQUAL(test->GetEndpoint()->GetName(), "test"); + BOOST_REQUIRE_EQUAL(test->GetIdentity(), "test"); + BOOST_REQUIRE_CLOSE(test->GetTimestamp(), Utility::GetTime(), 1); + BOOST_REQUIRE(test->IsAuthenticated()); +} + +BOOST_AUTO_TEST_CASE(send_message) +{ + auto [aToB, bToA] = ConnectEndpointPair("a", "b"); + + Dictionary::Ptr message = new Dictionary({ + { "jsonrpc", "2.0" }, + { "method", "test::Test" }, + { "params", new Dictionary{{"test", "test"}} } + }); + + std::promise msgPromise; + TestApiHandler::RegisterTestFn("test", [&](TestApiHandler::Message msg) { + msgPromise.set_value(std::move(msg)); + return Empty; + }); + + // Test sending a regular message from a->b + aToB->SendMessage(message); + + auto msgFuture = msgPromise.get_future(); + BOOST_REQUIRE(msgFuture.wait_for(1s) == std::future_status::ready); + TestApiHandler::Message msg; + BOOST_CHECK_NO_THROW(msg = msgFuture.get()); + BOOST_REQUIRE_EQUAL(msg.origin->FromClient, bToA); + BOOST_REQUIRE_EQUAL(msg.params->Get("test"), "test"); + + aToB->Disconnect(); + CHECK_WITHIN(!client->lowest_layer().is_open() && !server->lowest_layer().is_open(), 1s); + + // Sending messages after disconnect should result in an exception + BOOST_REQUIRE_THROW(aToB->SendMessage(message), std::runtime_error); +} + +BOOST_AUTO_TEST_CASE(send_raw_message) +{ + auto [aToB, bToA] = ConnectEndpointPair("a", "b"); + + auto message = JsonEncode(new Dictionary{{ + { "jsonrpc", "2.0" }, + { "method", "test::Test" }, + { "params", new Dictionary{{"test", "test"}} } + }}); + + std::promise msgPromise; + TestApiHandler::RegisterTestFn("test", [&](TestApiHandler::Message msg) { + msgPromise.set_value(std::move(msg)); + return Empty; + }); + + // Test sending a raw message from b->a + bToA->SendRawMessage(message); + + auto msgFuture = msgPromise.get_future(); + BOOST_REQUIRE(msgFuture.wait_for(1s) == std::future_status::ready); + TestApiHandler::Message msg; + BOOST_CHECK_NO_THROW(msg = msgFuture.get()); + BOOST_REQUIRE_EQUAL(msg.origin->FromClient, aToB); + BOOST_REQUIRE_EQUAL(msg.params->Get("test"), "test"); + + aToB->Disconnect(); + CHECK_WITHIN(!client->lowest_layer().is_open() && !server->lowest_layer().is_open(), 1s); + + // Sending messages after disconnect should result in an exception + BOOST_REQUIRE_THROW(bToA->SendRawMessage(message), std::runtime_error); +} + +BOOST_AUTO_TEST_CASE(old_message) +{ + auto conn = ConnectEndpoint(server, "test"); + BOOST_REQUIRE_EQUAL(conn->GetEndpoint()->GetRemoteLogPosition(), Timestamp{}); + + auto ts = Utility::GetTime(); + Dictionary::Ptr message = new Dictionary{{ + { "jsonrpc", "2.0" }, + { "method", "test::Test" }, + { "ts", ts }, + { "params", new Dictionary{{"test", "test"}} } + }}; + + BOOST_REQUIRE_NO_THROW(JsonRpc::SendMessage(client, message)); + client->flush(); + + CHECK_LOG_MESSAGE("Received 'test::Test' message from identity 'test'.", 1s); + BOOST_CHECK_EQUAL(conn->GetEndpoint()->GetRemoteLogPosition(), ts); + testLogger->Clear(); + + message->Set("ts", Timestamp{}); + BOOST_REQUIRE_NO_THROW(JsonRpc::SendMessage(client, message)); + client->flush(); + + CHECK_LOG_MESSAGE("Processed JSON-RPC 'test::Test' message for identity 'test'.*", 1s); + CHECK_NO_LOG_MESSAGE("Received 'test::Test' message from identity 'test'.", 0s); + + conn->Disconnect(); + BOOST_CHECK(Shutdown(client)); + CHECK_WITHIN(!server->lowest_layer().is_open(), 1s); +} + +BOOST_AUTO_TEST_CASE(message_result) +{ + auto conn = ConnectEndpoint(server, "test"); + + Dictionary::Ptr message = new Dictionary({ + { "jsonrpc", "2.0" }, + { "method", "test::Test" }, + { "id", "test1" }, + { "params", new Dictionary{{"test", "test"}} } + }); + + TestApiHandler::RegisterTestFn("test", [&](const TestApiHandler::Message&) { return "Success"; }); + + BOOST_REQUIRE_NO_THROW(JsonRpc::SendMessage(client, message)); + client->flush(); + + auto msg = JsonRpc::DecodeMessage(JsonRpc::ReadMessage(client)); + BOOST_CHECK_EQUAL(msg->Get("id"), "test1"); + BOOST_CHECK_EQUAL(msg->Get("result"), "Success"); + + conn->Disconnect(); + BOOST_CHECK(Shutdown(client)); + CHECK_WITHIN(!server->lowest_layer().is_open(), 1s); +} + +BOOST_AUTO_TEST_CASE(message_result_noparams) +{ + auto conn = ConnectEndpoint(server, "test"); + + Dictionary::Ptr message = new Dictionary({ + { "jsonrpc", "2.0" }, + { "method", "test::Test" }, + { "id", "test1" } + }); + + std::atomic_bool handlerCalled = false; + TestApiHandler::RegisterTestFn("test", [&](const TestApiHandler::Message&) { + handlerCalled = true; + return Empty; + }); + + BOOST_REQUIRE_NO_THROW(JsonRpc::SendMessage(client, message)); + client->flush(); + + // Ensure the message has been processed. + REQUIRE_LOG_MESSAGE("Processed JSON-RPC 'test::Test' message for identity 'test'.*", 1s); + + // The handler must not have been called since our message doesn't have any parameters. + BOOST_REQUIRE(!handlerCalled); + + auto msg = JsonRpc::DecodeMessage(JsonRpc::ReadMessage(client)); + BOOST_REQUIRE_EQUAL(msg->Get("id"), "test1"); + BOOST_REQUIRE_EQUAL(msg->Get("result"), Empty); + + conn->Disconnect(); + BOOST_CHECK(Shutdown(client)); + CHECK_WITHIN(!server->lowest_layer().is_open(), 1s); +} + +BOOST_AUTO_TEST_CASE(call_to_nonexistant_function) +{ + auto conn = ConnectEndpoint(server, "test"); + + Dictionary::Ptr message = new Dictionary({ + { "jsonrpc", "2.0" }, + { "method", "test::FooBar" }, + { "params", new Dictionary{} } + }); + + BOOST_REQUIRE_NO_THROW(JsonRpc::SendMessage(client, message)); + client->flush(); + + CHECK_LOG_MESSAGE("Processed JSON-RPC 'test::FooBar' message for identity 'test'.*", 1s); + CHECK_LOG_MESSAGE("Call to non-existent function.*", 0s); + + conn->Disconnect(); + BOOST_CHECK(Shutdown(client)); + CHECK_WITHIN(!server->lowest_layer().is_open(), 1s); +} + +BOOST_AUTO_TEST_CASE(heartbeat_message) +{ + auto conn = ConnectEndpoint(server, "test", false, true); + conn->SetHeartbeatInterval(500ms); + conn->Start(); + + String msgString; + CHECK_DONE_BETWEEN(msgString = JsonRpc::ReadMessage(client), 500ms, 520ms); + + Dictionary::Ptr msg; + BOOST_REQUIRE_NO_THROW(msg = JsonRpc::DecodeMessage(msgString)); + BOOST_CHECK_EQUAL(msg->Get("method"), "event::Heartbeat"); + + conn->Disconnect(); + BOOST_CHECK(Shutdown(client)); + CHECK_WITHIN(!server->lowest_layer().is_open(), 1s); +} + +BOOST_AUTO_TEST_CASE(anonymous_disconnect) +{ + auto [a, b] = ConnectEndpointPair("a", "b", false, true); + BOOST_CHECK(!a->IsAuthenticated()); + BOOST_CHECK(!b->IsAuthenticated()); + + // Actual timeout we test here is this value divided by 6, so 50ms. + a->SetLivenessTimeout(300ms); + a->Start(); + b->SetLivenessTimeout(300ms); + b->Start(); + + CHECK_EDGE_WITHIN(!client->lowest_layer().is_open() && !server->lowest_layer().is_open(), 45ms, 60ms); +} + +BOOST_AUTO_TEST_CASE(liveness_disconnect) +{ + auto [a, b] = ConnectEndpointPair("a", "b", true, true); + BOOST_CHECK(a->IsAuthenticated()); + BOOST_CHECK(b->IsAuthenticated()); + + a->SetLivenessTimeout(50ms); + a->Start(); + b->SetLivenessTimeout(50ms); + b->Start(); + + CHECK_EDGE_WITHIN(!client->lowest_layer().is_open() && !server->lowest_layer().is_open(), 45ms, 60ms); +} + +/* TODO: This test currently completes successfully, but only because of a scheduling + * detail of when the coroutine is spawned vs. when WriteOutgoingMessages() resumes. + * Ideally at some point we would want to rethink consistency during shutdown, wait + * for ACKs to some important messages and use the waitgroup to wait until the reply + * comes in. So currently this is disabled, because we can't reliably assume that + * remaining messages will get sent, but also because it doesn't test functionality + * we're using in any meaningful way at the moment. + */ +BOOST_AUTO_TEST_CASE(wg_join_during_send, *boost::unit_test::disabled{}) +{ + auto conn = ConnectEndpoint(server, "test"); + BOOST_CHECK(conn->IsAuthenticated()); + + std::promise beforeWgJoinPromise; + TestApiHandler::RegisterTestFn("test", [&](const TestApiHandler::Message&) { + beforeWgJoinPromise.set_value(); + return "Response"; + }); + + Dictionary::Ptr message = new Dictionary({ + { "jsonrpc", "2.0" }, + { "method", "test::Test" }, + { "id", "wgTest1" }, + { "params", new Dictionary{{"test", "test"}} } + }); + + BOOST_REQUIRE_NO_THROW(JsonRpc::SendMessage(client, message)); + client->flush(); + + // Wait until the API-function is running, then join the WaitGroup. + BOOST_REQUIRE(beforeWgJoinPromise.get_future().wait_for(1s) == std::future_status::ready); + JoinWaitgroup(conn); + + // We still need to receive a response, even though the waitgroup is joined. + String messageRaw; + BOOST_REQUIRE_NO_THROW(messageRaw = JsonRpc::ReadMessage(client)); + BOOST_REQUIRE_NO_THROW(message = JsonRpc::DecodeMessage(messageRaw)); + BOOST_CHECK_EQUAL(message->Get("id"), "wgTest1"); + BOOST_CHECK_EQUAL(message->Get("result"), "Response"); + + conn->Disconnect(); + BOOST_CHECK(Shutdown(client)); + CHECK_WITHIN(!server->lowest_layer().is_open(), 1s); +} + +BOOST_AUTO_TEST_CASE(send_after_wg_join) +{ + auto conn = ConnectEndpoint(server, "test"); + + BOOST_CHECK(conn->IsAuthenticated()); + + std::atomic_bool handlerRan = false; + TestApiHandler::RegisterTestFn("test", [&](const TestApiHandler::Message&) { + handlerRan = true; + return Empty; + }); + + Dictionary::Ptr message = new Dictionary({ + { "jsonrpc", "2.0" }, + { "method", "test::Test" }, + { "id", "wgTest1" }, + { "params", new Dictionary{{"test", "test"}} } + }); + + // Join the wait-group even before sending the message. + JoinWaitgroup(conn); + + // The message should be received without error, but it should not be processed. + BOOST_REQUIRE_NO_THROW(JsonRpc::SendMessage(client, message)); + client->flush(); + + // Wait until the message has been processed. + CHECK_LOG_MESSAGE("Processed JSON-RPC 'test::Test' message for identity 'test'.*", 1s); + + // Verify that the handler hasn't run. + BOOST_CHECK(!handlerRan); + + // Since the message has not been processed, JsonRpcConnection also shouldn't have sent anything. + BOOST_CHECK_EQUAL(Endpoint::GetByName("test")->GetBytesSentPerSecond(), 0); + + conn->Disconnect(); + BOOST_CHECK(Shutdown(client)); + CHECK_WITHIN(!server->lowest_layer().is_open(), 1s); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/test/test-timedasserts.hpp b/test/test-timedasserts.hpp new file mode 100644 index 0000000000..743b3a6b18 --- /dev/null +++ b/test/test-timedasserts.hpp @@ -0,0 +1,134 @@ +/* Icinga 2 | (c) 2025 Icinga GmbH | GPLv2+ */ + +#pragma once + +#include +#include +#include +#include + +#define ASSERT_CONDITION_WITHIN_TIMEOUT(condition, timeout, level) \ + /* NOLINTNEXTLINE */ \ + do { \ + /* NOLINTNEXTLINE */ \ + auto pred = [&, this]() { return static_cast(condition); }; \ + BOOST_##level(AssertWithTimeout(pred, timeout, #condition)); \ + } while (0) + +#define REQUIRE_WITHIN(condition, timeout) ASSERT_CONDITION_WITHIN_TIMEOUT(condition, timeout, REQUIRE) +#define CHECK_WITHIN(condition, timeout) ASSERT_CONDITION_WITHIN_TIMEOUT(condition, timeout, CHECK) +#define WARN_WITHIN(condition, timeout) ASSERT_CONDITION_WITHIN_TIMEOUT(condition, timeout, WARN) + +#define ASSERT_CONDITION_EDGE_WITHIN_TIMEOUT(cond, time1, time2, level) \ + /* NOLINTNEXTLINE */ \ + do { \ + /* NOLINTNEXTLINE */ \ + auto pred = [&, this]() { return static_cast(cond); }; \ + BOOST_##level(AssertEdgeWithinTimeout(pred, time1, time2, #cond)); \ + } while (0) + +#define REQUIRE_EDGE_WITHIN(cond, time1, time2) ASSERT_CONDITION_EDGE_WITHIN_TIMEOUT(cond, time1, time2, REQUIRE) +#define CHECK_EDGE_WITHIN(cond, time1, time2) ASSERT_CONDITION_EDGE_WITHIN_TIMEOUT(cond, time1, time2, CHECK) +#define WARN_EDGE_WITHIN(cond, time1, time2) ASSERT_CONDITION_EDGE_WITHIN_TIMEOUT(cond, time1, time2, WARN) + +#define ASSERT_DONE_WITHIN_TIMEOUT(expr, time1, time2, level) \ + /* NOLINTNEXTLINE */ \ + do { \ + /* NOLINTNEXTLINE */ \ + auto task = [&, this]() { expr; }; \ + BOOST_##level(AssertDoneWithin(task, time1, time2, #expr)); \ + } while (0) + +#define REQUIRE_DONE_BETWEEN(expr, time1, time2) ASSERT_DONE_WITHIN_TIMEOUT(expr, time1, time2, REQUIRE) +#define CHECK_DONE_BETWEEN(expr, time1, time2) ASSERT_DONE_WITHIN_TIMEOUT(expr, time1, time2, CHECK) +#define WARN_DONE_BETWEEN(expr, time1, time2) ASSERT_DONE_WITHIN_TIMEOUT(expr, time1, time2, WARN) + +#define REQUIRE_DONE_WITHIN(expr, time1) ASSERT_DONE_WITHIN_TIMEOUT(expr, 0s, time1, REQUIRE) +#define CHECK_DONE_WITHIN(expr, time1) ASSERT_DONE_WITHIN_TIMEOUT(expr, 0s, time1, CHECK) +#define WARN_DONE_WITHIN(expr, time1) ASSERT_DONE_WITHIN_TIMEOUT(expr, 0s, time1, WARN) + +namespace icinga { + +using namespace std::chrono_literals; + +/** + * Assert that the predicate `fn` will switch from false to true in the given time window + * + * @param fn The predicate to check + * @param timeout The duration in which the predicate is expected to return true + * @param cond A string representing the condition for use in error messages + * + * @return a boost assertion result. + */ +static boost::test_tools::assertion_result AssertWithTimeout( + const std::function& fn, + const std::chrono::duration& timeout, + std::string_view cond +) +{ + std::size_t iterations = timeout / 1ms; + auto stepDur = timeout / iterations; + for (std::size_t i = 0; i < iterations && !fn(); i++) { + std::this_thread::sleep_for(stepDur); + } + boost::test_tools::assertion_result retVal{fn()}; + retVal.message() << "Condition (" << cond << ") not true within " << timeout.count() << "s"; + return retVal; +} + +/** + * Assert that the predicate `fn` will switch from false to true in the given time window + * + * @param fn The predicate to check + * @param falseUntil The duration for which the predicate is expected to be false + * @param trueWithin The duration in which the predicate is expected to return true + * @param cond A string representing the condition for use in error messages + * + * @return a boost assertion result. + */ +static boost::test_tools::assertion_result AssertEdgeWithinTimeout( + const std::function& fn, + const std::chrono::duration& falseUntil, + const std::chrono::duration& trueWithin, + std::string_view cond +) +{ + std::size_t iterations = falseUntil / 1ms; + auto stepDur = falseUntil / iterations; + for (std::size_t i = 0; i < iterations && !fn(); i++) { + std::this_thread::sleep_for(stepDur); + } + if (fn()) { + boost::test_tools::assertion_result retVal{false}; + retVal.message() << "Condition (" << cond << ") was true before " << falseUntil.count() << "s"; + return retVal; + } + return AssertWithTimeout(fn, trueWithin, cond); +} + +/** + * Assert that the given function takes a duration between lower and upper to complete. + * + * @param fn The function to execute + * @param lower the lower bound to compare the duration against + * @param upper the upper bound to compare the duration against + * + * @return a boost assertion result. + */ +template +static boost::test_tools::assertion_result AssertDoneWithin( + const std::function& fn, + const std::chrono::duration& lower, + const std::chrono::duration& upper, + std::string_view fnString +) +{ + auto start = std::chrono::steady_clock::now(); + fn(); + auto duration = std::chrono::steady_clock::now() - start; + boost::test_tools::assertion_result retVal{duration > lower && duration < upper}; + retVal.message() << fnString << " took " << std::chrono::duration(duration).count() << "s"; + return retVal; +} + +} // namespace icinga