diff --git a/src/erlmld_batch_processor.erl b/src/erlmld_batch_processor.erl
index 9be9073..bfa5a09 100644
--- a/src/erlmld_batch_processor.erl
+++ b/src/erlmld_batch_processor.erl
@@ -105,8 +105,17 @@ initialize(Opts, ShardId, ISN) ->
     {ok, update_watchdog(State)}.
 
 
-ready(State) ->
-    {ok, update_watchdog(State)}.
+ready(#state{flusher_mod = FMod, flusher_state = FState} = State) ->
+    {ok, NFState, Tokens} = FMod:heartbeat(FState),
+    NState = flusher_state(State, NFState),
+    NNState =
+        case Tokens of
+            [] ->
+                NState;
+            _ ->
+                note_success(note_flush(NState), Tokens)
+        end,
+    maybe_checkpoint(update_watchdog(NNState)).
 
 
 process_record(#state{last_flush_time = LastFlush,
diff --git a/src/erlmld_flusher.erl b/src/erlmld_flusher.erl
index 53ad731..aa6fc1c 100644
--- a/src/erlmld_flusher.erl
+++ b/src/erlmld_flusher.erl
@@ -29,6 +29,18 @@
 %%% The batch processor handles checkpointing and decides when to trigger
 %%% flushing.
 %%%
+%%% heartbeat/1 will be called regardless of whether any records could be
+%%% obtained from the stream. It may return the same values as flush/2.
+%%% If it returns a non-empty list of tokens as the third tuple element, it
+%%% is considered to have just performed a partial flush. This allows a
+%%% flusher to flush even if no records were actually available on the
+%%% stream (e.g., after a period of time has elapsed), avoiding potential
+%%% near-deadlock situations which would only be resolved by additional
+%%% stream records appearing (where the batch processor is waiting for
+%%% tokens from the flusher before checkpointing, but the flusher is
+%%% waiting for more records from the batch processor before producing
+%%% tokens via flushing).
+%%%
 %%% @end
 %%% Created : 20 Dec 2016 by Constantin Berzan
 %%%-------------------------------------------------------------------
@@ -47,3 +59,7 @@
 -callback flush(flusher_state(), partial | full) ->
     {ok, flusher_state(), list(flusher_token())}
     | {error, term()}.
+
+-callback heartbeat(flusher_state()) ->
+    {ok, flusher_state(), list(flusher_token())}
+    | {error, term()}.
diff --git a/src/erlmld_worker.erl b/src/erlmld_worker.erl
index 6659c98..6c84068 100644
--- a/src/erlmld_worker.erl
+++ b/src/erlmld_worker.erl
@@ -26,9 +26,11 @@
 %%% at the most recent sequence number.
 %%%
 %%% Before starting to process each batch of records, a worker's ready/1 callback is
-%%% called, which should return a possibly-updated worker state. This can be useful
-%%% when a record processor is using a watchdog timer and is far behind on a stream
-%%% (and so won't receive any actual records for a while).
+%%% called, which should return a possibly-updated worker state and possibly a
+%%% checkpoint. This can be useful when a record processor is using a watchdog timer
+%%% and is far behind on a stream (and so won't receive any actual records for a
+%%% while), or if a stream has very low volume (records seen less frequently than
+%%% desired checkpoint or flush intervals).
 %%%
 %%% When a shard lease has been lost or a shard has been completely processed, a worker
 %%% will be shut down. If the lease was lost, the worker will receive a reason of
@@ -56,6 +58,7 @@
 
 -callback ready(worker_state()) ->
     {ok, worker_state()}
+    | {ok, worker_state(), checkpoint()}
     | {error, term()}.
 
 -callback process_record(worker_state(), stream_record()) ->
diff --git a/src/erlmld_wrk_statem.erl b/src/erlmld_wrk_statem.erl
index d09af79..7a250cd 100644
--- a/src/erlmld_wrk_statem.erl
+++ b/src/erlmld_wrk_statem.erl
@@ -325,20 +325,31 @@ handle_event(?INTERNAL, #{<<"action">> := <<"shutdownRequested">>} = R,
 %% callback, then deaggregate them all at once if they are in KPL format, and then provide
 %% each in turn to the handler module, which will have the opportunity to checkpoint after
 %% each record. the MLD should wait for our "status" response before sending any
-%% additional records or other requests.
+%% additional records or other requests. if the worker returned a checkpoint response,
+%% checkpoint before processing the records.
 handle_event(?INTERNAL, #{<<"action">> := <<"processRecords">>,
                           <<"records">> := Records} = R,
              {?DISPATCH, ?REQUEST},
             #data{handler_module = Mod,
                   worker_state = {ok, WorkerState}} = Data) ->
    case Mod:ready(WorkerState) of
-        {ok, NWorkerState} ->
+        {error, _} = Error ->
+            {stop, Error};
+
+        Ready ->
+            {NWorkerState, Checkpoint} = case Ready of
+                                             {ok, S} -> {S, undefined};
+                                             {ok, S, C} -> {S, C}
+                                         end,
            NData = worker_state(Data#data{is_v2 = maps:is_key(<<"millisBehindLatest">>, R)},
                                 NWorkerState),
-            process_records(R, NData, deaggregate_kpl_records(R, Records));
-
-        {error, _} = Error ->
-            {stop, Error}
+            DeaggregatedRecords = deaggregate_kpl_records(R, Records),
+            case Checkpoint of
+                undefined ->
+                    process_records(R, NData, DeaggregatedRecords);
+                _ ->
+                    checkpoint(R, NData, Checkpoint, DeaggregatedRecords)
+            end
    end;
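
To illustrate the new erlmld_flusher callback, here is a minimal sketch of a flusher
that performs a time-based partial flush from heartbeat/1. It is not part of this
change: the module name, the state record and its fields, and the one-second staleness
threshold are hypothetical, and the remaining erlmld_flusher callbacks are omitted;
only the flush/2 and heartbeat/1 result shapes come from the behaviour definitions
above.

%% Hypothetical flusher sketch; only the callback result shapes are taken
%% from erlmld_flusher.  Other callbacks omitted for brevity.
-module(example_flusher).

-export([heartbeat/1, flush/2]).

-record(state, {%% {Token, Record} pairs not yet flushed:
                buffered = [] :: [{term(), term()}],
                %% monotonic ms timestamp of the oldest buffered record:
                oldest_buffered_time :: integer() | undefined}).

heartbeat(#state{buffered = []} = State) ->
    %% Nothing buffered: no partial flush, no tokens.
    {ok, State, []};
heartbeat(#state{oldest_buffered_time = Oldest} = State) ->
    case erlang:monotonic_time(millisecond) - Oldest > 1000 of
        true ->
            %% Buffered data is getting stale; flush even though no new
            %% records have arrived, returning the flushed records' tokens
            %% so the batch processor can note the flush and checkpoint.
            flush(State, partial);
        false ->
            {ok, State, []}
    end.

flush(#state{buffered = Buffered} = State, _Kind) ->
    %% A real flusher would emit the buffered records here; return the
    %% tokens of the records that were just flushed.
    Tokens = [Token || {Token, _Record} <- Buffered],
    {ok, State#state{buffered = [], oldest_buffered_time = undefined}, Tokens}.

When heartbeat/1 returns a non-empty token list, the updated
erlmld_batch_processor:ready/1 above notes a flush and success for those tokens and
then calls maybe_checkpoint/1, which is what resolves the near-deadlock described in
the erlmld_flusher documentation.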
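
Similarly, a worker can use the new {ok, worker_state(), checkpoint()} return from
ready/1 to request a checkpoint even when no records are flowing. The sketch below is
not part of this change: the record, its pending_checkpoint field, and the idea of
stashing a prepared checkpoint in the worker state are hypothetical, the checkpoint
value itself is treated as opaque, and the other erlmld_worker callbacks are omitted.

%% Hypothetical worker sketch; only the ready/1 result shapes are taken
%% from erlmld_worker.  Other callbacks omitted for brevity.
-module(example_worker).

-export([ready/1]).

-record(wstate, {pending_checkpoint :: term() | undefined}).

ready(#wstate{pending_checkpoint = undefined} = State) ->
    %% Nothing pending: return the (possibly updated) state only.
    {ok, State};
ready(#wstate{pending_checkpoint = Checkpoint} = State) ->
    %% Hand back a previously prepared checkpoint so erlmld_wrk_statem
    %% checkpoints before dispatching the next batch of records (see the
    %% handle_event change above), then clear it.
    {ok, State#wstate{pending_checkpoint = undefined}, Checkpoint}.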