Skip to content

Commit

Permalink
Fetch fresh snapshot from peer when joining (take 2) (#6758)
Browse files Browse the repository at this point in the history
Co-authored-by: Amaury Chamayou <[email protected]>
Co-authored-by: Amaury Chamayou <[email protected]>
  • Loading branch information
3 people authored Jan 15, 2025
1 parent c157ac1 commit 542c2ba
Show file tree
Hide file tree
Showing 25 changed files with 1,221 additions and 216 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ jobs:
# libc++
tdnf -y install libcxx-devel llvm-libunwind-devel llvm-libunwind-static
# Dependencies
tdnf -y install openssl-devel libuv-devel nghttp2-devel
tdnf -y install openssl-devel libuv-devel nghttp2-devel curl-devel
# Test dependencies
tdnf -y install libarrow-devel parquet-libs-devel lldb npm jq expect
# Install CDDL via rubygems
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0
- nghttp2 is now picked up from the OS rather than vendored to enable libcurl usage
- Misc dependency updates (#6725)

### Added

- Joining nodes can now request a snapshot from their peers at startup, rather than relying on file access. The joinee's snapshot will be fetched and used if it is more recent than any snapshot the joiner has access to (#6758).

## [6.0.0-dev11]

[6.0.0-dev11]: https://github.com/microsoft/CCF/releases/tag/6.0.0-dev11
Expand Down
12 changes: 10 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,15 @@ elseif(COMPILE_TARGET STREQUAL "virtual")
endif()

target_link_libraries(
cchost PRIVATE uv ${TLS_LIBRARY} ${CMAKE_DL_LIBS} ${CMAKE_THREAD_LIBS_INIT}
${LINK_LIBCXX} ccfcrypto.host
cchost
PRIVATE uv
${TLS_LIBRARY}
${CMAKE_DL_LIBS}
${CMAKE_THREAD_LIBS_INIT}
${LINK_LIBCXX}
ccfcrypto.host
curl
http_parser.host
)

install(TARGETS cchost DESTINATION bin)
Expand Down Expand Up @@ -696,6 +703,7 @@ if(BUILD_TESTS)
${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/contiguous_set.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/unit_strings.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/dl_list.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/nonstd.cpp
)
target_link_libraries(ds_test PRIVATE ${CMAKE_THREAD_LIBS_INIT})

Expand Down
5 changes: 5 additions & 0 deletions doc/host_config_schema/cchost_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,11 @@
"type": "boolean",
"default": true,
"description": "Whether to follow redirects to the primary node of the existing service to join"
},
"fetch_recent_snapshot": {
"type": "boolean",
"default": true,
"description": "Whether to ask the target for a newer snapshot before joining. The node will ask the target what their latest snapshot is, and if that is later than what the node has locally, will fetch it via RPC before launching. Should generally only be turned off for specific test cases"
}
},
"required": ["target_rpc_address"],
Expand Down
8 changes: 8 additions & 0 deletions include/ccf/ds/nonstd.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,14 @@ namespace ccf::nonstd
});
}

/// Strip leading and trailing characters from a view.
///
/// @param s View to trim; the underlying characters are not copied.
/// @param trim_chars Set of characters to remove from both ends. Defaults to
/// space, tab, carriage return and newline.
/// @return Sub-view of @p s running from the first to the last character that
/// is not in @p trim_chars. If every character of @p s is in @p trim_chars
/// (or @p s is empty), an empty view is returned.
static inline std::string_view trim(
  std::string_view s, std::string_view trim_chars = " \t\r\n")
{
  const auto first_kept = s.find_first_not_of(trim_chars);
  if (first_kept == std::string_view::npos)
  {
    // Nothing survives trimming: hand back the empty tail of s
    return s.substr(s.size());
  }

  // A kept character exists, so this search cannot return npos
  const auto last_kept = s.find_last_not_of(trim_chars);
  return s.substr(first_kept, last_kept - first_kept + 1);
}

/// Iterate through tuple, calling functor on each element
template <size_t I = 0, typename F, typename... Ts>
static void tuple_for_each(const std::tuple<Ts...>& t, const F& f)
Expand Down
10 changes: 10 additions & 0 deletions include/ccf/node/startup_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ namespace ccf
bool operator==(const Attestation&) const = default;
};
Attestation attestation = {};

