Skip to content

Commit

Permalink
Replicate multi-log in one rpc #794 (#860)
Browse files Browse the repository at this point in the history
* Replicate multi-log in one rpc #794
  • Loading branch information
lylei authored and bluebore committed Mar 3, 2017
1 parent baa4db6 commit ad878bb
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 36 deletions.
2 changes: 2 additions & 0 deletions sandbox/deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ echo '--master_slave_log_limit=300' >> bfs.flag
echo '--master_log_gc_interval=10' >> bfs.flag
echo '--logdb_log_size=1' >> bfs.flag
echo '--log_replicate_timeout=3' >> bfs.flag
echo '--snapshot_step=10000' >> bfs.flag
echo '--log_batch_size=1000' >> bfs.flag

for((i=0;i<$ns_num;i++));
do
Expand Down
1 change: 1 addition & 0 deletions src/flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ DEFINE_int32(node_index, 0, "Nameserver node index");
DEFINE_int32(snapshot_step, 1000, "Number of entries in one package");
DEFINE_int32(logdb_log_size, 128, "Logdb log size, in MB");
DEFINE_int32(log_replicate_timeout, 10, "Syncronized log replication timeout, in seconds");
DEFINE_int32(log_batch_size, 100, "Log number in one package");
// ha - master_slave
DEFINE_string(master_slave_role, "master", "This server's role in master/slave ha strategy");
DEFINE_int64(master_slave_log_limit, 20000000, "Master will keep at most x log entries");
Expand Down
108 changes: 73 additions & 35 deletions src/nameserver/master_slave.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ DECLARE_int64(master_slave_log_limit);
DECLARE_int32(master_log_gc_interval);
DECLARE_int32(logdb_log_size);
DECLARE_int32(log_replicate_timeout);
DECLARE_int32(log_batch_size);

