diff --git a/bcos-gateway/bcos-gateway/Gateway.cpp b/bcos-gateway/bcos-gateway/Gateway.cpp index 5e1a38e9af..a31be2dd12 100644 --- a/bcos-gateway/bcos-gateway/Gateway.cpp +++ b/bcos-gateway/bcos-gateway/Gateway.cpp @@ -38,6 +38,10 @@ using namespace bcos::crypto; void Gateway::start() { + if (m_gatewayRateLimiter) + { + m_gatewayRateLimiter->start(); + } if (m_p2pInterface) { m_p2pInterface->start(); @@ -50,10 +54,6 @@ void Gateway::start() { m_gatewayNodeManager->start(); } - if (m_rateLimiterStatisticsTimer) - { - m_rateLimiterStatisticsTimer->start(); - } GATEWAY_LOG(INFO) << LOG_DESC("start end."); @@ -62,10 +62,6 @@ void Gateway::start() void Gateway::stop() { - if (m_rateLimiterStatisticsTimer) - { - m_rateLimiterStatisticsTimer->stop(); - } // erase the registered handler if (m_p2pInterface) { @@ -81,6 +77,10 @@ void Gateway::stop() { m_gatewayNodeManager->stop(); } + if (m_gatewayRateLimiter) + { + m_gatewayRateLimiter->stop(); + } GATEWAY_LOG(INFO) << LOG_DESC("stop end."); return; } @@ -437,155 +437,6 @@ bool Gateway::checkGroupInfo(bcos::group::GroupInfo::Ptr _groupInfo) return true; } -bool Gateway::checkBWRateLimit(ratelimiter::RateLimiterManager::Ptr _rateLimiterManager, - const std::string& _endPoint, const std::string& _groupID, uint16_t _moduleID, - uint64_t _msgLength, SessionCallbackFunc _callback) -{ - // endpoint of the p2p connection - const std::string& endPoint = _endPoint; - // group of the message, empty string means the message is p2p's own message - const std::string& groupID = _groupID; - // moduleID of the message, zero means the message is p2p's own message - uint16_t moduleID = _moduleID; - // the length of the message - uint64_t msgLength = _msgLength; - - std::string errorMsg; - do - { - // total outgoing bandwidth - ratelimiter::RateLimiterInterface::Ptr totalOutGoingBWLimit = - _rateLimiterManager->getRateLimiter( - ratelimiter::RateLimiterManager::TOTAL_OUTGOING_KEY); - - // connection outgoing bandwidth - ratelimiter::RateLimiterInterface::Ptr connOutGoingBWLimit = - _rateLimiterManager->getConnRateLimiter(endPoint); - - // group outgoing bandwidth - ratelimiter::RateLimiterInterface::Ptr groupOutGoingBWLimit = nullptr; - if (!groupID.empty()) - { - groupOutGoingBWLimit = _rateLimiterManager->getGroupRateLimiter(groupID); - } - - auto modulesWithNoBwLimit = _rateLimiterManager->modulesWithNoBwLimit(); - - // if moduleID is zero, the P2P network itself's message, the ratelimiter does not limit - // P2P own's messages - if (moduleID == 0) - { - if (totalOutGoingBWLimit) - { - totalOutGoingBWLimit->tryAcquire(msgLength); - } - - if (connOutGoingBWLimit) - { - connOutGoingBWLimit->tryAcquire(msgLength); - } - } - // if moduleID is not zero, the message comes from the front - // There are two scenarios: - // 1. ulimit module message rate or - // 2. limit module message rate - else if (modulesWithNoBwLimit.count(moduleID)) - { // case 1: ulimit module message rate or, just for statistic - - if (totalOutGoingBWLimit) - { - totalOutGoingBWLimit->tryAcquire(msgLength); - } - - if (connOutGoingBWLimit) - { - connOutGoingBWLimit->tryAcquire(msgLength); - } - - if (groupOutGoingBWLimit) - { - groupOutGoingBWLimit->tryAcquire(msgLength); - } - } - else - { // case 2: limit module message rate - - if (totalOutGoingBWLimit && !totalOutGoingBWLimit->tryAcquire(msgLength)) - { - // total outgoing bandwidth overflow - errorMsg = "the network total outgoing bandwidth overflow"; - break; - } - - if (connOutGoingBWLimit && !connOutGoingBWLimit->tryAcquire(msgLength)) - { - // connection outgoing bandwidth overflow - errorMsg = - "the network connection outgoing bandwidth overflow, endpoint: " + endPoint; - if (totalOutGoingBWLimit) - { - totalOutGoingBWLimit->rollback(msgLength); - } - - break; - } - - if (groupOutGoingBWLimit && !groupOutGoingBWLimit->tryAcquire(msgLength)) - { - // group outgoing bandwidth overflow - errorMsg = "the group outgoing bandwidth overflow, groupID: " + groupID; - if (totalOutGoingBWLimit) - { - totalOutGoingBWLimit->rollback(msgLength); - } - - if (connOutGoingBWLimit) - { - connOutGoingBWLimit->rollback(msgLength); - } - - break; - } - } - - m_rateLimiterStatistics->updateOutGoing(endPoint, msgLength, true); - m_rateLimiterStatistics->updateOutGoing(groupID, moduleID, msgLength, true); - - return true; - } while (0); - - m_rateLimiterStatistics->updateOutGoing(endPoint, msgLength, false); - m_rateLimiterStatistics->updateOutGoing(groupID, moduleID, msgLength, false); - - // TODO: use thread pool - if (_callback) - { - _callback(NetworkException(BandwidthOverFlow, errorMsg), Message::Ptr()); - } - - return false; -} - -// gateway bw check -bool Gateway::checkBWRateLimit( - SessionFace::Ptr _session, Message::Ptr _msg, SessionCallbackFunc _callback) -{ - GatewayMessageExtAttributes::Ptr msgExtAttributes = nullptr; - if (_msg->extAttributes()) - { - msgExtAttributes = - std::dynamic_pointer_cast(_msg->extAttributes()); - } - - std::string groupID = msgExtAttributes ? msgExtAttributes->groupID() : std::string(); - uint16_t moduleID = msgExtAttributes ? msgExtAttributes->moduleID() : 0; - std::string endPoint = _session->nodeIPEndpoint().address(); - uint64_t msgLength = _msg->length(); - - return checkBWRateLimit( - m_rateLimiterManager, endPoint, groupID, moduleID, msgLength, _callback); -} - void Gateway::asyncNotifyGroupInfo( bcos::group::GroupInfo::Ptr _groupInfo, std::function _callback) { @@ -641,7 +492,7 @@ void Gateway::onReceiveP2PMessage( } */ - m_rateLimiterStatistics->updateInComing(groupID, moduleID, _msg->length()); + m_gatewayRateLimiter->checkInComing(groupID, moduleID, _msg->length()); auto srcNodeID = options->srcNodeID(); const auto& dstNodeIDs = options->dstNodeIDs(); @@ -689,7 +540,7 @@ void Gateway::onReceiveBroadcastMessage( // moduleID uint16_t moduleID = options->moduleID(); - m_rateLimiterStatistics->updateInComing(groupID, moduleID, _msg->length()); + m_gatewayRateLimiter->checkInComing(groupID, moduleID, _msg->length()); /* // TODO: if outgoing bandwidth exceeds the upper limit diff --git a/bcos-gateway/bcos-gateway/Gateway.h b/bcos-gateway/bcos-gateway/Gateway.h index 97662c57ab..5979807419 100644 --- a/bcos-gateway/bcos-gateway/Gateway.h +++ b/bcos-gateway/bcos-gateway/Gateway.h @@ -20,6 +20,7 @@ #pragma once +#include "bcos-gateway/libratelimit/GateWayRateLimiter.h" #include #include #include @@ -40,16 +41,14 @@ class Gateway : public GatewayInterface, public std::enable_shared_from_this; Gateway(std::string const& _chainID, P2PInterface::Ptr _p2pInterface, GatewayNodeManager::Ptr _gatewayNodeManager, bcos::amop::AMOPImpl::Ptr _amop, - ratelimiter::RateLimiterManager::Ptr _rateLimiterManager, - ratelimiter::RateLimiterStatistics::Ptr _rateLimiterStatistics, + ratelimiter::GateWayRateLimiter::Ptr _gatewayRateLimiter, std::string _gatewayServiceName = "localGateway") : m_gatewayServiceName(_gatewayServiceName), m_chainID(_chainID), m_p2pInterface(_p2pInterface), m_gatewayNodeManager(_gatewayNodeManager), m_amop(_amop), - m_rateLimiterManager(_rateLimiterManager), - m_rateLimiterStatistics(_rateLimiterStatistics) + m_gatewayRateLimiter(_gatewayRateLimiter) { m_p2pInterface->registerHandlerByMsgType(GatewayMessageType::PeerToPeerMessage, boost::bind(&Gateway::onReceiveP2PMessage, this, boost::placeholders::_1, @@ -58,20 +57,6 @@ class Gateway : public GatewayInterface, public std::enable_shared_from_thisregisterHandlerByMsgType(GatewayMessageType::BroadcastMessage, boost::bind(&Gateway::onReceiveBroadcastMessage, this, boost::placeholders::_1, boost::placeholders::_2, boost::placeholders::_3)); - - m_rateLimiterStatisticsTimer = - std::make_shared(m_rateLimiterStatisticsPeriodMS, "ratelimiter_reporter"); - auto rateLimiterStatisticsTimer = m_rateLimiterStatisticsTimer; - auto _rateLimiterStatisticsPeriodMS = m_rateLimiterStatisticsPeriodMS; - m_rateLimiterStatisticsTimer->registerTimeoutHandler( - [rateLimiterStatisticsTimer, _rateLimiterStatisticsPeriodMS, _rateLimiterStatistics, - _rateLimiterManager]() { - auto io = _rateLimiterStatistics->inAndOutStat(_rateLimiterStatisticsPeriodMS); - GATEWAY_LOG(DEBUG) << LOG_DESC("\n [ratelimiter stat]") << LOG_DESC(io.first); - GATEWAY_LOG(DEBUG) << LOG_DESC("\n [ratelimiter stat]") << LOG_DESC(io.second); - _rateLimiterStatistics->flushStat(); - rateLimiterStatisticsTimer->restart(); - }); } ~Gateway() override { stop(); } @@ -193,19 +178,6 @@ class Gateway : public GatewayInterface, public std::enable_shared_from_thisunregisterNode(_groupID, _nodeID); } - // gateway traffic limiting policy impl - bool checkBWRateLimit(ratelimiter::RateLimiterManager::Ptr _rateLimiterManager, - const std::string& _endPoint, const std::string& _groupID, uint16_t _moduleID, - uint64_t _msgLength, SessionCallbackFunc _callback); - bool checkBWRateLimit( - SessionFace::Ptr _session, Message::Ptr _msg, SessionCallbackFunc _callback); - - uint32_t rateLimiterStatisticsPeriodMS() const { return m_rateLimiterStatisticsPeriodMS; } - void setRateLimiterStatisticsPeriodMS(uint32_t _rateLimiterStatisticsPeriodMS) - { - m_rateLimiterStatisticsPeriodMS = _rateLimiterStatisticsPeriodMS; - } - protected: // for UT Gateway() {} @@ -234,15 +206,8 @@ class Gateway : public GatewayInterface, public std::enable_shared_from_this m_rateLimiterStatisticsTimer; + ratelimiter::GateWayRateLimiter::Ptr m_gatewayRateLimiter; }; } // namespace gateway } // namespace bcos diff --git a/bcos-gateway/bcos-gateway/GatewayConfig.cpp b/bcos-gateway/bcos-gateway/GatewayConfig.cpp index f74bd3bca4..052ccf2357 100644 --- a/bcos-gateway/bcos-gateway/GatewayConfig.cpp +++ b/bcos-gateway/bcos-gateway/GatewayConfig.cpp @@ -362,6 +362,9 @@ void GatewayConfig::initRatelimitConfig(const boost::property_tree::ptree& _pt) [flow_control] ; rate limiter stat reporter interval, unit: ms ; stat_reporter_interval=60000 + ; + ; rate limiter stat reporter info level, default: 1 + ; stat_reporter_level=1 ; the module that does not limit bandwidth ; list of all modules: raft,pbft,amop,block_sync,txs_sync,light_node,cons_txs_sync @@ -395,6 +398,9 @@ void GatewayConfig::initRatelimitConfig(const boost::property_tree::ptree& _pt) // stat_reporter_interval=60000 int32_t statReporterInterval = _pt.get("flow_control.stat_reporter_interval", 60000); + // stat_reporter_level=1 + int32_t statReporterLevel = _pt.get("flow_control.stat_reporter_level", 1); + // modules_without_bw_limit=raft,pbft std::string strNoLimitModules = _pt.get("flow_control.modules_without_bw_limit", "raft,pbft,cons_txs_sync"); @@ -517,6 +523,7 @@ void GatewayConfig::initRatelimitConfig(const boost::property_tree::ptree& _pt) } m_rateLimiterConfig.statReporterInterval = statReporterInterval; + m_rateLimiterConfig.statReporterLevel = statReporterLevel; m_rateLimiterConfig.modulesWithNoBwLimit = moduleIDs; m_rateLimiterConfig.totalOutgoingBwLimit = totalOutgoingBwLimit; m_rateLimiterConfig.connOutgoingBwLimit = connOutgoingBwLimit; @@ -542,6 +549,7 @@ void GatewayConfig::initRatelimitConfig(const boost::property_tree::ptree& _pt) << LOG_KV("rateLimiterConfigEffect", m_rateLimiterConfig.hasRateLimiterConfigEffect()) << LOG_KV("statReporterInterval", statReporterInterval) + << LOG_KV("statReporterLevel", statReporterLevel) << LOG_KV("totalOutgoingBwLimit", totalOutgoingBwLimit) << LOG_KV("connOutgoingBwLimit", connOutgoingBwLimit) << LOG_KV("groupOutgoingBwLimit", groupOutgoingBwLimit) diff --git a/bcos-gateway/bcos-gateway/GatewayConfig.h b/bcos-gateway/bcos-gateway/GatewayConfig.h index 3e208ff148..e24418f2e4 100644 --- a/bcos-gateway/bcos-gateway/GatewayConfig.h +++ b/bcos-gateway/bcos-gateway/GatewayConfig.h @@ -43,11 +43,22 @@ class GatewayConfig std::string enNodeKey; }; + // config for redis + struct RedisConfig + { + std::string redisServerIP = "127.0.0.1"; + uint16_t redisServerPort = 6379; + int32_t redisTimeOut = -1; + int32_t redisPoolSize = 16; + }; + // config for rate limit struct RateLimiterConfig { - // report interval ms + // stat reporter interval, unit: ms int32_t statReporterInterval = 60000; + // stat reporter info level, default: 1 + uint32_t statReporterLevel = 1; // total outgoing bandwidth limit int64_t totalOutgoingBwLimit = -1; @@ -126,6 +137,7 @@ class GatewayConfig CertConfig certConfig() const { return m_certConfig; } SMCertConfig smCertConfig() const { return m_smCertConfig; } RateLimiterConfig rateLimiterConfig() const { return m_rateLimiterConfig; } + RedisConfig redisConfig() const { return m_redisConfig; } const std::set& connectedNodes() const { return m_connectedNodes; } @@ -149,6 +161,7 @@ class GatewayConfig SMCertConfig m_smCertConfig; RateLimiterConfig m_rateLimiterConfig; + RedisConfig m_redisConfig; std::string m_certPath; std::string m_nodePath; diff --git a/bcos-gateway/bcos-gateway/GatewayFactory.cpp b/bcos-gateway/bcos-gateway/GatewayFactory.cpp index 9c8f9c50dc..a255fff86f 100644 --- a/bcos-gateway/bcos-gateway/GatewayFactory.cpp +++ b/bcos-gateway/bcos-gateway/GatewayFactory.cpp @@ -4,9 +4,13 @@ */ #include "bcos-gateway/libratelimit/DistributedRateLimiter.h" +#include "bcos-gateway/libratelimit/GateWayRateLimiter.h" +#include "bcos-utilities/BoostLog.h" +#include "bcos-utilities/Common.h" #include #include #include +#include #include #include #include @@ -17,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -26,7 +31,11 @@ #include #include #include +#include +#include #include +#include +#include using namespace bcos::rpc; using namespace bcos; @@ -393,9 +402,11 @@ std::shared_ptr GatewayFactory::buildRateLimite rateLimiterManager->setModulesWithNoBwLimit(_rateLimiterConfig.modulesWithNoBwLimit); rateLimiterManager->setRateLimiterFactory(rateLimiterFactory); - + /* + // TODO: auto dsRateLimiter = std::make_shared(-1); std::ignore = dsRateLimiter; + */ return rateLimiterManager; } @@ -463,13 +474,25 @@ std::shared_ptr GatewayFactory::buildGateway(GatewayConfig::Ptr _config service->setMessageFactory(messageFactory); service->setKeyFactory(keyFactory); +// TODO: add the redis initialization switch +#if 0 + auto redisConfig = _config->redisConfig(); + + auto redis = initRedis(redisConfig.redisServerIP, redisConfig.redisServerPort, + redisConfig.redisPoolSize, redisConfig.redisTimeOut); +#endif + // init rate limit const auto& rateLimiterConfig = _config->rateLimiterConfig(); auto rateLimiterManager = buildRateLimiterManager(_config->rateLimiterConfig()); - auto rateLimiterStatistics = std::make_shared(); - auto rateLimiterStatisticsWeakPtr = - std::weak_ptr(rateLimiterStatistics); + auto rateLimiterStat = std::make_shared(); + auto rateLimiterStatWeakPtr = std::weak_ptr(rateLimiterStat); + + auto gatewayRateLimiter = + std::make_shared(rateLimiterManager, rateLimiterStat); + auto gatewayRateLimiterWeakptr = + std::weak_ptr(gatewayRateLimiter); // init GatewayNodeManager GatewayNodeManager::Ptr gatewayNodeManager; @@ -496,8 +519,8 @@ std::shared_ptr GatewayFactory::buildGateway(GatewayConfig::Ptr _config amop = buildAMOP(service, pubHex); } // init Gateway - auto gateway = std::make_shared(m_chainID, service, gatewayNodeManager, amop, - rateLimiterManager, rateLimiterStatistics, _gatewayServiceName); + auto gateway = std::make_shared( + m_chainID, service, gatewayNodeManager, amop, gatewayRateLimiter, _gatewayServiceName); auto weakptrGatewayNodeManager = std::weak_ptr(gatewayNodeManager); // register disconnect handler service->registerDisconnectHandler( @@ -519,28 +542,44 @@ std::shared_ptr GatewayFactory::buildGateway(GatewayConfig::Ptr _config nodeMgr->onRemoveNodeIDs(_unreachableNode); }); - auto gatewayWeakPtr = std::weak_ptr(gateway); - - service->setBeforeMessageHandler([gatewayWeakPtr](SessionFace::Ptr _session, + service->setBeforeMessageHandler([gatewayRateLimiterWeakptr](SessionFace::Ptr _session, Message::Ptr _msg, SessionCallbackFunc _callback) { - auto gateway = gatewayWeakPtr.lock(); - if (!gateway) + auto gatewayRateLimiter = gatewayRateLimiterWeakptr.lock(); + if (!gatewayRateLimiter) { return true; } + GatewayMessageExtAttributes::Ptr msgExtAttributes = nullptr; + if (_msg->extAttributes()) + { + msgExtAttributes = + std::dynamic_pointer_cast(_msg->extAttributes()); + } + + std::string groupID = msgExtAttributes ? msgExtAttributes->groupID() : std::string(); + uint16_t moduleID = msgExtAttributes ? msgExtAttributes->moduleID() : 0; + std::string endpoint = _session->nodeIPEndpoint().address(); + uint64_t msgLength = _msg->length(); + // bandwidth limit check - return gateway->checkBWRateLimit(_session, _msg, _callback); + auto r = gatewayRateLimiter->checkOutGoing(endpoint, groupID, moduleID, msgLength); + if (!r.first && _callback) + { + _callback(NetworkException(BandwidthOverFlow, r.second), Message::Ptr()); + } + + return r.first; }); service->setOnMessageHandler( - [rateLimiterStatisticsWeakPtr](SessionFace::Ptr _session, Message::Ptr _message) { - auto rateLimiterStatistics = rateLimiterStatisticsWeakPtr.lock(); - if (rateLimiterStatistics) + [rateLimiterStatWeakPtr](SessionFace::Ptr _session, Message::Ptr _message) { + auto rateLimiterStat = rateLimiterStatWeakPtr.lock(); + if (rateLimiterStat) { - auto endPoint = _session->nodeIPEndpoint().address(); + auto endpoint = _session->nodeIPEndpoint().address(); auto msgLength = _message->length(); - rateLimiterStatistics->updateInComing(endPoint, msgLength); + rateLimiterStat->updateInComing(endpoint, msgLength); } }); @@ -607,6 +646,97 @@ void GatewayFactory::initFailOver( GATEWAY_FACTORY_LOG(INFO) << LOG_DESC("initFailOver for gateway success"); } +/** + * @brief init redis + * + * @param _redisIP + * @param _redisPort + * @param _redisPoolSize + * @param _redisTimeOut + * @return std::shared_ptr + */ +std::shared_ptr GatewayFactory::initRedis(const std::string& _redisIP, + uint16_t _redisPort, uint32_t _redisPoolSize, uint32_t _redisTimeOut) +{ + GATEWAY_FACTORY_LOG(INFO) << LOG_BADGE("initRedis") << LOG_KV("redisIP", _redisIP) + << LOG_KV("redisPort", _redisPort) + << LOG_KV("redisPoolSize", _redisPoolSize) + << LOG_KV("redisTimeOut(ms)", _redisTimeOut); + + sw::redis::ConnectionOptions connection_options; + connection_options.host = _redisIP; // Required. + connection_options.port = _redisPort; // Optional. + // connection_options.password = "auth"; // Optional. No password by default. + // connection_options.db = 1; // Optional. Use the 0th database by default. + + // Optional. Timeout before we successfully send request to or receive response from redis. + // By default, the timeout is 0ms, i.e. never timeout and block until we send or receive + // successfully. NOTE: if any command is timed out, we throw a TimeoutError exception. + connection_options.socket_timeout = std::chrono::milliseconds(_redisTimeOut); + // connection_options.connect_timeout = std::chrono::milliseconds(3000); + connection_options.keep_alive = true; + + sw::redis::ConnectionPoolOptions pool_options; + // Pool size, i.e. max number of connections. + pool_options.size = _redisPoolSize; + + std::shared_ptr redis = nullptr; + try + { + // Connect to Redis server with a connection pool. + redis = std::make_shared(connection_options, pool_options); + + // test whether redis functions properly + // 1. set key + // 2. get key + // 3. del key + + std::string key = "Gateway -> " + std::to_string(utcTime()); + std::string value = "Hello, FISCO-BCOS 3.0."; + + bool setR = redis->set(key, value); + if (setR) + { + GATEWAY_FACTORY_LOG(INFO) << LOG_BADGE("initRedis") << LOG_DESC("set ok"); + + auto getR = redis->get(key); + if (getR) + { + GATEWAY_FACTORY_LOG(INFO) << LOG_BADGE("initRedis") << LOG_DESC("get ok") + << LOG_KV("key", key) << LOG_KV("value", getR.value()); + } + else + { + GATEWAY_FACTORY_LOG(WARNING) + << LOG_BADGE("initRedis") << LOG_DESC("get failed, why???"); + } + + redis->del(key); + } + else + { + GATEWAY_FACTORY_LOG(WARNING) + << LOG_BADGE("initRedis") << LOG_DESC("set failed, why???"); + } + } + catch (std::exception& e) + { + // Note: redis++ exception handling + // https://github.com/sewenew/redis-plus-plus#exception + std::exception_ptr ePtr = std::make_exception_ptr(e); + + GATEWAY_FACTORY_LOG(ERROR) + << LOG_BADGE("initRedis") << LOG_DESC("initialize redis exception") + << LOG_KV("error", e.what()); + + std::throw_with_nested(e); + } + + GATEWAY_FACTORY_LOG(INFO) << LOG_BADGE("initRedis") << LOG_DESC("initialize redis completely"); + + return redis; +} + bcos::amop::AMOPImpl::Ptr GatewayFactory::buildAMOP( P2PInterface::Ptr _network, P2pID const& _p2pNodeID) { diff --git a/bcos-gateway/bcos-gateway/GatewayFactory.h b/bcos-gateway/bcos-gateway/GatewayFactory.h index 26e1fa6f55..ad1b225d22 100644 --- a/bcos-gateway/bcos-gateway/GatewayFactory.h +++ b/bcos-gateway/bcos-gateway/GatewayFactory.h @@ -12,6 +12,7 @@ #include #include #include +#include #include namespace bcos @@ -58,22 +59,43 @@ class GatewayFactory const GatewayConfig::RateLimiterConfig& _rateLimiterConfig); /** - * @brief: construct Gateway - * @param _configPath: config.ini paths - * @return void + * @brief construct Gateway + * + * @param _configPath + * @param _airVersion + * @param _entryPoint + * @param _gatewayServiceName + * @return Gateway::Ptr */ Gateway::Ptr buildGateway(const std::string& _configPath, bool _airVersion, bcos::election::LeaderEntryPointInterface::Ptr _entryPoint, std::string const& _gatewayServiceName); + /** - * @brief: construct Gateway - * @param _config: config parameter object - * @return void + * @brief construct Gateway + * + * @param _config + * @param _airVersion + * @param _entryPoint + * @param _gatewayServiceName + * @return Gateway::Ptr */ Gateway::Ptr buildGateway(GatewayConfig::Ptr _config, bool _airVersion, bcos::election::LeaderEntryPointInterface::Ptr _entryPoint, std::string const& _gatewayServiceName); + /** + * @brief init redis + * + * @param _redisIP + * @param _redisPort + * @param _redisPoolSize + * @param _redisTimeOut + * @return std::shared_ptr + */ + std::shared_ptr initRedis(const std::string& _redisIP, uint16_t _redisPort, + uint32_t _redisPoolSize, uint32_t _redisTimeOut = 3000); + protected: virtual bcos::amop::AMOPImpl::Ptr buildAMOP( bcos::gateway::P2PInterface::Ptr _network, bcos::gateway::P2pID const& _p2pNodeID); diff --git a/bcos-gateway/bcos-gateway/libratelimit/DistributedRateLimiter.cpp b/bcos-gateway/bcos-gateway/libratelimit/DistributedRateLimiter.cpp index f78fc22e56..e65859f963 100644 --- a/bcos-gateway/bcos-gateway/libratelimit/DistributedRateLimiter.cpp +++ b/bcos-gateway/bcos-gateway/libratelimit/DistributedRateLimiter.cpp @@ -30,65 +30,6 @@ using namespace bcos; using namespace bcos::gateway; using namespace bcos::gateway::ratelimiter; -DistributedRateLimiter::DistributedRateLimiter(int64_t _maxQPS) -{ - std::ignore = _maxQPS; -} - -/* -std::shared_ptr DistributedRateLimiter::initRedis( - const std::string& _redisIP, uint16_t& _redisPort) -{ - - std::string redisIP = "127.0.0.1"; - uint16_t redisPort = 6379; - m_redis = initRedis(redisIP, redisPort); - - RATELIMIT_LOG(INFO) << LOG_BADGE("[NEWOBJ][DistributedRateLimiter]") - << LOG_KV("redisIP", redisIP) << LOG_KV("redisPort", redisPort); - - - sw::redis::ConnectionOptions connection_options; - connection_options.host = _redisIP; // Required. - connection_options.port = _redisPort; // Optional. - // connection_options.password = "auth"; // Optional. No password by default. - // connection_options.db = 1; // Optional. Use the 0th database by default. - - // Optional. Timeout before we successfully send request to or receive response from redis. - // By default, the timeout is 0ms, i.e. never timeout and block until we send or receive - // successfully. NOTE: if any command is timed out, we throw a TimeoutError exception. - connection_options.socket_timeout = std::chrono::milliseconds(2000); - connection_options.connect_timeout = std::chrono::milliseconds(2000); - - sw::redis::ConnectionPoolOptions pool_options; - // Pool size, i.e. max number of connections. - pool_options.size = 100; - - // Connect to Redis server with a connection pool. - auto redis = std::make_shared(connection_options, pool_options); - - - std::string key = "Redis Key"; - std::string value = "Hello, Redis."; - redis->set(key, value); - - auto r = redis->get(key); - if (r) - { - RATELIMIT_LOG(INFO) << LOG_BADGE("[NEWOBJ][DistributedRateLimiter]") - << LOG_DESC("redis get") << LOG_KV("key", key) - << LOG_KV("value", r.value()); - } - else - { - RATELIMIT_LOG(INFO) << LOG_BADGE("[NEWOBJ][DistributedRateLimiter]") - << LOG_DESC("redis get failed"); - } - - return redis; -} -*/ - /** * @brief acquire permits * diff --git a/bcos-gateway/bcos-gateway/libratelimit/DistributedRateLimiter.h b/bcos-gateway/bcos-gateway/libratelimit/DistributedRateLimiter.h index 9d5bc6e85a..d35bd4a25e 100644 --- a/bcos-gateway/bcos-gateway/libratelimit/DistributedRateLimiter.h +++ b/bcos-gateway/bcos-gateway/libratelimit/DistributedRateLimiter.h @@ -22,6 +22,7 @@ #include #include +#include namespace bcos { @@ -30,11 +31,6 @@ namespace gateway namespace ratelimiter { -/** - * @brief - * Distributed limited bandwidth - */ - class DistributedRateLimiter : public RateLimiterInterface { public: @@ -43,7 +39,10 @@ class DistributedRateLimiter : public RateLimiterInterface using UniquePtr = std::unique_ptr; public: - DistributedRateLimiter(int64_t _maxQPS); + DistributedRateLimiter(const std::string& _rateLimitKey, int64_t _tokenRate, + std::shared_ptr _redis) + : m_rateLimitKey(_rateLimitKey), m_tokenRate(_tokenRate), m_redis(_redis) + {} DistributedRateLimiter(DistributedRateLimiter&&) = delete; DistributedRateLimiter(const DistributedRateLimiter&) = delete; @@ -78,6 +77,19 @@ class DistributedRateLimiter : public RateLimiterInterface * @return */ void rollback(int64_t _requiredPermits) override; + +public: + std::string rateLimitKey() const { return m_rateLimitKey; } + int64_t tokenRate() const { return m_tokenRate; } + std::shared_ptr redis() const { return m_redis; } + +private: + // key of distributed limit + std::string m_rateLimitKey; + // rate + int64_t m_tokenRate; + // redis instance + std::shared_ptr m_redis; }; } // namespace ratelimiter diff --git a/bcos-gateway/bcos-gateway/libratelimit/GateWayRateLimiter.cpp b/bcos-gateway/bcos-gateway/libratelimit/GateWayRateLimiter.cpp new file mode 100644 index 0000000000..554ec96bfe --- /dev/null +++ b/bcos-gateway/bcos-gateway/libratelimit/GateWayRateLimiter.cpp @@ -0,0 +1,152 @@ +/* + * Copyright (C) 2021 FISCO BCOS. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file GateWayRateLimiter.cpp + * @author: octopus + * @date 2022-09-30 + */ + +#include + +using namespace bcos::gateway::ratelimiter; + +std::pair GateWayRateLimiter::checkOutGoing(const std::string& _endpoint, + const std::string& _groupID, uint16_t _moduleID, uint64_t _msgLength) +{ + // endpoint of the p2p connection + const std::string& endpoint = _endpoint; + // group of the message, empty string means the message is p2p's own message + const std::string& groupID = _groupID; + // moduleID of the message, zero means the message is p2p's own message + uint16_t moduleID = _moduleID; + // the length of the message + uint64_t msgLength = _msgLength; + + std::string errorMsg; + do + { + // total outgoing bandwidth + ratelimiter::RateLimiterInterface::Ptr totalOutGoingBWLimit = + m_rateLimiterManager->getRateLimiter( + ratelimiter::RateLimiterManager::TOTAL_OUTGOING_KEY); + + // connection outgoing bandwidth + ratelimiter::RateLimiterInterface::Ptr connOutGoingBWLimit = + m_rateLimiterManager->getConnRateLimiter(endpoint); + + // group outgoing bandwidth + ratelimiter::RateLimiterInterface::Ptr groupOutGoingBWLimit = nullptr; + if (!groupID.empty()) + { + groupOutGoingBWLimit = m_rateLimiterManager->getGroupRateLimiter(groupID); + } + + auto modulesWithNoBwLimit = m_rateLimiterManager->modulesWithNoBwLimit(); + + // if moduleID is zero, the P2P network itself's message, the ratelimiter does not limit + // P2P own's messages + if (moduleID == 0) + { + if (totalOutGoingBWLimit) + { + totalOutGoingBWLimit->tryAcquire(msgLength); + } + + if (connOutGoingBWLimit) + { + connOutGoingBWLimit->tryAcquire(msgLength); + } + } + // if moduleID is not zero, the message comes from the front + // There are two scenarios: + // 1. ulimit module message rate or + // 2. limit module message rate + else if (modulesWithNoBwLimit.count(moduleID)) + { // case 1: ulimit module message rate or, just for statistic + + if (totalOutGoingBWLimit) + { + totalOutGoingBWLimit->tryAcquire(msgLength); + } + + if (connOutGoingBWLimit) + { + connOutGoingBWLimit->tryAcquire(msgLength); + } + + if (groupOutGoingBWLimit) + { + groupOutGoingBWLimit->tryAcquire(msgLength); + } + } + else + { // case 2: limit module message rate + + if (totalOutGoingBWLimit && !totalOutGoingBWLimit->tryAcquire(msgLength)) + { + // total outgoing bandwidth overflow + errorMsg = "the network total outgoing bandwidth overflow"; + break; + } + + if (connOutGoingBWLimit && !connOutGoingBWLimit->tryAcquire(msgLength)) + { + // connection outgoing bandwidth overflow + errorMsg = + "the network connection outgoing bandwidth overflow, endpoint: " + endpoint; + if (totalOutGoingBWLimit) + { + totalOutGoingBWLimit->rollback(msgLength); + } + + break; + } + + if (groupOutGoingBWLimit && !groupOutGoingBWLimit->tryAcquire(msgLength)) + { + // group outgoing bandwidth overflow + errorMsg = "the group outgoing bandwidth overflow, groupID: " + groupID; + if (totalOutGoingBWLimit) + { + totalOutGoingBWLimit->rollback(msgLength); + } + + if (connOutGoingBWLimit) + { + connOutGoingBWLimit->rollback(msgLength); + } + + break; + } + } + + m_rateLimiterStat->updateOutGoing(endpoint, msgLength, true); + m_rateLimiterStat->updateOutGoing(groupID, moduleID, msgLength, true); + + return std::pair(true, ""); + } while (0); + + m_rateLimiterStat->updateOutGoing(endpoint, msgLength, false); + m_rateLimiterStat->updateOutGoing(groupID, moduleID, msgLength, false); + + return std::pair(false, errorMsg); +} + +std::pair GateWayRateLimiter::checkInComing( + const std::string& _groupID, uint16_t _moduleID, uint64_t _msgLength) +{ + m_rateLimiterStat->updateInComing(_groupID, _moduleID, _msgLength); + return std::pair(true, ""); +} diff --git a/bcos-gateway/bcos-gateway/libratelimit/GateWayRateLimiter.h b/bcos-gateway/bcos-gateway/libratelimit/GateWayRateLimiter.h new file mode 100644 index 0000000000..c789805d92 --- /dev/null +++ b/bcos-gateway/bcos-gateway/libratelimit/GateWayRateLimiter.h @@ -0,0 +1,126 @@ +/* + * Copyright (C) 2021 FISCO BCOS. + * SPDX-License-Identifier: Apache-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @file GateWayRateLimiter.h + * @author: octopus + * @date 2022-09-30 + */ + +#pragma once + +#include "bcos-utilities/BoostLog.h" +#include "bcos-utilities/Timer.h" +#include +#include +#include +#include + +namespace bcos +{ +namespace gateway +{ +namespace ratelimiter +{ + +class GateWayRateLimiter +{ +public: + using Ptr = std::shared_ptr; + using ConstPtr = std::shared_ptr; + using UniquePtr = std::unique_ptr; + +public: + GateWayRateLimiter(ratelimiter::RateLimiterManager::Ptr _rateLimiterManager, + ratelimiter::RateLimiterStat::Ptr _rateLimiterStat) + : m_rateLimiterManager(_rateLimiterManager), m_rateLimiterStat(_rateLimiterStat) + {} + + GateWayRateLimiter(GateWayRateLimiter&&) = delete; + GateWayRateLimiter(const GateWayRateLimiter&) = delete; + GateWayRateLimiter& operator=(const GateWayRateLimiter&) = delete; + GateWayRateLimiter& operator=(GateWayRateLimiter&&) = delete; + + ~GateWayRateLimiter() { stop(); } + +public: + void start() + { + if (m_running) + { + RATELIMIT_LOG(INFO) << LOG_DESC("GateWayRateLimiter is running"); + return; + } + m_running = true; + + auto statReporterInterval = m_rateLimiterManager->rateLimiterConfig().statReporterInterval; + auto statReporterLevel = m_rateLimiterManager->rateLimiterConfig().statReporterLevel; + if (statReporterLevel > 0) + { + m_rateLimiterStatTimer = + std::make_shared(statReporterInterval, "ratelimiter_reporter"); + auto rateLimiterStatTimer = m_rateLimiterStatTimer; + auto rateLimiterStat = m_rateLimiterStat; + + m_rateLimiterStatTimer->registerTimeoutHandler( + [rateLimiterStatTimer, statReporterInterval, rateLimiterStat]() { + auto io = rateLimiterStat->inAndOutStat(statReporterInterval); + GATEWAY_LOG(INFO) << LOG_DESC("\n [ratelimiter stat]") << LOG_DESC(io.first); + GATEWAY_LOG(INFO) << LOG_DESC("\n [ratelimiter stat]") << LOG_DESC(io.second); + rateLimiterStat->flushStat(); + rateLimiterStatTimer->restart(); + }); + } + + RATELIMIT_LOG(INFO) << LOG_DESC("GateWayRateLimiter start ok") + << LOG_KV("statReporterInterval", statReporterInterval) + << LOG_KV("statReporterLevel", statReporterLevel); + } + + void stop() + { + if (!m_running) + { + RATELIMIT_LOG(INFO) << LOG_DESC("GateWayRateLimiter has been stopped"); + return; + } + + m_running = false; + if (m_rateLimiterStatTimer) + { + m_rateLimiterStatTimer->stop(); + } + + RATELIMIT_LOG(INFO) << LOG_DESC("GateWayRateLimiter stop end"); + } + +public: + std::pair checkOutGoing(const std::string& _endpoint, + const std::string& _groupID, uint16_t _moduleID, uint64_t _msgLength); + + std::pair checkInComing( + const std::string& _groupID, uint16_t _moduleID, uint64_t _msgLength); + +private: + bool m_running = false; + + ratelimiter::RateLimiterManager::Ptr m_rateLimiterManager; + ratelimiter::RateLimiterStat::Ptr m_rateLimiterStat; + // the timer that periodically report the stat + std::shared_ptr m_rateLimiterStatTimer; +}; + +} // namespace ratelimiter +} // namespace gateway +} // namespace bcos \ No newline at end of file diff --git a/bcos-gateway/bcos-gateway/libratelimit/ModuleWhiteList.h b/bcos-gateway/bcos-gateway/libratelimit/ModuleWhiteList.h index 4bbe491ef8..535484f48b 100644 --- a/bcos-gateway/bcos-gateway/libratelimit/ModuleWhiteList.h +++ b/bcos-gateway/bcos-gateway/libratelimit/ModuleWhiteList.h @@ -41,7 +41,16 @@ enum ModuleID Raft = 1001, BlockSync = 2000, TxsSync = 2001, + ConsTxsSync = 2002, AMOP = 3000, + + LIGHTNODE_GETBLOCK = 4000, + LIGHTNODE_GETTRANSACTIONS, + LIGHTNODE_GETRECEIPTS, + LIGHTNODE_GETSTATUS, + LIGHTNODE_SENDTRANSACTION, + LIGHTNODE_CALL, + LIGHTNODE_END = 4999 }; */ diff --git a/bcos-gateway/bcos-gateway/libratelimit/RateLimiterStatistics.cpp b/bcos-gateway/bcos-gateway/libratelimit/RateLimiterStatistics.cpp index 967d24fe38..c5bc2f6916 100644 --- a/bcos-gateway/bcos-gateway/libratelimit/RateLimiterStatistics.cpp +++ b/bcos-gateway/bcos-gateway/libratelimit/RateLimiterStatistics.cpp @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. * - * @file RateLimiterStatistics.cpp + * @file RateLimiterStat.cpp * @author: octopus * @date 2022-06-30 */ @@ -32,16 +32,16 @@ using namespace bcos; using namespace bcos::gateway; using namespace bcos::gateway::ratelimiter; -const std::string RateLimiterStatistics::TOTAL_INCOMING = " total "; -const std::string RateLimiterStatistics::TOTAL_OUTGOING = " total "; +const std::string RateLimiterStat::TOTAL_INCOMING = " total "; +const std::string RateLimiterStat::TOTAL_OUTGOING = " total "; -double Stat::calcAvgRate(int64_t _data, int64_t _periodMS) +double Stat::calcAvgRate(uint64_t _data, uint32_t _periodMS) { auto avgRate = (double)_data * 8 * 1000 / 1024 / 1024 / _periodMS; return avgRate; } -std::optional Stat::toString(const std::string& _prefix, int64_t _periodMS) +std::optional Stat::toString(const std::string& _prefix, uint32_t _periodMS) { if (lastDataSize.load() == 0) { @@ -52,35 +52,34 @@ std::optional Stat::toString(const std::string& _prefix, int64_t _p std::stringstream ss; - ss << " \t[" << _prefix << "] " << " \t" << " |total data: " << totalDataSize.load() << " |last data: " << lastDataSize.load() - << " |total count: " << totalCount.load() << " |last count: " << lastCount.load() - << " |total failed: " << totalFailedCount.load() - << " |last failed: " << lastFailedCount.load() << " |avg rate(Mb/s): "; + << " |total times: " << totalTimes.load() << " |last times: " << lastTimes.load() + << " |total failed times: " << totalFailedTimes.load() + << " |last failed times: " << lastFailedTimes.load() << " |avg rate(Mb/s): "; ss << std::fixed << std::setprecision(2) << avgRate; return ss.str(); } -std::string RateLimiterStatistics::toGroupKey(const std::string& _groupID) +std::string RateLimiterStat::toGroupKey(const std::string& _groupID) { return " group : " + _groupID; } -std::string RateLimiterStatistics::toModuleKey(uint16_t _moduleID) +std::string RateLimiterStat::toModuleKey(uint16_t _moduleID) { return " module : " + protocol::moduleIDToString((protocol::ModuleID)_moduleID); } -std::string RateLimiterStatistics::toEndPointKey(const std::string& _ep) +std::string RateLimiterStat::toEndPointKey(const std::string& _ep) { return " endpoint: " + _ep; } -void RateLimiterStatistics::updateInComing(const std::string& _endPoint, uint64_t _dataSize) +void RateLimiterStat::updateInComing(const std::string& _endPoint, uint64_t _dataSize) { std::string epKey = toEndPointKey(_endPoint); std::string totalKey = TOTAL_OUTGOING; @@ -100,8 +99,7 @@ void RateLimiterStatistics::updateInComing(const std::string& _endPoint, uint64_ epInStat.update(_dataSize); } -void RateLimiterStatistics::updateOutGoing( - const std::string& _endPoint, uint64_t _dataSize, bool suc) +void RateLimiterStat::updateOutGoing(const std::string& _endPoint, uint64_t _dataSize, bool suc) { std::string epKey = toEndPointKey(_endPoint); std::string totalKey = TOTAL_OUTGOING; @@ -126,7 +124,7 @@ void RateLimiterStatistics::updateOutGoing( } } -void RateLimiterStatistics::updateInComing( +void RateLimiterStat::updateInComing( const std::string& _groupID, uint16_t _moduleID, uint64_t _dataSize) { std::ignore = _moduleID; @@ -156,7 +154,7 @@ void RateLimiterStatistics::updateInComing( */ } -void RateLimiterStatistics::updateOutGoing( +void RateLimiterStat::updateOutGoing( const std::string& _groupID, uint16_t _moduleID, uint64_t _dataSize, bool suc) { std::ignore = _moduleID; @@ -199,7 +197,7 @@ void RateLimiterStatistics::updateOutGoing( */ } -void RateLimiterStatistics::flushStat() +void RateLimiterStat::flushStat() { { std::lock_guard l(m_inLock); @@ -218,7 +216,7 @@ void RateLimiterStatistics::flushStat() } } -std::pair RateLimiterStatistics::inAndOutStat(uint32_t _intervalMS) +std::pair RateLimiterStat::inAndOutStat(uint32_t _intervalMS) { std::string in = " :"; { diff --git a/bcos-gateway/bcos-gateway/libratelimit/RateLimiterStatistics.h b/bcos-gateway/bcos-gateway/libratelimit/RateLimiterStatistics.h index 3557767d9c..586f7ccf39 100644 --- a/bcos-gateway/bcos-gateway/libratelimit/RateLimiterStatistics.h +++ b/bcos-gateway/bcos-gateway/libratelimit/RateLimiterStatistics.h @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. * - * @file RateLimiterStatistics.h + * @file RateLimiterStat.h * @author: octopus * @date 2022-06-30 */ @@ -40,24 +40,24 @@ struct Stat std::atomic totalDataSize; std::atomic lastDataSize; - std::atomic totalCount; - std::atomic lastCount; + std::atomic totalTimes; + std::atomic lastTimes; - std::atomic totalFailedCount; - std::atomic lastFailedCount; + std::atomic totalFailedTimes; + std::atomic lastFailedTimes; public: void resetLast() { - lastCount = 0; + lastTimes = 0; lastDataSize = 0; - lastFailedCount = 0; + lastFailedTimes = 0; } void update(uint64_t _dataSize) { - totalCount++; - lastCount++; + totalTimes++; + lastTimes++; totalDataSize += _dataSize; lastDataSize += _dataSize; @@ -65,24 +65,24 @@ struct Stat void updateFailed() { - totalFailedCount++; - lastFailedCount++; + totalFailedTimes++; + lastFailedTimes++; } - double calcAvgRate(int64_t _data, int64_t _periodMS); + double calcAvgRate(uint64_t _data, uint32_t _periodMS); - std::optional toString(const std::string& _prefix, int64_t _periodMS); + std::optional toString(const std::string& _prefix, uint32_t _periodMS); }; -class RateLimiterStatistics +class RateLimiterStat { public: const static std::string TOTAL_INCOMING; const static std::string TOTAL_OUTGOING; public: - using Ptr = std::shared_ptr; - using ConstPtr = std::shared_ptr; + using Ptr = std::shared_ptr; + using ConstPtr = std::shared_ptr; public: void updateInComing(const std::string& _endPoint, uint64_t _dataSize);