Skip to content

Implement serialization header #120

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Type: Package
Package: nanonext
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
Version: 1.5.2.9005
Version: 1.5.2.9006
Authors@R: c(
person("Charlie", "Gao", , "[email protected]", role = c("aut", "cre"),
comment = c(ORCID = "0000-0002-0750-061X")),
Expand Down
3 changes: 3 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,12 @@ export("%~>%")
export("opt<-")
export(.advance)
export(.context)
export(.header)
export(.interrupt)
export(.keep)
export(.mark)
export(.read_header)
export(.read_marker)
export(.unresolved)
export(call_aio)
export(call_aio_)
Expand Down
52 changes: 44 additions & 8 deletions R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -297,29 +297,65 @@ status_code <- function(x) .Call(rnng_status_code, x)
serial_config <- function(class, sfunc, ufunc, vec = FALSE)
.Call(rnng_serial_config, class, sfunc, ufunc)

#' Set Serialization Marker
#' Advances the RNG State
#'
#' Internal package function.
#'
#' @param x logical value.
#' @return NULL.
#'
#' @return The logical value `x` supplied.
#' @keywords internal
#' @export
#'
.advance <- function() .Call(rnng_advance_rng_state)

#' Serialization Headers and Markers
#'
#' Internal package functions.
#'
#' @param value integer value.
#'
#' @return For `.header()`: the integer `value` supplied.
#'
#' @keywords internal
#' @export
#'
.mark <- function(x = TRUE) .Call(rnng_set_marker, x)
.header <- function(value = 0L) .Call(rnng_header_set, value)

#' Advances the RNG State
#' Read Serialization Header
#'
#' Internal package function.
#' @param x raw vector.
#'
#' @return NULL.
#' @return For `.read_header()`: integer value.
#'
#' @keywords internal
#' @rdname dot-header
#' @export
#'
.advance <- function() .Call(rnng_advance_rng_state)
.read_header <- function(x) .Call(rnng_header_read, x)

#' Set Serialization Marker
#'
#' @param bool logical value.
#'
#' @return For `.mark()`: the logical `bool` supplied.
#'
#' @keywords internal
#' @rdname dot-header
#' @export
#'
.mark <- function(bool = TRUE) .Call(rnng_marker_set, bool)

#' Read Serialization Marker
#'
#' @param x raw vector.
#'
#' @return For `.read_marker()`: logical value `TRUE` or `FALSE`.
#'
#' @keywords internal
#' @rdname dot-header
#' @export
#'
.read_marker <- function(x) .Call(rnng_marker_read, x)

#' Interrupt Switch
#'
Expand Down
37 changes: 37 additions & 0 deletions man/dot-header.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 0 additions & 18 deletions man/dot-mark.Rd

This file was deleted.

39 changes: 34 additions & 5 deletions src/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

// internals -------------------------------------------------------------------

static uint8_t special_bit = 0;
static int special_marker = 0;
static int special_header = 0;
static nano_serial_bundle nano_bundle;
static SEXP nano_eval_res;

Expand Down Expand Up @@ -315,9 +316,11 @@ void nano_serialize(nano_buf *buf, SEXP object, SEXP hook) {
NANO_ALLOC(buf, NANONEXT_INIT_BUFSIZE);
struct R_outpstream_st output_stream;

if (special_bit) {
if (special_header || special_marker) {
buf->buf[0] = 0x7;
buf->buf[3] = special_bit;
buf->buf[3] = (uint8_t) special_marker;
if (special_header)
memcpy(buf->buf + 4, &special_header, sizeof(int));
buf->cur += 8;
}

Expand Down Expand Up @@ -621,13 +624,39 @@ int nano_matchargs(const SEXP mode) {

// specials --------------------------------------------------------------------

SEXP rnng_set_marker(SEXP x) {
SEXP rnng_marker_set(SEXP x) {

special_bit = (uint8_t) NANO_INTEGER(x);
special_marker = NANO_INTEGER(x);
return x;

}

SEXP rnng_marker_read(SEXP x) {

unsigned char *buf = (unsigned char *) NANO_DATAPTR(x);

return Rf_ScalarLogical(TYPEOF(x) == RAWSXP && XLENGTH(x) > 12 && buf[0] == 0x7 && buf[3] == 0x1);

}

SEXP rnng_header_set(SEXP x) {

special_header = NANO_INTEGER(x);
return x;

}

SEXP rnng_header_read(SEXP x) {

unsigned char *buf = (unsigned char *) NANO_DATAPTR(x);
int res = 0;
if (TYPEOF(x) == RAWSXP && XLENGTH(x) > 12 && buf[0] == 0x7) {
memcpy(&res, buf + 4, sizeof(int));
}
return Rf_ScalarInteger(res);

}

SEXP rnng_eval_safe(SEXP arg) {

return R_ToplevelExec(nano_eval_safe, arg) ? nano_eval_res : Rf_allocVector(RAWSXP, 1);
Expand Down
5 changes: 4 additions & 1 deletion src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,16 @@ static const R_CallMethodDef callMethods[] = {
{"rnng_eval_safe", (DL_FUNC) &rnng_eval_safe, 1},
{"rnng_fini", (DL_FUNC) &rnng_fini, 0},
{"rnng_get_opt", (DL_FUNC) &rnng_get_opt, 2},
{"rnng_header_read", (DL_FUNC) &rnng_header_read, 1},
{"rnng_header_set", (DL_FUNC) &rnng_header_set, 1},
{"rnng_interrupt_switch", (DL_FUNC) &rnng_interrupt_switch, 1},
{"rnng_is_error_value", (DL_FUNC) &rnng_is_error_value, 1},
{"rnng_is_nul_byte", (DL_FUNC) &rnng_is_nul_byte, 1},
{"rnng_listen", (DL_FUNC) &rnng_listen, 5},
{"rnng_listener_close", (DL_FUNC) &rnng_listener_close, 1},
{"rnng_listener_start", (DL_FUNC) &rnng_listener_start, 1},
{"rnng_marker_read", (DL_FUNC) &rnng_marker_read, 1},
{"rnng_marker_set", (DL_FUNC) &rnng_marker_set, 1},
{"rnng_messenger", (DL_FUNC) &rnng_messenger, 1},
{"rnng_monitor_create", (DL_FUNC) &rnng_monitor_create, 2},
{"rnng_monitor_read", (DL_FUNC) &rnng_monitor_read, 1},
Expand All @@ -153,7 +157,6 @@ static const R_CallMethodDef callMethods[] = {
{"rnng_send", (DL_FUNC) &rnng_send, 5},
{"rnng_send_aio", (DL_FUNC) &rnng_send_aio, 6},
{"rnng_serial_config", (DL_FUNC) &rnng_serial_config, 3},
{"rnng_set_marker", (DL_FUNC) &rnng_set_marker, 1},
{"rnng_set_opt", (DL_FUNC) &rnng_set_opt, 3},
{"rnng_set_promise_context", (DL_FUNC) &rnng_set_promise_context, 2},
{"rnng_signal_thread_create", (DL_FUNC) &rnng_signal_thread_create, 2},
Expand Down
5 changes: 4 additions & 1 deletion src/nanonext.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,12 +306,16 @@ SEXP rnng_dialer_start(SEXP, SEXP);
SEXP rnng_eval_safe(SEXP);
SEXP rnng_fini(void);
SEXP rnng_get_opt(SEXP, SEXP);
SEXP rnng_header_read(SEXP);
SEXP rnng_header_set(SEXP);
SEXP rnng_interrupt_switch(SEXP);
SEXP rnng_is_error_value(SEXP);
SEXP rnng_is_nul_byte(SEXP);
SEXP rnng_listen(SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP rnng_listener_close(SEXP);
SEXP rnng_listener_start(SEXP);
SEXP rnng_marker_read(SEXP);
SEXP rnng_marker_set(SEXP);
SEXP rnng_messenger(SEXP);
SEXP rnng_messenger_thread_create(SEXP);
SEXP rnng_monitor_create(SEXP, SEXP);
Expand All @@ -331,7 +335,6 @@ SEXP rnng_request(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP rnng_send(SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP rnng_send_aio(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP rnng_serial_config(SEXP, SEXP, SEXP);
SEXP rnng_set_marker(SEXP);
SEXP rnng_set_opt(SEXP, SEXP, SEXP);
SEXP rnng_set_promise_context(SEXP, SEXP);
SEXP rnng_signal_thread_create(SEXP, SEXP);
Expand Down
15 changes: 12 additions & 3 deletions tests/tests.R
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,19 @@ test_error(req$opt("false", list()), "type")

test_class("nanoContext", ctx <- context(rep))
test_print(ctx)
test_equal(.header(12345L), 12345L)
test_true(.mark())
test_class("sendAio", csaio <- req$send_aio(data.frame(), mode = "seria", timeout = 500))
test_true(!.mark(FALSE))
test_zero(call_aio_(csaio)$result)
test_class("recvAio", craio <- recv_aio(ctx, timeout = 500))
test_type("list", collect_aio(craio))
test_class("recvAio", craio <- recv_aio(ctx, mode = 8L, timeout = 500))
test_type("raw", res <- collect_aio(craio))
test_true(.read_marker(res))
test_true(!.read_marker("not"))
test_equal(.read_header(res), 12345L)
test_equal(.read_header("not"), 0L)
test_type("list", unserialize(res[9:length(res)]))
test_equal(.header(0L), 0L)
test_true(!.mark(FALSE))
test_zero(req$send("context test", mode ="raw", block = 500))
test_equal(recv(ctx, mode = "string", block = 500), "context test")
test_type("integer", req$send(data.frame(), mode = "seri", block = 500))
Expand All @@ -220,6 +227,7 @@ test_type("logical", .unresolved(msg))
test_type("logical", unresolved(msg))
test_class("data.frame", call_aio(msg)$data)
test_true(!unresolved(msg))
test_equal(.header(2025250L), 2025250L)
test_zero(req$send(c(TRUE, FALSE, TRUE), mode = 2L, block = 500))
test_class("recvAio", msg <- recv_aio(ctx, mode = 6L, timeout = 500))
test_type("logical", msg[])
Expand All @@ -245,6 +253,7 @@ test_class("recvAio", rek <- request(req$context, c(1+3i, 4+2i), send_mode = "se
test_zero(reply(ctx, execute = identity, recv_mode = 1L, send_mode = 1L, timeout = 500))
test_type("complex", call_aio(rek)[["data"]])
test_type("integer", rek[["aio"]])
test_equal(.header(0L), 0L)

test_type("list", cfg <- serial_config(class = c("invalid", "custom"), sfunc = list(identity, function(x) raw(1L)), ufunc = list(identity, as.integer)))
opt(req$socket, "serial") <- cfg
Expand Down
Loading