// Snapshot-related settings, grouped as their own nested configuration
// section.
struct Snapshots
{
// Name of the snapshot directory; "snapshots" unless overridden.
std::string directory = "snapshots";
// Transaction count setting, 10,000 by default.
// NOTE(review): presumably the number of transactions between snapshots -
// confirm against the code that consumes it.
size_t tx_count = 10'000;
// Optional second directory, unset by default.
// NOTE(review): the name suggests it is only read from, never written -
// confirm against the snapshot manager.
std::optional<std::string> read_only_directory = std::nullopt;

// Member-wise equality, generated by the compiler.
bool operator==(const Snapshots&) const = default;
};
// The snapshot settings instance, value-initialised so that the defaults
// declared above apply.
Snapshots snapshots = {};
};

struct StartupConfig : CCFConfig
Expand Down
24 changes: 24 additions & 0 deletions python/src/ccf/ledger.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,19 @@ def range_from_filename(filename: str) -> Tuple[int, Optional[int]]:
raise ValueError(f"Could not read seqno range from ledger file {filename}")


def snapshot_index_from_filename(filename: str) -> Tuple[int, int]:
    """
    Extract the two integer fields encoded in a snapshot file name.

    Only the base name of ``filename`` is considered. Every occurrence of the
    committed-file suffix and of the ``snapshot_`` prefix text is removed, and
    what remains is split on underscores.

    :param str filename: Path or name of a snapshot file.

    :return: The two underscore-separated fields, converted to integers, in
        the order they appear in the name.

    :raises ValueError: If the remaining name does not consist of exactly two
        underscore-separated fields, or if a field is not a valid integer.
    """
    stem = os.path.basename(filename)
    stem = stem.replace(COMMITTED_FILE_SUFFIX, "")
    stem = stem.replace("snapshot_", "")
    fields = stem.split("_")

    if len(fields) != 2:
        raise ValueError(f"Could not read snapshot index from file name {filename}")

    first, second = fields
    return (int(first), int(second))


class GcmHeader:
view: int
seqno: int
Expand Down Expand Up @@ -851,6 +864,17 @@ def get_len(self) -> int:
return self._file_size


def latest_snapshot(snapshots_dir):
    """
    Find the snapshot with the highest seqno in a directory.

    Every entry of ``snapshots_dir`` is opened as a snapshot and the seqno of
    its public domain is read. When several snapshots share the highest
    seqno, the first one listed wins.

    :param snapshots_dir: Directory whose entries are all snapshot files.

    :return: Name of the entry (not its full path) with the highest seqno, or
        ``None`` if the directory has no entries.
    """
    newest_name = None
    newest_seqno = None
    for entry in os.listdir(snapshots_dir):
        entry_path = os.path.join(snapshots_dir, entry)
        with ccf.ledger.Snapshot(entry_path) as snapshot:
            seqno = snapshot.get_public_domain().get_seqno()
            # Strict comparison: an equal seqno never displaces the current best
            if newest_seqno is None or seqno > newest_seqno:
                newest_name, newest_seqno = entry, seqno
    return newest_name


