Skip to content

Commit 2426ec8

Browse files
committed
feat(swc_parallel): Add parallel iterators
1 parent 29bd286 commit 2426ec8

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+19389
-0
lines changed
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
use super::{plumbing::*, *};
2+
3+
struct BlocksCallback<S, C> {
4+
sizes: S,
5+
consumer: C,
6+
len: usize,
7+
}
8+
9+
impl<T, S, C> ProducerCallback<T> for BlocksCallback<S, C>
10+
where
11+
C: UnindexedConsumer<T>,
12+
S: Iterator<Item = usize>,
13+
{
14+
type Output = C::Result;
15+
16+
fn callback<P: Producer<Item = T>>(mut self, mut producer: P) -> Self::Output {
17+
let mut remaining_len = self.len;
18+
let mut consumer = self.consumer;
19+
20+
// we need a local variable for the accumulated results
21+
// we call the reducer's identity by splitting at 0
22+
let (left_consumer, right_consumer, _) = consumer.split_at(0);
23+
let mut leftmost_res = left_consumer.into_folder().complete();
24+
consumer = right_consumer;
25+
26+
// now we loop on each block size
27+
while remaining_len > 0 && !consumer.full() {
28+
// we compute the next block's size
29+
let size = self.sizes.next().unwrap_or(usize::MAX);
30+
let capped_size = remaining_len.min(size);
31+
remaining_len -= capped_size;
32+
33+
// split the producer
34+
let (left_producer, right_producer) = producer.split_at(capped_size);
35+
producer = right_producer;
36+
37+
// split the consumer
38+
let (left_consumer, right_consumer, _) = consumer.split_at(capped_size);
39+
consumer = right_consumer;
40+
41+
leftmost_res = consumer.to_reducer().reduce(
42+
leftmost_res,
43+
bridge_producer_consumer(capped_size, left_producer, left_consumer),
44+
);
45+
}
46+
leftmost_res
47+
}
48+
}
49+
50+
/// `ExponentialBlocks` is a parallel iterator that consumes itself as a
51+
/// sequence of parallel blocks of increasing sizes (exponentially).
52+
///
53+
/// This struct is created by the [`by_exponential_blocks()`] method on
54+
/// [`IndexedParallelIterator`]
55+
///
56+
/// [`by_exponential_blocks()`]: trait.IndexedParallelIterator.html#method.by_exponential_blocks
57+
/// [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html
58+
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
59+
#[derive(Debug, Clone)]
60+
pub struct ExponentialBlocks<I> {
61+
base: I,
62+
}
63+
64+
impl<I> ExponentialBlocks<I> {
65+
pub(super) fn new(base: I) -> Self {
66+
Self { base }
67+
}
68+
}
69+
70+
impl<I> ParallelIterator for ExponentialBlocks<I>
71+
where
72+
I: IndexedParallelIterator,
73+
{
74+
type Item = I::Item;
75+
76+
fn drive_unindexed<C>(self, consumer: C) -> C::Result
77+
where
78+
C: UnindexedConsumer<Self::Item>,
79+
{
80+
let first = crate::current_num_threads();
81+
let callback = BlocksCallback {
82+
consumer,
83+
sizes: std::iter::successors(Some(first), exponential_size),
84+
len: self.base.len(),
85+
};
86+
self.base.with_producer(callback)
87+
}
88+
}
89+
90+
fn exponential_size(size: &usize) -> Option<usize> {
91+
Some(size.saturating_mul(2))
92+
}
93+
94+
/// `UniformBlocks` is a parallel iterator that consumes itself as a sequence
95+
/// of parallel blocks of constant sizes.
96+
///
97+
/// This struct is created by the [`by_uniform_blocks()`] method on
98+
/// [`IndexedParallelIterator`]
99+
///
100+
/// [`by_uniform_blocks()`]: trait.IndexedParallelIterator.html#method.by_uniform_blocks
101+
/// [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html
102+
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
103+
#[derive(Debug, Clone)]
104+
pub struct UniformBlocks<I> {
105+
base: I,
106+
block_size: usize,
107+
}
108+
109+
impl<I> UniformBlocks<I> {
110+
pub(super) fn new(base: I, block_size: usize) -> Self {
111+
Self { base, block_size }
112+
}
113+
}
114+
115+
impl<I> ParallelIterator for UniformBlocks<I>
116+
where
117+
I: IndexedParallelIterator,
118+
{
119+
type Item = I::Item;
120+
121+
fn drive_unindexed<C>(self, consumer: C) -> C::Result
122+
where
123+
C: UnindexedConsumer<Self::Item>,
124+
{
125+
let callback = BlocksCallback {
126+
consumer,
127+
sizes: std::iter::repeat(self.block_size),
128+
len: self.base.len(),
129+
};
130+
self.base.with_producer(callback)
131+
}
132+
}

0 commit comments

Comments
 (0)