From efc970acf4ad7773bd1f04d80d7e869436b7455f Mon Sep 17 00:00:00 2001 From: Hannes Doyle Date: Fri, 17 Mar 2017 08:52:02 +0100 Subject: [PATCH] Basic functionality working --- .gitignore | 94 +++++++++++++++++++++++ README.md | 43 +++++++++++ bin/gerrit-event-broker | 46 +++++++++++ bin/gerrit-event-client | 54 +++++++++++++ examples/config.yaml | 16 ++++ gerritevents/__init__.py | 26 +++++++ gerritevents/broker.py | 86 +++++++++++++++++++++ gerritevents/client.py | 160 +++++++++++++++++++++++++++++++++++++++ gerritevents/launcher.py | 105 +++++++++++++++++++++++++ requirements.txt | 4 + setup.py | 0 11 files changed, 634 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100755 bin/gerrit-event-broker create mode 100755 bin/gerrit-event-client create mode 100644 examples/config.yaml create mode 100644 gerritevents/__init__.py create mode 100644 gerritevents/broker.py create mode 100644 gerritevents/client.py create mode 100644 gerritevents/launcher.py create mode 100644 requirements.txt create mode 100644 setup.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c538041 --- /dev/null +++ b/.gitignore @@ -0,0 +1,94 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +env/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. 
+*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*,cover +.hypothesis/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# dotenv +.env + +# virtualenv +.venv +venv/ +ENV/ + +# Spyder project settings +.spyderproject + +# Rope project settings +.ropeproject \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..63850d4 --- /dev/null +++ b/README.md @@ -0,0 +1,43 @@ +============ +gerritevents +============ + +Fetch git-bare repos from gerrit when they are updated and propagated to the mirrors. + +Run broker with:: + + $ gerrit-event-broker --config-file ./config.yaml --log-file ./gerrit-events-broker.log + +Run client with:: + + $ gerrit-event-client --config-file ./config.yaml --log-file ./gerrit-events-clients.log + +Why +=== + +Instead of having a lot of servers connected to the ssh port of gerrit and +listening for the same event, let them subscribe to a ZeroMQ socket where the +update is published. Primarily intended to have the amount of connections to +the gerrit server kept to a minimum. + +How +=== + +Gerritevents consists of 2 applications, one listening for gerrit +stream-events through SSH and is intended to run as a systemd service. +Example file of this will be found in examples folder soon(tm). +A predefined subset of stream-events will be broadcast on a ZeroMQ PUBLISH +socket. + +The second application connects to the broker application with a ZeroMQ +SUBSCRIBE socket. 
DESC = """
Connects to a gerrit server's SSH port and listens to all events.
"""


class GerritEventBrokerLauncher(CliLauncher):
    """Command-line launcher for the gerrit event broker daemon."""

    def setup_parser(self):
        """Build the standard CLI parser with the broker description."""
        super().setup_parser(description=DESC)

    def run(self):
        """Wire up the broker coroutines and block until one of them exits.

        The broker tasks are expected to run forever; as soon as any one of
        them completes (e.g. the SSH connection to gerrit dies) we fall
        through, close the loop and let the process exit so systemd can
        restart the service.
        """
        gerrit = GerritEventBroker(
            server=self.config['broker']['gerritmaster'],
            events=self.config['broker']['events'],
            zmq_port=self.config['broker']['port'])

        tasks = [
            asyncio.ensure_future(gerrit.ssh_connection()),
            asyncio.ensure_future(gerrit.message_handler()),
            asyncio.ensure_future(gerrit.zmq_publisher()),
            asyncio.ensure_future(gerrit.zmq_keepalive()),
        ]

        loop = asyncio.get_event_loop()
        try:
            # We want the application to die when we no longer receive a
            # heartbeat, so that systemd can restart it for us.  Use the
            # asyncio constant instead of the bare string literal
            # 'FIRST_COMPLETED' (they happen to be equal, but the constant
            # is the documented API).
            loop.run_until_complete(
                asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))
        finally:
            loop.close()
DESC = """
Listener for gerrit-event-broker which in turn listens to events from gerrit.
"""


