Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 28 additions & 6 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@ project( networkprotocoldsl CXX )
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

# Detect libuv for the UV adapter target only.
find_path(LIBUV_INCLUDE_DIR NAMES uv.h)
find_library(LIBUV_LIBRARIES NAMES uv libuv)
if(NOT LIBUV_INCLUDE_DIR)
message(FATAL_ERROR "Libuv include directory not found")
endif()
if(NOT LIBUV_LIBRARIES)
message(FATAL_ERROR "Libuv libraries not found")
endif()

# # Enable sanitizers for Debug builds
if(CMAKE_BUILD_TYPE MATCHES Debug)
message(STATUS "Enabling Sanitizers")
Expand Down Expand Up @@ -212,20 +222,32 @@ add_library(
src/networkprotocoldsl/generate.hpp
)

target_link_libraries( networkprotocoldsl PUBLIC lexertl)
target_link_libraries( networkprotocoldsl PUBLIC lexertl )

target_include_directories(
networkprotocoldsl
PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}/src/
)

target_compile_options(
networkprotocoldsl
PUBLIC
-Wall
# -fconcepts-diagnostics-depth=10
# Create a separate target for the libuv adapter so users are not forced to use it.
add_library( networkprotocoldsl_uv
src/networkprotocoldsl_uv/libuvserverrunner.cpp
src/networkprotocoldsl_uv/libuvserverrunner.hpp
src/networkprotocoldsl_uv/libuvserverwrapper.cpp # new file added
src/networkprotocoldsl_uv/libuvserverwrapper.hpp # new file added
src/networkprotocoldsl_uv/libuvclientrunner.cpp
src/networkprotocoldsl_uv/libuvclientrunner.hpp
src/networkprotocoldsl_uv/libuvclientwrapper.cpp # new file added
src/networkprotocoldsl_uv/libuvclientwrapper.hpp # new file added
src/networkprotocoldsl_uv/asyncworkqueue.cpp
src/networkprotocoldsl_uv/asyncworkqueue.hpp
)
target_include_directories( networkprotocoldsl_uv PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}/src/
${LIBUV_INCLUDE_DIR}
)
target_link_libraries( networkprotocoldsl_uv PUBLIC networkprotocoldsl ${LIBUV_LIBRARIES} )

enable_testing()
add_subdirectory(tests)
8 changes: 4 additions & 4 deletions src/networkprotocoldsl/interpretercollection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ struct InterpreterSignals {
support::NotificationSignal wake_up_for_input;
support::NotificationSignal wake_up_for_callback;
InterpreterSignals()
: wake_up_interpreter(support::NotificationSignal()),
wake_up_for_output(support::NotificationSignal()),
wake_up_for_input(support::NotificationSignal()),
wake_up_for_callback(support::NotificationSignal()) {}
: wake_up_interpreter(support::NotificationSignal("interpreter")),
wake_up_for_output(support::NotificationSignal("output")),
wake_up_for_input(support::NotificationSignal("input")),
wake_up_for_callback(support::NotificationSignal("callback")) {}
};

