// Patch summary (commentary only; text before the first "diff --git" header is not part of the patch).
//
// src/ucx/communicator.hpp (hunk at line 99, inside communicator_impl):
//   - Replaces the unbounded `while (ucp_worker_progress(m_recv_worker->get())) {}` loop with a
//     loop of at most 10 iterations. Each iteration calls ucp_worker_progress() only if
//     m_mutex.try_lock() succeeds, unlocks immediately afterwards, and leaves the loop as soon as
//     the call returns zero.
//   - An iteration whose try_lock() fails makes no progress call but still consumes one of the
//     10 attempts.
//   - The old loop is kept as a commented-out line.
//
// src/ucx/context.hpp (hunks at lines 195 and 212, inside context_impl):
//   - Moves `if (m_thread_safe) m_mutex.unlock();` from directly after the progress loop to after
//     the loop that pushes the entries of m_cancel_recv_req_vec back into m_cancel_recv_req_queue,
//     so the code between the two hunks now runs with the mutex held when m_thread_safe is set.
//   - The first hunk replaces the removed unlock line with an empty `+` line.
//
// NOTE(review): try_lock() in communicator.hpp is unconditional, while context.hpp only locks
//   when m_thread_safe is set -- confirm this asymmetry is intended and that both files refer to
//   the same mutex.
// NOTE(review): the lines between the two context.hpp hunks are not visible here -- verify there
//   is no early return or throwing call in that range, since it now executes before the unlock.
// NOTE(review): both files use manual lock()/unlock() pairs; a scoped guard would presumably be
//   safer if any code in the locked region can exit early -- TODO confirm.
// NOTE(review): the limit of 10 attempts is a bare literal and the commented-out `while` line is
//   dead code -- consider a named constant and dropping the old line before merging.
diff --git a/src/ucx/communicator.hpp b/src/ucx/communicator.hpp index 9321bc00..d74550f4 100644 --- a/src/ucx/communicator.hpp +++ b/src/ucx/communicator.hpp @@ -99,7 +99,16 @@ class communicator_impl : public communicator_base } else { - while (ucp_worker_progress(m_recv_worker->get())) {} + for (unsigned int i=0; i<10; ++i) + { + if (m_mutex.try_lock()) + { + auto p = ucp_worker_progress(m_recv_worker->get()); + m_mutex.unlock(); + if (!p) break; + } + } + //while (ucp_worker_progress(m_recv_worker->get())) {} } // work through ready recv callbacks, which were pushed to the queue by other threads // (including this thread) diff --git a/src/ucx/context.hpp b/src/ucx/context.hpp index 30197370..0dfa3542 100644 --- a/src/ucx/context.hpp +++ b/src/ucx/context.hpp @@ -195,7 +195,7 @@ class context_impl : public context_base if (m_thread_safe) m_mutex.lock(); ucp_request_cancel(m_worker->get(), s->m_ucx_ptr); while (ucp_worker_progress(m_worker->get())) {} - if (m_thread_safe) m_mutex.unlock(); + // check whether the cancelled callback was enqueued by consuming all queued cancelled // callbacks and putting them in a temporary vector bool found = false; @@ -212,6 +212,8 @@ class context_impl : public context_base for (auto x : m_cancel_recv_req_vec) while (!m_cancel_recv_req_queue.push(x)) {} + if (m_thread_safe) m_mutex.unlock(); + // delete callback here if it was actually cancelled if (found) {