class GerritEventClientLauncher(CliLauncher):
    """Command-line launcher for the gerrit event client daemon."""

    def setup_parser(self):
        """Build the standard CLI parser with the client description."""
        super().setup_parser(description=DESC)

    def run(self):
        """Create the subscriber and block until any of its tasks exits.

        When a task completes (e.g. the heartbeat detects a dead broker)
        we close the loop and exit so systemd can restart the service.
        """
        # Starting event loop, running with ZMQEventLoop since the zmq
        # library requires it and running with aiozmq as a subscriber is too
        # troublesome with too little benefit.
        # NOTE(review): ZMQEventLoop is deprecated in newer pyzmq releases
        # (zmq.asyncio works with the default loop there) -- confirm the
        # pinned pyzmq version before changing this.
        event_loop = zmq.asyncio.ZMQEventLoop()
        asyncio.set_event_loop(event_loop)
        loop = asyncio.get_event_loop()

        subscriber = GerritEventSubscriber(
            loop,
            git_binary=self.config['main']['git'],
            brokers=self.config['client']['brokers'],
            repos=self.config['client']['repos'])

        tasks = [
            asyncio.ensure_future(subscriber.zmq_subscriber()),
            asyncio.ensure_future(subscriber.dispatcher()),
            asyncio.ensure_future(subscriber.heartbeat()),
        ]

        try:
            # We want the application to die when we no longer receive a
            # heartbeat, so that systemd can restart it for us.  Use the
            # asyncio constant instead of the bare string 'FIRST_COMPLETED'.
            loop.run_until_complete(
                asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))
        finally:
            loop.close()
class Log:
    """Thin wrapper around :mod:`logging` that emits ``key="value"`` lines.

    Every record has the shape ``event=<name> key="value" ...`` so the
    resulting logs are trivial to grep and parse.
    """

    def __init__(self, name):
        """Create a wrapper around the stdlib logger called *name*
        (callers pass ``__name__``)."""
        self.logger = logging.getLogger(name)

    def log(self, lvl, event=None, **kwargs):
        """Format *event* and *kwargs* as one record and emit it at *lvl*.

        :param lvl: stdlib logging level (e.g. ``logging.INFO``).
        :param event: mandatory short event name, placed first in the line.
        :param kwargs: arbitrary context, rendered as ``key="value"``.
        :raises ValueError: if *event* is missing or empty.
        """
        # Explicit raise instead of ``assert`` so the validation survives
        # running under ``python -O`` (asserts are stripped there).
        if not event:
            raise ValueError('event is required')
        msg_items = ['event=%s' % event] + \
            ['%s="%s"' % (k, v) for k, v in kwargs.items()]
        self.logger.log(lvl, ' '.join(msg_items))

    def debug(self, **kwargs):
        """Emit a DEBUG-level record; requires ``event=`` in kwargs."""
        self.log(logging.DEBUG, **kwargs)

    def info(self, **kwargs):
        """Emit an INFO-level record; requires ``event=`` in kwargs."""
        self.log(logging.INFO, **kwargs)

    def warning(self, **kwargs):
        """Emit a WARNING-level record; requires ``event=`` in kwargs."""
        self.log(logging.WARNING, **kwargs)

    def error(self, **kwargs):
        """Emit an ERROR-level record; requires ``event=`` in kwargs."""
        self.log(logging.ERROR, **kwargs)

    def critical(self, **kwargs):
        """Emit a CRITICAL-level record; requires ``event=`` in kwargs."""
        self.log(logging.CRITICAL, **kwargs)
