Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 83 additions & 24 deletions src/master/master_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2384,14 +2384,13 @@ void MasterImpl::LoadTabletAsync(TabletPtr tablet, LoadClosure* done, uint64_t)
request->add_snapshots_id(snapshot_id[i]);
request->add_snapshots_sequence(snapshot_seq[i]);
}
std::vector<uint64_t> rollback_snapshots;
std::vector<uint64_t> rollback_points;
table->ListRollback(&rollback_snapshots);
tablet->ListRollback(&rollback_points);
assert(rollback_snapshots.size() == rollback_points.size());
for (uint32_t i = 0; i < rollback_snapshots.size(); ++i) {
request->add_rollback_snapshots(rollback_snapshots[i]);
request->add_rollback_points(rollback_points[i]);
std::vector<std::string> rollback_names;
std::vector<Rollback> rollbacks;
table->ListRollback(&rollback_names);
tablet->ListRollback(&rollbacks);
assert(rollback_names.size() == rollbacks.size());
for (uint32_t i = 0; i < rollbacks.size(); ++i) {
request->add_rollbacks()->CopyFrom(rollbacks[i]);
}

TabletMeta meta;
Expand Down Expand Up @@ -2981,9 +2980,9 @@ void MasterImpl::ReleaseSnapshotCallback(ReleaseSnapshotRequest* request,
/// 删掉删不掉无所谓, 不计较~
}

void MasterImpl::Rollback(const RollbackRequest* request,
RollbackResponse* response,
google::protobuf::Closure* done) {
void MasterImpl::GetRollback(const RollbackRequest* request,
RollbackResponse* response,
google::protobuf::Closure* done) {
LOG(INFO) << "MasterImpl Rollback";
response->set_sequence_id(request->sequence_id());

Expand All @@ -3005,31 +3004,88 @@ void MasterImpl::Rollback(const RollbackRequest* request,
return;
}

std::vector<TabletPtr> tablets;
table->GetTablet(&tablets);
// write memory and meta with default rollback_point
int sid = table->AddRollback(request->rollback_name());
for (uint32_t i = 0; i < tablets.size(); ++i) {
int tsid = tablets[i]->AddRollback(request->rollback_name(), request->snapshot_id(),
leveldb::kMaxSequenceNumber);
assert(sid == tsid);
}
WriteClosure* closure =
NewClosure(this, &MasterImpl::AddDefaultRollbackCallback, table, tablets,
FLAGS_tera_master_meta_retry_times, request, response, done);
BatchWriteMetaTableAsync(table, tablets, false, closure);
}

void MasterImpl::AddDefaultRollbackCallback(TablePtr table,
std::vector<TabletPtr> tablets,
int32_t retry_times,
const RollbackRequest* rpc_request,
RollbackResponse* rpc_response,
google::protobuf::Closure* rpc_done,
WriteTabletRequest* request,
WriteTabletResponse* response,
bool failed, int error_code) {
StatusCode status = response->status();
if (!failed && status == kTabletNodeOk) {
// all the row status should be the same
CHECK_GT(response->row_status_list_size(), 0);
status = response->row_status_list(0);
}
delete request;
delete response;
if (failed || status != kTabletNodeOk) {
if (failed) {
LOG(WARNING) << "fail to write rollback to meta: "
<< sofa::pbrpc::RpcErrorCodeToString(error_code) << ", "
<< tablets[0] << "...";
} else {
LOG(WARNING) << "fail to write rollback to meta: "
<< StatusCodeToString(status) << ", " << tablets[0] << "...";
}
if (retry_times <= 0) {
rpc_response->set_status(kMetaTabletError);
rpc_done->Run();
} else {
WriteClosure* done =
NewClosure(this, &MasterImpl::AddDefaultRollbackCallback, table,
tablets, retry_times - 1, rpc_request, rpc_response,
rpc_done);
SuspendMetaOperation(table, tablets, false, done);
}
return;
}
LOG(INFO) << "Add default rollback " << rpc_request->rollback_name() << " to "
<< rpc_request->table_name() << " done";

RollbackTask* task = new RollbackTask;
table->GetTablet(&task->tablets);

assert(task->tablets.size());

task->rollback_points.resize(task->tablets.size());
task->request = request;
task->response = response;
task->done = done;
task->request = rpc_request;
task->response = rpc_response;
task->done = rpc_done;
task->table = table;
task->task_num = 0;
task->finish_num = 0;
task->aborted = false;
MutexLock lock(&task->mutex);
for (uint32_t i = 0; i < task->tablets.size(); ++i) {
TabletPtr tablet = task->tablets[i];
++task->task_num;
RollbackClosure* closure =
NewClosure(this, &MasterImpl::RollbackCallback, static_cast<int32_t>(i), task);
RollbackAsync(tablet, request->snapshot_id(), 3000, closure);
RollbackAsync(tablet, rpc_request->snapshot_id(), 3000, closure);
}
if (task->task_num == 0) {
LOG(WARNING) << "fail to rollback to snapshot: " << request->table_name()
LOG(WARNING) << "fail to rollback to snapshot: " << rpc_request->table_name()
<< ", all tables kTabletNodeOffLine";
response->set_status(kTabletNodeOffLine);
done->Run();
rpc_done->Run();
return;
}
}
Expand Down Expand Up @@ -3077,18 +3133,21 @@ void MasterImpl::RollbackCallback(int32_t tablet_id, RollbackTask* task,
} else {
task->rollback_points[tablet_id] = master_response->rollback_point();
LOG(INFO) << "MasterImpl rollback all tablet done";
int sid = task->table->AddRollback(master_request->snapshot_id());
int sid = task->table->GetRollbackSize();
for (uint32_t i = 0; i < task->tablets.size(); ++i) {
int tsid = task->tablets[i]->AddRollback(task->rollback_points[i]);
int tsid = task->tablets[i]->UpdateRollback(task->request->rollback_name(),
master_request->snapshot_id(),
task->rollback_points[i]);
assert(sid == tsid);
}
WriteClosure* closure =
NewClosure(this, &MasterImpl::AddRollbackCallback,
task->table, task->tablets,
FLAGS_tera_master_meta_retry_times,
task->request, task->response, task->done);
task->table, task->tablets,
FLAGS_tera_master_meta_retry_times,
task->request, task->response, task->done);
BatchWriteMetaTableAsync(task->table, task->tablets, false, closure);
}
task->mutex.Unlock();
delete task;
}

