Skip to content

Commit 978e29a

Browse files
committed
Add distributed::reduce() algorithm
1 parent 2f9ba14 commit 978e29a

File tree

3 files changed

+571
-0
lines changed

3 files changed

+571
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,395 @@
1+
//---------------------------------------------------------------------------//
2+
// Copyright (c) 2016 Jakub Szuppe <[email protected]>
3+
//
4+
// Distributed under the Boost Software License, Version 1.0
5+
// See accompanying file LICENSE_1_0.txt or copy at
6+
// http://www.boost.org/LICENSE_1_0.txt
7+
//
8+
// See http://boostorg.github.com/compute for more information.
9+
//---------------------------------------------------------------------------//
10+
11+
#ifndef BOOST_COMPUTE_DISTRIBUTED_REDUCE_HPP
12+
#define BOOST_COMPUTE_DISTRIBUTED_REDUCE_HPP
13+
14+
#include <vector>
15+
16+
#include <boost/utility/enable_if.hpp>
17+
18+
#include <boost/compute/buffer.hpp>
19+
#include <boost/compute/algorithm/reduce.hpp>
20+
#include <boost/compute/algorithm/copy_n.hpp>
21+
#include <boost/compute/iterator/buffer_iterator.hpp>
22+
#include <boost/compute/type_traits/is_device_iterator.hpp>
23+
24+
#include <boost/compute/distributed/command_queue.hpp>
25+
#include <boost/compute/distributed/vector.hpp>
26+
27+
namespace boost {
28+
namespace compute {
29+
namespace distributed {
30+
31+
namespace detail {
32+
33+
template<class OutputIterator>
34+
inline ::boost::compute::command_queue&
35+
final_reduce_queue(OutputIterator result,
36+
command_queue &queue,
37+
typename boost::enable_if_c<
38+
!is_device_iterator<OutputIterator>::value
39+
>::type* = 0)
40+
{
41+
(void) result;
42+
43+
::boost::compute::command_queue& device_queue = queue.get(0);
44+
// CPU device is preferred, however if there is none, the first device
45+
// queue is used
46+
for(size_t i = 0; i < queue.size(); i++)
47+
{
48+
if(queue.get(i).get_device().type() & ::boost::compute::device::cpu)
49+
{
50+
device_queue = queue.get(i);
51+
break;
52+
}
53+
}
54+
return device_queue;
55+
}
56+
57+
template<class OutputIterator>
58+
inline ::boost::compute::command_queue&
59+
final_reduce_queue(OutputIterator result,
60+
command_queue &queue,
61+
typename boost::enable_if_c<
62+
is_device_iterator<OutputIterator>::value
63+
>::type* = 0)
64+
{
65+
// first, find all queues that can be used with result iterator
66+
const ::boost::compute::context& result_context =
67+
result.get_buffer().get_context();
68+
std::vector<size_t> compatible_queues;
69+
for(size_t i = 0; i < queue.size(); i++)
70+
{
71+
if(queue.get(i).get_context() == result_context)
72+
{
73+
compatible_queues.push_back(i);
74+
}
75+
}
76+
BOOST_ASSERT_MSG(
77+
!compatible_queues.empty(),
78+
"There is no device command queue that can be use to copy to result"
79+
);
80+
81+
// then choose device queue from compatible device queues
82+
83+
// CPU device is preferred, however if there is none, the first
84+
// compatible device queue is used
85+
::boost::compute::command_queue& device_queue = queue.get(compatible_queues[0]);
86+
for(size_t i = 0; i < compatible_queues.size(); i++)
87+
{
88+
size_t n = compatible_queues[i];
89+
if(queue.get(n).get_device().type() & ::boost::compute::device::cpu)
90+
{
91+
device_queue = queue.get(n);
92+
break;
93+
}
94+
}
95+
return device_queue;
96+
}
97+
98+
template<
99+
class InputType, weight_func weight, class Alloc,
100+
class OutputIterator,
101+
class BinaryFunction
102+
>
103+
inline void
104+
dispatch_reduce(const vector<InputType, weight, Alloc> &input,
105+
OutputIterator result,
106+
BinaryFunction function,
107+
command_queue &queue)
108+
{
109+
typedef typename
110+
boost::compute::result_of<BinaryFunction(InputType, InputType)>::type
111+
result_type;
112+
113+
// find device queue for the final reduction
114+
::boost::compute::command_queue& device_queue =
115+
final_reduce_queue(result, queue);
116+
117+
::boost::compute::buffer parts_results_device(
118+
device_queue.get_context(), input.parts() * sizeof(result_type)
119+
);
120+
121+
// if all devices queues are in the same OpenCL context we can
122+
// save part reduction directly into parts_results_device buffer
123+
size_t reduced = 0;
124+
if(queue.one_context())
125+
{
126+
// reduce each part of input vector
127+
for(size_t i = 0; i < input.parts(); i++)
128+
{
129+
if(input.begin(i) != input.end(i))
130+
{
131+
// async, because it stores result on device
132+
::boost::compute::reduce(
133+
input.begin(i),
134+
input.end(i),
135+
::boost::compute::make_buffer_iterator<result_type>(
136+
parts_results_device, reduced
137+
),
138+
function,
139+
queue.get(i)
140+
);
141+
reduced++;
142+
}
143+
}
144+
// add marker on every queue that is not device_queue, because
145+
// we need to know when reductions are done
146+
wait_list reduce_markers;
147+
reduce_markers.reserve(reduced);
148+
for(size_t i = 0; i < input.parts(); i++)
149+
{
150+
if(input.begin(i) != input.end(i) && queue.get(i) != device_queue)
151+
{
152+
reduce_markers.insert(queue.get(i).enqueue_marker());
153+
}
154+
}
155+
// if it is possible we enqueue a barrier in device_queue which waits
156+
// for reduce_markers (we can do this since all events are in the same
157+
// context); otherwise, we need to sync. wait for those events
158+
#ifdef CL_VERSION_1_2
159+
if(device_queue.check_device_version(1, 2)) {
160+
device_queue.enqueue_barrier(reduce_markers);
161+
}
162+
#endif
163+
{
164+
reduce_markers.wait();
165+
}
166+
}
167+
else
168+
{
169+
// reduce each part of input vector
170+
std::vector<result_type> parts_results_host(input.parts());
171+
for(size_t i = 0; i < input.parts(); i++)
172+
{
173+
if(input.begin(i) != input.end(i))
174+
{
175+
// sync, because it stores result on host
176+
::boost::compute::reduce(
177+
input.begin(i),
178+
input.end(i),
179+
&parts_results_host[reduced],
180+
function,
181+
queue.get(i)
182+
);
183+
reduced++;
184+
}
185+
}
186+
// sync, because it copies from host to device
187+
::boost::compute::copy_n(
188+
parts_results_host.begin(),
189+
reduced,
190+
::boost::compute::make_buffer_iterator<result_type>(
191+
parts_results_device
192+
),
193+
device_queue
194+
);
195+
}
196+
// final reduction
197+
// async if result is device_iterator, sync otherwise
198+
::boost::compute::reduce(
199+
::boost::compute::make_buffer_iterator<result_type>(
200+
parts_results_device
201+
),
202+
::boost::compute::make_buffer_iterator<result_type>(
203+
parts_results_device, reduced
204+
),
205+
result,
206+
function,
207+
device_queue
208+
);
209+
}
210+
211+
// special case for when OutputIterator is a host iterator
212+
// and binary operator is plus<T>
213+
template<
214+
class InputType, weight_func weight, class Alloc,
215+
class OutputIterator,
216+
class T
217+
>
218+
inline void
219+
dispatch_reduce(const vector<InputType, weight, Alloc> &input,
220+
OutputIterator result,
221+
::boost::compute::plus<T> function,
222+
command_queue &queue,
223+
typename boost::enable_if_c<
224+
!is_device_iterator<OutputIterator>::value
225+
>::type* = 0)
226+
{
227+
// reduce each part of input vector
228+
std::vector<T> parts_results_host(input.parts());
229+
for(size_t i = 0; i < input.parts(); i++)
230+
{
231+
::boost::compute::reduce(
232+
input.begin(i),
233+
input.end(i),
234+
&parts_results_host[i],
235+
function,
236+
queue.get(i)
237+
);
238+
}
239+
240+
// final reduction
241+
*result = parts_results_host[0];
242+
for(size_t i = 1; i < input.parts(); i++)
243+
{
244+
*result += static_cast<T>(parts_results_host[i]);
245+
}
246+
}
247+
248+
// special case for when OutputIterator is a host iterator
249+
// and binary operator is min<T>
250+
template<
251+
class InputType, weight_func weight, class Alloc,
252+
class OutputIterator,
253+
class T
254+
>
255+
inline void
256+
dispatch_reduce(vector<InputType, weight, Alloc> &input,
257+
OutputIterator result,
258+
::boost::compute::min<T> function,
259+
command_queue &queue,
260+
typename boost::enable_if_c<
261+
!is_device_iterator<OutputIterator>::value
262+
>::type* = 0)
263+
{
264+
// reduce each part of input vector
265+
std::vector<T> parts_results_host(input.parts());
266+
for(size_t i = 0; i < input.parts(); i++)
267+
{
268+
::boost::compute::reduce(
269+
input.begin(i),
270+
input.end(i),
271+
&parts_results_host[i],
272+
function,
273+
queue.get(i)
274+
);
275+
}
276+
277+
// final reduction
278+
*result = parts_results_host[0];
279+
for(size_t i = 1; i < input.parts(); i++)
280+
{
281+
*result = (std::min)(static_cast<T>(*result), parts_results_host[i]);
282+
}
283+
}
284+
285+
// special case for when OutputIterator is a host iterator
286+
// and binary operator is max<T>
287+
template<
288+
class InputType, weight_func weight, class Alloc,
289+
class OutputIterator,
290+
class T
291+
>
292+
inline void
293+
dispatch_reduce(const vector<InputType, weight, Alloc> &input,
294+
OutputIterator result,
295+
::boost::compute::max<T> function,
296+
command_queue &queue,
297+
typename boost::enable_if_c<
298+
!is_device_iterator<OutputIterator>::value
299+
>::type* = 0)
300+
{
301+
// reduce each part of input vector
302+
std::vector<T> parts_results_host(input.parts());
303+
for(size_t i = 0; i < input.parts(); i++)
304+
{
305+
::boost::compute::reduce(
306+
input.begin(i),
307+
input.end(i),
308+
&parts_results_host[i],
309+
function,
310+
queue.get(i)
311+
);
312+
}
313+
314+
// final reduction
315+
*result = parts_results_host[0];
316+
for(size_t i = 1; i < input.parts(); i++)
317+
{
318+
*result = (std::max)(static_cast<T>(*result), parts_results_host[i]);
319+
}
320+
}
321+
322+
} // end detail namespace
323+
324+
/// Returns the result of applying \p function to the elements in the
325+
/// \p input vector.
326+
///
327+
/// If no function is specified, \c plus will be used.
328+
///
329+
/// \param input input vector
330+
/// \param result iterator pointing to the output
331+
/// \param function binary reduction function
332+
/// \param queue distributed command queue to perform the operation
333+
///
334+
/// Distributed command queue \p queue has to span the same set of compute devices
335+
/// (including their contexts) as distributed command queue used to create
336+
/// \p input vector.
337+
///
338+
/// If \p result is a device iterator, its underlying buffer must be allocated
339+
/// in the context of at least one device command queue from \p queue.
340+
///
341+
/// The \c reduce() algorithm assumes that the binary reduction function is
342+
/// associative. When used with non-associative functions the result may
343+
/// be non-deterministic and vary in precision. Notably this affects the
344+
/// \c plus<float>() function as floating-point addition is not associative
345+
/// and may produce slightly different results than a serial algorithm.
346+
///
347+
/// This algorithm supports both host and device iterators for the
348+
/// result argument. This allows for values to be reduced and copied
349+
/// to the host all with a single function call.
350+
template<
351+
class InputType, weight_func weight, class Alloc,
352+
class OutputIterator,
353+
class BinaryFunction
354+
>
355+
inline void
356+
reduce(const vector<InputType, weight, Alloc> &input,
357+
OutputIterator result,
358+
BinaryFunction function,
359+
command_queue &queue)
360+
{
361+
if(input.empty()) {
362+
return;
363+
}
364+
365+
if(input.parts() == 1) {
366+
::boost::compute::reduce(
367+
input.begin(0),
368+
input.end(0),
369+
result,
370+
function,
371+
queue.get(0)
372+
);
373+
return;
374+
}
375+
detail::dispatch_reduce(input, result, function, queue);
376+
}
377+
378+
/// \overload
379+
template<
380+
class InputType, weight_func weight, class Alloc,
381+
class OutputIterator
382+
>
383+
inline void
384+
reduce(const vector<InputType, weight, Alloc> &input,
385+
OutputIterator result,
386+
command_queue &queue)
387+
{
388+
return reduce(input, result, ::boost::compute::plus<InputType>(), queue);
389+
}
390+
391+
} // end distributed namespace
392+
} // end compute namespace
393+
} // end boost namespace
394+
395+
#endif /* BOOST_COMPUTE_DISTRIBUTED_REDUCE_HPP */

0 commit comments

Comments
 (0)