diff --git a/package-lock.json b/package-lock.json index 782cd3b5..4ddfdfc8 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "node-rdkafka", - "version": "v2.16.0", + "version": "v2.16.1", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "node-rdkafka", - "version": "v2.16.0", + "version": "v2.16.1", "hasInstallScript": true, "license": "MIT", "dependencies": { diff --git a/package.json b/package.json index b50c6a8e..e5f76835 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "node-rdkafka", - "version": "v2.16.0", + "version": "v2.16.1", "description": "Node.js bindings for librdkafka", "librdkafka": "2.1.1", "main": "lib/index.js", diff --git a/src/kafka-consumer.cc b/src/kafka-consumer.cc index 0c4c07c2..019b0cb6 100644 --- a/src/kafka-consumer.cc +++ b/src/kafka-consumer.cc @@ -92,11 +92,6 @@ Baton KafkaConsumer::Disconnect() { } } - if (m_consume_loop != nullptr) { - delete m_consume_loop; - m_consume_loop = nullptr; - } - m_is_closing = false; return Baton(err); @@ -1192,6 +1187,18 @@ NAN_METHOD(KafkaConsumer::NodeDisconnect) { Nan::Callback *callback = new Nan::Callback(cb); KafkaConsumer* consumer = ObjectWrap::Unwrap(info.This()); + Workers::KafkaConsumerConsumeLoop* consumeLoop = (Workers::KafkaConsumerConsumeLoop*)consumer->m_consume_loop; + if (consumeLoop != nullptr) { + // stop the consume loop + consumeLoop->Close(); + + // cleanup the async worker + consumeLoop->WorkComplete(); + consumeLoop->Destroy(); + + consumer->m_consume_loop = nullptr; + } + Nan::AsyncQueueWorker( new Workers::KafkaConsumerDisconnect(callback, consumer)); info.GetReturnValue().Set(Nan::Null()); diff --git a/src/kafka-consumer.h b/src/kafka-consumer.h index de8f2181..c91590ec 100644 --- a/src/kafka-consumer.h +++ b/src/kafka-consumer.h @@ -98,7 +98,7 @@ class KafkaConsumer : public Connection { int m_partition_cnt; bool m_is_subscribed = false; - void* m_consume_loop; + void* m_consume_loop = nullptr; // Node methods static NAN_METHOD(NodeConnect); diff --git a/src/workers.cc b/src/workers.cc index 3d9b3d7c..55d3dd50 100644 --- a/src/workers.cc +++ b/src/workers.cc @@ -663,6 +663,7 @@ KafkaConsumerConsumeLoop::KafkaConsumerConsumeLoop(Nan::Callback *callback, const int & timeout_sleep_delay_ms) : MessageWorker(callback), consumer(consumer), + m_looping(true), m_timeout_ms(timeout_ms), m_timeout_sleep_delay_ms(timeout_sleep_delay_ms) { uv_thread_create(&thread_event_loop, KafkaConsumerConsumeLoop::ConsumeLoop, (void*)this); @@ -670,6 +671,11 @@ KafkaConsumerConsumeLoop::KafkaConsumerConsumeLoop(Nan::Callback *callback, KafkaConsumerConsumeLoop::~KafkaConsumerConsumeLoop() {} +void KafkaConsumerConsumeLoop::Close() { + m_looping = false; + uv_thread_join(&thread_event_loop); +} + void KafkaConsumerConsumeLoop::Execute(const ExecutionMessageBus& bus) { // ConsumeLoop is used instead } @@ -680,8 +686,7 @@ void KafkaConsumerConsumeLoop::ConsumeLoop(void *arg) { KafkaConsumer* consumer = consumerLoop->consumer; // Do one check here before we move forward - bool looping = true; - while (consumer->IsConnected() && looping) { + while (consumerLoop->m_looping && consumer->IsConnected()) { Baton b = consumer->Consume(consumerLoop->m_timeout_ms); RdKafka::ErrorCode ec = b.err(); if (ec == RdKafka::ERR_NO_ERROR) { @@ -711,7 +716,7 @@ void KafkaConsumerConsumeLoop::ConsumeLoop(void *arg) { default: // Unknown error. We need to break out of this consumerLoop->SetErrorBaton(b); - looping = false; + consumerLoop->m_looping = false; break; } } else if (ec == RdKafka::ERR_UNKNOWN_TOPIC_OR_PART || ec == RdKafka::ERR_TOPIC_AUTHORIZATION_FAILED) { @@ -719,7 +724,7 @@ void KafkaConsumerConsumeLoop::ConsumeLoop(void *arg) { } else { // Unknown error. We need to break out of this consumerLoop->SetErrorBaton(b); - looping = false; + consumerLoop->m_looping = false; } } } diff --git a/src/workers.h b/src/workers.h index dcea2477..d7d5ac8a 100644 --- a/src/workers.h +++ b/src/workers.h @@ -373,6 +373,7 @@ class KafkaConsumerConsumeLoop : public MessageWorker { ~KafkaConsumerConsumeLoop(); static void ConsumeLoop(void *arg); + void Close(); void Execute(const ExecutionMessageBus&); void HandleOKCallback(); void HandleErrorCallback(); @@ -383,6 +384,7 @@ class KafkaConsumerConsumeLoop : public MessageWorker { const int m_timeout_ms; unsigned int m_rand_seed; const int m_timeout_sleep_delay_ms; + bool m_looping; }; class KafkaConsumerConsume : public ErrorAwareWorker {