Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/atomdb/redis_mongodb/RedisContext.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ redisReply* RedisContext::execute(const char* command) {
}
}

bool RedisContext::ping() {
redisReply* reply = this->execute("PING");
if (reply == NULL) return false;
if (reply->type != REDIS_REPLY_STATUS) return false;
return string(reply->str) == string("PONG");
}

void RedisContext::append_command(const char* command) {
if (cluster_flag) {
if (redisClusterAppendCommand(cluster_ctx, command) == REDIS_OK) {
Expand Down
1 change: 1 addition & 0 deletions src/atomdb/redis_mongodb/RedisContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class RedisContext {
void set_context(redisClusterContext* ctx);

redisReply* execute(const char* command);
bool ping();
void append_command(const char* command);
void flush_commands();
bool has_error() const;
Expand Down
14 changes: 10 additions & 4 deletions src/atomdb/redis_mongodb/RedisContextPool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,13 @@ shared_ptr<RedisContext> RedisContextPool::acquire() {
if (!pool.empty()) {
auto ctx = pool.front();
pool.pop();
return ctx;
if (ctx->ping()) {
LOG_DEBUG("Context is alive, returning to the caller.");
return shared_ptr<RedisContext>(ctx.get(),
[this, ctx](RedisContext* ptr) { this->release(ctx); });
}
LOG_DEBUG("Context is not alive, creating a new one.");
total_contexts--;
}

// Create a new context if the pool is empty
Expand Down Expand Up @@ -83,9 +89,9 @@ shared_ptr<RedisContext> RedisContextPool::acquire() {

void RedisContextPool::release(shared_ptr<RedisContext> ctx) {
if (!ctx) return;

unique_lock<mutex> lock(this->pool_mutex);
pool.push(ctx);
total_contexts--;
pool_cond_var.notify_one();
LOG_DEBUG("Context added to the pool (size: " << pool.size()
<< ", contexts: " << to_string(total_contexts) << ")");
pool_cond_var.notify_all();
}
16 changes: 13 additions & 3 deletions src/atomdb/redis_mongodb/RedisMongoDB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,15 @@ uint RedisMongoDB::get_next_score(const string& key) {

void RedisMongoDB::set_next_score(const string& key, uint score) {
if (SKIP_REDIS) return;

auto ctx = this->redis_pool->acquire();
set_next_score_with_context(ctx, key, score);
}

void RedisMongoDB::set_next_score_with_context(shared_ptr<RedisContext> ctx,
const string& key,
uint score) {
if (SKIP_REDIS) return;

string command = "SET " + key + " " + to_string(score);
redisReply* reply = ctx->execute(command.c_str());
if (reply == NULL) Utils::error("Redis error at set_next_score: <" + command + ">");
Expand Down Expand Up @@ -1012,8 +1019,11 @@ vector<string> RedisMongoDB::add_links(const vector<atoms::Link*>& links,

LOG_DEBUG("Setting next scores: patterns=" + to_string(this->patterns_next_score.load()) +
", incoming=" + to_string(this->incoming_set_next_score.load()));
set_next_score(REDIS_PATTERNS_PREFIX + ":next_score", this->patterns_next_score.load());
set_next_score(REDIS_INCOMING_PREFIX + ":next_score", this->incoming_set_next_score.load());

set_next_score_with_context(
ctx, REDIS_PATTERNS_PREFIX + ":next_score", this->patterns_next_score.load());
set_next_score_with_context(
ctx, REDIS_INCOMING_PREFIX + ":next_score", this->incoming_set_next_score.load());

if (is_transactional) {
lock_guard<mutex> composite_type_hashes_map_lock(this->composite_type_hashes_map_mutex);
Expand Down
1 change: 1 addition & 0 deletions src/atomdb/redis_mongodb/RedisMongoDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ class RedisMongoDB : public AtomDB {

uint get_next_score(const string& key);
void set_next_score(const string& key, uint score);
void set_next_score_with_context(shared_ptr<RedisContext> ctx, const string& key, uint score);
void reset_scores();

void add_pattern(const string& handle, const string& pattern_handle);
Expand Down
Loading