Skip to content

Commit

Permalink
refactor(cache): Clean up redundant code (facebookincubator#12024)
Browse files Browse the repository at this point in the history
Summary:

Refactor read checkpoint code and consolidate duplicate file clean up.

Reviewed By: xiaoxmeng

Differential Revision: D67868731
  • Loading branch information
zacw7 authored and facebook-github-bot committed Jan 13, 2025
1 parent 022cd87 commit 02e683f
Show file tree
Hide file tree
Showing 15 changed files with 97 additions and 111 deletions.
1 change: 1 addition & 0 deletions velox/buffer/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ target_link_libraries(
velox_memory
velox_buffer
velox_test_util
velox_file_test_utils
GTest::gtest
GTest::gtest_main
GTest::gmock
Expand Down
4 changes: 2 additions & 2 deletions velox/common/base/Counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,9 @@ void registerVeloxMetrics() {
// Total number of SSD evict log file open errors.
DEFINE_METRIC(kMetricSsdCacheOpenLogErrors, facebook::velox::StatType::SUM);

// Total number of errors while deleting SSD checkpoint files.
// Total number of errors while deleting SSD checkpoint/evictlog files.
DEFINE_METRIC(
kMetricSsdCacheDeleteCheckpointErrors, facebook::velox::StatType::SUM);
kMetricSsdCacheMetaFileDeleteErrors, facebook::velox::StatType::SUM);

// Total number of errors while growing SSD cache files.
DEFINE_METRIC(kMetricSsdCacheGrowFileErrors, facebook::velox::StatType::SUM);
Expand Down
4 changes: 2 additions & 2 deletions velox/common/base/Counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,8 @@ constexpr folly::StringPiece kMetricSsdCacheOpenCheckpointErrors{
constexpr folly::StringPiece kMetricSsdCacheOpenLogErrors{
"velox.ssd_cache_open_log_errors"};

constexpr folly::StringPiece kMetricSsdCacheDeleteCheckpointErrors{
"velox.ssd_cache_delete_checkpoint_errors"};
constexpr folly::StringPiece kMetricSsdCacheMetaFileDeleteErrors{
"velox.ssd_cache_delete_meta_file_errors"};

constexpr folly::StringPiece kMetricSsdCacheGrowFileErrors{
"velox.ssd_cache_grow_file_errors"};
Expand Down
4 changes: 2 additions & 2 deletions velox/common/base/PeriodicStatsReporter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ void PeriodicStatsReporter::reportCacheStats() {
REPORT_IF_NOT_ZERO(
kMetricSsdCacheOpenLogErrors, deltaSsdStats.openLogErrors);
REPORT_IF_NOT_ZERO(
kMetricSsdCacheDeleteCheckpointErrors,
deltaSsdStats.deleteCheckpointErrors);
kMetricSsdCacheMetaFileDeleteErrors,
deltaSsdStats.deleteMetaFileErrors);
REPORT_IF_NOT_ZERO(
kMetricSsdCacheGrowFileErrors, deltaSsdStats.growFileErrors);
REPORT_IF_NOT_ZERO(
Expand Down
8 changes: 4 additions & 4 deletions velox/common/base/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,27 @@ add_executable(velox_common_base_benchmarks BitUtilBenchmark.cpp)
target_link_libraries(
velox_common_base_benchmarks
PUBLIC Folly::follybenchmark
PRIVATE velox_common_base Folly::folly)
PRIVATE velox_common_base velox_file_test_utils Folly::folly)

add_executable(velox_common_stringsearch_benchmarks StringSearchBenchmark.cpp)

target_link_libraries(
velox_common_stringsearch_benchmarks
PUBLIC Folly::follybenchmark
PRIVATE velox_common_base Folly::folly)
PRIVATE velox_common_base velox_file_test_utils Folly::folly)

add_executable(velox_common_indexed_priority_queue_benchmark
IndexedPriorityQueueBenchmark.cpp)

target_link_libraries(
velox_common_indexed_priority_queue_benchmark
PUBLIC Folly::follybenchmark
PRIVATE velox_common_base Folly::folly)
PRIVATE velox_common_base velox_file_test_utils Folly::folly)

add_executable(velox_common_sorting_network_benchmark
SortingNetworkBenchmark.cpp)

target_link_libraries(
velox_common_sorting_network_benchmark
PUBLIC Folly::follybenchmark
PRIVATE velox_common_base Folly::folly)
PRIVATE velox_common_base velox_file_test_utils Folly::folly)
2 changes: 2 additions & 0 deletions velox/common/base/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ target_link_libraries(
velox_id_map
velox_common_base
velox_memory
velox_file_test_utils
Boost::headers
gflags::gflags
glog::glog
Expand All @@ -81,5 +82,6 @@ target_link_libraries(
velox_common_base
velox_exception
velox_time
velox_file_test_utils
Folly::folly
gflags::gflags)
6 changes: 3 additions & 3 deletions velox/common/base/tests/StatsReporterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ TEST_F(PeriodicStatsReporterTest, basic) {
ASSERT_EQ(counterMap.count(kMetricSsdCacheOpenSsdErrors.str()), 0);
ASSERT_EQ(counterMap.count(kMetricSsdCacheOpenCheckpointErrors.str()), 0);
ASSERT_EQ(counterMap.count(kMetricSsdCacheOpenLogErrors.str()), 0);
ASSERT_EQ(counterMap.count(kMetricSsdCacheDeleteCheckpointErrors.str()), 0);
ASSERT_EQ(counterMap.count(kMetricSsdCacheMetaFileDeleteErrors.str()), 0);
ASSERT_EQ(counterMap.count(kMetricSsdCacheGrowFileErrors.str()), 0);
ASSERT_EQ(counterMap.count(kMetricSsdCacheWriteSsdErrors.str()), 0);
ASSERT_EQ(counterMap.count(kMetricSsdCacheWriteSsdDropped.str()), 0);
Expand Down Expand Up @@ -520,7 +520,7 @@ TEST_F(PeriodicStatsReporterTest, basic) {
newSsdStats->openFileErrors = 10;
newSsdStats->openCheckpointErrors = 10;
newSsdStats->openLogErrors = 10;
newSsdStats->deleteCheckpointErrors = 10;
newSsdStats->deleteMetaFileErrors = 10;
newSsdStats->growFileErrors = 10;
newSsdStats->writeSsdErrors = 10;
newSsdStats->writeSsdDropped = 10;
Expand Down Expand Up @@ -570,7 +570,7 @@ TEST_F(PeriodicStatsReporterTest, basic) {
ASSERT_EQ(counterMap.count(kMetricSsdCacheOpenSsdErrors.str()), 1);
ASSERT_EQ(counterMap.count(kMetricSsdCacheOpenCheckpointErrors.str()), 1);
ASSERT_EQ(counterMap.count(kMetricSsdCacheOpenLogErrors.str()), 1);
ASSERT_EQ(counterMap.count(kMetricSsdCacheDeleteCheckpointErrors.str()), 1);
ASSERT_EQ(counterMap.count(kMetricSsdCacheMetaFileDeleteErrors.str()), 1);
ASSERT_EQ(counterMap.count(kMetricSsdCacheGrowFileErrors.str()), 1);
ASSERT_EQ(counterMap.count(kMetricSsdCacheWriteSsdErrors.str()), 1);
ASSERT_EQ(counterMap.count(kMetricSsdCacheWriteSsdDropped.str()), 1);
Expand Down
121 changes: 49 additions & 72 deletions velox/common/caching/SsdFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,8 @@
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <fstream>
#include <numeric>

#include "velox/common/base/Counters.h"
#include "velox/common/base/StatsReporter.h"

DEFINE_bool(ssd_odirect, true, "Use O_DIRECT for SSD cache IO");
DEFINE_bool(ssd_verify_write, false, "Read back data after writing to SSD");

Expand Down Expand Up @@ -521,7 +517,7 @@ void SsdFile::updateStats(SsdCacheStats& stats) const {
stats.openFileErrors += stats_.openFileErrors;
stats.openCheckpointErrors += stats_.openCheckpointErrors;
stats.openLogErrors += stats_.openLogErrors;
stats.deleteCheckpointErrors += stats_.deleteCheckpointErrors;
stats.deleteMetaFileErrors += stats_.deleteMetaFileErrors;
stats.growFileErrors += stats_.growFileErrors;
stats.writeSsdErrors += stats_.writeSsdErrors;
stats.writeCheckpointErrors += stats_.writeCheckpointErrors;
Expand Down Expand Up @@ -638,57 +634,37 @@ void SsdFile::deleteCheckpoint(bool keepLog) {
}

if (evictLogWriteFile_ != nullptr) {
try {
if (keepLog) {
truncateEvictLogFile();
} else {
deleteEvictLogFile();
}
} catch (const std::exception& e) {
++stats_.deleteCheckpointErrors;
VELOX_SSD_CACHE_LOG(ERROR) << "Error in deleting evictLog: " << e.what();
if (keepLog) {
truncateFile(evictLogWriteFile_.get());
} else {
deleteFile(std::move(evictLogWriteFile_));
}
}

if (checkpointWriteFile_ != nullptr) {
deleteCheckpointFile();
deleteFile(std::move(checkpointWriteFile_));
}
}

void SsdFile::truncateEvictLogFile() {
VELOX_CHECK_NOT_NULL(evictLogWriteFile_);
evictLogWriteFile_->truncate(0);
evictLogWriteFile_->flush();
void SsdFile::truncateFile(WriteFile* file) {
VELOX_CHECK_NOT_NULL(file);
file->truncate(0);
file->flush();
}

void SsdFile::truncateCheckpointFile() {
VELOX_CHECK_NOT_NULL(checkpointWriteFile_);
checkpointWriteFile_->truncate(0);
checkpointWriteFile_->flush();
}

void SsdFile::deleteEvictLogFile() {
VELOX_CHECK_NOT_NULL(evictLogWriteFile_);
evictLogWriteFile_->close();
evictLogWriteFile_.reset();
const auto evictLogFilePath = getEvictLogFilePath();
if (fs_->exists(evictLogFilePath)) {
fs_->remove(evictLogFilePath);
}
}

void SsdFile::deleteCheckpointFile() {
VELOX_CHECK_NOT_NULL(checkpointWriteFile_);
void SsdFile::deleteFile(std::unique_ptr<WriteFile> file) {
VELOX_CHECK_NOT_NULL(file);
const auto filePath = file->getName();
try {
checkpointWriteFile_->close();
checkpointWriteFile_.reset();
const auto checkpointFilePath = getCheckpointFilePath();
if (fs_->exists(checkpointFilePath)) {
fs_->remove(checkpointFilePath);
file->close();
file.reset();
if (fs_->exists(filePath)) {
fs_->remove(filePath);
}
} catch (const std::exception& e) {
++stats_.deleteCheckpointErrors;
VELOX_SSD_CACHE_LOG(ERROR) << "Error in deleting checkpoint: " << e.what();
++stats_.deleteMetaFileErrors;
VELOX_SSD_CACHE_LOG(ERROR)
<< fmt::format("Error in deleting file {}: {}", filePath, e.what());
}
}

Expand Down Expand Up @@ -776,7 +752,7 @@ void SsdFile::checkpoint(bool force) {

try {
VELOX_CHECK_NOT_NULL(checkpointWriteFile_);
truncateCheckpointFile();
truncateFile(checkpointWriteFile_.get());
// The checkpoint state file contains:
// int32_t The 4 bytes of checkpoint version,
// int32_t maxRegions,
Expand Down Expand Up @@ -859,34 +835,22 @@ void SsdFile::initializeCheckpoint() {
return;
}

bool hasCheckpoint = true;
std::unique_ptr<common::FileInputStream> checkpointInputStream;
filesystems::FileOptions writeFileOptions;
writeFileOptions.shouldThrowOnFileAlreadyExists = false;

const auto checkpointPath = getCheckpointFilePath();
const auto checkpointPath = checkpointFilePath();
try {
checkpointWriteFile_ =
fs_->openFileForWrite(checkpointPath, writeFileOptions);

auto checkpointReadFile = fs_->openFileForRead(checkpointPath);
checkpointInputStream = std::make_unique<common::FileInputStream>(
std::move(checkpointReadFile),
1 << 20,
memory::memoryManager()->cachePool());
} catch (std::exception& e) {
hasCheckpoint = false;
++stats_.openCheckpointErrors;
VELOX_SSD_CACHE_LOG(WARNING) << fmt::format(
"Error openning checkpoint file {}: Starting shard {} without checkpoint, with checksum write {}, read verification {}, checkpoint file {}",
e.what(),
shardId_,
checksumEnabled_ ? "enabled" : "disabled",
checksumReadVerificationEnabled_ ? "enabled" : "disabled",
checkpointPath);
} catch (const std::exception& e) {
++stats_.writeCheckpointErrors;
VELOX_SSD_CACHE_LOG(ERROR) << fmt::format(
"Could not initilize checkpoint file {} for writing: {}: ",
checkpointPath,
e.what());
}

const auto logPath = getEvictLogFilePath();
const auto logPath = evictLogFilePath();
try {
evictLogWriteFile_ = fs_->openFileForWrite(logPath, writeFileOptions);
} catch (std::exception& e) {
Expand All @@ -896,9 +860,7 @@ void SsdFile::initializeCheckpoint() {
}

try {
if (hasCheckpoint) {
readCheckpoint(std::move(checkpointInputStream));
}
readCheckpoint();
} catch (const std::exception& e) {
++stats_.readCheckpointErrors;
try {
Expand Down Expand Up @@ -995,15 +957,30 @@ std::vector<T> readVector(common::FileInputStream* stream, int32_t size) {
}
} // namespace

void SsdFile::readCheckpoint(std::unique_ptr<common::FileInputStream> stream) {
void SsdFile::readCheckpoint() {
const auto checkpointPath = checkpointFilePath();
std::unique_ptr<common::FileInputStream> stream;
try {
auto checkpointReadFile = fs_->openFileForRead(checkpointPath);
stream = std::make_unique<common::FileInputStream>(
std::move(checkpointReadFile),
1 << 20,
memory::memoryManager()->cachePool());
} catch (std::exception& e) {
++stats_.openCheckpointErrors;
VELOX_SSD_CACHE_LOG(WARNING)
<< fmt::format("Error openning checkpoint file {}: ", e.what());
return;
}

const auto versionMagic = readString(stream.get(), 4);
const auto checkpoinHasChecksum =
isChecksumEnabledOnCheckpointVersion(versionMagic);
if (checksumEnabled_ && !checkpoinHasChecksum) {
VELOX_SSD_CACHE_LOG(WARNING) << fmt::format(
"Starting shard {} without checkpoint: checksum is enabled but the checkpoint was made without checksum, so skip the checkpoint recovery, checkpoint file {}",
shardId_,
getCheckpointFilePath());
checkpointPath);
return;
}

Expand All @@ -1026,7 +1003,7 @@ void SsdFile::readCheckpoint(std::unique_ptr<common::FileInputStream> stream) {
idMap[id] = StringIdLease(fileIds(), id, name);
}

const auto logPath = getEvictLogFilePath();
const auto logPath = evictLogFilePath();
const auto evictLogReadFile = fs_->openFileForRead(logPath);
const auto logSize = evictLogReadFile->size();
std::vector<uint32_t> evicted(logSize / sizeof(uint32_t));
Expand Down Expand Up @@ -1106,7 +1083,7 @@ void SsdFile::readCheckpoint(std::unique_ptr<common::FileInputStream> stream) {
writableRegions_.size(),
checksumEnabled_ ? "enabled" : "disabled",
checksumReadVerificationEnabled_ ? "enabled" : "disabled",
getCheckpointFilePath());
checkpointFilePath());
}

} // namespace facebook::velox::cache
Loading

0 comments on commit 02e683f

Please sign in to comment.