class GerritEventBroker():
    """Bridge between ``gerrit stream-events`` (SSH) and a ZeroMQ PUB socket.

    Raw event lines are pushed onto :attr:`raw_messages` by the SSH session
    class, filtered by :meth:`message_handler`, and published (with a
    periodic keepalive) by :meth:`zmq_publisher`.
    """

    # Class-level queue so MySSHClientSession.data_received can push raw
    # stream output without holding a reference to a broker instance.
    raw_messages = asyncio.Queue()

    def __init__(self, server='', port=29418, events=None, zmq_port=''):
        """
        :param server: gerrit host to connect to over SSH.
        :param port: gerrit SSH port (gerrit's default is 29418).
        :param events: list of gerrit event type names to forward;
            defaults to an empty list.
        :param zmq_port: TCP port (string) for the ZeroMQ PUB socket.
        """
        self.server = server
        self.port = port
        # Fresh list per instance -- the original ``events=[]`` default was
        # a shared mutable default argument.
        self.events = events if events is not None else []
        self.zmq_port = zmq_port
        self.messages = asyncio.Queue()  # filtered events ready to publish
        self.publisher = None  # aiozmq stream, set by zmq_publisher()

    async def ssh_connection(self):
        """
        Connect to the gerrit server and stream events to the raw queue.

        Runs until the SSH channel closes; then closes the publisher and
        exits the process so systemd can restart the service.
        """
        log.info(event='ssh.connection', message='starting')
        async with asyncssh.connect(self.server, port=self.port) as conn:
            chan, session = await conn.create_session(
                MySSHClientSession, 'gerrit stream-events')
            await chan.wait_closed()
        log.error(event='ssh.connection', message='terminating application')
        self.publisher.close()
        sys.exit(1)

    async def message_handler(self):
        """
        Parse raw 'gerrit stream-events' output and queue matching events.

        NOTE(review): assumes each chunk handed over by the SSH session is
        exactly one complete JSON document, and that events matching the
        configured types always carry a 'project' field -- confirm against
        gerrit's stream-events framing; partial lines would crash this task.
        """
        while True:
            raw_data = await self.raw_messages.get()
            data = json.loads(raw_data)
            if data['type'] in self.events:
                self.messages.put_nowait([data['type'], data['project']])

    async def zmq_publisher(self):
        """
        Bind a ZeroMQ PUB socket and publish everything queued in
        self.messages as (b'gerritstream', event-type, project) frames.
        """
        log.info(event='zmq.publisher', message='starting')
        self.publisher = await aiozmq.create_zmq_stream(
            zmq.PUB, bind='tcp://*:' + self.zmq_port)
        while True:
            data = await self.messages.get()
            msg = (b'gerritstream', str(data[0]).encode('utf-8'),
                   str(data[1]).encode('utf-8'))
            log.debug(event='zmq.publisher', message=msg)
            self.publisher.write(msg)

    async def zmq_keepalive(self):
        """
        Publish a keepalive ping every 10 seconds.

        Clients treat missing pings as a dead broker and shut themselves
        down so systemd can restart them.
        """
        while True:
            self.messages.put_nowait(['keepalive', 'ping'])
            await asyncio.sleep(10)
