src/server/rdb_extensions.h (3 changes: 2 additions & 1 deletion)

@@ -13,11 +13,12 @@ constexpr uint8_t RDB_TYPE_JSON = 30;
constexpr uint8_t RDB_TYPE_HASH_WITH_EXPIRY = 31;
constexpr uint8_t RDB_TYPE_SET_WITH_EXPIRY = 32;
constexpr uint8_t RDB_TYPE_SBF = 33;
+constexpr uint8_t RDB_TYPE_SBF2 = 34;

constexpr bool rdbIsObjectTypeDF(uint8_t type) {
  return __rdbIsObjectType(type) || (type == RDB_TYPE_JSON) ||
         (type == RDB_TYPE_HASH_WITH_EXPIRY) || (type == RDB_TYPE_SET_WITH_EXPIRY) ||
-        (type == RDB_TYPE_SBF);
+        (type == RDB_TYPE_SBF) || (type == RDB_TYPE_SBF2);
}

// Opcodes: Range 200-240 is used by DF extensions.

src/server/rdb_load.cc (38 changes: 35 additions & 3 deletions)

@@ -54,6 +54,7 @@ extern "C" {
ABSL_DECLARE_FLAG(int32_t, list_max_listpack_size);
ABSL_DECLARE_FLAG(int32_t, list_compress_depth);
ABSL_DECLARE_FLAG(uint32_t, dbnum);
+ABSL_DECLARE_FLAG(bool, rdb_sbf_chunked);
ABSL_FLAG(bool, rdb_load_dry_run, false, "Dry run RDB load without applying changes");
ABSL_FLAG(bool, rdb_ignore_expiry, false, "Ignore Key Expiry when loading from RDB snapshot");

@@ -188,7 +189,7 @@ string ModuleTypeName(uint64_t module_id) {
bool RdbTypeAllowedEmpty(int type) {
  return type == RDB_TYPE_STRING || type == RDB_TYPE_JSON || type == RDB_TYPE_SBF ||
         type == RDB_TYPE_STREAM_LISTPACKS || type == RDB_TYPE_SET_WITH_EXPIRY ||
-        type == RDB_TYPE_HASH_WITH_EXPIRY;
+        type == RDB_TYPE_HASH_WITH_EXPIRY || type == RDB_TYPE_SBF2;
}

DbSlice& GetCurrentDbSlice() {
@@ -1316,6 +1317,9 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) {
    case RDB_TYPE_SBF:
      iores = ReadSBF();
      break;
+   case RDB_TYPE_SBF2:
+     iores = ReadSBF2();
+     break;
    default:
      LOG(ERROR) << "Unsupported rdb type " << rdbtype;

@@ -1851,7 +1855,7 @@ auto RdbLoaderBase::ReadRedisJson() -> io::Result<OpaqueObj> {
return OpaqueObj{std::move(dest), RDB_TYPE_JSON};
}

-auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> {
+auto RdbLoaderBase::ReadSBFImpl(bool chunking) -> io::Result<OpaqueObj> {
RdbSBF res;
uint64_t options;
SET_OR_UNEXPECT(LoadLen(nullptr), options);
@@ -1874,7 +1878,27 @@ auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> {
  unsigned hash_cnt;
  string filter_data;
  SET_OR_UNEXPECT(LoadLen(nullptr), hash_cnt);
-  SET_OR_UNEXPECT(FetchGenericString(), filter_data);
+
+  if (absl::GetFlag(FLAGS_rdb_sbf_chunked)) {

Collaborator review comment on the line above:
I do not think you need to use FLAGS_rdb_sbf_chunked in the loader. The load part should only rely on RDB_TYPE_SBF2 and RDB_TYPE_SBF to decide how to parse; it should be stateless in this sense.
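
A minimal sketch of the reviewer's suggestion (illustration only, not part of the PR): ReadSBFImpl() already receives a chunking argument derived from the rdb type (true via ReadSBF2() for RDB_TYPE_SBF2, false via ReadSBF() for RDB_TYPE_SBF), so the loader can branch on that and never consult the runtime flag:

    // Sketch: key the parse path off the rdb type, keeping the loader stateless.
    if (chunking) {
      // chunked path: total size, then length-prefixed chunks (as added below)
    } else {
      SET_OR_UNEXPECT(FetchGenericString(), filter_data);
    }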

+    unsigned total_size = 0;
+    SET_OR_UNEXPECT(LoadLen(nullptr), total_size);
+
+    filter_data.resize(total_size);
+    size_t offset = 0;
+    while (offset < total_size) {
+      unsigned chunk_size = 0;
+      SET_OR_UNEXPECT(LoadLen(nullptr), chunk_size);
+      error_code ec = FetchBuf(chunk_size, filter_data.data() + offset);
+      if (ec) {
+        return make_unexpected(ec);
+      }
+
+      offset += chunk_size;
+    }
+  } else {
+    SET_OR_UNEXPECT(FetchGenericString(), filter_data);
+  }

size_t bit_len = filter_data.size() * 8;
if (!is_power2(bit_len)) { // must be power of two
return Unexpected(errc::rdb_file_corrupted);
@@ -1884,6 +1908,14 @@ auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> {
return OpaqueObj{std::move(res), RDB_TYPE_SBF};
}

+auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> {
+  return ReadSBFImpl(false);
+}
+
+auto RdbLoaderBase::ReadSBF2() -> io::Result<OpaqueObj> {
+  return ReadSBFImpl(true);
+}

template <typename T> io::Result<T> RdbLoaderBase::FetchInt() {
auto ec = EnsureRead(sizeof(T));
if (ec)

src/server/rdb_load.h (2 changes: 2 additions & 0 deletions)

@@ -169,7 +169,9 @@ class RdbLoaderBase {
  ::io::Result<OpaqueObj> ReadListQuicklist(int rdbtype);
  ::io::Result<OpaqueObj> ReadStreams(int rdbtype);
  ::io::Result<OpaqueObj> ReadRedisJson();
+  ::io::Result<OpaqueObj> ReadSBFImpl(bool chunking);
  ::io::Result<OpaqueObj> ReadSBF();
+  ::io::Result<OpaqueObj> ReadSBF2();

std::error_code SkipModuleData();
std::error_code HandleCompressedBlob(int op_type);

src/server/rdb_save.cc (16 changes: 14 additions & 2 deletions)

@@ -49,6 +49,9 @@ ABSL_FLAG(dfly::CompressionMode, compression_mode, dfly::CompressionMode::MULTI_
ABSL_RETIRED_FLAG(bool, stream_rdb_encode_v2, true,
"Retired. Uses format, compatible with redis 7.2 and Dragonfly v1.26+");

+// Flip this value to 'true' in March 2026.
+ABSL_FLAG(bool, rdb_sbf_chunked, false, "Enable new save format for saving SBFs in chunks.");

namespace dfly {

using namespace std;
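
A note on the rollout this implies (my reading of the diff, not stated by the author): the RDB_TYPE_SBF2 read path ships immediately and unconditionally, while the writer keeps emitting the legacy single-string SBF format until rdb_sbf_chunked flips to true, so snapshots produced in the interim remain loadable by older Dragonfly versions.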
@@ -623,11 +626,20 @@ std::error_code RdbSerializer::SaveSBFObject(const PrimeValue& pv) {
    RETURN_ON_ERR(SaveLen(sbf->hashfunc_cnt(i)));

    string_view blob = sbf->data(i);
-    RETURN_ON_ERR(SaveString(blob));
+    if (absl::GetFlag(FLAGS_rdb_sbf_chunked)) {
+      RETURN_ON_ERR(SaveLen(blob.size()));
+
+      for (size_t offset = 0; offset < blob.size(); offset += kFilterChunkSize) {
+        size_t chunk_len = std::min(kFilterChunkSize, blob.size() - offset);
+        RETURN_ON_ERR(SaveString(blob.substr(offset, chunk_len)));
+      }
+    } else {
+      RETURN_ON_ERR(SaveString(blob));
+    }

FlushState flush_state = FlushState::kFlushMidEntry;
if ((i + 1) == sbf->num_filters())
flush_state = FlushState::kFlushEndEntry;

FlushIfNeeded(flush_state);
}
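
For reference, the per-filter byte layout the chunked branch produces (inferred from the save path above and the matching load loop in rdb_load.cc; a sketch, not an authoritative format spec):

    SaveLen(blob.size())      // total filter size in bytes
    SaveString(chunk_0)       // length-prefixed, at most kFilterChunkSize bytes
    SaveString(chunk_1)
    ...                       // until blob.size() bytes have been written

The loader mirrors this exactly: it reads the total size, resizes filter_data, then FetchBuf()s each length-prefixed chunk into successive offsets until offset == total_size.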


src/server/rdb_save.h (1 change: 1 addition & 0 deletions)

@@ -209,6 +209,7 @@ class SerializerBase {
io::IoBuf mem_buf_;
std::unique_ptr<detail::CompressorImpl> compressor_impl_;

+  static constexpr size_t kFilterChunkSize = 1ULL << 26;
static constexpr size_t kMinStrSizeToCompress = 256;
static constexpr size_t kMaxStrSizeToCompress = 1 * 1024 * 1024;
static constexpr double kMinCompressionReductionPrecentage = 0.95;
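
For reference, the chunk size works out to (my arithmetic, not from the PR):

    1ULL << 26 = 67'108'864 bytes = 64 MiB

so any filter blob larger than 64 MiB is split into at least two chunks, which is exactly what the new test exercises.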

src/server/rdb_test.cc (29 changes: 29 additions & 0 deletions)

@@ -35,6 +35,7 @@ ABSL_DECLARE_FLAG(int32, list_max_listpack_size);
ABSL_DECLARE_FLAG(dfly::CompressionMode, compression_mode);
ABSL_DECLARE_FLAG(bool, rdb_ignore_expiry);
ABSL_DECLARE_FLAG(uint32_t, num_shards);
+ABSL_DECLARE_FLAG(bool, rdb_sbf_chunked);

namespace dfly {

@@ -670,6 +671,34 @@ TEST_F(RdbTest, SBF) {
EXPECT_THAT(Run({"BF.EXISTS", "k", "1"}), IntArg(1));
}

+TEST_F(RdbTest, SBFLargeFilterChunking) {
+  absl::SetFlag(&FLAGS_rdb_sbf_chunked, true);
+  max_memory_limit = 200000000;
+
+  // Using this set of parameters for the BF.RESERVE command resulted in a
+  // filter size large enough to require chunking (> 64 MB).
+  const double error_rate = 0.001;
+  const size_t capacity = 50'000'000;
+  const size_t num_items = 100;
+
+  size_t collisions = 0;
+
+  Run({"BF.RESERVE", "large_key", std::to_string(error_rate), std::to_string(capacity)});
+  for (size_t i = 0; i < num_items; i++) {
+    auto res = Run({"BF.ADD", "large_key", absl::StrCat("item", i)});
+    if (*res.GetInt() == 0)
+      collisions++;
+  }
+  EXPECT_LT(static_cast<double>(collisions) / num_items, error_rate);
+
+  Run({"debug", "reload"});
+  EXPECT_EQ(Run({"type", "large_key"}), "MBbloom--");
+
+  for (size_t i = 0; i < num_items; i++) {
+    EXPECT_THAT(Run({"BF.EXISTS", "large_key", absl::StrCat("item", i)}), IntArg(1));
+  }
+}
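
A rough sanity check that these parameters force chunking, using the standard Bloom filter sizing formula m = n * ln(1/p) / (ln 2)^2 (assuming SBF sizing roughly follows it; my arithmetic, not from the PR):

    m = 50'000'000 * 6.91 / 0.48  ≈ 7.2e8 bits  ≈ 86 MiB

and since the loader rejects filters whose bit length is not a power of two, the first filter presumably rounds up to 2^30 bits = 128 MiB, comfortably above the 64 MiB chunk size.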

TEST_F(RdbTest, RestoreSearchIndexNameStartingWithColon) {
// Create an index with a name that starts with ':' and add a sample document
EXPECT_EQ(Run({"FT.CREATE", ":Order:index", "ON", "HASH", "PREFIX", "1", ":Order:", "SCHEMA",