Skip to content

Commit

Permalink
Merge pull request #957 from bucko909/flow-control
Browse files Browse the repository at this point in the history
 #956 Flow control fixes
  • Loading branch information
deniszh authored Feb 2, 2025
2 parents eb345a5 + dbf2925 commit aa7974e
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 24 deletions.
63 changes: 44 additions & 19 deletions conf/carbon.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -489,10 +489,33 @@ DESTINATIONS = 127.0.0.1:2004

# This is the maximum number of datapoints that can be queued up
# for a single destination. Once this limit is hit, we will
# stop accepting new data if USE_FLOW_CONTROL is True, otherwise
# we will drop any subsequently received datapoints.
# stop accepting new data if USE_FLOW_CONTROL is True. In-flight and
# internally-generated datapoints will still be processed, and data
# will still be dropped if MAX_QUEUE_SIZE_HARD_PCT * MAX_QUEUE_SIZE
# is hit. If USE_FLOW_CONTROL is False, metrics are immediately dropped
# after MAX_QUEUE_SIZE, and MAX_QUEUE_SIZE_HARD_PCT is unused.
MAX_QUEUE_SIZE = 10000

# This is the fraction of MAX_QUEUE_SIZE to which a queue must drain before it will accept
# more messages. For a larger site, if the queue is very large it makes sense
# to tune this to allow for incoming stats. So if you have an average
# flow of 100k stats/minute, and a MAX_QUEUE_SIZE of 3,000,000, it makes sense
# to allow stats to start flowing when you've cleared the queue to 95% since
# you should have space to accommodate the next minute's worth of stats
# even before the relay incrementally clears more of the queue
QUEUE_LOW_WATERMARK_PCT = 0.8

# This is the multiple of MAX_QUEUE_SIZE beyond which data will be dropped
# even with USE_FLOW_CONTROL enabled. When incoming data is paused, in-flight data
# is still processed, which can push a queue slightly over the configured max.
MAX_QUEUE_SIZE_HARD_PCT = 1.25

# Set this to False to drop datapoints when any send queue (sending datapoints
# to a downstream carbon daemon) hits MAX_QUEUE_SIZE. If this is True (the
# default) then sockets over which metrics are received will temporarily stop accepting
# data until the send queues fall below QUEUE_LOW_WATERMARK_PCT * MAX_QUEUE_SIZE.
USE_FLOW_CONTROL = True

# This defines the maximum "message size" between carbon daemons. If
# your queue is large, setting this to a lower number will cause the
# relay to forward smaller discrete chunks of stats, which may prevent
Expand All @@ -509,26 +532,11 @@ MAX_DATAPOINTS_PER_MESSAGE = 500
# If this is blank carbon-relay runs as the user that invokes it
# USER =

# This is the percentage to which the queue must drain before it will accept
# more messages. For a larger site, if the queue is very large it makes sense
# to tune this to allow for incoming stats. So if you have an average
# flow of 100k stats/minute, and a MAX_QUEUE_SIZE of 3,000,000, it makes sense
# to allow stats to start flowing when you've cleared the queue to 95% since
# you should have space to accommodate the next minute's worth of stats
# even before the relay incrementally clears more of the queue
QUEUE_LOW_WATERMARK_PCT = 0.8

# To allow for batch efficiency from the pickle protocol and to benefit from
# other batching advantages, all writes are deferred by putting them into a queue,
# and then the queue is flushed and sent a small fraction of a second later.
TIME_TO_DEFER_SENDING = 0.0001

# Set this to False to drop datapoints when any send queue (sending datapoints
# to a downstream carbon daemon) hits MAX_QUEUE_SIZE. If this is True (the
# default) then sockets over which metrics are received will temporarily stop accepting
# data until the send queues fall below QUEUE_LOW_WATERMARK_PCT * MAX_QUEUE_SIZE.
USE_FLOW_CONTROL = True

# If enabled this setting is used to timeout metric client connection if no
# metrics have been sent in specified time in seconds
#METRIC_CLIENT_IDLE_TIMEOUT = None
Expand Down Expand Up @@ -640,10 +648,27 @@ REPLICATION_FACTOR = 1

# This is the maximum number of datapoints that can be queued up
# for a single destination. Once this limit is hit, we will
# stop accepting new data if USE_FLOW_CONTROL is True, otherwise
# we will drop any subsequently received datapoints.
# stop accepting new data if USE_FLOW_CONTROL is True. In-flight and
# internally-generated datapoints will still be processed, and data
# will still be dropped if MAX_QUEUE_SIZE_HARD_PCT * MAX_QUEUE_SIZE
# is hit. If USE_FLOW_CONTROL is False, metrics are immediately dropped
# after MAX_QUEUE_SIZE, and MAX_QUEUE_SIZE_HARD_PCT is unused.
MAX_QUEUE_SIZE = 10000

# This is the fraction of MAX_QUEUE_SIZE to which a queue must drain before it will accept
# more messages. For a larger site, if the queue is very large it makes sense
# to tune this to allow for incoming stats. So if you have an average
# flow of 100k stats/minute, and a MAX_QUEUE_SIZE of 3,000,000, it makes sense
# to allow stats to start flowing when you've cleared the queue to 95% since
# you should have space to accommodate the next minute's worth of stats
# even before the relay incrementally clears more of the queue
QUEUE_LOW_WATERMARK_PCT = 0.8

