Skip to content
2 changes: 1 addition & 1 deletion src/io/tablet_io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1734,7 +1734,7 @@ void TabletIO::ListSnapshot(std::vector<uint64_t>* snapshot_id) {
}
}

uint64_t TabletIO::Rollback(uint64_t snapshot_id, StatusCode* status) {
uint64_t TabletIO::GetRollback(uint64_t snapshot_id, StatusCode* status) {
uint64_t sequence;
{
MutexLock lock(&m_mutex);
Expand Down
2 changes: 1 addition & 1 deletion src/io/tablet_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ class TabletIO {
bool ReleaseSnapshot(uint64_t snapshot_id, StatusCode* status = NULL);
void ListSnapshot(std::vector<uint64_t>* snapshot_id);

uint64_t Rollback(uint64_t snapshot_id, StatusCode* status);
uint64_t GetRollback(uint64_t snapshot_id, StatusCode* status);

uint32_t GetLGidByCFName(const std::string& cfname);

Expand Down
317 changes: 266 additions & 51 deletions src/master/master_impl.cc

Large diffs are not rendered by default.

20 changes: 16 additions & 4 deletions src/master/master_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,11 @@ class MasterImpl {
google::protobuf::Closure* done;
TablePtr table;
std::vector<TabletPtr> tablets;
std::vector<bool> retry;
std::vector<uint64_t> rollback_points;
int task_num;
int finish_num;
mutable Mutex mutex;
bool aborted;
};

void SafeModeCmdCtrl(const CmdCtrlRequest* request,
Expand Down Expand Up @@ -297,22 +297,34 @@ class MasterImpl {
WriteTabletRequest* request,
WriteTabletResponse* response,
bool failed, int error_code);
void RollbackAsync(TabletPtr tablet, uint64_t snapshot_id, int32_t timeout,
RollbackClosure* done);
void AddDefaultRollbackCallback(TablePtr table, std::vector<TabletPtr> tablets,
int32_t retry_times, const RollbackRequest* rpc_request,
RollbackResponse* rpc_response, google::protobuf::Closure* rpc_done,
WriteTabletRequest* request, WriteTabletResponse* response,
bool failed, int error_code);
void ApplyRollbackTask(RollbackTask* task);
void RollbackAsync(TabletPtr tablet, uint64_t snapshot_id,
int32_t timeout, RollbackClosure* done);
void RollbackCallback(int32_t tablet_id, RollbackTask* task,
SnapshotRollbackRequest* master_request,
SnapshotRollbackResponse* master_response,
bool failed, int error_code);
void AddRollbackCallback(TablePtr table,
std::vector<TabletPtr> tablets,
bool no_more_retry,
int32_t retry_times,
const RollbackRequest* rpc_request,
RollbackResponse* rpc_response,
google::protobuf::Closure* rpc_done,
WriteTabletRequest* request,
WriteTabletResponse* response,
bool failed, int error_code);

void LoadRollbackCallback(TabletPtr tablet, int32_t retry_times,
LoadTabletResponse* rpc_response,
WriteTabletRequest* request,
WriteTabletResponse* response,
bool failed, int error_code);
void RestoreRollback();
void ScheduleQueryTabletNode();
void QueryTabletNode();
void QueryTabletNodeAsync(std::string addr, int32_t timeout,
Expand Down
113 changes: 105 additions & 8 deletions src/master/tablet_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ void Tablet::DelSnapshot(int32_t id) {
snapshot_list->RemoveLast();
}

int32_t Tablet::AddRollback(std::string name, uint64_t snapshot_id, uint64_t rollback_point) {
int32_t Tablet::AddRollback(const std::string& name, uint64_t snapshot_id, uint64_t rollback_point) {
MutexLock lock(&m_mutex);
Rollback rollback;
rollback.set_name(name);
Expand All @@ -358,11 +358,47 @@ int32_t Tablet::AddRollback(std::string name, uint64_t snapshot_id, uint64_t rol
return m_meta.rollbacks_size() - 1;
}

int32_t Tablet::DelRollback(const std::string& name) {
MutexLock lock(&m_mutex);
int32_t rollback_size = m_meta.rollbacks_size();
bool rollback_exist = false;
for (int32_t i = 0; i < rollback_size; ++i) {
if (m_meta.rollbacks(i).name() == name) {
VLOG(10) << "delete tablet rollback " << m_meta.rollbacks(i).ShortDebugString();
m_meta.mutable_rollbacks()->SwapElements(i, rollback_size - 1);
m_meta.mutable_rollbacks()->RemoveLast();
rollback_exist = true;
break;
}
}
assert(rollback_exist);
return m_meta.rollbacks_size() - 1;
}

void Tablet::ListRollback(std::vector<Rollback>* rollbacks) {
MutexLock lock(&m_mutex);
for (int i = 0; i < m_meta.rollbacks_size(); i++) {
rollbacks->push_back(m_meta.rollbacks(i));
VLOG(11) << "rollback " << m_meta.path() << ": " << m_meta.rollbacks(i).ShortDebugString();
}
}

int32_t Tablet::UpdateRollback(const std::string& name, uint64_t snapshot_id, uint64_t rollback_point) {
MutexLock lock(&m_mutex);
bool has_rollback_name = false;
for (int32_t i = 0; i < m_meta.rollbacks_size(); ++i) {
Rollback* cur_rollback = m_meta.mutable_rollbacks(i);
if (cur_rollback->name() == name) {
has_rollback_name = true;
assert(cur_rollback->snapshot_id() == snapshot_id);
cur_rollback->set_rollback_point(rollback_point);
}
}
for (int i = 0; i < m_meta.rollbacks_size(); i++) {
VLOG(11) << "rollback " << m_meta.path() << ": " << m_meta.rollbacks(i).ShortDebugString();
}
assert(has_rollback_name);
return m_meta.rollbacks_size() - 1;
}

bool Tablet::IsBound() {
Expand Down Expand Up @@ -632,15 +668,75 @@ void Table::ListSnapshot(std::vector<uint64_t>* snapshots) {
*snapshots = m_snapshot_list;
}

int32_t Table::AddRollback(std::string rollback_name) {
int32_t Table::AddRollback(const std::string& rollback_name) {
MutexLock lock(&m_mutex);
m_rollback_names.push_back(rollback_name);
return m_rollback_names.size() - 1;
std::map<std::string, bool>::iterator it = m_rollbacks.find(rollback_name);
assert(it == m_rollbacks.end());
m_rollbacks[rollback_name] = false;
return m_rollbacks.size() - 1;
}

int32_t Table::DelRollback(const std::string& rollback_name) {
MutexLock lock(&m_mutex);
std::map<std::string, bool>::iterator it = m_rollbacks.find(rollback_name);
assert(it != m_rollbacks.end());
m_rollbacks.erase(it);
return m_rollbacks.size() - 1;
}

void Table::ListRollback(std::vector<std::string>* rollback_names) {
MutexLock lock(&m_mutex);
*rollback_names = m_rollback_names;
std::map<std::string, bool>::iterator it = m_rollbacks.begin();
for (; it != m_rollbacks.end(); ++it) {
rollback_names->push_back(it->first);
}
}

void Table::GetRollbackStatus(const std::string& rollback_name, bool* exists, bool* done) {
MutexLock lock(&m_mutex);
std::map<std::string, bool>::iterator it = m_rollbacks.find(rollback_name);
if (it == m_rollbacks.end()) {
*exists = false;
*done = false;
} else {
*exists = true;
*done = it->second;
}
}

void Table::SetRollbackStatus(const std::string& rollback_name, bool done) {
MutexLock lock(&m_mutex);
std::map<std::string, bool>::iterator it = m_rollbacks.find(rollback_name);
assert(it != m_rollbacks.end());
it->second = done;
}

void Table::GetRollbackTablets(std::vector<std::pair<Rollback, std::vector<TabletPtr> > >* rollback_tablets) {
MutexLock lock(&m_mutex);
for (std::map<std::string, bool>::iterator it = m_rollbacks.begin();
it != m_rollbacks.end(); ++it) {
if (!it->second) {
VLOG(6) << "rollback " << it->first << " marked undone.";
std::vector<TabletPtr> tablets;
Rollback rollback;
for (TabletList::iterator tablet_it = m_tablets_list.begin(); tablet_it != m_tablets_list.end();
++tablet_it) {
TabletPtr cur_tablet = tablet_it->second;
for (int32_t ri = 0; ri < cur_tablet->m_meta.rollbacks_size(); ++ri) {
if (cur_tablet->m_meta.rollbacks(ri).name() == it->first) {
rollback.CopyFrom(cur_tablet->m_meta.rollbacks(ri));
tablets.push_back(cur_tablet);
}
}
}
if (tablets.empty()) {
it->second = true;
VLOG(6) << "rollback " << it->first << " already done.";
} else {
rollback_tablets->push_back(std::make_pair(rollback, tablets));
}
}
}
}

void Table::AddDeleteTabletCount() {
Expand Down Expand Up @@ -672,8 +768,9 @@ void Table::ToMeta(TableMeta* meta) {
for (size_t i = 0; i < m_snapshot_list.size(); i++) {
meta->add_snapshot_list(m_snapshot_list[i]);
}
for (size_t i = 0; i < m_rollback_names.size(); ++i) {
meta->add_rollback_names(m_rollback_names[i]);
std::map<std::string, bool>::iterator it = m_rollbacks.begin();
for (; it != m_rollbacks.end(); ++it) {
meta->add_rollback_names(it->first);
}
}

Expand Down Expand Up @@ -769,7 +866,7 @@ bool TabletManager::AddTable(const std::string& table_name,
LOG(INFO) << table_name << " add snapshot " << meta.snapshot_list(i);
}
for (int32_t i = 0; i < meta.rollback_names_size(); ++i) {
(*table)->m_rollback_names.push_back(meta.rollback_names(i));
(*table)->m_rollbacks[meta.rollback_names(i)] = false;
LOG(INFO) << table_name << " add rollback " << meta.rollback_names(i);
}
(*table)->m_mutex.Unlock();
Expand Down
13 changes: 10 additions & 3 deletions src/master/tablet_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ class Tablet {
int32_t AddSnapshot(uint64_t snapshot);
void ListSnapshot(std::vector<uint64_t>* snapshot);
void DelSnapshot(int32_t id);
int32_t AddRollback(std::string name, uint64_t snapshot_id, uint64_t rollback_point);
int32_t AddRollback(const std::string& name, uint64_t snapshot_id, uint64_t rollback_point);
int32_t DelRollback(const std::string& name);
int32_t UpdateRollback(const std::string& name, uint64_t snapshot_id, uint64_t rollback_point);
void ListRollback(std::vector<Rollback>* rollbacks);

// is belong to a table?
Expand Down Expand Up @@ -175,8 +177,13 @@ class Table {
int32_t AddSnapshot(uint64_t snapshot);
int32_t DelSnapshot(uint64_t snapshot);
void ListSnapshot(std::vector<uint64_t>* snapshots);
int32_t AddRollback(std::string rollback_name);
int32_t AddRollback(const std::string& rollback_name);
int32_t DelRollback(const std::string& rollback_name);
void ListRollback(std::vector<std::string>* rollback_names);
int32_t GetRollbackSize() {return m_rollbacks.size() - 1;}
void GetRollbackStatus(const std::string& rollback_name, bool* exists, bool* done);
void SetRollbackStatus(const std::string& rollback_name, bool done);
void GetRollbackTablets(std::vector<std::pair<Rollback, std::vector<TabletPtr> > >* rollback_tablets);
void AddDeleteTabletCount();
bool NeedDelete();
void ToMetaTableKeyValue(std::string* packed_key = NULL,
Expand All @@ -195,7 +202,7 @@ class Table {
std::string m_name;
TableSchema m_schema;
std::vector<uint64_t> m_snapshot_list;
std::vector<std::string> m_rollback_names;
std::map<std::string, bool> m_rollbacks;
TableStatus m_status;
uint32_t m_deleted_tablet_num;
uint64_t m_max_tablet_no;
Expand Down
5 changes: 3 additions & 2 deletions src/proto/master_rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,9 @@ message RollbackRequest {
}

message RollbackResponse {
optional StatusCode status = 1;
optional uint64 sequence_id = 2;
optional uint64 sequence_id = 1;
optional StatusCode status = 2;
optional bool done = 3;
}

// admin
Expand Down
1 change: 1 addition & 0 deletions src/proto/tabletnode_rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ message LoadTabletRequest {
message LoadTabletResponse {
required StatusCode status = 1;
required uint64 sequence_id = 2;
repeated Rollback rollbacks = 3;
}

message UnloadTabletRequest {
Expand Down
9 changes: 7 additions & 2 deletions src/sdk/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -812,8 +812,13 @@ bool ClientImpl::Rollback(const string& name, uint64_t snapshot,

if (master_client.GetRollback(&request, &response)) {
if (response.status() == kMasterOk) {
std::cout << name << " rollback to snapshot sucessfully" << std::endl;
return true;
if (response.done()) {
std::cout << name << " rollback to snapshot sucessfully" << std::endl;
return true;
} else {
std::cout << name << " rollback has not complete yet" << std::endl;
return false;
}
}
}
err->SetFailed(ErrorCode::kSystem, StatusCodeToString(response.status()));
Expand Down
2 changes: 1 addition & 1 deletion src/tabletnode/remote_tabletnode.cc
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ void RemoteTabletNode::DoRollback(google::protobuf::RpcController* controller,
google::protobuf::Closure* done) {
uint64_t id = request->sequence_id();
LOG(INFO) << "accept RPC (Rollback) id: " << id;
m_tabletnode_impl->Rollback(request, response, done);
m_tabletnode_impl->GetRollback(request, response, done);
LOG(INFO) << "finish RPC (Rollback) id: " << id;
}

Expand Down
27 changes: 18 additions & 9 deletions src/tabletnode/tabletnode_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,8 @@ void TabletNodeImpl::LoadTablet(const LoadTabletRequest* request,
snapshots[request->snapshots_id(i)] = request->snapshots_sequence(i);
}

// to recover rollbacks
//////////// TODO ///////////////////
std::map<uint64_t, uint64_t> rollbacks;
int32_t num_of_rollbacks = request->rollbacks_size();
for (int32_t i = 0; i < num_of_rollbacks; ++i) {
rollbacks[request->rollbacks(i).snapshot_id()] = request->rollbacks(i).rollback_point();
VLOG(10) << "load tablet with rollback " << request->rollbacks(i).snapshot_id()
<< "-" << request->rollbacks(i).rollback_point();
}

LOG(INFO) << "start load tablet, id: " << request->sequence_id()
<< ", table: " << request->tablet_name()
Expand Down Expand Up @@ -290,6 +284,21 @@ void TabletNodeImpl::LoadTablet(const LoadTabletRequest* request,
tablet_io->DecRef();
response->set_status(kTabletNodeOk);
}
// recover rollbacks
int32_t num_of_rollbacks = request->rollbacks_size();
for (int32_t i = 0; i < num_of_rollbacks; ++i) {
Rollback cur_rollback = request->rollbacks(i);
uint64_t rollback_point = tablet_io->GetRollback(cur_rollback.snapshot_id(), &status);
VLOG(10) << "recover rollback: " << cur_rollback.ShortDebugString();
if (cur_rollback.rollback_point() == leveldb::kMaxSequenceNumber) {
Rollback rollback;
rollback.set_name(cur_rollback.name());
rollback.set_snapshot_id(cur_rollback.snapshot_id());
rollback.set_rollback_point(rollback_point);
response->add_rollbacks()->CopyFrom(rollback);
VLOG(10) << "new rollback: " << rollback.ShortDebugString();
}
}

LOG(INFO) << "load tablet: " << request->path() << " ["
<< DebugString(key_start) << ", " << DebugString(key_end) << "]";
Expand Down Expand Up @@ -601,7 +610,7 @@ void TabletNodeImpl::ReleaseSnapshot(const ReleaseSnapshotRequest* request,
done->Run();
}

void TabletNodeImpl::Rollback(const SnapshotRollbackRequest* request, SnapshotRollbackResponse* response,
void TabletNodeImpl::GetRollback(const SnapshotRollbackRequest* request, SnapshotRollbackResponse* response,
google::protobuf::Closure* done) {
StatusCode status = kTabletNodeOk;
io::TabletIO* tablet_io = m_tablet_manager->GetTablet(request->table_name(),
Expand All @@ -617,7 +626,7 @@ void TabletNodeImpl::Rollback(const SnapshotRollbackRequest* request, SnapshotRo
done->Run();
return;
}
uint64_t rollback_point = tablet_io->Rollback(request->snapshot_id(), &status);
uint64_t rollback_point = tablet_io->GetRollback(request->snapshot_id(), &status);
if (status != kTabletNodeOk) {
response->set_status(status);
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/tabletnode/tabletnode_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class TabletNodeImpl {
ReleaseSnapshotResponse* response,
google::protobuf::Closure* done);

void Rollback(const SnapshotRollbackRequest* request, SnapshotRollbackResponse* response,
void GetRollback(const SnapshotRollbackRequest* request, SnapshotRollbackResponse* response,
google::protobuf::Closure* done);

void Query(const QueryRequest* request, QueryResponse* response,
Expand Down
2 changes: 1 addition & 1 deletion src/teracli_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ int32_t DropOp(Client* client, int32_t argc, char** argv, ErrorCode* err) {

std::string tablename = argv[2];
if (!client->DeleteTable(tablename, err)) {
LOG(ERROR) << "fail to delete table, " << err->GetReason();
LOG(ERROR) << "fail to delete table";
return -1;
}
return 0;
Expand Down
4 changes: 3 additions & 1 deletion test/testcase/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ def create_kv_table():
ret = subprocess.Popen(const.teracli_binary + ' create test', stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
print ''.join(ret.stdout.readlines())
print ''.join(ret.stderr.readlines())

ret = parse_showinfo()
print ret


def create_singleversion_table():
print 'create single version table'
Expand Down
Loading