From 8a2bd39a2a1877f733a1b8a2162afb228c9f6469 Mon Sep 17 00:00:00 2001 From: Diogo Baeder de Paula Pinto Date: Tue, 29 Sep 2015 16:18:57 -0300 Subject: [PATCH 1/5] Fixing tests; preparing for tests with tox; preparing for next release --- .gitignore | 3 ++- pystatsd/__init__.py | 2 +- tests/__init__.py | 3 +-- tests/{client.py => test_client.py} | 4 ++-- tests/{server.py => test_server.py} | 0 5 files changed, 6 insertions(+), 6 deletions(-) rename tests/{client.py => test_client.py} (97%) rename tests/{server.py => test_server.py} (100%) diff --git a/.gitignore b/.gitignore index 253051e..dfc90ea 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ -*.pyc +*.py[cdo] build dist *.egg-info *.log +.tox/ diff --git a/pystatsd/__init__.py b/pystatsd/__init__.py index 15b65b2..a0691e1 100644 --- a/pystatsd/__init__.py +++ b/pystatsd/__init__.py @@ -1,4 +1,4 @@ from .statsd import Client from .server import Server -VERSION = (0, 1, 10) +VERSION = (0, 2, 0) diff --git a/tests/__init__.py b/tests/__init__.py index 84ed79a..792d600 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,2 +1 @@ -from .client import * -from .server import * +# diff --git a/tests/client.py b/tests/test_client.py similarity index 97% rename from tests/client.py rename to tests/test_client.py index ef8636e..fe64394 100644 --- a/tests/client.py +++ b/tests/test_client.py @@ -79,9 +79,9 @@ def test_basic_client_update_stats_multi(self): self.client.update_stats(stats, 5) for stat, value in data.items(): - stat_str = stat + value + stat_str = '{}:{}'.format(stat, value) # thanks tos9 in #python for 'splaining the return_value bit. - self.mock_socket.return_value.sendto.assert_call_any( + self.mock_socket.return_value.sendto.assert_any_call( bytes(stat_str, 'utf-8'), self.addr) def test_basic_client_timing(self): diff --git a/tests/server.py b/tests/test_server.py similarity index 100% rename from tests/server.py rename to tests/test_server.py From 0f9c0df4ff237d6d88e39e1d55e5490a65607556 Mon Sep 17 00:00:00 2001 From: Diogo Baeder de Paula Pinto Date: Tue, 29 Sep 2015 17:06:58 -0300 Subject: [PATCH 2/5] Fixing for Python 3 --- pystatsd/server.py | 16 ++++++++++------ statsd_test.py | 20 +++++++++++++++++--- 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/pystatsd/server.py b/pystatsd/server.py index 40118c8..7d07ef9 100644 --- a/pystatsd/server.py +++ b/pystatsd/server.py @@ -49,7 +49,7 @@ def __init__(self, pct_threshold=90, debug=False, transport='graphite', ganglia_host='localhost', ganglia_port=8649, ganglia_spoof_host='statsd:statsd', gmetric_exec='/usr/bin/gmetric', gmetric_options = '-d', - graphite_host='localhost', graphite_port=2003, global_prefix=None, + graphite_host='localhost', graphite_port=2003, global_prefix=None, flush_interval=10000, no_aggregate_counters=False, counters_prefix='stats', timers_prefix='stats.timers', expire=0): @@ -156,7 +156,8 @@ def flush(self): elif self.transport == 'ganglia': g = gmetric.Gmetric(self.ganglia_host, self.ganglia_port, self.ganglia_protocol) - for k, (v, t) in self.counters.items(): + items = tuple(self.counters.items()) + for k, (v, t) in items: if self.expire > 0 and t + self.expire < ts: if self.debug: print("Expiring counter %s (age: %s)" % (k, ts -t)) @@ -182,7 +183,8 @@ def flush(self): del(self.counters[k]) stats += 1 - for k, (v, t) in self.gauges.items(): + items = tuple(self.gauges.items()) + for k, (v, t) in items: if self.expire > 0 and t + self.expire < ts: if self.debug: print("Expiring gauge %s (age: %s)" % (k, ts - t)) @@ -204,7 +206,8 @@ def flush(self): stats += 1 - for k, (v, t) in self.timers.items(): + items = tuple(self.timers.items()) + for k, (v, t) in items: if self.expire > 0 and t + self.expire < ts: if self.debug: print("Expiring timer %s (age: %s)" % (k, ts - t)) @@ -312,10 +315,11 @@ def signal_handler(signal, frame): self._set_timer() while 1: data, addr = self._sock.recvfrom(self.buf) + data = data.decode('utf-8') try: self.process(data) except Exception as error: - log.error("Bad data from %s: %s",addr,error) + log.error("Bad data from %s: %s", addr, error) def stop(self): @@ -364,7 +368,7 @@ def run_server(): # Use gmetric parser.add_argument('--ganglia-gmetric-exec', dest='gmetric_exec', help='Use gmetric executable. Defaults to /usr/bin/gmetric', type=str, default="/usr/bin/gmetric") parser.add_argument('--ganglia-gmetric-options', dest='gmetric_options', help='Options to pass to gmetric. Defaults to -d 60', type=str, default="-d 60") - # + # parser.add_argument('--flush-interval', dest='flush_interval', help='how often to send data to graphite in millis (default: 10000)', type=int, default=10000) parser.add_argument('--no-aggregate-counters', dest='no_aggregate_counters', help='should statsd report counters as absolute instead of count/sec', action='store_true') parser.add_argument('--global-prefix', dest='global_prefix', help='prefix to append to all stats sent to graphite. Useful for hosted services (ex: Hosted Graphite) or stats namespacing (default: None)', type=str, default=None) diff --git a/statsd_test.py b/statsd_test.py index 12bb37c..5d5139b 100644 --- a/statsd_test.py +++ b/statsd_test.py @@ -1,13 +1,27 @@ #!/usr/bin/env python +import time +from multiprocessing import Process + from pystatsd import Client, Server -sc = Client('localhost', 8125) +def worker(): + srvr = Server(debug=True, flush_interval=500) + srvr.serve() + + +p = Process(target=worker, daemon=False) +p.start() +time.sleep(1) + + +sc = Client('localhost', 8125) sc.timing('python_test.time', 500) sc.increment('python_test.inc_int') sc.decrement('python_test.decr_int') sc.gauge('python_test.gauge', 42) -srvr = Server(debug=True) -srvr.serve() + +time.sleep(2) +p.terminate() From dea3f35a359dcb1d87c74841b1df36fb7dbf4c2a Mon Sep 17 00:00:00 2001 From: Diogo Baeder de Paula Pinto Date: Tue, 29 Sep 2015 17:07:14 -0300 Subject: [PATCH 3/5] Adding tox.ini --- tox.ini | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 tox.ini diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..72dac54 --- /dev/null +++ b/tox.ini @@ -0,0 +1,6 @@ +[tox] +envlist = py27,py34 + +[testenv] +deps=-rrequirements.txt +commands=nosetests tests From a713344bf94060a78b99e100f96b2e66666e5120 Mon Sep 17 00:00:00 2001 From: Diogo Baeder de Paula Pinto Date: Wed, 30 Sep 2015 01:00:33 -0300 Subject: [PATCH 4/5] Adding graphite-udp transport --- Makefile | 4 +++- README.md | 17 ++++++++++++++- pystatsd/server.py | 52 +++++++++++++++++++++++++++++++--------------- statsd_test.py | 8 ++++--- 4 files changed, 59 insertions(+), 22 deletions(-) diff --git a/Makefile b/Makefile index fbc3854..75dad46 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,5 @@ - pypi: python setup.py sdist upload + +test: + tox diff --git a/README.md b/README.md index eaed58b..9c81799 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ server, a front end/proxy for the Graphite stats collection and graphing server. - code: https://github.com/etsy/statsd - blog post: http://codeascraft.etsy.com/2011/02/15/measure-anything-measure-everything/ -**pystatsd** has [been tested on](http://travis-ci.org/sivy/py-statsd) python 2.5, 2.6, and 2.7. +**pystatsd** has [been tested on](http://travis-ci.org/sivy/py-statsd) python 2.7 and 3.4. Status ------------- @@ -38,6 +38,14 @@ See statsd_test for sample usage: sc.decrement('python_test.decr_int') sc.gauge('python_test.gauge', 42) +or, to run the server from a simple command, if you have the package installed: + + $ pystatsd-server + +Check out the options for the command for more information: + + $ pystatsd-server --help + Building a Debian Package ------------- @@ -55,6 +63,13 @@ will read configuration variables from /etc/default/pystatsd. By default the pystatsd daemon runs as user 'nobody' which is a good thing from a security perspective. +Developing +------------- + +Make sure you have `tox` installed, then just run the tests: + + $ make test + Troubleshooting ------------- diff --git a/pystatsd/server.py b/pystatsd/server.py index 7d07ef9..9187e77 100644 --- a/pystatsd/server.py +++ b/pystatsd/server.py @@ -42,6 +42,12 @@ def _clean_key(k): %(prefix)s.%(key)s.upper_%(pct_threshold)s %(max_threshold)s %(ts)s ''' +GRAPHITE = 'graphite' +GRAPHITE_UDP = 'graphite-udp' +GRAPHITE_TRANSPORTS = (GRAPHITE, GRAPHITE_UDP) +GANGLIA = 'ganglia' +GANGLIA_GMETRIC = 'ganglia-gmetric' + class Server(object): @@ -151,9 +157,9 @@ def flush(self): ts = int(time.time()) stats = 0 - if self.transport == 'graphite': + if self.transport in GRAPHITE_TRANSPORTS: stat_string = '' - elif self.transport == 'ganglia': + elif self.transport == GANGLIA: g = gmetric.Gmetric(self.ganglia_host, self.ganglia_port, self.ganglia_protocol) items = tuple(self.counters.items()) @@ -169,14 +175,14 @@ def flush(self): if self.debug: print("Sending %s => count=%s" % (k, v)) - if self.transport == 'graphite': + if self.transport in GRAPHITE_TRANSPORTS: msg = '%s.%s %s %s\n' % (self.counters_prefix, k, v, ts) stat_string += msg - elif self.transport == 'ganglia': + elif self.transport == GANGLIA: # We put counters in _counters group. Underscore is to make sure counters show up # first in the GUI. Change below if you disagree g.send(k, v, "double", "count", "both", 60, self.dmax, "_counters", self.ganglia_spoof_host) - elif self.transport == 'ganglia-gmetric': + elif self.transport == GANGLIA_GMETRIC: self.send_to_ganglia_using_gmetric(k,v, "_counters", "count") # Clear the counter once the data is sent @@ -195,13 +201,13 @@ def flush(self): if self.debug: print("Sending %s => value=%s" % (k, v)) - if self.transport == 'graphite': + if self.transport in GRAPHITE_TRANSPORTS: # note: counters and gauges implicitly end up in the same namespace msg = '%s.%s %s %s\n' % (self.counters_prefix, k, v, ts) stat_string += msg - elif self.transport == 'ganglia': + elif self.transport == GANGLIA: g.send(k, v, "double", "count", "both", 60, self.dmax, "_gauges", self.ganglia_spoof_host) - elif self.transport == 'ganglia-gmetric': + elif self.transport == GANGLIA_GMETRIC: self.send_to_ganglia_using_gmetric(k,v, "_gauges", "gauge") stats += 1 @@ -235,7 +241,7 @@ def flush(self): print("Sending %s ====> lower=%s, mean=%s, upper=%s, %dpct=%s, count=%s" \ % (k, min, mean, max, self.pct_threshold, max_threshold, count)) - if self.transport == 'graphite': + if self.transport in GRAPHITE_TRANSPORTS: stat_string += TIMER_MSG % { 'prefix': self.timers_prefix, @@ -249,7 +255,7 @@ def flush(self): 'ts': ts, } - elif self.transport == 'ganglia': + elif self.transport == GANGLIA: # We are gonna convert all times into seconds, then let rrdtool add proper SI unit. This avoids things like # 3521 k ms which is 3.521 seconds # What group should these metrics be in. For the time being we'll set it to the name of the key @@ -259,7 +265,7 @@ def flush(self): g.send(k + "_max", max / 1000, "double", "seconds", "both", 60, self.dmax, group, self.ganglia_spoof_host) g.send(k + "_count", count, "double", "count", "both", 60, self.dmax, group, self.ganglia_spoof_host) g.send(k + "_" + str(self.pct_threshold) + "pct", max_threshold / 1000, "double", "seconds", "both", 60, self.dmax, group, self.ganglia_spoof_host) - elif self.transport == 'ganglia-gmetric': + elif self.transport == GANGLIA_GMETRIC: # We are gonna convert all times into seconds, then let rrdtool add proper SI unit. This avoids things like # 3521 k ms which is 3.521 seconds group = k @@ -271,7 +277,7 @@ def flush(self): stats += 1 - if self.transport == 'graphite': + if self.transport in GRAPHITE_TRANSPORTS: stat_string += "statsd.numStats %s %d\n" % (stats, ts) @@ -281,11 +287,8 @@ def flush(self): '%s.%s' % (self.global_prefix, s) for s in stat_string.split('\n')[:-1] ]) - graphite = socket.socket() try: - graphite.connect((self.graphite_host, self.graphite_port)) - graphite.sendall(bytes(bytearray(stat_string, "utf-8"))) - graphite.close() + self._flush_graphite(stat_string) except socket.error as e: log.error("Error communicating with Graphite: %s" % e) if self.debug: @@ -295,6 +298,19 @@ def flush(self): print("\n================== Flush completed. Waiting until next flush. Sent out %d metrics =======" \ % (stats)) + def _flush_graphite(self, stat_string): + message = bytes(bytearray(stat_string, "utf-8")) + + if self.transport == GRAPHITE: + graphite = socket.socket() + graphite.connect((self.graphite_host, self.graphite_port)) + graphite.sendall(message) + graphite.close() + else: + host = socket.gethostbyname(self.graphite_host) + graphite = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + graphite.sendto(message, (host, self.graphite_port)) + def _set_timer(self): self._timer = threading.Timer(self.flush_interval / 1000, self.on_timer) self._timer.daemon = True @@ -358,7 +374,9 @@ def run_server(): parser.add_argument('-d', '--debug', dest='debug', action='store_true', help='debug mode', default=False) parser.add_argument('-n', '--name', dest='name', help='hostname to run on ', default='') parser.add_argument('-p', '--port', dest='port', help='port to run on (default: 8125)', type=int, default=8125) - parser.add_argument('-r', '--transport', dest='transport', help='transport to use graphite, ganglia (uses embedded library) or ganglia-gmetric (uses gmetric)', type=str, default="graphite") + parser.add_argument('-r', '--transport', dest='transport', + help='transport to use: graphite (TCP), graphite-udp (UDP), ganglia (uses embedded library) or ganglia-gmetric (uses gmetric)', + type=str, default=GRAPHITE) parser.add_argument('--graphite-port', dest='graphite_port', help='port to connect to graphite on (default: 2003)', type=int, default=2003) parser.add_argument('--graphite-host', dest='graphite_host', help='host to connect to graphite on (default: localhost)', type=str, default='localhost') # Uses embedded Ganglia Library diff --git a/statsd_test.py b/statsd_test.py index 5d5139b..eb4e38e 100644 --- a/statsd_test.py +++ b/statsd_test.py @@ -7,11 +7,13 @@ def worker(): - srvr = Server(debug=True, flush_interval=500) - srvr.serve() + #srvr = Server(debug=True, flush_interval=500) + #srvr.serve() + pass -p = Process(target=worker, daemon=False) +#p = Process(target=worker, daemon=False) +p = Process(target=worker) p.start() time.sleep(1) From 8d5d69e3d79da1b4d0275a3e7d400a284bbf6193 Mon Sep 17 00:00:00 2001 From: Diogo Baeder de Paula Pinto Date: Wed, 30 Sep 2015 01:03:42 -0300 Subject: [PATCH 5/5] Fixing example script --- statsd_test.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/statsd_test.py b/statsd_test.py index eb4e38e..5d5139b 100644 --- a/statsd_test.py +++ b/statsd_test.py @@ -7,13 +7,11 @@ def worker(): - #srvr = Server(debug=True, flush_interval=500) - #srvr.serve() - pass + srvr = Server(debug=True, flush_interval=500) + srvr.serve() -#p = Process(target=worker, daemon=False) -p = Process(target=worker) +p = Process(target=worker, daemon=False) p.start() time.sleep(1)