diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml
index 770ae3fe2a..da9db3a3c0 100644
--- a/.github/workflows/main.yaml
+++ b/.github/workflows/main.yaml
@@ -138,6 +138,11 @@ jobs:
           bash bench/benchmark_cpu_cache/run.sh 2>&1 | tee -a $GITHUB_STEP_SUMMARY
           echo '```' >> $GITHUB_STEP_SUMMARY
 
+      - name: Run SHM dataloader benchmark
+        run: |
+          echo '## SHM Dataloader Benchmark' >> $GITHUB_STEP_SUMMARY
+          Rscript bench/bench-shm.R 2>&1 | tee -a $GITHUB_STEP_SUMMARY
+
   build-gpu-image:
     needs: lantern
     if: ${{ always() && needs.lantern.result != 'failed' }}
diff --git a/NEWS.md b/NEWS.md
index 92d58ab7d1..babb46e1dd 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -1,5 +1,9 @@
 # torch (development version)
 
+- Multi-worker dataloaders now use POSIX shared memory for tensor transfer on
+  Unix systems, resulting in up to 2x faster data loading. To revert to the
+  previous behavior, set `options(torch.dataloader_use_shm = FALSE)`. (#1456)
+
 # torch 0.17.0
 
 ## Breaking changes
diff --git a/R/RcppExports.R b/R/RcppExports.R
index cf5595b1ee..1509a5c2d8 100644
--- a/R/RcppExports.R
+++ b/R/RcppExports.R
@@ -16941,6 +16941,22 @@ cpp_buffer_from_tensor <- function(data) {
     .Call(`_torch_cpp_buffer_from_tensor`, data)
 }
 
+cpp_tensor_to_shm <- function(tensor) {
+    .Call(`_torch_cpp_tensor_to_shm`, tensor)
+}
+
+cpp_tensor_from_shm <- function(name, nbytes_dbl, shape, options) {
+    .Call(`_torch_cpp_tensor_from_shm`, name, nbytes_dbl, shape, options)
+}
+
+cpp_shm_exists <- function(name) {
+    .Call(`_torch_cpp_shm_exists`, name)
+}
+
+cpp_shm_unlink <- function(name) {
+    invisible(.Call(`_torch_cpp_shm_unlink`, name))
+}
+
 cpp_torch_tensor_dtype <- function(x) {
     .Call(`_torch_cpp_torch_tensor_dtype`, x)
 }
diff --git a/R/utils-data-dataloader.R b/R/utils-data-dataloader.R
index 435231c541..466d1207b3 100644
--- a/R/utils-data-dataloader.R
+++ b/R/utils-data-dataloader.R
@@ -363,7 +363,7 @@ MultiProcessingDataLoaderIter <- R6::R6Class(
 }
 
 worker_config <- function(id, num_workers, seed, init_fn, globals,
-                          packages, socket_port = NULL) {
+                          packages, socket_port = NULL, use_shm = FALSE) {
   library(torch)
   .worker_info <<- list(
     id = id,
@@ -386,27 +386,28 @@ worker_config <- function(id, num_workers, seed, init_fn, globals,
   if (!is.null(init_fn)) {
     init_fn(id)
   }
-
+  .socket_con <<- NULL
   if (!is.null(socket_port)) {
     # We need to wait for the main process to start the server, so here we
     # retry a few times until the connection works.
    for(i in 1:20) {
      tr <- try({.socket_con <<- socketConnection(
-        port = socket_port, 
-        blocking = TRUE, 
+        port = socket_port,
+        blocking = TRUE,
         open = "a+b"
-      )}, silent = TRUE) 
-      
+      )}, silent = TRUE)
+
       if (!inherits(tr, "try-error")) break
       Sys.sleep(0.5)
-      
+
       if (i == 20) {
         runtime_error("Could not create a connection with the main process.")
       }
     }
   }
-  
+
+  .use_shm <<- use_shm
 }
 
 fetcher <- self$.dataset_fetcher$fetch
@@ -424,7 +425,8 @@ MultiProcessingDataLoaderIter <- R6::R6Class(
       init_fn = self$.worker_init_fn,
       globals = self$.worker_globals,
       packages = self$.worker_packages,
-      socket_port = worker$port
+      socket_port = worker$port,
+      use_shm = worker$using_shm
     )
   )
@@ -464,11 +466,11 @@ MultiProcessingDataLoaderIter <- R6::R6Class(
   # send task to the worker
   if (coro::is_exhausted(index)) {
     worker$session$call(function() {
-      torch:::to_exportable_tensor(coro::exhausted(), .socket_con)
+      torch:::to_exportable_tensor(coro::exhausted(), .socket_con, .use_shm)
     })
   } else {
     worker$session$call(function(index) {
-      torch:::to_exportable_tensor(fetcher(index), .socket_con)
+      torch:::to_exportable_tensor(fetcher(index), .socket_con, .use_shm)
     }, list(index = index))
   }
@@ -481,7 +483,27 @@ MultiProcessingDataLoaderIter <- R6::R6Class(
   task <- private$tasks[[1]]
   private$tasks <- private$tasks[-1]
 
-  if (!task$using_socket_con) {
+  if (task$using_shm) {
+    # SHM path: tensor data is in shared memory, only a small
+    # reference comes through callr's pipe.
+    p <- task$session$poll_process(timeout = self$.timeout)
+    if (p == "timeout") {
+      runtime_error("dataloader worker timed out.")
+    }
+    result <- task$session$read()
+    if (!is.null(result$error)) {
+      if (packageVersion("callr") >= "3.7.1") {
+        rlang::abort(
+          "Error when getting dataset item.",
+          parent = result$error,
+          class = "runtime_error"
+        )
+      } else {
+        runtime_error(result$error$message)
+      }
+    }
+    from_exportable_tensor(result$result)
+  } else if (!task$using_socket_con) {
     # wait for the process to be ready
     p <- task$session$poll_process(timeout = self$.timeout)
     if (p == "timeout") {
@@ -563,6 +585,19 @@ MultiProcessingDataLoaderIter <- R6::R6Class(
   private = list(
     tasks = list(),
     finalize = function() {
+      # Drain any prefetched tasks so their SHM segments are cleaned up.
+      # Wait indefinitely (-1) — the worker will finish or die when the
+      # session is closed. A timeout would leak SHM segments.
+      for (task in private$tasks) {
+        tryCatch({
+          task$session$poll_process(timeout = -1)
+          result <- task$session$read()
+          if (!is.null(result$result)) {
+            shm_unlink_recursive(result$result)
+          }
+        }, error = function(e) NULL)
+      }
+      private$tasks <- list()
       lapply(private$workers, function(x) {
         x$close_socket_con()
       })
@@ -597,7 +632,12 @@ as_iterator.dataloader <- function(x) {
 
 # takes a tensor and saves its state in a field so we can
 # reconstruct it after transferring via futures
-to_exportable_tensor <- function(x, con) {
+to_exportable_tensor <- function(x, con, use_shm = FALSE) {
+  if (use_shm) {
+    result <- tryCatch(tensors_to_shared(x), error = function(e) NULL)
+    if (!is.null(result)) return(result)
+    # SHM failed (e.g. /dev/shm full in Docker) — fall back to serialization
+  }
   if (is.null(con)) {
     return(tensor_to_raw_vector(x))
   }
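
A rough interactive sketch of what the dispatch above produces. These are
internal, unexported helpers, so the `torch:::` calls here are for
illustration only, not supported API:

    t <- torch_randn(3)
    # SHM path: a small "torch_shared_tensor" descriptor (name, nbytes,
    # shape, dtype, requires_grad); the payload stays in shared memory
    desc <- torch:::to_exportable_tensor(t, con = NULL, use_shm = TRUE)
    # fallback path: a raw vector holding the serialized tensor
    payload <- torch:::to_exportable_tensor(t, con = NULL, use_shm = FALSE)
    # consuming the descriptor maps the segment, copies, and unlinks it
    t2 <- torch:::from_exportable_tensor(desc)
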
@@ -606,13 +646,114 @@
 }
 
 from_exportable_tensor <- function(x) {
-  if (!inherits(x, "connection")) {
-    con <- rawConnection(x)
-    on.exit({close(con)})
-  } else {
-    con <- x
+  if (coro::is_exhausted(x)) return(x)
+  # tensors_from_shared is a no-op for non-shared objects
+  x <- tensors_from_shared(x)
+  if (is.raw(x) || inherits(x, "connection")) {
+    if (!inherits(x, "connection")) {
+      con <- rawConnection(x)
+      on.exit({close(con)})
+    } else {
+      con <- x
+    }
+    return(torch_load(con))
   }
-  torch_load(con)
+  # Non-tensor, non-serialized payload (e.g. from a custom collate_fn
+  # that returns scalars, character vectors, etc.) — pass through as-is.
+  x
+}
+
+# Map C++ dtype names to names accepted by torch_tensor_from_buffer / dtype_from_string.
+# Most match with tolower(), but a few have different canonical names.
+dtype_to_shm_string <- function(dtype) {
+  s <- tolower(as.character(dtype))
+  switch(s,
+    "char" = "int8",
+    "byte" = "uint8",
+    "complexfloat" = "cfloat",
+    "complexdouble" = "cdouble",
+    s
+  )
+}
+
+# Convert batch tensors to POSIX shared memory for IPC.
+# Single memcpy: tensor data -> SHM. Called in the worker process.
+# Memoizes by data pointer so tensors sharing storage produce the same
+# SHM descriptor, preserving aliasing through the roundtrip.
+tensors_to_shared <- function(x) {
+  memo <- new.env(parent = emptyenv())
+  to_shared <- function(x) {
+    if (is_torch_tensor(x)) {
+      key <- xptr_address(x)
+      if (!is.null(memo[[key]])) return(memo[[key]])
+      t <- x$cpu()$contiguous()
+      shm <- cpp_tensor_to_shm(t)
+      result <- structure(
+        list(name = shm$name, nbytes = shm$nbytes,
+             shape = t$shape, dtype = dtype_to_shm_string(t$dtype),
+             requires_grad = t$requires_grad),
+        class = "torch_shared_tensor"
+      )
+      memo[[key]] <- result
+      return(result)
+    }
+    if (is.list(x)) {
+      out <- lapply(x, to_shared)
+      attributes(out) <- attributes(x)
+      return(out)
+    }
+    x
+  }
+  if (coro::is_exhausted(x)) return(x)
+  tryCatch(to_shared(x), error = function(e) {
+    # Clean up any SHM segments created before the failure
+    for (key in ls(memo)) {
+      nm <- memo[[key]]$name
+      if (nzchar(nm)) tryCatch(cpp_shm_unlink(nm), error = function(e2) NULL)
+    }
+    stop(e)
+  })
+}
+
+# Reconstruct tensors from POSIX shared memory.
+# Memoizes by SHM name so duplicate references reconstruct to the same
+# tensor, preserving storage sharing from the original batch.
+tensors_from_shared <- function(x) {
+  memo <- new.env(parent = emptyenv())
+  from_shared <- function(x) {
+    if (inherits(x, "torch_shared_tensor")) {
+      if (x$nbytes == 0) {
+        t <- torch_tensor(numeric(0), dtype = x$dtype)$reshape(x$shape)
+        if (isTRUE(x$requires_grad)) t <- t$requires_grad_(TRUE)
+        return(t)
+      }
+      key <- x$name
+      if (!is.null(memo[[key]])) return(memo[[key]])
+      t <- cpp_tensor_from_shm(x$name, x$nbytes, x$shape, list(dtype = x$dtype))
+      if (isTRUE(x$requires_grad)) t <- t$requires_grad_(TRUE)
+      memo[[key]] <- t
+      return(t)
+    }
+    if (is.list(x)) {
+      out <- lapply(x, from_shared)
+      attributes(out) <- attributes(x)
+      return(out)
+    }
+    x
+  }
+  if (coro::is_exhausted(x)) return(x)
+  from_shared(x)
+}
+
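The two-sided memoization above is what keeps aliasing intact end to end:
one segment per storage on the way out, one tensor per segment on the way
back. A minimal single-process sketch using these internal helpers (in the
dataloader the two halves run in the worker and the main process,
respectively):

    t <- torch_randn(2, 3)
    shared <- torch:::tensors_to_shared(list(a = t, b = t))  # one segment, two refs
    batch <- torch:::tensors_from_shared(shared)             # one tensor, two slots
    batch$a$add_(1)                                          # visible through batch$b
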
+# Unlink SHM segments from a shared result without mapping them.
+# Used during cleanup of prefetched but unconsumed tasks.
+shm_unlink_recursive <- function(x) {
+  if (inherits(x, "torch_shared_tensor")) {
+    if (nzchar(x$name)) cpp_shm_unlink(x$name)
+  } else if (is.list(x)) {
+    lapply(x, shm_unlink_recursive)
+  }
+  invisible(NULL)
+}
 
 walk_fields <- function(env, nms, func) {
@@ -657,10 +798,13 @@ r_session <- R6::R6Class(
     con = NULL,
     session = NULL,
     using_socket_con = FALSE,
+    using_shm = FALSE,
     initialize = function() {
       if (use_socket_con()) {
-        self$port <- parallelly::freePort() 
+        self$port <- parallelly::freePort()
         self$using_socket_con <- TRUE
+      } else if (use_shm()) {
+        self$using_shm <- TRUE
       }
       self$session <- callr::r_session$new()
     },
@@ -681,3 +825,7 @@ r_session <- R6::R6Class(
 use_socket_con <- function() {
   getOption("torch.dataloader_use_socket_con", FALSE)
 }
+
+use_shm <- function() {
+  getOption("torch.dataloader_use_shm", .Platform$OS.type != "windows")
+}
diff --git a/bench/bench-shm.R b/bench/bench-shm.R
new file mode 100644
index 0000000000..032b981288
--- /dev/null
+++ b/bench/bench-shm.R
@@ -0,0 +1,55 @@
+#!/usr/bin/env Rscript
+# Benchmark: POSIX shared memory IPC vs default callr pipe for dataloaders.
+# Measures only data transfer time (excludes worker startup).
+
+library(torch)
+
+make_ds <- function(n, p) {
+  dataset(
+    initialize = function() {
+      self$x <- matrix(rnorm(n * p), nrow = n, ncol = p)
+    },
+    .getitem = function(i) {
+      torch_tensor(self$x[i, ])
+    },
+    .length = function() { nrow(self$x) }
+  )
+}
+
+bench_transfer <- function(n, p, bs, nw, n_reps = 3) {
+  times <- numeric(n_reps)
+  for (r in seq_len(n_reps)) {
+    dl <- dataloader(make_ds(n, p)(), batch_size = bs, num_workers = nw)
+    iter <- dataloader_make_iter(dl)
+    # first batch warms up workers, discard it
+    dataloader_next(iter)
+    start <- proc.time()["elapsed"]
+    while (!is.null(dataloader_next(iter, completed = NULL))) { }
+    times[r] <- proc.time()["elapsed"] - start
+  }
+  median(times)
+}
+
+configs <- list(
+  list(n = 500, p = 1000, bs = 32, label = "500x1K, bs=32"),
+  list(n = 200, p = 50000, bs = 64, label = "200x50K, bs=64"),
+  list(n = 200, p = 100000, bs = 64, label = "200x100K, bs=64"),
+  list(n = 100, p = 500000, bs = 64, label = "100x500K, bs=64"),
+  list(n = 100, p = 1000000, bs = 32, label = "100x1M, bs=32")
+)
+
+cat("| Config | Default | SHM | Speedup | MB/batch |\n")
+cat("|---|---|---|---|---|\n")
+
+for (cfg in configs) {
+  batch_mb <- cfg$bs * cfg$p * 4 / 1024^2
+
+  options(torch.dataloader_use_shm = FALSE)
+  t_default <- bench_transfer(cfg$n, cfg$p, cfg$bs, 2)
+
+  options(torch.dataloader_use_shm = TRUE)
+  t_shm <- bench_transfer(cfg$n, cfg$p, cfg$bs, 2)
+
+  cat(sprintf("| %s | %.3fs | %.3fs | %.2fx | %.1f |\n",
+              cfg$label, t_default, t_shm, t_default / t_shm, batch_mb))
+}
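
The benchmark can be run from the package root with `Rscript bench/bench-shm.R`,
which is exactly what the CI step above does. To compare transports
interactively, the same option the script toggles can be set per session:

    options(torch.dataloader_use_shm = TRUE)   # the default on Unix
    options(torch.dataloader_use_shm = FALSE)  # force the serialization fallback
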
diff --git a/src/RcppExports.cpp b/src/RcppExports.cpp
index 08311053dc..b184fe3cf9 100644
--- a/src/RcppExports.cpp
+++ b/src/RcppExports.cpp
@@ -51622,6 +51622,52 @@ BEGIN_RCPP
     return rcpp_result_gen;
 END_RCPP
 }
+// cpp_tensor_to_shm
+Rcpp::List cpp_tensor_to_shm(torch::Tensor tensor);
+RcppExport SEXP _torch_cpp_tensor_to_shm(SEXP tensorSEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< torch::Tensor >::type tensor(tensorSEXP);
+    rcpp_result_gen = Rcpp::wrap(cpp_tensor_to_shm(tensor));
+    return rcpp_result_gen;
+END_RCPP
+}
+// cpp_tensor_from_shm
+torch::Tensor cpp_tensor_from_shm(std::string name, double nbytes_dbl, std::vector<int64_t> shape, XPtrTorchTensorOptions options);
+RcppExport SEXP _torch_cpp_tensor_from_shm(SEXP nameSEXP, SEXP nbytes_dblSEXP, SEXP shapeSEXP, SEXP optionsSEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< std::string >::type name(nameSEXP);
+    Rcpp::traits::input_parameter< double >::type nbytes_dbl(nbytes_dblSEXP);
+    Rcpp::traits::input_parameter< std::vector<int64_t> >::type shape(shapeSEXP);
+    Rcpp::traits::input_parameter< XPtrTorchTensorOptions >::type options(optionsSEXP);
+    rcpp_result_gen = Rcpp::wrap(cpp_tensor_from_shm(name, nbytes_dbl, shape, options));
+    return rcpp_result_gen;
+END_RCPP
+}
+// cpp_shm_exists
+bool cpp_shm_exists(std::string name);
+RcppExport SEXP _torch_cpp_shm_exists(SEXP nameSEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< std::string >::type name(nameSEXP);
+    rcpp_result_gen = Rcpp::wrap(cpp_shm_exists(name));
+    return rcpp_result_gen;
+END_RCPP
+}
+// cpp_shm_unlink
+void cpp_shm_unlink(std::string name);
+RcppExport SEXP _torch_cpp_shm_unlink(SEXP nameSEXP) {
+BEGIN_RCPP
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< std::string >::type name(nameSEXP);
+    cpp_shm_unlink(name);
+    return R_NilValue;
+END_RCPP
+}
 // cpp_torch_tensor_dtype
 Rcpp::XPtr<XPtrTorchDtype> cpp_torch_tensor_dtype(torch::Tensor x);
 RcppExport SEXP _torch_cpp_torch_tensor_dtype(SEXP xSEXP) {
@@ -56388,6 +56434,10 @@ static const R_CallMethodDef CallEntries[] = {
     {"_torch_cpp_torch_tensor_print", (DL_FUNC) &_torch_cpp_torch_tensor_print, 2},
     {"_torch_cpp_tensor_from_buffer", (DL_FUNC) &_torch_cpp_tensor_from_buffer, 3},
     {"_torch_cpp_buffer_from_tensor", (DL_FUNC) &_torch_cpp_buffer_from_tensor, 1},
+    {"_torch_cpp_tensor_to_shm", (DL_FUNC) &_torch_cpp_tensor_to_shm, 1},
+    {"_torch_cpp_tensor_from_shm", (DL_FUNC) &_torch_cpp_tensor_from_shm, 4},
+    {"_torch_cpp_shm_exists", (DL_FUNC) &_torch_cpp_shm_exists, 1},
+    {"_torch_cpp_shm_unlink", (DL_FUNC) &_torch_cpp_shm_unlink, 1},
    {"_torch_cpp_torch_tensor_dtype", (DL_FUNC) &_torch_cpp_torch_tensor_dtype, 1},
    {"_torch_torch_tensor_cpp", (DL_FUNC) &_torch_torch_tensor_cpp, 5},
    {"_torch_cpp_as_array", (DL_FUNC) &_torch_cpp_as_array, 1},
diff --git a/src/tensor.cpp b/src/tensor.cpp
index 390982d109..cf43366583 100644
--- a/src/tensor.cpp
+++ b/src/tensor.cpp
@@ -57,24 +57,45 @@ void* r_dataptr(SEXP x)
   case LGLSXP: p = (void*) LOGICAL(x); break;
   case INTSXP: p = (void*) INTEGER(x); break;
   case REALSXP: p = (void*) REAL(x); break;
-  case CPLXSXP: p = (void*) COMPLEX(x); break; 
+  case CPLXSXP: p = (void*) COMPLEX(x); break;
   case RAWSXP: p = (void*) RAW(x); break;
   case EXTPTRSXP: return (char*) R_ExternalPtrAddr(x); break;
   default: Rcpp::stop("invalid object type"); break;
   }
   if (p == NULL) Rcpp::stop("NULL address pointer");
-  return p; 
+  return p;
+}
+
+// Read-only variant that avoids triggering copy-on-write materialization
+// on ALTREP objects (e.g. mori shared memory vectors). Use this when
+// the caller only needs to read from the buffer (e.g. torch::from_blob).
+void* r_dataptr_ro(SEXP x)
+{
+  void* p;
+  switch(TYPEOF(x))
+  {
+  case CHARSXP: p = (void*) CHAR(x); break;
+  case LGLSXP:
+  case INTSXP:
+  case REALSXP:
+  case CPLXSXP:
+  case RAWSXP: p = (void*) DATAPTR_RO(x); break;
+  case EXTPTRSXP: return (char*) R_ExternalPtrAddr(x); break;
+  default: Rcpp::stop("invalid object type"); break;
+  }
+  if (p == NULL) Rcpp::stop("NULL address pointer");
+  return p;
 }
 
 // [[Rcpp::export]]
 torch::Tensor cpp_tensor_from_buffer(const SEXP& data, std::vector<int64_t> shape, XPtrTorchTensorOptions options) {
   return lantern_from_blob(
-    r_dataptr(data), 
-    &shape[0], 
-    shape.size(), 
+    r_dataptr_ro(data),
+    &shape[0],
+    shape.size(),
     // we use the default strides
     nullptr,
-    0, 
+    0,
     options.get()
   );
 }
@@ -88,6 +109,119 @@ SEXP cpp_buffer_from_tensor (torch::Tensor data) {
   return buffer;
 }
 
+#ifndef _WIN32
+#include <sys/mman.h>
+#include <fcntl.h>
+#include <unistd.h>
+
+static int shm_counter = 0;
+
+#endif // _WIN32
+
+// [[Rcpp::export]]
+Rcpp::List cpp_tensor_to_shm(torch::Tensor tensor) {
+#ifdef _WIN32
+  Rcpp::stop("SHM transport is not supported on Windows");
+  return Rcpp::List();
+#else
+  auto numel = lantern_Tensor_numel(tensor.get());
+  auto elem_size = lantern_Tensor_element_size(tensor.get());
+  size_t nbytes = static_cast<size_t>(numel) * static_cast<size_t>(elem_size);
+
+  // mmap with length 0 is invalid on POSIX — skip SHM for empty tensors
+  if (nbytes == 0) {
+    return Rcpp::List::create(
+      Rcpp::Named("name") = "",
+      Rcpp::Named("nbytes") = 0.0
+    );
+  }
+
+  std::string name = "/torch_" + std::to_string(getpid()) + "_" +
+    std::to_string(shm_counter++);
+
+  int fd = shm_open(name.c_str(), O_CREAT | O_RDWR, 0600);
+  if (fd < 0) Rcpp::stop("shm_open failed: %s", strerror(errno));
+
+  if (ftruncate(fd, nbytes) < 0) {
+    close(fd);
+    shm_unlink(name.c_str());
+    Rcpp::stop("ftruncate failed: %s", strerror(errno));
+  }
+
+  void* ptr = mmap(NULL, nbytes, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+  close(fd);
+  if (ptr == MAP_FAILED) {
+    shm_unlink(name.c_str());
+    Rcpp::stop("mmap failed: %s", strerror(errno));
+  }
+
+  // Single memcpy: tensor data -> SHM
+  lantern_buffer_from_tensor(tensor.get(), ptr, nbytes);
+  munmap(ptr, nbytes);
+
+  return Rcpp::List::create(
+    Rcpp::Named("name") = name,
+    Rcpp::Named("nbytes") = static_cast<double>(nbytes)
+  );
+#endif
+}
+
+// Map SHM, create tensor via from_blob, clone it, then munmap + unlink.
+// The clone gives the tensor its own storage, so there are no SHM lifetime
+// concerns: derived tensors ($view, $transpose, etc.) share the cloned
+// storage, not the SHM mapping.
+// [[Rcpp::export]]
+torch::Tensor cpp_tensor_from_shm(std::string name, double nbytes_dbl,
+                                  std::vector<int64_t> shape,
+                                  XPtrTorchTensorOptions options) {
+#ifdef _WIN32
+  Rcpp::stop("SHM transport is not supported on Windows");
+  return torch::Tensor();
+#else
+  size_t nbytes = static_cast<size_t>(nbytes_dbl);
+
+  int fd = shm_open(name.c_str(), O_RDONLY, 0);
+  if (fd < 0) Rcpp::stop("shm_open failed: %s", strerror(errno));
+
+  void* ptr = mmap(NULL, nbytes, PROT_READ, MAP_SHARED, fd, 0);
+  close(fd);
+  shm_unlink(name.c_str());
+
+  if (ptr == MAP_FAILED) Rcpp::stop("mmap failed: %s", strerror(errno));
+
+  // from_blob aliases the mapping, clone copies into tensor-owned storage
+  int64_t* shape_ptr = shape.empty() ? nullptr : &shape[0];
+  torch::Tensor view = lantern_from_blob(ptr, shape_ptr, shape.size(), nullptr, 0, options.get());
+  torch::Tensor owned = lantern_Tensor_clone(view.get());
+
+  munmap(ptr, nbytes);
+  return owned;
+#endif
+}
+
+// [[Rcpp::export]]
+bool cpp_shm_exists(std::string name) {
+#ifdef _WIN32
+  return false;
+#else
+  int fd = shm_open(name.c_str(), O_RDONLY, 0);
+  if (fd >= 0) {
+    close(fd);
+    return true;
+  }
+  return false;
+#endif
+}
+
+// [[Rcpp::export]]
+void cpp_shm_unlink(std::string name) {
+#ifdef _WIN32
+  Rcpp::stop("SHM transport is not supported on Windows");
+#else
+  shm_unlink(name.c_str());
+#endif
+}
+
 // [[Rcpp::export]]
 Rcpp::XPtr<XPtrTorchDtype> cpp_torch_tensor_dtype(torch::Tensor x) {
   XPtrTorchDtype out = lantern_Tensor_dtype(x.get());
@@ -119,7 +253,7 @@ torch::Tensor create_tensor_from_atomic(SEXP x, torch::Dtype cdtype) {
                              strides.size(), options.get());
   }
 
-  return lantern_from_blob(r_dataptr(x), &dim[0], dim.size(), &strides[0],
+  return lantern_from_blob(r_dataptr_ro(x), &dim[0], dim.size(), &strides[0],
                            strides.size(), options.get());
 }();
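
The create/consume protocol above (writer: shm_open, ftruncate, mmap, copy,
munmap; reader: shm_open, mmap, unlink, clone) can be observed from R through
the generated shims. A small sketch of the segment lifecycle, again using
unexported `torch:::` internals purely for illustration:

    t <- torch_randn(8)
    shm <- torch:::cpp_tensor_to_shm(t)
    torch:::cpp_shm_exists(shm$name)  # TRUE: segment outlives the writer's munmap
    out <- torch:::cpp_tensor_from_shm(shm$name, shm$nbytes, t$shape,
                                       list(dtype = "float"))
    torch:::cpp_shm_exists(shm$name)  # FALSE: the reader unlinked it on mapping
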
diff --git a/tests/testthat/test-utils-data-dataloader-shm.R b/tests/testthat/test-utils-data-dataloader-shm.R
new file mode 100644
index 0000000000..53446de7b4
--- /dev/null
+++ b/tests/testthat/test-utils-data-dataloader-shm.R
@@ -0,0 +1,92 @@
+test_that("all dataloader tests pass with SHM transport disabled", {
+  withr::local_options(torch.dataloader_use_shm = FALSE)
+  source(test_path("test-utils-data-dataloader.R"), local = TRUE)
+})
+
+test_that("all dataloader tests pass with SHM transport enabled", {
+  skip_on_os("windows")
+  withr::local_options(torch.dataloader_use_shm = TRUE)
+  source(test_path("test-utils-data-dataloader.R"), local = TRUE)
+})
+
+test_that("SHM segments from unconsumed batches are cleaned up", {
+  skip_on_os("windows")
+
+  # Create SHM segments as a worker would
+  t <- torch_randn(10)
+  shm1 <- cpp_tensor_to_shm(t)
+  shm2 <- cpp_tensor_to_shm(t)
+
+  expect_true(cpp_shm_exists(shm1$name))
+  expect_true(cpp_shm_exists(shm2$name))
+
+  # shm_unlink_recursive walks a shared result and unlinks all segments
+  result <- list(
+    structure(list(name = shm1$name, nbytes = shm1$nbytes, shape = 10L, dtype = "float"),
+              class = "torch_shared_tensor"),
+    structure(list(name = shm2$name, nbytes = shm2$nbytes, shape = 10L, dtype = "float"),
+              class = "torch_shared_tensor")
+  )
+
+  torch:::shm_unlink_recursive(result)
+
+  expect_false(cpp_shm_exists(shm1$name))
+  expect_false(cpp_shm_exists(shm2$name))
+})
+
+test_that("custom collate returning non-tensor objects works with SHM", {
+  skip_on_os("windows")
+  withr::local_options(torch.dataloader_use_shm = TRUE)
+
+  ds <- dataset(
+    initialize = function() {},
+    .getitem = function(i) {
+      list(x = torch_randn(3), label = paste0("item_", i))
+    },
+    .length = function() { 10 }
+  )
+
+  my_collate <- function(batch) {
+    sapply(batch, function(b) b$label)
+  }
+
+  dl <- dataloader(ds(), batch_size = 5, num_workers = 1, collate_fn = my_collate)
+  iter <- dataloader_make_iter(dl)
+  batch <- dataloader_next(iter)
+
+  expect_true(is.character(batch))
+  expect_equal(length(batch), 5)
+})
+
+test_that("SHM preserves list class and names through roundtrip", {
+  skip_on_os("windows")
+  withr::local_options(torch.dataloader_use_shm = TRUE)
+
+  ds <- dataset(
+    initialize = function() {
+      self$x <- matrix(rnorm(100), nrow = 10, ncol = 10)
+    },
+    .getitem = function(i) {
+      list(x = torch_tensor(self$x[i, ]), y = i)
+    },
+    .length = function() { 10 }
+  )
+
+  my_collate <- function(batch) {
+    out <- list(
+      x = torch_stack(lapply(batch, function(b) b$x)),
+      y = sapply(batch, function(b) b$y)
+    )
+    class(out) <- c("my_batch", "list")
+    out
+  }
+
+  dl <- dataloader(ds(), batch_size = 5, num_workers = 1, collate_fn = my_collate)
+  iter <- dataloader_make_iter(dl)
+  batch <- dataloader_next(iter)
+
+  expect_true(inherits(batch, "my_batch"))
+  expect_named(batch, c("x", "y"))
+  expect_tensor_shape(batch$x, c(5, 10))
+})
+
diff --git a/tests/testthat/test-utils-data-dataloader.R b/tests/testthat/test-utils-data-dataloader.R
index e2fdf2e85c..203d101e78 100644
--- a/tests/testthat/test-utils-data-dataloader.R
+++ b/tests/testthat/test-utils-data-dataloader.R
@@ -624,5 +624,221 @@ test_that("a case that errors in luz", {
   ds <- get_iterable_ds()
   dl <- dataloader(ds, batch_size = 32)
   expect_equal(length(coro::collect(dl)), 4)
-  
+
 })
+
+test_that("in-place ops work on multiworker dataloader batches", {
+  if (cuda_is_available()) skip_on_os("windows")
+
+  ds <- dataset(
+    initialize = function() {
+      self$x <- matrix(1:20, nrow = 4, ncol = 5)
+    },
+    .getitem = function(i) {
+      torch_tensor(self$x[i, ], dtype = torch_float())
+    },
+    .length = function() { 4 }
+  )
+
+  dl <- dataloader(ds(), batch_size = 2, num_workers = 1)
+  iter <- dataloader_make_iter(dl)
+  batch <- dataloader_next(iter)
+
+  expect_no_error(batch$add_(1))
+  expect_no_error(batch$div_(2))
+  expect_no_error(batch$mul_(3))
+})
+
+test_that("derived tensors survive GC of original batch", {
+  if (cuda_is_available()) skip_on_os("windows")
+
+  ds <- dataset(
+    initialize = function() {
+      self$x <- matrix(1:20, nrow = 4, ncol = 5)
+    },
+    .getitem = function(i) {
+      torch_tensor(self$x[i, ], dtype = torch_float())
+    },
+    .length = function() { 4 }
+  )
+
+  dl <- dataloader(ds(), batch_size = 2, num_workers = 1)
+  iter <- dataloader_make_iter(dl)
+  batch <- dataloader_next(iter)
+
+  # Derive a new tensor, drop the original, force GC
+  v <- batch$view(-1)
+  expected <- as.numeric(v)
+  rm(batch, iter, dl)
+  gc()
+
+  # v must still be valid — no segfault, correct values
+  expect_equal(as.numeric(v), expected)
+})
+
+test_that("zero-length tensors work with multiworker dataloader", {
+  if (cuda_is_available()) skip_on_os("windows")
+
+  ds <- dataset(
+    initialize = function() {},
+    .getitem = function(i) {
+      list(
+        x = torch_randn(5),
+        empty = torch_tensor(numeric(0))
+      )
+    },
+    .length = function() { 10 }
+  )
+
+  dl <- dataloader(ds(), batch_size = 5, num_workers = 1)
+  iter <- dataloader_make_iter(dl)
+  batch <- dataloader_next(iter)
+
+  expect_tensor_shape(batch$x, c(5, 5))
+  expect_equal(batch$empty$numel(), 0)
+})
+
+test_that("requires_grad is preserved through multiworker dataloader", {
+  if (cuda_is_available()) skip_on_os("windows")
+
+  ds <- dataset(
+    initialize = function() {
+      self$x <- matrix(rnorm(40), nrow = 4, ncol = 10)
+    },
+    .getitem = function(i) {
+      list(
+        x = torch_tensor(self$x[i, ])$requires_grad_(TRUE),
+        y = torch_tensor(self$x[i, ])
+      )
+    },
+    .length = function() { 4 }
+  )
+
+  dl <- dataloader(ds(), batch_size = 2, num_workers = 1)
+  iter <- dataloader_make_iter(dl)
+  batch <- dataloader_next(iter)
+
+  expect_true(batch$x$requires_grad)
+  expect_false(batch$y$requires_grad)
+})
+
+test_that("aliased tensors from custom collate preserve sharing through multiworker", {
+  if (cuda_is_available()) skip_on_os("windows")
+
+  ds <- dataset(
+    initialize = function() {
+      self$x <- matrix(rnorm(40), nrow = 4, ncol = 10)
+    },
+    .getitem = function(i) {
+      torch_tensor(self$x[i, ])
+    },
+    .length = function() { 4 }
+  )
+
+  # Collate that returns the same tensor in two slots
+  my_collate <- function(batch) {
+    t <- torch_stack(batch)
+    list(a = t, b = t)
+  }
+
+  dl <- dataloader(ds(), batch_size = 2, num_workers = 1, collate_fn = my_collate)
+  iter <- dataloader_make_iter(dl)
+  batch <- dataloader_next(iter)
+
+  # In-place update on a should be visible through b
+  batch$a$add_(100)
+  expect_equal(as.numeric(batch$a), as.numeric(batch$b))
+})
+
+test_that("all standard dtypes roundtrip through multiworker dataloader", {
+  if (cuda_is_available()) skip_on_os("windows")
+
+  ds <- dataset(
+    initialize = function() {
+      self$x <- rnorm(40)
+    },
+    .getitem = function(i) {
+      v <- self$x[((i-1)*10 + 1):(i*10)]
+      list(
+        float32 = torch_tensor(v, dtype = torch_float()),
+        float64 = torch_tensor(v, dtype = torch_double()),
+        int8 = torch_tensor(as.integer(v), dtype = torch_int8()),
+        uint8 = torch_tensor(abs(as.integer(v)), dtype = torch_uint8()),
+        int16 = torch_tensor(as.integer(v), dtype = torch_int16()),
+        int32 = torch_tensor(as.integer(v), dtype = torch_int32()),
+        int64 = torch_tensor(as.integer(v), dtype = torch_int64()),
+        bool = torch_tensor(v > 0, dtype = torch_bool())
+      )
+    },
+    .length = function() { 4 }
+  )
+
+  dl <- dataloader(ds(), batch_size = 2, num_workers = 1)
+  iter <- dataloader_make_iter(dl)
+  batch <- dataloader_next(iter)
+
+  expect_true(batch$float32$dtype == torch_float())
+  expect_true(batch$float64$dtype == torch_double())
+  expect_true(batch$int8$dtype == torch_int8())
+  expect_true(batch$uint8$dtype == torch_uint8())
+  expect_true(batch$int16$dtype == torch_int16())
+  expect_true(batch$int32$dtype == torch_int32())
+  expect_true(batch$int64$dtype == torch_int64())
+  expect_true(batch$bool$dtype == torch_bool())
+})
+
+test_that("scalar tensors work with multiworker dataloader", {
+  if (cuda_is_available()) skip_on_os("windows")
+
+  ds <- dataset(
+    initialize = function() {
+      self$x <- rnorm(10)
+    },
+    .getitem = function(i) {
+      torch_tensor(self$x[i])
+    },
+    .length = function() { 10 }
+  )
+
+  # Custom collate that reduces to a scalar
+  my_collate <- function(batch) {
+    torch_mean(torch_stack(batch))
+  }
+
+  dl <- dataloader(ds(), batch_size = 5, num_workers = 1, collate_fn = my_collate)
+  iter <- dataloader_make_iter(dl)
+  batch <- dataloader_next(iter)
+
+  expect_equal(length(batch$shape), 0) # 0-dim scalar
+  expect_true(is_torch_tensor(batch))
+})
+
+test_that("aliased non-contiguous views preserve sharing through multiworker", {
+  if (cuda_is_available()) skip_on_os("windows")
+
+  ds <- dataset(
+    initialize = function() {
+      self$x <- matrix(rnorm(40), nrow = 4, ncol = 10)
+    },
+    .getitem = function(i) {
+      torch_tensor(self$x[i, ])
+    },
+    .length = function() { 4 }
+  )
+
+  # Collate returns the same non-contiguous view in both slots
+  my_collate <- function(batch) {
+    t <- torch_stack(batch)
+    v <- t[, 1:5]
+    list(a = v, b = v)
+  }
+
+  dl <- dataloader(ds(), batch_size = 2, num_workers = 1, collate_fn = my_collate)
+  iter <- dataloader_make_iter(dl)
+  batch <- dataloader_next(iter)
+
+  batch$a$add_(100)
+  expect_equal(as.numeric(batch$a), as.numeric(batch$b))
+})
+
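
None of this changes the user-facing API; on Unix a typical multi-worker loop
picks the SHM transport automatically. A sketch of ordinary usage, written
with the public torch API:

    library(torch)
    ds <- tensor_dataset(x = torch_randn(100, 10))
    dl <- dataloader(ds, batch_size = 25, num_workers = 2)
    coro::loop(for (batch in dl) {
      # batch$x is an ordinary tensor backed by process-local storage;
      # the SHM segment behind it has already been copied from and unlinked
      print(batch$x$shape)
    })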