class GerritEventSubscriber:
    """Listen for broker events and keep local git mirrors up to date.

    Repo names received on the SUB socket are queued, de-duplicated by the
    dispatcher (at most one running fetch plus one pending re-fetch per
    repository) and updated with ``git fetch`` subprocesses.
    """

    def __init__(self, loop, git_binary=None, brokers=None, repos=None):
        """
        :param loop: zmq-capable asyncio event loop.
        :param git_binary: path to the git executable.
        :param brokers: iterable of ZeroMQ endpoints to subscribe to.
        :param repos: mapping repo name -> dict with 'path', 'origin',
            'refs' keys (see examples/config.yaml).
        """
        self.loop = loop
        self.git_binary = git_binary
        self.brokers = brokers
        self.ctx = zmq.asyncio.Context()
        self.beat = asyncio.Queue()           # keepalive pings from brokers
        self.message_queue = asyncio.Queue()  # repos waiting to be fetched
        self.sock = self.ctx.socket(zmq.SUB)
        self.repos = repos
        self.running_fetch = []       # repos with a fetch in flight
        self.re_schedule_fetch = []   # repos queued for a follow-up fetch

    async def heartbeat(self):
        """
        Very simple heartbeat handler.  Will make the client stop if no
        beats are received during a 60 sec time frame.
        """
        log.info(event='heartbeat', message='starting heartbeat')
        counter = 0
        while True:
            try:
                self.beat.get_nowait()
                # Only log 'received' after a beat was actually dequeued
                # (originally it was logged before the queue was read, so
                # it fired even on a miss).
                log.debug(event='heartbeat', message='received')
                counter = 0
            except asyncio.QueueEmpty:
                # Narrowed from a bare ``except:`` which would also have
                # swallowed CancelledError and genuine bugs.
                log.warning(
                    event='heartbeat', message='missed', counter=counter)
                counter += 1
                if counter >= 5:
                    msg = 'too many beats missed! shutting down!'
                    log.error(event='heartbeat', message=msg)
                    # Closing our zmq socket so we get a clean exit
                    self.sock.close()
                    break
            await asyncio.sleep(10)

    async def dispatcher(self):
        """
        Consume the message queue and schedule git fetches, keeping at most
        one running fetch and one pending re-fetch per repository.
        """
        log.info(event='dispatcher', message='starting')
        while True:
            repo = await self.message_queue.get()
            log.debug(
                event='dispatcher', message='new dispatch', repository=repo)
            if repo not in self.running_fetch:
                log.debug(
                    event='dispatcher',
                    message='scheduling fetch',
                    repository=repo)
                self.running_fetch.append(repo)
                asyncio.ensure_future(self.fetch_git(repo))
            else:
                if repo not in self.re_schedule_fetch:
                    log.debug(
                        event='dispatcher',
                        message='scheduling re-fetch',
                        repository=repo)
                    self.re_schedule_fetch.append(repo)
                    asyncio.ensure_future(self.re_dispatcher(repo))
                else:
                    log.debug(
                        event='dispatcher',
                        message='already in re-fetch',
                        repository=repo)

    async def re_dispatcher(self, repo):
        """
        Wait (polling once per second) until no fetch is running for *repo*,
        then run the queued re-fetch.
        """
        while True:
            if repo not in self.running_fetch:
                log.debug(
                    event='re.dispatcher',
                    message='scheduling fetch',
                    repository=repo)
                self.running_fetch.append(repo)
                self.re_schedule_fetch.remove(repo)
                await self.fetch_git(repo)
                break
            log.debug(
                event='re.dispatcher',
                message='waiting for timeslot',
                repository=repo)
            await asyncio.sleep(1)

    async def fetch_git(self, repo):
        """
        Run ``git fetch`` for *repo* as a subprocess and log its output.

        Removes *repo* from the running set when the subprocess finishes.
        """
        log.debug(event='fetch.git', message='starting', repository=repo)
        proc = await asyncio.create_subprocess_exec(
            self.git_binary,
            '-C',
            self.repos[repo]['path'],
            'fetch',
            self.repos[repo]['origin'],
            self.repos[repo]['refs'],
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE)
        stdout = await proc.stdout.read()
        stderr = await proc.stderr.read()
        await proc.wait()
        log.info(event='fetch.git', message='completed', repository=repo)
        log.debug(
            event='fetch.git',
            message='output',
            repository=repo,
            # utf-8 with replacement instead of strict ascii: git output
            # (e.g. ref or file names) is not guaranteed to be ASCII and a
            # decode error here would kill the fetch task.
            stdout=stdout.decode('utf-8', 'replace').rstrip(),
            stderr=stderr.decode('utf-8', 'replace').rstrip())
        self.running_fetch.remove(repo)

    async def zmq_subscriber(self):
        """
        Subscribe to all configured brokers and route incoming frames:
        keepalives feed the heartbeat queue, 'ref-replication-done' events
        for configured repos feed the dispatcher queue; everything else is
        logged and dropped.
        """
        log.info(event='zmq.subscriber', message='starting')
        self.sock.setsockopt(zmq.SUBSCRIBE, b'gerritstream')
        for broker in self.brokers:
            self.sock.connect(broker)
        while True:
            answer = await self.sock.recv_multipart()
            message = [x.decode('utf-8') for x in answer[1:]]
            if message[0] == 'keepalive':
                self.beat.put_nowait(message[1])
            elif message[0] == 'ref-replication-done':
                if message[1] in self.repos.keys():
                    log.debug(
                        event='zmq.subscriber',
                        message='placing in queue',
                        repository=message[1])
                    self.message_queue.put_nowait(message[1])
                else:
                    log.debug(
                        event='zmq.subscriber',
                        message='unknown repo',
                        repository=message[1])
            else:
                log.debug(
                    event='zmq.subscriber',
                    message='unknown message type',
                    payload=message)
