Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/topic/neverlord/sim-clock-tuning'
Browse files Browse the repository at this point in the history
* origin/topic/neverlord/sim-clock-tuning:
  Minimize messaging in sim_clock::advance_time

(cherry picked from commit 3a2f894)
  • Loading branch information
timwoj committed Jan 28, 2023
1 parent 3047baa commit 5248208
Showing 1 changed file with 20 additions and 4 deletions.
24 changes: 20 additions & 4 deletions src/endpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,22 +204,38 @@ class sim_clock : public endpoint::clock {
--pending_count_;
}
}
// Wait for response messages.
// Send messages to all actors that we sync with.
caf::scoped_actor self{ctx_->sys};
for (auto& who : sync_with_actors) {
for (auto& who : sync_with_actors)
self->send(who, atom::sync_point_v, self);
self->delayed_send(self, timeout::frontend, atom::tick_v);
// Schedule a timeout (tick) message to abort syncing in case the actors
// take too long.
auto tout_mme = caf::make_mailbox_element(self->ctrl(),
caf::make_message_id(),
caf::no_stages, atom::tick_v);
auto& caf_clock = self->clock();
auto tout =
caf_clock.schedule_message(caf_clock.now() + timeout::frontend,
caf::actor_cast<caf::strong_actor_ptr>(self),
std::move(tout_mme));
// Wait for response messages.
bool abort_syncing = false;
for (size_t i = 0; !abort_syncing && i < sync_with_actors.size(); ++i)
self->receive(
[&](atom::sync_point) {
// nop
},
[&](atom::tick) {
BROKER_DEBUG("advance_time actor syncing timed out");
abort_syncing = true;
},
[&](caf::error& e) {
BROKER_DEBUG("advance_time actor syncing failed");
abort_syncing = true;
});
}
// Dispose the timeout if it's still pending to avoid unnecessary messaging.
if (!abort_syncing)
tout.dispose();
}

void send_later(worker dest, timespan after, void* vptr) override {
Expand Down

0 comments on commit 5248208

Please sign in to comment.