17
17
*
18
18
*/
19
19
20
- //! example function for concat recordbatch(may not work)
21
- //! ```rust
22
- //! # use arrow::record_batch::RecordBatch;
23
- //! # use arrow::error::Result;
24
- //!
25
- //! fn concat_batches(batch1: RecordBatch, batch2: RecordBatch) -> Result<RecordBatch> {
26
- //! let schema = batch1.schema();
27
- //! let columns = schema
28
- //! .fields()
29
- //! .iter()
30
- //! .enumerate()
31
- //! .map(|(i, _)| -> Result<_> {
32
- //! let array1 = batch1.column(i);
33
- //! let array2 = batch2.column(i);
34
- //! let array = arrow::compute::concat(&[array1.as_ref(), array2.as_ref()])?;
35
- //! Ok(array)
36
- //! })
37
- //! .collect::<Result<Vec<_>>>()?;
38
- //!
39
- //! RecordBatch::try_new(schema.clone(), columns)
40
- //! }
41
- //! ```
42
-
43
20
use std:: {
44
21
collections:: { HashMap , HashSet } ,
45
22
sync:: Arc ,
46
23
} ;
47
24
48
- use arrow_array:: {
49
- Array , ArrayRef , RecordBatch , StringArray , TimestampMillisecondArray , UInt64Array ,
50
- } ;
25
+ use arrow_array:: { ArrayRef , RecordBatch , StringArray , TimestampMillisecondArray , UInt64Array } ;
51
26
use arrow_schema:: { ArrowError , DataType , Field , Schema , TimeUnit } ;
52
27
use arrow_select:: take:: take;
53
28
use chrono:: { DateTime , Utc } ;
@@ -62,31 +37,6 @@ use serde_json::{Map, Value};
62
37
63
38
use crate :: event:: DEFAULT_TIMESTAMP_KEY ;
64
39
65
- /// Replaces columns in a record batch with new arrays.
66
- ///
67
- /// # Arguments
68
- ///
69
- /// * `schema` - The schema of the record batch.
70
- /// * `batch` - The record batch to modify.
71
- /// * `indexes` - The indexes of the columns to replace.
72
- /// * `arrays` - The new arrays to replace the columns with.
73
- ///
74
- /// # Returns
75
- ///
76
- /// The modified record batch with the columns replaced.
77
- pub fn replace_columns (
78
- schema : Arc < Schema > ,
79
- batch : & RecordBatch ,
80
- indexes : & [ usize ] ,
81
- arrays : & [ Arc < dyn Array + ' static > ] ,
82
- ) -> RecordBatch {
83
- let mut batch_arrays = batch. columns ( ) . iter ( ) . map ( Arc :: clone) . collect_vec ( ) ;
84
- for ( & index, arr) in indexes. iter ( ) . zip ( arrays. iter ( ) ) {
85
- batch_arrays[ index] = Arc :: clone ( arr) ;
86
- }
87
- RecordBatch :: try_new ( schema, batch_arrays) . unwrap ( )
88
- }
89
-
90
40
/// Converts a slice of record batches to JSON.
91
41
///
92
42
/// # Arguments
@@ -213,40 +163,11 @@ pub fn reverse(rb: &RecordBatch) -> RecordBatch {
213
163
mod tests {
214
164
use std:: sync:: Arc ;
215
165
216
- use arrow_array:: { Array , Int32Array , RecordBatch } ;
217
- use arrow_schema:: { DataType , Field , Schema } ;
166
+ use arrow_array:: RecordBatch ;
167
+ use arrow_schema:: Schema ;
218
168
219
169
use super :: * ;
220
170
221
- #[ test]
222
- fn check_replace ( ) {
223
- let schema = Schema :: new ( vec ! [
224
- Field :: new( "a" , DataType :: Int32 , false ) ,
225
- Field :: new( "b" , DataType :: Int32 , false ) ,
226
- Field :: new( "c" , DataType :: Int32 , false ) ,
227
- ] ) ;
228
-
229
- let schema_ref = Arc :: new ( schema) ;
230
-
231
- let rb = RecordBatch :: try_new (
232
- schema_ref. clone ( ) ,
233
- vec ! [
234
- Arc :: new( Int32Array :: from_value( 0 , 3 ) ) ,
235
- Arc :: new( Int32Array :: from_value( 0 , 3 ) ) ,
236
- Arc :: new( Int32Array :: from_value( 0 , 3 ) ) ,
237
- ] ,
238
- )
239
- . unwrap ( ) ;
240
-
241
- let arr: Arc < dyn Array + ' static > = Arc :: new ( Int32Array :: from_value ( 0 , 3 ) ) ;
242
-
243
- let new_rb = replace_columns ( schema_ref. clone ( ) , & rb, & [ 2 ] , & [ arr] ) ;
244
-
245
- assert_eq ! ( new_rb. schema( ) , schema_ref) ;
246
- assert_eq ! ( new_rb. num_columns( ) , 3 ) ;
247
- assert_eq ! ( new_rb. num_rows( ) , 3 )
248
- }
249
-
250
171
#[ test]
251
172
fn check_empty_json_to_record_batches ( ) {
252
173
let r = RecordBatch :: new_empty ( Arc :: new ( Schema :: empty ( ) ) ) ;
0 commit comments