class Launcher:
    """Application bootstrap: logging setup, YAML config loading, run().

    Subclasses override :meth:`run`; :meth:`start` performs the setup and
    converts run()'s result into a process exit code.
    """

    LOG_ROTATE = 8  # Days of rotated log files to keep

    def __init__(self, log_file=None, debug=False, config_file=None):
        """
        :param log_file: path of the rotating log file (None = no file log).
        :param debug: log at DEBUG instead of INFO level.
        :param config_file: open file object containing the YAML config.
        """
        self._log_file = log_file
        self._debug = debug
        self._config_file = config_file
        self.config = None  # populated by _setup_config()

        # Use canonical paths
        if self._log_file:
            self._log_file = os.path.realpath(self._log_file)

    def _setup_logging(self):
        """Attach a midnight-rotating file handler to the root logger.

        Exits the process with status 1 if the log file cannot be opened.
        """
        # Top-level logger; handlers filter per-level below.
        logger = logging.getLogger()
        logger.setLevel(logging.DEBUG)

        # Log format
        log_formatter = logging.Formatter(
            '%(asctime)s %(process)d %(levelname)s %(name)s %(message)s')

        # Rotating file logger
        if self._log_file:
            try:
                log_rotate = logging.handlers.TimedRotatingFileHandler(
                    self._log_file,
                    when='midnight',
                    backupCount=self.LOG_ROTATE)
            except IOError as e:
                log.error(event='logfile.error', error=e, path=self._log_file)
                sys.exit(1)
            log_rotate.setFormatter(log_formatter)
            log_rotate.setLevel(
                logging.DEBUG if self._debug else logging.INFO)
            logger.addHandler(log_rotate)

    def _setup_config(self):
        """Parse the YAML configuration file into self.config."""
        # safe_load instead of yaml.load: the config is plain data, and
        # yaml.load without an explicit Loader is deprecated and can
        # construct arbitrary Python objects from a hostile config file.
        self.config = yaml.safe_load(self._config_file)

    def start(self):
        """Launch the application.

        Sets up logging and configuration, then calls :meth:`run`.

        :returns: exit code -- 0 if run() returned True, 1 if it returned
            False, 255 for any other value (e.g. the default None).
        """
        self._setup_logging()
        self._setup_config()

        # Exit code; 255 means run() gave no explicit verdict.
        exit_code = 255

        success = self.run()

        if success is True:
            exit_code = 0
        elif success is False:
            exit_code = 1
        return exit_code

    def run(self):
        """Override this method to start the application logic"""
        pass
class CliLauncher(Launcher):
    """Launcher that takes its settings from command-line arguments."""

    def __init__(self, raw_args=None):
        """Parse *raw_args* (None = ``sys.argv[1:]``) and configure the
        base launcher from the resulting namespace."""
        # Parser must exist before we can parse; subclasses extend it by
        # overriding setup_parser() and calling super().
        self.setup_parser()

        self.args = self.parser.parse_args(args=raw_args)

        super().__init__(
            debug=self.args.debug,
            log_file=self.args.log_file,
            config_file=self.args.config_file)

    def setup_parser(self, description=None):
        """Create ``self.parser`` with the options common to all tools.

        :param description: program description shown in ``--help``.
        """
        self.parser = argparse.ArgumentParser(description=description)

        # (stale "FIXME: version should be added" removed -- the flag below
        # already provides it)
        self.parser.add_argument(
            '-V', '--version', action='version', version=version)
        self.parser.add_argument(
            '--debug', action='store_true', help='turn on debug logging')
        self.parser.add_argument(
            '--config-file',
            type=argparse.FileType('r'),
            help="YAML formatted configuration file")
        self.parser.add_argument(
            '--log-file', metavar='PATH', help='log file path')