Skip to content

Commit ea59332

Browse files
committed
suspension fixes
1 parent 0ba32fb commit ea59332

File tree

4 files changed

+31
-11
lines changed

4 files changed

+31
-11
lines changed

src/ra_server.erl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
checkpoint/3,
5353
persist_last_applied/1,
5454
peers/1,
55+
peer_status/2,
5556
update_peer/3,
5657
update_disconnected_peers/3,
5758
handle_down/5,
@@ -2548,6 +2549,14 @@ new_peer_with(Map) ->
25482549
peers(#{cfg := #cfg{id = Id}, cluster := Peers}) ->
25492550
maps:remove(Id, Peers).
25502551

2552+
%% Returns the `status' recorded for `PeerId' in the `cluster' map of the
%% given server state.
%% Returns `undefined' when `PeerId' is not a key of the cluster map or when
%% its entry carries no `status' key.
%% Unlike peers/1 the lookup is done on the whole cluster map, i.e. the
%% server's own id is not removed first.
peer_status(PeerId, #{cluster := Peers}) ->
2553+
case Peers of
2554+
#{PeerId := #{status := Status}} ->
2555+
Status;
2556+
_ ->
2557+
undefined
2558+
end.
2559+
25512560
%% remove any peers that are currently receiving a snapshot
25522561
peers_with_normal_status(State) ->
25532562
maps:filter(fun (_, #{status := normal}) -> true;

src/ra_server_proc.erl

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -579,8 +579,13 @@ leader(info, {Status, Node, InfoList}, State0)
579579
when Status =:= nodedown orelse
580580
Status =:= nodeup ->
581581
handle_node_status_change(Node, Status, InfoList, ?FUNCTION_NAME, State0);
582-
leader(info, {update_peer, PeerId, Update}, State0) ->
583-
State = update_peer(PeerId, Update, State0),
582+
leader(info, {unsuspend_peer, PeerId}, State0) ->
583+
State = case ra_server:peer_status(PeerId, State0#state.server_state) of
584+
suspended ->
585+
update_peer(PeerId, #{status => normal}, State0);
586+
_ ->
587+
State0
588+
end,
584589
{keep_state, State, []};
585590
leader(_, tick_timeout, State0) ->
586591
{State1, RpcEffs} = make_rpcs(State0),
@@ -1393,13 +1398,15 @@ handle_effects(RaftState, Effects0, EvtType, State0, Actions0) ->
13931398
{State, lists:reverse(Actions)}.
13941399

13951400
handle_effect(_RaftState, {send_rpc, To, Rpc}, _,
1396-
#state{conf = Conf} = State0, Actions) ->
1401+
#state{conf = Conf,
1402+
server_state = SS} = State0, Actions) ->
13971403
% fully qualified use only so that we can mock it for testing
13981404
% TODO: review / refactor to remove the mod call here
1405+
PeerStatus = ra_server:peer_status(To, SS),
13991406
case ?MODULE:send_rpc(To, Rpc, State0) of
14001407
ok ->
14011408
{State0, Actions};
1402-
nosuspend ->
1409+
nosuspend when PeerStatus == normal ->
14031410
%% update peer status to suspended and spawn a process
14041411
%% to send the rpc without nosuspend so that it will block until
14051412
%% the data can get through
@@ -1410,11 +1417,13 @@ handle_effect(_RaftState, {send_rpc, To, Rpc}, _,
14101417
%% the peer status back to normal
14111418
ok = gen_statem:cast(To, Rpc),
14121419
incr_counter(Conf, ?C_RA_SRV_MSGS_SENT, 1),
1413-
Self ! {update_peer, To, #{status => normal}}
1420+
Self ! {unsuspend_peer, To}
14141421
end),
1415-
?DEBUG("~ts: temporarily suspending peer ~w due to full distribution buffer",
1416-
[log_id(State0), To]),
1422+
% ?DEBUG("~ts: temporarily suspending peer ~w due to full distribution buffer ~W",
1423+
% [log_id(State0), To, Rpc, 5]),
14171424
{update_peer(To, #{status => suspended}, State0), Actions};
1425+
nosuspend ->
1426+
{State0, Actions};
14181427
noconnect ->
14191428
%% for noconnects just allow it to pipeline and catch up later
14201429
{State0, Actions}
@@ -1976,6 +1985,8 @@ send_snapshots(Id, Term, {_, ToNode} = To, ChunkSize,
19761985
Result = read_chunks_and_send_rpc(RPC, To, ReadState, 1,
19771986
ChunkSize, InstallTimeout,
19781987
SnapState),
1988+
?DEBUG("~ts: sending snapshot to ~w completed",
1989+
[LogId, To]),
19791990
ok = gen_statem:cast(Id, {To, Result})
19801991
end.
19811992

test/ra_kv_SUITE.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ snapshot_replication_interrupted(_Config) ->
7171
Data = crypto:strong_rand_bytes(100_000),
7272
%% write 10k entries, each under a distinct key but sharing the same value
7373
[{ok, #{}} = ra_kv:put(KvId, term_to_binary(I), Data, 5000)
74-
|| I <- lists:seq(1, 50_000)],
75-
?assertMatch({ok, #{machine := #{num_keys := 50_000}}, KvId},
74+
|| I <- lists:seq(1, 10_000)],
75+
?assertMatch({ok, #{machine := #{num_keys := 10_000}}, KvId},
7676
ra:member_overview(KvId)),
7777

7878
ra_log_wal:force_roll_over(ra_log_wal),
@@ -99,7 +99,7 @@ snapshot_replication_interrupted(_Config) ->
9999
ct:pal("ra_state ~p", [ets:tab2list(ra_state)]),
100100
ok = ra:stop_server(?SYS, KvId3),
101101
[{ok, #{}} = ra_kv:put(KvId, term_to_binary(I), Data, 5000)
102-
|| I <- lists:seq(50_001, 50_010)],
102+
|| I <- lists:seq(10_001, 10_010)],
103103
ok = ra:restart_server(?SYS, KvId3),
104104
{ok, #{log := #{last_index := Kv1LastIndex }}, _} = ra:member_overview(KvId),
105105
ok = ra_lib:retry(

test/ra_server_SUITE.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ recover_restores_cluster_changes(_Config) ->
273273

274274
% n2 joins
275275
{leader, #{cluster := Cluster,
276-
log := Log0}, _} =
276+
log := Log0}, _Effs} =
277277
ra_server:handle_leader({command, {'$ra_join', meta(),
278278
N2, await_consensus}}, State),
279279
{LIdx, _} = ra_log:last_index_term(Log0),

0 commit comments

Comments
 (0)