diff --git a/applications/fax/src/fax_jobs.erl b/applications/fax/src/fax_jobs.erl
index 607396f1870..8b20ce034ae 100644
--- a/applications/fax/src/fax_jobs.erl
+++ b/applications/fax/src/fax_jobs.erl
@@ -96,7 +96,7 @@ start_link(AccountId) ->
 -spec init([kz_term:ne_binary()]) -> {'ok', state(), timeout()}.
 init([AccountId]) ->
     _ = kz_util:spawn(fun cleanup_jobs/1, [AccountId]),
-    State = #state{account_id=AccountId
+    State = #state{account_id = AccountId
                   ,limits = #{account => ?DEFAULT_LIMITS(AccountId) }
                   ,jobs = ?INIT_JOBS
                   },
@@ -207,7 +207,8 @@ distribute_jobs(#state{jobs=#{distribute := []
                              ,pending := #{}
                              ,running := #{}
                              }
-                       }=State) -> State;
+                       }=State) ->
+    State;
 distribute_jobs(#state{jobs=#{distribute := []
                              ,serialize := []
                              }
@@ -220,7 +221,8 @@ distribute_jobs(#state{jobs=#{distribute := []
     State#state{jobs=Jobs#{distribute => Serialize
                           ,serialize => []
                           }
-               ,stale=0};
+               ,stale=0
+               };
 distribute_jobs(#state{account_id=AccountId
                       ,limits= #{account := MaxAccount}
                       ,jobs=#{pending := Pending
diff --git a/applications/fax/src/fax_worker.erl b/applications/fax/src/fax_worker.erl
index 9c0bee2073d..6248dd58cd8 100644
--- a/applications/fax/src/fax_worker.erl
+++ b/applications/fax/src/fax_worker.erl
@@ -84,7 +84,6 @@
 
 -define(DEFAULT_RETRY_COUNT, kapps_config:get_integer(?CONFIG_CAT, <<"default_retry_count">>, 3)).
 -define(DEFAULT_COMPARE_FIELD, kapps_config:get_binary(?CONFIG_CAT, <<"default_compare_field">>, <<"result_cause">>)).
-
 -define(CALLFLOW_LIST, <<"callflows/listing_by_number">>).
 -define(ENSURE_CID_KEY, <<"ensure_valid_caller_id">>).
 -define(DEFAULT_ENSURE_CID, kapps_config:get_is_true(?CONFIG_CAT, ?ENSURE_CID_KEY, 'true')).
@@ -120,7 +119,8 @@ start_link(FaxJob) ->
                             ,{'queue_options', ?QUEUE_OPTIONS}
                             ,{'consume_options', ?CONSUME_OPTIONS}
                             ]
-                           ,[FaxJob, CallId]).
+                           ,[FaxJob, CallId]
+                           ).
 
 -spec server_name(kz_json:object()) -> {'via', 'kz_globals', kz_term:ne_binary()}.
 server_name(FaxJob) ->
@@ -144,13 +144,13 @@ handle_fax_event(JObj, Props) ->
     Srv = props:get_value('server', Props),
     JobId = kz_call_event:authorizing_id(JObj),
     Event = kz_call_event:application_event(JObj),
-    gen_server:cast(Srv, {'fax_status', Event , JobId, JObj}).
+    gen_server:cast(Srv, {'fax_status', Event, JobId, JObj}).
 
 -spec handle_job_status_query(kz_json:object(), kz_term:proplist()) -> any().
 handle_job_status_query(JObj, Props) ->
     'true' = kapi_fax:query_status_v(JObj),
     Srv = props:get_value('server', Props),
-    JobId = kz_json:get_value(<<"Job-ID">>, JObj),
+    JobId = kapi_fax:job_id(JObj),
     Queue = kz_api:server_id(JObj),
     MsgId = kz_api:msg_id(JObj),
     gen_server:cast(Srv, {'query_status', JobId, Queue, MsgId, JObj}).
