Skip to content

Commit

Permalink
[4.3] KZOO-56: handle conflict when moving faxes to MODB (#6386)
Browse files Browse the repository at this point in the history
* [4.3] KZOO-56: handle conflict when moving faxes to MODB

The faxes DB fills up when the `move_doc` fails with a conflict when
inserting the doc into the MODB. Handle the error by checking that the
document exists in the MODB and forcing deletion of the doc in the
faxes database.
  • Loading branch information
jamesaimonetti authored Mar 23, 2020
1 parent c8f7b00 commit bd168f6
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 47 deletions.
8 changes: 5 additions & 3 deletions applications/fax/src/fax_jobs.erl
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ start_link(AccountId) ->
-spec init([kz_term:ne_binary()]) -> {'ok', state(), timeout()}.
init([AccountId]) ->
_ = kz_util:spawn(fun cleanup_jobs/1, [AccountId]),
State = #state{account_id=AccountId
State = #state{account_id = AccountId
,limits = #{account => ?DEFAULT_LIMITS(AccountId) }
,jobs = ?INIT_JOBS
},
Expand Down Expand Up @@ -207,7 +207,8 @@ distribute_jobs(#state{jobs=#{distribute := []
,pending := #{}
,running := #{}
}
}=State) -> State;
}=State) ->
State;
distribute_jobs(#state{jobs=#{distribute := []
,serialize := []
}
Expand All @@ -220,7 +221,8 @@ distribute_jobs(#state{jobs=#{distribute := []
State#state{jobs=Jobs#{distribute => Serialize
,serialize => []
}
,stale=0};
,stale=0
};
distribute_jobs(#state{account_id=AccountId
,limits= #{account := MaxAccount}
,jobs=#{pending := Pending
Expand Down
140 changes: 97 additions & 43 deletions applications/fax/src/fax_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@
-define(DEFAULT_RETRY_COUNT, kapps_config:get_integer(?CONFIG_CAT, <<"default_retry_count">>, 3)).
-define(DEFAULT_COMPARE_FIELD, kapps_config:get_binary(?CONFIG_CAT, <<"default_compare_field">>, <<"result_cause">>)).


-define(CALLFLOW_LIST, <<"callflows/listing_by_number">>).
-define(ENSURE_CID_KEY, <<"ensure_valid_caller_id">>).
-define(DEFAULT_ENSURE_CID, kapps_config:get_is_true(?CONFIG_CAT, ?ENSURE_CID_KEY, 'true')).
Expand Down Expand Up @@ -120,7 +119,8 @@ start_link(FaxJob) ->
,{'queue_options', ?QUEUE_OPTIONS}
,{'consume_options', ?CONSUME_OPTIONS}
]
,[FaxJob, CallId]).
,[FaxJob, CallId]
).

-spec server_name(kz_json:object()) -> {'via', 'kz_globals', kz_term:ne_binary()}.
server_name(FaxJob) ->
Expand All @@ -144,13 +144,13 @@ handle_fax_event(JObj, Props) ->
Srv = props:get_value('server', Props),
JobId = kz_call_event:authorizing_id(JObj),
Event = kz_call_event:application_event(JObj),
gen_server:cast(Srv, {'fax_status', Event , JobId, JObj}).
gen_server:cast(Srv, {'fax_status', Event, JobId, JObj}).

