@@ -481,8 +481,7 @@ static void rnng_dispatch_thread(void *args) {
481481 int * online = disp -> online ;
482482 char * * url = disp -> url ;
483483
484- int xc , end = 0 , sync = ncv -> flag ;
485- ncv -> flag = 0 ;
484+ int xc , end = 0 ;
486485 nng_socket hsock ;
487486 nng_dialer hdial ;
488487 nng_socket sock [n ];
@@ -514,11 +513,6 @@ static void rnng_dispatch_thread(void *args) {
514513 0x72 , 0x6f , 0x72 , 0xfe , 0x00 , 0x00 , 0x00
515514 };
516515
517- if (nng_rep0_open (& hsock ))
518- return ;
519- if (nng_dial (hsock , disp -> host , & hdial , 0 ))
520- goto exitlevel1 ;
521-
522516 for (R_xlen_t i = 0 ; i < n ; i ++ ) {
523517
524518 signal [i ].cv = ncv ;
@@ -527,17 +521,17 @@ static void rnng_dispatch_thread(void *args) {
527521 nng_socket_set_ms (sock [i ], "req:resend-time" , 0 ) ||
528522 nng_pipe_notify (sock [i ], NNG_PIPE_EV_ADD_POST , nano_record_pipe , & signal [i ]) ||
529523 nng_pipe_notify (sock [i ], NNG_PIPE_EV_REM_POST , nano_record_pipe , & signal [i ]))
530- goto exitlevel2 ;
524+ goto exitlevel1 ;
531525
532526 if (disp -> tls != NULL ) {
533527 if (nng_listener_create (& list [i ], sock [i ], url [i ]) ||
534528 nng_tls_config_server_name (disp -> tls , disp -> up -> u_hostname ) ||
535529 nng_listener_set_ptr (list [i ], NNG_OPT_TLS_CONFIG , disp -> tls ) ||
536530 nng_listener_start (list [i ], 0 ))
537- goto exitlevel2 ;
531+ goto exitlevel1 ;
538532 } else {
539533 if (nng_listen (sock [i ], url [i ], & list [i ], 0 ))
540- goto exitlevel2 ;
534+ goto exitlevel1 ;
541535 }
542536
543537 raio [i ]-> next = ncv ;
@@ -547,9 +541,15 @@ static void rnng_dispatch_thread(void *args) {
547541 if (nng_aio_alloc (& saio [i ]-> aio , sendaio_complete , saio [i ]) ||
548542 nng_aio_alloc (& raio [i ]-> aio , raio_complete_signal , raio [i ]) ||
549543 nng_aio_alloc (& haio [i ]-> aio , raio_complete_signal , haio [i ]))
550- goto exitlevel2 ;
544+ goto exitlevel1 ;
551545 }
552546
547+ if (nng_rep0_open (& hsock ))
548+ goto exitlevel1 ;
549+
550+ if (nng_dial (hsock , disp -> host , & hdial , 0 ))
551+ goto exitlevel2 ;
552+
553553 for (R_xlen_t i = 0 ; i < n ; i ++ ) {
554554 nng_mtx_lock (mtx );
555555 while (ncv -> condition == 0 )
@@ -562,15 +562,6 @@ static void rnng_dispatch_thread(void *args) {
562562 nng_mtx_unlock (mtx );
563563 }
564564
565- if (sync ) {
566- if (nng_recvmsg (hsock , & msg , 0 ))
567- goto exitlevel2 ;
568- if (nng_sendmsg (hsock , msg , 0 )) {
569- nng_msg_free (msg );
570- goto exitlevel2 ;
571- }
572- }
573-
574565 for (R_xlen_t i = 0 ; i < n ; i ++ ) {
575566 nng_ctx_open (& rctx [i ], hsock );
576567 nng_ctx_recv (rctx [i ], haio [i ]-> aio );
@@ -668,17 +659,14 @@ static void rnng_dispatch_thread(void *args) {
668659 }
669660
670661 exitlevel2 :
662+ nng_close (hsock );
663+ exitlevel1 :
671664 for (R_xlen_t i = 0 ; i < n ; i ++ )
672665 nng_close (sock [i ]);
673- exitlevel1 :
674- nng_close (hsock );
675666
676667}
677668
678- SEXP rnng_dispatcher_socket (SEXP cv , SEXP host , SEXP url , SEXP tls ) {
679-
680- if (NANO_TAG (cv ) != nano_CvSymbol )
681- Rf_error ("'cv' is not a valid Condition Variable" );
669+ SEXP rnng_dispatcher_socket (SEXP host , SEXP url , SEXP tls ) {
682670
683671 const int sync = tls == R_MissingArg ;
684672 const int sec = !sync && tls != R_NilValue ;
@@ -687,14 +675,13 @@ SEXP rnng_dispatcher_socket(SEXP cv, SEXP host, SEXP url, SEXP tls) {
687675 if (sec && NANO_TAG (tls ) != nano_TlsSymbol )
688676 Rf_error ("'tls' is not a valid TLS Configuration" );
689677
690- nano_cv * ncv = (nano_cv * ) NANO_PTR (cv );
691-
692678 int xc ;
693- SEXP xptr , sock , list ;
679+ SEXP cv , cvt , xptr , sock , list ;
694680
681+ PROTECT (cvt = rnng_cv_alloc ());
682+ nano_cv * tcv = (nano_cv * ) NANO_PTR (cvt );
695683 nano_thread_disp * disp = R_Calloc (1 , nano_thread_disp );
696- disp -> cv = ncv ;
697- ncv -> flag = sync ;
684+ disp -> cv = tcv ;
698685 disp -> n = nd ;
699686 disp -> tls = sec ? (nng_tls_config * ) NANO_PTR (tls ) : NULL ;
700687 if (sec ) nng_tls_config_hold (disp -> tls );
@@ -722,13 +709,17 @@ SEXP rnng_dispatcher_socket(SEXP cv, SEXP host, SEXP url, SEXP tls) {
722709 if ((xc = nng_req0_open (hsock )))
723710 goto exitlevel2 ;
724711
712+ PROTECT (cv = rnng_cv_alloc ());
713+ nano_cv * ncv = (nano_cv * ) NANO_PTR (cv );
725714 if ((xc = nng_socket_set_ms (* hsock , "req:resend-time" , 0 )) ||
715+ (xc = nng_pipe_notify (* hsock , NNG_PIPE_EV_ADD_POST , pipe_cb_signal , ncv )) ||
726716 (xc = nng_listen (* hsock , disp -> host , & hl -> list , 0 )) ||
727717 (xc = nng_thread_create (& disp -> thr , rnng_dispatch_thread , disp )))
728718 goto exitlevel3 ;
729719
730720 PROTECT (sock = R_MakeExternalPtr (hsock , nano_SocketSymbol , R_NilValue ));
731721 R_RegisterCFinalizerEx (sock , socket_finalizer , TRUE);
722+ Rf_setAttrib (sock , nano_CvSymbol , cvt );
732723
733724 xptr = R_MakeExternalPtr (disp , nano_SocketSymbol , R_NilValue );
734725 Rf_setAttrib (sock , R_MissingArg , xptr );
@@ -738,12 +729,18 @@ SEXP rnng_dispatcher_socket(SEXP cv, SEXP host, SEXP url, SEXP tls) {
738729 Rf_setAttrib (sock , nano_ListenerSymbol , list );
739730 R_RegisterCFinalizerEx (list , listener_finalizer , TRUE);
740731
732+ rnng_cv_wait (cv );
733+ if ((xc = nng_pipe_notify (* hsock , NNG_PIPE_EV_ADD_POST , NULL , NULL )))
734+ goto exitlevel4 ;
741735
742- UNPROTECT (1 );
736+ UNPROTECT (3 );
743737 return sock ;
744738
739+ exitlevel4 :
740+ UNPROTECT (1 );
745741 exitlevel3 :
746742 nng_close (* hsock );
743+ UNPROTECT (1 );
747744 exitlevel2 :
748745 nng_url_free (disp -> up );
749746 exitlevel1 :
@@ -762,6 +759,7 @@ SEXP rnng_dispatcher_socket(SEXP cv, SEXP host, SEXP url, SEXP tls) {
762759 R_Free (disp -> online );
763760 if (sec ) nng_tls_config_free (disp -> tls );
764761 R_Free (disp );
762+ UNPROTECT (1 );
765763 ERROR_OUT (xc );
766764
767765}
0 commit comments