Skip to content

Commit cc30718

Browse files
committed
feat: implement connection timeout functionality
`timeout` argument shuts down idle connections after the specified time. Fixes #1677 Signed-off-by: Roman Gershman <[email protected]>
1 parent 5adb976 commit cc30718

File tree

7 files changed

+113
-14
lines changed

7 files changed

+113
-14
lines changed

src/facade/dragonfly_connection.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,10 @@ Connection::~Connection() {
573573
UpdateLibNameVerMap(lib_name_, lib_ver_, -1);
574574
}
575575

576+
bool Connection::IsSending() const {
577+
return reply_builder_ && reply_builder_->IsSendActive();
578+
}
579+
576580
// Called from Connection::Shutdown() right after socket_->Shutdown call.
577581
void Connection::OnShutdown() {
578582
VLOG(1) << "Connection::OnShutdown";
@@ -1617,9 +1621,6 @@ bool Connection::Migrate(util::fb2::ProactorBase* dest) {
16171621

16181622
Connection::WeakRef Connection::Borrow() {
16191623
DCHECK(self_);
1620-
// If the connection is unaware of subscriptions, it could migrate threads, making this call
1621-
// unsafe. All external mechanisms that borrow references should register subscriptions.
1622-
DCHECK_GT(cc_->subscriptions, 0);
16231624

16241625
return WeakRef(self_, queue_backpressure_, socket_->proactor()->GetPoolIndex(), id_);
16251626
}

src/facade/dragonfly_connection.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,16 @@ class Connection : public util::Connection {
311311
static void GetRequestSizeHistogramThreadLocal(std::string* hist);
312312
static void TrackRequestSize(bool enable);
313313

314+
unsigned idle_time() const {
315+
return time(nullptr) - last_interaction_;
316+
}
317+
318+
Phase phase() const {
319+
return phase_;
320+
}
321+
322+
bool IsSending() const;
323+
314324
protected:
315325
void OnShutdown() override;
316326
void OnPreMigrateThread() override;

src/server/blocking_controller_test.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@ class BlockingControllerTest : public Test {
2929
}
3030
void SetUp() override;
3131
void TearDown() override;
32+
3233
static void SetUpTestSuite() {
33-
ServerState::Init(kNumThreads, kNumThreads, nullptr);
34+
ServerState::Init(kNumThreads, kNumThreads, nullptr, nullptr);
3435
facade::tl_facade_stats = new facade::FacadeStats;
3536
}
3637

@@ -45,7 +46,7 @@ void BlockingControllerTest::SetUp() {
4546
pp_.reset(fb2::Pool::Epoll(kNumThreads));
4647
pp_->Run();
4748
pp_->AwaitBrief([](unsigned index, ProactorBase* p) {
48-
ServerState::Init(index, kNumThreads, nullptr);
49+
ServerState::Init(index, kNumThreads, nullptr, nullptr);
4950
if (facade::tl_facade_stats == nullptr) {
5051
facade::tl_facade_stats = new facade::FacadeStats;
5152
}

src/server/main_service.cc

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -813,6 +813,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
813813
config_registry.RegisterMutable("migration_finalization_timeout_ms");
814814
config_registry.RegisterMutable("table_growth_margin");
815815
config_registry.RegisterMutable("tcp_keepalive");
816+
config_registry.RegisterMutable("timeout");
816817
config_registry.RegisterMutable("managed_service_info");
817818

818819
config_registry.RegisterMutable(
@@ -848,19 +849,23 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
848849
shard_num = pp_.size();
849850
}
850851

852+
// We assume that listeners.front() is the main_listener
853+
// see dfly_main RunEngine. In unit tests, listeners are empty.
854+
facade::Listener* main_listener = listeners.empty() ? nullptr : listeners.front();
855+
851856
ChannelStore* cs = new ChannelStore{};
852857
// Must initialize before the shard_set because EngineShard::Init references ServerState.
853858
pp_.AwaitBrief([&](uint32_t index, ProactorBase* pb) {
854859
tl_facade_stats = new FacadeStats;
855-
ServerState::Init(index, shard_num, &user_registry_);
860+
ServerState::Init(index, shard_num, main_listener, &user_registry_);
856861
ServerState::tlocal()->UpdateChannelStore(cs);
857862
});
858863

859864
const auto tcp_disabled = GetFlag(FLAGS_port) == 0u;
860865
// We assume that listeners.front() is the main_listener
861866
// see dfly_main RunEngine
862-
if (!tcp_disabled && !listeners.empty()) {
863-
acl_family_.Init(listeners.front(), &user_registry_);
867+
if (!tcp_disabled && main_listener) {
868+
acl_family_.Init(main_listener, &user_registry_);
864869
}
865870

866871
// Initialize shard_set with a callback running once in a while in the shard threads.
@@ -906,7 +911,7 @@ void Service::Shutdown() {
906911
shard_set->Shutdown();
907912
Transaction::Shutdown();
908913

909-
pp_.Await([](ProactorBase* pb) { ServerState::tlocal()->Destroy(); });
914+
pp_.AwaitFiberOnAll([](ProactorBase* pb) { ServerState::tlocal()->Destroy(); });
910915

911916
// wait for all the pending callbacks to stop.
912917
ThisFiber::SleepFor(10ms);

src/server/server_state.cc

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,19 @@ extern "C" {
1515
#include "base/flags.h"
1616
#include "base/logging.h"
1717
#include "facade/conn_context.h"
18+
#include "facade/dragonfly_connection.h"
1819
#include "server/journal/journal.h"
20+
#include "util/listener_interface.h"
1921

2022
ABSL_FLAG(uint32_t, interpreter_per_thread, 10, "Lua interpreters per thread");
23+
ABSL_FLAG(uint32_t, timeout, 0,
24+
"Close the connection after it is idle for N seconds (0 to disable)");
2125

2226
namespace dfly {
2327

28+
using namespace std;
29+
using namespace std::chrono_literals;
30+
2431
__thread ServerState* ServerState::state_ = nullptr;
2532

2633
ServerState::Stats::Stats(unsigned num_shards) : tx_width_freq_arr(num_shards) {
@@ -102,21 +109,33 @@ ServerState::ServerState() : interpreter_mgr_{absl::GetFlag(FLAGS_interpreter_pe
102109
}
103110

104111
ServerState::~ServerState() {
112+
watcher_fiber_.JoinIfNeeded();
105113
}
106114

107-
void ServerState::Init(uint32_t thread_index, uint32_t num_shards, acl::UserRegistry* registry) {
115+
void ServerState::Init(uint32_t thread_index, uint32_t num_shards,
116+
util::ListenerInterface* main_listener, acl::UserRegistry* registry) {
108117
state_ = new ServerState();
109118
state_->gstate_ = GlobalState::ACTIVE;
110119
state_->thread_index_ = thread_index;
111120
state_->user_registry = registry;
112121
state_->stats = Stats(num_shards);
122+
if (main_listener) {
123+
state_->watcher_fiber_ = util::fb2::Fiber(
124+
util::fb2::Launch::post, "ConnectionsWatcher",
125+
[state = state_, main_listener] { state->ConnectionsWatcherFb(main_listener); });
126+
}
113127
}
114128

115129
void ServerState::Destroy() {
116130
delete state_;
117131
state_ = nullptr;
118132
}
119133

134+
void ServerState::EnterLameDuck() {
135+
gstate_ = GlobalState::SHUTTING_DOWN;
136+
watcher_cv_.notify_all();
137+
}
138+
120139
ServerState::MemoryUsageStats ServerState::GetMemoryUsage(uint64_t now_ns) {
121140
static constexpr uint64_t kCacheEveryNs = 1000;
122141
if (now_ns > used_mem_last_update_ + kCacheEveryNs) {
@@ -208,4 +227,46 @@ ServerState* ServerState::SafeTLocal() {
208227
bool ServerState::ShouldLogSlowCmd(unsigned latency_usec) const {
209228
return slow_log_shard_.IsEnabled() && latency_usec >= log_slower_than_usec;
210229
}
230+
231+
void ServerState::ConnectionsWatcherFb(util::ListenerInterface* main) {
232+
while (true) {
233+
util::fb2::NoOpLock noop;
234+
if (watcher_cv_.wait_for(noop, 1s, [this] { return gstate_ == GlobalState::SHUTTING_DOWN; })) {
235+
break;
236+
}
237+
238+
uint32_t timeout = absl::GetFlag(FLAGS_timeout);
239+
if (timeout == 0) {
240+
continue;
241+
}
242+
243+
// We use weak refs, because ShutdownSelf below can potentially block the fiber,
244+
// and during this time some of the connections might be destroyed. Weak refs allow checking
245+
// validity of each connection.
246+
vector<facade::Connection::WeakRef> conn_refs;
247+
248+
auto cb = [&](unsigned thread_index, util::Connection* conn) {
249+
facade::Connection* dfly_conn = static_cast<facade::Connection*>(conn);
250+
using Phase = facade::Connection::Phase;
251+
auto phase = dfly_conn->phase();
252+
if ((phase == Phase::READ_SOCKET || dfly_conn->IsSending()) &&
253+
dfly_conn->idle_time() > timeout) {
254+
conn_refs.push_back(dfly_conn->Borrow());
255+
}
256+
};
257+
258+
// TODO: to traverse in batches some of the connections to avoid blocking
259+
// the thread for too long
260+
main->TraverseConnectionsOnThread(cb);
261+
262+
for (auto& ref : conn_refs) {
263+
facade::Connection* conn = ref.Get();
264+
if (conn) {
265+
VLOG(1) << "Closing connection due to timeout: " << conn->GetClientInfo();
266+
conn->ShutdownSelf();
267+
}
268+
}
269+
}
270+
}
271+
211272
} // end of namespace dfly

src/server/server_state.h

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ namespace facade {
2323
class Connection;
2424
}
2525

26+
namespace util {
27+
class ListenerInterface;
28+
}
29+
2630
namespace dfly {
2731

2832
namespace journal {
@@ -150,12 +154,11 @@ class ServerState { // public struct - to allow initialization.
150154
ServerState();
151155
~ServerState();
152156

153-
static void Init(uint32_t thread_index, uint32_t num_shards, acl::UserRegistry* registry);
157+
static void Init(uint32_t thread_index, uint32_t num_shards,
158+
util::ListenerInterface* main_listener, acl::UserRegistry* registry);
154159
static void Destroy();
155160

156-
void EnterLameDuck() {
157-
state_->gstate_ = GlobalState::SHUTTING_DOWN;
158-
}
161+
void EnterLameDuck();
159162

160163
void TxCountInc() {
161164
++live_transactions_;
@@ -302,6 +305,9 @@ class ServerState { // public struct - to allow initialization.
302305
size_t serialization_max_chunk_size;
303306

304307
private:
308+
// A fiber constantly watching connections on the main listener.
309+
void ConnectionsWatcherFb(util::ListenerInterface* main);
310+
305311
int64_t live_transactions_ = 0;
306312
SlowLogShard slow_log_shard_;
307313
mi_heap_t* data_heap_;
@@ -321,6 +327,10 @@ class ServerState { // public struct - to allow initialization.
321327
int client_pauses_[2] = {};
322328
util::fb2::EventCount client_pause_ec_;
323329

330+
// Monitors connections. Currently responsible for closing timed out connections.
331+
util::fb2::Fiber watcher_fiber_;
332+
util::fb2::CondVarAny watcher_cv_;
333+
324334
using Counter = util::SlidingCounter<7>;
325335
Counter qps_;
326336

tests/dragonfly/connection_test.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1038,3 +1038,14 @@ async def test_lib_name_ver(async_client: aioredis.Redis):
10381038
assert len(list) == 1
10391039
assert list[0]["lib-name"] == "dragonfly"
10401040
assert list[0]["lib-ver"] == "1.2.3.4"
1041+
1042+
1043+
@dfly_args({"timeout": 1})
1044+
async def test_timeout(df_server: DflyInstance, async_client: aioredis.Redis):
1045+
another_client = df_server.client()
1046+
await another_client.ping()
1047+
clients = await async_client.client_list()
1048+
assert len(clients) == 2
1049+
await asyncio.sleep(2)
1050+
clients = await async_client.client_list()
1051+
assert len(clients) == 1

0 commit comments

Comments
 (0)