-spec handle_job_status_query(kz_json:object(), kz_term:proplist()) -> any().
handle_job_status_query(JObj, Props) ->
'true' = kapi_fax:query_status_v(JObj),
Srv = props:get_value('server', Props),
JobId = kz_json:get_value(<<"Job-ID">>, JObj),
JobId = kapi_fax:job_id(JObj),
Queue = kz_api:server_id(JObj),
MsgId = kz_api:msg_id(JObj),
gen_server:cast(Srv, {'query_status', JobId, Queue, MsgId, JObj}).
Expand All @@ -163,7 +163,7 @@ handle_job_status_query(JObj, Props) ->
%% @doc Initializes the server.
%% @end
%%------------------------------------------------------------------------------
-spec init([kz_json:object() | kz_term:ne_binary()]) -> {'ok', state()}.
-spec init([fax_job() | kz_term:ne_binary()]) -> {'ok', state()}.
init([FaxJob, CallId]) ->
CtrlQ = kapi_fax:control_queue(FaxJob),
JobId = kapi_fax:job_id(FaxJob),
Expand Down Expand Up @@ -197,9 +197,11 @@ handle_cast({'tx_resp', JobId, JObj}, #state{job_id=JobId
<<"SUCCESS">> ->
lager:debug("received successful attempt to originate fax, continue processing"),
send_status(State, <<"received successful attempt to originate fax">>),
{'noreply', State#state{stage = ?FAX_NEGOTIATE
,status = <<"negotiating">>
}, ?NEGOTIATE_TIMEOUT
{'noreply'
,State#state{stage = ?FAX_NEGOTIATE
,status = <<"negotiating">>
}
,?NEGOTIATE_TIMEOUT
};
_Else ->
lager:debug("received failed attempt to tx fax, releasing job: ~s", [_Else]),
Expand All @@ -209,7 +211,7 @@ handle_cast({'tx_resp', JobId, JObj}, #state{job_id=JobId
{'noreply', State#state{job=Doc, resp = Resp}}
end;
handle_cast({'tx_resp', JobId2, _}, #state{job_id=JobId}=State) ->
lager:debug("received txresp for ~s but this JobId is ~s",[JobId2, JobId]),
lager:debug("received txresp for ~s but this JobId is ~s", [JobId2, JobId]),
{'noreply', State};
handle_cast({'fax_status', <<"negociateresult">>, JobId, JObj}, State) ->
Data = kz_call_event:application_data(JObj),
Expand Down Expand Up @@ -384,6 +386,7 @@ handle_cast(_Msg, State) ->
handle_info('timeout', #state{stage='undefined'}=State) ->
{'noreply', State};
handle_info('timeout', #state{stage=Stage, job=JObj}=State) ->
lager:debug("timeout waiting in stage ~s", [Stage]),
{Resp, Doc} = release_failed_job('job_timeout', Stage, JObj),
gen_server:cast(self(), 'stop'),
{'noreply', State#state{job=Doc, resp = Resp}};
Expand Down Expand Up @@ -554,7 +557,7 @@ release_successful_job(Resp, JObj) ->
'false' -> 'undefined'
end
}
| fax_util:fax_properties(kz_json:get_value(<<"Application-Data">>, Resp, Resp))
| fax_util:fax_properties(kz_json:get_json_value(<<"Application-Data">>, Resp, Resp))
]),
release_job(Result, JObj, Resp).

Expand All @@ -565,34 +568,42 @@ release_job(Result, JObj) ->
-spec release_job(kz_term:proplist(), kz_json:object(), kz_json:object()) -> release_ret().
release_job(Result, JObj, Resp) ->
Success = props:is_true(<<"success">>, Result, 'false'),
Updaters = [fun(J) ->
Attempts = kz_json:get_integer_value(<<"attempts">>, J, 0),
kz_json:set_value(<<"attempts">>, Attempts + 1, J)
end
Updaters = [fun increment_attempts/1
,fun(J) -> kz_json:set_value(<<"tx_result">>, kz_json:from_list(Result), J) end
,fun(J) -> kz_json:delete_key(<<"pvt_queue">>, J) end
,fun remove_pvt_queue/1
,fun apply_reschedule_logic/1
,fun(J) ->
Attempts = kz_json:get_integer_value(<<"attempts">>, J, 0),
Retries = kz_json:get_integer_value(<<"retries">>, J, 1),
lager:debug("releasing job with retries: ~b attempts: ~b", [Retries, Attempts]),
case Retries - Attempts >= 1 of
_ when Success ->
lager:debug("releasing job with status: completed"),
kz_json:set_value(<<"pvt_job_status">>, <<"completed">>, J);
'true' ->
lager:debug("releasing job with status: pending"),
kz_json:set_value(<<"pvt_job_status">>, <<"pending">>, J);
'false' ->
lager:debug("releasing job with status: failed"),
kz_json:set_value(<<"pvt_job_status">>, <<"failed">>, J)
end
end
,fun(J) -> update_job_status(J, Success) end
],
Update = lists:foldl(fun(F, J) -> F(J) end, JObj, Updaters),
{'ok', Saved} = kz_datamgr:ensure_saved(?KZ_FAXES_DB, Update),
{Resp, Saved}.

-spec update_job_status(kz_json:object(), boolean()) -> kz_json:object().
update_job_status(JObj, 'true') ->
lager:debug("releasing job with status: completed"),
kz_json:set_value(<<"pvt_job_status">>, <<"completed">>, JObj);
update_job_status(JObj, 'false') ->
Attempts = kz_json:get_integer_value(<<"attempts">>, JObj, 0),
Retries = kz_json:get_integer_value(<<"retries">>, JObj, 1),
lager:debug("unsuccessful job has attempted ~b of ~b retries", [Attempts, Retries]),
case Retries - Attempts >= 1 of
'true' ->
lager:debug("releasing job with status: pending"),
kz_json:set_value(<<"pvt_job_status">>, <<"pending">>, JObj);
'false' ->
lager:debug("releasing job with status: failed"),
kz_json:set_value(<<"pvt_job_status">>, <<"failed">>, JObj)
end.

-spec increment_attempts(kz_json:object()) -> kz_json:object().
increment_attempts(JObj) ->
Attempts = kz_json:get_integer_value(<<"attempts">>, JObj, 0),
kz_json:set_value(<<"attempts">>, Attempts + 1, JObj).

-spec remove_pvt_queue(kz_json:object()) -> kz_json:object().
remove_pvt_queue(JObj) ->
kz_json:delete_key(<<"pvt_queue">>, JObj).

-spec apply_reschedule_logic(kz_json:object()) -> kz_json:object().
apply_reschedule_logic(JObj) ->
Map = kapps_config:get_json(?CONFIG_CAT, <<"reschedule">>, kz_json:new()),
Expand All @@ -602,7 +613,8 @@ apply_reschedule_logic(JObj) ->
JObj2;
{'ok', JObj2} ->
lager:debug("rule '~s' applied in fax reschedule logic"
,[kz_json:get_value(<<"reschedule_rule">>, JObj2)]),
,[kz_json:get_value(<<"reschedule_rule">>, JObj2)]
),
JObj2
end.

Expand Down Expand Up @@ -662,11 +674,11 @@ maybe_notify(JObj, Resp, <<"failed">>) ->
]),
kapps_notify_publisher:cast(Message, fun kapi_notifications:publish_fax_outbound_error/1);
maybe_notify(_JObj, _Resp, Status) ->
lager:debug("notify Status ~p not handled",[Status]).
lager:debug("notify status ~p not handled", [Status]).

-spec maybe_move_doc(kz_json:object(), kz_term:ne_binary()) ->
{'ok', kz_json:object()} |
{'error', any()}.
kz_datamgr:data_error().
maybe_move_doc(JObj, <<"completed">>) ->
move_doc(JObj);
maybe_move_doc(JObj, <<"failed">>) ->
Expand All @@ -676,7 +688,7 @@ maybe_move_doc(JObj, _) ->

-spec move_doc(kz_json:object()) ->
{'ok', kz_json:object()} |
{'error', any()}.
kz_datamgr:data_error().
move_doc(JObj) ->
FromId = kz_doc:id(JObj),
{Year, Month, _D} = kz_term:to_date(kz_doc:created(JObj)),
Expand All @@ -686,22 +698,64 @@ move_doc(JObj) ->
ToDB = kz_util:format_account_modb(AccountMODb, 'encoded'),
ToId = ?MATCH_MODB_PREFIX(kz_term:to_binary(Year), kz_date:pad_month(Month), FromId),
Options = ['override_existing_document'
,{'transform', fun(_, B) -> kz_json:set_value(<<"folder">>, <<"outbox">>, B) end}
,{'transform', fun move_to_outbox/2}
],
lager:debug("moving fax outbound document ~s from faxes to ~s with id ~s", [FromId, AccountMODb, ToId]),
kazoo_modb:move_doc(FromDB, {<<"fax">>, FromId}, ToDB, ToId, Options).
case kazoo_modb:move_doc(FromDB, {<<"fax">>, FromId}, ToDB, ToId, Options) of
{'ok', _}=OK -> OK;
{'error', 'conflict'} ->
handle_move_conflict(JObj, FromDB, FromId, ToDB, ToId);
{'error', _}=Error -> Error
end.

-spec handle_move_conflict(kz_json:object(), kz_term:ne_binary(), kz_term:ne_binary(), kz_term:ne_binary(), kz_term:ne_binary()) ->
{'ok', kz_json:object()} |
kz_datamgr:data_error().
handle_move_conflict(SourceJObj, FromDB, FromId, ToDB, ToId) ->
lager:info("moving ~s to ~s/~s conflicted", [FromId, ToDB, ToId]),
case kz_datamgr:open_cache_doc(ToDB, ToId) of
{'ok', MovedDoc} ->
handle_if_moved_successfully(SourceJObj, FromDB, FromId, ToDB, ToId, MovedDoc);
{'error', _E}=Error ->
lager:debug("failed to open moved doc: ~p", [_E]),
Error
end.

-spec handle_if_moved_successfully(kz_json:object(), kz_term:ne_binary(), kz_term:ne_binary(), kz_term:ne_binary(), kz_term:ne_binary(), kz_json:object()) ->
{'ok', kz_json:object()} |
{'error', 'conflict'}.
handle_if_moved_successfully(SourceJObj, FromDB, FromId, ToDB, ToId, MovedDoc) ->
case kz_doc:are_equal(SourceJObj, MovedDoc) of
'true' ->
_Deleted = kz_datamgr:del_doc(FromDB, FromId),
lager:debug("deleted from doc: ~p", [_Deleted]),
{'ok', MovedDoc};
'false' ->
lager:info("docs don't match enough, removing destination and retrying"),
_ = kz_datamgr:del_doc(ToDB, ToId),
{'error', 'conflict'}
end.

-spec move_to_outbox(kz_json:object(), kz_json:object()) -> kz_json:object().
move_to_outbox(_SourceJObj, DestJObj) ->
kz_json:set_value(<<"folder">>, <<"outbox">>, DestJObj).

-spec fax_error(kz_json:object()) -> kz_term:api_binary().
fax_error(JObj) ->
kz_json:get_first_defined([ [<<"Application-Data">>, <<"Fax-Result-Text">>]
, [<<"tx_result">>, <<"result_text">>]
], JObj).
kz_json:get_first_defined([[<<"Application-Data">>, <<"Fax-Result-Text">>]
,[<<"tx_result">>, <<"result_text">>]
]
,JObj
).

-spec notify_emails(kz_json:object()) -> kz_term:ne_binaries().
notify_emails(JObj) ->
Emails = kz_json:get_first_defined([?NOTIFICATION_OUTBOUND_EMAIL
,?NOTIFICATION_EMAIL
], JObj, []),
]
,JObj
,[]
),
fax_util:notify_email_list(Emails).