@@ -163,7 +163,7 @@ handle_job_status_query(JObj, Props) ->
 %%------------------------------------------------------------------------------
 %% @doc Initializes the server.
 %% @end
 %%------------------------------------------------------------------------------
--spec init([kz_json:object() | kz_term:ne_binary()]) -> {'ok', state()}.
+-spec init([fax_job() | kz_term:ne_binary()]) -> {'ok', state()}.
 init([FaxJob, CallId]) ->
     CtrlQ = kapi_fax:control_queue(FaxJob),
     JobId = kapi_fax:job_id(FaxJob),
@@ -197,9 +197,11 @@ handle_cast({'tx_resp', JobId, JObj}, #state{job_id=JobId
         <<"SUCCESS">> ->
             lager:debug("received successful attempt to originate fax, continue processing"),
             send_status(State, <<"received successful attempt to originate fax">>),
-            {'noreply', State#state{stage = ?FAX_NEGOTIATE
-                                   ,status = <<"negotiating">>
-                                   }, ?NEGOTIATE_TIMEOUT
+            {'noreply'
+            ,State#state{stage = ?FAX_NEGOTIATE
+                        ,status = <<"negotiating">>
+                        }
+            ,?NEGOTIATE_TIMEOUT
             };
         _Else ->
             lager:debug("received failed attempt to tx fax, releasing job: ~s", [_Else]),
