Skip to content

Commit

Permalink
Merge branch 'master' of AdRoll/erlmld
Browse files Browse the repository at this point in the history
  • Loading branch information
zerth committed Aug 16, 2018
2 parents 00f41e8 + 25d28e6 commit 9fc6c0e
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 17 deletions.
4 changes: 2 additions & 2 deletions include/erlmld.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@

-record(stream_record, {
partition_key :: binary(),
timestamp :: non_neg_integer(), % approximate arrival time (ms)
timestamp :: undefined | non_neg_integer(), % approximate arrival time (ms)
delay :: non_neg_integer(), % approximate delay between this record and tip of stream (ms)
sequence_number :: sequence_number(),
data :: binary()
data :: term()
}).

-type worker_state() :: term().
Expand Down
15 changes: 15 additions & 0 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,18 @@
]}
]}
]}.

%% Dialyzer settings used by `rebar3 dialyzer`.
{dialyzer, [
%% Dialyzer warning options. NOTE(review): per the Dialyzer docs, `no_return`
%% *suppresses* warnings about functions that never return, while `unknown`
%% and `error_handling` enable additional warnings -- confirm this is intended.
{warnings, [unknown, no_return, error_handling]},
%% Which dependency applications go into the project PLT.
{plt_apps, top_level_deps},
%% No additional applications are added to the project PLT.
{plt_extra_apps, []},
%% Where the project PLT is stored.
{plt_location, local},
%% OTP applications that make up the base PLT.
{base_plt_apps, [erts, stdlib, kernel, sasl, runtime_tools, tools]},
%% Where the base PLT is stored.
{base_plt_location, global}
]}.

%% Analyses performed by `rebar3 xref`: calls to undefined functions, local
%% functions that are never used, and calls to deprecated functions.
{xref_checks,[
undefined_function_calls,
locals_not_used,
deprecated_function_calls
]}.
1 change: 1 addition & 0 deletions src/erlmld.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
{applications,
[kernel,
stdlib,
crypto,
erlexec,
jiffy,
b64fast]},
Expand Down
16 changes: 16 additions & 0 deletions src/erlmld_batch_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
-export([initialize/3,
ready/1,
process_record/2,
checkpointed/3,
shutdown/2]).

-include("erlmld.hrl").
Expand All @@ -47,6 +48,10 @@
%% Optional callback to call each time process_record returns a checkpoint.
on_checkpoint :: fun((term(), shard_id()) -> term()),

%% Optional, false by default. Whether to log every successful checkpoint
%% from the KCL worker.
log_checkpoints :: boolean(),

description :: term(),
shard_id :: shard_id(),
count = 0, % non-ignored records seen
Expand All @@ -71,12 +76,14 @@

initialize(Opts, ShardId, ISN) ->
Defaults = #{on_checkpoint => fun(_, _) -> ok end,
log_checkpoints => false,
description => undefined,
enable_subsequence_checkpoints => false},
#{description := Description,
flusher_mod := FlusherMod,
flusher_mod_data := FlusherModData,
on_checkpoint := OnCheckpoint,
log_checkpoints := LogCheckpoints,
flush_interval_ms := FlushIntervalMs,
checkpoint_interval_ms := CheckpointIntervalMs,
watchdog_timeout_ms := WatchdogTimeoutMs,
Expand All @@ -85,6 +92,7 @@ initialize(Opts, ShardId, ISN) ->
flusher_mod = FlusherMod,
flusher_state = FlusherMod:init(ShardId, FlusherModData),
on_checkpoint = OnCheckpoint,
log_checkpoints = LogCheckpoints,
description = Description,
shard_id = ShardId,
flush_interval_ms = FlushIntervalMs,
Expand Down Expand Up @@ -114,6 +122,14 @@ process_record(#state{last_flush_time = LastFlush,
end,
maybe_checkpoint(update_watchdog(NState)).

%% @doc Worker callback invoked with the sequence number and checkpoint of a
%% checkpoint attempt that succeeded.
%%
%% When the `log_checkpoints' option is `true', an info message identifying
%% this processor by its description and shard id is logged.  The full state
%% record is deliberately NOT logged: it carries the flusher state and other
%% internals, which would bloat the log and could expose buffered data on
%% every checkpoint.
%%
%% Always returns `{ok, State}' with the state unchanged.  A non-boolean
%% `log_checkpoints' value is a configuration error and crashes here.
checkpointed(#state{log_checkpoints = true,
                    description = Description,
                    shard_id = ShardId} = State,
             SequenceNumber,
             Checkpoint) ->
    error_logger:info_msg("~p (shard ~p) checkpointed at ~p (~p)~n",
                          [Description, ShardId, Checkpoint, SequenceNumber]),
    {ok, State};
checkpointed(#state{log_checkpoints = false} = State,
             _SequenceNumber,
             _Checkpoint) ->
    {ok, State}.