-spec notify_fields(kz_json:object(), kz_json:object()) -> kz_term:proplist().
Expand Down Expand Up @@ -819,7 +873,7 @@ send_fax(JobId, JObj, Q, ToDID) ->
,{<<"Application-Data">>, get_proxy_url(JobId)}
,{<<"Origination-Call-ID">>, CallId}
,{<<"Bypass-E164">>, kz_json:is_true(<<"bypass_e164">>, JObj)}
,{<<"Fax-T38-Enabled">>, false}
,{<<"Fax-T38-Enabled">>, 'false'}
| kz_api:default_headers(Q, ?APP_NAME, ?APP_VERSION)
]),
lager:debug("sending fax originate request ~s with call-id ~s", [JobId, CallId]),
Expand Down Expand Up @@ -848,7 +902,7 @@ maybe_hunt_account_id(JObj, AccountId) ->
resource_ccvs(JobId) ->
kz_json:from_list([{<<"Authorizing-ID">>, JobId}
,{<<"Authorizing-Type">>, <<"outbound_fax">>}
,{<<"RTCP-MUX">>, false}
,{<<"RTCP-MUX">>, 'false'}
]).

-spec get_did(kz_json:object()) -> kz_term:api_binary().
Expand Down
29 changes: 29 additions & 0 deletions core/kazoo/src/kz_doc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@
,setters/2
]).

