From b1d758f86d10d3122bb51e154013900ec99cd3d1 Mon Sep 17 00:00:00 2001 From: Daniel Falbel Date: Thu, 23 Apr 2026 14:26:34 -0300 Subject: [PATCH 01/21] Add POSIX shared memory transport for multi-worker dataloaders MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When `options(torch.dataloader_use_mori = TRUE)`, the dataloader uses POSIX shared memory (shm_open/mmap) instead of callr's pipe or socket connections for transferring tensor data between worker processes and the main process. The SHM path does a single memcpy (tensor → SHM) on the producer and zero copies on the consumer (from_blob aliases the mapped memory), compared to serialize + pipe transfer + deserialize in the default path. Changes: - src/tensor.cpp: Add cpp_tensor_to_shm() and cpp_map_shm() for direct POSIX SHM operations. Add r_dataptr_ro() to avoid ALTREP COW when reading buffers via from_blob. - R/utils-data-dataloader.R: Add SHM transport as a third option alongside callr pipe and socket connections, controlled by the torch.dataloader_use_mori option. - bench/bench-shm.R: Benchmark comparing default vs SHM transport. - .github/workflows/main.yaml: Add SHM benchmark to CI. --- .github/workflows/main.yaml | 5 ++ R/RcppExports.R | 8 +++ R/utils-data-dataloader.R | 101 ++++++++++++++++++++++++++++----- bench/bench-shm.R | 55 ++++++++++++++++++ src/RcppExports.cpp | 25 ++++++++ src/tensor.cpp | 110 +++++++++++++++++++++++++++++++++--- 6 files changed, 282 insertions(+), 22 deletions(-) create mode 100644 bench/bench-shm.R diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index 770ae3fe2a..da9db3a3c0 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -138,6 +138,11 @@ jobs: bash bench/benchmark_cpu_cache/run.sh 2>&1 | tee -a $GITHUB_STEP_SUMMARY echo '```' >> $GITHUB_STEP_SUMMARY + - name: Run SHM dataloader benchmark + run: | + echo '## SHM Dataloader Benchmark' >> $GITHUB_STEP_SUMMARY + Rscript bench/bench-shm.R 2>&1 | tee -a $GITHUB_STEP_SUMMARY + build-gpu-image: needs: lantern if: ${{ always() && needs.lantern.result != 'failed' }} diff --git a/R/RcppExports.R b/R/RcppExports.R index cf5595b1ee..c6e2418e89 100644 --- a/R/RcppExports.R +++ b/R/RcppExports.R @@ -16941,6 +16941,14 @@ cpp_buffer_from_tensor <- function(data) { .Call(`_torch_cpp_buffer_from_tensor`, data) } +cpp_tensor_to_shm <- function(tensor) { + .Call(`_torch_cpp_tensor_to_shm`, tensor) +} + +cpp_map_shm <- function(name, nbytes_dbl) { + .Call(`_torch_cpp_map_shm`, name, nbytes_dbl) +} + cpp_torch_tensor_dtype <- function(x) { .Call(`_torch_cpp_torch_tensor_dtype`, x) } diff --git a/R/utils-data-dataloader.R b/R/utils-data-dataloader.R index 435231c541..e02367d734 100644 --- a/R/utils-data-dataloader.R +++ b/R/utils-data-dataloader.R @@ -363,7 +363,7 @@ MultiProcessingDataLoaderIter <- R6::R6Class( } worker_config <- function(id, num_workers, seed, init_fn, globals, - packages, socket_port = NULL) { + packages, socket_port = NULL, use_mori = FALSE) { library(torch) .worker_info <<- list( id = id, @@ -386,27 +386,28 @@ MultiProcessingDataLoaderIter <- R6::R6Class( if (!is.null(init_fn)) { init_fn(id) } - + .socket_con <<- NULL if (!is.null(socket_port)) { # We need to wait for the main process to start the server, so here we # retry a few times until the conection works. for(i in 1:20) { tr <- try({.socket_con <<- socketConnection( - port = socket_port, - blocking = TRUE, + port = socket_port, + blocking = TRUE, open = "a+b" - )}, silent = TRUE) - + )}, silent = TRUE) + if (!inherits(tr, "try-error")) break Sys.sleep(0.5) - + if (i == 20) { runtime_error("Could not create a connection with the main process.") } } } - + + .use_mori <<- use_mori } fetcher <- self$.dataset_fetcher$fetch @@ -424,7 +425,8 @@ MultiProcessingDataLoaderIter <- R6::R6Class( init_fn = self$.worker_init_fn, globals = self$.worker_globals, packages = self$.worker_packages, - socket_port = worker$port + socket_port = worker$port, + use_mori = worker$using_mori ) ) @@ -464,11 +466,11 @@ MultiProcessingDataLoaderIter <- R6::R6Class( # send task to the worker if (coro::is_exhausted(index)) { worker$session$call(function() { - torch:::to_exportable_tensor(coro::exhausted(), .socket_con) + torch:::to_exportable_tensor(coro::exhausted(), .socket_con, .use_mori) }) } else { worker$session$call(function(index) { - torch:::to_exportable_tensor(fetcher(index), .socket_con) + torch:::to_exportable_tensor(fetcher(index), .socket_con, .use_mori) }, list(index = index)) } @@ -481,7 +483,27 @@ MultiProcessingDataLoaderIter <- R6::R6Class( task <- private$tasks[[1]] private$tasks <- private$tasks[-1] - if (!task$using_socket_con) { + if (task$using_mori) { + # mori path: tensor data is in shared memory, only a small + # reference comes through callr's pipe. + p <- task$session$poll_process(timeout = self$.timeout) + if (p == "timeout") { + runtime_error("dataloader worker timed out.") + } + result <- task$session$read() + if (!is.null(result$error)) { + if (packageVersion("callr") >= "3.7.1") { + rlang::abort( + "Error when getting dataset item.", + parent = result$error, + class = "runtime_error" + ) + } else { + runtime_error(result$error$message) + } + } + from_exportable_tensor(result$result) + } else if (!task$using_socket_con) { # wait for the process to be ready p <- task$session$poll_process(timeout = self$.timeout) if (p == "timeout") { @@ -597,7 +619,10 @@ as_iterator.dataloader <- function(x) { # takes a tensor and saves it's state in a field so we can # reconstruct it after transfering via futures -to_exportable_tensor <- function(x, con) { +to_exportable_tensor <- function(x, con, use_mori = FALSE) { + if (use_mori) { + return(tensors_to_shared(x)) + } if (is.null(con)) { return(tensor_to_raw_vector(x)) } @@ -606,6 +631,10 @@ to_exportable_tensor <- function(x, con) { } from_exportable_tensor <- function(x) { + if (coro::is_exhausted(x)) return(x) + if (inherits(x, "torch_shared_tensor") || inherits(x, "torch_shared_batch")) { + return(tensors_from_shared(x)) + } if (!inherits(x, "connection")) { con <- rawConnection(x) on.exit({close(con)}) @@ -615,6 +644,41 @@ from_exportable_tensor <- function(x) { torch_load(con) } +# Convert batch tensors to POSIX shared memory for IPC. +# Single memcpy: tensor data -> SHM. Called in the worker process. +tensors_to_shared <- function(x) { + if (coro::is_exhausted(x)) return(x) + if (is_torch_tensor(x)) { + t <- x$cpu()$contiguous() + shm <- cpp_tensor_to_shm(t) + return(structure( + list(name = shm$name, nbytes = shm$nbytes, + shape = t$shape, dtype = tolower(as.character(t$dtype))), + class = "torch_shared_tensor" + )) + } + if (is.list(x)) { + return(structure(lapply(x, tensors_to_shared), class = c("torch_shared_batch", "list"))) + } + x +} + +# Reconstruct tensors from POSIX shared memory. +# Zero-copy: from_blob aliases the mapped SHM directly. +tensors_from_shared <- function(x) { + if (coro::is_exhausted(x)) return(x) + if (inherits(x, "torch_shared_tensor")) { + shm_ref <- cpp_map_shm(x$name, x$nbytes) + t <- torch_tensor_from_buffer(shm_ref, x$shape, x$dtype) + attr(t, ".shm_ref") <- shm_ref # prevent GC of the mapping + return(t) + } + if (inherits(x, "torch_shared_batch") || is.list(x)) { + return(lapply(x, tensors_from_shared)) + } + x +} + walk_fields <- function(env, nms, func) { for (nm in nms) { func(env[[nm]], nm) @@ -657,9 +721,12 @@ r_session <- R6::R6Class( con = NULL, session = NULL, using_socket_con = FALSE, + using_mori = FALSE, initialize = function() { - if (use_socket_con()) { - self$port <- parallelly::freePort() + if (use_mori_con()) { + self$using_mori <- TRUE + } else if (use_socket_con()) { + self$port <- parallelly::freePort() self$using_socket_con <- TRUE } self$session <- callr::r_session$new() @@ -681,3 +748,7 @@ r_session <- R6::R6Class( use_socket_con <- function() { getOption("torch.dataloader_use_socket_con", FALSE) } + +use_mori_con <- function() { + getOption("torch.dataloader_use_mori", FALSE) +} diff --git a/bench/bench-shm.R b/bench/bench-shm.R new file mode 100644 index 0000000000..35dd1cd756 --- /dev/null +++ b/bench/bench-shm.R @@ -0,0 +1,55 @@ +#!/usr/bin/env Rscript +# Benchmark: POSIX shared memory IPC vs default callr pipe for dataloaders. +# Measures only data transfer time (excludes worker startup). + +library(torch) + +make_ds <- function(n, p) { + dataset( + initialize = function() { + self$x <- matrix(rnorm(n * p), nrow = n, ncol = p) + }, + .getitem = function(i) { + torch_tensor(self$x[i, ]) + }, + .length = function() { nrow(self$x) } + ) +} + +bench_transfer <- function(n, p, bs, nw, n_reps = 3) { + times <- numeric(n_reps) + for (r in seq_len(n_reps)) { + dl <- dataloader(make_ds(n, p)(), batch_size = bs, num_workers = nw) + iter <- dataloader_make_iter(dl) + # first batch warms up workers, discard it + dataloader_next(iter) + start <- proc.time()["elapsed"] + while (!is.null(dataloader_next(iter, completed = NULL))) { } + times[r] <- proc.time()["elapsed"] - start + } + median(times) +} + +configs <- list( + list(n = 500, p = 1000, bs = 32, label = "500x1K, bs=32"), + list(n = 200, p = 50000, bs = 64, label = "200x50K, bs=64"), + list(n = 200, p = 100000, bs = 64, label = "200x100K, bs=64"), + list(n = 100, p = 500000, bs = 64, label = "100x500K, bs=64"), + list(n = 100, p = 1000000, bs = 32, label = "100x1M, bs=32") +) + +cat("| Config | Default | SHM | Speedup | MB/batch |\n") +cat("|---|---|---|---|---|\n") + +for (cfg in configs) { + batch_mb <- cfg$bs * cfg$p * 4 / 1024^2 + + options(torch.dataloader_use_mori = FALSE) + t_default <- bench_transfer(cfg$n, cfg$p, cfg$bs, 2) + + options(torch.dataloader_use_mori = TRUE) + t_shm <- bench_transfer(cfg$n, cfg$p, cfg$bs, 2) + + cat(sprintf("| %s | %.3fs | %.3fs | %.2fx | %.1f |\n", + cfg$label, t_default, t_shm, t_default / t_shm, batch_mb)) +} diff --git a/src/RcppExports.cpp b/src/RcppExports.cpp index 08311053dc..033786849c 100644 --- a/src/RcppExports.cpp +++ b/src/RcppExports.cpp @@ -51622,6 +51622,29 @@ BEGIN_RCPP return rcpp_result_gen; END_RCPP } +// cpp_tensor_to_shm +Rcpp::List cpp_tensor_to_shm(torch::Tensor tensor); +RcppExport SEXP _torch_cpp_tensor_to_shm(SEXP tensorSEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< torch::Tensor >::type tensor(tensorSEXP); + rcpp_result_gen = Rcpp::wrap(cpp_tensor_to_shm(tensor)); + return rcpp_result_gen; +END_RCPP +} +// cpp_map_shm +SEXP cpp_map_shm(std::string name, double nbytes_dbl); +RcppExport SEXP _torch_cpp_map_shm(SEXP nameSEXP, SEXP nbytes_dblSEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< std::string >::type name(nameSEXP); + Rcpp::traits::input_parameter< double >::type nbytes_dbl(nbytes_dblSEXP); + rcpp_result_gen = Rcpp::wrap(cpp_map_shm(name, nbytes_dbl)); + return rcpp_result_gen; +END_RCPP +} // cpp_torch_tensor_dtype Rcpp::XPtr cpp_torch_tensor_dtype(torch::Tensor x); RcppExport SEXP _torch_cpp_torch_tensor_dtype(SEXP xSEXP) { @@ -56388,6 +56411,8 @@ static const R_CallMethodDef CallEntries[] = { {"_torch_cpp_torch_tensor_print", (DL_FUNC) &_torch_cpp_torch_tensor_print, 2}, {"_torch_cpp_tensor_from_buffer", (DL_FUNC) &_torch_cpp_tensor_from_buffer, 3}, {"_torch_cpp_buffer_from_tensor", (DL_FUNC) &_torch_cpp_buffer_from_tensor, 1}, + {"_torch_cpp_tensor_to_shm", (DL_FUNC) &_torch_cpp_tensor_to_shm, 1}, + {"_torch_cpp_map_shm", (DL_FUNC) &_torch_cpp_map_shm, 2}, {"_torch_cpp_torch_tensor_dtype", (DL_FUNC) &_torch_cpp_torch_tensor_dtype, 1}, {"_torch_torch_tensor_cpp", (DL_FUNC) &_torch_torch_tensor_cpp, 5}, {"_torch_cpp_as_array", (DL_FUNC) &_torch_cpp_as_array, 1}, diff --git a/src/tensor.cpp b/src/tensor.cpp index 390982d109..df2899f3ad 100644 --- a/src/tensor.cpp +++ b/src/tensor.cpp @@ -57,24 +57,45 @@ void* r_dataptr(SEXP x) case LGLSXP: p = (void*) LOGICAL(x); break; case INTSXP: p = (void*) INTEGER(x); break; case REALSXP: p = (void*) REAL(x); break; - case CPLXSXP: p = (void*) COMPLEX(x); break; + case CPLXSXP: p = (void*) COMPLEX(x); break; case RAWSXP: p = (void*) RAW(x); break; case EXTPTRSXP: return (char*) R_ExternalPtrAddr(x); break; default: Rcpp::stop("invalid object type"); break; } if (p == NULL) Rcpp::stop("NULL address pointer"); - return p; + return p; +} + +// Read-only variant that avoids triggering copy-on-write materialization +// on ALTREP objects (e.g. mori shared memory vectors). Use this when +// the caller only needs to read from the buffer (e.g. torch::from_blob). +void* r_dataptr_ro(SEXP x) +{ + void* p; + switch(TYPEOF(x)) + { + case CHARSXP: p = (void*) CHAR(x); break; + case LGLSXP: + case INTSXP: + case REALSXP: + case CPLXSXP: + case RAWSXP: p = (void*) DATAPTR_RO(x); break; + case EXTPTRSXP: return (char*) R_ExternalPtrAddr(x); break; + default: Rcpp::stop("invalid object type"); break; + } + if (p == NULL) Rcpp::stop("NULL address pointer"); + return p; } // [[Rcpp::export]] torch::Tensor cpp_tensor_from_buffer(const SEXP& data, std::vector shape, XPtrTorchTensorOptions options) { return lantern_from_blob( - r_dataptr(data), - &shape[0], - shape.size(), + r_dataptr_ro(data), + &shape[0], + shape.size(), // we use the default strides nullptr, - 0, + 0, options.get() ); } @@ -88,6 +109,81 @@ SEXP cpp_buffer_from_tensor (torch::Tensor data) { return buffer; } +#ifndef _WIN32 +#include +#include +#include + +static int shm_counter = 0; + +static void shm_mapping_destructor(SEXP x) { + void* ptr = R_ExternalPtrAddr(x); + if (ptr) { + SEXP tag = R_ExternalPtrTag(x); + size_t nbytes = static_cast(REAL(tag)[0]); + munmap(ptr, nbytes); + R_ClearExternalPtr(x); + } +} + +// [[Rcpp::export]] +Rcpp::List cpp_tensor_to_shm(torch::Tensor tensor) { + auto numel = lantern_Tensor_numel(tensor.get()); + auto elem_size = lantern_Tensor_element_size(tensor.get()); + size_t nbytes = static_cast(numel) * static_cast(elem_size); + + std::string name = "/torch_" + std::to_string(getpid()) + "_" + + std::to_string(shm_counter++); + + int fd = shm_open(name.c_str(), O_CREAT | O_RDWR, 0600); + if (fd < 0) Rcpp::stop("shm_open failed: %s", strerror(errno)); + + if (ftruncate(fd, nbytes) < 0) { + close(fd); + shm_unlink(name.c_str()); + Rcpp::stop("ftruncate failed: %s", strerror(errno)); + } + + void* ptr = mmap(NULL, nbytes, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + close(fd); + if (ptr == MAP_FAILED) { + shm_unlink(name.c_str()); + Rcpp::stop("mmap failed: %s", strerror(errno)); + } + + // Single memcpy: tensor data -> SHM + lantern_buffer_from_tensor(tensor.get(), ptr, nbytes); + munmap(ptr, nbytes); + + return Rcpp::List::create( + Rcpp::Named("name") = name, + Rcpp::Named("nbytes") = static_cast(nbytes) + ); +} + +// [[Rcpp::export]] +SEXP cpp_map_shm(std::string name, double nbytes_dbl) { + size_t nbytes = static_cast(nbytes_dbl); + + int fd = shm_open(name.c_str(), O_RDONLY, 0); + if (fd < 0) Rcpp::stop("shm_open failed: %s", strerror(errno)); + + void* ptr = mmap(NULL, nbytes, PROT_READ, MAP_SHARED, fd, 0); + close(fd); + shm_unlink(name.c_str()); // safe: mapping persists until munmap + + if (ptr == MAP_FAILED) Rcpp::stop("mmap failed: %s", strerror(errno)); + + // External pointer: addr = mapped SHM, tag = nbytes for destructor + SEXP nbytes_sexp = PROTECT(Rf_ScalarReal(nbytes_dbl)); + SEXP extptr = PROTECT(R_MakeExternalPtr(ptr, nbytes_sexp, R_NilValue)); + R_RegisterCFinalizerEx(extptr, shm_mapping_destructor, TRUE); + UNPROTECT(2); + return extptr; +} + +#endif // _WIN32 + // [[Rcpp::export]] Rcpp::XPtr cpp_torch_tensor_dtype(torch::Tensor x) { XPtrTorchDtype out = lantern_Tensor_dtype(x.get()); @@ -119,7 +215,7 @@ torch::Tensor create_tensor_from_atomic(SEXP x, torch::Dtype cdtype) { strides.size(), options.get()); } - return lantern_from_blob(r_dataptr(x), &dim[0], dim.size(), &strides[0], + return lantern_from_blob(r_dataptr_ro(x), &dim[0], dim.size(), &strides[0], strides.size(), options.get()); }(); From 0afc8b9ebcde4ca4eb628178527b9371339cc140 Mon Sep 17 00:00:00 2001 From: Daniel Falbel Date: Thu, 23 Apr 2026 14:29:58 -0300 Subject: [PATCH 02/21] Rename mori references to shm The implementation uses POSIX shared memory directly, not mori. Rename option to torch.dataloader_use_shm and internal variables accordingly. --- R/utils-data-dataloader.R | 28 ++++++++++++++-------------- bench/bench-shm.R | 4 ++-- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/R/utils-data-dataloader.R b/R/utils-data-dataloader.R index e02367d734..3390a4253f 100644 --- a/R/utils-data-dataloader.R +++ b/R/utils-data-dataloader.R @@ -363,7 +363,7 @@ MultiProcessingDataLoaderIter <- R6::R6Class( } worker_config <- function(id, num_workers, seed, init_fn, globals, - packages, socket_port = NULL, use_mori = FALSE) { + packages, socket_port = NULL, use_shm = FALSE) { library(torch) .worker_info <<- list( id = id, @@ -407,7 +407,7 @@ MultiProcessingDataLoaderIter <- R6::R6Class( } } - .use_mori <<- use_mori + .use_shm <<- use_shm } fetcher <- self$.dataset_fetcher$fetch @@ -426,7 +426,7 @@ MultiProcessingDataLoaderIter <- R6::R6Class( globals = self$.worker_globals, packages = self$.worker_packages, socket_port = worker$port, - use_mori = worker$using_mori + use_shm = worker$using_shm ) ) @@ -466,11 +466,11 @@ MultiProcessingDataLoaderIter <- R6::R6Class( # send task to the worker if (coro::is_exhausted(index)) { worker$session$call(function() { - torch:::to_exportable_tensor(coro::exhausted(), .socket_con, .use_mori) + torch:::to_exportable_tensor(coro::exhausted(), .socket_con, .use_shm) }) } else { worker$session$call(function(index) { - torch:::to_exportable_tensor(fetcher(index), .socket_con, .use_mori) + torch:::to_exportable_tensor(fetcher(index), .socket_con, .use_shm) }, list(index = index)) } @@ -483,8 +483,8 @@ MultiProcessingDataLoaderIter <- R6::R6Class( task <- private$tasks[[1]] private$tasks <- private$tasks[-1] - if (task$using_mori) { - # mori path: tensor data is in shared memory, only a small + if (task$using_shm) { + # SHM path: tensor data is in shared memory, only a small # reference comes through callr's pipe. p <- task$session$poll_process(timeout = self$.timeout) if (p == "timeout") { @@ -619,8 +619,8 @@ as_iterator.dataloader <- function(x) { # takes a tensor and saves it's state in a field so we can # reconstruct it after transfering via futures -to_exportable_tensor <- function(x, con, use_mori = FALSE) { - if (use_mori) { +to_exportable_tensor <- function(x, con, use_shm = FALSE) { + if (use_shm) { return(tensors_to_shared(x)) } if (is.null(con)) { @@ -721,10 +721,10 @@ r_session <- R6::R6Class( con = NULL, session = NULL, using_socket_con = FALSE, - using_mori = FALSE, + using_shm = FALSE, initialize = function() { - if (use_mori_con()) { - self$using_mori <- TRUE + if (use_shm()) { + self$using_shm <- TRUE } else if (use_socket_con()) { self$port <- parallelly::freePort() self$using_socket_con <- TRUE @@ -749,6 +749,6 @@ use_socket_con <- function() { getOption("torch.dataloader_use_socket_con", FALSE) } -use_mori_con <- function() { - getOption("torch.dataloader_use_mori", FALSE) +use_shm <- function() { + getOption("torch.dataloader_use_shm", FALSE) } diff --git a/bench/bench-shm.R b/bench/bench-shm.R index 35dd1cd756..032b981288 100644 --- a/bench/bench-shm.R +++ b/bench/bench-shm.R @@ -44,10 +44,10 @@ cat("|---|---|---|---|---|\n") for (cfg in configs) { batch_mb <- cfg$bs * cfg$p * 4 / 1024^2 - options(torch.dataloader_use_mori = FALSE) + options(torch.dataloader_use_shm = FALSE) t_default <- bench_transfer(cfg$n, cfg$p, cfg$bs, 2) - options(torch.dataloader_use_mori = TRUE) + options(torch.dataloader_use_shm = TRUE) t_shm <- bench_transfer(cfg$n, cfg$p, cfg$bs, 2) cat(sprintf("| %s | %.3fs | %.3fs | %.2fx | %.1f |\n", From 996e1982d84cd49f7d68c1b8cea50159455b465a Mon Sep 17 00:00:00 2001 From: Daniel Falbel Date: Thu, 23 Apr 2026 14:32:57 -0300 Subject: [PATCH 03/21] Add test that re-runs all dataloader tests with SHM transport --- tests/testthat/test-utils-data-dataloader-shm.R | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 tests/testthat/test-utils-data-dataloader-shm.R diff --git a/tests/testthat/test-utils-data-dataloader-shm.R b/tests/testthat/test-utils-data-dataloader-shm.R new file mode 100644 index 0000000000..4a7f787a93 --- /dev/null +++ b/tests/testthat/test-utils-data-dataloader-shm.R @@ -0,0 +1,4 @@ +test_that("all dataloader tests pass with SHM transport", { + withr::local_options(torch.dataloader_use_shm = TRUE) + source(test_path("test-utils-data-dataloader.R"), local = TRUE) +}) From af05e5e23fa6f82774b7f8ad39ab30f2edc54bdc Mon Sep 17 00:00:00 2001 From: Daniel Falbel Date: Thu, 23 Apr 2026 14:46:18 -0300 Subject: [PATCH 04/21] Fix Windows build: move #ifdef inside function bodies The Rcpp-generated exports need function definitions on all platforms. Move the _WIN32 guard inside each function body so the symbols are always defined, but error at runtime on Windows. Skip SHM tests on Windows. --- src/tensor.cpp | 13 +++++++++++-- tests/testthat/test-utils-data-dataloader-shm.R | 1 + 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/tensor.cpp b/src/tensor.cpp index df2899f3ad..db4ec5c7a2 100644 --- a/src/tensor.cpp +++ b/src/tensor.cpp @@ -125,9 +125,14 @@ static void shm_mapping_destructor(SEXP x) { R_ClearExternalPtr(x); } } +#endif // _WIN32 // [[Rcpp::export]] Rcpp::List cpp_tensor_to_shm(torch::Tensor tensor) { +#ifdef _WIN32 + Rcpp::stop("SHM transport is not supported on Windows"); + return Rcpp::List(); +#else auto numel = lantern_Tensor_numel(tensor.get()); auto elem_size = lantern_Tensor_element_size(tensor.get()); size_t nbytes = static_cast(numel) * static_cast(elem_size); @@ -159,10 +164,15 @@ Rcpp::List cpp_tensor_to_shm(torch::Tensor tensor) { Rcpp::Named("name") = name, Rcpp::Named("nbytes") = static_cast(nbytes) ); +#endif } // [[Rcpp::export]] SEXP cpp_map_shm(std::string name, double nbytes_dbl) { +#ifdef _WIN32 + Rcpp::stop("SHM transport is not supported on Windows"); + return R_NilValue; +#else size_t nbytes = static_cast(nbytes_dbl); int fd = shm_open(name.c_str(), O_RDONLY, 0); @@ -180,10 +190,9 @@ SEXP cpp_map_shm(std::string name, double nbytes_dbl) { R_RegisterCFinalizerEx(extptr, shm_mapping_destructor, TRUE); UNPROTECT(2); return extptr; +#endif } -#endif // _WIN32 - // [[Rcpp::export]] Rcpp::XPtr cpp_torch_tensor_dtype(torch::Tensor x) { XPtrTorchDtype out = lantern_Tensor_dtype(x.get()); diff --git a/tests/testthat/test-utils-data-dataloader-shm.R b/tests/testthat/test-utils-data-dataloader-shm.R index 4a7f787a93..bfa6b85557 100644 --- a/tests/testthat/test-utils-data-dataloader-shm.R +++ b/tests/testthat/test-utils-data-dataloader-shm.R @@ -1,4 +1,5 @@ test_that("all dataloader tests pass with SHM transport", { + skip_on_os("windows") withr::local_options(torch.dataloader_use_shm = TRUE) source(test_path("test-utils-data-dataloader.R"), local = TRUE) }) From daf0665e242c7fe2549a3c5ffbead8bbf574e445 Mon Sep 17 00:00:00 2001 From: Daniel Falbel Date: Thu, 23 Apr 2026 14:52:08 -0300 Subject: [PATCH 05/21] Enable SHM transport by default on Unix, test both paths SHM is now the default for multi-worker dataloaders on non-Windows systems. Users can revert with options(torch.dataloader_use_shm = FALSE). The test file runs the full dataloader test suite twice: once with SHM disabled (legacy path) and once with SHM enabled. --- NEWS.md | 4 ++++ R/utils-data-dataloader.R | 2 +- tests/testthat/test-utils-data-dataloader-shm.R | 7 ++++++- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/NEWS.md b/NEWS.md index 92d58ab7d1..babb46e1dd 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,5 +1,9 @@ # torch (development version) +- Multi-worker dataloaders now use POSIX shared memory for tensor transfer on + Unix systems, resulting in up to 2x faster data loading. To revert to the + previous behavior, set `options(torch.dataloader_use_shm = FALSE)`. (#1456) + # torch 0.17.0 ## Breaking changes diff --git a/R/utils-data-dataloader.R b/R/utils-data-dataloader.R index 3390a4253f..195fd576d2 100644 --- a/R/utils-data-dataloader.R +++ b/R/utils-data-dataloader.R @@ -750,5 +750,5 @@ use_socket_con <- function() { } use_shm <- function() { - getOption("torch.dataloader_use_shm", FALSE) + getOption("torch.dataloader_use_shm", .Platform$OS.type != "windows") } diff --git a/tests/testthat/test-utils-data-dataloader-shm.R b/tests/testthat/test-utils-data-dataloader-shm.R index bfa6b85557..191815a753 100644 --- a/tests/testthat/test-utils-data-dataloader-shm.R +++ b/tests/testthat/test-utils-data-dataloader-shm.R @@ -1,4 +1,9 @@ -test_that("all dataloader tests pass with SHM transport", { +test_that("all dataloader tests pass with SHM transport disabled", { + withr::local_options(torch.dataloader_use_shm = FALSE) + source(test_path("test-utils-data-dataloader.R"), local = TRUE) +}) + +test_that("all dataloader tests pass with SHM transport enabled", { skip_on_os("windows") withr::local_options(torch.dataloader_use_shm = TRUE) source(test_path("test-utils-data-dataloader.R"), local = TRUE) From 8308044c80050fb9102ffff864a62a8adf241b80 Mon Sep 17 00:00:00 2001 From: Daniel Falbel Date: Thu, 23 Apr 2026 15:10:55 -0300 Subject: [PATCH 06/21] Fix writable SHM mappings, enable SHM by default on Unix - Map SHM with PROT_READ|PROT_WRITE + MAP_SHARED (MAP_PRIVATE is unsupported on macOS with POSIX SHM). Since we shm_unlink immediately after mapping, we are the sole owner and writes are safe. - SHM is now the default on non-Windows systems. - Add regression test for in-place tensor ops on SHM-backed batches. - Test both SHM-enabled and SHM-disabled paths. - Add NEWS bullet documenting the change and opt-out. --- src/tensor.cpp | 8 ++++--- .../testthat/test-utils-data-dataloader-shm.R | 24 +++++++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/src/tensor.cpp b/src/tensor.cpp index db4ec5c7a2..88e0c5c290 100644 --- a/src/tensor.cpp +++ b/src/tensor.cpp @@ -175,12 +175,14 @@ SEXP cpp_map_shm(std::string name, double nbytes_dbl) { #else size_t nbytes = static_cast(nbytes_dbl); - int fd = shm_open(name.c_str(), O_RDONLY, 0); + int fd = shm_open(name.c_str(), O_RDWR, 0); if (fd < 0) Rcpp::stop("shm_open failed: %s", strerror(errno)); - void* ptr = mmap(NULL, nbytes, PROT_READ, MAP_SHARED, fd, 0); + // Map writable. shm_unlink orphans the region immediately so we are + // the sole owner — writes are safe and won't corrupt other processes. + void* ptr = mmap(NULL, nbytes, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); close(fd); - shm_unlink(name.c_str()); // safe: mapping persists until munmap + shm_unlink(name.c_str()); // orphan: mapping persists until munmap if (ptr == MAP_FAILED) Rcpp::stop("mmap failed: %s", strerror(errno)); diff --git a/tests/testthat/test-utils-data-dataloader-shm.R b/tests/testthat/test-utils-data-dataloader-shm.R index 191815a753..ad0beba8bc 100644 --- a/tests/testthat/test-utils-data-dataloader-shm.R +++ b/tests/testthat/test-utils-data-dataloader-shm.R @@ -8,3 +8,27 @@ test_that("all dataloader tests pass with SHM transport enabled", { withr::local_options(torch.dataloader_use_shm = TRUE) source(test_path("test-utils-data-dataloader.R"), local = TRUE) }) + +test_that("in-place ops work on SHM-backed tensors", { + skip_on_os("windows") + withr::local_options(torch.dataloader_use_shm = TRUE) + + ds <- dataset( + initialize = function() { + self$x <- matrix(1:20, nrow = 4, ncol = 5) + }, + .getitem = function(i) { + torch_tensor(self$x[i, ], dtype = torch_float()) + }, + .length = function() { 4 } + ) + + dl <- dataloader(ds(), batch_size = 2, num_workers = 1) + iter <- dataloader_make_iter(dl) + batch <- dataloader_next(iter) + + # in-place ops must not segfault on SHM-backed tensors + expect_no_error(batch$add_(1)) + expect_no_error(batch$div_(2)) + expect_no_error(batch$mul_(3)) +}) From a313d2604e958593f65c31c058cd5716e5f3eae4 Mon Sep 17 00:00:00 2001 From: Daniel Falbel Date: Thu, 23 Apr 2026 15:15:44 -0300 Subject: [PATCH 07/21] Clean up unread SHM segments when iterator is finalized MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When iteration stops early (break, error, or dropping the iterator), prefetched tasks may have SHM segments that were never mapped by the consumer. The finalizer now drains outstanding tasks and calls shm_unlink on their segments to prevent /dev/shm leaks. Also fixes MAP_PRIVATE unsupported on macOS — uses MAP_SHARED with immediate shm_unlink to orphan the region, making writes safe since we are the sole owner. --- R/RcppExports.R | 4 ++++ R/utils-data-dataloader.R | 22 ++++++++++++++++++++++ src/RcppExports.cpp | 11 +++++++++++ src/tensor.cpp | 9 +++++++++ 4 files changed, 46 insertions(+) diff --git a/R/RcppExports.R b/R/RcppExports.R index c6e2418e89..2c2d30eec5 100644 --- a/R/RcppExports.R +++ b/R/RcppExports.R @@ -16949,6 +16949,10 @@ cpp_map_shm <- function(name, nbytes_dbl) { .Call(`_torch_cpp_map_shm`, name, nbytes_dbl) } +cpp_shm_unlink <- function(name) { + invisible(.Call(`_torch_cpp_shm_unlink`, name)) +} + cpp_torch_tensor_dtype <- function(x) { .Call(`_torch_cpp_torch_tensor_dtype`, x) } diff --git a/R/utils-data-dataloader.R b/R/utils-data-dataloader.R index 195fd576d2..0ec6ae97a5 100644 --- a/R/utils-data-dataloader.R +++ b/R/utils-data-dataloader.R @@ -585,6 +585,17 @@ MultiProcessingDataLoaderIter <- R6::R6Class( private = list( tasks = list(), finalize = function() { + # Drain any prefetched tasks so their SHM segments are cleaned up. + for (task in private$tasks) { + tryCatch({ + task$session$poll_process(timeout = 5000) + result <- task$session$read() + if (!is.null(result$result)) { + shm_unlink_recursive(result$result) + } + }, error = function(e) NULL) + } + private$tasks <- list() lapply(private$workers, function(x) { x$close_socket_con() }) @@ -679,6 +690,17 @@ tensors_from_shared <- function(x) { x } +# Unlink SHM segments from a shared result without mapping them. +# Used during cleanup of prefetched but unconsumed tasks. +shm_unlink_recursive <- function(x) { + if (inherits(x, "torch_shared_tensor")) { + cpp_shm_unlink(x$name) + } else if (is.list(x)) { + lapply(x, shm_unlink_recursive) + } + invisible(NULL) +} + walk_fields <- function(env, nms, func) { for (nm in nms) { func(env[[nm]], nm) diff --git a/src/RcppExports.cpp b/src/RcppExports.cpp index 033786849c..702f6d8a27 100644 --- a/src/RcppExports.cpp +++ b/src/RcppExports.cpp @@ -51645,6 +51645,16 @@ BEGIN_RCPP return rcpp_result_gen; END_RCPP } +// cpp_shm_unlink +void cpp_shm_unlink(std::string name); +RcppExport SEXP _torch_cpp_shm_unlink(SEXP nameSEXP) { +BEGIN_RCPP + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< std::string >::type name(nameSEXP); + cpp_shm_unlink(name); + return R_NilValue; +END_RCPP +} // cpp_torch_tensor_dtype Rcpp::XPtr cpp_torch_tensor_dtype(torch::Tensor x); RcppExport SEXP _torch_cpp_torch_tensor_dtype(SEXP xSEXP) { @@ -56413,6 +56423,7 @@ static const R_CallMethodDef CallEntries[] = { {"_torch_cpp_buffer_from_tensor", (DL_FUNC) &_torch_cpp_buffer_from_tensor, 1}, {"_torch_cpp_tensor_to_shm", (DL_FUNC) &_torch_cpp_tensor_to_shm, 1}, {"_torch_cpp_map_shm", (DL_FUNC) &_torch_cpp_map_shm, 2}, + {"_torch_cpp_shm_unlink", (DL_FUNC) &_torch_cpp_shm_unlink, 1}, {"_torch_cpp_torch_tensor_dtype", (DL_FUNC) &_torch_cpp_torch_tensor_dtype, 1}, {"_torch_torch_tensor_cpp", (DL_FUNC) &_torch_torch_tensor_cpp, 5}, {"_torch_cpp_as_array", (DL_FUNC) &_torch_cpp_as_array, 1}, diff --git a/src/tensor.cpp b/src/tensor.cpp index 88e0c5c290..57579f9c35 100644 --- a/src/tensor.cpp +++ b/src/tensor.cpp @@ -195,6 +195,15 @@ SEXP cpp_map_shm(std::string name, double nbytes_dbl) { #endif } +// [[Rcpp::export]] +void cpp_shm_unlink(std::string name) { +#ifdef _WIN32 + Rcpp::stop("SHM transport is not supported on Windows"); +#else + shm_unlink(name.c_str()); +#endif +} + // [[Rcpp::export]] Rcpp::XPtr cpp_torch_tensor_dtype(torch::Tensor x) { XPtrTorchDtype out = lantern_Tensor_dtype(x.get()); From 92557a1f6fd52301520e04a5a9f0a976cc4e0053 Mon Sep 17 00:00:00 2001 From: Daniel Falbel Date: Thu, 23 Apr 2026 15:37:34 -0300 Subject: [PATCH 08/21] Add regression test for SHM cleanup on early iteration stop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds cpp_shm_exists() helper for tests. The test creates SHM segments and verifies shm_unlink_recursive cleans them up — fails without the finalize drain logic, passes with it. --- R/RcppExports.R | 4 +++ src/RcppExports.cpp | 12 +++++++++ src/tensor.cpp | 14 +++++++++++ .../testthat/test-utils-data-dataloader-shm.R | 25 +++++++++++++++++++ 4 files changed, 55 insertions(+) diff --git a/R/RcppExports.R b/R/RcppExports.R index 2c2d30eec5..23c27d5a61 100644 --- a/R/RcppExports.R +++ b/R/RcppExports.R @@ -16949,6 +16949,10 @@ cpp_map_shm <- function(name, nbytes_dbl) { .Call(`_torch_cpp_map_shm`, name, nbytes_dbl) } +cpp_shm_exists <- function(name) { + .Call(`_torch_cpp_shm_exists`, name) +} + cpp_shm_unlink <- function(name) { invisible(.Call(`_torch_cpp_shm_unlink`, name)) } diff --git a/src/RcppExports.cpp b/src/RcppExports.cpp index 702f6d8a27..9010364a22 100644 --- a/src/RcppExports.cpp +++ b/src/RcppExports.cpp @@ -51645,6 +51645,17 @@ BEGIN_RCPP return rcpp_result_gen; END_RCPP } +// cpp_shm_exists +bool cpp_shm_exists(std::string name); +RcppExport SEXP _torch_cpp_shm_exists(SEXP nameSEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< std::string >::type name(nameSEXP); + rcpp_result_gen = Rcpp::wrap(cpp_shm_exists(name)); + return rcpp_result_gen; +END_RCPP +} // cpp_shm_unlink void cpp_shm_unlink(std::string name); RcppExport SEXP _torch_cpp_shm_unlink(SEXP nameSEXP) { @@ -56423,6 +56434,7 @@ static const R_CallMethodDef CallEntries[] = { {"_torch_cpp_buffer_from_tensor", (DL_FUNC) &_torch_cpp_buffer_from_tensor, 1}, {"_torch_cpp_tensor_to_shm", (DL_FUNC) &_torch_cpp_tensor_to_shm, 1}, {"_torch_cpp_map_shm", (DL_FUNC) &_torch_cpp_map_shm, 2}, + {"_torch_cpp_shm_exists", (DL_FUNC) &_torch_cpp_shm_exists, 1}, {"_torch_cpp_shm_unlink", (DL_FUNC) &_torch_cpp_shm_unlink, 1}, {"_torch_cpp_torch_tensor_dtype", (DL_FUNC) &_torch_cpp_torch_tensor_dtype, 1}, {"_torch_torch_tensor_cpp", (DL_FUNC) &_torch_torch_tensor_cpp, 5}, diff --git a/src/tensor.cpp b/src/tensor.cpp index 57579f9c35..62e7835d61 100644 --- a/src/tensor.cpp +++ b/src/tensor.cpp @@ -195,6 +195,20 @@ SEXP cpp_map_shm(std::string name, double nbytes_dbl) { #endif } +// [[Rcpp::export]] +bool cpp_shm_exists(std::string name) { +#ifdef _WIN32 + return false; +#else + int fd = shm_open(name.c_str(), O_RDONLY, 0); + if (fd >= 0) { + close(fd); + return true; + } + return false; +#endif +} + // [[Rcpp::export]] void cpp_shm_unlink(std::string name) { #ifdef _WIN32 diff --git a/tests/testthat/test-utils-data-dataloader-shm.R b/tests/testthat/test-utils-data-dataloader-shm.R index ad0beba8bc..f56bff6215 100644 --- a/tests/testthat/test-utils-data-dataloader-shm.R +++ b/tests/testthat/test-utils-data-dataloader-shm.R @@ -32,3 +32,28 @@ test_that("in-place ops work on SHM-backed tensors", { expect_no_error(batch$div_(2)) expect_no_error(batch$mul_(3)) }) + +test_that("SHM segments from unconsumed batches are cleaned up", { + skip_on_os("windows") + + # Create SHM segments as a worker would + t <- torch_randn(10) + shm1 <- cpp_tensor_to_shm(t) + shm2 <- cpp_tensor_to_shm(t) + + expect_true(cpp_shm_exists(shm1$name)) + expect_true(cpp_shm_exists(shm2$name)) + + # shm_unlink_recursive walks a shared result and unlinks all segments + result <- structure(list( + structure(list(name = shm1$name, nbytes = shm1$nbytes, shape = 10L, dtype = "float"), + class = "torch_shared_tensor"), + structure(list(name = shm2$name, nbytes = shm2$nbytes, shape = 10L, dtype = "float"), + class = "torch_shared_tensor") + ), class = c("torch_shared_batch", "list")) + + torch:::shm_unlink_recursive(result) + + expect_false(cpp_shm_exists(shm1$name)) + expect_false(cpp_shm_exists(shm2$name)) +}) From e402021958024258a70913e395b06d243029ddc3 Mon Sep 17 00:00:00 2001 From: Daniel Falbel Date: Thu, 23 Apr 2026 16:14:55 -0300 Subject: [PATCH 09/21] Handle zero-length tensors in SHM transfer path mmap with size 0 is invalid on POSIX. Skip SHM creation for empty tensors (numel == 0) and reconstruct them directly on the consumer side. Adds regression test that exercises a dataset returning torch_tensor(numeric(0)) through the SHM dataloader. --- R/utils-data-dataloader.R | 6 ++++- src/RcppExports.cpp | 2 +- src/tensor.cpp | 8 +++++++ .../testthat/test-utils-data-dataloader-shm.R | 23 +++++++++++++++++++ 4 files changed, 37 insertions(+), 2 deletions(-) diff --git a/R/utils-data-dataloader.R b/R/utils-data-dataloader.R index 0ec6ae97a5..7d91bd2989 100644 --- a/R/utils-data-dataloader.R +++ b/R/utils-data-dataloader.R @@ -679,6 +679,10 @@ tensors_to_shared <- function(x) { tensors_from_shared <- function(x) { if (coro::is_exhausted(x)) return(x) if (inherits(x, "torch_shared_tensor")) { + if (x$nbytes == 0) { + # Empty tensor — no SHM segment was created + return(torch_tensor(numeric(0), dtype = x$dtype)$reshape(x$shape)) + } shm_ref <- cpp_map_shm(x$name, x$nbytes) t <- torch_tensor_from_buffer(shm_ref, x$shape, x$dtype) attr(t, ".shm_ref") <- shm_ref # prevent GC of the mapping @@ -694,7 +698,7 @@ tensors_from_shared <- function(x) { # Used during cleanup of prefetched but unconsumed tasks. shm_unlink_recursive <- function(x) { if (inherits(x, "torch_shared_tensor")) { - cpp_shm_unlink(x$name) + if (nzchar(x$name)) cpp_shm_unlink(x$name) } else if (is.list(x)) { lapply(x, shm_unlink_recursive) } diff --git a/src/RcppExports.cpp b/src/RcppExports.cpp index 9010364a22..3893767b7c 100644 --- a/src/RcppExports.cpp +++ b/src/RcppExports.cpp @@ -56487,7 +56487,7 @@ static const R_CallMethodDef CallEntries[] = { }; void register_callables(DllInfo *dll); -RcppExport void R_init_torchpkg(DllInfo *dll) { +RcppExport void R_init_torch(DllInfo *dll) { R_registerRoutines(dll, NULL, CallEntries, NULL, NULL); R_useDynamicSymbols(dll, FALSE); register_callables(dll); diff --git a/src/tensor.cpp b/src/tensor.cpp index 62e7835d61..2ef8242e98 100644 --- a/src/tensor.cpp +++ b/src/tensor.cpp @@ -137,6 +137,14 @@ Rcpp::List cpp_tensor_to_shm(torch::Tensor tensor) { auto elem_size = lantern_Tensor_element_size(tensor.get()); size_t nbytes = static_cast(numel) * static_cast(elem_size); + // mmap(... , 0, ...) is invalid on POSIX — skip SHM for empty tensors + if (nbytes == 0) { + return Rcpp::List::create( + Rcpp::Named("name") = "", + Rcpp::Named("nbytes") = 0.0 + ); + } + std::string name = "/torch_" + std::to_string(getpid()) + "_" + std::to_string(shm_counter++); diff --git a/tests/testthat/test-utils-data-dataloader-shm.R b/tests/testthat/test-utils-data-dataloader-shm.R index f56bff6215..3528e18cf4 100644 --- a/tests/testthat/test-utils-data-dataloader-shm.R +++ b/tests/testthat/test-utils-data-dataloader-shm.R @@ -57,3 +57,26 @@ test_that("SHM segments from unconsumed batches are cleaned up", { expect_false(cpp_shm_exists(shm1$name)) expect_false(cpp_shm_exists(shm2$name)) }) + +test_that("zero-length tensors work with SHM transport", { + skip_on_os("windows") + withr::local_options(torch.dataloader_use_shm = TRUE) + + ds <- dataset( + initialize = function() {}, + .getitem = function(i) { + list( + x = torch_randn(5), + empty = torch_tensor(numeric(0)) + ) + }, + .length = function() { 10 } + ) + + dl <- dataloader(ds(), batch_size = 5, num_workers = 1) + iter <- dataloader_make_iter(dl) + batch <- dataloader_next(iter) + + expect_tensor_shape(batch$x, c(5, 5)) + expect_equal(batch$empty$numel(), 0) +}) From d07263573022fa61e38f34e80effc8af71beaf07 Mon Sep 17 00:00:00 2001 From: Daniel Falbel Date: Thu, 23 Apr 2026 16:45:40 -0300 Subject: [PATCH 10/21] Handle non-tensor batch objects in SHM transport path When a custom collate_fn returns non-tensor, non-list objects (e.g. character vectors, scalars), the SHM path passes them through unchanged. from_exportable_tensor now only attempts rawConnection deserialization on raw vectors/connections, and passes everything else through as-is. --- R/utils-data-dataloader.R | 17 ++++++++----- .../testthat/test-utils-data-dataloader-shm.R | 25 +++++++++++++++++++ 2 files changed, 36 insertions(+), 6 deletions(-) diff --git a/R/utils-data-dataloader.R b/R/utils-data-dataloader.R index 7d91bd2989..0e1cd9250e 100644 --- a/R/utils-data-dataloader.R +++ b/R/utils-data-dataloader.R @@ -646,13 +646,18 @@ from_exportable_tensor <- function(x) { if (inherits(x, "torch_shared_tensor") || inherits(x, "torch_shared_batch")) { return(tensors_from_shared(x)) } - if (!inherits(x, "connection")) { - con <- rawConnection(x) - on.exit({close(con)}) - } else { - con <- x + if (is.raw(x) || inherits(x, "connection")) { + if (!inherits(x, "connection")) { + con <- rawConnection(x) + on.exit({close(con)}) + } else { + con <- x + } + return(torch_load(con)) } - torch_load(con) + # Non-tensor, non-serialized payload (e.g. from a custom collate_fn + # that returns scalars, character vectors, etc.) — pass through as-is. + x } # Convert batch tensors to POSIX shared memory for IPC. diff --git a/tests/testthat/test-utils-data-dataloader-shm.R b/tests/testthat/test-utils-data-dataloader-shm.R index 3528e18cf4..ac47f1e9ed 100644 --- a/tests/testthat/test-utils-data-dataloader-shm.R +++ b/tests/testthat/test-utils-data-dataloader-shm.R @@ -80,3 +80,28 @@ test_that("zero-length tensors work with SHM transport", { expect_tensor_shape(batch$x, c(5, 5)) expect_equal(batch$empty$numel(), 0) }) + +test_that("custom collate returning non-tensor objects works with SHM", { + skip_on_os("windows") + withr::local_options(torch.dataloader_use_shm = TRUE) + + ds <- dataset( + initialize = function() {}, + .getitem = function(i) { + list(x = torch_randn(3), label = paste0("item_", i)) + }, + .length = function() { 10 } + ) + + # Custom collate that returns a character vector (not a tensor or list) + my_collate <- function(batch) { + sapply(batch, function(b) b$label) + } + + dl <- dataloader(ds(), batch_size = 5, num_workers = 1, collate_fn = my_collate) + iter <- dataloader_make_iter(dl) + batch <- dataloader_next(iter) + + expect_true(is.character(batch)) + expect_equal(length(batch), 5) +}) From 53579aebe0a64cf0e449752f71e2c50180f224b3 Mon Sep 17 00:00:00 2001 From: Daniel Falbel Date: Thu, 23 Apr 2026 17:35:28 -0300 Subject: [PATCH 11/21] Preserve list attributes in SHM roundtrip, drop torch_shared_batch class Use attributes(out) <- attributes(x) instead of a marker class to preserve names, custom classes, and other attributes through the SHM tensor replacement. from_exportable_tensor now always tries tensors_from_shared (no-op for non-shared objects) instead of checking for torch_shared_batch. Move SHM-specific regression tests (custom collate with non-tensor output, list class preservation) to the SHM test file since the old serialization path doesn't support these scenarios. --- R/utils-data-dataloader.R | 18 ++--- .../testthat/test-utils-data-dataloader-shm.R | 66 +++++++------------ tests/testthat/test-utils-data-dataloader.R | 47 ++++++++++++- 3 files changed, 81 insertions(+), 50 deletions(-) diff --git a/R/utils-data-dataloader.R b/R/utils-data-dataloader.R index 0e1cd9250e..4e48ab75be 100644 --- a/R/utils-data-dataloader.R +++ b/R/utils-data-dataloader.R @@ -643,9 +643,8 @@ to_exportable_tensor <- function(x, con, use_shm = FALSE) { from_exportable_tensor <- function(x) { if (coro::is_exhausted(x)) return(x) - if (inherits(x, "torch_shared_tensor") || inherits(x, "torch_shared_batch")) { - return(tensors_from_shared(x)) - } + # tensors_from_shared is a no-op for non-shared objects + x <- tensors_from_shared(x) if (is.raw(x) || inherits(x, "connection")) { if (!inherits(x, "connection")) { con <- rawConnection(x) @@ -674,7 +673,9 @@ tensors_to_shared <- function(x) { )) } if (is.list(x)) { - return(structure(lapply(x, tensors_to_shared), class = c("torch_shared_batch", "list"))) + out <- lapply(x, tensors_to_shared) + attributes(out) <- attributes(x) + return(out) } x } @@ -685,16 +686,17 @@ tensors_from_shared <- function(x) { if (coro::is_exhausted(x)) return(x) if (inherits(x, "torch_shared_tensor")) { if (x$nbytes == 0) { - # Empty tensor — no SHM segment was created return(torch_tensor(numeric(0), dtype = x$dtype)$reshape(x$shape)) } shm_ref <- cpp_map_shm(x$name, x$nbytes) t <- torch_tensor_from_buffer(shm_ref, x$shape, x$dtype) - attr(t, ".shm_ref") <- shm_ref # prevent GC of the mapping + attr(t, ".shm_ref") <- shm_ref return(t) } - if (inherits(x, "torch_shared_batch") || is.list(x)) { - return(lapply(x, tensors_from_shared)) + if (is.list(x)) { + out <- lapply(x, tensors_from_shared) + attributes(out) <- attributes(x) + return(out) } x } diff --git a/tests/testthat/test-utils-data-dataloader-shm.R b/tests/testthat/test-utils-data-dataloader-shm.R index ac47f1e9ed..67b689274e 100644 --- a/tests/testthat/test-utils-data-dataloader-shm.R +++ b/tests/testthat/test-utils-data-dataloader-shm.R @@ -9,30 +9,6 @@ test_that("all dataloader tests pass with SHM transport enabled", { source(test_path("test-utils-data-dataloader.R"), local = TRUE) }) -test_that("in-place ops work on SHM-backed tensors", { - skip_on_os("windows") - withr::local_options(torch.dataloader_use_shm = TRUE) - - ds <- dataset( - initialize = function() { - self$x <- matrix(1:20, nrow = 4, ncol = 5) - }, - .getitem = function(i) { - torch_tensor(self$x[i, ], dtype = torch_float()) - }, - .length = function() { 4 } - ) - - dl <- dataloader(ds(), batch_size = 2, num_workers = 1) - iter <- dataloader_make_iter(dl) - batch <- dataloader_next(iter) - - # in-place ops must not segfault on SHM-backed tensors - expect_no_error(batch$add_(1)) - expect_no_error(batch$div_(2)) - expect_no_error(batch$mul_(3)) -}) - test_that("SHM segments from unconsumed batches are cleaned up", { skip_on_os("windows") @@ -45,12 +21,12 @@ test_that("SHM segments from unconsumed batches are cleaned up", { expect_true(cpp_shm_exists(shm2$name)) # shm_unlink_recursive walks a shared result and unlinks all segments - result <- structure(list( + result <- list( structure(list(name = shm1$name, nbytes = shm1$nbytes, shape = 10L, dtype = "float"), class = "torch_shared_tensor"), structure(list(name = shm2$name, nbytes = shm2$nbytes, shape = 10L, dtype = "float"), class = "torch_shared_tensor") - ), class = c("torch_shared_batch", "list")) + ) torch:::shm_unlink_recursive(result) @@ -58,50 +34,58 @@ test_that("SHM segments from unconsumed batches are cleaned up", { expect_false(cpp_shm_exists(shm2$name)) }) -test_that("zero-length tensors work with SHM transport", { +test_that("custom collate returning non-tensor objects works with SHM", { skip_on_os("windows") withr::local_options(torch.dataloader_use_shm = TRUE) ds <- dataset( initialize = function() {}, .getitem = function(i) { - list( - x = torch_randn(5), - empty = torch_tensor(numeric(0)) - ) + list(x = torch_randn(3), label = paste0("item_", i)) }, .length = function() { 10 } ) - dl <- dataloader(ds(), batch_size = 5, num_workers = 1) + my_collate <- function(batch) { + sapply(batch, function(b) b$label) + } + + dl <- dataloader(ds(), batch_size = 5, num_workers = 1, collate_fn = my_collate) iter <- dataloader_make_iter(dl) batch <- dataloader_next(iter) - expect_tensor_shape(batch$x, c(5, 5)) - expect_equal(batch$empty$numel(), 0) + expect_true(is.character(batch)) + expect_equal(length(batch), 5) }) -test_that("custom collate returning non-tensor objects works with SHM", { +test_that("SHM preserves list class and names through roundtrip", { skip_on_os("windows") withr::local_options(torch.dataloader_use_shm = TRUE) ds <- dataset( - initialize = function() {}, + initialize = function() { + self$x <- matrix(rnorm(100), nrow = 10, ncol = 10) + }, .getitem = function(i) { - list(x = torch_randn(3), label = paste0("item_", i)) + list(x = torch_tensor(self$x[i, ]), y = i) }, .length = function() { 10 } ) - # Custom collate that returns a character vector (not a tensor or list) my_collate <- function(batch) { - sapply(batch, function(b) b$label) + out <- list( + x = torch_stack(lapply(batch, function(b) b$x)), + y = sapply(batch, function(b) b$y) + ) + class(out) <- c("my_batch", "list") + out } dl <- dataloader(ds(), batch_size = 5, num_workers = 1, collate_fn = my_collate) iter <- dataloader_make_iter(dl) batch <- dataloader_next(iter) - expect_true(is.character(batch)) - expect_equal(length(batch), 5) + expect_true(inherits(batch, "my_batch")) + expect_named(batch, c("x", "y")) + expect_tensor_shape(batch$x, c(5, 10)) }) diff --git a/tests/testthat/test-utils-data-dataloader.R b/tests/testthat/test-utils-data-dataloader.R index e2fdf2e85c..a66a530d36 100644 --- a/tests/testthat/test-utils-data-dataloader.R +++ b/tests/testthat/test-utils-data-dataloader.R @@ -624,5 +624,50 @@ test_that("a case that errors in luz", { ds <- get_iterable_ds() dl <- dataloader(ds, batch_size = 32) expect_equal(length(coro::collect(dl)), 4) - + +}) + +test_that("in-place ops work on multiworker dataloader batches", { + if (cuda_is_available()) skip_on_os("windows") + + ds <- dataset( + initialize = function() { + self$x <- matrix(1:20, nrow = 4, ncol = 5) + }, + .getitem = function(i) { + torch_tensor(self$x[i, ], dtype = torch_float()) + }, + .length = function() { 4 } + ) + + dl <- dataloader(ds(), batch_size = 2, num_workers = 1) + iter <- dataloader_make_iter(dl) + batch <- dataloader_next(iter) + + expect_no_error(batch$add_(1)) + expect_no_error(batch$div_(2)) + expect_no_error(batch$mul_(3)) }) + +test_that("zero-length tensors work with multiworker dataloader", { + if (cuda_is_available()) skip_on_os("windows") + + ds <- dataset( + initialize = function() {}, + .getitem = function(i) { + list( + x = torch_randn(5), + empty = torch_tensor(numeric(0)) + ) + }, + .length = function() { 10 } + ) + + dl <- dataloader(ds(), batch_size = 5, num_workers = 1) + iter <- dataloader_make_iter(dl) + batch <- dataloader_next(iter) + + expect_tensor_shape(batch$x, c(5, 5)) + expect_equal(batch$empty$numel(), 0) +}) + From f26e74ff607094ae8f36a4e3f55346180f78b32b Mon Sep 17 00:00:00 2001 From: Daniel Falbel Date: Thu, 23 Apr 2026 18:00:55 -0300 Subject: [PATCH 12/21] Keep SHM mappings alive for derived tensors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace cpp_map_shm + attribute-based lifetime with cpp_tensor_from_shm which does the full consumer path in C++: open → mmap → from_blob → clone → munmap. The clone gives the tensor its own storage, so derived tensors ($view, $transpose, etc.) share the cloned storage rather than aliasing a fragile SHM mapping. No external pointer or attribute needed. Total cost is still 2 memcpys (1 producer + 1 consumer clone), which is faster than the default serialize + pipe + deserialize path. Adds regression test: derive a tensor via $view(), GC the original batch, verify the derived tensor is still valid. --- R/RcppExports.R | 4 +-- R/utils-data-dataloader.R | 5 +-- src/RcppExports.cpp | 14 ++++---- src/tensor.cpp | 39 +++++++++------------ tests/testthat/test-utils-data-dataloader.R | 27 ++++++++++++++ 5 files changed, 55 insertions(+), 34 deletions(-) diff --git a/R/RcppExports.R b/R/RcppExports.R index 23c27d5a61..1509a5c2d8 100644 --- a/R/RcppExports.R +++ b/R/RcppExports.R @@ -16945,8 +16945,8 @@ cpp_tensor_to_shm <- function(tensor) { .Call(`_torch_cpp_tensor_to_shm`, tensor) } -cpp_map_shm <- function(name, nbytes_dbl) { - .Call(`_torch_cpp_map_shm`, name, nbytes_dbl) +cpp_tensor_from_shm <- function(name, nbytes_dbl, shape, options) { + .Call(`_torch_cpp_tensor_from_shm`, name, nbytes_dbl, shape, options) } cpp_shm_exists <- function(name) { diff --git a/R/utils-data-dataloader.R b/R/utils-data-dataloader.R index 4e48ab75be..8136140c96 100644 --- a/R/utils-data-dataloader.R +++ b/R/utils-data-dataloader.R @@ -688,10 +688,7 @@ tensors_from_shared <- function(x) { if (x$nbytes == 0) { return(torch_tensor(numeric(0), dtype = x$dtype)$reshape(x$shape)) } - shm_ref <- cpp_map_shm(x$name, x$nbytes) - t <- torch_tensor_from_buffer(shm_ref, x$shape, x$dtype) - attr(t, ".shm_ref") <- shm_ref - return(t) + return(cpp_tensor_from_shm(x$name, x$nbytes, x$shape, list(dtype = x$dtype))) } if (is.list(x)) { out <- lapply(x, tensors_from_shared) diff --git a/src/RcppExports.cpp b/src/RcppExports.cpp index 3893767b7c..b184fe3cf9 100644 --- a/src/RcppExports.cpp +++ b/src/RcppExports.cpp @@ -51633,15 +51633,17 @@ BEGIN_RCPP return rcpp_result_gen; END_RCPP } -// cpp_map_shm -SEXP cpp_map_shm(std::string name, double nbytes_dbl); -RcppExport SEXP _torch_cpp_map_shm(SEXP nameSEXP, SEXP nbytes_dblSEXP) { +// cpp_tensor_from_shm +torch::Tensor cpp_tensor_from_shm(std::string name, double nbytes_dbl, std::vector shape, XPtrTorchTensorOptions options); +RcppExport SEXP _torch_cpp_tensor_from_shm(SEXP nameSEXP, SEXP nbytes_dblSEXP, SEXP shapeSEXP, SEXP optionsSEXP) { BEGIN_RCPP Rcpp::RObject rcpp_result_gen; Rcpp::RNGScope rcpp_rngScope_gen; Rcpp::traits::input_parameter< std::string >::type name(nameSEXP); Rcpp::traits::input_parameter< double >::type nbytes_dbl(nbytes_dblSEXP); - rcpp_result_gen = Rcpp::wrap(cpp_map_shm(name, nbytes_dbl)); + Rcpp::traits::input_parameter< std::vector >::type shape(shapeSEXP); + Rcpp::traits::input_parameter< XPtrTorchTensorOptions >::type options(optionsSEXP); + rcpp_result_gen = Rcpp::wrap(cpp_tensor_from_shm(name, nbytes_dbl, shape, options)); return rcpp_result_gen; END_RCPP } @@ -56433,7 +56435,7 @@ static const R_CallMethodDef CallEntries[] = { {"_torch_cpp_tensor_from_buffer", (DL_FUNC) &_torch_cpp_tensor_from_buffer, 3}, {"_torch_cpp_buffer_from_tensor", (DL_FUNC) &_torch_cpp_buffer_from_tensor, 1}, {"_torch_cpp_tensor_to_shm", (DL_FUNC) &_torch_cpp_tensor_to_shm, 1}, - {"_torch_cpp_map_shm", (DL_FUNC) &_torch_cpp_map_shm, 2}, + {"_torch_cpp_tensor_from_shm", (DL_FUNC) &_torch_cpp_tensor_from_shm, 4}, {"_torch_cpp_shm_exists", (DL_FUNC) &_torch_cpp_shm_exists, 1}, {"_torch_cpp_shm_unlink", (DL_FUNC) &_torch_cpp_shm_unlink, 1}, {"_torch_cpp_torch_tensor_dtype", (DL_FUNC) &_torch_cpp_torch_tensor_dtype, 1}, @@ -56487,7 +56489,7 @@ static const R_CallMethodDef CallEntries[] = { }; void register_callables(DllInfo *dll); -RcppExport void R_init_torch(DllInfo *dll) { +RcppExport void R_init_torchpkg(DllInfo *dll) { R_registerRoutines(dll, NULL, CallEntries, NULL, NULL); R_useDynamicSymbols(dll, FALSE); register_callables(dll); diff --git a/src/tensor.cpp b/src/tensor.cpp index 2ef8242e98..b703f31a1a 100644 --- a/src/tensor.cpp +++ b/src/tensor.cpp @@ -116,15 +116,6 @@ SEXP cpp_buffer_from_tensor (torch::Tensor data) { static int shm_counter = 0; -static void shm_mapping_destructor(SEXP x) { - void* ptr = R_ExternalPtrAddr(x); - if (ptr) { - SEXP tag = R_ExternalPtrTag(x); - size_t nbytes = static_cast(REAL(tag)[0]); - munmap(ptr, nbytes); - R_ClearExternalPtr(x); - } -} #endif // _WIN32 // [[Rcpp::export]] @@ -175,31 +166,35 @@ Rcpp::List cpp_tensor_to_shm(torch::Tensor tensor) { #endif } +// Map SHM, create tensor via from_blob, clone it, then munmap + unlink. +// The clone gives the tensor its own storage so no SHM lifetime concerns: +// derived tensors ($view, $transpose, etc.) share the cloned storage, +// not the SHM mapping. // [[Rcpp::export]] -SEXP cpp_map_shm(std::string name, double nbytes_dbl) { +torch::Tensor cpp_tensor_from_shm(std::string name, double nbytes_dbl, + std::vector shape, + XPtrTorchTensorOptions options) { #ifdef _WIN32 Rcpp::stop("SHM transport is not supported on Windows"); - return R_NilValue; + return torch::Tensor(); #else size_t nbytes = static_cast(nbytes_dbl); - int fd = shm_open(name.c_str(), O_RDWR, 0); + int fd = shm_open(name.c_str(), O_RDONLY, 0); if (fd < 0) Rcpp::stop("shm_open failed: %s", strerror(errno)); - // Map writable. shm_unlink orphans the region immediately so we are - // the sole owner — writes are safe and won't corrupt other processes. - void* ptr = mmap(NULL, nbytes, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + void* ptr = mmap(NULL, nbytes, PROT_READ, MAP_SHARED, fd, 0); close(fd); - shm_unlink(name.c_str()); // orphan: mapping persists until munmap + shm_unlink(name.c_str()); if (ptr == MAP_FAILED) Rcpp::stop("mmap failed: %s", strerror(errno)); - // External pointer: addr = mapped SHM, tag = nbytes for destructor - SEXP nbytes_sexp = PROTECT(Rf_ScalarReal(nbytes_dbl)); - SEXP extptr = PROTECT(R_MakeExternalPtr(ptr, nbytes_sexp, R_NilValue)); - R_RegisterCFinalizerEx(extptr, shm_mapping_destructor, TRUE); - UNPROTECT(2); - return extptr; + // from_blob aliases the mapping, clone copies into tensor-owned storage + torch::Tensor view = lantern_from_blob(ptr, &shape[0], shape.size(), nullptr, 0, options.get()); + torch::Tensor owned = lantern_Tensor_clone(view.get()); + + munmap(ptr, nbytes); + return owned; #endif } diff --git a/tests/testthat/test-utils-data-dataloader.R b/tests/testthat/test-utils-data-dataloader.R index a66a530d36..399f97042f 100644 --- a/tests/testthat/test-utils-data-dataloader.R +++ b/tests/testthat/test-utils-data-dataloader.R @@ -649,6 +649,33 @@ test_that("in-place ops work on multiworker dataloader batches", { expect_no_error(batch$mul_(3)) }) +test_that("derived tensors survive GC of original batch", { + if (cuda_is_available()) skip_on_os("windows") + + ds <- dataset( + initialize = function() { + self$x <- matrix(1:20, nrow = 4, ncol = 5) + }, + .getitem = function(i) { + torch_tensor(self$x[i, ], dtype = torch_float()) + }, + .length = function() { 4 } + ) + + dl <- dataloader(ds(), batch_size = 2, num_workers = 1) + iter <- dataloader_make_iter(dl) + batch <- dataloader_next(iter) + + # Derive a new tensor, drop the original, force GC + v <- batch$view(-1) + expected <- as.numeric(v) + rm(batch, iter, dl) + gc() + + # v must still be valid — no segfault, correct values + expect_equal(as.numeric(v), expected) +}) + test_that("zero-length tensors work with multiworker dataloader", { if (cuda_is_available()) skip_on_os("windows") From c7d2548745918282e4d69533c73f7594f23b0a06 Mon Sep 17 00:00:00 2001 From: Daniel Falbel Date: Thu, 23 Apr 2026 18:08:00 -0300 Subject: [PATCH 13/21] Preserve requires_grad when round-tripping tensors through SHM The SHM descriptor now records requires_grad alongside shape and dtype. tensors_from_shared calls requires_grad_(TRUE) on reconstruction when the flag was set. Adds regression test with a dataset that returns both grad-enabled and grad-disabled tensors. --- R/utils-data-dataloader.R | 11 +++++++--- tests/testthat/test-utils-data-dataloader.R | 24 +++++++++++++++++++++ 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/R/utils-data-dataloader.R b/R/utils-data-dataloader.R index 8136140c96..33a18bf59c 100644 --- a/R/utils-data-dataloader.R +++ b/R/utils-data-dataloader.R @@ -668,7 +668,8 @@ tensors_to_shared <- function(x) { shm <- cpp_tensor_to_shm(t) return(structure( list(name = shm$name, nbytes = shm$nbytes, - shape = t$shape, dtype = tolower(as.character(t$dtype))), + shape = t$shape, dtype = tolower(as.character(t$dtype)), + requires_grad = t$requires_grad), class = "torch_shared_tensor" )) } @@ -686,9 +687,13 @@ tensors_from_shared <- function(x) { if (coro::is_exhausted(x)) return(x) if (inherits(x, "torch_shared_tensor")) { if (x$nbytes == 0) { - return(torch_tensor(numeric(0), dtype = x$dtype)$reshape(x$shape)) + t <- torch_tensor(numeric(0), dtype = x$dtype)$reshape(x$shape) + if (isTRUE(x$requires_grad)) t <- t$requires_grad_(TRUE) + return(t) } - return(cpp_tensor_from_shm(x$name, x$nbytes, x$shape, list(dtype = x$dtype))) + t <- cpp_tensor_from_shm(x$name, x$nbytes, x$shape, list(dtype = x$dtype)) + if (isTRUE(x$requires_grad)) t <- t$requires_grad_(TRUE) + return(t) } if (is.list(x)) { out <- lapply(x, tensors_from_shared) diff --git a/tests/testthat/test-utils-data-dataloader.R b/tests/testthat/test-utils-data-dataloader.R index 399f97042f..11c273ba8c 100644 --- a/tests/testthat/test-utils-data-dataloader.R +++ b/tests/testthat/test-utils-data-dataloader.R @@ -698,3 +698,27 @@ test_that("zero-length tensors work with multiworker dataloader", { expect_equal(batch$empty$numel(), 0) }) +test_that("requires_grad is preserved through multiworker dataloader", { + if (cuda_is_available()) skip_on_os("windows") + + ds <- dataset( + initialize = function() { + self$x <- matrix(rnorm(40), nrow = 4, ncol = 10) + }, + .getitem = function(i) { + list( + x = torch_tensor(self$x[i, ])$requires_grad_(TRUE), + y = torch_tensor(self$x[i, ]) + ) + }, + .length = function() { 4 } + ) + + dl <- dataloader(ds(), batch_size = 2, num_workers = 1) + iter <- dataloader_make_iter(dl) + batch <- dataloader_next(iter) + + expect_true(batch$x$requires_grad) + expect_false(batch$y$requires_grad) +}) + From 54baf471b0004c4fdaa559dd5564523023a64dea Mon Sep 17 00:00:00 2001 From: Daniel Falbel Date: Thu, 23 Apr 2026 18:35:09 -0300 Subject: [PATCH 14/21] Preserve aliasing when the same tensor appears multiple times in a batch Memoize tensors_to_shared by xptr_address and tensors_from_shared by SHM name so that duplicate tensor references in a batch (e.g. from a custom collate returning list(a = t, b = t)) reconstruct to the same tensor, preserving storage sharing and in-place mutation visibility. --- R/utils-data-dataloader.R | 78 ++++++++++++------- .../testthat/test-utils-data-dataloader-shm.R | 1 + tests/testthat/test-utils-data-dataloader.R | 29 +++++++ 3 files changed, 78 insertions(+), 30 deletions(-) diff --git a/R/utils-data-dataloader.R b/R/utils-data-dataloader.R index 33a18bf59c..71612f9ad6 100644 --- a/R/utils-data-dataloader.R +++ b/R/utils-data-dataloader.R @@ -661,46 +661,64 @@ from_exportable_tensor <- function(x) { # Convert batch tensors to POSIX shared memory for IPC. # Single memcpy: tensor data -> SHM. Called in the worker process. +# Memoizes by data pointer so tensors sharing storage produce the same +# SHM descriptor, preserving aliasing through the roundtrip. tensors_to_shared <- function(x) { - if (coro::is_exhausted(x)) return(x) - if (is_torch_tensor(x)) { - t <- x$cpu()$contiguous() - shm <- cpp_tensor_to_shm(t) - return(structure( - list(name = shm$name, nbytes = shm$nbytes, - shape = t$shape, dtype = tolower(as.character(t$dtype)), - requires_grad = t$requires_grad), - class = "torch_shared_tensor" - )) - } - if (is.list(x)) { - out <- lapply(x, tensors_to_shared) - attributes(out) <- attributes(x) - return(out) + memo <- new.env(parent = emptyenv()) + to_shared <- function(x) { + if (is_torch_tensor(x)) { + t <- x$cpu()$contiguous() + key <- xptr_address(t) + if (!is.null(memo[[key]])) return(memo[[key]]) + shm <- cpp_tensor_to_shm(t) + result <- structure( + list(name = shm$name, nbytes = shm$nbytes, + shape = t$shape, dtype = tolower(as.character(t$dtype)), + requires_grad = t$requires_grad), + class = "torch_shared_tensor" + ) + memo[[key]] <- result + return(result) + } + if (is.list(x)) { + out <- lapply(x, to_shared) + attributes(out) <- attributes(x) + return(out) + } + x } - x + if (coro::is_exhausted(x)) return(x) + to_shared(x) } # Reconstruct tensors from POSIX shared memory. -# Zero-copy: from_blob aliases the mapped SHM directly. +# Memoizes by SHM name so duplicate references reconstruct to the same +# tensor, preserving storage sharing from the original batch. tensors_from_shared <- function(x) { - if (coro::is_exhausted(x)) return(x) - if (inherits(x, "torch_shared_tensor")) { - if (x$nbytes == 0) { - t <- torch_tensor(numeric(0), dtype = x$dtype)$reshape(x$shape) + memo <- new.env(parent = emptyenv()) + from_shared <- function(x) { + if (inherits(x, "torch_shared_tensor")) { + if (x$nbytes == 0) { + t <- torch_tensor(numeric(0), dtype = x$dtype)$reshape(x$shape) + if (isTRUE(x$requires_grad)) t <- t$requires_grad_(TRUE) + return(t) + } + key <- x$name + if (!is.null(memo[[key]])) return(memo[[key]]) + t <- cpp_tensor_from_shm(x$name, x$nbytes, x$shape, list(dtype = x$dtype)) if (isTRUE(x$requires_grad)) t <- t$requires_grad_(TRUE) + memo[[key]] <- t return(t) } - t <- cpp_tensor_from_shm(x$name, x$nbytes, x$shape, list(dtype = x$dtype)) - if (isTRUE(x$requires_grad)) t <- t$requires_grad_(TRUE) - return(t) - } - if (is.list(x)) { - out <- lapply(x, tensors_from_shared) - attributes(out) <- attributes(x) - return(out) + if (is.list(x)) { + out <- lapply(x, from_shared) + attributes(out) <- attributes(x) + return(out) + } + x } - x + if (coro::is_exhausted(x)) return(x) + from_shared(x) } # Unlink SHM segments from a shared result without mapping them. diff --git a/tests/testthat/test-utils-data-dataloader-shm.R b/tests/testthat/test-utils-data-dataloader-shm.R index 67b689274e..53446de7b4 100644 --- a/tests/testthat/test-utils-data-dataloader-shm.R +++ b/tests/testthat/test-utils-data-dataloader-shm.R @@ -89,3 +89,4 @@ test_that("SHM preserves list class and names through roundtrip", { expect_named(batch, c("x", "y")) expect_tensor_shape(batch$x, c(5, 10)) }) + diff --git a/tests/testthat/test-utils-data-dataloader.R b/tests/testthat/test-utils-data-dataloader.R index 11c273ba8c..80ca5a8cf7 100644 --- a/tests/testthat/test-utils-data-dataloader.R +++ b/tests/testthat/test-utils-data-dataloader.R @@ -722,3 +722,32 @@ test_that("requires_grad is preserved through multiworker dataloader", { expect_false(batch$y$requires_grad) }) +test_that("aliased tensors from custom collate preserve sharing through multiworker", { + if (cuda_is_available()) skip_on_os("windows") + + ds <- dataset( + initialize = function() { + self$x <- matrix(rnorm(40), nrow = 4, ncol = 10) + }, + .getitem = function(i) { + torch_tensor(self$x[i, ]) + }, + .length = function() { 4 } + ) + + # Collate that returns the same tensor in two slots + my_collate <- function(batch) { + t <- torch_stack(batch) + list(a = t, b = t) + } + + dl <- dataloader(ds(), batch_size = 2, num_workers = 1, collate_fn = my_collate) + iter <- dataloader_make_iter(dl) + batch <- dataloader_next(iter) + + # In-place update on a should be visible through b + batch$a$add_(100) + expect_equal(as.numeric(batch$a), as.numeric(batch$b)) +}) + + From 9abf8d1f3f42de2914764d89c5ce14f895c7af80 Mon Sep 17 00:00:00 2001 From: Daniel Falbel Date: Thu, 23 Apr 2026 22:44:00 -0300 Subject: [PATCH 15/21] Fall back to serialization when SHM allocation fails In containerized environments where /dev/shm is capped (e.g. Docker's default 64MB), shm_open/ftruncate can fail for large batches. to_exportable_tensor now catches SHM errors and falls back to the previous pipe-based serialization path transparently. --- R/utils-data-dataloader.R | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/R/utils-data-dataloader.R b/R/utils-data-dataloader.R index 71612f9ad6..6e1de81826 100644 --- a/R/utils-data-dataloader.R +++ b/R/utils-data-dataloader.R @@ -632,7 +632,9 @@ as_iterator.dataloader <- function(x) { # reconstruct it after transfering via futures to_exportable_tensor <- function(x, con, use_shm = FALSE) { if (use_shm) { - return(tensors_to_shared(x)) + result <- tryCatch(tensors_to_shared(x), error = function(e) NULL) + if (!is.null(result)) return(result) + # SHM failed (e.g. /dev/shm full in Docker) — fall back to serialization } if (is.null(con)) { return(tensor_to_raw_vector(x)) From a5982d8798392d527473f3946bbe1ec481d49c7f Mon Sep 17 00:00:00 2001 From: Daniel Falbel Date: Fri, 24 Apr 2026 08:59:25 -0300 Subject: [PATCH 16/21] Respect explicit socket-transport override over SHM Check use_socket_con() before use_shm() so that users who set options(torch.dataloader_use_socket_con = TRUE) get socket transport as expected, without needing to also disable SHM. --- R/utils-data-dataloader.R | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/R/utils-data-dataloader.R b/R/utils-data-dataloader.R index 6e1de81826..5ad6676a61 100644 --- a/R/utils-data-dataloader.R +++ b/R/utils-data-dataloader.R @@ -778,11 +778,11 @@ r_session <- R6::R6Class( using_socket_con = FALSE, using_shm = FALSE, initialize = function() { - if (use_shm()) { - self$using_shm <- TRUE - } else if (use_socket_con()) { + if (use_socket_con()) { self$port <- parallelly::freePort() self$using_socket_con <- TRUE + } else if (use_shm()) { + self$using_shm <- TRUE } self$session <- callr::r_session$new() }, From 14f5753302a9098a05b72112b7ceff14a4c89c26 Mon Sep 17 00:00:00 2001 From: Daniel Falbel Date: Fri, 24 Apr 2026 09:07:23 -0300 Subject: [PATCH 17/21] Preserve aliasing for repeated non-contiguous tensors Compute the memoization key from xptr_address(x) before calling contiguous(), which allocates a fresh tensor and changes the address. This preserves aliasing when a custom collate returns the same non-contiguous view in multiple slots. --- R/utils-data-dataloader.R | 4 +-- tests/testthat/test-utils-data-dataloader.R | 28 +++++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/R/utils-data-dataloader.R b/R/utils-data-dataloader.R index 5ad6676a61..0414f1fa97 100644 --- a/R/utils-data-dataloader.R +++ b/R/utils-data-dataloader.R @@ -669,9 +669,9 @@ tensors_to_shared <- function(x) { memo <- new.env(parent = emptyenv()) to_shared <- function(x) { if (is_torch_tensor(x)) { - t <- x$cpu()$contiguous() - key <- xptr_address(t) + key <- xptr_address(x) if (!is.null(memo[[key]])) return(memo[[key]]) + t <- x$cpu()$contiguous() shm <- cpp_tensor_to_shm(t) result <- structure( list(name = shm$name, nbytes = shm$nbytes, diff --git a/tests/testthat/test-utils-data-dataloader.R b/tests/testthat/test-utils-data-dataloader.R index 80ca5a8cf7..f49d35ca9d 100644 --- a/tests/testthat/test-utils-data-dataloader.R +++ b/tests/testthat/test-utils-data-dataloader.R @@ -750,4 +750,32 @@ test_that("aliased tensors from custom collate preserve sharing through multiwor expect_equal(as.numeric(batch$a), as.numeric(batch$b)) }) +test_that("aliased non-contiguous views preserve sharing through multiworker", { + if (cuda_is_available()) skip_on_os("windows") + + ds <- dataset( + initialize = function() { + self$x <- matrix(rnorm(40), nrow = 4, ncol = 10) + }, + .getitem = function(i) { + torch_tensor(self$x[i, ]) + }, + .length = function() { 4 } + ) + + # Collate returns the same non-contiguous view in both slots + my_collate <- function(batch) { + t <- torch_stack(batch) + v <- t[, 1:5] + list(a = v, b = v) + } + + dl <- dataloader(ds(), batch_size = 2, num_workers = 1, collate_fn = my_collate) + iter <- dataloader_make_iter(dl) + batch <- dataloader_next(iter) + + batch$a$add_(100) + expect_equal(as.numeric(batch$a), as.numeric(batch$b)) +}) + From d3802315295e9e8c32485815ee27ca0b07bcff4b Mon Sep 17 00:00:00 2001 From: Daniel Falbel Date: Fri, 24 Apr 2026 11:41:52 -0300 Subject: [PATCH 18/21] Unlink already-created SHM segments when conversion fails partway If tensors_to_shared() fails after converting some tensors (e.g. /dev/shm fills up mid-batch), the error handler now walks the memo env and unlinks all segments created so far before re-raising. The outer tryCatch in to_exportable_tensor then catches the error and falls back to serialization without leaking SHM. --- R/utils-data-dataloader.R | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/R/utils-data-dataloader.R b/R/utils-data-dataloader.R index 0414f1fa97..ea7aac9131 100644 --- a/R/utils-data-dataloader.R +++ b/R/utils-data-dataloader.R @@ -690,7 +690,14 @@ tensors_to_shared <- function(x) { x } if (coro::is_exhausted(x)) return(x) - to_shared(x) + tryCatch(to_shared(x), error = function(e) { + # Clean up any SHM segments created before the failure + for (key in ls(memo)) { + nm <- memo[[key]]$name + if (nzchar(nm)) tryCatch(cpp_shm_unlink(nm), error = function(e2) NULL) + } + stop(e) + }) } # Reconstruct tensors from POSIX shared memory. From 22f947b8a8f878fb5bb33c9c3a9322fe14ec80ce Mon Sep 17 00:00:00 2001 From: Daniel Falbel Date: Fri, 24 Apr 2026 13:24:57 -0300 Subject: [PATCH 19/21] Preserve canonical dtype names in SHM descriptors as.character() on some torch dtypes returns C++ names (Char, Byte, ComplexFloat, ComplexDouble) that torch_tensor_from_buffer doesn't accept. Add dtype_to_shm_string() mapping to canonical names (int8, uint8, cfloat, cdouble). Adds regression test covering all standard dtypes through the multiworker dataloader. --- R/utils-data-dataloader.R | 15 ++++++++- tests/testthat/test-utils-data-dataloader.R | 37 +++++++++++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/R/utils-data-dataloader.R b/R/utils-data-dataloader.R index ea7aac9131..1621ad83ae 100644 --- a/R/utils-data-dataloader.R +++ b/R/utils-data-dataloader.R @@ -661,6 +661,19 @@ from_exportable_tensor <- function(x) { x } +# Map C++ dtype names to names accepted by torch_tensor_from_buffer / dtype_from_string. +# Most match with tolower(), but a few have different canonical names. +dtype_to_shm_string <- function(dtype) { + s <- tolower(as.character(dtype)) + switch(s, + "char" = "int8", + "byte" = "uint8", + "complexfloat" = "cfloat", + "complexdouble" = "cdouble", + s + ) +} + # Convert batch tensors to POSIX shared memory for IPC. # Single memcpy: tensor data -> SHM. Called in the worker process. # Memoizes by data pointer so tensors sharing storage produce the same @@ -675,7 +688,7 @@ tensors_to_shared <- function(x) { shm <- cpp_tensor_to_shm(t) result <- structure( list(name = shm$name, nbytes = shm$nbytes, - shape = t$shape, dtype = tolower(as.character(t$dtype)), + shape = t$shape, dtype = dtype_to_shm_string(t$dtype), requires_grad = t$requires_grad), class = "torch_shared_tensor" ) diff --git a/tests/testthat/test-utils-data-dataloader.R b/tests/testthat/test-utils-data-dataloader.R index f49d35ca9d..a878d97640 100644 --- a/tests/testthat/test-utils-data-dataloader.R +++ b/tests/testthat/test-utils-data-dataloader.R @@ -750,6 +750,43 @@ test_that("aliased tensors from custom collate preserve sharing through multiwor expect_equal(as.numeric(batch$a), as.numeric(batch$b)) }) +test_that("all standard dtypes roundtrip through multiworker dataloader", { + if (cuda_is_available()) skip_on_os("windows") + + ds <- dataset( + initialize = function() { + self$x <- rnorm(40) + }, + .getitem = function(i) { + v <- self$x[((i-1)*10 + 1):(i*10)] + list( + float32 = torch_tensor(v, dtype = torch_float()), + float64 = torch_tensor(v, dtype = torch_double()), + int8 = torch_tensor(as.integer(v), dtype = torch_int8()), + uint8 = torch_tensor(abs(as.integer(v)), dtype = torch_uint8()), + int16 = torch_tensor(as.integer(v), dtype = torch_int16()), + int32 = torch_tensor(as.integer(v), dtype = torch_int32()), + int64 = torch_tensor(as.integer(v), dtype = torch_int64()), + bool = torch_tensor(v > 0, dtype = torch_bool()) + ) + }, + .length = function() { 4 } + ) + + dl <- dataloader(ds(), batch_size = 2, num_workers = 1) + iter <- dataloader_make_iter(dl) + batch <- dataloader_next(iter) + + expect_true(batch$float32$dtype == torch_float()) + expect_true(batch$float64$dtype == torch_double()) + expect_true(batch$int8$dtype == torch_int8()) + expect_true(batch$uint8$dtype == torch_uint8()) + expect_true(batch$int16$dtype == torch_int16()) + expect_true(batch$int32$dtype == torch_int32()) + expect_true(batch$int64$dtype == torch_int64()) + expect_true(batch$bool$dtype == torch_bool()) +}) + test_that("aliased non-contiguous views preserve sharing through multiworker", { if (cuda_is_available()) skip_on_os("windows") From d09dfb48524039d6edefdcef6db139eae72279fc Mon Sep 17 00:00:00 2001 From: Daniel Falbel Date: Fri, 24 Apr 2026 13:33:13 -0300 Subject: [PATCH 20/21] Guard 0-dim tensors before taking &shape[0] in SHM path When shape is empty (scalar tensor), &shape[0] is undefined behavior. Pass nullptr instead, matching the pattern used by create_tensor_from_atomic. Adds regression test with a custom collate that reduces a batch to a scalar. --- src/tensor.cpp | 3 ++- tests/testthat/test-utils-data-dataloader.R | 26 +++++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/src/tensor.cpp b/src/tensor.cpp index b703f31a1a..cf43366583 100644 --- a/src/tensor.cpp +++ b/src/tensor.cpp @@ -190,7 +190,8 @@ torch::Tensor cpp_tensor_from_shm(std::string name, double nbytes_dbl, if (ptr == MAP_FAILED) Rcpp::stop("mmap failed: %s", strerror(errno)); // from_blob aliases the mapping, clone copies into tensor-owned storage - torch::Tensor view = lantern_from_blob(ptr, &shape[0], shape.size(), nullptr, 0, options.get()); + int64_t* shape_ptr = shape.empty() ? nullptr : &shape[0]; + torch::Tensor view = lantern_from_blob(ptr, shape_ptr, shape.size(), nullptr, 0, options.get()); torch::Tensor owned = lantern_Tensor_clone(view.get()); munmap(ptr, nbytes); diff --git a/tests/testthat/test-utils-data-dataloader.R b/tests/testthat/test-utils-data-dataloader.R index a878d97640..203d101e78 100644 --- a/tests/testthat/test-utils-data-dataloader.R +++ b/tests/testthat/test-utils-data-dataloader.R @@ -787,6 +787,32 @@ test_that("all standard dtypes roundtrip through multiworker dataloader", { expect_true(batch$bool$dtype == torch_bool()) }) +test_that("scalar tensors work with multiworker dataloader", { + if (cuda_is_available()) skip_on_os("windows") + + ds <- dataset( + initialize = function() { + self$x <- rnorm(10) + }, + .getitem = function(i) { + torch_tensor(self$x[i]) + }, + .length = function() { 10 } + ) + + # Custom collate that reduces to a scalar + my_collate <- function(batch) { + torch_mean(torch_stack(batch)) + } + + dl <- dataloader(ds(), batch_size = 5, num_workers = 1, collate_fn = my_collate) + iter <- dataloader_make_iter(dl) + batch <- dataloader_next(iter) + + expect_equal(length(batch$shape), 0) # 0-dim scalar + expect_true(is_torch_tensor(batch)) +}) + test_that("aliased non-contiguous views preserve sharing through multiworker", { if (cuda_is_available()) skip_on_os("windows") From d9fb783b3733d040f88bb3d8ca672c3b1379eb22 Mon Sep 17 00:00:00 2001 From: Daniel Falbel Date: Fri, 24 Apr 2026 14:51:29 -0300 Subject: [PATCH 21/21] Wait indefinitely for prefetched tasks during finalization Change poll_process timeout from 5s to -1 (indefinite) in the finalizer's task drain loop. A timeout would skip read() and shm_unlink_recursive(), leaking SHM segments for slow datasets. The worker will either finish or die when the session is closed. --- R/utils-data-dataloader.R | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/R/utils-data-dataloader.R b/R/utils-data-dataloader.R index 1621ad83ae..466d1207b3 100644 --- a/R/utils-data-dataloader.R +++ b/R/utils-data-dataloader.R @@ -586,9 +586,11 @@ MultiProcessingDataLoaderIter <- R6::R6Class( tasks = list(), finalize = function() { # Drain any prefetched tasks so their SHM segments are cleaned up. + # Wait indefinitely (-1) — the worker will finish or die when the + # session is closed. A timeout would leak SHM segments. for (task in private$tasks) { tryCatch({ - task$session$poll_process(timeout = 5000) + task$session$poll_process(timeout = -1) result <- task$session$read() if (!is.null(result$result)) { shm_unlink_recursive(result$result)