44//! CUDA stream utility functions.
55
66use std:: fmt:: Debug ;
7+ use std:: mem:: size_of;
8+ use std:: mem:: size_of_val;
79use std:: ops:: Deref ;
810use std:: sync:: Arc ;
911
1012use cudarc:: driver:: CudaSlice ;
1113use cudarc:: driver:: CudaStream ;
1214use cudarc:: driver:: DeviceRepr ;
15+ use cudarc:: driver:: ValidAsZeroBits ;
1316use cudarc:: driver:: result:: stream;
1417use futures:: future:: BoxFuture ;
1518use kanal:: Sender ;
1619use tracing:: warn;
1720use vortex:: array:: buffer:: BufferHandle ;
1821use vortex:: error:: VortexResult ;
22+ use vortex:: error:: vortex_ensure;
1923use vortex:: error:: vortex_err;
2024
2125use crate :: CudaDeviceBuffer ;
@@ -62,22 +66,32 @@ impl VortexCudaStream {
6266 /// synchronously before returning. For **pinned** host memory the transfer
6367 /// is truly async and the source must stay alive until the copy completes
6468 /// (guaranteed by the returned future capturing it).
69+ ///
70+ /// The returned [`BufferHandle`] keeps the source byte length, while its
71+ /// CUDA allocation may include zeroed tail padding. This is needed for
72+ /// Arrow validity buffers passed to cuDF, which reads masks as 32-bit words.
6573 pub ( crate ) fn copy_to_device < T , D > (
6674 & self ,
6775 data : D ,
6876 ) -> VortexResult < BoxFuture < ' static , VortexResult < BufferHandle > > >
6977 where
70- T : DeviceRepr + Debug + Send + Sync + ' static ,
78+ T : DeviceRepr + ValidAsZeroBits + Debug + Send + Sync + ' static ,
7179 D : AsRef < [ T ] > + Send + ' static ,
7280 {
7381 let host_slice: & [ T ] = data. as_ref ( ) ;
82+ let byte_count = size_of_val ( host_slice) ;
83+ let allocation_len = padded_device_allocation_len :: < T > ( byte_count) ?;
7484 // `device_alloc` binds the CUDA context to the current thread.
75- let mut cuda_slice: CudaSlice < T > = self . device_alloc ( host_slice . len ( ) ) ?;
85+ let mut cuda_slice: CudaSlice < T > = self . device_alloc :: < T > ( allocation_len ) ?;
7686
77- self . memcpy_htod ( host_slice, & mut cuda_slice)
87+ let mut values = cuda_slice. slice_mut ( ..host_slice. len ( ) ) ;
88+ self . memcpy_htod ( host_slice, & mut values)
7889 . map_err ( |e| vortex_err ! ( "Failed to schedule H2D copy: {}" , e) ) ?;
7990
91+ zero_padding ( self , & mut cuda_slice, host_slice. len ( ) ) ?;
92+
8093 let cuda_buf = CudaDeviceBuffer :: new ( cuda_slice) ;
94+ let buffer = BufferHandle :: new_device ( Arc :: new ( cuda_buf) ) . slice ( 0 ..byte_count) ;
8195 let stream = Arc :: clone ( & self . 0 ) ;
8296
8397 Ok ( Box :: pin ( async move {
@@ -86,7 +100,7 @@ impl VortexCudaStream {
86100 // Keep source memory alive until copy completes.
87101 let _keep_alive = data;
88102
89- Ok ( BufferHandle :: new_device ( Arc :: new ( cuda_buf ) ) )
103+ Ok ( buffer )
90104 } ) )
91105 }
92106
@@ -99,20 +113,62 @@ impl VortexCudaStream {
99113 /// For **pageable** host memory (the common case), `memcpy_htod` stages
100114 /// the source into a driver-managed pinned buffer before returning, so
101115 /// the source data is safe to drop after this call.
116+ ///
117+ /// Like [`copy_to_device`](Self::copy_to_device), this preserves the source
118+ /// byte length on the returned handle while keeping any tail padding in the
119+ /// backing CUDA allocation.
102120 pub ( crate ) fn copy_to_device_sync < T > ( & self , data : & [ T ] ) -> VortexResult < BufferHandle >
103121 where
104- T : DeviceRepr + Debug + Send + Sync + ' static ,
122+ T : DeviceRepr + ValidAsZeroBits + Debug + Send + Sync + ' static ,
105123 {
106- let mut cuda_slice: CudaSlice < T > = self . device_alloc ( data. len ( ) ) ?;
124+ let byte_count = size_of_val ( data) ;
125+ let allocation_len = padded_device_allocation_len :: < T > ( byte_count) ?;
126+ let mut cuda_slice: CudaSlice < T > = self . device_alloc ( allocation_len) ?;
107127
108- self . memcpy_htod ( data, & mut cuda_slice)
128+ let mut values = cuda_slice. slice_mut ( ..data. len ( ) ) ;
129+ self . memcpy_htod ( data, & mut values)
109130 . map_err ( |e| vortex_err ! ( "Failed to schedule H2D copy: {}" , e) ) ?;
110131
132+ zero_padding ( self , & mut cuda_slice, data. len ( ) ) ?;
133+
111134 let cuda_buf = CudaDeviceBuffer :: new ( cuda_slice) ;
112- Ok ( BufferHandle :: new_device ( Arc :: new ( cuda_buf) ) )
135+ Ok ( BufferHandle :: new_device ( Arc :: new ( cuda_buf) ) . slice ( 0 ..byte_count ) )
113136 }
114137}
115138
139+ /// Returns the typed CUDA allocation length for `byte_count`.
140+ ///
141+ /// The backing allocation is padded for cuDF's 32-bit validity mask reads.
142+ /// The returned length is in `T` elements.
143+ fn padded_device_allocation_len < T > ( byte_count : usize ) -> VortexResult < usize > {
144+ let element_size = size_of :: < T > ( ) ;
145+ vortex_ensure ! (
146+ element_size != 0 ,
147+ "cannot copy zero-sized values to CUDA device"
148+ ) ;
149+ let min_allocation_bytes = byte_count. next_multiple_of ( size_of :: < u32 > ( ) ) ;
150+ Ok ( min_allocation_bytes. div_ceil ( element_size) )
151+ }
152+
153+ /// Zeroes the allocation tail after the copied values.
154+ ///
155+ /// Returned handles are sliced to the copied byte count; the trailing padding
156+ /// exists so a final 32-bit mask read stays within the backing allocation.
157+ fn zero_padding < T : DeviceRepr + ValidAsZeroBits > (
158+ stream : & VortexCudaStream ,
159+ cuda_slice : & mut CudaSlice < T > ,
160+ copied_len : usize ,
161+ ) -> VortexResult < ( ) > {
162+ if copied_len >= cuda_slice. len ( ) {
163+ return Ok ( ( ) ) ;
164+ }
165+
166+ let mut padding = cuda_slice. slice_mut ( copied_len..) ;
167+ stream
168+ . memset_zeros ( & mut padding)
169+ . map_err ( |e| vortex_err ! ( "Failed to zero device buffer padding: {}" , e) )
170+ }
171+
116172/// Registers a callback and asynchronously waits for its completion.
117173///
118174/// This function can be used to asynchronously wait for events previously
@@ -191,3 +247,47 @@ fn register_stream_callback(stream: &CudaStream) -> VortexResult<kanal::AsyncRec
191247
192248 Ok ( rx. to_async ( ) )
193249}
250+
#[cfg(test)]
mod tests {
    use vortex::error::VortexResult;
    use vortex::session::VortexSession;

    use super::padded_device_allocation_len;
    use crate::CudaSession;

    /// Byte counts are rounded up to whole 32-bit words and reported in elements.
    #[test]
    fn test_padded_device_allocation_len() -> VortexResult<()> {
        // (byte_count, expected element count) for one-byte elements.
        for (byte_count, expected) in [(0, 0), (1, 4), (4, 4), (5, 8)] {
            assert_eq!(padded_device_allocation_len::<u8>(byte_count)?, expected);
        }

        // (byte_count, expected element count) for four-byte elements.
        for (byte_count, expected) in [(1, 1), (5, 2)] {
            assert_eq!(padded_device_allocation_len::<u32>(byte_count)?, expected);
        }

        Ok(())
    }

    /// The async copy exposes only the source byte, not the padded allocation.
    #[crate::test]
    async fn test_copy_to_device_preserves_visible_len_with_padding() -> VortexResult<()> {
        let ctx = CudaSession::create_execution_ctx(&VortexSession::empty())?;
        let source = vec![0xab_u8];
        let device_handle = ctx.stream().copy_to_device(source)?.await?;

        assert_eq!(device_handle.len(), 1);

        // Round-trip back to the host and check the visible contents.
        let round_tripped = device_handle.try_to_host()?.await?;
        assert_eq!(round_tripped.as_slice(), &[0xab]);

        Ok(())
    }

    /// The sync copy exposes exactly the five source bytes.
    #[crate::test]
    async fn test_copy_to_device_sync_preserves_visible_len_with_padding() -> VortexResult<()> {
        let ctx = CudaSession::create_execution_ctx(&VortexSession::empty())?;
        let device_handle = ctx.stream().copy_to_device_sync(&[1_u8, 2, 3, 4, 5])?;

        assert_eq!(device_handle.len(), 5);

        // Round-trip back to the host and check the visible contents.
        let round_tripped = device_handle.try_to_host()?.await?;
        assert_eq!(round_tripped.as_slice(), &[1, 2, 3, 4, 5]);

        Ok(())
    }
}
0 commit comments