diff --git a/include/wa_raft_rpc.hrl b/include/wa_raft_rpc.hrl index 6a5a49c..aa1cb90 100644 --- a/include/wa_raft_rpc.hrl +++ b/include/wa_raft_rpc.hrl @@ -90,3 +90,5 @@ -define(ENABLE_COMMAND, ?RAFT_COMMAND(enable, undefined)). -define(DISABLE_COMMAND(Reason), ?RAFT_COMMAND(disable, Reason)). + +-define(BOOTSTRAP_COMMAND(Position, Config, Data), ?RAFT_COMMAND(bootstrap, {Position, Config, Data})). diff --git a/src/wa_raft_server.erl b/src/wa_raft_server.erl index cb66aba..1d5e5a7 100644 --- a/src/wa_raft_server.erl +++ b/src/wa_raft_server.erl @@ -96,7 +96,8 @@ handover/2, handover_candidates/1, disable/2, - enable/1 + enable/1, + bootstrap/4 ]). %%------------------------------------------------------------------------------ @@ -221,7 +222,7 @@ -type rpc_named() :: ?RAFT_NAMED_RPC(atom(), wa_raft_log:log_term(), atom(), node(), undefined | tuple()). -type command() :: commit_command() | read_command() | status_command() | promote_command() | resign_command() | adjust_membership_command() | snapshot_available_command() | - handover_candidates_command() | handover_command() | enable_command() | disable_command(). + handover_candidates_command() | handover_command() | enable_command() | disable_command() | bootstrap_command(). -type commit_command() :: ?COMMIT_COMMAND(wa_raft_acceptor:op()). -type read_command() :: ?READ_COMMAND(wa_raft_acceptor:read_op()). -type status_command() :: ?STATUS_COMMAND. @@ -233,6 +234,7 @@ -type handover_command() :: ?HANDOVER_COMMAND(node()). -type enable_command() :: ?ENABLE_COMMAND. -type disable_command() :: ?DISABLE_COMMAND(term()). +-type bootstrap_command() :: ?BOOTSTRAP_COMMAND(wa_raft_log:log_pos(), config(), dynamic()). -type internal_event() :: advance_term_event() | force_election_event(). -type advance_term_event() :: ?ADVANCE_TERM(wa_raft_log:log_term()). @@ -516,6 +518,10 @@ disable(Server, Reason) -> enable(Server) -> gen_statem:call(Server, ?ENABLE_COMMAND, ?RAFT_RPC_CALL_TIMEOUT()). +-spec bootstrap(Server :: gen_statem:server_ref(), Position :: wa_raft_log:log_pos(), Config :: config(), Data :: dynamic()) -> ok | wa_raft:error(). +bootstrap(Server, Position, Config, Data) -> + gen_statem:call(Server, ?BOOTSTRAP_COMMAND(Position, Config, Data), ?RAFT_STORAGE_CALL_TIMEOUT()). + %%------------------------------------------------------------------------------ %% RAFT Server - State Machine Implementation - General Callbacks %%------------------------------------------------------------------------------ @@ -758,6 +764,52 @@ stalled(_Type, ?REMOTE(Sender, ?APPEND_ENTRIES(PrevLogIndex, _PrevLogTerm, _Entr send_rpc(Sender, ?APPEND_ENTRIES_RESPONSE(PrevLogIndex, false, 0), NewState), {keep_state, NewState}; +stalled({call, From}, ?BOOTSTRAP_COMMAND(#raft_log_pos{index = Index, term = Term} = Position, Config, Data), + #raft_state{name = Name, self = Self, data_dir = PartitionPath, log_view = View, storage = Storage, current_term = CurrentTerm, last_applied = LastApplied} = State0) -> + case Index > LastApplied andalso Term > 0 of + true -> + ?LOG_NOTICE("Server[~0p, term ~0p, stalled] attempting bootstrap at ~0p:~0p with config ~0p and data ~0P.", + [Name, CurrentTerm, Index, Term, Config, Data, 30], #{domain => [whatsapp, wa_raft]}), + Path = filename:join(PartitionPath, io_lib:format("snapshot.~0p.~0p.bootstrap.tmp", [Index, Term])), + try + ok = wa_raft_storage:make_empty_snapshot(Storage, Path, Position, Config, Data), + ok = wa_raft_storage:open_snapshot(Storage, Path, Position), + {ok, NewView} = wa_raft_log:reset(View, Position), + State1 = State0#raft_state{log_view = NewView, last_applied = Index, commit_index = Index}, + State2 = load_config(State1), + case Term > CurrentTerm of + true -> + case is_single_member(Self, config(State2)) of + true -> + State3 = advance_term(?FUNCTION_NAME, Term, node(), State2), + ?LOG_NOTICE("Server[~0p, term ~0p, stalled] switching to leader as sole member after successful bootstrap.", + [Name, State3#raft_state.current_term], #{domain => [whatsapp, wa_raft]}), + {next_state, leader, State3, {reply, From, ok}}; + false -> + State3 = advance_term(?FUNCTION_NAME, Term, undefined, State2), + ?LOG_NOTICE("Server[~0p, term ~0p, stalled] switching to follower after successful bootstrap.", + [Name, State3#raft_state.current_term], #{domain => [whatsapp, wa_raft]}), + {next_state, follower, State3, {reply, From, ok}} + end; + false -> + ?LOG_NOTICE("Server[~0p, term ~0p, stalled] switching to follower after successful bootstrap.", + [Name, CurrentTerm], #{domain => [whatsapp, wa_raft]}), + {next_state, follower, State2, {reply, From, ok}} + end + catch + _:Reason -> + ?LOG_WARNING("Server[~0p, term ~0p, stalled] failed to bootstrap due to ~0p.", + [Name, CurrentTerm, Reason], #{domain => [whatsapp, wa_raft]}), + {keep_state_and_data, {reply, From, {error, Reason}}} + after + catch file:del_dir_r(Path) + end; + false -> + ?LOG_NOTICE("Server[~0p, term ~0p, stalled] at ~0p rejecting request to bootstrap at invalid position ~0p:~0p.", + [Name, CurrentTerm, LastApplied, Index, Term], #{domain => [whatsapp, wa_raft]}), + {keep_state_and_data, {reply, From, {error, rejected}}} + end; + stalled({call, From}, ?SNAPSHOT_AVAILABLE_COMMAND(Root, #raft_log_pos{index = SnapshotIndex, term = SnapshotTerm} = SnapshotPos), #raft_state{name = Name, log_view = View0, storage = Storage, current_term = CurrentTerm, last_applied = LastApplied} = State0) -> @@ -1027,15 +1079,15 @@ leader(cast, ?READ_COMMAND({From, _Command}), #raft_state{table = Table, partiti {keep_state, State}; %% [Strong Read] Leader is eligible to serve strong reads. leader(cast, ?READ_COMMAND({From, Command}), - #raft_state{name = Name, table = Table, partition = Partition, log_view = View0, storage = Storage, + #raft_state{name = Name, self = Self, table = Table, partition = Partition, log_view = View0, storage = Storage, current_term = CurrentTerm, commit_index = CommitIndex, last_applied = LastApplied, first_current_term_log_index = FirstLogIndex} = State0) -> ?LOG_DEBUG("Server[~0p, term ~0p, leader] receives strong read request", [Name, CurrentTerm]), LastLogIndex = wa_raft_log:last_index(View0), Pending = wa_raft_log:pending(View0), ReadIndex = max(CommitIndex, FirstLogIndex), - case config_membership(config(State0)) of + case is_single_member(Self, config(State0)) of % If we are a single node cluster and we are fully-applied, then immediately dispatch. - [{Name, Node}] when Node =:= node(), Pending =:= 0, ReadIndex =:= LastApplied -> + true when Pending =:= 0, ReadIndex =:= LastApplied -> wa_raft_storage:read(Storage, From, Command), {keep_state, State0}; _ -> @@ -1934,6 +1986,18 @@ member(Peer, #{membership := Membership}, _Default) -> member(_Peer, _Config, Default) -> Default. +%% Returns true only if the membership of the current configuration contains exactly +%% the provided peer and that the provided peer is not specified as a witness. +-spec is_single_member(Peer :: #raft_identity{} | peer(), Config :: config()) -> IsSingleMember :: boolean(). +is_single_member(#raft_identity{name = Name, node = Node}, Config) -> + is_single_member({Name, Node}, Config); +is_single_member(Peer, #{membership := Membership, witness := Witnesses}) -> + Membership =:= [Peer] andalso not lists:member(Peer, Witnesses); +is_single_member(Peer, #{membership := Membership}) -> + Membership =:= [Peer]; +is_single_member(_Peer, #{}) -> + false. + %% Get the non-empty membership list from the provided config. Raises an error %% if the membership list is missing or empty. -spec config_membership(Config :: config()) -> Membership :: membership(). @@ -2065,17 +2129,17 @@ get_log_entry(#raft_state{current_term = CurrentTerm, label_module = LabelModule {State0#raft_state{last_label = NewLabel}, {CurrentTerm, {Ref, NewLabel, Command}}}. -spec apply_single_node_cluster(State0 :: #raft_state{}) -> State1 :: #raft_state{}. -apply_single_node_cluster(#raft_state{name = Name, log_view = View0} = State0) -> +apply_single_node_cluster(#raft_state{self = Self, log_view = View} = State) -> % TODO(hsun324) T112326686: Review after RAFT RPC id changes. - case config_membership(config(State0)) of - [{Name, Node}] when Node =:= node() -> - View1 = case wa_raft_log:sync(View0) of + case is_single_member(Self, config(State)) of + true -> + NewView = case wa_raft_log:sync(View) of {ok, L} -> L; - _ -> View0 + _ -> View end, - maybe_apply(State0#raft_state{log_view = View1}); - _ -> - State0 + maybe_apply(State#raft_state{log_view = NewView}); + false -> + State end. %% Leader - check quorum and apply logs if necessary diff --git a/src/wa_raft_storage.erl b/src/wa_raft_storage.erl index 2bc2687..bcc4d3f 100644 --- a/src/wa_raft_storage.erl +++ b/src/wa_raft_storage.erl @@ -31,6 +31,7 @@ open_snapshot/3, create_snapshot/1, create_snapshot/2, + make_empty_snapshot/5, delete_snapshot/2 ]). @@ -256,6 +257,20 @@ %% raised. -callback storage_open_snapshot(Path :: file:filename(), ExpectedPosition :: wa_raft_log:log_pos(), Handle :: storage_handle()) -> {ok, NewHandle :: storage_handle()} | error(). +%%----------------------------------------------------------------------------- +%% RAFT Storage Provider - Bootstrapping +%%----------------------------------------------------------------------------- + +%% Create a new snapshot at the provided path that contains some directory +%% tree that when subsequently loaded using `storage_open_snapshot` results in +%% a storage state with the provided last applied position and for which +%% subsequent calls to `storage_config` returns the provided position as the +%% version and the config as the value. Extra data may be used by implementors +%% to provide extra state via arguments to external APIs that use this +%% endpoint, such as the partition bootstrapping API. +-callback storage_make_empty_snapshot(Name :: atom(), Identifier :: #raft_identifier{}, Path :: file:filename(), Position :: wa_raft_log:log_pos(), Config :: wa_raft_server:config(), Data :: dynamic()) -> ok | error(). +-optional_callback([storage_make_empty_snapshot/6]). + %%----------------------------------------------------------------------------- %% RAFT Storage - Types %%----------------------------------------------------------------------------- @@ -277,6 +292,7 @@ name :: atom(), table :: wa_raft:table(), partition :: wa_raft:partition(), + identifier :: #raft_identifier{}, root_dir :: file:filename(), module :: module(), handle :: storage_handle(), @@ -341,6 +357,10 @@ create_snapshot(ServiceRef) -> create_snapshot(ServiceRef, Name) -> gen_server:call(ServiceRef, {snapshot_create, Name}, ?RAFT_STORAGE_CALL_TIMEOUT()). +-spec make_empty_snapshot(ServiceRef :: pid() | atom(), Path :: file:filename(), Position :: wa_raft_log:log_pos(), Config :: wa_raft_server:config(), Data :: term()) -> ok | error(). +make_empty_snapshot(ServiceRef, Path, Position, Config, Data) -> + gen_server:call(ServiceRef, {make_empty_snapshot, Path, Position, Config, Data}). + -spec delete_snapshot(ServiceRef :: pid() | atom(), Name :: string()) -> ok. delete_snapshot(ServiceRef, Name) -> gen_server:cast(ServiceRef, {snapshot_delete, Name}). @@ -388,13 +408,13 @@ registered_name(Table, Partition) -> %%------------------------------------------------------------------- -spec init(Options :: #raft_options{}) -> {ok, #state{}}. -init(#raft_options{application = App, table = Table, partition = Partition, database = RootDir, storage_name = Name, storage_module = Module}) -> +init(#raft_options{table = Table, partition = Partition, identifier = Identifier, database = RootDir, storage_name = Name, storage_module = Module}) -> process_flag(trap_exit, true), ?LOG_NOTICE("Storage[~0p] starting for partition ~0p/~0p at ~0p using ~0p", [Name, Table, Partition, RootDir, Module], #{domain => [whatsapp, wa_raft]}), - Handle = Module:storage_open(Name, #raft_identifier{application = App, table = Table, partition = Partition}, RootDir), + Handle = Module:storage_open(Name, Identifier, RootDir), LastApplied = Module:storage_position(Handle), ?LOG_NOTICE("Storage[~0p] opened at position ~0p.", @@ -404,6 +424,7 @@ init(#raft_options{application = App, table = Table, partition = Partition, data name = Name, table = Table, partition = Partition, + identifier = Identifier, root_dir = RootDir, module = Module, handle = Handle, @@ -427,6 +448,7 @@ init(#raft_options{application = App, table = Table, partition = Partition, data status | {snapshot_create, Name :: string()} | {snapshot_open, Path :: file:filename(), LastAppliedPos :: wa_raft_log:log_pos()} | + {make_empty_snapshot, Path :: file:filename(), Position :: wa_raft_log:log_pos(), Config :: wa_raft_server:config(), Data :: term()} | label | config. handle_call(open, _From, #state{last_applied = LastApplied} = State) -> @@ -455,6 +477,14 @@ handle_call({snapshot_open, SnapshotPath, LogPos}, _From, #state{name = Name, mo {error, Reason} -> {reply, {error, Reason}, State} end; +handle_call({make_empty_snapshot, Path, Position, Config, Data}, _From, #state{name = Name, identifier = Identifier, module = Module} = State) -> + ?LOG_NOTICE("Storage[~0p] making bootstrap snapshot ~0p at ~0p with config ~0p and data ~0P.", + [Name, Path, Position, Config, Data, 30], #{domain => [whatsapp, wa_raft]}), + case erlang:function_exported(Module, storage_make_empty_snapshot, 6) of + true -> {reply, Module:storage_make_empty_snapshot(Name, Identifier, Path, Position, Config, Data), State}; + false -> {reply, {error, not_supported}, State} + end; + handle_call(config, _From, #state{module = Module, handle = Handle} = State) -> ?RAFT_COUNT('raft.storage.config'), Result = Module:storage_config(Handle), diff --git a/src/wa_raft_storage_ets.erl b/src/wa_raft_storage_ets.erl index 284b517..4606c20 100644 --- a/src/wa_raft_storage_ets.erl +++ b/src/wa_raft_storage_ets.erl @@ -22,11 +22,15 @@ storage_apply_config/3, storage_read/3, storage_create_snapshot/2, - storage_open_snapshot/3 + storage_open_snapshot/3, + storage_make_empty_snapshot/6 ]). -include("wa_raft.hrl"). +%% Options used for the ETS table +-define(OPTIONS, [set, public, {read_concurrency, true}, {write_concurrency, true}]). + %% Filename used for the actual ETS table file in a snapshot -define(SNAPSHOT_FILENAME, "data"). @@ -46,7 +50,7 @@ -spec storage_open(atom(), #raft_identifier{}, file:filename()) -> #state{}. storage_open(Name, #raft_identifier{table = Table, partition = Partition}, _RootDir) -> - Storage = ets:new(Name, [set, public, {read_concurrency, true}, {write_concurrency, true}]), + Storage = ets:new(Name, ?OPTIONS), #state{name = Name, table = Table, partition = Partition, storage = Storage}. -spec storage_close(#state{}) -> ok. @@ -130,3 +134,10 @@ storage_open_snapshot(SnapshotPath, SnapshotPosition, #state{storage = Storage} {error, Reason} -> {error, Reason} end. + +-spec storage_make_empty_snapshot(atom(), #raft_identifier{}, file:filename(), wa_raft_log:log_pos(), wa_raft_server:config(), dynamic()) -> ok | wa_raft_storage:error(). +storage_make_empty_snapshot(Name, #raft_identifier{table = Table, partition = Partition}, SnapshotPath, SnapshotPosition, Config, _Data) -> + Storage = ets:new(Name, ?OPTIONS), + State = #state{name = Name, table = Table, partition = Partition, storage = Storage}, + storage_apply_config(Config, SnapshotPosition, State), + storage_create_snapshot(SnapshotPath, State).