diff --git a/src/server/rdb_extensions.h b/src/server/rdb_extensions.h index f8d48d4efa90..aceadfe8cf44 100644 --- a/src/server/rdb_extensions.h +++ b/src/server/rdb_extensions.h @@ -13,11 +13,12 @@ constexpr uint8_t RDB_TYPE_JSON = 30; constexpr uint8_t RDB_TYPE_HASH_WITH_EXPIRY = 31; constexpr uint8_t RDB_TYPE_SET_WITH_EXPIRY = 32; constexpr uint8_t RDB_TYPE_SBF = 33; +constexpr uint8_t RDB_TYPE_SBF2 = 34; constexpr bool rdbIsObjectTypeDF(uint8_t type) { return __rdbIsObjectType(type) || (type == RDB_TYPE_JSON) || (type == RDB_TYPE_HASH_WITH_EXPIRY) || (type == RDB_TYPE_SET_WITH_EXPIRY) || - (type == RDB_TYPE_SBF); + (type == RDB_TYPE_SBF) || (type == RDB_TYPE_SBF2); } // Opcodes: Range 200-240 is used by DF extensions. diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index a26aff41586e..732ee0122f56 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -54,6 +54,7 @@ extern "C" { ABSL_DECLARE_FLAG(int32_t, list_max_listpack_size); ABSL_DECLARE_FLAG(int32_t, list_compress_depth); ABSL_DECLARE_FLAG(uint32_t, dbnum); +ABSL_DECLARE_FLAG(bool, rdb_sbf_chunked); ABSL_FLAG(bool, rdb_load_dry_run, false, "Dry run RDB load without applying changes"); ABSL_FLAG(bool, rdb_ignore_expiry, false, "Ignore Key Expiry when loding from RDB snapshot"); @@ -188,7 +189,7 @@ string ModuleTypeName(uint64_t module_id) { bool RdbTypeAllowedEmpty(int type) { return type == RDB_TYPE_STRING || type == RDB_TYPE_JSON || type == RDB_TYPE_SBF || type == RDB_TYPE_STREAM_LISTPACKS || type == RDB_TYPE_SET_WITH_EXPIRY || - type == RDB_TYPE_HASH_WITH_EXPIRY; + type == RDB_TYPE_HASH_WITH_EXPIRY || type == RDB_TYPE_SBF2; } DbSlice& GetCurrentDbSlice() { @@ -1316,6 +1317,9 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) { case RDB_TYPE_SBF: iores = ReadSBF(); break; + case RDB_TYPE_SBF2: + iores = ReadSBF2(); + break; default: LOG(ERROR) << "Unsupported rdb type " << rdbtype; @@ -1851,7 +1855,7 @@ auto RdbLoaderBase::ReadRedisJson() -> io::Result { return 
OpaqueObj{std::move(dest), RDB_TYPE_JSON}; } -auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> { +auto RdbLoaderBase::ReadSBFImpl(bool chunking) -> io::Result<OpaqueObj> { RdbSBF res; uint64_t options; SET_OR_UNEXPECT(LoadLen(nullptr), options); @@ -1874,7 +1878,27 @@ auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> { unsigned hash_cnt; string filter_data; SET_OR_UNEXPECT(LoadLen(nullptr), hash_cnt); - SET_OR_UNEXPECT(FetchGenericString(), filter_data); + + if (chunking) { + unsigned total_size = 0; + SET_OR_UNEXPECT(LoadLen(nullptr), total_size); + + filter_data.resize(total_size); + size_t offset = 0; + while (offset < total_size) { + unsigned chunk_size = 0; + SET_OR_UNEXPECT(LoadLen(nullptr), chunk_size); + error_code ec = FetchBuf(chunk_size, filter_data.data() + offset); + if (ec) { + return make_unexpected(ec); + } + + offset += chunk_size; + } + } else { + SET_OR_UNEXPECT(FetchGenericString(), filter_data); + } + size_t bit_len = filter_data.size() * 8; if (!is_power2(bit_len)) { // must be power of two return Unexpected(errc::rdb_file_corrupted); @@ -1884,6 +1908,14 @@ auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> { return OpaqueObj{std::move(res), RDB_TYPE_SBF}; } +auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> { + return ReadSBFImpl(false); +} + +auto RdbLoaderBase::ReadSBF2() -> io::Result<OpaqueObj> { + return ReadSBFImpl(true); +} + template <typename T> io::Result<T> RdbLoaderBase::FetchInt() { auto ec = EnsureRead(sizeof(T)); if (ec) diff --git a/src/server/rdb_load.h b/src/server/rdb_load.h index 4950ae39c612..9c837cc5f80e 100644 --- a/src/server/rdb_load.h +++ b/src/server/rdb_load.h @@ -169,7 +169,9 @@ class RdbLoaderBase { ::io::Result<OpaqueObj> ReadListQuicklist(int rdbtype); ::io::Result<OpaqueObj> ReadStreams(int rdbtype); ::io::Result<OpaqueObj> ReadRedisJson(); + ::io::Result<OpaqueObj> ReadSBFImpl(bool chunking); ::io::Result<OpaqueObj> ReadSBF(); + ::io::Result<OpaqueObj> ReadSBF2(); std::error_code SkipModuleData(); std::error_code HandleCompressedBlob(int op_type); diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc 
index edbbac0e66e1..e2932fa0a302 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -49,6 +49,9 @@ ABSL_FLAG(dfly::CompressionMode, compression_mode, dfly::CompressionMode::MULTI_ ABSL_RETIRED_FLAG(bool, stream_rdb_encode_v2, true, "Retired. Uses format, compatible with redis 7.2 and Dragonfly v1.26+"); +// Flip this value to 'true' in March 2026. +ABSL_FLAG(bool, rdb_sbf_chunked, false, "Enable new save format for saving SBFs in chunks."); + namespace dfly { using namespace std; @@ -623,11 +626,20 @@ std::error_code RdbSerializer::SaveSBFObject(const PrimeValue& pv) { RETURN_ON_ERR(SaveLen(sbf->hashfunc_cnt(i))); string_view blob = sbf->data(i); - RETURN_ON_ERR(SaveString(blob)); + if (absl::GetFlag(FLAGS_rdb_sbf_chunked)) { + RETURN_ON_ERR(SaveLen(blob.size())); + + for (size_t offset = 0; offset < blob.size(); offset += kFilterChunkSize) { + size_t chunk_len = std::min(kFilterChunkSize, blob.size() - offset); + RETURN_ON_ERR(SaveString(blob.substr(offset, chunk_len))); + } + } else { + RETURN_ON_ERR(SaveString(blob)); + } + FlushState flush_state = FlushState::kFlushMidEntry; if ((i + 1) == sbf->num_filters()) flush_state = FlushState::kFlushEndEntry; - FlushIfNeeded(flush_state); } diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index 71d6e444d896..ca81ed5ad822 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -209,6 +209,7 @@ class SerializerBase { io::IoBuf mem_buf_; std::unique_ptr compressor_impl_; + static constexpr size_t kFilterChunkSize = 1ULL << 26; static constexpr size_t kMinStrSizeToCompress = 256; static constexpr size_t kMaxStrSizeToCompress = 1 * 1024 * 1024; static constexpr double kMinCompressionReductionPrecentage = 0.95; diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc index 60a9931615ec..e32db2fdc41c 100644 --- a/src/server/rdb_test.cc +++ b/src/server/rdb_test.cc @@ -35,6 +35,7 @@ ABSL_DECLARE_FLAG(int32, list_max_listpack_size); ABSL_DECLARE_FLAG(dfly::CompressionMode, 
compression_mode); ABSL_DECLARE_FLAG(bool, rdb_ignore_expiry); ABSL_DECLARE_FLAG(uint32_t, num_shards); +ABSL_DECLARE_FLAG(bool, rdb_sbf_chunked); namespace dfly { @@ -670,6 +671,34 @@ TEST_F(RdbTest, SBF) { EXPECT_THAT(Run({"BF.EXISTS", "k", "1"}), IntArg(1)); } +TEST_F(RdbTest, SBFLargeFilterChunking) { + absl::SetFlag(&FLAGS_rdb_sbf_chunked, true); + max_memory_limit = 200000000; + + // Using this set of parameters for the BF.RESERVE command resulted in a + // filter size large enough to require chunking (> 64 MB). + const double error_rate = 0.001; + const size_t capacity = 50'000'000; + const size_t num_items = 100; + + size_t collisions = 0; + + Run({"BF.RESERVE", "large_key", std::to_string(error_rate), std::to_string(capacity)}); + for (size_t i = 0; i < num_items; i++) { + auto res = Run({"BF.ADD", "large_key", absl::StrCat("item", i)}); + if (*res.GetInt() == 0) + collisions++; + } + EXPECT_LT(static_cast<double>(collisions) / num_items, error_rate); + + Run({"debug", "reload"}); + EXPECT_EQ(Run({"type", "large_key"}), "MBbloom--"); + + for (size_t i = 0; i < num_items; i++) { + EXPECT_THAT(Run({"BF.EXISTS", "large_key", absl::StrCat("item", i)}), IntArg(1)); + } +} + TEST_F(RdbTest, RestoreSearchIndexNameStartingWithColon) { // Create an index with a name that starts with ':' and add a sample document EXPECT_EQ(Run({"FT.CREATE", ":Order:index", "ON", "HASH", "PREFIX", "1", ":Order:", "SCHEMA",