Skip to content

Commit

Permalink
Add optional storage callback for creating empty snapshots for bootst…
Browse files Browse the repository at this point in the history
…rapping

Summary:
Add an optional callback to the `wa_raft_storage` behavior that
can be implemented to provide a way to construct a snapshot
of an empty storage state. This empty storage state can be used
to more reliably bootstrap RAFT partitions by simultaneously
constructing and loading the same empty storage state over all
replicas of a cluster when compared to the traditional method
of bootstrapping via promotion of a single leader.

Reviewed By: jaher

Differential Revision: D68859661

fbshipit-source-id: 76d8e863e4193034dee6f4a39096d811eb7f3f49
  • Loading branch information
hsun324 authored and facebook-github-bot committed Mar 7, 2025
1 parent 5592c64 commit a9ab3fd
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 17 deletions.
2 changes: 2 additions & 0 deletions include/wa_raft_rpc.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,5 @@

-define(ENABLE_COMMAND, ?RAFT_COMMAND(enable, undefined)).
-define(DISABLE_COMMAND(Reason), ?RAFT_COMMAND(disable, Reason)).

-define(BOOTSTRAP_COMMAND(Position, Config, Data), ?RAFT_COMMAND(bootstrap, {Position, Config, Data})).
90 changes: 77 additions & 13 deletions src/wa_raft_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@
handover/2,
handover_candidates/1,
disable/2,
enable/1
enable/1,
bootstrap/4
]).

%%------------------------------------------------------------------------------
Expand Down Expand Up @@ -221,7 +222,7 @@
-type rpc_named() :: ?RAFT_NAMED_RPC(atom(), wa_raft_log:log_term(), atom(), node(), undefined | tuple()).

-type command() :: commit_command() | read_command() | status_command() | promote_command() | resign_command() | adjust_membership_command() | snapshot_available_command() |
handover_candidates_command() | handover_command() | enable_command() | disable_command().
handover_candidates_command() | handover_command() | enable_command() | disable_command() | bootstrap_command().
-type commit_command() :: ?COMMIT_COMMAND(wa_raft_acceptor:op()).
-type read_command() :: ?READ_COMMAND(wa_raft_acceptor:read_op()).
-type status_command() :: ?STATUS_COMMAND.
Expand All @@ -233,6 +234,7 @@
-type handover_command() :: ?HANDOVER_COMMAND(node()).
-type enable_command() :: ?ENABLE_COMMAND.
-type disable_command() :: ?DISABLE_COMMAND(term()).
-type bootstrap_command() :: ?BOOTSTRAP_COMMAND(wa_raft_log:log_pos(), config(), dynamic()).

-type internal_event() :: advance_term_event() | force_election_event().
-type advance_term_event() :: ?ADVANCE_TERM(wa_raft_log:log_term()).
Expand Down Expand Up @@ -516,6 +518,10 @@ disable(Server, Reason) ->
enable(Server) ->
gen_statem:call(Server, ?ENABLE_COMMAND, ?RAFT_RPC_CALL_TIMEOUT()).

-spec bootstrap(Server :: gen_statem:server_ref(), Position :: wa_raft_log:log_pos(), Config :: config(), Data :: dynamic()) -> ok | wa_raft:error().
bootstrap(Server, Position, Config, Data) ->
gen_statem:call(Server, ?BOOTSTRAP_COMMAND(Position, Config, Data), ?RAFT_STORAGE_CALL_TIMEOUT()).