@@ -209,7 +211,7 @@ handle_cast({'tx_resp', JobId, JObj}, #state{job_id=JobId
             {'noreply', State#state{job=Doc, resp = Resp}}
     end;
 handle_cast({'tx_resp', JobId2, _}, #state{job_id=JobId}=State) ->
-    lager:debug("received txresp for ~s but this JobId is ~s",[JobId2, JobId]),
+    lager:debug("received txresp for ~s but this JobId is ~s", [JobId2, JobId]),
     {'noreply', State};
 handle_cast({'fax_status', <<"negociateresult">>, JobId, JObj}, State) ->
     Data = kz_call_event:application_data(JObj),
@@ -384,6 +386,7 @@ handle_cast(_Msg, State) ->
 handle_info('timeout', #state{stage='undefined'}=State) ->
     {'noreply', State};
 handle_info('timeout', #state{stage=Stage, job=JObj}=State) ->
+    lager:debug("timeout waiting in stage ~s", [Stage]),
     {Resp, Doc} = release_failed_job('job_timeout', Stage, JObj),
     gen_server:cast(self(), 'stop'),
     {'noreply', State#state{job=Doc, resp = Resp}};
@@ -554,7 +557,7 @@ release_successful_job(Resp, JObj) ->
                     'false' -> 'undefined'
                 end
                }
-              | fax_util:fax_properties(kz_json:get_value(<<"Application-Data">>, Resp, Resp))
+              | fax_util:fax_properties(kz_json:get_json_value(<<"Application-Data">>, Resp, Resp))
              ]),
     release_job(Result, JObj, Resp).
 
@@ -565,34 +568,42 @@ release_job(Result, JObj) ->
 -spec release_job(kz_term:proplist(), kz_json:object(), kz_json:object()) -> release_ret().
 release_job(Result, JObj, Resp) ->
     Success = props:is_true(<<"success">>, Result, 'false'),
-    Updaters = [fun(J) ->
-                        Attempts = kz_json:get_integer_value(<<"attempts">>, J, 0),
-                        kz_json:set_value(<<"attempts">>, Attempts + 1, J)
-                end
+    Updaters = [fun increment_attempts/1
                ,fun(J) -> kz_json:set_value(<<"tx_result">>, kz_json:from_list(Result), J) end
-               ,fun(J) -> kz_json:delete_key(<<"pvt_queue">>, J) end
+               ,fun remove_pvt_queue/1
                ,fun apply_reschedule_logic/1
-               ,fun(J) ->
-                        Attempts = kz_json:get_integer_value(<<"attempts">>, J, 0),
-                        Retries = kz_json:get_integer_value(<<"retries">>, J, 1),
-                        lager:debug("releasing job with retries: ~b attempts: ~b", [Retries, Attempts]),
-                        case Retries - Attempts >= 1 of
-                            _ when Success ->
-                                lager:debug("releasing job with status: completed"),
-                                kz_json:set_value(<<"pvt_job_status">>, <<"completed">>, J);
-                            'true' ->
-                                lager:debug("releasing job with status: pending"),
-                                kz_json:set_value(<<"pvt_job_status">>, <<"pending">>, J);
-                            'false' ->
-                                lager:debug("releasing job with status: failed"),
-                                kz_json:set_value(<<"pvt_job_status">>, <<"failed">>, J)
-                        end
-                end
+               ,fun(J) -> update_job_status(J, Success) end
                ],
     Update = lists:foldl(fun(F, J) -> F(J) end, JObj, Updaters),
     {'ok', Saved} = kz_datamgr:ensure_saved(?KZ_FAXES_DB, Update),
     {Resp, Saved}.
 
+-spec update_job_status(kz_json:object(), boolean()) -> kz_json:object().
+update_job_status(JObj, 'true') ->
+    lager:debug("releasing job with status: completed"),
+    kz_json:set_value(<<"pvt_job_status">>, <<"completed">>, JObj);
+update_job_status(JObj, 'false') ->
+    Attempts = kz_json:get_integer_value(<<"attempts">>, JObj, 0),
+    Retries = kz_json:get_integer_value(<<"retries">>, JObj, 1),
+    lager:debug("unsuccessful job has attempted ~b of ~b retries", [Attempts, Retries]),
+    case Retries - Attempts >= 1 of
+        'true' ->
+            lager:debug("releasing job with status: pending"),
+            kz_json:set_value(<<"pvt_job_status">>, <<"pending">>, JObj);
+        'false' ->
+            lager:debug("releasing job with status: failed"),
+            kz_json:set_value(<<"pvt_job_status">>, <<"failed">>, JObj)
+    end.
+
+-spec increment_attempts(kz_json:object()) -> kz_json:object().
+increment_attempts(JObj) ->
+    Attempts = kz_json:get_integer_value(<<"attempts">>, JObj, 0),
+    kz_json:set_value(<<"attempts">>, Attempts + 1, JObj).
+
+-spec remove_pvt_queue(kz_json:object()) -> kz_json:object().
+remove_pvt_queue(JObj) ->
+    kz_json:delete_key(<<"pvt_queue">>, JObj).
+
 -spec apply_reschedule_logic(kz_json:object()) -> kz_json:object().
 apply_reschedule_logic(JObj) ->
     Map = kapps_config:get_json(?CONFIG_CAT, <<"reschedule">>, kz_json:new()),
@@ -602,7 +613,8 @@ apply_reschedule_logic(JObj) ->
             JObj2;
         {'ok', JObj2} ->
             lager:debug("rule '~s' applied in fax reschedule logic"
-                       ,[kz_json:get_value(<<"reschedule_rule">>, JObj2)]),
+                       ,[kz_json:get_value(<<"reschedule_rule">>, JObj2)]
+                       ),
             JObj2
     end.
 
@@ -662,11 +674,11 @@ maybe_notify(JObj, Resp, <<"failed">>) ->
                                 ]),
     kapps_notify_publisher:cast(Message, fun kapi_notifications:publish_fax_outbound_error/1);
 maybe_notify(_JObj, _Resp, Status) ->
-    lager:debug("notify Status ~p not handled",[Status]).
+    lager:debug("notify status ~p not handled", [Status]).
 
 -spec maybe_move_doc(kz_json:object(), kz_term:ne_binary()) ->
           {'ok', kz_json:object()} |
-          {'error', any()}.
+          kz_datamgr:data_error().
 maybe_move_doc(JObj, <<"completed">>) ->
     move_doc(JObj);
 maybe_move_doc(JObj, <<"failed">>) ->
@@ -676,7 +688,7 @@ maybe_move_doc(JObj, _) ->
 
 -spec move_doc(kz_json:object()) ->
           {'ok', kz_json:object()} |
-          {'error', any()}.
+          kz_datamgr:data_error().
 move_doc(JObj) ->
     FromId = kz_doc:id(JObj),
     {Year, Month, _D} = kz_term:to_date(kz_doc:created(JObj)),
@@ -686,22 +698,64 @@ move_doc(JObj) ->
     ToDB = kz_util:format_account_modb(AccountMODb, 'encoded'),
     ToId = ?MATCH_MODB_PREFIX(kz_term:to_binary(Year), kz_date:pad_month(Month), FromId),
     Options = ['override_existing_document'
-              ,{'transform', fun(_, B) -> kz_json:set_value(<<"folder">>, <<"outbox">>, B) end}
+              ,{'transform', fun move_to_outbox/2}
               ],
     lager:debug("moving fax outbound document ~s from faxes to ~s with id ~s", [FromId, AccountMODb, ToId]),
-    kazoo_modb:move_doc(FromDB, {<<"fax">>, FromId}, ToDB, ToId, Options).
+    case kazoo_modb:move_doc(FromDB, {<<"fax">>, FromId}, ToDB, ToId, Options) of
+        {'ok', _}=OK -> OK;
+        {'error', 'conflict'} ->
+            handle_move_conflict(JObj, FromDB, FromId, ToDB, ToId);
+        {'error', _}=Error -> Error
+    end.
+
+-spec handle_move_conflict(kz_json:object(), kz_term:ne_binary(), kz_term:ne_binary(), kz_term:ne_binary(), kz_term:ne_binary()) ->
+          {'ok', kz_json:object()} |
+          kz_datamgr:data_error().
+handle_move_conflict(SourceJObj, FromDB, FromId, ToDB, ToId) ->
+    lager:info("moving ~s to ~s/~s conflicted", [FromId, ToDB, ToId]),
+    case kz_datamgr:open_cache_doc(ToDB, ToId) of
+        {'ok', MovedDoc} ->
+            handle_if_moved_successfully(SourceJObj, FromDB, FromId, ToDB, ToId, MovedDoc);
+        {'error', _E}=Error ->
+            lager:debug("failed to open moved doc: ~p", [_E]),
+            Error
+    end.
+
+-spec handle_if_moved_successfully(kz_json:object(), kz_term:ne_binary(), kz_term:ne_binary(), kz_term:ne_binary(), kz_term:ne_binary(), kz_json:object()) ->
+          {'ok', kz_json:object()} |
+          {'error', 'conflict'}.
+handle_if_moved_successfully(SourceJObj, FromDB, FromId, ToDB, ToId, MovedDoc) ->
+    case kz_doc:are_equal(SourceJObj, MovedDoc) of
+        'true' ->
+            _Deleted = kz_datamgr:del_doc(FromDB, FromId),
+            lager:debug("deleted from doc: ~p", [_Deleted]),
+            {'ok', MovedDoc};
+        'false' ->
+            lager:info("docs don't match enough, removing destination and retrying"),
+            _ = kz_datamgr:del_doc(ToDB, ToId),
+            {'error', 'conflict'}
+    end.
+
+-spec move_to_outbox(kz_json:object(), kz_json:object()) -> kz_json:object().
+move_to_outbox(_SourceJObj, DestJObj) ->
+    kz_json:set_value(<<"folder">>, <<"outbox">>, DestJObj).
 
 -spec fax_error(kz_json:object()) -> kz_term:api_binary().
 fax_error(JObj) ->
-    kz_json:get_first_defined([ [<<"Application-Data">>, <<"Fax-Result-Text">>]
-                              , [<<"tx_result">>, <<"result_text">>]
-                              ], JObj).
+    kz_json:get_first_defined([[<<"Application-Data">>, <<"Fax-Result-Text">>]
+                              ,[<<"tx_result">>, <<"result_text">>]
+                              ]
+                              ,JObj
+                              ).
 
 -spec notify_emails(kz_json:object()) -> kz_term:ne_binaries().
 notify_emails(JObj) ->
     Emails = kz_json:get_first_defined([?NOTIFICATION_OUTBOUND_EMAIL
                                        ,?NOTIFICATION_EMAIL
-                                       ], JObj, []),
+                                       ]
+                                      ,JObj
+                                      ,[]
+                                      ),
     fax_util:notify_email_list(Emails).
 
 -spec notify_fields(kz_json:object(), kz_json:object()) -> kz_term:proplist().
