-
Notifications
You must be signed in to change notification settings - Fork 1.5k
feat: Support u32
indices for HashJoinExec
#16434
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
cc @Dandandan |
🤖 |
🤖: Benchmark completed Details
|
Those benchmarks make sense, just saves memory. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Adds support for using 32-bit row indices in the hash join executor when the build side has fewer than u32::MAX
rows to save memory.
- Introduce
JoinHashMapType
trait withJoinHashMapU32
andJoinHashMapU64
implementations - Change all callers and constructors of the old
JoinHashMap
to useBox<dyn JoinHashMapType>
- Update memory estimation to pick
u32
oru64
variant based on row count and add corresponding tests
Reviewed Changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.
Show a summary per file
File | Description |
---|---|
joins/utils.rs | Remove obsolete JoinHashMap export, keep only JoinHashMapType |
joins/symmetric_hash_join.rs | Update get_matched_indices calls to accept boxed iterator |
joins/stream_join_utils.rs | Implement JoinHashMapType for PruningJoinHashMap using new helpers |
joins/join_hash_map.rs | Define JoinHashMapType , JoinHashMapU32 , JoinHashMapU64 , and generic helpers |
joins/hash_join.rs | Construct boxed hash map variant, update memory estimation, and adjust tests |
Comments suppressed due to low confidence (3)
datafusion/physical-plan/src/joins/join_hash_map.rs:38
- [nitpick] Clarify this comment to indicate that the chained list uses either
Vec<u32>
orVec<u64>
rather than both. E.g., “stored as eitherVec<u32>
orVec<u64>
based on size requirements.”
/// The indices (values) are stored in a separate chained list stored as `Vec<u32>` `Vec<u64>`.
datafusion/physical-plan/src/joins/hash_join.rs:3581
- [nitpick] The test name now differs from the u32 variant (which uses
collisions
plural). For consistency, consider renaming both tests to match a common pattern, e.g.,join_with_hash_collision_u32
andjoin_with_hash_collision_u64
.
fn join_with_hash_collisions_u64() -> Result<()> {
datafusion/physical-plan/src/joins/join_hash_map.rs:93
- [nitpick] Add a doc comment to
JoinHashMapType
explaining its purpose and when each method should be used; this will help maintainers understand the runtime‐selected index strategy.
pub trait JoinHashMapType: Send + Sync {
// Flatten the deque | ||
let next: Vec<u64> = self.next.iter().copied().collect(); | ||
get_matched_indices::<u64>(&self.map, &next, iter, deleted_offset) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Collecting the VecDeque
into a new Vec<u64>
on every call can be expensive. Consider reusing a buffer or accessing the deque’s contiguous slice when possible to reduce allocations.
// Flatten the deque | |
let next: Vec<u64> = self.next.iter().copied().collect(); | |
get_matched_indices::<u64>(&self.map, &next, iter, deleted_offset) | |
// Access the deque's slices directly | |
let (head, tail) = self.next.as_slices(); | |
get_matched_indices::<u64>(&self.map, head, tail, iter, deleted_offset) |
Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems to be alittle bit difficult to do without changing function parameters
Which issue does this PR close?
Rationale for this change
We can use
u32
indices instead ofu64
indices when there are less thanu32::MAX
rows when building the hashmap. This acts as a memory optimizationWhat changes are included in this PR?
During
HashJoinExec
we construct theJoinLeftData
with aBox<dyn JoinHashMapType>
choosing between a u32 indice or u64JoinHashMap
.I changed the
JoinHashMapType
to hold theupdate_from_iter
,get_matched_indice
, andget_matched_indices_with_limit_offset
and split theJoinHashMap
intoJoinHashMapU32
andJoinHashMapU64
.I deliberately did not try to expose a generic in the trait, nor did I try to create a generic on the
JoinHashMap
struct as doing so would lead to many problems with having to call preceding functions with a generic; doing so is not possible since we are determining theJoinHashMapType
during runtime.Are these changes tested?
Yes I added a test for checking the hashmap created using u32 indices.