shutdown(#state{description = Description,
shard_id = ShardId,
Expand Down
7 changes: 7 additions & 0 deletions src/erlmld_noisy_wrk.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
-export([initialize/3,
ready/1,
process_record/2,
checkpointed/3,
shutdown/2]).

-include("erlmld.hrl").
Expand Down Expand Up @@ -42,6 +43,12 @@ process_record(#state{shard_id = ShardId, count = Count} = State,
{ok, State#state{count = Count + 1}}
end.

%% Worker callback for a successful checkpoint: prints the shard id, the
%% number of records counted so far, the checkpoint and its sequence number
%% to standard output, then hands the state back untouched.
checkpointed(State = #state{shard_id = Shard, count = Seen},
             SequenceNumber,
             Checkpoint) ->
    Args = [Shard, Seen, Checkpoint, SequenceNumber],
    io:format("~p (~p) checkpointed at ~p (~p)~n", Args),
    {ok, State}.

shutdown(#state{shard_id = ShardId, count = Count}, Reason) ->
io:format("~p (~p) shutting down, reason: ~p~n", [ShardId, Count, Reason]),
Expand Down
4 changes: 4 additions & 0 deletions src/erlmld_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@
| {ok, worker_state(), checkpoint()}
| {error, term()}.

%% Called after a checkpoint attempt has been confirmed as successful.  The
%% arguments are the current worker state, the sequence number reported in the
%% checkpoint response, and the checkpoint that was last attempted.  Returning
%% `{ok, NewWorkerState}' continues with the new worker state; returning
%% `{error, Reason}' stops the worker state machine.
-callback checkpointed(worker_state(), sequence_number(), checkpoint()) ->
{ok, worker_state()}
| {error, term()}.

-callback shutdown(worker_state(), shutdown_reason()) ->
ok
| {ok, checkpoint()}
Expand Down
34 changes: 19 additions & 15 deletions src/erlmld_wrk_statem.erl
Original file line number Diff line number Diff line change
Expand Up @@ -119,23 +119,23 @@
handler_data :: term(),

%% connected socket owned by this process:
socket :: gen_tcp:socket(),
socket :: undefined | gen_tcp:socket(),

%% input buffer; responses are small and we need no output buffer:
buf = [] :: list(binary()),
buf = [] :: [binary()],

%% worker state returned from handler module init:
worker_state :: term(),
worker_state :: undefined | term(),

%% if true, the MLD made a processRecords call with the V2 format (supplied
%% millisBehindLatest), so we will checkpoint using the V2 checkpoint format:
is_v2 = false :: boolean(),

%% most recent action name from the peer:
last_request :: binary(),
last_request :: undefined | binary(),

%% last attempted checkpoint:
last_checkpoint :: checkpoint()
last_checkpoint :: undefined | checkpoint()
}).

-define(INTERNAL, internal).
Expand Down Expand Up @@ -410,22 +410,26 @@ handle_event(?INTERNAL, #{<<"action">> := <<"checkpoint">>,

handle_event(?INTERNAL, #{<<"action">> := <<"checkpoint">>} = R,
{?DISPATCH, CheckpointState},
#data{worker_state = {ok, WorkerState},
#data{handler_module = Mod,
worker_state = {ok, WorkerState},
last_checkpoint = Checkpoint,
last_request = LastAction} = Data)
when CheckpointState == ?CHECKPOINT;
CheckpointState == ?SHUTDOWN_CHECKPOINT ->
%% successful checkpoint. fixme; provide indication of success to worker?
SN = sequence_number(R),
error_logger:info_msg("~p checkpointed at ~p (~p)~n", [WorkerState, Checkpoint, SN]),
case CheckpointState of
?CHECKPOINT ->
{next_state, ?PROCESS_RECORDS, Data};
?SHUTDOWN_CHECKPOINT ->
success(LastAction, Data, ?SHUTDOWN)
case Mod:checkpointed(WorkerState, SN, Checkpoint) of
{ok, NWorkerState} ->
NData = worker_state(Data, NWorkerState),
case CheckpointState of
?CHECKPOINT ->
{next_state, ?PROCESS_RECORDS, NData};
?SHUTDOWN_CHECKPOINT ->
success(LastAction, NData, ?SHUTDOWN)
end;
{error, _} = Error ->
{stop, Error}
end;


%% we were processing records and we attempted to checkpoint, but failed because another
%% worker stole our lease. abort record processing, return a 'success' response for
%% the current command, and read the next request, which should be a shutdown command.
Expand Down Expand Up @@ -586,7 +590,7 @@ next_line(Bin, #data{buf = Buf} = Data) ->
%% and remaining data. an "action" is a line which should have been a json-encoded map
%% containing an "action" key. if decoding fails with a thrown error, that error is
%% returned as the decoded value.
-spec next_action(binary(), #data{}) -> {map() | undefined, #data{}, binary()}.
-spec next_action(binary(), #data{}) -> {map() | undefined | {error, term()}, #data{}, binary()}.
next_action(Bin, Data) ->
case next_line(Bin, Data) of
{undefined, NData, Rest} ->
Expand Down

0 comments on commit 9fc6c0e

Please sign in to comment.