From 43d90aa4da593bd8cf407c9a85879cc51067a321 Mon Sep 17 00:00:00 2001
From: Daniel Ruoso
Date: Sat, 15 Feb 2025 16:39:28 -0500
Subject: [PATCH] add a wrapper for libuv

---
 CMakeLists.txt                                |  34 ++-
 .../interpretercollection.hpp                 |   8 +-
 .../interpretercollectionmanager.cpp          |  10 +-
 src/networkprotocoldsl/interpreterrunner.cpp  |  19 +-
 src/networkprotocoldsl/interpreterrunner.hpp  |   3 +-
 .../operation/statemachineoperation.cpp       |   6 +-
 .../support/notificationsignal.hpp            |  29 ++-
 src/networkprotocoldsl_uv/asyncworkqueue.cpp  |  62 +++++
 src/networkprotocoldsl_uv/asyncworkqueue.hpp  |  40 +++
 .../libuvclientrunner.cpp                     | 201 +++++++++++++++
 .../libuvclientrunner.hpp                     |  82 ++++++
 .../libuvclientwrapper.cpp                    |  46 ++++
 .../libuvclientwrapper.hpp                    |  51 ++++
 .../libuvserverrunner.cpp                     | 236 ++++++++++++++++++
 .../libuvserverrunner.hpp                     |  83 ++++++
 .../libuvserverwrapper.cpp                    |  58 +++++
 .../libuvserverwrapper.hpp                    |  57 +++++
 tests/028-libuv-io-runner.cpp                 | 173 +++++++++++++
 tests/CMakeLists.txt                          |  11 +-
 19 files changed, 1180 insertions(+), 29 deletions(-)
 create mode 100644 src/networkprotocoldsl_uv/asyncworkqueue.cpp
 create mode 100644 src/networkprotocoldsl_uv/asyncworkqueue.hpp
 create mode 100644 src/networkprotocoldsl_uv/libuvclientrunner.cpp
 create mode 100644 src/networkprotocoldsl_uv/libuvclientrunner.hpp
 create mode 100644 src/networkprotocoldsl_uv/libuvclientwrapper.cpp
 create mode 100644 src/networkprotocoldsl_uv/libuvclientwrapper.hpp
 create mode 100644 src/networkprotocoldsl_uv/libuvserverrunner.cpp
 create mode 100644 src/networkprotocoldsl_uv/libuvserverrunner.hpp
 create mode 100644 src/networkprotocoldsl_uv/libuvserverwrapper.cpp
 create mode 100644 src/networkprotocoldsl_uv/libuvserverwrapper.hpp
 create mode 100644 tests/028-libuv-io-runner.cpp

diff --git a/CMakeLists.txt b/CMakeLists.txt
index f56265b..9fe8435 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -4,6 +4,16 @@ project( networkprotocoldsl CXX )
 set(CMAKE_CXX_STANDARD 20)
 set(CMAKE_CXX_STANDARD_REQUIRED ON)
 
+# Detect libuv for the UV adapter target only.
+find_path(LIBUV_INCLUDE_DIR NAMES uv.h)
+find_library(LIBUV_LIBRARIES NAMES uv libuv)
+if(NOT LIBUV_INCLUDE_DIR)
+  message(FATAL_ERROR "Libuv include directory not found")
+endif()
+if(NOT LIBUV_LIBRARIES)
+  message(FATAL_ERROR "Libuv libraries not found")
+endif()
+
 # Enable sanitizers for Debug builds
 if(CMAKE_BUILD_TYPE MATCHES Debug)
   message(STATUS "Enabling Sanitizers")
@@ -212,7 +222,7 @@ add_library(
   src/networkprotocoldsl/generate.hpp
 )
 
-target_link_libraries( networkprotocoldsl PUBLIC lexertl)
+target_link_libraries( networkprotocoldsl PUBLIC lexertl )
 
 target_include_directories(
   networkprotocoldsl
@@ -220,12 +230,24 @@ target_include_directories(
   ${CMAKE_CURRENT_SOURCE_DIR}/src/
 )
 
-target_compile_options(
-  networkprotocoldsl
-  PUBLIC
-  -Wall
-#  -fconcepts-diagnostics-depth=10
+# Create a separate target for the libuv adapter so users are not forced to use it.
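+# A consumer that wants the adapter links the extra target explicitly, e.g.
+# (with a hypothetical application target named myapp):
+#   target_link_libraries(myapp PRIVATE networkprotocoldsl_uv)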
+add_library( networkprotocoldsl_uv
+  src/networkprotocoldsl_uv/libuvserverrunner.cpp
+  src/networkprotocoldsl_uv/libuvserverrunner.hpp
+  src/networkprotocoldsl_uv/libuvserverwrapper.cpp
+  src/networkprotocoldsl_uv/libuvserverwrapper.hpp
+  src/networkprotocoldsl_uv/libuvclientrunner.cpp
+  src/networkprotocoldsl_uv/libuvclientrunner.hpp
+  src/networkprotocoldsl_uv/libuvclientwrapper.cpp
+  src/networkprotocoldsl_uv/libuvclientwrapper.hpp
+  src/networkprotocoldsl_uv/asyncworkqueue.cpp
+  src/networkprotocoldsl_uv/asyncworkqueue.hpp
+)
+target_include_directories( networkprotocoldsl_uv PUBLIC
+  ${CMAKE_CURRENT_SOURCE_DIR}/src/
+  ${LIBUV_INCLUDE_DIR}
 )
+target_link_libraries( networkprotocoldsl_uv PUBLIC networkprotocoldsl ${LIBUV_LIBRARIES} )
 
 enable_testing()
 add_subdirectory(tests)
diff --git a/src/networkprotocoldsl/interpretercollection.hpp b/src/networkprotocoldsl/interpretercollection.hpp
index feb0ed7..6511f3e 100644
--- a/src/networkprotocoldsl/interpretercollection.hpp
+++ b/src/networkprotocoldsl/interpretercollection.hpp
@@ -15,10 +15,10 @@ struct InterpreterSignals {
   support::NotificationSignal wake_up_for_input;
   support::NotificationSignal wake_up_for_callback;
   InterpreterSignals()
-      : wake_up_interpreter(support::NotificationSignal()),
-        wake_up_for_output(support::NotificationSignal()),
-        wake_up_for_input(support::NotificationSignal()),
-        wake_up_for_callback(support::NotificationSignal()) {}
+      : wake_up_interpreter(support::NotificationSignal("interpreter")),
+        wake_up_for_output(support::NotificationSignal("output")),
+        wake_up_for_input(support::NotificationSignal("input")),
+        wake_up_for_callback(support::NotificationSignal("callback")) {}
 };
 
 struct InterpreterCollection {
diff --git a/src/networkprotocoldsl/interpretercollectionmanager.cpp b/src/networkprotocoldsl/interpretercollectionmanager.cpp
index d29f835..2a4ea19 100644
--- a/src/networkprotocoldsl/interpretercollectionmanager.cpp
+++ b/src/networkprotocoldsl/interpretercollectionmanager.cpp
@@ -18,16 +18,22 @@ std::future<Value> InterpreterCollectionManager::insert_interpreter(
   std::shared_ptr<InterpreterContext> ctx =
       std::make_shared<InterpreterContext>(program.get_instance(arglist));
   ctx->additional_data = additional_data;
-
   _collection.do_transaction(
       [&fd, &ctx](std::shared_ptr<const InterpreterCollection> current)
           -> std::shared_ptr<InterpreterCollection> {
         auto new_interpreters = current->interpreters;
+        auto old_interpreter_it = new_interpreters.find(fd);
+        if (old_interpreter_it != new_interpreters.end()) {
+          if (old_interpreter_it->second->exited.load()) {
+            new_interpreters.erase(fd);
+          } else {
+            throw std::runtime_error("Interpreter already exists for fd");
+          }
+        }
         new_interpreters.insert({fd, ctx});
         return std::make_shared<InterpreterCollection>(
             std::move(new_interpreters), current->signals);
       });
-
   _collection.current()->signals->wake_up_interpreter.notify();
 
   return ctx->interpreter_result.get_future();
diff --git a/src/networkprotocoldsl/interpreterrunner.cpp b/src/networkprotocoldsl/interpreterrunner.cpp
index d8387aa..06e59c2 100644
--- a/src/networkprotocoldsl/interpreterrunner.cpp
+++ b/src/networkprotocoldsl/interpreterrunner.cpp
@@ -7,8 +7,14 @@
 #include
 #include
+#include <iostream>
+#include <thread>
 #include
 
+#define INTERPRETERRUNNER_DEBUG(x)
+//#define INTERPRETERRUNNER_DEBUG(x) std::cerr << "InterpreterRunner[" <<
+//std::this_thread::get_id() << "] " << __func__ << ": " << x << std::endl
+
 namespace networkprotocoldsl {
 
 namespace {
@@ -142,7 +148,6 @@ void InterpreterRunner::interpreter_loop(InterpreterCollectionManager &mgr) {
       break;
     case ContinuationState::Exited:
       context->exited.store(true);
-      collection->signals->wake_up_for_output.notify();
       OperationResult r = context->interpreter.get_result();
       if (std::holds_alternative<Value>(r)) {
         Value v = std::get<Value>(r);
@@ -152,6 +157,10 @@ void InterpreterRunner::interpreter_loop(InterpreterCollectionManager &mgr) {
         context->interpreter_result.set_exception(
             std::make_exception_ptr(InterpreterResultIsNotValue(r)));
       }
+      collection->signals->wake_up_for_output.notify();
+      collection->signals->wake_up_for_input.notify();
+      collection->signals->wake_up_for_callback.notify();
+      collection->signals->wake_up_interpreter.notify();
       break;
     };
   }
@@ -164,11 +173,15 @@
         collection->signals->wake_up_for_output.notify();
         break;
       } else {
+        INTERPRETERRUNNER_DEBUG("Waiting, no active interpreter");
         collection->signals->wake_up_interpreter.wait();
+        INTERPRETERRUNNER_DEBUG("Woken up...");
       }
     } else {
       if (ready_interpreters == 0) {
+        INTERPRETERRUNNER_DEBUG("Waiting, no ready interpreter");
         collection->signals->wake_up_interpreter.wait();
+        INTERPRETERRUNNER_DEBUG("Woken up...");
       }
     }
   }
@@ -209,11 +222,15 @@
         collection->signals->wake_up_for_output.notify();
         break;
       } else {
+        INTERPRETERRUNNER_DEBUG("Waiting, no active interpreter");
         collection->signals->wake_up_for_callback.wait();
+        INTERPRETERRUNNER_DEBUG("Woken up...");
       }
     } else {
       if (callbacks_count == 0) {
+        INTERPRETERRUNNER_DEBUG("Waiting, no callbacks to process");
         collection->signals->wake_up_for_callback.wait();
+        INTERPRETERRUNNER_DEBUG("Woken up...");
       }
     }
   }
diff --git a/src/networkprotocoldsl/interpreterrunner.hpp b/src/networkprotocoldsl/interpreterrunner.hpp
index 47de509..63289ef 100644
--- a/src/networkprotocoldsl/interpreterrunner.hpp
+++ b/src/networkprotocoldsl/interpreterrunner.hpp
@@ -10,7 +10,8 @@ namespace networkprotocoldsl {
 
 struct InterpreterRunner {
   using callback_function = Value (*)(const std::vector<Value> &);
-  std::unordered_map<std::string, callback_function> callbacks;
+  using callback_map = std::unordered_map<std::string, callback_function>;
+  callback_map callbacks;
   std::atomic<bool> exit_when_done = false;
   void interpreter_loop(InterpreterCollectionManager &mgr);
   void callback_loop(InterpreterCollectionManager &mgr);
diff --git a/src/networkprotocoldsl/operation/statemachineoperation.cpp b/src/networkprotocoldsl/operation/statemachineoperation.cpp
index 135801e..f5b838c 100644
--- a/src/networkprotocoldsl/operation/statemachineoperation.cpp
+++ b/src/networkprotocoldsl/operation/statemachineoperation.cpp
@@ -10,9 +10,9 @@
 
 #include
 
-//#define DEBUG(x)
-#define DEBUG(x)                                                               \
-  std::cerr << std::this_thread::get_id() << ": " << x << std::endl;
+#define DEBUG(x)
+//#define DEBUG(x)                                                             \
+//  std::cerr << std::this_thread::get_id() << ": " << x << std::endl;
 
 namespace {
diff --git a/src/networkprotocoldsl/support/notificationsignal.hpp b/src/networkprotocoldsl/support/notificationsignal.hpp
index dc892b4..2766e21 100644
--- a/src/networkprotocoldsl/support/notificationsignal.hpp
+++ b/src/networkprotocoldsl/support/notificationsignal.hpp
@@ -5,33 +5,52 @@
 #include <condition_variable>
 #include <mutex>
+#include <string>
 
+#define NOTIFICATIONSIGNAL_DEBUG(x)
+//#define NOTIFICATIONSIGNAL_DEBUG(x) std::cerr << "NotificationSignal" << "["
+//<< name << "/" << std::this_thread::get_id() << "] " << __func__ << ": " << x
+//<< std::endl
+
 namespace networkprotocoldsl::support {
 
 class NotificationSignal {
+  std::string name;
   std::mutex mtx;
   std::condition_variable cv;
-  bool notified = false;
+  std::atomic<bool> notified = false;
 
 public:
-  NotificationSignal() : mtx(std::mutex()), cv(std::condition_variable()) {}
+  NotificationSignal(const std::string &n)
+      : name(n), mtx(std::mutex()), cv(std::condition_variable()) {}
   NotificationSignal(const NotificationSignal &in) = delete;
   NotificationSignal(NotificationSignal &&in) = delete;
   NotificationSignal &operator=(const NotificationSignal &) = delete;
 
   void notify() {
     {
+      NOTIFICATIONSIGNAL_DEBUG("before guard");
       std::lock_guard lk(mtx);
-      notified = true;
+      NOTIFICATIONSIGNAL_DEBUG("after guard");
+      notified.store(true);
     }
+    NOTIFICATIONSIGNAL_DEBUG("before notify");
     cv.notify_all();
+    NOTIFICATIONSIGNAL_DEBUG("after notify");
   }
 
   void wait() {
+    NOTIFICATIONSIGNAL_DEBUG("before lock");
     std::unique_lock lk(mtx);
-    if (!notified) {
+    NOTIFICATIONSIGNAL_DEBUG("after lock");
+    if (!notified.load()) {
+      NOTIFICATIONSIGNAL_DEBUG("not notified, before wait");
       cv.wait(lk);
+      NOTIFICATIONSIGNAL_DEBUG("not notified, after wait");
+    } else {
+      NOTIFICATIONSIGNAL_DEBUG("no wait");
     }
-    notified = false;
+    notified.store(false);
   }
 };
diff --git a/src/networkprotocoldsl_uv/asyncworkqueue.cpp b/src/networkprotocoldsl_uv/asyncworkqueue.cpp
new file mode 100644
index 0000000..cc44dfc
--- /dev/null
+++ b/src/networkprotocoldsl_uv/asyncworkqueue.cpp
@@ -0,0 +1,62 @@
+#include "asyncworkqueue.hpp"
+#include <stdexcept>
+
+namespace networkprotocoldsl_uv {
+
+static void async_work_callback(uv_async_t *handle) {
+  AsyncWorkQueue *queue = static_cast<AsyncWorkQueue *>(handle->data);
+  queue->process();
+}
+
+// Callback invoked by uv_close during shutdown.
+static void shutdown_close_cb(uv_handle_t *handle) {
+  AsyncWorkQueue *self = static_cast<AsyncWorkQueue *>(handle->data);
+  self->shutdown_promise.set_value();
+}
+
+AsyncWorkQueue::AsyncWorkQueue(uv_loop_t *loop) {
+  uv_async_init(loop, &async_handle_, async_work_callback);
+  async_handle_.data = this;
+}
+
+AsyncWorkQueue::~AsyncWorkQueue() {
+  // If shutdown wasn't invoked already, trigger it and wait.
+  if (!shutdown_called_) {
+    shutdown().wait();
+  }
+}
+
+void AsyncWorkQueue::push_work(std::function<void()> work) {
+  if (shutdown_called_.load()) {
+    throw std::runtime_error(
+        "Cannot push work after shutdown has been initiated");
+  }
+  work_queue_.push_back(work);
+  uv_async_send(&async_handle_);
+}
+
+void AsyncWorkQueue::process() {
+  while (true) {
+    auto work_opt = work_queue_.pop();
+    if (!work_opt.has_value())
+      break;
+    work_opt.value()();
+  }
+}
+
+uv_async_t *AsyncWorkQueue::get_async_handle() { return &async_handle_; }
+
+std::future<void> AsyncWorkQueue::shutdown() {
+  if (!shutdown_called_.load()) {
+    // Submit a work item that calls uv_close with a callback that fulfills
+    // shutdown_promise.
+    push_work([this]() {
+      uv_close(reinterpret_cast<uv_handle_t *>(&async_handle_),
+               shutdown_close_cb);
+      shutdown_called_.store(true);
+    });
+  }
+  return shutdown_promise.get_future();
+}
+
+} // namespace networkprotocoldsl_uv
diff --git a/src/networkprotocoldsl_uv/asyncworkqueue.hpp b/src/networkprotocoldsl_uv/asyncworkqueue.hpp
new file mode 100644
index 0000000..7494f21
--- /dev/null
+++ b/src/networkprotocoldsl_uv/asyncworkqueue.hpp
@@ -0,0 +1,40 @@
+#ifndef NETWORKPROTOCOLDSL_UV_ASYNCWORKQUEUE_HPP
+#define NETWORKPROTOCOLDSL_UV_ASYNCWORKQUEUE_HPP
+
+#include <networkprotocoldsl/support/mutexlockqueue.hpp>
+#include <uv.h>
+#include <atomic>
+#include <functional>
+#include <future>
+
+namespace networkprotocoldsl_uv {
+
+class AsyncWorkQueue {
+public:
+  AsyncWorkQueue(uv_loop_t *loop);
+  ~AsyncWorkQueue();
+
+  // Enqueue a work item and signal the loop.
+  void push_work(std::function<void()> work);
+
+  // Process all pending work items.
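+  // (Invoked from the uv_async_t callback, so it runs on the loop thread.)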
+  void process();
+
+  // Returns the uv_async_t handle.
+  uv_async_t *get_async_handle();
+
+  // Safe shutdown: returns a future that is fulfilled when shutdown completes.
+  std::future<void> shutdown();
+  std::promise<void> shutdown_promise;
+
+private:
+  uv_async_t async_handle_;
+  networkprotocoldsl::support::MutexLockQueue<std::function<void()>>
+      work_queue_;
+  // std::atomic_bool so shutdown state can be checked concurrently.
+  std::atomic_bool shutdown_called_{false};
+};
+
+} // namespace networkprotocoldsl_uv
+
+#endif // NETWORKPROTOCOLDSL_UV_ASYNCWORKQUEUE_HPP
diff --git a/src/networkprotocoldsl_uv/libuvclientrunner.cpp b/src/networkprotocoldsl_uv/libuvclientrunner.cpp
new file mode 100644
index 0000000..8eca5e1
--- /dev/null
+++ b/src/networkprotocoldsl_uv/libuvclientrunner.cpp
@@ -0,0 +1,201 @@
+#include "libuvclientrunner.hpp"
+#include "asyncworkqueue.hpp"
+#include <networkprotocoldsl/interpretedprogram.hpp>
+#include <networkprotocoldsl/interpretercollectionmanager.hpp>
+#include <networkprotocoldsl/value.hpp>
+#include <uv.h>
+#include <cstdlib>
+#include <memory>
+#include <string>
+#include <thread>
+
+namespace networkprotocoldsl_uv {
+using namespace networkprotocoldsl;
+
+namespace {
+
+struct LibuvContext {
+  LibuvClientRunner *runner;
+  networkprotocoldsl::InterpreterCollectionManager *mgr;
+  InterpretedProgram program;
+  uv_loop_t *loop;
+  networkprotocoldsl_uv::AsyncWorkQueue *work_queue; // injected work queue.
+};
+
+struct UvConnectionData {
+  LibuvContext *context;
+  uv_tcp_t conn;
+  int fd;
+};
+
+void alloc_buffer_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) {
+  buf->base = static_cast<char *>(malloc(suggested));
+  buf->len = suggested;
+}
+
+// Unified close callback for client connections.
+static void unified_close_cb(uv_handle_t *handle) {
+  UvConnectionData *data = static_cast<UvConnectionData *>(handle->data);
+  delete data->context;
+  delete data;
+}
+
+// Read callback: pushes received data into the interpreter's input_buffer.
+void on_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
+  UvConnectionData *conn_data = static_cast<UvConnectionData *>(stream->data);
+  // Retrieve the interpreter context from the collection.
+  auto collection = conn_data->context->mgr->get_collection();
+  auto it = collection->interpreters.find(conn_data->fd);
+  if (it == collection->interpreters.end()) {
+    if (buf->base)
+      free(buf->base);
+    // The interpreter was removed, so just unregister all callbacks.
+    uv_read_stop(stream);
+    uv_close(reinterpret_cast<uv_handle_t *>(stream), unified_close_cb);
+    return;
+  }
+  if (nread > 0) {
+    std::string input_data(buf->base, nread);
+    // Push data to input_buffer.
+    it->second->input_buffer.push_back(input_data);
+    // Notify the interpreter that input is available.
+    collection->signals->wake_up_for_input.notify();
+    collection->signals->wake_up_interpreter.notify();
+  } else if (nread < 0) {
+    // Error or connection closed: mark EOF.
+    it->second->eof.store(true);
+    collection->signals->wake_up_for_input.notify();
+    collection->signals->wake_up_interpreter.notify();
+  }
+  if (buf->base)
+    free(buf->base);
+}
+
+// Write-completion callback; uses the UvConnectionData directly.
+static void on_write_completed(uv_write_t *req, int status) {
+  auto *conn_data = static_cast<UvConnectionData *>(req->data);
+  auto collection = conn_data->context->mgr->get_collection();
+  auto it = collection->interpreters.find(conn_data->fd);
+  if (it != collection->interpreters.end() && status < 0) {
+    it->second->eof.store(true);
+    collection->signals->wake_up_for_output.notify();
+    collection->signals->wake_up_interpreter.notify();
+    uv_read_stop(reinterpret_cast<uv_stream_t *>(&conn_data->conn));
+    // Use unified_close_cb here.
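+    // (unified_close_cb is the single owner that frees the UvConnectionData
+    // and its LibuvContext, so every close must go through it.)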
+    uv_close(reinterpret_cast<uv_handle_t *>(&conn_data->conn),
+             unified_close_cb);
+  }
+  delete req;
+}
+
+// Pusher thread loop: forwards interpreter output to libuv writes.
+void pusher_thread_loop(LibuvClientRunner *r, UvConnectionData *data) {
+  while (true) {
+    bool did_something = false;
+    auto collection = data->context->mgr->get_collection();
+    // Exit if the interpreter was removed.
+    auto it = collection->interpreters.find(data->fd);
+    if (it == collection->interpreters.end() || it->second->exited.load()) {
+      data->context->work_queue->push_work([data]() {
+        uv_read_stop(reinterpret_cast<uv_stream_t *>(&data->conn));
+        // Use unified_close_cb here.
+        uv_close(reinterpret_cast<uv_handle_t *>(&data->conn),
+                 unified_close_cb);
+      });
+      return;
+    }
+    // Pop outgoing data from the interpreter's output_buffer.
+    auto &interpreter_context = it->second;
+    auto output_opt = interpreter_context->output_buffer.pop();
+    if (output_opt.has_value()) {
+      did_something = true;
+      std::string output = output_opt.value();
+      // Enqueue a write task using UvConnectionData for the callback.
+      data->context->work_queue->push_work([data, output, collection]() {
+        uv_write_t *req = new uv_write_t;
+        req->data = data; // set UvConnectionData as the write callback data.
+        uv_buf_t wrbuf =
+            uv_buf_init(const_cast<char *>(output.data()), output.size());
+        uv_write(req, reinterpret_cast<uv_stream_t *>(&data->conn), &wrbuf, 1,
+                 on_write_completed);
+      });
+    }
+    // If nothing was found, wait for the wake_up_for_output notification.
+    if (!did_something) {
+      collection->signals->wake_up_for_output.wait();
+    }
+  }
+}
+
+static void on_connect_client_cb(uv_connect_t *client, int status) {
+  UvConnectionData *conn_data =
+      reinterpret_cast<UvConnectionData *>(client->data);
+  auto *runner = conn_data->context->runner;
+  if (status < 0) {
+    runner->connection_result.set_value(std::string(uv_strerror(status)));
+    delete client;
+    // Instead of manually deleting conn_data, close the handle so
+    // unified_close_cb deletes it.
+    uv_close(reinterpret_cast<uv_handle_t *>(&conn_data->conn),
+             unified_close_cb);
+    return;
+  }
+  uv_tcp_t *handle = reinterpret_cast<uv_tcp_t *>(client->handle);
+  handle->data = conn_data;
+  uv_os_fd_t fd;
+  if (uv_fileno(reinterpret_cast<uv_handle_t *>(handle), &fd) == 0) {
+    conn_data->fd = fd;
+    runner->client_result = conn_data->context->mgr->insert_interpreter(
+        static_cast<int>(fd), conn_data->context->program, std::nullopt,
+        conn_data);
+    runner->connection_result.set_value(static_cast<int>(fd));
+    uv_read_start(reinterpret_cast<uv_stream_t *>(handle), alloc_buffer_cb,
+                  on_read_cb);
+    // Create the pusher thread using pusher_thread_loop.
+    runner->pusher_thread.emplace(
+        std::thread(pusher_thread_loop, runner, conn_data));
+  } else {
+    runner->connection_result.set_value(
+        "Connection did not provide a valid file descriptor.");
+    uv_close(reinterpret_cast<uv_handle_t *>(handle), unified_close_cb);
+    delete client;
+    return;
+  }
+  delete client;
+}
+
+} // namespace
+
+LibuvClientRunner::LibuvClientRunner(
+    networkprotocoldsl::InterpreterCollectionManager &mgr, uv_loop_t *loop,
+    const std::string &ip, int port, const InterpretedProgram &program,
+    networkprotocoldsl_uv::AsyncWorkQueue &async_queue) {
+  struct sockaddr_in connect_addr;
+  uv_ip4_addr(ip.c_str(), port, &connect_addr);
+  UvConnectionData *data = new UvConnectionData();
+  // Create the context with the injected async work queue.
+  auto *ctx = new LibuvContext{this, &mgr, program, loop, &async_queue};
+  data->context = ctx;
+  // Schedule uv_tcp_init and uv_tcp_connect from the loop thread.
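+  // libuv handles may only be touched from the thread running uv_run(), so
+  // the init/connect calls are deferred through the shared work queue.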
+  async_queue.push_work([=]() {
+    uv_tcp_init(loop, &data->conn);
+    uv_connect_t *connect_req = new uv_connect_t;
+    connect_req->data = data;
+    uv_tcp_connect(connect_req, &data->conn,
+                   reinterpret_cast<const struct sockaddr *>(&connect_addr),
+                   on_connect_client_cb);
+  });
+}
+
+LibuvClientRunner::~LibuvClientRunner() {
+  if (pusher_thread && pusher_thread->joinable()) {
+    pusher_thread->join();
+  }
+}
+
+} // namespace networkprotocoldsl_uv
diff --git a/src/networkprotocoldsl_uv/libuvclientrunner.hpp b/src/networkprotocoldsl_uv/libuvclientrunner.hpp
new file mode 100644
index 0000000..d9ebb41
--- /dev/null
+++ b/src/networkprotocoldsl_uv/libuvclientrunner.hpp
@@ -0,0 +1,82 @@
+#ifndef NETWORKPROTOCOLDSL_UV_LIBUVCLIENTRUNNER_HPP
+#define NETWORKPROTOCOLDSL_UV_LIBUVCLIENTRUNNER_HPP
+
+#include <future>
+#include <optional>
+#include <string>
+#include <thread>
+#include <variant>
+
+#include <uv.h>
+
+#include "asyncworkqueue.hpp"
+
+#include <networkprotocoldsl/interpretedprogram.hpp>
+#include <networkprotocoldsl/interpretercollectionmanager.hpp>
+#include <networkprotocoldsl/value.hpp>
+
+namespace networkprotocoldsl_uv {
+
+class LibuvClientRunner {
+public:
+  /**
+   * @brief Result of a connection attempt.
+   *
+   * If successful, the file descriptor is returned, which is also the
+   * key in the InterpreterCollectionManager. If unsuccessful, an error
+   * message is returned.
+   */
+  using ConnectionResult = std::variant<int, std::string>;
+
+  /**
+   * @brief Connect to an IP address and port.
+   *
+   * @param mgr The InterpreterCollectionManager to use.
+   * @param loop The libuv loop to use.
+   * @param ip The IP address to connect to.
+   * @param port The port to connect to.
+   * @param program The program to use for interpreting incoming data.
+   * @param async_queue The async work queue to use.
+   *
+   * The constructor will start connecting immediately. The ConnectionResult
+   * will be set when the connection operation completes.
+   */
+  LibuvClientRunner(networkprotocoldsl::InterpreterCollectionManager &mgr,
+                    uv_loop_t *loop, const std::string &ip, int port,
+                    const networkprotocoldsl::InterpretedProgram &program,
+                    AsyncWorkQueue &async_queue);
+
+  /**
+   * @brief Destructor.
+   */
+  ~LibuvClientRunner();
+
+  /**
+   * @brief Promise for the connection result.
+   *
+   * The promise will be set when the connection operation completes.
+   */
+  std::promise<ConnectionResult> connection_result;
+
+  /**
+   * @brief Future of the value produced by the client.
+   *
+   * The future will be set when the client interpreter returns.
+   */
+  std::future<networkprotocoldsl::Value> client_result;
+
+  /**
+   * @brief The thread which pushes data from the interpreter to uv.
+   *
+   * The thread will be created when the connection operation completes,
+   * and it will be joined when the destructor is called.
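+   *
+   * While there is no pending output, the pusher loop blocks on the
+   * wake_up_for_output signal rather than spinning.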
+   */
+  std::optional<std::thread> pusher_thread;
+
+  // disallow copy and assignment
+  LibuvClientRunner(const LibuvClientRunner &) = delete;
+  LibuvClientRunner &operator=(const LibuvClientRunner &) = delete;
+};
+
+} // namespace networkprotocoldsl_uv
+
+#endif // NETWORKPROTOCOLDSL_UV_LIBUVCLIENTRUNNER_HPP
diff --git a/src/networkprotocoldsl_uv/libuvclientwrapper.cpp b/src/networkprotocoldsl_uv/libuvclientwrapper.cpp
new file mode 100644
index 0000000..ed7a88e
--- /dev/null
+++ b/src/networkprotocoldsl_uv/libuvclientwrapper.cpp
@@ -0,0 +1,46 @@
+#include "libuvclientwrapper.hpp"
+#include "asyncworkqueue.hpp"
+#include "libuvclientrunner.hpp"
+
+#include <memory>
+#include <string>
+#include <thread>
+
+namespace networkprotocoldsl_uv {
+
+LibuvClientWrapper::LibuvClientWrapper(
+    const networkprotocoldsl::InterpretedProgram &program,
+    const networkprotocoldsl::InterpreterRunner::callback_map &callbacks,
+    AsyncWorkQueue &async_queue)
+    : runner_{networkprotocoldsl::InterpreterRunner{callbacks, false}},
+      async_queue_{&async_queue}, loop_{uv_default_loop()}, program_{program} {}
+
+LibuvClientWrapper::~LibuvClientWrapper() {
+  if (interpreter_thread_.joinable())
+    interpreter_thread_.join();
+  if (callback_thread_.joinable())
+    callback_thread_.join();
+}
+
+std::future<LibuvClientRunner::ConnectionResult>
+LibuvClientWrapper::start(const std::string &ip, int port) {
+  uv_client_runner_ = std::make_unique<LibuvClientRunner>(
+      mgr_, loop_, ip, port, program_, *async_queue_);
+  interpreter_thread_ =
+      std::thread([this]() { runner_.interpreter_loop(mgr_); });
+  callback_thread_ = std::thread([this]() { runner_.callback_loop(mgr_); });
+  return uv_client_runner_->connection_result.get_future();
+}
+
+std::future<networkprotocoldsl::Value> &LibuvClientWrapper::result() {
+  runner_.exit_when_done.store(true);
+  mgr_.get_collection()->signals->wake_up_for_output.notify();
+  mgr_.get_collection()->signals->wake_up_for_input.notify();
+  mgr_.get_collection()->signals->wake_up_for_callback.notify();
+  mgr_.get_collection()->signals->wake_up_interpreter.notify();
+  return uv_client_runner_->client_result;
+}
+
+} // namespace networkprotocoldsl_uv
diff --git a/src/networkprotocoldsl_uv/libuvclientwrapper.hpp b/src/networkprotocoldsl_uv/libuvclientwrapper.hpp
new file mode 100644
index 0000000..df46811
--- /dev/null
+++ b/src/networkprotocoldsl_uv/libuvclientwrapper.hpp
@@ -0,0 +1,51 @@
+#ifndef NETWORKPROTOCOLDSL_UV_LIBUVCLIENTWRAPPER_HPP
+#define NETWORKPROTOCOLDSL_UV_LIBUVCLIENTWRAPPER_HPP
+
+#include "asyncworkqueue.hpp"
+#include "libuvclientrunner.hpp"
+
+#include <networkprotocoldsl/interpretedprogram.hpp>
+#include <networkprotocoldsl/interpretercollectionmanager.hpp>
+#include <networkprotocoldsl/interpreterrunner.hpp>
+#include <networkprotocoldsl/value.hpp>
+
+#include <uv.h>
+
+#include <future>
+#include <memory>
+#include <string>
+#include <thread>
+
+namespace networkprotocoldsl_uv {
+
+class LibuvClientWrapper {
+public:
+  LibuvClientWrapper(
+      const networkprotocoldsl::InterpretedProgram &program,
+      const networkprotocoldsl::InterpreterRunner::callback_map &callbacks,
+      AsyncWorkQueue &async_queue);
+  ~LibuvClientWrapper();
+
+  // Start connecting; returns a future holding the connection result.
+  std::future<LibuvClientRunner::ConnectionResult> start(const std::string &ip,
+                                                         int port);
+
+  // Returns a future for the interpreter's result.
+  std::future<networkprotocoldsl::Value> &result();
+
+  // No stop() is needed; completion is indicated by the result future.
+
+private:
+  networkprotocoldsl::InterpreterCollectionManager mgr_;
+  networkprotocoldsl::InterpreterRunner runner_;
+  std::unique_ptr<LibuvClientRunner> uv_client_runner_;
+  AsyncWorkQueue *async_queue_;
+  uv_loop_t *loop_;
+  networkprotocoldsl::InterpretedProgram program_;
+
+  std::thread interpreter_thread_;
+  std::thread callback_thread_;
+
+  // Disable copy.
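+  // (The wrapper owns running threads and a unique_ptr, so copying would be
+  // unsafe.)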
+  LibuvClientWrapper(const LibuvClientWrapper &) = delete;
+  LibuvClientWrapper &operator=(const LibuvClientWrapper &) = delete;
+};
+
+} // namespace networkprotocoldsl_uv
+
+#endif // NETWORKPROTOCOLDSL_UV_LIBUVCLIENTWRAPPER_HPP
diff --git a/src/networkprotocoldsl_uv/libuvserverrunner.cpp b/src/networkprotocoldsl_uv/libuvserverrunner.cpp
new file mode 100644
index 0000000..9db3526
--- /dev/null
+++ b/src/networkprotocoldsl_uv/libuvserverrunner.cpp
@@ -0,0 +1,236 @@
+#include "libuvserverrunner.hpp"
+#include "asyncworkqueue.hpp"
+#include <networkprotocoldsl/interpretedprogram.hpp>
+#include <networkprotocoldsl/interpretercollectionmanager.hpp>
+#include <atomic>
+#include <cstdlib>
+#include <string>
+#include <thread>
+
+namespace networkprotocoldsl_uv {
+using namespace networkprotocoldsl;
+
+struct LibuvServerRunnerImpl {
+  networkprotocoldsl::InterpreterCollectionManager *mgr_;
+  uv_loop_t *loop_;
+  uv_tcp_t server_;
+  InterpretedProgram program;
+  networkprotocoldsl_uv::AsyncWorkQueue *work_queue;
+  std::atomic<bool> exit_when_done{false};
+  std::thread pusher_thread;
+  std::promise<void> server_stopped;
+};
+
+namespace {
+
+struct UvConnectionData {
+  LibuvServerRunnerImpl *runner;
+  std::atomic<bool> connection_close_request_sent{false};
+  uv_tcp_t conn;
+  int fd;
+};
+
+static void on_close_cb(uv_handle_t *handle) {
+  auto *conn_data = static_cast<UvConnectionData *>(handle->data);
+  auto collection = conn_data->runner->mgr_->get_collection();
+  collection->signals->wake_up_for_callback.notify();
+  collection->signals->wake_up_for_input.notify();
+  collection->signals->wake_up_for_output.notify();
+  collection->signals->wake_up_interpreter.notify();
+  delete static_cast<UvConnectionData *>(handle->data);
+}
+
+static void on_write_completed(uv_write_t *req, int status) {
+  auto *conn_data = static_cast<UvConnectionData *>(req->data);
+  auto collection = conn_data->runner->mgr_->get_collection();
+  auto it = collection->interpreters.find(conn_data->fd);
+  if (it != collection->interpreters.end() && status < 0) {
+    it->second->eof.store(true);
+    collection->signals->wake_up_interpreter.notify();
+    uv_read_stop(reinterpret_cast<uv_stream_t *>(&conn_data->conn));
+  }
+  delete req;
+}
+
+static void pusher_thread_loop_all(LibuvServerRunnerImpl *impl) {
+  while (true) {
+    auto collection = impl->mgr_->get_collection();
+    int active_interpreters = 0;
+    int blocked_interpreters = 0;
+    for (const auto &entry : collection->interpreters) {
+      auto interpreter = entry.second;
+      // Each interpreter's additional_data holds its UvConnectionData.
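+      // (It is passed to insert_interpreter when the connection is accepted.)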
+      UvConnectionData *conn_data =
+          static_cast<UvConnectionData *>(interpreter->additional_data);
+      if (interpreter->exited.load()) {
+        if (!conn_data->connection_close_request_sent.load()) {
+          conn_data->connection_close_request_sent.store(true);
+          impl->mgr_->remove_interpreter(conn_data->fd);
+          impl->work_queue->push_work([conn_data]() {
+            uv_close(reinterpret_cast<uv_handle_t *>(&conn_data->conn),
+                     on_close_cb);
+          });
+        }
+        continue;
+      }
+      active_interpreters++;
+
+      auto output_opt = entry.second->output_buffer.pop();
+      collection->signals->wake_up_interpreter.notify();
+      if (output_opt.has_value()) {
+        blocked_interpreters++;
+        std::string output = output_opt.value();
+        impl->work_queue->push_work([conn_data, output, collection]() {
+          uv_write_t *req = new uv_write_t;
+          req->data = conn_data;
+          uv_buf_t wrbuf =
+              uv_buf_init(const_cast<char *>(output.data()), output.size());
+          uv_write(req, reinterpret_cast<uv_stream_t *>(&conn_data->conn),
+                   &wrbuf, 1, on_write_completed);
+        });
+      }
+    }
+    if (active_interpreters == 0) {
+      if (impl->exit_when_done.load()) {
+        impl->server_stopped.set_value();
+        return;
+      }
+    }
+    if (blocked_interpreters == 0) {
+      collection->signals->wake_up_for_output.wait();
+    }
+  }
+}
+
+// Static free function to allocate the read buffer.
+static void server_alloc_buffer_cb(uv_handle_t *handle, size_t suggested,
+                                   uv_buf_t *buf) {
+  buf->base = static_cast<char *>(malloc(suggested));
+  buf->len = suggested;
+}
+
+// Static free function to handle read events.
+static void server_on_read_cb(uv_stream_t *stream, ssize_t nread,
+                              const uv_buf_t *buf) {
+  UvConnectionData *data = static_cast<UvConnectionData *>(stream->data);
+  auto collection = data->runner->mgr_->get_collection();
+  auto it = collection->interpreters.find(data->fd);
+  if (it == collection->interpreters.end()) {
+    if (buf->base)
+      free(buf->base);
+    uv_read_stop(stream);
+    uv_close(reinterpret_cast<uv_handle_t *>(stream), on_close_cb);
+    return;
+  }
+  if (nread > 0) {
+    std::string input(buf->base, nread);
+    it->second->input_buffer.push_back(input);
+    collection->signals->wake_up_for_input.notify();
+    collection->signals->wake_up_interpreter.notify();
+  } else if (nread < 0) {
+    it->second->eof.store(true);
+    collection->signals->wake_up_for_input.notify();
+    collection->signals->wake_up_interpreter.notify();
+  }
+  if (buf->base)
+    free(buf->base);
+}
+
+// Connection callback for the server.
+static void on_new_connection_cb(uv_stream_t *server, int status) {
+  if (status != 0)
+    return;
+  auto *handle_data = static_cast<UvConnectionData *>(server->data);
+  UvConnectionData *conn_data =
+      new UvConnectionData{handle_data->runner, false, uv_tcp_t(), -1};
+  uv_tcp_init(conn_data->runner->loop_, &conn_data->conn);
+  if (uv_accept(server, reinterpret_cast<uv_stream_t *>(&conn_data->conn)) ==
+      0) {
+    uv_os_fd_t fd;
+    if (uv_fileno(reinterpret_cast<uv_handle_t *>(&conn_data->conn), &fd) ==
+        0) {
+      conn_data->conn.data = conn_data;
+      conn_data->fd = fd;
+      // Insert the interpreter for this connection.
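+      // The fd doubles as the interpreter's key in the collection manager.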
+      conn_data->runner->mgr_->insert_interpreter(static_cast<int>(fd),
+                                                  conn_data->runner->program,
+                                                  std::nullopt, conn_data);
+      uv_read_start(reinterpret_cast<uv_stream_t *>(&conn_data->conn),
+                    server_alloc_buffer_cb, server_on_read_cb);
+      auto collection = conn_data->runner->mgr_->get_collection();
+      collection->signals->wake_up_for_input.notify();
+      collection->signals->wake_up_interpreter.notify();
+      collection->signals->wake_up_for_output.notify();
+      collection->signals->wake_up_for_callback.notify();
+    } else {
+      uv_close(reinterpret_cast<uv_handle_t *>(&conn_data->conn), on_close_cb);
+    }
+  } else {
+    uv_close(reinterpret_cast<uv_handle_t *>(&conn_data->conn), on_close_cb);
+  }
+}
+
+} // namespace
+
+void LibuvServerRunner::stop_accepting() {
+  // Defer the uv_close call to the loop thread.
+  if (impl_->exit_when_done.load())
+    return;
+  impl_->work_queue->push_work([this]() {
+    uv_close(reinterpret_cast<uv_handle_t *>(&impl_->server_), on_close_cb);
+    impl_->exit_when_done.store(true);
+    impl_->mgr_->get_collection()->signals->wake_up_for_output.notify();
+  });
+}
+
+LibuvServerRunner::LibuvServerRunner(
+    networkprotocoldsl::InterpreterCollectionManager &mgr, uv_loop_t *loop,
+    const std::string &ip, int port, const InterpretedProgram &program,
+    networkprotocoldsl_uv::AsyncWorkQueue &async_queue) {
+  bind_result = std::promise<BindResult>();
+  impl_ =
+      new LibuvServerRunnerImpl{&mgr, loop, uv_tcp_t(), program, &async_queue};
+  server_stopped = impl_->server_stopped.get_future();
+  // Wrap the uv_tcp_init call so it runs in the loop thread.
+  async_queue.push_work([this, loop, ip, port, program, &async_queue]() {
+    uv_tcp_init(loop, &impl_->server_);
+    // Create a new merged context.
+    // (Deferring the bind operation to the loop thread.)
+    auto *ctx = new UvConnectionData{impl_, false, uv_tcp_t(), -1};
+    ctx->runner = impl_;
+    impl_->server_.data = ctx;
+
+    struct sockaddr_in bind_addr;
+    uv_ip4_addr(ip.c_str(), port, &bind_addr);
+    int rc =
+        uv_tcp_bind(&impl_->server_,
+                    reinterpret_cast<const struct sockaddr *>(&bind_addr), 0);
+    if (rc != 0) {
+      bind_result.set_value(std::string(uv_strerror(rc)));
+      return;
+    }
+    uv_os_fd_t fd;
+    if (uv_fileno(reinterpret_cast<uv_handle_t *>(&impl_->server_), &fd) == 0) {
+      bind_result.set_value(static_cast<int>(fd));
+      uv_listen(reinterpret_cast<uv_stream_t *>(&impl_->server_), 1024,
+                on_new_connection_cb);
+    } else {
+      bind_result.set_value(
+          "Failed to retrieve file descriptor after binding.");
+    }
+    // Spawn a single, global pusher thread.
+    ctx->runner->pusher_thread =
+        std::thread(pusher_thread_loop_all, ctx->runner);
+  });
+}
+
+// destructor
+LibuvServerRunner::~LibuvServerRunner() {
+  stop_accepting();
+  if (impl_->pusher_thread.joinable()) {
+    impl_->pusher_thread.join();
+  }
+  delete impl_;
+}
+
+} // namespace networkprotocoldsl_uv
\ No newline at end of file
diff --git a/src/networkprotocoldsl_uv/libuvserverrunner.hpp b/src/networkprotocoldsl_uv/libuvserverrunner.hpp
new file mode 100644
index 0000000..e47125b
--- /dev/null
+++ b/src/networkprotocoldsl_uv/libuvserverrunner.hpp
@@ -0,0 +1,83 @@
+#ifndef NETWORKPROTOCOLDSL_UV_LIBUVSERVERRUNNER_HPP
+#define NETWORKPROTOCOLDSL_UV_LIBUVSERVERRUNNER_HPP
+
+#include "asyncworkqueue.hpp"
+
+#include <networkprotocoldsl/interpretedprogram.hpp>
+#include <networkprotocoldsl/interpretercollectionmanager.hpp>
+
+#include <uv.h>
+
+#include <future>
+#include <string>
+#include <thread>
+#include <variant>
+
+namespace networkprotocoldsl_uv {
+
+struct LibuvServerRunnerImpl;
+class LibuvServerRunner {
+public:
+  /**
+   * @brief Result of a bind attempt.
+   *
+   * If successful, the file descriptor is returned, which is also the
+   * key in the InterpreterCollectionManager. If unsuccessful, an error
+   * message is returned.
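+   *
+   * (This mirrors LibuvClientRunner::ConnectionResult on the client side.)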
+   */
+  using BindResult = std::variant<int, std::string>;
+
+  /**
+   * @brief Bind to an IP address and port, and start accepting connections.
+   *
+   * @param mgr The InterpreterCollectionManager to use.
+   * @param loop The libuv loop to use.
+   * @param ip The IP address to bind to.
+   * @param port The port to bind to.
+   * @param program The program to use for interpreting incoming data.
+   * @param async_queue The async work queue to use.
+   *
+   * The constructor will start accepting connections immediately.
+   *
+   * The BindResult will be set when the bind operation completes.
+   */
+  LibuvServerRunner(networkprotocoldsl::InterpreterCollectionManager &mgr,
+                    uv_loop_t *loop, const std::string &ip, int port,
+                    const networkprotocoldsl::InterpretedProgram &program,
+                    AsyncWorkQueue &async_queue);
+
+  /**
+   * @brief Destructor.
+   */
+  ~LibuvServerRunner();
+
+  /**
+   * @brief Stop accepting connections and close the server.
+   *
+   * The close is deferred to the loop thread; this method returns without
+   * blocking. Wait on server_stopped for full shutdown.
+   */
+  void stop_accepting();
+
+  /**
+   * @brief Result of the bind.
+   *
+   * This promise will be set when the bind operation completes.
+   */
+  std::promise<BindResult> bind_result;
+
+  /**
+   * @brief Whether the server has completely stopped.
+   */
+  std::future<void> server_stopped;
+
+  // implementation details
+  LibuvServerRunnerImpl *impl_;
+
+  // disallow copy and assignment
+  LibuvServerRunner(const LibuvServerRunner &) = delete;
+  LibuvServerRunner &operator=(const LibuvServerRunner &) = delete;
+};
+
+} // namespace networkprotocoldsl_uv
+
+#endif // NETWORKPROTOCOLDSL_UV_LIBUVSERVERRUNNER_HPP
diff --git a/src/networkprotocoldsl_uv/libuvserverwrapper.cpp b/src/networkprotocoldsl_uv/libuvserverwrapper.cpp
new file mode 100644
index 0000000..6d4d866
--- /dev/null
+++ b/src/networkprotocoldsl_uv/libuvserverwrapper.cpp
@@ -0,0 +1,58 @@
+#include "libuvserverwrapper.hpp"
+#include "asyncworkqueue.hpp"
+#include "libuvserverrunner.hpp"
+
+#include <memory>
+#include <string>
+
+namespace networkprotocoldsl_uv {
+
+LibuvServerWrapper::LibuvServerWrapper(
+    const networkprotocoldsl::InterpretedProgram &program,
+    const networkprotocoldsl::InterpreterRunner::callback_map &callbacks,
+    AsyncWorkQueue &async_queue)
+    : runner_{networkprotocoldsl::InterpreterRunner{callbacks, false}},
+      async_queue_{&async_queue}, loop_{uv_default_loop()},
+      program_{program} {
+  // The async work queue is injected; nothing is created here.
+}
+
+LibuvServerWrapper::~LibuvServerWrapper() { stop(); }
+
+std::future<LibuvServerRunner::BindResult>
+LibuvServerWrapper::start(const std::string &ip, int port) {
+  // Create the libuv server runner (bind happens asynchronously).
+  uv_server_runner_ = std::make_unique<LibuvServerRunner>(
+      mgr_, loop_, ip, port, program_, *async_queue_);
+  // Launch the interpreter loop thread.
+  interpreter_thread_ =
+      std::thread([this]() { runner_.interpreter_loop(mgr_); });
+  // Launch the callback loop thread.
+  callback_thread_ = std::thread([this]() { runner_.callback_loop(mgr_); });
+  // Return the future associated with the bind result.
+  return uv_server_runner_->bind_result.get_future();
+}
+
+void LibuvServerWrapper::stop() {
+  if (uv_server_runner_) {
+    uv_server_runner_->stop_accepting();
+  }
+  // Signal the interpreter to exit.
+  runner_.exit_when_done.store(true);
+  mgr_.get_collection()->signals->wake_up_for_output.notify();
+  mgr_.get_collection()->signals->wake_up_for_input.notify();
+  mgr_.get_collection()->signals->wake_up_for_callback.notify();
+  mgr_.get_collection()->signals->wake_up_interpreter.notify();
+  // Join threads.
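+  // Both loops observe exit_when_done and the notifications above, so these
+  // joins should complete once the remaining interpreters finish.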
+  if (interpreter_thread_.joinable())
+    interpreter_thread_.join();
+  if (callback_thread_.joinable())
+    callback_thread_.join();
+  if (uv_server_runner_)
+    uv_server_runner_->server_stopped.wait();
+}
+
+} // namespace networkprotocoldsl_uv
diff --git a/src/networkprotocoldsl_uv/libuvserverwrapper.hpp b/src/networkprotocoldsl_uv/libuvserverwrapper.hpp
new file mode 100644
index 0000000..eb8e913
--- /dev/null
+++ b/src/networkprotocoldsl_uv/libuvserverwrapper.hpp
@@ -0,0 +1,57 @@
+#ifndef NETWORKPROTOCOLDSL_UV_LIBUVSERVERWRAPPER_HPP
+#define NETWORKPROTOCOLDSL_UV_LIBUVSERVERWRAPPER_HPP
+
+#include "asyncworkqueue.hpp"
+#include "libuvserverrunner.hpp"
+
+#include <networkprotocoldsl/interpretedprogram.hpp>
+#include <networkprotocoldsl/interpretercollectionmanager.hpp>
+#include <networkprotocoldsl/interpreterrunner.hpp>
+#include <networkprotocoldsl/value.hpp>
+
+#include <uv.h>
+
+#include <future>
+#include <memory>
+#include <string>
+#include <thread>
+
+namespace networkprotocoldsl_uv {
+
+class LibuvServerWrapper {
+public:
+  // Receives the program, the callbacks, and an async work queue reference.
+  LibuvServerWrapper(
+      const networkprotocoldsl::InterpretedProgram &program,
+      const networkprotocoldsl::InterpreterRunner::callback_map &callbacks,
+      AsyncWorkQueue &async_queue);
+  ~LibuvServerWrapper();
+
+  // start() receives the ip and port to bind on and returns the bind result
+  // future.
+  std::future<LibuvServerRunner::BindResult> start(const std::string &ip,
+                                                   int port);
+
+  // Stop the server and join threads.
+  void stop();
+
+private:
+  networkprotocoldsl::InterpreterCollectionManager mgr_;
+  networkprotocoldsl::InterpreterRunner runner_;
+  std::unique_ptr<LibuvServerRunner> uv_server_runner_;
+  AsyncWorkQueue *async_queue_; // Not owned by the wrapper.
+  uv_loop_t *loop_;
+
+  // The program is saved for use in start().
+  networkprotocoldsl::InterpretedProgram program_;
+
+  std::thread interpreter_thread_;
+  std::thread callback_thread_;
+
+  // Disable copy.
+  LibuvServerWrapper(const LibuvServerWrapper &) = delete;
+  LibuvServerWrapper &operator=(const LibuvServerWrapper &) = delete;
+};
+
+} // namespace networkprotocoldsl_uv
+
+#endif // NETWORKPROTOCOLDSL_UV_LIBUVSERVERWRAPPER_HPP
diff --git a/tests/028-libuv-io-runner.cpp b/tests/028-libuv-io-runner.cpp
new file mode 100644
index 0000000..0963340
--- /dev/null
+++ b/tests/028-libuv-io-runner.cpp
@@ -0,0 +1,173 @@
+#include <gtest/gtest.h>
+
+#include <networkprotocoldsl/interpretedprogram.hpp>
+#include <networkprotocoldsl/interpretercollectionmanager.hpp>
+#include <networkprotocoldsl/interpreterrunner.hpp>
+#include <networkprotocoldsl/value.hpp>
+#include <networkprotocoldsl_uv/asyncworkqueue.hpp>
+#include <networkprotocoldsl_uv/libuvclientwrapper.hpp>
+#include <networkprotocoldsl_uv/libuvserverwrapper.hpp>
+
+#include <uv.h>
+
+#include <future>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <variant>
+#include <vector>
+
+#define NUMBER_OF_CLIENTS_IN_TEST 5
+
+using namespace networkprotocoldsl;
+using namespace networkprotocoldsl_uv;
+
+value::Octets _o(const std::string &str) {
+  return value::Octets{std::make_shared<const std::string>(str)};
+}
+
+TEST(LibuvIORunnerTest, Complete) {
+  std::string test_file =
+      std::string(TEST_DATA_DIR) + "/023-source-code-http-client-server.txt";
+
+  // Create the libuv loop.
+  uv_loop_t *loop = uv_default_loop();
+  // Create an async work queue for both client and server.
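+  // (A single queue means one uv_async_t handle wakes the loop for all
+  // deferred work from both wrappers.)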
+  networkprotocoldsl_uv::AsyncWorkQueue async_queue(loop);
+
+  std::thread io_thread([&]() { uv_run(loop, UV_RUN_DEFAULT); });
+
+  // Generate the server program, manager and runner.
+  auto maybe_server_program = InterpretedProgram::generate_server(test_file);
+  ASSERT_TRUE(maybe_server_program.has_value());
+  auto server_program = maybe_server_program.value();
+
+  InterpreterRunner::callback_map server_callbacks = {
+      {"AwaitResponse",
+       [](const std::vector<Value> &args) -> Value {
+         value::Dictionary dict = std::get<value::Dictionary>(args[0]);
+         std::cerr << "Member count: " << dict.members->size() << std::endl;
+         std::cerr << "Server received request: "
+                   << std::get<value::Octets>(
+                          dict.members->at("request_target"))
+                          .data.get()
+                   << std::endl;
+         return value::DynamicList{
+             {_o("HTTP Response"),
+              value::Dictionary{
+                  {{"response_code", 200},
+                   {"reason_phrase", _o("Looks good")},
+                   {"major_version", 1},
+                   {"minor_version", 1},
+                   {"headers",
+                    value::DynamicList{
+                        {value::Dictionary{{{"key", _o("Some-Response")},
+                                            {"value", _o("some value")}}},
+                         value::Dictionary{{{"key", _o("TestHeader")},
+                                            {"value", _o("Value")}}}}}}}}}};
+       }},
+      {"Closed",
+       [](const std::vector<Value> &args) -> Value {
+         std::cerr << "Server Close" << std::endl;
+         return value::DynamicList{{_o("N/A"), args.at(0)}};
+       }},
+  };
+
+  // Create the server wrapper with the callback map.
+  LibuvServerWrapper server_wrapper(server_program, server_callbacks,
+                                    async_queue);
+
+  // Wait for the bind result before proceeding.
+  std::future<LibuvServerRunner::BindResult> bind_future =
+      server_wrapper.start("127.0.0.1", 8080);
+  bind_future.wait();
+  auto bind_result = bind_future.get();
+  if (std::holds_alternative<std::string>(bind_result)) {
+    std::cerr << "Bind failed: " << std::get<std::string>(bind_result)
+              << std::endl;
+  }
+  ASSERT_EQ(std::holds_alternative<int>(bind_result), true);
+  // The bind result is the file descriptor of the server.
+  ASSERT_NE(std::get<int>(bind_result), 0);
+
+  // Generate the client program.
+  auto maybe_client_program = InterpretedProgram::generate_client(test_file);
+  ASSERT_TRUE(maybe_client_program.has_value());
+  auto client_program = maybe_client_program.value();
+
+  // Create the client wrapper with a local callback map.
+  networkprotocoldsl::InterpreterRunner::callback_map client_callbacks = {
+      {"Open",
+       [](const std::vector<Value> &args) -> Value {
+         static int first_time = 0;
+         if (!first_time) {
+           std::cerr << "Client Open" << std::endl;
+           first_time = 1;
+         } else {
+           std::cerr << "Back to open!" << std::endl;
+           return value::DynamicList{
+               {_o("Client Closes Connection"), value::Dictionary{}}};
+         }
+         return value::DynamicList{
+             {_o("HTTP Request"),
+              value::Dictionary{
+                  {{"verb", _o("GET")},
+                   {"request_target", _o("/test")},
+                   {"major_version", 1},
+                   {"minor_version", 1},
+                   {"headers",
+                    value::DynamicList{
+                        {value::Dictionary{
+                             {{"key", _o("Accept")},
+                              {"value", _o("application/json")}}},
+                         value::Dictionary{{{"key", _o("TestHeader")},
+                                            {"value", _o("Value")}}}}}}}}}};
+       }},
+      {"Closed",
+       [](const std::vector<Value> &args) -> Value {
+         std::cerr << "Client Close" << std::endl;
+         return value::DynamicList{{_o("N/A"), args.at(0)}};
+       }},
+  };
+
+  // Create the client wrapper.
+  networkprotocoldsl_uv::LibuvClientWrapper client_wrapper(
+      client_program, client_callbacks, async_queue);
+
+  // Start connecting.
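+  // start() returns the future tied to LibuvClientRunner::connection_result.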
+  auto connection_future = client_wrapper.start("127.0.0.1", 8080);
+  connection_future.wait();
+  auto connection_result_value = connection_future.get();
+  if (std::holds_alternative<std::string>(connection_result_value)) {
+    std::cerr << "Connection failed: "
+              << std::get<std::string>(connection_result_value) << std::endl;
+  }
+  ASSERT_EQ(std::holds_alternative<int>(connection_result_value), true);
+  int client_fd = std::get<int>(connection_result_value);
+  ASSERT_NE(client_fd, 0);
+
+  // Wait for the interpreter result.
+  auto &client_interpreter_result = client_wrapper.result();
+  client_interpreter_result.wait();
+  // Validate the result.
+  ASSERT_EQ(std::holds_alternative<value::DynamicList>(
+                client_interpreter_result.get()),
+            true);
+
+  // Stop the server.
+  server_wrapper.stop();
+
+  // De-register the async handle to stop the loop.
+  async_queue.shutdown().wait();
+  io_thread.join();
+}
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 29dfae6..f662e1e 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -10,15 +10,11 @@ target_link_libraries(
   networkprotocoldsl
 )
 
-find_path(LIBUV_INCLUDE_DIR NAMES uv.h)
-find_library(LIBUV_LIBRARIES NAMES uv libuv)
-add_library( uv INTERFACE )
-target_include_directories( uv INTERFACE ${LIBUV_INCLUDE_DIR} )
-target_link_libraries( uv INTERFACE ${LIBUV_LIBRARIES} )
-
 include(CTest)
 enable_testing()
 
+set(028-libuv-io-runner_EXTRA_LIBS networkprotocoldsl_uv)
+set(014-using-with-libuv_EXTRA_LIBS uv)
 foreach(
   TEST
   001-empty
@@ -48,6 +44,7 @@ foreach(
   025-transitionlookahead
   026-statemachineoperator
   027-translate-ast-to-optree
+  028-libuv-io-runner
 )
   add_executable(${TEST}.t ${TEST}.cpp)
   target_compile_definitions(${TEST}.t PRIVATE -DTEST_DATA_DIR="${CMAKE_CURRENT_SOURCE_DIR}/data")
@@ -56,7 +53,7 @@ foreach(
     PUBLIC
     testlibs
     networkprotocoldsl
-    uv
+    ${${TEST}_EXTRA_LIBS}
    ${GTEST_BOTH_LIBRARIES}
   )
   gtest_discover_tests(${TEST}.t