Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main'
Browse files Browse the repository at this point in the history
  • Loading branch information
objdevs committed Jan 16, 2025
2 parents 7b3fd83 + 0e116b1 commit de7746a
Show file tree
Hide file tree
Showing 15 changed files with 169 additions and 63 deletions.
18 changes: 13 additions & 5 deletions .github/workflows/daily-sanitizers.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ on:

jobs:
build:
runs-on: [self-hosted, linux, ARM64]
runs-on: [ubuntu-24.04]
strategy:
matrix:
container: ["ubuntu-dev:22"]
container: ["ubuntu-dev:24"]
build-type: [Debug]
compiler: [{ cxx: g++, c: gcc }]
cxx_flags: ["-Werror"]
compiler: [{ cxx: clang++, c: clang }]
# TODO bring it back when warnings on clang are fixed
# cxx_flags: ["-Werror"]
timeout-minutes: 90
env:
SCCACHE_GHA_ENABLED: "true"
Expand Down Expand Up @@ -59,6 +60,10 @@ jobs:
- name: Configure & Build
run: |
apt -y update
apt -y upgrade
apt install -y clang
which clang
echo "ulimit is"
ulimit -s
echo "-----------------------------"
Expand All @@ -76,7 +81,10 @@ jobs:
-DCMAKE_CXX_COMPILER_LAUNCHER=sccache \
-DCMAKE_CXX_FLAGS="${{matrix.cxx_flags}}" \
-DWITH_ASAN=ON \
-DWITH_USAN=ON
-DWITH_USAN=ON \
-DCMAKE_C_FLAGS=-Wno-error=unused-command-line-argument \
-DCMAKE_CXX_FLAGS=-Wno-error=unused-command-line-argument
# https://maskray.me/blog/2023-08-25-clang-wunused-command-line-argument (search for compiler-rt)
ninja src/all
Expand Down
8 changes: 8 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,24 @@ option(DF_USE_SSL "Provide support for SSL connections" ON)

find_package(OpenSSL)

SET(SANITIZERS OFF)

option(WITH_ASAN "Enable -fsanitize=address" OFF)
if (SUPPORT_ASAN AND WITH_ASAN)
message(STATUS "address sanitizer enabled")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fsanitize=address")
set(SANITIZERS ON)
endif()

option(WITH_USAN "Enable -fsanitize=undefined" OFF)
if (SUPPORT_USAN AND WITH_USAN)
message(STATUS "ub sanitizer enabled")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fsanitize=undefined")
set(SANITIZERS ON)
endif()

if(SANITIZERS)
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -rtlib=compiler-rt")
endif()

include(third_party)
Expand Down
8 changes: 6 additions & 2 deletions src/facade/redis_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,12 @@ auto RedisParser::ParseInline(Buffer str) -> ResultConsumed {
}