# This is the multiple of MAX_QUEUE_SIZE beyond which data will be dropped
# even with USE_FLOW_CONTROL enabled. When incoming data is paused, in-flight data
# is still processed, which can push a queue slightly over the configured max.
MAX_QUEUE_SIZE_HARD_PCT = 1.25

# Set this to False to drop datapoints when any send queue (sending datapoints
# to a downstream carbon daemon) hits MAX_QUEUE_SIZE. If this is True (the
# default) then sockets over which metrics are received will temporarily stop accepting
Expand Down
13 changes: 12 additions & 1 deletion lib/carbon/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,13 @@ def watermarks(self):

@property
def is_full(self):
if settings.CACHE_SIZE_HARD_MAX == float('inf'):
return False
else:
return self.size >= settings.CACHE_SIZE_HARD_MAX

@property
def is_nearly_full(self):
if settings.MAX_CACHE_SIZE == float('inf'):
return False
else:
Expand Down Expand Up @@ -252,8 +259,12 @@ def store(self, metric, datapoint):
# Not a duplicate, hence process if cache is not full
if self.is_full:
log.msg("MetricCache is full: self.size=%d" % self.size)
events.cacheFull()
events.cacheOverflow()
else:
if self.is_nearly_full:
# This will disable reading when flow control is enabled
log.msg("MetricCache is nearly full: self.size=%d" % self.size)
events.cacheFull()
if not self[metric]:
self.new_metrics.append(metric)
self.size += 1
Expand Down
9 changes: 8 additions & 1 deletion lib/carbon/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@


SEND_QUEUE_LOW_WATERMARK = settings.MAX_QUEUE_SIZE * settings.QUEUE_LOW_WATERMARK_PCT
if settings.USE_FLOW_CONTROL:
SEND_QUEUE_HARD_MAX = settings.MAX_QUEUE_SIZE * settings.MAX_QUEUE_SIZE_HARD_PCT
else:
SEND_QUEUE_HARD_MAX = settings.MAX_QUEUE_SIZE


class CarbonClientProtocol(object):
Expand Down Expand Up @@ -350,7 +354,10 @@ def sendDatapoint(self, metric, datapoint):
if self.queueSize >= settings.MAX_QUEUE_SIZE:
if not self.queueFull.called:
self.queueFull.callback(self.queueSize)
instrumentation.increment(self.fullQueueDrops)
if self.queueSize < SEND_QUEUE_HARD_MAX:
self.enqueue(metric, datapoint)
else:
instrumentation.increment(self.fullQueueDrops)
else:
self.enqueue(metric, datapoint)

Expand Down
7 changes: 6 additions & 1 deletion lib/carbon/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@
MAX_DATAPOINTS_PER_MESSAGE=500,
MAX_AGGREGATION_INTERVALS=5,
FORWARD_ALL=True,
MAX_QUEUE_SIZE=1000,
MAX_QUEUE_SIZE=10000,
QUEUE_LOW_WATERMARK_PCT=0.8,
MAX_QUEUE_SIZE_HARD_PCT=1.25,
TIME_TO_DEFER_SENDING=0.0001,
ENABLE_AMQP=False,
AMQP_METRIC_NAME_IN_BODY=False,
Expand Down Expand Up @@ -297,6 +298,10 @@ def cleanpath(path):
state.database = database_class(settings)

settings.CACHE_SIZE_LOW_WATERMARK = settings.MAX_CACHE_SIZE * 0.95
if settings.USE_FLOW_CONTROL:
settings.CACHE_SIZE_HARD_MAX = settings.MAX_CACHE_SIZE * 1.05
else:
settings.CACHE_SIZE_HARD_MAX = settings.MAX_CACHE_SIZE

if "action" not in self:
self["action"] = "start"
Expand Down
3 changes: 2 additions & 1 deletion lib/carbon/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def __call__(self, *args, **kwargs):

metricReceived = Event('metricReceived')
metricGenerated = Event('metricGenerated')
cacheOverflow = Event('cacheOverflow')
cacheFull = Event('cacheFull')
cacheSpaceAvailable = Event('cacheSpaceAvailable')
pauseReceivingMetrics = Event('pauseReceivingMetrics')
Expand All @@ -32,7 +33,7 @@ def __call__(self, *args, **kwargs):
lambda metric, datapoint: state.instrumentation.increment('metricsReceived'))


cacheFull.addHandler(lambda: state.instrumentation.increment('cache.overflow'))
cacheOverflow.addHandler(lambda: state.instrumentation.increment('cache.overflow'))
cacheFull.addHandler(lambda: setattr(state, 'cacheTooFull', True))
cacheSpaceAvailable.addHandler(lambda: setattr(state, 'cacheTooFull', False))

Expand Down
12 changes: 12 additions & 0 deletions lib/carbon/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,21 @@ def setupAggregatorProcessor(root_service, settings):
"aggregation processor: file does not exist {0}".format(aggregation_rules_path))
RuleManager.read_from(aggregation_rules_path)