class LedgerChunk:
"""
Class used to parse and iterate over :py:class:`ccf.ledger.Transaction` in a CCF ledger chunk.
Expand Down
6 changes: 6 additions & 0 deletions src/common/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ namespace ccf
snp_security_policy_file,
snp_uvm_endorsements_file);

// JSON declarations for CCFConfig::Snapshots: no fields are listed as
// required, and directory, tx_count and read_only_directory are all listed as
// optional.
// NOTE(review): presumably an omitted key leaves the struct's default in
// place - confirm against the macro definitions.
DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCFConfig::Snapshots);
DECLARE_JSON_REQUIRED_FIELDS(CCFConfig::Snapshots);
DECLARE_JSON_OPTIONAL_FIELDS(
CCFConfig::Snapshots, directory, tx_count, read_only_directory);

DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCFConfig);
DECLARE_JSON_REQUIRED_FIELDS(CCFConfig, network);
DECLARE_JSON_OPTIONAL_FIELDS(
Expand All @@ -94,6 +99,7 @@ namespace ccf
ledger_signatures,
jwt,
attestation,
snapshots,
node_to_node_message_limit,
historical_cache_soft_limit);

Expand Down
16 changes: 16 additions & 0 deletions src/ds/test/nonstd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,3 +265,19 @@ TEST_CASE("rsplit" * doctest::test_suite("nonstd"))
}
}
}

TEST_CASE("trim" * doctest::test_suite("nonstd"))
{
  using ccf::nonstd::trim;

  // Inputs trimmed with the default character set, each paired with the
  // result it must produce. Covers surrounding whitespace, characters outside
  // the default set, one-sided trimming, and inputs that trim to nothing.
  struct DefaultSetCase
  {
    std::string_view input;
    std::string_view expected;
  };
  const DefaultSetCase default_set_cases[] = {
    {" hello world ", "hello world"},
    {" \r\n\t\nhello world\n\n\r\t\t\n\t \n\t", "hello world"},
    {"..hello..", "..hello.."},
    {"hello", "hello"},
    {" h", "h"},
    {"h ", "h"},
    {" ", ""},
    {"", ""},
  };
  for (const auto& test_case : default_set_cases)
  {
    REQUIRE(trim(test_case.input) == test_case.expected);
  }

  // An explicit character set is used instead of the default one
  REQUIRE(trim("..hello..", ".") == "hello");
}
21 changes: 5 additions & 16 deletions src/host/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,6 @@ namespace host
};
Ledger ledger = {};

// Snapshot-related settings, grouped as their own nested configuration
// section.
struct Snapshots
{
// Name of the snapshot directory; "snapshots" unless overridden.
std::string directory = "snapshots";
// Transaction count setting, 10,000 by default.
// NOTE(review): presumably the number of transactions between snapshots -
// confirm against the code that consumes it.
size_t tx_count = 10'000;
// Optional second directory, unset by default.
// NOTE(review): the name suggests it is only read from, never written -
// confirm against the snapshot manager.
std::optional<std::string> read_only_directory = std::nullopt;

// Member-wise equality, generated by the compiler.
bool operator==(const Snapshots&) const = default;
};
// The snapshot settings instance, value-initialised so that the defaults
// declared above apply.
Snapshots snapshots = {};

struct Logging
{
ccf::LoggerLevel host_level = ccf::LoggerLevel::INFO;
Expand Down Expand Up @@ -155,6 +145,7 @@ namespace host
ccf::NodeInfoNetwork::NetAddress target_rpc_address;
ccf::ds::TimeString retry_timeout = {"1000ms"};
bool follow_redirect = true;
bool fetch_recent_snapshot = true;

bool operator==(const Join&) const = default;
};
Expand Down Expand Up @@ -189,11 +180,6 @@ namespace host
DECLARE_JSON_OPTIONAL_FIELDS(
CCHostConfig::Ledger, directory, read_only_directories, chunk_size);

// JSON declarations for CCHostConfig::Snapshots: no fields are listed as
// required, and directory, tx_count and read_only_directory are all listed as
// optional.
// NOTE(review): presumably an omitted key leaves the struct's default in
// place - confirm against the macro definitions.
DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCHostConfig::Snapshots);
DECLARE_JSON_REQUIRED_FIELDS(CCHostConfig::Snapshots);
DECLARE_JSON_OPTIONAL_FIELDS(
CCHostConfig::Snapshots, directory, tx_count, read_only_directory);

DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCHostConfig::Logging);
DECLARE_JSON_REQUIRED_FIELDS(CCHostConfig::Logging);
DECLARE_JSON_OPTIONAL_FIELDS(CCHostConfig::Logging, host_level, format);
Expand All @@ -216,7 +202,10 @@ namespace host
DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCHostConfig::Command::Join);
DECLARE_JSON_REQUIRED_FIELDS(CCHostConfig::Command::Join, target_rpc_address);
DECLARE_JSON_OPTIONAL_FIELDS(
CCHostConfig::Command::Join, retry_timeout, follow_redirect);
CCHostConfig::Command::Join,
retry_timeout,
follow_redirect,
fetch_recent_snapshot);

DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCHostConfig::Command::Recover);
DECLARE_JSON_REQUIRED_FIELDS(CCHostConfig::Command::Recover);
Expand Down
73 changes: 56 additions & 17 deletions src/host/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
#include "process_launcher.h"
#include "rpc_connections.h"
#include "sig_term.h"
#include "snapshots.h"
#include "snapshots/fetch.h"
#include "snapshots/snapshot_manager.h"
#include "ticker.h"
#include "time_updater.h"

Expand Down Expand Up @@ -376,7 +377,7 @@ int main(int argc, char** argv)
config.ledger.read_only_directories);
ledger.register_message_handlers(bp.get_dispatcher());

asynchost::SnapshotManager snapshots(
snapshots::SnapshotManager snapshots(
config.snapshots.directory,
writer_factory,
config.snapshots.read_only_directory);
Expand Down Expand Up @@ -507,8 +508,6 @@ int main(int argc, char** argv)

ccf::StartupConfig startup_config(config);

startup_config.snapshot_tx_interval = config.snapshots.tx_count;

if (startup_config.attestation.snp_security_policy_file.has_value())
{
auto security_policy_file =
Expand Down Expand Up @@ -690,22 +689,62 @@ int main(int argc, char** argv)
config.command.type == StartType::Join ||
config.command.type == StartType::Recover)
{
auto latest_committed_snapshot =
snapshots.find_latest_committed_snapshot();
if (latest_committed_snapshot.has_value())
{
auto& [snapshot_dir, snapshot_file] = latest_committed_snapshot.value();
startup_snapshot = files::slurp(snapshot_dir / snapshot_file);
auto latest_local_snapshot = snapshots.find_latest_committed_snapshot();

LOG_INFO_FMT(
"Found latest snapshot file: {} (size: {})",
snapshot_dir / snapshot_file,
startup_snapshot.size());
if (
config.command.type == StartType::Join &&
config.command.join.fetch_recent_snapshot)
{
// Try to fetch a recent snapshot from peer
const size_t latest_local_idx = latest_local_snapshot.has_value() ?
snapshots::get_snapshot_idx_from_file_name(
latest_local_snapshot->second) :
0;
auto latest_peer_snapshot = snapshots::fetch_from_peer(
config.command.join.target_rpc_address,
config.command.service_certificate_file,
latest_local_idx);

if (latest_peer_snapshot.has_value())
{
LOG_INFO_FMT(
"Received snapshot {} from peer (size: {}) - writing this to disk "
"and using for join startup",
latest_peer_snapshot->snapshot_name,
latest_peer_snapshot->snapshot_data.size());

const auto dst_path = fs::path(config.snapshots.directory) /
fs::path(latest_peer_snapshot->snapshot_name);
if (files::exists(dst_path))
{
LOG_FATAL_FMT(
"Unable to write peer snapshot - already have a file at {}. "
"Exiting.",
dst_path);
return static_cast<int>(CLI::ExitCodes::FileError);
}
files::dump(latest_peer_snapshot->snapshot_data, dst_path);
startup_snapshot = latest_peer_snapshot->snapshot_data;
}
}
else

if (startup_snapshot.empty())
{
LOG_INFO_FMT(
"No snapshot found: Node will replay all historical transactions");
if (latest_local_snapshot.has_value())
{
auto& [snapshot_dir, snapshot_file] = latest_local_snapshot.value();
startup_snapshot = files::slurp(snapshot_dir / snapshot_file);

LOG_INFO_FMT(
"Found latest local snapshot file: {} (size: {})",
snapshot_dir / snapshot_file,
startup_snapshot.size());
}
else
{
LOG_INFO_FMT(
"No snapshot found: Node will replay all historical transactions");
}
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/host/test/ledger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
#include "crypto/openssl/hash.h"
#include "ds/files.h"
#include "ds/serialized.h"
#include "host/snapshots.h"
#include "kv/serialised_entry_format.h"
#include "snapshots/snapshot_manager.h"

#define DOCTEST_CONFIG_IMPLEMENT
#include <doctest/doctest.h>
Expand Down Expand Up @@ -1259,6 +1259,8 @@ TEST_CASE("Snapshot file name" * doctest::test_suite("snapshot"))
std::vector<size_t> snapshot_idx_interval_ranges = {
10, 1000, 10000, std::numeric_limits<size_t>::max() - 2};

using namespace snapshots;

for (auto const& snapshot_idx_interval_range : snapshot_idx_interval_ranges)
{
std::uniform_int_distribution<size_t> dist(1, snapshot_idx_interval_range);
Expand Down Expand Up @@ -1304,6 +1306,7 @@ TEST_CASE("Generate and commit snapshots" * doctest::test_suite("snapshot"))
auto snap_ro_dir = AutoDeleteFolder(snapshot_dir_read_only);
fs::create_directory(snapshot_dir_read_only);

using namespace snapshots;
SnapshotManager snapshots(snapshot_dir, wf, snapshot_dir_read_only);

size_t snapshot_interval = 5;
Expand Down
Loading

0 comments on commit 542c2ba

Please sign in to comment.