Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -346,15 +346,6 @@ namespace hpx::parallel {
hpx::future<Iter> parallel_partial_sort(ExPolicy&& policy, Iter first,
Iter middle, Iter last, std::uint32_t level, Comp&& comp = Comp());

HPX_CXX_EXPORT struct sort_thread_helper
{
template <typename... Ts>
decltype(auto) operator()(Ts&&... ts) const
{
return sort_thread(HPX_FORWARD(Ts, ts)...);
}
};

HPX_CXX_EXPORT struct parallel_partial_sort_helper
{
template <typename... Ts>
Expand Down Expand Up @@ -399,15 +390,15 @@ namespace hpx::parallel {
policy.parameters(), policy.executor(),
hpx::chrono::null_duration, cores, nelem);

hpx::future<Iter> left = execution::async_execute(
policy.executor(), sort_thread_helper(), policy, first,
c_last, comp, chunk_size);
hpx::future<Iter> left =
execution::async_execute(policy.executor(), sort_thread{},
policy, first, c_last, comp, chunk_size);

hpx::future<Iter> right;
if (middle != c_last)
{
right = execution::async_execute(policy.executor(),
parallel_partial_sort_helper(), policy, c_last + 1,
parallel_partial_sort_helper{}, policy, c_last + 1,
middle, last, level - 1, comp);
}
else
Expand Down
139 changes: 70 additions & 69 deletions libs/core/algorithms/include/hpx/parallel/algorithms/sort.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,51 +187,37 @@ namespace hpx::parallel {

// \brief this function is the work assigned to each thread in the
// parallel process
HPX_CXX_EXPORT template <typename ExPolicy, typename RandomIt,
typename Comp>
hpx::future<RandomIt> sort_thread(ExPolicy&& policy, RandomIt first,
RandomIt last, Comp comp, std::size_t chunk_size)
HPX_CXX_EXPORT struct sort_thread
{
std::ptrdiff_t const N = last - first;
if (static_cast<std::size_t>(N) <= chunk_size)
template <typename ExPolicy, typename RandomIt, typename Comp>
hpx::future<RandomIt> operator()(ExPolicy&& policy, RandomIt first,
RandomIt last, Comp comp, std::size_t chunk_size)
{
return execution::async_execute(policy.executor(),
[first, last, comp = HPX_MOVE(comp)]() -> RandomIt {
std::sort(first, last, comp);
return last;
});
}
std::ptrdiff_t const N = last - first;
if (static_cast<std::size_t>(N) <= chunk_size)
{
return execution::async_execute(policy.executor(),
[first, last, comp = HPX_MOVE(comp)]() -> RandomIt {
std::sort(first, last, comp);
return last;
});
}

// check if sorted
if (detail::is_sorted_sequential(first, last, comp))
{
return hpx::make_ready_future(last);
}
// check if sorted
if (detail::is_sorted_sequential(first, last, comp))
{
return hpx::make_ready_future(last);
}

// pivot selections
pivot9(first, last, comp);
// pivot selections
pivot9(first, last, comp);

using reference =
typename std::iterator_traits<RandomIt>::reference;
using reference =
typename std::iterator_traits<RandomIt>::reference;

reference val = *first;
RandomIt c_first = first + 1, c_last = last - 1;
reference val = *first;
RandomIt c_first = first + 1, c_last = last - 1;

while (comp(*c_first, val))
{
++c_first;
}
while (comp(val, *c_last))
{
--c_last;
}
while (c_first < c_last)
{
#if defined(HPX_HAVE_CXX20_STD_RANGES_ITER_SWAP)
std::ranges::iter_swap(c_first++, c_last--);
#else
std::iter_swap(c_first++, c_last--);
#endif
while (comp(*c_first, val))
{
++c_first;
Expand All @@ -240,40 +226,56 @@ namespace hpx::parallel {
{
--c_last;
}
}

while (c_first < c_last)
{
#if defined(HPX_HAVE_CXX20_STD_RANGES_ITER_SWAP)
std::ranges::iter_swap(first, c_last);
std::ranges::iter_swap(c_first++, c_last--);
#else
std::iter_swap(first, c_last);
std::iter_swap(c_first++, c_last--);
#endif

// spawn tasks for each sub section
hpx::future<RandomIt> left = execution::async_execute(
policy.executor(), &sort_thread<ExPolicy, RandomIt, Comp>,
policy, first, c_last, comp, chunk_size);

hpx::future<RandomIt> right = execution::async_execute(
policy.executor(), &sort_thread<ExPolicy, RandomIt, Comp>,
policy, c_first, last, comp, chunk_size);

return hpx::dataflow(
[last](hpx::future<RandomIt>&& leftf,
hpx::future<RandomIt>&& rightf) -> RandomIt {
if (leftf.has_exception() || rightf.has_exception())
while (comp(*c_first, val))
{
std::list<std::exception_ptr> errors;
if (leftf.has_exception())
errors.push_back(leftf.get_exception_ptr());
if (rightf.has_exception())
errors.push_back(rightf.get_exception_ptr());

throw exception_list(HPX_MOVE(errors));
++c_first;
}
return last;
},
HPX_MOVE(left), HPX_MOVE(right));
}
while (comp(val, *c_last))
{
--c_last;
}
}

#if defined(HPX_HAVE_CXX20_STD_RANGES_ITER_SWAP)
std::ranges::iter_swap(first, c_last);
#else
std::iter_swap(first, c_last);
#endif

// spawn tasks for each subsection
hpx::future<RandomIt> left =
execution::async_execute(policy.executor(), *this, policy,
first, c_last, comp, chunk_size);

hpx::future<RandomIt> right =
execution::async_execute(policy.executor(), *this, policy,
c_first, last, comp, chunk_size);

return hpx::dataflow(
[last](hpx::future<RandomIt>&& leftf,
hpx::future<RandomIt>&& rightf) -> RandomIt {
if (leftf.has_exception() || rightf.has_exception())
{
std::list<std::exception_ptr> errors;
if (leftf.has_exception())
errors.push_back(leftf.get_exception_ptr());
if (rightf.has_exception())
errors.push_back(rightf.get_exception_ptr());

throw exception_list(HPX_MOVE(errors));
}
return last;
},
HPX_MOVE(left), HPX_MOVE(right));
}
};

// policy : execution policy
// [in] first iterator to the first element to sort
Expand Down Expand Up @@ -324,8 +326,7 @@ namespace hpx::parallel {
return hpx::make_ready_future(last);
}

return execution::async_execute(policy.executor(),
&sort_thread<std::decay_t<ExPolicy>, RandomIt, Comp>,
return execution::async_execute(policy.executor(), sort_thread{},
HPX_FORWARD(ExPolicy, policy), first, last,
HPX_FORWARD(Comp, comp), chunk_size);
}
Expand Down
Loading
Loading