Skip to content

Commit

Permalink
add first class raft config read/write callbacks
Browse files Browse the repository at this point in the history
Reviewed By: hsun324

Differential Revision: D70581993

fbshipit-source-id: 8dbf40844d318ebe167ee87a718ef5b2dd185e74
  • Loading branch information
Sarah Hassan authored and facebook-github-bot committed Mar 5, 2025
1 parent 4675904 commit 01adb94
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 2 deletions.
34 changes: 32 additions & 2 deletions src/wa_raft_storage.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@
label/1
]).

%% Config API
-export([
config/1
]).

%% Misc API
-export([
status/1
Expand Down Expand Up @@ -141,6 +146,11 @@
%% callback `storage_apply/4` for details.
-callback storage_label(Handle :: storage_handle()) -> {ok, Label :: wa_raft_label:label()} | error().
-optional_callbacks([storage_label/1]).


%% Issue a read command to get the config of the current storage state.
-callback storage_config(Handle :: storage_handle()) -> {ok, Version :: wa_raft_log:log_pos(), Config :: wa_raft_server:config()} | undefined.

%%-----------------------------------------------------------------------------
%% RAFT Storage Provider - Write Commands
%%-----------------------------------------------------------------------------
Expand Down Expand Up @@ -199,6 +209,15 @@
%% desired consistency guarantee then implementations can raise an error to
%% cause the apply to be aborted safely.
-callback storage_write_metadata(Handle :: storage_handle(), Key :: metadata(), Version :: wa_raft_log:log_pos(), Value :: term()) -> ok | error().
-optional_callbacks([storage_write_metadata/4]).

%% Apply a write command to update the raft config stored by the storage provider
%% on behalf of the RAFT implementation. Subsequent calls to read the config
%% should return the updated version and value.
%% If the command could not be applied in a manner so as to preserve the
%% desired consistency guarantee then implementations can raise an error to be
%% aborted safely.
-callback storage_apply_config(Config :: wa_raft_server:config(), Position :: wa_raft_log:log_pos(), Handle :: storage_handle()) -> {Result :: ok | error(), NewHandle :: storage_handle()}.

%%-----------------------------------------------------------------------------
%% RAFT Storage Provider - Read Commands
Expand Down Expand Up @@ -230,6 +249,7 @@
%% Apply a read command against the storage state to read the most recently
%% written value of the metadata with the provided key, if such a value exists.
-callback storage_read_metadata(Handle :: storage_handle(), Key :: metadata()) -> {ok, Version :: wa_raft_log:log_pos(), Value :: term()} | undefined | error().
-optional_callbacks([storage_read_metadata/2]).

%%-----------------------------------------------------------------------------
%% RAFT Storage Provider - Snapshots
Expand Down Expand Up @@ -349,10 +369,14 @@ delete_snapshot(ServiceRef, Name) ->
read_metadata(ServiceRef, Key) ->
gen_server:call(ServiceRef, {read_metadata, Key}, ?RAFT_STORAGE_CALL_TIMEOUT()).

-spec label(ServiceRef :: pid() | atom()) -> wa_raft_label:label().
-spec label(ServiceRef :: pid() | atom()) -> {ok, Label :: wa_raft_label:label()} | wa_raft_storage:error().
label(ServiceRef) ->
gen_server:call(ServiceRef, label, ?RAFT_STORAGE_CALL_TIMEOUT()).

-spec config(ServiceRef :: pid() | atom()) -> {ok, wa_raft_log:log_pos(), wa_raft_server:config()} | undefined | wa_raft_storage:error().
config(ServiceRef) ->
gen_server:call(ServiceRef, config, ?RAFT_STORAGE_CALL_TIMEOUT()).

-ifdef(TEST).
-spec reset(ServiceRef :: pid() | atom(), Position :: wa_raft_log:log_pos(), Config :: wa_raft_server:config() | undefined) -> ok | error().
reset(ServiceRef, Position, Config) ->
Expand Down Expand Up @@ -428,7 +452,8 @@ init(#raft_options{application = App, table = Table, partition = Partition, data
{snapshot_create, Name :: string()} |
{snapshot_open, Path :: file:filename(), LastAppliedPos :: wa_raft_log:log_pos()} |
{read_metadata, Key :: metadata()} |
label.
label |
config.
handle_call(open, _From, #state{last_applied = LastApplied} = State) ->
{reply, {ok, LastApplied}, State};

Expand Down Expand Up @@ -460,6 +485,11 @@ handle_call({read_metadata, Key}, _From, #state{module = Module, handle = Handle
Result = Module:storage_read_metadata(Handle, Key),
{reply, Result, State};

handle_call(config, _From, #state{module = Module, handle = Handle} = State) ->
?RAFT_COUNT('raft.storage.config'),
Result = Module:storage_config(Handle),
{reply, Result, State};

handle_call(label, _From, #state{module = Module, handle = Handle} = State) ->
?RAFT_COUNT('raft.storage.label'),
Result = Module:storage_label(Handle),
Expand Down
18 changes: 18 additions & 0 deletions src/wa_raft_storage_ets.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
storage_close/1,
storage_label/1,
storage_position/1,
storage_config/1,
storage_apply/3,
storage_apply/4,
storage_apply_config/3,
storage_write_metadata/4,
storage_read/3,
storage_read_metadata/2,
Expand Down Expand Up @@ -65,6 +67,13 @@ storage_label(#state{storage = Storage}) ->
[] -> {ok, undefined}
end.

-spec storage_config(#state{}) -> {ok, wa_raft_log:log_pos(), wa_raft_server:config()} | undefined.
storage_config(#state{storage = Storage}) ->
case ets:lookup(Storage, {?METADATA_TAG, config}) of
[{_, {Version, Value}}] -> {ok, Version, Value};
[] -> undefined
end.

-spec storage_apply(Command :: wa_raft_acceptor:command(), Position :: wa_raft_log:log_pos(), Label :: wa_raft_label:label(), Storage :: #state{}) -> {ok, #state{}}.
storage_apply(Command, Position, Label, #state{storage = Storage} = State) ->
true = ets:insert(Storage, {?LABEL_TAG, Label}),
Expand All @@ -82,6 +91,15 @@ storage_apply({delete, _Table, Key}, Position, #state{storage = Storage} = State
true = ets:insert(Storage, {?POSITION_TAG, Position}),
{ok, State}.

-spec storage_apply_config(
Config :: wa_raft_server:config(),
LogPos :: wa_raft_log:log_pos(),
State :: #state{}
) -> {ok | wa_raft_storage:error(), #state{}}.
storage_apply_config(Config, LogPos, #state{storage = Storage} = State) ->
true = ets:insert(Storage, [{{?METADATA_TAG, config}, {LogPos, Config}}, {?POSITION_TAG, LogPos}]),
{ok, State}.

-spec storage_write_metadata(#state{}, wa_raft_storage:metadata(), wa_raft_log:log_pos(), term()) -> ok.
storage_write_metadata(#state{storage = Storage}, Key, Version, Value) ->
true = ets:insert(Storage, [{{?METADATA_TAG, Key}, {Version, Value}}, {?POSITION_TAG, Version}]),
Expand Down

0 comments on commit 01adb94

Please sign in to comment.