Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement connection timeout functionality #4407

Merged
merged 2 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/facade/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class ConnectionContext {
// connection state / properties.
bool conn_closing : 1;
bool req_auth : 1;
bool replica_conn : 1;
bool replica_conn : 1; // whether it's a replica connection on the master side.
bool authenticated : 1;
bool async_dispatch : 1; // whether this connection is amid an async dispatch
bool sync_dispatch : 1; // whether this connection is amid a sync dispatch
Expand Down
7 changes: 4 additions & 3 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,10 @@ Connection::~Connection() {
UpdateLibNameVerMap(lib_name_, lib_ver_, -1);
}

// Returns the reply builder's IsSendActive() state.
// A connection without a reply builder is never reported as sending.
bool Connection::IsSending() const {
  if (!reply_builder_) {
    return false;
  }
  return reply_builder_->IsSendActive();
}

// Called from Connection::Shutdown() right after socket_->Shutdown call.
void Connection::OnShutdown() {
VLOG(1) << "Connection::OnShutdown";
Expand Down Expand Up @@ -1638,9 +1642,6 @@ bool Connection::Migrate(util::fb2::ProactorBase* dest) {

// Returns a WeakRef to this connection, built from self_, the pool index of the proactor
// returned by socket_->proactor(), and the connection id_.
// Requires self_ to be set (checked in debug builds).
Connection::WeakRef Connection::Borrow() {
DCHECK(self_);
// If the connection is unaware of subscriptions, it could migrate threads, making this call
// unsafe. All external mechanisms that borrow references should register subscriptions.
DCHECK_GT(cc_->subscriptions, 0);

return WeakRef(self_, socket_->proactor()->GetPoolIndex(), id_);
}
Expand Down
10 changes: 10 additions & 0 deletions src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,16 @@ class Connection : public util::Connection {
static void TrackRequestSize(bool enable);
static void EnsureMemoryBudget(unsigned tid);

// Difference between the current wall-clock time and last_interaction_.
// time(nullptr) yields whole seconds, so last_interaction_ is presumably a seconds
// timestamp as well — confirm against where it is updated.
unsigned idle_time() const {
  const auto now = time(nullptr);
  return now - last_interaction_;
}

// Returns the current value of phase_.
Phase phase() const {
return phase_;
}

// True when reply_builder_ is set and its IsSendActive() returns true
// (defined in dragonfly_connection.cc).
bool IsSending() const;

protected:
void OnShutdown() override;
void OnPreMigrateThread() override;
Expand Down
5 changes: 3 additions & 2 deletions src/server/blocking_controller_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ class BlockingControllerTest : public Test {
}
void SetUp() override;
void TearDown() override;

// Suite-level setup: initializes ServerState with null pointer arguments and
// allocates the FacadeStats object that facade::tl_facade_stats points to.
static void SetUpTestSuite() {
ServerState::Init(kNumThreads, kNumThreads, nullptr);
ServerState::Init(kNumThreads, kNumThreads, nullptr, nullptr);
facade::tl_facade_stats = new facade::FacadeStats;
}

Expand All @@ -45,7 +46,7 @@ void BlockingControllerTest::SetUp() {
pp_.reset(fb2::Pool::Epoll(kNumThreads));
pp_->Run();
pp_->AwaitBrief([](unsigned index, ProactorBase* p) {
ServerState::Init(index, kNumThreads, nullptr);
ServerState::Init(index, kNumThreads, nullptr, nullptr);
if (facade::tl_facade_stats == nullptr) {
facade::tl_facade_stats = new facade::FacadeStats;
}
Expand Down
13 changes: 9 additions & 4 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
config_registry.RegisterMutable("migration_finalization_timeout_ms");
config_registry.RegisterMutable("table_growth_margin");
config_registry.RegisterMutable("tcp_keepalive");
config_registry.RegisterMutable("timeout");
config_registry.RegisterMutable("managed_service_info");

config_registry.RegisterMutable(
Expand Down Expand Up @@ -849,19 +850,23 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
shard_num = pp_.size();
}

// We assume that listeners.front() is the main_listener
// see dfly_main RunEngine. In unit tests, listeners are empty.
facade::Listener* main_listener = listeners.empty() ? nullptr : listeners.front();

ChannelStore* cs = new ChannelStore{};
// Must initialize before the shard_set because EngineShard::Init references ServerState.
pp_.AwaitBrief([&](uint32_t index, ProactorBase* pb) {
tl_facade_stats = new FacadeStats;
ServerState::Init(index, shard_num, &user_registry_);
ServerState::Init(index, shard_num, main_listener, &user_registry_);
ServerState::tlocal()->UpdateChannelStore(cs);
});

const auto tcp_disabled = GetFlag(FLAGS_port) == 0u;
// We assume that listeners.front() is the main_listener
// see dfly_main RunEngine
if (!tcp_disabled && !listeners.empty()) {
acl_family_.Init(listeners.front(), &user_registry_);
if (!tcp_disabled && main_listener) {
acl_family_.Init(main_listener, &user_registry_);
}

// Initialize shard_set with a callback running once in a while in the shard threads.
Expand Down Expand Up @@ -907,7 +912,7 @@ void Service::Shutdown() {
shard_set->Shutdown();
Transaction::Shutdown();

pp_.Await([](ProactorBase* pb) { ServerState::tlocal()->Destroy(); });
pp_.AwaitFiberOnAll([](ProactorBase* pb) { ServerState::tlocal()->Destroy(); });

// wait for all the pending callbacks to stop.
ThisFiber::SleepFor(10ms);
Expand Down
78 changes: 77 additions & 1 deletion src/server/server_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,19 @@ extern "C" {
#include "base/flags.h"
#include "base/logging.h"
#include "facade/conn_context.h"
#include "facade/dragonfly_connection.h"
#include "server/journal/journal.h"
#include "util/listener_interface.h"

ABSL_FLAG(uint32_t, interpreter_per_thread, 10, "Lua interpreters per thread");
ABSL_FLAG(uint32_t, timeout, 0,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rename this flag to `connection_timeout`?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you intend to change the default in the future?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name and the default value are compatible with Valkey; see https://github.com/valkey-io/valkey/blob/unstable/valkey.conf#L161

"Close the connection after it is idle for N seconds (0 to disable)");

namespace dfly {

using namespace std;
using namespace std::chrono_literals;

__thread ServerState* ServerState::state_ = nullptr;

ServerState::Stats::Stats(unsigned num_shards) : tx_width_freq_arr(num_shards) {
Expand Down Expand Up @@ -102,21 +109,33 @@ ServerState::ServerState() : interpreter_mgr_{absl::GetFlag(FLAGS_interpreter_pe
}

// Joins the connections watcher fiber (watcher_fiber_) before the state is destroyed.
// NOTE(review): presumably a no-op when the fiber was never launched (null main listener) —
// confirm against Fiber::JoinIfNeeded.
ServerState::~ServerState() {
watcher_fiber_.JoinIfNeeded();
}

// Creates the ServerState for the calling thread, stores it in the thread-local state_,
// and fills in its global state, thread index, user registry and per-shard stats.
// When main_listener is non-null, it also launches the "ConnectionsWatcher" fiber, which runs
// ConnectionsWatcherFb(main_listener). With a null listener no watcher fiber is started.
void ServerState::Init(uint32_t thread_index, uint32_t num_shards, acl::UserRegistry* registry) {
void ServerState::Init(uint32_t thread_index, uint32_t num_shards,
util::ListenerInterface* main_listener, acl::UserRegistry* registry) {
state_ = new ServerState();
state_->gstate_ = GlobalState::ACTIVE;
state_->thread_index_ = thread_index;
state_->user_registry = registry;
state_->stats = Stats(num_shards);
// The watcher needs a listener to traverse; callers without one pass nullptr.
if (main_listener) {
state_->watcher_fiber_ = util::fb2::Fiber(
util::fb2::Launch::post, "ConnectionsWatcher",
[state = state_, main_listener] { state->ConnectionsWatcherFb(main_listener); });
}
}

// Deletes the calling thread's ServerState (running ~ServerState) and then clears state_.
void ServerState::Destroy() {
delete state_;
state_ = nullptr;
}

// Switches this state to SHUTTING_DOWN and wakes every waiter on watcher_cv_, so the
// connections watcher fiber can observe the new state without waiting out its timeout.
void ServerState::EnterLameDuck() {
gstate_ = GlobalState::SHUTTING_DOWN;
// The watcher's wait predicate reads gstate_, so it is set before notifying.
watcher_cv_.notify_all();
}

ServerState::MemoryUsageStats ServerState::GetMemoryUsage(uint64_t now_ns) {
static constexpr uint64_t kCacheEveryNs = 1000;
if (now_ns > used_mem_last_update_ + kCacheEveryNs) {
Expand Down Expand Up @@ -208,4 +227,61 @@ ServerState* ServerState::SafeTLocal() {
// Decides whether a command with the given latency belongs in the slow log:
// the slow log shard must be enabled and the latency must reach log_slower_than_usec.
bool ServerState::ShouldLogSlowCmd(unsigned latency_usec) const {
  if (!slow_log_shard_.IsEnabled()) {
    return false;
  }
  return latency_usec >= log_slower_than_usec;
}

// Body of the "ConnectionsWatcher" fiber.
//
// Roughly once a second it scans connections of the `main` listener on this thread and shuts
// down those that are not replica connections, are either reading from the socket or sending,
// and have been idle for longer than the `timeout` flag. A `timeout` of 0 disables the scan.
// The loop ends once gstate_ becomes SHUTTING_DOWN.
void ServerState::ConnectionsWatcherFb(util::ListenerInterface* main) {
  using Phase = facade::Connection::Phase;

  // Connection returned by the previous traversal; the next pass starts from it if still alive.
  optional<facade::Connection::WeakRef> resume_ref;

  for (;;) {
    // Wait up to a second; a notification with SHUTTING_DOWN set terminates the fiber.
    util::fb2::NoOpLock noop;
    const bool shutting_down = watcher_cv_.wait_for(
        noop, 1s, [this] { return gstate_ == GlobalState::SHUTTING_DOWN; });
    if (shutting_down) {
      break;
    }

    // The flag is re-read on every pass.
    const uint32_t timeout = absl::GetFlag(FLAGS_timeout);
    if (timeout == 0) {
      continue;
    }

    facade::Connection* from =
        (resume_ref && !resume_ref->IsExpired()) ? resume_ref->Get() : nullptr;

    // Candidates are collected as weak refs: ShutdownSelf below may block this fiber, and
    // meanwhile some of the collected connections might be destroyed. A weak ref lets us
    // verify each connection right before touching it.
    vector<facade::Connection::WeakRef> expired;

    auto collect = [&expired, timeout](unsigned /*thread_index*/, util::Connection* conn) {
      auto* dfly_conn = static_cast<facade::Connection*>(conn);
      const auto phase = dfly_conn->phase();

      // A connection without a context is treated as a replica, i.e. it is never closed here.
      const bool is_replica = dfly_conn->cntx() ? dfly_conn->cntx()->replica_conn : true;

      const bool reading_or_sending = phase == Phase::READ_SOCKET || dfly_conn->IsSending();
      if (!reading_or_sending || is_replica) {
        return;
      }
      if (dfly_conn->idle_time() > timeout) {
        expired.push_back(dfly_conn->Borrow());
      }
    };

    // NOTE(review): 100 looks like a per-call cap on visited connections, with the return
    // value being the connection to continue from — confirm against ListenerInterface.
    util::Connection* next = main->TraverseConnectionsOnThread(collect, 100, from);
    if (next == nullptr) {
      resume_ref.reset();
    } else {
      resume_ref = static_cast<facade::Connection*>(next)->Borrow();
    }

    for (auto& ref : expired) {
      facade::Connection* conn = ref.Get();
      if (conn == nullptr) {
        continue;  // The connection is no longer available.
      }
      VLOG(1) << "Closing connection due to timeout: " << conn->GetClientInfo();
      conn->ShutdownSelf();
    }
  }
}

} // end of namespace dfly
18 changes: 14 additions & 4 deletions src/server/server_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ namespace facade {
class Connection;
}

namespace util {
class ListenerInterface;
}

namespace dfly {

namespace journal {
Expand Down Expand Up @@ -150,12 +154,11 @@ class ServerState { // public struct - to allow initialization.
ServerState();
~ServerState();

static void Init(uint32_t thread_index, uint32_t num_shards, acl::UserRegistry* registry);
static void Init(uint32_t thread_index, uint32_t num_shards,
util::ListenerInterface* main_listener, acl::UserRegistry* registry);
static void Destroy();

void EnterLameDuck() {
state_->gstate_ = GlobalState::SHUTTING_DOWN;
}
void EnterLameDuck();

void TxCountInc() {
++live_transactions_;
Expand Down Expand Up @@ -302,6 +305,9 @@ class ServerState { // public struct - to allow initialization.
size_t serialization_max_chunk_size;

private:
// A fiber constantly watching connections on the main listener.
void ConnectionsWatcherFb(util::ListenerInterface* main);

int64_t live_transactions_ = 0;
SlowLogShard slow_log_shard_;
mi_heap_t* data_heap_;
Expand All @@ -321,6 +327,10 @@ class ServerState { // public struct - to allow initialization.
int client_pauses_[2] = {};
util::fb2::EventCount client_pause_ec_;

// Monitors connections. Currently responsible for closing timed out connections.
util::fb2::Fiber watcher_fiber_;
util::fb2::CondVarAny watcher_cv_;

using Counter = util::SlidingCounter<7>;
Counter qps_;

Expand Down
11 changes: 11 additions & 0 deletions tests/dragonfly/connection_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1051,3 +1051,14 @@ async def test_hiredis(df_factory):
server.start()
client = base_redis.Redis(port=server.port, protocol=3, cache_config=CacheConfig())
client.ping()


@dfly_args({"timeout": 1})
async def test_timeout(df_server: DflyInstance, async_client: aioredis.Redis):
    """With `timeout` set to 1, a second client disappears from CLIENT LIST after 2 seconds.

    A second client is opened and pinged, giving two listed clients; after sleeping
    for two seconds only one client is expected to remain in the list.
    """
    idle_client = df_server.client()
    await idle_client.ping()
    assert len(await async_client.client_list()) == 2

    await asyncio.sleep(2)
    assert len(await async_client.client_list()) == 1
Loading