From 5248208a76d3755dce85a06f74b571be2173114a Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Mon, 14 Nov 2022 12:21:07 -0700 Subject: [PATCH] Merge remote-tracking branch 'origin/topic/neverlord/sim-clock-tuning' * origin/topic/neverlord/sim-clock-tuning: Minimize messaging in sim_clock::advance_time (cherry picked from commit 3a2f894154c718633e232f4f8dbdeb7f9b0d28fa) --- src/endpoint.cc | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/src/endpoint.cc b/src/endpoint.cc index c768975e..c701b030 100644 --- a/src/endpoint.cc +++ b/src/endpoint.cc @@ -204,22 +204,38 @@ class sim_clock : public endpoint::clock { --pending_count_; } } - // Wait for response messages. + // Send messages to all actors that we sync with. caf::scoped_actor self{ctx_->sys}; - for (auto& who : sync_with_actors) { + for (auto& who : sync_with_actors) self->send(who, atom::sync_point_v, self); - self->delayed_send(self, timeout::frontend, atom::tick_v); + // Schedule a timeout (tick) message to abort syncing in case the actors + // take too long. + auto tout_mme = caf::make_mailbox_element(self->ctrl(), + caf::make_message_id(), + caf::no_stages, atom::tick_v); + auto& caf_clock = self->clock(); + auto tout = + caf_clock.schedule_message(caf_clock.now() + timeout::frontend, + caf::actor_cast(self), + std::move(tout_mme)); + // Wait for response messages. + bool abort_syncing = false; + for (size_t i = 0; !abort_syncing && i < sync_with_actors.size(); ++i) self->receive( [&](atom::sync_point) { // nop }, [&](atom::tick) { BROKER_DEBUG("advance_time actor syncing timed out"); + abort_syncing = true; }, [&](caf::error& e) { BROKER_DEBUG("advance_time actor syncing failed"); + abort_syncing = true; }); - } + // Dispose the timeout if it's still pending to avoid unnecessary messaging. + if (!abort_syncing) + tout.dispose(); } void send_later(worker dest, timespan after, void* vptr) override {