@@ -819,7 +873,7 @@ send_fax(JobId, JObj, Q, ToDID) ->
             ,{<<"Application-Data">>, get_proxy_url(JobId)}
             ,{<<"Origination-Call-ID">>, CallId}
             ,{<<"Bypass-E164">>, kz_json:is_true(<<"bypass_e164">>, JObj)}
-            ,{<<"Fax-T38-Enabled">>, false}
+            ,{<<"Fax-T38-Enabled">>, 'false'}
             | kz_api:default_headers(Q, ?APP_NAME, ?APP_VERSION)
             ]),
     lager:debug("sending fax originate request ~s with call-id ~s", [JobId, CallId]),
@@ -848,7 +902,7 @@ maybe_hunt_account_id(JObj, AccountId) ->
 resource_ccvs(JobId) ->
     kz_json:from_list([{<<"Authorizing-ID">>, JobId}
                       ,{<<"Authorizing-Type">>, <<"outbound_fax">>}
-                      ,{<<"RTCP-MUX">>, false}
+                      ,{<<"RTCP-MUX">>, 'false'}
                       ]).
 
 -spec get_did(kz_json:object()) -> kz_term:api_binary().
diff --git a/core/kazoo/src/kz_doc.erl b/core/kazoo/src/kz_doc.erl
index 3f679cfc56b..61a1a65b098 100644
--- a/core/kazoo/src/kz_doc.erl
+++ b/core/kazoo/src/kz_doc.erl
@@ -107,6 +107,8 @@
         ,setters/2
         ]).
 
