Skip to content

Commit

Permalink
chore: fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
romange committed Jan 20, 2025
1 parent e37b2e3 commit fb3072a
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions src/server/server_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ bool ServerState::ShouldLogSlowCmd(unsigned latency_usec) const {
}

void ServerState::ConnectionsWatcherFb(util::ListenerInterface* main) {
optional<facade::Connection::WeakRef> last_reference;

while (true) {
util::fb2::NoOpLock noop;
if (watcher_cv_.wait_for(noop, 1s, [this] { return gstate_ == GlobalState::SHUTTING_DOWN; })) {
Expand All @@ -240,6 +242,11 @@ void ServerState::ConnectionsWatcherFb(util::ListenerInterface* main) {
continue;
}

facade::Connection* from = nullptr;
if (last_reference && !last_reference->IsExpired()) {
from = last_reference->Get();
}

// We use weak refs, because ShutdownSelf below can potentially block the fiber,
// and during this time some of the connections might be destroyed. Weak refs allow checking
// validity of each connection.
Expand All @@ -255,9 +262,12 @@ void ServerState::ConnectionsWatcherFb(util::ListenerInterface* main) {
}
};

// TODO: to traverse in batches some of the connections to avoid blocking
// the thread for too long
main->TraverseConnectionsOnThread(cb);
util::Connection* next = main->TraverseConnectionsOnThread(cb, 100, from);
if (next) {
last_reference = static_cast<facade::Connection*>(next)->Borrow();
} else {
last_reference.reset();
}

for (auto& ref : conn_refs) {
facade::Connection* conn = ref.Get();
Expand Down

0 comments on commit fb3072a

Please sign in to comment.