2020#include < memory>
2121#include < mutex>
2222
23- #include " MilvusConnection.h"
24- #include " common.pb.h"
2523#include " milvus/MilvusClient.h"
26- #include " utils/RpcUtils .h"
24+ #include " utils/ConnectionHandler .h"
2725
2826/* *
2927 * @brief namespace milvus
@@ -36,7 +34,7 @@ class MilvusClientImpl : public MilvusClient {
3634 ~MilvusClientImpl () override ;
3735
3836 Status
39- Connect (const ConnectParam& connect_param ) final ;
37+ Connect (const ConnectParam& param ) final ;
4038
4139 Status
4240 Disconnect () final ;
@@ -356,126 +354,6 @@ class MilvusClientImpl : public MilvusClient {
356354 RemovePrivilegesFromGroup (const std::string& group_name, const std::vector<std::string>& privileges) final ;
357355
358356 private:
359- // This interface is not exposed to users
360- Status
361- getLoadingProgress (const std::string& collection_name, const std::vector<std::string> partition_names,
362- uint32_t & progress);
363-
364- private:
365- /* *
366- * Internal wait for status query done.
367- *
368- * @param [in] query_function one time query for return Status, return TIMEOUT status if not done
369- * @param [in] progress_monitor timeout setting for waiting progress
370- * @return Status, the final status
371- */
372- static Status
373- WaitForStatus (const std::function<Status(Progress&)>& query_function, const ProgressMonitor& progress_monitor);
374-
375- /* *
376- * @brief template for public api call
377- * validate -> pre -> rpc -> wait_for_status -> post
378- */
379- template <typename Request, typename Response>
380- Status
381- apiHandler (const std::function<Status(void )>& validate, std::function<Status(Request&)> pre ,
382- Status (MilvusConnection::*rpc)(const Request&, Response&, const GrpcOpts&),
383- std::function<Status(const Response&)> wait_for_status, std::function<Status(const Response&)> post ) {
384- if (connection_ == nullptr ) {
385- return {StatusCode::NOT_CONNECTED, " Connection is not created!" };
386- }
387-
388- // validate input
389- if (validate) {
390- auto status = validate ();
391- if (!status.IsOk ()) {
392- return status;
393- }
394- }
395-
396- // construct rpc request
397- Request rpc_request;
398- if (pre ) {
399- auto status = pre (rpc_request);
400- if (!status.IsOk ()) {
401- return status;
402- }
403- }
404-
405- // call rpc interface
406- Response rpc_response;
407- // the timeout value can be changed by MilvusClient::SetRpcDeadlineMs()
408- uint64_t timeout = connection_->GetConnectParam ().RpcDeadlineMs ();
409- auto func = std::bind (rpc, connection_.get (), rpc_request, std::placeholders::_1, GrpcOpts{timeout});
410- auto caller = [&func, &rpc_response]() { return func (rpc_response); };
411- auto status = Retry (caller, retry_param_);
412- if (!status.IsOk ()) {
413- // response's status already checked in connection class
414- return status;
415- }
416-
417- // wait loop
418- if (wait_for_status) {
419- status = wait_for_status (rpc_response);
420- if (!status.IsOk ()) {
421- return status;
422- }
423- }
424-
425- // process results
426- if (post ) {
427- status = post (rpc_response);
428- if (!status.IsOk ()) {
429- return status;
430- }
431- }
432- return Status::OK ();
433- }
434-
435- /* *
436- * @brief template for public api call
437- */
438- template <typename Request, typename Response>
439- Status
440- apiHandler (std::function<Status(void )> validate, std::function<Status(Request&)> pre ,
441- Status (MilvusConnection::*rpc)(const Request&, Response&, const GrpcOpts&),
442- std::function<Status(const Response&)> post ) {
443- return apiHandler (validate, pre , rpc, std::function<Status (const Response&)>{}, post );
444- }
445-
446- /* *
447- * @brief template for public api call
448- */
449- template <typename Request, typename Response>
450- Status
451- apiHandler (std::function<Status(void )> validate, std::function<Status(Request&)> pre ,
452- Status (MilvusConnection::*rpc)(const Request&, Response&, const GrpcOpts&)) {
453- return apiHandler (validate, pre , rpc, std::function<Status (const Response&)>{},
454- std::function<Status (const Response&)>{});
455- }
456-
457- /* *
458- * @brief template for public api call
459- */
460- template <typename Request, typename Response>
461- Status
462- apiHandler (std::function<Status(Request&)> pre ,
463- Status (MilvusConnection::*rpc)(const Request&, Response&, const GrpcOpts&),
464- std::function<Status(const Response&)> post ) {
465- return apiHandler (std::function<Status (void )>{}, pre , rpc, std::function<Status (const Response&)>{}, post );
466- }
467-
468- /* *
469- * @brief template for public api call
470- */
471- template <typename Request, typename Response>
472- Status
473- apiHandler (std::function<Status(Request&)> pre ,
474- Status (MilvusConnection::*rpc)(const Request&, Response&, const GrpcOpts&)) {
475- return apiHandler (std::function<Status (void )>{}, pre , rpc, std::function<Status (const Response&)>{},
476- std::function<Status (const Response&)>{});
477- }
478-
479357 /* *
480358 * @brief return desc if it is existing, else call describeCollection() and cache it
481359 */
@@ -494,16 +372,12 @@ class MilvusClientImpl : public MilvusClient {
494372 void
495373 removeCollectionDesc (const std::string& collection_name);
496374
497- std::string
498- currentDbName (const std::string& overwrite_db_name) const ;
499-
500375 template <typename ArgClass>
501376 Status
502377 iteratorPrepare (ArgClass& arguments);
503378
504379 private:
505- MilvusConnectionPtr connection_;
506- RetryParam retry_param_;
380+ ConnectionHandler connection_;
507381
508382 // cache of collection schemas
509383 // this cache is db level, once useDatabase() is called, this cache will be cleaned
0 commit comments