Expand Down Expand Up @@ -3130,8 +3189,8 @@ void MasterImpl::AddRollbackCallback(TablePtr table,
}
return;
}
LOG(INFO) << "Rollback " << rpc_request->table_name()
<< ", write meta " << rpc_request->snapshot_id() << " done";
LOG(INFO) << "Rollback " << rpc_request->rollback_name() << " to " << rpc_request->table_name()
<< ", write meta with snpashot_id " << rpc_request->snapshot_id() << " done";
rpc_response->set_status(kMasterOk);
rpc_done->Run();
}
Expand Down
12 changes: 8 additions & 4 deletions src/master/master_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class MasterImpl {
DelSnapshotResponse* response,
google::protobuf::Closure* done);

void Rollback(const RollbackRequest* request,
void GetRollback(const RollbackRequest* request,
RollbackResponse* response,
google::protobuf::Closure* done);

Expand Down Expand Up @@ -297,8 +297,13 @@ class MasterImpl {
WriteTabletRequest* request,
WriteTabletResponse* response,
bool failed, int error_code);
void RollbackAsync(TabletPtr tablet, uint64_t snapshot_id, int32_t timeout,
RollbackClosure* done);
void AddDefaultRollbackCallback(TablePtr table, std::vector<TabletPtr> tablets,
int32_t retry_times, const RollbackRequest* rpc_request,
RollbackResponse* rpc_response, google::protobuf::Closure* rpc_done,
WriteTabletRequest* request, WriteTabletResponse* response,
bool failed, int error_code);
void RollbackAsync(TabletPtr tablet, uint64_t snapshot_id,
int32_t timeout, RollbackClosure* done);
void RollbackCallback(int32_t tablet_id, RollbackTask* task,
SnapshotRollbackRequest* master_request,
SnapshotRollbackResponse* master_response,
Expand All @@ -312,7 +317,6 @@ class MasterImpl {
WriteTabletRequest* request,
WriteTabletResponse* response,
bool failed, int error_code);

void ScheduleQueryTabletNode();
void QueryTabletNode();
void QueryTabletNodeAsync(std::string addr, int32_t timeout,
Expand Down
16 changes: 8 additions & 8 deletions src/master/remote_master.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ void RemoteMaster::DelSnapshot(google::protobuf::RpcController* controller,
m_thread_pool->AddTask(callback);
}

void RemoteMaster::Rollback(google::protobuf::RpcController* controller,
const RollbackRequest* request,
RollbackResponse* response,
google::protobuf::Closure* done) {
void RemoteMaster::GetRollback(google::protobuf::RpcController* controller,
const RollbackRequest* request,
RollbackResponse* response,
google::protobuf::Closure* done) {
ThreadPool::Task callback =
boost::bind(&RemoteMaster::DoRollback, this, controller,
request, response, done);
Expand Down Expand Up @@ -187,11 +187,11 @@ void RemoteMaster::DoDelSnapshot(google::protobuf::RpcController* controller,
}

void RemoteMaster::DoRollback(google::protobuf::RpcController* controller,
const RollbackRequest* request,
RollbackResponse* response,
google::protobuf::Closure* done) {
const RollbackRequest* request,
RollbackResponse* response,
google::protobuf::Closure* done) {
LOG(INFO) << "accept RPC (Rollback)";
m_master_impl->Rollback(request, response, done);
m_master_impl->GetRollback(request, response, done);
LOG(INFO) << "finish RPC (Rollback)";
}

Expand Down
8 changes: 4 additions & 4 deletions src/master/remote_master.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ class RemoteMaster : public MasterServer {
DelSnapshotResponse* response,
google::protobuf::Closure* done);

void Rollback(google::protobuf::RpcController* controller,
const RollbackRequest* request,
RollbackResponse* response,
google::protobuf::Closure* done);
void GetRollback(google::protobuf::RpcController* controller,
const RollbackRequest* request,
RollbackResponse* response,
google::protobuf::Closure* done);

void CreateTable(google::protobuf::RpcController* controller,
const CreateTableRequest* request,
Expand Down
55 changes: 39 additions & 16 deletions src/master/tablet_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -348,19 +348,42 @@ void Tablet::DelSnapshot(int32_t id) {
snapshot_list->RemoveLast();
}

int32_t Tablet::AddRollback(uint64_t rollback_point) {
int32_t Tablet::AddRollback(std::string name, uint64_t snapshot_id, uint64_t rollback_point) {
MutexLock lock(&m_mutex);
m_meta.add_rollback_points(rollback_point);
return m_meta.rollback_points_size() - 1;
Rollback rollback;
rollback.set_name(name);
rollback.set_snapshot_id(snapshot_id);
rollback.set_rollback_point(rollback_point);
m_meta.add_rollbacks()->CopyFrom(rollback);
return m_meta.rollbacks_size() - 1;
}

void Tablet::ListRollback(std::vector<uint64_t>* rollback_points) {
void Tablet::ListRollback(std::vector<Rollback>* rollbacks) {
MutexLock lock(&m_mutex);
for (int i = 0; i < m_meta.rollback_points_size(); i++) {
rollback_points->push_back(m_meta.rollback_points(i));
for (int i = 0; i < m_meta.rollbacks_size(); i++) {
rollbacks->push_back(m_meta.rollbacks(i));
VLOG(11) << "rollback " << m_meta.path() << ": " << m_meta.rollbacks(i).ShortDebugString();
}
}

int32_t Tablet::UpdateRollback(std::string name, uint64_t snapshot_id, uint64_t rollback_point) {
MutexLock lock(&m_mutex);
bool has_rollback_name = false;
for (int32_t i = 0; i < m_meta.rollbacks_size(); ++i) {
Rollback* cur_rollback = m_meta.mutable_rollbacks(i);
if (cur_rollback->name() == name) {
has_rollback_name = true;
assert(cur_rollback->snapshot_id() == snapshot_id);
cur_rollback->set_rollback_point(rollback_point);
}
}
for (int i = 0; i < m_meta.rollbacks_size(); i++) {
VLOG(11) << "rollback " << m_meta.path() << ": " << m_meta.rollbacks(i).ShortDebugString();
}
assert(has_rollback_name);
return m_meta.rollbacks_size() - 1;
}

bool Tablet::IsBound() {
TablePtr null_ptr;
if (m_table != null_ptr) {
Expand Down Expand Up @@ -628,15 +651,15 @@ void Table::ListSnapshot(std::vector<uint64_t>* snapshots) {
*snapshots = m_snapshot_list;
}

int32_t Table::AddRollback(uint64_t snapshot_id) {
int32_t Table::AddRollback(std::string rollback_name) {
MutexLock lock(&m_mutex);
m_rollback_snapshots.push_back(snapshot_id);
return m_rollback_snapshots.size() - 1;
m_rollback_names.push_back(rollback_name);
return m_rollback_names.size() - 1;
}

void Table::ListRollback(std::vector<uint64_t>* snapshots) {
void Table::ListRollback(std::vector<std::string>* rollback_names) {
MutexLock lock(&m_mutex);
*snapshots = m_rollback_snapshots;
*rollback_names = m_rollback_names;
}

void Table::AddDeleteTabletCount() {
Expand Down Expand Up @@ -668,8 +691,8 @@ void Table::ToMeta(TableMeta* meta) {
for (size_t i = 0; i < m_snapshot_list.size(); i++) {
meta->add_snapshot_list(m_snapshot_list[i]);
}
for (size_t i = 0; i < m_rollback_snapshots.size(); ++i) {
meta->add_rollback_snapshot(m_rollback_snapshots[i]);
for (size_t i = 0; i < m_rollback_names.size(); ++i) {
meta->add_rollback_names(m_rollback_names[i]);
}
}

Expand Down Expand Up @@ -764,9 +787,9 @@ bool TabletManager::AddTable(const std::string& table_name,
(*table)->m_snapshot_list.push_back(meta.snapshot_list(i));
LOG(INFO) << table_name << " add snapshot " << meta.snapshot_list(i);
}
for (int32_t i = 0; i < meta.rollback_snapshot_size(); ++i) {
(*table)->m_rollback_snapshots.push_back(meta.rollback_snapshot(i));
LOG(INFO) << table_name << " add rollback " << meta.rollback_snapshot(i);
for (int32_t i = 0; i < meta.rollback_names_size(); ++i) {
(*table)->m_rollback_names.push_back(meta.rollback_names(i));
LOG(INFO) << table_name << " add rollback " << meta.rollback_names(i);
}
(*table)->m_mutex.Unlock();
return true;
Expand Down
12 changes: 7 additions & 5 deletions src/master/tablet_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ class Tablet {
int32_t AddSnapshot(uint64_t snapshot);
void ListSnapshot(std::vector<uint64_t>* snapshot);
void DelSnapshot(int32_t id);
int32_t AddRollback(uint64_t rollback_point);
void ListRollback(std::vector<uint64_t>* rollback_points);
int32_t AddRollback(std::string name, uint64_t snapshot_id, uint64_t rollback_point);
int32_t UpdateRollback(std::string name, uint64_t snapshot_id, uint64_t rollback_point);
void ListRollback(std::vector<Rollback>* rollbacks);

// is belong to a table?
bool IsBound();
Expand Down Expand Up @@ -175,8 +176,9 @@ class Table {
int32_t AddSnapshot(uint64_t snapshot);
int32_t DelSnapshot(uint64_t snapshot);
void ListSnapshot(std::vector<uint64_t>* snapshots);
int32_t AddRollback(uint64_t rollback_snapshot);
void ListRollback(std::vector<uint64_t>* rollback_snapshots);
int32_t AddRollback(std::string rollback_name);
void ListRollback(std::vector<std::string>* rollback_names);
int32_t GetRollbackSize() {return m_rollback_names.size() - 1;}
void AddDeleteTabletCount();
bool NeedDelete();
void ToMetaTableKeyValue(std::string* packed_key = NULL,
Expand All @@ -195,7 +197,7 @@ class Table {
std::string m_name;
TableSchema m_schema;
std::vector<uint64_t> m_snapshot_list;
std::vector<uint64_t> m_rollback_snapshots;
std::vector<std::string> m_rollback_names;
TableStatus m_status;
uint32_t m_deleted_tablet_num;
uint64_t m_max_tablet_no;
Expand Down
8 changes: 4 additions & 4 deletions src/proto/master_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ bool MasterClient::DelSnapshot(const DelSnapshotRequest* request,
"DelSnapshot", m_rpc_timeout);
}

bool MasterClient::Rollback(const RollbackRequest* request,
RollbackResponse* response) {
return SendMessageWithRetry(&MasterServer::Stub::Rollback,
bool MasterClient::GetRollback(const RollbackRequest* request,
RollbackResponse* response) {
return SendMessageWithRetry(&MasterServer::Stub::GetRollback,
request, response,
(Closure<void, RollbackRequest*, RollbackResponse*, bool, int>*)NULL,
"Rollback", m_rpc_timeout);
"GetRollback", m_rpc_timeout);
}

bool MasterClient::CreateTable(const CreateTableRequest* request,
Expand Down
Loading