diff --git a/src/io/tablet_io.cc b/src/io/tablet_io.cc index d56aebd24..7ad73056b 100644 --- a/src/io/tablet_io.cc +++ b/src/io/tablet_io.cc @@ -134,6 +134,7 @@ bool TabletIO::Load(const TableSchema& schema, StatusCode* status) { { MutexLock lock(&m_mutex); + if (m_status == kReady && m_start_key == key_start && m_end_key == key_end) { return true; @@ -258,7 +259,14 @@ bool TabletIO::Load(const TableSchema& schema, m_status = kReady; m_db_ref_count--; } - + + // TODO: test + timeval mtime; + gettimeofday(&mtime, NULL); + VLOG(20) << __func__ << ", Avail, " << m_tablet_path + << ", " << DebugString(m_start_key) << ", " << DebugString(m_end_key) + << ", tv_sec " << mtime.tv_sec << ", tv_usec " << mtime.tv_usec; + LOG(INFO) << "[Load] Load " << m_tablet_path << " done"; return true; } @@ -266,7 +274,7 @@ bool TabletIO::Load(const TableSchema& schema, bool TabletIO::Unload(StatusCode* status) { { MutexLock lock(&m_mutex); - if (m_status != kReady && m_status != kSplited) { + if (m_status != kReady) { SetStatusCode(m_status, status); return false; } @@ -281,7 +289,14 @@ bool TabletIO::Unload(StatusCode* status) { MutexLock lock(&m_mutex); m_status = kUnLoading2; } - + + // TODO: test + timeval mtime; + gettimeofday(&mtime, NULL); + VLOG(20) << __func__ << ", NotAvail, " << m_tablet_path + << ", " << DebugString(m_start_key) << ", " << DebugString(m_end_key) + << ", tv_sec " << mtime.tv_sec << ", tv_usec " << mtime.tv_usec; + uint32_t retry = 0; while (m_db_ref_count > 1) { LOG(ERROR) << "tablet is busy, db ref: " << m_db_ref_count @@ -314,83 +329,62 @@ bool TabletIO::Unload(StatusCode* status) { { MutexLock lock(&m_mutex); - m_status = kNotInit; + m_status = kFrozen; m_db_ref_count--; } return true; } -bool TabletIO::Split(std::string* split_key, StatusCode* status) { +bool TabletIO::GetMidKey(std::string* mid_key, StatusCode* status) { { MutexLock lock(&m_mutex); - if (m_status != kReady) { + if (m_status != kReady && + m_status != kUnLoading) { SetStatusCode(m_status, status); return false; } - if (m_compact_status == kTableOnCompact) { - SetStatusCode(kTableNotSupport, status); - return false; - } - m_status = kOnSplit; m_db_ref_count++; } - int64_t table_size = GetDataSize(NULL, status); - if (table_size <= 0) { - SetStatusCode(kTableNotSupport, status); - MutexLock lock(&m_mutex); - m_status = kReady; - m_db_ref_count--; - return false; - } - + // find mid key std::string raw_split_key; if (!m_db->FindSplitKey(m_raw_start_key, m_raw_end_key, 0.5, &raw_split_key)) { - VLOG(5) << "fail to find split key"; - SetStatusCode(kTableNotSupport, status); MutexLock lock(&m_mutex); - m_status = kReady; + SetStatusCode(kTableNotSupport, status); m_db_ref_count--; return false; } - + + // parse key leveldb::Slice key_split; - if (m_kv_only && m_table_schema.raw_key() == Readable) { key_split = raw_split_key; - } else { // Table && TTL-KV + } else { leveldb::Slice cf_split; leveldb::Slice qu_split; if (!m_key_operator->ExtractTeraKey(raw_split_key, &key_split, &cf_split, &qu_split, NULL, NULL)) { - VLOG(5) << "fail to extract split key"; SetStatusCode(kTableNotSupport, status); MutexLock lock(&m_mutex); - m_status = kReady; m_db_ref_count--; return false; } } - VLOG(5) << "start: [" << DebugString(m_start_key) - << "], end: [" << DebugString(m_end_key) - << "], split: [" << DebugString(key_split.ToString()) << "]"; - - if (key_split.empty() || key_split.ToString() <= m_start_key - || (!m_end_key.empty() && key_split.ToString() >= m_end_key)) { + // sanity check key + if (key_split.empty() || key_split.ToString() <= m_start_key || + (!m_end_key.empty() && key_split.ToString() >= m_end_key)) { SetStatusCode(kTableNotSupport, status); MutexLock lock(&m_mutex); - m_status = kReady; m_db_ref_count--; return false; } - - *split_key = key_split.ToString(); - + + *mid_key = key_split.ToString(); { MutexLock lock(&m_mutex); - m_status = kSplited; + SetStatusCode(kTabletNodeOk, status); m_db_ref_count--; } return true; @@ -444,8 +438,8 @@ int64_t TabletIO::GetDataSize(const std::string& start_key, StatusCode* status) { { MutexLock lock(&m_mutex); - if (m_status != kReady && m_status != kOnSplit - && m_status != kSplited && m_status != kUnLoading) { + if (m_status != kReady && + m_status != kUnLoading) { SetStatusCode(m_status, status); return -1; } @@ -526,8 +520,8 @@ int64_t TabletIO::GetDataSize(std::vector* lgsize, StatusCode* status) { { MutexLock lock(&m_mutex); - if (m_status != kReady && m_status != kOnSplit - && m_status != kSplited && m_status != kUnLoading) { + if (m_status != kReady && + m_status != kUnLoading) { SetStatusCode(m_status, status); return -1; } @@ -932,9 +926,10 @@ bool TabletIO::ReadCells(const RowReaderInfo& row_reader, RowResult* value_list, uint64_t snapshot_id, StatusCode* status) { { MutexLock lock(&m_mutex); - if (m_status != kReady && m_status != kOnSplit - && m_status != kSplited && m_status != kUnLoading) { - if (m_status == kUnLoading2) { + if (m_status != kReady && + m_status != kUnLoading) { + if (m_status == kUnLoading2 || + m_status == kFrozen) { // keep compatable for old sdk protocol // we can remove this in the future. SetStatusCode(kUnLoading, status); @@ -1078,9 +1073,10 @@ bool TabletIO::Write(const WriteTabletRequest* request, StatusCode* status) { { MutexLock lock(&m_mutex); - if (m_status != kReady && m_status != kOnSplit - && m_status != kSplited && m_status != kUnLoading) { - if (m_status == kUnLoading2) { + if (m_status != kReady && + m_status != kUnLoading) { + if (m_status == kUnLoading2 || + m_status == kFrozen) { // keep compatable for old sdk protocol // we can remove this in the future. SetStatusCode(kUnLoading, status); @@ -1106,9 +1102,10 @@ bool TabletIO::ScanRows(const ScanTabletRequest* request, StatusCode status = kTabletNodeOk; { MutexLock lock(&m_mutex); - if (m_status != kReady && m_status != kOnSplit - && m_status != kSplited && m_status != kUnLoading) { - if (m_status == kUnLoading2) { + if (m_status != kReady && + m_status != kUnLoading) { + if (m_status == kUnLoading2 || + m_status == kFrozen) { // keep compatable for old sdk protocol // we can remove this in the future. SetStatusCode(kUnLoading, &status); @@ -1738,6 +1735,12 @@ const leveldb::RawKeyOperator* TabletIO::GetRawKeyOperator() { } void TabletIO::GetAndClearCounter(TabletCounter* counter, int64_t interval) { + std::string mid_key; + StatusCode status = kTabletNodeOk; + if (!GetMidKey(&mid_key, &status)) { + mid_key.clear(); + } + counter->set_low_read_cell(m_counter.low_read_cell.Clear() * 1000000 / interval); counter->set_scan_rows(m_counter.scan_rows.Clear() * 1000000 / interval); counter->set_scan_kvs(m_counter.scan_kvs.Clear() * 1000000 / interval); @@ -1749,6 +1752,8 @@ void TabletIO::GetAndClearCounter(TabletCounter* counter, int64_t interval) { counter->set_write_kvs(m_counter.write_kvs.Clear() * 1000000 / interval); counter->set_write_size(m_counter.write_size.Clear() * 1000000 / interval); counter->set_is_on_busy(IsBusy()); + + counter->set_mid_key(mid_key); } int32_t TabletIO::AddRef() { diff --git a/src/io/tablet_io.h b/src/io/tablet_io.h index 979ac959a..e6f000343 100644 --- a/src/io/tablet_io.h +++ b/src/io/tablet_io.h @@ -38,10 +38,9 @@ class TabletIO { kNotInit = kTabletNotInit, kReady = kTabletReady, kOnLoad = kTabletOnLoad, - kOnSplit = kTabletOnSplit, - kSplited = kTabletSplited, kUnLoading = kTabletUnLoading, - kUnLoading2 = kTabletUnLoading2 + kUnLoading2 = kTabletUnLoading2, + kFrozen = kTabletFrozen, }; typedef std::map< std::string, std::set > ColumnFamilyMap; struct ScanOptions { @@ -96,7 +95,9 @@ class TabletIO { leveldb::TableCache* table_cache = NULL, StatusCode* status = NULL); virtual bool Unload(StatusCode* status = NULL); - virtual bool Split(std::string* split_key, StatusCode* status = NULL); + + virtual bool GetMidKey(std::string* mid_key, StatusCode* status); + virtual bool Compact(StatusCode* status = NULL); bool CompactMinor(StatusCode* status = NULL); bool Destroy(StatusCode* status = NULL); diff --git a/src/leveldb/db/version_set.cc b/src/leveldb/db/version_set.cc index 2f9f6e0c5..ca1fb00a1 100644 --- a/src/leveldb/db/version_set.cc +++ b/src/leveldb/db/version_set.cc @@ -589,6 +589,9 @@ bool Version::FindSplitKey(const Slice* smallest_user_key, split_size += files_[step_level][now_pos[step_level]]->file_size; now_pos[step_level] ++; } + if (largest_file == NULL) { + return false; + } *split_key = largest_file->largest.user_key().ToString(); return true; } diff --git a/src/leveldb/util/env_posix.cc b/src/leveldb/util/env_posix.cc index 61036d177..46a051764 100644 --- a/src/leveldb/util/env_posix.cc +++ b/src/leveldb/util/env_posix.cc @@ -742,6 +742,7 @@ class PosixEnv : public Env { close(fd); locks_.Remove(fname); } else { + //fprintf(stderr, "%s, lock %s\n", __func__, fname.c_str()); PosixFileLock* my_lock = new PosixFileLock; my_lock->fd_ = fd; my_lock->name_ = fname; @@ -756,6 +757,7 @@ class PosixEnv : public Env { if (LockOrUnlock(my_lock->fd_, false) == -1) { result = IOError("unlock", errno); } + //fprintf(stderr, "%s, unlock %s\n", __func__, my_lock->name_.c_str()); locks_.Remove(my_lock->name_); close(my_lock->fd_); delete my_lock; diff --git a/src/master/master_impl.cc b/src/master/master_impl.cc index 0d06c587a..52dd9151d 100644 --- a/src/master/master_impl.cc +++ b/src/master/master_impl.cc @@ -49,6 +49,8 @@ DECLARE_int32(tera_master_meta_retry_times); DECLARE_bool(tera_zk_enabled); DECLARE_int64(tera_master_split_tablet_size); +DECLARE_int64(debug_tera_master_split_phase_crash); + DECLARE_int64(tera_master_merge_tablet_size); DECLARE_bool(tera_master_kick_tabletnode_enabled); DECLARE_int32(tera_master_kick_tabletnode_query_fail_times); @@ -144,6 +146,30 @@ MasterImpl::~MasterImpl() { delete m_stat_table; } +inline void MasterImpl::DebugTeraMasterCrashOrSuspend( + enum debug_tera_master_crash_func_set debug_func, int64_t phase) +{ +#ifndef DNDEBUG + LOG(INFO) << FLAGS_debug_tera_master_split_phase_crash << ", phase " << phase; + switch (debug_func) { + case DEBUG_master_split_crash_or_suspend: + { + if (FLAGS_debug_tera_master_split_phase_crash != 50 && + FLAGS_debug_tera_master_split_phase_crash != 51 && + FLAGS_debug_tera_master_split_phase_crash != 60 && + FLAGS_debug_tera_master_split_phase_crash != 61 && + FLAGS_debug_tera_master_split_phase_crash != 70 && + FLAGS_debug_tera_master_split_phase_crash != 71) { + break; + } + CHECK(FLAGS_debug_tera_master_split_phase_crash != phase); + break; + } + default:; + } +#endif +} + bool MasterImpl::Init() { if (FLAGS_tera_zk_enabled) { m_zk_adapter.reset(new MasterZkAdapter(this, m_local_addr)); @@ -154,7 +180,6 @@ bool MasterImpl::Init() { LOG(INFO) << "fake zk mode!"; m_zk_adapter.reset(new FakeMasterZkAdapter(this, m_local_addr)); } - SetMasterStatus(kIsSecondary); m_thread_pool->AddTask(boost::bind(&MasterImpl::InitAsync, this)); return true; @@ -320,6 +345,18 @@ bool MasterImpl::RestoreMetaTablet(const std::vector& tablet_list, } void MasterImpl::RestoreUserTablet(const std::vector& report_meta_list) { + // repair with split log + std::vector log_tablets; + m_tablet_manager->GetTabletWithOpLog(&log_tablets); + + std::vector::iterator log_it = log_tablets.begin(); + for (; log_it != log_tablets.end(); ++log_it) { + LOG(INFO) << __func__ << ", status " << StatusCodeToString((*log_it)->GetStatus()) + << ", path " << (*log_it)->GetPath(); + (*log_it)->SetStatusIf(kTableReady, kTableNotInit); + (*log_it)->SetStatusIf(kTableOnSplit, kTableReady); + } + std::vector::const_iterator meta_it = report_meta_list.begin(); for (; meta_it != report_meta_list.end(); ++meta_it) { const TabletMeta& meta = *meta_it; @@ -330,26 +367,40 @@ void MasterImpl::RestoreUserTablet(const std::vector& report_meta_li const std::string& server_addr = meta.server_addr(); int64_t size = meta.table_size(); CompactStatus compact_status = meta.compact_status(); - - TabletPtr tablet; - if (!m_tablet_manager->FindTablet(table_name, key_start, &tablet) - || !tablet->Verify(table_name, key_start, key_end, path, server_addr)) { - LOG(INFO) << "unload unexpected table: " << path << ", server: " - << server_addr; - TabletMeta unknown_meta = meta; - unknown_meta.set_status(kTableUnLoading); - TabletPtr unknown_tablet(new Tablet(unknown_meta)); - UnloadClosure* done = - NewClosure(this, &MasterImpl::UnloadTabletCallback, - unknown_tablet, FLAGS_tera_master_impl_retry_times); - UnloadTabletAsync(unknown_tablet, done); + + TabletNodePtr node; + bool node_valid = m_tabletnode_manager->FindTabletNode(server_addr, &node); + // split repair + if ( node_valid && + m_tablet_manager->RepairWithSplitLog(log_tablets, meta, node->m_uuid)) { + // nothing done here } else { - tablet->SetStatus(kTableReady); - tablet->SetSize(size); - tablet->SetCompactStatus(compact_status); + TabletPtr tablet; + if (!node_valid || !m_tablet_manager->FindTablet(table_name, key_start, &tablet) + || !tablet->Verify(table_name, key_start, key_end, path, server_addr)) { + LOG(INFO) << "unload unexpected table " << path << ", server: " << server_addr; + TabletMeta unknown_meta = meta; + unknown_meta.set_status(kTableUnLoading); + TabletPtr unknown_tablet(new Tablet(unknown_meta)); + UnloadClosure* done = + NewClosure(this, &MasterImpl::UnloadTabletCallback, + unknown_tablet, FLAGS_tera_master_impl_retry_times); + UnloadTabletAsync(unknown_tablet, done); + } else { + tablet->SetServerId(node->m_uuid); + tablet->SetStatus(kTableReady); + tablet->SetSize(size); + tablet->SetCompactStatus(compact_status); + } } } + log_it = log_tablets.begin(); + for (; log_it != log_tablets.end(); ++log_it) { + SplitTablet(*log_it, 2); + } + + // reload all offline tablet std::vector dead_node_tablet_list; std::vector all_tablet_list; m_tablet_manager->ShowTable(NULL, &all_tablet_list); @@ -357,7 +408,8 @@ void MasterImpl::RestoreUserTablet(const std::vector& report_meta_li for (it = all_tablet_list.begin(); it != all_tablet_list.end(); ++it) { TabletPtr tablet = *it; const std::string& server_addr = tablet->GetServerAddr(); - if (tablet->GetStatus() == kTableReady) { + if ((tablet->GetStatus() == kTableReady) || + (tablet->GetStatus() == kTableOnSplit)) { VLOG(8) << "READY Tablet, " << tablet; continue; } @@ -1163,6 +1215,7 @@ void MasterImpl::TabletCmdCtrl(const CmdCtrlRequest* request, if (tablet_id == t->GetPath()) { tablet = t; found = true; + break; } } if (!found) { @@ -1186,12 +1239,7 @@ void MasterImpl::TabletCmdCtrl(const CmdCtrlRequest* request, response->set_status(kInvalidArgument); return; } - std::string split_key; - if (request->arg_list_size() == 3) { - split_key = request->arg_list(2); - LOG(INFO) << "ignore user specified split key: not support"; - } - TrySplitTablet(tablet); + SplitTablet(tablet, 0); response->set_status(kMasterOk); } else { response->set_status(kInvalidArgument); @@ -1546,8 +1594,7 @@ bool MasterImpl::TabletNodeLoadBalance(TabletNodePtr tabletnode, Scheduler* sche // tablet size is error, skip it continue; } else if (tablet->GetDataSize() > (split_size << 20)) { - TrySplitTablet(tablet); - any_tablet_split = true; + any_tablet_split = SplitTablet(tablet, 0); continue; } else if (tablet->GetDataSize() < (merge_size << 20)) { TryMergeTablet(tablet); @@ -1841,6 +1888,8 @@ void MasterImpl::TabletNodeRecoveryCallback(std::string addr, if (tablet->GetStatus() == kTableOffLine) { LOG(INFO) << "try load, " << tablet; TryLoadTablet(tablet, addr); + } else { + RescheduleOnSplitWait(tablet); } } @@ -1879,18 +1928,14 @@ void MasterImpl::DeleteTabletNode(const std::string& tabletnode_addr) { ProcessOffLineTablet(tablet); } - if (tablet->SetStatusIf(kTableOnSplit, kTableSplitFail)) { - ScanClosure* done = - NewClosure(this, &MasterImpl::ScanMetaCallbackForSplit, tablet); - ScanMetaTableAsync(tablet->GetTableName(), tablet->GetKeyStart(), - tablet->GetKeyEnd(), done); - } - if (tablet->GetTableName() == FLAGS_tera_master_meta_table_name && tablet->GetStatus() == kTableOffLine) { LOG(INFO) << "try move meta tablet"; TryLoadTablet(tablet); } + + // reschedule tablet (on wait split state) to other ts + RescheduleOnSplitWait(tablet); } TryEnterSafeMode(); @@ -2011,27 +2056,19 @@ bool MasterImpl::LeaveSafeMode(StatusCode* status) { return true; } -void MasterImpl::LoadAllOffLineTablets() { +void MasterImpl::LoadAllDeadNodeTablets() { std::vector all_tablet_list; m_tablet_manager->ShowTable(NULL, &all_tablet_list); std::vector::iterator it; for (it = all_tablet_list.begin(); it != all_tablet_list.end(); ++it) { TabletPtr tablet = *it; - if (tablet->GetStatus() == kTableOffLine) { - LOG(INFO) << "try move, " << tablet; - TryLoadTablet(tablet); - } - } -} -void MasterImpl::LoadAllDeadNodeTablets() { - std::vector all_tablet_list; - m_tablet_manager->ShowTable(NULL, &all_tablet_list); + // reschudule wait split + if (RescheduleOnSplitWait(tablet)) { + continue; + } - std::vector::iterator it; - for (it = all_tablet_list.begin(); it != all_tablet_list.end(); ++it) { - TabletPtr tablet = *it; if (tablet->GetStatus() != kTableOffLine) { continue; } @@ -2350,8 +2387,7 @@ void MasterImpl::UnloadTabletCallback(TabletPtr tablet, int32_t retry, UnloadTabletResponse* response, bool failed, int error_code) { CHECK(tablet->GetStatus() == kTableUnLoading - || tablet->GetStatus() == kTableOnLoad - || tablet->GetStatus() == kTableOnSplit); + || tablet->GetStatus() == kTableOnLoad); StatusCode status = response->status(); delete request; delete response; @@ -2369,12 +2405,6 @@ void MasterImpl::UnloadTabletCallback(TabletPtr tablet, int32_t retry, } else if (tablet->SetAddrAndStatusIf("", kTableOffLine, kTableOnLoad)) { ProcessOffLineTablet(tablet); TryLoadTablet(tablet); - } else { - CHECK(tablet->GetStatus() == kTableOnSplit); - ScanClosure* done = - NewClosure(this, &MasterImpl::ScanMetaCallbackForSplit, tablet); - ScanMetaTableAsync(tablet->GetTableName(), tablet->GetKeyStart(), - tablet->GetKeyEnd(), done); } return; } @@ -2415,29 +2445,6 @@ void MasterImpl::UnloadTabletCallback(TabletPtr tablet, int32_t retry, } node->FinishLoad(next_tablet); } - } else { - CHECK(tablet->GetStatus() == kTableOnSplit); - // don't know split result, scan meta to determine the result - ScanClosure* done = - NewClosure(this, &MasterImpl::ScanMetaCallbackForSplit, tablet); - ScanMetaTableAsync(tablet->GetTableName(), tablet->GetKeyStart(), - tablet->GetKeyEnd(), done); - - // split next tablet - TabletNodePtr node; - if (m_tabletnode_manager->FindTabletNode(server_addr, &node) - && node->m_uuid == tablet->GetServerId()) { - node->FinishSplit(tablet); - TabletPtr next_tablet; - while (node->SplitNextWaitTablet(&next_tablet)) { - if (next_tablet->SetStatusIf(kTableOnSplit, kTableReady)) { - next_tablet->SetServerId(node->m_uuid); - SplitTabletAsync(next_tablet); - break; - } - node->FinishSplit(next_tablet); - } - } } return; } @@ -2455,8 +2462,7 @@ void MasterImpl::UnloadTabletCallback(TabletPtr tablet, int32_t retry, if (retry <= 0) { LOG(ERROR) << "abort UnloadTablet: kick tabletnode, " << tablet; tablet->SetStatusIf(kTableUnLoadFail, kTableUnLoading) - || tablet->SetStatusIf(kTableLoadFail, kTableOnLoad) - || tablet->SetStatusIf(kTableSplitFail, kTableOnSplit); + || tablet->SetStatusIf(kTableLoadFail, kTableOnLoad); TryKickTabletNode(tablet->GetServerAddr()); return; } @@ -3052,109 +3058,6 @@ void MasterImpl::RetryCollectTabletInfo(std::string addr, QueryTabletNodeAsync(addr, FLAGS_tera_master_collect_info_timeout, false, done); } -void MasterImpl::SplitTabletAsync(TabletPtr tablet) { - const std::string& table_name = tablet->GetTableName(); - const std::string& server_addr = tablet->GetServerAddr(); - const std::string& key_start = tablet->GetKeyStart(); - const std::string& key_end = tablet->GetKeyEnd(); - - tabletnode::TabletNodeClient node_client(server_addr, - FLAGS_tera_master_split_rpc_timeout); - - SplitTabletRequest* request = new SplitTabletRequest; - SplitTabletResponse* response = new SplitTabletResponse; - request->set_sequence_id(m_this_sequence_id.Inc()); - request->set_tablet_name(table_name); - request->mutable_key_range()->set_key_start(key_start); - request->mutable_key_range()->set_key_end(key_end); - request->add_child_tablets(tablet->GetTable()->GetNextTabletNo()); - request->add_child_tablets(tablet->GetTable()->GetNextTabletNo()); - - tablet->ToMeta(request->mutable_tablet_meta()); - std::vector snapshots; - tablet->GetTable()->ListSnapshot(&snapshots); - LOG(INFO) << "SplitTabletAsync snapshot num " << snapshots.size(); - Closure* done = - NewClosure(this, &MasterImpl::SplitTabletCallback, tablet); - - LOG(INFO) << "SplitTabletAsync id: " << request->sequence_id() << ", " - << tablet; - node_client.SplitTablet(request, response, done); -} - -void MasterImpl::SplitTabletCallback(TabletPtr tablet, - SplitTabletRequest* request, - SplitTabletResponse* response, - bool failed, int error_code) { - CHECK(tablet->GetStatus() == kTableOnSplit); - StatusCode status = response->status(); - delete request; - delete response; - const std::string& server_addr = tablet->GetServerAddr(); - - // fail - if (failed || (status != kTabletNodeOk && status != kTableNotSupport - && status != kMetaTabletError)) { - if (failed) { - LOG(WARNING) << "fail to split: " - << sofa::pbrpc::RpcErrorCodeToString(error_code) - << ", " << tablet; - } else { - LOG(WARNING) << "fail to split: " - << StatusCodeToString(status) << ", " << tablet; - } - UnloadClosure* done = - NewClosure(this, &MasterImpl::UnloadTabletCallback, tablet, - FLAGS_tera_master_impl_retry_times); - UnloadTabletAsync(tablet, done); - return; - } - - // success - if (status == kTabletNodeOk) { - // tabletnode unloaded the tablet - LOG(INFO) << "RPC SplitTablet success"; - } else if (status == kTableNotSupport) { - // tabletnode refused to split and didn't unload the tablet - tablet->SetStatusIf(kTableReady, kTableOnSplit); - ProcessReadyTablet(tablet); - LOG(ERROR) << "ts refused to split tablet: " - << StatusCodeToString(status) << ", " << tablet; - } else { - CHECK(status == kMetaTabletError); - // meta table is not ok - LOG(ERROR) << "fail to split: " << StatusCodeToString(status) << ", " - << tablet; - } - - TabletNodePtr node; - if (m_tabletnode_manager->FindTabletNode(server_addr, &node) - && node->m_uuid == tablet->GetServerId()) { - node->FinishSplit(tablet); - TabletPtr next_tablet; - while (node->SplitNextWaitTablet(&next_tablet)) { - if (next_tablet->SetStatusIf(kTableOnSplit, kTableReady)) { - next_tablet->SetServerId(node->m_uuid); - SplitTabletAsync(next_tablet); - break; - } - node->FinishSplit(next_tablet); - } - } - - if (status == kTableNotSupport) { - return; - } - - // scan meta tablet - if (tablet->GetStatus() == kTableOnSplit) { - ScanClosure* done = - NewClosure(this, &MasterImpl::ScanMetaCallbackForSplit, tablet); - ScanMetaTableAsync(tablet->GetTableName(), tablet->GetKeyStart(), - tablet->GetKeyEnd(), done); - } -} - void MasterImpl::TryLoadTablet(TabletPtr tablet, std::string server_addr) { if (!tablet->IsBound()) { return; @@ -3316,11 +3219,6 @@ void MasterImpl::RetryUnloadTablet(TabletPtr tablet, int32_t retry_times) { ProcessOffLineTablet(tablet); TryLoadTablet(tablet); } else { - CHECK(tablet->GetStatus() == kTableOnSplit); - ScanClosure* done = - NewClosure(this, &MasterImpl::ScanMetaCallbackForSplit, tablet); - ScanMetaTableAsync(tablet->GetTableName(), tablet->GetKeyStart(), - tablet->GetKeyEnd(), done); } return; } @@ -3330,35 +3228,432 @@ void MasterImpl::RetryUnloadTablet(TabletPtr tablet, int32_t retry_times) { UnloadTabletAsync(tablet, done); } -bool MasterImpl::TrySplitTablet(TabletPtr tablet) { - const std::string& server_addr = tablet->GetServerAddr(); +bool MasterImpl::SplitTablet(TabletPtr tablet, uint32_t phase) +{ + // split sanity check, race condition + if (phase == 0) { + const std::string server_addr = tablet->GetServerAddr(); + TabletOpLog log; - // abort if server down + // TS shutdown + TabletNodePtr node; + if (!m_tabletnode_manager->FindTabletNode(server_addr, &node)) { + LOG(WARNING) << "TS " << server_addr << ", down"; + return false; + } + + // collect tablet's split log context + if (!tablet->CollectSplitContext(&log)) { + VLOG(10) << " no need split"; + return false; + } + + // race condition + if (!tablet->SetStatusIf(kTableOnSplit, kTableReady)) { + LOG(WARNING) << "split race condition fail, status " + << StatusCodeToString(tablet->GetStatus()); + return false; + } + // race success + tablet->SetTabletOpLog(log); + VLOG(20) << __func__ << ": phase " << phase << ", " << tablet->GetPath() + << ", keystart " << tablet->GetKeyStart() + << ", keyend " << tablet->GetKeyEnd() << ", mid_key " << log.mid_key() + << ", lchild " << log.lchild_tablet() << ", rchild " << log.rchild_tablet(); + phase = 1; + } + + // write log phase + if (phase == 1) { + CHECK(tablet->GetStatus() == kTableOnSplit); + DebugTeraMasterCrashOrSuspend(DEBUG_master_split_crash_or_suspend, 50); + + std::string server_addr = tablet->GetServerAddr(); + std::string table_name = tablet->GetTableName(); + + VLOG(20) << tablet->GetPath() << ": split phase 1, tablet[addr " + << tablet->GetServerAddr() << ", uuid " << tablet->GetServerId() << "]"; + + // if TS server down, try on other node and rewrite tablet's addr. + // if TS restart, retry split on it. + TabletNodePtr node; + if (!m_tabletnode_manager->FindTabletNode(server_addr, &node)) { + server_addr.clear(); + while (server_addr.empty()) { + if (!m_tabletnode_manager->ScheduleTabletNode(m_size_scheduler.get(), + table_name, false, &server_addr)) { + LOG(WARNING) << "split reschule, no available ts: " << tablet; + tablet->SetStatusIf(kTableOnSplitWait, kTableOnSplit); + return false; + } + if (!server_addr.empty() && + !m_tabletnode_manager->FindTabletNode(server_addr, &node)) { + server_addr.clear(); + } + } + tablet->SetAddr(server_addr); + } + tablet->SetServerId(node->m_uuid); + + VLOG(20) << tablet->GetPath() << ": split phase 1, tablet[addr " + << tablet->GetServerAddr() << ", uuid " << tablet->GetServerId() << "]"; + + // now tablet's server_addr and m_uuid is valid + if (!node->TrySplit(tablet)) { + VLOG(20) << "split delay, ts split too much: " << tablet; + tablet->SetStatusIf(kTableOnSplitWait, kTableOnSplit); + return false; + } + + // use write meta rpc + TablePtr null_table; + std::vector tablets(1, tablet); + WriteClosure* done = + NewClosure(this, &MasterImpl::SplitTabletWriteLogCallback, + tablet, 0); + BatchWriteMetaTableAsync(null_table, tablets, false, done); + } + + // send split cmd to TS + if (phase == 2) { + CHECK(tablet->GetStatus() == kTableOnSplit); + DebugTeraMasterCrashOrSuspend(DEBUG_master_split_crash_or_suspend, 60); + + TabletOpLog log; + tablet->GetTabletOpLog(&log); + VLOG(20) << "phase 2, " << tablet->GetTableName() << ", keystart " << tablet->GetKeyStart() + << ", keyend " << tablet->GetKeyEnd() << ", mid_key " << log.mid_key() + << ", lchild " << log.lchild_tablet() << ", rchild " << log.rchild_tablet(); + + SplitClosure* done = + NewClosure(this, &MasterImpl::SplitTabletCallback, tablet, 0); + SplitTabletAsync(tablet, done); + } + + // clear log phase + if (phase == 3) { + DebugTeraMasterCrashOrSuspend(DEBUG_master_split_crash_or_suspend, 70); + + TabletOpLog log; + tablet->GetTabletOpLog(&log); + VLOG(20) << "phase 3, " << tablet->GetTableName() << ", keystart " << tablet->GetKeyStart() + << ", keyend " << tablet->GetKeyEnd() << ", mid_key " << log.mid_key() + << ", lchild " << log.lchild_tablet() << ", rchild " << log.rchild_tablet(); + + SplitTabletUpdateMetaAsync(tablet); + } + return true; +} + +// ts down or restart, reschedule OnsplitWait +bool MasterImpl::RescheduleOnSplitWait(TabletPtr tablet) +{ TabletNodePtr node; - if (!m_tabletnode_manager->FindTabletNode(server_addr, &node)) { - LOG(WARNING) << "abort split on " << server_addr << ": server down"; + if (tablet->GetStatus() != kTableOnSplitWait) { return false; } - - // delay split - if (!node->TrySplit(tablet)) { - LOG(INFO) << "delay split table " << tablet->GetPath() - << ", too many tablets are splitting on server: " << server_addr; + if (m_tabletnode_manager->FindTabletNode(tablet->GetServerAddr(), &node) && + (tablet->GetServerId() == node->m_uuid)) { return false; } - // abort if status switch to offline (server down / disable) - if (!tablet->SetStatusIf(kTableOnSplit, kTableReady)) { - LOG(ERROR) << "error state, abort split table " << tablet->GetPath(); + if (!tablet->SetStatusIf(kTableOnSplit, kTableOnSplitWait)) { return false; } - - // if server down here, let split callback take care of status switch - LOG(INFO) << "begin split table " << tablet->GetPath(); - tablet->SetServerId(node->m_uuid); - SplitTabletAsync(tablet); + SplitTablet(tablet, 1); return true; } +void MasterImpl::SplitTabletWriteLogCallback(TabletPtr tablet, + int32_t retry_times, + WriteTabletRequest *request, + WriteTabletResponse *response, + bool failed, int ErrCode) { + CHECK(tablet->GetStatus() == kTableOnSplit); + StatusCode status = response->status(); + if (!failed && status == kTabletNodeOk) { + CHECK_GT(response->row_status_list_size(), 0); + status = response->row_status_list(0); + } + delete request; + delete response; + + // handle fail, retry + if (failed || status != kTabletNodeOk) { + if (failed) { + LOG(ERROR) << " rpc ErrCode " + << sofa::pbrpc::RpcErrorCodeToString(ErrCode) << ", " + << tablet; + } else { + LOG(ERROR) << "meta TS ErrCode " + << StatusCodeToString(status) << ", " << tablet; + } + // retry unlimited + TablePtr null_table; + std::vector tablets(1, tablet); + WriteClosure *done = + NewClosure(this, &MasterImpl::SplitTabletWriteLogCallback, + tablet, retry_times + 1); + SuspendMetaOperation(null_table, tablets, false, done); + return ; + } + + // handle success + VLOG(20) << tablet->GetTableName() << ", write log success, keystart " << tablet->GetKeyStart() + << ", keyend " << tablet->GetKeyEnd(); + + DebugTeraMasterCrashOrSuspend(DEBUG_master_split_crash_or_suspend, 51); + SplitTablet(tablet, 2); +} + +// send split cmd to TS +void MasterImpl::SplitTabletAsync(TabletPtr tablet, SplitClosure *done) { + SplitTabletRequest* request = new SplitTabletRequest; + SplitTabletResponse* response = new SplitTabletResponse; + CHECK(request); + CHECK(response); + + TabletOpLog log; + CHECK(tablet->GetTabletOpLog(&log)); + + // construct request and response + request->set_sequence_id(m_this_sequence_id.Inc()); + request->set_table_name(tablet->GetTableName()); + request->mutable_key_range()->set_key_start(tablet->GetKeyStart()); + request->mutable_key_range()->set_key_end(tablet->GetKeyEnd()); + + request->set_parent_tablet(leveldb::GetTabletNumFromPath(tablet->GetPath())); + request->add_child_tablets(log.lchild_tablet()); + request->add_child_tablets(log.rchild_tablet()); + request->set_mid_key(log.mid_key()); + + //request->set_session_id(m_this_session_id.Inc()); + request->mutable_schema()->CopyFrom(tablet->GetSchema()); + std::vector snapshot_id; + std::vector snapshot_sequence; + tablet->GetTable()->ListSnapshot(&snapshot_id); + tablet->ListSnapshot(&snapshot_sequence); + CHECK(snapshot_id.size() == snapshot_sequence.size()); + for (uint32_t i = 0; i < snapshot_id.size(); i++) { + request->add_snapshots_id(snapshot_id[i]); + request->add_snapshots_sequence(snapshot_sequence[i]); + } + + response->set_status(kTabletSplitError); + + // send split rpc + tabletnode::TabletNodeClient node_client(tablet->GetServerAddr(), + FLAGS_tera_master_split_rpc_timeout); + node_client.SplitTablet(request, response, done); +} + +void MasterImpl::SplitTabletCallback(TabletPtr tablet, + int32_t retry_times, + SplitTabletRequest* request, + SplitTabletResponse* response, + bool failed, int error_code) { + CHECK(tablet->GetStatus() == kTableOnSplit); + StatusCode status = response->status(); + delete request; + delete response; + + TabletNodePtr node; + std::string server_addr = tablet->GetServerAddr(); + + if (failed || status != kTabletNodeOk) { + if (m_tabletnode_manager->FindTabletNode(server_addr, &node) && + (node->m_uuid == tablet->GetServerId())) { + // TS alive, delay split rpc re-send + boost::function closure = + boost::bind(&MasterImpl::DelayRetrySplitTabletAsync, this, tablet, retry_times); + m_thread_pool->DelayTask(FLAGS_tera_master_control_tabletnode_retry_period, closure); + return ; + } + + // schedule tablet on other TS + SplitTablet(tablet, 1); + return ; + } + + VLOG(20) << tablet->GetPath() << ", ServerAddr " << server_addr + << ", ServerId " << tablet->GetServerId(); + // ts split rpc success + if (m_tabletnode_manager->FindTabletNode(server_addr, &node) && + (node->m_uuid == tablet->GetServerId())) { + node->FinishSplit(tablet); + + // reschedule next wait split rpc on this node + TabletPtr next_tablet; + while (node->SplitNextWaitTablet(&next_tablet)) { + if (next_tablet->GetStatus() != kTableOnSplitWait) { + LOG(WARNING) << "tablet status Change, while wait split: " << next_tablet + << ", node addr " << node->GetAddr() << ", uuid " << node->GetId(); + } + if (next_tablet->SetStatusIf(kTableOnSplit, kTableOnSplitWait)) { + TablePtr null_table; + std::vector tablets(1, next_tablet); + WriteClosure* done = + NewClosure(this, &MasterImpl::SplitTabletWriteLogCallback, + next_tablet, 0); + BatchWriteMetaTableAsync(null_table, tablets, false, done); + break; + } + node->FinishSplit(next_tablet); + LOG(WARNING) << "next tablet resched when wait: " << next_tablet; + } + } + + TabletOpLog log; + tablet->GetTabletOpLog(&log); + VLOG(20) << tablet->GetTableName() + << ", keystart " << tablet->GetKeyStart() + << ", keyend " << tablet->GetKeyEnd() + << ", mid_key " << log.mid_key() + << ", lchild " << log.lchild_tablet() + << ", rchild " << log.rchild_tablet(); + + DebugTeraMasterCrashOrSuspend(DEBUG_master_split_crash_or_suspend, 61); + SplitTablet(tablet, 3); + return; +} + +void MasterImpl::DelayRetrySplitTabletAsync(TabletPtr delay_tablet, + int32_t delay_retry_times) +{ + LOG(INFO) << __func__ << ", " << delay_tablet->GetPath(); + SplitClosure* done = + NewClosure(this, &MasterImpl::SplitTabletCallback, + delay_tablet, delay_retry_times + 1); + SplitTabletAsync(delay_tablet, done); + + // when split fail too many, kick + if ((delay_retry_times + 1) > FLAGS_tera_master_impl_retry_times) { + TryKickTabletNode(delay_tablet->GetServerAddr()); + } + return; +} + +void MasterImpl::SplitTabletUpdateMetaAsync(TabletPtr tablet) { + // construct tablet meta + TabletOpLog log; + CHECK(tablet->GetTabletOpLog(&log)); + TabletMeta lchild_meta, rchild_meta, parent_meta; + + tablet->ToMeta(&parent_meta); + + tablet->ToMeta(&lchild_meta); + tablet->ToMeta(&rchild_meta); + + lchild_meta.mutable_key_range()->set_key_end(log.mid_key()); + rchild_meta.mutable_key_range()->set_key_start(log.mid_key()); + + lchild_meta.set_path(leveldb::GetChildTabletPath(parent_meta.path(), log.lchild_tablet())); + rchild_meta.set_path(leveldb::GetChildTabletPath(parent_meta.path(), log.rchild_tablet())); + + // set new tablet's state to ready + lchild_meta.set_status(kTableReady); + rchild_meta.set_status(kTableReady); + + lchild_meta.set_table_size(parent_meta.table_size() / 2); + rchild_meta.set_table_size(parent_meta.table_size() / 2); + + lchild_meta.clear_parent_tablets(); + rchild_meta.clear_parent_tablets(); + for (int i = 0; i < parent_meta.lg_size_size(); i++) { + lchild_meta.set_lg_size(i, parent_meta.lg_size(i) / 2); + rchild_meta.set_lg_size(i, parent_meta.lg_size(i) / 2); + } + lchild_meta.clear_log(); + rchild_meta.clear_log(); + + // clear log, add new tablet meta + TabletPtr lchild_tablet; + TabletPtr rchild_tablet; + lchild_tablet.reset(new Tablet(lchild_meta, tablet->GetTable())); + rchild_tablet.reset(new Tablet(rchild_meta, tablet->GetTable())); + + lchild_tablet->SetAddr(tablet->GetServerAddr()); + lchild_tablet->SetServerId(tablet->GetServerId()); + rchild_tablet->SetAddr(tablet->GetServerAddr()); + rchild_tablet->SetServerId(tablet->GetServerId()); + + std::vector tablets; + tablets.clear(); + //tablets.push_back(tablet); + tablets.push_back(lchild_tablet); + tablets.push_back(rchild_tablet); + + VLOG(20) << tablet->GetTableName() + << ", parent: keystart " << tablet->GetKeyStart() + << ", keyend " << tablet->GetKeyEnd() + << ", lchild: keystart " << tablets[0]->GetKeyStart() + << ", keyend " << tablets[0]->GetKeyEnd() + << ", rchild: keystart " << tablets[1]->GetKeyStart() + << ", keyend " << tablets[1]->GetKeyEnd(); + + TablePtr null_table; + // construct write meta request + WriteClosure* done = NewClosure(this, &MasterImpl::SplitTabletUpdateMetaCallback, + null_table, tablets, tablet, 0); + BatchWriteMetaTableAsync(null_table, tablets, false, done); +} + +void MasterImpl::SplitTabletUpdateMetaCallback(TablePtr null_table, + std::vector tablets, + TabletPtr tablet, // parent tablet + int32_t retry_times, + WriteTabletRequest* request, + WriteTabletResponse* response, + bool failed, int ErrCode) { + StatusCode status = response->status(); + if (!failed && status == kTabletNodeOk) { + CHECK_GT(response->row_status_list_size(), 0); + status = response->row_status_list(0); + } + delete request; + delete response; + + // no need care free lchild, rchild tablet + CHECK(tablets.size() == 2); + TabletPtr lchild_tablet = tablets[0]; + TabletPtr rchild_tablet = tablets[1]; + CHECK(tablet->GetStatus() == kTableOnSplit); + + // handle fail, retry + if (failed || status != kTabletNodeOk) { + if (failed) { + LOG(ERROR) << " rpc ErrCode " + << sofa::pbrpc::RpcErrorCodeToString(ErrCode) << ", " + << tablet; + } else { + LOG(ERROR) << "meta TS ErrCode " + << StatusCodeToString(status) << ", " << tablet; + } + // retry unlimited, suspend write meta request + WriteClosure *done = + NewClosure(this, &MasterImpl::SplitTabletUpdateMetaCallback, + null_table, tablets, tablet, retry_times + 1); + SuspendMetaOperation(null_table, tablets, false, done); + return; + } + + // handle success, enable new tablet + tablets.clear(); + VLOG(20) << "split " << tablet->GetTableName() << " success" + << ", parent: keystart " << tablet->GetKeyStart() + << ", keyend " << tablet->GetKeyEnd() + << ", lchild: keystart " << lchild_tablet->GetKeyStart() + << ", keyend " << lchild_tablet->GetKeyEnd() + << ", rchild: keystart " << rchild_tablet->GetKeyStart() + << ", keyend " << rchild_tablet->GetKeyEnd(); + + CHECK(m_tablet_manager->SplitTablet(tablet, lchild_tablet, rchild_tablet)); + + DebugTeraMasterCrashOrSuspend(DEBUG_master_split_crash_or_suspend, 71); + ProcessReadyTablet(lchild_tablet); + ProcessReadyTablet(rchild_tablet); + return; +} + bool MasterImpl::TryMergeTablet(TabletPtr tablet) { MutexLock lock(&m_tablet_mutex); const std::string& server_addr = tablet->GetServerAddr(); @@ -4105,121 +4400,6 @@ void MasterImpl::ScanMetaTableAsync(const std::string& table_name, meta_node_client.ScanTablet(request, response, done); } -void MasterImpl::ScanMetaCallbackForSplit(TabletPtr tablet, - ScanTabletRequest* request, - ScanTabletResponse* response, - bool failed, int error_code) { - CHECK(tablet->GetStatus() == kTableOnSplit); - delete request; - - if (failed || response->status() != kTabletNodeOk) { - if (failed) { - LOG(ERROR) << "fail to scan meta table: " - << sofa::pbrpc::RpcErrorCodeToString(error_code) << ", " << tablet; - } else { - LOG(ERROR) << "fail to scan meta table: " - << StatusCodeToString(response->status()) << ", " << tablet; - } - ScanClosure* done = - NewClosure(this, &MasterImpl::ScanMetaCallbackForSplit, tablet); - SuspendMetaOperation(tablet->GetTableName(), tablet->GetKeyStart(), - tablet->GetKeyEnd(), done); - delete response; - return; - } - - uint32_t record_size = response->results().key_values_size(); - VLOG(5) << "scan meta table: " << record_size << " records"; - if (record_size > 2 || record_size == 0) { - LOG(ERROR) << kSms << "split into " << record_size << " pieces, " - << tablet; - // TryKickTabletNode(tablet->GetServerAddr()); - WriteClosure* closure = - NewClosure(this, &MasterImpl::RepairMetaAfterSplitCallback, - tablet, response, FLAGS_tera_master_meta_retry_times); - RepairMetaTableAsync(tablet, response, closure); - return; - } - - std::string server_addr = tablet->GetServerAddr(); - const std::string& key_start = tablet->GetKeyStart(); - const std::string& key_end = tablet->GetKeyEnd(); - - const KeyValuePair& first_record = response->results().key_values(0); - TabletMeta first_meta; - ParseMetaTableKeyValue(first_record.key(), first_record.value(), - &first_meta); - const std::string& first_key_start = first_meta.key_range().key_start(); - const std::string& first_key_end = first_meta.key_range().key_end(); - - if (record_size == 1) { - if (tablet->Verify(first_key_start, first_key_end, first_meta.table_name(), - first_meta.path(), first_meta.server_addr())) { - LOG(WARNING) << "split not complete, " << tablet; - tablet->SetStatus(kTableOffLine); - ProcessOffLineTablet(tablet); - TryLoadTablet(tablet); - delete response; - } else { - LOG(ERROR) << kSms << "split into " << record_size << " pieces, " - << tablet; - WriteClosure* closure = - NewClosure(this, &MasterImpl::RepairMetaAfterSplitCallback, - tablet, response, FLAGS_tera_master_meta_retry_times); - RepairMetaTableAsync(tablet, response, closure); - } - return; - } - - const KeyValuePair& second_record = response->results().key_values(1); - TabletMeta second_meta; - ParseMetaTableKeyValue(second_record.key(), second_record.value(), - &second_meta); - const std::string& second_key_start = second_meta.key_range().key_start(); - const std::string& second_key_end = second_meta.key_range().key_end(); - - if (first_key_start != key_start || first_key_end != second_key_start - || second_key_end != key_end || key_start >= second_key_start - || (!key_end.empty() && key_end <= second_key_start) - || (key_end.empty() && second_key_start.empty())) { - LOG(ERROR) << kSms << "two splits are not successive, " << tablet; - // TryKickTabletNode(tablet->GetServerAddr()); - WriteClosure* closure = - NewClosure(this, &MasterImpl::RepairMetaAfterSplitCallback, - tablet, response, FLAGS_tera_master_meta_retry_times); - RepairMetaTableAsync(tablet, response, closure); - return; - } - TabletPtr first_tablet, second_tablet; - // update second child tablet meta - second_meta.set_status(kTableOffLine); - m_tablet_manager->AddTablet(second_meta, TableSchema(), &second_tablet); - for (int i = 0; i < second_meta.snapshot_list_size(); ++i) { - second_tablet->AddSnapshot(second_meta.snapshot_list(i)); - LOG(INFO) << "second_tablet add snapshot " << second_meta.snapshot_list(i); - } - - // delete old tablet - tablet->SetStatus(kTableDeleted); - m_tablet_manager->DeleteTablet(tablet->GetTableName(), tablet->GetKeyStart()); - - // update first child tablet meta - first_meta.set_status(kTableOffLine); - m_tablet_manager->AddTablet(first_meta, TableSchema(), &first_tablet); - for (int i = 0; i < first_meta.snapshot_list_size(); ++i) { - first_tablet->AddSnapshot(first_meta.snapshot_list(i)); - LOG(INFO) << "first_tablet add snapshot " << first_meta.snapshot_list(i); - } - - LOG(INFO) << "try load child tablets, \nfirst: " << first_meta.ShortDebugString() - << "\nsecond: " << second_meta.ShortDebugString(); - ProcessOffLineTablet(first_tablet); - TryLoadTablet(first_tablet, server_addr); - ProcessOffLineTablet(second_tablet); - TryLoadTablet(second_tablet, server_addr); - delete response; -} - void MasterImpl::RepairMetaTableAsync(TabletPtr tablet, ScanTabletResponse* scan_resp, WriteClosure* done) { @@ -4260,53 +4440,6 @@ void MasterImpl::RepairMetaTableAsync(TabletPtr tablet, meta_node_client.WriteTablet(request, response, done); } -void MasterImpl::RepairMetaAfterSplitCallback(TabletPtr tablet, - ScanTabletResponse* scan_resp, - int32_t retry_times, - WriteTabletRequest* request, - WriteTabletResponse* response, - bool failed, int error_code) { - CHECK(tablet->GetStatus() == kTableOnSplit); - StatusCode status = response->status(); - if (!failed && status == kTabletNodeOk) { - // all the row status should be the same - CHECK_GT(response->row_status_list_size(), 0); - status = response->row_status_list(0); - } - delete request; - delete response; - - if (failed || status != kTabletNodeOk) { - if (failed) { - LOG(ERROR) << "fail to repair meta table: " - << sofa::pbrpc::RpcErrorCodeToString(error_code) << ", " << tablet; - } else { - LOG(ERROR) << "fail to repair meta table: " - << StatusCodeToString(response->status()) << ", " << tablet; - } - if (retry_times <= 0) { - LOG(ERROR) << kSms << "abort repair meta, " << tablet; - delete scan_resp; - // we can still repair it at next split - tablet->SetStatusIf(kTableOffLine, kTableOnSplit); - ProcessOffLineTablet(tablet); - TryLoadTablet(tablet); - } else { - WriteClosure* done = - NewClosure(this, &MasterImpl::RepairMetaAfterSplitCallback, - tablet, scan_resp, retry_times - 1); - SuspendMetaOperation(tablet, scan_resp, done); - } - return; - } - LOG(INFO) << "repair meta success, " << tablet; - - delete scan_resp; - tablet->SetStatusIf(kTableOffLine, kTableOnSplit); - ProcessOffLineTablet(tablet); - TryLoadTablet(tablet); -} - void MasterImpl::SuspendMetaOperation(TablePtr table, const std::vector& tablets, bool is_delete, WriteClosure* done) { std::vector meta_entries; diff --git a/src/master/master_impl.h b/src/master/master_impl.h index 37f5ae48e..5db93057d 100644 --- a/src/master/master_impl.h +++ b/src/master/master_impl.h @@ -65,7 +65,14 @@ class MasterImpl { MasterImpl(); ~MasterImpl(); - + + // use for debug master's crash + enum debug_tera_master_crash_func_set { + DEBUG_master_split_crash_or_suspend, + }; + inline void DebugTeraMasterCrashOrSuspend( + enum debug_tera_master_crash_func_set debug_func, int64_t phase); + bool Init(); bool Restore(const std::map& tabletnode_list); @@ -199,10 +206,35 @@ class MasterImpl { void RetryLoadTablet(TabletPtr tablet, int32_t retry_times); void RetryUnloadTablet(TabletPtr tablet, int32_t retry_times); - bool TrySplitTablet(TabletPtr tablet); + + bool SplitTablet(TabletPtr tablet, uint32_t phase); + bool RescheduleOnSplitWait(TabletPtr tablet); + void SplitTabletWriteLogCallback(TabletPtr tablet, + int32_t retry_times, + WriteTabletRequest *request, + WriteTabletResponse *response, + bool failed, int ErrCode); + void SplitTabletAsync(TabletPtr tablet, SplitClosure *done); + void SplitTabletCallback(TabletPtr tablet, + int32_t retry_times, + SplitTabletRequest* request, + SplitTabletResponse* response, + bool failed, int error_code); + void DelayRetrySplitTabletAsync(TabletPtr delay_tablet, + int32_t delay_retry_times); + void SplitTabletUpdateMetaAsync(TabletPtr tablet); + void SplitTabletUpdateMetaCallback(TablePtr null_table, + std::vector tablets, + TabletPtr tablet, // parent tablet + int32_t retry_times, + WriteTabletRequest* request, + WriteTabletResponse* response, + bool failed, int ErrCode); + bool TryMergeTablet(TabletPtr tablet); void TryMoveTablet(TabletPtr tablet, const std::string& server_addr = ""); + void TryReleaseCache(bool enbaled_debug = false); void ReleaseCacheWrapper(); void EnableReleaseCacheTimer(); @@ -300,11 +332,6 @@ class MasterImpl { sem_t* finish_counter, Mutex* mutex); void RetryQueryNewTabletNode(std::string addr); - void SplitTabletAsync(TabletPtr tablet); - void SplitTabletCallback(TabletPtr tablet, SplitTabletRequest* request, - SplitTabletResponse* response, bool failed, - int error_code); - void MergeTabletAsync(TabletPtr tablet_p1, TabletPtr tablet_p2); void MergeTabletAsyncPhase2(TabletPtr tablet_p1, TabletPtr tablet_p2); void MergeTabletUnloadCallback(TabletPtr tablet, TabletPtr tablet2, Mutex* mutex, @@ -371,21 +398,11 @@ class MasterImpl { const std::string& tablet_key_start, const std::string& tablet_key_end, ScanClosure* done); - void ScanMetaCallbackForSplit(TabletPtr tablet, - ScanTabletRequest* request, - ScanTabletResponse* response, - bool failed, int error_code); - + void RepairMetaTableAsync(TabletPtr tablet, ScanTabletResponse* response, WriteClosure* done); - void RepairMetaAfterSplitCallback(TabletPtr tablet, - ScanTabletResponse* scan_resp, - int32_t retry_times, - WriteTabletRequest* request, - WriteTabletResponse* response, - bool failed, int error_code); - + // load metabale to master memory bool LoadMetaTable(const std::string& meta_tablet_addr, StatusCode* ret_status); @@ -413,7 +430,6 @@ class MasterImpl { void MoveOffLineTablets(const std::vector& tablet_list); double LiveNodeTabletRatio(); void LoadAllDeadNodeTablets(); - void LoadAllOffLineTablets(); void CollectAllTabletInfo(const std::map& tabletnode_list, std::vector* tablet_list); diff --git a/src/master/tablet_manager.cc b/src/master/tablet_manager.cc index 681ce5425..54ff81cb7 100644 --- a/src/master/tablet_manager.cc +++ b/src/master/tablet_manager.cc @@ -69,6 +69,56 @@ Tablet::~Tablet() { m_table.reset(); } +bool Tablet::CollectSplitContext(TabletOpLog* log) { + // MutexLock lock(&m_mutex); + m_mutex.Lock(); + + if (!m_counter_list.empty()) { + TabletCounter counter = m_counter_list.back(); + std::string mid_key; + std::string key_start = m_meta.key_range().key_start(); + std::string key_end = m_meta.key_range().key_end(); + + mid_key.clear(); + if (counter.has_mid_key()) { + mid_key = counter.mid_key(); + if ((mid_key <= key_start) || + (key_end.size() > 0 && mid_key >= key_end)) { + mid_key.clear(); + } + } + + if (mid_key.size()) { + m_mutex.Unlock(); + + log->set_type(kSplitLog); + log->set_mid_key(mid_key); + // will get table lock + log->set_lchild_tablet(this->GetTable()->GetNextTabletNo()); + log->set_rchild_tablet(this->GetTable()->GetNextTabletNo()); + return true; + } + } + m_mutex.Unlock(); + return false; +} + +void Tablet::SetTabletOpLog(TabletOpLog& log) +{ + MutexLock lock(&m_mutex); + m_meta.mutable_log()->CopyFrom(log); +} + +bool Tablet::GetTabletOpLog(TabletOpLog* log) +{ + MutexLock lock(&m_mutex); + if (!m_meta.has_log()) { + return false; + } + *log = m_meta.log(); + return log->type() != kNullLog; +} + void Tablet::ToMeta(TabletMeta* meta) { MutexLock lock(&m_mutex); meta->CopyFrom(m_meta); @@ -428,14 +478,14 @@ bool Tablet::CheckStatusSwitch(TabletStatus old_status, } break; case kTableOnSplit: - if (new_status == kTableReady // request rejected - || new_status == kTableOffLine // split fail - || new_status == kTableSplitFail) { // don't know result, wait tabletnode to be killed + if (new_status == kTableReady // request rejected + || new_status == kTableOffLine // split fail + || new_status == kTableOnSplitWait) { // split wait return true; } break; - case kTableSplitFail: - if (new_status == kTableOnSplit) { // tabletnode is killed, ready to scan meta + case kTableOnSplitWait: + if (new_status == kTableOnSplit) { // reschedule split return true; } break; @@ -715,6 +765,100 @@ void TabletManager::Init() { void TabletManager::Stop() { } +bool TabletManager::GetTabletWithOpLog(std::vector* log_tablets) +{ + m_mutex.Lock(); + TableList::iterator it = m_all_tables.begin(); + for (; it != m_all_tables.end(); ++it) { + Table& table = *it->second; + table.m_mutex.Lock(); + + Table::TabletList::iterator it2 = table.m_tablets_list.begin(); + for (; it2 != table.m_tablets_list.end(); ++it2) { + TabletPtr tablet = it2->second; + //tablet->m_mutex.Lock(); + + TabletOpLog log; + if (tablet->GetTabletOpLog(&log)) { + log_tablets->push_back(tablet); + } + + //tablet->m_mutex.Unlock(); + } + table.m_mutex.Unlock(); + } + m_mutex.Unlock(); + return true; +} + +bool TabletManager::RepairWithSplitLog(std::vector& log_tablets, + const TabletMeta& report_tabletmeta, + std::string& node_uuid) +{ + for (uint32_t i = 0; i < log_tablets.size(); i++) { + TabletPtr tablet = log_tablets[i]; + TabletOpLog log; + CHECK(tablet->GetTabletOpLog(&log)); + CHECK(log.type() == kSplitLog); + + + if ((tablet->GetTableName() == report_tabletmeta.table_name()) && + ((tablet->GetKeyStart() == report_tabletmeta.key_range().key_start()) || + (log.mid_key() == report_tabletmeta.key_range().key_start()) + )) { + tablet->SetAddr(report_tabletmeta.server_addr()); + tablet->SetServerId(node_uuid); + LOG(INFO) << __func__ << ", log tablet " << tablet->GetPath() + << ", addr " << tablet->GetServerAddr() + << ", uuid " << tablet->GetServerId(); + return true; + } + } + return false; +} + +bool TabletManager::SplitTablet(TabletPtr parent, + TabletPtr lchild_tablet, + TabletPtr rchild_tablet) +{ + std::string table_name = parent->GetTableName(); + m_mutex.Lock(); + + TableList::iterator it = m_all_tables.find(table_name); + CHECK(it != m_all_tables.end()); + Table& table = *it->second; + + std::string lchild_path = lchild_tablet->GetPath(); + std::string rchild_path = rchild_tablet->GetPath(); + + uint64_t lnum = leveldb::GetTabletNumFromPath(lchild_path); + uint64_t rnum = leveldb::GetTabletNumFromPath(rchild_path); + std::string lstart = lchild_tablet->GetKeyStart(); + std::string rstart = rchild_tablet->GetKeyStart(); + + table.m_mutex.Lock(); + if (table.m_max_tablet_no < lnum || + table.m_max_tablet_no < rnum) { + table.m_max_tablet_no = lnum > rnum ? lnum : rnum; + } + + Table::TabletList::iterator tablet_it = table.m_tablets_list.find(lstart); + CHECK(tablet_it != table.m_tablets_list.end()); + table.m_tablets_list.erase(tablet_it); + + tablet_it = table.m_tablets_list.find(rstart); + CHECK(tablet_it == table.m_tablets_list.end()); + + // lchild and rchild's state is ready + table.m_tablets_list[lstart] = lchild_tablet; + table.m_tablets_list[rstart] = rchild_tablet; + table.m_mutex.Unlock(); + + m_mutex.Unlock(); + + return true; +} + bool TabletManager::AddTable(const std::string& table_name, const TableMeta& meta, TablePtr* table, StatusCode* ret_status) { diff --git a/src/master/tablet_manager.h b/src/master/tablet_manager.h index 941566a02..de95eff4c 100644 --- a/src/master/tablet_manager.h +++ b/src/master/tablet_manager.h @@ -61,6 +61,10 @@ class Tablet { explicit Tablet(const TabletMeta& meta); Tablet(const TabletMeta& meta, TablePtr table); ~Tablet(); + + bool CollectSplitContext(TabletOpLog* log); + void SetTabletOpLog(TabletOpLog& log); + bool GetTabletOpLog(TabletOpLog* log); void ToMeta(TabletMeta* meta); const std::string& GetTableName(); @@ -206,6 +210,17 @@ class TabletManager { void Init(); void Stop(); + + bool GetTabletWithOpLog(std::vector* log_tablets); + + bool RepairWithSplitLog(std::vector& log_tablets, + const TabletMeta& report_tabletmeta, + std::string& node_uuid); + + bool SplitTablet(TabletPtr parent, TabletPtr lchild_tablet, + TabletPtr rchild_tablet); + + bool LoadMetaTable(const std::string& addr, StatusCode* ret_status = NULL); bool DumpMetaTable(const std::string& addr, StatusCode* ret_status = NULL); bool ClearMetaTable(const std::string& addr, StatusCode* ret_status = NULL); diff --git a/src/master/tabletnode_manager.cc b/src/master/tabletnode_manager.cc index 71b6f7491..bda114908 100644 --- a/src/master/tabletnode_manager.cc +++ b/src/master/tabletnode_manager.cc @@ -178,17 +178,23 @@ bool TabletNode::LoadNextWaitTablet(TabletPtr* tablet) { return true; } +// @TrySplit: if tablet need split, add and only add +// once into waitqueue. bool TabletNode::TrySplit(TabletPtr tablet) { MutexLock lock(&m_mutex); - m_data_size -= tablet->GetDataSize(); -// VLOG(5) << "split on: " << m_addr << ", size: " << tablet->GetDataSize() -// << ", total size: " << m_data_size; + + VLOG(20) << __func__ << ": addr " << m_addr << ", uuid " << m_uuid + << ", sizeof split wait queue " << m_wait_split_list.size() + << ", split counter " << m_onsplit_count; if (m_wait_split_list.empty() && m_onsplit_count < static_cast(FLAGS_tera_master_max_split_concurrency)) { ++m_onsplit_count; return true; } - m_wait_split_list.push_back(tablet); + if (std::find(m_wait_split_list.begin(), m_wait_split_list.end(), tablet) == + m_wait_split_list.end()) { + m_wait_split_list.push_back(tablet); + } return false; } @@ -198,6 +204,8 @@ bool TabletNode::FinishSplit(TabletPtr tablet) { return true; } +// @SplitNextWaitTablet: pop first waiting tablet, +// and add onsplit counter. bool TabletNode::SplitNextWaitTablet(TabletPtr* tablet) { MutexLock lock(&m_mutex); if (m_onsplit_count >= static_cast(FLAGS_tera_master_max_split_concurrency)) { diff --git a/src/master/tabletnode_manager.h b/src/master/tabletnode_manager.h index d57988980..15a19d49c 100644 --- a/src/master/tabletnode_manager.h +++ b/src/master/tabletnode_manager.h @@ -140,7 +140,6 @@ class TabletNodeManager { const std::vector& tablet_candidates, size_t* tablet_index); bool CheckStateSwitch(NodeState old_state, NodeState new_state); - private: mutable Mutex m_mutex; MasterImpl* m_master_impl; diff --git a/src/proto/proto_helper.cc b/src/proto/proto_helper.cc index 54eac8bbe..9db2d6041 100644 --- a/src/proto/proto_helper.cc +++ b/src/proto/proto_helper.cc @@ -151,12 +151,10 @@ std::string StatusCodeToString(int32_t status) { return "kTableOnLoad"; case kTableLoadFail: return "kTableLoadFail"; - case kTableWaitSplit: - return "kTableWaitSplit"; case kTableOnSplit: return "kTableOnSplit"; - case kTableSplitFail: - return "kTableSplitFail"; + case kTableOnSplitWait: + return "kTableOnSplitWait"; case kTableUnLoading: return "kTableUnLoading"; case kTableUnLoadFail: diff --git a/src/proto/status_code.proto b/src/proto/status_code.proto index cf1168cc9..f4292db77 100644 --- a/src/proto/status_code.proto +++ b/src/proto/status_code.proto @@ -34,7 +34,7 @@ enum StatusCode { kTabletOnMerge = 50; kTabletUnLoading = 52; kTabletUnLoading2 = 68; - + kTableNotFound = 45; kTableCorrupt = 46; kTableNotSupport = 47; @@ -53,6 +53,7 @@ enum StatusCode { kTableNotExist = 202; kTableIsBusy = 203; kTableMergeError = 204; + kTabletSplitError = 205; // register kInvalidSequenceId = 304; @@ -112,10 +113,9 @@ enum TabletStatus { kTableWaitLoad = 58; kTableOnLoad = 43; kTableLoadFail = 60; - kTableWaitSplit = 59; kTableOnSplit = 44; kTableSplited = 51; - kTableSplitFail = 61; + kTableOnSplitWait = 59; kTableUnLoading = 52; kTableUnLoading2 = 68; kTableUnLoadFail = 62; @@ -125,6 +125,7 @@ enum TabletStatus { kTabletPending = 65; kTabletOnSnapshot = 66; kTabletDelSnapshot = 67; + kTabletFrozen = 91; } enum TableStatus { diff --git a/src/proto/table_meta.proto b/src/proto/table_meta.proto index a119a6bb2..bc450a468 100644 --- a/src/proto/table_meta.proto +++ b/src/proto/table_meta.proto @@ -55,6 +55,7 @@ message TabletCounter { optional uint32 write_rows = 8; optional uint32 write_kvs = 9; optional uint32 write_size = 10; + optional bytes mid_key = 11; optional bool is_on_busy = 15 [default = false]; } @@ -80,6 +81,23 @@ message TabletMeta { repeated uint64 snapshot_list = 11; repeated uint64 parent_tablets = 12; repeated int64 lg_size = 13; + optional TabletOpLog log = 14; // till now, only use for split +} + +enum LogType { + option allow_alias = true; + kNullLog = 1; + kSplitLog = 10; +} + +message TabletOpLog { + optional LogType type = 1 [default = kNullLog]; + + // for split + optional bytes mid_key = 2; + //optional uint64 mid_size = 3; + optional uint64 lchild_tablet = 4; + optional uint64 rchild_tablet = 5; } message TableMetaList { diff --git a/src/proto/tabletnode_rpc.proto b/src/proto/tabletnode_rpc.proto index dfdb06f6e..60ce60d1a 100644 --- a/src/proto/tabletnode_rpc.proto +++ b/src/proto/tabletnode_rpc.proto @@ -255,16 +255,24 @@ message ReadTabletResponse { optional BytesList detail = 4; } +// new split message message SplitTabletRequest { required uint64 sequence_id = 1; - required string tablet_name = 2; - required KeyRange key_range = 3; - optional TabletMeta tablet_meta = 4; - repeated uint64 child_tablets = 5; + required string table_name = 2; // 表名 + required KeyRange key_range = 3; //keyrange + + optional uint64 parent_tablet = 4; + repeated uint64 child_tablets = 5;// 新的tablet号:分裂2个 + required bytes mid_key = 6;//用于分裂的key + + optional string session_id = 7; //uuid,zk产生 + optional TableSchema schema = 8;//用于terakey的解析 + repeated uint64 snapshots_id = 9; + repeated uint64 snapshots_sequence = 10; } message SplitTabletResponse { - required StatusCode status = 1 [default = kTableMergeError]; + required StatusCode status = 1; required uint64 sequence_id = 2; } diff --git a/src/tabletnode/remote_tabletnode.h b/src/tabletnode/remote_tabletnode.h index e0dbb2f98..53019be0b 100644 --- a/src/tabletnode/remote_tabletnode.h +++ b/src/tabletnode/remote_tabletnode.h @@ -120,7 +120,7 @@ class RemoteTabletNode : public TabletNodeServer { const SplitTabletRequest* request, SplitTabletResponse* response, google::protobuf::Closure* done); - + void DoMergeTablet(google::protobuf::RpcController* controller, const MergeTabletRequest* request, MergeTabletResponse* response, diff --git a/src/tabletnode/tablet_manager.cc b/src/tabletnode/tablet_manager.cc index 5330a0ff5..5258c180b 100644 --- a/src/tabletnode/tablet_manager.cc +++ b/src/tabletnode/tablet_manager.cc @@ -22,6 +22,88 @@ TabletManager::TabletManager() {} TabletManager::~TabletManager() {} +// @TestSplitTablet: test whether split finish +// +// @return: return true if split success +bool TabletManager::TestSplitTablet(const std::string& table_name, + const std::string& start_key, + const std::string& mid_key, + const std::string& end_key) { + TabletRange parent(table_name, start_key, end_key); + TabletRange lchild(table_name, start_key, mid_key); + TabletRange rchild(table_name, mid_key, end_key); + + MutexLock lock(&m_mutex); + + std::map::iterator it; + it = m_tablet_list.find(rchild); + if (it != m_tablet_list.end()) { + // get right child + LOG(INFO) << __func__ << ", step into phase 2, " << table_name << ", start " + << start_key << ", mid " << mid_key << ", end " << end_key; + it = m_tablet_list.find(lchild); + CHECK(it != m_tablet_list.end()); + return true; + } + VLOG(20) << __func__ << ", split not finish, " << table_name << ", start " + << start_key << ", mid " << mid_key << ", end " << end_key; + return false; +} + +// @SplitTabletIO: delete parent tabletio, add left && right child +bool TabletManager::SplitTabletIO(const std::string& table_name, + const std::string& key_start, + const std::string& mid_key, + const std::string& key_end, + io::TabletIO *parent_tabletIO, + io::TabletIO *left_tabletIO, + io::TabletIO *right_tabletIO) +{ + TabletRange parent(table_name, key_start, key_end); + TabletRange lchild(table_name, key_start, mid_key); + TabletRange rchild(table_name, mid_key, key_end); + + MutexLock lock(&m_mutex); + std::map::iterator it; + it = m_tablet_list.find(parent); + if (it != m_tablet_list.end()) { + m_tablet_list.erase(it); + parent_tabletIO->DecRef(); + + LOG(INFO) << __func__ << ": " << table_name << ", start " << key_start + << ", mid_key " << mid_key << ", end " << key_end; + + m_tablet_list.insert(std::pair(lchild, left_tabletIO)); + m_tablet_list.insert(std::pair(rchild, right_tabletIO)); + left_tabletIO->AddRef(); + right_tabletIO->AddRef(); + return true; + } + return false; +} + +// @AddTablet: add tabletIO into tabletmanager +bool TabletManager::AddTablet(const std::string& table_name, + const std::string& key_start, + const std::string& key_end, + io::TabletIO *tablet_io) +{ + MutexLock lock(&m_mutex); + + TabletRange range(table_name, key_start, key_end); + std::map::iterator it = + m_tablet_list.find(range); + if (it != m_tablet_list.end()) { + LOG(INFO) << "tablet exist: " << table_name << ", " << key_start + << ", old endkey " << it->first.key_end << ", new endkey " + << key_end; + return false; + } + m_tablet_list.insert(std::pair(range, tablet_io)); + tablet_io->AddRef(); + return true; +} + bool TabletManager::AddTablet(const std::string& table_name, const std::string& table_path, const std::string& key_start, diff --git a/src/tabletnode/tablet_manager.h b/src/tabletnode/tablet_manager.h index 5f21fbef7..e35d1ac77 100644 --- a/src/tabletnode/tablet_manager.h +++ b/src/tabletnode/tablet_manager.h @@ -48,6 +48,24 @@ class TabletManager { TabletManager(); virtual ~TabletManager(); + virtual bool TestSplitTablet(const std::string& table_name, + const std::string& start_key, + const std::string& mid_key, + const std::string& end_key); + + virtual bool SplitTabletIO(const std::string& table_name, + const std::string& key_start, + const std::string& mid_key, + const std::string& key_end, + io::TabletIO *parent_tabletIO, + io::TabletIO *left_tabletIO, + io::TabletIO *right_tabletIO); + + virtual bool AddTablet(const std::string& table_name, + const std::string& key_start, + const std::string& key_end, + io::TabletIO *tablet_io); + virtual bool AddTablet(const std::string& table_name, const std::string& table_path, const std::string& key_start, const std::string& key_end, io::TabletIO** tablet_io, StatusCode* status = NULL); diff --git a/src/tabletnode/tabletnode_impl.cc b/src/tabletnode/tabletnode_impl.cc index f5853d0ed..3f143ed73 100644 --- a/src/tabletnode/tabletnode_impl.cc +++ b/src/tabletnode/tabletnode_impl.cc @@ -33,6 +33,9 @@ #include "utils/string_util.h" #include "utils/timer.h" #include "utils/utils_cmd.h" +#include "leveldb/util/string_ext.h" + +DECLARE_int64(debug_tera_ts_split_phase_crash); DECLARE_string(tera_tabletnode_port); DECLARE_int64(tera_heartbeat_period); @@ -140,6 +143,31 @@ TabletNodeImpl::~TabletNodeImpl() { } } +inline void TabletNodeImpl::DebugTeraTabletServerCrashOrSuspend( + enum debug_tera_ts_crash_func_set debug_func, int64_t phase) +{ +#ifndef DNDEBUG + LOG(INFO) << FLAGS_debug_tera_ts_split_phase_crash << ", phase " << phase; + switch (debug_func) { + case DEBUG_ts_split_crash_or_suspend: + { + if (FLAGS_debug_tera_ts_split_phase_crash != 20 && + FLAGS_debug_tera_ts_split_phase_crash != 25 && + FLAGS_debug_tera_ts_split_phase_crash != 30 && + FLAGS_debug_tera_ts_split_phase_crash != 35 && + FLAGS_debug_tera_ts_split_phase_crash != 40 && + FLAGS_debug_tera_ts_split_phase_crash != 45 && + FLAGS_debug_tera_ts_split_phase_crash != 50) { + break; + } + CHECK(FLAGS_debug_tera_ts_split_phase_crash != phase); + break; + } + default:; + } +#endif +} + bool TabletNodeImpl::Init() { if (FLAGS_tera_zk_enabled) { m_zk_adapter.reset(new TabletNodeZkAdapter(this, m_local_addr)); @@ -651,76 +679,262 @@ void TabletNodeImpl::SplitTablet(const SplitTabletRequest* request, SplitTabletResponse* response, google::protobuf::Closure* done) { response->set_sequence_id(request->sequence_id()); - - std::string split_key, path; + + int stage = 1; StatusCode status = kTabletNodeOk; - io::TabletIO* tablet_io = m_tablet_manager->GetTablet(request->tablet_name(), - request->key_range().key_start(), - request->key_range().key_end(), - &status); - if (tablet_io == NULL) { - LOG(WARNING) << "split fail to get tablet: " << request->tablet_name() - << " [" << DebugString(request->key_range().key_start()) - << ", " << DebugString(request->key_range().key_end()) - << "], status: " << StatusCodeToString(status); - response->set_status(kKeyNotInRange); + io::TabletIO *parent_tabletio = NULL; + + leveldb::Env *env = io::LeveldbBaseEnv(); + std::string parent_tabletname = leveldb::GetTabletPathFromNum(request->table_name(), + request->parent_tablet()); + std::string left_tabletname = leveldb::GetChildTabletPath(parent_tabletname, + request->child_tablets(0)); + std::string right_tabletname = leveldb::GetChildTabletPath(parent_tabletname, + request->child_tablets(1)); + + VLOG(20) << __func__ << ": parent " << parent_tabletname << ", lchild " + << left_tabletname << ", rchild " << right_tabletname << ", start " + << request->key_range().key_start() << ", end " + << request->key_range().key_end(); + + if (m_tablet_manager->TestSplitTablet(request->table_name(), + request->key_range().key_start(), + request->mid_key(), + request->key_range().key_end())) { + // split finish; + response->set_status(kTabletNodeOk); done->Run(); - return; + return ; + } + + // snapshot relatively + CHECK(request->snapshots_id_size() == request->snapshots_sequence_size()); + std::map snapshots; + for (int i = 0; i < request->snapshots_id_size(); i++) { + snapshots[request->snapshots_id(i)] = request->snapshots_sequence(i); + } + + parent_tabletio = m_tablet_manager->GetTablet(request->table_name(), + request->key_range().key_start(), + request->key_range().key_end(), + &status); + if (parent_tabletio == NULL) { + LOG(INFO) << __func__ << ", " << parent_tabletname + << " not found, start " << request->key_range().key_start() + << ", end " << request->key_range().key_end(); } - if (!tablet_io->Split(&split_key, &status)) { - LOG(ERROR) << "fail to split tablet: " << tablet_io->GetTablePath() - << " [" << DebugString(tablet_io->GetStartKey()) - << ", " << DebugString(tablet_io->GetEndKey()) - << "], status: " << StatusCodeToString(status); - if (status == kTableNotSupport) { - response->set_status(kTableNotSupport); - } else { - response->set_status((StatusCode)tablet_io->GetStatus()); + CHECK(request->child_tablets_size() == 2); + // if parent tablet not in TM + if (parent_tabletio == NULL) { + // test ldb's current file is exit + // 前缀+表名+tablet000+lg[]+current + parent_tabletio = new io::TabletIO(); + CHECK(parent_tabletio != NULL); + + int lg_num = request->schema().locality_groups_size(); + for (int i = 0; i < lg_num; i++) { + // discard delete lg + if (request->schema().locality_groups(i).is_del()) { + LOG(INFO) << "LG[" << i << "] is del"; + continue; + } + std::string left_lgname = FLAGS_tera_tabletnode_path_prefix + "/" + + left_tabletname + "/" + leveldb::Uint64ToString(i); + std::string right_lgname = FLAGS_tera_tabletnode_path_prefix + "/" + + right_tabletname + "/" + leveldb::Uint64ToString(i); + + LOG(INFO) << __func__ << ", left lgname " << left_lgname + << ", right lgname " << right_lgname; + + DebugTeraTabletServerCrashOrSuspend(DEBUG_ts_split_crash_or_suspend, 20); + + if (env->FileExists(leveldb::CurrentFileName(left_lgname))) { + // step into split stage 2 + parent_tabletio->SetStatus(io::TabletIO::kFrozen); + stage = 2; + break; + } + if (env->FileExists(leveldb::CurrentFileName(right_lgname))) { + // step into split stage 2 + parent_tabletio->SetStatus(io::TabletIO::kFrozen); + stage = 2; + break; + } + } + + if (stage == 1) { + parent_tabletio->SetStatus(io::TabletIO::kNotInit); + } + + // add into tabletmanager and refcount++ + if (!m_tablet_manager->AddTablet(request->table_name(), + request->key_range().key_start(), + request->key_range().key_end(), + parent_tabletio)) { + // free parent's mem + parent_tabletio->DecRef(); + parent_tabletio = NULL; + // TODO: race condition !! + LOG(WARNING) << "split condition, " << request->table_name() << ", start " + << request->key_range().key_start() << ", end " + << request->key_range().key_end() << ", split stage " << stage; + response->set_status(kTabletSplitError); + done->Run(); + return ; + } + + DebugTeraTabletServerCrashOrSuspend(DEBUG_ts_split_crash_or_suspend, 25); + // this thread new parent_tabletio + std::vector null_parent_tablets; + VLOG(20) << __func__ << ", load parent " << parent_tabletname + << ", start " << request->key_range().key_start() + << ", end " << request->key_range().key_end(); + + if (!parent_tabletio->Load(request->schema(), request->key_range().key_start(), + request->key_range().key_end(), parent_tabletname, + null_parent_tablets, snapshots, m_ldb_logger, + m_ldb_block_cache, m_ldb_table_cache, &status)) { + parent_tabletio->DecRef(); + m_tablet_manager->RemoveTablet(request->table_name(), + request->key_range().key_start(), + request->key_range().key_end(), + &status); + response->set_status(kTabletSplitError); + done->Run(); + return ; } - tablet_io->DecRef(); - done->Run(); - return; } - int64_t first_half_size = - tablet_io->GetDataSize(request->key_range().key_start(), split_key); - int64_t second_half_size = - tablet_io->GetDataSize(split_key, request->key_range().key_end()); - LOG(INFO) << "split tablet: " << tablet_io->GetTablePath() - << " [" << DebugString(tablet_io->GetStartKey()) - << ", " << DebugString(tablet_io->GetEndKey()) - << "], split key: " << DebugString(split_key); - if (!tablet_io->Unload(&status)) { - LOG(ERROR) << "fail to unload tablet: " << tablet_io->GetTablePath() - << " [" << DebugString(tablet_io->GetStartKey()) - << ", " << DebugString(tablet_io->GetEndKey()) - << "], status: " << StatusCodeToString(status); - response->set_status((StatusCode)tablet_io->GetStatus()); - tablet_io->DecRef(); + // double check wether parent tablet in TM + CHECK(parent_tabletio); + if (stage == 1) { + DebugTeraTabletServerCrashOrSuspend(DEBUG_ts_split_crash_or_suspend, 35); + if (!parent_tabletio->Unload(&status)) { + parent_tabletio->DecRef(); + response->set_status(kTabletSplitError); + done->Run(); + return ; + } + } + + DebugTeraTabletServerCrashOrSuspend(DEBUG_ts_split_crash_or_suspend, 40); + // when step into stage 2, no race condition; + // And parent tabletIO has been unload + io::TabletIO *lchild_tabletio = NULL; + io::TabletIO *rchild_tabletio = NULL; + + lchild_tabletio = new io::TabletIO(); + CHECK(lchild_tabletio != NULL); + lchild_tabletio->SetStatus(io::TabletIO::kNotInit); + + rchild_tabletio = new io::TabletIO(); + CHECK(rchild_tabletio != NULL); + rchild_tabletio->SetStatus(io::TabletIO::kNotInit); + + // left && right tabletIO here no need refcount; + // because no one see them except SPLIT func. + std::vector parent_tablets; + parent_tablets.push_back(request->parent_tablet()); + + // load left child + VLOG(20) << __func__ << ": parent " << parent_tablets[0] << ", " + << left_tabletname << ", start " << request->key_range().key_start() + << ", end " << request->mid_key(); + sem_t finish_counter; + sem_init(&finish_counter, 0, 0); + StatusCode lstatus = kTabletNodeOk; + bool lflag = true; + boost::function closure = + boost::bind(&TabletNodeImpl::LoadTabletForSplitAsync, this, lchild_tabletio, + request, 0, parent_tablets, snapshots, + &lstatus, &finish_counter); + m_thread_pool->AddTask(closure); + + // load right child + VLOG(20) << __func__ << ": parent " << parent_tablets[0] << ", " + << right_tabletname << ", start " << request->mid_key() + << ", end " << request->key_range().key_end(); + + DebugTeraTabletServerCrashOrSuspend(DEBUG_ts_split_crash_or_suspend, 45); + bool rflag = rchild_tabletio->Load(request->schema(), request->mid_key(), + request->key_range().key_end(), right_tabletname, + parent_tablets, snapshots, m_ldb_logger, + m_ldb_block_cache, m_ldb_table_cache, &status); + + sem_wait(&finish_counter); + sem_destroy(&finish_counter); + lflag = (lstatus == kTabletNodeOk); + + // left or right child fail + if (!lflag || !rflag) { + LOG(WARNING) << __func__ << ", load child fail, lflag " << lflag + << ", rflag " << rflag << ", status " << StatusCodeToString(status); + lchild_tabletio->Unload(&status); + rchild_tabletio->Unload(&status); + + // delete left && right child + lchild_tabletio->DecRef(); + rchild_tabletio->DecRef(); + + // delete parent + parent_tabletio->DecRef(); + m_tablet_manager->RemoveTablet(request->table_name(), + request->key_range().key_start(), + request->key_range().key_end(), + &status); + response->set_status(kTabletSplitError); done->Run(); - return; + return ; } - TableSchema schema; - schema.CopyFrom(tablet_io->GetSchema()); - path = tablet_io->GetTablePath(); - LOG(INFO) << "unload tablet: " << tablet_io->GetTablePath() - << " [" << DebugString(tablet_io->GetStartKey()) - << ", " << DebugString(tablet_io->GetEndKey()) << "]"; - tablet_io->DecRef(); - if (!m_tablet_manager->RemoveTablet(request->tablet_name(), - request->key_range().key_start(), - request->key_range().key_end(), - &status)) { - LOG(ERROR) << "fail to remove tablet: " << request->tablet_name() - << " [" << DebugString(request->key_range().key_start()) - << ", " << DebugString(request->key_range().key_end()) - << "], status: " << StatusCodeToString(status); + // delete parent tabletio, add left && right child in tablet manager; + // split success parent tabletIO will decref. + CHECK(m_tablet_manager->SplitTabletIO(request->table_name(), + request->key_range().key_start(), + request->mid_key(), + request->key_range().key_end(), + parent_tabletio, + lchild_tabletio, + rchild_tabletio)); + // split success, delete parent tabletIO + parent_tabletio->DecRef(); + lchild_tabletio->DecRef(); + rchild_tabletio->DecRef(); + response->set_status(kTabletNodeOk); + done->Run(); + + DebugTeraTabletServerCrashOrSuspend(DEBUG_ts_split_crash_or_suspend, 50); + return; +} + +// @child_index: 0 for left, 1 for right +void TabletNodeImpl::LoadTabletForSplitAsync(io::TabletIO* tabletio, + const SplitTabletRequest* request, + int child_index, + const std::vector parent_tablets, + std::map snapshots, + StatusCode* status, + sem_t* finish_counter) { + SetStatusCode(kTabletNodeOk, status); + std::string parent_tabletname = leveldb::GetTabletPathFromNum(request->table_name(), + request->parent_tablet()); + std::string path = leveldb::GetChildTabletPath(parent_tabletname, + request->child_tablets(child_index)); + std::string key_start; + std::string key_end; + if (child_index == 0) { + key_start = request->key_range().key_start(); + key_end = request->mid_key(); + } else { + key_start = request->mid_key(); + key_end = request->key_range().key_end(); } - UpdateMetaTableAsync(request, response, done, path, split_key, schema, - first_half_size, second_half_size, request->tablet_meta()); + tabletio->Load(request->schema(), key_start, key_end, path, + parent_tablets, snapshots, m_ldb_logger, + m_ldb_block_cache, m_ldb_table_cache, status); + sem_post(finish_counter); } bool TabletNodeImpl::CheckInKeyRange(const KeyList& key_list, @@ -801,96 +1015,6 @@ void TabletNodeImpl::SetRootTabletAddr(const std::string& root_tablet_addr) { m_root_tablet_addr = root_tablet_addr; } -void TabletNodeImpl::UpdateMetaTableAsync(const SplitTabletRequest* rpc_request, - SplitTabletResponse* rpc_response, google::protobuf::Closure* rpc_done, - const std::string& path, const std::string& key_split, - const TableSchema& schema, int64_t first_size, int64_t second_size, - const TabletMeta& meta) { - WriteTabletRequest* request = new WriteTabletRequest; - WriteTabletResponse* response = new WriteTabletResponse; - request->set_sequence_id(m_this_sequence_id++); - request->set_tablet_name(FLAGS_tera_master_meta_table_name); - request->set_is_sync(true); - request->set_is_instant(true); - - TabletMeta tablet_meta; - tablet_meta.CopyFrom(meta); - tablet_meta.set_server_addr(m_local_addr); - tablet_meta.clear_parent_tablets(); - tablet_meta.add_parent_tablets(leveldb::GetTabletNumFromPath(path)); - - std::string meta_key, meta_value; - VLOG(5) << "update meta for split tablet: " << path - << " [" << DebugString(rpc_request->key_range().key_start()) - << ", " << DebugString(rpc_request->key_range().key_end()) << "]"; - - CHECK(2 == rpc_request->child_tablets_size()); - // first write 2nd half - tablet_meta.set_path(leveldb::GetChildTabletPath(path, rpc_request->child_tablets(0))); - tablet_meta.set_table_size(second_size); - tablet_meta.mutable_key_range()->set_key_start(key_split); - tablet_meta.mutable_key_range()->set_key_end(rpc_request->key_range().key_end()); - MakeMetaTableKeyValue(tablet_meta, &meta_key, &meta_value); - RowMutationSequence* mu_seq = request->add_row_list(); - mu_seq->set_row_key(meta_key); - Mutation* mutation = mu_seq->add_mutation_sequence(); - mutation->set_type(kPut); - mutation->set_value(meta_value); - VLOG(5) << "write meta: key [" << DebugString(meta_key) - << "], value_size: " << meta_value.size(); - - // then write 1st half - // update root_tablet_addr in fake zk mode - if (!FLAGS_tera_zk_enabled) { - m_zk_adapter->GetRootTableAddr(&m_root_tablet_addr); - } - TabletNodeClient meta_tablet_client(m_root_tablet_addr); - - tablet_meta.set_path(leveldb::GetChildTabletPath(path, rpc_request->child_tablets(1))); - tablet_meta.set_table_size(first_size); - tablet_meta.mutable_key_range()->set_key_start(rpc_request->key_range().key_start()); - tablet_meta.mutable_key_range()->set_key_end(key_split); - MakeMetaTableKeyValue(tablet_meta, &meta_key, &meta_value); - mu_seq = request->add_row_list(); - mu_seq->set_row_key(meta_key); - mutation = mu_seq->add_mutation_sequence(); - mutation->set_type(kPut); - mutation->set_value(meta_value); - VLOG(5) << "write meta: key [" << DebugString(meta_key) - << "], value_size: " << meta_value.size(); - - Closure* done = - NewClosure(this, &TabletNodeImpl::UpdateMetaTableCallback, rpc_request, - rpc_response, rpc_done); - meta_tablet_client.WriteTablet(request, response, done); -} - - -void TabletNodeImpl::UpdateMetaTableCallback(const SplitTabletRequest* rpc_request, - SplitTabletResponse* rpc_response, google::protobuf::Closure* rpc_done, - WriteTabletRequest* request, WriteTabletResponse* response, bool failed, - int error_code) { - if (failed) { - rpc_response->set_status(kMetaTabletError); - } else if (response->status() != kTabletNodeOk) { - LOG(ERROR) << "fail to update meta for tablet: " - << request->tablet_name() << " [" - << DebugString(rpc_request->key_range().key_start()) - << ", " << DebugString(rpc_request->key_range().key_end()) - << "], status: " << StatusCodeToString(response->status()); - rpc_response->set_status(kMetaTabletError); - } else { - LOG(INFO) << "split tablet success: " << rpc_request->tablet_name() - << " [" << DebugString(rpc_request->key_range().key_start()) - << ", " << DebugString(rpc_request->key_range().key_end()) << "]"; - rpc_response->set_status(kTabletNodeOk); - } - - delete request; - delete response; - rpc_done->Run(); -} - /* * all cached tablets/files: * ------------------------------------------ diff --git a/src/tabletnode/tabletnode_impl.h b/src/tabletnode/tabletnode_impl.h index 71f6138ca..c96d65b40 100644 --- a/src/tabletnode/tabletnode_impl.h +++ b/src/tabletnode/tabletnode_impl.h @@ -6,6 +6,7 @@ #define TERA_TABLETNODE_TABLETNODE_IMPL_H_ #include +#include #include "common/base/scoped_ptr.h" #include "common/thread_pool.h" @@ -37,6 +38,13 @@ class TabletNodeImpl { TabletNodeImpl(const TabletNodeInfo& tabletnode_info, TabletManager* tablet_manager = NULL); ~TabletNodeImpl(); + + // use for debug ts's crash + enum debug_tera_ts_crash_func_set { + DEBUG_ts_split_crash_or_suspend, + }; + inline void DebugTeraTabletServerCrashOrSuspend( + enum debug_tera_ts_crash_func_set debug_func, int64_t phase); bool Init(); @@ -88,7 +96,7 @@ class TabletNodeImpl { void SplitTablet(const SplitTabletRequest* request, SplitTabletResponse* response, google::protobuf::Closure* done); - + void EnterSafeMode(); void LeaveSafeMode(); void ExitService(); @@ -109,7 +117,7 @@ class TabletNodeImpl { void TryReleaseMallocCache(); -private: +private: bool CheckInKeyRange(const KeyList& key_list, const std::string& key_start, const std::string& key_end); @@ -122,18 +130,16 @@ class TabletNodeImpl { bool CheckInKeyRange(const RowReaderList& reader_list, const std::string& key_start, const std::string& key_end); - - void UpdateMetaTableAsync(const SplitTabletRequest* request, - SplitTabletResponse* response, google::protobuf::Closure* done, - const std::string& path, const std::string& key_split, - const TableSchema& schema, int64_t first_size, int64_t second_size, - const TabletMeta& meta); - void UpdateMetaTableCallback(const SplitTabletRequest* rpc_request, - SplitTabletResponse* rpc_response, google::protobuf::Closure* rpc_done, - WriteTabletRequest* request, WriteTabletResponse* response, - bool failed, int error_code); - + void InitCacheSystem(); + + void LoadTabletForSplitAsync(io::TabletIO* tabletio, + const SplitTabletRequest* request, + int child_index, + const std::vector parent_tablets, + std::map snapshots, + StatusCode* status, + sem_t* finish_counter); void ReleaseMallocCache(); void EnableReleaseMallocCacheTimer(int32_t expand_factor = 1); diff --git a/src/tera_flags.cc b/src/tera_flags.cc index 5f2de978c..6e322554b 100644 --- a/src/tera_flags.cc +++ b/src/tera_flags.cc @@ -101,6 +101,9 @@ DEFINE_string(tera_master_meta_table_name, "meta_table", "the meta table name"); DEFINE_string(tera_master_meta_table_path, "meta", "the path of meta table"); DEFINE_int64(tera_master_split_tablet_size, 512, "the size (in MB) of tablet to trigger split"); +DEFINE_int64(debug_tera_master_split_phase_crash, 0, "debug master crash when trigger split: set {50, 51, 60, 61, 70, 71} will crash"); +DEFINE_int64(debug_tera_ts_split_phase_crash, 0, "debug ts crash when trigger split: set {20,25,30,35,40,45,50} will crash"); + DEFINE_bool(tera_master_merge_enabled, false, "enable the auto-merge tablet"); DEFINE_int64(tera_master_merge_tablet_size, 0, "the size (in MB) of tablet to trigger merge"); DEFINE_string(tera_master_gc_strategy, "default", "gc strategy, [default, incremental]");