Skip to content

Commit

Permalink
Basic functionality working
Browse files Browse the repository at this point in the history
  • Loading branch information
Hannes Doyle committed Mar 17, 2017
1 parent e142979 commit efc970a
Show file tree
Hide file tree
Showing 11 changed files with 634 additions and 0 deletions.
94 changes: 94 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
.hypothesis/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# pyenv
.python-version

# celery beat schedule file
celerybeat-schedule

# SageMath parsed files
*.sage.py

# dotenv
.env

# virtualenv
.venv
venv/
ENV/

# Spyder project settings
.spyderproject

# Rope project settings
.ropeproject
43 changes: 43 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
============
gerritevents
============

Fetch git-bare repos from gerrit when they are updated and propagated to the mirrors.

Run broker with::

$ gerrit-event-broker --config-file ./config.yaml --log-file ./gerrit-events-broker.log

Run client with::

$ gerrit-event-client --config-file ./config.yaml --log-file ./gerrit-events-clients.log

Why
===

Instead of having a lot of servers connected to the ssh port of gerrit and
listening for the same event, let them subscribe to a ZeroMQ socket where the
update is published. Primarly intended to have the amount of connections to
the gerrit server kept to a minimum.

How
===

Gerritevents consists of 2 applications, one listening for gerrit
stream-events through SSH and is intended to run as a systemd service.
Example file of this will be found in examples folder soon(tm).
A predefined subset of stream-events will be broadcasted on a ZeroMQ PUBLISH
socket.

The second application connects to the broker application with ZeroMQ
SUBSCRIBE socket. When receiving a message the client makes a lookup in its
config if the repo is defined. If it's defined it will create a subprocess
with git-fetch.

Requirements
============

* aiozmq (sending messages between broker and client)
* pyzmq
* asyncssh (connection to gerrit server)
* PyYAML (for configuration)
46 changes: 46 additions & 0 deletions bin/gerrit-event-broker
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#!/usr/bin/env python3

import asyncio
import argparse

from gerritevents.launcher import CliLauncher
from gerritevents.broker import GerritEventBroker
from gerritevents import Log

log = Log(__name__)

DESC = """
Connects to a gerrit servers SSH port and listens to all events.
"""


class GerritEventBrokerLauncher(CliLauncher):
def setup_parser(self):
super().setup_parser(description=DESC)

def run(self):
gerrit = GerritEventBroker(
server=self.config['broker']['gerritmaster'],
events=self.config['broker']['events'],
zmq_port=self.config['broker']['port'])

tasks = [
asyncio.ensure_future(gerrit.ssh_connection()),
asyncio.ensure_future(gerrit.message_handler()),
asyncio.ensure_future(gerrit.zmq_publisher()),
asyncio.ensure_future(gerrit.zmq_keepalive())
]

loop = asyncio.get_event_loop()
try:
# We want the application to die when we no longer receive a heartbeat
# This is so that systemd can restart it for us
loop.run_until_complete(
asyncio.wait(tasks, return_when='FIRST_COMPLETED'))
finally:
loop.close()


if __name__ == '__main__':
gerritlauncher = GerritEventBrokerLauncher()
gerritlauncher.start()
54 changes: 54 additions & 0 deletions bin/gerrit-event-client
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#!/usr/bin/env python3

import asyncio
import zmq
import zmq.asyncio
import argparse

from gerritevents.launcher import CliLauncher
from gerritevents.client import GerritEventSubscriber
from gerritevents import Log

log = Log(__name__)

DESC = """
Listener for gerrit-event-broker which in turns listen to events from gerrit.
"""


class GerritEventClientLauncher(CliLauncher):
def setup_parser(self):
super().setup_parser(description=DESC)

def run(self):
# Starting event loop, running with ZMQEventLoop since zmq library requires
# it and running with aiozmq as a subscriber is too troublesome with too
# little benefit.
event_loop = zmq.asyncio.ZMQEventLoop()
asyncio.set_event_loop(event_loop)
loop = asyncio.get_event_loop()

subscriber = GerritEventSubscriber(
loop,
git_binary=self.config['main']['git'],
brokers=self.config['client']['brokers'],
repos=self.config['client']['repos'])

