Skip to content

Commit

Permalink
Handle ssd cache region score overflow (facebookincubator#9709)
Browse files Browse the repository at this point in the history
Summary:
When SsdFileTracker looks for regions to evict, it calculates the average
score of all regions and picks regions with no pins and whose scores are
no larger than the average as candidates. It first adds up all the scores
and then divides the sum by the number of regions to calculate the average.
However, if the scores are large, the sum can overflow to a small
integer, leading to a scenario where none of the individual region
scores is less than or equal to the average. When this happens, even when
all regions are unpinned, the SSD cache file will not be able to find
any candidates, blocking all cache writes.
Therefore, overflow needs to be handled properly.

Pull Request resolved: facebookincubator#9709

Reviewed By: xiaoxmeng

Differential Revision: D56965266

Pulled By: zacw7

fbshipit-source-id: 414e67975568618a0d6ba48b947c0de73cb8103c
  • Loading branch information
zacw7 authored and facebook-github-bot committed May 8, 2024
1 parent 49b1246 commit 5c4903f
Show file tree
Hide file tree
Showing 15 changed files with 91 additions and 33 deletions.
4 changes: 2 additions & 2 deletions velox/benchmarks/tpch/TpchBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -401,14 +401,14 @@ class TpchBenchmark {
#endif

if (cache_) {
cache_->clear();
cache_->testingClear();
}
}
if (FLAGS_clear_ssd_cache) {
if (cache_) {
auto ssdCache = cache_->ssdCache();
if (ssdCache) {
ssdCache->clear();
ssdCache->testingClear();
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion velox/common/caching/AsyncDataCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,7 @@ CacheStats AsyncDataCache::refreshStats() const {
return stats;
}

void AsyncDataCache::clear() {
void AsyncDataCache::testingClear() {
for (auto& shard : shards_) {
memory::Allocation unused;
shard->evict(std::numeric_limits<uint64_t>::max(), true, 0, unused);
Expand Down
2 changes: 1 addition & 1 deletion velox/common/caching/AsyncDataCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,7 @@ class AsyncDataCache : public memory::Cache {
}

// Drops all unpinned entries. Pins stay valid.
void clear();
void testingClear();

// Saves all entries with 'ssdSaveable_' to 'ssdCache_'.
void saveToSsd();
Expand Down
4 changes: 2 additions & 2 deletions velox/common/caching/SsdCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,9 @@ SsdCacheStats SsdCache::stats() const {
return stats;
}

void SsdCache::clear() {
void SsdCache::testingClear() {
for (auto& file : files_) {
file->clear();
file->testingClear();
}
}

Expand Down
2 changes: 1 addition & 1 deletion velox/common/caching/SsdCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class SsdCache {
/// Drops all entries. Outstanding pins become invalid but reading them will
/// mostly succeed since the files will not be rewritten until new content is
/// stored.
void clear();
void testingClear();

/// Deletes backing files. Used in testing.
void testingDeleteFiles();
Expand Down
7 changes: 4 additions & 3 deletions velox/common/caching/SsdFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -510,13 +510,14 @@ void SsdFile::updateStats(SsdCacheStats& stats) const {
stats.readCheckpointErrors += stats_.readCheckpointErrors;
}

void SsdFile::clear() {
void SsdFile::testingClear() {
std::lock_guard<std::shared_mutex> l(mutex_);
entries_.clear();
std::fill(regionSizes_.begin(), regionSizes_.end(), 0);
std::fill(erasedRegionSizes_.begin(), erasedRegionSizes_.end(), 0);
writableRegions_.resize(numRegions_);
std::iota(writableRegions_.begin(), writableRegions_.end(), 0);
tracker_.testingClear();
}

void SsdFile::deleteFile() {
Expand Down Expand Up @@ -863,8 +864,8 @@ void SsdFile::readCheckpoint(std::ifstream& state) {
maxRegions_,
"Trying to start from checkpoint with a different capacity");
numRegions_ = readNumber<int32_t>(state);
std::vector<int64_t> scores(maxRegions);
state.read(asChar(scores.data()), maxRegions_ * sizeof(uint64_t));
std::vector<double> scores(maxRegions);
state.read(asChar(scores.data()), maxRegions_ * sizeof(double));
std::unordered_map<uint64_t, StringIdLease> idMap;
for (;;) {
const auto id = readNumber<uint64_t>(state);
Expand Down
6 changes: 5 additions & 1 deletion velox/common/caching/SsdFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ class SsdFile {
void updateStats(SsdCacheStats& stats) const;

/// Resets this' to a post-construction empty state. See SsdCache::clear().
void clear();
void testingClear();

// Deletes the backing file. Used in testing.
void deleteFile();
Expand All @@ -277,6 +277,10 @@ class SsdFile {
return fileName_;
}

std::vector<double> testingCopyScores() {
return tracker_.copyScores();
}

private:
// 4 first bytes of a checkpoint file. Allows distinguishing between format
// versions.
Expand Down
6 changes: 3 additions & 3 deletions velox/common/caching/SsdFileTracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ void SsdFileTracker::fileTouched(int32_t totalEntries) {
}

void SsdFileTracker::regionFilled(int32_t region) {
const uint64_t best =
const double best =
*std::max_element(regionScores_.begin(), regionScores_.end());
regionScores_[region] = std::max<int64_t>(regionScores_[region], best * 1.1);
regionScores_[region] = std::max<double>(regionScores_[region], best * 1.1);
}

std::vector<int32_t> SsdFileTracker::findEvictionCandidates(
Expand All @@ -42,7 +42,7 @@ std::vector<int32_t> SsdFileTracker::findEvictionCandidates(
// Calculates average score of regions with no pins. Returns up to
// 'numCandidates' unpinned regions with score <= average, lowest scoring
// region first.
int64_t scoreSum = 0;
double scoreSum = 0;
int32_t numUnpinned = 0;
for (int i = 0; i < numRegions; ++i) {
if (regionPins[i] > 0) {
Expand Down
15 changes: 10 additions & 5 deletions velox/common/caching/SsdFileTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ class SsdFileTracker {
const std::vector<int32_t>& regionPins);

// Expose the region access data. Used in checkpointing cache state.
std::vector<tsan_atomic<uint64_t>>& regionScores() {
std::vector<tsan_atomic<double>>& regionScores() {
return regionScores_;
}

void setRegionScores(const std::vector<int64_t>& scores) {
void setRegionScores(const std::vector<double>& scores) {
VELOX_CHECK_EQ(scores.size(), regionScores_.size());
for (auto i = 0; i < scores.size(); ++i) {
regionScores_[i] = scores[i];
Expand All @@ -76,18 +76,23 @@ class SsdFileTracker {
/// Exports a copy of the scores. Tsan will report an error if a
/// pointer to atomics is passed to write(). Therefore copy the
/// atomics into non-atomics before writing.
std::vector<uint64_t> copyScores() {
std::vector<uint64_t> scores(regionScores_.size());
std::vector<double> copyScores() {
std::vector<double> scores(regionScores_.size());
for (auto i = 0; i < scores.size(); ++i) {
scores[i] = tsanAtomicValue(regionScores_[i]);
}
return scores;
}

/// Resets scores of all regions.
void testingClear() {
std::fill(regionScores_.begin(), regionScores_.end(), 0);
}

private:
static constexpr int32_t kDecayInterval = 1000;

std::vector<tsan_atomic<uint64_t>> regionScores_;
std::vector<tsan_atomic<double>> regionScores_;

// Count of lookups. The scores are decayed every time the count goes
// over kDecayInterval or half count of cache entries, whichever comes first.
Expand Down
4 changes: 2 additions & 2 deletions velox/common/caching/tests/AsyncDataCacheTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ TEST_F(AsyncDataCacheTest, pin) {
EXPECT_EQ(0, stats.numShared);
EXPECT_EQ(0, stats.numExclusive);

cache_->clear();
cache_->testingClear();
stats = cache_->refreshStats();
EXPECT_EQ(0, stats.largeSize);
EXPECT_EQ(0, stats.numEntries);
Expand Down Expand Up @@ -816,7 +816,7 @@ TEST_F(AsyncDataCacheTest, DISABLED_ssd) {
ASSERT_EQ(ramStats.numShared, 0);
ASSERT_EQ(ramStats.numExclusive, 0);

cache_->ssdCache()->clear();
cache_->ssdCache()->testingClear();
// We cut the tail off one of the cache shards.
corruptFile(fmt::format("{}/cache0.cpt", tempDirectory_->getPath()));
// We open the cache from checkpoint. Reading checks the data integrity, here
Expand Down
45 changes: 37 additions & 8 deletions velox/common/caching/tests/SsdFileTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class SsdFileTest : public testing::Test {
protected:
static constexpr int64_t kMB = 1 << 20;

static void SetUpTestCase() {
void SetUp() override {
memory::MemoryManager::testingSetInstance({});
}

Expand All @@ -56,22 +56,27 @@ class SsdFileTest : public testing::Test {
}

void initializeCache(
int64_t maxBytes,
int64_t ssdBytes = 0,
bool setNoCowFlag = false) {
int64_t checkpointIntervalBytes = 0,
bool disableFileCow = false) {
// tmpfs does not support O_DIRECT, so turn this off for testing.
FLAGS_ssd_odirect = false;
cache_ = AsyncDataCache::create(memory::memoryManager()->allocator());

fileName_ = StringIdLease(fileIds(), "fileInStorage");

tempDirectory_ = exec::test::TempDirectoryPath::create();
initializeSsdFile(ssdBytes, checkpointIntervalBytes, disableFileCow);
}

void initializeSsdFile(
int64_t ssdBytes = 0,
int64_t checkpointIntervalBytes = 0,
bool disableFileCow = false) {
ssdFile_ = std::make_unique<SsdFile>(
fmt::format("{}/ssdtest", tempDirectory_->getPath()),
0, // shardId
bits::roundUp(ssdBytes, SsdFile::kRegionSize) / SsdFile::kRegionSize,
0, // checkpointInternalBytes
setNoCowFlag);
checkpointIntervalBytes,
disableFileCow);
}

static void initializeContents(int64_t sequence, memory::Allocation& alloc) {
Expand Down Expand Up @@ -240,7 +245,7 @@ class SsdFileTest : public testing::Test {
TEST_F(SsdFileTest, writeAndRead) {
constexpr int64_t kSsdSize = 16 * SsdFile::kRegionSize;
std::vector<TestEntry> allEntries;
initializeCache(128 * kMB, kSsdSize);
initializeCache(kSsdSize);
FLAGS_ssd_verify_write = true;
for (auto startOffset = 0; startOffset <= kSsdSize - SsdFile::kRegionSize;
startOffset += SsdFile::kRegionSize) {
Expand Down Expand Up @@ -304,6 +309,30 @@ TEST_F(SsdFileTest, writeAndRead) {
}
}

TEST_F(SsdFileTest, checkpoint) {
constexpr int64_t kSsdSize = 16 * SsdFile::kRegionSize;
const int32_t checkpointIntervalBytes = 5 * SsdFile::kRegionSize;
FLAGS_ssd_verify_write = true;
initializeCache(kSsdSize, checkpointIntervalBytes);

for (auto startOffset = 0; startOffset <= kSsdSize - SsdFile::kRegionSize;
startOffset += SsdFile::kRegionSize) {
auto pins =
makePins(fileName_.id(), startOffset, 4096, 2048 * 1025, 62 * kMB);
ssdFile_->write(pins);
readAndCheckPins(pins);
}
const auto originalRegionScores = ssdFile_->testingCopyScores();
EXPECT_EQ(originalRegionScores.size(), 16);

// Re-initialize SSD file from checkpoint.
ssdFile_->checkpoint(true);
initializeSsdFile(kSsdSize, checkpointIntervalBytes);
const auto recoveredRegionScores = ssdFile_->testingCopyScores();
EXPECT_EQ(recoveredRegionScores.size(), 16);
EXPECT_EQ(originalRegionScores, recoveredRegionScores);
}

#ifdef VELOX_SSD_FILE_TEST_SET_NO_COW_FLAG
TEST_F(SsdFileTest, disabledCow) {
LOG(ERROR) << "here";
Expand Down
19 changes: 19 additions & 0 deletions velox/common/caching/tests/SsdFileTrackerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,23 @@ TEST(SsdFileTrackerTest, tracker) {
tracker.findEvictionCandidates(kNumRegions, kNumRegions, pins);
std::vector<int32_t> expected{0, 1, 4, 5, 6, 7, 8, 9};
EXPECT_EQ(candidates, expected);

// Test large region scores.
tracker.testingClear();
for (auto region = 0; region < kNumRegions; ++region) {
tracker.regionRead(region, INT32_MAX);
tracker.regionRead(region, region * 100'000'000);
}
for (int i = 0; i < 999; ++i) {
for (auto region = 0; region < kNumRegions; ++region) {
tracker.regionFilled(region);
}
}
for (const auto score : tracker.copyScores()) {
EXPECT_TRUE(std::isinf(score));
}
// Mark all regions to be evictable.
std::fill(pins.begin(), pins.end(), 0);
candidates = tracker.findEvictionCandidates(3, kNumRegions, pins);
EXPECT_EQ(candidates.size(), 3);
}
4 changes: 2 additions & 2 deletions velox/dwio/dwrf/test/CacheInputTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ TEST_F(CacheTest, ssd) {
EXPECT_LT(0, ioStats_->rawOverreadBytes());
auto fullStripeBytes = ioStats_->rawBytesRead();
auto bytes = ioStats_->rawBytesRead();
cache_->clear();
cache_->testingClear();
// We read 10 stripes with some columns sparsely accessed.
readLoop("testfile", 30, 70, 10, 10, 1, ioStats_);
auto sparseStripeBytes = (ioStats_->rawBytesRead() - bytes) / 10;
Expand All @@ -510,7 +510,7 @@ TEST_F(CacheTest, ssd) {
"prefix1_", 0, kSsdBytes / bytesPerFile, 30, 100, 1, kStripesPerFile, 4);

waitForWrite();
cache_->clear();
cache_->testingClear();
// Read double this to get some eviction from SSD.
readFiles(
"prefix1_",
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/PrintPlanWithStatsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ TEST_F(PrintPlanWithStatsTest, partialAggregateWithTableScan) {
const std::vector<int32_t> numPrefetchSplits = {0, 2};
for (const auto& numPrefetchSplit : numPrefetchSplits) {
SCOPED_TRACE(fmt::format("numPrefetchSplit {}", numPrefetchSplit));
asyncDataCache_->clear();
asyncDataCache_->testingClear();
auto filePath = TempFilePath::create();
writeToFile(filePath->getPath(), vectors);

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/utils/OperatorTestBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ void OperatorTestBase::setupMemory(
int64_t memoryPoolInitCapacity,
int64_t memoryPoolReservedCapacity) {
if (asyncDataCache_ != nullptr) {
asyncDataCache_->clear();
asyncDataCache_->testingClear();
asyncDataCache_.reset();
}
MemoryManagerOptions options;
Expand Down

0 comments on commit 5c4903f

Please sign in to comment.