Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 10 additions & 159 deletions bcos-gateway/bcos-gateway/Gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ using namespace bcos::crypto;

void Gateway::start()
{
if (m_gatewayRateLimiter)
{
m_gatewayRateLimiter->start();
}
if (m_p2pInterface)
{
m_p2pInterface->start();
Expand All @@ -50,10 +54,6 @@ void Gateway::start()
{
m_gatewayNodeManager->start();
}
if (m_rateLimiterStatisticsTimer)
{
m_rateLimiterStatisticsTimer->start();
}

GATEWAY_LOG(INFO) << LOG_DESC("start end.");

Expand All @@ -62,10 +62,6 @@ void Gateway::start()

void Gateway::stop()
{
if (m_rateLimiterStatisticsTimer)
{
m_rateLimiterStatisticsTimer->stop();
}
// erase the registered handler
if (m_p2pInterface)
{
Expand All @@ -81,6 +77,10 @@ void Gateway::stop()
{
m_gatewayNodeManager->stop();
}
if (m_gatewayRateLimiter)
{
m_gatewayRateLimiter->stop();
}
GATEWAY_LOG(INFO) << LOG_DESC("stop end.");
return;
}
Expand Down Expand Up @@ -437,155 +437,6 @@ bool Gateway::checkGroupInfo(bcos::group::GroupInfo::Ptr _groupInfo)
return true;
}

bool Gateway::checkBWRateLimit(ratelimiter::RateLimiterManager::Ptr _rateLimiterManager,
const std::string& _endPoint, const std::string& _groupID, uint16_t _moduleID,
uint64_t _msgLength, SessionCallbackFunc _callback)
{
// endpoint of the p2p connection
const std::string& endPoint = _endPoint;
// group of the message, empty string means the message is p2p's own message
const std::string& groupID = _groupID;
// moduleID of the message, zero means the message is p2p's own message
uint16_t moduleID = _moduleID;
// the length of the message
uint64_t msgLength = _msgLength;

std::string errorMsg;
do
{
// total outgoing bandwidth
ratelimiter::RateLimiterInterface::Ptr totalOutGoingBWLimit =
_rateLimiterManager->getRateLimiter(
ratelimiter::RateLimiterManager::TOTAL_OUTGOING_KEY);

// connection outgoing bandwidth
ratelimiter::RateLimiterInterface::Ptr connOutGoingBWLimit =
_rateLimiterManager->getConnRateLimiter(endPoint);

// group outgoing bandwidth
ratelimiter::RateLimiterInterface::Ptr groupOutGoingBWLimit = nullptr;
if (!groupID.empty())
{
groupOutGoingBWLimit = _rateLimiterManager->getGroupRateLimiter(groupID);
}

auto modulesWithNoBwLimit = _rateLimiterManager->modulesWithNoBwLimit();

// if moduleID is zero, the P2P network itself's message, the ratelimiter does not limit
// P2P own's messages
if (moduleID == 0)
{
if (totalOutGoingBWLimit)
{
totalOutGoingBWLimit->tryAcquire(msgLength);
}

if (connOutGoingBWLimit)
{
connOutGoingBWLimit->tryAcquire(msgLength);
}
}
// if moduleID is not zero, the message comes from the front
// There are two scenarios:
// 1. ulimit module message rate or
// 2. limit module message rate
else if (modulesWithNoBwLimit.count(moduleID))
{ // case 1: ulimit module message rate or, just for statistic

if (totalOutGoingBWLimit)
{
totalOutGoingBWLimit->tryAcquire(msgLength);
}

if (connOutGoingBWLimit)
{
connOutGoingBWLimit->tryAcquire(msgLength);
}

if (groupOutGoingBWLimit)
{
groupOutGoingBWLimit->tryAcquire(msgLength);
}
}
else
{ // case 2: limit module message rate

if (totalOutGoingBWLimit && !totalOutGoingBWLimit->tryAcquire(msgLength))
{
// total outgoing bandwidth overflow
errorMsg = "the network total outgoing bandwidth overflow";
break;
}

if (connOutGoingBWLimit && !connOutGoingBWLimit->tryAcquire(msgLength))
{
// connection outgoing bandwidth overflow
errorMsg =
"the network connection outgoing bandwidth overflow, endpoint: " + endPoint;
if (totalOutGoingBWLimit)
{
totalOutGoingBWLimit->rollback(msgLength);
}

break;
}

if (groupOutGoingBWLimit && !groupOutGoingBWLimit->tryAcquire(msgLength))
{
// group outgoing bandwidth overflow
errorMsg = "the group outgoing bandwidth overflow, groupID: " + groupID;
if (totalOutGoingBWLimit)
{
totalOutGoingBWLimit->rollback(msgLength);
}

if (connOutGoingBWLimit)
{
connOutGoingBWLimit->rollback(msgLength);
}

break;
}
}

m_rateLimiterStatistics->updateOutGoing(endPoint, msgLength, true);
m_rateLimiterStatistics->updateOutGoing(groupID, moduleID, msgLength, true);

return true;
} while (0);

m_rateLimiterStatistics->updateOutGoing(endPoint, msgLength, false);
m_rateLimiterStatistics->updateOutGoing(groupID, moduleID, msgLength, false);

// TODO: use thread pool
if (_callback)
{
_callback(NetworkException(BandwidthOverFlow, errorMsg), Message::Ptr());
}

return false;
}

// gateway bw check
bool Gateway::checkBWRateLimit(
SessionFace::Ptr _session, Message::Ptr _msg, SessionCallbackFunc _callback)
{
GatewayMessageExtAttributes::Ptr msgExtAttributes = nullptr;
if (_msg->extAttributes())
{
msgExtAttributes =
std::dynamic_pointer_cast<GatewayMessageExtAttributes>(_msg->extAttributes());
}

std::string groupID = msgExtAttributes ? msgExtAttributes->groupID() : std::string();
uint16_t moduleID = msgExtAttributes ? msgExtAttributes->moduleID() : 0;
std::string endPoint = _session->nodeIPEndpoint().address();
uint64_t msgLength = _msg->length();

return checkBWRateLimit(
m_rateLimiterManager, endPoint, groupID, moduleID, msgLength, _callback);
}

void Gateway::asyncNotifyGroupInfo(
bcos::group::GroupInfo::Ptr _groupInfo, std::function<void(Error::Ptr&&)> _callback)
{
Expand Down Expand Up @@ -641,7 +492,7 @@ void Gateway::onReceiveP2PMessage(
}
*/

m_rateLimiterStatistics->updateInComing(groupID, moduleID, _msg->length());
m_gatewayRateLimiter->checkInComing(groupID, moduleID, _msg->length());

auto srcNodeID = options->srcNodeID();
const auto& dstNodeIDs = options->dstNodeIDs();
Expand Down Expand Up @@ -689,7 +540,7 @@ void Gateway::onReceiveBroadcastMessage(
// moduleID
uint16_t moduleID = options->moduleID();

m_rateLimiterStatistics->updateInComing(groupID, moduleID, _msg->length());
m_gatewayRateLimiter->checkInComing(groupID, moduleID, _msg->length());

/*
// TODO: if outgoing bandwidth exceeds the upper limit
Expand Down
43 changes: 4 additions & 39 deletions bcos-gateway/bcos-gateway/Gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#pragma once

#include "bcos-gateway/libratelimit/GateWayRateLimiter.h"
#include <bcos-framework/front/FrontServiceInterface.h>
#include <bcos-framework/gateway/GatewayInterface.h>
#include <bcos-gateway/Common.h>
Expand All @@ -40,16 +41,14 @@ class Gateway : public GatewayInterface, public std::enable_shared_from_this<Gat
using Ptr = std::shared_ptr<Gateway>;
Gateway(std::string const& _chainID, P2PInterface::Ptr _p2pInterface,
GatewayNodeManager::Ptr _gatewayNodeManager, bcos::amop::AMOPImpl::Ptr _amop,
ratelimiter::RateLimiterManager::Ptr _rateLimiterManager,
ratelimiter::RateLimiterStatistics::Ptr _rateLimiterStatistics,
ratelimiter::GateWayRateLimiter::Ptr _gatewayRateLimiter,
std::string _gatewayServiceName = "localGateway")
: m_gatewayServiceName(_gatewayServiceName),
m_chainID(_chainID),
m_p2pInterface(_p2pInterface),
m_gatewayNodeManager(_gatewayNodeManager),
m_amop(_amop),
m_rateLimiterManager(_rateLimiterManager),
m_rateLimiterStatistics(_rateLimiterStatistics)
m_gatewayRateLimiter(_gatewayRateLimiter)
{
m_p2pInterface->registerHandlerByMsgType(GatewayMessageType::PeerToPeerMessage,
boost::bind(&Gateway::onReceiveP2PMessage, this, boost::placeholders::_1,
Expand All @@ -58,20 +57,6 @@ class Gateway : public GatewayInterface, public std::enable_shared_from_this<Gat
m_p2pInterface->registerHandlerByMsgType(GatewayMessageType::BroadcastMessage,
boost::bind(&Gateway::onReceiveBroadcastMessage, this, boost::placeholders::_1,
boost::placeholders::_2, boost::placeholders::_3));

m_rateLimiterStatisticsTimer =
std::make_shared<Timer>(m_rateLimiterStatisticsPeriodMS, "ratelimiter_reporter");
auto rateLimiterStatisticsTimer = m_rateLimiterStatisticsTimer;
auto _rateLimiterStatisticsPeriodMS = m_rateLimiterStatisticsPeriodMS;
m_rateLimiterStatisticsTimer->registerTimeoutHandler(
[rateLimiterStatisticsTimer, _rateLimiterStatisticsPeriodMS, _rateLimiterStatistics,
_rateLimiterManager]() {
auto io = _rateLimiterStatistics->inAndOutStat(_rateLimiterStatisticsPeriodMS);
GATEWAY_LOG(DEBUG) << LOG_DESC("\n [ratelimiter stat]") << LOG_DESC(io.first);
GATEWAY_LOG(DEBUG) << LOG_DESC("\n [ratelimiter stat]") << LOG_DESC(io.second);
_rateLimiterStatistics->flushStat();
rateLimiterStatisticsTimer->restart();
});
}
~Gateway() override { stop(); }

Expand Down Expand Up @@ -193,19 +178,6 @@ class Gateway : public GatewayInterface, public std::enable_shared_from_this<Gat
return m_gatewayNodeManager->unregisterNode(_groupID, _nodeID);
}

// gateway traffic limiting policy impl
bool checkBWRateLimit(ratelimiter::RateLimiterManager::Ptr _rateLimiterManager,
const std::string& _endPoint, const std::string& _groupID, uint16_t _moduleID,
uint64_t _msgLength, SessionCallbackFunc _callback);
bool checkBWRateLimit(
SessionFace::Ptr _session, Message::Ptr _msg, SessionCallbackFunc _callback);

uint32_t rateLimiterStatisticsPeriodMS() const { return m_rateLimiterStatisticsPeriodMS; }
void setRateLimiterStatisticsPeriodMS(uint32_t _rateLimiterStatisticsPeriodMS)
{
m_rateLimiterStatisticsPeriodMS = _rateLimiterStatisticsPeriodMS;
}

protected:
// for UT
Gateway() {}
Expand Down Expand Up @@ -234,15 +206,8 @@ class Gateway : public GatewayInterface, public std::enable_shared_from_this<Gat
GatewayNodeManager::Ptr m_gatewayNodeManager;
bcos::amop::AMOPImpl::Ptr m_amop;

// For bandwidth limitation
ratelimiter::RateLimiterManager::Ptr m_rateLimiterManager;
// For bandwidth statistics
ratelimiter::RateLimiterStatistics::Ptr m_rateLimiterStatistics;

//
uint32_t m_rateLimiterStatisticsPeriodMS = 60000; // ms
// the timer that periodically prints the rate
std::shared_ptr<Timer> m_rateLimiterStatisticsTimer;
ratelimiter::GateWayRateLimiter::Ptr m_gatewayRateLimiter;
};
} // namespace gateway
} // namespace bcos
8 changes: 8 additions & 0 deletions bcos-gateway/bcos-gateway/GatewayConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,9 @@ void GatewayConfig::initRatelimitConfig(const boost::property_tree::ptree& _pt)
[flow_control]
; rate limiter stat reporter interval, unit: ms
; stat_reporter_interval=60000
;
; rate limiter stat reporter info level, default: 1
; stat_reporter_level=1

; the module that does not limit bandwidth
; list of all modules: raft,pbft,amop,block_sync,txs_sync,light_node,cons_txs_sync
Expand Down Expand Up @@ -395,6 +398,9 @@ void GatewayConfig::initRatelimitConfig(const boost::property_tree::ptree& _pt)
// stat_reporter_interval=60000
int32_t statReporterInterval = _pt.get<int32_t>("flow_control.stat_reporter_interval", 60000);

// stat_reporter_level=1
int32_t statReporterLevel = _pt.get<int32_t>("flow_control.stat_reporter_level", 1);

// modules_without_bw_limit=raft,pbft
std::string strNoLimitModules =
_pt.get<std::string>("flow_control.modules_without_bw_limit", "raft,pbft,cons_txs_sync");
Expand Down Expand Up @@ -517,6 +523,7 @@ void GatewayConfig::initRatelimitConfig(const boost::property_tree::ptree& _pt)
}

m_rateLimiterConfig.statReporterInterval = statReporterInterval;
m_rateLimiterConfig.statReporterLevel = statReporterLevel;
m_rateLimiterConfig.modulesWithNoBwLimit = moduleIDs;
m_rateLimiterConfig.totalOutgoingBwLimit = totalOutgoingBwLimit;
m_rateLimiterConfig.connOutgoingBwLimit = connOutgoingBwLimit;
Expand All @@ -542,6 +549,7 @@ void GatewayConfig::initRatelimitConfig(const boost::property_tree::ptree& _pt)
<< LOG_KV("rateLimiterConfigEffect",
m_rateLimiterConfig.hasRateLimiterConfigEffect())
<< LOG_KV("statReporterInterval", statReporterInterval)
<< LOG_KV("statReporterLevel", statReporterLevel)
<< LOG_KV("totalOutgoingBwLimit", totalOutgoingBwLimit)
<< LOG_KV("connOutgoingBwLimit", connOutgoingBwLimit)
<< LOG_KV("groupOutgoingBwLimit", groupOutgoingBwLimit)
Expand Down
15 changes: 14 additions & 1 deletion bcos-gateway/bcos-gateway/GatewayConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,22 @@ class GatewayConfig
std::string enNodeKey;
};

// config for redis
struct RedisConfig
{
std::string redisServerIP = "127.0.0.1";
uint16_t redisServerPort = 6379;
int32_t redisTimeOut = -1;
int32_t redisPoolSize = 16;
};

// config for rate limit
struct RateLimiterConfig
{
// report interval ms
// stat reporter interval, unit: ms
int32_t statReporterInterval = 60000;
// stat reporter info level, default: 1
uint32_t statReporterLevel = 1;

// total outgoing bandwidth limit
int64_t totalOutgoingBwLimit = -1;
Expand Down Expand Up @@ -126,6 +137,7 @@ class GatewayConfig
CertConfig certConfig() const { return m_certConfig; }
SMCertConfig smCertConfig() const { return m_smCertConfig; }
RateLimiterConfig rateLimiterConfig() const { return m_rateLimiterConfig; }
RedisConfig redisConfig() const { return m_redisConfig; }

const std::set<NodeIPEndpoint>& connectedNodes() const { return m_connectedNodes; }

Expand All @@ -149,6 +161,7 @@ class GatewayConfig
SMCertConfig m_smCertConfig;

RateLimiterConfig m_rateLimiterConfig;
RedisConfig m_redisConfig;

std::string m_certPath;
std::string m_nodePath;
Expand Down
Loading