Skip to content

Commit 47cdb48

Browse files
committed
increment version post threaded dispatcher
1 parent da53328 commit 47cdb48

File tree

3 files changed

+5
-12
lines changed

3 files changed

+5
-12
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Package: nanonext
22
Type: Package
33
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
4-
Version: 1.2.1.9015
4+
Version: 1.2.1.9016
55
Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is
66
a socket library implementing 'Scalability Protocols', a reliable,
77
high-performance standard for common communications patterns including

NEWS.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
# nanonext 1.2.1.9015 (development)
1+
# nanonext 1.2.1.9016 (development)
22

33
#### New Features
44

5+
* Adds support for threaded dispatcher in `mirai`.
56
* Adds 'recvAio' method for `promises::as.promise()` and `promises::is.promising()` to enable 'recvAio' promises.
67

78
#### Updates

src/thread.c

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,7 @@ SEXP rnng_wait_thread_create(SEXP x) {
369369
}
370370

371371
static void nano_record_pipe(nng_pipe p, nng_pipe_ev ev, void *arg) {
372+
372373
nano_signal *signal = (nano_signal *) arg;
373374
const int incr = ev == NNG_PIPE_EV_ADD_POST;
374375
nano_cv *ncv = signal->cv;
@@ -379,7 +380,7 @@ static void nano_record_pipe(nng_pipe p, nng_pipe_ev ev, void *arg) {
379380
ncv->condition++;
380381
nng_cv_wake(cv);
381382
nng_mtx_unlock(mtx);
382-
// nano_printf(1, "pipe ev %d\n", incr);
383+
383384
}
384385

385386
static void rnng_signal_thread(void *args) {
@@ -604,7 +605,6 @@ static void rnng_dispatch_thread(void *args) {
604605
nng_mtx_unlock(mtx);
605606
if (xc) {
606607
raio[i]->result = 0;
607-
// nano_printf(1, "received reply %d\n", i);
608608
if (xc < 0) {
609609
buf = nng_msg_body((nng_msg *) raio[i]->data);
610610
if (buf[3] == 0x1) {
@@ -616,7 +616,6 @@ static void rnng_dispatch_thread(void *args) {
616616
if ((xc = nng_ctx_sendmsg(rctx[i], (nng_msg *) raio[i]->data, 0)))
617617
nng_msg_free((nng_msg *) raio[i]->data);
618618
} else {
619-
// nano_printf(1, "received error %d\n", i);
620619
nng_msg_alloc(&msg, 0);
621620
if (xc == 19)
622621
nng_msg_append(msg, errnt, sizeof(errnt));
@@ -629,13 +628,11 @@ static void rnng_dispatch_thread(void *args) {
629628
nng_ctx_close(ctx[i]);
630629
nng_ctx_close(rctx[i]);
631630
busy[i] = 0;
632-
// nano_printf(1, "processed reply %d\n", i);
633631
if (end) {
634632
end = 0;
635633
} else {
636634
nng_ctx_open(&rctx[i], hsock);
637635
nng_ctx_recv(rctx[i], haio[i]->aio);
638-
// nano_printf(1, "allocated %d\n", i);
639636
}
640637
break;
641638
}
@@ -649,22 +646,18 @@ static void rnng_dispatch_thread(void *args) {
649646
haio[i]->result = 0;
650647
if (xc < 0) {
651648
busy[i] = 1;
652-
// nano_printf(1, "prep for send %d\n", i);
653649
nng_ctx_open(&ctx[i], sock[i]);
654650
nng_aio_set_msg(saio[i]->aio, (nng_msg *) haio[i]->data);
655651
nng_ctx_send(ctx[i], saio[i]->aio);
656652
nng_ctx_recv(ctx[i], raio[i]->aio);
657-
// nano_printf(1, "sent %d\n", i);
658653
} else {
659654
nng_msg_alloc(&msg, 0);
660655
nng_msg_append(msg, (unsigned char *) &xc, sizeof(int));
661656
if ((xc = nng_ctx_sendmsg(rctx[i], msg, 0)))
662657
nng_msg_free(msg);
663658
nng_ctx_close(rctx[i]);
664-
// nano_printf(1, "send error resetting %d\n", i);
665659
nng_ctx_open(&rctx[i], hsock);
666660
nng_ctx_recv(rctx[i], haio[i]->aio);
667-
// nano_printf(1, "allocated %d\n", i);
668661
}
669662
break;
670663
}
@@ -679,7 +672,6 @@ static void rnng_dispatch_thread(void *args) {
679672
nng_close(sock[i]);
680673
exitlevel1:
681674
nng_close(hsock);
682-
// nano_printf(1, "Dispatcher thread halted\n");
683675

684676
}
685677

0 commit comments

Comments
 (0)