if settings.USE_FLOW_CONTROL:
events.cacheFull.addHandler(events.pauseReceivingMetrics)
events.cacheSpaceAvailable.addHandler(events.resumeReceivingMetrics)


def setupRewriterProcessor(root_service, settings):
from carbon.rewrite import RewriteRuleManager

rewrite_rules_path = settings["rewrite-rules"]
RewriteRuleManager.read_from(rewrite_rules_path)

if settings.USE_FLOW_CONTROL:
events.cacheFull.addHandler(events.pauseReceivingMetrics)
events.cacheSpaceAvailable.addHandler(events.resumeReceivingMetrics)


def setupRelayProcessor(root_service, settings):
from carbon.routers import DatapointRouter
Expand All @@ -193,6 +201,10 @@ def setupRelayProcessor(root_service, settings):
for destination in util.parseDestinations(settings.DESTINATIONS):
state.client_manager.startClient(destination)

if settings.USE_FLOW_CONTROL:
events.cacheFull.addHandler(events.pauseReceivingMetrics)
events.cacheSpaceAvailable.addHandler(events.resumeReceivingMetrics)


def setupWriterProcessor(root_service, settings):
from carbon import cache # NOQA Register CacheFeedingProcessor
Expand Down
4 changes: 4 additions & 0 deletions lib/carbon/tests/benchmark_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
NaiveStrategy, MaxStrategy, RandomStrategy, SortedStrategy, \
TimeSortedStrategy, BucketMaxStrategy

from carbon.conf import settings

settings.CACHE_SIZE_LOW_WATERMARK = settings.MAX_CACHE_SIZE * 0.95
settings.CACHE_SIZE_HARD_MAX = settings.MAX_CACHE_SIZE * 1.05

metric_cache = _MetricCache(DrainStrategy)
count = 0
Expand Down
30 changes: 29 additions & 1 deletion lib/carbon/tests/test_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class MetricCacheTest(TestCase):
def setUp(self):
settings = {
'MAX_CACHE_SIZE': float('inf'),
'CACHE_SIZE_HARD_MAX': float('inf'),
'CACHE_SIZE_LOW_WATERMARK': float('inf')
}
self._settings_patch = patch.dict('carbon.conf.settings', settings)
Expand Down Expand Up @@ -67,6 +68,13 @@ def test_store_checks_fullness(self):
def test_store_on_full_triggers_events(self):
is_full_mock = PropertyMock(return_value=True)
with patch.object(_MetricCache, 'is_full', is_full_mock):
with patch('carbon.cache.events') as events_mock:
self.metric_cache.store('foo', (123456, 1.0))
events_mock.cacheOverflow.assert_called_with()

def test_store_on_nearly_full_triggers_events(self):
is_nearly_full_mock = PropertyMock(return_value=True)
with patch.object(_MetricCache, 'is_nearly_full', is_nearly_full_mock):
with patch('carbon.cache.events') as events_mock:
self.metric_cache.store('foo', (123456, 1.0))
events_mock.cacheFull.assert_called_with()
Expand Down Expand Up @@ -150,7 +158,7 @@ def test_is_full_short_circuits_on_inf(self):
size_mock.assert_not_called()

def test_is_full(self):
self._settings_patch.values['MAX_CACHE_SIZE'] = 2.0
self._settings_patch.values['CACHE_SIZE_HARD_MAX'] = 2.0
self._settings_patch.start()
with patch('carbon.cache.events'):
self.assertFalse(self.metric_cache.is_full)
Expand Down Expand Up @@ -178,8 +186,18 @@ def test_counts_multiple_datapoints(self):

class DrainStrategyTest(TestCase):
def setUp(self):
settings = {
'MAX_CACHE_SIZE': float('inf'),
'CACHE_SIZE_HARD_MAX': float('inf'),
'CACHE_SIZE_LOW_WATERMARK': float('inf')
}
self._settings_patch = patch.dict('carbon.conf.settings', settings)
self._settings_patch.start()
self.metric_cache = _MetricCache()

def tearDown(self):
self._settings_patch.stop()

def test_bucketmax_strategy(self):
bucketmax_strategy = BucketMaxStrategy(self.metric_cache)
self.metric_cache.strategy = bucketmax_strategy
Expand Down Expand Up @@ -303,8 +321,18 @@ def test_time_sorted_strategy_min_lag(self):

class RandomStrategyTest(TestCase):
def setUp(self):
settings = {
'MAX_CACHE_SIZE': float('inf'),
'CACHE_SIZE_HARD_MAX': float('inf'),
'CACHE_SIZE_LOW_WATERMARK': float('inf')
}
self._settings_patch = patch.dict('carbon.conf.settings', settings)
self._settings_patch.start()
self.metric_cache = _MetricCache()

def tearDown(self):
self._settings_patch.stop()

def test_random_strategy(self):
self.metric_cache.store('foo', (123456, 1.0))
self.metric_cache.store('bar', (123457, 2.0))
Expand Down

0 comments on commit aa7974e

Please sign in to comment.