-export([are_equal/2]).

-ifdef(TEST).
-export([remove_pvt/1]).
-endif.
Expand Down Expand Up @@ -853,3 +855,30 @@ setters(JObj, Routines) ->
,JObj
,Routines
).

%%------------------------------------------------------------------------------
%% @doc Compare documents for equality
%%
%% Two documents are considered equal if their public fields match and
%% their attachments metadata match
%% @end
%%------------------------------------------------------------------------------
-spec are_equal(doc(), doc()) -> boolean().
are_equal(FirstDoc, SecondDoc) ->
lists:all(fun(F) -> F(FirstDoc, SecondDoc) end
,[fun are_equal_public/2
,fun are_equal_attachments/2
]
).

-spec are_equal_public(doc(), doc()) -> boolean().
are_equal_public(FirstDoc, SecondDoc) ->
FirstPublic = public_fields(FirstDoc),
SecondPublic = public_fields(SecondDoc),
kz_json:are_equal(FirstPublic, SecondPublic).

-spec are_equal_attachments(doc(), doc()) -> boolean().
are_equal_attachments(FirstDoc, SecondDoc) ->
FirstAtts = attachments(FirstDoc),
SecondAtts = attachments(SecondDoc),
kz_json:are_equal(FirstAtts, SecondAtts).
3 changes: 2 additions & 1 deletion core/kazoo_data/src/kzs_doc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,9 @@ copy_doc(Src, Dst, CopySpec, CopyFun, Opts) ->
{'ok', SourceDoc} ->
Props = [{<<"_id">>, DestDocId}
,{<<"pvt_account_db">>, DestDbName}
| [{Key, 'null'} || Key <- ?DELETE_KEYS]
],
DestinationDoc = kz_json:set_values(Props, kz_json:delete_keys(?DELETE_KEYS, SourceDoc)),
DestinationDoc = kz_json:set_values(Props, SourceDoc),
Doc = copy_transform(Transform, SourceDoc, DestinationDoc),
case CopyFun(Dst, DestDbName, Doc, Options) of
{'ok', JObj} ->
Expand Down
Loading

0 comments on commit bd168f6

Please sign in to comment.