From 865428a88fb7add650e20aa212f3fda9e6cb0413 Mon Sep 17 00:00:00 2001 From: karl anderson Date: Fri, 23 Jan 2015 20:14:09 -0500 Subject: [PATCH] KAZOO-3258: flush the local cache prior to any operation that may update a couchdb document --- core/whistle_couch-1.0.0/src/couch_mgr.erl | 17 +++--- core/whistle_couch-1.0.0/src/couch_util.erl | 62 ++++++++++++++------- 2 files changed, 51 insertions(+), 28 deletions(-) diff --git a/core/whistle_couch-1.0.0/src/couch_mgr.erl b/core/whistle_couch-1.0.0/src/couch_mgr.erl index 5c80420c51f..adf45627351 100644 --- a/core/whistle_couch-1.0.0/src/couch_mgr.erl +++ b/core/whistle_couch-1.0.0/src/couch_mgr.erl @@ -529,19 +529,20 @@ cache_db_doc(DbName, DocId, Doc) -> {'error', _}=E -> E end. --spec flush_cache_doc(ne_binary(), ne_binary()) -> +-spec flush_cache_doc(ne_binary(), ne_binary() | wh_json:object()) -> 'ok' | {'error', 'invalid_db_name'}. --spec flush_cache_doc(ne_binary(), ne_binary(), wh_proplist()) -> +flush_cache_doc(DbName, Doc) -> + flush_cache_doc(DbName, Doc, []). + +-spec flush_cache_doc(ne_binary(), ne_binary() | wh_json:object(), wh_proplist()) -> 'ok' | {'error', 'invalid_db_name'}. -flush_cache_doc(DbName, DocId) -> - flush_cache_doc(DbName, DocId, []). -flush_cache_doc(DbName, DocId, Options) when ?VALID_DBNAME -> - couch_util:flush_cache_doc(wh_couch_connections:get_server(), DbName, DocId, Options); -flush_cache_doc(DbName, DocId, Options) -> +flush_cache_doc(DbName, Doc, Options) when ?VALID_DBNAME -> + couch_util:flush_cache_doc(DbName, Doc, Options); +flush_cache_doc(DbName, Doc, Options) -> case maybe_convert_dbname(DbName) of - {'ok', Db} -> flush_cache_doc(Db, DocId, Options); + {'ok', Db} -> flush_cache_doc(Db, Doc, Options); {'error', _}=E -> E end. diff --git a/core/whistle_couch-1.0.0/src/couch_util.erl b/core/whistle_couch-1.0.0/src/couch_util.erl index 2cdbc5adeab..0f36c663eb6 100644 --- a/core/whistle_couch-1.0.0/src/couch_util.erl +++ b/core/whistle_couch-1.0.0/src/couch_util.erl @@ -32,8 +32,12 @@ %% Doc related -export([open_cache_doc/4 ,cache_db_doc/3 - ,flush_cache_doc/4 - ,flush_cache_docs/0, flush_cache_docs/1 + ,flush_cache_doc/2 + ,flush_cache_doc/3 + ,flush_cache_docs/0 + ,flush_cache_docs/1 + ,flush_cache_docs/2 + ,flush_cache_docs/3 ,open_doc/4 ,lookup_doc_rev/3 ,save_doc/4 @@ -417,16 +421,24 @@ cache_db_doc(DbName, DocId, CacheValue) -> CacheProps = [{'origin', {'db', DbName, DocId}}], wh_cache:store_local(?WH_COUCH_CACHE, {?MODULE, DbName, DocId}, CacheValue, CacheProps). --spec flush_cache_doc(server() | 'undefined', ne_binary() | db(), ne_binary(), wh_proplist()) -> 'ok'. -flush_cache_doc(Server, #db{name=Name}, DocId, Options) -> - flush_cache_doc(Server, wh_util:to_binary(Name), DocId, Options); -flush_cache_doc(_, DbName, DocId, _Options) -> - wh_cache:erase_local(?WH_COUCH_CACHE, {?MODULE, DbName, DocId}). +-spec flush_cache_doc(ne_binary() | db(), ne_binary() | wh_json:object()) -> 'ok'. +flush_cache_doc(#db{name=Name}, Doc) -> + flush_cache_doc(#db{name=Name}, Doc, []). --spec flush_cache_docs() -> 'ok'. --spec flush_cache_docs(ne_binary()) -> 'ok'. +-spec flush_cache_doc(ne_binary() | db(), ne_binary() | wh_json:object(), wh_proplist()) -> 'ok'. +flush_cache_doc(#db{name=Name}, Doc, Options) -> + flush_cache_doc(wh_util:to_binary(Name), Doc, Options); +flush_cache_doc(DbName, DocId, _Options) when is_binary(DocId) -> + wh_cache:erase_local(?WH_COUCH_CACHE, {?MODULE, DbName, DocId}); +flush_cache_doc(DbName, Doc, Options) -> + flush_cache_doc(DbName, doc_id(Doc), Options). +-spec flush_cache_docs() -> 'ok'. flush_cache_docs() -> wh_cache:flush_local(?WH_COUCH_CACHE). + +-spec flush_cache_docs(ne_binary() | db()) -> 'ok'. +flush_cache_docs(#db{name=Name}) -> + flush_cache_docs(wh_util:to_binary(Name)); flush_cache_docs(DbName) -> Filter = fun({?MODULE, DbName1, _DocId}=K, _) when DbName1 =:= DbName -> wh_cache:erase_local(?WH_COUCH_CACHE, K), @@ -436,6 +448,17 @@ flush_cache_docs(DbName) -> _ = wh_cache:filter_local(?WH_COUCH_CACHE, Filter), 'ok'. +-spec flush_cache_docs(ne_binary() | db(), ne_binaries() | wh_json:objects()) -> 'ok'. +flush_cache_docs(Db, Docs) -> + flush_cache_docs(Db, Docs, []). + +-spec flush_cache_docs(ne_binary() | db(), ne_binaries() | wh_json:objects(), wh_proplist()) -> 'ok'. +flush_cache_docs(Db, Docs, Options) -> + _ = [flush_cache_doc(Db, Doc, Options) + || Doc <- Docs + ], + 'ok'. + -spec open_doc(server(), ne_binary(), ne_binary(), wh_proplist()) -> {'ok', wh_json:object()} | couchbeam_error(). @@ -543,9 +566,7 @@ prepare_doc_for_del(Conn, #db{name=DbName}, Doc) -> couchbeam_error(). do_ensure_saved(#db{}=Db, Doc, Opts) -> case do_save_doc(Db, Doc, Opts) of - {'ok', JObj}=Ok -> - _ = maybe_publish_doc(Db, Doc, JObj), - Ok; + {'ok', _}=Ok -> Ok; {'error', 'conflict'} -> case do_fetch_rev(Db, doc_id(Doc)) of {'error', 'not_found'} -> @@ -582,9 +603,11 @@ do_fetch_doc(#db{}=Db, DocId, Options) -> do_save_doc(#db{}=Db, Docs, Options) when is_list(Docs) -> do_save_docs(Db, Docs, Options); do_save_doc(#db{}=Db, Doc, Options) -> - case ?RETRY_504(couchbeam:save_doc(Db, maybe_set_docid(Doc), Options)) of + PreparedDoc = maybe_set_docid(Doc), + _ = flush_cache_doc(Db, PreparedDoc), + case ?RETRY_504(couchbeam:save_doc(Db, PreparedDoc, Options)) of {'ok', JObj}=Ok -> - _ = maybe_publish_doc(Db, Doc, JObj), + _ = maybe_publish_doc(Db, PreparedDoc, JObj), Ok; Else -> Else end. @@ -609,6 +632,7 @@ do_save_docs(#db{}=Db, Docs, Options, Acc) -> case catch(lists:split(?MAX_BULK_INSERT, Docs)) of {'EXIT', _} -> PreparedDocs = [maybe_set_docid(D) || D <- Docs], + _ = flush_cache_docs(Db, PreparedDocs), case ?RETRY_504(couchbeam:save_docs(Db, PreparedDocs, Options)) of {'ok', JObjs} -> _ = maybe_publish_docs(Db, PreparedDocs, JObjs), @@ -617,6 +641,7 @@ do_save_docs(#db{}=Db, Docs, Options, Acc) -> end; {Save, Cont} -> PreparedDocs = [maybe_set_docid(D) || D <- Save], + _ = flush_cache_docs(Db, PreparedDocs), case ?RETRY_504(couchbeam:save_docs(Db, PreparedDocs, Options)) of {'ok', JObjs} -> _ = maybe_publish_docs(Db, PreparedDocs, JObjs), @@ -682,6 +707,7 @@ do_stream_attachment(#db{}=Db, DocId, AName, Caller) -> {'ok', wh_json:object()} | couchbeam_error(). do_put_attachment(#db{}=Db, DocId, AName, Contents, Options) -> + _ = flush_cache_doc(Db, DocId), case ?RETRY_504(couchbeam:put_attachment(Db, DocId, AName, Contents, Options)) of {'ok', JObj}=Ok -> maybe_publish_doc(Db, wh_json:from_list([{<<"_id">>, DocId}]), maybe_add_pvt_type(Db, DocId, JObj)), @@ -694,6 +720,7 @@ do_put_attachment(#db{}=Db, DocId, AName, Contents, Options) -> couchbeam_error(). do_del_attachment(#db{}=Db, DocId, AName, Options) -> Doc = wh_util:to_binary(http_uri:encode(wh_util:to_list(DocId))), + _ = flush_cache_doc(Db, DocId), case ?RETRY_504(couchbeam:delete_attachment(Db, Doc, AName, Options)) of {'ok', JObj}=Ok -> maybe_publish_doc(Db, wh_json:from_list([{<<"_id">>, DocId}]), maybe_add_pvt_type(Db, DocId, JObj)), @@ -796,15 +823,10 @@ retry504s(Fun, Cnt) -> end. -spec maybe_publish_docs(couchbeam_db(), wh_json:objects(), wh_json:objects()) -> 'ok'. -maybe_publish_docs(#db{name=DbName}=Db, Docs, JObjs) -> +maybe_publish_docs(#db{}=Db, Docs, JObjs) -> case couch_mgr:change_notice() of 'true' -> spawn(fun() -> - [wh_cache:erase_local(?WH_COUCH_CACHE - ,{?MODULE, wh_util:to_binary(DbName), doc_id(Doc)} - ) - || Doc <- Docs - ], [publish_doc(Db, Doc, JObj) || {Doc, JObj} <- lists:zip(Docs, JObjs), should_publish_doc(Doc)