Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions src/callbacks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ v8::Local<v8::Array> TopicPartitionListToV8Array(
return tp_array;
}

Dispatcher::Dispatcher() {
async = NULL;
Dispatcher::Dispatcher(): async(nullptr, async_deleter) {
uv_mutex_init(&async_lock);
}

Expand All @@ -66,19 +65,17 @@ Dispatcher::~Dispatcher() {
// Only run this if we aren't already listening
void Dispatcher::Activate() {
if (!async) {
async = new uv_async_t;
uv_async_init(uv_default_loop(), async, AsyncMessage_);
async = std::unique_ptr<uv_async_t, decltype(async_deleter)*>(
new uv_async_t(), async_deleter);
uv_async_init(uv_default_loop(), async.get(), AsyncMessage_);

async->data = this;
}
}

// Should be able to run this regardless of whether it is active or not
void Dispatcher::Deactivate() {
if (async) {
uv_close(reinterpret_cast<uv_handle_t*>(async), NULL);
async = NULL;
}
async.reset();
}

bool Dispatcher::HasCallbacks() {
Expand All @@ -87,7 +84,7 @@ bool Dispatcher::HasCallbacks() {

void Dispatcher::Execute() {
if (async) {
uv_async_send(async);
uv_async_send(async.get());
}
}

Expand Down Expand Up @@ -119,6 +116,17 @@ void Dispatcher::RemoveCallback(const v8::Local<v8::Function> &cb) {
}
}

// Custom deleter for uv_async_t smart pointers
void Dispatcher::async_deleter(uv_async_t* async) {
uv_close(
reinterpret_cast<uv_handle_t*>(async),
// Release memory after uv_close() has finished.
[](uv_handle_t* handle) {
delete reinterpret_cast<uv_async_t*>(handle);
}
);
}

event_t::event_t(const RdKafka::Event &event) {
message = "";
fac = "";
Expand Down
5 changes: 4 additions & 1 deletion src/callbacks.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <uv.h>
#include <nan.h>

#include <memory>
#include <vector>
#include <deque>

Expand Down Expand Up @@ -49,7 +50,9 @@ class Dispatcher {
dispatcher->Flush();
}

uv_async_t *async;
static inline void async_deleter(uv_async_t* async);

std::unique_ptr<uv_async_t, decltype(async_deleter)*> async;
};

struct event_t {
Expand Down