Skip to content

Commit

Permalink
refactor(cache): Clean up redundant code (facebookincubator#12024)
Browse files Browse the repository at this point in the history
Summary:

Refactor read checkpoint code and consolidate duplicate file clean up.

Reviewed By: xiaoxmeng

Differential Revision: D67868731
  • Loading branch information
zacw7 authored and facebook-github-bot committed Jan 14, 2025
1 parent 61e737c commit 066154f
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 107 deletions.
4 changes: 2 additions & 2 deletions velox/common/base/Counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,9 @@ void registerVeloxMetrics() {
// Total number of SSD evict log file open errors.
DEFINE_METRIC(kMetricSsdCacheOpenLogErrors, facebook::velox::StatType::SUM);

// Total number of errors while deleting SSD checkpoint files.
// Total number of errors while deleting SSD checkpoint/evictlog files.
DEFINE_METRIC(
kMetricSsdCacheDeleteCheckpointErrors, facebook::velox::StatType::SUM);
kMetricSsdCacheMetaFileDeleteErrors, facebook::velox::StatType::SUM);

// Total number of errors while growing SSD cache files.
DEFINE_METRIC(kMetricSsdCacheGrowFileErrors, facebook::velox::StatType::SUM);
Expand Down
4 changes: 2 additions & 2 deletions velox/common/base/Counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,8 @@ constexpr folly::StringPiece kMetricSsdCacheOpenCheckpointErrors{
constexpr folly::StringPiece kMetricSsdCacheOpenLogErrors{
"velox.ssd_cache_open_log_errors"};

constexpr folly::StringPiece kMetricSsdCacheDeleteCheckpointErrors{
"velox.ssd_cache_delete_checkpoint_errors"};
constexpr folly::StringPiece kMetricSsdCacheMetaFileDeleteErrors{
"velox.ssd_cache_delete_meta_file_errors"};

constexpr folly::StringPiece kMetricSsdCacheGrowFileErrors{
"velox.ssd_cache_grow_file_errors"};
Expand Down
4 changes: 2 additions & 2 deletions velox/common/base/PeriodicStatsReporter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ void PeriodicStatsReporter::reportCacheStats() {
REPORT_IF_NOT_ZERO(
kMetricSsdCacheOpenLogErrors, deltaSsdStats.openLogErrors);
REPORT_IF_NOT_ZERO(
kMetricSsdCacheDeleteCheckpointErrors,
deltaSsdStats.deleteCheckpointErrors);
kMetricSsdCacheMetaFileDeleteErrors,
deltaSsdStats.deleteMetaFileErrors);
REPORT_IF_NOT_ZERO(
kMetricSsdCacheGrowFileErrors, deltaSsdStats.growFileErrors);
REPORT_IF_NOT_ZERO(
Expand Down
6 changes: 3 additions & 3 deletions velox/common/base/tests/StatsReporterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ TEST_F(PeriodicStatsReporterTest, basic) {
ASSERT_EQ(counterMap.count(kMetricSsdCacheOpenSsdErrors.str()), 0);
ASSERT_EQ(counterMap.count(kMetricSsdCacheOpenCheckpointErrors.str()), 0);
ASSERT_EQ(counterMap.count(kMetricSsdCacheOpenLogErrors.str()), 0);
ASSERT_EQ(counterMap.count(kMetricSsdCacheDeleteCheckpointErrors.str()), 0);
ASSERT_EQ(counterMap.count(kMetricSsdCacheMetaFileDeleteErrors.str()), 0);
ASSERT_EQ(counterMap.count(kMetricSsdCacheGrowFileErrors.str()), 0);
ASSERT_EQ(counterMap.count(kMetricSsdCacheWriteSsdErrors.str()), 0);
ASSERT_EQ(counterMap.count(kMetricSsdCacheWriteSsdDropped.str()), 0);
Expand Down Expand Up @@ -520,7 +520,7 @@ TEST_F(PeriodicStatsReporterTest, basic) {
newSsdStats->openFileErrors = 10;
newSsdStats->openCheckpointErrors = 10;
newSsdStats->openLogErrors = 10;
newSsdStats->deleteCheckpointErrors = 10;
newSsdStats->deleteMetaFileErrors = 10;
newSsdStats->growFileErrors = 10;
newSsdStats->writeSsdErrors = 10;
newSsdStats->writeSsdDropped = 10;
Expand Down Expand Up @@ -570,7 +570,7 @@ TEST_F(PeriodicStatsReporterTest, basic) {
ASSERT_EQ(counterMap.count(kMetricSsdCacheOpenSsdErrors.str()), 1);
ASSERT_EQ(counterMap.count(kMetricSsdCacheOpenCheckpointErrors.str()), 1);
ASSERT_EQ(counterMap.count(kMetricSsdCacheOpenLogErrors.str()), 1);
ASSERT_EQ(counterMap.count(kMetricSsdCacheDeleteCheckpointErrors.str()), 1);
ASSERT_EQ(counterMap.count(kMetricSsdCacheMetaFileDeleteErrors.str()), 1);
ASSERT_EQ(counterMap.count(kMetricSsdCacheGrowFileErrors.str()), 1);
ASSERT_EQ(counterMap.count(kMetricSsdCacheWriteSsdErrors.str()), 1);
ASSERT_EQ(counterMap.count(kMetricSsdCacheWriteSsdDropped.str()), 1);
Expand Down
121 changes: 49 additions & 72 deletions velox/common/caching/SsdFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,8 @@
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <fstream>
#include <numeric>

#include "velox/common/base/Counters.h"
#include "velox/common/base/StatsReporter.h"

DEFINE_bool(ssd_odirect, true, "Use O_DIRECT for SSD cache IO");
DEFINE_bool(ssd_verify_write, false, "Read back data after writing to SSD");

Expand Down Expand Up @@ -521,7 +517,7 @@ void SsdFile::updateStats(SsdCacheStats& stats) const {
stats.openFileErrors += stats_.openFileErrors;
stats.openCheckpointErrors += stats_.openCheckpointErrors;
stats.openLogErrors += stats_.openLogErrors;
stats.deleteCheckpointErrors += stats_.deleteCheckpointErrors;
stats.deleteMetaFileErrors += stats_.deleteMetaFileErrors;
stats.growFileErrors += stats_.growFileErrors;
stats.writeSsdErrors += stats_.writeSsdErrors;
stats.writeCheckpointErrors += stats_.writeCheckpointErrors;
Expand Down Expand Up @@ -638,57 +634,37 @@ void SsdFile::deleteCheckpoint(bool keepLog) {
}

if (evictLogWriteFile_ != nullptr) {
try {
if (keepLog) {
truncateEvictLogFile();
} else {
deleteEvictLogFile();
}
} catch (const std::exception& e) {
++stats_.deleteCheckpointErrors;
VELOX_SSD_CACHE_LOG(ERROR) << "Error in deleting evictLog: " << e.what();
if (keepLog) {
truncateFile(evictLogWriteFile_.get());
} else {
deleteFile(std::move(evictLogWriteFile_));
}
}

if (checkpointWriteFile_ != nullptr) {
deleteCheckpointFile();
deleteFile(std::move(checkpointWriteFile_));
}
}

void SsdFile::truncateEvictLogFile() {
VELOX_CHECK_NOT_NULL(evictLogWriteFile_);
evictLogWriteFile_->truncate(0);
evictLogWriteFile_->flush();
void SsdFile::truncateFile(WriteFile* file) {
VELOX_CHECK_NOT_NULL(file);
file->truncate(0);
file->flush();
}

void SsdFile::truncateCheckpointFile() {
VELOX_CHECK_NOT_NULL(checkpointWriteFile_);
checkpointWriteFile_->truncate(0);
checkpointWriteFile_->flush();
}

void SsdFile::deleteEvictLogFile() {
VELOX_CHECK_NOT_NULL(evictLogWriteFile_);
evictLogWriteFile_->close();
evictLogWriteFile_.reset();
const auto evictLogFilePath = getEvictLogFilePath();
if (fs_->exists(evictLogFilePath)) {
fs_->remove(evictLogFilePath);
}
}

void SsdFile::deleteCheckpointFile() {
VELOX_CHECK_NOT_NULL(checkpointWriteFile_);
void SsdFile::deleteFile(std::unique_ptr<WriteFile> file) {
VELOX_CHECK_NOT_NULL(file);
const auto filePath = file->getName();
try {
checkpointWriteFile_->close();
checkpointWriteFile_.reset();
const auto checkpointFilePath = getCheckpointFilePath();
if (fs_->exists(checkpointFilePath)) {
fs_->remove(checkpointFilePath);
file->close();
file.reset();
if (fs_->exists(filePath)) {
fs_->remove(filePath);
}
} catch (const std::exception& e) {
++stats_.deleteCheckpointErrors;
VELOX_SSD_CACHE_LOG(ERROR) << "Error in deleting checkpoint: " << e.what();
++stats_.deleteMetaFileErrors;
VELOX_SSD_CACHE_LOG(ERROR)
<< fmt::format("Error in deleting file {}: {}", filePath, e.what());
}
}

Expand Down Expand Up @@ -776,7 +752,7 @@ void SsdFile::checkpoint(bool force) {

try {
VELOX_CHECK_NOT_NULL(checkpointWriteFile_);
truncateCheckpointFile();
truncateFile(checkpointWriteFile_.get());
// The checkpoint state file contains:
// int32_t The 4 bytes of checkpoint version,
// int32_t maxRegions,
Expand Down Expand Up @@ -859,34 +835,22 @@ void SsdFile::initializeCheckpoint() {
return;
}

bool hasCheckpoint = true;
std::unique_ptr<common::FileInputStream> checkpointInputStream;
filesystems::FileOptions writeFileOptions;
writeFileOptions.shouldThrowOnFileAlreadyExists = false;

const auto checkpointPath = getCheckpointFilePath();
const auto checkpointPath = checkpointFilePath();
try {
checkpointWriteFile_ =
fs_->openFileForWrite(checkpointPath, writeFileOptions);

auto checkpointReadFile = fs_->openFileForRead(checkpointPath);
checkpointInputStream = std::make_unique<common::FileInputStream>(
std::move(checkpointReadFile),
1 << 20,
memory::memoryManager()->cachePool());
} catch (std::exception& e) {
hasCheckpoint = false;
++stats_.openCheckpointErrors;
VELOX_SSD_CACHE_LOG(WARNING) << fmt::format(
"Error openning checkpoint file {}: Starting shard {} without checkpoint, with checksum write {}, read verification {}, checkpoint file {}",
e.what(),
shardId_,
checksumEnabled_ ? "enabled" : "disabled",
checksumReadVerificationEnabled_ ? "enabled" : "disabled",
checkpointPath);
} catch (const std::exception& e) {
++stats_.writeCheckpointErrors;
VELOX_SSD_CACHE_LOG(ERROR) << fmt::format(
"Could not initilize checkpoint file {} for writing: {}: ",
checkpointPath,
e.what());
}

const auto logPath = getEvictLogFilePath();
const auto logPath = evictLogFilePath();
try {
evictLogWriteFile_ = fs_->openFileForWrite(logPath, writeFileOptions);
} catch (std::exception& e) {
Expand All @@ -896,9 +860,7 @@ void SsdFile::initializeCheckpoint() {
}

try {
if (hasCheckpoint) {
readCheckpoint(std::move(checkpointInputStream));
}
readCheckpoint();
} catch (const std::exception& e) {
++stats_.readCheckpointErrors;
try {
Expand Down Expand Up @@ -995,15 +957,30 @@ std::vector<T> readVector(common::FileInputStream* stream, int32_t size) {
}
} // namespace

void SsdFile::readCheckpoint(std::unique_ptr<common::FileInputStream> stream) {
void SsdFile::readCheckpoint() {
const auto checkpointPath = checkpointFilePath();
std::unique_ptr<common::FileInputStream> stream;
try {
auto checkpointReadFile = fs_->openFileForRead(checkpointPath);
stream = std::make_unique<common::FileInputStream>(
std::move(checkpointReadFile),
1 << 20,
memory::memoryManager()->cachePool());
} catch (std::exception& e) {
++stats_.openCheckpointErrors;
VELOX_SSD_CACHE_LOG(WARNING)
<< fmt::format("Error openning checkpoint file {}: ", e.what());
return;
}

const auto versionMagic = readString(stream.get(), 4);
const auto checkpoinHasChecksum =
isChecksumEnabledOnCheckpointVersion(versionMagic);
if (checksumEnabled_ && !checkpoinHasChecksum) {
VELOX_SSD_CACHE_LOG(WARNING) << fmt::format(
"Starting shard {} without checkpoint: checksum is enabled but the checkpoint was made without checksum, so skip the checkpoint recovery, checkpoint file {}",
shardId_,
getCheckpointFilePath());
checkpointPath);
return;
}

Expand All @@ -1026,7 +1003,7 @@ void SsdFile::readCheckpoint(std::unique_ptr<common::FileInputStream> stream) {
idMap[id] = StringIdLease(fileIds(), id, name);
}

const auto logPath = getEvictLogFilePath();
const auto logPath = evictLogFilePath();
const auto evictLogReadFile = fs_->openFileForRead(logPath);
const auto logSize = evictLogReadFile->size();
std::vector<uint32_t> evicted(logSize / sizeof(uint32_t));
Expand Down Expand Up @@ -1106,7 +1083,7 @@ void SsdFile::readCheckpoint(std::unique_ptr<common::FileInputStream> stream) {
writableRegions_.size(),
checksumEnabled_ ? "enabled" : "disabled",
checksumReadVerificationEnabled_ ? "enabled" : "disabled",
getCheckpointFilePath());
checkpointFilePath());
}

} // namespace facebook::velox::cache
33 changes: 13 additions & 20 deletions velox/common/caching/SsdFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ struct SsdCacheStats {
openFileErrors = tsanAtomicValue(other.openFileErrors);
openCheckpointErrors = tsanAtomicValue(other.openCheckpointErrors);
openLogErrors = tsanAtomicValue(other.openLogErrors);
deleteCheckpointErrors = tsanAtomicValue(other.deleteCheckpointErrors);
deleteMetaFileErrors = tsanAtomicValue(other.deleteMetaFileErrors);
growFileErrors = tsanAtomicValue(other.growFileErrors);
writeSsdErrors = tsanAtomicValue(other.writeSsdErrors);
writeSsdDropped = tsanAtomicValue(other.writeSsdDropped);
Expand Down Expand Up @@ -192,8 +192,8 @@ struct SsdCacheStats {
result.openCheckpointErrors =
openCheckpointErrors - other.openCheckpointErrors;
result.openLogErrors = openLogErrors - other.openLogErrors;
result.deleteCheckpointErrors =
deleteCheckpointErrors - other.deleteCheckpointErrors;
result.deleteMetaFileErrors =
deleteMetaFileErrors - other.deleteMetaFileErrors;
result.growFileErrors = growFileErrors - other.growFileErrors;
result.writeSsdErrors = writeSsdErrors - other.writeSsdErrors;
result.writeSsdDropped = writeSsdDropped - other.writeSsdDropped;
Expand Down Expand Up @@ -232,7 +232,7 @@ struct SsdCacheStats {
tsan_atomic<uint32_t> openFileErrors{0};
tsan_atomic<uint32_t> openCheckpointErrors{0};
tsan_atomic<uint32_t> openLogErrors{0};
tsan_atomic<uint32_t> deleteCheckpointErrors{0};
tsan_atomic<uint32_t> deleteMetaFileErrors{0};
tsan_atomic<uint32_t> growFileErrors{0};
tsan_atomic<uint32_t> writeSsdErrors{0};
tsan_atomic<uint32_t> writeSsdDropped{0};
Expand Down Expand Up @@ -368,12 +368,12 @@ class SsdFile {
}

/// Returns the eviction log file path.
std::string getEvictLogFilePath() const {
std::string evictLogFilePath() const {
return fileName_ + kLogExtension;
}

/// Returns the checkpoint file path.
std::string getCheckpointFilePath() const {
std::string checkpointFilePath() const {
return fileName_ + kCheckpointExtension;
}

Expand Down Expand Up @@ -440,10 +440,9 @@ class SsdFile {
// Verifies that 'entry' has the data at 'run'.
void verifyWrite(AsyncDataCacheEntry& entry, SsdRun run);

// Reads a checkpoint state file and sets 'this' accordingly if read is
// successful. Return true for successful read. A failed read deletes the
// checkpoint and leaves the log truncated open.
void readCheckpoint(std::unique_ptr<common::FileInputStream> stream);
// Reads a checkpoint file and sets 'this' accordingly if read succeeds. A
// failed read deletes the checkpoint and leaves the truncated log open.
void readCheckpoint();

// Logs an error message, deletes the checkpoint and stop making new
// checkpoints.
Expand Down Expand Up @@ -488,17 +487,11 @@ class SsdFile {
// file system not supporting cow feature.
void disableFileCow();

// Truncates the eviction log file to 0.
void truncateEvictLogFile();
// Truncates the given file to 0.
void truncateFile(WriteFile* file);

// Truncates the checkpoint file to 0.
void truncateCheckpointFile();

// Deletes the eviction log file if it exists.
void deleteEvictLogFile();

// Deletes the checkpoint file if it exists.
void deleteCheckpointFile();
// Deletes the given file if it exists.
void deleteFile(std::unique_ptr<WriteFile> file);

// Allocates 'kCheckpointBufferSize' buffer from cache memory pool for
// checkpointing.
Expand Down
2 changes: 1 addition & 1 deletion velox/common/caching/tests/CacheTestUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class SsdCacheTestHelper {
uint64_t totalEvictionLogFilesSize() {
uint64_t size = 0;
for (auto& file : ssdCache_->files_) {
std::filesystem::path p{file->getEvictLogFilePath()};
std::filesystem::path p{file->evictLogFilePath()};
size += std::filesystem::file_size(p);
}
return size;
Expand Down
2 changes: 1 addition & 1 deletion velox/common/caching/tests/SsdFileTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ TEST_F(SsdFileTest, fileCorruption) {
// Corrupt the Checkpoint file. Cache cannot be recovered. All entries are
// lost.
ssdFile_->checkpoint(true);
corruptSsdFile(ssdFile_->getCheckpointFilePath());
corruptSsdFile(ssdFile_->checkpointFilePath());
stats.clear();
ssdFile_->updateStats(stats);
EXPECT_EQ(stats.readCheckpointErrors, 0);
Expand Down
6 changes: 5 additions & 1 deletion velox/common/file/File.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ class WriteFile {
/// be needed to get the exact size written, and this should be able to be
/// called after the file close.
virtual uint64_t size() const = 0;

virtual const std::string getName() const {
VELOX_NYI("{} is not implemented", __FUNCTION__);
}
};

// We currently do a simple implementation for the in-memory files
Expand Down Expand Up @@ -349,7 +353,7 @@ class LocalWriteFile final : public WriteFile {
return size_;
}

const std::string& getName() const {
const std::string getName() const final {
return path_;
}

Expand Down
Loading

0 comments on commit 066154f

Please sign in to comment.