@@ -146,7 +146,8 @@
          receive_snapshot_timeout = ?DEFAULT_RECEIVE_SNAPSHOT_TIMEOUT :: non_neg_integer(),
          install_snap_rpc_timeout :: non_neg_integer(),
          aten_poll_interval = 1000 :: non_neg_integer(),
-         counter :: undefined | counters:counters_ref()
+         counter :: undefined | counters:counters_ref(),
+         worker_pid :: pid()
         }).

 -record(state, {conf :: #conf{},
@@ -301,17 +302,13 @@ multi_statem_call([ServerId | ServerIds], Msg, Errs, Timeout) ->
 %%%===================================================================

 init(#{reply_to := ReplyTo} = Config) ->
-    %% we have a reply_to key, perform init async
     {ok, post_init, maps:remove(reply_to, Config),
-     [{next_event, internal, {go, ReplyTo}}]};
-init(Config) ->
-    %% no reply_to key, must have been started by an older node; run
-    %% synchronous init
-    State = do_init(Config),
-    {ok, recover, State, [{next_event, cast, go}]}.
+     [{next_event, internal, {go, ReplyTo}}]}.

 do_init(#{id := Id,
-          cluster_name := ClusterName} = Config0) ->
+          parent := ParentPid,
+          cluster_name := ClusterName} = Config0)
+  when is_pid(ParentPid) ->
     Key = ra_lib:ra_server_id_to_local_name(Id),
     true = ets:insert(ra_state, {Key, init, unknown}),
     process_flag(trap_exit, true),
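
With the synchronous init/1 clause removed, every server in this branch is started asynchronously: the config map must carry a reply_to key, and do_init/1 now also requires a parent pid (enforced by the is_pid/1 guard). A minimal sketch of what the starting side could look like under those assumptions; the start_server/2 helper, the module name and the ack shape are illustrative, not part of this commit:

    %% Hypothetical starter: inject our own pid as parent and an alias
    %% as reply_to so the server can finish init in the post_init state.
    start_server(Id, ClusterName) ->
        ReplyTo = alias(),
        Config = #{id => Id,
                   parent => self(),
                   cluster_name => ClusterName,
                   reply_to => ReplyTo},
        {ok, Pid} = gen_statem:start_link(ra_server_proc, Config, []),
        receive
            {ReplyTo, initialised} -> {ok, Pid}  %% illustrative ack
        after 5000 ->
            {error, init_timeout}
        end.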
@@ -362,6 +359,16 @@ do_init(#{id := Id,
     ReceiveSnapshotTimeout = maps:get(receive_snapshot_timeout, SysConf,
                                       ?DEFAULT_RECEIVE_SNAPSHOT_TIMEOUT),
     AtenPollInt = application:get_env(aten, poll_interval, 1000),
+    %% TODO: full error handling
+    WorkerPid = case ra_server_sup:start_ra_worker(ParentPid, Config) of
+                    {ok, P} -> P;
+                    {error, {already_started, P}} ->
+                        P
+                end,
+    ra_env:configure_logger(logger),
+    %% monitor the worker process; a monitor is easier to handle than a
+    %% link as we're already processing all downs
+    _ = monitor(process, WorkerPid),
     State = #state{conf = #conf{log_id = LogId,
                                 cluster_name = ClusterName,
                                 name = Key,
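
The worker is started through ra_server_sup:start_ra_worker(ParentPid, Config), i.e. as a child under the server's own supervisor, and matching {error, {already_started, P}} keeps the start idempotent across restarts. A plausible sketch of that function, assuming ra_worker is a gen_server child added dynamically (the child spec details are assumptions):

    %% Hypothetical: add the per-server worker under the ra_server_sup
    %% instance identified by SupPid; already_started is tolerated by
    %% the caller above.
    start_ra_worker(SupPid, Config) ->
        ChildSpec = #{id => ra_worker,
                      type => worker,
                      restart => transient,
                      start => {ra_worker, start_link, [Config]}},
        supervisor:start_child(SupPid, ChildSpec).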
@@ -373,7 +380,8 @@ do_init(#{id := Id,
                                 install_snap_rpc_timeout = InstallSnapRpcTimeout,
                                 receive_snapshot_timeout = ReceiveSnapshotTimeout,
                                 aten_poll_interval = AtenPollInt,
-                                counter = Counter},
+                                counter = Counter,
+                                worker_pid = WorkerPid},
                    low_priority_commands = ra_ets_queue:new(),
                    server_state = ServerState},
     ok = net_kernel:monitor_nodes(true, [nodedown_reason]),
@@ -1513,7 +1521,7 @@ handle_effect(leader, {send_snapshot, {_, ToNode} = To, {SnapState, Id, Term}},
             SS = ra_server:update_peer(To, #{status => disconnected}, SS0),
             {State0#state{server_state = SS}, Actions}
     end;
-handle_effect(_, {delete_snapshot, Dir, SnapshotRef}, _, State0, Actions) ->
+handle_effect(_, {delete_snapshot, Dir, SnapshotRef}, _, State0, Actions) ->
     %% delete snapshots in separate process
     _ = spawn(fun() ->
                       ra_snapshot:delete(Dir, SnapshotRef)
@@ -1604,6 +1612,11 @@ handle_effect(follower, {record_leader_msg, _LeaderId}, _, State0, Actions) ->
 handle_effect(_, {record_leader_msg, _LeaderId}, _, State0, Actions) ->
     %% non follower states don't need to reset state timeout after an effect
     {State0, Actions};
+handle_effect(_, {bg_work, FunOrMfa, ErrFun}, _,
+              #state{conf = #conf{worker_pid = WorkerPid}} = State0, Actions) ->
+    %% hand background work off to the server's dedicated worker process
+    ra_worker:queue_work(WorkerPid, FunOrMfa, ErrFun),
+    {State0, Actions};
 handle_effect(_, _, _, State0, Actions) ->
     {State0, Actions}.

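The new bg_work effect lets any state hand a closure (or MFA) plus an error callback to the worker instead of blocking the gen_statem loop. A minimal sketch of the worker side, assuming ra_worker is a plain gen_server whose queue_work/3 casts the job over; everything except the queue_work/3 name and arity is an assumption:

    -module(ra_worker).
    -behaviour(gen_server).

    -export([start_link/1, queue_work/3]).
    -export([init/1, handle_call/3, handle_cast/2]).

    start_link(Config) ->
        gen_server:start_link(?MODULE, Config, []).

    %% queue a fun/0 or {M, F, A} to run outside the ra server process
    queue_work(Pid, FunOrMfa, ErrFun) ->
        gen_server:cast(Pid, {bg_work, FunOrMfa, ErrFun}).

    init(Config) ->
        {ok, Config}.

    handle_call(_Msg, _From, State) ->
        {reply, ok, State}.

    handle_cast({bg_work, FunOrMfa, ErrFun}, State) ->
        try
            case FunOrMfa of
                {M, F, A} -> erlang:apply(M, F, A);
                Fun when is_function(Fun, 0) -> Fun()
            end
        catch C:R ->
            ErrFun({C, R})
        end,
        {noreply, State}.

An effect such as {bg_work, fun() -> expensive_cleanup() end, ErrFun} (names illustrative) would then run off the critical path while failures are still reported back through the error fun.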
@@ -2018,6 +2031,11 @@ handle_node_status_change(Node, Status, InfoList, RaftState,
                                    monitors = Monitors}),
     {keep_state, State, Actions}.

+handle_process_down(Pid, Info, _RaftState,
+                    #state{conf = #conf{worker_pid = Pid}} = State) ->
+    ?WARN("~ts: worker exited with ~w",
+          [log_id(State), Info]),
+    {stop, Info, State};
 handle_process_down(Pid, Info, RaftState,
                     #state{monitors = Monitors0,
                            pending_notifys = Nots,
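
Because the first handle_process_down/4 clause matches the down pid directly against #conf.worker_pid, the server stops with the worker's exit reason and the shared supervisor can restart the pair together. For context, the 'DOWN' message produced by the monitor set up in do_init/1 would reach this function via the gen_statem's info handling, along these illustrative lines (ra's real dispatch is more involved):

    %% Hypothetical dispatch site for monitor notifications
    handle_event(info, {'DOWN', _MRef, process, Pid, Info}, RaftState, State) ->
        handle_process_down(Pid, Info, RaftState, State);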