namespace baidu {
namespace bfs {
Expand Down Expand Up @@ -107,6 +108,12 @@ void MasterSlaveImpl::Init(SyncCallbacks callbacks) {
}
applied_idx_++;
}
if (!IsLeader()) {
// need to clear sync_idx and applied idx in case this is a retired master
// which contains dirty data
sync_idx_ = -1;
applied_idx_ = -1;
}

rpc_client_ = new RpcClient();
rpc_client_->GetStub(slave_addr_, &slave_stub_);
Expand Down Expand Up @@ -266,10 +273,13 @@ void MasterSlaveImpl::AppendLog(::google::protobuf::RpcController* controller,
done->Run();
return;
}

for (int32_t i = 0; i < request->log_data_size(); ++i) {
log_callback_(request->log_data(i));
}
mu_.Lock();
current_idx_++;
current_idx_ += request->log_data_size();
mu_.Unlock();
log_callback_(request->log_data());
response->set_success(true);
done->Run();
}
Expand All @@ -291,6 +301,11 @@ void MasterSlaveImpl::Snapshot(::google::protobuf::RpcController* controller,
LOG(INFO, "%s Start to clean up the old namespace...", kLogPrefix.c_str());
erase_callback_();
LOG(INFO, "%s Done clean up the old namespace...", kLogPrefix.c_str());
term_ = request->term();
StatusCode s = logdb_->WriteMarker("term", term_);
if (s != kOK) {
LOG(FATAL, "%s Write marker term %ld failed", kLogPrefix.c_str(), term_);
}
slave_snapshot_seq_ = 0;
} else if (seq != slave_snapshot_seq_) {
LOG(INFO, "%s snapshot mismatch reqeust seq = %ld, slave sseq = %ld",
Expand Down Expand Up @@ -331,53 +346,66 @@ void MasterSlaveImpl::BackgroundLog() {

void MasterSlaveImpl::ReplicateLog() {
while (sync_idx_ < current_idx_) {
std::string entry;
{
MutexLock lock(&mu_);
if (sync_idx_ == current_idx_) {
mu_.Unlock();
break;
}
LOG(DEBUG, "%s ReplicateLog sync_idx_ = %d, current_idx_ = %d",
kLogPrefix.c_str(), sync_idx_, current_idx_);
if (sync_idx_ <= gc_idx_ && gc_idx_ != -1) {
if (!SendSnapshot()) {
LOG(INFO, "%s Send snapshot failed", kLogPrefix.c_str());
EmptyLog();
}
continue;
}
StatusCode s = logdb_->Read(sync_idx_ + 1, &entry);
if (s != kOK) {
LOG(FATAL, "%s Read logdb_ failed sync_idx_ = %ld %s",
kLogPrefix.c_str(), sync_idx_ + 1, StatusCode_Name(s).c_str());
MutexLock lock(&mu_);
if (sync_idx_ == current_idx_) {
break;
}
LOG(DEBUG, "%s ReplicateLog sync_idx_ = %d, current_idx_ = %d",
kLogPrefix.c_str(), sync_idx_, current_idx_);

if (sync_idx_ <= gc_idx_ && gc_idx_ != -1) {
mu_.Unlock();
if (!SendSnapshot()) {
LOG(INFO, "%s Send snapshot failed", kLogPrefix.c_str());
EmptyLog();
}
mu_.Lock();
continue;
}

master_slave::AppendLogRequest request;
master_slave::AppendLogResponse response;
request.set_log_data(entry);
request.set_index(sync_idx_ + 1);
std::string entry;
for (int i = 0; i < FLAGS_log_batch_size; ++i) {
StatusCode s = logdb_->Read(sync_idx_ + 1 + i, &entry);
if (s != kOK && s != kNsNotFound) {
LOG(FATAL, "%s Read logdb_ failed sync_idx_ = %ld ",
kLogPrefix.c_str(), sync_idx_ + 1);
}
if (s == kNsNotFound) {
break;
}
request.add_log_data(entry);
}
if (request.log_data_size() == 0) { // maybe slave is way behind
continue;
}
mu_.Unlock();
if (!rpc_client_->SendRequest(slave_stub_,
&master_slave::MasterSlave_Stub::AppendLog,
&request, &response, 15, 1)) {
LOG(WARNING, "%s Replicate log failed index = %d, current_idx_ = %d",
kLogPrefix.c_str(), sync_idx_ + 1, current_idx_);
LOG(WARNING, "%s Replicate log failed index = %d, size = %d current_idx_ = %d",
kLogPrefix.c_str(), sync_idx_ + 1, request.log_data_size(), current_idx_);
EmptyLog();
mu_.Lock();
continue;
}
mu_.Lock();
if (!response.success()) { // log mismatch
MutexLock lock(&mu_);
sync_idx_ = response.index() - 1;
LOG(INFO, "%s set sync_idx_ to %d", kLogPrefix.c_str(), sync_idx_);
continue;
}
thread_pool_->AddTask(std::bind(&MasterSlaveImpl::ProcessCallback,
this, sync_idx_ + 1, false));
mu_.Lock();
sync_idx_++;
LOG(DEBUG, "%s Replicate log done. sync_idx_ = %d, current_idx_ = %d",
kLogPrefix.c_str(), sync_idx_ , current_idx_);
sync_idx_ = sync_idx_ + request.log_data_size();
mu_.Unlock();
for (int32_t i = 0; i < request.log_data_size(); ++i) {
thread_pool_->AddTask(std::bind(&MasterSlaveImpl::ProcessCallback,
this, sync_idx_ - i, false));
}
LOG(DEBUG, "%s Replicate log done. sync_idx_ = %d, size = %d current_idx_ = %d",
kLogPrefix.c_str(), sync_idx_, request.log_data_size(), current_idx_);
mu_.Lock();
}
log_done_.Signal();
}
Expand Down Expand Up @@ -409,11 +437,21 @@ bool MasterSlaveImpl::SendSnapshot() {
request.set_data(logstr);
request.set_seq(seq);
request.set_index(current_index);
while (!rpc_client_->SendRequest(slave_stub_,
request.set_term(term_);
for (int i = 0; i < 5; ++i) {
if (rpc_client_->SendRequest(slave_stub_,
&master_slave::MasterSlave_Stub::Snapshot,
&request, &response, 15, 1)) {
LOG(WARNING, "%s Send snapshot failed seq %ld",
kLogPrefix.c_str(), seq);
break;
} else {
LOG(WARNING, "%s Send snapshot failed seq %ld",
kLogPrefix.c_str(), seq);
if (i == 4) {
snapshot_callback_(0, NULL);
return false;
}
sleep(5);
}
}
LOG(INFO, "%s Send snapshot seq %ld done", kLogPrefix.c_str(), seq);
if (!response.success()) {
Expand Down
3 changes: 2 additions & 1 deletion src/proto/master_slave.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package baidu.bfs.master_slave;
option cc_generic_services = true;

message AppendLogRequest {
optional bytes log_data = 1;
repeated bytes log_data = 1;
optional int64 index = 2;
optional int64 term = 3;
}
Expand All @@ -17,6 +17,7 @@ message SnapshotRequest {
optional bytes data = 1;
optional int64 seq = 2;
optional int64 index = 3;
optional int64 term = 4;
}

message SnapshotResponse {
Expand Down

0 comments on commit ad878bb

Please sign in to comment.