Added schedule_exchange() and schedule_wait() to communication_object #190

Merged
msimberg merged 91 commits into ghex-org:master from philip-paul-mueller:phimuell__async-mpi-2
Mar 17, 2026

Conversation

@philip-paul-mueller
Collaborator

@philip-paul-mueller philip-paul-mueller commented Dec 18, 2025

This PR adds the schedule_exchange() and schedule_wait() functions to the communication object.
They behave like the regular exchange() and wait(), but they accept an additional CUDA stream as an argument.

schedule_exchange() delays packing until all work that has been scheduled on the passed stream has finished, which removes the need for external synchronization.
It is important to note that the function only returns after all data has been sent, which is the same behaviour as exchange().

schedule_wait() is similar to wait(): it launches unpacking but synchronizes it with the passed stream.
This means that any work submitted to the stream after the function has returned will not start until all unpacking has completed.
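To make the ordering concrete, here is a toy, single-threaded model of the guarantees described above. This is not GHEX code: `ToyStream` and all operation names are illustrative stand-ins so the ordering can be shown without a GPU.

```python
class ToyStream:
    """A work queue standing in for a CUDA stream."""
    def __init__(self):
        self.ops = []

    def launch(self, name):
        self.ops.append(name)

    def drain(self):
        done, self.ops = self.ops, []
        return done


log = []

def schedule_exchange(stream):
    # Packing waits until all previously launched stream work has finished.
    log.extend(f"finished:{op}" for op in stream.drain())
    log.append("pack")
    # Like exchange(), this only returns after the data has been sent.
    log.append("send")

def schedule_wait(stream):
    # Unpacking is put on the stream: work launched afterwards only runs
    # once unpacking has completed.
    stream.launch("unpack")


s = ToyStream()
s.launch("compute_halo")   # GPU work producing the halo data
schedule_exchange(s)       # compute_halo is finished before packing starts
schedule_wait(s)
s.launch("consume_halo")   # ordered after "unpack" on the stream
print(log)                 # ['finished:compute_halo', 'pack', 'send']
print(s.ops)               # ['unpack', 'consume_halo']
```

The point of the sketch is the two orderings: prior stream work completes before packing, and later stream work is queued behind unpacking.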

The PR also extends the Python bindings.
The bindings are able to interpret the following Python objects:

  • None is interpreted as the default stream, i.e. nullptr.
  • If the object has a __cuda_stream__ method, it is assumed to follow Nvidia's stream protocol.
  • If the object has a .ptr attribute, it is assumed to follow CuPy's Stream implementation.
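A minimal sketch of how such a dispatch could look. The helper name, the encoding of the default stream as 0, and the assumption that `__cuda_stream__` returns a (version, handle) pair are illustrative; the real binding code lives in bindings/python/src/_pyghex/ and may differ in detail.

```python
def stream_handle(obj):
    """Map a Python stream object to a raw stream handle (an integer).

    Illustrative sketch of the three rules above, checked in order.
    """
    if obj is None:
        return 0  # default stream, i.e. nullptr on the C++ side
    if hasattr(obj, "__cuda_stream__"):
        # Nvidia's stream protocol (assumed to yield a (version, handle) pair).
        _version, handle = obj.__cuda_stream__()
        return handle
    if hasattr(obj, "ptr"):
        # CuPy's Stream stores the raw handle in its `ptr` attribute.
        return obj.ptr
    raise TypeError(f"cannot interpret {type(obj).__name__} as a CUDA stream")


class FakeCuPyStream:  # mimics CuPy's interface for the sketch
    ptr = 42

print(stream_handle(None))             # 0
print(stream_handle(FakeCuPyStream())) # 42
```

An object exposing both interfaces would be handled by the stream protocol first, matching the order of the rules above.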

Note:
For CPU memory, schedule_exchange() and schedule_wait() behave exactly like exchange() and wait(), i.e. exchange() starts packing immediately and wait() only returns after unpacking has finished.
If CPU and GPU memory are exchanged in the same transaction, the behaviour is a mix of both: the CPU parts are packed immediately, but the packing of the GPU memory synchronizes with the stream.
The same holds for schedule_wait(), which only returns once the CPU memory has been unpacked, but returns as soon as the unpacking of the GPU memory has been initiated.
What exactly happens also depends on whether the CPU memory is processed before the GPU memory.
Thus it is safe to mix the two, but it is not recommended.


NOTE:

  • This PR replaces PR #186 with a refactored solution.
  • The most important changes are in the following files:
    • include/ghex/communication_object.hpp: Adds the schedule_*() functions.
    • include/ghex/device/cuda/stream.hpp: Adds a (very simple) event pool.
    • bindings/python/src/_pyghex/unstructured/communication_object.cpp: Updates the bindings.
  • This PR depends on PR #189.
  • This PR depends on PR #191, which adds formatting.

TODO:

  • This PR still has the wrong formatting applied to it.
  • Test on ICON4Py production setup.
  • Ask Fabian if we should add CuPy as a dependency for the binding tests, so that we can also run the GPU tests.

msimberg and others added 30 commits November 3, 2025 12:48
*
* TODO: Should the handle expose this function?
*/
void complete_schedule_exchange()
Collaborator

What do you think about making this private and if a caller needs to synchronize after a schedule_wait, just make them call wait instead?

schedule_exchange(stream);
schedule_wait(stream);
// now I really want to synchronize
wait();

IMO this would simplify the API in that wait can always be called as a wait-for-everything-to-finish regardless of how the exchange or wait was done previously. wait may of course need to call complete_schedule_exchange internally (or parts of it) for this approach to work.

Collaborator Author

We should consider it.

Collaborator Author

@philip-paul-mueller philip-paul-mueller Jan 7, 2026

I have thought about it again.

Regarding your example: while I can understand why it should work (in fact I also think it must work), I do not think that it is the right (whatever that means) way to achieve it.
The scheduled functions essentially introduce a kind of stream semantics into GHEX.
So I would argue that to check whether the transmission has finished, one should synchronize with the stream passed to schedule_exchange().
The only things GHEX must do are to not start a new exchange before the old one has finished, and to not delete something that is still in use by an ongoing transfer, which is what complete_schedule_exchange() does.
So I think you are right that it should become private, as it should probably never be called directly by a user.
Instead, all exchange*() functions call it to make sure that the exchange has happened and it is safe to start a new one.
Because wait() deallocates memory, it must also call it to make sure that it does not delete something that is still in use.
As a side effect, your example code will work and do what you want, i.e. the full synchronization.
But, as I outlined above, in my opinion it is not the right way of doing it, although it must work.

Does this make sense?

Comment on lines +835 to +837
#ifdef GHEX_CUDACC
assert(has_scheduled_exchange());
#endif
Collaborator

This doesn't belong here, or the logic is wrong elsewhere. clear() is called by complete_scheduled_exchange after the event has been reset, meaning this always fails. Should this be assert(!has_scheduled_exchange())?

Collaborator Author

@philip-paul-mueller philip-paul-mueller Jan 9, 2026

You are right, it should be assert(!...). I am just wondering why the CI "passes".
Probably because asserts are disabled?

* exchange. A user will never have to call it directly. If there was no such
* exchange or GPU support was disabled, the function does nothing.
*
* \note This should be a private function, but the tests need them.
Collaborator

Why do the tests need this function?


Collaborator

@boeschf boeschf left a comment

First of all, thank you for the many improvements! This is a much needed addition.

There are a few nitpicks on which I commented. Feel free to postpone or ignore them.

In general, there are a few missing pieces still, which could be addressed in the future:

  • only the unstructured part is exposed to Python, but not the structured
  • the bulk_communication_object could perhaps also benefit from the enriched interface

Overall, the communication object becomes quite an unwieldy beast, and we should start thinking about moving to a true async runtime.

if (!m_moved) { GHEX_CHECK_CUDA_RESULT_NO_THROW(cudaEventDestroy(m_event)) }
}

operator bool() const noexcept { return m_moved; }
Collaborator

I agree that the semantics are backwards here - this should be changed everywhere, but perhaps in another PR.

Comment on lines +260 to +263
// Make sure that communication has finished and we can deallocate
// the buffers. Maybe the call to `clear()` is too much here and
// we should only wait.
complete_schedule_exchange();
Collaborator

comments seem not to match the code

Collaborator Author

The part about clear() was about what happens inside the complete_schedule_exchange() and was more like a note to myself.

Comment on lines +615 to +616
template<bool UseAsyncStream, typename... StreamType>
void pack_and_send_impl(StreamType&&... sync_streams)
Collaborator

It seems the template parameter pack is only needed to allow for either zero or exactly one argument - this is not really nice, and entails the whole array expansion below. I would simplify this (enable_if for example), but I am ok with this considering this is an implementation detail.

Collaborator Author

@philip-paul-mueller philip-paul-mueller Mar 16, 2026

I did it to allow multiple streams, but realized that it was a bit hard to do, so I abandoned this path.

I have now separated the two implementations; it is a little bit of code duplication, but not much.

@philip-paul-mueller
Collaborator Author

@boeschf Thanks for your comments.
I think we should discuss the more general ones tomorrow in person.

Collaborator

@boeschf boeschf left a comment

thanks, looks good to me

@msimberg
Collaborator

I will merge this now to prepare a new release. Further cleanups are still useful, but will be part of #195 and other PRs for the release after this one.

@msimberg msimberg merged commit 871b380 into ghex-org:master Mar 17, 2026
11 of 12 checks passed
@msimberg msimberg mentioned this pull request Mar 17, 2026
philip-paul-mueller added a commit to C2SM/icon4py that referenced this pull request Mar 18, 2026
This PR introduces the [scheduled exchange
feature](ghex-org/GHEX#190) from GHEX into
ICON4Py.

These exchanges allow calling the exchange function before all work has
been completed, i.e. the exchange will wait until the previous work is
done. A similar feature is the "scheduled wait", which allows initiating
the receive without the need to wait for its completion.

In addition, this PR also renamed the functions related to
halo exchange:
- `exchange()` was renamed to `start()`.
- `wait()` was renamed to `finish()` (which might now return before the
transfer has fully concluded).
- `exchange_and_wait()` was renamed to `exchange()`.

All of these functions now accept an argument called `stream`,
which defaults to `DEFAULT_STREAM`. It indicates how synchronization
with the stream should be performed.
In the case of `start()` it means that the actual exchange should not
start until all work previously submitted to `stream` has finished. For
`finish()` it means that further work, submitted to `stream`, should not
start until the exchange has ended. For `finish()` it is also possible
to specify `BLOCK`, which means that `finish()` waits until the transfer
has fully finished.
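Under these renames, usage might look roughly as follows. Only the names `start`, `finish`, `exchange`, `DEFAULT_STREAM` and `BLOCK` come from the description above; the class itself is a recording stand-in, not the ICON4Py implementation.

```python
DEFAULT_STREAM = object()  # sentinel stand-ins for the real constants
BLOCK = object()

class HaloExchange:
    """Stand-in that records calls; the real class talks to GHEX."""
    def __init__(self):
        self.calls = []

    def start(self, *fields, stream=DEFAULT_STREAM):
        # the exchange does not start until prior work on `stream` is done
        self.calls.append(("start", len(fields)))

    def finish(self, stream=DEFAULT_STREAM):
        # later work on `stream` waits for unpacking; BLOCK waits fully
        self.calls.append(("finish", "block" if stream is BLOCK else "stream"))

    def exchange(self, *fields, stream=DEFAULT_STREAM):
        # the old exchange_and_wait(): start plus a fully blocking finish
        self.start(*fields, stream=stream)
        self.finish(stream=BLOCK)


ex = HaloExchange()
ex.exchange("field_a", "field_b")
print(ex.calls)   # [('start', 2), ('finish', 'block')]
```

The sketch shows the mapping: the old blocking exchange decomposes into `start()` plus a `BLOCK`-ing `finish()`, while the stream-synchronized path uses `start(stream=...)` and `finish(stream=...)` separately.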

The orchestrator was not updated, but the changes were made in such a way
that it continues to work in diffusion, albeit using the original,
blocking behaviour.


---------

Co-authored-by: Philip Mueller <[email protected]>
Co-authored-by: Hannes Vogt <[email protected]>
Co-authored-by: Copilot Autofix powered by AI <[email protected]>
Co-authored-by: Mikael Simberg <[email protected]>
msimberg added a commit to C2SM/icon4py that referenced this pull request Mar 26, 2026
Adds gtfn_gpu backend to the distributed CI pipeline. dace_gpu is still
left out because compilation takes too long.

The base image is upgraded because it's possible, but not strictly
necessary. The CPU-only version of the pipeline needed 25.04 (24.04 and
25.10 did not work for various reasons). However, since OpenMPI and
libfabric are now built manually in the container the base image version
is less of a constraint. 24.04 doesn't have matching GCC/CUDA versions
and 26.04 doesn't exist yet, but the pipeline should eventually use
26.04.

OpenMPI and libfabric are built manually for slingshot support because
getting the ubuntu repository packages to work with GPU support did not
seem possible/easy. The installation is based on
https://github.com/eth-cscs/cray-network-stack.

GHEX needs an upgrade, because there's a bug in how strides are
calculated for GPU buffers. @philip-paul-mueller has already fixed this
in ghex-org/GHEX#190 but we should wait for that
to be merged (and probably test in icon-exclaim first).

This also fixes a few cupy/numpy incompatibilities.
`revert_repeated_index_to_invalid` was updated to only deal with numpy
for now as the connectivities are always numpy arrays.
`test_halo_exchange_for_sparse_field` is marked `embedded_only`. The
non-MPI test was already marked embedded-only.

This does not try to unify the default and distributed CI pipeline
definitions. That should, however, be done sooner or later as well.

---------

Co-authored-by: Jacopo Canton <[email protected]>
Co-authored-by: Nicoletta Farabullini <[email protected]>
