Skip to content

Add nats-subscribe for NATS PubSub messages #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions nats-subscribe/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
FROM debian:bookworm-slim

WORKDIR /app

ADD requirements.txt .

# Doing all this with a debian:bookworm-slim image leads to a ~190MB image.
# With python:3.11 and "pip install -r requirements.txt" you get an image
# that's over 1GB. python:3.11-slim is missing gcc which is needed for ed25519,
# which is required for NKEY / JWT authentication support.
RUN apt-get update && \
DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \
build-essential \
python3-dev \
python3-pip \
&& \
pip install --break-system-packages -r requirements.txt && \
apt-get purge -y --auto-remove --purge \
build-essential \
python3-dev \
&& \
find \
/var/cache/apt \
/var/cache/ldconfig \
/var/lib/apt/lists \
-mindepth 1 -delete && \
rm -rf /root/.cache

ADD main.py .

CMD ["python3", "main.py"]
18 changes: 18 additions & 0 deletions nats-subscribe/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Usage

Listen on a NATS PubSub subject and send the webhook to a URL.

```
NATS_URL=nats://localhost:4222 SUBJECT=mysubject WEBHOOK=https://app.windmill.dev/api/w/github-sync-example/jobs/run/p/f/examples/query_postgres python3 nats-subscribe/main.py
```

To test, run the following in a shell:

```
NATS_URL=nats://localhost:4222 nats publish mysubject '{"foo": 42}'
```

Note that the subject of the received message will be included in an
`X-NATS-Subject` header. This can be useful in combination with Windmill's
[request headers](https://www.windmill.dev/docs/core_concepts/webhooks#request-headers)
functionality.
138 changes: 138 additions & 0 deletions nats-subscribe/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
#!/usr/bin/env python3
# SPDX-License-Identifier: Apache-2.0

# Copyright 2023 Tiger Computing Ltd
# Author: Chris Boot <[email protected]>

"""
NATS PubSub to HTTP Webhook adapter.
"""

import asyncio
import logging
import os
import re
import ssl
import sys
from functools import partial
from types import SimpleNamespace

import aiohttp

import nats

DEFAULT_URL = "nats://localhost:4222"
DEFAULT_SUBJECT = "test"
DEFAULT_WEBHOOK = "http://localhost:8080"

cfg = SimpleNamespace()
log = logging.getLogger("nats_subscribe")


def configure() -> None:
envs = {
"DEBUG",
"NATS_URL",
"NATS_USER",
"NATS_PASSWORD",
"NATS_CREDS",
"NATS_NKEY",
"NATS_CERT",
"NATS_KEY",
"NATS_CA",
"NATS_TIMEOUT",
"SUBJECT",
"WEBHOOK",
}

for env in envs:
setattr(cfg, env.lower(), os.environ.get(env))

logging.basicConfig(
format="%(levelname)s: %(message)s",
level=logging.DEBUG if cfg.debug else logging.INFO,
)


async def nats_error_callback(ex: Exception) -> None:
log.error("nats: encountered error", exc_info=ex)
sys.exit(1)


async def connect() -> nats.NATS:
options = {
"servers": re.split(r"[, ]+", cfg.nats_url or DEFAULT_URL),
"user": cfg.nats_user,
"password": cfg.nats_password,
"user_credentials": cfg.nats_creds,
"nkeys_seed": cfg.nats_nkey,
}

if cfg.nats_cert or cfg.nats_key or cfg.nats_ca:
tls = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)

if cfg.nats_ca:
tls.load_verify_locations(cfg.nats_ca)

if cfg.nats_cert or cfg.nats_key:
tls.load_cert_chain(cfg.nats_cert, cfg.nats_key)

options["tls"] = tls

if cfg.nats_timeout is not None:
options["connect_timeout"] = int(cfg.nats_timeout)

return await nats.connect(
error_cb=nats_error_callback,
verbose=(cfg.debug is not None),
**options,
)


async def handle_message(
msg: nats.aio.msg.Msg,
session: aiohttp.ClientSession,
) -> None:
subject = msg.subject
data = msg.data.decode()

log.info(f"Received on '{subject}'")
log.info(data)

async with session.post(
cfg.webhook or DEFAULT_WEBHOOK,
data=msg.data,
headers={
"Content-Type": "application/json",
"X-NATS-Subject": subject,
},
) as resp:
log.info(f"Webhook status: {resp.status} {resp.reason}")

async for _ in resp.content.iter_chunks():
# just throw away the response data
pass


async def main() -> None:
configure()
nc = await connect()

async with aiohttp.ClientSession() as session:
sub = await nc.subscribe(
cfg.subject or DEFAULT_SUBJECT,
cb=partial(handle_message, session=session),
)
log.info(f"Subscribed on '{sub.subject}'")

try:
# Wait forever
await asyncio.wait_for(asyncio.Future(), timeout=None)
except asyncio.CancelledError:
pass
finally:
await nc.drain()


if __name__ == "__main__":
asyncio.run(main())
2 changes: 2 additions & 0 deletions nats-subscribe/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
aiohttp[speedups] ~= 3.8
nats-py[nkeys] ~= 2.3