struct InterpreterCollection {
Expand Down
10 changes: 8 additions & 2 deletions src/networkprotocoldsl/interpretercollectionmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,22 @@ std::future<Value> InterpreterCollectionManager::insert_interpreter(
std::shared_ptr<InterpreterContext> ctx =
std::make_shared<InterpreterContext>(program.get_instance(arglist));
ctx->additional_data = additional_data;

_collection.do_transaction(
[&fd, &ctx](std::shared_ptr<const InterpreterCollection> current)
-> std::shared_ptr<const InterpreterCollection> {
auto new_interpreters = current->interpreters;
auto old_interpreter_it = new_interpreters.find(fd);
if (old_interpreter_it != new_interpreters.end()) {
if (old_interpreter_it->second->exited.load()) {
new_interpreters.erase(fd);
} else {
throw std::runtime_error("Interpreter already exists for fd");
}
}
new_interpreters.insert({fd, ctx});
return std::make_shared<InterpreterCollection>(
std::move(new_interpreters), current->signals);
});

_collection.current()->signals->wake_up_interpreter.notify();

return ctx->interpreter_result.get_future();
Expand Down
19 changes: 18 additions & 1 deletion src/networkprotocoldsl/interpreterrunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,14 @@
#include <networkprotocoldsl/operationconcepts.hpp>

#include <exception>
#include <iostream>
#include <thread>
#include <variant>

#define INTERPRETERRUNNER_DEBUG(x)
//#define INTERPRETERRUNNER_DEBUG(x) std::cerr << "InterpreterRunner[" <<
//std::this_thread::get_id() << "] " << __func__ << ": " << x << std::endl

namespace networkprotocoldsl {

namespace {
Expand Down Expand Up @@ -142,7 +148,6 @@ void InterpreterRunner::interpreter_loop(InterpreterCollectionManager &mgr) {
break;
case ContinuationState::Exited:
context->exited.store(true);
collection->signals->wake_up_for_output.notify();
OperationResult r = context->interpreter.get_result();
if (std::holds_alternative<Value>(r)) {
Value v = std::get<Value>(r);
Expand All @@ -152,6 +157,10 @@ void InterpreterRunner::interpreter_loop(InterpreterCollectionManager &mgr) {
context->interpreter_result.set_exception(
std::make_exception_ptr(InterpreterResultIsNotValue(r)));
}
collection->signals->wake_up_for_output.notify();
collection->signals->wake_up_for_input.notify();
collection->signals->wake_up_for_callback.notify();
collection->signals->wake_up_interpreter.notify();
break;
};
}
Expand All @@ -164,11 +173,15 @@ void InterpreterRunner::interpreter_loop(InterpreterCollectionManager &mgr) {
collection->signals->wake_up_for_output.notify();
break;
} else {
INTERPRETERRUNNER_DEBUG("Waiting, no active interpreter");
collection->signals->wake_up_interpreter.wait();
INTERPRETERRUNNER_DEBUG("Woken up...");
}
} else {
if (ready_interpreters == 0) {
INTERPRETERRUNNER_DEBUG("Waiting, no ready interpreter");
collection->signals->wake_up_interpreter.wait();
INTERPRETERRUNNER_DEBUG("Woken up...");
}
}
}
Expand Down Expand Up @@ -209,11 +222,15 @@ void InterpreterRunner::callback_loop(InterpreterCollectionManager &mgr) {
collection->signals->wake_up_for_output.notify();
break;
} else {
INTERPRETERRUNNER_DEBUG("Waiting, no active interpreter");
collection->signals->wake_up_for_callback.wait();
INTERPRETERRUNNER_DEBUG("Woken up...");
}
} else {
if (callbacks_count == 0) {
INTERPRETERRUNNER_DEBUG("Waiting, no callbacks to process");
collection->signals->wake_up_for_callback.wait();
INTERPRETERRUNNER_DEBUG("Woken up...");
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/networkprotocoldsl/interpreterrunner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ namespace networkprotocoldsl {

struct InterpreterRunner {
using callback_function = Value (*)(const std::vector<Value> &);
std::unordered_map<std::string, callback_function> callbacks;
using callback_map = std::unordered_map<std::string, callback_function>;
callback_map callbacks;
std::atomic<bool> exit_when_done = false;
void interpreter_loop(InterpreterCollectionManager &mgr);
void callback_loop(InterpreterCollectionManager &mgr);
Expand Down
6 changes: 3 additions & 3 deletions src/networkprotocoldsl/operation/statemachineoperation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@

#include <cassert>

//#define DEBUG(x)
#define DEBUG(x) \
std::cerr << std::this_thread::get_id() << ": " << x << std::endl;
#define DEBUG(x)
//#define DEBUG(x) \
// std::cerr << std::this_thread::get_id() << ": " << x << std::endl;

namespace {

Expand Down
29 changes: 24 additions & 5 deletions src/networkprotocoldsl/support/notificationsignal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,52 @@
#include <iostream>
#include <mutex>

#include <thread>

#define NOTIFICATIONSIGNAL_DEBUG(x)
//#define NOTIFICATIONSIGNAL_DEBUG(x) std::cerr << "NotificationSignal" << "["
//<< name << "/" << std::this_thread::get_id() << "] " << __func__ << ": " << x
//<< std::endl

namespace networkprotocoldsl::support {

class NotificationSignal {
std::string name;
std::mutex mtx;
std::condition_variable cv;
bool notified = false;
std::atomic<bool> notified = false;

public:
NotificationSignal() : mtx(std::mutex()), cv(std::condition_variable()) {}
NotificationSignal(const std::string &n)
: name(n), mtx(std::mutex()), cv(std::condition_variable()) {}
NotificationSignal(const NotificationSignal &in) = delete;
NotificationSignal(NotificationSignal &&in) = delete;
NotificationSignal &operator=(const NotificationSignal &) = delete;

void notify() {
{
NOTIFICATIONSIGNAL_DEBUG("before guard");
std::lock_guard<std::mutex> lk(mtx);
notified = true;
NOTIFICATIONSIGNAL_DEBUG("after guard");
notified.store(true);
}
NOTIFICATIONSIGNAL_DEBUG("before notify");
cv.notify_all();
NOTIFICATIONSIGNAL_DEBUG("after notify");
}

void wait() {
NOTIFICATIONSIGNAL_DEBUG("before lock");
std::unique_lock<std::mutex> lk(mtx);
if (!notified) {
NOTIFICATIONSIGNAL_DEBUG("after lock");
if (!notified.load()) {
NOTIFICATIONSIGNAL_DEBUG("not notified, before wait");
cv.wait(lk);
NOTIFICATIONSIGNAL_DEBUG("not notified, after wait");
} else {
NOTIFICATIONSIGNAL_DEBUG("no wait");
}
notified = false;
notified.store(false);
}
};

Expand Down
62 changes: 62 additions & 0 deletions src/networkprotocoldsl_uv/asyncworkqueue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#include "asyncworkqueue.hpp"
#include <stdexcept>

namespace networkprotocoldsl_uv {

static void async_work_callback(uv_async_t *handle) {
AsyncWorkQueue *queue = static_cast<AsyncWorkQueue *>(handle->data);
queue->process();
}

// Callback invoked by uv_close during shutdown.
static void shutdown_close_cb(uv_handle_t *handle) {
AsyncWorkQueue *self = static_cast<AsyncWorkQueue *>(handle->data);
self->shutdown_promise.set_value();
}

AsyncWorkQueue::AsyncWorkQueue(uv_loop_t *loop) {
uv_async_init(loop, &async_handle_, async_work_callback);
async_handle_.data = this;
}

AsyncWorkQueue::~AsyncWorkQueue() {
// If shutdown wasn't invoked already, trigger it and wait.
if (!shutdown_called_) {
shutdown().wait();
}
}

void AsyncWorkQueue::push_work(std::function<void()> work) {
if (shutdown_called_.load()) {
throw std::runtime_error(
"Cannot push work after shutdown has been initiated");
}
work_queue_.push_back(work);
uv_async_send(&async_handle_);
}

void AsyncWorkQueue::process() {
while (true) {
auto work_opt = work_queue_.pop();
if (!work_opt.has_value())
break;
work_opt.value()();
}
}

uv_async_t *AsyncWorkQueue::get_async_handle() { return &async_handle_; }

std::future<void> AsyncWorkQueue::shutdown() {
if (!shutdown_called_.load()) {
// Submit a work item that calls uv_close with a callback that fulfills
// shutdown_promise_.
push_work([this]() {
uv_close(reinterpret_cast<uv_handle_t *>(&async_handle_),
shutdown_close_cb);
shutdown_called_.store(true);
});
}
return shutdown_promise.get_future();
}

} // namespace networkprotocoldsl_uv
40 changes: 40 additions & 0 deletions src/networkprotocoldsl_uv/asyncworkqueue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#ifndef NETWORKPROTOCOLDSL_UV_ASYNCWORKQUEUE_HPP
#define NETWORKPROTOCOLDSL_UV_ASYNCWORKQUEUE_HPP

#include <atomic>
#include <functional>
#include <future>
#include <networkprotocoldsl/support/mutexlockqueue.hpp>
#include <uv.h>

namespace networkprotocoldsl_uv {

class AsyncWorkQueue {
public:
AsyncWorkQueue(uv_loop_t *loop);
~AsyncWorkQueue();

// Enqueue a work item and signal the loop.
void push_work(std::function<void()> work);

// Process all pending work items.
void process();

// Returns the uv_async_t handle.
uv_async_t *get_async_handle();

// Safe shutdown: returns a future that is fulfilled when shutdown completes.
std::future<void> shutdown();
std::promise<void> shutdown_promise;

private:
uv_async_t async_handle_;
networkprotocoldsl::support::MutexLockQueue<std::function<void()>>
work_queue_;
// Updated to use std::atomic_bool for safe concurrent access.
std::atomic_bool shutdown_called_{false};
};

} // namespace networkprotocoldsl_uv

#endif // NETWORKPROTOCOLDSL_UV_ASYNCWORKQUEUE_HPP
Loading