Skip to content

Commit da53328

Browse files
authored
Threaded Dispatcher (#49)
* dispatcher POC * add host connection * accept external cv * recv aio signal from host * add host url * create host socket * move to req/rep * port dispatcher concept * comment unsafe print statements from thread + document R function * re-use aios * use safe printfs * use new API layer * disable retries at daemon level * record active pipes, allow scale down * tidy up urls * prelim threaded dispatcher tls support * add thread_disp_finalizer * add safety * simplify - remove need for pipe notify * use pipe events to track connections through cv flag value * doc for internal use * add more safety * align APIs * use more efficient saio structs * active status concept * synchronize * read active status * support daemon time/task outs * signal from pipe events * catch one * rename to online * remove one lock * synchronize daemons * fix synchronization routine * move url parse out of loop * take all urls as inputs * add tests * fixes and cleanups * passes valgrind * increase coverage * more coverage * try more * run comms over dispatcher * widen test timings * sync comms check
1 parent 756b516 commit da53328

File tree

11 files changed

+540
-48
lines changed

11 files changed

+540
-48
lines changed

NAMESPACE

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,10 @@ export("%~>%")
4747
export("opt<-")
4848
export(.advance)
4949
export(.context)
50+
export(.dispatcher)
5051
export(.keep)
5152
export(.mark)
53+
export(.online)
5254
export(.unresolved)
5355
export(call_aio)
5456
export(call_aio_)

R/sync.R

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,3 +306,36 @@ unlock <- function(socket) invisible(.Call(rnng_socket_unlock, socket))
306306
#' @export
307307
#'
308308
`%~>%` <- function(cv, cv2) invisible(.Call(rnng_signal_thread_create, cv, cv2))
309+
310+
#' Dispatcher Socket
311+
#'
312+
#' Creates a Dispatcher socket, which is a special type of \sQuote{req} socket,
313+
#' with FIFO scheduling using a threaded implementation (for internal use
314+
#' only).
315+
#'
316+
#' @param cv a \sQuote{conditionVariable} object.
317+
#' @param host \sQuote{inproc://} url connecting the host to the thread.
318+
#' @param url the URLs at which to listen for rep nodes.
319+
#' @inheritParams listen
320+
#'
321+
#' @return A \sQuote{req} Socket. The thread is attached as an attribute.
322+
#'
323+
#' @keywords internal
324+
#' @export
325+
#'
326+
.dispatcher <- function(cv, host, url, tls = quote(expr =))
327+
.Call(rnng_dispatcher_socket, cv, host, url, tls)
328+
329+
#' Read Online Status
330+
#'
331+
#' Reads the online status of threaded dispatcher sockets (for internal use
332+
#' only).
333+
#'
334+
#' @param sock a dispatcher Socket.
335+
#'
336+
#' @return An vector of integer values.
337+
#'
338+
#' @keywords internal
339+
#' @export
340+
#'
341+
.online <- function(sock) .Call(rnng_read_online, sock)

man/dot-dispatcher.Rd

Lines changed: 27 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

man/dot-online.Rd

Lines changed: 19 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/aio.c

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -67,28 +67,6 @@ static void raio_complete(void *arg) {
6767

6868
}
6969

70-
static void raio_complete_signal(void *arg) {
71-
72-
nano_aio *raio = (nano_aio *) arg;
73-
nano_cv *ncv = (nano_cv *) raio->next;
74-
nng_cv *cv = ncv->cv;
75-
nng_mtx *mtx = ncv->mtx;
76-
77-
const int res = nng_aio_result(raio->aio);
78-
if (res == 0)
79-
raio->data = nng_aio_get_msg(raio->aio);
80-
81-
nng_mtx_lock(mtx);
82-
raio->result = res - !res;
83-
ncv->condition++;
84-
nng_cv_wake(cv);
85-
nng_mtx_unlock(mtx);
86-
87-
if (raio->cb != NULL)
88-
later2(raio_invoke_cb, raio->cb);
89-
90-
}
91-
9270
static void iraio_complete(void *arg) {
9371

9472
nano_aio *iaio = (nano_aio *) arg;

src/core.c

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,33 @@ static SEXP nano_outHook(SEXP x, SEXP fun) {
122122

123123
// functions with forward definitions in nanonext.h ----------------------------
124124

125+
void raio_complete_signal(void *arg) {
126+
127+
nano_aio *raio = (nano_aio *) arg;
128+
nano_cv *ncv = (nano_cv *) raio->next;
129+
nng_cv *cv = ncv->cv;
130+
nng_mtx *mtx = ncv->mtx;
131+
132+
const int res = nng_aio_result(raio->aio);
133+
if (res == 0)
134+
raio->data = nng_aio_get_msg(raio->aio);
135+
136+
nng_mtx_lock(mtx);
137+
raio->result = res - !res;
138+
ncv->condition++;
139+
nng_cv_wake(cv);
140+
nng_mtx_unlock(mtx);
141+
142+
}
143+
144+
void sendaio_complete(void *arg) {
145+
146+
nng_aio *aio = ((nano_saio *) arg)->aio;
147+
if (nng_aio_result(aio))
148+
nng_msg_free(nng_aio_get_msg(aio));
149+
150+
}
151+
125152
void dialer_finalizer(SEXP xptr) {
126153

127154
if (NANO_PTR(xptr) == NULL) return;
@@ -153,6 +180,16 @@ void socket_finalizer(SEXP xptr) {
153180

154181
}
155182

183+
void cv_finalizer(SEXP xptr) {
184+
185+
if (NANO_PTR(xptr) == NULL) return;
186+
nano_cv *xp = (nano_cv *) NANO_PTR(xptr);
187+
nng_cv_free(xp->cv);
188+
nng_mtx_free(xp->mtx);
189+
R_Free(xp);
190+
191+
}
192+
156193
#if R_VERSION < R_Version(4, 1, 0)
157194

158195
inline SEXP R_NewEnv(SEXP parent, int hash, int size) {

src/init.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ static const R_CallMethodDef callMethods[] = {
143143
{"rnng_dial", (DL_FUNC) &rnng_dial, 5},
144144
{"rnng_dialer_close", (DL_FUNC) &rnng_dialer_close, 1},
145145
{"rnng_dialer_start", (DL_FUNC) &rnng_dialer_start, 2},
146+
{"rnng_dispatcher_socket", (DL_FUNC) &rnng_dispatcher_socket, 4},
146147
{"rnng_fini", (DL_FUNC) &rnng_fini, 0},
147148
{"rnng_get_opt", (DL_FUNC) &rnng_get_opt, 2},
148149
{"rnng_is_error_value", (DL_FUNC) &rnng_is_error_value, 1},
@@ -160,6 +161,7 @@ static const R_CallMethodDef callMethods[] = {
160161
{"rnng_pipe_notify", (DL_FUNC) &rnng_pipe_notify, 6},
161162
{"rnng_protocol_open", (DL_FUNC) &rnng_protocol_open, 6},
162163
{"rnng_random", (DL_FUNC) &rnng_random, 2},
164+
{"rnng_read_online", (DL_FUNC) &rnng_read_online, 1},
163165
{"rnng_reap", (DL_FUNC) &rnng_reap, 1},
164166
{"rnng_recv", (DL_FUNC) &rnng_recv, 4},
165167
{"rnng_recv_aio", (DL_FUNC) &rnng_recv_aio, 6},

src/nanonext.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,25 @@ typedef struct nano_thread_duo_s {
212212
nano_cv *cv2;
213213
} nano_thread_duo;
214214

215+
typedef struct nano_thread_disp_s {
216+
nng_thread *thr;
217+
nano_cv *cv;
218+
nng_tls_config *tls;
219+
nano_saio **saio;
220+
nano_aio **raio;
221+
nano_aio **haio;
222+
nng_url *up;
223+
const char *host;
224+
char **url;
225+
int *online;
226+
R_xlen_t n;
227+
} nano_thread_disp;
228+
229+
typedef struct nano_signal_s {
230+
nano_cv *cv;
231+
int *online;
232+
} nano_signal;
233+
215234
typedef struct nano_buf_s {
216235
unsigned char *buf;
217236
size_t len;
@@ -227,6 +246,9 @@ SEXP R_mkClosure(SEXP, SEXP, SEXP);
227246
SEXP nano_findVarInFrame(const SEXP, const SEXP);
228247
SEXP nano_PreserveObject(const SEXP);
229248
void nano_ReleaseObject(SEXP);
249+
void raio_complete_signal(void *);
250+
void sendaio_complete(void *);
251+
void cv_finalizer(SEXP);
230252
void dialer_finalizer(SEXP);
231253
void listener_finalizer(SEXP);
232254
void socket_finalizer(SEXP);
@@ -275,6 +297,7 @@ SEXP rnng_cv_wait_safe(SEXP);
275297
SEXP rnng_dial(SEXP, SEXP, SEXP, SEXP, SEXP);
276298
SEXP rnng_dialer_close(SEXP);
277299
SEXP rnng_dialer_start(SEXP, SEXP);
300+
SEXP rnng_dispatcher_socket(SEXP, SEXP, SEXP, SEXP);
278301
SEXP rnng_fini(void);
279302
SEXP rnng_get_opt(SEXP, SEXP);
280303
SEXP rnng_is_error_value(SEXP);
@@ -293,6 +316,7 @@ SEXP rnng_pipe_close(SEXP);
293316
SEXP rnng_pipe_notify(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
294317
SEXP rnng_protocol_open(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
295318
SEXP rnng_random(SEXP, SEXP);
319+
SEXP rnng_read_online(SEXP);
296320
SEXP rnng_reap(SEXP);
297321
SEXP rnng_recv(SEXP, SEXP, SEXP, SEXP);
298322
SEXP rnng_recv_aio(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);

src/sync.c

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,6 @@ static void request_complete_signal(void *arg) {
7676

7777
}
7878

79-
static void sendaio_complete(void *arg) {
80-
81-
nng_aio *aio = ((nano_saio *) arg)->aio;
82-
if (nng_aio_result(aio))
83-
nng_msg_free(nng_aio_get_msg(aio));
84-
85-
}
86-
8779
static void pipe_cb_signal(nng_pipe p, nng_pipe_ev ev, void *arg) {
8880

8981
int sig;
@@ -161,16 +153,6 @@ static void pipe_cb_dropcon(nng_pipe p, nng_pipe_ev ev, void *arg) {
161153

162154
// finalizers ------------------------------------------------------------------
163155

164-
static void cv_finalizer(SEXP xptr) {
165-
166-
if (NANO_PTR(xptr) == NULL) return;
167-
nano_cv *xp = (nano_cv *) NANO_PTR(xptr);
168-
nng_cv_free(xp->cv);
169-
nng_mtx_free(xp->mtx);
170-
R_Free(xp);
171-
172-
}
173-
174156
static void cv_duo_finalizer(SEXP xptr) {
175157

176158
if (NANO_PTR(xptr) == NULL) return;

0 commit comments

Comments
 (0)