Add POSIX shared memory transport for multi-worker dataloaders #1456
Conversation
When `options(torch.dataloader_use_mori = TRUE)`, the dataloader uses POSIX shared memory (shm_open/mmap) instead of callr's pipe or socket connections for transferring tensor data between worker processes and the main process. The SHM path does a single memcpy (tensor → SHM) on the producer and zero copies on the consumer (from_blob aliases the mapped memory), compared to serialize + pipe transfer + deserialize in the default path.

Changes:
- src/tensor.cpp: Add cpp_tensor_to_shm() and cpp_map_shm() for direct POSIX SHM operations. Add r_dataptr_ro() to avoid ALTREP COW when reading buffers via from_blob.
- R/utils-data-dataloader.R: Add SHM transport as a third option alongside callr pipe and socket connections, controlled by the torch.dataloader_use_mori option.
- bench/bench-shm.R: Benchmark comparing default vs. SHM transport.
- .github/workflows/main.yaml: Add SHM benchmark to CI.
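For orientation, a minimal sketch of the producer half of that path, written directly against POSIX and libtorch's C++ API. The real code routes through torch's internal C++ layer, so the function below (`tensor_to_shm`) and its error handling are illustrative assumptions, not the PR's actual `cpp_tensor_to_shm`:

```cpp
#include <fcntl.h>     // O_* flags
#include <sys/mman.h>  // shm_open, mmap, munmap, shm_unlink
#include <unistd.h>    // ftruncate, close
#include <cstring>     // memcpy
#include <stdexcept>
#include <string>
#include <torch/torch.h>

// Producer: copy a tensor's bytes into a fresh POSIX SHM segment.
// The memcpy below is the single producer-side copy the PR describes.
void tensor_to_shm(const torch::Tensor& t, const std::string& name) {
  torch::Tensor c = t.contiguous();  // consumer-side from_blob needs flat bytes
  size_t nbytes = c.nbytes();
  int fd = shm_open(name.c_str(), O_CREAT | O_EXCL | O_RDWR, 0600);
  if (fd < 0) throw std::runtime_error("shm_open failed");
  if (ftruncate(fd, nbytes) != 0) {  // size the segment before mapping
    close(fd); shm_unlink(name.c_str());
    throw std::runtime_error("ftruncate failed");
  }
  void* buf = mmap(nullptr, nbytes, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  close(fd);                         // the mapping keeps the segment alive
  if (buf == MAP_FAILED) {
    shm_unlink(name.c_str());
    throw std::runtime_error("mmap failed");
  }
  std::memcpy(buf, c.data_ptr(), nbytes);
  munmap(buf, nbytes);               // segment persists until the consumer unlinks it
}
```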
The implementation uses POSIX shared memory directly, not mori. Rename option to torch.dataloader_use_shm and internal variables accordingly.
The Rcpp-generated exports need function definitions on all platforms. Move the _WIN32 guard inside each function body so the symbols are always defined and the functions instead error at runtime on Windows. Skip SHM tests on Windows.
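The resulting pattern, sketched with an assumed signature (the body is elided; only the guard placement is the point):

```cpp
#include <Rcpp.h>

// Compiled on every platform, so the Rcpp-generated wrapper always
// finds this symbol at link time; Windows fails at call time instead.
// [[Rcpp::export]]
std::string cpp_tensor_to_shm(SEXP tensor) {
#ifdef _WIN32
  Rcpp::stop("POSIX shared memory is not available on Windows.");
#else
  // ... shm_open / ftruncate / mmap / memcpy path ...
  return "/torch_shm_segment";  // segment name handed back to R
#endif
}
```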
SHM is now the default for multi-worker dataloaders on non-Windows systems. Users can revert with options(torch.dataloader_use_shm = FALSE). The test file runs the full dataloader test suite twice: once with SHM disabled (legacy path) and once with SHM enabled.
- Map SHM with PROT_READ|PROT_WRITE + MAP_SHARED (MAP_PRIVATE is unsupported on macOS with POSIX SHM). Since we shm_unlink immediately after mapping, we are the sole owner and writes are safe.
- SHM is now the default on non-Windows systems.
- Add regression test for in-place tensor ops on SHM-backed batches.
- Test both SHM-enabled and SHM-disabled paths.
- Add NEWS bullet documenting the change and opt-out.
When iteration stops early (break, error, or dropping the iterator), prefetched tasks may have SHM segments that were never mapped by the consumer. The finalizer now drains outstanding tasks and calls shm_unlink on their segments to prevent /dev/shm leaks. Also fixes MAP_PRIVATE being unsupported on macOS: uses MAP_SHARED with an immediate shm_unlink to orphan the region, making writes safe since we are the sole owner.
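A sketch of the map-then-orphan pattern with illustrative names (the real mapping code sits in src/tensor.cpp):

```cpp
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>
#include <stdexcept>

// Map the segment MAP_SHARED (macOS rejects MAP_PRIVATE on POSIX SHM
// descriptors), then unlink the name right away. The mapping stays
// valid until munmap, but nobody else can shm_open it anymore, so
// this process is the sole owner and in-place writes cannot race.
void* map_and_orphan(const char* name, size_t nbytes) {
  int fd = shm_open(name, O_RDWR, 0);
  if (fd < 0) throw std::runtime_error("shm_open failed");
  void* buf = mmap(nullptr, nbytes, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  close(fd);
  shm_unlink(name);  // orphan: kernel frees the region once unmapped
  if (buf == MAP_FAILED) throw std::runtime_error("mmap failed");
  return buf;
}
```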
Adds cpp_shm_exists() helper for tests. The test creates SHM segments and verifies shm_unlink_recursive cleans them up — fails without the finalizer drain logic, passes with it.
mmap with size 0 is invalid on POSIX. Skip SHM creation for empty tensors (numel == 0) and reconstruct them directly on the consumer side. Adds regression test that exercises a dataset returning torch_tensor(numeric(0)) through the SHM dataloader.
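A sketch of the guard, under the assumption that the exporter returns an empty descriptor for zero-element tensors (names hypothetical):

```cpp
#include <optional>
#include <string>
#include <torch/torch.h>

// mmap() with length 0 fails with EINVAL, so zero-element tensors
// never touch SHM. Returning no segment name tells the consumer to
// rebuild the empty tensor locally from the shape/dtype metadata.
std::optional<std::string> maybe_export_to_shm(const torch::Tensor& t) {
  if (t.numel() == 0) return std::nullopt;  // consumer reconstructs
  // ... normal shm_open / ftruncate / mmap / memcpy path ...
  return std::string("/torch_shm_segment");
}
```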
When a custom collate_fn returns non-tensor, non-list objects (e.g. character vectors, scalars), the SHM path passes them through unchanged. from_exportable_tensor now only attempts rawConnection deserialization on raw vectors/connections, and passes everything else through as-is.
Use attributes(out) <- attributes(x) instead of a marker class to preserve names, custom classes, and other attributes through the SHM tensor replacement. from_exportable_tensor now always tries tensors_from_shared (no-op for non-shared objects) instead of checking for torch_shared_batch. Move SHM-specific regression tests (custom collate with non-tensor output, list class preservation) to the SHM test file since the old serialization path doesn't support these scenarios.
Replace cpp_map_shm + attribute-based lifetime with cpp_tensor_from_shm which does the full consumer path in C++: open → mmap → from_blob → clone → munmap. The clone gives the tensor its own storage, so derived tensors ($view, $transpose, etc.) share the cloned storage rather than aliasing a fragile SHM mapping. No external pointer or attribute needed. Total cost is still 2 memcpys (1 producer + 1 consumer clone), which is faster than the default serialize + pipe + deserialize path. Adds regression test: derive a tensor via $view(), GC the original batch, verify the derived tensor is still valid.
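A sketch of that consumer function with assumed names and signature; the open → mmap → from_blob → clone → munmap sequence is the one the commit describes:

```cpp
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>
#include <stdexcept>
#include <vector>
#include <torch/torch.h>

// Consumer path in one function. clone() copies the bytes into
// tensor-owned storage, so tensors later derived on the R side
// ($view, $transpose, ...) never alias the unmapped SHM region.
torch::Tensor tensor_from_shm(const char* name,
                              std::vector<int64_t> shape,
                              torch::ScalarType dtype,
                              size_t nbytes) {
  int fd = shm_open(name, O_RDWR, 0);
  if (fd < 0) throw std::runtime_error("shm_open failed");
  void* buf = mmap(nullptr, nbytes, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  close(fd);
  shm_unlink(name);  // sole-owner pattern from the earlier commit
  if (buf == MAP_FAILED) throw std::runtime_error("mmap failed");
  // from_blob aliases the mapping; clone() is the one consumer memcpy.
  torch::Tensor out =
      torch::from_blob(buf, shape, torch::TensorOptions().dtype(dtype)).clone();
  munmap(buf, nbytes);
  return out;
}
```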
The SHM descriptor now records requires_grad alongside shape and dtype. tensors_from_shared calls requires_grad_(TRUE) on reconstruction when the flag was set. Adds regression test with a dataset that returns both grad-enabled and grad-disabled tensors.
Memoize tensors_to_shared by xptr_address and tensors_from_shared by SHM name so that duplicate tensor references in a batch (e.g. from a custom collate returning list(a = t, b = t)) reconstruct to the same tensor, preserving storage sharing and in-place mutation visibility.
In containerized environments where /dev/shm is capped (e.g. Docker's default 64MB), shm_open/ftruncate can fail for large batches. to_exportable_tensor now catches SHM errors and falls back to the previous pipe-based serialization path transparently.
Check use_socket_con() before use_shm() so that users who set options(torch.dataloader_use_socket_con = TRUE) get socket transport as expected, without needing to also disable SHM.
Compute the memoization key from xptr_address(x) before calling contiguous(), which allocates a fresh tensor and changes the address. This preserves aliasing when a custom collate returns the same non-contiguous view in multiple slots.
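The memoization itself lives in the R layer, keyed by external-pointer address; a conceptual C++ translation of the same idea (identity key captured before the contiguous() copy), with all names assumed:

```cpp
#include <string>
#include <unordered_map>
#include <torch/torch.h>

// Conceptual sketch: memoize exports by tensor *identity* so a batch
// holding the same tensor in two slots produces one segment. The key
// is captured before contiguous(), which would otherwise allocate a
// fresh tensor and break aliasing for non-contiguous views.
std::string shared_name_for(
    const torch::Tensor& t,
    std::unordered_map<const void*, std::string>& memo) {
  const void* key = t.unsafeGetTensorImpl();  // identity, pre-contiguous
  auto it = memo.find(key);
  if (it != memo.end()) return it->second;    // duplicate slot: reuse segment
  torch::Tensor c = t.contiguous();           // may copy; key already taken
  std::string name = "/torch_shm_" + std::to_string(memo.size());
  // ... export c to SHM under `name` (see producer sketch above) ...
  memo.emplace(key, name);
  return name;
}
```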
If tensors_to_shared() fails after converting some tensors (e.g. /dev/shm fills up mid-batch), the error handler now walks the memo env and unlinks all segments created so far before re-raising. The outer tryCatch in to_exportable_tensor then catches the error and falls back to serialization without leaking SHM.
as.character() on some torch dtypes returns C++ names (Char, Byte, ComplexFloat, ComplexDouble) that torch_tensor_from_buffer doesn't accept. Add dtype_to_shm_string() mapping to canonical names (int8, uint8, cfloat, cdouble). Adds regression test covering all standard dtypes through the multiworker dataloader.
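A sketch of such a mapping over libtorch scalar types. The four renames called out above (Char→int8, Byte→uint8, ComplexFloat→cfloat, ComplexDouble→cdouble) come from the commit; the remaining cases are assumed canonical names:

```cpp
#include <torch/torch.h>
#include <stdexcept>
#include <string>

// Canonicalize dtype names for the SHM descriptor: map the C++-style
// names to strings the buffer-reconstruction path accepts.
std::string dtype_to_shm_string(torch::ScalarType t) {
  switch (t) {
    case torch::kChar:          return "int8";
    case torch::kByte:          return "uint8";
    case torch::kShort:         return "int16";
    case torch::kInt:           return "int32";
    case torch::kLong:          return "int64";
    case torch::kHalf:          return "float16";
    case torch::kFloat:         return "float32";
    case torch::kDouble:        return "float64";
    case torch::kBool:          return "bool";
    case torch::kComplexFloat:  return "cfloat";
    case torch::kComplexDouble: return "cdouble";
    default: throw std::runtime_error("unsupported dtype for SHM transport");
  }
}
```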
When shape is empty (scalar tensor), &shape[0] is undefined behavior. Pass nullptr instead, matching the pattern used by create_tensor_from_atomic. Adds regression test with a custom collate that reduces a batch to a scalar.
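The fix reduces to one well-defined pointer computation; a sketch:

```cpp
#include <cstdint>
#include <vector>

// &shape[0] on an empty std::vector indexes out of range, which is
// undefined behavior even if the callee never dereferences a size-0
// array. data() is well-defined and may return nullptr when empty.
const int64_t* shape_ptr(const std::vector<int64_t>& shape) {
  return shape.empty() ? nullptr : shape.data();
}
```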
Change poll_process timeout from 5s to -1 (indefinite) in the finalizer's task drain loop. A timeout would skip read() and shm_unlink_recursive(), leaking SHM segments for slow datasets. The worker will either finish or die when the session is closed.
Summary
The dataloader transfers tensors between workers and the main process through POSIX shared memory (shm_open/mmap), enabled via options(torch.dataloader_use_shm = TRUE). The consumer side is zero-copy (from_blob aliases the mapped memory), compared to serialize + pipe transfer + deserialize in the default path. Adds r_dataptr_ro() in the C++ layer to avoid triggering ALTREP copy-on-write when reading buffers via torch::from_blob.

Benchmark results (excluding worker startup)
Test plan