From 846c2297275cebc8c7e3c21ff8d931f6efaa02a0 Mon Sep 17 00:00:00 2001 From: Juan Facorro Date: Tue, 1 Dec 2015 11:14:04 -0300 Subject: [PATCH] [#124] Handle SSE for 'Content-Type: text/event-stream' per RFC --- Makefile | 3 ++- src/shotgun.erl | 70 +++++++++++++++++++++++++++++++++---------------- 2 files changed, 49 insertions(+), 24 deletions(-) diff --git a/Makefile b/Makefile index 54c5b6b..8441234 100644 --- a/Makefile +++ b/Makefile @@ -3,8 +3,9 @@ PROJECT = shotgun DEPS = gun dep_gun = git https://github.com/ninenines/gun.git 427230d -SHELL_DEPS = sync +SHELL_DEPS = sync recon dep_sync = git git://github.com/inaka/sync.git 0.1.3 +dep_recon = git https://github.com/ferd/recon 2.2.1 include erlang.mk diff --git a/src/shotgun.erl b/src/shotgun.erl index a88973b..69550ff 100644 --- a/src/shotgun.erl +++ b/src/shotgun.erl @@ -489,10 +489,11 @@ at_rest({HttpVerb, {_, _, Body} = Args, From}, State = #{pid := Pid}) -> -spec wait_response(term(), term()) -> term(). wait_response({'DOWN', _, _, _, Reason}, _State) -> exit(Reason); -wait_response({gun_response, _Pid, _StreamRef, fin, StatusCode, Headers}, - #{from := From, - async := Async, - responses := Responses} = State) -> +wait_response({gun_response, _Pid, StreamRef, fin, StatusCode, Headers}, + #{ from := From + , stream := StreamRef + , async := Async + , responses := Responses} = State) -> Response = #{status_code => StatusCode, headers => Headers}, NewResponses = case Async of @@ -504,23 +505,24 @@ wait_response({gun_response, _Pid, _StreamRef, fin, StatusCode, Headers}, queue:in(Response, Responses) end, {next_state, at_rest, State#{responses => NewResponses}, 0}; -wait_response({gun_response, _Pid, _StreamRef, nofin, StatusCode, Headers}, - #{from := From, stream := StreamRef, async := Async} = State) -> - StateName = - case lists:keyfind(<<"transfer-encoding">>, 1, Headers) of - {<<"transfer-encoding">>, <<"chunked">>} when Async == true-> - Result = {ok, StreamRef}, - gen_fsm:reply(From, Result), - receive_chunk; - _ -> - receive_data - end, +wait_response({gun_response, _Pid, StreamRef, nofin, StatusCode, Headers}, + #{ from := From + , stream := StreamRef + , async := Async} = State) -> + StateName = case is_chunked(Headers) orelse is_event_stream(Headers) of + true when Async -> + gen_fsm:reply(From, {ok, StreamRef}), + receive_chunk; + false -> + receive_data + end, { next_state , StateName , State#{status_code := StatusCode, headers := Headers} }; -wait_response({gun_error, _Pid, _StreamRef, Error}, - #{from := From} = State) -> +wait_response({gun_error, _Pid, StreamRef, Error}, + #{ from := From + , stream := StreamRef} = State) -> gen_fsm:reply(From, {error, Error}), {next_state, at_rest, State, 0}; wait_response(body_chunked, @@ -536,12 +538,16 @@ wait_response(Event, State) -> receive_data({'DOWN', _, _, _, _Reason}, _State) -> error(incomplete); receive_data({gun_data, _Pid, StreamRef, nofin, Data}, - #{stream := StreamRef, data := DataAcc} = State) -> + #{ stream := StreamRef + , data := DataAcc} = State) -> NewData = <>, {next_state, receive_data, State#{data => NewData}}; -receive_data({gun_data, _Pid, _StreamRef, fin, Data}, - #{data := DataAcc, from := From, status_code - := StatusCode, headers := Headers} = State) -> +receive_data({gun_data, _Pid, StreamRef, fin, Data}, + #{ data := DataAcc + , from := From + , stream := StreamRef + , status_code := StatusCode + , headers := Headers} = State) -> NewData = <>, Result = {ok, #{status_code => StatusCode, headers => Headers, @@ -558,7 +564,8 @@ receive_data({gun_error, _Pid, StreamRef, _Reason}, -spec receive_chunk(term(), term()) -> term(). receive_chunk({'DOWN', _, _, _, _Reason}, _State) -> error(incomplete); -receive_chunk({gun_data, _Pid, StreamRef, IsFin, Data}, State) -> +receive_chunk({gun_data, _Pid, StreamRef, IsFin, Data}, + #{stream := StreamRef} = State) -> NewState = manage_chunk(IsFin, StreamRef, Data, State), case IsFin of fin -> @@ -566,7 +573,8 @@ receive_chunk({gun_data, _Pid, StreamRef, IsFin, Data}, State) -> nofin -> {next_state, receive_chunk, NewState} end; -receive_chunk({gun_error, _Pid, _StreamRef, _Reason}, State) -> +receive_chunk({gun_error, _Pid, StreamRef, _Reason}, + #{stream := StreamRef} = State) -> {next_state, at_rest, State, 0}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -677,6 +685,22 @@ encode_basic_auth([], []) -> encode_basic_auth(Username, Password) -> base64:encode(Username ++ [$: | Password]). +%% @private +-spec is_chunked([proplists:property()]) -> boolean(). +is_chunked(Headers) -> + <<"chunked">> =:= keyfind(<<"transfer-encoding">>, Headers). + +-spec is_event_stream([proplists:property()]) -> boolean(). +is_event_stream(Headers) -> + <<"text/event-stream">> =:= keyfind(<<"content-type">>, Headers). + +%% @private +keyfind(Key, List) -> + case lists:keyfind(Key, 1, List) of + {_, Value} -> Value; + _ -> undefined + end. + %% @private sse_events(IsFin, Data, State = #{buffer := Buffer}) -> NewBuffer = <>,