tasks = [
asyncio.ensure_future(subscriber.zmq_subscriber()),
asyncio.ensure_future(subscriber.dispatcher()),
asyncio.ensure_future(subscriber.heartbeat())
]

try:
# We want the application to die when we no longer receive a heartbeat
# This is so that systemd can restart it for us
loop.run_until_complete(
asyncio.wait(tasks, return_when='FIRST_COMPLETED'))
finally:
loop.close()


if __name__ == '__main__':
gerritlauncher = GerritEventClientLauncher()
gerritlauncher.start()
16 changes: 16 additions & 0 deletions examples/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
main:
git: '/<path-to-git-binary>/git'
client:
brokers:
- 'tcp://<broker-ip>:<broker-port>'
repos:
<repo1>:
path: '/<path>/<bare-repo.git>'
origin: 'ssh://<user@<host>:29418/<repo>'
refs: '+refs/*:refs/*'
broker:
gerritmaster: '<hostname>'
port: '<port-number>'
events:
- 'ref-replication-done'
# - 'ref-replication'
26 changes: 26 additions & 0 deletions gerritevents/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import logging

class Log:
def __init__(self, name):
self.logger = logging.getLogger(name)

def log(self, lvl, event=None, **kwargs):
assert (event)
msg_items = ['event=%s' % event] + \
['%s="%s"' % (k, v) for k, v in kwargs.items()]
self.logger.log(lvl, ' '.join(msg_items))

def debug(self, **kwargs):
self.log(logging.DEBUG, **kwargs)

def info(self, **kwargs):
self.log(logging.INFO, **kwargs)

def warning(self, **kwargs):
self.log(logging.WARNING, **kwargs)

def error(self, **kwargs):
self.log(logging.ERROR, **kwargs)

def critical(self, **kwargs):
self.log(logging.CRITICAL, **kwargs)
86 changes: 86 additions & 0 deletions gerritevents/broker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import asyncio
import asyncssh
import aiozmq
import zmq
import sys
import json

from gerritevents import Log

log = Log(__name__)


class MySSHClientSession(asyncssh.SSHClientSession):
def data_received(self, data, datatype):
log.debug(event='data.received', message=data)
GerritEventBroker.raw_messages.put_nowait(data)

def connection_lost(self, exc):
if exc:
log.error(event='connection.lost', message=exc)


class GerritEventBroker():

raw_messages = asyncio.Queue()

def __init__(self, server='', port=29418, events=[], zmq_port=''):
self.server = server
self.port = port
self.events = events
self.zmq_port = zmq_port
self.messages = asyncio.Queue()
self.publisher = None

async def ssh_connection(self):
"""
Connect to gerrit server and stream events to messages queue
"""
log.info(event='ssh.connection', message='starting')
async with asyncssh.connect(self.server, port=self.port) as conn:
chan, session = await conn.create_session(MySSHClientSession,
'gerrit stream-events')
await chan.wait_closed()
log.error(event='ssh.connection', message='terminating application')
self.publisher.close()
sys.exit(1)


async def message_handler(self):
"""
Handle and parse all events from the 'gerrit stream-events' to match
our filters
"""
while True:
raw_data = await self.raw_messages.get()
data = json.loads(raw_data)
if data['type'] in self.events:
self.messages.put_nowait([data['type'], data['project']])

async def zmq_publisher(self):
"""
Create a publisher server which is feeding the stream with all events
occuring in the self.messages Queue
"""
log.info(event='zmq.publisher', message='starting')
self.publisher = await aiozmq.create_zmq_stream(
zmq.PUB, bind='tcp://*:' + self.zmq_port)
while True:
data = await self.messages.get()
msg = (b'gerritstream', str(data[0]).encode('utf-8'),
str(data[1]).encode('utf-8'))
log.debug(event='zmq.publisher', message=msg)
self.publisher.write(msg)

async def zmq_keepalive(self):
"""
Keep-alive and rudimentary heartbeat (client will simply be informed
when the server dies and can do alternative syncs meanwhile)
"""
while True:
self.messages.put_nowait(['keepalive', 'ping'])
await asyncio.sleep(10)


if __name__ == '__main__':
print('override me!')
Loading

0 comments on commit efc970a

Please sign in to comment.