From 0fbd8dd2d64229035e4a979c9bac8b2b2f0d267d Mon Sep 17 00:00:00 2001
From: Eric
Date: Wed, 22 Oct 2025 20:37:14 -0400
Subject: [PATCH 1/3] fix: Implement chunking for large SBF filters

Add chunking to the load/save operations for SBF bloom filters, so a
large filter no longer has to be serialized as a single contiguous
string. The serialization of each filter gains extra framing: when a
filter is saved, its total size is written first, followed by the
filter data in chunks of at most 64 MB each.

Signed-off-by: Eric
---
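Note: the per-filter framing introduced here looks as follows, where
SaveLen and SaveString are the existing RDB length/string encoders,
kFilterChunkSize is 1ULL << 26 (64 MB), and n = ceil(total_size /
kFilterChunkSize):

  hash_cnt      <- SaveLen
  total_size    <- SaveLen, byte size of the whole filter blob
  chunk_0       <- SaveString, at most kFilterChunkSize bytes
  ...
  chunk_{n-1}   <- SaveString, the remainder

The loader mirrors this framing: it reads total_size, resizes the
destination buffer once, then copies each chunk into place at a running
offset. Each chunk is length-prefixed by SaveString, which is what the
loader's inner LoadLen/FetchBuf pair consumes.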
 src/server/rdb_load.cc | 18 +++++++++++++++++-
 src/server/rdb_save.cc | 11 +++++++++--
 src/server/rdb_save.h  |  2 ++
 3 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc
index a26aff41586e..ba838368f232 100644
--- a/src/server/rdb_load.cc
+++ b/src/server/rdb_load.cc
@@ -1874,7 +1874,23 @@ auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> {
     unsigned hash_cnt;
     string filter_data;
     SET_OR_UNEXPECT(LoadLen(nullptr), hash_cnt);
-    SET_OR_UNEXPECT(FetchGenericString(), filter_data);
+
+    unsigned total_size = 0;
+    SET_OR_UNEXPECT(LoadLen(nullptr), total_size);
+
+    filter_data.resize(total_size);
+    size_t offset = 0;
+    while (offset < total_size) {
+      unsigned chunk_size = 0;
+      SET_OR_UNEXPECT(LoadLen(nullptr), chunk_size);
+      error_code ec = FetchBuf(chunk_size, filter_data.data() + offset);
+      if (ec) {
+        return make_unexpected(ec);
+      }
+
+      offset += chunk_size;
+    }
+
     size_t bit_len = filter_data.size() * 8;
     if (!is_power2(bit_len)) {  // must be power of two
       return Unexpected(errc::rdb_file_corrupted);
diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc
index edbbac0e66e1..b228d7fa42e9 100644
--- a/src/server/rdb_save.cc
+++ b/src/server/rdb_save.cc
@@ -623,11 +623,18 @@ std::error_code RdbSerializer::SaveSBFObject(const PrimeValue& pv) {
     RETURN_ON_ERR(SaveLen(sbf->hashfunc_cnt(i)));
 
     string_view blob = sbf->data(i);
-    RETURN_ON_ERR(SaveString(blob));
+    size_t num_chunks = (blob.size() + kFilterChunkSize - 1) / kFilterChunkSize;
+    RETURN_ON_ERR(SaveLen(blob.size()));
+
+    for (size_t chunk_idx = 0; chunk_idx < num_chunks; ++chunk_idx) {
+      size_t offset = chunk_idx * kFilterChunkSize;
+      size_t chunk_len = std::min(kFilterChunkSize, blob.size() - offset);
+      RETURN_ON_ERR(SaveString(blob.substr(offset, chunk_len)));
+    }
+
     FlushState flush_state = FlushState::kFlushMidEntry;
     if ((i + 1) == sbf->num_filters())
       flush_state = FlushState::kFlushEndEntry;
-
     FlushIfNeeded(flush_state);
   }
 
diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h
index 71d6e444d896..5fbca1e74c9b 100644
--- a/src/server/rdb_save.h
+++ b/src/server/rdb_save.h
@@ -20,6 +20,8 @@ extern "C" {
 #include "server/journal/types.h"
 #include "server/table.h"
 
+constexpr size_t kFilterChunkSize = 1ULL << 26;
+
 typedef struct rax rax;
 typedef struct streamCG streamCG;
 typedef struct quicklistNode quicklistNode;

From 450564446e23a28ea70a391241b0b8a4460f8e4f Mon Sep 17 00:00:00 2001
From: Eric
Date: Thu, 23 Oct 2025 06:43:00 -0400
Subject: [PATCH 2/3] Add unit test for SBF chunking logic

---
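A note on the test parameters below: BF.ADD returns 0 when an item is
reported as already present, so `collisions` counts false positives,
and with only 100 items in a filter reserved for 50 million items the
observed rate stays far below error_rate. Bloom filters produce no
false negatives, which is why every item must still be found after the
reload. The "> 64 MB" claim in the test comment can be sanity-checked
with the standard Bloom filter sizing formula m = -n*ln(p)/(ln 2)^2; a
minimal standalone sketch (not Dragonfly code):

  #include <cmath>
  #include <cstdio>

  int main() {
    const double n = 50'000'000;  // capacity passed to BF.RESERVE
    const double p = 0.001;       // error_rate passed to BF.RESERVE
    // Optimal bit count for a plain Bloom filter: m = -n*ln(p)/(ln 2)^2.
    const double bits = -n * std::log(p) / (std::log(2.0) * std::log(2.0));
    std::printf("%.1f MiB\n", bits / 8 / (1 << 20));  // prints ~85.7 MiB
  }

Since the loader rejects filters whose bit length is not a power of
two, the blob is presumably rounded up to 2^30 bits = 128 MiB, i.e.
exactly two 64 MB chunks.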
 src/server/rdb_test.cc | 31 +++++++++++++++++++++++++++++++
 1 file changed, 31 insertions(+)

diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc
index 60a9931615ec..8d44265cbca8 100644
--- a/src/server/rdb_test.cc
+++ b/src/server/rdb_test.cc
@@ -670,6 +670,37 @@ TEST_F(RdbTest, SBF) {
   EXPECT_THAT(Run({"BF.EXISTS", "k", "1"}), IntArg(1));
 }
 
+TEST_F(RdbTest, SBFLargeFilterChunking) {
+  max_memory_limit = 200000000;
+
+  // Using this set of parameters for the BF.RESERVE command results in a
+  // filter size large enough to require chunking (> 64 MB).
+  const double error_rate = 0.001;
+  const size_t capacity = 50'000'000;
+  const size_t num_items = 100;
+
+  // Count how often BF.ADD reports an item as already present (a false
+  // positive), to verify the configured error rate is respected.
+  size_t collisions = 0;
+
+  Run({"BF.RESERVE", "large_key", std::to_string(error_rate), std::to_string(capacity)});
+  for (size_t i = 0; i < num_items; i++) {
+    auto res = Run({"BF.ADD", "large_key", absl::StrCat("item", i)});
+    if (*res.GetInt() == 0)
+      collisions++;
+  }
+  EXPECT_LT(static_cast<double>(collisions) / num_items, error_rate);
+
+  Run({"debug", "reload"});
+  EXPECT_EQ(Run({"type", "large_key"}), "MBbloom--");
+
+  // Bloom filters have no false negatives, so every inserted item must
+  // still be found after the reload round-trip.
+  for (size_t i = 0; i < num_items; i++) {
+    EXPECT_THAT(Run({"BF.EXISTS", "large_key", absl::StrCat("item", i)}), IntArg(1));
+  }
+}
+
 TEST_F(RdbTest, RestoreSearchIndexNameStartingWithColon) {
   // Create an index with a name that starts with ':' and add a sample document
   EXPECT_EQ(Run({"FT.CREATE", ":Order:index", "ON", "HASH", "PREFIX", "1", ":Order:", "SCHEMA",

From e83b84fb7ebe287428ee76bcd2ef05dbb7c3fd4f Mon Sep 17 00:00:00 2001
From: Eric
Date: Wed, 5 Nov 2025 20:54:41 -0500
Subject: [PATCH 3/3] Add flag for selecting the save format of SBF filters

Add a new flag, `rdb_sbf_chunked`, which determines the save format of
SBFs: when enabled, each filter is written with the chunked framing
introduced in the previous patches. A new RDB type, `RDB_TYPE_SBF2`, is
reserved for the chunked format, and separate load functions
(`ReadSBF`/`ReadSBF2`) were added to dispatch between the legacy and
chunked layouts.

Signed-off-by: Eric
---
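A note on scope: this series reserves RDB_TYPE_SBF2 for the chunked
format, but the serializer's object-type mapping is not touched here,
so snapshots keep the RDB_TYPE_SBF type byte and the loader still has
to consult the flag for legacy-typed payloads. A fully self-describing
variant would emit the new type byte at save time; a sketch of what
that selection could look like (hypothetical helper, not part of this
diff; assumes the constants from rdb_extensions.h):

  #include <cstdint>

  #include "absl/flags/declare.h"
  #include "absl/flags/flag.h"
  #include "server/rdb_extensions.h"  // RDB_TYPE_SBF / RDB_TYPE_SBF2

  ABSL_DECLARE_FLAG(bool, rdb_sbf_chunked);

  // Hypothetical: pick the type byte for an SBF entry so that the loader
  // can dispatch on the type alone, without re-reading the flag.
  inline uint8_t ChooseSbfRdbType() {
    return absl::GetFlag(FLAGS_rdb_sbf_chunked) ? RDB_TYPE_SBF2 : RDB_TYPE_SBF;
  }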
 src/server/rdb_extensions.h |  3 ++-
 src/server/rdb_load.cc      | 46 ++++++++++++++++++++++++++------------
 src/server/rdb_load.h       |  2 ++
 src/server/rdb_save.cc      | 17 +++++++++-----
 src/server/rdb_save.h       |  3 +--
 src/server/rdb_test.cc      |  2 ++
 6 files changed, 50 insertions(+), 23 deletions(-)

diff --git a/src/server/rdb_extensions.h b/src/server/rdb_extensions.h
index f8d48d4efa90..aceadfe8cf44 100644
--- a/src/server/rdb_extensions.h
+++ b/src/server/rdb_extensions.h
@@ -13,11 +13,12 @@ constexpr uint8_t RDB_TYPE_JSON = 30;
 constexpr uint8_t RDB_TYPE_HASH_WITH_EXPIRY = 31;
 constexpr uint8_t RDB_TYPE_SET_WITH_EXPIRY = 32;
 constexpr uint8_t RDB_TYPE_SBF = 33;
+constexpr uint8_t RDB_TYPE_SBF2 = 34;
 
 constexpr bool rdbIsObjectTypeDF(uint8_t type) {
   return __rdbIsObjectType(type) || (type == RDB_TYPE_JSON) || (type == RDB_TYPE_HASH_WITH_EXPIRY) ||
          (type == RDB_TYPE_SET_WITH_EXPIRY) ||
-         (type == RDB_TYPE_SBF);
+         (type == RDB_TYPE_SBF) || (type == RDB_TYPE_SBF2);
 }
 
 // Opcodes: Range 200-240 is used by DF extensions.
diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc
index ba838368f232..732ee0122f56 100644
--- a/src/server/rdb_load.cc
+++ b/src/server/rdb_load.cc
@@ -54,6 +54,7 @@ extern "C" {
 ABSL_DECLARE_FLAG(int32_t, list_max_listpack_size);
 ABSL_DECLARE_FLAG(int32_t, list_compress_depth);
 ABSL_DECLARE_FLAG(uint32_t, dbnum);
+ABSL_DECLARE_FLAG(bool, rdb_sbf_chunked);
 
 ABSL_FLAG(bool, rdb_load_dry_run, false, "Dry run RDB load without applying changes");
 ABSL_FLAG(bool, rdb_ignore_expiry, false, "Ignore Key Expiry when loding from RDB snapshot");
@@ -188,7 +189,7 @@ string ModuleTypeName(uint64_t module_id) {
 bool RdbTypeAllowedEmpty(int type) {
   return type == RDB_TYPE_STRING || type == RDB_TYPE_JSON || type == RDB_TYPE_SBF ||
          type == RDB_TYPE_STREAM_LISTPACKS || type == RDB_TYPE_SET_WITH_EXPIRY ||
-         type == RDB_TYPE_HASH_WITH_EXPIRY;
+         type == RDB_TYPE_HASH_WITH_EXPIRY || type == RDB_TYPE_SBF2;
 }
 
 DbSlice& GetCurrentDbSlice() {
@@ -1316,6 +1317,9 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) {
     case RDB_TYPE_SBF:
       iores = ReadSBF();
       break;
+    case RDB_TYPE_SBF2:
+      iores = ReadSBF2();
+      break;
 
     default:
       LOG(ERROR) << "Unsupported rdb type " << rdbtype;
@@ -1851,7 +1855,7 @@ auto RdbLoaderBase::ReadRedisJson() -> io::Result<OpaqueObj> {
   return OpaqueObj{std::move(dest), RDB_TYPE_JSON};
 }
 
-auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> {
+auto RdbLoaderBase::ReadSBFImpl(bool chunking) -> io::Result<OpaqueObj> {
   RdbSBF res;
   uint64_t options;
   SET_OR_UNEXPECT(LoadLen(nullptr), options);
@@ -1875,20 +1879,26 @@
     string filter_data;
     SET_OR_UNEXPECT(LoadLen(nullptr), hash_cnt);
 
-    unsigned total_size = 0;
-    SET_OR_UNEXPECT(LoadLen(nullptr), total_size);
+    // RDB_TYPE_SBF2 payloads are always chunked; RDB_TYPE_SBF payloads are
+    // chunked only if they were saved with --rdb_sbf_chunked enabled.
+    if (chunking || absl::GetFlag(FLAGS_rdb_sbf_chunked)) {
+      unsigned total_size = 0;
+      SET_OR_UNEXPECT(LoadLen(nullptr), total_size);
 
-    filter_data.resize(total_size);
-    size_t offset = 0;
-    while (offset < total_size) {
-      unsigned chunk_size = 0;
-      SET_OR_UNEXPECT(LoadLen(nullptr), chunk_size);
-      error_code ec = FetchBuf(chunk_size, filter_data.data() + offset);
-      if (ec) {
-        return make_unexpected(ec);
-      }
+      filter_data.resize(total_size);
+      size_t offset = 0;
+      while (offset < total_size) {
+        unsigned chunk_size = 0;
+        SET_OR_UNEXPECT(LoadLen(nullptr), chunk_size);
+        error_code ec = FetchBuf(chunk_size, filter_data.data() + offset);
+        if (ec) {
+          return make_unexpected(ec);
+        }
 
-      offset += chunk_size;
+        offset += chunk_size;
+      }
+    } else {
+      SET_OR_UNEXPECT(FetchGenericString(), filter_data);
     }
 
     size_t bit_len = filter_data.size() * 8;
@@ -1900,6 +1910,14 @@
   return OpaqueObj{std::move(res), RDB_TYPE_SBF};
 }
 
+auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> {
+  return ReadSBFImpl(false);
+}
+
+auto RdbLoaderBase::ReadSBF2() -> io::Result<OpaqueObj> {
+  return ReadSBFImpl(true);
+}
+
 template <typename T> io::Result<T> RdbLoaderBase::FetchInt() {
   auto ec = EnsureRead(sizeof(T));
   if (ec)
diff --git a/src/server/rdb_load.h b/src/server/rdb_load.h
index 4950ae39c612..9c837cc5f80e 100644
--- a/src/server/rdb_load.h
+++ b/src/server/rdb_load.h
@@ -169,7 +169,9 @@ class RdbLoaderBase {
   ::io::Result<OpaqueObj> ReadListQuicklist(int rdbtype);
   ::io::Result<OpaqueObj> ReadStreams(int rdbtype);
   ::io::Result<OpaqueObj> ReadRedisJson();
+  ::io::Result<OpaqueObj> ReadSBFImpl(bool chunking);
   ::io::Result<OpaqueObj> ReadSBF();
+  ::io::Result<OpaqueObj> ReadSBF2();
 
   std::error_code SkipModuleData();
   std::error_code HandleCompressedBlob(int op_type);
diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc
index b228d7fa42e9..e2932fa0a302 100644
--- a/src/server/rdb_save.cc
+++ b/src/server/rdb_save.cc
@@ -49,6 +49,9 @@ ABSL_FLAG(dfly::CompressionMode, compression_mode, dfly::CompressionMode::MULTI_
 ABSL_RETIRED_FLAG(bool, stream_rdb_encode_v2, true,
                   "Retired. Uses format, compatible with redis 7.2 and Dragonfly v1.26+");
 
+// Flip this value to 'true' in March 2026.
+ABSL_FLAG(bool, rdb_sbf_chunked, false, "Enable new save format for saving SBFs in chunks.");
+
 namespace dfly {
 
 using namespace std;
@@ -623,13 +626,15 @@ std::error_code RdbSerializer::SaveSBFObject(const PrimeValue& pv) {
     RETURN_ON_ERR(SaveLen(sbf->hashfunc_cnt(i)));
 
     string_view blob = sbf->data(i);
-    size_t num_chunks = (blob.size() + kFilterChunkSize - 1) / kFilterChunkSize;
-    RETURN_ON_ERR(SaveLen(blob.size()));
+    if (absl::GetFlag(FLAGS_rdb_sbf_chunked)) {
+      RETURN_ON_ERR(SaveLen(blob.size()));
 
-    for (size_t chunk_idx = 0; chunk_idx < num_chunks; ++chunk_idx) {
-      size_t offset = chunk_idx * kFilterChunkSize;
-      size_t chunk_len = std::min(kFilterChunkSize, blob.size() - offset);
-      RETURN_ON_ERR(SaveString(blob.substr(offset, chunk_len)));
+      for (size_t offset = 0; offset < blob.size(); offset += kFilterChunkSize) {
+        size_t chunk_len = std::min(kFilterChunkSize, blob.size() - offset);
+        RETURN_ON_ERR(SaveString(blob.substr(offset, chunk_len)));
+      }
+    } else {
+      RETURN_ON_ERR(SaveString(blob));
     }
 
     FlushState flush_state = FlushState::kFlushMidEntry;
diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h
index 5fbca1e74c9b..ca81ed5ad822 100644
--- a/src/server/rdb_save.h
+++ b/src/server/rdb_save.h
@@ -20,8 +20,6 @@ extern "C" {
 #include "server/journal/types.h"
 #include "server/table.h"
 
-constexpr size_t kFilterChunkSize = 1ULL << 26;
-
 typedef struct rax rax;
 typedef struct streamCG streamCG;
 typedef struct quicklistNode quicklistNode;
@@ -211,6 +209,7 @@ class SerializerBase {
   io::IoBuf mem_buf_;
   std::unique_ptr<CompressorImpl> compressor_impl_;
 
+  static constexpr size_t kFilterChunkSize = 1ULL << 26;
   static constexpr size_t kMinStrSizeToCompress = 256;
   static constexpr size_t kMaxStrSizeToCompress = 1 * 1024 * 1024;
   static constexpr double kMinCompressionReductionPrecentage = 0.95;
diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc
index 8d44265cbca8..e32db2fdc41c 100644
--- a/src/server/rdb_test.cc
+++ b/src/server/rdb_test.cc
@@ -35,6 +35,7 @@ ABSL_DECLARE_FLAG(int32, list_max_listpack_size);
 ABSL_DECLARE_FLAG(dfly::CompressionMode, compression_mode);
 ABSL_DECLARE_FLAG(bool, rdb_ignore_expiry);
 ABSL_DECLARE_FLAG(uint32_t, num_shards);
+ABSL_DECLARE_FLAG(bool, rdb_sbf_chunked);
 
 namespace dfly {
 
@@ -671,6 +672,7 @@ TEST_F(RdbTest, SBF) {
 }
 
 TEST_F(RdbTest, SBFLargeFilterChunking) {
+  absl::SetFlag(&FLAGS_rdb_sbf_chunked, true);
   max_memory_limit = 200000000;
 
   // Using this set of parameters for the BF.RESERVE command results in a