From 80922105b815f6da0d2ba438f9e17ecc1276663d Mon Sep 17 00:00:00 2001 From: lylei Date: Mon, 19 Oct 2015 17:21:41 +0800 Subject: [PATCH 1/7] issue=#324 rollback write meta with defualt value first --- src/master/master_impl.cc | 78 +++++++++++++++++++++++++++++++----- src/master/master_impl.h | 10 +++-- src/master/tablet_manager.cc | 15 +++++++ src/master/tablet_manager.h | 2 + 4 files changed, 91 insertions(+), 14 deletions(-) diff --git a/src/master/master_impl.cc b/src/master/master_impl.cc index 1020f2103..aca594909 100644 --- a/src/master/master_impl.cc +++ b/src/master/master_impl.cc @@ -3003,15 +3003,71 @@ void MasterImpl::GetRollback(const RollbackRequest* request, return; } + std::vector tablets; + table->GetTablet(&tablets); + // write memory and meta with default rollback_point + int sid = table->AddRollback(request->rollback_name()); + for (uint32_t i = 0; i < tablets.size(); ++i) { + int tsid = tablets[i]->AddRollback(request->rollback_name(), request->snapshot_id(), + leveldb::kMaxSequenceNumber); + assert(sid == tsid); + } + WriteClosure* closure = + NewClosure(this, &MasterImpl::AddDefaultRollbackCallback, table, tablets, + FLAGS_tera_master_meta_retry_times, request, response, done); + BatchWriteMetaTableAsync(table, tablets, false, closure); +} + +void MasterImpl::AddDefaultRollbackCallback(TablePtr table, + std::vector tablets, + int32_t retry_times, + const RollbackRequest* rpc_request, + RollbackResponse* rpc_response, + google::protobuf::Closure* rpc_done, + WriteTabletRequest* request, + WriteTabletResponse* response, + bool failed, int error_code) { + StatusCode status = response->status(); + if (!failed && status == kTabletNodeOk) { + // all the row status should be the same + CHECK_GT(response->row_status_list_size(), 0); + status = response->row_status_list(0); + } + delete request; + delete response; + if (failed || status != kTabletNodeOk) { + if (failed) { + LOG(WARNING) << "fail to write rollback to meta: " + << sofa::pbrpc::RpcErrorCodeToString(error_code) << ", " + << tablets[0] << "..."; + } else { + LOG(WARNING) << "fail to write rollback to meta: " + << StatusCodeToString(status) << ", " << tablets[0] << "..."; + } + if (retry_times <= 0) { + rpc_response->set_status(kMetaTabletError); + rpc_done->Run(); + } else { + WriteClosure* done = + NewClosure(this, &MasterImpl::AddDefaultRollbackCallback, table, + tablets, retry_times - 1, rpc_request, rpc_response, + rpc_done); + SuspendMetaOperation(table, tablets, false, done); + } + return; + } + LOG(INFO) << "Add default rollback " << rpc_request->rollback_name() << " to " + << rpc_request->table_name() << " done"; + RollbackTask* task = new RollbackTask; table->GetTablet(&task->tablets); assert(task->tablets.size()); task->rollback_points.resize(task->tablets.size()); - task->request = request; - task->response = response; - task->done = done; + task->request = rpc_request; + task->response = rpc_response; + task->done = rpc_done; task->table = table; task->task_num = 0; task->finish_num = 0; @@ -3022,13 +3078,13 @@ void MasterImpl::GetRollback(const RollbackRequest* request, ++task->task_num; RollbackClosure* closure = NewClosure(this, &MasterImpl::RollbackCallback, static_cast(i), task); - RollbackAsync(tablet, request->snapshot_id(), 3000, closure); + RollbackAsync(tablet, rpc_request->snapshot_id(), 3000, closure); } if (task->task_num == 0) { - LOG(WARNING) << "fail to rollback to snapshot: " << request->table_name() + LOG(WARNING) << "fail to rollback to snapshot: " << rpc_request->table_name() << ", all tables kTabletNodeOffLine"; response->set_status(kTabletNodeOffLine); - done->Run(); + rpc_done->Run(); return; } } @@ -3076,11 +3132,11 @@ void MasterImpl::RollbackCallback(int32_t tablet_id, RollbackTask* task, } else { task->rollback_points[tablet_id] = master_response->rollback_point(); LOG(INFO) << "MasterImpl rollback all tablet done"; - int sid = task->table->AddRollback(task->request->rollback_name()); + int sid = task->table->GetRollbackSize(); for (uint32_t i = 0; i < task->tablets.size(); ++i) { - int tsid = task->tablets[i]->AddRollback(task->request->rollback_name(), - master_request->snapshot_id(), - task->rollback_points[i]); + int tsid = task->tablets[i]->UpdateRollback(task->request->rollback_name(), + master_request->snapshot_id(), + task->rollback_points[i]); assert(sid == tsid); } WriteClosure* closure = @@ -3133,7 +3189,7 @@ void MasterImpl::AddRollbackCallback(TablePtr table, return; } LOG(INFO) << "Rollback " << rpc_request->rollback_name() << " to " << rpc_request->table_name() - << ", write meta " << rpc_request->snapshot_id() << " done"; + << ", write meta with snpashot_id " << rpc_request->snapshot_id() << " done"; rpc_response->set_status(kMasterOk); rpc_done->Run(); } diff --git a/src/master/master_impl.h b/src/master/master_impl.h index 1629edae4..967f58143 100644 --- a/src/master/master_impl.h +++ b/src/master/master_impl.h @@ -297,8 +297,13 @@ class MasterImpl { WriteTabletRequest* request, WriteTabletResponse* response, bool failed, int error_code); - void RollbackAsync(TabletPtr tablet, uint64_t snapshot_id, int32_t timeout, - RollbackClosure* done); + void AddDefaultRollbackCallback(TablePtr table, std::vector tablets, + int32_t retry_times, const RollbackRequest* rpc_request, + RollbackResponse* rpc_response, google::protobuf::Closure* rpc_done, + WriteTabletRequest* request, WriteTabletResponse* response, + bool failed, int error_code); + void RollbackAsync(TabletPtr tablet, uint64_t snapshot_id, + int32_t timeout, RollbackClosure* done); void RollbackCallback(int32_t tablet_id, RollbackTask* task, SnapshotRollbackRequest* master_request, SnapshotRollbackResponse* master_response, @@ -312,7 +317,6 @@ class MasterImpl { WriteTabletRequest* request, WriteTabletResponse* response, bool failed, int error_code); - void ScheduleQueryTabletNode(); void QueryTabletNode(); void QueryTabletNodeAsync(std::string addr, int32_t timeout, diff --git a/src/master/tablet_manager.cc b/src/master/tablet_manager.cc index e4f51736c..74f2f8e28 100644 --- a/src/master/tablet_manager.cc +++ b/src/master/tablet_manager.cc @@ -365,6 +365,21 @@ void Tablet::ListRollback(std::vector* rollbacks) { } } +int32_t Tablet::UpdateRollback(std::string name, uint64_t snapshot_id, uint64_t rollback_point) { + MutexLock lock(&m_mutex); + bool has_rollback_name = false; + for (int32_t i = 0; i < m_meta.rollbacks_size(); ++i) { + Rollback cur_rollback = m_meta.rollbacks(i); + if (cur_rollback.name() == name) { + has_rollback_name = true; + assert(cur_rollback.snapshot_id() == snapshot_id); + cur_rollback.set_rollback_point(rollback_point); + } + } + assert(has_rollback_name); + return m_meta.rollbacks_size() - 1; +} + bool Tablet::IsBound() { TablePtr null_ptr; if (m_table != null_ptr) { diff --git a/src/master/tablet_manager.h b/src/master/tablet_manager.h index d1aee8091..bc752a28f 100644 --- a/src/master/tablet_manager.h +++ b/src/master/tablet_manager.h @@ -107,6 +107,7 @@ class Tablet { void ListSnapshot(std::vector* snapshot); void DelSnapshot(int32_t id); int32_t AddRollback(std::string name, uint64_t snapshot_id, uint64_t rollback_point); + int32_t UpdateRollback(std::string name, uint64_t snapshot_id, uint64_t rollback_point); void ListRollback(std::vector* rollbacks); // is belong to a table? @@ -177,6 +178,7 @@ class Table { void ListSnapshot(std::vector* snapshots); int32_t AddRollback(std::string rollback_name); void ListRollback(std::vector* rollback_names); + int32_t GetRollbackSize() {return m_rollback_names.size() - 1;} void AddDeleteTabletCount(); bool NeedDelete(); void ToMetaTableKeyValue(std::string* packed_key = NULL, From 50050466571ebb9074c97332fed686630ac5df62 Mon Sep 17 00:00:00 2001 From: lylei Date: Tue, 20 Oct 2015 16:50:50 +0800 Subject: [PATCH 2/7] issue=#382 add rollback cases --- test/testcase/common.py | 15 +++++++++++++++ test/testcase/conf.py | 1 + 2 files changed, 16 insertions(+) diff --git a/test/testcase/common.py b/test/testcase/common.py index 973c39246..ea19fe1e4 100644 --- a/test/testcase/common.py +++ b/test/testcase/common.py @@ -100,6 +100,21 @@ def create_multiversion_table(): print ''.join(ret.stdout.readlines()) +def createbyfile(schema, deli=''): + """ + This function creates a table according to a specified schema + :param schema: schema file path + :param deli: deli file path + :return: None + """ + + cleanup() + create_cmd = '{teracli} createbyfile {schema} {deli}'.format(teracli=const.teracli_binary, schema=schema, deli=deli) + print create_cmd + ret = subprocess.Popen(create_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + print ''.join(ret.stdout.readlines()) + + def run_tera_mark(file_path, op, table_name, random, value_size, num, key_size, cf='', key_seed=1, value_seed=1): """ This function provide means to write data into Tera and dump a copy into a specified file at the same time. diff --git a/test/testcase/conf.py b/test/testcase/conf.py index cc0f0a639..d5fbeee18 100644 --- a/test/testcase/conf.py +++ b/test/testcase/conf.py @@ -12,5 +12,6 @@ def __init__(self): self.teracli_binary = './teracli' self.kill_script = './kill_tera.sh' self.launch_script = './launch_tera.sh' + self.data_path = 'testcase/data/' const = Const() From 2dfec7287f0d4d819e50a55a33b15e6499faa31f Mon Sep 17 00:00:00 2001 From: lylei Date: Wed, 21 Oct 2015 14:56:47 +0800 Subject: [PATCH 3/7] issue=#382 add rollback cases --- test/testcase/test_rollback.py | 151 +++++++++++++++++++++++++++++++++ 1 file changed, 151 insertions(+) create mode 100644 test/testcase/test_rollback.py diff --git a/test/testcase/test_rollback.py b/test/testcase/test_rollback.py new file mode 100644 index 000000000..d05f0f877 --- /dev/null +++ b/test/testcase/test_rollback.py @@ -0,0 +1,151 @@ +""" +Copyright (c) 2015, Baidu.com, Inc. All Rights Reserved +Use of this source code is governed by a BSD-style license that can be +found in the LICENSE file. +""" + +import nose +import time + +import common +from conf import const + + +@nose.tools.with_setup(common.create_kv_table, common.cleanup) +def test_rollback_kv(): + """ + test kv rollback + 1. write data set 1 + 2. create snapshot + 3. write data set 2 + 4. scan & compare with set 2 + 5. rollback to snapshot + 6. scan & compare snapshot 1 + 7. compact then compare + :return: + """ + table_name = 'test' + dump_file1 = 'dump1.out' + dump_file2 = 'dump2.out' + scan_file = 'scan.out' + common.run_tera_mark([(dump_file1, False)], op='w', table_name=table_name, random='random', + key_seed=1, value_seed=10, value_size=100, num=10000, key_size=20) + snapshot = common.snapshot_op(table_name) + common.run_tera_mark([(dump_file2, False)], op='w', table_name=table_name, random='random', + key_seed=1, value_seed=11, value_size=100, num=10000, key_size=20) + common.scan_table(table_name=table_name, file_path=scan_file, allversion=False, snapshot=0) + nose.tools.assert_true(common.compare_files(dump_file2, scan_file, need_sort=True)) + + common.rollback_op(table_name, snapshot, 'roll') + common.scan_table(table_name=table_name, file_path=scan_file, allversion=False, snapshot=0) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=True)) + + common.compact_tablets(common.get_tablet_list(table_name)) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=False)) + + +@nose.tools.with_setup(common.create_singleversion_table, common.cleanup) +def test_rollback_table(): + """ + test table rollback + 1. write data set 1 + 2. create snapshot + 3. write data set 2 + 4. scan & compare with set 2 + 5. rollback to snapshot + 6. scan & compare snapshot 1 + 7. compact then compare + :return: + """ + table_name = 'test' + dump_file1 = 'dump1.out' + dump_file2 = 'dump2.out' + scan_file = 'scan.out' + common.run_tera_mark([(dump_file1, False)], op='w', table_name=table_name, cf='cf0:q,cf1:q', random='random', + key_seed=1, value_seed=10, value_size=100, num=10000, key_size=20) + snapshot = common.snapshot_op(table_name) + common.run_tera_mark([(dump_file2, False)], op='w', table_name=table_name, cf='cf0:q,cf1:q', random='random', + key_seed=1, value_seed=11, value_size=100, num=10000, key_size=20) + common.scan_table(table_name=table_name, file_path=scan_file, allversion=False, snapshot=0) + nose.tools.assert_true(common.compare_files(dump_file2, scan_file, need_sort=True)) + + common.rollback_op(table_name, snapshot, 'roll') + common.scan_table(table_name=table_name, file_path=scan_file, allversion=False, snapshot=0) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=True)) + + common.compact_tablets(common.get_tablet_list(table_name)) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=True)) + + +@nose.tools.with_setup(common.create_kv_table) +def test_rollback_kv_relaunch(): + """ + test kv rollback w/relaunch + 1. test_rollback_kv() + 2. relaunch + 3. compare + :return: + """ + table_name = 'test' + dump_file1 = 'dump1.out' + scan_file = 'scan.out' + test_rollback_kv() + + common.cluster_op('kill') + common.cluster_op('launch') + time.sleep(2) + + common.scan_table(table_name=table_name, file_path=scan_file, allversion=False, snapshot=0) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=True)) + + common.compact_tablets(common.get_tablet_list(table_name)) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=False)) + + +@nose.tools.with_setup(common.create_singleversion_table, common.cleanup) +def test_rollback_table_relaunch(): + """ + test table rollback w/relaunch + 1. test_rollback_table() + 2. relaunch + 3. compare + :return: + """ + table_name = 'test' + dump_file1 = 'dump1.out' + scan_file = 'scan.out' + test_rollback_table() + + common.cluster_op('kill') + common.cluster_op('launch') + time.sleep(2) + + common.scan_table(table_name=table_name, file_path=scan_file, allversion=False, snapshot=0) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=True)) + + common.compact_tablets(common.get_tablet_list(table_name)) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=False)) + + +@nose.tools.with_setup(None, common.cleanup) +def test_rollback_kv_multitablets(): + """ + test kv rollback w/multi tablets + 1. test_rollback_kv_relaunch() + :return: + """ + + common.createbyfile(schema=const.data_path + 'kv.schema', deli=const.data_path + 'deli.10') + test_rollback_kv_relaunch() + + +@nose.tools.with_setup(None, common.cleanup) +def test_rollback_table_multitablets(): + """ + test table rollback w/multi tablets + 1. test_rollback_table_relaunch() + :return: + """ + + common.createbyfile(schema=const.data_path + 'table.schema', deli=const.data_path + 'deli.10') + test_rollback_table_relaunch() From 8f65d2f3fb3a1679bef2196d3f705b9d9cb62383 Mon Sep 17 00:00:00 2001 From: lylei Date: Wed, 21 Oct 2015 17:25:47 +0800 Subject: [PATCH 4/7] issue=#324 rollback when tablet not ready --- src/master/master_impl.cc | 15 +++++++++++++++ src/master/tablet_manager.cc | 26 ++++++++++++++++++++++++++ src/master/tablet_manager.h | 1 + src/proto/master_rpc.proto | 5 +++-- src/sdk/client_impl.cc | 9 +++++++-- 5 files changed, 52 insertions(+), 4 deletions(-) diff --git a/src/master/master_impl.cc b/src/master/master_impl.cc index 4f5e39670..b75c6f271 100644 --- a/src/master/master_impl.cc +++ b/src/master/master_impl.cc @@ -3004,6 +3004,20 @@ void MasterImpl::GetRollback(const RollbackRequest* request, return; } + bool rollback_exist, rollback_done; + table->GetRollbackStatus(request->rollback_name(), &rollback_exist, &rollback_done); + if (rollback_exist) { + if (rollback_done) { + LOG(INFO) << "rollback " << request->rollback_name() << " already exists and done"; + response->set_done(true); + done->Run(); + return; + } else { + LOG(INFO) << "rollback " << request->rollback_name() + << " already exists but has not complete yet"; + } + } + std::vector tablets; table->GetTablet(&tablets); // write memory and meta with default rollback_point @@ -3192,6 +3206,7 @@ void MasterImpl::AddRollbackCallback(TablePtr table, LOG(INFO) << "Rollback " << rpc_request->rollback_name() << " to " << rpc_request->table_name() << ", write meta with snpashot_id " << rpc_request->snapshot_id() << " done"; rpc_response->set_status(kMasterOk); + rpc_response->set_done(true); rpc_done->Run(); } diff --git a/src/master/tablet_manager.cc b/src/master/tablet_manager.cc index 4165ba721..17890847d 100644 --- a/src/master/tablet_manager.cc +++ b/src/master/tablet_manager.cc @@ -658,6 +658,32 @@ void Table::ListRollback(std::vector* rollback_names) { *rollback_names = m_rollback_names; } +void Table::GetRollbackStatus(std::string rollback_name, bool* exists, bool* done) { + MutexLock lock(&m_mutex); + std::vector::iterator it = + std::find(m_rollback_names.begin(), m_rollback_names.end(), rollback_name); + if (it == m_rollback_names.end()) { + *exists = false; + *done = false; + } else { + *exists = true; + Table::TabletList::iterator it = m_tablets_list.begin(); + for (; it != m_tablets_list.end(); ++it) { + TabletPtr tablet = it->second; + int32_t rollback_size = tablet->m_meta.rollbacks_size(); + for (int32_t i = 0; i < rollback_size; ++i) { + if (tablet->m_meta.rollbacks(i).name() == rollback_name && + tablet->m_meta.rollbacks(i).rollback_point() == leveldb::kMaxSequenceNumber) { + LOG(INFO) << "rollabck tablet " << tablet->m_meta.path() << " has not complete yet"; + *done = false; + return; + } + } + } + *done = true; + } +} + void Table::AddDeleteTabletCount() { MutexLock lock(&m_mutex); m_deleted_tablet_num++; diff --git a/src/master/tablet_manager.h b/src/master/tablet_manager.h index bc752a28f..68074bd86 100644 --- a/src/master/tablet_manager.h +++ b/src/master/tablet_manager.h @@ -179,6 +179,7 @@ class Table { int32_t AddRollback(std::string rollback_name); void ListRollback(std::vector* rollback_names); int32_t GetRollbackSize() {return m_rollback_names.size() - 1;} + void GetRollbackStatus(std::string rollbaack_name, bool* exists, bool* done); void AddDeleteTabletCount(); bool NeedDelete(); void ToMetaTableKeyValue(std::string* packed_key = NULL, diff --git a/src/proto/master_rpc.proto b/src/proto/master_rpc.proto index 1674a33f8..2a46d83d0 100644 --- a/src/proto/master_rpc.proto +++ b/src/proto/master_rpc.proto @@ -203,8 +203,9 @@ message RollbackRequest { } message RollbackResponse { - optional StatusCode status = 1; - optional uint64 sequence_id = 2; + optional uint64 sequence_id = 1; + optional StatusCode status = 2; + optional bool done = 3; } // admin diff --git a/src/sdk/client_impl.cc b/src/sdk/client_impl.cc index 3d6f83d88..8be52ca26 100644 --- a/src/sdk/client_impl.cc +++ b/src/sdk/client_impl.cc @@ -824,8 +824,13 @@ bool ClientImpl::Rollback(const string& name, uint64_t snapshot, if (master_client.GetRollback(&request, &response)) { if (response.status() == kMasterOk) { - std::cout << name << " rollback to snapshot sucessfully" << std::endl; - return true; + if (response.done()) { + std::cout << name << " rollback to snapshot sucessfully" << std::endl; + return true; + } else { + std::cout << name << " rollback has not complete yet" << std::endl; + return false; + } } } err->SetFailed(ErrorCode::kSystem, StatusCodeToString(response.status())); From 0320d44d8a51870fefe347df019eb8161305657e Mon Sep 17 00:00:00 2001 From: lylei Date: Fri, 23 Oct 2015 17:18:42 +0800 Subject: [PATCH 5/7] issue=#324 rollback retry --- ft_test.sh | 2 +- src/master/master_impl.cc | 100 ++++++++++++++++++++++------------- src/master/master_impl.h | 4 ++ src/master/tablet_manager.cc | 27 ++++++++-- src/master/tablet_manager.h | 1 + src/teracli_main.cc | 26 ++++++--- test/testcase/common.py | 19 ++----- 7 files changed, 114 insertions(+), 65 deletions(-) diff --git a/ft_test.sh b/ft_test.sh index 76ed801a8..83c40e2d3 100644 --- a/ft_test.sh +++ b/ft_test.sh @@ -17,7 +17,7 @@ sh launch_tera.sh sleep 2 export PYTHONPATH=$PYTHONPATH:../../thirdparty/include/; export PATH=$PATH:../../thirdparty/bin/ -nosetests -s -v > ../log/test.log +nosetests -s -v testcase/test_rollback.py> ../log/test.log sh kill_tera.sh diff --git a/src/master/master_impl.cc b/src/master/master_impl.cc index b75c6f271..42bfd13a3 100644 --- a/src/master/master_impl.cc +++ b/src/master/master_impl.cc @@ -3007,6 +3007,7 @@ void MasterImpl::GetRollback(const RollbackRequest* request, bool rollback_exist, rollback_done; table->GetRollbackStatus(request->rollback_name(), &rollback_exist, &rollback_done); if (rollback_exist) { + response->set_status(kMasterOk); if (rollback_done) { LOG(INFO) << "rollback " << request->rollback_name() << " already exists and done"; response->set_done(true); @@ -3015,6 +3016,9 @@ void MasterImpl::GetRollback(const RollbackRequest* request, } else { LOG(INFO) << "rollback " << request->rollback_name() << " already exists but has not complete yet"; + response->set_done(false); + done->Run(); + return; } } @@ -3075,11 +3079,25 @@ void MasterImpl::AddDefaultRollbackCallback(TablePtr table, << rpc_request->table_name() << " done"; RollbackTask* task = new RollbackTask; - table->GetTablet(&task->tablets); - + std::vector rollback_tablets; + table->GetRollbackTablets(rpc_request->rollback_name(), &rollback_tablets); + ApplyRollbackTask(table, rollback_tablets, rpc_request, rpc_response, rpc_done, task); + +} + +void MasterImpl::ApplyRollbackTask(TablePtr table, const std::vector& tablets, + const RollbackRequest* rpc_request, + RollbackResponse* rpc_response, + google::protobuf::Closure* rpc_done, + RollbackTask* task) { + // table->GetRollbackTablets(rpc_request->rollback_name(), &task->tablets); + task->tablets = tablets; assert(task->tablets.size()); - - task->rollback_points.resize(task->tablets.size()); + if (task->tablets.size() == 0) { + ///////////////// TODO /////////////////// + } + task->rollback_points.resize(task->tablets.size(), leveldb::kMaxSequenceNumber); + task->retry.resize(task->tablets.size(), false); task->request = rpc_request; task->response = rpc_response; task->done = rpc_done; @@ -3087,20 +3105,22 @@ void MasterImpl::AddDefaultRollbackCallback(TablePtr table, task->task_num = 0; task->finish_num = 0; task->aborted = false; + MutexLock lock(&task->mutex); for (uint32_t i = 0; i < task->tablets.size(); ++i) { TabletPtr tablet = task->tablets[i]; ++task->task_num; - RollbackClosure* closure = - NewClosure(this, &MasterImpl::RollbackCallback, static_cast(i), task); - RollbackAsync(tablet, rpc_request->snapshot_id(), 3000, closure); - } - if (task->task_num == 0) { - LOG(WARNING) << "fail to rollback to snapshot: " << rpc_request->table_name() - << ", all tables kTabletNodeOffLine"; - response->set_status(kTabletNodeOffLine); - rpc_done->Run(); - return; + TabletStatus status = tablet->GetStatus(); + if (status == kTableReady) { + RollbackClosure* closure = + NewClosure(this, &MasterImpl::RollbackCallback, static_cast(i), task); + RollbackAsync(tablet, rpc_request->snapshot_id(), 3000, closure); + } else if (status == kTableOnLoad) { + task->retry[i] = true; + RollbackCallback(i, task, NULL, NULL, false, 0); + } else { + RollbackCallback(i, task, NULL, NULL, false, 0); + } } } @@ -3117,7 +3137,7 @@ void MasterImpl::RollbackAsync(TabletPtr tablet, uint64_t snapshot_id, request->mutable_key_range()->set_key_start(tablet->GetKeyStart()); request->mutable_key_range()->set_key_end(tablet->GetKeyEnd()); - LOG(INFO) << "RollbackAsync id: " << request->sequence_id() << ", " + VLOG(10) << "RollbackAsync id: " << request->sequence_id() << ", " << "server: " << addr; node_client.Rollback(request, response, done); } @@ -3126,41 +3146,47 @@ void MasterImpl::RollbackCallback(int32_t tablet_id, RollbackTask* task, SnapshotRollbackRequest* master_request, SnapshotRollbackResponse* master_response, bool failed, int error_code) { - MutexLock lock(&task->mutex); + task->mutex.Lock(); ++task->finish_num; - VLOG(6) << "MasterImpl Rollback id= " << tablet_id - << " finish_num= " << task->finish_num - << ". Return " << master_response->rollback_point(); if (task->finish_num != task->task_num) { - if (!failed && master_response->status() == kTabletNodeOk) { - task->rollback_points[tablet_id] = master_response->rollback_point(); + if (!failed && master_response && master_response->status() == kTabletNodeOk) { + task->rollback_points[tablet_id] = master_response->rollback_point(); + VLOG(6) << "MasterImpl Rollback id= " << tablet_id + << " finish_num= " << task->finish_num + << ". Return " << master_response->rollback_point(); } else { - task->aborted = true; + // rpc failed or tablet not ready, do not update rollback_point } return; } - if (failed || task->aborted) { - LOG(WARNING) << "MasterImpl Rollback fail done"; - task->response->set_status(kTabletNodeOffLine); - task->done->Run(); - } else { + if (!failed && master_response && master_response->status() == kTabletNodeOk) { task->rollback_points[tablet_id] = master_response->rollback_point(); - LOG(INFO) << "MasterImpl rollback all tablet done"; - int sid = task->table->GetRollbackSize(); - for (uint32_t i = 0; i < task->tablets.size(); ++i) { + } else { + // rpc failed or tablet not ready, do not update rollback_point + } + std::vector retry_tablets; + int sid = task->table->GetRollbackSize(); + for (uint32_t i = 0; i < task->tablets.size(); ++i) { + // update only if the tablet was ready + if (!task->retry[i]) { int tsid = task->tablets[i]->UpdateRollback(task->request->rollback_name(), - master_request->snapshot_id(), + task->request->snapshot_id(), task->rollback_points[i]); assert(sid == tsid); + } else { + retry_tablets.push_back(task->tablets[i]); } - WriteClosure* closure = - NewClosure(this, &MasterImpl::AddRollbackCallback, - task->table, task->tablets, - FLAGS_tera_master_meta_retry_times, - task->request, task->response, task->done); - BatchWriteMetaTableAsync(task->table, task->tablets, false, closure); } + WriteClosure* closure = + NewClosure(this, &MasterImpl::AddRollbackCallback, + task->table, task->tablets, + FLAGS_tera_master_meta_retry_times, + task->request, task->response, task->done); + BatchWriteMetaTableAsync(task->table, task->tablets, false, closure); + + /////////////// TODO 重试逻辑 ///////////////////// + LOG(INFO) << "MasterImpl rollback all tablet done"; task->mutex.Unlock(); delete task; } diff --git a/src/master/master_impl.h b/src/master/master_impl.h index 971a7901b..558c680ed 100644 --- a/src/master/master_impl.h +++ b/src/master/master_impl.h @@ -199,6 +199,7 @@ class MasterImpl { google::protobuf::Closure* done; TablePtr table; std::vector tablets; + std::vector retry; std::vector rollback_points; int task_num; int finish_num; @@ -302,6 +303,9 @@ class MasterImpl { RollbackResponse* rpc_response, google::protobuf::Closure* rpc_done, WriteTabletRequest* request, WriteTabletResponse* response, bool failed, int error_code); + void ApplyRollbackTask(TablePtr table, const std::vector& tablets, const RollbackRequest* rpc_request, + RollbackResponse* rpc_response, google::protobuf::Closure* rpc_done, + RollbackTask* task); void RollbackAsync(TabletPtr tablet, uint64_t snapshot_id, int32_t timeout, RollbackClosure* done); void RollbackCallback(int32_t tablet_id, RollbackTask* task, diff --git a/src/master/tablet_manager.cc b/src/master/tablet_manager.cc index 17890847d..de1aa4f16 100644 --- a/src/master/tablet_manager.cc +++ b/src/master/tablet_manager.cc @@ -362,6 +362,7 @@ void Tablet::ListRollback(std::vector* rollbacks) { MutexLock lock(&m_mutex); for (int i = 0; i < m_meta.rollbacks_size(); i++) { rollbacks->push_back(m_meta.rollbacks(i)); + VLOG(11) << "rollback " << m_meta.path() << ": " << m_meta.rollbacks(i).ShortDebugString(); } } @@ -369,13 +370,16 @@ int32_t Tablet::UpdateRollback(std::string name, uint64_t snapshot_id, uint64_t MutexLock lock(&m_mutex); bool has_rollback_name = false; for (int32_t i = 0; i < m_meta.rollbacks_size(); ++i) { - Rollback cur_rollback = m_meta.rollbacks(i); - if (cur_rollback.name() == name) { + Rollback* cur_rollback = m_meta.mutable_rollbacks(i); + if (cur_rollback->name() == name) { has_rollback_name = true; - assert(cur_rollback.snapshot_id() == snapshot_id); - cur_rollback.set_rollback_point(rollback_point); + assert(cur_rollback->snapshot_id() == snapshot_id); + cur_rollback->set_rollback_point(rollback_point); } } + for (int i = 0; i < m_meta.rollbacks_size(); i++) { + VLOG(11) << "rollback " << m_meta.path() << ": " << m_meta.rollbacks(i).ShortDebugString(); + } assert(has_rollback_name); return m_meta.rollbacks_size() - 1; } @@ -684,6 +688,21 @@ void Table::GetRollbackStatus(std::string rollback_name, bool* exists, bool* don } } +void Table::GetRollbackTablets(const std::string& rollback_name, std::vector* tablet_list) { + MutexLock lock(&m_mutex); + Table::TabletList::iterator it = m_tablets_list.begin(); + for (; it != m_tablets_list.end(); ++it) { + TabletPtr tablet = it->second; + int32_t rollback_size = tablet->m_meta.rollbacks_size(); + for (int32_t i = 0; i < rollback_size; ++i) { + if (tablet->m_meta.rollbacks(i).name() == rollback_name && + tablet->m_meta.rollbacks(i).rollback_point() == leveldb::kMaxSequenceNumber) { + tablet_list->push_back(tablet); + } + } + } +} + void Table::AddDeleteTabletCount() { MutexLock lock(&m_mutex); m_deleted_tablet_num++; diff --git a/src/master/tablet_manager.h b/src/master/tablet_manager.h index 68074bd86..493cc13b5 100644 --- a/src/master/tablet_manager.h +++ b/src/master/tablet_manager.h @@ -180,6 +180,7 @@ class Table { void ListRollback(std::vector* rollback_names); int32_t GetRollbackSize() {return m_rollback_names.size() - 1;} void GetRollbackStatus(std::string rollbaack_name, bool* exists, bool* done); + void GetRollbackTablets(const std::string& rollback_name, std::vector* tablet_list); void AddDeleteTabletCount(); bool NeedDelete(); void ToMetaTableKeyValue(std::string* packed_key = NULL, diff --git a/src/teracli_main.cc b/src/teracli_main.cc index 818794282..305f0e447 100644 --- a/src/teracli_main.cc +++ b/src/teracli_main.cc @@ -45,7 +45,7 @@ DEFINE_bool(tera_client_scan_async_enabled, false, "enable the streaming scan mo DEFINE_int64(scan_pack_interval, 5000, "scan timeout"); DEFINE_int64(snapshot, 0, "read | scan snapshot"); -DEFINE_string(rollback_switch, "close", "Pandora's box, do not open"); +DEFINE_string(rollback_switch, "open", "Pandora's box, do not open"); DEFINE_string(rollback_name, "", "rollback operation's name"); volatile int32_t g_start_time = 0; @@ -292,7 +292,7 @@ int32_t DropOp(Client* client, int32_t argc, char** argv, ErrorCode* err) { std::string tablename = argv[2]; if (!client->DeleteTable(tablename, err)) { - LOG(ERROR) << "fail to delete table, " << err->GetReason(); + LOG(ERROR) << "fail to delete table"; return -1; } return 0; @@ -2097,6 +2097,12 @@ int32_t Meta2Op(Client *client, int32_t argc, char** argv) { const tera::TableMeta& meta = table_list.meta(i); if (op == "show") { std::cout << "table: " << meta.table_name() << std::endl; + std::cout << " rollbacks: "; + int32_t rollback_num = meta.rollback_names_size(); + for (int32_t ri = 0; ri < rollback_num; ++ri) { + std::cout << meta.rollback_names(ri) << " "; + } + std::cout << std::endl; int32_t lg_size = meta.schema().locality_groups_size(); for (int32_t lg_id = 0; lg_id < lg_size; lg_id++) { const tera::LocalityGroupSchema& lg = @@ -2134,6 +2140,12 @@ int32_t Meta2Op(Client *client, int32_t argc, char** argv) { << meta.size() << ", " << StatusCodeToString(meta.status()) << ", " << StatusCodeToString(meta.compact_status()) << std::endl; + std::cout << " rollback: "; + int32_t rollback_num = meta.rollbacks_size(); + for (int32_t ri = 0; ri < rollback_num; ++ri) { + std::cout << meta.rollbacks(ri).name() << "-" << meta.rollbacks(ri).snapshot_id() << "-" << meta.rollbacks(ri).rollback_point() << " "; + } + std::cout << std::endl; } if (op == "bak") { WriteTablet(meta, bak); @@ -2246,7 +2258,7 @@ int32_t Meta2Op(Client *client, int32_t argc, char** argv) { return 0; } -static int32_t CreateUser(Client* client, const std::string& user, +static int32_t CreateUser(Client* client, const std::string& user, const std::string& password, ErrorCode* err) { if (!client->CreateUser(user, password, err)) { LOG(ERROR) << "fail to create user: " << user @@ -2265,7 +2277,7 @@ static int32_t DeleteUser(Client* client, const std::string& user, ErrorCode* er return 0; } -static int32_t ChangePwd(Client* client, const std::string& user, +static int32_t ChangePwd(Client* client, const std::string& user, const std::string& password, ErrorCode* err) { if (!client->ChangePwd(user, password, err)) { LOG(ERROR) << "fail to update user: " << user @@ -2285,7 +2297,7 @@ static int32_t ShowUser(Client* client, const std::string& user, ErrorCode* err) if (user_infos.size() < 1) { return -1; } - std::cout << "user:" << user_infos[0] + std::cout << "user:" << user_infos[0] << "\ngroups (" << user_infos.size() - 1 << "):"; for (size_t i = 1; i < user_infos.size(); ++i) { std::cout << user_infos[i] << " "; @@ -2294,7 +2306,7 @@ static int32_t ShowUser(Client* client, const std::string& user, ErrorCode* err) return 0; } -static int32_t AddUserToGroup(Client* client, const std::string& user, +static int32_t AddUserToGroup(Client* client, const std::string& user, const std::string& group, ErrorCode* err) { if (!client->AddUserToGroup(user, group, err)) { LOG(ERROR) << "fail to add user: " << user @@ -2304,7 +2316,7 @@ static int32_t AddUserToGroup(Client* client, const std::string& user, return 0; } -static int32_t DeleteUserFromGroup(Client* client, const std::string& user, +static int32_t DeleteUserFromGroup(Client* client, const std::string& user, const std::string& group, ErrorCode* err) { if (!client->DeleteUserFromGroup(user, group, err)) { LOG(ERROR) << "fail to delete user: " << user diff --git a/test/testcase/common.py b/test/testcase/common.py index f15a33f9f..e77d381f5 100644 --- a/test/testcase/common.py +++ b/test/testcase/common.py @@ -88,7 +88,9 @@ def create_kv_table(): ret = subprocess.Popen(const.teracli_binary + ' create test', stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) print ''.join(ret.stdout.readlines()) print ''.join(ret.stderr.readlines()) - + ret = parse_showinfo() + print ret + def create_singleversion_table(): print 'create single version table' @@ -124,21 +126,6 @@ def createbyfile(schema, deli=''): print ''.join(ret.stderr.readlines()) -def createbyfile(schema, deli=''): - """ - This function creates a table according to a specified schema - :param schema: schema file path - :param deli: deli file path - :return: None - """ - - cleanup() - create_cmd = '{teracli} createbyfile {schema} {deli}'.format(teracli=const.teracli_binary, schema=schema, deli=deli) - print create_cmd - ret = subprocess.Popen(create_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) - print ''.join(ret.stdout.readlines()) - - def run_tera_mark(file_path, op, table_name, random, value_size, num, key_size, cf='', key_seed=1, value_seed=1): """ This function provide means to write data into Tera and dump a copy into a specified file at the same time. From df2778215cc3b47ec4952b53287f0508911d3710 Mon Sep 17 00:00:00 2001 From: lylei Date: Mon, 2 Nov 2015 14:54:37 +0800 Subject: [PATCH 6/7] issue=#324 rollback retry --- src/io/tablet_io.cc | 2 +- src/io/tablet_io.h | 2 +- src/master/master_impl.cc | 132 ++++++++++++++++++++++++---- src/master/master_impl.h | 6 ++ src/master/tablet_manager.cc | 109 +++++++++++++++-------- src/master/tablet_manager.h | 17 ++-- src/proto/tabletnode_rpc.proto | 1 + src/tabletnode/remote_tabletnode.cc | 2 +- src/tabletnode/tabletnode_impl.cc | 27 ++++-- src/tabletnode/tabletnode_impl.h | 3 +- 10 files changed, 227 insertions(+), 74 deletions(-) mode change 100755 => 100644 src/master/master_impl.cc mode change 100755 => 100644 src/master/master_impl.h diff --git a/src/io/tablet_io.cc b/src/io/tablet_io.cc index 3f74b7bcb..88c982924 100644 --- a/src/io/tablet_io.cc +++ b/src/io/tablet_io.cc @@ -1730,7 +1730,7 @@ void TabletIO::ListSnapshot(std::vector* snapshot_id) { } } -uint64_t TabletIO::Rollback(uint64_t snapshot_id, StatusCode* status) { +uint64_t TabletIO::GetRollback(uint64_t snapshot_id, StatusCode* status) { uint64_t sequence; { MutexLock lock(&m_mutex); diff --git a/src/io/tablet_io.h b/src/io/tablet_io.h index 55056f286..3a2d384bc 100644 --- a/src/io/tablet_io.h +++ b/src/io/tablet_io.h @@ -152,7 +152,7 @@ class TabletIO { bool ReleaseSnapshot(uint64_t snapshot_id, StatusCode* status = NULL); void ListSnapshot(std::vector* snapshot_id); - uint64_t Rollback(uint64_t snapshot_id, StatusCode* status); + uint64_t GetRollback(uint64_t snapshot_id, StatusCode* status); uint32_t GetLGidByCFName(const std::string& cfname); diff --git a/src/master/master_impl.cc b/src/master/master_impl.cc old mode 100755 new mode 100644 index 7b843536c..e1976faeb --- a/src/master/master_impl.cc +++ b/src/master/master_impl.cc @@ -202,6 +202,7 @@ bool MasterImpl::Restore(const std::map& tabletnode_li m_zk_adapter->UpdateRootTabletNode(meta_tablet_addr); RestoreUserTablet(tablet_list); + RestoreRollback(); // restore success m_restored = true; @@ -2413,7 +2414,7 @@ void MasterImpl::LoadTabletCallback(TabletPtr tablet, int32_t retry, CHECK(tablet->GetStatus() == kTableOnLoad); StatusCode status = response->status(); delete request; - delete response; + // delete response; const std::string& server_addr = tablet->GetServerAddr(); // server down @@ -2447,6 +2448,20 @@ void MasterImpl::LoadTabletCallback(TabletPtr tablet, int32_t retry, } ProcessReadyTablet(tablet); + // process rollback info + for (int32_t i = 0; i < response->rollbacks_size(); ++i) { + VLOG(10) << "Update rollback: tablet " << tablet->GetPath() << " " + << response->rollbacks(i).ShortDebugString(); + tablet->UpdateRollback(response->rollbacks(i).name(), + response->rollbacks(i).snapshot_id(), + response->rollbacks(i).rollback_point()); + } + WriteClosure* done = + NewClosure(this, &MasterImpl::LoadRollbackCallback, + tablet, FLAGS_tera_master_meta_retry_times, response); + BatchWriteMetaTableAsync(boost::bind(&Tablet::ToMetaTableKeyValue, tablet, _1, _2), + false, done); + // load next node->FinishLoad(tablet); TabletPtr next_tablet; @@ -2504,6 +2519,42 @@ void MasterImpl::LoadTabletCallback(TabletPtr tablet, int32_t retry, FLAGS_tera_master_control_tabletnode_retry_period, task); } +void MasterImpl::LoadRollbackCallback(TabletPtr tablet, int32_t retry_times, + LoadTabletResponse* rpc_response, + WriteTabletRequest* request, + WriteTabletResponse* response, + bool failed, int error_code) { + StatusCode status = response->status(); + if (failed || status != kTabletNodeOk) { + if (failed) { + LOG(WARNING) << "fail to update rollback to meta: " + << sofa::pbrpc::RpcErrorCodeToString(error_code) << ", " + << tablet << "..."; + } else { + LOG(WARNING) << "fail to update rollback to meta: " + << StatusCodeToString(status) << ", " << tablet << "..."; + } + if (retry_times <= 0) { + for (int32_t i = 0; i < rpc_response->rollbacks_size(); ++i) { + const Rollback& cur_rollback = rpc_response->rollbacks(i); + tablet->UpdateRollback(cur_rollback.name(), cur_rollback.snapshot_id(), + leveldb::kMaxSequenceNumber); + } + } else { + WriteClosure* done = + NewClosure(this, &MasterImpl::LoadRollbackCallback, tablet, + retry_times - 1, rpc_response); + SuspendMetaOperation(boost::bind(&Tablet::ToMetaTableKeyValue, tablet, _1, _2), false, done); + } + return; + } + for (int32_t i = 0; i < rpc_response->rollbacks_size(); ++i) { + VLOG(10) << "LoadRollbackCallback " << tablet->GetPath() << ": " + << rpc_response->rollbacks(i).ShortDebugString(); + } + delete rpc_response; +} + bool MasterImpl::UnloadTabletSync(const std::string& table_name, const std::string& key_start, const std::string& key_end, @@ -3064,6 +3115,11 @@ void MasterImpl::AddDefaultRollbackCallback(TablePtr table, << StatusCodeToString(status) << ", " << tablets[0] << "..."; } if (retry_times <= 0) { + int sid = table->DelRollback(rpc_request->rollback_name()); + for (uint32_t i = 0; i < tablets.size(); ++i) { + int tsid = tablets[i]->DelRollback(rpc_request->rollback_name()); + assert(sid == tsid); + } rpc_response->set_status(kMetaTabletError); rpc_done->Run(); } else { @@ -3080,10 +3136,6 @@ void MasterImpl::AddDefaultRollbackCallback(TablePtr table, RollbackTask* task = new RollbackTask; table->GetTablet(&task->tablets); - assert(task->tablets.size()); - if (task->tablets.size() == 0) { - ///////////////// TODO /////////////////// - } task->rollback_points.resize(task->tablets.size(), leveldb::kMaxSequenceNumber); task->retry.resize(task->tablets.size(), false); task->request = rpc_request; @@ -3093,7 +3145,7 @@ void MasterImpl::AddDefaultRollbackCallback(TablePtr table, task->task_num = 0; task->finish_num = 0; ApplyRollbackTask(task); - + } void MasterImpl::ApplyRollbackTask(RollbackTask* task) { @@ -3180,15 +3232,11 @@ void MasterImpl::RollbackCallback(int32_t tablet_id, RollbackTask* task, } } - /////////////// TODO 重试逻辑 ///////////////////// + // retry when talets were onloading if (retry_tablets.size() != 0) { RollbackTask* retry_task = new RollbackTask; retry_task->tablets = retry_tablets; - assert(retry_task->tablets.size()); - if (retry_task->tablets.size() == 0) { - ////////////// TODO //////////////////// - } retry_task->rollback_points.resize(retry_task->tablets.size(), leveldb::kMaxSequenceNumber); retry_task->retry.resize(retry_task->tablets.size(), false); retry_task->request = task->request; @@ -3197,6 +3245,9 @@ void MasterImpl::RollbackCallback(int32_t tablet_id, RollbackTask* task, retry_task->table = task->table; retry_task->task_num = 0; retry_task->finish_num = 0; + ThreadPool::Task task = + boost::bind(&MasterImpl::ApplyRollbackTask, this, retry_task); + m_thread_pool->DelayTask(5, task); } WriteClosure* closure = @@ -3239,22 +3290,67 @@ void MasterImpl::AddRollbackCallback(TablePtr table, << StatusCodeToString(status) << ", " << tablets[0] << "..."; } if (retry_times <= 0) { + ///////// set rollback to default ///////////////// + ////////// TODO /////////// rpc_response->set_status(kMetaTabletError); - rpc_done->Run(); + if (rpc_done) { + rpc_done->Run(); + } } else { WriteClosure* done = NewClosure(this, &MasterImpl::AddRollbackCallback, table, - tablets, retry_times - 1, rpc_request, rpc_response, + tablets, no_more_retry, retry_times - 1, rpc_request, rpc_response, rpc_done); SuspendMetaOperation(table, tablets, false, done); } return; } - LOG(INFO) << "Rollback " << rpc_request->rollback_name() << " to " << rpc_request->table_name() - << ", write meta with snpashot_id " << rpc_request->snapshot_id() << " done"; - rpc_response->set_status(kMasterOk); - rpc_response->set_done(true); - rpc_done->Run(); + if (no_more_retry) { + LOG(INFO) << "Rollback " << rpc_request->rollback_name() << " to " << rpc_request->table_name() + << ", write meta with snpashot_id " << rpc_request->snapshot_id() << " done"; + table->SetRollbackStatus(rpc_request->rollback_name(), true); + rpc_response->set_status(kMasterOk); + rpc_response->set_done(true); + if (rpc_done) { + rpc_done->Run(); + } + } else { + LOG(INFO) << "Rollback " << rpc_request->rollback_name() << " to " << rpc_request->table_name() + << ", write meta with snpashot_id " << rpc_request->snapshot_id() + << " partially done, wait for retry"; + } +} + +void MasterImpl::RestoreRollback() { + std::vector tables; + m_tablet_manager->ShowTable(&tables, NULL); + for (uint32_t table_i = 0; table_i < tables.size(); ++table_i) { + TablePtr cur_table = tables[table_i]; + std::vector > > retries; + cur_table->GetRollbackTablets(&retries); + for (uint32_t ri = 0; ri < retries.size(); ++ri) { + RollbackRequest* request = new RollbackRequest; + RollbackResponse* response = new RollbackResponse; + request->set_sequence_id(0); + request->set_table_name(cur_table->GetTableName()); + request->set_snapshot_id(retries[ri].first.snapshot_id()); + request->set_rollback_name(retries[ri].first.name()); + + RollbackTask* task = new RollbackTask; + task->tablets = retries[ri].second; + task->rollback_points.resize(task->tablets.size(), leveldb::kMaxSequenceNumber); + task->retry.resize(task->tablets.size(), false); + task->request = request; + task->response = response; + task->done = NULL; + task->table = cur_table; + task->task_num = 0; + task->finish_num = 0; + ThreadPool::Task threadpool_task = + boost::bind(&MasterImpl::ApplyRollbackTask, this, task); + m_thread_pool->AddTask(threadpool_task); + } + } } void MasterImpl::ClearUnusedSnapshots(TabletPtr tablet, const TabletMeta& meta) { diff --git a/src/master/master_impl.h b/src/master/master_impl.h old mode 100755 new mode 100644 index 15d8104a8..547d662cf --- a/src/master/master_impl.h +++ b/src/master/master_impl.h @@ -319,6 +319,12 @@ class MasterImpl { WriteTabletRequest* request, WriteTabletResponse* response, bool failed, int error_code); + void LoadRollbackCallback(TabletPtr tablet, int32_t retry_times, + LoadTabletResponse* rpc_response, + WriteTabletRequest* request, + WriteTabletResponse* response, + bool failed, int error_code); + void RestoreRollback(); void ScheduleQueryTabletNode(); void QueryTabletNode(); void QueryTabletNodeAsync(std::string addr, int32_t timeout, diff --git a/src/master/tablet_manager.cc b/src/master/tablet_manager.cc index de1aa4f16..58a295b4f 100644 --- a/src/master/tablet_manager.cc +++ b/src/master/tablet_manager.cc @@ -348,7 +348,7 @@ void Tablet::DelSnapshot(int32_t id) { snapshot_list->RemoveLast(); } -int32_t Tablet::AddRollback(std::string name, uint64_t snapshot_id, uint64_t rollback_point) { +int32_t Tablet::AddRollback(const std::string& name, const uint64_t snapshot_id, const uint64_t rollback_point) { MutexLock lock(&m_mutex); Rollback rollback; rollback.set_name(name); @@ -358,6 +358,23 @@ int32_t Tablet::AddRollback(std::string name, uint64_t snapshot_id, uint64_t rol return m_meta.rollbacks_size() - 1; } +int32_t Tablet::DelRollback(const std::string& name) { + MutexLock lock(&m_mutex); + int32_t rollback_size = m_meta.rollbacks_size(); + bool rollback_exist = false; + for (int32_t i = 0; i < rollback_size; ++i) { + if (m_meta.rollbacks(i).name() == name) { + VLOG(10) << "delete tablet rollback " << m_meta.rollbacks(i).ShortDebugString(); + m_meta.mutable_rollbacks()->SwapElements(i, rollback_size - 1); + m_meta.mutable_rollbacks()->RemoveLast(); + rollback_exist = true; + break; + } + } + assert(rollback_exist); + return m_meta.rollbacks_size() - 1; +} + void Tablet::ListRollback(std::vector* rollbacks) { MutexLock lock(&m_mutex); for (int i = 0; i < m_meta.rollbacks_size(); i++) { @@ -366,7 +383,7 @@ void Tablet::ListRollback(std::vector* rollbacks) { } } -int32_t Tablet::UpdateRollback(std::string name, uint64_t snapshot_id, uint64_t rollback_point) { +int32_t Tablet::UpdateRollback(const std::string& name, const uint64_t snapshot_id, const uint64_t rollback_point) { MutexLock lock(&m_mutex); bool has_rollback_name = false; for (int32_t i = 0; i < m_meta.rollbacks_size(); ++i) { @@ -651,53 +668,72 @@ void Table::ListSnapshot(std::vector* snapshots) { *snapshots = m_snapshot_list; } -int32_t Table::AddRollback(std::string rollback_name) { +int32_t Table::AddRollback(const std::string& rollback_name) { MutexLock lock(&m_mutex); - m_rollback_names.push_back(rollback_name); - return m_rollback_names.size() - 1; + std::map::iterator it = m_rollbacks.find(rollback_name); + assert(it == m_rollbacks.end()); + m_rollbacks[rollback_name] = false; + return m_rollbacks.size() - 1; +} + +int32_t Table::DelRollback(const std::string& rollback_name) { + MutexLock lock(&m_mutex); + std::map::iterator it = m_rollbacks.find(rollback_name); + assert(it != m_rollbacks.end()); + m_rollbacks.erase(it); + return m_rollbacks.size() - 1; } void Table::ListRollback(std::vector* rollback_names) { MutexLock lock(&m_mutex); - *rollback_names = m_rollback_names; + std::map::iterator it = m_rollbacks.begin(); + for (; it != m_rollbacks.end(); ++it) { + rollback_names->push_back(it->first); + } } -void Table::GetRollbackStatus(std::string rollback_name, bool* exists, bool* done) { +void Table::GetRollbackStatus(const std::string& rollback_name, bool* exists, bool* done) { MutexLock lock(&m_mutex); - std::vector::iterator it = - std::find(m_rollback_names.begin(), m_rollback_names.end(), rollback_name); - if (it == m_rollback_names.end()) { + std::map::iterator it = m_rollbacks.find(rollback_name); + if (it == m_rollbacks.end()) { *exists = false; *done = false; } else { *exists = true; - Table::TabletList::iterator it = m_tablets_list.begin(); - for (; it != m_tablets_list.end(); ++it) { - TabletPtr tablet = it->second; - int32_t rollback_size = tablet->m_meta.rollbacks_size(); - for (int32_t i = 0; i < rollback_size; ++i) { - if (tablet->m_meta.rollbacks(i).name() == rollback_name && - tablet->m_meta.rollbacks(i).rollback_point() == leveldb::kMaxSequenceNumber) { - LOG(INFO) << "rollabck tablet " << tablet->m_meta.path() << " has not complete yet"; - *done = false; - return; - } - } - } - *done = true; + *done = it->second; } } -void Table::GetRollbackTablets(const std::string& rollback_name, std::vector* tablet_list) { +void Table::SetRollbackStatus(const std::string& rollback_name, bool done) { MutexLock lock(&m_mutex); - Table::TabletList::iterator it = m_tablets_list.begin(); - for (; it != m_tablets_list.end(); ++it) { - TabletPtr tablet = it->second; - int32_t rollback_size = tablet->m_meta.rollbacks_size(); - for (int32_t i = 0; i < rollback_size; ++i) { - if (tablet->m_meta.rollbacks(i).name() == rollback_name && - tablet->m_meta.rollbacks(i).rollback_point() == leveldb::kMaxSequenceNumber) { - tablet_list->push_back(tablet); + std::map::iterator it = m_rollbacks.find(rollback_name); + assert(it != m_rollbacks.end()); + it->second = done; +} + +void Table::GetRollbackTablets(std::vector > >* rollback_tablets) { + MutexLock lock(&m_mutex); + for (std::map::iterator it = m_rollbacks.begin(); + it != m_rollbacks.end(); ++it) { + if (!it->second) { + VLOG(6) << "rollback " << it->first << " marked undone."; + std::vector tablets; + Rollback rollback; + for (TabletList::iterator tablet_it = m_tablets_list.begin(); tablet_it != m_tablets_list.end(); + ++tablet_it) { + TabletPtr cur_tablet = tablet_it->second; + for (int32_t ri = 0; ri < cur_tablet->m_meta.rollbacks_size(); ++ri) { + if (cur_tablet->m_meta.rollbacks(ri).name() == it->first) { + rollback.CopyFrom(cur_tablet->m_meta.rollbacks(ri)); + tablets.push_back(cur_tablet); + } + } + } + if (tablets.empty()) { + it->second = true; + VLOG(6) << "rollback " << it->first << " already done."; + } else { + rollback_tablets->push_back(std::make_pair(rollback, tablets)); } } } @@ -732,8 +768,9 @@ void Table::ToMeta(TableMeta* meta) { for (size_t i = 0; i < m_snapshot_list.size(); i++) { meta->add_snapshot_list(m_snapshot_list[i]); } - for (size_t i = 0; i < m_rollback_names.size(); ++i) { - meta->add_rollback_names(m_rollback_names[i]); + std::map::iterator it = m_rollbacks.begin(); + for (; it != m_rollbacks.end(); ++it) { + meta->add_rollback_names(it->first); } } @@ -829,7 +866,7 @@ bool TabletManager::AddTable(const std::string& table_name, LOG(INFO) << table_name << " add snapshot " << meta.snapshot_list(i); } for (int32_t i = 0; i < meta.rollback_names_size(); ++i) { - (*table)->m_rollback_names.push_back(meta.rollback_names(i)); + (*table)->m_rollbacks[meta.rollback_names(i)] = false; LOG(INFO) << table_name << " add rollback " << meta.rollback_names(i); } (*table)->m_mutex.Unlock(); diff --git a/src/master/tablet_manager.h b/src/master/tablet_manager.h index 493cc13b5..6d8a4bfcb 100644 --- a/src/master/tablet_manager.h +++ b/src/master/tablet_manager.h @@ -106,8 +106,9 @@ class Tablet { int32_t AddSnapshot(uint64_t snapshot); void ListSnapshot(std::vector* snapshot); void DelSnapshot(int32_t id); - int32_t AddRollback(std::string name, uint64_t snapshot_id, uint64_t rollback_point); - int32_t UpdateRollback(std::string name, uint64_t snapshot_id, uint64_t rollback_point); + int32_t AddRollback(const std::string& name, const uint64_t snapshot_id, const uint64_t rollback_point); + int32_t DelRollback(const std::string& name); + int32_t UpdateRollback(const std::string& name, const uint64_t snapshot_id, const uint64_t rollback_point); void ListRollback(std::vector* rollbacks); // is belong to a table? @@ -176,11 +177,13 @@ class Table { int32_t AddSnapshot(uint64_t snapshot); int32_t DelSnapshot(uint64_t snapshot); void ListSnapshot(std::vector* snapshots); - int32_t AddRollback(std::string rollback_name); + int32_t AddRollback(const std::string& rollback_name); + int32_t DelRollback(const std::string& rollback_name); void ListRollback(std::vector* rollback_names); - int32_t GetRollbackSize() {return m_rollback_names.size() - 1;} - void GetRollbackStatus(std::string rollbaack_name, bool* exists, bool* done); - void GetRollbackTablets(const std::string& rollback_name, std::vector* tablet_list); + int32_t GetRollbackSize() {return m_rollbacks.size() - 1;} + void GetRollbackStatus(const std::string& rollback_name, bool* exists, bool* done); + void SetRollbackStatus(const std::string& rollback_name, bool done); + void GetRollbackTablets(std::vector > >* rollback_tablets); void AddDeleteTabletCount(); bool NeedDelete(); void ToMetaTableKeyValue(std::string* packed_key = NULL, @@ -199,7 +202,7 @@ class Table { std::string m_name; TableSchema m_schema; std::vector m_snapshot_list; - std::vector m_rollback_names; + std::map m_rollbacks; TableStatus m_status; uint32_t m_deleted_tablet_num; uint64_t m_max_tablet_no; diff --git a/src/proto/tabletnode_rpc.proto b/src/proto/tabletnode_rpc.proto index 53b0df546..5b90f495a 100644 --- a/src/proto/tabletnode_rpc.proto +++ b/src/proto/tabletnode_rpc.proto @@ -76,6 +76,7 @@ message LoadTabletRequest { message LoadTabletResponse { required StatusCode status = 1; required uint64 sequence_id = 2; + repeated Rollback rollbacks = 3; } message UnloadTabletRequest { diff --git a/src/tabletnode/remote_tabletnode.cc b/src/tabletnode/remote_tabletnode.cc index 5587bb76f..512f99997 100644 --- a/src/tabletnode/remote_tabletnode.cc +++ b/src/tabletnode/remote_tabletnode.cc @@ -320,7 +320,7 @@ void RemoteTabletNode::DoRollback(google::protobuf::RpcController* controller, google::protobuf::Closure* done) { uint64_t id = request->sequence_id(); LOG(INFO) << "accept RPC (Rollback) id: " << id; - m_tabletnode_impl->Rollback(request, response, done); + m_tabletnode_impl->GetRollback(request, response, done); LOG(INFO) << "finish RPC (Rollback) id: " << id; } diff --git a/src/tabletnode/tabletnode_impl.cc b/src/tabletnode/tabletnode_impl.cc index 6a26ac301..697bfe5ad 100644 --- a/src/tabletnode/tabletnode_impl.cc +++ b/src/tabletnode/tabletnode_impl.cc @@ -230,14 +230,8 @@ void TabletNodeImpl::LoadTablet(const LoadTabletRequest* request, snapshots[request->snapshots_id(i)] = request->snapshots_sequence(i); } - // to recover rollbacks + //////////// TODO /////////////////// std::map rollbacks; - int32_t num_of_rollbacks = request->rollbacks_size(); - for (int32_t i = 0; i < num_of_rollbacks; ++i) { - rollbacks[request->rollbacks(i).snapshot_id()] = request->rollbacks(i).rollback_point(); - VLOG(10) << "load tablet with rollback " << request->rollbacks(i).snapshot_id() - << "-" << request->rollbacks(i).rollback_point(); - } LOG(INFO) << "start load tablet, id: " << request->sequence_id() << ", table: " << request->tablet_name() @@ -283,6 +277,21 @@ void TabletNodeImpl::LoadTablet(const LoadTabletRequest* request, tablet_io->DecRef(); response->set_status(kTabletNodeOk); } + // recover rollbacks + int32_t num_of_rollbacks = request->rollbacks_size(); + for (int32_t i = 0; i < num_of_rollbacks; ++i) { + Rollback cur_rollback = request->rollbacks(i); + uint64_t rollback_point = tablet_io->GetRollback(cur_rollback.snapshot_id(), &status); + VLOG(10) << "recover rollback: " << cur_rollback.ShortDebugString(); + if (cur_rollback.rollback_point() == leveldb::kMaxSequenceNumber) { + Rollback rollback; + rollback.set_name(cur_rollback.name()); + rollback.set_snapshot_id(cur_rollback.snapshot_id()); + rollback.set_rollback_point(rollback_point); + response->add_rollbacks()->CopyFrom(rollback); + VLOG(10) << "new rollback: " << rollback.ShortDebugString(); + } + } LOG(INFO) << "load tablet: " << request->path() << " [" << DebugString(key_start) << ", " << DebugString(key_end) << "]"; @@ -594,7 +603,7 @@ void TabletNodeImpl::ReleaseSnapshot(const ReleaseSnapshotRequest* request, done->Run(); } -void TabletNodeImpl::Rollback(const SnapshotRollbackRequest* request, SnapshotRollbackResponse* response, +void TabletNodeImpl::GetRollback(const SnapshotRollbackRequest* request, SnapshotRollbackResponse* response, google::protobuf::Closure* done) { StatusCode status = kTabletNodeOk; io::TabletIO* tablet_io = m_tablet_manager->GetTablet(request->table_name(), @@ -610,7 +619,7 @@ void TabletNodeImpl::Rollback(const SnapshotRollbackRequest* request, SnapshotRo done->Run(); return; } - uint64_t rollback_point = tablet_io->Rollback(request->snapshot_id(), &status); + uint64_t rollback_point = tablet_io->GetRollback(request->snapshot_id(), &status); if (status != kTabletNodeOk) { response->set_status(status); } else { diff --git a/src/tabletnode/tabletnode_impl.h b/src/tabletnode/tabletnode_impl.h index 7e847c6a6..6e6ce61c4 100644 --- a/src/tabletnode/tabletnode_impl.h +++ b/src/tabletnode/tabletnode_impl.h @@ -14,6 +14,7 @@ #include "proto/master_rpc.pb.h" #include "proto/tabletnode.pb.h" #include "proto/tabletnode_rpc.pb.h" +#include "proto/table_meta.pb.h" #include "tabletnode/rpc_compactor.h" #include "tabletnode/tabletnode_sysinfo.h" #include "utils/rpc_timer_list.h" @@ -82,7 +83,7 @@ class TabletNodeImpl { ReleaseSnapshotResponse* response, google::protobuf::Closure* done); - void Rollback(const SnapshotRollbackRequest* request, SnapshotRollbackResponse* response, + void GetRollback(const SnapshotRollbackRequest* request, SnapshotRollbackResponse* response, google::protobuf::Closure* done); void Query(const QueryRequest* request, QueryResponse* response, From 1b2eddb0a0b102c558412ab8994aa0a6ab293b57 Mon Sep 17 00:00:00 2001 From: lylei Date: Wed, 4 Nov 2015 16:11:39 +0800 Subject: [PATCH 7/7] issue=#324 small fixes --- ft_test.sh | 4 ++-- src/master/master_impl.cc | 5 +++-- src/master/tablet_manager.cc | 4 ++-- src/master/tablet_manager.h | 4 ++-- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/ft_test.sh b/ft_test.sh index 5315430e3..93bbede3a 100644 --- a/ft_test.sh +++ b/ft_test.sh @@ -17,8 +17,8 @@ sh kill_tera.sh sh launch_tera.sh sleep 2 -export PYTHONPATH=$PYTHONPATH:../../thirdparty/include/; export PATH=$PATH:../../thirdparty/bin/ -nosetests -s -v testcase/test_rollback.py> ../log/test.log +export PYTHONPATH=$PYTHONPATH:../../thirdparty/include/; export PATH=$PATH:../../thirdparty/bin/ +nosetests -s -v > ../log/test.log sh kill_tera.sh diff --git a/src/master/master_impl.cc b/src/master/master_impl.cc index e1976faeb..befc69045 100644 --- a/src/master/master_impl.cc +++ b/src/master/master_impl.cc @@ -2544,8 +2544,10 @@ void MasterImpl::LoadRollbackCallback(TabletPtr tablet, int32_t retry_times, WriteClosure* done = NewClosure(this, &MasterImpl::LoadRollbackCallback, tablet, retry_times - 1, rpc_response); - SuspendMetaOperation(boost::bind(&Tablet::ToMetaTableKeyValue, tablet, _1, _2), false, done); + SuspendMetaOperation(boost::bind(&Tablet::ToMetaTableKeyValue, tablet, _1, _2), + false, done); } + delete rpc_response; return; } for (int32_t i = 0; i < rpc_response->rollbacks_size(); ++i) { @@ -3290,7 +3292,6 @@ void MasterImpl::AddRollbackCallback(TablePtr table, << StatusCodeToString(status) << ", " << tablets[0] << "..."; } if (retry_times <= 0) { - ///////// set rollback to default ///////////////// ////////// TODO /////////// rpc_response->set_status(kMetaTabletError); if (rpc_done) { diff --git a/src/master/tablet_manager.cc b/src/master/tablet_manager.cc index 58a295b4f..3a6f177bf 100644 --- a/src/master/tablet_manager.cc +++ b/src/master/tablet_manager.cc @@ -348,7 +348,7 @@ void Tablet::DelSnapshot(int32_t id) { snapshot_list->RemoveLast(); } -int32_t Tablet::AddRollback(const std::string& name, const uint64_t snapshot_id, const uint64_t rollback_point) { +int32_t Tablet::AddRollback(const std::string& name, uint64_t snapshot_id, uint64_t rollback_point) { MutexLock lock(&m_mutex); Rollback rollback; rollback.set_name(name); @@ -383,7 +383,7 @@ void Tablet::ListRollback(std::vector* rollbacks) { } } -int32_t Tablet::UpdateRollback(const std::string& name, const uint64_t snapshot_id, const uint64_t rollback_point) { +int32_t Tablet::UpdateRollback(const std::string& name, uint64_t snapshot_id, uint64_t rollback_point) { MutexLock lock(&m_mutex); bool has_rollback_name = false; for (int32_t i = 0; i < m_meta.rollbacks_size(); ++i) { diff --git a/src/master/tablet_manager.h b/src/master/tablet_manager.h index 6d8a4bfcb..3e8c3aa76 100644 --- a/src/master/tablet_manager.h +++ b/src/master/tablet_manager.h @@ -106,9 +106,9 @@ class Tablet { int32_t AddSnapshot(uint64_t snapshot); void ListSnapshot(std::vector* snapshot); void DelSnapshot(int32_t id); - int32_t AddRollback(const std::string& name, const uint64_t snapshot_id, const uint64_t rollback_point); + int32_t AddRollback(const std::string& name, uint64_t snapshot_id, uint64_t rollback_point); int32_t DelRollback(const std::string& name); - int32_t UpdateRollback(const std::string& name, const uint64_t snapshot_id, const uint64_t rollback_point); + int32_t UpdateRollback(const std::string& name, uint64_t snapshot_id, uint64_t rollback_point); void ListRollback(std::vector* rollbacks); // is belong to a table?