Skip to content

Commit

Permalink
fix(server): client pause work while blocking commands run (#2584)
Browse files Browse the repository at this point in the history
fix #2576
fix #2661

Signed-off-by: adi_holden <[email protected]>
  • Loading branch information
adiholden authored Feb 28, 2024
1 parent 8ef9262 commit 7e45270
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 9 deletions.
13 changes: 11 additions & 2 deletions src/facade/dragonfly_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ DispatchTracker::DispatchTracker(absl::Span<facade::Listener* const> listeners,
issuer_{issuer},
ignore_paused_{ignore_paused},
ignore_blocked_{ignore_blocked} {
bc_ = make_unique<util::fb2::BlockingCounter>(0);
}

void DispatchTracker::TrackOnThread() {
Expand All @@ -400,7 +401,15 @@ void DispatchTracker::TrackOnThread() {
}

bool DispatchTracker::Wait(absl::Duration duration) {
return bc_.WaitFor(absl::ToChronoMilliseconds(duration));
bool res = bc_->WaitFor(absl::ToChronoMilliseconds(duration));
if (!res && ignore_blocked_) {
// We track all connections again because a connection might became blocked between the time
// we call tracking the last time.
bc_.reset(new util::fb2::BlockingCounter(0));
TrackAll();
res = bc_->WaitFor(absl::ToChronoMilliseconds(duration));
}
return res;
}

void DispatchTracker::TrackAll() {
Expand All @@ -410,7 +419,7 @@ void DispatchTracker::TrackAll() {

void DispatchTracker::Handle(unsigned thread_index, util::Connection* conn) {
if (auto* fconn = static_cast<facade::Connection*>(conn); fconn != issuer_)
fconn->SendCheckpoint(bc_, ignore_paused_, ignore_blocked_);
fconn->SendCheckpoint(*bc_, ignore_paused_, ignore_blocked_);
}

} // namespace facade
2 changes: 1 addition & 1 deletion src/facade/dragonfly_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class DispatchTracker {

std::vector<facade::Listener*> listeners_;
facade::Connection* issuer_;
util::fb2::BlockingCounter bc_{0};
std::unique_ptr<util::fb2::BlockingCounter> bc_;
bool ignore_paused_;
bool ignore_blocked_;
};
Expand Down
4 changes: 4 additions & 0 deletions src/server/command_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ class CommandId : public facade::CommandId {
return opt_mask_ & CO::WRITE;
}

bool IsBlocking() const {
return opt_mask_ & CO::BLOCKING;
}

static const char* OptName(CO::CommandOpt fl);

CommandId&& SetHandler(Handler f) && {
Expand Down
4 changes: 3 additions & 1 deletion src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1327,7 +1327,9 @@ size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
// paired with shardlocal eval
const bool is_eval = CO::IsEvalKind(ArgS(args, 0));

if (!is_multi && !is_eval && cid != nullptr) {
const bool is_blocking = cid != nullptr && cid->IsBlocking();

if (!is_multi && !is_eval && !is_blocking && cid != nullptr) {
stored_cmds.reserve(args_list.size());
stored_cmds.emplace_back(cid, tail_args);
continue;
Expand Down
11 changes: 7 additions & 4 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -559,17 +559,20 @@ string_view GetRedisMode() {
std::optional<fb2::Fiber> Pause(absl::Span<facade::Listener* const> listeners,
facade::Connection* conn, ClientPause pause_state,
std::function<bool()> is_pause_in_progress) {
// Set global pause state and track commands that are running when the pause state is flipped.
// Exlude already paused commands from the busy count.
DispatchTracker tracker{listeners, conn, true /* ignore paused commands */};
// Track connections and set pause state to be able to wait untill all running transactions read
// the new pause state. Exlude already paused commands from the busy count. Exlude tracking
// blocked connections because: a) If the connection is blocked it is puased. b) We read pause
// state after waking from blocking so if the trasaction was waken by another running
// command that did not pause on the new state yet we will pause after waking up.
DispatchTracker tracker{listeners, conn, true /* ignore paused commands */,
true /*ignore blocking*/};
shard_set->pool()->Await([&tracker, pause_state](util::ProactorBase* pb) {
// Commands don't suspend before checking the pause state, so
// it's impossible to deadlock on waiting for a command that will be paused.
tracker.TrackOnThread();
ServerState::tlocal()->SetPauseState(pause_state, true);
});

// TODO handle blocking commands
// Wait for all busy commands to finish running before replying to guarantee
// that no more (write) operations will occur.
const absl::Duration kDispatchTimeout = absl::Seconds(1);
Expand Down
5 changes: 4 additions & 1 deletion src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1349,14 +1349,17 @@ OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_p
auto* stats = ServerState::tl_connection_stats();
++stats->num_blocked_clients;
DVLOG(1) << "WaitOnWatch wait for " << tp << " " << DebugId();

// TBD set connection blocking state
// Wait for the blocking barrier to be closed.
// Note: It might return immediately if another thread already notified us.
cv_status status = blocking_barrier_.Wait(tp);

DVLOG(1) << "WaitOnWatch done " << int(status) << " " << DebugId();
--stats->num_blocked_clients;

// TBD set connection pause state
ServerState::tlocal()->AwaitPauseState(true); // blocking are always write commands

OpStatus result = OpStatus::OK;
if (status == cv_status::timeout) {
result = OpStatus::TIMED_OUT;
Expand Down
53 changes: 53 additions & 0 deletions tests/dragonfly/connection_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import random
import string
import pytest
import asyncio
import time
Expand Down Expand Up @@ -721,3 +722,55 @@ async def do_write():
await asyncio.sleep(0.0)
assert p3.done()
await p3


@pytest.mark.asyncio
async def test_blocking_command_client_pause(async_client: aioredis.Redis):
"""
1. Check client pause success when blocking transaction is running
2. lpush is paused after running client puase
3. once puased is finished lpush will run and blpop will pop the pushed value
"""

async def blocking_command():
res = await async_client.execute_command("blpop key 2")
assert res == ["key", "value"]

async def lpush_command():
await async_client.execute_command("lpush key value")

blocking = asyncio.create_task(blocking_command())
await asyncio.sleep(0.1)

res = await async_client.execute_command("client pause 1000")
assert res == "OK"

lpush = asyncio.create_task(lpush_command())
assert not lpush.done()

await lpush
await blocking


@pytest.mark.asyncio
async def test_multiple_blocking_commands_client_pause(async_client: aioredis.Redis):
"""
Check running client pause command simultaneously with running multiple blocking command
from multiple connections
"""

async def just_blpop():
key = "".join(random.choices(string.ascii_letters, k=3))
await async_client.execute_command(f"blpop {key} 2")

async def client_pause():
res = await async_client.execute_command("client pause 1000")
assert res == "OK"

tasks = [just_blpop() for _ in range(20)]
tasks.append(client_pause())

all = asyncio.gather(*tasks)

assert not all.done()
await all

0 comments on commit 7e45270

Please sign in to comment.