From be5c265d3c2a773d1971ff27e8d74694751e629a Mon Sep 17 00:00:00 2001 From: Zac Wen Date: Mon, 13 Jan 2025 11:45:12 -0800 Subject: [PATCH] refactor(cache): Clean up redundant code (#12024) Summary: Refactor read checkpoint code and consolidate duplicate file clean up. Reviewed By: xiaoxmeng Differential Revision: D67868731 --- .github/workflows/linux-build-base.yml | 3 +- velox/common/base/Counters.cpp | 4 +- velox/common/base/Counters.h | 4 +- velox/common/base/PeriodicStatsReporter.cpp | 4 +- velox/common/base/tests/StatsReporterTest.cpp | 6 +- velox/common/caching/SsdFile.cpp | 121 +++++++----------- velox/common/caching/SsdFile.h | 33 ++--- velox/common/caching/tests/CacheTestUtil.h | 2 +- velox/common/caching/tests/SsdFileTest.cpp | 2 +- velox/common/file/File.h | 6 +- velox/common/file/tests/FaultyFile.cpp | 8 ++ velox/common/file/tests/FaultyFile.h | 6 +- 12 files changed, 90 insertions(+), 109 deletions(-) diff --git a/.github/workflows/linux-build-base.yml b/.github/workflows/linux-build-base.yml index 5a4e78d6a1e4..262896d5ffb2 100644 --- a/.github/workflows/linux-build-base.yml +++ b/.github/workflows/linux-build-base.yml @@ -86,7 +86,6 @@ jobs: run: | EXTRA_CMAKE_FLAGS=( "-DVELOX_ENABLE_BENCHMARKS=ON" - "-DVELOX_ENABLE_EXAMPLES=ON" "-DVELOX_ENABLE_ARROW=ON" "-DVELOX_ENABLE_PARQUET=ON" "-DVELOX_ENABLE_HDFS=ON" @@ -161,7 +160,7 @@ jobs: VELOX_DEPENDENCY_SOURCE: BUNDLED ICU_SOURCE: SYSTEM MAKEFLAGS: "NUM_THREADS=8 MAX_HIGH_MEM_JOBS=4 MAX_LINK_JOBS=3" - EXTRA_CMAKE_FLAGS: "-DVELOX_ENABLE_ARROW=ON -DVELOX_ENABLE_PARQUET=ON -DVELOX_ENABLE_EXAMPLES=ON" + EXTRA_CMAKE_FLAGS: "-DVELOX_ENABLE_ARROW=ON -DVELOX_ENABLE_PARQUET=ON" run: | if [[ "${USE_CLANG}" = "true" ]]; then export CC=/usr/bin/clang-15; export CXX=/usr/bin/clang++-15; fi make debug diff --git a/velox/common/base/Counters.cpp b/velox/common/base/Counters.cpp index 1ff64da4232c..e4bfb17c2c05 100644 --- a/velox/common/base/Counters.cpp +++ b/velox/common/base/Counters.cpp @@ -219,9 +219,9 @@ void 
registerVeloxMetrics() { // Total number of SSD evict log file open errors. DEFINE_METRIC(kMetricSsdCacheOpenLogErrors, facebook::velox::StatType::SUM); - // Total number of errors while deleting SSD checkpoint files. + // Total number of errors while deleting SSD checkpoint/evictlog files. DEFINE_METRIC( - kMetricSsdCacheDeleteCheckpointErrors, facebook::velox::StatType::SUM); + kMetricSsdCacheMetaFileDeleteErrors, facebook::velox::StatType::SUM); // Total number of errors while growing SSD cache files. DEFINE_METRIC(kMetricSsdCacheGrowFileErrors, facebook::velox::StatType::SUM); diff --git a/velox/common/base/Counters.h b/velox/common/base/Counters.h index 076ce4ec68c8..9133d27514f8 100644 --- a/velox/common/base/Counters.h +++ b/velox/common/base/Counters.h @@ -289,8 +289,8 @@ constexpr folly::StringPiece kMetricSsdCacheOpenCheckpointErrors{ constexpr folly::StringPiece kMetricSsdCacheOpenLogErrors{ "velox.ssd_cache_open_log_errors"}; -constexpr folly::StringPiece kMetricSsdCacheDeleteCheckpointErrors{ - "velox.ssd_cache_delete_checkpoint_errors"}; +constexpr folly::StringPiece kMetricSsdCacheMetaFileDeleteErrors{ + "velox.ssd_cache_delete_meta_file_errors"}; constexpr folly::StringPiece kMetricSsdCacheGrowFileErrors{ "velox.ssd_cache_grow_file_errors"}; diff --git a/velox/common/base/PeriodicStatsReporter.cpp b/velox/common/base/PeriodicStatsReporter.cpp index 5bd8781a6b08..802ed2037e40 100644 --- a/velox/common/base/PeriodicStatsReporter.cpp +++ b/velox/common/base/PeriodicStatsReporter.cpp @@ -198,8 +198,8 @@ void PeriodicStatsReporter::reportCacheStats() { REPORT_IF_NOT_ZERO( kMetricSsdCacheOpenLogErrors, deltaSsdStats.openLogErrors); REPORT_IF_NOT_ZERO( - kMetricSsdCacheDeleteCheckpointErrors, - deltaSsdStats.deleteCheckpointErrors); + kMetricSsdCacheMetaFileDeleteErrors, + deltaSsdStats.deleteMetaFileErrors); REPORT_IF_NOT_ZERO( kMetricSsdCacheGrowFileErrors, deltaSsdStats.growFileErrors); REPORT_IF_NOT_ZERO( diff --git 
a/velox/common/base/tests/StatsReporterTest.cpp b/velox/common/base/tests/StatsReporterTest.cpp index 7d6f08fbe74c..87fadae28665 100644 --- a/velox/common/base/tests/StatsReporterTest.cpp +++ b/velox/common/base/tests/StatsReporterTest.cpp @@ -487,7 +487,7 @@ TEST_F(PeriodicStatsReporterTest, basic) { ASSERT_EQ(counterMap.count(kMetricSsdCacheOpenSsdErrors.str()), 0); ASSERT_EQ(counterMap.count(kMetricSsdCacheOpenCheckpointErrors.str()), 0); ASSERT_EQ(counterMap.count(kMetricSsdCacheOpenLogErrors.str()), 0); - ASSERT_EQ(counterMap.count(kMetricSsdCacheDeleteCheckpointErrors.str()), 0); + ASSERT_EQ(counterMap.count(kMetricSsdCacheMetaFileDeleteErrors.str()), 0); ASSERT_EQ(counterMap.count(kMetricSsdCacheGrowFileErrors.str()), 0); ASSERT_EQ(counterMap.count(kMetricSsdCacheWriteSsdErrors.str()), 0); ASSERT_EQ(counterMap.count(kMetricSsdCacheWriteSsdDropped.str()), 0); @@ -520,7 +520,7 @@ TEST_F(PeriodicStatsReporterTest, basic) { newSsdStats->openFileErrors = 10; newSsdStats->openCheckpointErrors = 10; newSsdStats->openLogErrors = 10; - newSsdStats->deleteCheckpointErrors = 10; + newSsdStats->deleteMetaFileErrors = 10; newSsdStats->growFileErrors = 10; newSsdStats->writeSsdErrors = 10; newSsdStats->writeSsdDropped = 10; @@ -570,7 +570,7 @@ TEST_F(PeriodicStatsReporterTest, basic) { ASSERT_EQ(counterMap.count(kMetricSsdCacheOpenSsdErrors.str()), 1); ASSERT_EQ(counterMap.count(kMetricSsdCacheOpenCheckpointErrors.str()), 1); ASSERT_EQ(counterMap.count(kMetricSsdCacheOpenLogErrors.str()), 1); - ASSERT_EQ(counterMap.count(kMetricSsdCacheDeleteCheckpointErrors.str()), 1); + ASSERT_EQ(counterMap.count(kMetricSsdCacheMetaFileDeleteErrors.str()), 1); ASSERT_EQ(counterMap.count(kMetricSsdCacheGrowFileErrors.str()), 1); ASSERT_EQ(counterMap.count(kMetricSsdCacheWriteSsdErrors.str()), 1); ASSERT_EQ(counterMap.count(kMetricSsdCacheWriteSsdDropped.str()), 1); diff --git a/velox/common/caching/SsdFile.cpp b/velox/common/caching/SsdFile.cpp index a876a091e3d5..86bd9e9a14e1 100644 --- 
a/velox/common/caching/SsdFile.cpp +++ b/velox/common/caching/SsdFile.cpp @@ -32,12 +32,8 @@ #include #include #include -#include #include -#include "velox/common/base/Counters.h" -#include "velox/common/base/StatsReporter.h" - DEFINE_bool(ssd_odirect, true, "Use O_DIRECT for SSD cache IO"); DEFINE_bool(ssd_verify_write, false, "Read back data after writing to SSD"); @@ -521,7 +517,7 @@ void SsdFile::updateStats(SsdCacheStats& stats) const { stats.openFileErrors += stats_.openFileErrors; stats.openCheckpointErrors += stats_.openCheckpointErrors; stats.openLogErrors += stats_.openLogErrors; - stats.deleteCheckpointErrors += stats_.deleteCheckpointErrors; + stats.deleteMetaFileErrors += stats_.deleteMetaFileErrors; stats.growFileErrors += stats_.growFileErrors; stats.writeSsdErrors += stats_.writeSsdErrors; stats.writeCheckpointErrors += stats_.writeCheckpointErrors; @@ -638,57 +634,37 @@ void SsdFile::deleteCheckpoint(bool keepLog) { } if (evictLogWriteFile_ != nullptr) { - try { - if (keepLog) { - truncateEvictLogFile(); - } else { - deleteEvictLogFile(); - } - } catch (const std::exception& e) { - ++stats_.deleteCheckpointErrors; - VELOX_SSD_CACHE_LOG(ERROR) << "Error in deleting evictLog: " << e.what(); + if (keepLog) { + truncateFile(evictLogWriteFile_.get()); + } else { + deleteFile(std::move(evictLogWriteFile_)); } } if (checkpointWriteFile_ != nullptr) { - deleteCheckpointFile(); + deleteFile(std::move(checkpointWriteFile_)); } } -void SsdFile::truncateEvictLogFile() { - VELOX_CHECK_NOT_NULL(evictLogWriteFile_); - evictLogWriteFile_->truncate(0); - evictLogWriteFile_->flush(); +void SsdFile::truncateFile(WriteFile* file) { + VELOX_CHECK_NOT_NULL(file); + file->truncate(0); + file->flush(); } -void SsdFile::truncateCheckpointFile() { - VELOX_CHECK_NOT_NULL(checkpointWriteFile_); - checkpointWriteFile_->truncate(0); - checkpointWriteFile_->flush(); -} - -void SsdFile::deleteEvictLogFile() { - VELOX_CHECK_NOT_NULL(evictLogWriteFile_); - 
evictLogWriteFile_->close(); - evictLogWriteFile_.reset(); - const auto evictLogFilePath = getEvictLogFilePath(); - if (fs_->exists(evictLogFilePath)) { - fs_->remove(evictLogFilePath); - } -} - -void SsdFile::deleteCheckpointFile() { - VELOX_CHECK_NOT_NULL(checkpointWriteFile_); +void SsdFile::deleteFile(std::unique_ptr file) { + VELOX_CHECK_NOT_NULL(file); + const auto filePath = file->getName(); try { - checkpointWriteFile_->close(); - checkpointWriteFile_.reset(); - const auto checkpointFilePath = getCheckpointFilePath(); - if (fs_->exists(checkpointFilePath)) { - fs_->remove(checkpointFilePath); + file->close(); + file.reset(); + if (fs_->exists(filePath)) { + fs_->remove(filePath); } } catch (const std::exception& e) { - ++stats_.deleteCheckpointErrors; - VELOX_SSD_CACHE_LOG(ERROR) << "Error in deleting checkpoint: " << e.what(); + ++stats_.deleteMetaFileErrors; + VELOX_SSD_CACHE_LOG(ERROR) + << fmt::format("Error in deleting file {}: {}", filePath, e.what()); } } @@ -776,7 +752,7 @@ void SsdFile::checkpoint(bool force) { try { VELOX_CHECK_NOT_NULL(checkpointWriteFile_); - truncateCheckpointFile(); + truncateFile(checkpointWriteFile_.get()); // The checkpoint state file contains: // int32_t The 4 bytes of checkpoint version, // int32_t maxRegions, @@ -859,34 +835,22 @@ void SsdFile::initializeCheckpoint() { return; } - bool hasCheckpoint = true; - std::unique_ptr checkpointInputStream; filesystems::FileOptions writeFileOptions; writeFileOptions.shouldThrowOnFileAlreadyExists = false; - const auto checkpointPath = getCheckpointFilePath(); + const auto checkpointPath = checkpointFilePath(); try { checkpointWriteFile_ = fs_->openFileForWrite(checkpointPath, writeFileOptions); - - auto checkpointReadFile = fs_->openFileForRead(checkpointPath); - checkpointInputStream = std::make_unique( - std::move(checkpointReadFile), - 1 << 20, - memory::memoryManager()->cachePool()); - } catch (std::exception& e) { - hasCheckpoint = false; - ++stats_.openCheckpointErrors; - 
VELOX_SSD_CACHE_LOG(WARNING) << fmt::format( - "Error openning checkpoint file {}: Starting shard {} without checkpoint, with checksum write {}, read verification {}, checkpoint file {}", - e.what(), - shardId_, - checksumEnabled_ ? "enabled" : "disabled", - checksumReadVerificationEnabled_ ? "enabled" : "disabled", - checkpointPath); + } catch (const std::exception& e) { + ++stats_.writeCheckpointErrors; + VELOX_SSD_CACHE_LOG(ERROR) << fmt::format( + "Could not initialize checkpoint file {} for writing: {}", + checkpointPath, + e.what()); } - const auto logPath = getEvictLogFilePath(); + const auto logPath = evictLogFilePath(); try { evictLogWriteFile_ = fs_->openFileForWrite(logPath, writeFileOptions); } catch (std::exception& e) { @@ -896,9 +860,7 @@ void SsdFile::initializeCheckpoint() { } try { - if (hasCheckpoint) { - readCheckpoint(std::move(checkpointInputStream)); - } + readCheckpoint(); } catch (const std::exception& e) { ++stats_.readCheckpointErrors; try { @@ -995,7 +957,22 @@ std::vector readVector(common::FileInputStream* stream, int32_t size) { } } // namespace -void SsdFile::readCheckpoint(std::unique_ptr<common::FileInputStream> stream) { +void SsdFile::readCheckpoint() { + const auto checkpointPath = checkpointFilePath(); + std::unique_ptr<common::FileInputStream> stream; + try { + auto checkpointReadFile = fs_->openFileForRead(checkpointPath); + stream = std::make_unique<common::FileInputStream>( + std::move(checkpointReadFile), + 1 << 20, + memory::memoryManager()->cachePool()); + } catch (const std::exception& e) { + ++stats_.openCheckpointErrors; + VELOX_SSD_CACHE_LOG(WARNING) << fmt::format( + "Error opening checkpoint file {}: {}", checkpointPath, e.what()); + return; + } + const auto versionMagic = readString(stream.get(), 4); const auto checkpoinHasChecksum = isChecksumEnabledOnCheckpointVersion(versionMagic); @@ -1003,7 +980,7 @@ void SsdFile::readCheckpoint(std::unique_ptr stream) { VELOX_SSD_CACHE_LOG(WARNING) << fmt::format( "Starting shard {} without checkpoint: checksum is enabled but the checkpoint was made without checksum, so 
skip the checkpoint recovery, checkpoint file {}", shardId_, - getCheckpointFilePath()); + checkpointPath); return; } @@ -1026,7 +1003,7 @@ void SsdFile::readCheckpoint(std::unique_ptr stream) { idMap[id] = StringIdLease(fileIds(), id, name); } - const auto logPath = getEvictLogFilePath(); + const auto logPath = evictLogFilePath(); const auto evictLogReadFile = fs_->openFileForRead(logPath); const auto logSize = evictLogReadFile->size(); std::vector evicted(logSize / sizeof(uint32_t)); @@ -1106,7 +1083,7 @@ void SsdFile::readCheckpoint(std::unique_ptr stream) { writableRegions_.size(), checksumEnabled_ ? "enabled" : "disabled", checksumReadVerificationEnabled_ ? "enabled" : "disabled", - getCheckpointFilePath()); + checkpointFilePath()); } } // namespace facebook::velox::cache diff --git a/velox/common/caching/SsdFile.h b/velox/common/caching/SsdFile.h index adc54fa437a7..c2c260270201 100644 --- a/velox/common/caching/SsdFile.h +++ b/velox/common/caching/SsdFile.h @@ -164,7 +164,7 @@ struct SsdCacheStats { openFileErrors = tsanAtomicValue(other.openFileErrors); openCheckpointErrors = tsanAtomicValue(other.openCheckpointErrors); openLogErrors = tsanAtomicValue(other.openLogErrors); - deleteCheckpointErrors = tsanAtomicValue(other.deleteCheckpointErrors); + deleteMetaFileErrors = tsanAtomicValue(other.deleteMetaFileErrors); growFileErrors = tsanAtomicValue(other.growFileErrors); writeSsdErrors = tsanAtomicValue(other.writeSsdErrors); writeSsdDropped = tsanAtomicValue(other.writeSsdDropped); @@ -192,8 +192,8 @@ struct SsdCacheStats { result.openCheckpointErrors = openCheckpointErrors - other.openCheckpointErrors; result.openLogErrors = openLogErrors - other.openLogErrors; - result.deleteCheckpointErrors = - deleteCheckpointErrors - other.deleteCheckpointErrors; + result.deleteMetaFileErrors = + deleteMetaFileErrors - other.deleteMetaFileErrors; result.growFileErrors = growFileErrors - other.growFileErrors; result.writeSsdErrors = writeSsdErrors - other.writeSsdErrors; 
result.writeSsdDropped = writeSsdDropped - other.writeSsdDropped; @@ -232,7 +232,7 @@ struct SsdCacheStats { tsan_atomic openFileErrors{0}; tsan_atomic openCheckpointErrors{0}; tsan_atomic openLogErrors{0}; - tsan_atomic deleteCheckpointErrors{0}; + tsan_atomic deleteMetaFileErrors{0}; tsan_atomic growFileErrors{0}; tsan_atomic writeSsdErrors{0}; tsan_atomic writeSsdDropped{0}; @@ -368,12 +368,12 @@ class SsdFile { } /// Returns the eviction log file path. - std::string getEvictLogFilePath() const { + std::string evictLogFilePath() const { return fileName_ + kLogExtension; } /// Returns the checkpoint file path. - std::string getCheckpointFilePath() const { + std::string checkpointFilePath() const { return fileName_ + kCheckpointExtension; } @@ -440,10 +440,9 @@ class SsdFile { // Verifies that 'entry' has the data at 'run'. void verifyWrite(AsyncDataCacheEntry& entry, SsdRun run); - // Reads a checkpoint state file and sets 'this' accordingly if read is - // successful. Return true for successful read. A failed read deletes the - // checkpoint and leaves the log truncated open. - void readCheckpoint(std::unique_ptr stream); + // Reads a checkpoint file and sets 'this' accordingly if read succeeds. A + // failed read deletes the checkpoint and leaves the truncated log open. + void readCheckpoint(); // Logs an error message, deletes the checkpoint and stop making new // checkpoints. @@ -488,17 +487,11 @@ class SsdFile { // file system not supporting cow feature. void disableFileCow(); - // Truncates the eviction log file to 0. - void truncateEvictLogFile(); + // Truncates the given file to 0. + void truncateFile(WriteFile* file); - // Truncates the checkpoint file to 0. - void truncateCheckpointFile(); - - // Deletes the eviction log file if it exists. - void deleteEvictLogFile(); - - // Deletes the checkpoint file if it exists. - void deleteCheckpointFile(); + // Deletes the given file if it exists. 
+ void deleteFile(std::unique_ptr file); // Allocates 'kCheckpointBufferSize' buffer from cache memory pool for // checkpointing. diff --git a/velox/common/caching/tests/CacheTestUtil.h b/velox/common/caching/tests/CacheTestUtil.h index bcf08755a330..731da4e876a8 100644 --- a/velox/common/caching/tests/CacheTestUtil.h +++ b/velox/common/caching/tests/CacheTestUtil.h @@ -111,7 +111,7 @@ class SsdCacheTestHelper { uint64_t totalEvictionLogFilesSize() { uint64_t size = 0; for (auto& file : ssdCache_->files_) { - std::filesystem::path p{file->getEvictLogFilePath()}; + std::filesystem::path p{file->evictLogFilePath()}; size += std::filesystem::file_size(p); } return size; diff --git a/velox/common/caching/tests/SsdFileTest.cpp b/velox/common/caching/tests/SsdFileTest.cpp index e9a0b6bc7a18..11eba733db77 100644 --- a/velox/common/caching/tests/SsdFileTest.cpp +++ b/velox/common/caching/tests/SsdFileTest.cpp @@ -554,7 +554,7 @@ TEST_F(SsdFileTest, fileCorruption) { // Corrupt the Checkpoint file. Cache cannot be recovered. All entries are // lost. ssdFile_->checkpoint(true); - corruptSsdFile(ssdFile_->getCheckpointFilePath()); + corruptSsdFile(ssdFile_->checkpointFilePath()); stats.clear(); ssdFile_->updateStats(stats); EXPECT_EQ(stats.readCheckpointErrors, 0); diff --git a/velox/common/file/File.h b/velox/common/file/File.h index 99bdaabc1533..63e25953af52 100644 --- a/velox/common/file/File.h +++ b/velox/common/file/File.h @@ -191,6 +191,10 @@ class WriteFile { /// be needed to get the exact size written, and this should be able to be /// called after the file close. 
virtual uint64_t size() const = 0; + + virtual const std::string& getName() const { + VELOX_NYI("{} is not implemented", __FUNCTION__); + } }; // We currently do a simple implementation for the in-memory files @@ -349,7 +353,7 @@ class LocalWriteFile final : public WriteFile { return size_; } - const std::string& getName() const { + const std::string& getName() const final { return path_; } diff --git a/velox/common/file/tests/FaultyFile.cpp b/velox/common/file/tests/FaultyFile.cpp index a694afdb5302..a5984113f901 100644 --- a/velox/common/file/tests/FaultyFile.cpp +++ b/velox/common/file/tests/FaultyFile.cpp @@ -134,4 +134,12 @@ std::unordered_map FaultyWriteFile::getAttributes() void FaultyWriteFile::close() { delegatedFile_->close(); } + +uint64_t FaultyWriteFile::size() const { + return delegatedFile_->size(); +} + +const std::string& FaultyWriteFile::getName() const { + return delegatedFile_->getName(); +} } // namespace facebook::velox::tests::utils diff --git a/velox/common/file/tests/FaultyFile.h b/velox/common/file/tests/FaultyFile.h index dd54c6101d27..6d73eaebe625 100644 --- a/velox/common/file/tests/FaultyFile.h +++ b/velox/common/file/tests/FaultyFile.h @@ -103,9 +103,9 @@ class FaultyWriteFile : public WriteFile { void close() override; - uint64_t size() const override { - return delegatedFile_->size(); - } + uint64_t size() const override; + + const std::string& getName() const override; private: const std::string path_;