diff --git a/src/wa_raft_server.erl b/src/wa_raft_server.erl index 7832a45..84eea70 100644 --- a/src/wa_raft_server.erl +++ b/src/wa_raft_server.erl @@ -2125,6 +2125,10 @@ max_index_to_apply(MatchIndex, LastIndex, Config) -> to_member_list(Mapping, Default, Config) -> [maps:get(Node, Mapping, Default) || {_, Node} <- config_membership(Config)]. +-spec compute_max(Mapping :: #{node() => Value}, Default :: Value, Config :: config()) -> Max :: Value. +compute_max(Mapping, Default, Config) -> + lists:max(to_member_list(Mapping, Default, Config)). + %% Compute the quorum maximum value for the current membership given a config for %% the values represented by the given a mapping of peers (see note on config about %% RAFT RPC ids) to values assuming a default value for peers who are not represented @@ -2334,7 +2338,8 @@ heartbeat(?IDENTITY_REQUIRES_MIGRATION(_, FollowerId) = Sender, LastFollowerHeartbeatTs = maps:get(FollowerId, LastHeartbeatTs, undefined), State1 = State0#raft_state{last_heartbeat_ts = LastHeartbeatTs#{FollowerId => NowTs}, leader_heartbeat_ts = NowTs}, LastIndex = wa_raft_log:last_index(View), - Witnesses = config_witnesses(config(State0)), + Config = config(State0), + Witnesses = config_witnesses(Config), case PrevLogTermRes =:= not_found orelse IsCatchingUp of %% catching up, or prep true -> {ok, LastTerm} = wa_raft_log:term(View, LastIndex), @@ -2348,15 +2353,17 @@ heartbeat(?IDENTITY_REQUIRES_MIGRATION(_, FollowerId) = Sender, Entries = case lists:member({Name, FollowerId}, Witnesses) of true -> - MaxWitnessLogEntries = min(?RAFT_HEARTBEAT_MAX_ENTRIES_TO_WITNESS(App), CommitIndex - FollowerNextIndex + 1), - {ok, Terms} = wa_raft_log:get_terms(View, FollowerNextIndex, MaxWitnessLogEntries), + MaxWitnessLogEntries = ?RAFT_HEARTBEAT_MAX_ENTRIES_TO_WITNESS(App), + MaxMatchIndex = compute_max(MatchIndex, 0, Config), + Limit = max(0, min(MaxWitnessLogEntries, MaxMatchIndex - FollowerNextIndex + 1)), + {ok, Terms} = wa_raft_log:get_terms(View, FollowerNextIndex, Limit), [{Term, []} || Term <- Terms]; - _ -> + false -> MaxLogEntries = ?RAFT_HEARTBEAT_MAX_ENTRIES(App), MaxHeartbeatSize = ?RAFT_HEARTBEAT_MAX_BYTES(App), {ok, Ret} = wa_raft_log:get(View, FollowerNextIndex, MaxLogEntries, MaxHeartbeatSize), Ret - end, + end, {ok, PrevLogTerm} = PrevLogTermRes, ?RAFT_GATHER('raft.leader.heartbeat.size', length(Entries)), ?LOG_DEBUG("Server[~0p, term ~0p, leader] heartbeat to follower ~p from ~p(~p entries). Commit index ~p",