Skip to content
109 changes: 57 additions & 52 deletions src/io/tablet_io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ bool TabletIO::Load(const TableSchema& schema,
StatusCode* status) {
{
MutexLock lock(&m_mutex);

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

加上DebugString吧,防止乱码

if (m_status == kReady && m_start_key == key_start
&& m_end_key == key_end) {
return true;
Expand Down Expand Up @@ -258,15 +259,22 @@ bool TabletIO::Load(const TableSchema& schema,
m_status = kReady;
m_db_ref_count--;
}


// TODO: test
timeval mtime;
gettimeofday(&mtime, NULL);
VLOG(20) << __func__ << ", Avail, " << m_tablet_path
<< ", " << DebugString(m_start_key) << ", " << DebugString(m_end_key)
<< ", tv_sec " << mtime.tv_sec << ", tv_usec " << mtime.tv_usec;

LOG(INFO) << "[Load] Load " << m_tablet_path << " done";
return true;
}

bool TabletIO::Unload(StatusCode* status) {
{
MutexLock lock(&m_mutex);
if (m_status != kReady && m_status != kSplited) {
if (m_status != kReady) {
SetStatusCode(m_status, status);
return false;
}
Expand All @@ -281,7 +289,14 @@ bool TabletIO::Unload(StatusCode* status) {
MutexLock lock(&m_mutex);
m_status = kUnLoading2;
}


// TODO: test
timeval mtime;
gettimeofday(&mtime, NULL);
VLOG(20) << __func__ << ", NotAvail, " << m_tablet_path
<< ", " << DebugString(m_start_key) << ", " << DebugString(m_end_key)
<< ", tv_sec " << mtime.tv_sec << ", tv_usec " << mtime.tv_usec;

uint32_t retry = 0;
while (m_db_ref_count > 1) {
LOG(ERROR) << "tablet is busy, db ref: " << m_db_ref_count
Expand Down Expand Up @@ -314,83 +329,62 @@ bool TabletIO::Unload(StatusCode* status) {

{
MutexLock lock(&m_mutex);
m_status = kNotInit;
m_status = kFrozen;
m_db_ref_count--;
}
return true;
}

bool TabletIO::Split(std::string* split_key, StatusCode* status) {
bool TabletIO::GetMidKey(std::string* mid_key, StatusCode* status) {
{
MutexLock lock(&m_mutex);
if (m_status != kReady) {
if (m_status != kReady &&
m_status != kUnLoading) {
SetStatusCode(m_status, status);
return false;
}
if (m_compact_status == kTableOnCompact) {
SetStatusCode(kTableNotSupport, status);
return false;
}
m_status = kOnSplit;
m_db_ref_count++;
}

int64_t table_size = GetDataSize(NULL, status);
if (table_size <= 0) {
SetStatusCode(kTableNotSupport, status);
MutexLock lock(&m_mutex);
m_status = kReady;
m_db_ref_count--;
return false;
}

// find mid key
std::string raw_split_key;
if (!m_db->FindSplitKey(m_raw_start_key, m_raw_end_key, 0.5,
&raw_split_key)) {
VLOG(5) << "fail to find split key";
SetStatusCode(kTableNotSupport, status);
MutexLock lock(&m_mutex);
m_status = kReady;
SetStatusCode(kTableNotSupport, status);
m_db_ref_count--;
return false;
}


// parse key
leveldb::Slice key_split;

if (m_kv_only && m_table_schema.raw_key() == Readable) {
key_split = raw_split_key;
} else { // Table && TTL-KV
} else {
leveldb::Slice cf_split;
leveldb::Slice qu_split;
if (!m_key_operator->ExtractTeraKey(raw_split_key, &key_split,
&cf_split, &qu_split, NULL, NULL)) {
VLOG(5) << "fail to extract split key";
SetStatusCode(kTableNotSupport, status);
MutexLock lock(&m_mutex);
m_status = kReady;
m_db_ref_count--;
return false;
}
}

VLOG(5) << "start: [" << DebugString(m_start_key)
<< "], end: [" << DebugString(m_end_key)
<< "], split: [" << DebugString(key_split.ToString()) << "]";

if (key_split.empty() || key_split.ToString() <= m_start_key
|| (!m_end_key.empty() && key_split.ToString() >= m_end_key)) {
// sanity check key
if (key_split.empty() || key_split.ToString() <= m_start_key ||
(!m_end_key.empty() && key_split.ToString() >= m_end_key)) {
SetStatusCode(kTableNotSupport, status);
MutexLock lock(&m_mutex);
m_status = kReady;
m_db_ref_count--;
return false;
}

*split_key = key_split.ToString();


*mid_key = key_split.ToString();
{
MutexLock lock(&m_mutex);
m_status = kSplited;
SetStatusCode(kTabletNodeOk, status);
m_db_ref_count--;
}
return true;
Expand Down Expand Up @@ -444,8 +438,8 @@ int64_t TabletIO::GetDataSize(const std::string& start_key,
StatusCode* status) {
{
MutexLock lock(&m_mutex);
if (m_status != kReady && m_status != kOnSplit
&& m_status != kSplited && m_status != kUnLoading) {
if (m_status != kReady &&
m_status != kUnLoading) {
SetStatusCode(m_status, status);
return -1;
}
Expand Down Expand Up @@ -526,8 +520,8 @@ int64_t TabletIO::GetDataSize(std::vector<uint64_t>* lgsize,
StatusCode* status) {
{
MutexLock lock(&m_mutex);
if (m_status != kReady && m_status != kOnSplit
&& m_status != kSplited && m_status != kUnLoading) {
if (m_status != kReady &&
m_status != kUnLoading) {
SetStatusCode(m_status, status);
return -1;
}
Expand Down Expand Up @@ -932,9 +926,10 @@ bool TabletIO::ReadCells(const RowReaderInfo& row_reader, RowResult* value_list,
uint64_t snapshot_id, StatusCode* status) {
{
MutexLock lock(&m_mutex);
if (m_status != kReady && m_status != kOnSplit
&& m_status != kSplited && m_status != kUnLoading) {
if (m_status == kUnLoading2) {
if (m_status != kReady &&
m_status != kUnLoading) {
if (m_status == kUnLoading2 ||
m_status == kFrozen) {
// keep compatable for old sdk protocol
// we can remove this in the future.
SetStatusCode(kUnLoading, status);
Expand Down Expand Up @@ -1078,9 +1073,10 @@ bool TabletIO::Write(const WriteTabletRequest* request,
StatusCode* status) {
{
MutexLock lock(&m_mutex);
if (m_status != kReady && m_status != kOnSplit
&& m_status != kSplited && m_status != kUnLoading) {
if (m_status == kUnLoading2) {
if (m_status != kReady &&
m_status != kUnLoading) {
if (m_status == kUnLoading2 ||
m_status == kFrozen) {
// keep compatable for old sdk protocol
// we can remove this in the future.
SetStatusCode(kUnLoading, status);
Expand All @@ -1106,9 +1102,10 @@ bool TabletIO::ScanRows(const ScanTabletRequest* request,
StatusCode status = kTabletNodeOk;
{
MutexLock lock(&m_mutex);
if (m_status != kReady && m_status != kOnSplit
&& m_status != kSplited && m_status != kUnLoading) {
if (m_status == kUnLoading2) {
if (m_status != kReady &&
m_status != kUnLoading) {
if (m_status == kUnLoading2 ||
m_status == kFrozen) {
// keep compatable for old sdk protocol
// we can remove this in the future.
SetStatusCode(kUnLoading, &status);
Expand Down Expand Up @@ -1738,6 +1735,12 @@ const leveldb::RawKeyOperator* TabletIO::GetRawKeyOperator() {
}

void TabletIO::GetAndClearCounter(TabletCounter* counter, int64_t interval) {
std::string mid_key;
StatusCode status = kTabletNodeOk;
if (!GetMidKey(&mid_key, &status)) {
mid_key.clear();
}

counter->set_low_read_cell(m_counter.low_read_cell.Clear() * 1000000 / interval);
counter->set_scan_rows(m_counter.scan_rows.Clear() * 1000000 / interval);
counter->set_scan_kvs(m_counter.scan_kvs.Clear() * 1000000 / interval);
Expand All @@ -1749,6 +1752,8 @@ void TabletIO::GetAndClearCounter(TabletCounter* counter, int64_t interval) {
counter->set_write_kvs(m_counter.write_kvs.Clear() * 1000000 / interval);
counter->set_write_size(m_counter.write_size.Clear() * 1000000 / interval);
counter->set_is_on_busy(IsBusy());

counter->set_mid_key(mid_key);
}

int32_t TabletIO::AddRef() {
Expand Down
9 changes: 5 additions & 4 deletions src/io/tablet_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,9 @@ class TabletIO {
kNotInit = kTabletNotInit,
kReady = kTabletReady,
kOnLoad = kTabletOnLoad,
kOnSplit = kTabletOnSplit,
kSplited = kTabletSplited,
kUnLoading = kTabletUnLoading,
kUnLoading2 = kTabletUnLoading2
kUnLoading2 = kTabletUnLoading2,
kFrozen = kTabletFrozen,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kUnLoading2 kFrozen能加个注释不,分别代表什么状态

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kunloading2是原代码的状态,tablietio进行该卸载第二阶段,需要阻塞读写;kfrozen状态是卸载结束后tabletio进入的状态,表示该区间目前不操作;

};
typedef std::map< std::string, std::set<std::string> > ColumnFamilyMap;
struct ScanOptions {
Expand Down Expand Up @@ -96,7 +95,9 @@ class TabletIO {
leveldb::TableCache* table_cache = NULL,
StatusCode* status = NULL);
virtual bool Unload(StatusCode* status = NULL);
virtual bool Split(std::string* split_key, StatusCode* status = NULL);

virtual bool GetMidKey(std::string* mid_key, StatusCode* status);

virtual bool Compact(StatusCode* status = NULL);
bool CompactMinor(StatusCode* status = NULL);
bool Destroy(StatusCode* status = NULL);
Expand Down
3 changes: 3 additions & 0 deletions src/leveldb/db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,9 @@ bool Version::FindSplitKey(const Slice* smallest_user_key,
split_size += files_[step_level][now_pos[step_level]]->file_size;
now_pos[step_level] ++;
}
if (largest_file == NULL) {
return false;
}
*split_key = largest_file->largest.user_key().ToString();
return true;
}
Expand Down
2 changes: 2 additions & 0 deletions src/leveldb/util/env_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,7 @@ class PosixEnv : public Env {
close(fd);
locks_.Remove(fname);
} else {
//fprintf(stderr, "%s, lock %s\n", __func__, fname.c_str());
PosixFileLock* my_lock = new PosixFileLock;
my_lock->fd_ = fd;
my_lock->name_ = fname;
Expand All @@ -756,6 +757,7 @@ class PosixEnv : public Env {
if (LockOrUnlock(my_lock->fd_, false) == -1) {
result = IOError("unlock", errno);
}
//fprintf(stderr, "%s, unlock %s\n", __func__, my_lock->name_.c_str());
locks_.Remove(my_lock->name_);
close(my_lock->fd_);
delete my_lock;
Expand Down
Loading