diff --git a/src/io/tablet_io.cc b/src/io/tablet_io.cc index 5707688fc..8ce26a5ed 100644 --- a/src/io/tablet_io.cc +++ b/src/io/tablet_io.cc @@ -1734,7 +1734,7 @@ void TabletIO::ListSnapshot(std::vector* snapshot_id) { } } -uint64_t TabletIO::Rollback(uint64_t snapshot_id, StatusCode* status) { +uint64_t TabletIO::GetRollback(uint64_t snapshot_id, StatusCode* status) { uint64_t sequence; { MutexLock lock(&m_mutex); diff --git a/src/io/tablet_io.h b/src/io/tablet_io.h index 55056f286..3a2d384bc 100644 --- a/src/io/tablet_io.h +++ b/src/io/tablet_io.h @@ -152,7 +152,7 @@ class TabletIO { bool ReleaseSnapshot(uint64_t snapshot_id, StatusCode* status = NULL); void ListSnapshot(std::vector* snapshot_id); - uint64_t Rollback(uint64_t snapshot_id, StatusCode* status); + uint64_t GetRollback(uint64_t snapshot_id, StatusCode* status); uint32_t GetLGidByCFName(const std::string& cfname); diff --git a/src/master/master_impl.cc b/src/master/master_impl.cc index 40a6d4ebf..890ea6be6 100644 --- a/src/master/master_impl.cc +++ b/src/master/master_impl.cc @@ -204,6 +204,7 @@ bool MasterImpl::Restore(const std::map& tabletnode_li m_zk_adapter->UpdateRootTabletNode(meta_tablet_addr); RestoreUserTablet(tablet_list); + RestoreRollback(); // restore success m_restored = true; @@ -2449,7 +2450,7 @@ void MasterImpl::LoadTabletCallback(TabletPtr tablet, int32_t retry, CHECK(tablet->GetStatus() == kTableOnLoad); StatusCode status = response->status(); delete request; - delete response; + // delete response; const std::string& server_addr = tablet->GetServerAddr(); // server down @@ -2483,6 +2484,20 @@ void MasterImpl::LoadTabletCallback(TabletPtr tablet, int32_t retry, } ProcessReadyTablet(tablet); + // process rollback info + for (int32_t i = 0; i < response->rollbacks_size(); ++i) { + VLOG(10) << "Update rollback: tablet " << tablet->GetPath() << " " + << response->rollbacks(i).ShortDebugString(); + tablet->UpdateRollback(response->rollbacks(i).name(), + 
response->rollbacks(i).snapshot_id(), + response->rollbacks(i).rollback_point()); + } + WriteClosure* done = + NewClosure(this, &MasterImpl::LoadRollbackCallback, + tablet, FLAGS_tera_master_meta_retry_times, response); + BatchWriteMetaTableAsync(boost::bind(&Tablet::ToMetaTableKeyValue, tablet, _1, _2), + false, done); + // load next node->FinishLoad(tablet); TabletPtr next_tablet; @@ -2540,6 +2555,44 @@ void MasterImpl::LoadTabletCallback(TabletPtr tablet, int32_t retry, FLAGS_tera_master_control_tabletnode_retry_period, task); } +void MasterImpl::LoadRollbackCallback(TabletPtr tablet, int32_t retry_times, + LoadTabletResponse* rpc_response, + WriteTabletRequest* request, + WriteTabletResponse* response, + bool failed, int error_code) { + StatusCode status = response->status(); + if (failed || status != kTabletNodeOk) { + if (failed) { + LOG(WARNING) << "fail to update rollback to meta: " + << sofa::pbrpc::RpcErrorCodeToString(error_code) << ", " + << tablet << "..."; + } else { + LOG(WARNING) << "fail to update rollback to meta: " + << StatusCodeToString(status) << ", " << tablet << "..."; + } + if (retry_times <= 0) { + for (int32_t i = 0; i < rpc_response->rollbacks_size(); ++i) { + const Rollback& cur_rollback = rpc_response->rollbacks(i); + tablet->UpdateRollback(cur_rollback.name(), cur_rollback.snapshot_id(), + leveldb::kMaxSequenceNumber); + } + } else { + WriteClosure* done = + NewClosure(this, &MasterImpl::LoadRollbackCallback, tablet, + retry_times - 1, rpc_response); + SuspendMetaOperation(boost::bind(&Tablet::ToMetaTableKeyValue, tablet, _1, _2), + false, done); + } + delete rpc_response; + return; + } + for (int32_t i = 0; i < rpc_response->rollbacks_size(); ++i) { + VLOG(10) << "LoadRollbackCallback " << tablet->GetPath() << ": " + << rpc_response->rollbacks(i).ShortDebugString(); + } + delete rpc_response; +} + bool MasterImpl::UnloadTabletSync(const std::string& table_name, const std::string& key_start, const std::string& key_end, @@ -3040,33 
+3093,118 @@ void MasterImpl::GetRollback(const RollbackRequest* request, return; } - RollbackTask* task = new RollbackTask; - table->GetTablet(&task->tablets); + bool rollback_exist, rollback_done; + table->GetRollbackStatus(request->rollback_name(), &rollback_exist, &rollback_done); + if (rollback_exist) { + response->set_status(kMasterOk); + if (rollback_done) { + LOG(INFO) << "rollback " << request->rollback_name() << " already exists and done"; + response->set_done(true); + done->Run(); + return; + } else { + LOG(INFO) << "rollback " << request->rollback_name() + << " already exists but has not complete yet"; + response->set_done(false); + done->Run(); + return; + } + } - assert(task->tablets.size()); + std::vector tablets; + table->GetTablet(&tablets); + // write memory and meta with default rollback_point + int sid = table->AddRollback(request->rollback_name()); + for (uint32_t i = 0; i < tablets.size(); ++i) { + int tsid = tablets[i]->AddRollback(request->rollback_name(), request->snapshot_id(), + leveldb::kMaxSequenceNumber); + assert(sid == tsid); + } + WriteClosure* closure = + NewClosure(this, &MasterImpl::AddDefaultRollbackCallback, table, tablets, + FLAGS_tera_master_meta_retry_times, request, response, done); + BatchWriteMetaTableAsync(table, tablets, false, closure); +} - task->rollback_points.resize(task->tablets.size()); - task->request = request; - task->response = response; - task->done = done; +void MasterImpl::AddDefaultRollbackCallback(TablePtr table, + std::vector tablets, + int32_t retry_times, + const RollbackRequest* rpc_request, + RollbackResponse* rpc_response, + google::protobuf::Closure* rpc_done, + WriteTabletRequest* request, + WriteTabletResponse* response, + bool failed, int error_code) { + StatusCode status = response->status(); + if (!failed && status == kTabletNodeOk) { + // all the row status should be the same + CHECK_GT(response->row_status_list_size(), 0); + status = response->row_status_list(0); + } + delete request; + 
delete response; + if (failed || status != kTabletNodeOk) { + if (failed) { + LOG(WARNING) << "fail to write rollback to meta: " + << sofa::pbrpc::RpcErrorCodeToString(error_code) << ", " + << tablets[0] << "..."; + } else { + LOG(WARNING) << "fail to write rollback to meta: " + << StatusCodeToString(status) << ", " << tablets[0] << "..."; + } + if (retry_times <= 0) { + int sid = table->DelRollback(rpc_request->rollback_name()); + for (uint32_t i = 0; i < tablets.size(); ++i) { + int tsid = tablets[i]->DelRollback(rpc_request->rollback_name()); + assert(sid == tsid); + } + rpc_response->set_status(kMetaTabletError); + rpc_done->Run(); + } else { + WriteClosure* done = + NewClosure(this, &MasterImpl::AddDefaultRollbackCallback, table, + tablets, retry_times - 1, rpc_request, rpc_response, + rpc_done); + SuspendMetaOperation(table, tablets, false, done); + } + return; + } + LOG(INFO) << "Add default rollback " << rpc_request->rollback_name() << " to " + << rpc_request->table_name() << " done"; + + RollbackTask* task = new RollbackTask; + table->GetTablet(&task->tablets); + task->rollback_points.resize(task->tablets.size(), leveldb::kMaxSequenceNumber); + task->retry.resize(task->tablets.size(), false); + task->request = rpc_request; + task->response = rpc_response; + task->done = rpc_done; task->table = table; task->task_num = 0; task->finish_num = 0; - task->aborted = false; + ApplyRollbackTask(task); + +} + +void MasterImpl::ApplyRollbackTask(RollbackTask* task) { MutexLock lock(&task->mutex); for (uint32_t i = 0; i < task->tablets.size(); ++i) { TabletPtr tablet = task->tablets[i]; ++task->task_num; - RollbackClosure* closure = - NewClosure(this, &MasterImpl::RollbackCallback, static_cast(i), task); - RollbackAsync(tablet, request->snapshot_id(), 3000, closure); - } - if (task->task_num == 0) { - LOG(WARNING) << "fail to rollback to snapshot: " << request->table_name() - << ", all tables kTabletNodeOffLine"; - response->set_status(kTabletNodeOffLine); - 
done->Run(); - return; + TabletStatus status = tablet->GetStatus(); + if (status == kTableReady) { + VLOG(1) << "rollback tablet ready: " << i; + RollbackClosure* closure = + NewClosure(this, &MasterImpl::RollbackCallback, static_cast(i), task); + RollbackAsync(tablet, task->request->snapshot_id(), 3000, closure); + } else if (status == kTableOnLoad) { + VLOG(1) << "rollback tablet onload: " << i; + task->retry[i] = true; + RollbackCallback(i, task, NULL, NULL, false, 0); + } else { + VLOG(1) << "rollback tablet other: " << StatusCodeToString(status); + RollbackCallback(i, task, NULL, NULL, false, 0); + } } } @@ -3083,8 +3221,7 @@ void MasterImpl::RollbackAsync(TabletPtr tablet, uint64_t snapshot_id, request->mutable_key_range()->set_key_start(tablet->GetKeyStart()); request->mutable_key_range()->set_key_end(tablet->GetKeyEnd()); - LOG(INFO) << "RollbackAsync id: " << request->sequence_id() << ", " - << "server: " << addr; + VLOG(10) << "RollbackAsync id: " << snapshot_id << ", " << "server: " << addr; node_client.Rollback(request, response, done); } @@ -3094,45 +3231,78 @@ void MasterImpl::RollbackCallback(int32_t tablet_id, RollbackTask* task, bool failed, int error_code) { MutexLock lock(&task->mutex); ++task->finish_num; - VLOG(6) << "MasterImpl Rollback id= " << tablet_id - << " finish_num= " << task->finish_num - << ". Return " << master_response->rollback_point(); if (task->finish_num != task->task_num) { - if (!failed && master_response->status() == kTabletNodeOk) { - task->rollback_points[tablet_id] = master_response->rollback_point(); + if (!failed && master_response && master_response->status() == kTabletNodeOk) { + task->rollback_points[tablet_id] = master_response->rollback_point(); + VLOG(6) << "MasterImpl Rollback tablet id= " << tablet_id + << " finish_num= " << task->finish_num + << ". 
Return " << master_response->rollback_point(); } else { - task->aborted = true; + // rpc failed or tablet not ready, do not update rollback_point + VLOG(1) << "MasterImpl Rollback tablet id= " << tablet_id + << " finish_num= " << task->finish_num + << ". Failed"; } return; } - if (failed || task->aborted) { - LOG(WARNING) << "MasterImpl Rollback fail done"; - task->response->set_status(kTabletNodeOffLine); - task->done->Run(); - } else { + if (!failed && master_response && master_response->status() == kTabletNodeOk) { + VLOG(6) << "MasterImpl Rollback tablet id= " << tablet_id + << " finish_num= " << task->finish_num + << ". Return " << master_response->rollback_point(); task->rollback_points[tablet_id] = master_response->rollback_point(); - LOG(INFO) << "MasterImpl rollback all tablet done"; - int sid = task->table->AddRollback(task->request->rollback_name()); - for (uint32_t i = 0; i < task->tablets.size(); ++i) { - int tsid = task->tablets[i]->AddRollback(task->request->rollback_name(), - master_request->snapshot_id(), - task->rollback_points[i]); + } else { + VLOG(1) << "MasterImpl Rollback tablet id= " << tablet_id + << " finish_num= " << task->finish_num + << ". 
Failed"; + // rpc failed or tablet not ready, do not update rollback_point + } + std::vector retry_tablets; + int sid = task->table->GetRollbackSize(); + for (uint32_t i = 0; i < task->tablets.size(); ++i) { + if (!task->retry[i]) { + int tsid = task->tablets[i]->UpdateRollback(task->request->rollback_name(), + task->request->snapshot_id(), + task->rollback_points[i]); assert(sid == tsid); - } - WriteClosure* closure = - NewClosure(this, &MasterImpl::AddRollbackCallback, - task->table, task->tablets, - FLAGS_tera_master_meta_retry_times, - task->request, task->response, task->done); - BatchWriteMetaTableAsync(task->table, task->tablets, false, closure); + } else { + retry_tablets.push_back(task->tablets[i]); + } + } + + // retry when talets were onloading + if (retry_tablets.size() != 0) { + RollbackTask* retry_task = new RollbackTask; + retry_task->tablets = retry_tablets; + assert(retry_task->tablets.size()); + retry_task->rollback_points.resize(retry_task->tablets.size(), leveldb::kMaxSequenceNumber); + retry_task->retry.resize(retry_task->tablets.size(), false); + retry_task->request = task->request; + retry_task->response = task->response; + retry_task->done = task->done; + retry_task->table = task->table; + retry_task->task_num = 0; + retry_task->finish_num = 0; + ThreadPool::Task task = + boost::bind(&MasterImpl::ApplyRollbackTask, this, retry_task); + m_thread_pool->DelayTask(5, task); } + + WriteClosure* closure = + NewClosure(this, &MasterImpl::AddRollbackCallback, + task->table, task->tablets, retry_tablets.size() == 0, + FLAGS_tera_master_meta_retry_times, + task->request, task->response, task->done); + BatchWriteMetaTableAsync(task->table, task->tablets, false, closure); + + LOG(INFO) << "MasterImpl rollback all tablet done"; task->mutex.Unlock(); delete task; } void MasterImpl::AddRollbackCallback(TablePtr table, std::vector tablets, + bool no_more_retry, int32_t retry_times, const RollbackRequest* rpc_request, RollbackResponse* rpc_response, @@ 
-3158,21 +3328,66 @@ void MasterImpl::AddRollbackCallback(TablePtr table, << StatusCodeToString(status) << ", " << tablets[0] << "..."; } if (retry_times <= 0) { + ////////// TODO /////////// rpc_response->set_status(kMetaTabletError); - rpc_done->Run(); + if (rpc_done) { + rpc_done->Run(); + } } else { WriteClosure* done = NewClosure(this, &MasterImpl::AddRollbackCallback, table, - tablets, retry_times - 1, rpc_request, rpc_response, + tablets, no_more_retry, retry_times - 1, rpc_request, rpc_response, rpc_done); SuspendMetaOperation(table, tablets, false, done); } return; } - LOG(INFO) << "Rollback " << rpc_request->rollback_name() << " to " << rpc_request->table_name() - << ", write meta " << rpc_request->snapshot_id() << " done"; - rpc_response->set_status(kMasterOk); - rpc_done->Run(); + if (no_more_retry) { + LOG(INFO) << "Rollback " << rpc_request->rollback_name() << " to " << rpc_request->table_name() + << ", write meta with snpashot_id " << rpc_request->snapshot_id() << " done"; + table->SetRollbackStatus(rpc_request->rollback_name(), true); + rpc_response->set_status(kMasterOk); + rpc_response->set_done(true); + if (rpc_done) { + rpc_done->Run(); + } + } else { + LOG(INFO) << "Rollback " << rpc_request->rollback_name() << " to " << rpc_request->table_name() + << ", write meta with snpashot_id " << rpc_request->snapshot_id() + << " partially done, wait for retry"; + } +} + +void MasterImpl::RestoreRollback() { + std::vector tables; + m_tablet_manager->ShowTable(&tables, NULL); + for (uint32_t table_i = 0; table_i < tables.size(); ++table_i) { + TablePtr cur_table = tables[table_i]; + std::vector > > retries; + cur_table->GetRollbackTablets(&retries); + for (uint32_t ri = 0; ri < retries.size(); ++ri) { + RollbackRequest* request = new RollbackRequest; + RollbackResponse* response = new RollbackResponse; + request->set_sequence_id(0); + request->set_table_name(cur_table->GetTableName()); + request->set_snapshot_id(retries[ri].first.snapshot_id()); + 
request->set_rollback_name(retries[ri].first.name()); + + RollbackTask* task = new RollbackTask; + task->tablets = retries[ri].second; + task->rollback_points.resize(task->tablets.size(), leveldb::kMaxSequenceNumber); + task->retry.resize(task->tablets.size(), false); + task->request = request; + task->response = response; + task->done = NULL; + task->table = cur_table; + task->task_num = 0; + task->finish_num = 0; + ThreadPool::Task threadpool_task = + boost::bind(&MasterImpl::ApplyRollbackTask, this, task); + m_thread_pool->AddTask(threadpool_task); + } + } } void MasterImpl::ClearUnusedSnapshots(TabletPtr tablet, const TabletMeta& meta) { diff --git a/src/master/master_impl.h b/src/master/master_impl.h index 1488864e4..0acae375c 100644 --- a/src/master/master_impl.h +++ b/src/master/master_impl.h @@ -199,11 +199,11 @@ class MasterImpl { google::protobuf::Closure* done; TablePtr table; std::vector tablets; + std::vector retry; std::vector rollback_points; int task_num; int finish_num; mutable Mutex mutex; - bool aborted; }; void SafeModeCmdCtrl(const CmdCtrlRequest* request, @@ -297,14 +297,21 @@ class MasterImpl { WriteTabletRequest* request, WriteTabletResponse* response, bool failed, int error_code); - void RollbackAsync(TabletPtr tablet, uint64_t snapshot_id, int32_t timeout, - RollbackClosure* done); + void AddDefaultRollbackCallback(TablePtr table, std::vector tablets, + int32_t retry_times, const RollbackRequest* rpc_request, + RollbackResponse* rpc_response, google::protobuf::Closure* rpc_done, + WriteTabletRequest* request, WriteTabletResponse* response, + bool failed, int error_code); + void ApplyRollbackTask(RollbackTask* task); + void RollbackAsync(TabletPtr tablet, uint64_t snapshot_id, + int32_t timeout, RollbackClosure* done); void RollbackCallback(int32_t tablet_id, RollbackTask* task, SnapshotRollbackRequest* master_request, SnapshotRollbackResponse* master_response, bool failed, int error_code); void AddRollbackCallback(TablePtr table, 
std::vector tablets, + bool no_more_retry, int32_t retry_times, const RollbackRequest* rpc_request, RollbackResponse* rpc_response, @@ -312,7 +319,12 @@ class MasterImpl { WriteTabletRequest* request, WriteTabletResponse* response, bool failed, int error_code); - + void LoadRollbackCallback(TabletPtr tablet, int32_t retry_times, + LoadTabletResponse* rpc_response, + WriteTabletRequest* request, + WriteTabletResponse* response, + bool failed, int error_code); + void RestoreRollback(); void ScheduleQueryTabletNode(); void QueryTabletNode(); void QueryTabletNodeAsync(std::string addr, int32_t timeout, diff --git a/src/master/tablet_manager.cc b/src/master/tablet_manager.cc index da9b4378d..3a6f177bf 100644 --- a/src/master/tablet_manager.cc +++ b/src/master/tablet_manager.cc @@ -348,7 +348,7 @@ void Tablet::DelSnapshot(int32_t id) { snapshot_list->RemoveLast(); } -int32_t Tablet::AddRollback(std::string name, uint64_t snapshot_id, uint64_t rollback_point) { +int32_t Tablet::AddRollback(const std::string& name, uint64_t snapshot_id, uint64_t rollback_point) { MutexLock lock(&m_mutex); Rollback rollback; rollback.set_name(name); @@ -358,11 +358,47 @@ int32_t Tablet::AddRollback(std::string name, uint64_t snapshot_id, uint64_t rol return m_meta.rollbacks_size() - 1; } +int32_t Tablet::DelRollback(const std::string& name) { + MutexLock lock(&m_mutex); + int32_t rollback_size = m_meta.rollbacks_size(); + bool rollback_exist = false; + for (int32_t i = 0; i < rollback_size; ++i) { + if (m_meta.rollbacks(i).name() == name) { + VLOG(10) << "delete tablet rollback " << m_meta.rollbacks(i).ShortDebugString(); + m_meta.mutable_rollbacks()->SwapElements(i, rollback_size - 1); + m_meta.mutable_rollbacks()->RemoveLast(); + rollback_exist = true; + break; + } + } + assert(rollback_exist); + return m_meta.rollbacks_size() - 1; +} + void Tablet::ListRollback(std::vector* rollbacks) { MutexLock lock(&m_mutex); for (int i = 0; i < m_meta.rollbacks_size(); i++) { 
rollbacks->push_back(m_meta.rollbacks(i)); + VLOG(11) << "rollback " << m_meta.path() << ": " << m_meta.rollbacks(i).ShortDebugString(); + } +} + +int32_t Tablet::UpdateRollback(const std::string& name, uint64_t snapshot_id, uint64_t rollback_point) { + MutexLock lock(&m_mutex); + bool has_rollback_name = false; + for (int32_t i = 0; i < m_meta.rollbacks_size(); ++i) { + Rollback* cur_rollback = m_meta.mutable_rollbacks(i); + if (cur_rollback->name() == name) { + has_rollback_name = true; + assert(cur_rollback->snapshot_id() == snapshot_id); + cur_rollback->set_rollback_point(rollback_point); + } + } + for (int i = 0; i < m_meta.rollbacks_size(); i++) { + VLOG(11) << "rollback " << m_meta.path() << ": " << m_meta.rollbacks(i).ShortDebugString(); } + assert(has_rollback_name); + return m_meta.rollbacks_size() - 1; } bool Tablet::IsBound() { @@ -632,15 +668,75 @@ void Table::ListSnapshot(std::vector* snapshots) { *snapshots = m_snapshot_list; } -int32_t Table::AddRollback(std::string rollback_name) { +int32_t Table::AddRollback(const std::string& rollback_name) { MutexLock lock(&m_mutex); - m_rollback_names.push_back(rollback_name); - return m_rollback_names.size() - 1; + std::map::iterator it = m_rollbacks.find(rollback_name); + assert(it == m_rollbacks.end()); + m_rollbacks[rollback_name] = false; + return m_rollbacks.size() - 1; +} + +int32_t Table::DelRollback(const std::string& rollback_name) { + MutexLock lock(&m_mutex); + std::map::iterator it = m_rollbacks.find(rollback_name); + assert(it != m_rollbacks.end()); + m_rollbacks.erase(it); + return m_rollbacks.size() - 1; } void Table::ListRollback(std::vector* rollback_names) { MutexLock lock(&m_mutex); - *rollback_names = m_rollback_names; + std::map::iterator it = m_rollbacks.begin(); + for (; it != m_rollbacks.end(); ++it) { + rollback_names->push_back(it->first); + } +} + +void Table::GetRollbackStatus(const std::string& rollback_name, bool* exists, bool* done) { + MutexLock lock(&m_mutex); + 
std::map::iterator it = m_rollbacks.find(rollback_name); + if (it == m_rollbacks.end()) { + *exists = false; + *done = false; + } else { + *exists = true; + *done = it->second; + } +} + +void Table::SetRollbackStatus(const std::string& rollback_name, bool done) { + MutexLock lock(&m_mutex); + std::map::iterator it = m_rollbacks.find(rollback_name); + assert(it != m_rollbacks.end()); + it->second = done; +} + +void Table::GetRollbackTablets(std::vector > >* rollback_tablets) { + MutexLock lock(&m_mutex); + for (std::map::iterator it = m_rollbacks.begin(); + it != m_rollbacks.end(); ++it) { + if (!it->second) { + VLOG(6) << "rollback " << it->first << " marked undone."; + std::vector tablets; + Rollback rollback; + for (TabletList::iterator tablet_it = m_tablets_list.begin(); tablet_it != m_tablets_list.end(); + ++tablet_it) { + TabletPtr cur_tablet = tablet_it->second; + for (int32_t ri = 0; ri < cur_tablet->m_meta.rollbacks_size(); ++ri) { + if (cur_tablet->m_meta.rollbacks(ri).name() == it->first) { + rollback.CopyFrom(cur_tablet->m_meta.rollbacks(ri)); + tablets.push_back(cur_tablet); + } + } + } + if (tablets.empty()) { + it->second = true; + VLOG(6) << "rollback " << it->first << " already done."; + } else { + rollback_tablets->push_back(std::make_pair(rollback, tablets)); + } + } + } } void Table::AddDeleteTabletCount() { @@ -672,8 +768,9 @@ void Table::ToMeta(TableMeta* meta) { for (size_t i = 0; i < m_snapshot_list.size(); i++) { meta->add_snapshot_list(m_snapshot_list[i]); } - for (size_t i = 0; i < m_rollback_names.size(); ++i) { - meta->add_rollback_names(m_rollback_names[i]); + std::map::iterator it = m_rollbacks.begin(); + for (; it != m_rollbacks.end(); ++it) { + meta->add_rollback_names(it->first); } } @@ -769,7 +866,7 @@ bool TabletManager::AddTable(const std::string& table_name, LOG(INFO) << table_name << " add snapshot " << meta.snapshot_list(i); } for (int32_t i = 0; i < meta.rollback_names_size(); ++i) { - 
(*table)->m_rollback_names.push_back(meta.rollback_names(i)); + (*table)->m_rollbacks[meta.rollback_names(i)] = false; LOG(INFO) << table_name << " add rollback " << meta.rollback_names(i); } (*table)->m_mutex.Unlock(); diff --git a/src/master/tablet_manager.h b/src/master/tablet_manager.h index d1aee8091..3e8c3aa76 100644 --- a/src/master/tablet_manager.h +++ b/src/master/tablet_manager.h @@ -106,7 +106,9 @@ class Tablet { int32_t AddSnapshot(uint64_t snapshot); void ListSnapshot(std::vector* snapshot); void DelSnapshot(int32_t id); - int32_t AddRollback(std::string name, uint64_t snapshot_id, uint64_t rollback_point); + int32_t AddRollback(const std::string& name, uint64_t snapshot_id, uint64_t rollback_point); + int32_t DelRollback(const std::string& name); + int32_t UpdateRollback(const std::string& name, uint64_t snapshot_id, uint64_t rollback_point); void ListRollback(std::vector* rollbacks); // is belong to a table? @@ -175,8 +177,13 @@ class Table { int32_t AddSnapshot(uint64_t snapshot); int32_t DelSnapshot(uint64_t snapshot); void ListSnapshot(std::vector* snapshots); - int32_t AddRollback(std::string rollback_name); + int32_t AddRollback(const std::string& rollback_name); + int32_t DelRollback(const std::string& rollback_name); void ListRollback(std::vector* rollback_names); + int32_t GetRollbackSize() {return m_rollbacks.size() - 1;} + void GetRollbackStatus(const std::string& rollback_name, bool* exists, bool* done); + void SetRollbackStatus(const std::string& rollback_name, bool done); + void GetRollbackTablets(std::vector > >* rollback_tablets); void AddDeleteTabletCount(); bool NeedDelete(); void ToMetaTableKeyValue(std::string* packed_key = NULL, @@ -195,7 +202,7 @@ class Table { std::string m_name; TableSchema m_schema; std::vector m_snapshot_list; - std::vector m_rollback_names; + std::map m_rollbacks; TableStatus m_status; uint32_t m_deleted_tablet_num; uint64_t m_max_tablet_no; diff --git a/src/proto/master_rpc.proto 
b/src/proto/master_rpc.proto index 1674a33f8..2a46d83d0 100644 --- a/src/proto/master_rpc.proto +++ b/src/proto/master_rpc.proto @@ -203,8 +203,9 @@ message RollbackRequest { } message RollbackResponse { - optional StatusCode status = 1; - optional uint64 sequence_id = 2; + optional uint64 sequence_id = 1; + optional StatusCode status = 2; + optional bool done = 3; } // admin diff --git a/src/proto/tabletnode_rpc.proto b/src/proto/tabletnode_rpc.proto index 53b0df546..5b90f495a 100644 --- a/src/proto/tabletnode_rpc.proto +++ b/src/proto/tabletnode_rpc.proto @@ -76,6 +76,7 @@ message LoadTabletRequest { message LoadTabletResponse { required StatusCode status = 1; required uint64 sequence_id = 2; + repeated Rollback rollbacks = 3; } message UnloadTabletRequest { diff --git a/src/sdk/client_impl.cc b/src/sdk/client_impl.cc index 42a67ac54..38ca4c831 100644 --- a/src/sdk/client_impl.cc +++ b/src/sdk/client_impl.cc @@ -812,8 +812,13 @@ bool ClientImpl::Rollback(const string& name, uint64_t snapshot, if (master_client.GetRollback(&request, &response)) { if (response.status() == kMasterOk) { - std::cout << name << " rollback to snapshot sucessfully" << std::endl; - return true; + if (response.done()) { + std::cout << name << " rollback to snapshot sucessfully" << std::endl; + return true; + } else { + std::cout << name << " rollback has not complete yet" << std::endl; + return false; + } } } err->SetFailed(ErrorCode::kSystem, StatusCodeToString(response.status())); diff --git a/src/tabletnode/remote_tabletnode.cc b/src/tabletnode/remote_tabletnode.cc index 5587bb76f..512f99997 100644 --- a/src/tabletnode/remote_tabletnode.cc +++ b/src/tabletnode/remote_tabletnode.cc @@ -320,7 +320,7 @@ void RemoteTabletNode::DoRollback(google::protobuf::RpcController* controller, google::protobuf::Closure* done) { uint64_t id = request->sequence_id(); LOG(INFO) << "accept RPC (Rollback) id: " << id; - m_tabletnode_impl->Rollback(request, response, done); + 
m_tabletnode_impl->GetRollback(request, response, done); LOG(INFO) << "finish RPC (Rollback) id: " << id; } diff --git a/src/tabletnode/tabletnode_impl.cc b/src/tabletnode/tabletnode_impl.cc index 5a500d423..b59d6a6d6 100644 --- a/src/tabletnode/tabletnode_impl.cc +++ b/src/tabletnode/tabletnode_impl.cc @@ -237,14 +237,8 @@ void TabletNodeImpl::LoadTablet(const LoadTabletRequest* request, snapshots[request->snapshots_id(i)] = request->snapshots_sequence(i); } - // to recover rollbacks + //////////// TODO /////////////////// std::map rollbacks; - int32_t num_of_rollbacks = request->rollbacks_size(); - for (int32_t i = 0; i < num_of_rollbacks; ++i) { - rollbacks[request->rollbacks(i).snapshot_id()] = request->rollbacks(i).rollback_point(); - VLOG(10) << "load tablet with rollback " << request->rollbacks(i).snapshot_id() - << "-" << request->rollbacks(i).rollback_point(); - } LOG(INFO) << "start load tablet, id: " << request->sequence_id() << ", table: " << request->tablet_name() @@ -290,6 +284,21 @@ void TabletNodeImpl::LoadTablet(const LoadTabletRequest* request, tablet_io->DecRef(); response->set_status(kTabletNodeOk); } + // recover rollbacks + int32_t num_of_rollbacks = request->rollbacks_size(); + for (int32_t i = 0; i < num_of_rollbacks; ++i) { + Rollback cur_rollback = request->rollbacks(i); + uint64_t rollback_point = tablet_io->GetRollback(cur_rollback.snapshot_id(), &status); + VLOG(10) << "recover rollback: " << cur_rollback.ShortDebugString(); + if (cur_rollback.rollback_point() == leveldb::kMaxSequenceNumber) { + Rollback rollback; + rollback.set_name(cur_rollback.name()); + rollback.set_snapshot_id(cur_rollback.snapshot_id()); + rollback.set_rollback_point(rollback_point); + response->add_rollbacks()->CopyFrom(rollback); + VLOG(10) << "new rollback: " << rollback.ShortDebugString(); + } + } LOG(INFO) << "load tablet: " << request->path() << " [" << DebugString(key_start) << ", " << DebugString(key_end) << "]"; @@ -601,7 +610,7 @@ void 
TabletNodeImpl::ReleaseSnapshot(const ReleaseSnapshotRequest* request, done->Run(); } -void TabletNodeImpl::Rollback(const SnapshotRollbackRequest* request, SnapshotRollbackResponse* response, +void TabletNodeImpl::GetRollback(const SnapshotRollbackRequest* request, SnapshotRollbackResponse* response, google::protobuf::Closure* done) { StatusCode status = kTabletNodeOk; io::TabletIO* tablet_io = m_tablet_manager->GetTablet(request->table_name(), @@ -617,7 +626,7 @@ void TabletNodeImpl::Rollback(const SnapshotRollbackRequest* request, SnapshotRo done->Run(); return; } - uint64_t rollback_point = tablet_io->Rollback(request->snapshot_id(), &status); + uint64_t rollback_point = tablet_io->GetRollback(request->snapshot_id(), &status); if (status != kTabletNodeOk) { response->set_status(status); } else { diff --git a/src/tabletnode/tabletnode_impl.h b/src/tabletnode/tabletnode_impl.h index 7e847c6a6..fd03676db 100644 --- a/src/tabletnode/tabletnode_impl.h +++ b/src/tabletnode/tabletnode_impl.h @@ -82,7 +82,7 @@ class TabletNodeImpl { ReleaseSnapshotResponse* response, google::protobuf::Closure* done); - void Rollback(const SnapshotRollbackRequest* request, SnapshotRollbackResponse* response, + void GetRollback(const SnapshotRollbackRequest* request, SnapshotRollbackResponse* response, google::protobuf::Closure* done); void Query(const QueryRequest* request, QueryResponse* response, diff --git a/src/teracli_main.cc b/src/teracli_main.cc index 99a72d948..f0e4fbdc8 100644 --- a/src/teracli_main.cc +++ b/src/teracli_main.cc @@ -295,7 +295,7 @@ int32_t DropOp(Client* client, int32_t argc, char** argv, ErrorCode* err) { std::string tablename = argv[2]; if (!client->DeleteTable(tablename, err)) { - LOG(ERROR) << "fail to delete table, " << err->GetReason(); + LOG(ERROR) << "fail to delete table"; return -1; } return 0; diff --git a/test/testcase/common.py b/test/testcase/common.py index 218044468..3741a2cbd 100644 --- a/test/testcase/common.py +++ b/test/testcase/common.py @@ 
-97,7 +97,9 @@ def create_kv_table():
     ret = subprocess.Popen(const.teracli_binary + ' create test', stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
     print ''.join(ret.stdout.readlines())
     print ''.join(ret.stderr.readlines())
-    
+    ret = parse_showinfo()
+    print ret
+
 
 def create_singleversion_table():
     print 'create single version table'
diff --git a/test/testcase/test_rollback.py b/test/testcase/test_rollback.py
new file mode 100644
index 000000000..d05f0f877
--- /dev/null
+++ b/test/testcase/test_rollback.py
@@ -0,0 +1,159 @@
+"""
+Copyright (c) 2015, Baidu.com, Inc. All Rights Reserved
+Use of this source code is governed by a BSD-style license that can be
+found in the LICENSE file.
+"""
+
+import nose
+import time
+
+import common
+from conf import const
+
+
+@nose.tools.with_setup(common.create_kv_table, common.cleanup)
+def test_rollback_kv():
+    """
+    test kv rollback
+    1. write data set 1
+    2. create snapshot
+    3. write data set 2
+    4. scan & compare with set 2
+    5. rollback to snapshot
+    6. scan & compare with set 1
+    7. compact, rescan & compare with set 1
+    :return:
+    """
+    table_name = 'test'
+    dump_file1 = 'dump1.out'
+    dump_file2 = 'dump2.out'
+    scan_file = 'scan.out'
+    common.run_tera_mark([(dump_file1, False)], op='w', table_name=table_name, random='random',
+                         key_seed=1, value_seed=10, value_size=100, num=10000, key_size=20)
+    snapshot = common.snapshot_op(table_name)
+    common.run_tera_mark([(dump_file2, False)], op='w', table_name=table_name, random='random',
+                         key_seed=1, value_seed=11, value_size=100, num=10000, key_size=20)
+    common.scan_table(table_name=table_name, file_path=scan_file, allversion=False, snapshot=0)
+    nose.tools.assert_true(common.compare_files(dump_file2, scan_file, need_sort=True))
+
+    common.rollback_op(table_name, snapshot, 'roll')
+    common.scan_table(table_name=table_name, file_path=scan_file, allversion=False, snapshot=0)
+    nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=True))
+
+    common.compact_tablets(common.get_tablet_list(table_name))
+    # rescan: the assert below must check post-compaction data, not the earlier scan output
+    common.scan_table(table_name=table_name, file_path=scan_file, allversion=False, snapshot=0)
+    nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=True))
+
+
+@nose.tools.with_setup(common.create_singleversion_table, common.cleanup)
+def test_rollback_table():
+    """
+    test table rollback
+    1. write data set 1
+    2. create snapshot
+    3. write data set 2
+    4. scan & compare with set 2
+    5. rollback to snapshot
+    6. scan & compare with set 1
+    7. compact, rescan & compare with set 1
+    :return:
+    """
+    table_name = 'test'
+    dump_file1 = 'dump1.out'
+    dump_file2 = 'dump2.out'
+    scan_file = 'scan.out'
+    common.run_tera_mark([(dump_file1, False)], op='w', table_name=table_name, cf='cf0:q,cf1:q', random='random',
+                         key_seed=1, value_seed=10, value_size=100, num=10000, key_size=20)
+    snapshot = common.snapshot_op(table_name)
+    common.run_tera_mark([(dump_file2, False)], op='w', table_name=table_name, cf='cf0:q,cf1:q', random='random',
+                         key_seed=1, value_seed=11, value_size=100, num=10000, key_size=20)
+    common.scan_table(table_name=table_name, file_path=scan_file, allversion=False, snapshot=0)
+    nose.tools.assert_true(common.compare_files(dump_file2, scan_file, need_sort=True))
+
+    common.rollback_op(table_name, snapshot, 'roll')
+    common.scan_table(table_name=table_name, file_path=scan_file, allversion=False, snapshot=0)
+    nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=True))
+
+    common.compact_tablets(common.get_tablet_list(table_name))
+    # rescan: the assert below must check post-compaction data, not the earlier scan output
+    common.scan_table(table_name=table_name, file_path=scan_file, allversion=False, snapshot=0)
+    nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=True))
+
+
+@nose.tools.with_setup(common.create_kv_table, common.cleanup)
+def test_rollback_kv_relaunch():
+    """
+    test kv rollback w/relaunch
+    1. test_rollback_kv()
+    2. relaunch
+    3. scan & compare with set 1, then compact, rescan & compare again
+    :return:
+    """
+    table_name = 'test'
+    dump_file1 = 'dump1.out'
+    scan_file = 'scan.out'
+    test_rollback_kv()
+
+    common.cluster_op('kill')
+    common.cluster_op('launch')
+    time.sleep(2)
+
+    common.scan_table(table_name=table_name, file_path=scan_file, allversion=False, snapshot=0)
+    nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=True))
+
+    common.compact_tablets(common.get_tablet_list(table_name))
+    # rescan: the assert below must check post-compaction data, not the earlier scan output
+    common.scan_table(table_name=table_name, file_path=scan_file, allversion=False, snapshot=0)
+    nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=True))
+
+
+@nose.tools.with_setup(common.create_singleversion_table, common.cleanup)
+def test_rollback_table_relaunch():
+    """
+    test table rollback w/relaunch
+    1. test_rollback_table()
+    2. relaunch
+    3. scan & compare with set 1, then compact, rescan & compare again
+    :return:
+    """
+    table_name = 'test'
+    dump_file1 = 'dump1.out'
+    scan_file = 'scan.out'
+    test_rollback_table()
+
+    common.cluster_op('kill')
+    common.cluster_op('launch')
+    time.sleep(2)
+
+    common.scan_table(table_name=table_name, file_path=scan_file, allversion=False, snapshot=0)
+    nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=True))
+
+    common.compact_tablets(common.get_tablet_list(table_name))
+    # rescan: the assert below must check post-compaction data, not the earlier scan output
+    common.scan_table(table_name=table_name, file_path=scan_file, allversion=False, snapshot=0)
+    nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=True))
+
+
+@nose.tools.with_setup(None, common.cleanup)
+def test_rollback_kv_multitablets():
+    """
+    test kv rollback w/multi tablets
+    1. test_rollback_kv_relaunch()
+    :return:
+    """
+
+    common.createbyfile(schema=const.data_path + 'kv.schema', deli=const.data_path + 'deli.10')
+    test_rollback_kv_relaunch()
+
+
+@nose.tools.with_setup(None, common.cleanup)
+def test_rollback_table_multitablets():
+    """
+    test table rollback w/multi tablets
+    1. test_rollback_table_relaunch()
+    :return:
+    """
+
+    common.createbyfile(schema=const.data_path + 'table.schema', deli=const.data_path + 'deli.10')
+    test_rollback_table_relaunch()