%%------------------------------------------------------------------------------
%% RAFT Server - State Machine Implementation - General Callbacks
%%------------------------------------------------------------------------------
Expand Down Expand Up @@ -758,6 +764,52 @@ stalled(_Type, ?REMOTE(Sender, ?APPEND_ENTRIES(PrevLogIndex, _PrevLogTerm, _Entr
send_rpc(Sender, ?APPEND_ENTRIES_RESPONSE(PrevLogIndex, false, 0), NewState),
{keep_state, NewState};

stalled({call, From}, ?BOOTSTRAP_COMMAND(#raft_log_pos{index = Index, term = Term} = Position, Config, Data),
#raft_state{name = Name, self = Self, data_dir = PartitionPath, log_view = View, storage = Storage, current_term = CurrentTerm, last_applied = LastApplied} = State0) ->
case Index > LastApplied andalso Term > 0 of
true ->
?LOG_NOTICE("Server[~0p, term ~0p, stalled] attempting bootstrap at ~0p:~0p with config ~0p and data ~0P.",
[Name, CurrentTerm, Index, Term, Config, Data, 30], #{domain => [whatsapp, wa_raft]}),
Path = filename:join(PartitionPath, io_lib:format("snapshot.~0p.~0p.bootstrap.tmp", [Index, Term])),
try
ok = wa_raft_storage:make_empty_snapshot(Storage, Path, Position, Config, Data),
ok = wa_raft_storage:open_snapshot(Storage, Path, Position),
{ok, NewView} = wa_raft_log:reset(View, Position),
State1 = State0#raft_state{log_view = NewView, last_applied = Index, commit_index = Index},
State2 = load_config(State1),
case Term > CurrentTerm of
true ->
case is_single_member(Self, config(State2)) of
true ->
State3 = advance_term(?FUNCTION_NAME, Term, node(), State2),
?LOG_NOTICE("Server[~0p, term ~0p, stalled] switching to leader as sole member after successful bootstrap.",
[Name, State3#raft_state.current_term], #{domain => [whatsapp, wa_raft]}),
{next_state, leader, State3, {reply, From, ok}};
false ->
State3 = advance_term(?FUNCTION_NAME, Term, undefined, State2),
?LOG_NOTICE("Server[~0p, term ~0p, stalled] switching to follower after successful bootstrap.",
[Name, State3#raft_state.current_term], #{domain => [whatsapp, wa_raft]}),
{next_state, follower, State3, {reply, From, ok}}
end;
false ->
?LOG_NOTICE("Server[~0p, term ~0p, stalled] switching to follower after successful bootstrap.",
[Name, CurrentTerm], #{domain => [whatsapp, wa_raft]}),
{next_state, follower, State2, {reply, From, ok}}
end
catch
_:Reason ->
?LOG_WARNING("Server[~0p, term ~0p, stalled] failed to bootstrap due to ~0p.",
[Name, CurrentTerm, Reason], #{domain => [whatsapp, wa_raft]}),
{keep_state_and_data, {reply, From, {error, Reason}}}
after
catch file:del_dir_r(Path)
end;
false ->
?LOG_NOTICE("Server[~0p, term ~0p, stalled] at ~0p rejecting request to bootstrap at invalid position ~0p:~0p.",
[Name, CurrentTerm, LastApplied, Index, Term], #{domain => [whatsapp, wa_raft]}),
{keep_state_and_data, {reply, From, {error, rejected}}}
end;

stalled({call, From}, ?SNAPSHOT_AVAILABLE_COMMAND(Root, #raft_log_pos{index = SnapshotIndex, term = SnapshotTerm} = SnapshotPos),
#raft_state{name = Name, log_view = View0, storage = Storage,
current_term = CurrentTerm, last_applied = LastApplied} = State0) ->
Expand Down Expand Up @@ -1027,15 +1079,15 @@ leader(cast, ?READ_COMMAND({From, _Command}), #raft_state{table = Table, partiti
{keep_state, State};
%% [Strong Read] Leader is eligible to serve strong reads.
leader(cast, ?READ_COMMAND({From, Command}),
#raft_state{name = Name, table = Table, partition = Partition, log_view = View0, storage = Storage,
#raft_state{name = Name, self = Self, table = Table, partition = Partition, log_view = View0, storage = Storage,
current_term = CurrentTerm, commit_index = CommitIndex, last_applied = LastApplied, first_current_term_log_index = FirstLogIndex} = State0) ->
?LOG_DEBUG("Server[~0p, term ~0p, leader] receives strong read request", [Name, CurrentTerm]),
LastLogIndex = wa_raft_log:last_index(View0),
Pending = wa_raft_log:pending(View0),
ReadIndex = max(CommitIndex, FirstLogIndex),
case config_membership(config(State0)) of
case is_single_member(Self, config(State0)) of
% If we are a single node cluster and we are fully-applied, then immediately dispatch.
[{Name, Node}] when Node =:= node(), Pending =:= 0, ReadIndex =:= LastApplied ->
true when Pending =:= 0, ReadIndex =:= LastApplied ->
wa_raft_storage:read(Storage, From, Command),
{keep_state, State0};
_ ->
Expand Down Expand Up @@ -1934,6 +1986,18 @@ member(Peer, #{membership := Membership}, _Default) ->
member(_Peer, _Config, Default) ->
Default.

%% Returns true only if the membership of the current configuration contains exactly
%% the provided peer and that the provided peer is not specified as a witness.
-spec is_single_member(Peer :: #raft_identity{} | peer(), Config :: config()) -> IsSingleMember :: boolean().
is_single_member(#raft_identity{name = Name, node = Node}, Config) ->
is_single_member({Name, Node}, Config);
is_single_member(Peer, #{membership := Membership, witness := Witnesses}) ->
Membership =:= [Peer] andalso not lists:member(Peer, Witnesses);
is_single_member(Peer, #{membership := Membership}) ->
Membership =:= [Peer];
is_single_member(_Peer, #{}) ->
false.

%% Get the non-empty membership list from the provided config. Raises an error
%% if the membership list is missing or empty.
-spec config_membership(Config :: config()) -> Membership :: membership().
Expand Down Expand Up @@ -2065,17 +2129,17 @@ get_log_entry(#raft_state{current_term = CurrentTerm, label_module = LabelModule
{State0#raft_state{last_label = NewLabel}, {CurrentTerm, {Ref, NewLabel, Command}}}.

-spec apply_single_node_cluster(State0 :: #raft_state{}) -> State1 :: #raft_state{}.
apply_single_node_cluster(#raft_state{name = Name, log_view = View0} = State0) ->
apply_single_node_cluster(#raft_state{self = Self, log_view = View} = State) ->
% TODO(hsun324) T112326686: Review after RAFT RPC id changes.
case config_membership(config(State0)) of
[{Name, Node}] when Node =:= node() ->
View1 = case wa_raft_log:sync(View0) of
case is_single_member(Self, config(State)) of
true ->
NewView = case wa_raft_log:sync(View) of
{ok, L} -> L;
_ -> View0
_ -> View
end,
maybe_apply(State0#raft_state{log_view = View1});
_ ->
State0
maybe_apply(State#raft_state{log_view = NewView});
false ->
State
end.

%% Leader - check quorum and apply logs if necessary
Expand Down
34 changes: 32 additions & 2 deletions src/wa_raft_storage.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
open_snapshot/3,
create_snapshot/1,
create_snapshot/2,
make_empty_snapshot/5,
delete_snapshot/2
]).

Expand Down Expand Up @@ -256,6 +257,20 @@
%% raised.
-callback storage_open_snapshot(Path :: file:filename(), ExpectedPosition :: wa_raft_log:log_pos(), Handle :: storage_handle()) -> {ok, NewHandle :: storage_handle()} | error().

%%-----------------------------------------------------------------------------
%% RAFT Storage Provider - Bootstrapping
%%-----------------------------------------------------------------------------

%% Create a new snapshot at the provided path that contains some directory
%% tree that when subsequently loaded using `storage_open_snapshot` results in
%% a storage state with the provided last applied position and for which
%% subsequent calls to `storage_config` returns the provided position as the
%% version and the config as the value. Extra data may be used by implementors
%% to provide extra state via arguments to external APIs that use this
%% endpoint, such as the partition bootstrapping API.
-callback storage_make_empty_snapshot(Name :: atom(), Identifier :: #raft_identifier{}, Path :: file:filename(), Position :: wa_raft_log:log_pos(), Config :: wa_raft_server:config(), Data :: dynamic()) -> ok | error().
-optional_callback([storage_make_empty_snapshot/6]).

%%-----------------------------------------------------------------------------
%% RAFT Storage - Types
%%-----------------------------------------------------------------------------
Expand All @@ -277,6 +292,7 @@
name :: atom(),
table :: wa_raft:table(),
partition :: wa_raft:partition(),
identifier :: #raft_identifier{},
root_dir :: file:filename(),
module :: module(),
handle :: storage_handle(),
Expand Down Expand Up @@ -341,6 +357,10 @@ create_snapshot(ServiceRef) ->
create_snapshot(ServiceRef, Name) ->
gen_server:call(ServiceRef, {snapshot_create, Name}, ?RAFT_STORAGE_CALL_TIMEOUT()).

-spec make_empty_snapshot(ServiceRef :: pid() | atom(), Path :: file:filename(), Position :: wa_raft_log:log_pos(), Config :: wa_raft_server:config(), Data :: term()) -> ok | error().
make_empty_snapshot(ServiceRef, Path, Position, Config, Data) ->
gen_server:call(ServiceRef, {make_empty_snapshot, Path, Position, Config, Data}).

-spec delete_snapshot(ServiceRef :: pid() | atom(), Name :: string()) -> ok.
delete_snapshot(ServiceRef, Name) ->
gen_server:cast(ServiceRef, {snapshot_delete, Name}).
Expand Down Expand Up @@ -388,13 +408,13 @@ registered_name(Table, Partition) ->
%%-------------------------------------------------------------------

-spec init(Options :: #raft_options{}) -> {ok, #state{}}.
init(#raft_options{application = App, table = Table, partition = Partition, database = RootDir, storage_name = Name, storage_module = Module}) ->
init(#raft_options{table = Table, partition = Partition, identifier = Identifier, database = RootDir, storage_name = Name, storage_module = Module}) ->
process_flag(trap_exit, true),

?LOG_NOTICE("Storage[~0p] starting for partition ~0p/~0p at ~0p using ~0p",
[Name, Table, Partition, RootDir, Module], #{domain => [whatsapp, wa_raft]}),

Handle = Module:storage_open(Name, #raft_identifier{application = App, table = Table, partition = Partition}, RootDir),
Handle = Module:storage_open(Name, Identifier, RootDir),
LastApplied = Module:storage_position(Handle),

?LOG_NOTICE("Storage[~0p] opened at position ~0p.",
Expand All @@ -404,6 +424,7 @@ init(#raft_options{application = App, table = Table, partition = Partition, data
name = Name,
table = Table,
partition = Partition,
identifier = Identifier,
root_dir = RootDir,
module = Module,
handle = Handle,
Expand All @@ -427,6 +448,7 @@ init(#raft_options{application = App, table = Table, partition = Partition, data
status |
{snapshot_create, Name :: string()} |
{snapshot_open, Path :: file:filename(), LastAppliedPos :: wa_raft_log:log_pos()} |
{make_empty_snapshot, Path :: file:filename(), Position :: wa_raft_log:log_pos(), Config :: wa_raft_server:config(), Data :: term()} |
label |
config.
handle_call(open, _From, #state{last_applied = LastApplied} = State) ->
Expand Down Expand Up @@ -455,6 +477,14 @@ handle_call({snapshot_open, SnapshotPath, LogPos}, _From, #state{name = Name, mo
{error, Reason} -> {reply, {error, Reason}, State}
end;

handle_call({make_empty_snapshot, Path, Position, Config, Data}, _From, #state{name = Name, identifier = Identifier, module = Module} = State) ->
?LOG_NOTICE("Storage[~0p] making bootstrap snapshot ~0p at ~0p with config ~0p and data ~0P.",
[Name, Path, Position, Config, Data, 30], #{domain => [whatsapp, wa_raft]}),
case erlang:function_exported(Module, storage_make_empty_snapshot, 6) of
true -> {reply, Module:storage_make_empty_snapshot(Name, Identifier, Path, Position, Config, Data), State};
false -> {reply, {error, not_supported}, State}
end;

handle_call(config, _From, #state{module = Module, handle = Handle} = State) ->
?RAFT_COUNT('raft.storage.config'),
Result = Module:storage_config(Handle),
Expand Down
15 changes: 13 additions & 2 deletions src/wa_raft_storage_ets.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@
storage_apply_config/3,
storage_read/3,
storage_create_snapshot/2,
storage_open_snapshot/3
storage_open_snapshot/3,
storage_make_empty_snapshot/6
]).

-include("wa_raft.hrl").

%% Options used for the ETS table
-define(OPTIONS, [set, public, {read_concurrency, true}, {write_concurrency, true}]).

%% Filename used for the actual ETS table file in a snapshot
-define(SNAPSHOT_FILENAME, "data").

Expand All @@ -46,7 +50,7 @@

-spec storage_open(atom(), #raft_identifier{}, file:filename()) -> #state{}.
storage_open(Name, #raft_identifier{table = Table, partition = Partition}, _RootDir) ->
Storage = ets:new(Name, [set, public, {read_concurrency, true}, {write_concurrency, true}]),
Storage = ets:new(Name, ?OPTIONS),
#state{name = Name, table = Table, partition = Partition, storage = Storage}.

-spec storage_close(#state{}) -> ok.
Expand Down Expand Up @@ -130,3 +134,10 @@ storage_open_snapshot(SnapshotPath, SnapshotPosition, #state{storage = Storage}
{error, Reason} ->
{error, Reason}
end.

-spec storage_make_empty_snapshot(atom(), #raft_identifier{}, file:filename(), wa_raft_log:log_pos(), wa_raft_server:config(), dynamic()) -> ok | wa_raft_storage:error().
storage_make_empty_snapshot(Name, #raft_identifier{table = Table, partition = Partition}, SnapshotPath, SnapshotPosition, Config, _Data) ->
Storage = ets:new(Name, ?OPTIONS),
State = #state{name = Name, table = Table, partition = Partition, storage = Storage},
storage_apply_config(Config, SnapshotPosition, State),
storage_create_snapshot(SnapshotPath, State).

0 comments on commit a9ab3fd

Please sign in to comment.