uint32_t last_consumed = ptr - str.data();
if (ptr == end) { // we have not finished parsing.
is_broken_token_ = ptr[-1] > 32; // we stopped in the middle of the token.
if (ptr == end) { // we have not finished parsing.
if (cached_expr_->empty()) {
state_ = CMD_COMPLETE_S; // have not found anything besides whitespace.
} else {
is_broken_token_ = ptr[-1] > 32; // we stopped in the middle of the token.
}
return {INPUT_PENDING, last_consumed};
}

Expand Down
7 changes: 7 additions & 0 deletions src/facade/redis_parser_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -281,4 +281,11 @@ TEST_F(RedisParserTest, InlineSplit) {
ASSERT_EQ(RedisParser::OK, Parse("ING\n"));
}

TEST_F(RedisParserTest, InlineReset) {
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("\t \r\n"));
EXPECT_EQ(4, consumed_);
ASSERT_EQ(RedisParser::OK, Parse("*1\r\n$3\r\nfoo\r\n"));
EXPECT_EQ(13, consumed_);
}

} // namespace facade
14 changes: 4 additions & 10 deletions src/server/cluster_support.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ extern "C" {
#include "base/flags.h"
#include "base/logging.h"
#include "cluster_support.h"
#include "common.h"

using namespace std;

Expand All @@ -31,22 +32,15 @@ void UniqueSlotChecker::Add(SlotId slot_id) {
return;
}

if (!slot_id_.has_value()) {
if (slot_id_ == kNoSlotId) {
slot_id_ = slot_id;
return;
}

if (*slot_id_ != slot_id) {
} else if (slot_id_ != slot_id) {
slot_id_ = kInvalidSlotId;
}
}

optional<SlotId> UniqueSlotChecker::GetUniqueSlotId() const {
if (slot_id_.has_value() && *slot_id_ == kInvalidSlotId) {
return nullopt;
}

return slot_id_;
return slot_id_ > kMaxSlotNum ? optional<SlotId>() : slot_id_;
}

namespace {
Expand Down
15 changes: 10 additions & 5 deletions src/server/cluster_support.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,10 @@
#include <optional>
#include <string_view>

#include "common.h"

namespace dfly {

using SlotId = std::uint16_t;

constexpr SlotId kMaxSlotNum = 0x3FFF;
constexpr SlotId kInvalidSlotId = kMaxSlotNum + 1;

// A simple utility class that "aggregates" SlotId-s and can tell whether all inputs were the same.
// Only works when cluster is enabled.
Expand All @@ -26,8 +22,17 @@ class UniqueSlotChecker {

std::optional<SlotId> GetUniqueSlotId() const;

void Reset() {
slot_id_ = kNoSlotId;
}

private:
std::optional<SlotId> slot_id_;
// kNoSlotId - if slot wasn't set at all
static constexpr SlotId kNoSlotId = kMaxSlotNum + 1;
// kInvalidSlotId - if several different slots were set
static constexpr SlotId kInvalidSlotId = kNoSlotId + 1;

SlotId slot_id_ = kNoSlotId;
};

SlotId KeySlot(std::string_view key);
Expand Down
45 changes: 30 additions & 15 deletions src/server/journal/cmd_serializer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,18 @@ class CommandAggregator {
}

enum class CommitMode { kAuto, kNoCommit };
void AddArg(string arg, CommitMode commit_mode = CommitMode::kAuto) {

// Returns whether CommitPending() was called
bool AddArg(string arg, CommitMode commit_mode = CommitMode::kAuto) {
agg_bytes_ += arg.size();
members_.push_back(std::move(arg));

if (commit_mode != CommitMode::kNoCommit && agg_bytes_ >= max_aggragation_bytes_) {
CommitPending();
return true;
}

return false;
}

private:
Expand Down Expand Up @@ -65,26 +70,27 @@ CmdSerializer::CmdSerializer(FlushSerialized cb, size_t max_serialization_buffer
: cb_(std::move(cb)), max_serialization_buffer_size_(max_serialization_buffer_size) {
}

void CmdSerializer::SerializeEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms) {
size_t CmdSerializer::SerializeEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms) {
// We send RESTORE commands for small objects, or objects we don't support breaking.
bool use_restore_serialization = true;
size_t commands = 1;
if (max_serialization_buffer_size_ > 0 && pv.MallocUsed() > max_serialization_buffer_size_) {
switch (pv.ObjType()) {
case OBJ_SET:
SerializeSet(key, pv);
commands = SerializeSet(key, pv);
use_restore_serialization = false;
break;
case OBJ_ZSET:
SerializeZSet(key, pv);
commands = SerializeZSet(key, pv);
use_restore_serialization = false;
break;
case OBJ_HASH:
SerializeHash(key, pv);
commands = SerializeHash(key, pv);
use_restore_serialization = false;
break;
case OBJ_LIST:
SerializeList(key, pv);
commands = SerializeList(key, pv);
use_restore_serialization = false;
break;
case OBJ_STRING:
Expand All @@ -105,6 +111,7 @@ void CmdSerializer::SerializeEntry(string_view key, const PrimeValue& pk, const
SerializeStickIfNeeded(key, pk);
SerializeExpireIfNeeded(key, expire_ms);
}
return commands;
}

void CmdSerializer::SerializeCommand(string_view cmd, absl::Span<const string_view> args) {
Expand Down Expand Up @@ -139,54 +146,62 @@ void CmdSerializer::SerializeExpireIfNeeded(string_view key, uint64_t expire_ms)
SerializeCommand("PEXIRE", {key, absl::StrCat(expire_ms)});
}

void CmdSerializer::SerializeSet(string_view key, const PrimeValue& pv) {
size_t CmdSerializer::SerializeSet(string_view key, const PrimeValue& pv) {
CommandAggregator aggregator(
key, [&](absl::Span<const string_view> args) { SerializeCommand("SADD", args); },
max_serialization_buffer_size_);

size_t commands = 0;
container_utils::IterateSet(pv, [&](container_utils::ContainerEntry ce) {
aggregator.AddArg(ce.ToString());
commands += aggregator.AddArg(ce.ToString());
return true;
});
return commands;
}

void CmdSerializer::SerializeZSet(string_view key, const PrimeValue& pv) {
size_t CmdSerializer::SerializeZSet(string_view key, const PrimeValue& pv) {
CommandAggregator aggregator(
key, [&](absl::Span<const string_view> args) { SerializeCommand("ZADD", args); },
max_serialization_buffer_size_);

size_t commands = 0;
container_utils::IterateSortedSet(
pv.GetRobjWrapper(),
[&](container_utils::ContainerEntry ce, double score) {
aggregator.AddArg(absl::StrCat(score), CommandAggregator::CommitMode::kNoCommit);
aggregator.AddArg(ce.ToString());
commands += aggregator.AddArg(ce.ToString());
return true;
},
/*start=*/0, /*end=*/-1, /*reverse=*/false, /*use_score=*/true);
return commands;
}

void CmdSerializer::SerializeHash(string_view key, const PrimeValue& pv) {
size_t CmdSerializer::SerializeHash(string_view key, const PrimeValue& pv) {
CommandAggregator aggregator(
key, [&](absl::Span<const string_view> args) { SerializeCommand("HSET", args); },
max_serialization_buffer_size_);

size_t commands = 0;
container_utils::IterateMap(
pv, [&](container_utils::ContainerEntry k, container_utils::ContainerEntry v) {
aggregator.AddArg(k.ToString(), CommandAggregator::CommitMode::kNoCommit);
aggregator.AddArg(v.ToString());
commands += aggregator.AddArg(v.ToString());
return true;
});
return commands;
}

void CmdSerializer::SerializeList(string_view key, const PrimeValue& pv) {
size_t CmdSerializer::SerializeList(string_view key, const PrimeValue& pv) {
CommandAggregator aggregator(
key, [&](absl::Span<const string_view> args) { SerializeCommand("RPUSH", args); },
max_serialization_buffer_size_);

size_t commands = 0;
container_utils::IterateList(pv, [&](container_utils::ContainerEntry ce) {
aggregator.AddArg(ce.ToString());
commands += aggregator.AddArg(ce.ToString());
return true;
});
return commands;
}

void CmdSerializer::SerializeRestore(string_view key, const PrimeValue& pk, const PrimeValue& pv,
Expand Down
13 changes: 7 additions & 6 deletions src/server/journal/cmd_serializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,19 @@ class CmdSerializer {

explicit CmdSerializer(FlushSerialized cb, size_t max_serialization_buffer_size);

void SerializeEntry(std::string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms);
// Returns how many commands we broke this entry into (like multiple HSETs etc)
size_t SerializeEntry(std::string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms);

private:
void SerializeCommand(std::string_view cmd, absl::Span<const std::string_view> args);
void SerializeStickIfNeeded(std::string_view key, const PrimeValue& pk);
void SerializeExpireIfNeeded(std::string_view key, uint64_t expire_ms);

void SerializeSet(std::string_view key, const PrimeValue& pv);
void SerializeZSet(std::string_view key, const PrimeValue& pv);
void SerializeHash(std::string_view key, const PrimeValue& pv);
void SerializeList(std::string_view key, const PrimeValue& pv);
size_t SerializeSet(std::string_view key, const PrimeValue& pv);
size_t SerializeZSet(std::string_view key, const PrimeValue& pv);
size_t SerializeHash(std::string_view key, const PrimeValue& pv);
size_t SerializeList(std::string_view key, const PrimeValue& pv);
void SerializeRestore(std::string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms);

Expand Down
Loading

0 comments on commit de7746a

Please sign in to comment.