diff --git a/nats-subscribe/Dockerfile b/nats-subscribe/Dockerfile new file mode 100644 index 0000000..453a3c5 --- /dev/null +++ b/nats-subscribe/Dockerfile @@ -0,0 +1,31 @@ +FROM debian:bookworm-slim + +WORKDIR /app + +ADD requirements.txt . + +# Doing all this with a debian:bookworm-slim image leads to a ~190MB image. +# With python:3.11 and "pip install -r requirements.txt" you get an image +# that's over 1GB. python:3.11-slim is missing gcc which is needed for ed25519, +# which is required for NKEY / JWT authentication support. +RUN apt-get update && \ + DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \ + build-essential \ + python3-dev \ + python3-pip \ + && \ + pip install --break-system-packages -r requirements.txt && \ + apt-get purge -y --auto-remove --purge \ + build-essential \ + python3-dev \ + && \ + find \ + /var/cache/apt \ + /var/cache/ldconfig \ + /var/lib/apt/lists \ + -mindepth 1 -delete && \ + rm -rf /root/.cache + +ADD main.py . + +CMD ["python3", "main.py"] diff --git a/nats-subscribe/README.md b/nats-subscribe/README.md new file mode 100644 index 0000000..d88493d --- /dev/null +++ b/nats-subscribe/README.md @@ -0,0 +1,18 @@ +# Usage + +Listen on a NATS PubSub subject and send the webhook to a URL. + +``` +NATS_URL=nats://localhost:4222 SUBJECT=mysubject WEBHOOK=https://app.windmill.dev/api/w/github-sync-example/jobs/run/p/f/examples/query_postgres python3 nats-subscribe/main.py +``` + +To test, run the following in a shell: + +``` +NATS_URL=nats://localhost:4222 nats publish mysubject '{"foo": 42}' +``` + +Note that the subject of the received message will be included in an +`X-NATS-Subject` header. This can be useful in combination with Windmill's +[request headers](https://www.windmill.dev/docs/core_concepts/webhooks#request-headers) +functionality. diff --git a/nats-subscribe/main.py b/nats-subscribe/main.py new file mode 100755 index 0000000..2ebe04e --- /dev/null +++ b/nats-subscribe/main.py @@ -0,0 +1,138 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: Apache-2.0 + +# Copyright 2023 Tiger Computing Ltd +# Author: Chris Boot + +""" +NATS PubSub to HTTP Webhook adapter. +""" + +import asyncio +import logging +import os +import re +import ssl +import sys +from functools import partial +from types import SimpleNamespace + +import aiohttp + +import nats + +DEFAULT_URL = "nats://localhost:4222" +DEFAULT_SUBJECT = "test" +DEFAULT_WEBHOOK = "http://localhost:8080" + +cfg = SimpleNamespace() +log = logging.getLogger("nats_subscribe") + + +def configure() -> None: + envs = { + "DEBUG", + "NATS_URL", + "NATS_USER", + "NATS_PASSWORD", + "NATS_CREDS", + "NATS_NKEY", + "NATS_CERT", + "NATS_KEY", + "NATS_CA", + "NATS_TIMEOUT", + "SUBJECT", + "WEBHOOK", + } + + for env in envs: + setattr(cfg, env.lower(), os.environ.get(env)) + + logging.basicConfig( + format="%(levelname)s: %(message)s", + level=logging.DEBUG if cfg.debug else logging.INFO, + ) + + +async def nats_error_callback(ex: Exception) -> None: + log.error("nats: encountered error", exc_info=ex) + sys.exit(1) + + +async def connect() -> nats.NATS: + options = { + "servers": re.split(r"[, ]+", cfg.nats_url or DEFAULT_URL), + "user": cfg.nats_user, + "password": cfg.nats_password, + "user_credentials": cfg.nats_creds, + "nkeys_seed": cfg.nats_nkey, + } + + if cfg.nats_cert or cfg.nats_key or cfg.nats_ca: + tls = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH) + + if cfg.nats_ca: + tls.load_verify_locations(cfg.nats_ca) + + if cfg.nats_cert or cfg.nats_key: + tls.load_cert_chain(cfg.nats_cert, cfg.nats_key) + + options["tls"] = tls + + if cfg.nats_timeout is not None: + options["connect_timeout"] = int(cfg.nats_timeout) + + return await nats.connect( + error_cb=nats_error_callback, + verbose=(cfg.debug is not None), + **options, + ) + + +async def handle_message( + msg: nats.aio.msg.Msg, + session: aiohttp.ClientSession, +) -> None: + subject = msg.subject + data = msg.data.decode() + + log.info(f"Received on '{subject}'") + log.info(data) + + async with session.post( + cfg.webhook or DEFAULT_WEBHOOK, + data=msg.data, + headers={ + "Content-Type": "application/json", + "X-NATS-Subject": subject, + }, + ) as resp: + log.info(f"Webhook status: {resp.status} {resp.reason}") + + async for _ in resp.content.iter_chunks(): + # just throw away the response data + pass + + +async def main() -> None: + configure() + nc = await connect() + + async with aiohttp.ClientSession() as session: + sub = await nc.subscribe( + cfg.subject or DEFAULT_SUBJECT, + cb=partial(handle_message, session=session), + ) + log.info(f"Subscribed on '{sub.subject}'") + + try: + # Wait forever + await asyncio.wait_for(asyncio.Future(), timeout=None) + except asyncio.CancelledError: + pass + finally: + await nc.drain() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/nats-subscribe/requirements.txt b/nats-subscribe/requirements.txt new file mode 100644 index 0000000..443d97f --- /dev/null +++ b/nats-subscribe/requirements.txt @@ -0,0 +1,2 @@ +aiohttp[speedups] ~= 3.8 +nats-py[nkeys] ~= 2.3