From 46afdd89a70a61b5a0212aa27c00a3331f596459 Mon Sep 17 00:00:00 2001 From: Tomoaki Kawada Date: Wed, 19 Feb 2025 19:18:05 +0900 Subject: [PATCH] Preserve null dictionary values in `interleave` and `concat` kernels (#7144) * fix(select): preserve null values in `merge_dictionary_values` This function internally computes value masks describing which values from input dictionaries should remain in the output. Values never referenced by keys are considered redundant. Null values were considered redundant, but they are now preserved as of this commit. This change is necessary because keys can reference null values. Before this commit, the entries of `MergedDictionaries::key_mappings` corresponding to null values were left unset. This caused `concat` and `interleave` to remap all elements referencing them to whatever value at index 0, producing an erroneous result. * test(select): add test case `concat::test_string_dictionary_array_nulls_in_values` This test case passes dictionary arrays containing null values (but no null keys) to `concat`. * test(select): add test case `interleave::test_interleave_dictionary_nulls` This test case passes two dictionary arrays each containing null values or keys to `interleave`. * refactor(select): add type alias for `Interner` bucket Addresses `clippy::type-complexity`. --- arrow-select/src/concat.rs | 18 +++++++++++++++ arrow-select/src/dictionary.rs | 41 ++++++++++++++++++++++++---------- arrow-select/src/interleave.rs | 24 ++++++++++++++++++++ 3 files changed, 71 insertions(+), 12 deletions(-) diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs index 1f453466dc9b..050f4ae96a8a 100644 --- a/arrow-select/src/concat.rs +++ b/arrow-select/src/concat.rs @@ -739,6 +739,24 @@ mod tests { ) } + #[test] + fn test_string_dictionary_array_nulls_in_values() { + let input_1_keys = Int32Array::from_iter_values([0, 2, 1, 3]); + let input_1_values = StringArray::from(vec![Some("foo"), None, Some("bar"), Some("fiz")]); + let input_1 = DictionaryArray::new(input_1_keys, Arc::new(input_1_values)); + + let input_2_keys = Int32Array::from_iter_values([0]); + let input_2_values = StringArray::from(vec![None, Some("hello")]); + let input_2 = DictionaryArray::new(input_2_keys, Arc::new(input_2_values)); + + let expected = vec![Some("foo"), Some("bar"), None, Some("fiz"), None]; + + let concat = concat(&[&input_1 as _, &input_2 as _]).unwrap(); + let dictionary = concat.as_dictionary::(); + let actual = collect_string_dictionary(dictionary); + assert_eq!(actual, expected); + } + #[test] fn test_string_dictionary_merge() { let mut builder = StringDictionaryBuilder::::new(); diff --git a/arrow-select/src/dictionary.rs b/arrow-select/src/dictionary.rs index c363b99920a7..57aed644fe0c 100644 --- a/arrow-select/src/dictionary.rs +++ b/arrow-select/src/dictionary.rs @@ -32,10 +32,13 @@ use arrow_schema::{ArrowError, DataType}; /// Hash collisions will result in replacement struct Interner<'a, V> { state: RandomState, - buckets: Vec>, + buckets: Vec>>, shift: u32, } +/// A single bucket in [`Interner`]. +type InternerBucket<'a, V> = (Option<&'a [u8]>, V); + impl<'a, V> Interner<'a, V> { /// Capacity controls the number of unique buckets allocated within the Interner /// @@ -54,7 +57,11 @@ impl<'a, V> Interner<'a, V> { } } - fn intern Result, E>(&mut self, new: &'a [u8], f: F) -> Result<&V, E> { + fn intern Result, E>( + &mut self, + new: Option<&'a [u8]>, + f: F, + ) -> Result<&V, E> { let hash = self.state.hash_one(new); let bucket_idx = hash >> self.shift; Ok(match &mut self.buckets[bucket_idx as usize] { @@ -151,15 +158,19 @@ pub fn merge_dictionary_values( for (idx, dictionary) in dictionaries.iter().enumerate() { let mask = masks.and_then(|m| m.get(idx)); - let key_mask = match (dictionary.logical_nulls(), mask) { - (Some(n), None) => Some(n.into_inner()), - (None, Some(n)) => Some(n.clone()), - (Some(n), Some(m)) => Some(n.inner() & m), + let key_mask_owned; + let key_mask = match (dictionary.nulls(), mask) { + (Some(n), None) => Some(n.inner()), + (None, Some(n)) => Some(n), + (Some(n), Some(m)) => { + key_mask_owned = n.inner() & m; + Some(&key_mask_owned) + } (None, None) => None, }; let keys = dictionary.keys().values(); let values = dictionary.values().as_ref(); - let values_mask = compute_values_mask(keys, key_mask.as_ref(), values.len()); + let values_mask = compute_values_mask(keys, key_mask, values.len()); let masked_values = get_masked_values(values, &values_mask); num_values += masked_values.len(); @@ -223,7 +234,10 @@ fn compute_values_mask( } /// Return a Vec containing for each set index in `mask`, the index and byte value of that index -fn get_masked_values<'a>(array: &'a dyn Array, mask: &BooleanBuffer) -> Vec<(usize, &'a [u8])> { +fn get_masked_values<'a>( + array: &'a dyn Array, + mask: &BooleanBuffer, +) -> Vec<(usize, Option<&'a [u8]>)> { match array.data_type() { DataType::Utf8 => masked_bytes(array.as_string::(), mask), DataType::LargeUtf8 => masked_bytes(array.as_string::(), mask), @@ -239,10 +253,13 @@ fn get_masked_values<'a>(array: &'a dyn Array, mask: &BooleanBuffer) -> Vec<(usi fn masked_bytes<'a, T: ByteArrayType>( array: &'a GenericByteArray, mask: &BooleanBuffer, -) -> Vec<(usize, &'a [u8])> { +) -> Vec<(usize, Option<&'a [u8]>)> { let mut out = Vec::with_capacity(mask.count_set_bits()); for idx in mask.set_indices() { - out.push((idx, array.value(idx).as_ref())) + out.push(( + idx, + array.is_valid(idx).then_some(array.value(idx).as_ref()), + )) } out } @@ -311,10 +328,10 @@ mod tests { let b = DictionaryArray::new(Int32Array::new_null(10), Arc::new(StringArray::new_null(0))); let merged = merge_dictionary_values(&[&a, &b], None).unwrap(); - let expected = StringArray::from(vec!["bingo", "hello"]); + let expected = StringArray::from(vec![None, Some("bingo"), Some("hello")]); assert_eq!(merged.values.as_ref(), &expected); assert_eq!(merged.key_mappings.len(), 2); - assert_eq!(&merged.key_mappings[0], &[0, 0, 0, 1, 0]); + assert_eq!(&merged.key_mappings[0], &[0, 0, 1, 2, 0]); assert_eq!(&merged.key_mappings[1], &[] as &[i32; 0]); } diff --git a/arrow-select/src/interleave.rs b/arrow-select/src/interleave.rs index 3557bda8f4c9..5fc019da78f1 100644 --- a/arrow-select/src/interleave.rs +++ b/arrow-select/src/interleave.rs @@ -441,6 +441,30 @@ mod tests { assert_eq!(&collected, &["c", "c", "c"]); } + #[test] + fn test_interleave_dictionary_nulls() { + let input_1_keys = Int32Array::from_iter_values([0, 2, 1, 3]); + let input_1_values = StringArray::from(vec![Some("foo"), None, Some("bar"), Some("fiz")]); + let input_1 = DictionaryArray::new(input_1_keys, Arc::new(input_1_values)); + let input_2: DictionaryArray = vec![None].into_iter().collect(); + + let expected = vec![Some("fiz"), None, None, Some("foo")]; + + let values = interleave( + &[&input_1 as _, &input_2 as _], + &[(0, 3), (0, 2), (1, 0), (0, 0)], + ) + .unwrap(); + let dictionary = values.as_dictionary::(); + let actual: Vec> = dictionary + .downcast_dict::() + .unwrap() + .into_iter() + .collect(); + + assert_eq!(actual, expected); + } + #[test] fn test_lists() { // [[1, 2], null, [3]]