diff --git a/.gitignore b/.gitignore index 253051e..dfc90ea 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ -*.pyc +*.py[cdo] build dist *.egg-info *.log +.tox/ diff --git a/Makefile b/Makefile index fbc3854..75dad46 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,5 @@ - pypi: python setup.py sdist upload + +test: + tox diff --git a/README.md b/README.md index eaed58b..9c81799 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ server, a front end/proxy for the Graphite stats collection and graphing server. - code: https://github.com/etsy/statsd - blog post: http://codeascraft.etsy.com/2011/02/15/measure-anything-measure-everything/ -**pystatsd** has [been tested on](http://travis-ci.org/sivy/py-statsd) python 2.5, 2.6, and 2.7. +**pystatsd** has [been tested on](http://travis-ci.org/sivy/py-statsd) python 2.7 and 3.4. Status ------------- @@ -38,6 +38,14 @@ See statsd_test for sample usage: sc.decrement('python_test.decr_int') sc.gauge('python_test.gauge', 42) +or, to run the server from a simple command, if you have the package installed: + + $ pystatsd-server + +Check out the options for the command for more information: + + $ pystatsd-server --help + Building a Debian Package ------------- @@ -55,6 +63,13 @@ will read configuration variables from /etc/default/pystatsd. By default the pystatsd daemon runs as user 'nobody' which is a good thing from a security perspective. +Developing +------------- + +Make sure you have `tox` installed, then just run the tests: + + $ make test + Troubleshooting ------------- diff --git a/pystatsd/__init__.py b/pystatsd/__init__.py index 15b65b2..a0691e1 100644 --- a/pystatsd/__init__.py +++ b/pystatsd/__init__.py @@ -1,4 +1,4 @@ from .statsd import Client from .server import Server -VERSION = (0, 1, 10) +VERSION = (0, 2, 0) diff --git a/pystatsd/server.py b/pystatsd/server.py index 40118c8..9187e77 100644 --- a/pystatsd/server.py +++ b/pystatsd/server.py @@ -42,6 +42,12 @@ def _clean_key(k): %(prefix)s.%(key)s.upper_%(pct_threshold)s %(max_threshold)s %(ts)s ''' +GRAPHITE = 'graphite' +GRAPHITE_UDP = 'graphite-udp' +GRAPHITE_TRANSPORTS = (GRAPHITE, GRAPHITE_UDP) +GANGLIA = 'ganglia' +GANGLIA_GMETRIC = 'ganglia-gmetric' + class Server(object): @@ -49,7 +55,7 @@ def __init__(self, pct_threshold=90, debug=False, transport='graphite', ganglia_host='localhost', ganglia_port=8649, ganglia_spoof_host='statsd:statsd', gmetric_exec='/usr/bin/gmetric', gmetric_options = '-d', - graphite_host='localhost', graphite_port=2003, global_prefix=None, + graphite_host='localhost', graphite_port=2003, global_prefix=None, flush_interval=10000, no_aggregate_counters=False, counters_prefix='stats', timers_prefix='stats.timers', expire=0): @@ -151,12 +157,13 @@ def flush(self): ts = int(time.time()) stats = 0 - if self.transport == 'graphite': + if self.transport in GRAPHITE_TRANSPORTS: stat_string = '' - elif self.transport == 'ganglia': + elif self.transport == GANGLIA: g = gmetric.Gmetric(self.ganglia_host, self.ganglia_port, self.ganglia_protocol) - for k, (v, t) in self.counters.items(): + items = tuple(self.counters.items()) + for k, (v, t) in items: if self.expire > 0 and t + self.expire < ts: if self.debug: print("Expiring counter %s (age: %s)" % (k, ts -t)) @@ -168,21 +175,22 @@ def flush(self): if self.debug: print("Sending %s => count=%s" % (k, v)) - if self.transport == 'graphite': + if self.transport in GRAPHITE_TRANSPORTS: msg = '%s.%s %s %s\n' % (self.counters_prefix, k, v, ts) stat_string += msg - elif self.transport == 'ganglia': + elif self.transport == GANGLIA: # We put counters in _counters group. Underscore is to make sure counters show up # first in the GUI. Change below if you disagree g.send(k, v, "double", "count", "both", 60, self.dmax, "_counters", self.ganglia_spoof_host) - elif self.transport == 'ganglia-gmetric': + elif self.transport == GANGLIA_GMETRIC: self.send_to_ganglia_using_gmetric(k,v, "_counters", "count") # Clear the counter once the data is sent del(self.counters[k]) stats += 1 - for k, (v, t) in self.gauges.items(): + items = tuple(self.gauges.items()) + for k, (v, t) in items: if self.expire > 0 and t + self.expire < ts: if self.debug: print("Expiring gauge %s (age: %s)" % (k, ts - t)) @@ -193,18 +201,19 @@ def flush(self): if self.debug: print("Sending %s => value=%s" % (k, v)) - if self.transport == 'graphite': + if self.transport in GRAPHITE_TRANSPORTS: # note: counters and gauges implicitly end up in the same namespace msg = '%s.%s %s %s\n' % (self.counters_prefix, k, v, ts) stat_string += msg - elif self.transport == 'ganglia': + elif self.transport == GANGLIA: g.send(k, v, "double", "count", "both", 60, self.dmax, "_gauges", self.ganglia_spoof_host) - elif self.transport == 'ganglia-gmetric': + elif self.transport == GANGLIA_GMETRIC: self.send_to_ganglia_using_gmetric(k,v, "_gauges", "gauge") stats += 1 - for k, (v, t) in self.timers.items(): + items = tuple(self.timers.items()) + for k, (v, t) in items: if self.expire > 0 and t + self.expire < ts: if self.debug: print("Expiring timer %s (age: %s)" % (k, ts - t)) @@ -232,7 +241,7 @@ def flush(self): print("Sending %s ====> lower=%s, mean=%s, upper=%s, %dpct=%s, count=%s" \ % (k, min, mean, max, self.pct_threshold, max_threshold, count)) - if self.transport == 'graphite': + if self.transport in GRAPHITE_TRANSPORTS: stat_string += TIMER_MSG % { 'prefix': self.timers_prefix, @@ -246,7 +255,7 @@ def flush(self): 'ts': ts, } - elif self.transport == 'ganglia': + elif self.transport == GANGLIA: # We are gonna convert all times into seconds, then let rrdtool add proper SI unit. This avoids things like # 3521 k ms which is 3.521 seconds # What group should these metrics be in. For the time being we'll set it to the name of the key @@ -256,7 +265,7 @@ def flush(self): g.send(k + "_max", max / 1000, "double", "seconds", "both", 60, self.dmax, group, self.ganglia_spoof_host) g.send(k + "_count", count, "double", "count", "both", 60, self.dmax, group, self.ganglia_spoof_host) g.send(k + "_" + str(self.pct_threshold) + "pct", max_threshold / 1000, "double", "seconds", "both", 60, self.dmax, group, self.ganglia_spoof_host) - elif self.transport == 'ganglia-gmetric': + elif self.transport == GANGLIA_GMETRIC: # We are gonna convert all times into seconds, then let rrdtool add proper SI unit. This avoids things like # 3521 k ms which is 3.521 seconds group = k @@ -268,7 +277,7 @@ def flush(self): stats += 1 - if self.transport == 'graphite': + if self.transport in GRAPHITE_TRANSPORTS: stat_string += "statsd.numStats %s %d\n" % (stats, ts) @@ -278,11 +287,8 @@ def flush(self): '%s.%s' % (self.global_prefix, s) for s in stat_string.split('\n')[:-1] ]) - graphite = socket.socket() try: - graphite.connect((self.graphite_host, self.graphite_port)) - graphite.sendall(bytes(bytearray(stat_string, "utf-8"))) - graphite.close() + self._flush_graphite(stat_string) except socket.error as e: log.error("Error communicating with Graphite: %s" % e) if self.debug: @@ -292,6 +298,19 @@ def flush(self): print("\n================== Flush completed. Waiting until next flush. Sent out %d metrics =======" \ % (stats)) + def _flush_graphite(self, stat_string): + message = bytes(bytearray(stat_string, "utf-8")) + + if self.transport == GRAPHITE: + graphite = socket.socket() + graphite.connect((self.graphite_host, self.graphite_port)) + graphite.sendall(message) + graphite.close() + else: + host = socket.gethostbyname(self.graphite_host) + graphite = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + graphite.sendto(message, (host, self.graphite_port)) + def _set_timer(self): self._timer = threading.Timer(self.flush_interval / 1000, self.on_timer) self._timer.daemon = True @@ -312,10 +331,11 @@ def signal_handler(signal, frame): self._set_timer() while 1: data, addr = self._sock.recvfrom(self.buf) + data = data.decode('utf-8') try: self.process(data) except Exception as error: - log.error("Bad data from %s: %s",addr,error) + log.error("Bad data from %s: %s", addr, error) def stop(self): @@ -354,7 +374,9 @@ def run_server(): parser.add_argument('-d', '--debug', dest='debug', action='store_true', help='debug mode', default=False) parser.add_argument('-n', '--name', dest='name', help='hostname to run on ', default='') parser.add_argument('-p', '--port', dest='port', help='port to run on (default: 8125)', type=int, default=8125) - parser.add_argument('-r', '--transport', dest='transport', help='transport to use graphite, ganglia (uses embedded library) or ganglia-gmetric (uses gmetric)', type=str, default="graphite") + parser.add_argument('-r', '--transport', dest='transport', + help='transport to use: graphite (TCP), graphite-udp (UDP), ganglia (uses embedded library) or ganglia-gmetric (uses gmetric)', + type=str, default=GRAPHITE) parser.add_argument('--graphite-port', dest='graphite_port', help='port to connect to graphite on (default: 2003)', type=int, default=2003) parser.add_argument('--graphite-host', dest='graphite_host', help='host to connect to graphite on (default: localhost)', type=str, default='localhost') # Uses embedded Ganglia Library @@ -364,7 +386,7 @@ def run_server(): # Use gmetric parser.add_argument('--ganglia-gmetric-exec', dest='gmetric_exec', help='Use gmetric executable. Defaults to /usr/bin/gmetric', type=str, default="/usr/bin/gmetric") parser.add_argument('--ganglia-gmetric-options', dest='gmetric_options', help='Options to pass to gmetric. Defaults to -d 60', type=str, default="-d 60") - # + # parser.add_argument('--flush-interval', dest='flush_interval', help='how often to send data to graphite in millis (default: 10000)', type=int, default=10000) parser.add_argument('--no-aggregate-counters', dest='no_aggregate_counters', help='should statsd report counters as absolute instead of count/sec', action='store_true') parser.add_argument('--global-prefix', dest='global_prefix', help='prefix to append to all stats sent to graphite. Useful for hosted services (ex: Hosted Graphite) or stats namespacing (default: None)', type=str, default=None) diff --git a/statsd_test.py b/statsd_test.py index 12bb37c..5d5139b 100644 --- a/statsd_test.py +++ b/statsd_test.py @@ -1,13 +1,27 @@ #!/usr/bin/env python +import time +from multiprocessing import Process + from pystatsd import Client, Server -sc = Client('localhost', 8125) +def worker(): + srvr = Server(debug=True, flush_interval=500) + srvr.serve() + + +p = Process(target=worker, daemon=False) +p.start() +time.sleep(1) + + +sc = Client('localhost', 8125) sc.timing('python_test.time', 500) sc.increment('python_test.inc_int') sc.decrement('python_test.decr_int') sc.gauge('python_test.gauge', 42) -srvr = Server(debug=True) -srvr.serve() + +time.sleep(2) +p.terminate() diff --git a/tests/__init__.py b/tests/__init__.py index 84ed79a..792d600 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,2 +1 @@ -from .client import * -from .server import * +# diff --git a/tests/client.py b/tests/test_client.py similarity index 97% rename from tests/client.py rename to tests/test_client.py index ef8636e..fe64394 100644 --- a/tests/client.py +++ b/tests/test_client.py @@ -79,9 +79,9 @@ def test_basic_client_update_stats_multi(self): self.client.update_stats(stats, 5) for stat, value in data.items(): - stat_str = stat + value + stat_str = '{}:{}'.format(stat, value) # thanks tos9 in #python for 'splaining the return_value bit. - self.mock_socket.return_value.sendto.assert_call_any( + self.mock_socket.return_value.sendto.assert_any_call( bytes(stat_str, 'utf-8'), self.addr) def test_basic_client_timing(self): diff --git a/tests/server.py b/tests/test_server.py similarity index 100% rename from tests/server.py rename to tests/test_server.py diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..72dac54 --- /dev/null +++ b/tox.ini @@ -0,0 +1,6 @@ +[tox] +envlist = py27,py34 + +[testenv] +deps=-rrequirements.txt +commands=nosetests tests