Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
*.pyc
*.py[cdo]
build
dist
*.egg-info
*.log
.tox/
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@

pypi:
python setup.py sdist upload

test:
tox
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ server, a front end/proxy for the Graphite stats collection and graphing server.
- code: https://github.com/etsy/statsd
- blog post: http://codeascraft.etsy.com/2011/02/15/measure-anything-measure-everything/

**pystatsd** has [been tested on](http://travis-ci.org/sivy/py-statsd) python 2.5, 2.6, and 2.7.
**pystatsd** has [been tested on](http://travis-ci.org/sivy/py-statsd) Python 2.7 and 3.4.

Status
-------------
Expand Down Expand Up @@ -38,6 +38,14 @@ See statsd_test for sample usage:
sc.decrement('python_test.decr_int')
sc.gauge('python_test.gauge', 42)

Or, if you have the package installed, run the server with a single command:

$ pystatsd-server

Run the command with `--help` to see the available options:

$ pystatsd-server --help


Building a Debian Package
-------------
Expand All @@ -55,6 +63,13 @@ will read configuration variables from /etc/default/pystatsd. By default the
pystatsd daemon runs as user 'nobody' which is a good thing from a security
perspective.

Developing
-------------

Make sure you have `tox` installed, then just run the tests:

$ make test

Troubleshooting
-------------

Expand Down
2 changes: 1 addition & 1 deletion pystatsd/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .statsd import Client
from .server import Server

VERSION = (0, 1, 10)
VERSION = (0, 2, 0)
68 changes: 45 additions & 23 deletions pystatsd/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,20 @@ def _clean_key(k):
%(prefix)s.%(key)s.upper_%(pct_threshold)s %(max_threshold)s %(ts)s
'''

# Transport identifiers accepted by Server(transport=...) and the
# --transport command-line option.
# Graphite over TCP.
GRAPHITE = 'graphite'
# Graphite over UDP.
GRAPHITE_UDP = 'graphite-udp'
# Both Graphite transports; flush() tests membership in this tuple to decide
# whether to build and send a Graphite stat string.
GRAPHITE_TRANSPORTS = (GRAPHITE, GRAPHITE_UDP)
# Ganglia via the embedded gmetric library.
GANGLIA = 'ganglia'
# Ganglia via the external gmetric executable.
GANGLIA_GMETRIC = 'ganglia-gmetric'


class Server(object):

def __init__(self, pct_threshold=90, debug=False, transport='graphite',
ganglia_host='localhost', ganglia_port=8649,
ganglia_spoof_host='statsd:statsd',
gmetric_exec='/usr/bin/gmetric', gmetric_options = '-d',
graphite_host='localhost', graphite_port=2003, global_prefix=None,
graphite_host='localhost', graphite_port=2003, global_prefix=None,
flush_interval=10000,
no_aggregate_counters=False, counters_prefix='stats',
timers_prefix='stats.timers', expire=0):
Expand Down Expand Up @@ -151,12 +157,13 @@ def flush(self):
ts = int(time.time())
stats = 0

if self.transport == 'graphite':
if self.transport in GRAPHITE_TRANSPORTS:
stat_string = ''
elif self.transport == 'ganglia':
elif self.transport == GANGLIA:
g = gmetric.Gmetric(self.ganglia_host, self.ganglia_port, self.ganglia_protocol)

for k, (v, t) in self.counters.items():
items = tuple(self.counters.items())
for k, (v, t) in items:
if self.expire > 0 and t + self.expire < ts:
if self.debug:
print("Expiring counter %s (age: %s)" % (k, ts -t))
Expand All @@ -168,21 +175,22 @@ def flush(self):
if self.debug:
print("Sending %s => count=%s" % (k, v))

if self.transport == 'graphite':
if self.transport in GRAPHITE_TRANSPORTS:
msg = '%s.%s %s %s\n' % (self.counters_prefix, k, v, ts)
stat_string += msg
elif self.transport == 'ganglia':
elif self.transport == GANGLIA:
# We put counters in _counters group. Underscore is to make sure counters show up
# first in the GUI. Change below if you disagree
g.send(k, v, "double", "count", "both", 60, self.dmax, "_counters", self.ganglia_spoof_host)
elif self.transport == 'ganglia-gmetric':
elif self.transport == GANGLIA_GMETRIC:
self.send_to_ganglia_using_gmetric(k,v, "_counters", "count")

# Clear the counter once the data is sent
del(self.counters[k])
stats += 1

for k, (v, t) in self.gauges.items():
items = tuple(self.gauges.items())
for k, (v, t) in items:
if self.expire > 0 and t + self.expire < ts:
if self.debug:
print("Expiring gauge %s (age: %s)" % (k, ts - t))
Expand All @@ -193,18 +201,19 @@ def flush(self):
if self.debug:
print("Sending %s => value=%s" % (k, v))

if self.transport == 'graphite':
if self.transport in GRAPHITE_TRANSPORTS:
# note: counters and gauges implicitly end up in the same namespace
msg = '%s.%s %s %s\n' % (self.counters_prefix, k, v, ts)
stat_string += msg
elif self.transport == 'ganglia':
elif self.transport == GANGLIA:
g.send(k, v, "double", "count", "both", 60, self.dmax, "_gauges", self.ganglia_spoof_host)
elif self.transport == 'ganglia-gmetric':
elif self.transport == GANGLIA_GMETRIC:
self.send_to_ganglia_using_gmetric(k,v, "_gauges", "gauge")

stats += 1

for k, (v, t) in self.timers.items():
items = tuple(self.timers.items())
for k, (v, t) in items:
if self.expire > 0 and t + self.expire < ts:
if self.debug:
print("Expiring timer %s (age: %s)" % (k, ts - t))
Expand Down Expand Up @@ -232,7 +241,7 @@ def flush(self):
print("Sending %s ====> lower=%s, mean=%s, upper=%s, %dpct=%s, count=%s" \
% (k, min, mean, max, self.pct_threshold, max_threshold, count))

if self.transport == 'graphite':
if self.transport in GRAPHITE_TRANSPORTS:

stat_string += TIMER_MSG % {
'prefix': self.timers_prefix,
Expand All @@ -246,7 +255,7 @@ def flush(self):
'ts': ts,
}

elif self.transport == 'ganglia':
elif self.transport == GANGLIA:
# We are gonna convert all times into seconds, then let rrdtool add proper SI unit. This avoids things like
# 3521 k ms which is 3.521 seconds
# What group should these metrics be in. For the time being we'll set it to the name of the key
Expand All @@ -256,7 +265,7 @@ def flush(self):
g.send(k + "_max", max / 1000, "double", "seconds", "both", 60, self.dmax, group, self.ganglia_spoof_host)
g.send(k + "_count", count, "double", "count", "both", 60, self.dmax, group, self.ganglia_spoof_host)
g.send(k + "_" + str(self.pct_threshold) + "pct", max_threshold / 1000, "double", "seconds", "both", 60, self.dmax, group, self.ganglia_spoof_host)
elif self.transport == 'ganglia-gmetric':
elif self.transport == GANGLIA_GMETRIC:
# We are gonna convert all times into seconds, then let rrdtool add proper SI unit. This avoids things like
# 3521 k ms which is 3.521 seconds
group = k
Expand All @@ -268,7 +277,7 @@ def flush(self):

stats += 1

if self.transport == 'graphite':
if self.transport in GRAPHITE_TRANSPORTS:

stat_string += "statsd.numStats %s %d\n" % (stats, ts)

Expand All @@ -278,11 +287,8 @@ def flush(self):
'%s.%s' % (self.global_prefix, s) for s in stat_string.split('\n')[:-1]
])

graphite = socket.socket()
try:
graphite.connect((self.graphite_host, self.graphite_port))
graphite.sendall(bytes(bytearray(stat_string, "utf-8")))
graphite.close()
self._flush_graphite(stat_string)
except socket.error as e:
log.error("Error communicating with Graphite: %s" % e)
if self.debug:
Expand All @@ -292,6 +298,19 @@ def flush(self):
print("\n================== Flush completed. Waiting until next flush. Sent out %d metrics =======" \
% (stats))

def _flush_graphite(self, stat_string):
    """Send an already-formatted stat string to the Graphite server.

    The string is UTF-8 encoded and written to
    ``(self.graphite_host, self.graphite_port)``.  When ``self.transport``
    is ``GRAPHITE`` a TCP connection is opened for the send; for any other
    value the payload is sent as a single UDP datagram.

    The socket is always closed before returning, including when a socket
    call fails, so repeated flushes do not accumulate open descriptors.

    :param stat_string: text payload to send (one stat per line).
    :raises socket.error: propagated from name resolution, connect or
        send; ``flush()`` catches and logs it.
    """
    # bytes(bytearray(...)) yields a byte string on both Python 2 and 3.
    message = bytes(bytearray(stat_string, "utf-8"))

    if self.transport == GRAPHITE:
        graphite = socket.socket()
        try:
            graphite.connect((self.graphite_host, self.graphite_port))
            graphite.sendall(message)
        finally:
            # Close even if connect/sendall raised, to avoid leaking the fd.
            graphite.close()
    else:
        # Resolve the host name before creating the socket so a failed
        # lookup leaves nothing to clean up.
        host = socket.gethostbyname(self.graphite_host)
        graphite = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            graphite.sendto(message, (host, self.graphite_port))
        finally:
            # The UDP socket is single-use; release it after the datagram.
            graphite.close()

def _set_timer(self):
self._timer = threading.Timer(self.flush_interval / 1000, self.on_timer)
self._timer.daemon = True
Expand All @@ -312,10 +331,11 @@ def signal_handler(signal, frame):
self._set_timer()
while 1:
data, addr = self._sock.recvfrom(self.buf)
data = data.decode('utf-8')
try:
self.process(data)
except Exception as error:
log.error("Bad data from %s: %s",addr,error)
log.error("Bad data from %s: %s", addr, error)


def stop(self):
Expand Down Expand Up @@ -354,7 +374,9 @@ def run_server():
parser.add_argument('-d', '--debug', dest='debug', action='store_true', help='debug mode', default=False)
parser.add_argument('-n', '--name', dest='name', help='hostname to run on ', default='')
parser.add_argument('-p', '--port', dest='port', help='port to run on (default: 8125)', type=int, default=8125)
parser.add_argument('-r', '--transport', dest='transport', help='transport to use graphite, ganglia (uses embedded library) or ganglia-gmetric (uses gmetric)', type=str, default="graphite")
parser.add_argument('-r', '--transport', dest='transport',
help='transport to use: graphite (TCP), graphite-udp (UDP), ganglia (uses embedded library) or ganglia-gmetric (uses gmetric)',
type=str, default=GRAPHITE)
parser.add_argument('--graphite-port', dest='graphite_port', help='port to connect to graphite on (default: 2003)', type=int, default=2003)
parser.add_argument('--graphite-host', dest='graphite_host', help='host to connect to graphite on (default: localhost)', type=str, default='localhost')
# Uses embedded Ganglia Library
Expand All @@ -364,7 +386,7 @@ def run_server():
# Use gmetric
parser.add_argument('--ganglia-gmetric-exec', dest='gmetric_exec', help='Use gmetric executable. Defaults to /usr/bin/gmetric', type=str, default="/usr/bin/gmetric")
parser.add_argument('--ganglia-gmetric-options', dest='gmetric_options', help='Options to pass to gmetric. Defaults to -d 60', type=str, default="-d 60")
#
#
parser.add_argument('--flush-interval', dest='flush_interval', help='how often to send data to graphite in millis (default: 10000)', type=int, default=10000)
parser.add_argument('--no-aggregate-counters', dest='no_aggregate_counters', help='should statsd report counters as absolute instead of count/sec', action='store_true')
parser.add_argument('--global-prefix', dest='global_prefix', help='prefix to append to all stats sent to graphite. Useful for hosted services (ex: Hosted Graphite) or stats namespacing (default: None)', type=str, default=None)
Expand Down
20 changes: 17 additions & 3 deletions statsd_test.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,27 @@
#!/usr/bin/env python

import time
from multiprocessing import Process

from pystatsd import Client, Server

sc = Client('localhost', 8125)

def worker():
    """Start a debug-mode Server with a 500 ms flush interval and serve."""
    Server(debug=True, flush_interval=500).serve()


p = Process(target=worker, daemon=False)
p.start()
time.sleep(1)


sc = Client('localhost', 8125)
sc.timing('python_test.time', 500)
sc.increment('python_test.inc_int')
sc.decrement('python_test.decr_int')
sc.gauge('python_test.gauge', 42)

srvr = Server(debug=True)
srvr.serve()

time.sleep(2)
p.terminate()
3 changes: 1 addition & 2 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
from .client import *
from .server import *
#
4 changes: 2 additions & 2 deletions tests/client.py → tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ def test_basic_client_update_stats_multi(self):
self.client.update_stats(stats, 5)

for stat, value in data.items():
stat_str = stat + value
stat_str = '{}:{}'.format(stat, value)
# thanks tos9 in #python for 'splaining the return_value bit.
self.mock_socket.return_value.sendto.assert_call_any(
self.mock_socket.return_value.sendto.assert_any_call(
bytes(stat_str, 'utf-8'), self.addr)

def test_basic_client_timing(self):
Expand Down
File renamed without changes.
6 changes: 6 additions & 0 deletions tox.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[tox]
envlist = py27,py34

[testenv]
deps=-rrequirements.txt
commands=nosetests tests