+-export([are_equal/2]).
+
 -ifdef(TEST).
 -export([remove_pvt/1]).
 -endif.
@@ -853,3 +855,30 @@ setters(JObj, Routines) ->
                ,JObj
                ,Routines
                ).
+
+%%------------------------------------------------------------------------------
+%% @doc Compare documents for equality
+%%
+%% Two documents are considered equal if their public fields match and
+%% their attachments metadata match
+%% @end
+%%------------------------------------------------------------------------------
+-spec are_equal(doc(), doc()) -> boolean().
+are_equal(FirstDoc, SecondDoc) ->
+    lists:all(fun(F) -> F(FirstDoc, SecondDoc) end
+             ,[fun are_equal_public/2
+              ,fun are_equal_attachments/2
+              ]
+             ).
+
+-spec are_equal_public(doc(), doc()) -> boolean().
+are_equal_public(FirstDoc, SecondDoc) ->
+    FirstPublic = public_fields(FirstDoc),
+    SecondPublic = public_fields(SecondDoc),
+    kz_json:are_equal(FirstPublic, SecondPublic).
+
+-spec are_equal_attachments(doc(), doc()) -> boolean().
+are_equal_attachments(FirstDoc, SecondDoc) ->
+    FirstAtts = attachments(FirstDoc),
+    SecondAtts = attachments(SecondDoc),
+    kz_json:are_equal(FirstAtts, SecondAtts).
diff --git a/core/kazoo_data/src/kzs_doc.erl b/core/kazoo_data/src/kzs_doc.erl
index 71922ef9445..f71079e86da 100644
--- a/core/kazoo_data/src/kzs_doc.erl
+++ b/core/kazoo_data/src/kzs_doc.erl
@@ -246,8 +246,9 @@ copy_doc(Src, Dst, CopySpec, CopyFun, Opts) ->
         {'ok', SourceDoc} ->
             Props = [{<<"_id">>, DestDocId}
                     ,{<<"pvt_account_db">>, DestDbName}
+                    | [{Key, 'null'} || Key <- ?DELETE_KEYS]
                     ],
