Skip to content

Commit

Permalink
Add new reply callback to distribution behavior
Browse files Browse the repository at this point in the history
Summary:
Add a new `reply` callback to the distribution behavior for replies
to RPCs that were issued as calls rather than casts. This is currently
only used to reply to heartbeats that were issued by bulk log
replication.

Reviewed By: jaher

Differential Revision: D68913516

fbshipit-source-id: 4c2384c809f546aa05873c4b14ccb97f2396bdc5
  • Loading branch information
hsun324 authored and facebook-github-bot committed Jan 30, 2025
1 parent 3428560 commit 9652466
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
8 changes: 7 additions & 1 deletion src/wa_raft_distribution.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@

-export([
cast/3,
call/4
call/4,
reply/3
]).

-include("wa_raft.hrl").
Expand All @@ -28,6 +29,7 @@

-callback cast(dest_addr(), #raft_identifier{}, term()) -> term().
-callback call(dest_addr(), #raft_identifier{}, term(), integer() | infinity) -> term().
-callback reply(gen_server:from() | gen_statem:from(), #raft_identifier{}, term()) -> term().

%%% ------------------------------------------------------------------------
%%% Erlang distribution default implementation
Expand All @@ -40,3 +42,7 @@ cast(DestAddr, _Identifier, Message) ->
-spec call(DestAddr :: dest_addr(), Identifier :: #raft_identifier{}, Message :: term(), Timeout :: integer() | infinity) -> term().
call(DestAddr, _Identifier, Message, Timeout) ->
gen_server:call(DestAddr, Message, Timeout).

-spec reply(From :: gen_server:from() | gen_statem:from(), Identifier :: #raft_identifier{}, Reply :: term()) -> term().
reply(From, _Identifier, Reply) ->
gen:reply(From, Reply).
11 changes: 9 additions & 2 deletions src/wa_raft_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2445,7 +2445,7 @@ handle_heartbeat(State, Event, Leader, PrevLogIndex, PrevLogTerm, Entries, Commi
case append_entries(State, PrevLogIndex, PrevLogTerm, Entries, EntryCount, Data0) of
{ok, Accepted, NewMatchIndex, Data1} ->
send_rpc(Leader, ?APPEND_ENTRIES_RESPONSE(PrevLogIndex, Accepted, NewMatchIndex), Data1),
reply(Event, ?LEGACY_APPEND_ENTRIES_RESPONSE_RPC(CurrentTerm, node(), PrevLogIndex, Accepted, NewMatchIndex)),
reply_rpc(Event, ?LEGACY_APPEND_ENTRIES_RESPONSE_RPC(CurrentTerm, node(), PrevLogIndex, Accepted, NewMatchIndex), Data1),

LocalTrimIndex = case ?RAFT_LOG_ROTATION_BY_TRIM_INDEX(App) of
true -> TrimIndex;
Expand Down Expand Up @@ -2524,7 +2524,7 @@ append_entries(State, PrevLogIndex, PrevLogTerm, Entries, EntryCount, #raft_stat
%% RAFT Server - State Machine Implementation - Helpers
%%------------------------------------------------------------------------------

%% Generic reply function that operates based on event type.
%% Generic reply function for non-RPC requests that operates based on event type.
-spec reply(Type :: enter | gen_statem:event_type(), Message :: term()) -> ok | wa_raft:error().
reply(cast, _Message) ->
ok;
Expand All @@ -2535,6 +2535,13 @@ reply(Type, Message) ->
[Type, Message, 100], #{domain => [whatsapp, wa_raft]}),
ok.

%% Generic reply function for RPC requests that operates based on event type.
-spec reply_rpc(Type :: gen_statem:event_type(), Reply :: term(), Data :: #raft_state{}) -> term().
reply_rpc({call, From}, Reply, #raft_state{identifier = Identifier, distribution_module = DistributionModule}) ->
DistributionModule:reply(From, Identifier, Reply);
reply_rpc(_Other, _Reply, _Data) ->
ok.

-spec send_rpc(Destination :: #raft_identity{}, ProcedureCall :: normalized_procedure(), State :: #raft_state{}) -> term().
send_rpc(Destination, Procedure, #raft_state{self = Self, current_term = Term} = State) ->
cast(Destination, make_rpc(Self, Term, Procedure), State).
Expand Down

0 comments on commit 9652466

Please sign in to comment.