Skip to content

Commit b56fe08

Browse files
authored
Merge pull request #950 from singnet/artur-redis-ctx-ping
Add `RedisContext` ping before returning it to the pool
2 parents ef4e041 + ba6575e commit b56fe08

File tree

8 files changed

+295
-84
lines changed

8 files changed

+295
-84
lines changed

src/atomdb/redis_mongodb/RedisContext.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,13 @@ redisReply* RedisContext::execute(const char* command) {
3939
}
4040
}
4141

42+
bool RedisContext::ping() {
43+
redisReply* reply = this->execute("PING");
44+
if (reply == NULL) return false;
45+
if (reply->type != REDIS_REPLY_STATUS) return false;
46+
return string(reply->str) == string("PONG");
47+
}
48+
4249
void RedisContext::append_command(const char* command) {
4350
if (cluster_flag) {
4451
if (redisClusterAppendCommand(cluster_ctx, command) == REDIS_OK) {

src/atomdb/redis_mongodb/RedisContext.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ class RedisContext {
1717
void set_context(redisClusterContext* ctx);
1818

1919
redisReply* execute(const char* command);
20+
bool ping();
2021
void append_command(const char* command);
2122
void flush_commands();
2223
bool has_error() const;

src/atomdb/redis_mongodb/RedisContextPool.cc

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,13 @@ shared_ptr<RedisContext> RedisContextPool::acquire() {
3232
if (!pool.empty()) {
3333
auto ctx = pool.front();
3434
pool.pop();
35-
return ctx;
35+
if (ctx->ping()) {
36+
LOG_DEBUG("Context is alive, returning to the caller.");
37+
return shared_ptr<RedisContext>(ctx.get(),
38+
[this, ctx](RedisContext* ptr) { this->release(ctx); });
39+
}
40+
LOG_DEBUG("Context is not alive, creating a new one.");
41+
total_contexts--;
3642
}
3743

3844
// Create a new context if the pool is empty
@@ -83,9 +89,9 @@ shared_ptr<RedisContext> RedisContextPool::acquire() {
8389

8490
void RedisContextPool::release(shared_ptr<RedisContext> ctx) {
8591
if (!ctx) return;
86-
8792
unique_lock<mutex> lock(this->pool_mutex);
8893
pool.push(ctx);
89-
total_contexts--;
90-
pool_cond_var.notify_one();
94+
LOG_DEBUG("Context added to the pool (size: " << pool.size()
95+
<< ", contexts: " << to_string(total_contexts) << ")");
96+
pool_cond_var.notify_all();
9197
}

src/atomdb/redis_mongodb/RedisMongoDB.cc

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -375,8 +375,15 @@ uint RedisMongoDB::get_next_score(const string& key) {
375375

376376
void RedisMongoDB::set_next_score(const string& key, uint score) {
377377
if (SKIP_REDIS) return;
378-
379378
auto ctx = this->redis_pool->acquire();
379+
set_next_score_with_context(ctx, key, score);
380+
}
381+
382+
void RedisMongoDB::set_next_score_with_context(shared_ptr<RedisContext> ctx,
383+
const string& key,
384+
uint score) {
385+
if (SKIP_REDIS) return;
386+
380387
string command = "SET " + key + " " + to_string(score);
381388
redisReply* reply = ctx->execute(command.c_str());
382389
if (reply == NULL) Utils::error("Redis error at set_next_score: <" + command + ">");
@@ -1012,8 +1019,11 @@ vector<string> RedisMongoDB::add_links(const vector<atoms::Link*>& links,
10121019

10131020
LOG_DEBUG("Setting next scores: patterns=" + to_string(this->patterns_next_score.load()) +
10141021
", incoming=" + to_string(this->incoming_set_next_score.load()));
1015-
set_next_score(REDIS_PATTERNS_PREFIX + ":next_score", this->patterns_next_score.load());
1016-
set_next_score(REDIS_INCOMING_PREFIX + ":next_score", this->incoming_set_next_score.load());
1022+
1023+
set_next_score_with_context(
1024+
ctx, REDIS_PATTERNS_PREFIX + ":next_score", this->patterns_next_score.load());
1025+
set_next_score_with_context(
1026+
ctx, REDIS_INCOMING_PREFIX + ":next_score", this->incoming_set_next_score.load());
10171027

10181028
if (is_transactional) {
10191029
lock_guard<mutex> composite_type_hashes_map_lock(this->composite_type_hashes_map_mutex);

src/atomdb/redis_mongodb/RedisMongoDB.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ class RedisMongoDB : public AtomDB {
170170

171171
uint get_next_score(const string& key);
172172
void set_next_score(const string& key, uint score);
173+
void set_next_score_with_context(shared_ptr<RedisContext> ctx, const string& key, uint score);
173174
void reset_scores();
174175

175176
void add_pattern(const string& handle, const string& pattern_handle);

0 commit comments

Comments
 (0)