-            DestinationDoc = kz_json:set_values(Props, kz_json:delete_keys(?DELETE_KEYS, SourceDoc)),
+            DestinationDoc = kz_json:set_values(Props, SourceDoc),
             Doc = copy_transform(Transform, SourceDoc, DestinationDoc),
             case CopyFun(Dst, DestDbName, Doc, Options) of
                 {'ok', JObj} ->
diff --git a/core/kazoo_proper/src/pqc_kz_datamgr.erl b/core/kazoo_proper/src/pqc_kz_datamgr.erl
new file mode 100644
index 00000000000..9fc4d8bf537
--- /dev/null
+++ b/core/kazoo_proper/src/pqc_kz_datamgr.erl
@@ -0,0 +1,57 @@
+%%%-----------------------------------------------------------------------------
+%%% @copyright (C) 2011-2020, 2600Hz
+%%% @doc data adapter behaviour
+%%% @end
+%%%-----------------------------------------------------------------------------
+-module(pqc_kz_datamgr).
+
+-export([seq/0, seq_kzoo_56/0
+        ,cleanup/0
+        ]).
+
+-define(FROM_DB, <<"account%2F38%2F12%2F6201976b8666ab8c005daa7-from">>).
+-define(FROM_DOC_ID, <>).
+-define(TO_DB, <<"account%2F9f%2Fd0%2Fac025637b340164120d9e6c43-to">>).
+-define(TO_DOC_ID, <>).
+
+-spec seq() -> 'ok'.
+seq() ->
+    Fs = [fun seq_kzoo_56/0],
+    lists:foreach(fun run_it/1, Fs).
+
+-spec run_it(fun(() -> 'ok')) -> 'ok'.
+run_it(F) -> F().
+
+-spec seq_kzoo_56() -> 'ok'.
+seq_kzoo_56() ->
+    Doc = kz_json:from_list([{<<"_id">>, ?FROM_DOC_ID}
+                             | [{kz_binary:rand_hex(4), kz_binary:rand_hex(5)} || _ <- lists:seq(1,10)]
+                            ]),
+
+    'true' = kz_datamgr:db_create(?FROM_DB),
+    'true' = kz_datamgr:db_create(?TO_DB),
+
+    {'ok', _SavedDoc} = kz_datamgr:save_doc(?FROM_DB, Doc),
+
+    {'ok', MP3} = file:read_file(filename:join([code:priv_dir('kazoo_proper'), "mp3.mp3"])),
+
+    _ = [save_mp3(?FROM_DB, ?FROM_DOC_ID, AttId, MP3) || AttId <- lists:seq(1, 3)],
+
+    {'ok', SourceDoc} = kz_datamgr:open_doc(?FROM_DB, ?FROM_DOC_ID),
+
+    {'ok', MovedDoc} = kz_datamgr:move_doc(?FROM_DB, ?FROM_DOC_ID, ?TO_DB, ?TO_DOC_ID, []),
+
+    'true' = kz_doc:are_equal(SourceDoc, MovedDoc),
+    lager:info("docs are the same"),
+
+    cleanup().
+
+-spec cleanup() -> 'ok'.
+cleanup() ->
+    'true' = kz_datamgr:db_delete(?FROM_DB),
+    'true' = kz_datamgr:db_delete(?TO_DB),
+    lager:info("CLEANUP FINISHED").
+
+save_mp3(DB, DocId, AttId, MP3) ->
+    AttachmentName = <<"att-", (AttId+$0)>>,
+    {'ok', _SavedWithAtt} = kz_datamgr:put_attachment(DB, DocId, AttachmentName, MP3).