diff --git a/src/shotgun.erl b/src/shotgun.erl index 459ad7f..c80e238 100644 --- a/src/shotgun.erl +++ b/src/shotgun.erl @@ -101,6 +101,8 @@ data => binary() }. +-type state() :: #{}. + -export_type([event/0]). %% @doc Starts the application and all the ones it depends on. @@ -237,6 +239,7 @@ put(Pid, Uri, Headers0, Body, Options) -> -spec request(pid(), http_verb(), iodata(), headers(), iodata(), options()) -> result(). request(Pid, get, Uri, Headers0, Body, Options) -> + Ref = make_ref(), try check_uri(Uri), #{handle_event := HandleEvent, @@ -249,22 +252,29 @@ request(Pid, get, Uri, Headers0, Body, Options) -> true -> {get_async, {HandleEvent, AsyncMode}, - {Uri, Headers, Body}}; + {Ref, Uri, Headers, Body}}; false -> - {get, {Uri, Headers, Body}} + {get, {Ref, Uri, Headers, Body}} end, gen_fsm:sync_send_event(Pid, Event, Timeout) catch + exit:{timeout, _} -> + gen_fsm:send_all_state_event(Pid, {cancel_req, Ref}), + {error, timeout}; _:Reason -> {error, Reason} end; request(Pid, Method, Uri, Headers0, Body, Options) -> + Ref = make_ref(), try check_uri(Uri), #{headers := Headers, timeout := Timeout} = process_options(Options, Headers0, Method), - Event = {Method, {Uri, Headers, Body}}, + Event = {Method, {Ref, Uri, Headers, Body}}, gen_fsm:sync_send_event(Pid, Event, Timeout) catch + exit:{timeout, _} -> + gen_fsm:send_all_state_event(Pid, {cancel_req, Ref}), + {error, timeout}; _:Reason -> {error, Reason} end. @@ -299,8 +309,6 @@ parse_event(EventBin) -> %% gen_fsm callbacks %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --type state() :: #{}. - %% @private -spec init([term()]) -> {ok, at_rest, state()} | {stop, gun_open_timeout} | {stop, gun_open_failed}. @@ -332,7 +340,26 @@ init([Host, Port, Type, Opts]) -> end. %% @private --spec handle_event(shutdown, atom(), state()) -> {stop, normal, state()}. +-spec handle_event(shutdown | {cancel_req, reference()}, + StateName::atom(), state()) -> + {stop, normal, state()} | {next_state, at_rest, state()} | + {next_state, StateName::atom(), state()}. +%Cancel our current unit of work... +handle_event({cancel_req, ReqRef}, _, + #{req_ref := ReqRef, stream := StreamRef, pid := Pid} = State) -> + %This isn't guaranteed to cancel the pending request, but it will + %prevent us from getting messages about it in the future. + ok = gun:cancel(Pid, StreamRef), + {next_state, at_rest, State, 0}; +%Or unqueue a queued unit of work... +handle_event({cancel_req, ReqRef}, StateName, StateData) -> + NewStateData = remove_request(ReqRef, StateData), + case StateName of + at_rest -> + {next_state, at_rest, NewStateData, 0}; + _ -> + {next_state, StateName, NewStateData} + end; handle_event(shutdown, _StateName, StateData) -> {stop, normal, StateData}. @@ -393,12 +420,14 @@ at_rest(timeout, State) -> ok = gen_fsm:send_event(self(), Work), {next_state, at_rest, NewState} end; -at_rest({get_async, {HandleEvent, AsyncMode}, Args, From}, +at_rest({get_async, {HandleEvent, AsyncMode}, + {ReqRef, _, _, _} = Args, From}, State = #{pid := Pid}) -> StreamRef = do_http_verb(get, Pid, Args), CleanState = clean_state(State), NewState = CleanState#{ from => From, + req_ref => ReqRef, pid => Pid, stream => StreamRef, handle_event => HandleEvent, @@ -406,13 +435,15 @@ at_rest({get_async, {HandleEvent, AsyncMode}, Args, From}, async_mode => AsyncMode }, {next_state, wait_response, NewState}; -at_rest({HttpVerb, Args, From}, State = #{pid := Pid}) -> +at_rest({HttpVerb, {ReqRef, _, _, _} = Args, From}, + State = #{pid := Pid}) -> StreamRef = do_http_verb(HttpVerb, Pid, Args), CleanState = clean_state(State), NewState = CleanState#{ pid => Pid, stream => StreamRef, - from => From + from => From, + req_ref => ReqRef }, {next_state, wait_response, NewState}. @@ -520,10 +551,12 @@ receive_chunk({gun_error, _Pid, _StreamRef, _Reason}, State) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% @private -clean_state() -> clean_state(queue:new()). +clean_state() -> + clean_state(queue:new()). %% @private --spec clean_state(map()) -> map(); (queue:queue()) -> map(). +-spec clean_state(map()) -> map(); + (queue:queue()) -> map(). clean_state(State) when is_map(State) -> clean_state(get_pending_reqs(State)); clean_state(Reqs) -> @@ -539,12 +572,13 @@ clean_state(Reqs) -> async => false, async_mode => binary, buffer => <<"">>, + req_ref => undefined, pending_requests => Reqs }. %% @private -spec do_http_verb(http_verb(), pid(), tuple()) -> reference(). -do_http_verb(Method, Pid, {Uri, Headers, Body}) -> +do_http_verb(Method, Pid, {_, Uri, Headers, Body}) -> MethodStr = string:to_upper(atom_to_list(Method)), MethodBin = list_to_binary(MethodStr), gun:request(Pid, MethodBin, Uri, Headers, Body). @@ -642,17 +676,14 @@ check_uri(U) -> end. %% @private -enqueue_work_or_stop(FSM = at_rest, Event, From, State) -> - enqueue_work_or_stop(FSM, Event, From, State, 0); -enqueue_work_or_stop(FSM, Event, From, State) -> - enqueue_work_or_stop(FSM, Event, From, State, infinity). - -%% @private -enqueue_work_or_stop(FSM, Event, From, State, Timeout) -> +enqueue_work_or_stop(StateName, Event, From, State) -> case create_work(Event, From) of {ok, Work} -> NewState = append_work(Work, State), - {next_state, FSM, NewState, Timeout}; + case StateName of + at_rest -> {next_state, at_rest, NewState, 0}; + _ -> {next_state, StateName, NewState} + end; not_work -> {stop, {unexpected, Event}, State} end. @@ -690,3 +721,17 @@ append_work(Work, State) -> %% @private get_pending_reqs(State) -> maps:get(pending_requests, State). + +%% @private +-spec remove_request(reference(), state()) -> state(). +remove_request(ReqRef, #{pending_requests := PendingReqs} = StateData) -> + NewPendingReqs = + queue:filter(fun(E) -> + case E of + %Handle get_async + {_, _, {Ref, _, _, _}, _} -> Ref /= ReqRef; + %Handle all other work. + {_, {Ref, _, _, _}, _} -> Ref /= ReqRef + end + end, PendingReqs), + StateData#{pending_requests := NewPendingReqs}.